编程 AI 时代的 USB 接口迎来最大升级:MCP 协议 2026-07-28 规范候选版深度解析

2026-06-26 14:16:30 +0800 CST views 7

AI 时代的 USB 接口迎来最大升级:MCP 协议 2026-07-28 规范候选版深度解析

45 天后,这些代码将全部失效——从有状态到无状态,远程 MCP Server 必须迁移的七个理由


一、背景:MCP 为什么突然需要「动大手术」?

2026 年 5 月 21 日,Model Context Protocol(MCP)核心维护团队正式发布了 2026-07-28 规范的 Release Candidate(候选版)。官方用了"自 MCP 发布以来最大规模的修订"来形容这次变更——这不是公关话术,而是字面意义上的破坏性更新。

MCP 是什么?它是 Anthropic 在 2025 年底主导推出的开放协议,旨在为 AI 模型提供统一的方式连接外部工具、数据源和服务。如果把 AI 模型比作一台电脑,那 MCP 就是** USB 接口**——无论你用的是 Claude、GPT-5.5 还是 Gemini 2.5,都可以通过同一套协议调用文件系统、GitHub、数据库、企业内部 API 等外部能力。

自发布以来,MCP 的生态扩张速度堪称惊人:

维度数据
MCP Server 数量5000+(官方 + 社区)
支持 MCP 的客户端Claude Desktop、Cursor、VS Code、JetBrains、Continue
主流云厂商支持AWS、Azure、GCP 均已提供官方 MCP Server
GitHub Stars(spec 仓库)30k+
规范版本2025-03-26(旧)→ 2026-07-28(新 RC)

问题在于:早期 MCP 的设计面向本地开发场景,协议层维护了一个有状态会话模型。这个设计在本地用一用没问题,但一旦进入生产环境——要做水平扩展、要上 K8s 集群、要暴露到公网——问题就全来了:

  1. 水平扩展困难:有状态意味着请求必须路由到同一台服务器,需要 sticky session。
  2. 基础设施成本高:需要 Redis 或 Memcached 存储会话状态。
  3. 认证混乱:旧规范对认证几乎没有强制要求,各实现各自为政,安全隐患巨大。

2026-07-28 规范从根本上解决了这三个问题。代价是——破坏性变更。最终规范将于 2026 年 7 月 28 日正式发布,从 RC 到废弃旧规范,你有约 45 天的迁移窗口期(过渡期截至 2026-08-28)。


二、七大破坏性变更:代码级详解

2.1 最核心变更:会话移除(Session Removal)

这是所有变更中最重要的一条。协议层彻底移除了会话概念

旧规范的工作流程:

// 第一步:初始化握手,建立会话
→ {"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {
    "protocolVersion": "2025-03-26",
    "clientInfo": {"name": "cursor", "version": "1.0"}
  }}

← {"jsonrpc": "2.0", "result": {
    "protocolVersion": "2025-03-26",
    "serverInfo": {"name": "github-mcp", "version": "2.1"},
    "sessionId": "abc-123-def-456"  // ← 会话ID,所有后续请求都要带
  }, "id": 1}

// 第二步:调用工具,携带 sessionId
→ {"jsonrpc": "2.0", "method": "tools/call", "id": 2, "params": {
    "name": "create_issue",
    "arguments": {"title": "Bug report"},
    "_sessionId": "abc-123-def-456"  // ← 必须带!
  }}

新规范的工作流程:

// 没有初始化,没有 sessionId,直接调用
→ {"jsonrpc": "2.0", "method": "tools/call", "id": 1, "params": {
    "name": "create_issue",
    "arguments": {"title": "Bug report"}
  }}

← {"jsonrpc": "2.0", "result": {
    "content": [{"type": "text", "text": "Issue created: #42"}]
  }, "id": 1}

这对工程实践意味着什么?

首先,你不再需要关心「连接建立」这个概念。每一次请求都是独立的、自包含的,天然适合 HTTP/2 和 HTTP/3 的多路复用。其次,如果你之前的 Server 依赖 session_id 做路由或状态管理,必须重构

新规范引入了「显式状态句柄」(Explicit State Handles)作为迁移路径:

# Python 实现:无状态 MCP Server 示例
import httpx
import asyncio
from dataclasses import dataclass
from typing import Optional

@dataclass
class ConnectionState:
    """无状态设计中的显式连接状态"""
    connection_id: str
    last_query: str
    user_context: dict

class StatelessMCPServer:
    """
    新规范下的无状态 MCP Server。
    所有状态通过 state_token 在请求/响应中显式传递,
    不再依赖协议层维护的会话。
    """
    
    def __init__(self, connection_pool_size: int = 10):
        self.connection_pool: dict[str, object] = {}
        self.state_store: dict[str, ConnectionState] = {}
    
    async def handle_tool_call(
        self, 
        request: dict, 
        state_token: Optional[str] = None
    ) -> dict:
        """
        核心方法:从请求中提取显式状态句柄,
        在响应中返回新的状态句柄。
        """
        # 从 state_token 恢复状态(如果有的话)
        if state_token and state_token in self.state_store:
            state = self.state_store[state_token]
        else:
            # 新请求,创建初始状态
            state = ConnectionState(
                connection_id=self._generate_conn_id(),
                last_query="",
                user_context={}
            )
        
        # 执行实际的工具调用
        method = request.get("method")
        params = request.get("params", {})
        
        if method == "tools/call":
            result = await self._execute_tool(
                name=params.get("name"),
                arguments=params.get("arguments", {}),
                state=state
            )
        else:
            result = {"error": f"Unknown method: {method}"}
        
        # 更新状态并返回新的 state_token
        new_token = self._save_state(state)
        
        return {
            "jsonrpc": "2.0",
            "result": {"content": [{"type": "text", "text": str(result)}]},
            "state_token": new_token  # 客户端下次请求需要携带这个
        }
    
    async def _execute_tool(self, name: str, arguments: dict, state: ConnectionState) -> str:
        """模拟工具执行"""
        if name == "database_query":
            sql = arguments.get("sql", "")
            state.last_query = sql
            return f"Query executed: {sql[:50]}..."
        return f"Tool '{name}' executed successfully"
    
    def _generate_conn_id(self) -> str:
        import uuid
        return str(uuid.uuid4())[:8]
    
    def _save_state(self, state: ConnectionState) -> str:
        """将状态加密序列化,返回 state_token"""
        import json
        import base64
        token_data = json.dumps({
            "conn_id": state.connection_id,
            "last_query": state.last_query,
            "ctx": state.user_context
        })
        token = base64.urlsafe_b64encode(token_data.encode()).decode()
        # 实际生产中应该用 AES-256-GCM 加密
        self.state_store[token] = state
        return token

