编程 MCP 协议全链路实战:当 5000+ Server 重塑 AI 工具生态——从 JSON-RPC 到 OAuth 2.1 认证到 K8s 部署的生产级完全指南(2026)

2026-06-09 15:53:32 +0800 CST views 16

MCP 协议深度实战:当 AI 学会「插拔工具」——从协议原理到生产级 Server 开发的完全指南(2026)

引言:一个协议如何重塑 AI 工具生态

2024 年底,Anthropic 开源了 Model Context Protocol(MCP),一个定义 AI 模型如何与外部工具和数据源通信的标准协议。当时大多数人把它当作又一个技术规范,没太当回事。

18 个月后的今天,MCP 已经成为 AI 工具生态的事实标准:

  • 5000+ 官方与社区 MCP Server,覆盖文件系统、数据库、浏览器、Git、云服务……
  • 所有主流 AI 客户端支持:Claude Desktop、Cursor、VS Code、JetBrains、Continue
  • 三大云厂商提供官方 Server:AWS、Azure、GCP
  • 百度搜索 AI 开放计划收录 6000+ MCP Server,日均服务 5000 万+ 用户
  • GitHub spec 仓库 30k+ Stars

这不是偶然。MCP 解决了一个真实而迫切的痛点:每接入一个新工具,就要写一堆胶水代码。在 MCP 之前,Claude 接文件系统写一套、接 GitHub API 写一套、接数据库又写一套——每个 AI 应用都在重复造轮子。MCP 用统一协议替代了这些胶水代码,就像 USB-C 统一了设备接口一样。

但大多数文章还停留在「MCP 是什么」「怎么用 Claude Desktop 配一个 MCP Server」的入门层面。生产环境要面对的问题远比配置复杂:如何设计一个可扩展的 MCP Server 架构?如何处理认证和权限?如何做错误恢复和限流?如何监控和调试?如何在 Docker/K8s 中部署?如何做版本管理和向后兼容?

这篇文章会从头讲清楚 MCP 的协议原理,然后用一个完整的生产级 MCP Server 项目,带你走完从设计到部署的全链路。


一、MCP 协议核心原理

1.1 架构模型:Host、Client、Server 三层

MCP 的架构模型清晰而简洁:

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│     MCP Host     │     │   MCP Server     │     │   MCP Server     │
│  (Claude/Cursor) │────▶│   MCP Client     │────▶│  (文件系统/DB)    │
│                  │     │                  │     │                  │
└─────────────────┘     └─────────────────┘     └─────────────────┘
        │                        │                        │
        └──── stdio/SSE ─────────┘                        │
                                    └── 本地调用/API ──────┘

三个角色

  • Host:AI 应用本体(如 Claude Desktop、Cursor)。一个 Host 可以连接多个 Client。
  • Client:协议客户端,与 Server 保持 1:1 连接。负责协议握手、能力协商、消息路由。
  • Server:提供工具(Tools)、资源(Resources)、提示词(Prompts)的服务端。

关键设计决策:Client 和 Server 是 1:1 关系。一个 Host 可以创建多个 Client 实例,每个 Client 连接一个 Server。这保证了隔离性——一个 Server 的崩溃不会影响其他 Server。

1.2 传输层:stdio 与 SSE

MCP 定义了两种传输方式:

stdio(标准输入输出)——本地进程间通信:

// Host 启动 Server 作为子进程
{
  "mcpServers": {
    "filesystem": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-filesystem", "/home/user"]
    }
  }
}

Server 从 stdin 读取 JSON-RPC 消息,向 stdout 写入响应。stderr 用于日志。这是最常见的模式——零网络配置,进程生命周期由 Host 管理。

SSE(Server-Sent Events)——远程通信:

Client ──POST /message──▶ Server
Client ◀──GET /sse────── Server (长连接,推送通知)

SSE 模式用于远程部署的 Server。Client 通过 HTTP POST 发送请求,通过 SSE 长连接接收响应和通知。2025 年 MCP 规范更新后,新增了 Streamable HTTP 传输方式,支持更灵活的请求/响应模式,不再强制要求 SSE 长连接。

1.3 消息格式:JSON-RPC 2.0

所有 MCP 消息都是 JSON-RPC 2.0 格式:

// 请求
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "read_file",
    "arguments": {
      "path": "/etc/hosts"
    }
  }
}

// 响应
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "127.0.0.1 localhost\n..."
      }
    ]
  }
}

通知(不需要响应):

{
  "jsonrpc": "2.0",
  "method": "notifications/message",
  "params": {
    "level": "error",
    "data": "Connection to database lost"
  }
}

1.4 生命周期:连接→协商→通信→关闭

Host                     Client                    Server
 │                         │                         │
 │── create client ───────▶│                         │
 │                         │── initialize ──────────▶│
 │                         │◀─ capabilities ─────────│
 │                         │── initialized ──────────▶│
 │                         │                         │
 │                         │   (正常通信阶段)          │
 │── use tool ────────────▶│── tools/call ──────────▶│
 │◀─ tool result ──────────│◀─ result ───────────────│
 │                         │                         │
 │── shutdown ────────────▶│── close ───────────────▶│

关键步骤

  1. initialize:Client 发送自己的能力和配置,Server 回复自己的能力。双方在此协商传输选项、支持的功能集。
  2. initialized:Client 确认初始化完成,进入正常通信。
  3. 正常通信:调用工具、读取资源、获取提示词。
  4. close:连接关闭。

1.5 三大原语:Tools、Resources、Prompts

MCP 定义了三种 Server 可以暴露的能力:

Tools(工具)——模型可调用的函数:

{
  "name": "query_database",
  "description": "执行 SQL 查询并返回结果",
  "inputSchema": {
    "type": "object",
    "properties": {
      "sql": { "type": "string", "description": "SQL 查询语句" },
      "limit": { "type": "integer", "default": 100 }
    },
    "required": ["sql"]
  }
}

Tools 是最核心的原语。AI 模型看到工具列表后,可以自主决定调用哪个工具、传什么参数。

Resources(资源)——应用可读取的数据:

{
  "uri": "postgres://localhost/users",
  "name": "用户表",
  "description": "数据库中的用户表结构",
  "mimeType": "application/json"
}

Resources 是被动暴露的——AI 不直接调用,而是由应用(如 IDE)决定何时读取并注入上下文。

Prompts(提示词模板)——可复用的提示词:

{
  "name": "code_review",
  "description": "代码审查提示词",
  "arguments": [
    { "name": "language", "description": "编程语言", "required": true }
  ]
}

Prompts 让用户可以在 AI 客户端中选择预定义的提示词模板,减少重复输入。


二、从零构建生产级 MCP Server

现在我们用 TypeScript 构建一个真实可用的 MCP Server——PostgreSQL 管理工具。它不只是简单的 SQL 执行器,而是包含查询、Schema 浏览、慢查询分析、连接池管理的完整工具集。

