编程 Context Engineering:超越 Prompt Engineering 的 AI 工程新范式

2026-07-05 01:47:09 +0800 CST views 8

Context Engineering:超越 Prompt Engineering 的 AI 工程新范式

2026年,AI工程领域正在经历一场静悄悄的革命。当所有人还在讨论如何写好Prompt的时候,顶尖的AI工程团队已经悄然转向了一个新的方向——Context Engineering(上下文工程)。这不是简单的术语替换,而是对AI系统理解、推理和生成能力的根本性重新思考。本文将深入探讨Context Engineering的核心概念、架构设计、实战工具和未来趋势,帮助你在AI工程的新时代保持竞争力。

目录

  1. 为什么 Prompt Engineering 不够了?
  2. Context Engineering 是什么?
  3. 核心概念与架构设计
  4. Context Window 的深度管理
  5. 上下文压缩技术详解
  6. 工具与框架实战
  7. 生产环境最佳实践
  8. 性能优化技巧
  9. 实战案例研究
  10. 未来展望

为什么 Prompt Engineering 不够了?

Prompt Engineering 的黄金时代与局限

Prompt Engineering(提示工程)在过去三年里一直是AI应用开发的核心技能。我们学会了如何使用Few-shot Learning提升模型表现,如何设计Chain-of-Thought提示词引导推理,如何使用Role Prompting设定模型角色,以及如何避免Prompt Injection攻击。

但是,随着AI应用的复杂度呈指数级提升,Prompt Engineering的局限性日益凸显。让我们通过具体代码示例来理解这些局限:

# 传统 Prompt Engineering 的典型实现
# 这种实现方式在生产环境中会面临严重的问题

import openai
import os

openai.api_key = os.getenv("OPENAI_API_KEY")

def naive_qa_system(question: str) -> str:
    """
    最基础的QA系统实现
    问题:每次都重新构建完整的prompt,无法积累上下文
    """
    prompt = f"""你是一个智能助手,请回答以下问题:
    
问题:{question}

请仔细思考后给出答案。"""
    
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# 这个实现的问题:
# 1. 没有记忆能力 - 每次对话都是全新的开始
# 2. 无法引用历史 - 用户说"接着刚才的话题",系统无法理解
# 3. Token浪费严重 - 每个相关问题都要重新解释背景
# 4. 无法处理复杂任务 - 多步骤推理需要手动拆解

真实生产环境的痛点

  1. 上下文丢失问题:在长对话中,模型会"忘记"早期的重要信息。例如,用户在第一轮说"我是Python新手",在第十轮问"我该用哪个框架?"时,模型可能已经忘记了用户是新手这个信息。

  2. 信息检索低效:传统Prompt Engineering无法动态获取外部知识。当问题需要最新的技术文档或特定领域的深度知识时,静态的Prompt无法应对。

  3. 推理链不可控:虽然Chain-of-Thought有一定效果,但模型可能跳过关键推理步骤,或者在复杂逻辑链中"迷失方向"。

  4. 成本不可控:每次都发送完整上下文导致Token消耗巨大。一个典型的企业级对话系统,每天可能产生数百万Token的费用。

从 Prompt 到 Context 的范式转变

Context Engineering的核心洞察是:大模型的表现不仅仅取决于你"说什么"(Prompt),更取决于你"在什么信息环境中说"(Context)

这个洞察来源于对大模型工作机制的深入理解。当我们向模型发送一个请求时,模型并不是孤立地理解这个请求,而是会综合考虑:

  • 系统提示词(System Prompt)
  • 对话历史(Conversation History)
  • 用户提供的补充信息(User-provided Context)
  • 模型自己生成的中间结果(Intermediate Results)
  • 外部工具返回的结果(Tool Results)

所有这些信息的集合,构成了模型眼中的"完整上下文"。Context Engineering就是要系统地设计和管理这个"信息环境"。

让我们通过架构对比图来理解这种范式转变:

传统 Prompt Engineering 视角(线性、静态):
┌─────────────────────────────────┐
│  Prompt (你说了什么)             │
│  ↓                              │
│  Model (模型处理)               │
│  ↓                              │
│  Output (输出)                  │
└─────────────────────────────────┘
问题:信息环境是静态的、一次性的

Context Engineering 视角(动态、系统化):
┌──────────────────────────────────────────────────┐
│  Information Environment (信息环境)               │
│  ├─ System Context (系统上下文)                  │
│  │   - 角色定义、行为准则、输出格式               │
│  ├─ Conversation History (对话历史)              │
│  │   - 完整对话记录、摘要、关键点提取             │
│  ├─ External Knowledge (外部知识)                │
│  │   - 向量数据库检索结果、API调用结果            │
│  ├─ Tool Results (工具调用结果)                  │
│  │   - 代码执行结果、搜索引擎返回、数据库查询     │
│  ├─ User Profile (用户画像)                      │
│  │   - 技能水平、偏好设置、历史行为               │
│  └─ Task State (任务状态)                        │
│      - 当前步骤、已完成子任务、待解决问题         │
│  ↓                                              │
│  Dynamic Context Assembly (动态上下文组装)       │
│  - 根据当前任务选择相关信息                       │
│  - 压缩低价值信息                                │
│  - 优化信息排列顺序                              │
│  ↓                                              │
│  Optimized Context Window (优化后的上下文窗口)   │
│  ↓                                              │
│  Model (模型处理)                               │
│  ↓                                              │
│  Structured Output + State Update               │
│  (结构化输出 + 状态更新,为下一轮做准备)          │
└──────────────────────────────────────────────────┘
优势:信息环境是动态的、系统化的、可优化的

Context Engineering 是什么?

正式定义

Context Engineering 是一门系统地设计、管理和优化大语言模型(LLM)信息环境的工程学科。它不是单一的技术或工具,而是一套完整的方法论和最佳实践集合。

Context Engineering包括以下几个核心领域:

1. 上下文感知(Context Awareness)

理解当前任务需要哪些信息,不需要哪些信息。这看起来简单,但实际上需要深刻的任务理解能力。

class ContextAwarenessAnalyzer:
    """
    上下文感知分析器:分析任务需要哪些类型的上下文
    """
    
    TASK_CONTEXT_REQUIREMENTS = {
        "code_generation": {
            "required": ["language_version", "existing_codebase", "dependencies"],
            "optional": ["coding_style", "performance_requirements", "examples"],
            "excluded": ["unrelated_history", "personal_info"]
        },
        "technical_qa": {
            "required": ["question", "relevant_docs"],
            "optional": ["user_skill_level", "related_concepts"],
            "excluded": ["outdated_docs", "opinion_not_facts"]
        },
        "debugging": {
            "required": ["error_message", "code_snippet", "environment_info"],
            "optional": ["attempted_solutions", "stack_trace"],
            "excluded": ["unrelated_code", "speculation"]
        }
    }
    
    def analyze_task(self, task_description: str) -> dict:
        """
        分析任务并返回上下文需求
        """
        task_type = self._classify_task(task_description)
        requirements = self.TASK_CONTEXT_REQUIREMENTS.get(task_type, {})
        
        return {
            "task_type": task_type,
            "must_have": requirements.get("required", []),
            "nice_to_have": requirements.get("optional", []),
            "must_not_have": requirements.get("excluded", [])
        }

