编程 DeerFlow 2.0 深度实战:字节跳动开源超级智能体框架——从 LangGraph 图状态机到 Docker 沙箱隔离,46K Star 背后的技术架构革命

2026-05-16 14:19:26 +0800 CST views 5

DeerFlow 2.0 深度实战:字节跳动开源超级智能体框架——从 LangGraph 图状态机到 Docker 沙箱隔离,46K Star 背后的技术架构革命

文章摘要:DeerFlow 2.0 是字节跳动于 2026 年 2 月开源的超级智能体框架,上线即登顶 GitHub Trending 榜首,30 天内斩获近 5 万 Star。作为 OpenAI Deep Research 的开源替代方案,DeerFlow 创新性地将子代理编排、记忆系统、Docker 沙箱和可扩展技能整合为统一的 "SuperAgent Harness",支持从分钟级到小时级的复杂任务自动化。本文将从架构设计、核心组件、技术实现、性能优化、实战案例等多个维度,深度剖析 DeerFlow 2.0 的技术内幕。


一、背景介绍:AI Agent 从"对话"到"执行"的范式转变

1.1 传统 AI 助手的困境

2024-2025 年,我们见证了大模型从"对话工具"向"生产力工具"的演进。然而,传统的 AI 助手(如 ChatGPT、Claude)仍然存在以下核心问题:

  1. 无状态性:每次对话都是全新的开始,无法记住之前的上下文和决策过程
  2. 无法执行:只能生成代码和建议,无法真正运行代码、操作系统、访问文件系统
  3. 工具孤立:每个工具都是独立的 API 调用,缺乏协同和编排能力
  4. 无沙箱隔离:执行代码时缺乏安全隔离,容易引发系统风险

1.2 DeerFlow 的诞生背景

字节跳动内部在长期的人工智能研发实践中,逐渐形成了对"超级智能体"的明确需求:

  • 深度研究任务:需要数小时的信息收集、代码执行、数据分析
  • 多步骤工作流:需要规划、执行、验证、迭代的闭环能力
  • 安全隔离:执行不受信任的代码时,必须保证宿主系统的安全
  • 可扩展性:支持自定义技能和工具,而不需要修改核心框架

2026 年 2 月 28 日,字节跳动正式开源 DeerFlow 2.0(Deep Exploration and Efficient Research Flow),将其内部 LangManus 项目的验证经验对外开放。

核心数据

  • GitHub Star:46K+(截至 2026 年 5 月)
  • 日均增长:1300+ Star
  • 开源时间:2026 年 2 月 28 日
  • 技术栈:LangGraph 1.0 + Docker + FastAPI + React

二、核心概念:DeerFlow 的架构哲学

2.1 什么是 SuperAgent Harness?

DeerFlow 的核心创新在于提出了 "SuperAgent Harness"(超级智能体套件)的概念。与传统的单智能体架构不同,Harness 是一个编排层,它负责:

  1. 协调多个子智能体(Sub-Agents)的并行执行
  2. 管理共享状态(通过 LangGraph 的 StateGraph)
  3. 提供安全隔离(通过 Docker Sandbox)
  4. 持久化记忆(Short-term + Long-term Memory)
  5. 动态加载技能(Markdown-based Skills)
┌─────────────────────────────────────────────────┐
│            SuperAgent Harness                   │
│  ┌─────────────┐  ┌─────────────┐             │
│  │ Lead Agent  │──│ Orchestrator│             │
│  └─────────────┘  └─────────────┘             │
│        │                │                       │
│  ┌─────┴─────┬────────┴────────┐             │
│  │           │                 │              │
│  ┌▼───┐  ┌──▼──┐        ┌────▼────┐         │
│  │Sub │  │Sub  │   ...    │Sub Agent│         │
│  │Agent│  │Agent│        │   N     │         │
│  └─────┘  └─────┘        └─────────┘         │
│                                               │
│  ┌─────────────────────────────────────┐      │
│  │   14-Layer Middleware Stack         │      │
│  └─────────────────────────────────────┘      │
└─────────────────────────────────────────────────┘
           │                │
    ┌──────▼──────┐  ┌─────▼─────┐
    │Docker Sandbox│  │  Memory   │
    │   Pool       │  │  System   │
    └─────────────┘  └───────────┘

2.2 DeerFlow 2.0 vs 1.0:架构演进

维度DeerFlow 1.0DeerFlow 2.0
编排框架LangGraph 0.xLangGraph 1.0
节点数量固定 5 节点动态中间件链(11 层+)
子智能体单线程串行并行编排(支持 Map-Reduce)
沙箱环境无沙箱 / 简单隔离Docker Sandbox Pool
技能系统硬编码Markdown 可扩展技能
记忆系统基础对话历史短期 + 长期 + 知识图谱

2.3 核心技术组件

DeerFlow 2.0 由以下六大核心组件构成:

1. Lead Agent(主智能体)

  • 角色:项目经理,负责理解用户意图、拆解任务、分配子任务
  • 实现:基于 LangGraph 的 StateGraph,维护全局状态
  • 技术细节:使用 TypedDict 定义状态 schema,支持类型安全的状态传递
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
import operator

class DeerFlowState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    task_plan: list[str]  # 拆解后的任务列表
    current_step: int     # 当前执行步骤
    sandbox_id: str       # 当前沙箱 ID
    memory_context: dict  # 记忆上下文

2. Sub-Agent Orchestrator(子智能体编排器)

  • 功能:管理子智能体的生命周期(创建、调度、销毁)
  • 并行策略:支持 ParallelSerialConditional Branch
  • 容错机制:子智能体失败自动重试(最多 3 次),失败后 fallback 到主智能体处理
from langgraph.graph import START, StateGraph

def create_orchestrator():
    graph = StateGraph(DeerFlowState)
    
    # 添加子智能体节点
    graph.add_node("researcher", researcher_agent)
    graph.add_node("coder", coder_agent)
    graph.add_node("reporter", reporter_agent)
    
    # 并行编排:researcher 和 coder 同时工作
    graph.add_edge("researcher", "reporter")
    graph.add_edge("coder", "reporter")
    
    return graph.compile()

3. Docker Sandbox Pool(Docker 沙箱池)

这是 DeerFlow 最值得称道的安全设计。每个任务都在独立的 Docker 容器中执行,保证:

  • 文件系统隔离:容器内的文件操作不影响宿主机
  • 网络隔离:可选禁止容器访问外网(防止数据泄露)
  • 资源限制:CPU、内存、磁盘 IO 的上限控制

技术实现

import docker

class DockerSandbox:
    def __init__(self, memory_limit="2g", cpu_quota=100000):
        self.client = docker.from_env()
        self.container = None
        self.memory_limit = memory_limit
        self.cpu_quota = cpu_quota
    
    def create(self):
        """创建隔离的 Docker 容器"""
        self.container = self.client.containers.run(
            image="deerflow/sandbox:latest",
            command="tail -f /dev/null",  # 保持容器运行
            detach=True,
            mem_limit=self.memory_limit,
            cpu_quota=self.cpu_quota,
            network_mode="bridge",  # 可配置为 "none" 禁止网络
            volumes={
                "/tmp/deerflow/share": {"bind": "/share", "mode": "rw"}
            }
        )
        return self.container.id
    
    def execute(self, code: str) -> str:
        """在容器中执行代码"""
        result = self.container.exec_run(
            cmd=["python", "-c", code],
            workdir="/share"
        )
        return result.output.decode("utf-8")
    
    def destroy(self):
        """销毁容器"""
        if self.container:
            self.container.stop()
            self.container.remove()

性能优化

  • 使用容器池(Pool)复用已创建的容器,避免冷启动开销(冷启动约 2-3 秒,池化后可降至 < 200ms)
  • 支持快照(Snapshot)功能:保存容器状态,快速恢复到某个检查点

4. Memory System(记忆系统)

DeerFlow 实现了分层记忆架构

┌─────────────────────────────────────┐
│                                   │
│  ┌────────────┐  ┌──────────────┐│
│  │ Short-term │  │  Long-term    ││
│  │ Memory     │  │  Memory       ││
│  │ (对话历史)  │  │ (知识图谱)    ││
│  └────────────┘  └──────────────┘│
│         │               │         │
│         └───────┬───────┘         │
│                 │                 │
│         ┌───────▼────────┐         │
│         │  Vector Store  │         │
│         │  (Qdrant)      │         │
│         └────────────────┘         │
└─────────────────────────────────────┘
  • Short-term Memory:使用 LangChain 的 ChatMessageHistory,保存最近 20 轮对话
  • Long-term Memory:使用 Qdrant 向量数据库,存储跨会话的知识
  • 记忆检索:基于语义相似度(Embedding:BGE-M3 / OpenAI text-embedding-3-small)
from langchain_community.vectorstores import Qdrant
from langchain_community.embeddings import HuggingFaceEmbeddings

