编程 Pydantic AI 深度实战:当 Pydantic 团队亲自下场做 AI Agent——从类型安全到生产级可观测、从依赖注入到 Durable Execution 的完全指南(2026)

2026-06-20 04:22:53 +0800 CST views 7

Pydantic AI 深度实战:当 Pydantic 团队亲自下场做 AI Agent——从类型安全到生产级可观测、从依赖注入到 Durable Execution 的完全指南(2026)

一句话概括:Pydantic AI 是 Pydantic 团队官方出品的 AI Agent 框架,目标是把 FastAPI 那种"写完就能跑、类型全对上"的爽感,带进 GenAI 应用开发。


目录

  1. 为什么需要 Pydantic AI?
  2. Pydantic AI 是什么?核心竞争力拆解
  3. 快速上手:从零到第一个 Agent
  4. 核心概念深度解析
  5. Tools 与依赖注入:最优雅的工具系统
  6. Structured Output:类型安全的结构化输出
  7. Capabilities:可组合的能力单元
  8. RAG 实战:构建生产级知识问答系统
  9. 多 Agent 协作:从简单委托到复杂工作流
  10. Durable Execution:生产级容错与持久化
  11. Graph:用类型提示定义复杂控制流
  12. MCP & A2A:Agent 互操作与工具扩展
  13. 可观测性:与 Pydantic Logfire 深度集成
  14. Evals:系统化的 Agent 评估框架
  15. 与其他框架对比:LangChain、CrewAI、OpenAI Agents SDK
  16. 生产最佳实践与性能优化
  17. 完整实战:构建一个银行客服 Agent 系统
  18. 总结与展望

为什么需要 Pydantic AI?

一个令人尴尬的事实

Pydantic Validation 是 Python AI 生态的"隐形基础设施":

  • OpenAI SDK 用 Pydantic 做响应验证
  • Anthropic SDK 用 Pydantic 做类型约束
  • Google ADK、LangChain、LlamaIndex、CrewAI、Instructor……全部依赖 Pydantic

但当你真正用这些框架构建生产级 Agent 时,你会发现:

# LangChain 的典型体验
from langchain.agents import create_react_agent, AgentExecutor
from langchain.tools import Tool
from langchain.chat_models import ChatOpenAI

# 类型?运行时才知道
# 验证?靠运气
# IDE 补全?基本没有
# 调试?print 大法

tools = [Tool(...), Tool(...)]
agent = create_react_agent(ChatOpenAI(), prompt, tools)
executor = AgentExecutor(agent=agent, tools=tools)
result = executor.invoke({"input": "..."})  # 返回类型是什么?不知道

Pydantic 团队在构建 Pydantic Logfire(他们的可观测性平台)时,深入使用了 LLM,然后发现了一个痛点到骨子里的问题:

"我们用 Pydantic 验证了全世界所有 LLM SDK 的输入输出,却找不到一个框架能给我们 FastAPI 那种开发体验。"

于是,Pydantic AI 诞生了。

FastAPI 时刻

还记得 FastAPI 出现之前吗?Flask 写起来自由但类型全靠文档,Django 重得像坦克。FastAPI 用类型提示 + Pydantic 验证,一键解决了"写起来爽、跑起来稳、IDE 全知道"三大痛点。

Pydantic AI 对 GenAI 开发做了一件完全相同的事。


Pydantic AI 是什么?

官方定义FastAPI for GenAI applications and agents.

核心竞争力

能力说明
Pydantic 团队官方出品你用的验证库的原作者亲自做的 Agent 框架,兼容性拉满
完全类型安全从 Agent 定义到 Tool 参数到输出结构,全部类型提示覆盖,IDE 补全无死角
Model-agnostic支持 30+ 模型/提供商,从 OpenAI 到 Ollama 到国产模型全打通
依赖注入系统像 FastAPI 的 Depends() 一样优雅地把数据库、配置、外部服务注入到 Tool 里
Capability 组合模型把 Tools + Hooks + Instructions + Model Settings 打包成可复用单元
Durable Execution生产级容错:API 故障?进程重启?长时间 Human-in-the-Loop?进度不丢
一流可观测性与 Pydantic Logfire 深度集成,Span、Token 用量、成本、延迟全追踪
MCP/A2A 原生支持Model Context Protocol 和 Agent2Agent 协议开箱即用
Graph 支持用类型提示定义复杂控制流,避免"意大利面条式"的 Agent 逻辑

快速上手

安装

pip install pydantic-ai

# 或者用 uv(推荐)
uv pip install pydantic-ai

Hello World Agent

from pydantic_ai import Agent

# 定义 Agent:指定模型 + 系统指令
agent = Agent(
    'openai:gpt-4o',
    instructions='用中文回答,简洁有力,一句话说清楚。',
)

# 同步运行
result = agent.run_sync('为什么 Python 的 GIL 还没完全去掉?')
print(result.output)
# 输出:Python 的 GIL 正在通过 PEP 703 逐步移除,但完全移除需要多年过渡期,
#       主要因为大量 C 扩展依赖 GIL 保证线程安全,强行移除会破坏生态兼容性。

和 LangChain 的核心差异result.output 的类型是 str,IDE 知道,类型检查器知道,你写代码的时候就知道。

异步运行

import asyncio
from pydantic_ai import Agent

