DeerFlow 2.0 深度解析:63K Star 的超级智能体执行底座,从架构哲学到生产实战
写在前面
2026 年 2 月 28 日,字节跳动把 DeerFlow 2.0 推向开源社区,24 小时内冲上 GitHub Trending 第一名,三个月后 Star 数突破 6.3 万,Fork 超 8300。这不是又一个套壳聊天机器人——它是一个从底层重新定义"AI Agent 该怎么干活"的超级智能体执行底座(Super Agent Harness)。
作为一个从 1.0 时代就在关注 DeerFlow 的开发者,我想说的不是"又一个框架",而是一个更本质的问题:当 AI 从"对话工具"走向"执行系统",我们到底需要什么样的基础设施?
这篇文章不搬运 README,不罗列功能清单。我会从架构设计的底层逻辑出发,拆解 DeerFlow 2.0 的每一个关键决策,配合完整的代码实战,让你真正理解它为什么能成为 2026 年最火的 AI Agent 框架。
一、从 Deep Research 到 Super Agent Harness:一次范式跃迁
1.1 1.0 的局限:研究助手的天花板
DeerFlow 1.0 的定位很清晰——深度研究助手(Deep Exploration and Efficient Research Flow)。它能把一个研究问题拆成子任务,自动搜索、汇总、生成带引用的研究报告。在 2025 年 5 月开源后,社区跑出了很多意想不到的用法:数据管道、幻灯片生成、仪表盘搭建、内容自动化工作流……
这暴露了一个核心矛盾:DeerFlow 1.0 是一个"你把它拼起来"的框架,不是一个"开箱即用"的运行时。
开发者不得不自己处理:
- 子任务之间的状态传递和错误恢复
- 代码执行的安全隔离
- 长时程任务的上下文管理
- 跨会话的记忆持久化
这些问题不是 DeerFlow 独有的,而是整个 AI Agent 行业的通病。
1.2 2.0 的决断:零代码共享的全量重构
2025 年 9 月,字节团队启动 2.0 版本的全量重构。和 1.x 分支共享的代码为零,整个架构基于 LangGraph 1.0 + LangChain 技术栈完全重写。
这个决策背后的哲学转变才是关键:
| 维度 | DeerFlow 1.0 | DeerFlow 2.0 |
|---|---|---|
| 定位 | 深度研究框架 | 超级智能体执行底座 |
| 架构 | 链式管道(Pipeline) | 编排式运行时(Harness) |
| 能力扩展 | 改代码 | 加 Skill |
| 执行环境 | 宿主机直接运行 | Docker 沙箱隔离 |
| 上下文管理 | 单会话全量 | 分层压缩 + 文件系统卸载 |
| 记忆 | 无持久化 | 跨会话长期记忆 |
| 子任务 | 串行执行 | 并行子智能体 |
从 Pipeline 到 Harness,不只是换个词。Pipeline 是"按顺序执行步骤",Harness 是"给 Agent 一台真正的计算机,让它自己决定怎么干活"。
1.3 为什么是"执行优先"
传统 AI 对话框架的核心假设是:用户发一条消息,模型回一条消息。交互模型是请求-响应。
DeerFlow 2.0 的核心假设是:用户提一个目标,Agent 自主规划、拆解、执行、汇报,全程可能持续几分钟到几小时。 交互模型是目标-执行-结果。
这个转变要求底层架构从根本上支持:
- 自主闭环工作流——从任务拆解、信息收集、代码执行到报告生成,无需人工干预
- 渐进式技能加载——技能仅在任务需要时才被加载到上下文中,而非一次性全部载入
- 安全隔离执行——每个任务运行在独立的 Docker 容器中,Agent 拥有完整的文件系统和 Bash 执行能力
这三个能力构成了 DeerFlow 2.0 的架构三角:自主性、高效性、安全性。
二、架构深度拆解:五层设计哲学
2.1 整体架构
DeerFlow 2.0 的架构可以分成五个清晰的层次:
┌──────────────────────────────────────────────────┐
│ 前端 UI 层 │
│ Next.js + React (端口 3000) │
├──────────────────────────────────────────────────┤
│ API 网关层 │
│ Gateway API (端口 8001) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ IM 通道 │ │ REST API │ │ WebSocket│ │
│ └──────────┘ └──────────┘ └──────────┘ │
├──────────────────────────────────────────────────┤
│ 智能体运行时层 │
│ LangGraph Server (端口 2024) │
│ ┌──────────────────────────────────┐ │
│ │ Lead Agent(主智能体) │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │Sub- │ │Sub- │ │Sub- │ ... │ │
│ │ │Agent│ │Agent│ │Agent│ │ │
│ │ └─────┘ └─────┘ └─────┘ │ │
│ └──────────────────────────────────┘ │
├──────────────────────────────────────────────────┤
│ 基础设施层 │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │沙箱 │ │记忆 │ │技能 │ │文件 │ │
│ │Sandbox│ │Memory│ │Skills│ │Files │ │
│ └──────┘ └──────┘ └──────┘ └──────┘ │
├──────────────────────────────────────────────────┤
│ 反向代理层 │
│ Nginx (端口 2026) │
└──────────────────────────────────────────────────┘
这五层各司其职,但最值得关注的是中间三层:网关层、运行时层、基础设施层。它们之间的交互方式决定了 DeerFlow 的核心能力。
2.2 网关层:Agent 的 HTTP 入口
Gateway API 不仅仅是一个 REST 代理,它是整个系统的控制平面:
# Gateway API 的核心路由
# backend/packages/harness/deerflow/gateway/
# 线程管理
POST /api/threads # 创建对话线程
GET /api/threads/{thread_id} # 获取线程详情
DELETE /api/threads/{thread_id} # 删除线程
# 消息交互
POST /api/threads/{thread_id}/chat # 发送消息
POST /api/threads/{thread_id}/stream # 流式响应
# 文件操作
POST /api/threads/{thread_id}/files # 上传文件
GET /api/threads/{thread_id}/files # 列出文件
# 配置管理
GET /api/models # 列出可用模型
GET /api/skills # 列出技能
PUT /api/skills/{name} # 更新技能配置
网关层的一个重要设计是 IM 通道集成。DeerFlow 原生支持 Telegram、Slack、飞书、企业微信、微信五个 IM 平台,而且配置极其简单——只需在 config.yaml 中启用对应通道,填入凭证即可,无需公网 IP:
# config.yaml - IM 通道配置
channels:
langgraph_url: http://localhost:2024
gateway_url: http://localhost:8001
telegram:
enabled: true
bot_token: ${TELEGRAM_BOT_TOKEN}
allowed_users: [] # 空 = 允许所有
feishu:
enabled: true
app_id: ${FEISHU_APP_ID}
app_secret: ${FEISHU_APP_SECRET}
wecom:
enabled: true
bot_id: ${WECOM_BOT_ID}
bot_secret: ${WECOM_BOT_SECRET}
这种设计意味着你可以用 Telegram 机器人的方式给 DeerFlow 下达任务,它会在沙箱里执行完再把结果推回来。一个"随叫随到的研究助手"就这样变成了现实。
2.3 运行时层:LangGraph 驱动的智能体编排
DeerFlow 2.0 选择 LangGraph 作为编排引擎,这不是偶然。LangGraph 的核心优势在于**状态图(StateGraph)**模型——把 Agent 的工作流定义为一个有向图,节点是处理步骤,边是条件转换。
Lead Agent:项目经理
Lead Agent 是整个系统的"项目经理"。它负责:
- 接收用户的目标
- 分析目标,决定是否需要拆分
- 如果需要,派生子智能体
- 收集子智能体的结果
- 综合输出最终结果
# Lead Agent 的核心逻辑(简化版)
# 实际代码在 backend/packages/harness/deerflow/agents/
from langgraph.graph import StateGraph, END
class LeadAgent:
"""主智能体:接收目标、规划任务、编排子智能体"""
def __init__(self, config):
self.config = config
self.sub_agent_manager = SubAgentManager(config)
self.memory_manager = MemoryManager(config)
self.skill_loader = SkillLoader(config)
def build_graph(self):
"""构建工作流状态图"""
graph = StateGraph(AgentState)
# 定义节点
graph.add_node("analyze", self.analyze_task)
graph.add_node("plan", self.create_plan)
graph.add_node("execute", self.execute_plan)
graph.add_node("synthesize", self.synthesize_results)
graph.add_node("respond", self.generate_response)
# 定义边
graph.set_entry_point("analyze")
graph.add_conditional_edges(
"analyze",
self.should_plan,
{
True: "plan", # 复杂任务 → 规划
False: "respond", # 简单问题 → 直接回答
}
)
graph.add_edge("plan", "execute")
graph.add_edge("execute", "synthesize")
graph.add_edge("synthesize", "respond")
graph.add_edge("respond", END)
return graph.compile()
async def analyze_task(self, state: AgentState):
"""分析任务复杂度,决定执行策略"""
task = state["messages"][-1]
# 渐进式技能加载:只在需要时加载相关技能
relevant_skills = self.skill_loader.find_relevant(task)
if relevant_skills:
state["loaded_skills"] = relevant_skills
# 评估是否需要子智能体
complexity = self.assess_complexity(task)
state["complexity"] = complexity
return state
async def execute_plan(self, state: AgentState):
"""执行计划:并行派生子智能体"""
plan = state["plan"]
sub_tasks = plan["sub_tasks"]
# 并行派生子智能体
results = await self.sub_agent_manager.dispatch_parallel(
sub_tasks,
parent_context=state["context"]
)
state["sub_results"] = results
return state
Sub-Agent:专业执行者
子智能体是 DeerFlow 2.0 最核心的创新之一。每个子智能体拥有:
- 隔离的上下文——看不到主智能体和其他子智能体的上下文,避免信息干扰
- 独立的工具集——只拥有完成任务所需的最小工具集
- 独立的终止条件——完成后自动退出,释放资源
# 子智能体管理器
class SubAgentManager:
"""子智能体管理器:创建、调度、监控子智能体"""
async def dispatch_parallel(self, sub_tasks, parent_context):
"""并行派生多个子智能体"""
tasks = []
for sub_task in sub_tasks:
# 创建隔离上下文
isolated_context = self.create_isolated_context(
parent_context,
sub_task
)
# 选择合适的工具集
tools = self.select_tools(sub_task)
# 创建子智能体
agent = SubAgent(
task=sub_task,
context=isolated_context,
tools=tools,
termination=self.create_termination(sub_task)
)
tasks.append(agent.run())
# 并行执行,收集结果
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理异常
processed = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed.append({
"task": sub_tasks[i],
"status": "failed",
"error": str(result)
})
else:
processed.append({
"task": sub_tasks[i],
"status": "completed",
"result": result
})
return processed
def create_isolated_context(self, parent_context, sub_task):
"""创建隔离的子智能体上下文
关键设计:子智能体只能看到自己的任务描述,
不能看到主智能体的完整上下文或其他子智能体的信息。
这样做有两个好处:
1. 减少上下文窗口占用
2. 避免信息干扰,提高执行质量
"""
return {
"task": sub_task["description"],
"constraints": sub_task.get("constraints", []),
"parent_thread_id": parent_context["thread_id"],
"workspace": self.allocate_workspace(),
}
这种"Lead Agent + Sub-Agent"的模式本质上是一种分布式计算思想在 AI Agent 上的应用:把大任务分解成小任务,分配给独立的执行单元,最后聚合结果。和 MapReduce 的哲学一脉相承。
2.4 基础设施层:沙箱、记忆、技能、文件
沙箱系统:给 Agent 一台真正的计算机
这是 DeerFlow 2.0 和普通聊天机器人最本质的区别。
DeerFlow 的沙箱系统提供三种执行模式:
- Local Execution——直接在宿主机运行(开发调试用)
- Docker Execution——在隔离的 Docker 容器中运行(推荐)
- Kubernetes Execution——在 K8s Pod 中运行(生产环境)
# 沙箱提供者接口
class SandboxProvider(ABC):
"""沙箱提供者基类"""
@abstractmethod
async def create_sandbox(self, thread_id: str) -> SandboxInstance:
"""创建沙箱实例"""
pass
@abstractmethod
async def execute_code(self, sandbox_id: str, code: str) -> ExecutionResult:
"""在沙箱中执行代码"""
pass
@abstractmethod
async def read_file(self, sandbox_id: str, path: str) -> str:
"""读取沙箱中的文件"""
pass
@abstractmethod
async def write_file(self, sandbox_id: str, path: str, content: str):
"""写入文件到沙箱"""
pass
@abstractmethod
async def destroy_sandbox(self, sandbox_id: str):
"""销毁沙箱"""
pass
class AioSandboxProvider(SandboxProvider):
"""基于 Docker 的异步沙箱提供者(推荐)"""
def __init__(self, config):
self.docker_client = docker.DockerClient()
self.sandbox_image = config.get("sandbox_image", "deerflow/sandbox:latest")
self.provisioner_url = config.get("provisioner_url")
async def create_sandbox(self, thread_id: str) -> SandboxInstance:
"""为每个线程创建独立的 Docker 容器
文件系统结构:
/mnt/user-data/
├── uploads/ ← 用户上传的文件
├── workspace/ ← Agent 的工作目录
└── outputs/ ← 最终输出物
/mnt/skills/
├── public/ ← 内置技能
└── custom/ ← 自定义技能
"""
container = await self.docker_client.containers.run(
self.sandbox_image,
detach=True,
name=f"deerflow-sandbox-{thread_id}",
# 资源限制
mem_limit="2g",
cpu_count=2,
# 网络隔离
network_mode="bridge",
# 只读根文件系统 + 可写数据卷
read_only=True,
tmpfs={"/tmp": "size=500m"},
volumes={
f"deerflow-data-{thread_id}": {
"bind": "/mnt/user-data",
"mode": "rw"
}
},
# 安全选项
security_opt=["no-new-privileges"],
cap_drop=["ALL"],
)
return SandboxInstance(
id=container.id,
thread_id=thread_id,
container=container
)
async def execute_code(self, sandbox_id: str, code: str) -> ExecutionResult:
"""在容器中安全执行代码
执行流程:
1. 将代码写入容器内的临时文件
2. 在容器内执行 python3 /tmp/task.py
3. 收集 stdout、stderr、返回码
4. 设置超时保护(默认 300s)
"""
container = self.docker_client.containers.get(sandbox_id)
# 写入代码文件
exec_id = container.exec_run(
f"bash -c 'cat > /tmp/task.py << \"EOF\"\n{code}\nEOF'",
workdir="/mnt/user-data/workspace"
)
# 执行代码
result = container.exec_run(
"python3 /tmp/task.py",
workdir="/mnt/user-data/workspace",
demux=True,
stdout=True,
stderr=True
)
return ExecutionResult(
exit_code=result.exit_code,
stdout=result.output[0].decode() if result.output[0] else "",
stderr=result.output[1].decode() if result.output[1] else "",
)
沙箱内的文件系统是三层结构:
/mnt/user-data/
├── uploads/ ← 用户上传的文件
├── workspace/ ← Agent 的工作目录(中间产物)
└── outputs/ ← 最终输出物(报告、图表、代码等)
/mnt/skills/
├── public/ ← 内置技能
│ ├── research/SKILL.md
│ ├── report-generation/SKILL.md
│ ├── slide-creation/SKILL.md
│ ├── web-page/SKILL.md
│ └── image-generation/SKILL.md
└── custom/ ← 你的自定义技能
└── your-skill/SKILL.md
这个设计把"输入-加工-输出"的流转关系映射到了文件系统层级上,非常清晰。
记忆系统:让 Agent 真正"认识"你
大多数 Agent 的记忆是对话历史的简单回放。DeerFlow 2.0 的记忆系统是分层的:
- 短期记忆(Working Memory)——当前会话的上下文,存储在 LangGraph 的状态图中
- 中期记忆(Session Memory)——跨对话的用户偏好、技术栈、工作习惯
- 长期记忆(Long-term Memory)——持久化的知识积累,存储在本地文件中
# 记忆管理器(简化版)
class MemoryManager:
"""分层记忆管理器"""
def __init__(self, config):
self.memory_store = LocalMemoryStore(config["memory_path"])
async def apply_memory(self, user_id: str, thread_id: str):
"""在会话开始时加载用户记忆
去重设计:重复的偏好和上下文不会无限累积,
apply 时会跳过已存在的相同事实条目。
"""
memory = await self.memory_store.load(user_id)
# 去重
existing_facts = set(memory.get("facts", []))
new_facts = []
for fact in memory.get("pending_facts", []):
if fact not in existing_facts:
new_facts.append(fact)
existing_facts.add(fact)
memory["facts"] = list(existing_facts)
memory["pending_facts"] = []
await self.memory_store.save(user_id, memory)
return memory
async def update_memory(self, user_id: str, new_info: dict):
"""在会话中更新记忆
不是简单的追加,而是智能合并:
- 新偏好覆盖旧偏好
- 新事实跳过已有重复
- 上下文信息按相关性排序
"""
current = await self.memory_store.load(user_id)
# 合并偏好(新值覆盖旧值)
for key, value in new_info.get("preferences", {}).items():
current.setdefault("preferences", {})[key] = value
# 合并事实(跳过重复)
existing = set(current.get("facts", []))
for fact in new_info.get("facts", []):
if fact not in existing:
current.setdefault("facts", []).append(fact)
existing.add(fact)
await self.memory_store.save(user_id, current)
记忆系统的另一个巧妙之处:它和沙箱文件系统是协作的。大块的中间数据(比如分析结果、生成的图表)不会被塞进上下文窗口,而是被卸载到文件系统,记忆中只保存文件路径和摘要。这是一个典型的"用空间换时间"策略——用磁盘空间换取上下文窗口的可用性。
技能系统:Markdown 即能力
DeerFlow 的技能系统是它最优雅的设计之一。一个技能就是一个 Markdown 文件,定义了能力的工作流、最佳实践和参考资源。
skills/
├── public/
│ ├── research/
│ │ └── SKILL.md ← 深度研究技能
│ ├── report-generation/
│ │ └── SKILL.md ← 报告生成技能
│ ├── slide-creation/
│ │ └── SKILL.md ← 幻灯片创建技能
│ ├── web-page/
│ │ └── SKILL.md ← 网页生成技能
│ └── image-generation/
│ └── SKILL.md ← 图片生成技能
└── custom/
└── your-skill/
└── SKILL.md ← 你的自定义技能
渐进式加载是关键设计:技能只在任务需要时才被加载到上下文中。这意味着:
- 简单问答:零技能加载,上下文极小
- 研究任务:只加载 research 技能
- 生成报告:加载 research + report-generation
- 创建 PPT:加载 research + slide-creation + image-generation
# 技能加载器(简化版)
class SkillLoader:
"""渐进式技能加载器"""
def __init__(self, skills_dir: str):
self.skills_dir = Path(skills_dir)
self.loaded_skills = {}
self._skill_registry = self._build_registry()
def _build_registry(self):
"""扫描所有技能,建立索引"""
registry = {}
for skill_dir in self.skills_dir.rglob("SKILL.md"):
skill_name = skill_dir.parent.name
with open(skill_dir) as f:
content = f.read()
# 提取技能元数据(frontmatter)
metadata = self._parse_frontmatter(content)
registry[skill_name] = {
"path": skill_dir,
"metadata": metadata,
"keywords": metadata.get("keywords", []),
"token_count": len(content) // 4, # 粗估
}
return registry
def find_relevant(self, task: str) -> list[str]:
"""根据任务描述找到相关技能
不是简单的关键词匹配,而是基于语义相关性排序:
1. 提取任务中的关键意图
2. 与技能的 keywords 和 description 做语义匹配
3. 按相关性和 token 开销排序
4. 在上下文预算内选择最优组合
"""
relevant = []
for name, info in self._skill_registry.items():
score = self._compute_relevance(task, info)
if score > 0.5: # 相关性阈值
relevant.append((name, score, info["token_count"]))
# 按相关性降序排序
relevant.sort(key=lambda x: x[1], reverse=True)
# 在 token 预算内贪心选择
selected = []
total_tokens = 0
max_tokens = 8000 # 技能上下文预算
for name, score, tokens in relevant:
if total_tokens + tokens <= max_tokens:
selected.append(name)
total_tokens += tokens
return selected
async def load_skill(self, skill_name: str) -> str:
"""加载技能内容到上下文"""
if skill_name in self.loaded_skills:
return self.loaded_skills[skill_name]
info = self._skill_registry[skill_name]
with open(info["path"]) as f:
content = f.read()
self.loaded_skills[skill_name] = content
return content
这种设计让 DeerFlow 在 token 敏感的模型上也能高效运行。你不需要把所有技能一次性塞进上下文,而是按需加载,用完即卸。
三、代码实战:从零搭建一个自动化研究系统
理论够了,现在上手。
3.1 快速启动
# 克隆仓库
git clone https://github.com/bytedance/deer-flow.git
cd deer-flow
# 运行安装向导(约2分钟)
make setup
# 验证安装
make doctor
# 启动开发服务
make dev
make setup 会启动一个交互式向导,引导你选择 LLM 提供商、可选的 Web 搜索、执行/安全偏好(沙箱模式、Bash 访问、文件写入工具)。它会生成一个最小化的 config.yaml 并把密钥写入 .env。
3.2 配置文件深度解读
DeerFlow 的配置中心是 config.yaml,这里有一个生产级别的配置示例:
# config.yaml - 生产级配置
# LLM 提供商配置
llm:
default_provider: openai
providers:
openai:
api_key: ${OPENAI_API_KEY}
model: gpt-4o
temperature: 0.7
max_tokens: 4096
deepseek:
api_key: ${DEEPSEEK_API_KEY}
model: deepseek-chat
base_url: https://api.deepseek.com/v1
kimi:
api_key: ${KIMI_API_KEY}
model: moonshot-v1-128k
base_url: https://api.moonshot.cn/v1
# 沙箱配置
sandbox:
use: deerflow.community.aio_sandbox:AioSandboxProvider
image: deerflow/sandbox:latest
# 资源限制
memory_limit: "2g"
cpu_limit: 2
# 执行超时
execution_timeout: 300
# K8s 部署时的 Provisioner
# provisioner_url: http://sandbox-provisioner:8080
# 记忆配置
memory:
enabled: true
storage_path: ./.deer-flow/memory
# 记忆更新策略
dedup: true # 跳过重复事实
max_facts: 1000 # 最大事实条目数
max_preferences: 100 # 最大偏好条目数
# 技能配置
skills:
public_dir: ./skills/public
custom_dir: ./skills/custom
# 渐进式加载预算
max_context_tokens: 8000
# IM 通道
channels:
langgraph_url: http://localhost:2024
gateway_url: http://localhost:8001
telegram:
enabled: true
bot_token: ${TELEGRAM_BOT_TOKEN}
allowed_users: ["123456789"]
# 可观测性
tracing:
langsmith:
enabled: true
project: deerflow-prod
langfuse:
enabled: false
3.3 自定义技能开发
DeerFlow 的技能就是一个 SKILL.md 文件。我们来开发一个"竞品分析"技能:
---
name: competitor-analysis
version: 1.0.0
author: your-name
keywords: competitor, analysis, market-research, benchmark
description: 竞品分析技能 - 自动收集竞争对手信息并生成结构化对比报告
---
# 竞品分析技能
## 概述
当用户需要进行竞品分析时,使用此技能。该技能会:
1. 识别竞品范围
2. 从多个维度收集数据
3. 生成结构化对比表格
4. 提供战略建议
## 工作流
### Step 1: 确定分析框架
在开始搜索之前,先和用户确认分析框架:
- 目标产品/服务
- 竞争对手列表(至少3个)
- 分析维度:功能、定价、技术栈、用户评价、市场份额
### Step 2: 并行信息收集
对每个竞品,并行执行以下搜索:
搜索关键词模板:
- "{产品名} 功能介绍 2026"
- "{产品名} 定价方案"
- "{产品名} 技术架构"
- "{产品名} 用户评价 优缺点"
- "{产品名} 市场份额 数据"
### Step 3: 数据整理与对比
将收集到的数据整理为结构化表格:
| 维度 | 我方 | 竞品A | 竞品B | 竞品C |
|------|------|-------|-------|-------|
| 核心功能 | ... | ... | ... | ... |
| 定价 | ... | ... | ... | ... |
| 技术栈 | ... | ... | ... | ... |
| 用户口碑 | ... | ... | ... | ... |
### Step 4: 战略建议
基于对比结果,提供:
- 差异化优势
- 需要追赶的短板
- 建议的功能优先级
- 定价策略建议
## 输出格式
使用 Markdown 格式,包含:
1. 执行摘要(200字内)
2. 详细对比表格
3. 雷达图(用 Mermaid 语法生成)
4. 战略建议
5. 数据来源列表
## 注意事项
- 数据必须标注来源和日期
- 主观评价需标注"基于公开信息"
- 定价信息需标注"截至 YYYY-MM-DD"
- 不确定的数据标注"待验证"
把这个文件放到 skills/custom/competitor-analysis/SKILL.md,DeerFlow 就会自动发现并注册这个技能。下次你让它做竞品分析时,它会自动加载这个技能到上下文中。
3.4 嵌入式 Python 客户端
DeerFlow 不一定要跑完整的服务端,你也可以把它作为 Python 库嵌入到自己的应用中:
from deerflow.client import DeerFlowClient
# 创建客户端(直连模式,无需 HTTP 服务)
client = DeerFlowClient()
# 同步聊天
response = client.chat(
"帮我分析一下 React 19 和 Vue 4 的 Server Components 方案差异",
thread_id="tech-comparison"
)
print(response)
# 流式响应
for event in client.stream("分析一下最近的 GitHub Trending 项目"):
if event.type == "messages-tuple" and event.data.get("type") == "ai":
print(event.data["content"], end="", flush=True)
# 上传文件进行分析
client.upload_files("thread-1", ["./report.pdf", "./data.csv"])
# 管理技能
skills = client.list_skills()
print(f"可用技能: {skills}")
# 启用/禁用技能
client.update_skill("competitor-analysis", enabled=True)
# 列出模型
models = client.list_models()
print(f"可用模型: {models}")
这个嵌入式客户端的 API 完全对齐 Gateway HTTP API 的响应模型,在 CI 中有 TestGatewayConformance 测试确保两者始终一致。这意味着你可以先在嵌入模式下开发调试,无缝切换到 HTTP 服务模式部署生产。
3.5 实战:用 DeerFlow 构建自动化技术雷达
让我用一个完整案例展示 DeerFlow 的端到端能力:构建一个自动化技术雷达系统,定期扫描技术趋势,生成可视化报告。
# tech_radar_agent.py
"""
自动化技术雷达:用 DeerFlow 定期扫描技术趋势
"""
import asyncio
import json
from datetime import datetime
from deerflow.client import DeerFlowClient
class TechRadarAgent:
"""技术雷达 Agent"""
# 雷达四象限
QUADRANTS = {
"techniques": "技术方法",
"platforms": "平台与基础设施",
"tools": "工具链",
"languages": "语言与框架"
}
# 环形分级
RINGS = {
"adopt": "采纳", # 建议立即采用
"trial": "试验", # 值得尝试
"assess": "评估", # 值得关注
"hold": "暂缓" # 暂不建议
}
def __init__(self):
self.client = DeerFlowClient()
self.thread_id = f"tech-radar-{datetime.now().strftime('%Y-%m-%d')}"
async def scan_trends(self):
"""扫描技术趋势"""
prompt = """
请执行以下技术雷达扫描任务:
1. 搜索过去一周的 GitHub Trending 项目
2. 搜索最近的技术博客和新闻
3. 对每个发现的技术/项目进行分类:
- 象限:techniques/platforms/tools/languages
- 环形:adopt/trial/assess/hold
4. 生成 Markdown 格式的技术雷达报告
报告格式:
## 采纳(Adopt)
- **[技术名]** - 一句话说明为什么建议采纳
## 试验(Trial)
- **[技术名]** - 一句话说明为什么值得尝试
## 评估(Assess)
- **[技术名]** - 一句话说明为什么值得关注
## 暂缓(Hold)
- **[技术名]** - 一句话说明为什么暂不建议
要求:
- 每个分类至少3个条目
- 必须有具体的版本号或时间信息
- 判断依据必须基于事实而非主观偏好
"""
result = await self.client.chat(
prompt,
thread_id=self.thread_id
)
return result
async def generate_radar_chart(self, report: str):
"""生成雷达可视化(Mermaid 格式)"""
prompt = f"""
基于以下技术雷达报告,生成一个 Mermaid mindmap 图:
{report}
要求:
1. 根节点为"技术雷达 YYYY-MM-DD"
2. 四个象限作为一级子节点
3. 环形分级作为二级子节点
4. 具体技术作为三级子节点
只输出 Mermaid 代码,不要其他内容。
"""
result = await self.client.chat(
prompt,
thread_id=self.thread_id
)
return result
async def run(self):
"""运行完整流程"""
print("🔍 正在扫描技术趋势...")
report = await self.scan_trends()
print("📊 正在生成雷达可视化...")
chart = await self.generate_radar_chart(report)
# 保存结果
output = {
"date": datetime.now().isoformat(),
"report": report,
"chart": chart
}
with open(f"tech-radar-{datetime.now().strftime('%Y%m%d')}.json", "w") as f:
json.dump(output, f, ensure_ascii=False, indent=2)
print("✅ 技术雷达生成完成!")
return output
if __name__ == "__main__":
radar = TechRadarAgent()
asyncio.run(radar.run())
四、上下文工程:长时程任务的生命线
4.1 问题:上下文窗口的"内存泄漏"
AI Agent 执行长时程任务时,最大的敌人不是模型能力不够,而是上下文窗口溢出。
一个典型的场景:你让 Agent 做一份深度研究报告,它需要搜索 10 个网页,每个网页的内容都塞进上下文,再加上中间的分析步骤、子智能体的汇报……很快,128K 的上下文窗口就满了。结果就是:Agent 忘了最初的目标,开始胡说八道。
DeerFlow 2.0 的上下文工程(Context Engineering)是解决这个问题的系统方案。
4.2 三大策略
策略一:隔离式子智能体上下文
每个子智能体运行在完全隔离的上下文中。它看不到主智能体的完整上下文,也看不到其他子智能体的信息。
这意味着:
- 10 个子智能体并行工作,每个只消耗自己的上下文窗口
- 主智能体的上下文窗口只保留任务描述和最终结果
- 总的 token 消耗是 O(n × k) 而不是 O(n × n),其中 n 是子任务数,k 是每个子任务的上下文大小
策略二:渐进式上下文压缩
在一个会话内,DeerFlow 会主动管理上下文:
class ContextCompressor:
"""上下文压缩器"""
async def compress(self, state: AgentState) -> AgentState:
"""压缩已完成子任务的上下文
策略:
1. 已完成的子任务 → 只保留摘要
2. 中间产物 → 卸载到文件系统,上下文只保留路径
3. 搜索结果 → 只保留关键信息,丢弃原始 HTML
"""
completed = state.get("completed_sub_tasks", [])
for task in completed:
# 子任务结果卸载到文件
if task.get("result_size", 0) > 2000: # 超过2K字符
file_path = await self.offload_to_file(
thread_id=state["thread_id"],
task_id=task["id"],
content=task["result"]
)
# 上下文中只保留摘要和文件路径
task["result_summary"] = self.summarize(task["result"])
task["result_file"] = file_path
task["result"] = None # 清空完整结果
return state
def summarize(self, text: str, max_length: int = 200) -> str:
"""生成摘要(使用模型或规则)"""
if len(text) <= max_length:
return text
# 简单规则:取首句 + 关键数据
first_sentence = text.split("。")[0] + "。"
# 提取数字和百分比
key_data = re.findall(r'\d+\.?\d*%?', text)
summary = first_sentence
if key_data:
summary += f" 关键数据: {', '.join(key_data[:5])}"
return summary[:max_length]
策略三:严格工具调用恢复
这是一个很多人忽略的工程细节。当 LLM 提供商或中间件强制中断一个工具调用循环时,上下文中会留下"悬挂"的工具调用——模型已经发出了 tool_call,但还没有收到对应的 tool_result。
DeerFlow 的处理方式是:在下一个模型调用之前,剥离提供商级别的原始工具调用元数据,并为悬挂的调用注入占位符工具结果。
class ToolCallRecovery:
"""工具调用恢复器
解决的问题:OpenAI 兼容模型会严格验证
tool_call_id 序列,如果上下文中有悬挂的
tool_call(没有对应的 tool_result),
下一次模型调用会报错。
"""
def recover(self, messages: list) -> list:
"""恢复悬挂的工具调用"""
recovered = []
pending_tool_calls = {}
for msg in messages:
if msg.get("role") == "assistant" and msg.get("tool_calls"):
# 记录所有工具调用 ID
for tc in msg["tool_calls"]:
pending_tool_calls[tc["id"]] = tc
elif msg.get("role") == "tool":
# 工具结果已经收到,移除对应的 pending
tc_id = msg.get("tool_call_id")
if tc_id in pending_tool_calls:
del pending_tool_calls[tc_id]
recovered.append(msg)
# 为悬挂的工具调用注入占位符结果
for tc_id, tc in pending_tool_calls.items():
recovered.append({
"role": "tool",
"tool_call_id": tc_id,
"content": f"[工具调用被中断,结果不可用] 函数: {tc['function']['name']}"
})
return recovered
这个看似小的细节,实际上是生产环境中 Agent 稳定性的关键。没有它,Agent 在长时间运行中会因为格式错误而崩溃。
五、部署与性能优化
5.1 部署模式选择
DeerFlow 提供两种部署模式:标准模式(Standard)和网关模式(Gateway)。
| 维度 | 标准模式 | 网关模式 |
|---|---|---|
| 架构 | Gateway + LangGraph (4 进程) | Gateway 嵌入运行时 (3 进程) |
| 并发 | 受限于 LangGraph worker 许可 | --workers × 异步任务(无上限) |
| 资源 | 更高(两个 Python 运行时) | 更低(单一 Python 运行时) |
| 冷启动 | 较慢(两个服务初始化) | 更快 |
| LangGraph 许可 | 生产镜像需要 | 不需要 |
# 标准模式
make up
# 网关模式(推荐生产环境)
make up-pro
# 或
./scripts/deploy.sh --gateway
5.2 资源规划
官方给出的部署规模参考:
| 场景 | 最低配置 | 推荐配置 |
|---|---|---|
| 本地评估 | 4 vCPU, 8 GB RAM | 8 vCPU, 16 GB RAM |
| Docker 开发 | 4 vCPU, 8 GB RAM | 8 vCPU, 16 GB RAM |
| 长期运行服务 | 8 vCPU, 16 GB RAM | 16 vCPU, 32 GB RAM |
注意:这些配置是给 DeerFlow 本身的,如果你还要在本地跑 LLM,需要额外规划。
5.3 Docker 生产部署
# 一步构建+启动
deploy.sh --gateway
# 分步:先构建,后启动(支持切换模式)
deploy.sh build
deploy.sh start --gateway
# 停止
deploy.sh down
Docker 生产部署的一个关键优化:国内镜像加速。
# 设置国内镜像(构建前)
export UV_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple
export NPM_REGISTRY=https://registry.npmmirror.com
# 然后再构建
deploy.sh build
5.4 可观测性
DeerFlow 内置了 LangSmith 和 Langfuse 双追踪支持:
# .env
# LangSmith
LANGSMITH_TRACING=true
LANGSMITH_ENDPOINT=https://api.smith.langchain.com
LANGSMITH_API_KEY=lsv2_pt_xxxxxxxxxxxxxxxx
LANGSMITH_PROJECT=deerflow-prod
# Langfuse
LANGFUSE_TRACING=true
LANGFUSE_PUBLIC_KEY=pk-lf-xxxxxxxxxxxxxxxx
LANGFUSE_SECRET_KEY=sk-lf-xxxxxxxxxxxxxxxx
LANGFUSE_BASE_URL=https://cloud.langfuse.com
两个可以同时启用,DeerFlow 会把相同的模型活动同时上报给两个系统。
5.5 K8s 沙箱:生产级安全隔离
对于需要高安全性的生产环境,DeerFlow 支持 Kubernetes 沙箱模式。每个任务运行在独立的 Pod 中,通过 Provisioner 服务管理 Pod 的生命周期:
# K8s Provisioner 部署
apiVersion: apps/v1
kind: Deployment
metadata:
name: sandbox-provisioner
spec:
replicas: 1
selector:
matchLabels:
app: sandbox-provisioner
template:
metadata:
labels:
app: sandbox-provisioner
spec:
containers:
- name: provisioner
image: deerflow/sandbox-provisioner:latest
ports:
- containerPort: 8080
env:
- name: KUBECONFIG_PATH
value: /etc/kubernetes/kubeconfig
resources:
requests:
memory: "256Mi"
cpu: "500m"
limits:
memory: "512Mi"
cpu: "1"
DeerFlow 的最新版本还支持为沙箱卷配置可选的 PVC(PersistentVolumeClaim),让沙箱的文件持久化更灵活。
六、Claude Code 集成:终端里的 AI 团队
DeerFlow 提供了一个 claude-to-deerflow 技能,让你直接在 Claude Code 终端里和运行中的 DeerFlow 实例交互:
# 安装技能
npx skills add https://github.com/bytedance/deer-flow --skill claude-to-deerflow
# 使用(确保 DeerFlow 在 http://localhost:2026 运行)
# 在 Claude Code 中输入 /claude-to-deerflow
支持的操作:
- 发送研究任务并获取流式响应
- 选择执行模式:flash(快速)、standard、pro(规划)、ultra(子智能体)
- 检查 DeerFlow 健康状态、列出模型/技能/智能体
- 管理线程和对话历史
- 上传文件进行分析
这意味着你可以在 Claude Code 的终端里完成"写代码 → 让 DeerFlow 做研究 → 获取结果 → 继续写代码"的完整工作流,无需切换窗口。
七、安全:不只是沙箱
DeerFlow 默认绑定 127.0.0.1,只在本地可访问。如果你要部署到公网,官方给出了三个安全建议:
- IP 白名单——用 iptables 或防火墙 ACL,只允许授权 IP 访问
- 认证网关——在前面放一个 Nginx 反向代理,开启强认证
- 网络隔离——把 Agent 和可信设备放在同一个 VLAN
最新版本还有一个安全增强:Gateway 对 HTML/SVG 等活跃 Web 内容类型强制下载而非内联渲染,降低 XSS 攻击面。这是一个很容易被忽视但非常重要的安全细节——Agent 生成的 HTML 文件如果直接在浏览器里渲染,可能包含恶意脚本。
八、与同类框架的对比
2026 年的 AI Agent 框架市场已经相当拥挤。DeerFlow、Hermes Agent、OpenClaw 是三个定位不同但经常被拿来比较的项目。
| 维度 | DeerFlow 2.0 | Hermes Agent | OpenClaw |
|---|---|---|---|
| 核心定位 | 超级智能体编排框架 | 自进化 AI 代理框架 | 个人 AI Agent 运行时 |
| 开发者 | 字节跳动 | NousResearch | 社区 |
| 架构 | LangGraph 编排 + 沙箱 + 技能 | 自进化 + 技能树 | 多通道 + 技能 + 节点 |
| 适用场景 | 企业级复杂任务 | 个人 AI 助手 | 个人/小型团队 |
| 部署复杂度 | 中高 | 低 | 低 |
| 沙箱隔离 | Docker/K8s | 无 | 容器可选 |
| 长期记忆 | ✅ | ✅ | ✅ |
| IM 集成 | 5 平台 | 无 | 多平台 |
| 自托管 | ✅ | ✅ | ✅ |
三者不是直接竞品,而是覆盖了 AI Agent 从个人轻量使用到企业级复杂任务的全场景。
九、总结与展望
DeerFlow 2.0 的核心贡献
- 重新定义了 AI Agent 的定位——从"对话工具"到"执行系统"
- 解决了长时程任务的工程难题——上下文压缩、子智能体隔离、文件系统卸载
- 提供了生产级的安全隔离——Docker/K8s 沙箱、严格的访问控制
- 实现了渐进式能力加载——按需加载技能,token 敏感模型也能高效运行
- 打通了 IM 通道——从 Telegram 到飞书,一个配置搞定
我的看法
DeerFlow 2.0 不是银弹。它最擅长的是长时间、多步骤、需要代码执行和分析能力的任务——比如深度研究、竞品分析、技术雷达、自动化报告。对于简单的问答或单步工具调用,它有点杀鸡用牛刀。
但它代表了一个正确的方向:AI Agent 不应该只是一个更聪明的聊天机器人,它应该是一个真正能干活的执行系统。 这需要的不只是更强的模型,更需要更完善的工程基础设施——沙箱、记忆、技能、编排、安全。DeerFlow 2.0 在这些方面做出了业界领先的设计。
适合你的场景
✅ 需要自动化深度研究工作流
✅ 需要长时间运行的 Agent 任务
✅ 需要安全的代码执行环境
✅ 需要多 IM 平台集成
✅ 有 Docker/K8s 运维经验
❌ 只需要简单的对话式 AI
❌ 没有服务器资源
❌ 需要极度轻量级的方案
参考
- DeerFlow GitHub: https://github.com/bytedance/deer-flow
- DeerFlow 官网: https://deerflow.tech
- LangGraph 文档: https://langchain-ai.github.io/langgraph/
- LangChain 文档: https://python.langchain.com/