feat(issue): Add multi-queue support and enhanced failure handling

- Add --queue parameter for explicit queue targeting (next, dag, detail, done, retry)
- Implement serialized multi-queue execution (complete Q1 before Q2)
- Add queue activate/priority subcommands for multi-queue management
- Add FailureDetail interface for structured failure tracking
- Preserve failure history on retry for debugging
- Show failure reasons and retry count in queue status display
- Auto-detect queue from item ID in done/detail commands
This commit is contained in:
catlog22
2026-01-03 11:31:49 +08:00
parent 54fd94547c
commit 4fb247f7c5
2 changed files with 393 additions and 66 deletions

View File

@@ -288,6 +288,7 @@ export function run(argv: string[]): void {
.option('--reason <text>', 'Failure reason')
.option('--fail', 'Mark task as failed')
.option('--from-queue [queue-id]', 'Sync issue statuses from queue (default: active queue)')
.option('--queue <queue-id>', 'Target queue ID for multi-queue operations')
.action((subcommand, args, options) => issueCommand(subcommand, args, options));
program.parse(argv);

View File

@@ -108,6 +108,15 @@ interface Solution {
bound_at?: string;
}
// Structured failure detail for debugging
interface FailureDetail {
task_id?: string; // Which task failed within the solution
error_type: string; // e.g., "compilation", "test_failure", "timeout"
message: string; // Human-readable error message
stack_trace?: string; // Optional stack trace
timestamp: string; // ISO timestamp
}
interface QueueItem {
item_id: string; // Item ID in queue: T-1, T-2, ... (task-level) or S-1, S-2, ... (solution-level)
issue_id: string;
@@ -124,7 +133,9 @@ interface QueueItem {
started_at?: string;
completed_at?: string;
result?: Record<string, any>;
failure_reason?: string;
failure_reason?: string; // Simple string (backward compat)
failure_details?: FailureDetail; // Structured failure info
failure_history?: FailureDetail[]; // Preserved on retry for debugging
}
interface QueueConflict {
@@ -168,10 +179,12 @@ interface Queue {
}
interface QueueIndex {
active_queue_id: string | null;
active_queue_id: string | null; // Single active queue (backward compat)
active_queue_ids?: string[]; // Multiple active queues, ordered by priority
queues: {
id: string;
status: string;
priority?: number; // Queue execution priority (lower = higher priority)
issue_ids: string[];
total_tasks?: number; // For task-level queues
total_solutions?: number; // For solution-level queues
@@ -197,6 +210,7 @@ interface IssueOptions {
brief?: boolean; // List brief info only (id, title, status, priority, tags) - JSON format
data?: string; // JSON data for create
fromQueue?: boolean | string; // Sync statuses from queue (true=active, string=specific queue ID)
queue?: string; // Target queue ID for multi-queue operations
}
const ISSUES_DIR = '.workflow/issues';
@@ -528,6 +542,119 @@ function createEmptyQueue(): Queue {
};
}
// ============ Multi-Queue Helper Functions ============
/**
* Find which queue contains a given item ID
* Supports both simple (S-1) and qualified (QUE-xxx:S-1) formats
*/
function findItemQueue(itemId: string): { queue: Queue; item: QueueItem; itemIndex: number } | null {
// Check if qualified format (QUE-xxx:S-1)
const qualifiedMatch = itemId.match(/^(QUE-[^:]+):(.+)$/);
if (qualifiedMatch) {
const [, queueId, actualItemId] = qualifiedMatch;
const queue = readQueue(queueId);
if (!queue) return null;
const items = queue.solutions || queue.tasks || [];
const itemIndex = items.findIndex(i => i.item_id === actualItemId);
if (itemIndex === -1) return null;
return { queue, item: items[itemIndex], itemIndex };
}
// Search all queues for unqualified item ID
const index = readQueueIndex();
const activeQueueIds = index.active_queue_ids || (index.active_queue_id ? [index.active_queue_id] : []);
// Search active queues first
for (const queueId of activeQueueIds) {
const queue = readQueue(queueId);
if (!queue) continue;
const items = queue.solutions || queue.tasks || [];
const itemIndex = items.findIndex(i => i.item_id === itemId);
if (itemIndex >= 0) {
return { queue, item: items[itemIndex], itemIndex };
}
}
// Search all other queues
for (const queueEntry of index.queues) {
if (activeQueueIds.includes(queueEntry.id)) continue;
const queue = readQueue(queueEntry.id);
if (!queue) continue;
const items = queue.solutions || queue.tasks || [];
const itemIndex = items.findIndex(i => i.item_id === itemId);
if (itemIndex >= 0) {
return { queue, item: items[itemIndex], itemIndex };
}
}
return null;
}
/**
* Get all active queues ordered by priority (lower = higher priority)
* Falls back to creation date order
*/
function getActiveQueues(): Queue[] {
const index = readQueueIndex();
const activeIds = index.active_queue_ids || (index.active_queue_id ? [index.active_queue_id] : []);
const queues: Queue[] = [];
for (const queueId of activeIds) {
const queue = readQueue(queueId);
if (queue && queue.status === 'active') {
queues.push(queue);
}
}
// Sort by priority field in index (lower = higher priority)
const priorityMap = new Map<string, number>();
for (const entry of index.queues) {
priorityMap.set(entry.id, entry.priority ?? Number.MAX_SAFE_INTEGER);
}
queues.sort((a, b) => {
const pa = priorityMap.get(a.id) ?? Number.MAX_SAFE_INTEGER;
const pb = priorityMap.get(b.id) ?? Number.MAX_SAFE_INTEGER;
if (pa !== pb) return pa - pb;
// Fall back to creation date (from queue ID)
return a.id.localeCompare(b.id);
});
return queues;
}
/**
* Parse failure reason into structured FailureDetail
* Detects JSON format vs plain string
*/
function parseFailureReason(reason: string): FailureDetail {
const timestamp = new Date().toISOString();
// Try to parse as JSON first
if (reason.trim().startsWith('{')) {
try {
const parsed = JSON.parse(reason);
return {
task_id: parsed.task_id,
error_type: parsed.error_type || 'unknown',
message: parsed.message || reason,
stack_trace: parsed.stack_trace,
timestamp
};
} catch {
// Not valid JSON, treat as plain message
}
}
// Plain string message
return {
error_type: 'execution_error',
message: reason,
timestamp
};
}
function writeQueue(queue: Queue): void {
ensureQueuesDir();
@@ -1323,15 +1450,82 @@ async function queueAction(subAction: string | undefined, issueId: string | unde
const index = readQueueIndex();
index.active_queue_id = queueId;
// Also update active_queue_ids for multi-queue support
index.active_queue_ids = [queueId];
writeQueueIndex(index);
console.log(chalk.green(`✓ Switched to queue ${queueId}`));
return;
}
// Set queue priority (lower = higher priority)
if (subAction === 'priority' && issueId) {
const queueId = issueId;
const priority = parseInt(options.priority || '0');
if (isNaN(priority)) {
console.error(chalk.red('Invalid priority value (must be a number)'));
process.exit(1);
}
const index = readQueueIndex();
const queueEntry = index.queues.find(q => q.id === queueId);
if (!queueEntry) {
console.error(chalk.red(`Queue "${queueId}" not found`));
process.exit(1);
}
queueEntry.priority = priority;
writeQueueIndex(index);
console.log(chalk.green(`✓ Queue ${queueId} priority set to ${priority}`));
return;
}
// Activate multiple queues at once
if (subAction === 'activate' && issueId) {
const queueIds = issueId.split(',').map(id => id.trim());
const index = readQueueIndex();
// Validate all queue IDs
for (const queueId of queueIds) {
if (!index.queues.some(q => q.id === queueId)) {
console.error(chalk.red(`Queue "${queueId}" not found`));
process.exit(1);
}
}
index.active_queue_ids = queueIds;
index.active_queue_id = queueIds[0] || null; // Backward compat
writeQueueIndex(index);
console.log(chalk.green(`✓ Activated ${queueIds.length} queue(s): ${queueIds.join(', ')}`));
return;
}
// DAG - Return dependency graph for parallel execution planning (solution-level)
if (subAction === 'dag') {
const queue = readActiveQueue();
let queue: Queue;
// Use explicit queue if provided via --queue or issueId, otherwise use active queue
if (options.queue) {
const targetQueue = readQueue(options.queue);
if (!targetQueue) {
console.log(JSON.stringify({ error: `Queue ${options.queue} not found`, nodes: [], edges: [], groups: [] }));
return;
}
queue = targetQueue;
} else if (issueId && issueId.startsWith('QUE-')) {
const targetQueue = readQueue(issueId);
if (!targetQueue) {
console.log(JSON.stringify({ error: `Queue ${issueId} not found`, nodes: [], edges: [], groups: [] }));
return;
}
queue = targetQueue;
} else {
queue = readActiveQueue();
}
// Support both old (tasks) and new (solutions) queue format
const items = queue.solutions || queue.tasks || [];
@@ -1648,12 +1842,26 @@ async function queueAction(subAction: string | undefined, issueId: string | unde
? String(item.task_count || 0).padEnd(8)
: (item.task_id || '-').padEnd(8);
console.log(
item.item_id.padEnd(10) +
let line = item.item_id.padEnd(10) +
item.issue_id.substring(0, 13).padEnd(15) +
thirdCol +
statusColor(item.status)
);
statusColor(item.status);
// Show failure reason for failed items
if (item.status === 'failed') {
const reason = item.failure_details?.message || item.failure_reason;
if (reason) {
// Truncate to 40 chars for display
const shortReason = reason.length > 40 ? reason.substring(0, 37) + '...' : reason;
line += chalk.gray(` [${shortReason}]`);
}
// Show retry count if there's failure history
if (item.failure_history && item.failure_history.length > 0) {
line += chalk.gray(` (${item.failure_history.length} retry)`);
}
}
console.log(line);
}
}
@@ -1662,17 +1870,58 @@ async function queueAction(subAction: string | undefined, issueId: string | unde
* Accepts optional item_id to fetch a specific task directly
*/
async function nextAction(itemId: string | undefined, options: IssueOptions): Promise<void> {
const queue = readActiveQueue();
// Support both old (tasks) and new (solutions) queue format
const items = queue.solutions || queue.tasks || [];
let nextItem: typeof items[0] | undefined;
let queue: Queue;
let items: QueueItem[];
// Determine which queue(s) to use
if (options.queue) {
// Explicit queue specified
const targetQueue = readQueue(options.queue);
if (!targetQueue) {
console.log(JSON.stringify({ status: 'error', message: `Queue ${options.queue} not found` }));
return;
}
queue = targetQueue;
items = queue.solutions || queue.tasks || [];
} else {
// Multi-queue: iterate active queues in priority order (serialized execution)
const activeQueues = getActiveQueues();
if (activeQueues.length === 0) {
console.log(JSON.stringify({ status: 'empty', message: 'No active queues' }));
return;
}
// Find first queue with incomplete items (serialized: complete Q1 before Q2)
let foundQueue: Queue | null = null;
for (const q of activeQueues) {
const queueItems = q.solutions || q.tasks || [];
const hasIncomplete = queueItems.some(i =>
i.status === 'pending' || i.status === 'executing'
);
if (hasIncomplete) {
foundQueue = q;
break;
}
}
if (!foundQueue) {
console.log(JSON.stringify({ status: 'empty', message: 'All queues completed' }));
return;
}
queue = foundQueue;
items = queue.solutions || queue.tasks || [];
}
let nextItem: QueueItem | undefined;
let isResume = false;
// If specific item_id provided, fetch that item directly
if (itemId) {
nextItem = items.find(t => t.item_id === itemId);
if (!nextItem) {
console.log(JSON.stringify({ status: 'error', message: `Item ${itemId} not found` }));
console.log(JSON.stringify({ status: 'error', message: `Item ${itemId} not found in queue ${queue.id}` }));
return;
}
if (nextItem.status === 'completed') {
@@ -1701,6 +1950,7 @@ async function nextAction(itemId: string | undefined, options: IssueOptions): Pr
console.log(JSON.stringify({
status: 'empty',
message: 'No ready items',
queue_id: queue.id,
queue_status: queue._metadata
}, null, 2));
return;
@@ -1748,6 +1998,7 @@ async function nextAction(itemId: string | undefined, options: IssueOptions): Pr
const totalMinutes = solution.tasks?.reduce((sum, t) => sum + (t.estimated_minutes || 30), 0) || 30;
console.log(JSON.stringify({
queue_id: queue.id,
item_id: nextItem.item_id,
issue_id: nextItem.issue_id,
solution_id: nextItem.solution_id,
@@ -1783,10 +2034,30 @@ async function detailAction(itemId: string | undefined, options: IssueOptions):
return;
}
const queue = readActiveQueue();
// Support both old (tasks) and new (solutions) queue format
const items = queue.solutions || queue.tasks || [];
const queueItem = items.find(t => t.item_id === itemId);
let queue: Queue;
let queueItem: QueueItem | undefined;
// Use explicit queue if provided, otherwise auto-detect
if (options.queue) {
const targetQueue = readQueue(options.queue);
if (!targetQueue) {
console.log(JSON.stringify({ status: 'error', message: `Queue ${options.queue} not found` }));
return;
}
queue = targetQueue;
const items = queue.solutions || queue.tasks || [];
queueItem = items.find(t => t.item_id === itemId);
} else {
// Auto-detect queue from item ID
const found = findItemQueue(itemId);
if (found) {
queue = found.queue;
queueItem = found.item;
} else {
console.log(JSON.stringify({ status: 'error', message: `Item ${itemId} not found in any queue` }));
return;
}
}
if (!queueItem) {
console.log(JSON.stringify({ status: 'error', message: `Item ${itemId} not found` }));
@@ -1806,6 +2077,7 @@ async function detailAction(itemId: string | undefined, options: IssueOptions):
// Return FULL SOLUTION with all tasks (READ-ONLY - no status update)
console.log(JSON.stringify({
queue_id: queue.id,
item_id: queueItem.item_id,
issue_id: queueItem.issue_id,
solution_id: queueItem.solution_id,
@@ -1827,21 +2099,41 @@ async function detailAction(itemId: string | undefined, options: IssueOptions):
/**
* done - Mark task completed or failed
*/
async function doneAction(queueId: string | undefined, options: IssueOptions): Promise<void> {
if (!queueId) {
async function doneAction(queueItemId: string | undefined, options: IssueOptions): Promise<void> {
if (!queueItemId) {
console.error(chalk.red('Item ID is required'));
console.error(chalk.gray('Usage: ccw issue done <item-id> [--fail] [--reason "..."]'));
console.error(chalk.gray('Usage: ccw issue done <item-id> [--fail] [--reason "..."] [--queue <queue-id>]'));
process.exit(1);
}
const queue = readActiveQueue();
// Support both old (tasks) and new (solutions) queue format
const items = queue.solutions || queue.tasks || [];
const idx = items.findIndex(q => q.item_id === queueId);
let queue: Queue;
let items: QueueItem[];
let idx: number;
if (idx === -1) {
console.error(chalk.red(`Queue item "${queueId}" not found`));
process.exit(1);
// Use explicit queue if provided, otherwise auto-detect
if (options.queue) {
const targetQueue = readQueue(options.queue);
if (!targetQueue) {
console.error(chalk.red(`Queue "${options.queue}" not found`));
process.exit(1);
}
queue = targetQueue;
items = queue.solutions || queue.tasks || [];
idx = items.findIndex(q => q.item_id === queueItemId);
if (idx === -1) {
console.error(chalk.red(`Queue item "${queueItemId}" not found in queue ${options.queue}`));
process.exit(1);
}
} else {
// Auto-detect queue from item ID
const found = findItemQueue(queueItemId);
if (!found) {
console.error(chalk.red(`Queue item "${queueItemId}" not found in any queue`));
process.exit(1);
}
queue = found.queue;
items = queue.solutions || queue.tasks || [];
idx = found.itemIndex;
}
const isFail = options.fail;
@@ -1849,7 +2141,9 @@ async function doneAction(queueId: string | undefined, options: IssueOptions): P
items[idx].completed_at = new Date().toISOString();
if (isFail) {
items[idx].failure_reason = options.reason || 'Unknown failure';
const reason = options.reason || 'Unknown failure';
items[idx].failure_reason = reason; // Backward compat
items[idx].failure_details = parseFailureReason(reason); // Structured failure
} else if (options.result) {
try {
items[idx].result = JSON.parse(options.result);
@@ -1863,10 +2157,10 @@ async function doneAction(queueId: string | undefined, options: IssueOptions): P
if (isFail) {
updateIssue(issueId, { status: 'failed' });
console.log(chalk.red(`${queueId} failed`));
console.log(chalk.red(`${queueItemId} failed`));
} else {
updateIssue(issueId, { status: 'completed', completed_at: new Date().toISOString() });
console.log(chalk.green(`${queueId} completed`));
console.log(chalk.green(`${queueItemId} completed`));
console.log(chalk.green(`✓ Issue ${issueId} completed`));
}
@@ -1895,53 +2189,81 @@ async function doneAction(queueId: string | undefined, options: IssueOptions): P
* retry - Reset failed items to pending for re-execution
*/
async function retryAction(issueId: string | undefined, options: IssueOptions): Promise<void> {
const queue = readActiveQueue();
// Support both old (tasks) and new (solutions) queue format
const items = queue.solutions || queue.tasks || [];
let queues: Queue[];
if (!queue.id || items.length === 0) {
console.log(chalk.yellow('No active queue'));
// Use explicit queue if provided, otherwise use all active queues
if (options.queue) {
const targetQueue = readQueue(options.queue);
if (!targetQueue) {
console.log(chalk.red(`Queue "${options.queue}" not found`));
return;
}
queues = [targetQueue];
} else {
queues = getActiveQueues();
}
if (queues.length === 0) {
console.log(chalk.yellow('No active queues'));
return;
}
let updated = 0;
let totalUpdated = 0;
for (const item of items) {
// Retry failed items only
if (item.status === 'failed') {
if (!issueId || item.issue_id === issueId) {
item.status = 'pending';
item.failure_reason = undefined;
item.started_at = undefined;
item.completed_at = undefined;
updated++;
for (const queue of queues) {
const items = queue.solutions || queue.tasks || [];
let queueUpdated = 0;
for (const item of items) {
// Retry failed items only
if (item.status === 'failed') {
if (!issueId || item.issue_id === issueId) {
// Preserve failure history before resetting
if (item.failure_details) {
if (!item.failure_history) {
item.failure_history = [];
}
item.failure_history.push(item.failure_details);
}
// Reset for retry
item.status = 'pending';
item.failure_reason = undefined;
item.failure_details = undefined;
item.started_at = undefined;
item.completed_at = undefined;
queueUpdated++;
}
}
}
if (queueUpdated > 0) {
// Reset queue status if it was failed
if (queue.status === 'failed') {
queue.status = 'active';
}
// Write back to queue
if (queue.solutions) {
queue.solutions = items;
} else {
queue.tasks = items;
}
writeQueue(queue);
totalUpdated += queueUpdated;
}
}
if (updated === 0) {
if (totalUpdated === 0) {
console.log(chalk.yellow('No failed items to retry'));
return;
}
// Reset queue status if it was failed
if (queue.status === 'failed') {
queue.status = 'active';
}
// Write back to queue
if (queue.solutions) {
queue.solutions = items;
} else {
queue.tasks = items;
}
writeQueue(queue);
if (issueId) {
updateIssue(issueId, { status: 'queued' });
}
console.log(chalk.green(`✓ Reset ${updated} item(s) to pending`));
console.log(chalk.green(`✓ Reset ${totalUpdated} item(s) to pending (failure history preserved)`));
}
// ============ Main Entry ============
@@ -2025,16 +2347,18 @@ export async function issueCommand(
console.log(chalk.gray(' queue list List all queues (history)'));
console.log(chalk.gray(' queue add <issue-id> Add issue to active queue (or create new)'));
console.log(chalk.gray(' queue switch <queue-id> Switch active queue'));
console.log(chalk.gray(' queue dag Get dependency graph (JSON) for parallel execution'));
console.log(chalk.gray(' queue activate <q1,q2,...> Activate multiple queues (comma-separated)'));
console.log(chalk.gray(' queue priority <queue-id> Set queue priority (--priority N, lower=higher)'));
console.log(chalk.gray(' queue dag [--queue <id>] Get dependency graph (JSON) for parallel execution'));
console.log(chalk.gray(' queue archive Archive current queue'));
console.log(chalk.gray(' queue delete <queue-id> Delete queue from history'));
console.log(chalk.gray(' retry [issue-id] Retry failed tasks'));
console.log(chalk.gray(' retry [issue-id] [--queue <id>] Retry failed tasks'));
console.log();
console.log(chalk.bold('Execution Endpoints:'));
console.log(chalk.gray(' next [item-id] Get & mark task executing (JSON)'));
console.log(chalk.gray(' detail <item-id> Get task details (READ-ONLY, for parallel)'));
console.log(chalk.gray(' done <item-id> Mark task completed'));
console.log(chalk.gray(' done <item-id> --fail Mark task failed'));
console.log(chalk.gray(' next [item-id] [--queue <id>] Get & mark task executing (JSON)'));
console.log(chalk.gray(' detail <item-id> [--queue <id>] Get task details (READ-ONLY, for parallel)'));
console.log(chalk.gray(' done <item-id> [--queue <id>] Mark task completed'));
console.log(chalk.gray(' done <item-id> --fail --reason "." Mark task failed with reason (supports JSON)'));
console.log();
console.log(chalk.bold('Options:'));
console.log(chalk.gray(' --title <title> Issue/task title'));
@@ -2042,7 +2366,9 @@ export async function issueCommand(
console.log(chalk.gray(' --brief Brief JSON output (minimal fields)'));
console.log(chalk.gray(' --solution <path> Solution JSON file'));
console.log(chalk.gray(' --result <json> Execution result'));
console.log(chalk.gray(' --reason <text> Failure reason'));
console.log(chalk.gray(' --reason <text> Failure reason (string or JSON)'));
console.log(chalk.gray(' --queue <queue-id> Target queue for multi-queue operations'));
console.log(chalk.gray(' --priority <n> Queue priority (lower = higher)'));
console.log(chalk.gray(' --json JSON output'));
console.log(chalk.gray(' --force Force operation'));
console.log();