feat: Enhance team messaging system with new operations and data handling

- Added support for new message type 'state_update' in TeamMessageType.
- Updated TeamMessageFeed component to handle both legacy and new reference formats.
- Modified CLI options to clarify usage and added new commands for broadcasting messages and retrieving role states.
- Implemented new command 'get_state' to read role state from meta.json.
- Enhanced team message logging to auto-generate summaries and handle structured data.
- Improved backward compatibility by enriching team metadata from legacy files.
- Refactored message handling functions to streamline operations and improve clarity.
This commit is contained in:
catlog22
2026-03-03 16:11:57 +08:00
parent 628578b2bb
commit a7ed0365f7
7 changed files with 409 additions and 269 deletions

View File

@@ -56,23 +56,38 @@ Parse the following fields from your prompt:
---
## Main Execution Loop
## Execution Flow
```
Entry:
Parse prompt → extract role, role_spec, session, session_id, team_name, inner_loop
Read role_spec → parse frontmatter (prefix, discuss_rounds, etc.)
Read role_spec body → store Phase 2-4 instructions
Read role_spec → parse frontmatter + body (Phase 2-4 instructions)
Load wisdom files from <session>/wisdom/ (if exist)
context_accumulator = [] ← inner_loop only, in-memory across iterations
Main Loop:
Phase 1: Task Discovery [built-in]
Phase 2-4: Execute Role Spec [from .md]
Phase 5: Report [built-in]
inner_loop AND more same-prefix tasks? → Phase 5-L → back to Phase 1
no more tasks? → Phase 5-F → STOP
inner_loop=true AND more same-prefix tasks? → Phase 5-L → back to Phase 1
inner_loop=false OR no more tasks? → Phase 5-F → STOP
```
**Inner loop** (`inner_loop=true`): Processes ALL same-prefix tasks sequentially in a single agent instance. `context_accumulator` is passed to each subsequent subagent as `## Prior Context` for knowledge continuity.
| Step | Phase 5-L (loop) | Phase 5-F (final) |
|------|-----------------|------------------|
| TaskUpdate completed | YES | YES |
| team_msg state_update | YES | YES |
| Accumulate summary | YES | - |
| SendMessage to coordinator | NO | YES (all tasks) |
| Fast-Advance check | - | YES |
**Interrupt conditions** (break inner loop immediately):
- consensus_blocked HIGH → SendMessage → STOP
- Cumulative errors >= 3 → SendMessage → STOP
---
## Phase 1: Task Discovery (Built-in)
@@ -232,25 +247,32 @@ Discussion: <session-folder>/discussions/<round-id>-discussion.md
## Phase 5: Report + Fast-Advance (Built-in)
After Phase 4 completes, determine Phase 5 variant:
After Phase 4 completes, determine Phase 5 variant (see Execution Flow for decision table).
### Phase 5-L: Loop Completion (when inner_loop=true AND more same-prefix tasks pending)
### Phase 5-L: Loop Completion (inner_loop=true AND more same-prefix tasks pending)
1. **TaskUpdate**: Mark current task `completed`
2. **Message Bus**: Log completion with verification evidence
2. **Message Bus**: Log state_update (combines state publish + audit log)
```
mcp__ccw-tools__team_msg(
operation="log",
team=<session_id>,
team_session_id=<session_id>,
from=<role>,
to="coordinator",
type=<message_types.success>,
summary="[<role>] <task-id> complete. <brief-summary>. Verified: <verification_method>",
ref=<artifact-path>
type="state_update",
data={
status: "task_complete",
task_id: "<task-id>",
ref: "<artifact-path>",
key_findings: <from Phase 4>,
decisions: <from Phase 4>,
files_modified: <from Phase 4>,
artifact_path: "<artifact-path>",
verification: "<verification_method>"
}
)
```
**CLI fallback**: `ccw team log --team <session_id> --from <role> --to coordinator --type <type> --summary "[<role>] ..." --json`
3. **Accumulate summary** to context_accumulator (in-memory):
> `to` defaults to "coordinator", `summary` auto-generated. `type="state_update"` auto-syncs data to `meta.json.role_state[<role>]`.
3. **Accumulate** to `context_accumulator` (in-memory):
```
context_accumulator.append({
task: "<task-id>",
@@ -258,38 +280,36 @@ After Phase 4 completes, determine Phase 5 variant:
key_decisions: <from Phase 4>,
discuss_verdict: <from Phase 4 or "none">,
discuss_rating: <from Phase 4 or null>,
summary: "<brief summary>"
summary: "<brief summary>",
files_modified: <from Phase 4>
})
```
4. **Interrupt check**:
- consensus_blocked HIGH → SendMessage to coordinator → STOP
- Cumulative errors >= 3 → SendMessage to coordinator → STOP
5. **Loop**: Return to Phase 1 to find next same-prefix task
4. **Interrupt check**: consensus_blocked HIGH or errors >= 3 → SendMessage → STOP
5. **Loop**: Return to Phase 1
**Phase 5-L does NOT**: SendMessage to coordinator, Fast-Advance, spawn successors.
### Phase 5-F: Final Report (when no more same-prefix tasks OR inner_loop=false)
### Phase 5-F: Final Report (no more same-prefix tasks OR inner_loop=false)
1. **TaskUpdate**: Mark current task `completed`
2. **Message Bus**: Log completion (same as Phase 5-L step 2)
3. **Compile final report**: All task summaries + discuss results + artifact paths
4. **Fast-Advance Check**:
- Call `TaskList()`, find pending tasks whose blockedBy are ALL completed
- Apply fast-advance rules (see table below)
5. **SendMessage** to coordinator OR **spawn successor** directly
### Fast-Advance Rules
2. **Message Bus**: Log state_update (same call as Phase 5-L step 2)
3. **Compile final report** and **SendMessage** to coordinator:
```
SendMessage(team_name=<team_name>, recipient="coordinator", message="[<role>] <final-report>")
```
Report contents: tasks completed (count + list), artifacts produced (paths), files modified (with evidence), discuss results (verdicts + ratings), key decisions (from context_accumulator), verification summary, warnings/issues.
4. **Fast-Advance Check**: Call `TaskList()`, find pending tasks whose blockedBy are ALL completed, apply rules:
| Condition | Action |
|-----------|--------|
| Same-prefix successor (inner loop role) | Do NOT spawn — main agent handles via inner loop |
| 1 ready task, simple linear successor, different prefix | Spawn directly via `Task(run_in_background: true)` + log `fast_advance` to message bus |
| 1 ready task, simple linear successor, different prefix | Spawn directly via `Task(run_in_background: true)` + log `fast_advance` |
| Multiple ready tasks (parallel window) | SendMessage to coordinator (needs orchestration) |
| No ready tasks + others running | SendMessage to coordinator (status update) |
| No ready tasks + nothing running | SendMessage to coordinator (pipeline may be complete) |
| Checkpoint task (e.g., spec->impl transition) | SendMessage to coordinator (needs user confirmation) |
### Fast-Advance Spawn Template
### Fast-Advance Spawn
When fast-advancing to a different-prefix successor:
@@ -311,160 +331,104 @@ inner_loop: <true|false based on successor role>`
})
```
### Fast-Advance Notification
After spawning a successor via fast-advance, MUST log to message bus:
After spawning, MUST log to message bus (passive log, NOT a SendMessage):
```
mcp__ccw-tools__team_msg(
operation="log",
team=<session_id>,
team_session_id=<session_id>,
from=<role>,
to="coordinator",
type="fast_advance",
summary="[<role>] fast-advanced <completed-task-id> → spawned <successor-role> for <successor-task-id>"
)
```
This is a passive log entry (NOT a SendMessage). Coordinator reads it on next callback to reconcile `active_workers`.
### SendMessage Format
```
SendMessage(team_name=<team_name>, recipient="coordinator", message="[<role>] <final-report>")
```
**Final report contents**:
- Tasks completed (count + list)
- Artifacts produced (paths)
- Files modified (paths + before/after evidence from Phase 4 verification)
- Discuss results (verdicts + ratings)
- Key decisions (from context_accumulator)
- Verification summary (methods used, pass/fail status)
- Any warnings or issues
Coordinator reads this on next callback to reconcile `active_workers`.
---
## Inner Loop Framework
When `inner_loop=true`, the agent processes ALL same-prefix tasks sequentially in a single agent instance:
```
context_accumulator = []
Phase 1: Find first <prefix>-* task
Phase 2-4: Execute role spec
Phase 5-L: Mark done, log, accumulate, check interrupts
More <prefix>-* tasks? → Phase 1 (loop)
No more? → Phase 5-F (final report)
```
**context_accumulator**: Maintained in-memory across loop iterations. Each entry contains task summary + key decisions + discuss results. Passed to subagents as context for knowledge continuity.
**Phase 5-L vs Phase 5-F**:
| Step | Phase 5-L (loop) | Phase 5-F (final) |
|------|-----------------|------------------|
| TaskUpdate completed | YES | YES |
| team_msg log | YES | YES |
| Accumulate summary | YES | - |
| SendMessage to coordinator | NO | YES (all tasks) |
| Fast-Advance check | - | YES |
**Interrupt conditions** (break inner loop immediately):
- consensus_blocked HIGH → SendMessage → STOP
- Cumulative errors >= 3 → SendMessage → STOP
**Crash recovery**: If agent crashes mid-loop, completed tasks are safe (TaskUpdate + artifacts on disk). Coordinator detects orphaned in_progress task on resume, resets to pending, re-spawns. New agent resumes from the interrupted task via Resume Artifact Check.
---
## Wisdom Accumulation
### Load (Phase 2)
Extract session folder from prompt. Read wisdom files if they exist:
```
<session>/wisdom/learnings.md
<session>/wisdom/decisions.md
<session>/wisdom/conventions.md
<session>/wisdom/issues.md
```
Use wisdom context to inform Phase 2-4 execution.
### Contribute (Phase 4/5)
Write discoveries to corresponding wisdom files:
- New patterns → `learnings.md`
- Architecture/design decisions → `decisions.md`
- Codebase conventions → `conventions.md`
- Risks and known issues → `issues.md`
---
## Knowledge Transfer
## Knowledge Transfer & Wisdom
### Upstream Context Loading (Phase 2)
When executing Phase 2 of a role-spec, the worker MUST load available cross-role context:
The worker MUST load available cross-role context before executing role-spec Phase 2:
| Source | Path | Load Method |
|--------|------|-------------|
| Upstream artifacts | `<session>/artifacts/*.md` | Read files listed in task description or dependency chain |
| Shared memory | `<session>/shared-memory.json` | Read and parse JSON |
| Wisdom | `<session>/wisdom/*.md` | Read all wisdom files |
| Exploration cache | `<session>/explorations/cache-index.json` | Check before new explorations |
| Source | Method | Priority |
|--------|--------|----------|
| Upstream role state | `team_msg(operation="get_state", role=<upstream_role>)` | **Primary** — O(1) from meta.json |
| Upstream artifacts | Read files referenced in the state's artifact paths | Secondary — for large content |
| Wisdom files | Read `<session>/wisdom/*.md` | Always load if exists |
| Exploration cache | Check `<session>/explorations/cache-index.json` | Before new explorations |
> **Legacy fallback**: If `get_state` returns null (older sessions), fall back to reading `<session>/shared-memory.json`.
### Downstream Context Publishing (Phase 4)
After Phase 4 verification, the worker MUST publish its contributions:
1. **Artifact**: Write deliverable to `<session>/artifacts/<prefix>-<task-id>-<name>.md`
2. **shared-memory.json**: Read-merge-write under role namespace
```json
{ "<role>": { "key_findings": [...], "decisions": [...], "files_modified": [...] } }
```
2. **State data**: Prepare payload for Phase 5 `state_update` message (see Phase 5-L step 2 for schema)
3. **Wisdom**: Append new patterns to `learnings.md`, decisions to `decisions.md`, issues to `issues.md`
4. **Context accumulator** (inner_loop only): Append summary (see Phase 5-L step 3 for schema). Pass full accumulator to subsequent subagents as `## Prior Context`.
### Inner Loop Context Accumulator
For `inner_loop: true` roles, `context_accumulator` is maintained in-memory:
### Wisdom Files
```
context_accumulator.append({
task: "<task-id>",
artifact: "<output-path>",
key_decisions: [...],
summary: "<brief>",
files_modified: [...]
})
<session>/wisdom/learnings.md ← New patterns discovered
<session>/wisdom/decisions.md ← Architecture/design decisions
<session>/wisdom/conventions.md ← Codebase conventions
<session>/wisdom/issues.md ← Risks and known issues
```
Pass the full accumulator to each subsequent task's Phase 3 subagent as `## Prior Context`.
Load in Phase 2 to inform execution. Contribute in Phase 4/5 with discoveries.
---
## Message Bus Protocol
Always use `mcp__ccw-tools__team_msg` for logging. Parameters:
Always use `mcp__ccw-tools__team_msg` for team communication.
### log (with state_update) — Primary for Phase 5
| Param | Value |
|-------|-------|
| operation | "log" |
| team | `<session_id>` (NOT team_name) |
| team_session_id | `<session_id>` (NOT team_name) |
| from | `<role>` |
| to | "coordinator" |
| type | From role_spec message_types |
| summary | `[<role>] <message>` |
| ref | artifact path (optional) |
| type | "state_update" for completion; or role_spec message_types for non-state messages |
| data | structured state payload (auto-synced to meta.json when type="state_update"). Use `data.ref` for artifact paths |
**Critical**: `team` param must be session ID (e.g., `TLS-my-project-2026-02-27`), not team name.
> **Defaults**: `to` defaults to "coordinator", `summary` auto-generated as `[<from>] <type> → <to>`.
> When `type="state_update"`: data is auto-synced to `meta.json.role_state[<role>]`. Top-level keys (`pipeline_mode`, `pipeline_stages`, `team_name`, `task_description`) are promoted to meta root.
### get_state — Primary for Phase 2
```
mcp__ccw-tools__team_msg(
operation="get_state",
team_session_id=<session_id>,
role=<upstream_role> // omit to get ALL role states
)
```
Returns `role_state[<role>]` from meta.json.
### broadcast — For team-wide signals
```
mcp__ccw-tools__team_msg(
operation="broadcast",
team_session_id=<session_id>,
from=<role>,
type=<type>
)
```
Equivalent to `log` with `to="all"`. Summary auto-generated.
**CLI fallback** (if MCP tool unavailable):
```
ccw team log --team <session_id> --from <role> --to coordinator --type <type> --summary "[<role>] ..." --json
ccw team log --team <session_id> --from <role> --type <type> --json
```
---
@@ -492,7 +456,7 @@ ccw team log --team <session_id> --from <role> --to coordinator --type <type> --
| Cumulative errors >= 3 | SendMessage to coordinator with error summary, STOP |
| No tasks found | SendMessage idle status, STOP |
| Context missing (prior doc, template) | Request from coordinator via SendMessage |
| Agent crash mid-loop | Self-healing: coordinator resets orphaned task, re-spawns |
| Agent crash mid-loop | Self-healing: completed tasks are safe (TaskUpdate + artifacts on disk). Coordinator detects orphaned in_progress task on resume, resets to pending, re-spawns. New agent resumes via Resume Artifact Check. |
---

