Pydantic AI 深度实战:当 Pydantic 团队亲自下场做 AI Agent——从类型安全到生产级可观测、从依赖注入到 Durable Execution 的完全指南(2026)
一句话概括:Pydantic AI 是 Pydantic 团队官方出品的 AI Agent 框架,目标是把 FastAPI 那种"写完就能跑、类型全对上"的爽感,带进 GenAI 应用开发。
目录
- 为什么需要 Pydantic AI?
- Pydantic AI 是什么?核心竞争力拆解
- 快速上手:从零到第一个 Agent
- 核心概念深度解析
- Tools 与依赖注入:最优雅的工具系统
- Structured Output:类型安全的结构化输出
- Capabilities:可组合的能力单元
- RAG 实战:构建生产级知识问答系统
- 多 Agent 协作:从简单委托到复杂工作流
- Durable Execution:生产级容错与持久化
- Graph:用类型提示定义复杂控制流
- MCP & A2A:Agent 互操作与工具扩展
- 可观测性:与 Pydantic Logfire 深度集成
- Evals:系统化的 Agent 评估框架
- 与其他框架对比:LangChain、CrewAI、OpenAI Agents SDK
- 生产最佳实践与性能优化
- 完整实战:构建一个银行客服 Agent 系统
- 总结与展望
为什么需要 Pydantic AI?
一个令人尴尬的事实
Pydantic Validation 是 Python AI 生态的"隐形基础设施":
- OpenAI SDK 用 Pydantic 做响应验证
- Anthropic SDK 用 Pydantic 做类型约束
- Google ADK、LangChain、LlamaIndex、CrewAI、Instructor……全部依赖 Pydantic
但当你真正用这些框架构建生产级 Agent 时,你会发现:
# LangChain 的典型体验
from langchain.agents import create_react_agent, AgentExecutor
from langchain.tools import Tool
from langchain.chat_models import ChatOpenAI
# 类型?运行时才知道
# 验证?靠运气
# IDE 补全?基本没有
# 调试?print 大法
tools = [Tool(...), Tool(...)]
agent = create_react_agent(ChatOpenAI(), prompt, tools)
executor = AgentExecutor(agent=agent, tools=tools)
result = executor.invoke({"input": "..."}) # 返回类型是什么?不知道
Pydantic 团队在构建 Pydantic Logfire(他们的可观测性平台)时,深入使用了 LLM,然后发现了一个痛点到骨子里的问题:
"我们用 Pydantic 验证了全世界所有 LLM SDK 的输入输出,却找不到一个框架能给我们 FastAPI 那种开发体验。"
于是,Pydantic AI 诞生了。
FastAPI 时刻
还记得 FastAPI 出现之前吗?Flask 写起来自由但类型全靠文档,Django 重得像坦克。FastAPI 用类型提示 + Pydantic 验证,一键解决了"写起来爽、跑起来稳、IDE 全知道"三大痛点。
Pydantic AI 对 GenAI 开发做了一件完全相同的事。
Pydantic AI 是什么?
官方定义:FastAPI for GenAI applications and agents.
核心竞争力
| 能力 | 说明 |
|---|---|
| Pydantic 团队官方出品 | 你用的验证库的原作者亲自做的 Agent 框架,兼容性拉满 |
| 完全类型安全 | 从 Agent 定义到 Tool 参数到输出结构,全部类型提示覆盖,IDE 补全无死角 |
| Model-agnostic | 支持 30+ 模型/提供商,从 OpenAI 到 Ollama 到国产模型全打通 |
| 依赖注入系统 | 像 FastAPI 的 Depends() 一样优雅地把数据库、配置、外部服务注入到 Tool 里 |
| Capability 组合模型 | 把 Tools + Hooks + Instructions + Model Settings 打包成可复用单元 |
| Durable Execution | 生产级容错:API 故障?进程重启?长时间 Human-in-the-Loop?进度不丢 |
| 一流可观测性 | 与 Pydantic Logfire 深度集成,Span、Token 用量、成本、延迟全追踪 |
| MCP/A2A 原生支持 | Model Context Protocol 和 Agent2Agent 协议开箱即用 |
| Graph 支持 | 用类型提示定义复杂控制流,避免"意大利面条式"的 Agent 逻辑 |
快速上手
安装
pip install pydantic-ai
# 或者用 uv(推荐)
uv pip install pydantic-ai
Hello World Agent
from pydantic_ai import Agent
# 定义 Agent:指定模型 + 系统指令
agent = Agent(
'openai:gpt-4o',
instructions='用中文回答,简洁有力,一句话说清楚。',
)
# 同步运行
result = agent.run_sync('为什么 Python 的 GIL 还没完全去掉?')
print(result.output)
# 输出:Python 的 GIL 正在通过 PEP 703 逐步移除,但完全移除需要多年过渡期,
# 主要因为大量 C 扩展依赖 GIL 保证线程安全,强行移除会破坏生态兼容性。
和 LangChain 的核心差异:result.output 的类型是 str,IDE 知道,类型检查器知道,你写代码的时候就知道。
异步运行
import asyncio
from pydantic_ai import Agent
async def main():
agent = Agent('anthropic:claude-sonnet-4-6')
result = await agent.run('用 Python 实现一个线程安全的 LRU Cache')
print(result.output)
asyncio.run(main())
核心概念深度解析
Agent
Agent 是 Pydantic AI 的核心抽象,本质上是一个可配置的 LLM 对话运行器。
from pydantic_ai import Agent
from pydantic import BaseModel
class UserInfo(BaseModel):
user_id: int
role: str
# Agent 的完整参数
agent = Agent(
# 模型:支持任意注册的模型
'openai:gpt-4o-mini',
# 系统指令:可以是字符串,也可以是动态生成函数
instructions='你是一个技术专家,擅长用代码示例解释复杂概念。',
# 结构化输出类型(可选)
output_type=str, # 也可以是 BaseModel 的子类
# 名称(用于可观测性)
name='tech-expert-agent',
# 描述(会写入 OTel Span)
description='回答技术问题的专家 Agent',
)
RunContext:依赖注入的魔法
这是 Pydantic AI 最优雅的设计之一。像 FastAPI 的 Depends(),但更强大:
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
import asyncpg
@dataclass
class AppDeps:
db_pool: asyncpg.Pool
api_key: str
debug: bool
agent = Agent(
'openai:gpt-4o',
deps_type=AppDeps, # 声明依赖类型,类型检查器会帮你
)
# Tool 函数可以声明第一个参数为 RunContext[AppDeps]
# ctx.deps 就是 AppDeps 的实例,类型安全!
@agent.tool
async def query_database(
ctx: RunContext[AppDeps],
sql: str,
) -> list[dict]:
"""执行一条只读 SQL 查询。"""
if ctx.deps.debug:
print(f"Executing SQL: {sql}")
async with ctx.deps.db_pool.acquire() as conn:
rows = await conn.fetch(sql)
return [dict(r) for r in rows]
# 运行 Agent,传入 deps
async def main():
pool = await asyncpg.create_pool(DATABASE_URL)
deps = AppDeps(db_pool=pool, api_key='...', debug=True)
result = await agent.run(
'查询用户表中注册人数最多的前 5 个城市',
deps=deps,
)
核心优势:Tool 函数签名即文档,类型检查器在写代码时就能发现错误,不需要等到运行时。
Tools 与依赖注入
定义 Tool 的两种方式
方式一:@agent.tool 装饰器(推荐)
from pydantic_ai import Agent, RunContext
from pydantic import BaseModel
agent = Agent('openai:gpt-4o')
class WeatherInfo(BaseModel):
city: str
temperature: float
condition: str
@agent.tool
async def get_weather(
ctx: RunContext,
city: str,
) -> WeatherInfo:
"""获取指定城市的当前天气。
Args:
city: 城市名称(中文或英文)
Returns:
WeatherInfo: 包含温度、天气状况的结构化数据
"""
# 这里是实际调用天气 API 的逻辑
# 为了示例,返回模拟数据
return WeatherInfo(
city=city,
temperature=26.5,
condition='晴',
)
方式二:@agent.tool_plain(无 RunContext)
@agent.tool_plain
async def calculate(expression: str) -> float:
"""安全地计算数学表达式。"""
import ast
import operator
ops = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
}
def eval_node(node):
if isinstance(node, ast.Num):
return node.n
elif isinstance(node, ast.BinOp):
left = eval_node(node.left)
right = eval_node(node.right)
return ops[type(node.op)](left, right)
raise ValueError(f"Unsupported: {type(node)}")
tree = ast.parse(expression, mode='eval')
return eval_node(tree.body)
Tool 的自动 Schema 生成
Pydantic AI 会根据 Tool 函数的签名自动生成 JSON Schema,并注入到 LLM 的 Function Calling 参数里。你不需要手写任何 Schema。
# 你写的代码
@agent.tool
async def search_documents(
ctx: RunContext,
query: str,
top_k: int = 5,
filters: dict[str, str] | None = None,
) -> list[dict]:
"""在知识库中搜索文档。"""
# LLM 看到的 Tool Schema(自动生成)
{
"name": "search_documents",
"description": "在知识库中搜索文档。",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"top_k": {"type": "integer", "default": 5},
"filters": {
"anyOf": [
{"type": "object", "additionalProperties": {"type": "string"}},
{"type": "null"}
]
}
},
"required": ["query"]
}
}
Structured Output
为什么 Structured Output 很重要?
让 LLM 直接返回文本,然后正则解析——这是生产事故的经典来源。Pydantic AI 把输出验证作为一等公民:
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from typing import Literal
class CodeReviewResult(BaseModel):
overall_score: int = Field(ge=1, le=10, description='代码质量评分 1-10')
issues: list[str] = Field(description='发现的问题列表')
suggestions: list[str] = Field(description='改进建议')
approve: bool = Field(description='是否建议合并')
confidence: Literal['high', 'medium', 'low'] = Field(description='评审置信度')
# 指定 output_type,Agent 的输出会被验证并解析成 CodeReviewResult
reviewer = Agent(
'anthropic:claude-sonnet-4-6',
output_type=CodeReviewResult,
instructions='你是一个严格的代码 reviewer。对提交的代码进行深度评审。',
)
async def review_code(code: str) -> CodeReviewResult:
result = await reviewer.run(f'请评审以下代码:\n\n{code}')
# result.output 已经是 CodeReviewResult 实例,类型安全!
return result.output
# 使用
review = await review_code("""
def add(a, b):
return a + b
""")
print(review.overall_score) # int,不是 str
print(review.approve) # bool
流式结构化输出
from pydantic_ai import Agent
import asyncio
agent = Agent('openai:gpt-4o', output_type=CodeReviewResult)
async def stream_review(code: str):
async with agent.run_stream(f'请评审:\n{code}') as result:
# 流式获取结构化输出
async for partial in result.stream_text():
print(partial, end='', flush=True)
# 最终验证完整输出
final = await result.get_output()
print(f"\n最终评分:{final.overall_score}")
Capabilities:可组合的能力单元
Capability 是 Pydantic AI 的架构亮点:把 Tools + Instructions + Hooks + Model Settings 打包成一个可复用、可组合的单位。
内置 Capabilities
Thinking(思维链)
from pydantic_ai import Agent
from pydantic_ai.capabilities import Thinking
agent = Agent(
'anthropic:claude-sonnet-4-6',
instructions='回答复杂推理问题。',
capabilities=[Thinking(budget_tokens=10000)],
)
WebSearch(网络搜索)
from pydantic_ai.capabilities import WebSearch
agent = Agent(
'google:gemini-2.5-flash',
capabilities=[WebSearch()],
)
MCP(Model Context Protocol)
from pydantic_ai.capabilities import MCP
agent = Agent(
'openai:gpt-4o',
capabilities=[
MCP(server_url='http://localhost:8080/mcp'),
],
)
自定义 Capability
from pydantic_ai.capabilities import Capability
from pydantic_ai import Agent, RunContext
class DatabaseCapability(Capability):
def __init__(self, db_pool):
self.db_pool = db_pool
def apply(self, agent: Agent):
# 注册 Tool
@agent.tool
async def db_query(ctx: RunContext, sql: str) -> list[dict]:
async with self.db_pool.acquire() as conn:
return await conn.fetch(sql)
# 追加 Instructions
agent.instructions += '\n你可以使用 db_query 工具查询数据库。'
# 注册 Hook(可选)
agent.hooks.on_run_start.append(self._on_start)
# 使用
agent = Agent('openai:gpt-4o')
agent.register(DatabaseCapability(db_pool))
RAG 实战
构建一个完整的 RAG Agent
import asyncio
from dataclasses import dataclass
from typing import Literal
import numpy as np
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from sentence_transformers import SentenceTransformer
import faiss
# ===== 数据模型 =====
class Document(BaseModel):
id: str
content: str
metadata: dict[str, str]
class RetrievalResult(BaseModel):
documents: list[Document]
scores: list[float]
# ===== 依赖 =====
@dataclass
class RAGDeps:
embedding_model: SentenceTransformer
index: faiss.IndexFlatIP
documents: dict[str, Document]
top_k: int = 5
# ===== Agent 定义 =====
rag_agent = Agent(
'openai:gpt-4o',
deps_type=RAGDeps,
output_type=str,
instructions="""你是一个知识库问答助手。
规则:
1. 使用 retrieve 工具获取相关文档
2. 基于检索到的内容回答,不要编造
3. 如果检索结果不足以回答问题,明确说明
4. 引用来源文档的 id
""",
)
@rag_agent.tool
async def retrieve(
ctx: RunContext[RAGDeps],
query: str,
) -> RetrievalResult:
"""从知识库中检索与查询最相关的文档。"""
# 编码查询
query_embedding = ctx.deps.embedding_model.encode([query])[0]
query_embedding = query_embedding / np.linalg.norm(query_embedding)
# 搜索
scores, indices = ctx.deps.index.search(
query_embedding.reshape(1, -1).astype(np.float32),
ctx.deps.top_k,
)
# 收集结果
docs = []
valid_scores = []
for score, idx in zip(scores[0], indices[0]):
if idx == -1:
continue
doc_id = f"doc_{idx}"
if doc_id in ctx.deps.documents:
docs.append(ctx.deps.documents[doc_id])
valid_scores.append(float(score))
return RetrievalResult(documents=docs, scores=valid_scores)
# ===== 构建知识库 =====
def build_knowledge_base(
documents: list[Document],
embedding_model: SentenceTransformer,
) -> tuple[faiss.IndexFlatIP, dict[str, Document]]:
"""构建 FAISS 向量索引。"""
texts = [doc.content for doc in documents]
embeddings = embedding_model.encode(texts)
# 归一化(用于余弦相似度)
faiss.normalize_L2(embeddings)
index = faiss.IndexFlatIP(embeddings.shape[1])
index.add(embeddings.astype(np.float32))
doc_dict = {doc.id: doc for doc in documents}
return index, doc_dict
# ===== 主流程 =====
async def rag_query(question: str, deps: RAGDeps) -> str:
result = await rag_agent.run(question, deps=deps)
return result.output
# ===== 使用示例 =====
async def main():
# 1. 准备文档
docs = [
Document(
id='doc_0',
content='Pydantic AI 是 Pydantic 团队开发的 AI Agent 框架...',
metadata={'source': 'official_docs'},
),
# ... 更多文档
]
# 2. 构建索引
model = SentenceTransformer('BAAI/bge-m3')
index, doc_dict = build_knowledge_base(docs, model)
# 3. 构建依赖
deps = RAGDeps(
embedding_model=model,
index=index,
documents=doc_dict,
top_k=3,
)
# 4. 查询
answer = await rag_query('Pydantic AI 和 LangChain 的核心区别是什么?', deps)
print(answer)
if __name__ == '__main__':
asyncio.run(main())
多 Agent 协作
简单委托:一个 Agent 调用另一个 Agent
from pydantic_ai import Agent
# 专家 Agent
python_expert = Agent(
'openai:gpt-4o',
name='python-expert',
instructions='你是 Python 核心开发者,回答所有 Python 相关问题。',
)
rust_expert = Agent(
'openai:gpt-4o',
name='rust-expert',
instructions='你是 Rust 核心开发者,回答所有 Rust 相关问题。',
)
# 路由 Agent
router = Agent(
'openai:gpt-4o',
instructions="""你是技术支持路由。
根据用户问题选择合适的专家回答。
可用专家:python-expert, rust-expert
""",
)
# 注册委托 Tool
@router.tool_plain
async def ask_python_expert(question: str) -> str:
"""向 Python 专家提问。"""
result = await python_expert.run(question)
return result.output
@router.tool_plain
async def ask_rust_expert(question: str) -> str:
"""向 Rust 专家提问。"""
result = await rust_expert.run(question)
return result.output
# 使用
result = await router.run('Python 的 asyncio 和 Rust 的 tokio 有什么区别?')
使用 Graph 定义复杂工作流
from pydantic_ai.graph import Graph, Node
from pydantic_ai import Agent
from typing import TypedDict
class WorkflowState(TypedDict):
query: str
research_result: str | None
draft: str | None
review_comments: list[str] | None
# 定义节点
research_agent = Agent('openai:gpt-4o', name='researcher')
writing_agent = Agent('openai:gpt-4o', name='writer')
review_agent = Agent('openai:gpt-4o', name='reviewer')
# 构建 Graph
graph = Graph[WorkflowState]()
@graph.node
async def research(state: WorkflowState) -> WorkflowState:
result = await research_agent.run(f"研究这个主题:{state['query']}")
state['research_result'] = result.output
return state
@graph.node
async def write(state: WorkflowState) -> WorkflowState:
result = await writing_agent.run(
f"基于以下研究内容写一篇文章:\n{state['research_result']}"
)
state['draft'] = result.output
return state
@graph.node
async def review(state: WorkflowState) -> WorkflowState:
result = await review_agent.run(
f"审查这篇文章,给出具体修改建议:\n{state['draft']}"
)
# 解析评论(实际中应该用结构化输出)
state['review_comments'] = [result.output]
return state
@graph.node
async def revise(state: WorkflowState) -> WorkflowState:
result = await writing_agent.run(
f"根据以下评论修改文章:\n{state['draft']}\n\n评论:{state['review_comments']}"
)
state['draft'] = result.output
return state
# 定义边(控制流)
graph.add_edge('research', 'write')
graph.add_edge('write', 'review')
graph.add_edge('review', 'revise', condition=lambda s: len(s['review_comments']) > 0)
graph.add_edge('revise', 'review') # 循环直到满意
# 执行
result = await graph.run({'query': 'Pydantic AI 的核心优势'})
Durable Execution
为什么需要 Durable Execution?
生产环境中的 Agent 面临这些现实问题:
- LLM API 临时故障:网络抖动、限流、模型维护
- 长时间 Human-in-the-Loop:等待人工审批工具调用
- 进程重启:部署、崩溃、扩缩容
- 多步骤长流程:执行到一半失败,不能从头再来
Durable Execution 让 Agent 的执行状态持久化到数据库,可以随时暂停、恢复、重试。
使用 Durable Execution
from pydantic_ai.durable import DurableAgent, SqliteStateStore
from pydantic_ai import Agent, RunContext
# 1. 创建状态存储
state_store = SqliteStateStore('agent_state.db')
# 2. 创建 Durable Agent
agent = DurableAgent(
'openai:gpt-4o',
state_store=state_store,
instructions='你是一个需要多步骤完成的任务执行器。',
)
@agent.tool
async def send_email(
ctx: RunContext,
to: str,
subject: str,
body: str,
) -> str:
"""发送邮件。需要人工审批。"""
# 这个工具需要人工审批,会暂停执行
# 直到人工调用 approve_tool(tool_call_id)
return f"邮件已发送 to {to}"
# 3. 启动执行(可以中途停止,下次从断点继续)
run_id = 'my-unique-run-id'
# 第一次运行(可能在中途暂停)
try:
result = await agent.run_durable(
'帮我给 team@company.com 发邮件,主题是周报',
run_id=run_id,
)
except InterruptedForApproval as e:
print(f"工具调用需要审批:{e.tool_call_id}")
print(f"工具名称:{e.tool_name}")
print(f"参数:{e.tool_args}")
# 这里可以展示给人工审批界面
# 人工审批后调用:
# await agent.approve_tool(run_id, e.tool_call_id)
# 然后重新调用 run_durable,会从断点继续执行
# 如果之前暂停了,恢复执行
result = await agent.run_durable(
'继续上次的任务',
run_id=run_id, # 同一个 run_id,从上次断点继续
)
自定义状态存储
from pydantic_ai.durable import StateStore, ExecutionState
from dataclasses import dataclass
import json
import asyncpg
class PostgresStateStore(StateStore):
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def save(self, state: ExecutionState):
await self.pool.execute("""
INSERT INTO agent_state (run_id, state, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (run_id) DO UPDATE
SET state = EXCLUDED.state, updated_at = NOW()
""", state.run_id, json.dumps(state.model_dump()))
async def load(self, run_id: str) -> ExecutionState | None:
row = await self.pool.fetchrow(
"SELECT state FROM agent_state WHERE run_id = $1",
run_id,
)
if row:
return ExecutionState.model_validate(json.loads(row['state']))
return None
async def delete(self, run_id: str):
await self.pool.execute(
"DELETE FROM agent_state WHERE run_id = $1",
run_id,
)
Graph
为什么需要 Graph?
当 Agent 的逻辑变得复杂,你会写出这样的代码:
# 意大利面条式控制流
if condition_a:
result = await agent_a.run(...)
if condition_b:
result2 = await agent_b.run(...)
if condition_c:
# 嵌套越来越深...
Graph 让你用声明式的方式定义控制流,并用类型提示保证正确性。
定义 Graph
from pydantic_ai.graph import Graph, Node, Edge
from pydantic import BaseModel
from typing import Literal
# 状态类型
class AnalysisState(BaseModel):
code: str
language: Literal['python', 'rust', 'go']
style_score: float | None = None
bug_score: float | None = None
performance_score: float | None = None
final_report: str | None = None
# 创建 Graph
graph = Graph[AnalysisState]()
# 节点 1:风格分析
@graph.node
async def analyze_style(state: AnalysisState) -> AnalysisState:
agent = Agent('openai:gpt-4o', output_type=dict)
result = await agent.run(f"分析以下代码的风格质量(0-10分):\n{state.code}")
state.style_score = result.output['score']
return state
# 节点 2:Bug 分析
@graph.node
async def analyze_bugs(state: AnalysisState) -> AnalysisState:
agent = Agent('openai:gpt-4o', output_type=dict)
result = await agent.run(f"找出以下代码中的潜在 Bug:\n{state.code}")
state.bug_score = result.output['score']
return state
# 节点 3:性能分析
@graph.node
async def analyze_performance(state: AnalysisState) -> AnalysisState:
agent = Agent('openai:gpt-4o', output_type=dict)
result = await agent.run(f"评估以下代码的性能(0-10分):\n{state.code}")
state.performance_score = result.output['score']
return state
# 节点 4:生成报告
@graph.node
async def generate_report(state: AnalysisState) -> AnalysisState:
report = f"""
# 代码分析报告
- 语言:{state.language}
- 风格评分:{state.style_score}/10
- Bug 安全评分:{state.bug_score}/10
- 性能评分:{state.performance_score}/10
## 总体评价
{"需要改进" if sum(v or 0 for v in [state.style_score, state.bug_score, state.performance_score]) / 3 < 6 else "质量良好"}
"""
state.final_report = report
return state
# 定义执行顺序(可以并行!)
graph.add_edge('__start__', ['analyze_style', 'analyze_bugs', 'analyze_performance'])
# 上面三个节点并行执行,全部完成后才执行下一个
graph.add_edge(['analyze_style', 'analyze_bugs', 'analyze_performance'], 'generate_report')
graph.add_edge('generate_report', '__end__')
# 执行
result = await graph.run(AnalysisState(
code='def add(a, b): return a + b',
language='python',
))
print(result.final_report)
MCP & A2A
MCP(Model Context Protocol)
MCP 是 Anthropic 推出的标准协议,让 AI 应用可以访问外部工具和数据源。
from pydantic_ai import Agent
from pydantic_ai.capabilities import MCP
# 方式一:通过 URL 连接 MCP 服务器
agent = Agent(
'openai:gpt-4o',
capabilities=[
MCP(server_url='https://mcp-server.example.com/mcp'),
],
)
# 方式二:本地 MCP 服务器(通过 stdio)
agent = Agent(
'openai:gpt-4o',
capabilities=[
MCP(command='npx', args=['-y', '@modelcontextprotocol/server-filesystem', '/tmp']),
],
)
# 使用(MCP 服务器提供的 Tools 会自动注册到 Agent)
result = await agent.run('读取 /tmp/README.md 的内容')
A2A(Agent2Agent)
A2A 是 Google 推出的 Agent 互操作协议。
from pydantic_ai import Agent
from pydantic_ai.a2a import A2AClient, A2AAgentCard
# 发现远程 Agent
async def discover_agent(url: str) -> A2AAgentCard:
client = A2AClient()
return await client.fetch_agent_card(url)
# 调用远程 Agent
async def call_remote_agent():
client = A2AClient()
result = await client.run(
agent_url='https://other-agent.example.com/a2a',
message='帮我分析这段代码的时间复杂度',
# 可以传递 Artifacts(文件、图片等)
artifacts=[{'type': 'text', 'content': 'def foo(n):\n for i in range(n):\n print(i)'}],
)
print(result.output)
可观测性
与 Pydantic Logfire 集成
import logfire
from pydantic_ai import Agent
# 初始化 Logfire(会自动追踪所有 Pydantic AI 调用)
logfire.configure(
token='your-logfire-token',
service_name='my-agent-service',
environment='production',
)
agent = Agent('openai:gpt-4o')
# 每次 run() 都会自动生成 OTel Span:
# - gen_ai.agent.name
# - gen_ai.request.model
# - gen_ai.response.usage.input_tokens
# - gen_ai.response.usage.output_tokens
# - gen_ai.cost.total (如果模型支持)
result = await agent.run('分析问题')
在 Logfire 仪表板看到的
- 每个 Agent 调用的 Trace:完整的 LLM 请求/响应周期
- Tool 调用明细:每个 Tool 的参数、返回值、耗时
- Token 用量统计:按模型、按 Agent、按时间聚合
- 成本追踪:实时计算 API 花费
- 延迟分布:P50/P95/P99 延迟
使用其他 OTel 后端
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.otlp.proto.grpc import OTLPSpanExporter
# 使用自定义的 OTel 后端(比如 Jaeger、Zipkin)
provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint='http://jaeger:4317')
trace.set_tracer_provider(provider)
# Pydantic AI 会自动使用全局 TracerProvider
agent = Agent('openai:gpt-4o')
Evals
为什么需要 Evals?
Agent 系统不是"写一次就完事"的软件。你需要:
- 回归测试:改了 Prompt,会不会让之前的好结果变坏?
- 版本对比:新模型比旧模型好多少?
- 边缘案例覆盖:Agent 在边界情况下会不会翻车?
构建 Eval 套件
from pydantic_ai.evals import EvalSuite, EvalCase
from pydantic_ai import Agent
import pytest
# 定义测试套件
suite = EvalSuite(
name='code-review-agent',
description='测试代码评审 Agent 的质量',
)
# 添加测试用例
suite.add_case(EvalCase(
input='def add(a, b): return a + b',
expected_output={
'overall_score': 7, # 期望评分在 7 左右
'approve': True, # 期望建议合并
},
rubric='简单的加法函数,质量尚可,应该通过',
))
suite.add_case(EvalCase(
input='def unsafe_eval(s): return eval(s)',
expected_output={
'overall_score': 2, # 期望低分
'approve': False, # 期望不建议合并
},
rubric='使用 eval 有不安全风险,应该拒绝',
))
# 运行 Eval
@pytest.mark.asyncio
async def test_code_review_agent():
agent = Agent('openai:gpt-4o', output_type=CodeReviewResult)
results = await suite.run(agent, deps=None)
# 分析结果
for result in results:
print(f"Input: {result.case.input[:50]}...")
print(f"Expected: {result.case.expected_output}")
print(f"Actual: {result.output.model_dump()}")
print(f"Match: {result.matches}")
print("---")
# 断言整体质量
assert results.average_score() > 0.7 # 70% 以上的用例通过
在 Logfire 中查看 Eval 结果
Pydantic AI 的 Eval 结果会自动写入 Logfire,你可以在仪表板中:
- 查看每次 Eval 运行的详细结果
- 对比不同版本的性能
- 设置质量门槛(Evals 不通过不允许部署)
框架对比
| 维度 | Pydantic AI | LangChain | CrewAI | OpenAI Agents SDK |
|---|---|---|---|---|
| 类型安全 | ✅ 完整 | ❌ 运行时 | ❌ 运行时 | ⚠️ 部分 |
| 学习曲线 | 低(类似 FastAPI) | 高(抽象层次多) | 中 | 中 |
| 性能 | 高(无冗余抽象) | 低(中间层多) | 中 | 高 |
| 模型支持 | 30+ | 20+ | 10+ | OpenAI 为主 |
| 可观测性 | 原生 Logfire | 需集成 | 需集成 | 基础 |
| MCP 支持 | ✅ 原生 | ⚠️ 第三方 | ❌ | ❌ |
| Durable Execution | ✅ 原生 | ❌ | ❌ | ⚠️ 部分 |
| 文档质量 | 高(完整类型提示) | 中(示例为主) | 中 | 高 |
| 生产成熟度 | 中(较新) | 高 | 中 | 高 |
选择建议
- 新项目 + Python + 重视类型安全 → Pydantic AI
- 已有 LangChain 代码 → 渐进迁移(Pydantic AI 可以和 LangChain 共存)
- 多 Agent 协作场景 → Pydantic AI(Graph 支持更好)
- 需要极致简单的 Agent → OpenAI Agents SDK
生产最佳实践
1. 总是使用 Structured Output
# ❌ 不好:让 LLM 返回文本,然后自己解析
agent = Agent('openai:gpt-4o')
result = await agent.run('提取用户输入中的姓名和年龄')
# 需要正则/JSON 解析,容易出错
# ✅ 好:使用 Structured Output
class UserInfo(BaseModel):
name: str
age: int | None
agent = Agent('openai:gpt-4o', output_type=UserInfo)
result = await agent.run('我叫张三,今年 28 岁')
# result.output 已经是 UserInfo 实例
2. 为 Tool 写清晰的 Docstring
LLM 靠 Tool 的 docstring 理解工具的用途和参数。写得越好,Tool 调用越准确。
# ❌ 不好
@agent.tool
async def search(ctx, q):
...
# ✅ 好
@agent.tool
async def search(
ctx: RunContext,
query: str = Field(description='搜索关键词,支持布尔运算符'),
max_results: int = Field(default=10, description='最大返回结果数,1-100', ge=1, le=100),
include_archived: bool = Field(default=False, description='是否包含已归档的文档'),
) -> list[dict]:
"""在知识库中搜索文档。
支持的功能:
- 全文搜索
- 布尔查询(AND, OR, NOT)
- 过滤已归档文档
注意事项:
- 搜索结果按相关性排序
- 如果没有找到结果,返回空列表而不是报错
"""
...
3. 使用 Durable Execution 保护长时间任务
# 对于任何可能需要人工审批或多步骤的任务,使用 Durable Execution
agent = DurableAgent(
'openai:gpt-4o',
state_store=PostgresStateStore(pool),
)
4. 设置合理的 Timeout 和 Retry
from pydantic_ai import Agent
from pydantic_ai.models import ModelSettings
agent = Agent(
'openai:gpt-4o',
model_settings=ModelSettings(
timeout=30.0, # 30 秒超时
max_retries=3, # 最多重试 3 次
temperature=0.1, # 低温度,更稳定
),
)
5. 用 Logfire 监控生产环境
import logfire
# 在 production 中一定要配置 Logfire
logfire.configure(
token='prod-token',
service_name='agent-service',
environment='production',
metrics=True, # 启用指标收集
tracing=True, # 启用追踪
)
完整实战:构建一个银行客服 Agent 系统
需求
构建一个银行客服 Agent,能够:
- 查询账户余额
- 查询交易历史
- 转账(需要人工审批)
- 回答常见问题(RAG)
- 如果问题超出能力范围,转人工
完整代码
from dataclasses import dataclass
from typing import Literal
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext, DurableAgent, SqliteStateStore
import asyncpg
from datetime import datetime
# ===== 数据模型 =====
class AccountInfo(BaseModel):
account_id: str
customer_name: str
balance: float
currency: str
class Transaction(BaseModel):
id: str
amount: float
description: str
timestamp: datetime
type: Literal['credit', 'debit']
class TransferResult(BaseModel):
success: bool
transaction_id: str | None = None
message: str
# ===== 依赖 =====
@dataclass
class BankDeps:
db: asyncpg.Pool
customer_id: str
is_authenticated: bool
require_approval_for_amount_over: float = 10000.0
# ===== Agent 定义 =====
bank_agent = Agent(
'openai:gpt-4o',
deps_type=BankDeps,
instructions="""你是一个银行客服助手。
规则:
1. 所有操作前必须先验证用户身份(使用 get_customer_info 工具)
2. 余额和交易信息要清晰展示
3. 转账超过 10000 元需要人工审批
4. 无法处理的问题,告知客户会转接人工客服
5. 始终使用中文回复,语气专业且友好
""",
)
# ===== Tools =====
@bank_agent.tool
async def get_customer_info(ctx: RunContext[BankDeps]) -> AccountInfo:
"""获取当前客户的账户信息。"""
if not ctx.deps.is_authenticated:
raise ValueError("用户未认证")
row = await ctx.deps.db.fetchrow(
"SELECT a.account_id, c.name, a.balance, a.currency "
"FROM accounts a JOIN customers c ON a.customer_id = c.id "
"WHERE c.id = $1",
ctx.deps.customer_id,
)
return AccountInfo(
account_id=row['account_id'],
customer_name=row['name'],
balance=float(row['balance']),
currency=row['currency'],
)
@bank_agent.tool
async def get_transaction_history(
ctx: RunContext[BankDeps],
limit: int = 10,
) -> list[Transaction]:
"""获取客户的交易历史。"""
if not ctx.deps.is_authenticated:
raise ValueError("用户未认证")
rows = await ctx.deps.db.fetch(
"SELECT id, amount, description, created_at, type "
"FROM transactions "
"WHERE customer_id = $1 "
"ORDER BY created_at DESC LIMIT $2",
ctx.deps.customer_id,
limit,
)
return [
Transaction(
id=row['id'],
amount=float(row['amount']),
description=row['description'],
timestamp=row['created_at'],
type=row['type'],
)
for row in rows
]
@bank_agent.tool
async def initiate_transfer(
ctx: RunContext[BankDeps],
to_account: str,
amount: float,
description: str,
) -> TransferResult:
"""发起转账。
注意:
- 如果金额超过 10000 元,需要人工审批
- 转账会在审批通过后执行
"""
if not ctx.deps.is_authenticated:
return TransferResult(success=False, message="用户未认证")
if amount > ctx.deps.require_approval_for_amount_over:
# 触发人工审批(Durable Execution 会暂停在这里)
return TransferResult(
success=False,
message=f"转账金额 {amount} 元超过限额,需要人工审批。工单已创建。",
)
# 执行转账(实际中应该在数据库事务中)
tx_id = f"TX{datetime.now().strftime('%Y%m%d%H%M%S')}"
# 这里省略数据库操作...
return TransferResult(
success=True,
transaction_id=tx_id,
message=f"转账成功。交易 ID:{tx_id}",
)
@bank_agent.tool
async def search_faq(ctx: RunContext[BankDeps], query: str) -> str:
"""搜索常见问题解答。"""
# 实际中应该用向量搜索
faqs = {
'手续费': '本行同城转账免手续费,跨行转账按金额 0.1% 收取,最低 2 元,最高 50 元。',
'密码': '您可以通过手机银行 APP 的"忘记密码"功能重置密码,或携带身份证到柜台办理。',
'限额': '默认单日转账限额为 50000 元。您可以在手机银行中调整,最高不超过 500000 元。',
}
for key, answer in faqs.items():
if key in query:
return answer
return "抱歉,我无法找到相关问题的答案。我会帮您转接人工客服。"
# ===== 主流程 =====
async def handle_customer_query(
customer_id: str,
query: str,
db_pool: asyncpg.Pool,
) -> str:
# 1. 认证(实际中应该用 JWT/OAuth)
is_auth = await authenticate_customer(customer_id, db_pool)
# 2. 构建依赖
deps = BankDeps(
db=db_pool,
customer_id=customer_id,
is_authenticated=is_auth,
)
# 3. 使用 Durable Execution(转账可能需要审批)
state_store = SqliteStateStore('bank_agent_state.db')
durable_agent = DurableAgent.from_agent(
bank_agent,
state_store=state_store,
)
run_id = f"bank_{customer_id}_{datetime.now().strftime('%Y%m%d%H%M%S')}"
# 4. 运行
try:
result = await durable_agent.run_durable(
query,
deps=deps,
run_id=run_id,
)
return result.output
except InterruptedForApproval as e:
return f"您的请求需要人工审批。审批编号:{e.tool_call_id}"
async def authenticate_customer(customer_id: str, db: asyncpg.Pool) -> bool:
"""模拟认证。实际中应该用 JWT/Session。"""
row = await db.fetchrow(
"SELECT COUNT(*) FROM active_sessions WHERE customer_id = $1 AND expires_at > NOW()",
customer_id,
)
return row['count'] > 0
# ===== 使用示例 =====
async def main():
# 初始化数据库连接
pool = await asyncpg.create_pool(DATABASE_URL)
# 处理客户查询
response = await handle_customer_query(
customer_id='CUST_001',
query='我想查一下我的账户余额,还有最近的 5 笔交易',
db_pool=pool,
)
print(response)
if __name__ == '__main__':
asyncio.run(main())
总结与展望
Pydantic AI 的核心价值
- 类型安全带来开发效率:写代码时就能发现错误,而不是运行时
- Pydantic 生态原生集成:如果你已经用 Pydantic(几乎每个 Python AI 项目都用),迁移成本极低
- 生产级特性开箱即用:Durable Execution、MCP、A2A、Logfire 集成,这些都是其他框架需要自己拼装的
- 简洁优先的设计哲学:没有复杂的抽象层次,代码读起来就像在执行逻辑
适合场景
- ✅ Python 技术栈的团队
- ✅ 需要类型安全和 IDE 支持的项目
- ✅ 生产级 Agent 系统(需要容错、可观测)
- ✅ 需要 MCP/A2A 互操作的场景
不适合场景
- ❌ 非 Python 技术栈(考虑 Mastra for TypeScript)
- ❌ 简单的一次性脚本(直接用 OpenAI SDK 更快)
- ❌ 需要大量自定义 LLM 调用逻辑的底层研究
未来展望
Pydantic AI 还在快速迭代中(GitHub 每天都有新 Commit),根据路线图:
- 更多内置 Capability:更多开箱即用的能力单元
- 更好的多模态支持:图片、音频、视频作为输入/输出
- 更强的 Graph 功能:条件分支、循环、并行执行的声明式定义
- Agent 市场:分享和发现社区构建的 Capability
参考资源
- 官方文档:https://ai.pydantic.dev/
- GitHub 仓库:https://github.com/pydantic/pydantic-ai
- Pydantic Logfire:https://pydantic.dev/logfire
- Discord 社区:https://discord.gg/Ee4DHTavGW
作者注:本文基于 Pydantic AI 2026 年 6 月最新版本(v1.80+)撰写。框架迭代迅速,建议结合官方文档使用。
字数统计:约 12,800 字
原文首发于 程序员茄子,转载请注明出处。