编程 LangGraph 深度实战:从状态图到生产级 Agent 系统——用图结构重新定义 AI 工程边界

2026-04-14 10:54:15 +0800 CST views 6

LangGraph 深度实战:从状态图到生产级 Agent 系统——用图结构重新定义 AI 工程边界

2026 年,AI Agent 已经从实验室走进了每个工程团队。LangChain 的"链式调用"时代正在谢幕,而 LangGraph 用"图结构"接管了复杂 Agent 编排的话语权。本文是一篇 5000+ 字的深度实战指南,带你从原理到生产落地,彻底搞懂 LangGraph。


一、背景:为什么链式调用不够用了?

2022 年,LangChain 横空出世,用 Chain(链)的思路串联 LLM 调用。那时候大多数应用都是:输入 → Prompt → LLM → 输出,线性的,可预期的。

但 2025 年之后,业务需求变了:

  • 用户要求 Agent 能"回头重来":如果搜索结果不好,重新搜索
  • 要求多个 Agent 并发协作,互相通信
  • 要求在关键决策点"暂停等人审批"(Human-in-the-loop)
  • 要求 Agent 的执行过程可回溯、可 replay

线性 Chain 处理这些需求非常痛苦——你得手动管理状态、手写循环、自己实现分支逻辑。代码很快变成意大利面。

LangGraph 的核心洞察:AI Agent 的执行流程本质上是一个有向图。节点是执行单元,边是流转逻辑,State 是全局共享的上下文。把业务流程画成图,比手写 if-else 直观 10 倍,维护 10 倍容易。


二、核心概念:三要素一图

LangGraph 的核心就三个概念:

2.1 State(状态)

State 是贯穿整个图的"公共黑板",所有节点都可以读写它。

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, add_messages

class AgentState(TypedDict):
    # messages 字段使用 add_messages reducer,自动追加而非覆盖
    messages: Annotated[list, add_messages]
    # 自定义业务字段
    search_results: list[str]
    retry_count: int
    final_answer: str

这里有个关键点:Reducer。默认情况下,节点返回的值会"覆盖"State 对应字段。但对于 messages(消息历史),我们想要追加而不是覆盖,所以用 add_messages 这个内置 Reducer。

你也可以自定义 Reducer:

from typing import Annotated
import operator

def merge_results(left: list, right: list) -> list:
    """去重合并搜索结果"""
    return list(set(left + right))

class ResearchState(TypedDict):
    # 自定义 reducer,搜索结果去重合并
    search_results: Annotated[list, merge_results]
    # 普通字段,直接覆盖
    current_query: str
    is_done: bool

2.2 Nodes(节点)

节点是执行单元,本质是一个函数:接收 State,返回更新后的 State(部分更新即可)。

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def research_planner(state: AgentState) -> dict:
    """规划节点:分析用户问题,制定研究计划"""
    system_prompt = """你是一个研究规划专家。
    分析用户的研究问题,制定一个清晰的调研计划。
    返回 3-5 个具体的搜索查询词,用于后续信息收集。"""
    
    response = llm.invoke([
        SystemMessage(content=system_prompt),
        *state["messages"]
    ])
    
    return {"messages": [response]}


def web_searcher(state: AgentState) -> dict:
    """搜索节点:执行网络搜索"""
    # 从最新消息中提取搜索词
    last_message = state["messages"][-1].content
    
    # 模拟搜索(实际替换为 SerpAPI、Tavily 等)
    results = mock_search(last_message)
    
    return {
        "search_results": results,
        "retry_count": state.get("retry_count", 0)
    }


def quality_checker(state: AgentState) -> dict:
    """质量检查节点:判断搜索结果是否足够"""
    results = state.get("search_results", [])
    retry_count = state.get("retry_count", 0)
    
    if len(results) < 3 and retry_count < 2:
        # 结果不够,需要重新搜索
        return {"retry_count": retry_count + 1}
    else:
        return {"is_done": True}