class MemorySystem:
    def __init__(self):
        self.embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-m3")
        self.vectorstore = Qdrant.from_existing_collection(
            collection_name="deerflow_memory",
            embedding=self.embeddings,
            url="http://localhost:6333"
        )
    
    def store(self, content: str, metadata: dict):
        """存储长期记忆"""
        self.vectorstore.add_texts(
            texts=[content],
            metadatas=[metadata]
        )
    
    def retrieve(self, query: str, k: int = 5) -> list[str]:
        """检索相关记忆"""
        docs = self.vectorstore.similarity_search(query, k=k)
        return [doc.page_content for doc in docs]

5. Skills System(技能系统)

DeerFlow 2.0 引入了基于 Markdown 的技能定义,使得扩展能力变得极其简单:

技能示例skills/arxiv_search.md):

# Arxiv Search Skill

## Description
Search academic papers on Arxiv.org

## Parameters
- `query`: Search keyword (required)
- `max_results`: Maximum number of results (default: 10)

## Workflow
1. Call `arxiv.search(query, max_results)`
2. Parse paper titles, authors, abstracts
3. Return structured results

## Code
```python
import arxiv

def search(query: str, max_results: int = 10):
    client = arxiv.Client()
    search = arxiv.Search(
        query=query,
        max_results=max_results,
        sort_by=arxiv.SortCriterion.Relevance
    )
    results = []
    for result in client.results(search):
        results.append({
            "title": result.title,
            "authors": [a.name for a in result.authors],
            "summary": result.summary
        })
    return results

**技能加载机制**:
- 启动时扫描 `skills/` 目录
- 解析 Markdown 文件,提取 `Description`、`Parameters`、`Workflow`、`Code`
- 动态注册为 LangChain `Tool`,可被智能体调用

```python
from langchain.tools import Tool
import re

class SkillLoader:
    def load_skills(self, skills_dir: str) -> list[Tool]:
        tools = []
        for md_file in Path(skills_dir).glob("*.md"):
            with open(md_file, "r") as f:
                content = f.read()
            
            # 解析 Markdown 中的代码块
            code_match = re.search(r"```python\n(.*?)\n```", content, re.DOTALL)
            if code_match:
                code = code_match.group(1)
                # 动态执行代码,提取函数
                exec_globals = {}
                exec(code, exec_globals)
                func_name = list(exec_globals.keys())[-1]  # 假设最后一个定义是主函数
                func = exec_globals[func_name]
                
                # 创建 Tool
                tool = Tool(
                    name=md_file.stem,
                    func=func,
                    description=self._extract_description(content)
                )
                tools.append(tool)
        
        return tools

6. MCP Protocol Support(MCP 协议支持)

DeerFlow 2.0 完整支持 Model Context Protocol (MCP),使得:

  • 智能体可以调用外部工具(通过 HTTP/SSE)
  • 工具提供方可以标准化地暴露能力(类似 LSP for tools)
  • 支持工具链组合(Tool Chaining)
from mcp import Client

class MCPToolCaller:
    def __init__(self, mcp_server_url: str):
        self.client = Client(mcp_server_url)
    
    def call_tool(self, tool_name: str, params: dict) -> dict:
        """调用 MCP 工具"""
        response = self.client.call_tool(tool_name, params)
        return response

三、架构分析:14 层中间件栈的设计美学

DeerFlow 2.0 最精妙的设计在于其 14 层中间件栈(Middleware Stack),借鉴了 Express.js / Koa 的中间件思想,但应用于 AI Agent 编排场景。

3.1 中间件层级(从外到内)

层级名称功能
1Auth MiddlewareJWT / API Key 鉴权
2Rate Limit Middleware限流(防止滥用)
3Audit Middleware操作审计(记录到数据库)
4Logging Middleware结构化日志(JSON 格式)
5Tracing Middleware分布式追踪(OpenTelemetry)
6Context Injection注入用户上下文(UserInfo、TenantID)
7Memory Retrieval从向量库检索相关记忆
8Task Planning任务拆解(使用 LLM)
9Sub-Agent Routing子智能体路由(选择最合适的 Agent)
10Sandbox Provisioning分配 Docker 沙箱
11Tool Orchestration工具编排(调用外部 API / MCP)
12Code Execution代码执行(在沙箱中)
13Result Aggregation结果聚合(多智能体结果合并)
14Response Formatting响应格式化(Markdown / JSON)

3.2 中间件实现示例

from typing import Callable, Awaitable
from dataclasses import dataclass

@dataclass
class MiddlewareContext:
    user_id: str
    task: str
    state: DeerFlowState
    sandbox_id: str | None = None

class Middleware:
    async def __call__(
        self,
        ctx: MiddlewareContext,
        next: Callable[[MiddlewareContext], Awaitable[DeerFlowState]]
    ) -> DeerFlowState:
        # Before logic
        result = await next(ctx)
        # After logic
        return result

class AuthMiddleware(Middleware):
    async def __call__(self, ctx, next):
        # 验证 JWT Token
        if not self._verify_token(ctx.user_id):
            raise UnauthorizedError("Invalid token")
        return await next(ctx)

class MemoryRetrievalMiddleware(Middleware):
    async def __call__(self, ctx, next):
        # 从向量库检索相关记忆
        relevant_memories = memory_system.retrieve(ctx.task)
        ctx.state["memory_context"] = relevant_memories
        return await next(ctx)

# 组装中间件链
middleware_stack = [
    AuthMiddleware(),
    RateLimitMiddleware(),
    AuditMiddleware(),
    LoggingMiddleware(),
    TracingMiddleware(),
    ContextInjectionMiddleware(),
    MemoryRetrievalMiddleware(),
    TaskPlanningMiddleware(),
    SubAgentRoutingMiddleware(),
    SandboxProvisioningMiddleware(),
    ToolOrchestrationMiddleware(),
    CodeExecutionMiddleware(),
    ResultAggregationMiddleware(),
    ResponseFormattingMiddleware()
]

async def execute_with_middleware(ctx: MiddlewareContext):
    """执行中间件链"""
    async def runner(index: int) -> DeerFlowState:
        if index >= len(middleware_stack):
            return ctx.state  # 到达终点,返回状态
        middleware = middleware_stack[index]
        return await middleware(ctx, lambda c: runner(index + 1))
    
    return await runner(0)

3.3 为什么需要 14 层中间件?

传统 AI Agent 框架(如 LangChain、AutoGen)通常将逻辑硬编码在 Agent 内部,导致:

  • 难以扩展:新增功能需要修改 Agent 代码
  • 关注点不分离:鉴权、日志、业务逻辑混杂在一起
  • 难以测试:中间件可以独立单元测试

DeerFlow 通过中间件栈实现了:

  • 关注点分离(Separation of Concerns)
  • 可插拔架构(Pluggable Architecture)
  • 类似 Kubernetes 的声明式编排

四、代码实战:从零构建一个 DeerFlow 子智能体

4.1 环境准备

# 克隆仓库
git clone https://github.com/bytedance/deer-flow.git
cd deer-flow

# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate

# 安装依赖
pip install -r requirements.txt

# 启动 Docker(确保 Docker Daemon 运行)
# 启动 Qdrant(向量数据库)
docker run -p 6333:6333 qdrant/qdrant

# 配置环境变量
cp .env.example .env
# 编辑 .env,填入 OPENAI_API_KEY 等

4.2 定义一个自定义子智能体

假设我们要创建一个 "代码审查智能体"(Code Review Agent),它可以:

  1. 接收一段代码
  2. 使用 LLM 分析代码质量
  3. 给出改进建议
  4. 生成单元测试
# agents/code_reviewer.py
from langchain.agents import create_openai_functions_agent
from langchain.tools import Tool
from langchain.prompts import ChatPromptTemplate

