编程 Temporal Replay 2026深度实战:当Durable Execution遇上AI Agent——从持久执行范式到生产级多智能体编排的完全指南

2026-06-10 15:21:36 +0800 CST views 6

Temporal Replay 2026 深度实战:当 Durable Execution 遇上 AI Agent——从持久执行范式到生产级多智能体编排的完全指南

一、引言:分布式系统的"不可能三角"与 Durable Execution 的破局

1.1 我们仍在用错误的方式构建分布式系统

2026年,分布式系统开发依然充满痛苦。每一个写过生产级微服务编排代码的工程师,都经历过这些噩梦般的场景:

# ❌ 传统方式:脆弱的分布式事务编排
async def process_order(order_id: str):
    try:
        order = await order_service.get(order_id)
        payment = await payment_service.charge(order.amount)
        await inventory_service.reserve(order.items)
        await shipping_service.schedule(order.address)
        await notification_service.send(order.user_id, "订单已确认")
    except PaymentError:
        await inventory_service.release(order.items)  # 手动补偿
        raise
    except ShippingError:
        await payment_service.refund(payment.id)
        await inventory_service.release(order.items)  # 手动补偿
        raise
    except Exception:
        # 服务崩溃了?从头重试?所有进度都丢了
        await send_alert("Order processing failed:", order_id)
        raise

这段代码暗藏了分布式系统的三大不可能满足的需求:一致性(要么全部成功,要么全部回滚)、可用性(任何节点宕机服务不能停)、分区容错性(网络分区时依然正确工作)。CAP 定理告诉我们,你只能同时满足两个——但即便放弃 CAP,我们还是在写大量脆弱的"胶水代码"。

1.2 Temporal 的核心洞察:让失败成为设计的一部分

Temporal 的创始人(来自 Cadence 的 Uber 团队)提出了一个根本性不同的思路:与其试图避免失败,不如让失败变得可预测、可重试、且代价极低

Durable Execution(持久执行)的核心思想是:

代码的执行状态被自动持久化。进程可以随时崩溃重启,但代码的执行进度永远不会丢失。

这意味着:

  • 网络抖动?Temporal 自动重试,从上次成功的步骤继续
  • 服务宕机?Workflow 在其他节点上无缝恢复
  • 超时配置错误?Workflow 在任意时间点暂停和恢复
  • 任意步骤失败?重试策略自动处理,无需手写补偿逻辑

1.3 Replay 2026:Temporal 的 AI Agent 时代宣言

2026年6月9日,Temporal 在其年度大会 Replay 2026 上发布了一系列重磅更新:

  • Serverless Workers:将 Worker 运行时带到无服务器环境
  • Standalone Activities:Activity 的独立部署能力
  • Workflow Streams:支持流式输出的 Workflow
  • Google ADK 集成:与 Google Agent Development Kit 的深度整合
  • OpenAI Agents SDK 集成:原生支持 OpenAI 的 AI Agent 开发框架

这些更新标志着 Temporal 从一个"工作流引擎"进化为"AI Agent 基础设施层"的关键转折。

二、Durable Execution 核心原理解析

2.1 工作原理:Event Sourcing + 确定性重放

Temporal 的持久执行引擎建立在两个核心机制之上:Event Sourcing确定性重放(Deterministic Replay)

Event Sourcing:每一次状态变更都是事件

Temporal Service 不会保存"当前状态",而是保存"状态变更历史"——也就是 Event History(事件历史)。每一次 Workflow 代码执行中的副作用(调用 Activity、网络请求、等待信号等)都会被记录为一个事件。

EventHistory:
  1. WorkflowStarted
  2. ActivityScheduled(id=1, fn=process_payment)
  3. ActivityCompleted(id=1, result=payment_id)
  4. ActivityScheduled(id=2, fn=send_notification)
  5. WorkflowCompleted

确定性重放:从头重建任意状态

当 Workflow 因为任何原因(进程崩溃、节点重启、负载均衡)需要恢复时,Temporal Service 会将整个 Event History 发送给 Worker。Worker 从头重放 Workflow 代码,每次遇到需要调用外部服务的点,就返回 Event History 中记录的已有结果。

关键点:Workflow 代码必须是确定性的。 同一个 Event History 无论重放多少次,必须产生完全相同的下一个事件。Temporal 通过严格的约束来保证这一点——不允许使用当前时间戳、随机数或外部状态作为分支条件。

// ✅ 正确的 Temporal Workflow 代码
func OrderWorkflow(ctx workflow.Context, orderID string) error {
    // 确定性:通过 Activity 获取当前时间
    var currentTime string
    err := workflow.ExecuteActivity(ctx, getServerTime).Get(ctx, &currentTime)
    
    // 确定性:基于 Activity 结果而非系统时钟做分支
    if currentTime < "12:00" {
        err = workflow.ExecuteActivity(ctx, shipByMorning).Get(ctx, nil)
    } else {
        err = workflow.ExecuteActivity(ctx, shipByAfternoon).Get(ctx, nil)
    }
    return err
}

2.2 Workflow 与 Activity 的职责分离

Temporal 将所有操作分为两类:WorkflowActivity

特性WorkflowActivity
执行位置Worker 进程中任意地方(Worker 或外部服务)
状态持久化完全持久化,可重放不持久化,由 Workflow 协调
确定性要求必须确定性无要求
重试策略自动(Workflow 级别重试)Activity 级别独立重试
超时控制Workflow 超时Activity 超时、ScheduleToClose 超时
用途编排逻辑、状态机实际业务操作(DB、API 调用等)
// Activity:实际业务操作(可以有副作用)
func ProcessPayment(ctx context.Context, amount float64) (string, error) {
    // 真实的支付 API 调用
    result, err := paymentGateway.Charge(ctx, amount)
    return result.TransactionID, err
}

// Workflow:编排逻辑(必须是确定性的)
func PaymentWorkflow(ctx workflow.Context, amount float64) error {
    // 编排两个 Activity,按顺序执行
    var transactionID string
    err := workflow.ExecuteActivity(ctx, ProcessPayment, amount).Get(ctx, &transactionID)
    if err != nil {
        return fmt.Errorf("payment failed: %w", err)
    }
    
    err = workflow.ExecuteActivity(ctx, RecordTransaction, transactionID).Get(ctx, nil)
    return err
}

2.3 内置能力:重试、信号、查询、心跳

Temporal 提供了开箱即用的分布式协调能力:

// 重试策略
retryOpts := &workflow.ActivityOptions{
    RetryPolicy: &temporal.RetryPolicy{
        InitialInterval:    time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval:    time.Minute,
        MaximumAttempts:    5,
        NonRetryableErrorTypes: []string{"InvalidInputError"},
    },
}

// 信号(Signal):外部向运行中的 Workflow 发送消息
// 在 Workflow 端接收信号:
func OrderWorkflow(ctx workflow.Context) error {
    var orderStatus string
    signalChan := workflow.GetSignalChannel(ctx, "order-update")
    
    selector := workflow.NewSelector(ctx)
    selector.AddSignalChan(signalChan, func(channel workflow.ReceiveChannel, more bool) {
        channel.Receive(ctx, &orderStatus)
    })
    selector.AddWorkflowTaskStarted(func() {
        // 处理超时
    })
    selector.Select(ctx)
    
    return nil
}

// 在外部发送信号:
await client.signalworkflow({
    workflowId: "order-123",
    signalName: "order-update",
    args: ["shipped"]
})

三、Replay 2026 四大新特性深度解析

3.1 Serverless Workers:无运维的 Temporal 运行时

传统的 Temporal Worker 需要部署和维护一个长期运行的进程。对于有大量短时任务的应用,这意味着资源浪费和管理复杂性。

Serverless Workers 将 Temporal 的运行时打包为云函数(目前支持 AWS Lambda、Vercel Functions 等主流 FaaS 平台)。Workflow 的每个步骤按需触发,函数执行完成后自动释放资源。

// Serverless Worker 示例(TypeScript)
// 每个 Activity 作为独立的 Lambda 函数部署
import { defineActivity, getLambdaContext } from "@temporal/serverless";

// 定义一个 Activity
const processPayment = defineActivity(
  async (input: { amount: number; userId: string }) => {
    const ctx = getLambdaContext();
    
    // Lambda 环境自动配置好了 Temporal 的连接
    const paymentResult = await paymentGateway.charge(input.amount);
    
    // 自动报告心跳(Lambda 超时前自动延长)
    ctx.heartbeat({ stage: "completed", transactionId: paymentResult.id });
    
    return paymentResult;
  },
  {
    // Activity 配置
    startToCloseTimeout: "30 seconds",
    heartbeatTimeout: "10 seconds",
    // Serverless 特有的配置
    lambdaMemoryMB: 512,
    lambdaTimeoutSeconds: 30,
  }
);

// Workflow 仍然部署在传统的 Long-Running Worker 上
export const paymentWorkflow: Workflow<
  { amount: number; userId: string },
  PaymentResult
> = {
  async execute({ amount, userId }) {
    const payment = await processPayment({ amount, userId });
    await sendConfirmationEmail({ userId, transactionId: payment.id });
    return payment;
  },
};

实际意义:对于 AI Agent 场景,Serverless Workers 意味着每个 LLM 调用、每次向量检索、每条工具执行都可以作为独立的 Lambda 函数,按需扩缩容,零冷启动(通过 Provisioned Concurrency),成本从"常驻进程费"变成"按调用计费"。

3.2 Standalone Activities:Activity 的独立生命周期

传统 Temporal 架构中,Activity 和 Worker 进程紧密耦合。如果 Activity 需要更新代码,必须重启整个 Worker,甚至需要考虑运行中的 Activity 的兼容性问题。

Standalone Activities 将 Activity 的部署和执行完全解耦。每个 Activity 可以独立部署、独立版本化、独立回滚。

# Standalone Activity 示例(Python SDK)
from temporalio.activities import activity, stub
from temporalio.worker import ActivityRunner

# 定义一个独立的 Activity,不绑定到任何 Worker
@activity.defn(sstandalone=True, name="user-onboarding")
async def send_welcome_email(user_id: str) -> None:
    # 这个 Activity 可以独立部署、独立版本化
    await email_service.send(
        to=await user_db.get_email(user_id),
        template="welcome_v2",  # Activity 代码可以热更新
    )

# Activity Worker 可以独立部署这个 Activity
# python -m temporal_worker --activity send_welcome_email

# Workflow 调用方式完全不变
@workflow.defn
class OnboardingWorkflow:
    @workflow.run
    async def run(self, user_id: str) -> None:
        await workflow.execute_activity(
            "user-onboarding",  # Activity 名称
            user_id,
            start_to_close_timeout=timedelta(minutes=1),
        )

实际意义:在 AI Agent 场景中,不同的 Agent 组件(Planner、Searcher、Synthesizer)通常由不同团队维护。Standalone Activities 让每个组件可以独立发布、独立灰度,而不影响整个 Workflow 的稳定性。

3.3 Workflow Streams:流式输出支持

在 AI Agent 场景中,用户体验的关键是实时反馈。LLM 生成的内容应该实时流式传输给用户,而不是等待整个生成过程完成后一次性显示。

Workflow Streams 允许 Workflow 在执行过程中实时向外部推送数据变更。

// Workflow Streams 示例(Go SDK)
func ResearchWorkflow(ctx workflow.Context, query string) (string, error) {
    // 创建 Stream,用于实时推送中间结果
    stream, err := workflow.NewStream[string](ctx, "research-progress")
    if err != nil {
        return "", err
    }
    defer stream.Close()
    
    // 阶段1:规划
    await stream.Send("开始规划研究方案...")
    plan := await planningAgent.Plan(query)
    await stream.Send("规划完成,开始搜索...")
    
    // 阶段2:搜索(并行)
    searchResults := await parallelSearchAgent.Search(query, plan.queries)
    await stream.Send(fmt.Sprintf("搜索完成,找到 %d 条结果", len(searchResults)))
    
    // 阶段3:综合
    await stream.Send("正在综合分析...")
    report := await synthesisAgent.Synthesize(searchResults)
    
    await stream.Send("报告生成完成")
    return report, nil
}

// 客户端订阅 Stream
stream := await client.getWorkflowStream(wfID)
for await (const update of stream) {
    // 实时更新 UI
    ui.appendProgressMessage(update)
}

3.4 AI Agent SDK 集成:Durable Execution 作为 Agent 编排层

Replay 2026 最重要的更新是 Google ADKOpenAI Agents SDK 的原生集成。这代表了 Temporal 在 AI Agent 架构中的战略定位:作为 Agent 编排和状态管理的底层基础设施

Temporal × AI Agent 的深度整合架构

┌─────────────────────────────────────────────────────────────┐
│                    AI Agent (LLM + Tools)                   │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐          │
│  │ Planner │ │Searcher │ │ Browser │ │CodeGen  │  ...     │
│  └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘          │
└───────┼───────────┼───────────┼───────────┼────────────────┘
        │           │           │           │
        ▼           ▼           ▼           ▼
┌─────────────────────────────────────────────────────────────┐
│           Temporal Workflow (编排 + 持久化)                 │
│  ┌──────────────────────────────────────────────────┐      │
│  │  Event History (完整执行轨迹,任意时间点可重放)   │      │
│  │  - Activity 调用记录                             │      │
│  │  - LLM 响应历史                                   │      │
│  │  - 工具调用结果                                   │      │
│  │  - 决策分支                                       │      │
│  └──────────────────────────────────────────────────┘      │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐            │
│  │ 重试策略   │  │ 信号/查询   │  │ 超时/限流  │            │
│  └────────────┘  └────────────┘  └────────────┘            │
└───────────────────────────┬─────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────┐
│              Temporal Service (状态 + 协调)                  │
│  - Workflow State Persistence                               │
│  - Activity Scheduling & Routing                           │
│  - Namespaces & Task Queues                               │
│  - Visibility & Observability                              │
└─────────────────────────────────────────────────────────────┘

Google ADK 集成示例

# Google ADK × Temporal 集成
# 使用 Temporal 作为 ADK Agent 的持久化后端
from google.adk.agents import Agent
from temporalio.client import Client
import temporalio.workflow as twf

with twf.unsafe.sandbox_worker() as worker:
    # ADK Agent 自动获得 Durable Execution 能力
    research_agent = Agent(
        model="gemini-2.5-pro",
        name="deep_researcher",
        tools=[web_search, code_executor, file_writer],
        # Temporal 集成配置
        temporal_config={
            "workflow_type": "agent_execution",
            "checkpoint_interval": 5,  # 每5步checkpoint一次
            "resume_on_failure": True,
        }
    )
    
    # 每个 Agent 执行都是一个持久化的 Workflow
    result = await research_agent.run(
        "分析 2026 年 Q2 全球 AI 芯片市场份额",
        workflow_id=f"research-{session_id}",
    )
    
    # 如果执行中断,可以无缝恢复
    # result = await research_agent.resume("research-{session_id}")

OpenAI Agents SDK 集成示例

# OpenAI Agents SDK × Temporal
from agents import Agent, tool
from temporalio.client import Client

@tool
def search_web(query: str) -> str:
    """搜索互联网获取最新信息"""
    return web_search(query)

researcher = Agent(
    name="web_researcher",
    model="gpt-4o",
    tools=[search_web],
    # Temporal Hints:提示 LLM 将此 Agent 编排交由 Temporal 管理
    temporal_hint={
        "activity_name": "web_researcher",
        "timeout": "5 minutes",
        "retry_policy": {"max_attempts": 3},
    }
)

# OpenAI Agents SDK 会自动将这个 Agent 的执行包装为 Temporal Activity
# 结合 Temporal 的 Durable Execution,整个研究过程永不丢失
orchestrator = Agent(
    name="research_orchestrator",
    model="gpt-4o",
    agents=[researcher],
    handoffs=["researcher"],  # Agent 间的交接也由 Temporal 管理
)

# 完整的 deep research pipeline:
# - 每个 LLM 调用 -> Temporal Activity
# - 每次工具使用 -> Temporal Activity  
# - 整个执行过程 -> Temporal Workflow (完整轨迹可审计、可重放)

四、生产级代码实战:构建持久的 Deep Research Agent

4.1 架构设计

基于 Temporal + Braintrust 的 Deep Research 系统,使用多 Agent 管道架构:

用户查询
    │
    ▼
┌─────────────────────┐
│   Planning Agent    │  ← Temporal Activity
│  (拆解研究问题)      │
└──────────┬──────────┘
           │ (3-7个研究子问题)
           ▼
    ┌──────┴──────┐
    │  并行执行   │
┌───┴───┐ ┌───┴───┐ ┌───┴───┐ ┌───┴───┐
│Query  │ │Query  │ │Query  │ │Query  │
│Agent 1│ │Agent 2│ │Agent 3│ │Agent N│  ← Temporal Activity
└───┬───┘ └───┬───┘ └───┬───┘ └───┬───┘
    │         │         │         │
    ▼         ▼         ▼         ▼
┌─────────────────────────────────────────┐
│          Web Search Agent               │  ← Temporal Activity
│     (并行搜索,容错处理)                │
└──────────────────┬──────────────────────┘
                   │ (带置信度评分的搜索结果)
                   ▼
┌─────────────────────────────────────────┐
│         Synthesis Agent                 │  ← Temporal Activity
│   (综合报告 + 置信度评估 + 引用)         │
└──────────────────┬──────────────────────┘
                   │
                   ▼
              最终研究报告

4.2 完整实现代码

类型定义

from dataclasses import dataclass
from enum import Enum
from typing import Protocol, Any
import json

class AgentStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class ResearchQuery:
    id: str
    text: str
    type: str  # "factual", "news", "analysis", "case_study"
    priority: int

@dataclass
class SearchResult:
    query_id: str
    url: str
    title: str
    snippet: str
    relevance_score: float
    authority_score: float  # 权威性评分

@dataclass
class ResearchReport:
    summary: str
    sections: list[dict]
    citations: list[dict]
    confidence: float
    follow_up_questions: list[str]

@dataclass
class WorkflowState:
    plan: list[ResearchQuery]
    search_results: dict[str, list[SearchResult]]
    synthesis: ResearchReport | None
    completed_queries: set[str]
    failed_queries: set[str]

Activities 实现

import asyncio
from temporalio import activity
from openai import AsyncOpenAI

client = AsyncOpenAI()

@activity.defn
async def generate_research_plan(query: str) -> list[ResearchQuery]:
    """
    Planning Agent:将用户查询拆解为 3-7 个研究子问题。
    每个子问题标记类型(事实类、新闻类、分析类、案例类),
    以便后续选择不同的检索策略。
    """
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": """将研究问题拆解为 3-7 个子查询。
每个子查询必须包含:id、text、type。
type 可选值:factual(需要权威来源的事实)、news(最新新闻)、
analysis(深度分析)、case_study(具体案例)。"""},
            {"role": "user", "content": query}
        ],
        response_format={"type": "json_object"},
    )
    
    result = json.loads(response.choices[0].message.content)
    queries = []
    for i, q in enumerate(result.get("queries", [])[:7]):
        queries.append(ResearchQuery(
            id=f"q_{i+1}",
            text=q["text"],
            type=q.get("type", "factual"),
            priority=q.get("priority", 1),
        ))
    return queries


@activity.defn
async def generate_search_queries(sub_query: ResearchQuery) -> list[str]:
    """
    Query Generation Agent:将子问题转换为优化的搜索查询。
    对于不同类型的查询使用不同策略:
    - news: 加时间戳 + site:news
    - factual: 精确关键词 + 权威来源
    - case_study: 公司/产品名 + 案例
    """
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"""为以下{sub_query.type}类型的研究子问题
生成 2-3 个优化的搜索查询。每个查询应该是独立的、互补的。
类型说明:factual=事实类、news=新闻类、analysis=分析类、case_study=案例类"""},
            {"role": "user", "content": sub_query.text}
        ],
        response_format={"type": "json_object"},
    )
    result = json.loads(response.choices[0].message.content)
    return result.get("queries", [])[:3]


@activity.defn
async def execute_web_search(search_query: str) -> list[SearchResult]:
    """
    Web Search Agent:执行搜索并评估结果质量。
    关键设计:部分失败不导致整体失败,
    每个搜索独立重试,整体 Pipeline 继续执行。
    """
    # 使用搜索 API(这里用伪代码表示)
    raw_results = await search_api.search(
        query=search_query,
        num_results=10,
        include_snippets=True,
    )
    
    results = []
    for item in raw_results:
        results.append(SearchResult(
            query_id="",  # 后续填充
            url=item["url"],
            title=item["title"],
            snippet=item["snippet"],
            relevance_score=item.get("relevance", 0.5),
            authority_score=await assess_authority(item["url"]),
        ))
    
    # 按综合评分排序,只返回前5个
    results.sort(key=lambda x: x.relevance_score * 0.4 + x.authority_score * 0.6, reverse=True)
    return results[:5]


@activity.defn
async def synthesize_report(
    query: str,
    all_results: dict[str, list[SearchResult]],
) -> ResearchReport:
    """
    Synthesis Agent:将所有搜索结果综合为结构化报告。
    包含执行摘要、置信度评估、可信引用,
    以及后续研究问题。
    """
    # 将搜索结果格式化为 LLM 输入
    context = format_search_results(all_results)
    
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": """你是深度研究分析师。根据提供的搜索结果,
生成一份结构化研究报告。报告必须包含:
1. executive_summary:300字以内的执行摘要
2. sections:每个主要发现一个章节,包含观点和证据
3. citations:每条引用必须包含 URL
4. confidence_score:0-1 的置信度评分,说明不确定性来源
5. follow_up_questions:3-5 个值得进一步研究的问题"""},
            {"role": "user", "content": f"研究问题:{query}\n\n搜索结果:{context}"}
        ],
        response_format={"type": "json_object"},
    )
    
    result = json.loads(response.choices[0].message.content)
    return ResearchReport(
        summary=result["executive_summary"],
        sections=result["sections"],
        citations=result["citations"],
        confidence=result["confidence_score"],
        follow_up_questions=result["follow_up_questions"],
    )

Workflow 实现(核心编排逻辑)

import asyncio
from temporalio import workflow
from datetime import timedelta
import logging

with workflow.unsafe.imports_passed_through():
    from .activities import (
        generate_research_plan, generate_search_queries,
        execute_web_search, synthesize_report,
    )
    from .models import ResearchQuery, SearchResult, ResearchReport

@workflow.defn
class DeepResearchWorkflow:
    """
    Deep Research Workflow 的核心编排逻辑。
    使用 Temporal 的 Durable Execution 保证整个研究过程永不丢失。
    """
    
    @workflow.run
    async def run(self, query: str) -> ResearchReport:
        logger = logging.getLogger("deep_research")
        logger.info(f"Starting research for query: {query}")
        
        # ===== 阶段1:规划 =====
        logger.info("Phase 1: Generating research plan")
        plan_queries = await workflow.execute_activity(
            generate_research_plan,
            query,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=workflow._wrap_retry(maximum_attempts=2),  # Activity 重试
        )
        logger.info(f"Plan generated: {len(plan_queries)} sub-queries")
        
        # ===== 阶段2:并行搜索(每个子问题独立) =====
        logger.info("Phase 2: Executing parallel searches")
        all_results: dict[str, list[SearchResult]] = {}
        
        # 为每个子问题生成多个搜索查询
        search_tasks = []
        for sub_query in plan_queries:
            # 动态生成搜索查询
            queries = await workflow.execute_activity(
                generate_search_queries,
                sub_query,
                start_to_close_timeout=timedelta(seconds=20),
            )
            
            # 为每个搜索查询创建并行任务
            for sq in queries:
                all_results[sq] = []
                search_tasks.append(
                    self._search_with_fallback(sq, sub_query.id)
                )
        
        # 并行执行所有搜索,使用 asyncio.gather 收集结果
        # 关键:即使部分搜索失败,整体流程继续
        results = await asyncio.gather(*search_tasks, return_exceptions=True)
        
        for result in results:
            if isinstance(result, Exception):
                logger.warning(f"Search failed: {result}")
            else:
                query_text, sr_list = result
                all_results[query_text] = sr_list
        
        logger.info(f"Search completed: {sum(len(v) for v in all_results.values())} results")
        
        # ===== 阶段3:综合报告 =====
        logger.info("Phase 3: Synthesizing final report")
        final_report = await workflow.execute_activity(
            synthesize_report,
            query,
            all_results,
            start_to_close_timeout=timedelta(minutes=2),  # LLM 综合需要更长时间
            retry_policy=workflow._wrap_retry(maximum_attempts=2),
        )
        
        logger.info(f"Report synthesized, confidence: {final_report.confidence}")
        return final_report
    
    async def _search_with_fallback(
        self, search_query: str, query_id: str
    ) -> tuple[str, list[SearchResult]]:
        """
        搜索任务包装器:支持重试和部分失败处理
        """
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                results = await workflow.execute_activity(
                    execute_web_search,
                    search_query,
                    start_to_close_timeout=timedelta(seconds=15),
                    # 每次重试间隔递增
                    retry_policy=workflow._wrap_retry(
                        maximum_attempts=1,  # Activity 级别不重试,由 Workflow 级别处理
                    ),
                )
                for r in results:
                    r.query_id = query_id
                return search_query, results
            except Exception as e:
                if attempt == max_attempts - 1:
                    logger.error(f"Search permanently failed after {max_attempts} attempts: {search_query}")
                    return search_query, []
                wait_seconds = 2 ** attempt
                await asyncio.sleep(wait_seconds)
        return search_query, []

4.3 为什么 AI Agent 需要 Durable Execution

多 Agent 系统的四大失败模式

在上面的 Deep Research Agent 中,如果没有 Temporal,会遇到以下问题:

失败模式1:LLM API 超时导致整条 Pipeline 报废

深度研究任务可能需要 3-30 分钟才能完成。使用推理模型(如 o3、GPT-4.1)时,单次 LLM 调用可能需要 30 秒到 3 分钟。如果超时设置太短,会陷入重试循环;如果设置太长,会阻塞整个系统。

Temporal 解决方案:每个 LLM 调用都是独立的 Activity。Planning Agent 超时只会重试 Planning,不会影响 Search 和 Synthesis 阶段。

失败模式2:部分搜索失败导致整体结果不完整

5 个并行搜索中,有 1 个失败是很常见的(网站限流、API 临时不可用等)。没有 Durable Execution,你需要自己处理部分成功的情况。

Temporal 解决方案asyncio.gather(*tasks, return_exceptions=True) 收集所有结果,即使部分失败,整体 Pipeline 继续,失败的子问题被记录,但不污染其他结果。

失败模式3:非确定性输出导致调试地狱

LLM 的输出是非确定性的。当报告出现错误信息时,根因可能在 Planning、Query Generation、Search 或 Synthesis 的任何一个环节。

Temporal 解决方案:完整的 Event History 记录了每个 Agent 的输入、输出和决策分支。通过 Workflow History 回放,可以精确重现任何一个失败场景。

失败模式4:长时间 Pipeline 中途失败,之前的计算全部浪费

一个 30 分钟的研究任务在第 28 分钟失败了?没有 Durable Execution,你需要从头开始,前面 28 分钟全部浪费。

Temporal 解决方案:Workflow 从最近的成功 Activity 之后恢复,不需要从头开始。结合 Workflow Streams,客户端可以在执行过程中看到实时进度。

五、性能优化与生产最佳实践

5.1 Workflow 设计的性能陷阱

陷阱1:过大的 Event History

Temporal 的 Event History 大小直接影响重放性能。每个 Event 有大小限制,整个 Workflow 的 Event History 也有上限(通常 50MB)。

反例:在 Workflow 中处理大文件或长列表

// ❌ 错误:将大对象放在 Workflow 状态中
func BadWorkflow(ctx workflow.Context, filePath string) error {
    content, _ := os.ReadFile(filePath)
    // 整个文件内容被序列化到 Event History
    lines := strings.Split(string(content), "\n")
    // 如果文件有 100MB,这个 Workflow 的 Event History 也是 100MB+
    
    var results []ProcessResult
    for _, line := range lines {
        result := workflow.ExecuteActivity(ctx, ProcessLine, line)
        results = append(results, result)  // 每个 Activity 结果也被序列化
    }
    return nil
}

正确做法:使用 Pagination + Activity 本地处理

// ✅ 正确:将大对象处理下推到 Activity 层
func GoodWorkflow(ctx workflow.Context, filePath string) error {
    // 只传递文件路径(几字节),不传递文件内容
    totalLines, err := workflow.ExecuteActivity(ctx, CountLines, filePath,
        activity.StartToCloseTimeout(time.Minute),
    ).Get(ctx)
    
    // 分页处理:每页 1000 行
    pageSize := 1000
    for page := 0; page < totalLines; page += pageSize {
        // 每次只传递页码范围(几字节)
        result, err := workflow.ExecuteActivity(ctx, ProcessLinePage,
            LinePage{FilePath: filePath, Start: page, End: min(page+pageSize, totalLines)},
            activity.StartToCloseTimeout(5*time.Minute),
        ).Get(ctx)
        if err != nil {
            return err
        }
        // 只存储汇总结果(几个字节)
        workflow.UpsertTypedSearchAttributes(map[string]interface{}{
            "ProcessedLines": page + pageSize,
        })
    }
    return nil
}

陷阱2:Activity 之间的隐式依赖

如果两个 Activity 实际上可以并行,但你写成串行了,会浪费大量时间。

// ❌ 串行:浪费 200ms
err := workflow.ExecuteActivity(ctx, ActivityA).Get(ctx, nil)
err = workflow.ExecuteActivity(ctx, ActivityB).Get(ctx, nil)

// ✅ 并行:只需要 150ms
futA := workflow.ExecuteActivity(ctx, ActivityA)
futB := workflow.ExecuteActivity(ctx, ActivityB)
err = futA.Get(ctx, nil)
if err == nil {
    err = futB.Get(ctx, nil)
}

5.2 长时 Agent 的任务队列设计

对于需要持续运行数小时的 AI Agent,推荐使用 Task Queue 隔离 来实现优雅的资源管理:

// 不同的 Agent 类型使用不同的 Task Queue
// 实现资源隔离和优先级控制

taskQueues := map[string]WorkerOptions{
    "planner-queue": {
        MaxConcurrentWorkflows: 10,
        MaxConcurrentActivities: 50,
        // Planning Agent 资源需求低
    },
    "llm-queue": {
        MaxConcurrentWorkflows: 5,
        MaxConcurrentActivities: 5,
        // LLM 调用资源密集,需要严格限制并发
    },
    "search-queue": {
        MaxConcurrentWorkflows: 50,
        MaxConcurrentActivities: 200,
        // 搜索相对轻量,可以高并发
    },
    "synthesis-queue": {
        MaxConcurrentWorkflows: 10,
        MaxConcurrentActivities: 10,
        // 综合需要 LLM,资源需求高
    },
}

5.3 可观测性:Workflow 级别的 Tracing

import opentelemetry.trace as otel
from temporalio import workflow as twf

tracer = otel.get_tracer("deep-research-agent")

@twf.defn
class TracedResearchWorkflow:
    @twf.run
    async def run(self, query: str) -> ResearchReport:
        with tracer.start_as_current_span("deep_research") as span:
            span.set_attribute("query", query)
            span.set_attribute("workflow.type", "deep_research")
            
            plan = await self._plan(query)
            span.set_attribute("plan.size", len(plan))
            
            results = await self._search(plan)
            span.set_attribute("search.results", sum(len(r) for r in results.values()))
            
            report = await self._synthesize(query, results)
            span.set_attribute("report.confidence", report.confidence)
            
            return report

六、与竞品的深度对比

6.1 Temporal vs AWS Step Functions

维度TemporalAWS Step Functions
编程模型代码优先(Go/Python/TS)声明式(JSON/YAML)
状态管理内置完整 Event History需要外部存储(DynamoDB)
重放能力确定性重放,任意历史点恢复不支持
Activity 重试细粒度策略 + backoff基础 retry 配置
AI Agent 集成Google ADK + OpenAI SDK 原生支持需要 Lambda 包装
定价模型开源免费(Cloud 托管)按状态转换次数计费
调试工具Web UI + MCP + CLIAWS Console

核心差异:Step Functions 仍然是声明式工作流,适合流程固定、步骤少的场景。而 Temporal 的代码优先模型更适合 AI Agent 这种逻辑复杂、需要 LLM 判断分支的场景。

6.2 Temporal vs Camunda / Airflow

  • Camunda:BPMN 流程引擎,适合业务流程管理(BPM),不是为长时间运行的任务设计
  • Airflow:数据管道编排工具,适合 ETL/ML Pipeline,强调调度而非状态持久化

Temporal 的核心差异化定位:长时间运行、有 LLM 调用、需要可靠状态持久化的 AI Agent 编排

七、总结与展望

7.1 Temporal 的战略价值

Replay 2026 标志着 Temporal 的战略重心发生了本质转变:

  1. 从"工作流引擎"到"AI Agent 基础设施":Serverless Workers + AI SDK 集成,让 Temporal 成为 AI Agent 的"操作系统"
  2. 从"开发者工具"到"平台服务":Standalone Activities + Workflow Streams,让非 Temporal 专家也能构建生产级 Agent
  3. 从"分布式事务"到"分布式智能":Deep Research Agent 展示了 Durable Execution 如何解决 AI Agent 的核心工程挑战

7.2 什么时候应该使用 Temporal

应该用 Temporal

  • AI Agent 开发(Deep Research、Code Agent、Data Agent)
  • 长时间运行的任务(数分钟到数天)
  • 需要可靠状态持久化的多步骤流程
  • 需要完整执行轨迹审计的场景
  • 多 Agent 协作编排

不需要 Temporal

  • 简单的同步请求-响应
  • 固定流程、低复杂度的数据管道(用 Airflow)
  • 纯业务 BPM 流程(用 Camunda)

7.3 2026 年下半年的 Temporal 路线图

根据 Replay 2026 透露的信息,Temporal 正在推进:

  • 多云 Durable Execution:Workflow 在不同云厂商之间无缝迁移
  • WASM 运行时:Activity 可以用 WASM 部署,零冷启动
  • AI Native 调试器:基于 Workflow History 的 AI 辅助调试
  • Compliance Mode:HIPAA、SOC2 合规的 Workflow 执行环境

Durable Execution 的范式正在从"处理分布式系统的失败"扩展到"处理 AI Agent 的不确定性"。这两者的共同点是:失败是常态,可恢复才是关键。Temporal 正是这一理念的最佳实践。


参考资源

  • Temporal 官方文档:https://docs.temporal.io
  • Replay 2026 回顾:https://temporal.io/blog/replay-2026
  • Temporal × Braintrust Deep Research:https://temporal.io/blog/how-to-build-deep-research-agents-using-temporal-and-braintrust
  • Coinbase × Temporal MCP:https://temporal.io/blog/bringing-temporal-into-your-ai-editor-how-coinbase-debugs-workflows-with-mcp
  • Temporal GitHub:https://github.com/temporalio

推荐文章

PHP来做一个短网址(短链接)服务
2024-11-17 22:18:37 +0800 CST
Vue3中如何进行错误处理?
2024-11-18 05:17:47 +0800 CST
mendeley2 一个Python管理文献的库
2024-11-19 02:56:20 +0800 CST
一个收银台的HTML
2025-01-17 16:15:32 +0800 CST
使用Vue 3实现无刷新数据加载
2024-11-18 17:48:20 +0800 CST
阿里云免sdk发送短信代码
2025-01-01 12:22:14 +0800 CST
JavaScript 实现访问本地文件夹
2024-11-18 23:12:47 +0800 CST
程序员茄子在线接单