def report_writer(state: AgentState) -> dict:
    """报告生成节点:基于搜索结果撰写报告"""
    system_prompt = """基于提供的搜索结果,撰写一份详细的研究报告。
    要求:逻辑清晰,数据准确,有独特见解。"""
    
    context = "\n".join(state.get("search_results", []))
    
    response = llm.invoke([
        SystemMessage(content=f"{system_prompt}\n\n参考资料:\n{context}"),
        *state["messages"]
    ])
    
    return {
        "messages": [response],
        "final_answer": response.content
    }

2.3 Edges(边)

边决定执行流转逻辑。LangGraph 支持三种边:

普通边:无条件跳转

graph.add_edge("planner", "searcher")

条件边:根据 State 动态决定下一个节点

def route_after_check(state: AgentState) -> str:
    """根据质量检查结果决定流转方向"""
    if state.get("is_done"):
        return "writer"  # 结果够了,写报告
    elif state.get("retry_count", 0) < 2:
        return "searcher"  # 重试搜索
    else:
        return "writer"  # 超过最大重试次数,强制写报告

graph.add_conditional_edges(
    "quality_check",
    route_after_check,
    {
        "writer": "writer",
        "searcher": "searcher"
    }
)

入口和出口

from langgraph.graph import START, END

graph.add_edge(START, "planner")    # 图的入口
graph.add_edge("writer", END)       # 图的出口

三、架构设计:搭建完整的研究 Agent

把上面三个概念组合起来,搭建一个完整的"超级研究员" Agent:

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

def build_research_agent():
    # 1. 初始化图
    workflow = StateGraph(AgentState)
    
    # 2. 添加节点
    workflow.add_node("planner", research_planner)
    workflow.add_node("searcher", web_searcher)
    workflow.add_node("quality_check", quality_checker)
    workflow.add_node("writer", report_writer)
    
    # 3. 定义边(流转逻辑)
    workflow.add_edge(START, "planner")          # 入口 → 规划
    workflow.add_edge("planner", "searcher")     # 规划 → 搜索
    workflow.add_edge("searcher", "quality_check")  # 搜索 → 质量检查
    
    # 条件边:根据质量检查结果决定
    workflow.add_conditional_edges(
        "quality_check",
        route_after_check,
        {"writer": "writer", "searcher": "searcher"}
    )
    
    workflow.add_edge("writer", END)             # 写报告 → 结束
    
    # 4. 添加 checkpoint(持久化 State,支持 Human-in-the-loop)
    memory = MemorySaver()
    
    # 5. 编译图
    agent = workflow.compile(checkpointer=memory)
    
    return agent


# 运行 Agent
agent = build_research_agent()

# 配置线程 ID(每个对话独立的上下文)
config = {"configurable": {"thread_id": "research-001"}}

result = agent.invoke(
    {"messages": [HumanMessage(content="分析 2026 年 AI Agent 技术的发展趋势")]},
    config=config
)

print(result["final_answer"])

3.1 可视化执行图

LangGraph 支持一行代码生成图的可视化:

# 生成 Mermaid 图表
from IPython.display import display, Image

display(Image(agent.get_graph().draw_mermaid_png()))

输出类似:

START → planner → searcher → quality_check
                      ↑              ↓
                      └──────────────┘ (retry)
                              ↓
                           writer → END

四、进阶能力:Human-in-the-Loop

这是 LangGraph 最强大的功能之一——在执行到关键节点时,暂停等待人类输入,然后继续执行。

4.1 设置中断点

# 编译时指定需要中断的节点
agent = workflow.compile(
    checkpointer=memory,
    interrupt_before=["writer"]  # 在写报告前中断,让人类审核搜索结果
)

4.2 执行到中断点

config = {"configurable": {"thread_id": "research-002"}}

# 第一次运行:执行到 writer 节点前暂停
result = agent.invoke(
    {"messages": [HumanMessage(content="分析量子计算的商业化进展")]},
    config=config
)

# 此时 result 包含已完成的搜索结果
print("搜索结果:")
for r in result["search_results"]:
    print(f"  - {r}")

# 可以检查当前状态
current_state = agent.get_state(config)
print(f"下一步:{current_state.next}")  # ('writer',)

4.3 人类审核后继续执行

# 人类可以修改状态(例如补充额外信息)
agent.update_state(
    config,
    {"search_results": result["search_results"] + ["额外补充:量子纠错突破最新进展..."]}
)

