编程 Temporal 深度实战:当分布式工作流从「工程噩梦」变成「写代码一样简单」

2026-06-16 08:51:07 +0800 CST views 10

Temporal 深度实战:当分布式工作流从「工程噩梦」变成「写代码一样简单」

背景介绍

在微服务架构统治后端世界的今天,有一个问题始终困扰着所有后端工程师:跨服务的业务流程到底该怎么组织?

订单处理、支付对账、数据同步、批量任务……这些业务逻辑横跨多个服务,涉及重试、超时、补偿、幂等、持久化等无数细节。传统方案往往是这样的:

# ❌ 传统方式:手动编排,脆弱得像纸牌屋
async def process_order(order_id: str):
    try:
        order = await order_service.get(order_id)
        payment = await payment_service.charge(order.amount)
        await inventory_service.reserve(order.items)
        await shipping_service.schedule(order.address)
        await notification_service.send(order.user_id, "订单已确认")
    except PaymentError:
        await inventory_service.release(order.items)
        raise
    except ShippingError:
        await payment_service.refund(payment.id)
        await inventory_service.release(order.items)
        raise

这段代码存在一连串致命问题:

  1. 无重试机制:网络抖动直接失败,用户体验差
  2. 无持久化:进程崩溃后状态丢失,从头来过
  3. 补偿逻辑复杂:每一步失败都需要手动回滚,逻辑分散在各处
  4. 无法查询状态:订单进行到哪一步了?只能查日志
  5. 无法暂停/恢复:高峰期积压的订单重启后全部丢失
  6. 无法并发控制:多个实例同时处理同一订单,数据乱了

这些问题在单个服务内部就够头疼了,到了跨服务场景更是指数级爆炸。Saga 模式、Choreography 模式、可靠消息队列……每种方案都有自己的复杂度,团队学不动,维护成本高,新人接手更是两眼一抹黑。

Temporal 就是在这种背景下诞生的。它是 Uber 内部孵化的开源项目(最初叫 Cadence),将工作流编排提升到了一个全新的抽象层次:用普通代码写分布式业务流程,框架自动处理重试、持久化、状态查询、并发控制等所有非功能性需求。


什么是 Temporal

Temporal 是一个分布式工作流运行时(Workflow Runtime),核心思想是:你的业务代码不需要知道它在分布式环境中运行。

它通过「工作流」和「活动」两个核心原语,将业务逻辑与运行时关注点分离:

  • Activity(活动):普通的单次操作,如「调用支付 API」「发送邮件」「写入数据库」
  • Workflow(工作流):编排活动的业务逻辑,本身是纯代码,由 Temporal 持久化执行

Temporal 的关键特性:

  • 持久化执行:工作流状态自动保存,进程重启、机器宕机都不丢
  • 确定性执行:同样的输入,Workflow 代码执行结果完全一致
  • 自动重试:Activity 失败自动重试,Workflow 失败从上一个检查点恢复
  • 时间语义:支持 Timer、SideEffect、查询接口等内置语义
  • 多语言 SDK:Go、TypeScript/JavaScript、Python、Java、PHP、Ruby、.NET
// ✅ Temporal 方式:业务逻辑清晰,框架处理一切
func OrderProcessingWorkflow(ctx workflow.Context, orderID string) error {
    // 1. 获取订单
    var order Order
    err := workflow.ExecuteActivity(ctx, orderService.Get, orderID).Get(ctx, &order)
    if err != nil {
        return err
    }

    // 2. 收费(带重试)
    var payment PaymentResult
    err = workflow.ExecuteActivity(ctx, paymentService.Charge, order.Amount).Get(ctx, &payment)
    if err != nil {
        return err // Temporal 自动处理失败补偿
    }

    // 3. 预约库存
    err = workflow.ExecuteActivity(ctx, inventoryService.Reserve, order.Items).Get(ctx, nil)
    if err != nil {
        // 补偿:退款
        _ = workflow.ExecuteActivity(ctx, paymentService.Refund, payment.ID).Get(ctx, nil)
        return err
    }

    // 4. 调度配送
    err = workflow.ExecuteActivity(ctx, shippingService.Schedule, order.Address).Get(ctx, nil)
    if err != nil {
        return err
    }

    // 5. 发送通知
    return workflow.ExecuteActivity(ctx, notificationService.Send, order.UserID, "已确认").Get(ctx, nil)
}

这段代码读起来就像同步代码,但实际执行时:

  • 每个 Activity 调用都是持久化且可重试的
  • 工作流在每个 await 点自动保存检查点
  • 即使服务器重启,从检查点恢复后继续执行
  • Activity 失败时,后续的补偿逻辑由开发者决定(Temporal 提供 ContinueAsNew、Signal 等机制)

