Temporal Replay 2026 深度实战:当持久化执行引擎成为 AI Agent 的神经系统——从 Workflow Streams 到 Serverless Workers、从 AI 调试到生产级部署的完全指南(2026)
引言:当"容错"从负担变成竞争优势
写过分布式系统的工程师都知道,最痛苦的不是写业务逻辑,而是写"失败之后怎么办"。网络抖动、进程崩溃、外部服务超时——这些在任何生产环境里都是必然事件。传统做法是把这些全交给开发者处理:重试逻辑、超时设置、幂等保障、状态回滚……一行业务代码背后,可能要堆十行容错代码。
Temporal 解决的就是这个问题。它的核心概念叫 Durable Execution(持久化执行):你的代码不需要处理失败,框架帮你记住每个执行步骤;出了任何问题,Workflow 可以从上次停下的地方继续跑,而不是从头重试。
2026 年 6 月,Temporal 在 Replay 2026 大会上发布了一批重量级更新:Serverless Workers、Standalone Activities、Workflow Streams,以及与 Google ADK、OpenAI Agents SDK 的深度集成。这些功能合在一起,正在把 Temporal 从一个"工作流引擎"推向 AI Agent 运行时基础设施 的位置。
本文从架构原理出发,深入解析这批更新的技术本质,配上完整的代码示例,告诉你这些能力在生产环境里怎么用、为什么强、以及它们的边界在哪里。
一、Durable Execution 基础:Temporal 的核心模型
在深入新特性之前,先快速梳理一下 Temporal 的基本模型,帮助我们理解后续内容。
1.1 三个核心抽象
Temporal 的核心有三个概念:
Workflow — 你想要执行的业务逻辑。Workflow 是确定性的:给定相同的输入,无论执行多少次,Workflow 内部的状态变化必须完全相同。Temporal 通过记录每次执行的历史(History),保证了 Workflow 可以随时从任意点恢复。
Activity — 与外部世界交互的单元。发 HTTP 请求、写数据库、发送邮件,这些都放在 Activity 里。Activity 可以失败,可以重试,开发者自行控制重试策略。
Signal — 单向消息,外部可以随时发送给一个运行中的 Workflow。Workflow 内部通过 workflow.wait_condition 等待 Signal,从而实现与外部系统的交互式响应。
┌─────────────────────────────────────────────────────────────┐
│ Temporal Server │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Workflow History (持久化执行历史) │ │
│ │ step1 → step2 → step3 → [crash] → resume step3 │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
↑ 记录每步状态 ↑ 从断点恢复
│ │
┌─────┴──────┐ ┌──────┴───────┐
│ Workflow │ │ Worker │
│ (纯逻辑) │◄────── Signals ──────►│ (执行业务) │
│ 无外部依赖 │ │ 有状态/可重启 │
└────────────┘ └─────────────┘
│
┌─────┴──────┐
│ Activity │
│ (外部交互) │
│ HTTP/DB │
└────────────┘
1.2 为什么 Workflow 必须是确定性的
这是 Temporal 最容易让人困惑的设计点。Temporal 通过重放历史(Replaying)来恢复 Workflow:Worker 重启后,从头读一遍 History,重建内存状态,然后从断点继续执行。
这意味着:
- 禁止在 Workflow 里调用
time.Now()(每次重放时间不同) - 禁止访问随机数
- 禁止直接发网络请求
正确做法是把外部数据通过 Signal 传进来,或者在 Activity 里完成外部交互:
// ❌ 错误:Workflow 内不能调用 time.Now()
func MyWorkflow(ctx workflow.Context) error {
t := time.Now() // 每次重放结果不同,破坏确定性
...
}
// ✅ 正确:通过 Activity 获取时间
func MyWorkflow(ctx workflow.Context) error {
var t time.Time
err := workflow.ExecuteActivity(ctx, GetCurrentTime).Get(ctx, &t)
...
}
理解了这个前提,我们才能真正理解 Replay 2026 新特性的价值。
二、Replay 2026 全览:四个改变游戏规则的新能力
Replay 2026 发布的核心能力可以分成两类:运行时基础设施升级(Serverless Workers、Standalone Activities)和 交互层革命(Workflow Streams、AI 框架集成)。先来看全局:
| 新特性 | 类型 | 解决的问题 |
|---|---|---|
| Serverless Workers | 运行时基础设施 | Worker 的运维负担,Serverless 环境下的自动扩缩容 |
| Standalone Activities | 运行时基础设施 | Activity 独立于 Workflow 生态系统的编排能力 |
| Workflow Streams | 交互层 | 持久化执行 + 实时 UI 反馈的融合 |
| AI 框架集成 | 应用层 | Temporal 作为 AI Agent 运行时的基础设施支撑 |
三、Serverless Workers:Temporal 的无服务器化之路
3.1 传统 Worker 模型的问题
Temporal 传统的 Worker 模型要求开发者自己管理进程池:
w, err := worker.New(client, worker.Options{
TaskQueue: "my-queue",
})
w.RegisterWorkflow(MyWorkflow)
w.RegisterActivity(MyActivity)
err = w.Run(worker.InterruptCh())
这里有几个实际痛点:
- 需要自己管理扩缩容 — 业务高峰期手动调 Worker 数量,或者用 Kubernetes HPA,但配置复杂
- 冷启动延迟 — 函数即服务(FaaS)的核心诉求是"按需启动、零空闲成本",但传统 Worker 需要预先占用资源
- 跨云部署的复杂度 — 每个云厂商的 Serverless 方案不同,Temporal Worker 需要适配各种触发器
3.2 Serverless Workers 的架构设计
Replay 2026 的 Serverless Workers 解决了这个问题。Temporal 官方描述为:"Workers that scale to zero and spin up on demand"。
核心思路是:把 Worker 的生命周期管理与 Temporal Server 的任务分发解耦。
传统模式下,Worker 是长驻进程,持续从 Temporal Server 拉取任务:
Worker (长驻进程) ──poll──► Temporal Server ──dispatch──► Activity
│ │
持续占用资源 按需触发
Serverless Workers 模式下,Worker 可以被平台托管,Task Queue 的消息触发 Worker 实例启动:
Serverless Platform (AWS Lambda / GCP Cloud Run / Azure Func)
│
├── Worker 实例(按需启动,计费到毫秒)
│
└── Task Queue 消息触发启动
开发者只需要声明式配置:
// serverless_worker.go
package main
import (
"context"
"log"
"os"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
func main() {
// Serverless 环境变量注入的地址
temporalHost := os.Getenv("TEMPORAL_HOST")
if temporalHost == "" {
temporalHost = "localhost:7233"
}
clientOptions := client.Options{
HostPort: temporalHost,
}
c, err := client.Dial(clientOptions)
if err != nil {
log.Fatal("无法连接 Temporal Server", err)
}
defer c.Close()
// Serverless 模式:无需预先 Run(),框架自动管理生命周期
w := worker.New(c, "my-serverless-queue", worker.Options{
// 关键配置:Serverless 模式
ServerWorkerOptions: &worker.ServerWorkerOptions{
// 最大并发任务数,按需扩展
MaxConcurrentWorkflowTaskPollers: 10,
MaxConcurrentActivityTaskPollers: 50,
},
// 空闲超时:无任务时自动释放资源
WorkerStopTimeout: 30 * 1e9, // 纳秒
})
w.RegisterWorkflow(MyWorkflow)
w.RegisterActivity(ProcessOrder)
log.Println("Serverless Worker 已注册,等待任务...")
// 在 FaaS 环境下,这个调用会被平台管理
// 函数返回 = Worker 停止 = 计费结束
}
配合 AWS Lambda 的示例配置:
# template.yaml (AWS SAM)
Resources:
TemporalServerlessFunction:
Type: AWS::Serverless::Function
Properties:
Handler: dist/serverless_worker
Runtime: provided.al2023
Events:
TemporalTaskQueue:
Type: SQSEvent
Properties:
Queue: !GetAtt TemporalTaskQueue.Arn
Environment:
Variables:
TEMPORAL_HOST: !GetAtt TemporalCluster.Arn
TEMPORAL_NAMESPACE: default
核心优势:成本模型从"常驻实例"变为"按调用计费"。假设一个 AI Agent 每天处理 1000 个任务,峰值 QPS 不超过 5,传统模式需要维持一个常驻 Worker($15/月),Serverless 模式下每天实际计算时间可能只有几分钟,成本降到 $1 以下。
3.3 冷启动延迟的优化
FaaS 最大的痛点是冷启动延迟。Temporal 通过预热(Warm Pool)机制缓解:
w := worker.New(c, "my-queue", worker.Options{
WarmupTimeout: 5 * 1e9, // Worker 启动后等待 5 秒内的任务
})
同时,Temporal Server 支持 sticky execution——同一个 Workflow 的后续任务会被优先路由到最近处理过它的 Worker,这样热 Workflow 几乎零冷启动。
四、Standalone Activities:挣脱 Workflow 束缚的 Activity
4.1 问题的本质
在 Temporal 的传统模型里,Activity 必须通过 Workflow 触发:
Workflow ──execute──► Activity ──返回──► Workflow
(由 Temporal 调度)
这带来一个隐性约束:Activity 的调度依赖于 Workflow 的存在。如果你的系统里有些任务根本不需要 Workflow 编排(比如定时任务、事件驱动的独立任务),你仍然需要创建一个"空的 Workflow"来包裹它。
4.2 Standalone Activities 的设计
Replay 2026 引入的 Standalone Activities 允许 Activity 直接从外部客户端触发,不需要 Workflow 作为中间层:
// standalone_activity_client.go
package main
import (
"context"
"log"
"time"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
func main() {
c, err := client.Dial(client.Options{Namespace: "default"})
if err != nil {
log.Fatal(err)
}
defer c.Close()
// 方式一:通过 Workflow 触发(传统方式)
we, err := c.ExecuteWorkflow(
context.Background(),
client.StartWorkflowOptions{TaskQueue: "my-queue"},
MyWorkflow{}, "input",
)
// 方式二:直接启动 Standalone Activity(新增能力)
// Activity 可以独立于 Workflow 直接调度
activityHandle, err := c.StartStandaloneActivity(
context.Background(),
client.StartActivityOptions{
TaskQueue: "standalone-queue",
ID: "standalone-task-001",
RetryPolicy: &client.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 5,
},
},
SendEmailActivity,
EmailParams{
To: "user@example.com",
Subject: "订单确认",
Body: "您的订单 #12345 已确认",
},
)
// 独立等待结果,或在 Activity 内部通过 WorkflowStreamClient 发布进度
var result string
err = activityHandle.Get(context.Background(), &result)
log.Printf("Standalone Activity 结果: %s", result)
}
服务端注册 Standalone Activity 的方式:
w := worker.New(c, "standalone-queue", worker.Options{})
w.RegisterStandaloneActivityWithOptions(
SendEmailActivity,
"standalone-send-email", // 独立 Activity 名称
)
使用场景:
- 事件驱动任务:Kafka 消费、Webhook 接收、外部系统回调
- 定时任务:不需要 Workflow 编排的独立后台任务
- 微服务解耦:两个服务之间不需要共享 Workflow,只需要共享 Activity 的执行语义(重试、幂等、超时)
- 批量处理:对一组数据做独立的批量操作,不需要整体的编排逻辑
Standalone Activities 本质上是把 Temporal 的可靠执行(重试、幂等、持久化)能力从 Workflow 的"壳"里解放出来,让它成为一个独立的任务执行层。
五、Workflow Streams:持久化执行 + 实时交互的融合
这是 Replay 2026 最重要的新功能,也是最具变革性的一个。
5.1 传统方案的问题
在 Temporal 里,Workflow 的状态变化都被记录在 History 里。但外部客户端想要实时看到 Workflow 的进展,传统方案有两种:
方案一:轮询 Query
// 客户端不断发送 Query 请求
queryResult, err := c.QueryWorkflow(ctx, workflowID, runID, "get_status")
问题:轮询浪费资源,且无法区分"真的没变化"和"刚变化但还没处理到"。
方案二:外部消息队列(Redis Pub/Sub)
很多团队在 Activity 里写一个 Redis Publish,消费者订阅 Redis 频道来获取实时更新。
问题:
- Redis 是"尽力而为"(at-most-once),消息可能丢失
- 需要额外的运维基础设施
- Workflow 本身不感知这个流,无法利用 Workflow 的可靠性保证
- 两套系统需要额外的对账逻辑
5.2 Workflow Streams 的架构
Workflow Streams 是一个基于 Temporal 核心原语构建的持久化流抽象。它不是新建一个消息系统,而是用 Workflows、Signals、Updates 和 Queries 组合实现的库:
┌─────────────────────────────────────────────────────────────┐
│ Workflow │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ WorkflowStream (持久化流) │ │
│ │ topic: "status" ───► event1 ──► event2 ──► event3 │ │
│ │ topic: "outputs" ──► output1 ─► output2 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ 内部发布 = 追加到列表(极低开销) │
│ ▼ Signal (外部发布 = Signal 路由到 History) │
└─────────────────────────────────────────────────────────────┘
│ │
┌────┴──────────────────────┐ │
│ WorkflowStreamClient │ │
│ (订阅端: Update 长轮询) │ │
└────┬──────────────────────┘ │
│ │
┌────┴──────────────────────────────────┐
│ 外部订阅者(浏览器 / 手机 App / 控制台) │
└───────────────────────────────────────┘
关键设计决策:当发布来自 Workflow 内部时,仅仅是向内存中的列表追加元素,零额外开销。当发布来自外部(Activity、其他服务)时,通过 Signal 路由,Signal 会被记录在 History 中——这是 exactly-once 语义的核心保证。
5.3 代码实战:构建 AI Agent 实时交互界面
Workflow Streams 最杀手级的应用场景是 AI Agent 的实时交互界面。想象一个场景:
用户提交一个问题给 AI Agent,Agent 开始工作,它的思考过程、工具调用、生成内容,都实时展示在网页上。用户可以随时中断或干预。
没有 Workflow Streams 之前,这个实时性很难保证,而且 Workflow 重试时界面上已经显示的内容会乱。Workflow Streams 让 UI 成为 History 的纯函数,完美解决这个问题。
Step 1:定义事件类型
# workflow_streams_demo/models.py
from dataclasses import dataclass
from enum import Enum
class AgentEventType(Enum):
REASONING_START = "reasoning_start"
REASONING_STEP = "reasoning_step"
TOOL_CALL = "tool_call"
TOOL_RESULT = "tool_result"
FINAL_OUTPUT = "final_output"
@dataclass
class ReasoningEvent:
step: int
thought: str
@dataclass
class ToolCallEvent:
tool_name: str
arguments: dict
@dataclass
class OutputEvent:
delta: str # 流式输出的文本片段
Step 2:定义带 Workflow Streams 的 Agent Workflow
# workflow_streams_demo/agent_workflow.py
from datetime import timedelta
from typing import Optional
from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream
with workflow.unsafe.imports_passed_through():
from workflow_streams_demo.models import (
ReasoningEvent,
ToolCallEvent,
OutputEvent
)
from temporalio.contrib.openai_agents import (
OpenAIAgentsPlugin,
ModelActivityParameters,
)
@workflow.defn
class StreamingAgentWorkflow:
@workflow.init
def __init__(self):
# 初始化 WorkflowStream(在 @workflow.init 中初始化是关键)
self.stream = WorkflowStream()
@workflow.run
async def run(self, prompt: str) -> str:
# 定义两个持久化 topic
reasoning_topic = self.stream.topic(
"reasoning",
type=ReasoningEvent
)
output_topic = self.stream.topic(
"output",
type=OutputEvent
)
# 发布第一步思考
reasoning_topic.publish(
ReasoningEvent(step=1, thought="分析用户问题...")
)
# 执行搜索 Activity
reasoning_topic.publish(
ReasoningEvent(step=2, thought="调用搜索工具收集信息...")
)
search_results = await workflow.execute_activity(
"search_knowledge_base",
prompt,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=workflow.RetryPolicy(maximum_attempts=2),
)
# 发布工具调用结果
reasoning_topic.publish(
ReasoningEvent(
step=3,
thought=f"搜索完成,获得 {len(search_results)} 条结果"
)
)
# 生成最终回复(Activity 返回流式数据)
final_output = await workflow.execute_activity(
"generate_response",
{"prompt": prompt, "context": search_results},
start_to_close_timeout=timedelta(seconds=60),
)
return final_output
Step 3:在 Activity 中发布流式输出
# workflow_streams_demo/activities.py
from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient
@activity.defn
async def generate_response(params: dict) -> str:
"""模拟流式 LLM 响应,每生成一个 token 就发布到 Workflow Stream"""
client = WorkflowStreamClient.from_within_activity()
output_stream = client.topic("output", type=OutputEvent)
async with client:
prompt = params["prompt"]
context = params["context"]
accumulated = ""
# 模拟 LLM 流式生成
for chunk in simulate_llm_stream(prompt, context):
accumulated += chunk
# 发布每个文本片段到持久化流
output_stream.publish(OutputEvent(delta=chunk))
return accumulated
def simulate_llm_stream(prompt: str, context: list):
"""模拟流式 LLM 输出,实际场景替换为真实 LLM API"""
template = f"基于以下信息回答「{prompt}」:\n"
yield template
for i, item in enumerate(context[:3], 1):
yield f"\n{i}. {item[title]}: {item[snippet][:50]}..."
yield "\n\n综合以上信息,答案是..."
Step 4:外部订阅者(浏览器端)
# workflow_streams_demo/subscriber.py
import asyncio
from temporalio.contrib.workflow_streams import WorkflowStreamClient
from temporalio.client import Client
async def subscribe_agent_output(temporal_client: Client, workflow_id: str):
"""
外部订阅者:连接到 Workflow Stream,实时渲染 AI Agent 输出。
对应 SSE (Server-Sent Events) 到浏览器的场景。
"""
stream = WorkflowStreamClient.create(temporal_client, workflow_id)
# 订阅 reasoning topic
reasoning_topic = stream.topic("reasoning")
# 订阅 output topic
output_topic = stream.topic("output")
# 消费两个 topic 的事件
reasoning_task = asyncio.create_task(
consume_topic(reasoning_topic, "reasoning")
)
output_task = asyncio.create_task(
consume_topic(output_topic, "output")
)
await asyncio.gather(reasoning_task, output_task)
async def consume_topic(topic, topic_name: str):
"""消费单个 topic 的事件"""
accumulated_reasoning = []
accumulated_output = []
async for event in topic.subscribe():
if topic_name == "reasoning":
accumulated_reasoning.append(event.thought)
print(f"[Reasoning Step {event.step}] {event.thought}")
elif topic_name == "output":
accumulated_output.append(event.delta)
print(event.delta, end="", flush=True)
5.4 与 AI 框架的集成
Workflow Streams 已经原生集成了 OpenAI Agents SDK 和 Google ADK:
from datetime import timedelta
from temporalio.contrib.openai_agents import (
OpenAIAgentsPlugin,
ModelActivityParameters,
)
# 配置插件:让 OpenAI 的流式响应自动发布到 topic
plugin = OpenAIAgentsPlugin(
model_params=ModelActivityParameters(
streaming_topic="openai_events",
streaming_batch_interval=timedelta(milliseconds=100), # 100ms 批量
),
)
# 在 Workflow 中直接运行 OpenAI Agent
@workflow.defn
class OpenAIWorkflow:
@workflow.init
def __init__(self):
self.stream = WorkflowStream()
@workflow.run
async def run(self, prompt: str) -> str:
from agents import Agent, Runner
agent = Agent(
name="Researcher",
instructions="你是一个研究助手,帮助用户深入分析问题。"
)
result = Runner.run_streamed(agent, prompt)
return result.final_output
这里的精妙之处在于:Workflow 本身不感知流式事件——它调用 Agent,拿最终结果;但订阅者通过 Workflow Stream 看到了每个 token 的生成过程。如果 Activity 失败并重试,已发布的事件不会被撤回,消费者会收到重试信号并自行处理。
六、AI Agent 调试的新范式:Kelet AI 案例
Replay 2026 同期,Temporal 官方博客还发布了一篇重量级案例:Kelet AI——一个"会调试 AI Agent 的 AI",它本身完全构建在 Temporal 之上。
6.1 AI 调试的特殊性
传统软件调试有栈跟踪(Stack Trace),你知道哪一行出错了。AI Agent 的失败没有栈跟踪——同一个输入成功十次、第十一次失败,十次失败各有不同表现。真正的根因是一个模糊的簇模式,只有跨越数百次会话才能浮现。
Kelet AI 的创始人 Almog Baku 在博客中描述了一个真实案例:
一个保险公司的双 Agent 流水线持续错误分类保险理赔。每个单独看都像是第二个 Agent 的评分错误。真正的原因在第一个 Agent:它把通话记录的时序信息剥离了,而恰恰是时序信息决定了这个保险公司的理赔类型。
6.2 Kelet 的 Temporal 架构
Kelet 构建了四层 Workflow 层级,每一层利用了 Temporal 的不同能力:
Layer 4: Investigate Issue Workflow
└── 触发条件:足够多的 hypothesis 积累后
└── 根因分析 + Prompt Patch 生成
Layer 3: Agent Aggregation Workflow
└── 跨会话聚合同一 Agent 的 hypothesis
└── 跨部署失败模式发现
Layer 2: Signal Workflows (并行)
└── Signal enrichment / merging / agent interrogation
└── 每会话独立并行执行
Layer 1: Session Workflow
└── 接收会话数据
└── 5 分钟防抖窗口(debounce):
await workflow.wait_condition(
lambda: (workflow.now() - self._last_message) >= DEBOUNCE_WINDOW
)
关键 Temporal 能力的使用:
- wait_condition:等待 5 分钟静默窗口,避免会话还在进行时就触发分析
- continue_as_new:长运行 Workflow 重置历史而不丢状态
- Signal 驱动:新会话通过 Signal 立即触发处理,无轮询
- 多层 Workflow 门控:只有 hypothesis 积累到阈值才触发高级分析
6.3 自监控:Kelet 监控自身
Kelet 监控自己的 Temporal Workflows,用自己的框架分析自己的问题。核心是通过 Temporal 的 OpenTelemetry 集成,在所有内部调用上自动传播 session 上下文:
import kelet
from kelet.temporal import KeletPlugin
# 三行代码接入 Kelet 监控
kelet.configure(api_key="...", project="my-agent")
client = await Client.connect(
"localhost:7233",
plugins=[KeletPlugin()], # 自动传播 session 上下文
)
worker = Worker(
client,
task_queue="ai",
workflows=[MyWorkflow],
activities=[my_activity],
)
关键是 Kelet 的拦截器会过滤掉自己的监控 Workflow,避免自我监控变成无限循环:每次诊断产生一个会话,会话触发新诊断,新诊断又产生会话……Temporal 的 History 让这个循环监控成为可能,但也通过信号设计让它安全退出。
七、性能与可靠性的深层分析
7.1 Exactly-Once 语义的实现
Workflow Streams 提供了 exactly-once 发布语义,通过两个机制:
发布端去重:
# 每个批次带上 sequence key
batch_with_key = {
"key": f"{publisher_id}:{sequence_number}",
"events": [event1, event2],
"publisher_ttl": 900, # 15 分钟去重窗口
}
重试时如果 key 相同则去重。
订阅端 offset 追踪:
# 订阅者记录最后收到的 offset
async for event in topic.subscribe(start_from=last_known_offset):
last_known_offset = event.offset
process(event)
Consumer 重连时自动从断点继续。
7.2 Continue-as-New 的 Stream 连续性
Temporal 的 Continue-as-New 机制用于重置长 Workflow 的 History 边界(History 过长会影响性能)。Workflow Streams 对此做了特殊处理:
# WorkflowStreams 提供了专用的 continue_as_new 包装
await self.stream.continue_as_new(
lambda stream_state: [
WorkflowInput(
app_state=self.app_state,
stream_state=stream_state, # 自动序列化流状态
)
]
)
- 在继续运行之前,所有长轮询的订阅者被先清空(drain)
- 流事件历史和去重表被快照并传入新 Workflow
- 订阅者客户端自动跟随 Workflow 链,从下一个事件继续消费
7.3 批量策略对 History 大小的影响
默认的 batch_interval 是 2 秒。如果一个 LLM 流每秒产生 100 个 token:
- 不批处理:每秒 100 个 Signal → 100 条 History 记录
- 批处理(100ms):每秒 10 个 Signal → 10 条 History 记录(History 体积减少 90%)
这个批量策略让 Workflow Streams 在高吞吐场景下对 History 大小的影响降到最低。
八、部署架构与生产环境建议
8.1 推荐部署架构
┌──────────────────────────────────┐
│ Temporal Cloud / 自托管 │
│ ┌─────────────┐ ┌────────────┐ │
│ │ Frontend │ │ History │ │
│ │ Service │ │ Service │ │
│ └──────┬──────┘ └─────┬─────┘ │
│ │ │ │
│ ┌──────┴───────────────┴─────┐ │
│ │ Workflow Service │ │
│ └───────────────────────────┘ │
└──────────────────────────────────┘
▲
┌────────────────────────────────┼────────────────────────────┐
│ │ │
┌──────┴──────┐ ┌─────────┴─────────┐ ┌─────────┴────────┐
│ Serverless │ │ On-Prem Worker │ │ Serverless │
│ Workers │ │ (常驻,金丝雀发布) │ │ Workers │
│ Lambda/Func│ └───────────────────┘ │ Cloud Run │
└────────────┘ └─────────────────┘
8.2 各新特性的适用场景
| 场景 | 推荐特性 | 不推荐理由 |
|---|---|---|
| AI Agent 实时界面 | Workflow Streams | 传统方案无法保证 exactly-once |
| 高峰期突发流量 | Serverless Workers | 常驻 Worker 成本高 |
| 事件驱动独立任务 | Standalone Activities | 勉强用 Workflow 包装增加复杂度 |
| 多 AI Agent 编排 | Workflow Streams + AI SDK 集成 | 自建编排层无法保证持久性 |
| 跨会话 AI 调试 | Temporal Workflow 层级架构 | 传统监控系统缺少事件驱动能力 |
8.3 History 大小管理
生产环境需要监控 History 增长:
# 查询 Workflow 历史长度
tctl workflow show <workflow-id> | wc -l
# 设置历史事件限制(超过后自动 Continue-as-New)
temporal operator workflow update-handler \
--workflow-id <workflow-id> \
--max-history-events 50000
建议在 Activity 级别设置 heartbeat_timeout,让长时间运行的 Activity 定期汇报进度,从而限制 History 中单 Activity 的最大事件数。
九、总结与展望
Replay 2026 发布的四个能力,从不同维度扩展了 Temporal 的边界:
Serverless Workers 把 Temporal 的可靠执行带入了 Serverless 时代,让成本模型从"常驻实例"变成"按调用计费"。对于 AI Agent 这类 QPS 不稳定的工作负载,这是从基础设施层释放的红利。
Standalone Activities 把 Temporal 的核心价值(持久化、幂等、重试)从 Workflow 的强绑定中解放出来,成为独立的任务执行层。两个微服务之间不再需要共享 Workflow 定义,只需要共享 Activity 契约。
Workflow Streams 是最具变革性的创新——它用 Temporal 自身的基础能力,构建了一个具有 exactly-once 保证的持久化流系统。这不是"又加了一个消息队列",而是把持久化执行和实时交互统一在同一套语义下。对于 AI Agent 开发者来说,这是目前最优雅的"让 AI 思考过程可见"的技术方案。
AI 框架集成(OpenAI Agents SDK + Google ADK)则把 Temporal 定位为 AI Agent 的基础设施层,而不仅仅是工作流引擎。Durable Execution + Durable AI Agent = 可以随时中断、恢复、重试的 AI 任务,这是从"AI 生成代码"到"AI 运行系统"的关键一步。
Durable Execution 的核心哲学是:让你的业务逻辑把失败当成不存在。当这个哲学从人类编写的代码延伸到 AI Agent 的行为,当 Workflow Streams 让 AI 的思考过程也变得可观测、可干预、可重试——我们正在接近一个更可靠的 AI 应用时代。
这不是炫技,这是工业级的可靠。
参考资源
- Temporal 官方博客:https://temporal.io/blog
- Workflow Streams 原文:https://temporal.io/blog/workflow-streams-live-interactivity-agents-other-applications
- Kelet AI 案例:https://temporal.io/blog/we-built-a-durable-agent-debugs-durable-agents
- Temporal Go SDK 文档:https://docs.temporal.io/dev-guide/go
- Temporal Python SDK:https://docs.temporal.io/dev-guide/python
- Replay 2026 公告:https://temporal.io/replay2026