编程 Temporal 深度实战:当「持久化执行」重塑分布式系统可靠性——从事件溯源到 Serverless Workers、从 Workflow Streams 到 AI Agent 编排的生产级完全指南(2026)

2026-06-20 08:56:35 +0800 CST views 8

Temporal 深度实战:当「持久化执行」重塑分布式系统可靠性——从事件溯源到 Serverless Workers、从 Workflow Streams 到 AI Agent 编排的生产级完全指南(2026)

Replay 2026 刚刚发布了四项重磅更新:Serverless Workers、Standalone Activities、Workflow Streams,以及与 Google ADK 和 OpenAI Agents SDK 的深度集成。这不是一次普通的版本迭代——它在重新定义「代码可靠性」的边界。

背景:分布式系统的「可靠性税」

你一定写过这样的代码:

# ❌ 传统方式:手动编排,脆弱且难以维护
async def process_order(order_id: str):
    try:
        order = await order_service.get(order_id)
        payment = await payment_service.charge(order.amount)
        await inventory_service.reserve(order.items)
        await shipping_service.schedule(order.address)
        await notification_service.send(order.user_id, "订单已确认")
    except PaymentError:
        await inventory_service.release(order.items)
        raise
    except ShippingError:
        await payment_service.refund(payment.id)
        await inventory_service.release(order.items)
        raise

这段代码有什么问题?

  • 无重试机制:网络抖动直接失败
  • 无持久化:进程崩溃后状态丢失
  • 补偿逻辑复杂:每一步失败都需要手动回滚
  • 无法查询状态:订单进行到哪一步了?不知道
  • 无法暂停恢复:等待人工审核怎么办?

这是分布式系统的「可靠性税」——你必须为每个操作写重试、补偿、状态追踪逻辑。当一个业务流程涉及 5 个以上服务时,这些胶水代码会膨胀到让你怀疑人生。

Temporal 的创始人 Maxim Fateev 和 Sudhir Tonse,曾经在 AWS 负责 SQS 和 SWF,在 Uber 创建了 Cadence(Temporal 的前身)。他们花了 20 年时间回答一个核心问题:

如果代码的执行本身就是可靠的呢?

核心概念:持久化执行(Durable Execution)

Temporal 的核心思想极其简单,但实现却极其精妙:

把代码的每一步执行状态都持久化到事件日志中,任何崩溃都能从断点恢复。

这不是数据库的 ACID,也不是消息队列的 At-Least-Once。这是一种全新的编程范式——你写的代码,天生就是可靠的

事件溯源:代码的「时间旅行」

Temporal 的底层是一个事件溯源系统。每一个 Workflow 执行,都会生成一个不可变的事件序列:

WorkflowExecutionStarted →
  ActivityTaskScheduled → ActivityTaskStarted → ActivityTaskCompleted →
  TimerStarted → TimerFired →
  ActivityTaskScheduled → ActivityTaskStarted → ActivityTaskCompleted →
  WorkflowExecutionCompleted

这些事件记录了 Workflow 执行的完整历史。当 Worker 崩溃后重启,Temporal 会重放(Replay)这些事件,将 Workflow 的状态恢复到崩溃前的精确位置,然后继续执行。

这意味着:

  • 进程崩溃:自动从断点恢复,零数据丢失
  • 网络抖动:自动重试,指数退避
  • 部署更新:版本化 Worker 接管旧 Workflow
  • 调试排查:完整的事件历史,可回放可追溯

架构总览

Temporal 的架构由四个核心组件构成:

┌──────────────────────────────────────────────────────────────┐
│                        Temporal Cluster                      │
│                                                              │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │   Frontend   │  │  History    │  │    Matching          │  │
│  │   Service    │  │  Service    │  │    Service          │  │
│  │              │  │             │  │                      │  │
│  │ (API网关)    │  │ (事件存储)  │  │ (任务路由/匹配)      │  │
│  └──────┬───────┘  └──────┬──────┘  └──────────┬───────────┘  │
│         │                 │                     │              │
│         └────────┬────────┘                     │              │
│                  │                               │              │
│         ┌────────┴────────┐                      │              │
│         │  Worker Service  │                      │              │
│         │  (系统Worker)    │                      │              │
│         └─────────────────┘                      │              │
└──────────────────────────────────────────────────┼──────────────┘
                                                   │
                                    ┌──────────────┴──────────────┐
                                    │        Your Workers          │
                                    │                              │
                                    │  ┌─────────┐  ┌─────────┐   │
                                    │  │ Workflow │  │ Activity │   │
                                    │  │  Worker  │  │  Worker  │   │
                                    │  └─────────┘  └─────────┘   │
                                    └──────────────────────────────┘
  • Frontend Service:API 网关,处理所有客户端请求
  • History Service:存储和检索 Workflow 执行历史(基于 Cassandra/PostgreSQL/MySQL)
  • Matching Service:任务队列和调度,将任务路由到可用的 Worker
  • Worker Service:系统内部 Worker,处理定时任务、系统 Workflow 等

