字节跳动 DeerFlow 2.0 深度解析:46k Star 的超级智能体框架,让 AI 真正「动手做事」
2026年3月,字节跳动开源的 DeerFlow 2.0 在 GitHub 狂揽 46,333+ Star,连续登顶 Trending 榜首。它不是又一个聊天机器人——而是一个能让 AI 真正完成复杂任务的超级智能体框架。本文从架构设计、核心原理、代码实战、性能优化四个维度,彻底拆解这个项目。
一、为什么 DeerFlow 2.0 值得关注
1.1 从「深度研究工具」到「超级智能体框架」的进化
DeerFlow(Deep Exploration and Efficient Research Flow)最初只是一个深度研究框架。开发者在实际使用中发现了它的潜力远不止于此:有人用它构建数据管道,有人用它生成幻灯片和仪表盘,有人用它自动化内容工作流。团队意识到——DeerFlow 不只是研究工具,而是一个让智能体真正完成工作的运行环境。
于是,他们从零重写,推出了 DeerFlow 2.0。
1.2 核心定位:一句话定义
DeerFlow = 深度调研场景下的多智能体工作流框架 + LangGraph 驱动的状态机引擎。
它把「需求理解 → 任务拆解 → 多轮检索 → 实验验证 → 报告生成」整个过程,固化成一条可观察、可配置的流水线。
1.3 关键数据一览
| 指标 | 数据 |
|---|---|
| GitHub Star | 46,333+ |
| 今日新增 Star | 3,787+ |
| 技术栈 | Python 3.12+ / Node.js 22+ |
| Agent 框架 | LangGraph + LangChain |
| 部署方式 | Docker + Kubernetes |
| 赞助商 | 字节跳动 (ByteDance / Volcengine) |
二、架构设计:模块化解构
2.1 整体架构概览
DeerFlow 2.0 采用微服务设计理念,核心由三层组成:
┌─────────────────────────────────────────┐
│ Nginx (反向代理) │
│ 端口: 80 / 443 │
└─────────────────┬───────────────────────┘
│
┌─────────────────────────┼─────────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ LangGraph │ │ Gateway API │ │ Frontend │
│ Server │ │ (FastAPI) │ │ (Next.js) │
│ 端口: 2024 │ │ 端口: 8001 │ │ 端口: 3000 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ LangSmith │ │ Langfuse │
│ (追踪) │ │ (可观测性) │
└─────────────────┘ └─────────────────┘
2.2 核心模块职责
| 模块 | 职责 | 技术栈 |
|---|---|---|
| LangGraph Server | Agent 运行时、线程管理、SSE 流式传输、检查点 | LangGraph、LangChain |
| Gateway API | 外部 API 封装、认证、任务调度 | FastAPI、Python |
| Frontend | 用户界面、实时交互、文件展示 | Next.js、React |
| Sandbox | 代码执行环境隔离 | Docker、Kubernetes |
2.3 两种运行模式
DeerFlow 提供两种运行模式,满足不同场景需求:
标准模式(Standard Mode):
- 4 个容器:Gateway + LangGraph + Frontend + Nginx
- 适合开发测试和小规模部署
生产模式(Production Mode):
- 支持 Kubernetes 水平扩展
- 沙箱独立部署,资源隔离更严格
- 适合大规模并发任务处理
三、核心原理:LangGraph 驱动的状态机
3.1 为什么选择 LangGraph?
传统的 Agent 实现往往是「线性流程」——用户输入 → LLM 处理 → 输出结果。这种方式的问题是:
- 无法处理复杂状态:当任务需要多轮交互、循环处理时,线性流程难以应对
- 缺乏可视化:流程不透明,难以调试和优化
- 状态丢失:对话中断后无法恢复上下文
LangGraph 的核心思想是将 Agent 建模为有向图,每个节点代表一个操作(如搜索、代码执行、生成报告),边代表状态转换。这带来了三大优势:
- 状态持久化:每个节点的状态都可以被保存和恢复
- 流程可视化:图结构天然支持可视化展示
- 复杂逻辑支持:支持条件分支、循环、子图嵌套
3.2 DeerFlow 的核心图结构
DeerFlow 的工作流由以下核心节点组成:
from typing import TypedDict, Annotated, Sequence
from langgraph.graph import StateGraph, END
class ResearchState(TypedDict):
"""研究任务的全局状态"""
query: str # 用户原始查询
plan: list[str] # 研究计划(分步骤)
current_step: int # 当前执行步骤
research_results: list[dict] # 收集到的研究资料
code_results: dict # 代码执行结果
draft_report: str # 草稿报告
final_report: str # 最终报告
iterations: int # 迭代次数
def create_research_graph():
"""构建研究工作流图"""
workflow = StateGraph(ResearchState)
# 添加核心节点
workflow.add_node("planner", plan_research)
workflow.add_node("researcher", conduct_research)
workflow.add_node("coder", execute_code)
workflow.add_node("writer", write_report)
workflow.add_node("reviewer", review_and_refine)
# 定义边和条件
workflow.set_entry_point("planner")
workflow.add_edge("planner", "researcher")
workflow.add_edge("researcher", "coder")
workflow.add_edge("coder", "writer")
workflow.add_edge("writer", "reviewer")
workflow.add_edge("reviewer", END)
return workflow.compile()
3.3 子智能体协作机制
DeerFlow 的精髓在于「多智能体协作」。每个核心节点背后都是一个专门的子智能体:
from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import JsonOutputParser
# 1. 规划者智能体:负责任务拆解
PLANNER_PROMPT = """你是一个专业的研究规划专家。
用户的研究目标:{query}
请将这个目标分解为具体的研究步骤。
每个步骤应该:
- 明确且可执行
- 有明确的验收标准
- 按逻辑顺序排列
输出格式(JSON):
{{
"steps": [
{{"step": 1, "action": "搜索...", "objective": "..."}},
...
],
"estimated_time": "预估耗时"
}}
"""
# 2. 研究员智能体:负责信息收集
RESEARCHER_PROMPT = """你是一个专业的研究员。
当前研究计划:{plan}
当前步骤:{current_step}
请执行以下研究任务:
1. 使用 Tavily 或 Brave Search 进行网络搜索
2. 使用 Arxiv API 搜索相关学术论文
3. 使用爬虫工具获取特定网页内容
收集的信息应包含:
- 关键事实和数据
- 引用来源(URL)
- 可信度评估
输出格式:
## 信息来源
- [来源1](url) - 核心发现
- [来源2](url) - 关键数据
"""
# 3. 编码员智能体:负责代码执行
CODER_PROMPT = """你是一个全栈工程师。
研究目标:{query}
收集到的资料:{research_results}
如果需要数据分析或可视化,请编写并执行 Python 代码。
要求:
1. 使用严格的错误处理
2. 代码必须可复现
3. 输出结果必须可解释
环境说明:
- Python 3.12+
- 可用库:pandas, numpy, matplotlib, plotly, requests
- 数据文件路径:/workspace/data/
"""
# 4. 报告员智能体:负责内容生成
WRITER_PROMPT = """你是一个专业的技术写作专家。
研究主题:{query}
研究成果:{research_results}
数据分析结果:{code_results}
请撰写一份结构完整的研究报告:
1. 执行摘要(150字以内)
2. 背景介绍(300字)
3. 核心发现(按主题分节,每节500字以上)
4. 数据分析(含图表说明)
5. 结论与展望(300字)
格式要求:
- Markdown 格式
- 关键数据用表格展示
- 引用来源用脚注标注
"""
3.4 沙箱隔离机制
DeerFlow 的沙箱是保证安全性的关键。它提供了三种执行模式:
# config.yaml
sandbox:
# 模式选择:local | docker | kubernetes
mode: docker
# Docker 配置
docker:
image: "deerflow/sandbox:latest"
memory_limit: "4g"
cpu_limit: 2
network: "bridge"
readonly_fs: false # 是否只读文件系统
# Kubernetes 配置(生产模式)
kubernetes:
namespace: "deerflow"
service_account: "deerflow-sandbox"
resources:
requests:
memory: "2Gi"
cpu: "1"
limits:
memory: "4Gi"
cpu: "2"
沙箱内部提供了完整的文件系统访问能力:
# 沙箱内可用的操作
class SandboxEnvironment:
"""沙箱环境模拟"""
# 文件系统操作
def read_file(self, path: str) -> str:
"""读取文件内容"""
...
def write_file(self, path: str, content: str) -> None:
"""写入文件"""
...
def list_directory(self, path: str) -> list[str]:
"""列出目录内容"""
...
# Shell 命令
def execute_command(self, cmd: str, timeout: int = 60) -> CommandResult:
"""执行 shell 命令"""
...
# Python 执行
def execute_python(self, code: str, context: dict = None) -> ExecutionResult:
"""安全执行 Python 代码"""
...
四、快速上手:从零搭建 DeerFlow 环境
4.1 环境准备
# 系统要求
- Ubuntu 20.04+ / macOS 12+
- Python 3.12+
- Node.js 22+
- Docker 24+(如使用 Docker 模式)
- Git
# 克隆项目
git clone https://github.com/bytedance/deer-flow.git
cd deer-flow
4.2 配置 LLM 提供商
DeerFlow 支持多种 LLM 提供商,包括 OpenAI、DeepSeek、Anthropic 等:
# 方式一:交互式配置(推荐新手)
make config
# 方式二:手动配置
cp config.example.yaml config.yaml
vim config.yaml
配置文件示例:
# config.yaml
models:
# 主模型(用于核心推理)
primary:
provider: openai
model: gpt-4o
api_key: ${OPENAI_API_KEY}
temperature: 0.7
max_tokens: 4096
# 辅助模型(用于简单任务,降低成本)
auxiliary:
provider: deepseek
model: deepseek-chat
api_key: ${DEEPSEEK_API_KEY}
temperature: 0.5
# 嵌入模型(用于文档检索)
embedding:
provider: openai
model: text-embedding-3-small
# 搜索工具配置
search:
tavily:
api_key: ${TAVILY_API_KEY}
brave:
api_key: ${BRAVE_SEARCH_API_KEY}
# MCP 服务器(可选扩展)
mcp:
enabled: true
servers:
- name: filesystem
command: ["npx", "mcp-server-filesystem", "/workspace"]
- name: sequential-thinking
command: ["npx", "@modelcontextprotocol/server-sequential-thinking"]
4.3 一键启动
# 开发模式(使用本地代码)
make dev
# Docker 模式(推荐)
make docker-up
# 生产模式(Kubernetes)
make k8s-deploy
启动后访问 http://localhost:3000,即可看到 DeerFlow 的 Web 界面。
4.4 第一个研究任务
在 Web 界面中输入研究主题,例如:
「分析 2026 年 Q1 全球 AI 投资趋势,聚焦生成式 AI 和 Agent 领域」
DeerFlow 将自动执行以下流程:
🎯 接收任务
└─ 理解用户意图
└─ 识别关键信息点
📋 制定计划
└─ 步骤 1:搜索 2026 Q1 AI 投资报告
└─ 步骤 2:收集生成式 AI 融资数据
└─ 步骤 3:分析 Agent 领域投资动态
└─ 步骤 4:对比区域分布
└─ 步骤 5:生成分析报告
🔍 执行研究
└─ [研究员] 调用 Tavily 搜索
└─ [研究员] 调用 Brave Search
└─ [研究员] 爬取行业报告网站
└─ [编码员] 整理数据
└─ [编码员] 生成可视化图表
📝 撰写报告
└─ [报告员] 综合所有发现
└─ [报告员] 生成 Markdown 报告
└─ [审阅员] 检查准确性和完整性
✅ 完成
└─ 报告已生成
└─ 可下载 PDF/PPT/语音版本
五、代码实战:深度定制 DeerFlow
5.1 自定义研究流程
DeerFlow 的图结构支持高度定制。以下是一个自定义研究流程的示例:
from deerflow.core import WorkflowBuilder
from deerflow.nodes import (
SearchNode,
CrawlNode,
CodeNode,
WriteNode,
ReviewNode
)
# 创建自定义工作流
builder = WorkflowBuilder(name="my-research-workflow")
# 添加标准节点
builder.add_node(SearchNode(
name="web_search",
providers=["tavily", "brave"],
max_results=20
))
builder.add_node(CrawlNode(
name="deep_crawl",
max_depth=3,
respect_robots_txt=True
))
builder.add_node(CodeNode(
name="analyze",
languages=["python", "r"],
timeout=300
))
# 添加自定义节点
@builder.node(name="custom_filter", type="filter")
def custom_filter_node(state: ResearchState) -> dict:
"""自定义过滤节点"""
filtered_results = []
for result in state["research_results"]:
# 自定义过滤逻辑
if result["reliability_score"] >= 0.7:
if result["date"] >= "2026-01-01":
filtered_results.append(result)
return {"filtered_results": filtered_results}
# 添加条件分支
builder.add_condition(
node="custom_filter",
conditions={
"has_results": "write_report",
"no_results": "retry_search"
}
)
# 编译工作流
workflow = builder.compile()
# 运行
result = workflow.run(query="2026 AI 投资趋势分析")
5.2 MCP 服务器集成
DeerFlow 支持 MCP(Model Context Protocol)扩展,可以连接各种外部工具:
# 配置 MCP 服务器连接
from deerflow.mcp import MCPClient
# 方式一:命令行工具
client = MCPClient.from_command(
"npx",
args=["-y", "@modelcontextprotocol/server-filesystem", "/workspace/data"]
)
# 方式二:HTTP 服务器
client = MCPClient.from_http(
url="http://mcp-server:3001",
headers={"Authorization": "Bearer ${MCP_API_KEY}"}
)
# 使用 MCP 工具
async def process_with_mcp():
async with client as mcp:
# 列出可用工具
tools = await mcp.list_tools()
print(f"可用工具: {[t.name for t in tools]}")
# 调用工具
result = await mcp.call_tool(
"read_file",
arguments={"path": "/workspace/data/report.txt"}
)
return result
# 方式三:集成 LangChain MCP 工具
from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent
tools = await load_mcp_tools(client)
agent = create_react_agent(model, tools)
5.3 长期记忆系统
DeerFlow 的记忆系统支持跨会话持久化:
from deerflow.memory import VectorMemory, SQLMemory
# 配置记忆存储
memory_config = {
"vector_store": {
"provider": "chroma", # 或 qdrant, pinecone
"collection": "research_memory",
"embedding": "text-embedding-3-small"
},
"sql_store": {
"provider": "sqlite",
"path": "/workspace/memory/memory.db"
}
}
# 存储记忆
async def store_research_memory(query: str, results: dict, summary: str):
memory = VectorMemory(config=memory_config)
await memory.add(
content=summary,
metadata={
"query": query,
"timestamp": datetime.now().isoformat(),
"sources": [r["url"] for r in results],
"tags": extract_keywords(query)
}
)
# 检索记忆
async def retrieve_related_memory(query: str, top_k: int = 5):
memory = VectorMemory(config=memory_config)
results = await memory.search(
query=query,
top_k=top_k,
filter_metadata={"tags": {"$contains": "AI投资"}}
)
return results
# 在工作流中使用记忆
@builder.node(name="memory_lookup", type="before")
async def memory_lookup_node(state: ResearchState) -> dict:
"""研究前先检索相关记忆"""
related = await retrieve_related_memory(state["query"])
if related:
return {
"previous_research": related,
"context": f"相关历史研究:{[r['query'] for r in related]}"
}
return {"previous_research": None}
5.4 Slack/飞书集成
DeerFlow 支持与企业通讯工具集成:
from deerflow.integrations import SlackBot, FeishuBot
# Slack 集成
slack = SlackBot(
bot_token="${SLACK_BOT_TOKEN}",
signing_secret="${SLACK_SIGNING_SECRET}",
workspace_id="TXXXXXX"
)
# 处理 Slack 命令
@slack.command("/research")
async def handle_research_command(command: Command):
query = command.text
# 启动研究任务
task_id = await workflow.run_async(query=query)
# 回复 Slack
await slack.reply(
channel=command.channel,
text=f"🎯 研究任务已启动!\n任务ID: {task_id}\n完成后我会通知你。"
)
return task_id
# 任务完成通知
@slack.event("app_mention")
async def handle_mention(event: dict):
if "research_complete" in event.get("text", ""):
task_id = extract_task_id(event["text"])
await slack.send_dm(
user=event["user"],
text=f"✅ 研究完成!\n查看报告: {get_report_link(task_id)}"
)
# 飞书集成(类似)
feishu = FeishuBot(
app_id="${FEISHU_APP_ID}",
app_secret="${FEISHU_APP_SECRET}"
)
六、性能优化:打造生产级 DeerFlow
6.1 资源调度优化
# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: deerflow-langgraph
spec:
replicas: 3
selector:
matchLabels:
app: deerflow-langgraph
template:
spec:
containers:
- name: langgraph
image: deerflow/langgraph:latest
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "8Gi"
cpu: "4"
env:
- name: MODEL_CONCURRENCY
value: "10"
- name: ENABLE_STREAMING
value: "true"
- name: MAX_ITERATIONS
value: "50"
- name: TIMEOUT_PER_ITERATION
value: "120"
---
apiVersion: v1
kind: Service
metadata:
name: deerflow-langgraph-svc
spec:
type: ClusterIP
ports:
- port: 2024
targetPort: 2024
selector:
app: deerflow-langgraph
6.2 模型调用优化
from langchain.callbacks import TokenCountingCallbackHandler
from langchain.chat_models import init_chat_model
# 1. 使用批量处理减少 API 调用
async def batch_process_queries(queries: list[str], batch_size: int = 5):
results = []
for i in range(0, len(queries), batch_size):
batch = queries[i:i + batch_size]
# 并行处理批次(注意 API 速率限制)
batch_results = await asyncio.gather(
*[process_single_query(q) for q in batch],
return_exceptions=True
)
results.extend(batch_results)
# 批次间延迟(遵守速率限制)
await asyncio.sleep(1)
return results
# 2. 缓存常用查询结果
from functools import lru_cache
import hashlib
@lru_cache(maxsize=1000)
def get_cached_analysis(query_hash: str) -> dict:
"""缓存分析结果"""
# 从缓存存储读取
return cached_results.get(query_hash)
def generate_query_hash(query: str) -> str:
return hashlib.sha256(query.encode()).hexdigest()[:16]
# 3. 模型降级策略
async def smart_model_call(query: str):
# 简单查询使用小模型
if is_simple_query(query):
try:
return await call_model("deepseek-chat", query)
except Exception:
pass
# 复杂查询使用大模型
return await call_model("gpt-4o", query)
def is_simple_query(query: str) -> bool:
"""判断查询复杂度"""
indicators = [
len(query.split()) < 20,
"分析" not in query,
"比较" not in query,
"趋势" not in query
]
return sum(indicators) >= 3
6.3 可观测性配置
# observability/config.yaml
tracing:
enabled: true
provider: langsmith
langsmith:
api_key: ${LANGCHAIN_API_KEY}
project: deerflow-production
tags: ["production", "research"]
langfuse:
public_key: ${LANGFUSE_PUBLIC_KEY}
secret_key: ${LANGFUSE_SECRET_KEY}
host: https://cloud.langfuse.com
release: v2.0.0
metrics:
prometheus:
enabled: true
port: 9090
path: /metrics
custom_metrics:
- name: research_task_duration_seconds
type: histogram
buckets: [10, 30, 60, 120, 300, 600]
- name: llm_tokens_consumed
type: counter
labels: ["model", "task_type"]
- name: search_results_count
type: histogram
buckets: [5, 10, 20, 50, 100]
alerts:
enabled: true
rules:
- name: high_error_rate
condition: error_rate > 0.05
channel: slack
severity: high
- name: slow_response
condition: p95_latency > 300
channel: slack
severity: medium
- name: out_of_tokens
condition: daily_tokens > limit * 0.9
channel: email
severity: critical
七、对比分析:DeerFlow vs 竞品
7.1 核心能力对比
| 特性 | DeerFlow 2.0 | LangGraph | CrewAI | AutoGen |
|---|---|---|---|---|
| 多智能体协作 | ✅ 完善 | ✅ 基础 | ✅ 完善 | ✅ 完善 |
| 沙箱执行 | ✅ Docker/K8s | ❌ 需自建 | ❌ 需自建 | ❌ 需自建 |
| 长期记忆 | ✅ 内置 | ⚠️ 需配置 | ⚠️ 需配置 | ❌ 无 |
| 多模态输出 | ✅ 完善 | ⚠️ 基础 | ❌ 无 | ❌ 无 |
| 企业集成 | ✅ Slack/飞书 | ⚠️ 需配置 | ❌ 无 | ⚠️ 需配置 |
| 学习曲线 | 中等 | 陡峭 | 平缓 | 中等 |
| 生产就绪度 | ✅ 高 | ⚠️ 中 | ⚠️ 中 | ⚠️ 中 |
7.2 DeerFlow 的独特优势
- 开箱即用的完整解决方案:不像 LangGraph 需要大量配置,DeerFlow 提供了完整的 UI、工作流、记忆系统
- 字节跳动的工程背书:生产级代码质量,经过内部验证
- 活跃的开源社区:快速迭代,Issue 响应及时
- 多语言支持:前端支持国际化,方便扩展
7.3 适用场景
DeerFlow 最适合的场景:
- 📊 市场调研:自动收集行业数据、竞品分析
- 📈 投资研究:分析财报、行业趋势
- 📚 学术研究:文献综述、实验数据分析
- 📰 内容创作:新闻摘要、报告生成
- 🔍 技术调研:技术选型、方案评估
建议使用其他框架的场景:
- 需要极简定制:考虑 smolagents
- 需要复杂状态管理:考虑 LangGraph
- 需要多代理协作且预算有限:考虑 CrewAI
八、实战案例:构建 AI 研究助手
8.1 需求描述
构建一个「AI 股票研究助手」,能够:
- 自动获取指定公司最近的新闻和公告
- 分析财务数据变化趋势
- 对比同行业竞争对手
- 生成投资研究报告
8.2 实现方案
# research_agent.py
from deerflow import WorkflowBuilder, Node
from deerflow.nodes import SearchNode, CodeNode, WriteNode
from deerflow.memory import VectorMemory
import yfinance as yf
class StockResearchWorkflow:
def __init__(self):
self.workflow = self._build_workflow()
def _build_workflow(self):
builder = WorkflowBuilder(name="stock-research")
# 节点 1:搜索新闻
builder.add_node(SearchNode(
name="news_search",
providers=["tavily", "brave"],
query_template="{company} 最新新闻 2026"
))
# 节点 2:获取财务数据
@builder.node(name="financial_data", type="code")
def get_financial_data(state: dict) -> dict:
ticker = state["ticker"]
stock = yf.Ticker(ticker)
# 获取多个时间周期的财务数据
financials = stock.financials
balance_sheet = stock.balance_sheet
cashflow = stock.cashflow
# 计算关键指标
metrics = {
"revenue_growth": calculate_growth(financials.loc["Total Revenue"]),
"profit_margin": calculate_margin(financials),
"debt_ratio": calculate_debt_ratio(balance_sheet),
"cash_flow_health": analyze_cashflow(cashflow)
}
return {
"financial_metrics": metrics,
"raw_data": {
"financials": financials.to_dict(),
"balance_sheet": balance_sheet.to_dict()
}
}
# 节点 3:竞争对手对比
@builder.node(name="competitor_analysis", type="code")
def analyze_competitors(state: dict) -> dict:
ticker = state["ticker"]
competitors = get_competitors(ticker) # 需要自定义实现
comparison = {}
for comp in competitors:
stock = yf.Ticker(comp)
comparison[comp] = {
"market_cap": stock.info.get("marketCap"),
"pe_ratio": stock.info.get("trailingPE"),
"revenue": stock.info.get("totalRevenue"),
"growth": stock.info.get("earningsGrowth")
}
return {"competitor_data": comparison}
# 节点 4:生成报告
builder.add_node(WriteNode(
name="report_writer",
template="stock_research_template.md", # 自定义模板
output_format="pdf"
))
# 连接节点
builder.connect("news_search", "financial_data")
builder.connect("financial_data", "competitor_analysis")
builder.connect("competitor_analysis", "report_writer")
return builder.compile()
def run(self, company: str, ticker: str) -> str:
result = self.workflow.run(
query=f"{company} 股票研究",
ticker=ticker,
company=company
)
return result["report_path"]
# 使用示例
agent = StockResearchWorkflow()
report_path = agent.run(
company="苹果公司",
ticker="AAPL"
)
print(f"报告已生成: {report_path}")
8.3 运行效果
当用户输入「苹果公司(AAPL)股票研究」时:
🎯 开始研究:苹果公司 (AAPL)
📰 阶段 1/4:新闻搜索
└─ 搜索最近 30 天相关新闻...
└─ 找到 47 条相关新闻
└─ 识别关键事件:iPhone 17 发布、Q1 财报发布
📊 阶段 2/4:财务分析
└─ 获取 2023-2026 财年数据...
└─ 计算关键指标:
- 营收增长率:+8.2%
- 净利润率:26.4%
- 负债率:32.1%
- 现金流健康度:优秀
🔍 阶段 3/4:竞品对比
└─ 对比公司:微软、谷歌、亚马逊
└─ 对比维度:市值、PE、增速
└─ 竞争力评估:强
📝 阶段 4/4:生成报告
└─ 报告结构已确定
└─ 撰写各章节...
└─ 生成数据图表...
└─ 导出 PDF...
✅ 研究完成!
报告路径:/workspace/reports/AAPL_2026_04_28.pdf
页数:32 页
包含图表:15 个
九、总结与展望
9.1 核心价值总结
DeerFlow 2.0 的核心价值在于:
- 降低 AI 应用门槛:通过预置的工作流和组件,开发者无需从零构建 Agent 系统
- 提升研究效率:自动化完成资料收集、数据分析、报告生成全流程
- 保障安全隔离:沙箱机制确保代码执行安全可控
- 支持企业集成:开箱即用的 Slack/飞书集成,适配企业工作流
9.2 未来发展方向
根据 DeerFlow 的发展规划,未来可能的演进方向包括:
- 更强大的多模态能力:支持视频分析、语音交互
- 更智能的任务规划:基于历史学习自动优化工作流
- 更丰富的预置模板:覆盖更多垂直行业场景
- 更好的性能优化:支持更大规模的并发任务处理
- 更开放的开源生态:吸引更多社区贡献者
9.3 开发者建议
对于想要深入了解和使用 DeerFlow 的开发者,我建议:
- 从官方示例开始:GitHub 仓库中的 examples 目录提供了丰富的参考
- 深入理解 LangGraph:DeerFlow 的核心建立在 LangGraph 之上,理解图结构是深入定制的关键
- 关注社区动态:DeerFlow 发展迅速,保持关注可以获得最新功能和最佳实践
- 贡献开源:如果发现 bug 或有好的想法,提交 PR 是最快的反馈渠道
9.4 资源链接
- GitHub 仓库:https://github.com/bytedance/deer-flow
- 官方文档:https://deer-flow.github.io/
- 示例项目:https://github.com/bytedance/deer-flow/tree/main/examples
- Discord 社区:加入讨论,获取最新动态
本文基于 DeerFlow 2.0 版本编写,随着项目发展,部分内容可能需要更新。如有疑问或建议,欢迎在评论区交流。