Context Engineering:超越 Prompt Engineering 的 AI 工程新范式
2026年,AI工程领域正在经历一场静悄悄的革命。当所有人还在讨论如何写好Prompt的时候,顶尖的AI工程团队已经悄然转向了一个新的方向——Context Engineering(上下文工程)。这不是简单的术语替换,而是对AI系统理解、推理和生成能力的根本性重新思考。本文将深入探讨Context Engineering的核心概念、架构设计、实战工具和未来趋势,帮助你在AI工程的新时代保持竞争力。
目录
- 为什么 Prompt Engineering 不够了?
- Context Engineering 是什么?
- 核心概念与架构设计
- Context Window 的深度管理
- 上下文压缩技术详解
- 工具与框架实战
- 生产环境最佳实践
- 性能优化技巧
- 实战案例研究
- 未来展望
为什么 Prompt Engineering 不够了?
Prompt Engineering 的黄金时代与局限
Prompt Engineering(提示工程)在过去三年里一直是AI应用开发的核心技能。我们学会了如何使用Few-shot Learning提升模型表现,如何设计Chain-of-Thought提示词引导推理,如何使用Role Prompting设定模型角色,以及如何避免Prompt Injection攻击。
但是,随着AI应用的复杂度呈指数级提升,Prompt Engineering的局限性日益凸显。让我们通过具体代码示例来理解这些局限:
# 传统 Prompt Engineering 的典型实现
# 这种实现方式在生产环境中会面临严重的问题
import openai
import os
openai.api_key = os.getenv("OPENAI_API_KEY")
def naive_qa_system(question: str) -> str:
"""
最基础的QA系统实现
问题:每次都重新构建完整的prompt,无法积累上下文
"""
prompt = f"""你是一个智能助手,请回答以下问题:
问题:{question}
请仔细思考后给出答案。"""
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
# 这个实现的问题:
# 1. 没有记忆能力 - 每次对话都是全新的开始
# 2. 无法引用历史 - 用户说"接着刚才的话题",系统无法理解
# 3. Token浪费严重 - 每个相关问题都要重新解释背景
# 4. 无法处理复杂任务 - 多步骤推理需要手动拆解
真实生产环境的痛点:
上下文丢失问题:在长对话中,模型会"忘记"早期的重要信息。例如,用户在第一轮说"我是Python新手",在第十轮问"我该用哪个框架?"时,模型可能已经忘记了用户是新手这个信息。
信息检索低效:传统Prompt Engineering无法动态获取外部知识。当问题需要最新的技术文档或特定领域的深度知识时,静态的Prompt无法应对。
推理链不可控:虽然Chain-of-Thought有一定效果,但模型可能跳过关键推理步骤,或者在复杂逻辑链中"迷失方向"。
成本不可控:每次都发送完整上下文导致Token消耗巨大。一个典型的企业级对话系统,每天可能产生数百万Token的费用。
从 Prompt 到 Context 的范式转变
Context Engineering的核心洞察是:大模型的表现不仅仅取决于你"说什么"(Prompt),更取决于你"在什么信息环境中说"(Context)。
这个洞察来源于对大模型工作机制的深入理解。当我们向模型发送一个请求时,模型并不是孤立地理解这个请求,而是会综合考虑:
- 系统提示词(System Prompt)
- 对话历史(Conversation History)
- 用户提供的补充信息(User-provided Context)
- 模型自己生成的中间结果(Intermediate Results)
- 外部工具返回的结果(Tool Results)
所有这些信息的集合,构成了模型眼中的"完整上下文"。Context Engineering就是要系统地设计和管理这个"信息环境"。
让我们通过架构对比图来理解这种范式转变:
传统 Prompt Engineering 视角(线性、静态):
┌─────────────────────────────────┐
│ Prompt (你说了什么) │
│ ↓ │
│ Model (模型处理) │
│ ↓ │
│ Output (输出) │
└─────────────────────────────────┘
问题:信息环境是静态的、一次性的
Context Engineering 视角(动态、系统化):
┌──────────────────────────────────────────────────┐
│ Information Environment (信息环境) │
│ ├─ System Context (系统上下文) │
│ │ - 角色定义、行为准则、输出格式 │
│ ├─ Conversation History (对话历史) │
│ │ - 完整对话记录、摘要、关键点提取 │
│ ├─ External Knowledge (外部知识) │
│ │ - 向量数据库检索结果、API调用结果 │
│ ├─ Tool Results (工具调用结果) │
│ │ - 代码执行结果、搜索引擎返回、数据库查询 │
│ ├─ User Profile (用户画像) │
│ │ - 技能水平、偏好设置、历史行为 │
│ └─ Task State (任务状态) │
│ - 当前步骤、已完成子任务、待解决问题 │
│ ↓ │
│ Dynamic Context Assembly (动态上下文组装) │
│ - 根据当前任务选择相关信息 │
│ - 压缩低价值信息 │
│ - 优化信息排列顺序 │
│ ↓ │
│ Optimized Context Window (优化后的上下文窗口) │
│ ↓ │
│ Model (模型处理) │
│ ↓ │
│ Structured Output + State Update │
│ (结构化输出 + 状态更新,为下一轮做准备) │
└──────────────────────────────────────────────────┘
优势:信息环境是动态的、系统化的、可优化的
Context Engineering 是什么?
正式定义
Context Engineering 是一门系统地设计、管理和优化大语言模型(LLM)信息环境的工程学科。它不是单一的技术或工具,而是一套完整的方法论和最佳实践集合。
Context Engineering包括以下几个核心领域:
1. 上下文感知(Context Awareness)
理解当前任务需要哪些信息,不需要哪些信息。这看起来简单,但实际上需要深刻的任务理解能力。
class ContextAwarenessAnalyzer:
"""
上下文感知分析器:分析任务需要哪些类型的上下文
"""
TASK_CONTEXT_REQUIREMENTS = {
"code_generation": {
"required": ["language_version", "existing_codebase", "dependencies"],
"optional": ["coding_style", "performance_requirements", "examples"],
"excluded": ["unrelated_history", "personal_info"]
},
"technical_qa": {
"required": ["question", "relevant_docs"],
"optional": ["user_skill_level", "related_concepts"],
"excluded": ["outdated_docs", "opinion_not_facts"]
},
"debugging": {
"required": ["error_message", "code_snippet", "environment_info"],
"optional": ["attempted_solutions", "stack_trace"],
"excluded": ["unrelated_code", "speculation"]
}
}
def analyze_task(self, task_description: str) -> dict:
"""
分析任务并返回上下文需求
"""
task_type = self._classify_task(task_description)
requirements = self.TASK_CONTEXT_REQUIREMENTS.get(task_type, {})
return {
"task_type": task_type,
"must_have": requirements.get("required", []),
"nice_to_have": requirements.get("optional", []),
"must_not_have": requirements.get("excluded", [])
}
2. 上下文获取(Context Retrieval)
从各种来源动态获取相关信息。这包括:
- 向量数据库检索:从知识库中找到语义最相关的文档
- 关键词检索:使用BM25等算法进行精确匹配
- API调用:获取实时数据(天气、股票、最新新闻等)
- 代码执行:运行代码获取实际输出
- 用户交互:向用户询问必要的澄清信息
class MultiSourceContextRetriever:
"""
多源上下文检索器
从多个来源并行检索相关信息
"""
def __init__(self):
self.sources = {
"vector_db": VectorDBRetriever(),
"keyword_search": BM25Retriever(),
"api": APIRetriever(),
"code_executor": CodeExecutor(),
"user_interaction": UserInteractionRetriever()
}
async def retrieve(self, query: str, task_type: str) -> dict:
"""
从所有相关源并行检索上下文
"""
import asyncio
# 根据任务类型决定使用哪些源
active_sources = self._select_sources_for_task(task_type)
# 并行检索
tasks = []
for source_name in active_sources:
source = self.sources[source_name]
tasks.append(self._safe_retrieve(source, query))
results = await asyncio.gather(*tasks, return_exceptions=True)
# 整合结果
consolidated = {}
for source_name, result in zip(active_sources, results):
if isinstance(result, Exception):
print(f"Warning: {source_name} retrieval failed: {result}")
continue
consolidated[source_name] = result
return consolidated
async def _safe_retrieve(self, source, query):
"""安全的检索包装器,防止单个源失败影响整体"""
try:
return await source.retrieve(query)
except Exception as e:\n return {"error": str(e), "content": ""}
3. 上下文压缩(Context Compression)
在有限的Context Window内最大化信息密度。这是Context Engineering中最具技术挑战的部分。
上下文压缩的核心思想是:不是所有信息都值得占用宝贵的Context Window空间。我们需要识别并保留高价值信息,压缩或丢弃低价值信息。
压缩技术分为三大类:
- Extractive Compression(抽取式压缩):从原文中抽取关键句子,保持原文措辞
- Abstractive Compression(生成式压缩):用大模型重新表述,保留意思但改变措辞
- Semantic Compression(语义压缩):基于语义相似度聚类,删除冗余信息
我们将在后续章节详细讨论这些技术。
4. 上下文维护(Context Maintenance)
管理长对话中的状态一致性。在多轮对话中,上下文是动态变化的,需要精心维护。
class ContextStateManager:
"""
上下文状态管理器
维护对话状态的一致性
"""
def __init__(self):
self.state = {
"conversation_history": [],
"extracted_facts": {}, # 从对话中提取的关键事实
"user_preferences": {},
"task_progress": {},
"resolved_references": {} # 解决"它"、"这个"等指代
}
def update_state(self, user_message: str, assistant_response: str):
"""更新对话状态"""
# 1. 提取关键事实
new_facts = self._extract_facts(user_message)
self.state["extracted_facts"].update(new_facts)
# 2. 更新用户偏好
preferences = self._infer_preferences(user_message)
self.state["user_preferences"].update(preferences)
# 3. 记录对话历史(带摘要)
self.state["conversation_history"].append({
"user": user_message,
"assistant": assistant_response,
"summary": self._summarize_turn(user_message, assistant_response),
"timestamp": time.time()
})
# 4. 解决指代
self._resolve_references()
def _extract_facts(self, text: str) -> dict:
"""
从文本中提取关键事实
例如:用户说"我叫张三,是Python开发者" → {"name": "张三", "role": "Python开发者"}
"""
extraction_prompt = f"""从以下文本中提取关键事实,以JSON格式返回:
文本:{text}
提取的事实(JSON格式,只包含明确陈述的事实):"""
response = llm(extraction_prompt)
try:
return json.loads(response)
except:
return {}
5. 上下文安全(Context Security)
防止敏感信息泄露和上下文污染。这是企业级应用最关心的部分。
常见安全风险:
- 敏感信息泄露:上下文中的API密钥、密码、个人信息等被模型读取或输出
- 上下文污染:恶意用户通过精心构造的输入,让模型"记住"错误信息
- 跨用户泄露:多用户系统中,一个用户的信息泄露给另一个用户
class ContextSecurityManager:
"""
上下文安全管理器
"""
SENSITIVE_PATTERNS = [
r"sk-[a-zA-Z0-9]{48}", # OpenAI API Key
r"ghp_[a-zA-Z0-9]{36}", # GitHub Personal Access Token
r"\b[\w._%+-]+@[\w.-]+\.\w{2,}\b", # Email
r"\b\d{3}-\d{2}-\d{4}\b", # SSN
r"\b(?:\d{4}[-\s]?){3}\d{4}\b", # Credit Card
]
def __init__(self):
self.sensitive_detectors = [
re.compile(pattern) for pattern in self.SENSITIVE_PATTERNS
]
def sanitize_context(self, context: str) -> str:
"""
清理上下文中的敏感信息
"""
sanitized = context
for detector in self.sensitive_detectors:
sanitized = detector.sub("[REDACTED]", sanitized)
# 使用LLM进行更深层的敏感信息检测
sensitive_check_prompt = f"""检查以下文本是否包含敏感信息(密码、私钥、个人信息等)。
如果包含,返回清理后的文本;如果不包含,返回原文本。
文本:
{sanitized}
清理后的文本:"""
return llm(sensitive_check_prompt)
def isolate_user_context(self, user_id: str, context: dict) -> dict:
"""
确保上下文隔离,防止跨用户泄露
"""
# 为每个用户创建独立的上下文命名空间
return {
"user_id": user_id,
"context": context,
"isolated": True
}
Context Engineering 的核心原则
原则1:信息密度优先
不是"把所有信息都塞进去",而是"只放最有价值的信息"。
这个原则听起来简单,但执行起来需要深刻的理解。让我们看一个对比示例:
# 低信息密度的上下文(Bad Example)
# 这种上下文浪费了大量token在冗余信息上
context_low_density = f"""
用户之前说过:
- 他喜欢蓝色
- 他昨天问了关于Python的问题
- 他今天早上说要去开会
- 他觉得上一个回答太长
- 他在上午10点登录了系统
- 他的浏览器是Chrome
- 他使用了Mac电脑
...(再写5000 tokens of low-density chat history)
现在请回答他的问题:{current_question}
"""
# 问题分析:
# 1. "喜欢蓝色" - 与编程问题无关
# 2. "去开会" - 过时信息,与当前任务无关
# 3. "登录时间"、"浏览器"、"操作系统" - 除非特定场景需要,否则无关
# 4. 没有结构化 - 模型需要自己从头理解哪些信息相关
# 高信息密度的上下文(Good Example)
# 同样的信息量,但高度结构化,只保留相关内容
context_high_density = {
"user_profile": {
"expertise": "intermediate_python",
"preferences": {
"detail_level": "concise", # 从"觉得上一个回答太长"推断
"code_examples": True,
"format": "markdown"
}
},
"conversation_summary": {
"last_topic": "debugging FastAPI async route",
"current_stuck_point": "event loop integration with SQLAlchemy",
"attempted_solutions": ["used async def", "tried asyncio.run()"],
"user_frustration_level": "medium"
},
"relevant_docs": [
{"title": "FastAPI Async Patterns", "relevance": 0.95},
{"title": "Python asyncio Best Practices", "relevance": 0.88}
],
"pending_tasks": [
"explain event loop integration",
"provide working code example"
]
}
# 优势:
# 1. 仅用 800 tokens 承载了等价的信息量(原来需要5000+ tokens)
# 2. 结构化 - 模型可以直接索引到需要的信息
# 3. 可维护 - 添加、删除、修改信息都很清晰
# 4. 可测试 - 可以单独测试每个字段的提取逻辑
原则2:动态上下文组装
上下文不是静态的,而是根据当前任务动态组装的。
不同的任务需要不同类型的上下文。一个代码生成任务需要的上下文,与一个文档摘要任务需要的上下文,截然不同。
class DynamicContextAssembler:
"""
动态上下文组装器
根据任务类型动态组装最优上下文
"""
def __init__(self, max_context_tokens: int = 128000):
self.max_tokens = max_context_tokens
self.token_budget = TokenBudgetManager(max_context_tokens)
self.sources = self._init_context_sources()
def assemble(self, task: Task, state: ConversationState) -> str:
"""
根据任务类型动态组装上下文
核心思路:
1. 识别任务所需的信息类型
2. 为每个信息类型分配token预算
3. 从对应源检索信息
4. 在预算内优化信息排列
"""
# 1. 任务分析:这个任务需要什么信息?
info_requirements = self._analyze_task_requirements(task)
# 2. Token预算分配
# 核心原则:高价值信息获得更多预算
budgets = self._allocate_budget(info_requirements)
# 3. 从各源检索信息
context_parts = {}
for info_type, budget in budgets.items():
source = self.sources.get(info_type)
if source:
# 检索,但不超过预算
items = source.retrieve(
query=task.query,
max_tokens=budget,
relevance_threshold=0.7 # 只保留相关度>0.7的信息
)
context_parts[info_type] = items
# 4. 上下文优化
# 4.1 去重:删除跨源的重复信息
context_parts = self._deduplicate(context_parts)
# 4.2 重排序:按相关度重新排列
context_parts = self._rerank_by_relevance(context_parts, task.query)
# 4.3 截断:如果超过预算,智能截断
context_parts = self._smart_truncate(context_parts, self.max_tokens)
# 5. 组装最终上下文
return self._compose(context_parts, task.type)
def _allocate_budget(self, requirements: dict) -> dict:
"""
Token预算分配算法
策略:
- 必须信息(must_have):分配 60% 预算
- 有用信息(nice_to_have):分配 30% 预算
- 补充信息(supplementary):分配 10% 预算
"""
total_budget = self.max_tokens
must_have = requirements.get("must_have", [])
nice_to_have = requirements.get("nice_to_have", [])
# 必须信息平分60%
must_budget = total_budget * 0.6 / max(len(must_have), 1)
# 有用信息平分30%
nice_budget = total_budget * 0.3 / max(len(nice_to_have), 1)
budgets = {}
for info_type in must_have:
budgets[info_type] = int(must_budget)
for info_type in nice_to_have:
budgets[info_type] = int(nice_budget)
# 剩余10%作为灵活预算
budgets["flexible"] = int(total_budget * 0.1)
return budgets
原则3:多模态上下文统一
2026年的Context Engineering必须处理多种信息形式。
现代AI应用不再只是处理纯文本。一个典型的技术问答可能包含:
- 文本:问题描述、背景说明
- 代码:用户提供的代码片段、错误日志
- 图像:架构图、流程图、截图
- 表格:性能对比数据、配置参数
- API响应:工具调用返回的结构化数据
所有这些都需要统一在上下文管理中。
from dataclasses import dataclass
from typing import Union, List
import base64
import json
@dataclass
class ContextItem:
"""
统一的上下文项
可以表示任何类型的信息
"""
content_type: str # "text" | "code" | "image" | "table" | "api_response" | "error_log"
content: Union[str, bytes, dict]
relevance_score: float # 与当前任务的相关度 0-1
token_estimate: int # 估计占用的token数
metadata: dict # 额外信息(语言、来源、时间戳等)
def to_context_string(self) -> str:
"""将上下文项转换为模型可理解的字符串表示"""
if self.content_type == "text":
return f"""[TEXT]
{self.content}
[/TEXT]"""
elif self.content_type == "code":
lang = self.metadata.get("language", "python")
return f"""[CODE language={lang}]
{self.content}
[/CODE]"""
elif self.content_type == "image":
# 图像需要特殊处理:转换为base64或使用多模态模型的图像输入
if isinstance(self.content, bytes):
img_base64 = base64.b64encode(self.content).decode()
return f"""[IMAGE]
{img_base64}
[/IMAGE]"""
else:
return f"""[IMAGE description]
{self.content}
[/IMAGE description]"""
elif self.content_type == "table":
# 表格转换为Markdown格式
if isinstance(self.content, dict):
# dict转换为表格
table_md = "| Key | Value |\n|-----|-------|\n"
for k, v in self.content.items():
table_md += f"| {k} | {v} |\n"
return f"""[TABLE]
{table_md}
[/TABLE]"""
else:
return f"""[TABLE]
{self.content}
[/TABLE]"""
elif self.content_type == "api_response":
# API响应格式化为JSON
try:
response_json = json.dumps(self.content, indent=2, ensure_ascii=False)
return f"""[API Response]
{response_json}
[/API Response]"""
except:
return f"""[API Response]
{str(self.content)}
[/API Response]"""
elif self.content_type == "error_log":
return f"""[Error Log]
{self.content}
[/Error Log]"""
else:
return f"""[Unknown Type: {self.content_type}]
{str(self.content)}
[/Unknown Type]"""
核心概念与架构设计
Context Window 的物理限制与突破策略
现代大模型的Context Window已经扩展到128K、200K甚至1M tokens(如Claude 3.5的200K、Gemini 2.0的1M)。但这并不意味着你可以无脑填充。原因如下:
限制1:注意力分散(Lost in the Middle)
研究表明,当上下文过长时,模型对上下文开头和结尾的内容记忆较好,但对中间部分的内容记忆较差。这个现象被称为"Lost in the Middle"。
# 实验:验证 Lost in the Middle 现象
def test_lost_in_the_middle(model, context_length: int = 100000):
"""
测试模型在长上下文中的信息检索能力
"""
# 构造一个长上下文,在随机位置插入一个关键信息
import random
context_parts = ["这是一段普通文本。" * 1000 for _ in range(context_length // 1000)]
# 在随机位置插入关键信息
key_position = random.randint(0, len(context_parts) - 1)
key_info = "重要信息:用户密码是 ABC123XYZ"
context_parts[key_position] = key_info
long_context = "\n".join(context_parts)
# 让模型回答
prompt = f"""{long_context}
问题:用户密码是什么?
回答:"""
response = model.generate(prompt)
# 检查模型是否能找到关键信息
if "ABC123XYZ" in response:
print(f"✓ 在位置 {key_position} 的关键信息被成功检索")
return True
else:
print(f"✗ 在位置 {key_position} 的关键信息未被检索")
return False
# 测试结果(典型):
# 位置 0 (开头): 95% 成功率
# 位置 50% (中间): 40% 成功率 ← Lost in the Middle
# 位置 100% (结尾): 90% 成功率
解决方案:
- 注意力锚点(Attention Anchors):在长上下文中定期插入提醒,帮助模型关注关键信息
- 信息重构(Information Restructuring):将最关键的信息放在开头或结尾
- 分层上下文(Layered Context):不是一次性发送所有信息,而是分层发送
def insert_attention_anchors(context: str, num_anchors: int = 3) -> str:
"""
在长上下文中插入注意力锚点
"""
lines = context.split('\n')
total_lines = len(lines)
# 在上下文的 25%, 50%, 75% 位置插入锚点
anchor_positions = [
int(total_lines * 0.25),
int(total_lines * 0.5),
int(total_lines * 0.75)
]
anchored_lines = []
for i, line in enumerate(lines):
if i in anchor_positions:
anchored_lines.append("\n" + "="*60)
anchored_lines.append("⚠️ attention anchor: 以上是重要的背景信息,请仔细理解")
anchored_lines.append("="*60 + "\n")
anchored_lines.append(line)
return '\n'.join(anchored_lines)
# 更高级的策略:使用结构化格式
def build_structured_long_context(sections: List[dict]) -> str:
"""
使用明确的章节结构帮助模型理解长上下文
"""
context_parts = []
for i, section in enumerate(sections):
# 每个section用特殊的分隔符和标记包裹
section_text = f"""
{'='*80}
SECTION {i+1}: {section['title']}
重要性: {'⭐' * section.get('importance', 3)}
{'='*80}
{section['content']}
{'='*80}
END OF SECTION {i+1}
{'='*80}
"""
context_parts.append(section_text)
# 在最后添加一个总结性的"元信息"
context_parts.append("""
{'='*80}
CONTEXT SUMMARY:
请在回答时参考以上所有SECTION,特别关注标记为⭐⭐⭐的内容。
如果某个SECTION不相关,可以忽略。
{'='*80}
""")
return '\n'.join(context_parts)
限制2:推理质量下降
过量上下文会导致模型产生"幻觉"或遗漏关键细节。这是因为模型需要在更多信息中区分信号和噪声。
解决方案:渐进式上下文展开
async def progressive_context_expansion(query: str, llm):
"""
渐进式上下文展开:从最小上下文开始,逐步增加直到得到满意答案
"""
context_sizes = [
(4000, "minimal"), # 最小上下文:只有最核心信息
(16000, "standard"), # 标准上下文:加上相关文档
(32000, "extended"), # 扩展上下文:加上历史对话
(64000, "comprehensive") # 完整上下文:加上所有可用信息
]
for size, level in context_sizes:
print(f"Trying context level: {level} ({size} tokens)")
# 组装对应大小的上下文
context = assembler.assemble(
query,
max_tokens=size,
strategy=level
)
# 生成回答
response = await llm.agenerate(
prompt=build_prompt(query, context),
max_tokens=4096
)
# 评估回答质量
quality_score = evaluate_response_quality(response, query)
if quality_score > 0.85: # 置信度阈值
print(f"✓ Sufficient context found at level: {level}")
return {
"response": response,
"context_size": size,
"quality_score": quality_score
}
# 所有大小都尝试了,返回最佳结果
print("⚠ All context levels tried, returning best effort")
return {"response": response, "context_size": size, "quality_score": quality_score}
限制3:成本线性增长
每个token都意味着成本。以GPT-4o为例:
- 输入:$5 / 1M tokens
- 输出:$15 / 1M tokens
如果一个系统每天处理10,000个请求,每个请求平均使用10K tokens输入,那么每天的成本是:
10,000 requests/day * 10K tokens/request * $5/1M tokens = $0.5/day (输入)
+ 10,000 requests/day * 2K tokens/response * $15/1M tokens = $0.3/day (输出)
= $0.8/day 基本成本
如果优化上下文,将每个请求的输入降至5K tokens:
10,000 * 5K * $5/1M = $0.25/day (输入)
节省 50% 成本!
解决方案:上下文缓存 + 上下文压缩
class CostOptimizedContextManager:
"""
成本优化的上下文管理器
"""
def __init__(self, llm_config: dict):
self.llm_config = llm_config
self.cache = ContextCache()
self.compressor = ContextCompressor()
async def get_context_optimized(self, query: str, cache_key: str = None):
"""
获取成本优化的上下文
"""
# 1. 检查缓存(降低成本)
if cache_key:
cached = self.cache.get(cache_key)
if cached:
print(f"✓ Context cache hit for key: {cache_key}")
return cached
# 2. 检索完整上下文
full_context = await self.retriever.retrieve(query)
full_tokens = count_tokens(full_context)
# 3. 如果超过成本阈值,进行压缩
cost_threshold = self.llm_config.get("cost_threshold_per_query", 0.05)
estimated_cost = (full_tokens / 1000000) * self.llm_config["input_price_per_million"]
if estimated_cost > cost_threshold:
print(f"Context too large ({full_tokens} tokens), compressing...")
compressed = self.compressor.compress(
full_context,
target_tokens=int(cost_threshold * 1000000 / self.llm_config["input_price_per_million"])
)
context = compressed
else:
context = full_context
# 4. 缓存结果
if cache_key:
self.cache.set(cache_key, context)
return context
突破策略:分层上下文架构(Layered Context Architecture)
这是处理长上下文的最有效架构之一。核心思想是:不是所有信息都需要在同一层级被处理。
分层上下文架构:
Layer 0 (Always Present, ~2K tokens):
- 系统提示词(角色、行为准则、输出格式)
- 当前用户输入
- 优先级:最高(始终保留)
Layer 1 (High Priority, ~4K tokens):
- 当前任务的直接上下文(问题描述、错误信息、代码片段)
- 优先级:高(除非超出预算,否则保留)
Layer 2 (Medium Priority, ~8K tokens):
- 相关历史对话(最近3-5轮)
- 外部知识库检索结果(最相关的2-3个文档)
- 优先级:中(可以根据相关性动态选择)
Layer 3 (Low Priority, ~16K tokens):
- 背景知识(相关但不紧急的信息)
- 参考文档(可能需要但不一定用得到)
- 优先级:低(第一个被压缩或丢弃)
Layer 4 (Archive, 不占用实时Context Window):
- 长期记忆(通过向量检索动态加载)
- 完整历史(可以回溯但不主动加载)
- 优先级:存档(按需检索)
class LayeredContextManager:
"""
分层上下文管理器
实现分层上下文架构
"""
def __init__(self, max_context_tokens: int = 32000):
self.max_tokens = max_context_tokens
self.layers = {
0: {"name": "core", "max_tokens": 2000, "strategy": "static", "priority": "critical"},
1: {"name": "task", "max_tokens": 4000, "strategy": "dynamic_high", "priority": "high"},
2: {"name": "relevant", "max_tokens": 8000, "strategy": "retrieval", "priority": "medium"},
3: {"name": "background", "max_tokens": 16000, "strategy": "compression", "priority": "low"},
4: {"name": "archive", "max_tokens": None, "strategy": "vector_search", "priority": "archive"}
}
def build_context(self, query: str, state: ConversationState) -> str:
"""
构建分层上下文
"""
context_layers = {}
used_tokens = 0
# 从最高优先级层开始处理
for layer_id in sorted(self.layers.keys()):
layer = self.layers[layer_id]
# 检查token预算
remaining_budget = self.max_tokens - used_tokens
if remaining_budget < 500: # 保留500 tokens的缓冲
print(f"⚠ Token budget nearly exhausted, skipping layer {layer_id}")
break
# 获取该层的上下文
if layer["strategy"] == "static":
items = self._get_static_context()
elif layer["strategy"] == "dynamic_high":
items = self._get_task_context(query, max_tokens=min(layer["max_tokens"], remaining_budget))
elif layer["strategy"] == "retrieval":
items = self._retrieve_relevant(
query,
max_tokens=min(layer["max_tokens"], remaining_budget),
top_k=5
)
elif layer["strategy"] == "compression":
items = self._get_compressed_background(
query,
max_tokens=min(layer["max_tokens"], remaining_budget)
)
elif layer["strategy"] == "vector_search":
# Layer 4 不主动加载,只在需要时检索
items = [] # 留空,后续按需检索
# 统计token
layer_tokens = sum(item.get("token_estimate", 0) for item in items)
# 如果超出预算,压缩该层
if used_tokens + layer_tokens > self.max_tokens:
items = self._compress_layer_items(items, self.max_tokens - used_tokens)
layer_tokens = sum(item.get("token_estimate", 0) for item in items)
context_layers[layer_id] = {
"name": layer["name"],
"items": items,
"token_count": layer_tokens
}
used_tokens += layer_tokens
print(f"✓ Context built: {used_tokens} tokens across {len(context_layers)} layers")
# 渲染为字符串
return self._render_layers(context_layers)
def _compress_layer_items(self, items: list, max_tokens: int) -> list:
"""
压缩层内的上下文项以适应token预算
"""
# 按相关度排序
items_sorted = sorted(items, key=lambda x: x.get("relevance", 0), reverse=True)
compressed = []
token_count = 0
for item in items_sorted:
item_tokens = item.get("token_estimate", 0)
if token_count + item_tokens <= max_tokens:
compressed.append(item)
token_count += item_tokens
else:
# 尝试压缩单个项
if item.get("compressible", True):
compressed_item = self._compress_single_item(item, max_tokens - token_count)
if compressed_item:
compressed.append(compressed_item)
break
return compressed
上下文压缩技术详解
当信息量超过Context Window限制时,需要智能压缩而非简单截断。上下文压缩是Context Engineering中最具技术深度的领域之一。
技术1:Extractive Compression(抽取式压缩)
原理:从原文中抽取关键句子,保持原文措辞。适合需要准确保留技术细节的场景。
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
import torch
class ExtractiveCompressor:
"""
抽取式压缩器
使用小型模型提取关键信息
"""
def __init__(self, model_name: str = "google/flan-t5-small"):
"""
初始化压缩模型
flan-t5-small 只有77M参数,速度快,适合实时压缩
"""
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name).to(self.device)
# 对于中文,可以使用:hfl/chinese-roberta-wwm-ext
# 对于代码,可以使用:microsoft/codebert-base
def compress(self, text: str, target_ratio: float = 0.3, preserve_code: bool = True) -> str:
"""
将文本压缩到原来的target_ratio比例
Args:
text: 原始文本
target_ratio: 目标压缩比(0.3表示保留30%)
preserve_code: 是否保留代码块完整
"""
# 1. 如果需要保留代码块,先分离代码和文本
if preserve_code:
import re
code_blocks = re.findall(r'```(\w+)\n(.*?)\n```', text, re.DOTALL)
text_without_code = re.sub(r'```.*?\n.*?\n```', ' [CODE_BLOCK] ', text, flags=re.DOTALL)
else:
text_without_code = text
code_blocks = []
# 2. 将文本分成句子
sentences = self._split_into_sentences(text_without_code)
if len(sentences) <= 3:
# 文本太短,不需要压缩
compressed_text = text_without_code
else:
# 3. 使用模型对每个句子进行重要性评分
sentence_scores = self._score_sentences(sentences, text_without_code)
# 4. 选择得分最高的句子(保留target_ratio比例)
num_to_keep = max(1, int(len(sentences) * target_ratio))
top_indices = sorted(range(len(sentence_scores)), key=lambda i: sentence_scores[i], reverse=True)[:num_to_keep]
top_indices.sort() # 保持原文顺序
# 5. 重组文本
compressed_text = " ".join([sentences[i] for i in top_indices])
# 6. 将代码块插回
if preserve_code and code_blocks:
for lang, code in code_blocks:
code_block = f"\n```{lang}\n{code}\n```"
compressed_text = compressed_text.replace('[CODE_BLOCK]', code_block, 1)
return compressed_text
def _split_into_sentences(self, text: str) -> List[str]:
"""将文本分割成句子"""
# 简单的句子分割(可以改用nltk或spacy)
import re
sentences = re.split(r'(?<=[.!?])\s+', text)
return [s.strip() for s in sentences if s.strip()]
def _score_sentences(self, sentences: List[str], original_text: str) -> List[float]:
"""
对每个句子进行重要性评分
评分标准:
1. 包含关键词的句子得分高
2. 出现在段首/段尾的句子得分高
3. 包含数字、代码示例的句子得分高
"""
scores = []
for i, sentence in enumerate(sentences):
score = 0.0
# 标准1:位置权重
if i == 0 or i == len(sentences) - 1:
score += 0.3 # 段首段尾
# 标准2:关键词检测
keywords = ["重要", "关键", "注意", "必须", "important", "key", "note", "must"]
if any(kw in sentence.lower() for kw in keywords):
score += 0.3
# 标准3:包含代码或数字
if "```" in sentence or any(c.isdigit() for c in sentence):
score += 0.2
# 标准4:句子长度(太长或太短的句子可能不太重要)
length = len(sentence.split())
if 5 <= length <= 30:
score += 0.2
scores.append(min(score, 1.0))
return scores
技术2:Abstractive Compression(生成式压缩)
原理:用大模型重新表述,保留意思但改变措辞。适合需要大幅压缩且可以接受一定信息损失的场景。
def abstractive_compress_with_code_preservation(text: str, llm, target_ratio: float = 0.4) -> str:
"""
生成式压缩,但保留所有代码块完整
这种压缩方式的优势:
- 可以大幅压缩(40-60%压缩比)
- 生成的摘要更连贯
- 可以重写技术表述使其更清晰
劣势:
- 可能丢失细节
- 需要调用LLM,有成本和延迟
"""
import re
# 1. 分离代码块和文本
code_blocks = re.findall(r'(```[\w]*\n.*?\n```)', text, re.DOTALL)
text_with_placeholders = text
for i, block in enumerate(code_blocks):
placeholder = f"\n[CODE_BLOCK_{i}]\n"
text_with_placeholders = text_with_placeholders.replace(block, placeholder, 1)
# 2. 压缩文本部分
compression_prompt = f"""请将以下技术文档压缩到原来长度的{int(target_ratio*100)}%,要求:
1. 保留所有关键技术细节、参数名称、API接口名称
2. 保留所有的因果关系和逻辑推理链
3. 删除冗余的解释和重复的示例
4. 保持技术准确性,不要简化技术概念
5. 输出格式为Markdown
文档:
{text_with_placeholders}
压缩后的文档:"""
compressed_text = llm(compression_prompt)
# 3. 将代码块插回
for i, block in enumerate(code_blocks):
placeholder = f"[CODE_BLOCK_{i}]"
if placeholder in compressed_text:
compressed_text = compressed_text.replace(placeholder, block, 1)
else:
# 如果占位符被模型删除了,追加到末尾
compressed_text += f"\n{block}\n"
return compressed_text
# 批量压缩多个文档
async def batch_abstractive_compress(documents: List[str], llm, max_concurrent: int = 3) -> List[str]:
"""
批量压缩多个文档,使用信号量控制并发
"""
import asyncio
from asyncio import Semaphore
semaphore = Semaphore(max_concurrent)
async def compress_one(doc: str) -> str:
async with semaphore:
return await llm.agenerate(
prompt=f"压缩以下文档到50%长度,保留关键技术信息:\n\n{doc}",
max_tokens=len(doc) // 2
)
tasks = [compress_one(doc) for doc in documents]
results = await asyncio.gather(*tasks)
return results
技术3:Semantic Compression(语义压缩)
原理:利用嵌入向量进行语义层面的压缩。将语义相似的句子聚类,每类只保留最具代表性的一个。
import numpy as np
from sklearn.cluster import KMeans
from sentence_transformers import SentenceTransformer
class SemanticCompressor:
"""
基于语义相似度的上下文压缩
适用场景:
- 上下文中有大量重复或高度相似的信息
- 需要从多个角度的综合信息中提取核心观点
"""
def __init__(self, embedding_model_name: str = "BAAI/bge-large-zh-v1.5"):
"""
初始化语义压缩器
Args:
embedding_model_name: 嵌入模型名称
- 中文:BAAI/bge-large-zh-v1.5
- 英文:sentence-transformers/all-MiniLM-L6-v2
- 代码:microsoft/codebert-base
"""
self.embedder = SentenceTransformer(embedding_model_name)
def compress_by_clustering(self, texts: List[str], n_clusters: int = 5, preserve_top_k: int = 1) -> List[str]:
"""
将相似的文本聚类,每类只保留最具代表性的一个(或top_k个)
Args:
texts: 文本列表
n_clusters: 聚类数量(目标压缩比 = preserve_top_k / n_clusters)
preserve_top_k: 每类保留几个样本
"""
if len(texts) <= n_clusters:
# 文本数量小于聚类数,无需压缩
return texts
# 1. 计算所有文本的嵌入
embeddings = self.embedder.encode(texts)
# 2. KMeans聚类
n_clusters = min(n_clusters, len(texts))
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
labels = kmeans.fit_predict(embeddings)
# 3. 每类选择距离质心最近的文本(最具代表性的)
compressed = []
for i in range(n_clusters):
cluster_indices = np.where(labels == i)[0]
if len(cluster_indices) == 0:
continue
cluster_embeddings = embeddings[cluster_indices]
centroid = kmeans.cluster_centers_[i]
# 计算所有样本到质心的距离
distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
# 选择距离最小的top_k个
top_k_indices = cluster_indices[np.argsort(distances)[:preserve_top_k]]
for idx in top_k_indices:
compressed.append(texts[idx])
return compressed
def compress_by_similarity_threshold(self, texts: List[str], similarity_threshold: float = 0.85) -> List[str]:
"""
基于相似度阈值的压缩:删除与其他文本高度相似的文本
Args:
texts: 文本列表
similarity_threshold: 相似度阈值,超过此值认为是重复
"""
embeddings = self.embedder.encode(texts)
kept_indices = set([0]) # 至少保留第一个
for i in range(1, len(texts)):
# 计算当前文本与所有已保留文本的相似度
similarities = [
np.dot(embeddings[i], embeddings[j]) /
(np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[j]))
for j in kept_indices
]
max_similarity = max(similarities) if similarities else 0
if max_similarity < similarity_threshold:
# 与所有已保留文本都不太相似,保留
kept_indices.add(i)
return [texts[i] for i in sorted(kept_indices)]
技术4:LLMLingua-style Compression(基于小模型的极致压缩)
这种方法使用一个小模型(如GPT-2)来预测大模型(如GPT-4)会关注哪些token,然后只保留这些token。
class LLMLinguaCompressor:
"""
基于LLMLingua论文的压缩方法
核心思想:
使用一个小模型(如GPT-2)计算每个token的重要性,
然后只保留最重要的token(可以是30-50%)
论文:LLMLingua: Compressing Prompts for Accelerated Inference of Large Language Models
"""
def __init__(self, small_model_name: str = "gpt2"):
from transformers import GPT2LMHeadModel, GPT2Tokenizer
self.tokenizer = GPT2Tokenizer.from_pretrained(small_model_name)
self.model = GPT2LMHeadModel.from_pretrained(small_model_name)
self.model.eval()
def compress(self, text: str, compression_ratio: float = 0.5) -> str:
"""
压缩文本到指定比例
Args:
text: 原始文本
compression_ratio: 压缩比(0.5表示保留50%的token)
"""
# 1. Tokenize
tokens = self.tokenizer.encode(text)
if len(tokens) <= 10:
return text # 太短,无需压缩
# 2. 计算每个token的重要性(使用perplexity)
import torch
token_importance = []
for i in range(len(tokens)):
# 计算包含这个token时的perplexity
with torch.no_grad():
input_ids = torch.tensor([tokens[:i+1]])
outputs = self.model(input_ids)
logits = outputs.logits
# 计算这个token的log probability
probs = torch.softmax(logits[0, -1, :], dim=0)
token_prob = probs[tokens[i]]
# 重要性 = -log(probability) (概率越低,越重要)
importance = -torch.log(token_prob + 1e-10).item()
token_importance.append((i, importance))
# 3. 选择最重要的token(保留compression_ratio比例)
token_importance.sort(key=lambda x: x[1], reverse=True)
num_to_keep = max(1, int(len(tokens) * compression_ratio))
kept_indices = sorted([idx for idx, _ in token_importance[:num_to_keep]])
# 4. 重构文本(只保留选中的token)
kept_tokens = [tokens[i] for i in kept_indices]
# 注意:这种方法可能会破坏语法,需要后续处理
compressed_text = self.tokenizer.decode(kept_tokens)
return compressed_text
核心概念与架构设计
Context Window 的物理限制与突破策略
现代大模型的Context Window已经扩展到128K、200K甚至1M tokens(如Claude 3.5的200K、Gemini 2.0的1M)。但这并不意味着你可以无脑填充。原因如下:
限制1:注意力分散(Lost in the Middle)
研究表明,当上下文过长时,模型对上下文开头和结尾的内容记忆较好,但对中间部分的内容记忆较差。这个现象被称为"Lost in the Middle"。
# 实验:验证 Lost in the Middle 现象
def test_lost_in_the_middle(model, context_length: int = 100000):
"""
测试模型在长上下文中的信息检索能力
"""
# 构造一个长上下文,在随机位置插入一个关键信息
import random
context_parts = ["这是一段普通文本。" * 1000 for _ in range(context_length // 1000)]
# 在随机位置插入关键信息
key_position = random.randint(0, len(context_parts) - 1)
key_info = "重要信息:用户密码是 ABC123XYZ"
context_parts[key_position] = key_info
long_context = "\n".join(context_parts)
# 让模型回答
prompt = f"""{long_context}
问题:用户密码是什么?
回答:"""
response = model.generate(prompt)
# 检查模型是否能找到关键信息
if "ABC123XYZ" in response:
print(f"✓ 在位置 {key_position} 的关键信息被成功检索")
return True
else:
print(f"✗ 在位置 {key_position} 的关键信息未被检索")
return False
# 测试结果(典型):
# 位置 0 (开头): 95% 成功率
# 位置 50% (中间): 40% 成功率 ← Lost in the Middle
# 位置 100% (结尾): 90% 成功率
解决方案:
- 注意力锚点(Attention Anchors):在长上下文中定期插入提醒,帮助模型关注关键信息
- 信息重构(Information Restructuring):将最关键的信息放在开头或结尾
- 分层上下文(Layered Context):不是一次性发送所有信息,而是分层发送
def insert_attention_anchors(context: str, num_anchors: int = 3) -> str:
"""
在长上下文中插入注意力锚点
"""
lines = context.split('\n')
total_lines = len(lines)
# 在上下文的 25%, 50%, 75% 位置插入锚点
anchor_positions = [
int(total_lines * 0.25),
int(total_lines * 0.5),
int(total_lines * 0.75)
]
anchored_lines = []
for i, line in enumerate(lines):
if i in anchor_positions:
anchored_lines.append("\n" + "="*60)
anchored_lines.append("⚠️ attention anchor: 以上是重要的背景信息,请仔细理解")
anchored_lines.append("="*60 + "\n")
anchored_lines.append(line)
return '\n'.join(anchored_lines)
# 更高级的策略:使用结构化格式
def build_structured_long_context(sections: List[dict]) -> str:
"""
使用明确的章节结构帮助模型理解长上下文
"""
context_parts = []
for i, section in enumerate(sections):
# 每个section用特殊的分隔符和标记包裹
section_text = f"""
{'='*80}
SECTION {i+1}: {section['title']}
重要性: {'⭐' * section.get('importance', 3)}
{'='*80}
{section['content']}
{'='*80}
END OF SECTION {i+1}
{'='*80}
"""
context_parts.append(section_text)
# 在最后添加一个总结性的"元信息"
context_parts.append("""
{'='*80}
CONTEXT SUMMARY:
请在回答时参考以上所有SECTION,特别关注标记为⭐⭐⭐的内容。
如果某个SECTION不相关,可以忽略。
{'='*80}
""")
return '\n'.join(context_parts)
限制2:推理质量下降
过量上下文会导致模型产生"幻觉"或遗漏关键细节。这是因为模型需要在更多信息中区分信号和噪声。
解决方案:渐进式上下文展开
async def progressive_context_expansion(query: str, llm):
"""
渐进式上下文展开:从最小上下文开始,逐步增加直到得到满意答案
"""
context_sizes = [
(4000, "minimal"), # 最小上下文:只有最核心信息
(16000, "standard"), # 标准上下文:加上相关文档
(32000, "extended"), # 扩展上下文:加上历史对话
(64000, "comprehensive") # 完整上下文:加上所有可用信息
]
for size, level in context_sizes:
print(f"Trying context level: {level} ({size} tokens)")
# 组装对应大小的上下文
context = assembler.assemble(
query,
max_tokens=size,
strategy=level
)
# 生成回答
response = await llm.agenerate(
prompt=build_prompt(query, context),
max_tokens=4096
)
# 评估回答质量
quality_score = evaluate_response_quality(response, query)
if quality_score > 0.85: # 置信度阈值
print(f"✓ Sufficient context found at level: {level}")
return {
"response": response,
"context_size": size,
"quality_score": quality_score
}
# 所有大小都尝试了,返回最佳结果
print("⚠ All context levels tried, returning best effort")
return {"response": response, "context_size": size, "quality_score": quality_score}
限制3:成本线性增长
每个token都意味着成本。以GPT-4o为例:
- 输入:$5 / 1M tokens
- 输出:$15 / 1M tokens
如果一个系统每天处理10,000个请求,每个请求平均使用10K tokens输入,那么每天的成本是:
10,000 requests/day * 10K tokens/request * $5/1M tokens = $0.5/day (输入)
+ 10,000 requests/day * 2K tokens/response * $15/1M tokens = $0.3/day (输出)
= $0.8/day 基本成本
如果优化上下文,将每个请求的输入降至5K tokens:
10,000 * 5K * $5/1M = $0.25/day (输入)
节省 50% 成本!
解决方案:上下文缓存 + 上下文压缩
class CostOptimizedContextManager:
"""
成本优化的上下文管理器
"""
def __init__(self, llm_config: dict):
self.llm_config = llm_config
self.cache = ContextCache()
self.compressor = ContextCompressor()
async def get_context_optimized(self, query: str, cache_key: str = None):
"""
获取成本优化的上下文
"""
# 1. 检查缓存(降低成本)
if cache_key:
cached = self.cache.get(cache_key)
if cached:
print(f"✓ Context cache hit for key: {cache_key}")
return cached
# 2. 检索完整上下文
full_context = await self.retriever.retrieve(query)
full_tokens = count_tokens(full_context)
# 3. 如果超过成本阈值,进行压缩
cost_threshold = self.llm_config.get("cost_threshold_per_query", 0.05)
estimated_cost = (full_tokens / 1000000) * self.llm_config["input_price_per_million"]
if estimated_cost > cost_threshold:
print(f"Context too large ({full_tokens} tokens), compressing...")
compressed = self.compressor.compress(
full_context,
target_tokens=int(cost_threshold * 1000000 / self.llm_config["input_price_per_million"])
)
context = compressed
else:
context = full_context
# 4. 缓存结果
if cache_key:
self.cache.set(cache_key, context)
return context
突破策略:分层上下文架构(Layered Context Architecture)
这是处理长上下文的最有效架构之一。核心思想是:不是所有信息都需要在同一层级被处理。
分层上下文架构:
Layer 0 (Always Present, ~2K tokens):
- 系统提示词(角色、行为准则、输出格式)
- 当前用户输入
- 优先级:最高(始终保留)
Layer 1 (High Priority, ~4K tokens):
- 当前任务的直接上下文(问题描述、错误信息、代码片段)
- 优先级:高(除非超出预算,否则保留)
Layer 2 (Medium Priority, ~8K tokens):
- 相关历史对话(最近3-5轮)
- 外部知识库检索结果(最相关的2-3个文档)
- 优先级:中(可以根据相关性动态选择)
Layer 3 (Low Priority, ~16K tokens):
- 背景知识(相关但不紧急的信息)
- 参考文档(可能需要但不一定用得到)
- 优先级:低(第一个被压缩或丢弃)
Layer 4 (Archive, 不占用实时Context Window):
- 长期记忆(通过向量检索动态加载)
- 完整历史(可以回溯但不主动加载)
- 优先级:存档(按需检索)
class LayeredContextManager:
"""
分层上下文管理器
实现分层上下文架构
"""
def __init__(self, max_context_tokens: int = 32000):
self.max_tokens = max_context_tokens
self.layers = {
0: {"name": "core", "max_tokens": 2000, "strategy": "static", "priority": "critical"},
1: {"name": "task", "max_tokens": 4000, "strategy": "dynamic_high", "priority": "high"},
2: {"name": "relevant", "max_tokens": 8000, "strategy": "retrieval", "priority": "medium"},
3: {"name": "background", "max_tokens": 16000, "strategy": "compression", "priority": "low"},
4: {"name": "archive", "max_tokens": None, "strategy": "vector_search", "priority": "archive"}
}
def build_context(self, query: str, state: ConversationState) -> str:
"""
构建分层上下文
"""
context_layers = {}
used_tokens = 0
# 从最高优先级层开始处理
for layer_id in sorted(self.layers.keys()):
layer = self.layers[layer_id]
# 检查token预算
remaining_budget = self.max_tokens - used_tokens
if remaining_budget < 500: # 保留500 tokens的缓冲
print(f"⚠ Token budget nearly exhausted, skipping layer {layer_id}")
break
# 获取该层的上下文
if layer["strategy"] == "static":
items = self._get_static_context()
elif layer["strategy"] == "dynamic_high":
items = self._get_task_context(query, max_tokens=min(layer["max_tokens"], remaining_budget))
elif layer["strategy"] == "retrieval":
items = self._retrieve_relevant(
query,
max_tokens=min(layer["max_tokens"], remaining_budget),
top_k=5
)
elif layer["strategy"] == "compression":
items = self._get_compressed_background(
query,
max_tokens=min(layer["max_tokens"], remaining_budget)
)
elif layer["strategy"] == "vector_search":
# Layer 4 不主动加载,只在需要时检索
items = [] # 留空,后续按需检索
# 统计token
layer_tokens = sum(item.get("token_estimate", 0) for item in items)
# 如果超出预算,压缩该层
if used_tokens + layer_tokens > self.max_tokens:
items = self._compress_layer_items(items, self.max_tokens - used_tokens)
layer_tokens = sum(item.get("token_estimate", 0) for item in items)
context_layers[layer_id] = {
"name": layer["name"],
"items": items,
"token_count": layer_tokens
}
used_tokens += layer_tokens
print(f"✓ Context built: {used_tokens} tokens across {len(context_layers)} layers")
# 渲染为字符串
return self._render_layers(context_layers)
def _compress_layer_items(self, items: list, max_tokens: int) -> list:
"""
压缩层内的上下文项以适应token预算
"""
# 按相关度排序
items_sorted = sorted(items, key=lambda x: x.get("relevance", 0), reverse=True)
compressed = []
token_count = 0
for item in items_sorted:
item_tokens = item.get("token_estimate", 0)
if token_count + item_tokens <= max_tokens:
compressed.append(item)
token_count += item_tokens
else:
# 尝试压缩单个项
if item.get("compressible", True):
compressed_item = self._compress_single_item(item, max_tokens - token_count)
if compressed_item:
compressed.append(compressed_item)
break
return compressed