From 64e772f9b8fa1c996cc3e17d9ea585308d93e5e2 Mon Sep 17 00:00:00 2001 From: catlog22 Date: Wed, 4 Mar 2026 16:35:07 +0800 Subject: [PATCH] feat: add message bus (.msg/) to codex team-lifecycle skill Align codex version with Claude version's team_msg message bus for frontend pipeline UI integration: - Phase 2: create .msg/ dir, write meta.json with pipeline_stages/roles, init messages.jsonl with session event - Phase 4: log task completion events, update role_state in meta.json - Phase 5: update meta.json status to completed, log shutdown event - SKILL.md: document .msg/ in session directory and add schema reference --- .codex/skills/team-lifecycle/SKILL.md | 53 +++++++++ .../phases/02-team-initialization.md | 62 +++++++++- .../phases/04-pipeline-coordination.md | 112 ++++++++++++++++++ .../phases/05-completion-report.md | 21 ++++ 4 files changed, 246 insertions(+), 2 deletions(-) diff --git a/.codex/skills/team-lifecycle/SKILL.md b/.codex/skills/team-lifecycle/SKILL.md index 7f494c87..8db21c52 100644 --- a/.codex/skills/team-lifecycle/SKILL.md +++ b/.codex/skills/team-lifecycle/SKILL.md @@ -457,6 +457,9 @@ TASK_COMPLETE: | +-- decisions.md # Architecture and design decisions | +-- conventions.md # Codebase conventions | +-- issues.md # Known risks and issues ++-- .msg/ # Message bus (UI integration) +| +-- meta.json # Pipeline metadata (stages, roles, team_name) +| +-- messages.jsonl # NDJSON event log +-- shared-memory.json # Cross-agent state ``` @@ -505,6 +508,56 @@ The state file replaces Claude's TaskCreate/TaskList/TaskGet/TaskUpdate system. --- +## Message Bus (.msg/) + +The `.msg/` directory provides pipeline metadata for the frontend UI. This is the **same format** used by Claude version's `team_msg` tool with `type: "state_update"`. + +### meta.json + +Pipeline metadata read by the API for frontend display: + +```json +{ + "status": "active", + "pipeline_mode": "", + "pipeline_stages": ["role1", "role2", "..."], + "roles": ["coordinator", "role1", "role2", "..."], + "team_name": "lifecycle", + "role_state": { + "": { + "status": "completed", + "task_id": "TASK-ID", + "_updated_at": "" + } + }, + "updated_at": "" +} +``` + +**pipeline_stages by mode**: + +| Mode | pipeline_stages | +|------|-----------------| +| spec-only | `["analyst", "writer", "reviewer"]` | +| impl-only | `["planner", "executor", "tester", "reviewer"]` | +| fe-only | `["planner", "fe-developer", "fe-qa"]` | +| fullstack | `["planner", "executor", "fe-developer", "tester", "fe-qa", "reviewer"]` | +| full-lifecycle | `["analyst", "writer", "planner", "executor", "tester", "reviewer"]` | +| full-lifecycle-fe | `["analyst", "writer", "planner", "executor", "fe-developer", "tester", "fe-qa", "reviewer"]` | + +### messages.jsonl + +NDJSON event log (one JSON object per line): + +```json +{"id":"MSG-001","ts":"","from":"coordinator","to":"coordinator","type":"state_update","summary":"Session initialized","data":{...}} +{"id":"MSG-002","ts":"","from":"analyst","to":"coordinator","type":"impl_complete","summary":"RESEARCH-001 completed","data":{...}} +``` + +**Message types**: `state_update`, `impl_complete`, `impl_progress`, `test_result`, `review_result`, `error`, `shutdown` + +--- + ## Session Resume When the orchestrator detects an existing active/paused session: diff --git a/.codex/skills/team-lifecycle/phases/02-team-initialization.md b/.codex/skills/team-lifecycle/phases/02-team-initialization.md index a7700db9..0bb07ab9 100644 --- a/.codex/skills/team-lifecycle/phases/02-team-initialization.md +++ b/.codex/skills/team-lifecycle/phases/02-team-initialization.md @@ -52,6 +52,7 @@ mkdir -p "/architecture" mkdir -p "/analysis" mkdir -p "/qa" mkdir -p "/wisdom" +mkdir -p "/.msg" ``` **Directory purpose reference**: @@ -66,6 +67,7 @@ mkdir -p "/wisdom" | analysis/ | Analyst design intelligence (UI mode) | analyst | | qa/ | QA audit reports | fe-qa | | wisdom/ | Cross-task knowledge accumulation | all agents | +| .msg/ | Message bus: meta.json (pipeline metadata) + messages.jsonl (event log) | orchestrator | ### Step 2.3: Initialize Wisdom Directory @@ -155,7 +157,59 @@ Write("/team-session.json", JSON.stringify(state, null, 2)) ``` -### Step 2.8: Output Confirmation +### Step 2.8: Initialize Message Bus (CRITICAL for UI) + +Initialize `.msg/meta.json` with pipeline metadata so the frontend can display dynamic pipeline stages. This is the **same format** used by the Claude version's `team_msg` tool with `type: "state_update"`. + +**pipeline_stages**: Array of role names representing pipeline stages. The UI displays these as the pipeline workflow visualization. + +```javascript +// Determine pipeline_stages and roles based on mode +const PIPELINE_CONFIG = { + "spec-only": { stages: ["analyst", "writer", "reviewer"], roles: ["coordinator", "analyst", "writer", "reviewer"] }, + "impl-only": { stages: ["planner", "executor", "tester", "reviewer"], roles: ["coordinator", "planner", "executor", "tester", "reviewer"] }, + "fe-only": { stages: ["planner", "fe-developer", "fe-qa"], roles: ["coordinator", "planner", "fe-developer", "fe-qa"] }, + "fullstack": { stages: ["planner", "executor", "fe-developer", "tester", "fe-qa", "reviewer"], roles: ["coordinator", "planner", "executor", "fe-developer", "tester", "fe-qa", "reviewer"] }, + "full-lifecycle": { stages: ["analyst", "writer", "planner", "executor", "tester", "reviewer"], roles: ["coordinator", "analyst", "writer", "planner", "executor", "tester", "reviewer"] }, + "full-lifecycle-fe": { stages: ["analyst", "writer", "planner", "executor", "fe-developer", "tester", "fe-qa", "reviewer"], roles: ["coordinator", "analyst", "writer", "planner", "executor", "fe-developer", "tester", "fe-qa", "reviewer"] } +} + +const config = PIPELINE_CONFIG[requirements.mode] + +// Write meta.json (read by API for frontend pipeline display) +const meta = { + status: "active", + pipeline_mode: requirements.mode, + pipeline_stages: config.stages, + roles: config.roles, + team_name: "lifecycle", + updated_at: new Date().toISOString() +} + +Write("/.msg/meta.json", JSON.stringify(meta, null, 2)) + +// Initialize messages.jsonl with session init event +const initMsg = { + id: "MSG-001", + ts: new Date().toISOString(), + from: "coordinator", + to: "coordinator", + type: "state_update", + summary: "Session initialized", + data: { + pipeline_mode: requirements.mode, + pipeline_stages: config.stages, + roles: config.roles, + team_name: "lifecycle" + } +} + +Write("/.msg/messages.jsonl", JSON.stringify(initMsg) + "\n") +``` + +> **Why**: The frontend API reads `.msg/meta.json` to derive pipeline stages, roles, and team_name for the dynamic pipeline UI. Without this file, the UI falls back to hardcoded 4-stage pipeline or message inference which may be inaccurate. + +### Step 2.9: Output Confirmation ``` [orchestrator] Phase 2: Session initialized @@ -164,6 +218,7 @@ Write("/team-session.json", Mode: ( tasks) Scope: Execution: + Message bus: .msg/meta.json initialized ``` --- @@ -175,16 +230,19 @@ Write("/team-session.json", | sessionId | String | Passed to Phase 3 | | sessionDir | String | Passed to Phase 3 | | state | Object | Written to team-session.json, passed to Phase 3 | +| meta | Object | Written to .msg/meta.json (read by frontend API) | --- ## Success Criteria -- Session directory created with all subdirectories +- Session directory created with all subdirectories (including `.msg/`) - Wisdom files initialized (4 files) - Explorations cache-index.json created (empty entries) - Shared-memory.json created - team-session.json written with correct mode, scope, task count +- `.msg/meta.json` written with pipeline_stages, roles, team_name, pipeline_mode +- `.msg/messages.jsonl` initialized with session init event - State file is valid JSON and readable --- diff --git a/.codex/skills/team-lifecycle/phases/04-pipeline-coordination.md b/.codex/skills/team-lifecycle/phases/04-pipeline-coordination.md index 0a1b7440..061c67fd 100644 --- a/.codex/skills/team-lifecycle/phases/04-pipeline-coordination.md +++ b/.codex/skills/team-lifecycle/phases/04-pipeline-coordination.md @@ -272,6 +272,9 @@ Execution timeout reached for task ${agent.taskId}. Please: For each completed agent, extract TASK_COMPLETE data and update state. ```javascript +// Message counter for NDJSON log +let msgCounter = getLastMsgCounter("/.msg/messages.jsonl") + 1 + for (const agent of spawnedAgents) { const output = results.status[agent.agentId]?.completed if (!output) continue // handled in timeout section @@ -307,6 +310,26 @@ for (const agent of spawnedAgents) { // Close agent close_agent({ id: agent.agentId }) + // Log to message bus + logToMessageBus("", msgCounter++, { + from: task.owner, + type: "impl_complete", + summary: `${task.id} completed: ${taskResult.summary || task.status}`, + data: { + task_id: task.id, + status: task.status, + artifact: task.artifact_path, + discuss_verdict: task.discuss_verdict + } + }) + + // Update role_state in meta.json + updateMetaRoleState("", task.owner, { + status: task.status, + task_id: task.id, + completed_at: task.completed_at + }) + // Route by consensus verdict if (taskResult.discuss_verdict === "consensus_blocked") { handleConsensusBlocked(task, taskResult) @@ -714,6 +737,95 @@ result = wait([reviewer]) -> close_agent(reviewer) --- +## Message Bus Helpers + +Helper functions for writing to `.msg/messages.jsonl` and updating `.msg/meta.json`. These maintain compatibility with the Claude version's `team_msg` tool format. + +### logToMessageBus + +Append an NDJSON event to `messages.jsonl`: + +```javascript +function logToMessageBus(sessionDir, counter, { from, to, type, summary, data }) { + const msg = { + id: `MSG-${String(counter).padStart(3, '0')}`, + ts: new Date().toISOString(), + from: from || "coordinator", + to: to || "coordinator", + type: type || "message", + summary: summary || `${type} from ${from}`, + data: data || {} + } + + // Append to NDJSON log + appendToFile(`${sessionDir}/.msg/messages.jsonl`, JSON.stringify(msg) + "\n") +} +``` + +### updateMetaRoleState + +Update role-specific state in `meta.json`: + +```javascript +function updateMetaRoleState(sessionDir, role, stateData) { + const metaPath = `${sessionDir}/.msg/meta.json` + const meta = JSON.parse(Read(metaPath)) + + if (!meta.role_state) meta.role_state = {} + meta.role_state[role] = { + ...meta.role_state[role], + ...stateData, + _updated_at: new Date().toISOString() + } + meta.updated_at = new Date().toISOString() + + Write(metaPath, JSON.stringify(meta, null, 2)) +} +``` + +### getLastMsgCounter + +Read the last message counter from `messages.jsonl`: + +```javascript +function getLastMsgCounter(logPath) { + try { + const content = Read(logPath) + const lines = content.trim().split("\n").filter(Boolean) + if (lines.length === 0) return 0 + const lastMsg = JSON.parse(lines[lines.length - 1]) + return parseInt(lastMsg.id.replace("MSG-", ""), 10) + } catch { + return 0 + } +} +``` + +### meta.json Schema + +The `.msg/meta.json` file follows this schema (compatible with Claude version's `team_msg state_update`): + +```json +{ + "status": "active | paused | completed", + "pipeline_mode": "", + "pipeline_stages": ["role1", "role2", "..."], + "roles": ["coordinator", "role1", "role2", "..."], + "team_name": "lifecycle", + "role_state": { + "": { + "status": "completed", + "task_id": "TASK-ID", + "completed_at": "", + "_updated_at": "" + } + }, + "updated_at": "" +} +``` + +--- + ## Next Phase When pipeline completes (all tasks done, no paused checkpoint), proceed to [Phase 5: Completion Report](05-completion-report.md). diff --git a/.codex/skills/team-lifecycle/phases/05-completion-report.md b/.codex/skills/team-lifecycle/phases/05-completion-report.md index e54d4a44..3231ff92 100644 --- a/.codex/skills/team-lifecycle/phases/05-completion-report.md +++ b/.codex/skills/team-lifecycle/phases/05-completion-report.md @@ -138,6 +138,27 @@ state.completed_at = new Date().toISOString() Write("/team-session.json", JSON.stringify(state, null, 2)) + +// Update message bus meta.json status +const meta = JSON.parse(Read("/.msg/meta.json")) +meta.status = "completed" +meta.updated_at = new Date().toISOString() +Write("/.msg/meta.json", JSON.stringify(meta, null, 2)) + +// Log completion event to message bus +const msgCounter = getLastMsgCounter("/.msg/messages.jsonl") + 1 +logToMessageBus("", msgCounter, { + from: "coordinator", + type: "shutdown", + summary: `Pipeline completed: ${completedTasks}/${totalTasks} tasks (${successRate}%)`, + data: { + tasks_completed: completedTasks, + tasks_total: totalTasks, + success_rate: successRate, + duration_min: durationMin, + failed_tasks: failedTasks + } +}) ``` ### Step 5.8: Output Completion Report