2. 上下文获取(Context Retrieval)

从各种来源动态获取相关信息。这包括:

  • 向量数据库检索:从知识库中找到语义最相关的文档
  • 关键词检索:使用BM25等算法进行精确匹配
  • API调用:获取实时数据(天气、股票、最新新闻等)
  • 代码执行:运行代码获取实际输出
  • 用户交互:向用户询问必要的澄清信息
class MultiSourceContextRetriever:
    """
    多源上下文检索器
    从多个来源并行检索相关信息
    """
    
    def __init__(self):
        self.sources = {
            "vector_db": VectorDBRetriever(),
            "keyword_search": BM25Retriever(),
            "api": APIRetriever(),
            "code_executor": CodeExecutor(),
            "user_interaction": UserInteractionRetriever()
        }
    
    async def retrieve(self, query: str, task_type: str) -> dict:
        """
        从所有相关源并行检索上下文
        """
        import asyncio
        
        # 根据任务类型决定使用哪些源
        active_sources = self._select_sources_for_task(task_type)
        
        # 并行检索
        tasks = []
        for source_name in active_sources:
            source = self.sources[source_name]
            tasks.append(self._safe_retrieve(source, query))
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 整合结果
        consolidated = {}
        for source_name, result in zip(active_sources, results):
            if isinstance(result, Exception):
                print(f"Warning: {source_name} retrieval failed: {result}")
                continue
            consolidated[source_name] = result
        
        return consolidated
    
    async def _safe_retrieve(self, source, query):
        """安全的检索包装器,防止单个源失败影响整体"""
        try:
            return await source.retrieve(query)
        except Exception as e:\n            return {"error": str(e), "content": ""}

3. 上下文压缩(Context Compression)

在有限的Context Window内最大化信息密度。这是Context Engineering中最具技术挑战的部分。

上下文压缩的核心思想是:不是所有信息都值得占用宝贵的Context Window空间。我们需要识别并保留高价值信息,压缩或丢弃低价值信息。

压缩技术分为三大类:

  1. Extractive Compression(抽取式压缩):从原文中抽取关键句子,保持原文措辞
  2. Abstractive Compression(生成式压缩):用大模型重新表述,保留意思但改变措辞
  3. Semantic Compression(语义压缩):基于语义相似度聚类,删除冗余信息

我们将在后续章节详细讨论这些技术。

4. 上下文维护(Context Maintenance)

管理长对话中的状态一致性。在多轮对话中,上下文是动态变化的,需要精心维护。

class ContextStateManager:
    """
    上下文状态管理器
    维护对话状态的一致性
    """
    
    def __init__(self):
        self.state = {
            "conversation_history": [],
            "extracted_facts": {},  # 从对话中提取的关键事实
            "user_preferences": {},
            "task_progress": {},
            "resolved_references": {}  # 解决"它"、"这个"等指代
        }
    
    def update_state(self, user_message: str, assistant_response: str):
        """更新对话状态"""
        
        # 1. 提取关键事实
        new_facts = self._extract_facts(user_message)
        self.state["extracted_facts"].update(new_facts)
        
        # 2. 更新用户偏好
        preferences = self._infer_preferences(user_message)
        self.state["user_preferences"].update(preferences)
        
        # 3. 记录对话历史(带摘要)
        self.state["conversation_history"].append({
            "user": user_message,
            "assistant": assistant_response,
            "summary": self._summarize_turn(user_message, assistant_response),
            "timestamp": time.time()
        })
        
        # 4. 解决指代
        self._resolve_references()
    
    def _extract_facts(self, text: str) -> dict:
        """
        从文本中提取关键事实
        例如:用户说"我叫张三,是Python开发者" → {"name": "张三", "role": "Python开发者"}
        """
        extraction_prompt = f"""从以下文本中提取关键事实,以JSON格式返回:
        
文本:{text}

提取的事实(JSON格式,只包含明确陈述的事实):"""
        
        response = llm(extraction_prompt)
        try:
            return json.loads(response)
        except:
            return {}

5. 上下文安全(Context Security)

防止敏感信息泄露和上下文污染。这是企业级应用最关心的部分。

常见安全风险

  1. 敏感信息泄露:上下文中的API密钥、密码、个人信息等被模型读取或输出
  2. 上下文污染:恶意用户通过精心构造的输入,让模型"记住"错误信息
  3. 跨用户泄露:多用户系统中,一个用户的信息泄露给另一个用户
class ContextSecurityManager:
    """
    上下文安全管理器
    """
    
    SENSITIVE_PATTERNS = [
        r"sk-[a-zA-Z0-9]{48}",  # OpenAI API Key
        r"ghp_[a-zA-Z0-9]{36}",  # GitHub Personal Access Token
        r"\b[\w._%+-]+@[\w.-]+\.\w{2,}\b",  # Email
        r"\b\d{3}-\d{2}-\d{4}\b",  # SSN
        r"\b(?:\d{4}[-\s]?){3}\d{4}\b",  # Credit Card
    ]
    
    def __init__(self):
        self.sensitive_detectors = [
            re.compile(pattern) for pattern in self.SENSITIVE_PATTERNS
        ]
    
    def sanitize_context(self, context: str) -> str:
        """
        清理上下文中的敏感信息
        """
        sanitized = context
        
        for detector in self.sensitive_detectors:
            sanitized = detector.sub("[REDACTED]", sanitized)
        
        # 使用LLM进行更深层的敏感信息检测
        sensitive_check_prompt = f"""检查以下文本是否包含敏感信息(密码、私钥、个人信息等)。
如果包含,返回清理后的文本;如果不包含,返回原文本。

文本:
{sanitized}

清理后的文本:"""
        
        return llm(sensitive_check_prompt)
    
    def isolate_user_context(self, user_id: str, context: dict) -> dict:
        """
        确保上下文隔离,防止跨用户泄露
        """
        # 为每个用户创建独立的上下文命名空间
        return {
            "user_id": user_id,
            "context": context,
            "isolated": True
        }

Context Engineering 的核心原则

原则1:信息密度优先

不是"把所有信息都塞进去",而是"只放最有价值的信息"

这个原则听起来简单,但执行起来需要深刻的理解。让我们看一个对比示例:

# 低信息密度的上下文(Bad Example)
# 这种上下文浪费了大量token在冗余信息上

context_low_density = f"""
用户之前说过:
- 他喜欢蓝色
- 他昨天问了关于Python的问题
- 他今天早上说要去开会
- 他觉得上一个回答太长
- 他在上午10点登录了系统
- 他的浏览器是Chrome
- 他使用了Mac电脑
...(再写5000 tokens of low-density chat history)

现在请回答他的问题:{current_question}
"""

# 问题分析:
# 1. "喜欢蓝色" - 与编程问题无关
# 2. "去开会" - 过时信息,与当前任务无关
# 3. "登录时间"、"浏览器"、"操作系统" - 除非特定场景需要,否则无关
# 4. 没有结构化 - 模型需要自己从头理解哪些信息相关


# 高信息密度的上下文(Good Example)
# 同样的信息量,但高度结构化,只保留相关内容

