编程 Temporal 深度实战:当持久化执行遇见 AI Agent 时代——从 Event History 到 Serverless Workers、Workflow Streams 与 MCP 调试的生产级完全指南(2026)

2026-06-17 07:29:34 +0800 CST views 8

Temporal 深度实战:当持久化执行遇见 AI Agent 时代——从 Event History 到 Serverless Workers、Workflow Streams 与 MCP 调试的生产级完全指南(2026)

一、为什么 2026 年你必须重新认识 Temporal

如果你对 Temporal 的印象还停留在「又一个分布式工作流引擎」,那你就大错特错了。

2026 年 6 月,Temporal 在 Replay 2026 大会上扔出了四枚重磅炸弹:Serverless WorkersStandalone ActivitiesWorkflow Streams,以及与 Google Cloud 的深度集成。与此同时,Coinbase 开源了 Temporal MCP Server,让 AI 编辑器直接读懂你的 Workflow 状态;Neo4j、Auth0、Redpanda 纷纷与 Temporal 深度整合,构建 AI Agent 的完整基础设施栈。

这不是「工作流引擎」的进化,这是持久化执行(Durable Execution)范式的成熟——在 AI Agent 大规模落地的 2026 年,Temporal 正在成为那个让 Agent 不再「失忆」、不再「崩溃后无法恢复」的核心基础设施。

本文将从 Temporal 的底层架构讲起,深入剖析 Event History、确定性重放、Workflow/Activity 分离等核心概念,然后带你实战 Replay 2026 的全新特性,最后给出生产级部署与性能调优的完整方案。


二、持久化执行:一个被严重低估的超能力

2.1 传统微服务的「失忆症」

先看一个典型场景:电商下单流程。

用户下单 → 扣库存 → 创建支付单 → 调用第三方支付 → 等待回调 → 发货 → 通知

在传统架构中,你会怎么实现?

  • 方案 A:同步链式调用 —— 任何一个环节超时,整个请求失败,用户看到 500。库存扣了但支付没成功?手动补偿吧。
  • 方案 B:消息队列 —— 每个步骤发个消息,下游消费。看起来解耦了,但「下单流程」这个业务概念被拆散到了 7 个 consumer 里,谁也说不出「订单 #12345 现在走到哪一步了」。
  • 方案 C:状态机 + 数据库 —— 每一步更新 DB 状态。能查进度了,但超时重试、补偿逻辑全要自己写,代码量爆炸。

三种方案的共同问题:没有持久化执行。进程崩溃、Pod 重启、节点故障——执行状态就丢了。你可能加了重试,但重试的语义是什么?幂等性保证了吗?补偿事务写了没有?

2.2 Temporal 的答案:代码即状态

Temporal 的核心哲学极其简单:你的代码就是状态机,我来保证它永远不会丢失执行进度

看一段代码:

@workflow.defn
class OrderWorkflow:
    def __init__(self):
        self.payment_callback = None

    @workflow.run
    async def run(self, order: OrderRequest) -> OrderResult:
        # 第一步:扣库存
        inventory = await workflow.execute_activity(
            deduct_inventory,
            order,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(
                maximum_attempts=3,
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=10),
            ),
        )

        # 第二步:创建支付单
        payment = await workflow.execute_activity(
            create_payment,
            inventory,
            start_to_close_timeout=timedelta(seconds=30),
        )

        # 第三步:等待支付回调 —— 可以等几秒,也可以等几天
        await workflow.wait_condition(
            lambda: self.payment_callback is not None,
            timeout=timedelta(days=7),
        )

        # 第四步:发货
        shipment = await workflow.execute_activity(
            ship_order,
            payment,
            start_to_close_timeout=timedelta(minutes=5),
        )

        return OrderResult(shipment_id=shipment.id)

这段代码读起来就像同步代码——但它的执行是持久化的。当 wait_condition 等待支付回调时,Worker 进程可以直接关机。三天后用户付款了,回调触发,Temporal 自动恢复执行,从 wait_condition 之后的代码继续运行。

不需要数据库存状态,不需要消息队列传回调,不需要定时任务轮询超时。代码写出来是什么样,执行就是什么样——只是永远不会「忘记」执行到哪了。

2.3 这不是魔法,是 Event History

Temporal 之所以能做到这一点,核心机制是 Event History(事件历史)

每当 Workflow 执行一个操作(启动、调用 Activity、设置定时器、接收信号……),Temporal Server 都会向 Event History 追加一条不可变的事件记录:

1  WorkflowExecutionStarted    input={order_id: "12345"}
2  ActivityTaskScheduled       activity=deduct_inventory, task_queue="order"
3  ActivityTaskStarted         attempt=1
4  ActivityTaskCompleted       result={deducted: true, sku: "SKU-001"}
5  ActivityTaskScheduled       activity=create_payment
6  ActivityTaskStarted         attempt=1
7  ActivityTaskCompleted       result={payment_url: "https://pay.example/abc"}
8  TimerStarted                timeout=7d
9  WorkflowExecutionWaitCondition  reason="waiting for payment callback"

当 Worker 崩溃重启后,Temporal 会把整个 Event History 重放(Replay)一遍。因为 Workflow 代码是确定性的——同样的输入,同样的执行路径——重放后 Worker 会精确恢复到崩溃前的状态,然后继续执行。

这就是 Temporal 的核心:用不可变的事件日志 + 确定性重放,取代传统的状态持久化


三、架构深度剖析:Temporal 的每个组件都在干什么

3.1 整体架构

