A2A + MCP 双协议深度解析:当多智能体协作成为工程现实——从协议原理到生产级代码实战
作者:程序员茄子 | 2026-06-10 | 分类:编程 | 字数:约 12000 字
前言:Agent 时代的基础设施战争
2026 年,AI Agent 从「能聊天」进化到「能干活」,这个转变的关键不在模型本身,而在协议层。
当一个 AI Agent 需要调用另一个 Agent 完成子任务时,它面临三个根本问题:信谁?怎么通信?出了事谁负责?
MCP(Model Context Protocol)和 A2A(Agent-to-Agent)协议,正是解决这三个问题的两套互补方案。它们不是竞争关系,而是同一套基础设施的两层:MCP 解决「AI 如何操控工具」,A2A 解决「AI 如何与其他 AI 协作」。
清华大学 2026 年 6 月发布的 A2A 研究报告,更是将 A2A 定义为「AI 时代数字委托协议与组织基础设施」,类比 Web 时代的 PageRank。Google 将 A2A 协议捐给 Linux 基金会,微信与华为、小米、OPPO、vivo 等手机厂商合作推出 A2A 助手能力——这标志着 Agent 互联协议正式进入商用阶段。
本文从工程视角出发,深入拆解这两套协议的架构原理、协作机制、代码实现,以及在生产环境中的集成策略。无论你是做 AI Agent 开发、后端架构设计,还是在做企业级 AI 选型,这篇文章都会给你完整的技术参照系。
一、为什么需要多协议:拆解 AI Agent 的三大痛点
在深入协议细节之前,我们先理解为什么 MCP + A2A 的双协议架构会成为 2026 年的主流选择。
1.1 痛点一:工具调用「诸侯混战」
在 MCP 出现之前,每个 AI 平台都有自己的工具调用规范:
OpenAI → function calling (GPT 系列专用)
Anthropic → tool use (Claude 系列专用)
Google → Bison tool use
各公司自研 Agent → 自定义 JSON Schema
这导致一个严重问题:写好的工具在平台间无法复用。你在 Cursor 里调试好的 GitHub MCP Server,切到 Claude Code 就得重写一遍。
MCP 的核心价值是标准化工具描述,让「AI 调工具」这件事变得像 USB 接口一样即插即用。
1.2 痛点二:多 Agent 协作「没有共同语言」
当业务复杂到需要多个 Agent 协作时,问题来了:
- Agent A 想知道 Agent B 能做什么 → 没有「能力发现」机制
- Agent A 把任务委托给 Agent B → 没有「任务描述」标准
- Agent B 遇到无法解决的问题 → 没有「升级上报」通道
- 两方协作出了 bug → 没有「责任追溯」体系
A2A 协议正是为了解决这些问题而设计的。它定义了 Agent 之间的通信语言、能力描述、任务委托流程和信任框架。
1.3 痛点三:安全与信任「最后一公里」
清华大学 A2A 报告指出了一个被大多数开发者忽视的问题:多 Agent 协作中的信任传递。当 Agent A 委托 Agent B 访问用户数据时,谁来保证 Agent B 不会滥用权限?
MCP 提供了资源隔离和权限控制的基础能力,而 A2A 则引入了 Agent Link 信任框架——关系信誉、证据溯源、委托审计三大模块,解决「信谁、凭什么信任、事后追责」的难题。
二、MCP 协议深度解析:AI 的「USB 接口」
2.1 MCP 是什么
MCP(Model Context Protocol)是 Anthropic 于 2024 年底开源的标准化协议,定义了 AI 大模型如何与外部工具、数据源进行通信。
它的设计哲学非常清晰:
┌─────────────────────────────────────────────────────┐
│ MCP Host (如 Cursor/Claude Code) │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ LLM (大脑) │ ←→ │ MCP Client │ │
│ └──────────────┘ └──────┬───────┘ │
└─────────────────────────────┼──────────────────────┘
│ stdio / SSE / HTTP
┌─────────────────────┼─────────────────────┐
│ MCP Server 1 │ MCP Server 2 │
│ ┌───────────────┐ │ ┌───────────────┐ │
│ │ GitHub Tools │ │ │ DB Tools │ │
│ └───────────────┘ │ └───────────────┘ │
└────────────────────────────────────────────┘
2.2 MCP 的核心架构
MCP 采用客户端-服务器架构,包含四个核心组件:
2.2.1 MCP Host(宿主应用)
集成 MCP 功能的 AI 驱动应用程序,如 Cursor、Claude Code、VS Code 等。Host 负责:
- 管理 MCP Client 连接
- 展示工具调用结果给用户
- 处理认证和会话管理
2.2.2 MCP Client(客户端)
运行在 Host 内部的客户端模块,每个连接的 MCP Server 对应一个 Client 实例。Client 的职责是:
- 与 Server 建立连接(stdio / SSE / HTTP)
- 转发 LLM 的工具调用请求
- 接收并处理 Server 返回的结果
2.2.3 MCP Server(服务器)
提供特定工具和资源的独立服务。每个 Server 暴露一组 Tools(AI 可调用的操作)、Resources(AI 可读取的数据)和 Prompts(预定义的提示模板)。
2.2.4 Transport Layer(传输层)
MCP 支持三种传输方式:
stdio(标准输入/输出):适合本地进程通信,启动命令后通过 stdin/stdout 交换 JSON-RPC 消息。
// mcp_server.py
from mcp.server.fastmcp import FastMCP
mcp = FastMCP(name="my-server")
@mcp.tool(name="get_weather", description="获取城市天气")
def get_weather(city: str) -> str:
return f"{city} 今日晴朗,25°C"
if __name__ == "__main__":
mcp.run(transport_type="stdio")
SSE(Server-Sent Events):适合需要服务端推送的场景,如实时日志、进度通知。
# 通过 HTTP + SSE 方式运行
mcp.run(transport_type="sse", host="127.0.0.1", port=3000)
HTTP + POST:适合微服务架构,每个工具调用是一个 HTTP POST 请求。
2.3 MCP 工具定义:代码级拆解
MCP 的核心是工具描述 schema。一个标准的 MCP 工具定义包含以下字段:
// MCP Tool 定义(TypeScript 类型)
interface Tool {
name: string; // 工具名称,AI 用它来决定调用哪个工具
description: string; // 工具描述,AI 用它来理解工具用途
inputSchema: { // 输入参数 schema(JSON Schema 格式)
type: "object";
properties: {
[key: string]: {
type: string;
description: string;
default?: any;
enum?: string[];
};
};
required: string[];
};
}
// 示例:一个完整的 MCP 工具
{
name: "search_code",
description: "在代码仓库中搜索包含特定关键词的文件和代码片段",
inputSchema: {
type: "object",
properties: {
query: {
type: "string",
description: "搜索关键词,支持正则表达式"
},
file_pattern: {
type: "string",
description: "文件匹配模式,如 *.py、src/**/*.ts",
default: "*"
},
max_results: {
type: "number",
description: "最大返回结果数",
default: 10
}
},
required: ["query"]
}
}
这个 schema 是 AI 决定「何时调用」和「传什么参数」的核心依据。MCP 的工具描述比 OpenAI 的 function calling 更规范,因为它遵循 JSON Schema 标准,任何支持 JSON Schema 的系统都能自动解析和验证。
2.4 MCP 实战:用 FastMCP 从零构建生产级 Server
下面是一个完整的实战案例:构建一个支持代码仓库分析的 MCP Server,包含文件搜索、代码统计、依赖分析三个工具。
# mcp_code_analyzer/server.py
import os
import re
import json
from pathlib import Path
from typing import Optional
from mcp.server.fastmcp import FastMCP
mcp = FastMCP(
name="code-analyzer",
description="代码仓库分析工具集"
)
@mcp.tool(
name="search_code",
description="在代码仓库中全文搜索代码,支持正则表达式"
)
def search_code(
repo_path: str,
query: str,
file_pattern: str = "*",
case_sensitive: bool = False
) -> dict:
"""
在指定仓库中搜索代码。
Args:
repo_path: 仓库根目录路径
query: 搜索关键词或正则表达式
file_pattern: 文件匹配模式(如 *.py, src/**/*.ts)
case_sensitive: 是否大小写敏感
Returns:
包含匹配结果的字典:文件名、行号、匹配内容
"""
results = []
flags = 0 if case_sensitive else re.IGNORECASE
try:
pattern = re.compile(query, flags)
except re.error:
return {"error": f"Invalid regex pattern: {query}"}
repo = Path(repo_path)
if not repo.exists():
return {"error": f"Repository path not found: {repo_path}"}
# 递归搜索文件
for file_path in repo.rglob(file_pattern):
if not file_path.is_file():
continue
# 跳过 node_modules, .git 等目录
if any(part.startswith('.') or part == 'node_modules'
for part in file_path.parts):
continue
try:
content = file_path.read_text(encoding='utf-8', errors='ignore')
lines = content.split('\n')
for line_no, line in enumerate(lines, start=1):
if pattern.search(line):
results.append({
"file": str(file_path.relative_to(repo)),
"line": line_no,
"content": line.strip(),
"context": lines[max(0, line_no-3):line_no+2]
})
except Exception:
continue
return {
"total_matches": len(results),
"files_matched": len(set(r["file"] for r in results)),
"results": results[:100] # 限制返回数量
}
@mcp.tool(
name="analyze_structure",
description="分析代码仓库的整体结构和复杂度指标"
)
def analyze_structure(repo_path: str) -> dict:
"""
分析仓库结构,生成复杂度指标和依赖图谱。
Returns:
包含代码统计、文件类型分布、依赖关系等信息的字典
"""
repo = Path(repo_path)
if not repo.exists():
return {"error": f"Repository path not found: {repo_path}"}
stats = {
"total_files": 0,
"total_lines": 0,
"languages": {},
"largest_files": [],
"dir_structure": {},
"empty_files": 0
}
for file_path in repo.rglob('*'):
if not file_path.is_file():
continue
if any(part.startswith('.') for part in file_path.parts):
continue
stats["total_files"] += 1
ext = file_path.suffix.lower()
lang_map = {
'.py': 'Python', '.js': 'JavaScript', '.ts': 'TypeScript',
'.java': 'Java', '.go': 'Go', '.rs': 'Rust',
'.cpp': 'C++', '.c': 'C', '.rb': 'Ruby',
'.php': 'PHP', '.swift': 'Swift', '.kt': 'Kotlin',
'.vue': 'Vue', '.svelte': 'Svelte', '.md': 'Markdown'
}
lang = lang_map.get(ext, 'Other')
stats["languages"][lang] = stats["languages"].get(lang, 0) + 1
try:
lines = file_path.read_text(encoding='utf-8', errors='ignore').split('\n')
line_count = len(lines)
stats["total_lines"] += line_count
if line_count == 0:
stats["empty_files"] += 1
# 记录大文件
if line_count > 500:
stats["largest_files"].append({
"file": str(file_path.relative_to(repo)),
"lines": line_count
})
except Exception:
continue
stats["largest_files"].sort(key=lambda x: x["lines"], reverse=True)
stats["largest_files"] = stats["largest_files"][:10]
return stats
@mcp.tool(
name="find_dependencies",
description="分析代码中的 import/require 语句,生成依赖图谱"
)
def find_dependencies(repo_path: str, target_file: str) -> dict:
"""
分析指定文件及其依赖关系。
Args:
repo_path: 仓库根目录
target_file: 目标文件的相对路径
Returns:
包含 imports、exports、潜在循环依赖等信息的字典
"""
file_path = Path(repo_path) / target_file
if not file_path.exists():
return {"error": f"File not found: {target_file}"}
ext = file_path.suffix.lower()
imports = []
exports = []
try:
content = file_path.read_text(encoding='utf-8', errors='ignore')
# Python: import / from ... import
if ext == '.py':
import re as reg
imports = reg.findall(
r'^(?:from\s+([\w.]+)\s+)?import\s+(.+)$',
content, reg.MULTILINE
)
# JavaScript/TypeScript: import / require
elif ext in ('.js', '.ts', '.jsx', '.tsx', '.vue'):
import re as reg
# ES6 imports
es6_imports = reg.findall(
r'import\s+(?:{\s*([^}]+)\s*|\*\s+as\s+\w+\s+|\w+\s+)\s+from\s+[\'"]([^\'"]+)[\'"]',
content
)
# CommonJS require
cjs_imports = reg.findall(
r'require\s*\(\s*[\'"]([^\'"]+)[\'"]\s*\)',
content
)
# Named exports
exports = reg.findall(r'export\s+(?:const|let|var|function|class)\s+(\w+)', content)
return {
"file": target_file,
"imports": imports[:50], # 限制数量
"exports": exports,
"line_count": len(content.split('\n')),
"file_size_bytes": file_path.stat().st_size
}
except Exception as e:
return {"error": str(e)}
if __name__ == "__main__":
# 本地 stdio 模式运行
mcp.run()
在 VS Code 中使用这个 Server(.vscode/mcp.json):
{
"servers": {
"code-analyzer": {
"type": "stdio",
"command": "python3",
"args": ["/path/to/mcp_code_analyzer/server.py"]
}
}
}
之后,在 Cursor 或支持 MCP 的编辑器中,AI 就能直接调用 search_code、analyze_structure、find_dependencies 这三个工具了——无需任何额外的提示词工程。
2.5 MCP 的安全机制
MCP 在安全设计上做了多层次考虑:
资源隔离:MCP Server 只能访问自己声明的资源,Host 在调用前会检查权限。
输入验证:所有工具参数都经过 JSON Schema 校验,防止注入攻击。
本地优先:默认支持本地 stdio 模式,数据不离开本地环境。
OAuth 2.1 支持(计划中):MCP 官方计划增加对企业级身份验证的支持,允许远程服务器的安全认证。
三、A2A 协议深度解析:AI 之间的「TCP/IP」
3.1 A2A 是什么
A2A(Agent-to-Agent)协议是 Google 主导、已捐给 Linux 基金会的开放协议,用于实现 AI Agent 之间的无缝通信和互操作性。
如果说 MCP 是「AI 操控工具的 USB 接口」,那么 A2A 就是「AI 与 AI 通信的 TCP/IP 协议」。
A2A 的核心设计目标是解决多 Agent 协作中的五个关键问题:
问题 A2A 解决方案
─────────────────────────────────────────────────
能力发现 Agent Card(能力描述卡)
任务委托 Task/TaskSubmit 消息
状态同步 TaskStatusUpdate/TaskState
结果返回 TaskResult/Artifact
信任传递 Agent Link 框架
3.2 A2A 的核心概念
3.2.1 Agent Card(能力描述卡)
Agent Card 是 A2A 的「自我介绍」机制。每个 Agent 在 A2A Server 上注册时,必须声明自己的能力:
{
"name": "code-review-agent",
"description": "专业的代码审查 Agent,支持安全漏洞检测、代码风格检查、性能问题识别",
"version": "1.2.0",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitions": true
},
"skills": [
{
"id": "security-scan",
"name": "安全扫描",
"description": "检测代码中的安全漏洞,包括 SQL 注入、XSS、CSRF 等",
"tags": ["security", "vulnerability", "OWASP"],
"inputModes": ["text", "code"],
"outputModes": ["text", "json", "report"]
},
{
"id": "style-check",
"name": "代码风格检查",
"description": "检查代码风格是否遵循 PEP8/ESLint 规范",
"tags": ["style", "lint", "formatting"],
"inputModes": ["code"],
"outputModes": ["text", "json"]
}
],
"endpoints": {
"openapiSpecUrl": "http://localhost:8080/openapi.json"
},
"provider": {
"organization": "DevTeam",
"service": "code-review-service"
}
}
这个 Agent Card 是其他 Agent「发现」和「选择」协作伙伴的核心依据。当 Host Agent 需要做代码审查时,它会查询 A2A Server 上的 Agent Card,找到 code-review-agent,然后决定委托任务。
3.2.2 Task(任务)生命周期
A2A 中,任务(Task)是协作的基本单元。每个 Task 有完整的状态流转:
Task 状态流转图:
SUBMITTED ──→ WORKING ──→ INPUT_REQUIRED
│ │
↓ │
COMPLETED ←──────┘
│
(遇到无法解决的问题)
│
↓
INPUT_REQUIRED ──→ (等待外部输入)
│
(外部输入到达,继续)
↓
WORKING ──→ COMPLETED
任何状态下 ──→ FAILED / CANCELED
Task 的 JSON 结构:
{
"id": "task-uuid-xxxxx",
"sessionId": "session-uuid-yyyyy",
"status": {
"state": "working",
"timestamp": "2026-06-10T12:00:00Z"
},
"artifacts": [
{
"artifactId": "artifact-1",
"name": "review-report",
"description": "代码审查报告,包含安全漏洞列表和改进建议",
"contentType": "application/json",
"parts": [
{
"type": "data",
"data": {
"summary": "发现 3 个高危漏洞,5 个中危漏洞",
"vulnerabilities": [
{"severity": "HIGH", "type": "SQL Injection", "location": "src/db.py:45"}
]
}
}
]
}
],
"messages": [
{
"role": "agent",
"content": {
"parts": [
{
"type": "text",
"text": "开始对 main.py 进行安全扫描..."
}
]
}
}
]
}
3.3 A2A 协议的消息类型
A2A 定义了六类核心消息,覆盖了多 Agent 协作的全流程:
3.3.1 tasks/send 消息(同步任务发送)
Host Agent 向 Remote Agent 发送任务,Remote Agent 同步处理后立即返回结果:
// Request
{
"method": "tasks/send",
"params": {
"id": "task-001",
"sessionId": "session-001",
"message": {
"role": "user",
"content": {
"parts": [
{
"type": "text",
"text": "请审查 src/auth.py 文件,重点检查身份认证逻辑"
}
]
}
}
}
}
// Response
{
"result": {
"id": "task-001",
"status": { "state": "completed" },
"artifacts": [
{
"artifactId": "review-result",
"name": "auth-review",
"contentType": "application/json",
"parts": [
{
"type": "data",
"data": {
"issues": [
{
"severity": "HIGH",
"line": 45,
"type": "Hardcoded Credentials",
"description": "发现硬编码密码,应使用环境变量",
"suggestion": "使用 os.environ.get('DB_PASSWORD')"
}
],
"summary": "共发现 1 个高危、2 个中危、4 个低危问题"
}
}
]
}
]
}
}
3.3.2 tasks/sendSubscribe 消息(订阅式任务)
适用于需要长时间处理的任务,Host Agent 订阅后可以实时接收状态更新:
// Request
{
"method": "tasks/sendSubscribe",
"params": {
"id": "task-002",
"sessionId": "session-002",
"message": {
"role": "user",
"content": {
"parts": [
{
"type": "text",
"text": "扫描整个代码仓库,生成完整的安全报告"
}
]
}
}
}
}
// Server 会通过 SSE 推送多个状态更新
// Update 1: status=working, message="正在扫描 src/api..."
// Update 2: status=working, message="发现可疑代码,正在分析..."
// Update 3: status=completed, artifacts=[完整报告]
3.3.3 tasks/get 消息(查询任务状态)
查询某个任务当前的状态和已有的结果:
{
"method": "tasks/get",
"params": {
"id": "task-001"
}
}
3.4 A2A 实战:用 Google ADK 构建多 Agent 协作系统
下面是一个完整的实战案例:使用 Google Agent Development Kit (ADK) 构建 Host Agent + Remote Agents 的协作系统。
项目结构:
a2a_collaboration/
├── host_agent/
│ ├── __init__.py
│ ├── main.py # Host Agent 主程序
│ ├── agents/
│ │ ├── __init__.py
│ │ ├── coordinator.py # 协调 Agent(主控)
│ │ ├── weather_agent.py # 天气查询 Agent
│ │ └── code_agent.py # 代码分析 Agent
│ └── a2a/
│ ├── server.py # A2A Server
│ └── agent_cards.json # Agent Card 注册表
└── requirements.txt
Remote Agent 1:天气查询(host_agent/agents/weather_agent.py):
import asyncio
from google.adk import Agent
from google.adk.agents import Runner
from google.adk.models.lite_llm import LiteLLM
from google.adk.tools import FunctionTool
from a2a.server import A2AServer
from a2a.server.models import AgentCard, Task
from a2a.types import (
TaskStatus, TaskState, Message, TextPart, DataPart
)
# 天气查询工具(Remote Agent 提供的核心能力)
def get_weather(city: str, days: int = 3) -> dict:
"""获取指定城市的天气预报"""
# 模拟真实 API 调用
weather_data = {
"北京": [
{"day": "今天", "temp": "28°C", "weather": "晴", "pm25": "35"},
{"day": "明天", "temp": "26°C", "weather": "多云", "pm25": "42"},
{"day": "后天", "temp": "24°C", "weather": "小雨", "pm25": "58"},
],
"上海": [
{"day": "今天", "temp": "30°C", "weather": "晴", "pm25": "28"},
{"day": "明天", "temp": "31°C", "weather": "晴", "pm25": "30"},
{"day": "后天", "temp": "29°C", "weather": "阴", "pm25": "45"},
]
}
result = weather_data.get(city, [])
if not result:
return {"error": f"未找到城市 {city} 的数据"}
return {
"city": city,
"forecast": result[:days],
"generated_at": "2026-06-10T12:00:00+08:00"
}
# 定义 Remote Agent 的 A2A Card
def get_weather_agent_card() -> AgentCard:
return AgentCard(
name="weather-agent",
description="专业天气查询 Agent,支持多城市、多日预报、空气质量查询",
version="1.0.0",
capabilities={
"streaming": True,
"pushNotifications": False,
"stateTransitions": True
},
skills=[
{
"id": "weather-forecast",
"name": "天气预报查询",
"description": "查询任意城市的天气预报,支持1-7日预报",
"tags": ["weather", "forecast", "air-quality"],
"inputModes": ["text"],
"outputModes": ["text", "json"]
}
],
endpoints={
"openapiSpecUrl": "http://localhost:8001/openapi.json"
},
provider={
"organization": "WeatherService",
"service": "weather-agent"
}
)
# 创建 ADK Agent
def create_weather_agent() -> Agent:
return Agent(
model=LiteLLM(model="gpt-4o"),
name="weather_agent",
description="天气查询专家",
tools=[
FunctionTool(
func=get_weather,
name="get_weather",
description="获取天气预报数据"
)
],
instruction="""
你是一个专业的天气预报 Agent。
当用户询问天气时,使用 get_weather 工具获取数据,
并以清晰友好的方式呈现天气预报结果。
如果用户询问的是不支持的城市,礼貌地说明。
"""
)
# 启动 A2A Server
async def main():
server = A2AServer(
agent_card=get_weather_agent_card(),
agent=create_weather_agent(),
host="0.0.0.0",
port=8001
)
print("🌤️ Weather Agent A2A Server 启动中...")
await server.start()
if __name__ == "__main__":
asyncio.run(main())
Remote Agent 2:代码分析(host_agent/agents/code_agent.py):
import asyncio
from pathlib import Path
from google.adk import Agent
from google.adk.tools import FunctionTool
from a2a.server import A2AServer
from a2a.server.models import AgentCard
def analyze_code_complexity(file_path: str) -> dict:
"""分析代码文件的复杂度指标"""
path = Path(file_path)
if not path.exists():
return {"error": f"File not found: {file_path}"}
try:
lines = path.read_text(encoding='utf-8').split('\n')
code_lines = [l for l in lines if l.strip() and not l.strip().startswith('#')]
# 简单的圈复杂度估算(基于分支关键词)
branch_keywords = ['if', 'elif', 'for', 'while', 'except', 'and', 'or', '?', '&&', '||']
branches = sum(1 for line in lines for kw in branch_keywords if kw in line)
return {
"file": file_path,
"total_lines": len(lines),
"code_lines": len(code_lines),
"comment_lines": len(lines) - len(code_lines),
"estimated_complexity": branches,
"complexity_rating": "HIGH" if branches > 20 else "MEDIUM" if branches > 10 else "LOW",
"top_level_definitions": [
line.strip() for line in lines
if line.strip().startswith(('def ', 'class ', 'func ', 'fn '))
]
}
except Exception as e:
return {"error": str(e)}
def create_code_agent() -> Agent:
return Agent(
name="code_agent",
description="代码分析专家",
tools=[
FunctionTool(
func=analyze_code_complexity,
name="analyze_code_complexity",
description="分析代码文件复杂度"
)
],
instruction="""
你是一个代码分析专家。当用户提供代码文件路径时,
使用 analyze_code_complexity 工具分析代码复杂度,
并给出改进建议。关注:圈复杂度、代码行数、注释比例。
"""
)
def get_code_agent_card() -> AgentCard:
return AgentCard(
name="code-analysis-agent",
description="代码复杂度分析和质量评估 Agent",
version="1.0.0",
capabilities={"streaming": True, "stateTransitions": True},
skills=[
{
"id": "complexity-analysis",
"name": "代码复杂度分析",
"description": "分析代码文件的圈复杂度和质量指标",
"tags": ["code", "complexity", "quality"],
"inputModes": ["text"],
"outputModes": ["text", "json"]
}
],
provider={"organization": "CodeTools", "service": "code-analysis"}
)
async def main():
server = A2AServer(
agent_card=get_code_agent_card(),
agent=create_code_agent(),
host="0.0.0.0",
port=8002
)
print("📊 Code Analysis Agent A2A Server 启动中...")
await server.start()
if __name__ == "__main__":
asyncio.run(main())
Host Agent(协调者)(host_agent/agents/coordinator.py):
import asyncio
from typing import Optional
from google.adk import Agent
from google.adk.agents import Runner
from google.adk.models.lite_llm import LiteLLM
from a2a.client import A2AClient
from a2a.types import (
Message, TextPart, TaskQueryParams, TaskSendParams
)
class HostAgent:
"""
Host Agent(主控 Agent):
- 接收用户请求
- 解析意图,选择合适的 Remote Agent
- 委托任务,聚合结果
- 返回最终答案给用户
"""
def __init__(self):
# 连接 Remote Agent
self.weather_client = A2AClient(
url="http://localhost:8001"
)
self.code_client = A2AClient(
url="http://localhost:8002"
)
# 创建 Host Agent 本身
self.agent = Agent(
model=LiteLLM(model="gpt-4o"),
name="coordinator",
description="多功能 AI 助手,协调多个专业 Agent 协作",
instruction="""
你是一个智能协调者 Agent。当用户提出问题时,你需要判断:
1. 如果是天气相关 → 调用 weather-agent
2. 如果是代码分析相关 → 调用 code-analysis-agent
3. 如果两者都有 → 并行调用两个 Agent,再聚合结果
使用 A2A Client 向 Remote Agent 发送任务。
"""
)
self.runner = Runner(agent=self.agent)
async def handle_request(self, user_message: str) -> str:
"""
处理用户请求,智能路由到合适的 Remote Agent
"""
# 首先让 Host Agent 分析用户意图
intent_analysis = await self._analyze_intent(user_message)
# 根据意图决定调用哪些 Agent
tasks = []
if "weather" in intent_analysis or "天气" in intent_analysis:
city = self._extract_city(user_message)
tasks.append(("weather", self._call_weather_agent(city)))
if "code" in intent_analysis or "代码" in intent_analysis:
file_path = self._extract_file_path(user_message)
tasks.append(("code", self._call_code_agent(file_path)))
# 并行执行所有任务
if tasks:
results = await asyncio.gather(
*[task for _, task in tasks],
return_exceptions=True
)
# 聚合结果
return self._aggregate_results(tasks, results)
else:
return "抱歉,我无法处理这个请求。"
async def _call_weather_agent(self, city: str) -> dict:
"""调用天气 Agent"""
try:
result = await self.weather_client.send_task(
TaskSendParams(
id=f"weather-task-{id(city)}",
message=Message(
role="user",
content=TextPart(text=f"查询 {city} 的天气")
)
)
)
return {"status": "success", "data": result}
except Exception as e:
return {"status": "error", "data": str(e)}
async def _call_code_agent(self, file_path: str) -> dict:
"""调用代码分析 Agent"""
try:
result = await self.code_client.send_task(
TaskSendParams(
id=f"code-task-{id(file_path)}",
message=Message(
role="user",
content=TextPart(text=f"分析代码 {file_path}")
)
)
)
return {"status": "success", "data": result}
except Exception as e:
return {"status": "error", "data": str(e)}
async def _analyze_intent(self, message: str) -> str:
"""分析用户意图(简化版)"""
return message.lower()
def _extract_city(self, message: str) -> str:
"""提取城市名(简化版)"""
cities = ["北京", "上海", "广州", "深圳", "杭州"]
for city in cities:
if city in message:
return city
return "北京" # 默认
def _extract_file_path(self, message: str) -> str:
"""提取文件路径(简化版)"""
import re
match = re.search(r'[\w/._-]+\.(py|js|ts|go|rs|java)', message)
return match.group(0) if match else "main.py"
def _aggregate_results(self, tasks: list, results: list) -> str:
"""聚合多个 Agent 的结果"""
parts = []
for (task_type, _), result in zip(tasks, results):
if task_type == "weather":
if result.get("status") == "success":
data = result["data"]
parts.append(f"🌤️ 天气信息:{data}")
else:
parts.append(f"天气查询失败:{result.get('data')}")
elif task_type == "code":
if result.get("status") == "success":
data = result["data"]
parts.append(f"📊 代码分析:{data}")
else:
parts.append(f"代码分析失败:{result.get('data')}")
return "\n".join(parts) if parts else "未获取到有效结果"
# 运行 Host Agent
async def main():
host = HostAgent()
# 模拟用户请求
test_queries = [
"帮我查一下北京今天的天气",
"帮我分析 main.py 的代码复杂度",
"北京天气怎么样,同时帮我看看 src/app.py 的代码"
]
for query in test_queries:
print(f"\n👤 用户: {query}")
result = await host.handle_request(query)
print(f"🤖 回复: {result}")
if __name__ == "__main__":
asyncio.run(main())
四、MCP 与 A2A 的协作关系:不是竞争,是互补
4.1 协议层级对比
很多开发者容易混淆 MCP 和 A2A 的定位。实际上,它们解决的是不同层次的问题:
┌─────────────────────────────────────────────┐
│ 应用层:具体业务场景 │
│ ┌──────────────┐ ┌──────────────────────┐│
│ │ Host Agent │ │ Remote Agent(s) ││
│ └──────┬───────┘ └──────────┬───────────┘│
│ │ │ │
│ A2A │◄── Agent ↔ Agent 通讯层 │
│ 协议层 │ (任务委托、状态同步) │
│ │ │ │
│ MCP │◄── Agent ↔ 工具 交互层 │
│ 协议层 │ (文件、API、数据库) │
│ │ │ │
│ ┌──────┴───────┐ ┌──────────┴───────────┐│
│ │ MCP Server │ │ 外部工具/数据源 ││
│ │ (工具集) │ │ (GitHub/DB/地图...) ││
│ └──────────────┘ └──────────────────────┘│
└─────────────────────────────────────────────┘
4.2 典型场景:两者协同工作
场景:用户要求 Agent 自动审查 GitHub PR 并生成报告
步骤 1(用户 → Host Agent):"帮我审查 GitHub PR #123"
步骤 2(Host Agent):
├── 通过 MCP Server 调用 GitHub API 获取 PR 内容
├── 通过 MCP Server 调用代码分析工具
└── 通过 A2A 委托 code-review-agent 做深度审查
步骤 3(Remote Agent 通过 MCP):
├── 调用 GitHub MCP Server 提交 review 意见
├── 调用 Slack MCP Server 通知团队
步骤 4(Remote Agent → Host Agent):
└── 通过 A2A 返回审查结果
步骤 5(Host Agent → 用户):
└── 聚合所有结果,生成最终报告
4.3 技术选型决策树
需要选择 MCP 还是 A2A?
Q1: 你的 Agent 需要调用外部工具/服务吗?
│
├── YES → 需要 MCP
│ - 操作文件/数据库
│ - 调用第三方 API
│ - 搜索、翻译、地图等能力
│
└── NO → 继续判断
Q2: 你的系统中有多个 Agent 需要协作吗?
│
├── YES → 需要 A2A
│ - 任务委派
│ - 能力发现
│ - 多 Agent 串联
│
└── NO → 可能是单 Agent 系统,只需 MCP 或两者都不需要
Q3: 你的 Agent 需要与「其他公司的 Agent」通信吗?
│
└── YES → 必须用 A2A
(MCP 是工具协议,不处理跨 Agent 通信)
五、生产级集成策略
5.1 企业级 MCP + A2A 架构设计
对于生产环境,推荐以下架构:
┌──────────────────────────────────────────────────────┐
│ API Gateway │
│ (认证、限流、日志、路由) │
└────────────────────┬─────────────────────────────────┘
│
┌─────────────┼─────────────┐
↓ ↓ ↓
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Host │ │ Host │ │ Host │
│ Agent 1 │ │ Agent 2 │ │ Agent 3 │
│ (主控) │ │ (垂直 │ │ (垂直 │
│ │ │ 领域) │ │ 领域) │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
│ A2A 协议通信层 │
│◄──────────────────────────►│
│ │ │
├──→ MCP Server (内部工具集)
│ ├── GitHub MCP
│ ├── Database MCP
│ ├── Notification MCP
│
└──→ A2A Server (对外暴露)
├── Agent Card 注册表
├── 任务队列
└── 审计日志
5.2 安全治理:Agent Link 信任框架
清华大学 A2A 报告提出的 Agent Link 框架,为多 Agent 协作提供了完整的安全治理方案:
关系信誉层:记录 Agent 之间的历史协作数据,评估可信度:
# Agent Link: 关系信誉追踪
class AgentTrustLedger:
"""
追踪 Agent 之间的信任关系
类似 Web 时代的 PageRank,但用于 Agent 间的信任传播
"""
def __init__(self):
self.relationships: dict[str, list[TrustEdge]] = {}
def record_interaction(
self,
from_agent: str,
to_agent: str,
task_type: str,
success: bool,
data_accessed: list[str]
):
"""记录一次协作交互"""
edge = TrustEdge(
from_agent=from_agent,
to_agent=to_agent,
task_type=task_type,
success=success,
data_accessed=data_accessed,
timestamp=datetime.now(),
weight=self._calculate_weight(success, data_accessed)
)
self.relationships.setdefault(from_agent, []).append(edge)
def get_trust_score(self, agent_id: str) -> float:
"""计算 Agent 的信任分数(0-100)"""
edges = self.relationships.get(agent_id, [])
if not edges:
return 50.0 # 默认分数
weights = [e.weight for e in edges]
return sum(weights) / len(weights) * 100
def _calculate_weight(self, success: bool, data_accessed: list) -> float:
"""计算单次交互权重"""
base = 1.0 if success else 0.3
sensitivity_penalty = len([d for d in data_accessed if d in SENSITIVE_DATA]) * 0.1
return max(0.0, base - sensitivity_penalty)
SENSITIVE_DATA = ["password", "credit_card", "ssn", "api_key", "private_key"]
证据溯源层:记录每次数据访问的完整链路:
# Agent Link: 证据溯源
@dataclass
class AuditEntry:
"""审计日志条目:谁在什么时间访问了什么数据"""
agent_id: str
action: str
data_accessed: list[str]
purpose: str
authorized_by: str
timestamp: datetime
result: str # success / denied / error
session_id: str
class AuditLogger:
"""
不可篡改的审计日志
用于事后追溯和数据合规(GDPR 等)
"""
def __init__(self, storage_path: str):
self.storage_path = Path(storage_path)
self.current_file = self.storage_path / f"audit_{date.today()}.jsonl"
def log(self, entry: AuditEntry):
"""追加一条审计日志"""
self.current_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.current_file, 'a') as f:
f.write(json.dumps(asdict(entry), ensure_ascii=False) + '\n')
def query(self, agent_id: str, start: datetime, end: datetime) -> list[AuditEntry]:
"""查询指定 Agent 的审计记录"""
results = []
for log_file in self.storage_path.glob("audit_*.jsonl"):
with open(log_file) as f:
for line in f:
entry = json.loads(line)
if (entry["agent_id"] == agent_id and
start <= parse_datetime(entry["timestamp"]) <= end):
results.append(entry)
return results
委托审计层:验证跨 Agent 委托的合法性:
# Agent Link: 委托审计
class DelegationAuditor:
"""
检查 Agent 之间的委托请求是否合法
在 A2A 任务委托时自动触发
"""
def __init__(
self,
trust_ledger: AgentTrustLedger,
audit_logger: AuditLogger
):
self.trust_ledger = trust_ledger
self.audit_logger = audit_logger
async def authorize_delegation(
self,
delegator: str, # 委托方
delegate: str, # 受托方
task: str, # 任务描述
data_scope: list[str] # 需要访问的数据范围
) -> DelegationDecision:
"""判断委托是否授权"""
# 1. 检查受托 Agent 的信任分数
trust_score = self.trust_ledger.get_trust_score(delegate)
if trust_score < TRUST_THRESHOLD:
return DelegationDecision(
approved=False,
reason=f"受托方信任分数 {trust_score:.1f} 低于阈值 {TRUST_THRESHOLD}"
)
# 2. 检查数据访问范围是否合理
sensitive_data_in_scope = [d for d in data_scope if d in SENSITIVE_DATA]
if sensitive_data_in_scope:
# 敏感数据需要额外确认
audit_entry = AuditEntry(
agent_id=delegate,
action=f"delegation_access:{task}",
data_accessed=sensitive_data_in_scope,
purpose=task,
authorized_by=delegator,
timestamp=datetime.now(),
result="pending_approval",
session_id=generate_session_id()
)
self.audit_logger.log(audit_entry)
return DelegationDecision(
approved=True,
reason="包含敏感数据,已记录审计日志",
audit_required=True
)
return DelegationDecision(approved=True, reason="授权通过")
TRUST_THRESHOLD = 70 # 信任分数阈值
5.3 可观测性:多 Agent 系统的调试挑战
多 Agent 协作系统的可观测性,是生产部署中最容易被忽视但又最关键的问题。
核心挑战:
- 链路追踪:一个用户请求可能触发 10+ 个 Agent 调用,需要全链路追踪
- 状态一致性:多个 Agent 并行执行,中间状态如何保持一致
- 调试困难:Agent 的「思考过程」不透明,出问题难以定位
解决方案:Structured Logging + Trace ID
# 统一的分布式追踪
import uuid
from contextvars import ContextVar
trace_id: ContextVar[str] = ContextVar('trace_id', default='')
class AgentTracer:
"""为每次 Agent 协作生成完整的追踪日志"""
def __init__(self, logger: logging.Logger):
self.logger = logger
def start_trace(self) -> str:
"""启动一次追踪,返回 trace_id"""
tid = str(uuid.uuid4())[:8]
trace_id.set(tid)
self.logger.info(f"[{tid}] 🚀 追踪开始")
return tid
def log_agent_call(
self,
agent: str,
action: str,
input_data: dict,
output_data: dict = None,
error: str = None
):
"""记录一次 Agent 调用"""
tid = trace_id.get()
entry = {
"trace_id": tid,
"agent": agent,
"action": action,
"input": input_data,
"output": output_data,
"error": error,
"timestamp": datetime.now().isoformat()
}
self.logger.info(f"[{tid}] 📞 {agent}.{action}()", extra=entry)
# 同时写入结构化日志文件(供后续分析)
self._write_structured_log(entry)
def _write_structured_log(self, entry: dict):
"""写入结构化日志到 ES/Loki"""
# 生产环境应该发送到 ELK / Loki / Jaeger
log_file = Path("logs/agent_traces.jsonl")
log_file.parent.mkdir(exist_ok=True)
with open(log_file, 'a') as f:
f.write(json.dumps(entry, ensure_ascii=False) + '\n')
5.4 性能优化:减少 Agent 间通信开销
多 Agent 协作的性能瓶颈主要在通信延迟和上下文膨胀:
策略一:批量委托
# 不要:逐个调用
for file in many_files:
await code_agent.analyze(file)
# 要:批量委托
await code_agent.analyze_batch(file_list)
策略二:结果缓存
from functools import lru_cache
class AgentResultCache:
"""Agent 调用结果缓存(基于内容hash)"""
def __init__(self, ttl_seconds: int = 300):
self.cache = {}
self.ttl = ttl_seconds
def _make_key(self, agent: str, task: str, params: dict) -> str:
content = f"{agent}:{task}:{json.dumps(params, sort_keys=True)}"
return hashlib.sha256(content.encode()).hexdigest()[:16]
def get(self, agent: str, task: str, params: dict) -> Optional[dict]:
key = self._make_key(agent, task, params)
if key in self.cache:
entry = self.cache[key]
if time.time() - entry['timestamp'] < self.ttl:
return entry['result']
del self.cache[key]
return None
def set(self, agent: str, task: str, params: dict, result: dict):
key = self._make_key(agent, task, params)
self.cache[key] = {'result': result, 'timestamp': time.time()}
策略三:流式处理避免等待
对于长时间任务,使用 A2A 的 tasks/sendSubscribe 替代 tasks/send,让用户看到实时进度:
# 流式处理示例
async def process_with_progress(client: A2AClient, task_id: str):
async for update in client.send_subscribe(task_id):
state = update.status.state
message = update.status.message
if state == "working":
print(f"⏳ {message}") # 实时展示进度
elif state == "completed":
print(f"✅ 完成")
return update.artifacts
elif state == "failed":
print(f"❌ 失败: {message}")
return None
六、实战:从零构建一个 MCP + A2A 混合系统
6.1 项目目标
构建一个代码助手系统,包含:
- 1 个 Host Agent(协调者)
- 2 个 Remote Agent(代码审查 Agent、文档生成 Agent)
- 3 个 MCP Server(GitHub、数据库、文件系统)
6.2 完整代码
requirements.txt:
google-adk>=0.2.0
mcp>=1.0.0
fastmcp>=0.1.0
aiohttp>=3.9.0
structlog>=24.0.0
system_architecture.py(完整系统):
"""
MCP + A2A 混合系统完整示例
一个代码助手:接收用户请求 → 协调多个 Agent → 返回结果
"""
import asyncio
import json
import logging
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Optional
import structlog
# 配置结构化日志
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
)
logger = structlog.get_logger()
# ============================================================
# MCP Server: 文件系统操作
# ============================================================
from mcp.server.fastmcp import FastMCP
mcp_fs = FastMCP(name="filesystem")
@mcp_fs.tool(name="read_file", description="读取文件内容")
def read_file(path: str, encoding: str = "utf-8") -> dict:
try:
content = Path(path).read_text(encoding=encoding, errors='replace')
return {
"success": True,
"path": path,
"lines": len(content.split('\n')),
"preview": content[:500]
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp_fs.tool(name="list_directory", description="列出目录内容")
def list_directory(path: str, recursive: bool = False) -> dict:
try:
p = Path(path)
if not p.exists():
return {"success": False, "error": "目录不存在"}
if recursive:
items = [
{"path": str(item), "type": "dir" if item.is_dir() else "file"}
for item in p.rglob('*')
]
else:
items = [
{"name": item.name, "type": "dir" if item.is_dir() else "file"}
for item in p.iterdir()
]
return {"success": True, "path": path, "items": items}
except Exception as e:
return {"success": False, "error": str(e)}
# ============================================================
# A2A: Remote Agent 定义
# ============================================================
from a2a.server import A2AServer
from a2a.types import AgentCard, TaskStatus, TaskState, Message, TextPart, DataPart
async def create_code_review_agent_server():
"""代码审查 Agent(Remote Agent 2)"""
def get_agent_card() -> AgentCard:
return AgentCard(
name="code-review-agent",
description="专业的代码审查 Agent,支持安全扫描、风格检查、性能分析",
version="1.0.0",
capabilities={"streaming": True, "stateTransitions": True},
skills=[
{
"id": "security-scan",
"name": "安全扫描",
"tags": ["security", "vulnerability"]
},
{
"id": "style-check",
"name": "风格检查",
"tags": ["style", "lint"]
}
]
)
async def handle_task(message: Message) -> dict:
"""处理代码审查任务"""
user_text = message.content.parts[0].text if message.content.parts else ""
# 简化版代码审查逻辑
findings = []
# 检测潜在安全问题
if "password" in user_text.lower() or "secret" in user_text.lower():
findings.append({
"type": "SECURITY",
"severity": "HIGH",
"description": "可能存在敏感信息泄露风险"
})
# 检测代码复杂度
if len(user_text) > 500:
findings.append({
"type": "COMPLEXITY",
"severity": "MEDIUM",
"description": f"代码较长({len(user_text)} 字符),建议拆分"
})
return {
"status": "completed",
"findings": findings,
"summary": f"审查完成,发现 {len(findings)} 个问题"
}
server = A2AServer(
agent_card=get_agent_card(),
skill_handlers={"security-scan": handle_task},
host="0.0.0.0",
port=8001
)
return server
# ============================================================
# Host Agent: 主控协调器
# ============================================================
@dataclass
class UserRequest:
"""用户请求结构"""
text: str
attachments: list[str] = None
user_id: str = "anonymous"
@dataclass
class AgentResponse:
"""Agent 响应结构"""
agent: str
result: dict
duration_ms: float
success: bool
class HostAgent:
"""
Host Agent(主控协调器)
职责:
1. 接收用户请求
2. 通过 MCP 操作本地资源(文件系统)
3. 通过 A2A 委托专业 Agent
4. 聚合结果返回给用户
"""
def __init__(self):
self.a2a_clients = {
"code-review": A2AClient(url="http://localhost:8001"),
}
logger.info("Host Agent 初始化完成")
async def process(self, request: UserRequest) -> dict:
"""
处理用户请求的主入口
"""
trace_id = logger.new(request_id=request.user_id).info("开始处理请求")
# 阶段1:解析用户意图
intent = self._parse_intent(request.text)
logger.info(f"意图识别结果: {intent}")
results = []
start_time = asyncio.get_event_loop().time()
# 阶段2:根据意图调用相应 Agent
if "review" in intent or "审查" in intent:
result = await self._call_code_review_agent(request.text)
results.append(result)
if "read" in intent or "读取" in intent:
# 直接使用 MCP
file_path = self._extract_file_path(request.text)
result = await self._call_mcp_fs("read_file", {"path": file_path})
results.append(result)
# 阶段3:聚合结果
duration = (asyncio.get_event_loop().time() - start_time) * 1000
return {
"success": True,
"request": request.text,
"intent": intent,
"results": results,
"duration_ms": round(duration, 2),
"trace_id": trace_id
}
def _parse_intent(self, text: str) -> list[str]:
"""解析用户意图(简化版)"""
intents = []
keywords = {
"review": ["审查", "review", "检查", "扫描"],
"read": ["读取", "read", "查看", "打开"]
}
text_lower = text.lower()
for intent, kws in keywords.items():
if any(kw in text_lower for kw in kws):
intents.append(intent)
return intents or ["general"]
def _extract_file_path(self, text: str) -> str:
"""提取文件路径"""
import re
patterns = [
r'[\w/._-]+\.(py|js|ts|go|rs|java|cpp|c|h)',
r'[/\\][\w/._-]+' # 通用路径
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
return match.group(0)
return "."
async def _call_code_review_agent(self, task: str) -> AgentResponse:
"""通过 A2A 调用代码审查 Agent"""
start = asyncio.get_event_loop().time()
try:
client = self.a2a_clients["code-review"]
result = await client.send_task(TaskSendParams(
id=f"cr-{datetime.now().timestamp()}",
message=Message(role="user", content=TextPart(text=task))
))
duration = (asyncio.get_event_loop().time() - start) * 1000
return AgentResponse(
agent="code-review-agent",
result=result,
duration_ms=round(duration, 2),
success=True
)
except Exception as e:
duration = (asyncio.get_event_loop().time() - start) * 1000
return AgentResponse(
agent="code-review-agent",
result={"error": str(e)},
duration_ms=round(duration, 2),
success=False
)
async def _call_mcp_fs(self, tool: str, params: dict) -> AgentResponse:
"""通过 MCP 调用文件系统工具"""
start = asyncio.get_event_loop().time()
try:
if tool == "read_file":
result = read_file(**params)
elif tool == "list_directory":
result = list_directory(**params)
else:
result = {"error": f"Unknown tool: {tool}"}
duration = (asyncio.get_event_loop().time() - start) * 1000
return AgentResponse(
agent="mcp-filesystem",
result=result,
duration_ms=round(duration, 2),
success=result.get("success", True)
)
except Exception as e:
duration = (asyncio.get_event_loop().time() - start) * 1000
return AgentResponse(
agent="mcp-filesystem",
result={"error": str(e)},
duration_ms=round(duration, 2),
success=False
)
# ============================================================
# 启动入口
# ============================================================
async def main():
"""启动完整系统"""
print("=" * 60)
print(" MCP + A2A 混合代码助手系统")
print(" 架构:1 Host Agent + 2 Remote Agents + 3 MCP Servers")
print("=" * 60)
# 启动 MCP Server(文件系统)
print("\n📁 启动 MCP 文件系统 Server...")
# mcp_fs.run() # 阻塞运行,此处省略
# 启动 A2A Remote Agent
print("\n🔍 启动 A2A 代码审查 Agent...")
# await create_code_review_agent_server() # 阻塞运行,此处省略
# 启动 Host Agent
print("\n🤖 启动 Host Agent...")
host = HostAgent()
# 模拟用户请求
print("\n" + "=" * 60)
print(" 系统就绪,接收请求测试")
print("=" * 60)
test_cases = [
UserRequest(
text="帮我审查 src/main.py 的安全性",
user_id="user-001"
),
UserRequest(
text="列出当前目录的文件结构",
user_id="user-002"
)
]
for req in test_cases:
print(f"\n👤 用户请求: {req.text}")
result = await host.process(req)
print(f"✅ 处理完成,耗时: {result['duration_ms']}ms")
print(f" 意图: {result['intent']}")
print(f" Agent 调用: {len(result['results'])} 个")
for r in result['results']:
status = "✅" if r.success else "❌"
print(f" {status} {r.agent}: {r.duration_ms}ms")
if __name__ == "__main__":
asyncio.run(main())
七、总结与展望
7.1 核心要点回顾
| 维度 | MCP | A2A |
|---|---|---|
| 定位 | AI ↔ 工具/数据 | AI ↔ AI |
| 主导方 | Anthropic | Google (Linux Foundation) |
| 核心价值 | 工具标准化,即插即用 | Agent 协作语言,能力发现 |
| 传输模式 | stdio / SSE / HTTP | HTTP + JSON-RPC |
| 典型场景 | 调用 GitHub API、读写文件 | 任务委派、并行协作 |
| 安全重点 | 资源隔离、输入验证 | 信任传递、审计追溯 |
7.2 2026 年 Agent 协作技术趋势
MCP + A2A 双协议融合:主流 Agent 平台将同时支持 MCP 和 A2A,形成完整的 Agent 生态
Agent Card 标准化:类似 App Store 的 Agent 发现机制,企业内部可以构建私有 Agent 市场
信任框架落地:Agent Link 等信任框架将从研究走向生产,成为企业合规的必备能力
跨平台协作:微信、支付宝等超级 App 开放 A2A 能力后,AI Agent 的商业化路径将彻底打通
7.3 给开发者的建议
短期(1-3个月):
- 熟悉 MCP Server 的开发方式,从简单的 stdio 模式开始
- 选一个你熟悉的场景,尝试接入 MCP(如 GitHub、数据库)
- 关注 Google ADK 和 Anthropic MCP SDK 的更新
中期(3-6个月):
- 了解 A2A 协议的核心概念(Agent Card、Task 生命周期)
- 尝试构建多 Agent 协作的原型系统
- 建立 Agent 可观测性基础设施(结构化日志、追踪)
长期(6-12个月):
- 构建企业内部 Agent 治理体系(权限、审计、合规)
- 参与开源 Agent 协议标准的制定和贡献
- 探索跨组织 Agent 协作的商业模式
写在最后:MCP 和 A2A 不是两个要选一个的竞争方案,而是同一套 Agent 基础设施的两层。理解它们的互补关系,比争论哪个更好更重要。2026 年是 Agent 规模化落地的元年,多 Agent 协作从概念走向生产,是这个领域最重要的发展方向。早点掌握这两套协议,你就能在 Agent 时代抢占先机。
本文首发于 程序员茄子,如需转载,请保留原文链接。
标签:AI Agent | MCP | A2A | 多智能体协作 | Google ADK | Anthropic | 生产级架构
关键词:Model Context Protocol | Agent-to-Agent | AI Agent | Multi-Agent | MCP Server | A2A Protocol | Agent Card | Google ADK | 智能体协作 | Agent Link | 信任框架