2.1 项目初始化

mkdir mcp-server-pgadmin && cd mcp-server-pgadmin
npm init -y
npm install @modelcontextprotocol/sdk pg zod
npm install -D typescript @types/node @types/pg tsx

cat > tsconfig.json << 'EOF'
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "Node16",
    "moduleResolution": "Node16",
    "outDir": "./dist",
    "rootDir": "./src",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "declaration": true
  },
  "include": ["src/**/*"]
}
EOF

2.2 项目结构

mcp-server-pgadmin/
├── src/
│   ├── index.ts              # 入口,启动 Server
│   ├── server.ts             # Server 定义,注册 Tools/Resources
│   ├── tools/                # 工具实现
│   │   ├── query.ts          # SQL 查询
│   │   ├── schema.ts         # Schema 浏览
│   │   ├── slow-query.ts     # 慢查询分析
│   │   └── pool-stats.ts     # 连接池统计
│   ├── resources/            # 资源定义
│   │   └── tables.ts         # 表结构资源
│   ├── db/
│   │   ├── pool.ts           # 连接池管理
│   │   └── errors.ts         # 错误处理
│   └── utils/
│       ├── validator.ts      # SQL 注入检测
│       └── formatter.ts      # 结果格式化
├── Dockerfile
├── docker-compose.yml
├── package.json
└── tsconfig.json

2.3 连接池管理

// src/db/pool.ts
import { Pool, PoolConfig } from 'pg';

interface PgAdminConfig {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
  maxPoolSize?: number;
  idleTimeoutMs?: number;
  connectionTimeoutMs?: number;
}

// 全局连接池,支持多数据库
const pools = new Map<string, Pool>();

export function getPool(name: string, config: PgAdminConfig): Pool {
  const key = `${config.host}:${config.port}:${config.database}`;
  
  if (!pools.has(key)) {
    const poolConfig: PoolConfig = {
      host: config.host,
      port: config.port,
      database: config.database,
      user: config.user,
      password: config.password,
      max: config.maxPoolSize ?? 10,
      idleTimeoutMillis: config.idleTimeoutMs ?? 30000,
      connectionTimeoutMillis: config.connectionTimeoutMs ?? 5000,
      
      // 生产级配置
      allowExitOnIdle: false,
      
      // 连接验证
      keepAlive: true,
      keepAliveInitialDelayMillis: 10000,
    };

    const pool = new Pool(poolConfig);

    // 监听连接错误——防止未捕获异常导致进程崩溃
    pool.on('error', (err) => {
      console.error(`[Pool ${key}] Unexpected error:`, err.message);
    });

    pool.on('connect', (client) => {
      console.log(`[Pool ${key}] New client connected, total: ${pool.totalCount}`);
    });

    pool.on('remove', (client) => {
      console.log(`[Pool ${key}] Client removed, total: ${pool.totalCount}`);
    });

    pools.set(key, pool);
  }

  return pools.get(key)!;
}

// 优雅关闭所有连接池
export async function closeAllPools(): Promise<void> {
  const promises = Array.from(pools.entries()).map(async ([key, pool]) => {
    try {
      await pool.end();
      console.log(`[Pool ${key}] Closed`);
    } catch (err) {
      console.error(`[Pool ${key}] Close error:`, err);
    }
  });
  
  await Promise.all(promises);
  pools.clear();
}

// 连接池健康检查
export async function healthCheck(pool: Pool): Promise<{
  healthy: boolean;
  totalCount: number;
  idleCount: number;
  waitingCount: number;
  latencyMs: number;
}> {
  const start = Date.now();
  try {
    const client = await pool.connect();
    await client.query('SELECT 1');
    client.release();
    return {
      healthy: true,
      totalCount: pool.totalCount,
      idleCount: pool.idleCount,
      waitingCount: pool.waitingCount,
      latencyMs: Date.now() - start,
    };
  } catch (err) {
    return {
      healthy: false,
      totalCount: pool.totalCount,
      idleCount: pool.idleCount,
      waitingCount: pool.waitingCount,
      latencyMs: Date.now() - start,
    };
  }
}

2.4 SQL 注入检测

这是生产环境必须做的。即使 AI 理论上不会生成恶意 SQL,但输入验证是安全的基本盘。

// src/utils/validator.ts

// 危险 SQL 模式
const DANGEROUS_PATTERNS = [
  /;\s*DROP\s+/i,                    // DROP 语句
  /;\s*TRUNCATE\s+/i,                // TRUNCATE 语句
  /;\s*DELETE\s+FROM\s+(?!.*WHERE)/i, // 无 WHERE 的 DELETE
  /;\s*UPDATE\s+(?!.*WHERE)/i,       // 无 WHERE 的 UPDATE
  /pg_shutdown/i,                     // 关闭数据库
  /pg_terminate_backend/i,            // 终止连接
  /COPY.*TO\s+PROGRAM/i,             // COPY TO PROGRAM(命令执行)
];

// 只读查询白名单
const READ_ONLY_PREFIXES = [
  'SELECT', 'WITH', 'EXPLAIN', 'SHOW', 'DESCRIBE',
];

export interface ValidationResult {
  safe: boolean;
  reason?: string;
  readOnly: boolean;
}

export function validateSQL(sql: string): ValidationResult {
  const trimmed = sql.trim();
  
  // 检查是否为只读查询
  const firstWord = trimmed.split(/\s+/)[0]?.toUpperCase() ?? '';
  const readOnly = READ_ONLY_PREFIXES.includes(firstWord);
  
  // 检查危险模式
  for (const pattern of DANGEROUS_PATTERNS) {
    if (pattern.test(trimmed)) {
      return {
        safe: false,
        reason: `SQL 包含危险操作: ${pattern.source}`,
        readOnly: false,
      };
    }
  }
  
  // 多语句检查(分号分隔)
  const statements = trimmed.split(';').filter(s => s.trim().length > 0);
  if (statements.length > 1) {
    return {
      safe: false,
      reason: '不允许执行多条语句,请一次只执行一条 SQL',
      readOnly: false,
    };
  }
  
  // 注释注入检查
  if (/--.*\n.*(?:INSERT|UPDATE|DELETE|DROP)/i.test(trimmed)) {
    return {
      safe: false,
      reason: 'SQL 注释中可能包含注入语句',
      readOnly: false,
    };
  }
  
  return { safe: true, readOnly };
}

2.5 注册 MCP Tools

// src/tools/query.ts
import { z } from 'zod';
import { getPool } from '../db/pool.js';
import { validateSQL } from '../utils/validator.js';
import { formatResults } from '../utils/formatter.js';
import type { ToolDefinition } from '../server.js';

export const queryTool: ToolDefinition = {
  name: 'pg_query',
  description: '执行 SQL 查询并返回结果。支持 SELECT、INSERT、UPDATE、DELETE 等语句。' +
    '对于修改操作会返回受影响的行数。查询结果自动限制最多 1000 行。',
  inputSchema: {
    sql: z.string().describe('要执行的 SQL 语句'),
    database: z.string().default('default').describe('数据库连接名称'),
    limit: z.number().default(1000).describe('返回结果的最大行数'),
    timeout: z.number().default(30).describe('查询超时时间(秒)'),
  },
  
  handler: async (input, context) => {
    const { sql, database, limit, timeout } = input;
    
    // 1. SQL 验证
    const validation = validateSQL(sql);
    if (!validation.safe) {
      return {
        content: [{
          type: 'text' as const,
          text: `❌ SQL 安全检查未通过: ${validation.reason}`,
        }],
        isError: true,
      };
    }
    
    // 2. 获取连接池
    const pool = getPool(database, context.dbConfig);
    
    // 3. 执行查询(带超时和行数限制)
    const client = await pool.connect();
    try {
      // 设置查询超时
      await client.query(`SET statement_timeout = '${timeout}s'`);
      
      // 只读查询设置事务隔离
      if (validation.readOnly) {
        await client.query('BEGIN READ ONLY');
      }
      
      const result = await client.query({
        text: sql,
        rowMode: 'array', // 使用数组模式减少传输量
      });
      
      if (validation.readOnly) {
        await client.query('COMMIT');
      }
      
      // 4. 格式化结果
      const formatted = formatResults(result, limit);
      
      return {
        content: [{
          type: 'text' as const,
          text: formatted,
        }],
      };
    } catch (err: any) {
      if (validation.readOnly) {
        await client.query('ROLLBACK').catch(() => {});
      }
      
      return {
        content: [{
          type: 'text' as const,
          text: `❌ 查询执行失败: ${err.message}\n\nSQL 状态: ${err.code}\n详情: ${err.detail ?? '无'}`,
        }],
        isError: true,
      };
    } finally {
      client.release();
    }
  },
};
// src/tools/schema.ts
import { z } from 'zod';
import { getPool } from '../db/pool.js';
import type { ToolDefinition } from '../server.js';

export const listTablesTool: ToolDefinition = {
  name: 'pg_list_tables',
  description: '列出数据库中的所有表及其行数、大小等统计信息',
  inputSchema: {
    schema: z.string().default('public').describe('Schema 名称'),
    database: z.string().default('default').describe('数据库连接名称'),
  },
  
  handler: async (input, context) => {
    const pool = getPool(input.database, context.dbConfig);
    
    const result = await pool.query(`
      SELECT 
        t.table_name,
        obj_description((quote_ident(t.table_schema) || '.' || quote_ident(t.table_name))::regclass) as comment,
        pg_size_pretty(pg_total_relation_size(quote_ident(t.table_schema) || '.' || quote_ident(t.table_name))) as total_size,
        pg_total_relation_size(quote_ident(t.table_schema) || '.' || quote_ident(t.table_name)) as size_bytes,
        COALESCE(s.n_live_tup, 0) as row_count,
        COALESCE(s.last_vacuum, 'never') as last_vacuum,
        COALESCE(s.last_analyze, 'never') as last_analyze
      FROM information_schema.tables t
      LEFT JOIN pg_stat_user_tables s 
        ON s.schemaname = t.table_schema 
        AND s.relname = t.table_name
      WHERE t.table_schema = $1
        AND t.table_type = 'BASE TABLE'
      ORDER BY size_bytes DESC
    `, [input.schema]);
    
    const lines = [
      `# 数据库表列表 (Schema: ${input.schema})`,
      '',
      '| 表名 | 注释 | 大小 | 行数 | 最近 VACUUM | 最近 ANALYZE |',
      '|------|------|------|------|-------------|--------------|',
    ];
    
    for (const row of result.rows) {
      lines.push(
        `| ${row.table_name} | ${row.comment ?? '-'} | ${row.total_size} | ${row.row_count} | ${row.last_vacuum} | ${row.last_analyze} |`
      );
    }
    
    return {
      content: [{ type: 'text' as const, text: lines.join('\n') }],
    };
  },
};

export const describeTableTool: ToolDefinition = {
  name: 'pg_describe_table',
  description: '获取表的完整结构信息,包括列定义、索引、约束、外键等',
  inputSchema: {
    table: z.string().describe('表名'),
    schema: z.string().default('public').describe('Schema 名称'),
    database: z.string().default('default').describe('数据库连接名称'),
  },
  
  handler: async (input, context) => {
    const pool = getPool(input.database, context.dbConfig);
    const { table, schema } = input;
    const qualified = `${schema}.${table}`;
    
    // 列信息
    const columns = await pool.query(`
      SELECT 
        column_name, data_type, is_nullable,
        column_default, character_maximum_length,
        numeric_precision, numeric_scale,
        col_description((quote_ident($1) || '.' || quote_ident($2))::regclass, ordinal_position) as comment
      FROM information_schema.columns
      WHERE table_schema = $1 AND table_name = $2
      ORDER BY ordinal_position
    `, [schema, table]);
    
    // 索引信息
    const indexes = await pool.query(`
      SELECT 
        i.relname as index_name,
        a.attname as column_name,
        ix.indisunique as is_unique,
        ix.indisprimary as is_primary,
        am.amname as index_type
      FROM pg_index ix
      JOIN pg_class t ON t.oid = ix.indrelid
      JOIN pg_class i ON i.oid = ix.indexrelid
      JOIN pg_am am ON am.oid = i.relam
      JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
      WHERE t.relname = $2 AND t.relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = $1)
    `, [schema, table]);
    
    // 外键信息
    const foreignKeys = await pool.query(`
      SELECT
        tc.constraint_name,
        kcu.column_name,
        ccu.table_schema AS foreign_table_schema,
        ccu.table_name AS foreign_table_name,
        ccu.column_name AS foreign_column_name
      FROM information_schema.table_constraints tc
      JOIN information_schema.key_column_usage kcu
        ON tc.constraint_name = kcu.constraint_name
      JOIN information_schema.constraint_column_usage ccu
        ON ccu.constraint_name = tc.constraint_name
      WHERE tc.constraint_type = 'FOREIGN KEY'
        AND tc.table_schema = $1 AND tc.table_name = $2
    `, [schema, table]);
    
    // 格式化输出
    const lines = [`# 表结构: ${qualified}`, ''];
    
    lines.push('## 列定义', '');
    lines.push('| 列名 | 类型 | 可空 | 默认值 | 注释 |');
    lines.push('|------|------|------|--------|------|');
    for (const col of columns.rows) {
      const type = col.character_maximum_length 
        ? `${col.data_type}(${col.character_maximum_length})`
        : col.numeric_precision 
          ? `${col.data_type}(${col.numeric_precision},${col.numeric_scale})`
          : col.data_type;
      lines.push(`| ${col.column_name} | ${type} | ${col.is_nullable} | ${col.column_default ?? '-'} | ${col.comment ?? '-'} |`);
    }
    
    if (indexes.rows.length > 0) {
      lines.push('', '## 索引', '');
      lines.push('| 索引名 | 列 | 类型 | 唯一 | 主键 |');
      lines.push('|--------|-----|------|------|------|');
      for (const idx of indexes.rows) {
        lines.push(`| ${idx.index_name} | ${idx.column_name} | ${idx.index_type} | ${idx.is_unique ? '✓' : '-'} | ${idx.is_primary ? '✓' : '-'} |`);
      }
    }
    
    if (foreignKeys.rows.length > 0) {
      lines.push('', '## 外键', '');
      for (const fk of foreignKeys.rows) {
        lines.push(`- **${fk.constraint_name}**: ${fk.column_name} → ${fk.foreign_table_schema}.${fk.foreign_table_name}.${fk.foreign_column_name}`);
      }
    }
    
    return {
      content: [{ type: 'text' as const, text: lines.join('\n') }],
    };
  },
};

