fix: 修复向量索引进度显示过早完成的问题

问题:FTS 索引完成后立即显示 100%,但嵌入生成仍在后台运行

修复:
- codex-lens.ts: 将 "Indexed X files" 阶段从 complete 改为 fts_complete (60%)
- codex-lens.ts: 添加嵌入批次和 Finalizing index 阶段解析
- embedding_manager.py: 使用 bulk_insert() 模式延迟 ANN 索引构建
- embedding_manager.py: 添加 "Finalizing index" 进度回调

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
catlog22
2025-12-21 20:55:45 +08:00
parent 5849f751bc
commit 2871950ab8
2 changed files with 121 additions and 89 deletions

View File

@@ -414,17 +414,42 @@ function parseProgressLine(line: string): ProgressInfo | null {
return { stage: 'complete', message: 'Finalizing...', percent: 95 };
}
// Parse indexed count: "Indexed X files"
// Parse indexed count: "Indexed X files" - FTS complete, but embeddings may follow
const indexedMatch = line.match(/Indexed (\d+) files/i);
if (indexedMatch) {
return {
stage: 'complete',
message: `Indexed ${indexedMatch[1]} files`,
percent: 100,
stage: 'fts_complete', // Not 'complete' - embeddings generation may still be pending
message: `Indexed ${indexedMatch[1]} files, generating embeddings...`,
percent: 60, // FTS done, embeddings starting
filesProcessed: parseInt(indexedMatch[1], 10),
};
}
// Parse embedding batch progress: "Batch X: N files, M chunks"
const batchMatch = line.match(/Batch (\d+):\s*(\d+) files,\s*(\d+) chunks/i);
if (batchMatch) {
return {
stage: 'embeddings',
message: `Embedding batch ${batchMatch[1]}: ${batchMatch[3]} chunks`,
percent: 70, // Stay at 70% during embedding batches
};
}
// Parse embedding progress with file count
const embedProgressMatch = line.match(/Processing (\d+) files/i);
if (embedProgressMatch && line.toLowerCase().includes('embed')) {
return {
stage: 'embeddings',
message: `Processing ${embedProgressMatch[1]} files for embeddings`,
percent: 75,
};
}
// Parse finalizing ANN index
if (line.includes('Finalizing index') || line.includes('Building ANN')) {
return { stage: 'finalizing', message: 'Finalizing vector index...', percent: 90 };
}
return null;
}

View File

