From fc4a9af0cb733e5fc366a478044cc3466784c0ef Mon Sep 17 00:00:00 2001
From: catlog22
Date: Sun, 21 Dec 2025 23:47:29 +0800
Subject: [PATCH] feat: introduce a streaming generator to reduce memory usage
 and improve embedding generation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../src/codexlens/cli/embedding_manager.py | 158 +++++++++++-------
 1 file changed, 95 insertions(+), 63 deletions(-)

diff --git a/codex-lens/src/codexlens/cli/embedding_manager.py b/codex-lens/src/codexlens/cli/embedding_manager.py
index 03263325..514fe74e 100644
--- a/codex-lens/src/codexlens/cli/embedding_manager.py
+++ b/codex-lens/src/codexlens/cli/embedding_manager.py
@@ -4,8 +4,9 @@
 import gc
 import logging
 import sqlite3
 import time
+from itertools import islice
 from pathlib import Path
-from typing import Dict, List, Optional
+from typing import Dict, Generator, List, Optional, Tuple
 try:
     from codexlens.semantic import SEMANTIC_AVAILABLE
@@ -23,6 +24,63 @@
 logger = logging.getLogger(__name__)

 EMBEDDING_BATCH_SIZE = 64  # Increased from 8 for better performance

+def _generate_chunks_from_cursor(
+    cursor,
+    chunker,
+    path_column: str,
+    file_batch_size: int,
+    failed_files: List[Tuple[str, str]],
+) -> Generator[Tuple, None, Tuple[int, int]]:
+    """Generator that yields chunks from a database cursor in a streaming fashion.
+
+    This avoids loading all chunks into memory at once, significantly reducing
+    peak memory usage for large codebases.
+
+    Args:
+        cursor: SQLite cursor with file data
+        chunker: Chunker instance for splitting files
+        path_column: Column name for the file path
+        file_batch_size: Number of files to fetch at a time
+        failed_files: List to append failed files to
+
+    Yields:
+        (chunk, file_path) tuples
+
+    Returns:
+        (total_files_processed, batch_count) after iteration completes
+    """
+    total_files = 0
+    batch_count = 0
+
+    while True:
+        file_batch = cursor.fetchmany(file_batch_size)
+        if not file_batch:
+            break
+
+        batch_count += 1
+
+        for file_row in file_batch:
+            file_path = file_row[path_column]
+            content = file_row["content"]
+            language = file_row["language"] or "python"
+
+            try:
+                chunks = chunker.chunk_sliding_window(
+                    content,
+                    file_path=file_path,
+                    language=language
+                )
+                if chunks:
+                    total_files += 1
+                    for chunk in chunks:
+                        yield (chunk, file_path)
+            except Exception as e:
+                logger.error(f"Failed to chunk {file_path}: {e}")
+                failed_files.append((file_path, str(e)))
+
+    return total_files, batch_count  # exposed to callers via StopIteration.value / "yield from"
+
+
 def _get_path_column(conn: sqlite3.Connection) -> str:
     """Detect whether files table uses 'path' or 'full_path' column.
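The heart of the hunk above is a producer generator that is later drained in
fixed-size slices, so at most one embedding batch is materialized at a time. A
minimal, self-contained sketch of the same pattern, with hypothetical data and
stand-in helpers that are not part of codexlens:

    from itertools import islice

    def produce_chunks(rows, file_batch_size=2):
        """Yield (chunk, path) pairs lazily; nothing is accumulated."""
        it = iter(rows)
        while True:
            file_batch = list(islice(it, file_batch_size))  # stand-in for cursor.fetchmany()
            if not file_batch:
                break
            for path, content in file_batch:
                for chunk in content.splitlines():  # stand-in for the real chunker
                    yield chunk, path

    rows = [("a.py", "x = 1\ny = 2"), ("b.py", "z = 3")]
    gen = produce_chunks(rows)
    while True:
        chunk_batch = list(islice(gen, 2))  # EMBEDDING_BATCH_SIZE stand-in
        if not chunk_batch:
            break
        print(chunk_batch)  # embedding and storage would happen here

Peak memory is bounded by one chunk batch rather than by all chunks of a whole
file batch, which is what the consumer loop in the hunks below relies on.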
@@ -199,7 +257,9 @@ def generate_embeddings(
     try:
         # Initialize embedder (singleton, reused throughout the function)
         embedder = get_embedder(profile=model_profile)
-        chunker = Chunker(config=ChunkConfig(max_chunk_size=chunk_size))
+        # skip_token_count=True: use fast estimation (len/4) instead of expensive tiktoken.
+        # This significantly reduces CPU usage with minimal impact on metadata accuracy.
+        chunker = Chunker(config=ChunkConfig(max_chunk_size=chunk_size, skip_token_count=True))

         if progress_callback:
             progress_callback(f"Using model: {embedder.model_name} ({embedder.embedding_dim} dimensions)")
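The skip_token_count comment above refers to a constant-time length heuristic
standing in for exact tokenization. A rough sketch of that trade-off; both
function names are illustrative, and the tiktoken encoding choice is an
assumption rather than what codexlens actually uses:

    def estimate_tokens(text: str) -> int:
        """Fast approximation: roughly 4 characters per token for code and English."""
        return max(1, len(text) // 4)

    def exact_tokens(text: str) -> int:
        """Exact count via tiktoken (optional dependency, far slower per call)."""
        import tiktoken
        return len(tiktoken.get_encoding("cl100k_base").encode(text))

    sample = "def add(a, b):\n    return a + b\n"
    print(estimate_tokens(sample))  # cheap path, as when skip_token_count=True

Since the count feeds chunk metadata rather than the embedding input itself
(per the comment above), an estimate that is off by a few tokens is an
acceptable trade for the CPU savings.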
@@ -238,79 +298,51 @@
             progress_callback(f"Processing {total_files} files for embeddings in batches of {FILE_BATCH_SIZE}...")

         cursor = conn.execute(f"SELECT {path_column}, content, language FROM files")
+
+        # --- STREAMING GENERATOR APPROACH ---
+        # Instead of accumulating every chunk from a FILE_BATCH_SIZE batch of files,
+        # use a generator that yields chunks on demand, keeping memory usage low and constant.
+        chunk_generator = _generate_chunks_from_cursor(
+            cursor, chunker, path_column, FILE_BATCH_SIZE, failed_files
+        )
+
         batch_number = 0
+        files_seen = set()

         while True:
-            # Fetch a batch of files (streaming, not fetchall)
-            file_batch = cursor.fetchmany(FILE_BATCH_SIZE)
-            if not file_batch:
+            # Pull a small batch of chunks from the generator (EMBEDDING_BATCH_SIZE at a time)
+            chunk_batch = list(islice(chunk_generator, EMBEDDING_BATCH_SIZE))
+            if not chunk_batch:
                 break

             batch_number += 1
-            batch_chunks_with_paths = []
-            files_in_batch_with_chunks = set()

-            # Step 1: Chunking for the current file batch
-            for file_row in file_batch:
-                file_path = file_row[path_column]
-                content = file_row["content"]
-                language = file_row["language"] or "python"
+            # Track unique files for progress reporting
+            for _, file_path in chunk_batch:
+                files_seen.add(file_path)

-                try:
-                    chunks = chunker.chunk_sliding_window(
-                        content,
-                        file_path=file_path,
-                        language=language
-                    )
-                    if chunks:
-                        for chunk in chunks:
-                            batch_chunks_with_paths.append((chunk, file_path))
-                        files_in_batch_with_chunks.add(file_path)
-                except Exception as e:
-                    logger.error(f"Failed to chunk {file_path}: {e}")
-                    failed_files.append((file_path, str(e)))
-
-            if not batch_chunks_with_paths:
-                continue
-
-            batch_chunk_count = len(batch_chunks_with_paths)
-            if progress_callback:
-                progress_callback(f"  Batch {batch_number}: {len(file_batch)} files, {batch_chunk_count} chunks")
-
-            # Step 2: Generate embeddings for this batch (use memory-efficient numpy method)
-            batch_embeddings = []
+            # Generate embeddings directly as numpy arrays (no tolist() conversion)
             try:
-                for i in range(0, batch_chunk_count, EMBEDDING_BATCH_SIZE):
-                    batch_end = min(i + EMBEDDING_BATCH_SIZE, batch_chunk_count)
-                    batch_contents = [chunk.content for chunk, _ in batch_chunks_with_paths[i:batch_end]]
-                    # Use embed_to_numpy() to avoid unnecessary list conversion
-                    embeddings_numpy = embedder.embed_to_numpy(batch_contents)
-                    # Convert to list only for storage (VectorStore expects list format)
-                    embeddings = [emb.tolist() for emb in embeddings_numpy]
-                    batch_embeddings.extend(embeddings)
-                    # Explicit cleanup of intermediate data
-                    del batch_contents, embeddings_numpy
+                batch_contents = [chunk.content for chunk, _ in chunk_batch]
+                embeddings_numpy = embedder.embed_to_numpy(batch_contents)
+
+                # Use add_chunks_batch_numpy to avoid a numpy -> list -> numpy roundtrip
+                vector_store.add_chunks_batch_numpy(chunk_batch, embeddings_numpy)
+
+                total_chunks_created += len(chunk_batch)
+                total_files_processed = len(files_seen)
+
+                if progress_callback and batch_number % 10 == 0:
+                    progress_callback(f"  Batch {batch_number}: {total_chunks_created} chunks, {total_files_processed} files")
+
+                # Cleanup intermediate data
+                del batch_contents, embeddings_numpy, chunk_batch
+
             except Exception as e:
-                logger.error(f"Failed to generate embeddings for batch {batch_number}: {str(e)}")
-                failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch])
+                logger.error(f"Failed to process embedding batch {batch_number}: {str(e)}")
+                # Continue with the next batch instead of failing the whole run
                 continue

-            # Step 3: Assign embeddings to chunks
-            for (chunk, _), embedding in zip(batch_chunks_with_paths, batch_embeddings):
-                chunk.embedding = embedding
-
-            # Step 4: Store this batch to database (ANN update deferred in bulk_insert mode)
-            try:
-                vector_store.add_chunks_batch(batch_chunks_with_paths)
-                total_chunks_created += batch_chunk_count
-                total_files_processed += len(files_in_batch_with_chunks)
-            except Exception as e:
-                logger.error(f"Failed to store batch {batch_number}: {str(e)}")
-                failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch])
-
-            # Release batch references (let Python GC handle cleanup naturally)
-            del batch_chunks_with_paths, batch_embeddings
-
         # Notify before ANN index finalization (happens when bulk_insert context exits)
         if progress_callback:
             progress_callback(f"Finalizing index... Building ANN index for {total_chunks_created} chunks")
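For context on the add_chunks_batch_numpy call above: the replaced code
converted every embedding to a Python list before storage; keeping float32
rows as numpy arrays and serializing them directly avoids that roundtrip. A
minimal sketch of the storage idea under an assumed BLOB schema; the actual
VectorStore layout is defined elsewhere in codexlens:

    import sqlite3
    import numpy as np

    def store_embeddings(conn, chunk_ids, matrix):
        """Persist each float32 row as raw bytes, with no tolist() detour."""
        assert matrix.dtype == np.float32
        conn.executemany(
            "INSERT INTO embeddings (chunk_id, vector) VALUES (?, ?)",
            ((cid, row.tobytes()) for cid, row in zip(chunk_ids, matrix)),
        )

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE embeddings (chunk_id INTEGER, vector BLOB)")
    store_embeddings(conn, [1, 2], np.random.rand(2, 4).astype(np.float32))
    blob = conn.execute("SELECT vector FROM embeddings").fetchone()[0]
    print(np.frombuffer(blob, dtype=np.float32))  # lossless float32 roundtrip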