From 4fb247f7c57bca3a167a3800d1548c78185b7553 Mon Sep 17 00:00:00 2001 From: catlog22 Date: Sat, 3 Jan 2026 11:31:49 +0800 Subject: [PATCH] feat(issue): Add multi-queue support and enhanced failure handling - Add --queue parameter for explicit queue targeting (next, dag, detail, done, retry) - Implement serialized multi-queue execution (complete Q1 before Q2) - Add queue activate/priority subcommands for multi-queue management - Add FailureDetail interface for structured failure tracking - Preserve failure history on retry for debugging - Show failure reasons and retry count in queue status display - Auto-detect queue from item ID in done/detail commands --- ccw/src/cli.ts | 1 + ccw/src/commands/issue.ts | 458 ++++++++++++++++++++++++++++++++------ 2 files changed, 393 insertions(+), 66 deletions(-) diff --git a/ccw/src/cli.ts b/ccw/src/cli.ts index 8d2290c4..fc441d47 100644 --- a/ccw/src/cli.ts +++ b/ccw/src/cli.ts @@ -288,6 +288,7 @@ export function run(argv: string[]): void { .option('--reason ', 'Failure reason') .option('--fail', 'Mark task as failed') .option('--from-queue [queue-id]', 'Sync issue statuses from queue (default: active queue)') + .option('--queue ', 'Target queue ID for multi-queue operations') .action((subcommand, args, options) => issueCommand(subcommand, args, options)); program.parse(argv); diff --git a/ccw/src/commands/issue.ts b/ccw/src/commands/issue.ts index f2481112..652f141e 100644 --- a/ccw/src/commands/issue.ts +++ b/ccw/src/commands/issue.ts @@ -108,6 +108,15 @@ interface Solution { bound_at?: string; } +// Structured failure detail for debugging +interface FailureDetail { + task_id?: string; // Which task failed within the solution + error_type: string; // e.g., "compilation", "test_failure", "timeout" + message: string; // Human-readable error message + stack_trace?: string; // Optional stack trace + timestamp: string; // ISO timestamp +} + interface QueueItem { item_id: string; // Item ID in queue: T-1, T-2, ... (task-level) or S-1, S-2, ... (solution-level) issue_id: string; @@ -124,7 +133,9 @@ interface QueueItem { started_at?: string; completed_at?: string; result?: Record; - failure_reason?: string; + failure_reason?: string; // Simple string (backward compat) + failure_details?: FailureDetail; // Structured failure info + failure_history?: FailureDetail[]; // Preserved on retry for debugging } interface QueueConflict { @@ -168,10 +179,12 @@ interface Queue { } interface QueueIndex { - active_queue_id: string | null; + active_queue_id: string | null; // Single active queue (backward compat) + active_queue_ids?: string[]; // Multiple active queues, ordered by priority queues: { id: string; status: string; + priority?: number; // Queue execution priority (lower = higher priority) issue_ids: string[]; total_tasks?: number; // For task-level queues total_solutions?: number; // For solution-level queues @@ -197,6 +210,7 @@ interface IssueOptions { brief?: boolean; // List brief info only (id, title, status, priority, tags) - JSON format data?: string; // JSON data for create fromQueue?: boolean | string; // Sync statuses from queue (true=active, string=specific queue ID) + queue?: string; // Target queue ID for multi-queue operations } const ISSUES_DIR = '.workflow/issues'; @@ -528,6 +542,119 @@ function createEmptyQueue(): Queue { }; } +// ============ Multi-Queue Helper Functions ============ + +/** + * Find which queue contains a given item ID + * Supports both simple (S-1) and qualified (QUE-xxx:S-1) formats + */ +function findItemQueue(itemId: string): { queue: Queue; item: QueueItem; itemIndex: number } | null { + // Check if qualified format (QUE-xxx:S-1) + const qualifiedMatch = itemId.match(/^(QUE-[^:]+):(.+)$/); + if (qualifiedMatch) { + const [, queueId, actualItemId] = qualifiedMatch; + const queue = readQueue(queueId); + if (!queue) return null; + const items = queue.solutions || queue.tasks || []; + const itemIndex = items.findIndex(i => i.item_id === actualItemId); + if (itemIndex === -1) return null; + return { queue, item: items[itemIndex], itemIndex }; + } + + // Search all queues for unqualified item ID + const index = readQueueIndex(); + const activeQueueIds = index.active_queue_ids || (index.active_queue_id ? [index.active_queue_id] : []); + + // Search active queues first + for (const queueId of activeQueueIds) { + const queue = readQueue(queueId); + if (!queue) continue; + const items = queue.solutions || queue.tasks || []; + const itemIndex = items.findIndex(i => i.item_id === itemId); + if (itemIndex >= 0) { + return { queue, item: items[itemIndex], itemIndex }; + } + } + + // Search all other queues + for (const queueEntry of index.queues) { + if (activeQueueIds.includes(queueEntry.id)) continue; + const queue = readQueue(queueEntry.id); + if (!queue) continue; + const items = queue.solutions || queue.tasks || []; + const itemIndex = items.findIndex(i => i.item_id === itemId); + if (itemIndex >= 0) { + return { queue, item: items[itemIndex], itemIndex }; + } + } + + return null; +} + +/** + * Get all active queues ordered by priority (lower = higher priority) + * Falls back to creation date order + */ +function getActiveQueues(): Queue[] { + const index = readQueueIndex(); + const activeIds = index.active_queue_ids || (index.active_queue_id ? [index.active_queue_id] : []); + + const queues: Queue[] = []; + for (const queueId of activeIds) { + const queue = readQueue(queueId); + if (queue && queue.status === 'active') { + queues.push(queue); + } + } + + // Sort by priority field in index (lower = higher priority) + const priorityMap = new Map(); + for (const entry of index.queues) { + priorityMap.set(entry.id, entry.priority ?? Number.MAX_SAFE_INTEGER); + } + + queues.sort((a, b) => { + const pa = priorityMap.get(a.id) ?? Number.MAX_SAFE_INTEGER; + const pb = priorityMap.get(b.id) ?? Number.MAX_SAFE_INTEGER; + if (pa !== pb) return pa - pb; + // Fall back to creation date (from queue ID) + return a.id.localeCompare(b.id); + }); + + return queues; +} + +/** + * Parse failure reason into structured FailureDetail + * Detects JSON format vs plain string + */ +function parseFailureReason(reason: string): FailureDetail { + const timestamp = new Date().toISOString(); + + // Try to parse as JSON first + if (reason.trim().startsWith('{')) { + try { + const parsed = JSON.parse(reason); + return { + task_id: parsed.task_id, + error_type: parsed.error_type || 'unknown', + message: parsed.message || reason, + stack_trace: parsed.stack_trace, + timestamp + }; + } catch { + // Not valid JSON, treat as plain message + } + } + + // Plain string message + return { + error_type: 'execution_error', + message: reason, + timestamp + }; +} + function writeQueue(queue: Queue): void { ensureQueuesDir(); @@ -1323,15 +1450,82 @@ async function queueAction(subAction: string | undefined, issueId: string | unde const index = readQueueIndex(); index.active_queue_id = queueId; + // Also update active_queue_ids for multi-queue support + index.active_queue_ids = [queueId]; writeQueueIndex(index); console.log(chalk.green(`✓ Switched to queue ${queueId}`)); return; } + // Set queue priority (lower = higher priority) + if (subAction === 'priority' && issueId) { + const queueId = issueId; + const priority = parseInt(options.priority || '0'); + + if (isNaN(priority)) { + console.error(chalk.red('Invalid priority value (must be a number)')); + process.exit(1); + } + + const index = readQueueIndex(); + const queueEntry = index.queues.find(q => q.id === queueId); + + if (!queueEntry) { + console.error(chalk.red(`Queue "${queueId}" not found`)); + process.exit(1); + } + + queueEntry.priority = priority; + writeQueueIndex(index); + + console.log(chalk.green(`✓ Queue ${queueId} priority set to ${priority}`)); + return; + } + + // Activate multiple queues at once + if (subAction === 'activate' && issueId) { + const queueIds = issueId.split(',').map(id => id.trim()); + const index = readQueueIndex(); + + // Validate all queue IDs + for (const queueId of queueIds) { + if (!index.queues.some(q => q.id === queueId)) { + console.error(chalk.red(`Queue "${queueId}" not found`)); + process.exit(1); + } + } + + index.active_queue_ids = queueIds; + index.active_queue_id = queueIds[0] || null; // Backward compat + writeQueueIndex(index); + + console.log(chalk.green(`✓ Activated ${queueIds.length} queue(s): ${queueIds.join(', ')}`)); + return; + } + // DAG - Return dependency graph for parallel execution planning (solution-level) if (subAction === 'dag') { - const queue = readActiveQueue(); + let queue: Queue; + + // Use explicit queue if provided via --queue or issueId, otherwise use active queue + if (options.queue) { + const targetQueue = readQueue(options.queue); + if (!targetQueue) { + console.log(JSON.stringify({ error: `Queue ${options.queue} not found`, nodes: [], edges: [], groups: [] })); + return; + } + queue = targetQueue; + } else if (issueId && issueId.startsWith('QUE-')) { + const targetQueue = readQueue(issueId); + if (!targetQueue) { + console.log(JSON.stringify({ error: `Queue ${issueId} not found`, nodes: [], edges: [], groups: [] })); + return; + } + queue = targetQueue; + } else { + queue = readActiveQueue(); + } // Support both old (tasks) and new (solutions) queue format const items = queue.solutions || queue.tasks || []; @@ -1648,12 +1842,26 @@ async function queueAction(subAction: string | undefined, issueId: string | unde ? String(item.task_count || 0).padEnd(8) : (item.task_id || '-').padEnd(8); - console.log( - item.item_id.padEnd(10) + + let line = item.item_id.padEnd(10) + item.issue_id.substring(0, 13).padEnd(15) + thirdCol + - statusColor(item.status) - ); + statusColor(item.status); + + // Show failure reason for failed items + if (item.status === 'failed') { + const reason = item.failure_details?.message || item.failure_reason; + if (reason) { + // Truncate to 40 chars for display + const shortReason = reason.length > 40 ? reason.substring(0, 37) + '...' : reason; + line += chalk.gray(` [${shortReason}]`); + } + // Show retry count if there's failure history + if (item.failure_history && item.failure_history.length > 0) { + line += chalk.gray(` (${item.failure_history.length} retry)`); + } + } + + console.log(line); } } @@ -1662,17 +1870,58 @@ async function queueAction(subAction: string | undefined, issueId: string | unde * Accepts optional item_id to fetch a specific task directly */ async function nextAction(itemId: string | undefined, options: IssueOptions): Promise { - const queue = readActiveQueue(); - // Support both old (tasks) and new (solutions) queue format - const items = queue.solutions || queue.tasks || []; - let nextItem: typeof items[0] | undefined; + let queue: Queue; + let items: QueueItem[]; + + // Determine which queue(s) to use + if (options.queue) { + // Explicit queue specified + const targetQueue = readQueue(options.queue); + if (!targetQueue) { + console.log(JSON.stringify({ status: 'error', message: `Queue ${options.queue} not found` })); + return; + } + queue = targetQueue; + items = queue.solutions || queue.tasks || []; + } else { + // Multi-queue: iterate active queues in priority order (serialized execution) + const activeQueues = getActiveQueues(); + + if (activeQueues.length === 0) { + console.log(JSON.stringify({ status: 'empty', message: 'No active queues' })); + return; + } + + // Find first queue with incomplete items (serialized: complete Q1 before Q2) + let foundQueue: Queue | null = null; + for (const q of activeQueues) { + const queueItems = q.solutions || q.tasks || []; + const hasIncomplete = queueItems.some(i => + i.status === 'pending' || i.status === 'executing' + ); + if (hasIncomplete) { + foundQueue = q; + break; + } + } + + if (!foundQueue) { + console.log(JSON.stringify({ status: 'empty', message: 'All queues completed' })); + return; + } + + queue = foundQueue; + items = queue.solutions || queue.tasks || []; + } + + let nextItem: QueueItem | undefined; let isResume = false; // If specific item_id provided, fetch that item directly if (itemId) { nextItem = items.find(t => t.item_id === itemId); if (!nextItem) { - console.log(JSON.stringify({ status: 'error', message: `Item ${itemId} not found` })); + console.log(JSON.stringify({ status: 'error', message: `Item ${itemId} not found in queue ${queue.id}` })); return; } if (nextItem.status === 'completed') { @@ -1701,6 +1950,7 @@ async function nextAction(itemId: string | undefined, options: IssueOptions): Pr console.log(JSON.stringify({ status: 'empty', message: 'No ready items', + queue_id: queue.id, queue_status: queue._metadata }, null, 2)); return; @@ -1748,6 +1998,7 @@ async function nextAction(itemId: string | undefined, options: IssueOptions): Pr const totalMinutes = solution.tasks?.reduce((sum, t) => sum + (t.estimated_minutes || 30), 0) || 30; console.log(JSON.stringify({ + queue_id: queue.id, item_id: nextItem.item_id, issue_id: nextItem.issue_id, solution_id: nextItem.solution_id, @@ -1783,10 +2034,30 @@ async function detailAction(itemId: string | undefined, options: IssueOptions): return; } - const queue = readActiveQueue(); - // Support both old (tasks) and new (solutions) queue format - const items = queue.solutions || queue.tasks || []; - const queueItem = items.find(t => t.item_id === itemId); + let queue: Queue; + let queueItem: QueueItem | undefined; + + // Use explicit queue if provided, otherwise auto-detect + if (options.queue) { + const targetQueue = readQueue(options.queue); + if (!targetQueue) { + console.log(JSON.stringify({ status: 'error', message: `Queue ${options.queue} not found` })); + return; + } + queue = targetQueue; + const items = queue.solutions || queue.tasks || []; + queueItem = items.find(t => t.item_id === itemId); + } else { + // Auto-detect queue from item ID + const found = findItemQueue(itemId); + if (found) { + queue = found.queue; + queueItem = found.item; + } else { + console.log(JSON.stringify({ status: 'error', message: `Item ${itemId} not found in any queue` })); + return; + } + } if (!queueItem) { console.log(JSON.stringify({ status: 'error', message: `Item ${itemId} not found` })); @@ -1806,6 +2077,7 @@ async function detailAction(itemId: string | undefined, options: IssueOptions): // Return FULL SOLUTION with all tasks (READ-ONLY - no status update) console.log(JSON.stringify({ + queue_id: queue.id, item_id: queueItem.item_id, issue_id: queueItem.issue_id, solution_id: queueItem.solution_id, @@ -1827,21 +2099,41 @@ async function detailAction(itemId: string | undefined, options: IssueOptions): /** * done - Mark task completed or failed */ -async function doneAction(queueId: string | undefined, options: IssueOptions): Promise { - if (!queueId) { +async function doneAction(queueItemId: string | undefined, options: IssueOptions): Promise { + if (!queueItemId) { console.error(chalk.red('Item ID is required')); - console.error(chalk.gray('Usage: ccw issue done [--fail] [--reason "..."]')); + console.error(chalk.gray('Usage: ccw issue done [--fail] [--reason "..."] [--queue ]')); process.exit(1); } - const queue = readActiveQueue(); - // Support both old (tasks) and new (solutions) queue format - const items = queue.solutions || queue.tasks || []; - const idx = items.findIndex(q => q.item_id === queueId); + let queue: Queue; + let items: QueueItem[]; + let idx: number; - if (idx === -1) { - console.error(chalk.red(`Queue item "${queueId}" not found`)); - process.exit(1); + // Use explicit queue if provided, otherwise auto-detect + if (options.queue) { + const targetQueue = readQueue(options.queue); + if (!targetQueue) { + console.error(chalk.red(`Queue "${options.queue}" not found`)); + process.exit(1); + } + queue = targetQueue; + items = queue.solutions || queue.tasks || []; + idx = items.findIndex(q => q.item_id === queueItemId); + if (idx === -1) { + console.error(chalk.red(`Queue item "${queueItemId}" not found in queue ${options.queue}`)); + process.exit(1); + } + } else { + // Auto-detect queue from item ID + const found = findItemQueue(queueItemId); + if (!found) { + console.error(chalk.red(`Queue item "${queueItemId}" not found in any queue`)); + process.exit(1); + } + queue = found.queue; + items = queue.solutions || queue.tasks || []; + idx = found.itemIndex; } const isFail = options.fail; @@ -1849,7 +2141,9 @@ async function doneAction(queueId: string | undefined, options: IssueOptions): P items[idx].completed_at = new Date().toISOString(); if (isFail) { - items[idx].failure_reason = options.reason || 'Unknown failure'; + const reason = options.reason || 'Unknown failure'; + items[idx].failure_reason = reason; // Backward compat + items[idx].failure_details = parseFailureReason(reason); // Structured failure } else if (options.result) { try { items[idx].result = JSON.parse(options.result); @@ -1863,10 +2157,10 @@ async function doneAction(queueId: string | undefined, options: IssueOptions): P if (isFail) { updateIssue(issueId, { status: 'failed' }); - console.log(chalk.red(`✗ ${queueId} failed`)); + console.log(chalk.red(`✗ ${queueItemId} failed`)); } else { updateIssue(issueId, { status: 'completed', completed_at: new Date().toISOString() }); - console.log(chalk.green(`✓ ${queueId} completed`)); + console.log(chalk.green(`✓ ${queueItemId} completed`)); console.log(chalk.green(`✓ Issue ${issueId} completed`)); } @@ -1895,53 +2189,81 @@ async function doneAction(queueId: string | undefined, options: IssueOptions): P * retry - Reset failed items to pending for re-execution */ async function retryAction(issueId: string | undefined, options: IssueOptions): Promise { - const queue = readActiveQueue(); - // Support both old (tasks) and new (solutions) queue format - const items = queue.solutions || queue.tasks || []; + let queues: Queue[]; - if (!queue.id || items.length === 0) { - console.log(chalk.yellow('No active queue')); + // Use explicit queue if provided, otherwise use all active queues + if (options.queue) { + const targetQueue = readQueue(options.queue); + if (!targetQueue) { + console.log(chalk.red(`Queue "${options.queue}" not found`)); + return; + } + queues = [targetQueue]; + } else { + queues = getActiveQueues(); + } + + if (queues.length === 0) { + console.log(chalk.yellow('No active queues')); return; } - let updated = 0; + let totalUpdated = 0; - for (const item of items) { - // Retry failed items only - if (item.status === 'failed') { - if (!issueId || item.issue_id === issueId) { - item.status = 'pending'; - item.failure_reason = undefined; - item.started_at = undefined; - item.completed_at = undefined; - updated++; + for (const queue of queues) { + const items = queue.solutions || queue.tasks || []; + let queueUpdated = 0; + + for (const item of items) { + // Retry failed items only + if (item.status === 'failed') { + if (!issueId || item.issue_id === issueId) { + // Preserve failure history before resetting + if (item.failure_details) { + if (!item.failure_history) { + item.failure_history = []; + } + item.failure_history.push(item.failure_details); + } + + // Reset for retry + item.status = 'pending'; + item.failure_reason = undefined; + item.failure_details = undefined; + item.started_at = undefined; + item.completed_at = undefined; + queueUpdated++; + } } } + + if (queueUpdated > 0) { + // Reset queue status if it was failed + if (queue.status === 'failed') { + queue.status = 'active'; + } + + // Write back to queue + if (queue.solutions) { + queue.solutions = items; + } else { + queue.tasks = items; + } + writeQueue(queue); + totalUpdated += queueUpdated; + } } - if (updated === 0) { + if (totalUpdated === 0) { console.log(chalk.yellow('No failed items to retry')); return; } - // Reset queue status if it was failed - if (queue.status === 'failed') { - queue.status = 'active'; - } - - // Write back to queue - if (queue.solutions) { - queue.solutions = items; - } else { - queue.tasks = items; - } - writeQueue(queue); - if (issueId) { updateIssue(issueId, { status: 'queued' }); } - console.log(chalk.green(`✓ Reset ${updated} item(s) to pending`)); + console.log(chalk.green(`✓ Reset ${totalUpdated} item(s) to pending (failure history preserved)`)); } // ============ Main Entry ============ @@ -2025,16 +2347,18 @@ export async function issueCommand( console.log(chalk.gray(' queue list List all queues (history)')); console.log(chalk.gray(' queue add Add issue to active queue (or create new)')); console.log(chalk.gray(' queue switch Switch active queue')); - console.log(chalk.gray(' queue dag Get dependency graph (JSON) for parallel execution')); + console.log(chalk.gray(' queue activate Activate multiple queues (comma-separated)')); + console.log(chalk.gray(' queue priority Set queue priority (--priority N, lower=higher)')); + console.log(chalk.gray(' queue dag [--queue ] Get dependency graph (JSON) for parallel execution')); console.log(chalk.gray(' queue archive Archive current queue')); console.log(chalk.gray(' queue delete Delete queue from history')); - console.log(chalk.gray(' retry [issue-id] Retry failed tasks')); + console.log(chalk.gray(' retry [issue-id] [--queue ] Retry failed tasks')); console.log(); console.log(chalk.bold('Execution Endpoints:')); - console.log(chalk.gray(' next [item-id] Get & mark task executing (JSON)')); - console.log(chalk.gray(' detail Get task details (READ-ONLY, for parallel)')); - console.log(chalk.gray(' done Mark task completed')); - console.log(chalk.gray(' done --fail Mark task failed')); + console.log(chalk.gray(' next [item-id] [--queue ] Get & mark task executing (JSON)')); + console.log(chalk.gray(' detail [--queue ] Get task details (READ-ONLY, for parallel)')); + console.log(chalk.gray(' done [--queue ] Mark task completed')); + console.log(chalk.gray(' done --fail --reason "." Mark task failed with reason (supports JSON)')); console.log(); console.log(chalk.bold('Options:')); console.log(chalk.gray(' --title Issue/task title')); @@ -2042,7 +2366,9 @@ export async function issueCommand( console.log(chalk.gray(' --brief Brief JSON output (minimal fields)')); console.log(chalk.gray(' --solution <path> Solution JSON file')); console.log(chalk.gray(' --result <json> Execution result')); - console.log(chalk.gray(' --reason <text> Failure reason')); + console.log(chalk.gray(' --reason <text> Failure reason (string or JSON)')); + console.log(chalk.gray(' --queue <queue-id> Target queue for multi-queue operations')); + console.log(chalk.gray(' --priority <n> Queue priority (lower = higher)')); console.log(chalk.gray(' --json JSON output')); console.log(chalk.gray(' --force Force operation')); console.log();