fix(orchestrator): complete remaining high/medium priority fixes

Backend (orchestrator-routes.ts):
- Added broadcastExecutionStatusMessage helper for specific message types
- Added EXECUTION_PAUSED, EXECUTION_RESUMED, EXECUTION_STOPPED broadcasts
- Added CLI_SESSION_UNLOCKED broadcast on execution completion/failure
- Added sessionKey to ExecutionState interface for tracking
- Added totalSteps to EXECUTION_STARTED WebSocket message

Frontend (executionMonitorStore.ts):
- Added EXECUTION_FAILED message type handling
- Added totalSteps extraction from EXECUTION_STARTED payload
- Implemented pauseExecution, resumeExecution, stopExecution API calls
- Replaced TODO console.log with actual fetch API calls

Frontend (useWebSocket.ts):
- Added import for executionMonitorStore
- Added EXECUTION_* message routing to executionMonitorStore
This commit is contained in:
catlog22
2026-02-20 22:25:00 +08:00
parent ca1a3fca83
commit 87634740a3
3 changed files with 131 additions and 12 deletions

View File

@@ -13,6 +13,10 @@ import {
handleSessionLockedMessage,
handleSessionUnlockedMessage,
} from '@/stores/sessionManagerStore';
import {
useExecutionMonitorStore,
type ExecutionWSMessage,
} from '@/stores/executionMonitorStore';
import {
OrchestratorMessageSchema,
type OrchestratorWebSocketMessage,
@@ -330,6 +334,13 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
return;
}
// Handle EXECUTION messages (from orchestrator execution-in-session)
if (data.type?.startsWith('EXECUTION_')) {
const handleExecutionMessage = useExecutionMonitorStore.getState().handleExecutionMessage;
handleExecutionMessage(data as ExecutionWSMessage);
return;
}
// Handle A2UI surface messages
if (data.type === 'a2ui-surface') {
const parsed = SurfaceUpdateSchema.safeParse(data.payload);

View File

@@ -44,7 +44,8 @@ export type ExecutionWSMessageType =
| 'EXECUTION_PAUSED'
| 'EXECUTION_RESUMED'
| 'EXECUTION_STOPPED'
| 'EXECUTION_COMPLETED';
| 'EXECUTION_COMPLETED'
| 'EXECUTION_FAILED';
export interface ExecutionWSMessage {
type: ExecutionWSMessageType;
@@ -54,9 +55,11 @@ export interface ExecutionWSMessage {
sessionKey: string;
stepId?: string;
stepName?: string;
totalSteps?: number;
progress?: number;
output?: string;
error?: string;
reason?: string;
timestamp: string;
};
}
@@ -99,7 +102,7 @@ export const useExecutionMonitorStore = create<ExecutionMonitorStore>()(
handleExecutionMessage: (msg: ExecutionWSMessage) => {
const { type, payload } = msg;
const { executionId, flowId, sessionKey, stepId, stepName, output, error, timestamp } = payload;
const { executionId, flowId, sessionKey, stepId, stepName, totalSteps, output, error, timestamp } = payload;
set((state) => {
const existing = state.activeExecutions[executionId];
@@ -115,7 +118,7 @@ export const useExecutionMonitorStore = create<ExecutionMonitorStore>()(
flowName: stepName || 'Workflow',
sessionKey,
status: 'running',
totalSteps: 0,
totalSteps: totalSteps || 0,
completedSteps: 0,
steps: [],
startedAt: timestamp,
@@ -233,6 +236,15 @@ export const useExecutionMonitorStore = create<ExecutionMonitorStore>()(
},
};
case 'EXECUTION_FAILED':
if (!existing) return state;
return {
activeExecutions: {
...state.activeExecutions,
[executionId]: { ...existing, status: 'failed', completedAt: timestamp },
},
};
default:
return state;
}
@@ -243,19 +255,49 @@ export const useExecutionMonitorStore = create<ExecutionMonitorStore>()(
set({ currentExecutionId: executionId }, false, 'selectExecution');
},
pauseExecution: (executionId: string) => {
// TODO: Call API to pause execution
console.log('[ExecutionMonitor] Pause execution:', executionId);
pauseExecution: async (executionId: string) => {
try {
const response = await fetch(`/api/orchestrator/executions/${executionId}/pause`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
});
const result = await response.json();
if (!result.success) {
console.error('[ExecutionMonitor] Pause failed:', result.error);
}
} catch (error) {
console.error('[ExecutionMonitor] Pause error:', error);
}
},
resumeExecution: (executionId: string) => {
// TODO: Call API to resume execution
console.log('[ExecutionMonitor] Resume execution:', executionId);
resumeExecution: async (executionId: string) => {
try {
const response = await fetch(`/api/orchestrator/executions/${executionId}/resume`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
});
const result = await response.json();
if (!result.success) {
console.error('[ExecutionMonitor] Resume failed:', result.error);
}
} catch (error) {
console.error('[ExecutionMonitor] Resume error:', error);
}
},
stopExecution: (executionId: string) => {
// TODO: Call API to stop execution
console.log('[ExecutionMonitor] Stop execution:', executionId);
stopExecution: async (executionId: string) => {
try {
const response = await fetch(`/api/orchestrator/executions/${executionId}/stop`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
});
const result = await response.json();
if (!result.success) {
console.error('[ExecutionMonitor] Stop failed:', result.error);
}
} catch (error) {
console.error('[ExecutionMonitor] Stop error:', error);
}
},
setPanelOpen: (open: boolean) => {

View File

@@ -305,6 +305,8 @@ export interface ExecutionState {
startedAt?: string;
completedAt?: string;
currentNodeId?: string;
/** Session key if execution is running in a PTY session */
sessionKey?: string;
variables: Record<string, unknown>;
nodeStates: Record<string, NodeExecutionState>;
logs: ExecutionLog[];
@@ -1190,6 +1192,54 @@ export async function handleOrchestratorRoutes(ctx: RouteContext): Promise<boole
}
};
// Helper to broadcast specific execution status messages (for frontend executionMonitorStore)
const broadcastExecutionStatusMessage = (
execution: ExecutionState,
sessionKey?: string
): void => {
const timestamp = new Date().toISOString();
// Map execution status to specific message types
const messageTypeMap: Record<string, string> = {
paused: 'EXECUTION_PAUSED',
running: 'EXECUTION_RESUMED',
completed: 'EXECUTION_COMPLETED',
failed: 'EXECUTION_FAILED',
};
const messageType = messageTypeMap[execution.status];
if (messageType) {
try {
broadcastToClients({
type: messageType,
payload: {
executionId: execution.id,
flowId: execution.flowId,
status: execution.status,
timestamp,
},
});
} catch {
// Ignore broadcast errors
}
}
// Broadcast CLI_SESSION_UNLOCKED when execution completes or fails
if ((execution.status === 'completed' || execution.status === 'failed') && sessionKey) {
try {
broadcastToClients({
type: 'CLI_SESSION_UNLOCKED',
payload: {
sessionKey,
timestamp,
},
});
} catch {
// Ignore broadcast errors
}
}
};
// ==== EXECUTE FLOW ====
// POST /api/orchestrator/flows/:id/execute
if (pathname.match(/^\/api\/orchestrator\/flows\/[^/]+\/execute$/) && req.method === 'POST') {
@@ -1370,6 +1420,7 @@ export async function handleOrchestratorRoutes(ctx: RouteContext): Promise<boole
flowId: flowId,
status: 'pending',
startedAt: now,
sessionKey: sessionKey,
variables: { ...flow.variables, ...inputVariables },
nodeStates,
logs: [{
@@ -1393,6 +1444,7 @@ export async function handleOrchestratorRoutes(ctx: RouteContext): Promise<boole
flowId: flowId,
sessionKey: sessionKey,
stepName: flow.name,
totalSteps: flow.nodes.length,
timestamp: now
}
});
@@ -1484,6 +1536,7 @@ export async function handleOrchestratorRoutes(ctx: RouteContext): Promise<boole
await writeExecutionStorage(workflowDir, execution);
broadcastExecutionStateUpdate(execution);
broadcastExecutionStatusMessage(execution);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
@@ -1567,6 +1620,7 @@ export async function handleOrchestratorRoutes(ctx: RouteContext): Promise<boole
await writeExecutionStorage(workflowDir, execution);
broadcastExecutionStateUpdate(execution);
broadcastExecutionStatusMessage(execution);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
@@ -1652,6 +1706,18 @@ export async function handleOrchestratorRoutes(ctx: RouteContext): Promise<boole
await writeExecutionStorage(workflowDir, execution);
broadcastExecutionStateUpdate(execution);
broadcastExecutionStatusMessage(execution, execution.sessionKey);
// Broadcast EXECUTION_STOPPED for frontend executionMonitorStore
broadcastToClients({
type: 'EXECUTION_STOPPED',
payload: {
executionId: execution.id,
flowId: execution.flowId,
reason: 'User requested stop',
timestamp: now,
},
});
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({