编程 A2A + MCP 双协议深度解析:当多智能体协作成为工程现实——从协议原理到生产级代码实战

2026-06-10 12:19:13 +0800 CST views 3

A2A + MCP 双协议深度解析:当多智能体协作成为工程现实——从协议原理到生产级代码实战

作者:程序员茄子 | 2026-06-10 | 分类:编程 | 字数:约 12000 字


前言:Agent 时代的基础设施战争

2026 年,AI Agent 从「能聊天」进化到「能干活」,这个转变的关键不在模型本身,而在协议层

当一个 AI Agent 需要调用另一个 Agent 完成子任务时,它面临三个根本问题:信谁?怎么通信?出了事谁负责?

MCP(Model Context Protocol)和 A2A(Agent-to-Agent)协议,正是解决这三个问题的两套互补方案。它们不是竞争关系,而是同一套基础设施的两层:MCP 解决「AI 如何操控工具」,A2A 解决「AI 如何与其他 AI 协作」。

清华大学 2026 年 6 月发布的 A2A 研究报告,更是将 A2A 定义为「AI 时代数字委托协议与组织基础设施」,类比 Web 时代的 PageRank。Google 将 A2A 协议捐给 Linux 基金会,微信与华为、小米、OPPO、vivo 等手机厂商合作推出 A2A 助手能力——这标志着 Agent 互联协议正式进入商用阶段。

本文从工程视角出发,深入拆解这两套协议的架构原理、协作机制、代码实现,以及在生产环境中的集成策略。无论你是做 AI Agent 开发、后端架构设计,还是在做企业级 AI 选型,这篇文章都会给你完整的技术参照系。


一、为什么需要多协议:拆解 AI Agent 的三大痛点

在深入协议细节之前,我们先理解为什么 MCP + A2A 的双协议架构会成为 2026 年的主流选择。

1.1 痛点一:工具调用「诸侯混战」

在 MCP 出现之前,每个 AI 平台都有自己的工具调用规范:

OpenAI → function calling (GPT 系列专用)
Anthropic → tool use (Claude 系列专用)
Google → Bison tool use
各公司自研 Agent → 自定义 JSON Schema

这导致一个严重问题:写好的工具在平台间无法复用。你在 Cursor 里调试好的 GitHub MCP Server,切到 Claude Code 就得重写一遍。

MCP 的核心价值是标准化工具描述,让「AI 调工具」这件事变得像 USB 接口一样即插即用。

1.2 痛点二:多 Agent 协作「没有共同语言」

当业务复杂到需要多个 Agent 协作时,问题来了:

  • Agent A 想知道 Agent B 能做什么 → 没有「能力发现」机制
  • Agent A 把任务委托给 Agent B → 没有「任务描述」标准
  • Agent B 遇到无法解决的问题 → 没有「升级上报」通道
  • 两方协作出了 bug → 没有「责任追溯」体系

A2A 协议正是为了解决这些问题而设计的。它定义了 Agent 之间的通信语言、能力描述、任务委托流程和信任框架。

1.3 痛点三:安全与信任「最后一公里」

清华大学 A2A 报告指出了一个被大多数开发者忽视的问题:多 Agent 协作中的信任传递。当 Agent A 委托 Agent B 访问用户数据时,谁来保证 Agent B 不会滥用权限?

MCP 提供了资源隔离和权限控制的基础能力,而 A2A 则引入了 Agent Link 信任框架——关系信誉、证据溯源、委托审计三大模块,解决「信谁、凭什么信任、事后追责」的难题。


二、MCP 协议深度解析:AI 的「USB 接口」

2.1 MCP 是什么

MCP(Model Context Protocol)是 Anthropic 于 2024 年底开源的标准化协议,定义了 AI 大模型如何与外部工具、数据源进行通信

它的设计哲学非常清晰:

┌─────────────────────────────────────────────────────┐
│                    MCP Host (如 Cursor/Claude Code) │
│  ┌──────────────┐    ┌──────────────┐              │
│  │  LLM (大脑)  │ ←→ │ MCP Client   │              │
│  └──────────────┘    └──────┬───────┘              │
└─────────────────────────────┼──────────────────────┘
                              │ stdio / SSE / HTTP
        ┌─────────────────────┼─────────────────────┐
        │        MCP Server 1 │    MCP Server 2     │
        │  ┌───────────────┐  │  ┌───────────────┐  │
        │  │ GitHub Tools  │  │  │ DB Tools      │  │
        │  └───────────────┘  │  └───────────────┘  │
        └────────────────────────────────────────────┘

2.2 MCP 的核心架构

MCP 采用客户端-服务器架构,包含四个核心组件:

2.2.1 MCP Host(宿主应用)

集成 MCP 功能的 AI 驱动应用程序,如 Cursor、Claude Code、VS Code 等。Host 负责:

  • 管理 MCP Client 连接
  • 展示工具调用结果给用户
  • 处理认证和会话管理

2.2.2 MCP Client(客户端)

运行在 Host 内部的客户端模块,每个连接的 MCP Server 对应一个 Client 实例。Client 的职责是:

  • 与 Server 建立连接(stdio / SSE / HTTP)
  • 转发 LLM 的工具调用请求
  • 接收并处理 Server 返回的结果

2.2.3 MCP Server(服务器)

提供特定工具和资源的独立服务。每个 Server 暴露一组 Tools(AI 可调用的操作)、Resources(AI 可读取的数据)和 Prompts(预定义的提示模板)。

2.2.4 Transport Layer(传输层)

MCP 支持三种传输方式:

stdio(标准输入/输出):适合本地进程通信,启动命令后通过 stdin/stdout 交换 JSON-RPC 消息。

// mcp_server.py
from mcp.server.fastmcp import FastMCP

mcp = FastMCP(name="my-server")

@mcp.tool(name="get_weather", description="获取城市天气")
def get_weather(city: str) -> str:
    return f"{city} 今日晴朗,25°C"

if __name__ == "__main__":
    mcp.run(transport_type="stdio")

SSE(Server-Sent Events):适合需要服务端推送的场景,如实时日志、进度通知。

# 通过 HTTP + SSE 方式运行
mcp.run(transport_type="sse", host="127.0.0.1", port=3000)

HTTP + POST:适合微服务架构,每个工具调用是一个 HTTP POST 请求。

2.3 MCP 工具定义:代码级拆解

MCP 的核心是工具描述 schema。一个标准的 MCP 工具定义包含以下字段:

// MCP Tool 定义(TypeScript 类型)
interface Tool {
  name: string;           // 工具名称,AI 用它来决定调用哪个工具
  description: string;    // 工具描述,AI 用它来理解工具用途
  inputSchema: {          // 输入参数 schema(JSON Schema 格式)
    type: "object";
    properties: {
      [key: string]: {
        type: string;
        description: string;
        default?: any;
        enum?: string[];
      };
    };
    required: string[];
  };
}

// 示例:一个完整的 MCP 工具
{
  name: "search_code",
  description: "在代码仓库中搜索包含特定关键词的文件和代码片段",
  inputSchema: {
    type: "object",
    properties: {
      query: {
        type: "string",
        description: "搜索关键词,支持正则表达式"
      },
      file_pattern: {
        type: "string",
        description: "文件匹配模式,如 *.py、src/**/*.ts",
        default: "*"
      },
      max_results: {
        type: "number",
        description: "最大返回结果数",
        default: 10
      }
    },
    required: ["query"]
  }
}

