diff --git a/codex-lens/src/codexlens/cli/embedding_manager.py b/codex-lens/src/codexlens/cli/embedding_manager.py
index ac658c19..0fd39dd6 100644
--- a/codex-lens/src/codexlens/cli/embedding_manager.py
+++ b/codex-lens/src/codexlens/cli/embedding_manager.py
@@ -1,5 +1,6 @@
 """Embedding Manager - Manage semantic embeddings for code indexes."""
 
+import gc
 import logging
 import sqlite3
 import time
@@ -9,7 +10,7 @@ from typing import Dict, List, Optional
 try:
     from codexlens.semantic import SEMANTIC_AVAILABLE
     if SEMANTIC_AVAILABLE:
-        from codexlens.semantic.embedder import Embedder, get_embedder
+        from codexlens.semantic.embedder import Embedder, get_embedder, clear_embedder_cache
         from codexlens.semantic.vector_store import VectorStore
         from codexlens.semantic.chunker import Chunker, ChunkConfig
 except ImportError:
@@ -17,6 +18,9 @@ except ImportError:
 
 logger = logging.getLogger(__name__)
 
+# Periodic embedder recreation interval to prevent memory accumulation
+EMBEDDER_RECREATION_INTERVAL = 10  # Recreate embedder every N batches
+
 
 def _get_path_column(conn: sqlite3.Connection) -> str:
     """Detect whether files table uses 'path' or 'full_path' column.
@@ -192,12 +196,13 @@
 
     # Initialize components
     try:
-        # Use cached embedder (singleton) for performance
+        # Initialize embedder (will be periodically recreated to prevent memory leaks)
        embedder = get_embedder(profile=model_profile)
        chunker = Chunker(config=ChunkConfig(max_chunk_size=chunk_size))

        if progress_callback:
            progress_callback(f"Using model: {embedder.model_name} ({embedder.embedding_dim} dimensions)")
+            progress_callback(f"Memory optimization: Embedder will be recreated every {EMBEDDER_RECREATION_INTERVAL} batches")

    except Exception as e:
        return {
@@ -242,6 +247,14 @@
            batch_chunks_with_paths = []
            files_in_batch_with_chunks = set()

+            # Periodic embedder recreation to prevent memory accumulation
+            if batch_number % EMBEDDER_RECREATION_INTERVAL == 0:
+                if progress_callback:
+                    progress_callback(f"  [Memory optimization] Recreating embedder at batch {batch_number}")
+                clear_embedder_cache()
+                embedder = get_embedder(profile=model_profile)
+                gc.collect()
+
            # Step 1: Chunking for the current file batch
            for file_row in file_batch:
                file_path = file_row[path_column]
@@ -269,14 +282,19 @@
            if progress_callback:
                progress_callback(f"  Batch {batch_number}: {len(file_batch)} files, {batch_chunk_count} chunks")

-            # Step 2: Generate embeddings for this batch
+            # Step 2: Generate embeddings for this batch (use memory-efficient numpy method)
            batch_embeddings = []
            try:
                for i in range(0, batch_chunk_count, EMBEDDING_BATCH_SIZE):
                    batch_end = min(i + EMBEDDING_BATCH_SIZE, batch_chunk_count)
                    batch_contents = [chunk.content for chunk, _ in batch_chunks_with_paths[i:batch_end]]
+                    # Use embed_to_numpy() so the model output stays in one numpy block
+                    embeddings_numpy = embedder.embed_to_numpy(batch_contents)
+                    # Convert to list only for storage (VectorStore expects list format)
+                    embeddings = [emb.tolist() for emb in embeddings_numpy]
                    batch_embeddings.extend(embeddings)
+                    # Explicit cleanup of intermediate data
+                    del batch_contents, embeddings_numpy
            except Exception as e:
                logger.error(f"Failed to generate embeddings for batch {batch_number}: {str(e)}")
                failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch])
@@ -295,7 +313,9 @@
                logger.error(f"Failed to store batch {batch_number}: {str(e)}")
                failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch])

-            # Memory is released here as batch_chunks_with_paths and batch_embeddings go out of scope
+            # Explicit memory cleanup after each batch
+            del batch_chunks_with_paths, batch_embeddings
+            gc.collect()

    except Exception as e:
        return {"success": False, "error": f"Failed to read or process files: {str(e)}"}
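
The interval above bounds native memory, not just Python-heap memory: the rationale is that the ONNX runtime session held by the embedder keeps its own allocations, which are only returned to the OS once the session itself is dropped. A minimal sketch of the same cadence, assuming only the get_embedder/clear_embedder_cache API from embedder.py further down in this patch; the embed_batches driver and its arguments are illustrative, not part of the patch:

    import gc

    from codexlens.semantic.embedder import clear_embedder_cache, get_embedder

    EMBEDDER_RECREATION_INTERVAL = 10  # mirrors the constant added above

    def embed_batches(batches, model_profile="code"):
        """Illustrative driver: embed text batches, recycling the embedder every N batches."""
        embedder = get_embedder(profile=model_profile)
        results = []
        for batch_number, texts in enumerate(batches, start=1):
            if batch_number % EMBEDDER_RECREATION_INTERVAL == 0:
                # Drop the cached ONNX session so its native buffers can be freed,
                # then rebuild it and prompt the GC to collect the old object graph.
                clear_embedder_cache()
                embedder = get_embedder(profile=model_profile)
                gc.collect()
            results.append(embedder.embed_to_numpy(texts))
        return results
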
diff --git a/codex-lens/src/codexlens/semantic/ann_index.py b/codex-lens/src/codexlens/semantic/ann_index.py
index 90c5fe30..85f0e40d 100644
--- a/codex-lens/src/codexlens/semantic/ann_index.py
+++ b/codex-lens/src/codexlens/semantic/ann_index.py
@@ -13,6 +13,7 @@ Key features:
 
 from __future__ import annotations
 
+import logging
 import threading
 from pathlib import Path
 from typing import List, Optional, Tuple
@@ -24,6 +25,8 @@ from . import SEMANTIC_AVAILABLE
 if SEMANTIC_AVAILABLE:
     import numpy as np
 
+logger = logging.getLogger(__name__)
+
 # Try to import hnswlib (optional dependency)
 try:
     import hnswlib
@@ -48,16 +51,26 @@ class ANNIndex:
     - ef: 50 (search width during query - higher = better recall)
     """
 
-    def __init__(self, index_path: Path, dim: int) -> None:
+    def __init__(
+        self,
+        index_path: Path,
+        dim: int,
+        initial_capacity: int = 50000,
+        auto_save: bool = False,
+        expansion_threshold: float = 0.8,
+    ) -> None:
        """Initialize ANN index.

        Args:
            index_path: Path to SQLite database (index will be saved as _vectors.hnsw)
            dim: Dimension of embedding vectors
+            initial_capacity: Initial maximum elements capacity (default: 50000)
+            auto_save: Whether to automatically save index after operations (default: False)
+            expansion_threshold: Capacity threshold to trigger auto-expansion (default: 0.8)

        Raises:
            ImportError: If required dependencies are not available
-            ValueError: If dimension is invalid
+            ValueError: If dimension or capacity is invalid
        """
        if not SEMANTIC_AVAILABLE:
            raise ImportError(
@@ -74,6 +87,14 @@ class ANNIndex:
        if dim <= 0:
            raise ValueError(f"Invalid dimension: {dim}")

+        if initial_capacity <= 0:
+            raise ValueError(f"Invalid initial capacity: {initial_capacity}")
+
+        if not 0.0 < expansion_threshold < 1.0:
+            raise ValueError(
+                f"Invalid expansion threshold: {expansion_threshold}. Must be between 0 and 1."
+            )
+
        self.index_path = Path(index_path)
        self.dim = dim

@@ -89,14 +110,23 @@
        self.ef_construction = 200  # Build-time search width (higher = better quality)
        self.ef = 50  # Query-time search width (higher = better recall)

+        # Memory management parameters
+        self._auto_save = auto_save
+        self._expansion_threshold = expansion_threshold
+
        # Thread safety
        self._lock = threading.RLock()

        # HNSW index instance
        self._index: Optional[hnswlib.Index] = None
-        self._max_elements = 1000000  # Initial capacity (auto-resizes)
+        self._max_elements = initial_capacity  # Initial capacity (reduced from 1M to 50K)
        self._current_count = 0  # Track number of vectors

+        logger.info(
+            f"Initialized ANNIndex with capacity={initial_capacity}, "
+            f"auto_save={auto_save}, expansion_threshold={expansion_threshold}"
+        )
+
    def _ensure_index(self) -> None:
        """Ensure HNSW index is initialized (lazy initialization)."""
        if self._index is None:
@@ -108,6 +138,33 @@
            )
            self._index.set_ef(self.ef)
            self._current_count = 0
+            logger.debug(f"Created new HNSW index with capacity {self._max_elements}")
+
+    def _auto_expand_if_needed(self, additional_count: int) -> None:
+        """Auto-expand index capacity if threshold is reached.
+
+        Args:
+            additional_count: Number of vectors to be added
+
+        Note:
+            This is called internally by add_vectors and is thread-safe.
+        """
+        usage_ratio = (self._current_count + additional_count) / self._max_elements
+
+        if usage_ratio >= self._expansion_threshold:
+            # Calculate new capacity (2x current or enough to fit new vectors)
+            new_capacity = max(
+                self._max_elements * 2,
+                self._current_count + additional_count,
+            )
+
+            logger.info(
+                f"Expanding index capacity: {self._max_elements} -> {new_capacity} "
+                f"(usage: {usage_ratio:.1%}, threshold: {self._expansion_threshold:.1%})"
+            )
+
+            self._index.resize_index(new_capacity)
+            self._max_elements = new_capacity
 
     def add_vectors(self, ids: List[int], vectors: np.ndarray) -> None:
         """Add vectors to the index.
@@ -137,14 +194,8 @@ class ANNIndex:
         try:
             self._ensure_index()
 
-            # Resize index if needed
-            if self._current_count + len(ids) > self._max_elements:
-                new_max = max(
-                    self._max_elements * 2,
-                    self._current_count + len(ids)
-                )
-                self._index.resize_index(new_max)
-                self._max_elements = new_max
+            # Auto-expand if threshold reached
+            self._auto_expand_if_needed(len(ids))

            # Ensure vectors are C-contiguous float32 (hnswlib requirement)
            if not vectors.flags['C_CONTIGUOUS'] or vectors.dtype != np.float32:
@@ -154,6 +205,15 @@
            self._index.add_items(vectors, ids)
            self._current_count += len(ids)

+            logger.debug(
+                f"Added {len(ids)} vectors to index "
+                f"(total: {self._current_count}/{self._max_elements})"
+            )
+
+            # Auto-save if enabled
+            if self._auto_save:
+                self.save()
+
        except Exception as e:
            raise StorageError(f"Failed to add vectors to ANN index: {e}")

@@ -178,13 +238,21 @@
                return  # Nothing to remove

            # Mark vectors as deleted
+            deleted_count = 0
            for vec_id in ids:
                try:
                    self._index.mark_deleted(vec_id)
+                    deleted_count += 1
                except RuntimeError:
                    # ID not found - ignore (idempotent deletion)
                    pass

+            logger.debug(f"Marked {deleted_count}/{len(ids)} vectors as deleted")
+
+            # Auto-save if enabled
+            if self._auto_save and deleted_count > 0:
+                self.save()
+
        except Exception as e:
            raise StorageError(f"Failed to remove vectors from ANN index: {e}")

@@ -248,6 +316,7 @@
        with self._lock:
            try:
                if self._index is None or self._current_count == 0:
+                    logger.debug("Skipping save: index is empty")
                    return  # Nothing to save

                # Ensure parent directory exists
@@ -256,6 +325,11 @@
                # Save index
                self._index.save_index(str(self.hnsw_path))

+                logger.debug(
+                    f"Saved index to {self.hnsw_path} "
+                    f"({self._current_count} vectors, capacity: {self._max_elements})"
+                )
+
            except Exception as e:
                raise StorageError(f"Failed to save ANN index: {e}")

@@ -271,20 +345,28 @@
        with self._lock:
            try:
                if not self.hnsw_path.exists():
+                    logger.debug(f"Index file not found: {self.hnsw_path}")
                    return False  # Index file doesn't exist (not an error)

                # Create fresh index object for loading (don't call init_index first)
                self._index = hnswlib.Index(space=self.space, dim=self.dim)

                # Load index from disk
+                # Note: max_elements here is just for initial allocation, can expand later
                self._index.load_index(str(self.hnsw_path), max_elements=self._max_elements)

-                # Update count from loaded index
+                # Update count and capacity from loaded index
                self._current_count = self._index.get_current_count()
+                self._max_elements = self._index.get_max_elements()

                # Set query-time ef parameter
                self._index.set_ef(self.ef)

+                logger.info(
+                    f"Loaded index from {self.hnsw_path} "
+                    f"({self._current_count} vectors, capacity: {self._max_elements})"
+                )
+
                return True

            except Exception as e:
@@ -299,6 +381,28 @@
        with self._lock:
            return self._current_count

+    @property
+    def capacity(self) -> int:
+        """Get current maximum capacity of the index.
+
+        Returns:
+            Maximum number of vectors the index can hold before expansion
+        """
+        with self._lock:
+            return self._max_elements
+
+    @property
+    def usage_ratio(self) -> float:
+        """Get current usage ratio (count / capacity).
+
+        Returns:
+            Usage ratio between 0.0 and 1.0
+        """
+        with self._lock:
+            if self._max_elements == 0:
+                return 0.0
+            return self._current_count / self._max_elements
+
    @property
    def is_loaded(self) -> bool:
        """Check if index is loaded and ready for use.
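
The expansion math above is observable through the new count, capacity, and usage_ratio properties. A short sketch, assuming hnswlib and numpy are installed; the path, dimension, and sizes are placeholders:

    from pathlib import Path

    import numpy as np

    from codexlens.semantic.ann_index import ANNIndex

    index = ANNIndex(Path("demo.db"), dim=384, initial_capacity=1000,
                     auto_save=False, expansion_threshold=0.8)

    # (0 + 900) / 1000 = 0.9 >= 0.8, so add_vectors expands capacity to
    # max(2 * 1000, 900) = 2000 before inserting, instead of overflowing.
    vectors = np.random.rand(900, 384).astype(np.float32)
    index.add_vectors(list(range(900)), vectors)

    print(index.count, index.capacity, f"{index.usage_ratio:.1%}")  # 900 2000 45.0%
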
{self._max_elements})" + ) + return True except Exception as e: @@ -299,6 +381,28 @@ class ANNIndex: with self._lock: return self._current_count + @property + def capacity(self) -> int: + """Get current maximum capacity of the index. + + Returns: + Maximum number of vectors the index can hold before expansion + """ + with self._lock: + return self._max_elements + + @property + def usage_ratio(self) -> float: + """Get current usage ratio (count / capacity). + + Returns: + Usage ratio between 0.0 and 1.0 + """ + with self._lock: + if self._max_elements == 0: + return 0.0 + return self._current_count / self._max_elements + @property def is_loaded(self) -> bool: """Check if index is loaded and ready for use. diff --git a/codex-lens/src/codexlens/semantic/chunker.py b/codex-lens/src/codexlens/semantic/chunker.py index a1df4686..8846ebca 100644 --- a/codex-lens/src/codexlens/semantic/chunker.py +++ b/codex-lens/src/codexlens/semantic/chunker.py @@ -1,4 +1,29 @@ -"""Code chunking strategies for semantic search.""" +"""Code chunking strategies for semantic search. + +This module provides various chunking strategies for breaking down source code +into semantic chunks suitable for embedding and search. + +Lightweight Mode: + The ChunkConfig supports a `skip_token_count` option for performance optimization. + When enabled, token counting uses a fast character-based estimation (char/4) + instead of expensive tiktoken encoding. + + Use cases for lightweight mode: + - Large-scale indexing where speed is critical + - Scenarios where approximate token counts are acceptable + - Memory-constrained environments + - Initial prototyping and development + + Example: + # Default mode (accurate tiktoken encoding) + config = ChunkConfig() + chunker = Chunker(config) + + # Lightweight mode (fast char/4 estimation) + config = ChunkConfig(skip_token_count=True) + chunker = Chunker(config) + chunks = chunker.chunk_file(content, symbols, path, language) +""" from __future__ import annotations @@ -17,6 +42,7 @@ class ChunkConfig: overlap: int = 100 # Overlap for sliding window strategy: str = "auto" # Chunking strategy: auto, symbol, sliding_window, hybrid min_chunk_size: int = 50 # Minimum chunk size + skip_token_count: bool = False # Skip expensive token counting (use char/4 estimate) class Chunker: @@ -26,6 +52,23 @@ class Chunker: self.config = config or ChunkConfig() self._tokenizer = get_default_tokenizer() + def _estimate_token_count(self, text: str) -> int: + """Estimate token count based on config. + + If skip_token_count is True, uses character-based estimation (char/4). + Otherwise, uses accurate tiktoken encoding. 
diff --git a/codex-lens/src/codexlens/semantic/embedder.py b/codex-lens/src/codexlens/semantic/embedder.py
index bd3a903c..2cf20236 100644
--- a/codex-lens/src/codexlens/semantic/embedder.py
+++ b/codex-lens/src/codexlens/semantic/embedder.py
@@ -2,9 +2,12 @@
 
 from __future__ import annotations
 
+import gc
 import threading
 from typing import Dict, Iterable, List, Optional
 
+import numpy as np
+
 from . import SEMANTIC_AVAILABLE
 
 
@@ -47,10 +50,20 @@ def get_embedder(profile: str = "code") -> "Embedder":
 
 
 def clear_embedder_cache() -> None:
-    """Clear the embedder cache (useful for testing or memory management)."""
+    """Clear the embedder cache and release ONNX resources.
+
+    This function ensures proper cleanup of ONNX model resources to prevent
+    memory leaks when embedders are no longer needed.
+    """
     global _embedder_cache
     with _cache_lock:
+        # Release ONNX resources before clearing cache
+        for embedder in _embedder_cache.values():
+            if embedder._model is not None:
+                del embedder._model
+                embedder._model = None
        _embedder_cache.clear()
+        gc.collect()


class Embedder:
@@ -128,6 +141,10 @@ class Embedder:

        Returns:
            List of embedding vectors (each is a list of floats).
+
+        Note:
+            This method converts numpy arrays to Python lists for backward compatibility.
+            For memory-efficient processing, use embed_to_numpy() instead.
        """
        self._load_model()

@@ -139,6 +156,30 @@ class Embedder:
        embeddings = list(self._model.embed(texts))
        return [emb.tolist() for emb in embeddings]

+    def embed_to_numpy(self, texts: str | Iterable[str]) -> np.ndarray:
+        """Generate embeddings for one or more texts (returns numpy arrays).
+
+        This method is more memory-efficient than embed() as it avoids converting
+        numpy arrays to Python lists, which can significantly reduce memory usage
+        during batch processing.
+
+        Args:
+            texts: Single text or iterable of texts to embed.
+
+        Returns:
+            numpy.ndarray of shape (n_texts, embedding_dim) containing embeddings.
+        """
+        self._load_model()
+
+        if isinstance(texts, str):
+            texts = [texts]
+        else:
+            texts = list(texts)
+
+        # Return embeddings as a numpy array directly (no .tolist() conversion)
+        embeddings = list(self._model.embed(texts))
+        return np.array(embeddings)
+
    def embed_single(self, text: str) -> List[float]:
        """Generate embedding for a single text."""
        return self.embed(text)[0]
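
Both entry points run the same model and differ only in the returned container, so their outputs can be checked against each other. A sketch assuming the model for the "code" profile can be loaded locally:

    import numpy as np

    from codexlens.semantic.embedder import get_embedder

    embedder = get_embedder(profile="code")
    texts = ["def f(): pass", "class C: ..."]

    as_lists = embedder.embed(texts)            # List[List[float]], compatibility path
    as_matrix = embedder.embed_to_numpy(texts)  # one (2, dim) numpy array

    assert np.allclose(np.array(as_lists), as_matrix)
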
diff --git a/codex-lens/src/codexlens/semantic/vector_store.py b/codex-lens/src/codexlens/semantic/vector_store.py
index fbcfbfca..0299f68b 100644
--- a/codex-lens/src/codexlens/semantic/vector_store.py
+++ b/codex-lens/src/codexlens/semantic/vector_store.py
@@ -61,6 +61,7 @@ class VectorStore:
     - NumPy vectorized operations instead of Python loops (fallback)
     - Lazy content loading - only fetch full content for top-k results
     - Thread-safe cache invalidation
+    - Bulk insert mode for efficient batch operations
     """
 
     # Default embedding dimension (used when creating new index)
@@ -88,6 +89,11 @@ class VectorStore:
         self._ann_dim: Optional[int] = None
         self._ann_write_lock = threading.Lock()  # Protects ANN index modifications
 
+        # Bulk insert mode tracking
+        self._bulk_insert_mode: bool = False
+        self._bulk_insert_ids: List[int] = []
+        self._bulk_insert_embeddings: List[np.ndarray] = []
+
         self._init_schema()
         self._init_ann_index()
 
@@ -395,7 +401,10 @@ class VectorStore:
         return ids
 
     def add_chunks_batch(
-        self, chunks_with_paths: List[Tuple[SemanticChunk, str]]
+        self,
+        chunks_with_paths: List[Tuple[SemanticChunk, str]],
+        update_ann: bool = True,
+        auto_save_ann: bool = True,
     ) -> List[int]:
        """Batch insert chunks from multiple files in a single transaction.

@@ -403,6 +412,9 @@ class VectorStore:

        Args:
            chunks_with_paths: List of (chunk, file_path) tuples
+            update_ann: If True, update ANN index with new vectors (default: True)
+            auto_save_ann: If True, save ANN index after update (default: True).
+                Set to False for bulk inserts to reduce I/O overhead.

        Returns:
            List of inserted chunk IDs
@@ -416,7 +428,11 @@ class VectorStore:
        for chunk, file_path in chunks_with_paths:
            if chunk.embedding is None:
                raise ValueError("All chunks must have embeddings")
-            embedding_arr = np.array(chunk.embedding, dtype=np.float32)
+            # Optimize: skip the redundant copy when the embedding is already numpy
+            if isinstance(chunk.embedding, np.ndarray):
+                embedding_arr = chunk.embedding.astype(np.float32, copy=False)
+            else:
+                embedding_arr = np.array(chunk.embedding, dtype=np.float32)
            embedding_blob = embedding_arr.tobytes()
            metadata_json = json.dumps(chunk.metadata) if chunk.metadata else None
            batch_data.append((file_path, chunk.content, embedding_blob, metadata_json))
@@ -439,20 +455,182 @@
        # Calculate inserted IDs based on starting ID
        ids = list(range(start_id, start_id + len(chunks_with_paths)))

-        # Add to ANN index
-        if embeddings_list and self._ensure_ann_index(len(embeddings_list[0])):
-            with self._ann_write_lock:
-                try:
-                    embeddings_matrix = np.vstack(embeddings_list)
-                    self._ann_index.add_vectors(ids, embeddings_matrix)
-                    self._ann_index.save()
-                except Exception as e:
-                    logger.warning("Failed to add batch to ANN index: %s", e)
+        # Handle ANN index updates
+        if embeddings_list and update_ann and self._ensure_ann_index(len(embeddings_list[0])):
+            # In bulk insert mode, accumulate for later batch update
+            if self._bulk_insert_mode:
+                self._bulk_insert_ids.extend(ids)
+                self._bulk_insert_embeddings.extend(embeddings_list)
+            else:
+                # Normal mode: update immediately
+                with self._ann_write_lock:
+                    try:
+                        embeddings_matrix = np.vstack(embeddings_list)
+                        self._ann_index.add_vectors(ids, embeddings_matrix)
+                        if auto_save_ann:
+                            self._ann_index.save()
+                    except Exception as e:
+                        logger.warning("Failed to add batch to ANN index: %s", e)

        # Invalidate cache after modification
        self._invalidate_cache()
        return ids

+    def add_chunks_batch_numpy(
+        self,
+        chunks_with_paths: List[Tuple[SemanticChunk, str]],
+        embeddings_matrix: np.ndarray,
+        update_ann: bool = True,
+        auto_save_ann: bool = True,
+    ) -> List[int]:
+        """Batch insert chunks with pre-computed numpy embeddings matrix.
+
+        This method accepts embeddings as a numpy matrix to avoid list->array conversions.
+        Useful when embeddings are already in numpy format from batch encoding.
+
+        Args:
+            chunks_with_paths: List of (chunk, file_path) tuples (embeddings can be None)
+            embeddings_matrix: Pre-computed embeddings as (N, D) numpy array
+            update_ann: If True, update ANN index with new vectors (default: True)
+            auto_save_ann: If True, save ANN index after update (default: True)
+
+        Returns:
+            List of inserted chunk IDs
+        """
+        if not chunks_with_paths:
+            return []
+
+        if len(chunks_with_paths) != embeddings_matrix.shape[0]:
+            raise ValueError(
+                f"Mismatch: {len(chunks_with_paths)} chunks but "
+                f"{embeddings_matrix.shape[0]} embeddings"
+            )
+
+        # Ensure float32 format (no copy if already float32)
+        embeddings_matrix = embeddings_matrix.astype(np.float32, copy=False)
+
+        # Prepare batch data
+        batch_data = []
+        for i, (chunk, file_path) in enumerate(chunks_with_paths):
+            embedding_arr = embeddings_matrix[i]
+            embedding_blob = embedding_arr.tobytes()
+            metadata_json = json.dumps(chunk.metadata) if chunk.metadata else None
+            batch_data.append((file_path, chunk.content, embedding_blob, metadata_json))
+
+        # Batch insert to SQLite in single transaction
+        with sqlite3.connect(self.db_path) as conn:
+            # Get starting ID before insert
+            row = conn.execute("SELECT MAX(id) FROM semantic_chunks").fetchone()
+            start_id = (row[0] or 0) + 1
+
+            conn.executemany(
+                """
+                INSERT INTO semantic_chunks (file_path, content, embedding, metadata)
+                VALUES (?, ?, ?, ?)
+                """,
+                batch_data
+            )
+            conn.commit()
+
+        # Calculate inserted IDs based on starting ID
+        ids = list(range(start_id, start_id + len(chunks_with_paths)))
+
+        # Handle ANN index updates
+        if update_ann and self._ensure_ann_index(embeddings_matrix.shape[1]):
+            # In bulk insert mode, accumulate for later batch update
+            if self._bulk_insert_mode:
+                self._bulk_insert_ids.extend(ids)
+                # Split matrix into individual arrays for accumulation
+                self._bulk_insert_embeddings.extend([embeddings_matrix[i] for i in range(len(ids))])
+            else:
+                # Normal mode: update immediately
+                with self._ann_write_lock:
+                    try:
+                        self._ann_index.add_vectors(ids, embeddings_matrix)
+                        if auto_save_ann:
+                            self._ann_index.save()
+                    except Exception as e:
+                        logger.warning("Failed to add batch to ANN index: %s", e)
+
+        # Invalidate cache after modification
+        self._invalidate_cache()
+        return ids
+
+    def begin_bulk_insert(self) -> None:
+        """Begin bulk insert mode - defer ANN index updates for better performance.
+
+        Usage:
+            store.begin_bulk_insert()
+            try:
+                for batch in batches:
+                    store.add_chunks_batch(batch)
+            finally:
+                store.end_bulk_insert()
+
+        Or use context manager:
+            with store.bulk_insert():
+                for batch in batches:
+                    store.add_chunks_batch(batch)
+        """
+        self._bulk_insert_mode = True
+        self._bulk_insert_ids.clear()
+        self._bulk_insert_embeddings.clear()
+        logger.debug("Entered bulk insert mode")
+
+    def end_bulk_insert(self) -> None:
+        """End bulk insert mode and flush accumulated vectors to the ANN index.
+
+        This method should be called after all bulk inserts are complete to
+        update the ANN index in a single batch operation.
+        """
+        if not self._bulk_insert_mode:
+            logger.warning("end_bulk_insert called but not in bulk insert mode")
+            return
+
+        self._bulk_insert_mode = False
+
+        # Update ANN index with all accumulated data
+        if self._bulk_insert_ids and self._bulk_insert_embeddings:
+            if self._ensure_ann_index(len(self._bulk_insert_embeddings[0])):
+                with self._ann_write_lock:
+                    try:
+                        embeddings_matrix = np.vstack(self._bulk_insert_embeddings)
+                        self._ann_index.add_vectors(self._bulk_insert_ids, embeddings_matrix)
+                        self._ann_index.save()
+                        logger.info(
+                            "Bulk insert complete: added %d vectors to ANN index",
+                            len(self._bulk_insert_ids)
+                        )
+                    except Exception as e:
+                        logger.error("Failed to update ANN index after bulk insert: %s", e)
+
+        # Clear accumulated data
+        self._bulk_insert_ids.clear()
+        self._bulk_insert_embeddings.clear()
+        logger.debug("Exited bulk insert mode")
+
+    class BulkInsertContext:
+        """Context manager for bulk insert operations."""
+
+        def __init__(self, store: "VectorStore") -> None:
+            self.store = store
+
+        def __enter__(self) -> "VectorStore":
+            self.store.begin_bulk_insert()
+            return self.store
+
+        def __exit__(self, exc_type, exc_val, exc_tb) -> None:
+            self.store.end_bulk_insert()
+
+    def bulk_insert(self) -> "VectorStore.BulkInsertContext":
+        """Return a context manager for bulk insert operations.
+
+        Usage:
+            with store.bulk_insert():
+                for batch in batches:
+                    store.add_chunks_batch(batch)
+        """
+        return self.BulkInsertContext(self)
+
    def delete_file_chunks(self, file_path: str) -> int:
        """Delete all chunks for a file.
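
End to end, the intended write path combines embed_to_numpy with bulk-insert mode so the ANN index is updated and saved once per run instead of once per batch. A sketch under stated assumptions: the VectorStore constructor is assumed to take the index's SQLite path, and chunk_batches stands in for the caller's own batching of (SemanticChunk, file_path) pairs:

    from pathlib import Path

    from codexlens.semantic.embedder import get_embedder
    from codexlens.semantic.vector_store import VectorStore

    store = VectorStore(Path("index.db"))  # assumed constructor: path to the index database
    embedder = get_embedder(profile="code")

    # chunk_batches: iterable of lists of (SemanticChunk, file_path) pairs, where
    # each chunk already carries its embedding for the add_chunks_batch path.
    with store.bulk_insert():
        for batch in chunk_batches:
            store.add_chunks_batch(batch)   # ANN updates accumulate in memory
    # __exit__ calls end_bulk_insert(): one add_vectors() call and one save() total

    # Alternatively, skip the list round-trip when embeddings come from embed_to_numpy():
    contents = [chunk.content for chunk, _ in batch]
    matrix = embedder.embed_to_numpy(contents)  # (N, dim) matrix, converted to float32 on insert
    ids = store.add_chunks_batch_numpy(batch, matrix)
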