编程 Google LangExtract 深度实战:从非结构化文本到结构化知识的工程化完全指南(2026)

2026-05-24 13:30:29 +0800 CST views 15

Google LangExtract 深度实战:从非结构化文本到结构化知识的工程化完全指南(2026)

关键词:LangExtract、LLM 结构化提取、Schema 驱动、Source Grounding、RAG、Google、Python

目录

  1. 背景与痛点:非结构化文本的「提取困境」
  2. LangExtract 是什么?核心设计哲学
  3. 核心概念深度解析
    • 3.1 Schema 驱动抽取引擎
    • 3.2 Source Grounding(精确溯源)
    • 3.3 交互式可视化
    • 3.4 模型路由与成本优化
  4. 架构分析:LangExtract 内部工作原理
    • 4.1 整体架构图
    • 4.2 Prompt 工程层
    • 4.3 模型调用层
    • 4.4 输出解析与验证层
    • 4.5 可视化渲染层
  5. 安装与配置完全指南
  6. 代码实战:从入门到生产级应用
    • 6.1 快速入门:从客服对话中提取投诉原因
    • 6.2 医疗场景:从电子病历中提取关键实体
    • 6.3 法律场景:合同关键条款提取
    • 6.4 批量处理与异步调用
    • 6.5 与 Milvus 集成:构建语义搜索系统
  7. 高级技巧与性能优化
    • 7.1 模型选择策略(Gemini Flash vs Pro)
    • 7.2 Prompt 优化技巧
    • 7.3 批量处理与速率限制
    • 7.4 成本优化实战
    • 7.5 错误处理与重试机制
  8. 生产级部署方案
    • 8.1 容器化部署
    • 8.2 与现有系统集成
    • 8.3 监控与可观测性
  9. LangExtract vs 其他方案对比
  10. 总结与展望
  11. 参考资料

1. 背景与痛点:非结构化文本的「提取困境」

在当今的数字化时代,全球每天产生的文本数据是天文数字——邮件、客服对话、医疗记录、法律合同、社交媒体内容……这些数据中蕴含着巨大的价值,但有一个共同特点:它们是非结构化的

1.1 传统方法的天花板

传统的信息提取方法主要依赖:

基于规则的方法(正则表达式、关键词匹配):

# 传统方法:从客服对话中提取投诉原因
import re

def extract_complaint(text):
    if "不满意" in text or "投诉" in text:
        return "客户投诉"
    return "未知"

# 问题:无法处理语义变体
# "你们这服务真的垃圾" → 无法识别
# "非常失望,下次不来了" → 无法识别

基于机器学习的方法(NER、文本分类):

  • 需要大量标注数据
  • 泛化能力差,换领域就要重新训练
  • 无法处理复杂语义关系

基于 LLM 的直接调用(Naive Approach):

# 直接调用 LLM 提取信息
import google.generativeai as genai

response = genai.generate_text(
    prompt=f"从以下文本中提取产品缺陷:{text}",
    model="gemini-pro"
)
# 问题:输出不稳定、无法溯源、难以验证

1.2 核心痛点

痛点描述影响
输出不稳定同样输入,多次调用输出可能不同生产环境不可靠
无法溯源不知道提取结果来自原文哪一段无法验证、无法审计
格式不统一输出格式不一致,难以程序化处理需要大量后处理
成本不可控每次都调用大模型,成本高昂大规模应用受限
缺乏可视化提取结果难以直观展示和验证调试困难

Google LangExtract 正是为了解决这些痛点而生。


2. LangExtract 是什么?核心设计哲学

LangExtract 是 Google 开源的一个 Python 库,利用大语言模型(LLM)从非结构化文本中精确提取结构化信息,并附带精确的源文本溯源(Source Grounding)和交互式可视化

2.1 核心特性

✅ Schema 驱动 —— 用 Pydantic Model 定义提取结构
✅ 精确溯源 —— 每个提取结果都标注原文出处(字符级定位)
✅ 交互式可视化 —— 生成 HTML 报告,点击结果高亮原文
✅ 模型无关 —— 支持 Gemini 系列,可扩展其他模型
✅ 批量处理 —— 支持大规模文档处理
✅ 成本优化 —— 智能模型路由,平衡质量与成本

2.2 设计哲学

LangExtract 的设计哲学可以归纳为三点:

1. 声明式提取(Declarative Extraction)

传统方法需要写大量的提取逻辑,而 LangExtract 采用声明式范式——你只需要定义你想提取什么(Schema),而不需要关心怎么提取

from pydantic import BaseModel

class ComplaintInfo(BaseModel):
    reason: str          # 投诉原因
    product: str | None  # 涉及产品
    sentiment: str       # 情感倾向:positive/negative/neutral
    severity: int        # 严重程度:1-5

# 只需要定义 Schema,LangExtract 自动处理提取逻辑

2. 可验证性优先(Verifiability First)

LangExtract 的每个输出都附带精确的原文引用——你可以直接点击查看模型是基于哪一段文本得出的结论。这极大地提升了系统的可审计性可信度

3. 工程化思维(Engineering-Grade Design)

LangExtract 不是玩具项目,而是按照生产级标准设计的:

  • 完整的错误处理
  • 速率限制和重试机制
  • 详细的日志和调试信息
  • 支持异步批量处理

3. 核心概念深度解析

3.1 Schema 驱动抽取引擎

Schema 是 LangExtract 的核心——它定义了你想从文本中提取什么信息

3.1.1 基础 Schema 定义

from pydantic import BaseModel, Field
from typing import Optional, List

