feat(issue): add DAG support for parallel execution planning and enhance task fetching

This commit is contained in:
catlog22
2025-12-28 12:04:10 +08:00
parent 35ffd3419e
commit 2c42cefa5a
2 changed files with 306 additions and 383 deletions

View File

@@ -845,6 +845,101 @@ async function queueAction(subAction: string | undefined, issueId: string | unde
return;
}
// DAG - Return dependency graph for parallel execution planning
if (subAction === 'dag') {
const queue = readActiveQueue();
if (!queue.id || queue.tasks.length === 0) {
console.log(JSON.stringify({ error: 'No active queue', nodes: [], edges: [], groups: [] }));
return;
}
// Build DAG nodes
const completedIds = new Set(queue.tasks.filter(t => t.status === 'completed').map(t => t.item_id));
const failedIds = new Set(queue.tasks.filter(t => t.status === 'failed').map(t => t.item_id));
const nodes = queue.tasks.map(task => ({
id: task.item_id,
issue_id: task.issue_id,
task_id: task.task_id,
status: task.status,
executor: task.assigned_executor,
priority: task.semantic_priority,
depends_on: task.depends_on,
// Calculate if ready (dependencies satisfied)
ready: task.status === 'pending' && task.depends_on.every(d => completedIds.has(d)),
blocked_by: task.depends_on.filter(d => !completedIds.has(d) && !failedIds.has(d))
}));
// Build edges for visualization
const edges = queue.tasks.flatMap(task =>
task.depends_on.map(dep => ({ from: dep, to: task.item_id }))
);
// Group ready tasks by execution_group for parallel execution
const readyTasks = nodes.filter(n => n.ready || n.status === 'executing');
const groups: Record<string, string[]> = {};
for (const task of queue.tasks) {
if (readyTasks.some(r => r.id === task.item_id)) {
const group = task.execution_group || 'P1';
if (!groups[group]) groups[group] = [];
groups[group].push(task.item_id);
}
}
// Calculate parallel batches (tasks with no dependencies on each other)
const parallelBatches: string[][] = [];
const remainingReady = new Set(readyTasks.map(t => t.id));
while (remainingReady.size > 0) {
const batch: string[] = [];
const batchFiles = new Set<string>();
for (const taskId of remainingReady) {
const task = queue.tasks.find(t => t.item_id === taskId);
if (!task) continue;
// Check for file conflicts with already-batched tasks
const solution = findSolution(task.issue_id, task.solution_id);
const taskDef = solution?.tasks.find(t => t.id === task.task_id);
const taskFiles = taskDef?.modification_points?.map(mp => mp.file) || [];
const hasConflict = taskFiles.some(f => batchFiles.has(f));
if (!hasConflict) {
batch.push(taskId);
taskFiles.forEach(f => batchFiles.add(f));
}
}
if (batch.length === 0) {
// Fallback: take one at a time if all conflict
const first = Array.from(remainingReady)[0];
batch.push(first);
}
parallelBatches.push(batch);
batch.forEach(id => remainingReady.delete(id));
}
console.log(JSON.stringify({
queue_id: queue.id,
total: nodes.length,
ready_count: readyTasks.length,
completed_count: completedIds.size,
nodes,
edges,
groups: Object.entries(groups).map(([id, tasks]) => ({ id, tasks })),
parallel_batches: parallelBatches,
_summary: {
can_parallel: parallelBatches[0]?.length || 0,
batches_needed: parallelBatches.length
}
}, null, 2));
return;
}
// Archive current queue
if (subAction === 'archive') {
const queue = readActiveQueue();
@@ -998,39 +1093,56 @@ async function queueAction(subAction: string | undefined, issueId: string | unde
/**
* next - Get next ready task for execution (JSON output)
* Accepts optional item_id to fetch a specific task directly
*/
async function nextAction(options: IssueOptions): Promise<void> {
async function nextAction(itemId: string | undefined, options: IssueOptions): Promise<void> {
const queue = readActiveQueue();
let nextItem: typeof queue.tasks[0] | undefined;
let isResume = false;
// Priority 1: Resume executing tasks (interrupted/crashed)
const executingTasks = queue.tasks.filter(item => item.status === 'executing');
// Priority 2: Find pending tasks with satisfied dependencies
const pendingTasks = queue.tasks.filter(item => {
if (item.status !== 'pending') return false;
return item.depends_on.every(depId => {
const dep = queue.tasks.find(q => q.item_id === depId);
return !dep || dep.status === 'completed';
// If specific item_id provided, fetch that task directly
if (itemId) {
nextItem = queue.tasks.find(t => t.item_id === itemId);
if (!nextItem) {
console.log(JSON.stringify({ status: 'error', message: `Task ${itemId} not found` }));
return;
}
if (nextItem.status === 'completed') {
console.log(JSON.stringify({ status: 'completed', message: `Task ${itemId} already completed` }));
return;
}
if (nextItem.status === 'failed') {
console.log(JSON.stringify({ status: 'failed', message: `Task ${itemId} failed, use retry to reset` }));
return;
}
isResume = nextItem.status === 'executing';
} else {
// Auto-select: Priority 1 - executing, Priority 2 - ready pending
const executingTasks = queue.tasks.filter(item => item.status === 'executing');
const pendingTasks = queue.tasks.filter(item => {
if (item.status !== 'pending') return false;
return item.depends_on.every(depId => {
const dep = queue.tasks.find(q => q.item_id === depId);
return !dep || dep.status === 'completed';
});
});
});
// Combine: executing first, then pending
const readyTasks = [...executingTasks, ...pendingTasks];
const readyTasks = [...executingTasks, ...pendingTasks];
if (readyTasks.length === 0) {
console.log(JSON.stringify({
status: 'empty',
message: 'No ready tasks',
queue_status: queue._metadata
}, null, 2));
return;
if (readyTasks.length === 0) {
console.log(JSON.stringify({
status: 'empty',
message: 'No ready tasks',
queue_status: queue._metadata
}, null, 2));
return;
}
readyTasks.sort((a, b) => a.execution_order - b.execution_order);
nextItem = readyTasks[0];
isResume = nextItem.status === 'executing';
}
// Sort by execution order
readyTasks.sort((a, b) => a.execution_order - b.execution_order);
const nextItem = readyTasks[0];
const isResume = nextItem.status === 'executing';
// Load task definition
const solution = findSolution(nextItem.issue_id, nextItem.solution_id);
const taskDef = solution?.tasks.find(t => t.id === nextItem.task_id);
@@ -1054,8 +1166,8 @@ async function nextAction(options: IssueOptions): Promise<void> {
total: queue.tasks.length,
completed: queue.tasks.filter(q => q.status === 'completed').length,
failed: queue.tasks.filter(q => q.status === 'failed').length,
executing: executingTasks.length,
pending: pendingTasks.length
executing: queue.tasks.filter(q => q.status === 'executing').length,
pending: queue.tasks.filter(q => q.status === 'pending').length
};
const remaining = stats.pending + stats.executing;
@@ -1219,7 +1331,7 @@ export async function issueCommand(
await queueAction(argsArray[0], argsArray[1], options);
break;
case 'next':
await nextAction(options);
await nextAction(argsArray[0], options);
break;
case 'done':
await doneAction(argsArray[0], options);
@@ -1252,14 +1364,15 @@ export async function issueCommand(
console.log(chalk.gray(' queue list List all queues (history)'));
console.log(chalk.gray(' queue add <issue-id> Add issue to active queue (or create new)'));
console.log(chalk.gray(' queue switch <queue-id> Switch active queue'));
console.log(chalk.gray(' queue dag Get dependency graph (JSON) for parallel execution'));
console.log(chalk.gray(' queue archive Archive current queue'));
console.log(chalk.gray(' queue delete <queue-id> Delete queue from history'));
console.log(chalk.gray(' retry [issue-id] Retry failed tasks'));
console.log();
console.log(chalk.bold('Execution Endpoints:'));
console.log(chalk.gray(' next Get next ready task (JSON)'));
console.log(chalk.gray(' done <queue-id> Mark task completed'));
console.log(chalk.gray(' done <queue-id> --fail Mark task failed'));
console.log(chalk.gray(' next [item-id] Get task by ID or next ready task (JSON)'));
console.log(chalk.gray(' done <item-id> Mark task completed'));
console.log(chalk.gray(' done <item-id> --fail Mark task failed'));
console.log();
console.log(chalk.bold('Options:'));
console.log(chalk.gray(' --title <title> Issue/task title'));