核心概念深度解析

2.1 Workflow 的确定性原则

这是 Temporal 中最重要也是最容易踩坑的概念。

Temporal Workflow 是确定性的(Deterministic)。 同一个 WorkflowDefinition + 同一个输入,执行一万次,结果必须完全一致。这是 Temporal 能在任意时间点暂停、恢复、重放的基础。

要实现确定性,Workflow 代码必须遵守以下规则:

规则一:禁止直接调用外部服务

// ❌ 错误:直接调用外部服务,无法重试和恢复
func BadWorkflow(ctx workflow.Context) error {
    result := http.Get("https://api.example.com/data") // 不允许!
    return process(result)
}

// ✅ 正确:通过 Activity 间接调用
func GoodWorkflow(ctx workflow.Context) error {
    var result MyData
    err := workflow.ExecuteActivity(ctx, FetchDataActivity, "https://api.example.com/data").Get(ctx, &result)
    return err
}

规则二:禁止使用系统时间作为业务判断依据

// ❌ 错误:time.Now() 每次调用值不同,破坏确定性
func BadWorkflow(ctx workflow.Context) error {
    now := time.Now()
    if now.Hour() > 18 {
        // 下午6点后不处理
        return nil
    }
    // ...
}

// ✅ 正确:使用 Workflow 的时间语义
func GoodWorkflow(ctx workflow.Context) error {
    // Now() 在 Workflow 中是"已记录的墙钟时间",重放时值不变
    now := workflow.Now(ctx)
    if now.Hour() > 18 {
        return nil
    }
    // ...
}

规则三:禁止使用随机数作为业务分支

// ❌ 错误:math/rand 每次执行值不同
func BadWorkflow(ctx workflow.Context) error {
    if rand.Int()%2 == 0 {
        // 随机逻辑
    }
}

// ✅ 正确:使用 SideEffect 或确定性的 UUID
func GoodWorkflow(ctx workflow.Context) error {
    var randomSeed int64
    workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
        return rand.Int() // 重放时返回缓存的值
    }).Get(ctx, &randomSeed)
    // ...
}

规则四:禁止使用 map 等非线程安全数据结构

Workflow 执行在单线程中,但会并发处理 Signal。Go 的 map 不是并发安全的。

// ✅ 正确:使用 Workflow.GetInfo 获取状态
var processedItems []string
workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
    return processedItems // 持久化到 EventHistory
}).Get(ctx, &processedItems)

2.2 Activity 的设计哲学

Activity 是 Workflow 与外部世界交互的唯一通道。一个好的 Activity 设计需要考虑:

Activity 的失败处理

// 定义 Activity 选项
activityOpts := activity.StartToCloseTimeout(5 * time.Minute)
retryOpts := retry.New ExponentialBackoffRetryPolicy(
    1 * time.Second,  // 初始间隔
    10 * time.Minute, // 最大间隔
    2.0,              // 退火系数
)

// 或者使用 Activity 的 options 链式配置
activityOpts := &workflow.ActivityOptions{
    StartToCloseTimeout: 5 * time.Minute,
    RetryPolicy: &temporal.RetryPolicy{
        InitialInterval:        time.Second,
        BackoffCoefficient:     2.0,
        MaximumInterval:        10 * time.Minute,
        MaximumAttempts:        5,
        NonRetryableErrorReasons: []string{
            "invalid-order-status",
            "payment-declined-permanently",
        },
    },
}

Activity 心跳机制

长时间运行的 Activity 需要定期报告进度,这样即使 Temporal Server 重启,也能从上次心跳恢复:

func LongRunningActivity(ctx context.Context, input JobInput) error {
    activity.RecordHeartbeat(ctx, Progress{Percent: 0})

    // Step 1: 处理前 50%
    if err := processFirstHalf(ctx, input); err != nil {
        return err
    }
    activity.RecordHeartbeat(ctx, Progress{Percent: 50})

    // Step 2: 处理后 50%
    if err := processSecondHalf(ctx, input); err != nil {
        return err
    }
    activity.RecordHeartbeat(ctx, Progress{Percent: 100})

    return nil
}

Activity 的并发执行

一个 Workflow 可以同时启动多个 Activity:

func ParallelProcessingWorkflow(ctx workflow.Context, items []string) error {
    // 启动多个 Activity 并发执行
    futures := make([]workflow.Future, len(items))
    for i, item := range items {
        futures[i] = workflow.ExecuteActivity(ctx, ProcessItemActivity, item)
    }

    // 等待所有 Activity 完成
    for i, future := range futures {
        var result Result
        if err := future.Get(ctx, &result); err != nil {
            return fmt.Errorf("item %s failed: %w", items[i], err)
        }
    }

    return nil
}