┌─────────────┐     ┌──────────────────────┐     ┌──────────────┐
│   Client     │────▶│   Temporal Server    │────▶│  Persistence │
│  (SDK调用)   │     │  (History Service)   │     │  (Cassandra/ │
└─────────────┘     │  (Matching Service)  │     │   PostgreSQL/ │
                    │  (Frontend Service)  │     │   MySQL)     │
                    └──────────┬───────────┘     └──────────────┘
                               │
                    ┌──────────▼───────────┐
                    │   Task Queue         │
                    │   (长轮询分发)        │
                    └──────────┬───────────┘
                               │
                    ┌──────────▼───────────┐
                    │   Worker Pool        │
                    │   (Workflow Worker)  │
                    │   (Activity Worker)  │
                    └──────────────────────┘

Temporal Server 由三个内部服务组成:

  • Frontend Service:API 网关,处理所有 gRPC 请求,鉴权、限流
  • History Service:核心引擎,管理 Workflow 的 Event History,驱动状态机
  • Matching Service:任务调度器,把 Activity Task 和 Workflow Task 分发给 Worker

这三个服务通过 Persistence Layer(Cassandra / PostgreSQL / MySQL)共享状态。Worker 通过**长轮询(Long Polling)**从 Matching Service 获取任务——不需要注册中心,不需要服务发现,Worker 启动后自动开始拉取任务。

3.2 Workflow 与 Activity 的本质区别

这是初学者最容易混淆的概念,但理解它至关重要。

Workflow = 确定性编排逻辑

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, input_data):
        # ✅ 允许:调用 Activity、设置定时器、等待信号
        result = await workflow.execute_activity(my_activity, input_data)
        
        # ❌ 禁止:直接发 HTTP 请求
        # requests.get("https://api.example.com")  # 绝对不能这样做!
        
        # ❌ 禁止:直接操作数据库
        # db.query("INSERT INTO ...")  # 也绝对不行!
        
        # ❌ 禁止:使用随机数、当前时间等非确定性操作
        # random.randint(0, 100)  # 重放结果会不一致!
        
        return result

为什么?因为 Workflow 会被重放。每次重放时,random.randint 会返回不同的值,datetime.now() 也是不同的时间——这会导致重放后的状态与原始执行不一致,Workflow 就「迷路」了。

Activity = 真实的副作用

@activity.defn
async def my_activity(input_data):
    # ✅ 在这里可以做任何事
    response = requests.post("https://api.example.com/process", json=input_data)
    db.execute("INSERT INTO results VALUES (?)", (response.json(),))
    return response.json()

Activity 执行真实的副作用:HTTP 调用、数据库写入、文件操作……Activity 的结果会被记录在 Event History 中,重放时不会重新执行 Activity,而是直接从 History 中读取之前的结果。

这个设计的精妙之处:Workflow 代码只描述「做什么」,Activity 代码实际「做」——两者分离,重放才有意义

3.3 确定性约束的完整清单

约束原因解决方案
不能调用随机函数重放结果不一致使用 workflow.random()
不能获取当前时间重放时间不一致使用 workflow.now()
不能直接做 I/O重放时重复执行使用 Activity
不能使用全局可变状态重放时状态不一致使用 Workflow 状态字段
不能用多线程调度顺序不确定使用 asyncio.gather()
迭代 Map 顺序不确定Python 3.7+ dict 有序Go map 需要排序后迭代

3.4 Signal 与 Query:Workflow 的「双向通信」

Activity 是单向的(Workflow → 外部世界),但很多时候你需要从外部向 Workflow 传递信息:

Signal(信号):异步写入,不等待返回

@workflow.defn
class ApprovalWorkflow:
    def __init__(self):
        self.approved = False
        self.rejected = False

    @workflow.signal
    def approve(self):
        self.approved = True

    @workflow.signal
    def reject(self, reason: str):
        self.rejected = True

    @workflow.run
    async def run(self, request: ApprovalRequest) -> ApprovalResult:
        # 等待审批信号,最长等 30 天
        await workflow.wait_condition(
            lambda: self.approved or self.rejected,
            timeout=timedelta(days=30),
        )
        if self.approved:
            return ApprovalResult(status="approved")
        else:
            return ApprovalResult(status="rejected")

外部系统可以这样发信号:

# 审批人点击「通过」
handle = client.get_workflow_handle(workflow_id="approval-123")
handle.signal(ApprovalWorkflow.approve)

Query(查询):同步读取,不修改状态

@workflow.defn
class OrderWorkflow:
    def __init__(self):
        self.current_step = "created"

    @workflow.query
    def get_status(self) -> str:
        return self.current_step

    @workflow.run
    async def run(self, order: OrderRequest):
        self.current_step = "deducting_inventory"
        # ...
        self.current_step = "waiting_payment"
        # ...

Query 不会产生 Event,也不会被记录在 History 中——它是纯只读的。非常适合用来做状态面板。


四、Replay 2026 四大新特性深度解析

4.1 Serverless Workers:不用管 Worker 了

之前,你需要自己运维 Worker 进程:部署、扩容、监控、处理 OOM。Replay 2026 发布的 Serverless Workers 彻底消除了这个负担。

工作原理

你的 Workflow 代码 → 上传到 Temporal Cloud → Serverless Workers 自动执行

你只需要写 Workflow 和 Activity 的代码,打包上传,Temporal Cloud 负责运行 Worker。自动扩缩容、零冷启动(与传统的 Serverless 不同,Temporal 的 Worker 是常驻的 Warm Pool)、按执行时间计费。

适用场景

  • 中小规模工作流(< 1000 QPS)
  • 快速原型验证
  • 不想维护 Worker 基础设施

不适用场景

  • 需要访问内网资源的 Activity(VPC 对等连接尚未 GA)
  • 极低延迟要求(< 50ms P99)
  • 自定义 Worker 中间件

实战配置

# temporal-serverless.yaml
apiVersion: temporal.io/v1
kind: ServerlessWorker
metadata:
  name: order-workflow
spec:
  workflowPackage: ./workflows/
  runtime: python3.12
  taskQueue: order-processing
  activities:
    - name: deduct_inventory
      timeout: 30s
      retryPolicy:
        maxAttempts: 3
    - name: create_payment
      timeout: 30s
  scaling:
    minWorkers: 2
    maxWorkers: 50
    targetConcurrentActivities: 100

4.2 Standalone Activities:Activity 独立部署

以前,Activity 必须和 Worker 部署在一起——如果你的 Activity 需要访问特定数据库或内部 API,整个 Worker 都得部署在那个网络环境里。

Standalone Activities 打破了这个限制:

Workflow Worker (Temporal Cloud) → Task Queue → Activity Worker (你的 VPC 内)

Activity 可以独立注册到 Task Queue,部署在你自己的基础设施上。Workflow 代码不变,只是 Activity 的执行位置变了。

# activity_worker.py — 部署在你的 VPC 内
async def main():
    client = await Client.connect("temporal.cloud:7233", namespace="production")
    worker = Worker(
        client,
        task_queue="order-activities",  # 专用 Activity 队列
        activities=[deduct_inventory, create_payment, ship_order],
    )
    await worker.run()
# workflow.py — 运行在 Serverless Workers 上
@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order: OrderRequest):
        # 这个 Activity 实际运行在你 VPC 内的 Worker 上
        result = await workflow.execute_activity(
            deduct_inventory,
            order,
            task_queue="order-activities",  # 指定 Activity 所在的队列
            start_to_close_timeout=timedelta(seconds=30),
        )

这给架构设计带来了极大的灵活性:Workflow 的编排逻辑无状态、可 Serverless;Activity 的业务逻辑有状态、可内网部署。两者解耦,各司其职。

4.3 Workflow Streams:实时事件流

以前,要从外部观察 Workflow 的执行进度,只能轮询 Query 或监听 Signal——又慢又笨。Workflow Streams 让你可以实时订阅 Workflow 的事件流:

# 订阅 Workflow 的实时事件
async with client.workflow_stream(workflow_id="order-123") as stream:
    async for event in stream:
        match event.type:
            case "ActivityTaskCompleted":
                print(f"Activity {event.activity_type} completed: {event.result}")
            case "TimerFired":
                print(f"Timer fired: {event.timer_id}")
            case "WorkflowExecutionCompleted":
                print(f"Workflow completed: {event.result}")
                break

与 Kafka/Redpanda 集成

Temporal 原生支持将 Workflow Streams 导出到 Kafka 或 Redpanda,实现实时分析和监控:

# temporal-streams-export.yaml
export:
  type: kafka
  bootstrapServers: "redpanda:9092"
  topic: "temporal.workflow.events"
  format: protobuf
  filters:
    namespaces: ["production"]
    workflowTypes: ["OrderWorkflow", "RefundWorkflow"]
    eventTypes: ["ActivityTaskCompleted", "WorkflowExecutionCompleted", "WorkflowExecutionFailed"]

这意味着你可以用 ClickHouse 做实时看板,用 Flink 做流处理,用 Grafana 做告警——Workflow 的每一次状态变更都实时可观测。

4.4 Google Cloud 集成

Replay 2026 宣布了与 Google Cloud 的深度集成:

  • Cloud Spanner 作为 Persistence Layer 的新选项(全球分布式一致性,适合多区域部署)
  • Cloud Run 托管 Worker(配合 Serverless Workers 的混合模式)
  • Pub/Sub 作为 Workflow Streams 的原生 sink
  • Cloud Logging & Cloud Monitoring 原生集成
# GCP 部署示例
apiVersion: temporal.io/v1
kind: TemporalCluster
spec:
  persistence:
    store: spanner
    spanner:
      instanceId: temporal-prod
      databaseId: temporal-main
  workers:
    type: cloudrun
    cloudrun:
      service: temporal-worker
      region: us-central1
      minInstances: 1
      maxInstances: 100

五、Temporal × AI Agent:为什么 AI Agent 天然需要持久化执行

5.1 AI Agent 的「冰山之下」

Temporal 官方与 Neo4j、Auth0、Redpanda 联合发布了一个 Deep Research Agent Demo,完美揭示了 AI Agent 的基础设施需求:

用户看到的只是聊天界面和生成的报告——但冰山之下是这样的架构:

用户提问
  ↓
Triage Agent(判断是否需要追问)
  ↓
Clarifying Agent(生成追问问题)→ 等待用户回答(可能几分钟,也可能几天)
  ↓
Planner Agent(生成搜索计划)
  ↓
Search Agent × N(并行搜索多个来源)
  ↓
Writer Agent(综合生成报告)
  ↓
PDF Generator Agent(生成最终文档)

7 个 Agent,2-3 分钟执行时间,涉及数十次 LLM 调用和 API 请求。如果中途崩溃——没有 Temporal 的话,所有进度丢失,从头来过。有了 Temporal,崩溃后自动恢复到上次的位置。

5.2 实战:用 Temporal 编排多 Agent 工作流

from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy

with workflow.unsafe.imports_passed_through():
    from activities import (
        triage_agent,
        clarifying_agent,
        planner_agent,
        search_agent,
        writer_agent,
        pdf_generator,
    )