你的应用代码作为 Worker 运行,通过 Long Polling 从 Matching Service 拉取任务。

核心编程模型:Workflow 与 Activity

Temporal 的编程模型只有两个核心抽象:WorkflowActivity

Workflow:确定性的业务流程

Workflow 是业务流程的编排器。它必须是确定性的——同样的输入,无论重放多少次,都要走完全相同的路径。

from dataclasses import dataclass
from datetime import timedelta
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from your_activities import validate_order, charge_payment, ship_order


@dataclass
class OrderInput:
    order_id: str
    user_id: str
    amount: float
    items: list[str]
    address: str


@workflow.defn(name="OrderProcessingWorkflow")
class OrderProcessingWorkflow:
    @workflow.run
    async def run(self, input: OrderInput) -> str:
        # Step 1: 验证订单
        await workflow.execute_activity(
            validate_order,
            input.order_id,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=workflow.RetryPolicy(
                maximum_attempts=3,
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=10),
            ),
        )

        # Step 2: 扣款
        payment_id = await workflow.execute_activity(
            charge_payment,
            args=[input.user_id, input.amount],
            start_to_close_timeout=timedelta(seconds=60),
        )

        # Step 3: 发货
        tracking = await workflow.execute_activity(
            ship_order,
            args=[input.order_id, input.address],
            start_to_close_timeout=timedelta(seconds=120),
        )

        return tracking

为什么 Workflow 必须是确定性的? 因为 Temporal 通过重放事件历史来恢复状态。如果每次重放走了不同的路径,状态就不一致了。所以:

# ❌ 禁止:非确定性操作
import random
import time
import requests

@workflow.defn
class BadWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        value = random.randint(1, 100)          # 每次重放结果不同
        now = time.time()                         # 每次重放时间不同
        resp = requests.get("https://api.example.com")  # 网络I/O,不可重放
        return f"{value}-{now}-{resp.text}"

# ✅ 正确:使用 Temporal 提供的确定性 API
@workflow.defn
class GoodWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        value = workflow.random().randint(1, 100)  # 确定性随机数
        now = workflow.now()                        # 确定性时间
        resp = await workflow.execute_activity(      # 网络I/O通过Activity
            fetch_data, "https://api.example.com",
            start_to_close_timeout=timedelta(seconds=10),
        )
        return f"{value}-{now}-{resp}"

Temporal SDK 提供了一系列 replay-safe 的替代 API:

禁止使用替代方案
random.random()workflow.random()
uuid.uuid4()workflow.uuid4()
datetime.now()workflow.now()
print() / loggingworkflow.logger
time.sleep()workflow.sleep()
直接网络调用workflow.execute_activity()

Activity:副作用与失败处理

Activity 是执行实际副作用的地方——网络调用、数据库操作、文件 I/O。Activity 不需要是确定性的,因为失败后就是重试,不需要重放。

from temporalio import activity


@activity.defn
async def charge_payment(user_id: str, amount: float) -> str:
    # Activity 内部可以执行任何操作
    # 网络调用、数据库操作、文件I/O 都可以
    try:
        response = await payment_client.charge(
            user_id=user_id,
            amount=amount,
            currency="USD",
        )
        activity.logger.info(f"Payment charged: {response.id}")
        return response.id
    except PaymentDeclinedError as e:
        # 业务异常:不需要重试
        raise activity.ApplicationError(
            "Payment declined",
            non_retryable=True,
            details={"user_id": user_id, "amount": amount},
        )
    except NetworkError as e:
        # 网络异常:自动重试
        raise  # Temporal 会按照 RetryPolicy 自动重试


@activity.defn
async def process_large_dataset(dataset_url: str) -> dict:
    """长时间运行的 Activity 需要心跳"""
    total = 0
    processed = 0

    async for batch in download_dataset(dataset_url):
        result = await transform_batch(batch)
        await save_batch(result)
        total += len(result)
        processed += 1

        # 关键:长时间 Activity 必须发心跳
        # 心跳让 Temporal 知道 Activity 还活着
        # 心跳中可以携带进度信息,用于重试时恢复进度
        activity.heartbeat(
            progress=processed,
            total_batches=total,
            last_checkpoint=result.checkpoint_id,
        )

    return {"total_processed": total}

超时与重试策略

Temporal 提供了精细的超时控制,覆盖 Activity 生命周期的每个阶段:

from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy

@workflow.defn
class CheckoutWorkflow:
    @workflow.run
    async def run(self, order: OrderInput) -> str:
        result = await workflow.execute_activity(
            process_payment,
            args=[order],
            # 四层超时控制
            schedule_to_start_timeout=timedelta(seconds=5),   # 排队等待超时
            start_to_close_timeout=timedelta(minutes=2),     # 执行超时
            schedule_to_close_timeout=timedelta(minutes=3),   # 总超时
            heartbeat_timeout=timedelta(seconds=30),          # 心跳超时

            # 重试策略
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=30),
                maximum_attempts=5,
                # 非重试异常
                non_retryable_error_types=[
                    "ApplicationError:PaymentDeclined",
                    "ApplicationError:InvalidOrder",
                ],
            ),
        )
        return result

四层超时的含义:

超时类型含义适用场景
schedule_to_start任务排队等待时间Worker 不足告警
start_to_closeActivity 执行时间业务超时控制
schedule_to_close排队+执行总时间SLA 保障
heartbeat心跳间隔长时间 Activity 检活

Replay 2026 重磅更新

2026 年的 Replay 大会发布了四项核心更新。让我们逐一深入。

一、Serverless Workers:零基础设施扩缩容

这是 Temporal 最具革命性的更新。传统 Worker 是一个长驻进程,你需要自己管理部署、扩缩容、健康检查。Serverless Workers 把这一切交给了 Temporal。

工作原理

                    ┌──────────────────────┐
                    │   Temporal Cluster   │
                    │                      │
  Task 提交 ──────► │   Matching Service   │
                    │         │            │
                    │    sync match?       │
                    │    ├─ Yes → 已有Worker处理
                    │    └─ No  → 触发 WCI │
                    │                      │
                    │   ┌──────────────┐   │
                    │   │ WCI (系统    │   │
                    │   │ Workflow)    │   │
                    │   │              │   │
                    │   │ 调用 Lambda  │   │
                    │   └──────┬───────┘   │
                    └──────────┼───────────┘
                               │
                    ┌──────────▼───────────┐
                    │  AWS Lambda (Worker)  │
                    │                       │
                    │  1. 启动 Worker        │
                    │  2. 连接 Temporal      │
                    │  3. 拉取任务            │
                    │  4. 执行               │
                    │  5. 优雅关闭           │
                    └───────────────────────┘

核心机制是 Worker Controller Instance (WCI)——一个系统 Workflow,运行在每个 Worker Deployment Version 上。当 Matching Service 发现没有可用的 Worker 时:

  1. WCI 收到 sync match failure 信号
  2. WCI 调用配置的 Compute Provider(目前支持 AWS Lambda)
  3. Lambda 函数启动,创建 Temporal Client,开始 poll 任务
  4. 任务处理完毕,Worker 优雅关闭

代码实现

# worker.py - Serverless Worker 的代码和传统 Worker 完全一样
import asyncio
from concurrent.futures import ThreadPoolExecutor
from temporalio.client import Client
from temporalio.worker import Worker
from my_activities import compose_greeting


async def main():
    client = await Client.connect("localhost:7233")

    worker = Worker(
        client,
        task_queue="my-serverless-task-queue",
        activities=[compose_greeting],
        activity_executor=ThreadPoolExecutor(5),
    )
    print("Serverless Worker running...")
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

关键点:你的 Worker 代码完全不需要改动。Serverless 的调度逻辑全由 Temporal 集群和 WCI 处理。你只需要在 Worker Deployment Version 上配置 Compute Provider。

生命周期管理

Serverless Worker 有三个生命周期阶段:

# Worker 生命周期:init → work → shutdown
#
# init:    建立 Client 连接
# work:    poll 任务、执行 Activity/Workflow
# shutdown: 等待 in-flight 任务完成 → 运行 shutdown hooks → 退出
#
# 关键参数调优:
# - Worker stop timeout > 最长 Activity 运行时间
# - Shutdown deadline buffer > stop timeout + shutdown hooks 时间
# - Invocation deadline > 最长 Activity 时间 + shutdown deadline buffer

与长驻 Worker 共存:Serverless Workers 可以和传统 Worker 共享同一个 Task Queue。Serverless Worker 只在长驻 Worker 处理不过来时被激活,相当于溢出容量。

⚠️ 注意:如果同一 Task Queue 上同时有 Serverless 和长驻 Worker,不要对长驻 Worker 启用动态扩缩容。两组 Worker 无法协调扩缩策略,会导致不必要的调用和不可预测的扩缩行为。

约束

约束说明
Activity 时长必须在 Compute Provider 限制内(Lambda 最大 15 分钟)
Workflow 时长无限制,Workflow 可以跨多次调用执行
Worker 代码使用标准 Temporal SDK + serverless 包
版本化必须启用 Worker Versioning

二、Standalone Activities:脱离 Workflow 的独立 Activity

传统模式中,Activity 必须从 Workflow 内部调用。Standalone Activities 打破了这个限制——你可以直接从 Client 启动一个 Activity,不需要 Workflow 编排。

什么时候用 Standalone Activities?

  • 简单任务:不需要多步编排,就执行一个操作
  • 批处理:数据处理、文件转换、报告生成
  • 微服务替代:用 Temporal 替代传统 RPC,获得重试和可观测性
  • 事件驱动:从外部系统触发一个可靠的原子操作

完整代码示例

# my_activity.py
from dataclasses import dataclass
from temporalio import activity


@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str


@activity.defn
def compose_greeting(input: ComposeGreetingInput) -> str:
    activity.logger.info("Running activity with parameter %s" % input)
    return f"{input.greeting}, {input.name}!"
# worker.py - 注册 Worker(和传统方式完全一样)
import asyncio
from concurrent.futures import ThreadPoolExecutor
from temporalio.client import Client
from temporalio.worker import Worker
from my_activity import compose_greeting


async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="my-standalone-activity-task-queue",
        activities=[compose_greeting],
        activity_executor=ThreadPoolExecutor(5),
    )
    print("Worker running...")
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
# execute_activity.py - 直接从 Client 执行 Standalone Activity
import asyncio
from datetime import timedelta
from temporalio.client import Client
from my_activity import ComposeGreetingInput, compose_greeting


async def main():
    client = await Client.connect("localhost:7233")

    # 同步等待结果
    result = await client.execute_activity(
        compose_greeting,
        args=[ComposeGreetingInput("Hello", "World")],
        id="my-standalone-activity-id",
        task_queue="my-standalone-activity-task-queue",
        start_to_close_timeout=timedelta(seconds=10),
    )
    print(f"Activity result: {result}")


asyncio.run(main())
# start_activity.py - 异步启动,稍后获取结果
import asyncio
from datetime import timedelta
from temporalio.client import Client
from my_activity import ComposeGreetingInput, compose_greeting


async def main():
    client = await Client.connect("localhost:7233")

    # 启动 Activity,不等待
    handle = await client.start_activity(
        compose_greeting,
        args=[ComposeGreetingInput("Hello", "World")],
        id="my-standalone-activity-id",
        task_queue="my-standalone-activity-task-queue",
        start_to_close_timeout=timedelta(seconds=10),
    )

    # 做其他事情...
    print("Activity started, doing other work...")

    # 稍后获取结果
    result = await handle.result()
    print(f"Activity result: {result}")


asyncio.run(main())

列表与计数

# list_activities.py
import asyncio
from temporalio.client import Client


async def main():
    client = await Client.connect("localhost:7233")

    # 列出所有 Standalone Activities
    activities = client.list_activities(
        query="TaskQueue = 'my-standalone-activity-task-queue'",
    )
    async for info in activities:
        print(
            f"ActivityID: {info.activity_id}, "
            f"Type: {info.activity_type}, "
            f"Status: {info.status}"
        )


asyncio.run(main())
# count_activities.py
import asyncio
from temporalio.client import Client


async def main():
    client = await Client.connect("localhost:7233")

    resp = await client.count_activities(
        query="TaskQueue = 'my-standalone-activity-task-queue'",
    )
    print("Total activities:", resp.count)

    for group in resp.groups:
        print(f"Group {group.group_values}: {group.count}")


asyncio.run(main())

也可以用 CLI 操作:

# 执行 Standalone Activity
temporal activity execute \
  --type compose_greeting \
  --activity-id my-standalone-activity-id \
  --task-queue my-standalone-activity-task-queue \
  --start-to-close-timeout 10s \
  --input '{"greeting": "Hello", "name": "World"}'

# 列出所有 Standalone Activities
temporal activity list

# 计数
temporal activity count

# 获取结果
temporal activity result --activity-id my-standalone-activity-id

三、Workflow Streams:Workflow 的实时事件流

这是 Replay 2026 最具技术深度的更新。Workflow Streams 让 Workflow 变成了一个持久化的事件通道——外部观察者可以实时订阅 Workflow 的进度。

核心概念

Workflow Streams 的工作方式类似一个嵌在 Workflow 内部的 Kafka topic:

  • Publisher(Workflow 自身、Activity、外部进程)向 topic 发布事件
  • Subscriber(任何有 Temporal Client 的进程)通过 long-polling 订阅事件
  • 事件存储在 Workflow 状态中,天然持久化,天然可恢复
┌─────────────────────────────────────────────┐
│              Workflow (host)                │
│                                             │
│  ┌─────────────────────────────────────┐    │
│  │     In-memory append-only log       │    │
│  │                                     │    │
│  │  offset 0: (status, validating)    │    │
│  │  offset 1: (status, charging)      │    │
│  │  offset 2: (progress, 33%)         │    │
│  │  offset 3: (status, shipping)      │    │
│  │  offset 4: (progress, 66%)         │    │
│  │  offset 5: (status, completed)     │    │
│  └──────────────┬──────────────────────┘    │
│                 │                            │
│     Signal ←──┘  └──→ Update (long-poll)    │
│     (publish)        (subscribe)             │
│                 │                            │
└─────────────────┼────────────────────────────┘
                  │
    ┌─────────────┼─────────────┐
    │             │             │
    ▼             ▼             ▼
 Subscriber    Subscriber   Publisher
   #1            #2        (external)

代码实战:订单处理实时进度

from dataclasses import dataclass, field
from datetime import timedelta
from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream


@dataclass
class StatusEvent:
    state: str
    progress: int = 0
    detail: str = ""


@dataclass
class OrderInput:
    order_id: str
    user_id: str
    amount: float


@workflow.defn
class OrderWorkflow:
    @workflow.init
    def __init__(self, input: OrderInput) -> None:
        # 在 init 中创建 stream(不能在 run 中创建)
        self.stream = WorkflowStream()
        self.status = self.stream.topic("status", type=StatusEvent)

    @workflow.run
    async def run(self, input: OrderInput) -> str:
        # 发布事件:开始验证
        self.status.publish(StatusEvent(
            state="validating",
            detail="checking inventory"
        ))
        await workflow.execute_activity(
            validate_order, input.order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        # 发布事件:开始扣款
        self.status.publish(StatusEvent(
            state="charging",
            progress=33,
            detail="authorizing payment"
        ))
        payment_id = await workflow.execute_activity(
            charge_payment,
            args=[input.user_id, input.amount],
            start_to_close_timeout=timedelta(seconds=60),
        )

        # 发布事件:开始发货
        self.status.publish(StatusEvent(
            state="shipping",
            progress=66,
            detail="dispatching to warehouse"
        ))
        tracking = await workflow.execute_activity(
            dispatch_order, input.order_id,
            start_to_close_timeout=timedelta(seconds=120),
        )

        # 发布事件:完成
        self.status.publish(StatusEvent(
            state="completed",
            progress=100
        ))

        # 等待订阅者确认收到终止事件
        try:
            await workflow.wait_condition(
                lambda: self.subscriber_done,
                timeout=timedelta(seconds=30),
            )
        except TimeoutError:
            pass  # 没有订阅者,正常退出

        return tracking

    @workflow.signal
    async def subscriber_acknowledged_terminator(self) -> None:
        self.subscriber_done = True

客户端订阅

from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient


async def watch_order(order_id: str) -> None:
    client = await Client.connect("localhost:7233")

    # 创建 stream client
    stream = WorkflowStreamClient.create(
        client,
        workflow_id=order_id,
    )

    # 订阅 status topic
    status = stream.topic("status", type=StatusEvent)
    async for item in status.subscribe():
        evt = item.data
        print(f"[{evt.progress:3d}%] {evt.state}: {evt.detail}")
        if evt.state == "completed":
            break

    # 检查最终状态
    desc = await client.get_workflow_handle(order_id).describe()
    print(f"Workflow status: {desc.status}")

从 Activity 发布事件

from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient


@activity.defn
async def stream_deltas(order_id: str) -> None:
    """Activity 内部直接发布事件到父 Workflow 的 stream"""
    client = WorkflowStreamClient.from_within_activity()
    async with client:
        deltas = client.topic("delta", type=Delta)
        for delta in generate_deltas(order_id):
            deltas.publish(delta)
            activity.heartbeat()  # 长时间 Activity 发心跳

        # force_flush 用于低延迟场景
        deltas.publish(Delta(final=True), force_flush=True)

异构 Topic 订阅

当多个 topic 有不同的 payload 类型时:

from temporalio.common import RawValue


async def watch_all_topics(order_id: str) -> None:
    client = await Client.connect("localhost:7233")
    stream = WorkflowStreamClient.create(client, workflow_id=order_id)
    converter = client.data_converter.payload_converter

    # 一次订阅多个 topic
    async for item in stream.subscribe(["status", "progress"], result_type=RawValue):
        if item.topic == "status":
            evt = converter.from_payload(item.data.payload, StatusEvent)
            print(f"[status] {evt.state}: {evt.detail}")
        elif item.topic == "progress":
            evt = converter.from_payload(item.data.payload, ProgressEvent)
            print(f"[progress] {evt.message}")

关键设计决策

精确一次发布:每个 (publisher_id, sequence) 批次最多落入日志一次,即使 Signal 被网络重试。去重状态跨 Continue-As-New 保留。

顺序保证:单个 publisher 内严格有序。跨 publisher 的顺序由 Workflow 接收 Signal 的顺序决定,一旦写入不可变。

Activity 重试的事件:当 Activity 失败重试时,两个尝试的事件都会出现在 stream 中。这是设计决策而非 bug——让订阅者看到完整的重试过程。

批处理与延迟:客户端默认 2 秒刷新一次缓冲区。force_flush=True 可以立即刷新(用于低延迟场景如第一个 LLM token)。await client.flush() 提供中间屏障保证。

长运行 Workflow 的 Continue-As-New

from dataclasses import dataclass, field
from temporalio.contrib.workflow_streams import WorkflowStream, WorkflowStreamState


@dataclass
class AppState:
    items_processed: int = 0


@dataclass
class WorkflowInput:
    app_state: AppState = field(default_factory=AppState)
    stream_state: WorkflowStreamState | None = None  # 跨 rollover 携带


@workflow.defn
class LongRunningWorkflow:
    @workflow.init
    def __init__(self, input: WorkflowInput) -> None:
        self.app_state = input.app_state
        self.stream = WorkflowStream(prior_state=input.stream_state)

    @workflow.run
    async def run(self, input: WorkflowInput) -> None:
        while True:
            await do_one_iteration(self)

            # 历史过大时自动 Continue-As-New
            if workflow.info().is_continue_as_new_suggested():
                await self.stream.continue_as_new(
                    lambda stream_state: [
                        WorkflowInput(
                            app_state=self.app_state,
                            stream_state=stream_state,
                        )
                    ]
                )

四、AI Agent 集成:Google ADK 与 OpenAI Agents SDK

Temporal 在 Replay 2026 上宣布了与 Google ADK 和 OpenAI Agents SDK 的深度集成。这标志着 Temporal 正式进入 AI Agent 基础设施领域。

核心场景是让 AI Agent 具备「持久化执行」能力:

from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream


@dataclass
class AgentStep:
    thought: str
    action: str
    observation: str


@workflow.defn
class DurableAgentWorkflow:
    @workflow.init
    def __init__(self, input: AgentInput) -> None:
        self.stream = WorkflowStream()
        self.steps = self.stream.topic("agent_steps", type=AgentStep)
        self.user_messages = self.stream.topic("messages", type=UserMessage)

    @workflow.run
    async def run(self, input: AgentInput) -> str:
        # Agent 可以运行数小时甚至数天
        # Temporal 保证每一步都不会丢失
        while not self.task_complete:
            # 等待用户输入(Signal 驱动)
            await workflow.wait_condition(lambda: self.has_new_message)

            # 执行推理步骤
            step = await workflow.execute_activity(
                agent_reasoning,
                args=[self.conversation_history],
                start_to_close_timeout=timedelta(minutes=5),
            )

            # 实时流式输出到订阅者(前端 UI)
            self.steps.publish(step)

            if step.action == "tool_call":
                tool_result = await workflow.execute_activity(
                    execute_tool,
                    args=[step.action_params],
                    start_to_close_timeout=timedelta(minutes=10),
                )
                step.observation = tool_result
                self.steps.publish(step)

        return self.final_answer

    @workflow.signal
    async def send_message(self, message: str) -> None:
        """外部用户可以通过 Signal 发送消息"""
        self.has_new_message = True
        self.conversation_history.append({"role": "user", "content": message})

这就是 Temporal 在 AI 领域的核心价值:Agent 的每一步推理、每一次工具调用,都天生持久化、可恢复、可观测。 当 LLM API 超时、Agent 进程崩溃、模型提供商宕机时,Temporal 保证 Agent 从精确的断点恢复,不丢上下文,不重复工作。

生产级部署

自托管 vs Temporal Cloud

自托管(开源,MIT 协议):

# 开发环境:一行命令启动
temporal server start-dev

# 生产环境需要部署完整的 Cluster
# 组件:Frontend × 3, History × 3+, Matching × 3+
# 存储:Cassandra / PostgreSQL / MySQL

Temporal Cloud:全托管,按用量付费,提供 SLA 保证。

Worker 部署最佳实践

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from your_workflows import OrderWorkflow
from your_activities import validate_order, charge_payment, ship_order


async def run_worker():
    client = await Client.connect("temporal.example.com:7233")

    worker = Worker(
        client,
        task_queue="order-processing-task-queue",
        workflows=[OrderWorkflow],
        activities=[validate_order, charge_payment, ship_order],

        # 并发控制
        max_concurrent_workflow_tasks=100,
        max_concurrent_activities=50,
        max_concurrent_local_activities=200,

        # 优雅关闭
        graceful_shutdown_timeout=timedelta(seconds=30),

        # 限流
        workflow_task_poller_behavior=workflow.PollerBehavior(
            max_polls_per_second=10,
        ),

        # 遥测
        interceptors=[MyMetricsInterceptor()],
    )

    # 优雅关闭信号处理
    import signal
    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop_event.set)

    await worker.run(stop_event)


if __name__ == "__main__":
    asyncio.run(run_worker())

可观测性

Temporal 天生提供完整的可观测性——不需要额外 instrumentation:

# 查看 Workflow 执行历史
temporal workflow show --workflow-id my-order-123

# 查询 Workflow 状态
temporal workflow describe --workflow-id my-order-123

# 列出所有 Workflow
temporal workflow list --query 'WorkflowType = "OrderWorkflow" AND Status = "Running"'

# 终止 Workflow
temporal workflow terminate --workflow-id my-order-123 --reason "customer cancelled"

在 Workflow 内部,可以使用 workflow.unsafe.is_replaying() 来避免重放时重复发送指标:

@workflow.defn
class ObservableWorkflow:
    @workflow.run
    async def run(self, input: OrderInput) -> str:
        if not workflow.unsafe.is_replaying():
            # 只在首次执行时发送指标
            metrics.counter("workflow.started").inc()

        result = await workflow.execute_activity(process, input)

        if not workflow.unsafe.is_replaying():
            metrics.histogram("workflow.duration").observe(
                (workflow.now() - self.start_time).total_seconds()
            )

        return result

版本化与热升级

Temporal 的 Worker Versioning 让你可以零停机升级 Workflow 代码:

from temporalio.worker import Worker
from temporalio.workflow import VersioningBehavior