class ProductReview(BaseModel):
    """从产品评论中提取结构化信息"""
    
    product_name: str = Field(
        description="产品名称,如 'iPhone 15 Pro'"
    )
    
    rating: Optional[int] = Field(
        default=None,
        description="用户评分,1-5 之间的整数"
    )
    
    pros: List[str] = Field(
        default_factory=list,
        description="用户提到的优点列表"
    )
    
    cons: List[str] = Field(
        default_factory=list,
        description="用户提到的缺点列表"
    )
    
    sentiment: str = Field(
        description="总体情感:positive / negative / neutral"
    )
    
    would_recommend: bool = Field(
        description="是否愿意推荐给朋友"
    )

3.1.2 嵌套 Schema(复杂结构提取)

class Feature(BaseModel):
    name: str = Field(description="功能名称")
    rating: int = Field(description="该功能评分 1-5")
    comment: Optional[str] = Field(default=None)

class DetailedReview(BaseModel):
    product: str
    overall_rating: int
    features: List[Feature]  # 嵌套结构
    purchase_date: Optional[str] = Field(
        default=None,
        description="购买日期,格式 YYYY-MM-DD"
    )
    verified_purchase: bool

3.1.3 Schema 设计最佳实践

原则一:字段描述要精确

# ❌ 不好的描述
class BadSchema(BaseModel):
    info: str  # 太模糊

# ✅ 好的描述
class GoodSchema(BaseModel):
    payment_method: str = Field(
        description="""
        支付方式,必须是以下之一:
        - 'credit_card'(信用卡)
        - 'paypal'(PayPal)
        - 'bank_transfer'(银行转账)
        - 'cash'(现金)
        如果无法确定,返回 'unknown'
        """
    )

原则二:使用 Optional 处理缺失信息

# 真实文本中信息往往不完整
class RealWorldSchema(BaseModel):
    name: str                    # 必填
    email: Optional[str] = None  # 选填,可能不存在
    phone: Optional[str] = None  # 选填

原则三:用 description 引导模型

模型完全依赖 description 来理解每个字段的含义——描述越精确,提取质量越高。

3.2 Source Grounding(精确溯源)

Source Grounding 是 LangExtract 的杀手级特性——它不仅能提取信息,还能告诉你这个信息来自原文的哪个位置

3.2.1 Grounding 的工作原理

import langextract as lx

result = lx.extract(
    text_or_documents="用户表示对产品非常不满意,说是买了两周就坏了。",
    prompt_description="提取客户反馈信息",
    examples=[...],
    model_id="gemini-2.5-flash",
    grounding=True  # 启用溯源
)

# 输出结构
{
  "sentiment": "negative",
  "_grounding": {
    "sentiment": {
      "text": "非常不满意",
      "start_char": 6,
      "end_char": 11,
      "score": 0.98
    }
  }
}

3.2.2 Grounding 的生产价值

场景价值
审计合规每个提取结果都可追溯到原文,满足金融/医疗行业的合规要求
错误调试提取错误时,可以直接看到模型基于哪段文本得出结论
人机协作人工审核时,可以直接看到原文依据,快速判断正确性
模型评估可以量化分析模型在哪些类型的文本上表现好/差

3.3 交互式可视化

LangExtract 内置了强大的可视化功能,可以生成交互式 HTML 报告。

import langextract as lx

# 提取并生成可视化报告
result = lx.extract(
    text_or_documents=long_text,
    prompt_description="提取所有关键信息",
    examples=examples,
    model_id="gemini-2.5-pro",
    visualization=True  # 启用可视化
)

# 生成 HTML 报告
lx.visualize(
    result=result,
    output_path="report.html",
    highlight_color="#fff3cd"  # 自定义高亮颜色
)

可视化报告的功能:

  • 原文与提取结果并排显示
  • 点击提取结果,自动高亮原文对应位置
  • 支持按实体类型筛选显示
  • 可导出为独立 HTML 文件

3.4 模型路由与成本优化

LangExtract 支持多种 Gemini 模型,并提供了智能路由策略:

模型速度质量成本适用场景
gemini-2.5-flash⚡⚡⚡★★★$大规模批量处理、简单提取
gemini-2.5-pro⚡⚡★★★★★$$$复杂语义、高精度要求
gemini-1.5-flash⚡⚡⚡★★$超大规模、低精度要求
# 智能模型选择策略
def select_model(text_length, complexity):
    if text_length < 500 and complexity == "low":
        return "gemini-2.5-flash"
    elif text_length > 10000 or complexity == "high":
        return "gemini-2.5-pro"
    else:
        return "gemini-2.5-flash"  # 默认选择

4. 架构分析:LangExtract 内部工作原理

4.1 整体架构图

输入文本
   │
   ▼
┌─────────────────────────────────────┐
│     Schema 验证层                    │
│  (Pydantic Model 解析与验证)        │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│     Prompt 工程层                   │
│  • Schema → JSON Schema 转换        │
│  • Few-shot 示例注入                │
│  • Grounding 指令注入               │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│     模型调用层                       │
│  • 速率限制                         │
│  • 重试机制                         │
│  • 成本追踪                         │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│     输出解析与验证层                  │
│  • JSON 解析                        │
│  • Schema 验证                      │
│  • Grounding 解析                   │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│     可视化渲染层                     │
│  • HTML 生成                        │
│  • 交互式组件                       │
└─────────────────────────────────────┘
               │
               ▼
         输出结果 + 可视化报告

4.2 Prompt 工程层深度分析

LangExtract 的 Prompt 工程是其核心竞争优势之一。它自动将 Schema 转换为精心设计的 Prompt。

4.2.1 自动生成的 Prompt 结构

# LangExtract 内部生成的 Prompt(简化版)
prompt = f"""
你是一个专业的信息提取助手。

## 任务描述
{prompt_description}

## 提取 Schema
{json_schema}  # 从 Pydantic Model 自动生成

## 示例(Few-shot)
{examples}  # 用户提供的示例

## 输出要求
1. 严格按 JSON 格式输出,符合上述 Schema
2. 每个字段的提取结果必须附带 grounding 信息:
   {{
     "field_name": "提取结果",
     "_grounding": {{
       "field_name": {{
         "text": "原文片段",
         "start_char": 0,
         "end_char": 10,
         "confidence": 0.95
       }}
     }}
   }}
3. 如果某个字段在文本中不存在,返回 null

## 待提取文本
{text}
"""

4.2.2 Few-shot 示例的作用

examples = [
    {
        "text": "我买了个iPhone,用了一周就卡得不行,非常失望。",
        "output": {
            "product": "iPhone",
            "sentiment": "negative",
            "issue": "卡顿",
            "_grounding": {...}
        }
    },
    {
        "text": "MacBook Pro 太好用了,性能强悍,就是有点重。",
        "output": {
            "product": "MacBook Pro",
            "sentiment": "positive",
            "pros": ["性能强悍"],
            "cons": ["有点重"],
            "_grounding": {...}
        }
    }
]

# LangExtract 会将示例注入到 Prompt 中
# 这显著提升了提取质量和一致性

4.3 模型调用层

# 模型调用层的核心逻辑(概念性代码)
class ModelCaller:
    def __init__(self, model_id, rate_limiter, cost_tracker):
        self.model_id = model_id
        self.rate_limiter = rate_limiter
        self.cost_tracker = cost_tracker
    
    def call(self, prompt, max_retries=3):
        for attempt in range(max_retries):
            try:
                # 速率限制
                self.rate_limiter.acquire()
                
                # 调用模型
                response = genai.generate_text(
                    prompt=prompt,
                    model=self.model_id,
                    temperature=0.1  # 低温度确保一致性
                )
                
                # 成本追踪
                self.cost_tracker.track(
                    model=self.model_id,
                    input_tokens=count_tokens(prompt),
                    output_tokens=count_tokens(response)
                )
                
                return response
            
            except RateLimitError:
                wait_time = 2 ** attempt  # 指数退避
                time.sleep(wait_time)
            
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                time.sleep(1)

4.4 输出解析与验证层

# 输出解析的核心挑战:LLM 输出可能不严格符合 JSON 格式
import json
import re

def parse_llm_output(raw_output, schema):
    # Step 1: 尝试直接解析 JSON
    try:
        parsed = json.loads(raw_output)
        return validate_against_schema(parsed, schema)
    except json.JSONDecodeError:
        pass
    
    # Step 2: 使用正则提取 JSON 块
    json_match = re.search(r'```json\n(.*?)\n```', raw_output, re.DOTALL)
    if json_match:
        try:
            parsed = json.loads(json_match.group(1))
            return validate_against_schema(parsed, schema)
        except:
            pass
    
    # Step 3: 容错解析(跳过无法解析的字段)
    return fallback_parse(raw_output, schema)

4.5 可视化渲染层

可视化渲染层使用 Jinja2 模板生成交互式 HTML:

# 可视化渲染的核心逻辑
def render_visualization(result, output_path):
    # 1. 将提取结果转换为高亮区间
    highlights = []
    for field_name, grounding in result.grounding.items():
        highlights.append({
            "start": grounding.start_char,
            "end": grounding.end_char,
            "field": field_name,
            "color": get_color_for_field(field_name)
        })
    
    # 2. 渲染 HTML 模板
    html = template.render(
        original_text=result.original_text,
        extracted_data=result.data,
        highlights=highlights,
        grounding=result.grounding
    )
    
    # 3. 写入文件
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(html)

5. 安装与配置完全指南

5.1 环境要求

Python >= 3.10
Google Generative AI SDK
Pydantic >= 2.0

5.2 安装步骤

# 从 PyPI 安装(如果已发布)
pip install langextract

# 从源码安装(当前推荐方式)
git clone https://github.com/google/langextract.git
cd langextract
pip install -e .

5.3 配置 Google API Key

import google.generativeai as genai

# 方法一:环境变量
export GOOGLE_API_KEY="your-api-key-here"

# 方法二:代码中配置
genai.configure(api_key="your-api-key-here")

5.4 验证安装

import langextract as lx

# 简单测试
result = lx.extract(
    text_or_documents="这是一个测试。",
    prompt_description="测试提取",
    examples=[],
    model_id="gemini-2.5-flash"
)

print(result)

6. 代码实战:从入门到生产级应用

6.1 快速入门:从客服对话中提取投诉原因

场景描述

某电商平台的客服系统每天处理数万条对话,需要自动提取:

  • 投诉原因
  • 涉及产品
  • 情感倾向
  • 处理优先级

完整代码实现

import langextract as lx
from pydantic import BaseModel, Field
from typing import Optional, List

# ========== Step 1: 定义 Schema ==========
class CustomerComplaint(BaseModel):
    """客服对话投诉信息提取 Schema"""
    
    complaint_reason: str = Field(
        description="""
        客户投诉的具体原因,用一句话概括。
        例如:"手机电池续航时间短"、"物流配送延迟"
        """
    )
    
    product: Optional[str] = Field(
        default=None,
        description="涉及的产品名称,如果未提及则为 null"
    )
    
    sentiment: str = Field(
        description="""
        客户情感倾向,必须是以下之一:
        - 'angry'(愤怒)
        - 'dissatisfied'(不满意)
        - 'neutral'(中性)
        - 'satisfied'(满意)
        """
    )
    
    priority: int = Field(
        description="""
        处理优先级,根据投诉严重程度判断:
        - 1 = 低优先级(一般咨询)
        - 2 = 中优先级(普通投诉)
        - 3 = 高优先级(严重问题)
        - 4 = 紧急(需要立即处理)
        """
    )
    
    requested_solution: Optional[str] = Field(
        default=None,
        description="客户要求的解决方案,如'退款'、'换货'等"
    )