@workflow.defn
class DeepResearchWorkflow:
    def __init__(self):
        self.clarification_answers = None

    @workflow.signal
    def submit_clarifications(self, answers: dict):
        self.clarification_answers = answers

    @workflow.run
    async def run(self, query: str) -> dict:
        # Step 1: 判断是否需要追问
        triage = await workflow.execute_activity(
            triage_agent,
            query,
            start_to_close_timeout=timedelta(minutes=2),
        )

        # Step 2: 如果需要追问,生成问题并等待用户回答
        if triage.needs_clarification:
            questions = await workflow.execute_activity(
                clarifying_agent,
                triage.clarification_context,
                start_to_close_timeout=timedelta(minutes=1),
            )
            # 等待用户回答 —— 可以等几个小时
            await workflow.wait_condition(
                lambda: self.clarification_answers is not None,
                timeout=timedelta(hours=24),
            )
            refined_query = f"{query}\nAdditional context: {self.clarification_answers}"
        else:
            refined_query = query

        # Step 3: 生成搜索计划
        search_plan = await workflow.execute_activity(
            planner_agent,
            refined_query,
            start_to_close_timeout=timedelta(minutes=2),
        )

        # Step 4: 并行搜索 —— 每个搜索都是独立的 Activity
        import asyncio
        search_results = await asyncio.gather(*[
            workflow.execute_activity(
                search_agent,
                search_query,
                start_to_close_timeout=timedelta(minutes=2),
                retry_policy=RetryPolicy(
                    maximum_attempts=3,
                    initial_interval=timedelta(seconds=5),
                ),
            )
            for search_query in search_plan.queries
        ])

        # Step 5: 综合生成报告
        report = await workflow.execute_activity(
            writer_agent,
            {"query": refined_query, "search_results": search_results},
            start_to_close_timeout=timedelta(minutes=5),
        )

        # Step 6: 生成 PDF
        pdf = await workflow.execute_activity(
            pdf_generator,
            report,
            start_to_close_timeout=timedelta(minutes=2),
        )

        return {"report": report, "pdf_url": pdf.url}

关键设计点:

  1. Human-in-the-loopwait_condition + signal 组合,让 Workflow 可以等待用户输入,不限时长
  2. 并行搜索asyncio.gather 同时发起多个搜索 Activity,充分利用 I/O 并发
  3. 自动重试:LLM 调用容易失败(超时、限流),RetryPolicy 自动处理
  4. 持久化:即使 Worker 在 Step 4 执行到一半崩溃,重启后自动从 Event History 恢复

5.3 Coinbase 的 Temporal MCP Server:AI 编辑器直接调试 Workflow

Coinbase 的工程师做了一个了不起的项目:Temporal MCP Server——让 AI 编辑器(如 Cursor、Claude Code)直接读取 Temporal 的运行时状态。

# Coinbase 开源的 Temporal MCP Server 核心代码
from fastmcp import FastMCP

mcp = FastMCP("temporal-mcp")

@mcp.tool()
async def list_workflows(
    namespace: str,
    account: str,
    query: str | None = None,
    limit: int = 10,
) -> str:
    """List Workflow Executions in a Temporal Namespace."""
    client = await get_client(account)
    workflows = []
    async for wf in client.list_workflows(
        namespace=namespace,
        query=query,
        page_size=limit,
    ):
        workflows.append({
            "id": wf.id,
            "run_id": wf.run_id,
            "status": wf.status.name,
            "start_time": str(wf.start_time),
            "close_time": str(wf.close_time) if wf.close_time else None,
        })
    return json.dumps(workflows, indent=2)

@mcp.tool()
async def get_workflow_history(
    namespace: str,
    workflow_id: str,
    run_id: str | None = None,
    account: str = "default",
) -> str:
    """Get the full Event History for a Workflow Execution."""
    client = await get_client(account)
    handle = client.get_workflow_handle(workflow_id, run_id)
    events = []
    async for event in handle.fetch_history():
        events.append({
            "event_id": event.event_id,
            "event_type": event.event_type.name,
            "timestamp": str(event.event_time),
            # 精简输出,避免 token 爆炸
        })
    return json.dumps(events, indent=2)

使用方式——在 Cursor 里直接用自然语言:

你: "列出 payments namespace 里最近失败的 Workflow"

AI: 调用 list_workflows(namespace="payments", query="ExecutionStatus='Failed'")
    → 返回最近的 5 个失败 Workflow,附上 Run ID 和失败时间

你: "看看第一个为什么失败了"

AI: 调用 get_workflow_history(namespace="payments", workflow_id="pay-78901")
    → 定位到 Event #23: ActivityTaskFailed
    → 原因:deduct_inventory 超时(30s timeout exceeded)
    → 结合代码搜索,找到 deduct_inventory 函数的实现
    → 建议:该 Activity 内部调用的库存服务 P99 延迟最近飙升至 45s

Coinbase 的数据:两个月内 14,000+ 次 MCP 调用,其中 list_workflowsget_workflow_history 是使用最频繁的两个工具。这说明工程师真的在用它替代 Temporal Web UI。


六、Go SDK 实战:电商订单工作流

Python 适合 AI Agent 场景,但在高吞吐的后端服务中,Go 才是主力。下面用一个完整的电商订单工作流展示 Go SDK 的用法。

6.1 Workflow 定义

package workflow

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

type OrderRequest struct {
	OrderID    string  `json:"order_id"`
	UserID     string  `json:"user_id"`
	SKU        string  `json:"sku"`
	Quantity   int     `json:"quantity"`
	TotalPrice float64 `json:"total_price"`
}

type OrderResult struct {
	ShipmentID string `json:"shipment_id"`
	TrackingNo string `json:"tracking_no"`
}

type OrderWorkflow struct {
	paymentCallback *PaymentCallback
	cancelReason    string
}

func (w *OrderWorkflow) SubmitPaymentCallback(ctx workflow.Context, callback PaymentCallback) {
	w.paymentCallback = &callback
}

func (w *OrderWorkflow) CancelOrder(ctx workflow.Context, reason string) {
	w.cancelReason = reason
}

