From 2871950ab8ffa58967065c45ce1b6026044fc584 Mon Sep 17 00:00:00 2001 From: catlog22 Date: Sun, 21 Dec 2025 20:55:45 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E5=90=91=E9=87=8F?= =?UTF-8?q?=E7=B4=A2=E5=BC=95=E8=BF=9B=E5=BA=A6=E6=98=BE=E7=A4=BA=E8=BF=87?= =?UTF-8?q?=E6=97=A9=E5=AE=8C=E6=88=90=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题:FTS 索引完成后立即显示 100%,但嵌入生成仍在后台运行 修复: - codex-lens.ts: 将 "Indexed X files" 阶段从 complete 改为 fts_complete (60%) - codex-lens.ts: 添加嵌入批次和 Finalizing index 阶段解析 - embedding_manager.py: 使用 bulk_insert() 模式延迟 ANN 索引构建 - embedding_manager.py: 添加 "Finalizing index" 进度回调 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- ccw/src/tools/codex-lens.ts | 33 +++- .../src/codexlens/cli/embedding_manager.py | 177 +++++++++--------- 2 files changed, 121 insertions(+), 89 deletions(-) diff --git a/ccw/src/tools/codex-lens.ts b/ccw/src/tools/codex-lens.ts index d6229b6e..4890c9f3 100644 --- a/ccw/src/tools/codex-lens.ts +++ b/ccw/src/tools/codex-lens.ts @@ -414,17 +414,42 @@ function parseProgressLine(line: string): ProgressInfo | null { return { stage: 'complete', message: 'Finalizing...', percent: 95 }; } - // Parse indexed count: "Indexed X files" + // Parse indexed count: "Indexed X files" - FTS complete, but embeddings may follow const indexedMatch = line.match(/Indexed (\d+) files/i); if (indexedMatch) { return { - stage: 'complete', - message: `Indexed ${indexedMatch[1]} files`, - percent: 100, + stage: 'fts_complete', // Not 'complete' - embeddings generation may still be pending + message: `Indexed ${indexedMatch[1]} files, generating embeddings...`, + percent: 60, // FTS done, embeddings starting filesProcessed: parseInt(indexedMatch[1], 10), }; } + // Parse embedding batch progress: "Batch X: N files, M chunks" + const batchMatch = line.match(/Batch (\d+):\s*(\d+) files,\s*(\d+) chunks/i); + if (batchMatch) { + return { + stage: 'embeddings', + message: `Embedding batch ${batchMatch[1]}: ${batchMatch[3]} chunks`, + percent: 70, // Stay at 70% during embedding batches + }; + } + + // Parse embedding progress with file count + const embedProgressMatch = line.match(/Processing (\d+) files/i); + if (embedProgressMatch && line.toLowerCase().includes('embed')) { + return { + stage: 'embeddings', + message: `Processing ${embedProgressMatch[1]} files for embeddings`, + percent: 75, + }; + } + + // Parse finalizing ANN index + if (line.includes('Finalizing index') || line.includes('Building ANN')) { + return { stage: 'finalizing', message: 'Finalizing vector index...', percent: 90 }; + } + return null; } diff --git a/codex-lens/src/codexlens/cli/embedding_manager.py b/codex-lens/src/codexlens/cli/embedding_manager.py index 0fd39dd6..72eb29c4 100644 --- a/codex-lens/src/codexlens/cli/embedding_manager.py +++ b/codex-lens/src/codexlens/cli/embedding_manager.py @@ -222,100 +222,107 @@ def generate_embeddings( try: with VectorStore(index_path) as vector_store: - with sqlite3.connect(index_path) as conn: - conn.row_factory = sqlite3.Row - path_column = _get_path_column(conn) + # Use bulk insert mode for efficient batch ANN index building + # This defers ANN updates until end_bulk_insert() is called + with vector_store.bulk_insert(): + with sqlite3.connect(index_path) as conn: + conn.row_factory = sqlite3.Row + path_column = _get_path_column(conn) - # Get total file count for progress reporting - total_files = conn.execute("SELECT COUNT(*) FROM files").fetchone()[0] - if total_files == 0: - return {"success": False, "error": "No files found in index"} + # Get total file count for progress reporting + total_files = conn.execute("SELECT COUNT(*) FROM files").fetchone()[0] + if total_files == 0: + return {"success": False, "error": "No files found in index"} - if progress_callback: - progress_callback(f"Processing {total_files} files in batches of {FILE_BATCH_SIZE}...") + if progress_callback: + progress_callback(f"Processing {total_files} files in batches of {FILE_BATCH_SIZE}...") - cursor = conn.execute(f"SELECT {path_column}, content, language FROM files") - batch_number = 0 + cursor = conn.execute(f"SELECT {path_column}, content, language FROM files") + batch_number = 0 - while True: - # Fetch a batch of files (streaming, not fetchall) - file_batch = cursor.fetchmany(FILE_BATCH_SIZE) - if not file_batch: - break + while True: + # Fetch a batch of files (streaming, not fetchall) + file_batch = cursor.fetchmany(FILE_BATCH_SIZE) + if not file_batch: + break - batch_number += 1 - batch_chunks_with_paths = [] - files_in_batch_with_chunks = set() + batch_number += 1 + batch_chunks_with_paths = [] + files_in_batch_with_chunks = set() - # Periodic embedder recreation to prevent memory accumulation - if batch_number % EMBEDDER_RECREATION_INTERVAL == 0: + # Periodic embedder recreation to prevent memory accumulation + if batch_number % EMBEDDER_RECREATION_INTERVAL == 0: + if progress_callback: + progress_callback(f" [Memory optimization] Recreating embedder at batch {batch_number}") + clear_embedder_cache() + embedder = get_embedder(profile=model_profile) + gc.collect() + + # Step 1: Chunking for the current file batch + for file_row in file_batch: + file_path = file_row[path_column] + content = file_row["content"] + language = file_row["language"] or "python" + + try: + chunks = chunker.chunk_sliding_window( + content, + file_path=file_path, + language=language + ) + if chunks: + for chunk in chunks: + batch_chunks_with_paths.append((chunk, file_path)) + files_in_batch_with_chunks.add(file_path) + except Exception as e: + logger.error(f"Failed to chunk {file_path}: {e}") + failed_files.append((file_path, str(e))) + + if not batch_chunks_with_paths: + continue + + batch_chunk_count = len(batch_chunks_with_paths) if progress_callback: - progress_callback(f" [Memory optimization] Recreating embedder at batch {batch_number}") - clear_embedder_cache() - embedder = get_embedder(profile=model_profile) + progress_callback(f" Batch {batch_number}: {len(file_batch)} files, {batch_chunk_count} chunks") + + # Step 2: Generate embeddings for this batch (use memory-efficient numpy method) + batch_embeddings = [] + try: + for i in range(0, batch_chunk_count, EMBEDDING_BATCH_SIZE): + batch_end = min(i + EMBEDDING_BATCH_SIZE, batch_chunk_count) + batch_contents = [chunk.content for chunk, _ in batch_chunks_with_paths[i:batch_end]] + # Use embed_to_numpy() to avoid unnecessary list conversion + embeddings_numpy = embedder.embed_to_numpy(batch_contents) + # Convert to list only for storage (VectorStore expects list format) + embeddings = [emb.tolist() for emb in embeddings_numpy] + batch_embeddings.extend(embeddings) + # Explicit cleanup of intermediate data + del batch_contents, embeddings_numpy + except Exception as e: + logger.error(f"Failed to generate embeddings for batch {batch_number}: {str(e)}") + failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch]) + continue + + # Step 3: Assign embeddings to chunks + for (chunk, _), embedding in zip(batch_chunks_with_paths, batch_embeddings): + chunk.embedding = embedding + + # Step 4: Store this batch to database (ANN update deferred in bulk_insert mode) + try: + vector_store.add_chunks_batch(batch_chunks_with_paths) + total_chunks_created += batch_chunk_count + total_files_processed += len(files_in_batch_with_chunks) + except Exception as e: + logger.error(f"Failed to store batch {batch_number}: {str(e)}") + failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch]) + + # Explicit memory cleanup after each batch + del batch_chunks_with_paths, batch_embeddings gc.collect() - # Step 1: Chunking for the current file batch - for file_row in file_batch: - file_path = file_row[path_column] - content = file_row["content"] - language = file_row["language"] or "python" - - try: - chunks = chunker.chunk_sliding_window( - content, - file_path=file_path, - language=language - ) - if chunks: - for chunk in chunks: - batch_chunks_with_paths.append((chunk, file_path)) - files_in_batch_with_chunks.add(file_path) - except Exception as e: - logger.error(f"Failed to chunk {file_path}: {e}") - failed_files.append((file_path, str(e))) - - if not batch_chunks_with_paths: - continue - - batch_chunk_count = len(batch_chunks_with_paths) - if progress_callback: - progress_callback(f" Batch {batch_number}: {len(file_batch)} files, {batch_chunk_count} chunks") - - # Step 2: Generate embeddings for this batch (use memory-efficient numpy method) - batch_embeddings = [] - try: - for i in range(0, batch_chunk_count, EMBEDDING_BATCH_SIZE): - batch_end = min(i + EMBEDDING_BATCH_SIZE, batch_chunk_count) - batch_contents = [chunk.content for chunk, _ in batch_chunks_with_paths[i:batch_end]] - # Use embed_to_numpy() to avoid unnecessary list conversion - embeddings_numpy = embedder.embed_to_numpy(batch_contents) - # Convert to list only for storage (VectorStore expects list format) - embeddings = [emb.tolist() for emb in embeddings_numpy] - batch_embeddings.extend(embeddings) - # Explicit cleanup of intermediate data - del batch_contents, embeddings_numpy - except Exception as e: - logger.error(f"Failed to generate embeddings for batch {batch_number}: {str(e)}") - failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch]) - continue - - # Step 3: Assign embeddings to chunks - for (chunk, _), embedding in zip(batch_chunks_with_paths, batch_embeddings): - chunk.embedding = embedding - - # Step 4: Store this batch to database immediately (releases memory) - try: - vector_store.add_chunks_batch(batch_chunks_with_paths) - total_chunks_created += batch_chunk_count - total_files_processed += len(files_in_batch_with_chunks) - except Exception as e: - logger.error(f"Failed to store batch {batch_number}: {str(e)}") - failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch]) - - # Explicit memory cleanup after each batch - del batch_chunks_with_paths, batch_embeddings - gc.collect() + # Notify before ANN index finalization (happens when bulk_insert context exits) + if progress_callback: + progress_callback(f"Finalizing index... Building ANN index for {total_chunks_created} chunks") except Exception as e: return {"success": False, "error": f"Failed to read or process files: {str(e)}"}