Temporal 深度实战:当持久化执行遇见 AI Agent 时代——从 Event History 到 Serverless Workers、Workflow Streams 与 MCP 调试的生产级完全指南(2026)
一、为什么 2026 年你必须重新认识 Temporal
如果你对 Temporal 的印象还停留在「又一个分布式工作流引擎」,那你就大错特错了。
2026 年 6 月,Temporal 在 Replay 2026 大会上扔出了四枚重磅炸弹:Serverless Workers、Standalone Activities、Workflow Streams,以及与 Google Cloud 的深度集成。与此同时,Coinbase 开源了 Temporal MCP Server,让 AI 编辑器直接读懂你的 Workflow 状态;Neo4j、Auth0、Redpanda 纷纷与 Temporal 深度整合,构建 AI Agent 的完整基础设施栈。
这不是「工作流引擎」的进化,这是持久化执行(Durable Execution)范式的成熟——在 AI Agent 大规模落地的 2026 年,Temporal 正在成为那个让 Agent 不再「失忆」、不再「崩溃后无法恢复」的核心基础设施。
本文将从 Temporal 的底层架构讲起,深入剖析 Event History、确定性重放、Workflow/Activity 分离等核心概念,然后带你实战 Replay 2026 的全新特性,最后给出生产级部署与性能调优的完整方案。
二、持久化执行:一个被严重低估的超能力
2.1 传统微服务的「失忆症」
先看一个典型场景:电商下单流程。
用户下单 → 扣库存 → 创建支付单 → 调用第三方支付 → 等待回调 → 发货 → 通知
在传统架构中,你会怎么实现?
- 方案 A:同步链式调用 —— 任何一个环节超时,整个请求失败,用户看到 500。库存扣了但支付没成功?手动补偿吧。
- 方案 B:消息队列 —— 每个步骤发个消息,下游消费。看起来解耦了,但「下单流程」这个业务概念被拆散到了 7 个 consumer 里,谁也说不出「订单 #12345 现在走到哪一步了」。
- 方案 C:状态机 + 数据库 —— 每一步更新 DB 状态。能查进度了,但超时重试、补偿逻辑全要自己写,代码量爆炸。
三种方案的共同问题:没有持久化执行。进程崩溃、Pod 重启、节点故障——执行状态就丢了。你可能加了重试,但重试的语义是什么?幂等性保证了吗?补偿事务写了没有?
2.2 Temporal 的答案:代码即状态
Temporal 的核心哲学极其简单:你的代码就是状态机,我来保证它永远不会丢失执行进度。
看一段代码:
@workflow.defn
class OrderWorkflow:
def __init__(self):
self.payment_callback = None
@workflow.run
async def run(self, order: OrderRequest) -> OrderResult:
# 第一步:扣库存
inventory = await workflow.execute_activity(
deduct_inventory,
order,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=RetryPolicy(
maximum_attempts=3,
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=10),
),
)
# 第二步:创建支付单
payment = await workflow.execute_activity(
create_payment,
inventory,
start_to_close_timeout=timedelta(seconds=30),
)
# 第三步:等待支付回调 —— 可以等几秒,也可以等几天
await workflow.wait_condition(
lambda: self.payment_callback is not None,
timeout=timedelta(days=7),
)
# 第四步:发货
shipment = await workflow.execute_activity(
ship_order,
payment,
start_to_close_timeout=timedelta(minutes=5),
)
return OrderResult(shipment_id=shipment.id)
这段代码读起来就像同步代码——但它的执行是持久化的。当 wait_condition 等待支付回调时,Worker 进程可以直接关机。三天后用户付款了,回调触发,Temporal 自动恢复执行,从 wait_condition 之后的代码继续运行。
不需要数据库存状态,不需要消息队列传回调,不需要定时任务轮询超时。代码写出来是什么样,执行就是什么样——只是永远不会「忘记」执行到哪了。
2.3 这不是魔法,是 Event History
Temporal 之所以能做到这一点,核心机制是 Event History(事件历史)。
每当 Workflow 执行一个操作(启动、调用 Activity、设置定时器、接收信号……),Temporal Server 都会向 Event History 追加一条不可变的事件记录:
1 WorkflowExecutionStarted input={order_id: "12345"}
2 ActivityTaskScheduled activity=deduct_inventory, task_queue="order"
3 ActivityTaskStarted attempt=1
4 ActivityTaskCompleted result={deducted: true, sku: "SKU-001"}
5 ActivityTaskScheduled activity=create_payment
6 ActivityTaskStarted attempt=1
7 ActivityTaskCompleted result={payment_url: "https://pay.example/abc"}
8 TimerStarted timeout=7d
9 WorkflowExecutionWaitCondition reason="waiting for payment callback"
当 Worker 崩溃重启后,Temporal 会把整个 Event History 重放(Replay)一遍。因为 Workflow 代码是确定性的——同样的输入,同样的执行路径——重放后 Worker 会精确恢复到崩溃前的状态,然后继续执行。
这就是 Temporal 的核心:用不可变的事件日志 + 确定性重放,取代传统的状态持久化。
三、架构深度剖析:Temporal 的每个组件都在干什么
3.1 整体架构
┌─────────────┐ ┌──────────────────────┐ ┌──────────────┐
│ Client │────▶│ Temporal Server │────▶│ Persistence │
│ (SDK调用) │ │ (History Service) │ │ (Cassandra/ │
└─────────────┘ │ (Matching Service) │ │ PostgreSQL/ │
│ (Frontend Service) │ │ MySQL) │
└──────────┬───────────┘ └──────────────┘
│
┌──────────▼───────────┐
│ Task Queue │
│ (长轮询分发) │
└──────────┬───────────┘
│
┌──────────▼───────────┐
│ Worker Pool │
│ (Workflow Worker) │
│ (Activity Worker) │
└──────────────────────┘
Temporal Server 由三个内部服务组成:
- Frontend Service:API 网关,处理所有 gRPC 请求,鉴权、限流
- History Service:核心引擎,管理 Workflow 的 Event History,驱动状态机
- Matching Service:任务调度器,把 Activity Task 和 Workflow Task 分发给 Worker
这三个服务通过 Persistence Layer(Cassandra / PostgreSQL / MySQL)共享状态。Worker 通过**长轮询(Long Polling)**从 Matching Service 获取任务——不需要注册中心,不需要服务发现,Worker 启动后自动开始拉取任务。
3.2 Workflow 与 Activity 的本质区别
这是初学者最容易混淆的概念,但理解它至关重要。
Workflow = 确定性编排逻辑
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self, input_data):
# ✅ 允许:调用 Activity、设置定时器、等待信号
result = await workflow.execute_activity(my_activity, input_data)
# ❌ 禁止:直接发 HTTP 请求
# requests.get("https://api.example.com") # 绝对不能这样做!
# ❌ 禁止:直接操作数据库
# db.query("INSERT INTO ...") # 也绝对不行!
# ❌ 禁止:使用随机数、当前时间等非确定性操作
# random.randint(0, 100) # 重放结果会不一致!
return result
为什么?因为 Workflow 会被重放。每次重放时,random.randint 会返回不同的值,datetime.now() 也是不同的时间——这会导致重放后的状态与原始执行不一致,Workflow 就「迷路」了。
Activity = 真实的副作用
@activity.defn
async def my_activity(input_data):
# ✅ 在这里可以做任何事
response = requests.post("https://api.example.com/process", json=input_data)
db.execute("INSERT INTO results VALUES (?)", (response.json(),))
return response.json()
Activity 执行真实的副作用:HTTP 调用、数据库写入、文件操作……Activity 的结果会被记录在 Event History 中,重放时不会重新执行 Activity,而是直接从 History 中读取之前的结果。
这个设计的精妙之处:Workflow 代码只描述「做什么」,Activity 代码实际「做」——两者分离,重放才有意义。
3.3 确定性约束的完整清单
| 约束 | 原因 | 解决方案 |
|---|---|---|
| 不能调用随机函数 | 重放结果不一致 | 使用 workflow.random() |
| 不能获取当前时间 | 重放时间不一致 | 使用 workflow.now() |
| 不能直接做 I/O | 重放时重复执行 | 使用 Activity |
| 不能使用全局可变状态 | 重放时状态不一致 | 使用 Workflow 状态字段 |
| 不能用多线程 | 调度顺序不确定 | 使用 asyncio.gather() |
| 迭代 Map 顺序不确定 | Python 3.7+ dict 有序 | Go map 需要排序后迭代 |
3.4 Signal 与 Query:Workflow 的「双向通信」
Activity 是单向的(Workflow → 外部世界),但很多时候你需要从外部向 Workflow 传递信息:
Signal(信号):异步写入,不等待返回
@workflow.defn
class ApprovalWorkflow:
def __init__(self):
self.approved = False
self.rejected = False
@workflow.signal
def approve(self):
self.approved = True
@workflow.signal
def reject(self, reason: str):
self.rejected = True
@workflow.run
async def run(self, request: ApprovalRequest) -> ApprovalResult:
# 等待审批信号,最长等 30 天
await workflow.wait_condition(
lambda: self.approved or self.rejected,
timeout=timedelta(days=30),
)
if self.approved:
return ApprovalResult(status="approved")
else:
return ApprovalResult(status="rejected")
外部系统可以这样发信号:
# 审批人点击「通过」
handle = client.get_workflow_handle(workflow_id="approval-123")
handle.signal(ApprovalWorkflow.approve)
Query(查询):同步读取,不修改状态
@workflow.defn
class OrderWorkflow:
def __init__(self):
self.current_step = "created"
@workflow.query
def get_status(self) -> str:
return self.current_step
@workflow.run
async def run(self, order: OrderRequest):
self.current_step = "deducting_inventory"
# ...
self.current_step = "waiting_payment"
# ...
Query 不会产生 Event,也不会被记录在 History 中——它是纯只读的。非常适合用来做状态面板。
四、Replay 2026 四大新特性深度解析
4.1 Serverless Workers:不用管 Worker 了
之前,你需要自己运维 Worker 进程:部署、扩容、监控、处理 OOM。Replay 2026 发布的 Serverless Workers 彻底消除了这个负担。
工作原理:
你的 Workflow 代码 → 上传到 Temporal Cloud → Serverless Workers 自动执行
你只需要写 Workflow 和 Activity 的代码,打包上传,Temporal Cloud 负责运行 Worker。自动扩缩容、零冷启动(与传统的 Serverless 不同,Temporal 的 Worker 是常驻的 Warm Pool)、按执行时间计费。
适用场景:
- 中小规模工作流(< 1000 QPS)
- 快速原型验证
- 不想维护 Worker 基础设施
不适用场景:
- 需要访问内网资源的 Activity(VPC 对等连接尚未 GA)
- 极低延迟要求(< 50ms P99)
- 自定义 Worker 中间件
实战配置:
# temporal-serverless.yaml
apiVersion: temporal.io/v1
kind: ServerlessWorker
metadata:
name: order-workflow
spec:
workflowPackage: ./workflows/
runtime: python3.12
taskQueue: order-processing
activities:
- name: deduct_inventory
timeout: 30s
retryPolicy:
maxAttempts: 3
- name: create_payment
timeout: 30s
scaling:
minWorkers: 2
maxWorkers: 50
targetConcurrentActivities: 100
4.2 Standalone Activities:Activity 独立部署
以前,Activity 必须和 Worker 部署在一起——如果你的 Activity 需要访问特定数据库或内部 API,整个 Worker 都得部署在那个网络环境里。
Standalone Activities 打破了这个限制:
Workflow Worker (Temporal Cloud) → Task Queue → Activity Worker (你的 VPC 内)
Activity 可以独立注册到 Task Queue,部署在你自己的基础设施上。Workflow 代码不变,只是 Activity 的执行位置变了。
# activity_worker.py — 部署在你的 VPC 内
async def main():
client = await Client.connect("temporal.cloud:7233", namespace="production")
worker = Worker(
client,
task_queue="order-activities", # 专用 Activity 队列
activities=[deduct_inventory, create_payment, ship_order],
)
await worker.run()
# workflow.py — 运行在 Serverless Workers 上
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, order: OrderRequest):
# 这个 Activity 实际运行在你 VPC 内的 Worker 上
result = await workflow.execute_activity(
deduct_inventory,
order,
task_queue="order-activities", # 指定 Activity 所在的队列
start_to_close_timeout=timedelta(seconds=30),
)
这给架构设计带来了极大的灵活性:Workflow 的编排逻辑无状态、可 Serverless;Activity 的业务逻辑有状态、可内网部署。两者解耦,各司其职。
4.3 Workflow Streams:实时事件流
以前,要从外部观察 Workflow 的执行进度,只能轮询 Query 或监听 Signal——又慢又笨。Workflow Streams 让你可以实时订阅 Workflow 的事件流:
# 订阅 Workflow 的实时事件
async with client.workflow_stream(workflow_id="order-123") as stream:
async for event in stream:
match event.type:
case "ActivityTaskCompleted":
print(f"Activity {event.activity_type} completed: {event.result}")
case "TimerFired":
print(f"Timer fired: {event.timer_id}")
case "WorkflowExecutionCompleted":
print(f"Workflow completed: {event.result}")
break
与 Kafka/Redpanda 集成:
Temporal 原生支持将 Workflow Streams 导出到 Kafka 或 Redpanda,实现实时分析和监控:
# temporal-streams-export.yaml
export:
type: kafka
bootstrapServers: "redpanda:9092"
topic: "temporal.workflow.events"
format: protobuf
filters:
namespaces: ["production"]
workflowTypes: ["OrderWorkflow", "RefundWorkflow"]
eventTypes: ["ActivityTaskCompleted", "WorkflowExecutionCompleted", "WorkflowExecutionFailed"]
这意味着你可以用 ClickHouse 做实时看板,用 Flink 做流处理,用 Grafana 做告警——Workflow 的每一次状态变更都实时可观测。
4.4 Google Cloud 集成
Replay 2026 宣布了与 Google Cloud 的深度集成:
- Cloud Spanner 作为 Persistence Layer 的新选项(全球分布式一致性,适合多区域部署)
- Cloud Run 托管 Worker(配合 Serverless Workers 的混合模式)
- Pub/Sub 作为 Workflow Streams 的原生 sink
- Cloud Logging & Cloud Monitoring 原生集成
# GCP 部署示例
apiVersion: temporal.io/v1
kind: TemporalCluster
spec:
persistence:
store: spanner
spanner:
instanceId: temporal-prod
databaseId: temporal-main
workers:
type: cloudrun
cloudrun:
service: temporal-worker
region: us-central1
minInstances: 1
maxInstances: 100
五、Temporal × AI Agent:为什么 AI Agent 天然需要持久化执行
5.1 AI Agent 的「冰山之下」
Temporal 官方与 Neo4j、Auth0、Redpanda 联合发布了一个 Deep Research Agent Demo,完美揭示了 AI Agent 的基础设施需求:
用户看到的只是聊天界面和生成的报告——但冰山之下是这样的架构:
用户提问
↓
Triage Agent(判断是否需要追问)
↓
Clarifying Agent(生成追问问题)→ 等待用户回答(可能几分钟,也可能几天)
↓
Planner Agent(生成搜索计划)
↓
Search Agent × N(并行搜索多个来源)
↓
Writer Agent(综合生成报告)
↓
PDF Generator Agent(生成最终文档)
7 个 Agent,2-3 分钟执行时间,涉及数十次 LLM 调用和 API 请求。如果中途崩溃——没有 Temporal 的话,所有进度丢失,从头来过。有了 Temporal,崩溃后自动恢复到上次的位置。
5.2 实战:用 Temporal 编排多 Agent 工作流
from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy
with workflow.unsafe.imports_passed_through():
from activities import (
triage_agent,
clarifying_agent,
planner_agent,
search_agent,
writer_agent,
pdf_generator,
)
@workflow.defn
class DeepResearchWorkflow:
def __init__(self):
self.clarification_answers = None
@workflow.signal
def submit_clarifications(self, answers: dict):
self.clarification_answers = answers
@workflow.run
async def run(self, query: str) -> dict:
# Step 1: 判断是否需要追问
triage = await workflow.execute_activity(
triage_agent,
query,
start_to_close_timeout=timedelta(minutes=2),
)
# Step 2: 如果需要追问,生成问题并等待用户回答
if triage.needs_clarification:
questions = await workflow.execute_activity(
clarifying_agent,
triage.clarification_context,
start_to_close_timeout=timedelta(minutes=1),
)
# 等待用户回答 —— 可以等几个小时
await workflow.wait_condition(
lambda: self.clarification_answers is not None,
timeout=timedelta(hours=24),
)
refined_query = f"{query}\nAdditional context: {self.clarification_answers}"
else:
refined_query = query
# Step 3: 生成搜索计划
search_plan = await workflow.execute_activity(
planner_agent,
refined_query,
start_to_close_timeout=timedelta(minutes=2),
)
# Step 4: 并行搜索 —— 每个搜索都是独立的 Activity
import asyncio
search_results = await asyncio.gather(*[
workflow.execute_activity(
search_agent,
search_query,
start_to_close_timeout=timedelta(minutes=2),
retry_policy=RetryPolicy(
maximum_attempts=3,
initial_interval=timedelta(seconds=5),
),
)
for search_query in search_plan.queries
])
# Step 5: 综合生成报告
report = await workflow.execute_activity(
writer_agent,
{"query": refined_query, "search_results": search_results},
start_to_close_timeout=timedelta(minutes=5),
)
# Step 6: 生成 PDF
pdf = await workflow.execute_activity(
pdf_generator,
report,
start_to_close_timeout=timedelta(minutes=2),
)
return {"report": report, "pdf_url": pdf.url}
关键设计点:
- Human-in-the-loop:
wait_condition+signal组合,让 Workflow 可以等待用户输入,不限时长 - 并行搜索:
asyncio.gather同时发起多个搜索 Activity,充分利用 I/O 并发 - 自动重试:LLM 调用容易失败(超时、限流),RetryPolicy 自动处理
- 持久化:即使 Worker 在 Step 4 执行到一半崩溃,重启后自动从 Event History 恢复
5.3 Coinbase 的 Temporal MCP Server:AI 编辑器直接调试 Workflow
Coinbase 的工程师做了一个了不起的项目:Temporal MCP Server——让 AI 编辑器(如 Cursor、Claude Code)直接读取 Temporal 的运行时状态。
# Coinbase 开源的 Temporal MCP Server 核心代码
from fastmcp import FastMCP
mcp = FastMCP("temporal-mcp")
@mcp.tool()
async def list_workflows(
namespace: str,
account: str,
query: str | None = None,
limit: int = 10,
) -> str:
"""List Workflow Executions in a Temporal Namespace."""
client = await get_client(account)
workflows = []
async for wf in client.list_workflows(
namespace=namespace,
query=query,
page_size=limit,
):
workflows.append({
"id": wf.id,
"run_id": wf.run_id,
"status": wf.status.name,
"start_time": str(wf.start_time),
"close_time": str(wf.close_time) if wf.close_time else None,
})
return json.dumps(workflows, indent=2)
@mcp.tool()
async def get_workflow_history(
namespace: str,
workflow_id: str,
run_id: str | None = None,
account: str = "default",
) -> str:
"""Get the full Event History for a Workflow Execution."""
client = await get_client(account)
handle = client.get_workflow_handle(workflow_id, run_id)
events = []
async for event in handle.fetch_history():
events.append({
"event_id": event.event_id,
"event_type": event.event_type.name,
"timestamp": str(event.event_time),
# 精简输出,避免 token 爆炸
})
return json.dumps(events, indent=2)
使用方式——在 Cursor 里直接用自然语言:
你: "列出 payments namespace 里最近失败的 Workflow"
AI: 调用 list_workflows(namespace="payments", query="ExecutionStatus='Failed'")
→ 返回最近的 5 个失败 Workflow,附上 Run ID 和失败时间
你: "看看第一个为什么失败了"
AI: 调用 get_workflow_history(namespace="payments", workflow_id="pay-78901")
→ 定位到 Event #23: ActivityTaskFailed
→ 原因:deduct_inventory 超时(30s timeout exceeded)
→ 结合代码搜索,找到 deduct_inventory 函数的实现
→ 建议:该 Activity 内部调用的库存服务 P99 延迟最近飙升至 45s
Coinbase 的数据:两个月内 14,000+ 次 MCP 调用,其中 list_workflows 和 get_workflow_history 是使用最频繁的两个工具。这说明工程师真的在用它替代 Temporal Web UI。
六、Go SDK 实战:电商订单工作流
Python 适合 AI Agent 场景,但在高吞吐的后端服务中,Go 才是主力。下面用一个完整的电商订单工作流展示 Go SDK 的用法。
6.1 Workflow 定义
package workflow
import (
"time"
"go.temporal.io/sdk/workflow"
)
type OrderRequest struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
SKU string `json:"sku"`
Quantity int `json:"quantity"`
TotalPrice float64 `json:"total_price"`
}
type OrderResult struct {
ShipmentID string `json:"shipment_id"`
TrackingNo string `json:"tracking_no"`
}
type OrderWorkflow struct {
paymentCallback *PaymentCallback
cancelReason string
}
func (w *OrderWorkflow) SubmitPaymentCallback(ctx workflow.Context, callback PaymentCallback) {
w.paymentCallback = &callback
}
func (w *OrderWorkflow) CancelOrder(ctx workflow.Context, reason string) {
w.cancelReason = reason
}
func (w *OrderWorkflow) Execute(ctx workflow.Context, req OrderRequest) (OrderResult, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &workflow.RetryPolicy{
MaximumAttempts: 3,
InitialInterval: 1 * time.Second,
MaximumInterval: 10 * time.Second,
BackoffCoefficient: 2.0,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// Step 1: 扣库存
var inventoryResult DeductInventoryResult
err := workflow.ExecuteActivity(ctx, DeductInventory, req).Get(ctx, &inventoryResult)
if err != nil {
return OrderResult{}, err
}
// Step 2: 创建支付单
var paymentResult CreatePaymentResult
err = workflow.ExecuteActivity(ctx, CreatePayment, inventoryResult).Get(ctx, &paymentResult)
if err != nil {
// 支付创建失败,补偿:归还库存
_ = workflow.ExecuteActivity(ctx, RestoreInventory, req).Get(ctx, nil)
return OrderResult{}, err
}
// Step 3: 等待支付回调或取消
workflow.await(ctx, func() bool {
return w.paymentCallback != nil || w.cancelReason != ""
})
if w.cancelReason != "" {
// 用户取消,归还库存
_ = workflow.ExecuteActivity(ctx, RestoreInventory, req).Get(ctx, nil)
_ = workflow.ExecuteActivity(ctx, CancelPayment, paymentResult.PaymentID).Get(ctx, nil)
return OrderResult{}, fmt.Errorf("order cancelled: %s", w.cancelReason)
}
// Step 4: 发货
shipAo := workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute, // 发货可能较慢
}
shipCtx := workflow.WithActivityOptions(ctx, shipAo)
var shipmentResult ShipOrderResult
err = workflow.ExecuteActivity(shipCtx, ShipOrder, paymentResult).Get(shipCtx, &shipmentResult)
if err != nil {
return OrderResult{}, err
}
// Step 5: 发送通知
_ = workflow.ExecuteActivity(ctx, SendNotification, req.UserID, shipmentResult).Get(ctx, nil)
return OrderResult{
ShipmentID: shipmentResult.ID,
TrackingNo: shipmentResult.TrackingNo,
}, nil
}
6.2 Activity 实现
package activities
import (
"context"
"database/sql"
"fmt"
"net/http"
"go.temporal.io/sdk/activity"
)
func DeductInventory(ctx context.Context, req OrderRequest) (DeductInventoryResult, error) {
logger := activity.GetLogger(ctx)
logger.Info("Deducting inventory", "order_id", req.OrderID, "sku", req.SKU)
// 获取心跳信息,支持优雅中断
heartbeatDetails := activity.GetHeartbeatDetails(ctx)
if heartbeatDetails != nil {
logger.Info("Resuming from heartbeat", "details", heartbeatDetails)
}
// 调用库存服务
resp, err := http.Post(
"http://inventory-service:8080/deduct",
"application/json",
bytes.NewReader(marshal(req)),
)
if err != nil {
return DeductInventoryResult{}, fmt.Errorf("inventory service unavailable: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return DeductInventoryResult{}, fmt.Errorf("deduction failed: status %d", resp.StatusCode)
}
// 发送心跳,记录进度
activity.RecordHeartbeat(ctx, req.OrderID)
return DeductInventoryResult{Success: true, SKU: req.SKU}, nil
}
func CreatePayment(ctx context.Context, inv DeductInventoryResult) (CreatePaymentResult, error) {
// 调用支付服务...
}
func ShipOrder(ctx context.Context, payment CreatePaymentResult) (ShipOrderResult, error) {
// 调用物流服务...
// 长时间运行的 Activity,需要定期心跳
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
activity.RecordHeartbeat(ctx, "shipping in progress")
// ... 实际发货逻辑
}
}
}
func RestoreInventory(ctx context.Context, req OrderRequest) error {
// 补偿事务:归还库存
}
func CancelPayment(ctx context.Context, paymentID string) error {
// 补偿事务:取消支付
}
func SendNotification(ctx context.Context, userID string, result ShipOrderResult) error {
// 发送通知...
}
6.3 Worker 启动
package main
import (
"log"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
func main() {
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
Namespace: "default",
})
if err != nil {
log.Fatalln("Unable to create Temporal client", err)
}
defer c.Close()
w := worker.New(c, "order-task-queue", worker.Options{
MaxConcurrentWorkflowTaskExecutionSize: 100,
MaxConcurrentActivityExecutionSize: 500,
MaxConcurrentLocalActivityExecutionSize: 200,
WorkflowPanicPolicy: worker.FailWorkflow,
})
w.RegisterWorkflow(OrderWorkflow.Execute)
w.RegisterActivity(DeductInventory)
w.RegisterActivity(CreatePayment)
w.RegisterActivity(ShipOrder)
w.RegisterActivity(RestoreInventory)
w.RegisterActivity(CancelPayment)
w.RegisterActivity(SendNotification)
if err := w.Run(worker.InterruptCh()); err != nil {
log.Fatalln("Unable to start worker", err)
}
}
6.4 启动工作流
package main
import (
"context"
"log"
"go.temporal.io/sdk/client"
)
func main() {
c, err := client.Dial(client.Options{HostPort: "localhost:7233"})
if err != nil {
log.Fatalln(err)
}
defer c.Close()
workflowID := "order-20260617-001"
options := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: "order-task-queue",
WorkflowExecutionTimeout: 7 * 24 * time.Hour, // 订单最长 7 天
WorkflowTaskTimeout: 10 * time.Second,
CronSchedule: "", // 非定时任务
}
we, err := c.ExecuteWorkflow(context.Background(), options, OrderWorkflow.Execute, OrderRequest{
OrderID: "20260617-001",
UserID: "user-42",
SKU: "SKU-001",
Quantity: 1,
TotalPrice: 299.99,
})
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Printf("Workflow started: %s, RunID: %s", we.GetID(), we.GetRunID())
// 模拟支付回调
time.Sleep(5 * time.Second)
c.SignalWorkflow(context.Background(), workflowID, "", "SubmitPaymentCallback", PaymentCallback{
PaymentID: "pay-123",
Status: "success",
})
}
七、性能优化与生产级调优
7.1 History 膨胀问题与 Continue-As-New
Workflow 的 Event History 会持续增长。一个运行数月的 Workflow,History 可能包含数十万条事件,导致重放变慢、内存占用增加。
解决方案:Continue-As-New——在当前 Workflow 执行完毕前,自动启动一个新的 Workflow Run,把状态传递过去:
@workflow.defn
class LongRunningMonitor:
@workflow.run
async def run(self, config: MonitorConfig):
while True:
# 每分钟检查一次
result = await workflow.execute_activity(
check_health,
config,
start_to_close_timeout=timedelta(seconds=30),
)
if result.unhealthy:
await workflow.execute_activity(send_alert, result)
await workflow.sleep(timedelta(minutes=1))
# History 超过 10000 条事件时,Continue-As-New
if workflow.info().history_length > 10000:
# 把当前状态传给新的 Run
workflow.continue_as_new(config)
最佳实践:
- 设置
workflow_execution_timeout防止无限运行 - History 超过 10,000 条事件时触发 Continue-As-New
- 对于真正需要长期运行的 Workflow,考虑 Child Workflow 分拆
7.2 Worker 性能参数
w := worker.New(c, "task-queue", worker.Options{
// Workflow Task 并发数:取决于 Workflow 逻辑复杂度
// CPU 密集型(大量重放):设为 CPU 核心数
// I/O 密集型(大量 await):可以更高
MaxConcurrentWorkflowTaskExecutionSize: 100,
// Activity 并发数:取决于下游服务承受能力
// 如果 Activity 调用外部 API,注意限流
MaxConcurrentActivityExecutionSize: 500,
// Activity Task 心跳超时
// 长时间 Activity 必须定期 RecordHeartbeat
ActivityHeartbeatTimeout: 30 * time.Second,
// 本地 Activity(与 Workflow 同进程执行,不经过 Matching Service)
// 适合轻量操作(如数据库查询),减少网络开销
MaxConcurrentLocalActivityExecutionSize: 200,
})
7.3 Persistence Layer 选型
| 数据库 | 优势 | 劣势 | 推荐场景 |
|---|---|---|---|
| PostgreSQL | 运维简单,生态丰富 | 单节点写入瓶颈 | 中小规模(< 500 QPS) |
| Cassandra | 高写入吞吐,线性扩展 | 运维复杂,最终一致 | 大规模(> 1000 QPS) |
| MySQL | 团队熟悉度高 | 与 PG 类似的瓶颈 | 已有 MySQL 基础设施 |
| Spanner | 全球分布式一致性 | 贵 | 多区域部署 |
| SQLite | 零依赖,嵌入式 | 不支持集群 | 开发/测试 |
7.4 监控指标
Temporal 暴露了 Prometheus 格式的指标,关键看板:
# 以下指标建议在 Grafana 中配置告警
# Workflow 执行延迟 P99
temporal_workflow_task_execution_latency{namespace="production"} > 1s
# Activity 执行失败率
rate(temporal_activity_execution_failed{namespace="production"}[5m])
/ rate(temporal_activity_execution_completed{namespace="production"}[5m]) > 0.05
# Task Queue 积压
temporal_task_queue_schedule_to_start_latency{namespace="production"} > 5s
# History 长度(防止膨胀)
temporal_workflow_history_length{namespace="production"} > 50000
# Worker 活跃数
temporal_worker_task_slots_available{namespace="production"} < 5
八、Temporal vs 其他工作流引擎:2026 年度对比
| 维度 | Temporal | Apache Airflow | Camunda 8 | Netflix Conductor | Inngest |
|---|---|---|---|---|---|
| 执行模型 | 持久化执行 | DAG 调度 | BPMN 流程 | DAG 调度 | 事件驱动 |
| 代码方式 | 原生代码 | Python DSL | BPMN XML + 代码 | JSON DSL | 原生代码 |
| 长时间运行 | ✅ 天/周/月 | ❌ 小时级 | ✅ 支持 | ✅ 支持 | ✅ 支持 |
| Human-in-the-loop | ✅ Signal + Wait | ❌ 不原生支持 | ✅ User Task | ✅ 支持 | ✅ 支持 |
| AI Agent 编排 | ✅ 原生适配 | ❌ 不适合 | ⚠️ 需要适配 | ⚠️ 需要适配 | ✅ 适配 |
| 多语言 SDK | Go/Python/TS/Java/PHP | Python 优先 | Java 优先 | Java/Python | TS/Python/Go |
| 自托管 | ✅ | ✅ | ✅ | ✅ | ❌ 仅 Cloud |
| Cloud 托管 | ✅ Temporal Cloud | ✅ Astronomer | ✅ Camunda SaaS | ✅ Orkes | ✅ Inngest Cloud |
| 社区规模 | 52K+ GitHub Stars | 42K+ | 3.5K+ | 13K+ | 10K+ |
选型建议:
- 数据管道 / ETL → Airflow(生态成熟,Operator 丰富)
- 企业审批流 / BPMN → Camunda(可视化流程设计,合规友好)
- 微服务编排 / AI Agent → Temporal(持久化执行,代码优先,多语言)
- 轻量级事件驱动 → Inngest(上手快,但功能有限)
九、Child Workflow 与 Saga 模式
9.1 Child Workflow:组合与隔离
当主 Workflow 变得过于复杂时,可以用 Child Workflow 拆分:
@workflow.defn
class TripBookingWorkflow:
@workflow.run
async def run(self, trip: TripRequest) -> TripResult:
# 并行预订航班、酒店、租车
import asyncio
flight, hotel, car = await asyncio.gather(
workflow.execute_child_workflow(
BookFlightWorkflow.run,
trip.flight,
workflow_id=f"flight-{trip.id}",
),
workflow.execute_child_workflow(
BookHotelWorkflow.run,
trip.hotel,
workflow_id=f"hotel-{trip.id}",
),
workflow.execute_child_workflow(
BookCarWorkflow.run,
trip.car,
workflow_id=f"car-{trip.id}",
),
)
return TripResult(flight=flight, hotel=hotel, car=car)
Child Workflow 的优势:
- History 隔离:每个 Child 有独立的 Event History,不会膨胀 Parent
- 独立重试:Child 失败后可以独立重试,不影响 Parent 的其他 Child
- 权限隔离:不同的 Child 可以由不同的 Worker 执行
9.2 Saga 模式:分布式事务补偿
上面的 TripBooking 有个问题:如果航班和酒店都订好了,但租车失败了怎么办?需要补偿——取消航班和酒店。
Temporal 的 Saga 模式:
@workflow.defn
class TripBookingSagaWorkflow:
@workflow.run
async def run(self, trip: TripRequest) -> TripResult:
# 编排 + 补偿函数的对应关系
compensations = []
try:
# 预订航班
flight = await workflow.execute_child_workflow(
BookFlightWorkflow.run, trip.flight
)
compensations.append(lambda: workflow.execute_child_workflow(
CancelFlightWorkflow.run, flight
))
# 预订酒店
hotel = await workflow.execute_child_workflow(
BookHotelWorkflow.run, trip.hotel
)
compensations.append(lambda: workflow.execute_child_workflow(
CancelHotelWorkflow.run, hotel
))
# 预订租车
car = await workflow.execute_child_workflow(
BookCarWorkflow.run, trip.car
)
compensations.append(lambda: workflow.execute_child_workflow(
CancelCarWorkflow.run, car
))
return TripResult(flight=flight, hotel=hotel, car=car)
except Exception:
# 按逆序执行补偿
for compensation in reversed(compensations):
try:
await compensation()
except Exception as e:
# 补偿失败需要人工介入
await workflow.execute_activity(
alert_manual_intervention,
{"error": str(e), "trip_id": trip.id},
)
raise
十、安全与多租户
10.1 Namespace 隔离
Temporal 用 Namespace 实现多租户隔离:
# 创建 Namespace
temporal operator namespace create \
--namespace production \
--retention 30d \
--description "Production workloads"
temporal operator namespace create \
--namespace staging \
--retention 7d \
--description "Staging workloads"
每个 Namespace 有独立的:
- Task Queue
- Workflow ID 空间
- 搜索属性
- 数据保留策略
10.2 Payload Codec:数据加密
如果 Event History 中包含敏感信息(PII、密钥),可以用 Payload Codec 加密:
from temporalio.converter import PayloadCodec, Payload
from cryptography.fernet import Fernet
class EncryptionCodec(PayloadCodec):
def __init__(self, key: bytes):
self._fernet = Fernet(key)
async def encode(self, payloads: list[Payload]) -> list[Payload]:
return [
Payload(
metadata={"encoding": b"binary/encrypted"},
data=self._fernet.encrypt(p.data),
)
for p in payloads
]
async def decode(self, payloads: list[Payload]) -> list[Payload]:
result = []
for p in payloads:
if p.metadata.get("encoding") == b"binary/encrypted":
result.append(Payload(
metadata={"encoding": b"json/plain"},
data=self._fernet.decrypt(p.data),
))
else:
result.append(p)
return result
# 注册 Codec
client = await Client.connect(
"localhost:7233",
data_converter=dataclasses.replace(
data_converter,
payload_codec=EncryptionCodec(key),
),
)
Coinbase 的 MCP Server 就是用 Codec 来解码 Payload 的——在 AI 编辑器中看到的是明文,但存储在 Temporal 中的是密文。
10.3 mTLS 与 Authorization
生产环境推荐 mTLS:
# temporal-server-mtls.yaml
global:
mtls:
internode:
enabled: true
certFile: /certs/server.crt
keyFile: /certs/server.key
caFile: /certs/ca.crt
frontend:
enabled: true
certFile: /certs/server.crt
keyFile: /certs/server.key
caFile: /certs/ca.crt
authorization:
jwtKeySet:
uri: "https://auth.example.com/.well-known/jwks.json"
permissionsClaimName: "permissions"
十一、从零搭建生产级 Temporal 集群
11.1 Kubernetes 部署
# temporal-cluster.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: temporal-server
spec:
replicas: 3
selector:
matchLabels:
app: temporal-server
template:
spec:
containers:
- name: temporal
image: temporalio/server:1.26.0
ports:
- containerPort: 7233 # gRPC (Frontend)
- containerPort: 9090 # Metrics (Prometheus)
env:
- name: DB
value: "postgresql"
- name: POSTGRES_SEEDS
value: "postgres-cluster.default.svc.cluster.local:5432"
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: temporal-db-creds
key: username
- name: POSTGRES_PWD
valueFrom:
secretKeyRef:
name: temporal-db-creds
key: password
- name: ENABLE_ES
value: "true" # Elasticsearch for Advanced Visibility
- name: ES_SEEDS
value: "elasticsearch.default.svc.cluster.local:9200"
resources:
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "4"
memory: "8Gi"
livenessProbe:
grpc:
port: 7233
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
grpc:
port: 7233
initialDelaySeconds: 10
periodSeconds: 5
11.2 高可用考量
- Server:3+ 节点,每个 History Service 负责一部分 Workflow(分片)
- 数据库:PostgreSQL 使用 Patroni 或 Cloud SQL HA;Cassandra 使用 3 DC 部署
- Worker:至少 2 个副本,分布在不同可用区
- Elasticsearch:3 节点集群,用于高级可见性查询
11.3 Temporal Cloud vs 自托管
| 维度 | Temporal Cloud | 自托管 |
|---|---|---|
| 运维成本 | 零 | 高(DB + ES + Server + Worker) |
| 初始费用 | $0.04/1000 Workflow Task | 服务器成本 |
| SLA | 99.9% | 自行保障 |
| 定制性 | 有限 | 完全可控 |
| 合规 | SOC2 Type II | 自行审计 |
| 推荐 | < 10M tasks/month | > 50M tasks/month |
十二、常见陷阱与排障指南
12.1 Workflow 确定性违规
症状:NonDeterministicError,Workflow 执行到一半报错。
原因:Workflow 代码中包含了非确定性操作,导致重放时行为不一致。
排查:
# 查看 Workflow History 中是否有异常
temporal workflow show \
--workflow-id my-workflow \
--run-id <run-id> \
--output json | jq '.events[] | select(.eventType == "WORKFLOW_TASK_FAILED")'
常见违规:
# ❌ 错误:使用 Python 内置 random
import random
value = random.randint(0, 100)
# ✅ 正确:使用 Temporal 提供的确定性随机
value = workflow.random().randint(0, 100)
# ❌ 错误:获取当前时间
now = datetime.now()
# ✅ 正确:使用 Temporal 提供的时间
now = workflow.now()
# ❌ 错误:遍历 Go map(顺序不确定)
for k, v := range myMap { ... }
# ✅ 正确:排序后遍历
keys := make([]string, 0, len(myMap))
for k := range myMap { keys = append(keys, k) }
sort.Strings(keys)
for _, k := range keys { v := myMap[k]; ... }
12.2 Activity 幂等性
Activity 可能被重试,因此必须幂等:
@activity.defn
async def create_payment(order: OrderRequest) -> PaymentResult:
# ❌ 非幂等:每次调用都会创建新支付单
# return payment_api.create(order)
# ✅ 幂等:基于 order_id 去重
existing = payment_api.get_by_order_id(order.order_id)
if existing:
return existing # 已经创建过,直接返回
return payment_api.create(order)
实现幂等性的几种方式:
- 自然幂等:GET 请求、读数据库(天然幂等)
- 唯一键去重:基于业务 ID 检查是否已处理
- Token 机制:下游服务提供 idempotency key
- 数据库约束:UNIQUE 约束防止重复插入
12.3 Workflow Task 超时
症状:WorkflowTaskTimedOut,Workflow 看起来卡住了。
原因:Worker 没有及时处理 Workflow Task(通常是因为 Worker 全部下线或 Task Queue 名称错误)。
排查:
# 检查 Task Queue 的 Worker 数量
temporal task-queue describe \
--task-queue order-task-queue \
--namespace production
确保 PollerCount > 0。如果为 0,说明没有 Worker 在监听这个 Task Queue。
十三、总结与展望
13.1 Temporal 的核心价值
| 场景 | 不用 Temporal | 用 Temporal |
|---|---|---|
| 长时间业务流程 | 状态机 + DB + 定时任务 | 一段代码 |
| 故障恢复 | 手动补偿 + 运维脚本 | 自动重放恢复 |
| 人机交互 | Webhook + 轮询 | Signal + Wait |
| 并行任务 | asyncio.gather 还是自己写? | Activity 并行 + 自动聚合 |
| 可观测性 | 自己埋点 + APM | Event History + UI |
| AI Agent 编排 | 脆弱、不可恢复 | 持久化、可恢复、可观测 |
13.2 2026 年下半年展望
Replay 2026 发布的新特性预示着 Temporal 的演进方向:
- Serverless Workers GA:降低运维门槛,让更多团队用起来
- Workflow Streams 生态:与 Kafka/Pub/Sub/Redpanda 深度整合,成为实时事件中枢
- AI Agent 原生支持:与 MCP、知识图谱、向量数据库的集成会更加紧密
- 多区域部署:基于 Spanner 的全球分布式 Persistence Layer
- 可视化编排:代码优先不会变,但会提供可视化工具辅助理解复杂 Workflow
13.3 什么时候该用 Temporal?
该用:
- 业务流程有多个步骤,且步骤之间有依赖或等待
- 需要人工介入(审批、确认)
- 流程可能运行很长时间(分钟、小时、天)
- 故障后需要自动恢复,不能丢失进度
- 编排 AI Agent 的多轮交互
不该用:
- 纯 CRUD 操作(直接用框架就行了)
- 毫秒级实时响应(Temporal 的延迟在 10ms+ 级别)
- 一次性批量任务(用 Airflow 或 Spark)
- 简单的异步消息传递(用 Kafka/RabbitMQ 足矣)
Temporal 不是银弹,但在「需要可靠执行的复杂业务流程」这个领域,它是 2026 年最好的选择。Serverless Workers、Standalone Activities、Workflow Streams 这三大新特性,让它的上手门槛进一步降低,适用场景进一步拓宽。如果你还没用过 Temporal,现在就是最好的时机。