feat: add CCW Loop System for automated iterative workflow execution

Implements a complete loop execution system with multi-loop parallel support,
dashboard monitoring, and comprehensive security validation.

Core features:
- Loop orchestration engine (loop-manager, loop-state-manager)
- Multi-loop parallel execution with independent state management
- REST API endpoints for loop control (pause, resume, stop, retry)
- WebSocket real-time status updates
- Dashboard Loop Monitor view with live updates
- Security: path traversal protection and sandboxed JavaScript evaluation

Test coverage:
- 42 comprehensive tests covering multi-loop, API, WebSocket, security
- Security validation for success_condition injection attacks
- Edge case handling and end-to-end workflow tests
This commit is contained in:
catlog22
2026-01-21 22:55:24 +08:00
parent 64e064e775
commit d9f1d14d5e
28 changed files with 5912 additions and 17 deletions

View File

@@ -610,6 +610,19 @@ export function updateClaudeDefaultTool(
return settings;
}
/**
* Get the default tool from config
* Returns the configured defaultTool or 'gemini' as fallback
*/
export function getDefaultTool(projectDir: string): string {
try {
const settings = loadClaudeCliSettings(projectDir);
return settings.defaultTool || 'gemini';
} catch {
return 'gemini';
}
}
/**
* Add API endpoint as a tool with type: 'api-endpoint'
* Usage: --tool <name> or --tool custom --model <id>
@@ -943,3 +956,133 @@ export function getFullConfigResponse(projectDir: string): {
predefinedModels: { ...PREDEFINED_MODELS }
};
}
// ========== Tool Detection & Sync Functions ==========
/**
* Sync builtin tools availability with cli-tools.json
*
* For builtin tools (gemini, qwen, codex, claude, opencode):
* - Checks actual tool availability using system PATH
* - Updates enabled status based on actual availability
*
* For non-builtin tools (cli-wrapper, api-endpoint):
* - Leaves them unchanged as they have different availability mechanisms
*
* @returns Updated config and sync results
*/
export async function syncBuiltinToolsAvailability(projectDir: string): Promise<{
config: ClaudeCliToolsConfig;
changes: {
enabled: string[]; // Tools that were enabled
disabled: string[]; // Tools that were disabled
unchanged: string[]; // Tools that stayed the same
};
}> {
// Import getCliToolsStatus dynamically to avoid circular dependency
const { getCliToolsStatus } = await import('./cli-executor.js');
// Get actual tool availability
const actualStatus = await getCliToolsStatus();
// Load current config
const config = loadClaudeCliTools(projectDir);
const changes = {
enabled: [] as string[],
disabled: [] as string[],
unchanged: [] as string[]
};
// Builtin tools that need sync
const builtinTools = ['gemini', 'qwen', 'codex', 'claude', 'opencode'];
for (const toolName of builtinTools) {
const isAvailable = actualStatus[toolName]?.available ?? false;
const currentConfig = config.tools[toolName];
const wasEnabled = currentConfig?.enabled ?? true;
// Update based on actual availability
if (isAvailable && !wasEnabled) {
// Tool exists but was disabled - enable it
if (!currentConfig) {
config.tools[toolName] = {
enabled: true,
primaryModel: DEFAULT_TOOLS_CONFIG.tools[toolName]?.primaryModel || '',
secondaryModel: DEFAULT_TOOLS_CONFIG.tools[toolName]?.secondaryModel || '',
tags: [],
type: 'builtin'
};
} else {
currentConfig.enabled = true;
}
changes.enabled.push(toolName);
} else if (!isAvailable && wasEnabled) {
// Tool doesn't exist but was enabled - disable it
if (currentConfig) {
currentConfig.enabled = false;
}
changes.disabled.push(toolName);
} else {
// No change needed
changes.unchanged.push(toolName);
}
}
// Save updated config
saveClaudeCliTools(projectDir, config);
console.log('[claude-cli-tools] Synced builtin tools availability:', {
enabled: changes.enabled,
disabled: changes.disabled,
unchanged: changes.unchanged
});
return { config, changes };
}
/**
* Get sync status report without actually modifying config
*
* @returns Report showing what would change if sync were run
*/
export async function getBuiltinToolsSyncReport(projectDir: string): Promise<{
current: Record<string, { available: boolean; enabled: boolean }>;
recommended: Record<string, { shouldEnable: boolean; reason: string }>;
}> {
// Import getCliToolsStatus dynamically to avoid circular dependency
const { getCliToolsStatus } = await import('./cli-executor.js');
// Get actual tool availability
const actualStatus = await getCliToolsStatus();
// Load current config
const config = loadClaudeCliTools(projectDir);
const builtinTools = ['gemini', 'qwen', 'codex', 'claude', 'opencode'];
const current: Record<string, { available: boolean; enabled: boolean }> = {};
const recommended: Record<string, { shouldEnable: boolean; reason: string }> = {};
for (const toolName of builtinTools) {
const isAvailable = actualStatus[toolName]?.available ?? false;
const isEnabled = config.tools[toolName]?.enabled ?? true;
current[toolName] = {
available: isAvailable,
enabled: isEnabled
};
if (isAvailable && !isEnabled) {
recommended[toolName] = {
shouldEnable: true,
reason: 'Tool is installed but disabled in config'
};
} else if (!isAvailable && isEnabled) {
recommended[toolName] = {
shouldEnable: false,
reason: 'Tool is not installed but enabled in config'
};
}
}
return { current, recommended };
}

View File

@@ -0,0 +1,519 @@
/**
* Loop Manager
* CCW Loop System - Core orchestration engine
* Reference: .workflow/.scratchpad/loop-system-complete-design-20260121.md section 4.2
*/
import chalk from 'chalk';
import { LoopStateManager } from './loop-state-manager.js';
import { cliExecutorTool } from './cli-executor.js';
import { broadcastLoopUpdate } from '../core/websocket.js';
import type { LoopState, LoopStatus, CliStepConfig, ExecutionRecord, Task } from '../types/loop.js';
export class LoopManager {
private stateManager: LoopStateManager;
constructor(workflowDir: string) {
this.stateManager = new LoopStateManager(workflowDir);
}
/**
* Start new loop
*/
async startLoop(task: Task): Promise<string> {
if (!task.loop_control?.enabled) {
throw new Error(`Task ${task.id} does not have loop enabled`);
}
const loopId = this.generateLoopId(task.id);
console.log(chalk.cyan(`\n 🔄 Starting loop: ${loopId}\n`));
// Create initial state
const state = await this.stateManager.createState(
loopId,
task.id,
task.loop_control
);
// Update to running status
await this.stateManager.updateState(loopId, { status: 'running' as LoopStatus });
// Start execution (non-blocking)
this.runNextStep(loopId).catch(err => {
console.error(chalk.red(`\n ✗ Loop execution error: ${err}\n`));
});
return loopId;
}
/**
* Execute next step
*/
async runNextStep(loopId: string): Promise<void> {
const state = await this.stateManager.readState(loopId);
// Check if should terminate
if (await this.shouldTerminate(state)) {
return;
}
// Get current step config
const stepConfig = state.cli_sequence[state.current_cli_step];
if (!stepConfig) {
console.error(chalk.red(` ✗ Invalid step index: ${state.current_cli_step}`));
await this.markFailed(loopId, 'Invalid step configuration');
return;
}
console.log(chalk.gray(` [Iteration ${state.current_iteration}] Step ${state.current_cli_step + 1}/${state.cli_sequence.length}: ${stepConfig.step_id}`));
try {
// Execute step
const result = await this.executeStep(state, stepConfig);
// Update state after step
await this.updateStateAfterStep(loopId, stepConfig, result);
// Check if iteration completed
const newState = await this.stateManager.readState(loopId);
if (newState.current_cli_step === 0) {
console.log(chalk.green(` ✓ Iteration ${newState.current_iteration - 1} completed\n`));
// Check success condition
if (await this.evaluateSuccessCondition(newState)) {
await this.markCompleted(loopId);
return;
}
}
// Schedule next step (prevent stack overflow)
setImmediate(() => this.runNextStep(loopId).catch(err => {
console.error(chalk.red(`\n ✗ Next step error: ${err}\n`));
}));
} catch (error) {
await this.handleError(loopId, stepConfig, error as Error);
}
}
/**
* Execute single step
*/
private async executeStep(
state: LoopState,
stepConfig: CliStepConfig
): Promise<{ output: string; stderr: string; conversationId: string; exitCode: number; durationMs: number }> {
const startTime = Date.now();
// Prepare prompt (replace variables)
const prompt = stepConfig.prompt_template
? this.replaceVariables(stepConfig.prompt_template, state.state_variables)
: '';
// Get resume ID
const sessionKey = `${stepConfig.tool}_${state.current_cli_step}`;
const resumeId = state.session_mapping[sessionKey];
// Prepare execution params
const execParams: any = {
tool: stepConfig.tool,
prompt,
mode: stepConfig.mode || 'analysis',
resume: resumeId,
stream: false
};
// Bash command special handling
if (stepConfig.tool === 'bash' && stepConfig.command) {
execParams.prompt = stepConfig.command;
}
// Execute CLI tool
const result = await cliExecutorTool.execute(execParams);
const durationMs = Date.now() - startTime;
return {
output: result.stdout || '',
stderr: result.stderr || '',
conversationId: result.execution.id,
exitCode: result.execution.exit_code || 0,
durationMs
};
}
/**
* Update state after step execution
*/
private async updateStateAfterStep(
loopId: string,
stepConfig: CliStepConfig,
result: { output: string; stderr: string; conversationId: string; exitCode: number; durationMs: number }
): Promise<void> {
const state = await this.stateManager.readState(loopId);
// Update session_mapping
const sessionKey = `${stepConfig.tool}_${state.current_cli_step}`;
const newSessionMapping = {
...state.session_mapping,
[sessionKey]: result.conversationId
};
// Update state_variables
const newStateVariables = {
...state.state_variables,
[`${stepConfig.step_id}_stdout`]: result.output,
[`${stepConfig.step_id}_stderr`]: result.stderr
};
// Add execution record
const executionRecord: ExecutionRecord = {
iteration: state.current_iteration,
step_index: state.current_cli_step,
step_id: stepConfig.step_id,
tool: stepConfig.tool,
conversation_id: result.conversationId,
exit_code: result.exitCode,
duration_ms: result.durationMs,
timestamp: new Date().toISOString()
};
const newExecutionHistory = [...(state.execution_history || []), executionRecord];
// Calculate next step
let nextStep = state.current_cli_step + 1;
let nextIteration = state.current_iteration;
// Reset step and increment iteration if round complete
if (nextStep >= state.cli_sequence.length) {
nextStep = 0;
nextIteration += 1;
}
// Update state
const newState = await this.stateManager.updateState(loopId, {
session_mapping: newSessionMapping,
state_variables: newStateVariables,
execution_history: newExecutionHistory,
current_cli_step: nextStep,
current_iteration: nextIteration
});
// Broadcast step completion with step-specific data
this.broadcastStepCompletion(loopId, stepConfig.step_id, result.exitCode, result.durationMs, result.output);
}
/**
* Replace template variables
*/
private replaceVariables(template: string, variables: Record<string, string>): string {
let result = template;
// Replace [variable_name] format
for (const [key, value] of Object.entries(variables)) {
const regex = new RegExp(`\\[${key}\\]`, 'g');
result = result.replace(regex, value);
}
return result;
}
/**
* Evaluate success condition with security constraints
* Only allows simple comparison and logical expressions
*/
private async evaluateSuccessCondition(state: LoopState): Promise<boolean> {
if (!state.success_condition) {
return false;
}
try {
// Security: Validate condition before execution
// Only allow safe characters: letters, digits, spaces, operators, parentheses, dots, quotes, underscores
const unsafePattern = /[^\w\s\.\(\)\[\]\{\}\'\"\!\=\>\<\&\|\+\-\*\/\?\:]/;
if (unsafePattern.test(state.success_condition)) {
console.error(chalk.yellow(` ⚠ Unsafe success condition contains invalid characters`));
return false;
}
// Block dangerous patterns
const blockedPatterns = [
/process\./,
/require\(/,
/import\s/,
/eval\(/,
/Function\(/,
/__proto__/,
/constructor\[/
];
for (const pattern of blockedPatterns) {
if (pattern.test(state.success_condition)) {
console.error(chalk.yellow(` ⚠ Blocked dangerous pattern in success condition`));
return false;
}
}
// Create a minimal sandbox context with only necessary data
// Using a Proxy to restrict access to only state_variables and current_iteration
const sandbox = {
get state_variables() {
return state.state_variables;
},
get current_iteration() {
return state.current_iteration;
}
};
// Create restricted context using Proxy
const restrictedContext = new Proxy(sandbox, {
has() {
return true; // Allow all property access
},
get(target, prop) {
// Only allow access to state_variables and current_iteration
if (prop === 'state_variables' || prop === 'current_iteration') {
return target[prop];
}
// Block access to other properties (including dangerous globals)
return undefined;
}
});
// Evaluate condition in restricted context
// We use the Function constructor but with a restricted scope
const conditionFn = new Function(
'state_variables',
'current_iteration',
`return (${state.success_condition});`
);
const result = conditionFn(
restrictedContext.state_variables,
restrictedContext.current_iteration
);
return Boolean(result);
} catch (error) {
console.error(chalk.yellow(` ⚠ Failed to evaluate success condition: ${error instanceof Error ? error.message : error}`));
return false;
}
}
/**
* Check if should terminate loop
*/
private async shouldTerminate(state: LoopState): Promise<boolean> {
// Completed or failed
if (state.status === 'completed' || state.status === 'failed') {
return true;
}
// Paused
if (state.status === 'paused') {
console.log(chalk.yellow(` ⏸ Loop is paused: ${state.loop_id}`));
return true;
}
// Max iterations exceeded
if (state.current_iteration > state.max_iterations) {
console.log(chalk.yellow(` ⚠ Max iterations reached: ${state.max_iterations}`));
await this.markCompleted(state.loop_id, 'Max iterations reached');
return true;
}
return false;
}
/**
* Handle errors
*/
private async handleError(loopId: string, stepConfig: CliStepConfig, error: Error): Promise<void> {
console.error(chalk.red(` ✗ Step failed: ${stepConfig.step_id}`));
console.error(chalk.red(` ${error.message}`));
const state = await this.stateManager.readState(loopId);
// Act based on error_policy
switch (state.error_policy.on_failure) {
case 'pause':
await this.pauseLoop(loopId, `Step ${stepConfig.step_id} failed: ${error.message}`);
break;
case 'retry':
if (state.error_policy.retry_count < (state.error_policy.max_retries || 3)) {
console.log(chalk.yellow(` 🔄 Retrying... (${state.error_policy.retry_count + 1}/${state.error_policy.max_retries})`));
await this.stateManager.updateState(loopId, {
error_policy: {
...state.error_policy,
retry_count: state.error_policy.retry_count + 1
}
});
// Re-execute current step
await this.runNextStep(loopId);
} else {
await this.markFailed(loopId, `Max retries exceeded for step ${stepConfig.step_id}`);
}
break;
case 'fail_fast':
await this.markFailed(loopId, `Step ${stepConfig.step_id} failed: ${error.message}`);
break;
}
}
/**
* Pause loop
*/
async pauseLoop(loopId: string, reason?: string): Promise<void> {
console.log(chalk.yellow(`\n ⏸ Pausing loop: ${loopId}`));
if (reason) {
console.log(chalk.gray(` Reason: ${reason}`));
}
await this.stateManager.updateState(loopId, {
status: 'paused' as LoopStatus,
failure_reason: reason
});
}
/**
* Resume loop
*/
async resumeLoop(loopId: string): Promise<void> {
console.log(chalk.cyan(`\n ▶ Resuming loop: ${loopId}\n`));
await this.stateManager.updateState(loopId, {
status: 'running' as LoopStatus,
error_policy: {
...(await this.stateManager.readState(loopId)).error_policy,
retry_count: 0
}
});
await this.runNextStep(loopId);
}
/**
* Stop loop
*/
async stopLoop(loopId: string): Promise<void> {
console.log(chalk.red(`\n ⏹ Stopping loop: ${loopId}\n`));
await this.stateManager.updateState(loopId, {
status: 'failed' as LoopStatus,
failure_reason: 'Manually stopped by user',
completed_at: new Date().toISOString()
});
}
/**
* Broadcast state update via WebSocket
*/
private broadcastStateUpdate(state: LoopState, eventType: 'LOOP_STATE_UPDATE' | 'LOOP_COMPLETED' = 'LOOP_STATE_UPDATE'): void {
try {
if (eventType === 'LOOP_STATE_UPDATE') {
broadcastLoopUpdate({
type: 'LOOP_STATE_UPDATE',
loop_id: state.loop_id,
status: state.status as 'created' | 'running' | 'paused' | 'completed' | 'failed',
current_iteration: state.current_iteration,
current_cli_step: state.current_cli_step,
updated_at: state.updated_at
});
} else if (eventType === 'LOOP_COMPLETED') {
broadcastLoopUpdate({
type: 'LOOP_COMPLETED',
loop_id: state.loop_id,
final_status: state.status === 'completed' ? 'completed' : 'failed',
total_iterations: state.current_iteration,
reason: state.failure_reason
});
}
} catch (error) {
// Silently ignore broadcast errors
}
}
/**
* Broadcast step completion via WebSocket
*/
private broadcastStepCompletion(
loopId: string,
stepId: string,
exitCode: number,
durationMs: number,
output: string
): void {
try {
broadcastLoopUpdate({
type: 'LOOP_STEP_COMPLETED',
loop_id: loopId,
step_id: stepId,
exit_code: exitCode,
duration_ms: durationMs,
output: output
});
} catch (error) {
// Silently ignore broadcast errors
}
}
/**
* Mark as completed
*/
private async markCompleted(loopId: string, reason?: string): Promise<void> {
console.log(chalk.green(`\n ✓ Loop completed: ${loopId}`));
if (reason) {
console.log(chalk.gray(` ${reason}`));
}
const state = await this.stateManager.updateState(loopId, {
status: 'completed' as LoopStatus,
completed_at: new Date().toISOString()
});
// Broadcast completion
this.broadcastStateUpdate(state, 'LOOP_COMPLETED');
}
/**
* Mark as failed
*/
private async markFailed(loopId: string, reason: string): Promise<void> {
console.log(chalk.red(`\n ✗ Loop failed: ${loopId}`));
console.log(chalk.gray(` ${reason}\n`));
const state = await this.stateManager.updateState(loopId, {
status: 'failed' as LoopStatus,
failure_reason: reason,
completed_at: new Date().toISOString()
});
// Broadcast failure
this.broadcastStateUpdate(state, 'LOOP_COMPLETED');
}
/**
* Get loop status
*/
async getStatus(loopId: string): Promise<LoopState> {
return this.stateManager.readState(loopId);
}
/**
* List all loops
*/
async listLoops(): Promise<LoopState[]> {
return this.stateManager.listStates();
}
/**
* Generate loop ID
*/
private generateLoopId(taskId: string): string {
const timestamp = new Date().toISOString().replace(/[-:]/g, '').split('.')[0];
return `loop-${taskId}-${timestamp}`;
}
}

View File

@@ -0,0 +1,173 @@
/**
* Loop State Manager
* CCW Loop System - JSON state persistence layer
* Reference: .workflow/.scratchpad/loop-system-complete-design-20260121.md section 4.1
*/
import { readFile, writeFile, unlink, mkdir, copyFile } from 'fs/promises';
import { join } from 'path';
import { existsSync } from 'fs';
import type { LoopState, LoopStatus, TaskLoopControl } from '../types/loop.js';
export class LoopStateManager {
private baseDir: string;
constructor(workflowDir: string) {
// State files stored in .workflow/active/WFS-{session}/.loop/
this.baseDir = join(workflowDir, '.loop');
}
/**
* Create new loop state
*/
async createState(loopId: string, taskId: string, config: TaskLoopControl): Promise<LoopState> {
await this.ensureDir();
const state: LoopState = {
loop_id: loopId,
task_id: taskId,
status: 'created' as LoopStatus,
current_iteration: 1,
max_iterations: config.max_iterations,
current_cli_step: 0,
cli_sequence: config.cli_sequence,
session_mapping: {},
state_variables: {},
success_condition: config.success_condition,
error_policy: {
on_failure: config.error_policy.on_failure,
retry_count: 0,
max_retries: config.error_policy.max_retries || 3
},
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
execution_history: []
};
await this.writeState(loopId, state);
return state;
}
/**
* Read loop state
*/
async readState(loopId: string): Promise<LoopState> {
const filePath = this.getStateFilePath(loopId);
if (!existsSync(filePath)) {
throw new Error(`Loop state not found: ${loopId}`);
}
const content = await readFile(filePath, 'utf-8');
return JSON.parse(content) as LoopState;
}
/**
* Update loop state
*/
async updateState(loopId: string, updates: Partial<LoopState>): Promise<LoopState> {
const currentState = await this.readState(loopId);
const newState: LoopState = {
...currentState,
...updates,
updated_at: new Date().toISOString()
};
await this.writeState(loopId, newState);
return newState;
}
/**
* Delete loop state
*/
async deleteState(loopId: string): Promise<void> {
const filePath = this.getStateFilePath(loopId);
if (existsSync(filePath)) {
await unlink(filePath);
}
}
/**
* List all loop states
*/
async listStates(): Promise<LoopState[]> {
if (!existsSync(this.baseDir)) {
return [];
}
const { readdir } = await import('fs/promises');
const files = await readdir(this.baseDir);
const stateFiles = files.filter(f => f.startsWith('loop-') && f.endsWith('.json'));
const states: LoopState[] = [];
for (const file of stateFiles) {
const loopId = file.replace('.json', '');
try {
const state = await this.readState(loopId);
states.push(state);
} catch (err) {
console.error(`Failed to read state ${loopId}:`, err);
}
}
return states;
}
/**
* Read state with recovery from backup
*/
async readStateWithRecovery(loopId: string): Promise<LoopState> {
try {
return await this.readState(loopId);
} catch (error) {
console.warn(`State file corrupted, attempting recovery for ${loopId}...`);
// Try reading from backup
const backupFile = `${this.getStateFilePath(loopId)}.backup`;
if (existsSync(backupFile)) {
const content = await readFile(backupFile, 'utf-8');
const state = JSON.parse(content) as LoopState;
// Restore from backup
await this.writeState(loopId, state);
return state;
}
throw error;
}
}
/**
* Get state file path
*/
getStateFilePath(loopId: string): string {
return join(this.baseDir, `${loopId}.json`);
}
/**
* Ensure directory exists
*/
private async ensureDir(): Promise<void> {
if (!existsSync(this.baseDir)) {
await mkdir(this.baseDir, { recursive: true });
}
}
/**
* Write state file with automatic backup
*/
private async writeState(loopId: string, state: LoopState): Promise<void> {
const filePath = this.getStateFilePath(loopId);
// Create backup if file exists
if (existsSync(filePath)) {
const backupPath = `${filePath}.backup`;
await copyFile(filePath, backupPath).catch(() => {
// Ignore backup errors
});
}
await writeFile(filePath, JSON.stringify(state, null, 2), 'utf-8');
}
}