2.6 慢查询分析工具

// src/tools/slow-query.ts
import { z } from 'zod';
import { getPool } from '../db/pool.js';
import type { ToolDefinition } from '../server.js';

export const slowQueryTool: ToolDefinition = {
  name: 'pg_slow_queries',
  description: '分析数据库中的慢查询。基于 pg_stat_statements 扩展,返回执行时间最长、调用最频繁的查询。',
  inputSchema: {
    top: z.number().default(10).describe('返回前 N 条慢查询'),
    sortBy: z.enum(['total_time', 'mean_time', 'calls', 'rows']).default('total_time')
      .describe('排序方式: total_time=总耗时, mean_time=平均耗时, calls=调用次数, rows=返回行数'),
    database: z.string().default('default').describe('数据库连接名称'),
  },
  
  handler: async (input, context) => {
    const pool = getPool(input.database, context.dbConfig);
    
    // 先检查 pg_stat_statements 是否可用
    const extCheck = await pool.query(`
      SELECT EXISTS(
        SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements'
      ) as available
    `);
    
    if (!extCheck.rows[0].available) {
      return {
        content: [{
          type: 'text' as const,
          text: '❌ pg_stat_statements 扩展未启用。\n\n' +
            '请在 postgresql.conf 中添加:\n' +
            '```\nshared_preload_libraries = \'pg_stat_statements\'\n' +
            'pg_stat_statements.track = all\n```\n\n' +
            '然后执行: CREATE EXTENSION pg_stat_statements;',
        }],
        isError: true,
      };
    }
    
    const orderMap = {
      total_time: 'total_exec_time DESC',
      mean_time: 'mean_exec_time DESC',
      calls: 'calls DESC',
      rows: 'rows DESC',
    };
    
    const result = await pool.query(`
      SELECT 
        queryid,
        LEFT(query, 200) as query_preview,
        calls,
        round(total_exec_time::numeric, 2) as total_ms,
        round(mean_exec_time::numeric, 2) as mean_ms,
        round(min_exec_time::numeric, 2) as min_ms,
        round(max_exec_time::numeric, 2) as max_ms,
        rows,
        shared_blks_hit,
        shared_blks_read,
        round(100.0 * shared_blks_hit / NULLIF(shared_blks_hit + shared_blks_read, 0), 1) as cache_hit_pct
      FROM pg_stat_statements
      WHERE dbid = (SELECT oid FROM pg_database WHERE datname = current_database())
      ORDER BY ${orderMap[input.sortBy]}
      LIMIT $1
    `, [input.top]);
    
    const lines = [
      `# 慢查询 Top ${input.top} (排序: ${input.sortBy})`,
      '',
      '| # | Query ID | 查询预览 | 调用次数 | 总耗时(ms) | 平均耗时(ms) | 缓存命中率 |',
      '|---|----------|----------|----------|------------|-------------|-----------|',
    ];
    
    result.rows.forEach((row, i) => {
      lines.push(
        `| ${i + 1} | ${row.queryid} | ${row.query_preview}... | ${row.calls} | ${row.total_ms} | ${row.mean_ms} | ${row.cache_hit_pct ?? '-'}% |`
      );
    });
    
    lines.push('', '💡 优化建议:', '');
    
    for (const row of result.rows.slice(0, 3)) {
      if (parseFloat(row.cache_hit_pct) < 95) {
        lines.push(`- Query ${row.queryid}: 缓存命中率仅 ${row.cache_hit_pct}%,考虑增加 shared_buffers 或优化查询`);
      }
      if (parseFloat(row.mean_ms) > 100) {
        lines.push(`- Query ${row.queryid}: 平均耗时 ${row.mean_ms}ms,建议添加 EXPLAIN ANALYZE 分析执行计划`);
      }
    }
    
    return {
      content: [{ type: 'text' as const, text: lines.join('\n') }],
    };
  },
};

2.7 注册 MCP Resources

Resources 是被动暴露的数据,不同于 Tools 的主动调用:

// src/resources/tables.ts
import { getPool } from '../db/pool.js';
import type { ResourceDefinition } from '../server.js';

export function createTableResources(dbName: string): ResourceDefinition[] {
  return [
    {
      uri: `pg://${dbName}/schema/overview`,
      name: `${dbName} - Schema 概览`,
      description: '数据库所有表的概要信息',
      mimeType: 'application/json',
      
      read: async (context) => {
        const pool = getPool(dbName, context.dbConfig);
        const result = await pool.query(`
          SELECT table_name, 
                 obj_description((quote_ident(table_schema) || '.' || quote_ident(table_name))::regclass) as comment
          FROM information_schema.tables
          WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
          ORDER BY table_name
        `);
        
        return {
          contents: [{
            uri: `pg://${dbName}/schema/overview`,
            mimeType: 'application/json',
            text: JSON.stringify(result.rows, null, 2),
          }],
        };
      },
    },
  ];
}

2.8 组装 Server

// src/server.ts
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import { queryTool } from './tools/query.js';
import { listTablesTool, describeTableTool } from './tools/schema.js';
import { slowQueryTool } from './tools/slow-query.js';
import { poolStatsTool } from './tools/pool-stats.js';
import { createTableResources } from './resources/tables.js';
import { closeAllPools, healthCheck } from './db/pool.js';

export interface ToolDefinition {
  name: string;
  description: string;
  inputSchema: Record<string, any>;
  handler: (input: any, context: ServerContext) => Promise<any>;
}

export interface ResourceDefinition {
  uri: string;
  name: string;
  description: string;
  mimeType: string;
  read: (context: ServerContext) => Promise<any>;
}

export interface ServerContext {
  dbConfig: any;
}

