feat: add message bus (.msg/) to codex team-lifecycle skill

Align codex version with Claude version's team_msg message bus for
frontend pipeline UI integration:

- Phase 2: create .msg/ dir, write meta.json with pipeline_stages/roles,
  init messages.jsonl with session event
- Phase 4: log task completion events, update role_state in meta.json
- Phase 5: update meta.json status to completed, log shutdown event
- SKILL.md: document .msg/ in session directory and add schema reference
This commit is contained in:
catlog22
2026-03-04 16:35:07 +08:00
parent e9f8a72343
commit 64e772f9b8
4 changed files with 246 additions and 2 deletions

View File

@@ -457,6 +457,9 @@ TASK_COMPLETE:
| +-- decisions.md # Architecture and design decisions
| +-- conventions.md # Codebase conventions
| +-- issues.md # Known risks and issues
+-- .msg/ # Message bus (UI integration)
| +-- meta.json # Pipeline metadata (stages, roles, team_name)
| +-- messages.jsonl # NDJSON event log
+-- shared-memory.json # Cross-agent state
```
@@ -505,6 +508,56 @@ The state file replaces Claude's TaskCreate/TaskList/TaskGet/TaskUpdate system.
---
## Message Bus (.msg/)
The `.msg/` directory provides pipeline metadata for the frontend UI. This is the **same format** used by Claude version's `team_msg` tool with `type: "state_update"`.
### meta.json
Pipeline metadata read by the API for frontend display:
```json
{
"status": "active",
"pipeline_mode": "<mode>",
"pipeline_stages": ["role1", "role2", "..."],
"roles": ["coordinator", "role1", "role2", "..."],
"team_name": "lifecycle",
"role_state": {
"<role>": {
"status": "completed",
"task_id": "TASK-ID",
"_updated_at": "<ISO8601>"
}
},
"updated_at": "<ISO8601>"
}
```
**pipeline_stages by mode**:
| Mode | pipeline_stages |
|------|-----------------|
| spec-only | `["analyst", "writer", "reviewer"]` |
| impl-only | `["planner", "executor", "tester", "reviewer"]` |
| fe-only | `["planner", "fe-developer", "fe-qa"]` |
| fullstack | `["planner", "executor", "fe-developer", "tester", "fe-qa", "reviewer"]` |
| full-lifecycle | `["analyst", "writer", "planner", "executor", "tester", "reviewer"]` |
| full-lifecycle-fe | `["analyst", "writer", "planner", "executor", "fe-developer", "tester", "fe-qa", "reviewer"]` |
### messages.jsonl
NDJSON event log (one JSON object per line):
```json
{"id":"MSG-001","ts":"<ISO8601>","from":"coordinator","to":"coordinator","type":"state_update","summary":"Session initialized","data":{...}}
{"id":"MSG-002","ts":"<ISO8601>","from":"analyst","to":"coordinator","type":"impl_complete","summary":"RESEARCH-001 completed","data":{...}}
```
**Message types**: `state_update`, `impl_complete`, `impl_progress`, `test_result`, `review_result`, `error`, `shutdown`
---
## Session Resume
When the orchestrator detects an existing active/paused session:

View File

@@ -52,6 +52,7 @@ mkdir -p "<session-dir>/architecture"
mkdir -p "<session-dir>/analysis"
mkdir -p "<session-dir>/qa"
mkdir -p "<session-dir>/wisdom"
mkdir -p "<session-dir>/.msg"
```
**Directory purpose reference**:
@@ -66,6 +67,7 @@ mkdir -p "<session-dir>/wisdom"
| analysis/ | Analyst design intelligence (UI mode) | analyst |
| qa/ | QA audit reports | fe-qa |
| wisdom/ | Cross-task knowledge accumulation | all agents |
| .msg/ | Message bus: meta.json (pipeline metadata) + messages.jsonl (event log) | orchestrator |
### Step 2.3: Initialize Wisdom Directory
@@ -155,7 +157,59 @@ Write("<session-dir>/team-session.json",
JSON.stringify(state, null, 2))
```
### Step 2.8: Output Confirmation
### Step 2.8: Initialize Message Bus (CRITICAL for UI)
Initialize `.msg/meta.json` with pipeline metadata so the frontend can display dynamic pipeline stages. This is the **same format** used by the Claude version's `team_msg` tool with `type: "state_update"`.
**pipeline_stages**: Array of role names representing pipeline stages. The UI displays these as the pipeline workflow visualization.
```javascript
// Determine pipeline_stages and roles based on mode
const PIPELINE_CONFIG = {
"spec-only": { stages: ["analyst", "writer", "reviewer"], roles: ["coordinator", "analyst", "writer", "reviewer"] },
"impl-only": { stages: ["planner", "executor", "tester", "reviewer"], roles: ["coordinator", "planner", "executor", "tester", "reviewer"] },
"fe-only": { stages: ["planner", "fe-developer", "fe-qa"], roles: ["coordinator", "planner", "fe-developer", "fe-qa"] },
"fullstack": { stages: ["planner", "executor", "fe-developer", "tester", "fe-qa", "reviewer"], roles: ["coordinator", "planner", "executor", "fe-developer", "tester", "fe-qa", "reviewer"] },
"full-lifecycle": { stages: ["analyst", "writer", "planner", "executor", "tester", "reviewer"], roles: ["coordinator", "analyst", "writer", "planner", "executor", "tester", "reviewer"] },
"full-lifecycle-fe": { stages: ["analyst", "writer", "planner", "executor", "fe-developer", "tester", "fe-qa", "reviewer"], roles: ["coordinator", "analyst", "writer", "planner", "executor", "fe-developer", "tester", "fe-qa", "reviewer"] }
}
const config = PIPELINE_CONFIG[requirements.mode]
// Write meta.json (read by API for frontend pipeline display)
const meta = {
status: "active",
pipeline_mode: requirements.mode,
pipeline_stages: config.stages,
roles: config.roles,
team_name: "lifecycle",
updated_at: new Date().toISOString()
}
Write("<session-dir>/.msg/meta.json", JSON.stringify(meta, null, 2))
// Initialize messages.jsonl with session init event
const initMsg = {
id: "MSG-001",
ts: new Date().toISOString(),
from: "coordinator",
to: "coordinator",
type: "state_update",
summary: "Session initialized",
data: {
pipeline_mode: requirements.mode,
pipeline_stages: config.stages,
roles: config.roles,
team_name: "lifecycle"
}
}
Write("<session-dir>/.msg/messages.jsonl", JSON.stringify(initMsg) + "\n")
```
> **Why**: The frontend API reads `.msg/meta.json` to derive pipeline stages, roles, and team_name for the dynamic pipeline UI. Without this file, the UI falls back to hardcoded 4-stage pipeline or message inference which may be inaccurate.
### Step 2.9: Output Confirmation
```
[orchestrator] Phase 2: Session initialized
@@ -164,6 +218,7 @@ Write("<session-dir>/team-session.json",
Mode: <mode> (<task-count> tasks)
Scope: <scope-summary>
Execution: <sequential | parallel>
Message bus: .msg/meta.json initialized
```
---
@@ -175,16 +230,19 @@ Write("<session-dir>/team-session.json",
| sessionId | String | Passed to Phase 3 |
| sessionDir | String | Passed to Phase 3 |
| state | Object | Written to team-session.json, passed to Phase 3 |
| meta | Object | Written to .msg/meta.json (read by frontend API) |
---
## Success Criteria
- Session directory created with all subdirectories
- Session directory created with all subdirectories (including `.msg/`)
- Wisdom files initialized (4 files)
- Explorations cache-index.json created (empty entries)
- Shared-memory.json created
- team-session.json written with correct mode, scope, task count
- `.msg/meta.json` written with pipeline_stages, roles, team_name, pipeline_mode
- `.msg/messages.jsonl` initialized with session init event
- State file is valid JSON and readable
---

View File

@@ -272,6 +272,9 @@ Execution timeout reached for task ${agent.taskId}. Please:
For each completed agent, extract TASK_COMPLETE data and update state.
```javascript
// Message counter for NDJSON log
let msgCounter = getLastMsgCounter("<session-dir>/.msg/messages.jsonl") + 1
for (const agent of spawnedAgents) {
const output = results.status[agent.agentId]?.completed
if (!output) continue // handled in timeout section
@@ -307,6 +310,26 @@ for (const agent of spawnedAgents) {
// Close agent
close_agent({ id: agent.agentId })
// Log to message bus
logToMessageBus("<session-dir>", msgCounter++, {
from: task.owner,
type: "impl_complete",
summary: `${task.id} completed: ${taskResult.summary || task.status}`,
data: {
task_id: task.id,
status: task.status,
artifact: task.artifact_path,
discuss_verdict: task.discuss_verdict
}
})
// Update role_state in meta.json
updateMetaRoleState("<session-dir>", task.owner, {
status: task.status,
task_id: task.id,
completed_at: task.completed_at
})
// Route by consensus verdict
if (taskResult.discuss_verdict === "consensus_blocked") {
handleConsensusBlocked(task, taskResult)
@@ -714,6 +737,95 @@ result = wait([reviewer]) -> close_agent(reviewer)
---
## Message Bus Helpers
Helper functions for writing to `.msg/messages.jsonl` and updating `.msg/meta.json`. These maintain compatibility with the Claude version's `team_msg` tool format.
### logToMessageBus
Append an NDJSON event to `messages.jsonl`:
```javascript
function logToMessageBus(sessionDir, counter, { from, to, type, summary, data }) {
const msg = {
id: `MSG-${String(counter).padStart(3, '0')}`,
ts: new Date().toISOString(),
from: from || "coordinator",
to: to || "coordinator",
type: type || "message",
summary: summary || `${type} from ${from}`,
data: data || {}
}
// Append to NDJSON log
appendToFile(`${sessionDir}/.msg/messages.jsonl`, JSON.stringify(msg) + "\n")
}
```
### updateMetaRoleState
Update role-specific state in `meta.json`:
```javascript
function updateMetaRoleState(sessionDir, role, stateData) {
const metaPath = `${sessionDir}/.msg/meta.json`
const meta = JSON.parse(Read(metaPath))
if (!meta.role_state) meta.role_state = {}
meta.role_state[role] = {
...meta.role_state[role],
...stateData,
_updated_at: new Date().toISOString()
}
meta.updated_at = new Date().toISOString()
Write(metaPath, JSON.stringify(meta, null, 2))
}
```
### getLastMsgCounter
Read the last message counter from `messages.jsonl`:
```javascript
function getLastMsgCounter(logPath) {
try {
const content = Read(logPath)
const lines = content.trim().split("\n").filter(Boolean)
if (lines.length === 0) return 0
const lastMsg = JSON.parse(lines[lines.length - 1])
return parseInt(lastMsg.id.replace("MSG-", ""), 10)
} catch {
return 0
}
}
```
### meta.json Schema
The `.msg/meta.json` file follows this schema (compatible with Claude version's `team_msg state_update`):
```json
{
"status": "active | paused | completed",
"pipeline_mode": "<mode>",
"pipeline_stages": ["role1", "role2", "..."],
"roles": ["coordinator", "role1", "role2", "..."],
"team_name": "lifecycle",
"role_state": {
"<role>": {
"status": "completed",
"task_id": "TASK-ID",
"completed_at": "<ISO8601>",
"_updated_at": "<ISO8601>"
}
},
"updated_at": "<ISO8601>"
}
```
---
## Next Phase
When pipeline completes (all tasks done, no paused checkpoint), proceed to [Phase 5: Completion Report](05-completion-report.md).

View File

@@ -138,6 +138,27 @@ state.completed_at = new Date().toISOString()
Write("<session-dir>/team-session.json",
JSON.stringify(state, null, 2))
// Update message bus meta.json status
const meta = JSON.parse(Read("<session-dir>/.msg/meta.json"))
meta.status = "completed"
meta.updated_at = new Date().toISOString()
Write("<session-dir>/.msg/meta.json", JSON.stringify(meta, null, 2))
// Log completion event to message bus
const msgCounter = getLastMsgCounter("<session-dir>/.msg/messages.jsonl") + 1
logToMessageBus("<session-dir>", msgCounter, {
from: "coordinator",
type: "shutdown",
summary: `Pipeline completed: ${completedTasks}/${totalTasks} tasks (${successRate}%)`,
data: {
tasks_completed: completedTasks,
tasks_total: totalTasks,
success_rate: successRate,
duration_min: durationMin,
failed_tasks: failedTasks
}
})
```
### Step 5.8: Output Completion Report