# ========== Step 2: 准备 Few-shot 示例 ==========
examples = [
    {
        "text": "你们这个破手机才买了三天就黑屏了,我要退款!",
        "output": {
            "complaint_reason": "手机黑屏故障",
            "product": "手机",
            "sentiment": "angry",
            "priority": 4,
            "requested_solution": "退款"
        }
    },
    {
        "text": "物流太慢了,说我上周就能到,现在还没发货,不高兴",
        "output": {
            "complaint_reason": "物流配送延迟",
            "product": None,
            "sentiment": "dissatisfied",
            "priority": 2,
            "requested_solution": None
        }
    },
    {
        "text": "商品还不错,就是包装有点简陋,整体满意",
        "output": {
            "complaint_reason": "包装简陋",
            "product": None,
            "sentiment": "satisfied",
            "priority": 1,
            "requested_solution": None
        }
    }
]

# ========== Step 3: 执行提取 ==========
customer_text = """
客服你好,我上个月在你们这里买的 AirPods Pro 2,
 left 耳机昨天突然没声音了,重启了好几次都没用。
 我之前用的索尼耳机都没这问题,有点失望。
 能不能帮我换个新的?
"""

result = lx.extract(
    text_or_documents=customer_text,
    prompt_description="""
    从客服对话中提取投诉信息。
    注意:
    1. complaint_reason 要具体,不要只写'产品质量问题'
    2. priority 要根据问题的严重程度合理判断
    3. sentiment 要准确反映客户的情绪状态
    """,
    examples=examples,
    model_id="gemini-2.5-flash",  # 性价比高的选择
    grounding=True  # 启用溯源
)

# ========== Step 4: 解析结果 ==========
print("=== 提取结果 ===")
print(f"投诉原因: {result.data.complaint_reason}")
print(f"涉及产品: {result.data.product}")
print(f"情感倾向: {result.data.sentiment}")
print(f"优先级: {result.data.priority}")
print(f"要求方案: {result.data.requested_solution}")

print("\n=== Grounding 溯源信息 ===")
for field_name, grounding in result.grounding.items():
    print(f"[{field_name}]")
    print(f"  原文: {grounding.text}")
    print(f"  位置: {grounding.start_char}-{grounding.end_char}")
    print(f"  置信度: {grounding.confidence:.2f}")

# ========== Step 5: 生成可视化报告 ==========
lx.visualize(
    result=result,
    output_path="complaint_analysis.html",
    title="客服投诉信息提取报告"
)
print("\n可视化报告已生成: complaint_analysis.html")

输出示例

=== 提取结果 ===
投诉原因: AirPods Pro 2 左耳机突然无声音
涉及产品: AirPods Pro 2
情感倾向: dissatisfied
优先级: 3
要求方案: 换货

=== Grounding 溯源信息 ===
[complaint_reason]
  原文: left 耳机昨天突然没声音了
  位置: 45-60
  置信度: 0.97

[product]
  原文: AirPods Pro 2
  位置: 12-28
  置信度: 1.00
...

6.2 医疗场景:从电子病历中提取关键实体

场景描述

医疗机构需要从大量非结构化电子病历(EMR)中提取结构化信息,用于:

  • 疾病编码(ICD-10)
  • 药物不良反应监测
  • 临床决策支持

完整实现

from pydantic import BaseModel, Field
from typing import Optional, List
import langextract as lx
from datetime import date

class Medication(BaseModel):
    """药物信息"""
    name: str = Field(description="药物名称")
    dosage: str = Field(description="剂量,如 '500mg'、'10ml'")
    frequency: str = Field(description="用药频率,如 '每日两次'")
    duration: Optional[str] = Field(default=None, description="用药时长")

class LabTest(BaseModel):
    """实验室检查"""
    test_name: str = Field(description="检查项目名称")
    result: str = Field(description="检查结果/数值")
    unit: Optional[str] = Field(default=None, description="单位")
    reference_range: Optional[str] = Field(default=None, description="参考范围")

class Diagnosis(BaseModel):
    """诊断信息"""
    disease_name: str = Field(description="疾病名称")
    icd10_code: Optional[str] = Field(default=None, description="ICD-10 编码")
    severity: Optional[str] = Field(default=None, description="严重程度")

class EMRRecord(BaseModel):
    """电子病历结构化 Schema"""
    
    patient_id: Optional[str] = Field(default=None, description="患者ID(如果提及)")
    
    chief_complaint: str = Field(
        description="主诉:患者最主要的症状和持续时间"
    )
    
    diagnoses: List[Diagnosis] = Field(
        default_factory=list,
        description="诊断列表"
    )
    
    medications: List[Medication] = Field(
        default_factory=list,
        description="当前用药列表"
    )
    
    lab_tests: List[LabTest] = Field(
        default_factory=list,
        description="实验室检查结果"
    )
    
    allergies: List[str] = Field(
        default_factory=list,
        description="已知药物过敏史"
    )
    
    vital_signs: Optional[str] = Field(
        default=None,
        description="生命体征摘要(血压、心率、体温等)"
    )

# 示例电子病历文本
emr_text = """
患者男性,45岁,因"反复上腹痛3个月,加重1周"入院。

【主诉】反复上腹痛3个月,餐后加重,伴有反酸、嗳气。

【现病史】患者3个月前无明显诱因出现上腹痛,多为餐后1-2小时出现,
疼痛为烧灼样,伴反酸、嗳气,无放射痛。1周前症状加重,
夜间也有疼痛,影响睡眠。

【既往史】高血压5年,目前在用"苯磺酸氨氯地平 5mg qd"。
否认糖尿病、冠心病史。青霉素过敏。

【体格检查】BP 145/95 mmHg,HR 82 bpm,T 36.5°C。
上腹部有压痛,无反跳痛。

【实验室检查】
- 血常规:WBC 6.5×10^9/L,Hb 142 g/L
- 胃镜:胃窦黏膜充血、水肿,可见糜烂,快速尿素酶试验阳性

【诊断】
1. 慢性胃炎(ICD-10: K29.5)
2. 幽门螺杆菌感染(ICD-10: A49.86)
3. 高血压1级(ICD-10: I10)

【处理】
1. 标准四联根除治疗:
   - 艾司奥美拉唑 20mg bid
   - 阿莫西林 1000mg bid
   - 克拉霉素 500mg bid
   - 枸橼酸铋钾 220mg bid
   疗程14天
2. 生活方式干预:戒烟酒,规律饮食
3. 4周后复查胃镜

【随访】建议4周后门诊随访,评估症状改善情况。
"""

# 执行提取
result = lx.extract(
    text_or_documents=emr_text,
    prompt_description="""
    从电子病历文本中提取结构化医疗信息。
    注意事项:
    1. diagnoses 中的 icd10_code 如果文本中没有明确给出,可以推断但需标注为不确定
    2. medications 要完整提取剂量和用药频次
    3. lab_tests 的 reference_range 如果文本中有"正常"字样,可以写"正常范围"
    """,
    examples=[],  # 医疗场景建议提供示例
    model_id="gemini-2.5-pro",  # 使用高精度模型
    grounding=True
)

# 输出结构化结果
import json
print(json.dumps(result.data.model_dump(), ensure_ascii=False, indent=2))

6.3 法律场景:合同关键条款提取

class ContractParty(BaseModel):
    name: str = Field(description="当事方名称")
    role: str = Field(description="角色:甲方/乙方/卖方/买方等")
    address: Optional[str] = Field(default=None)

class PaymentTerms(BaseModel):
    amount: str = Field(description="金额(含货币单位)")
    payment_method: Optional[str] = Field(default=None, description="支付方式")
    payment_deadline: Optional[str] = Field(default=None, description="付款期限")

class KeyClause(BaseModel):
    clause_type: str = Field(
        description="""
        条款类型,如:
        - 'payment'(付款条款)
        - 'delivery'(交付条款)
        - 'confidentiality'(保密条款)
        - 'termination'(终止条款)
        - 'liability'(责任限制条款)
        """
    )
    content: str = Field(description="条款内容摘要")
    risk_level: Optional[str] = Field(
        default=None,
        description="风险等级:low / medium / high"
    )

class ContractInfo(BaseModel):
    """合同关键信息提取 Schema"""
    
    contract_title: str = Field(description="合同标题/名称")
    
    parties: List[ContractParty] = Field(
        description="合同当事方列表"
    )
    
    effective_date: Optional[str] = Field(
        default=None,
        description="合同生效日期 (YYYY-MM-DD)"
    )
    
    expiration_date: Optional[str] = Field(
        default=None,
        description="合同到期日期 (YYYY-MM-DD)"
    )
    
    payment_terms: Optional[PaymentTerms] = Field(default=None)
    
    key_clauses: List[KeyClause] = Field(
        default_factory=list,
        description="关键条款列表(识别对甲方/乙方有重大影响的条款)"
    )
    
    governing_law: Optional[str] = Field(
        default=None,
        description="管辖法律(如'中华人民共和国法律')"
    )
    
    dispute_resolution: Optional[str] = Field(
        default=None,
        description="争议解决方式(如'仲裁'、'诉讼')"
    )

6.4 批量处理与异步调用

生产环境中往往需要批量处理大量文档,LangExtract 支持异步批量处理:

import asyncio
from typing import List
import langextract as lx

async def batch_extract(
    documents: List[str],
    schema,
    prompt_description: str,
    model_id: str = "gemini-2.5-flash",
    max_concurrent: int = 5
) -> List[dict]:
    """
    批量异步提取
    
    Args:
        documents: 文档列表
        schema: Pydantic Schema 类
        prompt_description: 提取任务描述
        model_id: 模型 ID
        max_concurrent: 最大并发数
    """
    
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def extract_one(text: str, idx: int) -> dict:
        async with semaphore:
            try:
                result = await lx.aextract(  # 异步版本
                    text_or_documents=text,
                    prompt_description=prompt_description,
                    examples=[],
                    model_id=model_id,
                    grounding=True
                )
                return {
                    "index": idx,
                    "success": True,
                    "data": result.data.model_dump(),
                    "grounding": result.grounding
                }
            except Exception as e:
                return {
                    "index": idx,
                    "success": False,
                    "error": str(e)
                }
    
    tasks = [extract_one(doc, i) for i, doc in enumerate(documents)]
    results = await asyncio.gather(*tasks)
    
    return results

# 使用示例
async def main():
    # 假设有 100 份客服对话需要批量处理
    with open("customer_conversations.txt", "r") as f:
        documents = [line.strip() for line in f if line.strip()]
    
    results = await batch_extract(
        documents=documents,
        schema=CustomerComplaint,
        prompt_description="提取客服对话中的投诉信息",
        model_id="gemini-2.5-flash",
        max_concurrent=10  # 控制并发,避免速率限制
    )
    
    # 统计结果
    success_count = sum(1 for r in results if r["success"])
    print(f"成功处理: {success_count}/{len(documents)}")
    
    # 保存结果
    with open("extraction_results.json", "w") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

if __name__ == "__main__":
    asyncio.run(main())

6.5 与 Milvus 集成:构建语义搜索系统

LangExtract + Milvus 的组合可以实现强大的智能文档处理系统

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import langextract as lx
import numpy as np