@@ -222,100 +222,107 @@ def generate_embeddings(
try:
with VectorStore(index_path) as vector_store:
with sqlite3.connect(index_path) as conn:
conn.row_factory = sqlite3.Row
path_column = _get_path_column(conn)
# Use bulk insert mode for efficient batch ANN index building
# This defers ANN updates until end_bulk_insert() is called
with vector_store.bulk_insert():
with sqlite3.connect(index_path) as conn:
conn.row_factory = sqlite3.Row
path_column = _get_path_column(conn)
# Get total file count for progress reporting
total_files = conn.execute("SELECT COUNT(*) FROM files").fetchone()[0]
if total_files == 0:
return {"success": False, "error": "No files found in index"}
# Get total file count for progress reporting
total_files = conn.execute("SELECT COUNT(*) FROM files").fetchone()[0]
if total_files == 0:
return {"success": False, "error": "No files found in index"}
if progress_callback:
progress_callback(f"Processing {total_files} files in batches of {FILE_BATCH_SIZE}...")
if progress_callback:
progress_callback(f"Processing {total_files} files in batches of {FILE_BATCH_SIZE}...")
cursor = conn.execute(f"SELECT {path_column}, content, language FROM files")
batch_number = 0
cursor = conn.execute(f"SELECT {path_column}, content, language FROM files")
batch_number = 0
while True:
# Fetch a batch of files (streaming, not fetchall)
file_batch = cursor.fetchmany(FILE_BATCH_SIZE)
if not file_batch:
break
while True:
# Fetch a batch of files (streaming, not fetchall)
file_batch = cursor.fetchmany(FILE_BATCH_SIZE)
if not file_batch:
break
batch_number += 1
batch_chunks_with_paths = []
files_in_batch_with_chunks = set()
batch_number += 1
batch_chunks_with_paths = []
files_in_batch_with_chunks = set()
# Periodic embedder recreation to prevent memory accumulation
if batch_number % EMBEDDER_RECREATION_INTERVAL == 0:
# Periodic embedder recreation to prevent memory accumulation
if batch_number % EMBEDDER_RECREATION_INTERVAL == 0:
if progress_callback:
progress_callback(f" [Memory optimization] Recreating embedder at batch {batch_number}")
clear_embedder_cache()
embedder = get_embedder(profile=model_profile)
gc.collect()
# Step 1: Chunking for the current file batch
for file_row in file_batch:
file_path = file_row[path_column]
content = file_row["content"]
language = file_row["language"] or "python"
try:
chunks = chunker.chunk_sliding_window(
content,
file_path=file_path,
language=language
)
if chunks:
for chunk in chunks:
batch_chunks_with_paths.append((chunk, file_path))
files_in_batch_with_chunks.add(file_path)
except Exception as e:
logger.error(f"Failed to chunk {file_path}: {e}")
failed_files.append((file_path, str(e)))
if not batch_chunks_with_paths:
continue
batch_chunk_count = len(batch_chunks_with_paths)
if progress_callback:
progress_callback(f" [Memory optimization] Recreating embedder at batch {batch_number}")
clear_embedder_cache()
embedder = get_embedder(profile=model_profile)
progress_callback(f" Batch {batch_number}: {len(file_batch)} files, {batch_chunk_count} chunks")
# Step 2: Generate embeddings for this batch (use memory-efficient numpy method)
batch_embeddings = []
try:
for i in range(0, batch_chunk_count, EMBEDDING_BATCH_SIZE):
batch_end = min(i + EMBEDDING_BATCH_SIZE, batch_chunk_count)
batch_contents = [chunk.content for chunk, _ in batch_chunks_with_paths[i:batch_end]]
# Use embed_to_numpy() to avoid unnecessary list conversion
embeddings_numpy = embedder.embed_to_numpy(batch_contents)
# Convert to list only for storage (VectorStore expects list format)
embeddings = [emb.tolist() for emb in embeddings_numpy]
batch_embeddings.extend(embeddings)
# Explicit cleanup of intermediate data
del batch_contents, embeddings_numpy
except Exception as e:
logger.error(f"Failed to generate embeddings for batch {batch_number}: {str(e)}")
failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch])
continue
# Step 3: Assign embeddings to chunks
for (chunk, _), embedding in zip(batch_chunks_with_paths, batch_embeddings):
chunk.embedding = embedding
# Step 4: Store this batch to database (ANN update deferred in bulk_insert mode)
try:
vector_store.add_chunks_batch(batch_chunks_with_paths)
total_chunks_created += batch_chunk_count
total_files_processed += len(files_in_batch_with_chunks)
except Exception as e:
logger.error(f"Failed to store batch {batch_number}: {str(e)}")
failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch])
# Explicit memory cleanup after each batch
del batch_chunks_with_paths, batch_embeddings
gc.collect()
# Step 1: Chunking for the current file batch
for file_row in file_batch:
file_path = file_row[path_column]
content = file_row["content"]
language = file_row["language"] or "python"
try:
chunks = chunker.chunk_sliding_window(
content,
file_path=file_path,
language=language
)
if chunks:
for chunk in chunks:
batch_chunks_with_paths.append((chunk, file_path))
files_in_batch_with_chunks.add(file_path)
except Exception as e:
logger.error(f"Failed to chunk {file_path}: {e}")
failed_files.append((file_path, str(e)))
if not batch_chunks_with_paths:
continue
batch_chunk_count = len(batch_chunks_with_paths)
if progress_callback:
progress_callback(f" Batch {batch_number}: {len(file_batch)} files, {batch_chunk_count} chunks")
# Step 2: Generate embeddings for this batch (use memory-efficient numpy method)
batch_embeddings = []
try:
for i in range(0, batch_chunk_count, EMBEDDING_BATCH_SIZE):
batch_end = min(i + EMBEDDING_BATCH_SIZE, batch_chunk_count)
batch_contents = [chunk.content for chunk, _ in batch_chunks_with_paths[i:batch_end]]
# Use embed_to_numpy() to avoid unnecessary list conversion
embeddings_numpy = embedder.embed_to_numpy(batch_contents)
# Convert to list only for storage (VectorStore expects list format)
embeddings = [emb.tolist() for emb in embeddings_numpy]
batch_embeddings.extend(embeddings)
# Explicit cleanup of intermediate data
del batch_contents, embeddings_numpy
except Exception as e:
logger.error(f"Failed to generate embeddings for batch {batch_number}: {str(e)}")
failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch])
continue
# Step 3: Assign embeddings to chunks
for (chunk, _), embedding in zip(batch_chunks_with_paths, batch_embeddings):
chunk.embedding = embedding
# Step 4: Store this batch to database immediately (releases memory)
try:
vector_store.add_chunks_batch(batch_chunks_with_paths)
total_chunks_created += batch_chunk_count
total_files_processed += len(files_in_batch_with_chunks)
except Exception as e:
logger.error(f"Failed to store batch {batch_number}: {str(e)}")
failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch])
# Explicit memory cleanup after each batch
del batch_chunks_with_paths, batch_embeddings
gc.collect()
# Notify before ANN index finalization (happens when bulk_insert context exits)
if progress_callback:
progress_callback(f"Finalizing index... Building ANN index for {total_chunks_created} chunks")
except Exception as e:
return {"success": False, "error": f"Failed to read or process files: {str(e)}"}