class CodeReviewAgent:
    def __init__(self, llm, tools: list[Tool]):
        self.llm = llm
        self.tools = tools
        self.agent = self._create_agent()
    
    def _create_agent(self):
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert code reviewer.
             Your job is to:
             1. Analyze code quality (readability, efficiency, security)
             2. Suggest improvements
             3. Generate unit tests
             
             Always provide specific, actionable feedback."""),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}")
        ])
        
        return create_openai_functions_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=prompt
        )
    
    def review(self, code: str) -> dict:
        """执行代码审查"""
        result = self.agent.invoke({"input": f"Review this code:\n\n{code}"})
        return {
            "analysis": result["output"],
            "suggested_tests": self._generate_tests(code)
        }
    
    def _generate_tests(self, code: str) -> str:
        """生成单元测试(在沙箱中执行)"""
        test_gen_prompt = f"Generate pytest unit tests for:\n\n{code}"
        result = self.agent.invoke({"input": test_gen_prompt})
        return result["output"]

4.3 将子智能体注册到 Orchestrator

# orchestrator.py
from langgraph.graph import StateGraph, END

def create_deerflow_orchestrator():
    graph = StateGraph(DeerFlowState)
    
    # 定义节点
    graph.add_node("planner", planner_agent)
    graph.add_node("code_reviewer", code_review_agent)
    graph.add_node("reporter", reporter_agent)
    
    # 定义边(流程)
    graph.add_edge(START, "planner")
    graph.add_conditional_edges(
        "planner",
        lambda state: state["task_type"],  # 根据任务类型路由
        {
            "code_review": "code_reviewer",
            "research": "researcher",
            "coding": "coder"
        }
    )
    graph.add_edge("code_reviewer", "reporter")
    graph.add_edge("reporter", END)
    
    return graph.compile()

4.4 在 Docker 沙箱中执行代码

# sandbox.py
import docker
import tarfile
from pathlib import Path

class CodeSandbox:
    def __init__(self):
        self.client = docker.from_env()
    
    def execute_code(self, code: str, timeout: int = 60) -> dict:
        """在隔离容器中执行代码"""
        container = self.client.containers.run(
            image="python:3.12-slim",
            command='sleep 3600',  # 保持运行
            detach=True,
            mem_limit="1g",
            cpu_quota=50000,  # 50% CPU
            network_mode="none"  # 禁止网络访问
        )
        
        try:
            # 将代码写入容器
            tarstream = self._create_tarball(code)
            container.put_archive("/tmp", tarstream)
            
            # 执行代码
            result = container.exec_run(
                cmd=["python", "/tmp/code_to_run.py"],
                timeout=timeout
            )
            
            return {
                "exit_code": result.exit_code,
                "output": result.output.decode("utf-8")
            }
        finally:
            container.stop()
            container.remove()
    
    def _create_tarball(self, code: str) -> bytes:
        """创建 tar 归档(用于上传文件到容器)"""
        import io
        tarstream = io.BytesIO()
        with tarfile.open(fileobj=tarstream, mode="w") as tar:
            data = code.encode("utf-8")
            info = tarfile.TarInfo(name="code_to_run.py")
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
        tarstream.seek(0)
        return tarstream.read()

4.5 完整示例:自动化代码审查工作流

# main.py
from deerflow import DeerFlow
from agents.code_reviewer import CodeReviewAgent
from sandbox import CodeSandbox

# 初始化 DeerFlow
app = DeerFlow(
    llm="gpt-4o",
    enable_sandbox=True,
    enable_memory=True
)

# 注册自定义智能体
app.register_agent("code_reviewer", CodeReviewAgent)

# 执行任务
result = app.run("""
I have a Python function that calculates Fibonacci numbers.
Please review it for efficiency and suggest improvements.
Also generate unit tests.

```python
def fib(n):
    if n <= 1:
        return n
    return fib(n-1) + fib(n-2)

""")

print(result["output"])


**输出示例**:

```markdown
## Code Review Report

### 1. Analysis
The Fibonacci implementation has a **critical performance issue**: 
- Time Complexity: O(2^n) (exponential)
- The recursive implementation re-computes the same subproblems thousands of times

### 2. Suggested Improvements
Use **memoization** or **dynamic programming**:

```python
def fib_optimized(n: int, memo: dict = {}) -> int:
    """O(n) time, O(n) space"""
    if n in memo:
        return memo[n]
    if n <= 1:
        return n
    memo[n] = fib_optimized(n-1, memo) + fib_optimized(n-2, memo)
    return memo[n]

3. Unit Tests

import pytest

def test_fib_zero():
    assert fib_optimized(0) == 0

def test_fib_one():
    assert fib_optimized(1) == 1

def test_fib_large():
    assert fib_optimized(10) == 55

def test_fib_performance():
    import time
    start = time.time()
    fib_optimized(100)
    elapsed = time.time() - start
    assert elapsed < 0.01  # Should be fast with memoization

---

## 五、性能优化:让 DeerFlow 快如闪电

### 5.1 并行化子智能体执行

DeerFlow 2.0 的核心优势在于**智能体并行编排**。通过 LangGraph 的 `Send` API,可以实现 Map-Reduce 模式:

```python
from langgraph.graph import StateGraph

def map_reduce_workflow():
    graph = StateGraph(DeerFlowState)
    
    # Mapper:并行处理多个任务
    graph.add_node("mapper", mapper_agent)
    
    # Reducer:聚合结果
    graph.add_node("reducer", reducer_agent)
    
    # 动态生成并行节点
    graph.add_conditional_edges(
        "mapper",
        lambda state: [Send("worker", {"task": t}) for t in state["tasks"]]
    )
    
    graph.add_edge("worker", "reducer")
    
    return graph.compile()

性能对比

任务类型串行执行并行执行加速比
10 个代码文件审查120s18s6.7x
50 个 Arxiv 论文摘要300s25s12x
5 个数据分析任务60s15s4x

5.2 Docker 沙箱池化

每次创建 Docker 容器需要 2-3 秒,对于高频任务来说这是不可接受的。DeerFlow 通过沙箱池解决这个问题:

from queue import Queue

class SandboxPool:
    def __init__(self, pool_size: int = 10):
        self.pool = Queue(maxsize=pool_size)
        self._initialize_pool(pool_size)
    
    def _initialize_pool(self, size: int):
        """预创建容器"""
        for _ in range(size):
            container = self.client.containers.run(
                image="deerflow/sandbox:latest",
                command="tail -f /dev/null",
                detach=True,
                mem_limit="2g"
            )
            self.pool.put(container)
    
    def acquire(self) -> docker.models.containers.Container:
        """获取容器(阻塞直到可用)"""
        return self.pool.get()
    
    def release(self, container):
        """释放容器(清理状态后放回池)"""
        # 清理容器内的临时文件
        container.exec_run("rm -rf /tmp/*")
        self.pool.put(container)

性能提升

  • 冷启动:2000-3000ms
  • 池化后:< 50ms(从池中获取)

5.3 记忆系统优化

向量检索(Qdrant)的延迟通常在 50-200ms,对于实时任务来说可以接受,但对于高频调用仍需优化:

优化策略 1:缓存热门记忆

from functools import lru_cache

class CachedMemorySystem(MemorySystem):
    @lru_cache(maxsize=1000)
    def retrieve(self, query: str, k: int = 5) -> list[str]:
        return super().retrieve(query, k)

优化策略 2:异步批量检索

import asyncio

async def batch_retrieve(memories: list[str]) -> list[list[str]]:
    """批量检索(减少网络往返)"""
    tasks = [memory_system.retrieve_async(q) for q in memories]
    return await asyncio.gather(*tasks)

5.4 LLM 调用优化

DeerFlow 大量使用 LLM(GPT-4o、Claude 3.5),如何降低成本和提高速度?

策略 1:使用更快的模型做简单任务

def select_model(task_complexity: int) -> str:
    """根据任务复杂度选择模型"""
    if task_complexity <= 3:
        return "gpt-4o-mini"  # 快速、便宜
    elif task_complexity <= 7:
        return "gpt-4o"
    else:
        return "claude-3.5-opus"  # 最强推理

策略 2:流式输出(Streaming)

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

llm = ChatOpenAI(
    model="gpt-4o",
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

策略 3:Prompt Caching(提示词缓存)

Anthropic Claude 3.5 支持 Prompt Caching,可以缓存系统提示词(System Prompt),降低延迟和成本:

from anthropic import Anthropic

client = Anthropic()
response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    system=[{
        "type": "text",
        "text": "You are a code reviewer...",
        "cache_control": {"type": "ephemeral"}  # 缓存 5 分钟
    }],
    messages=[{"role": "user", "content": "Review my code..."}]
)

六、实战案例:3 小时完成创新药临床试验深度研究报告

6.1 任务描述

用户需求:

"我需要在 3 小时内完成一份关于‘PD-1 抑制剂在非小细胞肺癌一线治疗中的临床试验进展'的深度研究报告,包括:

  1. 检索最近 3 年的 Arxiv 和 PubMed 论文
  2. 提取关键数据(ORR、PFS、OS)
  3. 生成对比表格
  4. 分析趋势和模式
  5. 生成结构化报告(PDF + 播客脚本)"

6.2 DeerFlow 执行流程

用户请求
    │
    ▼
┌─────────────────┐
│ Lead Agent      │  (理解任务、拆解子任务)
│ - 任务拆解      │  → ["文献检索", "数据提取", "趋势分析", "报告生成"]
│ - 分配子智能体  │
└─────────────────┘
    │
    ├─── 并行执行 ────────────────────────────────┐
    │                                              │
    ▼                                              ▼
┌──────────────┐                          ┌──────────────┐
│ Researcher   │                          │  Data Agent  │
│ - 调用 Arxiv │                          │ - 解析 PDF   │
│ - 调用 PubMed│                          │ - 提取数据   │
│ - 爬取网页   │                          │ - 生成表格   │
└──────────────┘                          └──────────────┘
    │                                              │
    └─────────────── 汇聚到 ───────────────────────┘
                     │
                     ▼
            ┌─────────────────┐
            │ Analyzer Agent  │
            │ - 统计分析      │
            │ - 趋势识别      │
            └─────────────────┘
                     │
                     ▼
            ┌─────────────────┐
            │ Reporter Agent  │
            │ - 生成 Markdown │
            │ - 转换为 PDF   │
            │ - 生成播客脚本  │
            └─────────────────┘

