From 346c87a7065dd932e7ca8ca69cedfd2af174a274 Mon Sep 17 00:00:00 2001 From: catlog22 Date: Wed, 4 Feb 2026 17:29:30 +0800 Subject: [PATCH] refactor: optimize useWebSocket hook by consolidating store references and improving handler stability --- .claude/commands/workflow/review-cycle-fix.md | 276 +++++++++++++----- ccw/frontend/src/hooks/useWebSocket.ts | 248 +++++++++------- 2 files changed, 341 insertions(+), 183 deletions(-) diff --git a/.claude/commands/workflow/review-cycle-fix.md b/.claude/commands/workflow/review-cycle-fix.md index 0d47aa6a..103a688d 100644 --- a/.claude/commands/workflow/review-cycle-fix.md +++ b/.claude/commands/workflow/review-cycle-fix.md @@ -1,7 +1,7 @@ --- name: review-cycle-fix description: Automated fixing of code review findings with AI-powered planning and coordinated execution. Uses intelligent grouping, multi-stage timeline coordination, and test-driven verification. -argument-hint: " [--resume] [--max-iterations=N]" +argument-hint: " [--resume] [--max-iterations=N] [--batch-size=N]" allowed-tools: Skill(*), TodoWrite(*), Read(*), Bash(*), Task(*), Edit(*), Write(*) --- @@ -21,37 +21,45 @@ allowed-tools: Skill(*), TodoWrite(*), Read(*), Bash(*), Task(*), Edit(*), Write # Custom max retry attempts per finding /workflow:review-cycle-fix .workflow/active/WFS-123/.review/ --max-iterations=5 + +# Custom batch size for parallel planning (default: 5 findings per batch) +/workflow:review-cycle-fix .workflow/active/WFS-123/.review/ --batch-size=3 ``` **Fix Source**: Exported findings from review cycle dashboard **Output Directory**: `{review-dir}/fixes/{fix-session-id}/` (within session .review/) **Default Max Iterations**: 3 (per finding, adjustable) +**Default Batch Size**: 5 (findings per planning batch, adjustable) +**Max Parallel Agents**: 10 (concurrent planning agents) **CLI Tools**: @cli-planning-agent (planning), @cli-execute-agent (fixing) ## What & Why ### Core Concept -Automated fix orchestrator with **two-phase 
architecture**: AI-powered planning followed by coordinated parallel/serial execution. Generates fix timeline with intelligent grouping and dependency analysis, then executes fixes with conservative test verification. +Automated fix orchestrator with **parallel planning architecture**: Multiple AI agents analyze findings concurrently in batches, then coordinate parallel/serial execution. Generates fix timeline with intelligent grouping and dependency analysis, executes fixes with conservative test verification. **Fix Process**: -- **Planning Phase**: AI analyzes findings, generates fix plan with grouping and execution strategy -- **Execution Phase**: Main orchestrator coordinates agents per timeline stages +- **Batching Phase (1.5)**: Orchestrator groups findings by file+dimension similarity, creates batches +- **Planning Phase (2)**: Up to 10 agents plan batches in parallel, generate partial plans, orchestrator aggregates +- **Execution Phase (3)**: Main orchestrator coordinates agents per aggregated timeline stages +- **Parallel Efficiency**: Customizable batch size (default: 5), MAX_PARALLEL=10 agents - **No rigid structure**: Adapts to task requirements, not bound to fixed JSON format **vs Manual Fixing**: - **Manual**: Developer reviews findings one-by-one, fixes sequentially -- **Automated**: AI groups related issues, executes in optimal parallel/serial order with automatic test verification +- **Automated**: AI groups related issues, multiple agents plan in parallel, executes in optimal parallel/serial order with automatic test verification ### Value Proposition -1. **Intelligent Planning**: AI-powered analysis identifies optimal grouping and execution strategy -2. **Multi-stage Coordination**: Supports complex parallel + serial execution with dependency management -3. **Conservative Safety**: Mandatory test verification with automatic rollback on failure -4. **Resume Support**: Checkpoint-based recovery for interrupted sessions +1. 
**Parallel Planning**: Multiple agents analyze findings concurrently, reducing planning time for large batches (10+ findings) +2. **Intelligent Batching**: Semantic similarity grouping ensures related findings are analyzed together +3. **Multi-stage Coordination**: Supports complex parallel + serial execution with cross-batch dependency management +4. **Conservative Safety**: Mandatory test verification with automatic rollback on failure +5. **Resume Support**: Checkpoint-based recovery for interrupted sessions ### Orchestrator Boundary (CRITICAL) - **ONLY command** for automated review finding fixes -- Manages: Planning phase coordination, stage-based execution, agent scheduling, progress tracking -- Delegates: Fix planning to @cli-planning-agent, fix execution to @cli-execute-agent +- Manages: Intelligent batching (Phase 1.5), parallel planning coordination (launch N agents), plan aggregation, stage-based execution, agent scheduling, progress tracking +- Delegates: Batch planning to @cli-planning-agent, fix execution to @cli-execute-agent ### Execution Flow @@ -60,12 +68,22 @@ Automated fix orchestrator with **two-phase architecture**: AI-powered planning Phase 1: Discovery & Initialization └─ Validate export file, create fix session structure, initialize state files -Phase 2: Planning Coordination (@cli-planning-agent) - ├─ Analyze findings for patterns and dependencies - ├─ Group by file + dimension + root cause similarity - ├─ Determine execution strategy (parallel/serial/hybrid) - ├─ Generate fix timeline with stages - └─ Output: fix-plan.json +Phase 1.5: Intelligent Grouping & Batching + ├─ Analyze findings metadata (file, dimension, severity) + ├─ Group by semantic similarity (file proximity + dimension affinity) + ├─ Create batches respecting --batch-size (default: 5) + └─ Output: Finding batches for parallel planning + +Phase 2: Parallel Planning Coordination (@cli-planning-agent × N) + ├─ Launch MAX_PARALLEL planning agents concurrently (default: 10) + 
├─ Each agent processes one batch: + │ ├─ Analyze findings for patterns and dependencies + │ ├─ Group by file + dimension + root cause similarity + │ ├─ Determine execution strategy (parallel/serial/hybrid) + │ ├─ Generate fix timeline with stages + │ └─ Output: partial-plan-{batch-id}.json + ├─ Collect results from all agents + └─ Aggregate: Merge partial plans → fix-plan.json (resolve cross-batch dependencies) Phase 3: Execution Orchestration (Stage-based) For each timeline stage: @@ -91,25 +109,29 @@ Phase 5: Session Completion (Optional) | Agent | Responsibility | |-------|---------------| -| **Orchestrator** | Input validation, session management, planning coordination, stage-based execution scheduling, progress tracking, aggregation | -| **@cli-planning-agent** | Findings analysis, intelligent grouping (file+dimension+root cause), execution strategy determination (parallel/serial/hybrid), timeline generation with dependency mapping | +| **Orchestrator** | Input validation, session management, intelligent batching (Phase 1.5), parallel planning coordination (launch N agents), plan aggregation (merge partial plans, resolve cross-batch dependencies), stage-based execution scheduling, progress tracking, result aggregation | +| **@cli-planning-agent** | Batch findings analysis, intelligent grouping (file+dimension+root cause), execution strategy determination (parallel/serial/hybrid), timeline generation with dependency mapping, partial plan output | | **@cli-execute-agent** | Fix execution per group, code context analysis, Edit tool operations, test verification, git rollback on failure, completion JSON generation | ## Enhanced Features -### 1. Two-Phase Architecture +### 1. 
Parallel Planning Architecture -**Phase Separation**: +**Batch Processing Strategy**: -| Phase | Agent | Output | Purpose | -|-------|-------|--------|---------| -| **Planning** | @cli-planning-agent | fix-plan.json | Analyze findings, group intelligently, determine optimal execution strategy | -| **Execution** | @cli-execute-agent | completions/*.json | Execute fixes per plan with test verification and rollback | +| Phase | Agent Count | Input | Output | Purpose | +|-------|-------------|-------|--------|---------| +| **Batching (1.5)** | Orchestrator | All findings | Finding batches | Semantic grouping by file+dimension, respecting --batch-size | +| **Planning (2)** | N agents (≤10) | 1 batch each | partial-plan-{batch-id}.json | Analyze batch in parallel, generate execution groups and timeline | +| **Aggregation (2)** | Orchestrator | All partial plans | fix-plan.json | Merge timelines, resolve cross-batch dependencies | +| **Execution (3)** | M agents (dynamic) | 1 group each | fix-progress-{N}.json | Execute fixes per aggregated plan with test verification | **Benefits**: -- Clear separation of concerns (analysis vs execution) -- Reusable plans (can re-execute without re-planning) -- Better error isolation (planning failures vs execution failures) +- **Speed**: N agents plan concurrently, reducing planning time for large batches +- **Scalability**: MAX_PARALLEL=10 prevents resource exhaustion +- **Flexibility**: Batch size customizable via --batch-size (default: 5) +- **Isolation**: Each planning agent focuses on related findings (semantic grouping) +- **Reusable**: Aggregated plan can be re-executed without re-planning ### 2. 
Intelligent Grouping Strategy @@ -197,12 +219,30 @@ if (result.passRate < 100%) { - Session creation: Generate fix-session-id (`fix-{timestamp}`) - Directory structure: Create `{review-dir}/fixes/{fix-session-id}/` with subdirectories - State files: Initialize active-fix-session.json (session marker) -- TodoWrite initialization: Set up 4-phase tracking +- TodoWrite initialization: Set up 5-phase tracking (including Phase 1.5) -**Phase 2: Planning Coordination** -- Launch @cli-planning-agent with findings data and project context -- Validate fix-plan.json output (schema conformance, includes metadata with session status) -- Load plan into memory for execution phase +**Phase 1.5: Intelligent Grouping & Batching** +- Load all findings metadata (id, file, dimension, severity, title) +- Semantic similarity analysis: + - Primary: Group by file proximity (same file or related modules) + - Secondary: Group by dimension affinity (same review dimension) + - Tertiary: Analyze title/description similarity (root cause clustering) +- Create batches respecting --batch-size (default: 5 findings per batch) +- Balance workload: Distribute high-severity findings across batches +- Output: Array of finding batches for parallel planning + +**Phase 2: Parallel Planning Coordination** +- Determine concurrency: MIN(batch_count, MAX_PARALLEL=10) +- For each batch chunk (≤10 batches): + - Launch all agents in parallel with run_in_background=true + - Pass batch findings + project context + batch_id to each agent + - Each agent outputs: partial-plan-{batch-id}.json +- Collect results via TaskOutput (blocking until all complete) +- Aggregate partial plans: + - Merge execution groups (renumber group_ids sequentially: G1, G2, ...) 
+ - Merge timelines (detect cross-batch dependencies, adjust stages) + - Resolve conflicts (same file in multiple batches → serialize) +- Generate final fix-plan.json with aggregated metadata - TodoWrite update: Mark planning complete, start execution **Phase 3: Execution Orchestration** @@ -236,7 +276,10 @@ if (result.passRate < 100%) { .workflow/active/WFS-{session-id}/.review/ ├── fix-export-{timestamp}.json # Exported findings (input) └── fixes/{fix-session-id}/ - ├── fix-plan.json # Planning agent output (execution plan with metadata) + ├── partial-plan-1.json # Batch 1 partial plan (planning agent 1 output) + ├── partial-plan-2.json # Batch 2 partial plan (planning agent 2 output) + ├── partial-plan-N.json # Batch N partial plan (planning agent N output) + ├── fix-plan.json # Aggregated execution plan (orchestrator merges partials) ├── fix-progress-1.json # Group 1 progress (planning agent init → agent updates) ├── fix-progress-2.json # Group 2 progress (planning agent init → agent updates) ├── fix-progress-3.json # Group 3 progress (planning agent init → agent updates) @@ -246,28 +289,126 @@ if (result.passRate < 100%) { ``` **File Producers**: -- **Planning Agent**: `fix-plan.json` (with metadata), all `fix-progress-*.json` (initial state) -- **Execution Agents**: Update assigned `fix-progress-{N}.json` in real-time +- **Orchestrator**: Batches findings (Phase 1.5), launches parallel planning agents, aggregates partial plans → `fix-plan.json`, and initializes `fix-progress-*.json` for the aggregated groups (Phase 2) +- **Planning Agents (N)**: Each outputs only `partial-plan-{batch-id}.json` (no progress files; the orchestrator creates them after aggregation) +- **Execution Agents (M)**: Update assigned `fix-progress-{N}.json` in real-time ### Agent Invocation Template -**Planning Agent**: +**Phase 1.5: Intelligent Batching** (Orchestrator): +```javascript +// Load findings +const findings = JSON.parse(Read(exportFile)); +const batchSize = flags.batchSize || 5; + +// Semantic similarity analysis: group by file+dimension +const batches = 
[]; +const grouped = new Map(); // key: "${file}:${dimension}" + +for (const finding of findings) { + const key = `${finding.file || 'unknown'}:${finding.dimension || 'general'}`; + if (!grouped.has(key)) grouped.set(key, []); + grouped.get(key).push(finding); +} + +// Create batches respecting batchSize +for (const [key, group] of grouped) { + while (group.length > 0) { + const batch = group.splice(0, batchSize); + batches.push({ + batch_id: batches.length + 1, + findings: batch, + metadata: { primary_file: batch[0].file, primary_dimension: batch[0].dimension } + }); + } +} + +console.log(`Created ${batches.length} batches (${batchSize} findings per batch)`); +``` + +**Phase 2: Parallel Planning** (Orchestrator launches N agents): +```javascript +const MAX_PARALLEL = 10; +const partialPlans = []; + +// Process batches in chunks of MAX_PARALLEL +for (let i = 0; i < batches.length; i += MAX_PARALLEL) { + const chunk = batches.slice(i, i + MAX_PARALLEL); + const taskIds = []; + + // Launch agents in parallel (run_in_background=true) + for (const batch of chunk) { + const taskId = Task({ + subagent_type: "cli-planning-agent", + run_in_background: true, + description: `Plan batch ${batch.batch_id}: ${batch.findings.length} findings`, + prompt: planningPrompt(batch) // See Planning Agent template below + }); + taskIds.push({ taskId, batch }); + } + + console.log(`Launched ${taskIds.length} planning agents...`); + + // Collect results from this chunk (blocking) + for (const { taskId, batch } of taskIds) { + const result = TaskOutput({ task_id: taskId, block: true }); + const partialPlan = JSON.parse(Read(`${sessionDir}/partial-plan-${batch.batch_id}.json`)); + partialPlans.push(partialPlan); + updateTodo(`Batch ${batch.batch_id}`, 'completed'); + } +} + +// Aggregate partial plans → fix-plan.json +let groupCounter = 1; +const groupIdMap = new Map(); + +for (const partial of partialPlans) { + for (const group of partial.groups) { + const newGroupId = `G${groupCounter}`; + 
groupIdMap.set(`${partial.batch_id}:${group.group_id}`, newGroupId); + aggregatedPlan.groups.push({ ...group, group_id: newGroupId, progress_file: `fix-progress-${groupCounter}.json` }); + groupCounter++; + } +} + +// Merge timelines, resolve cross-batch conflicts (shared files → serialize) +let stageCounter = 1; +for (const partial of partialPlans) { + for (const stage of partial.timeline) { + aggregatedPlan.timeline.push({ + ...stage, stage_id: stageCounter, + groups: stage.groups.map(gid => groupIdMap.get(`${partial.batch_id}:${gid}`)) + }); + stageCounter++; + } +} + +// Write aggregated plan + initialize progress files +Write(`${sessionDir}/fix-plan.json`, JSON.stringify(aggregatedPlan, null, 2)); +for (let i = 1; i <= aggregatedPlan.groups.length; i++) { + Write(`${sessionDir}/fix-progress-${i}.json`, JSON.stringify(initProgressFile(aggregatedPlan.groups[i-1]), null, 2)); +} +``` + +**Planning Agent (Batch Mode - Partial Plan Only)**: ```javascript Task({ subagent_type: "cli-planning-agent", - description: `Generate fix plan and initialize progress files for ${findings.length} findings`, + run_in_background: true, + description: `Plan batch ${batch.batch_id}: ${batch.findings.length} findings`, prompt: ` ## Task Objective -Analyze ${findings.length} code review findings and generate execution plan with intelligent grouping and timeline coordination. +Analyze code review findings in batch ${batch.batch_id} and generate **partial** execution plan. ## Input Data Review Session: ${reviewId} Fix Session ID: ${fixSessionId} -Total Findings: ${findings.length} +Batch ID: ${batch.batch_id} +Batch Findings: ${batch.findings.length} Findings: -${JSON.stringify(findings, null, 2)} +${JSON.stringify(batch.findings, null, 2)} Project Context: - Structure: ${projectStructure} @@ -276,31 +417,23 @@ Project Context: ## Output Requirements -### 1. 
fix-plan.json -Execute: cat ~/.claude/workflows/cli-templates/fix-plan-template.json - -Generate execution plan following template structure: +### 1. partial-plan-${batch.batch_id}.json +Generate partial execution plan with structure: +{ + "batch_id": ${batch.batch_id}, + "groups": [...], // Groups created from batch findings (use local IDs: G1, G2, ...) + "timeline": [...], // Local timeline for this batch only + "metadata": { + "findings_count": ${batch.findings.length}, + "groups_count": N, + "created_at": "ISO-8601-timestamp" + } +} **Key Generation Rules**: -- **Metadata**: Populate fix_session_id, review_session_id, status ("planning"), created_at, started_at timestamps -- **Execution Strategy**: Choose approach (parallel/serial/hybrid) based on dependency analysis, set parallel_limit and stages count -- **Groups**: Create groups (G1, G2, ...) with intelligent grouping (see Analysis Requirements below), assign progress files (fix-progress-1.json, ...), populate fix_strategy with approach/complexity/test_pattern, assess risks, identify dependencies -- **Timeline**: Define stages respecting dependencies, set execution_mode per stage, map groups to stages, calculate critical path - -### 2. fix-progress-{N}.json (one per group) -Execute: cat ~/.claude/workflows/cli-templates/fix-progress-template.json - -For each group (G1, G2, G3, ...), generate fix-progress-{N}.json following template structure: - -**Initial State Requirements**: -- Status: "pending", phase: "waiting" -- Timestamps: Set last_update to now, others null -- Findings: Populate from review findings with status "pending", all operation fields null -- Summary: Initialize all counters to zero -- Flow control: Empty implementation_approach array -- Errors: Empty array - -**CRITICAL**: Ensure complete template structure - all fields must be present. +- **Groups**: Create groups with local IDs (G1, G2, ...) 
using intelligent grouping (file+dimension+root cause) +- **Timeline**: Define stages for this batch only (local dependencies within batch) +- **Progress Files**: DO NOT generate fix-progress-*.json here (orchestrator handles after aggregation) ## Analysis Requirements @@ -318,7 +451,7 @@ Group findings using these criteria (in priority order): - Consider test isolation (different test suites → different groups) - Balance workload across groups for parallel execution -### Execution Strategy Determination +### Execution Strategy Determination (Local Only) **Parallel Mode**: Use when groups are independent, no shared files **Serial Mode**: Use when groups have dependencies or shared resources @@ -346,21 +479,16 @@ For each group, determine: ## Output Files Write to ${sessionDir}: -- ./fix-plan.json -- ./fix-progress-1.json -- ./fix-progress-2.json -- ./fix-progress-{N}.json (as many as groups created) +- ./partial-plan-${batch.batch_id}.json ## Quality Checklist Before finalizing outputs: -- ✅ All findings assigned to exactly one group -- ✅ Group dependencies correctly identified -- ✅ Timeline stages respect dependencies -- ✅ All progress files have complete initial structure +- ✅ All batch findings assigned to exactly one group +- ✅ Group dependencies (within batch) correctly identified +- ✅ Timeline stages respect local dependencies - ✅ Test patterns are valid and specific - ✅ Risk assessments are realistic -- ✅ Estimated times are reasonable ` }) ``` diff --git a/ccw/frontend/src/hooks/useWebSocket.ts b/ccw/frontend/src/hooks/useWebSocket.ts index d2a08e5d..e043db30 100644 --- a/ccw/frontend/src/hooks/useWebSocket.ts +++ b/ccw/frontend/src/hooks/useWebSocket.ts @@ -40,36 +40,74 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet const reconnectDelayRef = useRef(RECONNECT_DELAY_BASE); const mountedRef = useRef(true); - // Notification store for connection status - const setWsStatus = useNotificationStore((state) => 
state.setWsStatus); - const setWsLastMessage = useNotificationStore((state) => state.setWsLastMessage); - const incrementReconnectAttempts = useNotificationStore((state) => state.incrementReconnectAttempts); - const resetReconnectAttempts = useNotificationStore((state) => state.resetReconnectAttempts); - const addA2UINotification = useNotificationStore((state) => state.addA2UINotification); + // Store refs to prevent handler recreation - use useRef to keep stable references + const storeRefs = useRef({ + // Notification store + setWsStatus: useNotificationStore((state) => state.setWsStatus), + setWsLastMessage: useNotificationStore((state) => state.setWsLastMessage), + incrementReconnectAttempts: useNotificationStore((state) => state.incrementReconnectAttempts), + resetReconnectAttempts: useNotificationStore((state) => state.resetReconnectAttempts), + addA2UINotification: useNotificationStore((state) => state.addA2UINotification), - // Execution store for state updates - const setExecutionStatus = useExecutionStore((state) => state.setExecutionStatus); - const setNodeStarted = useExecutionStore((state) => state.setNodeStarted); - const setNodeCompleted = useExecutionStore((state) => state.setNodeCompleted); - const setNodeFailed = useExecutionStore((state) => state.setNodeFailed); - const addLog = useExecutionStore((state) => state.addLog); - const completeExecution = useExecutionStore((state) => state.completeExecution); - const currentExecution = useExecutionStore((state) => state.currentExecution); + // Execution store + setExecutionStatus: useExecutionStore((state) => state.setExecutionStatus), + setNodeStarted: useExecutionStore((state) => state.setNodeStarted), + setNodeCompleted: useExecutionStore((state) => state.setNodeCompleted), + setNodeFailed: useExecutionStore((state) => state.setNodeFailed), + addLog: useExecutionStore((state) => state.addLog), + completeExecution: useExecutionStore((state) => state.completeExecution), + currentExecution: 
useExecutionStore((state) => state.currentExecution), - // Flow store for node status updates on canvas - const updateNode = useFlowStore((state) => state.updateNode); + // Flow store + updateNode: useFlowStore((state) => state.updateNode), - // CLI stream store for CLI output handling - const addOutput = useCliStreamStore((state) => state.addOutput); + // CLI stream store + addOutput: useCliStreamStore((state) => state.addOutput), - // Coordinator store for coordinator state updates - const updateNodeStatus = useCoordinatorStore((state) => state.updateNodeStatus); - const addCoordinatorLog = useCoordinatorStore((state) => state.addLog); - const setActiveQuestion = useCoordinatorStore((state) => state.setActiveQuestion); - const markExecutionComplete = useCoordinatorStore((state) => state.markExecutionComplete); - const coordinatorExecutionId = useCoordinatorStore((state) => state.currentExecutionId); + // Coordinator store + updateNodeStatus: useCoordinatorStore((state) => state.updateNodeStatus), + addCoordinatorLog: useCoordinatorStore((state) => state.addLog), + setActiveQuestion: useCoordinatorStore((state) => state.setActiveQuestion), + markExecutionComplete: useCoordinatorStore((state) => state.markExecutionComplete), + coordinatorExecutionId: useCoordinatorStore((state) => state.currentExecutionId), + }); + + // Update refs periodically to ensure they have fresh store references + useEffect(() => { + storeRefs.current = { + // Notification store + setWsStatus: useNotificationStore((state) => state.setWsStatus), + setWsLastMessage: useNotificationStore((state) => state.setWsLastMessage), + incrementReconnectAttempts: useNotificationStore((state) => state.incrementReconnectAttempts), + resetReconnectAttempts: useNotificationStore((state) => state.resetReconnectAttempts), + addA2UINotification: useNotificationStore((state) => state.addA2UINotification), + + // Execution store + setExecutionStatus: useExecutionStore((state) => state.setExecutionStatus), + 
setNodeStarted: useExecutionStore((state) => state.setNodeStarted), + setNodeCompleted: useExecutionStore((state) => state.setNodeCompleted), + setNodeFailed: useExecutionStore((state) => state.setNodeFailed), + addLog: useExecutionStore((state) => state.addLog), + completeExecution: useExecutionStore((state) => state.completeExecution), + currentExecution: useExecutionStore((state) => state.currentExecution), + + // Flow store + updateNode: useFlowStore((state) => state.updateNode), + + // CLI stream store + addOutput: useCliStreamStore((state) => state.addOutput), + + // Coordinator store + updateNodeStatus: useCoordinatorStore((state) => state.updateNodeStatus), + addCoordinatorLog: useCoordinatorStore((state) => state.addLog), + setActiveQuestion: useCoordinatorStore((state) => state.setActiveQuestion), + markExecutionComplete: useCoordinatorStore((state) => state.markExecutionComplete), + coordinatorExecutionId: useCoordinatorStore((state) => state.currentExecutionId), + }; + }); // Run on every render to keep refs fresh // Handle incoming WebSocket messages + // Note: Using refs via storeRefs to prevent handler recreation on every store change const handleMessage = useCallback( (event: MessageEvent) => { // Guard against state updates after unmount @@ -81,7 +119,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet const data = JSON.parse(event.data); // Store last message for debugging - setWsLastMessage(data); + storeRefs.current.setWsLastMessage(data); // Handle CLI messages if (data.type?.startsWith('CLI_')) { @@ -90,7 +128,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet const { executionId, tool, mode, timestamp } = data.payload; // Add system message for CLI start - addOutput(executionId, { + storeRefs.current.addOutput(executionId, { type: 'system', content: `[${new Date(timestamp).toLocaleTimeString()}] CLI execution started: ${tool} (${mode || 'default'} mode)`, timestamp: 
Date.now(), @@ -119,7 +157,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet lines.forEach((line: string) => { // Add non-empty lines, or single line if that's all we have if (line.trim() || lines.length === 1) { - addOutput(executionId, { + storeRefs.current.addOutput(executionId, { type: unitType as any, content: line, timestamp: Date.now(), @@ -135,7 +173,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet const statusText = success ? 'completed successfully' : 'failed'; const durationText = duration ? ` (${duration}ms)` : ''; - addOutput(executionId, { + storeRefs.current.addOutput(executionId, { type: 'system', content: `[${new Date().toLocaleTimeString()}] CLI execution ${statusText}${durationText}`, timestamp: Date.now(), @@ -150,7 +188,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet if (data.type === 'a2ui-surface') { const parsed = SurfaceUpdateSchema.safeParse(data.payload); if (parsed.success) { - addA2UINotification(parsed.data, 'Interactive UI'); + storeRefs.current.addA2UINotification(parsed.data, 'Interactive UI'); } else { console.warn('[WebSocket] Invalid A2UI surface:', parsed.error.issues); } @@ -159,6 +197,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet // Handle Coordinator messages if (data.type?.startsWith('COORDINATOR_')) { + const { coordinatorExecutionId } = storeRefs.current; // Only process messages for current coordinator execution if (coordinatorExecutionId && data.executionId !== coordinatorExecutionId) { return; @@ -169,26 +208,26 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet case 'COORDINATOR_STATE_UPDATE': // Check for completion if (data.status === 'completed') { - markExecutionComplete(true); + storeRefs.current.markExecutionComplete(true); } else if (data.status === 'failed') { - markExecutionComplete(false); + 
storeRefs.current.markExecutionComplete(false); } break; case 'COORDINATOR_COMMAND_STARTED': - updateNodeStatus(data.nodeId, 'running'); + storeRefs.current.updateNodeStatus(data.nodeId, 'running'); break; case 'COORDINATOR_COMMAND_COMPLETED': - updateNodeStatus(data.nodeId, 'completed', data.result); + storeRefs.current.updateNodeStatus(data.nodeId, 'completed', data.result); break; case 'COORDINATOR_COMMAND_FAILED': - updateNodeStatus(data.nodeId, 'failed', undefined, data.error); + storeRefs.current.updateNodeStatus(data.nodeId, 'failed', undefined, data.error); break; case 'COORDINATOR_LOG_ENTRY': - addCoordinatorLog( + storeRefs.current.addCoordinatorLog( data.log.message, data.log.level, data.log.nodeId, @@ -197,7 +236,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet break; case 'COORDINATOR_QUESTION_ASKED': - setActiveQuestion(data.question); + storeRefs.current.setActiveQuestion(data.question); break; case 'COORDINATOR_ANSWER_RECEIVED': @@ -223,6 +262,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet const message = parsed.data as OrchestratorWebSocketMessage; // Only process messages for current execution + const { currentExecution } = storeRefs.current; if (currentExecution && message.execId !== currentExecution.execId) { return; } @@ -230,39 +270,39 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet // Dispatch to execution store based on message type switch (message.type) { case 'ORCHESTRATOR_STATE_UPDATE': - setExecutionStatus(message.status, message.currentNodeId); + storeRefs.current.setExecutionStatus(message.status, message.currentNodeId); // Check for completion if (message.status === 'completed' || message.status === 'failed') { - completeExecution(message.status); + storeRefs.current.completeExecution(message.status); } break; case 'ORCHESTRATOR_NODE_STARTED': - setNodeStarted(message.nodeId); + 
storeRefs.current.setNodeStarted(message.nodeId); // Update canvas node status - updateNode(message.nodeId, { executionStatus: 'running' }); + storeRefs.current.updateNode(message.nodeId, { executionStatus: 'running' }); break; case 'ORCHESTRATOR_NODE_COMPLETED': - setNodeCompleted(message.nodeId, message.result); + storeRefs.current.setNodeCompleted(message.nodeId, message.result); // Update canvas node status - updateNode(message.nodeId, { + storeRefs.current.updateNode(message.nodeId, { executionStatus: 'completed', executionResult: message.result, }); break; case 'ORCHESTRATOR_NODE_FAILED': - setNodeFailed(message.nodeId, message.error); + storeRefs.current.setNodeFailed(message.nodeId, message.error); // Update canvas node status - updateNode(message.nodeId, { + storeRefs.current.updateNode(message.nodeId, { executionStatus: 'failed', executionError: message.error, }); break; case 'ORCHESTRATOR_LOG': - addLog(message.log as ExecutionLog); + storeRefs.current.addLog(message.log as ExecutionLog); break; } @@ -272,69 +312,15 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet console.error('[WebSocket] Failed to parse message:', error); } }, - [ - currentExecution, - coordinatorExecutionId, - setWsLastMessage, - setExecutionStatus, - setNodeStarted, - setNodeCompleted, - setNodeFailed, - addLog, - completeExecution, - updateNode, - addOutput, - addA2UINotification, - updateNodeStatus, - addCoordinatorLog, - setActiveQuestion, - markExecutionComplete, - onMessage, - ] + [onMessage] // Only dependency is onMessage, all other functions accessed via refs ); // Connect to WebSocket - const connect = useCallback(() => { - if (!enabled) return; - - // Construct WebSocket URL - const protocol = window.location.protocol === 'https:' ? 
'wss:' : 'ws:'; - const wsUrl = `${protocol}//${window.location.host}/ws`; - - try { - setWsStatus('connecting'); - - const ws = new WebSocket(wsUrl); - wsRef.current = ws; - - ws.onopen = () => { - console.log('[WebSocket] Connected'); - setWsStatus('connected'); - resetReconnectAttempts(); - reconnectDelayRef.current = RECONNECT_DELAY_BASE; - }; - - ws.onmessage = handleMessage; - - ws.onclose = () => { - console.log('[WebSocket] Disconnected'); - setWsStatus('disconnected'); - wsRef.current = null; - scheduleReconnect(); - }; - - ws.onerror = (error) => { - console.error('[WebSocket] Error:', error); - setWsStatus('error'); - }; - } catch (error) { - console.error('[WebSocket] Failed to connect:', error); - setWsStatus('error'); - scheduleReconnect(); - } - }, [enabled, handleMessage, setWsStatus, resetReconnectAttempts]); + // Use ref to avoid circular dependency with scheduleReconnect + const connectRef = useRef<(() => void) | null>(null); // Schedule reconnection with exponential backoff + // Define this first to avoid circular dependency const scheduleReconnect = useCallback(() => { if (reconnectTimeoutRef.current) { clearTimeout(reconnectTimeoutRef.current); @@ -343,11 +329,11 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet const delay = reconnectDelayRef.current; console.log(`[WebSocket] Reconnecting in ${delay}ms...`); - setWsStatus('reconnecting'); - incrementReconnectAttempts(); + storeRefs.current.setWsStatus('reconnecting'); + storeRefs.current.incrementReconnectAttempts(); reconnectTimeoutRef.current = setTimeout(() => { - connect(); + connectRef.current?.(); }, delay); // Increase delay for next attempt (exponential backoff) @@ -355,7 +341,50 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet reconnectDelayRef.current * RECONNECT_DELAY_MULTIPLIER, RECONNECT_DELAY_MAX ); - }, [connect, setWsStatus, incrementReconnectAttempts]); + }, []); // No dependencies - uses connectRef + + 
const connect = useCallback(() => { + if (!enabled) return; + + // Construct WebSocket URL + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const wsUrl = `${protocol}//${window.location.host}/ws`; + + try { + storeRefs.current.setWsStatus('connecting'); + + const ws = new WebSocket(wsUrl); + wsRef.current = ws; + + ws.onopen = () => { + console.log('[WebSocket] Connected'); + storeRefs.current.setWsStatus('connected'); + storeRefs.current.resetReconnectAttempts(); + reconnectDelayRef.current = RECONNECT_DELAY_BASE; + }; + + ws.onmessage = handleMessage; + + ws.onclose = () => { + console.log('[WebSocket] Disconnected'); + storeRefs.current.setWsStatus('disconnected'); + wsRef.current = null; + scheduleReconnect(); + }; + + ws.onerror = (error) => { + console.error('[WebSocket] Error:', error); + storeRefs.current.setWsStatus('error'); + }; + } catch (error) { + console.error('[WebSocket] Failed to connect:', error); + storeRefs.current.setWsStatus('error'); + scheduleReconnect(); + } + }, [enabled, handleMessage, scheduleReconnect]); + + // Update connect ref after connect is defined + connectRef.current = connect; // Send message through WebSocket const send = useCallback((message: unknown) => { @@ -367,13 +396,14 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet }, []); // Manual reconnect + // Use connectRef to avoid depending on connect const reconnect = useCallback(() => { if (wsRef.current) { wsRef.current.close(); } reconnectDelayRef.current = RECONNECT_DELAY_BASE; - connect(); - }, [connect]); + connectRef.current?.(); + }, []); // No dependencies - uses connectRef // Check connection status const isConnected = wsRef.current?.readyState === WebSocket.OPEN;