func (w *OrderWorkflow) Execute(ctx workflow.Context, req OrderRequest) (OrderResult, error) {
	ao := workflow.ActivityOptions{
		StartToCloseTimeout: 30 * time.Second,
		RetryPolicy: &workflow.RetryPolicy{
			MaximumAttempts:    3,
			InitialInterval:    1 * time.Second,
			MaximumInterval:    10 * time.Second,
			BackoffCoefficient: 2.0,
		},
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	// Step 1: 扣库存
	var inventoryResult DeductInventoryResult
	err := workflow.ExecuteActivity(ctx, DeductInventory, req).Get(ctx, &inventoryResult)
	if err != nil {
		return OrderResult{}, err
	}

	// Step 2: 创建支付单
	var paymentResult CreatePaymentResult
	err = workflow.ExecuteActivity(ctx, CreatePayment, inventoryResult).Get(ctx, &paymentResult)
	if err != nil {
		// 支付创建失败,补偿:归还库存
		_ = workflow.ExecuteActivity(ctx, RestoreInventory, req).Get(ctx, nil)
		return OrderResult{}, err
	}

	// Step 3: 等待支付回调或取消
	workflow.await(ctx, func() bool {
		return w.paymentCallback != nil || w.cancelReason != ""
	})

	if w.cancelReason != "" {
		// 用户取消,归还库存
		_ = workflow.ExecuteActivity(ctx, RestoreInventory, req).Get(ctx, nil)
		_ = workflow.ExecuteActivity(ctx, CancelPayment, paymentResult.PaymentID).Get(ctx, nil)
		return OrderResult{}, fmt.Errorf("order cancelled: %s", w.cancelReason)
	}

	// Step 4: 发货
	shipAo := workflow.ActivityOptions{
		StartToCloseTimeout: 5 * time.Minute, // 发货可能较慢
	}
	shipCtx := workflow.WithActivityOptions(ctx, shipAo)

	var shipmentResult ShipOrderResult
	err = workflow.ExecuteActivity(shipCtx, ShipOrder, paymentResult).Get(shipCtx, &shipmentResult)
	if err != nil {
		return OrderResult{}, err
	}

	// Step 5: 发送通知
	_ = workflow.ExecuteActivity(ctx, SendNotification, req.UserID, shipmentResult).Get(ctx, nil)

	return OrderResult{
		ShipmentID: shipmentResult.ID,
		TrackingNo: shipmentResult.TrackingNo,
	}, nil
}

6.2 Activity 实现

package activities

import (
	"context"
	"database/sql"
	"fmt"
	"net/http"

	"go.temporal.io/sdk/activity"
)

func DeductInventory(ctx context.Context, req OrderRequest) (DeductInventoryResult, error) {
	logger := activity.GetLogger(ctx)
	logger.Info("Deducting inventory", "order_id", req.OrderID, "sku", req.SKU)

	// 获取心跳信息,支持优雅中断
	heartbeatDetails := activity.GetHeartbeatDetails(ctx)
	if heartbeatDetails != nil {
		logger.Info("Resuming from heartbeat", "details", heartbeatDetails)
	}

	// 调用库存服务
	resp, err := http.Post(
		"http://inventory-service:8080/deduct",
		"application/json",
		bytes.NewReader(marshal(req)),
	)
	if err != nil {
		return DeductInventoryResult{}, fmt.Errorf("inventory service unavailable: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		return DeductInventoryResult{}, fmt.Errorf("deduction failed: status %d", resp.StatusCode)
	}

	// 发送心跳,记录进度
	activity.RecordHeartbeat(ctx, req.OrderID)

	return DeductInventoryResult{Success: true, SKU: req.SKU}, nil
}

func CreatePayment(ctx context.Context, inv DeductInventoryResult) (CreatePaymentResult, error) {
	// 调用支付服务...
}

func ShipOrder(ctx context.Context, payment CreatePaymentResult) (ShipOrderResult, error) {
	// 调用物流服务...
	// 长时间运行的 Activity,需要定期心跳
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			activity.RecordHeartbeat(ctx, "shipping in progress")
		// ... 实际发货逻辑
		}
	}
}

func RestoreInventory(ctx context.Context, req OrderRequest) error {
	// 补偿事务:归还库存
}

func CancelPayment(ctx context.Context, paymentID string) error {
	// 补偿事务:取消支付
}

func SendNotification(ctx context.Context, userID string, result ShipOrderResult) error {
	// 发送通知...
}

6.3 Worker 启动

package main

import (
	"log"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
)

func main() {
	c, err := client.Dial(client.Options{
		HostPort:  "localhost:7233",
		Namespace: "default",
	})
	if err != nil {
		log.Fatalln("Unable to create Temporal client", err)
	}
	defer c.Close()

	w := worker.New(c, "order-task-queue", worker.Options{
		MaxConcurrentWorkflowTaskExecutionSize:     100,
		MaxConcurrentActivityExecutionSize:         500,
		MaxConcurrentLocalActivityExecutionSize:    200,
		WorkflowPanicPolicy:                        worker.FailWorkflow,
	})

	w.RegisterWorkflow(OrderWorkflow.Execute)
	w.RegisterActivity(DeductInventory)
	w.RegisterActivity(CreatePayment)
	w.RegisterActivity(ShipOrder)
	w.RegisterActivity(RestoreInventory)
	w.RegisterActivity(CancelPayment)
	w.RegisterActivity(SendNotification)

	if err := w.Run(worker.InterruptCh()); err != nil {
		log.Fatalln("Unable to start worker", err)
	}
}

6.4 启动工作流

package main

import (
	"context"
	"log"

	"go.temporal.io/sdk/client"
)

