编程 LangGraph Human-in-the-Loop 深度实战:为 AI Agent 装上安全阀——从 interrupt 中断机制到四大应用模式的完全指南(2026)

2026-06-01 17:25:07 +0800 CST views 16

LangGraph Human-in-the-Loop 深度实战:为 AI Agent 装上"安全阀"——从 interrupt 中断机制到四大应用模式的完全指南(2026)

当 AI Agent 开始拥有调用支付接口、删除数据库、发送邮件的能力时,我们真的能放心让它"全自动驾驶"吗?LangGraph 的 Human-in-the-Loop 机制给出了答案:在关键节点暂停,让人类介入决策。这不是对 AI 的不信任,而是工程成熟度的体现。


一、引言:为什么 AI Agent 需要"安全阀"

1.1 一个真实场景的警示

想象这样一个场景:

你开发了一款面向金融领域的 AI Agent,它能自动分析交易异常并执行风控操作。某天,Agent 检测到一笔大额转账,判定为可疑交易,自动调用了风控 API 冻结了对方账户。

结果呢?那是一笔正常的 B2B 交易,冻结操作导致客户损失了数百万的商机。客户投诉、法务介入、品牌受损——一切都在 Agent 执行那个"冻结"操作的瞬间发生。

问题出在哪?

  • Agent 有决策能力,但缺乏人类判断的上下文
  • 自动化追求效率,但某些场景需要"慢一点"
  • 没有任何机制让人类在关键节点介入

1.2 Human-in-the-Loop 的核心价值

Human-in-the-Loop(HITL,人在环中)是一种 AI 系统设计模式,其核心思想是:

在 AI 自动处理大部分任务的同时,保留人类参与关键决策点的能力。

这不是"不信任 AI",而是承认:

  1. AI 有知识盲区:某些业务规则、合规要求、人情世故,AI 无法完全掌握
  2. 高风险操作需要兜底:支付、删除、发布等操作一旦执行就无法撤回
  3. 责任归属清晰:人机协作的决策链条,责任边界明确

1.3 LangGraph 的 HITL 方案

LangGraph 作为 LangChain 团队推出的 Agent 编排框架,提供了完善的人工干预机制:

  • interrupt():在任意节点暂停执行
  • Command(resume=...):恢复执行并传递人工输入
  • Command(goto=...):动态路由到不同分支
  • Checkpoint(检查点):持久化状态,支持断点续跑

这套机制让 Agent 在"全自动"和"人工接管"之间找到了平衡点。


二、核心概念:interrupt、Command 与 Checkpoint

2.1 interrupt —— 暂停执行的"开关"

interrupt 是 LangGraph 提供的函数,用于在图执行过程中暂停当前流程,等待外部输入。

from langgraph.types import interrupt

# 在工具或节点中调用
human_response = interrupt({
    "query": "请确认是否执行此操作",
    "status": "waiting_for_human_input"
})
# 执行会在这里暂停,等待外部恢复
print(human_response)  # 这行代码在恢复后才会执行

关键特性:

  • 调用后,图的执行立即暂停
  • 当前状态自动保存到 Checkpoint
  • 可以在暂停点注入外部数据后恢复

使用限制:

  • 必须在编译图时配置 checkpointer
  • 只能在节点函数或工具函数内部调用

2.2 Command —— 恢复执行的"遥控器"

Command 是 LangGraph 的类型,用于恢复被中断的执行流程,有两种模式:

from langgraph.types import Command

# 模式一:resume —— 恢复执行并传递数据
command = Command(resume={"data": "人工确认执行"})
graph.stream(command, config)

# 模式二:goto —— 跳转到指定节点
command = Command(goto="cancel_payment")
graph.stream(command, config)

resume vs goto:

模式作用典型场景
resume从中断点继续,传递数据给 interrupt() 的返回值审批通过后继续原流程
goto跳转到其他节点,覆盖预设的边根据审批结果走不同分支

2.3 Checkpoint —— 状态持久化的基石

要支持中断和恢复,必须配置检查点:

from langgraph.checkpoint.memory import MemorySaver

# 创建内存检查点
memory = MemorySaver()

# 编译图时传入 checkpointer
graph = graph_builder.compile(checkpointer=memory)

Checkpoint 保存的是"图的完整状态快照":

  • 所有 State 字段的当前值
  • 当前执行到哪个节点
  • 下一步要执行的节点(next)
  • 父检查点 ID(形成版本链)
  • 时间戳和元数据

为什么必须配置 Checkpoint?

interrupt() 被调用时,图需要将当前状态序列化保存。没有 Checkpointer,就无法恢复执行。

生产环境推荐的 Checkpointer:

from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import ConnectionPool

# PostgreSQL 持久化(生产推荐)
pool = ConnectionPool(conninfo="postgresql://user:pass@localhost/langgraph")
checkpointer = PostgresSaver(pool)

graph = graph_builder.compile(checkpointer=checkpointer)

三、四种典型应用模式

3.1 模式一:批准或拒绝(Approve / Reject)

典型场景:

  • 财务 Agent 检测可疑交易,需人工确认是否放行
  • 运维 Agent 准备删除生产数据,需人工审批
  • 内容发布 Agent 准备推送公告,需人工审核

实现思路:

  1. 在关键操作前插入审批节点
  2. 调用 interrupt() 展示操作信息
  3. 根据人工输入,路由到"执行"或"取消"分支

完整代码示例:

from typing import TypedDict, Literal, Optional
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver

# 定义状态
class ApprovalState(TypedDict):
    transaction_id: str
    amount: float
    recipient: str
    status: Optional[Literal["pending", "approved", "rejected"]]
    reason: Optional[str]

# 审批节点
def approval_node(state: ApprovalState):
    """请求人工审批"""
    decision = interrupt({
        "question": "是否批准以下交易?",
        "transaction": {
            "id": state["transaction_id"],
            "amount": state["amount"],
            "recipient": state["recipient"]
        },
        "options": "请输入 '批准' 或 '拒绝',拒绝时可附上原因"
    })
    
    # 根据人工输入决定路由
    if decision.get("action") == "批准":
        return Command(goto="execute_payment")
    else:
        return {
            "status": "rejected",
            "reason": decision.get("reason", "人工拒绝")
        }

# 执行支付节点
def execute_payment(state: ApprovalState):
    """实际执行支付"""
    print(f"✅ 交易 {state['transaction_id']} 已执行")
    print(f"   金额: {state['amount']} 元")
    print(f"   收款方: {state['recipient']}")
    return {"status": "approved"}

# 取消支付节点
def cancel_payment(state: ApprovalState):
    """取消交易"""
    print(f"❌ 交易 {state['transaction_id']} 已取消")
    print(f"   原因: {state.get('reason', '人工拒绝')}")
    return {"status": "rejected"}

# 构建图
builder = StateGraph(ApprovalState)
builder.add_node("approval", approval_node)
builder.add_node("execute_payment", execute_payment)
builder.add_node("cancel_payment", cancel_payment)

# 添加边
builder.add_edge(START, "approval")
builder.add_edge("approval", "cancel_payment")  # 默认走取消(被 reject 时)
builder.add_edge("execute_payment", END)
builder.add_edge("cancel_payment", END)

# 编译
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# 执行 —— 会在 approval 节点暂停
config = {"configurable": {"thread_id": "tx-001"}}
initial_state = {
    "transaction_id": "TX-20260601001",
    "amount": 50000.0,
    "recipient": "张三"
}

for event in graph.stream(initial_state, config):
    print(event)

# 此时图已暂停,等待人工输入
# 人工输入"批准"后恢复
from langgraph.types import Command

# 方式一:批准
graph.invoke(Command(resume={"action": "批准"}), config=config)

# 方式二:拒绝并附原因
# graph.invoke(Command(resume={"action": "拒绝", "reason": "收款方信息存疑"}), config=config)

运行效果:

✅ 交易 TX-20260601001 已执行
   金额: 50000.0 元
   收款方: 张三

3.2 模式二:查看和编辑状态(Review & Edit State)

典型场景:

  • AI 生成营销邮件草稿,人工修改后确认发送
  • AI 撰写报告,人工润色后发布
  • AI 生成代码,人工审查后合并