# 继续执行(从中断点恢复)
final_result = agent.invoke(None, config=config)
print(final_result["final_answer"])

4.4 审批工作流实战

一个更实际的例子——邮件发送审批工作流:

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END, add_messages
from langchain_core.messages import HumanMessage, AIMessage

class EmailWorkflowState(TypedDict):
    messages: Annotated[list, add_messages]
    draft_email: str
    approved: bool
    recipient: str

def draft_email_node(state: EmailWorkflowState) -> dict:
    """起草邮件"""
    prompt = f"为 {state['recipient']} 起草一封专业的商务邮件,内容:{state['messages'][-1].content}"
    response = llm.invoke([HumanMessage(content=prompt)])
    return {
        "messages": [response],
        "draft_email": response.content
    }

def send_email_node(state: EmailWorkflowState) -> dict:
    """发送邮件(只有 approved=True 才会到这里)"""
    # 实际发送逻辑
    print(f"✅ 邮件已发送给 {state['recipient']}")
    return {"messages": [AIMessage(content="邮件发送成功!")]}

def reject_email_node(state: EmailWorkflowState) -> dict:
    """拒绝发送"""
    return {"messages": [AIMessage(content="邮件发送已取消。")]}

def route_after_approval(state: EmailWorkflowState) -> str:
    if state.get("approved"):
        return "send"
    return "reject"

# 构建工作流
workflow = StateGraph(EmailWorkflowState)
workflow.add_node("draft", draft_email_node)
workflow.add_node("send", send_email_node)
workflow.add_node("reject", reject_email_node)

workflow.add_edge(START, "draft")
workflow.add_conditional_edges(
    "draft",
    route_after_approval,
    {"send": "send", "reject": "reject"}
)
workflow.add_edge("send", END)
workflow.add_edge("reject", END)

memory = MemorySaver()
email_agent = workflow.compile(
    checkpointer=memory,
    interrupt_after=["draft"]  # 起草后中断等待审批
)

config = {"configurable": {"thread_id": "email-001"}}

# 触发工作流
email_agent.invoke(
    {
        "messages": [HumanMessage(content="通知项目团队明天下午3点开会")],
        "recipient": "team@company.com",
        "approved": False
    },
    config=config
)

# 查看草稿
state = email_agent.get_state(config)
print("📧 邮件草稿:")
print(state.values["draft_email"])

# 人类审批(批准发送)
email_agent.update_state(config, {"approved": True})

# 继续执行
email_agent.invoke(None, config=config)

五、多 Agent 协作:Supervisor 架构

LangGraph 支持构建多 Agent 系统,最常见的是 Supervisor 架构:一个 Supervisor Agent 负责分配任务,多个 Worker Agent 负责执行。

from langgraph.graph import StateGraph, START, END
from langgraph_supervisor import create_supervisor
from langgraph.prebuilt import create_react_agent

# 创建专门的 Worker Agent
research_agent = create_react_agent(
    llm,
    tools=[web_search_tool, arxiv_search_tool],
    name="researcher",
    prompt="你是研究专家,负责搜集和分析信息。"
)

code_agent = create_react_agent(
    llm,
    tools=[python_repl_tool, code_sandbox_tool],
    name="coder",
    prompt="你是代码专家,负责编写和调试代码。"
)

writing_agent = create_react_agent(
    llm,
    tools=[],
    name="writer",
    prompt="你是写作专家,负责将信息整理成清晰的文章。"
)

# 创建 Supervisor
supervisor = create_supervisor(
    agents=[research_agent, code_agent, writing_agent],
    model=llm,
    prompt="""你是一个项目经理。根据任务需求,决定让哪个专家来处理:
    - 需要搜索信息 → researcher
    - 需要写代码 → coder  
    - 需要写文档 → writer
    当任务完成时,汇总结果并结束。"""
)

multi_agent = supervisor.compile(checkpointer=MemorySaver())

5.1 Swarm 架构(Agent 自主切换)

另一种模式是 Swarm(蜂群):Agent 之间可以互相"移交"控制权,不需要中央 Supervisor。

from langgraph_swarm import create_swarm, create_handoff_tool