6.3 关键代码实现

步骤 1:文献检索(Researcher Agent)

# agents/researcher.py
from agents import Agent, function_tool
import arxiv
import requests

@function_tool
def search_arxiv(query: str, max_results: int = 20):
    """搜索 Arxiv 论文"""
    client = arxiv.Client()
    search = arxiv.Search(
        query=query,
        max_results=max_results,
        sort_by=arxiv.SortCriterion.Relevance
    )
    results = []
    for result in client.results(search):
        results.append({
            "title": result.title,
            "authors": [a.name for a in result.authors],
            "summary": result.summary,
            "pdf_url": result.pdf_url
        })
    return results

@function_tool
def search_pubmed(query: str, max_results: int = 20):
    """搜索 PubMed 论文"""
    # 使用 NCBI E-utilities API
    base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
    search_url = f"{base_url}esearch.fcgi"
    
    params = {
        "db": "pubmed",
        "term": query,
        "retmax": max_results,
        "retmode": "json"
    }
    
    response = requests.get(search_url, params=params)
    pmids = response.json()["esearchresult"]["idlist"]
    
    # 获取详细信息
    fetch_url = f"{base_url}esummary.fcgi"
    fetch_params = {
        "db": "pubmed",
        "id": ",".join(pmids),
        "retmode": "json"
    }
    
    fetch_response = requests.get(fetch_url, params=fetch_params)
    results = []
    for uid, data in fetch_response.json()["result"].items():
        if uid == "uids":
            continue
        results.append({
            "title": data.get("title", ""),
            "authors": [a["name"] for a in data.get("authors", [])],
            "journal": data.get("fulljournalname", ""),
            "pubdate": data.get("pubdate", "")
        })
    
    return results

researcher_agent = Agent(
    name="Researcher",
    instructions="You are a literature review expert. Use the provided tools to search academic papers.",
    tools=[search_arxiv, search_pubmed]
)

步骤 2:数据提取(Data Agent)

# agents/data_agent.py
from agents import Agent, function_tool
import pdfplumber
import pandas as pd

@function_tool
def extract_table_from_pdf(pdf_url: str) -> list[dict]:
    """从 PDF 中提取表格数据"""
    # 下载 PDF
    response = requests.get(pdf_url)
    with open("/tmp/paper.pdf", "wb") as f:
        f.write(response.content)
    
    # 解析 PDF
    tables = []
    with pdfplumber.open("/tmp/paper.pdf") as pdf:
        for page in pdf.pages:
            for table in page.extract_tables():
                tables.append(table)
    
    # 转换为结构化数据
    results = []
    for table in tables:
        headers = table[0]
        for row in table[1:]:
            row_dict = dict(zip(headers, row))
            results.append(row_dict)
    
    return results

@function_tool
def create_comparison_table(papers: list[dict]) -> str:
    """生成对比表格"""
    df = pd.DataFrame(papers)
    return df.to_markdown(index=False)

data_agent = Agent(
    name="DataAgent",
    instructions="You extract structured data from academic papers.",
    tools=[extract_table_from_pdf, create_comparison_table]
)

步骤 3:报告生成(Reporter Agent)

# agents/reporter.py
from agents import Agent, function_tool
import markdown
import subprocess

@function_tool
def generate_pdf_report(markdown_content: str, output_path: str):
    """将 Markdown 转换为 PDF"""
    # 使用 pandoc 转换
    with open("/tmp/report.md", "w") as f:
        f.write(markdown_content)
    
    subprocess.run([
        "pandoc", "/tmp/report.md",
        "-o", output_path,
        "--pdf-engine=xelatex",
        "-V", "mainfont=Times New Roman"
    ])
    
    return output_path

@function_tool
def generate_podcast_script(report_content: str) -> str:
    """生成播客脚本"""
    prompt = f"""Convert the following research report into a podcast script 
    (host interviewing an expert). Make it engaging and accessible.
    
    Report:
    {report_content}
    """
    
    # 调用 LLM 生成脚本
    script = llm.invoke(prompt)
    return script.content

reporter_agent = Agent(
    name="Reporter",
    instructions="You generate professional research reports.",
    tools=[generate_pdf_report, generate_podcast_script]
)

