Temporal 深度实战:当分布式工作流从「工程噩梦」变成「写代码一样简单」
背景介绍
在微服务架构统治后端世界的今天,有一个问题始终困扰着所有后端工程师:跨服务的业务流程到底该怎么组织?
订单处理、支付对账、数据同步、批量任务……这些业务逻辑横跨多个服务,涉及重试、超时、补偿、幂等、持久化等无数细节。传统方案往往是这样的:
# ❌ 传统方式:手动编排,脆弱得像纸牌屋
async def process_order(order_id: str):
try:
order = await order_service.get(order_id)
payment = await payment_service.charge(order.amount)
await inventory_service.reserve(order.items)
await shipping_service.schedule(order.address)
await notification_service.send(order.user_id, "订单已确认")
except PaymentError:
await inventory_service.release(order.items)
raise
except ShippingError:
await payment_service.refund(payment.id)
await inventory_service.release(order.items)
raise
这段代码存在一连串致命问题:
- 无重试机制:网络抖动直接失败,用户体验差
- 无持久化:进程崩溃后状态丢失,从头来过
- 补偿逻辑复杂:每一步失败都需要手动回滚,逻辑分散在各处
- 无法查询状态:订单进行到哪一步了?只能查日志
- 无法暂停/恢复:高峰期积压的订单重启后全部丢失
- 无法并发控制:多个实例同时处理同一订单,数据乱了
这些问题在单个服务内部就够头疼了,到了跨服务场景更是指数级爆炸。Saga 模式、Choreography 模式、可靠消息队列……每种方案都有自己的复杂度,团队学不动,维护成本高,新人接手更是两眼一抹黑。
Temporal 就是在这种背景下诞生的。它是 Uber 内部孵化的开源项目(最初叫 Cadence),将工作流编排提升到了一个全新的抽象层次:用普通代码写分布式业务流程,框架自动处理重试、持久化、状态查询、并发控制等所有非功能性需求。
什么是 Temporal
Temporal 是一个分布式工作流运行时(Workflow Runtime),核心思想是:你的业务代码不需要知道它在分布式环境中运行。
它通过「工作流」和「活动」两个核心原语,将业务逻辑与运行时关注点分离:
- Activity(活动):普通的单次操作,如「调用支付 API」「发送邮件」「写入数据库」
- Workflow(工作流):编排活动的业务逻辑,本身是纯代码,由 Temporal 持久化执行
Temporal 的关键特性:
- 持久化执行:工作流状态自动保存,进程重启、机器宕机都不丢
- 确定性执行:同样的输入,Workflow 代码执行结果完全一致
- 自动重试:Activity 失败自动重试,Workflow 失败从上一个检查点恢复
- 时间语义:支持 Timer、SideEffect、查询接口等内置语义
- 多语言 SDK:Go、TypeScript/JavaScript、Python、Java、PHP、Ruby、.NET
// ✅ Temporal 方式:业务逻辑清晰,框架处理一切
func OrderProcessingWorkflow(ctx workflow.Context, orderID string) error {
// 1. 获取订单
var order Order
err := workflow.ExecuteActivity(ctx, orderService.Get, orderID).Get(ctx, &order)
if err != nil {
return err
}
// 2. 收费(带重试)
var payment PaymentResult
err = workflow.ExecuteActivity(ctx, paymentService.Charge, order.Amount).Get(ctx, &payment)
if err != nil {
return err // Temporal 自动处理失败补偿
}
// 3. 预约库存
err = workflow.ExecuteActivity(ctx, inventoryService.Reserve, order.Items).Get(ctx, nil)
if err != nil {
// 补偿:退款
_ = workflow.ExecuteActivity(ctx, paymentService.Refund, payment.ID).Get(ctx, nil)
return err
}
// 4. 调度配送
err = workflow.ExecuteActivity(ctx, shippingService.Schedule, order.Address).Get(ctx, nil)
if err != nil {
return err
}
// 5. 发送通知
return workflow.ExecuteActivity(ctx, notificationService.Send, order.UserID, "已确认").Get(ctx, nil)
}
这段代码读起来就像同步代码,但实际执行时:
- 每个 Activity 调用都是持久化且可重试的
- 工作流在每个 await 点自动保存检查点
- 即使服务器重启,从检查点恢复后继续执行
- Activity 失败时,后续的补偿逻辑由开发者决定(Temporal 提供 ContinueAsNew、Signal 等机制)
核心概念深度解析
2.1 Workflow 的确定性原则
这是 Temporal 中最重要也是最容易踩坑的概念。
Temporal Workflow 是确定性的(Deterministic)。 同一个 WorkflowDefinition + 同一个输入,执行一万次,结果必须完全一致。这是 Temporal 能在任意时间点暂停、恢复、重放的基础。
要实现确定性,Workflow 代码必须遵守以下规则:
规则一:禁止直接调用外部服务
// ❌ 错误:直接调用外部服务,无法重试和恢复
func BadWorkflow(ctx workflow.Context) error {
result := http.Get("https://api.example.com/data") // 不允许!
return process(result)
}
// ✅ 正确:通过 Activity 间接调用
func GoodWorkflow(ctx workflow.Context) error {
var result MyData
err := workflow.ExecuteActivity(ctx, FetchDataActivity, "https://api.example.com/data").Get(ctx, &result)
return err
}
规则二:禁止使用系统时间作为业务判断依据
// ❌ 错误:time.Now() 每次调用值不同,破坏确定性
func BadWorkflow(ctx workflow.Context) error {
now := time.Now()
if now.Hour() > 18 {
// 下午6点后不处理
return nil
}
// ...
}
// ✅ 正确:使用 Workflow 的时间语义
func GoodWorkflow(ctx workflow.Context) error {
// Now() 在 Workflow 中是"已记录的墙钟时间",重放时值不变
now := workflow.Now(ctx)
if now.Hour() > 18 {
return nil
}
// ...
}
规则三:禁止使用随机数作为业务分支
// ❌ 错误:math/rand 每次执行值不同
func BadWorkflow(ctx workflow.Context) error {
if rand.Int()%2 == 0 {
// 随机逻辑
}
}
// ✅ 正确:使用 SideEffect 或确定性的 UUID
func GoodWorkflow(ctx workflow.Context) error {
var randomSeed int64
workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return rand.Int() // 重放时返回缓存的值
}).Get(ctx, &randomSeed)
// ...
}
规则四:禁止使用 map 等非线程安全数据结构
Workflow 执行在单线程中,但会并发处理 Signal。Go 的 map 不是并发安全的。
// ✅ 正确:使用 Workflow.GetInfo 获取状态
var processedItems []string
workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return processedItems // 持久化到 EventHistory
}).Get(ctx, &processedItems)
2.2 Activity 的设计哲学
Activity 是 Workflow 与外部世界交互的唯一通道。一个好的 Activity 设计需要考虑:
Activity 的失败处理
// 定义 Activity 选项
activityOpts := activity.StartToCloseTimeout(5 * time.Minute)
retryOpts := retry.New ExponentialBackoffRetryPolicy(
1 * time.Second, // 初始间隔
10 * time.Minute, // 最大间隔
2.0, // 退火系数
)
// 或者使用 Activity 的 options 链式配置
activityOpts := &workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 10 * time.Minute,
MaximumAttempts: 5,
NonRetryableErrorReasons: []string{
"invalid-order-status",
"payment-declined-permanently",
},
},
}
Activity 心跳机制
长时间运行的 Activity 需要定期报告进度,这样即使 Temporal Server 重启,也能从上次心跳恢复:
func LongRunningActivity(ctx context.Context, input JobInput) error {
activity.RecordHeartbeat(ctx, Progress{Percent: 0})
// Step 1: 处理前 50%
if err := processFirstHalf(ctx, input); err != nil {
return err
}
activity.RecordHeartbeat(ctx, Progress{Percent: 50})
// Step 2: 处理后 50%
if err := processSecondHalf(ctx, input); err != nil {
return err
}
activity.RecordHeartbeat(ctx, Progress{Percent: 100})
return nil
}
Activity 的并发执行
一个 Workflow 可以同时启动多个 Activity:
func ParallelProcessingWorkflow(ctx workflow.Context, items []string) error {
// 启动多个 Activity 并发执行
futures := make([]workflow.Future, len(items))
for i, item := range items {
futures[i] = workflow.ExecuteActivity(ctx, ProcessItemActivity, item)
}
// 等待所有 Activity 完成
for i, future := range futures {
var result Result
if err := future.Get(ctx, &result); err != nil {
return fmt.Errorf("item %s failed: %w", items[i], err)
}
}
return nil
}
2.3 Signal(信号)与 Query(查询)
这是 Temporal 最强大的两个交互机制。
Signal 用于向运行中的 Workflow 发送消息:
// 客户端发送 Signal
workflowHandle, _ := c.Client.GetWorkflowHandle(ctx, "order-123")
err := workflowHandle.Signal(ctx, "cancel-order", CancelSignal{Reason: "customer-requested"})
// Workflow 中接收 Signal
func OrderWorkflow(ctx workflow.Context) error {
// 注册 Signal handler
selector := workflow.NewSelector(ctx)
var cancelRequested bool
var cancelReason string
selector.AddSignal(ctx, "cancel-order", func(signalCtx workflow.Context, s CancelSignal) {
cancelRequested = true
cancelReason = s.Reason
})
// 业务主循环
for !cancelRequested {
// ... 处理订单逻辑 ...
selector.Select(ctx)
}
// 收到取消信号,执行清理
return fmt.Errorf("cancelled: %s", cancelReason)
}
Query 用于查询运行中 Workflow 的状态(只读,不影响执行):
// 注册 Query handler
workflow.SetQueryHandler(ctx, "get-order-status", func() (OrderStatus, error) {
return OrderStatus{
Phase: currentPhase,
Items: processedItems,
ETA: estimatedTime,
}, nil
})
// 客户端查询
var status OrderStatus
err := workflowHandle.Query(ctx, "get-order-status", &status)
2.4 Child Workflow(子工作流)
Temporal 支持 Workflow 嵌套调用,子 Workflow 继承父 Workflow 的超时配置,且可以独立重试:
func ParentWorkflow(ctx workflow.Context, batchID string) error {
var batch Batch
err := workflow.ExecuteActivity(ctx, FetchBatchActivity, batchID).Get(ctx, &batch)
if err != nil {
return err
}
// 启动子 Workflow 处理每个批次项
childOpts := workflow.ChildWorkflowOptions{
WorkflowID: fmt.Sprintf("batch-child-%s", batch.ID),
ParentClosePolicy: workflow.ParentClosePolicyTerminate,
ExecutionTimeout: 1 * time.Hour,
}
childCtx := workflow.WithChildOptions(ctx, childOpts)
futures := make([]workflow.Future, len(batch.Items))
for i, item := range batch.Items {
futures[i] = workflow.ExecuteChildWorkflow(childCtx, ProcessItemWorkflow, item)
}
// 等待所有子 Workflow 完成
for i, future := range futures {
if err := future.Get(ctx, nil); err != nil {
log.Printf("子工作流 %d 失败: %v", i, err)
}
}
return nil
}
架构分析:Temporal 是怎么工作的
3.1 整体架构
Temporal 的架构分为两部分:Temporal Server 和 Worker Process。
┌─────────────────────────────────────────────────────────┐
│ Temporal Server │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Frontend │ │ History │ │ Matching │ │
│ │ Service │──│ Service │──│ Service │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │ │
│ └──────────────┴──────────────┘ │
│ Persistence Layer │
│ (PostgreSQL / MySQL / Cassandra) │
└─────────────────────────────────────────────────────────┘
▲ ▲
│ long-poll task
│ commands queue
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Worker Process │ │ Worker Process │
│ [Go Worker] │ │ [TS Worker] │
│ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ Workflow │ │ │ │ Activity │ │
│ │ Worker │ │ │ │ Worker │ │
│ └─────────────┘ │ │ └─────────────┘ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ Activity │ │ │ │ Workflow │ │
│ │ Worker │ │ │ │ Worker │ │
│ └─────────────┘ │ │ └─────────────┘ │
└─────────────────┘ └─────────────────┘
History Service 是 Temporal Server 的核心,它将每个 Workflow 的完整执行历史(EventHistory)持久化到数据库。这使得:
- 任何节点都能恢复 Workflow:EventHistory 包含了 Workflow 从开始到现在的所有「事件」,Worker 根据 EventHistory 重放即可恢复状态
- Workflow 代码与执行位置解耦:Workflow 可以在任何语言、任何节点的 Worker 上执行
- 重放调试:可以随时将 Workflow 重放到任意时间点,调试生产问题
3.2 EventHistory:持久化的本质
EventHistory 是 Temporal 最核心的数据结构。每次 Workflow 执行,History Service 都会记录一个事件序列:
Event 1: WorkflowExecutionStarted { workflowType, input }
Event 2: ActivityTaskScheduled { activityType, input, activityId }
Event 3: ActivityTaskCompleted { activityId, result }
Event 4: ActivityTaskScheduled { activityType, input, activityId }
Event 5: ActivityTaskFailed { activityId, reason }
Event 6: WorkflowExecutionContinuedAsNew { newRunId }
这个序列是追加写入的,每个事件都是幂等的。当 Workflow 需要恢复时,Worker 读取完整的 EventHistory,按顺序重放每个事件,重新构建内存状态。
这带来了一个非常重要的特性:Workflow 的状态完全由 EventHistory 决定,而不是由 Worker 的内存状态决定。 你可以在 Workflow 运行时修改 Workflow 代码,然后重启 Worker——重启后的 Worker 会用新代码重放旧的 EventHistory,只要代码修改是「向前兼容」的(不改变已有事件的处理逻辑),Workflow 就能继续正常执行。
3.3 多语言 SDK 的实现原理
Temporal 的多语言 SDK 并不是简单地「用不同语言调用同一个服务器」,而是一个更巧妙的设计:
每个 SDK 都包含:
- Workflow Replayer:从 EventHistory 重放 Workflow 逻辑
- Activity Executor:执行具体的 Activity 逻辑
- Worker Process:注册 Workflow 和 Activity 处理函数
当你用 Go 写一个 Workflow 框架,然后用 TypeScript 写 Activity 实现——这在 Temporal 中是支持的,因为它们通过 task queue 通信:
// Go Worker:只负责运行 Workflow
worker.RegisterWorkflowImplementationFunc(MyWorkflow)
// TypeScript Worker:负责运行 Activity
worker.registerActivity('fetchUser', fetchUserActivity)
代码实战:构建一个订单处理系统
4.1 项目结构
temporal-order-demo/
├── cmd/
│ ├── worker/
│ │ └── main.go
│ └── starter/
│ └── main.go
├── activities/
│ └── order_activities.go
├── workflows/
│ └── order_workflow.go
├── shared/
│ ├── constants.go
│ └── types.go
├── docker-compose.yaml
└── go.mod
4.2 依赖配置
// go.mod
module temporal-order-demo
go 1.21
require (
go.temporal.io/sdk v1.25.0
go.temporal.io/sdk/contrib/tools/workflowcheck v1.0.0
github.com/google/uuid v1.6.0
go.uber.org/fx v1.20.0
)
4.3 定义类型
// shared/types.go
package shared
import (
"time"
)
type Order struct {
ID string
CustomerID string
Items []OrderItem
TotalAmount float64
Status string
CreatedAt time.Time
}
type OrderItem struct {
ProductID string
Name string
Quantity int
Price float64
}
type PaymentResult struct {
TransactionID string
Amount float64
Method string
CapturedAt time.Time
}
type ShippingResult struct {
TrackingNumber string
Carrier string
EstimatedDays int
ShippedAt time.Time
}
type CancelSignal struct {
Reason string
}
const (
TaskQueueName = "order-processing"
WorkflowIDPrefix = "order-processing"
)
4.4 实现 Activity
// activities/order_activities.go
package activities
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"temporal-order-demo/shared"
)
type OrderActivities struct{}
func NewOrderActivities() *OrderActivities {
return &OrderActivities{}
}
// ValidateOrder 验证订单合法性
func (a *OrderActivities) ValidateOrder(ctx context.Context, order *shared.Order) error {
log.Printf("[ValidateOrder] 验证订单 %s, 总额: %.2f", order.ID, order.TotalAmount)
if len(order.Items) == 0 {
return fmt.Errorf("订单项不能为空")
}
if order.TotalAmount <= 0 {
return fmt.Errorf("订单金额必须大于0")
}
// 模拟数据库查询延迟
time.Sleep(100 * time.Millisecond)
log.Printf("[ValidateOrder] 订单 %s 验证通过", order.ID)
return nil
}
// ChargePayment 收取支付
func (a *OrderActivities) ChargePayment(ctx context.Context, order *shared.Order) (*shared.PaymentResult, error) {
log.Printf("[ChargePayment] 收取订单 %s 金额: %.2f", order.ID, order.TotalAmount)
// 模拟支付网关调用
time.Sleep(500 * time.Millisecond)
// 模拟 95% 成功率
if rand.Float32() < 0.05 {
return nil, fmt.Errorf("payment_declined: 支付被拒绝")
}
result := &shared.PaymentResult{
TransactionID: fmt.Sprintf("txn_%d", time.Now().UnixNano()),
Amount: order.TotalAmount,
Method: "credit_card",
CapturedAt: time.Now(),
}
log.Printf("[ChargePayment] 订单 %s 支付成功, 交易ID: %s", order.ID, result.TransactionID)
return result, nil
}
// ReserveInventory 预约库存
func (a *OrderActivities) ReserveInventory(ctx context.Context, items []shared.OrderItem) error {
log.Printf("[ReserveInventory] 预约 %d 个商品的库存", len(items))
time.Sleep(300 * time.Millisecond)
for _, item := range items {
log.Printf("[ReserveInventory] 预约 %s x%d 成功", item.ProductID, item.Quantity)
}
return nil
}
// ReleaseInventory 释放库存(补偿)
func (a *OrderActivities) ReleaseInventory(ctx context.Context, items []shared.OrderItem) error {
log.Printf("[ReleaseInventory] 释放 %d 个商品的库存", len(items))
for _, item := range items {
log.Printf("[ReleaseInventory] 释放 %s x%d", item.ProductID, item.Quantity)
}
return nil
}
// ScheduleShipping 调度配送
func (a *OrderActivities) ScheduleShipping(ctx context.Context, order *shared.Order) (*shared.ShippingResult, error) {
log.Printf("[ScheduleShipping] 调度订单 %s 配送", order.ID)
time.Sleep(200 * time.Millisecond)
result := &shared.ShippingResult{
TrackingNumber: fmt.Sprintf("TRK%s", fmt.Sprintf("%d", time.Now().UnixNano())[:12]),
Carrier: "SF-Express",
EstimatedDays: 2 + rand.Intn(3),
ShippedAt: time.Now(),
}
log.Printf("[ScheduleShipping] 配送已调度, 运单号: %s", result.TrackingNumber)
return result, nil
}
// SendNotification 发送通知
func (a *OrderActivities) SendNotification(ctx context.Context, customerID string, message string) error {
log.Printf("[SendNotification] 发送通知给客户 %s: %s", customerID, message)
time.Sleep(50 * time.Millisecond)
return nil
}
// RefundPayment 退款(补偿)
func (a *OrderActivities) RefundPayment(ctx context.Context, transactionID string, amount float64) error {
log.Printf("[RefundPayment] 退款交易 %s, 金额: %.2f", transactionID, amount)
time.Sleep(200 * time.Millisecond)
return nil
}
4.5 实现 Workflow
// workflows/order_workflow.go
package workflows
import (
"fmt"
"log"
"time"
"go.temporal.io/sdk/workflow"
"temporal-order-demo/shared"
)
type OrderWorkflow struct{}
func NewOrderWorkflow() *OrderWorkflow {
return &OrderWorkflow{}
}
// ProcessOrder 主订单处理工作流
func (w *OrderWorkflow) ProcessOrder(ctx workflow.Context, orderID string) (string, error) {
// 设置工作流选项
options := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Minute,
HeartbeatTimeout: 30 * time.Second,
WaitForCancellation: true,
RetryPolicy: &shared.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 5 * time.Minute,
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, options)
// 初始化本地状态
var order shared.Order
var paymentResult *shared.PaymentResult
var shippingResult *shared.ShippingResult
var paymentFailed bool
// ==================== 阶段1:获取订单 ====================
log.Printf("[Workflow] 阶段1: 获取订单 %s", orderID)
err := workflow.ExecuteActivity(ctx, "ValidateOrder", &shared.Order{ID: orderID}).Get(ctx, nil)
if err != nil {
log.Printf("[Workflow] 订单验证失败: %v", err)
return "VALIDATION_FAILED", err
}
// ==================== 阶段2:处理支付 ====================
log.Printf("[Workflow] 阶段2: 收取支付")
err = workflow.ExecuteActivity(ctx, "ChargePayment", &shared.Order{
ID: orderID,
TotalAmount: 299.99, // 模拟金额
}).Get(ctx, &paymentResult)
if err != nil {
log.Printf("[Workflow] 支付失败: %v", err)
paymentFailed = true
// 支付失败,工作流结束
return "PAYMENT_FAILED", err
}
// ==================== 阶段3:预约库存 ====================
log.Printf("[Workflow] 阶段3: 预约库存")
err = workflow.ExecuteActivity(ctx, "ReserveInventory", []shared.OrderItem{
{ProductID: "PROD-001", Name: "商品A", Quantity: 2, Price: 99.99},
{ProductID: "PROD-002", Name: "商品B", Quantity: 1, Price: 99.99},
}).Get(ctx, nil)
if err != nil {
log.Printf("[Workflow] 库存预约失败: %v,触发补偿", err)
// 补偿:退款
refundErr := workflow.ExecuteActivity(ctx, "RefundPayment", paymentResult.TransactionID, paymentResult.Amount).Get(ctx, nil)
if refundErr != nil {
log.Printf("[Workflow] 退款失败: %v(需要人工介入)", refundErr)
}
return "INVENTORY_FAILED", err
}
// ==================== 阶段4:调度配送 ====================
log.Printf("[Workflow] 阶段4: 调度配送")
err = workflow.ExecuteActivity(ctx, "ScheduleShipping", &shared.Order{
ID: orderID,
CustomerID: "CUST-12345",
}).Get(ctx, &shippingResult)
if err != nil {
log.Printf("[Workflow] 配送调度失败: %v,触发补偿", err)
// 补偿:释放库存 + 退款
_ = workflow.ExecuteActivity(ctx, "ReleaseInventory", []shared.OrderItem{
{ProductID: "PROD-001", Quantity: 2},
{ProductID: "PROD-002", Quantity: 1},
}).Get(ctx, nil)
_ = workflow.ExecuteActivity(ctx, "RefundPayment", paymentResult.TransactionID, paymentResult.Amount).Get(ctx, nil)
return "SHIPPING_FAILED", err
}
// ==================== 阶段5:发送通知 ====================
log.Printf("[Workflow] 阶段5: 发送通知")
err = workflow.ExecuteActivity(ctx, "SendNotification", "CUST-12345",
fmt.Sprintf("您的订单已发货!运单号: %s,预计%d天后到达",
shippingResult.TrackingNumber, shippingResult.EstimatedDays)).Get(ctx, nil)
if err != nil {
log.Printf("[Workflow] 通知发送失败(不影响整体流程): %v", err)
}
log.Printf("[Workflow] 订单处理完成!运单号: %s", shippingResult.TrackingNumber)
return "COMPLETED", nil
}
4.6 Worker 实现
// cmd/worker/main.go
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"temporal-order-demo/activities"
"temporal-order-demo/shared"
"temporal-order-demo/workflows"
)
func main() {
// 连接 Temporal Server
c, err := client.Dial(client.Options{
HostPort: os.Getenv("TEMPORAL_HOST_URL"),
Namespace: "default",
})
if err != nil {
log.Fatalf("无法连接 Temporal Server: %v", err)
}
defer c.Close()
// 创建 Worker
w := worker.New(c, shared.TaskQueueName, worker.Options{
MaxConcurrentWorkflowTaskExecutionSize: 100,
MaxConcurrentActivityTaskPollers: 10,
})
// 注册 Workflow 和 Activity
orderActivities := activities.NewOrderActivities()
orderWorkflow := workflows.NewOrderWorkflow()
w.RegisterWorkflow(orderWorkflow.ProcessOrder)
w.RegisterActivity(orderActivities.ValidateOrder)
w.RegisterActivity(orderActivities.ChargePayment)
w.RegisterActivity(orderActivities.ReserveInventory)
w.RegisterActivity(orderActivities.ReleaseInventory)
w.RegisterActivity(orderActivities.ScheduleShipping)
w.RegisterActivity(orderActivities.SendNotification)
w.RegisterActivity(orderActivities.RefundPayment)
log.Printf("Worker 启动,监听 Task Queue: %s", shared.TaskQueueName)
// 启动 Worker
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalf("Worker 运行失败: %v", err)
}
}
4.7 Workflow 启动器
// cmd/starter/main.go
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
"temporal-order-demo/shared"
)
func main() {
c, err := client.Dial(client.Options{
HostPort: os.Getenv("TEMPORAL_HOST_URL"),
Namespace: "default",
})
if err != nil {
log.Fatalf("无法连接 Temporal Server: %v", err)
}
defer c.Close()
orderID := "ORD-" + fmt.Sprintf("%d", time.Now().UnixNano())
workflowOptions := client.StartWorkflowOptions{
ID: shared.WorkflowIDPrefix + "-" + orderID,
TaskQueue: shared.TaskQueueName,
WorkflowExecutionTimeout: 30 * time.Minute,
}
log.Printf("启动订单处理工作流,订单ID: %s, WorkflowID: %s", orderID, workflowOptions.ID)
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions,
"ProcessOrder", orderID)
if err != nil {
log.Fatalf("启动 Workflow 失败: %v", err)
}
log.Printf("Workflow 已启动,RunID: %s", we.GetRunID())
// 等待结果
var result string
err = we.Get(context.Background(), &result)
if err != nil {
log.Printf("Workflow 执行失败: %v", err)
os.Exit(1)
}
log.Printf("Workflow 执行完成,结果: %s", result)
}
4.8 Docker Compose 部署
# docker-compose.yaml
version: '3.8'
services:
postgres:
image: postgres:15
environment:
POSTGRES_USER: temporal
POSTGRES_PASSWORD: temporal
POSTGRES_DB: temporal
ports:
- "5432:5432"
volumes:
- postgres-data:/var/lib/postgresql/data
temporal:
image: temporalio/auto-setup:1.24.0
environment:
DB: "postgresql"
DB_PORT: 5432
POSTGRES_USER: temporal
POSTGRES_PASSWORD: temporal
POSTGRES_SEEDS: postgres
ENABLE_ES: "false"
ports:
- "7233:7233"
depends_on:
- postgres
volumes:
- ./temporal/config:/temporal/config
volumes:
postgres-data:
# 启动服务
docker-compose up -d
# 设置环境变量
export TEMPORAL_HOST_URL=localhost:7233
# 启动 Worker
go run cmd/worker/main.go
# 在另一个终端启动 Workflow
go run cmd/starter/main.go
性能优化:让 Temporal 工作流飞起来
5.1 Activity 的批量处理优化
当需要处理大量数据时,不要逐个调用 Activity,而是使用批量处理:
// ❌ 低效:1000 个 Activity 调用
for _, item := range items {
err := workflow.ExecuteActivity(ctx, ProcessItemActivity, item).Get(ctx, nil)
}
// ✅ 高效:批量 Activity
batchSize := 100
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}
batch := items[i:end]
err := workflow.ExecuteActivity(ctx, ProcessBatchActivity, batch).Get(ctx, nil)
if err != nil {
return err
}
}
5.2 并发 Activity 的超时控制
// 启动多个 Activity,全部在 5 分钟内完成即可
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
selector := workflow.NewSelector(ctx)
for _, item := range items {
future := workflow.ExecuteActivity(ctx, HeavyComputationActivity, item)
selector.AddFuture(future, func(f workflow.Future) {
// 处理完成
})
}
completed := 0
for completed < len(items) {
selector.Select(ctx)
completed++
}
5.3 Workflow 代码大小优化
EventHistory 是每个 Workflow 执行的全部记录,代码越大、重放越慢。建议将复杂的业务逻辑拆分到多个 Activity 中:
// ❌ Workflow 代码过于复杂,EventHistory 膨胀
func ComplexWorkflow(ctx workflow.Context) error {
// 5000 行复杂逻辑全在 Workflow 中
}
// ✅ 拆分到 Activity,保持 Workflow 简洁
func CleanWorkflow(ctx workflow.Context) error {
err := workflow.ExecuteActivity(ctx, ComplexPhase1Activity, input).Get(ctx, nil)
// ...
}
5.4 使用 Local Activity 减少网络开销
对于同一进程内可以完成的轻量操作,使用 Local Activity:
activityOpts := &workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 1, // Local Activity 不跨进程,不需要重试
},
}
ctx = workflow.WithLocalActivityOptions(ctx, activityOpts)
与其他方案的对比
6.1 Temporal vs 消息队列(Kafka / RabbitMQ)
| 维度 | 消息队列 | Temporal |
|---|---|---|
| 状态管理 | 需要自己维护状态表 | 自动持久化检查点 |
| 失败恢复 | 需要自己写补偿逻辑 | 自动从检查点重试 |
| 长时间运行 | 需要定期心跳 | 原生支持 |
| 业务逻辑 | 需要自己编排 | 工作流即代码 |
| 查询能力 | 需要自己实现 | 内置 Query 接口 |
6.2 Temporal vs AWS Step Functions
| 维度 | Step Functions | Temporal |
|---|---|---|
| 编程模型 | JSON 状态机 DSL | 普通代码(Go/TS/Python) |
| 生态绑定 | AWS | 任意云 / 自托管 |
| 社区活跃度 | AWS 官方维护 | 开源活跃(Uber 内部生产验证) |
| 调试体验 | AWS Console | Temporal Web UI + CLI |
| 成本模型 | 按状态转换次数计费 | 开源免费(自托管) |
6.3 Temporal vs Airflow
| 维度 | Airflow | Temporal |
|---|---|---|
| 定位 | 数据管道 / ETL | 通用业务流程编排 |
| 执行模型 | 调度器驱动 | 事件驱动 + 持久化执行 |
| 失败处理 | 需要自己实现重试 | 原生自动重试 |
| 长时间任务 | 支持但不够灵活 | 原生支持 |
| 实时性 | 调度周期(分钟级) | 毫秒级响应 |
避坑指南:从生产实践中总结的经验
7.1 Workflow 版本管理
当需要修改正在运行的工作流代码时,Temporal 通过 ContinueAsNew 提供平滑升级:
func V1Workflow(ctx workflow.Context, input Input) (Result, error) {
// V1 逻辑...
// 当需要升级到 V2 时
return "", workflow.NewContinueAsNewError(ctx, "V2Workflow", input)
}
func V2Workflow(ctx workflow.Context, input Input) (Result, error) {
// V2 逻辑改进...
return result, nil
}
7.2 幂等性设计
Activity 必须设计为幂等的,因为 Temporal 可能会重试:
// ✅ 幂等的 Activity:通过外部事务表保证
func ProcessPaymentActivity(ctx context.Context, req PaymentRequest) error {
// 使用支付流水号作为幂等键
existing, _ := db.FindTransaction(req.TransactionID)
if existing != nil && existing.Status == "success" {
return nil // 已经是成功的,跳过
}
// 执行支付逻辑
return db.SaveTransaction(req.TransactionID, "success")
}
7.3 Activity 超时配置
超时配置过短会导致频繁超时重试,过长会拖慢问题发现速度。建议:
StartToCloseTimeout:单个 Activity 最大执行时间ScheduleToStartTimeout:Activity 等待 Worker 领取的时间HeartbeatTimeout:长任务心跳间隔(必须小于 StartToCloseTimeout)
activityOpts := &workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
ScheduleToStartTimeout: 1 * time.Minute,
HeartbeatTimeout: 30 * time.Second,
}
7.4 信号死锁问题
Workflow 中的 selector.Select() 如果没有 default 分支且没有其他可选择的操作,会导致死锁:
// ✅ 正确:使用 Selector 并配合 Timer
for waitingForSignal {
selector := workflow.NewSelector(ctx)
var sig string
selector.AddSignal(ctx, "my-signal", func(s workflow.Context, s2 string) {
sig = s2
})
selector.AddTimer(ctx, 1*time.Minute, func(auth workflow.Future) {
// 超时处理
})
selector.Selector(ctx)
}
总结展望
Temporal 代表的不仅仅是一个工具,更是一种分布式业务流程编程范式的转变。
传统的分布式系统开发,开发者需要同时操心两件事:业务逻辑和分布式协调。Temporal 通过「工作流即代码」的理念,让这两件事彻底分离。你写的是干净的同步代码,但得到的是一个可以在任意节点运行、支持任意失败恢复、随时可查询状态的分布式系统。
2026 年的 Temporal 生态已经相当成熟:
- 多语言 SDK 完善:Go、TypeScript、Python、Java 四大语言的生产级支持
- 运维工具成熟:Temporal Web UI、Temporal CLI、Prometheus 监控指标
- 社区活跃:GitHub 27k+ stars,生产案例覆盖支付、电商、金融、游戏等多个行业
- 云服务支持:AWS Temporal(叫 Amazon SQS Extended Library)、Temporal Cloud
对于后端开发者来说,如果你正在处理跨服务的业务流程、需要可靠的重试和补偿逻辑、或者需要长时间运行的任务编排——Temporal 绝对值得一试。它的学习曲线比任何替代方案都要平缓,而它解决的问题,比任何替代方案都要全面。
正如 Temporal 官方文档所说:「Build invincible apps」——用 Temporal 构建真正可靠的应用。
参考资源:
- 官方文档:https://docs.temporal.io/
- Go SDK:https://github.com/temporalio/sdk-go
- 官方示例:https://github.com/temporalio/samples-go
- Temporal Cloud:https://temporal.io/cloud