# 定义移交工具
transfer_to_coder = create_handoff_tool(
    agent_name="coder",
    description="当需要写代码或调试程序时,移交给代码专家"
)

transfer_to_writer = create_handoff_tool(
    agent_name="writer",
    description="当需要写文档或报告时,移交给写作专家"
)

# 研究 Agent(可以移交给 coder 或 writer)
research_agent = create_react_agent(
    llm,
    tools=[web_search_tool, transfer_to_coder, transfer_to_writer],
    name="researcher"
)

# 代码 Agent(可以移交回 researcher 或给 writer)
transfer_to_researcher = create_handoff_tool(agent_name="researcher")
code_agent = create_react_agent(
    llm,
    tools=[python_repl_tool, transfer_to_researcher, transfer_to_writer],
    name="coder"
)

# 组装 Swarm
swarm = create_swarm(
    agents=[research_agent, code_agent, writing_agent],
    default_active_agent="researcher"  # 从 researcher 开始
)

六、持久化与生产级部署

6.1 生产级 Checkpoint

内存版 MemorySaver 不适合生产,进程重启就丢失了。生产环境需要持久化:

# PostgreSQL 持久化(推荐生产使用)
from langgraph.checkpoint.postgres import PostgresSaver

connection_string = "postgresql://user:password@localhost:5432/langgraph_db"
checkpointer = PostgresSaver.from_conn_string(connection_string)

# 初始化数据库表(只需执行一次)
checkpointer.setup()

agent = workflow.compile(checkpointer=checkpointer)
# Redis 持久化(适合高并发场景)
from langgraph.checkpoint.redis import RedisSaver

checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
agent = workflow.compile(checkpointer=checkpointer)

6.2 流式输出

LangGraph 原生支持流式输出,适合 Web 应用实时展示进度:

import asyncio

async def stream_research(query: str):
    config = {"configurable": {"thread_id": f"stream-{query[:10]}"}}
    
    async for chunk in agent.astream(
        {"messages": [HumanMessage(content=query)]},
        config=config,
        stream_mode="updates"  # 每次节点更新都推送
    ):
        node_name = list(chunk.keys())[0]
        node_output = chunk[node_name]
        
        # 实时打印节点执行情况
        print(f"\n[{node_name}] 执行完成")
        
        if "messages" in node_output:
            last_msg = node_output["messages"][-1]
            if hasattr(last_msg, "content"):
                print(f"输出:{last_msg.content[:200]}...")

# 运行
asyncio.run(stream_research("分析 Rust 在系统编程中的最新进展"))

也可以流式获取 Token 级别的输出:

async for chunk in agent.astream(
    {"messages": [HumanMessage(content=query)]},
    config=config,
    stream_mode="messages"  # Token 级流式
):
    if isinstance(chunk, tuple):
        message, metadata = chunk
        if hasattr(message, "content") and message.content:
            print(message.content, end="", flush=True)

6.3 LangGraph Cloud / LangGraph Platform

LangChain 官方提供了 LangGraph Platform,一个专门用于部署 LangGraph 应用的托管服务:

# langgraph.json 配置文件
{
  "dependencies": ["."],
  "graphs": {
    "research_agent": "./src/agent.py:graph",
    "email_workflow": "./src/email.py:workflow"
  },
  "env": ".env"
}