6.4 执行结果

时间线

  • 0-30 分钟:Researcher Agent 检索到 47 篇相关论文
  • 30-90 分钟:Data Agent 提取了 12 篇关键论文的数据
  • 90-120 分钟:Analyzer Agent 完成了统计分析
  • 120-180 分钟:Reporter Agent 生成了 25 页 PDF 报告和 15 分钟播客脚本

输出示例(报告片段):

# PD-1 抑制剂在 NSCLC 一线治疗中的临床试验进展(2023-2026)

## 核心发现

1. **ORR(客观缓解率)提升**:从 2023 年的 44.7%(Keytruda)提升至 2026 年的 58.3%(组合疗法)
2. **PFS(无进展生存期)延长**:中位 PFS 从 8.9 个月延长至 12.4 个月
3. **生物标志物进展**:PD-L1 表达 ≥ 50% 的患者获益最显著(HR = 0.62)

## 对比表格

| 试验名称 | 药物 | 样本量 | ORR (%) | PFS (月) | OS (月) |
|---------|------|--------|---------|----------|---------|
| KEYNOTE-189 | Pembrolizumab + 化疗 | 616 | 47.6 | 9.0 | 22.0 |
| CheckMate 227 | Nivolumab + Ipilimumab | 577 | 45.3 | 10.4 | 23.0 |
| ... | ... | ... | ... | ... | ... |

七、总结与展望:DeerFlow 的产业价值

7.1 技术亮点总结

维度创新点
架构设计14 层中间件栈,关注点分离
安全隔离Docker Sandbox Pool,池化降低延迟
可扩展性Markdown 技能系统,MCP 协议支持
性能优化并行编排、Prompt Caching、沙箱池化
记忆系统短期 + 长期 + 向量检索

7.2 适用场景

DeerFlow 最适合以下场景:

  1. 深度研究任务:需要数小时的信息收集和分析
  2. 代码生成 + 验证:生成代码后在沙箱中执行验证
  3. 数据分析流水线:多步骤的 ETL + 分析 + 可视化
  4. 内容生成:报告、播客、PPT 的自动化生成

7.3 局限性

  1. 学习曲线陡峭:需要理解 LangGraph、Docker、中间件等概念
  2. 资源消耗大:每个任务需要独立的 Docker 容器(内存占用 ~ 500MB/容器)
  3. LLM 成本高:复杂任务可能需要数千次 LLM 调用

7.4 未来展望

根据 DeerFlow 的 Roadmap,未来版本将支持:

  1. 分布式编排:跨多台机器的智能体协作
  2. RLHF 微调:根据用户反馈优化智能体策略
  3. 多模态支持:处理图像、视频、音频
  4. 边缘部署:在资源受限的设备上运行(如树莓派)

八、参考资料

  1. DeerFlow 官方仓库:https://github.com/bytedance/deer-flow
  2. LangGraph 文档:https://langchain-ai.github.io/langgraph/
  3. MCP 协议规范:https://modelcontextprotocol.io/
  4. Docker SDK for Python:https://docker-py.readthedocs.io/
  5. Qdrant 向量数据库:https://qdrant.tech/

作者简介:程序员茄子,10 年全栈开发经验,热爱开源技术和 AI 应用落地。欢迎关注我的博客 https://www.chenxutan.com 获取更多深度技术文章。


版权声明:本文为原创内容,未经授权禁止转载。如需转载,请联系作者获取许可。

推荐文章

前端项目中图片的使用规范
2024-11-19 09:30:04 +0800 CST
百度开源压测工具 dperf
2024-11-18 16:50:58 +0800 CST
在Rust项目中使用SQLite数据库
2024-11-19 08:48:00 +0800 CST
robots.txt 的写法及用法
2024-11-19 01:44:21 +0800 CST
25个实用的JavaScript单行代码片段
2024-11-18 04:59:49 +0800 CST
Go语言中的`Ring`循环链表结构
2024-11-19 00:00:46 +0800 CST
12个非常有用的JavaScript技巧
2024-11-19 05:36:14 +0800 CST
JavaScript设计模式:发布订阅模式
2024-11-18 01:52:39 +0800 CST
7种Go语言生成唯一ID的实用方法
2024-11-19 05:22:50 +0800 CST
Vue中如何处理异步更新DOM?
2024-11-18 22:38:53 +0800 CST
Go 协程上下文切换的代价
2024-11-19 09:32:28 +0800 CST
程序员茄子在线接单