mirror of
https://github.com/catlog22/Claude-Code-Workflow.git
synced 2026-02-10 02:24:35 +08:00
refactor: unify node types into a single PromptTemplate model
- Removed individual node components (SlashCommandNode, FileOperationNode, etc.) and replaced them with a unified PromptTemplateNode. - Updated flow types and interfaces to reflect the new single node type system. - Refactored flow execution logic to handle the new unified model, simplifying node execution and context handling. - Adjusted UI components to support the new PromptTemplateNode, including instruction display and context references. - Cleaned up legacy code related to removed node types and ensured compatibility with the new structure.
This commit is contained in:
@@ -40,63 +40,101 @@ const __dirname = dirname(__filename);
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* Node types supported by the orchestrator
|
||||
* Unified node type - all nodes are prompt templates
|
||||
* Replaces previous 6-type system with single unified model
|
||||
*/
|
||||
export type FlowNodeType = 'slash-command' | 'file-operation' | 'conditional' | 'parallel';
|
||||
export type FlowNodeType = 'prompt-template';
|
||||
|
||||
/**
|
||||
* SlashCommand node data
|
||||
* Available CLI tools for execution
|
||||
*/
|
||||
export interface SlashCommandNodeData {
|
||||
command: string;
|
||||
args?: string;
|
||||
execution: {
|
||||
mode: 'mainprocess' | 'async';
|
||||
timeout?: number;
|
||||
};
|
||||
contextHint?: string;
|
||||
export type CliTool = 'gemini' | 'qwen' | 'codex' | 'claude';
|
||||
|
||||
/**
|
||||
* Execution modes for prompt templates
|
||||
* - analysis: Read-only operations, code review, exploration
|
||||
* - write: Create/modify/delete files
|
||||
* - mainprocess: Execute in main process (blocking)
|
||||
* - async: Execute asynchronously (non-blocking)
|
||||
*/
|
||||
export type ExecutionMode = 'analysis' | 'write' | 'mainprocess' | 'async';
|
||||
|
||||
/**
|
||||
* Unified PromptTemplate node data model
|
||||
*
|
||||
* All workflow nodes are represented as prompt templates with natural language
|
||||
* instructions. This model replaces the previous specialized node types:
|
||||
* - slash-command -> instruction: "Execute /command args"
|
||||
* - cli-command -> instruction + tool + mode
|
||||
* - file-operation -> instruction: "Save {{ref}} to path"
|
||||
* - conditional -> instruction: "If {{condition}} then..."
|
||||
* - parallel -> instruction: "Execute in parallel..."
|
||||
* - prompt -> instruction (direct)
|
||||
*/
|
||||
export interface PromptTemplateNodeData {
|
||||
/**
|
||||
* Display label for the node in the editor
|
||||
*/
|
||||
label: string;
|
||||
|
||||
/**
|
||||
* Natural language instruction describing what to execute
|
||||
* Can include context references using {{variableName}} syntax
|
||||
*/
|
||||
instruction: string;
|
||||
|
||||
/**
|
||||
* Optional name for the output, allowing subsequent steps to reference it
|
||||
* via contextRefs or {{outputName}} syntax in instructions
|
||||
*/
|
||||
outputName?: string;
|
||||
|
||||
/**
|
||||
* Optional CLI tool to use for execution
|
||||
* If not specified, the system selects based on task requirements
|
||||
*/
|
||||
tool?: CliTool;
|
||||
|
||||
/**
|
||||
* Optional execution mode
|
||||
* Defaults to 'mainprocess' if not specified
|
||||
*/
|
||||
mode?: ExecutionMode;
|
||||
|
||||
/**
|
||||
* References to outputs from previous steps
|
||||
* Use the outputName values from earlier nodes
|
||||
*/
|
||||
contextRefs?: string[];
|
||||
|
||||
/**
|
||||
* Error handling behavior
|
||||
*/
|
||||
onError?: 'continue' | 'pause' | 'fail';
|
||||
|
||||
// ========== Execution State Fields ==========
|
||||
|
||||
/**
|
||||
* Current execution status of this node
|
||||
* Uses same values as NodeExecutionStatus defined below
|
||||
*/
|
||||
executionStatus?: 'pending' | 'running' | 'completed' | 'failed' | 'skipped';
|
||||
|
||||
/**
|
||||
* Error message if execution failed
|
||||
*/
|
||||
executionError?: string;
|
||||
|
||||
/**
|
||||
* Result data from execution
|
||||
*/
|
||||
executionResult?: unknown;
|
||||
}
|
||||
|
||||
/**
|
||||
* FileOperation node data
|
||||
* NodeData type - unified to single PromptTemplateNodeData
|
||||
*/
|
||||
export interface FileOperationNodeData {
|
||||
operation: 'read' | 'write' | 'append' | 'delete' | 'copy' | 'move';
|
||||
path: string;
|
||||
content?: string;
|
||||
destinationPath?: string;
|
||||
encoding?: string;
|
||||
outputVariable?: string;
|
||||
addToContext: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Conditional node data
|
||||
*/
|
||||
export interface ConditionalNodeData {
|
||||
condition: string;
|
||||
trueLabel?: string;
|
||||
falseLabel?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parallel node data
|
||||
*/
|
||||
export interface ParallelNodeData {
|
||||
joinMode: 'all' | 'any' | 'none';
|
||||
timeout?: number;
|
||||
failFast?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Union type for all node data types
|
||||
*/
|
||||
export type NodeData =
|
||||
| SlashCommandNodeData
|
||||
| FileOperationNodeData
|
||||
| ConditionalNodeData
|
||||
| ParallelNodeData;
|
||||
export type NodeData = PromptTemplateNodeData;
|
||||
|
||||
/**
|
||||
* Flow node definition
|
||||
|
||||
@@ -14,9 +14,9 @@
|
||||
* - cli-executor for slash-command execution
|
||||
*/
|
||||
|
||||
import { readFile, writeFile, mkdir, unlink, copyFile, rename } from 'fs/promises';
|
||||
import { readFile, writeFile, mkdir } from 'fs/promises';
|
||||
import { existsSync } from 'fs';
|
||||
import { join, dirname } from 'path';
|
||||
import { join } from 'path';
|
||||
import { broadcastToClients } from '../websocket.js';
|
||||
import { executeCliTool } from '../../tools/cli-executor-core.js';
|
||||
import type {
|
||||
@@ -25,14 +25,13 @@ import type {
|
||||
FlowEdge,
|
||||
FlowNodeType,
|
||||
ExecutionState,
|
||||
ExecutionStatus,
|
||||
ExecutionStatus as RouteExecutionStatus,
|
||||
NodeExecutionState,
|
||||
NodeExecutionStatus,
|
||||
ExecutionLog,
|
||||
SlashCommandNodeData,
|
||||
FileOperationNodeData,
|
||||
ConditionalNodeData,
|
||||
ParallelNodeData,
|
||||
PromptTemplateNodeData,
|
||||
CliTool,
|
||||
ExecutionMode,
|
||||
} from '../routes/orchestrator-routes.js';
|
||||
|
||||
// ============================================================================
|
||||
@@ -161,11 +160,17 @@ export function interpolateObject<T>(obj: T, variables: Record<string, unknown>)
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// NodeRunner - Type-specific Node Execution
|
||||
// NodeRunner - Unified Prompt Template Execution
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* NodeRunner executes individual nodes based on their type
|
||||
* Default CLI tool when not specified
|
||||
*/
|
||||
const DEFAULT_CLI_TOOL: CliTool = 'claude';
|
||||
|
||||
/**
|
||||
* NodeRunner executes unified prompt-template nodes
|
||||
* All nodes are interpreted through natural language instructions
|
||||
*/
|
||||
export class NodeRunner {
|
||||
private context: ExecutionContext;
|
||||
@@ -176,44 +181,37 @@ export class NodeRunner {
|
||||
|
||||
/**
|
||||
* Execute a node and return the result
|
||||
* All nodes are prompt-template type
|
||||
*/
|
||||
async run(node: FlowNode): Promise<NodeResult> {
|
||||
switch (node.type) {
|
||||
case 'slash-command':
|
||||
return this.runSlashCommand(node);
|
||||
case 'file-operation':
|
||||
return this.runFileOperation(node);
|
||||
case 'conditional':
|
||||
return this.runConditional(node);
|
||||
case 'parallel':
|
||||
return this.runParallel(node);
|
||||
default:
|
||||
return {
|
||||
success: false,
|
||||
error: `Unknown node type: ${(node as FlowNode).type}`
|
||||
};
|
||||
// All nodes are prompt-template type
|
||||
if (node.type === 'prompt-template') {
|
||||
return this.runPromptTemplate(node);
|
||||
}
|
||||
|
||||
// Fallback for any legacy node types
|
||||
return {
|
||||
success: false,
|
||||
error: `Unsupported node type: ${node.type}. Only 'prompt-template' is supported.`
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a slash-command node
|
||||
* Integrates with executeCliTool from cli-executor
|
||||
* Execute a prompt-template node
|
||||
* Interprets instruction field to build and execute CLI command
|
||||
*/
|
||||
private async runSlashCommand(node: FlowNode): Promise<NodeResult> {
|
||||
const data = node.data as SlashCommandNodeData;
|
||||
private async runPromptTemplate(node: FlowNode): Promise<NodeResult> {
|
||||
const data = node.data as PromptTemplateNodeData;
|
||||
|
||||
// Interpolate command and args
|
||||
const command = interpolate(data.command, this.context.variables);
|
||||
const args = data.args ? interpolate(data.args, this.context.variables) : '';
|
||||
const contextHint = data.contextHint ? interpolate(data.contextHint, this.context.variables) : '';
|
||||
// Interpolate instruction with variables
|
||||
let instruction = interpolate(data.instruction, this.context.variables);
|
||||
|
||||
// Build prompt: combine command, args, and context hint
|
||||
let prompt = command;
|
||||
if (args) {
|
||||
prompt += ` ${args}`;
|
||||
}
|
||||
if (contextHint) {
|
||||
prompt = `${contextHint}\n\n${prompt}`;
|
||||
// Resolve context references
|
||||
if (data.contextRefs && data.contextRefs.length > 0) {
|
||||
const contextContent = this.resolveContextRefs(data.contextRefs);
|
||||
if (contextContent) {
|
||||
instruction = `${contextContent}\n\n${instruction}`;
|
||||
}
|
||||
}
|
||||
|
||||
// Add file context if available
|
||||
@@ -224,23 +222,33 @@ export class NodeRunner {
|
||||
.join('\n\n');
|
||||
|
||||
if (fileContextStr) {
|
||||
prompt = `${fileContextStr}\n\n${prompt}`;
|
||||
instruction = `${fileContextStr}\n\n${instruction}`;
|
||||
}
|
||||
}
|
||||
|
||||
// Determine tool and mode
|
||||
const tool = data.tool || DEFAULT_CLI_TOOL;
|
||||
const mode = this.determineCliMode(data.mode);
|
||||
|
||||
try {
|
||||
// Use claude tool for slash-command execution
|
||||
// Execute via CLI tool
|
||||
const result = await executeCliTool({
|
||||
tool: 'claude',
|
||||
prompt,
|
||||
mode: data.execution?.mode === 'mainprocess' ? 'write' : 'analysis',
|
||||
tool,
|
||||
prompt: instruction,
|
||||
mode,
|
||||
cd: this.context.workingDir
|
||||
});
|
||||
|
||||
// Store output in variables for subsequent nodes
|
||||
const outputVar = `${node.id}_output`;
|
||||
this.context.variables[outputVar] = result.stdout;
|
||||
// Store output using outputName if specified, otherwise use node.id
|
||||
const outputKey = data.outputName || `${node.id}_output`;
|
||||
this.context.variables[outputKey] = result.stdout;
|
||||
this.context.variables[`${node.id}_exitCode`] = result.execution?.exit_code ?? 0;
|
||||
this.context.variables[`${node.id}_success`] = result.success;
|
||||
|
||||
// If outputName is specified, also store structured result
|
||||
if (data.outputName) {
|
||||
this.context.variables[data.outputName] = result.stdout;
|
||||
}
|
||||
|
||||
return {
|
||||
success: result.success,
|
||||
@@ -256,248 +264,38 @@ export class NodeRunner {
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a file-operation node
|
||||
* Supports: read, write, append, delete, copy, move
|
||||
* Resolve context references to actual output values
|
||||
* Looks up outputName values from previous nodes
|
||||
*/
|
||||
private async runFileOperation(node: FlowNode): Promise<NodeResult> {
|
||||
const data = node.data as FileOperationNodeData;
|
||||
private resolveContextRefs(refs: string[]): string {
|
||||
const resolvedParts: string[] = [];
|
||||
|
||||
// Interpolate path and content
|
||||
const filePath = interpolate(data.path, this.context.variables);
|
||||
const resolvedPath = join(this.context.workingDir, filePath);
|
||||
const encoding = (data.encoding || 'utf-8') as BufferEncoding;
|
||||
|
||||
try {
|
||||
switch (data.operation) {
|
||||
case 'read': {
|
||||
const content = await readFile(resolvedPath, encoding);
|
||||
|
||||
// Store in output variable if specified
|
||||
if (data.outputVariable) {
|
||||
this.context.variables[data.outputVariable] = content;
|
||||
}
|
||||
|
||||
// Add to file context for subsequent nodes
|
||||
if (data.addToContext) {
|
||||
this.context.fileContext.push({ path: filePath, content });
|
||||
}
|
||||
|
||||
return { success: true, output: content };
|
||||
}
|
||||
|
||||
case 'write': {
|
||||
const content = data.content ? interpolate(data.content, this.context.variables) : '';
|
||||
|
||||
// Ensure directory exists
|
||||
const dir = dirname(resolvedPath);
|
||||
if (!existsSync(dir)) {
|
||||
await mkdir(dir, { recursive: true });
|
||||
}
|
||||
|
||||
await writeFile(resolvedPath, content, encoding);
|
||||
|
||||
if (data.addToContext) {
|
||||
this.context.fileContext.push({ path: filePath, operation: 'written' });
|
||||
}
|
||||
|
||||
return { success: true, output: { path: filePath, bytesWritten: content.length } };
|
||||
}
|
||||
|
||||
case 'append': {
|
||||
const content = data.content ? interpolate(data.content, this.context.variables) : '';
|
||||
|
||||
// Ensure directory exists
|
||||
const dir = dirname(resolvedPath);
|
||||
if (!existsSync(dir)) {
|
||||
await mkdir(dir, { recursive: true });
|
||||
}
|
||||
|
||||
// Read existing content and append
|
||||
let existingContent = '';
|
||||
if (existsSync(resolvedPath)) {
|
||||
existingContent = await readFile(resolvedPath, encoding);
|
||||
}
|
||||
await writeFile(resolvedPath, existingContent + content, encoding);
|
||||
|
||||
if (data.addToContext) {
|
||||
this.context.fileContext.push({ path: filePath, operation: 'appended' });
|
||||
}
|
||||
|
||||
return { success: true, output: { path: filePath, bytesAppended: content.length } };
|
||||
}
|
||||
|
||||
case 'delete': {
|
||||
if (existsSync(resolvedPath)) {
|
||||
await unlink(resolvedPath);
|
||||
}
|
||||
return { success: true, output: { path: filePath, deleted: true } };
|
||||
}
|
||||
|
||||
case 'copy': {
|
||||
const destPath = data.destinationPath
|
||||
? join(this.context.workingDir, interpolate(data.destinationPath, this.context.variables))
|
||||
: resolvedPath + '.copy';
|
||||
|
||||
// Ensure destination directory exists
|
||||
const destDir = dirname(destPath);
|
||||
if (!existsSync(destDir)) {
|
||||
await mkdir(destDir, { recursive: true });
|
||||
}
|
||||
|
||||
await copyFile(resolvedPath, destPath);
|
||||
return { success: true, output: { source: filePath, destination: destPath } };
|
||||
}
|
||||
|
||||
case 'move': {
|
||||
const destPath = data.destinationPath
|
||||
? join(this.context.workingDir, interpolate(data.destinationPath, this.context.variables))
|
||||
: resolvedPath;
|
||||
|
||||
if (destPath === resolvedPath) {
|
||||
return { success: false, error: 'Source and destination are the same' };
|
||||
}
|
||||
|
||||
// Ensure destination directory exists
|
||||
const destDir = dirname(destPath);
|
||||
if (!existsSync(destDir)) {
|
||||
await mkdir(destDir, { recursive: true });
|
||||
}
|
||||
|
||||
await rename(resolvedPath, destPath);
|
||||
return { success: true, output: { source: filePath, destination: destPath } };
|
||||
}
|
||||
|
||||
default:
|
||||
return { success: false, error: `Unknown file operation: ${data.operation}` };
|
||||
for (const ref of refs) {
|
||||
const value = this.context.variables[ref];
|
||||
if (value !== undefined && value !== null) {
|
||||
const valueStr = typeof value === 'object' ? JSON.stringify(value, null, 2) : String(value);
|
||||
resolvedParts.push(`=== Context: ${ref} ===\n${valueStr}`);
|
||||
}
|
||||
} catch (error) {
|
||||
return {
|
||||
success: false,
|
||||
error: (error as Error).message
|
||||
};
|
||||
}
|
||||
|
||||
return resolvedParts.join('\n\n');
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a conditional node
|
||||
* Evaluates condition and returns branch decision
|
||||
* Determine CLI mode from execution mode
|
||||
* Maps prompt-template modes to CLI executor modes
|
||||
*/
|
||||
private async runConditional(node: FlowNode): Promise<NodeResult> {
|
||||
const data = node.data as ConditionalNodeData;
|
||||
|
||||
// Interpolate condition
|
||||
const condition = interpolate(data.condition, this.context.variables);
|
||||
|
||||
try {
|
||||
// Evaluate condition in a safe context
|
||||
const result = this.evaluateCondition(condition);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
output: result,
|
||||
branch: result ? 'true' : 'false'
|
||||
};
|
||||
} catch (error) {
|
||||
return {
|
||||
success: false,
|
||||
error: `Condition evaluation failed: ${(error as Error).message}`
|
||||
};
|
||||
private determineCliMode(mode?: ExecutionMode): 'analysis' | 'write' {
|
||||
switch (mode) {
|
||||
case 'write':
|
||||
case 'mainprocess':
|
||||
return 'write';
|
||||
case 'analysis':
|
||||
case 'async':
|
||||
default:
|
||||
return 'analysis';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Safely evaluate a condition expression
|
||||
* Uses Function constructor with limited scope
|
||||
*/
|
||||
private evaluateCondition(condition: string): boolean {
|
||||
// Create a safe evaluation context with common comparison helpers
|
||||
const safeContext = {
|
||||
// Allow access to variables
|
||||
...this.context.variables,
|
||||
// Add helper functions
|
||||
isEmpty: (v: unknown) => v === null || v === undefined || v === '' || (Array.isArray(v) && v.length === 0),
|
||||
isNotEmpty: (v: unknown) => !(v === null || v === undefined || v === '' || (Array.isArray(v) && v.length === 0)),
|
||||
contains: (str: string, search: string) => String(str).includes(search),
|
||||
startsWith: (str: string, search: string) => String(str).startsWith(search),
|
||||
endsWith: (str: string, search: string) => String(str).endsWith(search),
|
||||
};
|
||||
|
||||
// Build a safe evaluation function
|
||||
const keys = Object.keys(safeContext);
|
||||
const values = Object.values(safeContext);
|
||||
|
||||
try {
|
||||
// Create function with explicit parameters to prevent scope leakage
|
||||
const evalFn = new Function(...keys, `return (${condition})`);
|
||||
const result = evalFn(...values);
|
||||
return Boolean(result);
|
||||
} catch (error) {
|
||||
// If direct evaluation fails, try simpler comparison
|
||||
// Handle common patterns like "value >= 0.95"
|
||||
const simpleMatch = condition.match(/^(.+?)\s*(===|!==|==|!=|>=|<=|>|<)\s*(.+)$/);
|
||||
if (simpleMatch) {
|
||||
const [, left, op, right] = simpleMatch;
|
||||
const leftVal = this.parseValue(left.trim());
|
||||
const rightVal = this.parseValue(right.trim());
|
||||
|
||||
switch (op) {
|
||||
case '===': return leftVal === rightVal;
|
||||
case '!==': return leftVal !== rightVal;
|
||||
case '==': return leftVal == rightVal;
|
||||
case '!=': return leftVal != rightVal;
|
||||
case '>=': return Number(leftVal) >= Number(rightVal);
|
||||
case '<=': return Number(leftVal) <= Number(rightVal);
|
||||
case '>': return Number(leftVal) > Number(rightVal);
|
||||
case '<': return Number(leftVal) < Number(rightVal);
|
||||
}
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a value from condition string
|
||||
*/
|
||||
private parseValue(val: string): unknown {
|
||||
// Check for number
|
||||
if (/^-?\d+(\.\d+)?$/.test(val)) {
|
||||
return parseFloat(val);
|
||||
}
|
||||
// Check for boolean
|
||||
if (val === 'true') return true;
|
||||
if (val === 'false') return false;
|
||||
// Check for null/undefined
|
||||
if (val === 'null') return null;
|
||||
if (val === 'undefined') return undefined;
|
||||
// Check for quoted string
|
||||
if ((val.startsWith('"') && val.endsWith('"')) || (val.startsWith("'") && val.endsWith("'"))) {
|
||||
return val.slice(1, -1);
|
||||
}
|
||||
// Otherwise, try to get from context
|
||||
return this.context.variables[val] ?? val;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a parallel node
|
||||
* Forks execution into multiple branches
|
||||
*/
|
||||
private async runParallel(node: FlowNode): Promise<NodeResult> {
|
||||
const data = node.data as ParallelNodeData;
|
||||
|
||||
// Parallel node doesn't execute directly - it's a control flow marker
|
||||
// The FlowExecutor handles the actual parallel execution based on outgoing edges
|
||||
// This method returns success to indicate the fork point was reached
|
||||
|
||||
return {
|
||||
success: true,
|
||||
output: {
|
||||
joinMode: data.joinMode,
|
||||
timeout: data.timeout,
|
||||
failFast: data.failFast
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
@@ -673,33 +471,13 @@ export class FlowExecutor {
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a node should be skipped based on conditional branching
|
||||
* Check if a node should be skipped
|
||||
* With unified prompt-template model, conditional logic is handled
|
||||
* via natural language instructions interpreted by the LLM
|
||||
*/
|
||||
private shouldSkipNode(node: FlowNode): boolean {
|
||||
const dagNode = this.dag.get(node.id);
|
||||
if (!dagNode) return false;
|
||||
|
||||
// Check if this node is on a conditional branch that wasn't taken
|
||||
for (const depId of dagNode.incoming) {
|
||||
const depState = this.state.nodeStates[depId];
|
||||
const depNode = this.flow.nodes.find(n => n.id === depId);
|
||||
|
||||
if (depNode?.type === 'conditional' && depState.status === 'completed') {
|
||||
const result = this.state.nodeStates[depId].result as NodeResult;
|
||||
const branch = result?.branch;
|
||||
|
||||
// Find the edge from conditional to this node
|
||||
const edge = this.flow.edges.find(e => e.source === depId && e.target === node.id);
|
||||
if (edge) {
|
||||
// Check if edge label matches the branch taken
|
||||
const edgeLabel = edge.sourceHandle || edge.label || 'true';
|
||||
if (branch && edgeLabel !== branch) {
|
||||
return true; // Skip this branch
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private shouldSkipNode(_node: FlowNode): boolean {
|
||||
// With unified prompt-template nodes, branching decisions are made
|
||||
// by the LLM interpreting instructions. No special skip logic needed.
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -796,6 +574,7 @@ export class FlowExecutor {
|
||||
|
||||
/**
|
||||
* Execute a single node
|
||||
* All nodes are prompt-template type in the unified model
|
||||
*/
|
||||
private async executeNode(node: FlowNode, context: ExecutionContext): Promise<void> {
|
||||
const nodeState = this.state.nodeStates[node.id];
|
||||
@@ -816,14 +595,9 @@ export class FlowExecutor {
|
||||
// Create node runner with current context
|
||||
const runner = new NodeRunner(context);
|
||||
|
||||
// Execute the node
|
||||
// Execute the node (all nodes are prompt-template type)
|
||||
const result = await runner.run(node);
|
||||
|
||||
// Handle parallel node specially
|
||||
if (node.type === 'parallel') {
|
||||
await this.executeParallelBranches(node, context);
|
||||
}
|
||||
|
||||
// Update node state
|
||||
nodeState.status = result.success ? 'completed' : 'failed';
|
||||
nodeState.completedAt = new Date().toISOString();
|
||||
@@ -848,7 +622,7 @@ export class FlowExecutor {
|
||||
this.addLog('info', `Completed node: ${node.id}`, node.id);
|
||||
} else {
|
||||
// Handle error based on node's onError setting
|
||||
const nodeData = node.data as SlashCommandNodeData;
|
||||
const nodeData = node.data as PromptTemplateNodeData;
|
||||
const onError = nodeData.onError || 'fail';
|
||||
|
||||
this.addLog('error', `Node failed: ${node.id} - ${result.error}`, node.id);
|
||||
@@ -875,77 +649,6 @@ export class FlowExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute parallel branches
|
||||
*/
|
||||
private async executeParallelBranches(parallelNode: FlowNode, context: ExecutionContext): Promise<void> {
|
||||
const data = parallelNode.data as ParallelNodeData;
|
||||
const dagNode = this.dag.get(parallelNode.id);
|
||||
|
||||
if (!dagNode) return;
|
||||
|
||||
// Get branch starting nodes (direct outgoing edges from parallel node)
|
||||
const branchNodeIds = dagNode.outgoing;
|
||||
if (branchNodeIds.length === 0) return;
|
||||
|
||||
this.addLog('info', `Executing ${branchNodeIds.length} parallel branches`, parallelNode.id);
|
||||
|
||||
// Create promises for each branch
|
||||
const branchPromises = branchNodeIds.map(async (branchNodeId) => {
|
||||
const branchNode = this.flow.nodes.find(n => n.id === branchNodeId);
|
||||
if (!branchNode) return { success: false, error: 'Branch node not found' };
|
||||
|
||||
try {
|
||||
await this.executeNode(branchNode, context);
|
||||
return { success: true };
|
||||
} catch (error) {
|
||||
return { success: false, error: (error as Error).message };
|
||||
}
|
||||
});
|
||||
|
||||
// Execute based on join mode
|
||||
const timeout = data.timeout || 300000; // Default 5 minutes
|
||||
|
||||
try {
|
||||
if (data.joinMode === 'all') {
|
||||
// Wait for all branches to complete
|
||||
const results = await Promise.all(
|
||||
branchPromises.map(p =>
|
||||
Promise.race([
|
||||
p,
|
||||
new Promise<{ success: false; error: string }>((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Branch timeout')), timeout)
|
||||
)
|
||||
])
|
||||
)
|
||||
);
|
||||
|
||||
// Check if any failed (and failFast is enabled)
|
||||
if (data.failFast) {
|
||||
const failed = results.find(r => !r.success);
|
||||
if (failed && 'error' in failed) {
|
||||
throw new Error(`Parallel branch failed: ${failed.error}`);
|
||||
}
|
||||
}
|
||||
} else if (data.joinMode === 'any') {
|
||||
// Wait for first branch to complete
|
||||
await Promise.race([
|
||||
Promise.race(branchPromises),
|
||||
new Promise((_, reject) => setTimeout(() => reject(new Error('All branches timeout')), timeout))
|
||||
]);
|
||||
} else {
|
||||
// 'none' - fire and forget, don't wait
|
||||
// Just trigger the branches without awaiting
|
||||
for (const promise of branchPromises) {
|
||||
promise.catch(() => {}); // Suppress unhandled rejection
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.addLog('error', `Parallel execution failed: ${(error as Error).message}`, parallelNode.id);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Request pause (will pause at next safe point)
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user