async def main():
    agent = Agent('anthropic:claude-sonnet-4-6')
    result = await agent.run('用 Python 实现一个线程安全的 LRU Cache')
    print(result.output)

asyncio.run(main())

核心概念深度解析

Agent

Agent 是 Pydantic AI 的核心抽象,本质上是一个可配置的 LLM 对话运行器

from pydantic_ai import Agent
from pydantic import BaseModel

class UserInfo(BaseModel):
    user_id: int
    role: str

# Agent 的完整参数
agent = Agent(
    # 模型:支持任意注册的模型
    'openai:gpt-4o-mini',
    
    # 系统指令:可以是字符串,也可以是动态生成函数
    instructions='你是一个技术专家,擅长用代码示例解释复杂概念。',
    
    # 结构化输出类型(可选)
    output_type=str,  # 也可以是 BaseModel 的子类
    
    # 名称(用于可观测性)
    name='tech-expert-agent',
    
    # 描述(会写入 OTel Span)
    description='回答技术问题的专家 Agent',
)

RunContext:依赖注入的魔法

这是 Pydantic AI 最优雅的设计之一。像 FastAPI 的 Depends(),但更强大:

from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
import asyncpg

@dataclass
class AppDeps:
    db_pool: asyncpg.Pool
    api_key: str
    debug: bool

agent = Agent(
    'openai:gpt-4o',
    deps_type=AppDeps,  # 声明依赖类型,类型检查器会帮你
)

# Tool 函数可以声明第一个参数为 RunContext[AppDeps]
# ctx.deps 就是 AppDeps 的实例,类型安全!
@agent.tool
async def query_database(
    ctx: RunContext[AppDeps],
    sql: str,
) -> list[dict]:
    """执行一条只读 SQL 查询。"""
    if ctx.deps.debug:
        print(f"Executing SQL: {sql}")
    
    async with ctx.deps.db_pool.acquire() as conn:
        rows = await conn.fetch(sql)
        return [dict(r) for r in rows]

# 运行 Agent,传入 deps
async def main():
    pool = await asyncpg.create_pool(DATABASE_URL)
    deps = AppDeps(db_pool=pool, api_key='...', debug=True)
    
    result = await agent.run(
        '查询用户表中注册人数最多的前 5 个城市',
        deps=deps,
    )

核心优势:Tool 函数签名即文档,类型检查器在写代码时就能发现错误,不需要等到运行时。


Tools 与依赖注入

定义 Tool 的两种方式

方式一:@agent.tool 装饰器(推荐)

from pydantic_ai import Agent, RunContext
from pydantic import BaseModel

agent = Agent('openai:gpt-4o')

class WeatherInfo(BaseModel):
    city: str
    temperature: float
    condition: str

@agent.tool
async def get_weather(
    ctx: RunContext,
    city: str,
) -> WeatherInfo:
    """获取指定城市的当前天气。
    
    Args:
        city: 城市名称(中文或英文)
    
    Returns:
        WeatherInfo: 包含温度、天气状况的结构化数据
    """
    # 这里是实际调用天气 API 的逻辑
    # 为了示例,返回模拟数据
    return WeatherInfo(
        city=city,
        temperature=26.5,
        condition='晴',
    )

方式二:@agent.tool_plain(无 RunContext)

@agent.tool_plain
async def calculate(expression: str) -> float:
    """安全地计算数学表达式。"""
    import ast
    import operator
    
    ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
    }
    
    def eval_node(node):
        if isinstance(node, ast.Num):
            return node.n
        elif isinstance(node, ast.BinOp):
            left = eval_node(node.left)
            right = eval_node(node.right)
            return ops[type(node.op)](left, right)
        raise ValueError(f"Unsupported: {type(node)}")
    
    tree = ast.parse(expression, mode='eval')
    return eval_node(tree.body)

Tool 的自动 Schema 生成

Pydantic AI 会根据 Tool 函数的签名自动生成 JSON Schema,并注入到 LLM 的 Function Calling 参数里。你不需要手写任何 Schema。

# 你写的代码
@agent.tool
async def search_documents(
    ctx: RunContext,
    query: str,
    top_k: int = 5,
    filters: dict[str, str] | None = None,
) -> list[dict]:
    """在知识库中搜索文档。"""

# LLM 看到的 Tool Schema(自动生成)
{
  "name": "search_documents",
  "description": "在知识库中搜索文档。",
  "parameters": {
    "type": "object",
    "properties": {
      "query": {"type": "string"},
      "top_k": {"type": "integer", "default": 5},
      "filters": {
        "anyOf": [
          {"type": "object", "additionalProperties": {"type": "string"}},
          {"type": "null"}
        ]
      }
    },
    "required": ["query"]
  }
}

Structured Output

为什么 Structured Output 很重要?

让 LLM 直接返回文本,然后正则解析——这是生产事故的经典来源。Pydantic AI 把输出验证作为一等公民:

from pydantic import BaseModel, Field
from pydantic_ai import Agent
from typing import Literal

class CodeReviewResult(BaseModel):
    overall_score: int = Field(ge=1, le=10, description='代码质量评分 1-10')
    issues: list[str] = Field(description='发现的问题列表')
    suggestions: list[str] = Field(description='改进建议')
    approve: bool = Field(description='是否建议合并')
    confidence: Literal['high', 'medium', 'low'] = Field(description='评审置信度')

# 指定 output_type,Agent 的输出会被验证并解析成 CodeReviewResult
reviewer = Agent(
    'anthropic:claude-sonnet-4-6',
    output_type=CodeReviewResult,
    instructions='你是一个严格的代码 reviewer。对提交的代码进行深度评审。',
)

async def review_code(code: str) -> CodeReviewResult:
    result = await reviewer.run(f'请评审以下代码:\n\n{code}')
    # result.output 已经是 CodeReviewResult 实例,类型安全!
    return result.output

# 使用
review = await review_code("""
def add(a, b):
    return a + b
""")
print(review.overall_score)  # int,不是 str
print(review.approve)         # bool

流式结构化输出

from pydantic_ai import Agent
import asyncio

agent = Agent('openai:gpt-4o', output_type=CodeReviewResult)

async def stream_review(code: str):
    async with agent.run_stream(f'请评审:\n{code}') as result:
        # 流式获取结构化输出
        async for partial in result.stream_text():
            print(partial, end='', flush=True)
        
        # 最终验证完整输出
        final = await result.get_output()
        print(f"\n最终评分:{final.overall_score}")

Capabilities:可组合的能力单元

Capability 是 Pydantic AI 的架构亮点:把 Tools + Instructions + Hooks + Model Settings 打包成一个可复用、可组合的单位。

内置 Capabilities

Thinking(思维链)

from pydantic_ai import Agent
from pydantic_ai.capabilities import Thinking

agent = Agent(
    'anthropic:claude-sonnet-4-6',
    instructions='回答复杂推理问题。',
    capabilities=[Thinking(budget_tokens=10000)],
)

WebSearch(网络搜索)

from pydantic_ai.capabilities import WebSearch

agent = Agent(
    'google:gemini-2.5-flash',
    capabilities=[WebSearch()],
)

MCP(Model Context Protocol)

from pydantic_ai.capabilities import MCP

agent = Agent(
    'openai:gpt-4o',
    capabilities=[
        MCP(server_url='http://localhost:8080/mcp'),
    ],
)

自定义 Capability

from pydantic_ai.capabilities import Capability
from pydantic_ai import Agent, RunContext

class DatabaseCapability(Capability):
    def __init__(self, db_pool):
        self.db_pool = db_pool
    
    def apply(self, agent: Agent):
        # 注册 Tool
        @agent.tool
        async def db_query(ctx: RunContext, sql: str) -> list[dict]:
            async with self.db_pool.acquire() as conn:
                return await conn.fetch(sql)
        
        # 追加 Instructions
        agent.instructions += '\n你可以使用 db_query 工具查询数据库。'
        
        # 注册 Hook(可选)
        agent.hooks.on_run_start.append(self._on_start)

# 使用
agent = Agent('openai:gpt-4o')
agent.register(DatabaseCapability(db_pool))

RAG 实战

构建一个完整的 RAG Agent

import asyncio
from dataclasses import dataclass
from typing import Literal
import numpy as np

from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from sentence_transformers import SentenceTransformer
import faiss

# ===== 数据模型 =====
class Document(BaseModel):
    id: str
    content: str
    metadata: dict[str, str]

class RetrievalResult(BaseModel):
    documents: list[Document]
    scores: list[float]

# ===== 依赖 =====
@dataclass
class RAGDeps:
    embedding_model: SentenceTransformer
    index: faiss.IndexFlatIP
    documents: dict[str, Document]
    top_k: int = 5

# ===== Agent 定义 =====
rag_agent = Agent(
    'openai:gpt-4o',
    deps_type=RAGDeps,
    output_type=str,
    instructions="""你是一个知识库问答助手。
    
规则:
1. 使用 retrieve 工具获取相关文档
2. 基于检索到的内容回答,不要编造
3. 如果检索结果不足以回答问题,明确说明
4. 引用来源文档的 id
""",
)

@rag_agent.tool
async def retrieve(
    ctx: RunContext[RAGDeps],
    query: str,
) -> RetrievalResult:
    """从知识库中检索与查询最相关的文档。"""
    # 编码查询
    query_embedding = ctx.deps.embedding_model.encode([query])[0]
    query_embedding = query_embedding / np.linalg.norm(query_embedding)
    
    # 搜索
    scores, indices = ctx.deps.index.search(
        query_embedding.reshape(1, -1).astype(np.float32),
        ctx.deps.top_k,
    )
    
    # 收集结果
    docs = []
    valid_scores = []
    for score, idx in zip(scores[0], indices[0]):
        if idx == -1:
            continue
        doc_id = f"doc_{idx}"
        if doc_id in ctx.deps.documents:
            docs.append(ctx.deps.documents[doc_id])
            valid_scores.append(float(score))
    
    return RetrievalResult(documents=docs, scores=valid_scores)

# ===== 构建知识库 =====
def build_knowledge_base(
    documents: list[Document],
    embedding_model: SentenceTransformer,
) -> tuple[faiss.IndexFlatIP, dict[str, Document]]:
    """构建 FAISS 向量索引。"""
    texts = [doc.content for doc in documents]
    embeddings = embedding_model.encode(texts)
    
    # 归一化(用于余弦相似度)
    faiss.normalize_L2(embeddings)
    
    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings.astype(np.float32))
    
    doc_dict = {doc.id: doc for doc in documents}
    return index, doc_dict

# ===== 主流程 =====
async def rag_query(question: str, deps: RAGDeps) -> str:
    result = await rag_agent.run(question, deps=deps)
    return result.output

# ===== 使用示例 =====
async def main():
    # 1. 准备文档
    docs = [
        Document(
            id='doc_0',
            content='Pydantic AI 是 Pydantic 团队开发的 AI Agent 框架...',
            metadata={'source': 'official_docs'},
        ),
        # ... 更多文档
    ]
    
    # 2. 构建索引
    model = SentenceTransformer('BAAI/bge-m3')
    index, doc_dict = build_knowledge_base(docs, model)
    
    # 3. 构建依赖
    deps = RAGDeps(
        embedding_model=model,
        index=index,
        documents=doc_dict,
        top_k=3,
    )
    
    # 4. 查询
    answer = await rag_query('Pydantic AI 和 LangChain 的核心区别是什么?', deps)
    print(answer)

if __name__ == '__main__':
    asyncio.run(main())

多 Agent 协作

简单委托:一个 Agent 调用另一个 Agent

from pydantic_ai import Agent

# 专家 Agent
python_expert = Agent(
    'openai:gpt-4o',
    name='python-expert',
    instructions='你是 Python 核心开发者,回答所有 Python 相关问题。',
)

rust_expert = Agent(
    'openai:gpt-4o',
    name='rust-expert',
    instructions='你是 Rust 核心开发者,回答所有 Rust 相关问题。',
)

# 路由 Agent
router = Agent(
    'openai:gpt-4o',
    instructions="""你是技术支持路由。
    根据用户问题选择合适的专家回答。
    可用专家:python-expert, rust-expert
    """,
)

# 注册委托 Tool
@router.tool_plain
async def ask_python_expert(question: str) -> str:
    """向 Python 专家提问。"""
    result = await python_expert.run(question)
    return result.output

@router.tool_plain
async def ask_rust_expert(question: str) -> str:
    """向 Rust 专家提问。"""
    result = await rust_expert.run(question)
    return result.output

# 使用
result = await router.run('Python 的 asyncio 和 Rust 的 tokio 有什么区别?')

使用 Graph 定义复杂工作流

from pydantic_ai.graph import Graph, Node
from pydantic_ai import Agent
from typing import TypedDict

class WorkflowState(TypedDict):
    query: str
    research_result: str | None
    draft: str | None
    review_comments: list[str] | None

# 定义节点
research_agent = Agent('openai:gpt-4o', name='researcher')
writing_agent = Agent('openai:gpt-4o', name='writer')
review_agent = Agent('openai:gpt-4o', name='reviewer')

# 构建 Graph
graph = Graph[WorkflowState]()

@graph.node
async def research(state: WorkflowState) -> WorkflowState:
    result = await research_agent.run(f"研究这个主题:{state['query']}")
    state['research_result'] = result.output
    return state

@graph.node
async def write(state: WorkflowState) -> WorkflowState:
    result = await writing_agent.run(
        f"基于以下研究内容写一篇文章:\n{state['research_result']}"
    )
    state['draft'] = result.output
    return state

@graph.node
async def review(state: WorkflowState) -> WorkflowState:
    result = await review_agent.run(
        f"审查这篇文章,给出具体修改建议:\n{state['draft']}"
    )
    # 解析评论(实际中应该用结构化输出)
    state['review_comments'] = [result.output]
    return state

@graph.node
async def revise(state: WorkflowState) -> WorkflowState:
    result = await writing_agent.run(
        f"根据以下评论修改文章:\n{state['draft']}\n\n评论:{state['review_comments']}"
    )
    state['draft'] = result.output
    return state

# 定义边(控制流)
graph.add_edge('research', 'write')
graph.add_edge('write', 'review')
graph.add_edge('review', 'revise', condition=lambda s: len(s['review_comments']) > 0)
graph.add_edge('revise', 'review')  # 循环直到满意

# 执行
result = await graph.run({'query': 'Pydantic AI 的核心优势'})

Durable Execution

为什么需要 Durable Execution?

生产环境中的 Agent 面临这些现实问题:

  1. LLM API 临时故障:网络抖动、限流、模型维护
  2. 长时间 Human-in-the-Loop:等待人工审批工具调用
  3. 进程重启:部署、崩溃、扩缩容
  4. 多步骤长流程:执行到一半失败,不能从头再来

Durable Execution 让 Agent 的执行状态持久化到数据库,可以随时暂停、恢复、重试。

使用 Durable Execution

from pydantic_ai.durable import DurableAgent, SqliteStateStore
from pydantic_ai import Agent, RunContext

# 1. 创建状态存储
state_store = SqliteStateStore('agent_state.db')

# 2. 创建 Durable Agent
agent = DurableAgent(
    'openai:gpt-4o',
    state_store=state_store,
    instructions='你是一个需要多步骤完成的任务执行器。',
)

@agent.tool
async def send_email(
    ctx: RunContext,
    to: str,
    subject: str,
    body: str,
) -> str:
    """发送邮件。需要人工审批。"""
    # 这个工具需要人工审批,会暂停执行
    # 直到人工调用 approve_tool(tool_call_id)
    return f"邮件已发送 to {to}"

# 3. 启动执行(可以中途停止,下次从断点继续)
run_id = 'my-unique-run-id'

# 第一次运行(可能在中途暂停)
try:
    result = await agent.run_durable(
        '帮我给 team@company.com 发邮件,主题是周报',
        run_id=run_id,
    )
