LangGraph Human-in-the-Loop 深度实战:为 AI Agent 装上"安全阀"——从 interrupt 中断机制到四大应用模式的完全指南(2026)
当 AI Agent 开始拥有调用支付接口、删除数据库、发送邮件的能力时,我们真的能放心让它"全自动驾驶"吗?LangGraph 的 Human-in-the-Loop 机制给出了答案:在关键节点暂停,让人类介入决策。这不是对 AI 的不信任,而是工程成熟度的体现。
一、引言:为什么 AI Agent 需要"安全阀"
1.1 一个真实场景的警示
想象这样一个场景:
你开发了一款面向金融领域的 AI Agent,它能自动分析交易异常并执行风控操作。某天,Agent 检测到一笔大额转账,判定为可疑交易,自动调用了风控 API 冻结了对方账户。
结果呢?那是一笔正常的 B2B 交易,冻结操作导致客户损失了数百万的商机。客户投诉、法务介入、品牌受损——一切都在 Agent 执行那个"冻结"操作的瞬间发生。
问题出在哪?
- Agent 有决策能力,但缺乏人类判断的上下文
- 自动化追求效率,但某些场景需要"慢一点"
- 没有任何机制让人类在关键节点介入
1.2 Human-in-the-Loop 的核心价值
Human-in-the-Loop(HITL,人在环中)是一种 AI 系统设计模式,其核心思想是:
在 AI 自动处理大部分任务的同时,保留人类参与关键决策点的能力。
这不是"不信任 AI",而是承认:
- AI 有知识盲区:某些业务规则、合规要求、人情世故,AI 无法完全掌握
- 高风险操作需要兜底:支付、删除、发布等操作一旦执行就无法撤回
- 责任归属清晰:人机协作的决策链条,责任边界明确
1.3 LangGraph 的 HITL 方案
LangGraph 作为 LangChain 团队推出的 Agent 编排框架,提供了完善的人工干预机制:
- interrupt():在任意节点暂停执行
- Command(resume=...):恢复执行并传递人工输入
- Command(goto=...):动态路由到不同分支
- Checkpoint(检查点):持久化状态,支持断点续跑
这套机制让 Agent 在"全自动"和"人工接管"之间找到了平衡点。
二、核心概念:interrupt、Command 与 Checkpoint
2.1 interrupt —— 暂停执行的"开关"
interrupt 是 LangGraph 提供的函数,用于在图执行过程中暂停当前流程,等待外部输入。
from langgraph.types import interrupt
# 在工具或节点中调用
human_response = interrupt({
"query": "请确认是否执行此操作",
"status": "waiting_for_human_input"
})
# 执行会在这里暂停,等待外部恢复
print(human_response) # 这行代码在恢复后才会执行
关键特性:
- 调用后,图的执行立即暂停
- 当前状态自动保存到 Checkpoint
- 可以在暂停点注入外部数据后恢复
使用限制:
- 必须在编译图时配置
checkpointer - 只能在节点函数或工具函数内部调用
2.2 Command —— 恢复执行的"遥控器"
Command 是 LangGraph 的类型,用于恢复被中断的执行流程,有两种模式:
from langgraph.types import Command
# 模式一:resume —— 恢复执行并传递数据
command = Command(resume={"data": "人工确认执行"})
graph.stream(command, config)
# 模式二:goto —— 跳转到指定节点
command = Command(goto="cancel_payment")
graph.stream(command, config)
resume vs goto:
| 模式 | 作用 | 典型场景 |
|---|---|---|
| resume | 从中断点继续,传递数据给 interrupt() 的返回值 | 审批通过后继续原流程 |
| goto | 跳转到其他节点,覆盖预设的边 | 根据审批结果走不同分支 |
2.3 Checkpoint —— 状态持久化的基石
要支持中断和恢复,必须配置检查点:
from langgraph.checkpoint.memory import MemorySaver
# 创建内存检查点
memory = MemorySaver()
# 编译图时传入 checkpointer
graph = graph_builder.compile(checkpointer=memory)
Checkpoint 保存的是"图的完整状态快照":
- 所有 State 字段的当前值
- 当前执行到哪个节点
- 下一步要执行的节点(next)
- 父检查点 ID(形成版本链)
- 时间戳和元数据
为什么必须配置 Checkpoint?
当 interrupt() 被调用时,图需要将当前状态序列化保存。没有 Checkpointer,就无法恢复执行。
生产环境推荐的 Checkpointer:
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import ConnectionPool
# PostgreSQL 持久化(生产推荐)
pool = ConnectionPool(conninfo="postgresql://user:pass@localhost/langgraph")
checkpointer = PostgresSaver(pool)
graph = graph_builder.compile(checkpointer=checkpointer)
三、四种典型应用模式
3.1 模式一:批准或拒绝(Approve / Reject)
典型场景:
- 财务 Agent 检测可疑交易,需人工确认是否放行
- 运维 Agent 准备删除生产数据,需人工审批
- 内容发布 Agent 准备推送公告,需人工审核
实现思路:
- 在关键操作前插入审批节点
- 调用
interrupt()展示操作信息 - 根据人工输入,路由到"执行"或"取消"分支
完整代码示例:
from typing import TypedDict, Literal, Optional
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
# 定义状态
class ApprovalState(TypedDict):
transaction_id: str
amount: float
recipient: str
status: Optional[Literal["pending", "approved", "rejected"]]
reason: Optional[str]
# 审批节点
def approval_node(state: ApprovalState):
"""请求人工审批"""
decision = interrupt({
"question": "是否批准以下交易?",
"transaction": {
"id": state["transaction_id"],
"amount": state["amount"],
"recipient": state["recipient"]
},
"options": "请输入 '批准' 或 '拒绝',拒绝时可附上原因"
})
# 根据人工输入决定路由
if decision.get("action") == "批准":
return Command(goto="execute_payment")
else:
return {
"status": "rejected",
"reason": decision.get("reason", "人工拒绝")
}
# 执行支付节点
def execute_payment(state: ApprovalState):
"""实际执行支付"""
print(f"✅ 交易 {state['transaction_id']} 已执行")
print(f" 金额: {state['amount']} 元")
print(f" 收款方: {state['recipient']}")
return {"status": "approved"}
# 取消支付节点
def cancel_payment(state: ApprovalState):
"""取消交易"""
print(f"❌ 交易 {state['transaction_id']} 已取消")
print(f" 原因: {state.get('reason', '人工拒绝')}")
return {"status": "rejected"}
# 构建图
builder = StateGraph(ApprovalState)
builder.add_node("approval", approval_node)
builder.add_node("execute_payment", execute_payment)
builder.add_node("cancel_payment", cancel_payment)
# 添加边
builder.add_edge(START, "approval")
builder.add_edge("approval", "cancel_payment") # 默认走取消(被 reject 时)
builder.add_edge("execute_payment", END)
builder.add_edge("cancel_payment", END)
# 编译
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
# 执行 —— 会在 approval 节点暂停
config = {"configurable": {"thread_id": "tx-001"}}
initial_state = {
"transaction_id": "TX-20260601001",
"amount": 50000.0,
"recipient": "张三"
}
for event in graph.stream(initial_state, config):
print(event)
# 此时图已暂停,等待人工输入
# 人工输入"批准"后恢复
from langgraph.types import Command
# 方式一:批准
graph.invoke(Command(resume={"action": "批准"}), config=config)
# 方式二:拒绝并附原因
# graph.invoke(Command(resume={"action": "拒绝", "reason": "收款方信息存疑"}), config=config)
运行效果:
✅ 交易 TX-20260601001 已执行
金额: 50000.0 元
收款方: 张三
3.2 模式二:查看和编辑状态(Review & Edit State)
典型场景:
- AI 生成营销邮件草稿,人工修改后确认发送
- AI 撰写报告,人工润色后发布
- AI 生成代码,人工审查后合并
实现思路:
- AI 先生成内容
- 在审查节点展示内容,允许人工修改
- 修改后的状态覆盖原值,继续后续流程
完整代码示例:
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
class DraftState(TypedDict):
subject: str
body: str
recipient: str
edited: bool
def generate_draft(state: DraftState):
"""AI 生成邮件草稿"""
# 模拟 LLM 生成
llm = ChatOpenAI(model="gpt-4")
return {
"subject": "限时优惠:全场商品8折",
"body": """亲爱的用户:
本周末全场商品8折优惠,快来抢购吧!
活动时间:6月1日-6月2日
祝好,
营销团队""",
"edited": False
}
def review_draft(state: DraftState):
"""人工审查并修改"""
updated = interrupt({
"instruction": "请审查以下邮件草稿,可修改后确认发送",
"current_draft": {
"subject": state["subject"],
"body": state["body"],
"recipient": state["recipient"]
},
"tips": "返回格式:{\"subject\": \"...\", \"body\": \"...\"},或直接返回 '确认' 保持原内容"
})
# 处理人工输入
if isinstance(updated, dict):
return {
"subject": updated.get("subject", state["subject"]),
"body": updated.get("body", state["body"]),
"edited": True
}
else:
# 用户确认,保持原内容
return {"edited": False}
def send_email(state: DraftState):
"""发送邮件"""
print("=" * 50)
print("📧 邮件已发送")
print(f" 收件人: {state['recipient']}")
print(f" 主题: {state['subject']}")
print(f" 是否经过编辑: {'是' if state['edited'] else '否'}")
print("=" * 50)
return state
# 构建图
builder = StateGraph(DraftState)
builder.add_node("generate", generate_draft)
builder.add_node("review", review_draft)
builder.add_node("send", send_email)
builder.add_edge(START, "generate")
builder.add_edge("generate", "review")
builder.add_edge("review", "send")
builder.add_edge("send", END)
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
# 执行
config = {"configurable": {"thread_id": "email-001"}}
initial = {"recipient": "customer@example.com"}
for event in graph.stream(initial, config):
pass
# 此时暂停在 review 节点,人工可以修改内容
edited_content = {
"subject": "【紧急】全场8折仅剩2天!错过再等一年",
"body": """亲爱的用户:
⏰ 折扣倒计时!全场商品8折优惠即将结束!
活动时间:6月1日-6月2日(仅剩2天!)
优惠码:SUMMER2026
👉 立即抢购:https://shop.example.com
祝好,
营销团队"""
}
graph.invoke(Command(resume=edited_content), config=config)
运行效果:
==================================================
📧 邮件已发送
收件人: customer@example.com
主题: 【紧急】全场8折仅剩2天!错过再等一年
是否经过编辑: 是
==================================================
3.3 模式三:工具中断(Tool Interrupts)
典型场景:
- Agent 调用"发送邮件"工具前,需人工确认收件人
- Agent 调用"删除文件"工具前,需人工确认文件列表
- Agent 调用"支付"工具前,需人工确认金额
实现思路:
直接在工具函数内部调用 interrupt(),当 Agent 执行到该工具时自动暂停。
完整代码示例:
from langchain_core.tools import tool
from langgraph.types import interrupt
@tool
def send_email_tool(to: str, subject: str, body: str) -> str:
"""发送邮件给指定收件人
Args:
to: 收件人邮箱
subject: 邮件主题
body: 邮件正文
"""
# 在真正发送前,请求人工确认
confirmation = interrupt({
"action": "即将发送邮件",
"details": {
"to": to,
"subject": subject,
"body_preview": body[:100] + ("..." if len(body) > 100 else ""),
"body_length": len(body)
},
"instruction": "请确认发送,或返回修改后的参数",
"options": [
"输入 '发送' 确认发送",
"或返回 JSON 格式修改参数:{\"to\": \"...\", \"subject\": \"...\", \"body\": \"...\"}"
]
})
# 处理人工输入
if isinstance(confirmation, dict):
to = confirmation.get("to", to)
subject = confirmation.get("subject", subject)
body = confirmation.get("body", body)
elif confirmation == "发送":
pass # 使用原参数
else:
return "用户取消了邮件发送"
# 实际发送逻辑(这里模拟)
print(f"📧 邮件已发送至 {to}")
print(f" 主题: {subject}")
return f"邮件已成功发送至 {to}"
@tool
def delete_files_tool(paths: list[str], force: bool = False) -> str:
"""删除指定文件
Args:
paths: 文件路径列表
force: 是否强制删除(不进回收站)
"""
# 危险操作,必须人工确认
confirmation = interrupt({
"action": "⚠️ 即将删除文件",
"warning": "此操作不可恢复!" if force else "文件将移入回收站",
"details": {
"files": paths,
"count": len(paths),
"force": force
},
"instruction": "请确认删除操作"
})
if confirmation == "确认删除":
# 模拟删除
for path in paths:
print(f"🗑️ 已删除: {path}")
return f"已删除 {len(paths)} 个文件"
else:
return "用户取消了删除操作"
# 完整的 Agent 示例
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
class State(TypedDict):
messages: Annotated[list, add_messages]
def create_agent():
llm = ChatOpenAI(model="gpt-4", temperature=0)
tools = [send_email_tool, delete_files_tool]
llm_with_tools = llm.bind_tools(tools)
def chatbot(state: State):
message = llm_with_tools.invoke(state["messages"])
# 禁用并行工具调用,避免恢复时重复执行
assert len(message.tool_calls) <= 1
return {"messages": [message]}
builder = StateGraph(State)
builder.add_node("chatbot", chatbot)
builder.add_node("tools", ToolNode(tools=tools))
builder.add_edge(START, "chatbot")
builder.add_conditional_edges("chatbot", tools_condition)
builder.add_edge("tools", "chatbot")
memory = MemorySaver()
return builder.compile(checkpointer=memory)
# 使用示例
agent = create_agent()
config = {"configurable": {"thread_id": "agent-001"}}
# 用户请求
agent.invoke({"messages": [{"role": "user", "content": "帮我发送一封邮件给 boss@company.com,主题是周报,内容是本周完成了所有任务"}]}, config)
# 图会在 send_email_tool 内部暂停
# 人工确认后恢复
agent.invoke(Command(resume="发送"), config)
3.4 模式四:验证人类输入(Validating Human Input)
典型场景:
- 收集用户年龄,要求必须是正整数
- 收集用户邮箱,要求格式正确
- 收集用户手机号,要求符合规范
实现思路:
在一个循环内反复调用 interrupt(),每次检查输入是否合法,不合法则重新提示。
完整代码示例:
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver
import re
class FormState(TypedDict):
name: str
age: int | None
email: str
phone: str
validation_errors: list[str]
def collect_name(state: FormState):
"""收集姓名"""
name = interrupt({
"field": "name",
"prompt": "请输入您的姓名"
})
return {"name": name}
def collect_age(state: FormState):
"""收集年龄(带验证)"""
prompt = "请输入您的年龄(必须是正整数)"
retries = 0
max_retries = 3
while retries < max_retries:
answer = interrupt({
"field": "age",
"prompt": prompt
})
# 验证输入
try:
age = int(answer)
if age > 0 and age < 150:
return {"age": age}
else:
prompt = f"年龄 {age} 不在有效范围内(1-149),请重新输入"
except (ValueError, TypeError):
prompt = f"'{answer}' 不是有效的数字,请输入正整数"
retries += 1
# 超过重试次数,设置默认值
return {"age": 18, "validation_errors": ["年龄输入无效,已设置默认值 18"]}
def collect_email(state: FormState):
"""收集邮箱(带格式验证)"""
prompt = "请输入您的邮箱地址"
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
while True:
answer = interrupt({
"field": "email",
"prompt": prompt
})
if re.match(email_pattern, answer):
return {"email": answer}
else:
prompt = f"'{answer}' 不是有效的邮箱格式,请重新输入"
def collect_phone(state: FormState):
"""收集手机号(中国大陆格式)"""
prompt = "请输入您的手机号(11位数字)"
phone_pattern = r'^1[3-9]\d{9}$'
while True:
answer = interrupt({
"field": "phone",
"prompt": prompt
})
# 移除可能的前缀和空格
phone = answer.replace(" ", "").replace("-", "")
if re.match(phone_pattern, phone):
return {"phone": phone}
else:
prompt = f"'{answer}' 不是有效的手机号格式,请输入11位手机号"
def summarize(state: FormState):
"""汇总信息"""
print("\n" + "=" * 50)
print("📋 表单收集完成")
print("=" * 50)
print(f"姓名: {state['name']}")
print(f"年龄: {state['age']}")
print(f"邮箱: {state['email']}")
print(f"手机: {state['phone']}")
if state.get("validation_errors"):
print("\n⚠️ 验证警告:")
for err in state["validation_errors"]:
print(f" - {err}")
print("=" * 50)
return state
# 构建图
builder = StateGraph(FormState)
builder.add_node("collect_name", collect_name)
builder.add_node("collect_age", collect_age)
builder.add_node("collect_email", collect_email)
builder.add_node("collect_phone", collect_phone)
builder.add_node("summarize", summarize)
builder.add_edge(START, "collect_name")
builder.add_edge("collect_name", "collect_age")
builder.add_edge("collect_age", "collect_email")
builder.add_edge("collect_email", "collect_phone")
builder.add_edge("collect_phone", "summarize")
builder.add_edge("summarize", END)
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
# 执行流程(需要多次恢复)
config = {"configurable": {"thread_id": "form-001"}}
# 启动
graph.invoke({}, config)
# 用户输入姓名
graph.invoke(Command(resume="张三"), config)
# 用户输入年龄(故意输入错误值测试)
graph.invoke(Command(resume="abc"), config) # 第一次错误
graph.invoke(Command(resume="-10"), config) # 第二次错误
graph.invoke(Command(resume="25"), config) # 正确
# 用户输入邮箱
graph.invoke(Command(resume="invalid-email"), config) # 错误格式
graph.invoke(Command(resume="zhangsan@example.com"), config) # 正确
# 用户输入手机号
graph.invoke(Command(resume="12345"), config) # 错误
graph.invoke(Command(resume="13812345678"), config) # 正确
四、Checkpoint 机制深度解析
4.1 Checkpoint 保存的是什么
很多人误以为 Checkpoint 只是"保存对话历史",这是第一个认知偏差。
Checkpoint 保存的是 Graph 在某一执行步骤的完整状态快照,包括:
- 所有 Channel 的当前值:State 里每个字段的值
- 当前执行到哪个节点:next 字段
- 父检查点 ID:形成版本链
- 时间戳和元数据:写入时间、来源等
- 待执行任务:tasks 字段
# 获取当前状态的快照
snapshot = graph.get_state(config)
print(snapshot.values) # 当前 State 的所有字段值
print(snapshot.next) # 下一步要执行的节点
print(snapshot.config) # thread_id 和 checkpoint_id
print(snapshot.metadata) # 元数据
4.2 Super-Step 执行模型
LangGraph 采用"超步"(Super-Step)执行模型:
[读取上一个 Checkpoint]
↓
[执行当前节点,更新 State]
↓
[写入新 Checkpoint(快照)]
↓
[决定下一步:继续 / 等待 / 结束]
每个节点执行完成后,都会自动保存一个 Checkpoint。
4.3 时间旅行:回到历史状态
由于 Checkpoint 形成了版本链,我们可以"穿越"到任意历史状态:
# 获取所有历史状态
history = list(graph.get_state_history(config))
# 回到某个历史状态
past_state = history[-3] # 往前数第3个状态
graph.invoke(None, config={**config, "checkpoint_id": past_state.config["configurable"]["checkpoint_id"]})
4.4 生产环境的 Checkpointer 选择
| Checkpointer | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| MemorySaver | 开发测试 | 零配置、速度快 | 重启丢失、不支持多实例 |
| SqliteSaver | 单机生产 | 持久化、简单 | 单文件锁、不支持高并发 |
| PostgresSaver | 生产环境 | 高并发、事务支持 | 需要数据库、配置复杂 |
| RedisSaver | 高性能场景 | 极快速度 | 内存成本高、需要 Redis |
PostgresSaver 生产配置示例:
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import ConnectionPool
# 连接池配置
pool = ConnectionPool(
conninfo="postgresql://langgraph:password@localhost:5432/langgraph_db",
min_size=5,
max_size=20
)
# 创建检查点存储
checkpointer = PostgresSaver(pool)
# 首次使用需要初始化表
# checkpointer.setup()
graph = builder.compile(checkpointer=checkpointer)
五、最佳实践与踩坑指南
5.1 必须配置 Checkpointer
错误:
graph = builder.compile() # 没有 checkpointer
# 调用 interrupt() 时会报错
正确:
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
5.2 interrupt() 只能在节点或工具中调用
错误:
def some_helper_function():
return interrupt("等待输入") # 不是节点或工具
def my_node(state):
some_helper_function() # 会报错
正确:
def my_node(state):
return interrupt("等待输入") # 直接在节点中调用
5.3 禁用并行工具调用
当使用人工干预时,需要禁用并行工具调用,避免恢复时重复执行:
def chatbot(state: State):
message = llm_with_tools.invoke(state["messages"])
# 确保最多只有一个工具调用
assert len(message.tool_calls) <= 1, "请禁用并行工具调用"
return {"messages": [message]}
5.4 恢复时使用相同的 thread_id
# 暂停时的配置
config = {"configurable": {"thread_id": "session-001"}}
# 恢复时必须使用相同的配置
graph.invoke(Command(resume="确认"), config=config) # ✅ 正确
# 错误的恢复方式
graph.invoke(Command(resume="确认"), config={"configurable": {"thread_id": "session-002"}}) # ❌ 错误
5.5 检查中断状态的时机
错误:
for event in graph.stream(input, config):
pass
snapshot = graph.get_state(config)
if snapshot.next:
print("已中断") # 可能在 stream 完成前就检查了
正确:
for event in graph.stream(input, config):
# 处理事件
pass
# stream 完成后再检查
snapshot = graph.get_state(config)
if snapshot.next:
print(f"已中断,等待节点: {snapshot.next}")
5.6 设计合理的中断提示
# ❌ 不好的设计:信息不足
interrupt("确认吗?")
# ✅ 好的设计:信息完整、选项清晰
interrupt({
"action": "即将执行删除操作",
"target": f"文件列表:{files}",
"warning": "此操作不可恢复!",
"options": ["输入 '确认删除' 继续", "输入 '取消' 放弃"]
})
5.7 设置超时机制
避免用户无限等待:
from datetime import datetime, timedelta
def approval_with_timeout(state):
start_time = datetime.now()
timeout = timedelta(hours=24) # 24小时超时
while datetime.now() - start_time < timeout:
decision = interrupt({
"action": "等待审批",
"timeout": str(timeout - (datetime.now() - start_time))
})
if decision:
return {"approved": decision == "approve"}
# 超时后默认拒绝
return {"approved": False, "timeout": True}
六、完整实战案例:金融风控 Agent
6.1 场景描述
构建一个金融风控 Agent,当检测到大额交易时:
- 自动分析交易风险
- 高风险交易请求人工审批
- 审批通过后执行,否则拒绝
6.2 完整代码
"""
金融风控 Agent - Human-in-the-Loop 实战案例
"""
from typing import TypedDict, Literal, Optional
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
import json
# ============== 状态定义 ==============
class RiskAnalysis(BaseModel):
risk_level: Literal["low", "medium", "high", "critical"]
risk_factors: list[str]
recommendation: str
confidence: float
class TransactionState(TypedDict):
transaction_id: str
amount: float
sender: str
receiver: str
timestamp: str
risk_analysis: Optional[RiskAnalysis]
human_decision: Optional[Literal["approve", "reject"]]
execution_result: Optional[str]
# ============== 节点定义 ==============
def analyze_risk(state: TransactionState):
"""AI 分析交易风险"""
llm = ChatOpenAI(model="gpt-4", temperature=0)
# 模拟风险分析(实际应用中调用风控模型)
prompt = f"""
分析以下交易的风险等级:
交易ID: {state['transaction_id']}
金额: {state['amount']}
发送方: {state['sender']}
接收方: {state['receiver']}
时间: {state['timestamp']}
请返回 JSON 格式:
{{
"risk_level": "low/medium/high/critical",
"risk_factors": ["因子1", "因子2"],
"recommendation": "建议",
"confidence": 0.0-1.0
}}
"""
# 这里简化处理,实际应该调用 LLM
if state['amount'] > 100000:
risk = RiskAnalysis(
risk_level="high",
risk_factors=["大额交易", "首次交易对手"],
recommendation="建议人工审核",
confidence=0.85
)
elif state['amount'] > 50000:
risk = RiskAnalysis(
risk_level="medium",
risk_factors=["金额较大"],
recommendation="可自动放行或人工确认",
confidence=0.75
)
else:
risk = RiskAnalysis(
risk_level="low",
risk_factors=[],
recommendation="自动放行",
confidence=0.95
)
return {"risk_analysis": risk.model_dump()}
def route_by_risk(state: TransactionState):
"""根据风险等级路由"""
risk = state["risk_analysis"]
if risk["risk_level"] in ["high", "critical"]:
return "request_approval"
elif risk["risk_level"] == "medium":
return "request_approval" # 中等风险也请求人工确认
else:
return "auto_approve"
def request_approval(state: TransactionState):
"""请求人工审批"""
risk = state["risk_analysis"]
decision = interrupt({
"type": "approval_request",
"transaction": {
"id": state["transaction_id"],
"amount": state["amount"],
"sender": state["sender"],
"receiver": state["receiver"]
},
"risk_analysis": risk,
"instruction": "请审核以上交易",
"options": ["approve - 批准交易", "reject - 拒绝交易"]
})
return {"human_decision": decision}
def auto_approve(state: TransactionState):
"""自动批准(低风险)"""
return {"human_decision": "approve"}
def execute_transaction(state: TransactionState):
"""执行交易"""
print(f"\n✅ 交易 {state['transaction_id']} 已执行")
print(f" 金额: ¥{state['amount']:,.2f}")
print(f" {state['sender']} → {state['receiver']}")
return {"execution_result": "success"}
def reject_transaction(state: TransactionState):
"""拒绝交易"""
print(f"\n❌ 交易 {state['transaction_id']} 已拒绝")
print(f" 原因: 人工审核未通过")
return {"execution_result": "rejected"}
def log_result(state: TransactionState):
"""记录日志"""
print(f"\n📋 交易处理完成")
print(f" ID: {state['transaction_id']}")
print(f" 结果: {state['execution_result']}")
print(f" 风险等级: {state['risk_analysis']['risk_level']}")
if state.get('human_decision'):
print(f" 人工决策: {state['human_decision']}")
return state
# ============== 构建图 ==============
builder = StateGraph(TransactionState)
# 添加节点
builder.add_node("analyze_risk", analyze_risk)
builder.add_node("request_approval", request_approval)
builder.add_node("auto_approve", auto_approve)
builder.add_node("execute", execute_transaction)
builder.add_node("reject", reject_transaction)
builder.add_node("log", log_result)
# 添加边
builder.add_edge(START, "analyze_risk")
builder.add_conditional_edges("analyze_risk", route_by_risk)
builder.add_edge("request_approval", "execute") # 需要动态路由
builder.add_edge("auto_approve", "execute")
builder.add_edge("execute", "log")
builder.add_edge("reject", "log")
builder.add_edge("log", END)
# ============== 动态路由逻辑 ==============
def approval_router(state: TransactionState):
"""审批结果路由"""
if state["human_decision"] == "approve":
return "execute"
else:
return "reject"
# 修改 request_approval 节点的输出
def request_approval_with_route(state: TransactionState):
decision = interrupt({...})
if decision == "approve":
return Command(goto="execute")
else:
return Command(goto="reject")
# ============== 编译与运行 ==============
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
# 测试用例
def test_transaction():
config = {"configurable": {"thread_id": "tx-test-001"}}
# 模拟一笔大额交易
transaction = {
"transaction_id": "TX-20260601001",
"amount": 150000.0,
"sender": "账户A",
"receiver": "账户B",
"timestamp": "2026-06-01T10:00:00Z"
}
# 执行分析
for event in graph.stream(transaction, config):
print(f"节点输出: {event}")
# 此时图在 request_approval 节点暂停
snapshot = graph.get_state(config)
print(f"\n⏸️ 等待人工审批...")
print(f"风险分析: {snapshot.values['risk_analysis']}")
# 模拟人工审批
print("\n模拟人工审批:批准")
graph.invoke(Command(resume="approve"), config)
if __name__ == "__main__":
test_transaction()
6.3 运行效果
节点输出: {'analyze_risk': {'risk_analysis': {...}}}
⏸️ 等待人工审批...
风险分析: {'risk_level': 'high', 'risk_factors': ['大额交易', '首次交易对手'], 'recommendation': '建议人工审核', 'confidence': 0.85}
模拟人工审批:批准
✅ 交易 TX-20260601001 已执行
金额: ¥150,000.00
账户A → 账户B
📋 交易处理完成
ID: TX-20260601001
结果: success
风险等级: high
人工决策: approve
七、总结与展望
7.1 四种模式总结
| 模式 | 核心能力 | 关键机制 | 典型场景 |
|---|---|---|---|
| 批准/拒绝 | 关键操作前人工决策 | interrupt + Command(resume) | 转账、删除、发布 |
| 查看/编辑 | 允许修改中间状态 | interrupt + 状态更新 | 邮件草稿、报告生成 |
| 工具中断 | 工具调用前插入干预 | 工具内部 interrupt | 发送消息、调用 API |
| 输入验证 | 循环获取并校验输入 | interrupt + 循环校验 | 表单填写、参数配置 |
7.2 Human-in-the-Loop 的价值
- 安全兜底:高风险操作有人类把关
- 质量保障:AI 生成内容由人工审核
- 责任清晰:人机协作的决策链条可追溯
- 用户体验:用户可以在流程中参与决策
7.3 未来展望
随着 AI Agent 能力的增强,Human-in-the-Loop 机制会更加重要:
- 自适应中断:AI 自动判断何时需要人工介入
- 多人协作:支持多级审批、会签等复杂流程
- 审计追踪:完整的人机交互记录,满足合规要求
- 智能提示:AI 分析风险后给出审批建议
一句话总结:interrupt() 保存现场,Command(resume=...) 恢复执行,Human-in-the-Loop 让 AI Agent 既高效又可靠。
相关阅读:
- LangGraph 状态管理:如何设计高效的状态流
- LangGraph 工具使用:让 AI 具备行动能力
- LangGraph 持久化完全指南:Checkpoint 机制深度解析
本文通过 LangGraph 的 Human-in-the-Loop 机制,展示了如何在 AI Agent 中实现人机协作。四种应用模式覆盖了绝大多数业务场景,从审批流程到输入验证,从工具调用到内容审核。这套机制让复杂任务既自动化又可靠,是生产级 AI Agent 的必备能力。