这个 schema 是 AI 决定「何时调用」和「传什么参数」的核心依据。MCP 的工具描述比 OpenAI 的 function calling 更规范,因为它遵循 JSON Schema 标准,任何支持 JSON Schema 的系统都能自动解析和验证

2.4 MCP 实战:用 FastMCP 从零构建生产级 Server

下面是一个完整的实战案例:构建一个支持代码仓库分析的 MCP Server,包含文件搜索、代码统计、依赖分析三个工具。

# mcp_code_analyzer/server.py
import os
import re
import json
from pathlib import Path
from typing import Optional
from mcp.server.fastmcp import FastMCP

mcp = FastMCP(
    name="code-analyzer",
    description="代码仓库分析工具集"
)


@mcp.tool(
    name="search_code",
    description="在代码仓库中全文搜索代码,支持正则表达式"
)
def search_code(
    repo_path: str,
    query: str,
    file_pattern: str = "*",
    case_sensitive: bool = False
) -> dict:
    """
    在指定仓库中搜索代码。

    Args:
        repo_path: 仓库根目录路径
        query: 搜索关键词或正则表达式
        file_pattern: 文件匹配模式(如 *.py, src/**/*.ts)
        case_sensitive: 是否大小写敏感

    Returns:
        包含匹配结果的字典:文件名、行号、匹配内容
    """
    results = []
    flags = 0 if case_sensitive else re.IGNORECASE

    try:
        pattern = re.compile(query, flags)
    except re.error:
        return {"error": f"Invalid regex pattern: {query}"}

    repo = Path(repo_path)
    if not repo.exists():
        return {"error": f"Repository path not found: {repo_path}"}

    # 递归搜索文件
    for file_path in repo.rglob(file_pattern):
        if not file_path.is_file():
            continue
        # 跳过 node_modules, .git 等目录
        if any(part.startswith('.') or part == 'node_modules'
               for part in file_path.parts):
            continue

        try:
            content = file_path.read_text(encoding='utf-8', errors='ignore')
            lines = content.split('\n')

            for line_no, line in enumerate(lines, start=1):
                if pattern.search(line):
                    results.append({
                        "file": str(file_path.relative_to(repo)),
                        "line": line_no,
                        "content": line.strip(),
                        "context": lines[max(0, line_no-3):line_no+2]
                    })
        except Exception:
            continue

    return {
        "total_matches": len(results),
        "files_matched": len(set(r["file"] for r in results)),
        "results": results[:100]  # 限制返回数量
    }


@mcp.tool(
    name="analyze_structure",
    description="分析代码仓库的整体结构和复杂度指标"
)
def analyze_structure(repo_path: str) -> dict:
    """
    分析仓库结构,生成复杂度指标和依赖图谱。

    Returns:
        包含代码统计、文件类型分布、依赖关系等信息的字典
    """
    repo = Path(repo_path)
    if not repo.exists():
        return {"error": f"Repository path not found: {repo_path}"}

    stats = {
        "total_files": 0,
        "total_lines": 0,
        "languages": {},
        "largest_files": [],
        "dir_structure": {},
        "empty_files": 0
    }

    for file_path in repo.rglob('*'):
        if not file_path.is_file():
            continue
        if any(part.startswith('.') for part in file_path.parts):
            continue

        stats["total_files"] += 1
        ext = file_path.suffix.lower()
        lang_map = {
            '.py': 'Python', '.js': 'JavaScript', '.ts': 'TypeScript',
            '.java': 'Java', '.go': 'Go', '.rs': 'Rust',
            '.cpp': 'C++', '.c': 'C', '.rb': 'Ruby',
            '.php': 'PHP', '.swift': 'Swift', '.kt': 'Kotlin',
            '.vue': 'Vue', '.svelte': 'Svelte', '.md': 'Markdown'
        }
        lang = lang_map.get(ext, 'Other')
        stats["languages"][lang] = stats["languages"].get(lang, 0) + 1

        try:
            lines = file_path.read_text(encoding='utf-8', errors='ignore').split('\n')
            line_count = len(lines)
            stats["total_lines"] += line_count

            if line_count == 0:
                stats["empty_files"] += 1

            # 记录大文件
            if line_count > 500:
                stats["largest_files"].append({
                    "file": str(file_path.relative_to(repo)),
                    "lines": line_count
                })
        except Exception:
            continue

    stats["largest_files"].sort(key=lambda x: x["lines"], reverse=True)
    stats["largest_files"] = stats["largest_files"][:10]

    return stats


@mcp.tool(
    name="find_dependencies",
    description="分析代码中的 import/require 语句,生成依赖图谱"
)
def find_dependencies(repo_path: str, target_file: str) -> dict:
    """
    分析指定文件及其依赖关系。

    Args:
        repo_path: 仓库根目录
        target_file: 目标文件的相对路径

    Returns:
        包含 imports、exports、潜在循环依赖等信息的字典
    """
    file_path = Path(repo_path) / target_file
    if not file_path.exists():
        return {"error": f"File not found: {target_file}"}

    ext = file_path.suffix.lower()
    imports = []
    exports = []

    try:
        content = file_path.read_text(encoding='utf-8', errors='ignore')

        # Python: import / from ... import
        if ext == '.py':
            import re as reg
            imports = reg.findall(
                r'^(?:from\s+([\w.]+)\s+)?import\s+(.+)$',
                content, reg.MULTILINE
            )

        # JavaScript/TypeScript: import / require
        elif ext in ('.js', '.ts', '.jsx', '.tsx', '.vue'):
            import re as reg
            # ES6 imports
            es6_imports = reg.findall(
                r'import\s+(?:{\s*([^}]+)\s*|\*\s+as\s+\w+\s+|\w+\s+)\s+from\s+[\'"]([^\'"]+)[\'"]',
                content
            )
            # CommonJS require
            cjs_imports = reg.findall(
                r'require\s*\(\s*[\'"]([^\'"]+)[\'"]\s*\)',
                content
            )
            # Named exports
            exports = reg.findall(r'export\s+(?:const|let|var|function|class)\s+(\w+)', content)

        return {
            "file": target_file,
            "imports": imports[:50],  # 限制数量
            "exports": exports,
            "line_count": len(content.split('\n')),
            "file_size_bytes": file_path.stat().st_size
        }

    except Exception as e:
        return {"error": str(e)}


if __name__ == "__main__":
    # 本地 stdio 模式运行
    mcp.run()

在 VS Code 中使用这个 Server.vscode/mcp.json):

{
  "servers": {
    "code-analyzer": {
      "type": "stdio",
      "command": "python3",
      "args": ["/path/to/mcp_code_analyzer/server.py"]
    }
  }
}

之后,在 Cursor 或支持 MCP 的编辑器中,AI 就能直接调用 search_codeanalyze_structurefind_dependencies 这三个工具了——无需任何额外的提示词工程

2.5 MCP 的安全机制

MCP 在安全设计上做了多层次考虑:

资源隔离:MCP Server 只能访问自己声明的资源,Host 在调用前会检查权限。

输入验证:所有工具参数都经过 JSON Schema 校验,防止注入攻击。

本地优先:默认支持本地 stdio 模式,数据不离开本地环境。

OAuth 2.1 支持(计划中):MCP 官方计划增加对企业级身份验证的支持,允许远程服务器的安全认证。


三、A2A 协议深度解析:AI 之间的「TCP/IP」

3.1 A2A 是什么

A2A(Agent-to-Agent)协议是 Google 主导、已捐给 Linux 基金会的开放协议,用于实现 AI Agent 之间的无缝通信和互操作性。

如果说 MCP 是「AI 操控工具的 USB 接口」,那么 A2A 就是「AI 与 AI 通信的 TCP/IP 协议」。

A2A 的核心设计目标是解决多 Agent 协作中的五个关键问题

问题              A2A 解决方案
─────────────────────────────────────────────────
能力发现          Agent Card(能力描述卡)
任务委托          Task/TaskSubmit 消息
状态同步          TaskStatusUpdate/TaskState
结果返回          TaskResult/Artifact
信任传递          Agent Link 框架

3.2 A2A 的核心概念

3.2.1 Agent Card(能力描述卡)

Agent Card 是 A2A 的「自我介绍」机制。每个 Agent 在 A2A Server 上注册时,必须声明自己的能力:

{
  "name": "code-review-agent",
  "description": "专业的代码审查 Agent,支持安全漏洞检测、代码风格检查、性能问题识别",
  "version": "1.2.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitions": true
  },
  "skills": [
    {
      "id": "security-scan",
      "name": "安全扫描",
      "description": "检测代码中的安全漏洞,包括 SQL 注入、XSS、CSRF 等",
      "tags": ["security", "vulnerability", "OWASP"],
      "inputModes": ["text", "code"],
      "outputModes": ["text", "json", "report"]
    },
    {
      "id": "style-check",
      "name": "代码风格检查",
      "description": "检查代码风格是否遵循 PEP8/ESLint 规范",
      "tags": ["style", "lint", "formatting"],
      "inputModes": ["code"],
      "outputModes": ["text", "json"]
    }
  ],
  "endpoints": {
    "openapiSpecUrl": "http://localhost:8080/openapi.json"
  },
  "provider": {
    "organization": "DevTeam",
    "service": "code-review-service"
  }
}

这个 Agent Card 是其他 Agent「发现」和「选择」协作伙伴的核心依据。当 Host Agent 需要做代码审查时,它会查询 A2A Server 上的 Agent Card,找到 code-review-agent,然后决定委托任务。

3.2.2 Task(任务)生命周期

A2A 中,任务(Task)是协作的基本单元。每个 Task 有完整的状态流转:

Task 状态流转图:

SUBMITTED ──→ WORKING ──→ INPUT_REQUIRED
                    │            │
                    ↓            │
                COMPLETED ←──────┘
                    │
            (遇到无法解决的问题)
                    │
                    ↓
               INPUT_REQUIRED ──→ (等待外部输入)
                    │
            (外部输入到达,继续)
                    ↓
                WORKING ──→ COMPLETED

任何状态下 ──→ FAILED / CANCELED

Task 的 JSON 结构

{
  "id": "task-uuid-xxxxx",
  "sessionId": "session-uuid-yyyyy",
  "status": {
    "state": "working",
    "timestamp": "2026-06-10T12:00:00Z"
  },
  "artifacts": [
    {
      "artifactId": "artifact-1",
      "name": "review-report",
      "description": "代码审查报告,包含安全漏洞列表和改进建议",
      "contentType": "application/json",
      "parts": [
        {
          "type": "data",
          "data": {
            "summary": "发现 3 个高危漏洞,5 个中危漏洞",
            "vulnerabilities": [
              {"severity": "HIGH", "type": "SQL Injection", "location": "src/db.py:45"}
            ]
          }
        }
      ]
    }
  ],
  "messages": [
    {
      "role": "agent",
      "content": {
        "parts": [
          {
            "type": "text",
            "text": "开始对 main.py 进行安全扫描..."
          }
        ]
      }
    }
  ]
}

3.3 A2A 协议的消息类型

A2A 定义了六类核心消息,覆盖了多 Agent 协作的全流程:

3.3.1 tasks/send 消息(同步任务发送)

Host Agent 向 Remote Agent 发送任务,Remote Agent 同步处理后立即返回结果:

// Request
{
  "method": "tasks/send",
  "params": {
    "id": "task-001",
    "sessionId": "session-001",
    "message": {
      "role": "user",
      "content": {
        "parts": [
          {
            "type": "text",
            "text": "请审查 src/auth.py 文件,重点检查身份认证逻辑"
          }
        ]
      }
    }
  }
}

// Response
{
  "result": {
    "id": "task-001",
    "status": { "state": "completed" },
    "artifacts": [
      {
        "artifactId": "review-result",
        "name": "auth-review",
        "contentType": "application/json",
        "parts": [
          {
            "type": "data",
            "data": {
              "issues": [
                {
                  "severity": "HIGH",
                  "line": 45,
                  "type": "Hardcoded Credentials",
                  "description": "发现硬编码密码,应使用环境变量",
                  "suggestion": "使用 os.environ.get('DB_PASSWORD')"
                }
              ],
              "summary": "共发现 1 个高危、2 个中危、4 个低危问题"
            }
          }
        ]
      }
    ]
  }
}

3.3.2 tasks/sendSubscribe 消息(订阅式任务)

适用于需要长时间处理的任务,Host Agent 订阅后可以实时接收状态更新:

// Request
{
  "method": "tasks/sendSubscribe",
  "params": {
    "id": "task-002",
    "sessionId": "session-002",
    "message": {
      "role": "user",
      "content": {
        "parts": [
          {
            "type": "text",
            "text": "扫描整个代码仓库,生成完整的安全报告"
          }
        ]
      }
    }
  }
}

// Server 会通过 SSE 推送多个状态更新
// Update 1: status=working, message="正在扫描 src/api..."
// Update 2: status=working, message="发现可疑代码,正在分析..."
// Update 3: status=completed, artifacts=[完整报告]

3.3.3 tasks/get 消息(查询任务状态)

查询某个任务当前的状态和已有的结果:

{
  "method": "tasks/get",
  "params": {
    "id": "task-001"
  }
}

3.4 A2A 实战:用 Google ADK 构建多 Agent 协作系统

下面是一个完整的实战案例:使用 Google Agent Development Kit (ADK) 构建 Host Agent + Remote Agents 的协作系统。

项目结构

a2a_collaboration/
├── host_agent/
│   ├── __init__.py
│   ├── main.py              # Host Agent 主程序
│   ├── agents/
│   │   ├── __init__.py
│   │   ├── coordinator.py   # 协调 Agent(主控)
│   │   ├── weather_agent.py # 天气查询 Agent
│   │   └── code_agent.py   # 代码分析 Agent
│   └── a2a/
│       ├── server.py        # A2A Server
│       └── agent_cards.json # Agent Card 注册表
└── requirements.txt

Remote Agent 1:天气查询host_agent/agents/weather_agent.py):

import asyncio
from google.adk import Agent
from google.adk.agents import Runner
from google.adk.models.lite_llm import LiteLLM
from google.adk.tools import FunctionTool
from a2a.server import A2AServer
from a2a.server.models import AgentCard, Task
from a2a.types import (
    TaskStatus, TaskState, Message, TextPart, DataPart
)


# 天气查询工具(Remote Agent 提供的核心能力)
def get_weather(city: str, days: int = 3) -> dict:
    """获取指定城市的天气预报"""
    # 模拟真实 API 调用
    weather_data = {
        "北京": [
            {"day": "今天", "temp": "28°C", "weather": "晴", "pm25": "35"},
            {"day": "明天", "temp": "26°C", "weather": "多云", "pm25": "42"},
            {"day": "后天", "temp": "24°C", "weather": "小雨", "pm25": "58"},
        ],
        "上海": [
            {"day": "今天", "temp": "30°C", "weather": "晴", "pm25": "28"},
            {"day": "明天", "temp": "31°C", "weather": "晴", "pm25": "30"},
            {"day": "后天", "temp": "29°C", "weather": "阴", "pm25": "45"},
        ]
    }

    result = weather_data.get(city, [])
    if not result:
        return {"error": f"未找到城市 {city} 的数据"}

    return {
        "city": city,
        "forecast": result[:days],
        "generated_at": "2026-06-10T12:00:00+08:00"
    }


# 定义 Remote Agent 的 A2A Card
def get_weather_agent_card() -> AgentCard:
    return AgentCard(
        name="weather-agent",
        description="专业天气查询 Agent,支持多城市、多日预报、空气质量查询",
        version="1.0.0",
        capabilities={
            "streaming": True,
            "pushNotifications": False,
            "stateTransitions": True
        },
        skills=[
            {
                "id": "weather-forecast",
                "name": "天气预报查询",
                "description": "查询任意城市的天气预报,支持1-7日预报",
                "tags": ["weather", "forecast", "air-quality"],
                "inputModes": ["text"],
                "outputModes": ["text", "json"]
            }
        ],
        endpoints={
            "openapiSpecUrl": "http://localhost:8001/openapi.json"
        },
        provider={
            "organization": "WeatherService",
            "service": "weather-agent"
        }
    )


# 创建 ADK Agent
def create_weather_agent() -> Agent:
    return Agent(
        model=LiteLLM(model="gpt-4o"),
        name="weather_agent",
        description="天气查询专家",
        tools=[
            FunctionTool(
                func=get_weather,
                name="get_weather",
                description="获取天气预报数据"
            )
        ],
        instruction="""
        你是一个专业的天气预报 Agent。
        当用户询问天气时,使用 get_weather 工具获取数据,
        并以清晰友好的方式呈现天气预报结果。
        如果用户询问的是不支持的城市,礼貌地说明。
        """
    )


# 启动 A2A Server
async def main():
    server = A2AServer(
        agent_card=get_weather_agent_card(),
        agent=create_weather_agent(),
        host="0.0.0.0",
        port=8001
    )
    print("🌤️  Weather Agent A2A Server 启动中...")
    await server.start()


if __name__ == "__main__":
    asyncio.run(main())

Remote Agent 2:代码分析host_agent/agents/code_agent.py):

import asyncio
from pathlib import Path
from google.adk import Agent
from google.adk.tools import FunctionTool
from a2a.server import A2AServer
from a2a.server.models import AgentCard


def analyze_code_complexity(file_path: str) -> dict:
    """分析代码文件的复杂度指标"""
    path = Path(file_path)
    if not path.exists():
        return {"error": f"File not found: {file_path}"}

    try:
        lines = path.read_text(encoding='utf-8').split('\n')
        code_lines = [l for l in lines if l.strip() and not l.strip().startswith('#')]

        # 简单的圈复杂度估算(基于分支关键词)
        branch_keywords = ['if', 'elif', 'for', 'while', 'except', 'and', 'or', '?', '&&', '||']
        branches = sum(1 for line in lines for kw in branch_keywords if kw in line)

        return {
            "file": file_path,
            "total_lines": len(lines),
            "code_lines": len(code_lines),
            "comment_lines": len(lines) - len(code_lines),
            "estimated_complexity": branches,
            "complexity_rating": "HIGH" if branches > 20 else "MEDIUM" if branches > 10 else "LOW",
            "top_level_definitions": [
                line.strip() for line in lines
                if line.strip().startswith(('def ', 'class ', 'func ', 'fn '))
            ]
        }
    except Exception as e:
        return {"error": str(e)}


def create_code_agent() -> Agent:
    return Agent(
        name="code_agent",
        description="代码分析专家",
        tools=[
            FunctionTool(
                func=analyze_code_complexity,
                name="analyze_code_complexity",
                description="分析代码文件复杂度"
            )
        ],
        instruction="""
        你是一个代码分析专家。当用户提供代码文件路径时,
        使用 analyze_code_complexity 工具分析代码复杂度,
        并给出改进建议。关注:圈复杂度、代码行数、注释比例。
        """
    )


def get_code_agent_card() -> AgentCard:
    return AgentCard(
        name="code-analysis-agent",
        description="代码复杂度分析和质量评估 Agent",
        version="1.0.0",
        capabilities={"streaming": True, "stateTransitions": True},
        skills=[
            {
                "id": "complexity-analysis",
                "name": "代码复杂度分析",
                "description": "分析代码文件的圈复杂度和质量指标",
                "tags": ["code", "complexity", "quality"],
                "inputModes": ["text"],
                "outputModes": ["text", "json"]
            }
        ],
        provider={"organization": "CodeTools", "service": "code-analysis"}
    )


async def main():
    server = A2AServer(
        agent_card=get_code_agent_card(),
        agent=create_code_agent(),
        host="0.0.0.0",
        port=8002
    )
    print("📊 Code Analysis Agent A2A Server 启动中...")
    await server.start()


if __name__ == "__main__":
    asyncio.run(main())

Host Agent(协调者)host_agent/agents/coordinator.py):

import asyncio
from typing import Optional
from google.adk import Agent
from google.adk.agents import Runner
from google.adk.models.lite_llm import LiteLLM
from a2a.client import A2AClient
from a2a.types import (
    Message, TextPart, TaskQueryParams, TaskSendParams
)


class HostAgent:
    """
    Host Agent(主控 Agent):
    - 接收用户请求
    - 解析意图,选择合适的 Remote Agent
    - 委托任务,聚合结果
    - 返回最终答案给用户
    """

    def __init__(self):
        # 连接 Remote Agent
        self.weather_client = A2AClient(
            url="http://localhost:8001"
        )
        self.code_client = A2AClient(
            url="http://localhost:8002"
        )

        # 创建 Host Agent 本身
        self.agent = Agent(
            model=LiteLLM(model="gpt-4o"),
            name="coordinator",
            description="多功能 AI 助手,协调多个专业 Agent 协作",
            instruction="""
            你是一个智能协调者 Agent。当用户提出问题时,你需要判断:
            1. 如果是天气相关 → 调用 weather-agent
            2. 如果是代码分析相关 → 调用 code-analysis-agent
            3. 如果两者都有 → 并行调用两个 Agent,再聚合结果

            使用 A2A Client 向 Remote Agent 发送任务。
            """
        )
        self.runner = Runner(agent=self.agent)

    async def handle_request(self, user_message: str) -> str:
        """
        处理用户请求,智能路由到合适的 Remote Agent
        """
        # 首先让 Host Agent 分析用户意图
        intent_analysis = await self._analyze_intent(user_message)

        # 根据意图决定调用哪些 Agent
        tasks = []
        if "weather" in intent_analysis or "天气" in intent_analysis:
            city = self._extract_city(user_message)
            tasks.append(("weather", self._call_weather_agent(city)))

        if "code" in intent_analysis or "代码" in intent_analysis:
            file_path = self._extract_file_path(user_message)
            tasks.append(("code", self._call_code_agent(file_path)))

        # 并行执行所有任务
        if tasks:
            results = await asyncio.gather(
                *[task for _, task in tasks],
                return_exceptions=True
            )

            # 聚合结果
            return self._aggregate_results(tasks, results)
        else:
            return "抱歉,我无法处理这个请求。"

    async def _call_weather_agent(self, city: str) -> dict:
        """调用天气 Agent"""
        try:
            result = await self.weather_client.send_task(
                TaskSendParams(
                    id=f"weather-task-{id(city)}",
                    message=Message(
                        role="user",
                        content=TextPart(text=f"查询 {city} 的天气")
                    )
                )
            )
            return {"status": "success", "data": result}
        except Exception as e:
            return {"status": "error", "data": str(e)}

    async def _call_code_agent(self, file_path: str) -> dict:
        """调用代码分析 Agent"""
        try:
            result = await self.code_client.send_task(
                TaskSendParams(
                    id=f"code-task-{id(file_path)}",
                    message=Message(
                        role="user",
                        content=TextPart(text=f"分析代码 {file_path}")
                    )
                )
            )
            return {"status": "success", "data": result}
        except Exception as e:
            return {"status": "error", "data": str(e)}

    async def _analyze_intent(self, message: str) -> str:
        """分析用户意图(简化版)"""
        return message.lower()

    def _extract_city(self, message: str) -> str:
        """提取城市名(简化版)"""
        cities = ["北京", "上海", "广州", "深圳", "杭州"]
        for city in cities:
            if city in message:
                return city
        return "北京"  # 默认

    def _extract_file_path(self, message: str) -> str:
        """提取文件路径(简化版)"""
        import re
        match = re.search(r'[\w/._-]+\.(py|js|ts|go|rs|java)', message)
        return match.group(0) if match else "main.py"

    def _aggregate_results(self, tasks: list, results: list) -> str:
        """聚合多个 Agent 的结果"""
        parts = []
        for (task_type, _), result in zip(tasks, results):
            if task_type == "weather":
                if result.get("status") == "success":
                    data = result["data"]
                    parts.append(f"🌤️ 天气信息:{data}")
                else:
                    parts.append(f"天气查询失败:{result.get('data')}")

            elif task_type == "code":
                if result.get("status") == "success":
                    data = result["data"]
                    parts.append(f"📊 代码分析:{data}")
                else:
                    parts.append(f"代码分析失败:{result.get('data')}")

        return "\n".join(parts) if parts else "未获取到有效结果"


# 运行 Host Agent
async def main():
    host = HostAgent()

    # 模拟用户请求
    test_queries = [
        "帮我查一下北京今天的天气",
        "帮我分析 main.py 的代码复杂度",
        "北京天气怎么样,同时帮我看看 src/app.py 的代码"
    ]

    for query in test_queries:
        print(f"\n👤 用户: {query}")
        result = await host.handle_request(query)
        print(f"🤖 回复: {result}")


if __name__ == "__main__":
    asyncio.run(main())

四、MCP 与 A2A 的协作关系:不是竞争,是互补

4.1 协议层级对比

很多开发者容易混淆 MCP 和 A2A 的定位。实际上,它们解决的是不同层次的问题:

┌─────────────────────────────────────────────┐
│           应用层:具体业务场景                 │
│  ┌──────────────┐  ┌──────────────────────┐│
│  │  Host Agent  │  │    Remote Agent(s)   ││
│  └──────┬───────┘  └──────────┬───────────┘│
│         │                      │             │
│    A2A  │◄── Agent ↔ Agent 通讯层           │
│  协议层  │        (任务委托、状态同步)        │
│         │                      │             │
│    MCP  │◄── Agent ↔ 工具 交互层             │
│  协议层  │        (文件、API、数据库)         │
│         │                      │             │
│  ┌──────┴───────┐  ┌──────────┴───────────┐│
│  │ MCP Server   │  │    外部工具/数据源    ││
│  │ (工具集)      │  │ (GitHub/DB/地图...)  ││
│  └──────────────┘  └──────────────────────┘│
└─────────────────────────────────────────────┘

4.2 典型场景:两者协同工作

场景:用户要求 Agent 自动审查 GitHub PR 并生成报告

步骤 1(用户 → Host Agent):"帮我审查 GitHub PR #123"

步骤 2(Host Agent):
  ├── 通过 MCP Server 调用 GitHub API 获取 PR 内容
  ├── 通过 MCP Server 调用代码分析工具
  └── 通过 A2A 委托 code-review-agent 做深度审查

步骤 3(Remote Agent 通过 MCP):
  ├── 调用 GitHub MCP Server 提交 review 意见
  ├── 调用 Slack MCP Server 通知团队

步骤 4(Remote Agent → Host Agent):
  └── 通过 A2A 返回审查结果

步骤 5(Host Agent → 用户):
  └── 聚合所有结果,生成最终报告

4.3 技术选型决策树

需要选择 MCP 还是 A2A?

Q1: 你的 Agent 需要调用外部工具/服务吗?
    │
    ├── YES → 需要 MCP
    │         - 操作文件/数据库
    │         - 调用第三方 API
    │         - 搜索、翻译、地图等能力
    │
    └── NO → 继续判断

Q2: 你的系统中有多个 Agent 需要协作吗?
    │
    ├── YES → 需要 A2A
    │         - 任务委派
    │         - 能力发现
    │         - 多 Agent 串联
    │
    └── NO → 可能是单 Agent 系统,只需 MCP 或两者都不需要

Q3: 你的 Agent 需要与「其他公司的 Agent」通信吗?
    │
    └── YES → 必须用 A2A
              (MCP 是工具协议,不处理跨 Agent 通信)

五、生产级集成策略

5.1 企业级 MCP + A2A 架构设计

对于生产环境,推荐以下架构:

┌──────────────────────────────────────────────────────┐
│                    API Gateway                        │
│            (认证、限流、日志、路由)                    │
└────────────────────┬─────────────────────────────────┘
                     │
       ┌─────────────┼─────────────┐
       ↓             ↓             ↓
┌──────────┐  ┌──────────┐  ┌──────────┐
│  Host    │  │  Host    │  │  Host    │
│  Agent 1 │  │  Agent 2 │  │  Agent 3 │
│  (主控)   │  │  (垂直   │  │  (垂直   │
│          │  │   领域)   │  │   领域)   │
└────┬─────┘  └────┬─────┘  └────┬─────┘
     │             │             │
     │  A2A 协议通信层             │
     │◄──────────────────────────►│
     │             │             │
     ├──→ MCP Server (内部工具集)
     │    ├── GitHub MCP
     │    ├── Database MCP
     │    ├── Notification MCP
     │
     └──→ A2A Server (对外暴露)
          ├── Agent Card 注册表
          ├── 任务队列
          └── 审计日志