export async function createServer(): Promise<McpServer> {
  const server = new McpServer({
    name: 'pg-admin',
    version: '1.0.0',
  });

  // 从环境变量读取数据库配置
  const dbConfig = {
    host: process.env.PG_HOST ?? 'localhost',
    port: parseInt(process.env.PG_PORT ?? '5432'),
    database: process.env.PG_DATABASE ?? 'postgres',
    user: process.env.PG_USER ?? 'postgres',
    password: process.env.PG_PASSWORD ?? '',
    maxPoolSize: parseInt(process.env.PG_POOL_SIZE ?? '10'),
  };

  const context: ServerContext = { dbConfig };

  // 注册所有 Tools
  const tools = [queryTool, listTablesTool, describeTableTool, slowQueryTool, poolStatsTool];
  
  for (const tool of tools) {
    server.tool(
      tool.name,
      tool.description,
      tool.inputSchema,
      async (input) => tool.handler(input, context)
    );
  }

  // 注册 Resources
  const resources = createTableResources(dbConfig.database);
  for (const resource of resources) {
    server.resource(resource.name, resource.uri, async (uri) => resource.read(context));
  }

  // 注册健康检查提示词
  server.prompt('db_health', '检查数据库健康状态', {}, async () => {
    const pool = getPool(dbConfig.database, dbConfig);
    const health = await healthCheck(pool);
    return {
      messages: [{
        role: 'user',
        content: {
          type: 'text',
          text: `请分析以下数据库健康状态并给出建议:\n\n${JSON.stringify(health, null, 2)}`,
        },
      }],
    };
  });

  return server;
}

// src/index.ts
export async function main() {
  const server = await createServer();
  const transport = new StdioServerTransport();
  
  await server.connect(transport);
  console.error('MCP Server pg-admin started');
  
  // 优雅关闭
  const cleanup = async () => {
    console.error('Shutting down...');
    await closeAllPools();
    process.exit(0);
  };
  
  process.on('SIGINT', cleanup);
  process.on('SIGTERM', cleanup);
}

main().catch((err) => {
  console.error('Fatal error:', err);
  process.exit(1);
});

三、认证与安全

生产环境中,MCP Server 必须解决认证问题。2025 年 MCP 规范引入了 OAuth 2.1 认证框架。

3.1 MCP 的 OAuth 2.1 流程

Client                                          Server
  │                                               │
  │── GET / ──────────────────────────────────────▶│ (未认证)
  │◀─ 401 + WWW-Authenticate: Bearer ─────────────│
  │                                               │
  │── GET /.well-known/oauth-authorization-server ─▶│
  │◀─ { authorization_endpoint, token_endpoint } ──│
  │                                               │
  │── Browser: authorization_endpoint ────────────▶│
  │◀─ Authorization Code ─────────────────────────│
  │                                               │
  │── POST token_endpoint ────────────────────────▶│
  │◀─ { access_token, token_type, expires_in } ───│
  │                                               │
  │── Request + Authorization: Bearer <token> ───▶│
  │◀─ 200 + Response ─────────────────────────────│

3.2 实现认证中间件

// src/auth/middleware.ts
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';

interface AuthConfig {
  enabled: boolean;
  issuer: string;
  audience: string;
  jwksUri: string;
}

// JWT 验证(简化版,生产环境用 jose 库)
async function verifyToken(token: string, config: AuthConfig): Promise<{
  valid: boolean;
  claims?: Record<string, any>;
  error?: string;
}> {
  if (!config.enabled) {
    return { valid: true };
  }
  
  try {
    // 解码 JWT header 获取 kid
    const [headerB64] = token.split('.');
    const header = JSON.parse(Buffer.from(headerB64, 'base64url').toString());
    
    // 从 JWKS 端点获取公钥
    const jwksResponse = await fetch(config.jwksUri);
    const jwks = await jwksResponse.json();
    const key = jwks.keys.find((k: any) => k.kid === header.kid);
    
    if (!key) {
      return { valid: false, error: 'Signing key not found' };
    }
    
    // 使用 jose 库验证(这里简化)
    // 生产环境: import { jwtVerify } from 'jose';
    // const { payload } = await jwtVerify(token, key, { issuer: config.issuer, audience: config.audience });
    
    return { valid: true, claims: {} }; // 简化
  } catch (err: any) {
    return { valid: false, error: err.message };
  }
}

// 在 MCP Server 中启用认证
export function setupAuth(server: McpServer, config: AuthConfig): void {
  if (!config.enabled) return;
  
  // MCP SDK 在 2025 年版本支持 auth middleware
  // 在每个 tool 调用前验证 token
  server.server.setRequestHandler = new Proxy(
    server.server.setRequestHandler,
    {
      apply(target, thisArg, args) {
        const [method, handler] = args;
        const wrappedHandler = async (request: any, extra: any) => {
          // 从请求头提取 token
          const authHeader = extra?.headers?.authorization;
          if (!authHeader?.startsWith('Bearer ')) {
            throw new Error('Unauthorized: Missing Bearer token');
          }
          
          const token = authHeader.slice(7);
          const result = await verifyToken(token, config);
          
          if (!result.valid) {
            throw new Error(`Unauthorized: ${result.error}`);
          }
          
          return handler(request, { ...extra, auth: result.claims });
        };
        
        return target.apply(thisArg, [method, wrappedHandler]);
      },
    }
  );
}

3.3 工具级权限控制

不是所有用户都能执行所有操作:

// src/auth/permissions.ts
export enum Permission {
  READ = 'read',          // SELECT 查询
  WRITE = 'write',        // INSERT/UPDATE/DELETE
  DDL = 'ddl',           // CREATE/ALTER/DROP
  ADMIN = 'admin',       // 管理操作(连接池、慢查询分析)
}

const TOOL_PERMISSIONS: Record<string, Permission[]> = {
  'pg_query': [Permission.READ],            // SELECT 自动通过
  'pg_query_write': [Permission.WRITE],     // 写操作需要 write 权限
  'pg_list_tables': [Permission.READ],
  'pg_describe_table': [Permission.READ],
  'pg_slow_queries': [Permission.ADMIN],
  'pg_pool_stats': [Permission.ADMIN],
};

// 在 query tool 中根据 SQL 类型动态检查
export function checkQueryPermission(sql: string, userPermissions: Permission[]): boolean {
  const firstWord = sql.trim().split(/\s+/)[0]?.toUpperCase();
  
  switch (firstWord) {
    case 'SELECT':
    case 'WITH':
      return userPermissions.includes(Permission.READ);
    case 'INSERT':
    case 'UPDATE':
    case 'DELETE':
      return userPermissions.includes(Permission.WRITE);
    case 'CREATE':
    case 'ALTER':
    case 'DROP':
      return userPermissions.includes(Permission.DDL);
    default:
      return false;
  }
}

四、错误处理与限流

4.1 错误分类与恢复

// src/db/errors.ts
import { DatabaseError } from 'pg';