实现思路:

  1. AI 先生成内容
  2. 在审查节点展示内容,允许人工修改
  3. 修改后的状态覆盖原值,继续后续流程

完整代码示例:

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI

class DraftState(TypedDict):
    subject: str
    body: str
    recipient: str
    edited: bool

def generate_draft(state: DraftState):
    """AI 生成邮件草稿"""
    # 模拟 LLM 生成
    llm = ChatOpenAI(model="gpt-4")
    
    return {
        "subject": "限时优惠:全场商品8折",
        "body": """亲爱的用户:

本周末全场商品8折优惠,快来抢购吧!

活动时间:6月1日-6月2日

祝好,
营销团队""",
        "edited": False
    }

def review_draft(state: DraftState):
    """人工审查并修改"""
    updated = interrupt({
        "instruction": "请审查以下邮件草稿,可修改后确认发送",
        "current_draft": {
            "subject": state["subject"],
            "body": state["body"],
            "recipient": state["recipient"]
        },
        "tips": "返回格式:{\"subject\": \"...\", \"body\": \"...\"},或直接返回 '确认' 保持原内容"
    })
    
    # 处理人工输入
    if isinstance(updated, dict):
        return {
            "subject": updated.get("subject", state["subject"]),
            "body": updated.get("body", state["body"]),
            "edited": True
        }
    else:
        # 用户确认,保持原内容
        return {"edited": False}

def send_email(state: DraftState):
    """发送邮件"""
    print("=" * 50)
    print("📧 邮件已发送")
    print(f"   收件人: {state['recipient']}")
    print(f"   主题: {state['subject']}")
    print(f"   是否经过编辑: {'是' if state['edited'] else '否'}")
    print("=" * 50)
    return state

# 构建图
builder = StateGraph(DraftState)
builder.add_node("generate", generate_draft)
builder.add_node("review", review_draft)
builder.add_node("send", send_email)

builder.add_edge(START, "generate")
builder.add_edge("generate", "review")
builder.add_edge("review", "send")
builder.add_edge("send", END)

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# 执行
config = {"configurable": {"thread_id": "email-001"}}
initial = {"recipient": "customer@example.com"}

for event in graph.stream(initial, config):
    pass

# 此时暂停在 review 节点,人工可以修改内容
edited_content = {
    "subject": "【紧急】全场8折仅剩2天!错过再等一年",
    "body": """亲爱的用户:

⏰ 折扣倒计时!全场商品8折优惠即将结束!

活动时间:6月1日-6月2日(仅剩2天!)
优惠码:SUMMER2026

👉 立即抢购:https://shop.example.com

祝好,
营销团队"""
}

graph.invoke(Command(resume=edited_content), config=config)

运行效果:

==================================================
📧 邮件已发送
   收件人: customer@example.com
   主题: 【紧急】全场8折仅剩2天!错过再等一年
   是否经过编辑: 是
==================================================

3.3 模式三:工具中断(Tool Interrupts)

典型场景:

  • Agent 调用"发送邮件"工具前,需人工确认收件人
  • Agent 调用"删除文件"工具前,需人工确认文件列表
  • Agent 调用"支付"工具前,需人工确认金额

实现思路:

直接在工具函数内部调用 interrupt(),当 Agent 执行到该工具时自动暂停。

完整代码示例:

from langchain_core.tools import tool
from langgraph.types import interrupt

@tool
def send_email_tool(to: str, subject: str, body: str) -> str:
    """发送邮件给指定收件人
    
    Args:
        to: 收件人邮箱
        subject: 邮件主题
        body: 邮件正文
    """
    # 在真正发送前,请求人工确认
    confirmation = interrupt({
        "action": "即将发送邮件",
        "details": {
            "to": to,
            "subject": subject,
            "body_preview": body[:100] + ("..." if len(body) > 100 else ""),
            "body_length": len(body)
        },
        "instruction": "请确认发送,或返回修改后的参数",
        "options": [
            "输入 '发送' 确认发送",
            "或返回 JSON 格式修改参数:{\"to\": \"...\", \"subject\": \"...\", \"body\": \"...\"}"
        ]
    })
    
    # 处理人工输入
    if isinstance(confirmation, dict):
        to = confirmation.get("to", to)
        subject = confirmation.get("subject", subject)
        body = confirmation.get("body", body)
    elif confirmation == "发送":
        pass  # 使用原参数
    else:
        return "用户取消了邮件发送"
    
    # 实际发送逻辑(这里模拟)
    print(f"📧 邮件已发送至 {to}")
    print(f"   主题: {subject}")
    
    return f"邮件已成功发送至 {to}"