架构师视角的思考:无状态设计的最大好处是可观测性提升。每一对请求/响应都包含了完整的上下文,分布式追踪(如 Jaeger、Zipkin)的接入成本大幅降低。不再需要额外的 session 存储,Redis 的使用场景可以减少一个。


2.2 初始化握手移除(Initialize Handshake Removal)

旧规范要求客户端首先调用 initialize 方法,交换协议版本和能力信息。这是每个 MCP Client 启动时必经的第一步。

旧方式(JSON-RPC 初始化):

# Python MCP Client 旧实现
import asyncio
import mcp.protocol as mcp

async def old_initialize():
    client = mcp.Client(server_url="https://api.example.com/mcp")
    
    # 第一步:必须先初始化
    init_result = await client.request(
        method="initialize",
        params={
            "protocolVersion": "2025-03-26",
            "clientInfo": {"name": "my-app", "version": "1.0.0"},
            "capabilities": {"tools": {"listChanged": True}}
        }
    )
    
    # 从响应中提取 session_id
    session_id = init_result["sessionId"]
    print(f"Session established: {session_id}")
    
    # 第二步:才可以使用工具
    tools = await client.list_tools(session_id=session_id)
    return tools

新方式(HTTP 头部协商):

# Python MCP Client 新实现
import httpx

class NewMCPClient:
    """
    2026-07-28 规范下的 MCP Client。
    版本和能力信息通过 HTTP 头部传递,无需初始化握手。
    """
    
    def __init__(self, server_url: str, access_token: str):
        self.server_url = server_url
        self.headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "MCP-Protocol-Version": "2026-07-28",
            "MCP-Capabilities": "tools,listChanged"
        }
    
    async def call_tool(self, name: str, arguments: dict) -> dict:
        """
        直接调用,无需初始化。
        头部信息告诉 Server 我们的协议版本和能力。
        """
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.server_url,
                json={
                    "jsonrpc": "2.0",
                    "method": "tools/call",
                    "params": {
                        "name": name,
                        "arguments": arguments
                    },
                    "id": self._next_id()
                },
                headers=self.headers
            )
            return response.json()
    
    async def list_tools(self) -> dict:
        """直接列出工具,无需先建立会话"""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.server_url,
                json={
                    "jsonrpc": "2.0",
                    "method": "tools/list",
                    "params": {},
                    "id": self._next_id()
                },
                headers=self.headers
            )
            return response.json()
    
    def _next_id(self) -> int:
        if not hasattr(self, "_counter"):
            self._counter = 0
        self._counter += 1
        return self._counter

架构影响分析:初始化握手的移除,使得 MCP 的冷启动时间大幅缩短。对于需要频繁创建/销毁连接的边缘计算场景(Cloudflare Workers、AWS Lambda),这是巨大的利好。你不需要再维护一个「连接生命周期」,每一次请求都是完全独立的。


2.3 三个核心功能废弃

废弃功能替代方案废弃原因
resources/list统一到 tools/list资源本质上是一种只读工具,统一接口减少概念复杂度
prompts/list扩展机制(Extensions)Prompts 作为核心功能使用率极低,移至扩展层
sampling客户端决定采样是客户端的职责,不属于协议层

这对大多数开发者影响不大。但如果你的 Server 实现了 prompts/list 接口,需要迁移到新的扩展机制:

# 新规范:Prompts 作为扩展实现
class MCPExtensions:
    """
    MCP 2026-07-28 扩展注册表
    """
    # 官方扩展
    TASKS = "tasks/v1"        # 长时间运行的任务
    MCP_APPS = "mcp-apps/v1"  # 服务端渲染 UI
    CUSTOM = "custom/v1"      # 自定义扩展
    
    @staticmethod
    def register_prompt_extension(server, prompts: list[dict]):
        """将旧的 prompts/list 迁移为扩展"""
        @server.route("/extensions/prompts/list")
        async def list_prompts(request):
            return {"prompts": prompts}

2.4 OAuth 2.1 全面接入(最重要企业级变更)

这是对企业用户影响最大的一条变更。新规范将 MCP Server 明确定义为 OAuth 2.1 资源服务器,MCP Client 作为 OAuth 2.1 客户端。PKCE 成为强制要求,不再是可选的安全加固。

OAuth 2.1 的关键要求:

  1. PKCE 强制化:Authorization Code Flow 必须使用 code_challenge + code_verifier
  2. Redirect URI 精确匹配:不允许通配符
  3. 令牌内省:服务端必须支持令牌撤销
# TypeScript: MCP Server OAuth 2.1 认证实现
import { 
  AuthorizationCode, 
  ResourceServer, 
  generateCodeVerifier, 
  generateCodeChallenge 
} from "oauth2.1";

interface MCPOAuthConfig {
  authorizationEndpoint: string;
  tokenEndpoint: string;
  requirePKCE: true;
  supportedScopes: string[];
  revocationEndpoint?: string;
}