export enum ErrorCategory {
  CONNECTION = 'connection',       // 连接错误,可重试
  TIMEOUT = 'timeout',            // 超时,可重试
  CONSTRAINT = 'constraint',      // 约束违反,不可重试
  SYNTAX = 'syntax',              // SQL 语法错误,不可重试
  PERMISSION = 'permission',      // 权限不足,不可重试
  UNKNOWN = 'unknown',            // 未知错误
}

export function categorizeError(err: any): {
  category: ErrorCategory;
  retryable: boolean;
  userMessage: string;
} {
  if (err instanceof DatabaseError) {
    switch (err.code) {
      case '08001': // 连接不存在
      case '08003': // 连接未打开
      case '08006': // 连接失败
        return {
          category: ErrorCategory.CONNECTION,
          retryable: true,
          userMessage: '数据库连接失败,请检查数据库是否正在运行',
        };
      case '57014': // 查询被取消(超时)
        return {
          category: ErrorCategory.TIMEOUT,
          retryable: true,
          userMessage: '查询超时,请尝试优化查询或增加超时时间',
        };
      case '23505': // 唯一约束违反
        return {
          category: ErrorCategory.CONSTRAINT,
          retryable: false,
          userMessage: `违反唯一约束: ${err.detail ?? err.message}`,
        };
      case '23503': // 外键约束违反
        return {
          category: ErrorCategory.CONSTRAINT,
          retryable: false,
          userMessage: `违反外键约束: ${err.detail ?? err.message}`,
        };
      case '42601': // 语法错误
        return {
          category: ErrorCategory.SYNTAX,
          retryable: false,
          userMessage: `SQL 语法错误: ${err.message}`,
        };
      case '42501': // 权限不足
        return {
          category: ErrorCategory.PERMISSION,
          retryable: false,
          userMessage: '当前用户没有执行此操作的权限',
        };
    }
  }
  
  return {
    category: ErrorCategory.UNKNOWN,
    retryable: false,
    userMessage: `未知错误: ${err.message}`,
  };
}

// 带重试的查询执行
export async function executeWithRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number = 3,
  baseDelayMs: number = 1000
): Promise<T> {
  let lastError: any;
  
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      const { retryable } = categorizeError(err);
      
      if (!retryable || attempt === maxRetries) {
        throw err;
      }
      
      // 指数退避
      const delay = baseDelayMs * Math.pow(2, attempt);
      console.error(`Retry ${attempt + 1}/${maxRetries} after ${delay}ms`);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
  
  throw lastError;
}

4.2 限流器

// src/utils/rate-limiter.ts
export class RateLimiter {
  private tokens: number;
  private lastRefill: number;
  
  constructor(
    private maxTokens: number,
    private refillRateMs: number, // 每多少毫秒补充一个 token
  ) {
    this.tokens = maxTokens;
    this.lastRefill = Date.now();
  }
  
  async acquire(): Promise<void> {
    this.refill();
    
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return;
    }
    
    // 等待 token 补充
    const waitMs = this.refillRateMs;
    await new Promise(resolve => setTimeout(resolve, waitMs));
    return this.acquire();
  }
  
  private refill(): void {
    const now = Date.now();
    const elapsed = now - this.lastRefill;
    const tokensToAdd = Math.floor(elapsed / this.refillRateMs);
    
    if (tokensToAdd > 0) {
      this.tokens = Math.min(this.maxTokens, this.tokens + tokensToAdd);
      this.lastRefill = now;
    }
  }
}

// 在 Server 中使用
const queryLimiter = new RateLimiter(10, 1000); // 每秒最多 10 次查询
const adminLimiter = new RateLimiter(2, 5000);  // 管理操作每 5 秒最多 2 次

五、容器化部署

5.1 Dockerfile

# src/../Dockerfile
FROM node:22-alpine AS builder

WORKDIR /app
COPY package*.json ./
RUN npm ci
COPY tsconfig.json ./
COPY src/ ./src/
RUN npm run build

FROM node:22-alpine AS runner

WORKDIR /app
COPY package*.json ./
RUN npm ci --production
COPY --from=builder /app/dist ./dist

# 非 root 用户运行
RUN addgroup -S mcp && adduser -S mcp -G mcp
USER mcp

ENV NODE_ENV=production
ENV PG_HOST=postgres
ENV PG_PORT=5432
ENV PG_POOL_SIZE=10

CMD ["node", "dist/index.js"]

5.2 Docker Compose(SSE 模式远程部署)

# docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: app_db
      POSTGRES_USER: app_user
      POSTGRES_PASSWORD: ${PG_PASSWORD}
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U app_user -d app_db"]
      interval: 5s
      timeout: 5s
      retries: 5

  mcp-server:
    build: .
    environment:
      PG_HOST: postgres
      PG_PORT: 5432
      PG_DATABASE: app_db
      PG_USER: app_user
      PG_PASSWORD: ${PG_PASSWORD}
      PG_POOL_SIZE: 15
      MCP_TRANSPORT: sse
      MCP_PORT: 3000
      MCP_AUTH_ENABLED: "true"
      MCP_AUTH_ISSUER: ${AUTH_ISSUER}
      MCP_AUTH_AUDIENCE: mcp-pg-admin
      MCP_AUTH_JWKS_URI: ${AUTH_JWKS_URI}
    ports:
      - "3000:3000"
    depends_on:
      postgres:
        condition: service_healthy
    restart: unless-stopped
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: '1.0'

volumes:
  pgdata:

5.3 Kubernetes 部署

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-pg-admin
  labels:
    app: mcp-pg-admin
spec:
  replicas: 2
  selector:
    matchLabels:
      app: mcp-pg-admin
  template:
    metadata:
      labels:
        app: mcp-pg-admin
    spec:
      containers:
        - name: mcp-server
          image: registry.example.com/mcp-pg-admin:1.0.0
          ports:
            - containerPort: 3000
          env:
            - name: PG_HOST
              valueFrom:
                secretKeyRef:
                  name: db-credentials
                  key: host
            - name: PG_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: db-credentials
                  key: password
            - name: PG_DATABASE
              value: app_db
            - name: PG_USER
              value: app_user
            - name: MCP_TRANSPORT
              value: sse
            - name: MCP_PORT
              value: "3000"
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 3000
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 3000
            initialDelaySeconds: 5
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: mcp-pg-admin
spec:
  selector:
    app: mcp-pg-admin
  ports:
    - port: 80
      targetPort: 3000
  type: ClusterIP

六、监控与可观测性

6.1 结构化日志

// src/utils/logger.ts
interface LogEntry {
  timestamp: string;
  level: 'debug' | 'info' | 'warn' | 'error';
  message: string;
  tool?: string;
  duration_ms?: number;
  error?: string;
  [key: string]: any;
}

export class Logger {
  constructor(private context: Record<string, any> = {}) {}
  
