MCP协议深度解析:从架构原理到AI Agent实战开发
2024年底Anthropic开源了Model Context Protocol(MCP),2025年它成为AI领域最热的协议标准,2026年已被捐赠给Linux Foundation。本文从协议规范、架构设计、通信机制到Python/TypeScript双语言实战,带你彻底搞懂MCP——不只是"USB-C接口"的比喻,而是能落地到生产环境的完整开发指南。
一、为什么MCP值得每个程序员关注
1.1 从M×N到M+N:一个真实痛点
假设你开发了一个AI助手,需要接入5个外部工具(文件系统、GitHub、数据库、Slack、搜索引擎),支持3个大模型(Claude、GPT、Gemini)。在MCP出现之前,你需要为每个模型×每个工具写一套适配代码——3×5=15个集成。
工具从5个增加到10个?模型从3个增加到5个?适配代码指数级膨胀。这就是经典的M×N问题。
MCP的核心价值:把M×N变成M+N。每个工具只需要实现一个MCP Server,每个模型只需要实现一个MCP Client,就能互相连接。
没有MCP:
App A → 自定义代码 → 工具1
App A → 自定义代码 → 工具2
App B → 自定义代码 → 工具1
App B → 自定义代码 → 工具2
→ M × N 个集成
有了MCP:
App A → MCP Client → MCP Server 1(工具1)
App A → MCP Client → MCP Server 2(工具2)
App B → MCP Client → MCP Server 1(工具1)
→ M + N 个实现
1.2 MCP vs Function Calling vs 插件系统
很多人会问:Function Calling不是已经能调用外部工具了吗?为什么还需要MCP?
| 维度 | Function Calling | ChatGPT 插件 | MCP |
|---|---|---|---|
| 标准化 | 无,各厂商私有 | OpenAI专有 | 开放标准(Linux Foundation) |
| 跨模型 | 不支持 | 不支持 | 支持 |
| 动态发现 | 不支持,需预设 | 支持 | 支持(tools/list) |
| 安全边界 | 模糊 | 模糊 | 明确(Server主动暴露,Client授权) |
| 传输方式 | API调用 | HTTP | stdio / Streamable HTTP |
| 生态共享 | 不行 | 不行 | 一个Server所有Client通用 |
关键区别:Function Calling是"把工具描述塞进prompt",MCP是"让工具自己说话"。前者依赖模型理解你的描述,后者让工具自描述、自管理、自治理。
二、MCP协议架构全解
2.1 三层架构:Host / Client / Server
MCP采用Host-Client-Server三层架构:
┌─────────────────────────────────────────────┐
│ MCP Host │
│ (Claude Desktop / Cursor / 你的AI应用) │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │MCP Client│ │MCP Client│ │MCP Client│ │
│ └─────┬────┘ └─────┬────┘ └─────┬────┘ │
└────────┼─────────────┼─────────────┼───────┘
│ │ │
JSON-RPC 2.0 JSON-RPC 2.0 JSON-RPC 2.0
(stdio/SSE) (stdio/SSE) (stdio/SSE)
│ │ │
┌────────┴────┐ ┌──────┴─────┐ ┌────┴───────┐
│ MCP Server │ │ MCP Server │ │ MCP Server │
│ (文件系统) │ │ (数据库) │ │ (GitHub) │
└─────────────┘ └────────────┘ └────────────┘
Host(主机):运行AI模型的宿主程序。Claude Desktop、Cursor、VS Code中的AI插件都是Host。一个Host可以同时连接多个MCP Server。
Client(客户端):嵌入在Host中的协议客户端。每个Client与一个Server保持1:1连接,负责收发JSON-RPC消息。Host内部可以运行多个Client实例。
Server(服务器):独立运行的轻量程序,封装特定能力(文件操作、数据库查询、API调用等),通过标准MCP接口对外暴露。Server是"适配器"——把底层服务的功能翻译成MCP标准格式。
2.2 三大核心能力:Tools / Resources / Prompts
MCP Server可以提供三种类型的能力:
1. Tools(工具)——可被LLM调用的函数
{
"name": "query_database",
"description": "执行SQL查询并返回结果",
"inputSchema": {
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "要执行的SQL语句"
},
"limit": {
"type": "integer",
"description": "返回行数限制",
"default": 100
}
},
"required": ["sql"]
}
}
工具是MCP最常用的能力类型。LLM根据工具的name和description判断何时调用,根据inputSchema构造参数。工具可以产生副作用(写文件、发消息、改数据库),所以需要用户授权。
2. Resources(资源)——可被客户端读取的数据
{
"uri": "file:///project/README.md",
"name": "项目README",
"description": "项目的说明文档",
"mimeType": "text/markdown"
}
资源是只读的上下文数据,类似GET端点。客户端可以把资源内容注入到LLM的上下文中。适合文件内容、API响应、数据库快照等场景。
3. Prompts(提示模板)——预定义的交互模式
{
"name": "code_review",
"description": "对代码进行审查",
"arguments": [
{
"name": "code",
"description": "要审查的代码",
"required": true
},
{
"name": "language",
"description": "编程语言",
"required": false
}
]
}
提示模板是可复用的提示词结构,帮助用户快速完成特定任务。类似于函数签名——用户填参数,Server返回完整的提示词。
2.3 传输层:stdio vs Streamable HTTP
MCP支持两种传输方式:
stdio(标准输入输出):适用于本地运行的Server。Host通过子进程方式启动Server,通过stdin/stdout收发JSON-RPC消息。延迟极低,适合文件系统、本地数据库等场景。
Streamable HTTP:2025年3月引入的新传输方式,替代了早期的SSE方案。Server以HTTP端点形式运行,Client通过HTTP POST发送请求,Server可以通过Stream响应返回实时更新。适合远程服务、云端部署、多客户端共享场景。
stdio传输:
Host ──stdin──> Server
Host <─stdout── Server
Streamable HTTP传输:
Client ──HTTP POST──> Server端点
Client <──HTTP Stream── Server
选择建议:本地工具用stdio,远程服务用Streamable HTTP。如果不确定,从stdio开始——最简单,调试也方便。
三、JSON-RPC 2.0通信机制详解
MCP基于JSON-RPC 2.0协议通信。理解消息格式是开发MCP Server的基础。
3.1 三种消息类型
请求(Request):
{
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "query_database",
"arguments": {
"sql": "SELECT * FROM users LIMIT 10"
}
},
"id": 1
}
响应(Response):
// 成功
{
"jsonrpc": "2.0",
"result": {
"content": [
{
"type": "text",
"text": "查询返回10行数据..."
}
]
},
"id": 1
}
// 错误
{
"jsonrpc": "2.0",
"error": {
"code": -32601,
"message": "Method not found"
},
"id": 1
}
通知(Notification)——无需响应的单向消息:
{
"jsonrpc": "2.0",
"method": "notifications/initialized"
}
3.2 MCP生命周期:从握手到关闭
MCP通信遵循明确的状态机:
NEW → INITIALIZED → 操作阶段(Tools/Resources/Prompts)→ CLOSE_WAIT → CLOSED
阶段一:初始化握手
// Client → Server: initialize
{
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {
"roots": { "listChanged": true }
},
"clientInfo": {
"name": "my-ai-app",
"version": "1.0.0"
}
},
"id": 0
}
// Server → Client: initialize response
{
"jsonrpc": "2.0",
"result": {
"protocolVersion": "2025-06-18",
"capabilities": {
"tools": { "listChanged": true },
"resources": { "subscribe": true },
"prompts": { "listChanged": true }
},
"serverInfo": {
"name": "my-database-server",
"version": "1.2.0"
}
},
"id": 0
}
// Client → Server: initialized notification
{
"jsonrpc": "2.0",
"method": "notifications/initialized"
}
阶段二:操作阶段
握手完成后,进入正常操作。Client可以随时调用:
tools/list—— 获取可用工具列表tools/call—— 调用指定工具resources/list—— 获取可用资源列表resources/read—— 读取指定资源prompts/list—— 获取提示模板列表prompts/get—— 获取指定提示notifications/tools/list_changed—— Server通知工具列表已变更
阶段三:关闭
Client或Server任一方发送关闭通知,连接终止。
3.3 标准错误码
| Code | 含义 | 说明 |
|---|---|---|
| -32700 | Parse error | JSON解析失败 |
| -32600 | Invalid Request | 请求格式不合法 |
| -32601 | Method not found | 方法不存在 |
| -32602 | Invalid params | 参数错误 |
| -32603 | Internal error | Server内部错误 |
| -32000 | Server error | 自定义错误 |
四、Python实战:从零开发一个MCP Server
讲完了理论,我们来写代码。下面用Python SDK开发一个完整的MCP Server,提供代码分析工具。
4.1 环境准备
# Python >= 3.11
python --version
# 创建虚拟环境
python -m venv mcp-env
source mcp-env/bin/activate # Linux/Mac
# mcp-env\Scripts\activate # Windows
# 安装MCP SDK
pip install mcp
# 安装项目依赖
pip install aiohttp chardet
4.2 完整Server实现
#!/usr/bin/env python3
"""
代码分析 MCP Server
提供代码统计、复杂度分析、依赖检测等工具
"""
import ast
import os
import json
from pathlib import Path
from collections import Counter, defaultdict
from mcp.server.fastmcp import FastMCP
# 创建MCP Server实例
mcp = FastMCP(
name="code-analyzer",
version="1.0.0"
)
@mcp.tool()
def analyze_codebase(directory: str) -> dict:
"""
分析指定目录下的Python代码库,返回统计信息。
Args:
directory: 要分析的目录路径
Returns:
包含代码统计信息的字典
"""
stats = {
"total_files": 0,
"total_lines": 0,
"total_functions": 0,
"total_classes": 0,
"total_imports": 0,
"files_by_type": Counter(),
"largest_files": [],
"complexity_report": [],
}
py_files = []
for root, dirs, files in os.walk(directory):
# 跳过常见忽略目录
dirs[:] = [d for d in dirs if d not in {
'.git', '__pycache__', 'node_modules', '.venv',
'venv', '.tox', 'dist', 'build', '.eggs'
}]
for fname in files:
fpath = os.path.join(root, fname)
ext = os.path.splitext(fname)[1]
stats["files_by_type"][ext] += 1
try:
with open(fpath, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
lines = content.count('\n') + 1
stats["total_lines"] += lines
if len(stats["largest_files"]) < 10:
stats["largest_files"].append({
"file": fpath,
"lines": lines
})
else:
# 保持top 10
stats["largest_files"].sort(key=lambda x: -x["lines"])
if lines > stats["largest_files"][-1]["lines"]:
stats["largest_files"][-1] = {"file": fpath, "lines": lines}
if ext == '.py':
py_files.append(fpath)
except Exception:
continue
# 分析Python文件
for pyfile in py_files:
try:
with open(pyfile, 'r', encoding='utf-8', errors='ignore') as f:
tree = ast.parse(f.read(), filename=pyfile)
file_funcs = 0
file_classes = 0
file_imports = 0
file_complexity = 0 # 圈复杂度近似
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
file_funcs += 1
# 计算函数复杂度
for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For,
ast.ExceptHandler, ast.With,
ast.Assert, ast.BoolOp)):
file_complexity += 1
elif isinstance(node, ast.ClassDef):
file_classes += 1
elif isinstance(node, (ast.Import, ast.ImportFrom)):
file_imports += 1
stats["total_functions"] += file_funcs
stats["total_classes"] += file_classes
stats["total_imports"] += file_imports
if file_complexity > 10:
stats["complexity_report"].append({
"file": pyfile,
"complexity": file_complexity,
"functions": file_funcs,
"warning": "高复杂度,建议重构" if file_complexity > 20 else "中等复杂度"
})
except SyntaxError:
continue
# 排序复杂度报告
stats["complexity_report"].sort(key=lambda x: -x["complexity"])
stats["complexity_report"] = stats["complexity_report"][:20]
# 转换Counter为dict
stats["files_by_type"] = dict(stats["files_by_type"])
stats["total_files"] = sum(stats["files_by_type"].values())
return stats
@mcp.tool()
def detect_circular_imports(directory: str) -> dict:
"""
检测Python项目中的循环导入。
Args:
directory: 项目根目录
Returns:
循环导入路径列表
"""
# 构建模块依赖图
graph = defaultdict(set)
modules = {}
for root, dirs, files in os.walk(directory):
dirs[:] = [d for d in dirs if d not in {
'.git', '__pycache__', '.venv', 'venv', 'build', 'dist'
}]
for fname in files:
if not fname.endswith('.py'):
continue
fpath = os.path.join(root, fname)
rel_path = os.path.relpath(fpath, directory)
module_name = rel_path.replace(os.sep, '.').replace('.py', '')
if module_name.endswith('.__init__'):
module_name = module_name[:-9]
modules[module_name] = fpath
try:
with open(fpath, 'r', encoding='utf-8', errors='ignore') as f:
tree = ast.parse(f.read(), filename=fpath)
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
graph[module_name].add(alias.name)
elif isinstance(node, ast.ImportFrom):
if node.module:
if node.level > 0: # 相对导入
parts = module_name.split('.')
if node.level <= len(parts):
base = '.'.join(parts[:-node.level]) if node.level < len(parts) else ''
full_module = f"{base}.{node.module}" if base else node.module
graph[module_name].add(full_module)
else:
graph[module_name].add(node.module)
except Exception:
continue
# DFS检测环
cycles = []
visited = set()
rec_stack = set()
def dfs(node, path):
if node in rec_stack:
# 找到环
cycle_start = path.index(node) if node in path else 0
cycle = path[cycle_start:] + [node]
cycles.append(cycle)
return
if node in visited:
return
visited.add(node)
rec_stack.add(node)
for neighbor in graph.get(node, set()):
dfs(neighbor, path + [node])
rec_stack.discard(node)
for module in modules:
if module not in visited:
dfs(module, [])
return {
"total_modules": len(modules),
"circular_imports": cycles[:20],
"has_cycles": len(cycles) > 0
}
@mcp.tool()
def generate_dep_tree(directory: str, max_depth: int = 3) -> dict:
"""
生成项目的依赖树结构。
Args:
directory: 项目根目录
max_depth: 最大递归深度,默认3
Returns:
依赖树结构
"""
def build_tree(path, depth):
if depth > max_depth:
return None
node = {
"name": os.path.basename(path),
"path": path,
"type": "directory" if os.path.isdir(path) else "file",
"children": []
}
if os.path.isdir(path):
try:
entries = sorted(os.listdir(path))
for entry in entries:
if entry.startswith('.') or entry in {
'__pycache__', 'node_modules', 'venv', '.venv'
}:
continue
child = build_tree(os.path.join(path, entry), depth + 1)
if child:
node["children"].append(child)
except PermissionError:
pass
else:
# 文件信息
try:
size = os.path.getsize(path)
node["size"] = size
node["size_human"] = _human_size(size)
except OSError:
pass
return node
return build_tree(directory, 0)
def _human_size(bytes_size):
"""将字节数转为人类可读格式"""
for unit in ['B', 'KB', 'MB', 'GB']:
if bytes_size < 1024:
return f"{bytes_size:.1f}{unit}"
bytes_size /= 1024
return f"{bytes_size:.1f}TB"
# Resources: 提供项目信息
@mcp.resource("project://info")
def get_project_info() -> str:
"""获取当前项目的基本信息"""
return json.dumps({
"server_name": "code-analyzer",
"version": "1.0.0",
"description": "Python代码分析MCP Server",
"tools": [
"analyze_codebase - 代码库统计分析",
"detect_circular_imports - 循环导入检测",
"generate_dep_tree - 依赖树生成"
],
"supported_languages": ["Python"],
"author": "程序员茄子"
}, ensure_ascii=False, indent=2)
# Prompts: 预定义提示模板
@mcp.prompt()
def code_review(code: str, language: str = "python") -> str:
"""
生成代码审查提示词
Args:
code: 要审查的代码
language: 编程语言
"""
return f"""请对以下{language}代码进行详细审查,关注:
1. **代码质量**:命名规范、注释完整性、代码风格
2. **安全性**:是否存在注入风险、敏感信息泄露、不安全的API调用
3. **性能**:是否有明显的性能瓶颈、不必要的计算、可优化的循环
4. **可维护性**:函数复杂度、耦合度、重复代码
5. **错误处理**:异常捕获是否合理、边界条件是否覆盖
代码:
```{language}
{code}
请按以上5个维度逐一分析,并给出具体的改进建议和重构后的代码示例。"""
if name == "main":
# 以stdio方式运行Server
mcp.run(transport="stdio")
### 4.3 调试Server
MCP官方提供了Inspector工具,方便调试:
```bash
# 安装并运行Inspector
npx @modelcontextprotocol/inspector python code_analyzer_server.py
Inspector会启动一个Web界面,你可以:
- 查看Server暴露的所有工具、资源、提示
- 手动调用工具测试
- 查看JSON-RPC消息流
- 检查错误和性能
4.4 在Claude Desktop中配置
编辑Claude Desktop配置文件:
// macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
// Windows: %APPDATA%\Claude\claude_desktop_config.json
{
"mcpServers": {
"code-analyzer": {
"command": "python",
"args": ["/path/to/code_analyzer_server.py"],
"env": {
"PYTHONPATH": "/path/to/mcp-env/lib/python3.12/site-packages"
}
}
}
}
重启Claude Desktop后,你就能在对话中直接让Claude分析代码库了。
五、TypeScript实战:构建HTTP传输的MCP Server
Python适合数据处理和脚本场景,TypeScript则更适合Web应用和前端集成。下面用TypeScript SDK实现一个HTTP传输的MCP Server。
5.1 项目初始化
mkdir mcp-ts-server && cd mcp-ts-server
npm init -y
# 安装依赖
npm install @modelcontextprotocol/sdk zod
npm install -D typescript @types/node
# 初始化TypeScript配置
npx tsc --init
5.2 Server实现
// src/server.ts
#!/usr/bin/env node
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { z } from "zod";
import http from "http";
// 创建MCP Server实例
const server = new McpServer({
name: "api-gateway-server",
version: "1.0.0",
});
// 工具1: API健康检查
server.tool(
"health_check",
"检查指定URL的健康状态",
{
url: z.string().url().describe("要检查的URL"),
timeout: z.number().optional().describe("超时时间(毫秒),默认5000"),
},
async ({ url, timeout = 5000 }) => {
const start = Date.now();
try {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeout);
const response = await fetch(url, {
signal: controller.signal,
method: "HEAD",
});
clearTimeout(timer);
const elapsed = Date.now() - start;
return {
content: [
{
type: "text",
text: JSON.stringify({
url,
status: response.status,
statusText: response.statusText,
responseTime: `${elapsed}ms`,
healthy: response.ok,
headers: Object.fromEntries(response.headers.entries()),
}, null, 2),
},
],
};
} catch (error) {
const elapsed = Date.now() - start;
return {
content: [
{
type: "text",
text: JSON.stringify({
url,
healthy: false,
error: error instanceof Error ? error.message : String(error),
responseTime: `${elapsed}ms`,
}, null, 2),
},
],
isError: true,
};
}
}
);
// 工具2: 批量API调用
server.tool(
"batch_request",
"批量调用多个API端点并聚合结果",
{
requests: z.array(z.object({
url: z.string().url(),
method: z.enum(["GET", "POST", "PUT", "DELETE"]).default("GET"),
headers: z.record(z.string()).optional(),
body: z.string().optional(),
})).max(10, "最多同时调用10个API"),
},
async ({ requests }) => {
const results = await Promise.allSettled(
requests.map(async (req, index) => {
const start = Date.now();
const response = await fetch(req.url, {
method: req.method,
headers: req.headers,
body: req.body,
});
const elapsed = Date.now() - start;
const text = await response.text();
return {
index,
url: req.url,
method: req.method,
status: response.status,
responseTime: `${elapsed}ms`,
body: text.substring(0, 5000), // 限制返回长度
};
})
);
const summary = results.map((result, index) => {
if (result.status === "fulfilled") {
return { index, success: true, data: result.value };
} else {
return {
index,
success: false,
error: result.reason instanceof Error ? result.reason.message : String(result.reason),
};
}
});
return {
content: [
{
type: "text",
text: JSON.stringify({
total: requests.length,
success: summary.filter(s => s.success).length,
failed: summary.filter(s => !s.success).length,
results: summary,
}, null, 2),
},
],
};
}
);
// 工具3: JWT解码验证
server.tool(
"decode_jwt",
"解码JWT token并验证签名(不验证过期)",
{
token: z.string().describe("JWT token"),
secret: z.string().optional().describe("用于验证签名的密钥"),
},
async ({ token, secret }) => {
const parts = token.split(".");
if (parts.length !== 3) {
return {
content: [{ type: "text", text: "无效的JWT格式:需要3段以.分隔" }],
isError: true,
};
}
try {
// 解码Header和Payload
const decodeBase64 = (str: string) => {
// 补全padding
const padded = str + "=".repeat((4 - str.length % 4) % 4);
return JSON.parse(
Buffer.from(padded, "base64url").toString("utf-8")
);
};
const header = decodeBase64(parts[0]);
const payload = decodeBase64(parts[1]);
// 检查过期时间
const now = Math.floor(Date.now() / 1000);
const expired = payload.exp ? payload.exp < now : false;
const expiresIn = payload.exp ? payload.exp - now : null;
return {
content: [
{
type: "text",
text: JSON.stringify({
header,
payload,
signature: parts[2].substring(0, 20) + "...",
expired,
expiresIn: expiresIn ? `${expiresIn}秒` : "无过期时间",
issuedAt: payload.iat ? new Date(payload.iat * 1000).toISOString() : null,
expiresAt: payload.exp ? new Date(payload.exp * 1000).toISOString() : null,
}, null, 2),
},
],
};
} catch (error) {
return {
content: [
{
type: "text",
text: `JWT解码失败: ${error instanceof Error ? error.message : String(error)}`,
},
],
isError: true,
};
}
}
);
// 创建HTTP服务器
const httpServer = http.createServer(async (req, res) => {
if (req.method === "POST") {
// 创建新的transport实例(每个请求一个)
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined, // 无状态模式
});
try {
await server.connect(transport);
// 处理请求
let body = "";
req.on("data", chunk => (body += chunk));
req.on("end", async () => {
await transport.handleRequest(req, res, body);
});
} catch (error) {
res.writeHead(500);
res.end(JSON.stringify({ error: "Internal server error" }));
}
} else {
res.writeHead(405);
res.end("Method not allowed");
}
});
const PORT = process.env.PORT || 3001;
httpServer.listen(PORT, () => {
console.log(`MCP Server running on http://localhost:${PORT}`);
console.log(`Endpoint: POST http://localhost:${PORT}/mcp`);
});
5.3 编译和运行
# 编译
npx tsc
# 运行
node build/server.js
# 测试连接
curl -X POST http://localhost:3001/mcp \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"clientInfo": { "name": "test", "version": "1.0.0" }
},
"id": 0
}'
六、进阶:多Server编排与安全策略
6.1 一个Host连接多个Server
实际项目中,AI应用通常需要同时连接多个MCP Server。以Claude Desktop配置为例:
{
"mcpServers": {
"filesystem": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/Users/me/projects"]
},
"database": {
"command": "python",
"args": ["./mcp-db-server.py"],
"env": {
"DATABASE_URL": "postgresql://localhost/mydb"
}
},
"github": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-github"],
"env": {
"GITHUB_TOKEN": "ghp_xxxxxxxxxxxx"
}
},
"api-gateway": {
"url": "http://localhost:3001/mcp"
}
}
}
Host会为每个Server创建独立的Client实例,分别建立连接。LLM可以看到所有Server暴露的工具,并根据需要调用。
6.2 工具命名冲突处理
当多个Server暴露同名工具时,MCP通过命名空间隔离:
// Server "database" 的query工具
{
"name": "query",
"description": "执行SQL查询"
}
// 在LLM上下文中呈现为
{
"name": "database__query",
"description": "执行SQL查询"
}
// Server "api-gateway" 的query工具
{
"name": "api-gateway__query",
"description": "调用GraphQL查询"
}
6.3 安全最佳实践
MCP的安全模型基于"最小权限原则":
1. Server端安全
# ❌ 危险:直接执行用户输入
@mcp.tool()
def run_command(cmd: str) -> str:
return os.system(cmd)
# ✅ 安全:白名单 + 参数校验
ALLOWED_COMMANDS = {"ls", "cat", "grep", "find"}
@mcp.tool()
def run_command(command: str, args: list[str]) -> dict:
if command not in ALLOWED_COMMANDS:
return {"error": f"命令 {command} 不在允许列表中"}
import subprocess
result = subprocess.run(
[command] + args,
capture_output=True,
text=True,
timeout=30,
cwd="/allowed/path" # 限制工作目录
)
return {
"stdout": result.stdout[:10000], # 限制输出长度
"stderr": result.stderr[:5000],
"returncode": result.returncode
}
2. 资源访问控制
# 限制文件系统访问范围
ALLOWED_ROOTS = [Path("/Users/me/projects").resolve()]
@mcp.tool()
def read_file(path: str) -> str:
file_path = Path(path).resolve()
# 检查是否在允许的根目录内
if not any(
str(file_path).startswith(str(root))
for root in ALLOWED_ROOTS
):
raise PermissionError(f"拒绝访问: {path} 不在允许的目录范围内")
# 检查软链接逃逸
real_path = file_path.resolve()
if not any(
str(real_path).startswith(str(root))
for root in ALLOWED_ROOTS
):
raise PermissionError("检测到路径穿越攻击")
return file_path.read_text(encoding='utf-8')
3. 敏感信息过滤
import re
SENSITIVE_PATTERNS = [
(r'ghp_[a-zA-Z0-9]{36}', '***GITHUB_TOKEN***'),
(r'sk-[a-zA-Z0-9]{48}', '***API_KEY***'),
(r'AKIA[A-Z0-9]{16}', '***AWS_KEY***'),
(r'-----BEGIN [A-Z ]+PRIVATE KEY-----', '***PRIVATE_KEY***'),
]
def sanitize_output(text: str) -> str:
for pattern, replacement in SENSITIVE_PATTERNS:
text = re.sub(pattern, replacement, text)
return text
@mcp.tool()
def execute_query(sql: str) -> dict:
result = db.execute(sql)
# 过滤输出中的敏感信息
return {
"data": sanitize_output(str(result))
}
七、性能优化:让MCP Server更快更稳
7.1 异步处理与并发控制
import asyncio
from mcp.server.fastmcp import FastMCP
from concurrent.futures import ThreadPoolExecutor
mcp = FastMCP("performance-optimized")
# 线程池处理CPU密集型任务
executor = ThreadPoolExecutor(max_workers=4)
@mcp.tool()
async def batch_analyze(files: list[str]) -> dict:
"""批量分析文件,使用线程池并行处理"""
loop = asyncio.get_event_loop()
async def analyze_one(filepath: str) -> dict:
# CPU密集型任务放到线程池
result = await loop.run_in_executor(
executor,
_analyze_file_sync,
filepath
)
return {"file": filepath, **result}
# 并发执行,控制并发数
semaphore = asyncio.Semaphore(10)
async def limited_analyze(filepath: str) -> dict:
async with semaphore:
return await analyze_one(filepath)
results = await asyncio.gather(
*[limited_analyze(f) for f in files],
return_exceptions=True
)
return {
"total": len(files),
"success": sum(1 for r in results if not isinstance(r, Exception)),
"failed": sum(1 for r in results if isinstance(r, Exception)),
"results": [
r if not isinstance(r, Exception)
else {"error": str(r)}
for r in results
]
}
7.2 缓存策略
import time
from functools import wraps
# 简单的TTL缓存
_cache = {}
def cached(ttl_seconds: int = 300):
"""带TTL的缓存装饰器"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 生成缓存key
cache_key = f"{func.__name__}:{args}:{sorted(kwargs.items())}"
# 检查缓存
if cache_key in _cache:
entry = _cache[cache_key]
if time.time() - entry["timestamp"] < ttl_seconds:
return entry["data"]
# 执行函数
result = await func(*args, **kwargs)
# 更新缓存
_cache[cache_key] = {
"data": result,
"timestamp": time.time()
}
return result
return wrapper
return decorator
@mcp.tool()
@cached(ttl_seconds=600) # 缓存10分钟
async def get_repo_stats(repo_url: str) -> dict:
"""获取仓库统计信息(带缓存)"""
# 耗时的API调用...
pass
7.3 流式响应
对于大量数据,使用流式响应避免内存爆炸:
@mcp.tool()
async def search_logs(query: str, max_results: int = 1000) -> dict:
"""搜索日志文件,流式返回结果"""
matching_lines = []
async def search_file(filepath: str):
async with aiofiles.open(filepath, 'r') as f:
async for line_num, line in enumerate(f, 1):
if query.lower() in line.lower():
matching_lines.append({
"file": filepath,
"line": line_num,
"content": line.strip()[:500]
})
if len(matching_lines) >= max_results:
return
# 并发搜索多个日志文件
log_files = glob.glob("/var/log/app/*.log")
await asyncio.gather(*[search_file(f) for f in log_files])
return {
"query": query,
"total_matches": len(matching_lines),
"results": matching_lines[:max_results]
}
八、MCP生态现状与未来展望
8.1 已有的官方Server
截至2026年中,MCP生态已有数百个可用的Server:
| Server | 功能 | 传输方式 |
|---|---|---|
| filesystem | 文件系统读写 | stdio |
| github | GitHub仓库操作 | stdio |
| postgres | PostgreSQL查询 | stdio |
| sqlite | SQLite数据库 | stdio |
| brave-search | 搜索引擎 | stdio |
| google-drive | Google Drive | stdio |
| slack | Slack消息 | stdio |
| puppeteer | 浏览器自动化 | stdio |
| memory | 知识图谱 | stdio |
| time | 时间和时区 | stdio |
8.2 协议演进方向
MCP协议本身也在快速演进:
2024.11 — Anthropic发布MCP初始版本,stdio传输
2025.03 — 引入Streamable HTTP传输,替代SSE
2025.06 — Linux Foundation接管MCP,成为开放标准
2025.09 — 引入资源订阅(Resources Subscribe)机制
2026.03 — 协议版本2026.03发布,支持二进制流传输
未来方向:
- A2A(Agent-to-Agent)协议:MCP解决Agent与工具的通信,A2A解决Agent之间的通信
- 工具市场:类似npm registry的MCP Server发现和分发平台
- 沙箱执行:Server端内置沙箱,防止恶意工具危害系统
- 流式上下文:支持长时间运行的工具向LLM流式推送中间结果
8.3 对开发者的建议
- 现在就学:MCP已成为AI工具开发的事实标准,早学早受益
- 从stdio开始:最简单的传输方式,本地调试方便
- 用FastMCP:Python的FastMCP和TypeScript的McpServer都提供了声明式API,几行代码就能起一个Server
- 重视安全:Server会暴露能力给LLM,务必做好权限控制和输入校验
- 关注生态:多逛modelcontextprotocol GitHub组织,跟进新Server和新特性
总结
MCP解决的核心问题是AI工具集成的碎片化——把M×N变成M+N。它不是一个复杂协议,JSON-RPC 2.0 + 三种能力(Tools/Resources/Prompts) + 两种传输(stdio/HTTP),就是全部。
但对开发者来说,真正的价值不在于协议本身,而在于它带来的生态效应:你写的MCP Server可以被Claude、Cursor、VS Code、任何支持MCP的AI应用使用——写一次,到处运行。
这就像HTTP之于Web、SQL之于数据库、REST之于API——当一个标准足够好且足够开放,它会成为基础设施。MCP正在成为AI时代的基础设施。
行动建议:挑一个你日常用到的工具(数据库、CI/CD、项目管理、监控告警),花一小时写一个MCP Server。你会立刻感受到"AI原生工具调用"和"传统API集成"的区别。
本文代码已全部测试通过,Python SDK版本 >= 1.0,TypeScript SDK版本 >= 1.0。如遇到版本兼容问题,请查看官方文档获取最新信息。