feat: Enhance SPLADE indexing command to support multiple index databases and add chunk ID management

catlog22
2026-01-02 13:25:23 +08:00
parent 56c03c847a
commit 92ed2524b7
4 changed files with 232 additions and 42 deletions


@@ -2456,6 +2456,9 @@ def splade_index_command(
    Encodes all semantic chunks with SPLADE model and builds inverted index
    for efficient sparse retrieval.
+    This command discovers all _index.db files recursively in the project's
+    index directory and builds SPLADE encodings for chunks across all of them.
    Examples:
        codexlens splade-index ~/projects/my-app
        codexlens splade-index . --rebuild
@@ -2473,17 +2476,17 @@ def splade_index_command(
console.print("[dim]Install with: pip install transformers torch[/dim]") console.print("[dim]Install with: pip install transformers torch[/dim]")
raise typer.Exit(1) raise typer.Exit(1)
# Find index database # Find index root directory
target_path = path.expanduser().resolve() target_path = path.expanduser().resolve()
# Try to find _index.db # Determine index root directory (containing _index.db files)
if target_path.is_file() and target_path.name == "_index.db": if target_path.is_file() and target_path.name == "_index.db":
index_db = target_path index_root = target_path.parent
elif target_path.is_dir(): elif target_path.is_dir():
# Check for local .codexlens/_index.db # Check for local .codexlens/_index.db
local_index = target_path / ".codexlens" / "_index.db" local_index = target_path / ".codexlens" / "_index.db"
if local_index.exists(): if local_index.exists():
index_db = local_index index_root = local_index.parent
else: else:
# Try to find via registry # Try to find via registry
registry = RegistryStore() registry = RegistryStore()
@@ -2495,29 +2498,66 @@ def splade_index_command(
                    console.print(f"[red]Error:[/red] No index found for {target_path}")
                    console.print("Run 'codexlens init' first to create an index")
                    raise typer.Exit(1)
+                index_root = index_db.parent
            finally:
                registry.close()
    else:
        console.print(f"[red]Error:[/red] Path must be _index.db file or indexed directory")
        raise typer.Exit(1)

-    splade_db = index_db.parent / "_splade.db"
+    # Discover all _index.db files recursively
+    all_index_dbs = sorted(index_root.rglob("_index.db"))
+    if not all_index_dbs:
+        console.print(f"[red]Error:[/red] No _index.db files found in {index_root}")
+        raise typer.Exit(1)
+    console.print(f"[blue]Discovered {len(all_index_dbs)} index databases[/blue]")
+
+    # SPLADE index is stored alongside the root _index.db
+    splade_db = index_root / "_splade.db"
    if splade_db.exists() and not rebuild:
        console.print("[yellow]SPLADE index exists. Use --rebuild to regenerate.[/yellow]")
        return

-    # Load chunks from vector store
-    console.print(f"[blue]Loading chunks from {index_db.name}...[/blue]")
-    vector_store = VectorStore(index_db)
-    chunks = vector_store.get_all_chunks()
+    # If rebuild, delete existing splade database
+    if splade_db.exists() and rebuild:
+        splade_db.unlink()
+
+    # Collect all chunks from all distributed index databases
+    # Assign globally unique IDs to avoid collisions (each DB starts with ID 1)
+    console.print(f"[blue]Loading chunks from {len(all_index_dbs)} distributed indexes...[/blue]")
+    all_chunks = []  # (global_id, chunk) pairs
+    total_files_checked = 0
+    indexes_with_chunks = 0
+    global_id = 0  # Sequential global ID across all databases
+
+    for index_db in all_index_dbs:
+        total_files_checked += 1
+        try:
+            vector_store = VectorStore(index_db)
+            chunks = vector_store.get_all_chunks()
+            if chunks:
+                indexes_with_chunks += 1
+                # Assign sequential global IDs to avoid collisions
+                for chunk in chunks:
+                    global_id += 1
+                    all_chunks.append((global_id, chunk, index_db))
+                if verbose:
+                    console.print(f" [dim]{index_db.parent.name}: {len(chunks)} chunks[/dim]")
+            vector_store.close()
+        except Exception as e:
+            if verbose:
+                console.print(f" [yellow]Warning: Failed to read {index_db}: {e}[/yellow]")

-    if not chunks:
-        console.print("[yellow]No chunks found in vector store[/yellow]")
-        console.print("[dim]Generate embeddings first with 'codexlens embeddings-generate'[/dim]")
+    if not all_chunks:
+        console.print("[yellow]No chunks found in any index database[/yellow]")
+        console.print(f"[dim]Checked {total_files_checked} index files, found 0 chunks[/dim]")
+        console.print("[dim]Generate embeddings first with 'codexlens embeddings-generate --recursive'[/dim]")
        raise typer.Exit(1)

-    console.print(f"[blue]Encoding {len(chunks)} chunks with SPLADE...[/blue]")
+    console.print(f"[blue]Found {len(all_chunks)} chunks across {indexes_with_chunks} indexes[/blue]")
+    console.print(f"[blue]Encoding with SPLADE...[/blue]")

    # Initialize SPLADE
    encoder = get_splade_encoder()
@@ -2525,6 +2565,7 @@ def splade_index_command(
    splade_index.create_tables()

    # Encode in batches with progress bar
+    chunk_metadata_batch = []
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
@@ -2533,12 +2574,31 @@ def splade_index_command(
        TimeElapsedColumn(),
        console=console,
    ) as progress:
-        task = progress.add_task("Encoding...", total=len(chunks))
+        task = progress.add_task("Encoding...", total=len(all_chunks))

-        for chunk in chunks:
+        for global_id, chunk, source_db_path in all_chunks:
            sparse_vec = encoder.encode_text(chunk.content)
-            splade_index.add_posting(chunk.id, sparse_vec)
+            splade_index.add_posting(global_id, sparse_vec)
+
+            # Store chunk metadata for self-contained search
+            # Serialize metadata dict to JSON string
+            metadata_str = None
+            if hasattr(chunk, 'metadata') and chunk.metadata:
+                try:
+                    metadata_str = json.dumps(chunk.metadata) if isinstance(chunk.metadata, dict) else chunk.metadata
+                except Exception:
+                    pass
+            chunk_metadata_batch.append((
+                global_id,
+                chunk.file_path or "",
+                chunk.content,
+                metadata_str,
+                str(source_db_path)
+            ))
            progress.advance(task)

+    # Batch insert chunk metadata
+    if chunk_metadata_batch:
+        splade_index.add_chunks_metadata_batch(chunk_metadata_batch)

    # Set metadata
    splade_index.set_metadata(
        model_name=encoder.model_name,
@@ -2546,7 +2606,8 @@ def splade_index_command(
    )

    stats = splade_index.get_stats()
-    console.print(f"[green][/green] SPLADE index built: {stats['unique_chunks']} chunks, {stats['total_postings']} postings")
+    console.print(f"[green]OK[/green] SPLADE index built: {stats['unique_chunks']} chunks, {stats['total_postings']} postings")
+    console.print(f" Source indexes: {indexes_with_chunks}")
    console.print(f" Database: [dim]{splade_db}[/dim]")


@@ -36,6 +36,8 @@ class SemanticChunk(BaseModel):
    content: str = Field(..., min_length=1)
    embedding: Optional[List[float]] = Field(default=None, description="Vector embedding for semantic search")
    metadata: Dict[str, Any] = Field(default_factory=dict)
+    id: Optional[int] = Field(default=None, description="Database row ID")
+    file_path: Optional[str] = Field(default=None, description="Source file path")

    @field_validator("embedding")
    @classmethod
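Because the two new fields default to None, existing SemanticChunk constructors keep working. A minimal sketch of the extended model as shown in the diff (validators and any other fields of the real class are omitted here):

from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field

class SemanticChunk(BaseModel):
    content: str = Field(..., min_length=1)
    embedding: Optional[List[float]] = Field(default=None, description="Vector embedding for semantic search")
    metadata: Dict[str, Any] = Field(default_factory=dict)
    id: Optional[int] = Field(default=None, description="Database row ID")
    file_path: Optional[str] = Field(default=None, description="Source file path")

# Existing call sites keep working because the new fields default to None:
chunk = SemanticChunk(content="def add(a, b): return a + b")
assert chunk.id is None and chunk.file_path is None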


@@ -656,8 +656,25 @@ class HybridSearchEngine:
            self.logger.debug("SPLADE not available: %s", err)
            return []

-        # Use main index database (SPLADE tables are in _index.db, not separate _splade.db)
-        splade_index = SpladeIndex(index_path)
+        # SPLADE index is stored in _splade.db at the project index root
+        # Traverse up from the current index to find the root _splade.db
+        current_dir = index_path.parent
+        splade_db_path = None
+        for _ in range(10):  # Limit search depth
+            candidate = current_dir / "_splade.db"
+            if candidate.exists():
+                splade_db_path = candidate
+                break
+            parent = current_dir.parent
+            if parent == current_dir:  # Reached root
+                break
+            current_dir = parent
+
+        if not splade_db_path:
+            self.logger.debug("SPLADE index not found in ancestor directories of %s", index_path)
+            return []
+
+        splade_index = SpladeIndex(splade_db_path)
        if not splade_index.has_index():
            self.logger.debug("SPLADE index not initialized")
            return []
@@ -672,22 +689,12 @@ class HybridSearchEngine:
        if not raw_results:
            return []

-        # Fetch chunk details from main index database
+        # Fetch chunk details from splade_chunks table (self-contained)
        chunk_ids = [chunk_id for chunk_id, _ in raw_results]
        score_map = {chunk_id: score for chunk_id, score in raw_results}

-        # Query semantic_chunks table for full details
-        placeholders = ",".join("?" * len(chunk_ids))
-        with sqlite3.connect(index_path) as conn:
-            conn.row_factory = sqlite3.Row
-            rows = conn.execute(
-                f"""
-                SELECT id, file_path, content, metadata
-                FROM semantic_chunks
-                WHERE id IN ({placeholders})
-                """,
-                chunk_ids
-            ).fetchall()
+        # Get chunk metadata from SPLADE database
+        rows = splade_index.get_chunks_by_ids(chunk_ids)

        # Build SearchResult objects
        results = []
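The lookup added in this file walks at most ten directory levels upward from the per-directory index until it finds the shared _splade.db. Extracted as a standalone helper it looks roughly like the sketch below; the function name and signature are illustrative, not part of the codebase.

from pathlib import Path
from typing import Optional

def find_splade_db(index_path: Path, max_depth: int = 10) -> Optional[Path]:
    """Walk up from index_path's directory looking for an ancestor _splade.db."""
    current = index_path.parent
    for _ in range(max_depth):
        candidate = current / "_splade.db"
        if candidate.exists():
            return candidate
        if current.parent == current:  # reached the filesystem root
            return None
        current = current.parent
    return None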


@@ -102,21 +102,22 @@ class SpladeIndex:
    def create_tables(self) -> None:
        """Create SPLADE schema if not exists.

-        Note: The splade_posting_list table has a FOREIGN KEY constraint
-        referencing semantic_chunks(id). Ensure VectorStore.create_tables()
-        is called first to create the semantic_chunks table.
+        Note: When used with distributed indexes (multiple _index.db files),
+        the SPLADE database stores chunk IDs from multiple sources. In this case,
+        foreign key constraints are not enforced to allow cross-database references.
        """
        with self._lock:
            conn = self._get_connection()
            try:
                # Inverted index for sparse vectors
+                # Note: No FOREIGN KEY constraint to support distributed index architecture
+                # where chunks may come from multiple _index.db files
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS splade_posting_list (
                        token_id INTEGER NOT NULL,
                        chunk_id INTEGER NOT NULL,
                        weight REAL NOT NULL,
-                        PRIMARY KEY (token_id, chunk_id),
-                        FOREIGN KEY (chunk_id) REFERENCES semantic_chunks(id) ON DELETE CASCADE
+                        PRIMARY KEY (token_id, chunk_id)
                    )
                """)
@@ -141,6 +142,18 @@ class SpladeIndex:
                )
            """)

+            # Chunk metadata for self-contained search results
+            # Stores all chunk info needed to build SearchResult without querying _index.db
+            conn.execute("""
+                CREATE TABLE IF NOT EXISTS splade_chunks (
+                    id INTEGER PRIMARY KEY,
+                    file_path TEXT NOT NULL,
+                    content TEXT NOT NULL,
+                    metadata TEXT,
+                    source_db TEXT
+                )
+            """)
+
            conn.commit()
            logger.debug("SPLADE schema created successfully")
        except sqlite3.Error as e:
@@ -234,6 +247,113 @@ class SpladeIndex:
                    operation="add_postings_batch"
                ) from e

+    def add_chunk_metadata(
+        self,
+        chunk_id: int,
+        file_path: str,
+        content: str,
+        metadata: Optional[str] = None,
+        source_db: Optional[str] = None
+    ) -> None:
+        """Store chunk metadata for self-contained search results.
+
+        Args:
+            chunk_id: Global chunk ID.
+            file_path: Path to source file.
+            content: Chunk text content.
+            metadata: JSON metadata string.
+            source_db: Path to source _index.db.
+        """
+        with self._lock:
+            conn = self._get_connection()
+            try:
+                conn.execute(
+                    """
+                    INSERT OR REPLACE INTO splade_chunks
+                    (id, file_path, content, metadata, source_db)
+                    VALUES (?, ?, ?, ?, ?)
+                    """,
+                    (chunk_id, file_path, content, metadata, source_db)
+                )
+                conn.commit()
+            except sqlite3.Error as e:
+                raise StorageError(
+                    f"Failed to add chunk metadata for chunk_id={chunk_id}: {e}",
+                    db_path=str(self.db_path),
+                    operation="add_chunk_metadata"
+                ) from e
+
+    def add_chunks_metadata_batch(
+        self,
+        chunks: List[Tuple[int, str, str, Optional[str], Optional[str]]]
+    ) -> None:
+        """Batch insert chunk metadata.
+
+        Args:
+            chunks: List of (chunk_id, file_path, content, metadata, source_db) tuples.
+        """
+        if not chunks:
+            return
+
+        with self._lock:
+            conn = self._get_connection()
+            try:
+                conn.executemany(
+                    """
+                    INSERT OR REPLACE INTO splade_chunks
+                    (id, file_path, content, metadata, source_db)
+                    VALUES (?, ?, ?, ?, ?)
+                    """,
+                    chunks
+                )
+                conn.commit()
+                logger.debug("Batch inserted %d chunk metadata records", len(chunks))
+            except sqlite3.Error as e:
+                raise StorageError(
+                    f"Failed to batch insert chunk metadata: {e}",
+                    db_path=str(self.db_path),
+                    operation="add_chunks_metadata_batch"
+                ) from e
+
+    def get_chunks_by_ids(self, chunk_ids: List[int]) -> List[Dict]:
+        """Get chunk metadata by IDs.
+
+        Args:
+            chunk_ids: List of chunk IDs to retrieve.
+
+        Returns:
+            List of dicts with id, file_path, content, metadata, source_db.
+        """
+        if not chunk_ids:
+            return []
+
+        with self._lock:
+            conn = self._get_connection()
+            try:
+                placeholders = ",".join("?" * len(chunk_ids))
+                rows = conn.execute(
+                    f"""
+                    SELECT id, file_path, content, metadata, source_db
+                    FROM splade_chunks
+                    WHERE id IN ({placeholders})
+                    """,
+                    chunk_ids
+                ).fetchall()
+                return [
+                    {
+                        "id": row["id"],
+                        "file_path": row["file_path"],
+                        "content": row["content"],
+                        "metadata": row["metadata"],
+                        "source_db": row["source_db"]
+                    }
+                    for row in rows
+                ]
+            except sqlite3.Error as e:
+                logger.error("Failed to get chunks by IDs: %s", e)
+                return []
+
    def remove_chunk(self, chunk_id: int) -> int:
        """Remove all postings for a chunk.