View File

@@ -90,11 +90,11 @@ function MessageRow({ msg }: { msg: TeamMessage }) {
{/* Summary */}
<p className="text-xs text-foreground/80">{msg.summary}</p>
{/* Ref link */}
{msg.ref && (
{/* Ref link (supports both legacy msg.ref and new data.ref) */}
{(msg.ref || (msg.data?.ref as string)) && (
<div className="flex items-center gap-1 text-[10px] text-muted-foreground">
<FileText className="w-3 h-3" />
<span className="font-mono truncate">{msg.ref}</span>
<span className="font-mono truncate">{msg.ref || (msg.data?.ref as string)}</span>
</div>
)}

View File

@@ -26,6 +26,7 @@ export type TeamMessageType =
| 'fix_required'
| 'error'
| 'shutdown'
| 'state_update'
| 'message';
export interface TeamMember {
@@ -49,6 +50,8 @@ export interface TeamSummaryExtended extends TeamSummary {
updated_at: string;
archived_at?: string;
pipeline_mode?: string;
pipeline_stages?: string[];
role_state?: Record<string, Record<string, unknown>>;
memberCount: number;
members: string[]; // Always provided by backend
}

View File

@@ -358,15 +358,15 @@ export function run(argv: string[]): void {
program
.command('team [subcommand] [args...]')
.description('Team message bus for Agent Team communication')
.option('--team <name>', 'Team name')
.option('--team <name>', 'Session ID (e.g., TLS-my-project-2026-02-27)')
.option('--from <role>', 'Sender role name')
.option('--to <role>', 'Recipient role name')
.option('--to <role>', 'Recipient role name (default: coordinator)')
.option('--type <type>', 'Message type')
.option('--summary <text>', 'One-line summary')
.option('--ref <path>', 'File path reference')
.option('--summary <text>', 'One-line summary (auto-generated if omitted)')
.option('--data <json>', 'JSON structured data')
.option('--id <id>', 'Message ID (for read)')
.option('--id <id>', 'Message ID (for read/delete)')
.option('--last <n>', 'Last N messages (for list)')
.option('--role <role>', 'Role name (for get_state)')
.option('--json', 'Output as JSON')
.action((subcommand, args, options) => teamCommand(subcommand, args, options));

View File

@@ -3,7 +3,9 @@
* Delegates to team-msg.ts handler for JSONL-based persistent messaging
*
* Commands:
* ccw team log --team <session-id> --from <role> --to <role> --type <type> --summary "..."
* ccw team log --team <session-id> --from <role> [--to <role>] [--type <type>] [--summary "..."]
* ccw team broadcast --team <session-id> --from <role> [--type <type>] [--summary "..."]
* ccw team get_state --team <session-id> [--role <role>]
* ccw team read --team <session-id> --id <MSG-NNN>
* ccw team list --team <session-id> [--from <role>] [--to <role>] [--type <type>] [--last <n>]
* ccw team status --team <session-id>
@@ -24,6 +26,7 @@ interface TeamOptions {
data?: string;
id?: string;
last?: string;
role?: string;
json?: boolean;
}
@@ -55,6 +58,7 @@ export async function teamCommand(
if (options.ref) params.ref = options.ref;
if (options.id) params.id = options.id;
if (options.last) params.last = parseInt(options.last, 10);
if (options.role) params.role = options.role;
// Parse --data as JSON
if (options.data) {
@@ -82,17 +86,19 @@ export async function teamCommand(
// Formatted output by operation
switch (subcommand) {
case 'log': {
case 'log':
case 'broadcast': {
const r = result.result as { id: string; message: string };
console.log(chalk.green(`${r.message}`));
break;
}
case 'read': {
const msg = result.result as { id: string; ts: string; from: string; to: string; type: string; summary: string; ref?: string; data?: unknown };
const msg = result.result as { id: string; ts: string; from: string; to: string; type: string; summary: string; ref?: string; data?: Record<string, unknown> };
console.log(chalk.bold(`${msg.id} [${msg.ts}]`));
console.log(` ${chalk.cyan(msg.from)}${chalk.yellow(msg.to)} (${msg.type})`);
console.log(` ${msg.summary}`);
if (msg.ref) console.log(chalk.gray(` ref: ${msg.ref}`));
const refPath = msg.ref || (msg.data?.ref as string | undefined);
if (refPath) console.log(chalk.gray(` ref: ${refPath}`));
if (msg.data) console.log(chalk.gray(` data: ${JSON.stringify(msg.data)}`));
break;
}
@@ -122,6 +128,18 @@ export async function teamCommand(
console.log(chalk.green(`${r.message}`));
break;
}
case 'get_state': {
const r = result.result as { role?: string; state?: unknown; role_state?: unknown; message?: string };
if (r.message) {
console.log(chalk.yellow(r.message));
} else if (r.role) {
console.log(chalk.bold(`Role: ${r.role}`));
console.log(JSON.stringify(r.state, null, 2));
} else {
console.log(JSON.stringify(r.role_state, null, 2));
}
break;
}
default:
console.error(chalk.red(`Unknown subcommand: ${subcommand}`));
printHelp();
@@ -137,7 +155,9 @@ function printHelp(): void {
console.log(chalk.bold.cyan('\n CCW Team Message Bus\n'));
console.log(' CLI interface for team message logging and retrieval.\n');
console.log(' Subcommands:');
console.log(chalk.gray(' log Log a team message'));
console.log(chalk.gray(' log Log a team message (to defaults to "coordinator", summary auto-generated)'));
console.log(chalk.gray(' broadcast Broadcast message to all team members (to="all")'));
console.log(chalk.gray(' get_state Read role state from meta.json'));
console.log(chalk.gray(' read Read a specific message by ID'));
console.log(chalk.gray(' list List recent messages with filters'));
console.log(chalk.gray(' status Show team member activity summary'));
@@ -147,13 +167,15 @@ function printHelp(): void {
console.log(' Required:');
console.log(chalk.gray(' --team <session-id> Session ID (e.g., TLS-my-project-2026-02-27), NOT team name'));
console.log();
console.log(' Log Options:');
console.log(chalk.gray(' --from <role> Sender role name'));
console.log(chalk.gray(' --to <role> Recipient role name'));
console.log(chalk.gray(' --type <type> Message type (plan_ready, impl_complete, etc.)'));
console.log(chalk.gray(' --summary <text> One-line summary'));
console.log(chalk.gray(' --ref <path> File path reference'));
console.log(chalk.gray(' --data <json> JSON structured data'));
console.log(' Log/Broadcast Options:');
console.log(chalk.gray(' --from <role> Sender role name (required)'));
console.log(chalk.gray(' --to <role> Recipient role (default: "coordinator")'));
console.log(chalk.gray(' --type <type> Message type (state_update, plan_ready, shutdown, etc.)'));
console.log(chalk.gray(' --summary <text> One-line summary (auto-generated if omitted)'));
console.log(chalk.gray(' --data <json> JSON structured data. Use data.ref for file paths'));
console.log();
console.log(' Get State Options:');
console.log(chalk.gray(' --role <role> Role name to query (omit for all roles)'));
console.log();
console.log(' Read/Delete Options:');
console.log(chalk.gray(' --id <MSG-NNN> Message ID'));
@@ -168,12 +190,11 @@ function printHelp(): void {
console.log(chalk.gray(' --json Output as JSON'));
console.log();
console.log(' Examples:');
console.log(chalk.gray(' ccw team log --team TLS-my-project-2026-02-27 --from executor --to coordinator --type impl_complete --summary "Task done"'));
console.log(chalk.gray(' ccw team list --team TLS-my-project-2026-02-27 --last 5'));
console.log(chalk.gray(' ccw team read --team TLS-my-project-2026-02-27 --id MSG-003'));
console.log(chalk.gray(' ccw team status --team TLS-my-project-2026-02-27'));
console.log(chalk.gray(' ccw team delete --team TLS-my-project-2026-02-27 --id MSG-003'));
console.log(chalk.gray(' ccw team clear --team TLS-my-project-2026-02-27'));
console.log(chalk.gray(' ccw team log --team TLS-my-project-2026-02-27 --from planner --to coordinator --type plan_ready --summary "Plan ready" --json'));
console.log(chalk.gray(' ccw team log --team TLS-xxx --from executor --type state_update --data \'{"status":"done"}\''));
console.log(chalk.gray(' ccw team broadcast --team TLS-xxx --from coordinator --type shutdown'));
console.log(chalk.gray(' ccw team get_state --team TLS-xxx --role executor'));
console.log(chalk.gray(' ccw team list --team TLS-xxx --last 5'));
console.log(chalk.gray(' ccw team read --team TLS-xxx --id MSG-003'));
console.log(chalk.gray(' ccw team status --team TLS-xxx'));
console.log();
}

View File

@@ -432,13 +432,14 @@ export async function handleTeamRoutes(ctx: RouteContext): Promise<boolean> {
const sessionDir = getSessionDir(artifactsTeamName, root);
if (!existsSync(sessionDir)) {
// Check if it's a legacy team with session_id
// Check if it's a legacy team with session_id in meta
const meta = getEffectiveTeamMeta(artifactsTeamName);
if (meta.session_id) {
const legacySessionId = (meta as Record<string, unknown>).session_id as string | undefined;
if (legacySessionId) {
// Legacy team with session_id - redirect to session directory
const legacySessionDir = getSessionDir(meta.session_id, root);
const legacySessionDir = getSessionDir(legacySessionId, root);
if (existsSync(legacySessionDir)) {
serveArtifacts(legacySessionDir, meta.session_id, meta, artifactPath, res);
serveArtifacts(legacySessionDir, legacySessionId, meta, artifactPath, res);
return true;
}
}

View File

@@ -2,12 +2,14 @@
* Team Message Bus - JSONL-based persistent message log for Agent Teams
*
* Operations:
* - log: Append a message, returns auto-incremented ID
* - log: Append a message (to defaults to "coordinator", summary auto-generated if omitted)
* - read: Read message(s) by ID
* - list: List recent messages with optional filters (from/to/type/last N)
* - status: Summarize team member activity from message history
* - delete: Delete a specific message by ID
* - clear: Clear all messages for a team
* - broadcast: Log a message with to="all"
* - get_state: Read role state from meta.json
*/
import { z } from 'zod';
@@ -23,8 +25,16 @@ export interface TeamMeta {
created_at: string;
updated_at: string;
archived_at?: string;
// Pipeline configuration (previously in team-session.json)
pipeline_mode?: string;
session_id?: string; // Links to .workflow/.team/{session-id}/ artifacts directory
pipeline_stages?: string[];
team_name?: string;
task_description?: string;
roles?: string[];
// Role state snapshot (previously in shared-memory.json)
role_state?: Record<string, Record<string, unknown>>;
}
export function getMetaPath(team: string): string {
@@ -61,13 +71,32 @@ export function inferTeamStatus(team: string): TeamMeta['status'] {
}
/**
* Get effective team meta: reads meta.json or infers from messages.
* Get effective team meta: reads meta.json, with fallback to shared-memory.json
* and team-session.json for backward compatibility with older sessions.
*/
export function getEffectiveTeamMeta(team: string): TeamMeta {
const meta = readTeamMeta(team);
if (meta) return meta;
if (meta) {
// Enrich from legacy files if role_state/pipeline_mode missing
if (!meta.role_state || !meta.pipeline_mode) {
const legacyData = readLegacyFiles(team);
if (!meta.pipeline_mode && legacyData.pipeline_mode) {
meta.pipeline_mode = legacyData.pipeline_mode;
}
if (!meta.role_state && legacyData.role_state) {
meta.role_state = legacyData.role_state;
}
if (!meta.pipeline_stages && legacyData.pipeline_stages) {
meta.pipeline_stages = legacyData.pipeline_stages;
}
if (!meta.team_name && legacyData.team_name) {
meta.team_name = legacyData.team_name;
}
}
return meta;
}
// Infer from messages and directory stat
// No meta.json — build from legacy files + inferred status
const status = inferTeamStatus(team);
const dir = getLogDir(team);
let created_at = new Date().toISOString();
@@ -80,7 +109,57 @@ export function getEffectiveTeamMeta(team: string): TeamMeta {
const lastMsg = messages[messages.length - 1];
const updated_at = lastMsg?.ts || created_at;
return { status, created_at, updated_at };
const legacyData = readLegacyFiles(team);
return {
status,
created_at,
updated_at,
...legacyData,
};
}
/**
* Read legacy files (shared-memory.json, team-session.json) for backward compatibility.
*/
function readLegacyFiles(team: string): Partial<TeamMeta> {
const root = getProjectRoot();
const sessionDir = join(root, '.workflow', '.team', team);
const result: Partial<TeamMeta> = {};
// Try shared-memory.json (role state + pipeline_mode)
const sharedMemPath = join(sessionDir, 'shared-memory.json');
if (existsSync(sharedMemPath)) {
try {
const sharedMem = JSON.parse(readFileSync(sharedMemPath, 'utf-8'));
if (sharedMem.pipeline_mode) result.pipeline_mode = sharedMem.pipeline_mode;
if (sharedMem.pipeline_stages) result.pipeline_stages = sharedMem.pipeline_stages;
// Extract role state: any key that looks like a role namespace
const roleState: Record<string, Record<string, unknown>> = {};
for (const [key, value] of Object.entries(sharedMem)) {
if (typeof value === 'object' && value !== null && !Array.isArray(value)
&& !['pipeline_mode', 'pipeline_stages'].includes(key)) {
roleState[key] = value as Record<string, unknown>;
}
}
if (Object.keys(roleState).length > 0) result.role_state = roleState;
} catch { /* ignore parse errors */ }
}
// Try team-session.json (pipeline config)
const sessionPath = join(sessionDir, 'team-session.json');
if (existsSync(sessionPath)) {
try {
const session = JSON.parse(readFileSync(sessionPath, 'utf-8'));
if (!result.pipeline_mode && session.pipeline_mode) result.pipeline_mode = session.pipeline_mode;
if (!result.pipeline_stages && session.pipeline_stages) result.pipeline_stages = session.pipeline_stages;
if (session.team_name) result.team_name = session.team_name;
if (session.task_description) result.task_description = session.task_description;
if (session.roles) result.roles = session.roles;
} catch { /* ignore parse errors */ }
}
return result;
}
// --- Types ---
@@ -92,7 +171,6 @@ export interface TeamMessage {
to: string;
type: string;
summary: string;
ref?: string;
data?: Record<string, unknown>;
}
@@ -106,29 +184,40 @@ export interface StatusEntry {
// --- Zod Schema ---
const ParamsSchema = z.object({
operation: z.enum(['log', 'read', 'list', 'status', 'delete', 'clear']).describe('Operation to perform'),
team: z.string().describe('Session ID (new: .workflow/.team/{session-id}/.msg/) or team name (legacy: .workflow/.team-msg/{team}/)'),
operation: z.enum(['log', 'read', 'list', 'status', 'delete', 'clear', 'broadcast', 'get_state']).describe('Operation to perform'),
// log params
from: z.string().optional().describe('[log/list] Sender role name'),
to: z.string().optional().describe('[log/list] Recipient role name'),
type: z.string().optional().describe('[log/list] Message type (plan_ready, impl_complete, test_result, etc.)'),
summary: z.string().optional().describe('[log] One-line human-readable summary'),
ref: z.string().optional().describe('[log] File path reference for large content'),
data: z.record(z.string(), z.unknown()).optional().describe('[log] Optional structured data'),
// Accept both 'team' (legacy) and 'team_session_id' (preferred)
team: z.string().optional().describe('[deprecated] Use team_session_id instead'),
team_session_id: z.string().optional().describe('Session ID that determines message storage path (e.g., TLS-my-project-2026-02-27)'),
// read params
id: z.string().optional().describe('[read] Message ID to read (e.g. MSG-003)'),
// log/broadcast params
from: z.string().optional().describe('[log/broadcast/list] Sender role name'),
to: z.string().optional().describe('[log/list] Recipient role (defaults to "coordinator")'),
type: z.string().optional().describe('[log/broadcast/list] Message type (state_update, plan_ready, shutdown, etc.)'),
summary: z.string().optional().describe('[log/broadcast] One-line summary (auto-generated from type+from if omitted)'),
data: z.record(z.string(), z.unknown()).optional().describe('[log/broadcast] Structured data payload. Use data.ref for file paths. When type="state_update", auto-synced to meta.json'),
// read/delete params
id: z.string().optional().describe('[read/delete] Message ID (e.g. MSG-003)'),
// list params
last: z.number().min(1).max(100).optional().describe('[list] Return last N messages (default: 20)'),
// session_id for artifact discovery
session_id: z.string().optional().describe('[log] Session ID for artifact discovery (links team to .workflow/.team/{session-id}/)'),
// get_state params
role: z.string().optional().describe('[get_state] Role name to query. Omit to get all role states'),
// Legacy backward compat (accepted but ignored — team_session_id replaces this)
ref: z.string().optional().describe('[deprecated] Use data.ref instead'),
session_id: z.string().optional().describe('[deprecated] Use team_session_id instead'),
});
type Params = z.infer<typeof ParamsSchema>;
/** Resolve team session ID from params, supporting legacy 'team' and new 'team_session_id' */
function resolveTeamId(params: Params): string | null {
return params.team_session_id || params.team || params.session_id || null;
}
// --- Tool Schema ---
export const schema: ToolSchema = {
@@ -142,39 +231,44 @@ Directory Structure (LEGACY):
.workflow/.team-msg/{team-name}/messages.jsonl
Operations:
team_msg(operation="log", team="TLS-my-team-2026-02-15", from="planner", to="coordinator", type="plan_ready", summary="Plan ready: 3 tasks", ref=".workflow/.team-plan/my-team/plan.json")
team_msg(operation="log", team="TLS-my-team-2026-02-15", from="coordinator", to="implementer", type="task_unblocked", summary="Task ready")
team_msg(operation="read", team="TLS-my-team-2026-02-15", id="MSG-003")
team_msg(operation="list", team="TLS-my-team-2026-02-15")
team_msg(operation="list", team="TLS-my-team-2026-02-15", from="tester", last=5)
team_msg(operation="status", team="TLS-my-team-2026-02-15")
team_msg(operation="delete", team="TLS-my-team-2026-02-15", id="MSG-003")
team_msg(operation="clear", team="TLS-my-team-2026-02-15")
team_msg(operation="log", team_session_id="TLS-xxx", from="planner", type="plan_ready", data={ref: "plan.json"})
team_msg(operation="log", team_session_id="TLS-xxx", from="coordinator", type="state_update", data={pipeline_mode: "full"})
team_msg(operation="broadcast", team_session_id="TLS-xxx", from="coordinator", type="shutdown")
team_msg(operation="get_state", team_session_id="TLS-xxx", role="researcher")
team_msg(operation="read", team_session_id="TLS-xxx", id="MSG-003")
team_msg(operation="list", team_session_id="TLS-xxx", from="tester", last=5)
team_msg(operation="status", team_session_id="TLS-xxx")
team_msg(operation="delete", team_session_id="TLS-xxx", id="MSG-003")
team_msg(operation="clear", team_session_id="TLS-xxx")
Message types: plan_ready, plan_approved, plan_revision, task_unblocked, impl_complete, impl_progress, test_result, review_result, fix_required, error, shutdown`,
Defaults: to="coordinator", summary=auto-generated if omitted, type="message"
Message types: plan_ready, plan_approved, plan_revision, task_unblocked, impl_complete, impl_progress, test_result, review_result, fix_required, error, shutdown, state_update`,
inputSchema: {
type: 'object',
properties: {
operation: {
type: 'string',
enum: ['log', 'read', 'list', 'status', 'delete', 'clear'],
description: 'Operation: log | read | list | status | delete | clear',
enum: ['log', 'read', 'list', 'status', 'delete', 'clear', 'broadcast', 'get_state'],
description: 'Operation: log | read | list | status | delete | clear | broadcast | get_state',
},
team: {
team_session_id: {
type: 'string',
description: 'Session ID (e.g., TLS-my-project-2026-02-27). Maps to .workflow/.team/{session-id}/.msg/. Use session ID, NOT team name.',
description: 'Session ID (e.g., TLS-my-project-2026-02-27). Maps to .workflow/.team/{session-id}/.msg/',
},
from: { type: 'string', description: '[log/list] Sender role' },
to: { type: 'string', description: '[log/list] Recipient role' },
type: { type: 'string', description: '[log/list] Message type' },
summary: { type: 'string', description: '[log] One-line summary' },
ref: { type: 'string', description: '[log] File path for large content' },
data: { type: 'object', description: '[log] Optional structured data' },
id: { type: 'string', description: '[read] Message ID (e.g. MSG-003)' },
from: { type: 'string', description: '[log/broadcast/list] Sender role' },
to: { type: 'string', description: '[log/list] Recipient role (defaults to "coordinator")' },
type: { type: 'string', description: '[log/broadcast/list] Message type' },
summary: { type: 'string', description: '[log/broadcast] One-line summary (auto-generated if omitted)' },
data: { type: 'object', description: '[log/broadcast] Structured data. Use data.ref for file paths. When type="state_update", auto-synced to meta.json' },
id: { type: 'string', description: '[read/delete] Message ID (e.g. MSG-003)' },
last: { type: 'number', description: '[list] Last N messages (default 20)', minimum: 1, maximum: 100 },
session_id: { type: 'string', description: '[log] Session ID for artifact discovery' },
role: { type: 'string', description: '[get_state] Role name to query. Omit for all roles' },
// Legacy params (backward compat)
team: { type: 'string', description: '[deprecated] Use team_session_id' },
ref: { type: 'string', description: '[deprecated] Use data.ref instead' },
session_id: { type: 'string', description: '[deprecated] Use team_session_id' },
},
required: ['operation', 'team'],
required: ['operation'],
},
};
@@ -202,12 +296,12 @@ export function getLogDirWithFallback(sessionId: string): string {
return join(root, '.workflow', '.team-msg', sessionId);
}
function getLogPath(team: string): string {
return join(getLogDir(team), 'messages.jsonl');
function getLogPath(teamId: string): string {
return join(getLogDir(teamId), 'messages.jsonl');
}
function ensureLogFile(team: string): string {
const logPath = getLogPath(team);
function ensureLogFile(teamId: string): string {
const logPath = getLogPath(teamId);
const dir = dirname(logPath);
if (!existsSync(dir)) {
mkdirSync(dir, { recursive: true });
@@ -218,8 +312,8 @@ function ensureLogFile(team: string): string {
return logPath;
}
export function readAllMessages(team: string): TeamMessage[] {
const logPath = getLogPath(team);
export function readAllMessages(teamId: string): TeamMessage[] {
const logPath = getLogPath(teamId);
if (!existsSync(logPath)) return [];
const content = readFileSync(logPath, 'utf-8').trim();
@@ -248,54 +342,81 @@ function nowISO(): string {
// --- Operations ---
function opLog(params: Params): ToolResult {
function opLog(params: Params, teamId: string): ToolResult {
if (!params.from) return { success: false, error: 'log requires "from"' };
if (!params.to) return { success: false, error: 'log requires "to"' };
if (!params.summary) return { success: false, error: 'log requires "summary"' };
const logPath = ensureLogFile(params.team);
const messages = readAllMessages(params.team);
// Default to "coordinator" if to is not specified
const to = params.to || 'coordinator';
// Backward compat: merge legacy ref into data.ref
if (params.ref) {
if (!params.data) params.data = {};
if (!params.data.ref) params.data.ref = params.ref;
}
// Auto-generate summary if not provided
const summary = params.summary || `[${params.from}] ${params.type || 'message'}${to}`;
const logPath = ensureLogFile(teamId);
const messages = readAllMessages(teamId);
const id = getNextId(messages);
const msg: TeamMessage = {
id,
ts: nowISO(),
from: params.from,
to: params.to,
to,
type: params.type || 'message',
summary: params.summary,
summary,
};
if (params.ref) msg.ref = params.ref;
if (params.data) msg.data = params.data;
appendFileSync(logPath, JSON.stringify(msg) + '\n', 'utf-8');
// Update meta with session_id if provided
if (params.session_id) {
const meta = getEffectiveTeamMeta(params.team);
meta.session_id = params.session_id;
// Auto-sync state_update to meta.json
if (params.type === 'state_update' && params.data) {
const meta = getEffectiveTeamMeta(teamId);
// Role state: store under role_state namespace
if (params.from) {
if (!meta.role_state) meta.role_state = {};
meta.role_state[params.from] = {
...meta.role_state[params.from],
...params.data,
_updated_at: nowISO(),
};
}
// Promote top-level keys directly to meta
const topLevelKeys = ['pipeline_mode', 'pipeline_stages', 'team_name', 'task_description', 'roles'] as const;
for (const key of topLevelKeys) {
if (params.data[key] !== undefined) {
(meta as any)[key] = params.data[key];
}
}
meta.updated_at = nowISO();
writeTeamMeta(params.team, meta);
writeTeamMeta(teamId, meta);
}
return { success: true, result: { id, message: `Logged ${id}: [${msg.from}${msg.to}] ${msg.summary}` } };
}
function opRead(params: Params): ToolResult {
function opRead(params: Params, teamId: string): ToolResult {
if (!params.id) return { success: false, error: 'read requires "id"' };
const messages = readAllMessages(params.team);
const messages = readAllMessages(teamId);
const msg = messages.find(m => m.id === params.id);
if (!msg) {
return { success: false, error: `Message ${params.id} not found in team "${params.team}"` };
return { success: false, error: `Message ${params.id} not found in team "${teamId}"` };
}
return { success: true, result: msg };
}
function opList(params: Params): ToolResult {
let messages = readAllMessages(params.team);
function opList(params: Params, teamId: string): ToolResult {
let messages = readAllMessages(teamId);
// Apply filters
if (params.from) messages = messages.filter(m => m.from === params.from);
@@ -319,8 +440,8 @@ function opList(params: Params): ToolResult {
};
}
function opStatus(params: Params): ToolResult {
const messages = readAllMessages(params.team);
function opStatus(params: Params, teamId: string): ToolResult {
const messages = readAllMessages(teamId);
if (messages.length === 0) {
return { success: true, result: { members: [], summary: 'No messages recorded yet.' } };
@@ -357,35 +478,57 @@ function opStatus(params: Params): ToolResult {
};
}
function opDelete(params: Params): ToolResult {
function opDelete(params: Params, teamId: string): ToolResult {
if (!params.id) return { success: false, error: 'delete requires "id"' };
const messages = readAllMessages(params.team);
const messages = readAllMessages(teamId);
const idx = messages.findIndex(m => m.id === params.id);
if (idx === -1) {
return { success: false, error: `Message ${params.id} not found in team "${params.team}"` };
return { success: false, error: `Message ${params.id} not found in team "${teamId}"` };
}
const removed = messages.splice(idx, 1)[0];
const logPath = ensureLogFile(params.team);
const logPath = ensureLogFile(teamId);
writeFileSync(logPath, messages.map(m => JSON.stringify(m)).join('\n') + (messages.length > 0 ? '\n' : ''), 'utf-8');
return { success: true, result: { deleted: removed.id, message: `Deleted ${removed.id}: [${removed.from}${removed.to}] ${removed.summary}` } };
}
function opClear(params: Params): ToolResult {
const logPath = getLogPath(params.team);
const dir = getLogDir(params.team);
function opBroadcast(params: Params, teamId: string): ToolResult {
if (!params.from) return { success: false, error: 'broadcast requires "from"' };
if (!existsSync(logPath)) {
return { success: true, result: { message: `Team "${params.team}" has no messages to clear.` } };
// Delegate to opLog with to="all"
return opLog({ ...params, operation: 'log', to: 'all' }, teamId);
}
function opGetState(params: Params, teamId: string): ToolResult {
const meta = getEffectiveTeamMeta(teamId);
const roleState = meta.role_state || {};
if (params.role) {
const state = roleState[params.role];
if (!state) {
return { success: true, result: { role: params.role, state: null, message: `No state found for role "${params.role}"` } };
}
return { success: true, result: { role: params.role, state } };
}
const count = readAllMessages(params.team).length;
return { success: true, result: { role_state: roleState } };
}
function opClear(params: Params, teamId: string): ToolResult {
const logPath = getLogPath(teamId);
const dir = getLogDir(teamId);
if (!existsSync(logPath)) {
return { success: true, result: { message: `Team "${teamId}" has no messages to clear.` } };
}
const count = readAllMessages(teamId).length;
rmSync(dir, { recursive: true, force: true });
return { success: true, result: { cleared: count, message: `Cleared ${count} messages for team "${params.team}".` } };
return { success: true, result: { cleared: count, message: `Cleared ${count} messages for team "${teamId}".` } };
}
// --- Handler ---
@@ -398,13 +541,21 @@ export async function handler(params: Record<string, unknown>): Promise<ToolResu
const p = parsed.data;
// Resolve team ID from team_session_id / team / session_id (backward compat)
const teamId = resolveTeamId(p);
if (!teamId) {
return { success: false, error: 'Missing required parameter: team_session_id (or legacy "team")' };
}
switch (p.operation) {
case 'log': return opLog(p);
case 'read': return opRead(p);
case 'list': return opList(p);
case 'status': return opStatus(p);
case 'delete': return opDelete(p);
case 'clear': return opClear(p);
case 'log': return opLog(p, teamId);
case 'read': return opRead(p, teamId);
case 'list': return opList(p, teamId);
case 'status': return opStatus(p, teamId);
case 'delete': return opDelete(p, teamId);
case 'clear': return opClear(p, teamId);
case 'broadcast': return opBroadcast(p, teamId);
case 'get_state': return opGetState(p, teamId);
default:
return { success: false, error: `Unknown operation: ${p.operation}` };
}