context_high_density = {
    "user_profile": {
        "expertise": "intermediate_python",
        "preferences": {
            "detail_level": "concise",  # 从"觉得上一个回答太长"推断
            "code_examples": True,
            "format": "markdown"
        }
    },
    "conversation_summary": {
        "last_topic": "debugging FastAPI async route",
        "current_stuck_point": "event loop integration with SQLAlchemy",
        "attempted_solutions": ["used async def", "tried asyncio.run()"],
        "user_frustration_level": "medium"
    },
    "relevant_docs": [
        {"title": "FastAPI Async Patterns", "relevance": 0.95},
        {"title": "Python asyncio Best Practices", "relevance": 0.88}
    ],
    "pending_tasks": [
        "explain event loop integration",
        "provide working code example"
    ]
}

# 优势:
# 1. 仅用 800 tokens 承载了等价的信息量(原来需要5000+ tokens)
# 2. 结构化 - 模型可以直接索引到需要的信息
# 3. 可维护 - 添加、删除、修改信息都很清晰
# 4. 可测试 - 可以单独测试每个字段的提取逻辑

原则2:动态上下文组装

上下文不是静态的,而是根据当前任务动态组装的

不同的任务需要不同类型的上下文。一个代码生成任务需要的上下文,与一个文档摘要任务需要的上下文,截然不同。

class DynamicContextAssembler:
    """
    动态上下文组装器
    根据任务类型动态组装最优上下文
    """
    
    def __init__(self, max_context_tokens: int = 128000):
        self.max_tokens = max_context_tokens
        self.token_budget = TokenBudgetManager(max_context_tokens)
        self.sources = self._init_context_sources()
    
    def assemble(self, task: Task, state: ConversationState) -> str:
        """
        根据任务类型动态组装上下文
        
        核心思路:
        1. 识别任务所需的信息类型
        2. 为每个信息类型分配token预算
        3. 从对应源检索信息
        4. 在预算内优化信息排列
        """
        
        # 1. 任务分析:这个任务需要什么信息?
        info_requirements = self._analyze_task_requirements(task)
        
        # 2. Token预算分配
        # 核心原则:高价值信息获得更多预算
        budgets = self._allocate_budget(info_requirements)
        
        # 3. 从各源检索信息
        context_parts = {}
        
        for info_type, budget in budgets.items():
            source = self.sources.get(info_type)
            if source:
                # 检索,但不超过预算
                items = source.retrieve(
                    query=task.query,
                    max_tokens=budget,
                    relevance_threshold=0.7  # 只保留相关度>0.7的信息
                )
                context_parts[info_type] = items
        
        # 4. 上下文优化
        # 4.1 去重:删除跨源的重复信息
        context_parts = self._deduplicate(context_parts)
        
        # 4.2 重排序:按相关度重新排列
        context_parts = self._rerank_by_relevance(context_parts, task.query)
        
        # 4.3 截断:如果超过预算,智能截断
        context_parts = self._smart_truncate(context_parts, self.max_tokens)
        
        # 5. 组装最终上下文
        return self._compose(context_parts, task.type)
    
    def _allocate_budget(self, requirements: dict) -> dict:
        """
        Token预算分配算法
        
        策略:
        - 必须信息(must_have):分配 60% 预算
        - 有用信息(nice_to_have):分配 30% 预算
        - 补充信息(supplementary):分配 10% 预算
        """
        total_budget = self.max_tokens
        
        must_have = requirements.get("must_have", [])
        nice_to_have = requirements.get("nice_to_have", [])
        
        # 必须信息平分60%
        must_budget = total_budget * 0.6 / max(len(must_have), 1)
        
        # 有用信息平分30%
        nice_budget = total_budget * 0.3 / max(len(nice_to_have), 1)
        
        budgets = {}
        for info_type in must_have:
            budgets[info_type] = int(must_budget)
        
        for info_type in nice_to_have:
            budgets[info_type] = int(nice_budget)
        
        # 剩余10%作为灵活预算
        budgets["flexible"] = int(total_budget * 0.1)
        
        return budgets

原则3:多模态上下文统一

2026年的Context Engineering必须处理多种信息形式

现代AI应用不再只是处理纯文本。一个典型的技术问答可能包含:

  • 文本:问题描述、背景说明
  • 代码:用户提供的代码片段、错误日志
  • 图像:架构图、流程图、截图
  • 表格:性能对比数据、配置参数
  • API响应:工具调用返回的结构化数据

所有这些都需要统一在上下文管理中。

from dataclasses import dataclass
from typing import Union, List
import base64
import json

@dataclass
class ContextItem:
    """
    统一的上下文项
    可以表示任何类型的信息
    """
    content_type: str  # "text" | "code" | "image" | "table" | "api_response" | "error_log"
    content: Union[str, bytes, dict]
    relevance_score: float  # 与当前任务的相关度 0-1
    token_estimate: int  # 估计占用的token数
    metadata: dict  # 额外信息(语言、来源、时间戳等)
    
    def to_context_string(self) -> str:
        """将上下文项转换为模型可理解的字符串表示"""
        
        if self.content_type == "text":
            return f"""[TEXT]
{self.content}
[/TEXT]"""
        
        elif self.content_type == "code":
            lang = self.metadata.get("language", "python")
            return f"""[CODE language={lang}]
{self.content}
[/CODE]"""
        
        elif self.content_type == "image":
            # 图像需要特殊处理:转换为base64或使用多模态模型的图像输入
            if isinstance(self.content, bytes):
                img_base64 = base64.b64encode(self.content).decode()
                return f"""[IMAGE]
{img_base64}
[/IMAGE]"""
            else:
                return f"""[IMAGE description]
{self.content}
[/IMAGE description]"""
        
        elif self.content_type == "table":
            # 表格转换为Markdown格式
            if isinstance(self.content, dict):
                # dict转换为表格
                table_md = "| Key | Value |\n|-----|-------|\n"
                for k, v in self.content.items():
                    table_md += f"| {k} | {v} |\n"
                return f"""[TABLE]
{table_md}
[/TABLE]"""
            else:
                return f"""[TABLE]
{self.content}
[/TABLE]"""
        
        elif self.content_type == "api_response":
            # API响应格式化为JSON
            try:
                response_json = json.dumps(self.content, indent=2, ensure_ascii=False)
                return f"""[API Response]
{response_json}
[/API Response]"""
            except:
                return f"""[API Response]
{str(self.content)}
[/API Response]"""
        
        elif self.content_type == "error_log":
            return f"""[Error Log]
{self.content}
[/Error Log]"""
        
        else:
            return f"""[Unknown Type: {self.content_type}]
{str(self.content)}
[/Unknown Type]"""

核心概念与架构设计

Context Window 的物理限制与突破策略

现代大模型的Context Window已经扩展到128K、200K甚至1M tokens(如Claude 3.5的200K、Gemini 2.0的1M)。但这并不意味着你可以无脑填充。原因如下:

限制1:注意力分散(Lost in the Middle)

研究表明,当上下文过长时,模型对上下文开头和结尾的内容记忆较好,但对中间部分的内容记忆较差。这个现象被称为"Lost in the Middle"。