class MCPOAuthResourceServer implements ResourceServer {
  private config: MCPOAuthConfig;
  
  constructor(config: MCPOAuthConfig) {
    this.config = config;
  }
  
  // 服务端:提供 OAuth 元数据端点(RFC 8414)
  async getOAuthMetadata(): Promise<object> {
    return {
      authorization_endpoint: this.config.authorizationEndpoint,
      token_endpoint: this.config.tokenEndpoint,
      scopes_supported: this.config.supportedScopes,
      grant_types_supported: ["authorization_code"],
      code_challenge_methods_supported: ["S256"],
      revocation_endpoint: this.config.revocationEndpoint
    };
  }
  
  // 服务端:授权码发放
  async issueAuthorizationCode(
    clientId: string, 
    redirectUri: string, 
    scope: string
  ): Promise<string> {
    const code = crypto.randomBytes(32).toString("hex");
    const pkceVerifier = generateCodeVerifier(); // 服务端也应存储 verifier 的 hash
    
    await this.codeStore.set(code, {
      clientId,
      redirectUri,
      scope,
      pkceVerifierHash: await this._hashVerifier(pkceVerifier),
      expiresAt: Date.now() + 10 * 60 * 1000 // 10 分钟有效期
    });
    
    return code;
  }
  
  // 服务端:令牌交换(必须验证 PKCE)
  async exchangeCodeForToken(
    code: string,
    codeVerifier: string,
    redirectUri: string
  ): Promise<{ access_token: string; expires_in: number }> {
    const stored = await this.codeStore.get(code);
    
    if (!stored || stored.redirectUri !== redirectUri) {
      throw new Error("INVALID_GRANT");
    }
    
    // 验证 PKCE - 这是 OAuth 2.1 的核心要求
    const verifierHash = await this._hashVerifier(codeVerifier);
    if (verifierHash !== stored.pkceVerifierHash) {
      throw new Error("INVALID_CODE_VERIFIER"); // 攻击者无法伪造
    }
    
    const accessToken = crypto.randomBytes(32).toString("base64url");
    const expiresIn = 3600; // 1 小时
    
    await this.tokenStore.set(accessToken, {
      clientId: stored.clientId,
      scope: stored.scope,
      issuedAt: Date.now()
    });
    
    // 授权码一次性使用
    await this.codeStore.delete(code);
    
    return { access_token: accessToken, expires_in: expiresIn };
  }
  
  private async _hashVerifier(verifier: string): Promise<string> {
    const encoder = new TextEncoder();
    const data = encoder.encode(verifier);
    const hash = await crypto.subtle.digest("SHA-256", data);
    return Buffer.from(hash).toString("base64url");
  }
}

MCP Client 端的 PKCE 流程:

# Python: MCP Client OAuth 2.1 认证(带 PKCE)
import httpx
import asyncio
import secrets
import base64
import hashlib

class MCPOAuthClient:
    """
    MCP 2026-07-28 规范下的 OAuth 2.1 Client。
    PKCE 是强制要求。
    """
    
    def __init__(self, server_url: str, client_id: str):
        self.server_url = server_url
        self.client_id = client_id
        self.access_token: str | None = None
        self.token_expires_at: float = 0
    
    async def authenticate(self) -> str:
        """完整的 OAuth 2.1 + PKCE 认证流程"""
        
        # 第一步:发现 OAuth 元数据
        metadata = await self._discover_oauth_metadata()
        
        # 第二步:生成 PKCE 挑战
        code_verifier = secrets.token_urlsafe(64)
        code_challenge = self._generate_code_challenge(code_verifier)
        state = secrets.token_urlsafe(16)
        
        # 第三步:构造授权 URL(这里简化了,实际需要启动本地 HTTP 服务处理回调)
        auth_url = metadata["authorization_endpoint"]
        auth_url += f"?response_type=code"
        auth_url += f"&client_id={self.client_id}"
        auth_url += f"&redirect_uri=http://localhost:8765/callback"
        auth_url += f"&scope={' '.join(metadata['supported_scopes'])}"
        auth_url += f"&state={state}"
        auth_url += f"&code_challenge={code_challenge}"
        auth_url += f"&code_challenge_method=S256"
        
        print(f"请访问以下地址完成授权:\n{auth_url}")
        
        # 第四步:等待授权码(实际应用中启动本地 HTTP 服务器接收回调)
        auth_code = await self._wait_for_auth_code()
        
        # 第五步:交换令牌
        token_response = await httpx.AsyncClient().post(
            metadata["token_endpoint"],
            data={
                "grant_type": "authorization_code",
                "code": auth_code,
                "redirect_uri": "http://localhost:8765/callback",
                "client_id": self.client_id,
                "code_verifier": code_verifier  # PKCE: 发送明文 verifier
            }
        )
        
        token_data = token_response.json()
        self.access_token = token_data["access_token"]
        self.token_expires_at = asyncio.get_event_loop().time() + token_data["expires_in"]
        
        return self.access_token
    
    def _generate_code_challenge(self, verifier: str) -> str:
        """S256 PKCE 方法:SHA-256 哈希 + Base64URL 编码"""
        digest = hashlib.sha256(verifier.encode()).digest()
        return base64.urlsafe_b64encode(digest).decode().rstrip("=")
    
    async def _discover_oauth_metadata(self) -> dict:
        """从 /.well-known/oauth-authorization-server 发现元数据"""
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.server_url}/.well-known/oauth-authorization-server"
            )
            return response.json()
    
    async def _wait_for_auth_code(self) -> str:
        """等待授权回调(实际应用中启动 HTTP 服务器)"""
        # 这里简化处理,实际需要 asyncio.Event 和本地 HTTP 服务器
        await asyncio.sleep(0)
        return "PLACEHOLDER_AUTH_CODE"
    
    async def call_tool(self, name: str, arguments: dict) -> dict:
        """带认证的 MCP 调用"""
        if not self.access_token or asyncio.get_event_loop().time() >= self.token_expires_at:
            await self.authenticate()
        
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.server_url,
                json={
                    "jsonrpc": "2.0",
                    "method": "tools/call",
                    "params": {"name": name, "arguments": arguments}
                },
                headers={
                    "Authorization": f"Bearer {self.access_token}",
                    "MCP-Protocol-Version": "2026-07-28"
                }
            )
            return response.json()

