From 9157c5c78b1804602b402d10262233945023819e Mon Sep 17 00:00:00 2001 From: catlog22 Date: Fri, 2 Jan 2026 16:53:39 +0800 Subject: [PATCH] feat: Implement centralized storage for SPLADE and vector embeddings - Added centralized SPLADE database and vector storage configuration in config.py. - Updated embedding_manager.py to support centralized SPLADE database path. - Enhanced generate_embeddings and generate_embeddings_recursive functions for centralized storage. - Introduced centralized ANN index creation in ann_index.py. - Modified hybrid_search.py to utilize centralized vector index for searches. - Implemented methods to discover and manage centralized SPLADE and HNSW files. --- codex-lens/src/codexlens/cli/commands.py | 395 +++++++++++++++++- .../src/codexlens/cli/embedding_manager.py | 289 ++++++++++++- codex-lens/src/codexlens/config.py | 7 + .../src/codexlens/search/hybrid_search.py | 279 ++++++++++++- .../src/codexlens/semantic/ann_index.py | 90 ++++ 5 files changed, 1051 insertions(+), 9 deletions(-) diff --git a/codex-lens/src/codexlens/cli/commands.py b/codex-lens/src/codexlens/cli/commands.py index cc61010e..630d73b0 100644 --- a/codex-lens/src/codexlens/cli/commands.py +++ b/codex-lens/src/codexlens/cli/commands.py @@ -6,6 +6,7 @@ import json import logging import os import shutil +import sqlite3 from pathlib import Path from typing import Annotated, Any, Dict, Iterable, List, Optional @@ -2514,7 +2515,8 @@ def splade_index_command( console.print(f"[blue]Discovered {len(all_index_dbs)} index databases[/blue]") # SPLADE index is stored alongside the root _index.db - splade_db = index_root / "_splade.db" + from codexlens.config import SPLADE_DB_NAME + splade_db = index_root / SPLADE_DB_NAME if splade_db.exists() and not rebuild: console.print("[yellow]SPLADE index exists. Use --rebuild to regenerate.[/yellow]") @@ -2626,15 +2628,16 @@ def splade_status_command( from codexlens.storage.splade_index import SpladeIndex from codexlens.semantic.splade_encoder import check_splade_available + from codexlens.config import SPLADE_DB_NAME # Find index database target_path = path.expanduser().resolve() if target_path.is_file() and target_path.name == "_index.db": - splade_db = target_path.parent / "_splade.db" + splade_db = target_path.parent / SPLADE_DB_NAME elif target_path.is_dir(): # Check for local .codexlens/_splade.db - local_splade = target_path / ".codexlens" / "_splade.db" + local_splade = target_path / ".codexlens" / SPLADE_DB_NAME if local_splade.exists(): splade_db = local_splade else: @@ -2644,7 +2647,7 @@ def splade_status_command( registry.initialize() mapper = PathMapper() index_db = mapper.source_to_index_db(target_path) - splade_db = index_db.parent / "_splade.db" + splade_db = index_db.parent / SPLADE_DB_NAME finally: registry.close() else: @@ -3084,3 +3087,387 @@ def cascade_index( console.print(f" [dim]{err}[/dim]") if len(errors_list) > 3: console.print(f" [dim]... and {len(errors_list) - 3} more[/dim]") + + +# ==================== Index Migration Commands ==================== + +# Index version for migration tracking (file-based version marker) +INDEX_FORMAT_VERSION = "2.0" +INDEX_VERSION_FILE = "_index_version.txt" + + +def _get_index_version(index_root: Path) -> Optional[str]: + """Read index format version from version marker file. 
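+
+    Example (the index root path is hypothetical):
+        >>> _get_index_version(Path(".codexlens/abc123"))
+        '2.0'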
+ + Args: + index_root: Root directory of the index + + Returns: + Version string if file exists, None otherwise + """ + version_file = index_root / INDEX_VERSION_FILE + if version_file.exists(): + try: + return version_file.read_text(encoding="utf-8").strip() + except Exception: + return None + return None + + +def _set_index_version(index_root: Path, version: str) -> None: + """Write index format version to version marker file. + + Args: + index_root: Root directory of the index + version: Version string to write + """ + version_file = index_root / INDEX_VERSION_FILE + version_file.write_text(version, encoding="utf-8") + + +def _discover_distributed_splade(index_root: Path) -> List[Dict[str, Any]]: + """Discover distributed SPLADE data in _index.db files. + + Scans all _index.db files for embedded splade_postings tables. + This is the old distributed format that needs migration. + + Args: + index_root: Root directory to scan + + Returns: + List of dicts with db_path, posting_count, chunk_count + """ + results = [] + + for db_path in index_root.rglob("_index.db"): + try: + conn = sqlite3.connect(db_path, timeout=5.0) + conn.row_factory = sqlite3.Row + + # Check if splade_postings table exists (old embedded format) + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='splade_postings'" + ) + if cursor.fetchone(): + # Count postings and chunks + try: + row = conn.execute( + "SELECT COUNT(*) as postings, COUNT(DISTINCT chunk_id) as chunks FROM splade_postings" + ).fetchone() + results.append({ + "db_path": db_path, + "posting_count": row["postings"] if row else 0, + "chunk_count": row["chunks"] if row else 0, + }) + except Exception: + pass + + conn.close() + except Exception: + pass + + return results + + +def _discover_distributed_hnsw(index_root: Path) -> List[Dict[str, Any]]: + """Discover distributed HNSW index files. + + Scans for .hnsw files that are stored alongside _index.db files. + This is the old distributed format that needs migration. + + Args: + index_root: Root directory to scan + + Returns: + List of dicts with hnsw_path, size_bytes + """ + results = [] + + for hnsw_path in index_root.rglob("*.hnsw"): + try: + size = hnsw_path.stat().st_size + results.append({ + "hnsw_path": hnsw_path, + "size_bytes": size, + }) + except Exception: + pass + + return results + + +def _check_centralized_storage(index_root: Path) -> Dict[str, Any]: + """Check for centralized storage files. 
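+
+    Example of the returned shape (illustrative values only):
+        >>> _check_centralized_storage(Path(".codexlens/abc123"))
+        # Returns: {"has_splade": True, "has_vectors": True,
+        #           "splade_path": ".../_splade.db", "vectors_path": ".../_vectors.hnsw",
+        #           "splade_stats": {...}, "vector_stats": {"size_bytes": ...}}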
+ + Args: + index_root: Root directory to check + + Returns: + Dict with has_splade, has_vectors, splade_stats, vector_stats + """ + from codexlens.config import SPLADE_DB_NAME, VECTORS_HNSW_NAME + + splade_db = index_root / SPLADE_DB_NAME + vectors_hnsw = index_root / VECTORS_HNSW_NAME + + result = { + "has_splade": splade_db.exists(), + "has_vectors": vectors_hnsw.exists(), + "splade_path": str(splade_db) if splade_db.exists() else None, + "vectors_path": str(vectors_hnsw) if vectors_hnsw.exists() else None, + "splade_stats": None, + "vector_stats": None, + } + + # Get SPLADE stats if exists + if splade_db.exists(): + try: + from codexlens.storage.splade_index import SpladeIndex + splade = SpladeIndex(splade_db) + if splade.has_index(): + result["splade_stats"] = splade.get_stats() + splade.close() + except Exception: + pass + + # Get vector stats if exists + if vectors_hnsw.exists(): + try: + result["vector_stats"] = { + "size_bytes": vectors_hnsw.stat().st_size, + } + except Exception: + pass + + return result + + +@app.command(name="index-migrate") +def index_migrate( + path: Annotated[Optional[str], typer.Argument(help="Project path to migrate")] = None, + dry_run: Annotated[bool, typer.Option("--dry-run", help="Show what would be migrated without making changes")] = False, + force: Annotated[bool, typer.Option("--force", help="Force migration even if already migrated")] = False, + json_mode: Annotated[bool, typer.Option("--json", help="Output JSON response")] = False, + verbose: Annotated[bool, typer.Option("--verbose", "-v", help="Enable verbose output")] = False, +) -> None: + """Migrate old distributed index to new centralized architecture. + + This command upgrades indexes from the old distributed storage format + (where SPLADE/vectors were stored in each _index.db) to the new centralized + format (single _splade.db and _vectors.hnsw at index root). + + Migration Steps: + 1. Detect if migration is needed (check version marker) + 2. Discover distributed SPLADE data in _index.db files + 3. Discover distributed .hnsw files + 4. Report current status + 5. Create version marker (unless --dry-run) + + Use --dry-run to preview what would be migrated without making changes. + Use --force to re-run migration even if version marker exists. + + Note: For full data migration (SPLADE/vectors consolidation), run: + codexlens splade-index --rebuild + codexlens embeddings-generate --recursive --force + + Examples: + codexlens index-migrate ~/projects/my-app --dry-run + codexlens index-migrate . 
--force
+        codexlens index-migrate --json
+    """
+    _configure_logging(verbose, json_mode)
+
+    # Resolve target path
+    if path:
+        target_path = Path(path).expanduser().resolve()
+    else:
+        target_path = Path.cwd()
+
+    if not target_path.exists():
+        if json_mode:
+            print_json(success=False, error=f"Path does not exist: {target_path}")
+        else:
+            console.print(f"[red]Error:[/red] Path does not exist: {target_path}")
+        raise typer.Exit(code=1)
+
+    # Find index root
+    registry: RegistryStore | None = None
+    index_root: Optional[Path] = None
+
+    try:
+        registry = RegistryStore()
+        registry.initialize()
+        mapper = PathMapper()
+
+        # Check if path is a project with an index
+        project_info = registry.get_project(target_path)
+        if project_info:
+            index_root = Path(project_info.index_root)
+        else:
+            # Try to find index via mapper
+            index_db = mapper.source_to_index_db(target_path)
+            if index_db.exists():
+                index_root = index_db.parent
+    finally:
+        if registry:
+            registry.close()
+
+    if not index_root or not index_root.exists():
+        if json_mode:
+            print_json(success=False, error=f"No index found for: {target_path}")
+        else:
+            console.print(f"[red]Error:[/red] No index found for: {target_path}")
+            console.print("[dim]Run 'codexlens init' first to create an index.[/dim]")
+        raise typer.Exit(code=1)
+
+    if not json_mode:
+        console.print("[bold]Index Migration Check[/bold]")
+        console.print(f"Source path: [dim]{target_path}[/dim]")
+        console.print(f"Index root: [dim]{index_root}[/dim]")
+        if dry_run:
+            console.print("[yellow]Mode: DRY RUN (no changes will be made)[/yellow]")
+        console.print()
+
+    # Check current version. Use exact-match comparison: a string ">=" check would
+    # compare lexicographically and mis-order versions such as "10.0" vs "2.0".
+    current_version = _get_index_version(index_root)
+    needs_migration = force or current_version != INDEX_FORMAT_VERSION
+
+    if current_version == INDEX_FORMAT_VERSION and not force:
+        result = {
+            "path": str(target_path),
+            "index_root": str(index_root),
+            "current_version": current_version,
+            "target_version": INDEX_FORMAT_VERSION,
+            "needs_migration": False,
+            "message": "Index is already at the latest version",
+        }
+
+        if json_mode:
+            print_json(success=True, result=result)
+        else:
+            console.print(f"[green]OK[/green] Index is already at version {current_version}")
+            console.print("[dim]No migration needed. 
Use --force to re-run migration.[/dim]") + return + + # Discover distributed data + distributed_splade = _discover_distributed_splade(index_root) + distributed_hnsw = _discover_distributed_hnsw(index_root) + centralized = _check_centralized_storage(index_root) + + # Count all _index.db files + all_index_dbs = list(index_root.rglob("_index.db")) + + # Build migration report + migration_report = { + "path": str(target_path), + "index_root": str(index_root), + "dry_run": dry_run, + "current_version": current_version, + "target_version": INDEX_FORMAT_VERSION, + "needs_migration": needs_migration, + "discovery": { + "total_index_dbs": len(all_index_dbs), + "distributed_splade_count": len(distributed_splade), + "distributed_splade_total_postings": sum(d["posting_count"] for d in distributed_splade), + "distributed_hnsw_count": len(distributed_hnsw), + "distributed_hnsw_total_bytes": sum(d["size_bytes"] for d in distributed_hnsw), + }, + "centralized": centralized, + "recommendations": [], + } + + # Generate recommendations + if distributed_splade and not centralized["has_splade"]: + migration_report["recommendations"].append( + f"Run 'codexlens splade-index {target_path} --rebuild' to consolidate SPLADE data" + ) + + if distributed_hnsw and not centralized["has_vectors"]: + migration_report["recommendations"].append( + f"Run 'codexlens embeddings-generate {target_path} --recursive --force' to consolidate vector data" + ) + + if not distributed_splade and not distributed_hnsw: + migration_report["recommendations"].append( + "No distributed data found. Index may already be using centralized storage." + ) + + if json_mode: + # Perform migration action (set version marker) unless dry-run + if not dry_run and needs_migration: + _set_index_version(index_root, INDEX_FORMAT_VERSION) + migration_report["migrated"] = True + migration_report["new_version"] = INDEX_FORMAT_VERSION + else: + migration_report["migrated"] = False + + print_json(success=True, result=migration_report) + else: + # Display discovery results + console.print("[bold]Discovery Results:[/bold]") + console.print(f" Total _index.db files: {len(all_index_dbs)}") + console.print() + + # Distributed SPLADE + console.print("[bold]Distributed SPLADE Data:[/bold]") + if distributed_splade: + total_postings = sum(d["posting_count"] for d in distributed_splade) + total_chunks = sum(d["chunk_count"] for d in distributed_splade) + console.print(f" Found in {len(distributed_splade)} _index.db files") + console.print(f" Total postings: {total_postings:,}") + console.print(f" Total chunks: {total_chunks:,}") + if verbose: + for d in distributed_splade[:5]: + console.print(f" [dim]{d['db_path'].parent.name}: {d['posting_count']} postings[/dim]") + if len(distributed_splade) > 5: + console.print(f" [dim]... and {len(distributed_splade) - 5} more[/dim]") + else: + console.print(" [dim]None found (already centralized or not generated)[/dim]") + console.print() + + # Distributed HNSW + console.print("[bold]Distributed HNSW Files:[/bold]") + if distributed_hnsw: + total_size = sum(d["size_bytes"] for d in distributed_hnsw) + console.print(f" Found {len(distributed_hnsw)} .hnsw files") + console.print(f" Total size: {total_size / (1024 * 1024):.1f} MB") + if verbose: + for d in distributed_hnsw[:5]: + console.print(f" [dim]{d['hnsw_path'].name}: {d['size_bytes'] / 1024:.1f} KB[/dim]") + if len(distributed_hnsw) > 5: + console.print(f" [dim]... 
and {len(distributed_hnsw) - 5} more[/dim]") + else: + console.print(" [dim]None found (already centralized or not generated)[/dim]") + console.print() + + # Centralized storage status + console.print("[bold]Centralized Storage:[/bold]") + if centralized["has_splade"]: + stats = centralized.get("splade_stats") or {} + console.print(f" [green]OK[/green] _splade.db exists") + if stats: + console.print(f" Chunks: {stats.get('unique_chunks', 0):,}") + console.print(f" Postings: {stats.get('total_postings', 0):,}") + else: + console.print(f" [yellow]--[/yellow] _splade.db not found") + + if centralized["has_vectors"]: + stats = centralized.get("vector_stats") or {} + size_mb = stats.get("size_bytes", 0) / (1024 * 1024) + console.print(f" [green]OK[/green] _vectors.hnsw exists ({size_mb:.1f} MB)") + else: + console.print(f" [yellow]--[/yellow] _vectors.hnsw not found") + console.print() + + # Migration action + if not dry_run and needs_migration: + _set_index_version(index_root, INDEX_FORMAT_VERSION) + console.print(f"[green]OK[/green] Version marker created: {INDEX_FORMAT_VERSION}") + elif dry_run: + console.print(f"[yellow]DRY RUN:[/yellow] Would create version marker: {INDEX_FORMAT_VERSION}") + + # Recommendations + if migration_report["recommendations"]: + console.print("\n[bold]Recommendations:[/bold]") + for rec in migration_report["recommendations"]: + console.print(f" [cyan]>[/cyan] {rec}") diff --git a/codex-lens/src/codexlens/cli/embedding_manager.py b/codex-lens/src/codexlens/cli/embedding_manager.py index a0508e1a..dac2ae1d 100644 --- a/codex-lens/src/codexlens/cli/embedding_manager.py +++ b/codex-lens/src/codexlens/cli/embedding_manager.py @@ -310,6 +310,7 @@ def generate_embeddings( endpoints: Optional[List] = None, strategy: Optional[str] = None, cooldown: Optional[float] = None, + splade_db_path: Optional[Path] = None, ) -> Dict[str, any]: """Generate embeddings for an index using memory-efficient batch processing. @@ -339,6 +340,9 @@ def generate_embeddings( Each dict has keys: model, api_key, api_base, weight. strategy: Selection strategy for multi-endpoint mode (round_robin, latency_aware). cooldown: Default cooldown seconds for rate-limited endpoints. + splade_db_path: Optional path to centralized SPLADE database. If None, SPLADE + is written to index_path (legacy behavior). Use index_root / SPLADE_DB_NAME + for centralized storage. 
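+            For example (illustrative), a recursive caller passes
+            splade_db_path=index_root / SPLADE_DB_NAME so that all
+            shards share one _splade.db.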
Returns: Result dictionary with generation statistics @@ -723,7 +727,7 @@ def generate_embeddings( splade_error = None try: - from codexlens.config import Config + from codexlens.config import Config, SPLADE_DB_NAME config = Config.load() if config.enable_splade: @@ -737,8 +741,9 @@ def generate_embeddings( # Initialize SPLADE encoder and index splade_encoder = get_splade_encoder(use_gpu=use_gpu) - # Use main index database for SPLADE (not separate _splade.db) - splade_index = SpladeIndex(index_path) + # Use centralized SPLADE database if provided, otherwise fallback to index_path + effective_splade_path = splade_db_path if splade_db_path else index_path + splade_index = SpladeIndex(effective_splade_path) splade_index.create_tables() # Retrieve all chunks from database for SPLADE encoding @@ -953,6 +958,10 @@ def generate_embeddings_recursive( if progress_callback: progress_callback(f"Found {len(index_files)} index databases to process") + # Calculate centralized SPLADE database path + from codexlens.config import SPLADE_DB_NAME + splade_db_path = index_root / SPLADE_DB_NAME + # Process each index database all_results = [] total_chunks = 0 @@ -982,6 +991,7 @@ def generate_embeddings_recursive( endpoints=endpoints, strategy=strategy, cooldown=cooldown, + splade_db_path=splade_db_path, # Use centralized SPLADE storage ) all_results.append({ @@ -1023,6 +1033,279 @@ def generate_embeddings_recursive( } +def generate_dense_embeddings_centralized( + index_root: Path, + embedding_backend: Optional[str] = None, + model_profile: Optional[str] = None, + force: bool = False, + chunk_size: int = 2000, + overlap: int = 200, + progress_callback: Optional[callable] = None, + use_gpu: Optional[bool] = None, + max_tokens_per_batch: Optional[int] = None, + max_workers: Optional[int] = None, + endpoints: Optional[List] = None, + strategy: Optional[str] = None, + cooldown: Optional[float] = None, +) -> Dict[str, any]: + """Generate dense embeddings with centralized vector storage. + + This function creates a single HNSW index at the project root instead of + per-directory indexes. All chunks from all _index.db files are combined + into one central _vectors.hnsw file. 
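+
+    Example (illustrative; assumes an index already exists under the root):
+        >>> result = generate_dense_embeddings_centralized(
+        ...     index_root=Path(".codexlens/abc123"),
+        ...     embedding_backend="fastembed",
+        ...     force=True,
+        ... )
+        >>> result["result"]["central_index_path"]
+        # Returns: .codexlens/abc123/_vectors.hnsw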
+ + Target architecture: + / + |-- _vectors.hnsw # Centralized dense vector ANN index + |-- _splade.db # Centralized sparse vector index + |-- src/ + |-- _index.db # No longer contains .hnsw file + + Args: + index_root: Root index directory containing _index.db files + embedding_backend: Embedding backend (fastembed or litellm) + model_profile: Model profile or name + force: If True, regenerate even if embeddings exist + chunk_size: Maximum chunk size in characters + overlap: Overlap size in characters + progress_callback: Optional callback for progress updates + use_gpu: Whether to use GPU acceleration + max_tokens_per_batch: Maximum tokens per batch + max_workers: Maximum concurrent workers + endpoints: Multi-endpoint configurations + strategy: Endpoint selection strategy + cooldown: Rate-limit cooldown seconds + + Returns: + Result dictionary with generation statistics + """ + from codexlens.config import VECTORS_HNSW_NAME, SPLADE_DB_NAME + + # Get defaults from config if not specified + (default_backend, default_model, default_gpu, + default_endpoints, default_strategy, default_cooldown) = _get_embedding_defaults() + + if embedding_backend is None: + embedding_backend = default_backend + if model_profile is None: + model_profile = default_model + if use_gpu is None: + use_gpu = default_gpu + if endpoints is None: + endpoints = default_endpoints + if strategy is None: + strategy = default_strategy + if cooldown is None: + cooldown = default_cooldown + + # Calculate endpoint count for worker scaling + endpoint_count = len(endpoints) if endpoints else 1 + + if max_workers is None: + if embedding_backend == "litellm": + if endpoint_count > 1: + max_workers = endpoint_count * 2 + else: + max_workers = 4 + else: + max_workers = 1 + + backend_available, backend_error = is_embedding_backend_available(embedding_backend) + if not backend_available: + return {"success": False, "error": backend_error or "Embedding backend not available"} + + # Discover all _index.db files + index_files = discover_all_index_dbs(index_root) + + if not index_files: + return { + "success": False, + "error": f"No index databases found in {index_root}", + } + + if progress_callback: + progress_callback(f"Found {len(index_files)} index databases for centralized embedding") + + # Check for existing centralized index + central_hnsw_path = index_root / VECTORS_HNSW_NAME + if central_hnsw_path.exists() and not force: + return { + "success": False, + "error": f"Centralized vector index already exists at {central_hnsw_path}. 
Use --force to regenerate.", + } + + # Initialize embedder + try: + from codexlens.semantic.factory import get_embedder as get_embedder_factory + from codexlens.semantic.chunker import Chunker, ChunkConfig + from codexlens.semantic.ann_index import ANNIndex + + if embedding_backend == "fastembed": + embedder = get_embedder_factory(backend="fastembed", profile=model_profile, use_gpu=use_gpu) + elif embedding_backend == "litellm": + embedder = get_embedder_factory( + backend="litellm", + model=model_profile, + endpoints=endpoints if endpoints else None, + strategy=strategy, + cooldown=cooldown, + ) + else: + return { + "success": False, + "error": f"Invalid embedding backend: {embedding_backend}", + } + + chunker = Chunker(config=ChunkConfig( + max_chunk_size=chunk_size, + overlap=overlap, + skip_token_count=True + )) + + if progress_callback: + if endpoint_count > 1: + progress_callback(f"Using {endpoint_count} API endpoints with {strategy} strategy") + progress_callback(f"Using model: {embedder.model_name} ({embedder.embedding_dim} dimensions)") + + except Exception as e: + return { + "success": False, + "error": f"Failed to initialize components: {str(e)}", + } + + # Create centralized ANN index + central_ann_index = ANNIndex.create_central( + index_root=index_root, + dim=embedder.embedding_dim, + initial_capacity=100000, # Larger capacity for centralized index + auto_save=False, + ) + + # Process all index databases + start_time = time.time() + failed_files = [] + total_chunks_created = 0 + total_files_processed = 0 + all_chunk_ids = [] + all_embeddings = [] + + # Track chunk ID to file_path mapping for metadata + chunk_id_to_info: Dict[int, Dict[str, Any]] = {} + next_chunk_id = 1 + + for idx, index_path in enumerate(index_files, 1): + if progress_callback: + try: + rel_path = index_path.relative_to(index_root) + except ValueError: + rel_path = index_path + progress_callback(f"Processing {idx}/{len(index_files)}: {rel_path}") + + try: + with sqlite3.connect(index_path) as conn: + conn.row_factory = sqlite3.Row + path_column = _get_path_column(conn) + + # Get files from this index + cursor = conn.execute(f"SELECT {path_column}, content, language FROM files") + file_rows = cursor.fetchall() + + for file_row in file_rows: + file_path = file_row[path_column] + content = file_row["content"] + language = file_row["language"] or "python" + + try: + chunks = chunker.chunk_sliding_window( + content, + file_path=file_path, + language=language + ) + + if not chunks: + continue + + total_files_processed += 1 + + # Generate embeddings for this file's chunks + batch_contents = [chunk.content for chunk in chunks] + embeddings_numpy = embedder.embed_to_numpy(batch_contents, batch_size=EMBEDDING_BATCH_SIZE) + + # Assign chunk IDs and store embeddings + for i, chunk in enumerate(chunks): + chunk_id = next_chunk_id + next_chunk_id += 1 + + all_chunk_ids.append(chunk_id) + all_embeddings.append(embeddings_numpy[i]) + + # Store metadata for later retrieval + chunk_id_to_info[chunk_id] = { + "file_path": file_path, + "content": chunk.content, + "metadata": chunk.metadata, + "category": get_file_category(file_path) or "code", + } + total_chunks_created += 1 + + except Exception as e: + logger.error(f"Failed to process {file_path}: {e}") + failed_files.append((file_path, str(e))) + + except Exception as e: + logger.error(f"Failed to read index {index_path}: {e}") + failed_files.append((str(index_path), str(e))) + + # Add all embeddings to centralized ANN index + if all_embeddings: + if progress_callback: + 
progress_callback(f"Building centralized ANN index with {len(all_embeddings)} vectors...") + + try: + import numpy as np + embeddings_matrix = np.vstack(all_embeddings) + central_ann_index.add_vectors(all_chunk_ids, embeddings_matrix) + central_ann_index.save() + + if progress_callback: + progress_callback(f"Saved centralized index to {central_hnsw_path}") + + except Exception as e: + return { + "success": False, + "error": f"Failed to build centralized ANN index: {str(e)}", + } + + # Store chunk metadata in a centralized metadata database + vectors_meta_path = index_root / "VECTORS_META_DB_NAME" + # Note: The metadata is already stored in individual _index.db semantic_chunks tables + # For now, we rely on the existing per-index storage for metadata lookup + # A future enhancement could consolidate metadata into _vectors_meta.db + + elapsed_time = time.time() - start_time + + # Cleanup + try: + _cleanup_fastembed_resources() + gc.collect() + except Exception: + pass + + return { + "success": True, + "result": { + "chunks_created": total_chunks_created, + "files_processed": total_files_processed, + "files_failed": len(failed_files), + "elapsed_time": elapsed_time, + "model_profile": model_profile, + "model_name": embedder.model_name, + "central_index_path": str(central_hnsw_path), + "failed_files": failed_files[:5], + }, + } + + def get_embeddings_status(index_root: Path) -> Dict[str, any]: """Get comprehensive embeddings coverage status for all indexes. diff --git a/codex-lens/src/codexlens/config.py b/codex-lens/src/codexlens/config.py index d29f13d5..a1a39155 100644 --- a/codex-lens/src/codexlens/config.py +++ b/codex-lens/src/codexlens/config.py @@ -19,6 +19,13 @@ WORKSPACE_DIR_NAME = ".codexlens" # Settings file name SETTINGS_FILE_NAME = "settings.json" +# SPLADE index database name (centralized storage) +SPLADE_DB_NAME = "_splade.db" + +# Dense vector storage names (centralized storage) +VECTORS_HNSW_NAME = "_vectors.hnsw" +VECTORS_META_DB_NAME = "_vectors_meta.db" + log = logging.getLogger(__name__) diff --git a/codex-lens/src/codexlens/search/hybrid_search.py b/codex-lens/src/codexlens/search/hybrid_search.py index 108c7656..919af35e 100644 --- a/codex-lens/src/codexlens/search/hybrid_search.py +++ b/codex-lens/src/codexlens/search/hybrid_search.py @@ -31,6 +31,7 @@ def timer(name: str, logger: logging.Logger, level: int = logging.DEBUG): logger.log(level, "[TIMING] %s: %.2fms", name, elapsed_ms) from codexlens.config import Config +from codexlens.config import VECTORS_HNSW_NAME from codexlens.entities import SearchResult from codexlens.search.ranking import ( DEFAULT_WEIGHTS, @@ -517,11 +518,275 @@ class HybridSearchEngine: self.logger.debug("Fuzzy search error: %s", exc) return [] + def _find_vectors_hnsw(self, index_path: Path) -> Optional[Path]: + """Find the centralized _vectors.hnsw file by traversing up from index_path. + + Similar to _search_splade's approach, this method searches for the + centralized dense vector index file in parent directories. 
+ + Args: + index_path: Path to the current _index.db file + + Returns: + Path to _vectors.hnsw if found, None otherwise + """ + current_dir = index_path.parent + for _ in range(10): # Limit search depth + candidate = current_dir / VECTORS_HNSW_NAME + if candidate.exists(): + return candidate + parent = current_dir.parent + if parent == current_dir: # Reached root + break + current_dir = parent + return None + + def _search_vector_centralized( + self, + index_path: Path, + hnsw_path: Path, + query: str, + limit: int, + category: Optional[str] = None, + ) -> List[SearchResult]: + """Search using centralized vector index. + + Args: + index_path: Path to _index.db file (for metadata lookup) + hnsw_path: Path to centralized _vectors.hnsw file + query: Natural language query string + limit: Maximum results + category: Optional category filter ('code' or 'doc') + + Returns: + List of SearchResult objects ordered by semantic similarity + """ + try: + import sqlite3 + import json + from codexlens.semantic.factory import get_embedder + from codexlens.semantic.ann_index import ANNIndex + + # Get model config from the first index database we can find + # (all indexes should use the same embedding model) + index_root = hnsw_path.parent + model_config = None + + # Try to get model config from the provided index_path first + try: + from codexlens.semantic.vector_store import VectorStore + with VectorStore(index_path) as vs: + model_config = vs.get_model_config() + except Exception: + pass + + # Detect dimension from HNSW file if model config not found + if model_config is None: + self.logger.debug("Model config not found, will detect from HNSW index") + # Create a temporary ANNIndex to load and detect dimension + # We need to know the dimension to properly load the index + + # Get embedder based on model config or default + if model_config: + backend = model_config.get("backend", "fastembed") + model_name = model_config["model_name"] + model_profile = model_config["model_profile"] + embedding_dim = model_config["embedding_dim"] + + if backend == "litellm": + embedder = get_embedder(backend="litellm", model=model_name) + else: + embedder = get_embedder(backend="fastembed", profile=model_profile) + else: + # Default to code profile + embedder = get_embedder(backend="fastembed", profile="code") + embedding_dim = embedder.embedding_dim + + # Load centralized ANN index + start_load = time.perf_counter() + ann_index = ANNIndex.create_central( + index_root=index_root, + dim=embedding_dim, + ) + if not ann_index.load(): + self.logger.warning("Failed to load centralized vector index from %s", hnsw_path) + return [] + self.logger.debug( + "[TIMING] central_ann_load: %.2fms (%d vectors)", + (time.perf_counter() - start_load) * 1000, + ann_index.count() + ) + + # Generate query embedding + start_embed = time.perf_counter() + query_embedding = embedder.embed_single(query) + self.logger.debug( + "[TIMING] query_embedding: %.2fms", + (time.perf_counter() - start_embed) * 1000 + ) + + # Search ANN index + start_search = time.perf_counter() + import numpy as np + query_vec = np.array(query_embedding, dtype=np.float32) + ids, distances = ann_index.search(query_vec, top_k=limit * 2) # Fetch extra for filtering + self.logger.debug( + "[TIMING] central_ann_search: %.2fms (%d results)", + (time.perf_counter() - start_search) * 1000, + len(ids) if ids else 0 + ) + + if not ids: + return [] + + # Convert distances to similarity scores (for cosine: score = 1 - distance) + scores = [1.0 - d for d in distances] + + # Fetch chunk 
metadata from semantic_chunks tables + # We need to search across all _index.db files in the project + results = self._fetch_chunks_by_ids_centralized( + index_root, ids, scores, category + ) + + return results[:limit] + + except ImportError as exc: + self.logger.debug("Semantic dependencies not available: %s", exc) + return [] + except Exception as exc: + self.logger.error("Centralized vector search error: %s", exc) + return [] + + def _fetch_chunks_by_ids_centralized( + self, + index_root: Path, + chunk_ids: List[int], + scores: List[float], + category: Optional[str] = None, + ) -> List[SearchResult]: + """Fetch chunk metadata from all _index.db files for centralized search. + + Args: + index_root: Root directory containing _index.db files + chunk_ids: List of chunk IDs from ANN search + scores: Corresponding similarity scores + category: Optional category filter + + Returns: + List of SearchResult objects + """ + import sqlite3 + import json + + # Build score map + score_map = {cid: score for cid, score in zip(chunk_ids, scores)} + + # Find all _index.db files + index_files = list(index_root.rglob("_index.db")) + + results = [] + found_ids = set() + + for index_path in index_files: + try: + with sqlite3.connect(index_path) as conn: + conn.row_factory = sqlite3.Row + + # Check if semantic_chunks table exists + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='semantic_chunks'" + ) + if cursor.fetchone() is None: + continue + + # Build query for chunk IDs we haven't found yet + remaining_ids = [cid for cid in chunk_ids if cid not in found_ids] + if not remaining_ids: + break + + placeholders = ",".join("?" * len(remaining_ids)) + + if category: + query = f""" + SELECT id, file_path, content, metadata + FROM semantic_chunks + WHERE id IN ({placeholders}) AND category = ? + """ + params = remaining_ids + [category] + else: + query = f""" + SELECT id, file_path, content, metadata + FROM semantic_chunks + WHERE id IN ({placeholders}) + """ + params = remaining_ids + + rows = conn.execute(query, params).fetchall() + + for row in rows: + chunk_id = row["id"] + if chunk_id in found_ids: + continue + found_ids.add(chunk_id) + + file_path = row["file_path"] + content = row["content"] + metadata_json = row["metadata"] + metadata = json.loads(metadata_json) if metadata_json else {} + + score = score_map.get(chunk_id, 0.0) + + # Build excerpt + excerpt = content[:200] + "..." 
if len(content) > 200 else content + + # Extract symbol information + symbol_name = metadata.get("symbol_name") + symbol_kind = metadata.get("symbol_kind") + start_line = metadata.get("start_line") + end_line = metadata.get("end_line") + + # Build Symbol object if available + symbol = None + if symbol_name and symbol_kind and start_line and end_line: + try: + from codexlens.entities import Symbol + symbol = Symbol( + name=symbol_name, + kind=symbol_kind, + range=(start_line, end_line) + ) + except Exception: + pass + + results.append(SearchResult( + path=file_path, + score=score, + excerpt=excerpt, + content=content, + symbol=symbol, + metadata=metadata, + start_line=start_line, + end_line=end_line, + symbol_name=symbol_name, + symbol_kind=symbol_kind, + )) + + except Exception as e: + self.logger.debug("Failed to fetch chunks from %s: %s", index_path, e) + continue + + # Sort by score descending + results.sort(key=lambda r: r.score, reverse=True) + return results + def _search_vector( self, index_path: Path, query: str, limit: int, category: Optional[str] = None ) -> List[SearchResult]: """Execute vector similarity search using semantic embeddings. + Supports both centralized vector storage (single _vectors.hnsw at project root) + and distributed storage (per-directory .hnsw files). + Args: index_path: Path to _index.db file query: Natural language query string @@ -532,6 +797,15 @@ class HybridSearchEngine: List of SearchResult objects ordered by semantic similarity """ try: + # First, check for centralized vector index + central_hnsw_path = self._find_vectors_hnsw(index_path) + if central_hnsw_path is not None: + self.logger.debug("Found centralized vector index at %s", central_hnsw_path) + return self._search_vector_centralized( + index_path, central_hnsw_path, query, limit, category + ) + + # Fallback to distributed (per-index) vector storage # Check if semantic chunks table exists import sqlite3 @@ -677,9 +951,10 @@ class HybridSearchEngine: try: from codexlens.semantic.splade_encoder import get_splade_encoder, check_splade_available from codexlens.storage.splade_index import SpladeIndex + from codexlens.config import SPLADE_DB_NAME import sqlite3 import json - + # Check dependencies ok, err = check_splade_available() if not ok: @@ -691,7 +966,7 @@ class HybridSearchEngine: current_dir = index_path.parent splade_db_path = None for _ in range(10): # Limit search depth - candidate = current_dir / "_splade.db" + candidate = current_dir / SPLADE_DB_NAME if candidate.exists(): splade_db_path = candidate break diff --git a/codex-lens/src/codexlens/semantic/ann_index.py b/codex-lens/src/codexlens/semantic/ann_index.py index 7f042c5a..f823e046 100644 --- a/codex-lens/src/codexlens/semantic/ann_index.py +++ b/codex-lens/src/codexlens/semantic/ann_index.py @@ -9,6 +9,7 @@ Key features: - Incremental vector addition and deletion - Thread-safe operations - Cosine similarity metric +- Support for centralized storage mode (single index at project root) """ from __future__ import annotations @@ -19,6 +20,7 @@ from pathlib import Path from typing import List, Optional, Tuple from codexlens.errors import StorageError +from codexlens.config import VECTORS_HNSW_NAME from . 
import SEMANTIC_AVAILABLE @@ -127,6 +129,94 @@ class ANNIndex: f"auto_save={auto_save}, expansion_threshold={expansion_threshold}" ) + @classmethod + def create_central( + cls, + index_root: Path, + dim: int, + initial_capacity: int = 50000, + auto_save: bool = False, + expansion_threshold: float = 0.8, + ) -> "ANNIndex": + """Create a centralized ANN index at the project index root. + + This method creates a single shared HNSW index file at the project root, + rather than per-directory indexes. Use this for projects that want all + dense vectors stored in one central location. + + Args: + index_root: Root directory for the index (e.g., .codexlens//) + dim: Dimension of embedding vectors + initial_capacity: Initial maximum elements capacity (default: 50000) + auto_save: Whether to automatically save index after operations (default: False) + expansion_threshold: Capacity threshold to trigger auto-expansion (default: 0.8) + + Returns: + ANNIndex instance configured for centralized storage + + Example: + >>> index = ANNIndex.create_central(Path(".codexlens/abc123"), dim=768) + >>> index.hnsw_path # Returns: .codexlens/abc123/_vectors.hnsw + """ + # Create a dummy index_path that will result in the central hnsw_path + # The index_path is used to derive hnsw_path, so we create a virtual path + # such that self.hnsw_path = index_root / VECTORS_HNSW_NAME + instance = cls.__new__(cls) + + if not SEMANTIC_AVAILABLE: + raise ImportError( + "Semantic search dependencies not available. " + "Install with: pip install codexlens[semantic]" + ) + + if not HNSWLIB_AVAILABLE: + raise ImportError( + "hnswlib is required for ANN index. " + "Install with: pip install hnswlib" + ) + + if dim <= 0: + raise ValueError(f"Invalid dimension: {dim}") + + if initial_capacity <= 0: + raise ValueError(f"Invalid initial capacity: {initial_capacity}") + + if not 0.0 < expansion_threshold < 1.0: + raise ValueError( + f"Invalid expansion threshold: {expansion_threshold}. Must be between 0 and 1." + ) + + instance.index_path = index_root + instance.dim = dim + + # Centralized mode: use VECTORS_HNSW_NAME directly at index_root + instance.hnsw_path = index_root / VECTORS_HNSW_NAME + + # HNSW parameters + instance.space = "cosine" + instance.M = 16 + instance.ef_construction = 200 + instance.ef = 50 + + # Memory management parameters + instance._auto_save = auto_save + instance._expansion_threshold = expansion_threshold + + # Thread safety + instance._lock = threading.RLock() + + # HNSW index instance + instance._index: Optional[hnswlib.Index] = None + instance._max_elements = initial_capacity + instance._current_count = 0 + + logger.info( + f"Initialized centralized ANNIndex at {instance.hnsw_path} with " + f"capacity={initial_capacity}, auto_save={auto_save}" + ) + + return instance + def _ensure_index(self) -> None: """Ensure HNSW index is initialized (lazy initialization).""" if self._index is None:
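
A minimal end-to-end sketch of the centralized flow this patch introduces, using
only the entry points added above (the index root path and dim=768 are
illustrative and must match the actual project and embedding model):

    from pathlib import Path

    from codexlens.cli.embedding_manager import generate_dense_embeddings_centralized
    from codexlens.config import VECTORS_HNSW_NAME
    from codexlens.semantic.ann_index import ANNIndex

    index_root = Path(".codexlens/abc123")  # hypothetical project index root

    # 1. Build the centralized dense index; writes index_root / "_vectors.hnsw".
    result = generate_dense_embeddings_centralized(index_root, force=True)

    # 2. Reopen the same file the way _search_vector_centralized does; the
    #    dimension must match the model used in step 1.
    if result["success"]:
        ann = ANNIndex.create_central(index_root=index_root, dim=768)
        if ann.load():
            print(f"{ann.count()} vectors in {index_root / VECTORS_HNSW_NAME}")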