@tool
def delete_files_tool(paths: list[str], force: bool = False) -> str:
    """删除指定文件
    
    Args:
        paths: 文件路径列表
        force: 是否强制删除(不进回收站)
    """
    # 危险操作,必须人工确认
    confirmation = interrupt({
        "action": "⚠️ 即将删除文件",
        "warning": "此操作不可恢复!" if force else "文件将移入回收站",
        "details": {
            "files": paths,
            "count": len(paths),
            "force": force
        },
        "instruction": "请确认删除操作"
    })
    
    if confirmation == "确认删除":
        # 模拟删除
        for path in paths:
            print(f"🗑️ 已删除: {path}")
        return f"已删除 {len(paths)} 个文件"
    else:
        return "用户取消了删除操作"

# 完整的 Agent 示例
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver

class State(TypedDict):
    messages: Annotated[list, add_messages]

def create_agent():
    llm = ChatOpenAI(model="gpt-4", temperature=0)
    tools = [send_email_tool, delete_files_tool]
    llm_with_tools = llm.bind_tools(tools)
    
    def chatbot(state: State):
        message = llm_with_tools.invoke(state["messages"])
        # 禁用并行工具调用,避免恢复时重复执行
        assert len(message.tool_calls) <= 1
        return {"messages": [message]}
    
    builder = StateGraph(State)
    builder.add_node("chatbot", chatbot)
    builder.add_node("tools", ToolNode(tools=tools))
    
    builder.add_edge(START, "chatbot")
    builder.add_conditional_edges("chatbot", tools_condition)
    builder.add_edge("tools", "chatbot")
    
    memory = MemorySaver()
    return builder.compile(checkpointer=memory)

# 使用示例
agent = create_agent()
config = {"configurable": {"thread_id": "agent-001"}}

# 用户请求
agent.invoke({"messages": [{"role": "user", "content": "帮我发送一封邮件给 boss@company.com,主题是周报,内容是本周完成了所有任务"}]}, config)

# 图会在 send_email_tool 内部暂停
# 人工确认后恢复
agent.invoke(Command(resume="发送"), config)

3.4 模式四:验证人类输入(Validating Human Input)

典型场景:

  • 收集用户年龄,要求必须是正整数
  • 收集用户邮箱,要求格式正确
  • 收集用户手机号,要求符合规范

实现思路:

在一个循环内反复调用 interrupt(),每次检查输入是否合法,不合法则重新提示。

完整代码示例:

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver
import re

class FormState(TypedDict):
    name: str
    age: int | None
    email: str
    phone: str
    validation_errors: list[str]

def collect_name(state: FormState):
    """收集姓名"""
    name = interrupt({
        "field": "name",
        "prompt": "请输入您的姓名"
    })
    return {"name": name}

def collect_age(state: FormState):
    """收集年龄(带验证)"""
    prompt = "请输入您的年龄(必须是正整数)"
    retries = 0
    max_retries = 3
    
    while retries < max_retries:
        answer = interrupt({
            "field": "age",
            "prompt": prompt
        })
        
        # 验证输入
        try:
            age = int(answer)
            if age > 0 and age < 150:
                return {"age": age}
            else:
                prompt = f"年龄 {age} 不在有效范围内(1-149),请重新输入"
        except (ValueError, TypeError):
            prompt = f"'{answer}' 不是有效的数字,请输入正整数"
        
        retries += 1
    
    # 超过重试次数,设置默认值
    return {"age": 18, "validation_errors": ["年龄输入无效,已设置默认值 18"]}

def collect_email(state: FormState):
    """收集邮箱(带格式验证)"""
    prompt = "请输入您的邮箱地址"
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    
    while True:
        answer = interrupt({
            "field": "email",
            "prompt": prompt
        })
        
        if re.match(email_pattern, answer):
            return {"email": answer}
        else:
            prompt = f"'{answer}' 不是有效的邮箱格式,请重新输入"

def collect_phone(state: FormState):
    """收集手机号(中国大陆格式)"""
    prompt = "请输入您的手机号(11位数字)"
    phone_pattern = r'^1[3-9]\d{9}$'
    
    while True:
        answer = interrupt({
            "field": "phone",
            "prompt": prompt
        })
        
        # 移除可能的前缀和空格
        phone = answer.replace(" ", "").replace("-", "")
        
        if re.match(phone_pattern, phone):
            return {"phone": phone}
        else:
            prompt = f"'{answer}' 不是有效的手机号格式,请输入11位手机号"

def summarize(state: FormState):
    """汇总信息"""
    print("\n" + "=" * 50)
    print("📋 表单收集完成")
    print("=" * 50)
    print(f"姓名: {state['name']}")
    print(f"年龄: {state['age']}")
    print(f"邮箱: {state['email']}")
    print(f"手机: {state['phone']}")
    if state.get("validation_errors"):
        print("\n⚠️ 验证警告:")
        for err in state["validation_errors"]:
            print(f"   - {err}")
    print("=" * 50)
    return state

# 构建图
builder = StateGraph(FormState)
builder.add_node("collect_name", collect_name)
builder.add_node("collect_age", collect_age)
builder.add_node("collect_email", collect_email)
builder.add_node("collect_phone", collect_phone)
builder.add_node("summarize", summarize)

builder.add_edge(START, "collect_name")
builder.add_edge("collect_name", "collect_age")
builder.add_edge("collect_age", "collect_email")
builder.add_edge("collect_email", "collect_phone")
builder.add_edge("collect_phone", "summarize")
builder.add_edge("summarize", END)

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# 执行流程(需要多次恢复)
config = {"configurable": {"thread_id": "form-001"}}

# 启动
graph.invoke({}, config)

# 用户输入姓名
graph.invoke(Command(resume="张三"), config)

# 用户输入年龄(故意输入错误值测试)
graph.invoke(Command(resume="abc"), config)  # 第一次错误
graph.invoke(Command(resume="-10"), config)  # 第二次错误
graph.invoke(Command(resume="25"), config)   # 正确

# 用户输入邮箱
graph.invoke(Command(resume="invalid-email"), config)  # 错误格式
graph.invoke(Command(resume="zhangsan@example.com"), config)  # 正确

# 用户输入手机号
graph.invoke(Command(resume="12345"), config)  # 错误
graph.invoke(Command(resume="13812345678"), config)  # 正确

四、Checkpoint 机制深度解析

4.1 Checkpoint 保存的是什么

很多人误以为 Checkpoint 只是"保存对话历史",这是第一个认知偏差。

Checkpoint 保存的是 Graph 在某一执行步骤的完整状态快照,包括:

  1. 所有 Channel 的当前值:State 里每个字段的值
  2. 当前执行到哪个节点:next 字段
  3. 父检查点 ID:形成版本链
  4. 时间戳和元数据:写入时间、来源等
  5. 待执行任务:tasks 字段
# 获取当前状态的快照
snapshot = graph.get_state(config)

print(snapshot.values)     # 当前 State 的所有字段值
print(snapshot.next)       # 下一步要执行的节点
print(snapshot.config)     # thread_id 和 checkpoint_id
print(snapshot.metadata)   # 元数据

4.2 Super-Step 执行模型

LangGraph 采用"超步"(Super-Step)执行模型:

[读取上一个 Checkpoint]
        ↓
[执行当前节点,更新 State]
        ↓
[写入新 Checkpoint(快照)]
        ↓
[决定下一步:继续 / 等待 / 结束]

每个节点执行完成后,都会自动保存一个 Checkpoint。

4.3 时间旅行:回到历史状态

由于 Checkpoint 形成了版本链,我们可以"穿越"到任意历史状态:

# 获取所有历史状态
history = list(graph.get_state_history(config))

# 回到某个历史状态
past_state = history[-3]  # 往前数第3个状态
graph.invoke(None, config={**config, "checkpoint_id": past_state.config["configurable"]["checkpoint_id"]})