2.3 Signal(信号)与 Query(查询)

这是 Temporal 最强大的两个交互机制。

Signal 用于向运行中的 Workflow 发送消息:

// 客户端发送 Signal
workflowHandle, _ := c.Client.GetWorkflowHandle(ctx, "order-123")
err := workflowHandle.Signal(ctx, "cancel-order", CancelSignal{Reason: "customer-requested"})
// Workflow 中接收 Signal
func OrderWorkflow(ctx workflow.Context) error {
    // 注册 Signal handler
    selector := workflow.NewSelector(ctx)
    var cancelRequested bool
    var cancelReason string

    selector.AddSignal(ctx, "cancel-order", func(signalCtx workflow.Context, s CancelSignal) {
        cancelRequested = true
        cancelReason = s.Reason
    })

    // 业务主循环
    for !cancelRequested {
        // ... 处理订单逻辑 ...
        selector.Select(ctx)
    }

    // 收到取消信号,执行清理
    return fmt.Errorf("cancelled: %s", cancelReason)
}

Query 用于查询运行中 Workflow 的状态(只读,不影响执行):

// 注册 Query handler
workflow.SetQueryHandler(ctx, "get-order-status", func() (OrderStatus, error) {
    return OrderStatus{
        Phase: currentPhase,
        Items: processedItems,
        ETA:   estimatedTime,
    }, nil
})
// 客户端查询
var status OrderStatus
err := workflowHandle.Query(ctx, "get-order-status", &status)

2.4 Child Workflow(子工作流)

Temporal 支持 Workflow 嵌套调用,子 Workflow 继承父 Workflow 的超时配置,且可以独立重试:

func ParentWorkflow(ctx workflow.Context, batchID string) error {
    var batch Batch
    err := workflow.ExecuteActivity(ctx, FetchBatchActivity, batchID).Get(ctx, &batch)
    if err != nil {
        return err
    }

    // 启动子 Workflow 处理每个批次项
    childOpts := workflow.ChildWorkflowOptions{
        WorkflowID:        fmt.Sprintf("batch-child-%s", batch.ID),
        ParentClosePolicy:  workflow.ParentClosePolicyTerminate,
        ExecutionTimeout:   1 * time.Hour,
    }

    childCtx := workflow.WithChildOptions(ctx, childOpts)
    futures := make([]workflow.Future, len(batch.Items))
    for i, item := range batch.Items {
        futures[i] = workflow.ExecuteChildWorkflow(childCtx, ProcessItemWorkflow, item)
    }

    // 等待所有子 Workflow 完成
    for i, future := range futures {
        if err := future.Get(ctx, nil); err != nil {
            log.Printf("子工作流 %d 失败: %v", i, err)
        }
    }

    return nil
}

架构分析:Temporal 是怎么工作的

3.1 整体架构

Temporal 的架构分为两部分:Temporal ServerWorker Process

┌─────────────────────────────────────────────────────────┐
│                     Temporal Server                     │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐            │
│  │ Frontend │  │  History │  │ Matching │            │
│  │  Service │──│  Service │──│  Service │            │
│  └──────────┘  └──────────┘  └──────────┘            │
│       │              │              │                   │
│       └──────────────┴──────────────┘                   │
│                    Persistence Layer                    │
│         (PostgreSQL / MySQL / Cassandra)              │
└─────────────────────────────────────────────────────────┘
         ▲                                    ▲
         │  long-poll                      task
         │  commands                     queue
         ▼                                    ▼
┌─────────────────┐               ┌─────────────────┐
│  Worker Process  │               │  Worker Process  │
│  [Go Worker]     │               │  [TS Worker]     │
│  ┌─────────────┐ │               │  ┌─────────────┐  │
│  │ Workflow    │ │               │  │ Activity    │  │
│  │ Worker      │ │               │  │ Worker      │  │
│  └─────────────┘ │               │  └─────────────┘  │
│  ┌─────────────┐ │               │  ┌─────────────┐  │
│  │ Activity    │ │               │  │ Workflow    │  │
│  │ Worker      │ │               │  │ Worker      │  │
│  └─────────────┘ │               │  └─────────────┘  │
└─────────────────┘               └─────────────────┘

History Service 是 Temporal Server 的核心,它将每个 Workflow 的完整执行历史(EventHistory)持久化到数据库。这使得:

  1. 任何节点都能恢复 Workflow:EventHistory 包含了 Workflow 从开始到现在的所有「事件」,Worker 根据 EventHistory 重放即可恢复状态
  2. Workflow 代码与执行位置解耦:Workflow 可以在任何语言、任何节点的 Worker 上执行
  3. 重放调试:可以随时将 Workflow 重放到任意时间点,调试生产问题

3.2 EventHistory:持久化的本质

EventHistory 是 Temporal 最核心的数据结构。每次 Workflow 执行,History Service 都会记录一个事件序列:

Event 1: WorkflowExecutionStarted       { workflowType, input }
Event 2: ActivityTaskScheduled          { activityType, input, activityId }
Event 3: ActivityTaskCompleted           { activityId, result }
Event 4: ActivityTaskScheduled          { activityType, input, activityId }
Event 5: ActivityTaskFailed             { activityId, reason }
Event 6: WorkflowExecutionContinuedAsNew { newRunId }

这个序列是追加写入的,每个事件都是幂等的。当 Workflow 需要恢复时,Worker 读取完整的 EventHistory,按顺序重放每个事件,重新构建内存状态。

这带来了一个非常重要的特性:Workflow 的状态完全由 EventHistory 决定,而不是由 Worker 的内存状态决定。 你可以在 Workflow 运行时修改 Workflow 代码,然后重启 Worker——重启后的 Worker 会用新代码重放旧的 EventHistory,只要代码修改是「向前兼容」的(不改变已有事件的处理逻辑),Workflow 就能继续正常执行。

3.3 多语言 SDK 的实现原理

Temporal 的多语言 SDK 并不是简单地「用不同语言调用同一个服务器」,而是一个更巧妙的设计:

每个 SDK 都包含:

  1. Workflow Replayer:从 EventHistory 重放 Workflow 逻辑
  2. Activity Executor:执行具体的 Activity 逻辑
  3. Worker Process:注册 Workflow 和 Activity 处理函数

当你用 Go 写一个 Workflow 框架,然后用 TypeScript 写 Activity 实现——这在 Temporal 中是支持的,因为它们通过 task queue 通信:

// Go Worker:只负责运行 Workflow
worker.RegisterWorkflowImplementationFunc(MyWorkflow)

// TypeScript Worker:负责运行 Activity
worker.registerActivity('fetchUser', fetchUserActivity)

代码实战:构建一个订单处理系统

4.1 项目结构

temporal-order-demo/
├── cmd/
│   ├── worker/
│   │   └── main.go
│   └── starter/
│       └── main.go
├── activities/
│   └── order_activities.go
├── workflows/
│   └── order_workflow.go
├── shared/
│   ├── constants.go
│   └── types.go
├── docker-compose.yaml
└── go.mod

4.2 依赖配置

// go.mod
module temporal-order-demo

go 1.21

require (
    go.temporal.io/sdk v1.25.0
    go.temporal.io/sdk/contrib/tools/workflowcheck v1.0.0
    github.com/google/uuid v1.6.0
    go.uber.org/fx v1.20.0
)

4.3 定义类型

// shared/types.go
package shared

import (
    "time"
)

type Order struct {
    ID         string
    CustomerID string
    Items      []OrderItem
    TotalAmount float64
    Status      string
    CreatedAt   time.Time
}

type OrderItem struct {
    ProductID string
    Name      string
    Quantity  int
    Price     float64
}

type PaymentResult struct {
    TransactionID string
    Amount        float64
    Method        string
    CapturedAt    time.Time
}

type ShippingResult struct {
    TrackingNumber string
    Carrier        string
    EstimatedDays  int
    ShippedAt      time.Time
}

type CancelSignal struct {
    Reason string
}

const (
    TaskQueueName = "order-processing"
    WorkflowIDPrefix = "order-processing"
)

4.4 实现 Activity

// activities/order_activities.go
package activities

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "time"

    "temporal-order-demo/shared"
)

type OrderActivities struct{}

func NewOrderActivities() *OrderActivities {
    return &OrderActivities{}
}

// ValidateOrder 验证订单合法性
func (a *OrderActivities) ValidateOrder(ctx context.Context, order *shared.Order) error {
    log.Printf("[ValidateOrder] 验证订单 %s, 总额: %.2f", order.ID, order.TotalAmount)

    if len(order.Items) == 0 {
        return fmt.Errorf("订单项不能为空")
    }

    if order.TotalAmount <= 0 {
        return fmt.Errorf("订单金额必须大于0")
    }

    // 模拟数据库查询延迟
    time.Sleep(100 * time.Millisecond)

    log.Printf("[ValidateOrder] 订单 %s 验证通过", order.ID)
    return nil
}

// ChargePayment 收取支付
func (a *OrderActivities) ChargePayment(ctx context.Context, order *shared.Order) (*shared.PaymentResult, error) {
    log.Printf("[ChargePayment] 收取订单 %s 金额: %.2f", order.ID, order.TotalAmount)

    // 模拟支付网关调用
    time.Sleep(500 * time.Millisecond)

    // 模拟 95% 成功率
    if rand.Float32() < 0.05 {
        return nil, fmt.Errorf("payment_declined: 支付被拒绝")
    }

    result := &shared.PaymentResult{
        TransactionID: fmt.Sprintf("txn_%d", time.Now().UnixNano()),
        Amount:        order.TotalAmount,
        Method:        "credit_card",
        CapturedAt:    time.Now(),
    }

    log.Printf("[ChargePayment] 订单 %s 支付成功, 交易ID: %s", order.ID, result.TransactionID)
    return result, nil
}

// ReserveInventory 预约库存
func (a *OrderActivities) ReserveInventory(ctx context.Context, items []shared.OrderItem) error {
    log.Printf("[ReserveInventory] 预约 %d 个商品的库存", len(items))

    time.Sleep(300 * time.Millisecond)

    for _, item := range items {
        log.Printf("[ReserveInventory] 预约 %s x%d 成功", item.ProductID, item.Quantity)
    }

    return nil
}

// ReleaseInventory 释放库存(补偿)
func (a *OrderActivities) ReleaseInventory(ctx context.Context, items []shared.OrderItem) error {
    log.Printf("[ReleaseInventory] 释放 %d 个商品的库存", len(items))

    for _, item := range items {
        log.Printf("[ReleaseInventory] 释放 %s x%d", item.ProductID, item.Quantity)
    }

    return nil
}

// ScheduleShipping 调度配送
func (a *OrderActivities) ScheduleShipping(ctx context.Context, order *shared.Order) (*shared.ShippingResult, error) {
    log.Printf("[ScheduleShipping] 调度订单 %s 配送", order.ID)

    time.Sleep(200 * time.Millisecond)

    result := &shared.ShippingResult{
        TrackingNumber: fmt.Sprintf("TRK%s", fmt.Sprintf("%d", time.Now().UnixNano())[:12]),
        Carrier:        "SF-Express",
        EstimatedDays:  2 + rand.Intn(3),
        ShippedAt:      time.Now(),
    }

    log.Printf("[ScheduleShipping] 配送已调度, 运单号: %s", result.TrackingNumber)
    return result, nil
}

// SendNotification 发送通知
func (a *OrderActivities) SendNotification(ctx context.Context, customerID string, message string) error {
    log.Printf("[SendNotification] 发送通知给客户 %s: %s", customerID, message)
    time.Sleep(50 * time.Millisecond)
    return nil
}

// RefundPayment 退款(补偿)
func (a *OrderActivities) RefundPayment(ctx context.Context, transactionID string, amount float64) error {
    log.Printf("[RefundPayment] 退款交易 %s, 金额: %.2f", transactionID, amount)
    time.Sleep(200 * time.Millisecond)
    return nil
}

4.5 实现 Workflow

// workflows/order_workflow.go
package workflows

import (
    "fmt"
    "log"
    "time"

    "go.temporal.io/sdk/workflow"

    "temporal-order-demo/shared"
)

type OrderWorkflow struct{}

func NewOrderWorkflow() *OrderWorkflow {
    return &OrderWorkflow{}
}

