From a7ed0365f7d9e20643056d2aa80cfe16fe24797e Mon Sep 17 00:00:00 2001 From: catlog22 Date: Tue, 3 Mar 2026 16:11:57 +0800 Subject: [PATCH] feat: Enhance team messaging system with new operations and data handling - Added support for new message type 'state_update' in TeamMessageType. - Updated TeamMessageFeed component to handle both legacy and new reference formats. - Modified CLI options to clarify usage and added new commands for broadcasting messages and retrieving role states. - Implemented new command 'get_state' to read role state from meta.json. - Enhanced team message logging to auto-generate summaries and handle structured data. - Improved backward compatibility by enriching team metadata from legacy files. - Refactored message handling functions to streamline operations and improve clarity. --- .claude/agents/team-worker.md | 250 ++++++------- .../src/components/team/TeamMessageFeed.tsx | 6 +- ccw/frontend/src/types/team.ts | 3 + ccw/src/cli.ts | 10 +- ccw/src/commands/team.ts | 69 ++-- ccw/src/core/routes/team-routes.ts | 9 +- ccw/src/tools/team-msg.ts | 331 +++++++++++++----- 7 files changed, 409 insertions(+), 269 deletions(-) diff --git a/.claude/agents/team-worker.md b/.claude/agents/team-worker.md index ce4affed..167dd858 100644 --- a/.claude/agents/team-worker.md +++ b/.claude/agents/team-worker.md @@ -56,23 +56,38 @@ Parse the following fields from your prompt: --- -## Main Execution Loop +## Execution Flow ``` Entry: Parse prompt → extract role, role_spec, session, session_id, team_name, inner_loop - Read role_spec → parse frontmatter (prefix, discuss_rounds, etc.) - Read role_spec body → store Phase 2-4 instructions + Read role_spec → parse frontmatter + body (Phase 2-4 instructions) Load wisdom files from /wisdom/ (if exist) + context_accumulator = [] ← inner_loop only, in-memory across iterations + Main Loop: Phase 1: Task Discovery [built-in] Phase 2-4: Execute Role Spec [from .md] Phase 5: Report [built-in] - inner_loop AND more same-prefix tasks? → Phase 5-L → back to Phase 1 - no more tasks? → Phase 5-F → STOP + inner_loop=true AND more same-prefix tasks? → Phase 5-L → back to Phase 1 + inner_loop=false OR no more tasks? → Phase 5-F → STOP ``` +**Inner loop** (`inner_loop=true`): Processes ALL same-prefix tasks sequentially in a single agent instance. `context_accumulator` is passed to each subsequent subagent as `## Prior Context` for knowledge continuity. + +| Step | Phase 5-L (loop) | Phase 5-F (final) | +|------|-----------------|------------------| +| TaskUpdate completed | YES | YES | +| team_msg state_update | YES | YES | +| Accumulate summary | YES | - | +| SendMessage to coordinator | NO | YES (all tasks) | +| Fast-Advance check | - | YES | + +**Interrupt conditions** (break inner loop immediately): +- consensus_blocked HIGH → SendMessage → STOP +- Cumulative errors >= 3 → SendMessage → STOP + --- ## Phase 1: Task Discovery (Built-in) @@ -232,25 +247,32 @@ Discussion: /discussions/-discussion.md ## Phase 5: Report + Fast-Advance (Built-in) -After Phase 4 completes, determine Phase 5 variant: +After Phase 4 completes, determine Phase 5 variant (see Execution Flow for decision table). -### Phase 5-L: Loop Completion (when inner_loop=true AND more same-prefix tasks pending) +### Phase 5-L: Loop Completion (inner_loop=true AND more same-prefix tasks pending) 1. **TaskUpdate**: Mark current task `completed` -2. **Message Bus**: Log completion with verification evidence +2. **Message Bus**: Log state_update (combines state publish + audit log) ``` mcp__ccw-tools__team_msg( operation="log", - team=, + team_session_id=, from=, - to="coordinator", - type=, - summary="[] complete. . Verified: ", - ref= + type="state_update", + data={ + status: "task_complete", + task_id: "", + ref: "", + key_findings: , + decisions: , + files_modified: , + artifact_path: "", + verification: "" + } ) ``` - **CLI fallback**: `ccw team log --team --from --to coordinator --type --summary "[] ..." --json` -3. **Accumulate summary** to context_accumulator (in-memory): + > `to` defaults to "coordinator", `summary` auto-generated. `type="state_update"` auto-syncs data to `meta.json.role_state[]`. +3. **Accumulate** to `context_accumulator` (in-memory): ``` context_accumulator.append({ task: "", @@ -258,38 +280,36 @@ After Phase 4 completes, determine Phase 5 variant: key_decisions: , discuss_verdict: , discuss_rating: , - summary: "" + summary: "", + files_modified: }) ``` -4. **Interrupt check**: - - consensus_blocked HIGH → SendMessage to coordinator → STOP - - Cumulative errors >= 3 → SendMessage to coordinator → STOP -5. **Loop**: Return to Phase 1 to find next same-prefix task +4. **Interrupt check**: consensus_blocked HIGH or errors >= 3 → SendMessage → STOP +5. **Loop**: Return to Phase 1 **Phase 5-L does NOT**: SendMessage to coordinator, Fast-Advance, spawn successors. -### Phase 5-F: Final Report (when no more same-prefix tasks OR inner_loop=false) +### Phase 5-F: Final Report (no more same-prefix tasks OR inner_loop=false) 1. **TaskUpdate**: Mark current task `completed` -2. **Message Bus**: Log completion (same as Phase 5-L step 2) -3. **Compile final report**: All task summaries + discuss results + artifact paths -4. **Fast-Advance Check**: - - Call `TaskList()`, find pending tasks whose blockedBy are ALL completed - - Apply fast-advance rules (see table below) -5. **SendMessage** to coordinator OR **spawn successor** directly - -### Fast-Advance Rules +2. **Message Bus**: Log state_update (same call as Phase 5-L step 2) +3. **Compile final report** and **SendMessage** to coordinator: + ``` + SendMessage(team_name=, recipient="coordinator", message="[] ") + ``` + Report contents: tasks completed (count + list), artifacts produced (paths), files modified (with evidence), discuss results (verdicts + ratings), key decisions (from context_accumulator), verification summary, warnings/issues. +4. **Fast-Advance Check**: Call `TaskList()`, find pending tasks whose blockedBy are ALL completed, apply rules: | Condition | Action | |-----------|--------| | Same-prefix successor (inner loop role) | Do NOT spawn — main agent handles via inner loop | -| 1 ready task, simple linear successor, different prefix | Spawn directly via `Task(run_in_background: true)` + log `fast_advance` to message bus | +| 1 ready task, simple linear successor, different prefix | Spawn directly via `Task(run_in_background: true)` + log `fast_advance` | | Multiple ready tasks (parallel window) | SendMessage to coordinator (needs orchestration) | | No ready tasks + others running | SendMessage to coordinator (status update) | | No ready tasks + nothing running | SendMessage to coordinator (pipeline may be complete) | | Checkpoint task (e.g., spec->impl transition) | SendMessage to coordinator (needs user confirmation) | -### Fast-Advance Spawn Template +### Fast-Advance Spawn When fast-advancing to a different-prefix successor: @@ -311,160 +331,104 @@ inner_loop: ` }) ``` -### Fast-Advance Notification - -After spawning a successor via fast-advance, MUST log to message bus: +After spawning, MUST log to message bus (passive log, NOT a SendMessage): ``` mcp__ccw-tools__team_msg( operation="log", - team=, + team_session_id=, from=, - to="coordinator", type="fast_advance", summary="[] fast-advanced → spawned for " ) ``` -This is a passive log entry (NOT a SendMessage). Coordinator reads it on next callback to reconcile `active_workers`. - -### SendMessage Format - -``` -SendMessage(team_name=, recipient="coordinator", message="[] ") -``` - -**Final report contents**: -- Tasks completed (count + list) -- Artifacts produced (paths) -- Files modified (paths + before/after evidence from Phase 4 verification) -- Discuss results (verdicts + ratings) -- Key decisions (from context_accumulator) -- Verification summary (methods used, pass/fail status) -- Any warnings or issues +Coordinator reads this on next callback to reconcile `active_workers`. --- -## Inner Loop Framework - -When `inner_loop=true`, the agent processes ALL same-prefix tasks sequentially in a single agent instance: - -``` -context_accumulator = [] - -Phase 1: Find first -* task - Phase 2-4: Execute role spec - Phase 5-L: Mark done, log, accumulate, check interrupts - More -* tasks? → Phase 1 (loop) - No more? → Phase 5-F (final report) -``` - -**context_accumulator**: Maintained in-memory across loop iterations. Each entry contains task summary + key decisions + discuss results. Passed to subagents as context for knowledge continuity. - -**Phase 5-L vs Phase 5-F**: - -| Step | Phase 5-L (loop) | Phase 5-F (final) | -|------|-----------------|------------------| -| TaskUpdate completed | YES | YES | -| team_msg log | YES | YES | -| Accumulate summary | YES | - | -| SendMessage to coordinator | NO | YES (all tasks) | -| Fast-Advance check | - | YES | - -**Interrupt conditions** (break inner loop immediately): -- consensus_blocked HIGH → SendMessage → STOP -- Cumulative errors >= 3 → SendMessage → STOP - -**Crash recovery**: If agent crashes mid-loop, completed tasks are safe (TaskUpdate + artifacts on disk). Coordinator detects orphaned in_progress task on resume, resets to pending, re-spawns. New agent resumes from the interrupted task via Resume Artifact Check. - ---- - -## Wisdom Accumulation - -### Load (Phase 2) - -Extract session folder from prompt. Read wisdom files if they exist: - -``` -/wisdom/learnings.md -/wisdom/decisions.md -/wisdom/conventions.md -/wisdom/issues.md -``` - -Use wisdom context to inform Phase 2-4 execution. - -### Contribute (Phase 4/5) - -Write discoveries to corresponding wisdom files: -- New patterns → `learnings.md` -- Architecture/design decisions → `decisions.md` -- Codebase conventions → `conventions.md` -- Risks and known issues → `issues.md` - ---- - -## Knowledge Transfer +## Knowledge Transfer & Wisdom ### Upstream Context Loading (Phase 2) -When executing Phase 2 of a role-spec, the worker MUST load available cross-role context: +The worker MUST load available cross-role context before executing role-spec Phase 2: -| Source | Path | Load Method | -|--------|------|-------------| -| Upstream artifacts | `/artifacts/*.md` | Read files listed in task description or dependency chain | -| Shared memory | `/shared-memory.json` | Read and parse JSON | -| Wisdom | `/wisdom/*.md` | Read all wisdom files | -| Exploration cache | `/explorations/cache-index.json` | Check before new explorations | +| Source | Method | Priority | +|--------|--------|----------| +| Upstream role state | `team_msg(operation="get_state", role=)` | **Primary** — O(1) from meta.json | +| Upstream artifacts | Read files referenced in the state's artifact paths | Secondary — for large content | +| Wisdom files | Read `/wisdom/*.md` | Always load if exists | +| Exploration cache | Check `/explorations/cache-index.json` | Before new explorations | + +> **Legacy fallback**: If `get_state` returns null (older sessions), fall back to reading `/shared-memory.json`. ### Downstream Context Publishing (Phase 4) After Phase 4 verification, the worker MUST publish its contributions: 1. **Artifact**: Write deliverable to `/artifacts/--.md` -2. **shared-memory.json**: Read-merge-write under role namespace - ```json - { "": { "key_findings": [...], "decisions": [...], "files_modified": [...] } } - ``` +2. **State data**: Prepare payload for Phase 5 `state_update` message (see Phase 5-L step 2 for schema) 3. **Wisdom**: Append new patterns to `learnings.md`, decisions to `decisions.md`, issues to `issues.md` +4. **Context accumulator** (inner_loop only): Append summary (see Phase 5-L step 3 for schema). Pass full accumulator to subsequent subagents as `## Prior Context`. -### Inner Loop Context Accumulator - -For `inner_loop: true` roles, `context_accumulator` is maintained in-memory: +### Wisdom Files ``` -context_accumulator.append({ - task: "", - artifact: "", - key_decisions: [...], - summary: "", - files_modified: [...] -}) +/wisdom/learnings.md ← New patterns discovered +/wisdom/decisions.md ← Architecture/design decisions +/wisdom/conventions.md ← Codebase conventions +/wisdom/issues.md ← Risks and known issues ``` -Pass the full accumulator to each subsequent task's Phase 3 subagent as `## Prior Context`. +Load in Phase 2 to inform execution. Contribute in Phase 4/5 with discoveries. --- ## Message Bus Protocol -Always use `mcp__ccw-tools__team_msg` for logging. Parameters: +Always use `mcp__ccw-tools__team_msg` for team communication. + +### log (with state_update) — Primary for Phase 5 | Param | Value | |-------|-------| | operation | "log" | -| team | `` (NOT team_name) | +| team_session_id | `` (NOT team_name) | | from | `` | -| to | "coordinator" | -| type | From role_spec message_types | -| summary | `[] ` | -| ref | artifact path (optional) | +| type | "state_update" for completion; or role_spec message_types for non-state messages | +| data | structured state payload (auto-synced to meta.json when type="state_update"). Use `data.ref` for artifact paths | -**Critical**: `team` param must be session ID (e.g., `TLS-my-project-2026-02-27`), not team name. +> **Defaults**: `to` defaults to "coordinator", `summary` auto-generated as `[] `. +> When `type="state_update"`: data is auto-synced to `meta.json.role_state[]`. Top-level keys (`pipeline_mode`, `pipeline_stages`, `team_name`, `task_description`) are promoted to meta root. + +### get_state — Primary for Phase 2 + +``` +mcp__ccw-tools__team_msg( + operation="get_state", + team_session_id=, + role= // omit to get ALL role states +) +``` + +Returns `role_state[]` from meta.json. + +### broadcast — For team-wide signals + +``` +mcp__ccw-tools__team_msg( + operation="broadcast", + team_session_id=, + from=, + type= +) +``` + +Equivalent to `log` with `to="all"`. Summary auto-generated. **CLI fallback** (if MCP tool unavailable): ``` -ccw team log --team --from --to coordinator --type --summary "[] ..." --json +ccw team log --team --from --type --json ``` --- @@ -492,7 +456,7 @@ ccw team log --team --from --to coordinator --type -- | Cumulative errors >= 3 | SendMessage to coordinator with error summary, STOP | | No tasks found | SendMessage idle status, STOP | | Context missing (prior doc, template) | Request from coordinator via SendMessage | -| Agent crash mid-loop | Self-healing: coordinator resets orphaned task, re-spawns | +| Agent crash mid-loop | Self-healing: completed tasks are safe (TaskUpdate + artifacts on disk). Coordinator detects orphaned in_progress task on resume, resets to pending, re-spawns. New agent resumes via Resume Artifact Check. | --- diff --git a/ccw/frontend/src/components/team/TeamMessageFeed.tsx b/ccw/frontend/src/components/team/TeamMessageFeed.tsx index 3592d9a5..9985bb3d 100644 --- a/ccw/frontend/src/components/team/TeamMessageFeed.tsx +++ b/ccw/frontend/src/components/team/TeamMessageFeed.tsx @@ -90,11 +90,11 @@ function MessageRow({ msg }: { msg: TeamMessage }) { {/* Summary */}