4.4 生产环境的 Checkpointer 选择

Checkpointer适用场景优点缺点
MemorySaver开发测试零配置、速度快重启丢失、不支持多实例
SqliteSaver单机生产持久化、简单单文件锁、不支持高并发
PostgresSaver生产环境高并发、事务支持需要数据库、配置复杂
RedisSaver高性能场景极快速度内存成本高、需要 Redis

PostgresSaver 生产配置示例:

from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import ConnectionPool

# 连接池配置
pool = ConnectionPool(
    conninfo="postgresql://langgraph:password@localhost:5432/langgraph_db",
    min_size=5,
    max_size=20
)

# 创建检查点存储
checkpointer = PostgresSaver(pool)

# 首次使用需要初始化表
# checkpointer.setup()

graph = builder.compile(checkpointer=checkpointer)

五、最佳实践与踩坑指南

5.1 必须配置 Checkpointer

错误:

graph = builder.compile()  # 没有 checkpointer
# 调用 interrupt() 时会报错

正确:

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

5.2 interrupt() 只能在节点或工具中调用

错误:

def some_helper_function():
    return interrupt("等待输入")  # 不是节点或工具

def my_node(state):
    some_helper_function()  # 会报错

正确:

def my_node(state):
    return interrupt("等待输入")  # 直接在节点中调用

5.3 禁用并行工具调用

当使用人工干预时,需要禁用并行工具调用,避免恢复时重复执行:

def chatbot(state: State):
    message = llm_with_tools.invoke(state["messages"])
    # 确保最多只有一个工具调用
    assert len(message.tool_calls) <= 1, "请禁用并行工具调用"
    return {"messages": [message]}

5.4 恢复时使用相同的 thread_id

# 暂停时的配置
config = {"configurable": {"thread_id": "session-001"}}

# 恢复时必须使用相同的配置
graph.invoke(Command(resume="确认"), config=config)  # ✅ 正确

# 错误的恢复方式
graph.invoke(Command(resume="确认"), config={"configurable": {"thread_id": "session-002"}})  # ❌ 错误

5.5 检查中断状态的时机

错误:

for event in graph.stream(input, config):
    pass
snapshot = graph.get_state(config)
if snapshot.next:
    print("已中断")  # 可能在 stream 完成前就检查了

正确:

for event in graph.stream(input, config):
    # 处理事件
    pass
# stream 完成后再检查
snapshot = graph.get_state(config)
if snapshot.next:
    print(f"已中断,等待节点: {snapshot.next}")

5.6 设计合理的中断提示

# ❌ 不好的设计:信息不足
interrupt("确认吗?")

# ✅ 好的设计:信息完整、选项清晰
interrupt({
    "action": "即将执行删除操作",
    "target": f"文件列表:{files}",
    "warning": "此操作不可恢复!",
    "options": ["输入 '确认删除' 继续", "输入 '取消' 放弃"]
})

5.7 设置超时机制

避免用户无限等待:

from datetime import datetime, timedelta

def approval_with_timeout(state):
    start_time = datetime.now()
    timeout = timedelta(hours=24)  # 24小时超时
    
    while datetime.now() - start_time < timeout:
        decision = interrupt({
            "action": "等待审批",
            "timeout": str(timeout - (datetime.now() - start_time))
        })
        
        if decision:
            return {"approved": decision == "approve"}
    
    # 超时后默认拒绝
    return {"approved": False, "timeout": True}

六、完整实战案例:金融风控 Agent

6.1 场景描述

构建一个金融风控 Agent,当检测到大额交易时:

  1. 自动分析交易风险
  2. 高风险交易请求人工审批
  3. 审批通过后执行,否则拒绝

6.2 完整代码

"""
金融风控 Agent - Human-in-the-Loop 实战案例
"""
from typing import TypedDict, Literal, Optional
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
import json

# ============== 状态定义 ==============
class RiskAnalysis(BaseModel):
    risk_level: Literal["low", "medium", "high", "critical"]
    risk_factors: list[str]
    recommendation: str
    confidence: float

class TransactionState(TypedDict):
    transaction_id: str
    amount: float
    sender: str
    receiver: str
    timestamp: str
    risk_analysis: Optional[RiskAnalysis]
    human_decision: Optional[Literal["approve", "reject"]]
    execution_result: Optional[str]