部署后会自动提供:

  • REST API(POST /runs, GET /runs/{run_id}/stream
  • WebSocket 支持
  • 水平扩展
  • 内置监控和可观测性

七、性能优化:让 Agent 跑得更快

7.1 节点并发执行

LangGraph 支持节点并发——没有依赖关系的节点可以并行执行:

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator

class ParallelState(TypedDict):
    query: str
    # 三个并发搜索结果,用 operator.add 合并
    web_results: Annotated[list, operator.add]
    arxiv_results: Annotated[list, operator.add]
    news_results: Annotated[list, operator.add]
    final_report: str

def web_search(state: ParallelState) -> dict:
    """并发节点1:网页搜索"""
    results = search_web(state["query"])
    return {"web_results": results}

def arxiv_search(state: ParallelState) -> dict:
    """并发节点2:学术搜索"""
    results = search_arxiv(state["query"])
    return {"arxiv_results": results}

def news_search(state: ParallelState) -> dict:
    """并发节点3:新闻搜索"""
    results = search_news(state["query"])
    return {"news_results": results}

def synthesize(state: ParallelState) -> dict:
    """汇总节点:在所有并发节点完成后执行"""
    all_results = state["web_results"] + state["arxiv_results"] + state["news_results"]
    report = generate_report(state["query"], all_results)
    return {"final_report": report}

# 构建并发图
workflow = StateGraph(ParallelState)
workflow.add_node("web_search", web_search)
workflow.add_node("arxiv_search", arxiv_search)
workflow.add_node("news_search", news_search)
workflow.add_node("synthesize", synthesize)

# 从 START 同时触发三个搜索节点(并发)
workflow.add_edge(START, "web_search")
workflow.add_edge(START, "arxiv_search")
workflow.add_edge(START, "news_search")

# 三个节点都完成后执行汇总
workflow.add_edge("web_search", "synthesize")
workflow.add_edge("arxiv_search", "synthesize")
workflow.add_edge("news_search", "synthesize")
workflow.add_edge("synthesize", END)

parallel_agent = workflow.compile()

这样三个搜索任务并发执行,总耗时从串行的 3 × T 降低到约 1 × T

7.2 缓存节点结果

from functools import lru_cache
import hashlib

def cached_search(state: AgentState) -> dict:
    """带缓存的搜索节点"""
    query = state["messages"][-1].content
    cache_key = hashlib.md5(query.encode()).hexdigest()
    
    # 检查缓存
    cached = redis_client.get(f"search:{cache_key}")
    if cached:
        return {"search_results": json.loads(cached)}
    
    # 执行搜索
    results = actual_search(query)
    
    # 写入缓存(1小时过期)
    redis_client.setex(f"search:{cache_key}", 3600, json.dumps(results))
    
    return {"search_results": results}

7.3 动态限流防止 API 超额

import asyncio
from collections import deque
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self.calls = deque()
    
    async def acquire(self):
        now = datetime.now()
        # 清理过期的调用记录
        while self.calls and self.calls[0] < now - timedelta(seconds=self.period):
            self.calls.popleft()
        
        if len(self.calls) >= self.max_calls:
            # 等待直到有空位
            wait_time = (self.calls[0] + timedelta(seconds=self.period) - now).total_seconds()
            await asyncio.sleep(wait_time)
        
        self.calls.append(now)

# 每分钟最多 60 次 LLM 调用
llm_limiter = RateLimiter(max_calls=60, period=60)

async def rate_limited_llm_node(state: AgentState) -> dict:
    await llm_limiter.acquire()
    response = await llm.ainvoke(state["messages"])
    return {"messages": [response]}

八、可观测性:生产监控必备

8.1 集成 LangSmith

import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "research-agent-prod"

# 之后所有的 LangGraph 执行都会自动上报到 LangSmith
# 可以看到每个节点的输入输出、耗时、Token 消耗等

8.2 自定义回调

from langchain_core.callbacks import BaseCallbackHandler
from datetime import datetime

class AgentMonitorCallback(BaseCallbackHandler):
    def __init__(self, metrics_client):
        self.metrics = metrics_client
    
    def on_chain_start(self, serialized, inputs, **kwargs):
        chain_name = serialized.get("name", "unknown")
        self.metrics.increment(f"agent.node.start.{chain_name}")
        self.start_time = datetime.now()
    
    def on_chain_end(self, outputs, **kwargs):
        duration = (datetime.now() - self.start_time).total_seconds()
        self.metrics.histogram("agent.node.duration", duration)
    
    def on_chain_error(self, error, **kwargs):
        self.metrics.increment("agent.node.error")
        logger.error(f"节点执行错误: {error}")

# 使用回调
monitor = AgentMonitorCallback(prometheus_client)
result = agent.invoke(
    {"messages": [HumanMessage(content=query)]},
    config={
        "configurable": {"thread_id": "prod-001"},
        "callbacks": [monitor]
    }
)

九、真实案例:代码审查 Agent

综合以上技术,实现一个完整的代码审查 Agent:

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END, add_messages
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

llm = ChatOpenAI(model="gpt-4o", temperature=0)

class CodeReviewState(TypedDict):
    messages: Annotated[list, add_messages]
    code_snippet: str
    language: str
    security_issues: list[str]
    performance_issues: list[str]
    style_issues: list[str]
    overall_score: int  # 0-100
    review_approved: bool
    revision_count: int

# ===== 节点实现 =====

def detect_language(state: CodeReviewState) -> dict:
    """检测编程语言"""
    response = llm.invoke([
        SystemMessage(content="判断代码使用的编程语言,只返回语言名称,如 Python、JavaScript、Go 等"),
        HumanMessage(content=state["code_snippet"])
    ])
    return {"language": response.content.strip()}


def security_audit(state: CodeReviewState) -> dict:
    """安全审查节点"""
    response = llm.invoke([
        SystemMessage(content=f"""你是 {state['language']} 安全专家。
        审查代码中的安全问题,包括:SQL注入、XSS、命令注入、敏感信息泄露、权限漏洞等。
        以 JSON 列表形式返回问题,如:["存在 SQL 注入风险:第3行", "硬编码密码:第7行"]
        如果没有安全问题,返回 []"""),
        HumanMessage(content=state["code_snippet"])
    ])
    
    try:
        import json
        issues = json.loads(response.content)
    except:
        issues = [response.content] if response.content != "[]" else []
    
    return {"security_issues": issues}


def performance_audit(state: CodeReviewState) -> dict:
    """性能审查节点(与安全审查并发执行)"""
    response = llm.invoke([
        SystemMessage(content=f"""你是 {state['language']} 性能优化专家。
        审查代码中的性能问题,包括:N+1查询、不必要的循环、内存泄漏、阻塞I/O等。
        以 JSON 列表形式返回问题。"""),
        HumanMessage(content=state["code_snippet"])
    ])
    
    try:
        import json
        issues = json.loads(response.content)
    except:
        issues = [response.content] if response.content.strip() else []
    
    return {"performance_issues": issues}


def style_audit(state: CodeReviewState) -> dict:
    """代码风格审查节点(与前两个并发执行)"""
    response = llm.invoke([
        SystemMessage(content=f"""你是 {state['language']} 代码风格专家。
        审查代码风格问题:命名规范、注释质量、函数长度、复杂度等。
        以 JSON 列表形式返回问题。"""),
        HumanMessage(content=state["code_snippet"])
    ])
    
    try:
        import json
        issues = json.loads(response.content)
    except:
        issues = []
    
    return {"style_issues": issues}


def score_and_summarize(state: CodeReviewState) -> dict:
    """汇总评分节点"""
    security = state.get("security_issues", [])
    performance = state.get("performance_issues", [])
    style = state.get("style_issues", [])
    
    # 计算分数(安全问题扣分最多)
    base_score = 100
    base_score -= len(security) * 15   # 每个安全问题扣15分
    base_score -= len(performance) * 8  # 每个性能问题扣8分
    base_score -= len(style) * 3        # 每个风格问题扣3分
    score = max(0, min(100, base_score))
    
    # 生成总结报告
    summary_prompt = f"""基于以下审查结果,生成一份简洁的代码审查报告:
    
安全问题({len(security)}个):{security}
性能问题({len(performance)}个):{performance}
风格问题({len(style)}个):{style}
综合评分:{score}/100

要求:
1. 指出最关键的问题(按优先级)
2. 给出具体的修改建议
3. 语气专业但友好"""
    
    response = llm.invoke([HumanMessage(content=summary_prompt)])
    
    return {
        "messages": [AIMessage(content=response.content)],
        "overall_score": score
    }


def check_approval(state: CodeReviewState) -> Literal["approve", "request_revision"]:
    """决策节点:是否通过审查"""
    score = state.get("overall_score", 0)
    security_issues = state.get("security_issues", [])
    
    # 安全问题零容忍,或分数低于 60 都需要修改
    if security_issues or score < 60:
        return "request_revision"
    return "approve"


def request_revision(state: CodeReviewState) -> dict:
    """请求修改节点"""
    revision_count = state.get("revision_count", 0) + 1
    msg = f"代码审查未通过(第{revision_count}次)。评分:{state['overall_score']}/100。请根据上述问题修改后重新提交。"
    return {
        "messages": [AIMessage(content=msg)],
        "review_approved": False,
        "revision_count": revision_count
    }


def approve_code(state: CodeReviewState) -> dict:
    """批准节点"""
    msg = f"✅ 代码审查通过!评分:{state['overall_score']}/100。代码质量良好,可以合并。"
    return {
        "messages": [AIMessage(content=msg)],
        "review_approved": True
    }


# ===== 构建图 =====

def build_code_review_agent():
    workflow = StateGraph(CodeReviewState)
    
    # 添加节点
    workflow.add_node("detect_language", detect_language)
    workflow.add_node("security_audit", security_audit)
    workflow.add_node("performance_audit", performance_audit)
    workflow.add_node("style_audit", style_audit)
    workflow.add_node("score_and_summarize", score_and_summarize)
    workflow.add_node("request_revision", request_revision)
    workflow.add_node("approve_code", approve_code)
    
    # 定义边
    workflow.add_edge(START, "detect_language")
    
    # 语言检测后,并发执行三个审查
    workflow.add_edge("detect_language", "security_audit")
    workflow.add_edge("detect_language", "performance_audit")
    workflow.add_edge("detect_language", "style_audit")
    
    # 三个审查完成后汇总
    workflow.add_edge("security_audit", "score_and_summarize")
    workflow.add_edge("performance_audit", "score_and_summarize")
    workflow.add_edge("style_audit", "score_and_summarize")
    
    # 条件判断
    workflow.add_conditional_edges(
        "score_and_summarize",
        check_approval,
        {
            "approve": "approve_code",
            "request_revision": "request_revision"
        }
    )
    
    workflow.add_edge("approve_code", END)
    workflow.add_edge("request_revision", END)
    
    # 生产级 PostgreSQL checkpoint
    checkpointer = PostgresSaver.from_conn_string(
        "postgresql://localhost/code_review_db"
    )
    checkpointer.setup()
    
    return workflow.compile(checkpointer=checkpointer)


# ===== 使用示例 =====

review_agent = build_code_review_agent()

test_code = """
import sqlite3

def get_user(username):
    conn = sqlite3.connect('users.db')
    cursor = conn.cursor()
    # 危险:直接拼接SQL,存在注入风险
    query = f"SELECT * FROM users WHERE username = '{username}'"
    cursor.execute(query)
    user = cursor.fetchone()
    conn.close()
    return user

def process_users():
    users = get_all_users()
    result = []
    for user in users:
        # N+1 问题:每个用户单独查询订单
        orders = get_user_orders(user['id'])
        result.append({**user, 'orders': orders})
    return result
"""

config = {"configurable": {"thread_id": "review-001"}}

result = review_agent.invoke(
    {
        "code_snippet": test_code,
        "messages": [HumanMessage(content="请审查这段代码")],
        "security_issues": [],
        "performance_issues": [],
        "style_issues": [],
        "revision_count": 0
    },
    config=config
)

print(result["messages"][-1].content)
print(f"\n综合评分:{result['overall_score']}/100")
print(f"审查结果:{'通过' if result['review_approved'] else '需要修改'}")

十、LangGraph vs 竞品对比

维度LangGraphCrewAIAutoGen直接手写
学习曲线中等中等低(但维护成本高)
灵活性⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Human-in-the-loop原生支持有限支持支持需手写
持久化多种后端有限有限需手写
可视化内置
生产成熟度取决于实现
并发支持原生有限需手写
社区活跃非常活跃活跃活跃N/A

结论:如果你要在生产环境部署复杂的 Agent 系统,LangGraph 是目前最成熟的选择。如果只是快速原型,CrewAI 更简单。AutoGen 适合需要 Agent 间"自由对话"的场景。


十一、常见坑和最佳实践

坑 1:State 字段覆盖问题

# ❌ 错误:每次更新都会覆盖 messages,只保留最新一条
class BadState(TypedDict):
    messages: list  # 没有 reducer

# ✅ 正确:使用 add_messages reducer,自动追加
class GoodState(TypedDict):
    messages: Annotated[list, add_messages]

坑 2:无限循环

# ❌ 危险:没有终止条件的条件边可能死循环
def route(state):
    if not state["is_done"]:
        return "retry_node"  # 如果 retry_node 不更新 is_done,永远循环!

# ✅ 安全:添加最大重试次数限制
def route(state):
    if not state["is_done"] and state["retry_count"] < 3:
        return "retry_node"
    return "end_node"

坑 3:checkpoint 泄漏

# ❌ 同一个 thread_id 重复使用会叠加历史状态
config = {"configurable": {"thread_id": "fixed-id"}}  # 每次都用同一个

# ✅ 每个独立任务用唯一的 thread_id
import uuid
config = {"configurable": {"thread_id": str(uuid.uuid4())}}

坑 4:节点函数不是纯函数

# ❌ 全局状态导致并发问题
counter = 0
def bad_node(state):
    global counter
    counter += 1  # 并发时会竞争
    return {"count": counter}

# ✅ 状态都放 State 里,节点是纯函数
def good_node(state):
    return {"count": state.get("count", 0) + 1}

最佳实践清单

  1. State 设计优先:在写任何节点之前,先设计好 State 结构
  2. 节点保持纯函数:不要在节点里维护全局状态
  3. 条件边要有兜底:所有条件分支都要有默认情况
  4. 生产用 PostgreSQL checkpoint:内存 Saver 不适合生产
  5. 流式输出改善用户体验:长任务一定要用 astream
  6. 为重要节点加日志:方便排查 Agent 行为

十二、总结与展望

LangGraph 的出现代表了 AI Agent 工程化的一次重要进步:

它解决的核心问题

  • 复杂 Agent 流程的可维护性(图 vs 意大利面代码)
  • Human-in-the-Loop 的生产级支持
  • 多 Agent 协作的标准化接口
  • 状态持久化和可恢复性

它的局限

  • 对于简单的单轮问答,它是过度设计
  • 学习曲线比 CrewAI 更陡
  • 图结构对动态变化的流程支持有限(节点和边是静态的)

2026 年的展望:随着 AI Agent 从实验走向大规模生产,LangGraph 这类"Agent 操作系统"会变得越来越重要。下一步值得关注的方向:

  • LangGraph + 向量数据库:长期记忆的标准解决方案
  • 跨语言 Agent 互操作:不同框架的 Agent 能互相调用
  • Agent 评估框架:自动化测试 Agent 行为的基准体系

工具只是手段,搞清楚你的业务流程,才是用好 LangGraph 的关键。


本文代码示例基于 LangGraph 0.2.x,依赖:pip install langgraph langchain langchain-openai

复制全文 生成海报 LangGraph AI Agent Python LangChain 多智能体

推荐文章

Roop是一款免费开源的AI换脸工具
2024-11-19 08:31:01 +0800 CST
如何实现生产环境代码加密
2024-11-18 14:19:35 +0800 CST
使用 `nohup` 命令的概述及案例
2024-11-18 08:18:36 +0800 CST
JavaScript设计模式:发布订阅模式
2024-11-18 01:52:39 +0800 CST
虚拟DOM渲染器的内部机制
2024-11-19 06:49:23 +0800 CST
H5抖音商城小黄车购物系统
2024-11-19 08:04:29 +0800 CST
MyLib5,一个Python中非常有用的库
2024-11-18 12:50:13 +0800 CST
纯CSS实现3D云动画效果
2024-11-18 18:48:05 +0800 CST
Python设计模式之工厂模式详解
2024-11-19 09:36:23 +0800 CST
Linux 网站访问日志分析脚本
2024-11-18 19:58:45 +0800 CST
Nginx 防止IP伪造,绕过IP限制
2025-01-15 09:44:42 +0800 CST
MySQL 日志详解
2024-11-19 02:17:30 +0800 CST
thinkphp分页扩展
2024-11-18 10:18:09 +0800 CST
Golang Select 的使用及基本实现
2024-11-18 13:48:21 +0800 CST
PHP 命令行模式后台执行指南
2025-05-14 10:05:31 +0800 CST
程序员茄子在线接单