这条变更的实际意义:旧规范对认证几乎是放任的——很多 MCP Server 根本没有认证,或者用简单的 API Key 认证。现在 OAuth 2.1 强制化,PKCE 强制化,意味着 MCP 可以安全地暴露在公网上了。对企业用户来说,这意味着终于可以在生产环境中放心使用第三方 MCP Server。


2.5 传输层标准化:Streamable HTTP 取代 SSE

旧规范使用 SSE(Server-Sent Events)作为远程传输方式。新规范统一为 Streamable HTTP,兼顾流式响应和水平扩展能力。

# Python: 新规范的 Streamable HTTP Transport
import httpx
import asyncio
from typing import AsyncIterator

class StreamableHTTPTransport:
    """
    MCP 2026-07-28 Streamable HTTP Transport。
    支持:
    - 短请求/响应(无状态)
    - 流式响应(Server-Sent Events 替代方案)
    - 错误处理
    """
    
    def __init__(
        self, 
        server_url: str, 
        access_token: str,
        protocol_version: str = "2026-07-28"
    ):
        self.server_url = server_url
        self.default_headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "MCP-Protocol-Version": protocol_version,
            "Accept": "application/json, text/event-stream"
        }
        self._client: httpx.AsyncClient | None = None
    
    @property
    def client(self) -> httpx.AsyncClient:
        if self._client is None:
            self._client = httpx.AsyncClient(
                timeout=httpx.Timeout(30.0, connect=5.0),
                limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
            )
        return self._client
    
    async def request(
        self, 
        method: str, 
        params: dict,
        idempotency_key: str | None = None
    ) -> dict:
        """
        发送短请求,返回完整响应。
        用于 tools/call 等同步操作。
        """
        headers = {**self.default_headers}
        if idempotency_key:
            headers["Idempotency-Key"] = idempotency_key
        
        response = await self.client.post(
            self.server_url,
            json={
                "jsonrpc": "2.0",
                "method": method,
                "params": params
            },
            headers=headers
        )
        
        content_type = response.headers.get("content-type", "")
        
        if "application/json" in content_type:
            return response.json()
        elif "text/event-stream" in content_type:
            # 处理流式响应(累积直到完整)
            return await self._collect_sse_response(response)
        else:
            raise ValueError(f"Unexpected content-type: {content_type}")
    
    async def stream_request(
        self, 
        method: str, 
        params: dict
    ) -> AsyncIterator[dict]:
        """
        流式请求,用于返回大量数据的场景。
        生成器模式,不等待完整响应。
        """
        async with self.client.stream(
            "POST",
            self.server_url,
            json={
                "jsonrpc": "2.0",
                "method": method,
                "params": params
            },
            headers=self.default_headers
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    import json
                    yield json.loads(line[6:])
    
    async def _collect_sse_response(self, response: httpx.Response) -> dict:
        """将 SSE 流累积为完整 JSON 响应"""
        chunks = []
        async for line in response.aiter_lines():
            if line.startswith("data: "):
                chunks.append(line[6:])
        import json
        return json.loads("".join(chunks))
    
    async def close(self):
        if self._client:
            await self._client.aclose()
            self._client = None

2.6 错误码标准化

新规范统一了 JSON-RPC 错误码体系,并新增了认证相关的标准错误码:

# MCP 2026-07-28 标准化错误码
class MCPErrors:
    # JSON-RPC 标准错误(保留)
    PARSE_ERROR = -32700
    INVALID_REQUEST = -32600
    METHOD_NOT_FOUND = -32601
    INVALID_PARAMS = -32602
    INTERNAL_ERROR = -32603
    
    # MCP 专用错误(新增)
    AUTHENTICATION_REQUIRED = -32000  # 请求缺少认证信息
    AUTHORIZATION_FAILED = -32001    # 认证通过但权限不足
    TOKEN_EXPIRED = -32002           # 令牌过期
    TOKEN_REVOKED = -32003           # 令牌已被撤销
    SCOPE_INSUFFICIENT = -32004      # 令牌 scope 不足以执行该操作
    SERVER_UNAVAILABLE = -32005      # 服务暂时不可用(可重试)


# 统一错误响应格式
def create_error_response(error_code: int, message: str, data: dict = None) -> dict:
    return {
        "jsonrpc": "2.0",
        "error": {
            "code": error_code,
            "message": message,
            "data": data or {}
        }
    }

2.7 扩展机制:MCP Apps 和 Tasks

新规范引入了正式的扩展机制,用于协议核心之外的功能:

# 扩展声明与使用示例

# MCP Apps 扩展:服务端渲染 UI
# 用于让 AI 生成的响应中包含可交互的 UI 组件
async def create_mcp_app(client, app_spec: dict):
    """创建 MCP App(服务端渲染的交互界面)"""
    response = await client.request(
        method="extensions/mcp-apps/create",
        params={
            "spec": app_spec,  # React/Vue/Svelte 组件定义
            "framework": "react",
            "styles": {"theme": "dark", "maxWidth": "800px"}
        }
    )
    return response["app_id"]  # 返回 App ID,前端渲染该 URL


# Tasks 扩展:长时间运行的任务
# 用于代码重构、大规模分析等耗时操作
async def run_long_task(client, task_type: str, params: dict):
    """创建长时间运行的任务"""
    task = await client.request(
        method="extensions/tasks/create",
        params={
            "type": task_type,
            "params": params,
            "timeout": 3600,  # 1 小时超时
            "priority": "normal"
        }
    )
    
    task_id = task["id"]
    
    # 轮询任务状态
    while True:
        status = await client.request(
            method="extensions/tasks/status",
            params={"id": task_id}
        )
        
        if status["state"] == "completed":
            return status["result"]
        elif status["state"] == "failed":
            raise RuntimeError(f"Task failed: {status['error']}")
        
        await asyncio.sleep(10)  # 每 10 秒轮询

三、迁移路径:从有状态到无状态的实战指南

3.1 完整迁移示例:数据库查询 MCP Server

假设你有一个提供数据库查询能力的 MCP Server,旧实现基于有状态会话:

# ========== 旧实现(有状态) ==========
import uuid
from typing import Optional
from dataclasses import dataclass, field

@dataclass
class Session:
    db_connection: object
    query_history: list[str] = field(default_factory=list)
    user: Optional[str] = None

class OldDatabaseMCPServer:
    """旧规范:有状态会话管理"""
    
    def __init__(self, db_url: str):
        self.db_url = db_url
        self.sessions: dict[str, Session] = {}  # ← 内存存储会话
        self.db_pool = self._create_pool(db_url)
    
    async def initialize(self, client_info: dict) -> dict:
        """旧规范:初始化建立会话"""
        session_id = str(uuid.uuid4())
        self.sessions[session_id] = Session(
            db_connection=await self.db_pool.get_connection(),
            user=client_info.get("name")
        )
        return {
            "sessionId": session_id,
            "protocolVersion": "2025-03-26",
            "instructions": "所有请求请携带 sessionId"
        }
    
    async def execute_query(
        self, 
        session_id: str, 
        sql: str
    ) -> dict:
        """旧规范:通过 session_id 查找会话"""
        if session_id not in self.sessions:
            raise ValueError(f"Invalid session: {session_id}")
        
        session = self.sessions[session_id]
        result = await session.db_connection.execute(sql)
        session.query_history.append(sql)
        
        return {
            "rows": result.rows,
            "affected": result.affected,
            "history_count": len(session.query_history)
        }

迁移到新规范后:

# ========== 新实现(无状态 + OAuth 2.1) ==========
import uuid
import base64
import json
import asyncio
from typing import Optional
from dataclasses import dataclass

@dataclass
class ConnectionState:
    """显式状态:替代旧规范的 Session"""
    connection_id: str
    user: Optional[str]
    query_history: list[str]
    last_activity: float


class NewDatabaseMCPServer:
    """
    新规范:无状态 + OAuth 2.1
    关键变化:
    1. 移除 initialize 方法
    2. 无 session_id 依赖
    3. 状态通过 state_token 显式传递
    4. 强制 OAuth 2.1 认证
    """
    
    def __init__(self, db_url: str):
        self.db_url = db_url
        # 连接池(用于无状态设计中的连接复用)
        self._connection_pool: asyncio.Queue = asyncio.Queue()
        self._init_pool(db_url)
        # 状态存储(可以用 Redis 替代内存)
        self._state_store: dict[str, ConnectionState] = {}
    
    def _init_pool(self, db_url: str):
        """初始化连接池"""
        # 实际生产中初始化连接...
        pass
    
    async def handle_request(
        self, 
        request: dict,
        access_token: str,
        state_token: Optional[str] = None
    ) -> dict:
        """统一请求处理入口"""
        
        # 第一步:验证 OAuth 令牌
        if not await self._verify_token(access_token):
            return self._error_response(-32000, "Authentication required")
        
        # 第二步:从 state_token 恢复状态(或创建新状态)
        current_state = self._restore_or_create_state(state_token)
        
        # 第三步:执行方法
        method = request.get("method", "")
        params = request.get("params", {})
        
        if method == "tools/call":
            result = await self._call_tool(params, current_state)
        elif method == "tools/list":
            result = self._list_tools()
        else:
            return self._error_response(-32601, f"Unknown method: {method}")
        
        # 第四步:保存状态,返回新的 state_token
        new_state_token = self._save_state(current_state)
        
        return {
            "jsonrpc": "2.0",
            "result": result,
            "state_token": new_state_token  # ← 显式状态句柄
        }
    
    async def _call_tool(self, params: dict, state: ConnectionState) -> dict:
        """执行工具调用"""
        tool_name = params.get("name")
        arguments = params.get("arguments", {})
        
        if tool_name == "database_query":
            sql = arguments.get("sql", "")
            
            # 获取连接(无状态:从池中获取,不依赖会话)
            conn = await self._get_connection(state)
            
            try:
                result = await conn.execute(sql)
                state.query_history.append(sql)
                state.last_activity = asyncio.get_event_loop().time()
                
                return {
                    "content": [{
                        "type": "text",
                        "text": json.dumps({
                            "rows": result.rows[:100],  # 限制返回行数
                            "affected": result.affected,
                            "total_rows": result.total
                        }, default=str)
                    }]
                }
            finally:
                await self._release_connection(conn)
        
        return {"error": f"Unknown tool: {tool_name}"}
    
    async def _get_connection(self, state: ConnectionState):
        """从连接池获取连接"""
        try:
            conn = await asyncio.wait_for(
                self._connection_pool.get(), 
                timeout=5.0
            )
            return conn
        except asyncio.TimeoutError:
            # 超时:创建临时连接
            return await self._create_temp_connection()
    
    async def _release_connection(self, conn):
        """归还连接到池"""
        if not self._connection_pool.full():
            await self._connection_pool.put(conn)
    
    def _list_tools(self) -> dict:
        """列出可用工具(替代旧规范的 resources/list)"""
        return {
            "tools": [
                {
                    "name": "database_query",
                    "description": "Execute SQL query against the database",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "sql": {
                                "type": "string",
                                "description": "SQL query to execute"
                            }
                        },
                        "required": ["sql"]
                    }
                }
            ]
        }
    
    def _restore_or_create_state(
        self, 
        state_token: Optional[str]
    ) -> ConnectionState:
        """从 state_token 恢复状态,或创建新状态"""
        if state_token:
            try:
                # 解码 state_token(实际生产中应该解密)
                decoded = base64.urlsafe_b64decode(state_token).decode()
                data = json.loads(decoded)
                
                if data.get("conn_id") in self._state_store:
                    return self._state_store[data["conn_id"]]
            except Exception:
                pass  # token 无效,当作新状态处理
        
        # 创建新状态
        return ConnectionState(
            connection_id=str(uuid.uuid4())[:8],
            user=None,
            query_history=[],
            last_activity=asyncio.get_event_loop().time()
        )
    
    def _save_state(self, state: ConnectionState) -> str:
        """将状态编码为 state_token"""
        data = json.dumps({
            "conn_id": state.connection_id,
            "user": state.user,
            "history_len": len(state.query_history)
        })
        token = base64.urlsafe_b64encode(data.encode()).decode()
        self._state_store[state.connection_id] = state
        return token
    
    async def _verify_token(self, token: str) -> bool:
        """验证 OAuth 2.1 访问令牌"""
        # 实际实现中应该调用令牌内省端点或验证 JWT
        return len(token) > 0  # 简化示例
    
    def _error_response(self, code: int, message: str) -> dict:
        return {
            "jsonrpc": "2.0",
            "error": {"code": code, "message": message}
        }
    
    def _create_temp_connection(self):
        """创建临时连接(池耗尽时的降级方案)"""
        pass

四、连接池设计与无状态架构的权衡

无状态架构带来了一个关键问题:连接谁来管理?

在旧规范中,连接在 initialize 时建立,在整个会话生命周期内复用。这有好处(减少 TCP 握手开销),也有坏处(连接长期占用,资源利用率低)。

新规范下,这个问题变得有趣了:

连接池策略选择

from enum import Enum
from typing import Protocol
import asyncio

class PoolStrategy(Enum):
    PER_REQUEST = "per_request"      # 每次请求新建连接
    POOL_SHARED = "pool_shared"      # 全局连接池共享
    STATEFUL_AWARE = "stateful_aware"  # 按 state_token 亲和


class ConnectionManager(Protocol):
    """连接管理器接口"""
    async def acquire(self) -> Connection: ...
    async def release(self, conn: Connection) -> None: ...


class PerRequestPool(ConnectionManager):
    """
    策略一:每请求一个连接
    优点:简单,无状态,适合短生命周期的 Serverless 函数
    缺点:TCP 握手开销,数据库连接建立开销
    """
    
    def __init__(self, db_url: str):
        self.db_url = db_url
    
    async def acquire(self) -> Connection:
        # 每次都创建新连接(实际用数据库驱动)
        return await create_connection(self.db_url)
    
    async def release(self, conn: Connection):
        await conn.close()


