fix(cli-viewer): resolve duplicate CLI output and sticky scroll button issues

- Create centralized useCliStreamWebSocket hook with module-level
  message tracking to ensure each WebSocket message is processed only once globally
- Replace position:absolute with position:sticky + flexbox wrapper
  for scroll-to-bottom buttons to fix viewport positioning
- Remove duplicate WebSocket handling from CliViewerPage, CliStreamMonitorLegacy,
  and CliStreamMonitorNew components

Fixes:
- CLI output no longer duplicated when multiple components subscribe
  to the same WebSocket feed
- Scroll-to-bottom button now stays fixed at bottom-right corner
  of viewport instead of scrolling with content
This commit is contained in:
catlog22
2026-02-24 09:40:43 +08:00
parent 418d605bd0
commit b2c1288dab
6 changed files with 243 additions and 444 deletions

View File

@@ -3,15 +3,15 @@
// ======================================== // ========================================
// Redesigned CLI streaming monitor with smart parsing and message-based layout // Redesigned CLI streaming monitor with smart parsing and message-based layout
import { useEffect, useState, useCallback, useMemo, useRef } from 'react'; import { useState, useCallback, useMemo } from 'react';
import { useIntl } from 'react-intl'; import { useIntl } from 'react-intl';
import { import {
Terminal, Terminal,
} from 'lucide-react'; } from 'lucide-react';
import { cn } from '@/lib/utils'; import { cn } from '@/lib/utils';
import { useCliStreamStore, type CliOutputLine } from '@/stores/cliStreamStore'; import { useCliStreamStore, type CliOutputLine } from '@/stores/cliStreamStore';
import { useNotificationStore, selectWsLastMessage } from '@/stores'; import { useActiveCliExecutions } from '@/hooks/useActiveCliExecutions';
import { useActiveCliExecutions, useInvalidateActiveCliExecutions } from '@/hooks/useActiveCliExecutions'; import { useCliStreamWebSocket } from '@/hooks/useCliStreamWebSocket';
// New layout components // New layout components
import { MonitorHeader } from './MonitorHeader'; import { MonitorHeader } from './MonitorHeader';
@@ -24,37 +24,8 @@ import {
ErrorMessage, ErrorMessage,
} from './messages'; } from './messages';
// ========== Types for CLI WebSocket Messages ========== // ========== Types ==========
// WebSocket message types are now handled centrally in useCliStreamWebSocket hook
interface CliStreamStartedPayload {
executionId: string;
tool: string;
mode: string;
timestamp: string;
}
interface CliStreamOutputPayload {
executionId: string;
chunkType: string;
data: unknown;
unit?: {
content: unknown;
type?: string;
};
}
interface CliStreamCompletedPayload {
executionId: string;
success: boolean;
duration?: number;
timestamp: string;
}
interface CliStreamErrorPayload {
executionId: string;
error?: string;
timestamp: string;
}
// ========== Message Type Detection ========== // ========== Message Type Detection ==========
@@ -179,20 +150,6 @@ function parseOutputToMessages(
return messages; return messages;
} }
// ========== Helper Functions ==========
function formatDuration(ms: number): string {
if (ms < 1000) return `${ms}ms`;
const seconds = Math.floor(ms / 1000);
if (seconds < 60) return `${seconds}s`;
const minutes = Math.floor(seconds / 60);
const remainingSeconds = seconds % 60;
if (minutes < 60) return `${minutes}m ${remainingSeconds}s`;
const hours = Math.floor(minutes / 60);
const remainingMinutes = minutes % 60;
return `${hours}h ${remainingMinutes}m`;
}
// ========== Component ========== // ========== Component ==========
export interface CliStreamMonitorNewProps { export interface CliStreamMonitorNewProps {
@@ -215,104 +172,9 @@ export function CliStreamMonitorNew({ isOpen, onClose }: CliStreamMonitorNewProp
// Active execution sync // Active execution sync
const { isLoading: isSyncing, refetch } = useActiveCliExecutions(isOpen); const { isLoading: isSyncing, refetch } = useActiveCliExecutions(isOpen);
const invalidateActive = useInvalidateActiveCliExecutions();
// WebSocket last message // CENTRALIZED WebSocket handler - processes each message only once globally
const lastMessage = useNotificationStore(selectWsLastMessage); useCliStreamWebSocket();
// Track last processed WebSocket message to prevent duplicate processing
const lastProcessedMsgRef = useRef<unknown>(null);
// Handle WebSocket messages (same as original)
useEffect(() => {
// Skip if no message or same message already processed (prevents React strict mode double-execution)
if (!lastMessage || lastMessage === lastProcessedMsgRef.current) return;
lastProcessedMsgRef.current = lastMessage;
const { type, payload } = lastMessage;
if (type === 'CLI_STARTED') {
const p = payload as CliStreamStartedPayload;
const startTime = p.timestamp ? new Date(p.timestamp).getTime() : Date.now();
useCliStreamStore.getState().upsertExecution(p.executionId, {
tool: p.tool || 'cli',
mode: p.mode || 'analysis',
status: 'running',
startTime,
output: [
{
type: 'system',
content: `[${new Date(startTime).toLocaleTimeString()}] CLI execution started: ${p.tool} (${p.mode} mode)`,
timestamp: startTime
}
]
});
invalidateActive();
} else if (type === 'CLI_OUTPUT') {
const p = payload as CliStreamOutputPayload;
const unitContent = p.unit?.content ?? p.data;
const unitType = p.unit?.type || p.chunkType;
let content: string;
if (unitType === 'tool_call' && typeof unitContent === 'object' && unitContent !== null) {
const toolCall = unitContent as { action?: string; toolName?: string; parameters?: unknown; status?: string; output?: string };
if (toolCall.action === 'invoke') {
const params = toolCall.parameters ? JSON.stringify(toolCall.parameters) : '';
content = `[Tool] ${toolCall.toolName}(${params})`;
} else if (toolCall.action === 'result') {
const status = toolCall.status || 'unknown';
const output = toolCall.output ? `: ${toolCall.output.substring(0, 200)}${toolCall.output.length > 200 ? '...' : ''}` : '';
content = `[Tool Result] ${status}${output}`;
} else {
content = JSON.stringify(unitContent);
}
} else {
content = typeof unitContent === 'string' ? unitContent : JSON.stringify(unitContent);
}
const lines = content.split('\n');
const addOutput = useCliStreamStore.getState().addOutput;
lines.forEach(line => {
if (line.trim() || lines.length === 1) {
addOutput(p.executionId, {
type: (unitType as CliOutputLine['type']) || 'stdout',
content: line,
timestamp: Date.now()
});
}
});
} else if (type === 'CLI_COMPLETED') {
const p = payload as CliStreamCompletedPayload;
const endTime = p.timestamp ? new Date(p.timestamp).getTime() : Date.now();
useCliStreamStore.getState().upsertExecution(p.executionId, {
status: p.success ? 'completed' : 'error',
endTime,
output: [
{
type: 'system',
content: `[${new Date(endTime).toLocaleTimeString()}] CLI execution ${p.success ? 'completed successfully' : 'failed'}${p.duration ? ` (${formatDuration(p.duration)})` : ''}`,
timestamp: endTime
}
]
});
invalidateActive();
} else if (type === 'CLI_ERROR') {
const p = payload as CliStreamErrorPayload;
const endTime = p.timestamp ? new Date(p.timestamp).getTime() : Date.now();
useCliStreamStore.getState().upsertExecution(p.executionId, {
status: 'error',
endTime,
output: [
{
type: 'stderr',
content: `[ERROR] ${p.error || 'Unknown error occurred'}`,
timestamp: endTime
}
]
});
invalidateActive();
}
}, [lastMessage, invalidateActive]);
// Get execution stats // Get execution stats
const executionStats = useMemo(() => { const executionStats = useMemo(() => {

View File

@@ -33,16 +33,18 @@ interface ScrollToBottomButtonProps {
function ScrollToBottomButton({ onClick, className }: ScrollToBottomButtonProps) { function ScrollToBottomButton({ onClick, className }: ScrollToBottomButtonProps) {
return ( return (
<div className="sticky bottom-4 flex justify-end">
<Button <Button
size="sm" size="sm"
variant="secondary" variant="secondary"
className={cn('absolute bottom-4 right-4 shadow-lg', className)} className={cn('shadow-lg', className)}
onClick={onClick} onClick={onClick}
title="Scroll to bottom" title="Scroll to bottom"
> >
<ArrowDownToLine className="h-4 w-4 mr-1" /> <ArrowDownToLine className="h-4 w-4 mr-1" />
Scroll to bottom Scroll to bottom
</Button> </Button>
</div>
); );
} }

View File

@@ -24,45 +24,16 @@ import { Tabs, TabsList, TabsTrigger } from '@/components/ui/Tabs';
import { Badge } from '@/components/ui/Badge'; import { Badge } from '@/components/ui/Badge';
import { LogBlockList } from '@/components/shared/LogBlock'; import { LogBlockList } from '@/components/shared/LogBlock';
import { useCliStreamStore, type CliOutputLine } from '@/stores/cliStreamStore'; import { useCliStreamStore, type CliOutputLine } from '@/stores/cliStreamStore';
import { useNotificationStore, selectWsLastMessage } from '@/stores'; import { useActiveCliExecutions } from '@/hooks/useActiveCliExecutions';
import { useActiveCliExecutions, useInvalidateActiveCliExecutions } from '@/hooks/useActiveCliExecutions'; import { useCliStreamWebSocket } from '@/hooks/useCliStreamWebSocket';
// New components for Tab + JSON Cards // New components for Tab + JSON Cards
import { ExecutionTab } from './CliStreamMonitor/components/ExecutionTab'; import { ExecutionTab } from './CliStreamMonitor/components/ExecutionTab';
import ReactMarkdown from 'react-markdown'; import ReactMarkdown from 'react-markdown';
import remarkGfm from 'remark-gfm'; import remarkGfm from 'remark-gfm';
// ========== Types for CLI WebSocket Messages ========== // ========== Types ==========
// WebSocket message types are now handled centrally in useCliStreamWebSocket hook
interface CliStreamStartedPayload {
executionId: string;
tool: string;
mode: string;
timestamp: string;
}
interface CliStreamOutputPayload {
executionId: string;
chunkType: string;
data: unknown;
unit?: {
content: unknown;
type?: string;
};
}
interface CliStreamCompletedPayload {
executionId: string;
success: boolean;
duration?: number;
timestamp: string;
}
interface CliStreamErrorPayload {
executionId: string;
error?: string;
timestamp: string;
}
// ========== Helper Functions ========== // ========== Helper Functions ==========
@@ -209,9 +180,6 @@ export function CliStreamMonitor({ isOpen, onClose }: CliStreamMonitorProps) {
// Track last output length to detect new output // Track last output length to detect new output
const lastOutputLengthRef = useRef<Record<string, number>>({}); const lastOutputLengthRef = useRef<Record<string, number>>({});
// Track last processed WebSocket message to prevent duplicate processing
const lastProcessedMsgRef = useRef<unknown>(null);
// Store state // Store state
const executions = useCliStreamStore((state) => state.executions); const executions = useCliStreamStore((state) => state.executions);
const currentExecutionId = useCliStreamStore((state) => state.currentExecutionId); const currentExecutionId = useCliStreamStore((state) => state.currentExecutionId);
@@ -221,105 +189,9 @@ export function CliStreamMonitor({ isOpen, onClose }: CliStreamMonitorProps) {
// Active execution sync // Active execution sync
const { isLoading: isSyncing, refetch } = useActiveCliExecutions(isOpen); const { isLoading: isSyncing, refetch } = useActiveCliExecutions(isOpen);
const invalidateActive = useInvalidateActiveCliExecutions();
// WebSocket last message from notification store // CENTRALIZED WebSocket handler - processes each message only once globally
const lastMessage = useNotificationStore(selectWsLastMessage); useCliStreamWebSocket();
// Handle WebSocket messages for CLI stream
useEffect(() => {
// Skip if no message or same message already processed (prevents React strict mode double-execution)
if (!lastMessage || lastMessage === lastProcessedMsgRef.current) return;
lastProcessedMsgRef.current = lastMessage;
const { type, payload } = lastMessage;
if (type === 'CLI_STARTED') {
const p = payload as CliStreamStartedPayload;
const startTime = p.timestamp ? new Date(p.timestamp).getTime() : Date.now();
useCliStreamStore.getState().upsertExecution(p.executionId, {
tool: p.tool || 'cli',
mode: p.mode || 'analysis',
status: 'running',
startTime,
output: [
{
type: 'system',
content: `[${new Date(startTime).toLocaleTimeString()}] CLI execution started: ${p.tool} (${p.mode} mode)`,
timestamp: startTime
}
]
});
// Set as current if none selected
if (!currentExecutionId) {
setCurrentExecution(p.executionId);
}
invalidateActive();
} else if (type === 'CLI_OUTPUT') {
const p = payload as CliStreamOutputPayload;
const unitContent = p.unit?.content ?? p.data;
const unitType = p.unit?.type || p.chunkType;
let content: string;
if (unitType === 'tool_call' && typeof unitContent === 'object' && unitContent !== null) {
const toolCall = unitContent as { action?: string; toolName?: string; parameters?: unknown; status?: string; output?: string };
if (toolCall.action === 'invoke') {
const params = toolCall.parameters ? JSON.stringify(toolCall.parameters) : '';
content = `[Tool] ${toolCall.toolName}(${params})`;
} else if (toolCall.action === 'result') {
const status = toolCall.status || 'unknown';
const output = toolCall.output ? `: ${toolCall.output.substring(0, 200)}${toolCall.output.length > 200 ? '...' : ''}` : '';
content = `[Tool Result] ${status}${output}`;
} else {
content = JSON.stringify(unitContent);
}
} else {
content = typeof unitContent === 'string' ? unitContent : JSON.stringify(unitContent);
}
const lines = content.split('\n');
const addOutput = useCliStreamStore.getState().addOutput;
lines.forEach(line => {
if (line.trim() || lines.length === 1) {
addOutput(p.executionId, {
type: (unitType as CliOutputLine['type']) || 'stdout',
content: line,
timestamp: Date.now()
});
}
});
} else if (type === 'CLI_COMPLETED') {
const p = payload as CliStreamCompletedPayload;
const endTime = p.timestamp ? new Date(p.timestamp).getTime() : Date.now();
useCliStreamStore.getState().upsertExecution(p.executionId, {
status: p.success ? 'completed' : 'error',
endTime,
output: [
{
type: 'system',
content: `[${new Date(endTime).toLocaleTimeString()}] CLI execution ${p.success ? 'completed successfully' : 'failed'}${p.duration ? ` (${formatDuration(p.duration)})` : ''}`,
timestamp: endTime
}
]
});
invalidateActive();
} else if (type === 'CLI_ERROR') {
const p = payload as CliStreamErrorPayload;
const endTime = p.timestamp ? new Date(p.timestamp).getTime() : Date.now();
useCliStreamStore.getState().upsertExecution(p.executionId, {
status: 'error',
endTime,
output: [
{
type: 'stderr',
content: `[ERROR] ${p.error || 'Unknown error occurred'}`,
timestamp: endTime
}
]
});
invalidateActive();
}
}, [lastMessage, invalidateActive]);
// Auto-scroll to bottom when new output arrives (optimized - only scroll when output length changes) // Auto-scroll to bottom when new output arrives (optimized - only scroll when output length changes)
useEffect(() => { useEffect(() => {

View File

@@ -125,15 +125,17 @@ export function StreamingOutput({
{/* Scroll to bottom button */} {/* Scroll to bottom button */}
{isUserScrolling && outputs.length > 0 && ( {isUserScrolling && outputs.length > 0 && (
<div className="sticky bottom-3 flex justify-end">
<Button <Button
size="sm" size="sm"
variant="secondary" variant="secondary"
className="absolute bottom-3 right-3" className="shadow-lg"
onClick={scrollToBottom} onClick={scrollToBottom}
> >
<ArrowDownToLine className="h-4 w-4 mr-1" /> <ArrowDownToLine className="h-4 w-4 mr-1" />
Scroll to bottom Scroll to bottom
</Button> </Button>
</div>
)} )}
</div> </div>
); );

View File

@@ -0,0 +1,202 @@
// ========================================
// useCliStreamWebSocket Hook
// ========================================
// Centralized WebSocket message handler for CLI stream
// Ensures each message is processed ONLY ONCE across all components
import { useEffect, useRef } from 'react';
import { useNotificationStore, selectWsLastMessage } from '@/stores';
import { useCliStreamStore, type CliOutputLine } from '@/stores/cliStreamStore';
import { useInvalidateActiveCliExecutions } from '@/hooks/useActiveCliExecutions';
// ========== Module-level Message Tracking ==========
// CRITICAL: This MUST be at module level to ensure single instance across all component mounts
// Prevents duplicate message processing when multiple components subscribe to WebSocket
let globalLastProcessedMsg: unknown = null;
// ========== Types for CLI WebSocket Messages ==========
interface CliStreamStartedPayload {
executionId: string;
tool: string;
mode: string;
timestamp: string;
}
interface CliStreamOutputPayload {
executionId: string;
chunkType: string;
data: unknown;
unit?: {
content: unknown;
type?: string;
};
}
interface CliStreamCompletedPayload {
executionId: string;
success: boolean;
duration?: number;
timestamp: string;
}
interface CliStreamErrorPayload {
executionId: string;
error?: string;
timestamp: string;
}
// ========== Helper Functions ==========
function formatDuration(ms: number): string {
if (ms < 1000) return `${ms}ms`;
const seconds = Math.floor(ms / 1000);
if (seconds < 60) return `${seconds}s`;
const minutes = Math.floor(seconds / 60);
const remainingSeconds = seconds % 60;
if (minutes < 60) return `${minutes}m ${remainingSeconds}s`;
const hours = Math.floor(minutes / 60);
const remainingMinutes = minutes % 60;
return `${hours}h ${remainingMinutes}m`;
}
// ========== Hook ==========
/**
* Centralized WebSocket handler for CLI stream messages
*
* IMPORTANT: This hook uses module-level state to ensure each WebSocket message
* is processed exactly ONCE, even when multiple components are mounted.
*
* Components should simply consume data from useCliStreamStore - no need to
* handle WebSocket messages individually.
*
* @example
* ```tsx
* // In your app root or layout component
* useCliStreamWebSocket();
*
* // In any component that needs CLI output
* const executions = useCliStreamStore(state => state.executions);
* ```
*/
export function useCliStreamWebSocket(): void {
const lastMessage = useNotificationStore(selectWsLastMessage);
const invalidateActive = useInvalidateActiveCliExecutions();
// Ref to track if this hook instance is active (for cleanup)
const isActiveRef = useRef(true);
useEffect(() => {
isActiveRef.current = true;
return () => {
isActiveRef.current = false;
};
}, []);
useEffect(() => {
// Skip if no message or already processed GLOBALLY
if (!lastMessage || lastMessage === globalLastProcessedMsg) return;
// Mark as processed immediately at module level
globalLastProcessedMsg = lastMessage;
// Check if hook is still active (component not unmounted)
if (!isActiveRef.current) return;
const { type, payload } = lastMessage;
if (type === 'CLI_STARTED') {
const p = payload as CliStreamStartedPayload;
const startTime = p.timestamp ? new Date(p.timestamp).getTime() : Date.now();
useCliStreamStore.getState().upsertExecution(p.executionId, {
tool: p.tool || 'cli',
mode: p.mode || 'analysis',
status: 'running',
startTime,
output: [
{
type: 'system',
content: `[${new Date(startTime).toLocaleTimeString()}] CLI execution started: ${p.tool} (${p.mode} mode)`,
timestamp: startTime
}
]
});
invalidateActive();
} else if (type === 'CLI_OUTPUT') {
const p = payload as CliStreamOutputPayload;
const unitContent = p.unit?.content ?? p.data;
const unitType = p.unit?.type || p.chunkType;
let content: string;
if (unitType === 'tool_call' && typeof unitContent === 'object' && unitContent !== null) {
const toolCall = unitContent as { action?: string; toolName?: string; parameters?: unknown; status?: string; output?: string };
if (toolCall.action === 'invoke') {
const params = toolCall.parameters ? JSON.stringify(toolCall.parameters) : '';
content = `[Tool] ${toolCall.toolName}(${params})`;
} else if (toolCall.action === 'result') {
const status = toolCall.status || 'unknown';
const output = toolCall.output ? `: ${toolCall.output.substring(0, 200)}${toolCall.output.length > 200 ? '...' : ''}` : '';
content = `[Tool Result] ${status}${output}`;
} else {
content = JSON.stringify(unitContent);
}
} else {
content = typeof unitContent === 'string' ? unitContent : JSON.stringify(unitContent);
}
const lines = content.split('\n');
const addOutput = useCliStreamStore.getState().addOutput;
lines.forEach(line => {
if (line.trim() || lines.length === 1) {
addOutput(p.executionId, {
type: (unitType as CliOutputLine['type']) || 'stdout',
content: line,
timestamp: Date.now()
});
}
});
} else if (type === 'CLI_COMPLETED') {
const p = payload as CliStreamCompletedPayload;
const endTime = p.timestamp ? new Date(p.timestamp).getTime() : Date.now();
useCliStreamStore.getState().upsertExecution(p.executionId, {
status: p.success ? 'completed' : 'error',
endTime,
output: [
{
type: 'system',
content: `[${new Date(endTime).toLocaleTimeString()}] CLI execution ${p.success ? 'completed successfully' : 'failed'}${p.duration ? ` (${formatDuration(p.duration)})` : ''}`,
timestamp: endTime
}
]
});
invalidateActive();
} else if (type === 'CLI_ERROR') {
const p = payload as CliStreamErrorPayload;
const endTime = p.timestamp ? new Date(p.timestamp).getTime() : Date.now();
useCliStreamStore.getState().upsertExecution(p.executionId, {
status: 'error',
endTime,
output: [
{
type: 'stderr',
content: `[ERROR] ${p.error || 'Unknown error occurred'}`,
timestamp: endTime
}
]
});
invalidateActive();
}
}, [lastMessage, invalidateActive]);
}
/**
* Reset the global message tracker
* Useful for testing or when explicitly re-processing messages is needed
*/
export function resetCliStreamWebSocketTracker(): void {
globalLastProcessedMsg = null;
}
export default useCliStreamWebSocket;

View File

@@ -15,44 +15,9 @@ import {
useFocusedPaneId, useFocusedPaneId,
type AllotmentLayout, type AllotmentLayout,
} from '@/stores/viewerStore'; } from '@/stores/viewerStore';
import { useCliStreamStore, type CliOutputLine } from '@/stores/cliStreamStore'; import { useCliStreamStore } from '@/stores/cliStreamStore';
import { useNotificationStore, selectWsLastMessage } from '@/stores'; import { useActiveCliExecutions } from '@/hooks/useActiveCliExecutions';
import { useActiveCliExecutions, useInvalidateActiveCliExecutions } from '@/hooks/useActiveCliExecutions'; import { useCliStreamWebSocket } from '@/hooks/useCliStreamWebSocket';
// ========================================
// Types
// ========================================
// CLI WebSocket message types (matching CliStreamMonitorLegacy)
interface CliStreamStartedPayload {
executionId: string;
tool: string;
mode: string;
timestamp: string;
}
interface CliStreamOutputPayload {
executionId: string;
chunkType: string;
data: unknown;
unit?: {
content: unknown;
type?: string;
};
}
interface CliStreamCompletedPayload {
executionId: string;
success: boolean;
duration?: number;
timestamp: string;
}
interface CliStreamErrorPayload {
executionId: string;
error?: string;
timestamp: string;
}
// ======================================== // ========================================
// Constants // Constants
@@ -64,18 +29,6 @@ const DEFAULT_LAYOUT = 'split-h' as const;
// Helper Functions // Helper Functions
// ======================================== // ========================================
function formatDuration(ms: number): string {
if (ms < 1000) return `${ms}ms`;
const seconds = Math.floor(ms / 1000);
if (seconds < 60) return `${seconds}s`;
const minutes = Math.floor(seconds / 60);
const remainingSeconds = seconds % 60;
if (minutes < 60) return `${minutes}m ${remainingSeconds}s`;
const hours = Math.floor(minutes / 60);
const remainingMinutes = minutes % 60;
return `${hours}h ${remainingMinutes}m`;
}
/** /**
* Count total panes in layout * Count total panes in layout
*/ */
@@ -113,105 +66,11 @@ export function CliViewerPage() {
// CLI Stream Store hooks // CLI Stream Store hooks
const executions = useCliStreamStore((state) => state.executions); const executions = useCliStreamStore((state) => state.executions);
// Track last processed WebSocket message to prevent duplicate processing
const lastProcessedMsgRef = useRef<unknown>(null);
// WebSocket last message from notification store
const lastMessage = useNotificationStore(selectWsLastMessage);
// Active execution sync from server // Active execution sync from server
useActiveCliExecutions(true); useActiveCliExecutions(true);
const invalidateActive = useInvalidateActiveCliExecutions();
// Handle WebSocket messages for CLI stream (same logic as CliStreamMonitorLegacy) // CENTRALIZED WebSocket handler - processes each message only ONCE globally
useEffect(() => { useCliStreamWebSocket();
if (!lastMessage || lastMessage === lastProcessedMsgRef.current) return;
lastProcessedMsgRef.current = lastMessage;
const { type, payload } = lastMessage;
if (type === 'CLI_STARTED') {
const p = payload as CliStreamStartedPayload;
const startTime = p.timestamp ? new Date(p.timestamp).getTime() : Date.now();
useCliStreamStore.getState().upsertExecution(p.executionId, {
tool: p.tool || 'cli',
mode: p.mode || 'analysis',
status: 'running',
startTime,
output: [
{
type: 'system',
content: `[${new Date(startTime).toLocaleTimeString()}] CLI execution started: ${p.tool} (${p.mode} mode)`,
timestamp: startTime
}
]
});
invalidateActive();
} else if (type === 'CLI_OUTPUT') {
const p = payload as CliStreamOutputPayload;
const unitContent = p.unit?.content ?? p.data;
const unitType = p.unit?.type || p.chunkType;
let content: string;
if (unitType === 'tool_call' && typeof unitContent === 'object' && unitContent !== null) {
const toolCall = unitContent as { action?: string; toolName?: string; parameters?: unknown; status?: string; output?: string };
if (toolCall.action === 'invoke') {
const params = toolCall.parameters ? JSON.stringify(toolCall.parameters) : '';
content = `[Tool] ${toolCall.toolName}(${params})`;
} else if (toolCall.action === 'result') {
const status = toolCall.status || 'unknown';
const output = toolCall.output ? `: ${toolCall.output.substring(0, 200)}${toolCall.output.length > 200 ? '...' : ''}` : '';
content = `[Tool Result] ${status}${output}`;
} else {
content = JSON.stringify(unitContent);
}
} else {
content = typeof unitContent === 'string' ? unitContent : JSON.stringify(unitContent);
}
const lines = content.split('\n');
const addOutput = useCliStreamStore.getState().addOutput;
lines.forEach(line => {
if (line.trim() || lines.length === 1) {
addOutput(p.executionId, {
type: (unitType as CliOutputLine['type']) || 'stdout',
content: line,
timestamp: Date.now()
});
}
});
} else if (type === 'CLI_COMPLETED') {
const p = payload as CliStreamCompletedPayload;
const endTime = p.timestamp ? new Date(p.timestamp).getTime() : Date.now();
useCliStreamStore.getState().upsertExecution(p.executionId, {
status: p.success ? 'completed' : 'error',
endTime,
output: [
{
type: 'system',
content: `[${new Date(endTime).toLocaleTimeString()}] CLI execution ${p.success ? 'completed successfully' : 'failed'}${p.duration ? ` (${formatDuration(p.duration)})` : ''}`,
timestamp: endTime
}
]
});
invalidateActive();
} else if (type === 'CLI_ERROR') {
const p = payload as CliStreamErrorPayload;
const endTime = p.timestamp ? new Date(p.timestamp).getTime() : Date.now();
useCliStreamStore.getState().upsertExecution(p.executionId, {
status: 'error',
endTime,
output: [
{
type: 'stderr',
content: `[ERROR] ${p.error || 'Unknown error occurred'}`,
timestamp: endTime
}
]
});
invalidateActive();
}
}, [lastMessage, invalidateActive]);
// Auto-add new executions as tabs, distributing across available panes // Auto-add new executions as tabs, distributing across available panes
const addedExecutionsRef = useRef<Set<string>>(new Set()); const addedExecutionsRef = useRef<Set<string>>(new Set());