From 92ed2524b784153e9133f8e4f2e6d91bb1441954 Mon Sep 17 00:00:00 2001
From: catlog22
Date: Fri, 2 Jan 2026 13:25:23 +0800
Subject: [PATCH] feat: Enhance SPLADE indexing command to support multiple
 index databases and add chunk ID management

---
 codex-lens/src/codexlens/cli/commands.py      |  95 ++++++++++---
 codex-lens/src/codexlens/entities.py          |   2 +
 .../src/codexlens/search/hybrid_search.py     |  47 ++++---
 .../src/codexlens/storage/splade_index.py     | 130 +++++++++++++++++-
 4 files changed, 232 insertions(+), 42 deletions(-)

diff --git a/codex-lens/src/codexlens/cli/commands.py b/codex-lens/src/codexlens/cli/commands.py
index 02f01bcd..cc61010e 100644
--- a/codex-lens/src/codexlens/cli/commands.py
+++ b/codex-lens/src/codexlens/cli/commands.py
@@ -2456,6 +2456,9 @@ def splade_index_command(
     Encodes all semantic chunks with SPLADE model and builds inverted index
     for efficient sparse retrieval.
 
+    This command discovers all _index.db files recursively in the project's
+    index directory and builds SPLADE encodings for chunks across all of them.
+
     Examples:
         codexlens splade-index ~/projects/my-app
         codexlens splade-index . --rebuild
@@ -2473,17 +2476,17 @@
         console.print("[dim]Install with: pip install transformers torch[/dim]")
         raise typer.Exit(1)
 
-    # Find index database
+    # Find index root directory
     target_path = path.expanduser().resolve()
 
-    # Try to find _index.db
+    # Determine index root directory (containing _index.db files)
     if target_path.is_file() and target_path.name == "_index.db":
-        index_db = target_path
+        index_root = target_path.parent
     elif target_path.is_dir():
         # Check for local .codexlens/_index.db
         local_index = target_path / ".codexlens" / "_index.db"
         if local_index.exists():
-            index_db = local_index
+            index_root = local_index.parent
         else:
             # Try to find via registry
             registry = RegistryStore()
@@ -2495,29 +2498,66 @@
                     console.print(f"[red]Error:[/red] No index found for {target_path}")
                     console.print("Run 'codexlens init' first to create an index")
                     raise typer.Exit(1)
+                index_root = index_db.parent
             finally:
                 registry.close()
    else:
        console.print(f"[red]Error:[/red] Path must be _index.db file or indexed directory")
        raise typer.Exit(1)
 
-    splade_db = index_db.parent / "_splade.db"
+    # Discover all _index.db files recursively
+    all_index_dbs = sorted(index_root.rglob("_index.db"))
+    if not all_index_dbs:
+        console.print(f"[red]Error:[/red] No _index.db files found in {index_root}")
+        raise typer.Exit(1)
+
+    console.print(f"[blue]Discovered {len(all_index_dbs)} index databases[/blue]")
+
+    # SPLADE index is stored alongside the root _index.db
+    splade_db = index_root / "_splade.db"
     if splade_db.exists() and not rebuild:
         console.print("[yellow]SPLADE index exists. Use --rebuild to regenerate.[/yellow]")
         return
 
-    # Load chunks from vector store
-    console.print(f"[blue]Loading chunks from {index_db.name}...[/blue]")
-    vector_store = VectorStore(index_db)
-    chunks = vector_store.get_all_chunks()
+    # If rebuilding, delete the existing SPLADE database
+    if splade_db.exists() and rebuild:
+        splade_db.unlink()
 
-    if not chunks:
-        console.print("[yellow]No chunks found in vector store[/yellow]")
-        console.print("[dim]Generate embeddings first with 'codexlens embeddings-generate'[/dim]")
+    # Collect all chunks from all distributed index databases
+    # Assign globally unique IDs to avoid collisions (each DB starts with ID 1)
+    console.print(f"[blue]Loading chunks from {len(all_index_dbs)} distributed indexes...[/blue]")
+    all_chunks = []  # (global_id, chunk, source_db) triples
+    total_files_checked = 0
+    indexes_with_chunks = 0
+    global_id = 0  # Sequential global ID across all databases
+
+    for index_db in all_index_dbs:
+        total_files_checked += 1
+        try:
+            vector_store = VectorStore(index_db)
+            chunks = vector_store.get_all_chunks()
+            if chunks:
+                indexes_with_chunks += 1
+                # Assign sequential global IDs to avoid collisions
+                for chunk in chunks:
+                    global_id += 1
+                    all_chunks.append((global_id, chunk, index_db))
+                if verbose:
+                    console.print(f"  [dim]{index_db.parent.name}: {len(chunks)} chunks[/dim]")
+            vector_store.close()
+        except Exception as e:
+            if verbose:
+                console.print(f"  [yellow]Warning: Failed to read {index_db}: {e}[/yellow]")
+
+    if not all_chunks:
+        console.print("[yellow]No chunks found in any index database[/yellow]")
+        console.print(f"[dim]Checked {total_files_checked} index files, found 0 chunks[/dim]")
+        console.print("[dim]Generate embeddings first with 'codexlens embeddings-generate --recursive'[/dim]")
        raise typer.Exit(1)
 
-    console.print(f"[blue]Encoding {len(chunks)} chunks with SPLADE...[/blue]")
+    console.print(f"[blue]Found {len(all_chunks)} chunks across {indexes_with_chunks} indexes[/blue]")
+    console.print("[blue]Encoding with SPLADE...[/blue]")
 
     # Initialize SPLADE
     encoder = get_splade_encoder()
@@ -2525,6 +2565,7 @@
     splade_index.create_tables()
 
     # Encode in batches with progress bar
+    chunk_metadata_batch = []
     with Progress(
         SpinnerColumn(),
         TextColumn("[progress.description]{task.description}"),
@@ -2533,12 +2574,31 @@
         TimeElapsedColumn(),
         console=console,
     ) as progress:
-        task = progress.add_task("Encoding...", total=len(chunks))
-        for chunk in chunks:
+        task = progress.add_task("Encoding...", total=len(all_chunks))
+        for global_id, chunk, source_db_path in all_chunks:
             sparse_vec = encoder.encode_text(chunk.content)
-            splade_index.add_posting(chunk.id, sparse_vec)
+            splade_index.add_posting(global_id, sparse_vec)
+            # Store chunk metadata for self-contained search
+            # Serialize metadata dict to JSON string
+            metadata_str = None
+            if hasattr(chunk, 'metadata') and chunk.metadata:
+                try:
+                    metadata_str = json.dumps(chunk.metadata) if isinstance(chunk.metadata, dict) else chunk.metadata
+                except Exception:
+                    pass
+            chunk_metadata_batch.append((
+                global_id,
+                chunk.file_path or "",
+                chunk.content,
+                metadata_str,
+                str(source_db_path)
+            ))
             progress.advance(task)
 
+    # Batch insert chunk metadata
+    if chunk_metadata_batch:
+        splade_index.add_chunks_metadata_batch(chunk_metadata_batch)
+
     # Set metadata
     splade_index.set_metadata(
         model_name=encoder.model_name,
@@ -2546,7 +2606,8 @@
     )
 
     stats = splade_index.get_stats()
-    console.print(f"[green]✓[/green] SPLADE index built: {stats['unique_chunks']} chunks, {stats['total_postings']} postings")
+    console.print(f"[green]OK[/green] SPLADE index built: {stats['unique_chunks']} chunks, {stats['total_postings']} postings")
+    console.print(f"  Source indexes: {indexes_with_chunks}")
     console.print(f"  Database: [dim]{splade_db}[/dim]")
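Note on the hunk above: each per-directory _index.db numbers its semantic_chunks rows from 1, so raw chunk IDs collide as soon as several databases feed one _splade.db; the single sequential counter is what keeps postings unambiguous. Below is a minimal standalone sketch of that counter logic, not part of the patch: assign_global_ids and chunks_per_db are hypothetical names, and VectorStore is stubbed out as plain lists of chunk text.

# Sketch only: global-ID assignment across several per-directory databases.
from pathlib import Path
from typing import Dict, List, Tuple

def assign_global_ids(chunks_per_db: Dict[Path, List[str]]) -> List[Tuple[int, str, Path]]:
    """Return (global_id, chunk_content, source_db) triples with IDs unique
    across all databases, mirroring the loop in splade_index_command."""
    triples: List[Tuple[int, str, Path]] = []
    global_id = 0  # single counter spanning every database
    for index_db in sorted(chunks_per_db):  # sorted() mirrors sorted(rglob(...))
        for content in chunks_per_db[index_db]:
            global_id += 1
            triples.append((global_id, content, index_db))
    return triples

if __name__ == "__main__":
    dbs = {
        Path("a/.codexlens/_index.db"): ["def foo(): ...", "def bar(): ..."],
        Path("b/.codexlens/_index.db"): ["class Baz: ..."],
    }
    for gid, content, src in assign_global_ids(dbs):
        print(gid, src, content)  # IDs run 1, 2, 3 regardless of local row IDs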
diff --git a/codex-lens/src/codexlens/entities.py b/codex-lens/src/codexlens/entities.py
index 9de58c07..d569cc3e 100644
--- a/codex-lens/src/codexlens/entities.py
+++ b/codex-lens/src/codexlens/entities.py
@@ -36,6 +36,8 @@ class SemanticChunk(BaseModel):
     content: str = Field(..., min_length=1)
     embedding: Optional[List[float]] = Field(default=None, description="Vector embedding for semantic search")
     metadata: Dict[str, Any] = Field(default_factory=dict)
+    id: Optional[int] = Field(default=None, description="Database row ID")
+    file_path: Optional[str] = Field(default=None, description="Source file path")
 
     @field_validator("embedding")
     @classmethod
diff --git a/codex-lens/src/codexlens/search/hybrid_search.py b/codex-lens/src/codexlens/search/hybrid_search.py
index 82ac816c..3b6a554f 100644
--- a/codex-lens/src/codexlens/search/hybrid_search.py
+++ b/codex-lens/src/codexlens/search/hybrid_search.py
@@ -655,9 +655,26 @@ class HybridSearchEngine:
         if not ok:
             self.logger.debug("SPLADE not available: %s", err)
             return []
-
-        # Use main index database (SPLADE tables are in _index.db, not separate _splade.db)
-        splade_index = SpladeIndex(index_path)
+
+        # SPLADE index is stored in _splade.db at the project index root
+        # Traverse up from the current index to find the root _splade.db
+        current_dir = index_path.parent
+        splade_db_path = None
+        for _ in range(10):  # Limit search depth
+            candidate = current_dir / "_splade.db"
+            if candidate.exists():
+                splade_db_path = candidate
+                break
+            parent = current_dir.parent
+            if parent == current_dir:  # Reached root
+                break
+            current_dir = parent
+
+        if not splade_db_path:
+            self.logger.debug("SPLADE index not found in ancestor directories of %s", index_path)
+            return []
+
+        splade_index = SpladeIndex(splade_db_path)
         if not splade_index.has_index():
             self.logger.debug("SPLADE index not initialized")
             return []
@@ -671,24 +688,14 @@
 
         if not raw_results:
             return []
-
-        # Fetch chunk details from main index database
+
+        # Fetch chunk details from splade_chunks table (self-contained)
         chunk_ids = [chunk_id for chunk_id, _ in raw_results]
         score_map = {chunk_id: score for chunk_id, score in raw_results}
-
-        # Query semantic_chunks table for full details
-        placeholders = ",".join("?" * len(chunk_ids))
-        with sqlite3.connect(index_path) as conn:
-            conn.row_factory = sqlite3.Row
-            rows = conn.execute(
-                f"""
-                SELECT id, file_path, content, metadata
-                FROM semantic_chunks
-                WHERE id IN ({placeholders})
-                """,
-                chunk_ids
-            ).fetchall()
-
+
+        # Get chunk metadata from SPLADE database
+        rows = splade_index.get_chunks_by_ids(chunk_ids)
+
         # Build SearchResult objects
         results = []
         for row in rows:
@@ -697,7 +704,8 @@
             content = row["content"]
             metadata_json = row["metadata"]
             metadata = json.loads(metadata_json) if metadata_json else {}
-
+
             score = score_map.get(chunk_id, 0.0)
 
             # Build excerpt (short preview)
diff --git a/codex-lens/src/codexlens/storage/splade_index.py b/codex-lens/src/codexlens/storage/splade_index.py
index 6a7c2fa1..62a623f0 100644
--- a/codex-lens/src/codexlens/storage/splade_index.py
+++ b/codex-lens/src/codexlens/storage/splade_index.py
@@ -102,21 +102,22 @@
     def create_tables(self) -> None:
         """Create SPLADE schema if not exists.
 
-        Note: The splade_posting_list table has a FOREIGN KEY constraint
-        referencing semantic_chunks(id). Ensure VectorStore.create_tables()
-        is called first to create the semantic_chunks table.
+        Note: When used with distributed indexes (multiple _index.db files),
+        the SPLADE database stores chunk IDs from multiple sources. In this case,
+        foreign key constraints are not enforced to allow cross-database references.
         """
         with self._lock:
             conn = self._get_connection()
             try:
                 # Inverted index for sparse vectors
+                # Note: No FOREIGN KEY constraint to support distributed index architecture
+                # where chunks may come from multiple _index.db files
                 conn.execute("""
                     CREATE TABLE IF NOT EXISTS splade_posting_list (
                         token_id INTEGER NOT NULL,
                         chunk_id INTEGER NOT NULL,
                         weight REAL NOT NULL,
-                        PRIMARY KEY (token_id, chunk_id),
-                        FOREIGN KEY (chunk_id) REFERENCES semantic_chunks(id) ON DELETE CASCADE
+                        PRIMARY KEY (token_id, chunk_id)
                     )
                 """)
@@ -140,6 +141,18 @@
                         created_at REAL
                     )
                 """)
+
+                # Chunk metadata for self-contained search results
+                # Stores all chunk info needed to build SearchResult without querying _index.db
+                conn.execute("""
+                    CREATE TABLE IF NOT EXISTS splade_chunks (
+                        id INTEGER PRIMARY KEY,
+                        file_path TEXT NOT NULL,
+                        content TEXT NOT NULL,
+                        metadata TEXT,
+                        source_db TEXT
+                    )
+                """)
 
                 conn.commit()
                 logger.debug("SPLADE schema created successfully")
@@ -233,6 +246,113 @@
                 db_path=str(self.db_path),
                 operation="add_postings_batch"
             ) from e
+
+    def add_chunk_metadata(
+        self,
+        chunk_id: int,
+        file_path: str,
+        content: str,
+        metadata: Optional[str] = None,
+        source_db: Optional[str] = None
+    ) -> None:
+        """Store chunk metadata for self-contained search results.
+
+        Args:
+            chunk_id: Global chunk ID.
+            file_path: Path to source file.
+            content: Chunk text content.
+            metadata: JSON metadata string.
+            source_db: Path to source _index.db.
+        """
+        with self._lock:
+            conn = self._get_connection()
+            try:
+                conn.execute(
+                    """
+                    INSERT OR REPLACE INTO splade_chunks
+                    (id, file_path, content, metadata, source_db)
+                    VALUES (?, ?, ?, ?, ?)
+                    """,
+                    (chunk_id, file_path, content, metadata, source_db)
+                )
+                conn.commit()
+            except sqlite3.Error as e:
+                raise StorageError(
+                    f"Failed to add chunk metadata for chunk_id={chunk_id}: {e}",
+                    db_path=str(self.db_path),
+                    operation="add_chunk_metadata"
+                ) from e
+
+    def add_chunks_metadata_batch(
+        self,
+        chunks: List[Tuple[int, str, str, Optional[str], Optional[str]]]
+    ) -> None:
+        """Batch insert chunk metadata.
+
+        Args:
+            chunks: List of (chunk_id, file_path, content, metadata, source_db) tuples.
+        """
+        if not chunks:
+            return
+
+        with self._lock:
+            conn = self._get_connection()
+            try:
+                conn.executemany(
+                    """
+                    INSERT OR REPLACE INTO splade_chunks
+                    (id, file_path, content, metadata, source_db)
+                    VALUES (?, ?, ?, ?, ?)
+                    """,
+                    chunks
+                )
+                conn.commit()
+                logger.debug("Batch inserted %d chunk metadata records", len(chunks))
+            except sqlite3.Error as e:
+                raise StorageError(
+                    f"Failed to batch insert chunk metadata: {e}",
+                    db_path=str(self.db_path),
+                    operation="add_chunks_metadata_batch"
+                ) from e
+
+    def get_chunks_by_ids(self, chunk_ids: List[int]) -> List[Dict]:
+        """Get chunk metadata by IDs.
+
+        Args:
+            chunk_ids: List of chunk IDs to retrieve.
+
+        Returns:
+            List of dicts with id, file_path, content, metadata, source_db.
+        """
+        if not chunk_ids:
+            return []
+
+        with self._lock:
+            conn = self._get_connection()
+            try:
+                placeholders = ",".join("?" * len(chunk_ids))
+                rows = conn.execute(
+                    f"""
+                    SELECT id, file_path, content, metadata, source_db
+                    FROM splade_chunks
+                    WHERE id IN ({placeholders})
+                    """,
+                    chunk_ids
+                ).fetchall()
+
+                return [
+                    {
+                        "id": row["id"],
+                        "file_path": row["file_path"],
+                        "content": row["content"],
+                        "metadata": row["metadata"],
+                        "source_db": row["source_db"]
+                    }
+                    for row in rows
+                ]
+            except sqlite3.Error as e:
+                logger.error("Failed to get chunks by IDs: %s", e)
+                return []
 
     def remove_chunk(self, chunk_id: int) -> int:
         """Remove all postings for a chunk.
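Note on the hybrid_search.py hunk: search no longer opens the per-directory _index.db for chunk details; it walks parent directories until it finds the root _splade.db, which is now self-contained via the splade_chunks table. A standalone sketch of that lookup under the same assumptions (depth cap of 10, _splade.db stored beside the root _index.db); find_splade_db is a hypothetical helper name, not part of the patch:

# Sketch only: upward search for the root _splade.db.
from pathlib import Path
from typing import Optional

def find_splade_db(index_path: Path, max_depth: int = 10) -> Optional[Path]:
    """Walk ancestors of an _index.db path looking for _splade.db."""
    current = index_path.parent
    for _ in range(max_depth):  # bounded, as in the patch
        candidate = current / "_splade.db"
        if candidate.exists():
            return candidate
        if current.parent == current:  # reached the filesystem root
            return None
        current = current.parent
    return None

# Usage: find_splade_db(Path("proj/.codexlens/sub/pkg/_index.db"))
# returns proj/.codexlens/_splade.db if it exists, else None.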