feat: introduce a streaming generator to optimize memory usage and improve the embedding generation process

catlog22
2025-12-21 23:47:29 +08:00
parent fa64e11a77
commit fc4a9af0cb


@@ -4,8 +4,9 @@ import gc
 import logging
 import sqlite3
 import time
+from itertools import islice
 from pathlib import Path
-from typing import Dict, List, Optional
+from typing import Dict, Generator, List, Optional, Tuple
 
 try:
     from codexlens.semantic import SEMANTIC_AVAILABLE
@@ -23,6 +24,61 @@ logger = logging.getLogger(__name__)
 
 EMBEDDING_BATCH_SIZE = 64  # Increased from 8 for better performance
 
+
+def _generate_chunks_from_cursor(
+    cursor,
+    chunker,
+    path_column: str,
+    file_batch_size: int,
+    failed_files: List[Tuple[str, str]],
+) -> Generator[Tuple, None, Tuple[int, int]]:
+    """Generator that yields chunks from a database cursor in a streaming fashion.
+
+    This avoids loading all chunks into memory at once, significantly reducing
+    peak memory usage for large codebases.
+
+    Args:
+        cursor: SQLite cursor with file data
+        chunker: Chunker instance for splitting files
+        path_column: Column name for the file path
+        file_batch_size: Number of files to fetch at a time
+        failed_files: List to append failed files to
+
+    Yields:
+        (chunk, file_path) tuples
+
+    Returns:
+        (total_files_processed, batch_count) after iteration completes
+    """
+    total_files = 0
+    batch_count = 0
+    while True:
+        file_batch = cursor.fetchmany(file_batch_size)
+        if not file_batch:
+            break
+        batch_count += 1
+        for file_row in file_batch:
+            file_path = file_row[path_column]
+            content = file_row["content"]
+            language = file_row["language"] or "python"
+            try:
+                chunks = chunker.chunk_sliding_window(
+                    content,
+                    file_path=file_path,
+                    language=language
+                )
+                if chunks:
+                    total_files += 1
+                    for chunk in chunks:
+                        yield (chunk, file_path)
+            except Exception as e:
+                logger.error(f"Failed to chunk {file_path}: {e}")
+                failed_files.append((file_path, str(e)))
+    return total_files, batch_count
+
+
 def _get_path_column(conn: sqlite3.Connection) -> str:
     """Detect whether files table uses 'path' or 'full_path' column.
@@ -199,7 +255,9 @@ def generate_embeddings(
     try:
         # Initialize embedder (singleton, reused throughout the function)
         embedder = get_embedder(profile=model_profile)
-        chunker = Chunker(config=ChunkConfig(max_chunk_size=chunk_size))
+        # skip_token_count=True: use fast estimation (len/4) instead of expensive tiktoken.
+        # This significantly reduces CPU usage with minimal impact on metadata accuracy.
+        chunker = Chunker(config=ChunkConfig(max_chunk_size=chunk_size, skip_token_count=True))
 
         if progress_callback:
             progress_callback(f"Using model: {embedder.model_name} ({embedder.embedding_dim} dimensions)")
@@ -238,79 +296,51 @@
             progress_callback(f"Processing {total_files} files for embeddings in batches of {FILE_BATCH_SIZE}...")
 
         cursor = conn.execute(f"SELECT {path_column}, content, language FROM files")
 
+        # --- STREAMING GENERATOR APPROACH ---
+        # Instead of accumulating all chunks from 100 files, we use a generator
+        # that yields chunks on-demand, keeping memory usage low and constant.
+        chunk_generator = _generate_chunks_from_cursor(
+            cursor, chunker, path_column, FILE_BATCH_SIZE, failed_files
+        )
+
         batch_number = 0
+        files_seen = set()
         while True:
-            # Fetch a batch of files (streaming, not fetchall)
-            file_batch = cursor.fetchmany(FILE_BATCH_SIZE)
-            if not file_batch:
+            # Get a small batch of chunks from the generator (EMBEDDING_BATCH_SIZE at a time)
+            chunk_batch = list(islice(chunk_generator, EMBEDDING_BATCH_SIZE))
+            if not chunk_batch:
                 break
             batch_number += 1
 
-            batch_chunks_with_paths = []
-            files_in_batch_with_chunks = set()
-
-            # Step 1: Chunking for the current file batch
-            for file_row in file_batch:
-                file_path = file_row[path_column]
-                content = file_row["content"]
-                language = file_row["language"] or "python"
-                try:
-                    chunks = chunker.chunk_sliding_window(
-                        content,
-                        file_path=file_path,
-                        language=language
-                    )
-                    if chunks:
-                        for chunk in chunks:
-                            batch_chunks_with_paths.append((chunk, file_path))
-                        files_in_batch_with_chunks.add(file_path)
-                except Exception as e:
-                    logger.error(f"Failed to chunk {file_path}: {e}")
-                    failed_files.append((file_path, str(e)))
-
-            if not batch_chunks_with_paths:
-                continue
-
-            batch_chunk_count = len(batch_chunks_with_paths)
-            if progress_callback:
-                progress_callback(f"  Batch {batch_number}: {len(file_batch)} files, {batch_chunk_count} chunks")
-
-            # Step 2: Generate embeddings for this batch (use memory-efficient numpy method)
-            batch_embeddings = []
-            try:
-                for i in range(0, batch_chunk_count, EMBEDDING_BATCH_SIZE):
-                    batch_end = min(i + EMBEDDING_BATCH_SIZE, batch_chunk_count)
-                    batch_contents = [chunk.content for chunk, _ in batch_chunks_with_paths[i:batch_end]]
-                    # Use embed_to_numpy() to avoid unnecessary list conversion
-                    embeddings_numpy = embedder.embed_to_numpy(batch_contents)
-                    # Convert to list only for storage (VectorStore expects list format)
-                    embeddings = [emb.tolist() for emb in embeddings_numpy]
-                    batch_embeddings.extend(embeddings)
-                    # Explicit cleanup of intermediate data
-                    del batch_contents, embeddings_numpy
-            except Exception as e:
-                logger.error(f"Failed to generate embeddings for batch {batch_number}: {str(e)}")
-                failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch])
-                continue
-
-            # Step 3: Assign embeddings to chunks
-            for (chunk, _), embedding in zip(batch_chunks_with_paths, batch_embeddings):
-                chunk.embedding = embedding
-
-            # Step 4: Store this batch to database (ANN update deferred in bulk_insert mode)
-            try:
-                vector_store.add_chunks_batch(batch_chunks_with_paths)
-                total_chunks_created += batch_chunk_count
-                total_files_processed += len(files_in_batch_with_chunks)
-            except Exception as e:
-                logger.error(f"Failed to store batch {batch_number}: {str(e)}")
-                failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch])
-
-            # Release batch references (let Python GC handle cleanup naturally)
-            del batch_chunks_with_paths, batch_embeddings
+            # Track unique files for progress
+            for _, file_path in chunk_batch:
+                files_seen.add(file_path)
+
+            # Generate embeddings directly to numpy (no tolist() conversion)
+            try:
+                batch_contents = [chunk.content for chunk, _ in chunk_batch]
+                embeddings_numpy = embedder.embed_to_numpy(batch_contents)
+
+                # Use add_chunks_batch_numpy to avoid numpy->list->numpy roundtrip
+                vector_store.add_chunks_batch_numpy(chunk_batch, embeddings_numpy)
+
+                total_chunks_created += len(chunk_batch)
+                total_files_processed = len(files_seen)
+
+                if progress_callback and batch_number % 10 == 0:
+                    progress_callback(f"  Batch {batch_number}: {total_chunks_created} chunks, {total_files_processed} files")
+
+                # Cleanup intermediate data
+                del batch_contents, embeddings_numpy, chunk_batch
+            except Exception as e:
+                logger.error(f"Failed to process embedding batch {batch_number}: {str(e)}")
+                # Continue to next batch instead of failing entirely
+                continue
 
         # Notify before ANN index finalization (happens when bulk_insert context exits)
         if progress_callback:
             progress_callback(f"Finalizing index... Building ANN index for {total_chunks_created} chunks")