feat: Implement centralized storage for SPLADE and vector embeddings

- Added centralized SPLADE database and vector storage configuration in config.py.
- Updated embedding_manager.py to support centralized SPLADE database path.
- Enhanced generate_embeddings and generate_embeddings_recursive functions for centralized storage.
- Introduced centralized ANN index creation in ann_index.py.
- Modified hybrid_search.py to utilize centralized vector index for searches.
- Implemented methods to discover and manage centralized SPLADE and HNSW files.
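The resulting on-disk layout, as sketched in the generate_dense_embeddings_centralized docstring below (directory names illustrative):

<index_root>/
|-- _splade.db       # centralized sparse (SPLADE) index
|-- _vectors.hnsw    # centralized dense vector ANN index
|-- src/
|   |-- _index.db    # per-directory index; no embedded vectors or .hnsw file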
catlog22
2026-01-02 16:53:39 +08:00
parent 54fb7afdb2
commit 9157c5c78b
5 changed files with 1051 additions and 9 deletions

View File

@@ -6,6 +6,7 @@ import json
import logging
import os
import shutil
import sqlite3
from pathlib import Path
from typing import Annotated, Any, Dict, Iterable, List, Optional
@@ -2514,7 +2515,8 @@ def splade_index_command(
console.print(f"[blue]Discovered {len(all_index_dbs)} index databases[/blue]") console.print(f"[blue]Discovered {len(all_index_dbs)} index databases[/blue]")
# SPLADE index is stored alongside the root _index.db # SPLADE index is stored alongside the root _index.db
splade_db = index_root / "_splade.db" from codexlens.config import SPLADE_DB_NAME
splade_db = index_root / SPLADE_DB_NAME
if splade_db.exists() and not rebuild: if splade_db.exists() and not rebuild:
console.print("[yellow]SPLADE index exists. Use --rebuild to regenerate.[/yellow]") console.print("[yellow]SPLADE index exists. Use --rebuild to regenerate.[/yellow]")
@@ -2626,15 +2628,16 @@ def splade_status_command(
from codexlens.storage.splade_index import SpladeIndex
from codexlens.semantic.splade_encoder import check_splade_available
from codexlens.config import SPLADE_DB_NAME
# Find index database
target_path = path.expanduser().resolve()
if target_path.is_file() and target_path.name == "_index.db":
-splade_db = target_path.parent / "_splade.db"
+splade_db = target_path.parent / SPLADE_DB_NAME
elif target_path.is_dir():
# Check for local .codexlens/_splade.db
-local_splade = target_path / ".codexlens" / "_splade.db"
+local_splade = target_path / ".codexlens" / SPLADE_DB_NAME
if local_splade.exists():
splade_db = local_splade
else:
@@ -2644,7 +2647,7 @@ def splade_status_command(
registry.initialize()
mapper = PathMapper()
index_db = mapper.source_to_index_db(target_path)
-splade_db = index_db.parent / "_splade.db"
+splade_db = index_db.parent / SPLADE_DB_NAME
finally:
registry.close()
else:
@@ -3084,3 +3087,387 @@ def cascade_index(
console.print(f" [dim]{err}[/dim]") console.print(f" [dim]{err}[/dim]")
if len(errors_list) > 3: if len(errors_list) > 3:
console.print(f" [dim]... and {len(errors_list) - 3} more[/dim]") console.print(f" [dim]... and {len(errors_list) - 3} more[/dim]")
# ==================== Index Migration Commands ====================
# Index version for migration tracking (file-based version marker)
INDEX_FORMAT_VERSION = "2.0"
INDEX_VERSION_FILE = "_index_version.txt"
def _get_index_version(index_root: Path) -> Optional[str]:
"""Read index format version from version marker file.
Args:
index_root: Root directory of the index
Returns:
Version string if file exists, None otherwise
"""
version_file = index_root / INDEX_VERSION_FILE
if version_file.exists():
try:
return version_file.read_text(encoding="utf-8").strip()
except Exception:
return None
return None
def _set_index_version(index_root: Path, version: str) -> None:
"""Write index format version to version marker file.
Args:
index_root: Root directory of the index
version: Version string to write
"""
version_file = index_root / INDEX_VERSION_FILE
version_file.write_text(version, encoding="utf-8")
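A minimal round-trip sketch of the version marker (the index_root value is hypothetical):

from pathlib import Path

index_root = Path.home() / ".codexlens" / "abc123"  # hypothetical index root
_set_index_version(index_root, INDEX_FORMAT_VERSION)
assert _get_index_version(index_root) == "2.0"
# A missing or unreadable marker reads back as None, which index-migrate
# treats as "needs migration".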
def _discover_distributed_splade(index_root: Path) -> List[Dict[str, Any]]:
"""Discover distributed SPLADE data in _index.db files.
Scans all _index.db files for embedded splade_postings tables.
This is the old distributed format that needs migration.
Args:
index_root: Root directory to scan
Returns:
List of dicts with db_path, posting_count, chunk_count
"""
results = []
for db_path in index_root.rglob("_index.db"):
try:
conn = sqlite3.connect(db_path, timeout=5.0)
conn.row_factory = sqlite3.Row
# Check if splade_postings table exists (old embedded format)
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='splade_postings'"
)
if cursor.fetchone():
# Count postings and chunks
try:
row = conn.execute(
"SELECT COUNT(*) as postings, COUNT(DISTINCT chunk_id) as chunks FROM splade_postings"
).fetchone()
results.append({
"db_path": db_path,
"posting_count": row["postings"] if row else 0,
"chunk_count": row["chunks"] if row else 0,
})
except Exception:
pass
conn.close()
except Exception:
pass
return results
def _discover_distributed_hnsw(index_root: Path) -> List[Dict[str, Any]]:
"""Discover distributed HNSW index files.
Scans for .hnsw files that are stored alongside _index.db files.
This is the old distributed format that needs migration.
Args:
index_root: Root directory to scan
Returns:
List of dicts with hnsw_path, size_bytes
"""
results = []
for hnsw_path in index_root.rglob("*.hnsw"):
try:
size = hnsw_path.stat().st_size
results.append({
"hnsw_path": hnsw_path,
"size_bytes": size,
})
except Exception:
pass
return results
def _check_centralized_storage(index_root: Path) -> Dict[str, Any]:
"""Check for centralized storage files.
Args:
index_root: Root directory to check
Returns:
Dict with has_splade, has_vectors, splade_stats, vector_stats
"""
from codexlens.config import SPLADE_DB_NAME, VECTORS_HNSW_NAME
splade_db = index_root / SPLADE_DB_NAME
vectors_hnsw = index_root / VECTORS_HNSW_NAME
result = {
"has_splade": splade_db.exists(),
"has_vectors": vectors_hnsw.exists(),
"splade_path": str(splade_db) if splade_db.exists() else None,
"vectors_path": str(vectors_hnsw) if vectors_hnsw.exists() else None,
"splade_stats": None,
"vector_stats": None,
}
# Get SPLADE stats if exists
if splade_db.exists():
try:
from codexlens.storage.splade_index import SpladeIndex
splade = SpladeIndex(splade_db)
if splade.has_index():
result["splade_stats"] = splade.get_stats()
splade.close()
except Exception:
pass
# Get vector stats if exists
if vectors_hnsw.exists():
try:
result["vector_stats"] = {
"size_bytes": vectors_hnsw.stat().st_size,
}
except Exception:
pass
return result
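For reference, a sketch of the dict returned for a fully migrated index (values illustrative):

{
    "has_splade": True,
    "has_vectors": True,
    "splade_path": "/home/user/.codexlens/abc123/_splade.db",
    "vectors_path": "/home/user/.codexlens/abc123/_vectors.hnsw",
    "splade_stats": {"unique_chunks": 1200, "total_postings": 85000},
    "vector_stats": {"size_bytes": 12582912},
}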
@app.command(name="index-migrate")
def index_migrate(
path: Annotated[Optional[str], typer.Argument(help="Project path to migrate")] = None,
dry_run: Annotated[bool, typer.Option("--dry-run", help="Show what would be migrated without making changes")] = False,
force: Annotated[bool, typer.Option("--force", help="Force migration even if already migrated")] = False,
json_mode: Annotated[bool, typer.Option("--json", help="Output JSON response")] = False,
verbose: Annotated[bool, typer.Option("--verbose", "-v", help="Enable verbose output")] = False,
) -> None:
"""Migrate old distributed index to new centralized architecture.
This command upgrades indexes from the old distributed storage format
(where SPLADE/vectors were stored in each _index.db) to the new centralized
format (single _splade.db and _vectors.hnsw at index root).
Migration Steps:
1. Detect if migration is needed (check version marker)
2. Discover distributed SPLADE data in _index.db files
3. Discover distributed .hnsw files
4. Report current status
5. Create version marker (unless --dry-run)
Use --dry-run to preview what would be migrated without making changes.
Use --force to re-run migration even if version marker exists.
Note: For full data migration (SPLADE/vectors consolidation), run:
codexlens splade-index <path> --rebuild
codexlens embeddings-generate <path> --recursive --force
Examples:
codexlens index-migrate ~/projects/my-app --dry-run
codexlens index-migrate . --force
codexlens index-migrate --json
"""
_configure_logging(verbose, json_mode)
# Resolve target path
if path:
target_path = Path(path).expanduser().resolve()
else:
target_path = Path.cwd()
if not target_path.exists():
if json_mode:
print_json(success=False, error=f"Path does not exist: {target_path}")
else:
console.print(f"[red]Error:[/red] Path does not exist: {target_path}")
raise typer.Exit(code=1)
# Find index root
registry: RegistryStore | None = None
index_root: Optional[Path] = None
try:
registry = RegistryStore()
registry.initialize()
mapper = PathMapper()
# Check if path is a project with an index
project_info = registry.get_project(target_path)
if project_info:
index_root = Path(project_info.index_root)
else:
# Try to find index via mapper
index_db = mapper.source_to_index_db(target_path)
if index_db.exists():
index_root = index_db.parent
finally:
if registry:
registry.close()
if not index_root or not index_root.exists():
if json_mode:
print_json(success=False, error=f"No index found for: {target_path}")
else:
console.print(f"[red]Error:[/red] No index found for: {target_path}")
console.print("[dim]Run 'codexlens init' first to create an index.[/dim]")
raise typer.Exit(code=1)
if not json_mode:
console.print(f"[bold]Index Migration Check[/bold]")
console.print(f"Source path: [dim]{target_path}[/dim]")
console.print(f"Index root: [dim]{index_root}[/dim]")
if dry_run:
console.print("[yellow]Mode: DRY RUN (no changes will be made)[/yellow]")
console.print()
# Check current version
current_version = _get_index_version(index_root)
needs_migration = force or current_version is None or current_version != INDEX_FORMAT_VERSION
if current_version == INDEX_FORMAT_VERSION and not force:
result = {
"path": str(target_path),
"index_root": str(index_root),
"current_version": current_version,
"target_version": INDEX_FORMAT_VERSION,
"needs_migration": False,
"message": "Index is already at the latest version",
}
if json_mode:
print_json(success=True, result=result)
else:
console.print(f"[green]OK[/green] Index is already at version {current_version}")
console.print("[dim]No migration needed. Use --force to re-run migration.[/dim]")
return
# Discover distributed data
distributed_splade = _discover_distributed_splade(index_root)
distributed_hnsw = _discover_distributed_hnsw(index_root)
centralized = _check_centralized_storage(index_root)
# Count all _index.db files
all_index_dbs = list(index_root.rglob("_index.db"))
# Build migration report
migration_report = {
"path": str(target_path),
"index_root": str(index_root),
"dry_run": dry_run,
"current_version": current_version,
"target_version": INDEX_FORMAT_VERSION,
"needs_migration": needs_migration,
"discovery": {
"total_index_dbs": len(all_index_dbs),
"distributed_splade_count": len(distributed_splade),
"distributed_splade_total_postings": sum(d["posting_count"] for d in distributed_splade),
"distributed_hnsw_count": len(distributed_hnsw),
"distributed_hnsw_total_bytes": sum(d["size_bytes"] for d in distributed_hnsw),
},
"centralized": centralized,
"recommendations": [],
}
# Generate recommendations
if distributed_splade and not centralized["has_splade"]:
migration_report["recommendations"].append(
f"Run 'codexlens splade-index {target_path} --rebuild' to consolidate SPLADE data"
)
if distributed_hnsw and not centralized["has_vectors"]:
migration_report["recommendations"].append(
f"Run 'codexlens embeddings-generate {target_path} --recursive --force' to consolidate vector data"
)
if not distributed_splade and not distributed_hnsw:
migration_report["recommendations"].append(
"No distributed data found. Index may already be using centralized storage."
)
if json_mode:
# Perform migration action (set version marker) unless dry-run
if not dry_run and needs_migration:
_set_index_version(index_root, INDEX_FORMAT_VERSION)
migration_report["migrated"] = True
migration_report["new_version"] = INDEX_FORMAT_VERSION
else:
migration_report["migrated"] = False
print_json(success=True, result=migration_report)
else:
# Display discovery results
console.print("[bold]Discovery Results:[/bold]")
console.print(f" Total _index.db files: {len(all_index_dbs)}")
console.print()
# Distributed SPLADE
console.print("[bold]Distributed SPLADE Data:[/bold]")
if distributed_splade:
total_postings = sum(d["posting_count"] for d in distributed_splade)
total_chunks = sum(d["chunk_count"] for d in distributed_splade)
console.print(f" Found in {len(distributed_splade)} _index.db files")
console.print(f" Total postings: {total_postings:,}")
console.print(f" Total chunks: {total_chunks:,}")
if verbose:
for d in distributed_splade[:5]:
console.print(f" [dim]{d['db_path'].parent.name}: {d['posting_count']} postings[/dim]")
if len(distributed_splade) > 5:
console.print(f" [dim]... and {len(distributed_splade) - 5} more[/dim]")
else:
console.print(" [dim]None found (already centralized or not generated)[/dim]")
console.print()
# Distributed HNSW
console.print("[bold]Distributed HNSW Files:[/bold]")
if distributed_hnsw:
total_size = sum(d["size_bytes"] for d in distributed_hnsw)
console.print(f" Found {len(distributed_hnsw)} .hnsw files")
console.print(f" Total size: {total_size / (1024 * 1024):.1f} MB")
if verbose:
for d in distributed_hnsw[:5]:
console.print(f" [dim]{d['hnsw_path'].name}: {d['size_bytes'] / 1024:.1f} KB[/dim]")
if len(distributed_hnsw) > 5:
console.print(f" [dim]... and {len(distributed_hnsw) - 5} more[/dim]")
else:
console.print(" [dim]None found (already centralized or not generated)[/dim]")
console.print()
# Centralized storage status
console.print("[bold]Centralized Storage:[/bold]")
if centralized["has_splade"]:
stats = centralized.get("splade_stats") or {}
console.print(f" [green]OK[/green] _splade.db exists")
if stats:
console.print(f" Chunks: {stats.get('unique_chunks', 0):,}")
console.print(f" Postings: {stats.get('total_postings', 0):,}")
else:
console.print(f" [yellow]--[/yellow] _splade.db not found")
if centralized["has_vectors"]:
stats = centralized.get("vector_stats") or {}
size_mb = stats.get("size_bytes", 0) / (1024 * 1024)
console.print(f" [green]OK[/green] _vectors.hnsw exists ({size_mb:.1f} MB)")
else:
console.print(f" [yellow]--[/yellow] _vectors.hnsw not found")
console.print()
# Migration action
if not dry_run and needs_migration:
_set_index_version(index_root, INDEX_FORMAT_VERSION)
console.print(f"[green]OK[/green] Version marker created: {INDEX_FORMAT_VERSION}")
elif dry_run:
console.print(f"[yellow]DRY RUN:[/yellow] Would create version marker: {INDEX_FORMAT_VERSION}")
# Recommendations
if migration_report["recommendations"]:
console.print("\n[bold]Recommendations:[/bold]")
for rec in migration_report["recommendations"]:
console.print(f" [cyan]>[/cyan] {rec}")

View File: embedding_manager.py

@@ -310,6 +310,7 @@ def generate_embeddings(
endpoints: Optional[List] = None,
strategy: Optional[str] = None,
cooldown: Optional[float] = None,
splade_db_path: Optional[Path] = None,
) -> Dict[str, any]:
"""Generate embeddings for an index using memory-efficient batch processing.
@@ -339,6 +340,9 @@ def generate_embeddings(
Each dict has keys: model, api_key, api_base, weight.
strategy: Selection strategy for multi-endpoint mode (round_robin, latency_aware).
cooldown: Default cooldown seconds for rate-limited endpoints.
splade_db_path: Optional path to centralized SPLADE database. If None, SPLADE
    is written to index_path (legacy behavior). Use index_root / SPLADE_DB_NAME
    for centralized storage.
Returns:
Result dictionary with generation statistics
@@ -723,7 +727,7 @@ def generate_embeddings(
splade_error = None
try:
-from codexlens.config import Config
+from codexlens.config import Config, SPLADE_DB_NAME
config = Config.load()
if config.enable_splade:
@@ -737,8 +741,9 @@ def generate_embeddings(
# Initialize SPLADE encoder and index
splade_encoder = get_splade_encoder(use_gpu=use_gpu)
-# Use main index database for SPLADE (not separate _splade.db)
-splade_index = SpladeIndex(index_path)
+# Use centralized SPLADE database if provided, otherwise fall back to index_path
+effective_splade_path = splade_db_path if splade_db_path else index_path
+splade_index = SpladeIndex(effective_splade_path)
splade_index.create_tables()
# Retrieve all chunks from database for SPLADE encoding
@@ -953,6 +958,10 @@ def generate_embeddings_recursive(
if progress_callback:
progress_callback(f"Found {len(index_files)} index databases to process")
# Calculate centralized SPLADE database path
from codexlens.config import SPLADE_DB_NAME
splade_db_path = index_root / SPLADE_DB_NAME
# Process each index database
all_results = []
total_chunks = 0
@@ -982,6 +991,7 @@ def generate_embeddings_recursive(
endpoints=endpoints,
strategy=strategy,
cooldown=cooldown,
splade_db_path=splade_db_path,  # Use centralized SPLADE storage
)
all_results.append({
@@ -1023,6 +1033,279 @@ def generate_embeddings_recursive(
}
def generate_dense_embeddings_centralized(
index_root: Path,
embedding_backend: Optional[str] = None,
model_profile: Optional[str] = None,
force: bool = False,
chunk_size: int = 2000,
overlap: int = 200,
progress_callback: Optional[callable] = None,
use_gpu: Optional[bool] = None,
max_tokens_per_batch: Optional[int] = None,
max_workers: Optional[int] = None,
endpoints: Optional[List] = None,
strategy: Optional[str] = None,
cooldown: Optional[float] = None,
) -> Dict[str, any]:
"""Generate dense embeddings with centralized vector storage.
This function creates a single HNSW index at the project root instead of
per-directory indexes. All chunks from all _index.db files are combined
into one central _vectors.hnsw file.
Target architecture:
<index_root>/
|-- _vectors.hnsw # Centralized dense vector ANN index
|-- _splade.db # Centralized sparse vector index
|-- src/
|   |-- _index.db # No longer contains .hnsw file
Args:
index_root: Root index directory containing _index.db files
embedding_backend: Embedding backend (fastembed or litellm)
model_profile: Model profile or name
force: If True, regenerate even if embeddings exist
chunk_size: Maximum chunk size in characters
overlap: Overlap size in characters
progress_callback: Optional callback for progress updates
use_gpu: Whether to use GPU acceleration
max_tokens_per_batch: Maximum tokens per batch
max_workers: Maximum concurrent workers
endpoints: Multi-endpoint configurations
strategy: Endpoint selection strategy
cooldown: Rate-limit cooldown seconds
Returns:
Result dictionary with generation statistics
"""
from codexlens.config import VECTORS_HNSW_NAME, SPLADE_DB_NAME
# Get defaults from config if not specified
(default_backend, default_model, default_gpu,
default_endpoints, default_strategy, default_cooldown) = _get_embedding_defaults()
if embedding_backend is None:
embedding_backend = default_backend
if model_profile is None:
model_profile = default_model
if use_gpu is None:
use_gpu = default_gpu
if endpoints is None:
endpoints = default_endpoints
if strategy is None:
strategy = default_strategy
if cooldown is None:
cooldown = default_cooldown
# Calculate endpoint count for worker scaling
endpoint_count = len(endpoints) if endpoints else 1
if max_workers is None:
if embedding_backend == "litellm":
if endpoint_count > 1:
max_workers = endpoint_count * 2
else:
max_workers = 4
else:
max_workers = 1
backend_available, backend_error = is_embedding_backend_available(embedding_backend)
if not backend_available:
return {"success": False, "error": backend_error or "Embedding backend not available"}
# Discover all _index.db files
index_files = discover_all_index_dbs(index_root)
if not index_files:
return {
"success": False,
"error": f"No index databases found in {index_root}",
}
if progress_callback:
progress_callback(f"Found {len(index_files)} index databases for centralized embedding")
# Check for existing centralized index
central_hnsw_path = index_root / VECTORS_HNSW_NAME
if central_hnsw_path.exists() and not force:
return {
"success": False,
"error": f"Centralized vector index already exists at {central_hnsw_path}. Use --force to regenerate.",
}
# Initialize embedder
try:
from codexlens.semantic.factory import get_embedder as get_embedder_factory
from codexlens.semantic.chunker import Chunker, ChunkConfig
from codexlens.semantic.ann_index import ANNIndex
if embedding_backend == "fastembed":
embedder = get_embedder_factory(backend="fastembed", profile=model_profile, use_gpu=use_gpu)
elif embedding_backend == "litellm":
embedder = get_embedder_factory(
backend="litellm",
model=model_profile,
endpoints=endpoints if endpoints else None,
strategy=strategy,
cooldown=cooldown,
)
else:
return {
"success": False,
"error": f"Invalid embedding backend: {embedding_backend}",
}
chunker = Chunker(config=ChunkConfig(
max_chunk_size=chunk_size,
overlap=overlap,
skip_token_count=True
))
if progress_callback:
if endpoint_count > 1:
progress_callback(f"Using {endpoint_count} API endpoints with {strategy} strategy")
progress_callback(f"Using model: {embedder.model_name} ({embedder.embedding_dim} dimensions)")
except Exception as e:
return {
"success": False,
"error": f"Failed to initialize components: {str(e)}",
}
# Create centralized ANN index
central_ann_index = ANNIndex.create_central(
index_root=index_root,
dim=embedder.embedding_dim,
initial_capacity=100000, # Larger capacity for centralized index
auto_save=False,
)
# Process all index databases
start_time = time.time()
failed_files = []
total_chunks_created = 0
total_files_processed = 0
all_chunk_ids = []
all_embeddings = []
# Track chunk ID to file_path mapping for metadata
chunk_id_to_info: Dict[int, Dict[str, Any]] = {}
next_chunk_id = 1
for idx, index_path in enumerate(index_files, 1):
if progress_callback:
try:
rel_path = index_path.relative_to(index_root)
except ValueError:
rel_path = index_path
progress_callback(f"Processing {idx}/{len(index_files)}: {rel_path}")
try:
with sqlite3.connect(index_path) as conn:
conn.row_factory = sqlite3.Row
path_column = _get_path_column(conn)
# Get files from this index
cursor = conn.execute(f"SELECT {path_column}, content, language FROM files")
file_rows = cursor.fetchall()
for file_row in file_rows:
file_path = file_row[path_column]
content = file_row["content"]
language = file_row["language"] or "python"
try:
chunks = chunker.chunk_sliding_window(
content,
file_path=file_path,
language=language
)
if not chunks:
continue
total_files_processed += 1
# Generate embeddings for this file's chunks
batch_contents = [chunk.content for chunk in chunks]
embeddings_numpy = embedder.embed_to_numpy(batch_contents, batch_size=EMBEDDING_BATCH_SIZE)
# Assign chunk IDs and store embeddings
for i, chunk in enumerate(chunks):
chunk_id = next_chunk_id
next_chunk_id += 1
all_chunk_ids.append(chunk_id)
all_embeddings.append(embeddings_numpy[i])
# Store metadata for later retrieval
chunk_id_to_info[chunk_id] = {
"file_path": file_path,
"content": chunk.content,
"metadata": chunk.metadata,
"category": get_file_category(file_path) or "code",
}
total_chunks_created += 1
except Exception as e:
logger.error(f"Failed to process {file_path}: {e}")
failed_files.append((file_path, str(e)))
except Exception as e:
logger.error(f"Failed to read index {index_path}: {e}")
failed_files.append((str(index_path), str(e)))
# Add all embeddings to centralized ANN index
if all_embeddings:
if progress_callback:
progress_callback(f"Building centralized ANN index with {len(all_embeddings)} vectors...")
try:
import numpy as np
embeddings_matrix = np.vstack(all_embeddings)
central_ann_index.add_vectors(all_chunk_ids, embeddings_matrix)
central_ann_index.save()
if progress_callback:
progress_callback(f"Saved centralized index to {central_hnsw_path}")
except Exception as e:
return {
"success": False,
"error": f"Failed to build centralized ANN index: {str(e)}",
}
# Reserved for future use: centralized chunk-metadata database
from codexlens.config import VECTORS_META_DB_NAME
vectors_meta_path = index_root / VECTORS_META_DB_NAME
# Note: chunk metadata is currently stored in the individual _index.db
# semantic_chunks tables, which centralized search reads directly.
# A future enhancement could consolidate metadata into _vectors_meta.db.
elapsed_time = time.time() - start_time
# Cleanup
try:
_cleanup_fastembed_resources()
gc.collect()
except Exception:
pass
return {
"success": True,
"result": {
"chunks_created": total_chunks_created,
"files_processed": total_files_processed,
"files_failed": len(failed_files),
"elapsed_time": elapsed_time,
"model_profile": model_profile,
"model_name": embedder.model_name,
"central_index_path": str(central_hnsw_path),
"failed_files": failed_files[:5],
},
}
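A hedged usage sketch (paths and option values assumed; returns the statistics dict shown above):

from pathlib import Path

index_root = Path.home() / ".codexlens" / "abc123"  # hypothetical index root
result = generate_dense_embeddings_centralized(
    index_root=index_root,
    embedding_backend="fastembed",
    force=True,  # overwrite an existing _vectors.hnsw
    progress_callback=print,
)
if result["success"]:
    print(result["result"]["central_index_path"])  # .../_vectors.hnsw
else:
    print(result["error"])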
def get_embeddings_status(index_root: Path) -> Dict[str, any]:
"""Get comprehensive embeddings coverage status for all indexes.

View File: config.py

@@ -19,6 +19,13 @@ WORKSPACE_DIR_NAME = ".codexlens"
# Settings file name
SETTINGS_FILE_NAME = "settings.json"
# SPLADE index database name (centralized storage)
SPLADE_DB_NAME = "_splade.db"
# Dense vector storage names (centralized storage)
VECTORS_HNSW_NAME = "_vectors.hnsw"
VECTORS_META_DB_NAME = "_vectors_meta.db"
log = logging.getLogger(__name__)
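A minimal sketch of how callers derive the centralized paths from these constants (index_root is hypothetical):

from pathlib import Path
from codexlens.config import SPLADE_DB_NAME, VECTORS_HNSW_NAME, VECTORS_META_DB_NAME

index_root = Path.home() / ".codexlens" / "abc123"
splade_db = index_root / SPLADE_DB_NAME           # .../_splade.db
vectors_hnsw = index_root / VECTORS_HNSW_NAME     # .../_vectors.hnsw
vectors_meta = index_root / VECTORS_META_DB_NAME  # .../_vectors_meta.db (reserved)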

View File: hybrid_search.py

@@ -31,6 +31,7 @@ def timer(name: str, logger: logging.Logger, level: int = logging.DEBUG):
logger.log(level, "[TIMING] %s: %.2fms", name, elapsed_ms)
from codexlens.config import Config
from codexlens.config import VECTORS_HNSW_NAME
from codexlens.entities import SearchResult
from codexlens.search.ranking import (
DEFAULT_WEIGHTS,
@@ -517,11 +518,275 @@ class HybridSearchEngine:
self.logger.debug("Fuzzy search error: %s", exc) self.logger.debug("Fuzzy search error: %s", exc)
return [] return []
def _find_vectors_hnsw(self, index_path: Path) -> Optional[Path]:
"""Find the centralized _vectors.hnsw file by traversing up from index_path.
Similar to _search_splade's approach, this method searches for the
centralized dense vector index file in parent directories.
Args:
index_path: Path to the current _index.db file
Returns:
Path to _vectors.hnsw if found, None otherwise
"""
current_dir = index_path.parent
for _ in range(10): # Limit search depth
candidate = current_dir / VECTORS_HNSW_NAME
if candidate.exists():
return candidate
parent = current_dir.parent
if parent == current_dir: # Reached root
break
current_dir = parent
return None
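A worked example of the upward traversal, assuming a three-level layout:

# index_path = <root>/src/pkg/_index.db checks, in order:
#   <root>/src/pkg/_vectors.hnsw
#   <root>/src/_vectors.hnsw
#   <root>/_vectors.hnsw   <- centralized file found here
# The walk stops after 10 levels or at the filesystem root.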
def _search_vector_centralized(
self,
index_path: Path,
hnsw_path: Path,
query: str,
limit: int,
category: Optional[str] = None,
) -> List[SearchResult]:
"""Search using centralized vector index.
Args:
index_path: Path to _index.db file (for metadata lookup)
hnsw_path: Path to centralized _vectors.hnsw file
query: Natural language query string
limit: Maximum results
category: Optional category filter ('code' or 'doc')
Returns:
List of SearchResult objects ordered by semantic similarity
"""
try:
import sqlite3
import json
from codexlens.semantic.factory import get_embedder
from codexlens.semantic.ann_index import ANNIndex
# Get model config from the first index database we can find
# (all indexes should use the same embedding model)
index_root = hnsw_path.parent
model_config = None
# Try to get model config from the provided index_path first
try:
from codexlens.semantic.vector_store import VectorStore
with VectorStore(index_path) as vs:
model_config = vs.get_model_config()
except Exception:
pass
# If no model config is found, fall back to the default embedder below and
# use its embedding dimension when loading the centralized index.
if model_config is None:
self.logger.debug("Model config not found, falling back to default embedder dimension")
# Get embedder based on model config or default
if model_config:
backend = model_config.get("backend", "fastembed")
model_name = model_config["model_name"]
model_profile = model_config["model_profile"]
embedding_dim = model_config["embedding_dim"]
if backend == "litellm":
embedder = get_embedder(backend="litellm", model=model_name)
else:
embedder = get_embedder(backend="fastembed", profile=model_profile)
else:
# Default to code profile
embedder = get_embedder(backend="fastembed", profile="code")
embedding_dim = embedder.embedding_dim
# Load centralized ANN index
start_load = time.perf_counter()
ann_index = ANNIndex.create_central(
index_root=index_root,
dim=embedding_dim,
)
if not ann_index.load():
self.logger.warning("Failed to load centralized vector index from %s", hnsw_path)
return []
self.logger.debug(
"[TIMING] central_ann_load: %.2fms (%d vectors)",
(time.perf_counter() - start_load) * 1000,
ann_index.count()
)
# Generate query embedding
start_embed = time.perf_counter()
query_embedding = embedder.embed_single(query)
self.logger.debug(
"[TIMING] query_embedding: %.2fms",
(time.perf_counter() - start_embed) * 1000
)
# Search ANN index
start_search = time.perf_counter()
import numpy as np
query_vec = np.array(query_embedding, dtype=np.float32)
ids, distances = ann_index.search(query_vec, top_k=limit * 2) # Fetch extra for filtering
self.logger.debug(
"[TIMING] central_ann_search: %.2fms (%d results)",
(time.perf_counter() - start_search) * 1000,
len(ids) if ids else 0
)
if not ids:
return []
# Convert distances to similarity scores (for cosine: score = 1 - distance)
scores = [1.0 - d for d in distances]
# Fetch chunk metadata from semantic_chunks tables
# We need to search across all _index.db files in the project
results = self._fetch_chunks_by_ids_centralized(
index_root, ids, scores, category
)
return results[:limit]
except ImportError as exc:
self.logger.debug("Semantic dependencies not available: %s", exc)
return []
except Exception as exc:
self.logger.error("Centralized vector search error: %s", exc)
return []
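A note on the score conversion above: hnswlib's cosine space reports distance d = 1 - cosine_similarity, so score = 1.0 - d recovers the raw cosine similarity for ranking.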
def _fetch_chunks_by_ids_centralized(
self,
index_root: Path,
chunk_ids: List[int],
scores: List[float],
category: Optional[str] = None,
) -> List[SearchResult]:
"""Fetch chunk metadata from all _index.db files for centralized search.
Args:
index_root: Root directory containing _index.db files
chunk_ids: List of chunk IDs from ANN search
scores: Corresponding similarity scores
category: Optional category filter
Returns:
List of SearchResult objects
"""
import sqlite3
import json
# Build score map
score_map = {cid: score for cid, score in zip(chunk_ids, scores)}
# Find all _index.db files
index_files = list(index_root.rglob("_index.db"))
results = []
found_ids = set()
for index_path in index_files:
try:
with sqlite3.connect(index_path) as conn:
conn.row_factory = sqlite3.Row
# Check if semantic_chunks table exists
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='semantic_chunks'"
)
if cursor.fetchone() is None:
continue
# Build query for chunk IDs we haven't found yet
remaining_ids = [cid for cid in chunk_ids if cid not in found_ids]
if not remaining_ids:
break
placeholders = ",".join("?" * len(remaining_ids))
if category:
query = f"""
SELECT id, file_path, content, metadata
FROM semantic_chunks
WHERE id IN ({placeholders}) AND category = ?
"""
params = remaining_ids + [category]
else:
query = f"""
SELECT id, file_path, content, metadata
FROM semantic_chunks
WHERE id IN ({placeholders})
"""
params = remaining_ids
rows = conn.execute(query, params).fetchall()
for row in rows:
chunk_id = row["id"]
if chunk_id in found_ids:
continue
found_ids.add(chunk_id)
file_path = row["file_path"]
content = row["content"]
metadata_json = row["metadata"]
metadata = json.loads(metadata_json) if metadata_json else {}
score = score_map.get(chunk_id, 0.0)
# Build excerpt
excerpt = content[:200] + "..." if len(content) > 200 else content
# Extract symbol information
symbol_name = metadata.get("symbol_name")
symbol_kind = metadata.get("symbol_kind")
start_line = metadata.get("start_line")
end_line = metadata.get("end_line")
# Build Symbol object if available
symbol = None
if symbol_name and symbol_kind and start_line and end_line:
try:
from codexlens.entities import Symbol
symbol = Symbol(
name=symbol_name,
kind=symbol_kind,
range=(start_line, end_line)
)
except Exception:
pass
results.append(SearchResult(
path=file_path,
score=score,
excerpt=excerpt,
content=content,
symbol=symbol,
metadata=metadata,
start_line=start_line,
end_line=end_line,
symbol_name=symbol_name,
symbol_kind=symbol_kind,
))
except Exception as e:
self.logger.debug("Failed to fetch chunks from %s: %s", index_path, e)
continue
# Sort by score descending
results.sort(key=lambda r: r.score, reverse=True)
return results
def _search_vector(
self, index_path: Path, query: str, limit: int, category: Optional[str] = None
) -> List[SearchResult]:
"""Execute vector similarity search using semantic embeddings.
Supports both centralized vector storage (single _vectors.hnsw at project root)
and distributed storage (per-directory .hnsw files).
Args:
index_path: Path to _index.db file
query: Natural language query string
@@ -532,6 +797,15 @@ class HybridSearchEngine:
List of SearchResult objects ordered by semantic similarity
"""
try:
# First, check for centralized vector index
central_hnsw_path = self._find_vectors_hnsw(index_path)
if central_hnsw_path is not None:
self.logger.debug("Found centralized vector index at %s", central_hnsw_path)
return self._search_vector_centralized(
index_path, central_hnsw_path, query, limit, category
)
# Fallback to distributed (per-index) vector storage
# Check if semantic chunks table exists
import sqlite3
@@ -677,6 +951,7 @@ class HybridSearchEngine:
try:
from codexlens.semantic.splade_encoder import get_splade_encoder, check_splade_available
from codexlens.storage.splade_index import SpladeIndex
from codexlens.config import SPLADE_DB_NAME
import sqlite3
import json
@@ -691,7 +966,7 @@ class HybridSearchEngine:
current_dir = index_path.parent
splade_db_path = None
for _ in range(10): # Limit search depth
-candidate = current_dir / "_splade.db"
+candidate = current_dir / SPLADE_DB_NAME
if candidate.exists():
splade_db_path = candidate
break

View File: ann_index.py

@@ -9,6 +9,7 @@ Key features:
- Incremental vector addition and deletion
- Thread-safe operations
- Cosine similarity metric
- Support for centralized storage mode (single index at project root)
"""
from __future__ import annotations
@@ -19,6 +20,7 @@ from pathlib import Path
from typing import List, Optional, Tuple
from codexlens.errors import StorageError
from codexlens.config import VECTORS_HNSW_NAME
from . import SEMANTIC_AVAILABLE
@@ -127,6 +129,94 @@ class ANNIndex:
f"auto_save={auto_save}, expansion_threshold={expansion_threshold}" f"auto_save={auto_save}, expansion_threshold={expansion_threshold}"
) )
@classmethod
def create_central(
cls,
index_root: Path,
dim: int,
initial_capacity: int = 50000,
auto_save: bool = False,
expansion_threshold: float = 0.8,
) -> "ANNIndex":
"""Create a centralized ANN index at the project index root.
This method creates a single shared HNSW index file at the project root,
rather than per-directory indexes. Use this for projects that want all
dense vectors stored in one central location.
Args:
index_root: Root directory for the index (e.g., .codexlens/<project_hash>/)
dim: Dimension of embedding vectors
initial_capacity: Initial maximum elements capacity (default: 50000)
auto_save: Whether to automatically save index after operations (default: False)
expansion_threshold: Capacity threshold to trigger auto-expansion (default: 0.8)
Returns:
ANNIndex instance configured for centralized storage
Example:
>>> index = ANNIndex.create_central(Path(".codexlens/abc123"), dim=768)
>>> index.hnsw_path # Returns: .codexlens/abc123/_vectors.hnsw
"""
# Bypass __init__ and configure the instance directly so that
# self.hnsw_path resolves to index_root / VECTORS_HNSW_NAME
instance = cls.__new__(cls)
if not SEMANTIC_AVAILABLE:
raise ImportError(
"Semantic search dependencies not available. "
"Install with: pip install codexlens[semantic]"
)
if not HNSWLIB_AVAILABLE:
raise ImportError(
"hnswlib is required for ANN index. "
"Install with: pip install hnswlib"
)
if dim <= 0:
raise ValueError(f"Invalid dimension: {dim}")
if initial_capacity <= 0:
raise ValueError(f"Invalid initial capacity: {initial_capacity}")
if not 0.0 < expansion_threshold < 1.0:
raise ValueError(
f"Invalid expansion threshold: {expansion_threshold}. Must be between 0 and 1."
)
instance.index_path = index_root
instance.dim = dim
# Centralized mode: use VECTORS_HNSW_NAME directly at index_root
instance.hnsw_path = index_root / VECTORS_HNSW_NAME
# HNSW parameters
instance.space = "cosine"
instance.M = 16
instance.ef_construction = 200
instance.ef = 50
# Memory management parameters
instance._auto_save = auto_save
instance._expansion_threshold = expansion_threshold
# Thread safety
instance._lock = threading.RLock()
# HNSW index instance
instance._index: Optional[hnswlib.Index] = None
instance._max_elements = initial_capacity
instance._current_count = 0
logger.info(
f"Initialized centralized ANNIndex at {instance.hnsw_path} with "
f"capacity={initial_capacity}, auto_save={auto_save}"
)
return instance
def _ensure_index(self) -> None:
"""Ensure HNSW index is initialized (lazy initialization)."""
if self._index is None: