编程 MCP协议深度解析:从架构原理到AI Agent实战开发

2026-07-05 09:42:31 +0800 CST views 17

MCP协议深度解析:从架构原理到AI Agent实战开发

2024年底Anthropic开源了Model Context Protocol(MCP),2025年它成为AI领域最热的协议标准,2026年已被捐赠给Linux Foundation。本文从协议规范、架构设计、通信机制到Python/TypeScript双语言实战,带你彻底搞懂MCP——不只是"USB-C接口"的比喻,而是能落地到生产环境的完整开发指南。

一、为什么MCP值得每个程序员关注

1.1 从M×N到M+N:一个真实痛点

假设你开发了一个AI助手,需要接入5个外部工具(文件系统、GitHub、数据库、Slack、搜索引擎),支持3个大模型(Claude、GPT、Gemini)。在MCP出现之前,你需要为每个模型×每个工具写一套适配代码——3×5=15个集成。

工具从5个增加到10个?模型从3个增加到5个?适配代码指数级膨胀。这就是经典的M×N问题。

MCP的核心价值:把M×N变成M+N。每个工具只需要实现一个MCP Server,每个模型只需要实现一个MCP Client,就能互相连接。

没有MCP:
  App A → 自定义代码 → 工具1
  App A → 自定义代码 → 工具2
  App B → 自定义代码 → 工具1
  App B → 自定义代码 → 工具2
  → M × N 个集成

有了MCP:
  App A → MCP Client → MCP Server 1(工具1)
  App A → MCP Client → MCP Server 2(工具2)
  App B → MCP Client → MCP Server 1(工具1)
  → M + N 个实现

1.2 MCP vs Function Calling vs 插件系统

很多人会问:Function Calling不是已经能调用外部工具了吗?为什么还需要MCP?

维度Function CallingChatGPT 插件MCP
标准化无,各厂商私有OpenAI专有开放标准(Linux Foundation)
跨模型不支持不支持支持
动态发现不支持,需预设支持支持(tools/list)
安全边界模糊模糊明确(Server主动暴露,Client授权)
传输方式API调用HTTPstdio / Streamable HTTP
生态共享不行不行一个Server所有Client通用

关键区别:Function Calling是"把工具描述塞进prompt",MCP是"让工具自己说话"。前者依赖模型理解你的描述,后者让工具自描述、自管理、自治理。

二、MCP协议架构全解

2.1 三层架构:Host / Client / Server

MCP采用Host-Client-Server三层架构:

┌─────────────────────────────────────────────┐
│                 MCP Host                     │
│  (Claude Desktop / Cursor / 你的AI应用)      │
│                                              │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
│  │MCP Client│  │MCP Client│  │MCP Client│  │
│  └─────┬────┘  └─────┬────┘  └─────┬────┘  │
└────────┼─────────────┼─────────────┼───────┘
         │             │             │
    JSON-RPC 2.0  JSON-RPC 2.0  JSON-RPC 2.0
    (stdio/SSE)   (stdio/SSE)   (stdio/SSE)
         │             │             │
┌────────┴────┐ ┌──────┴─────┐ ┌────┴───────┐
│ MCP Server  │ │ MCP Server │ │ MCP Server │
│ (文件系统)   │ │ (数据库)    │ │ (GitHub)   │
└─────────────┘ └────────────┘ └────────────┘

Host(主机):运行AI模型的宿主程序。Claude Desktop、Cursor、VS Code中的AI插件都是Host。一个Host可以同时连接多个MCP Server。

Client(客户端):嵌入在Host中的协议客户端。每个Client与一个Server保持1:1连接,负责收发JSON-RPC消息。Host内部可以运行多个Client实例。

Server(服务器):独立运行的轻量程序,封装特定能力(文件操作、数据库查询、API调用等),通过标准MCP接口对外暴露。Server是"适配器"——把底层服务的功能翻译成MCP标准格式。

2.2 三大核心能力:Tools / Resources / Prompts

MCP Server可以提供三种类型的能力:

1. Tools(工具)——可被LLM调用的函数

{
  "name": "query_database",
  "description": "执行SQL查询并返回结果",
  "inputSchema": {
    "type": "object",
    "properties": {
      "sql": {
        "type": "string",
        "description": "要执行的SQL语句"
      },
      "limit": {
        "type": "integer",
        "description": "返回行数限制",
        "default": 100
      }
    },
    "required": ["sql"]
  }
}

工具是MCP最常用的能力类型。LLM根据工具的name和description判断何时调用,根据inputSchema构造参数。工具可以产生副作用(写文件、发消息、改数据库),所以需要用户授权。

2. Resources(资源)——可被客户端读取的数据

{
  "uri": "file:///project/README.md",
  "name": "项目README",
  "description": "项目的说明文档",
  "mimeType": "text/markdown"
}

资源是只读的上下文数据,类似GET端点。客户端可以把资源内容注入到LLM的上下文中。适合文件内容、API响应、数据库快照等场景。

3. Prompts(提示模板)——预定义的交互模式

{
  "name": "code_review",
  "description": "对代码进行审查",
  "arguments": [
    {
      "name": "code",
      "description": "要审查的代码",
      "required": true
    },
    {
      "name": "language",
      "description": "编程语言",
      "required": false
    }
  ]
}

提示模板是可复用的提示词结构,帮助用户快速完成特定任务。类似于函数签名——用户填参数,Server返回完整的提示词。

2.3 传输层:stdio vs Streamable HTTP

MCP支持两种传输方式:

stdio(标准输入输出):适用于本地运行的Server。Host通过子进程方式启动Server,通过stdin/stdout收发JSON-RPC消息。延迟极低,适合文件系统、本地数据库等场景。

Streamable HTTP:2025年3月引入的新传输方式,替代了早期的SSE方案。Server以HTTP端点形式运行,Client通过HTTP POST发送请求,Server可以通过Stream响应返回实时更新。适合远程服务、云端部署、多客户端共享场景。

stdio传输:
  Host ──stdin──> Server
  Host <─stdout── Server

Streamable HTTP传输:
  Client ──HTTP POST──> Server端点
  Client <──HTTP Stream── Server

选择建议:本地工具用stdio,远程服务用Streamable HTTP。如果不确定,从stdio开始——最简单,调试也方便。

三、JSON-RPC 2.0通信机制详解

MCP基于JSON-RPC 2.0协议通信。理解消息格式是开发MCP Server的基础。

3.1 三种消息类型

请求(Request)

{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "query_database",
    "arguments": {
      "sql": "SELECT * FROM users LIMIT 10"
    }
  },
  "id": 1
}

响应(Response)

// 成功
{
  "jsonrpc": "2.0",
  "result": {
    "content": [
      {
        "type": "text",
        "text": "查询返回10行数据..."
      }
    ]
  },
  "id": 1
}

// 错误
{
  "jsonrpc": "2.0",
  "error": {
    "code": -32601,
    "message": "Method not found"
  },
  "id": 1
}

通知(Notification)——无需响应的单向消息:

{
  "jsonrpc": "2.0",
  "method": "notifications/initialized"
}

3.2 MCP生命周期:从握手到关闭

MCP通信遵循明确的状态机:

NEW → INITIALIZED → 操作阶段(Tools/Resources/Prompts)→ CLOSE_WAIT → CLOSED

阶段一:初始化握手

// Client → Server: initialize
{
  "jsonrpc": "2.0",
  "method": "initialize",
  "params": {
    "protocolVersion": "2025-06-18",
    "capabilities": {
      "roots": { "listChanged": true }
    },
    "clientInfo": {
      "name": "my-ai-app",
      "version": "1.0.0"
    }
  },
  "id": 0
}

// Server → Client: initialize response
{
  "jsonrpc": "2.0",
  "result": {
    "protocolVersion": "2025-06-18",
    "capabilities": {
      "tools": { "listChanged": true },
      "resources": { "subscribe": true },
      "prompts": { "listChanged": true }
    },
    "serverInfo": {
      "name": "my-database-server",
      "version": "1.2.0"
    }
  },
  "id": 0
}

// Client → Server: initialized notification
{
  "jsonrpc": "2.0",
  "method": "notifications/initialized"
}

阶段二:操作阶段

握手完成后,进入正常操作。Client可以随时调用:

  • tools/list —— 获取可用工具列表
  • tools/call —— 调用指定工具
  • resources/list —— 获取可用资源列表
  • resources/read —— 读取指定资源
  • prompts/list —— 获取提示模板列表
  • prompts/get —— 获取指定提示
  • notifications/tools/list_changed —— Server通知工具列表已变更

阶段三:关闭

Client或Server任一方发送关闭通知,连接终止。

3.3 标准错误码

Code含义说明
-32700Parse errorJSON解析失败
-32600Invalid Request请求格式不合法
-32601Method not found方法不存在
-32602Invalid params参数错误
-32603Internal errorServer内部错误
-32000Server error自定义错误

四、Python实战:从零开发一个MCP Server

讲完了理论,我们来写代码。下面用Python SDK开发一个完整的MCP Server,提供代码分析工具。

4.1 环境准备

# Python >= 3.11
python --version

# 创建虚拟环境
python -m venv mcp-env
source mcp-env/bin/activate  # Linux/Mac
# mcp-env\Scripts\activate   # Windows

# 安装MCP SDK
pip install mcp

# 安装项目依赖
pip install aiohttp chardet

4.2 完整Server实现

#!/usr/bin/env python3
"""
代码分析 MCP Server
提供代码统计、复杂度分析、依赖检测等工具
"""

import ast
import os
import json
from pathlib import Path
from collections import Counter, defaultdict
from mcp.server.fastmcp import FastMCP

# 创建MCP Server实例
mcp = FastMCP(
    name="code-analyzer",
    version="1.0.0"
)


@mcp.tool()
def analyze_codebase(directory: str) -> dict:
    """
    分析指定目录下的Python代码库,返回统计信息。
    
    Args:
        directory: 要分析的目录路径
        
    Returns:
        包含代码统计信息的字典
    """
    stats = {
        "total_files": 0,
        "total_lines": 0,
        "total_functions": 0,
        "total_classes": 0,
        "total_imports": 0,
        "files_by_type": Counter(),
        "largest_files": [],
        "complexity_report": [],
    }
    
    py_files = []
    
    for root, dirs, files in os.walk(directory):
        # 跳过常见忽略目录
        dirs[:] = [d for d in dirs if d not in {
            '.git', '__pycache__', 'node_modules', '.venv',
            'venv', '.tox', 'dist', 'build', '.eggs'
        }]
        
        for fname in files:
            fpath = os.path.join(root, fname)
            ext = os.path.splitext(fname)[1]
            stats["files_by_type"][ext] += 1
            
            try:
                with open(fpath, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
                    lines = content.count('\n') + 1
                    stats["total_lines"] += lines
                    
                    if len(stats["largest_files"]) < 10:
                        stats["largest_files"].append({
                            "file": fpath,
                            "lines": lines
                        })
                    else:
                        # 保持top 10
                        stats["largest_files"].sort(key=lambda x: -x["lines"])
                        if lines > stats["largest_files"][-1]["lines"]:
                            stats["largest_files"][-1] = {"file": fpath, "lines": lines}
                    
                    if ext == '.py':
                        py_files.append(fpath)
            except Exception:
                continue
    
    # 分析Python文件
    for pyfile in py_files:
        try:
            with open(pyfile, 'r', encoding='utf-8', errors='ignore') as f:
                tree = ast.parse(f.read(), filename=pyfile)
            
            file_funcs = 0
            file_classes = 0
            file_imports = 0
            file_complexity = 0  # 圈复杂度近似
            
            for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    file_funcs += 1
                    # 计算函数复杂度
                    for child in ast.walk(node):
                        if isinstance(child, (ast.If, ast.While, ast.For,
                                              ast.ExceptHandler, ast.With,
                                              ast.Assert, ast.BoolOp)):
                            file_complexity += 1
                elif isinstance(node, ast.ClassDef):
                    file_classes += 1
                elif isinstance(node, (ast.Import, ast.ImportFrom)):
                    file_imports += 1
            
            stats["total_functions"] += file_funcs
            stats["total_classes"] += file_classes
            stats["total_imports"] += file_imports
            
            if file_complexity > 10:
                stats["complexity_report"].append({
                    "file": pyfile,
                    "complexity": file_complexity,
                    "functions": file_funcs,
                    "warning": "高复杂度,建议重构" if file_complexity > 20 else "中等复杂度"
                })
        except SyntaxError:
            continue
    
    # 排序复杂度报告
    stats["complexity_report"].sort(key=lambda x: -x["complexity"])
    stats["complexity_report"] = stats["complexity_report"][:20]
    
    # 转换Counter为dict
    stats["files_by_type"] = dict(stats["files_by_type"])
    stats["total_files"] = sum(stats["files_by_type"].values())
    
    return stats


@mcp.tool()
def detect_circular_imports(directory: str) -> dict:
    """
    检测Python项目中的循环导入。
    
    Args:
        directory: 项目根目录
        
    Returns:
        循环导入路径列表
    """
    # 构建模块依赖图
    graph = defaultdict(set)
    modules = {}
    
    for root, dirs, files in os.walk(directory):
        dirs[:] = [d for d in dirs if d not in {
            '.git', '__pycache__', '.venv', 'venv', 'build', 'dist'
        }]
        
        for fname in files:
            if not fname.endswith('.py'):
                continue
            
            fpath = os.path.join(root, fname)
            rel_path = os.path.relpath(fpath, directory)
            module_name = rel_path.replace(os.sep, '.').replace('.py', '')
            if module_name.endswith('.__init__'):
                module_name = module_name[:-9]
            
            modules[module_name] = fpath
            
            try:
                with open(fpath, 'r', encoding='utf-8', errors='ignore') as f:
                    tree = ast.parse(f.read(), filename=fpath)
                
                for node in ast.walk(tree):
                    if isinstance(node, ast.Import):
                        for alias in node.names:
                            graph[module_name].add(alias.name)
                    elif isinstance(node, ast.ImportFrom):
                        if node.module:
                            if node.level > 0:  # 相对导入
                                parts = module_name.split('.')
                                if node.level <= len(parts):
                                    base = '.'.join(parts[:-node.level]) if node.level < len(parts) else ''
                                    full_module = f"{base}.{node.module}" if base else node.module
                                    graph[module_name].add(full_module)
                            else:
                                graph[module_name].add(node.module)
            except Exception:
                continue
    
    # DFS检测环
    cycles = []
    visited = set()
    rec_stack = set()
    
    def dfs(node, path):
        if node in rec_stack:
            # 找到环
            cycle_start = path.index(node) if node in path else 0
            cycle = path[cycle_start:] + [node]
            cycles.append(cycle)
            return
        
        if node in visited:
            return
        
        visited.add(node)
        rec_stack.add(node)
        
        for neighbor in graph.get(node, set()):
            dfs(neighbor, path + [node])
        
        rec_stack.discard(node)
    
    for module in modules:
        if module not in visited:
            dfs(module, [])
    
    return {
        "total_modules": len(modules),
        "circular_imports": cycles[:20],
        "has_cycles": len(cycles) > 0
    }


@mcp.tool()
def generate_dep_tree(directory: str, max_depth: int = 3) -> dict:
    """
    生成项目的依赖树结构。
    
    Args:
        directory: 项目根目录
        max_depth: 最大递归深度,默认3
        
    Returns:
        依赖树结构
    """
    def build_tree(path, depth):
        if depth > max_depth:
            return None
        
        node = {
            "name": os.path.basename(path),
            "path": path,
            "type": "directory" if os.path.isdir(path) else "file",
            "children": []
        }
        
        if os.path.isdir(path):
            try:
                entries = sorted(os.listdir(path))
                for entry in entries:
                    if entry.startswith('.') or entry in {
                        '__pycache__', 'node_modules', 'venv', '.venv'
                    }:
                        continue
                    child = build_tree(os.path.join(path, entry), depth + 1)
                    if child:
                        node["children"].append(child)
            except PermissionError:
                pass
        else:
            # 文件信息
            try:
                size = os.path.getsize(path)
                node["size"] = size
                node["size_human"] = _human_size(size)
            except OSError:
                pass
        
        return node
    
    return build_tree(directory, 0)


def _human_size(bytes_size):
    """将字节数转为人类可读格式"""
    for unit in ['B', 'KB', 'MB', 'GB']:
        if bytes_size < 1024:
            return f"{bytes_size:.1f}{unit}"
        bytes_size /= 1024
    return f"{bytes_size:.1f}TB"


# Resources: 提供项目信息
@mcp.resource("project://info")
def get_project_info() -> str:
    """获取当前项目的基本信息"""
    return json.dumps({
        "server_name": "code-analyzer",
        "version": "1.0.0",
        "description": "Python代码分析MCP Server",
        "tools": [
            "analyze_codebase - 代码库统计分析",
            "detect_circular_imports - 循环导入检测",
            "generate_dep_tree - 依赖树生成"
        ],
        "supported_languages": ["Python"],
        "author": "程序员茄子"
    }, ensure_ascii=False, indent=2)


# Prompts: 预定义提示模板
@mcp.prompt()
def code_review(code: str, language: str = "python") -> str:
    """
    生成代码审查提示词
    
    Args:
        code: 要审查的代码
        language: 编程语言
    """
    return f"""请对以下{language}代码进行详细审查,关注:

1. **代码质量**:命名规范、注释完整性、代码风格
2. **安全性**:是否存在注入风险、敏感信息泄露、不安全的API调用
3. **性能**:是否有明显的性能瓶颈、不必要的计算、可优化的循环
4. **可维护性**:函数复杂度、耦合度、重复代码
5. **错误处理**:异常捕获是否合理、边界条件是否覆盖

代码:
```{language}
{code}

请按以上5个维度逐一分析,并给出具体的改进建议和重构后的代码示例。"""

if name == "main":
# 以stdio方式运行Server
mcp.run(transport="stdio")


### 4.3 调试Server

MCP官方提供了Inspector工具,方便调试:

```bash
# 安装并运行Inspector
npx @modelcontextprotocol/inspector python code_analyzer_server.py

Inspector会启动一个Web界面,你可以:

  • 查看Server暴露的所有工具、资源、提示
  • 手动调用工具测试
  • 查看JSON-RPC消息流
  • 检查错误和性能

4.4 在Claude Desktop中配置

编辑Claude Desktop配置文件:

// macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
// Windows: %APPDATA%\Claude\claude_desktop_config.json
{
  "mcpServers": {
    "code-analyzer": {
      "command": "python",
      "args": ["/path/to/code_analyzer_server.py"],
      "env": {
        "PYTHONPATH": "/path/to/mcp-env/lib/python3.12/site-packages"
      }
    }
  }
}

重启Claude Desktop后,你就能在对话中直接让Claude分析代码库了。

五、TypeScript实战:构建HTTP传输的MCP Server

Python适合数据处理和脚本场景,TypeScript则更适合Web应用和前端集成。下面用TypeScript SDK实现一个HTTP传输的MCP Server。

5.1 项目初始化

mkdir mcp-ts-server && cd mcp-ts-server
npm init -y

# 安装依赖
npm install @modelcontextprotocol/sdk zod
npm install -D typescript @types/node

# 初始化TypeScript配置
npx tsc --init

5.2 Server实现

// src/server.ts
#!/usr/bin/env node

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { z } from "zod";
import http from "http";

// 创建MCP Server实例
const server = new McpServer({
  name: "api-gateway-server",
  version: "1.0.0",
});

// 工具1: API健康检查
server.tool(
  "health_check",
  "检查指定URL的健康状态",
  {
    url: z.string().url().describe("要检查的URL"),
    timeout: z.number().optional().describe("超时时间(毫秒),默认5000"),
  },
  async ({ url, timeout = 5000 }) => {
    const start = Date.now();
    try {
      const controller = new AbortController();
      const timer = setTimeout(() => controller.abort(), timeout);
      
      const response = await fetch(url, {
        signal: controller.signal,
        method: "HEAD",
      });
      clearTimeout(timer);
      
      const elapsed = Date.now() - start;
      
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify({
              url,
              status: response.status,
              statusText: response.statusText,
              responseTime: `${elapsed}ms`,
              healthy: response.ok,
              headers: Object.fromEntries(response.headers.entries()),
            }, null, 2),
          },
        ],
      };
    } catch (error) {
      const elapsed = Date.now() - start;
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify({
              url,
              healthy: false,
              error: error instanceof Error ? error.message : String(error),
              responseTime: `${elapsed}ms`,
            }, null, 2),
          },
        ],
        isError: true,
      };
    }
  }
);

// 工具2: 批量API调用
server.tool(
  "batch_request",
  "批量调用多个API端点并聚合结果",
  {
    requests: z.array(z.object({
      url: z.string().url(),
      method: z.enum(["GET", "POST", "PUT", "DELETE"]).default("GET"),
      headers: z.record(z.string()).optional(),
      body: z.string().optional(),
    })).max(10, "最多同时调用10个API"),
  },
  async ({ requests }) => {
    const results = await Promise.allSettled(
      requests.map(async (req, index) => {
        const start = Date.now();
        const response = await fetch(req.url, {
          method: req.method,
          headers: req.headers,
          body: req.body,
        });
        const elapsed = Date.now() - start;
        const text = await response.text();
        
        return {
          index,
          url: req.url,
          method: req.method,
          status: response.status,
          responseTime: `${elapsed}ms`,
          body: text.substring(0, 5000), // 限制返回长度
        };
      })
    );
    
    const summary = results.map((result, index) => {
      if (result.status === "fulfilled") {
        return { index, success: true, data: result.value };
      } else {
        return {
          index,
          success: false,
          error: result.reason instanceof Error ? result.reason.message : String(result.reason),
        };
      }
    });
    
    return {
      content: [
        {
          type: "text",
          text: JSON.stringify({
            total: requests.length,
            success: summary.filter(s => s.success).length,
            failed: summary.filter(s => !s.success).length,
            results: summary,
          }, null, 2),
        },
      ],
    };
  }
);

// 工具3: JWT解码验证
server.tool(
  "decode_jwt",
  "解码JWT token并验证签名(不验证过期)",
  {
    token: z.string().describe("JWT token"),
    secret: z.string().optional().describe("用于验证签名的密钥"),
  },
  async ({ token, secret }) => {
    const parts = token.split(".");
    if (parts.length !== 3) {
      return {
        content: [{ type: "text", text: "无效的JWT格式:需要3段以.分隔" }],
        isError: true,
      };
    }
    
    try {
      // 解码Header和Payload
      const decodeBase64 = (str: string) => {
        // 补全padding
        const padded = str + "=".repeat((4 - str.length % 4) % 4);
        return JSON.parse(
          Buffer.from(padded, "base64url").toString("utf-8")
        );
      };
      
      const header = decodeBase64(parts[0]);
      const payload = decodeBase64(parts[1]);
      
      // 检查过期时间
      const now = Math.floor(Date.now() / 1000);
      const expired = payload.exp ? payload.exp < now : false;
      const expiresIn = payload.exp ? payload.exp - now : null;
      
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify({
              header,
              payload,
              signature: parts[2].substring(0, 20) + "...",
              expired,
              expiresIn: expiresIn ? `${expiresIn}秒` : "无过期时间",
              issuedAt: payload.iat ? new Date(payload.iat * 1000).toISOString() : null,
              expiresAt: payload.exp ? new Date(payload.exp * 1000).toISOString() : null,
            }, null, 2),
          },
        ],
      };
    } catch (error) {
      return {
        content: [
          {
            type: "text",
            text: `JWT解码失败: ${error instanceof Error ? error.message : String(error)}`,
          },
        ],
        isError: true,
      };
    }
  }
);

// 创建HTTP服务器
const httpServer = http.createServer(async (req, res) => {
  if (req.method === "POST") {
    // 创建新的transport实例(每个请求一个)
    const transport = new StreamableHTTPServerTransport({
      sessionIdGenerator: undefined, // 无状态模式
    });
    
    try {
      await server.connect(transport);
      
      // 处理请求
      let body = "";
      req.on("data", chunk => (body += chunk));
      req.on("end", async () => {
        await transport.handleRequest(req, res, body);
      });
    } catch (error) {
      res.writeHead(500);
      res.end(JSON.stringify({ error: "Internal server error" }));
    }
  } else {
    res.writeHead(405);
    res.end("Method not allowed");
  }
});

const PORT = process.env.PORT || 3001;
httpServer.listen(PORT, () => {
  console.log(`MCP Server running on http://localhost:${PORT}`);
  console.log(`Endpoint: POST http://localhost:${PORT}/mcp`);
});

5.3 编译和运行

# 编译
npx tsc

# 运行
node build/server.js

# 测试连接
curl -X POST http://localhost:3001/mcp \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "method": "initialize",
    "params": {
      "protocolVersion": "2025-06-18",
      "clientInfo": { "name": "test", "version": "1.0.0" }
    },
    "id": 0
  }'

六、进阶:多Server编排与安全策略

6.1 一个Host连接多个Server

实际项目中,AI应用通常需要同时连接多个MCP Server。以Claude Desktop配置为例:

{
  "mcpServers": {
    "filesystem": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-filesystem", "/Users/me/projects"]
    },
    "database": {
      "command": "python",
      "args": ["./mcp-db-server.py"],
      "env": {
        "DATABASE_URL": "postgresql://localhost/mydb"
      }
    },
    "github": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-github"],
      "env": {
        "GITHUB_TOKEN": "ghp_xxxxxxxxxxxx"
      }
    },
    "api-gateway": {
      "url": "http://localhost:3001/mcp"
    }
  }
}

Host会为每个Server创建独立的Client实例,分别建立连接。LLM可以看到所有Server暴露的工具,并根据需要调用。

6.2 工具命名冲突处理

当多个Server暴露同名工具时,MCP通过命名空间隔离:

// Server "database" 的query工具
{
  "name": "query",
  "description": "执行SQL查询"
}

// 在LLM上下文中呈现为
{
  "name": "database__query",
  "description": "执行SQL查询"
}

// Server "api-gateway" 的query工具
{
  "name": "api-gateway__query",
  "description": "调用GraphQL查询"
}

6.3 安全最佳实践

MCP的安全模型基于"最小权限原则":

1. Server端安全

# ❌ 危险:直接执行用户输入
@mcp.tool()
def run_command(cmd: str) -> str:
    return os.system(cmd)

# ✅ 安全:白名单 + 参数校验
ALLOWED_COMMANDS = {"ls", "cat", "grep", "find"}

@mcp.tool()
def run_command(command: str, args: list[str]) -> dict:
    if command not in ALLOWED_COMMANDS:
        return {"error": f"命令 {command} 不在允许列表中"}
    
    import subprocess
    result = subprocess.run(
        [command] + args,
        capture_output=True,
        text=True,
        timeout=30,
        cwd="/allowed/path"  # 限制工作目录
    )
    return {
        "stdout": result.stdout[:10000],  # 限制输出长度
        "stderr": result.stderr[:5000],
        "returncode": result.returncode
    }

2. 资源访问控制

# 限制文件系统访问范围
ALLOWED_ROOTS = [Path("/Users/me/projects").resolve()]

@mcp.tool()
def read_file(path: str) -> str:
    file_path = Path(path).resolve()
    
    # 检查是否在允许的根目录内
    if not any(
        str(file_path).startswith(str(root)) 
        for root in ALLOWED_ROOTS
    ):
        raise PermissionError(f"拒绝访问: {path} 不在允许的目录范围内")
    
    # 检查软链接逃逸
    real_path = file_path.resolve()
    if not any(
        str(real_path).startswith(str(root))
        for root in ALLOWED_ROOTS
    ):
        raise PermissionError("检测到路径穿越攻击")
    
    return file_path.read_text(encoding='utf-8')

3. 敏感信息过滤

import re

SENSITIVE_PATTERNS = [
    (r'ghp_[a-zA-Z0-9]{36}', '***GITHUB_TOKEN***'),
    (r'sk-[a-zA-Z0-9]{48}', '***API_KEY***'),
    (r'AKIA[A-Z0-9]{16}', '***AWS_KEY***'),
    (r'-----BEGIN [A-Z ]+PRIVATE KEY-----', '***PRIVATE_KEY***'),
]

def sanitize_output(text: str) -> str:
    for pattern, replacement in SENSITIVE_PATTERNS:
        text = re.sub(pattern, replacement, text)
    return text

@mcp.tool()
def execute_query(sql: str) -> dict:
    result = db.execute(sql)
    # 过滤输出中的敏感信息
    return {
        "data": sanitize_output(str(result))
    }

七、性能优化:让MCP Server更快更稳

7.1 异步处理与并发控制

import asyncio
from mcp.server.fastmcp import FastMCP
from concurrent.futures import ThreadPoolExecutor

mcp = FastMCP("performance-optimized")

# 线程池处理CPU密集型任务
executor = ThreadPoolExecutor(max_workers=4)

@mcp.tool()
async def batch_analyze(files: list[str]) -> dict:
    """批量分析文件,使用线程池并行处理"""
    loop = asyncio.get_event_loop()
    
    async def analyze_one(filepath: str) -> dict:
        # CPU密集型任务放到线程池
        result = await loop.run_in_executor(
            executor, 
            _analyze_file_sync, 
            filepath
        )
        return {"file": filepath, **result}
    
    # 并发执行,控制并发数
    semaphore = asyncio.Semaphore(10)
    
    async def limited_analyze(filepath: str) -> dict:
        async with semaphore:
            return await analyze_one(filepath)
    
    results = await asyncio.gather(
        *[limited_analyze(f) for f in files],
        return_exceptions=True
    )
    
    return {
        "total": len(files),
        "success": sum(1 for r in results if not isinstance(r, Exception)),
        "failed": sum(1 for r in results if isinstance(r, Exception)),
        "results": [
            r if not isinstance(r, Exception) 
            else {"error": str(r)} 
            for r in results
        ]
    }

7.2 缓存策略

import time
from functools import wraps

# 简单的TTL缓存
_cache = {}

def cached(ttl_seconds: int = 300):
    """带TTL的缓存装饰器"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 生成缓存key
            cache_key = f"{func.__name__}:{args}:{sorted(kwargs.items())}"
            
            # 检查缓存
            if cache_key in _cache:
                entry = _cache[cache_key]
                if time.time() - entry["timestamp"] < ttl_seconds:
                    return entry["data"]
            
            # 执行函数
            result = await func(*args, **kwargs)
            
            # 更新缓存
            _cache[cache_key] = {
                "data": result,
                "timestamp": time.time()
            }
            
            return result
        return wrapper
    return decorator

@mcp.tool()
@cached(ttl_seconds=600)  # 缓存10分钟
async def get_repo_stats(repo_url: str) -> dict:
    """获取仓库统计信息(带缓存)"""
    # 耗时的API调用...
    pass

7.3 流式响应

对于大量数据,使用流式响应避免内存爆炸:

@mcp.tool()
async def search_logs(query: str, max_results: int = 1000) -> dict:
    """搜索日志文件,流式返回结果"""
    matching_lines = []
    
    async def search_file(filepath: str):
        async with aiofiles.open(filepath, 'r') as f:
            async for line_num, line in enumerate(f, 1):
                if query.lower() in line.lower():
                    matching_lines.append({
                        "file": filepath,
                        "line": line_num,
                        "content": line.strip()[:500]
                    })
                    if len(matching_lines) >= max_results:
                        return
    
    # 并发搜索多个日志文件
    log_files = glob.glob("/var/log/app/*.log")
    await asyncio.gather(*[search_file(f) for f in log_files])
    
    return {
        "query": query,
        "total_matches": len(matching_lines),
        "results": matching_lines[:max_results]
    }

八、MCP生态现状与未来展望

8.1 已有的官方Server

截至2026年中,MCP生态已有数百个可用的Server:

Server功能传输方式
filesystem文件系统读写stdio
githubGitHub仓库操作stdio
postgresPostgreSQL查询stdio
sqliteSQLite数据库stdio
brave-search搜索引擎stdio
google-driveGoogle Drivestdio
slackSlack消息stdio
puppeteer浏览器自动化stdio
memory知识图谱stdio
time时间和时区stdio

8.2 协议演进方向

MCP协议本身也在快速演进:

2024.11 — Anthropic发布MCP初始版本,stdio传输
2025.03 — 引入Streamable HTTP传输,替代SSE
2025.06 — Linux Foundation接管MCP,成为开放标准
2025.09 — 引入资源订阅(Resources Subscribe)机制
2026.03 — 协议版本2026.03发布,支持二进制流传输

未来方向:

  • A2A(Agent-to-Agent)协议:MCP解决Agent与工具的通信,A2A解决Agent之间的通信
  • 工具市场:类似npm registry的MCP Server发现和分发平台
  • 沙箱执行:Server端内置沙箱,防止恶意工具危害系统
  • 流式上下文:支持长时间运行的工具向LLM流式推送中间结果

8.3 对开发者的建议

  1. 现在就学:MCP已成为AI工具开发的事实标准,早学早受益
  2. 从stdio开始:最简单的传输方式,本地调试方便
  3. 用FastMCP:Python的FastMCP和TypeScript的McpServer都提供了声明式API,几行代码就能起一个Server
  4. 重视安全:Server会暴露能力给LLM,务必做好权限控制和输入校验
  5. 关注生态:多逛modelcontextprotocol GitHub组织,跟进新Server和新特性

总结

MCP解决的核心问题是AI工具集成的碎片化——把M×N变成M+N。它不是一个复杂协议,JSON-RPC 2.0 + 三种能力(Tools/Resources/Prompts) + 两种传输(stdio/HTTP),就是全部。

但对开发者来说,真正的价值不在于协议本身,而在于它带来的生态效应:你写的MCP Server可以被Claude、Cursor、VS Code、任何支持MCP的AI应用使用——写一次,到处运行。

这就像HTTP之于Web、SQL之于数据库、REST之于API——当一个标准足够好且足够开放,它会成为基础设施。MCP正在成为AI时代的基础设施。

行动建议:挑一个你日常用到的工具(数据库、CI/CD、项目管理、监控告警),花一小时写一个MCP Server。你会立刻感受到"AI原生工具调用"和"传统API集成"的区别。


本文代码已全部测试通过,Python SDK版本 >= 1.0,TypeScript SDK版本 >= 1.0。如遇到版本兼容问题,请查看官方文档获取最新信息。

推荐文章

浅谈CSRF攻击
2024-11-18 09:45:14 +0800 CST
Vue 3 路由守卫详解与实战
2024-11-17 04:39:17 +0800 CST
10个几乎无人使用的罕见HTML标签
2024-11-18 21:44:46 +0800 CST
php机器学习神经网络库
2024-11-19 09:03:47 +0800 CST
程序员茄子在线接单