// ProcessOrder 主订单处理工作流
func (w *OrderWorkflow) ProcessOrder(ctx workflow.Context, orderID string) (string, error) {
    // 设置工作流选项
    options := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Minute,
        HeartbeatTimeout:    30 * time.Second,
        WaitForCancellation: true,
        RetryPolicy: &shared.RetryPolicy{
            InitialInterval:    time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval:    5 * time.Minute,
            MaximumAttempts:    3,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, options)

    // 初始化本地状态
    var order shared.Order
    var paymentResult *shared.PaymentResult
    var shippingResult *shared.ShippingResult
    var paymentFailed bool

    // ==================== 阶段1:获取订单 ====================
    log.Printf("[Workflow] 阶段1: 获取订单 %s", orderID)

    err := workflow.ExecuteActivity(ctx, "ValidateOrder", &shared.Order{ID: orderID}).Get(ctx, nil)
    if err != nil {
        log.Printf("[Workflow] 订单验证失败: %v", err)
        return "VALIDATION_FAILED", err
    }

    // ==================== 阶段2:处理支付 ====================
    log.Printf("[Workflow] 阶段2: 收取支付")
    err = workflow.ExecuteActivity(ctx, "ChargePayment", &shared.Order{
        ID:         orderID,
        TotalAmount: 299.99, // 模拟金额
    }).Get(ctx, &paymentResult)

    if err != nil {
        log.Printf("[Workflow] 支付失败: %v", err)
        paymentFailed = true
        // 支付失败,工作流结束
        return "PAYMENT_FAILED", err
    }

    // ==================== 阶段3:预约库存 ====================
    log.Printf("[Workflow] 阶段3: 预约库存")
    err = workflow.ExecuteActivity(ctx, "ReserveInventory", []shared.OrderItem{
        {ProductID: "PROD-001", Name: "商品A", Quantity: 2, Price: 99.99},
        {ProductID: "PROD-002", Name: "商品B", Quantity: 1, Price: 99.99},
    }).Get(ctx, nil)

    if err != nil {
        log.Printf("[Workflow] 库存预约失败: %v,触发补偿", err)
        // 补偿:退款
        refundErr := workflow.ExecuteActivity(ctx, "RefundPayment", paymentResult.TransactionID, paymentResult.Amount).Get(ctx, nil)
        if refundErr != nil {
            log.Printf("[Workflow] 退款失败: %v(需要人工介入)", refundErr)
        }
        return "INVENTORY_FAILED", err
    }

    // ==================== 阶段4:调度配送 ====================
    log.Printf("[Workflow] 阶段4: 调度配送")
    err = workflow.ExecuteActivity(ctx, "ScheduleShipping", &shared.Order{
        ID:         orderID,
        CustomerID: "CUST-12345",
    }).Get(ctx, &shippingResult)

    if err != nil {
        log.Printf("[Workflow] 配送调度失败: %v,触发补偿", err)
        // 补偿:释放库存 + 退款
        _ = workflow.ExecuteActivity(ctx, "ReleaseInventory", []shared.OrderItem{
            {ProductID: "PROD-001", Quantity: 2},
            {ProductID: "PROD-002", Quantity: 1},
        }).Get(ctx, nil)
        _ = workflow.ExecuteActivity(ctx, "RefundPayment", paymentResult.TransactionID, paymentResult.Amount).Get(ctx, nil)
        return "SHIPPING_FAILED", err
    }

    // ==================== 阶段5:发送通知 ====================
    log.Printf("[Workflow] 阶段5: 发送通知")
    err = workflow.ExecuteActivity(ctx, "SendNotification", "CUST-12345",
        fmt.Sprintf("您的订单已发货!运单号: %s,预计%d天后到达",
            shippingResult.TrackingNumber, shippingResult.EstimatedDays)).Get(ctx, nil)

    if err != nil {
        log.Printf("[Workflow] 通知发送失败(不影响整体流程): %v", err)
    }

    log.Printf("[Workflow] 订单处理完成!运单号: %s", shippingResult.TrackingNumber)
    return "COMPLETED", nil
}

4.6 Worker 实现

// cmd/worker/main.go
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"

    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"

    "temporal-order-demo/activities"
    "temporal-order-demo/shared"
    "temporal-order-demo/workflows"
)

func main() {
    // 连接 Temporal Server
    c, err := client.Dial(client.Options{
        HostPort:  os.Getenv("TEMPORAL_HOST_URL"),
        Namespace: "default",
    })
    if err != nil {
        log.Fatalf("无法连接 Temporal Server: %v", err)
    }
    defer c.Close()

    // 创建 Worker
    w := worker.New(c, shared.TaskQueueName, worker.Options{
        MaxConcurrentWorkflowTaskExecutionSize: 100,
        MaxConcurrentActivityTaskPollers:       10,
    })

    // 注册 Workflow 和 Activity
    orderActivities := activities.NewOrderActivities()
    orderWorkflow := workflows.NewOrderWorkflow()

    w.RegisterWorkflow(orderWorkflow.ProcessOrder)
    w.RegisterActivity(orderActivities.ValidateOrder)
    w.RegisterActivity(orderActivities.ChargePayment)
    w.RegisterActivity(orderActivities.ReserveInventory)
    w.RegisterActivity(orderActivities.ReleaseInventory)
    w.RegisterActivity(orderActivities.ScheduleShipping)
    w.RegisterActivity(orderActivities.SendNotification)
    w.RegisterActivity(orderActivities.RefundPayment)

    log.Printf("Worker 启动,监听 Task Queue: %s", shared.TaskQueueName)

    // 启动 Worker
    err = w.Run(worker.InterruptCh())
    if err != nil {
        log.Fatalf("Worker 运行失败: %v", err)
    }
}

4.7 Workflow 启动器

// cmd/starter/main.go
package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/workflow"

    "temporal-order-demo/shared"
)

