From 0b6e9db8e4284063d931fc277cd0f8d9f6cec716 Mon Sep 17 00:00:00 2001 From: catlog22 Date: Fri, 2 Jan 2026 17:18:23 +0800 Subject: [PATCH] feat: Add centralized vector storage and metadata management for embeddings --- codex-lens/src/codexlens/cli/commands.py | 46 ++- .../src/codexlens/cli/embedding_manager.py | 41 ++- .../src/codexlens/search/hybrid_search.py | 124 ++++++- codex-lens/src/codexlens/storage/__init__.py | 3 + .../codexlens/storage/vector_meta_store.py | 331 ++++++++++++++++++ 5 files changed, 534 insertions(+), 11 deletions(-) create mode 100644 codex-lens/src/codexlens/storage/vector_meta_store.py diff --git a/codex-lens/src/codexlens/cli/commands.py b/codex-lens/src/codexlens/cli/commands.py index 630d73b0..d0f6f034 100644 --- a/codex-lens/src/codexlens/cli/commands.py +++ b/codex-lens/src/codexlens/cli/commands.py @@ -2005,6 +2005,12 @@ def embeddings_generate( ), json_mode: bool = typer.Option(False, "--json", help="Output JSON response."), verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose output."), + centralized: bool = typer.Option( + False, + "--centralized", + "-c", + help="Use centralized vector storage (single HNSW index at project root).", + ), ) -> None: """Generate semantic embeddings for code search. @@ -2012,6 +2018,10 @@ def embeddings_generate( semantic search capabilities. Embeddings are stored in the same database as the FTS index. + Storage Modes: + - Default: Per-directory HNSW indexes alongside _index.db files + - Centralized: Single HNSW index at project root (_vectors.hnsw) + Embedding Backend Options: - fastembed: Local ONNX-based embeddings (default, no API calls) - litellm: Remote API embeddings via ccw-litellm (requires API keys) @@ -2033,12 +2043,14 @@ def embeddings_generate( codexlens embeddings-generate ~/.codexlens/indexes/project/_index.db # Specific index codexlens embeddings-generate ~/projects/my-app --backend litellm --model text-embedding-3-small # Use LiteLLM codexlens embeddings-generate ~/projects/my-app --model fast --force # Regenerate with fast profile + codexlens embeddings-generate ~/projects/my-app --centralized # Centralized vector storage """ _configure_logging(verbose, json_mode) from codexlens.cli.embedding_manager import ( generate_embeddings, generate_embeddings_recursive, + generate_dense_embeddings_centralized, scan_for_model_conflicts, check_global_model_lock, set_locked_model_config, @@ -2099,7 +2111,11 @@ def embeddings_generate( console.print(f" {msg}") console.print(f"[bold]Generating embeddings[/bold]") - if use_recursive: + if centralized: + effective_root = index_root if index_root else (index_path.parent if index_path else target_path) + console.print(f"Index root: [dim]{effective_root}[/dim]") + console.print(f"Mode: [green]Centralized[/green]") + elif use_recursive: console.print(f"Index root: [dim]{index_root}[/dim]") console.print(f"Mode: [yellow]Recursive[/yellow]") else: @@ -2179,7 +2195,20 @@ def embeddings_generate( console.print("[yellow]Cancelled.[/yellow] Use --force to skip this prompt.") raise typer.Exit(code=0) - if use_recursive: + if centralized: + # Centralized mode: single HNSW index at project root + if not index_root: + index_root = index_path.parent if index_path else target_path + result = generate_dense_embeddings_centralized( + index_root, + embedding_backend=backend, + model_profile=model, + force=force, + chunk_size=chunk_size, + progress_callback=progress_update, + max_workers=max_workers, + ) + elif use_recursive: result = generate_embeddings_recursive( 
index_root, embedding_backend=backend, @@ -2225,7 +2254,18 @@ def embeddings_generate( # This prevents using different models for future indexes set_locked_model_config(backend, model) - if use_recursive: + if centralized: + # Centralized mode output + elapsed = data.get("elapsed_time", 0) + console.print(f"[green]✓[/green] Centralized embeddings generated successfully!") + console.print(f" Model: {data.get('model_name', model)}") + console.print(f" Chunks created: {data['chunks_created']:,}") + console.print(f" Files processed: {data['files_processed']}") + if data.get("files_failed", 0) > 0: + console.print(f" [yellow]Files failed: {data['files_failed']}[/yellow]") + console.print(f" Central index: {data.get('central_index_path', 'N/A')}") + console.print(f" Time: {elapsed:.1f}s") + elif use_recursive: # Recursive mode output console.print(f"[green]✓[/green] Recursive embeddings generation complete!") console.print(f" Indexes processed: {data['indexes_processed']}") diff --git a/codex-lens/src/codexlens/cli/embedding_manager.py b/codex-lens/src/codexlens/cli/embedding_manager.py index dac2ae1d..8588bcdf 100644 --- a/codex-lens/src/codexlens/cli/embedding_manager.py +++ b/codex-lens/src/codexlens/cli/embedding_manager.py @@ -17,6 +17,11 @@ except ImportError: def is_embedding_backend_available(_backend: str): # type: ignore[no-redef] return False, "codexlens.semantic not available" +try: + from codexlens.config import VECTORS_META_DB_NAME +except ImportError: + VECTORS_META_DB_NAME = "_vectors_meta.db" + try: from codexlens.search.ranking import get_file_category except ImportError: @@ -1277,10 +1282,38 @@ def generate_dense_embeddings_centralized( } # Store chunk metadata in a centralized metadata database - vectors_meta_path = index_root / "VECTORS_META_DB_NAME" - # Note: The metadata is already stored in individual _index.db semantic_chunks tables - # For now, we rely on the existing per-index storage for metadata lookup - # A future enhancement could consolidate metadata into _vectors_meta.db + vectors_meta_path = index_root / VECTORS_META_DB_NAME + if chunk_id_to_info: + if progress_callback: + progress_callback(f"Storing {len(chunk_id_to_info)} chunk metadata records...") + + try: + from codexlens.storage.vector_meta_store import VectorMetadataStore + + with VectorMetadataStore(vectors_meta_path) as meta_store: + # Convert chunk_id_to_info dict to list of dicts for batch insert + chunks_to_store = [] + for cid, info in chunk_id_to_info.items(): + metadata = info.get("metadata", {}) + chunks_to_store.append({ + "chunk_id": cid, + "file_path": info["file_path"], + "content": info["content"], + "start_line": metadata.get("start_line"), + "end_line": metadata.get("end_line"), + "category": info.get("category"), + "metadata": metadata, + "source_index_db": None, # Not tracked per-chunk currently + }) + + meta_store.add_chunks(chunks_to_store) + + if progress_callback: + progress_callback(f"Saved metadata to {vectors_meta_path}") + + except Exception as e: + logger.warning("Failed to store vector metadata: %s", e) + # Non-fatal: continue without centralized metadata elapsed_time = time.time() - start_time diff --git a/codex-lens/src/codexlens/search/hybrid_search.py b/codex-lens/src/codexlens/search/hybrid_search.py index 919af35e..8b04becc 100644 --- a/codex-lens/src/codexlens/search/hybrid_search.py +++ b/codex-lens/src/codexlens/search/hybrid_search.py @@ -664,10 +664,15 @@ class HybridSearchEngine: scores: List[float], category: Optional[str] = None, ) -> List[SearchResult]: - 
"""Fetch chunk metadata from all _index.db files for centralized search. + """Fetch chunk metadata from centralized _vectors_meta.db for fast lookup. + + This method uses the centralized VectorMetadataStore for O(1) lookup + instead of traversing all _index.db files (O(n) where n = number of indexes). + + Falls back to the legacy per-index lookup if centralized metadata is unavailable. Args: - index_root: Root directory containing _index.db files + index_root: Root directory containing _vectors_meta.db chunk_ids: List of chunk IDs from ANN search scores: Corresponding similarity scores category: Optional category filter @@ -675,12 +680,123 @@ class HybridSearchEngine: Returns: List of SearchResult objects """ - import sqlite3 - import json + from codexlens.config import VECTORS_META_DB_NAME # Build score map score_map = {cid: score for cid, score in zip(chunk_ids, scores)} + # Try centralized metadata store first (fast path) + vectors_meta_path = index_root / VECTORS_META_DB_NAME + if vectors_meta_path.exists(): + try: + return self._fetch_from_vector_meta_store( + vectors_meta_path, chunk_ids, score_map, category + ) + except Exception as e: + self.logger.debug( + "Centralized metadata lookup failed, falling back: %s", e + ) + + # Fallback: traverse _index.db files (legacy path) + return self._fetch_chunks_by_ids_legacy( + index_root, chunk_ids, score_map, category + ) + + def _fetch_from_vector_meta_store( + self, + meta_db_path: Path, + chunk_ids: List[int], + score_map: Dict[int, float], + category: Optional[str] = None, + ) -> List[SearchResult]: + """Fetch chunks from centralized VectorMetadataStore. + + Args: + meta_db_path: Path to _vectors_meta.db + chunk_ids: List of chunk IDs to fetch + score_map: Mapping of chunk_id to score + category: Optional category filter + + Returns: + List of SearchResult objects + """ + from codexlens.storage.vector_meta_store import VectorMetadataStore + + results = [] + + with VectorMetadataStore(meta_db_path) as meta_store: + rows = meta_store.get_chunks_by_ids(chunk_ids, category=category) + + for row in rows: + chunk_id = row["chunk_id"] + file_path = row["file_path"] + content = row["content"] or "" + metadata = row.get("metadata") or {} + start_line = row.get("start_line") + end_line = row.get("end_line") + + score = score_map.get(chunk_id, 0.0) + + # Build excerpt + excerpt = content[:200] + "..." if len(content) > 200 else content + + # Extract symbol information + symbol_name = metadata.get("symbol_name") + symbol_kind = metadata.get("symbol_kind") + + # Build Symbol object if available + symbol = None + if symbol_name and symbol_kind and start_line and end_line: + try: + from codexlens.entities import Symbol + symbol = Symbol( + name=symbol_name, + kind=symbol_kind, + range=(start_line, end_line) + ) + except Exception: + pass + + results.append(SearchResult( + path=file_path, + score=score, + excerpt=excerpt, + content=content, + symbol=symbol, + metadata=metadata, + start_line=start_line, + end_line=end_line, + symbol_name=symbol_name, + symbol_kind=symbol_kind, + )) + + # Sort by score descending + results.sort(key=lambda r: r.score, reverse=True) + return results + + def _fetch_chunks_by_ids_legacy( + self, + index_root: Path, + chunk_ids: List[int], + score_map: Dict[int, float], + category: Optional[str] = None, + ) -> List[SearchResult]: + """Legacy fallback: fetch chunk metadata by traversing all _index.db files. + + This is the O(n) fallback path used when centralized metadata is unavailable. 
+ + Args: + index_root: Root directory containing _index.db files + chunk_ids: List of chunk IDs from ANN search + score_map: Mapping of chunk_id to score + category: Optional category filter + + Returns: + List of SearchResult objects + """ + import sqlite3 + import json + # Find all _index.db files index_files = list(index_root.rglob("_index.db")) diff --git a/codex-lens/src/codexlens/storage/__init__.py b/codex-lens/src/codexlens/storage/__init__.py index dd0820eb..815bc961 100644 --- a/codex-lens/src/codexlens/storage/__init__.py +++ b/codex-lens/src/codexlens/storage/__init__.py @@ -7,6 +7,7 @@ from .path_mapper import PathMapper from .registry import RegistryStore, ProjectInfo, DirMapping from .dir_index import DirIndexStore, SubdirLink, FileEntry from .index_tree import IndexTreeBuilder, BuildResult, DirBuildResult +from .vector_meta_store import VectorMetadataStore __all__ = [ # Legacy (workspace-local) @@ -25,5 +26,7 @@ __all__ = [ "IndexTreeBuilder", "BuildResult", "DirBuildResult", + # Vector metadata + "VectorMetadataStore", ] diff --git a/codex-lens/src/codexlens/storage/vector_meta_store.py b/codex-lens/src/codexlens/storage/vector_meta_store.py new file mode 100644 index 00000000..d42a75e1 --- /dev/null +++ b/codex-lens/src/codexlens/storage/vector_meta_store.py @@ -0,0 +1,331 @@ +"""Central storage for vector metadata. + +This module provides a centralized SQLite database for storing chunk metadata +associated with centralized vector indexes. Instead of traversing all _index.db +files to fetch chunk metadata, this provides O(1) lookup by chunk ID. +""" + +from __future__ import annotations + +import json +import logging +import sqlite3 +import threading +from pathlib import Path +from typing import Any, Dict, List, Optional + +from codexlens.errors import StorageError + +logger = logging.getLogger(__name__) + + +class VectorMetadataStore: + """Store and retrieve chunk metadata for centralized vector search. + + This class provides efficient storage and retrieval of chunk metadata + for the centralized vector index architecture. All chunk metadata is + stored in a single _vectors_meta.db file at the project root, enabling + fast lookups without traversing multiple _index.db files. + + Schema: + chunk_metadata: + - chunk_id: INTEGER PRIMARY KEY - Global chunk ID + - file_path: TEXT NOT NULL - Path to source file + - content: TEXT - Chunk text content + - start_line: INTEGER - Start line in source file + - end_line: INTEGER - End line in source file + - category: TEXT - Content category (code/doc) + - metadata: TEXT - JSON-encoded additional metadata + - source_index_db: TEXT - Path to source _index.db file + """ + + def __init__(self, db_path: Path | str) -> None: + """Initialize VectorMetadataStore. + + Args: + db_path: Path to SQLite database file. + """ + self.db_path = Path(db_path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + + # Thread-safe connection management + self._lock = threading.RLock() + self._local = threading.local() + + def _get_connection(self) -> sqlite3.Connection: + """Get or create a thread-local database connection. + + Each thread gets its own connection to ensure thread safety. 
+ """ + conn = getattr(self._local, "conn", None) + if conn is None: + conn = sqlite3.connect( + str(self.db_path), + timeout=30.0, + check_same_thread=True, + ) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + conn.execute("PRAGMA mmap_size=1073741824") # 1GB mmap + self._local.conn = conn + return conn + + def _ensure_schema(self) -> None: + """Create tables if they don't exist.""" + with self._lock: + conn = self._get_connection() + try: + conn.execute(''' + CREATE TABLE IF NOT EXISTS chunk_metadata ( + chunk_id INTEGER PRIMARY KEY, + file_path TEXT NOT NULL, + content TEXT, + start_line INTEGER, + end_line INTEGER, + category TEXT, + metadata TEXT, + source_index_db TEXT + ) + ''') + conn.execute( + 'CREATE INDEX IF NOT EXISTS idx_chunk_file_path ' + 'ON chunk_metadata(file_path)' + ) + conn.execute( + 'CREATE INDEX IF NOT EXISTS idx_chunk_category ' + 'ON chunk_metadata(category)' + ) + conn.commit() + logger.debug("VectorMetadataStore schema created/verified") + except sqlite3.Error as e: + raise StorageError( + f"Failed to create schema: {e}", + db_path=str(self.db_path), + operation="_ensure_schema" + ) from e + + def add_chunk( + self, + chunk_id: int, + file_path: str, + content: str, + start_line: Optional[int] = None, + end_line: Optional[int] = None, + category: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + source_index_db: Optional[str] = None, + ) -> None: + """Add a single chunk's metadata. + + Args: + chunk_id: Global unique chunk ID. + file_path: Path to source file. + content: Chunk text content. + start_line: Start line in source file. + end_line: End line in source file. + category: Content category (code/doc). + metadata: Additional metadata dictionary. + source_index_db: Path to source _index.db file. + """ + with self._lock: + conn = self._get_connection() + try: + metadata_json = json.dumps(metadata) if metadata else None + conn.execute( + ''' + INSERT OR REPLACE INTO chunk_metadata + (chunk_id, file_path, content, start_line, end_line, + category, metadata, source_index_db) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ''', + (chunk_id, file_path, content, start_line, end_line, + category, metadata_json, source_index_db) + ) + conn.commit() + except sqlite3.Error as e: + raise StorageError( + f"Failed to add chunk {chunk_id}: {e}", + db_path=str(self.db_path), + operation="add_chunk" + ) from e + + def add_chunks(self, chunks: List[Dict[str, Any]]) -> None: + """Batch insert chunk metadata. 
+ + Args: + chunks: List of dictionaries with keys: + - chunk_id (required): Global unique chunk ID + - file_path (required): Path to source file + - content: Chunk text content + - start_line: Start line in source file + - end_line: End line in source file + - category: Content category (code/doc) + - metadata: Additional metadata dictionary + - source_index_db: Path to source _index.db file + """ + if not chunks: + return + + with self._lock: + conn = self._get_connection() + try: + batch_data = [] + for chunk in chunks: + metadata = chunk.get("metadata") + metadata_json = json.dumps(metadata) if metadata else None + batch_data.append(( + chunk["chunk_id"], + chunk["file_path"], + chunk.get("content"), + chunk.get("start_line"), + chunk.get("end_line"), + chunk.get("category"), + metadata_json, + chunk.get("source_index_db"), + )) + + conn.executemany( + ''' + INSERT OR REPLACE INTO chunk_metadata + (chunk_id, file_path, content, start_line, end_line, + category, metadata, source_index_db) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ''', + batch_data + ) + conn.commit() + logger.debug("Batch inserted %d chunk metadata records", len(chunks)) + except sqlite3.Error as e: + raise StorageError( + f"Failed to batch insert chunks: {e}", + db_path=str(self.db_path), + operation="add_chunks" + ) from e + + def get_chunks_by_ids( + self, + chunk_ids: List[int], + category: Optional[str] = None, + ) -> List[Dict[str, Any]]: + """Retrieve chunks by their IDs - the key optimization. + + This is the primary method that replaces traversing all _index.db files. + Provides O(1) lookup by chunk ID instead of O(n) where n is the number + of index databases. + + Args: + chunk_ids: List of chunk IDs to retrieve. + category: Optional category filter ('code' or 'doc'). + + Returns: + List of dictionaries with chunk metadata: + - chunk_id: Global chunk ID + - file_path: Path to source file + - content: Chunk text content + - start_line: Start line in source file + - end_line: End line in source file + - category: Content category + - metadata: Parsed metadata dictionary + - source_index_db: Source _index.db path + """ + if not chunk_ids: + return [] + + with self._lock: + conn = self._get_connection() + try: + placeholders = ",".join("?" * len(chunk_ids)) + + if category: + query = f''' + SELECT chunk_id, file_path, content, start_line, end_line, + category, metadata, source_index_db + FROM chunk_metadata + WHERE chunk_id IN ({placeholders}) AND category = ? + ''' + params = list(chunk_ids) + [category] + else: + query = f''' + SELECT chunk_id, file_path, content, start_line, end_line, + category, metadata, source_index_db + FROM chunk_metadata + WHERE chunk_id IN ({placeholders}) + ''' + params = list(chunk_ids) + + rows = conn.execute(query, params).fetchall() + + results = [] + for row in rows: + metadata = None + if row["metadata"]: + try: + metadata = json.loads(row["metadata"]) + except json.JSONDecodeError: + metadata = {} + + results.append({ + "chunk_id": row["chunk_id"], + "file_path": row["file_path"], + "content": row["content"], + "start_line": row["start_line"], + "end_line": row["end_line"], + "category": row["category"], + "metadata": metadata or {}, + "source_index_db": row["source_index_db"], + }) + + return results + + except sqlite3.Error as e: + logger.error("Failed to get chunks by IDs: %s", e) + return [] + + def get_chunk_count(self) -> int: + """Get total number of chunks in store. + + Returns: + Total chunk count. 
+ """ + with self._lock: + conn = self._get_connection() + try: + row = conn.execute( + "SELECT COUNT(*) FROM chunk_metadata" + ).fetchone() + return row[0] if row else 0 + except sqlite3.Error: + return 0 + + def clear(self) -> None: + """Clear all metadata.""" + with self._lock: + conn = self._get_connection() + try: + conn.execute("DELETE FROM chunk_metadata") + conn.commit() + logger.info("Cleared all chunk metadata") + except sqlite3.Error as e: + raise StorageError( + f"Failed to clear metadata: {e}", + db_path=str(self.db_path), + operation="clear" + ) from e + + def close(self) -> None: + """Close database connection.""" + with self._lock: + conn = getattr(self._local, "conn", None) + if conn is not None: + conn.close() + self._local.conn = None + + def __enter__(self) -> "VectorMetadataStore": + """Context manager entry.""" + self._ensure_schema() + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + """Context manager exit.""" + self.close()