feat: Implement SQLite storage for CLI execution history

- Introduced a new SQLite-based storage backend for managing CLI execution history.
- Added `CliHistoryStore` class to handle conversation records and turns with efficient queries.
- Migrated existing JSON history files to the new SQLite format.
- Updated CLI executor to use asynchronous and synchronous methods for saving and loading conversations.
- Enhanced execution history retrieval with support for filtering by tool, status, and search terms.
- Added prompt concatenation utilities to build multi-turn prompts in various formats (plain, YAML, JSON).
- Implemented batch deletion of conversations and improved error handling for database operations.
This commit is contained in:
catlog22
2025-12-13 14:53:53 +08:00
parent 37417caca2
commit 029384c427
9 changed files with 2380 additions and 279 deletions

View File

@@ -12,6 +12,29 @@ import { join, relative } from 'path';
// CLI History storage path
const CLI_HISTORY_DIR = join(process.cwd(), '.workflow', '.cli-history');
// Lazy-loaded SQLite store module
let sqliteStoreModule: typeof import('./cli-history-store.js') | null = null;
/**
* Get or initialize SQLite store (async)
*/
async function getSqliteStore(baseDir: string) {
if (!sqliteStoreModule) {
sqliteStoreModule = await import('./cli-history-store.js');
}
return sqliteStoreModule.getHistoryStore(baseDir);
}
/**
* Get SQLite store (sync - uses cached module)
*/
function getSqliteStoreSync(baseDir: string) {
if (!sqliteStoreModule) {
throw new Error('SQLite store not initialized. Call an async function first.');
}
return sqliteStoreModule.getHistoryStore(baseDir);
}
// Define Zod schema for validation
const ParamsSchema = z.object({
tool: z.enum(['gemini', 'qwen', 'codex']),
@@ -240,85 +263,51 @@ function loadHistoryIndex(historyDir: string): HistoryIndex {
}
/**
* Save conversation to history (create new or append turn)
* Save conversation to SQLite
*/
function saveConversation(historyDir: string, conversation: ConversationRecord): void {
// Create date-based subdirectory using created_at date
const dateStr = conversation.created_at.split('T')[0];
const dateDir = join(historyDir, dateStr);
if (!existsSync(dateDir)) {
mkdirSync(dateDir, { recursive: true });
}
// Save conversation record
const filename = `${conversation.id}.json`;
writeFileSync(join(dateDir, filename), JSON.stringify(conversation, null, 2), 'utf8');
// Update index
const index = loadHistoryIndex(historyDir);
// Check if this conversation already exists in index
const existingIdx = index.executions.findIndex(e => e.id === conversation.id);
const latestTurn = conversation.turns[conversation.turns.length - 1];
const indexEntry = {
id: conversation.id,
timestamp: conversation.created_at,
updated_at: conversation.updated_at,
tool: conversation.tool,
status: conversation.latest_status,
duration_ms: conversation.total_duration_ms,
turn_count: conversation.turn_count,
prompt_preview: latestTurn.prompt.substring(0, 100) + (latestTurn.prompt.length > 100 ? '...' : '')
};
if (existingIdx >= 0) {
// Update existing entry and move to top
index.executions.splice(existingIdx, 1);
index.executions.unshift(indexEntry);
} else {
// Add new entry
index.total_executions++;
index.executions.unshift(indexEntry);
}
if (index.executions.length > 100) {
index.executions = index.executions.slice(0, 100);
}
writeFileSync(join(historyDir, 'index.json'), JSON.stringify(index, null, 2), 'utf8');
async function saveConversationAsync(historyDir: string, conversation: ConversationRecord): Promise<void> {
const baseDir = historyDir.replace(/[\\\/]\.workflow[\\\/]\.cli-history$/, '');
const store = await getSqliteStore(baseDir);
store.saveConversation(conversation);
}
/**
* Load existing conversation by ID
* Sync wrapper for saveConversation (uses cached SQLite module)
*/
function saveConversation(historyDir: string, conversation: ConversationRecord): void {
const baseDir = historyDir.replace(/[\\\/]\.workflow[\\\/]\.cli-history$/, '');
try {
const store = getSqliteStoreSync(baseDir);
store.saveConversation(conversation);
} catch {
// If sync not available, queue for async save
saveConversationAsync(historyDir, conversation).catch(err => {
console.error('[CLI Executor] Failed to save conversation:', err.message);
});
}
}
/**
* Load existing conversation by ID from SQLite
*/
async function loadConversationAsync(historyDir: string, conversationId: string): Promise<ConversationRecord | null> {
const baseDir = historyDir.replace(/[\\\/]\.workflow[\\\/]\.cli-history$/, '');
const store = await getSqliteStore(baseDir);
return store.getConversation(conversationId);
}
/**
* Sync wrapper for loadConversation (uses cached SQLite module)
*/
function loadConversation(historyDir: string, conversationId: string): ConversationRecord | null {
// Search in all date directories
if (existsSync(historyDir)) {
const dateDirs = readdirSync(historyDir).filter(d => {
const dirPath = join(historyDir, d);
return statSync(dirPath).isDirectory() && /^\d{4}-\d{2}-\d{2}$/.test(d);
});
// Search newest first
for (const dateDir of dateDirs.sort().reverse()) {
const filePath = join(historyDir, dateDir, `${conversationId}.json`);
if (existsSync(filePath)) {
try {
const data = JSON.parse(readFileSync(filePath, 'utf8'));
// Check if it's a conversation record (has turns array)
if (data.turns && Array.isArray(data.turns)) {
return data as ConversationRecord;
}
// Convert legacy ExecutionRecord to ConversationRecord
return convertToConversation(data);
} catch {
continue;
}
}
}
const baseDir = historyDir.replace(/[\\\/]\.workflow[\\\/]\.cli-history$/, '');
try {
const store = getSqliteStoreSync(baseDir);
return store.getConversation(conversationId);
} catch {
// SQLite not initialized yet, return null
return null;
}
return null;
}
/**
@@ -880,7 +869,55 @@ function findCliHistoryDirs(baseDir: string, maxDepth: number = 3): string[] {
}
/**
* Get execution history
* Get execution history from SQLite
*/
export async function getExecutionHistoryAsync(baseDir: string, options: {
limit?: number;
tool?: string | null;
status?: string | null;
search?: string | null;
recursive?: boolean;
} = {}): Promise<{
total: number;
count: number;
executions: (HistoryIndex['executions'][0] & { sourceDir?: string })[];
}> {
const { limit = 50, tool = null, status = null, search = null, recursive = false } = options;
if (recursive) {
// For recursive, we need to check multiple directories
const historyDirs = findCliHistoryDirs(baseDir);
let allExecutions: (HistoryIndex['executions'][0] & { sourceDir?: string })[] = [];
let totalCount = 0;
for (const historyDir of historyDirs) {
const dirBase = historyDir.replace(/[\\\/]\.workflow[\\\/]\.cli-history$/, '');
const store = await getSqliteStore(dirBase);
const result = store.getHistory({ limit: 100, tool, status, search });
totalCount += result.total;
const relativeSource = relative(baseDir, dirBase) || '.';
for (const exec of result.executions) {
allExecutions.push({ ...exec, sourceDir: relativeSource });
}
}
// Sort by timestamp (newest first)
allExecutions.sort((a, b) => new Date(b.timestamp).getTime() - new Date(a.timestamp).getTime());
return {
total: totalCount,
count: Math.min(allExecutions.length, limit),
executions: allExecutions.slice(0, limit)
};
}
const store = await getSqliteStore(baseDir);
return store.getHistory({ limit, tool, status, search });
}
/**
* Get execution history (sync version - uses cached SQLite module)
*/
export function getExecutionHistory(baseDir: string, options: {
limit?: number;
@@ -894,54 +931,39 @@ export function getExecutionHistory(baseDir: string, options: {
} {
const { limit = 50, tool = null, status = null, recursive = false } = options;
let allExecutions: (HistoryIndex['executions'][0] & { sourceDir?: string })[] = [];
let totalCount = 0;
try {
if (recursive) {
const historyDirs = findCliHistoryDirs(baseDir);
let allExecutions: (HistoryIndex['executions'][0] & { sourceDir?: string })[] = [];
let totalCount = 0;
if (recursive) {
// Find all CLI history directories in subdirectories
const historyDirs = findCliHistoryDirs(baseDir);
for (const historyDir of historyDirs) {
const dirBase = historyDir.replace(/[\\\/]\.workflow[\\\/]\.cli-history$/, '');
const store = getSqliteStoreSync(dirBase);
const result = store.getHistory({ limit: 100, tool, status });
totalCount += result.total;
for (const historyDir of historyDirs) {
const index = loadHistoryIndex(historyDir);
totalCount += index.total_executions;
// Add source directory info to each execution
const sourceDir = historyDir.replace(/[\\\/]\.workflow[\\\/]\.cli-history$/, '');
const relativeSource = relative(baseDir, sourceDir) || '.';
for (const exec of index.executions) {
allExecutions.push({ ...exec, sourceDir: relativeSource });
const relativeSource = relative(baseDir, dirBase) || '.';
for (const exec of result.executions) {
allExecutions.push({ ...exec, sourceDir: relativeSource });
}
}
allExecutions.sort((a, b) => new Date(b.timestamp).getTime() - new Date(a.timestamp).getTime());
return {
total: totalCount,
count: Math.min(allExecutions.length, limit),
executions: allExecutions.slice(0, limit)
};
}
// Sort by timestamp (newest first)
allExecutions.sort((a, b) => new Date(b.timestamp).getTime() - new Date(a.timestamp).getTime());
} else {
// Original behavior - single directory
const historyDir = join(baseDir, '.workflow', '.cli-history');
const index = loadHistoryIndex(historyDir);
totalCount = index.total_executions;
allExecutions = index.executions;
const store = getSqliteStoreSync(baseDir);
return store.getHistory({ limit, tool, status });
} catch {
// SQLite not initialized, return empty
return { total: 0, count: 0, executions: [] };
}
// Filter by tool
if (tool) {
allExecutions = allExecutions.filter(e => e.tool === tool);
}
// Filter by status
if (status) {
allExecutions = allExecutions.filter(e => e.status === status);
}
// Limit results
const executions = allExecutions.slice(0, limit);
return {
total: totalCount,
count: executions.length,
executions
};
}
/**
@@ -976,38 +998,37 @@ export function getExecutionDetail(baseDir: string, executionId: string): Execut
}
/**
* Delete execution by ID
* Delete execution by ID (async version)
*/
export async function deleteExecutionAsync(baseDir: string, executionId: string): Promise<{ success: boolean; error?: string }> {
const store = await getSqliteStore(baseDir);
return store.deleteConversation(executionId);
}
/**
* Delete execution by ID (sync version - uses cached SQLite module)
*/
export function deleteExecution(baseDir: string, executionId: string): { success: boolean; error?: string } {
const historyDir = join(baseDir, '.workflow', '.cli-history');
// Parse date from execution ID
const timestamp = parseInt(executionId.split('-')[0], 10);
const date = new Date(timestamp);
const dateStr = date.toISOString().split('T')[0];
const filePath = join(historyDir, dateStr, `${executionId}.json`);
// Delete the execution file
if (existsSync(filePath)) {
try {
unlinkSync(filePath);
} catch (err) {
return { success: false, error: `Failed to delete file: ${(err as Error).message}` };
}
}
// Update index
try {
const index = loadHistoryIndex(historyDir);
index.executions = index.executions.filter(e => e.id !== executionId);
index.total_executions = Math.max(0, index.total_executions - 1);
writeFileSync(join(historyDir, 'index.json'), JSON.stringify(index, null, 2), 'utf8');
} catch (err) {
return { success: false, error: `Failed to update index: ${(err as Error).message}` };
const store = getSqliteStoreSync(baseDir);
return store.deleteConversation(executionId);
} catch {
return { success: false, error: 'SQLite store not initialized' };
}
}
return { success: true };
/**
* Batch delete executions (async)
*/
export async function batchDeleteExecutionsAsync(baseDir: string, ids: string[]): Promise<{
success: boolean;
deleted: number;
total: number;
errors?: string[];
}> {
const store = await getSqliteStore(baseDir);
const result = store.batchDelete(ids);
return { ...result, total: ids.length };
}
/**
@@ -1024,31 +1045,367 @@ export async function getCliToolsStatus(): Promise<Record<string, ToolAvailabili
return results;
}
// ========== Prompt Concatenation System ==========
/**
* Build multi-turn prompt with full conversation history
* Supported prompt concatenation formats
*/
function buildMultiTurnPrompt(conversation: ConversationRecord, newPrompt: string): string {
const parts: string[] = [];
type PromptFormat = 'plain' | 'yaml' | 'json';
parts.push('=== CONVERSATION HISTORY ===');
parts.push('');
/**
* Turn data structure for concatenation
*/
interface TurnData {
turn: number;
timestamp?: string;
role: 'user' | 'assistant';
content: string;
status?: string;
duration_ms?: number;
source_id?: string; // For merged conversations
}
// Add all previous turns
for (const turn of conversation.turns) {
parts.push(`--- Turn ${turn.turn} ---`);
parts.push('USER:');
parts.push(turn.prompt);
parts.push('');
parts.push('ASSISTANT:');
parts.push(turn.output.stdout || '[No output recorded]');
parts.push('');
/**
* Prompt concatenation options
*/
interface ConcatOptions {
format: PromptFormat;
includeMetadata?: boolean;
includeTurnMarkers?: boolean;
maxOutputLength?: number; // Truncate output for context efficiency
}
/**
* PromptConcatenator - Dedicated class for building multi-turn prompts
* Supports multiple output formats: plain text, YAML, JSON
*/
class PromptConcatenator {
private turns: TurnData[] = [];
private options: ConcatOptions;
private metadata: Record<string, unknown> = {};
constructor(options: Partial<ConcatOptions> = {}) {
this.options = {
format: options.format || 'plain',
includeMetadata: options.includeMetadata ?? true,
includeTurnMarkers: options.includeTurnMarkers ?? true,
maxOutputLength: options.maxOutputLength || 8192
};
}
parts.push('=== NEW REQUEST ===');
parts.push('');
parts.push(newPrompt);
/**
* Set metadata for the conversation
*/
setMetadata(key: string, value: unknown): this {
this.metadata[key] = value;
return this;
}
return parts.join('\n');
/**
* Add a user turn
*/
addUserTurn(content: string, options: Partial<Omit<TurnData, 'role' | 'content'>> = {}): this {
this.turns.push({
turn: this.turns.length + 1,
role: 'user',
content,
...options
});
return this;
}
/**
* Add an assistant turn
*/
addAssistantTurn(content: string, options: Partial<Omit<TurnData, 'role' | 'content'>> = {}): this {
// Truncate output if needed
const truncatedContent = content.length > this.options.maxOutputLength!
? content.substring(0, this.options.maxOutputLength!) + '\n... [truncated]'
: content;
this.turns.push({
turn: this.turns.length + 1,
role: 'assistant',
content: truncatedContent,
...options
});
return this;
}
/**
* Add a conversation turn from ConversationTurn
*/
addFromConversationTurn(turn: ConversationTurn, sourceId?: string): this {
this.addUserTurn(turn.prompt, {
turn: turn.turn * 2 - 1,
timestamp: turn.timestamp,
source_id: sourceId
});
this.addAssistantTurn(turn.output.stdout || '[No output]', {
turn: turn.turn * 2,
timestamp: turn.timestamp,
status: turn.status,
duration_ms: turn.duration_ms,
source_id: sourceId
});
return this;
}
/**
* Load turns from an existing conversation
*/
loadConversation(conversation: ConversationRecord): this {
for (const turn of conversation.turns) {
this.addFromConversationTurn(turn);
}
return this;
}
/**
* Build the final prompt in plain text format
*/
private buildPlainText(newPrompt: string): string {
const parts: string[] = [];
// Metadata section
if (this.options.includeMetadata && Object.keys(this.metadata).length > 0) {
parts.push('=== CONTEXT ===');
for (const [key, value] of Object.entries(this.metadata)) {
parts.push(`${key}: ${String(value)}`);
}
parts.push('');
}
// Conversation history
if (this.turns.length > 0) {
parts.push('=== CONVERSATION HISTORY ===');
parts.push('');
let currentTurn = 0;
for (let i = 0; i < this.turns.length; i += 2) {
currentTurn++;
const userTurn = this.turns[i];
const assistantTurn = this.turns[i + 1];
if (this.options.includeTurnMarkers) {
const sourceMarker = userTurn.source_id ? ` [${userTurn.source_id}]` : '';
parts.push(`--- Turn ${currentTurn}${sourceMarker} ---`);
}
parts.push('USER:');
parts.push(userTurn.content);
parts.push('');
if (assistantTurn) {
parts.push('ASSISTANT:');
parts.push(assistantTurn.content);
parts.push('');
}
}
}
// New request
parts.push('=== NEW REQUEST ===');
parts.push('');
parts.push(newPrompt);
return parts.join('\n');
}
/**
* Build the final prompt in YAML format
*/
private buildYaml(newPrompt: string): string {
const yamlLines: string[] = [];
// Metadata
if (this.options.includeMetadata && Object.keys(this.metadata).length > 0) {
yamlLines.push('context:');
for (const [key, value] of Object.entries(this.metadata)) {
yamlLines.push(` ${key}: ${this.yamlValue(value)}`);
}
yamlLines.push('');
}
// Conversation history
if (this.turns.length > 0) {
yamlLines.push('conversation:');
let currentTurn = 0;
for (let i = 0; i < this.turns.length; i += 2) {
currentTurn++;
const userTurn = this.turns[i];
const assistantTurn = this.turns[i + 1];
yamlLines.push(` - turn: ${currentTurn}`);
if (userTurn.source_id) {
yamlLines.push(` source: ${userTurn.source_id}`);
}
if (userTurn.timestamp) {
yamlLines.push(` timestamp: ${userTurn.timestamp}`);
}
// User message
yamlLines.push(' user: |');
const userLines = userTurn.content.split('\n');
for (const line of userLines) {
yamlLines.push(` ${line}`);
}
// Assistant message
if (assistantTurn) {
if (assistantTurn.status) {
yamlLines.push(` status: ${assistantTurn.status}`);
}
if (assistantTurn.duration_ms) {
yamlLines.push(` duration_ms: ${assistantTurn.duration_ms}`);
}
yamlLines.push(' assistant: |');
const assistantLines = assistantTurn.content.split('\n');
for (const line of assistantLines) {
yamlLines.push(` ${line}`);
}
}
yamlLines.push('');
}
}
// New request
yamlLines.push('new_request: |');
const requestLines = newPrompt.split('\n');
for (const line of requestLines) {
yamlLines.push(` ${line}`);
}
return yamlLines.join('\n');
}
/**
* Build the final prompt in JSON format
*/
private buildJson(newPrompt: string): string {
const data: Record<string, unknown> = {};
// Metadata
if (this.options.includeMetadata && Object.keys(this.metadata).length > 0) {
data.context = this.metadata;
}
// Conversation history
if (this.turns.length > 0) {
const conversation: Array<{
turn: number;
source?: string;
timestamp?: string;
user: string;
assistant?: string;
status?: string;
duration_ms?: number;
}> = [];
for (let i = 0; i < this.turns.length; i += 2) {
const userTurn = this.turns[i];
const assistantTurn = this.turns[i + 1];
const turnData: typeof conversation[0] = {
turn: Math.ceil((i + 1) / 2),
user: userTurn.content
};
if (userTurn.source_id) turnData.source = userTurn.source_id;
if (userTurn.timestamp) turnData.timestamp = userTurn.timestamp;
if (assistantTurn) {
turnData.assistant = assistantTurn.content;
if (assistantTurn.status) turnData.status = assistantTurn.status;
if (assistantTurn.duration_ms) turnData.duration_ms = assistantTurn.duration_ms;
}
conversation.push(turnData);
}
data.conversation = conversation;
}
data.new_request = newPrompt;
return JSON.stringify(data, null, 2);
}
/**
* Helper to format YAML values
*/
private yamlValue(value: unknown): string {
if (typeof value === 'string') {
// Quote strings that might be interpreted as other types
if (/[:\[\]{}#&*!|>'"@`]/.test(value) || value === '') {
return `"${value.replace(/"/g, '\\"')}"`;
}
return value;
}
if (typeof value === 'number' || typeof value === 'boolean') {
return String(value);
}
if (value === null || value === undefined) {
return 'null';
}
return JSON.stringify(value);
}
/**
* Build the final prompt string
*/
build(newPrompt: string): string {
switch (this.options.format) {
case 'yaml':
return this.buildYaml(newPrompt);
case 'json':
return this.buildJson(newPrompt);
case 'plain':
default:
return this.buildPlainText(newPrompt);
}
}
/**
* Reset the concatenator for reuse
*/
reset(): this {
this.turns = [];
this.metadata = {};
return this;
}
}
/**
* Create a prompt concatenator with specified options
*/
function createPromptConcatenator(options?: Partial<ConcatOptions>): PromptConcatenator {
return new PromptConcatenator(options);
}
/**
* Quick helper to build a multi-turn prompt in any format
*/
function buildPrompt(
conversation: ConversationRecord,
newPrompt: string,
format: PromptFormat = 'plain'
): string {
return createPromptConcatenator({ format })
.loadConversation(conversation)
.build(newPrompt);
}
/**
* Build multi-turn prompt with full conversation history
* Uses the PromptConcatenator with plain text format by default
*/
function buildMultiTurnPrompt(
conversation: ConversationRecord,
newPrompt: string,
format: PromptFormat = 'plain'
): string {
return buildPrompt(conversation, newPrompt, format);
}
/**
@@ -1111,11 +1468,17 @@ export function getLatestExecution(baseDir: string, tool?: string): ExecutionRec
}
// Export types
export type { ConversationRecord, ConversationTurn, ExecutionRecord };
export type { ConversationRecord, ConversationTurn, ExecutionRecord, PromptFormat, ConcatOptions };
// Export utility functions and tool definition for backward compatibility
export { executeCliTool, checkToolAvailability };
// Export prompt concatenation utilities
export { PromptConcatenator, createPromptConcatenator, buildPrompt, buildMultiTurnPrompt };
// Note: Async storage functions (getExecutionHistoryAsync, deleteExecutionAsync,
// batchDeleteExecutionsAsync, setStorageBackend) are exported at declaration site
// Export tool definition (for legacy imports) - This allows direct calls to execute with onOutput
export const cliExecutorTool = {
schema,