func main() {
    c, err := client.Dial(client.Options{
        HostPort:  os.Getenv("TEMPORAL_HOST_URL"),
        Namespace: "default",
    })
    if err != nil {
        log.Fatalf("无法连接 Temporal Server: %v", err)
    }
    defer c.Close()

    orderID := "ORD-" + fmt.Sprintf("%d", time.Now().UnixNano())

    workflowOptions := client.StartWorkflowOptions{
        ID:                       shared.WorkflowIDPrefix + "-" + orderID,
        TaskQueue:                shared.TaskQueueName,
        WorkflowExecutionTimeout: 30 * time.Minute,
    }

    log.Printf("启动订单处理工作流,订单ID: %s, WorkflowID: %s", orderID, workflowOptions.ID)

    we, err := c.ExecuteWorkflow(context.Background(), workflowOptions,
        "ProcessOrder", orderID)
    if err != nil {
        log.Fatalf("启动 Workflow 失败: %v", err)
    }

    log.Printf("Workflow 已启动,RunID: %s", we.GetRunID())

    // 等待结果
    var result string
    err = we.Get(context.Background(), &result)
    if err != nil {
        log.Printf("Workflow 执行失败: %v", err)
        os.Exit(1)
    }

    log.Printf("Workflow 执行完成,结果: %s", result)
}

4.8 Docker Compose 部署

# docker-compose.yaml
version: '3.8'

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: temporal
      POSTGRES_PASSWORD: temporal
      POSTGRES_DB: temporal
    ports:
      - "5432:5432"
    volumes:
      - postgres-data:/var/lib/postgresql/data

  temporal:
    image: temporalio/auto-setup:1.24.0
    environment:
      DB: "postgresql"
      DB_PORT: 5432
      POSTGRES_USER: temporal
      POSTGRES_PASSWORD: temporal
      POSTGRES_SEEDS: postgres
      ENABLE_ES: "false"
    ports:
      - "7233:7233"
    depends_on:
      - postgres
    volumes:
      - ./temporal/config:/temporal/config

volumes:
  postgres-data:
# 启动服务
docker-compose up -d

# 设置环境变量
export TEMPORAL_HOST_URL=localhost:7233

# 启动 Worker
go run cmd/worker/main.go

# 在另一个终端启动 Workflow
go run cmd/starter/main.go

性能优化:让 Temporal 工作流飞起来

5.1 Activity 的批量处理优化

当需要处理大量数据时,不要逐个调用 Activity,而是使用批量处理:

// ❌ 低效:1000 个 Activity 调用
for _, item := range items {
    err := workflow.ExecuteActivity(ctx, ProcessItemActivity, item).Get(ctx, nil)
}

// ✅ 高效:批量 Activity
batchSize := 100
for i := 0; i < len(items); i += batchSize {
    end := i + batchSize
    if end > len(items) {
        end = len(items)
    }
    batch := items[i:end]
    err := workflow.ExecuteActivity(ctx, ProcessBatchActivity, batch).Get(ctx, nil)
    if err != nil {
        return err
    }
}

5.2 并发 Activity 的超时控制

// 启动多个 Activity,全部在 5 分钟内完成即可
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

selector := workflow.NewSelector(ctx)
for _, item := range items {
    future := workflow.ExecuteActivity(ctx, HeavyComputationActivity, item)
    selector.AddFuture(future, func(f workflow.Future) {
        // 处理完成
    })
}

completed := 0
for completed < len(items) {
    selector.Select(ctx)
    completed++
}

5.3 Workflow 代码大小优化

EventHistory 是每个 Workflow 执行的全部记录,代码越大、重放越慢。建议将复杂的业务逻辑拆分到多个 Activity 中:

// ❌ Workflow 代码过于复杂,EventHistory 膨胀
func ComplexWorkflow(ctx workflow.Context) error {
    // 5000 行复杂逻辑全在 Workflow 中
}

// ✅ 拆分到 Activity,保持 Workflow 简洁
func CleanWorkflow(ctx workflow.Context) error {
    err := workflow.ExecuteActivity(ctx, ComplexPhase1Activity, input).Get(ctx, nil)
    // ...
}

5.4 使用 Local Activity 减少网络开销

对于同一进程内可以完成的轻量操作,使用 Local Activity:

activityOpts := &workflow.ActivityOptions{
    StartToCloseTimeout: 10 * time.Second,
    RetryPolicy: &temporal.RetryPolicy{
        MaximumAttempts: 1, // Local Activity 不跨进程,不需要重试
    },
}
ctx = workflow.WithLocalActivityOptions(ctx, activityOpts)

与其他方案的对比

6.1 Temporal vs 消息队列(Kafka / RabbitMQ)

维度消息队列Temporal
状态管理需要自己维护状态表自动持久化检查点
失败恢复需要自己写补偿逻辑自动从检查点重试
长时间运行需要定期心跳原生支持
业务逻辑需要自己编排工作流即代码
查询能力需要自己实现内置 Query 接口

6.2 Temporal vs AWS Step Functions

维度Step FunctionsTemporal
编程模型JSON 状态机 DSL普通代码(Go/TS/Python)
生态绑定AWS任意云 / 自托管
社区活跃度AWS 官方维护开源活跃(Uber 内部生产验证)
调试体验AWS ConsoleTemporal Web UI + CLI
成本模型按状态转换次数计费开源免费(自托管)