清华大学 A2A 报告提出的 Agent Link 框架,为多 Agent 协作提供了完整的安全治理方案:

关系信誉层:记录 Agent 之间的历史协作数据,评估可信度:

# Agent Link: 关系信誉追踪
class AgentTrustLedger:
    """
    追踪 Agent 之间的信任关系
    类似 Web 时代的 PageRank,但用于 Agent 间的信任传播
    """
    def __init__(self):
        self.relationships: dict[str, list[TrustEdge]] = {}

    def record_interaction(
        self,
        from_agent: str,
        to_agent: str,
        task_type: str,
        success: bool,
        data_accessed: list[str]
    ):
        """记录一次协作交互"""
        edge = TrustEdge(
            from_agent=from_agent,
            to_agent=to_agent,
            task_type=task_type,
            success=success,
            data_accessed=data_accessed,
            timestamp=datetime.now(),
            weight=self._calculate_weight(success, data_accessed)
        )
        self.relationships.setdefault(from_agent, []).append(edge)

    def get_trust_score(self, agent_id: str) -> float:
        """计算 Agent 的信任分数(0-100)"""
        edges = self.relationships.get(agent_id, [])
        if not edges:
            return 50.0  # 默认分数

        weights = [e.weight for e in edges]
        return sum(weights) / len(weights) * 100

    def _calculate_weight(self, success: bool, data_accessed: list) -> float:
        """计算单次交互权重"""
        base = 1.0 if success else 0.3
        sensitivity_penalty = len([d for d in data_accessed if d in SENSITIVE_DATA]) * 0.1
        return max(0.0, base - sensitivity_penalty)


SENSITIVE_DATA = ["password", "credit_card", "ssn", "api_key", "private_key"]

证据溯源层:记录每次数据访问的完整链路:

# Agent Link: 证据溯源
@dataclass
class AuditEntry:
    """审计日志条目:谁在什么时间访问了什么数据"""
    agent_id: str
    action: str
    data_accessed: list[str]
    purpose: str
    authorized_by: str
    timestamp: datetime
    result: str  # success / denied / error
    session_id: str


class AuditLogger:
    """
    不可篡改的审计日志
    用于事后追溯和数据合规(GDPR 等)
    """
    def __init__(self, storage_path: str):
        self.storage_path = Path(storage_path)
        self.current_file = self.storage_path / f"audit_{date.today()}.jsonl"

    def log(self, entry: AuditEntry):
        """追加一条审计日志"""
        self.current_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.current_file, 'a') as f:
            f.write(json.dumps(asdict(entry), ensure_ascii=False) + '\n')

    def query(self, agent_id: str, start: datetime, end: datetime) -> list[AuditEntry]:
        """查询指定 Agent 的审计记录"""
        results = []
        for log_file in self.storage_path.glob("audit_*.jsonl"):
            with open(log_file) as f:
                for line in f:
                    entry = json.loads(line)
                    if (entry["agent_id"] == agent_id and
                        start <= parse_datetime(entry["timestamp"]) <= end):
                        results.append(entry)
        return results

委托审计层:验证跨 Agent 委托的合法性:

# Agent Link: 委托审计
class DelegationAuditor:
    """
    检查 Agent 之间的委托请求是否合法
    在 A2A 任务委托时自动触发
    """
    def __init__(
        self,
        trust_ledger: AgentTrustLedger,
        audit_logger: AuditLogger
    ):
        self.trust_ledger = trust_ledger
        self.audit_logger = audit_logger

    async def authorize_delegation(
        self,
        delegator: str,      # 委托方
        delegate: str,        # 受托方
        task: str,           # 任务描述
        data_scope: list[str] # 需要访问的数据范围
    ) -> DelegationDecision:
        """判断委托是否授权"""

        # 1. 检查受托 Agent 的信任分数
        trust_score = self.trust_ledger.get_trust_score(delegate)
        if trust_score < TRUST_THRESHOLD:
            return DelegationDecision(
                approved=False,
                reason=f"受托方信任分数 {trust_score:.1f} 低于阈值 {TRUST_THRESHOLD}"
            )

        # 2. 检查数据访问范围是否合理
        sensitive_data_in_scope = [d for d in data_scope if d in SENSITIVE_DATA]
        if sensitive_data_in_scope:
            # 敏感数据需要额外确认
            audit_entry = AuditEntry(
                agent_id=delegate,
                action=f"delegation_access:{task}",
                data_accessed=sensitive_data_in_scope,
                purpose=task,
                authorized_by=delegator,
                timestamp=datetime.now(),
                result="pending_approval",
                session_id=generate_session_id()
            )
            self.audit_logger.log(audit_entry)
            return DelegationDecision(
                approved=True,
                reason="包含敏感数据,已记录审计日志",
                audit_required=True
            )

        return DelegationDecision(approved=True, reason="授权通过")


TRUST_THRESHOLD = 70  # 信任分数阈值

5.3 可观测性:多 Agent 系统的调试挑战

多 Agent 协作系统的可观测性,是生产部署中最容易被忽视但又最关键的问题。

核心挑战

  1. 链路追踪:一个用户请求可能触发 10+ 个 Agent 调用,需要全链路追踪
  2. 状态一致性:多个 Agent 并行执行,中间状态如何保持一致
  3. 调试困难:Agent 的「思考过程」不透明,出问题难以定位

解决方案:Structured Logging + Trace ID

# 统一的分布式追踪
import uuid
from contextvars import ContextVar

trace_id: ContextVar[str] = ContextVar('trace_id', default='')


class AgentTracer:
    """为每次 Agent 协作生成完整的追踪日志"""

    def __init__(self, logger: logging.Logger):
        self.logger = logger

    def start_trace(self) -> str:
        """启动一次追踪,返回 trace_id"""
        tid = str(uuid.uuid4())[:8]
        trace_id.set(tid)
        self.logger.info(f"[{tid}] 🚀 追踪开始")
        return tid

    def log_agent_call(
        self,
        agent: str,
        action: str,
        input_data: dict,
        output_data: dict = None,
        error: str = None
    ):
        """记录一次 Agent 调用"""
        tid = trace_id.get()
        entry = {
            "trace_id": tid,
            "agent": agent,
            "action": action,
            "input": input_data,
            "output": output_data,
            "error": error,
            "timestamp": datetime.now().isoformat()
        }
        self.logger.info(f"[{tid}] 📞 {agent}.{action}()", extra=entry)

        # 同时写入结构化日志文件(供后续分析)
        self._write_structured_log(entry)

    def _write_structured_log(self, entry: dict):
        """写入结构化日志到 ES/Loki"""
        # 生产环境应该发送到 ELK / Loki / Jaeger
        log_file = Path("logs/agent_traces.jsonl")
        log_file.parent.mkdir(exist_ok=True)
        with open(log_file, 'a') as f:
            f.write(json.dumps(entry, ensure_ascii=False) + '\n')

5.4 性能优化:减少 Agent 间通信开销

多 Agent 协作的性能瓶颈主要在通信延迟上下文膨胀

策略一:批量委托