@workflow.defn
class OrderWorkflow:
    # AutoUpgrade: 新版本自动接管旧 Workflow(需要向后兼容)
    @workflow.init
    def __init__(self):
        self.version = "2.0"

    @workflow.run
    async def run(self, input: OrderInput) -> str:
        # 新版本代码
        pass


# Worker 配置
worker = Worker(
    client,
    task_queue="order-processing-v2",
    workflows=[OrderWorkflow],
    build_id="v2.0.0",  # 唯一构建 ID
    # Worker Versioning 配置
)

性能调优与最佳实践

1. Activity 粒度控制

Activity 太粗 → 失败重试代价大。太细 → 历史事件膨胀。

# ❌ 太粗:一个 Activity 做太多事
@activity.defn
async def process_everything(order: OrderInput) -> str:
    await validate(order)
    await charge(order)
    await ship(order)
    await notify(order)
    return "done"

# ❌ 太细:每次网络调用都是一个 Activity
@activity.defn
async def get_user(user_id: str) -> User: ...
@activity.defn
async def get_user_email(user_id: str) -> str: ...
@activity.defn
async def send_email(to: str, body: str) -> None: ...

# ✅ 合适:按业务边界划分
@activity.defn
async def validate_order(order: OrderInput) -> None: ...  # 验证相关
@activity.defn
async def charge_payment(user_id: str, amount: float) -> str: ...  # 支付相关
@activity.defn
async def ship_order(order_id: str, address: str) -> str: ...  # 物流相关

2. Local Activity 优化

对于耗时短、不需要单独重试的操作,使用 Local Activity 减少网络开销:

@workflow.defn
class OptimizedWorkflow:
    @workflow.run
    async def run(self, input: OrderInput) -> str:
        # Local Activity:在 Worker 进程内执行,不经过 Matching Service
        user = await workflow.execute_local_activity(
            get_user_from_cache,
            input.user_id,
            start_to_close_timeout=timedelta(seconds=5),
        )

        # Regular Activity:通过 Matching Service 调度,支持独立重试
        payment = await workflow.execute_activity(
            charge_payment,
            args=[user, input.amount],
            start_to_close_timeout=timedelta(seconds=60),
        )

        return payment

3. 历史事件控制

Workflow 历史不能无限增长。默认限制 50,000 个事件。

@workflow.defn
class BoundedHistoryWorkflow:
    @workflow.run
    async def run(self, input: BatchInput) -> dict:
        results = []

        for item in input.items:
            # ✅ 使用 Continue-As-New 避免历史膨胀
            if workflow.info().get_current_history_size() > 40000:
                workflow.continue_as_new(
                    args=[BatchInput(
                        items=input.items[len(results):],
                        results=results,
                    )]
                )

            result = await workflow.execute_activity(
                process_item, item,
                start_to_close_timeout=timedelta(seconds=30),
            )
            results.append(result)

        return {"processed": len(results)}

4. Saga 模式实现

Temporal 让 Saga 模式变得极其简洁——就是 try/catch:

@workflow.defn
class SagaWorkflow:
    @workflow.run
    async def run(self, input: BookingInput) -> str:
        try:
            # 正向操作
            flight = await workflow.execute_activity(
                book_flight, input.flight_details,
                start_to_close_timeout=timedelta(seconds=30),
            )
            hotel = await workflow.execute_activity(
                book_hotel, input.hotel_details,
                start_to_close_timeout=timedelta(seconds=30),
            )
            car = await workflow.execute_activity(
                book_car, input.car_details,
                start_to_close_timeout=timedelta(seconds=30),
            )
            return f"Booked: {flight}, {hotel}, {car}"

        except Exception:
            # 补偿操作(逆序)
            if 'car' in locals():
                await workflow.execute_activity(
                    cancel_car, car,
                    start_to_close_timeout=timedelta(seconds=30),
                )
            if 'hotel' in locals():
                await workflow.execute_activity(
                    cancel_hotel, hotel,
                    start_to_close_timeout=timedelta(seconds=30),
                )
            if 'flight' in locals():
                await workflow.execute_activity(
                    cancel_flight, flight,
                    start_to_close_timeout=timedelta(seconds=30),
                )
            raise

5. Child Workflow 隔离

复杂流程使用 Child Workflow 隔离故障域:

@workflow.defn
class ParentWorkflow:
    @workflow.run
    async def run(self, input: MultiOrderInput) -> dict:
        results = []

        # 并行启动多个 Child Workflow
        child_handles = []
        for order in input.orders:
            handle = await workflow.start_child_workflow(
                OrderWorkflow.run,
                order,
                # Child Workflow 有独立的历史和重试策略
                retry_policy=workflow.RetryPolicy(maximum_attempts=3),
            )
            child_handles.append(handle)

        # 等待所有 Child Workflow 完成
        for handle in child_handles:
            try:
                result = await handle
                results.append({"status": "ok", "result": result})
            except Exception as e:
                results.append({"status": "failed", "error": str(e)})

        return {"orders": results}

谁在使用 Temporal?

Temporal 的用户名单读起来像一份科技巨头名录:

  • NVIDIA:用 Temporal 管理跨云 GPU 集群调度
  • Salesforce:从单体架构迁移到 Temporal
  • Twilio:放弃自研系统,迁移到 Temporal Cloud
  • Descript:用 Temporal 提升 AI 运行时间
  • OpenAI:VP of App Infrastructure 亲自背书,称 Temporal 为 AI 系统的核心需求
  • Hashiorp:联合创始人 Mitchell Hashimoto 表示 Temporal 满足了他们所有需求

"Temporal 的技术开箱即用满足了我们所有需求。如果没有 Temporal,我们会花大量时间重新造一个更差的 Temporal。"
—— Mitchell Hashimoto, Co-founder, HashiCorp

总结与展望

Temporal 不是又一个工作流引擎。它是一个编程范式的转变——从「编写可靠代码很难」到「代码天生就可靠」。

2026 年 Replay 发布的四项更新,标志着 Temporal 从「分布式系统基础设施」向「全场景可靠执行平台」的演进:

  1. Serverless Workers 让你无需管理基础设施——Lambda 式的弹性扩缩容,Worker 在需要时启动,完成后关闭。这彻底改变了 Worker 的运维模式。

  2. Standalone Activities 打破了 Workflow 的编排束缚——简单的原子操作不需要包裹在 Workflow 中。这让 Temporal 可以替代传统的 RPC/消息队列场景。

  3. Workflow Streams 补齐了实时可观测性——不再需要轮询查询状态,外部系统可以实时订阅 Workflow 进度。这对 AI Agent 场景尤其重要——用户可以实时看到 Agent 的每一步推理。

  4. AI Agent 集成(Google ADK + OpenAI Agents SDK)明确了战略方向——Temporal 正在成为 AI Agent 的「可靠性层」。当 Agent 运行数小时甚至数天时,持久化执行不是 nice-to-have,而是 must-have。

从 Uber 的 Cadence 到今天的 Temporal,从简单的 Activity 重试到 Serverless Workers 和 Workflow Streams,这条路线告诉我们一个深刻的工程道理:

可靠性的最高境界,不是在代码里处理所有失败场景,而是让框架保证失败永远不会造成数据丢失。

当你不再需要写补偿逻辑、重试代码、状态追踪时,你才能真正专注于业务逻辑本身。这就是 Temporal 的核心价值——不是给你更多工具去处理失败,而是让失败不再需要被处理。


相关链接

推荐文章

使用 Go Embed
2024-11-19 02:54:20 +0800 CST
底部导航栏
2024-11-19 01:12:32 +0800 CST
Nginx 状态监控与日志分析
2024-11-19 09:36:18 +0800 CST
Nginx 反向代理 Redis 服务
2024-11-19 09:41:21 +0800 CST
Vue 中如何处理跨组件通信?
2024-11-17 15:59:54 +0800 CST
使用临时邮箱的重要性
2025-07-16 17:13:32 +0800 CST
Python上下文管理器:with语句
2024-11-19 06:25:31 +0800 CST
程序员茄子在线接单