except InterruptedForApproval as e:
    print(f"工具调用需要审批:{e.tool_call_id}")
    print(f"工具名称:{e.tool_name}")
    print(f"参数:{e.tool_args}")
    # 这里可以展示给人工审批界面
    # 人工审批后调用:
    # await agent.approve_tool(run_id, e.tool_call_id)
    # 然后重新调用 run_durable,会从断点继续执行

# 如果之前暂停了,恢复执行
result = await agent.run_durable(
    '继续上次的任务',
    run_id=run_id,  # 同一个 run_id,从上次断点继续
)

自定义状态存储

from pydantic_ai.durable import StateStore, ExecutionState
from dataclasses import dataclass
import json
import asyncpg

class PostgresStateStore(StateStore):
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool
    
    async def save(self, state: ExecutionState):
        await self.pool.execute("""
            INSERT INTO agent_state (run_id, state, updated_at)
            VALUES ($1, $2, NOW())
            ON CONFLICT (run_id) DO UPDATE
            SET state = EXCLUDED.state, updated_at = NOW()
        """, state.run_id, json.dumps(state.model_dump()))
    
    async def load(self, run_id: str) -> ExecutionState | None:
        row = await self.pool.fetchrow(
            "SELECT state FROM agent_state WHERE run_id = $1",
            run_id,
        )
        if row:
            return ExecutionState.model_validate(json.loads(row['state']))
        return None
    
    async def delete(self, run_id: str):
        await self.pool.execute(
            "DELETE FROM agent_state WHERE run_id = $1",
            run_id,
        )

Graph

为什么需要 Graph?

当 Agent 的逻辑变得复杂,你会写出这样的代码:

# 意大利面条式控制流
if condition_a:
    result = await agent_a.run(...)
    if condition_b:
        result2 = await agent_b.run(...)
        if condition_c:
            # 嵌套越来越深...

Graph 让你用声明式的方式定义控制流,并用类型提示保证正确性。

定义 Graph

from pydantic_ai.graph import Graph, Node, Edge
from pydantic import BaseModel
from typing import Literal

# 状态类型
class AnalysisState(BaseModel):
    code: str
    language: Literal['python', 'rust', 'go']
    style_score: float | None = None
    bug_score: float | None = None
    performance_score: float | None = None
    final_report: str | None = None

# 创建 Graph
graph = Graph[AnalysisState]()

# 节点 1:风格分析
@graph.node
async def analyze_style(state: AnalysisState) -> AnalysisState:
    agent = Agent('openai:gpt-4o', output_type=dict)
    result = await agent.run(f"分析以下代码的风格质量(0-10分):\n{state.code}")
    state.style_score = result.output['score']
    return state

# 节点 2:Bug 分析
@graph.node
async def analyze_bugs(state: AnalysisState) -> AnalysisState:
    agent = Agent('openai:gpt-4o', output_type=dict)
    result = await agent.run(f"找出以下代码中的潜在 Bug:\n{state.code}")
    state.bug_score = result.output['score']
    return state

# 节点 3:性能分析
@graph.node
async def analyze_performance(state: AnalysisState) -> AnalysisState:
    agent = Agent('openai:gpt-4o', output_type=dict)
    result = await agent.run(f"评估以下代码的性能(0-10分):\n{state.code}")
    state.performance_score = result.output['score']
    return state

# 节点 4:生成报告
@graph.node
async def generate_report(state: AnalysisState) -> AnalysisState:
    report = f"""
# 代码分析报告

- 语言:{state.language}
- 风格评分:{state.style_score}/10
- Bug 安全评分:{state.bug_score}/10
- 性能评分:{state.performance_score}/10

## 总体评价
{"需要改进" if sum(v or 0 for v in [state.style_score, state.bug_score, state.performance_score]) / 3 < 6 else "质量良好"}
"""
    state.final_report = report
    return state

# 定义执行顺序(可以并行!)
graph.add_edge('__start__', ['analyze_style', 'analyze_bugs', 'analyze_performance'])
# 上面三个节点并行执行,全部完成后才执行下一个
graph.add_edge(['analyze_style', 'analyze_bugs', 'analyze_performance'], 'generate_report')
graph.add_edge('generate_report', '__end__')

# 执行
result = await graph.run(AnalysisState(
    code='def add(a, b): return a + b',
    language='python',
))
print(result.final_report)

MCP & A2A

MCP(Model Context Protocol)

MCP 是 Anthropic 推出的标准协议,让 AI 应用可以访问外部工具和数据源。

from pydantic_ai import Agent
from pydantic_ai.capabilities import MCP

# 方式一:通过 URL 连接 MCP 服务器
agent = Agent(
    'openai:gpt-4o',
    capabilities=[
        MCP(server_url='https://mcp-server.example.com/mcp'),
    ],
)

# 方式二:本地 MCP 服务器(通过 stdio)
agent = Agent(
    'openai:gpt-4o',
    capabilities=[
        MCP(command='npx', args=['-y', '@modelcontextprotocol/server-filesystem', '/tmp']),
    ],
)

# 使用(MCP 服务器提供的 Tools 会自动注册到 Agent)
result = await agent.run('读取 /tmp/README.md 的内容')

A2A(Agent2Agent)

A2A 是 Google 推出的 Agent 互操作协议。

