从"盯着AI写代码"到"管理AI团队工作":OpenAI Symphony 如何定义AI智能体的工程化编排
当所有人都在关注"如何让单个AI编程助手更强"的时候,OpenAI 已经开始了下一个赛道的布局:如何让多个AI智能体像工程师团队一样协同工作。
前言:AI编程助手的困局
2024-2025 年,Claude Code、Cursor、GitHub Copilot 等 AI 编程工具席卷全球。但这些工具都有一个共同的特点:它们都是"单人AI助手"。
你对着一个 AI 说话,它帮你写代码、修 Bug。但当你需要完成一个完整的项目时,你会发现:
- 任务串行执行:一个AI处理完一个任务,才能处理下一个
- 没有边界和隔离:所有上下文混在一起,复杂任务容易崩溃
- 无法规模化:10个人的团队用1个AI助手,效率提升有限
- 缺乏工程管理:没有 CI、没有 PR 审核、没有任务追踪
这就像给你一个"超级实习生",但没有项目管理工具、没有代码审核流程、没有部署流水线。
2026年5月6日,OpenAI 发布了 Symphony——一个开源的 AI 智能体编排规范和参考实现。它解决的不是"让AI更强",而是**"如何像管理工程师团队一样管理AI智能体"**。
核心观点(来自 SPEC.md):
我们不应该盯着 AI 一个字符一个字符地写代码,而应该像管理人类工程师一样,通过定义明确的边界、策略和输入输出来管理 AI 的工作。
一、Symphony 是什么?
1.1 官方定义
Symphony 是 OpenAI 发布的一个开源编排框架(以 Elixir/OTP 为参考实现)和一套语言无关的规范(SPEC.md)。它的核心功能是:
将项目管理系统(如 Linear)中的任务,自动转化为一个个隔离的、可监控的 AI 实施任务。
传统模式:
人类工程师 → 从 Linear 领取任务 → 写代码 → 跑 CI → 提 PR → 人工审核 → 合并
Symphony 模式:
AI 智能体编排引擎 → 从 Linear 监控任务 →
├─ Agent 1:处理任务A → 隔离工作空间 → 跑 CI → 提 PR
├─ Agent 2:处理任务B → 隔离工作空间 → 跑 CI → 提 PR
└─ Agent 3:处理任务C → 隔离工作空间 → 跑 CI → 提 PR
→ PR 自动排队 → 人工审核/自动合并 → 任务板状态自动更新
1.2 一句话总结
Symphony = AI智能体的"工程管理层"。
它把 Linear(任务追踪)、Codex(AI 执行)、CI 系统(验证)连接成一个完整的自动化闭环,让多个 AI Agent 能够像工程师团队一样并行工作、互相隔离、各司其职。
1.3 与 Claude Code 的根本区别
| 维度 | Claude Code | Symphony |
|---|---|---|
| 定位 | 单人AI编程助手 | AI团队编排框架 |
| 交互方式 | 人类 ↔ 单个Agent | 人类 ↔ Agent团队 |
| 任务执行 | 串行,一个接一个 | 并行,多Agent同时工作 |
| 工作隔离 | 无隔离,全局上下文 | 严格隔离,每个任务独立 |
| 任务来源 | 人类手动输入 | 自动监听任务系统 |
| 工程集成 | 手动运行CI | 自动触发+监控CI |
| PR管理 | 无 | 自动提PR,排队审核 |
| 适用场景 | 个人开发者 | 工程团队、AI运营 |
二、为什么需要 Symphony?
2.1 "单人AI助手"的瓶颈
当前 AI 编程工具有三个根本性瓶颈:
瓶颈1:串行执行,无法并行
当你有 10 个独立的任务(比如"修复10个Bug")时:
- Claude Code:你需要按顺序一个一个告诉它,效率低
- Symphony:10 个 Agent 同时处理,10倍速
瓶颈2:上下文污染
当你在同一个会话里处理多个不相关的功能时,AI 容易混淆。一个关于用户认证的修复,可能影响到电商功能的代码。
瓶颈3:缺乏工程闭环
真正的软件工程不只是"写代码",还包括:
- 任务领取(从 Linear/JIRA)
- 代码编写(Agent)
- 持续集成(CI: GitHub Actions)
- 代码审核(PR Review)
- 合并部署(Merge & Deploy)
单人 AI 助手无法自动完成这个闭环,你需要手动在多个系统之间切换。
2.2 团队AI的愿景
Symphony 提出的愿景是:Scale AI agents like engineering teams。
就像一个 10 人工程师团队:
- 每个人负责不同的模块
- 每个人在独立的工作空间工作
- 所有人通过 PR 提交代码
- Tech Lead 负责审核和合并
Symphony 让 AI 智能体也能这样工作:
- 多个 Agent 同时处理不同任务
- 每个 Agent 在隔离的工作空间运行
- 所有变更通过 PR 提交
- 人类(或自动规则)负责审核和合并
三、核心架构解析
3.1 整体架构图
┌─────────────────────────────────────────────────────┐
│ Symphony Core │
│ ┌──────────┐ ┌────────────┐ ┌────────────────┐ │
│ │ Task │ │ Agent │ │ Workflow │ │
│ │ Monitor │→ │ Orchestrator│→ │ Engine │ │
│ │ (监听) │ │ (编排) │ │ (执行) │ │
│ └────┬─────┘ └─────┬──────┘ └───────┬────────┘ │
└───────┼──────────────┼─────────────────┼────────────┘
│ │ │
▼ ▼ ▼
┌────────────┐ ┌──────────────┐ ┌──────────────────────┐
│ Linear │ │ Codex Agents │ │ CI Systems │
│ (任务追踪) │ │ (AI执行) │ │ (GitHub Actions等) │
└────────────┘ └──────────────┘ └──────────────────────┘
3.2 四大核心组件
组件1:Task Monitor(任务监听器)
Task Monitor 负责持续监听任务追踪系统(Linear)的状态变化。
class TaskMonitor:
"""
任务监听器:监控任务板状态,自动触发 Agent
"""
def __init__(self, task_source: str = "linear"):
self.task_source = task_source
self.polling_interval = 60 # 每60秒检查一次
async def start_monitoring(self):
"""启动任务监听循环"""
while True:
# 获取所有 "In Progress" 或 "Ready" 状态的任务
tasks = await self.fetch_tasks_by_status(
status=["in_progress", "ready"]
)
for task in tasks:
# 检查是否已经有 Agent 在处理
if not await self.has_active_agent(task.id):
# 触发新的 Agent 处理此任务
await self.trigger_agent(task)
# 更新任务状态为 "In Review"
await self.update_task_status(task.id, "in_review")
await asyncio.sleep(self.polling_interval)
async def fetch_tasks_by_status(self, status: list):
"""从 Linear 获取指定状态的任务"""
# Linear API 调用
response = await linear_client.issues.list(
filter={
"state": {"in": status},
" assignee": None # 未分配给人类工程师的任务
}
)
return response.items
async def trigger_agent(self, task):
"""触发一个新的 Agent 来处理任务"""
agent_id = await orchestrator.spawn_agent(
task_id=task.id,
workspace=self.allocate_workspace(task.id),
model="codex",
instructions=self.generate_instructions(task)
)
return agent_id
组件2:Agent Orchestrator(智能体编排器)
Agent Orchestrator 负责管理多个并行运行的 Agent。
class AgentOrchestrator:
"""
智能体编排器:管理 Agent 的生命周期和资源分配
"""
def __init__(self, max_concurrent: int = 10):
self.max_concurrent = max_concurrent
self.active_agents = {} # agent_id → AgentState
self.workspace_pool = WorkspacePool(max_size=max_concurrent)
async def spawn_agent(self, task_id: str, workspace: str,
model: str, instructions: str) -> str:
"""
创建一个新的 Agent 实例
关键设计:
1. 每个 Agent 有独立的工作空间(隔离)
2. Agent 是状态机,有明确的生命周期
3. 最大并发数有限制,防止资源耗尽
"""
# 检查是否达到并发上限
if len(self.active_agents) >= self.max_concurrent:
# 等待直到有 Agent 完成
await self.wait_for_slot()
agent_id = f"agent-{task_id}-{uuid.uuid4().hex[:8]}"
agent = Agent(
id=agent_id,
task_id=task_id,
workspace=workspace,
model=model,
instructions=instructions,
state=AgentState.IDLE
)
self.active_agents[agent_id] = agent
# 启动 Agent 执行
asyncio.create_task(agent.run())
return agent_id
async def wait_for_slot(self):
"""等待并发槽位"""
while len(self.active_agents) >= self.max_concurrent:
# 等待任意 Agent 完成
completed = [aid for aid, a in self.active_agents.items()
if a.state == AgentState.COMPLETED]
if completed:
for aid in completed:
await self.cleanup_agent(aid)
return
await asyncio.sleep(5)
async def cleanup_agent(self, agent_id: str):
"""清理已完成的 Agent"""
agent = self.active_agents.pop(agent_id)
# 释放工作空间
self.workspace_pool.release(agent.workspace)
# 保存执行日志
await self.save_audit_log(agent)
组件3:Workspace Isolation(工作空间隔离)
每个 Agent 必须在隔离的工作空间中运行,这是 Symphony 的核心安全机制。
class IsolatedWorkspace:
"""
隔离工作空间:每个 Agent 独立的工作环境
"""
def __init__(self, workspace_id: str, repo_url: str):
self.id = workspace_id
self.repo_url = repo_url
self.path = f"/tmp/symphony-workspaces/{workspace_id}"
async def setup(self):
"""
设置隔离工作空间
1. Clone 仓库到隔离目录
2. 创建分支
3. 设置环境变量
"""
os.makedirs(self.path, exist_ok=True)
# Clone 仓库到隔离目录
subprocess.run(
["git", "clone", self.repo_url, self.path],
check=True,
cwd=self.path
)
# 创建 Agent 专用分支
branch_name = f"agent/{self.id}"
subprocess.run(
["git", "checkout", "-b", branch_name],
cwd=self.path,
check=True
)
# 隔离环境变量
os.environ["AGENT_WORKSPACE"] = self.path
os.environ["AGENT_ID"] = self.id
async def run_command(self, cmd: str) -> CommandResult:
"""在隔离环境中运行命令"""
result = subprocess.run(
cmd,
shell=True,
cwd=self.path,
capture_output=True,
text=True,
timeout=300
)
return CommandResult(
stdout=result.stdout,
stderr=result.stderr,
returncode=result.returncode
)
async def create_pr(self, title: str, description: str) -> str:
"""创建 Pull Request"""
subprocess.run(["git", "add", "."], cwd=self.path, check=True)
subprocess.run(
["git", "commit", "-m", f"{title}\n\n{description}"],
cwd=self.path,
check=True
)
subprocess.run(["git", "push", "origin", "HEAD"], cwd=self.path, check=True)
# 通过 GitHub API 创建 PR
pr = await github_client.create_pr(
repo=self.repo_url,
title=title,
body=description,
head=f"agent/{self.id}",
base="main"
)
return pr.url
组件4:Workflow Engine(工作流引擎)
Workflow Engine 定义了 Agent 执行任务的标准化流程。
class WorkflowEngine:
"""
工作流引擎:定义 Agent 的标准化执行流程
"""
# 标准工作流步骤
STEPS = [
"understand_task", # 理解任务
"explore_codebase", # 探索代码库
"implement", # 实现代码
"write_tests", # 编写测试
"run_tests", # 运行测试
"verify_ci", # 验证CI状态
"create_pr", # 创建PR
"await_review", # 等待审核
]
async def execute(self, agent: Agent) -> WorkflowResult:
"""
执行标准工作流
每个步骤都是可观测的:记录输入、输出、耗时
"""
workflow_log = []
for step in self.STEPS:
step_start = time.time()
try:
# 执行当前步骤
step_input = self.get_step_input(step, agent)
step_output = await self.execute_step(step, agent, step_input)
workflow_log.append({
"step": step,
"status": "success",
"duration": time.time() - step_start,
"output_summary": self.summarize(step_output)
})
# 如果测试失败,自动进入修复循环
if step == "run_tests" and not step_output.passed:
await self.fix_and_retry(agent, step_output.errors)
except StepError as e:
workflow_log.append({
"step": step,
"status": "failed",
"error": str(e),
"duration": time.time() - step_start
})
# 记录失败,不阻塞其他步骤(其他 Agent 继续工作)
agent.log_error(step, e)
return WorkflowResult(
success=all(s["status"] == "success" for s in workflow_log),
log=workflow_log
)
async def execute_step(self, step: str, agent: Agent,
step_input: Any) -> StepOutput:
"""执行单个工作流步骤"""
if step == "understand_task":
return await agent.understand_task()
elif step == "explore_codebase":
return await agent.explore_codebase()
elif step == "implement":
return await agent.implement_changes(step_input)
elif step == "write_tests":
return await agent.write_tests()
elif step == "run_tests":
return await agent.run_tests()
elif step == "verify_ci":
return await agent.monitor_ci_status()
elif step == "create_pr":
return await agent.create_pull_request()
elif step == "await_review":
return await agent.wait_for_review()
四、SPEC.md 规范解析
4.1 规范的核心原则
Symphony 的规范(SPEC.md)定义了 AI Agent 编排的标准化接口。核心原则:
原则1:隔离优于共享
每个任务在独立的工作空间执行,避免交叉污染。
原则2:显式优于隐式
Agent 的所有操作都有明确的输入、输出和边界。
原则3:可观测性
每个步骤都有日志、耗时和状态记录。
原则4:可审核性
所有变更通过 PR 提交,人工(或规则)审核后才能合并。
4.2 规范中的关键接口
# SPEC.md 核心接口定义(简化版)
TaskSource:
# 任务来源(Linear, Jira, GitHub Issues 等)
interface: TaskSourceInterface
methods:
- list_tasks(status: Status[]) -> Task[]
- get_task(id: TaskID) -> Task
- update_status(id: TaskID, status: Status)
AgentRuntime:
# Agent 运行时环境
interface: AgentRuntimeInterface
methods:
- spawn(config: AgentConfig) -> AgentID
- send_message(agent_id: AgentID, message: Message) -> Response
- get_state(agent_id: AgentID) -> AgentState
- terminate(agent_id: AgentID)
WorkspaceManager:
# 工作空间管理
interface: WorkspaceManagerInterface
methods:
- allocate(task_id: TaskID) -> Workspace
- execute(workspace: Workspace, cmd: string) -> CommandResult
- release(workspace: Workspace)
CIConnector:
# CI 系统集成
interface: CIConnectorInterface
methods:
- trigger_build(workspace: Workspace) -> BuildID
- get_build_status(build_id: BuildID) -> BuildStatus
- get_test_results(build_id: BuildID) -> TestResults
PRManager:
# Pull Request 管理
interface: PRManagerInterface
methods:
- create_pr(workspace: Workspace, title: string, body: string) -> PR
- get_pr_status(pr: PR) -> PRStatus
- merge_pr(pr: PR)
4.3 Elixir/OTP 参考实现
Symphony 选择 Elixir/OTP 作为参考实现,这不是偶然的:
- 容错性:OTP 的监督树(Supervision Tree)天然适合管理有状态的长连接
- 并发:BEAM VM 可以轻松管理数万个并发 Agent
- 分布式:OTP 原生支持跨节点分布式部署
- 可观测性:Elixir 的日志和监控生态非常成熟
# Symphony 的 OTP 架构(简化示意)
defmodule Symphony.Supervisor do
use Supervisor
def start_link(arg) do
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
end
@impl true
def init(_arg) do
children = [
# 任务监听器
{Symphony.TaskMonitor, polling_interval: 60_000},
# Agent 编排器(监督树)
Symphony.AgentOrchestrator,
# 工作空间管理器
Symphony.WorkspaceManager,
# CI 连接器
Symphony.CIConnector,
# PR 管理器
Symphony.PRManager
]
Supervisor.init(children, strategy: :one_for_one)
end
end
# 每个 Agent 是一个 GenServer
defmodule Symphony.Agent do
use GenServer
# Agent 状态机
defstruct [:id, :task_id, :workspace, :state, :workflow_log]
# 状态转换
@states [:idle, :running, :waiting_ci, :pr_created, :completed, :failed]
def handle_call(:get_state, _from, state) do
{:reply, state.state, state}
end
def handle_cast({:execute_step, step}, state) do
new_state = execute_step(step, state)
{:noreply, %{state | state: new_state}}
end
end
五、与 Linear 的深度集成
5.1 任务自动流转
Symphony 与 Linear 的集成实现了任务的自动流转:
Linear 任务状态 Symphony 动作 Agent 行为
───────────────────────────────────────────────────────────────
Ready → 监听器发现任务 → 分配 Agent
In Progress → 更新任务分配信息 → Agent 开始工作
In Review → 监听 CI 状态 → 等待 CI 通过
Approved → 自动合并 PR → 清理工作空间
Done → 记录完成状态 → 任务完成
5.2 Linear 集成代码
class LinearIntegration:
"""
Linear 集成:任务监听和状态更新
"""
def __init__(self, api_key: str):
self.client = LinearClient(api_key=api_key)
async def sync_task_status(self, task_id: str,
symphony_state: str):
"""将 Symphony 中的 Agent 状态同步到 Linear"""
# 状态映射
state_mapping = {
"ready": "unstarted",
"running": "in_progress",
"waiting_ci": "in_review",
"pr_created": "in_review",
"completed": "done",
"failed": "titled"
}
linear_status = state_mapping.get(symphony_state, "titled")
await self.client.issues.update(
task_id,
state=linear_status,
# 追加评论记录 Agent 工作日志
comment=f"""
🤖 Symphony Agent 执行报告
状态: {symphony_state}
工作空间: {self.get_workspace_path(task_id)}
PR: {self.get_pr_url(task_id) or '待创建'}
执行日志:
{self.format_workflow_log(task_id)}
"""
)
async def filter_ai_tasks(self) -> list:
"""
筛选适合 AI Agent 处理的任务
规则:
1. 没有 assignee(未分配给人类)
2. 标签包含 "ai-ready"
3. 状态为 Ready 或 In Progress
"""
tasks = await self.client.issues.list(
filter={
"assignee": {"eq": None},
"labels": {"contains": "ai-ready"},
"state": {"in": ["unstarted", "in_progress"]}
}
)
# 进一步过滤:排除需要设计决策的任务
return [
t for t in tasks
if not self.requires_human_judgment(t)
]
def requires_human_judgment(self, task: Task) -> bool:
"""判断任务是否需要人类判断(不适合AI处理)"""
# 架构决策、安全相关、性能基准等任务
return any(
keyword in task.title.lower()
for keyword in ["architecture", "security", "design decision"]
)
六、CI/CD 深度集成
6.1 自动触发和监控
Symphony 不只是帮 AI 写代码,还自动管理 CI/CD 流程。
class CIIntegration:
"""
CI 系统集成:自动触发构建、监控状态、获取结果
"""
async def trigger_and_monitor(self, workspace: Workspace,
agent_id: str) -> CIResult:
"""
触发 CI 并持续监控直到完成
"""
# 1. 触发 GitHub Actions
build_id = await self.github_actions.trigger(
workflow="ci.yml",
ref=f"agent/{agent_id}",
inputs={
"workspace": workspace.path,
"agent_id": agent_id
}
)
# 2. 持续监控状态
max_wait = 3600 # 最多等待1小时
elapsed = 0
poll_interval = 30
while elapsed < max_wait:
status = await self.github_actions.get_status(build_id)
if status == "completed":
# 3. 获取测试结果
results = await self.github_actions.get_test_results(build_id)
return CIResult(
passed=results.all_passed,
tests_run=results.total,
tests_failed=results.failed,
duration=results.duration,
artifacts=results.artifacts
)
elif status in ["action_required", "cancelled", "failure"]:
return CIResult(
passed=False,
error=f"CI {status}",
tests_run=0,
tests_failed=0
)
await asyncio.sleep(poll_interval)
elapsed += poll_interval
return CIResult(passed=False, error="Timeout")
async def monitor_pr_checks(self, pr_url: str) -> CheckResult:
"""监控 PR 的所有检查项状态"""
checks = await self.github_checks.list(pr_url)
pending = [c for c in checks if c.status == "pending"]
running = [c for c in checks if c.status == "in_progress"]
completed = [c for c in checks if c.status == "completed"]
failed = [c for c in completed if c.conclusion == "failure"]
return CheckResult(
total=len(checks),
pending=len(pending),
running=len(running),
failed=len(failed),
all_passed=len(failed) == 0 and len(pending) == 0
)
6.2 CI 失败自动修复循环
当 CI 失败时,Symphony 可以自动触发修复循环:
async def fix_and_retry(self, agent: Agent, ci_failures: list,
max_retries: int = 3):
"""
CI 失败自动修复循环
流程:
CI失败 → 分析失败原因 → Agent修复 → 重新运行CI →
→ 通过 → 提PR
→ 失败 → 记录原因 → 通知人工介入
"""
for attempt in range(1, max_retries + 1):
# 分析失败原因
failure_analysis = await self.analyze_failures(ci_failures)
# 让 Agent 修复
fix_result = await agent.fix_failures(failure_analysis)
if not fix_result.fixed:
# 无法自动修复,通知人工
await self.notify_human_intervention(
agent=agent,
reason=failure_analysis.summary,
attempt=attempt
)
return False
# 重新运行 CI
ci_result = await self.trigger_and_monitor(agent.workspace, agent.id)
if ci_result.passed:
# CI 通过,创建 PR
pr = await agent.create_pull_request()
return True
else:
ci_failures = ci_result.failures
agent.log(f"Attempt {attempt} failed, retrying...")
return False
七、PR 管理和审核流程
7.1 自动 PR 创建
class PRManager:
"""
Pull Request 管理:自动创建、组织PR队列、合并策略
"""
async def create_agent_pr(self, agent: Agent) -> str:
"""
为 Agent 的工作创建 Pull Request
"""
# 生成 PR 描述(包含完整的工作日志)
pr_body = self.generate_pr_description(agent)
# 创建 PR
pr = await self.github.create_pull_request(
title=f"[Agent] {agent.task.title}",
body=pr_body,
head=f"agent/{agent.id}",
base="main",
labels=["ai-generated", "symphony"]
)
# 更新 Linear 任务状态
await linear.update_task(
agent.task_id,
{"comment": f"🤖 PR 创建成功: {pr.url}"}
)
return pr.url
def generate_pr_description(self, agent: Agent) -> str:
"""生成标准化的 PR 描述"""
workflow_summary = self.summarize_workflow(agent.workflow_log)
return f"""
## 🤖 AI Agent PR
**Agent ID**: {agent.id}
**Task**: [{agent.task.title}]({agent.task.url})
### 执行摘要
{workflow_summary.summary}
### 变更文件
{workflow_summary.files_changed}
### 测试结果
- 总测试数:{workflow_summary.tests_run}
- 通过:{workflow_summary.tests_passed}
- 失败:{workflow_summary.tests_failed}
- CI 状态:{workflow_summary.ci_status}
### 工作日志
{workflow_summary.detailed_log}
---
*Generated by Symphony - OpenAI AI Agent Orchestration Framework*
"""
7.2 PR 合并策略
class MergeStrategy:
"""
PR 合并策略:定义何时可以自动合并
"""
async def can_auto_merge(self, pr: PR) -> AutoMergeDecision:
"""
判断 PR 是否可以自动合并
"""
checks = await self.github.get_all_checks(pr)
# 必须全部通过
if not all(c.passed for c in checks):
return AutoMergeDecision(
can_merge=False,
reason="Some checks failed",
blocking_checks=[c.name for c in checks if not c.passed]
)
# 文件大小检查(防止一次性提交过多代码)
diff_size = await self.github.get_diff_size(pr)
if diff_size > 10000: # 超过10000行
return AutoMergeDecision(
can_merge=False,
reason="Diff too large, requires human review",
size=diff_size
)
# 风险标签检查
if self.has_risk_labels(pr):
return AutoMergeDecision(
can_merge=False,
reason="Contains risk labels, requires human review"
)
return AutoMergeDecision(
can_merge=True,
reason="All checks passed, diff size acceptable"
)
async def queue_for_merge(self, pr: PR):
"""
加入合并队列(防止同时合并多个 PR 冲突)
"""
await self.merge_queue.enqueue(pr, priority=pr.priority)
# 等待队列轮到
while not await self.merge_queue.is_front(pr):
await asyncio.sleep(10)
# 执行合并
await self.github.merge(pr)
八、规模化部署
8.1 多节点部署
# docker-compose.yml 示例(Symphony 分布式部署)
version: '3.8'
services:
symphony-core:
image: openai/symphony:latest
ports:
- "4000:4000"
environment:
SYMPHONY_MODE: "leader"
LINEAR_API_KEY: "${LINEAR_API_KEY}"
GITHUB_TOKEN: "${GITHUB_TOKEN}"
volumes:
- workspace-pool:/workspaces
deploy:
replicas: 1
symphony-worker:
image: openai/symphony:latest
environment:
SYMPHONY_MODE: "worker"
SYMPHONY_LEADER: "symphony-core:4000"
MAX_CONCURRENT_AGENTS: 20
WORKSPACE_POOL_SIZE: 50
volumes:
- workspace-pool:/workspaces
deploy:
replicas: 3
task-monitor:
image: openai/symphony:latest
command: "symphony monitor --source=linear"
environment:
SYMPHONY_MODE: "monitor"
LINEAR_API_KEY: "${LINEAR_API_KEY}"
POLLING_INTERVAL: "60"
deploy:
replicas: 1
volumes:
workspace-pool:
driver: local
8.2 配置示例
# symphony.yaml 配置文件
# 任务源配置
task_sources:
linear:
api_key: ${LINEAR_API_KEY}
team_id: ${LINEAR_TEAM_ID}
filter:
labels: ["ai-ready"]
states: ["unstarted", "in_progress"]
github_issues:
repo: "my-org/my-repo"
label: "ai-ready"
# Agent 配置
agents:
max_concurrent: 20
default_model: "codex"
workspace_pool_size: 50
workspace_timeout: 3600 # 1小时超时
# CI 配置
ci:
provider: "github-actions"
workflows:
- "ci.yml"
- "test.yml"
auto_trigger: true
monitor_duration: 3600
# PR 配置
pr:
auto_create: true
auto_merge: false # 默认不自动合并,需人工审核
merge_queue: true
labels:
- "ai-generated"
- "symphony"
# 通知配置
notifications:
on_failure:
- type: "linear-comment"
- type: "slack"
webhook: ${SLACK_WEBHOOK}
on_success:
- type: "linear-comment"
九、与 OpenHands 的关系
9.1 两者定位对比
Symphony 和 OpenHands 都是 OpenAI 支持的 AI Agent 项目,但定位不同:
| 维度 | OpenHands | Symphony |
|---|---|---|
| 定位 | 全能型 AI 工程师界面 | AI Agent 编排框架 |
| 交互方式 | 单一 Agent,交互式 | 多 Agent,自动化 |
| 用户界面 | 有人工交互界面 | 无界面,纯自动化 |
| 任务来源 | 人类手动输入 | 自动监听任务系统 |
| PR 管理 | 手动 | 自动 |
| 适用场景 | 个人开发者 | 工程团队 |
9.2 互补关系
OpenHands 和 Symphony 可以互补:
Symphony(编排层)
│
├─ Agent 1: OpenHands → 处理用户认证模块
├─ Agent 2: OpenHands → 处理支付模块
└─ Agent 3: OpenHands → 处理通知模块
│
└── 每个 OpenHands 实例在其隔离工作空间中运行
Symphony 负责"谁来做什么",OpenHands 负责"具体怎么做"。
十、实践案例
10.1 场景:Bug 修复自动化
传统流程(人工):
- 工程师从 Linear 领取 Bug
- 分析代码,找出问题
- 写修复代码
- 运行本地测试
- 提交 PR
- CI 通过后人工合并
- 更新 Linear 状态
Symphony 流程(自动化):
- Task Monitor 发现 Linear 中的未分配 Bug
- Orchestrator 分配 Agent 处理
- Agent 在隔离工作空间分析 + 修复 + 测试
- CI 自动触发
- PR 自动创建
- 工程师审核 PR → 确认合并
- Linear 状态自动更新
人力节省:工程师从每个 Bug 花费 30-60 分钟,减少到只需审核 PR 的 5 分钟。
10.2 场景:多模块并行开发
当团队需要在同一时间开发多个功能时:
Sprint: 实现用户画像系统
Symphony 编排:
├─ Agent 1: 实现用户基本信息模块
│ └─ → PR #42
├─ Agent 2: 实现用户偏好分析模块
│ └─ → PR #43
├─ Agent 3: 实现用户画像展示组件
│ └─ → PR #44
└─ Agent 4: 实现画像 API 端点
└─ → PR #45
所有 PR 都在 CI 通过后排队等待审核合并
工程师的职责从"写代码"变成"审核代码和做架构决策"。
十一、局限性和挑战
11.1 架构决策问题
AI Agent 目前还无法做出架构层面的决策。比如"是否需要重构这个模块"、"这个性能优化值不值得做"这类判断,还是需要人类工程师来做。
Symphony 的解法:通过标签过滤,让 AI 只处理"明确可执行"的任务(修复Bug、实现明确的功能需求),避免架构决策类任务。
11.2 代码质量风险
AI 生成的代码虽然能通过测试,但可能存在:
- 不符合团队编码规范
- 没有考虑边界情况
- 可读性差
Symphony 的解法:PR 审核环节必须有工程师参与,不能完全自动合并。
11.3 安全性
让 AI 自动提交代码、合并 PR,存在一定的安全风险:
- AI 可能被恶意指令诱导
- 自动合并可能导致有问题的代码进入主分支
Symphony 的解法:
- 工作空间完全隔离
- PR 合并默认需要人工审核
- 所有操作都有完整审计日志
十二、Symphony 对开发者的意义
12.1 重新定义开发者的角色
Symphony 带来一个重要的转变:开发者从"代码执行者"变成"AI 管理者"。
传统角色: Symphony 角色:
├─ 写代码 ├─ 定义任务(从 Linear)
├─ 跑测试 ├─ 审核 PR(最终决策)
├─ 提交 PR ├─ 架构决策(AI 做不了的)
└─ 合并代码 └─ 监控 AI 团队工作
12.2 适合哪些团队?
适合:
- 有大量重复性开发任务的中大型团队
- 使用 Linear/JIRA 管理任务的项目
- 需要并行处理多个功能的敏捷团队
- 有 CI/CD 完整流水线的团队
不适合:
- 个人开发者(直接用 Claude Code 更简单)
- 小型项目(开销大于收益)
- 需要大量架构设计的项目
12.3 如何开始?
# 1. 克隆 Symphony
git clone https://github.com/openai/symphony.git
cd symphony
# 2. 安装依赖
npm install
# 3. 配置环境变量
cp .env.example .env
# 编辑 .env,填入 Linear API Key 和 GitHub Token
# 4. 启动 Symphony
npm run start
# 5. 配置 Linear 标签
# 在 Linear 中创建 "ai-ready" 标签
# 将适合 AI 处理的任务打上此标签
# 6. Symphony 开始自动监听并处理任务
十三、总结:AI团队协作的新范式
Symphony 最重要的意义,不是某一个具体功能的实现,而是它提出了一个根本性的问题:
当我们有了多个 AI 智能体,如何让它们像工程师团队一样协同工作?
这不是一个技术问题,而是一个工程管理问题。
Claude Code 解决的是"单人 AI 助手的体验"。Symphony 解决的是"AI 团队的工程化管理"。两者代表了 AI 编程工具演进的两个方向:
- 深度:让单个 AI 越来越强(Claude Code 的方向)
- 广度:让多个 AI 协同工作(Symphony 的方向)
最终,两者会汇合:一群强大的 AI 智能体,像一个高效的工程师团队一样协作。
Symphony 是这个愿景的第一步。
相关资源:
- GitHub:https://github.com/openai/symphony
- SPEC.md:https://github.com/openai/symphony/blob/main/SPEC.md
- 官方文档:https://symphony.openai.com
- Elixir/OTP 参考实现:https://github.com/openai/symphony/tree/main/elixir