Temporal Replay 2026 深度实战:当 Durable Execution 遇上 AI Agent——从持久执行范式到生产级多智能体编排的完全指南
一、引言:分布式系统的"不可能三角"与 Durable Execution 的破局
1.1 我们仍在用错误的方式构建分布式系统
2026年,分布式系统开发依然充满痛苦。每一个写过生产级微服务编排代码的工程师,都经历过这些噩梦般的场景:
# ❌ 传统方式:脆弱的分布式事务编排
async def process_order(order_id: str):
try:
order = await order_service.get(order_id)
payment = await payment_service.charge(order.amount)
await inventory_service.reserve(order.items)
await shipping_service.schedule(order.address)
await notification_service.send(order.user_id, "订单已确认")
except PaymentError:
await inventory_service.release(order.items) # 手动补偿
raise
except ShippingError:
await payment_service.refund(payment.id)
await inventory_service.release(order.items) # 手动补偿
raise
except Exception:
# 服务崩溃了?从头重试?所有进度都丢了
await send_alert("Order processing failed:", order_id)
raise
这段代码暗藏了分布式系统的三大不可能满足的需求:一致性(要么全部成功,要么全部回滚)、可用性(任何节点宕机服务不能停)、分区容错性(网络分区时依然正确工作)。CAP 定理告诉我们,你只能同时满足两个——但即便放弃 CAP,我们还是在写大量脆弱的"胶水代码"。
1.2 Temporal 的核心洞察:让失败成为设计的一部分
Temporal 的创始人(来自 Cadence 的 Uber 团队)提出了一个根本性不同的思路:与其试图避免失败,不如让失败变得可预测、可重试、且代价极低。
Durable Execution(持久执行)的核心思想是:
代码的执行状态被自动持久化。进程可以随时崩溃重启,但代码的执行进度永远不会丢失。
这意味着:
- 网络抖动?Temporal 自动重试,从上次成功的步骤继续
- 服务宕机?Workflow 在其他节点上无缝恢复
- 超时配置错误?Workflow 在任意时间点暂停和恢复
- 任意步骤失败?重试策略自动处理,无需手写补偿逻辑
1.3 Replay 2026:Temporal 的 AI Agent 时代宣言
2026年6月9日,Temporal 在其年度大会 Replay 2026 上发布了一系列重磅更新:
- Serverless Workers:将 Worker 运行时带到无服务器环境
- Standalone Activities:Activity 的独立部署能力
- Workflow Streams:支持流式输出的 Workflow
- Google ADK 集成:与 Google Agent Development Kit 的深度整合
- OpenAI Agents SDK 集成:原生支持 OpenAI 的 AI Agent 开发框架
这些更新标志着 Temporal 从一个"工作流引擎"进化为"AI Agent 基础设施层"的关键转折。
二、Durable Execution 核心原理解析
2.1 工作原理:Event Sourcing + 确定性重放
Temporal 的持久执行引擎建立在两个核心机制之上:Event Sourcing 和 确定性重放(Deterministic Replay)。
Event Sourcing:每一次状态变更都是事件
Temporal Service 不会保存"当前状态",而是保存"状态变更历史"——也就是 Event History(事件历史)。每一次 Workflow 代码执行中的副作用(调用 Activity、网络请求、等待信号等)都会被记录为一个事件。
EventHistory:
1. WorkflowStarted
2. ActivityScheduled(id=1, fn=process_payment)
3. ActivityCompleted(id=1, result=payment_id)
4. ActivityScheduled(id=2, fn=send_notification)
5. WorkflowCompleted
确定性重放:从头重建任意状态
当 Workflow 因为任何原因(进程崩溃、节点重启、负载均衡)需要恢复时,Temporal Service 会将整个 Event History 发送给 Worker。Worker 从头重放 Workflow 代码,每次遇到需要调用外部服务的点,就返回 Event History 中记录的已有结果。
关键点:Workflow 代码必须是确定性的。 同一个 Event History 无论重放多少次,必须产生完全相同的下一个事件。Temporal 通过严格的约束来保证这一点——不允许使用当前时间戳、随机数或外部状态作为分支条件。
// ✅ 正确的 Temporal Workflow 代码
func OrderWorkflow(ctx workflow.Context, orderID string) error {
// 确定性:通过 Activity 获取当前时间
var currentTime string
err := workflow.ExecuteActivity(ctx, getServerTime).Get(ctx, ¤tTime)
// 确定性:基于 Activity 结果而非系统时钟做分支
if currentTime < "12:00" {
err = workflow.ExecuteActivity(ctx, shipByMorning).Get(ctx, nil)
} else {
err = workflow.ExecuteActivity(ctx, shipByAfternoon).Get(ctx, nil)
}
return err
}
2.2 Workflow 与 Activity 的职责分离
Temporal 将所有操作分为两类:Workflow 和 Activity。
| 特性 | Workflow | Activity |
|---|---|---|
| 执行位置 | Worker 进程中 | 任意地方(Worker 或外部服务) |
| 状态持久化 | 完全持久化,可重放 | 不持久化,由 Workflow 协调 |
| 确定性要求 | 必须确定性 | 无要求 |
| 重试策略 | 自动(Workflow 级别重试) | Activity 级别独立重试 |
| 超时控制 | Workflow 超时 | Activity 超时、ScheduleToClose 超时 |
| 用途 | 编排逻辑、状态机 | 实际业务操作(DB、API 调用等) |
// Activity:实际业务操作(可以有副作用)
func ProcessPayment(ctx context.Context, amount float64) (string, error) {
// 真实的支付 API 调用
result, err := paymentGateway.Charge(ctx, amount)
return result.TransactionID, err
}
// Workflow:编排逻辑(必须是确定性的)
func PaymentWorkflow(ctx workflow.Context, amount float64) error {
// 编排两个 Activity,按顺序执行
var transactionID string
err := workflow.ExecuteActivity(ctx, ProcessPayment, amount).Get(ctx, &transactionID)
if err != nil {
return fmt.Errorf("payment failed: %w", err)
}
err = workflow.ExecuteActivity(ctx, RecordTransaction, transactionID).Get(ctx, nil)
return err
}
2.3 内置能力:重试、信号、查询、心跳
Temporal 提供了开箱即用的分布式协调能力:
// 重试策略
retryOpts := &workflow.ActivityOptions{
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 5,
NonRetryableErrorTypes: []string{"InvalidInputError"},
},
}
// 信号(Signal):外部向运行中的 Workflow 发送消息
// 在 Workflow 端接收信号:
func OrderWorkflow(ctx workflow.Context) error {
var orderStatus string
signalChan := workflow.GetSignalChannel(ctx, "order-update")
selector := workflow.NewSelector(ctx)
selector.AddSignalChan(signalChan, func(channel workflow.ReceiveChannel, more bool) {
channel.Receive(ctx, &orderStatus)
})
selector.AddWorkflowTaskStarted(func() {
// 处理超时
})
selector.Select(ctx)
return nil
}
// 在外部发送信号:
await client.signalworkflow({
workflowId: "order-123",
signalName: "order-update",
args: ["shipped"]
})
三、Replay 2026 四大新特性深度解析
3.1 Serverless Workers:无运维的 Temporal 运行时
传统的 Temporal Worker 需要部署和维护一个长期运行的进程。对于有大量短时任务的应用,这意味着资源浪费和管理复杂性。
Serverless Workers 将 Temporal 的运行时打包为云函数(目前支持 AWS Lambda、Vercel Functions 等主流 FaaS 平台)。Workflow 的每个步骤按需触发,函数执行完成后自动释放资源。
// Serverless Worker 示例(TypeScript)
// 每个 Activity 作为独立的 Lambda 函数部署
import { defineActivity, getLambdaContext } from "@temporal/serverless";
// 定义一个 Activity
const processPayment = defineActivity(
async (input: { amount: number; userId: string }) => {
const ctx = getLambdaContext();
// Lambda 环境自动配置好了 Temporal 的连接
const paymentResult = await paymentGateway.charge(input.amount);
// 自动报告心跳(Lambda 超时前自动延长)
ctx.heartbeat({ stage: "completed", transactionId: paymentResult.id });
return paymentResult;
},
{
// Activity 配置
startToCloseTimeout: "30 seconds",
heartbeatTimeout: "10 seconds",
// Serverless 特有的配置
lambdaMemoryMB: 512,
lambdaTimeoutSeconds: 30,
}
);
// Workflow 仍然部署在传统的 Long-Running Worker 上
export const paymentWorkflow: Workflow<
{ amount: number; userId: string },
PaymentResult
> = {
async execute({ amount, userId }) {
const payment = await processPayment({ amount, userId });
await sendConfirmationEmail({ userId, transactionId: payment.id });
return payment;
},
};
实际意义:对于 AI Agent 场景,Serverless Workers 意味着每个 LLM 调用、每次向量检索、每条工具执行都可以作为独立的 Lambda 函数,按需扩缩容,零冷启动(通过 Provisioned Concurrency),成本从"常驻进程费"变成"按调用计费"。
3.2 Standalone Activities:Activity 的独立生命周期
传统 Temporal 架构中,Activity 和 Worker 进程紧密耦合。如果 Activity 需要更新代码,必须重启整个 Worker,甚至需要考虑运行中的 Activity 的兼容性问题。
Standalone Activities 将 Activity 的部署和执行完全解耦。每个 Activity 可以独立部署、独立版本化、独立回滚。
# Standalone Activity 示例(Python SDK)
from temporalio.activities import activity, stub
from temporalio.worker import ActivityRunner
# 定义一个独立的 Activity,不绑定到任何 Worker
@activity.defn(sstandalone=True, name="user-onboarding")
async def send_welcome_email(user_id: str) -> None:
# 这个 Activity 可以独立部署、独立版本化
await email_service.send(
to=await user_db.get_email(user_id),
template="welcome_v2", # Activity 代码可以热更新
)
# Activity Worker 可以独立部署这个 Activity
# python -m temporal_worker --activity send_welcome_email
# Workflow 调用方式完全不变
@workflow.defn
class OnboardingWorkflow:
@workflow.run
async def run(self, user_id: str) -> None:
await workflow.execute_activity(
"user-onboarding", # Activity 名称
user_id,
start_to_close_timeout=timedelta(minutes=1),
)
实际意义:在 AI Agent 场景中,不同的 Agent 组件(Planner、Searcher、Synthesizer)通常由不同团队维护。Standalone Activities 让每个组件可以独立发布、独立灰度,而不影响整个 Workflow 的稳定性。
3.3 Workflow Streams:流式输出支持
在 AI Agent 场景中,用户体验的关键是实时反馈。LLM 生成的内容应该实时流式传输给用户,而不是等待整个生成过程完成后一次性显示。
Workflow Streams 允许 Workflow 在执行过程中实时向外部推送数据变更。
// Workflow Streams 示例(Go SDK)
func ResearchWorkflow(ctx workflow.Context, query string) (string, error) {
// 创建 Stream,用于实时推送中间结果
stream, err := workflow.NewStream[string](ctx, "research-progress")
if err != nil {
return "", err
}
defer stream.Close()
// 阶段1:规划
await stream.Send("开始规划研究方案...")
plan := await planningAgent.Plan(query)
await stream.Send("规划完成,开始搜索...")
// 阶段2:搜索(并行)
searchResults := await parallelSearchAgent.Search(query, plan.queries)
await stream.Send(fmt.Sprintf("搜索完成,找到 %d 条结果", len(searchResults)))
// 阶段3:综合
await stream.Send("正在综合分析...")
report := await synthesisAgent.Synthesize(searchResults)
await stream.Send("报告生成完成")
return report, nil
}
// 客户端订阅 Stream
stream := await client.getWorkflowStream(wfID)
for await (const update of stream) {
// 实时更新 UI
ui.appendProgressMessage(update)
}
3.4 AI Agent SDK 集成:Durable Execution 作为 Agent 编排层
Replay 2026 最重要的更新是 Google ADK 和 OpenAI Agents SDK 的原生集成。这代表了 Temporal 在 AI Agent 架构中的战略定位:作为 Agent 编排和状态管理的底层基础设施。
Temporal × AI Agent 的深度整合架构
┌─────────────────────────────────────────────────────────────┐
│ AI Agent (LLM + Tools) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Planner │ │Searcher │ │ Browser │ │CodeGen │ ... │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
└───────┼───────────┼───────────┼───────────┼────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ Temporal Workflow (编排 + 持久化) │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Event History (完整执行轨迹,任意时间点可重放) │ │
│ │ - Activity 调用记录 │ │
│ │ - LLM 响应历史 │ │
│ │ - 工具调用结果 │ │
│ │ - 决策分支 │ │
│ └──────────────────────────────────────────────────┘ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ 重试策略 │ │ 信号/查询 │ │ 超时/限流 │ │
│ └────────────┘ └────────────┘ └────────────┘ │
└───────────────────────────┬─────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Temporal Service (状态 + 协调) │
│ - Workflow State Persistence │
│ - Activity Scheduling & Routing │
│ - Namespaces & Task Queues │
│ - Visibility & Observability │
└─────────────────────────────────────────────────────────────┘
Google ADK 集成示例
# Google ADK × Temporal 集成
# 使用 Temporal 作为 ADK Agent 的持久化后端
from google.adk.agents import Agent
from temporalio.client import Client
import temporalio.workflow as twf
with twf.unsafe.sandbox_worker() as worker:
# ADK Agent 自动获得 Durable Execution 能力
research_agent = Agent(
model="gemini-2.5-pro",
name="deep_researcher",
tools=[web_search, code_executor, file_writer],
# Temporal 集成配置
temporal_config={
"workflow_type": "agent_execution",
"checkpoint_interval": 5, # 每5步checkpoint一次
"resume_on_failure": True,
}
)
# 每个 Agent 执行都是一个持久化的 Workflow
result = await research_agent.run(
"分析 2026 年 Q2 全球 AI 芯片市场份额",
workflow_id=f"research-{session_id}",
)
# 如果执行中断,可以无缝恢复
# result = await research_agent.resume("research-{session_id}")
OpenAI Agents SDK 集成示例
# OpenAI Agents SDK × Temporal
from agents import Agent, tool
from temporalio.client import Client
@tool
def search_web(query: str) -> str:
"""搜索互联网获取最新信息"""
return web_search(query)
researcher = Agent(
name="web_researcher",
model="gpt-4o",
tools=[search_web],
# Temporal Hints:提示 LLM 将此 Agent 编排交由 Temporal 管理
temporal_hint={
"activity_name": "web_researcher",
"timeout": "5 minutes",
"retry_policy": {"max_attempts": 3},
}
)
# OpenAI Agents SDK 会自动将这个 Agent 的执行包装为 Temporal Activity
# 结合 Temporal 的 Durable Execution,整个研究过程永不丢失
orchestrator = Agent(
name="research_orchestrator",
model="gpt-4o",
agents=[researcher],
handoffs=["researcher"], # Agent 间的交接也由 Temporal 管理
)
# 完整的 deep research pipeline:
# - 每个 LLM 调用 -> Temporal Activity
# - 每次工具使用 -> Temporal Activity
# - 整个执行过程 -> Temporal Workflow (完整轨迹可审计、可重放)
四、生产级代码实战:构建持久的 Deep Research Agent
4.1 架构设计
基于 Temporal + Braintrust 的 Deep Research 系统,使用多 Agent 管道架构:
用户查询
│
▼
┌─────────────────────┐
│ Planning Agent │ ← Temporal Activity
│ (拆解研究问题) │
└──────────┬──────────┘
│ (3-7个研究子问题)
▼
┌──────┴──────┐
│ 并行执行 │
┌───┴───┐ ┌───┴───┐ ┌───┴───┐ ┌───┴───┐
│Query │ │Query │ │Query │ │Query │
│Agent 1│ │Agent 2│ │Agent 3│ │Agent N│ ← Temporal Activity
└───┬───┘ └───┬───┘ └───┬───┘ └───┬───┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────┐
│ Web Search Agent │ ← Temporal Activity
│ (并行搜索,容错处理) │
└──────────────────┬──────────────────────┘
│ (带置信度评分的搜索结果)
▼
┌─────────────────────────────────────────┐
│ Synthesis Agent │ ← Temporal Activity
│ (综合报告 + 置信度评估 + 引用) │
└──────────────────┬──────────────────────┘
│
▼
最终研究报告
4.2 完整实现代码
类型定义
from dataclasses import dataclass
from enum import Enum
from typing import Protocol, Any
import json
class AgentStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class ResearchQuery:
id: str
text: str
type: str # "factual", "news", "analysis", "case_study"
priority: int
@dataclass
class SearchResult:
query_id: str
url: str
title: str
snippet: str
relevance_score: float
authority_score: float # 权威性评分
@dataclass
class ResearchReport:
summary: str
sections: list[dict]
citations: list[dict]
confidence: float
follow_up_questions: list[str]
@dataclass
class WorkflowState:
plan: list[ResearchQuery]
search_results: dict[str, list[SearchResult]]
synthesis: ResearchReport | None
completed_queries: set[str]
failed_queries: set[str]
Activities 实现
import asyncio
from temporalio import activity
from openai import AsyncOpenAI
client = AsyncOpenAI()
@activity.defn
async def generate_research_plan(query: str) -> list[ResearchQuery]:
"""
Planning Agent:将用户查询拆解为 3-7 个研究子问题。
每个子问题标记类型(事实类、新闻类、分析类、案例类),
以便后续选择不同的检索策略。
"""
response = await client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": """将研究问题拆解为 3-7 个子查询。
每个子查询必须包含:id、text、type。
type 可选值:factual(需要权威来源的事实)、news(最新新闻)、
analysis(深度分析)、case_study(具体案例)。"""},
{"role": "user", "content": query}
],
response_format={"type": "json_object"},
)
result = json.loads(response.choices[0].message.content)
queries = []
for i, q in enumerate(result.get("queries", [])[:7]):
queries.append(ResearchQuery(
id=f"q_{i+1}",
text=q["text"],
type=q.get("type", "factual"),
priority=q.get("priority", 1),
))
return queries
@activity.defn
async def generate_search_queries(sub_query: ResearchQuery) -> list[str]:
"""
Query Generation Agent:将子问题转换为优化的搜索查询。
对于不同类型的查询使用不同策略:
- news: 加时间戳 + site:news
- factual: 精确关键词 + 权威来源
- case_study: 公司/产品名 + 案例
"""
response = await client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": f"""为以下{sub_query.type}类型的研究子问题
生成 2-3 个优化的搜索查询。每个查询应该是独立的、互补的。
类型说明:factual=事实类、news=新闻类、analysis=分析类、case_study=案例类"""},
{"role": "user", "content": sub_query.text}
],
response_format={"type": "json_object"},
)
result = json.loads(response.choices[0].message.content)
return result.get("queries", [])[:3]
@activity.defn
async def execute_web_search(search_query: str) -> list[SearchResult]:
"""
Web Search Agent:执行搜索并评估结果质量。
关键设计:部分失败不导致整体失败,
每个搜索独立重试,整体 Pipeline 继续执行。
"""
# 使用搜索 API(这里用伪代码表示)
raw_results = await search_api.search(
query=search_query,
num_results=10,
include_snippets=True,
)
results = []
for item in raw_results:
results.append(SearchResult(
query_id="", # 后续填充
url=item["url"],
title=item["title"],
snippet=item["snippet"],
relevance_score=item.get("relevance", 0.5),
authority_score=await assess_authority(item["url"]),
))
# 按综合评分排序,只返回前5个
results.sort(key=lambda x: x.relevance_score * 0.4 + x.authority_score * 0.6, reverse=True)
return results[:5]
@activity.defn
async def synthesize_report(
query: str,
all_results: dict[str, list[SearchResult]],
) -> ResearchReport:
"""
Synthesis Agent:将所有搜索结果综合为结构化报告。
包含执行摘要、置信度评估、可信引用,
以及后续研究问题。
"""
# 将搜索结果格式化为 LLM 输入
context = format_search_results(all_results)
response = await client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": """你是深度研究分析师。根据提供的搜索结果,
生成一份结构化研究报告。报告必须包含:
1. executive_summary:300字以内的执行摘要
2. sections:每个主要发现一个章节,包含观点和证据
3. citations:每条引用必须包含 URL
4. confidence_score:0-1 的置信度评分,说明不确定性来源
5. follow_up_questions:3-5 个值得进一步研究的问题"""},
{"role": "user", "content": f"研究问题:{query}\n\n搜索结果:{context}"}
],
response_format={"type": "json_object"},
)
result = json.loads(response.choices[0].message.content)
return ResearchReport(
summary=result["executive_summary"],
sections=result["sections"],
citations=result["citations"],
confidence=result["confidence_score"],
follow_up_questions=result["follow_up_questions"],
)
Workflow 实现(核心编排逻辑)
import asyncio
from temporalio import workflow
from datetime import timedelta
import logging
with workflow.unsafe.imports_passed_through():
from .activities import (
generate_research_plan, generate_search_queries,
execute_web_search, synthesize_report,
)
from .models import ResearchQuery, SearchResult, ResearchReport
@workflow.defn
class DeepResearchWorkflow:
"""
Deep Research Workflow 的核心编排逻辑。
使用 Temporal 的 Durable Execution 保证整个研究过程永不丢失。
"""
@workflow.run
async def run(self, query: str) -> ResearchReport:
logger = logging.getLogger("deep_research")
logger.info(f"Starting research for query: {query}")
# ===== 阶段1:规划 =====
logger.info("Phase 1: Generating research plan")
plan_queries = await workflow.execute_activity(
generate_research_plan,
query,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=workflow._wrap_retry(maximum_attempts=2), # Activity 重试
)
logger.info(f"Plan generated: {len(plan_queries)} sub-queries")
# ===== 阶段2:并行搜索(每个子问题独立) =====
logger.info("Phase 2: Executing parallel searches")
all_results: dict[str, list[SearchResult]] = {}
# 为每个子问题生成多个搜索查询
search_tasks = []
for sub_query in plan_queries:
# 动态生成搜索查询
queries = await workflow.execute_activity(
generate_search_queries,
sub_query,
start_to_close_timeout=timedelta(seconds=20),
)
# 为每个搜索查询创建并行任务
for sq in queries:
all_results[sq] = []
search_tasks.append(
self._search_with_fallback(sq, sub_query.id)
)
# 并行执行所有搜索,使用 asyncio.gather 收集结果
# 关键:即使部分搜索失败,整体流程继续
results = await asyncio.gather(*search_tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
logger.warning(f"Search failed: {result}")
else:
query_text, sr_list = result
all_results[query_text] = sr_list
logger.info(f"Search completed: {sum(len(v) for v in all_results.values())} results")
# ===== 阶段3:综合报告 =====
logger.info("Phase 3: Synthesizing final report")
final_report = await workflow.execute_activity(
synthesize_report,
query,
all_results,
start_to_close_timeout=timedelta(minutes=2), # LLM 综合需要更长时间
retry_policy=workflow._wrap_retry(maximum_attempts=2),
)
logger.info(f"Report synthesized, confidence: {final_report.confidence}")
return final_report
async def _search_with_fallback(
self, search_query: str, query_id: str
) -> tuple[str, list[SearchResult]]:
"""
搜索任务包装器:支持重试和部分失败处理
"""
max_attempts = 3
for attempt in range(max_attempts):
try:
results = await workflow.execute_activity(
execute_web_search,
search_query,
start_to_close_timeout=timedelta(seconds=15),
# 每次重试间隔递增
retry_policy=workflow._wrap_retry(
maximum_attempts=1, # Activity 级别不重试,由 Workflow 级别处理
),
)
for r in results:
r.query_id = query_id
return search_query, results
except Exception as e:
if attempt == max_attempts - 1:
logger.error(f"Search permanently failed after {max_attempts} attempts: {search_query}")
return search_query, []
wait_seconds = 2 ** attempt
await asyncio.sleep(wait_seconds)
return search_query, []
4.3 为什么 AI Agent 需要 Durable Execution
多 Agent 系统的四大失败模式
在上面的 Deep Research Agent 中,如果没有 Temporal,会遇到以下问题:
失败模式1:LLM API 超时导致整条 Pipeline 报废
深度研究任务可能需要 3-30 分钟才能完成。使用推理模型(如 o3、GPT-4.1)时,单次 LLM 调用可能需要 30 秒到 3 分钟。如果超时设置太短,会陷入重试循环;如果设置太长,会阻塞整个系统。
Temporal 解决方案:每个 LLM 调用都是独立的 Activity。Planning Agent 超时只会重试 Planning,不会影响 Search 和 Synthesis 阶段。
失败模式2:部分搜索失败导致整体结果不完整
5 个并行搜索中,有 1 个失败是很常见的(网站限流、API 临时不可用等)。没有 Durable Execution,你需要自己处理部分成功的情况。
Temporal 解决方案:asyncio.gather(*tasks, return_exceptions=True) 收集所有结果,即使部分失败,整体 Pipeline 继续,失败的子问题被记录,但不污染其他结果。
失败模式3:非确定性输出导致调试地狱
LLM 的输出是非确定性的。当报告出现错误信息时,根因可能在 Planning、Query Generation、Search 或 Synthesis 的任何一个环节。
Temporal 解决方案:完整的 Event History 记录了每个 Agent 的输入、输出和决策分支。通过 Workflow History 回放,可以精确重现任何一个失败场景。
失败模式4:长时间 Pipeline 中途失败,之前的计算全部浪费
一个 30 分钟的研究任务在第 28 分钟失败了?没有 Durable Execution,你需要从头开始,前面 28 分钟全部浪费。
Temporal 解决方案:Workflow 从最近的成功 Activity 之后恢复,不需要从头开始。结合 Workflow Streams,客户端可以在执行过程中看到实时进度。
五、性能优化与生产最佳实践
5.1 Workflow 设计的性能陷阱
陷阱1:过大的 Event History
Temporal 的 Event History 大小直接影响重放性能。每个 Event 有大小限制,整个 Workflow 的 Event History 也有上限(通常 50MB)。
反例:在 Workflow 中处理大文件或长列表
// ❌ 错误:将大对象放在 Workflow 状态中
func BadWorkflow(ctx workflow.Context, filePath string) error {
content, _ := os.ReadFile(filePath)
// 整个文件内容被序列化到 Event History
lines := strings.Split(string(content), "\n")
// 如果文件有 100MB,这个 Workflow 的 Event History 也是 100MB+
var results []ProcessResult
for _, line := range lines {
result := workflow.ExecuteActivity(ctx, ProcessLine, line)
results = append(results, result) // 每个 Activity 结果也被序列化
}
return nil
}
正确做法:使用 Pagination + Activity 本地处理
// ✅ 正确:将大对象处理下推到 Activity 层
func GoodWorkflow(ctx workflow.Context, filePath string) error {
// 只传递文件路径(几字节),不传递文件内容
totalLines, err := workflow.ExecuteActivity(ctx, CountLines, filePath,
activity.StartToCloseTimeout(time.Minute),
).Get(ctx)
// 分页处理:每页 1000 行
pageSize := 1000
for page := 0; page < totalLines; page += pageSize {
// 每次只传递页码范围(几字节)
result, err := workflow.ExecuteActivity(ctx, ProcessLinePage,
LinePage{FilePath: filePath, Start: page, End: min(page+pageSize, totalLines)},
activity.StartToCloseTimeout(5*time.Minute),
).Get(ctx)
if err != nil {
return err
}
// 只存储汇总结果(几个字节)
workflow.UpsertTypedSearchAttributes(map[string]interface{}{
"ProcessedLines": page + pageSize,
})
}
return nil
}
陷阱2:Activity 之间的隐式依赖
如果两个 Activity 实际上可以并行,但你写成串行了,会浪费大量时间。
// ❌ 串行:浪费 200ms
err := workflow.ExecuteActivity(ctx, ActivityA).Get(ctx, nil)
err = workflow.ExecuteActivity(ctx, ActivityB).Get(ctx, nil)
// ✅ 并行:只需要 150ms
futA := workflow.ExecuteActivity(ctx, ActivityA)
futB := workflow.ExecuteActivity(ctx, ActivityB)
err = futA.Get(ctx, nil)
if err == nil {
err = futB.Get(ctx, nil)
}
5.2 长时 Agent 的任务队列设计
对于需要持续运行数小时的 AI Agent,推荐使用 Task Queue 隔离 来实现优雅的资源管理:
// 不同的 Agent 类型使用不同的 Task Queue
// 实现资源隔离和优先级控制
taskQueues := map[string]WorkerOptions{
"planner-queue": {
MaxConcurrentWorkflows: 10,
MaxConcurrentActivities: 50,
// Planning Agent 资源需求低
},
"llm-queue": {
MaxConcurrentWorkflows: 5,
MaxConcurrentActivities: 5,
// LLM 调用资源密集,需要严格限制并发
},
"search-queue": {
MaxConcurrentWorkflows: 50,
MaxConcurrentActivities: 200,
// 搜索相对轻量,可以高并发
},
"synthesis-queue": {
MaxConcurrentWorkflows: 10,
MaxConcurrentActivities: 10,
// 综合需要 LLM,资源需求高
},
}
5.3 可观测性:Workflow 级别的 Tracing
import opentelemetry.trace as otel
from temporalio import workflow as twf
tracer = otel.get_tracer("deep-research-agent")
@twf.defn
class TracedResearchWorkflow:
@twf.run
async def run(self, query: str) -> ResearchReport:
with tracer.start_as_current_span("deep_research") as span:
span.set_attribute("query", query)
span.set_attribute("workflow.type", "deep_research")
plan = await self._plan(query)
span.set_attribute("plan.size", len(plan))
results = await self._search(plan)
span.set_attribute("search.results", sum(len(r) for r in results.values()))
report = await self._synthesize(query, results)
span.set_attribute("report.confidence", report.confidence)
return report
六、与竞品的深度对比
6.1 Temporal vs AWS Step Functions
| 维度 | Temporal | AWS Step Functions |
|---|---|---|
| 编程模型 | 代码优先(Go/Python/TS) | 声明式(JSON/YAML) |
| 状态管理 | 内置完整 Event History | 需要外部存储(DynamoDB) |
| 重放能力 | 确定性重放,任意历史点恢复 | 不支持 |
| Activity 重试 | 细粒度策略 + backoff | 基础 retry 配置 |
| AI Agent 集成 | Google ADK + OpenAI SDK 原生支持 | 需要 Lambda 包装 |
| 定价模型 | 开源免费(Cloud 托管) | 按状态转换次数计费 |
| 调试工具 | Web UI + MCP + CLI | AWS Console |
核心差异:Step Functions 仍然是声明式工作流,适合流程固定、步骤少的场景。而 Temporal 的代码优先模型更适合 AI Agent 这种逻辑复杂、需要 LLM 判断分支的场景。
6.2 Temporal vs Camunda / Airflow
- Camunda:BPMN 流程引擎,适合业务流程管理(BPM),不是为长时间运行的任务设计
- Airflow:数据管道编排工具,适合 ETL/ML Pipeline,强调调度而非状态持久化
Temporal 的核心差异化定位:长时间运行、有 LLM 调用、需要可靠状态持久化的 AI Agent 编排。
七、总结与展望
7.1 Temporal 的战略价值
Replay 2026 标志着 Temporal 的战略重心发生了本质转变:
- 从"工作流引擎"到"AI Agent 基础设施":Serverless Workers + AI SDK 集成,让 Temporal 成为 AI Agent 的"操作系统"
- 从"开发者工具"到"平台服务":Standalone Activities + Workflow Streams,让非 Temporal 专家也能构建生产级 Agent
- 从"分布式事务"到"分布式智能":Deep Research Agent 展示了 Durable Execution 如何解决 AI Agent 的核心工程挑战
7.2 什么时候应该使用 Temporal
应该用 Temporal:
- AI Agent 开发(Deep Research、Code Agent、Data Agent)
- 长时间运行的任务(数分钟到数天)
- 需要可靠状态持久化的多步骤流程
- 需要完整执行轨迹审计的场景
- 多 Agent 协作编排
不需要 Temporal:
- 简单的同步请求-响应
- 固定流程、低复杂度的数据管道(用 Airflow)
- 纯业务 BPM 流程(用 Camunda)
7.3 2026 年下半年的 Temporal 路线图
根据 Replay 2026 透露的信息,Temporal 正在推进:
- 多云 Durable Execution:Workflow 在不同云厂商之间无缝迁移
- WASM 运行时:Activity 可以用 WASM 部署,零冷启动
- AI Native 调试器:基于 Workflow History 的 AI 辅助调试
- Compliance Mode:HIPAA、SOC2 合规的 Workflow 执行环境
Durable Execution 的范式正在从"处理分布式系统的失败"扩展到"处理 AI Agent 的不确定性"。这两者的共同点是:失败是常态,可恢复才是关键。Temporal 正是这一理念的最佳实践。
参考资源:
- Temporal 官方文档:https://docs.temporal.io
- Replay 2026 回顾:https://temporal.io/blog/replay-2026
- Temporal × Braintrust Deep Research:https://temporal.io/blog/how-to-build-deep-research-agents-using-temporal-and-braintrust
- Coinbase × Temporal MCP:https://temporal.io/blog/bringing-temporal-into-your-ai-editor-how-coinbase-debugs-workflows-with-mcp
- Temporal GitHub:https://github.com/temporalio