AG-UI 协议深度解析:当 AI Agent 前端交互进入「双向心跳」时代
前言:从三足鼎立到四轮驱动
过去两年,AI Agent 的协议生态经历了一场静悄悄的标准化革命。
2024 年 11 月,Anthropic 正式发布 MCP(Model Context Protocol),解决了 AI Agent 到工具(Tools/API)的调用标准化问题——终于,所有 AI 应用不再需要为每个工具写一遍 adapter,Claude 能用的工具,理论上任何 MCP Client 都能用。
紧接着,2025 年初,A2A(Agent-to-Agent Protocol) 由多个头部 AI 企业联合推出,解决了多 Agent 协作时代的通信问题——一个 Agent 要把任务转交给另一个 Agent,终于有了标准握手协议,不需要各自实现一套私有协议。
但这两者加起来,依然有一个巨大的空白:用户到 Agent 的交互层面。
当你打开一个 AI 应用,你不是在 API 层面和 Agent 交互,你是在 UI 层面。你看到的是流式输出的文字、工具调用的进度条、实时的状态更新、Agent 之间切换时的平滑过渡。这些 UI 交互体验,每一家 AI 应用开发商都在重复造轮子。更要命的是——这些 UI 层和后端 Agent 框架(LangGraph、CrewAI、Dify、Mastra)深度耦合,换一个框架几乎等于重写整个前端。
这就是 AG-UI 协议诞生的背景。它专门解决前端应用与 AI Agent 之间的实时双向通信标准化问题,是 AI Agent 协议栈的最后一块拼图。
本文将从协议设计哲学出发,深入剖析 AG-UI 的技术架构、核心机制、代码实现,并与现有生态中的 MCP 和 A2A 进行横向对比,最后通过实战代码展示如何在实际项目中落地 AG-UI。
一、现状痛点:为什么 AI Agent 的前端交互是一团乱麻
1.1 每个应用都在重复造「交互轮子」
让我们先正视一个现实:今天如果你要构建一个 AI Agent 应用,你大概会经历以下步骤:
- 选一个 Agent 框架:LangGraph、CrewAI、Dify、Mastra……每个都有自己的概念模型和工作流定义
- 构建后端逻辑:定义工具、编排 Agent 行为、处理状态
- 然后在前端,你几乎要从零开始:实现流式输出(Server-Sent Events 还是 WebSocket?)、处理工具调用状态(显示loading?显示参数预览?)、处理 Agent 之间的切换动画、显示错误和重试……
第三步的工作量,比前两步加起来还大。而且每个框架的流式输出格式都不一样。LangGraph 用一种格式输出事件,Dify 用另一种格式,CrewAI 又是一种。你想把前端从 Dify 切换到 LangGraph?好,先把前端的整个事件处理层重写一遍。
这本质上是前端与后端的紧耦合问题。协议层面的碎片化,让整个行业在前端交互上浪费了大量重复劳动。
1.2 现有方案的不足
REST API / WebSocket 私有协议:最常见的方案。每个 AI 应用定义自己的消息格式。你能想到的所有 AI 助手产品——Claude、Copilot、ChatGPT——都对外暴露 API,但没有一个统一的协议来描述「前端应该如何渲染 AI Agent 的实时状态变化」。大家都在用私有格式传输 JSON,然后各自在前端解析这些私有格式。
SSE(Server-Sent Events):很多应用在用 SSE 做流式文本输出,但这只是传输层。SSE 只解决了"服务器主动推送数据给浏览器"的问题,没有解决"推送的数据是什么语义、前端应该如何处理"的问题。换句话说,SSE 是公路,但 AG-UI 是公路上跑的货车的规格说明书。
MCP:MCP 解决的是 Agent → Tools 的问题。Agent 可以通过 MCP 调用外部工具(搜索、代码执行、数据库查询),但它不处理用户在前端界面上的实时交互体验。
A2A:A2A 解决的是 Agent ↔ Agent 的协作问题。一个 Agent 可以把任务委托给另一个 Agent,有标准的状态转移和消息传递机制,但同样不涉及前端 UI 层。
所以,当你的 AI 应用需要让用户看到这样的交互体验时:
- 工具调用开始时显示一个精致的卡片
- 流式输出的文字逐字出现
- 状态在更新时前端有平滑的过渡动画
- Agent 之间切换时前端能感知到"控制权转移"
MCP 和 A2A 都帮不了你。你需要的是 AG-UI。
二、AG-UI 协议的设计哲学
2.1 定位:用户 ↔ Agent 的桥梁
AG-UI 的官方定位非常清晰:Frontend-Application to AI Agent 的通信协议。它的目标是把前端应用和 AI Agent 之间的交互标准化,让前端不再需要为每个后端框架写定制代码。
它的设计哲学可以概括为三个关键词:双向心跳、事件流驱动、框架无关。
2.2 双向心跳:重新理解"交互"的本质
传统 Web 应用是请求-响应模型:客户端发请求,服务器回响应,然后连接关闭或等待下一次请求。这是单向心跳——连接是由客户端主动发起的。
AI Agent 的交互模式完全不同。一个用户请求,可能引发 Agent 的多轮思考、多次工具调用、多个子 Agent 的协作。你不能等 Agent 全部完成再给用户一个结果——用户需要实时看到 Agent 的思考过程。
这催生了一种新的交互模式:双向心跳。客户端(前端)发起一次会话请求,建立一条长连接(通常基于 SSE),之后 Agent 通过这条连接主动推送结构化事件给前端。这些事件包括:流式文本片段、工具调用状态、进度更新、错误通知等等。同时,前端也可以反向发送事件给 Agent,比如用户中断操作、用户提供了额外上下文、用户点击了某个选项。
这种双向心跳机制,让前端和 Agent 之间形成了一条持久的事件流通道,而不是一次性的请求-响应对。
2.3 事件流驱动:结构化优于字符串
AG-UI 最大的设计亮点,是它用结构化事件取代了非结构化的文本流。
传统 SSE 方案传输的是这样的数据:
data: 这是一段流式输出的文字...
data: 继续输出更多内容...
data: {"type": "tool_call", "tool": "search", "params": {...}}
AG-UI 定义了一套标准的事件类型体系,每个事件都有明确的类型标识和标准化的载荷结构:
data: {"type": "TEXT_MESSAGE_CONTENT", "content": "正在搜索...", "index": 0}
data: {"type": "TOOL_CALL_START", "tool": "web_search", "call_id": "call_abc123"}
data: {"type": "STATE_DELTA", "path": ["current_step"], "value": "searching"}
data: {"type": "AGENT_HANDOFF", "from": "planner", "to": "researcher"}
这种设计的优势是致命的:前端可以精确地知道每条消息是什么类型的、应该用什么 UI 组件渲染。不用再自己解析字符串,不用再猜测数据类型,协议本身就是一份前端渲染指南。
2.4 框架无关:一次实现,多端复用
AG-UI 的另一个核心设计原则是框架无关(Framework Agnostic)。协议定义的是语义层的交互格式,而不绑定任何特定的 Agent 框架实现。
这意味着:
- 同一个 AG-UI 前端,可以连接到 Dify 的后端
- 同一个 AG-UI 前端,也可以连接到 LangGraph 的后端
- 同一个 AG-UI 前端,还可以连接到 Mastra 或自研的 Agent 系统
切换后端框架不需要重写前端。协议层抽离了两边的耦合,各自独立演进。
三、核心架构:AG-UI 的完整交互模型
3.1 三层架构
AG-UI 的整体架构可以划分为三层:
第一层:传输层(Transport)
AG-UI 推荐使用 Server-Sent Events (SSE) 作为默认传输协议。SSE 是 HTTP/1.1 的标准扩展,允许服务器通过单个 HTTP 连接持续向客户端推送数据。相比 WebSocket,SSE 更轻量,天然支持 HTTP/2,且可以用标准 HTTP 中间件(Caddy、Nginx、CDN)代理,不需要特殊的协议升级。
同时 AG-UI 也支持 WebSocket 和 Webhook 作为备选传输方式。WebSocket 适合需要双向高频通信的场景;Webhook 则适合 Agent 需要回调到外部系统的场景。
第二层:事件层(Events)
这是 AG-UI 的核心。协议定义了 16 种标准事件类型,每种事件都有明确的前端语义:
| 事件类型 | 前端语义 | 说明 |
|---|---|---|
TEXT_MESSAGE_CONTENT | 文本消息片段 | 用于令牌流式输出,逐字/逐词渲染 |
TEXT_MESSAGE_COMPLETE | 文本消息结束 | 一条完整消息输出完毕 |
TOOL_CALL_START | 工具调用开始 | 显示工具调用卡片,预览参数 |
TOOL_CALL_END | 工具调用结束 | 工具执行结果返回 |
TOOL_CALL_ERROR | 工具调用失败 | 显示错误信息 |
STATE_DELTA | 状态增量更新 | Agent 维护的共享状态发生变化 |
STATE_RESET | 状态重置 | Agent 状态被清空或重置 |
AGENT_HANDOFF | Agent 控制权转移 | 当前 Agent 完成任务,切换到另一个 Agent |
VISUAL_GENERATION_UPDATE | 可视化内容更新 | 生成图片、图表、地图等可视化内容 |
USER_CONTEXT_UPDATE | 用户上下文更新 | 前端向 Agent 提供了额外信息 |
AGENT_SUFFIX | Agent 输出结束标记 | 整个 Agent 流程结束 |
前端根据事件类型选择对应的 UI 组件渲染,这是 AG-UI 实现"协议即 UI 指南"的核心机制。
第三层:会话层(Session)
一个完整的 AG-UI 交互周期是这样的:
- 会话启动:前端通过
POST /session发起一次会话请求,携带用户输入、初始上下文、Agent 配置等信息 - 流连接建立:服务器返回
SSE连接,前端持续监听 - 事件流传输:Agent 持续推送结构化事件,前端实时渲染
- 双向交互:前端可反向发送事件(如用户中断、额外输入)
- 会话结束:Agent 发送
AGENT_SUFFIX事件,流连接关闭
3.2 架构图
┌──────────────┐ POST /session ┌──────────────────┐
│ │ ──────────────────────→│ │
│ Frontend │ │ AI Agent │
│ (React/Vue/ │←── SSE Event Stream ──→│ Backend │
│ Svelte) │ Structured Events │ (LangGraph/ │
│ │ ── User Events ────────→│ CrewAI/Dify) │
│ [UI Layer] │ (bidirectional) │ │
└──────────────┘ └──────────────────┘
│ │
│ 根据事件类型选择组件渲染: │
│ TEXT_MESSAGE_CONTENT → 流式文字 │
│ TOOL_CALL_START → 工具调用卡片 │
│ STATE_DELTA → 状态面板更新 │
│ AGENT_HANDOFF → 切换过渡动画 │
这个架构的关键洞察是:前端只和事件打交道,不关心 Agent 内部是怎么实现的。后端用 LangGraph 还是 CrewAI,对前端完全透明。
四、SDK 详解:从理论到代码
4.1 后端 Python SDK
先来看后端如何集成 AG-UI。官方提供了 Python SDK,安装方式:
pip install ag-ui
创建基础 Agent
from ag_ui.core import (
RunAgentInput,
Message,
Context,
Tool,
State,
)
from ag_ui.encoder import EventEncoder
from ag_ui.events import (
TextMessageContentEvent,
TextMessageCompleteEvent,
ToolCallStartEvent,
ToolCallEndEvent,
AgentSuffixEvent,
)
import json
# 模拟一个简单的工具调用 Agent
class SimpleAgent:
def __init__(self):
self.tools = [
Tool(
name="web_search",
description="Search the web for information",
input_schema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"}
},
"required": ["query"]
}
)
]
async def run(self, user_message: str, context: Context):
"""主运行循环,yield 事件"""
# 第一步:流式输出思考过程
for chunk in self._think(user_message):
yield TextMessageContentEvent(
content=chunk,
index=0,
is_visible=True
)
# 第二步:调用工具
yield ToolCallStartEvent(
call_id="call_001",
tool="web_search",
input={"query": user_message}
)
# 模拟工具执行
result = await self._execute_tool("web_search", {"query": user_message})
yield ToolCallEndEvent(
call_id="call_001",
output=result
)
# 第三步:基于工具结果输出最终回复
yield TextMessageContentEvent(
content=f"根据搜索结果:{result}",
index=1,
is_visible=True
)
# 第四步:结束
yield AgentSuffixEvent()
def _think(self, message: str):
"""模拟流式思考输出"""
thoughts = ["收到您的请求", "正在分析", "准备调用搜索工具"]
for t in thoughts:
yield f"💭 {t}..."
async def _execute_tool(self, tool_name: str, params: dict):
"""模拟工具执行"""
return f"搜索「{params['query']}」找到 10 条相关结果"
使用 EventEncoder 编码 SSE 响应
from ag_ui.encoder import EventEncoder
encoder = EventEncoder()
async def stream_response(agent: SimpleAgent, user_input: str):
"""将 Agent 事件编码为 SSE 格式"""
context = Context(
session_id="session_123",
user_id="user_456"
)
async for event in agent.run(user_input, context):
# EventEncoder 自动将事件编码为 SSE data 格式
encoded = encoder.encode(event)
yield encoded
# 编码后格式示例:
# data: {"type":"TEXT_MESSAGE_CONTENT","content":"💭 收到您的请求...","index":0,"is_visible":true}
# data: {"type":"TEXT_MESSAGE_CONTENT","content":"💭 正在分析...","index":0,"is_visible":true}
# data: {"type":"TOOL_CALL_START","call_id":"call_001","tool":"web_search","input":{"query":"..."}}
# data: {"type":"TOOL_CALL_END","call_id":"call_001","output":"搜索结果..."}
# data: {"type":"AGENT_SUFFIX"}
完整 FastAPI 集成
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
import asyncio
app = FastAPI()
agent = SimpleAgent()
@app.post("/session")
async def start_session(request: Request):
body = await request.json()
user_input = body.get("input", "")
async def event_generator():
context = Context(session_id=body.get("session_id"))
async for event in agent.run(user_input, context):
encoded = encoder.encode(event)
yield {
"event": "message",
"data": encoded
}
return EventSourceResponse(event_generator())
4.2 前端 TypeScript SDK
前端 SDK 同样简洁。安装方式:
npm install @ag-ui/client
# 或者
pnpm add @ag-ui/client
基础使用:连接 Agent 并渲染事件
import { createClient, AGUIEvent } from '@ag-ui/client';
import React, { useState, useEffect, useRef } from 'react';
// 创建 AG-UI 客户端
const client = createClient({
url: 'https://your-agent-server.com/session',
transport: 'sse', // 默认 SSE,可选 'websocket'
});
// React Hook:封装 AG-UI 会话逻辑
function useAGUISession(initialInput?: string) {
const [messages, setMessages] = useState<Message[]>([]);
const [toolCalls, setToolCalls] = useState<ToolCall[]>([]);
const [agentState, setAgentState] = useState<Record<string, any>>({});
const [status, setStatus] = useState<'idle' | 'connecting' | 'running' | 'done'>('idle');
const sessionRef = useRef<any>(null);
const startSession = async (userInput: string) => {
setStatus('connecting');
// 启动会话
sessionRef.current = client.startSession({
input: userInput,
context: {
timestamp: Date.now(),
},
});
setStatus('running');
// 监听所有事件类型
sessionRef.current.on(AGUIEvent.TEXT_MESSAGE_CONTENT, (event) => {
// 流式文本追加到最新消息
setMessages((prev) => {
const lastMsg = prev[prev.length - 1];
if (lastMsg && lastMsg.index === event.index) {
// 追加到现有消息
return [
...prev.slice(0, -1),
{ ...lastMsg, content: lastMsg.content + event.content }
];
} else {
// 新建消息
return [...prev, {
id: crypto.randomUUID(),
role: 'assistant',
content: event.content,
index: event.index,
}];
}
});
};
sessionRef.current.on(AGUIEvent.TEXT_MESSAGE_COMPLETE, (event) => {
// 单条消息完成,可触发某些 UI 效果(如打字机停止动画)
console.log(`消息 ${event.index} 完成`);
});
sessionRef.current.on(AGUIEvent.TOOL_CALL_START, (event) => {
// 显示工具调用卡片
setToolCalls((prev) => [...prev, {
callId: event.call_id,
tool: event.tool,
input: event.input,
status: 'running',
}]);
});
sessionRef.current.on(AGUIEvent.TOOL_CALL_END, (event) => {
// 更新工具调用状态,显示结果
setToolCalls((prev) =>
prev.map((tc) =>
tc.callId === event.call_id
? { ...tc, status: 'done', output: event.output }
: tc
)
);
});
sessionRef.current.on(AGUIEvent.TOOL_CALL_ERROR, (event) => {
// 显示工具调用错误
setToolCalls((prev) =>
prev.map((tc) =>
tc.callId === event.call_id
? { ...tc, status: 'error', error: event.error }
: tc
)
);
});
sessionRef.current.on(AGUIEvent.STATE_DELTA, (event) => {
// 更新 Agent 共享状态
setAgentState((prev) => ({
...prev,
[event.path.join('.')]: event.value,
}));
});
sessionRef.current.on(AGUIEvent.AGENT_HANDOFF, (event) => {
// Agent 切换,显示过渡动画
console.log(`控制权从 ${event.from} 转移到 ${event.to}`);
});
sessionRef.current.on(AGUIEvent.AGENT_SUFFIX, () => {
setStatus('done');
});
// 错误处理
sessionRef.current.onError((error) => {
console.error('Session error:', error);
setStatus('idle');
});
await sessionRef.current.connect();
};
const sendUserEvent = (payload: any) => {
// 前端反向发送事件给 Agent
sessionRef.current?.sendEvent({
type: 'USER_CONTEXT_UPDATE',
payload,
});
};
const stopSession = () => {
sessionRef.current?.stop();
setStatus('idle');
};
return {
messages,
toolCalls,
agentState,
status,
startSession,
sendUserEvent,
stopSession,
};
}
完整的 Chat UI 组件
// ChatComponent.tsx - 完整的 AG-UI 聊天界面
function ChatComponent() {
const [input, setInput] = useState('');
const [isLoading, setIsLoading] = useState(false);
const {
messages,
toolCalls,
status,
startSession,
stopSession,
} = useAGUISession();
const messagesEndRef = useRef<HTMLDivElement>(null);
useEffect(() => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
}, [messages, toolCalls]);
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
if (!input.trim() || status === 'running') return;
setIsLoading(true);
await startSession(input);
setInput('');
setIsLoading(false);
};
return (
<div className="flex flex-col h-screen max-w-3xl mx-auto p-4">
{/* 消息列表 */}
<div className="flex-1 overflow-y-auto space-y-4 mb-4">
{messages.map((msg) => (
<div key={msg.id} className={`flex ${msg.role === 'user' ? 'justify-end' : 'justify-start'}`}>
<div className={`max-w-[70%] rounded-lg px-4 py-2 ${
msg.role === 'user' ? 'bg-blue-500 text-white' : 'bg-gray-100'
}`}>
{msg.content}
</div>
</div>
))}
{/* 工具调用卡片 */}
{toolCalls.map((tc) => (
<div key={tc.callId} className="bg-yellow-50 border border-yellow-200 rounded-lg p-4">
<div className="flex items-center gap-2 mb-2">
<span className="text-sm font-medium text-yellow-800">
🔧 工具调用: {tc.tool}
</span>
{tc.status === 'running' && <span className="animate-spin">⏳</span>}
{tc.status === 'done' && <span>✅</span>}
{tc.status === 'error' && <span>❌</span>}
</div>
<pre className="text-xs bg-gray-100 p-2 rounded overflow-x-auto">
{JSON.stringify(tc.input, null, 2)}
</pre>
{tc.output && (
<div className="mt-2 text-sm text-green-700">
结果: {tc.output}
</div>
)}
</div>
))}
</div>
{/* 输入框 */}
<form onSubmit={handleSubmit} className="flex gap-2">
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
disabled={status === 'running'}
placeholder="输入你的问题..."
className="flex-1 border rounded-lg px-4 py-2"
/>
{status === 'running' ? (
<button
type="button"
onClick={stopSession}
className="bg-red-500 text-white px-4 py-2 rounded-lg"
>
停止
</button>
) : (
<button
type="submit"
disabled={!input.trim()}
className="bg-blue-500 text-white px-4 py-2 rounded-lg disabled:opacity-50"
>
发送
</button>
)}
</form>
</div>
);
}
4.3 与 LangGraph 的深度集成
最有价值的场景之一是将 AG-UI 与 LangGraph 集成。LangGraph 的编译图模型天然支持多步骤、多 Agent 协作,而这正是 AG-UI 事件流最能发挥价值的地方。
from ag_ui.core import RunAgentInput, Context
from ag_ui.encoder import EventEncoder
from ag_ui.events import (
TextMessageContentEvent,
ToolCallStartEvent,
ToolCallEndEvent,
StateDeltaEvent,
AgentHandoffEvent,
AgentSuffixEvent,
)
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, Annotated
import operator
# 定义 Agent 状态
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
current_agent: str
tool_results: dict
progress: str
llm = ChatOpenAI(model="gpt-4o")
# LangGraph 节点:研究 Agent
def research_node(state: AgentState) -> AgentState:
query = state["messages"][-1].content
# 通过 AG-UI 发送事件
yield TextMessageContentEvent(
content="🔍 开始搜索相关资料...",
index=0,
is_visible=True
)
yield StateDeltaEvent(
path=["progress"],
value="researching",
old_value=""
)
# 模拟搜索工具调用
yield ToolCallStartEvent(
call_id="search_001",
tool="web_search",
input={"query": query}
)
# 实际执行搜索
result = f"关于「{query}」的研究资料:..."
yield ToolCallEndEvent(
call_id="search_001",
output=result
)
yield StateDeltaEvent(
path=["tool_results", "research"],
value=result,
old_value=None
)
return {
"messages": [AIMessage(content=f"研究结果:{result}")],
"current_agent": "writer",
"progress": "writing"
}
# LangGraph 节点:写作 Agent
def writer_node(state: AgentState) -> AgentState:
research_result = state["tool_results"].get("research", "")
yield AgentHandoffEvent(
from_agent="researcher",
to_agent="writer",
reason="研究阶段完成,开始撰写报告"
)
yield TextMessageContentEvent(
content="✍️ 开始撰写报告...",
index=1,
is_visible=True
)
report = f"基于研究资料,以下是我的分析报告:{research_result[:100]}..."
yield TextMessageContentEvent(
content=report,
index=2,
is_visible=True
)
yield AgentSuffixEvent()
return {
"messages": [AIMessage(content=report)],
"current_agent": "done",
"progress": "complete"
}
# 构建图
graph = StateGraph(AgentState)
graph.add_node("research", research_node)
graph.add_node("writer", writer_node)
graph.set_entry_point("research")
graph.add_edge("research", "writer")
graph.add_edge("writer", END)
compiled_graph = graph.compile()
# SSE 流生成器
async def stream_langgraph_events(user_input: str):
encoder = EventEncoder()
async for event in compiled_graph.astream_events(
{"messages": [HumanMessage(content=user_input)], "current_agent": "researcher"}
):
# LangGraph 事件转换为 AG-UI 事件
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
ag_event = TextMessageContentEvent(
content=token,
index=0,
is_visible=True
)
yield encoder.encode(ag_event)
4.4 与 Dify 的集成
Dify 作为国内最流行的开源 AI 应用平台,已有社区贡献者实现了 AG-UI 集成。以下是一个典型场景:
// dify-ag-ui-client.ts
import { createClient } from '@ag-ui/client';
const client = createClient({
// Dify 的 SSE 端点
url: 'https://your-dify-instance/v1/chat-messages',
transport: 'sse',
headers: {
'Authorization': `Bearer ${DIFY_API_KEY}`,
},
});
// Dify 的响应需要转换层(因为 Dify 的事件格式与 AG-UI 不同)
function createDifyEventAdapter() {
return {
onMessage(data: any) {
const parsed = JSON.parse(data);
switch (parsed.event) {
case 'message':
return {
type: 'TEXT_MESSAGE_CONTENT',
content: parsed.answer,
index: 0,
};
case 'message_end':
return {
type: 'AGENT_SUFFIX',
stats: parsed.stats,
};
case 'message_file':
return {
type: 'VISUAL_GENERATION_UPDATE',
url: parsed.url,
mime_type: parsed.type,
};
case 'agent_thought':
return {
type: 'TOOL_CALL_START',
call_id: parsed.id,
tool: 'llm_thought',
input: { thought: parsed.thought },
};
default:
return null;
}
},
};
}
五、生态定位:AG-UI 与 MCP、A2A 的协同关系
5.1 三角协作模型
MCP、A2A、AG-UI 三个协议并非竞争关系,而是互补关系,共同构成了 AI Agent 通信协议栈的完整三角:
┌─────────────────┐
│ User │
│ (Frontend UI) │
└────────┬────────┘
│ AG-UI 协议
│ 用户 ↔ Agent 双向交互
│ 流式 UI 更新
▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ AI Agent A │◄──►│ AI Agent B │
│ │ A2A │ │
└────────┬────────┘ └────────┬────────┘
│ │
│ MCP │ MCP
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ MCP Server │ │ MCP Server │
│ (Web Search) │ │ (Code Runner) │
└─────────────────┘ └─────────────────┘
在这个架构中:
- AG-UI:负责 用户 ↔ Agent 的交互标准化(实时流、双向事件、UI 渲染指南)
- A2A:负责 Agent ↔ Agent 的协作标准化(任务委托、状态传递、角色切换)
- MCP:负责 Agent ↔ Tools 的工具调用标准化(工具发现、参数传递、结果返回)
5.2 实际场景中的协同
举一个实际的多 Agent 协作场景:用户在前端说"帮我分析一下特斯拉股票并生成一份报告"。
通过 AG-UI(用户 → Agent):
前端发送用户请求,建立 SSE 连接,实时显示 Agent 的思考过程。
通过 A2A(Agent → Agent):
主 Agent 将任务分解,委托给两个子 Agent:
- 子 Agent A:负责市场数据分析
- 子 Agent B:负责财务数据分析
两者通过 A2A 协议交换中间结果。
通过 MCP(Agent → Tools):
- 调用
web_search工具获取最新新闻 - 调用
financial_data工具获取财报 - 调用
code_interpreter工具运行数据分析脚本
通过 AG-UI(Agent → 用户):
TOOL_CALL_START→ 前端显示"正在获取市场数据..."STATE_DELTA→ 前端更新进度条TEXT_MESSAGE_CONTENT→ 流式输出分析报告VISUAL_GENERATION_UPDATE→ 生成图表
三个协议各司其职,没有任何重叠。这正是协议分层设计的优雅之处。
5.3 与 MCP Server 的集成
AG-UI 的前端不需要直接理解 MCP 协议。MCP Server 的调用发生在后端 Agent 层,前端只看到 AG-UI 事件。
from ag_ui.core import Tool
from ag_ui.events import ToolCallStartEvent, ToolCallEndEvent
from langchain_mcp_adapters.client import MultiServerMCPClient
# MCP Server 配置
mcp_client = MultiServerMCPClient({
"web_search": {
"command": "python",
"args": ["/path/to/search_server.py"],
"transport": "stdio",
},
})
async def agent_with_mcp_tools(user_query: str):
"""集成 MCP 工具调用的 Agent"""
tools = await mcp_client.get_tools()
# AG-UI: 通知前端开始工具调用
yield ToolCallStartEvent(
call_id="mcp_search",
tool="web_search",
input={"query": user_query}
)
# 执行 MCP 工具
result = await tools["web_search"].invoke({"query": user_query})
# AG-UI: 通知前端工具执行完成
yield ToolCallEndEvent(
call_id="mcp_search",
output=str(result)
)
六、性能优化与工程实践
6.1 前端渲染性能:防抖与批量处理
在高频事件流场景下(如 LLM 的流式 token 输出),每个 token 都触发一次 React 状态更新是不现实的。需要实现防抖机制:
// 事件批处理:每 16ms(约60fps)批量处理一次
class EventBatcher {
private queue: AGUIEvent[] = [];
private rafId: number | null = null;
private onFlush: (events: AGUIEvent[]) => void;
constructor(onFlush: (events: AGUIEvent[]) => void) {
this.onFlush = onFlush;
}
add(event: AGUIEvent) {
this.queue.push(event);
this.scheduleFlush();
}
private scheduleFlush() {
if (this.rafId !== null) return;
this.rafId = requestAnimationFrame(() => {
const batch = [...this.queue];
this.queue = [];
this.rafId = null;
this.onFlush(batch);
});
}
}
// 使用
const batcher = new EventBatcher((events) => {
// 批量更新状态
setMessages((prev) => {
const updated = [...prev];
for (const event of events) {
if (event.type === 'TEXT_MESSAGE_CONTENT') {
// 合并文本片段
const last = updated[updated.length - 1];
if (last && last.index === event.index) {
updated[updated.length - 1] = {
...last,
content: last.content + event.content
};
} else {
updated.push({ id: crypto.randomUUID(), content: event.content, index: event.index });
}
}
}
return updated;
});
});
session.on('TEXT_MESSAGE_CONTENT', (e) => batcher.add(e));
6.2 连接管理与自动重连
SSE 连接可能因网络波动而中断,需要实现自动重连逻辑:
class ReconnectingAGUIClient {
private url: string;
private maxRetries = 5;
private retryDelay = 1000;
private session: any;
private retryCount = 0;
async connect(initialInput: string) {
while (this.retryCount < this.maxRetries) {
try {
this.session = createClient({ url: this.url });
await this.session.startSession({ input: initialInput });
this.retryCount = 0; // 重置重试计数
return;
} catch (error) {
this.retryCount++;
console.log(`连接失败,${this.retryDelay}ms 后重试...`);
await new Promise(r => setTimeout(r, this.retryDelay));
this.retryDelay = Math.min(this.retryDelay * 2, 30000); // 指数退避,上限30s
}
}
throw new Error('最大重连次数已达上限');
}
}
6.3 安全考量
身份验证与授权:
AG-UI 会话需要携带认证信息,建议使用短期令牌:
const client = createClient({
url: '/api/agent/session',
transport: 'sse',
headers: {
'Authorization': `Bearer ${getShortLivedToken()}`,
'X-Session-ID': getSessionId(),
},
});
流式输出的 XSS 防护:
AG-UI 的 TEXT_MESSAGE_CONTENT 事件可能包含用户可控的文本内容,必须做转义处理:
import { escapeHtml } from '@ag-ui/client/utils';
session.on('TEXT_MESSAGE_CONTENT', (event) => {
const safeContent = escapeHtml(event.content);
// 然后再渲染
setMessages((prev) => [...prev, { content: safeContent }]);
});
工具调用参数校验:
后端收到前端事件时,必须对参数进行严格校验:
from pydantic import BaseModel, ValidationError
from ag_ui.core import Tool
class WebSearchInput(BaseModel):
query: str
max_results: int = 10
async def handle_tool_event(event: ToolCallStartEvent):
try:
params = WebSearchInput(**event.input)
result = await web_search(params.query, max_results=params.max_results)
except ValidationError as e:
yield ToolCallErrorEvent(
call_id=event.call_id,
error=str(e),
code="INVALID_PARAMS"
)
七、现状与未来:AG-UI 的演进路线
7.1 当前生态状态
截至 2026 年初,AG-UI 协议的生态已经初具规模:
- GitHub 仓库:
ag-ui-protocol/ag-ui已有活跃的社区贡献 - SDK 支持:官方提供 TypeScript(前端)和 Python(后端)SDK,Java SDK 由社区贡献
- 框架集成:LangGraph、Dify、CrewAI、Mastra 均有社区插件或示例代码
- 文档:官方文档站点
docs.ag-ui.com提供了完整的协议规范和集成指南
7.2 面临的挑战
协议稳定性:作为新兴协议,AG-UI 的事件类型体系还在演进中。16 种标准事件是否能覆盖所有场景?未来是否会引入破坏性变更?这些都是早期采用者需要关注的问题。
采纳成本:对于已有大量自定义 SSE 实现的项目,迁移到 AG-UI 需要重构前后端的事件处理层。这不是一个小工程,需要组织层面认可协议的价值。
服务端支持:目前主流的 Agent 框架(尤其是 LangGraph)对 AG-UI 的原生支持还不够深入,社区插件的质量参差不齐。真正无缝的集成需要框架作者在底层架构上的配合。
7.3 演进方向预测
从当前的发展趋势来看,AG-UI 未来可能在以下方向演进:
- 多模态事件扩展:增加对语音、视频、3D 可视化等模态的标准事件定义
- 工具调用标准化:与 MCP 深度整合,形成 AG-UI(交互层)+ MCP(工具层)的标准组合
- 性能基准测试:建立协议层面的性能基准,规范延迟、吞吐量等关键指标
- 调试工具:类似于 Chrome DevTools 的 AG-UI 事件流调试器,帮助开发者可视化 Agent 的运行过程
八、实战:从零搭建一个 AG-UI 增强的 AI 研究助手
8.1 项目结构
ag-ui-research-assistant/
├── backend/
│ ├── agent/
│ │ ├── __init__.py
│ │ ├── researcher.py # 研究 Agent 核心逻辑
│ │ ├── tools.py # MCP 工具定义
│ │ └── graph.py # LangGraph 工作流
│ ├── api/
│ │ ├── __init__.py
│ │ └── routes.py # FastAPI 路由
│ └── main.py
├── frontend/
│ ├── src/
│ │ ├── components/
│ │ │ ├── ChatWindow.tsx
│ │ │ ├── ToolCallCard.tsx
│ │ │ └── ProgressPanel.tsx
│ │ ├── hooks/
│ │ │ └── useAGUI.ts
│ │ └── utils/
│ │ └── eventAdapter.ts
│ └── package.json
├── docker-compose.yml
└── README.md
8.2 完整的后端实现
# backend/agent/researcher.py
from ag_ui.core import RunAgentInput, Context, Tool, State
from ag_ui.encoder import EventEncoder
from ag_ui.events import (
TextMessageContentEvent, TextMessageCompleteEvent,
ToolCallStartEvent, ToolCallEndEvent, ToolCallErrorEvent,
StateDeltaEvent, AgentHandoffEvent, AgentSuffixEvent,
VisualGenerationUpdateEvent
)
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
import json
class ResearchState(TypedDict):
query: str
search_results: dict
analysis: str
report: str
current_step: str
encoder = EventEncoder()
async def search_step(state: ResearchState):
"""搜索阶段"""
query = state["query"]
yield TextMessageContentEvent(
content=f"🔍 正在搜索:「{query}」...",
index=0,
is_visible=True
)
yield StateDeltaEvent(
path=["current_step"], value="searching", old_value=""
)
yield ToolCallStartEvent(
call_id="search_001",
tool="web_search",
input={"query": query, "max_results": 5}
)
results = {
"sources": [
{"title": "相关研究文献A", "url": "https://...", "snippet": "..."},
{"title": "行业分析报告B", "url": "https://...", "snippet": "..."},
]
}
yield ToolCallEndEvent(
call_id="search_001",
output=json.dumps(results)
)
yield StateDeltaEvent(
path=["search_results"], value=results, old_value=None
)
yield TextMessageCompleteEvent(index=0)
return {"search_results": results, "current_step": "analyzing"}
async def analyze_step(state: ResearchState):
"""分析阶段"""
results = state["search_results"]
yield AgentHandoffEvent(
from_agent="searcher",
to_agent="analyzer",
reason="搜索完成,开始分析"
)
yield StateDeltaEvent(
path=["current_step"], value="analyzing", old_value="searching"
)
yield TextMessageContentEvent(
content="📊 正在分析收集到的资料...",
index=1,
is_visible=True
)
# 模拟分析过程
analysis = "经过分析,相关资料的主要观点如下:..."
yield VisualGenerationUpdateEvent(
content_type="chart",
url="/tmp/analysis_chart.png"
)
yield StateDeltaEvent(
path=["analysis"], value=analysis, old_value=""
)
return {"analysis": analysis, "current_step": "reporting"}
async def report_step(state: ResearchState):
"""报告生成阶段"""
analysis = state["analysis"]
yield AgentHandoffEvent(
from_agent="analyzer",
to_agent="writer",
reason="分析完成,开始撰写报告"
)
yield StateDeltaEvent(
path=["current_step"], value="reporting", old_value="analyzing"
)
yield TextMessageContentEvent(
content="📝 正在撰写研究报告...",
index=2,
is_visible=True
)
report = f"""# 研究报告
## 研究主题
{state['query']}
## 主要发现
{analysis}
## 参考来源
"""
for i, src in enumerate(state['search_results']['sources']):
report += f"{i+1}. [{src['title']}]({src['url']})\n"
yield TextMessageContentEvent(
content=report,
index=3,
is_visible=True
)
yield AgentSuffixEvent()
return {"report": report, "current_step": "complete"}
# 构建 LangGraph
graph = StateGraph(ResearchState)
graph.add_node("search", search_step)
graph.add_node("analyze", analyze_step)
graph.add_node("report", report_step)
graph.set_entry_point("search")
graph.add_edge("search", "analyze")
graph.add_edge("analyze", "report")
graph.add_edge("report", END)
compiled = graph.compile()
# SSE 流生成
async def research_stream(query: str):
async for event in compiled.astream_events({"query": query, "current_step": ""}):
if hasattr(event.get('data'), 'chunk'):
chunk = event['data']['chunk']
if chunk:
# 这里应该做事件类型映射
# 简化处理,直接 yield
pass
8.3 前端完整实现
// frontend/src/App.tsx
import React, { useState, useRef, useEffect } from 'react';
import { createClient, AGUIEvent } from '@ag-ui/client';
const client = createClient({
url: 'http://localhost:8000/session',
transport: 'sse',
});
interface Message {
id: string;
role: 'user' | 'assistant';
content: string;
index: number;
}
interface ToolCall {
callId: string;
tool: string;
input: any;
output?: string;
status: 'running' | 'done' | 'error';
}
function App() {
const [messages, setMessages] = useState<Message[]>([]);
const [toolCalls, setToolCalls] = useState<ToolCall[]>([]);
const [currentStep, setCurrentStep] = useState('');
const [input, setInput] = useState('');
const [isRunning, setIsRunning] = useState(false);
const sessionRef = useRef<any>(null);
const messagesEndRef = useRef<HTMLDivElement>(null);
useEffect(() => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
}, [messages, toolCalls]);
const setupSession = (session: any) => {
session.on(AGUIEvent.TEXT_MESSAGE_CONTENT, (e: any) => {
setMessages((prev) => {
const last = prev[prev.length - 1];
if (last && last.index === e.index && last.role === 'assistant') {
return [...prev.slice(0, -1), { ...last, content: last.content + e.content }];
}
return [...prev, { id: crypto.randomUUID(), role: 'assistant', content: e.content, index: e.index }];
});
});
session.on(AGUIEvent.TOOL_CALL_START, (e: any) => {
setToolCalls((prev) => [...prev, { callId: e.call_id, tool: e.tool, input: e.input, status: 'running' }]);
});
session.on(AGUIEvent.TOOL_CALL_END, (e: any) => {
setToolCalls((prev) =>
prev.map((tc) => tc.callId === e.call_id ? { ...tc, status: 'done', output: e.output } : tc)
);
});
session.on(AGUIEvent.STATE_DELTA, (e: any) => {
if (e.path?.includes('current_step')) {
setCurrentStep(e.value);
}
});
session.on(AGUIEvent.VISUAL_GENERATION_UPDATE, (e: any) => {
setMessages((prev) => [
...prev,
{ id: crypto.randomUUID(), role: 'assistant', content: `[可视化: ${e.content_type}]`, index: -1 }
]);
});
session.on(AGUIEvent.AGENT_SUFFIX, () => {
setIsRunning(false);
});
};
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
if (!input.trim() || isRunning) return;
setMessages([{ id: crypto.randomUUID(), role: 'user', content: input, index: -1 }]);
setToolCalls([]);
setCurrentStep('connecting');
setIsRunning(true);
sessionRef.current = client.startSession({ input });
setupSession(sessionRef.current);
await sessionRef.current.connect();
setInput('');
};
const stopSession = () => {
sessionRef.current?.stop();
setIsRunning(false);
};
const stepLabels: Record<string, string> = {
'': '等待输入',
'connecting': '连接中...',
'searching': '🔍 搜索中',
'analyzing': '📊 分析中',
'reporting': '📝 撰写报告',
'complete': '✅ 完成',
};
return (
<div className="min-h-screen bg-gray-50 p-4">
<div className="max-w-3xl mx-auto bg-white rounded-xl shadow-lg overflow-hidden">
{/* Header */}
<div className="bg-gradient-to-r from-blue-600 to-purple-600 text-white px-6 py-4">
<h1 className="text-xl font-bold">🔬 AI 研究助手</h1>
<p className="text-sm opacity-80">Powered by AG-UI 协议</p>
</div>
{/* Progress Bar */}
{isRunning && (
<div className="bg-gray-100 px-6 py-3 border-b">
<div className="flex items-center gap-3">
<div className="flex gap-2">
{['searching', 'analyzing', 'reporting'].map((step) => (
<div key={step} className="flex items-center gap-1 text-sm">
<div className={`w-3 h-3 rounded-full ${
step === currentStep ? 'bg-blue-500 animate-pulse' :
['searching', 'analyzing', 'reporting'].indexOf(currentStep) >
['searching', 'analyzing', 'reporting'].indexOf(step) ? 'bg-green-500' : 'bg-gray-300'
}`} />
<span className={step === currentStep ? 'font-bold' : 'text-gray-500'}>
{stepLabels[step]}
</span>
</div>
))}
</div>
{currentStep && (
<span className="ml-auto text-blue-600 text-sm font-medium">
{stepLabels[currentStep]}
</span>
)}
</div>
</div>
)}
{/* Messages */}
<div className="h-[500px] overflow-y-auto p-6 space-y-4">
{messages.length === 0 && (
<div className="text-center text-gray-400 mt-20">
<div className="text-4xl mb-4">🤖</div>
<p>输入你的研究问题,我来帮你分析</p>
</div>
)}
{messages.map((msg) => (
<div key={msg.id} className={`flex ${msg.role === 'user' ? 'justify-end' : 'justify-start'}`}>
<div className={`max-w-[80%] rounded-2xl px-4 py-3 ${
msg.role === 'user'
? 'bg-blue-500 text-white'
: msg.index === -1
? 'bg-purple-100 text-purple-800'
: 'bg-gray-100 text-gray-800'
}`}>
{msg.role === 'assistant' && msg.index >= 0 && (
<div className="text-xs text-gray-400 mb-1">
{stepLabels[msg.index === 0 ? 'searching' : msg.index === 1 ? 'analyzing' : 'reporting']}
</div>
)}
<pre className="whitespace-pre-wrap text-sm font-sans">{msg.content}</pre>
</div>
</div>
))}
{/* Tool Calls */}
{toolCalls.map((tc) => (
<div key={tc.callId} className="ml-4 border-l-2 border-yellow-400 pl-4">
<div className="flex items-center gap-2 mb-1">
<span className="text-sm font-bold text-yellow-700">
🔧 {tc.tool}
</span>
{tc.status === 'running' && (
<span className="text-xs bg-yellow-100 text-yellow-700 px-2 py-0.5 rounded animate-pulse">
运行中
</span>
)}
{tc.status === 'done' && (
<span className="text-xs bg-green-100 text-green-700 px-2 py-0.5 rounded">✅ 完成</span>
)}
{tc.status === 'error' && (
<span className="text-xs bg-red-100 text-red-700 px-2 py-0.5 rounded">❌ 错误</span>
)}
</div>
<div className="text-xs text-gray-500 mb-1">参数:</div>
<pre className="text-xs bg-gray-800 text-green-400 p-2 rounded overflow-x-auto">
{JSON.stringify(tc.input, null, 2)}
</pre>
{tc.output && (
<>
<div className="text-xs text-gray-500 mt-2 mb-1">结果:</div>
<pre className="text-xs bg-gray-100 p-2 rounded overflow-x-auto max-h-32">
{tc.output}
</pre>
</>
)}
</div>
))}
<div ref={messagesEndRef} />
</div>
{/* Input */}
<div className="border-t p-4">
<form onSubmit={handleSubmit} className="flex gap-2">
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
disabled={isRunning}
placeholder="例如:分析 2025 年 AI Agent 市场的发展趋势"
className="flex-1 border rounded-lg px-4 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500 disabled:bg-gray-100"
/>
{isRunning ? (
<button
type="button"
onClick={stopSession}
className="bg-red-500 hover:bg-red-600 text-white px-6 py-2 rounded-lg transition-colors"
>
停止
</button>
) : (
<button
type="submit"
disabled={!input.trim()}
className="bg-blue-500 hover:bg-blue-600 text-white px-6 py-2 rounded-lg transition-colors disabled:opacity-50"
>
研究
</button>
)}
</form>
</div>
</div>
</div>
);
}
export default App;
九、总结与展望
AG-UI 协议的出现,标志着 AI Agent 生态从"后端协议标准化"进入了"全栈协议标准化"的新阶段。MCP 标准化了工具调用,A2A 标准化了 Agent 协作,而 AG-UI 终于把用户体验层也纳入了标准化框架。
对于框架作者而言,拥抱 AG-UI 意味着一次投入,多端受益——只需要实现一次 AG-UI 事件编码器,前端就能跨框架复用。
对于应用开发者而言,AG-UI 让 AI 应用的前端开发从"手工作坊"升级为"流水线工厂"——用标准协议连接前后端,不用再为每个新框架重写适配层。
对于终端用户而言,AG-UI 的标准化交互体验意味着更流畅、更一致的 AI 应用使用体验——无论底层用的是什么框架,用户看到的都是精心设计的实时交互。
当然,协议的生命力最终取决于生态的采纳程度。MCP 之所以能快速普及,是因为 Anthropic 的背书和 Claude 的巨大用户量。AG-UI 需要更多头部玩家的加入,才能真正成为前端- Agent 交互的事实标准。
但无论如何,方向是对的。当 AI Agent 从实验室走向千家万户,当 AI 应用从极客玩具变成日常工具,交互体验的标准化是不可绕过的一步。AG-UI,就是这一步的起点。
参考资源
- AG-UI 官方文档:https://docs.ag-ui.com/introduction
- AG-UI GitHub 仓库:https://github.com/ag-ui-protocol/ag-ui
- MCP 协议规范:https://modelcontextprotocol.io/
- A2A 协议:https://github.com/A2A-Protocol/a2a
- LangGraph 集成示例:https://github.com/langchain-ai/langgraph
- Dify 平台:https://dify.ai/