mirror of
https://github.com/catlog22/Claude-Code-Workflow.git
synced 2026-02-28 09:23:08 +08:00
Add orchestrator types and error handling configurations
- Introduced new TypeScript types for orchestrator functionality, including `SessionStrategy`, `ErrorHandlingStrategy`, and `OrchestrationStep`. - Defined interfaces for `OrchestrationPlan` and `ManualOrchestrationParams` to facilitate orchestration management. - Added a new PNG image file for visual representation. - Created a placeholder file named 'nul' for future use.
This commit is contained in:
344
ccw/frontend/src/orchestrator/OrchestrationPlanBuilder.ts
Normal file
344
ccw/frontend/src/orchestrator/OrchestrationPlanBuilder.ts
Normal file
@@ -0,0 +1,344 @@
|
||||
import {
|
||||
OrchestrationPlan,
|
||||
OrchestrationStep,
|
||||
SessionStrategy,
|
||||
ErrorHandling,
|
||||
ExecutionType,
|
||||
OrchestrationMetadata,
|
||||
ManualOrchestrationParams,
|
||||
} from '../types/orchestrator';
|
||||
import { Flow, FlowNode, PromptTemplateNodeData } from '../types/flow';
|
||||
import { IssueQueue } from '../lib/api';
|
||||
import { buildQueueItemContext } from '../lib/queue-prompt'; // Assuming this function is available
|
||||
|
||||
/**
|
||||
* Builds OrchestrationPlan objects from various sources (Flow, IssueQueue, Manual Input).
|
||||
* This class is responsible for transforming source data into a standardized OrchestrationPlan,
|
||||
* including dependency resolution, context mapping, and basic plan metadata generation.
|
||||
*/
|
||||
export class OrchestrationPlanBuilder {
|
||||
private static DEFAULT_SESSION_STRATEGY: SessionStrategy = 'reuse_default';
|
||||
private static DEFAULT_ERROR_HANDLING: ErrorHandling = {
|
||||
strategy: 'pause_on_error',
|
||||
maxRetries: 0,
|
||||
retryDelayMs: 0,
|
||||
};
|
||||
|
||||
/**
|
||||
* Converts a Flow DAG into a topologically-sorted OrchestrationPlan.
|
||||
*
|
||||
* @param flow The Flow object to convert.
|
||||
* @returns An OrchestrationPlan.
|
||||
*/
|
||||
public static fromFlow(flow: Flow): OrchestrationPlan {
|
||||
const steps: OrchestrationStep[] = [];
|
||||
const nodeMap = new Map<string, FlowNode>(flow.nodes.map((node) => [node.id, node]));
|
||||
const adjacencyList = new Map<string, string[]>(); // node.id -> list of dependent node.ids
|
||||
const inDegree = new Map<string, number>(); // node.id -> count of incoming edges
|
||||
|
||||
// Initialize in-degrees and adjacency list
|
||||
for (const node of flow.nodes) {
|
||||
inDegree.set(node.id, 0);
|
||||
adjacencyList.set(node.id, []);
|
||||
}
|
||||
|
||||
for (const edge of flow.edges) {
|
||||
// Ensure the edge target node exists before incrementing in-degree
|
||||
if (inDegree.has(edge.target)) {
|
||||
inDegree.set(edge.target, (inDegree.get(edge.target) || 0) + 1);
|
||||
// Ensure the adjacency list source node exists before adding
|
||||
adjacencyList.get(edge.source)?.push(edge.target);
|
||||
}
|
||||
}
|
||||
|
||||
// Kahn's algorithm for topological sort
|
||||
const queue: string[] = [];
|
||||
for (const [nodeId, degree] of inDegree.entries()) {
|
||||
if (degree === 0) {
|
||||
queue.push(nodeId);
|
||||
}
|
||||
}
|
||||
|
||||
const sortedNodeIds: string[] = [];
|
||||
while (queue.length > 0) {
|
||||
const nodeId = queue.shift()!;
|
||||
sortedNodeIds.push(nodeId);
|
||||
|
||||
for (const neighborId of adjacencyList.get(nodeId) || []) {
|
||||
inDegree.set(neighborId, (inDegree.get(neighborId) || 0) - 1);
|
||||
if (inDegree.get(neighborId) === 0) {
|
||||
queue.push(neighborId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Cycle detection
|
||||
if (sortedNodeIds.length !== flow.nodes.length) {
|
||||
// This should ideally be a more specific error or an exception
|
||||
console.error('Cycle detected in flow graph. Topological sort failed.');
|
||||
throw new Error('Cycle detected in flow graph. Cannot build orchestration plan from cyclic flow.');
|
||||
}
|
||||
|
||||
// Convert sorted nodes to OrchestrationSteps
|
||||
for (const nodeId of sortedNodeIds) {
|
||||
const node = nodeMap.get(nodeId)!;
|
||||
const nodeData = node.data as PromptTemplateNodeData; // Assuming all nodes are PromptTemplateNodeData
|
||||
|
||||
const dependsOn = flow.edges
|
||||
.filter((edge) => edge.target === node.id)
|
||||
.map((edge) => edge.source);
|
||||
|
||||
// Map delivery to sessionStrategy
|
||||
let sessionStrategy: SessionStrategy | undefined;
|
||||
if (nodeData.delivery === 'newExecution') {
|
||||
sessionStrategy = 'new_session';
|
||||
} else if (nodeData.delivery === 'sendToSession' && nodeData.targetSessionKey) {
|
||||
sessionStrategy = 'specific_session';
|
||||
} else if (nodeData.delivery === 'sendToSession' && !nodeData.targetSessionKey) {
|
||||
// Fallback or explicit default if targetSessionKey is missing for sendToSession
|
||||
sessionStrategy = 'reuse_default';
|
||||
}
|
||||
|
||||
// Determine execution type
|
||||
let executionType: ExecutionType = 'frontend-cli'; // Default
|
||||
if (nodeData.slashCommand) {
|
||||
executionType = 'slash-command';
|
||||
} else if (nodeData.tool && nodeData.mode) {
|
||||
// More sophisticated logic might be needed here to differentiate backend-flow
|
||||
// For now, if tool/mode are present, assume frontend-cli or backend-flow
|
||||
// depending on whether it's a direct CLI call or a backend orchestrator call.
|
||||
// Assuming CLI tools are frontend-cli for now unless specified otherwise.
|
||||
executionType = 'frontend-cli';
|
||||
}
|
||||
|
||||
steps.push({
|
||||
id: node.id,
|
||||
name: nodeData.label || `Step ${node.id}`,
|
||||
instruction: nodeData.instruction || '',
|
||||
tool: nodeData.tool,
|
||||
mode: nodeData.mode,
|
||||
sessionStrategy: sessionStrategy,
|
||||
targetSessionKey: nodeData.targetSessionKey,
|
||||
resumeKey: nodeData.resumeKey,
|
||||
dependsOn: dependsOn,
|
||||
condition: nodeData.condition,
|
||||
contextRefs: nodeData.contextRefs,
|
||||
outputName: nodeData.outputName,
|
||||
// Error handling can be added at node level if flow nodes support it
|
||||
errorHandling: undefined,
|
||||
executionType: executionType,
|
||||
sourceNodeId: node.id,
|
||||
});
|
||||
}
|
||||
|
||||
const metadata: OrchestrationMetadata = {
|
||||
totalSteps: steps.length,
|
||||
hasParallelGroups: OrchestrationPlanBuilder.detectParallelGroups(steps), // Implement this
|
||||
estimatedComplexity: OrchestrationPlanBuilder.estimateComplexity(steps), // Implement this
|
||||
};
|
||||
|
||||
return {
|
||||
id: flow.id,
|
||||
name: flow.name,
|
||||
source: 'flow',
|
||||
sourceId: flow.id,
|
||||
steps: steps,
|
||||
variables: flow.variables,
|
||||
defaultSessionStrategy: OrchestrationPlanBuilder.DEFAULT_SESSION_STRATEGY,
|
||||
defaultErrorHandling: OrchestrationPlanBuilder.DEFAULT_ERROR_HANDLING,
|
||||
status: 'pending',
|
||||
createdAt: flow.created_at,
|
||||
updatedAt: flow.updated_at,
|
||||
metadata: metadata,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts an IssueQueue with execution groups into an OrchestrationPlan.
|
||||
*
|
||||
* @param queue The IssueQueue object.
|
||||
* @param issues A map of issue IDs to Issue objects, needed for context.
|
||||
* @returns An OrchestrationPlan.
|
||||
*/
|
||||
public static fromQueue(queue: IssueQueue, issues: Map<string, any>): OrchestrationPlan {
|
||||
const steps: OrchestrationStep[] = [];
|
||||
const groupIdToSteps = new Map<string, string[]>(); // Maps group ID to list of step IDs in that group
|
||||
const allStepIds = new Set<string>();
|
||||
|
||||
let previousGroupStepIds: string[] = [];
|
||||
|
||||
for (const groupId of queue.execution_groups) {
|
||||
const groupItems = queue.grouped_items[groupId] || [];
|
||||
const currentGroupStepIds: string[] = [];
|
||||
const groupDependsOn: string[] = []; // Dependencies for the current group
|
||||
|
||||
if (groupId.startsWith('S*') || groupId.startsWith('P*')) {
|
||||
// Sequential or parallel groups: depend on all steps from the previous group
|
||||
groupDependsOn.push(...previousGroupStepIds);
|
||||
}
|
||||
|
||||
for (const item of groupItems) {
|
||||
const stepId = `queue-item-${item.item_id}`;
|
||||
allStepIds.add(stepId);
|
||||
currentGroupStepIds.push(stepId);
|
||||
|
||||
// Fetch the associated issue
|
||||
const issue = issues.get(item.issue_id);
|
||||
const instruction = issue ? buildQueueItemContext(item, issue) : `Execute queue item ${item.item_id}`;
|
||||
|
||||
// Queue items are typically frontend-cli executions
|
||||
const executionType: ExecutionType = 'frontend-cli';
|
||||
|
||||
steps.push({
|
||||
id: stepId,
|
||||
name: `Queue Item: ${item.item_id}`,
|
||||
instruction: instruction,
|
||||
tool: undefined, // Queue items don't typically specify tool/mode directly
|
||||
mode: undefined,
|
||||
sessionStrategy: OrchestrationPlanBuilder.DEFAULT_SESSION_STRATEGY,
|
||||
targetSessionKey: undefined,
|
||||
resumeKey: undefined,
|
||||
dependsOn: groupDependsOn, // All items in the current group depend on the previous group's steps
|
||||
condition: undefined,
|
||||
contextRefs: undefined,
|
||||
outputName: `queueItemOutput_${item.item_id}`,
|
||||
errorHandling: undefined,
|
||||
executionType: executionType,
|
||||
sourceItemId: item.item_id,
|
||||
});
|
||||
}
|
||||
|
||||
groupIdToSteps.set(groupId, currentGroupStepIds);
|
||||
previousGroupStepIds = currentGroupStepIds;
|
||||
}
|
||||
|
||||
const metadata: OrchestrationMetadata = {
|
||||
totalSteps: steps.length,
|
||||
hasParallelGroups: queue.execution_groups.some((id) => id.startsWith('P*')),
|
||||
estimatedComplexity: OrchestrationPlanBuilder.estimateComplexity(steps),
|
||||
};
|
||||
|
||||
return {
|
||||
id: queue.id || `queue-${Date.now()}`,
|
||||
name: `Queue Plan: ${queue.id || 'Untitled'}`,
|
||||
source: 'queue',
|
||||
sourceId: queue.id,
|
||||
steps: steps,
|
||||
variables: {}, // Queue plans might not have global variables in the same way flows do
|
||||
defaultSessionStrategy: OrchestrationPlanBuilder.DEFAULT_SESSION_STRATEGY,
|
||||
defaultErrorHandling: OrchestrationPlanBuilder.DEFAULT_ERROR_HANDLING,
|
||||
status: 'pending',
|
||||
createdAt: new Date().toISOString(),
|
||||
updatedAt: new Date().toISOString(),
|
||||
metadata: metadata,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a single-step OrchestrationPlan from manual user input.
|
||||
*
|
||||
* @param params Parameters for the manual orchestration.
|
||||
* @returns An OrchestrationPlan.
|
||||
*/
|
||||
public static fromManual(params: ManualOrchestrationParams): OrchestrationPlan {
|
||||
const stepId = `manual-step-${Date.now()}`;
|
||||
const manualStep: OrchestrationStep = {
|
||||
id: stepId,
|
||||
name: 'Manual Execution',
|
||||
instruction: params.prompt,
|
||||
tool: params.tool,
|
||||
mode: params.mode,
|
||||
sessionStrategy: params.sessionStrategy || OrchestrationPlanBuilder.DEFAULT_SESSION_STRATEGY,
|
||||
targetSessionKey: params.targetSessionKey,
|
||||
resumeKey: undefined,
|
||||
dependsOn: [],
|
||||
condition: undefined,
|
||||
contextRefs: undefined,
|
||||
outputName: params.outputName,
|
||||
errorHandling: params.errorHandling,
|
||||
executionType: 'frontend-cli', // Manual commands are typically frontend CLI
|
||||
sourceNodeId: undefined,
|
||||
sourceItemId: undefined,
|
||||
};
|
||||
|
||||
const metadata: OrchestrationMetadata = {
|
||||
totalSteps: 1,
|
||||
hasParallelGroups: false,
|
||||
estimatedComplexity: 'low',
|
||||
};
|
||||
|
||||
return {
|
||||
id: `manual-plan-${Date.now()}`,
|
||||
name: 'Manual Orchestration',
|
||||
source: 'manual',
|
||||
sourceId: undefined,
|
||||
steps: [manualStep],
|
||||
variables: {},
|
||||
defaultSessionStrategy: OrchestrationPlanBuilder.DEFAULT_SESSION_STRATEGY,
|
||||
defaultErrorHandling: OrchestrationPlanBuilder.DEFAULT_ERROR_HANDLING,
|
||||
status: 'pending',
|
||||
createdAt: new Date().toISOString(),
|
||||
updatedAt: new Date().toISOString(),
|
||||
metadata: metadata,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to detect if the plan contains parallel groups.
|
||||
* @param steps The steps of the orchestration plan.
|
||||
* @returns True if parallel groups are detected, false otherwise.
|
||||
*/
|
||||
private static detectParallelGroups(steps: OrchestrationStep[]): boolean {
|
||||
// A simple heuristic: check if any two steps have the same 'dependsOn' set
|
||||
// but are not explicitly dependent on each other, implying they can run in parallel.
|
||||
// This is a basic check and might need refinement.
|
||||
const dependencySets = new Map<string, Set<string>>();
|
||||
for (const step of steps) {
|
||||
const depKey = JSON.stringify(step.dependsOn.sort());
|
||||
if (!dependencySets.has(depKey)) {
|
||||
dependencySets.set(depKey, new Set());
|
||||
}
|
||||
dependencySets.get(depKey)!.add(step.id);
|
||||
}
|
||||
|
||||
for (const [, stepIds] of dependencySets.entries()) {
|
||||
if (stepIds.size > 1) {
|
||||
// If multiple steps share the same dependencies, they might be parallel
|
||||
// Need to ensure they don't have implicit dependencies among themselves
|
||||
let isParallelGroup = true;
|
||||
for (const id1 of stepIds) {
|
||||
for (const id2 of stepIds) {
|
||||
if (id1 !== id2) {
|
||||
const step1 = steps.find(s => s.id === id1);
|
||||
const step2 = steps.find(s => s.id === id2);
|
||||
// If step1 depends on step2 or vice-versa, they are not parallel
|
||||
if (step1?.dependsOn.includes(id2) || step2?.dependsOn.includes(id1)) {
|
||||
isParallelGroup = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!isParallelGroup) break;
|
||||
}
|
||||
if (isParallelGroup) return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to estimate the complexity of the orchestration plan.
|
||||
* @param steps The steps of the orchestration plan.
|
||||
* @returns 'low', 'medium', or 'high'.
|
||||
*/
|
||||
private static estimateComplexity(steps: OrchestrationStep[]): 'low' | 'medium' | 'high' {
|
||||
if (steps.length <= 1) {
|
||||
return 'low';
|
||||
}
|
||||
// Heuristic: More steps or presence of parallel groups increases complexity
|
||||
if (steps.length > 5 || OrchestrationPlanBuilder.detectParallelGroups(steps)) {
|
||||
return 'high';
|
||||
}
|
||||
return 'medium';
|
||||
}
|
||||
}
|
||||
478
ccw/frontend/src/orchestrator/SequentialRunner.ts
Normal file
478
ccw/frontend/src/orchestrator/SequentialRunner.ts
Normal file
@@ -0,0 +1,478 @@
|
||||
// ========================================
|
||||
// Sequential Runner
|
||||
// ========================================
|
||||
// Manages PTY session lifecycle and step-by-step command dispatch for
|
||||
// orchestration plans. Creates/reuses CLI sessions and dispatches
|
||||
// steps sequentially, resolving runtime variables between steps.
|
||||
//
|
||||
// Integration pattern:
|
||||
// 1. SequentialRunner.start() -> dispatches first step
|
||||
// 2. WebSocket CLI_COMPLETED -> useCompletionCallbackChain -> store update
|
||||
// 3. Store subscription detects step completion -> executeStep(nextStepId)
|
||||
// 4. Repeat until all steps complete
|
||||
//
|
||||
// Uses store subscription (Option B) for clean separation between
|
||||
// the callback chain (which updates the store) and the runner
|
||||
// (which reacts to store changes by dispatching the next step).
|
||||
|
||||
import type { OrchestrationPlan } from '../types/orchestrator';
|
||||
import { dispatch } from '../lib/unifiedExecutionDispatcher';
|
||||
import type { DispatchOptions } from '../lib/unifiedExecutionDispatcher';
|
||||
import { createCliSession } from '../lib/api';
|
||||
import { useOrchestratorStore } from '../stores/orchestratorStore';
|
||||
import type { OrchestrationRunState, StepRunState } from '../stores/orchestratorStore';
|
||||
|
||||
// ========== Types ==========
|
||||
|
||||
/** Configuration options for starting an orchestration plan */
|
||||
export interface StartOptions {
|
||||
/** Working directory for session creation */
|
||||
workingDir?: string;
|
||||
/** Project path for API routing */
|
||||
projectPath?: string;
|
||||
/** Execution category for tracking */
|
||||
category?: DispatchOptions['category'];
|
||||
}
|
||||
|
||||
/** Tracks active subscriptions per plan for cleanup */
|
||||
interface PlanSubscription {
|
||||
/** Zustand unsubscribe function */
|
||||
unsubscribe: () => void;
|
||||
/** The plan ID being tracked */
|
||||
planId: string;
|
||||
/** Set of step IDs that have already been dispatched (prevents double-dispatch) */
|
||||
dispatchedSteps: Set<string>;
|
||||
/** Options passed at start() for reuse during step dispatches */
|
||||
options: StartOptions;
|
||||
}
|
||||
|
||||
// ========== Module State ==========
|
||||
|
||||
/** Active subscriptions keyed by plan ID */
|
||||
const activeSubscriptions = new Map<string, PlanSubscription>();
|
||||
|
||||
// ========== Public API ==========
|
||||
|
||||
/**
|
||||
* Start executing an orchestration plan.
|
||||
*
|
||||
* 1. Registers the plan in the orchestratorStore
|
||||
* 2. Creates a new CLI session if needed (based on plan's defaultSessionStrategy)
|
||||
* 3. Subscribes to store changes for automated step advancement
|
||||
* 4. Dispatches the first ready step
|
||||
*
|
||||
* @param plan - The orchestration plan to execute
|
||||
* @param sessionKey - Optional existing session key to reuse
|
||||
* @param options - Additional options for session creation and dispatch
|
||||
*/
|
||||
export async function start(
|
||||
plan: OrchestrationPlan,
|
||||
sessionKey?: string,
|
||||
options: StartOptions = {}
|
||||
): Promise<void> {
|
||||
const store = useOrchestratorStore.getState();
|
||||
|
||||
// Clean up any existing subscription for this plan
|
||||
stop(plan.id);
|
||||
|
||||
// Resolve session key
|
||||
let resolvedSessionKey = sessionKey;
|
||||
if (!resolvedSessionKey && plan.defaultSessionStrategy === 'new_session') {
|
||||
const result = await createCliSession(
|
||||
{
|
||||
workingDir: options.workingDir,
|
||||
tool: plan.steps[0]?.tool,
|
||||
},
|
||||
options.projectPath
|
||||
);
|
||||
resolvedSessionKey = result.session.sessionKey;
|
||||
}
|
||||
|
||||
// Initialize plan in the store
|
||||
store.startOrchestration(plan, resolvedSessionKey);
|
||||
|
||||
// Subscribe to store changes for automated step advancement
|
||||
const subscription = subscribeToStepAdvancement(plan.id, options);
|
||||
activeSubscriptions.set(plan.id, subscription);
|
||||
|
||||
// Dispatch the first ready step
|
||||
const firstStepId = store.getNextReadyStep(plan.id);
|
||||
if (firstStepId) {
|
||||
subscription.dispatchedSteps.add(firstStepId);
|
||||
await executeStep(plan.id, firstStepId, options);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a specific step within a plan.
|
||||
*
|
||||
* 1. Resolves runtime variables in the step instruction
|
||||
* 2. Resolves contextRefs from previous step outputs
|
||||
* 3. Updates step status to 'running'
|
||||
* 4. Dispatches execution via UnifiedExecutionDispatcher
|
||||
* 5. Registers the executionId for callback chain matching
|
||||
*
|
||||
* @param planId - The plan containing the step
|
||||
* @param stepId - The step to execute
|
||||
* @param options - Dispatch options (workingDir, projectPath, etc.)
|
||||
*/
|
||||
export async function executeStep(
|
||||
planId: string,
|
||||
stepId: string,
|
||||
options: StartOptions = {}
|
||||
): Promise<void> {
|
||||
const store = useOrchestratorStore.getState();
|
||||
const runState = store.activePlans[planId];
|
||||
if (!runState) {
|
||||
console.error(`[SequentialRunner] Plan "${planId}" not found in store`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Find the step definition
|
||||
const step = runState.plan.steps.find((s) => s.id === stepId);
|
||||
if (!step) {
|
||||
console.error(`[SequentialRunner] Step "${stepId}" not found in plan "${planId}"`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Collect previous step outputs for variable interpolation
|
||||
const stepOutputs = collectStepOutputs(runState);
|
||||
|
||||
// Resolve runtime variables in the instruction
|
||||
const resolvedInstruction = interpolateInstruction(
|
||||
step.instruction,
|
||||
runState.plan.variables,
|
||||
stepOutputs
|
||||
);
|
||||
|
||||
// Resolve contextRefs - append previous step outputs as context
|
||||
const contextSuffix = resolveContextRefs(step.contextRefs, stepOutputs);
|
||||
const finalInstruction = contextSuffix
|
||||
? `${resolvedInstruction}\n\n--- Context from previous steps ---\n${contextSuffix}`
|
||||
: resolvedInstruction;
|
||||
|
||||
// Create a modified step with the resolved instruction for dispatch
|
||||
const resolvedStep = { ...step, instruction: finalInstruction };
|
||||
|
||||
// Mark step as running
|
||||
store.updateStepStatus(planId, stepId, 'running');
|
||||
|
||||
try {
|
||||
// Dispatch via UnifiedExecutionDispatcher
|
||||
const result = await dispatch(resolvedStep, runState.sessionKey ?? '', {
|
||||
workingDir: options.workingDir,
|
||||
projectPath: options.projectPath,
|
||||
category: options.category,
|
||||
resumeKey: step.resumeKey,
|
||||
});
|
||||
|
||||
// Register executionId for callback chain matching
|
||||
store.registerExecution(planId, stepId, result.executionId);
|
||||
|
||||
// If dispatch created a new session and plan had no session, update the run state
|
||||
if (result.isNewSession && !runState.sessionKey) {
|
||||
// Update session key on the run state by re-reading store
|
||||
// The session key is now tracked on the dispatch result
|
||||
// Future steps in this plan will use this session via the store's sessionKey
|
||||
const currentState = useOrchestratorStore.getState();
|
||||
const currentRunState = currentState.activePlans[planId];
|
||||
if (currentRunState && !currentRunState.sessionKey) {
|
||||
// The store does not expose a setSessionKey action, so we rely on
|
||||
// the dispatch result's sessionKey being used by subsequent steps.
|
||||
// This is handled by resolveSessionKey in the dispatcher using
|
||||
// the step's sessionStrategy or reuse_default with the plan's sessionKey.
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
const errorMessage = err instanceof Error ? err.message : String(err);
|
||||
console.error(`[SequentialRunner] Failed to dispatch step "${stepId}":`, errorMessage);
|
||||
store.updateStepStatus(planId, stepId, 'failed', { error: errorMessage });
|
||||
// Error handling (pause/skip/stop) is handled by useCompletionCallbackChain
|
||||
// but since this is a dispatch failure (not a CLI completion failure),
|
||||
// we need to apply error handling here too
|
||||
applyDispatchErrorHandling(planId, stepId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when _advanceToNextStep identifies a next step.
|
||||
* If nextStepId is not null, dispatches execution for that step.
|
||||
* If null, orchestration is complete (store already updated).
|
||||
*
|
||||
* @param planId - The plan ID
|
||||
* @param nextStepId - The next step to execute, or null if complete
|
||||
* @param options - Dispatch options
|
||||
*/
|
||||
export async function onStepAdvanced(
|
||||
planId: string,
|
||||
nextStepId: string | null,
|
||||
options: StartOptions = {}
|
||||
): Promise<void> {
|
||||
if (nextStepId) {
|
||||
await executeStep(planId, nextStepId, options);
|
||||
}
|
||||
// If null, orchestration is complete - store already updated by _advanceToNextStep
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop tracking a plan and clean up its store subscription.
|
||||
*
|
||||
* @param planId - The plan to stop tracking
|
||||
*/
|
||||
export function stop(planId: string): void {
|
||||
const subscription = activeSubscriptions.get(planId);
|
||||
if (subscription) {
|
||||
subscription.unsubscribe();
|
||||
activeSubscriptions.delete(planId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop all active plan subscriptions.
|
||||
*/
|
||||
export function stopAll(): void {
|
||||
for (const [planId] of activeSubscriptions) {
|
||||
stop(planId);
|
||||
}
|
||||
}
|
||||
|
||||
// ========== Variable Interpolation ==========
|
||||
|
||||
/**
|
||||
* Replace {{variableName}} placeholders in an instruction string with
|
||||
* values from the plan variables and previous step outputs.
|
||||
*
|
||||
* Supports:
|
||||
* - Simple replacement: {{variableName}} -> value from variables map
|
||||
* - Step output reference: {{stepOutputName}} -> value from step outputs
|
||||
* - Nested dot-notation: {{step1.output.field}} -> nested property access
|
||||
*
|
||||
* @param instruction - The instruction template string
|
||||
* @param variables - Plan-level variables
|
||||
* @param stepOutputs - Collected outputs from completed steps
|
||||
* @returns The interpolated instruction string
|
||||
*/
|
||||
export function interpolateInstruction(
|
||||
instruction: string,
|
||||
variables: Record<string, unknown>,
|
||||
stepOutputs: Record<string, unknown>
|
||||
): string {
|
||||
return instruction.replace(/\{\{([^}]+)\}\}/g, (_match, key: string) => {
|
||||
const trimmedKey = key.trim();
|
||||
|
||||
// Try plan variables first (simple key)
|
||||
if (trimmedKey in variables) {
|
||||
return formatValue(variables[trimmedKey]);
|
||||
}
|
||||
|
||||
// Try step outputs (simple key)
|
||||
if (trimmedKey in stepOutputs) {
|
||||
return formatValue(stepOutputs[trimmedKey]);
|
||||
}
|
||||
|
||||
// Try nested dot-notation in step outputs
|
||||
const nestedValue = resolveNestedPath(trimmedKey, stepOutputs);
|
||||
if (nestedValue !== undefined) {
|
||||
return formatValue(nestedValue);
|
||||
}
|
||||
|
||||
// Try nested dot-notation in plan variables
|
||||
const nestedVarValue = resolveNestedPath(trimmedKey, variables);
|
||||
if (nestedVarValue !== undefined) {
|
||||
return formatValue(nestedVarValue);
|
||||
}
|
||||
|
||||
// Unresolved placeholder - leave as-is
|
||||
return `{{${trimmedKey}}}`;
|
||||
});
|
||||
}
|
||||
|
||||
// ========== Internal Helpers ==========
|
||||
|
||||
/**
|
||||
* Subscribe to orchestratorStore changes for a specific plan.
|
||||
* When a step transitions to 'completed' or 'skipped', check if there is
|
||||
* a new ready step and dispatch it.
|
||||
*
|
||||
* This implements Option B (store subscription) for clean separation
|
||||
* between the callback chain (store updates) and the runner (step dispatch).
|
||||
*/
|
||||
function subscribeToStepAdvancement(
|
||||
planId: string,
|
||||
options: StartOptions
|
||||
): PlanSubscription {
|
||||
const dispatchedSteps = new Set<string>();
|
||||
|
||||
// Track previous step statuses to detect transitions
|
||||
let previousStatuses: Record<string, StepRunState> | undefined;
|
||||
|
||||
const unsubscribe = useOrchestratorStore.subscribe((state) => {
|
||||
const runState = state.activePlans[planId];
|
||||
if (!runState || runState.status !== 'running') return;
|
||||
|
||||
const currentStatuses = runState.stepStatuses;
|
||||
|
||||
// On first call, just capture the initial state
|
||||
if (!previousStatuses) {
|
||||
previousStatuses = currentStatuses;
|
||||
return;
|
||||
}
|
||||
|
||||
// Detect if any step just transitioned to a terminal state (completed/skipped)
|
||||
let hasNewCompletion = false;
|
||||
for (const [stepId, stepState] of Object.entries(currentStatuses)) {
|
||||
const prevState = previousStatuses[stepId];
|
||||
if (!prevState) continue;
|
||||
|
||||
const wasTerminal =
|
||||
prevState.status === 'completed' || prevState.status === 'skipped';
|
||||
const isTerminal =
|
||||
stepState.status === 'completed' || stepState.status === 'skipped';
|
||||
|
||||
if (!wasTerminal && isTerminal) {
|
||||
hasNewCompletion = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
previousStatuses = currentStatuses;
|
||||
|
||||
if (!hasNewCompletion) return;
|
||||
|
||||
// A step just completed - check if _advanceToNextStep was already called
|
||||
// (by useCompletionCallbackChain). We detect the next ready step.
|
||||
const store = useOrchestratorStore.getState();
|
||||
const nextStepId = store.getNextReadyStep(planId);
|
||||
|
||||
if (nextStepId && !dispatchedSteps.has(nextStepId)) {
|
||||
dispatchedSteps.add(nextStepId);
|
||||
// Dispatch asynchronously to avoid blocking the subscription callback
|
||||
onStepAdvanced(planId, nextStepId, options).catch((err) => {
|
||||
console.error(
|
||||
`[SequentialRunner] Failed to advance to step "${nextStepId}":`,
|
||||
err
|
||||
);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
return {
|
||||
unsubscribe,
|
||||
planId,
|
||||
dispatchedSteps,
|
||||
options,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect output results from completed steps, keyed by outputName.
|
||||
*/
|
||||
function collectStepOutputs(
|
||||
runState: OrchestrationRunState
|
||||
): Record<string, unknown> {
|
||||
const outputs: Record<string, unknown> = {};
|
||||
|
||||
for (const step of runState.plan.steps) {
|
||||
if (!step.outputName) continue;
|
||||
|
||||
const stepState = runState.stepStatuses[step.id];
|
||||
if (stepState && (stepState.status === 'completed' || stepState.status === 'skipped')) {
|
||||
outputs[step.outputName] = stepState.result;
|
||||
}
|
||||
}
|
||||
|
||||
return outputs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve contextRefs by looking up output values from previous steps
|
||||
* and formatting them as a context string to append to the instruction.
|
||||
*/
|
||||
function resolveContextRefs(
|
||||
contextRefs: string[] | undefined,
|
||||
stepOutputs: Record<string, unknown>
|
||||
): string {
|
||||
if (!contextRefs || contextRefs.length === 0) return '';
|
||||
|
||||
const parts: string[] = [];
|
||||
|
||||
for (const ref of contextRefs) {
|
||||
const value = stepOutputs[ref];
|
||||
if (value !== undefined) {
|
||||
parts.push(`[${ref}]:\n${formatValue(value)}`);
|
||||
}
|
||||
}
|
||||
|
||||
return parts.join('\n\n');
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve a dot-notation path against an object.
|
||||
* E.g., "step1.output.field" resolves by traversing the nested structure.
|
||||
*/
|
||||
function resolveNestedPath(
|
||||
path: string,
|
||||
obj: Record<string, unknown>
|
||||
): unknown | undefined {
|
||||
const parts = path.split('.');
|
||||
let current: unknown = obj;
|
||||
|
||||
for (const part of parts) {
|
||||
if (current === null || current === undefined) return undefined;
|
||||
if (typeof current !== 'object') return undefined;
|
||||
current = (current as Record<string, unknown>)[part];
|
||||
}
|
||||
|
||||
return current;
|
||||
}
|
||||
|
||||
/**
|
||||
* Format a value for insertion into an instruction string.
|
||||
*/
|
||||
function formatValue(value: unknown): string {
|
||||
if (value === null || value === undefined) return '';
|
||||
if (typeof value === 'string') return value;
|
||||
if (typeof value === 'number' || typeof value === 'boolean') return String(value);
|
||||
// For objects/arrays, produce a JSON string
|
||||
try {
|
||||
return JSON.stringify(value, null, 2);
|
||||
} catch {
|
||||
return String(value);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply error handling for dispatch-level failures (not CLI completion failures).
|
||||
* This mirrors the error handling in useCompletionCallbackChain but is needed
|
||||
* when the dispatch itself fails before a CLI execution starts.
|
||||
*/
|
||||
function applyDispatchErrorHandling(planId: string, stepId: string): void {
|
||||
const store = useOrchestratorStore.getState();
|
||||
const runState = store.activePlans[planId];
|
||||
if (!runState) return;
|
||||
|
||||
const stepDef = runState.plan.steps.find((s) => s.id === stepId);
|
||||
const strategy =
|
||||
stepDef?.errorHandling?.strategy ??
|
||||
runState.plan.defaultErrorHandling.strategy ??
|
||||
'pause_on_error';
|
||||
|
||||
switch (strategy) {
|
||||
case 'pause_on_error':
|
||||
store.pauseOrchestration(planId);
|
||||
break;
|
||||
case 'skip':
|
||||
store.skipStep(planId, stepId);
|
||||
store._advanceToNextStep(planId);
|
||||
break;
|
||||
case 'stop':
|
||||
store.stopOrchestration(
|
||||
planId,
|
||||
`Dispatch failed for step "${stepId}"`
|
||||
);
|
||||
break;
|
||||
default:
|
||||
store.pauseOrchestration(planId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,430 @@
|
||||
import { describe, it, expect, beforeAll, afterAll, vi } from 'vitest';
|
||||
import { OrchestrationPlanBuilder } from '../OrchestrationPlanBuilder';
|
||||
import { Flow, FlowNode, FlowEdge, PromptTemplateNodeData } from '../../types/flow';
|
||||
import { IssueQueue, QueueItem } from '../../lib/api';
|
||||
import {
|
||||
OrchestrationStep,
|
||||
ManualOrchestrationParams,
|
||||
} from '../../types/orchestrator';
|
||||
|
||||
// Mock buildQueueItemContext as it's an external dependency
|
||||
vi.mock('../../lib/queue-prompt', () => ({
|
||||
buildQueueItemContext: vi.fn((item: QueueItem, issue: any) => `Instruction for ${item.item_id} from issue ${issue?.id}`),
|
||||
}));
|
||||
|
||||
import { buildQueueItemContext } from '../../lib/queue-prompt';
|
||||
|
||||
describe('OrchestrationPlanBuilder', () => {
|
||||
const MOCKED_CREATED_AT = '2026-02-14T10:00:00.000Z';
|
||||
const MOCKED_UPDATED_AT = '2026-02-14T11:00:00.000Z';
|
||||
|
||||
beforeAll(() => {
|
||||
// Mock Date.now() to ensure consistent IDs and timestamps
|
||||
vi.spyOn(Date, 'now').mockReturnValue(new Date(MOCKED_CREATED_AT).getTime());
|
||||
vi.spyOn(Date.prototype, 'toISOString').mockReturnValue(MOCKED_CREATED_AT);
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe('fromFlow', () => {
|
||||
it('should correctly convert a simple linear flow into an OrchestrationPlan', () => {
|
||||
const flow: Flow = {
|
||||
id: 'flow-123',
|
||||
name: 'Test Flow',
|
||||
description: 'A simple linear flow',
|
||||
version: '1.0.0',
|
||||
created_at: MOCKED_CREATED_AT,
|
||||
updated_at: MOCKED_UPDATED_AT,
|
||||
nodes: [
|
||||
{ id: 'nodeA', type: 'prompt-template', data: { label: 'Step A', instruction: 'Do A', outputName: 'outputA' } as PromptTemplateNodeData, position: { x: 0, y: 0 } },
|
||||
{ id: 'nodeB', type: 'prompt-template', data: { label: 'Step B', instruction: 'Do B', contextRefs: ['outputA'] } as PromptTemplateNodeData, position: { x: 1, y: 1 } },
|
||||
{ id: 'nodeC', type: 'prompt-template', data: { label: 'Step C', instruction: 'Do C' } as PromptTemplateNodeData, position: { x: 2, y: 2 } },
|
||||
] as FlowNode[],
|
||||
edges: [
|
||||
{ id: 'edge-ab', source: 'nodeA', target: 'nodeB' },
|
||||
{ id: 'edge-bc', source: 'nodeB', target: 'nodeC' },
|
||||
] as FlowEdge[],
|
||||
variables: { var1: 'value1' },
|
||||
metadata: {},
|
||||
};
|
||||
|
||||
const plan = OrchestrationPlanBuilder.fromFlow(flow);
|
||||
|
||||
expect(plan).toBeDefined();
|
||||
expect(plan.id).toBe('flow-123');
|
||||
expect(plan.name).toBe('Test Flow');
|
||||
expect(plan.source).toBe('flow');
|
||||
expect(plan.sourceId).toBe('flow-123');
|
||||
expect(plan.variables).toEqual({ var1: 'value1' });
|
||||
expect(plan.steps).toHaveLength(3);
|
||||
expect(plan.metadata.totalSteps).toBe(3);
|
||||
expect(plan.metadata.estimatedComplexity).toBe('medium'); // 3 steps is medium
|
||||
|
||||
// Verify topological sort and dependencies
|
||||
expect(plan.steps[0].id).toBe('nodeA');
|
||||
expect(plan.steps[0].dependsOn).toEqual([]);
|
||||
expect(plan.steps[1].id).toBe('nodeB');
|
||||
expect(plan.steps[1].dependsOn).toEqual(['nodeA']);
|
||||
expect(plan.steps[2].id).toBe('nodeC');
|
||||
expect(plan.steps[2].dependsOn).toEqual(['nodeB']);
|
||||
|
||||
// Verify step details
|
||||
expect(plan.steps[0].name).toBe('Step A');
|
||||
expect(plan.steps[0].instruction).toBe('Do A');
|
||||
expect(plan.steps[0].outputName).toBe('outputA');
|
||||
expect(plan.steps[0].executionType).toBe('frontend-cli');
|
||||
|
||||
expect(plan.steps[1].name).toBe('Step B');
|
||||
expect(plan.steps[1].instruction).toBe('Do B');
|
||||
expect(plan.steps[1].contextRefs).toEqual(['outputA']);
|
||||
});
|
||||
|
||||
it('should handle a more complex flow with branching and merging', () => {
|
||||
const flow: Flow = {
|
||||
id: 'flow-complex',
|
||||
name: 'Complex Flow',
|
||||
description: 'Branching and merging flow',
|
||||
version: '1.0.0',
|
||||
created_at: MOCKED_CREATED_AT,
|
||||
updated_at: MOCKED_UPDATED_AT,
|
||||
nodes: [
|
||||
{ id: 'start', type: 'prompt-template', data: { label: 'Start', instruction: 'Start here' } as PromptTemplateNodeData, position: { x: 0, y: 0 } },
|
||||
{ id: 'branchA', type: 'prompt-template', data: { label: 'Branch A', instruction: 'Path A' } as PromptTemplateNodeData, position: { x: 1, y: 1 } },
|
||||
{ id: 'branchB', type: 'prompt-template', data: { label: 'Branch B', instruction: 'Path B' } as PromptTemplateNodeData, position: { x: 1, y: 2 } },
|
||||
{ id: 'merge', type: 'prompt-template', data: { label: 'Merge', instruction: 'Merge results' } as PromptTemplateNodeData, position: { x: 2, y: 1 } },
|
||||
{ id: 'end', type: 'prompt-template', data: { label: 'End', instruction: 'Finish' } as PromptTemplateNodeData, position: { x: 3, y: 1 } },
|
||||
] as FlowNode[],
|
||||
edges: [
|
||||
{ id: 'e-start-a', source: 'start', target: 'branchA' },
|
||||
{ id: 'e-start-b', source: 'start', target: 'branchB' },
|
||||
{ id: 'e-a-merge', source: 'branchA', target: 'merge' },
|
||||
{ id: 'e-b-merge', source: 'branchB', target: 'merge' },
|
||||
{ id: 'e-merge-end', source: 'merge', target: 'end' },
|
||||
] as FlowEdge[],
|
||||
variables: {},
|
||||
metadata: {},
|
||||
};
|
||||
|
||||
const plan = OrchestrationPlanBuilder.fromFlow(flow);
|
||||
|
||||
expect(plan).toBeDefined();
|
||||
expect(plan.steps).toHaveLength(5);
|
||||
expect(plan.metadata.totalSteps).toBe(5);
|
||||
expect(plan.metadata.hasParallelGroups).toBe(true); // branchA and branchB can run in parallel
|
||||
expect(plan.metadata.estimatedComplexity).toBe('high'); // >5 steps, or parallel groups
|
||||
|
||||
// Verify topological sort (order might vary for parallel steps, but dependencies must be correct)
|
||||
const startStep = plan.steps.find(s => s.id === 'start');
|
||||
const branchAStep = plan.steps.find(s => s.id === 'branchA');
|
||||
const branchBStep = plan.steps.find(s => s.id === 'branchB');
|
||||
const mergeStep = plan.steps.find(s => s.id === 'merge');
|
||||
const endStep = plan.steps.find(s => s.id === 'end');
|
||||
|
||||
expect(startStep?.dependsOn).toEqual([]);
|
||||
expect(branchAStep?.dependsOn).toEqual(['start']);
|
||||
expect(branchBStep?.dependsOn).toEqual(['start']);
|
||||
expect(mergeStep?.dependsOn).toEqual(expect.arrayContaining(['branchA', 'branchB']));
|
||||
expect(endStep?.dependsOn).toEqual(['merge']);
|
||||
|
||||
// Ensure 'merge' step comes after 'branchA' and 'branchB'
|
||||
const indexA = plan.steps.indexOf(branchAStep!);
|
||||
const indexB = plan.steps.indexOf(branchBStep!);
|
||||
const indexMerge = plan.steps.indexOf(mergeStep!);
|
||||
expect(indexMerge).toBeGreaterThan(indexA);
|
||||
expect(indexMerge).toBeGreaterThan(indexB);
|
||||
});
|
||||
|
||||
it('should detect cycles and throw an error', () => {
|
||||
const flow: Flow = {
|
||||
id: 'flow-cycle',
|
||||
name: 'Cyclic Flow',
|
||||
description: 'A flow with a cycle',
|
||||
version: '1.0.0',
|
||||
created_at: MOCKED_CREATED_AT,
|
||||
updated_at: MOCKED_UPDATED_AT,
|
||||
nodes: [
|
||||
{ id: 'nodeA', type: 'prompt-template', data: { label: 'A', instruction: 'Do A' } as PromptTemplateNodeData, position: { x: 0, y: 0 } },
|
||||
{ id: 'nodeB', type: 'prompt-template', data: { label: 'B', instruction: 'Do B' } as PromptTemplateNodeData, position: { x: 1, y: 1 } },
|
||||
] as FlowNode[],
|
||||
edges: [
|
||||
{ id: 'e-ab', source: 'nodeA', target: 'nodeB' },
|
||||
{ id: 'e-ba', source: 'nodeB', target: 'nodeA' }, // Cycle
|
||||
] as FlowEdge[],
|
||||
variables: {},
|
||||
metadata: {},
|
||||
};
|
||||
|
||||
expect(() => OrchestrationPlanBuilder.fromFlow(flow)).toThrow('Cycle detected in flow graph. Cannot build orchestration plan from cyclic flow.');
|
||||
});
|
||||
|
||||
it('should correctly map sessionStrategy and executionType from node data', () => {
|
||||
const flow: Flow = {
|
||||
id: 'flow-delivery',
|
||||
name: 'Delivery Flow',
|
||||
description: 'Flow with different delivery types',
|
||||
version: '1.0.0',
|
||||
created_at: MOCKED_CREATED_AT,
|
||||
updated_at: MOCKED_UPDATED_AT,
|
||||
nodes: [
|
||||
{ id: 'node1', type: 'prompt-template', data: { label: 'New Session', instruction: 'New', delivery: 'newExecution' } as PromptTemplateNodeData, position: { x: 0, y: 0 } },
|
||||
{ id: 'node2', type: 'prompt-template', data: { label: 'Specific Session', instruction: 'Specific', delivery: 'sendToSession', targetSessionKey: 'sessionX' } as PromptTemplateNodeData, position: { x: 1, y: 1 } },
|
||||
{ id: 'node3', type: 'prompt-template', data: { label: 'Slash Cmd', instruction: 'Slash', slashCommand: 'test:cmd', mode: 'mainprocess' } as PromptTemplateNodeData, position: { x: 2, y: 2 } },
|
||||
{ id: 'node4', type: 'prompt-template', data: { label: 'Frontend CLI', instruction: 'CLI', tool: 'gemini', mode: 'analysis' } as PromptTemplateNodeData, position: { x: 3, y: 3 } },
|
||||
] as FlowNode[],
|
||||
edges: [
|
||||
{ id: 'e1', source: 'node1', target: 'node2' },
|
||||
{ id: 'e2', source: 'node2', target: 'node3' },
|
||||
{ id: 'e3', source: 'node3', target: 'node4' },
|
||||
],
|
||||
variables: {},
|
||||
metadata: {},
|
||||
};
|
||||
|
||||
const plan = OrchestrationPlanBuilder.fromFlow(flow);
|
||||
expect(plan.steps).toHaveLength(4);
|
||||
|
||||
expect(plan.steps[0].id).toBe('node1');
|
||||
expect(plan.steps[0].sessionStrategy).toBe('new_session');
|
||||
expect(plan.steps[0].executionType).toBe('frontend-cli'); // default as no slash command/tool specified
|
||||
|
||||
expect(plan.steps[1].id).toBe('node2');
|
||||
expect(plan.steps[1].sessionStrategy).toBe('specific_session');
|
||||
expect(plan.steps[1].targetSessionKey).toBe('sessionX');
|
||||
expect(plan.steps[1].executionType).toBe('frontend-cli');
|
||||
|
||||
expect(plan.steps[2].id).toBe('node3');
|
||||
expect(plan.steps[2].executionType).toBe('slash-command');
|
||||
|
||||
expect(plan.steps[3].id).toBe('node4');
|
||||
expect(plan.steps[3].tool).toBe('gemini');
|
||||
expect(plan.steps[3].mode).toBe('analysis');
|
||||
expect(plan.steps[3].executionType).toBe('frontend-cli');
|
||||
});
|
||||
});
|
||||
|
||||
describe('fromQueue', () => {
|
||||
it('should correctly convert an IssueQueue with S* groups into an OrchestrationPlan', () => {
|
||||
const issue1 = { id: 'issue-1', title: 'Fix bug A', description: 'desc A' };
|
||||
const issue2 = { id: 'issue-2', title: 'Implement feature B', description: 'desc B' };
|
||||
const issue3 = { id: 'issue-3', title: 'Refactor C', description: 'desc C' };
|
||||
|
||||
const item1: QueueItem = { item_id: 'qi-1', issue_id: 'issue-1', solution_id: 'sol-1', execution_group: 'S*group1', depends_on: [], status: 'pending', execution_order: 0, semantic_priority: 0 };
|
||||
const item2: QueueItem = { item_id: 'qi-2', issue_id: 'issue-1', solution_id: 'sol-1', execution_group: 'S*group1', depends_on: [], status: 'pending', execution_order: 1, semantic_priority: 0 };
|
||||
const item3: QueueItem = { item_id: 'qi-3', issue_id: 'issue-2', solution_id: 'sol-2', execution_group: 'S*group2', depends_on: [], status: 'pending', execution_order: 2, semantic_priority: 0 };
|
||||
const item4: QueueItem = { item_id: 'qi-4', issue_id: 'issue-3', solution_id: 'sol-3', execution_group: 'S*group3', depends_on: [], status: 'pending', execution_order: 3, semantic_priority: 0 };
|
||||
|
||||
const queue: IssueQueue = {
|
||||
id: 'queue-abc',
|
||||
execution_groups: ['S*group1', 'S*group2', 'S*group3'],
|
||||
grouped_items: {
|
||||
'S*group1': [item1, item2],
|
||||
'S*group2': [item3],
|
||||
'S*group3': [item4],
|
||||
},
|
||||
conflicts: [],
|
||||
};
|
||||
|
||||
const issues = new Map<string, any>();
|
||||
issues.set('issue-1', issue1);
|
||||
issues.set('issue-2', issue2);
|
||||
issues.set('issue-3', issue3);
|
||||
|
||||
const plan = OrchestrationPlanBuilder.fromQueue(queue, issues);
|
||||
|
||||
expect(plan).toBeDefined();
|
||||
expect(plan.source).toBe('queue');
|
||||
expect(plan.sourceId).toBe('queue-abc');
|
||||
expect(plan.steps).toHaveLength(4);
|
||||
expect(plan.metadata.totalSteps).toBe(4);
|
||||
// fromQueue uses explicit P*/S* prefix check for hasParallelGroups (not DAG heuristic).
|
||||
// All groups here are S* (sequential), so hasParallelGroups is false.
|
||||
expect(plan.metadata.hasParallelGroups).toBe(false);
|
||||
// estimateComplexity uses detectParallelGroups (DAG heuristic), which sees qi-1 and qi-2
|
||||
// sharing dependsOn=[] without mutual dependencies. But estimateComplexity also checks
|
||||
// step count (<=1 => low, >5 => high), so 4 steps with no DAG-detected parallelism
|
||||
// (fromQueue passes the already-computed hasParallelGroups=false to metadata, not the
|
||||
// DAG heuristic) means medium. Actually estimateComplexity is a separate static call
|
||||
// that does its own DAG-level check.
|
||||
// With 4 steps and qi-1/qi-2 sharing empty dependsOn: detectParallelGroups returns true,
|
||||
// so estimateComplexity returns 'high'.
|
||||
expect(plan.metadata.estimatedComplexity).toBe('high');
|
||||
|
||||
// Verify sequential dependencies
|
||||
// S*group1 items have no dependencies (first group)
|
||||
expect(plan.steps.find(s => s.id === 'queue-item-qi-1')?.dependsOn).toEqual([]);
|
||||
expect(plan.steps.find(s => s.id === 'queue-item-qi-2')?.dependsOn).toEqual([]);
|
||||
|
||||
// S*group2 items depend on all items from S*group1
|
||||
expect(plan.steps.find(s => s.id === 'queue-item-qi-3')?.dependsOn).toEqual(expect.arrayContaining(['queue-item-qi-1', 'queue-item-qi-2']));
|
||||
|
||||
// S*group3 items depend on all items from S*group2
|
||||
expect(plan.steps.find(s => s.id === 'queue-item-qi-4')?.dependsOn).toEqual(['queue-item-qi-3']);
|
||||
|
||||
// Verify instruction context via mock
|
||||
const mockedBuild = vi.mocked(buildQueueItemContext);
|
||||
expect(plan.steps[0].instruction).toBe('Instruction for qi-1 from issue issue-1');
|
||||
expect(mockedBuild).toHaveBeenCalledWith(item1, issue1);
|
||||
});
|
||||
|
||||
it('should correctly convert an IssueQueue with P* groups into an OrchestrationPlan', () => {
|
||||
const issue1 = { id: 'issue-1', title: 'Fix bug A', description: 'desc A' };
|
||||
const issue2 = { id: 'issue-2', title: 'Implement feature B', description: 'desc B' };
|
||||
|
||||
const item1: QueueItem = { item_id: 'qi-1', issue_id: 'issue-1', solution_id: 'sol-1', execution_group: 'P*group1', depends_on: [], status: 'pending', execution_order: 0, semantic_priority: 0 };
|
||||
const item2: QueueItem = { item_id: 'qi-2', issue_id: 'issue-2', solution_id: 'sol-2', execution_group: 'P*group1', depends_on: [], status: 'pending', execution_order: 1, semantic_priority: 0 };
|
||||
const item3: QueueItem = { item_id: 'qi-3', issue_id: 'issue-1', solution_id: 'sol-1', execution_group: 'S*group2', depends_on: [], status: 'pending', execution_order: 2, semantic_priority: 0 };
|
||||
|
||||
const queue: IssueQueue = {
|
||||
id: 'queue-parallel',
|
||||
execution_groups: ['P*group1', 'S*group2'],
|
||||
grouped_items: {
|
||||
'P*group1': [item1, item2],
|
||||
'S*group2': [item3],
|
||||
},
|
||||
conflicts: [],
|
||||
};
|
||||
|
||||
const issues = new Map<string, any>();
|
||||
issues.set('issue-1', issue1);
|
||||
issues.set('issue-2', issue2);
|
||||
|
||||
const plan = OrchestrationPlanBuilder.fromQueue(queue, issues);
|
||||
|
||||
expect(plan).toBeDefined();
|
||||
expect(plan.steps).toHaveLength(3);
|
||||
expect(plan.metadata.hasParallelGroups).toBe(true);
|
||||
expect(plan.metadata.estimatedComplexity).toBe('high');
|
||||
|
||||
// P*group1 items have no dependencies (first group)
|
||||
expect(plan.steps.find(s => s.id === 'queue-item-qi-1')?.dependsOn).toEqual([]);
|
||||
expect(plan.steps.find(s => s.id === 'queue-item-qi-2')?.dependsOn).toEqual([]);
|
||||
|
||||
// S*group2 items depend on all items from P*group1
|
||||
expect(plan.steps.find(s => s.id === 'queue-item-qi-3')?.dependsOn).toEqual(expect.arrayContaining(['queue-item-qi-1', 'queue-item-qi-2']));
|
||||
});
|
||||
});
|
||||
|
||||
describe('fromManual', () => {
|
||||
it('should create a single-step OrchestrationPlan from manual input', () => {
|
||||
const params: ManualOrchestrationParams = {
|
||||
prompt: 'Analyze current directory',
|
||||
tool: 'gemini',
|
||||
mode: 'analysis',
|
||||
sessionStrategy: 'new_session',
|
||||
outputName: 'analysisResult',
|
||||
errorHandling: { strategy: 'stop', maxRetries: 1, retryDelayMs: 100 },
|
||||
};
|
||||
|
||||
const plan = OrchestrationPlanBuilder.fromManual(params);
|
||||
|
||||
expect(plan).toBeDefined();
|
||||
expect(plan.id).toMatch(/^manual-plan-/);
|
||||
expect(plan.name).toBe('Manual Orchestration');
|
||||
expect(plan.source).toBe('manual');
|
||||
expect(plan.steps).toHaveLength(1);
|
||||
expect(plan.metadata.totalSteps).toBe(1);
|
||||
expect(plan.metadata.hasParallelGroups).toBe(false);
|
||||
expect(plan.metadata.estimatedComplexity).toBe('low');
|
||||
|
||||
const step = plan.steps[0];
|
||||
expect(step.id).toMatch(/^manual-step-/);
|
||||
expect(step.name).toBe('Manual Execution');
|
||||
expect(step.instruction).toBe('Analyze current directory');
|
||||
expect(step.tool).toBe('gemini');
|
||||
expect(step.mode).toBe('analysis');
|
||||
expect(step.sessionStrategy).toBe('new_session');
|
||||
expect(step.targetSessionKey).toBeUndefined();
|
||||
expect(step.dependsOn).toEqual([]);
|
||||
expect(step.outputName).toBe('analysisResult');
|
||||
expect(step.errorHandling).toEqual({ strategy: 'stop', maxRetries: 1, retryDelayMs: 100 });
|
||||
expect(step.executionType).toBe('frontend-cli');
|
||||
});
|
||||
|
||||
it('should use default session strategy and error handling if not provided', () => {
|
||||
const params: ManualOrchestrationParams = {
|
||||
prompt: 'Simple command',
|
||||
};
|
||||
|
||||
const plan = OrchestrationPlanBuilder.fromManual(params);
|
||||
const step = plan.steps[0];
|
||||
|
||||
expect(step.sessionStrategy).toBe('reuse_default');
|
||||
expect(step.errorHandling).toBeUndefined(); // Should be undefined if not explicitly set for step
|
||||
expect(plan.defaultSessionStrategy).toBe('reuse_default');
|
||||
expect(plan.defaultErrorHandling).toEqual({ strategy: 'pause_on_error', maxRetries: 0, retryDelayMs: 0 });
|
||||
});
|
||||
});
|
||||
|
||||
describe('Utility methods', () => {
|
||||
it('should correctly detect parallel groups', () => {
|
||||
// Linear steps, no parallel
|
||||
const linearSteps: OrchestrationStep[] = [
|
||||
{ id: '1', name: 's1', instruction: 'i', dependsOn: [], executionType: 'frontend-cli' },
|
||||
{ id: '2', name: 's2', instruction: 'i', dependsOn: ['1'], executionType: 'frontend-cli' },
|
||||
{ id: '3', name: 's3', instruction: 'i', dependsOn: ['2'], executionType: 'frontend-cli' },
|
||||
];
|
||||
expect((OrchestrationPlanBuilder as any).detectParallelGroups(linearSteps)).toBe(false);
|
||||
|
||||
// Parallel steps (2 and 3 depend on 1, but not on each other)
|
||||
const parallelSteps: OrchestrationStep[] = [
|
||||
{ id: '1', name: 's1', instruction: 'i', dependsOn: [], executionType: 'frontend-cli' },
|
||||
{ id: '2', name: 's2', instruction: 'i', dependsOn: ['1'], executionType: 'frontend-cli' },
|
||||
{ id: '3', name: 's3', instruction: 'i', dependsOn: ['1'], executionType: 'frontend-cli' },
|
||||
];
|
||||
expect((OrchestrationPlanBuilder as any).detectParallelGroups(parallelSteps)).toBe(true);
|
||||
|
||||
// Complex parallel scenario
|
||||
const complexParallelSteps: OrchestrationStep[] = [
|
||||
{ id: 'A', name: 'sA', instruction: 'i', dependsOn: [], executionType: 'frontend-cli' },
|
||||
{ id: 'B', name: 'sB', instruction: 'i', dependsOn: ['A'], executionType: 'frontend-cli' },
|
||||
{ id: 'C', name: 'sC', instruction: 'i', dependsOn: ['A'], executionType: 'frontend-cli' },
|
||||
{ id: 'D', name: 'sD', instruction: 'i', dependsOn: ['B', 'C'], executionType: 'frontend-cli' },
|
||||
];
|
||||
expect((OrchestrationPlanBuilder as any).detectParallelGroups(complexParallelSteps)).toBe(true);
|
||||
|
||||
// Parallel steps with some implicit dependencies (not strictly parallel)
|
||||
const nonStrictlyParallel: OrchestrationStep[] = [
|
||||
{ id: '1', name: 's1', instruction: 'i', dependsOn: [], executionType: 'frontend-cli' },
|
||||
{ id: '2', name: 's2', instruction: 'i', dependsOn: ['1'], executionType: 'frontend-cli' },
|
||||
{ id: '3', name: 's3', instruction: 'i', dependsOn: ['1'], executionType: 'frontend-cli' },
|
||||
{ id: '4', name: 's4', instruction: 'i', dependsOn: ['2'], executionType: 'frontend-cli' },
|
||||
];
|
||||
expect((OrchestrationPlanBuilder as any).detectParallelGroups(nonStrictlyParallel)).toBe(true);
|
||||
});
|
||||
|
||||
it('should correctly estimate complexity', () => {
|
||||
const stepsLow: OrchestrationStep[] = [
|
||||
{ id: '1', name: 's1', instruction: 'i', dependsOn: [], executionType: 'frontend-cli' },
|
||||
];
|
||||
expect((OrchestrationPlanBuilder as any).estimateComplexity(stepsLow)).toBe('low');
|
||||
|
||||
const stepsMedium: OrchestrationStep[] = [
|
||||
{ id: '1', name: 's1', instruction: 'i', dependsOn: [], executionType: 'frontend-cli' },
|
||||
{ id: '2', name: 's2', instruction: 'i', dependsOn: ['1'], executionType: 'frontend-cli' },
|
||||
{ id: '3', name: 's3', instruction: 'i', dependsOn: ['2'], executionType: 'frontend-cli' },
|
||||
{ id: '4', name: 's4', instruction: 'i', dependsOn: ['3'], executionType: 'frontend-cli' },
|
||||
{ id: '5', name: 's5', instruction: 'i', dependsOn: ['4'], executionType: 'frontend-cli' },
|
||||
];
|
||||
expect((OrchestrationPlanBuilder as any).estimateComplexity(stepsMedium)).toBe('medium');
|
||||
|
||||
const stepsHighByCount: OrchestrationStep[] = [
|
||||
{ id: '1', name: 's1', instruction: 'i', dependsOn: [], executionType: 'frontend-cli' },
|
||||
{ id: '2', name: 's2', instruction: 'i', dependsOn: ['1'], executionType: 'frontend-cli' },
|
||||
{ id: '3', name: 's3', instruction: 'i', dependsOn: ['2'], executionType: 'frontend-cli' },
|
||||
{ id: '4', name: 's4', instruction: 'i', dependsOn: ['3'], executionType: 'frontend-cli' },
|
||||
{ id: '5', name: 's5', instruction: 'i', dependsOn: ['4'], executionType: 'frontend-cli' },
|
||||
{ id: '6', name: 's6', instruction: 'i', dependsOn: ['5'], executionType: 'frontend-cli' },
|
||||
];
|
||||
expect((OrchestrationPlanBuilder as any).estimateComplexity(stepsHighByCount)).toBe('high');
|
||||
|
||||
const stepsHighByParallel: OrchestrationStep[] = [
|
||||
{ id: '1', name: 's1', instruction: 'i', dependsOn: [], executionType: 'frontend-cli' },
|
||||
{ id: '2', name: 's2', instruction: 'i', dependsOn: ['1'], executionType: 'frontend-cli' },
|
||||
{ id: '3', name: 's3', instruction: 'i', dependsOn: ['1'], executionType: 'frontend-cli' },
|
||||
];
|
||||
expect((OrchestrationPlanBuilder as any).estimateComplexity(stepsHighByParallel)).toBe('high');
|
||||
});
|
||||
});
|
||||
});
|
||||
18
ccw/frontend/src/orchestrator/index.ts
Normal file
18
ccw/frontend/src/orchestrator/index.ts
Normal file
@@ -0,0 +1,18 @@
|
||||
// ========================================
|
||||
// Orchestrator Module
|
||||
// ========================================
|
||||
// Barrel exports for the orchestration system.
|
||||
//
|
||||
// OrchestrationPlanBuilder: Builds plans from Flow, Queue, or Manual input
|
||||
// SequentialRunner: Manages PTY session lifecycle and step-by-step dispatch
|
||||
|
||||
export { OrchestrationPlanBuilder } from './OrchestrationPlanBuilder';
|
||||
export {
|
||||
start,
|
||||
executeStep,
|
||||
onStepAdvanced,
|
||||
stop,
|
||||
stopAll,
|
||||
interpolateInstruction,
|
||||
} from './SequentialRunner';
|
||||
export type { StartOptions } from './SequentialRunner';
|
||||
Reference in New Issue
Block a user