diff --git a/ccw/frontend/src/hooks/useWebSocket.ts b/ccw/frontend/src/hooks/useWebSocket.ts index e043db30..9f9eceff 100644 --- a/ccw/frontend/src/hooks/useWebSocket.ts +++ b/ccw/frontend/src/hooks/useWebSocket.ts @@ -21,6 +21,42 @@ const RECONNECT_DELAY_BASE = 1000; // 1 second const RECONNECT_DELAY_MAX = 30000; // 30 seconds const RECONNECT_DELAY_MULTIPLIER = 1.5; +// Access store state/actions via getState() - avoids calling hooks in callbacks/effects +// This is the zustand-recommended pattern for non-rendering store access +function getStoreState() { + const notification = useNotificationStore.getState(); + const execution = useExecutionStore.getState(); + const flow = useFlowStore.getState(); + const cliStream = useCliStreamStore.getState(); + const coordinator = useCoordinatorStore.getState(); + return { + // Notification store + setWsStatus: notification.setWsStatus, + setWsLastMessage: notification.setWsLastMessage, + incrementReconnectAttempts: notification.incrementReconnectAttempts, + resetReconnectAttempts: notification.resetReconnectAttempts, + addA2UINotification: notification.addA2UINotification, + // Execution store + setExecutionStatus: execution.setExecutionStatus, + setNodeStarted: execution.setNodeStarted, + setNodeCompleted: execution.setNodeCompleted, + setNodeFailed: execution.setNodeFailed, + addLog: execution.addLog, + completeExecution: execution.completeExecution, + currentExecution: execution.currentExecution, + // Flow store + updateNode: flow.updateNode, + // CLI stream store + addOutput: cliStream.addOutput, + // Coordinator store + updateNodeStatus: coordinator.updateNodeStatus, + addCoordinatorLog: coordinator.addLog, + setActiveQuestion: coordinator.setActiveQuestion, + markExecutionComplete: coordinator.markExecutionComplete, + coordinatorExecutionId: coordinator.currentExecutionId, + }; +} + interface UseWebSocketOptions { enabled?: boolean; onMessage?: (message: OrchestratorWebSocketMessage) => void; @@ -40,74 +76,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet const reconnectDelayRef = useRef(RECONNECT_DELAY_BASE); const mountedRef = useRef(true); - // Store refs to prevent handler recreation - use useRef to keep stable references - const storeRefs = useRef({ - // Notification store - setWsStatus: useNotificationStore((state) => state.setWsStatus), - setWsLastMessage: useNotificationStore((state) => state.setWsLastMessage), - incrementReconnectAttempts: useNotificationStore((state) => state.incrementReconnectAttempts), - resetReconnectAttempts: useNotificationStore((state) => state.resetReconnectAttempts), - addA2UINotification: useNotificationStore((state) => state.addA2UINotification), - - // Execution store - setExecutionStatus: useExecutionStore((state) => state.setExecutionStatus), - setNodeStarted: useExecutionStore((state) => state.setNodeStarted), - setNodeCompleted: useExecutionStore((state) => state.setNodeCompleted), - setNodeFailed: useExecutionStore((state) => state.setNodeFailed), - addLog: useExecutionStore((state) => state.addLog), - completeExecution: useExecutionStore((state) => state.completeExecution), - currentExecution: useExecutionStore((state) => state.currentExecution), - - // Flow store - updateNode: useFlowStore((state) => state.updateNode), - - // CLI stream store - addOutput: useCliStreamStore((state) => state.addOutput), - - // Coordinator store - updateNodeStatus: useCoordinatorStore((state) => state.updateNodeStatus), - addCoordinatorLog: useCoordinatorStore((state) => state.addLog), - setActiveQuestion: useCoordinatorStore((state) => state.setActiveQuestion), - markExecutionComplete: useCoordinatorStore((state) => state.markExecutionComplete), - coordinatorExecutionId: useCoordinatorStore((state) => state.currentExecutionId), - }); - - // Update refs periodically to ensure they have fresh store references - useEffect(() => { - storeRefs.current = { - // Notification store - setWsStatus: useNotificationStore((state) => state.setWsStatus), - setWsLastMessage: useNotificationStore((state) => state.setWsLastMessage), - incrementReconnectAttempts: useNotificationStore((state) => state.incrementReconnectAttempts), - resetReconnectAttempts: useNotificationStore((state) => state.resetReconnectAttempts), - addA2UINotification: useNotificationStore((state) => state.addA2UINotification), - - // Execution store - setExecutionStatus: useExecutionStore((state) => state.setExecutionStatus), - setNodeStarted: useExecutionStore((state) => state.setNodeStarted), - setNodeCompleted: useExecutionStore((state) => state.setNodeCompleted), - setNodeFailed: useExecutionStore((state) => state.setNodeFailed), - addLog: useExecutionStore((state) => state.addLog), - completeExecution: useExecutionStore((state) => state.completeExecution), - currentExecution: useExecutionStore((state) => state.currentExecution), - - // Flow store - updateNode: useFlowStore((state) => state.updateNode), - - // CLI stream store - addOutput: useCliStreamStore((state) => state.addOutput), - - // Coordinator store - updateNodeStatus: useCoordinatorStore((state) => state.updateNodeStatus), - addCoordinatorLog: useCoordinatorStore((state) => state.addLog), - setActiveQuestion: useCoordinatorStore((state) => state.setActiveQuestion), - markExecutionComplete: useCoordinatorStore((state) => state.markExecutionComplete), - coordinatorExecutionId: useCoordinatorStore((state) => state.currentExecutionId), - }; - }); // Run on every render to keep refs fresh - // Handle incoming WebSocket messages - // Note: Using refs via storeRefs to prevent handler recreation on every store change const handleMessage = useCallback( (event: MessageEvent) => { // Guard against state updates after unmount @@ -117,9 +86,10 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet try { const data = JSON.parse(event.data); + const stores = getStoreState(); // Store last message for debugging - storeRefs.current.setWsLastMessage(data); + stores.setWsLastMessage(data); // Handle CLI messages if (data.type?.startsWith('CLI_')) { @@ -128,7 +98,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet const { executionId, tool, mode, timestamp } = data.payload; // Add system message for CLI start - storeRefs.current.addOutput(executionId, { + stores.addOutput(executionId, { type: 'system', content: `[${new Date(timestamp).toLocaleTimeString()}] CLI execution started: ${tool} (${mode || 'default'} mode)`, timestamp: Date.now(), @@ -157,7 +127,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet lines.forEach((line: string) => { // Add non-empty lines, or single line if that's all we have if (line.trim() || lines.length === 1) { - storeRefs.current.addOutput(executionId, { + stores.addOutput(executionId, { type: unitType as any, content: line, timestamp: Date.now(), @@ -173,7 +143,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet const statusText = success ? 'completed successfully' : 'failed'; const durationText = duration ? ` (${duration}ms)` : ''; - storeRefs.current.addOutput(executionId, { + stores.addOutput(executionId, { type: 'system', content: `[${new Date().toLocaleTimeString()}] CLI execution ${statusText}${durationText}`, timestamp: Date.now(), @@ -188,7 +158,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet if (data.type === 'a2ui-surface') { const parsed = SurfaceUpdateSchema.safeParse(data.payload); if (parsed.success) { - storeRefs.current.addA2UINotification(parsed.data, 'Interactive UI'); + stores.addA2UINotification(parsed.data, 'Interactive UI'); } else { console.warn('[WebSocket] Invalid A2UI surface:', parsed.error.issues); } @@ -197,7 +167,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet // Handle Coordinator messages if (data.type?.startsWith('COORDINATOR_')) { - const { coordinatorExecutionId } = storeRefs.current; + const { coordinatorExecutionId } = stores; // Only process messages for current coordinator execution if (coordinatorExecutionId && data.executionId !== coordinatorExecutionId) { return; @@ -208,26 +178,26 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet case 'COORDINATOR_STATE_UPDATE': // Check for completion if (data.status === 'completed') { - storeRefs.current.markExecutionComplete(true); + stores.markExecutionComplete(true); } else if (data.status === 'failed') { - storeRefs.current.markExecutionComplete(false); + stores.markExecutionComplete(false); } break; case 'COORDINATOR_COMMAND_STARTED': - storeRefs.current.updateNodeStatus(data.nodeId, 'running'); + stores.updateNodeStatus(data.nodeId, 'running'); break; case 'COORDINATOR_COMMAND_COMPLETED': - storeRefs.current.updateNodeStatus(data.nodeId, 'completed', data.result); + stores.updateNodeStatus(data.nodeId, 'completed', data.result); break; case 'COORDINATOR_COMMAND_FAILED': - storeRefs.current.updateNodeStatus(data.nodeId, 'failed', undefined, data.error); + stores.updateNodeStatus(data.nodeId, 'failed', undefined, data.error); break; case 'COORDINATOR_LOG_ENTRY': - storeRefs.current.addCoordinatorLog( + stores.addCoordinatorLog( data.log.message, data.log.level, data.log.nodeId, @@ -236,7 +206,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet break; case 'COORDINATOR_QUESTION_ASKED': - storeRefs.current.setActiveQuestion(data.question); + stores.setActiveQuestion(data.question); break; case 'COORDINATOR_ANSWER_RECEIVED': @@ -262,7 +232,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet const message = parsed.data as OrchestratorWebSocketMessage; // Only process messages for current execution - const { currentExecution } = storeRefs.current; + const { currentExecution } = stores; if (currentExecution && message.execId !== currentExecution.execId) { return; } @@ -270,39 +240,39 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet // Dispatch to execution store based on message type switch (message.type) { case 'ORCHESTRATOR_STATE_UPDATE': - storeRefs.current.setExecutionStatus(message.status, message.currentNodeId); + stores.setExecutionStatus(message.status, message.currentNodeId); // Check for completion if (message.status === 'completed' || message.status === 'failed') { - storeRefs.current.completeExecution(message.status); + stores.completeExecution(message.status); } break; case 'ORCHESTRATOR_NODE_STARTED': - storeRefs.current.setNodeStarted(message.nodeId); + stores.setNodeStarted(message.nodeId); // Update canvas node status - storeRefs.current.updateNode(message.nodeId, { executionStatus: 'running' }); + stores.updateNode(message.nodeId, { executionStatus: 'running' }); break; case 'ORCHESTRATOR_NODE_COMPLETED': - storeRefs.current.setNodeCompleted(message.nodeId, message.result); + stores.setNodeCompleted(message.nodeId, message.result); // Update canvas node status - storeRefs.current.updateNode(message.nodeId, { + stores.updateNode(message.nodeId, { executionStatus: 'completed', executionResult: message.result, }); break; case 'ORCHESTRATOR_NODE_FAILED': - storeRefs.current.setNodeFailed(message.nodeId, message.error); + stores.setNodeFailed(message.nodeId, message.error); // Update canvas node status - storeRefs.current.updateNode(message.nodeId, { + stores.updateNode(message.nodeId, { executionStatus: 'failed', executionError: message.error, }); break; case 'ORCHESTRATOR_LOG': - storeRefs.current.addLog(message.log as ExecutionLog); + stores.addLog(message.log as ExecutionLog); break; } @@ -312,7 +282,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet console.error('[WebSocket] Failed to parse message:', error); } }, - [onMessage] // Only dependency is onMessage, all other functions accessed via refs + [onMessage] // Only dependency is onMessage, store access via getState() ); // Connect to WebSocket @@ -329,8 +299,9 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet const delay = reconnectDelayRef.current; console.log(`[WebSocket] Reconnecting in ${delay}ms...`); - storeRefs.current.setWsStatus('reconnecting'); - storeRefs.current.incrementReconnectAttempts(); + const stores = getStoreState(); + stores.setWsStatus('reconnecting'); + stores.incrementReconnectAttempts(); reconnectTimeoutRef.current = setTimeout(() => { connectRef.current?.(); @@ -341,7 +312,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet reconnectDelayRef.current * RECONNECT_DELAY_MULTIPLIER, RECONNECT_DELAY_MAX ); - }, []); // No dependencies - uses connectRef + }, []); // No dependencies - uses connectRef and getStoreState() const connect = useCallback(() => { if (!enabled) return; @@ -351,15 +322,16 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet const wsUrl = `${protocol}//${window.location.host}/ws`; try { - storeRefs.current.setWsStatus('connecting'); + getStoreState().setWsStatus('connecting'); const ws = new WebSocket(wsUrl); wsRef.current = ws; ws.onopen = () => { console.log('[WebSocket] Connected'); - storeRefs.current.setWsStatus('connected'); - storeRefs.current.resetReconnectAttempts(); + const s = getStoreState(); + s.setWsStatus('connected'); + s.resetReconnectAttempts(); reconnectDelayRef.current = RECONNECT_DELAY_BASE; }; @@ -367,18 +339,18 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet ws.onclose = () => { console.log('[WebSocket] Disconnected'); - storeRefs.current.setWsStatus('disconnected'); + getStoreState().setWsStatus('disconnected'); wsRef.current = null; scheduleReconnect(); }; ws.onerror = (error) => { console.error('[WebSocket] Error:', error); - storeRefs.current.setWsStatus('error'); + getStoreState().setWsStatus('error'); }; } catch (error) { console.error('[WebSocket] Failed to connect:', error); - storeRefs.current.setWsStatus('error'); + getStoreState().setWsStatus('error'); scheduleReconnect(); } }, [enabled, handleMessage, scheduleReconnect]);