6.3 Temporal vs Airflow

维度AirflowTemporal
定位数据管道 / ETL通用业务流程编排
执行模型调度器驱动事件驱动 + 持久化执行
失败处理需要自己实现重试原生自动重试
长时间任务支持但不够灵活原生支持
实时性调度周期(分钟级)毫秒级响应

避坑指南:从生产实践中总结的经验

7.1 Workflow 版本管理

当需要修改正在运行的工作流代码时,Temporal 通过 ContinueAsNew 提供平滑升级:

func V1Workflow(ctx workflow.Context, input Input) (Result, error) {
    // V1 逻辑...
    // 当需要升级到 V2 时
    return "", workflow.NewContinueAsNewError(ctx, "V2Workflow", input)
}

func V2Workflow(ctx workflow.Context, input Input) (Result, error) {
    // V2 逻辑改进...
    return result, nil
}

7.2 幂等性设计

Activity 必须设计为幂等的,因为 Temporal 可能会重试:

// ✅ 幂等的 Activity:通过外部事务表保证
func ProcessPaymentActivity(ctx context.Context, req PaymentRequest) error {
    // 使用支付流水号作为幂等键
    existing, _ := db.FindTransaction(req.TransactionID)
    if existing != nil && existing.Status == "success" {
        return nil // 已经是成功的,跳过
    }

    // 执行支付逻辑
    return db.SaveTransaction(req.TransactionID, "success")
}

7.3 Activity 超时配置

超时配置过短会导致频繁超时重试,过长会拖慢问题发现速度。建议:

  • StartToCloseTimeout:单个 Activity 最大执行时间
  • ScheduleToStartTimeout:Activity 等待 Worker 领取的时间
  • HeartbeatTimeout:长任务心跳间隔(必须小于 StartToCloseTimeout)
activityOpts := &workflow.ActivityOptions{
    StartToCloseTimeout: 5 * time.Minute,
    ScheduleToStartTimeout: 1 * time.Minute,
    HeartbeatTimeout:    30 * time.Second,
}

7.4 信号死锁问题

Workflow 中的 selector.Select() 如果没有 default 分支且没有其他可选择的操作,会导致死锁:

// ✅ 正确:使用 Selector 并配合 Timer
for waitingForSignal {
    selector := workflow.NewSelector(ctx)
    var sig string
    selector.AddSignal(ctx, "my-signal", func(s workflow.Context, s2 string) {
        sig = s2
    })
    selector.AddTimer(ctx, 1*time.Minute, func(auth workflow.Future) {
        // 超时处理
    })
    selector.Selector(ctx)
}

总结展望

Temporal 代表的不仅仅是一个工具,更是一种分布式业务流程编程范式的转变。

传统的分布式系统开发,开发者需要同时操心两件事:业务逻辑和分布式协调。Temporal 通过「工作流即代码」的理念,让这两件事彻底分离。你写的是干净的同步代码,但得到的是一个可以在任意节点运行、支持任意失败恢复、随时可查询状态的分布式系统。

2026 年的 Temporal 生态已经相当成熟:

  • 多语言 SDK 完善:Go、TypeScript、Python、Java 四大语言的生产级支持
  • 运维工具成熟:Temporal Web UI、Temporal CLI、Prometheus 监控指标
  • 社区活跃:GitHub 27k+ stars,生产案例覆盖支付、电商、金融、游戏等多个行业
  • 云服务支持:AWS Temporal(叫 Amazon SQS Extended Library)、Temporal Cloud

对于后端开发者来说,如果你正在处理跨服务的业务流程、需要可靠的重试和补偿逻辑、或者需要长时间运行的任务编排——Temporal 绝对值得一试。它的学习曲线比任何替代方案都要平缓,而它解决的问题,比任何替代方案都要全面。

正如 Temporal 官方文档所说:「Build invincible apps」——用 Temporal 构建真正可靠的应用。


参考资源:

  • 官方文档:https://docs.temporal.io/
  • Go SDK:https://github.com/temporalio/sdk-go
  • 官方示例:https://github.com/temporalio/samples-go
  • Temporal Cloud:https://temporal.io/cloud

推荐文章

记录一次服务器的优化对比
2024-11-19 09:18:23 +0800 CST
Vue3中哪些API被废弃了?
2024-11-17 04:17:22 +0800 CST
Vue3中如何实现响应式数据?
2024-11-18 10:15:48 +0800 CST
HTML5的 input:file上传类型控制
2024-11-19 07:29:28 +0800 CST
浅谈CSRF攻击
2024-11-18 09:45:14 +0800 CST
程序员茄子在线接单