mirror of
https://github.com/catlog22/Claude-Code-Workflow.git
synced 2026-03-01 15:03:57 +08:00
feat: add csv-wave-pipeline skill for wave-based CSV batch execution with context propagation
834
.codex/skills/csv-wave-pipeline/SKILL.md
Normal file
@@ -0,0 +1,834 @@
---
name: csv-wave-pipeline
description: Requirement planning to wave-based CSV execution pipeline. Decomposes requirement into dependency-sorted CSV tasks, computes execution waves, runs wave-by-wave via spawn_agents_on_csv with cross-wave context propagation.
argument-hint: "[-y|--yes] [-c|--concurrency N] [--continue] \"requirement description\""
allowed-tools: spawn_agents_on_csv, Read, Write, Edit, Bash, Glob, Grep, AskUserQuestion
---

## Auto Mode
|
||||
|
||||
When `--yes` or `-y` is passed: auto-confirm the task decomposition, skip interactive validation, and use defaults.
|
||||
|
||||
# CSV Wave Pipeline
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
$csv-wave-pipeline "Implement user authentication with OAuth, JWT, and 2FA"
|
||||
$csv-wave-pipeline -c 4 "Refactor payment module with Stripe and PayPal"
|
||||
$csv-wave-pipeline -y "Build notification system with email and SMS"
|
||||
$csv-wave-pipeline --continue "auth-20260228"
|
||||
```
|
||||
|
||||
**Flags**:
|
||||
- `-y, --yes`: Skip all confirmations (auto mode)
|
||||
- `-c, --concurrency N`: Max concurrent agents within each wave (default: 4)
|
||||
- `--continue`: Resume existing session
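The flags above can be parsed token by token rather than by substring search. The following is a minimal sketch, assuming a hypothetical helper named `parseArgs` that is not part of the skill itself; it shows how whole-token matching keeps flag-like fragments inside the requirement text (for example the `-y` in "multi-year") from being treated as flags.

```javascript
// Hypothetical helper (not part of the skill): parse arguments token by token.
function parseArgs(raw) {
  // Split on whitespace but keep double-quoted strings together.
  const tokens = raw.match(/"[^"]*"|\S+/g) || []
  const opts = { autoYes: false, continueMode: false, concurrency: 4 }
  const rest = []
  for (let i = 0; i < tokens.length; i++) {
    const tok = tokens[i]
    if (tok === '-y' || tok === '--yes') {
      opts.autoYes = true
    } else if (tok === '--continue') {
      opts.continueMode = true
    } else if ((tok === '-c' || tok === '--concurrency') && /^\d+$/.test(tokens[i + 1] || '')) {
      opts.concurrency = parseInt(tokens[++i], 10) // consume the number as well
    } else {
      rest.push(tok.replace(/^"|"$/g, '')) // strip surrounding quotes
    }
  }
  return { ...opts, requirement: rest.join(' ') }
}

const parsed = parseArgs('-y -c 2 "Plan a multi-year data-cleanup effort"')
console.log(parsed.autoYes, parsed.concurrency, parsed.requirement)
// prints: true 2 Plan a multi-year data-cleanup effort
```

The default concurrency of 4 matches the documented default; everything else about the helper is an illustration.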
|
||||
|
||||
**Output Directory**: `.workflow/.csv-wave/{session-id}/`
|
||||
**Core Output**: `tasks.csv` (master state) + `results.csv` (final) + `discoveries.ndjson` (shared exploration) + `context.md` (human-readable report)
|
||||
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
Wave-based batch execution using `spawn_agents_on_csv` with **cross-wave context propagation**. Tasks are grouped into dependency waves; each wave executes concurrently, and its results feed into the next wave.
|
||||
|
||||
**Core workflow**: Decompose → Compute Waves → Execute Wave-by-Wave → Aggregate
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────────┐
|
||||
│ CSV BATCH EXECUTION WORKFLOW │
|
||||
├─────────────────────────────────────────────────────────────────────────┤
|
||||
│ │
|
||||
│ Phase 1: Requirement → CSV │
|
||||
│ ├─ Parse requirement into subtasks (3-10 tasks) │
|
||||
│ ├─ Identify dependencies (deps column) │
|
||||
│ ├─ Compute dependency waves (topological sort → depth grouping) │
|
||||
│ ├─ Generate tasks.csv with wave column │
|
||||
│ └─ User validates task breakdown (skip if -y) │
|
||||
│ │
|
||||
│ Phase 2: Wave Execution Engine │
|
||||
│ ├─ For each wave (1..N): │
|
||||
│ │ ├─ Build wave CSV (filter rows for this wave) │
|
||||
│ │ ├─ Inject previous wave findings into prev_context column │
|
||||
│ │ ├─ spawn_agents_on_csv(wave CSV) │
|
||||
│ │ ├─ Collect results, merge into master tasks.csv │
|
||||
│ │ └─ Check: any failed? → skip dependents or retry │
|
||||
│ └─ discoveries.ndjson shared across all waves (append-only) │
|
||||
│ │
|
||||
│ Phase 3: Results Aggregation │
|
||||
│ ├─ Export final results.csv │
|
||||
│ ├─ Generate context.md with all findings │
|
||||
│ ├─ Display summary: completed/failed/skipped per wave │
|
||||
│ └─ Offer: view results | retry failed | done │
|
||||
│ │
|
||||
└─────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## CSV Schema
|
||||
|
||||
### tasks.csv (Master State)
|
||||
|
||||
```csv
|
||||
id,title,description,deps,context_from,wave,status,findings,files_modified,error
|
||||
1,Setup auth module,Create auth directory structure and base files,,,1,,,,
|
||||
2,Implement OAuth,Add OAuth provider integration with Google and GitHub,1,1,2,,,,
|
||||
3,Add JWT tokens,Implement JWT generation and validation,1,1,2,,,,
|
||||
4,Setup 2FA,Add TOTP-based 2FA with QR code generation,2;3,1;2;3,3,,,,
|
||||
```
|
||||
|
||||
**Columns**:
|
||||
|
||||
| Column | Phase | Description |
|--------|-------|-------------|
| `id` | Input | Unique task identifier (string) |
| `title` | Input | Short task title |
| `description` | Input | Detailed task description |
| `deps` | Input | Semicolon-separated dependency task IDs (empty = no deps) |
| `context_from` | Input | Semicolon-separated task IDs whose findings this task needs |
| `wave` | Computed | Wave number (computed by topological sort, 1-based) |
| `status` | Output | `pending` → `completed` / `failed` / `skipped` |
| `findings` | Output | Key discoveries or implementation notes (max 500 chars) |
| `files_modified` | Output | Semicolon-separated file paths |
| `error` | Output | Error message if failed (empty if success) |

|
||||
### Per-Wave CSV (Temporary)
|
||||
|
||||
Each wave generates a temporary `wave-{N}.csv` with an extra `prev_context` column:
|
||||
|
||||
```csv
|
||||
id,title,description,deps,context_from,wave,prev_context
|
||||
2,Implement OAuth,Add OAuth integration,1,1,2,"[Task 1] Created auth/ with index.ts and types.ts"
|
||||
3,Add JWT tokens,Implement JWT,1,1,2,"[Task 1] Created auth/ with index.ts and types.ts"
|
||||
```
|
||||
|
||||
The `prev_context` column is built from `context_from` by looking up completed tasks' `findings` in the master CSV.
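As a rough illustration of that lookup, the sketch below builds one `prev_context` cell from in-memory rows. The function name `buildPrevContext` and the sample rows are assumptions made for the example; only the column names come from the schema above.

```javascript
// Sketch: build the prev_context cell for one task from the master rows.
function buildPrevContext(task, masterRows) {
  const byId = new Map(masterRows.map(r => [r.id, r]))
  return task.context_from
    .split(';')
    .filter(Boolean)                                   // empty cell means no context
    .map(id => byId.get(id))
    .filter(r => r && r.status === 'completed' && r.findings)
    .map(r => `[Task ${r.id}] ${r.findings}`)
    .join('\n')
}

const master = [
  { id: '1', status: 'completed', findings: 'Created auth/ with index.ts and types.ts', context_from: '' },
  { id: '2', status: 'failed', findings: '', context_from: '1' },
  { id: '4', status: 'pending', findings: '', context_from: '1;2;3' },
]
console.log(buildPrevContext(master[2], master))
// prints: [Task 1] Created auth/ with index.ts and types.ts
```

Only task 1 contributes here: task 2 failed, and task 3 is not present in the sample rows.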
|
||||
|
||||
---
|
||||
|
||||
## Output Artifacts
|
||||
|
||||
| File | Purpose | Lifecycle |
|------|---------|-----------|
| `tasks.csv` | Master state: all tasks with status/findings | Updated after each wave |
| `wave-{N}.csv` | Per-wave input (temporary) | Created before wave, deleted after |
| `results.csv` | Final export of all task results | Created in Phase 3 |
| `discoveries.ndjson` | Shared exploration board across all agents | Append-only, carries across waves |
| `context.md` | Human-readable execution report | Created in Phase 3 |

|
||||
---
|
||||
|
||||
## Session Structure
|
||||
|
||||
```
|
||||
.workflow/.csv-wave/{session-id}/
|
||||
├── tasks.csv # Master state (updated per wave)
|
||||
├── results.csv # Final results export
|
||||
├── discoveries.ndjson # Shared discovery board (all agents)
|
||||
├── context.md # Human-readable report
|
||||
└── wave-{N}.csv # Temporary per-wave input (cleaned up)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Implementation
|
||||
|
||||
### Session Initialization
|
||||
|
||||
```javascript
|
||||
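// Shift the clock by 8 hours and label the result +08:00 (toISOString() alone would append a misleading "Z")
const getUtc8ISOString = () => new Date(Date.now() + 8 * 60 * 60 * 1000).toISOString().replace('Z', '+08:00')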
|
||||
|
||||
// Parse flags
|
||||
const AUTO_YES = $ARGUMENTS.includes('--yes') || $ARGUMENTS.includes('-y')
|
||||
const continueMode = $ARGUMENTS.includes('--continue')
|
||||
const concurrencyMatch = $ARGUMENTS.match(/(?:--concurrency|-c)\s+(\d+)/)
|
||||
const maxConcurrency = concurrencyMatch ? parseInt(concurrencyMatch[1]) : 4
|
||||
|
||||
// Clean requirement text (remove flags)
|
||||
const requirement = $ARGUMENTS
|
||||
  .replace(/(^|\s)(?:--yes|-y|--continue|(?:--concurrency|-c)\s+\d+)(?=\s|$)/g, '$1') // whole tokens only, so "-y" inside "multi-year" is kept
|
||||
.trim()
|
||||
|
||||
const slug = requirement.toLowerCase()
|
||||
.replace(/[^a-z0-9\u4e00-\u9fa5]+/g, '-')
|
||||
.substring(0, 40)
|
||||
const dateStr = getUtc8ISOString().substring(0, 10).replace(/-/g, '')
|
||||
// Declared with let because continue mode reassigns both values below
let sessionId = `cwp-${slug}-${dateStr}`
let sessionFolder = `.workflow/.csv-wave/${sessionId}`
|
||||
|
||||
// Continue mode: find existing session
|
||||
if (continueMode) {
|
||||
const existing = Bash(`ls -t .workflow/.csv-wave/ 2>/dev/null | head -1`).trim()
|
||||
if (existing) {
|
||||
sessionId = existing
|
||||
sessionFolder = `.workflow/.csv-wave/${sessionId}`
|
||||
// Read existing tasks.csv, find incomplete waves, resume from there
|
||||
const existingCsv = Read(`${sessionFolder}/tasks.csv`)
|
||||
// → jump to Phase 2 with remaining waves
|
||||
}
|
||||
}
|
||||
|
||||
Bash(`mkdir -p ${sessionFolder}`)
|
||||
```
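The session naming step can be isolated into a small pure function, which makes the UTC+8 date handling easier to check. This is a sketch under stated assumptions: `makeSessionId` is a hypothetical name, and trimming leading and trailing hyphens from the slug is an addition for tidiness that the listing above does not perform.

```javascript
// Hypothetical helper: derive cwp-<slug>-<YYYYMMDD> from a requirement string.
function makeSessionId(requirement, now = new Date()) {
  const slug = requirement
    .toLowerCase()
    .replace(/[^a-z0-9\u4e00-\u9fa5]+/g, '-') // same character class as the listing above
    .substring(0, 40)
    .replace(/^-+|-+$/g, '')                  // assumption: trim stray hyphens
  // Shift the clock by 8 hours, then read the calendar date from the ISO string.
  const utc8 = new Date(now.getTime() + 8 * 60 * 60 * 1000)
  const dateStr = utc8.toISOString().substring(0, 10).replace(/-/g, '')
  return `cwp-${slug}-${dateStr}`
}

console.log(makeSessionId('Build notification system!', new Date('2026-02-28T20:30:00Z')))
// prints: cwp-build-notification-system-20260301
```

The example timestamp is 20:30 UTC on 28 February, which is already 1 March in UTC+8, so the date part rolls over.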
|
||||
|
||||
---
|
||||
|
||||
### Phase 1: Requirement → CSV
|
||||
|
||||
**Objective**: Decompose requirement into tasks, compute dependency waves, generate tasks.csv.
|
||||
|
||||
**Steps**:
|
||||
|
||||
1. **Decompose Requirement**
|
||||
|
||||
```javascript
|
||||
// Use ccw cli to decompose requirement into subtasks
|
||||
Bash({
|
||||
command: `ccw cli -p "PURPOSE: Decompose requirement into 3-10 atomic tasks for batch agent execution.
|
||||
TASK:
|
||||
• Parse requirement into independent subtasks
|
||||
• Identify dependencies between tasks (which must complete before others)
|
||||
• Identify context flow (which tasks need previous tasks' findings)
|
||||
• Each task must be executable by a single agent with file read/write access
|
||||
MODE: analysis
|
||||
CONTEXT: @**/*
|
||||
EXPECTED: JSON object with tasks array. Each task: {id: string, title: string, description: string, deps: string[], context_from: string[]}. deps = task IDs that must complete first. context_from = task IDs whose findings are needed.
|
||||
CONSTRAINTS: 3-10 tasks | Each task is atomic | No circular deps | description must be specific enough for an agent to execute independently
|
||||
|
||||
REQUIREMENT: ${requirement}" --tool gemini --mode analysis --rule planning-breakdown-task-steps`,
|
||||
run_in_background: true
|
||||
})
|
||||
// Wait for CLI completion via hook callback
|
||||
// Parse JSON from CLI output → decomposedTasks[]
|
||||
```
|
||||
|
||||
2. **Compute Waves** (Topological Sort → Depth Grouping)
|
||||
|
||||
```javascript
|
||||
function computeWaves(tasks) {
|
||||
// Build adjacency: task.deps → predecessors
|
||||
const taskMap = new Map(tasks.map(t => [t.id, t]))
|
||||
const inDegree = new Map(tasks.map(t => [t.id, 0]))
|
||||
const adjList = new Map(tasks.map(t => [t.id, []]))
|
||||
|
||||
for (const task of tasks) {
|
||||
for (const dep of task.deps) {
|
||||
if (taskMap.has(dep)) {
|
||||
adjList.get(dep).push(task.id)
|
||||
inDegree.set(task.id, inDegree.get(task.id) + 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// BFS-based topological sort with depth tracking
|
||||
const queue = [] // [taskId, depth]
|
||||
const waveAssignment = new Map()
|
||||
|
||||
for (const [id, deg] of inDegree) {
|
||||
if (deg === 0) {
|
||||
queue.push([id, 1])
|
||||
waveAssignment.set(id, 1)
|
||||
}
|
||||
}
|
||||
|
||||
let maxWave = 1
|
||||
let idx = 0
|
||||
while (idx < queue.length) {
|
||||
const [current, depth] = queue[idx++]
|
||||
for (const next of adjList.get(current)) {
|
||||
const newDeg = inDegree.get(next) - 1
|
||||
inDegree.set(next, newDeg)
|
||||
const nextDepth = Math.max(waveAssignment.get(next) || 0, depth + 1)
|
||||
waveAssignment.set(next, nextDepth)
|
||||
if (newDeg === 0) {
|
||||
queue.push([next, nextDepth])
|
||||
maxWave = Math.max(maxWave, nextDepth)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
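  // Detect cycles: a task inside a cycle never reaches in-degree 0, so it is never enqueued.
  // Checking waveAssignment alone would miss it, because a wave is recorded as soon as
  // any one predecessor has been processed.
  const enqueued = new Set(queue.map(([id]) => id))
  for (const task of tasks) {
    if (!enqueued.has(task.id)) {
      throw new Error(`Circular dependency detected involving task ${task.id}`)
    }
  }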
|
||||
|
||||
return { waveAssignment, maxWave }
|
||||
}
|
||||
|
||||
const { waveAssignment, maxWave } = computeWaves(decomposedTasks)
|
||||
```
|
||||
|
||||
3. **Generate tasks.csv**
|
||||
|
||||
```javascript
|
||||
const header = 'id,title,description,deps,context_from,wave,status,findings,files_modified,error'
|
||||
const rows = decomposedTasks.map(task => {
|
||||
const wave = waveAssignment.get(task.id)
|
||||
return [
|
||||
task.id,
|
||||
    task.title,        // every cell is quoted and escaped once by the map() below
    task.description,
|
||||
task.deps.join(';'),
|
||||
task.context_from.join(';'),
|
||||
wave,
|
||||
'pending', // status
|
||||
'', // findings
|
||||
'', // files_modified
|
||||
'' // error
|
||||
].map(cell => `"${String(cell).replace(/"/g, '""')}"`).join(',')
|
||||
})
|
||||
|
||||
Write(`${sessionFolder}/tasks.csv`, [header, ...rows].join('\n'))
|
||||
```
|
||||
|
||||
4. **User Validation** (skip if AUTO_YES)
|
||||
|
||||
```javascript
|
||||
if (!AUTO_YES) {
|
||||
// Display task breakdown with wave assignment
|
||||
console.log(`\n## Task Breakdown (${decomposedTasks.length} tasks, ${maxWave} waves)\n`)
|
||||
for (let w = 1; w <= maxWave; w++) {
|
||||
const waveTasks = decomposedTasks.filter(t => waveAssignment.get(t.id) === w)
|
||||
console.log(`### Wave ${w} (${waveTasks.length} tasks, concurrent)`)
|
||||
waveTasks.forEach(t => console.log(` - [${t.id}] ${t.title}`))
|
||||
}
|
||||
|
||||
const answer = AskUserQuestion({
|
||||
questions: [{
|
||||
question: "Approve task breakdown?",
|
||||
header: "Validation",
|
||||
multiSelect: false,
|
||||
options: [
|
||||
{ label: "Approve", description: "Proceed with wave execution" },
|
||||
{ label: "Modify", description: `Edit ${sessionFolder}/tasks.csv manually, then --continue` },
|
||||
{ label: "Cancel", description: "Abort" }
|
||||
]
|
||||
}]
|
||||
}) // BLOCKS
|
||||
|
||||
if (answer.Validation === "Modify") {
|
||||
console.log(`Edit: ${sessionFolder}/tasks.csv\nResume: $csv-wave-pipeline --continue`)
|
||||
return
|
||||
} else if (answer.Validation === "Cancel") {
|
||||
return
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Success Criteria**:
|
||||
- tasks.csv created with valid schema and wave assignments
|
||||
- No circular dependencies
|
||||
- User approved (or AUTO_YES)
|
||||
|
||||
---
|
||||
|
||||
### Phase 2: Wave Execution Engine
|
||||
|
||||
**Objective**: Execute tasks wave-by-wave via `spawn_agents_on_csv`. Each wave sees previous waves' results.
|
||||
|
||||
**Steps**:
|
||||
|
||||
1. **Wave Loop**
|
||||
|
||||
```javascript
|
||||
const failedIds = new Set()
|
||||
const skippedIds = new Set()
|
||||
|
||||
for (let wave = 1; wave <= maxWave; wave++) {
|
||||
console.log(`\n## Wave ${wave}/${maxWave}\n`)
|
||||
|
||||
// 1. Read current master CSV
|
||||
const masterCsv = parseCsv(Read(`${sessionFolder}/tasks.csv`))
|
||||
|
||||
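  // 2. Filter tasks for this wave that still need to run
  //    (on --continue or retry, rows that already completed are left untouched)
  const waveTasks = masterCsv.filter(row =>
    parseInt(row.wave, 10) === wave && (row.status === 'pending' || row.status === '')
  )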
|
||||
|
||||
// 3. Skip tasks whose deps failed
|
||||
const executableTasks = []
|
||||
for (const task of waveTasks) {
|
||||
const deps = task.deps.split(';').filter(Boolean)
|
||||
if (deps.some(d => failedIds.has(d) || skippedIds.has(d))) {
|
||||
skippedIds.add(task.id)
|
||||
// Update master CSV: mark as skipped
|
||||
updateMasterCsvRow(sessionFolder, task.id, {
|
||||
status: 'skipped',
|
||||
error: 'Dependency failed or skipped'
|
||||
})
|
||||
console.log(` [${task.id}] ${task.title} → SKIPPED (dependency failed)`)
|
||||
continue
|
||||
}
|
||||
executableTasks.push(task)
|
||||
}
|
||||
|
||||
if (executableTasks.length === 0) {
|
||||
console.log(` No executable tasks in wave ${wave}`)
|
||||
continue
|
||||
}
|
||||
|
||||
// 4. Build prev_context for each task
|
||||
for (const task of executableTasks) {
|
||||
const contextIds = task.context_from.split(';').filter(Boolean)
|
||||
const prevFindings = contextIds
|
||||
.map(id => {
|
||||
const prevRow = masterCsv.find(r => r.id === id)
|
||||
if (prevRow && prevRow.status === 'completed' && prevRow.findings) {
|
||||
return `[Task ${id}: ${prevRow.title}] ${prevRow.findings}`
|
||||
}
|
||||
return null
|
||||
})
|
||||
.filter(Boolean)
|
||||
.join('\n')
|
||||
task.prev_context = prevFindings || 'No previous context available'
|
||||
}
|
||||
|
||||
// 5. Write wave CSV
|
||||
const waveHeader = 'id,title,description,deps,context_from,wave,prev_context'
|
||||
const waveRows = executableTasks.map(t =>
|
||||
[t.id, t.title, t.description, t.deps, t.context_from, t.wave, t.prev_context]
|
||||
.map(cell => `"${String(cell).replace(/"/g, '""')}"`)
|
||||
.join(',')
|
||||
)
|
||||
Write(`${sessionFolder}/wave-${wave}.csv`, [waveHeader, ...waveRows].join('\n'))
|
||||
|
||||
// 6. Execute wave
|
||||
console.log(` Executing ${executableTasks.length} tasks (concurrency: ${maxConcurrency})...`)
|
||||
|
||||
const waveResult = spawn_agents_on_csv({
|
||||
csv_path: `${sessionFolder}/wave-${wave}.csv`,
|
||||
id_column: "id",
|
||||
instruction: buildInstructionTemplate(sessionFolder, wave),
|
||||
max_concurrency: maxConcurrency,
|
||||
max_runtime_seconds: 600,
|
||||
output_csv_path: `${sessionFolder}/wave-${wave}-results.csv`,
|
||||
output_schema: {
|
||||
type: "object",
|
||||
properties: {
|
||||
id: { type: "string" },
|
||||
status: { type: "string", enum: ["completed", "failed"] },
|
||||
findings: { type: "string" },
|
||||
files_modified: { type: "array", items: { type: "string" } },
|
||||
error: { type: "string" }
|
||||
},
|
||||
required: ["id", "status", "findings"]
|
||||
}
|
||||
})
|
||||
// ↑ Blocks until all agents in this wave complete
|
||||
|
||||
// 7. Merge results into master CSV
|
||||
const waveResults = parseCsv(Read(`${sessionFolder}/wave-${wave}-results.csv`))
|
||||
for (const result of waveResults) {
|
||||
updateMasterCsvRow(sessionFolder, result.id, {
|
||||
status: result.status,
|
||||
findings: result.findings || '',
|
||||
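      // Values read back from a CSV are strings; join only when an array was returned
      files_modified: Array.isArray(result.files_modified)
        ? result.files_modified.join(';')
        : (result.files_modified || ''),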
|
||||
error: result.error || ''
|
||||
})
|
||||
|
||||
if (result.status === 'failed') {
|
||||
failedIds.add(result.id)
|
||||
console.log(` [${result.id}] ${result.title} → FAILED: ${result.error}`)
|
||||
} else {
|
||||
console.log(` [${result.id}] ${result.title} → COMPLETED`)
|
||||
}
|
||||
}
|
||||
|
||||
// 8. Cleanup temporary wave CSV
|
||||
Bash(`rm -f "${sessionFolder}/wave-${wave}.csv"`)
|
||||
|
||||
console.log(` Wave ${wave} done: ${waveResults.filter(r => r.status === 'completed').length} completed, ${waveResults.filter(r => r.status === 'failed').length} failed`)
|
||||
}
|
||||
```
|
||||
|
||||
2. **Instruction Template Builder**
|
||||
|
||||
```javascript
|
||||
function buildInstructionTemplate(sessionFolder, wave) {
|
||||
return `
|
||||
## TASK ASSIGNMENT
|
||||
|
||||
### MANDATORY FIRST STEPS
|
||||
1. Read shared discoveries: ${sessionFolder}/discoveries.ndjson (if exists, skip if not)
|
||||
2. Read project context: .workflow/project-tech.json (if exists)
|
||||
|
||||
---
|
||||
|
||||
## Your Task
|
||||
|
||||
**Task ID**: {id}
|
||||
**Title**: {title}
|
||||
**Description**: {description}
|
||||
|
||||
### Previous Tasks' Findings (Context)
|
||||
{prev_context}
|
||||
|
||||
---
|
||||
|
||||
## Execution Protocol
|
||||
|
||||
1. **Read discoveries**: Load ${sessionFolder}/discoveries.ndjson for shared exploration findings
|
||||
2. **Use context**: Apply previous tasks' findings from prev_context above
|
||||
3. **Execute**: Implement the task as described
|
||||
4. **Share discoveries**: Append exploration findings to shared board:
|
||||
\`\`\`bash
|
||||
echo '{"ts":"<ISO8601>","worker":"{id}","type":"<type>","data":{...}}' >> ${sessionFolder}/discoveries.ndjson
|
||||
\`\`\`
|
||||
5. **Report result**: Return JSON via report_agent_job_result
|
||||
|
||||
### Discovery Types to Share
|
||||
- \`code_pattern\`: {name, file, description} — reusable patterns found
|
||||
- \`integration_point\`: {file, description, exports[]} — module connection points
|
||||
- \`convention\`: {naming, imports, formatting} — code style conventions
|
||||
- \`blocker\`: {issue, severity, impact} — blocking issues encountered
|
||||
|
||||
---
|
||||
|
||||
## Output (report_agent_job_result)
|
||||
|
||||
Return JSON:
|
||||
{
|
||||
"id": "{id}",
|
||||
"status": "completed" | "failed",
|
||||
"findings": "Key discoveries and implementation notes (max 500 chars)",
|
||||
"files_modified": ["path1", "path2"],
|
||||
"error": ""
|
||||
}
|
||||
`
|
||||
}
|
||||
```
|
||||
|
||||
3. **Master CSV Update Helper**
|
||||
|
||||
```javascript
|
||||
function updateMasterCsvRow(sessionFolder, taskId, updates) {
|
||||
const csvPath = `${sessionFolder}/tasks.csv`
|
||||
const content = Read(csvPath)
|
||||
const lines = content.split('\n')
|
||||
const header = lines[0].split(',')
|
||||
|
||||
for (let i = 1; i < lines.length; i++) {
|
||||
const cells = parseCsvLine(lines[i])
|
||||
if (cells[0] === taskId || cells[0] === `"${taskId}"`) {
|
||||
// Update specified columns
|
||||
for (const [col, val] of Object.entries(updates)) {
|
||||
const colIdx = header.indexOf(col)
|
||||
if (colIdx >= 0) {
|
||||
cells[colIdx] = `"${String(val).replace(/"/g, '""')}"`
|
||||
}
|
||||
}
|
||||
lines[i] = cells.join(',')
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
Write(csvPath, lines.join('\n'))
|
||||
}
|
||||
```
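The listings in this document call `parseCsv`, `parseCsvLine`, and `csvEscape` without defining them. The following is one possible minimal implementation, offered as an assumption about what those helpers do (unquoted cell values, rows keyed by header name), not as the skill's actual code.

```javascript
// Hypothetical helpers; the names match the calls used in this document.
function csvEscape(value) {
  return `"${String(value).replace(/"/g, '""')}"`
}

// Parse one CSV record into unquoted cell values.
function parseCsvLine(line) {
  const cells = []
  let cell = ''
  let inQuotes = false
  for (let i = 0; i < line.length; i++) {
    const ch = line[i]
    if (inQuotes) {
      if (ch === '"' && line[i + 1] === '"') { cell += '"'; i++ } // escaped quote
      else if (ch === '"') inQuotes = false                        // closing quote
      else cell += ch
    } else if (ch === '"') {
      inQuotes = true
    } else if (ch === ',') {
      cells.push(cell)
      cell = ''
    } else {
      cell += ch
    }
  }
  cells.push(cell)
  return cells
}

// Parse CSV text into row objects keyed by header name.
// Limitation: records are split on newlines, so cells must not contain line breaks.
function parseCsv(text) {
  const [headerLine, ...lines] = text.split(/\r?\n/).filter(l => l.length > 0)
  const header = parseCsvLine(headerLine)
  return lines.map(line => {
    const cells = parseCsvLine(line)
    return Object.fromEntries(header.map((h, i) => [h, cells[i] ?? '']))
  })
}

const sample = 'id,title,deps\n"1","Setup, then ""init""",""\n2,Implement OAuth,1'
console.log(parseCsv(sample))
```

Because this sketch splits records on newlines, multi-line values (such as a `prev_context` joined with `\n`) would need either a record-aware parser or a different separator. Note also that cells come back unquoted, so a row must be re-escaped with `csvEscape` before it is written back.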
|
||||
|
||||
**Success Criteria**:
|
||||
- All waves executed in order
|
||||
- Each wave's results merged into master CSV before next wave starts
|
||||
- Dependent tasks skipped when predecessor failed
|
||||
- discoveries.ndjson accumulated across all waves
|
||||
|
||||
---
|
||||
|
||||
### Phase 3: Results Aggregation
|
||||
|
||||
**Objective**: Generate final results and human-readable report.
|
||||
|
||||
**Steps**:
|
||||
|
||||
1. **Export results.csv**
|
||||
|
||||
```javascript
|
||||
const masterCsv = Read(`${sessionFolder}/tasks.csv`)
|
||||
// results.csv = master CSV (already has all results populated)
|
||||
Write(`${sessionFolder}/results.csv`, masterCsv)
|
||||
```
|
||||
|
||||
2. **Generate context.md**
|
||||
|
||||
```javascript
|
||||
const tasks = parseCsv(masterCsv)
|
||||
const completed = tasks.filter(t => t.status === 'completed')
|
||||
const failed = tasks.filter(t => t.status === 'failed')
|
||||
const skipped = tasks.filter(t => t.status === 'skipped')
|
||||
|
||||
const contextContent = `# CSV Batch Execution Report
|
||||
|
||||
**Session**: ${sessionId}
|
||||
**Requirement**: ${requirement}
|
||||
**Completed**: ${getUtc8ISOString()}
|
||||
**Waves**: ${maxWave} | **Concurrency**: ${maxConcurrency}
|
||||
|
||||
---
|
||||
|
||||
## Summary
|
||||
|
||||
| Metric | Count |
|
||||
|--------|-------|
|
||||
| Total Tasks | ${tasks.length} |
|
||||
| Completed | ${completed.length} |
|
||||
| Failed | ${failed.length} |
|
||||
| Skipped | ${skipped.length} |
|
||||
|
||||
---
|
||||
|
||||
## Wave Execution
|
||||
|
||||
${Array.from({ length: maxWave }, (_, i) => i + 1).map(w => {
|
||||
const waveTasks = tasks.filter(t => parseInt(t.wave) === w)
|
||||
return `### Wave ${w}
|
||||
${waveTasks.map(t => `- **[${t.id}] ${t.title}**: ${t.status}${t.error ? ' — ' + t.error : ''}
|
||||
${t.findings ? 'Findings: ' + t.findings : ''}`).join('\n')}`
|
||||
}).join('\n\n')}
|
||||
|
||||
---
|
||||
|
||||
## Task Details
|
||||
|
||||
${tasks.map(t => `### ${t.id}: ${t.title}
|
||||
|
||||
| Field | Value |
|
||||
|-------|-------|
|
||||
| Status | ${t.status} |
|
||||
| Wave | ${t.wave} |
|
||||
| Dependencies | ${t.deps || 'none'} |
|
||||
| Context From | ${t.context_from || 'none'} |
|
||||
| Error | ${t.error || 'none'} |
|
||||
|
||||
**Findings**: ${t.findings || 'N/A'}
|
||||
|
||||
**Files Modified**: ${t.files_modified || 'none'}
|
||||
`).join('\n---\n')}
|
||||
|
||||
---
|
||||
|
||||
## All Modified Files
|
||||
|
||||
${[...new Set(tasks.flatMap(t => (t.files_modified || '').split(';')).filter(Boolean))].map(f => '- ' + f).join('\n') || 'None'}
|
||||
`
|
||||
|
||||
Write(`${sessionFolder}/context.md`, contextContent)
|
||||
```
|
||||
|
||||
3. **Display Summary**
|
||||
|
||||
```javascript
|
||||
console.log(`
|
||||
## Execution Complete
|
||||
|
||||
- **Session**: ${sessionId}
|
||||
- **Waves**: ${maxWave}
|
||||
- **Completed**: ${completed.length}/${tasks.length}
|
||||
- **Failed**: ${failed.length}
|
||||
- **Skipped**: ${skipped.length}
|
||||
|
||||
**Results**: ${sessionFolder}/results.csv
|
||||
**Report**: ${sessionFolder}/context.md
|
||||
**Discoveries**: ${sessionFolder}/discoveries.ndjson
|
||||
`)
|
||||
```
|
||||
|
||||
4. **Offer Next Steps** (skip if AUTO_YES)
|
||||
|
||||
```javascript
|
||||
if (!AUTO_YES && failed.length > 0) {
|
||||
const answer = AskUserQuestion({
|
||||
questions: [{
|
||||
question: `${failed.length} tasks failed. Next action?`,
|
||||
header: "Next Step",
|
||||
multiSelect: false,
|
||||
options: [
|
||||
{ label: "Retry Failed", description: `Re-execute ${failed.length} failed tasks with updated context` },
|
||||
{ label: "View Report", description: "Display context.md" },
|
||||
{ label: "Done", description: "Complete session" }
|
||||
]
|
||||
}]
|
||||
}) // BLOCKS
|
||||
|
||||
if (answer['Next Step'] === "Retry Failed") {
|
||||
// Reset failed tasks to pending, re-run Phase 2 for their waves
|
||||
for (const task of failed) {
|
||||
updateMasterCsvRow(sessionFolder, task.id, { status: 'pending', error: '' })
|
||||
}
|
||||
// Also reset skipped tasks whose deps are now retrying
|
||||
for (const task of skipped) {
|
||||
updateMasterCsvRow(sessionFolder, task.id, { status: 'pending', error: '' })
|
||||
}
|
||||
// Re-execute Phase 2 (loop will skip already-completed tasks)
|
||||
// → goto Phase 2
|
||||
} else if (answer['Next Step'] === "View Report") {
|
||||
console.log(Read(`${sessionFolder}/context.md`))
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Success Criteria**:
|
||||
- results.csv exported
|
||||
- context.md generated
|
||||
- Summary displayed to user
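The per-wave summary mentioned in the overview (completed/failed/skipped per wave) can be derived directly from the master rows. A minimal sketch, assuming rows shaped like the `tasks.csv` schema and a hypothetical function name `summarizeByWave`:

```javascript
// Sketch: count task statuses per wave from parsed tasks.csv rows.
function summarizeByWave(rows) {
  const summary = new Map()
  for (const row of rows) {
    const wave = parseInt(row.wave, 10)
    if (!summary.has(wave)) {
      summary.set(wave, { completed: 0, failed: 0, skipped: 0, pending: 0 })
    }
    const counts = summary.get(wave)
    // Ignore unknown status values instead of creating new counters.
    if (Object.prototype.hasOwnProperty.call(counts, row.status)) counts[row.status]++
  }
  return summary
}

const finished = [
  { id: '1', wave: '1', status: 'completed' },
  { id: '2', wave: '2', status: 'failed' },
  { id: '3', wave: '2', status: 'completed' },
  { id: '4', wave: '3', status: 'skipped' },
]
for (const [wave, counts] of summarizeByWave(finished)) {
  console.log(`Wave ${wave}: ${counts.completed} completed, ${counts.failed} failed, ${counts.skipped} skipped`)
}
```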
|
||||
|
||||
---
|
||||
|
||||
## Shared Discovery Board Protocol
|
||||
|
||||
All agents across all waves share `discoveries.ndjson`. This eliminates redundant codebase exploration.
|
||||
|
||||
**Lifecycle**:
|
||||
- Created by the first agent to write a discovery
|
||||
- Carries over across waves — never cleared
|
||||
- Agents append via `echo '...' >> discoveries.ndjson`
|
||||
|
||||
**Format**: NDJSON; each line is one self-contained JSON object:
|
||||
|
||||
```jsonl
|
||||
{"ts":"2026-02-28T10:00:00+08:00","worker":"1","type":"code_pattern","data":{"name":"repository-pattern","file":"src/repos/Base.ts","description":"Abstract CRUD repository"}}
|
||||
{"ts":"2026-02-28T10:01:00+08:00","worker":"2","type":"integration_point","data":{"file":"src/auth/index.ts","description":"Auth module entry","exports":["authenticate","authorize"]}}
|
||||
```
|
||||
|
||||
**Discovery Types**:
|
||||
|
||||
| type | Dedup Key | Description |
|------|-----------|-------------|
| `code_pattern` | `data.name` | Reusable code pattern found |
| `integration_point` | `data.file` | Module connection point |
| `convention` | singleton | Code style conventions |
| `blocker` | `data.issue` | Blocking issue encountered |
| `tech_stack` | singleton | Project technology stack |
| `test_command` | singleton | Test commands discovered |

|
||||
**Protocol Rules**:
|
||||
1. Read board before own exploration → skip covered areas
|
||||
2. Write discoveries immediately via `echo >>` → don't batch
|
||||
3. Deduplicate — check existing entries; skip if same type + dedup key exists
|
||||
4. Append-only — never modify or delete existing lines
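The rules above can be sketched as a small read-then-append routine. The function names `readBoard` and `shareDiscovery` are hypothetical, and the example writes to a temporary directory rather than a real session folder; the dedup keys are taken from the table above.

```javascript
const fs = require('node:fs')
const os = require('node:os')
const path = require('node:path')

// Dedup key per discovery type, as listed in the Discovery Types table.
const DEDUP_KEY = {
  code_pattern: d => d.name,
  integration_point: d => d.file,
  blocker: d => d.issue,
  convention: () => 'singleton',
  tech_stack: () => 'singleton',
  test_command: () => 'singleton',
}

// Read the board, ignoring blank or malformed lines.
function readBoard(file) {
  if (!fs.existsSync(file)) return []
  return fs.readFileSync(file, 'utf8').split('\n').flatMap(line => {
    if (!line.trim()) return []
    try { return [JSON.parse(line)] } catch { return [] }
  })
}

// Append a discovery unless an entry with the same type + dedup key exists.
function shareDiscovery(file, worker, type, data) {
  const keyOf = DEDUP_KEY[type]
  if (!keyOf) throw new Error(`Unknown discovery type: ${type}`)
  const key = keyOf(data)
  const duplicate = readBoard(file).some(
    e => e && e.type === type && keyOf(e.data || {}) === key
  )
  if (duplicate) return false
  const entry = { ts: new Date().toISOString(), worker, type, data }
  fs.appendFileSync(file, JSON.stringify(entry) + '\n') // append-only write
  return true
}

const board = path.join(fs.mkdtempSync(path.join(os.tmpdir(), 'board-')), 'discoveries.ndjson')
console.log(shareDiscovery(board, '1', 'code_pattern', { name: 'repository-pattern', file: 'src/repos/Base.ts' }))
console.log(shareDiscovery(board, '2', 'code_pattern', { name: 'repository-pattern', file: 'src/other.ts' }))
// prints: true, then false (same type and same data.name)
```

The check and the append are separate steps, so two agents running at the same moment could still both write the same key; readers should therefore tolerate occasional duplicates.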
|
||||
|
||||
---
|
||||
|
||||
## Wave Computation Details
|
||||
|
||||
### Algorithm
|
||||
|
||||
Kahn's BFS topological sort with depth tracking:
|
||||
|
||||
```
|
||||
Input: tasks[] with deps[]
|
||||
Output: waveAssignment (taskId → wave number)
|
||||
|
||||
1. Build in-degree map and adjacency list from deps
|
||||
2. Enqueue all tasks with in-degree 0 at wave 1
|
||||
3. BFS: for each dequeued task at wave W:
|
||||
- For each dependent task D:
|
||||
- Decrement D's in-degree
|
||||
- D.wave = max(D.wave, W + 1)
|
||||
- If D's in-degree reaches 0, enqueue D
|
||||
4. Any task never enqueued (its in-degree never reaches 0) → circular dependency error
|
||||
```
|
||||
|
||||
### Wave Properties
|
||||
|
||||
- **Wave 1**: No dependencies — all tasks in wave 1 are fully independent
|
||||
- **Wave N**: All dependencies are in waves 1..(N-1) — guaranteed completed before wave N starts
|
||||
- **Within a wave**: Tasks are independent of each other → safe for concurrent execution
|
||||
|
||||
### Example
|
||||
|
||||
```
|
||||
Task A (no deps) → Wave 1
|
||||
Task B (no deps) → Wave 1
|
||||
Task C (deps: A) → Wave 2
|
||||
Task D (deps: A, B) → Wave 2
|
||||
Task E (deps: C, D) → Wave 3
|
||||
|
||||
Execution:
|
||||
Wave 1: [A, B] ← concurrent
|
||||
Wave 2: [C, D] ← concurrent, sees A+B findings
|
||||
Wave 3: [E] ← sees A+B+C+D findings
|
||||
```
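The example above can be reproduced with a compact, self-contained version of the wave computation. This is a sketch rather than the listing from Phase 1: it follows the same Kahn-style approach but detects cycles by counting processed tasks.

```javascript
// Sketch: Kahn-style topological sort that records each task's wave (1-based depth).
function computeWaves(tasks) {
  const ids = new Set(tasks.map(t => t.id))
  const inDegree = new Map(tasks.map(t => [t.id, 0]))
  const dependents = new Map(tasks.map(t => [t.id, []]))
  for (const t of tasks) {
    for (const dep of t.deps) {
      if (!ids.has(dep)) continue // unknown ids are ignored
      dependents.get(dep).push(t.id)
      inDegree.set(t.id, inDegree.get(t.id) + 1)
    }
  }

  const wave = new Map()
  const queue = []
  for (const [id, deg] of inDegree) {
    if (deg === 0) { wave.set(id, 1); queue.push(id) }
  }

  let processed = 0
  while (processed < queue.length) {
    const current = queue[processed++]
    for (const next of dependents.get(current)) {
      // A task runs one wave after its latest predecessor.
      wave.set(next, Math.max(wave.get(next) || 0, wave.get(current) + 1))
      inDegree.set(next, inDegree.get(next) - 1)
      if (inDegree.get(next) === 0) queue.push(next)
    }
  }

  // Tasks inside a cycle never reach in-degree 0 and are never processed.
  if (processed !== tasks.length) throw new Error('Circular dependency detected')
  return wave
}

const example = [
  { id: 'A', deps: [] },
  { id: 'B', deps: [] },
  { id: 'C', deps: ['A'] },
  { id: 'D', deps: ['A', 'B'] },
  { id: 'E', deps: ['C', 'D'] },
]
console.log(Object.fromEntries(computeWaves(example)))
// prints: { A: 1, B: 1, C: 2, D: 2, E: 3 }
```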
|
||||
|
||||
---
|
||||
|
||||
## Context Propagation Flow
|
||||
|
||||
```
|
||||
Wave 1 agents:
|
||||
├─ Execute tasks (no prev_context)
|
||||
├─ Write findings to report_agent_job_result
|
||||
└─ Append discoveries to discoveries.ndjson
|
||||
|
||||
↓ merge results into master CSV
|
||||
|
||||
Wave 2 agents:
|
||||
├─ Read discoveries.ndjson (exploration sharing)
|
||||
├─ Read prev_context column (wave 1 findings from context_from)
|
||||
├─ Execute tasks with full upstream context
|
||||
├─ Write findings to report_agent_job_result
|
||||
└─ Append new discoveries to discoveries.ndjson
|
||||
|
||||
↓ merge results into master CSV
|
||||
|
||||
Wave 3 agents:
|
||||
├─ Read discoveries.ndjson (accumulated from waves 1+2)
|
||||
├─ Read prev_context column (wave 1+2 findings from context_from)
|
||||
├─ Execute tasks
|
||||
└─ ...
|
||||
```
|
||||
|
||||
**Two context channels**:
|
||||
1. **CSV findings** (structured): `context_from` column → `prev_context` injection — task-specific directed context
|
||||
2. **NDJSON discoveries** (broadcast): `discoveries.ndjson` — general exploration findings available to all
|
||||
|
||||
---
|
||||
|
||||
## Error Handling
|
||||
|
||||
| Error | Resolution |
|-------|------------|
| Circular dependency | Detect in wave computation, abort with error message |
| Agent timeout | Mark as failed in results, continue with wave |
| Agent failed | Mark as failed, skip dependent tasks in later waves |
| All agents in wave failed | Log error, offer retry or abort |
| CSV parse error | Validate CSV format before execution, show line number |
| discoveries.ndjson corrupt | Ignore malformed lines, continue with valid entries |
| Continue mode: no session found | List available sessions, prompt user to select |

|
||||
---
|
||||
|
||||
## Core Rules
|
||||
|
||||
1. **Start Immediately**: First action is session initialization, then Phase 1
|
||||
2. **Wave Order is Sacred**: Never execute wave N before wave N-1 completes and results are merged
|
||||
3. **CSV is Source of Truth**: Master tasks.csv holds all state — always read before wave, always write after
|
||||
4. **Context Propagation**: prev_context built from master CSV, not from memory
|
||||
5. **Discovery Board is Append-Only**: Never clear, modify, or recreate discoveries.ndjson
|
||||
6. **Skip on Failure**: If a dependency failed, skip the dependent task (don't attempt)
|
||||
7. **Cleanup Temp Files**: Remove wave-{N}.csv after results are merged
|
||||
8. **DO NOT STOP**: Continuous execution until all waves complete or all remaining tasks are skipped
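Rule 6 implies that a failure propagates transitively: a skipped task also blocks its own dependents. The sketch below illustrates that behaviour on in-memory rows; `computeSkipped` is a hypothetical name, and the real pipeline makes this decision wave by wave inside the Phase 2 loop.

```javascript
// Sketch: given tasks and the ids that failed, list the ids that must be skipped.
function computeSkipped(tasks, failedIds) {
  const blocked = new Set(failedIds)
  const skipped = []
  // Visit in wave order so that a skipped task also blocks its dependents.
  for (const task of [...tasks].sort((a, b) => a.wave - b.wave)) {
    if (blocked.has(task.id)) continue
    if (task.deps.some(d => blocked.has(d))) {
      blocked.add(task.id)
      skipped.push(task.id)
    }
  }
  return skipped
}

const tasks = [
  { id: '1', wave: 1, deps: [] },
  { id: '2', wave: 2, deps: ['1'] },
  { id: '3', wave: 2, deps: ['1'] },
  { id: '4', wave: 3, deps: ['2', '3'] },
]
console.log(computeSkipped(tasks, ['2']))
// prints: [ '4' ]  (task 4 depends on the failed task 2; task 3 still runs)
```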
|
||||
|
||||
---
|
||||
|
||||
## Best Practices
|
||||
|
||||
1. **Task Granularity**: 3-10 tasks optimal; too many = overhead, too few = no parallelism benefit
|
||||
2. **Minimize Cross-Wave Deps**: More tasks in wave 1 = more parallelism
|
||||
3. **Specific Descriptions**: Agent sees only its CSV row + prev_context — make description self-contained
|
||||
4. **Context From ≠ Deps**: `deps` = execution order constraint; `context_from` = information flow. A task can list an ID in `context_from` without listing it in `deps`; the findings are injected only if that task has already completed in an earlier wave, so also add the ID to `deps` when the context is required.
|
||||
5. **Concurrency Tuning**: `-c 1` for serial execution (maximum context sharing); `-c 8` for I/O-bound tasks
|
||||
|
||||
---
|
||||
|
||||
## Usage Recommendations
|
||||
|
||||
| Scenario | Recommended Approach |
|----------|---------------------|
| Independent parallel tasks (no deps) | `$csv-wave-pipeline -c 8`: single wave, max parallelism |
| Linear pipeline (A→B→C) | `$csv-wave-pipeline -c 1`: 3 waves, serial, full context |
| Diamond dependency (A→B,C→D) | `$csv-wave-pipeline`: 3 waves, B+C concurrent in wave 2 |
| Complex requirement, unclear tasks | Use `$roadmap-with-file` first for planning, then feed issues here |
| Single complex task | Use `$lite-execute` instead |
|
||||