# ============== 节点定义 ==============
def analyze_risk(state: TransactionState):
    """AI 分析交易风险"""
    llm = ChatOpenAI(model="gpt-4", temperature=0)
    
    # 模拟风险分析(实际应用中调用风控模型)
    prompt = f"""
    分析以下交易的风险等级:
    
    交易ID: {state['transaction_id']}
    金额: {state['amount']}
    发送方: {state['sender']}
    接收方: {state['receiver']}
    时间: {state['timestamp']}
    
    请返回 JSON 格式:
    {{
        "risk_level": "low/medium/high/critical",
        "risk_factors": ["因子1", "因子2"],
        "recommendation": "建议",
        "confidence": 0.0-1.0
    }}
    """
    
    # 这里简化处理,实际应该调用 LLM
    if state['amount'] > 100000:
        risk = RiskAnalysis(
            risk_level="high",
            risk_factors=["大额交易", "首次交易对手"],
            recommendation="建议人工审核",
            confidence=0.85
        )
    elif state['amount'] > 50000:
        risk = RiskAnalysis(
            risk_level="medium",
            risk_factors=["金额较大"],
            recommendation="可自动放行或人工确认",
            confidence=0.75
        )
    else:
        risk = RiskAnalysis(
            risk_level="low",
            risk_factors=[],
            recommendation="自动放行",
            confidence=0.95
        )
    
    return {"risk_analysis": risk.model_dump()}

def route_by_risk(state: TransactionState):
    """根据风险等级路由"""
    risk = state["risk_analysis"]
    
    if risk["risk_level"] in ["high", "critical"]:
        return "request_approval"
    elif risk["risk_level"] == "medium":
        return "request_approval"  # 中等风险也请求人工确认
    else:
        return "auto_approve"

def request_approval(state: TransactionState):
    """请求人工审批"""
    risk = state["risk_analysis"]
    
    decision = interrupt({
        "type": "approval_request",
        "transaction": {
            "id": state["transaction_id"],
            "amount": state["amount"],
            "sender": state["sender"],
            "receiver": state["receiver"]
        },
        "risk_analysis": risk,
        "instruction": "请审核以上交易",
        "options": ["approve - 批准交易", "reject - 拒绝交易"]
    })
    
    return {"human_decision": decision}

def auto_approve(state: TransactionState):
    """自动批准(低风险)"""
    return {"human_decision": "approve"}

def execute_transaction(state: TransactionState):
    """执行交易"""
    print(f"\n✅ 交易 {state['transaction_id']} 已执行")
    print(f"   金额: ¥{state['amount']:,.2f}")
    print(f"   {state['sender']} → {state['receiver']}")
    return {"execution_result": "success"}

def reject_transaction(state: TransactionState):
    """拒绝交易"""
    print(f"\n❌ 交易 {state['transaction_id']} 已拒绝")
    print(f"   原因: 人工审核未通过")
    return {"execution_result": "rejected"}

def log_result(state: TransactionState):
    """记录日志"""
    print(f"\n📋 交易处理完成")
    print(f"   ID: {state['transaction_id']}")
    print(f"   结果: {state['execution_result']}")
    print(f"   风险等级: {state['risk_analysis']['risk_level']}")
    if state.get('human_decision'):
        print(f"   人工决策: {state['human_decision']}")
    return state

# ============== 构建图 ==============
builder = StateGraph(TransactionState)

# 添加节点
builder.add_node("analyze_risk", analyze_risk)
builder.add_node("request_approval", request_approval)
builder.add_node("auto_approve", auto_approve)
builder.add_node("execute", execute_transaction)
builder.add_node("reject", reject_transaction)
builder.add_node("log", log_result)

# 添加边
builder.add_edge(START, "analyze_risk")
builder.add_conditional_edges("analyze_risk", route_by_risk)
builder.add_edge("request_approval", "execute")  # 需要动态路由
builder.add_edge("auto_approve", "execute")
builder.add_edge("execute", "log")
builder.add_edge("reject", "log")
builder.add_edge("log", END)