# 不要:逐个调用
for file in many_files:
    await code_agent.analyze(file)

# 要:批量委托
await code_agent.analyze_batch(file_list)

策略二:结果缓存

from functools import lru_cache

class AgentResultCache:
    """Agent 调用结果缓存(基于内容hash)"""

    def __init__(self, ttl_seconds: int = 300):
        self.cache = {}
        self.ttl = ttl_seconds

    def _make_key(self, agent: str, task: str, params: dict) -> str:
        content = f"{agent}:{task}:{json.dumps(params, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def get(self, agent: str, task: str, params: dict) -> Optional[dict]:
        key = self._make_key(agent, task, params)
        if key in self.cache:
            entry = self.cache[key]
            if time.time() - entry['timestamp'] < self.ttl:
                return entry['result']
            del self.cache[key]
        return None

    def set(self, agent: str, task: str, params: dict, result: dict):
        key = self._make_key(agent, task, params)
        self.cache[key] = {'result': result, 'timestamp': time.time()}

策略三:流式处理避免等待

对于长时间任务,使用 A2A 的 tasks/sendSubscribe 替代 tasks/send,让用户看到实时进度:

# 流式处理示例
async def process_with_progress(client: A2AClient, task_id: str):
    async for update in client.send_subscribe(task_id):
        state = update.status.state
        message = update.status.message

        if state == "working":
            print(f"⏳ {message}")      # 实时展示进度
        elif state == "completed":
            print(f"✅ 完成")
            return update.artifacts
        elif state == "failed":
            print(f"❌ 失败: {message}")
            return None

六、实战:从零构建一个 MCP + A2A 混合系统

6.1 项目目标

构建一个代码助手系统,包含:

  • 1 个 Host Agent(协调者)
  • 2 个 Remote Agent(代码审查 Agent、文档生成 Agent)
  • 3 个 MCP Server(GitHub、数据库、文件系统)

6.2 完整代码

requirements.txt

google-adk>=0.2.0
mcp>=1.0.0
fastmcp>=0.1.0
aiohttp>=3.9.0
structlog>=24.0.0

system_architecture.py(完整系统):

"""
MCP + A2A 混合系统完整示例
一个代码助手:接收用户请求 → 协调多个 Agent → 返回结果
"""
import asyncio
import json
import logging
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Optional

import structlog

# 配置结构化日志
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)
logger = structlog.get_logger()


# ============================================================
# MCP Server: 文件系统操作
# ============================================================
from mcp.server.fastmcp import FastMCP

mcp_fs = FastMCP(name="filesystem")


@mcp_fs.tool(name="read_file", description="读取文件内容")
def read_file(path: str, encoding: str = "utf-8") -> dict:
    try:
        content = Path(path).read_text(encoding=encoding, errors='replace')
        return {
            "success": True,
            "path": path,
            "lines": len(content.split('\n')),
            "preview": content[:500]
        }
    except Exception as e:
        return {"success": False, "error": str(e)}


@mcp_fs.tool(name="list_directory", description="列出目录内容")
def list_directory(path: str, recursive: bool = False) -> dict:
    try:
        p = Path(path)
        if not p.exists():
            return {"success": False, "error": "目录不存在"}

        if recursive:
            items = [
                {"path": str(item), "type": "dir" if item.is_dir() else "file"}
                for item in p.rglob('*')
            ]
        else:
            items = [
                {"name": item.name, "type": "dir" if item.is_dir() else "file"}
                for item in p.iterdir()
            ]
        return {"success": True, "path": path, "items": items}
    except Exception as e:
        return {"success": False, "error": str(e)}


# ============================================================
# A2A: Remote Agent 定义
# ============================================================
from a2a.server import A2AServer
from a2a.types import AgentCard, TaskStatus, TaskState, Message, TextPart, DataPart


async def create_code_review_agent_server():
    """代码审查 Agent(Remote Agent 2)"""

    def get_agent_card() -> AgentCard:
        return AgentCard(
            name="code-review-agent",
            description="专业的代码审查 Agent,支持安全扫描、风格检查、性能分析",
            version="1.0.0",
            capabilities={"streaming": True, "stateTransitions": True},
            skills=[
                {
                    "id": "security-scan",
                    "name": "安全扫描",
                    "tags": ["security", "vulnerability"]
                },
                {
                    "id": "style-check",
                    "name": "风格检查",
                    "tags": ["style", "lint"]
                }
            ]
        )

    async def handle_task(message: Message) -> dict:
        """处理代码审查任务"""
        user_text = message.content.parts[0].text if message.content.parts else ""

        # 简化版代码审查逻辑
        findings = []

        # 检测潜在安全问题
        if "password" in user_text.lower() or "secret" in user_text.lower():
            findings.append({
                "type": "SECURITY",
                "severity": "HIGH",
                "description": "可能存在敏感信息泄露风险"
            })

        # 检测代码复杂度
        if len(user_text) > 500:
            findings.append({
                "type": "COMPLEXITY",
                "severity": "MEDIUM",
                "description": f"代码较长({len(user_text)} 字符),建议拆分"
            })

        return {
            "status": "completed",
            "findings": findings,
            "summary": f"审查完成,发现 {len(findings)} 个问题"
        }

    server = A2AServer(
        agent_card=get_agent_card(),
        skill_handlers={"security-scan": handle_task},
        host="0.0.0.0",
        port=8001
    )
    return server


# ============================================================
# Host Agent: 主控协调器
# ============================================================
@dataclass
class UserRequest:
    """用户请求结构"""
    text: str
    attachments: list[str] = None
    user_id: str = "anonymous"


@dataclass
class AgentResponse:
    """Agent 响应结构"""
    agent: str
    result: dict
    duration_ms: float
    success: bool


class HostAgent:
    """
    Host Agent(主控协调器)

    职责:
    1. 接收用户请求
    2. 通过 MCP 操作本地资源(文件系统)
    3. 通过 A2A 委托专业 Agent
    4. 聚合结果返回给用户
    """

    def __init__(self):
        self.a2a_clients = {
            "code-review": A2AClient(url="http://localhost:8001"),
        }
        logger.info("Host Agent 初始化完成")

    async def process(self, request: UserRequest) -> dict:
        """
        处理用户请求的主入口
        """
        trace_id = logger.new(request_id=request.user_id).info("开始处理请求")

        # 阶段1:解析用户意图
        intent = self._parse_intent(request.text)
        logger.info(f"意图识别结果: {intent}")

        results = []
        start_time = asyncio.get_event_loop().time()

        # 阶段2:根据意图调用相应 Agent
        if "review" in intent or "审查" in intent:
            result = await self._call_code_review_agent(request.text)
            results.append(result)

        if "read" in intent or "读取" in intent:
            # 直接使用 MCP
            file_path = self._extract_file_path(request.text)
            result = await self._call_mcp_fs("read_file", {"path": file_path})
            results.append(result)

        # 阶段3:聚合结果
        duration = (asyncio.get_event_loop().time() - start_time) * 1000

        return {
            "success": True,
            "request": request.text,
            "intent": intent,
            "results": results,
            "duration_ms": round(duration, 2),
            "trace_id": trace_id
        }

    def _parse_intent(self, text: str) -> list[str]:
        """解析用户意图(简化版)"""
        intents = []
        keywords = {
            "review": ["审查", "review", "检查", "扫描"],
            "read": ["读取", "read", "查看", "打开"]
        }
        text_lower = text.lower()
        for intent, kws in keywords.items():
            if any(kw in text_lower for kw in kws):
                intents.append(intent)
        return intents or ["general"]

    def _extract_file_path(self, text: str) -> str:
        """提取文件路径"""
        import re
        patterns = [
            r'[\w/._-]+\.(py|js|ts|go|rs|java|cpp|c|h)',
            r'[/\\][\w/._-]+'  # 通用路径
        ]
        for pattern in patterns:
            match = re.search(pattern, text)
            if match:
                return match.group(0)
        return "."

    async def _call_code_review_agent(self, task: str) -> AgentResponse:
        """通过 A2A 调用代码审查 Agent"""
        start = asyncio.get_event_loop().time()
        try:
            client = self.a2a_clients["code-review"]
            result = await client.send_task(TaskSendParams(
                id=f"cr-{datetime.now().timestamp()}",
                message=Message(role="user", content=TextPart(text=task))
            ))
            duration = (asyncio.get_event_loop().time() - start) * 1000
            return AgentResponse(
                agent="code-review-agent",
                result=result,
                duration_ms=round(duration, 2),
                success=True
            )
        except Exception as e:
            duration = (asyncio.get_event_loop().time() - start) * 1000
            return AgentResponse(
                agent="code-review-agent",
                result={"error": str(e)},
                duration_ms=round(duration, 2),
                success=False
            )

    async def _call_mcp_fs(self, tool: str, params: dict) -> AgentResponse:
        """通过 MCP 调用文件系统工具"""
        start = asyncio.get_event_loop().time()
        try:
            if tool == "read_file":
                result = read_file(**params)
            elif tool == "list_directory":
                result = list_directory(**params)
            else:
                result = {"error": f"Unknown tool: {tool}"}
            duration = (asyncio.get_event_loop().time() - start) * 1000
            return AgentResponse(
                agent="mcp-filesystem",
                result=result,
                duration_ms=round(duration, 2),
                success=result.get("success", True)
            )
        except Exception as e:
            duration = (asyncio.get_event_loop().time() - start) * 1000
            return AgentResponse(
                agent="mcp-filesystem",
                result={"error": str(e)},
                duration_ms=round(duration, 2),
                success=False
            )


# ============================================================
# 启动入口
# ============================================================
async def main():
    """启动完整系统"""
    print("=" * 60)
    print("  MCP + A2A 混合代码助手系统")
    print("  架构:1 Host Agent + 2 Remote Agents + 3 MCP Servers")
    print("=" * 60)

    # 启动 MCP Server(文件系统)
    print("\n📁 启动 MCP 文件系统 Server...")
    # mcp_fs.run()  # 阻塞运行,此处省略

    # 启动 A2A Remote Agent
    print("\n🔍 启动 A2A 代码审查 Agent...")
    # await create_code_review_agent_server()  # 阻塞运行,此处省略

    # 启动 Host Agent
    print("\n🤖 启动 Host Agent...")
    host = HostAgent()

    # 模拟用户请求
    print("\n" + "=" * 60)
    print("  系统就绪,接收请求测试")
    print("=" * 60)

    test_cases = [
        UserRequest(
            text="帮我审查 src/main.py 的安全性",
            user_id="user-001"
        ),
        UserRequest(
            text="列出当前目录的文件结构",
            user_id="user-002"
        )
    ]

    for req in test_cases:
        print(f"\n👤 用户请求: {req.text}")
        result = await host.process(req)
        print(f"✅ 处理完成,耗时: {result['duration_ms']}ms")
        print(f"   意图: {result['intent']}")
        print(f"   Agent 调用: {len(result['results'])} 个")
        for r in result['results']:
            status = "✅" if r.success else "❌"
            print(f"   {status} {r.agent}: {r.duration_ms}ms")


if __name__ == "__main__":
    asyncio.run(main())

七、总结与展望

7.1 核心要点回顾

维度MCPA2A
定位AI ↔ 工具/数据AI ↔ AI
主导方AnthropicGoogle (Linux Foundation)
核心价值工具标准化,即插即用Agent 协作语言,能力发现
传输模式stdio / SSE / HTTPHTTP + JSON-RPC
典型场景调用 GitHub API、读写文件任务委派、并行协作
安全重点资源隔离、输入验证信任传递、审计追溯

7.2 2026 年 Agent 协作技术趋势

  1. MCP + A2A 双协议融合:主流 Agent 平台将同时支持 MCP 和 A2A,形成完整的 Agent 生态

  2. Agent Card 标准化:类似 App Store 的 Agent 发现机制,企业内部可以构建私有 Agent 市场

  3. 信任框架落地:Agent Link 等信任框架将从研究走向生产,成为企业合规的必备能力

  4. 跨平台协作:微信、支付宝等超级 App 开放 A2A 能力后,AI Agent 的商业化路径将彻底打通

7.3 给开发者的建议

短期(1-3个月)

  • 熟悉 MCP Server 的开发方式,从简单的 stdio 模式开始
  • 选一个你熟悉的场景,尝试接入 MCP(如 GitHub、数据库)
  • 关注 Google ADK 和 Anthropic MCP SDK 的更新

中期(3-6个月)

  • 了解 A2A 协议的核心概念(Agent Card、Task 生命周期)
  • 尝试构建多 Agent 协作的原型系统
  • 建立 Agent 可观测性基础设施(结构化日志、追踪)

长期(6-12个月)

  • 构建企业内部 Agent 治理体系(权限、审计、合规)
  • 参与开源 Agent 协议标准的制定和贡献
  • 探索跨组织 Agent 协作的商业模式

写在最后:MCP 和 A2A 不是两个要选一个的竞争方案,而是同一套 Agent 基础设施的两层。理解它们的互补关系,比争论哪个更好更重要。2026 年是 Agent 规模化落地的元年,多 Agent 协作从概念走向生产,是这个领域最重要的发展方向。早点掌握这两套协议,你就能在 Agent 时代抢占先机。


本文首发于 程序员茄子,如需转载,请保留原文链接。

标签:AI Agent | MCP | A2A | 多智能体协作 | Google ADK | Anthropic | 生产级架构

关键词:Model Context Protocol | Agent-to-Agent | AI Agent | Multi-Agent | MCP Server | A2A Protocol | Agent Card | Google ADK | 智能体协作 | Agent Link | 信任框架

复制全文 生成海报 AI Agent MCP A2A 多智能体协作

推荐文章

JavaScript 异步编程入门
2024-11-19 07:07:43 +0800 CST
Vue3中的v-model指令有什么变化?
2024-11-18 20:00:17 +0800 CST
js函数常见的写法以及调用方法
2024-11-19 08:55:17 +0800 CST
2025,重新认识 HTML!
2025-02-07 14:40:00 +0800 CST
JavaScript设计模式:发布订阅模式
2024-11-18 01:52:39 +0800 CST
如何在Vue中处理动态路由?
2024-11-19 06:09:50 +0800 CST
开发外贸客户的推荐网站
2024-11-17 04:44:05 +0800 CST
程序员茄子在线接单