feat: Implement cascade indexing command and benchmark script for performance evaluation

catlog22
2026-01-02 11:24:06 +08:00
parent e21d801523
commit da68ba0b82
4 changed files with 984 additions and 3 deletions


@@ -7,7 +7,7 @@ import logging
import os
import shutil
from pathlib import Path
-from typing import Any, Dict, Iterable, List, Optional
+from typing import Annotated, Any, Dict, Iterable, List, Optional
import typer
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
@@ -2721,3 +2721,305 @@ def _display_index_result(result) -> None:
console.print(f" [red]Error:[/red] {error}")
if len(result.errors) > 3:
console.print(f" [dim]... and {len(result.errors) - 3} more errors[/dim]")
# ==================== Cascade Index Commands ====================
def get_binary_index_path(db_path: Path) -> Path:
"""Get the path for binary ANN index file.
Args:
db_path: Path to the _index.db file
Returns:
Path to the binary index file (_index_binary.bin)
"""
return db_path.parent / f"{db_path.stem}_binary.bin"
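# Example (directory name is hypothetical): the binary index sits next to
# its source database, since Path("_index.db").stem == "_index":
#     get_binary_index_path(Path("proj/.codexlens/_index.db"))
#     -> Path("proj/.codexlens/_index_binary.bin")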
@app.command("cascade-index")
def cascade_index(
path: Annotated[Path, typer.Argument(help="Directory to index")],
force: Annotated[bool, typer.Option("--force", "-f", help="Force regenerate")] = False,
batch_size: Annotated[int, typer.Option("--batch-size", "-b", help="Batch size for embedding")] = 32,
json_mode: Annotated[bool, typer.Option("--json", help="Output JSON response")] = False,
verbose: Annotated[bool, typer.Option("--verbose", "-v", help="Enable verbose logging")] = False,
) -> None:
"""Generate cascade embeddings (binary + dense) for two-stage retrieval.
Cascade retrieval uses a two-stage approach:
1. Binary search (fast, 32 bytes/vector) -> coarse filtering
2. Dense rerank (precise, 8KB/vector) -> final results
This command:
- Finds all _index.db files in the directory
- Generates binary (256-dim) and dense (2048-dim) embeddings for each chunk
- Stores embeddings in the database (embedding_binary, embedding_dense columns)
- Creates a BinaryANNIndex file for fast coarse retrieval
Examples:
codexlens cascade-index ~/projects/my-app
codexlens cascade-index . --force
codexlens cascade-index . --batch-size 64 --verbose
"""
_configure_logging(verbose, json_mode)
target_path = path.expanduser().resolve()
# Find index database(s)
if target_path.is_file() and target_path.name == "_index.db":
index_dbs = [target_path]
elif target_path.is_dir():
# Check local .codexlens/_index.db first
local_index = target_path / ".codexlens" / "_index.db"
if local_index.exists():
index_dbs = [local_index]
else:
# Find via registry
registry = RegistryStore()
try:
registry.initialize()
mapper = PathMapper()
index_db = mapper.source_to_index_db(target_path)
if not index_db.exists():
if json_mode:
print_json(success=False, error=f"No index found for {target_path}")
else:
console.print(f"[red]Error:[/red] No index found for {target_path}")
console.print("Run 'codexlens init' first to create an index")
raise typer.Exit(code=1)
# Find all _index.db files under the index root
index_root = index_db.parent
index_dbs = list(index_root.rglob("_index.db"))
finally:
registry.close()
else:
if json_mode:
print_json(success=False, error="Path must be _index.db file or indexed directory")
else:
console.print("[red]Error:[/red] Path must be _index.db file or indexed directory")
raise typer.Exit(code=1)
if not index_dbs:
if json_mode:
print_json(success=False, error="No index databases found")
else:
console.print("[yellow]No index databases found[/yellow]")
raise typer.Exit(code=1)
# Import cascade embedding backend
try:
from codexlens.indexing.embedding import CascadeEmbeddingBackend, pack_binary_embedding
from codexlens.semantic.ann_index import BinaryANNIndex
except ImportError as e:
error_msg = f"Cascade embedding dependencies not available: {e}"
if json_mode:
print_json(success=False, error=error_msg)
else:
console.print(f"[red]Error:[/red] {error_msg}")
console.print("[dim]Install with: pip install codexlens[semantic][/dim]")
raise typer.Exit(code=1)
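# Assumed contract of the imports above: encode_cascade(texts) returns a
# (binary_embeddings, dense_embeddings) pair with one row per input text,
# where each binary row is a 256-element 0/1 vector and each dense row has
# TARGET_DIM float32 components; pack_binary_embedding is assumed to pack
# one 256-bit row into 32 bytes (e.g. via numpy.packbits).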
if not json_mode:
console.print(f"[bold]Generating cascade embeddings[/bold]")
console.print(f"Path: [dim]{target_path}[/dim]")
console.print(f"Index databases: [cyan]{len(index_dbs)}[/cyan]")
console.print(f"Batch size: [cyan]{batch_size}[/cyan]")
console.print()
# Initialize cascade embedding backend
try:
cascade_backend = CascadeEmbeddingBackend()
except Exception as e:
error_msg = f"Failed to initialize cascade embedding backend: {e}"
if json_mode:
print_json(success=False, error=error_msg)
else:
console.print(f"[red]Error:[/red] {error_msg}")
raise typer.Exit(code=1)
# Process statistics
total_chunks_processed = 0
total_indexes_processed = 0
total_indexes_successful = 0
total_binary_indexes_created = 0
errors_list: List[str] = []
# Process each index database
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TextColumn("({task.completed}/{task.total})"),
TimeElapsedColumn(),
console=console,
disable=json_mode,
) as progress:
db_task = progress.add_task("Processing indexes...", total=len(index_dbs))
for db_path in index_dbs:
total_indexes_processed += 1
index_name = db_path.parent.name
try:
# Open the index store
store = DirIndexStore(db_path)
store.initialize()
# Get connection for direct queries
conn = store._get_connection()
# Ensure cascade columns exist in semantic_chunks table
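# (SQLite has no "ADD COLUMN IF NOT EXISTS", so duplicate-column errors
# from re-runs are simply swallowed below.)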
try:
conn.execute("ALTER TABLE semantic_chunks ADD COLUMN embedding_binary BLOB")
except Exception:
pass # Column already exists
try:
conn.execute("ALTER TABLE semantic_chunks ADD COLUMN embedding_dense BLOB")
except Exception:
pass # Column already exists
conn.commit()
# Check if semantic_chunks table exists and has data
try:
cursor = conn.execute("SELECT COUNT(*) FROM semantic_chunks")
chunk_count = cursor.fetchone()[0]
except Exception:
# semantic_chunks table doesn't exist or is empty
chunk_count = 0
if chunk_count == 0:
if verbose and not json_mode:
console.print(f" [dim]Skipping {index_name}: no chunks found[/dim]")
progress.advance(db_task)
store.close()
continue
# Check if embeddings already exist (unless force)
if not force:
cursor = conn.execute(
"SELECT COUNT(*) FROM semantic_chunks WHERE embedding_binary IS NOT NULL"
)
existing_count = cursor.fetchone()[0]
if existing_count > 0:
if verbose and not json_mode:
console.print(f" [dim]Skipping {index_name}: embeddings exist (use --force to regenerate)[/dim]")
progress.advance(db_task)
store.close()
continue
# If force, clear existing cascade embeddings
if force:
conn.execute(
"UPDATE semantic_chunks SET embedding_binary = NULL, embedding_dense = NULL"
)
conn.commit()
# Get all chunks
cursor = conn.execute("SELECT id, content FROM semantic_chunks")
chunks = cursor.fetchall()
if not chunks:
progress.advance(db_task)
store.close()
continue
if verbose and not json_mode:
console.print(f" Processing {index_name}: {len(chunks)} chunks")
# Process in batches
chunk_task = progress.add_task(
f" {index_name}", total=len(chunks)
)
# Create the binary ANN index at its dedicated path (_index_binary.bin)
binary_index_path = get_binary_index_path(db_path)
binary_ann_index = BinaryANNIndex(binary_index_path, dim=256)
for i in range(0, len(chunks), batch_size):
batch_chunks = chunks[i:i + batch_size]
batch_ids = [c[0] for c in batch_chunks]
batch_contents = [c[1] for c in batch_chunks]
# Generate cascade embeddings
binary_embeddings, dense_embeddings = cascade_backend.encode_cascade(
batch_contents, batch_size=batch_size
)
# Pack binary embeddings and serialize dense embeddings to bytes
import numpy as np  # local import keeps numpy optional at module load
packed_binaries = []
dense_bytes_list = []
for j in range(len(batch_ids)):
# Pack binary embedding (256 bits -> 32 bytes)
packed_binaries.append(pack_binary_embedding(binary_embeddings[j]))
# Serialize dense embedding as raw float32 bytes;
# np.frombuffer(blob, dtype=np.float32) recovers the vector
dense_bytes_list.append(dense_embeddings[j].astype(np.float32).tobytes())
# Update database
for j, chunk_id in enumerate(batch_ids):
conn.execute(
"""
UPDATE semantic_chunks
SET embedding_binary = ?, embedding_dense = ?
WHERE id = ?
""",
(packed_binaries[j], dense_bytes_list[j], chunk_id)
)
# Add to binary ANN index
binary_ann_index.add_vectors(batch_ids, packed_binaries)
conn.commit()
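# Per-batch commits keep transactions small, so partial progress
# survives an interruption; a single commit per database would be
# faster but all-or-nothing.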
total_chunks_processed += len(batch_ids)
progress.advance(chunk_task, len(batch_ids))
# Save binary ANN index
binary_ann_index.save()
total_binary_indexes_created += 1
progress.remove_task(chunk_task)
store.close()
total_indexes_successful += 1
except Exception as e:
error_msg = f"{index_name}: {e}"
errors_list.append(error_msg)
if verbose and not json_mode:
console.print(f" [red]Error processing {index_name}:[/red] {e}")
progress.advance(db_task)
# Build result
result = {
"path": str(target_path),
"indexes_processed": total_indexes_processed,
"indexes_successful": total_indexes_successful,
"chunks_processed": total_chunks_processed,
"binary_indexes_created": total_binary_indexes_created,
"errors": len(errors_list),
"error_details": errors_list[:5] if errors_list else [],
}
if json_mode:
print_json(success=True, result=result)
else:
console.print(f"\n[green]Cascade indexing complete[/green]")
console.print(f" Indexes processed: {total_indexes_processed}")
console.print(f" Indexes successful: {total_indexes_successful}")
console.print(f" Chunks processed: {total_chunks_processed:,}")
console.print(f" Binary indexes created: {total_binary_indexes_created}")
if errors_list:
console.print(f" [yellow]Errors: {len(errors_list)}[/yellow]")
for err in errors_list[:3]:
console.print(f" [dim]{err}[/dim]")
if len(errors_list) > 3:
console.print(f" [dim]... and {len(errors_list) - 3} more[/dim]")


@@ -265,8 +265,8 @@ class DenseEmbeddingBackend(BaseEmbedder):
Model: BAAI/bge-large-en-v1.5 (1024 dim) with optional expansion
"""
DEFAULT_MODEL = "BAAI/bge-large-en-v1.5" # 1024 dim, high quality
TARGET_DIM = 2048
DEFAULT_MODEL = "BAAI/bge-small-en-v1.5" # 384 dim, use small for testing
TARGET_DIM = 768 # Reduced target for faster testing
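# Note: at 768 dims a float32 vector is 3 KB (768 * 4 bytes) versus 8 KB at
# the production 2048 dims; the cascade-index docstring's "8KB/vector"
# figure assumes the 2048-dim configuration.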
def __init__(
self,