refactor: optimize useWebSocket hook by consolidating store references and improving handler stability

catlog22
2026-02-04 17:29:30 +08:00
parent e260a3f77b
commit 346c87a706
2 changed files with 341 additions and 183 deletions


@@ -1,7 +1,7 @@
---
name: review-cycle-fix
description: Automated fixing of code review findings with AI-powered planning and coordinated execution. Uses intelligent grouping, multi-stage timeline coordination, and test-driven verification.
argument-hint: "<export-file|review-dir> [--resume] [--max-iterations=N]"
argument-hint: "<export-file|review-dir> [--resume] [--max-iterations=N] [--batch-size=N]"
allowed-tools: Skill(*), TodoWrite(*), Read(*), Bash(*), Task(*), Edit(*), Write(*)
---
@@ -21,37 +21,45 @@ allowed-tools: Skill(*), TodoWrite(*), Read(*), Bash(*), Task(*), Edit(*), Write
# Custom max retry attempts per finding
/workflow:review-cycle-fix .workflow/active/WFS-123/.review/ --max-iterations=5
# Custom batch size for parallel planning (default: 5 findings per batch)
/workflow:review-cycle-fix .workflow/active/WFS-123/.review/ --batch-size=3
```
**Fix Source**: Exported findings from review cycle dashboard
**Output Directory**: `{review-dir}/fixes/{fix-session-id}/` (within session .review/)
**Default Max Iterations**: 3 (per finding, adjustable)
**Default Batch Size**: 5 (findings per planning batch, adjustable)
**Max Parallel Agents**: 10 (concurrent planning agents)
**CLI Tools**: @cli-planning-agent (planning), @cli-execute-agent (fixing)
## What & Why
### Core Concept
Automated fix orchestrator with **two-phase architecture**: AI-powered planning followed by coordinated parallel/serial execution. Generates fix timeline with intelligent grouping and dependency analysis, then executes fixes with conservative test verification.
Automated fix orchestrator with **parallel planning architecture**: Multiple AI agents analyze findings concurrently in batches, then coordinate parallel/serial execution. Generates fix timeline with intelligent grouping and dependency analysis, executes fixes with conservative test verification.
**Fix Process**:
- **Planning Phase**: AI analyzes findings, generates fix plan with grouping and execution strategy
- **Execution Phase**: Main orchestrator coordinates agents per timeline stages
- **Batching Phase (1.5)**: Orchestrator groups findings by file+dimension similarity, creates batches
- **Planning Phase (2)**: Up to 10 agents plan batches in parallel, generate partial plans, orchestrator aggregates
- **Execution Phase (3)**: Main orchestrator coordinates agents per aggregated timeline stages
- **Parallel Efficiency**: Customizable batch size (default: 5), MAX_PARALLEL=10 agents
- **No rigid structure**: Adapts to task requirements, not bound to fixed JSON format
**vs Manual Fixing**:
- **Manual**: Developer reviews findings one-by-one, fixes sequentially
- **Automated**: AI groups related issues, executes in optimal parallel/serial order with automatic test verification
- **Automated**: AI groups related issues, multiple agents plan in parallel, executes in optimal parallel/serial order with automatic test verification
### Value Proposition
1. **Intelligent Planning**: AI-powered analysis identifies optimal grouping and execution strategy
2. **Multi-stage Coordination**: Supports complex parallel + serial execution with dependency management
3. **Conservative Safety**: Mandatory test verification with automatic rollback on failure
4. **Resume Support**: Checkpoint-based recovery for interrupted sessions
1. **Parallel Planning**: Multiple agents analyze findings concurrently, reducing planning time for large finding sets (10+ findings)
2. **Intelligent Batching**: Semantic similarity grouping ensures related findings are analyzed together
3. **Multi-stage Coordination**: Supports complex parallel + serial execution with cross-batch dependency management
4. **Conservative Safety**: Mandatory test verification with automatic rollback on failure
5. **Resume Support**: Checkpoint-based recovery for interrupted sessions
### Orchestrator Boundary (CRITICAL)
- **ONLY command** for automated review finding fixes
- Manages: Planning phase coordination, stage-based execution, agent scheduling, progress tracking
- Delegates: Fix planning to @cli-planning-agent, fix execution to @cli-execute-agent
- Manages: Intelligent batching (Phase 1.5), parallel planning coordination (launch N agents), plan aggregation, stage-based execution, agent scheduling, progress tracking
- Delegates: Batch planning to @cli-planning-agent, fix execution to @cli-execute-agent
### Execution Flow
@@ -60,12 +68,22 @@ Automated fix orchestrator with **two-phase architecture**: AI-powered planning
Phase 1: Discovery & Initialization
└─ Validate export file, create fix session structure, initialize state files
Phase 2: Planning Coordination (@cli-planning-agent)
├─ Analyze findings for patterns and dependencies
├─ Group by file + dimension + root cause similarity
├─ Determine execution strategy (parallel/serial/hybrid)
├─ Generate fix timeline with stages
└─ Output: fix-plan.json
Phase 1.5: Intelligent Grouping & Batching
├─ Analyze findings metadata (file, dimension, severity)
├─ Group by semantic similarity (file proximity + dimension affinity)
├─ Create batches respecting --batch-size (default: 5)
└─ Output: Finding batches for parallel planning
Phase 2: Parallel Planning Coordination (@cli-planning-agent × N)
├─ Launch MAX_PARALLEL planning agents concurrently (default: 10)
├─ Each agent processes one batch:
│ ├─ Analyze findings for patterns and dependencies
│ ├─ Group by file + dimension + root cause similarity
│ ├─ Determine execution strategy (parallel/serial/hybrid)
│ ├─ Generate fix timeline with stages
│ └─ Output: partial-plan-{batch-id}.json
├─ Collect results from all agents
└─ Aggregate: Merge partial plans → fix-plan.json (resolve cross-batch dependencies)
Phase 3: Execution Orchestration (Stage-based)
└─ For each timeline stage:
@@ -91,25 +109,29 @@ Phase 5: Session Completion (Optional)
| Agent | Responsibility |
|-------|---------------|
| **Orchestrator** | Input validation, session management, planning coordination, stage-based execution scheduling, progress tracking, aggregation |
| **@cli-planning-agent** | Findings analysis, intelligent grouping (file+dimension+root cause), execution strategy determination (parallel/serial/hybrid), timeline generation with dependency mapping |
| **Orchestrator** | Input validation, session management, intelligent batching (Phase 1.5), parallel planning coordination (launch N agents), plan aggregation (merge partial plans, resolve cross-batch dependencies), stage-based execution scheduling, progress tracking, result aggregation |
| **@cli-planning-agent** | Batch findings analysis, intelligent grouping (file+dimension+root cause), execution strategy determination (parallel/serial/hybrid), timeline generation with dependency mapping, partial plan output |
| **@cli-execute-agent** | Fix execution per group, code context analysis, Edit tool operations, test verification, git rollback on failure, completion JSON generation |
## Enhanced Features
### 1. Two-Phase Architecture
### 1. Parallel Planning Architecture
**Phase Separation**:
**Batch Processing Strategy**:
| Phase | Agent | Output | Purpose |
|-------|-------|--------|---------|
| **Planning** | @cli-planning-agent | fix-plan.json | Analyze findings, group intelligently, determine optimal execution strategy |
| **Execution** | @cli-execute-agent | completions/*.json | Execute fixes per plan with test verification and rollback |
| Phase | Agent Count | Input | Output | Purpose |
|-------|-------------|-------|--------|---------|
| **Batching (1.5)** | Orchestrator | All findings | Finding batches | Semantic grouping by file+dimension, respecting --batch-size |
| **Planning (2)** | N agents (≤10) | 1 batch each | partial-plan-{batch-id}.json | Analyze batch in parallel, generate execution groups and timeline |
| **Aggregation (2)** | Orchestrator | All partial plans | fix-plan.json | Merge timelines, resolve cross-batch dependencies |
| **Execution (3)** | M agents (dynamic) | 1 group each | fix-progress-{N}.json | Execute fixes per aggregated plan with test verification |
**Benefits**:
- Clear separation of concerns (analysis vs execution)
- Reusable plans (can re-execute without re-planning)
- Better error isolation (planning failures vs execution failures)
- **Speed**: N agents plan concurrently, reducing planning time for large finding sets (see the sketch below)
- **Scalability**: MAX_PARALLEL=10 prevents resource exhaustion
- **Flexibility**: Batch size customizable via --batch-size (default: 5)
- **Isolation**: Each planning agent focuses on related findings (semantic grouping)
- **Reusable**: Aggregated plan can be re-executed without re-planning
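
To make the batching arithmetic concrete, here is a minimal sketch of how `--batch-size` and MAX_PARALLEL bound the number of sequential planning waves. `estimatePlanningWaves` is an illustrative helper, not part of the command; the constants mirror the documented defaults.

```typescript
// Illustrative only: models how --batch-size and MAX_PARALLEL shape planning rounds.
const MAX_PARALLEL = 10; // concurrent planning agents (documented default)

function estimatePlanningWaves(findingCount: number, batchSize = 5): number {
  const batchCount = Math.ceil(findingCount / batchSize);
  // Each wave launches up to MAX_PARALLEL agents; waves run back to back.
  return Math.ceil(batchCount / MAX_PARALLEL);
}

// 50 findings → 10 batches → 1 wave; 120 findings → 24 batches → 3 waves.
console.log(estimatePlanningWaves(50), estimatePlanningWaves(120)); // 1 3
```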
### 2. Intelligent Grouping Strategy
@@ -197,12 +219,30 @@ if (result.passRate < 100%) {
- Session creation: Generate fix-session-id (`fix-{timestamp}`)
- Directory structure: Create `{review-dir}/fixes/{fix-session-id}/` with subdirectories
- State files: Initialize active-fix-session.json (session marker)
- TodoWrite initialization: Set up 4-phase tracking
- TodoWrite initialization: Set up 5-phase tracking (including Phase 1.5)
**Phase 2: Planning Coordination**
- Launch @cli-planning-agent with findings data and project context
- Validate fix-plan.json output (schema conformance, includes metadata with session status)
- Load plan into memory for execution phase
**Phase 1.5: Intelligent Grouping & Batching**
- Load all findings metadata (id, file, dimension, severity, title)
- Semantic similarity analysis:
- Primary: Group by file proximity (same file or related modules)
- Secondary: Group by dimension affinity (same review dimension)
- Tertiary: Analyze title/description similarity (root cause clustering)
- Create batches respecting --batch-size (default: 5 findings per batch)
- Balance workload: Distribute high-severity findings across batches (see the sketch after this list)
- Output: Array of finding batches for parallel planning
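
A minimal sketch of the severity-balancing step, assuming a simple round-robin policy. The `Finding` shape and the redistribution logic are illustrative, not the command's exact algorithm; the actual batcher groups by file+dimension first, as shown under Agent Invocation Template below.

```typescript
// Illustrative only: round-robin redistribution of high-severity findings
// so no single planning agent carries all the risky work.
type Severity = 'critical' | 'high' | 'medium' | 'low';
interface Finding { id: string; severity: Severity; }

function balanceBySeverity(batches: Finding[][]): Finding[][] {
  const all = batches.flat();
  const high = all.filter((f) => f.severity === 'critical' || f.severity === 'high');
  const rest = all.filter((f) => f.severity !== 'critical' && f.severity !== 'high');
  const balanced: Finding[][] = batches.map(() => []);
  // Deal high-severity findings first so each batch gets a fair share, then fill.
  [...high, ...rest].forEach((f, i) => balanced[i % balanced.length].push(f));
  return balanced;
}
```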
**Phase 2: Parallel Planning Coordination**
- Determine concurrency: MIN(batch_count, MAX_PARALLEL=10)
- For each batch chunk (≤10 batches):
- Launch all agents in parallel with run_in_background=true
- Pass batch findings + project context + batch_id to each agent
- Each agent outputs: partial-plan-{batch-id}.json
- Collect results via TaskOutput (blocking until all complete)
- Aggregate partial plans:
- Merge execution groups (renumber group_ids sequentially: G1, G2, ...)
- Merge timelines (detect cross-batch dependencies, adjust stages)
- Resolve conflicts (same file in multiple batches → serialize; see the sketch after this list)
- Generate final fix-plan.json with aggregated metadata
- TodoWrite update: Mark planning complete, start execution
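
A sketch of the shared-file check behind that serialization rule. The `files` field on groups is an assumption about the partial-plan shape; the aggregation template later in this document serializes conservatively by appending batch stages in sequence.

```typescript
// Illustrative only: find files claimed by groups from different batches.
interface Group { group_id: string; files: string[]; }
interface PartialPlan { batch_id: number; groups: Group[]; }

function findCrossBatchConflicts(plans: PartialPlan[]): Map<string, string[]> {
  const owners = new Map<string, string[]>(); // file → "batch:group" ids
  for (const plan of plans) {
    for (const group of plan.groups) {
      for (const file of group.files) {
        const ids = owners.get(file) ?? [];
        ids.push(`${plan.batch_id}:${group.group_id}`);
        owners.set(file, ids);
      }
    }
  }
  // Files claimed by more than one group must be scheduled in serial stages.
  return new Map([...owners].filter(([, ids]) => ids.length > 1));
}
```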
**Phase 3: Execution Orchestration**
@@ -236,7 +276,10 @@ if (result.passRate < 100%) {
.workflow/active/WFS-{session-id}/.review/
├── fix-export-{timestamp}.json # Exported findings (input)
└── fixes/{fix-session-id}/
├── fix-plan.json # Planning agent output (execution plan with metadata)
├── partial-plan-1.json # Batch 1 partial plan (planning agent 1 output)
├── partial-plan-2.json # Batch 2 partial plan (planning agent 2 output)
├── partial-plan-N.json # Batch N partial plan (planning agent N output)
├── fix-plan.json # Aggregated execution plan (orchestrator merges partials)
├── fix-progress-1.json # Group 1 progress (planning agent init → agent updates)
├── fix-progress-2.json # Group 2 progress (planning agent init → agent updates)
├── fix-progress-3.json # Group 3 progress (planning agent init → agent updates)
@@ -246,28 +289,126 @@ if (result.passRate < 100%) {
```
**File Producers**:
- **Planning Agent**: `fix-plan.json` (with metadata), all `fix-progress-*.json` (initial state)
- **Execution Agents**: Update assigned `fix-progress-{N}.json` in real-time
- **Orchestrator**: Batches findings (Phase 1.5), aggregates partial plans → `fix-plan.json` (Phase 2), launches parallel planning agents
- **Planning Agents (N)**: Each outputs `partial-plan-{batch-id}.json` + initializes `fix-progress-*.json` for assigned groups
- **Execution Agents (M)**: Update assigned `fix-progress-{N}.json` in real-time
### Agent Invocation Template
**Planning Agent**:
**Phase 1.5: Intelligent Batching** (Orchestrator):
```javascript
// Load findings
const findings = JSON.parse(Read(exportFile));
const batchSize = flags.batchSize || 5;
// Semantic similarity analysis: group by file+dimension
const batches = [];
const grouped = new Map(); // key: "${file}:${dimension}"
for (const finding of findings) {
const key = `${finding.file || 'unknown'}:${finding.dimension || 'general'}`;
if (!grouped.has(key)) grouped.set(key, []);
grouped.get(key).push(finding);
}
// Create batches respecting batchSize
for (const [key, group] of grouped) {
while (group.length > 0) {
const batch = group.splice(0, batchSize);
batches.push({
batch_id: batches.length + 1,
findings: batch,
metadata: { primary_file: batch[0].file, primary_dimension: batch[0].dimension }
});
}
}
console.log(`Created ${batches.length} batches (up to ${batchSize} findings per batch)`);
```
**Phase 2: Parallel Planning** (Orchestrator launches N agents):
```javascript
const MAX_PARALLEL = 10;
const partialPlans = [];
// Process batches in chunks of MAX_PARALLEL
for (let i = 0; i < batches.length; i += MAX_PARALLEL) {
const chunk = batches.slice(i, i + MAX_PARALLEL);
const taskIds = [];
// Launch agents in parallel (run_in_background=true)
for (const batch of chunk) {
const taskId = Task({
subagent_type: "cli-planning-agent",
run_in_background: true,
description: `Plan batch ${batch.batch_id}: ${batch.findings.length} findings`,
prompt: planningPrompt(batch) // See Planning Agent template below
});
taskIds.push({ taskId, batch });
}
console.log(`Launched ${taskIds.length} planning agents...`);
// Collect results from this chunk (blocking)
for (const { taskId, batch } of taskIds) {
TaskOutput({ task_id: taskId, block: true }); // Block until this planning agent completes
const partialPlan = JSON.parse(Read(`${sessionDir}/partial-plan-${batch.batch_id}.json`));
partialPlans.push(partialPlan);
updateTodo(`Batch ${batch.batch_id}`, 'completed');
}
}
// Aggregate partial plans → fix-plan.json
const aggregatedPlan = { groups: [], timeline: [], metadata: { findings_count: findings.length, batch_count: batches.length } };
let groupCounter = 1;
const groupIdMap = new Map();
for (const partial of partialPlans) {
for (const group of partial.groups) {
const newGroupId = `G${groupCounter}`;
groupIdMap.set(`${partial.batch_id}:${group.group_id}`, newGroupId);
aggregatedPlan.groups.push({ ...group, group_id: newGroupId, progress_file: `fix-progress-${groupCounter}.json` });
groupCounter++;
}
}
// Merge timelines; appending batch stages in sequence serializes batches, avoiding cross-batch shared-file conflicts
let stageCounter = 1;
for (const partial of partialPlans) {
for (const stage of partial.timeline) {
aggregatedPlan.timeline.push({
...stage, stage_id: stageCounter,
groups: stage.groups.map(gid => groupIdMap.get(`${partial.batch_id}:${gid}`))
});
stageCounter++;
}
}
// Write aggregated plan + initialize progress files
Write(`${sessionDir}/fix-plan.json`, JSON.stringify(aggregatedPlan, null, 2));
for (let i = 1; i <= aggregatedPlan.groups.length; i++) {
// initProgressFile (orchestrator helper) builds the template-based initial progress state for a group
Write(`${sessionDir}/fix-progress-${i}.json`, JSON.stringify(initProgressFile(aggregatedPlan.groups[i-1]), null, 2));
}
```
**Planning Agent (Batch Mode - Partial Plan Only)**:
```javascript
Task({
subagent_type: "cli-planning-agent",
description: `Generate fix plan and initialize progress files for ${findings.length} findings`,
run_in_background: true,
description: `Plan batch ${batch.batch_id}: ${batch.findings.length} findings`,
prompt: `
## Task Objective
Analyze ${findings.length} code review findings and generate execution plan with intelligent grouping and timeline coordination.
Analyze code review findings in batch ${batch.batch_id} and generate **partial** execution plan.
## Input Data
Review Session: ${reviewId}
Fix Session ID: ${fixSessionId}
Total Findings: ${findings.length}
Batch ID: ${batch.batch_id}
Batch Findings: ${batch.findings.length}
Findings:
${JSON.stringify(findings, null, 2)}
${JSON.stringify(batch.findings, null, 2)}
Project Context:
- Structure: ${projectStructure}
@@ -276,31 +417,23 @@ Project Context:
## Output Requirements
### 1. fix-plan.json
Execute: cat ~/.claude/workflows/cli-templates/fix-plan-template.json
Generate execution plan following template structure:
### 1. partial-plan-${batch.batch_id}.json
Generate partial execution plan with structure:
{
"batch_id": ${batch.batch_id},
"groups": [...], // Groups created from batch findings (use local IDs: G1, G2, ...)
"timeline": [...], // Local timeline for this batch only
"metadata": {
"findings_count": ${batch.findings.length},
"groups_count": N,
"created_at": "ISO-8601-timestamp"
}
}
**Key Generation Rules**:
- **Metadata**: Populate fix_session_id, review_session_id, status ("planning"), created_at, started_at timestamps
- **Execution Strategy**: Choose approach (parallel/serial/hybrid) based on dependency analysis, set parallel_limit and stages count
- **Groups**: Create groups (G1, G2, ...) with intelligent grouping (see Analysis Requirements below), assign progress files (fix-progress-1.json, ...), populate fix_strategy with approach/complexity/test_pattern, assess risks, identify dependencies
- **Timeline**: Define stages respecting dependencies, set execution_mode per stage, map groups to stages, calculate critical path
### 2. fix-progress-{N}.json (one per group)
Execute: cat ~/.claude/workflows/cli-templates/fix-progress-template.json
For each group (G1, G2, G3, ...), generate fix-progress-{N}.json following template structure:
**Initial State Requirements**:
- Status: "pending", phase: "waiting"
- Timestamps: Set last_update to now, others null
- Findings: Populate from review findings with status "pending", all operation fields null
- Summary: Initialize all counters to zero
- Flow control: Empty implementation_approach array
- Errors: Empty array
**CRITICAL**: Ensure complete template structure - all fields must be present.
- **Groups**: Create groups with local IDs (G1, G2, ...) using intelligent grouping (file+dimension+root cause)
- **Timeline**: Define stages for this batch only (local dependencies within batch)
- **Progress Files**: DO NOT generate fix-progress-*.json here (orchestrator handles after aggregation)
## Analysis Requirements
@@ -318,7 +451,7 @@ Group findings using these criteria (in priority order):
- Consider test isolation (different test suites → different groups)
- Balance workload across groups for parallel execution
### Execution Strategy Determination
### Execution Strategy Determination (Local Only)
**Parallel Mode**: Use when groups are independent, no shared files
**Serial Mode**: Use when groups have dependencies or shared resources
@@ -346,21 +479,16 @@ For each group, determine:
## Output Files
Write to ${sessionDir}:
- ./fix-plan.json
- ./fix-progress-1.json
- ./fix-progress-2.json
- ./fix-progress-{N}.json (as many as groups created)
- ./partial-plan-${batch.batch_id}.json
## Quality Checklist
Before finalizing outputs:
- ✅ All findings assigned to exactly one group
- ✅ Group dependencies correctly identified
- ✅ Timeline stages respect dependencies
- ✅ All progress files have complete initial structure
- ✅ All batch findings assigned to exactly one group
- ✅ Group dependencies (within batch) correctly identified
- ✅ Timeline stages respect local dependencies
- ✅ Test patterns are valid and specific
- ✅ Risk assessments are realistic
- ✅ Estimated times are reasonable
`
})
```


@@ -40,36 +40,74 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
const reconnectDelayRef = useRef(RECONNECT_DELAY_BASE);
const mountedRef = useRef(true);
// Notification store for connection status
const setWsStatus = useNotificationStore((state) => state.setWsStatus);
const setWsLastMessage = useNotificationStore((state) => state.setWsLastMessage);
const incrementReconnectAttempts = useNotificationStore((state) => state.incrementReconnectAttempts);
const resetReconnectAttempts = useNotificationStore((state) => state.resetReconnectAttempts);
const addA2UINotification = useNotificationStore((state) => state.addA2UINotification);
// Store refs to prevent handler recreation - use useRef to keep stable references
const storeRefs = useRef({
// Notification store
setWsStatus: useNotificationStore((state) => state.setWsStatus),
setWsLastMessage: useNotificationStore((state) => state.setWsLastMessage),
incrementReconnectAttempts: useNotificationStore((state) => state.incrementReconnectAttempts),
resetReconnectAttempts: useNotificationStore((state) => state.resetReconnectAttempts),
addA2UINotification: useNotificationStore((state) => state.addA2UINotification),
// Execution store for state updates
const setExecutionStatus = useExecutionStore((state) => state.setExecutionStatus);
const setNodeStarted = useExecutionStore((state) => state.setNodeStarted);
const setNodeCompleted = useExecutionStore((state) => state.setNodeCompleted);
const setNodeFailed = useExecutionStore((state) => state.setNodeFailed);
const addLog = useExecutionStore((state) => state.addLog);
const completeExecution = useExecutionStore((state) => state.completeExecution);
const currentExecution = useExecutionStore((state) => state.currentExecution);
// Execution store
setExecutionStatus: useExecutionStore((state) => state.setExecutionStatus),
setNodeStarted: useExecutionStore((state) => state.setNodeStarted),
setNodeCompleted: useExecutionStore((state) => state.setNodeCompleted),
setNodeFailed: useExecutionStore((state) => state.setNodeFailed),
addLog: useExecutionStore((state) => state.addLog),
completeExecution: useExecutionStore((state) => state.completeExecution),
currentExecution: useExecutionStore((state) => state.currentExecution),
// Flow store for node status updates on canvas
const updateNode = useFlowStore((state) => state.updateNode);
// Flow store
updateNode: useFlowStore((state) => state.updateNode),
// CLI stream store for CLI output handling
const addOutput = useCliStreamStore((state) => state.addOutput);
// CLI stream store
addOutput: useCliStreamStore((state) => state.addOutput),
// Coordinator store for coordinator state updates
const updateNodeStatus = useCoordinatorStore((state) => state.updateNodeStatus);
const addCoordinatorLog = useCoordinatorStore((state) => state.addLog);
const setActiveQuestion = useCoordinatorStore((state) => state.setActiveQuestion);
const markExecutionComplete = useCoordinatorStore((state) => state.markExecutionComplete);
const coordinatorExecutionId = useCoordinatorStore((state) => state.currentExecutionId);
// Coordinator store
updateNodeStatus: useCoordinatorStore((state) => state.updateNodeStatus),
addCoordinatorLog: useCoordinatorStore((state) => state.addLog),
setActiveQuestion: useCoordinatorStore((state) => state.setActiveQuestion),
markExecutionComplete: useCoordinatorStore((state) => state.markExecutionComplete),
coordinatorExecutionId: useCoordinatorStore((state) => state.currentExecutionId),
});
// Refresh store references after every render so handlers always see fresh state.
// Uses getState() because store hooks must not be called inside an effect.
useEffect(() => {
const notification = useNotificationStore.getState();
const execution = useExecutionStore.getState();
const coordinator = useCoordinatorStore.getState();
storeRefs.current = {
// Notification store
setWsStatus: notification.setWsStatus,
setWsLastMessage: notification.setWsLastMessage,
incrementReconnectAttempts: notification.incrementReconnectAttempts,
resetReconnectAttempts: notification.resetReconnectAttempts,
addA2UINotification: notification.addA2UINotification,
// Execution store
setExecutionStatus: execution.setExecutionStatus,
setNodeStarted: execution.setNodeStarted,
setNodeCompleted: execution.setNodeCompleted,
setNodeFailed: execution.setNodeFailed,
addLog: execution.addLog,
completeExecution: execution.completeExecution,
currentExecution: execution.currentExecution,
// Flow store
updateNode: useFlowStore.getState().updateNode,
// CLI stream store
addOutput: useCliStreamStore.getState().addOutput,
// Coordinator store
updateNodeStatus: coordinator.updateNodeStatus,
addCoordinatorLog: coordinator.addLog,
setActiveQuestion: coordinator.setActiveQuestion,
markExecutionComplete: coordinator.markExecutionComplete,
coordinatorExecutionId: coordinator.currentExecutionId,
};
}); // No dependency array: run after every render to keep refs fresh
// Handle incoming WebSocket messages
// Note: Using refs via storeRefs to prevent handler recreation on every store change
const handleMessage = useCallback(
(event: MessageEvent) => {
// Guard against state updates after unmount
@@ -81,7 +119,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
const data = JSON.parse(event.data);
// Store last message for debugging
setWsLastMessage(data);
storeRefs.current.setWsLastMessage(data);
// Handle CLI messages
if (data.type?.startsWith('CLI_')) {
@@ -90,7 +128,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
const { executionId, tool, mode, timestamp } = data.payload;
// Add system message for CLI start
addOutput(executionId, {
storeRefs.current.addOutput(executionId, {
type: 'system',
content: `[${new Date(timestamp).toLocaleTimeString()}] CLI execution started: ${tool} (${mode || 'default'} mode)`,
timestamp: Date.now(),
@@ -119,7 +157,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
lines.forEach((line: string) => {
// Add non-empty lines, or single line if that's all we have
if (line.trim() || lines.length === 1) {
addOutput(executionId, {
storeRefs.current.addOutput(executionId, {
type: unitType as any,
content: line,
timestamp: Date.now(),
@@ -135,7 +173,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
const statusText = success ? 'completed successfully' : 'failed';
const durationText = duration ? ` (${duration}ms)` : '';
addOutput(executionId, {
storeRefs.current.addOutput(executionId, {
type: 'system',
content: `[${new Date().toLocaleTimeString()}] CLI execution ${statusText}${durationText}`,
timestamp: Date.now(),
@@ -150,7 +188,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
if (data.type === 'a2ui-surface') {
const parsed = SurfaceUpdateSchema.safeParse(data.payload);
if (parsed.success) {
addA2UINotification(parsed.data, 'Interactive UI');
storeRefs.current.addA2UINotification(parsed.data, 'Interactive UI');
} else {
console.warn('[WebSocket] Invalid A2UI surface:', parsed.error.issues);
}
@@ -159,6 +197,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
// Handle Coordinator messages
if (data.type?.startsWith('COORDINATOR_')) {
const { coordinatorExecutionId } = storeRefs.current;
// Only process messages for current coordinator execution
if (coordinatorExecutionId && data.executionId !== coordinatorExecutionId) {
return;
@@ -169,26 +208,26 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
case 'COORDINATOR_STATE_UPDATE':
// Check for completion
if (data.status === 'completed') {
markExecutionComplete(true);
storeRefs.current.markExecutionComplete(true);
} else if (data.status === 'failed') {
markExecutionComplete(false);
storeRefs.current.markExecutionComplete(false);
}
break;
case 'COORDINATOR_COMMAND_STARTED':
updateNodeStatus(data.nodeId, 'running');
storeRefs.current.updateNodeStatus(data.nodeId, 'running');
break;
case 'COORDINATOR_COMMAND_COMPLETED':
updateNodeStatus(data.nodeId, 'completed', data.result);
storeRefs.current.updateNodeStatus(data.nodeId, 'completed', data.result);
break;
case 'COORDINATOR_COMMAND_FAILED':
updateNodeStatus(data.nodeId, 'failed', undefined, data.error);
storeRefs.current.updateNodeStatus(data.nodeId, 'failed', undefined, data.error);
break;
case 'COORDINATOR_LOG_ENTRY':
addCoordinatorLog(
storeRefs.current.addCoordinatorLog(
data.log.message,
data.log.level,
data.log.nodeId,
@@ -197,7 +236,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
break;
case 'COORDINATOR_QUESTION_ASKED':
setActiveQuestion(data.question);
storeRefs.current.setActiveQuestion(data.question);
break;
case 'COORDINATOR_ANSWER_RECEIVED':
@@ -223,6 +262,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
const message = parsed.data as OrchestratorWebSocketMessage;
// Only process messages for current execution
const { currentExecution } = storeRefs.current;
if (currentExecution && message.execId !== currentExecution.execId) {
return;
}
@@ -230,39 +270,39 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
// Dispatch to execution store based on message type
switch (message.type) {
case 'ORCHESTRATOR_STATE_UPDATE':
setExecutionStatus(message.status, message.currentNodeId);
storeRefs.current.setExecutionStatus(message.status, message.currentNodeId);
// Check for completion
if (message.status === 'completed' || message.status === 'failed') {
completeExecution(message.status);
storeRefs.current.completeExecution(message.status);
}
break;
case 'ORCHESTRATOR_NODE_STARTED':
setNodeStarted(message.nodeId);
storeRefs.current.setNodeStarted(message.nodeId);
// Update canvas node status
updateNode(message.nodeId, { executionStatus: 'running' });
storeRefs.current.updateNode(message.nodeId, { executionStatus: 'running' });
break;
case 'ORCHESTRATOR_NODE_COMPLETED':
setNodeCompleted(message.nodeId, message.result);
storeRefs.current.setNodeCompleted(message.nodeId, message.result);
// Update canvas node status
updateNode(message.nodeId, {
storeRefs.current.updateNode(message.nodeId, {
executionStatus: 'completed',
executionResult: message.result,
});
break;
case 'ORCHESTRATOR_NODE_FAILED':
setNodeFailed(message.nodeId, message.error);
storeRefs.current.setNodeFailed(message.nodeId, message.error);
// Update canvas node status
updateNode(message.nodeId, {
storeRefs.current.updateNode(message.nodeId, {
executionStatus: 'failed',
executionError: message.error,
});
break;
case 'ORCHESTRATOR_LOG':
addLog(message.log as ExecutionLog);
storeRefs.current.addLog(message.log as ExecutionLog);
break;
}
@@ -272,69 +312,15 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
console.error('[WebSocket] Failed to parse message:', error);
}
},
[
currentExecution,
coordinatorExecutionId,
setWsLastMessage,
setExecutionStatus,
setNodeStarted,
setNodeCompleted,
setNodeFailed,
addLog,
completeExecution,
updateNode,
addOutput,
addA2UINotification,
updateNodeStatus,
addCoordinatorLog,
setActiveQuestion,
markExecutionComplete,
onMessage,
]
[onMessage] // Only dependency is onMessage, all other functions accessed via refs
);
// Connect to WebSocket
const connect = useCallback(() => {
if (!enabled) return;
// Construct WebSocket URL
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.host}/ws`;
try {
setWsStatus('connecting');
const ws = new WebSocket(wsUrl);
wsRef.current = ws;
ws.onopen = () => {
console.log('[WebSocket] Connected');
setWsStatus('connected');
resetReconnectAttempts();
reconnectDelayRef.current = RECONNECT_DELAY_BASE;
};
ws.onmessage = handleMessage;
ws.onclose = () => {
console.log('[WebSocket] Disconnected');
setWsStatus('disconnected');
wsRef.current = null;
scheduleReconnect();
};
ws.onerror = (error) => {
console.error('[WebSocket] Error:', error);
setWsStatus('error');
};
} catch (error) {
console.error('[WebSocket] Failed to connect:', error);
setWsStatus('error');
scheduleReconnect();
}
}, [enabled, handleMessage, setWsStatus, resetReconnectAttempts]);
// Use ref to avoid circular dependency with scheduleReconnect
const connectRef = useRef<(() => void) | null>(null);
// Schedule reconnection with exponential backoff
// Define this first to avoid circular dependency
const scheduleReconnect = useCallback(() => {
if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current);
@@ -343,11 +329,11 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
const delay = reconnectDelayRef.current;
console.log(`[WebSocket] Reconnecting in ${delay}ms...`);
setWsStatus('reconnecting');
incrementReconnectAttempts();
storeRefs.current.setWsStatus('reconnecting');
storeRefs.current.incrementReconnectAttempts();
reconnectTimeoutRef.current = setTimeout(() => {
connect();
connectRef.current?.();
}, delay);
// Increase delay for next attempt (exponential backoff)
@@ -355,7 +341,50 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
reconnectDelayRef.current * RECONNECT_DELAY_MULTIPLIER,
RECONNECT_DELAY_MAX
);
}, [connect, setWsStatus, incrementReconnectAttempts]);
}, []); // No dependencies - uses connectRef
const connect = useCallback(() => {
if (!enabled) return;
// Construct WebSocket URL
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.host}/ws`;
try {
storeRefs.current.setWsStatus('connecting');
const ws = new WebSocket(wsUrl);
wsRef.current = ws;
ws.onopen = () => {
console.log('[WebSocket] Connected');
storeRefs.current.setWsStatus('connected');
storeRefs.current.resetReconnectAttempts();
reconnectDelayRef.current = RECONNECT_DELAY_BASE;
};
ws.onmessage = handleMessage;
ws.onclose = () => {
console.log('[WebSocket] Disconnected');
storeRefs.current.setWsStatus('disconnected');
wsRef.current = null;
scheduleReconnect();
};
ws.onerror = (error) => {
console.error('[WebSocket] Error:', error);
storeRefs.current.setWsStatus('error');
};
} catch (error) {
console.error('[WebSocket] Failed to connect:', error);
storeRefs.current.setWsStatus('error');
scheduleReconnect();
}
}, [enabled, handleMessage, scheduleReconnect]);
// Update connect ref after connect is defined
connectRef.current = connect;
// Send message through WebSocket
const send = useCallback((message: unknown) => {
@@ -367,13 +396,14 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
}, []);
// Manual reconnect
// Use connectRef to avoid depending on connect
const reconnect = useCallback(() => {
if (wsRef.current) {
wsRef.current.close();
}
reconnectDelayRef.current = RECONNECT_DELAY_BASE;
connect();
}, [connect]);
connectRef.current?.();
}, []); // No dependencies - uses connectRef
// Check connection status
const isConnected = wsRef.current?.readyState === WebSocket.OPEN;
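
For reference, a distilled, standalone sketch of the pattern this commit applies, assuming Zustand stores and simplified to one store with one action; it is not the hook's exact code.

```typescript
import { useCallback, useEffect, useRef } from 'react';
import { create } from 'zustand';

// A single store with one action, standing in for the five stores in the hook.
const useNotificationStore = create<{ count: number; bump: () => void }>((set) => ({
  count: 0,
  bump: () => set((s) => ({ count: s.count + 1 })),
}));

export function useStableHandler(): () => void {
  // getState() reads the store imperatively (no subscription, no hook call),
  // so the ref can be refreshed anywhere, including inside an effect.
  const storeRefs = useRef(useNotificationStore.getState());

  useEffect(() => {
    storeRefs.current = useNotificationStore.getState();
  }); // no dependency array: refresh after every render

  // Empty dependency list: the callback identity never changes, so consumers
  // (e.g. ws.onmessage = handler) never need to rebind it.
  return useCallback(() => {
    storeRefs.current.bump();
  }, []);
}
```

Note the trade-off: with `getState()` the ref only refreshes when something re-renders the component, which is why the hook above also keeps selector subscriptions in its `useRef` initializer.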