func main() {
	c, err := client.Dial(client.Options{HostPort: "localhost:7233"})
	if err != nil {
		log.Fatalln(err)
	}
	defer c.Close()

	workflowID := "order-20260617-001"
	options := client.StartWorkflowOptions{
		ID:                       workflowID,
		TaskQueue:                "order-task-queue",
		WorkflowExecutionTimeout: 7 * 24 * time.Hour, // 订单最长 7 天
		WorkflowTaskTimeout:      10 * time.Second,
		CronSchedule:             "", // 非定时任务
	}

	we, err := c.ExecuteWorkflow(context.Background(), options, OrderWorkflow.Execute, OrderRequest{
		OrderID:    "20260617-001",
		UserID:     "user-42",
		SKU:        "SKU-001",
		Quantity:   1,
		TotalPrice: 299.99,
	})
	if err != nil {
		log.Fatalln("Unable to execute workflow", err)
	}

	log.Printf("Workflow started: %s, RunID: %s", we.GetID(), we.GetRunID())

	// 模拟支付回调
	time.Sleep(5 * time.Second)
	c.SignalWorkflow(context.Background(), workflowID, "", "SubmitPaymentCallback", PaymentCallback{
		PaymentID: "pay-123",
		Status:    "success",
	})
}

七、性能优化与生产级调优

7.1 History 膨胀问题与 Continue-As-New

Workflow 的 Event History 会持续增长。一个运行数月的 Workflow,History 可能包含数十万条事件,导致重放变慢、内存占用增加。

解决方案:Continue-As-New——在当前 Workflow 执行完毕前,自动启动一个新的 Workflow Run,把状态传递过去:

@workflow.defn
class LongRunningMonitor:
    @workflow.run
    async def run(self, config: MonitorConfig):
        while True:
            # 每分钟检查一次
            result = await workflow.execute_activity(
                check_health,
                config,
                start_to_close_timeout=timedelta(seconds=30),
            )
            
            if result.unhealthy:
                await workflow.execute_activity(send_alert, result)
            
            await workflow.sleep(timedelta(minutes=1))
            
            # History 超过 10000 条事件时,Continue-As-New
            if workflow.info().history_length > 10000:
                # 把当前状态传给新的 Run
                workflow.continue_as_new(config)

最佳实践

  • 设置 workflow_execution_timeout 防止无限运行
  • History 超过 10,000 条事件时触发 Continue-As-New
  • 对于真正需要长期运行的 Workflow,考虑 Child Workflow 分拆

7.2 Worker 性能参数

w := worker.New(c, "task-queue", worker.Options{
    // Workflow Task 并发数:取决于 Workflow 逻辑复杂度
    // CPU 密集型(大量重放):设为 CPU 核心数
    // I/O 密集型(大量 await):可以更高
    MaxConcurrentWorkflowTaskExecutionSize: 100,
    
    // Activity 并发数:取决于下游服务承受能力
    // 如果 Activity 调用外部 API,注意限流
    MaxConcurrentActivityExecutionSize: 500,
    
    // Activity Task 心跳超时
    // 长时间 Activity 必须定期 RecordHeartbeat
    ActivityHeartbeatTimeout: 30 * time.Second,
    
    // 本地 Activity(与 Workflow 同进程执行,不经过 Matching Service)
    // 适合轻量操作(如数据库查询),减少网络开销
    MaxConcurrentLocalActivityExecutionSize: 200,
})

7.3 Persistence Layer 选型

数据库优势劣势推荐场景
PostgreSQL运维简单,生态丰富单节点写入瓶颈中小规模(< 500 QPS)
Cassandra高写入吞吐,线性扩展运维复杂,最终一致大规模(> 1000 QPS)
MySQL团队熟悉度高与 PG 类似的瓶颈已有 MySQL 基础设施
Spanner全球分布式一致性多区域部署
SQLite零依赖,嵌入式不支持集群开发/测试

7.4 监控指标

Temporal 暴露了 Prometheus 格式的指标,关键看板:

# 以下指标建议在 Grafana 中配置告警

# Workflow 执行延迟 P99
temporal_workflow_task_execution_latency{namespace="production"} > 1s

# Activity 执行失败率
rate(temporal_activity_execution_failed{namespace="production"}[5m]) 
  / rate(temporal_activity_execution_completed{namespace="production"}[5m]) > 0.05

# Task Queue 积压
temporal_task_queue_schedule_to_start_latency{namespace="production"} > 5s

# History 长度(防止膨胀)
temporal_workflow_history_length{namespace="production"} > 50000

# Worker 活跃数
temporal_worker_task_slots_available{namespace="production"} < 5

八、Temporal vs 其他工作流引擎:2026 年度对比

维度TemporalApache AirflowCamunda 8Netflix ConductorInngest
执行模型持久化执行DAG 调度BPMN 流程DAG 调度事件驱动
代码方式原生代码Python DSLBPMN XML + 代码JSON DSL原生代码
长时间运行✅ 天/周/月❌ 小时级✅ 支持✅ 支持✅ 支持
Human-in-the-loop✅ Signal + Wait❌ 不原生支持✅ User Task✅ 支持✅ 支持
AI Agent 编排✅ 原生适配❌ 不适合⚠️ 需要适配⚠️ 需要适配✅ 适配
多语言 SDKGo/Python/TS/Java/PHPPython 优先Java 优先Java/PythonTS/Python/Go
自托管❌ 仅 Cloud
Cloud 托管✅ Temporal Cloud✅ Astronomer✅ Camunda SaaS✅ Orkes✅ Inngest Cloud
社区规模52K+ GitHub Stars42K+3.5K+13K+10K+

选型建议

  • 数据管道 / ETL → Airflow(生态成熟,Operator 丰富)
  • 企业审批流 / BPMN → Camunda(可视化流程设计,合规友好)
  • 微服务编排 / AI Agent → Temporal(持久化执行,代码优先,多语言)
  • 轻量级事件驱动 → Inngest(上手快,但功能有限)

九、Child Workflow 与 Saga 模式

9.1 Child Workflow:组合与隔离

当主 Workflow 变得过于复杂时,可以用 Child Workflow 拆分:

@workflow.defn
class TripBookingWorkflow:
    @workflow.run
    async def run(self, trip: TripRequest) -> TripResult:
        # 并行预订航班、酒店、租车
        import asyncio
        flight, hotel, car = await asyncio.gather(
            workflow.execute_child_workflow(
                BookFlightWorkflow.run,
                trip.flight,
                workflow_id=f"flight-{trip.id}",
            ),
            workflow.execute_child_workflow(
                BookHotelWorkflow.run,
                trip.hotel,
                workflow_id=f"hotel-{trip.id}",
            ),
            workflow.execute_child_workflow(
                BookCarWorkflow.run,
                trip.car,
                workflow_id=f"car-{trip.id}",
            ),
        )
        return TripResult(flight=flight, hotel=hotel, car=car)

Child Workflow 的优势:

  1. History 隔离:每个 Child 有独立的 Event History,不会膨胀 Parent
  2. 独立重试:Child 失败后可以独立重试,不影响 Parent 的其他 Child
  3. 权限隔离:不同的 Child 可以由不同的 Worker 执行

9.2 Saga 模式:分布式事务补偿

上面的 TripBooking 有个问题:如果航班和酒店都订好了,但租车失败了怎么办?需要补偿——取消航班和酒店。

Temporal 的 Saga 模式:

@workflow.defn
class TripBookingSagaWorkflow:
    @workflow.run
    async def run(self, trip: TripRequest) -> TripResult:
        # 编排 + 补偿函数的对应关系
        compensations = []

        try:
            # 预订航班
            flight = await workflow.execute_child_workflow(
                BookFlightWorkflow.run, trip.flight
            )
            compensations.append(lambda: workflow.execute_child_workflow(
                CancelFlightWorkflow.run, flight
            ))

            # 预订酒店
            hotel = await workflow.execute_child_workflow(
                BookHotelWorkflow.run, trip.hotel
            )
            compensations.append(lambda: workflow.execute_child_workflow(
                CancelHotelWorkflow.run, hotel
            ))

            # 预订租车
            car = await workflow.execute_child_workflow(
                BookCarWorkflow.run, trip.car
            )
            compensations.append(lambda: workflow.execute_child_workflow(
                CancelCarWorkflow.run, car
            ))

            return TripResult(flight=flight, hotel=hotel, car=car)

        except Exception:
            # 按逆序执行补偿
            for compensation in reversed(compensations):
                try:
                    await compensation()
                except Exception as e:
                    # 补偿失败需要人工介入
                    await workflow.execute_activity(
                        alert_manual_intervention,
                        {"error": str(e), "trip_id": trip.id},
                    )
            raise

十、安全与多租户

10.1 Namespace 隔离

Temporal 用 Namespace 实现多租户隔离:

# 创建 Namespace
temporal operator namespace create \
  --namespace production \
  --retention 30d \
  --description "Production workloads"

temporal operator namespace create \
  --namespace staging \
  --retention 7d \
  --description "Staging workloads"

每个 Namespace 有独立的:

  • Task Queue
  • Workflow ID 空间
  • 搜索属性
  • 数据保留策略

10.2 Payload Codec:数据加密

如果 Event History 中包含敏感信息(PII、密钥),可以用 Payload Codec 加密:

from temporalio.converter import PayloadCodec, Payload
from cryptography.fernet import Fernet

class EncryptionCodec(PayloadCodec):
    def __init__(self, key: bytes):
        self._fernet = Fernet(key)

    async def encode(self, payloads: list[Payload]) -> list[Payload]:
        return [
            Payload(
                metadata={"encoding": b"binary/encrypted"},
                data=self._fernet.encrypt(p.data),
            )
            for p in payloads
        ]

    async def decode(self, payloads: list[Payload]) -> list[Payload]:
        result = []
        for p in payloads:
            if p.metadata.get("encoding") == b"binary/encrypted":
                result.append(Payload(
                    metadata={"encoding": b"json/plain"},
                    data=self._fernet.decrypt(p.data),
                ))
            else:
                result.append(p)
        return result

# 注册 Codec
client = await Client.connect(
    "localhost:7233",
    data_converter=dataclasses.replace(
        data_converter,
        payload_codec=EncryptionCodec(key),
    ),
)

Coinbase 的 MCP Server 就是用 Codec 来解码 Payload 的——在 AI 编辑器中看到的是明文,但存储在 Temporal 中的是密文。

10.3 mTLS 与 Authorization

生产环境推荐 mTLS:

# temporal-server-mtls.yaml
global:
  mtls:
    internode:
      enabled: true
      certFile: /certs/server.crt
      keyFile: /certs/server.key
      caFile: /certs/ca.crt
    frontend:
      enabled: true
      certFile: /certs/server.crt
      keyFile: /certs/server.key
      caFile: /certs/ca.crt
  authorization:
    jwtKeySet:
      uri: "https://auth.example.com/.well-known/jwks.json"
    permissionsClaimName: "permissions"

十一、从零搭建生产级 Temporal 集群

11.1 Kubernetes 部署

# temporal-cluster.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: temporal-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: temporal-server
  template:
    spec:
      containers:
        - name: temporal
          image: temporalio/server:1.26.0
          ports:
            - containerPort: 7233  # gRPC (Frontend)
            - containerPort: 9090  # Metrics (Prometheus)
          env:
            - name: DB
              value: "postgresql"
            - name: POSTGRES_SEEDS
              value: "postgres-cluster.default.svc.cluster.local:5432"
            - name: POSTGRES_USER
              valueFrom:
                secretKeyRef:
                  name: temporal-db-creds
                  key: username
            - name: POSTGRES_PWD
              valueFrom:
                secretKeyRef:
                  name: temporal-db-creds
                  key: password
            - name: ENABLE_ES
              value: "true"  # Elasticsearch for Advanced Visibility
            - name: ES_SEEDS
              value: "elasticsearch.default.svc.cluster.local:9200"
          resources:
            requests:
              cpu: "2"
              memory: "4Gi"
            limits:
              cpu: "4"
              memory: "8Gi"
          livenessProbe:
            grpc:
              port: 7233
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            grpc:
              port: 7233
            initialDelaySeconds: 10
            periodSeconds: 5