class SharedPool(ConnectionManager):
    """
    策略二:全局连接池
    优点:高复用率,低延迟,适合高并发场景
    缺点:连接数有上限,需要处理池耗尽
    """
    
    def __init__(self, db_url: str, pool_size: int = 20):
        self._pool = asyncio.Queue(maxsize=pool_size)
        self._semaphore = asyncio.Semaphore(pool_size)
        self._db_url = db_url
        # 预热连接池
        for _ in range(pool_size):
            conn = await create_connection(db_url)
            await self._pool.put(conn)
    
    async def acquire(self) -> Connection:
        await self._semaphore.acquire()
        try:
            # 带超时的获取
            conn = await asyncio.wait_for(
                self._pool.get(), 
                timeout=10.0
            )
            return conn
        except asyncio.TimeoutError:
            self._semaphore.release()
            raise RuntimeError("Connection pool exhausted")
    
    async def release(self, conn: Connection):
        if not self._pool.full():
            await self._pool.put(conn)
        self._semaphore.release()


class StatefulAwarePool(ConnectionManager):
    """
    策略三:状态感知连接池(推荐)
    根据 state_token 将请求路由到固定的连接,
    兼顾无状态架构的灵活性与连接复用的效率。
    
    工作原理:
    1. state_token 中编码了 connection_id
    2. 连接池按 connection_id 哈希分配连接
    3. 同一 state_token 的请求复用同一连接
    4. 不同 state_token 的请求分散到不同连接
    """
    
    def __init__(self, db_url: str, pool_size: int = 50):
        self._pools: dict[str, asyncio.Queue] = {}
        self._db_url = db_url
        self._pool_size = pool_size
        self._lock = asyncio.Lock()
    
    async def _get_or_create_pool(self, conn_id: str) -> asyncio.Queue:
        if conn_id not in self._pools:
            async with self._lock:
                if conn_id not in self._pools:
                    q = asyncio.Queue(maxsize=3)  # 每个连接 ID 最多 3 个并发
                    for _ in range(2):
                        conn = await create_connection(self._db_url)
                        await q.put(conn)
                    self._pools[conn_id] = q
        return self._pools[conn_id]
    
    async def acquire(self) -> tuple[str, Connection]:
        """返回 (conn_id, connection)"""
        # 这个设计让无状态架构下仍能保持连接亲和性
        conn_id = "default"  # 实际从 state_token 解析
        pool = await self._get_or_create_pool(conn_id)
        conn = await pool.get()
        return conn_id, conn
    
    async def release(self, conn_id: str, conn: Connection):
        pool = self._pools.get(conn_id)
        if pool and not pool.full():
            await pool.put(conn)

五、OAuth 2.1 企业级集成:真实部署架构

对于企业用户,OAuth 2.1 强制化意味着需要重新设计认证架构。下面是一个基于 Keycloak 的企业级部署方案:

# docker-compose.yml: MCP Server 企业级部署架构

version: '3.9'

services:
  # MCP Server(无状态)
  mcp-server:
    image: your-org/mcp-server:latest
    environment:
      DB_URL: postgresql://db:5432/mcp
      OAUTH_ISSUER: http://keycloak:8080/realms/your-org
      MCP_PROTOCOL_VERSION: "2026-07-28"
    ports:
      - "8080:8080"
    deploy:
      replicas: 3  # ← 无状态设计支持水平扩展!
    depends_on:
      - db
      - keycloak
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 5s
      retries: 3

  # PostgreSQL(状态存储,不再存会话)
  db:
    image: postgres:16
    environment:
      POSTGRES_DB: mcp
    volumes:
      - pgdata:/var/lib/postgresql/data
      - ./migrations:/docker-entrypoint-initdb.d
    command: >
      postgres 
      -c max_connections=200
      -c shared_buffers=256MB
      -c work_mem=4MB

  # Keycloak(OAuth 2.1 授权服务器)
  keycloak:
    image: quay.io/keycloak/keycloak:24.0
    environment:
      KEYCLOAK_ADMIN: admin
      KEYCLOAK_ADMIN_PASSWORD: ${KEYCLOAK_ADMIN_PASSWORD}
      KC_HEALTH_ENABLED: true
      KC_METRICS_ENABLED: true
    command: start-dev
    volumes:
      - ./keycloak/themes:/opt/keycloak/themes
    ports:
      - "8180:8080"

  # Redis(状态缓存,可选,用于高性能场景)
  redis:
    image: redis:7-alpine
    command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru

  # Nginx(负载均衡 + TLS 终止)
  nginx:
    image: nginx:1.27-alpine
    ports:
      - "443:443"
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - mcp-server

volumes:
  pgdata:
# nginx.conf: 支持 Streamable HTTP 的负载均衡配置

events {
    worker_connections 1024;
}

http {
    upstream mcp_backend {
        least_conn;  # 最少连接优先,适合长连接场景
        
        server mcp-server-1:8080;
        server mcp-server-2:8080;
        server mcp-server-3:8080;
        
        keepalive 32;  # 保持长连接复用
    }
    
    server {
        listen 443 ssl http2;
        server_name api.example.com;
        
        ssl_certificate /certs/fullchain.pem;
        ssl_certificate_key /certs/privkey.pem;
        ssl_protocols TLSv1.2 TLSv1.3;
        
        # OAuth 令牌验证(通过 Nginx Auth Request Module)
        auth_request /auth/validate;
        auth_request_set $auth_status $upstream_status;
        
        location / {
            proxy_pass http://mcp_backend;
            
            # Streamable HTTP 必需的配置
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            
            # 流式响应支持
            proxy_buffering off;
            proxy_cache off;
            
            # 超时配置
            proxy_connect_timeout 10s;
            proxy_send_timeout 300s;
            proxy_read_timeout 300s;
        }
        
        # OAuth 令牌验证端点
        location /auth/validate {
            internal;
            
            proxy_pass http://keycloak:8080/realms/your-org/protocol/openid-connect/token/introspect;
            proxy_method POST;
            proxy_pass_request_body off;
            proxy_set_header Content-Type "application/x-www-form-urlencoded";
            proxy_set_body "token=$http_authorization";
            
            # 验证通过继续,不通过返回 401
            proxy_intercept_errors off;
        }
    }
}

六、迁移检查清单与时间线

日期事件行动
2026-05-21RC 发布开始评估影响范围
2026-06-24当前(本文发布)开始迁移开发
2026-07-28正式发布完成迁移,上线测试
2026-08-28过渡期结束旧规范废弃

高优先级(必须完成)

# 1. 检查代码中的废弃 API
grep -rn "initialize\|sessionId\|resources/list\|prompts/list\|sampling" ./mcp_server/

# 2. 检查 SSE 传输实现
grep -rn "EventSource\|sse\|text/event-stream" ./mcp_server/

# 3. 检查认证实现
grep -rn "api_key\|basic_auth" ./mcp_server/

迁移自检脚本

#!/usr/bin/env python3
"""
MCP 2026-07-28 规范迁移自检脚本
运行此脚本检查你的 MCP 实现是否需要迁移
"""

import ast
import sys
from pathlib import Path
from typing import Optional

