refactor: streamline store access in useWebSocket by consolidating state retrieval into getStoreState function

This commit is contained in:
catlog22
2026-02-04 18:00:06 +08:00
parent 346c87a706
commit 7b2ac46760

View File

@@ -21,6 +21,42 @@ const RECONNECT_DELAY_BASE = 1000; // 1 second
const RECONNECT_DELAY_MAX = 30000; // 30 seconds const RECONNECT_DELAY_MAX = 30000; // 30 seconds
const RECONNECT_DELAY_MULTIPLIER = 1.5; const RECONNECT_DELAY_MULTIPLIER = 1.5;
// Access store state/actions via getState() - avoids calling hooks in callbacks/effects
// This is the zustand-recommended pattern for non-rendering store access
function getStoreState() {
const notification = useNotificationStore.getState();
const execution = useExecutionStore.getState();
const flow = useFlowStore.getState();
const cliStream = useCliStreamStore.getState();
const coordinator = useCoordinatorStore.getState();
return {
// Notification store
setWsStatus: notification.setWsStatus,
setWsLastMessage: notification.setWsLastMessage,
incrementReconnectAttempts: notification.incrementReconnectAttempts,
resetReconnectAttempts: notification.resetReconnectAttempts,
addA2UINotification: notification.addA2UINotification,
// Execution store
setExecutionStatus: execution.setExecutionStatus,
setNodeStarted: execution.setNodeStarted,
setNodeCompleted: execution.setNodeCompleted,
setNodeFailed: execution.setNodeFailed,
addLog: execution.addLog,
completeExecution: execution.completeExecution,
currentExecution: execution.currentExecution,
// Flow store
updateNode: flow.updateNode,
// CLI stream store
addOutput: cliStream.addOutput,
// Coordinator store
updateNodeStatus: coordinator.updateNodeStatus,
addCoordinatorLog: coordinator.addLog,
setActiveQuestion: coordinator.setActiveQuestion,
markExecutionComplete: coordinator.markExecutionComplete,
coordinatorExecutionId: coordinator.currentExecutionId,
};
}
interface UseWebSocketOptions { interface UseWebSocketOptions {
enabled?: boolean; enabled?: boolean;
onMessage?: (message: OrchestratorWebSocketMessage) => void; onMessage?: (message: OrchestratorWebSocketMessage) => void;
@@ -40,74 +76,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
const reconnectDelayRef = useRef(RECONNECT_DELAY_BASE); const reconnectDelayRef = useRef(RECONNECT_DELAY_BASE);
const mountedRef = useRef(true); const mountedRef = useRef(true);
// Store refs to prevent handler recreation - use useRef to keep stable references
const storeRefs = useRef({
// Notification store
setWsStatus: useNotificationStore((state) => state.setWsStatus),
setWsLastMessage: useNotificationStore((state) => state.setWsLastMessage),
incrementReconnectAttempts: useNotificationStore((state) => state.incrementReconnectAttempts),
resetReconnectAttempts: useNotificationStore((state) => state.resetReconnectAttempts),
addA2UINotification: useNotificationStore((state) => state.addA2UINotification),
// Execution store
setExecutionStatus: useExecutionStore((state) => state.setExecutionStatus),
setNodeStarted: useExecutionStore((state) => state.setNodeStarted),
setNodeCompleted: useExecutionStore((state) => state.setNodeCompleted),
setNodeFailed: useExecutionStore((state) => state.setNodeFailed),
addLog: useExecutionStore((state) => state.addLog),
completeExecution: useExecutionStore((state) => state.completeExecution),
currentExecution: useExecutionStore((state) => state.currentExecution),
// Flow store
updateNode: useFlowStore((state) => state.updateNode),
// CLI stream store
addOutput: useCliStreamStore((state) => state.addOutput),
// Coordinator store
updateNodeStatus: useCoordinatorStore((state) => state.updateNodeStatus),
addCoordinatorLog: useCoordinatorStore((state) => state.addLog),
setActiveQuestion: useCoordinatorStore((state) => state.setActiveQuestion),
markExecutionComplete: useCoordinatorStore((state) => state.markExecutionComplete),
coordinatorExecutionId: useCoordinatorStore((state) => state.currentExecutionId),
});
// Update refs periodically to ensure they have fresh store references
useEffect(() => {
storeRefs.current = {
// Notification store
setWsStatus: useNotificationStore((state) => state.setWsStatus),
setWsLastMessage: useNotificationStore((state) => state.setWsLastMessage),
incrementReconnectAttempts: useNotificationStore((state) => state.incrementReconnectAttempts),
resetReconnectAttempts: useNotificationStore((state) => state.resetReconnectAttempts),
addA2UINotification: useNotificationStore((state) => state.addA2UINotification),
// Execution store
setExecutionStatus: useExecutionStore((state) => state.setExecutionStatus),
setNodeStarted: useExecutionStore((state) => state.setNodeStarted),
setNodeCompleted: useExecutionStore((state) => state.setNodeCompleted),
setNodeFailed: useExecutionStore((state) => state.setNodeFailed),
addLog: useExecutionStore((state) => state.addLog),
completeExecution: useExecutionStore((state) => state.completeExecution),
currentExecution: useExecutionStore((state) => state.currentExecution),
// Flow store
updateNode: useFlowStore((state) => state.updateNode),
// CLI stream store
addOutput: useCliStreamStore((state) => state.addOutput),
// Coordinator store
updateNodeStatus: useCoordinatorStore((state) => state.updateNodeStatus),
addCoordinatorLog: useCoordinatorStore((state) => state.addLog),
setActiveQuestion: useCoordinatorStore((state) => state.setActiveQuestion),
markExecutionComplete: useCoordinatorStore((state) => state.markExecutionComplete),
coordinatorExecutionId: useCoordinatorStore((state) => state.currentExecutionId),
};
}); // Run on every render to keep refs fresh
// Handle incoming WebSocket messages // Handle incoming WebSocket messages
// Note: Using refs via storeRefs to prevent handler recreation on every store change
const handleMessage = useCallback( const handleMessage = useCallback(
(event: MessageEvent) => { (event: MessageEvent) => {
// Guard against state updates after unmount // Guard against state updates after unmount
@@ -117,9 +86,10 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
try { try {
const data = JSON.parse(event.data); const data = JSON.parse(event.data);
const stores = getStoreState();
// Store last message for debugging // Store last message for debugging
storeRefs.current.setWsLastMessage(data); stores.setWsLastMessage(data);
// Handle CLI messages // Handle CLI messages
if (data.type?.startsWith('CLI_')) { if (data.type?.startsWith('CLI_')) {
@@ -128,7 +98,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
const { executionId, tool, mode, timestamp } = data.payload; const { executionId, tool, mode, timestamp } = data.payload;
// Add system message for CLI start // Add system message for CLI start
storeRefs.current.addOutput(executionId, { stores.addOutput(executionId, {
type: 'system', type: 'system',
content: `[${new Date(timestamp).toLocaleTimeString()}] CLI execution started: ${tool} (${mode || 'default'} mode)`, content: `[${new Date(timestamp).toLocaleTimeString()}] CLI execution started: ${tool} (${mode || 'default'} mode)`,
timestamp: Date.now(), timestamp: Date.now(),
@@ -157,7 +127,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
lines.forEach((line: string) => { lines.forEach((line: string) => {
// Add non-empty lines, or single line if that's all we have // Add non-empty lines, or single line if that's all we have
if (line.trim() || lines.length === 1) { if (line.trim() || lines.length === 1) {
storeRefs.current.addOutput(executionId, { stores.addOutput(executionId, {
type: unitType as any, type: unitType as any,
content: line, content: line,
timestamp: Date.now(), timestamp: Date.now(),
@@ -173,7 +143,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
const statusText = success ? 'completed successfully' : 'failed'; const statusText = success ? 'completed successfully' : 'failed';
const durationText = duration ? ` (${duration}ms)` : ''; const durationText = duration ? ` (${duration}ms)` : '';
storeRefs.current.addOutput(executionId, { stores.addOutput(executionId, {
type: 'system', type: 'system',
content: `[${new Date().toLocaleTimeString()}] CLI execution ${statusText}${durationText}`, content: `[${new Date().toLocaleTimeString()}] CLI execution ${statusText}${durationText}`,
timestamp: Date.now(), timestamp: Date.now(),
@@ -188,7 +158,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
if (data.type === 'a2ui-surface') { if (data.type === 'a2ui-surface') {
const parsed = SurfaceUpdateSchema.safeParse(data.payload); const parsed = SurfaceUpdateSchema.safeParse(data.payload);
if (parsed.success) { if (parsed.success) {
storeRefs.current.addA2UINotification(parsed.data, 'Interactive UI'); stores.addA2UINotification(parsed.data, 'Interactive UI');
} else { } else {
console.warn('[WebSocket] Invalid A2UI surface:', parsed.error.issues); console.warn('[WebSocket] Invalid A2UI surface:', parsed.error.issues);
} }
@@ -197,7 +167,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
// Handle Coordinator messages // Handle Coordinator messages
if (data.type?.startsWith('COORDINATOR_')) { if (data.type?.startsWith('COORDINATOR_')) {
const { coordinatorExecutionId } = storeRefs.current; const { coordinatorExecutionId } = stores;
// Only process messages for current coordinator execution // Only process messages for current coordinator execution
if (coordinatorExecutionId && data.executionId !== coordinatorExecutionId) { if (coordinatorExecutionId && data.executionId !== coordinatorExecutionId) {
return; return;
@@ -208,26 +178,26 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
case 'COORDINATOR_STATE_UPDATE': case 'COORDINATOR_STATE_UPDATE':
// Check for completion // Check for completion
if (data.status === 'completed') { if (data.status === 'completed') {
storeRefs.current.markExecutionComplete(true); stores.markExecutionComplete(true);
} else if (data.status === 'failed') { } else if (data.status === 'failed') {
storeRefs.current.markExecutionComplete(false); stores.markExecutionComplete(false);
} }
break; break;
case 'COORDINATOR_COMMAND_STARTED': case 'COORDINATOR_COMMAND_STARTED':
storeRefs.current.updateNodeStatus(data.nodeId, 'running'); stores.updateNodeStatus(data.nodeId, 'running');
break; break;
case 'COORDINATOR_COMMAND_COMPLETED': case 'COORDINATOR_COMMAND_COMPLETED':
storeRefs.current.updateNodeStatus(data.nodeId, 'completed', data.result); stores.updateNodeStatus(data.nodeId, 'completed', data.result);
break; break;
case 'COORDINATOR_COMMAND_FAILED': case 'COORDINATOR_COMMAND_FAILED':
storeRefs.current.updateNodeStatus(data.nodeId, 'failed', undefined, data.error); stores.updateNodeStatus(data.nodeId, 'failed', undefined, data.error);
break; break;
case 'COORDINATOR_LOG_ENTRY': case 'COORDINATOR_LOG_ENTRY':
storeRefs.current.addCoordinatorLog( stores.addCoordinatorLog(
data.log.message, data.log.message,
data.log.level, data.log.level,
data.log.nodeId, data.log.nodeId,
@@ -236,7 +206,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
break; break;
case 'COORDINATOR_QUESTION_ASKED': case 'COORDINATOR_QUESTION_ASKED':
storeRefs.current.setActiveQuestion(data.question); stores.setActiveQuestion(data.question);
break; break;
case 'COORDINATOR_ANSWER_RECEIVED': case 'COORDINATOR_ANSWER_RECEIVED':
@@ -262,7 +232,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
const message = parsed.data as OrchestratorWebSocketMessage; const message = parsed.data as OrchestratorWebSocketMessage;
// Only process messages for current execution // Only process messages for current execution
const { currentExecution } = storeRefs.current; const { currentExecution } = stores;
if (currentExecution && message.execId !== currentExecution.execId) { if (currentExecution && message.execId !== currentExecution.execId) {
return; return;
} }
@@ -270,39 +240,39 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
// Dispatch to execution store based on message type // Dispatch to execution store based on message type
switch (message.type) { switch (message.type) {
case 'ORCHESTRATOR_STATE_UPDATE': case 'ORCHESTRATOR_STATE_UPDATE':
storeRefs.current.setExecutionStatus(message.status, message.currentNodeId); stores.setExecutionStatus(message.status, message.currentNodeId);
// Check for completion // Check for completion
if (message.status === 'completed' || message.status === 'failed') { if (message.status === 'completed' || message.status === 'failed') {
storeRefs.current.completeExecution(message.status); stores.completeExecution(message.status);
} }
break; break;
case 'ORCHESTRATOR_NODE_STARTED': case 'ORCHESTRATOR_NODE_STARTED':
storeRefs.current.setNodeStarted(message.nodeId); stores.setNodeStarted(message.nodeId);
// Update canvas node status // Update canvas node status
storeRefs.current.updateNode(message.nodeId, { executionStatus: 'running' }); stores.updateNode(message.nodeId, { executionStatus: 'running' });
break; break;
case 'ORCHESTRATOR_NODE_COMPLETED': case 'ORCHESTRATOR_NODE_COMPLETED':
storeRefs.current.setNodeCompleted(message.nodeId, message.result); stores.setNodeCompleted(message.nodeId, message.result);
// Update canvas node status // Update canvas node status
storeRefs.current.updateNode(message.nodeId, { stores.updateNode(message.nodeId, {
executionStatus: 'completed', executionStatus: 'completed',
executionResult: message.result, executionResult: message.result,
}); });
break; break;
case 'ORCHESTRATOR_NODE_FAILED': case 'ORCHESTRATOR_NODE_FAILED':
storeRefs.current.setNodeFailed(message.nodeId, message.error); stores.setNodeFailed(message.nodeId, message.error);
// Update canvas node status // Update canvas node status
storeRefs.current.updateNode(message.nodeId, { stores.updateNode(message.nodeId, {
executionStatus: 'failed', executionStatus: 'failed',
executionError: message.error, executionError: message.error,
}); });
break; break;
case 'ORCHESTRATOR_LOG': case 'ORCHESTRATOR_LOG':
storeRefs.current.addLog(message.log as ExecutionLog); stores.addLog(message.log as ExecutionLog);
break; break;
} }
@@ -312,7 +282,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
console.error('[WebSocket] Failed to parse message:', error); console.error('[WebSocket] Failed to parse message:', error);
} }
}, },
[onMessage] // Only dependency is onMessage, all other functions accessed via refs [onMessage] // Only dependency is onMessage, store access via getState()
); );
// Connect to WebSocket // Connect to WebSocket
@@ -329,8 +299,9 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
const delay = reconnectDelayRef.current; const delay = reconnectDelayRef.current;
console.log(`[WebSocket] Reconnecting in ${delay}ms...`); console.log(`[WebSocket] Reconnecting in ${delay}ms...`);
storeRefs.current.setWsStatus('reconnecting'); const stores = getStoreState();
storeRefs.current.incrementReconnectAttempts(); stores.setWsStatus('reconnecting');
stores.incrementReconnectAttempts();
reconnectTimeoutRef.current = setTimeout(() => { reconnectTimeoutRef.current = setTimeout(() => {
connectRef.current?.(); connectRef.current?.();
@@ -341,7 +312,7 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
reconnectDelayRef.current * RECONNECT_DELAY_MULTIPLIER, reconnectDelayRef.current * RECONNECT_DELAY_MULTIPLIER,
RECONNECT_DELAY_MAX RECONNECT_DELAY_MAX
); );
}, []); // No dependencies - uses connectRef }, []); // No dependencies - uses connectRef and getStoreState()
const connect = useCallback(() => { const connect = useCallback(() => {
if (!enabled) return; if (!enabled) return;
@@ -351,15 +322,16 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
const wsUrl = `${protocol}//${window.location.host}/ws`; const wsUrl = `${protocol}//${window.location.host}/ws`;
try { try {
storeRefs.current.setWsStatus('connecting'); getStoreState().setWsStatus('connecting');
const ws = new WebSocket(wsUrl); const ws = new WebSocket(wsUrl);
wsRef.current = ws; wsRef.current = ws;
ws.onopen = () => { ws.onopen = () => {
console.log('[WebSocket] Connected'); console.log('[WebSocket] Connected');
storeRefs.current.setWsStatus('connected'); const s = getStoreState();
storeRefs.current.resetReconnectAttempts(); s.setWsStatus('connected');
s.resetReconnectAttempts();
reconnectDelayRef.current = RECONNECT_DELAY_BASE; reconnectDelayRef.current = RECONNECT_DELAY_BASE;
}; };
@@ -367,18 +339,18 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
ws.onclose = () => { ws.onclose = () => {
console.log('[WebSocket] Disconnected'); console.log('[WebSocket] Disconnected');
storeRefs.current.setWsStatus('disconnected'); getStoreState().setWsStatus('disconnected');
wsRef.current = null; wsRef.current = null;
scheduleReconnect(); scheduleReconnect();
}; };
ws.onerror = (error) => { ws.onerror = (error) => {
console.error('[WebSocket] Error:', error); console.error('[WebSocket] Error:', error);
storeRefs.current.setWsStatus('error'); getStoreState().setWsStatus('error');
}; };
} catch (error) { } catch (error) {
console.error('[WebSocket] Failed to connect:', error); console.error('[WebSocket] Failed to connect:', error);
storeRefs.current.setWsStatus('error'); getStoreState().setWsStatus('error');
scheduleReconnect(); scheduleReconnect();
} }
}, [enabled, handleMessage, scheduleReconnect]); }, [enabled, handleMessage, scheduleReconnect]);