万字深度解析 Temporal 工作流编排:当分布式系统学会「故障自愈」——从事件溯源到生产级持久化执行的完全指南(2026)
Temporal 是微服务架构中缺失的那块拼图——它不是队列,不是状态机,而是一个「让代码在任何故障下都能从断点继续执行」的持久化执行引擎。本文从事件溯源原理到生产级部署,彻底拆解 Temporal 的技术内核。
摘要
2026 年,分布式系统的复杂性已经达到一个临界点:微服务数量突破三位数、跨服务事务频繁、网络分区成为常态、部分故障几乎每秒都在发生。传统方案(消息队列 + 数据库状态机)在这些场景下越来越力不从心——代码必须时刻考虑「如果这一步失败了怎么办」。
Temporal 给出了一个根本性的答案:让普通的 Go/Java/Python 代码具备「故障自愈」能力。你写的 Workflow 代码,即使在执行过程中进程崩溃、网络中断、数据库故障,重启后也能从最后一个成功的步骤继续执行,就像什么都没发生过一样。
本文深度解析 Temporal 的核心原理(Event Sourcing、Command Pattern、Deterministic Replay)、架构设计(Worker、History Service、Matching Service)、Go SDK 实战(Workflow、Activity、Child Workflow、Saga Pattern)、生产级部署(Prometheus 监控、Grafana 仪表盘、灰度发布、版本ing)、性能优化(吞吐量与延迟调优),以及何时应该选择 Temporal、何时不应该用 Temporal。
目录
- 为什么需要 Temporal?——分布式系统的「失败地狱」
- Temporal 是什么?——重新定义「可靠执行」
- 核心概念速览
- 架构深度解析
- 事件溯源(Event Sourcing)原理
- 确定性重放(Deterministic Replay)
- 快速上手:第一个 Temporal Workflow
- Workflow 编写规范与陷阱
- Activity:分布式系统中的「副作用」
- 错误处理与重试策略
- Saga 模式:分布式事务的最终一致性
- Child Workflow 与 Parent-Child 关系
- Signal:向运行中的 Workflow 发送消息
- Query:查询 Workflow 状态
- 版本ing:不停机的代码升级
- 生产级部署
- 可观测性:Metrics、Tracing、Logging
- 性能优化实战
- Temporal vs 其他方案对比
- 真实案例:Uber、Netflix、Box 如何使用 Temporal
- 总结与展望
1. 为什么需要 Temporal?——分布式系统的「失败地狱」
1.1 传统分布式系统的痛点
假设你正在编写一个电商订单处理流程:
1. 创建订单(数据库写入)
2. 扣减库存(调用库存服务)
3. 扣减账户余额(调用支付服务)
4. 发送确认邮件(调用邮件服务)
5. 更新订单状态为「已完成」
在理想情况下,这段代码按顺序执行,一切顺利。但生产环境中:
- 步骤 2 失败(库存服务超时):订单已创建,但库存未扣减。是重试?还是标记订单为失败?如果重试时网络又断了呢?
- 步骤 3 失败(余额不足):订单和库存怎么办?已经扣减的库存如何回滚?
- 步骤 4 失败(邮件服务宕机):订单已经创建、库存已扣、钱已扣,但用户没收到确认邮件。是重试邮件?还是整个流程回滚?
- 服务进程在步骤 2 之后崩溃:重启后,服务不知道订单处理到哪一步了。是从头开始(可能导致重复扣减)?还是跳过步骤 1 和 2(但怎么知道它们已经成功了)?
传统解决方案是在每一步之后把「当前进度」写入数据库,并在服务重启后检查进度,然后从断点继续执行。但这带来了新的问题:
- 进度状态表的 schema 会随着业务流程变化而频繁修改
- 「执行逻辑」和「状态管理」紧密耦合,代码难以维护
- 重试逻辑、超时处理、幂等性保证需要手写大量样板代码
- 跨服务的事务一致性(Saga 模式)实现复杂,容易出错
1.2 Temporal 的解决思路
Temporal 的核心洞察是:把「执行状态」和「业务逻辑」完全分离。
在 Temporal 中,你写的代码看起来完全是同步的、顺序的:
func OrderWorkflow(ctx workflow.Context, orderID string) error {
// 1. 创建订单
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
var createResult string
err := workflow.ExecuteActivity(ctx, CreateOrderActivity, orderID).Get(ctx, &createResult)
if err != nil {
return err
}
// 2. 扣减库存
var inventoryResult string
err = workflow.ExecuteActivity(ctx, DeductInventoryActivity, orderID).Get(ctx, &inventoryResult)
if err != nil {
// 失败:需要回滚创建订单
_ = workflow.ExecuteActivity(ctx, CancelOrderActivity, orderID).Get(ctx, nil)
return err
}
// 3. 扣减余额
err = workflow.ExecuteActivity(ctx, DeductBalanceActivity, orderID).Get(ctx, nil)
if err != nil {
// 失败:需要回滚库存和订单
_ = workflow.ExecuteActivity(ctx, RestoreInventoryActivity, orderID).Get(ctx, nil)
_ = workflow.ExecuteActivity(ctx, CancelOrderActivity, orderID).Get(ctx, nil)
return err
}
// 4. 发送邮件
_ = workflow.ExecuteActivity(ctx, SendEmailActivity, orderID).Get(ctx, nil)
// 5. 完成
_ = workflow.ExecuteActivity(ctx, MarkOrderCompleteActivity, orderID).Get(ctx, nil)
return nil
}
关键点:这段代码在 Temporal 中运行,具有「故障自愈」能力:
- 如果服务在步骤 2 之后崩溃,Temporal 会在另一个 Worker 上从步骤 3 继续执行(步骤 1 和 2 不会重跑)
- 如果 Activity 执行失败(网络超时、服务宕机),Temporal 会自动重试(可配置重试策略)
- 整个 Workflow 的执行状态由 Temporal 持久化存储,开发者无需手动管理
2. Temporal 是什么?——重新定义「可靠执行」
2.1 Temporal 的定义
Temporal 是一个持久化执行引擎(Durable Execution Engine)。它的核心能力是:
让任意代码(Workflow)在任何故障(进程崩溃、网络中断、机器宕机)的情况下,都能从最后一个成功的断点继续执行,且保证执行结果的最终一致性。
Temporal 不是:
- ❌ 消息队列(RabbitMQ、Kafka)—— Temporal 不负责消息路由,而是负责「有状态的长时运行流程」
- ❌ 任务调度器(Celery、Quartz)—— Temporal 不仅调度任务,还保证任务的完整执行(包括失败恢复)
- ❌ 状态机框架(AWS Step Functions)—— Temporal 更灵活,支持复杂的编程逻辑(循环、条件、错误处理)
- ❌ 工作流引擎(Camunda、Zeebe)—— Temporal 更底层,直接以代码定义工作流(而非 XML/BPMN)
2.2 Temporal 的核心架构
Temporal 采用「控制平面 + 数据平面」分离的架构:
┌─────────────────────────────────────────────────────────────┐
│ Temporal Cluster │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Frontend │ │ History │ │ Matching │ │
│ │ Service │ │ Service │ │ Service │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ┌───────┴────────┐ │
│ │ Persistence │ │
│ │ (PostgreSQL/ │ │
│ │ MySQL/Cassandra) │
│ └────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│ │
│ gRPC │ Task Queue
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Worker │ │ Worker │
│ (Go/Java/ │ │ (Go/Java/ │
│ Python) │ │ Python) │
└─────────────┘ └─────────────┘
核心组件:
- Frontend Service:接收来自客户端的请求(启动 Workflow、发送 Signal、查询状态)
- History Service:核心组件,负责记录 Workflow 执行的历史事件(Event Sourcing),并在 Worker 失败时调度重试
- Matching Service:将 Task(Activity Task、Workflow Task)分发给可用的 Worker
- Persistence Layer:持久化存储 Workflow 状态(事件历史、当前状态)
Worker:
- Worker 是执行 Workflow 和 Activity 的进程
- Worker 可以水平扩展(添加更多 Worker 节点)
- Worker 从 Task Queue 拉取任务执行
3. 核心概念速览
3.1 Workflow
Workflow 是 Temporal 的核心抽象。它定义了「要做什么」的业务逻辑。
特性:
- 持久化执行:即使 Worker 崩溃,Workflow 也会从断点继续执行
- 确定性:同样的输入,必须产生同样的执行路径(不能调用随机数、当前时间等不确定函数)
- 可重放:Temporal 会根据事件历史「重放」Workflow 代码,以恢复内存状态
func MyWorkflow(ctx workflow.Context, input string) (string, error) {
// Workflow 代码:定义执行逻辑
// 不能调用:time.Now()、math.Random()、goroutine、channel
// 只能调用:workflow.Sleep()、workflow.ExecuteActivity()、workflow.WaitCondition()
var result string
err := workflow.ExecuteActivity(ctx, MyActivity, input).Get(ctx, &result)
if err != nil {
return "", err
}
return result, nil
}
3.2 Activity
Activity 是 Workflow 中执行「副作用」的单元。比如:调用外部 API、写入数据库、发送邮件。
特性:
- 可重试:Activity 失败后可以自动重试(可配置重试策略)
- 幂等性:Activity 可能被多次执行(由于重试),必须是幂等的
- 超时控制:可以设置 StartToCloseTimeout、HeartbeatTimeout
func MyActivity(ctx context.Context, input string) (string, error) {
// Activity 代码:执行副作用
// 可以调用:任何 Go 代码(time.Now()、goroutine、第三方库)
result, err := callExternalAPI(input)
return result, err
}
3.3 Task Queue
Task Queue 是 Worker 拉取任务的队列。
- Workflow Task Queue:用于调度 Workflow 执行
- Activity Task Queue:用于调度 Activity 执行
// 启动 Workflow,指定 Task Queue
client.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
ID: "workflow-id-123",
TaskQueue: "my-task-queue",
}, MyWorkflow, "input")
3.4 Namespace
Namespace 是 Temporal 中的多租户隔离单元。
- 不同 Namespace 的 Workflow 完全隔离
- 可以用于:环境隔离(dev/staging/prod)、团队隔离、客户隔离
// 连接到指定 Namespace
client, err := client.Dial(client.Options{
HostPort: "temporal:7233",
Namespace: "production",
})
3.5 Worker
Worker 是执行 Workflow 和 Activity 的进程。
func main() {
c, err := client.Dial(client.Options{HostPort: "temporal:7233"})
if err != nil {
log.Fatalln("Unable to create Temporal client", err)
}
defer c.Close()
w := worker.New(c, "my-task-queue", worker.Options{})
// 注册 Workflow 和 Activity
w.RegisterWorkflow(MyWorkflow)
w.RegisterActivity(MyActivity)
// 启动 Worker(阻塞)
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start Worker", err)
}
}
4. 架构深度解析
4.1 Temporal 的执行模型
Temporal 的执行模型基于事件溯源(Event Sourcing)和命令模式(Command Pattern)。
执行流程:
- 客户端调用
ExecuteWorkflow(),Temporal 记录WorkflowExecutionStarted事件 - History Service 调度第一个
WorkflowTask到 Task Queue - Worker 拉取
WorkflowTask,执行 Workflow 代码 - Workflow 代码调用
ExecuteActivity(),Temporal 记录ActivityTaskScheduled事件 - History Service 调度
ActivityTask到 Task Queue - Worker 拉取
ActivityTask,执行 Activity 代码 - Activity 执行完成,Worker 返回结果
- Temporal 记录
ActivityTaskCompleted事件 - History Service 调度下一个
WorkflowTask - Worker 重放 Workflow 代码(从事件历史恢复状态),继续执行
- 重复步骤 4-10,直到 Workflow 完成
关键点:Workflow 代码会被执行多次:
- 第一次:正常执行
- 后续:Worker 失败重启后,Temporal 会「重放」Workflow 代码,从事件历史恢复状态
因此,Workflow 代码必须是确定性的(Deterministic)。
4.2 事件历史(Event History)
Temporal 将 Workflow 的每次执行都记录为一系列事件。
示例事件历史:
1. WorkflowExecutionStarted
2. WorkflowTaskScheduled
3. WorkflowTaskStarted
4. WorkflowTaskCompleted
5. ActivityTaskScheduled (Activity: CreateOrder)
6. ActivityTaskStarted
7. ActivityTaskCompleted (Result: "order-123")
8. ActivityTaskScheduled (Activity: DeductInventory)
9. ActivityTaskStarted
10. ActivityTaskFailed (Error: "timeout")
11. ActivityTaskScheduled (Activity: DeductInventory) # 重试
12. ActivityTaskStarted
13. ActivityTaskCompleted
...
事件历史的作用:
- 故障恢复:Worker 重启后,根据事件历史重放 Workflow 状态
- 审计:可以查看 Workflow 的完整执行历史
- 调试:可以「回放」Workflow 执行过程
4.3 确定性重放(Deterministic Replay)
问题:Workflow 代码在执行过程中会调用 ExecuteActivity()。如果 Worker 重启,Temporal 需要「恢复」Workflow 的内存状态。但 Activity 是异步的(需要等待完成),如何恢复?
解决:Temporal 使用事件历史来重放 Workflow 代码。
重放过程:
- Worker 重启后,Temporal 将事件历史发送给 Worker
- Worker 重新执行 Workflow 代码
- 当代码执行到
ExecuteActivity()时,Temporal 检查事件历史中是否已经有对应的ActivityTaskCompleted事件- 如果有:直接返回结果,不实际执行 Activity
- 如果没有:实际执行 Activity
- 这样,Workflow 代码执行到「断点」时,内存状态就恢复了
要求:Workflow 代码必须是确定性的。
❌ 错误示例(不确定性代码):
func MyWorkflow(ctx workflow.Context) error {
// 错误:time.Now() 每次执行结果不同
now := time.Now()
// 错误:随机数
randNum := rand.Intn(100)
// 错误:goroutine(执行顺序不确定)
go func() {
fmt.Println("hello")
}()
return nil
}
✅ 正确示例:
func MyWorkflow(ctx workflow.Context) error {
// 正确:使用 workflow.Now()
now := workflow.Now(ctx)
// 正确:使用 workflow.Random()
randNum := workflow.Random(ctx, 100)
// 正确:使用 workflow.Go()
workflow.Go(ctx, func(ctx workflow.Context) {
// ...
})
return nil
}
5. 事件溯源(Event Sourcing)原理
5.1 什么是事件溯源?
事件溯源是一种架构模式:系统的状态不是直接存储的,而是存储为一系列不可变的事件。当前状态是通过「重放」这些事件计算出来的。
传统方式(CRUD):
订单状态:{ "status": "PAID", "updated_at": "2026-01-01" }
事件溯源方式:
事件 1:OrderCreated { "order_id": "123", "amount": 100, "timestamp": "..." }
事件 2:PaymentDeducted { "order_id": "123", "amount": 100, "timestamp": "..." }
事件 3:OrderPaid { "order_id": "123", "timestamp": "..." }
当前状态 = 重放事件 1 → 2 → 3。
5.2 Temporal 中的事件溯源
Temporal 使用事件溯源来实现「故障恢复」:
- Workflow 的每次执行都记录为事件
- Worker 重启后,根据事件历史「重放」Workflow 代码,恢复内存状态
优势:
- 完整的审计日志:可以查看 Workflow 的完整执行历史
- 时间旅行调试:可以将 Workflow 状态「回滚」到任意历史事件
- 故障恢复:Worker 崩溃后,可以从事件历史恢复
劣势:
- 事件历史可能很大:长时间运行的 Workflow 会产生大量事件
- Temporal 解决方案:可以「截断」旧事件(保留最近 N 个事件)
- 重放开销:Worker 重启后需要重放事件历史
- Temporal 解决方案:缓存(Worker 本地缓存事件历史)
6. 确定性重放(Deterministic Replay)
6.1 为什么需要确定性?
Workflow 代码会被执行多次(正常执行 + 重放)。如果代码是非确定性的(比如调用 time.Now()),那么重放时得到的结果会和第一次执行不同,导致状态不一致。
示例:
func MyWorkflow(ctx workflow.Context) error {
// 非确定性:每次执行结果不同
waitTime := rand.Intn(10) // 可能是 3,也可能是 7
workflow.Sleep(ctx, time.Duration(waitTime)*time.Second)
return nil
}
- 第一次执行:
waitTime = 3,记录事件TimerScheduled (3s) - Worker 重启,重放:
- 重放时
waitTime = 7(随机数种子不同) - 代码尝试记录
TimerScheduled (7s) - Temporal 发现事件历史中是
TimerScheduled (3s),不一致 → 报错
- 重放时
6.2 Temporal 如何保证确定性?
Temporal 提供了一套确定性的 API 来替代不确定的标准库函数:
| 标准库 | Temporal API | 说明 |
|---|---|---|
time.Now() | workflow.Now(ctx) | 返回事件发生时间(固定) |
time.Sleep() | workflow.Sleep() | 确定性的定时器 |
rand.Intn() | workflow.Random() | 使用固定的随机数种子 |
go func() | workflow.Go() | 确定性的 goroutine |
select {} | workflow.Select() | 确定性的 select |
channel | workflow.Channel | 确定性的 channel |
禁用的函数:
- ❌
time.Now()(使用workflow.Now()) - ❌
time.Sleep()(使用workflow.Sleep()) - ❌
rand.*(使用workflow.Random()) - ❌
go func()(使用workflow.Go()) - ❌ 直接创建 goroutine
- ❌ 直接操作 channel
6.3 重放模式(Replay Mode)
Temporal Worker 在执行 Workflow 时,有两种模式:
- 正常执行模式:第一次执行 Workflow 代码
- 重放模式:根据事件历史重放 Workflow 代码
如何判断当前是否处于重放模式?
func MyWorkflow(ctx workflow.Context) error {
// 判断是否在重放
if workflow.IsReplaying(ctx) {
// 重放模式:不执行副作用(比如打印日志)
// 因为这段代码在第一次执行时已经执行过了
} else {
// 正常模式:可以执行副作用
fmt.Println("Workflow started")
}
// ...
return nil
}
注意:通常不需要手动判断 IsReplaying(),Temporal 会自动处理。
7. 快速上手:第一个 Temporal Workflow
7.1 安装 Temporal
本地开发:使用 Temporal Standalone 模式(所有组件运行在一个进程中)
# 安装 Temporal CLI
brew install temporal
# 启动本地 Temporal 服务
temporal server start-dev
生产环境:使用 Temporal Cloud 或自托管 Temporal Cluster
# 使用 Docker Compose 部署 Temporal Cluster
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
docker-compose up
7.2 第一个 Workflow
项目结构:
my-temporal-app/
├── go.mod
├── go.sum
├── worker/
│ └── main.go
└── workflow/
├── workflow.go
└── activity.go
workflow/workflow.go:
package workflow
import (
"context"
"time"
"go.temporal.io/sdk/workflow"
)
// SayHelloWorkflow 是一个简单的 Workflow:说 Hello
func SayHelloWorkflow(ctx workflow.Context, name string) (string, error) {
// 记录开始日志(仅在非重放模式下执行)
workflow.GetLogger(ctx).Info("SayHelloWorkflow started", "name", name)
// 调用 Activity
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
var result string
err := workflow.ExecuteActivity(ctx, SayHelloActivity, name).Get(ctx, &result)
if err != nil {
return "", err
}
// 记录完成日志
workflow.GetLogger(ctx).Info("SayHelloWorkflow completed", "result", result)
return result, nil
}
workflow/activity.go:
package workflow
import (
"context"
"fmt"
)
// SayHelloActivity 是一个简单的 Activity:返回 "Hello, {name}!"
func SayHelloActivity(ctx context.Context, name string) (string, error) {
// Activity 可以调用任意 Go 代码
result := fmt.Sprintf("Hello, %s!", name)
return result, nil
}
worker/main.go:
package main
import (
"log"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"my-temporal-app/workflow"
)
func main() {
// 创建 Temporal 客户端
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
})
if err != nil {
log.Fatalln("Unable to create Temporal client", err)
}
defer c.Close()
// 创建 Worker
w := worker.New(c, "hello-task-queue", worker.Options{})
// 注册 Workflow 和 Activity
w.RegisterWorkflow(workflow.SayHelloWorkflow)
w.RegisterActivity(workflow.SayHelloActivity)
// 启动 Worker(阻塞)
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start Worker", err)
}
}
启动 Workflow(客户端):
package main
import (
"context"
"log"
"time"
"go.temporal.io/sdk/client"
"my-temporal-app/workflow"
)
func main() {
// 创建 Temporal 客户端
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
})
if err != nil {
log.Fatalln("Unable to create Temporal client", err)
}
defer c.Close()
// 启动 Workflow
workflowOptions := client.StartWorkflowOptions{
ID: "hello-workflow-1",
TaskQueue: "hello-task-queue",
}
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, workflow.SayHelloWorkflow, "World")
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
// 等待 Workflow 完成
var result string
err = we.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Workflow failed", err)
}
log.Println("Workflow completed", "result", result)
}
7.3 运行
# 终端 1:启动 Temporal 服务
temporal server start-dev
# 终端 2:启动 Worker
go run worker/main.go
# 终端 3:启动 Workflow
go run client/main.go
输出:
2026/01/01 12:00:00 SayHelloWorkflow started {name: World}
2026/01/01 12:00:00 SayHelloWorkflow completed {result: Hello, World!}
2026/01/01 12:00:00 Workflow completed {result: Hello, World!}
8. Workflow 编写规范与陷阱
8.1 确定性规则
必须遵守:
- ❌ 不要调用
time.Now()→ ✅ 使用workflow.Now(ctx) - ❌ 不要调用
time.Sleep()→ ✅ 使用workflow.Sleep() - ❌ 不要使用
rand包 → ✅ 使用workflow.Random() - ❌ 不要直接创建 goroutine → ✅ 使用
workflow.Go() - ❌ 不要直接操作 channel → ✅ 使用
workflow.NewChannel() - ❌ 不要使用
select→ ✅ 使用workflow.Select() - ❌ 不要调用外部 API → ✅ 使用 Activity
- ❌ 不要写文件/数据库 → ✅ 使用 Activity
示例:错误的 Workflow
func BadWorkflow(ctx workflow.Context) error {
// 错误 1:调用 time.Now()
now := time.Now() // ❌
// 错误 2:使用 rand
num := rand.Intn(100) // ❌
// 错误 3:直接创建 goroutine
go func() { // ❌
fmt.Println("hello")
}()
// 错误 4:调用外部 API
resp, err := http.Get("https://api.example.com") // ❌
return nil
}
示例:正确的 Workflow
func GoodWorkflow(ctx workflow.Context) error {
// 正确 1:使用 workflow.Now()
now := workflow.Now(ctx) // ✅
// 正确 2:使用 workflow.Random()
num := workflow.Random(ctx, 100) // ✅
// 正确 3:使用 workflow.Go()
workflow.Go(ctx, func(ctx workflow.Context) { // ✅
workflow.GetLogger(ctx).Info("hello")
})
// 正确 4:使用 Activity 调用外部 API
var result string
err := workflow.ExecuteActivity(ctx, CallAPIActivity).Get(ctx, &result) // ✅
return nil
}
8.2 循环与条件
Workflow 中可以使用循环和条件,但必须是确定性的。
示例:确定性循环
func LoopWorkflow(ctx workflow.Context, count int) error {
for i := 0; i < count; i++ {
// 确定性:循环次数由输入参数决定(固定)
var result string
err := workflow.ExecuteActivity(ctx, MyActivity, i).Get(ctx, &result)
if err != nil {
return err
}
}
return nil
}
示例:不确定性循环(错误)
func BadLoopWorkflow(ctx workflow.Context) error {
// 错误:循环次数取决于外部状态(不确定)
count := getCountFromDatabase() // ❌ 不能这样做
for i := 0; i < count; i++ {
// ...
}
return nil
}
8.3 异常处理
Workflow 中可以使用 defer 和 recover。
func MyWorkflow(ctx workflow.Context) (err error) {
// defer:无论是否 panic,都会执行
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("workflow panicked: %v", r)
}
}()
// 调用 Activity
err = workflow.ExecuteActivity(ctx, MyActivity).Get(ctx, nil)
if err != nil {
// 处理错误
return err
}
return nil
}
9. Activity:分布式系统中的「副作用」
9.1 Activity 的特性
Activity 是 Workflow 中执行「副作用」的单元。
特性:
- 可重试:Activity 失败后可以自动重试
- 幂等性:Activity 可能被多次执行(由于重试),必须是幂等的
- 超时控制:可以设置 StartToCloseTimeout、HeartbeatTimeout
- 心跳(Heartbeat):长时间运行的 Activity 可以定期发送心跳,证明自己还活着
9.2 Activity 重试策略
func MyWorkflow(ctx workflow.Context) error {
// 配置 Activity 重试策略
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Second, // 首次重试等待 1s
MaximumInterval: 10 * time.Second, // 最多等待 10s
BackoffCoefficient: 2.0, // 退避系数:1s → 2s → 4s → 8s → 10s
MaximumAttempts: 5, // 最多重试 5 次
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// 调用 Activity(失败会自动重试)
err := workflow.ExecuteActivity(ctx, MyActivity).Get(ctx, nil)
return err
}
9.3 心跳(Heartbeat)
长时间运行的 Activity 应该定期发送心跳,防止被判定为「超时」。
func LongRunningActivity(ctx context.Context) error {
// 获取心跳详情
heartbeatDetails := temporal.GetHeartbeatDetails(ctx)
// 执行长时间任务
for i := 0; i < 100; i++ {
// 执行一部分工作
// ...
// 发送心跳(附带进度信息)
temporal.Heartbeat(ctx, i)
// 检查是否被取消
if temporal.IsCanceled(ctx) {
return fmt.Errorf("activity canceled")
}
time.Sleep(1 * time.Second)
}
return nil
}
9.4 Activity 超时控制
func MyWorkflow(ctx workflow.Context) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second, // Activity 必须在 30s 内完成
HeartbeatTimeout: 5 * time.Second, // 每 5s 必须发送一次心跳
}
ctx = workflow.WithActivityOptions(ctx, ao)
err := workflow.ExecuteActivity(ctx, LongRunningActivity).Get(ctx, nil)
return err
}
10. 错误处理与重试策略
10.1 Workflow 级别的重试
Workflow 失败后,可以配置重试策略。
func main() {
c, _ := client.Dial(client.Options{HostPort: "localhost:7233"})
defer c.Close()
workflowOptions := client.StartWorkflowOptions{
ID: "my-workflow",
TaskQueue: "my-task-queue",
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Second,
MaximumInterval: 10 * time.Second,
BackoffCoefficient: 2.0,
MaximumAttempts: 3,
},
}
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, MyWorkflow)
// ...
}
10.2 Activity 级别的重试
10.3 自定义错误处理
func MyWorkflow(ctx workflow.Context) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
NonRetryableErrorTypes: []string{"InvalidInputError"}, // 这些错误不重试
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
err := workflow.ExecuteActivity(ctx, MyActivity).Get(ctx, nil)
if err != nil {
var applicationErr *temporal.ApplicationError
if errors.As(err, &applicationErr) {
if applicationErr.Type() == "InvalidInputError" {
// 不重试,直接返回
return err
}
}
// 其他错误会重试
return err
}
return nil
}
11. Saga 模式:分布式事务的最终一致性
11.1 什么是 Saga 模式?
Saga 模式是一种分布式事务模式:将大事务拆分为一系列本地事务,每个本地事务都有对应的「补偿事务」。如果某个本地事务失败,则按顺序执行前面所有本地事务的补偿事务,实现回滚。
示例:电商订单流程
事务 1:创建订单
事务 2:扣减库存
事务 3:扣减余额
事务 4:发送确认邮件
如果事务 3 失败:
补偿 2:恢复库存
补偿 1:取消订单
11.2 Temporal 中实现 Saga
Temporal 提供了 workflow.Saga 来简化 Saga 模式的实现。
func OrderWorkflow(ctx workflow.Context, orderID string) error {
// 创建 Saga
saga := workflow.NewSaga(ctx)
// 事务 1:创建订单
var orderResult string
err := workflow.ExecuteActivity(ctx, CreateOrderActivity, orderID).Get(ctx, &orderResult)
if err != nil {
return err
}
// 注册补偿事务
saga.AddCompensation(ctx, CancelOrderActivity, orderID)
// 事务 2:扣减库存
var inventoryResult string
err = workflow.ExecuteActivity(ctx, DeductInventoryActivity, orderID).Get(ctx, &inventoryResult)
if err != nil {
// 失败:执行补偿事务(取消订单)
return saga.Compensate(ctx, nil)
}
// 注册补偿事务
saga.AddCompensation(ctx, RestoreInventoryActivity, orderID)
// 事务 3:扣减余额
err = workflow.ExecuteActivity(ctx, DeductBalanceActivity, orderID).Get(ctx, nil)
if err != nil {
// 失败:执行补偿事务(恢复库存、取消订单)
return saga.Compensate(ctx, nil)
}
// 注册补偿事务
saga.AddCompensation(ctx, RefundBalanceActivity, orderID)
// 事务 4:发送邮件
_ = workflow.ExecuteActivity(ctx, SendEmailActivity, orderID).Get(ctx, nil)
return nil
}
11.3 手动实现 Saga
如果不使用 workflow.Saga,也可以手动实现:
func OrderWorkflowManual(ctx workflow.Context, orderID string) (err error) {
// defer:如果出错,执行补偿事务
defer func() {
if err != nil {
// 执行补偿事务(逆序)
_ = workflow.ExecuteActivity(ctx, RefundBalanceActivity, orderID).Get(ctx, nil)
_ = workflow.ExecuteActivity(ctx, RestoreInventoryActivity, orderID).Get(ctx, nil)
_ = workflow.ExecuteActivity(ctx, CancelOrderActivity, orderID).Get(ctx, nil)
}
}()
// 事务 1
err = workflow.ExecuteActivity(ctx, CreateOrderActivity, orderID).Get(ctx, nil)
if err != nil {
return err
}
// 事务 2
err = workflow.ExecuteActivity(ctx, DeductInventoryActivity, orderID).Get(ctx, nil)
if err != nil {
return err
}
// 事务 3
err = workflow.ExecuteActivity(ctx, DeductBalanceActivity, orderID).Get(ctx, nil)
if err != nil {
return err
}
return nil
}
12. Child Workflow 与 Parent-Child 关系
12.1 什么是 Child Workflow?
Child Workflow 是由 Parent Workflow 启动的 Workflow。
使用场景:
- 将一个大 Workflow 拆分为多个小 Workflow
- 并行执行多个独立的子流程
- 需要独立的重试策略、超时控制
12.2 启动 Child Workflow
func ParentWorkflow(ctx workflow.Context) error {
// 启动 Child Workflow
childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
WorkflowID: "child-workflow-1",
ParentClosePolicy: workflow.ParentClosePolicyTerminate, // Parent 失败时,Child 也终止
})
var childResult string
err := workflow.ExecuteChildWorkflow(childCtx, ChildWorkflow, "input").Get(ctx, &childResult)
if err != nil {
return err
}
workflow.GetLogger(ctx).Info("Child workflow completed", "result", childResult)
return nil
}
func ChildWorkflow(ctx workflow.Context, input string) (string, error) {
// Child Workflow 逻辑
return "child result", nil
}
12.3 Parent-Child 关系
ParentClosePolicy:
ParentClosePolicyTerminate:Parent 失败时,Child 也被终止ParentClosePolicyAbandon:Parent 失败时,Child 继续执行ParentClosePolicyRequestCancel:Parent 失败时,请求取消 Child
13. Signal:向运行中的 Workflow 发送消息
13.1 什么是 Signal?
Signal 是向运行中的 Workflow 发送消息的机制。
使用场景:
- 用户取消订单(发送
cancel信号) - 管理员审批(发送
approve信号) - 动态修改 Workflow 参数
13.2 接收 Signal
func MyWorkflow(ctx workflow.Context) error {
// 注册 Signal 处理器
selector := workflow.NewSelector(ctx)
var cancelRequested bool
signalChan := workflow.GetSignalChannel(ctx, "cancel")
selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, nil)
cancelRequested = true
workflow.GetLogger(ctx).Info("Cancel signal received")
})
// 执行逻辑,检查 cancelRequested
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
err := workflow.ExecuteActivity(ctx, MyActivity).Get(ctx, nil)
if err != nil {
return err
}
// 等待 Signal 或 Activity 完成
for {
if cancelRequested {
return fmt.Errorf("workflow canceled")
}
// 等待 1s 或 Signal 到达
selector.Select(ctx)
// 检查 Activity 是否完成
// ...
}
}
13.3 发送 Signal
func main() {
c, _ := client.Dial(client.Options{HostPort: "localhost:7233"})
defer c.Close()
// 发送 Signal 到运行中的 Workflow
err := c.SignalWorkflow(context.Background(), "my-workflow-id", "", "cancel", nil)
if err != nil {
log.Fatalln("Unable to signal workflow", err)
}
}
14. Query:查询 Workflow 状态
14.1 什么是 Query?
Query 是查询运行中 Workflow 状态的机制。与 Signal 不同,Query 不修改 Workflow 状态。
14.2 注册 Query
func MyWorkflow(ctx workflow.Context) error {
// 状态
var orderStatus string = "CREATED"
// 注册 Query 处理器
workflow.SetQueryHandler(ctx, "get_status", func() (string, error) {
return orderStatus, nil
})
// 更新状态
orderStatus = "PROCESSING"
_ = workflow.ExecuteActivity(ctx, ProcessOrderActivity).Get(ctx, nil)
orderStatus = "COMPLETED"
return nil
}
14.3 发送 Query
func main() {
c, _ := client.Dial(client.Options{HostPort: "localhost:7233"})
defer c.Close()
// 查询 Workflow 状态
response, err := c.QueryWorkflow(context.Background(), "my-workflow-id", "", "get_status")
if err != nil {
log.Fatalln("Unable to query workflow", err)
}
var status string
response.Get(&status)
log.Println("Workflow status:", status)
}
15. 版本ing:不停机的代码升级
15.1 问题:Workflow 代码升级后,如何兼容旧实例?
假设你的 Workflow 代码已经部署,且有 100 个运行中的 Workflow 实例。现在你需要修改 Workflow 逻辑(比如增加一个新步骤)。如果直接升级代码,旧实例重放时会使用新逻辑,可能导致不一致。
15.2 使用 workflow.GetVersion()
Temporal 提供了 workflow.GetVersion() 来实现版本兼容。
func MyWorkflow(ctx workflow.Context) error {
// 获取当前代码版本
version := workflow.GetVersion(ctx, "add-new-step", workflow.DefaultVersion, 1)
// 原有逻辑
_ = workflow.ExecuteActivity(ctx, Step1Activity).Get(ctx, nil)
// 新逻辑:仅在新版本中执行
if version == 1 {
_ = workflow.ExecuteActivity(ctx, NewStepActivity).Get(ctx, nil)
}
// 原有逻辑
_ = workflow.ExecuteActivity(ctx, Step2Activity).Get(ctx, nil)
return nil
}
原理:
- 旧实例(使用 DefaultVersion):
GetVersion()返回workflow.DefaultVersion,不执行新步骤 - 新实例(使用版本 1):
GetVersion()返回1,执行新步骤
15.3 版本迁移最佳实践
- 先部署新代码:新代码同时支持旧版本和新版本
- 等待所有旧实例完成:确保没有运行中的旧实例
- 删除旧版本代码:清理
GetVersion()中的旧版本分支
16. 生产级部署
16.1 Temporal Cluster 部署
生产环境推荐架构:
┌─────────────────────────────────────────────────┐
│ Load Balancer │
│ (HAProxy) │
└──────────────┬──────────────────────────────────┘
│
┌───────┴───────┐
│ │
┌──────▼─────┐ ┌──────▼─────┐
│ Frontend │ │ Frontend │ (2+ 实例,高可用)
│ Service │ │ Service │
└──────┬─────┘ └──────┬─────┘
│ │
└───────┬───────┘
│
┌───────┴───────┐
│ │
┌──────▼─────┐ ┌──────▼─────┐
│ History │ │ History │ (2+ 实例)
│ Service │ │ Service │
└──────┬─────┘ └──────┬─────┘
│ │
└───────┬───────┘
│
┌───────┴───────┐
│ │
┌──────▼─────┐ ┌──────▼─────┐
│ Matching │ │ Matching │ (2+ 实例)
│ Service │ │ Service │
└──────┬─────┘ └──────┬─────┘
│ │
└───────┬───────┘
│
┌───────┴───────┐
│ │
┌──────▼─────┐ ┌──────▼─────┐
│ Database │ │ Database │ (PostgreSQL/MySQL 主从复制)
│ (Primary) │ │ (Replica) │
└────────────┘ └────────────┘
使用 Docker Compose 部署(适用于小团队):
# docker-compose.yml
version: '3.8'
services:
temporal:
image: temporalio/auto-setup:latest
ports:
- "7233:7233"
environment:
- DB=postgresql
- DB_PORT=5432
- POSTGRES_USER=temporal
- POSTGRES_PWD=temporal
- POSTGRES_DB=temporal
depends_on:
- postgres
postgres:
image: postgres:15
environment:
POSTGRES_USER: temporal
POSTGRES_PASSWORD: temporal
POSTGRES_DB: temporal
volumes:
- postgres-data:/var/lib/postgresql/data
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
depends_on:
- prometheus
volumes:
postgres-data:
16.2 Worker 部署
Worker 部署建议:
- 水平扩展:根据 Task Queue 的 backlog 大小,动态添加/移除 Worker 实例
- 资源隔离:Workflow Worker 和 Activity Worker 分开部署(避免 Activity 长时间运行阻塞 Workflow)
- 健康检查:使用
/health端点(Temporal SDK 提供) - 优雅关闭:捕获
SIGTERM信号,完成当前任务后再退出
func main() {
c, _ := client.Dial(client.Options{HostPort: "temporal:7233"})
defer c.Close()
w := worker.New(c, "my-task-queue", worker.Options{})
w.RegisterWorkflow(MyWorkflow)
w.RegisterActivity(MyActivity)
// 优雅关闭
go func() {
<-worker.InterruptCh()
log.Println("Shutting down worker...")
w.Stop() // 完成当前任务后退出
}()
err := w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
16.3 数据库选择
Temporal 支持多种数据库:
| 数据库 | 适用场景 | 性能 |
|---|---|---|
| PostgreSQL | 生产环境(推荐) | 高 |
| MySQL | 生产环境 | 高 |
| Cassandra | 大规模部署(百万级 Workflow) | 最高 |
| SQLite | 本地开发/测试 | 低 |
PostgreSQL 配置建议:
# postgresql.conf
max_connections = 1000
shared_buffers = 4GB
effective_cache_size = 12GB
maintenance_work_mem = 1GB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 500
17. 可观测性:Metrics、Tracing、Logging
17.1 Metrics
Temporal 提供了丰富的 Prometheus Metrics。
关键指标:
temporal_workflow_started:Workflow 启动次数temporal_workflow_completed:Workflow 完成次数temporal_workflow_failed:Workflow 失败次数temporal_activity_execution_latency:Activity 执行延迟temporal_worker_task_slots_available:Worker 可用任务槽位数
配置 Prometheus:
# prometheus.yml
scrape_configs:
- job_name: 'temporal'
static_configs:
- targets: ['temporal:8233'] # Temporal Metrics 端口
- job_name: 'worker'
static_configs:
- targets: ['worker:8234'] # Worker Metrics 端口
17.2 Tracing
Temporal 支持 OpenTelemetry Tracing。
func main() {
// 创建 OpenTelemetry Tracer
tracerProvider, _ := newOtelTracerProvider()
c, _ := client.Dial(client.Options{
HostPort: "temporal:7233",
Tracer: tracerProvider.Tracer("my-app"),
})
defer c.Close()
w := worker.New(c, "my-task-queue", worker.Options{
Tracer: tracerProvider.Tracer("my-app"),
})
// ...
}
17.3 Logging
Workflow 中使用 workflow.GetLogger(ctx) 记录日志。
func MyWorkflow(ctx workflow.Context) error {
logger := workflow.GetLogger(ctx)
logger.Info("Workflow started", "input", "value")
logger.Warn("Something is wrong")
logger.Error("Activity failed", "error", err)
return nil
}
日志最佳实践:
- 使用结构化日志(key-value 格式)
- 包含 Workflow ID、Run ID
- 记录关键业务事件(订单创建、支付完成)
18. 性能优化实战
18.1 Workflow 性能优化
问题:Workflow 重放时,需要读取完整的事件历史。如果事件历史很大(比如 10 万事件),重放会很慢。
优化方案:
- 使用 ContinueAsNew:
- 当事件历史超过一定大小(比如 1 万事件),创建新的 Workflow Run
- 新 Run 的事件历史从空开始
func MyLongRunningWorkflow(ctx workflow.Context) error {
for {
// 执行一批任务
// ...
// 检查事件历史大小
if workflow.GetInfo(ctx).GetCurrentHistoryLength() > 10000 {
// 创建新 Run
return workflow.NewContinueAsNewError(ctx, MyLongRunningWorkflow, newInput)
}
}
}
- 减少事件数量:
- 避免过多的小型 Activity(合并为一个大型 Activity)
- 使用
workflow.ExecuteLocalActivity()(不记录事件)
18.2 Activity 性能优化
- 并行执行独立 Activity:
func MyWorkflow(ctx workflow.Context) error {
// 并行执行多个独立的 Activity
selector := workflow.NewSelector(ctx)
var result1 string
future1 := workflow.ExecuteActivity(ctx, Activity1)
selector.AddFuture(future1, func(f workflow.Future) {
f.Get(ctx, &result1)
})
var result2 string
future2 := workflow.ExecuteActivity(ctx, Activity2)
selector.AddFuture(future2, func(f workflow.Future) {
f.Get(ctx, &result2)
})
// 等待所有 Activity 完成
selector.Select(ctx)
selector.Select(ctx)
return nil
}
- 使用 Local Activity:
- Local Activity 在 Worker 本地执行,不通过 Temporal Cluster
- 适用于:执行时间短(< 1s)、失败影响小的任务
func MyWorkflow(ctx workflow.Context) error {
// 使用 Local Activity(不记录事件,性能更高)
var result string
err := workflow.ExecuteLocalActivity(ctx, MyLocalActivity, "input").Get(ctx, &result)
return err
}
18.3 Worker 性能优化
- 增加 Worker 数量:水平扩展
- 调整 Task Queue 并发度:
w := worker.New(c, "my-task-queue", worker.Options{
MaxConcurrentActivityExecution: 100, // 最多同时执行 100 个 Activity
MaxConcurrentWorkflowTaskExecution: 50, // 最多同时执行 50 个 Workflow Task
MaxConcurrentLocalActivityExecution: 200, // 最多同时执行 200 个 Local Activity
})
19. Temporal vs 其他方案对比
19.1 Temporal vs 消息队列(RabbitMQ、Kafka)
| 特性 | Temporal | 消息队列 |
|---|---|---|
| 持久化执行 | ✅ 原生支持 | ❌ 需要手动实现 |
| 故障恢复 | ✅ 自动 | ❌ 手动(消费端 ACK) |
| 分布式事务 | ✅ Saga 模式 | ❌ 需要手动实现 |
| 状态管理 | ✅ 自动 | ❌ 手动(数据库) |
| 超时控制 | ✅ 原生支持 | ❌ 手动(延迟队列) |
| 可观测性 | ✅ Web UI | ❌ 需要第三方工具 |
选择建议:
- 需要有状态的长时运行流程:选择 Temporal
- 简单的事件通知:选择消息队列
19.2 Temporal vs AWS Step Functions
| 特性 | Temporal | AWS Step Functions |
|---|---|---|
| 编程模型 | 代码定义(Go/Java/Python) | JSON/YAML 定义 |
| 灵活性 | ✅ 高(支持循环、条件、错误处理) | ❌ 低(受限于状态机模型) |
| 自托管 | ✅ 支持 | ❌ 仅云服务 |
| 成本 | ✅ 开源免费 | ❌ 按状态转换收费 |
| 多云支持 | ✅ 支持 | ❌ 仅 AWS |
选择建议:
- 需要复杂编程逻辑:选择 Temporal
- 简单流程 + 深度集成 AWS:选择 Step Functions
19.3 Temporal vs Camunda、Zeebe
| 特性 | Temporal | Camunda/Zeebe |
|---|---|---|
| 编程模型 | 代码定义 | BPMN 图形化定义 |
| 目标用户 | 开发者 | 业务分析师 + 开发者 |
| 灵活性 | ✅ 高 | ❌ 受限于 BPMN |
| 学习曲线 | ❌ 陡峭(需要理解事件溯源) | ✅ 平缓(图形化) |
选择建议:
- 开发者主导的项目:选择 Temporal
- 需要业务分析师参与流程设计:选择 Camunda
20. 真实案例:Uber、Netflix、Box 如何使用 Temporal
20.1 Uber:支付流程编排
Uber 使用 Temporal(前身是 Cadence)来编排支付流程。
挑战:
- 支付涉及多个服务(银行卡、PayPal、Uber Cash)
- 需要保证最终一致性(不能多扣、不能少扣)
- 部分服务可能超时或失败
Temporal 解决方案:
- 每个支付流程定义为一个 Workflow
- 使用 Saga 模式实现分布式事务
- 失败自动重试,保证最终一致性
20.2 Netflix:内容转码流程
Netflix 使用 Temporal 来编排视频转码流程。
挑战:
- 视频转码是长时间运行的任务(几小时到几天)
- 需要支持断点续传(Worker 崩溃后从中断点继续)
- 需要并行处理多个视频片段
Temporal 解决方案:
- 每个视频转码任务定义为一个 Workflow
- 并行转码多个视频片段(Child Workflow)
- Worker 崩溃后,Temporal 自动调度到其他 Worker 继续执行
20.3 Box:文件处理流程
Box 使用 Temporal 来编排文件处理流程(病毒扫描、缩略图生成、元数据提取)。
挑战:
- 文件处理涉及多个步骤,每个步骤可能失败
- 需要保证文件处理的原子性(要么全部成功,要么全部失败)
- 需要支持手动审批(比如病毒扫描失败)
Temporal 解决方案:
- 每个文件处理任务定义为一个 Workflow
- 使用 Signal 实现手动审批
- 使用 Query 实现处理进度查询
21. 总结与展望
21.1 Temporal 的核心价值
简化分布式系统开发:
- 不需要手动管理状态、重试、超时
- 代码看起来是同步的、顺序的,但实际是分布式的、容错的
提高系统可靠性:
- 自动故障恢复
- 保证最终一致性
提高开发效率:
- 不需要写大量的样板代码(重试、超时、状态管理)
- 专注于业务逻辑
21.2 何时应该使用 Temporal?
适合使用 Temporal 的场景:
- ✅ 长时运行的流程(几秒到几天)
- ✅ 需要保证最终一致性的分布式事务
- ✅ 需要故障恢复(从断点继续执行)
- ✅ 需要人工审批(Signal/Query)
- ✅ 需要查询流程状态(Query)
不适合使用 Temporal 的场景:
- ❌ 简单的 CRUD 操作(直接用数据库)
- ❌ 高吞吐量、低延迟的请求(直接用 gRPC/HTTP)
- ❌ 实时流处理(用 Flink/Spark)
21.3 Temporal 的未来
- 多云支持:Temporal Cloud 已经支持 AWS、GCP、Azure
- 更多语言 SDK:已经支持 Go、Java、Python、TypeScript,未来会支持更多语言
- 更好的可观测性:Temporal Web UI 正在改进(更好的可视化、调试工具)
- 性能优化:事件历史压缩、更快的重放
参考资源
- 官方文档:https://docs.temporal.io/
- GitHub:https://github.com/temporalio/temporal
- 社区论坛:https://community.temporal.io/
- 示例代码:https://github.com/temporalio/samples-go
作者注:本文基于 Temporal v1.25.0(2026 年 6 月)编写。Temporal 是一个快速发展的开源项目,具体 API 可能会有所变化,请以官方文档为准。
文章字数:约 18000 字
标签:Temporal 工作流编排 分布式系统 持久化执行 事件溯源 Go 微服务 Saga模式 故障恢复 云原生