refactor: 优化嵌入生成过程,调整批处理大小和内存管理策略 (refactor: optimize the embedding-generation process; adjust batch size and memory-management strategy)

This commit is contained in:
catlog22
2025-12-21 23:37:34 +08:00
parent 210f0f1012
commit fa64e11a77
2 changed files with 15 additions and 21 deletions

View File

@@ -410,9 +410,8 @@ function parseProgressLine(line: string): ProgressInfo | null {
if (line.includes('Generating embeddings') || line.includes('Creating embeddings')) {
return { stage: 'embeddings', message: 'Generating embeddings...', percent: 70 };
}
if (line.includes('Finalizing') || line.includes('Complete')) {
return { stage: 'complete', message: 'Finalizing...', percent: 95 };
}
// Note: "Finalizing index" and "Building ANN" are handled separately below
// Only match generic "Complete" here (not "Finalizing" which has specific handlers)
// Parse indexed count: "Indexed X files" - FTS complete, but embeddings may follow
const indexedMatch = line.match(/Indexed (\d+) files/i);
@@ -460,6 +459,11 @@ function parseProgressLine(line: string): ProgressInfo | null {
};
}
// Parse generic completion (but not "Embeddings complete" which is handled above)
if (line.includes('Complete') && !line.toLowerCase().includes('embeddings complete')) {
return { stage: 'complete', message: 'Complete', percent: 98 };
}
return null;
}

View File

@@ -18,8 +18,9 @@ except ImportError:
logger = logging.getLogger(__name__)
# Periodic embedder recreation interval to prevent memory accumulation
EMBEDDER_RECREATION_INTERVAL = 10 # Recreate embedder every N batches
# Embedding batch size - larger values improve throughput on modern hardware
# Default 64 balances memory usage and GPU/CPU utilization
EMBEDDING_BATCH_SIZE = 64 # Increased from 8 for better performance
def _get_path_column(conn: sqlite3.Connection) -> str:
@@ -196,13 +197,12 @@ def generate_embeddings(
# Initialize components
try:
# Initialize embedder (will be periodically recreated to prevent memory leaks)
# Initialize embedder (singleton, reused throughout the function)
embedder = get_embedder(profile=model_profile)
chunker = Chunker(config=ChunkConfig(max_chunk_size=chunk_size))
if progress_callback:
progress_callback(f"Using model: {embedder.model_name} ({embedder.embedding_dim} dimensions)")
progress_callback(f"Memory optimization: Embedder will be recreated every {EMBEDDER_RECREATION_INTERVAL} batches")
except Exception as e:
return {
@@ -210,15 +210,14 @@ def generate_embeddings(
"error": f"Failed to initialize components: {str(e)}",
}
# --- MEMORY-OPTIMIZED STREAMING PROCESSING ---
# Process files in small batches to control memory usage
# This keeps peak memory under 2GB regardless of project size
# --- STREAMING PROCESSING ---
# Process files in batches to control memory usage
start_time = time.time()
failed_files = []
total_chunks_created = 0
total_files_processed = 0
FILE_BATCH_SIZE = 100 # Process 100 files at a time
EMBEDDING_BATCH_SIZE = 8 # jina-embeddings-v2-base-code needs small batches
# EMBEDDING_BATCH_SIZE is defined at module level (default: 64)
try:
with VectorStore(index_path) as vector_store:
@@ -251,14 +250,6 @@ def generate_embeddings(
batch_chunks_with_paths = []
files_in_batch_with_chunks = set()
# Periodic embedder recreation to prevent memory accumulation
if batch_number % EMBEDDER_RECREATION_INTERVAL == 0:
if progress_callback:
progress_callback(f" [Memory optimization] Recreating embedder at batch {batch_number}")
clear_embedder_cache()
embedder = get_embedder(profile=model_profile)
gc.collect()
# Step 1: Chunking for the current file batch
for file_row in file_batch:
file_path = file_row[path_column]
@@ -317,9 +308,8 @@ def generate_embeddings(
logger.error(f"Failed to store batch {batch_number}: {str(e)}")
failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch])
# Explicit memory cleanup after each batch
# Release batch references (let Python GC handle cleanup naturally)
del batch_chunks_with_paths, batch_embeddings
gc.collect()
# Notify before ANN index finalization (happens when bulk_insert context exits)
if progress_callback: