feat(queue): 添加队列合并功能,支持跳过重复项并标记源队列为已合并

This commit is contained in:
catlog22
2026-01-19 15:35:41 +08:00
parent e58c33fb6e
commit eeaefa7208
5 changed files with 592 additions and 59 deletions

View File

@@ -650,6 +650,125 @@ function createEmptyQueue(): Queue {
};
}
interface MergeResult {
success: boolean;
itemsMerged: number;
totalItems: number;
skippedDuplicates: number;
reason?: string;
}
/**
* Merge items from source queue into target queue
* - Skips duplicate items (same issue_id + solution_id)
* - Re-generates item IDs for merged items
* - Marks source queue as 'merged' with metadata (or deletes if deleteSource=true)
* - Updates queue index
*/
function mergeQueues(target: Queue, source: Queue, options?: { deleteSource?: boolean }): MergeResult {
const sourceItems = source.solutions || source.tasks || [];
const targetItems = target.solutions || target.tasks || [];
if (sourceItems.length === 0) {
return { success: false, itemsMerged: 0, totalItems: targetItems.length, skippedDuplicates: 0, reason: 'Source queue is empty' };
}
// Ensure target has solutions array
if (!target.solutions) {
target.solutions = [];
}
let itemsMerged = 0;
let skippedDuplicates = 0;
for (const sourceItem of sourceItems) {
// Skip if already exists in target (same issue_id + solution_id)
const exists = target.solutions.some(
t => t.issue_id === sourceItem.issue_id && t.solution_id === sourceItem.solution_id
);
if (exists) {
skippedDuplicates++;
continue;
}
// Add issue to target's issue_ids if not present
if (!target.issue_ids.includes(sourceItem.issue_id)) {
target.issue_ids.push(sourceItem.issue_id);
}
// Clone and add item with new item_id
const newItem: QueueItem = {
...sourceItem,
item_id: generateQueueItemId(target, 'solution'),
execution_order: target.solutions.length + 1
};
target.solutions.push(newItem);
itemsMerged++;
}
// Merge conflicts if any
if (source.conflicts && source.conflicts.length > 0) {
if (!target.conflicts) target.conflicts = [];
target.conflicts.push(...source.conflicts);
}
// Write updated target queue
writeQueue(target);
// Handle source queue: delete or mark as merged
const index = readQueueIndex();
if (options?.deleteSource) {
// Delete source queue file and remove from index
const queuePath = join(getQueuesDir(), `${source.id}.json`);
if (existsSync(queuePath)) {
unlinkSync(queuePath);
}
index.queues = index.queues.filter(q => q.id !== source.id);
} else {
// Mark source queue as merged
source.status = 'merged' as any;
if (!source._metadata) {
source._metadata = {
version: '2.1',
total_tasks: 0,
pending_count: 0,
executing_count: 0,
completed_count: 0,
failed_count: 0,
updated_at: new Date().toISOString()
};
}
(source._metadata as any).merged_into = target.id;
(source._metadata as any).merged_at = new Date().toISOString();
writeQueue(source);
const sourceEntry = index.queues.find(q => q.id === source.id);
if (sourceEntry) {
sourceEntry.status = 'merged';
}
}
// Update target entry in index
const targetEntry = index.queues.find(q => q.id === target.id);
if (targetEntry) {
targetEntry.total_solutions = target.solutions.length;
targetEntry.completed_solutions = target.solutions.filter(s => s.status === 'completed').length;
targetEntry.issue_ids = target.issue_ids;
}
writeQueueIndex(index);
return {
success: itemsMerged > 0,
itemsMerged,
totalItems: target.solutions.length,
skippedDuplicates,
reason: itemsMerged === 0 ? 'All items already exist in target queue' : undefined
};
}
// ============ Multi-Queue Helper Functions ============
/**
@@ -1826,6 +1945,58 @@ async function queueAction(subAction: string | undefined, issueId: string | unde
return;
}
// Merge queues: ccw issue queue merge <source-id> --queue <target-id>
if (subAction === 'merge' && issueId) {
const sourceQueueId = issueId; // issueId is actually source queue ID here
const targetQueueId = options.queue; // --queue option
if (!targetQueueId) {
console.error(chalk.red('Target queue ID required'));
console.error(chalk.gray('Usage: ccw issue queue merge <source-id> --queue <target-id>'));
process.exit(1);
}
const sourceQueue = readQueue(sourceQueueId);
const targetQueue = readQueue(targetQueueId);
if (!sourceQueue) {
console.error(chalk.red(`Source queue "${sourceQueueId}" not found`));
process.exit(1);
}
if (!targetQueue) {
console.error(chalk.red(`Target queue "${targetQueueId}" not found`));
process.exit(1);
}
// mergeQueues marks source as 'merged' and updates index
const result = mergeQueues(targetQueue, sourceQueue);
if (options.json) {
console.log(JSON.stringify({
success: result.success,
sourceQueueId,
targetQueueId,
itemsMerged: result.itemsMerged,
skippedDuplicates: result.skippedDuplicates,
totalItems: result.totalItems,
reason: result.reason
}, null, 2));
} else {
if (result.success) {
console.log(chalk.green(`✓ Merged ${result.itemsMerged} items from ${sourceQueueId} into ${targetQueueId}`));
if (result.skippedDuplicates > 0) {
console.log(chalk.gray(` Skipped ${result.skippedDuplicates} duplicate items`));
}
console.log(chalk.gray(` Total items in target: ${result.totalItems}`));
console.log(chalk.gray(` Source queue ${sourceQueueId} marked as 'merged'`));
} else {
console.log(chalk.yellow(`⚠ Merge skipped: ${result.reason}`));
}
}
return;
}
// Archive current queue
if (subAction === 'archive') {
const queue = readActiveQueue();
@@ -1900,32 +2071,12 @@ async function queueAction(subAction: string | undefined, issueId: string | unde
process.exit(1);
}
// Get or create active queue (create new if current is completed/archived)
let queue = readActiveQueue();
const items = queue.solutions || [];
const isNewQueue = items.length === 0 || queue.status !== 'active';
if (queue.status !== 'active') {
// Create new queue if current is not active
queue = createEmptyQueue();
}
// Ensure solutions array exists
if (!queue.solutions) {
queue.solutions = [];
}
// Check if solution already in queue
const exists = queue.solutions.some(q => q.issue_id === issueId && q.solution_id === solution.id);
if (exists) {
console.log(chalk.yellow(`Solution ${solution.id} already in queue`));
return;
}
// Step 1: Create new queue (temporary, not active yet)
const newQueue = createEmptyQueue();
newQueue.solutions = [];
// Add issue to queue's issue list
if (!queue.issue_ids.includes(issueId)) {
queue.issue_ids.push(issueId);
}
newQueue.issue_ids.push(issueId);
// Collect all files touched by this solution
const filesTouched = new Set<string>();
@@ -1936,12 +2087,12 @@ async function queueAction(subAction: string | undefined, issueId: string | unde
}
// Create solution-level queue item (S-N)
queue.solutions.push({
item_id: generateQueueItemId(queue, 'solution'),
newQueue.solutions.push({
item_id: generateQueueItemId(newQueue, 'solution'),
issue_id: issueId,
solution_id: solution.id,
status: 'pending',
execution_order: queue.solutions.length + 1,
execution_order: 1,
execution_group: 'P1',
depends_on: [],
semantic_priority: 0.5,
@@ -1949,13 +2100,76 @@ async function queueAction(subAction: string | undefined, issueId: string | unde
files_touched: Array.from(filesTouched)
});
writeQueue(queue);
// Step 2: Write temporary queue file
writeQueue(newQueue);
updateIssue(issueId, { status: 'queued', queued_at: new Date().toISOString() });
if (isNewQueue) {
console.log(chalk.green(`✓ Created queue ${queue.id}`));
console.log(chalk.green(`✓ Created temporary queue ${newQueue.id}`));
console.log(chalk.gray(` Solution ${solution.id} (${solution.tasks?.length || 0} tasks)`));
// Step 3: Check for existing active queue
const existingQueue = readQueue();
const hasActiveQueue = existingQueue && existingQueue.status === 'active' &&
(existingQueue.solutions?.length || existingQueue.tasks?.length || 0) > 0;
if (!hasActiveQueue || options.force) {
// No active queue or force flag - set new queue as active
const index = readQueueIndex();
index.active_queue_id = newQueue.id;
writeQueueIndex(index);
console.log(chalk.green(`✓ Queue ${newQueue.id} activated`));
return;
}
console.log(chalk.green(`✓ Added solution ${solution.id} (${solution.tasks?.length || 0} tasks) to queue`));
// Step 4: Active queue exists - prompt user
const existingItems = existingQueue!.solutions || existingQueue!.tasks || [];
console.log();
console.log(chalk.cyan(`Active queue exists: ${existingQueue!.id}`));
console.log(chalk.gray(` Issues: ${existingQueue!.issue_ids.join(', ')}`));
console.log(chalk.gray(` Items: ${existingItems.length} (${existingItems.filter(i => i.status === 'completed').length} completed)`));
console.log();
const { action } = await inquirer.prompt([{
type: 'list',
name: 'action',
message: 'How would you like to proceed?',
choices: [
{ name: 'Merge into existing queue', value: 'merge_to_existing' },
{ name: 'Use new queue', value: 'use_new' },
{ name: 'Cancel', value: 'cancel' }
]
}]);
// Step 5: Execute user choice
if (action === 'cancel') {
// Delete temporary queue
const queuePath = join(getQueuesDir(), `${newQueue.id}.json`);
unlinkSync(queuePath);
console.log(chalk.yellow(`✓ New queue deleted, keeping ${existingQueue!.id} active`));
return;
}
if (action === 'use_new') {
// Switch to new queue
const index = readQueueIndex();
index.active_queue_id = newQueue.id;
writeQueueIndex(index);
console.log(chalk.green(`✓ Switched to new queue ${newQueue.id}`));
console.log(chalk.gray(` Previous queue ${existingQueue!.id} remains in history`));
return;
}
if (action === 'merge_to_existing') {
// Merge new → existing, delete temporary queue
const mergeResult = mergeQueues(existingQueue!, newQueue, { deleteSource: true });
console.log(chalk.green(`✓ Merged ${mergeResult.itemsMerged} items into ${existingQueue!.id}`));
if (mergeResult.skippedDuplicates > 0) {
console.log(chalk.gray(` Skipped ${mergeResult.skippedDuplicates} duplicate items`));
}
console.log(chalk.gray(` Temporary queue ${newQueue.id} deleted`));
return;
}
return;
}