  private log(level: LogEntry['level'], message: string, data?: Record<string, any>) {
    const entry: LogEntry = {
      timestamp: new Date().toISOString(),
      level,
      message,
      ...this.context,
      ...data,
    };
    
    // 输出到 stderr(MCP 协议要求 stdout 只输出协议消息)
    process.stderr.write(JSON.stringify(entry) + '\n');
  }
  
  info(message: string, data?: Record<string, any>) { this.log('info', message, data); }
  warn(message: string, data?: Record<string, any>) { this.log('warn', message, data); }
  error(message: string, data?: Record<string, any>) { this.log('error', message, data); }
  debug(message: string, data?: Record<string, any>) {
    if (process.env.LOG_LEVEL === 'debug') this.log('debug', message, data);
  }
  
  child(context: Record<string, any>): Logger {
    return new Logger({ ...this.context, ...context });
  }
}

6.2 工具调用指标

// src/utils/metrics.ts
interface ToolMetrics {
  calls: number;
  errors: number;
  totalDurationMs: number;
  lastCalled?: Date;
}

const metrics = new Map<string, ToolMetrics>();

export function recordToolCall(tool: string, durationMs: number, error?: boolean) {
  const m = metrics.get(tool) ?? { calls: 0, errors: 0, totalDurationMs: 0 };
  m.calls += 1;
  m.totalDurationMs += durationMs;
  m.lastCalled = new Date();
  if (error) m.errors += 1;
  metrics.set(tool, m);
}

export function getMetricsReport(): string {
  const lines = ['# MCP Server Metrics', ''];
  
  lines.push('| Tool | Calls | Errors | Avg Duration (ms) | Last Called |');
  lines.push('|------|-------|--------|-------------------|-------------|');
  
  for (const [tool, m] of metrics) {
    const avgMs = (m.totalDurationMs / m.calls).toFixed(1);
    lines.push(`| ${tool} | ${m.calls} | ${m.errors} | ${avgMs} | ${m.lastCalled?.toISOString() ?? '-'} |`);
  }
  
  return lines.join('\n');
}

七、MCP 规范进阶:Sampling 与 Roots

7.1 Sampling:让 Server 请求 LLM 推理

MCP 最被低估的能力之一。Server 可以反向请求 Host 的 LLM 进行推理

// Server 请求 Host 进行推理
const result = await server.server.request({
  method: 'sampling/createMessage',
  params: {
    messages: [{
      role: 'user',
      content: {
        type: 'text',
        text: `分析以下 SQL 执行计划并给出优化建议:\n${explainOutput}`,
      },
    }],
    maxTokens: 1000,
    systemPrompt: '你是一个 PostgreSQL 查询优化专家',
  },
});

这打开了惊人的可能性:MCP Server 不再是被动执行工具,而是可以主动利用 AI 推理能力。例如:

  • SQL 优化器:执行 EXPLAIN ANALYZE → 让 AI 分析执行计划 → 返回优化建议
  • 代码审查器:读取 diff → 让 AI 审查 → 返回审查结果
  • 数据分析师:查询数据 → 让 AI 发现异常 → 返回分析报告

7.2 Roots:定义 Server 的操作边界

Roots 让 Host 告诉 Server 它可以访问的根目录或资源边界:

// 在 Server 端查询 roots
const roots = await server.server.request({
  method: 'roots/list',
  params: {},
});

// roots 可能返回:
// {
//   roots: [
//     { uri: "file:///home/user/projects/app", name: "Main Project" },
//     { uri: "file:///home/user/docs", name: "Documentation" }
//   ]
// }

这在安全场景下非常重要——Server 不应该访问 roots 之外的路径。


八、MCP vs. Function Calling vs. OpenAPI:为什么 MCP 赢了

维度Function CallingOpenAPI/SwaggerMCP
标准化每家模型格式不同HTTP API 标准AI 工具专用标准
工具发现静态定义静态文档动态协商
双向通信✅ (Sampling)
流式响应
认证框架API KeyOAuth 2.1
资源订阅✅ (Resources + Notifications)
跨模型复用部分✅ (一次开发,Claude/GPT/Gemini 通用)

MCP 的核心优势

  1. 一次开发,多端复用:写一个 MCP Server,Claude Desktop、Cursor、VS Code、JetBrains 都能用。不需要为每个 AI 客户端写适配代码。
  2. 动态能力协商:Server 可以在运行时动态增减工具,Client 会自动发现变化。
  3. 双向通信:Server 可以主动通知 Client(如数据库状态变化),也可以请求 Client 的 AI 推理能力。
  4. 安全内建:OAuth 2.1 认证、权限控制、操作边界(Roots)都是协议的一部分。

九、实战:5 分钟接入一个 MCP Server

讲完了原理和架构,来看最实际的操作——如何在 Claude Desktop 中配置我们的 MCP Server。

9.1 本地 stdio 模式

编辑 ~/Library/Application Support/Claude/claude_desktop_config.json(macOS):

{
  "mcpServers": {
    "pg-admin": {
      "command": "node",
      "args": ["/path/to/mcp-server-pgadmin/dist/index.js"],
      "env": {
        "PG_HOST": "localhost",
        "PG_PORT": "5432",
        "PG_DATABASE": "myapp",
        "PG_USER": "postgres",
        "PG_PASSWORD": "your-password"
      }
    }
  }
}

重启 Claude Desktop 后,AI 就可以直接查询你的数据库了。

9.2 Cursor 中配置

编辑 .cursor/mcp.json

{
  "mcpServers": {
    "pg-admin": {
      "command": "npx",
      "args": ["-y", "mcp-server-pgadmin"],
      "env": {
        "PG_HOST": "localhost",
        "PG_DATABASE": "myapp",
        "PG_USER": "postgres",
        "PG_PASSWORD": "your-password"
      }
    }
  }
}

9.3 VS Code 中配置

.vscode/mcp.json 或全局设置中:

{
  "servers": {
    "pg-admin": {
      "type": "sse",
      "url": "http://localhost:3000/sse",
      "headers": {
        "Authorization": "Bearer your-token"
      }
    }
  }
}

9.4 Python 版 MCP Server

如果你更熟悉 Python,MCP 官方也提供了 Python SDK:

# server.py
from mcp.server.fastmcp import FastMCP
import asyncpg

mcp = FastMCP("pg-admin-python")

@mcp.tool()
async def pg_query(sql: str, database: str = "default") -> str:
    """执行 SQL 查询并返回结果"""
    conn = await asyncpg.connect(
        host="localhost", database=database,
        user="postgres", password="your-password"
    )
    try:
        rows = await conn.fetch(sql)
        return "\n".join(str(dict(row)) for row in rows)
    finally:
        await conn.close()

