Temporal 深度实战:当「持久化执行」重塑分布式系统可靠性——从事件溯源到 Serverless Workers、从 Workflow Streams 到 AI Agent 编排的生产级完全指南(2026)
Replay 2026 刚刚发布了四项重磅更新:Serverless Workers、Standalone Activities、Workflow Streams,以及与 Google ADK 和 OpenAI Agents SDK 的深度集成。这不是一次普通的版本迭代——它在重新定义「代码可靠性」的边界。
背景:分布式系统的「可靠性税」
你一定写过这样的代码:
# ❌ 传统方式:手动编排,脆弱且难以维护
async def process_order(order_id: str):
try:
order = await order_service.get(order_id)
payment = await payment_service.charge(order.amount)
await inventory_service.reserve(order.items)
await shipping_service.schedule(order.address)
await notification_service.send(order.user_id, "订单已确认")
except PaymentError:
await inventory_service.release(order.items)
raise
except ShippingError:
await payment_service.refund(payment.id)
await inventory_service.release(order.items)
raise
这段代码有什么问题?
- 无重试机制:网络抖动直接失败
- 无持久化:进程崩溃后状态丢失
- 补偿逻辑复杂:每一步失败都需要手动回滚
- 无法查询状态:订单进行到哪一步了?不知道
- 无法暂停恢复:等待人工审核怎么办?
这是分布式系统的「可靠性税」——你必须为每个操作写重试、补偿、状态追踪逻辑。当一个业务流程涉及 5 个以上服务时,这些胶水代码会膨胀到让你怀疑人生。
Temporal 的创始人 Maxim Fateev 和 Sudhir Tonse,曾经在 AWS 负责 SQS 和 SWF,在 Uber 创建了 Cadence(Temporal 的前身)。他们花了 20 年时间回答一个核心问题:
如果代码的执行本身就是可靠的呢?
核心概念:持久化执行(Durable Execution)
Temporal 的核心思想极其简单,但实现却极其精妙:
把代码的每一步执行状态都持久化到事件日志中,任何崩溃都能从断点恢复。
这不是数据库的 ACID,也不是消息队列的 At-Least-Once。这是一种全新的编程范式——你写的代码,天生就是可靠的。
事件溯源:代码的「时间旅行」
Temporal 的底层是一个事件溯源系统。每一个 Workflow 执行,都会生成一个不可变的事件序列:
WorkflowExecutionStarted →
ActivityTaskScheduled → ActivityTaskStarted → ActivityTaskCompleted →
TimerStarted → TimerFired →
ActivityTaskScheduled → ActivityTaskStarted → ActivityTaskCompleted →
WorkflowExecutionCompleted
这些事件记录了 Workflow 执行的完整历史。当 Worker 崩溃后重启,Temporal 会重放(Replay)这些事件,将 Workflow 的状态恢复到崩溃前的精确位置,然后继续执行。
这意味着:
- 进程崩溃:自动从断点恢复,零数据丢失
- 网络抖动:自动重试,指数退避
- 部署更新:版本化 Worker 接管旧 Workflow
- 调试排查:完整的事件历史,可回放可追溯
架构总览
Temporal 的架构由四个核心组件构成:
┌──────────────────────────────────────────────────────────────┐
│ Temporal Cluster │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Frontend │ │ History │ │ Matching │ │
│ │ Service │ │ Service │ │ Service │ │
│ │ │ │ │ │ │ │
│ │ (API网关) │ │ (事件存储) │ │ (任务路由/匹配) │ │
│ └──────┬───────┘ └──────┬──────┘ └──────────┬───────────┘ │
│ │ │ │ │
│ └────────┬────────┘ │ │
│ │ │ │
│ ┌────────┴────────┐ │ │
│ │ Worker Service │ │ │
│ │ (系统Worker) │ │ │
│ └─────────────────┘ │ │
└──────────────────────────────────────────────────┼──────────────┘
│
┌──────────────┴──────────────┐
│ Your Workers │
│ │
│ ┌─────────┐ ┌─────────┐ │
│ │ Workflow │ │ Activity │ │
│ │ Worker │ │ Worker │ │
│ └─────────┘ └─────────┘ │
└──────────────────────────────┘
- Frontend Service:API 网关,处理所有客户端请求
- History Service:存储和检索 Workflow 执行历史(基于 Cassandra/PostgreSQL/MySQL)
- Matching Service:任务队列和调度,将任务路由到可用的 Worker
- Worker Service:系统内部 Worker,处理定时任务、系统 Workflow 等
你的应用代码作为 Worker 运行,通过 Long Polling 从 Matching Service 拉取任务。
核心编程模型:Workflow 与 Activity
Temporal 的编程模型只有两个核心抽象:Workflow 和 Activity。
Workflow:确定性的业务流程
Workflow 是业务流程的编排器。它必须是确定性的——同样的输入,无论重放多少次,都要走完全相同的路径。
from dataclasses import dataclass
from datetime import timedelta
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
from your_activities import validate_order, charge_payment, ship_order
@dataclass
class OrderInput:
order_id: str
user_id: str
amount: float
items: list[str]
address: str
@workflow.defn(name="OrderProcessingWorkflow")
class OrderProcessingWorkflow:
@workflow.run
async def run(self, input: OrderInput) -> str:
# Step 1: 验证订单
await workflow.execute_activity(
validate_order,
input.order_id,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=workflow.RetryPolicy(
maximum_attempts=3,
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=10),
),
)
# Step 2: 扣款
payment_id = await workflow.execute_activity(
charge_payment,
args=[input.user_id, input.amount],
start_to_close_timeout=timedelta(seconds=60),
)
# Step 3: 发货
tracking = await workflow.execute_activity(
ship_order,
args=[input.order_id, input.address],
start_to_close_timeout=timedelta(seconds=120),
)
return tracking
为什么 Workflow 必须是确定性的? 因为 Temporal 通过重放事件历史来恢复状态。如果每次重放走了不同的路径,状态就不一致了。所以:
# ❌ 禁止:非确定性操作
import random
import time
import requests
@workflow.defn
class BadWorkflow:
@workflow.run
async def run(self, name: str) -> str:
value = random.randint(1, 100) # 每次重放结果不同
now = time.time() # 每次重放时间不同
resp = requests.get("https://api.example.com") # 网络I/O,不可重放
return f"{value}-{now}-{resp.text}"
# ✅ 正确:使用 Temporal 提供的确定性 API
@workflow.defn
class GoodWorkflow:
@workflow.run
async def run(self, name: str) -> str:
value = workflow.random().randint(1, 100) # 确定性随机数
now = workflow.now() # 确定性时间
resp = await workflow.execute_activity( # 网络I/O通过Activity
fetch_data, "https://api.example.com",
start_to_close_timeout=timedelta(seconds=10),
)
return f"{value}-{now}-{resp}"
Temporal SDK 提供了一系列 replay-safe 的替代 API:
| 禁止使用 | 替代方案 |
|---|---|
random.random() | workflow.random() |
uuid.uuid4() | workflow.uuid4() |
datetime.now() | workflow.now() |
print() / logging | workflow.logger |
time.sleep() | workflow.sleep() |
| 直接网络调用 | workflow.execute_activity() |
Activity:副作用与失败处理
Activity 是执行实际副作用的地方——网络调用、数据库操作、文件 I/O。Activity 不需要是确定性的,因为失败后就是重试,不需要重放。
from temporalio import activity
@activity.defn
async def charge_payment(user_id: str, amount: float) -> str:
# Activity 内部可以执行任何操作
# 网络调用、数据库操作、文件I/O 都可以
try:
response = await payment_client.charge(
user_id=user_id,
amount=amount,
currency="USD",
)
activity.logger.info(f"Payment charged: {response.id}")
return response.id
except PaymentDeclinedError as e:
# 业务异常:不需要重试
raise activity.ApplicationError(
"Payment declined",
non_retryable=True,
details={"user_id": user_id, "amount": amount},
)
except NetworkError as e:
# 网络异常:自动重试
raise # Temporal 会按照 RetryPolicy 自动重试
@activity.defn
async def process_large_dataset(dataset_url: str) -> dict:
"""长时间运行的 Activity 需要心跳"""
total = 0
processed = 0
async for batch in download_dataset(dataset_url):
result = await transform_batch(batch)
await save_batch(result)
total += len(result)
processed += 1
# 关键:长时间 Activity 必须发心跳
# 心跳让 Temporal 知道 Activity 还活着
# 心跳中可以携带进度信息,用于重试时恢复进度
activity.heartbeat(
progress=processed,
total_batches=total,
last_checkpoint=result.checkpoint_id,
)
return {"total_processed": total}
超时与重试策略
Temporal 提供了精细的超时控制,覆盖 Activity 生命周期的每个阶段:
from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy
@workflow.defn
class CheckoutWorkflow:
@workflow.run
async def run(self, order: OrderInput) -> str:
result = await workflow.execute_activity(
process_payment,
args=[order],
# 四层超时控制
schedule_to_start_timeout=timedelta(seconds=5), # 排队等待超时
start_to_close_timeout=timedelta(minutes=2), # 执行超时
schedule_to_close_timeout=timedelta(minutes=3), # 总超时
heartbeat_timeout=timedelta(seconds=30), # 心跳超时
# 重试策略
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=30),
maximum_attempts=5,
# 非重试异常
non_retryable_error_types=[
"ApplicationError:PaymentDeclined",
"ApplicationError:InvalidOrder",
],
),
)
return result
四层超时的含义:
| 超时类型 | 含义 | 适用场景 |
|---|---|---|
schedule_to_start | 任务排队等待时间 | Worker 不足告警 |
start_to_close | Activity 执行时间 | 业务超时控制 |
schedule_to_close | 排队+执行总时间 | SLA 保障 |
heartbeat | 心跳间隔 | 长时间 Activity 检活 |
Replay 2026 重磅更新
2026 年的 Replay 大会发布了四项核心更新。让我们逐一深入。
一、Serverless Workers:零基础设施扩缩容
这是 Temporal 最具革命性的更新。传统 Worker 是一个长驻进程,你需要自己管理部署、扩缩容、健康检查。Serverless Workers 把这一切交给了 Temporal。
工作原理
┌──────────────────────┐
│ Temporal Cluster │
│ │
Task 提交 ──────► │ Matching Service │
│ │ │
│ sync match? │
│ ├─ Yes → 已有Worker处理
│ └─ No → 触发 WCI │
│ │
│ ┌──────────────┐ │
│ │ WCI (系统 │ │
│ │ Workflow) │ │
│ │ │ │
│ │ 调用 Lambda │ │
│ └──────┬───────┘ │
└──────────┼───────────┘
│
┌──────────▼───────────┐
│ AWS Lambda (Worker) │
│ │
│ 1. 启动 Worker │
│ 2. 连接 Temporal │
│ 3. 拉取任务 │
│ 4. 执行 │
│ 5. 优雅关闭 │
└───────────────────────┘
核心机制是 Worker Controller Instance (WCI)——一个系统 Workflow,运行在每个 Worker Deployment Version 上。当 Matching Service 发现没有可用的 Worker 时:
- WCI 收到 sync match failure 信号
- WCI 调用配置的 Compute Provider(目前支持 AWS Lambda)
- Lambda 函数启动,创建 Temporal Client,开始 poll 任务
- 任务处理完毕,Worker 优雅关闭
代码实现
# worker.py - Serverless Worker 的代码和传统 Worker 完全一样
import asyncio
from concurrent.futures import ThreadPoolExecutor
from temporalio.client import Client
from temporalio.worker import Worker
from my_activities import compose_greeting
async def main():
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="my-serverless-task-queue",
activities=[compose_greeting],
activity_executor=ThreadPoolExecutor(5),
)
print("Serverless Worker running...")
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
关键点:你的 Worker 代码完全不需要改动。Serverless 的调度逻辑全由 Temporal 集群和 WCI 处理。你只需要在 Worker Deployment Version 上配置 Compute Provider。
生命周期管理
Serverless Worker 有三个生命周期阶段:
# Worker 生命周期:init → work → shutdown
#
# init: 建立 Client 连接
# work: poll 任务、执行 Activity/Workflow
# shutdown: 等待 in-flight 任务完成 → 运行 shutdown hooks → 退出
#
# 关键参数调优:
# - Worker stop timeout > 最长 Activity 运行时间
# - Shutdown deadline buffer > stop timeout + shutdown hooks 时间
# - Invocation deadline > 最长 Activity 时间 + shutdown deadline buffer
与长驻 Worker 共存:Serverless Workers 可以和传统 Worker 共享同一个 Task Queue。Serverless Worker 只在长驻 Worker 处理不过来时被激活,相当于溢出容量。
⚠️ 注意:如果同一 Task Queue 上同时有 Serverless 和长驻 Worker,不要对长驻 Worker 启用动态扩缩容。两组 Worker 无法协调扩缩策略,会导致不必要的调用和不可预测的扩缩行为。
约束
| 约束 | 说明 |
|---|---|
| Activity 时长 | 必须在 Compute Provider 限制内(Lambda 最大 15 分钟) |
| Workflow 时长 | 无限制,Workflow 可以跨多次调用执行 |
| Worker 代码 | 使用标准 Temporal SDK + serverless 包 |
| 版本化 | 必须启用 Worker Versioning |
二、Standalone Activities:脱离 Workflow 的独立 Activity
传统模式中,Activity 必须从 Workflow 内部调用。Standalone Activities 打破了这个限制——你可以直接从 Client 启动一个 Activity,不需要 Workflow 编排。
什么时候用 Standalone Activities?
- 简单任务:不需要多步编排,就执行一个操作
- 批处理:数据处理、文件转换、报告生成
- 微服务替代:用 Temporal 替代传统 RPC,获得重试和可观测性
- 事件驱动:从外部系统触发一个可靠的原子操作
完整代码示例
# my_activity.py
from dataclasses import dataclass
from temporalio import activity
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
@activity.defn
def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"
# worker.py - 注册 Worker(和传统方式完全一样)
import asyncio
from concurrent.futures import ThreadPoolExecutor
from temporalio.client import Client
from temporalio.worker import Worker
from my_activity import compose_greeting
async def main():
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="my-standalone-activity-task-queue",
activities=[compose_greeting],
activity_executor=ThreadPoolExecutor(5),
)
print("Worker running...")
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
# execute_activity.py - 直接从 Client 执行 Standalone Activity
import asyncio
from datetime import timedelta
from temporalio.client import Client
from my_activity import ComposeGreetingInput, compose_greeting
async def main():
client = await Client.connect("localhost:7233")
# 同步等待结果
result = await client.execute_activity(
compose_greeting,
args=[ComposeGreetingInput("Hello", "World")],
id="my-standalone-activity-id",
task_queue="my-standalone-activity-task-queue",
start_to_close_timeout=timedelta(seconds=10),
)
print(f"Activity result: {result}")
asyncio.run(main())
# start_activity.py - 异步启动,稍后获取结果
import asyncio
from datetime import timedelta
from temporalio.client import Client
from my_activity import ComposeGreetingInput, compose_greeting
async def main():
client = await Client.connect("localhost:7233")
# 启动 Activity,不等待
handle = await client.start_activity(
compose_greeting,
args=[ComposeGreetingInput("Hello", "World")],
id="my-standalone-activity-id",
task_queue="my-standalone-activity-task-queue",
start_to_close_timeout=timedelta(seconds=10),
)
# 做其他事情...
print("Activity started, doing other work...")
# 稍后获取结果
result = await handle.result()
print(f"Activity result: {result}")
asyncio.run(main())
列表与计数
# list_activities.py
import asyncio
from temporalio.client import Client
async def main():
client = await Client.connect("localhost:7233")
# 列出所有 Standalone Activities
activities = client.list_activities(
query="TaskQueue = 'my-standalone-activity-task-queue'",
)
async for info in activities:
print(
f"ActivityID: {info.activity_id}, "
f"Type: {info.activity_type}, "
f"Status: {info.status}"
)
asyncio.run(main())
# count_activities.py
import asyncio
from temporalio.client import Client
async def main():
client = await Client.connect("localhost:7233")
resp = await client.count_activities(
query="TaskQueue = 'my-standalone-activity-task-queue'",
)
print("Total activities:", resp.count)
for group in resp.groups:
print(f"Group {group.group_values}: {group.count}")
asyncio.run(main())
也可以用 CLI 操作:
# 执行 Standalone Activity
temporal activity execute \
--type compose_greeting \
--activity-id my-standalone-activity-id \
--task-queue my-standalone-activity-task-queue \
--start-to-close-timeout 10s \
--input '{"greeting": "Hello", "name": "World"}'
# 列出所有 Standalone Activities
temporal activity list
# 计数
temporal activity count
# 获取结果
temporal activity result --activity-id my-standalone-activity-id
三、Workflow Streams:Workflow 的实时事件流
这是 Replay 2026 最具技术深度的更新。Workflow Streams 让 Workflow 变成了一个持久化的事件通道——外部观察者可以实时订阅 Workflow 的进度。
核心概念
Workflow Streams 的工作方式类似一个嵌在 Workflow 内部的 Kafka topic:
- Publisher(Workflow 自身、Activity、外部进程)向 topic 发布事件
- Subscriber(任何有 Temporal Client 的进程)通过 long-polling 订阅事件
- 事件存储在 Workflow 状态中,天然持久化,天然可恢复
┌─────────────────────────────────────────────┐
│ Workflow (host) │
│ │
│ ┌─────────────────────────────────────┐ │
│ │ In-memory append-only log │ │
│ │ │ │
│ │ offset 0: (status, validating) │ │
│ │ offset 1: (status, charging) │ │
│ │ offset 2: (progress, 33%) │ │
│ │ offset 3: (status, shipping) │ │
│ │ offset 4: (progress, 66%) │ │
│ │ offset 5: (status, completed) │ │
│ └──────────────┬──────────────────────┘ │
│ │ │
│ Signal ←──┘ └──→ Update (long-poll) │
│ (publish) (subscribe) │
│ │ │
└─────────────────┼────────────────────────────┘
│
┌─────────────┼─────────────┐
│ │ │
▼ ▼ ▼
Subscriber Subscriber Publisher
#1 #2 (external)
代码实战:订单处理实时进度
from dataclasses import dataclass, field
from datetime import timedelta
from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream
@dataclass
class StatusEvent:
state: str
progress: int = 0
detail: str = ""
@dataclass
class OrderInput:
order_id: str
user_id: str
amount: float
@workflow.defn
class OrderWorkflow:
@workflow.init
def __init__(self, input: OrderInput) -> None:
# 在 init 中创建 stream(不能在 run 中创建)
self.stream = WorkflowStream()
self.status = self.stream.topic("status", type=StatusEvent)
@workflow.run
async def run(self, input: OrderInput) -> str:
# 发布事件:开始验证
self.status.publish(StatusEvent(
state="validating",
detail="checking inventory"
))
await workflow.execute_activity(
validate_order, input.order_id,
start_to_close_timeout=timedelta(seconds=30),
)
# 发布事件:开始扣款
self.status.publish(StatusEvent(
state="charging",
progress=33,
detail="authorizing payment"
))
payment_id = await workflow.execute_activity(
charge_payment,
args=[input.user_id, input.amount],
start_to_close_timeout=timedelta(seconds=60),
)
# 发布事件:开始发货
self.status.publish(StatusEvent(
state="shipping",
progress=66,
detail="dispatching to warehouse"
))
tracking = await workflow.execute_activity(
dispatch_order, input.order_id,
start_to_close_timeout=timedelta(seconds=120),
)
# 发布事件:完成
self.status.publish(StatusEvent(
state="completed",
progress=100
))
# 等待订阅者确认收到终止事件
try:
await workflow.wait_condition(
lambda: self.subscriber_done,
timeout=timedelta(seconds=30),
)
except TimeoutError:
pass # 没有订阅者,正常退出
return tracking
@workflow.signal
async def subscriber_acknowledged_terminator(self) -> None:
self.subscriber_done = True
客户端订阅
from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient
async def watch_order(order_id: str) -> None:
client = await Client.connect("localhost:7233")
# 创建 stream client
stream = WorkflowStreamClient.create(
client,
workflow_id=order_id,
)
# 订阅 status topic
status = stream.topic("status", type=StatusEvent)
async for item in status.subscribe():
evt = item.data
print(f"[{evt.progress:3d}%] {evt.state}: {evt.detail}")
if evt.state == "completed":
break
# 检查最终状态
desc = await client.get_workflow_handle(order_id).describe()
print(f"Workflow status: {desc.status}")
从 Activity 发布事件
from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient
@activity.defn
async def stream_deltas(order_id: str) -> None:
"""Activity 内部直接发布事件到父 Workflow 的 stream"""
client = WorkflowStreamClient.from_within_activity()
async with client:
deltas = client.topic("delta", type=Delta)
for delta in generate_deltas(order_id):
deltas.publish(delta)
activity.heartbeat() # 长时间 Activity 发心跳
# force_flush 用于低延迟场景
deltas.publish(Delta(final=True), force_flush=True)
异构 Topic 订阅
当多个 topic 有不同的 payload 类型时:
from temporalio.common import RawValue
async def watch_all_topics(order_id: str) -> None:
client = await Client.connect("localhost:7233")
stream = WorkflowStreamClient.create(client, workflow_id=order_id)
converter = client.data_converter.payload_converter
# 一次订阅多个 topic
async for item in stream.subscribe(["status", "progress"], result_type=RawValue):
if item.topic == "status":
evt = converter.from_payload(item.data.payload, StatusEvent)
print(f"[status] {evt.state}: {evt.detail}")
elif item.topic == "progress":
evt = converter.from_payload(item.data.payload, ProgressEvent)
print(f"[progress] {evt.message}")
关键设计决策
精确一次发布:每个 (publisher_id, sequence) 批次最多落入日志一次,即使 Signal 被网络重试。去重状态跨 Continue-As-New 保留。
顺序保证:单个 publisher 内严格有序。跨 publisher 的顺序由 Workflow 接收 Signal 的顺序决定,一旦写入不可变。
Activity 重试的事件:当 Activity 失败重试时,两个尝试的事件都会出现在 stream 中。这是设计决策而非 bug——让订阅者看到完整的重试过程。
批处理与延迟:客户端默认 2 秒刷新一次缓冲区。force_flush=True 可以立即刷新(用于低延迟场景如第一个 LLM token)。await client.flush() 提供中间屏障保证。
长运行 Workflow 的 Continue-As-New:
from dataclasses import dataclass, field
from temporalio.contrib.workflow_streams import WorkflowStream, WorkflowStreamState
@dataclass
class AppState:
items_processed: int = 0
@dataclass
class WorkflowInput:
app_state: AppState = field(default_factory=AppState)
stream_state: WorkflowStreamState | None = None # 跨 rollover 携带
@workflow.defn
class LongRunningWorkflow:
@workflow.init
def __init__(self, input: WorkflowInput) -> None:
self.app_state = input.app_state
self.stream = WorkflowStream(prior_state=input.stream_state)
@workflow.run
async def run(self, input: WorkflowInput) -> None:
while True:
await do_one_iteration(self)
# 历史过大时自动 Continue-As-New
if workflow.info().is_continue_as_new_suggested():
await self.stream.continue_as_new(
lambda stream_state: [
WorkflowInput(
app_state=self.app_state,
stream_state=stream_state,
)
]
)
四、AI Agent 集成:Google ADK 与 OpenAI Agents SDK
Temporal 在 Replay 2026 上宣布了与 Google ADK 和 OpenAI Agents SDK 的深度集成。这标志着 Temporal 正式进入 AI Agent 基础设施领域。
核心场景是让 AI Agent 具备「持久化执行」能力:
from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream
@dataclass
class AgentStep:
thought: str
action: str
observation: str
@workflow.defn
class DurableAgentWorkflow:
@workflow.init
def __init__(self, input: AgentInput) -> None:
self.stream = WorkflowStream()
self.steps = self.stream.topic("agent_steps", type=AgentStep)
self.user_messages = self.stream.topic("messages", type=UserMessage)
@workflow.run
async def run(self, input: AgentInput) -> str:
# Agent 可以运行数小时甚至数天
# Temporal 保证每一步都不会丢失
while not self.task_complete:
# 等待用户输入(Signal 驱动)
await workflow.wait_condition(lambda: self.has_new_message)
# 执行推理步骤
step = await workflow.execute_activity(
agent_reasoning,
args=[self.conversation_history],
start_to_close_timeout=timedelta(minutes=5),
)
# 实时流式输出到订阅者(前端 UI)
self.steps.publish(step)
if step.action == "tool_call":
tool_result = await workflow.execute_activity(
execute_tool,
args=[step.action_params],
start_to_close_timeout=timedelta(minutes=10),
)
step.observation = tool_result
self.steps.publish(step)
return self.final_answer
@workflow.signal
async def send_message(self, message: str) -> None:
"""外部用户可以通过 Signal 发送消息"""
self.has_new_message = True
self.conversation_history.append({"role": "user", "content": message})
这就是 Temporal 在 AI 领域的核心价值:Agent 的每一步推理、每一次工具调用,都天生持久化、可恢复、可观测。 当 LLM API 超时、Agent 进程崩溃、模型提供商宕机时,Temporal 保证 Agent 从精确的断点恢复,不丢上下文,不重复工作。
生产级部署
自托管 vs Temporal Cloud
自托管(开源,MIT 协议):
# 开发环境:一行命令启动
temporal server start-dev
# 生产环境需要部署完整的 Cluster
# 组件:Frontend × 3, History × 3+, Matching × 3+
# 存储:Cassandra / PostgreSQL / MySQL
Temporal Cloud:全托管,按用量付费,提供 SLA 保证。
Worker 部署最佳实践
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from your_workflows import OrderWorkflow
from your_activities import validate_order, charge_payment, ship_order
async def run_worker():
client = await Client.connect("temporal.example.com:7233")
worker = Worker(
client,
task_queue="order-processing-task-queue",
workflows=[OrderWorkflow],
activities=[validate_order, charge_payment, ship_order],
# 并发控制
max_concurrent_workflow_tasks=100,
max_concurrent_activities=50,
max_concurrent_local_activities=200,
# 优雅关闭
graceful_shutdown_timeout=timedelta(seconds=30),
# 限流
workflow_task_poller_behavior=workflow.PollerBehavior(
max_polls_per_second=10,
),
# 遥测
interceptors=[MyMetricsInterceptor()],
)
# 优雅关闭信号处理
import signal
loop = asyncio.get_running_loop()
stop_event = asyncio.Event()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, stop_event.set)
await worker.run(stop_event)
if __name__ == "__main__":
asyncio.run(run_worker())
可观测性
Temporal 天生提供完整的可观测性——不需要额外 instrumentation:
# 查看 Workflow 执行历史
temporal workflow show --workflow-id my-order-123
# 查询 Workflow 状态
temporal workflow describe --workflow-id my-order-123
# 列出所有 Workflow
temporal workflow list --query 'WorkflowType = "OrderWorkflow" AND Status = "Running"'
# 终止 Workflow
temporal workflow terminate --workflow-id my-order-123 --reason "customer cancelled"
在 Workflow 内部,可以使用 workflow.unsafe.is_replaying() 来避免重放时重复发送指标:
@workflow.defn
class ObservableWorkflow:
@workflow.run
async def run(self, input: OrderInput) -> str:
if not workflow.unsafe.is_replaying():
# 只在首次执行时发送指标
metrics.counter("workflow.started").inc()
result = await workflow.execute_activity(process, input)
if not workflow.unsafe.is_replaying():
metrics.histogram("workflow.duration").observe(
(workflow.now() - self.start_time).total_seconds()
)
return result
版本化与热升级
Temporal 的 Worker Versioning 让你可以零停机升级 Workflow 代码:
from temporalio.worker import Worker
from temporalio.workflow import VersioningBehavior
@workflow.defn
class OrderWorkflow:
# AutoUpgrade: 新版本自动接管旧 Workflow(需要向后兼容)
@workflow.init
def __init__(self):
self.version = "2.0"
@workflow.run
async def run(self, input: OrderInput) -> str:
# 新版本代码
pass
# Worker 配置
worker = Worker(
client,
task_queue="order-processing-v2",
workflows=[OrderWorkflow],
build_id="v2.0.0", # 唯一构建 ID
# Worker Versioning 配置
)
性能调优与最佳实践
1. Activity 粒度控制
Activity 太粗 → 失败重试代价大。太细 → 历史事件膨胀。
# ❌ 太粗:一个 Activity 做太多事
@activity.defn
async def process_everything(order: OrderInput) -> str:
await validate(order)
await charge(order)
await ship(order)
await notify(order)
return "done"
# ❌ 太细:每次网络调用都是一个 Activity
@activity.defn
async def get_user(user_id: str) -> User: ...
@activity.defn
async def get_user_email(user_id: str) -> str: ...
@activity.defn
async def send_email(to: str, body: str) -> None: ...
# ✅ 合适:按业务边界划分
@activity.defn
async def validate_order(order: OrderInput) -> None: ... # 验证相关
@activity.defn
async def charge_payment(user_id: str, amount: float) -> str: ... # 支付相关
@activity.defn
async def ship_order(order_id: str, address: str) -> str: ... # 物流相关
2. Local Activity 优化
对于耗时短、不需要单独重试的操作,使用 Local Activity 减少网络开销:
@workflow.defn
class OptimizedWorkflow:
@workflow.run
async def run(self, input: OrderInput) -> str:
# Local Activity:在 Worker 进程内执行,不经过 Matching Service
user = await workflow.execute_local_activity(
get_user_from_cache,
input.user_id,
start_to_close_timeout=timedelta(seconds=5),
)
# Regular Activity:通过 Matching Service 调度,支持独立重试
payment = await workflow.execute_activity(
charge_payment,
args=[user, input.amount],
start_to_close_timeout=timedelta(seconds=60),
)
return payment
3. 历史事件控制
Workflow 历史不能无限增长。默认限制 50,000 个事件。
@workflow.defn
class BoundedHistoryWorkflow:
@workflow.run
async def run(self, input: BatchInput) -> dict:
results = []
for item in input.items:
# ✅ 使用 Continue-As-New 避免历史膨胀
if workflow.info().get_current_history_size() > 40000:
workflow.continue_as_new(
args=[BatchInput(
items=input.items[len(results):],
results=results,
)]
)
result = await workflow.execute_activity(
process_item, item,
start_to_close_timeout=timedelta(seconds=30),
)
results.append(result)
return {"processed": len(results)}
4. Saga 模式实现
Temporal 让 Saga 模式变得极其简洁——就是 try/catch:
@workflow.defn
class SagaWorkflow:
@workflow.run
async def run(self, input: BookingInput) -> str:
try:
# 正向操作
flight = await workflow.execute_activity(
book_flight, input.flight_details,
start_to_close_timeout=timedelta(seconds=30),
)
hotel = await workflow.execute_activity(
book_hotel, input.hotel_details,
start_to_close_timeout=timedelta(seconds=30),
)
car = await workflow.execute_activity(
book_car, input.car_details,
start_to_close_timeout=timedelta(seconds=30),
)
return f"Booked: {flight}, {hotel}, {car}"
except Exception:
# 补偿操作(逆序)
if 'car' in locals():
await workflow.execute_activity(
cancel_car, car,
start_to_close_timeout=timedelta(seconds=30),
)
if 'hotel' in locals():
await workflow.execute_activity(
cancel_hotel, hotel,
start_to_close_timeout=timedelta(seconds=30),
)
if 'flight' in locals():
await workflow.execute_activity(
cancel_flight, flight,
start_to_close_timeout=timedelta(seconds=30),
)
raise
5. Child Workflow 隔离
复杂流程使用 Child Workflow 隔离故障域:
@workflow.defn
class ParentWorkflow:
@workflow.run
async def run(self, input: MultiOrderInput) -> dict:
results = []
# 并行启动多个 Child Workflow
child_handles = []
for order in input.orders:
handle = await workflow.start_child_workflow(
OrderWorkflow.run,
order,
# Child Workflow 有独立的历史和重试策略
retry_policy=workflow.RetryPolicy(maximum_attempts=3),
)
child_handles.append(handle)
# 等待所有 Child Workflow 完成
for handle in child_handles:
try:
result = await handle
results.append({"status": "ok", "result": result})
except Exception as e:
results.append({"status": "failed", "error": str(e)})
return {"orders": results}
谁在使用 Temporal?
Temporal 的用户名单读起来像一份科技巨头名录:
- NVIDIA:用 Temporal 管理跨云 GPU 集群调度
- Salesforce:从单体架构迁移到 Temporal
- Twilio:放弃自研系统,迁移到 Temporal Cloud
- Descript:用 Temporal 提升 AI 运行时间
- OpenAI:VP of App Infrastructure 亲自背书,称 Temporal 为 AI 系统的核心需求
- Hashiorp:联合创始人 Mitchell Hashimoto 表示 Temporal 满足了他们所有需求
"Temporal 的技术开箱即用满足了我们所有需求。如果没有 Temporal,我们会花大量时间重新造一个更差的 Temporal。"
—— Mitchell Hashimoto, Co-founder, HashiCorp
总结与展望
Temporal 不是又一个工作流引擎。它是一个编程范式的转变——从「编写可靠代码很难」到「代码天生就可靠」。
2026 年 Replay 发布的四项更新,标志着 Temporal 从「分布式系统基础设施」向「全场景可靠执行平台」的演进:
Serverless Workers 让你无需管理基础设施——Lambda 式的弹性扩缩容,Worker 在需要时启动,完成后关闭。这彻底改变了 Worker 的运维模式。
Standalone Activities 打破了 Workflow 的编排束缚——简单的原子操作不需要包裹在 Workflow 中。这让 Temporal 可以替代传统的 RPC/消息队列场景。
Workflow Streams 补齐了实时可观测性——不再需要轮询查询状态,外部系统可以实时订阅 Workflow 进度。这对 AI Agent 场景尤其重要——用户可以实时看到 Agent 的每一步推理。
AI Agent 集成(Google ADK + OpenAI Agents SDK)明确了战略方向——Temporal 正在成为 AI Agent 的「可靠性层」。当 Agent 运行数小时甚至数天时,持久化执行不是 nice-to-have,而是 must-have。
从 Uber 的 Cadence 到今天的 Temporal,从简单的 Activity 重试到 Serverless Workers 和 Workflow Streams,这条路线告诉我们一个深刻的工程道理:
可靠性的最高境界,不是在代码里处理所有失败场景,而是让框架保证失败永远不会造成数据丢失。
当你不再需要写补偿逻辑、重试代码、状态追踪时,你才能真正专注于业务逻辑本身。这就是 Temporal 的核心价值——不是给你更多工具去处理失败,而是让失败不再需要被处理。
相关链接: