A2A 协议深度解析:当多 Agent 系统告别「战国时代」——从协议原理到生产级实战完全指南(2026)
引言
2025年4月,Google 在 Google Cloud Next 大会上发布了一个看似低调、实则意义深远的协议——A2A(Agent-to-Agent Protocol)。同年6月,整个项目捐赠给 Linux Foundation,由 AWS、Cisco、Google、Microsoft、Salesforce、SAP 等八大厂商联合维护。截至2026年,已有超过150家组织支持该协议。
但大多数开发者对 A2A 的认知仍然停留在"A2A是给多 Agent 用的协议"这个层面,对它的设计哲学、核心数据模型、与 MCP 的关系、以及如何在生产环境中落地,仍然一头雾水。
本文从协议诞生的深层动机讲起,深入解析 A2A 的五大设计原则、逐字段拆解核心数据模型(Agent Card、Task、Message、Artifact),并用一套可以实际运行的 Python 代码演示完整的 A2A 交互流程。最后,我们会对比 MCP 与 A2A,帮助你理解这两个协议如何互补,共同构成 AI Agent 生态的通信基础设施。
一、为什么需要 A2A?多 Agent 系统的「战国时代」之痛
1.1 碎片化困境
在 A2A 出现之前,企业构建多 Agent 系统时面临一个根本性的碎片化问题。想象这样一个场景:
你有一个部署在 Google Vertex AI 上的"招聘协调 Agent",需要调用三件事:
- 从 Salesforce 的 Agent 拿候选人列表
- 从 ServiceNow 的 Agent 安排面试
- 从第三方背调服务的 Agent 触发背景调查
在 A2A 之前,每一对 Agent 之间都需要写一套定制的"胶水代码":Salesforce Agent 有自己的消息格式,ServiceNow Agent 有自己的 API 风格,背调服务又是另一套。每增加一个新 Agent,就要再写一套新的连接层。团队时间大量消耗在"翻译"上,而不是业务逻辑上。
这像极了 REST API 出现之前的 RPC 时代——每个厂商都有自己的序列化格式和调用约定,互操作性几乎为零。
1.2 MCP 解决了 Agent→工具,那 Agent→Agent 呢?
你可能已经熟悉 MCP(Model Context Protocol)——Anthropic 在 2024年底发布的协议,专门解决 Agent 与外部工具(数据库、搜索、API) 的连接问题。MCP 就像是"AI 世界的 USB 接口",让任何 Agent 都能用标准方式调用工具。
但 MCP 有一个天然局限:它只定义 Agent 如何调用工具,不定义 Agent 如何与其他 Agent 协作。当你在一个复杂系统中需要多个专业 Agent(代码 Agent、测试 Agent、分析 Agent)协同工作时,MCP 就力不从心了。
A2A 解决的就是这个空白:Agent 如何发现彼此、如何交换任务、如何同步状态、如何在长时间运行的工作流中保持通信。
1.3 跨框架互操作:A2A 的核心价值
A2A 用一句话解决碎片化问题:定义一个所有 Agent 都说的公共语言。
只要两个 Agent 都支持 A2A,不管它们:
- 用什么框架实现(LangChain、AutoGen、CrewAI、自己写的)
- 部署在哪个平台(AWS、Azure、GCP、私有 IDC)
- 用什么语言编写(Python、Go、TypeScript、Rust)
都能直接通信。
这意味着企业可以把来自不同供应商的 Agent 组合在一起工作,就像 REST API 让不同公司的服务可以互相调用一样。
二、A2A 的五大设计原则
深入理解 A2A 之前,需要先理解它的设计哲学。协议设计者没有发明新轮子,而是做了大量克制而深思熟虑的选择。
原则一:Agent 是不透明的黑盒(Opaque Agents)
A2A 的设计前提是:你不应该、也不需要知道对方 Agent 的内部实现。
它用什么模型?怎么推理?有没有记忆?代码是什么语言写的?这些都不重要。A2A 只定义两个 Agent 之间交换什么,而不是内部怎么运行。
这与 SOAP/WSDL 时代的设计思路截然不同——WSDL 要求服务提供方暴露完整的接口签名和服务描述,而 A2A 只要求暴露"我能做什么"(能力声明),不需要暴露"我是怎么做的"。
这一点是跨厂商互操作的根本保障。你不需要关心 Salesforce 的 Agent 是用 GPT-4 还是 Claude 实现的,也不需要关心 ServiceNow 用的是什么推理框架。
原则二:建在已有标准上
A2A 刻意没有发明新的传输协议。它直接复用:
| 技术 | 用途 |
|---|---|
| HTTP/HTTPS | 传输层 |
| JSON-RPC 2.0 | 消息格式 |
| Server-Sent Events(SSE) | 流式推送 |
| OAuth 2.0 / OpenID Connect / API Key / mTLS | 认证 |
这意味着任何熟悉 Web 开发的工程师都不需要学新东西。HTTP 客户端、SSE 处理、JSON 解析——这些技能可以直接平移到 A2A 开发中。
原则三:任务(Task)第一
A2A 里所有工作都围绕 Task 展开。Agent 之间不是在"聊天",而是在"委托任务"和"完成任务"。
Task 有明确的生命周期状态,可以被查询、取消、订阅更新。这让长时间运行的复杂工作流变得可管理——你不会陷入无限等待,也可以在任何时刻检查进度或主动取消。
原则四:能力通过 Agent Card 公开声明
每个 Agent 在固定路径公开一份 JSON 文档(/.well-known/agent.json),描述自己能做什么、支持哪些输入输出格式、需要什么认证方式。其他 Agent 读取这份文档来决定是否把任务交给它。
这是发现机制,类似 OpenAPI 规范之于 REST API。
原则五:支持多种交互模式
A2A 同时支持四种交互模式,Agent 根据自身能力声明支持哪些,客户端按需选择:
- 同步请求-响应:短任务立即返回结果
- 异步轮询:发出去后定期来查状态
- 流式 SSE:长任务实时推送进度
- Push Notification:完成后主动回调
三、核心数据模型:逐字段深度拆解
3.1 Agent Card:Agent 的「名片」
每个遵循 A2A 协议的 Agent 必须在 /.well-known/agent.json 路径上托管一份 Agent Card。这是发现机制的入口。
{
"name": "财务分析 Agent",
"description": "专业处理财务数据分析、报表生成和趋势预测任务",
"version": "1.2.0",
"url": "https://finance-agent.example.com/a2a",
"provider": {
"organization": "FinTech Corp",
"url": "https://fintechcorp.example.com"
},
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": ["text", "data"],
"defaultOutputModes": ["text", "data", "file"],
"securitySchemes": {
"bearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT"
}
},
"security": [{ "bearerAuth": [] }],
"skills": [
{
"id": "financial-analysis",
"name": "财务数据分析",
"description": "对财务数据进行多维度分析,输出结构化报告",
"tags": ["finance", "analysis", "reporting"],
"examples": [
"分析 Q3 的营收趋势",
"对比各部门的预算执行情况"
],
"inputModes": ["text", "data"],
"outputModes": ["text", "data", "file"]
},
{
"id": "forecast",
"name": "财务预测",
"description": "基于历史数据进行未来财务预测",
"tags": ["finance", "forecast", "ml"],
"inputModes": ["data"],
"outputModes": ["data", "file"]
}
]
}
逐字段解析:
| 字段 | 含义 | 开发者关注点 |
|---|---|---|
url | Agent 的 A2A 服务端点 | 所有 JSON-RPC 请求都发到这里 |
capabilities.streaming | 是否支持 SSE 流式响应 | true 才可以用 message/stream |
capabilities.pushNotifications | 是否支持完成后主动回调 | true 才可以用 Push Notification |
defaultInputModes | 支持哪些输入类型 | text / data / file / audio / video |
securitySchemes | 认证方案格式 | 与 OpenAPI 3.0 Security Scheme 一致 |
skills | Agent 能力清单 | 每个 skill 有 id、tags、examples,客户端据此匹配任务 |
为什么 skills 字段如此重要?
你可能有一个"数据分析 Agent",但它的 skills 列表里可能有"SQL查询"、"可视化"、"报告生成"三个不同的能力。客户端不应该把"帮我写一封商务邮件"这种任务交给数据分析 Agent——即使它的名字叫"数据分析 Agent"。通过 skills 的 tags 和 examples,客户端可以精确匹配。
# 伪代码:客户端根据 skills 做任务路由
def should_delegate(task: str, agent_card: AgentCard) -> bool:
for skill in agent_card.skills:
# 用简单关键词匹配,实际可用 embedding
if any(keyword in task for keyword in skill['tags']):
return True
if any(example in task for example in skill['examples']):
return True
return False
3.2 Task:工作的基本单元
Task 是 A2A 里最核心的对象,贯穿整个交互生命周期:
{
"id": "363422be-b0f9-4692-a24d-278670e7c7f1",
"contextId": "c295ea44-7543-4f78-b524-7a38915ad6e4",
"status": {
"state": "completed",
"timestamp": "2025-04-17T17:47:09Z",
"message": {
"role": "agent",
"parts": [
{ "kind": "text", "text": "分析完成,详见 artifact。" }
]
}
},
"artifacts": [],
"history": []
}
核心字段:
id:Task 的唯一标识(UUID)。用于后续查询(tasks/get)和取消(tasks/cancel)contextId:会话上下文 ID。同一个多轮对话里的所有 Task 共享同一个 contextId。可以把taskId理解为"这次具体做的事",contextId理解为"整个对话脉络"status.state:当前状态,取值如下
Task 状态机(完整生命周期):
submitted ──→ working ──→ completed
│
├──→ failed
│
├──→ input-required ──→ working ──→ completed
│ │
│ └──→ failed
│
├──→ canceled(任意时刻客户端主动取消)
└──→ rejected(服务端拒绝执行)
input-required 状态是多轮交互的关键。 Agent 处理到一半发现需要更多信息,就把 Task 切到这个状态,告诉客户端"我还需要你告诉我 X"。客户端补充信息后,用同一个 taskId 继续发送,Task 重新进入 working。
这在很多实际场景中非常有用:
- 订票 Agent:需要出发地、目的地、日期三个信息,但用户只给了"帮我订票"
- 代码审查 Agent:需要知道代码是用什么语言、有什么安全约束
- 报告生成 Agent:需要用户确认报告的受众是谁、语气应该正式还是轻松
3.3 Message:通信的载体
Message 是客户端和 Agent 之间实际传递的内容:
{
"role": "user",
"parts": [
{
"kind": "text",
"text": "请分析附件中的 Q3 财务数据,重点看毛利率变化趋势"
},
{
"kind": "file",
"file": {
"name": "q3_finance.xlsx",
"mimeType": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"data": "<base64编码>"
}
}
],
"messageId": "9229e770-767c-417b-a0b0-f0741243c589",
"taskId": "363422be-b0f9-4692-a24d-278670e7c7f1",
"contextId": "c295ea44-7543-4f78-b524-7a38915ad6e4"
}
Part 的 kind 类型:
| kind | 说明 | 典型场景 |
|---|---|---|
text | 纯文本 | 用户指令、Agent 回复 |
file | 文件(含 base64 或 URL) | 上传代码文件、数据文件、图片 |
data | 任意 JSON 结构数据 | 结构化参数、API 响应、查询结果 |
设计洞察: A2A 的 Part 模型与 Anthropic 的消息格式一脉相承,这意味着 AI Agent 内部的消息可以直接序列化为 A2A Message,无需额外的格式转换。这是一个非常聪明的设计决策。
3.4 Artifact:任务输出物
Artifact 是 Agent 完成任务后交付的成果:
{
"artifactId": "9b6934dd-37e3-4eb1-8766-962efaab63a1",
"name": "财务分析报告",
"description": "Q3毛利率趋势分析结果",
"parts": [
{
"kind": "text",
"text": "## Q3 毛利率分析\n\n根据数据,Q3 毛利率为 42.3%,环比下降 1.2 个百分点..."
},
{
"kind": "data",
"data": {
"grossMargin": 0.423,
"qoqChange": -0.012,
"trend": "declining",
"monthlyBreakdown": [0.435, 0.428, 0.406]
}
}
]
}
Artifact vs Message 的语义区别:
- Message:通信过程中的"说话"——发送方在表达、询问、回应
- Artifact:任务完成后交付的"产品"——有名字、有描述、可被引用、可被下游消费
一个 Task 可以生成多个 Artifact(例如:一份 SQL 报告 + 一个可视化图表 + 一个数据导出文件)。这些 Artifact 全部关联到同一个 taskId,便于客户端一次性获取完整成果。
四、JSON-RPC 方法全景解析
A2A 通过 JSON-RPC 2.0 定义了一套方法,全部 POST 到 Agent 的 url 端点:
4.1 方法总览
| 方法名 | 用途 | SSE支持 |
|---|---|---|
message/send | 发送消息,同步等待完整结果 | ❌ |
message/stream | 发送消息,SSE 流式接收进度和结果 | ✅ |
tasks/get | 查询某个 Task 的当前状态和结果 | - |
tasks/list | 列出所有(或过滤的)Task | - |
tasks/cancel | 取消一个正在进行的 Task | - |
tasks/resubscribe | 重新订阅某个 Task 的 SSE 流 | ✅ |
tasks/pushNotificationConfig/set | 配置 Task 完成后的推送回调地址 | - |
agent/authenticatedExtendedCard | 获取需认证才能看到的完整 Agent Card | - |
JSON-RPC 请求基础结构:
{
"jsonrpc": "2.0",
"id": "请求ID(客户端自生成,用于对应响应)",
"method": "方法名",
"params": { ... }
}
4.2 同步请求:message/send
最短路径:发一条消息,等 Agent 处理完返回完整结果。
请求示例:
{
"jsonrpc": "2.0",
"id": 1,
"method": "message/send",
"params": {
"message": {
"role": "user",
"parts": [
{
"kind": "text",
"text": "把这段 Python 代码翻译成 Go:\ndef add(a, b):\n return a + b"
}
],
"messageId": "msg-001"
},
"configuration": {
"blocking": true,
"acceptedOutputModes": ["text"]
}
}
}
关键配置字段:
blocking: true:客户端愿意一直等到 Task 完成。如果服务端支持就直接返回最终结果;如果不支持,则先返回 Task 初始状态,客户端再轮询acceptedOutputModes:客户端能接受哪些格式的输出
同步响应(Task 已完成):
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"kind": "task",
"id": "task-abc-123",
"contextId": "ctx-xyz-789",
"status": {
"state": "completed",
"timestamp": "2025-04-17T10:30:00Z"
},
"artifacts": [
{
"artifactId": "artifact-001",
"name": "Go 代码",
"parts": [
{
"kind": "text",
"text": "func add(a, b int) int {\n return a + b\n}"
}
]
}
]
}
}
4.3 流式响应:message/stream
适合长时间运行的任务。HTTP 响应使用 Content-Type: text/event-stream,内容是一连串 SSE 事件:
// 事件1:Task 被接受
data: {"jsonrpc":"2.0","id":1,"result":{"kind":"task","id":"task-001","status":{"state":"submitted","timestamp":"2025-04-17T10:30:00Z"}}}
// 事件2:进入处理中
data: {"jsonrpc":"2.0","id":1,"result":{"kind":"status-update","taskId":"task-001","status":{"state":"working"}}}
// 事件3:第一个内容块到达(append: false = 新 Artifact 的第一块)
data: {"jsonrpc":"2.0","id":1,"result":{"kind":"artifact-update","taskId":"task-001","artifact":{"artifactId":"art-001","parts":[{"kind":"text","text":"# Q3 财务分析报告\n\n## 摘要\n"}]},"append":false,"lastChunk":false}}
// 事件4:追加内容(append: true = 拼接到同一个 Artifact)
data: {"jsonrpc":"2.0","id":1,"result":{"kind":"artifact-update","taskId":"task-001","artifact":{"artifactId":"art-001","parts":[{"kind":"text","text":"Q3 毛利率为 42.3%,环比下降 1.2 个百分点,主要受原材料成本上升影响..."}]},"append":true,"lastChunk":false}}
// 事件5:最后一块(lastChunk: true)
data: {"jsonrpc":"2.0","id":1,"result":{"kind":"artifact-update","taskId":"task-001","artifact":{"artifactId":"art-001","parts":[{"kind":"text","text":"\n\n## 结论\n建议关注供应链成本优化机会。"}]},"append":true,"lastChunk":true}}
// 事件6:Task 完成,final: true,SSE 连接关闭
data: {"jsonrpc":"2.0","id":1,"result":{"kind":"status-update","taskId":"task-001","status":{"state":"completed","timestamp":"2025-04-17T10:30:45Z"},"final":true}}
客户端处理 SSE 的逻辑:
import sseclient
import requests
def stream_task(a2a_url: str, message: dict) -> list[dict]:
artifacts = []
current_artifact = None
with requests.post(f"{a2a_url}/a2a", json=message, stream=True) as r:
client = sseclient.SSEClient(r)
for event in client.events():
data = json.loads(event.data)
result = data.get("result", {})
kind = result.get("kind")
if kind == "task":
# 初始化,获取 taskId
task_id = result["id"]
elif kind == "artifact-update":
artifact = result["artifact"]
is_append = result.get("append", False)
is_last = result.get("lastChunk", False)
if not is_append:
# 新 Artifact 开始
current_artifact = {
"artifactId": artifact["artifactId"],
"parts": artifact["parts"]
}
artifacts.append(current_artifact)
else:
# 追加到当前 Artifact
current_artifact["parts"].extend(artifact["parts"])
if is_last:
current_artifact = None
elif kind == "status-update" and result.get("final"):
break # 流结束
return artifacts
4.4 多轮交互:input-required 模式
这是 A2A 最体现"协作"特性的场景:
第一轮:客户端发起,Agent 表示需要补充信息
// 响应
{
"jsonrpc": "2.0",
"id": "req-001",
"result": {
"kind": "task",
"id": "task-flight-001",
"contextId": "ctx-flight-session",
"status": {
"state": "input-required",
"message": {
"role": "agent",
"parts": [
{ "kind": "text", "text": "我可以帮你订机票!请告诉我出发城市、目的地和出行日期?" }
]
}
}
}
}
第二轮:客户端补充信息,使用同一个 taskId 和 contextId
{
"jsonrpc": "2.0",
"id": "req-002",
"method": "message/send",
"params": {
"message": {
"role": "user",
"parts": [
{ "kind": "text", "text": "从上海出发飞东京,10月15日去,10月22日回" }
],
"messageId": "user-msg-002",
"taskId": "task-flight-001",
"contextId": "ctx-flight-session"
},
"configuration": { "blocking": true }
}
}
关键点: 续接任务时,message 里必须带上 taskId 和 contextId,Agent 才能把这条消息关联到正在等待 input-required 的 Task 上继续处理。这与 HTTP 会话的 Cookie 机制有异曲同工之妙。
4.5 错误码体系
A2A 在 JSON-RPC 标准错误码之外定义了协议专属错误:
| 错误码 | 常量名 | 含义 |
|---|---|---|
| -32001 | TaskNotFound | 任务 ID 不存在 |
| -32002 | TaskNotCancelable | 任务已终止,无法取消 |
| -32003 | PushNotificationNotSupported | Agent 不支持推送通知 |
| -32004 | UnsupportedOperation | 不支持该方法 |
| -32005 | ContentTypeNotSupported | 不支持该输入/输出格式 |
| -32006 | InvalidAgentResponse | Agent 内部返回了非法数据 |
| -32700 | ParseError | JSON 解析失败 |
| -32600 | InvalidRequest | 请求格式不合法 |
| -32601 | MethodNotFound | 方法不存在 |
| -32602 | InvalidParams | 参数不合法 |
五、安全机制:生产环境必备
A2A 对安全的要求非常明确,不是可选项,而是协议的正式组成部分。
5.1 传输层安全
必须使用 HTTPS,不得在生产环境中使用明文 HTTP。这与 Web 安全的最佳实践一致。
5.2 客户端认证
A2A 完整复用 OpenAPI 3.0 的 Security Scheme 体系,Agent Card 里声明什么认证方式,客户端就按什么方式提供凭证:
"securitySchemes": {
"bearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT"
}
}
支持的方案包括:
- API Key:放在 Header 里,最简单
- HTTP Basic / Bearer:标准 HTTP 认证
- OAuth 2.0:支持 Authorization Code、Client Credentials、Device Code 三种流程
- OpenID Connect:带身份验证的 OAuth 2.0 超集
- mTLS:双向 TLS 证书认证,安全性最高
5.3 Agent Card 签名(v1.0 新特性)
Agent Card 可以被签名,防止中间人篡改。签名格式基于 JWS(JSON Web Signature)。
攻击场景: 攻击者拦截并修改 Agent Card,将 url 指向恶意服务器。客户端基于伪造的 Card 发送请求,导致敏感数据泄露。
防御机制: Agent Card 的发布者用私钥对 Card 内容签名,客户端用公钥验证签名,确认 Card 来自真实的发布者、且内容未被篡改。
5.4 Push Notification 安全
当 Agent 使用 Push Notification 回调客户端时,存在一个固有风险:回调请求可能被伪造。
A2A 的解决方案是:客户端在配置回调地址时,可以同时提供一个 JWKS URL,要求 Agent 对每次回调请求做签名。客户端收到回调后,用 JWKS 中的公钥验证签名,确认通知来自合法的 Agent。
// 客户端设置 Push Notification 配置
{
"method": "tasks/pushNotificationConfig/set",
"params": {
"taskId": "task-abc-123",
"pushNotificationEndpoint": "https://my-app.example.com/a2a-callback",
"jwksUri": "https://my-app.example.com/.well-known/jwks.json"
}
}
这样,即使回调 URL 被泄露,攻击者也无法伪造有效的签名请求。
六、MCP vs A2A:一张图讲清楚
很多人会把 MCP 和 A2A 搞混,觉得"都是 Agent 协议,有什么区别"。其实它们解决的是完全不同的问题:
┌─────────────────────────────────────────────────────────────┐
│ AI Agent 系统 │
│ │
│ ┌──────────┐ MCP协议 ┌──────────┐ │
│ │ Agent │ ───────────→ │ 工具层 │ │
│ │ │ │ 数据库 │ │
│ │ │ │ 搜索API │ │
│ │ │ │ 文件系统 │ │
│ └──────────┘ └──────────┘ │
│ │ │
│ │ A2A协议 │
│ ▼ │
│ ┌──────────┐ A2A协议 ┌──────────┐ │
│ │ Agent A │ ←──────────→ │ Agent B │ │
│ │ (招聘协调) │ │ (背调服务) │ │
│ └──────────┘ └──────────┘ │
│ │ │
│ │ A2A协议 │
│ ▼ │
│ ┌──────────┐ A2A协议 ┌──────────┐ │
│ │ Agent A │ ←──────────→ │ Agent C │ │
│ │ (招聘协调) │ │ (日历服务) │ │
│ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
MCP = "USB 接口":连接 Agent 与外部资源(工具、数据源)
A2A = "网络协议":连接 Agent 与 Agent(协作、工作流)
| 维度 | MCP | A2A |
|---|---|---|
| 解决问题 | Agent → 工具 | Agent → Agent |
| 发起方 | Agent 主动调用工具 | 客户端协调或 Agent 间相互委托 |
| 通信模式 | 单次请求-响应 | 单次/流式/多轮/异步 |
| 状态管理 | 无状态 | 有状态的 Task 生命周期 |
| 典型厂商 | Anthropic(为主) | Google(牵头),多家联合维护 |
| 核心抽象 | Tool | Task |
| 2025年状态 | 成熟,广泛采用 | 快速演进,v0.3 已发布 |
6.1 互补使用场景
在实际系统中,MCP 和 A2A 不是互斥的,而是互补的:
场景:一个 AI 代码审查系统
- A2A 层:协调 Agent(Orchestrator)把代码审查任务分发给代码审查 Agent
- MCP 层:代码审查 Agent 通过 MCP 调用 Git 工具获取源码、通过 MCP 调用 Linter 执行检查、通过 MCP 调用数据库查询代码库信息
协调 Agent (A2A) ──→ 代码审查 Agent (A2A)
│
├── MCP: git工具(获取源码)
├── MCP: linter工具(执行检查)
└── MCP: 数据库工具(查询代码库)
这就是 2026 年最流行的 MCP + A2A 双协议架构:A2A 处理 Agent 间的工作流编排,MCP 处理 Agent 的工具调用能力。
七、生产级实战:Python 实现完整 A2A 交互
7.1 项目结构
a2a_demo/
├── agent_card.json # Agent 名片
├── server.py # A2A 服务端
├── client.py # A2A 客户端
└── main.py # 演示入口
依赖安装:
pip install fastapi uvicorn httpx sseclient-py
7.2 agent_card.json
{
"name": "代码翻译 Agent",
"description": "将代码从一种编程语言翻译成另一种语言的专业 Agent",
"version": "1.0.0",
"url": "http://localhost:8000/a2a",
"provider": {
"organization": "Demo Corp",
"url": "https://demo.example.com"
},
"capabilities": {
"streaming": false,
"pushNotifications": false
},
"defaultInputModes": ["text"],
"defaultOutputModes": ["text"],
"skills": [
{
"id": "code-translate",
"name": "代码翻译",
"description": "将源代码从一种语言翻译成另一种语言",
"tags": ["code", "translation", "programming"],
"examples": [
"把这段 Python 代码翻译成 Go",
"将以下 JavaScript 代码转换为 TypeScript"
],
"inputModes": ["text"],
"outputModes": ["text"]
}
]
}
7.3 server.py:A2A 服务端
"""
A2A 服务端:发布 Agent Card,处理 JSON-RPC 请求
职责最小化:只做协议层,不含业务逻辑
"""
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI()
# 任务存储,生产环境替换为数据库或 Redis
_tasks: dict[str, dict] = {}
# ─────────────────────────────────────────
# 业务逻辑占位符(真实场景替换为 LLM 调用)
# ─────────────────────────────────────────
def translate_code(text: str) -> str:
"""模拟代码翻译。真实场景接入 Claude/GPT API。"""
text_lower = text.lower()
if "python" in text_lower and ("go" in text_lower or "golang" in text_lower):
return (
"// Go 翻译结果\n\n"
"package main\n\n"
"func add(a, b int) int {\n"
" return a + b\n"
"}\n\n"
"func main() {}\n"
)
elif "python" in text_lower and "typescript" in text_lower:
return (
"// TypeScript 翻译结果\n\n"
"function add(a: number, b: number): number {\n"
" return a + b;\n"
"}\n"
)
else:
return f"// 翻译完成(输入摘要:{text[:60]}...)"
# ─────────────────────────────────────────
# 端点一:发布 Agent Card
# ─────────────────────────────────────────
@app.get("/.well-known/agent.json")
async def get_agent_card():
"""GET /.well-known/agent.json,对外暴露 Agent 能力"""
card = json.loads(Path("agent_card.json").read_text(encoding="utf-8"))
return JSONResponse(content=card)
# ─────────────────────────────────────────
# 端点二:A2A 主入口
# ─────────────────────────────────────────
@app.post("/a2a")
async def a2a_endpoint(request: Request):
"""所有 JSON-RPC 请求的统一入口"""
body: dict = await request.json()
method = body.get("method", "")
req_id = body.get("id")
if method == "message/send":
return _handle_message_send(body)
elif method == "message/stream":
return _handle_message_stream(body)
elif method == "tasks/get":
return _handle_tasks_get(body)
elif method == "tasks/cancel":
return _handle_tasks_cancel(body)
else:
return _error(req_id, -32601, "Method not found")
# ─────────────────────────────────────────
# 处理器:message/send(同步)
# ─────────────────────────────────────────
def _handle_message_send(body: dict) -> JSONResponse:
params = body["params"]
message = params["message"]
req_id = body["id"]
# 从 parts 提取文本内容
user_text = "".join(
p.get("text", "")
for p in message.get("parts", [])
if p.get("kind") == "text"
)
# 提取上下文 ID
task_id = str(uuid.uuid4())
context_id = message.get("contextId") or str(uuid.uuid4())
now = datetime.now(timezone.utc).isoformat()
# 执行翻译(业务逻辑)
translated = translate_code(user_text)
# 构建 Task
task = {
"kind": "task",
"id": task_id,
"contextId": context_id,
"status": {
"state": "completed",
"timestamp": now,
"message": {
"role": "agent",
"parts": [
{"kind": "text", "text": "翻译完成。"}
]
}
},
"artifacts": [
{
"artifactId": str(uuid.uuid4()),
"name": "翻译结果",
"parts": [{"kind": "text", "text": translated}]
}
],
"history": [
{
"role": "user",
"parts": message.get("parts", []),
"messageId": message.get("messageId", str(uuid.uuid4())),
"taskId": task_id,
"contextId": context_id
}
]
}
_tasks[task_id] = task
return JSONResponse(content={
"jsonrpc": "2.0",
"id": req_id,
"result": task
})
# ─────────────────────────────────────────
# 处理器:message/stream(流式,SSE)
# ─────────────────────────────────────────
def _handle_message_stream(body: dict) -> JSONResponse:
"""演示流式响应,返回 Task 初始状态(含 taskId)
真实场景用 StreamingResponse + 异步生成器推送 SSE 事件"""
params = body["params"]
message = params["message"]
req_id = body.get("id")
task_id = str(uuid.uuid4())
context_id = message.get("contextId") or str(uuid.uuid4())
# 保存任务(状态为 working)
_tasks[task_id] = {
"kind": "task",
"id": task_id,
"contextId": context_id,
"status": {"state": "working", "timestamp": datetime.now(timezone.utc).isoformat()}
}
return JSONResponse(content={
"jsonrpc": "2.0",
"id": req_id,
"result": _tasks[task_id]
})
# ─────────────────────────────────────────
# 处理器:tasks/get(查询状态)
# ─────────────────────────────────────────
def _handle_tasks_get(body: dict) -> JSONResponse:
req_id = body["id"]
task_id = body["params"].get("id")
if task_id not in _tasks:
return _error(req_id, -32001, "TaskNotFound", {"taskId": task_id})
return JSONResponse(content={
"jsonrpc": "2.0",
"id": req_id,
"result": _tasks[task_id]
})
# ─────────────────────────────────────────
# 处理器:tasks/cancel(取消任务)
# ─────────────────────────────────────────
def _handle_tasks_cancel(body: dict) -> JSONResponse:
req_id = body["id"]
task_id = body["params"].get("id")
if task_id not in _tasks:
return _error(req_id, -32001, "TaskNotFound", {"taskId": task_id})
state = _tasks[task_id]["status"]["state"]
if state in ("completed", "failed", "canceled"):
return _error(req_id, -32002, "TaskNotCancelable", {
"taskId": task_id,
"currentState": state
})
_tasks[task_id]["status"]["state"] = "canceled"
return JSONResponse(content={
"jsonrpc": "2.0",
"id": req_id,
"result": _tasks[task_id]
})
# ─────────────────────────────────────────
# 工具函数
# ─────────────────────────────────────────
def _error(req_id: Any, code: int, message: str, data: dict = None) -> JSONResponse:
err = {"code": code, "message": message}
if data:
err["data"] = data
return JSONResponse(content={"jsonrpc": "2.0", "id": req_id, "error": err})
7.4 client.py:A2A 客户端
"""
A2A 客户端:封装发现、发送、查询逻辑
对外暴露语义清晰的方法,屏蔽 JSON-RPC 细节
"""
import uuid
import httpx
from typing import AsyncIterator, Optional
class A2AClient:
"""符合 A2A 规范的客户端"""
def __init__(self, agent_card_url: str):
self.agent_card_url = agent_card_url # 例如: http://localhost:8000/.well-known/agent.json
self.agent_card: Optional[dict] = None
self.a2a_url: str = ""
# ─────────────────────────────────
# 发现:读取 Agent Card
# ─────────────────────────────────
async def discover(self) -> dict:
"""GET /.well-known/agent.json,获取对方的能力描述和通信地址"""
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(self.agent_card_url)
response.raise_for_status()
self.agent_card = response.json()
# A2A URL 来自 agent_card.url 字段
self.a2a_url = self.agent_card["url"]
return self.agent_card
# ─────────────────────────────────
# 发送消息(同步,等结果)
# ─────────────────────────────────
async def send_message(
self,
text: str,
context_id: Optional[str] = None,
task_id: Optional[str] = None,
) -> dict:
"""发送消息,同步等待完整结果"""
message_id = str(uuid.uuid4())
body = {
"jsonrpc": "2.0",
"id": message_id,
"method": "message/send",
"params": {
"message": {
"role": "user",
"parts": [{"kind": "text", "text": text}],
"messageId": message_id,
"taskId": task_id,
"contextId": context_id,
},
"configuration": {
"blocking": True,
"acceptedOutputModes": self.agent_card.get("defaultOutputModes", ["text"])
}
}
}
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(self.a2a_url, json=body)
response.raise_for_status()
result = response.json()
if "error" in result:
raise RuntimeError(f"A2A Error {result['error']['code']}: {result['error']['message']}")
return result["result"]
# ─────────────────────────────────
# 查询任务状态
# ─────────────────────────────────
async def get_task(self, task_id: str) -> dict:
"""查询某个 Task 的当前状态"""
body = {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "tasks/get",
"params": {"id": task_id}
}
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(self.a2a_url, json=body)
response.raise_for_status()
result = response.json()
if "error" in result:
raise RuntimeError(f"A2A Error {result['error']['code']}: {result['error']['message']}")
return result["result"]
# ─────────────────────────────────
# 取消任务
# ─────────────────────────────────
async def cancel_task(self, task_id: str) -> dict:
"""取消一个正在进行的 Task"""
body = {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "tasks/cancel",
"params": {"id": task_id}
}
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(self.a2a_url, json=body)
response.raise_for_status()
result = response.json()
if "error" in result:
raise RuntimeError(f"A2A Error {result['error']['code']}: {result['error']['message']}")
return result["result"]
7.5 main.py:演示入口
"""
演示 A2A 完整交互流程
启动方式:uvicorn server:app --reload(另一个终端)
"""
import asyncio
from client import A2AClient
async def demo():
# 第一步:发现 Agent
client = A2AClient("http://localhost:8000/.well-known/agent.json")
card = await client.discover()
print(f"发现 Agent:{card['name']}")
print(f"版本:{card['version']}")
print(f"支持的能力:{card['capabilities']}")
print(f"可提供服务:{[s['name'] for s in card['skills']]}")
print("-" * 50)
# 第二步:同步调用
result = await client.send_message(
"把这段 Python 代码翻译成 Go:\ndef add(a, b):\n return a + b"
)
print(f"Task ID:{result['id']}")
print(f"状态:{result['status']['state']}")
print(f"输出数量:{len(result['artifacts'])} 个 Artifact")
for artifact in result["artifacts"]:
for part in artifact["parts"]:
if part["kind"] == "text":
print(f"\n{artifact['name']}:\n{part['text']}")
if __name__ == "__main__":
asyncio.run(demo())
运行效果:
发现 Agent:代码翻译 Agent
版本:1.0.0
支持的能力:{'streaming': False, 'pushNotifications': False}
可提供服务:['代码翻译']
--------------------------------------------------
Task ID:6f82c1de-3b4a-4e9f-8c2d-1a7b3e5f9c2d
状态:completed
输出数量:1 个 Artifact
翻译结果:
// Go 翻译结果
package main
func add(a, b int) int {
return a + b
}
func main() {}
八、架构演进:从单体 Agent 到 Agent 协作网络
8.1 单体 Agent 的局限性
传统单体 Agent(如 GPT-4 配合几个 Tool)的局限性在于:
- 能力边界固定:一个 Agent 不可能同时成为代码专家、财务分析师、法律顾问
- 上下文膨胀:把所有专业知识塞进 Prompt 导致上下文爆炸、推理成本飙升
- 单点故障:一个 Agent 崩溃,整个系统崩溃
- 升级困难:替换某个能力意味着重新训练或重新配置整个 Agent
8.2 多 Agent 协作网络
A2A 推动的架构是 Agent 协作网络(Agentic Network):
┌─────────────┐
│ 用户请求 │
└──────┬──────┘
│
┌──────▼──────┐
│ 协调 Agent │ ← 理解意图,路由任务
│ (Orchestrator)│
└──────┬──────┘
┌──────────────┼──────────────┐
│ │ │
┌──────▼──────┐ ┌─────▼─────┐ ┌─────▼─────┐
│ 招聘 Agent │ │背调 Agent │ │日历 Agent │
│ (A2A) │ │ (A2A) │ │ (A2A) │
└──────┬──────┘ └─────┬─────┘ └─────┬─────┘
│ │ │
┌──────▼──────┐ ┌─────▼─────┐ ┌─────▼─────┐
│ MCP: LinkedIn│ │MCP: 背调API│ │MCP: Google│
│ MCP: 简历库 │ │MCP: 数据库│ │ Calendar │
└─────────────┘ └───────────┘ └───────────┘
协调 Agent 不需要知道各个专业 Agent 的内部实现,只需要通过 A2A 委托任务、获取结果。专业 Agent 也不需要知道协调 Agent 是如何做出决策的——它们只关心输入和输出。
8.3 标准化带来的生态效应
当 A2A 成为事实标准后,会催生几个有趣的生态效应:
Agent 市场(Agent Marketplace): 企业可以像采购 SaaS 一样,直接采购专业 Agent——"招聘 Agent"、"CRM Agent"、"数据分析 Agent",插上 A2A 就能用,不需要定制开发。
Agent 组合器(Agent Composer): 低代码/无代码平台提供可视化界面,通过拖拽组合不同的 Agent,配置工作流,自动生成多 Agent 协作系统。
Agent 发现服务(Agent Discovery): 类似 DNS 的 Agent 发现网络,通过 Agent Card 的元数据让 Agent 自动找到其他 Agent,无需手动配置。
九、性能优化与生产实践
9.1 降低发现延迟
Agent Card 应该使用 CDN 缓存或边缘节点,避免每次调用都从源站获取。对于高频调用场景,可以将 Card 内容缓存在客户端本地,定期刷新(TTL 可设为 5-10 分钟)。
from functools import lru_cache
import httpx
@lru_cache(maxsize=128)
def get_agent_card_cached(url: str, ttl_seconds: int = 300):
"""带 TTL 的 Agent Card 缓存"""
# 实际实现可加时间戳检查
with httpx.Client(timeout=10.0) as client:
r = client.get(url)
r.raise_for_status()
return r.json()
9.2 Task 超时与重试策略
对于长时间运行的 Task,建议设置合理的超时和重试策略:
import asyncio
from httpx import TimeoutException
async def send_with_retry(client: A2AClient, text: str, max_retries: int = 3):
for attempt in range(max_retries):
try:
return await client.send_message(text)
except TimeoutException:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt) # 指数退避
9.3 多轮对话的上下文管理
contextId 的设计让同一个会话的所有 Task 共享上下文。但要注意,上下文信息可能随时间推移变得陈旧。建议定期检查 Task 历史,清理不再需要的中间结果:
# 定期清理过期的中间 Task(保留最终结果)
async def cleanup_intermediate_tasks(context_id: str, keep_final: bool = True):
tasks = await client.list_tasks(context_id)
for task in tasks:
if task["status"]["state"] in ("completed", "failed") and keep_final:
# 保留最终结果,清理中间步骤的 history
task["history"] = [] # 释放内存
十、总结与展望
核心要点回顾
- A2A 解决的是 Agent→Agent 的通信问题,与 MCP(Agent→工具)形成互补关系,共同构成 AI Agent 生态的双协议栈
- Task 是 A2A 的核心抽象,完整的状态机(submitted→working→input-required/completed/failed)覆盖了真实世界协作的各种场景
- Agent Card 是发现机制的入口,通过 skills 的声明式描述实现松耦合的任务路由
- 基于已有 Web 标准(HTTP/JSON-RPC/SSE) 是 A2A 最重要的设计决策,大幅降低了学习和实现成本
- 安全是协议的正式组成部分,包括传输层 TLS、认证方案、Card 签名和 Push Notification 签名
2026 年展望
- v1.0 稳定版:预计2026年Q4发布,提供向后兼容性保证,企业可以开始大规模采用
- 生态工具成熟:A2A SDK 会覆盖更多语言(Go、Rust、TypeScript),A2A 调试工具、可视化工作流编辑器会逐步出现
- 与 MCP 的深度集成:MCP + A2A 双协议架构将成为 2026 年多 Agent 系统的事实标准
- 企业采用加速:随着 Agent 市场的发展,会有更多企业通过 A2A 组合专业 Agent,而非从头构建自己的多 Agent 系统
A2A 协议的意义,不仅在于让 Agent 能互相通信,更在于它为 AI Agent 生态的"工业化"奠定了基础——就像 REST API 让互联网服务可以互相调用,A2A 让 AI Agent 可以互相协作。当 Agent 协作网络足够丰富,AI 能做的事情将远超今天任何单体 Agent 的能力边界。
参考资源:
- A2A 协议规范:https://github.com/a2a-protocol/a2a
- Linux Foundation A2A 项目:https://lf-a2a.io
- A2A Python SDK(社区):github.com/a2a-protocol/a2a-python