feat: Implement resume strategy engine and session content parser

- Added `resume-strategy.ts` to determine optimal resume approaches including native, prompt concatenation, and hybrid modes.
- Introduced `determineResumeStrategy` function to evaluate various resume scenarios.
- Created utility functions for building context prefixes and formatting outputs in plain, YAML, and JSON formats.
- Added `session-content-parser.ts` to parse native CLI tool session files supporting Gemini/Qwen JSON and Codex JSONL formats.
- Implemented parsing logic for different session formats, including error handling for invalid lines.
- Provided functions to format conversations and extract user-assistant pairs from parsed sessions.
This commit is contained in:
catlog22
2025-12-13 20:29:19 +08:00
parent 32217f87fd
commit 52935d4b8e
26 changed files with 9387 additions and 86 deletions

View File

@@ -9,6 +9,20 @@ import { spawn, ChildProcess } from 'child_process';
import { existsSync, mkdirSync, readFileSync, writeFileSync, unlinkSync, readdirSync, statSync } from 'fs';
import { join, relative } from 'path';
// Native resume support
import {
trackNewSession,
getNativeResumeArgs,
supportsNativeResume,
calculateProjectHash
} from './native-session-discovery.js';
import {
determineResumeStrategy,
buildContextPrefix,
getResumeModeDescription,
type ResumeDecision
} from './resume-strategy.js';
// CLI History storage path
const CLI_HISTORY_DIR = join(process.cwd(), '.workflow', '.cli-history');
@@ -47,8 +61,13 @@ const ParamsSchema = z.object({
timeout: z.number().default(300000),
resume: z.union([z.boolean(), z.string()]).optional(), // true = last, string = single ID or comma-separated IDs
id: z.string().optional(), // Custom execution ID (e.g., IMPL-001-step1)
noNative: z.boolean().optional(), // Force prompt concatenation instead of native resume
category: z.enum(['user', 'internal', 'insight']).default('user'), // Execution category for tracking
});
// Execution category types
export type ExecutionCategory = 'user' | 'internal' | 'insight';
type Params = z.infer<typeof ParamsSchema>;
// Prompt concatenation format types
@@ -82,6 +101,7 @@ interface ConversationRecord {
tool: string;
model: string;
mode: string;
category: ExecutionCategory; // user | internal | insight
total_duration_ms: number;
turn_count: number;
latest_status: 'success' | 'error' | 'timeout';
@@ -165,6 +185,13 @@ async function checkToolAvailability(tool: string): Promise<ToolAvailability> {
});
}
// Native resume configuration
interface NativeResumeConfig {
enabled: boolean;
sessionId?: string; // Native UUID
isLatest?: boolean; // Use latest/--last flag
}
/**
* Build command arguments based on tool and options
*/
@@ -175,8 +202,9 @@ function buildCommand(params: {
model?: string;
dir?: string;
include?: string;
nativeResume?: NativeResumeConfig;
}): { command: string; args: string[]; useStdin: boolean } {
const { tool, prompt, mode = 'analysis', model, dir, include } = params;
const { tool, prompt, mode = 'analysis', model, dir, include, nativeResume } = params;
let command = tool;
let args: string[] = [];
@@ -185,7 +213,14 @@ function buildCommand(params: {
switch (tool) {
case 'gemini':
// gemini reads from stdin when no positional prompt is provided
// Native resume: gemini -r <uuid> or -r latest
if (nativeResume?.enabled) {
if (nativeResume.isLatest) {
args.push('-r', 'latest');
} else if (nativeResume.sessionId) {
args.push('-r', nativeResume.sessionId);
}
}
if (model) {
args.push('-m', model);
}
@@ -198,7 +233,14 @@ function buildCommand(params: {
break;
case 'qwen':
// qwen reads from stdin when no positional prompt is provided
// Native resume: qwen --continue (latest) or --resume <uuid>
if (nativeResume?.enabled) {
if (nativeResume.isLatest) {
args.push('--continue');
} else if (nativeResume.sessionId) {
args.push('--resume', nativeResume.sessionId);
}
}
if (model) {
args.push('-m', model);
}
@@ -211,26 +253,50 @@ function buildCommand(params: {
break;
case 'codex':
// codex reads from stdin for prompt
args.push('exec');
if (dir) {
args.push('-C', dir);
}
args.push('--full-auto');
if (mode === 'write' || mode === 'auto') {
args.push('--skip-git-repo-check', '-s', 'danger-full-access');
}
if (model) {
args.push('-m', model);
}
if (include) {
// Codex uses --add-dir for additional directories
const dirs = include.split(',').map(d => d.trim()).filter(d => d);
for (const addDir of dirs) {
args.push('--add-dir', addDir);
// Native resume: codex resume <uuid> [prompt] or --last
if (nativeResume?.enabled) {
args.push('resume');
if (nativeResume.isLatest) {
args.push('--last');
} else if (nativeResume.sessionId) {
args.push(nativeResume.sessionId);
}
// Codex resume still supports additional flags
if (dir) {
args.push('-C', dir);
}
if (mode === 'write' || mode === 'auto') {
args.push('--skip-git-repo-check', '-s', 'danger-full-access');
}
if (model) {
args.push('-m', model);
}
if (include) {
const dirs = include.split(',').map(d => d.trim()).filter(d => d);
for (const addDir of dirs) {
args.push('--add-dir', addDir);
}
}
} else {
// Standard exec mode
args.push('exec');
if (dir) {
args.push('-C', dir);
}
args.push('--full-auto');
if (mode === 'write' || mode === 'auto') {
args.push('--skip-git-repo-check', '-s', 'danger-full-access');
}
if (model) {
args.push('-m', model);
}
if (include) {
const dirs = include.split(',').map(d => d.trim()).filter(d => d);
for (const addDir of dirs) {
args.push('--add-dir', addDir);
}
}
}
// Prompt passed via stdin (default)
break;
default:
@@ -310,6 +376,7 @@ function convertToConversation(record: ExecutionRecord): ConversationRecord {
tool: record.tool,
model: record.model,
mode: record.mode,
category: 'user', // Legacy records default to user category
total_duration_ms: record.duration_ms,
turn_count: 1,
latest_status: record.status,
@@ -406,12 +473,15 @@ async function executeCliTool(
throw new Error(`Invalid params: ${parsed.error.message}`);
}
const { tool, prompt, mode, format, model, cd, includeDirs, timeout, resume, id: customId } = parsed.data;
const { tool, prompt, mode, format, model, cd, includeDirs, timeout, resume, id: customId, noNative, category } = parsed.data;
// Determine working directory early (needed for conversation lookup)
const workingDir = cd || process.cwd();
const historyDir = ensureHistoryDir(workingDir);
// Get SQLite store for native session lookup
const store = await getSqliteStore(workingDir);
// Determine conversation ID and load existing conversation
// Logic:
// - If --resume <id1,id2,...> (multiple IDs): merge conversations
@@ -484,14 +554,61 @@ async function executeCliTool(
conversationId = `${Date.now()}-${tool}`;
}
// Determine resume strategy (native vs prompt-concat vs hybrid)
let resumeDecision: ResumeDecision | null = null;
let nativeResumeConfig: NativeResumeConfig | undefined;
// resume=true (latest) - use native latest if supported
if (resume === true && !noNative && supportsNativeResume(tool)) {
resumeDecision = {
strategy: 'native',
isLatest: true,
primaryConversationId: conversationId
};
}
// Use strategy engine for complex scenarios
else if (resumeIds.length > 0 && !noNative) {
resumeDecision = determineResumeStrategy({
tool,
resumeIds,
customId,
forcePromptConcat: noNative,
getNativeSessionId: (ccwId) => store.getNativeSessionId(ccwId),
getConversation: (ccwId) => loadConversation(historyDir, ccwId),
getConversationTool: (ccwId) => {
const conv = loadConversation(historyDir, ccwId);
return conv?.tool || null;
}
});
}
// Configure native resume if strategy decided to use it
if (resumeDecision && (resumeDecision.strategy === 'native' || resumeDecision.strategy === 'hybrid')) {
nativeResumeConfig = {
enabled: true,
sessionId: resumeDecision.nativeSessionId,
isLatest: resumeDecision.isLatest
};
}
// Build final prompt with conversation context
// For merge: use merged context from all source conversations
// For fork: use contextConversation (from resume ID) for prompt context
// For append: use existingConversation (from target ID)
// For native: minimal prompt (native tool handles context)
// For hybrid: context prefix from other conversations + new prompt
// For prompt-concat: full multi-turn prompt
let finalPrompt = prompt;
if (mergeResult && mergeResult.mergedTurns.length > 0) {
if (resumeDecision?.strategy === 'native') {
// Native mode: just use the new prompt, tool handles context
finalPrompt = prompt;
} else if (resumeDecision?.strategy === 'hybrid' && resumeDecision.contextTurns?.length) {
// Hybrid mode: add context prefix from other conversations
const contextPrefix = buildContextPrefix(resumeDecision.contextTurns, format);
finalPrompt = contextPrefix + prompt;
} else if (mergeResult && mergeResult.mergedTurns.length > 0) {
// Full merge: use merged prompt
finalPrompt = buildMergedPrompt(mergeResult, prompt, format);
} else {
// Standard prompt-concat
const conversationForContext = contextConversation || existingConversation;
if (conversationForContext && conversationForContext.turns.length > 0) {
finalPrompt = buildMultiTurnPrompt(conversationForContext, prompt, format);
@@ -504,6 +621,14 @@ async function executeCliTool(
throw new Error(`CLI tool not available: ${tool}. Please ensure it is installed and in PATH.`);
}
// Log resume mode for debugging
if (resumeDecision) {
const modeDesc = getResumeModeDescription(resumeDecision);
if (onOutput) {
onOutput({ type: 'stderr', data: `[Resume mode: ${modeDesc}]\n` });
}
}
// Build command
const { command, args, useStdin } = buildCommand({
tool,
@@ -511,7 +636,8 @@ async function executeCliTool(
mode,
model,
dir: cd,
include: includeDirs
include: includeDirs,
nativeResume: nativeResumeConfig
});
const startTime = Date.now();
@@ -668,6 +794,7 @@ async function executeCliTool(
tool,
model: model || 'default',
mode,
category,
total_duration_ms: mergeResult.totalDuration + duration,
turn_count: mergedTurns.length + 1,
latest_status: status,
@@ -697,6 +824,7 @@ async function executeCliTool(
tool,
model: model || 'default',
mode,
category,
total_duration_ms: duration,
turn_count: 1,
latest_status: status,
@@ -711,6 +839,29 @@ async function executeCliTool(
}
}
// Track native session after execution (async, non-blocking)
trackNewSession(tool, new Date(startTime), workingDir)
.then((nativeSession) => {
if (nativeSession) {
// Save native session mapping
try {
store.saveNativeSessionMapping({
ccw_id: conversationId,
tool,
native_session_id: nativeSession.sessionId,
native_session_path: nativeSession.filePath,
project_hash: nativeSession.projectHash,
created_at: new Date().toISOString()
});
} catch (err) {
console.error('[CLI Executor] Failed to save native session mapping:', (err as Error).message);
}
}
})
.catch((err) => {
console.error('[CLI Executor] Failed to track native session:', (err as Error).message);
});
// Create legacy execution record for backward compatibility
const execution: ExecutionRecord = {
id: conversationId,
@@ -860,6 +1011,7 @@ export async function getExecutionHistoryAsync(baseDir: string, options: {
limit?: number;
tool?: string | null;
status?: string | null;
category?: ExecutionCategory | null;
search?: string | null;
recursive?: boolean;
} = {}): Promise<{
@@ -867,7 +1019,7 @@ export async function getExecutionHistoryAsync(baseDir: string, options: {
count: number;
executions: (HistoryIndex['executions'][0] & { sourceDir?: string })[];
}> {
const { limit = 50, tool = null, status = null, search = null, recursive = false } = options;
const { limit = 50, tool = null, status = null, category = null, search = null, recursive = false } = options;
if (recursive) {
// For recursive, we need to check multiple directories
@@ -878,7 +1030,7 @@ export async function getExecutionHistoryAsync(baseDir: string, options: {
for (const historyDir of historyDirs) {
const dirBase = historyDir.replace(/[\\\/]\.workflow[\\\/]\.cli-history$/, '');
const store = await getSqliteStore(dirBase);
const result = store.getHistory({ limit: 100, tool, status, search });
const result = store.getHistory({ limit: 100, tool, status, category, search });
totalCount += result.total;
const relativeSource = relative(baseDir, dirBase) || '.';
@@ -898,7 +1050,7 @@ export async function getExecutionHistoryAsync(baseDir: string, options: {
}
const store = await getSqliteStore(baseDir);
return store.getHistory({ limit, tool, status, search });
return store.getHistory({ limit, tool, status, category, search });
}
/**
@@ -1447,6 +1599,61 @@ export function getLatestExecution(baseDir: string, tool?: string): ExecutionRec
return getExecutionDetail(baseDir, history.executions[0].id);
}
// ========== Native Session Content Functions ==========
/**
* Get native session content by CCW ID
* Parses the native session file and returns full conversation data
*/
export async function getNativeSessionContent(baseDir: string, ccwId: string) {
const store = await getSqliteStore(baseDir);
return store.getNativeSessionContent(ccwId);
}
/**
* Get formatted native conversation text
*/
export async function getFormattedNativeConversation(baseDir: string, ccwId: string, options?: {
includeThoughts?: boolean;
includeToolCalls?: boolean;
includeTokens?: boolean;
maxContentLength?: number;
}) {
const store = await getSqliteStore(baseDir);
return store.getFormattedNativeConversation(ccwId, options);
}
/**
* Get conversation pairs from native session
*/
export async function getNativeConversationPairs(baseDir: string, ccwId: string) {
const store = await getSqliteStore(baseDir);
return store.getNativeConversationPairs(ccwId);
}
/**
* Get enriched conversation (CCW + native session merged)
*/
export async function getEnrichedConversation(baseDir: string, ccwId: string) {
const store = await getSqliteStore(baseDir);
return store.getEnrichedConversation(ccwId);
}
/**
* Get history with native session info
*/
export async function getHistoryWithNativeInfo(baseDir: string, options?: {
limit?: number;
offset?: number;
tool?: string | null;
status?: string | null;
category?: ExecutionCategory | null;
search?: string | null;
}) {
const store = await getSqliteStore(baseDir);
return store.getHistoryWithNativeInfo(options || {});
}
// Export types
export type { ConversationRecord, ConversationTurn, ExecutionRecord, PromptFormat, ConcatOptions };