11.2 高可用考量

  • Server:3+ 节点,每个 History Service 负责一部分 Workflow(分片)
  • 数据库:PostgreSQL 使用 Patroni 或 Cloud SQL HA;Cassandra 使用 3 DC 部署
  • Worker:至少 2 个副本,分布在不同可用区
  • Elasticsearch:3 节点集群,用于高级可见性查询

11.3 Temporal Cloud vs 自托管

维度Temporal Cloud自托管
运维成本高(DB + ES + Server + Worker)
初始费用$0.04/1000 Workflow Task服务器成本
SLA99.9%自行保障
定制性有限完全可控
合规SOC2 Type II自行审计
推荐< 10M tasks/month> 50M tasks/month

十二、常见陷阱与排障指南

12.1 Workflow 确定性违规

症状NonDeterministicError,Workflow 执行到一半报错。

原因:Workflow 代码中包含了非确定性操作,导致重放时行为不一致。

排查

# 查看 Workflow History 中是否有异常
temporal workflow show \
  --workflow-id my-workflow \
  --run-id <run-id> \
  --output json | jq '.events[] | select(.eventType == "WORKFLOW_TASK_FAILED")'

常见违规

# ❌ 错误:使用 Python 内置 random
import random
value = random.randint(0, 100)

# ✅ 正确:使用 Temporal 提供的确定性随机
value = workflow.random().randint(0, 100)

# ❌ 错误:获取当前时间
now = datetime.now()

# ✅ 正确:使用 Temporal 提供的时间
now = workflow.now()

# ❌ 错误:遍历 Go map(顺序不确定)
for k, v := range myMap { ... }

# ✅ 正确:排序后遍历
keys := make([]string, 0, len(myMap))
for k := range myMap { keys = append(keys, k) }
sort.Strings(keys)
for _, k := range keys { v := myMap[k]; ... }

12.2 Activity 幂等性

Activity 可能被重试,因此必须幂等:

@activity.defn
async def create_payment(order: OrderRequest) -> PaymentResult:
    # ❌ 非幂等:每次调用都会创建新支付单
    # return payment_api.create(order)

    # ✅ 幂等:基于 order_id 去重
    existing = payment_api.get_by_order_id(order.order_id)
    if existing:
        return existing  # 已经创建过,直接返回
    
    return payment_api.create(order)

实现幂等性的几种方式

  1. 自然幂等:GET 请求、读数据库(天然幂等)
  2. 唯一键去重:基于业务 ID 检查是否已处理
  3. Token 机制:下游服务提供 idempotency key
  4. 数据库约束:UNIQUE 约束防止重复插入

12.3 Workflow Task 超时

症状WorkflowTaskTimedOut,Workflow 看起来卡住了。

原因:Worker 没有及时处理 Workflow Task(通常是因为 Worker 全部下线或 Task Queue 名称错误)。

排查

# 检查 Task Queue 的 Worker 数量
temporal task-queue describe \
  --task-queue order-task-queue \
  --namespace production

确保 PollerCount > 0。如果为 0,说明没有 Worker 在监听这个 Task Queue。


十三、总结与展望

13.1 Temporal 的核心价值

场景不用 Temporal用 Temporal
长时间业务流程状态机 + DB + 定时任务一段代码
故障恢复手动补偿 + 运维脚本自动重放恢复
人机交互Webhook + 轮询Signal + Wait
并行任务asyncio.gather 还是自己写?Activity 并行 + 自动聚合
可观测性自己埋点 + APMEvent History + UI
AI Agent 编排脆弱、不可恢复持久化、可恢复、可观测

13.2 2026 年下半年展望

Replay 2026 发布的新特性预示着 Temporal 的演进方向:

  1. Serverless Workers GA:降低运维门槛,让更多团队用起来
  2. Workflow Streams 生态:与 Kafka/Pub/Sub/Redpanda 深度整合,成为实时事件中枢
  3. AI Agent 原生支持:与 MCP、知识图谱、向量数据库的集成会更加紧密
  4. 多区域部署:基于 Spanner 的全球分布式 Persistence Layer
  5. 可视化编排:代码优先不会变,但会提供可视化工具辅助理解复杂 Workflow

13.3 什么时候该用 Temporal?

该用

  • 业务流程有多个步骤,且步骤之间有依赖或等待
  • 需要人工介入(审批、确认)
  • 流程可能运行很长时间(分钟、小时、天)
  • 故障后需要自动恢复,不能丢失进度
  • 编排 AI Agent 的多轮交互

不该用

  • 纯 CRUD 操作(直接用框架就行了)
  • 毫秒级实时响应(Temporal 的延迟在 10ms+ 级别)
  • 一次性批量任务(用 Airflow 或 Spark)
  • 简单的异步消息传递(用 Kafka/RabbitMQ 足矣)

Temporal 不是银弹,但在「需要可靠执行的复杂业务流程」这个领域,它是 2026 年最好的选择。Serverless Workers、Standalone Activities、Workflow Streams 这三大新特性,让它的上手门槛进一步降低,适用场景进一步拓宽。如果你还没用过 Temporal,现在就是最好的时机。

推荐文章

在Vue3中实现代码分割和懒加载
2024-11-17 06:18:00 +0800 CST
php指定版本安装php扩展
2024-11-19 04:10:55 +0800 CST
页面不存在404
2024-11-19 02:13:01 +0800 CST
Vue3 中提供了哪些新的指令
2024-11-19 01:48:20 +0800 CST
windows安装sphinx3.0.3(中文检索)
2024-11-17 05:23:31 +0800 CST
如何在 Vue 3 中使用 TypeScript?
2024-11-18 22:30:18 +0800 CST
程序员茄子在线接单