# 实验:验证 Lost in the Middle 现象

def test_lost_in_the_middle(model, context_length: int = 100000):
    """
    测试模型在长上下文中的信息检索能力
    """
    
    # 构造一个长上下文,在随机位置插入一个关键信息
    import random
    
    context_parts = ["这是一段普通文本。" * 1000 for _ in range(context_length // 1000)]
    
    # 在随机位置插入关键信息
    key_position = random.randint(0, len(context_parts) - 1)
    key_info = "重要信息:用户密码是 ABC123XYZ"
    context_parts[key_position] = key_info
    
    long_context = "\n".join(context_parts)
    
    # 让模型回答
    prompt = f"""{long_context}

问题:用户密码是什么?
回答:"""
    
    response = model.generate(prompt)
    
    # 检查模型是否能找到关键信息
    if "ABC123XYZ" in response:
        print(f"✓ 在位置 {key_position} 的关键信息被成功检索")
        return True
    else:
        print(f"✗ 在位置 {key_position} 的关键信息未被检索")
        return False


# 测试结果(典型):
# 位置 0 (开头): 95% 成功率
# 位置 50% (中间): 40% 成功率  ← Lost in the Middle
# 位置 100% (结尾): 90% 成功率

解决方案

  1. 注意力锚点(Attention Anchors):在长上下文中定期插入提醒,帮助模型关注关键信息
  2. 信息重构(Information Restructuring):将最关键的信息放在开头或结尾
  3. 分层上下文(Layered Context):不是一次性发送所有信息,而是分层发送
def insert_attention_anchors(context: str, num_anchors: int = 3) -> str:
    """
    在长上下文中插入注意力锚点
    """
    lines = context.split('\n')
    total_lines = len(lines)
    
    # 在上下文的 25%, 50%, 75% 位置插入锚点
    anchor_positions = [
        int(total_lines * 0.25),
        int(total_lines * 0.5),
        int(total_lines * 0.75)
    ]
    
    anchored_lines = []
    for i, line in enumerate(lines):
        if i in anchor_positions:
            anchored_lines.append("\n" + "="*60)
            anchored_lines.append("⚠️  attention anchor: 以上是重要的背景信息,请仔细理解")
            anchored_lines.append("="*60 + "\n")
        anchored_lines.append(line)
    
    return '\n'.join(anchored_lines)


# 更高级的策略:使用结构化格式
def build_structured_long_context(sections: List[dict]) -> str:
    """
    使用明确的章节结构帮助模型理解长上下文
    """
    context_parts = []
    
    for i, section in enumerate(sections):
        # 每个section用特殊的分隔符和标记包裹
        section_text = f"""
{'='*80}
SECTION {i+1}: {section['title']}
重要性: {'⭐' * section.get('importance', 3)}
{'='*80}

{section['content']}

{'='*80}
END OF SECTION {i+1}
{'='*80}
"""
        context_parts.append(section_text)
    
    # 在最后添加一个总结性的"元信息"
    context_parts.append("""
{'='*80}
CONTEXT SUMMARY:
请在回答时参考以上所有SECTION,特别关注标记为⭐⭐⭐的内容。
如果某个SECTION不相关,可以忽略。
{'='*80}
""")
    
    return '\n'.join(context_parts)

限制2:推理质量下降

过量上下文会导致模型产生"幻觉"或遗漏关键细节。这是因为模型需要在更多信息中区分信号和噪声。

解决方案:渐进式上下文展开

async def progressive_context_expansion(query: str, llm):
    """
    渐进式上下文展开:从最小上下文开始,逐步增加直到得到满意答案
    """
    
    context_sizes = [
        (4000, "minimal"),      # 最小上下文:只有最核心信息
        (16000, "standard"),     # 标准上下文:加上相关文档
        (32000, "extended"),     # 扩展上下文:加上历史对话
        (64000, "comprehensive") # 完整上下文:加上所有可用信息
    ]
    
    for size, level in context_sizes:
        print(f"Trying context level: {level} ({size} tokens)")
        
        # 组装对应大小的上下文
        context = assembler.assemble(
            query,
            max_tokens=size,
            strategy=level
        )
        
        # 生成回答
        response = await llm.agenerate(
            prompt=build_prompt(query, context),
            max_tokens=4096
        )
        
        # 评估回答质量
        quality_score = evaluate_response_quality(response, query)
        
        if quality_score > 0.85:  # 置信度阈值
            print(f"✓ Sufficient context found at level: {level}")
            return {
                "response": response,
                "context_size": size,
                "quality_score": quality_score
            }
    
    # 所有大小都尝试了,返回最佳结果
    print("⚠ All context levels tried, returning best effort")
    return {"response": response, "context_size": size, "quality_score": quality_score}

限制3:成本线性增长

每个token都意味着成本。以GPT-4o为例:

  • 输入:$5 / 1M tokens
  • 输出:$15 / 1M tokens

如果一个系统每天处理10,000个请求,每个请求平均使用10K tokens输入,那么每天的成本是:

10,000 requests/day * 10K tokens/request * $5/1M tokens = $0.5/day (输入)
+ 10,000 requests/day * 2K tokens/response * $15/1M tokens = $0.3/day (输出)
= $0.8/day 基本成本

如果优化上下文,将每个请求的输入降至5K tokens:
10,000 * 5K * $5/1M = $0.25/day (输入)
节省 50% 成本!

解决方案:上下文缓存 + 上下文压缩

class CostOptimizedContextManager:
    """
    成本优化的上下文管理器
    """
    
    def __init__(self, llm_config: dict):
        self.llm_config = llm_config
        self.cache = ContextCache()
        self.compressor = ContextCompressor()
    
    async def get_context_optimized(self, query: str, cache_key: str = None):
        """
        获取成本优化的上下文
        """
        
        # 1. 检查缓存(降低成本)
        if cache_key:
            cached = self.cache.get(cache_key)
            if cached:
                print(f"✓ Context cache hit for key: {cache_key}")
                return cached
        
        # 2. 检索完整上下文
        full_context = await self.retriever.retrieve(query)
        full_tokens = count_tokens(full_context)
        
        # 3. 如果超过成本阈值,进行压缩
        cost_threshold = self.llm_config.get("cost_threshold_per_query", 0.05)
        estimated_cost = (full_tokens / 1000000) * self.llm_config["input_price_per_million"]
        
        if estimated_cost > cost_threshold:
            print(f"Context too large ({full_tokens} tokens), compressing...")
            compressed = self.compressor.compress(
                full_context,
                target_tokens=int(cost_threshold * 1000000 / self.llm_config["input_price_per_million"])
            )
            context = compressed
        else:
            context = full_context
        
        # 4. 缓存结果
        if cache_key:
            self.cache.set(cache_key, context)
        
        return context

突破策略:分层上下文架构(Layered Context Architecture)

这是处理长上下文的最有效架构之一。核心思想是:不是所有信息都需要在同一层级被处理

分层上下文架构:

Layer 0 (Always Present, ~2K tokens): 
  - 系统提示词(角色、行为准则、输出格式)
  - 当前用户输入
  - 优先级:最高(始终保留)

Layer 1 (High Priority, ~4K tokens):
  - 当前任务的直接上下文(问题描述、错误信息、代码片段)
  - 优先级:高(除非超出预算,否则保留)

Layer 2 (Medium Priority, ~8K tokens):
  - 相关历史对话(最近3-5轮)
  - 外部知识库检索结果(最相关的2-3个文档)
  - 优先级:中(可以根据相关性动态选择)

Layer 3 (Low Priority, ~16K tokens):
  - 背景知识(相关但不紧急的信息)
  - 参考文档(可能需要但不一定用得到)
  - 优先级:低(第一个被压缩或丢弃)

Layer 4 (Archive, 不占用实时Context Window):
  - 长期记忆(通过向量检索动态加载)
  - 完整历史(可以回溯但不主动加载)
  - 优先级:存档(按需检索)
class LayeredContextManager:
    """
    分层上下文管理器
    实现分层上下文架构
    """
    
    def __init__(self, max_context_tokens: int = 32000):
        self.max_tokens = max_context_tokens
        self.layers = {
            0: {"name": "core", "max_tokens": 2000, "strategy": "static", "priority": "critical"},
            1: {"name": "task", "max_tokens": 4000, "strategy": "dynamic_high", "priority": "high"},
            2: {"name": "relevant", "max_tokens": 8000, "strategy": "retrieval", "priority": "medium"},
            3: {"name": "background", "max_tokens": 16000, "strategy": "compression", "priority": "low"},
            4: {"name": "archive", "max_tokens": None, "strategy": "vector_search", "priority": "archive"}
        }
    
    def build_context(self, query: str, state: ConversationState) -> str:
        """
        构建分层上下文
        """
        context_layers = {}
        used_tokens = 0
        
        # 从最高优先级层开始处理
        for layer_id in sorted(self.layers.keys()):
            layer = self.layers[layer_id]
            
            # 检查token预算
            remaining_budget = self.max_tokens - used_tokens
            if remaining_budget < 500:  # 保留500 tokens的缓冲
                print(f"⚠ Token budget nearly exhausted, skipping layer {layer_id}")
                break
            
            # 获取该层的上下文
            if layer["strategy"] == "static":
                items = self._get_static_context()
            
            elif layer["strategy"] == "dynamic_high":
                items = self._get_task_context(query, max_tokens=min(layer["max_tokens"], remaining_budget))
            
            elif layer["strategy"] == "retrieval":
                items = self._retrieve_relevant(
                    query,
                    max_tokens=min(layer["max_tokens"], remaining_budget),
                    top_k=5
                )
            
            elif layer["strategy"] == "compression":
                items = self._get_compressed_background(
                    query,
                    max_tokens=min(layer["max_tokens"], remaining_budget)
                )
            
            elif layer["strategy"] == "vector_search":
                # Layer 4 不主动加载,只在需要时检索
                items = []  # 留空,后续按需检索
            
            # 统计token
            layer_tokens = sum(item.get("token_estimate", 0) for item in items)
            
            # 如果超出预算,压缩该层
            if used_tokens + layer_tokens > self.max_tokens:
                items = self._compress_layer_items(items, self.max_tokens - used_tokens)
                layer_tokens = sum(item.get("token_estimate", 0) for item in items)
            
            context_layers[layer_id] = {
                "name": layer["name"],
                "items": items,
                "token_count": layer_tokens
            }
            
            used_tokens += layer_tokens
        
        print(f"✓ Context built: {used_tokens} tokens across {len(context_layers)} layers")
        
        # 渲染为字符串
        return self._render_layers(context_layers)
    
    def _compress_layer_items(self, items: list, max_tokens: int) -> list:
        """
        压缩层内的上下文项以适应token预算
        """
        # 按相关度排序
        items_sorted = sorted(items, key=lambda x: x.get("relevance", 0), reverse=True)
        
        compressed = []
        token_count = 0
        
        for item in items_sorted:
            item_tokens = item.get("token_estimate", 0)
            
            if token_count + item_tokens <= max_tokens:
                compressed.append(item)
                token_count += item_tokens
            else:
                # 尝试压缩单个项
                if item.get("compressible", True):
                    compressed_item = self._compress_single_item(item, max_tokens - token_count)
                    if compressed_item:
                        compressed.append(compressed_item)
                break
        
        return compressed

上下文压缩技术详解

当信息量超过Context Window限制时,需要智能压缩而非简单截断。上下文压缩是Context Engineering中最具技术深度的领域之一。

技术1:Extractive Compression(抽取式压缩)

原理:从原文中抽取关键句子,保持原文措辞。适合需要准确保留技术细节的场景。

from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
import torch

class ExtractiveCompressor:
    """
    抽取式压缩器
    使用小型模型提取关键信息
    """
    
    def __init__(self, model_name: str = "google/flan-t5-small"):
        """
        初始化压缩模型
        flan-t5-small 只有77M参数,速度快,适合实时压缩
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name).to(self.device)
        
        # 对于中文,可以使用:hfl/chinese-roberta-wwm-ext
        # 对于代码,可以使用:microsoft/codebert-base
    
    def compress(self, text: str, target_ratio: float = 0.3, preserve_code: bool = True) -> str:
        """
        将文本压缩到原来的target_ratio比例
        
        Args:
            text: 原始文本
            target_ratio: 目标压缩比(0.3表示保留30%)
            preserve_code: 是否保留代码块完整
        """
        
        # 1. 如果需要保留代码块,先分离代码和文本
        if preserve_code:
            import re
            code_blocks = re.findall(r'```(\w+)\n(.*?)\n```', text, re.DOTALL)
            text_without_code = re.sub(r'```.*?\n.*?\n```', ' [CODE_BLOCK] ', text, flags=re.DOTALL)
        else:
            text_without_code = text
            code_blocks = []
        
        # 2. 将文本分成句子
        sentences = self._split_into_sentences(text_without_code)
        
        if len(sentences) <= 3:
            # 文本太短,不需要压缩
            compressed_text = text_without_code
        else:
            # 3. 使用模型对每个句子进行重要性评分
            sentence_scores = self._score_sentences(sentences, text_without_code)
            
            # 4. 选择得分最高的句子(保留target_ratio比例)
            num_to_keep = max(1, int(len(sentences) * target_ratio))
            top_indices = sorted(range(len(sentence_scores)), key=lambda i: sentence_scores[i], reverse=True)[:num_to_keep]
            top_indices.sort()  # 保持原文顺序
            
            # 5. 重组文本
            compressed_text = " ".join([sentences[i] for i in top_indices])
        
        # 6. 将代码块插回
        if preserve_code and code_blocks:
            for lang, code in code_blocks:
                code_block = f"\n```{lang}\n{code}\n```"
                compressed_text = compressed_text.replace('[CODE_BLOCK]', code_block, 1)
        
        return compressed_text
    
    def _split_into_sentences(self, text: str) -> List[str]:
        """将文本分割成句子"""
        # 简单的句子分割(可以改用nltk或spacy)
        import re
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]
    
    def _score_sentences(self, sentences: List[str], original_text: str) -> List[float]:
        """
        对每个句子进行重要性评分
        
        评分标准:
        1. 包含关键词的句子得分高
        2. 出现在段首/段尾的句子得分高
        3. 包含数字、代码示例的句子得分高
        """
        scores = []
        
        for i, sentence in enumerate(sentences):
            score = 0.0
            
            # 标准1:位置权重
            if i == 0 or i == len(sentences) - 1:
                score += 0.3  # 段首段尾
            
            # 标准2:关键词检测
            keywords = ["重要", "关键", "注意", "必须", "important", "key", "note", "must"]
            if any(kw in sentence.lower() for kw in keywords):
                score += 0.3
            
            # 标准3:包含代码或数字
            if "```" in sentence or any(c.isdigit() for c in sentence):
                score += 0.2
            
            # 标准4:句子长度(太长或太短的句子可能不太重要)
            length = len(sentence.split())
            if 5 <= length <= 30:
                score += 0.2
            
            scores.append(min(score, 1.0))
        
        return scores

技术2:Abstractive Compression(生成式压缩)

原理:用大模型重新表述,保留意思但改变措辞。适合需要大幅压缩且可以接受一定信息损失的场景。

def abstractive_compress_with_code_preservation(text: str, llm, target_ratio: float = 0.4) -> str:
    """
    生成式压缩,但保留所有代码块完整
    
    这种压缩方式的优势:
    - 可以大幅压缩(40-60%压缩比)
    - 生成的摘要更连贯
    - 可以重写技术表述使其更清晰
    
    劣势:
    - 可能丢失细节
    - 需要调用LLM,有成本和延迟
    """
    
    import re
    
    # 1. 分离代码块和文本
    code_blocks = re.findall(r'(```[\w]*\n.*?\n```)', text, re.DOTALL)
    text_with_placeholders = text
    
    for i, block in enumerate(code_blocks):
        placeholder = f"\n[CODE_BLOCK_{i}]\n"
        text_with_placeholders = text_with_placeholders.replace(block, placeholder, 1)
    
    # 2. 压缩文本部分
    compression_prompt = f"""请将以下技术文档压缩到原来长度的{int(target_ratio*100)}%,要求:

1. 保留所有关键技术细节、参数名称、API接口名称
2. 保留所有的因果关系和逻辑推理链
3. 删除冗余的解释和重复的示例
4. 保持技术准确性,不要简化技术概念
5. 输出格式为Markdown

文档:
{text_with_placeholders}

压缩后的文档:"""

    compressed_text = llm(compression_prompt)
    
    # 3. 将代码块插回
    for i, block in enumerate(code_blocks):
        placeholder = f"[CODE_BLOCK_{i}]"
        if placeholder in compressed_text:
            compressed_text = compressed_text.replace(placeholder, block, 1)
        else:
            # 如果占位符被模型删除了,追加到末尾
            compressed_text += f"\n{block}\n"
    
    return compressed_text


# 批量压缩多个文档
async def batch_abstractive_compress(documents: List[str], llm, max_concurrent: int = 3) -> List[str]:
    """
    批量压缩多个文档,使用信号量控制并发
    """
    import asyncio
    from asyncio import Semaphore
    
    semaphore = Semaphore(max_concurrent)
    
    async def compress_one(doc: str) -> str:
        async with semaphore:
            return await llm.agenerate(
                prompt=f"压缩以下文档到50%长度,保留关键技术信息:\n\n{doc}",
                max_tokens=len(doc) // 2
            )
    
    tasks = [compress_one(doc) for doc in documents]
    results = await asyncio.gather(*tasks)
    
    return results

技术3:Semantic Compression(语义压缩)

原理:利用嵌入向量进行语义层面的压缩。将语义相似的句子聚类,每类只保留最具代表性的一个。

import numpy as np
from sklearn.cluster import KMeans
from sentence_transformers import SentenceTransformer

class SemanticCompressor:
    """
    基于语义相似度的上下文压缩
    
    适用场景:
    - 上下文中有大量重复或高度相似的信息
    - 需要从多个角度的综合信息中提取核心观点
    """
    
    def __init__(self, embedding_model_name: str = "BAAI/bge-large-zh-v1.5"):
        """
        初始化语义压缩器
        
        Args:
            embedding_model_name: 嵌入模型名称
                - 中文:BAAI/bge-large-zh-v1.5
                - 英文:sentence-transformers/all-MiniLM-L6-v2
                - 代码:microsoft/codebert-base
        """
        self.embedder = SentenceTransformer(embedding_model_name)
    
    def compress_by_clustering(self, texts: List[str], n_clusters: int = 5, preserve_top_k: int = 1) -> List[str]:
        """
        将相似的文本聚类,每类只保留最具代表性的一个(或top_k个)
        
        Args:
            texts: 文本列表
            n_clusters: 聚类数量(目标压缩比 = preserve_top_k / n_clusters)
            preserve_top_k: 每类保留几个样本
        """
        
        if len(texts) <= n_clusters:
            # 文本数量小于聚类数,无需压缩
            return texts
        
        # 1. 计算所有文本的嵌入
        embeddings = self.embedder.encode(texts)
        
        # 2. KMeans聚类
        n_clusters = min(n_clusters, len(texts))
        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        labels = kmeans.fit_predict(embeddings)
        
        # 3. 每类选择距离质心最近的文本(最具代表性的)
        compressed = []
        for i in range(n_clusters):
            cluster_indices = np.where(labels == i)[0]
            
            if len(cluster_indices) == 0:
                continue
            
            cluster_embeddings = embeddings[cluster_indices]
            centroid = kmeans.cluster_centers_[i]
            
            # 计算所有样本到质心的距离
            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
            
            # 选择距离最小的top_k个
            top_k_indices = cluster_indices[np.argsort(distances)[:preserve_top_k]]
            
            for idx in top_k_indices:
                compressed.append(texts[idx])
        
        return compressed
    
    def compress_by_similarity_threshold(self, texts: List[str], similarity_threshold: float = 0.85) -> List[str]:
        """
        基于相似度阈值的压缩:删除与其他文本高度相似的文本
        
        Args:
            texts: 文本列表
            similarity_threshold: 相似度阈值,超过此值认为是重复
        """
        
        embeddings = self.embedder.encode(texts)
        kept_indices = set([0])  # 至少保留第一个
        
        for i in range(1, len(texts)):
            # 计算当前文本与所有已保留文本的相似度
            similarities = [
                np.dot(embeddings[i], embeddings[j]) / 
                (np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[j]))
                for j in kept_indices
            ]
            
            max_similarity = max(similarities) if similarities else 0
            
            if max_similarity < similarity_threshold:
                # 与所有已保留文本都不太相似,保留
                kept_indices.add(i)
        
        return [texts[i] for i in sorted(kept_indices)]

技术4:LLMLingua-style Compression(基于小模型的极致压缩)

这种方法使用一个小模型(如GPT-2)来预测大模型(如GPT-4)会关注哪些token,然后只保留这些token。

class LLMLinguaCompressor:
    """
    基于LLMLingua论文的压缩方法
    
    核心思想:
    使用一个小模型(如GPT-2)计算每个token的重要性,
    然后只保留最重要的token(可以是30-50%)
    
    论文:LLMLingua: Compressing Prompts for Accelerated Inference of Large Language Models
    """
    
    def __init__(self, small_model_name: str = "gpt2"):
        from transformers import GPT2LMHeadModel, GPT2Tokenizer
        
        self.tokenizer = GPT2Tokenizer.from_pretrained(small_model_name)
        self.model = GPT2LMHeadModel.from_pretrained(small_model_name)
        self.model.eval()
    
    def compress(self, text: str, compression_ratio: float = 0.5) -> str:
        """
        压缩文本到指定比例
        
        Args:
            text: 原始文本
            compression_ratio: 压缩比(0.5表示保留50%的token)
        """
        
        # 1. Tokenize
        tokens = self.tokenizer.encode(text)
        if len(tokens) <= 10:
            return text  # 太短,无需压缩
        
        # 2. 计算每个token的重要性(使用perplexity)
        import torch
        
        token_importance = []
        
        for i in range(len(tokens)):
            # 计算包含这个token时的perplexity
            with torch.no_grad():
                input_ids = torch.tensor([tokens[:i+1]])
                outputs = self.model(input_ids)
                logits = outputs.logits
                
                # 计算这个token的log probability
                probs = torch.softmax(logits[0, -1, :], dim=0)
                token_prob = probs[tokens[i]]
                
                # 重要性 = -log(probability) (概率越低,越重要)
                importance = -torch.log(token_prob + 1e-10).item()
                token_importance.append((i, importance))
        
        # 3. 选择最重要的token(保留compression_ratio比例)
        token_importance.sort(key=lambda x: x[1], reverse=True)
        num_to_keep = max(1, int(len(tokens) * compression_ratio))
        
        kept_indices = sorted([idx for idx, _ in token_importance[:num_to_keep]])
        
        # 4. 重构文本(只保留选中的token)
        kept_tokens = [tokens[i] for i in kept_indices]
        
        # 注意:这种方法可能会破坏语法,需要后续处理
        compressed_text = self.tokenizer.decode(kept_tokens)
        
        return compressed_text

核心概念与架构设计

Context Window 的物理限制与突破策略

现代大模型的Context Window已经扩展到128K、200K甚至1M tokens(如Claude 3.5的200K、Gemini 2.0的1M)。但这并不意味着你可以无脑填充。原因如下:

限制1:注意力分散(Lost in the Middle)

研究表明,当上下文过长时,模型对上下文开头和结尾的内容记忆较好,但对中间部分的内容记忆较差。这个现象被称为"Lost in the Middle"。

# 实验:验证 Lost in the Middle 现象

def test_lost_in_the_middle(model, context_length: int = 100000):
    """
    测试模型在长上下文中的信息检索能力
    """
    
    # 构造一个长上下文,在随机位置插入一个关键信息
    import random
    
    context_parts = ["这是一段普通文本。" * 1000 for _ in range(context_length // 1000)]
    
    # 在随机位置插入关键信息
    key_position = random.randint(0, len(context_parts) - 1)
    key_info = "重要信息:用户密码是 ABC123XYZ"
    context_parts[key_position] = key_info
    
    long_context = "\n".join(context_parts)
    
    # 让模型回答
    prompt = f"""{long_context}

问题:用户密码是什么?
回答:"""
    
    response = model.generate(prompt)
    
    # 检查模型是否能找到关键信息
    if "ABC123XYZ" in response:
        print(f"✓ 在位置 {key_position} 的关键信息被成功检索")
        return True
    else:
        print(f"✗ 在位置 {key_position} 的关键信息未被检索")
        return False


# 测试结果(典型):
# 位置 0 (开头): 95% 成功率
# 位置 50% (中间): 40% 成功率  ← Lost in the Middle
# 位置 100% (结尾): 90% 成功率

解决方案

  1. 注意力锚点(Attention Anchors):在长上下文中定期插入提醒,帮助模型关注关键信息
  2. 信息重构(Information Restructuring):将最关键的信息放在开头或结尾
  3. 分层上下文(Layered Context):不是一次性发送所有信息,而是分层发送
def insert_attention_anchors(context: str, num_anchors: int = 3) -> str:
    """
    在长上下文中插入注意力锚点
    """
    lines = context.split('\n')
    total_lines = len(lines)
    
    # 在上下文的 25%, 50%, 75% 位置插入锚点
    anchor_positions = [
        int(total_lines * 0.25),
        int(total_lines * 0.5),
        int(total_lines * 0.75)
    ]
    
    anchored_lines = []
    for i, line in enumerate(lines):
        if i in anchor_positions:
            anchored_lines.append("\n" + "="*60)
            anchored_lines.append("⚠️  attention anchor: 以上是重要的背景信息,请仔细理解")
            anchored_lines.append("="*60 + "\n")
        anchored_lines.append(line)
    
    return '\n'.join(anchored_lines)


# 更高级的策略:使用结构化格式
def build_structured_long_context(sections: List[dict]) -> str:
    """
    使用明确的章节结构帮助模型理解长上下文
    """
    context_parts = []
    
    for i, section in enumerate(sections):
        # 每个section用特殊的分隔符和标记包裹
        section_text = f"""
{'='*80}
SECTION {i+1}: {section['title']}
重要性: {'⭐' * section.get('importance', 3)}
{'='*80}

{section['content']}

{'='*80}
END OF SECTION {i+1}
{'='*80}
"""
        context_parts.append(section_text)
    
    # 在最后添加一个总结性的"元信息"
    context_parts.append("""
{'='*80}
CONTEXT SUMMARY:
请在回答时参考以上所有SECTION,特别关注标记为⭐⭐⭐的内容。
如果某个SECTION不相关,可以忽略。
{'='*80}
""")
    
    return '\n'.join(context_parts)

限制2:推理质量下降

过量上下文会导致模型产生"幻觉"或遗漏关键细节。这是因为模型需要在更多信息中区分信号和噪声。

解决方案:渐进式上下文展开

async def progressive_context_expansion(query: str, llm):
    """
    渐进式上下文展开:从最小上下文开始,逐步增加直到得到满意答案
    """
    
    context_sizes = [
        (4000, "minimal"),      # 最小上下文:只有最核心信息
        (16000, "standard"),     # 标准上下文:加上相关文档
        (32000, "extended"),     # 扩展上下文:加上历史对话
        (64000, "comprehensive") # 完整上下文:加上所有可用信息
    ]
    
    for size, level in context_sizes:
        print(f"Trying context level: {level} ({size} tokens)")
        
        # 组装对应大小的上下文
        context = assembler.assemble(
            query,
            max_tokens=size,
            strategy=level
        )
        
        # 生成回答
        response = await llm.agenerate(
            prompt=build_prompt(query, context),
            max_tokens=4096
        )
        
        # 评估回答质量
        quality_score = evaluate_response_quality(response, query)
        
        if quality_score > 0.85:  # 置信度阈值
            print(f"✓ Sufficient context found at level: {level}")
            return {
                "response": response,
                "context_size": size,
                "quality_score": quality_score
            }
    
    # 所有大小都尝试了,返回最佳结果
    print("⚠ All context levels tried, returning best effort")
    return {"response": response, "context_size": size, "quality_score": quality_score}

限制3:成本线性增长

每个token都意味着成本。以GPT-4o为例:

  • 输入:$5 / 1M tokens
  • 输出:$15 / 1M tokens

如果一个系统每天处理10,000个请求,每个请求平均使用10K tokens输入,那么每天的成本是:

10,000 requests/day * 10K tokens/request * $5/1M tokens = $0.5/day (输入)
+ 10,000 requests/day * 2K tokens/response * $15/1M tokens = $0.3/day (输出)
= $0.8/day 基本成本

如果优化上下文,将每个请求的输入降至5K tokens:
10,000 * 5K * $5/1M = $0.25/day (输入)
节省 50% 成本!

解决方案:上下文缓存 + 上下文压缩

class CostOptimizedContextManager:
    """
    成本优化的上下文管理器
    """
    
    def __init__(self, llm_config: dict):
        self.llm_config = llm_config
        self.cache = ContextCache()
        self.compressor = ContextCompressor()
    
    async def get_context_optimized(self, query: str, cache_key: str = None):
        """
        获取成本优化的上下文
        """
        
        # 1. 检查缓存(降低成本)
        if cache_key:
            cached = self.cache.get(cache_key)
            if cached:
                print(f"✓ Context cache hit for key: {cache_key}")
                return cached
        
        # 2. 检索完整上下文
        full_context = await self.retriever.retrieve(query)
        full_tokens = count_tokens(full_context)
        
        # 3. 如果超过成本阈值,进行压缩
        cost_threshold = self.llm_config.get("cost_threshold_per_query", 0.05)
        estimated_cost = (full_tokens / 1000000) * self.llm_config["input_price_per_million"]
        
        if estimated_cost > cost_threshold:
            print(f"Context too large ({full_tokens} tokens), compressing...")
            compressed = self.compressor.compress(
                full_context,
                target_tokens=int(cost_threshold * 1000000 / self.llm_config["input_price_per_million"])
            )
            context = compressed
        else:
            context = full_context
        
        # 4. 缓存结果
        if cache_key:
            self.cache.set(cache_key, context)
        
        return context

突破策略:分层上下文架构(Layered Context Architecture)

这是处理长上下文的最有效架构之一。核心思想是:不是所有信息都需要在同一层级被处理

分层上下文架构:

Layer 0 (Always Present, ~2K tokens): 
  - 系统提示词(角色、行为准则、输出格式)
  - 当前用户输入
  - 优先级:最高(始终保留)

Layer 1 (High Priority, ~4K tokens):
  - 当前任务的直接上下文(问题描述、错误信息、代码片段)
  - 优先级:高(除非超出预算,否则保留)

Layer 2 (Medium Priority, ~8K tokens):
  - 相关历史对话(最近3-5轮)
  - 外部知识库检索结果(最相关的2-3个文档)
  - 优先级:中(可以根据相关性动态选择)

Layer 3 (Low Priority, ~16K tokens):
  - 背景知识(相关但不紧急的信息)
  - 参考文档(可能需要但不一定用得到)
  - 优先级:低(第一个被压缩或丢弃)

Layer 4 (Archive, 不占用实时Context Window):
  - 长期记忆(通过向量检索动态加载)
  - 完整历史(可以回溯但不主动加载)
  - 优先级:存档(按需检索)
class LayeredContextManager:
    """
    分层上下文管理器
    实现分层上下文架构
    """
    
    def __init__(self, max_context_tokens: int = 32000):
        self.max_tokens = max_context_tokens
        self.layers = {
            0: {"name": "core", "max_tokens": 2000, "strategy": "static", "priority": "critical"},
            1: {"name": "task", "max_tokens": 4000, "strategy": "dynamic_high", "priority": "high"},
            2: {"name": "relevant", "max_tokens": 8000, "strategy": "retrieval", "priority": "medium"},
            3: {"name": "background", "max_tokens": 16000, "strategy": "compression", "priority": "low"},
            4: {"name": "archive", "max_tokens": None, "strategy": "vector_search", "priority": "archive"}
        }
    
    def build_context(self, query: str, state: ConversationState) -> str:
        """
        构建分层上下文
        """
        context_layers = {}
        used_tokens = 0
        
        # 从最高优先级层开始处理
        for layer_id in sorted(self.layers.keys()):
            layer = self.layers[layer_id]
            
            # 检查token预算
            remaining_budget = self.max_tokens - used_tokens
            if remaining_budget < 500:  # 保留500 tokens的缓冲
                print(f"⚠ Token budget nearly exhausted, skipping layer {layer_id}")
                break
            
            # 获取该层的上下文
            if layer["strategy"] == "static":
                items = self._get_static_context()
            
            elif layer["strategy"] == "dynamic_high":
                items = self._get_task_context(query, max_tokens=min(layer["max_tokens"], remaining_budget))
            
            elif layer["strategy"] == "retrieval":
                items = self._retrieve_relevant(
                    query,
                    max_tokens=min(layer["max_tokens"], remaining_budget),
                    top_k=5
                )
            
            elif layer["strategy"] == "compression":
                items = self._get_compressed_background(
                    query,
                    max_tokens=min(layer["max_tokens"], remaining_budget)
                )
            
            elif layer["strategy"] == "vector_search":
                # Layer 4 不主动加载,只在需要时检索
                items = []  # 留空,后续按需检索
            
            # 统计token
            layer_tokens = sum(item.get("token_estimate", 0) for item in items)
            
            # 如果超出预算,压缩该层
            if used_tokens + layer_tokens > self.max_tokens:
                items = self._compress_layer_items(items, self.max_tokens - used_tokens)
                layer_tokens = sum(item.get("token_estimate", 0) for item in items)
            
            context_layers[layer_id] = {
                "name": layer["name"],
                "items": items,
                "token_count": layer_tokens
            }
            
            used_tokens += layer_tokens
        
        print(f"✓ Context built: {used_tokens} tokens across {len(context_layers)} layers")
        
        # 渲染为字符串
        return self._render_layers(context_layers)
    
    def _compress_layer_items(self, items: list, max_tokens: int) -> list:
        """
        压缩层内的上下文项以适应token预算
        """
        # 按相关度排序
        items_sorted = sorted(items, key=lambda x: x.get("relevance", 0), reverse=True)
        
        compressed = []
        token_count = 0
        
        for item in items_sorted:
            item_tokens = item.get("token_estimate", 0)
            
            if token_count + item_tokens <= max_tokens:
                compressed.append(item)
                token_count += item_tokens
            else:
                # 尝试压缩单个项
                if item.get("compressible", True):
                    compressed_item = self._compress_single_item(item, max_tokens - token_count)
                    if compressed_item:
                        compressed.append(compressed_item)
                break
        
        return compressed

推荐文章

随机分数html
2025-01-25 10:56:34 +0800 CST
Go 1.23 中的新包:unique
2024-11-18 12:32:57 +0800 CST
liunx服务器监控workerman进程守护
2024-11-18 13:28:44 +0800 CST
用 Rust 构建一个 WebSocket 服务器
2024-11-19 10:08:22 +0800 CST
从Go开发者的视角看Rust
2024-11-18 11:49:49 +0800 CST
程序员茄子在线接单