class DocumentProcessor:
    """
    将 LangExtract 提取的结构化信息存储到 Milvus,
    实现语义搜索 + 精确元数据过滤
    """
    
    def __init__(self, milvus_host="localhost", milvus_port="19530"):
        # 连接 Milvus
        connections.connect(host=milvus_host, port=milvus_port)
        
        # 定义 Collection Schema
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=100),
            FieldSchema(name="content_vector", dtype=DataType.FLOAT_VECTOR, dim=768),
            FieldSchema(name="extracted_entities", dtype=DataType.JSON),  # LangExtract 提取的实体
            FieldSchema(name="source_text", dtype=DataType.VARCHAR, max_length=5000),
            FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=50),
            FieldSchema(name="timestamp", dtype=DataType.INT64)
        ]
        
        schema = CollectionSchema(fields, description="智能文档处理 Collection")
        
        # 创建或加载 Collection
        self.collection = Collection(name="smart_documents", schema=schema)
        self.collection.create_index(
            field_name="content_vector",
            index_params={"metric_type": "IP", "index_type": "IVF_FLAT", "params": {"nlist": 1024}}
        )
    
    def process_and_store(self, document_id: str, text: str, category: str):
        """处理文档并存储到 Milvus"""
        
        # Step 1: 使用 LangExtract 提取结构化信息
        extraction_result = lx.extract(
            text_or_documents=text,
            prompt_description="提取文档中的关键实体和信息",
            examples=[],
            model_id="gemini-2.5-flash"
        )
        
        # Step 2: 将提取的信息转换为向量(使用 Embedding 模型)
        entity_text = json.dumps(extraction_result.data.model_dump(), ensure_ascii=False)
        vector = self.get_embedding(entity_text)  # 需要实现 Embedding 函数
        
        # Step 3: 插入 Milvus
        self.collection.insert([
            [document_id],
            [vector.tolist()],
            [extraction_result.data.model_dump()],
            [text],
            [category],
            [int(time.time())]
        ])
        
        print(f"文档 {document_id} 已处理并存储")
    
    def search(self, query: str, category: Optional[str] = None, top_k: int = 5):
        """语义搜索 + 元数据过滤"""
        
        # 将查询转换为向量
        query_vector = self.get_embedding(query)
        
        # 构建搜索参数
        search_params = {"metric_type": "IP", "params": {"nprobe": 10}}
        
        # 执行搜索
        results = self.collection.search(
            data=[query_vector.tolist()],
            anns_field="content_vector",
            param=search_params,
            limit=top_k,
            expr=f"category == '{category}'" if category else None  # 元数据过滤
        )
        
        return results

# 使用示例
processor = DocumentProcessor()

# 处理文档
processor.process_and_store(
    document_id="doc_001",
    text="患者男性,45岁,因反复上腹痛入院...",  # 病历文本
    category="medical"
)

# 语义搜索
results = processor.search(
    query="胃病患者用药记录",
    category="medical",
    top_k=5
)

for hit in results[0]:
    print(f"文档ID: {hit.entity.get('document_id')}")
    print(f"相关度: {hit.distance}")
    print(f"提取的实体: {hit.entity.get('extracted_entities')}")

7. 高级技巧与性能优化

7.1 模型选择策略

from enum import Enum

class TaskComplexity(Enum):
    LOW = "low"        # 简单信息提取(命名实体、日期等)
    MEDIUM = "medium"  # 中等复杂度(关系提取、情感分析)
    HIGH = "high"      # 高复杂度(推理、摘要、多跳关系)

def smart_model_selection(text_length: int, complexity: TaskComplexity) -> str:
    """
    智能模型选择策略
    
    Args:
        text_length: 文本长度(字符数)
        complexity: 任务复杂度
    """
    
    # 超短文本 + 低复杂度 → 最快模型
    if text_length < 200 and complexity == TaskComplexity.LOW:
        return "gemini-2.5-flash"
    
    # 长文本 + 高复杂度 → 最强模型
    if text_length > 10000 or complexity == TaskComplexity.HIGH:
        return "gemini-2.5-pro"
    
    # 默认:性价比最高的模型
    return "gemini-2.5-flash"

# 实际使用
model_id = smart_model_selection(
    text_length=len(long_medical_text),
    complexity=TaskComplexity.HIGH
)

7.2 Prompt 优化技巧

技巧一:使用角色扮演(Role Playing)

# ❌ 普通 Prompt
prompt_description = "提取合同中的关键信息"

# ✅ 角色扮演 Prompt
prompt_description = """
你是一位拥有 20 年经验的企业法务顾问,擅长快速识别合同中的关键条款和潜在风险。
请从以下合同中提取关键信息,特别关注对甲方有重大影响的条款。
"""

技巧二:链式思考(Chain of Thought)

prompt_description = """
请按以下步骤分析文本:

Step 1: 通读全文,理解整体内容和上下文
Step 2: 识别所有命名实体(人名、地名、组织名、产品名)
Step 3: 识别实体之间的关系(谁做了什么)
Step 4: 根据上述分析,提取结构化信息

让我们一步步来,确保不遗漏任何关键信息。
"""

技巧三:输出格式强化

prompt_description = """
提取信息并以严格 JSON 格式输出。

重要要求:
1. 所有字符串必须用双引号(不能是单引号)
2. 不能有 trailing comma
3. null 必须小写
4. 如果某个字段无法确定,显式返回 null,不要返回空字符串
5. 输出必须是合法 JSON,可以被 json.loads() 解析

错误示例(不要这样):
{'name': 'John',}  # 单引号 + trailing comma

正确示例:
{"name": "John"}
"""

7.3 批量处理与速率限制

import time
from dataclasses import dataclass
from typing import Iterable

@dataclass
class RateLimiter:
    """令牌桶速率限制器"""
    
    max_calls: int      # 时间窗口内最大调用次数
    period: float = 60.0  # 时间窗口(秒)
    
    def __post_init__(self):
        self.calls: list[float] = []
    
    def acquire(self):
        """获取一个调用令牌,如果超限则阻塞等待"""
        now = time.time()
        
        # 移除时间窗口之外的记录
        self.calls = [t for t in self.calls if now - t < self.period]
        
        if len(self.calls) >= self.max_calls:
            # 计算需要等待的时间
            wait_time = self.period - (now - self.calls[0])
            if wait_time > 0:
                time.sleep(wait_time)
        
        self.calls.append(time.time())

# 使用速率限制器
rate_limiter = RateLimiter(max_calls=60, period=60)  # 60次/分钟

def extract_with_rate_limit(text: str, **kwargs):
    rate_limiter.acquire()  # 自动限速
    return lx.extract(text_or_documents=text, **kwargs)

7.4 成本优化实战

import tiktoken

def estimate_cost(text: str, model_id: str) -> float:
    """
    估算提取成本(美元)
    
    Args:
        text: 输入文本
        model_id: 模型 ID
    """
    
    # 价格表(示例,实际价格请参考 Google AI 官方文档)
    pricing = {
        "gemini-2.5-flash": {"input": 0.075 / 1e6, "output": 0.30 / 1e6},  # $/token
        "gemini-2.5-pro": {"input": 1.25 / 1e6, "output": 5.0 / 1e6}
    }
    
    # 估算 Token 数(简化估算:1 token ≈ 4 characters)
    input_tokens = len(text) / 4
    
    # 估算输出 Token 数(根据 Schema 复杂度)
    estimated_output_tokens = 500  # 可以根据 Schema 动态调整
    
    model_pricing = pricing[model_id]
    cost = (input_tokens * model_pricing["input"] + 
            estimated_output_tokens * model_pricing["output"])
    
    return cost

# 成本对比
text = "这是一个很长的合同文档..." * 100  # 假设 3000 字

flash_cost = estimate_cost(text, "gemini-2.5-flash")
pro_cost = estimate_cost(text, "gemini-2.5-pro")

print(f"Gemini Flash 估算成本: ${flash_cost:.4f}")
print(f"Gemini Pro 估算成本: ${pro_cost:.4f}")
print(f"成本差异: {pro_cost / flash_cost:.1f}x")

7.5 错误处理与重试机制

from functools import wraps
import random

def retry_with_exponential_backoff(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
):
    """
    指数退避重试装饰器
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                
                except RateLimitError:
                    if attempt == max_retries - 1:
                        raise
                    
                    sleep_time = delay
                    if jitter:
                        sleep_time = sleep_time * (1 + random.random())
                    
                    sleep_time = min(sleep_time, max_delay)
                    time.sleep(sleep_time)
                    delay *= exponential_base
                
                except Exception as e:
                    # 对于非速率限制错误,直接抛出
                    if "rate" not in str(e).lower():
                        raise
                    
                    if attempt == max_retries - 1:
                        raise
                    
                    time.sleep(delay)
                    delay *= exponential_base
        
        return wrapper
    return decorator

# 使用装饰器
@retry_with_exponential_backoff(max_retries=5, initial_delay=2.0)
def robust_extract(text: str, **kwargs):
    return lx.extract(text_or_documents=text, **kwargs)

8. 生产级部署方案

8.1 容器化部署

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 安装 LangExtract
COPY langextract /app/langextract
RUN pip install -e /app/langextract

# 配置环境变量
ENV GOOGLE_API_KEY=""
ENV MAX_CONCURRENT_REQUESTS="10"
ENV LOG_LEVEL="INFO"

# 复制应用代码
COPY app.py .

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
# app.py - FastAPI 封装
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Any, Optional
import langextract as lx

app = FastAPI(title="LangExtract API")

class ExtractRequest(BaseModel):
    text: str
    schema_definition: dict  # JSON Schema 格式
    model_id: str = "gemini-2.5-flash"
    grounding: bool = True

class ExtractResponse(BaseModel):
    success: bool
    data: Optional[dict] = None
    grounding: Optional[dict] = None
    error: Optional[str] = None

@app.post("/extract", response_model=ExtractResponse)
async def extract_endpoint(request: ExtractRequest):
    try:
        # 将 schema_definition 转换为 Pydantic Model
        # (实际实现需要动态创建 Pydantic Model)
        
        result = lx.extract(
            text_or_documents=request.text,
            prompt_description="提取结构化信息",
            examples=[],
            model_id=request.model_id,
            grounding=request.grounding
        )
        
        return ExtractResponse(
            success=True,
            data=result.data.model_dump(),
            grounding=result.grounding
        )
    
    except Exception as e:
        return ExtractResponse(
            success=False,
            error=str(e)
        )

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

8.2 与现有系统集成

与 Apache Airflow 集成

# Airflow DAG: 批量处理文档
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
import langextract as lx
from datetime import datetime

def process_document(file_path: str, **context):
    """处理单个文档的 Airflow Task"""
    
    # 从 GCS 读取文档
    from google.cloud import storage
    client = storage.Client()
    bucket = client.bucket("my-document-bucket")
    blob = bucket.blob(file_path)
    text = blob.download_as_text()
    
    # 使用 LangExtract 提取
    result = lx.extract(
        text_or_documents=text,
        prompt_description="提取文档关键信息",
        examples=[],
        model_id="gemini-2.5-flash"
    )
    
    # 将结果写回 GCS
    result_blob = bucket.blob(f"extracted/{file_path}.json")
    result_blob.upload_from_string(
        json.dumps(result.data.model_dump(), ensure_ascii=False)
    )

with DAG(
    dag_id="batch_document_extraction",
    start_date=datetime(2026, 1, 1),
    schedule_interval="0 2 * * *",  # 每天凌晨2点
    catchup=False
) as dag:
    
    # Task 1: 列出所有待处理文档
    list_files = GCSListObjectsOperator(
        task_id="list_files",
        bucket="my-document-bucket",
        prefix="raw/"
    )
    
    # Task 2: 并行处理文档
    process_files = PythonOperator(
        task_id="process_documents",
        python_callable=process_document,
        op_kwargs={"file_path": "{{ ti.xcom_pull(task_ids='list_files')[0] }}"},
        execution_timeout=300  # 5分钟超时
    )
    
    list_files >> process_files

8.3 监控与可观测性

import logging
from prometheus_client import Counter, Histogram, start_http_server
import time

# Prometheus 指标
EXTRACTION_REQUESTS = Counter(
    "lagextract_requests_total",
    "Total extraction requests",
    ["model_id", "status"]
)

EXTRACTION_LATENCY = Histogram(
    "lagextract_latency_seconds",
    "Extraction latency in seconds",
    ["model_id"]
)

EXTRACTION_COST = Counter(
    "lagextract_cost_total",
    "Total cost in USD",
    ["model_id"]
)

def monitored_extract(text: str, model_id: str, **kwargs):
    """带监控的提取函数"""
    
    start_time = time.time()
    
    try:
        result = lx.extract(
            text_or_documents=text,
            model_id=model_id,
            **kwargs
        )
        
        # 记录成功指标
        EXTRACTION_REQUESTS.labels(model_id=model_id, status="success").inc()
        
        # 记录延迟
        latency = time.time() - start_time
        EXTRACTION_LATENCY.labels(model_id=model_id).observe(latency)
        
        # 记录成本(需要实际实现成本计算)
        estimated_cost = estimate_cost(text, model_id)
        EXTRACTION_COST.labels(model_id=model_id).inc(estimated_cost)
        
        return result
    
    except Exception as e:
        # 记录失败指标
        EXTRACTION_REQUESTS.labels(model_id=model_id, status="error").inc()
        raise

# 启动 Prometheus metrics 服务器
start_http_server(8000)  # metrics 暴露在 :8000/metrics

9. LangExtract vs 其他方案对比

维度LangExtractLangChainLlamaIndex直接使用 LLM API
学习曲线低(声明式)
Source Grounding✅ 原生支持❌ 不支持❌ 不支持❌ 需自己实现
可视化✅ 内置 HTML 报告❌ 不支持❌ 不支持❌ 需自己实现
Schema 驱动✅ Pydantic 原生✅ 支持✅ 支持❌ 需自己实现
批量处理✅ 原生异步支持✅ 支持✅ 支持❌ 需自己实现
成本优化✅ 智能模型路由⚠️ 需自己实现⚠️ 需自己实现❌ 需自己实现
开源✅ Google 官方开源N/A
适合场景信息提取专项通用 LLM 应用RAG 应用高度定制需求

10. 总结与展望

10.1 核心收获

通过本文的深度实战,我们系统地掌握了 Google LangExtract 的:

  1. 设计哲学:声明式提取、可验证性优先、工程化思维
  2. 核心概念:Schema 驱动、Source Grounding、交互式可视化、模型路由
  3. 架构原理:从 Prompt 工程到输出解析的完整流水线
  4. 实战代码:客服、医疗、法律三大场景的完整实现
  5. 性能优化:模型选择、Prompt 优化、批量处理、成本优化
  6. 生产部署:容器化、与 Airflow 集成、监控可观测性

10.2 LangExtract 的适用边界

✅ 适合的场景

  • 从大量非结构化文本中提取结构化信息
  • 对提取结果的可审计性有要求(金融、医疗、法律)
  • 需要人机协作审核的提取任务
  • 批量文档处理

❌ 不适合的场景

  • 实时性要求极高的场景(LLM 调用有延迟)
  • 成本极度敏感的大规模应用(需仔细评估)
  • 需要复杂推理链的任务(考虑使用 Agent 框架)

10.3 未来展望

随着 LLM 能力的不断提升,信息提取领域正在经历一场革命:

  1. 多模态提取:未来的 LangExtract 可能支持从图片、视频中提取结构化信息
  2. 实时流式提取:支持对实时数据流(如直播字幕)的在线提取
  3. 领域自适应:自动根据领域调整 Prompt 和模型选择策略
  4. 与 Agent 框架深度融合:作为 Agent 的「感知层」,从环境中提取结构化信息

10.4 实践建议

如果你打算在生产环境中使用 LangExtract,建议按照以下步骤推进:

Phase 1: 原型验证(1-2周)
  → 选择1-2个典型场景
  → 手工准备 Few-shot 示例
  → 验证提取质量

Phase 2: 小规模试点(2-4周)
  → 处理 100-1000 份真实文档
  → 评估成本和时间
  → 优化 Prompt 和模型选择

Phase 3: 生产部署(4-8周)
  → 容器化部署
  → 接入监控和可观测性
  → 建立人工审核流程
  → 逐步扩大规模

11. 参考资料


本文完成于 2026 年 5 月,基于 LangExtract 最新版本。如有问题或建议,欢迎通过 程序员茄子 联系作者。


版权声明:本文由 程序员茄子 原创,转载请注明出处。

复制全文 生成海报 LangExtract LLM 结构化提取 Google Python

推荐文章

动态渐变背景
2024-11-19 01:49:50 +0800 CST
Vue3中的v-bind指令有什么新特性?
2024-11-18 14:58:47 +0800 CST
Vue3 vue-office 插件实现 Word 预览
2024-11-19 02:19:34 +0800 CST
php curl并发代码
2024-11-18 01:45:03 +0800 CST
html流光登陆页面
2024-11-18 15:36:18 +0800 CST
H5保险购买与投诉意见
2024-11-19 03:48:35 +0800 CST
前端代码规范 - Commit 提交规范
2024-11-18 10:18:08 +0800 CST
Go 如何做好缓存
2024-11-18 13:33:37 +0800 CST
JavaScript 策略模式
2024-11-19 07:34:29 +0800 CST
程序员茄子在线接单