# ============== 动态路由逻辑 ==============
def approval_router(state: TransactionState):
    """审批结果路由"""
    if state["human_decision"] == "approve":
        return "execute"
    else:
        return "reject"

# 修改 request_approval 节点的输出
def request_approval_with_route(state: TransactionState):
    decision = interrupt({...})
    if decision == "approve":
        return Command(goto="execute")
    else:
        return Command(goto="reject")

# ============== 编译与运行 ==============
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# 测试用例
def test_transaction():
    config = {"configurable": {"thread_id": "tx-test-001"}}
    
    # 模拟一笔大额交易
    transaction = {
        "transaction_id": "TX-20260601001",
        "amount": 150000.0,
        "sender": "账户A",
        "receiver": "账户B",
        "timestamp": "2026-06-01T10:00:00Z"
    }
    
    # 执行分析
    for event in graph.stream(transaction, config):
        print(f"节点输出: {event}")
    
    # 此时图在 request_approval 节点暂停
    snapshot = graph.get_state(config)
    print(f"\n⏸️ 等待人工审批...")
    print(f"风险分析: {snapshot.values['risk_analysis']}")
    
    # 模拟人工审批
    print("\n模拟人工审批:批准")
    graph.invoke(Command(resume="approve"), config)

if __name__ == "__main__":
    test_transaction()

6.3 运行效果

节点输出: {'analyze_risk': {'risk_analysis': {...}}}

⏸️ 等待人工审批...
风险分析: {'risk_level': 'high', 'risk_factors': ['大额交易', '首次交易对手'], 'recommendation': '建议人工审核', 'confidence': 0.85}

模拟人工审批:批准

✅ 交易 TX-20260601001 已执行
   金额: ¥150,000.00
   账户A → 账户B

📋 交易处理完成
   ID: TX-20260601001
   结果: success
   风险等级: high
   人工决策: approve

七、总结与展望

7.1 四种模式总结

模式核心能力关键机制典型场景
批准/拒绝关键操作前人工决策interrupt + Command(resume)转账、删除、发布
查看/编辑允许修改中间状态interrupt + 状态更新邮件草稿、报告生成
工具中断工具调用前插入干预工具内部 interrupt发送消息、调用 API
输入验证循环获取并校验输入interrupt + 循环校验表单填写、参数配置

7.2 Human-in-the-Loop 的价值

  1. 安全兜底:高风险操作有人类把关
  2. 质量保障:AI 生成内容由人工审核
  3. 责任清晰:人机协作的决策链条可追溯
  4. 用户体验:用户可以在流程中参与决策

7.3 未来展望

随着 AI Agent 能力的增强,Human-in-the-Loop 机制会更加重要:

  • 自适应中断:AI 自动判断何时需要人工介入
  • 多人协作:支持多级审批、会签等复杂流程
  • 审计追踪:完整的人机交互记录,满足合规要求
  • 智能提示:AI 分析风险后给出审批建议

一句话总结:interrupt() 保存现场,Command(resume=...) 恢复执行,Human-in-the-Loop 让 AI Agent 既高效又可靠。


相关阅读:

  • LangGraph 状态管理:如何设计高效的状态流
  • LangGraph 工具使用:让 AI 具备行动能力
  • LangGraph 持久化完全指南:Checkpoint 机制深度解析

本文通过 LangGraph 的 Human-in-the-Loop 机制,展示了如何在 AI Agent 中实现人机协作。四种应用模式覆盖了绝大多数业务场景,从审批流程到输入验证,从工具调用到内容审核。这套机制让复杂任务既自动化又可靠,是生产级 AI Agent 的必备能力。

推荐文章

如何在Rust中使用UUID?
2024-11-19 06:10:59 +0800 CST
CSS实现亚克力和磨砂玻璃效果
2024-11-18 01:21:20 +0800 CST
使用Python提取图片中的GPS信息
2024-11-18 13:46:22 +0800 CST
Vue3中的v-bind指令有什么新特性?
2024-11-18 14:58:47 +0800 CST
Vue3中如何实现插件?
2024-11-18 04:27:04 +0800 CST
程序员出海搞钱工具库
2024-11-18 22:16:19 +0800 CST
程序员茄子在线接单