{msg.summary}

- {/* Ref link */} - {msg.ref && ( + {/* Ref link (supports both legacy msg.ref and new data.ref) */} + {(msg.ref || (msg.data?.ref as string)) && (
- {msg.ref} + {msg.ref || (msg.data?.ref as string)}
)} diff --git a/ccw/frontend/src/types/team.ts b/ccw/frontend/src/types/team.ts index 39587b6c..28fc279b 100644 --- a/ccw/frontend/src/types/team.ts +++ b/ccw/frontend/src/types/team.ts @@ -26,6 +26,7 @@ export type TeamMessageType = | 'fix_required' | 'error' | 'shutdown' + | 'state_update' | 'message'; export interface TeamMember { @@ -49,6 +50,8 @@ export interface TeamSummaryExtended extends TeamSummary { updated_at: string; archived_at?: string; pipeline_mode?: string; + pipeline_stages?: string[]; + role_state?: Record>; memberCount: number; members: string[]; // Always provided by backend } diff --git a/ccw/src/cli.ts b/ccw/src/cli.ts index 5cceebed..f04dde6f 100644 --- a/ccw/src/cli.ts +++ b/ccw/src/cli.ts @@ -358,15 +358,15 @@ export function run(argv: string[]): void { program .command('team [subcommand] [args...]') .description('Team message bus for Agent Team communication') - .option('--team ', 'Team name') + .option('--team ', 'Session ID (e.g., TLS-my-project-2026-02-27)') .option('--from ', 'Sender role name') - .option('--to ', 'Recipient role name') + .option('--to ', 'Recipient role name (default: coordinator)') .option('--type ', 'Message type') - .option('--summary ', 'One-line summary') - .option('--ref ', 'File path reference') + .option('--summary ', 'One-line summary (auto-generated if omitted)') .option('--data ', 'JSON structured data') - .option('--id ', 'Message ID (for read)') + .option('--id ', 'Message ID (for read/delete)') .option('--last ', 'Last N messages (for list)') + .option('--role ', 'Role name (for get_state)') .option('--json', 'Output as JSON') .action((subcommand, args, options) => teamCommand(subcommand, args, options)); diff --git a/ccw/src/commands/team.ts b/ccw/src/commands/team.ts index 1acd7471..9b41500b 100644 --- a/ccw/src/commands/team.ts +++ b/ccw/src/commands/team.ts @@ -3,12 +3,14 @@ * Delegates to team-msg.ts handler for JSONL-based persistent messaging * * Commands: - * ccw team log --team --from --to --type --summary "..." - * ccw team read --team --id - * ccw team list --team [--from ] [--to ] [--type ] [--last ] - * ccw team status --team - * ccw team delete --team --id - * ccw team clear --team + * ccw team log --team --from [--to ] [--type ] [--summary "..."] + * ccw team broadcast --team --from [--type ] [--summary "..."] + * ccw team get_state --team [--role ] + * ccw team read --team --id + * ccw team list --team [--from ] [--to ] [--type ] [--last ] + * ccw team status --team + * ccw team delete --team --id + * ccw team clear --team */ import chalk from 'chalk'; @@ -24,6 +26,7 @@ interface TeamOptions { data?: string; id?: string; last?: string; + role?: string; json?: boolean; } @@ -55,6 +58,7 @@ export async function teamCommand( if (options.ref) params.ref = options.ref; if (options.id) params.id = options.id; if (options.last) params.last = parseInt(options.last, 10); + if (options.role) params.role = options.role; // Parse --data as JSON if (options.data) { @@ -82,17 +86,19 @@ export async function teamCommand( // Formatted output by operation switch (subcommand) { - case 'log': { + case 'log': + case 'broadcast': { const r = result.result as { id: string; message: string }; console.log(chalk.green(`✓ ${r.message}`)); break; } case 'read': { - const msg = result.result as { id: string; ts: string; from: string; to: string; type: string; summary: string; ref?: string; data?: unknown }; + const msg = result.result as { id: string; ts: string; from: string; to: string; type: string; summary: string; ref?: string; data?: Record }; console.log(chalk.bold(`${msg.id} [${msg.ts}]`)); console.log(` ${chalk.cyan(msg.from)} → ${chalk.yellow(msg.to)} (${msg.type})`); console.log(` ${msg.summary}`); - if (msg.ref) console.log(chalk.gray(` ref: ${msg.ref}`)); + const refPath = msg.ref || (msg.data?.ref as string | undefined); + if (refPath) console.log(chalk.gray(` ref: ${refPath}`)); if (msg.data) console.log(chalk.gray(` data: ${JSON.stringify(msg.data)}`)); break; } @@ -122,6 +128,18 @@ export async function teamCommand( console.log(chalk.green(`✓ ${r.message}`)); break; } + case 'get_state': { + const r = result.result as { role?: string; state?: unknown; role_state?: unknown; message?: string }; + if (r.message) { + console.log(chalk.yellow(r.message)); + } else if (r.role) { + console.log(chalk.bold(`Role: ${r.role}`)); + console.log(JSON.stringify(r.state, null, 2)); + } else { + console.log(JSON.stringify(r.role_state, null, 2)); + } + break; + } default: console.error(chalk.red(`Unknown subcommand: ${subcommand}`)); printHelp(); @@ -137,7 +155,9 @@ function printHelp(): void { console.log(chalk.bold.cyan('\n CCW Team Message Bus\n')); console.log(' CLI interface for team message logging and retrieval.\n'); console.log(' Subcommands:'); - console.log(chalk.gray(' log Log a team message')); + console.log(chalk.gray(' log Log a team message (to defaults to "coordinator", summary auto-generated)')); + console.log(chalk.gray(' broadcast Broadcast message to all team members (to="all")')); + console.log(chalk.gray(' get_state Read role state from meta.json')); console.log(chalk.gray(' read Read a specific message by ID')); console.log(chalk.gray(' list List recent messages with filters')); console.log(chalk.gray(' status Show team member activity summary')); @@ -147,13 +167,15 @@ function printHelp(): void { console.log(' Required:'); console.log(chalk.gray(' --team Session ID (e.g., TLS-my-project-2026-02-27), NOT team name')); console.log(); - console.log(' Log Options:'); - console.log(chalk.gray(' --from Sender role name')); - console.log(chalk.gray(' --to Recipient role name')); - console.log(chalk.gray(' --type Message type (plan_ready, impl_complete, etc.)')); - console.log(chalk.gray(' --summary One-line summary')); - console.log(chalk.gray(' --ref File path reference')); - console.log(chalk.gray(' --data JSON structured data')); + console.log(' Log/Broadcast Options:'); + console.log(chalk.gray(' --from Sender role name (required)')); + console.log(chalk.gray(' --to Recipient role (default: "coordinator")')); + console.log(chalk.gray(' --type Message type (state_update, plan_ready, shutdown, etc.)')); + console.log(chalk.gray(' --summary One-line summary (auto-generated if omitted)')); + console.log(chalk.gray(' --data JSON structured data. Use data.ref for file paths')); + console.log(); + console.log(' Get State Options:'); + console.log(chalk.gray(' --role Role name to query (omit for all roles)')); console.log(); console.log(' Read/Delete Options:'); console.log(chalk.gray(' --id Message ID')); @@ -168,12 +190,11 @@ function printHelp(): void { console.log(chalk.gray(' --json Output as JSON')); console.log(); console.log(' Examples:'); - console.log(chalk.gray(' ccw team log --team TLS-my-project-2026-02-27 --from executor --to coordinator --type impl_complete --summary "Task done"')); - console.log(chalk.gray(' ccw team list --team TLS-my-project-2026-02-27 --last 5')); - console.log(chalk.gray(' ccw team read --team TLS-my-project-2026-02-27 --id MSG-003')); - console.log(chalk.gray(' ccw team status --team TLS-my-project-2026-02-27')); - console.log(chalk.gray(' ccw team delete --team TLS-my-project-2026-02-27 --id MSG-003')); - console.log(chalk.gray(' ccw team clear --team TLS-my-project-2026-02-27')); - console.log(chalk.gray(' ccw team log --team TLS-my-project-2026-02-27 --from planner --to coordinator --type plan_ready --summary "Plan ready" --json')); + console.log(chalk.gray(' ccw team log --team TLS-xxx --from executor --type state_update --data \'{"status":"done"}\'')); + console.log(chalk.gray(' ccw team broadcast --team TLS-xxx --from coordinator --type shutdown')); + console.log(chalk.gray(' ccw team get_state --team TLS-xxx --role executor')); + console.log(chalk.gray(' ccw team list --team TLS-xxx --last 5')); + console.log(chalk.gray(' ccw team read --team TLS-xxx --id MSG-003')); + console.log(chalk.gray(' ccw team status --team TLS-xxx')); console.log(); } diff --git a/ccw/src/core/routes/team-routes.ts b/ccw/src/core/routes/team-routes.ts index 621c7c28..342df1b8 100644 --- a/ccw/src/core/routes/team-routes.ts +++ b/ccw/src/core/routes/team-routes.ts @@ -432,13 +432,14 @@ export async function handleTeamRoutes(ctx: RouteContext): Promise { const sessionDir = getSessionDir(artifactsTeamName, root); if (!existsSync(sessionDir)) { - // Check if it's a legacy team with session_id + // Check if it's a legacy team with session_id in meta const meta = getEffectiveTeamMeta(artifactsTeamName); - if (meta.session_id) { + const legacySessionId = (meta as Record).session_id as string | undefined; + if (legacySessionId) { // Legacy team with session_id - redirect to session directory - const legacySessionDir = getSessionDir(meta.session_id, root); + const legacySessionDir = getSessionDir(legacySessionId, root); if (existsSync(legacySessionDir)) { - serveArtifacts(legacySessionDir, meta.session_id, meta, artifactPath, res); + serveArtifacts(legacySessionDir, legacySessionId, meta, artifactPath, res); return true; } } diff --git a/ccw/src/tools/team-msg.ts b/ccw/src/tools/team-msg.ts index 75a2a9a2..498a824b 100644 --- a/ccw/src/tools/team-msg.ts +++ b/ccw/src/tools/team-msg.ts @@ -2,12 +2,14 @@ * Team Message Bus - JSONL-based persistent message log for Agent Teams * * Operations: - * - log: Append a message, returns auto-incremented ID - * - read: Read message(s) by ID - * - list: List recent messages with optional filters (from/to/type/last N) - * - status: Summarize team member activity from message history - * - delete: Delete a specific message by ID - * - clear: Clear all messages for a team + * - log: Append a message (to defaults to "coordinator", summary auto-generated if omitted) + * - read: Read message(s) by ID + * - list: List recent messages with optional filters (from/to/type/last N) + * - status: Summarize team member activity from message history + * - delete: Delete a specific message by ID + * - clear: Clear all messages for a team + * - broadcast: Log a message with to="all" + * - get_state: Read role state from meta.json */ import { z } from 'zod'; @@ -23,8 +25,16 @@ export interface TeamMeta { created_at: string; updated_at: string; archived_at?: string; + + // Pipeline configuration (previously in team-session.json) pipeline_mode?: string; - session_id?: string; // Links to .workflow/.team/{session-id}/ artifacts directory + pipeline_stages?: string[]; + team_name?: string; + task_description?: string; + roles?: string[]; + + // Role state snapshot (previously in shared-memory.json) + role_state?: Record>; } export function getMetaPath(team: string): string { @@ -61,13 +71,32 @@ export function inferTeamStatus(team: string): TeamMeta['status'] { } /** - * Get effective team meta: reads meta.json or infers from messages. + * Get effective team meta: reads meta.json, with fallback to shared-memory.json + * and team-session.json for backward compatibility with older sessions. */ export function getEffectiveTeamMeta(team: string): TeamMeta { const meta = readTeamMeta(team); - if (meta) return meta; + if (meta) { + // Enrich from legacy files if role_state/pipeline_mode missing + if (!meta.role_state || !meta.pipeline_mode) { + const legacyData = readLegacyFiles(team); + if (!meta.pipeline_mode && legacyData.pipeline_mode) { + meta.pipeline_mode = legacyData.pipeline_mode; + } + if (!meta.role_state && legacyData.role_state) { + meta.role_state = legacyData.role_state; + } + if (!meta.pipeline_stages && legacyData.pipeline_stages) { + meta.pipeline_stages = legacyData.pipeline_stages; + } + if (!meta.team_name && legacyData.team_name) { + meta.team_name = legacyData.team_name; + } + } + return meta; + } - // Infer from messages and directory stat + // No meta.json — build from legacy files + inferred status const status = inferTeamStatus(team); const dir = getLogDir(team); let created_at = new Date().toISOString(); @@ -80,7 +109,57 @@ export function getEffectiveTeamMeta(team: string): TeamMeta { const lastMsg = messages[messages.length - 1]; const updated_at = lastMsg?.ts || created_at; - return { status, created_at, updated_at }; + const legacyData = readLegacyFiles(team); + + return { + status, + created_at, + updated_at, + ...legacyData, + }; +} + +/** + * Read legacy files (shared-memory.json, team-session.json) for backward compatibility. + */ +function readLegacyFiles(team: string): Partial { + const root = getProjectRoot(); + const sessionDir = join(root, '.workflow', '.team', team); + const result: Partial = {}; + + // Try shared-memory.json (role state + pipeline_mode) + const sharedMemPath = join(sessionDir, 'shared-memory.json'); + if (existsSync(sharedMemPath)) { + try { + const sharedMem = JSON.parse(readFileSync(sharedMemPath, 'utf-8')); + if (sharedMem.pipeline_mode) result.pipeline_mode = sharedMem.pipeline_mode; + if (sharedMem.pipeline_stages) result.pipeline_stages = sharedMem.pipeline_stages; + // Extract role state: any key that looks like a role namespace + const roleState: Record> = {}; + for (const [key, value] of Object.entries(sharedMem)) { + if (typeof value === 'object' && value !== null && !Array.isArray(value) + && !['pipeline_mode', 'pipeline_stages'].includes(key)) { + roleState[key] = value as Record; + } + } + if (Object.keys(roleState).length > 0) result.role_state = roleState; + } catch { /* ignore parse errors */ } + } + + // Try team-session.json (pipeline config) + const sessionPath = join(sessionDir, 'team-session.json'); + if (existsSync(sessionPath)) { + try { + const session = JSON.parse(readFileSync(sessionPath, 'utf-8')); + if (!result.pipeline_mode && session.pipeline_mode) result.pipeline_mode = session.pipeline_mode; + if (!result.pipeline_stages && session.pipeline_stages) result.pipeline_stages = session.pipeline_stages; + if (session.team_name) result.team_name = session.team_name; + if (session.task_description) result.task_description = session.task_description; + if (session.roles) result.roles = session.roles; + } catch { /* ignore parse errors */ } + } + + return result; } // --- Types --- @@ -92,7 +171,6 @@ export interface TeamMessage { to: string; type: string; summary: string; - ref?: string; data?: Record; } @@ -106,29 +184,40 @@ export interface StatusEntry { // --- Zod Schema --- const ParamsSchema = z.object({ - operation: z.enum(['log', 'read', 'list', 'status', 'delete', 'clear']).describe('Operation to perform'), - team: z.string().describe('Session ID (new: .workflow/.team/{session-id}/.msg/) or team name (legacy: .workflow/.team-msg/{team}/)'), + operation: z.enum(['log', 'read', 'list', 'status', 'delete', 'clear', 'broadcast', 'get_state']).describe('Operation to perform'), - // log params - from: z.string().optional().describe('[log/list] Sender role name'), - to: z.string().optional().describe('[log/list] Recipient role name'), - type: z.string().optional().describe('[log/list] Message type (plan_ready, impl_complete, test_result, etc.)'), - summary: z.string().optional().describe('[log] One-line human-readable summary'), - ref: z.string().optional().describe('[log] File path reference for large content'), - data: z.record(z.string(), z.unknown()).optional().describe('[log] Optional structured data'), + // Accept both 'team' (legacy) and 'team_session_id' (preferred) + team: z.string().optional().describe('[deprecated] Use team_session_id instead'), + team_session_id: z.string().optional().describe('Session ID that determines message storage path (e.g., TLS-my-project-2026-02-27)'), - // read params - id: z.string().optional().describe('[read] Message ID to read (e.g. MSG-003)'), + // log/broadcast params + from: z.string().optional().describe('[log/broadcast/list] Sender role name'), + to: z.string().optional().describe('[log/list] Recipient role (defaults to "coordinator")'), + type: z.string().optional().describe('[log/broadcast/list] Message type (state_update, plan_ready, shutdown, etc.)'), + summary: z.string().optional().describe('[log/broadcast] One-line summary (auto-generated from type+from if omitted)'), + data: z.record(z.string(), z.unknown()).optional().describe('[log/broadcast] Structured data payload. Use data.ref for file paths. When type="state_update", auto-synced to meta.json'), + + // read/delete params + id: z.string().optional().describe('[read/delete] Message ID (e.g. MSG-003)'), // list params last: z.number().min(1).max(100).optional().describe('[list] Return last N messages (default: 20)'), - // session_id for artifact discovery - session_id: z.string().optional().describe('[log] Session ID for artifact discovery (links team to .workflow/.team/{session-id}/)'), + // get_state params + role: z.string().optional().describe('[get_state] Role name to query. Omit to get all role states'), + + // Legacy backward compat (accepted but ignored — team_session_id replaces this) + ref: z.string().optional().describe('[deprecated] Use data.ref instead'), + session_id: z.string().optional().describe('[deprecated] Use team_session_id instead'), }); type Params = z.infer; +/** Resolve team session ID from params, supporting legacy 'team' and new 'team_session_id' */ +function resolveTeamId(params: Params): string | null { + return params.team_session_id || params.team || params.session_id || null; +} + // --- Tool Schema --- export const schema: ToolSchema = { @@ -142,39 +231,44 @@ Directory Structure (LEGACY): .workflow/.team-msg/{team-name}/messages.jsonl Operations: - team_msg(operation="log", team="TLS-my-team-2026-02-15", from="planner", to="coordinator", type="plan_ready", summary="Plan ready: 3 tasks", ref=".workflow/.team-plan/my-team/plan.json") - team_msg(operation="log", team="TLS-my-team-2026-02-15", from="coordinator", to="implementer", type="task_unblocked", summary="Task ready") - team_msg(operation="read", team="TLS-my-team-2026-02-15", id="MSG-003") - team_msg(operation="list", team="TLS-my-team-2026-02-15") - team_msg(operation="list", team="TLS-my-team-2026-02-15", from="tester", last=5) - team_msg(operation="status", team="TLS-my-team-2026-02-15") - team_msg(operation="delete", team="TLS-my-team-2026-02-15", id="MSG-003") - team_msg(operation="clear", team="TLS-my-team-2026-02-15") + team_msg(operation="log", team_session_id="TLS-xxx", from="planner", type="plan_ready", data={ref: "plan.json"}) + team_msg(operation="log", team_session_id="TLS-xxx", from="coordinator", type="state_update", data={pipeline_mode: "full"}) + team_msg(operation="broadcast", team_session_id="TLS-xxx", from="coordinator", type="shutdown") + team_msg(operation="get_state", team_session_id="TLS-xxx", role="researcher") + team_msg(operation="read", team_session_id="TLS-xxx", id="MSG-003") + team_msg(operation="list", team_session_id="TLS-xxx", from="tester", last=5) + team_msg(operation="status", team_session_id="TLS-xxx") + team_msg(operation="delete", team_session_id="TLS-xxx", id="MSG-003") + team_msg(operation="clear", team_session_id="TLS-xxx") -Message types: plan_ready, plan_approved, plan_revision, task_unblocked, impl_complete, impl_progress, test_result, review_result, fix_required, error, shutdown`, +Defaults: to="coordinator", summary=auto-generated if omitted, type="message" +Message types: plan_ready, plan_approved, plan_revision, task_unblocked, impl_complete, impl_progress, test_result, review_result, fix_required, error, shutdown, state_update`, inputSchema: { type: 'object', properties: { operation: { type: 'string', - enum: ['log', 'read', 'list', 'status', 'delete', 'clear'], - description: 'Operation: log | read | list | status | delete | clear', + enum: ['log', 'read', 'list', 'status', 'delete', 'clear', 'broadcast', 'get_state'], + description: 'Operation: log | read | list | status | delete | clear | broadcast | get_state', }, - team: { + team_session_id: { type: 'string', - description: 'Session ID (e.g., TLS-my-project-2026-02-27). Maps to .workflow/.team/{session-id}/.msg/. Use session ID, NOT team name.', + description: 'Session ID (e.g., TLS-my-project-2026-02-27). Maps to .workflow/.team/{session-id}/.msg/', }, - from: { type: 'string', description: '[log/list] Sender role' }, - to: { type: 'string', description: '[log/list] Recipient role' }, - type: { type: 'string', description: '[log/list] Message type' }, - summary: { type: 'string', description: '[log] One-line summary' }, - ref: { type: 'string', description: '[log] File path for large content' }, - data: { type: 'object', description: '[log] Optional structured data' }, - id: { type: 'string', description: '[read] Message ID (e.g. MSG-003)' }, + from: { type: 'string', description: '[log/broadcast/list] Sender role' }, + to: { type: 'string', description: '[log/list] Recipient role (defaults to "coordinator")' }, + type: { type: 'string', description: '[log/broadcast/list] Message type' }, + summary: { type: 'string', description: '[log/broadcast] One-line summary (auto-generated if omitted)' }, + data: { type: 'object', description: '[log/broadcast] Structured data. Use data.ref for file paths. When type="state_update", auto-synced to meta.json' }, + id: { type: 'string', description: '[read/delete] Message ID (e.g. MSG-003)' }, last: { type: 'number', description: '[list] Last N messages (default 20)', minimum: 1, maximum: 100 }, - session_id: { type: 'string', description: '[log] Session ID for artifact discovery' }, + role: { type: 'string', description: '[get_state] Role name to query. Omit for all roles' }, + // Legacy params (backward compat) + team: { type: 'string', description: '[deprecated] Use team_session_id' }, + ref: { type: 'string', description: '[deprecated] Use data.ref instead' }, + session_id: { type: 'string', description: '[deprecated] Use team_session_id' }, }, - required: ['operation', 'team'], + required: ['operation'], }, }; @@ -202,12 +296,12 @@ export function getLogDirWithFallback(sessionId: string): string { return join(root, '.workflow', '.team-msg', sessionId); } -function getLogPath(team: string): string { - return join(getLogDir(team), 'messages.jsonl'); +function getLogPath(teamId: string): string { + return join(getLogDir(teamId), 'messages.jsonl'); } -function ensureLogFile(team: string): string { - const logPath = getLogPath(team); +function ensureLogFile(teamId: string): string { + const logPath = getLogPath(teamId); const dir = dirname(logPath); if (!existsSync(dir)) { mkdirSync(dir, { recursive: true }); @@ -218,8 +312,8 @@ function ensureLogFile(team: string): string { return logPath; } -export function readAllMessages(team: string): TeamMessage[] { - const logPath = getLogPath(team); +export function readAllMessages(teamId: string): TeamMessage[] { + const logPath = getLogPath(teamId); if (!existsSync(logPath)) return []; const content = readFileSync(logPath, 'utf-8').trim(); @@ -248,54 +342,81 @@ function nowISO(): string { // --- Operations --- -function opLog(params: Params): ToolResult { +function opLog(params: Params, teamId: string): ToolResult { if (!params.from) return { success: false, error: 'log requires "from"' }; - if (!params.to) return { success: false, error: 'log requires "to"' }; - if (!params.summary) return { success: false, error: 'log requires "summary"' }; - const logPath = ensureLogFile(params.team); - const messages = readAllMessages(params.team); + // Default to "coordinator" if to is not specified + const to = params.to || 'coordinator'; + + // Backward compat: merge legacy ref into data.ref + if (params.ref) { + if (!params.data) params.data = {}; + if (!params.data.ref) params.data.ref = params.ref; + } + + // Auto-generate summary if not provided + const summary = params.summary || `[${params.from}] ${params.type || 'message'} → ${to}`; + + const logPath = ensureLogFile(teamId); + const messages = readAllMessages(teamId); const id = getNextId(messages); const msg: TeamMessage = { id, ts: nowISO(), from: params.from, - to: params.to, + to, type: params.type || 'message', - summary: params.summary, + summary, }; - if (params.ref) msg.ref = params.ref; if (params.data) msg.data = params.data; appendFileSync(logPath, JSON.stringify(msg) + '\n', 'utf-8'); - // Update meta with session_id if provided - if (params.session_id) { - const meta = getEffectiveTeamMeta(params.team); - meta.session_id = params.session_id; + // Auto-sync state_update to meta.json + if (params.type === 'state_update' && params.data) { + const meta = getEffectiveTeamMeta(teamId); + + // Role state: store under role_state namespace + if (params.from) { + if (!meta.role_state) meta.role_state = {}; + meta.role_state[params.from] = { + ...meta.role_state[params.from], + ...params.data, + _updated_at: nowISO(), + }; + } + + // Promote top-level keys directly to meta + const topLevelKeys = ['pipeline_mode', 'pipeline_stages', 'team_name', 'task_description', 'roles'] as const; + for (const key of topLevelKeys) { + if (params.data[key] !== undefined) { + (meta as any)[key] = params.data[key]; + } + } + meta.updated_at = nowISO(); - writeTeamMeta(params.team, meta); + writeTeamMeta(teamId, meta); } return { success: true, result: { id, message: `Logged ${id}: [${msg.from} → ${msg.to}] ${msg.summary}` } }; } -function opRead(params: Params): ToolResult { +function opRead(params: Params, teamId: string): ToolResult { if (!params.id) return { success: false, error: 'read requires "id"' }; - const messages = readAllMessages(params.team); + const messages = readAllMessages(teamId); const msg = messages.find(m => m.id === params.id); if (!msg) { - return { success: false, error: `Message ${params.id} not found in team "${params.team}"` }; + return { success: false, error: `Message ${params.id} not found in team "${teamId}"` }; } return { success: true, result: msg }; } -function opList(params: Params): ToolResult { - let messages = readAllMessages(params.team); +function opList(params: Params, teamId: string): ToolResult { + let messages = readAllMessages(teamId); // Apply filters if (params.from) messages = messages.filter(m => m.from === params.from); @@ -319,8 +440,8 @@ function opList(params: Params): ToolResult { }; } -function opStatus(params: Params): ToolResult { - const messages = readAllMessages(params.team); +function opStatus(params: Params, teamId: string): ToolResult { + const messages = readAllMessages(teamId); if (messages.length === 0) { return { success: true, result: { members: [], summary: 'No messages recorded yet.' } }; @@ -357,35 +478,57 @@ function opStatus(params: Params): ToolResult { }; } -function opDelete(params: Params): ToolResult { +function opDelete(params: Params, teamId: string): ToolResult { if (!params.id) return { success: false, error: 'delete requires "id"' }; - const messages = readAllMessages(params.team); + const messages = readAllMessages(teamId); const idx = messages.findIndex(m => m.id === params.id); if (idx === -1) { - return { success: false, error: `Message ${params.id} not found in team "${params.team}"` }; + return { success: false, error: `Message ${params.id} not found in team "${teamId}"` }; } const removed = messages.splice(idx, 1)[0]; - const logPath = ensureLogFile(params.team); + const logPath = ensureLogFile(teamId); writeFileSync(logPath, messages.map(m => JSON.stringify(m)).join('\n') + (messages.length > 0 ? '\n' : ''), 'utf-8'); return { success: true, result: { deleted: removed.id, message: `Deleted ${removed.id}: [${removed.from} → ${removed.to}] ${removed.summary}` } }; } -function opClear(params: Params): ToolResult { - const logPath = getLogPath(params.team); - const dir = getLogDir(params.team); +function opBroadcast(params: Params, teamId: string): ToolResult { + if (!params.from) return { success: false, error: 'broadcast requires "from"' }; - if (!existsSync(logPath)) { - return { success: true, result: { message: `Team "${params.team}" has no messages to clear.` } }; + // Delegate to opLog with to="all" + return opLog({ ...params, operation: 'log', to: 'all' }, teamId); +} + +function opGetState(params: Params, teamId: string): ToolResult { + const meta = getEffectiveTeamMeta(teamId); + const roleState = meta.role_state || {}; + + if (params.role) { + const state = roleState[params.role]; + if (!state) { + return { success: true, result: { role: params.role, state: null, message: `No state found for role "${params.role}"` } }; + } + return { success: true, result: { role: params.role, state } }; } - const count = readAllMessages(params.team).length; + return { success: true, result: { role_state: roleState } }; +} + +function opClear(params: Params, teamId: string): ToolResult { + const logPath = getLogPath(teamId); + const dir = getLogDir(teamId); + + if (!existsSync(logPath)) { + return { success: true, result: { message: `Team "${teamId}" has no messages to clear.` } }; + } + + const count = readAllMessages(teamId).length; rmSync(dir, { recursive: true, force: true }); - return { success: true, result: { cleared: count, message: `Cleared ${count} messages for team "${params.team}".` } }; + return { success: true, result: { cleared: count, message: `Cleared ${count} messages for team "${teamId}".` } }; } // --- Handler --- @@ -398,13 +541,21 @@ export async function handler(params: Record): Promise