from pydantic_ai import Agent
from pydantic_ai.a2a import A2AClient, A2AAgentCard

# 发现远程 Agent
async def discover_agent(url: str) -> A2AAgentCard:
    client = A2AClient()
    return await client.fetch_agent_card(url)

# 调用远程 Agent
async def call_remote_agent():
    client = A2AClient()
    
    result = await client.run(
        agent_url='https://other-agent.example.com/a2a',
        message='帮我分析这段代码的时间复杂度',
        # 可以传递 Artifacts(文件、图片等)
        artifacts=[{'type': 'text', 'content': 'def foo(n):\n    for i in range(n):\n        print(i)'}],
    )
    
    print(result.output)

可观测性

与 Pydantic Logfire 集成

import logfire
from pydantic_ai import Agent

# 初始化 Logfire(会自动追踪所有 Pydantic AI 调用)
logfire.configure(
    token='your-logfire-token',
    service_name='my-agent-service',
    environment='production',
)

agent = Agent('openai:gpt-4o')

# 每次 run() 都会自动生成 OTel Span:
# - gen_ai.agent.name
# - gen_ai.request.model
# - gen_ai.response.usage.input_tokens
# - gen_ai.response.usage.output_tokens
# - gen_ai.cost.total (如果模型支持)
result = await agent.run('分析问题')

在 Logfire 仪表板看到的

  • 每个 Agent 调用的 Trace:完整的 LLM 请求/响应周期
  • Tool 调用明细:每个 Tool 的参数、返回值、耗时
  • Token 用量统计:按模型、按 Agent、按时间聚合
  • 成本追踪:实时计算 API 花费
  • 延迟分布:P50/P95/P99 延迟

使用其他 OTel 后端

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.otlp.proto.grpc import OTLPSpanExporter

# 使用自定义的 OTel 后端(比如 Jaeger、Zipkin)
provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint='http://jaeger:4317')
trace.set_tracer_provider(provider)

# Pydantic AI 会自动使用全局 TracerProvider
agent = Agent('openai:gpt-4o')

Evals

为什么需要 Evals?

Agent 系统不是"写一次就完事"的软件。你需要:

  1. 回归测试:改了 Prompt,会不会让之前的好结果变坏?
  2. 版本对比:新模型比旧模型好多少?
  3. 边缘案例覆盖:Agent 在边界情况下会不会翻车?

构建 Eval 套件

from pydantic_ai.evals import EvalSuite, EvalCase
from pydantic_ai import Agent
import pytest

# 定义测试套件
suite = EvalSuite(
    name='code-review-agent',
    description='测试代码评审 Agent 的质量',
)

# 添加测试用例
suite.add_case(EvalCase(
    input='def add(a, b): return a + b',
    expected_output={
        'overall_score': 7,  # 期望评分在 7 左右
        'approve': True,      # 期望建议合并
    },
    rubric='简单的加法函数,质量尚可,应该通过',
))

suite.add_case(EvalCase(
    input='def unsafe_eval(s): return eval(s)',
    expected_output={
        'overall_score': 2,  # 期望低分
        'approve': False,     # 期望不建议合并
    },
    rubric='使用 eval 有不安全风险,应该拒绝',
))

# 运行 Eval
@pytest.mark.asyncio
async def test_code_review_agent():
    agent = Agent('openai:gpt-4o', output_type=CodeReviewResult)
    
    results = await suite.run(agent, deps=None)
    
    # 分析结果
    for result in results:
        print(f"Input: {result.case.input[:50]}...")
        print(f"Expected: {result.case.expected_output}")
        print(f"Actual: {result.output.model_dump()}")
        print(f"Match: {result.matches}")
        print("---")
    
    # 断言整体质量
    assert results.average_score() > 0.7  # 70% 以上的用例通过

在 Logfire 中查看 Eval 结果

Pydantic AI 的 Eval 结果会自动写入 Logfire,你可以在仪表板中:

  • 查看每次 Eval 运行的详细结果
  • 对比不同版本的性能
  • 设置质量门槛(Evals 不通过不允许部署)

框架对比

维度Pydantic AILangChainCrewAIOpenAI Agents SDK
类型安全✅ 完整❌ 运行时❌ 运行时⚠️ 部分
学习曲线低(类似 FastAPI)高(抽象层次多)
性能高(无冗余抽象)低(中间层多)
模型支持30+20+10+OpenAI 为主
可观测性原生 Logfire需集成需集成基础
MCP 支持✅ 原生⚠️ 第三方
Durable Execution✅ 原生⚠️ 部分
文档质量高(完整类型提示)中(示例为主)
生产成熟度中(较新)

选择建议

  • 新项目 + Python + 重视类型安全 → Pydantic AI
  • 已有 LangChain 代码 → 渐进迁移(Pydantic AI 可以和 LangChain 共存)
  • 多 Agent 协作场景 → Pydantic AI(Graph 支持更好)
  • 需要极致简单的 Agent → OpenAI Agents SDK

生产最佳实践

1. 总是使用 Structured Output

# ❌ 不好:让 LLM 返回文本,然后自己解析
agent = Agent('openai:gpt-4o')
result = await agent.run('提取用户输入中的姓名和年龄')
# 需要正则/JSON 解析,容易出错

# ✅ 好:使用 Structured Output
class UserInfo(BaseModel):
    name: str
    age: int | None