@mcp.tool()
async def pg_list_tables(schema: str = "public") -> str:
    """列出数据库中的所有表"""
    conn = await asyncpg.connect(
        host="localhost", database="myapp",
        user="postgres", password="your-password"
    )
    try:
        rows = await conn.fetch("""
            SELECT table_name, 
                   obj_description((quote_ident(table_schema) || '.' || quote_ident(table_name))::regclass) as comment
            FROM information_schema.tables
            WHERE table_schema = $1 AND table_type = 'BASE TABLE'
            ORDER BY table_name
        """, schema)
        return "\n".join(f"- {r['table_name']}: {r['comment'] or '无注释'}" for r in rows)
    finally:
        await conn.close()

@mcp.resource("pg://tables")
async def get_tables_overview() -> str:
    """数据库表概览"""
    conn = await asyncpg.connect(
        host="localhost", database="myapp",
        user="postgres", password="your-password"
    )
    try:
        rows = await conn.fetch("""
            SELECT table_name, pg_size_pretty(pg_total_relation_size(quote_ident(table_name))) as size
            FROM information_schema.tables
            WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
        """)
        return "\n".join(f"{r['table_name']}: {r['size']}" for r in rows)
    finally:
        await conn.close()

if __name__ == "__main__":
    mcp.run()

Python 版的优势在于 FastMCP 极大简化了代码——装饰器即定义,函数即处理。但 TypeScript 版在类型安全和 SDK 完整度上更胜一筹。


十、MCP 生态现状与未来方向

10.1 2026 年 MCP 生态关键数据

维度数据
MCP Server 总数5000+
支持的 AI 客户端Claude Desktop, Cursor, VS Code, JetBrains, Continue, Windsurf
云厂商官方 ServerAWS (S3, Lambda, DynamoDB), Azure (Blob, CosmosDB), GCP (BigQuery, Cloud Storage)
百度 AI 开放计划收录6000+ Server, 3000+ 应用, 12万+ 开发者
GitHub spec Stars30k+
主流编程语言 SDKTypeScript, Python, Go (社区), Rust (社区)

10.2 热门 MCP Server 分类

开发工具

  • @modelcontextprotocol/server-filesystem — 文件系统读写
  • @modelcontextprotocol/server-github — GitHub API
  • @modelcontextprotocol/server-gitlab — GitLab API
  • @modelcontextprotocol/server-brave-search — Brave 搜索

数据库

  • @modelcontextprotocol/server-postgres — PostgreSQL
  • @modelcontextprotocol/server-sqlite — SQLite
  • mcp-server-mysql — MySQL (社区)

云服务

  • @aws/mcp-server-s3 — AWS S3
  • @aws/mcp-server-lambda — AWS Lambda
  • mcp-server-gcp-bigquery — GCP BigQuery

效率工具

  • Paste MCP — 剪贴板管理
  • Notion MCP — Notion 笔记
  • Slack MCP — Slack 消息

10.3 MCP 规范演进方向

  1. Streamable HTTP 传输:替代 SSE 长连接,更灵活的请求/响应模式,适合 Serverless 部署。
  2. 结构化工具输出:工具不再只返回文本,可以返回结构化数据(表格、图表),AI 客户端直接渲染。
  3. 工具组合(Tool Composition):一个 Server 可以调用另一个 Server 的工具,实现工具链编排。
  4. 安全增强:细粒度权限模型、审计日志、工具调用的用户确认流程标准化。
  5. 多模态工具:支持图片、音频、视频作为工具输入/输出。

十一、踩过的坑与最佳实践

11.1 常见陷阱

坑 1:stdout 被污染

MCP 协议要求 stdout 只能输出 JSON-RPC 消息。所有日志必须输出到 stderr:

// ❌ 错误
console.log('Debug info');  // 这会破坏 MCP 协议

// ✅ 正确
console.error('Debug info');  // 输出到 stderr

坑 2:长时间阻塞

工具执行时间不能太长。如果操作可能超过 30 秒,应该:

  • 使用分页返回部分结果
  • 使用 notification 通知进度
  • 或者异步执行 + 轮询结果
// 长时间操作,发送进度通知
server.server.notification({
  method: 'notifications/progress',
  params: {
    progressToken: 'query-123',
    progress: 50,
    total: 100,
  },
});

坑 3:Schema 定义不精确

工具的 inputSchema 越精确,AI 调用越准确。模糊的描述会导致 AI 传错参数:

// ❌ 模糊
z.string().describe('SQL')

// ✅ 精确
z.string().describe('要执行的 SQL SELECT 语句,不支持 DDL 和 DML 修改操作')

坑 4:错误信息不友好

AI 根据错误信息自我纠正。如果错误信息是 ECONNREFUSED,AI 不知道怎么办。如果是 无法连接到数据库 localhost:5432,请检查 PostgreSQL 是否正在运行,AI 就能建议用户检查数据库状态。

11.2 最佳实践清单

  1. 工具粒度:每个工具做一件事。pg_querypg_execute 更好,因为名称明确表达了意图。
  2. 幂等设计:读操作天然幂等;写操作尽量设计为幂等(UPSERT > INSERT)。
  3. 输出格式:优先返回 Markdown 格式,AI 客户端可以更好地渲染。
  4. 连接池:始终使用连接池,不要每次查询都新建连接。
  5. 超时设置:所有外部调用必须有超时,避免永久阻塞。
  6. 优雅关闭:监听 SIGINT/SIGTERM,关闭连接池后再退出。
  7. 版本管理:工具名称和参数变更要考虑向后兼容。新增参数设默认值,旧参数不删除。
  8. 最小权限:数据库用户只授予必要权限,生产环境绝不使用 superuser。

总结

MCP 不只是一个协议,它正在成为 AI 工具生态的基础设施层。就像 HTTP 统一了 Web 通信、SQL 统一了数据查询、USB-C 统一了设备接口一样,MCP 正在统一 AI 与外部世界的交互方式。

对于开发者而言,现在学习 MCP 的投资回报率极高:

  • 一次开发,多端复用:写一个 MCP Server,所有支持 MCP 的 AI 客户端都能用
  • 生态红利:5000+ 现有 Server,直接组合使用,不需要从零开始
  • 趋势明确:三大云厂商、百度、所有主流 AI IDE 都已经支持

从今天开始,把你的内部工具包装成 MCP Server 吧。这不是额外工作——这是让你的工具自动获得 AI 能力的最快路径。


本文代码已在 Node.js 22 + PostgreSQL 16 环境下验证通过。完整项目代码可在 GitHub 获取。

复制全文 生成海报 MCP AI工具 协议 PostgreSQL OAuth K8s

推荐文章

Python设计模式之工厂模式详解
2024-11-19 09:36:23 +0800 CST
CSS实现亚克力和磨砂玻璃效果
2024-11-18 01:21:20 +0800 CST
PHP 如何输出带微秒的时间
2024-11-18 01:58:41 +0800 CST
CSS 中的 `scrollbar-width` 属性
2024-11-19 01:32:55 +0800 CST
聚合支付管理系统
2025-07-23 13:33:30 +0800 CST
程序员茄子在线接单