class MCPMigrationChecker:
    """MCP 2026-07-28 迁移自检工具"""
    
    BREAKING_CHANGES = {
        "initialize": {
            "severity": "HIGH",
            "reason": "初始化握手已移除,改用 HTTP 头部协商",
            "fix": "删除 initialize 调用,更新 Client 实现"
        },
        "sessionId": {
            "severity": "HIGH",
            "reason": "会话机制已移除,改用显式状态句柄",
            "fix": "实现 state_token 编码/解码逻辑"
        },
        "resources/list": {
            "severity": "MEDIUM",
            "reason": "已废弃,统一到 tools/list",
            "fix": "将 resources/list 端点迁移到 tools/list"
        },
        "prompts/list": {
            "severity": "MEDIUM",
            "reason": "已废弃,改用扩展机制",
            "fix": "将 prompts 迁移到 MCP Extensions"
        },
        "sampling": {
            "severity": "LOW",
            "reason": "采样是客户端职责,不属于协议层",
            "fix": "从服务端移除 sampling 相关代码"
        },
        "EventSource": {
            "severity": "MEDIUM",
            "reason": "SSE 已废弃,改用 Streamable HTTP",
            "fix": "迁移到 httpx/aiohttp 的流式请求"
        },
    }
    
    def __init__(self, project_path: str):
        self.project_path = Path(project_path)
        self.issues: list[dict] = []
    
    def scan(self) -> None:
        """扫描项目中的所有 Python 文件"""
        for py_file in self.project_path.rglob("*.py"):
            if "node_modules" in str(py_file) or ".venv" in str(py_file):
                continue
            
            self._scan_file(py_file)
    
    def _scan_file(self, file_path: Path) -> None:
        """扫描单个文件"""
        try:
            content = file_path.read_text(encoding="utf-8")
            tree = ast.parse(content)
        except (SyntaxError, UnicodeDecodeError):
            return
        
        # 检查字符串字面量
        for node in ast.walk(tree):
            if isinstance(node, ast.Constant) and isinstance(node.value, str):
                self._check_string(file_path, node.value, node.lineno)
    
    def _check_string(self, file_path: Path, content: str, line_no: int):
        """检查字符串内容"""
        for key, info in self.BREAKING_CHANGES.items():
            if key in content:
                self.issues.append({
                    "file": str(file_path),
                    "line": line_no,
                    "pattern": key,
                    "severity": info["severity"],
                    "reason": info["reason"],
                    "fix": info["fix"]
                })
    
    def report(self) -> str:
        """生成迁移报告"""
        if not self.issues:
            return "✅ 未发现问题,你的代码已兼容 MCP 2026-07-28 规范"
        
        # 按严重程度排序
        order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
        self.issues.sort(key=lambda x: order.get(x["severity"], 3))
        
        lines = ["# MCP 2026-07-28 迁移报告", ""]
        
        high = [i for i in self.issues if i["severity"] == "HIGH"]
        medium = [i for i in self.issues if i["severity"] == "MEDIUM"]
        low = [i for i in self.issues if i["severity"] == "LOW"]
        
        if high:
            lines.append(f"## 🔴 高优先级({len(high)} 项)")
            for issue in high:
                lines.append(f"\n### {issue['file']}:{issue['line']}")
                lines.append(f"- **模式**: `{issue['pattern']}`")
                lines.append(f"- **原因**: {issue['reason']}")
                lines.append(f"- **修复**: {issue['fix']}")
        
        if medium:
            lines.append(f"\n## 🟡 中优先级({len(medium)} 项)")
            for issue in medium:
                lines.append(f"- `{issue['file']}:{issue['line']}` - {issue['pattern']}")
        
        if low:
            lines.append(f"\n## 🟢 低优先级({len(low)} 项)")
            for issue in low:
                lines.append(f"- `{issue['file']}:{issue['line']}` - {issue['pattern']}")
        
        lines.append(f"\n---\n**总计**: {len(high)} 高 + {len(medium)} 中 + {len(low)} 低 = {len(self.issues)} 项待处理")
        lines.append(f"**建议**: 在 2026-07-28 正式发布前完成高优先级项的迁移")
        
        return "\n".join(lines)


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("用法: python mcp_migration_check.py /path/to/your/mcp/project")
        sys.exit(1)
    
    checker = MCPMigrationChecker(sys.argv[1])
    checker.scan()
    print(checker.report())

七、对 AI 生态的影响与展望

开发者工具的影响

Cursor、Windsurf、Claude Code、VS Code 等主流工具需要更新其 MCP Client 实现。好消息是,新规范中的 stdio 传输方式保持了向后兼容,本地开发场景(通过命令行启动的本地 MCP Server)几乎不受影响。

真正受影响的是:

  • 远程 MCP Server 部署:必须重新设计认证和状态管理
  • 企业 MCP Gateway:需要升级 OAuth 集成
  • MCP SDK 维护者:需要发布兼容新规范的 SDK 版本

云厂商的影响

AWS、Azure、GCP 都已经提供了官方 MCP Server。这些服务需要:

  1. 升级 OAuth 2.1 实现(PKCE 强制化)
  2. 从 SSE 迁移到 Streamable HTTP
  3. 设计无状态的 Serverless 部署方案

这次规范升级的深层意义

MCP 2026-07-28 规范的意义,远不止「修 bug」或「优化性能」。它标志着 MCP 从一个实验性协议正式走向生产级基础设施

有状态 → 无状态的转变,让 MCP 终于可以:

  • 水平扩展:不再需要 sticky session,K8s HPA 自动扩缩容成为可能
  • Serverless 部署:Cloudflare Workers、AWS Lambda 可以原生支持 MCP
  • 安全合规:OAuth 2.1 强制化,满足企业安全审计要求
  • 多租户隔离:每个请求独立验证,无需担心会话混淆

MCP 从「开发者玩具」变成了真正可以承载生产流量的协议。


总结

MCP 2026-07-28 规范是一次彻底的架构升级,带来了七个方向的破坏性变更:

变更影响迁移难度
会话移除必须重写状态管理逻辑⭐⭐⭐⭐
初始化握手移除需要更新 Client SDK⭐⭐⭐
三大功能废弃迁移到统一接口/扩展⭐⭐
OAuth 2.1 强制化企业认证架构需重建⭐⭐⭐⭐
Streamable HTTP传输层升级⭐⭐⭐
错误码标准化错误处理逻辑调整
扩展机制引入新功能可选接入

45 天的迁移窗口期并不宽裕。建议立即行动,按以下顺序推进:

  1. 第一周:运行迁移自检脚本,评估影响范围
  2. 第二周:完成 OAuth 2.1 认证改造(最重要)
  3. 第三周:迁移传输层,删除 initialize 调用
  4. 第四周:替换废弃方法,全面测试
  5. 第五周(缓冲):灰度发布,监控异常

现在就去打开你的 MCP 代码,搜索 initializesessionId,开始这场迟来已久的架构升级吧。


参考资源

推荐文章

快速提升Vue3开发者的效率和界面
2025-05-11 23:37:03 +0800 CST
Go 语言实现 API 限流的最佳实践
2024-11-19 01:51:21 +0800 CST
赚点点任务系统
2024-11-19 02:17:29 +0800 CST
程序员茄子在线接单