mirror of https://github.com/catlog22/Claude-Code-Workflow.git (synced 2026-02-05 01:50:27 +08:00)
feat(cli-executor): add streaming option and enhance output handling

- Introduced a `stream` parameter to control output streaming vs. caching.
- Enhanced status determination logic to prioritize valid output over exit codes.
- Updated output structure to include full stdout and stderr when not streaming.

feat(cli-history-store): extend conversation turn schema and migration

- Added `cached`, `stdout_full`, and `stderr_full` fields to the conversation turn schema.
- Implemented database migration to add new columns if they do not exist.
- Updated upsert logic to handle new fields.

feat(codex-lens): implement global symbol index for fast lookups

- Created `GlobalSymbolIndex` class to manage project-wide symbol indexing.
- Added methods for adding, updating, and deleting symbols in the global index.
- Integrated global index updates into directory indexing processes.

feat(codex-lens): optimize search functionality with global index

- Enhanced `ChainSearchEngine` to utilize the global symbol index for faster searches.
- Added configuration option to enable/disable global symbol indexing.
- Updated tests to validate global index functionality and performance.
@@ -15,11 +15,20 @@ Available CLI endpoints are dynamically defined by the config file:
- Custom API endpoints registered via the Dashboard
- Managed through the CCW Dashboard Status page

-## Agent Execution
+## Tool Execution

### Agent Calls
- **Always use `run_in_background: false`** for Task tool agent calls: `Task({ subagent_type: "xxx", prompt: "...", run_in_background: false })` to ensure synchronous execution and immediate result visibility
- **TaskOutput usage**: Only use `TaskOutput({ task_id: "xxx", block: false })` + sleep loop to poll completion status. NEVER read intermediate output during agent/CLI execution - wait for final result only

### CLI Tool Calls (ccw cli)
- **Always use `run_in_background: true`** for Bash tool when calling ccw cli:
```
Bash({ command: "ccw cli -p '...' --tool gemini", run_in_background: true })
```
- **After CLI call**: Stop immediately - let CLI execute in background, do NOT poll with TaskOutput
- **View output later**: Use `ccw cli output <id>` to view cached execution results

## Code Diagnostics

- **Prefer `mcp__ide__getDiagnostics`** for code error checking over shell-based TypeScript compilation
@@ -38,7 +38,7 @@ Phase 1: Task Analysis & Exploration
├─ Parse input (description or .md file)
├─ intelligent complexity assessment (Low/Medium/High)
├─ Exploration decision (auto-detect or --explore flag)
-├─ ⚠️ Context protection: If file reading ≥50k chars → force cli-explore-agent
+├─ Context protection: If file reading ≥50k chars → force cli-explore-agent
└─ Decision:
   ├─ needsExploration=true → Launch parallel cli-explore-agents (1-4 based on complexity)
   └─ needsExploration=false → Skip to Phase 2/3
@@ -65,13 +65,13 @@ RULES: $(cat ~/.claude/workflows/cli-templates/protocols/[mode]-protocol.md) $(c
ccw cli -p "<PROMPT>" --tool <gemini|qwen|codex> --mode <analysis|write>
```

-**⚠️ CRITICAL**: `--mode` parameter is **MANDATORY** for all CLI executions. No defaults are assumed.
+**Note**: `--mode` defaults to `analysis` if not specified. Explicitly specify `--mode write` for file operations.

### Core Principles

- **Use tools early and often** - Tools are faster and more thorough
- **Unified CLI** - Always use `ccw cli -p` for consistent parameter handling
-- **Mode is MANDATORY** - ALWAYS explicitly specify `--mode analysis|write` (no implicit defaults)
+- **Default mode is analysis** - Omit `--mode` for read-only operations, explicitly use `--mode write` for file modifications
- **One template required** - ALWAYS reference exactly ONE template in RULES (use universal fallback if no specific match)
- **Write protection** - Require EXPLICIT `--mode write` for file operations
- **Use double quotes for shell expansion** - Always wrap prompts in double quotes `"..."` to enable `$(cat ...)` command substitution; NEVER use single quotes or escape characters (`\$`, `\"`, `\'`)
@@ -183,6 +183,33 @@ ASSISTANT RESPONSE: [Previous output]

**Tool Behavior**: Codex uses native `codex resume`; Gemini/Qwen assemble context as a single prompt

### Streaming vs Caching

**Default behavior**: Non-streaming with full output caching (can retrieve later via the `output` subcommand)

```bash
ccw cli -p "..." --tool gemini            # Default: output cached, no streaming
ccw cli -p "..." --tool gemini --stream   # Streaming: real-time output, no caching
```

| Mode | Flag | Output | Cached |
|------|------|--------|--------|
| Non-streaming (default) | (none) | After completion | ✅ Yes |
| Streaming | `--stream` | Real-time | ❌ No |

### Output Viewing

View cached execution output with pagination:

```bash
ccw cli output <execution-id>                   # View full output
ccw cli output <id> --offset 0 --limit 10000    # Paginated view
ccw cli output <id> --output-type stdout        # Stdout only
ccw cli output <id> --raw                       # Raw content (for piping)
```

**Note**: The `output` subcommand views execution results; the `--cache` parameter injects context into the prompt - different concepts.
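A minimal end-to-end sketch of the cached workflow, driven from Python. Assumptions: `ccw` is on PATH, and `--id` is accepted as an exec flag (the diff shows `id` in `CliExecOptions`; only the `output` subcommand flags below are confirmed directly by this commit):

```python
# Hypothetical driver: run a non-streaming (cached) execution, then page its output.
import subprocess

exec_id = "IMPL-001-step1"  # custom execution ID, format taken from the diff's comment
subprocess.run(
    ["ccw", "cli", "-p", "Summarize src/", "--tool", "gemini",
     "--mode", "analysis", "--id", exec_id],
    check=True,
)  # no --stream, so full stdout/stderr are cached and printed after completion

# Later: fetch the first 10k characters of cached stdout, raw for piping.
page = subprocess.run(
    ["ccw", "cli", "output", exec_id, "--output-type", "stdout",
     "--offset", "0", "--limit", "10000", "--raw"],
    capture_output=True, text=True, check=True,
).stdout
print(page)
```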
---

## Prompt Template
@@ -174,7 +174,7 @@ export function run(argv: string[]): void {
    .option('--cd <path>', 'Working directory')
    .option('--includeDirs <dirs>', 'Additional directories (--include-directories for gemini/qwen, --add-dir for codex/claude)')
    .option('--timeout <ms>', 'Timeout in milliseconds', '300000')
-   .option('--no-stream', 'Disable streaming output')
+   .option('--stream', 'Enable streaming output (default: non-streaming with caching)')
    .option('--limit <n>', 'History limit')
    .option('--status <status>', 'Filter by status')
    .option('--category <category>', 'Execution category: user, internal, insight', 'user')

@@ -190,6 +190,11 @@ export function run(argv: string[]): void {
    .option('--memory', 'Target memory storage')
    .option('--storage-cache', 'Target cache storage')
    .option('--config', 'Target config storage')
    // Cache subcommand options
    .option('--offset <n>', 'Character offset for cache pagination', '0')
    .option('--output-type <type>', 'Output type: stdout, stderr, both', 'both')
    .option('--turn <n>', 'Turn number for cache (default: latest)')
    .option('--raw', 'Raw output only (no formatting)')
    .action((subcommand, args, options) => cliCommand(subcommand, args, options));

// Memory command
@@ -24,6 +24,7 @@ import {
  projectExists,
  getStorageLocationInstructions
} from '../tools/storage-manager.js';
import { getHistoryStore } from '../tools/cli-history-store.js';

// Dashboard notification settings
const DASHBOARD_PORT = process.env.CCW_PORT || 3456;

@@ -74,7 +75,7 @@ interface CliExecOptions {
  cd?: string;
  includeDirs?: string;
  timeout?: string;
- noStream?: boolean;
+ stream?: boolean; // Enable streaming (default: false, caches output)
  resume?: string | boolean; // true = last, string = execution ID, comma-separated for merge
  id?: string; // Custom execution ID (e.g., IMPL-001-step1)
  noNative?: boolean; // Force prompt concatenation instead of native resume

@@ -104,6 +105,14 @@ interface StorageOptions {
  force?: boolean;
}

interface OutputViewOptions {
  offset?: string;
  limit?: string;
  outputType?: 'stdout' | 'stderr' | 'both';
  turn?: string;
  raw?: boolean;
}

/**
 * Show storage information and management options
 */
@@ -287,6 +296,71 @@ function showStorageHelp(): void {
  console.log();
}

/**
 * Show cached output for a conversation with pagination
 */
async function outputAction(conversationId: string | undefined, options: OutputViewOptions): Promise<void> {
  if (!conversationId) {
    console.error(chalk.red('Error: Conversation ID is required'));
    console.error(chalk.gray('Usage: ccw cli output <conversation-id> [--offset N] [--limit N]'));
    process.exit(1);
  }

  const store = getHistoryStore(process.cwd());
  const result = store.getCachedOutput(
    conversationId,
    options.turn ? parseInt(options.turn) : undefined,
    {
      offset: parseInt(options.offset || '0'),
      limit: parseInt(options.limit || '10000'),
      outputType: options.outputType || 'both'
    }
  );

  if (!result) {
    console.error(chalk.red(`Error: Execution not found: ${conversationId}`));
    process.exit(1);
  }

  if (options.raw) {
    // Raw output only (for piping)
    if (result.stdout) console.log(result.stdout.content);
    return;
  }

  // Formatted output
  console.log(chalk.bold.cyan('Execution Output\n'));
  console.log(`  ${chalk.gray('ID:')} ${result.conversationId}`);
  console.log(`  ${chalk.gray('Turn:')} ${result.turnNumber}`);
  console.log(`  ${chalk.gray('Cached:')} ${result.cached ? chalk.green('Yes') : chalk.yellow('No')}`);
  console.log(`  ${chalk.gray('Status:')} ${result.status}`);
  console.log(`  ${chalk.gray('Time:')} ${result.timestamp}`);
  console.log();

  if (result.stdout) {
    console.log(`  ${chalk.gray('Stdout:')} (${result.stdout.totalBytes} bytes, offset ${result.stdout.offset})`);
    console.log(chalk.gray('  ' + '-'.repeat(60)));
    console.log(result.stdout.content);
    console.log(chalk.gray('  ' + '-'.repeat(60)));
    if (result.stdout.hasMore) {
      console.log(chalk.yellow(`  ... ${result.stdout.totalBytes - result.stdout.offset - result.stdout.content.length} more bytes available`));
      console.log(chalk.gray(`  Use --offset ${result.stdout.offset + result.stdout.content.length} to continue`));
    }
    console.log();
  }

  if (result.stderr && result.stderr.content) {
    console.log(`  ${chalk.gray('Stderr:')} (${result.stderr.totalBytes} bytes, offset ${result.stderr.offset})`);
    console.log(chalk.gray('  ' + '-'.repeat(60)));
    console.log(result.stderr.content);
    console.log(chalk.gray('  ' + '-'.repeat(60)));
    if (result.stderr.hasMore) {
      console.log(chalk.yellow(`  ... ${result.stderr.totalBytes - result.stderr.offset - result.stderr.content.length} more bytes available`));
    }
    console.log();
  }
}
/**
 * Test endpoint for debugging multi-line prompt parsing
 * Shows exactly how Commander.js parsed the arguments

@@ -391,7 +465,7 @@ async function statusAction(): Promise<void> {
 * @param {Object} options - CLI options
 */
async function execAction(positionalPrompt: string | undefined, options: CliExecOptions): Promise<void> {
- const { prompt: optionPrompt, file, tool = 'gemini', mode = 'analysis', model, cd, includeDirs, timeout, noStream, resume, id, noNative, cache, injectMode } = options;
+ const { prompt: optionPrompt, file, tool = 'gemini', mode = 'analysis', model, cd, includeDirs, timeout, stream, resume, id, noNative, cache, injectMode } = options;

  // Priority: 1. --file, 2. --prompt/-p option, 3. positional argument
  let finalPrompt: string | undefined;

@@ -584,10 +658,10 @@ async function execAction(positionalPrompt: string | undefined, options: CliExec
    custom_id: id || null
  });

- // Streaming output handler
- const onOutput = noStream ? null : (chunk: any) => {
+ // Streaming output handler - only active when --stream flag is passed
+ const onOutput = stream ? (chunk: any) => {
    process.stdout.write(chunk.data);
- };
+ } : null;

  try {
    const result = await cliExecutorTool.execute({

@@ -600,11 +674,12 @@ async function execAction(positionalPrompt: string | undefined, options: CliExec
      timeout: timeout ? parseInt(timeout, 10) : 300000,
      resume,
      id, // custom execution ID
-     noNative
+     noNative,
+     stream: !!stream // stream=true → streaming enabled, stream=false/undefined → cache output
    }, onOutput);

-   // If not streaming, print output now
-   if (noStream && result.stdout) {
+   // If not streaming (default), print output now
+   if (!stream && result.stdout) {
      console.log(result.stdout);
    }
@@ -815,6 +890,10 @@ export async function cliCommand(
      await storageAction(argsArray[0], options as unknown as StorageOptions);
      break;

    case 'output':
      await outputAction(argsArray[0], options as unknown as OutputViewOptions);
      break;

    case 'test-parse':
      // Test endpoint to debug multi-line prompt parsing
      testParseAction(argsArray, options as CliExecOptions);

@@ -845,6 +924,7 @@ export async function cliCommand(
    console.log(chalk.gray('  storage [cmd]      Manage CCW storage (info/clean/config)'));
    console.log(chalk.gray('  history            Show execution history'));
    console.log(chalk.gray('  detail <id>        Show execution detail'));
    console.log(chalk.gray('  output <id>        Show execution output with pagination'));
    console.log(chalk.gray('  test-parse [args]  Debug CLI argument parsing'));
    console.log();
    console.log('  Options:');
@@ -73,6 +73,7 @@ const ParamsSchema = z.object({
  noNative: z.boolean().optional(), // Force prompt concatenation instead of native resume
  category: z.enum(['user', 'internal', 'insight']).default('user'), // Execution category for tracking
  parentExecutionId: z.string().optional(), // Parent execution ID for fork/retry scenarios
  stream: z.boolean().default(false), // false = cache full output (default), true = stream output via callback
});

// Execution category types
@@ -863,24 +864,36 @@ async function executeCliTool(
  const endTime = Date.now();
  const duration = endTime - startTime;

- // Determine status
+ // Determine status - prioritize output content over exit code
  let status: 'success' | 'error' | 'timeout' = 'success';
  if (timedOut) {
    status = 'timeout';
  } else if (code !== 0) {
-   // Check if HTTP 429 but results exist (Gemini quirk)
-   if (stderr.includes('429') && stdout.trim()) {
+   // Non-zero exit code doesn't always mean failure
+   // Check if there's valid output (AI response) - treat as success
+   const hasValidOutput = stdout.trim().length > 0;
+   const hasFatalError = stderr.includes('FATAL') ||
+     stderr.includes('Authentication failed') ||
+     stderr.includes('API key') ||
+     stderr.includes('rate limit exceeded');
+
+   if (hasValidOutput && !hasFatalError) {
+     // Has output and no fatal errors - treat as success despite exit code
      status = 'success';
    } else {
      status = 'error';
    }
  }

- // Create new turn
+ // Create new turn - cache full output when not streaming (default)
+ const shouldCache = !parsed.data.stream;
  const newTurnOutput = {
-   stdout: stdout.substring(0, 10240), // Truncate to 10KB
-   stderr: stderr.substring(0, 2048), // Truncate to 2KB
-   truncated: stdout.length > 10240 || stderr.length > 2048
+   stdout: stdout.substring(0, 10240), // Truncate preview to 10KB
+   stderr: stderr.substring(0, 2048), // Truncate preview to 2KB
+   truncated: stdout.length > 10240 || stderr.length > 2048,
+   cached: shouldCache,
+   stdout_full: shouldCache ? stdout : undefined,
+   stderr_full: shouldCache ? stderr : undefined
  };

  // Determine base turn number for merge scenarios
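The status rule in this hunk restated as a tiny Python sketch (same marker strings as the diff), to make the decision table explicit: exit code alone does not decide failure; non-empty stdout without a fatal stderr marker wins.

```python
# Mirror of the diff's status logic, for illustration only.
FATAL_MARKERS = ("FATAL", "Authentication failed", "API key", "rate limit exceeded")

def classify(code: int, timed_out: bool, stdout: str, stderr: str) -> str:
    if timed_out:
        return "timeout"
    if code == 0:
        return "success"
    has_valid_output = bool(stdout.strip())                       # AI response present
    has_fatal_error = any(m in stderr for m in FATAL_MARKERS)     # unrecoverable failure
    return "success" if has_valid_output and not has_fatal_error else "error"

assert classify(1, False, "partial answer", "HTTP 429, retried") == "success"
assert classify(1, False, "", "FATAL: Authentication failed") == "error"
```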
@@ -23,6 +23,9 @@ export interface ConversationTurn {
    stdout: string;
    stderr: string;
    truncated: boolean;
    cached?: boolean;
    stdout_full?: string;
    stderr_full?: string;
  };
}
@@ -315,6 +318,28 @@ export class CliHistoryStore {
    } catch (indexErr) {
      console.warn('[CLI History] Turns timestamp index creation warning:', (indexErr as Error).message);
    }

    // Add cached output columns to turns table for non-streaming mode
    const turnsInfo = this.db.prepare('PRAGMA table_info(turns)').all() as Array<{ name: string }>;
    const hasCached = turnsInfo.some(col => col.name === 'cached');
    const hasStdoutFull = turnsInfo.some(col => col.name === 'stdout_full');
    const hasStderrFull = turnsInfo.some(col => col.name === 'stderr_full');

    if (!hasCached) {
      console.log('[CLI History] Migrating database: adding cached column to turns table...');
      this.db.exec('ALTER TABLE turns ADD COLUMN cached INTEGER DEFAULT 0;');
      console.log('[CLI History] Migration complete: cached column added');
    }
    if (!hasStdoutFull) {
      console.log('[CLI History] Migrating database: adding stdout_full column to turns table...');
      this.db.exec('ALTER TABLE turns ADD COLUMN stdout_full TEXT;');
      console.log('[CLI History] Migration complete: stdout_full column added');
    }
    if (!hasStderrFull) {
      console.log('[CLI History] Migrating database: adding stderr_full column to turns table...');
      this.db.exec('ALTER TABLE turns ADD COLUMN stderr_full TEXT;');
      console.log('[CLI History] Migration complete: stderr_full column added');
    }
  } catch (err) {
    console.error('[CLI History] Migration error:', (err as Error).message);
    // Don't throw - allow the store to continue working with existing schema
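The migration pattern above - probe `PRAGMA table_info`, then `ALTER TABLE` only for missing columns - is idempotent and safe to run on every startup. A minimal sketch of the same pattern with Python's stdlib `sqlite3`, for illustration:

```python
import sqlite3

def ensure_turn_columns(db: sqlite3.Connection) -> None:
    # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
    existing = {row[1] for row in db.execute("PRAGMA table_info(turns)")}
    wanted = {
        "cached": "INTEGER DEFAULT 0",
        "stdout_full": "TEXT",
        "stderr_full": "TEXT",
    }
    for name, decl in wanted.items():
        if name not in existing:
            db.execute(f"ALTER TABLE turns ADD COLUMN {name} {decl}")
    db.commit()  # re-running is a no-op once the columns exist
```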
@@ -421,8 +446,8 @@ export class CliHistoryStore {
  `);

  const upsertTurn = this.db.prepare(`
-   INSERT INTO turns (conversation_id, turn_number, timestamp, prompt, duration_ms, status, exit_code, stdout, stderr, truncated)
-   VALUES (@conversation_id, @turn_number, @timestamp, @prompt, @duration_ms, @status, @exit_code, @stdout, @stderr, @truncated)
+   INSERT INTO turns (conversation_id, turn_number, timestamp, prompt, duration_ms, status, exit_code, stdout, stderr, truncated, cached, stdout_full, stderr_full)
+   VALUES (@conversation_id, @turn_number, @timestamp, @prompt, @duration_ms, @status, @exit_code, @stdout, @stderr, @truncated, @cached, @stdout_full, @stderr_full)
    ON CONFLICT(conversation_id, turn_number) DO UPDATE SET
      timestamp = @timestamp,
      prompt = @prompt,

@@ -431,7 +456,10 @@ export class CliHistoryStore {
      exit_code = @exit_code,
      stdout = @stdout,
      stderr = @stderr,
-     truncated = @truncated
+     truncated = @truncated,
+     cached = @cached,
+     stdout_full = @stdout_full,
+     stderr_full = @stderr_full
  `);

  const transaction = this.db.transaction(() => {
@@ -463,7 +491,10 @@ export class CliHistoryStore {
      exit_code: turn.exit_code,
      stdout: turn.output.stdout,
      stderr: turn.output.stderr,
-     truncated: turn.output.truncated ? 1 : 0
+     truncated: turn.output.truncated ? 1 : 0,
+     cached: turn.output.cached ? 1 : 0,
+     stdout_full: turn.output.stdout_full || null,
+     stderr_full: turn.output.stderr_full || null
    });
  }
});

@@ -507,7 +538,10 @@ export class CliHistoryStore {
  output: {
    stdout: t.stdout || '',
    stderr: t.stderr || '',
-   truncated: !!t.truncated
+   truncated: !!t.truncated,
+   cached: !!t.cached,
+   stdout_full: t.stdout_full || undefined,
+   stderr_full: t.stderr_full || undefined
  }
}))
};
@@ -533,6 +567,92 @@ export class CliHistoryStore {
  };
}

/**
 * Get paginated cached output for a conversation turn
 * @param conversationId - Conversation ID
 * @param turnNumber - Turn number (default: latest turn)
 * @param options - Pagination options
 */
getCachedOutput(
  conversationId: string,
  turnNumber?: number,
  options: {
    offset?: number;   // Character offset (default: 0)
    limit?: number;    // Max characters to return (default: 10000)
    outputType?: 'stdout' | 'stderr' | 'both'; // Which output to fetch
  } = {}
): {
  conversationId: string;
  turnNumber: number;
  stdout?: { content: string; totalBytes: number; offset: number; hasMore: boolean };
  stderr?: { content: string; totalBytes: number; offset: number; hasMore: boolean };
  cached: boolean;
  prompt: string;
  status: string;
  timestamp: string;
} | null {
  const { offset = 0, limit = 10000, outputType = 'both' } = options;

  // Get turn (latest if not specified)
  let turn;
  if (turnNumber !== undefined) {
    turn = this.db.prepare(`
      SELECT * FROM turns WHERE conversation_id = ? AND turn_number = ?
    `).get(conversationId, turnNumber) as any;
  } else {
    turn = this.db.prepare(`
      SELECT * FROM turns WHERE conversation_id = ? ORDER BY turn_number DESC LIMIT 1
    `).get(conversationId) as any;
  }

  if (!turn) return null;

  const result: {
    conversationId: string;
    turnNumber: number;
    stdout?: { content: string; totalBytes: number; offset: number; hasMore: boolean };
    stderr?: { content: string; totalBytes: number; offset: number; hasMore: boolean };
    cached: boolean;
    prompt: string;
    status: string;
    timestamp: string;
  } = {
    conversationId,
    turnNumber: turn.turn_number,
    cached: !!turn.cached,
    prompt: turn.prompt,
    status: turn.status,
    timestamp: turn.timestamp
  };

  // Use full output if cached, otherwise use truncated
  if (outputType === 'stdout' || outputType === 'both') {
    const fullStdout = turn.cached ? (turn.stdout_full || '') : (turn.stdout || '');
    const totalBytes = fullStdout.length;
    const content = fullStdout.substring(offset, offset + limit);
    result.stdout = {
      content,
      totalBytes,
      offset,
      hasMore: offset + limit < totalBytes
    };
  }

  if (outputType === 'stderr' || outputType === 'both') {
    const fullStderr = turn.cached ? (turn.stderr_full || '') : (turn.stderr || '');
    const totalBytes = fullStderr.length;
    const content = fullStderr.substring(offset, offset + limit);
    result.stderr = {
      content,
      totalBytes,
      offset,
      hasMore: offset + limit < totalBytes
    };
  }

  return result;
}
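The pagination contract of `getCachedOutput`, restated as a small Python sketch: slice by character offset/limit and report `hasMore` so callers can resume with `offset + len(content)`. (Note `totalBytes` counts characters, mirroring `.length` on the TS string.)

```python
def paginate(full: str, offset: int = 0, limit: int = 10000) -> dict:
    content = full[offset:offset + limit]
    return {
        "content": content,
        "totalBytes": len(full),     # character count, as in the TS implementation
        "offset": offset,
        "hasMore": offset + limit < len(full),
    }

page = paginate("x" * 25000)
assert page["hasMore"] and len(page["content"]) == 10000
# Resume exactly where the previous page ended:
next_page = paginate("x" * 25000, offset=page["offset"] + len(page["content"]))
assert next_page["offset"] == 10000
```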
/**
 * Query execution history
 */
@@ -100,6 +100,9 @@ class Config:
    # For litellm: model name from config (e.g., "qwen3-embedding")
    embedding_use_gpu: bool = True  # For fastembed: whether to use GPU acceleration

    # Indexing/search optimizations
    global_symbol_index_enabled: bool = True  # Enable project-wide symbol index fast path

    # Multi-endpoint configuration for litellm backend
    embedding_endpoints: List[Dict[str, Any]] = field(default_factory=list)
    # List of endpoint configs: [{"model": "...", "api_key": "...", "api_base": "...", "weight": 1.0}]
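Because the new flag is a plain dataclass field, the fast path can be disabled per-instance without touching stored indexes. A hedged sketch (the `data_dir` value is an assumption; field names come from this diff):

```python
from pathlib import Path
from codexlens.config import Config

# Engines constructed with this config skip the global-index fast path and
# fall back to per-directory chain traversal.
cfg = Config(
    data_dir=Path.home() / ".codexlens",    # hypothetical location
    global_symbol_index_enabled=False,
)
```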
@@ -11,11 +11,14 @@ from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional, Dict, Any
import logging
import os
import time

from codexlens.entities import SearchResult, Symbol
from codexlens.config import Config
from codexlens.storage.registry import RegistryStore, DirMapping
from codexlens.storage.dir_index import DirIndexStore, SubdirLink
from codexlens.storage.global_index import GlobalSymbolIndex
from codexlens.storage.path_mapper import PathMapper
from codexlens.storage.sqlite_store import SQLiteStore
from codexlens.search.hybrid_search import HybridSearchEngine

@@ -107,7 +110,8 @@ class ChainSearchEngine:
    def __init__(self,
                 registry: RegistryStore,
                 mapper: PathMapper,
-                max_workers: int = 8):
+                max_workers: int = 8,
+                config: Config | None = None):
        """Initialize chain search engine.

        Args:

@@ -120,6 +124,7 @@ class ChainSearchEngine:
        self.logger = logging.getLogger(__name__)
        self._max_workers = max_workers
        self._executor: Optional[ThreadPoolExecutor] = None
        self._config = config

    def _get_executor(self, max_workers: Optional[int] = None) -> ThreadPoolExecutor:
        """Get or create the shared thread pool executor.
@@ -294,6 +299,71 @@ class ChainSearchEngine:
            self.logger.warning(f"No index found for {source_path}")
            return []

        # Fast path: project-wide global symbol index (avoids chain traversal).
        if self._config is None or getattr(self._config, "global_symbol_index_enabled", True):
            try:
                # Avoid relying on index_to_source() here; use the same logic as _find_start_index
                # to determine the effective search root directory.
                search_root = source_path.resolve()
                exact_index = self.mapper.source_to_index_db(search_root)
                if not exact_index.exists():
                    nearest = self.registry.find_nearest_index(search_root)
                    if nearest:
                        search_root = nearest.source_path

                project = self.registry.find_by_source_path(str(search_root))
                if project:
                    global_db_path = Path(project["index_root"]) / GlobalSymbolIndex.DEFAULT_DB_NAME
                    if global_db_path.exists():
                        query_limit = max(int(options.total_limit) * 10, int(options.total_limit))
                        with GlobalSymbolIndex(global_db_path, project_id=int(project["id"])) as global_index:
                            candidates = global_index.search(name=name, kind=kind, limit=query_limit)

                        # Apply depth constraint relative to the start index directory.
                        filtered: List[Symbol] = []
                        for sym in candidates:
                            if not sym.file:
                                continue
                            try:
                                root_str = str(search_root)
                                file_dir_str = str(Path(sym.file).parent)

                                # Normalize Windows long-path prefix (\\?\) if present.
                                if root_str.startswith("\\\\?\\"):
                                    root_str = root_str[4:]
                                if file_dir_str.startswith("\\\\?\\"):
                                    file_dir_str = file_dir_str[4:]

                                root_cmp = root_str.lower().rstrip("\\/")
                                dir_cmp = file_dir_str.lower().rstrip("\\/")

                                if os.path.commonpath([root_cmp, dir_cmp]) != root_cmp:
                                    continue

                                rel = os.path.relpath(dir_cmp, root_cmp)
                                rel_depth = 0 if rel == "." else len(rel.split(os.sep))
                            except Exception:
                                continue

                            if options.depth >= 0 and rel_depth > options.depth:
                                continue
                            filtered.append(sym)

                        if filtered:
                            # Match existing semantics: dedupe by (name, kind, range), sort by name.
                            seen = set()
                            unique_symbols: List[Symbol] = []
                            for sym in filtered:
                                key = (sym.name, sym.kind, sym.range)
                                if key in seen:
                                    continue
                                seen.add(key)
                                unique_symbols.append(sym)
                            unique_symbols.sort(key=lambda s: s.name)
                            return unique_symbols[: options.total_limit]
            except Exception as exc:
                self.logger.debug("Global symbol index fast path failed: %s", exc)

        index_paths = self._collect_index_paths(start_index, options.depth)
        if not index_paths:
            return []
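The depth constraint used by the fast path, isolated as a standalone sketch: a candidate qualifies when its directory lies under the search root and its relative depth does not exceed `options.depth` (0 means the root directory itself; a negative depth means unlimited). This sketch assumes POSIX-style paths and omits the Windows long-path normalization and broad exception guards the real code carries:

```python
import os

def within_depth(root: str, file_dir: str, depth: int) -> bool:
    root_cmp = root.lower().rstrip("\\/")
    dir_cmp = file_dir.lower().rstrip("\\/")
    if os.path.commonpath([root_cmp, dir_cmp]) != root_cmp:
        return False  # candidate lies outside the search root entirely
    rel = os.path.relpath(dir_cmp, root_cmp)
    rel_depth = 0 if rel == "." else len(rel.split(os.sep))
    return depth < 0 or rel_depth <= depth

assert within_depth("/proj", "/proj", 0)        # the root itself has depth 0
assert within_depth("/proj", "/proj/a/b", 2)
assert not within_depth("/proj", "/proj/a/b", 1)
```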
@@ -17,8 +17,10 @@ from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from codexlens.config import Config
from codexlens.entities import SearchResult, Symbol
from codexlens.errors import StorageError
from codexlens.storage.global_index import GlobalSymbolIndex


@dataclass
@@ -60,7 +62,13 @@ class DirIndexStore:
    # Increment this when schema changes require migration
    SCHEMA_VERSION = 5

-   def __init__(self, db_path: str | Path) -> None:
+   def __init__(
+       self,
+       db_path: str | Path,
+       *,
+       config: Config | None = None,
+       global_index: GlobalSymbolIndex | None = None,
+   ) -> None:
        """Initialize directory index store.

        Args:

@@ -70,6 +78,8 @@ class DirIndexStore:
        self._lock = threading.RLock()
        self._conn: Optional[sqlite3.Connection] = None
        self.logger = logging.getLogger(__name__)
        self._config = config
        self._global_index = global_index

    def initialize(self) -> None:
        """Create database and schema if not exists."""
@@ -231,6 +241,7 @@ class DirIndexStore:
            )

            conn.commit()
            self._maybe_update_global_symbols(full_path_str, symbols or [])
            return file_id

        except sqlite3.DatabaseError as exc:

@@ -328,6 +339,7 @@ class DirIndexStore:
            file_id = int(row["id"])
            conn.execute("DELETE FROM files WHERE id=?", (file_id,))
            conn.commit()
            self._maybe_delete_global_symbols(full_path_str)
            return True

    def get_file(self, full_path: str | Path) -> Optional[FileEntry]:

@@ -483,6 +495,7 @@ class DirIndexStore:
            for deleted_path in deleted_paths:
                conn.execute("DELETE FROM files WHERE full_path=?", (deleted_path,))
                deleted_count += 1
                self._maybe_delete_global_symbols(deleted_path)

            if deleted_count > 0:
                conn.commit()
@@ -1593,6 +1606,31 @@ class DirIndexStore:
        self._conn.execute("PRAGMA mmap_size=30000000000")
        return self._conn

    def _maybe_update_global_symbols(self, file_path: str, symbols: List[Symbol]) -> None:
        if self._global_index is None:
            return
        if self._config is not None and not getattr(self._config, "global_symbol_index_enabled", True):
            return
        try:
            self._global_index.update_file_symbols(
                file_path=file_path,
                symbols=symbols,
                index_path=str(self.db_path),
            )
        except Exception as exc:
            # Global index is an optimization; local directory index remains authoritative.
            self.logger.debug("Global symbol index update failed for %s: %s", file_path, exc)

    def _maybe_delete_global_symbols(self, file_path: str) -> None:
        if self._global_index is None:
            return
        if self._config is not None and not getattr(self._config, "global_symbol_index_enabled", True):
            return
        try:
            self._global_index.delete_file_symbols(file_path)
        except Exception as exc:
            self.logger.debug("Global symbol index delete failed for %s: %s", file_path, exc)

    def _create_schema(self, conn: sqlite3.Connection) -> None:
        """Create database schema.
codex-lens/src/codexlens/storage/global_index.py (new file, 365 lines)
@@ -0,0 +1,365 @@
"""Global cross-directory symbol index for fast lookups.

Stores symbols for an entire project in a single SQLite database so symbol search
does not require traversing every directory _index.db.

This index is updated incrementally during file indexing (delete+insert per file)
to avoid expensive batch rebuilds.
"""

from __future__ import annotations

import logging
import sqlite3
import threading
from pathlib import Path
from typing import List, Optional, Tuple

from codexlens.entities import Symbol
from codexlens.errors import StorageError


class GlobalSymbolIndex:
    """Project-wide symbol index with incremental updates."""

    SCHEMA_VERSION = 1
    DEFAULT_DB_NAME = "_global_symbols.db"

    def __init__(self, db_path: str | Path, project_id: int) -> None:
        self.db_path = Path(db_path).resolve()
        self.project_id = int(project_id)
        self._lock = threading.RLock()
        self._conn: Optional[sqlite3.Connection] = None
        self.logger = logging.getLogger(__name__)

    def initialize(self) -> None:
        """Create database and schema if not exists."""
        with self._lock:
            self.db_path.parent.mkdir(parents=True, exist_ok=True)
            conn = self._get_connection()

            current_version = self._get_schema_version(conn)
            if current_version > self.SCHEMA_VERSION:
                raise StorageError(
                    f"Database schema version {current_version} is newer than "
                    f"supported version {self.SCHEMA_VERSION}. "
                    f"Please update the application or use a compatible database.",
                    db_path=str(self.db_path),
                    operation="initialize",
                    details={
                        "current_version": current_version,
                        "supported_version": self.SCHEMA_VERSION,
                    },
                )

            if current_version == 0:
                self._create_schema(conn)
                self._set_schema_version(conn, self.SCHEMA_VERSION)
            elif current_version < self.SCHEMA_VERSION:
                self._apply_migrations(conn, current_version)
                self._set_schema_version(conn, self.SCHEMA_VERSION)

            conn.commit()

    def close(self) -> None:
        """Close database connection."""
        with self._lock:
            if self._conn is not None:
                try:
                    self._conn.close()
                except Exception:
                    pass
                finally:
                    self._conn = None

    def __enter__(self) -> "GlobalSymbolIndex":
        self.initialize()
        return self

    def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
        self.close()

    def add_symbol(self, symbol: Symbol, file_path: str | Path, index_path: str | Path) -> None:
        """Insert a single symbol (idempotent) for incremental updates."""
        file_path_str = str(Path(file_path).resolve())
        index_path_str = str(Path(index_path).resolve())

        with self._lock:
            conn = self._get_connection()
            try:
                conn.execute(
                    """
                    INSERT INTO global_symbols(
                        project_id, symbol_name, symbol_kind,
                        file_path, start_line, end_line, index_path
                    )
                    VALUES(?, ?, ?, ?, ?, ?, ?)
                    ON CONFLICT(
                        project_id, symbol_name, symbol_kind,
                        file_path, start_line, end_line
                    )
                    DO UPDATE SET
                        index_path=excluded.index_path
                    """,
                    (
                        self.project_id,
                        symbol.name,
                        symbol.kind,
                        file_path_str,
                        symbol.range[0],
                        symbol.range[1],
                        index_path_str,
                    ),
                )
                conn.commit()
            except sqlite3.DatabaseError as exc:
                conn.rollback()
                raise StorageError(
                    f"Failed to add symbol {symbol.name}: {exc}",
                    db_path=str(self.db_path),
                    operation="add_symbol",
                ) from exc

    def update_file_symbols(
        self,
        file_path: str | Path,
        symbols: List[Symbol],
        index_path: str | Path | None = None,
    ) -> None:
        """Replace all symbols for a file atomically (delete + insert)."""
        file_path_str = str(Path(file_path).resolve())

        index_path_str: Optional[str]
        if index_path is not None:
            index_path_str = str(Path(index_path).resolve())
        else:
            index_path_str = self._get_existing_index_path(file_path_str)

        with self._lock:
            conn = self._get_connection()
            try:
                conn.execute("BEGIN")
                conn.execute(
                    "DELETE FROM global_symbols WHERE project_id=? AND file_path=?",
                    (self.project_id, file_path_str),
                )

                if symbols:
                    if not index_path_str:
                        raise StorageError(
                            "index_path is required when inserting symbols for a new file",
                            db_path=str(self.db_path),
                            operation="update_file_symbols",
                            details={"file_path": file_path_str},
                        )

                    rows = [
                        (
                            self.project_id,
                            s.name,
                            s.kind,
                            file_path_str,
                            s.range[0],
                            s.range[1],
                            index_path_str,
                        )
                        for s in symbols
                    ]
                    conn.executemany(
                        """
                        INSERT INTO global_symbols(
                            project_id, symbol_name, symbol_kind,
                            file_path, start_line, end_line, index_path
                        )
                        VALUES(?, ?, ?, ?, ?, ?, ?)
                        ON CONFLICT(
                            project_id, symbol_name, symbol_kind,
                            file_path, start_line, end_line
                        )
                        DO UPDATE SET
                            index_path=excluded.index_path
                        """,
                        rows,
                    )

                conn.commit()
            except sqlite3.DatabaseError as exc:
                conn.rollback()
                raise StorageError(
                    f"Failed to update symbols for {file_path_str}: {exc}",
                    db_path=str(self.db_path),
                    operation="update_file_symbols",
                ) from exc

    def delete_file_symbols(self, file_path: str | Path) -> int:
        """Remove all symbols for a file. Returns number of rows deleted."""
        file_path_str = str(Path(file_path).resolve())
        with self._lock:
            conn = self._get_connection()
            try:
                cur = conn.execute(
                    "DELETE FROM global_symbols WHERE project_id=? AND file_path=?",
                    (self.project_id, file_path_str),
                )
                conn.commit()
                return int(cur.rowcount or 0)
            except sqlite3.DatabaseError as exc:
                conn.rollback()
                raise StorageError(
                    f"Failed to delete symbols for {file_path_str}: {exc}",
                    db_path=str(self.db_path),
                    operation="delete_file_symbols",
                ) from exc

    def search(
        self,
        name: str,
        kind: Optional[str] = None,
        limit: int = 50,
        prefix_mode: bool = True,
    ) -> List[Symbol]:
        """Search symbols and return full Symbol objects."""
        if prefix_mode:
            pattern = f"{name}%"
        else:
            pattern = f"%{name}%"

        with self._lock:
            conn = self._get_connection()
            if kind:
                rows = conn.execute(
                    """
                    SELECT symbol_name, symbol_kind, file_path, start_line, end_line
                    FROM global_symbols
                    WHERE project_id=? AND symbol_name LIKE ? AND symbol_kind=?
                    ORDER BY symbol_name
                    LIMIT ?
                    """,
                    (self.project_id, pattern, kind, limit),
                ).fetchall()
            else:
                rows = conn.execute(
                    """
                    SELECT symbol_name, symbol_kind, file_path, start_line, end_line
                    FROM global_symbols
                    WHERE project_id=? AND symbol_name LIKE ?
                    ORDER BY symbol_name
                    LIMIT ?
                    """,
                    (self.project_id, pattern, limit),
                ).fetchall()

            return [
                Symbol(
                    name=row["symbol_name"],
                    kind=row["symbol_kind"],
                    range=(row["start_line"], row["end_line"]),
                    file=row["file_path"],
                )
                for row in rows
            ]

    def search_symbols(
        self,
        name: str,
        kind: Optional[str] = None,
        limit: int = 50,
        prefix_mode: bool = True,
    ) -> List[Tuple[str, Tuple[int, int]]]:
        """Search symbols and return only (file_path, (start_line, end_line))."""
        symbols = self.search(name=name, kind=kind, limit=limit, prefix_mode=prefix_mode)
        return [(s.file or "", s.range) for s in symbols]

    def _get_existing_index_path(self, file_path_str: str) -> Optional[str]:
        with self._lock:
            conn = self._get_connection()
            row = conn.execute(
                """
                SELECT index_path
                FROM global_symbols
                WHERE project_id=? AND file_path=?
                LIMIT 1
                """,
                (self.project_id, file_path_str),
            ).fetchone()
            return str(row["index_path"]) if row else None

    def _get_schema_version(self, conn: sqlite3.Connection) -> int:
        try:
            row = conn.execute("PRAGMA user_version").fetchone()
            return int(row[0]) if row else 0
        except Exception:
            return 0

    def _set_schema_version(self, conn: sqlite3.Connection, version: int) -> None:
        conn.execute(f"PRAGMA user_version = {int(version)}")

    def _apply_migrations(self, conn: sqlite3.Connection, from_version: int) -> None:
        # No migrations yet (v1).
        _ = (conn, from_version)
        return

    def _get_connection(self) -> sqlite3.Connection:
        if self._conn is None:
            self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
            self._conn.row_factory = sqlite3.Row
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.execute("PRAGMA synchronous=NORMAL")
            self._conn.execute("PRAGMA foreign_keys=ON")
            self._conn.execute("PRAGMA mmap_size=30000000000")
        return self._conn

    def _create_schema(self, conn: sqlite3.Connection) -> None:
        try:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS global_symbols (
                    id INTEGER PRIMARY KEY,
                    project_id INTEGER NOT NULL,
                    symbol_name TEXT NOT NULL,
                    symbol_kind TEXT NOT NULL,
                    file_path TEXT NOT NULL,
                    start_line INTEGER,
                    end_line INTEGER,
                    index_path TEXT NOT NULL,
                    UNIQUE(
                        project_id, symbol_name, symbol_kind,
                        file_path, start_line, end_line
                    )
                )
                """
            )

            # Required by optimization spec.
            conn.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_global_symbols_name_kind
                ON global_symbols(symbol_name, symbol_kind)
                """
            )
            # Used by common queries (project-scoped name lookups).
            conn.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_global_symbols_project_name_kind
                ON global_symbols(project_id, symbol_name, symbol_kind)
                """
            )
            conn.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_global_symbols_project_file
                ON global_symbols(project_id, file_path)
                """
            )
            conn.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_global_symbols_project_index_path
                ON global_symbols(project_id, index_path)
                """
            )
        except sqlite3.DatabaseError as exc:
            raise StorageError(
                f"Failed to initialize global symbol schema: {exc}",
                db_path=str(self.db_path),
                operation="_create_schema",
            ) from exc
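A usage sketch of the class above, with hypothetical `/tmp` paths (the API names all come from this file). Note that `prefix_mode=True` produces a `name%` pattern, which SQLite's LIKE optimization can serve from the name indexes (subject to LIKE case-sensitivity settings), whereas the substring pattern `%name%` always forces a scan:

```python
from pathlib import Path
from codexlens.entities import Symbol
from codexlens.storage.global_index import GlobalSymbolIndex

# Context-manager form: __enter__ calls initialize(), __exit__ closes the connection.
with GlobalSymbolIndex(Path("/tmp/demo_global.db"), project_id=1) as idx:
    # Replace the file's symbols atomically (delete + insert in one transaction).
    idx.update_file_symbols(
        file_path="/tmp/src/auth.py",
        symbols=[Symbol(name="AuthManager", kind="class", range=(1, 10))],
        index_path="/tmp/indexes/_index.db",
    )
    hits = idx.search("Auth", kind="class", limit=10)           # full Symbol objects
    locs = idx.search_symbols("Auth", kind="class", limit=10)   # (file_path, range) pairs
```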
@@ -17,6 +17,7 @@ from typing import Dict, List, Optional, Set
from codexlens.config import Config
from codexlens.parsers.factory import ParserFactory
from codexlens.storage.dir_index import DirIndexStore
from codexlens.storage.global_index import GlobalSymbolIndex
from codexlens.storage.path_mapper import PathMapper
from codexlens.storage.registry import ProjectInfo, RegistryStore
@@ -141,6 +142,12 @@ class IndexTreeBuilder:
        # Register project
        index_root = self.mapper.source_to_index_dir(source_root)
        project_info = self.registry.register_project(source_root, index_root)
        global_index_db_path = index_root / GlobalSymbolIndex.DEFAULT_DB_NAME

        global_index: GlobalSymbolIndex | None = None
        if self.config.global_symbol_index_enabled:
            global_index = GlobalSymbolIndex(global_index_db_path, project_id=project_info.id)
            global_index.initialize()

        # Report progress: discovering files (5%)
        print("Discovering files...", flush=True)

@@ -150,6 +157,8 @@ class IndexTreeBuilder:

        if not dirs_by_depth:
            self.logger.warning("No indexable directories found in %s", source_root)
            if global_index is not None:
                global_index.close()
            return BuildResult(
                project_id=project_info.id,
                source_root=source_root,

@@ -181,7 +190,13 @@ class IndexTreeBuilder:
            self.logger.info("Building %d directories at depth %d", len(dirs), depth)

            # Build directories at this level in parallel
-           results = self._build_level_parallel(dirs, languages, workers)
+           results = self._build_level_parallel(
+               dirs,
+               languages,
+               workers,
+               project_id=project_info.id,
+               global_index_db_path=global_index_db_path,
+           )
            all_results.extend(results)

            # Process results

@@ -230,7 +245,7 @@ class IndexTreeBuilder:
            if result.error:
                continue
            try:
-               with DirIndexStore(result.index_path) as store:
+               with DirIndexStore(result.index_path, config=self.config, global_index=global_index) as store:
                    deleted_count = store.cleanup_deleted_files(result.source_path)
                    total_deleted += deleted_count
                    if deleted_count > 0:

@@ -257,6 +272,9 @@ class IndexTreeBuilder:
            len(all_errors),
        )

        if global_index is not None:
            global_index.close()

        return BuildResult(
            project_id=project_info.id,
            source_root=source_root,
@@ -315,7 +333,18 @@ class IndexTreeBuilder:
        """
        source_path = source_path.resolve()
        self.logger.info("Rebuilding directory %s", source_path)
-       return self._build_single_dir(source_path)
+       project_root = self.mapper.get_project_root(source_path)
+       project_info = self.registry.get_project(project_root)
+       if not project_info:
+           raise ValueError(f"Directory not indexed: {source_path}")
+
+       global_index_db_path = project_info.index_root / GlobalSymbolIndex.DEFAULT_DB_NAME
+       return self._build_single_dir(
+           source_path,
+           languages=None,
+           project_id=project_info.id,
+           global_index_db_path=global_index_db_path,
+       )

    # === Internal Methods ===
@@ -396,7 +425,13 @@ class IndexTreeBuilder:
        return len(source_files) > 0

    def _build_level_parallel(
-       self, dirs: List[Path], languages: List[str], workers: int
+       self,
+       dirs: List[Path],
+       languages: List[str],
+       workers: int,
+       *,
+       project_id: int,
+       global_index_db_path: Path,
    ) -> List[DirBuildResult]:
        """Build multiple directories in parallel.

@@ -419,7 +454,12 @@ class IndexTreeBuilder:

        # For single directory, avoid overhead of process pool
        if len(dirs) == 1:
-           result = self._build_single_dir(dirs[0], languages)
+           result = self._build_single_dir(
+               dirs[0],
+               languages,
+               project_id=project_id,
+               global_index_db_path=global_index_db_path,
+           )
            return [result]

        # Prepare arguments for worker processes

@@ -427,6 +467,7 @@ class IndexTreeBuilder:
            "data_dir": str(self.config.data_dir),
            "supported_languages": self.config.supported_languages,
            "parsing_rules": self.config.parsing_rules,
            "global_symbol_index_enabled": self.config.global_symbol_index_enabled,
        }

        worker_args = [

@@ -435,6 +476,8 @@ class IndexTreeBuilder:
                self.mapper.source_to_index_db(dir_path),
                languages,
                config_dict,
                int(project_id),
                str(global_index_db_path),
            )
            for dir_path in dirs
        ]
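Worker arguments are flattened to plain tuples of strings/ints/dicts because process-pool workers receive their arguments by pickling; the `Config` object is rebuilt inside the worker from `config_dict`. A minimal sketch of the same pattern with hypothetical names:

```python
from concurrent.futures import ProcessPoolExecutor

def _worker(args: tuple) -> str:
    # Everything in `args` crossed the process boundary via pickle.
    dir_path, flags = args
    return f"{dir_path}: indexed (global={flags['global_symbol_index_enabled']})"

if __name__ == "__main__":
    flags = {"global_symbol_index_enabled": True}  # serializable stand-in for Config
    jobs = [(p, flags) for p in ("src/a", "src/b")]
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(list(pool.map(_worker, jobs)))
```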
@@ -467,7 +510,12 @@ class IndexTreeBuilder:
        return results

    def _build_single_dir(
-       self, dir_path: Path, languages: List[str] = None
+       self,
+       dir_path: Path,
+       languages: List[str] = None,
+       *,
+       project_id: int,
+       global_index_db_path: Path,
    ) -> DirBuildResult:
        """Build index for a single directory.

@@ -484,12 +532,17 @@ class IndexTreeBuilder:
        dir_path = dir_path.resolve()
        index_db_path = self.mapper.source_to_index_db(dir_path)

        global_index: GlobalSymbolIndex | None = None
        try:
            # Ensure index directory exists
            index_db_path.parent.mkdir(parents=True, exist_ok=True)

            # Create directory index
-           store = DirIndexStore(index_db_path)
+           if self.config.global_symbol_index_enabled:
+               global_index = GlobalSymbolIndex(global_index_db_path, project_id=project_id)
+               global_index.initialize()
+
+           store = DirIndexStore(index_db_path, config=self.config, global_index=global_index)
            store.initialize()

            # Get source files in this directory only

@@ -541,6 +594,8 @@ class IndexTreeBuilder:
            ]

            store.close()
            if global_index is not None:
                global_index.close()

            if skipped_count > 0:
                self.logger.debug(

@@ -570,6 +625,11 @@ class IndexTreeBuilder:

        except Exception as exc:
            self.logger.error("Failed to build directory %s: %s", dir_path, exc)
            if global_index is not None:
                try:
                    global_index.close()
                except Exception:
                    pass
            return DirBuildResult(
                source_path=dir_path,
                index_path=index_db_path,
@@ -676,28 +736,34 @@ def _build_dir_worker(args: tuple) -> DirBuildResult:
    Reconstructs necessary objects from serializable arguments.

    Args:
-       args: Tuple of (dir_path, index_db_path, languages, config_dict)
+       args: Tuple of (dir_path, index_db_path, languages, config_dict, project_id, global_index_db_path)

    Returns:
        DirBuildResult for the directory
    """
-   dir_path, index_db_path, languages, config_dict = args
+   dir_path, index_db_path, languages, config_dict, project_id, global_index_db_path = args

    # Reconstruct config
    config = Config(
        data_dir=Path(config_dict["data_dir"]),
        supported_languages=config_dict["supported_languages"],
        parsing_rules=config_dict["parsing_rules"],
        global_symbol_index_enabled=bool(config_dict.get("global_symbol_index_enabled", True)),
    )

    parser_factory = ParserFactory(config)

    global_index: GlobalSymbolIndex | None = None
    try:
        # Ensure index directory exists
        index_db_path.parent.mkdir(parents=True, exist_ok=True)

        # Create directory index
-       store = DirIndexStore(index_db_path)
+       if config.global_symbol_index_enabled and global_index_db_path:
+           global_index = GlobalSymbolIndex(Path(global_index_db_path), project_id=int(project_id))
+           global_index.initialize()
+
+       store = DirIndexStore(index_db_path, config=config, global_index=global_index)
        store.initialize()

        files_count = 0

@@ -756,6 +822,8 @@ def _build_dir_worker(args: tuple) -> DirBuildResult:
        ]

        store.close()
        if global_index is not None:
            global_index.close()

        return DirBuildResult(
            source_path=dir_path,

@@ -766,6 +834,11 @@ def _build_dir_worker(args: tuple) -> DirBuildResult:
        )

    except Exception as exc:
        if global_index is not None:
            try:
                global_index.close()
            except Exception:
                pass
        return DirBuildResult(
            source_path=dir_path,
            index_path=index_db_path,
codex-lens/tests/test_global_symbol_index.py (new file, 192 lines)
@@ -0,0 +1,192 @@
import sqlite3
import tempfile
import time
from pathlib import Path
from unittest.mock import MagicMock

import pytest

from codexlens.config import Config
from codexlens.entities import Symbol
from codexlens.search.chain_search import ChainSearchEngine
from codexlens.storage.dir_index import DirIndexStore
from codexlens.storage.global_index import GlobalSymbolIndex
from codexlens.storage.path_mapper import PathMapper
from codexlens.storage.registry import RegistryStore


@pytest.fixture()
def temp_paths():
    tmpdir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
    root = Path(tmpdir.name)
    yield root
    try:
        tmpdir.cleanup()
    except (PermissionError, OSError):
        pass


def test_global_symbol_index_add_and_search_under_50ms(temp_paths: Path):
    db_path = temp_paths / "indexes" / "_global_symbols.db"
    file_path = temp_paths / "src" / "a.py"
    index_path = temp_paths / "indexes" / "_index.db"

    file_path.parent.mkdir(parents=True, exist_ok=True)
    file_path.write_text("class AuthManager:\n    pass\n", encoding="utf-8")
    index_path.parent.mkdir(parents=True, exist_ok=True)
    index_path.write_text("", encoding="utf-8")

    store = GlobalSymbolIndex(db_path, project_id=1)
    store.initialize()

    # Insert enough rows to ensure index usage, still small enough to be fast.
    for i in range(200):
        store.add_symbol(
            Symbol(name=f"AuthManager{i}", kind="class", range=(1, 2)),
            file_path=file_path,
            index_path=index_path,
        )

    start = time.perf_counter()
    results = store.search("AuthManager", kind="class", limit=50, prefix_mode=True)
    elapsed_ms = (time.perf_counter() - start) * 1000

    assert elapsed_ms < 50.0
    assert len(results) >= 1
    assert all(r.kind == "class" for r in results)
    assert all((r.file or "").endswith("a.py") for r in results)

    locations = store.search_symbols("AuthManager", kind="class", limit=50, prefix_mode=True)
    assert locations
    assert all(isinstance(p, str) and isinstance(rng, tuple) for p, rng in locations)


def test_update_file_symbols_replaces_atomically(temp_paths: Path):
    db_path = temp_paths / "indexes" / "_global_symbols.db"
    file_path = temp_paths / "src" / "mod.py"
    index_path = temp_paths / "indexes" / "_index.db"

    file_path.parent.mkdir(parents=True, exist_ok=True)
    file_path.write_text("def a():\n    pass\n", encoding="utf-8")
    index_path.parent.mkdir(parents=True, exist_ok=True)
    index_path.write_text("", encoding="utf-8")

    store = GlobalSymbolIndex(db_path, project_id=7)
    store.initialize()

    store.update_file_symbols(
        file_path=file_path,
        symbols=[
            Symbol(name="old_func", kind="function", range=(1, 2)),
            Symbol(name="Other", kind="class", range=(10, 20)),
        ],
        index_path=index_path,
    )

    assert any(s.name == "old_func" for s in store.search("old_", prefix_mode=True))

    # Replace with new set (delete + insert)
    store.update_file_symbols(
        file_path=file_path,
        symbols=[Symbol(name="new_func", kind="function", range=(3, 4))],
        index_path=index_path,
    )

    assert not any(s.name == "old_func" for s in store.search("old_", prefix_mode=True))
    assert any(s.name == "new_func" for s in store.search("new_", prefix_mode=True))

    # Backward-compatible path: omit index_path after it has been established.
    store.update_file_symbols(
        file_path=file_path,
        symbols=[Symbol(name="new_func2", kind="function", range=(5, 6))],
        index_path=None,
    )
    assert any(s.name == "new_func2" for s in store.search("new_func2", prefix_mode=True))


def test_dir_index_store_updates_global_index_when_enabled(temp_paths: Path):
    config = Config(data_dir=temp_paths / "data")

    index_db_path = temp_paths / "indexes" / "proj" / "_index.db"
    global_db_path = temp_paths / "indexes" / "proj" / GlobalSymbolIndex.DEFAULT_DB_NAME
    source_file = temp_paths / "src" / "x.py"

    source_file.parent.mkdir(parents=True, exist_ok=True)
    source_file.write_text("class MyClass:\n    pass\n", encoding="utf-8")

    global_index = GlobalSymbolIndex(global_db_path, project_id=123)
    global_index.initialize()

    with DirIndexStore(index_db_path, config=config, global_index=global_index) as store:
        store.add_file(
            name=source_file.name,
            full_path=source_file,
            content=source_file.read_text(encoding="utf-8"),
            language="python",
            symbols=[Symbol(name="MyClass", kind="class", range=(1, 2))],
        )

    matches = global_index.search("MyClass", kind="class", limit=10)
    assert len(matches) == 1
    assert matches[0].file == str(source_file.resolve())

    # Verify all required fields were written.
    conn = sqlite3.connect(global_db_path)
    row = conn.execute(
        """
        SELECT project_id, symbol_name, symbol_kind, file_path, start_line, end_line, index_path
        FROM global_symbols
        WHERE project_id=? AND symbol_name=?
        """,
        (123, "MyClass"),
    ).fetchone()
    conn.close()

    assert row is not None
    assert row[0] == 123
    assert row[1] == "MyClass"
    assert row[2] == "class"
    assert row[3] == str(source_file.resolve())
    assert row[4] == 1
    assert row[5] == 2
    assert row[6] == str(index_db_path.resolve())


def test_chain_search_uses_global_index_fast_path(temp_paths: Path):
    project_root = temp_paths / "project"
    project_root.mkdir(parents=True, exist_ok=True)

    index_root = temp_paths / "indexes"
    mapper = PathMapper(index_root=index_root)
    index_db_path = mapper.source_to_index_db(project_root)
    index_db_path.parent.mkdir(parents=True, exist_ok=True)
    index_db_path.write_text("", encoding="utf-8")  # existence is enough for _find_start_index

    registry = RegistryStore(db_path=temp_paths / "registry.db")
    registry.initialize()
    project_info = registry.register_project(project_root, mapper.source_to_index_dir(project_root))

    global_db_path = project_info.index_root / GlobalSymbolIndex.DEFAULT_DB_NAME
    global_index = GlobalSymbolIndex(global_db_path, project_id=project_info.id)
    global_index.initialize()

    file_path = project_root / "auth.py"
    global_index.update_file_symbols(
        file_path=file_path,
        symbols=[
            Symbol(name="AuthManager", kind="class", range=(1, 10)),
            Symbol(name="authenticate", kind="function", range=(12, 20)),
        ],
        index_path=index_db_path,
    )

    config = Config(data_dir=temp_paths / "data", global_symbol_index_enabled=True)
    engine = ChainSearchEngine(registry, mapper, config=config)
    assert registry.find_by_source_path(str(project_root)) is not None
    assert registry.find_by_source_path(str(project_root.resolve())) is not None
    assert global_db_path.exists()
    assert GlobalSymbolIndex(global_db_path, project_id=project_info.id).search("Auth", limit=10)
    engine._search_symbols_parallel = MagicMock(side_effect=AssertionError("should not traverse chain"))

    symbols = engine.search_symbols("Auth", project_root)
    assert any(s.name == "AuthManager" for s in symbols)