feat: Implement PlanEx roles and orchestrator for wave-based execution

- Added `planex-executor` role for loading solutions, implementing code, running tests, and committing changes.
- Introduced `planex-planner` role for requirement analysis, issue creation, solution design, and queue orchestration.
- Developed `codex-planex` orchestrator to manage planner and executor interactions, enabling deep interaction and parallel execution.
- Enhanced execution processes with structured JSON outputs for both roles, ensuring compliance with project guidelines.
- Implemented error handling and lifecycle management for robust execution flow.
This commit is contained in:
catlog22
2026-02-16 00:29:06 +08:00
parent a4fff6a591
commit 4f085242b5
4 changed files with 1061 additions and 106 deletions

View File

@@ -1,12 +1,12 @@
# Role: implementer
代码实现、测试验证、结果提交。内部调用 code-developer agent 进行实际代码编写
加载 solution → 根据 execution_method 路由到对应后端Agent/Codex/Gemini→ 测试验证 → 提交。支持多种 CLI 执行后端,执行方式在 coordinator Phase 1 已确定(见 coordinator.md Execution Method Selection
## Role Identity
- **Name**: `implementer`
- **Task Prefix**: `BUILD-*`
- **Responsibility**: Code generation (implementation)
- **Responsibility**: Code implementation (solution → route to backend → test → commit)
- **Communication**: SendMessage to coordinator only
- **Output Tag**: `[implementer]`
@@ -16,8 +16,9 @@
- 仅处理 `BUILD-*` 前缀的任务
- 所有输出必须带 `[implementer]` 标识
- 按照队列中的 solution plan 执行实现
- 按照 BUILD-* 任务中的 `execution_method` 字段选择执行后端
- 每个 solution 完成后通知 coordinator
- 持续轮询新的 BUILD-* 任务
### MUST NOT
@@ -37,11 +38,13 @@
## Toolbox
### Subagent Capabilities
### Execution Backends
| Agent Type | Purpose |
|------------|---------|
| `code-developer` | Pure code execution with test-driven development |
| Backend | Tool | Invocation | Mode |
|---------|------|------------|------|
| `agent` | code-developer subagent | `Task({ subagent_type: "code-developer" })` | 同步 |
| `codex` | Codex CLI | `ccw cli --tool codex --mode write` | 后台 |
| `gemini` | Gemini CLI | `ccw cli --tool gemini --mode write` | 后台 |
### Direct Capabilities
@@ -50,7 +53,7 @@
| `Read` | 读取 solution plan 和队列文件 |
| `Write` | 写入实现产物 |
| `Edit` | 编辑源代码 |
| `Bash` | 运行测试、git 操作 |
| `Bash` | 运行测试、git 操作、CLI 调用 |
### CLI Capabilities
@@ -58,98 +61,40 @@
|-------------|---------|
| `ccw issue status <id> --json` | 查看 issue 状态 |
| `ccw issue solutions <id> --json` | 加载 bound solution |
| `ccw issue update <id> --status in-progress` | 更新 issue 状态 |
| `ccw issue update <id> --status in-progress` | 更新 issue 状态为进行中 |
| `ccw issue update <id> --status resolved` | 标记 issue 已解决 |
## Execution (5-Phase)
## Execution Method Resolution
### Phase 1: Task Discovery
从 BUILD-* 任务的 description 中解析执行方式:
```javascript
const tasks = TaskList()
const myTasks = tasks.filter(t =>
t.subject.startsWith('BUILD-') &&
t.owner === 'implementer' &&
t.status === 'pending' &&
t.blockedBy.length === 0
)
// 从任务描述中解析 execution_method
function resolveExecutor(taskDesc, solutionTaskCount) {
const methodMatch = taskDesc.match(/execution_method:\s*(Agent|Codex|Gemini|Auto)/i)
const method = methodMatch ? methodMatch[1] : 'Auto'
if (myTasks.length === 0) return // idle
if (method.toLowerCase() === 'auto') {
// Auto: 根据 solution task_count 决定
return solutionTaskCount <= 3 ? 'agent' : 'codex'
}
return method.toLowerCase() // 'agent' | 'codex' | 'gemini'
}
const task = TaskGet({ taskId: myTasks[0].id })
TaskUpdate({ taskId: task.id, status: 'in_progress' })
// 从任务描述中解析 code_review 配置
function resolveCodeReview(taskDesc) {
const reviewMatch = taskDesc.match(/code_review:\s*(\S+)/i)
return reviewMatch ? reviewMatch[1] : 'Skip'
}
```
### Phase 2: Load Solution Plan
## Execution Prompt Builder
统一的 prompt 构建,所有后端共用:
```javascript
// Extract issue ID from task description
const issueIdMatch = task.description.match(/(?:GH-\d+|ISS-\d{8}-\d{6})/)
const issueId = issueIdMatch ? issueIdMatch[0] : null
if (!issueId) {
mcp__ccw-tools__team_msg({
operation: "log", team: "issue", from: "implementer", to: "coordinator",
type: "error",
summary: "[implementer] No issue ID found in task"
})
SendMessage({
type: "message", recipient: "coordinator",
content: "## [implementer] Error\nNo issue ID in task description",
summary: "[implementer] error: no issue ID"
})
return
}
// Load solution plan
const solJson = Bash(`ccw issue solutions ${issueId} --json`)
const solution = JSON.parse(solJson)
if (!solution.bound) {
mcp__ccw-tools__team_msg({
operation: "log", team: "issue", from: "implementer", to: "coordinator",
type: "error",
summary: `[implementer] No bound solution for ${issueId}`
})
return
}
// Load queue info for dependency checking
let queueInfo = null
try {
const queueJson = Read(`.workflow/issues/queue/execution-queue.json`)
const queue = JSON.parse(queueJson)
queueInfo = queue.queue?.find(q => q.issue_id === issueId)
} catch {
// Queue info not available, proceed without
}
// Update issue status
Bash(`ccw issue update ${issueId} --status in-progress`)
```
### Phase 3: Implementation via code-developer
```javascript
// Determine complexity for agent prompt
const taskCount = solution.bound.task_count || solution.bound.tasks?.length || 0
const isComplex = taskCount > 3
// Load explorer context for implementation guidance
let explorerContext = null
try {
const contextPath = `.workflow/.team-plan/issue/context-${issueId}.json`
explorerContext = JSON.parse(Read(contextPath))
} catch {
// No explorer context
}
// Invoke code-developer agent
const implResult = Task({
subagent_type: "code-developer",
run_in_background: false,
description: `Implement solution for ${issueId}`,
prompt: `
function buildExecutionPrompt(issueId, solution, explorerContext) {
return `
## Issue
ID: ${issueId}
Title: ${solution.bound.title || 'N/A'}
@@ -178,8 +123,143 @@ Dependencies: ${explorerContext.dependencies?.join(', ') || 'N/A'}
- [ ] Existing tests pass
- [ ] New tests added where appropriate
- [ ] No security vulnerabilities introduced
## Project Guidelines
@.workflow/project-guidelines.json
`
})
}
```
## Execution (5-Phase)
### Phase 1: Task Discovery
```javascript
const tasks = TaskList()
const myTasks = tasks.filter(t =>
t.subject.startsWith('BUILD-') &&
t.owner === 'implementer' &&
t.status === 'pending' &&
t.blockedBy.length === 0
)
if (myTasks.length === 0) return // idle — wait for coordinator to create BUILD tasks
const task = TaskGet({ taskId: myTasks[0].id })
TaskUpdate({ taskId: task.id, status: 'in_progress' })
```
### Phase 2: Load Solution & Resolve Executor
```javascript
// Extract issue ID from task description
const issueIdMatch = task.description.match(/(?:GH-\d+|ISS-\d{8}-\d{6})/)
const issueId = issueIdMatch ? issueIdMatch[0] : null
if (!issueId) {
mcp__ccw-tools__team_msg({
operation: "log", team: "issue", from: "implementer", to: "coordinator",
type: "error",
summary: "[implementer] No issue ID found in task"
})
SendMessage({
type: "message", recipient: "coordinator",
content: "## [implementer] Error\nNo issue ID in task description",
summary: "[implementer] error: no issue ID"
})
TaskUpdate({ taskId: task.id, status: 'completed' })
return
}
// Load solution plan
const solJson = Bash(`ccw issue solutions ${issueId} --json`)
const solution = JSON.parse(solJson)
if (!solution.bound) {
mcp__ccw-tools__team_msg({
operation: "log", team: "issue", from: "implementer", to: "coordinator",
type: "error",
summary: `[implementer] No bound solution for ${issueId}`
})
SendMessage({
type: "message", recipient: "coordinator",
content: `## [implementer] Error\nNo bound solution for ${issueId}`,
summary: `[implementer] error: no solution for ${issueId}`
})
TaskUpdate({ taskId: task.id, status: 'completed' })
return
}
// Load explorer context for implementation guidance
let explorerContext = null
try {
const contextPath = `.workflow/.team-plan/issue/context-${issueId}.json`
explorerContext = JSON.parse(Read(contextPath))
} catch {
// No explorer context
}
// Resolve execution method from task description
const taskCount = solution.bound.task_count || solution.bound.tasks?.length || 0
const executor = resolveExecutor(task.description, taskCount)
const codeReview = resolveCodeReview(task.description)
// Update issue status
Bash(`ccw issue update ${issueId} --status in-progress`)
```
### Phase 3: Implementation (Multi-Backend Routing)
根据 `executor` 变量路由到对应后端:
#### Option A: Agent Execution (`executor === 'agent'`)
同步调用 code-developer subagent适合简单任务task_count ≤ 3
```javascript
if (executor === 'agent') {
const implResult = Task({
subagent_type: "code-developer",
run_in_background: false,
description: `Implement solution for ${issueId}`,
prompt: buildExecutionPrompt(issueId, solution, explorerContext)
})
}
```
#### Option B: Codex CLI Execution (`executor === 'codex'`)
后台调用 Codex CLI适合复杂任务。使用固定 ID 支持 resume。
```javascript
if (executor === 'codex') {
const fixedId = `issue-${issueId}`
Bash(
`ccw cli -p "${buildExecutionPrompt(issueId, solution, explorerContext)}" --tool codex --mode write --id ${fixedId}`,
{ run_in_background: true }
)
// STOP — CLI 后台执行,等待 task hook callback 通知完成
// 失败时 resume:
// ccw cli -p "Continue implementation" --resume ${fixedId} --tool codex --mode write --id ${fixedId}-retry
}
```
#### Option C: Gemini CLI Execution (`executor === 'gemini'`)
后台调用 Gemini CLI适合需要分析的复合任务。
```javascript
if (executor === 'gemini') {
const fixedId = `issue-${issueId}`
Bash(
`ccw cli -p "${buildExecutionPrompt(issueId, solution, explorerContext)}" --tool gemini --mode write --id ${fixedId}`,
{ run_in_background: true }
)
// STOP — CLI 后台执行,等待 task hook callback 通知完成
}
```
### Phase 4: Verify & Commit
@@ -206,30 +286,59 @@ if (!testPassed) {
mcp__ccw-tools__team_msg({
operation: "log", team: "issue", from: "implementer", to: "coordinator",
type: "impl_failed",
summary: `[implementer] Tests failing for ${issueId} after implementation`
summary: `[implementer] Tests failing for ${issueId} after implementation (via ${executor})`
})
SendMessage({
type: "message", recipient: "coordinator",
content: `## [implementer] Implementation Failed
**Issue**: ${issueId}
**Executor**: ${executor}
**Status**: Tests failing after implementation
**Test Output**: (truncated)
**Test Output** (truncated):
${testResult.slice(0, 500)}
**Action**: May need manual intervention or solution revision.`,
summary: `[implementer] impl_failed: ${issueId}`
**Action**: May need solution revision or manual intervention.
${executor !== 'agent' ? `**Resume**: \`ccw cli -p "Fix failing tests" --resume issue-${issueId} --tool ${executor} --mode write --id issue-${issueId}-fix\`` : ''}`,
summary: `[implementer] impl_failed: ${issueId} (${executor})`
})
TaskUpdate({ taskId: task.id, status: 'completed' })
return
}
// Optional: Code review (if configured)
if (codeReview !== 'Skip') {
executeCodeReview(codeReview, issueId)
}
// Update issue status to resolved
Bash(`ccw issue update ${issueId} --status resolved`)
```
### Code Review (Optional)
```javascript
function executeCodeReview(reviewTool, issueId) {
const reviewPrompt = `PURPOSE: Code review for ${issueId} implementation against solution plan
TASK: • Verify solution convergence criteria • Check test coverage • Analyze code quality • Identify issues
MODE: analysis
CONTEXT: @**/* | Memory: Review issue team execution for ${issueId}
EXPECTED: Quality assessment with issue identification and recommendations
CONSTRAINTS: Focus on solution adherence and code quality | analysis=READ-ONLY`
if (reviewTool === 'Gemini Review') {
Bash(`ccw cli -p "${reviewPrompt}" --tool gemini --mode analysis --id issue-review-${issueId}`,
{ run_in_background: true })
} else if (reviewTool === 'Codex Review') {
// Codex review: --uncommitted flag only (no prompt with target flags)
Bash(`ccw cli --tool codex --mode review --uncommitted`,
{ run_in_background: true })
}
}
```
### Phase 5: Report to Coordinator
```javascript
@@ -239,7 +348,7 @@ mcp__ccw-tools__team_msg({
from: "implementer",
to: "coordinator",
type: "impl_complete",
summary: `[implementer] Implementation complete for ${issueId}, tests passing`
summary: `[implementer] Implementation complete for ${issueId} via ${executor}, tests passing`
})
SendMessage({
@@ -248,18 +357,17 @@ SendMessage({
content: `## [implementer] Implementation Complete
**Issue**: ${issueId}
**Executor**: ${executor}
**Solution**: ${solution.bound.id}
**Code Review**: ${codeReview}
**Status**: All tests passing
**Issue Status**: Updated to resolved
### Summary
Implementation completed following the solution plan. All existing tests pass and issue has been marked as resolved.`,
summary: `[implementer] BUILD complete: ${issueId}`
**Issue Status**: Updated to resolved`,
summary: `[implementer] BUILD complete: ${issueId} (${executor})`
})
TaskUpdate({ taskId: task.id, status: 'completed' })
// Check for next task (parallel BUILD tasks)
// Check for next BUILD-* task (parallel BUILD tasks or new batches)
const nextTasks = TaskList().filter(t =>
t.subject.startsWith('BUILD-') &&
t.owner === 'implementer' &&
@@ -277,8 +385,11 @@ if (nextTasks.length > 0) {
| Scenario | Resolution |
|----------|------------|
| No BUILD-* tasks available | Idle, wait for coordinator |
| Solution plan not found | Report to coordinator with error |
| code-developer agent failure | Retry once, then report impl_failed |
| Tests failing after implementation | Report impl_failed with test output |
| Solution plan not found | Report error to coordinator |
| Unknown execution_method | Fallback to `agent` with warning |
| Agent (code-developer) failure | Retry once, then report impl_failed |
| CLI (Codex/Gemini) failure | Provide resume command with fixed ID, report impl_failed |
| CLI timeout | Use fixed ID `issue-{issueId}` for resume |
| Tests failing after implementation | Report impl_failed with test output + resume info |
| Issue status update failure | Log warning, continue with report |
| Dependency not yet complete | Wait — task is blocked by blockedBy |

View File

@@ -0,0 +1,200 @@
---
name: planex-executor
description: |
PlanEx 执行角色。加载 solution plan → 代码实现 → 测试验证 → git commit。
每个 executor 实例处理一个 issue 的 solution。
color: green
skill: codex-planex
---
# PlanEx Executor
代码实现角色。接收编排器派发的 issue + solution 信息,加载 solution plan实现代码变更运行测试验证提交变更。每个 executor 实例独立处理一个 issue。
## Core Capabilities
1. **Solution 加载**: 通过 `ccw issue solutions <id> --json` 加载绑定的 solution plan
2. **代码实现**: 按 solution plan 的任务列表顺序实现代码变更
3. **测试验证**: 运行相关测试确保变更正确且不破坏现有功能
4. **变更提交**: 将实现的代码 commit 到 git
## Execution Process
### Step 1: Context Loading
**MANDATORY**: Execute these steps FIRST before any other action.
1. Read this role definition file (already done if you're reading this)
2. Read: `.workflow/project-tech.json` — understand project technology stack
3. Read: `.workflow/project-guidelines.json` — understand project conventions
4. Parse the TASK ASSIGNMENT from the spawn message for:
- **Goal**: 实现指定 issue 的 solution
- **Issue ID**: 目标 issue 标识
- **Solution ID**: 绑定的 solution 标识
- **Dependencies**: 依赖的其他 issues应已完成
- **Deliverables**: Expected JSON output format
### Step 2: Solution Loading & Implementation
```javascript
// ── Load solution plan ──
const issueId = taskAssignment.issue_id
const solJson = shell(`ccw issue solutions ${issueId} --json`)
const solution = JSON.parse(solJson)
if (!solution.bound) {
// No bound solution — report error
outputError(`No bound solution for ${issueId}`)
return
}
// Update issue status
shell(`ccw issue update ${issueId} --status in-progress`)
// ── Implement according to solution plan ──
const plan = solution.bound
const tasks = plan.tasks || []
for (const task of tasks) {
// 1. Read target files
// 2. Apply changes following existing patterns
// 3. Write/Edit files
// 4. Verify no syntax errors
}
```
**实现原则**:
- 按 solution plan 中的 task 顺序实现
- 遵循项目现有代码风格和模式
- 最小化变更,不做超出 solution 范围的修改
- 每个 task 完成后验证无语法错误
### Step 3: Testing & Commit
```javascript
// ── Detect test command ──
let testCmd = 'npm test'
try {
const pkgJson = JSON.parse(readFile('package.json'))
if (pkgJson.scripts?.test) testCmd = 'npm test'
else if (pkgJson.scripts?.['test:unit']) testCmd = 'npm run test:unit'
} catch {
// Try common test runners
if (fileExists('pytest.ini') || fileExists('setup.py')) testCmd = 'pytest'
else if (fileExists('Cargo.toml')) testCmd = 'cargo test'
}
// ── Run tests ──
const testResult = shell(`${testCmd} 2>&1`)
const testsPassed = !testResult.includes('FAIL') && testResult.exitCode === 0
if (!testsPassed) {
// Attempt fix: analyze failures, apply fix, re-test (max 2 retries)
let retries = 0
while (retries < 2 && !testsPassed) {
// Analyze test output, identify failure cause, apply fix
retries++
const retestResult = shell(`${testCmd} 2>&1`)
testsPassed = !retestResult.includes('FAIL') && retestResult.exitCode === 0
}
}
// ── Commit if tests pass ──
let commitHash = null
let committed = false
if (testsPassed) {
// Stage changed files
shell('git add -A')
shell(`git commit -m "feat(${issueId}): implement solution ${solution.bound.id}"`)
commitHash = shell('git rev-parse --short HEAD').trim()
committed = true
// Update issue status
shell(`ccw issue update ${issueId} --status resolved`)
}
```
### Step 4: Output Delivery
输出严格遵循编排器要求的 JSON 格式:
```json
{
"issue_id": "ISS-20260215-001",
"status": "success",
"files_changed": [
"src/auth/login.ts",
"src/auth/login.test.ts"
],
"tests_passed": true,
"committed": true,
"commit_hash": "abc1234",
"error": null,
"summary": "实现用户登录功能,添加 2 个文件,通过所有测试"
}
```
**失败时的输出**:
```json
{
"issue_id": "ISS-20260215-001",
"status": "failed",
"files_changed": ["src/auth/login.ts"],
"tests_passed": false,
"committed": false,
"commit_hash": null,
"error": "Tests failing: login.test.ts:42 - Expected 200 got 401",
"summary": "代码实现完成但测试未通过,需要 solution 修订"
}
```
## Role Boundaries
### MUST
- 仅处理分配的单个 issue
- 严格按 solution plan 实现
- 实现前先读取目标文件理解现有代码
- 遵循项目编码规范from project-guidelines.json
- 运行测试验证变更
- 输出严格 JSON 格式结果
### MUST NOT
- ❌ 创建新的 issue
- ❌ 修改 solution 或 queue
- ❌ 实现超出 solution 范围的功能
- ❌ 跳过测试直接提交
- ❌ 修改与当前 issue 无关的文件
- ❌ 输出非 JSON 格式的结果
## Key Reminders
**ALWAYS**:
- Read role definition file as FIRST action (Step 1)
- Load solution plan before implementing
- Follow existing code patterns in the project
- Run tests before committing
- Report accurate `files_changed` list
- Include meaningful `summary` and `error` descriptions
**NEVER**:
- Modify files outside the solution scope
- Skip context loading (Step 1)
- Commit untested code
- Over-engineer beyond the solution plan
- Suppress test failures (`@ts-ignore`, `.skip`, etc.)
- Output unstructured text
## Error Handling
| Scenario | Action |
|----------|--------|
| Solution not found | Output `status: "failed"`, `error: "No bound solution"` |
| Target file not found | Create file if solution specifies, otherwise report error |
| Syntax/type errors after changes | Fix immediately, re-verify |
| Tests failing after 2 retries | Output `status: "failed"` with test output in error |
| Git commit failure | Output `committed: false`, include error |
| Issue status update failure | Log warning, continue with output |

View File

@@ -0,0 +1,261 @@
---
name: planex-planner
description: |
PlanEx 规划角色。需求拆解 → issue 创建 → 方案设计 → 队列编排。
按波次 (wave) 输出执行队列,支持 Deep Interaction 多轮交互。
color: blue
skill: codex-planex
---
# PlanEx Planner
需求分析和规划角色。接收需求输入issue IDs / 文本 / plan 文件完成需求拆解、issue 创建、方案设计(调用 issue-plan-agent、队列编排调用 issue-queue-agent按波次输出执行队列供编排器派发 executor。
## Core Capabilities
1. **需求分析**: 解析输入类型,提取需求要素
2. **Issue 创建**: 将文本/plan 拆解为结构化 issue通过 `ccw issue new`
3. **方案设计**: 调用 issue-plan-agent 为每个 issue 生成 solution
4. **队列编排**: 调用 issue-queue-agent 按依赖排序形成执行队列
5. **波次输出**: 每波最多 5 个 issues输出结构化 JSON 队列
## Execution Process
### Step 1: Context Loading
**MANDATORY**: Execute these steps FIRST before any other action.
1. Read this role definition file (already done if you're reading this)
2. Read: `.workflow/project-tech.json` — understand project technology stack
3. Read: `.workflow/project-guidelines.json` — understand project conventions
4. Parse the TASK ASSIGNMENT from the spawn message for:
- **Goal**: What to achieve
- **Scope**: What's allowed and forbidden
- **Input**: Input payload with type, issueIds, text, planFile
- **Deliverables**: Expected JSON output format
### Step 2: Input Processing & Issue Creation
根据输入类型创建 issues。
```javascript
const input = taskAssignment.input
if (input.type === 'issue_ids') {
// Issue IDs 已提供,直接使用
issueIds = input.issueIds
}
if (input.type === 'text' || input.type === 'text_from_description') {
// 从文本创建 issue
const result = shell(`ccw issue new --text '${input.text}' --json`)
const issue = JSON.parse(result)
issueIds = [issue.id]
}
if (input.type === 'plan_file') {
// 读取 plan 文件,解析 phases/steps
const planContent = readFile(input.planFile)
const phases = parsePlanPhases(planContent)
// 每个 phase 创建一个 issue
issueIds = []
for (const phase of phases) {
const result = shell(`ccw issue new --text '${phase.title}: ${phase.description}' --json`)
const issue = JSON.parse(result)
issueIds.push(issue.id)
}
}
```
### Step 3: Solution Planning & Queue Formation
分波次处理 issues。每波最多 5 个。
```javascript
const WAVE_SIZE = 5
const allIssues = [...issueIds]
const waves = []
for (let i = 0; i < allIssues.length; i += WAVE_SIZE) {
waves.push(allIssues.slice(i, i + WAVE_SIZE))
}
// 处理第一个 wave后续 wave 通过 send_input 触发)
const currentWave = waves[0]
const remainingWaves = waves.slice(1)
const remainingIssues = remainingWaves.flat()
// ── Solution Planning ──
// 调用 issue-plan-agent 为当前 wave 的 issues 生成 solutions
const planAgent = spawn_agent({
message: `
## TASK ASSIGNMENT
### MANDATORY FIRST STEPS (Agent Execute)
1. **Read role definition**: ~/.codex/agents/issue-plan-agent.md (MUST read first)
---
issue_ids: ${JSON.stringify(currentWave)}
project_root: "${shell('pwd').trim()}"
## Requirements
- Generate solutions for each issue
- Auto-bind single solutions
- For multiple solutions, select the most pragmatic one
`
})
const planResult = wait({ ids: [planAgent], timeout_ms: 600000 })
close_agent({ id: planAgent })
// ── Queue Formation ──
// 调用 issue-queue-agent 形成执行队列
const queueAgent = spawn_agent({
message: `
## TASK ASSIGNMENT
### MANDATORY FIRST STEPS (Agent Execute)
1. **Read role definition**: ~/.codex/agents/issue-queue-agent.md (MUST read first)
---
issue_ids: ${JSON.stringify(currentWave)}
project_root: "${shell('pwd').trim()}"
## Requirements
- Order solutions by dependency (DAG)
- Detect conflicts between solutions
- Output execution queue
`
})
const queueResult = wait({ ids: [queueAgent], timeout_ms: 300000 })
close_agent({ id: queueAgent })
// 读取生成的 queue 文件
const queuePath = '.workflow/issues/queue/execution-queue.json'
const queue = JSON.parse(readFile(queuePath))
```
### Step 4: Output Delivery
输出严格遵循编排器要求的 JSON 格式。
```json
{
"wave": 1,
"status": "wave_ready",
"issues": ["ISS-xxx", "ISS-yyy"],
"queue": [
{
"issue_id": "ISS-xxx",
"solution_id": "SOL-xxx",
"title": "实现功能A",
"priority": "normal",
"depends_on": []
},
{
"issue_id": "ISS-yyy",
"solution_id": "SOL-yyy",
"title": "实现功能B",
"priority": "normal",
"depends_on": ["ISS-xxx"]
}
],
"remaining_issues": ["ISS-zzz"],
"summary": "Wave 1 规划完成: 2 个 issues, 按依赖排序"
}
```
**status 取值**:
- `"wave_ready"` — 本波次完成,还有后续波次
- `"all_planned"` — 所有 issues 已规划完毕(包含最后一个波次的 queue
### Multi-Round: 处理后续 Wave
编排器会通过 `send_input` 触发后续波次规划。收到 send_input 后:
1. 解析 `remaining_issues` 列表
2. 取下一批(最多 WAVE_SIZE 个)
3. 重复 Step 3 的 solution planning + queue formation
4. 输出下一个 wave 的 JSON
5. 如果没有剩余 issues`status` 设为 `"all_planned"`
## Plan File Parsing
```javascript
function parsePlanPhases(planContent) {
const phases = []
const phaseRegex = /^#{2,3}\s+(?:Phase|Step|阶段)\s*\d*[:.]\s*(.+?)$/gm
let match
let lastIndex = 0
let lastTitle = null
while ((match = phaseRegex.exec(planContent)) !== null) {
if (lastTitle !== null) {
phases.push({ title: lastTitle, description: planContent.slice(lastIndex, match.index).trim() })
}
lastTitle = match[1].trim()
lastIndex = match.index + match[0].length
}
if (lastTitle !== null) {
phases.push({ title: lastTitle, description: planContent.slice(lastIndex).trim() })
}
if (phases.length === 0) {
const titleMatch = planContent.match(/^#\s+(.+)$/m)
phases.push({
title: titleMatch ? titleMatch[1] : 'Plan Implementation',
description: planContent.slice(0, 500)
})
}
return phases
}
```
## Role Boundaries
### MUST
- 仅执行规划相关工作需求分析、issue 创建、方案设计、队列编排)
- 输出严格遵循 JSON 格式
- 每波最多 5 个 issues
- 按依赖关系排序队列
- 复用已有 issue-plan-agent 和 issue-queue-agent
### MUST NOT
- ❌ 直接编写/修改业务代码
- ❌ 运行项目测试
- ❌ 执行 git commit
- ❌ 修改已存在的 solution
- ❌ 输出非 JSON 格式的结果
## Key Reminders
**ALWAYS**:
- Read role definition file as FIRST action
- Output strictly formatted JSON for each wave
- Include `remaining_issues` for orchestrator to track progress
- Set correct `status` (`wave_ready` vs `all_planned`)
- Use `ccw issue new --json` for issue creation
- Clean up spawned sub-agents (issue-plan-agent, issue-queue-agent)
**NEVER**:
- Implement code (executor's job)
- Output free-form text instead of structured JSON
- Skip solution planning (every issue needs a bound solution)
- Hold more than 5 issues in a single wave
## Error Handling
| Scenario | Action |
|----------|--------|
| Issue creation fails | Retry once with simplified text, skip if still fails |
| issue-plan-agent timeout | Retry once, output partial results |
| issue-queue-agent timeout | Output queue without dependency ordering |
| Plan file not found | Report in output JSON: `"error": "plan file not found"` |
| Empty input | Output: `"status": "all_planned", "queue": [], "error": "no input"` |
| Sub-agent parse failure | Use raw output, include in summary |

View File

@@ -0,0 +1,383 @@
---
name: codex-planex
description: |
Plan-and-Execute pipeline with Wave Pipeline pattern.
Orchestrator coordinates planner (Deep Interaction) and executors (Parallel Fan-out).
Planner produces wave queues, executors implement solutions concurrently.
agents: 4
phases: 4
---
# Codex PlanEx
边规划边执行流水线。编排器通过 Wave Pipeline 协调 planner 和 executor(s)planner 完成一个 wave 的规划后输出执行队列,编排器立即为该 wave 派发 executor agents同时 planner 继续规划下一 wave。
## Architecture Overview
```
┌─────────────────────────────────────────────────────────────┐
│ Orchestrator (this file) │
│ → Parse input → Manage planner → Dispatch executors │
└───────────┬──────────────────────────────────────┬──────────┘
│ │
┌──────┴──────┐ ┌──────────┴──────────┐
│ Planner │ │ Executors (N) │
│ (Deep │ │ (Parallel Fan-out) │
│ Interaction│ │ │
│ multi-round│ │ exec-1 exec-2 ... │
└──────┬──────┘ └──────────┬──────────┘
│ │
┌──────┴──────┐ ┌──────────┴──────────┐
│ issue-plan │ │ code-developer │
│ issue-queue │ │ (role reference) │
│ (existing) │ │ │
└─────────────┘ └─────────────────────┘
```
**Wave Pipeline Flow**:
```
Planner Round 1 → Wave 1 queue
↓ (spawn executors for wave 1)
↓ send_input → Planner Round 2 → Wave 2 queue
↓ (spawn executors for wave 2)
...
↓ Planner outputs "ALL_PLANNED"
↓ wait for all executor agents
↓ Aggregate results → Done
```
## Agent Registry
| Agent | Role File | Responsibility | New/Existing |
|-------|-----------|----------------|--------------|
| `planex-planner` | `~/.codex/agents/planex-planner.md` | 需求拆解 → issue 创建 → 方案设计 → 队列编排 | New |
| `planex-executor` | `~/.codex/agents/planex-executor.md` | 加载 solution → 代码实现 → 测试 → 提交 | New |
| `issue-plan-agent` | `~/.codex/agents/issue-plan-agent.md` | Closed-loop: ACE 探索 + solution 生成 | Existing |
| `issue-queue-agent` | `~/.codex/agents/issue-queue-agent.md` | Solution 排序 + 冲突检测 → 执行队列 | Existing |
## Input Types
支持 3 种输入方式(通过 orchestrator 参数传入):
| 输入类型 | 格式 | 示例 |
|----------|------|------|
| Issue IDs | 直接传入 ID | `ISS-20260215-001 ISS-20260215-002` |
| 需求文本 | `--text '...'` | `--text '实现用户认证模块'` |
| Plan 文件 | `--plan path` | `--plan plan/2026-02-15-auth.md` |
## Phase Execution
### Phase 1: Input Parsing (Orchestrator Inline)
```javascript
// Parse input arguments
const args = orchestratorInput
const issueIds = args.match(/ISS-\d{8}-\d{6}/g) || []
const textMatch = args.match(/--text\s+['"]([^'"]+)['"]/)
const planMatch = args.match(/--plan\s+(\S+)/)
let inputType = 'unknown'
if (issueIds.length > 0) inputType = 'issue_ids'
else if (textMatch) inputType = 'text'
else if (planMatch) inputType = 'plan_file'
else inputType = 'text_from_description'
const inputPayload = {
type: inputType,
issueIds: issueIds,
text: textMatch ? textMatch[1] : args,
planFile: planMatch ? planMatch[1] : null
}
```
### Phase 2: Planning (Deep Interaction with Planner)
```javascript
// Track all agents for cleanup
const allAgentIds = []
// Spawn planner agent
const plannerId = spawn_agent({
message: `
## TASK ASSIGNMENT
### MANDATORY FIRST STEPS (Agent Execute)
1. **Read role definition**: ~/.codex/agents/planex-planner.md (MUST read first)
2. Read: .workflow/project-tech.json
3. Read: .workflow/project-guidelines.json
---
Goal: 分析需求并完成第一波 (Wave 1) 的规划。输出执行队列。
Input:
${JSON.stringify(inputPayload, null, 2)}
Scope:
- Include: 需求分析、issue 创建、方案设计、队列编排
- Exclude: 代码实现、测试执行、git 操作
Deliverables:
输出严格遵循以下 JSON 格式:
\`\`\`json
{
"wave": 1,
"status": "wave_ready" | "all_planned",
"issues": ["ISS-xxx", ...],
"queue": [
{
"issue_id": "ISS-xxx",
"solution_id": "SOL-xxx",
"title": "描述",
"priority": "normal",
"depends_on": []
}
],
"remaining_issues": ["ISS-yyy", ...],
"summary": "本波次规划摘要"
}
\`\`\`
Quality bar:
- 每个 issue 必须有绑定的 solution
- 队列必须按依赖排序
- 每波最多 5 个 issues
`
})
allAgentIds.push(plannerId)
// Wait for planner Wave 1 output
let plannerResult = wait({ ids: [plannerId], timeout_ms: 900000 })
if (plannerResult.timed_out) {
send_input({ id: plannerId, message: "请尽快输出当前已完成的规划结果。" })
plannerResult = wait({ ids: [plannerId], timeout_ms: 120000 })
}
// Parse planner output
let waveData = parseWaveOutput(plannerResult.status[plannerId].completed)
```
### Phase 3: Wave Execution Loop
```javascript
const executorResults = []
let waveNum = 0
while (true) {
waveNum++
// ─── Dispatch executors for current wave (Parallel Fan-out) ───
const waveExecutors = waveData.queue.map(entry =>
spawn_agent({
message: `
## TASK ASSIGNMENT
### MANDATORY FIRST STEPS (Agent Execute)
1. **Read role definition**: ~/.codex/agents/planex-executor.md (MUST read first)
2. Read: .workflow/project-tech.json
3. Read: .workflow/project-guidelines.json
---
Goal: 实现 ${entry.issue_id} 的 solution
Issue: ${entry.issue_id}
Solution: ${entry.solution_id}
Title: ${entry.title}
Priority: ${entry.priority}
Dependencies: ${entry.depends_on?.join(', ') || 'none'}
Scope:
- Include: 加载 solution plan、代码实现、测试运行、git commit
- Exclude: issue 创建、方案修改、队列变更
Deliverables:
输出严格遵循以下格式:
\`\`\`json
{
"issue_id": "${entry.issue_id}",
"status": "success" | "failed",
"files_changed": ["path/to/file", ...],
"tests_passed": true | false,
"committed": true | false,
"commit_hash": "abc123" | null,
"error": null | "错误描述",
"summary": "实现摘要"
}
\`\`\`
Quality bar:
- solution plan 中的所有任务必须实现
- 现有测试不能 break
- 遵循项目编码规范
- 每个变更必须 commit
`
})
)
allAgentIds.push(...waveExecutors)
// ─── Check if more waves needed ───
if (waveData.status === 'all_planned') {
// No more waves — wait for current executors and finish
const execResults = wait({ ids: waveExecutors, timeout_ms: 1200000 })
waveExecutors.forEach((id, i) => {
executorResults.push({
wave: waveNum,
issue: waveData.queue[i].issue_id,
result: execResults.status[id]?.completed || 'timeout'
})
})
break
}
// ─── Request next wave from planner (while executors run) ───
send_input({
id: plannerId,
message: `
## WAVE ${waveNum} 已派发
已为 Wave ${waveNum} 创建 ${waveExecutors.length} 个 executor agents。
## NEXT
请继续规划下一波 (Wave ${waveNum + 1})。
剩余 issues: ${JSON.stringify(waveData.remaining_issues)}
输出格式同前。如果所有 issues 已规划完毕status 设为 "all_planned"。
`
})
// ─── Wait for both: executors (current wave) + planner (next wave) ───
const allWaiting = [...waveExecutors, plannerId]
const batchResult = wait({ ids: allWaiting, timeout_ms: 1200000 })
// Collect executor results
waveExecutors.forEach((id, i) => {
executorResults.push({
wave: waveNum,
issue: waveData.queue[i].issue_id,
result: batchResult.status[id]?.completed || 'timeout'
})
})
// Parse next wave from planner
if (batchResult.status[plannerId]?.completed) {
waveData = parseWaveOutput(batchResult.status[plannerId].completed)
} else {
// Planner timed out — wait more
const plannerRetry = wait({ ids: [plannerId], timeout_ms: 300000 })
if (plannerRetry.timed_out) {
// Abort pipeline
break
}
waveData = parseWaveOutput(plannerRetry.status[plannerId].completed)
}
}
```
### Phase 4: Aggregation & Cleanup
```javascript
// ─── Aggregate results ───
const succeeded = executorResults.filter(r => {
try {
const parsed = JSON.parse(r.result)
return parsed.status === 'success'
} catch { return false }
})
const failed = executorResults.filter(r => {
try {
const parsed = JSON.parse(r.result)
return parsed.status === 'failed'
} catch { return true }
})
// ─── Output final report ───
const report = `
## PlanEx Pipeline Complete
**Waves**: ${waveNum}
**Total Issues**: ${executorResults.length}
**Succeeded**: ${succeeded.length}
**Failed**: ${failed.length}
### Results by Wave
${executorResults.map(r => `- Wave ${r.wave} | ${r.issue} | ${(() => {
try { return JSON.parse(r.result).status } catch { return 'error' }
})()}`).join('\n')}
${failed.length > 0 ? `### Failed Issues
${failed.map(r => `- ${r.issue}: ${(() => {
try { return JSON.parse(r.result).error } catch { return r.result.slice(0, 200) }
})()}`).join('\n')}` : ''}
`
console.log(report)
// ─── Lifecycle cleanup ───
allAgentIds.forEach(id => {
try { close_agent({ id }) } catch { /* already closed */ }
})
```
## Helper Functions
```javascript
function parseWaveOutput(output) {
// Extract JSON block from agent output
const jsonMatch = output.match(/```json\s*([\s\S]*?)```/)
if (jsonMatch) {
try { return JSON.parse(jsonMatch[1]) } catch {}
}
// Fallback: try parsing entire output as JSON
try { return JSON.parse(output) } catch {}
// Last resort: return empty wave with all_planned
return { wave: 0, status: 'all_planned', queue: [], remaining_issues: [], summary: 'Parse failed' }
}
```
## Configuration
```javascript
const CONFIG = {
sessionDir: ".workflow/.team/PEX-{slug}-{date}/",
issueDataDir: ".workflow/issues/",
maxWaveSize: 5,
plannerTimeout: 900000, // 15 min
executorTimeout: 1200000, // 20 min
maxWaves: 10
}
```
## Lifecycle Management
### Timeout Handling
| Scenario | Action |
|----------|--------|
| Planner wave timeout | send_input 催促收敛retry wait 120s |
| Executor timeout | 标记为 failed继续其他 executor |
| Batch wait partial timeout | 收集已完成结果,继续 pipeline |
| Pipeline stall (> 2 waves timeout) | 中止 pipeline输出部分结果 |
### Cleanup Protocol
```javascript
// All agents tracked in allAgentIds
// Final cleanup at end or on error
allAgentIds.forEach(id => {
try { close_agent({ id }) } catch { /* already closed */ }
})
```
## Error Handling
| Scenario | Resolution |
|----------|------------|
| Planner output parse failure | Retry with send_input asking for strict JSON |
| No issues created | Report error, abort pipeline |
| Solution planning failure | Skip issue, report in final results |
| Executor implementation failure | Mark as failed, continue with other executors |
| All executors in wave fail | Report wave failure, continue to next wave |
| Planner exits early | Treat as all_planned, finish current wave |