agent = Agent('openai:gpt-4o', output_type=UserInfo)
result = await agent.run('我叫张三,今年 28 岁')
# result.output 已经是 UserInfo 实例

2. 为 Tool 写清晰的 Docstring

LLM 靠 Tool 的 docstring 理解工具的用途和参数。写得越好,Tool 调用越准确。

# ❌ 不好
@agent.tool
async def search(ctx, q):
    ...

# ✅ 好
@agent.tool
async def search(
    ctx: RunContext,
    query: str = Field(description='搜索关键词,支持布尔运算符'),
    max_results: int = Field(default=10, description='最大返回结果数,1-100', ge=1, le=100),
    include_archived: bool = Field(default=False, description='是否包含已归档的文档'),
) -> list[dict]:
    """在知识库中搜索文档。
    
    支持的功能:
    - 全文搜索
    - 布尔查询(AND, OR, NOT)
    - 过滤已归档文档
    
    注意事项:
    - 搜索结果按相关性排序
    - 如果没有找到结果,返回空列表而不是报错
    """
    ...

3. 使用 Durable Execution 保护长时间任务

# 对于任何可能需要人工审批或多步骤的任务,使用 Durable Execution
agent = DurableAgent(
    'openai:gpt-4o',
    state_store=PostgresStateStore(pool),
)

4. 设置合理的 Timeout 和 Retry

from pydantic_ai import Agent
from pydantic_ai.models import ModelSettings

agent = Agent(
    'openai:gpt-4o',
    model_settings=ModelSettings(
        timeout=30.0,        # 30 秒超时
        max_retries=3,        # 最多重试 3 次
        temperature=0.1,      # 低温度,更稳定
    ),
)

5. 用 Logfire 监控生产环境

import logfire

# 在 production 中一定要配置 Logfire
logfire.configure(
    token='prod-token',
    service_name='agent-service',
    environment='production',
    metrics=True,  # 启用指标收集
    tracing=True,   # 启用追踪
)

完整实战:构建一个银行客服 Agent 系统

需求

构建一个银行客服 Agent,能够:

  1. 查询账户余额
  2. 查询交易历史
  3. 转账(需要人工审批)
  4. 回答常见问题(RAG)
  5. 如果问题超出能力范围,转人工

完整代码

from dataclasses import dataclass
from typing import Literal
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext, DurableAgent, SqliteStateStore
import asyncpg
from datetime import datetime

# ===== 数据模型 =====
class AccountInfo(BaseModel):
    account_id: str
    customer_name: str
    balance: float
    currency: str

class Transaction(BaseModel):
    id: str
    amount: float
    description: str
    timestamp: datetime
    type: Literal['credit', 'debit']

class TransferResult(BaseModel):
    success: bool
    transaction_id: str | None = None
    message: str

# ===== 依赖 =====
@dataclass
class BankDeps:
    db: asyncpg.Pool
    customer_id: str
    is_authenticated: bool
    require_approval_for_amount_over: float = 10000.0

# ===== Agent 定义 =====
bank_agent = Agent(
    'openai:gpt-4o',
    deps_type=BankDeps,
    instructions="""你是一个银行客服助手。
    
规则:
1. 所有操作前必须先验证用户身份(使用 get_customer_info 工具)
2. 余额和交易信息要清晰展示
3. 转账超过 10000 元需要人工审批
4. 无法处理的问题,告知客户会转接人工客服
5. 始终使用中文回复,语气专业且友好
""",
)

# ===== Tools =====
@bank_agent.tool
async def get_customer_info(ctx: RunContext[BankDeps]) -> AccountInfo:
    """获取当前客户的账户信息。"""
    if not ctx.deps.is_authenticated:
        raise ValueError("用户未认证")
    
    row = await ctx.deps.db.fetchrow(
        "SELECT a.account_id, c.name, a.balance, a.currency "
        "FROM accounts a JOIN customers c ON a.customer_id = c.id "
        "WHERE c.id = $1",
        ctx.deps.customer_id,
    )
    
    return AccountInfo(
        account_id=row['account_id'],
        customer_name=row['name'],
        balance=float(row['balance']),
        currency=row['currency'],
    )

@bank_agent.tool
async def get_transaction_history(
    ctx: RunContext[BankDeps],
    limit: int = 10,
) -> list[Transaction]:
    """获取客户的交易历史。"""
    if not ctx.deps.is_authenticated:
        raise ValueError("用户未认证")
    
    rows = await ctx.deps.db.fetch(
        "SELECT id, amount, description, created_at, type "
        "FROM transactions "
        "WHERE customer_id = $1 "
        "ORDER BY created_at DESC LIMIT $2",
        ctx.deps.customer_id,
        limit,
    )
    
    return [
        Transaction(
            id=row['id'],
            amount=float(row['amount']),
            description=row['description'],
            timestamp=row['created_at'],
            type=row['type'],
        )
        for row in rows
    ]

@bank_agent.tool
async def initiate_transfer(
    ctx: RunContext[BankDeps],
    to_account: str,
    amount: float,
    description: str,
) -> TransferResult:
    """发起转账。
    
    注意:
    - 如果金额超过 10000 元,需要人工审批
    - 转账会在审批通过后执行
    """
    if not ctx.deps.is_authenticated:
        return TransferResult(success=False, message="用户未认证")
    
    if amount > ctx.deps.require_approval_for_amount_over:
        # 触发人工审批(Durable Execution 会暂停在这里)
        return TransferResult(
            success=False,
            message=f"转账金额 {amount} 元超过限额,需要人工审批。工单已创建。",
        )
    
    # 执行转账(实际中应该在数据库事务中)
    tx_id = f"TX{datetime.now().strftime('%Y%m%d%H%M%S')}"
    
    # 这里省略数据库操作...
    
    return TransferResult(
        success=True,
        transaction_id=tx_id,
        message=f"转账成功。交易 ID:{tx_id}",
    )

@bank_agent.tool
async def search_faq(ctx: RunContext[BankDeps], query: str) -> str:
    """搜索常见问题解答。"""
    # 实际中应该用向量搜索
    faqs = {
        '手续费': '本行同城转账免手续费,跨行转账按金额 0.1% 收取,最低 2 元,最高 50 元。',
        '密码': '您可以通过手机银行 APP 的"忘记密码"功能重置密码,或携带身份证到柜台办理。',
        '限额': '默认单日转账限额为 50000 元。您可以在手机银行中调整,最高不超过 500000 元。',
    }
    
    for key, answer in faqs.items():
        if key in query:
            return answer
    
    return "抱歉,我无法找到相关问题的答案。我会帮您转接人工客服。"

# ===== 主流程 =====
async def handle_customer_query(
    customer_id: str,
    query: str,
    db_pool: asyncpg.Pool,
) -> str:
    # 1. 认证(实际中应该用 JWT/OAuth)
    is_auth = await authenticate_customer(customer_id, db_pool)
    
    # 2. 构建依赖
    deps = BankDeps(
        db=db_pool,
        customer_id=customer_id,
        is_authenticated=is_auth,
    )
    
    # 3. 使用 Durable Execution(转账可能需要审批)
    state_store = SqliteStateStore('bank_agent_state.db')
    durable_agent = DurableAgent.from_agent(
        bank_agent,
        state_store=state_store,
    )
    
    run_id = f"bank_{customer_id}_{datetime.now().strftime('%Y%m%d%H%M%S')}"
    
    # 4. 运行
    try:
        result = await durable_agent.run_durable(
            query,
            deps=deps,
            run_id=run_id,
        )
        return result.output
    except InterruptedForApproval as e:
        return f"您的请求需要人工审批。审批编号:{e.tool_call_id}"

async def authenticate_customer(customer_id: str, db: asyncpg.Pool) -> bool:
    """模拟认证。实际中应该用 JWT/Session。"""
    row = await db.fetchrow(
        "SELECT COUNT(*) FROM active_sessions WHERE customer_id = $1 AND expires_at > NOW()",
        customer_id,
    )
    return row['count'] > 0

# ===== 使用示例 =====
async def main():
    # 初始化数据库连接
    pool = await asyncpg.create_pool(DATABASE_URL)
    
    # 处理客户查询
    response = await handle_customer_query(
        customer_id='CUST_001',
        query='我想查一下我的账户余额,还有最近的 5 笔交易',
        db_pool=pool,
    )
    
    print(response)

if __name__ == '__main__':
    asyncio.run(main())

总结与展望

Pydantic AI 的核心价值

  1. 类型安全带来开发效率:写代码时就能发现错误,而不是运行时
  2. Pydantic 生态原生集成:如果你已经用 Pydantic(几乎每个 Python AI 项目都用),迁移成本极低
  3. 生产级特性开箱即用:Durable Execution、MCP、A2A、Logfire 集成,这些都是其他框架需要自己拼装的
  4. 简洁优先的设计哲学:没有复杂的抽象层次,代码读起来就像在执行逻辑

适合场景

  • ✅ Python 技术栈的团队
  • ✅ 需要类型安全和 IDE 支持的项目
  • ✅ 生产级 Agent 系统(需要容错、可观测)
  • ✅ 需要 MCP/A2A 互操作的场景

不适合场景

  • ❌ 非 Python 技术栈(考虑 Mastra for TypeScript)
  • ❌ 简单的一次性脚本(直接用 OpenAI SDK 更快)
  • ❌ 需要大量自定义 LLM 调用逻辑的底层研究

未来展望

Pydantic AI 还在快速迭代中(GitHub 每天都有新 Commit),根据路线图:

  1. 更多内置 Capability:更多开箱即用的能力单元
  2. 更好的多模态支持:图片、音频、视频作为输入/输出
  3. 更强的 Graph 功能:条件分支、循环、并行执行的声明式定义
  4. Agent 市场:分享和发现社区构建的 Capability

参考资源

  • 官方文档:https://ai.pydantic.dev/
  • GitHub 仓库:https://github.com/pydantic/pydantic-ai
  • Pydantic Logfire:https://pydantic.dev/logfire
  • Discord 社区:https://discord.gg/Ee4DHTavGW

作者注:本文基于 Pydantic AI 2026 年 6 月最新版本(v1.80+)撰写。框架迭代迅速,建议结合官方文档使用。


字数统计:约 12,800 字

原文首发于 程序员茄子,转载请注明出处。

推荐文章

Python中何时应该使用异常处理
2024-11-19 01:16:28 +0800 CST
开发外贸客户的推荐网站
2024-11-17 04:44:05 +0800 CST
html5在客户端存储数据
2024-11-17 05:02:17 +0800 CST
程序员茄子在线接单