Files

3269 lines
131 KiB
Python

"""Chain search engine for recursive multi-directory searching.
Provides parallel search across directory hierarchies using indexed _index.db files.
Supports depth-limited traversal, result aggregation, and symbol search.
"""
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional, Dict, Any, Literal, Tuple, TYPE_CHECKING
import json
import logging
import os
import time
from codexlens.entities import SearchResult, Symbol
if TYPE_CHECKING:
import numpy as np
try:
import numpy as np
NUMPY_AVAILABLE = True
except ImportError:
NUMPY_AVAILABLE = False
from codexlens.config import Config
from codexlens.storage.registry import RegistryStore, DirMapping
from codexlens.storage.dir_index import DirIndexStore, SubdirLink
from codexlens.storage.global_index import GlobalSymbolIndex
from codexlens.storage.path_mapper import PathMapper
from codexlens.storage.sqlite_store import SQLiteStore
from codexlens.storage.vector_meta_store import VectorMetadataStore
from codexlens.config import VECTORS_META_DB_NAME
from codexlens.search.hybrid_search import HybridSearchEngine
@dataclass
class SearchOptions:
"""Configuration options for chain search.
Attributes:
depth: Maximum search depth (-1 = unlimited, 0 = current dir only)
max_workers: Number of parallel worker threads
limit_per_dir: Maximum results per directory
total_limit: Total result limit across all directories
offset: Pagination offset - skip first N results (default 0)
include_symbols: Whether to include symbol search results
files_only: Return only file paths without excerpts
include_semantic: Whether to include semantic keyword search results
code_only: Only return code files (excludes md, txt, json, yaml, xml, etc.)
exclude_extensions: List of file extensions to exclude (e.g., ["md", "txt", "json"])
hybrid_mode: Enable hybrid search with RRF fusion (default False)
enable_fuzzy: Enable fuzzy FTS in hybrid mode (default True)
enable_vector: Enable vector semantic search (default False)
pure_vector: If True, only use vector search without FTS fallback (default False)
enable_splade: Enable SPLADE sparse neural search (default False)
enable_cascade: Enable cascade (binary+dense) two-stage retrieval (default False)
hybrid_weights: Custom RRF weights for hybrid search (optional)
group_results: Enable grouping of similar results (default False)
grouping_threshold: Score threshold for grouping similar results (default 0.01)
"""
depth: int = -1
max_workers: int = 8
limit_per_dir: int = 10
total_limit: int = 100
offset: int = 0
include_symbols: bool = False
files_only: bool = False
include_semantic: bool = False
code_only: bool = False
exclude_extensions: Optional[List[str]] = None
hybrid_mode: bool = False
enable_fuzzy: bool = True
enable_vector: bool = False
pure_vector: bool = False
enable_splade: bool = False
enable_cascade: bool = False
hybrid_weights: Optional[Dict[str, float]] = None
group_results: bool = False
grouping_threshold: float = 0.01
@dataclass
class SearchStats:
"""Statistics collected during search execution.
Attributes:
dirs_searched: Number of directories searched
files_matched: Number of files with matches
time_ms: Total search time in milliseconds
errors: List of error messages encountered
"""
dirs_searched: int = 0
files_matched: int = 0
time_ms: float = 0
errors: List[str] = field(default_factory=list)
@dataclass
class ChainSearchResult:
"""Comprehensive search result with metadata.
Attributes:
query: Original search query
results: List of SearchResult objects
related_results: Expanded results from graph neighbors (optional)
symbols: List of Symbol objects (if include_symbols=True)
stats: SearchStats with execution metrics
"""
query: str
results: List[SearchResult]
symbols: List[Symbol]
stats: SearchStats
related_results: List[SearchResult] = field(default_factory=list)
@dataclass
class ReferenceResult:
"""Result from reference search in code_relationships table.
Attributes:
file_path: Path to the file containing the reference
line: Line number where the reference occurs (1-based)
column: Column number where the reference occurs (0-based)
context: Surrounding code snippet for context
relationship_type: Type of relationship (call, import, inheritance, etc.)
"""
file_path: str
line: int
column: int
context: str
relationship_type: str
class ChainSearchEngine:
"""Parallel chain search engine for hierarchical directory indexes.
Searches across multiple directory indexes in parallel, following subdirectory
links to recursively traverse the file tree. Supports depth limits, result
aggregation, and both content and symbol searches.
Thread-safe with configurable parallelism.
Attributes:
registry: Global project registry
mapper: Path mapping utility
logger: Python logger instance
"""
def __init__(self,
registry: RegistryStore,
mapper: PathMapper,
max_workers: int = 8,
config: Config | None = None):
"""Initialize chain search engine.
Args:
registry: Global project registry for path lookups
mapper: Path mapper for source/index conversions
max_workers: Maximum parallel workers (default 8)
"""
self.registry = registry
self.mapper = mapper
self.logger = logging.getLogger(__name__)
self._max_workers = max_workers
self._executor: Optional[ThreadPoolExecutor] = None
self._config = config
def _get_executor(self, max_workers: Optional[int] = None) -> ThreadPoolExecutor:
"""Get or create the shared thread pool executor.
Lazy initialization to avoid creating executor if never used.
Args:
max_workers: Override default max_workers if specified
Returns:
ThreadPoolExecutor instance
"""
workers = max_workers or self._max_workers
if self._executor is None:
self._executor = ThreadPoolExecutor(max_workers=workers)
return self._executor
def close(self) -> None:
"""Shutdown the thread pool executor."""
if self._executor is not None:
self._executor.shutdown(wait=True)
self._executor = None
def __enter__(self) -> "ChainSearchEngine":
"""Context manager entry."""
return self
def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
"""Context manager exit."""
self.close()
def search(self, query: str,
source_path: Path,
options: Optional[SearchOptions] = None) -> ChainSearchResult:
"""Execute chain search from source_path with recursive traversal.
Process:
1. Locate starting index for source_path
2. Collect all child indexes based on depth limit
3. Search indexes in parallel using ThreadPoolExecutor
4. Aggregate, deduplicate, and rank results
Args:
query: FTS5 search query string
source_path: Starting directory path
options: Search configuration (uses defaults if None)
Returns:
ChainSearchResult with results, symbols, and statistics
Examples:
>>> engine = ChainSearchEngine(registry, mapper)
>>> result = engine.search("authentication", Path("D:/project/src"))
>>> for r in result.results[:5]:
... print(f"{r.path}: {r.score:.2f}")
"""
options = options or SearchOptions()
start_time = time.time()
stats = SearchStats()
# Step 1: Find starting index
start_index = self._find_start_index(source_path)
if not start_index:
self.logger.warning(f"No index found for {source_path}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Step 2: Collect all index paths to search
index_paths = self._collect_index_paths(start_index, options.depth)
stats.dirs_searched = len(index_paths)
if not index_paths:
self.logger.warning(f"No indexes collected from {start_index}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Step 3: Parallel search
results, search_stats = self._search_parallel(
index_paths, query, options
)
stats.errors = search_stats.errors
# Step 3.5: Filter by extension if requested
if options.code_only or options.exclude_extensions:
results = self._filter_by_extension(
results, options.code_only, options.exclude_extensions
)
# Step 4: Merge and rank
final_results = self._merge_and_rank(results, options.total_limit, options.offset)
# Step 5: Optional grouping of similar results
if options.group_results:
from codexlens.search.ranking import group_similar_results
final_results = group_similar_results(
final_results, score_threshold_abs=options.grouping_threshold
)
stats.files_matched = len(final_results)
# Optional: Symbol search
symbols = []
if options.include_symbols:
symbols = self._search_symbols_parallel(
index_paths, query, None, options.total_limit
)
# Optional: graph expansion using precomputed neighbors
related_results: List[SearchResult] = []
if self._config is not None and getattr(self._config, "enable_graph_expansion", False):
try:
from codexlens.search.enrichment import SearchEnrichmentPipeline
pipeline = SearchEnrichmentPipeline(self.mapper, config=self._config)
related_results = pipeline.expand_related_results(final_results)
except Exception as exc:
self.logger.debug("Graph expansion failed: %s", exc)
related_results = []
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=final_results,
symbols=symbols,
stats=stats,
related_results=related_results,
)
def hybrid_cascade_search(
self,
query: str,
source_path: Path,
k: int = 10,
coarse_k: int = 100,
options: Optional[SearchOptions] = None,
) -> ChainSearchResult:
"""Execute two-stage cascade search with hybrid coarse retrieval and cross-encoder reranking.
Hybrid cascade search process:
1. Stage 1 (Coarse): Fast retrieval using RRF fusion of FTS + SPLADE + Vector
to get coarse_k candidates
2. Stage 2 (Fine): CrossEncoder reranking of candidates to get final k results
This approach balances recall (from broad coarse search) with precision
(from expensive but accurate cross-encoder scoring).
Note: This method is the original hybrid approach. For binary vector cascade,
use binary_cascade_search() instead.
Args:
query: Natural language or keyword query string
source_path: Starting directory path
k: Number of final results to return (default 10)
coarse_k: Number of coarse candidates from first stage (default 100)
options: Search configuration (uses defaults if None)
Returns:
ChainSearchResult with reranked results and statistics
Examples:
>>> engine = ChainSearchEngine(registry, mapper, config=config)
>>> result = engine.hybrid_cascade_search(
... "how to authenticate users",
... Path("D:/project/src"),
... k=10,
... coarse_k=100
... )
>>> for r in result.results:
... print(f"{r.path}: {r.score:.3f}")
"""
options = options or SearchOptions()
start_time = time.time()
stats = SearchStats()
# Use config defaults if available
if self._config is not None:
if hasattr(self._config, "cascade_coarse_k"):
coarse_k = coarse_k or self._config.cascade_coarse_k
if hasattr(self._config, "cascade_fine_k"):
k = k or self._config.cascade_fine_k
# Step 1: Find starting index
start_index = self._find_start_index(source_path)
if not start_index:
self.logger.warning(f"No index found for {source_path}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Step 2: Collect all index paths
index_paths = self._collect_index_paths(start_index, options.depth)
stats.dirs_searched = len(index_paths)
if not index_paths:
self.logger.warning(f"No indexes collected from {start_index}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Stage 1: Coarse retrieval with hybrid search (FTS + SPLADE + Vector)
# Use hybrid mode for multi-signal retrieval
coarse_options = SearchOptions(
depth=options.depth,
max_workers=1, # Single thread for GPU safety
limit_per_dir=max(coarse_k // len(index_paths), 20),
total_limit=coarse_k,
hybrid_mode=True,
enable_fuzzy=options.enable_fuzzy,
enable_vector=True, # Enable vector for semantic matching
pure_vector=False,
hybrid_weights=options.hybrid_weights,
)
self.logger.debug(
"Cascade Stage 1: Coarse retrieval for %d candidates", coarse_k
)
coarse_results, search_stats = self._search_parallel(
index_paths, query, coarse_options
)
stats.errors = search_stats.errors
# Merge and deduplicate coarse results
coarse_merged = self._merge_and_rank(coarse_results, coarse_k)
self.logger.debug(
"Cascade Stage 1 complete: %d candidates retrieved", len(coarse_merged)
)
if not coarse_merged:
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Stage 2: Cross-encoder reranking
self.logger.debug(
"Cascade Stage 2: Cross-encoder reranking %d candidates to top-%d",
len(coarse_merged),
k,
)
final_results = self._cross_encoder_rerank(query, coarse_merged, k)
# Optional: grouping of similar results
if options.group_results:
from codexlens.search.ranking import group_similar_results
final_results = group_similar_results(
final_results, score_threshold_abs=options.grouping_threshold
)
stats.files_matched = len(final_results)
stats.time_ms = (time.time() - start_time) * 1000
self.logger.debug(
"Cascade search complete: %d results in %.2fms",
len(final_results),
stats.time_ms,
)
return ChainSearchResult(
query=query,
results=final_results,
symbols=[],
stats=stats,
)
def binary_cascade_search(
self,
query: str,
source_path: Path,
k: int = 10,
coarse_k: int = 100,
options: Optional[SearchOptions] = None,
) -> ChainSearchResult:
"""Execute binary cascade search with binary coarse ranking and dense fine ranking.
Binary cascade search process:
1. Stage 1 (Coarse): Fast binary vector search using Hamming distance
to quickly filter to coarse_k candidates (256-dim binary, 32 bytes/vector)
2. Stage 2 (Fine): Dense vector cosine similarity for precise reranking
of candidates (2048-dim float32)
This approach leverages the speed of binary search (~100x faster) while
maintaining precision through dense vector reranking.
Performance characteristics:
- Binary search: O(N) with SIMD-accelerated XOR + popcount
- Dense rerank: Only applied to top coarse_k candidates
- Memory: 32 bytes (binary) + 8KB (dense) per chunk
Args:
query: Natural language or keyword query string
source_path: Starting directory path
k: Number of final results to return (default 10)
coarse_k: Number of coarse candidates from first stage (default 100)
options: Search configuration (uses defaults if None)
Returns:
ChainSearchResult with reranked results and statistics
Examples:
>>> engine = ChainSearchEngine(registry, mapper, config=config)
>>> result = engine.binary_cascade_search(
... "how to authenticate users",
... Path("D:/project/src"),
... k=10,
... coarse_k=100
... )
>>> for r in result.results:
... print(f"{r.path}: {r.score:.3f}")
"""
if not NUMPY_AVAILABLE:
self.logger.warning(
"NumPy not available, falling back to hybrid cascade search"
)
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
options = options or SearchOptions()
start_time = time.time()
stats = SearchStats()
# Use config defaults if available
if self._config is not None:
if hasattr(self._config, "cascade_coarse_k"):
coarse_k = coarse_k or self._config.cascade_coarse_k
if hasattr(self._config, "cascade_fine_k"):
k = k or self._config.cascade_fine_k
# Step 1: Find starting index
start_index = self._find_start_index(source_path)
if not start_index:
self.logger.warning(f"No index found for {source_path}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Step 2: Collect all index paths
index_paths = self._collect_index_paths(start_index, options.depth)
stats.dirs_searched = len(index_paths)
if not index_paths:
self.logger.warning(f"No indexes collected from {start_index}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Initialize embedding backends
try:
from codexlens.indexing.embedding import (
BinaryEmbeddingBackend,
DenseEmbeddingBackend,
)
from codexlens.semantic.ann_index import BinaryANNIndex
except ImportError as exc:
self.logger.warning(
"Binary cascade dependencies not available: %s. "
"Falling back to hybrid cascade search.",
exc
)
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# Stage 1: Binary vector coarse retrieval
self.logger.debug(
"Binary Cascade Stage 1: Binary coarse retrieval for %d candidates",
coarse_k,
)
use_gpu = True
if self._config is not None:
use_gpu = getattr(self._config, "embedding_use_gpu", True)
try:
binary_backend = BinaryEmbeddingBackend(use_gpu=use_gpu)
query_binary_packed = binary_backend.embed_packed([query])[0]
except Exception as exc:
self.logger.warning(
"Failed to generate binary query embedding: %s. "
"Falling back to hybrid cascade search.",
exc
)
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# Try centralized BinarySearcher first (preferred for mmap indexes)
# The index root is the parent of the first index path
index_root = index_paths[0].parent if index_paths else None
all_candidates: List[Tuple[int, int, Path]] = [] # (chunk_id, distance, index_path)
used_centralized = False
if index_root:
centralized_searcher = self._get_centralized_binary_searcher(index_root)
if centralized_searcher is not None:
try:
# BinarySearcher expects dense vector, not packed binary
from codexlens.semantic.embedder import Embedder
embedder = Embedder()
query_dense = embedder.embed_to_numpy([query])[0]
# Centralized search - returns (chunk_id, hamming_distance) tuples
results = centralized_searcher.search(query_dense, top_k=coarse_k)
for chunk_id, dist in results:
all_candidates.append((chunk_id, dist, index_root))
used_centralized = True
self.logger.debug(
"Centralized binary search found %d candidates", len(results)
)
except Exception as exc:
self.logger.debug(
"Centralized binary search failed: %s, falling back to per-directory",
exc
)
centralized_searcher.clear()
# Fallback: Search per-directory indexes with legacy BinaryANNIndex
if not used_centralized:
for index_path in index_paths:
try:
# Get or create binary index for this path (uses deprecated BinaryANNIndex)
binary_index = self._get_or_create_binary_index(index_path)
if binary_index is None or binary_index.count() == 0:
continue
# Search binary index
ids, distances = binary_index.search(query_binary_packed, coarse_k)
for chunk_id, dist in zip(ids, distances):
all_candidates.append((chunk_id, dist, index_path))
except Exception as exc:
self.logger.debug(
"Binary search failed for %s: %s", index_path, exc
)
stats.errors.append(f"Binary search failed for {index_path}: {exc}")
if not all_candidates:
self.logger.debug("No binary candidates found, falling back to hybrid")
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# Sort by Hamming distance and take top coarse_k
all_candidates.sort(key=lambda x: x[1])
coarse_candidates = all_candidates[:coarse_k]
self.logger.debug(
"Binary Cascade Stage 1 complete: %d candidates retrieved",
len(coarse_candidates),
)
# Stage 2: Dense vector fine ranking
self.logger.debug(
"Binary Cascade Stage 2: Dense reranking %d candidates to top-%d",
len(coarse_candidates),
k,
)
try:
dense_backend = DenseEmbeddingBackend(use_gpu=use_gpu)
query_dense = dense_backend.embed_to_numpy([query])[0]
except Exception as exc:
self.logger.warning(
"Failed to generate dense query embedding: %s. "
"Using Hamming distance scores only.",
exc
)
# Fall back to using Hamming distance as score
return self._build_results_from_candidates(
coarse_candidates[:k], index_paths, stats, query, start_time,
use_centralized=used_centralized
)
# Group candidates by index path for batch retrieval
candidates_by_index: Dict[Path, List[int]] = {}
for chunk_id, _, index_path in coarse_candidates:
if index_path not in candidates_by_index:
candidates_by_index[index_path] = []
candidates_by_index[index_path].append(chunk_id)
# Retrieve dense embeddings and compute cosine similarity
scored_results: List[Tuple[float, SearchResult]] = []
import sqlite3
for index_path, chunk_ids in candidates_by_index.items():
try:
# Collect valid rows and dense vectors for batch processing
valid_rows: List[Dict[str, Any]] = []
dense_vectors: List["np.ndarray"] = []
if used_centralized:
# Centralized mode: index_path is actually index_root directory
# Dense embeddings are in per-directory _index.db files
# referenced by source_index_db in chunk_metadata
meta_db_path = index_path / VECTORS_META_DB_NAME
if not meta_db_path.exists():
self.logger.debug(
"VectorMetadataStore not found at %s, skipping dense reranking", meta_db_path
)
continue
# Get chunk metadata with source_index_db references
meta_store = VectorMetadataStore(meta_db_path)
chunks_meta = meta_store.get_chunks_by_ids(chunk_ids)
# Group chunks by source_index_db
chunks_by_source: Dict[str, List[Dict[str, Any]]] = {}
for chunk in chunks_meta:
source_db = chunk.get("source_index_db")
if source_db:
if source_db not in chunks_by_source:
chunks_by_source[source_db] = []
chunks_by_source[source_db].append(chunk)
# Retrieve dense embeddings from each source_index_db
for source_db, source_chunks in chunks_by_source.items():
try:
source_chunk_ids = [c["chunk_id"] for c in source_chunks]
conn = sqlite3.connect(source_db)
conn.row_factory = sqlite3.Row
placeholders = ",".join("?" * len(source_chunk_ids))
# Try semantic_chunks first (newer schema), fall back to chunks
try:
rows = conn.execute(
f"SELECT id, embedding_dense FROM semantic_chunks WHERE id IN ({placeholders})",
source_chunk_ids
).fetchall()
except sqlite3.OperationalError:
rows = conn.execute(
f"SELECT id, embedding_dense FROM chunks WHERE id IN ({placeholders})",
source_chunk_ids
).fetchall()
conn.close()
# Build dense vector lookup
dense_lookup = {row["id"]: row["embedding_dense"] for row in rows}
# Process chunks with their embeddings
for chunk in source_chunks:
chunk_id = chunk["chunk_id"]
dense_bytes = dense_lookup.get(chunk_id)
if dense_bytes is not None:
valid_rows.append({
"id": chunk_id,
"file_path": chunk["file_path"],
"content": chunk["content"],
})
dense_vectors.append(np.frombuffer(dense_bytes, dtype=np.float32))
except Exception as exc:
self.logger.debug(
"Failed to get dense embeddings from %s: %s", source_db, exc
)
else:
# Per-directory mode: index_path is the _index.db file
conn = sqlite3.connect(str(index_path))
conn.row_factory = sqlite3.Row
placeholders = ",".join("?" * len(chunk_ids))
rows = conn.execute(
f"SELECT id, file_path, content, embedding_dense FROM semantic_chunks WHERE id IN ({placeholders})",
chunk_ids
).fetchall()
conn.close()
for row in rows:
dense_bytes = row["embedding_dense"]
if dense_bytes is not None:
valid_rows.append(dict(row))
dense_vectors.append(np.frombuffer(dense_bytes, dtype=np.float32))
# Skip if no dense embeddings found
if not dense_vectors:
continue
# Stack into matrix for batch computation
doc_matrix = np.vstack(dense_vectors)
# Batch compute cosine similarities
scores = self._compute_cosine_similarity_batch(query_dense, doc_matrix)
# Create search results
for i, row in enumerate(valid_rows):
score = float(scores[i])
excerpt = (row.get("content") or "")[:500]
result = SearchResult(
path=row.get("file_path") or "",
score=score,
excerpt=excerpt,
)
scored_results.append((score, result))
except Exception as exc:
self.logger.debug(
"Dense reranking failed for %s: %s", index_path, exc
)
stats.errors.append(f"Dense reranking failed for {index_path}: {exc}")
# Sort by score descending and deduplicate by path
scored_results.sort(key=lambda x: x[0], reverse=True)
path_to_result: Dict[str, SearchResult] = {}
for score, result in scored_results:
if result.path not in path_to_result:
path_to_result[result.path] = result
final_results = list(path_to_result.values())[:k]
# Optional: grouping of similar results
if options.group_results:
from codexlens.search.ranking import group_similar_results
final_results = group_similar_results(
final_results, score_threshold_abs=options.grouping_threshold
)
stats.files_matched = len(final_results)
stats.time_ms = (time.time() - start_time) * 1000
self.logger.debug(
"Binary cascade search complete: %d results in %.2fms",
len(final_results),
stats.time_ms,
)
return ChainSearchResult(
query=query,
results=final_results,
symbols=[],
stats=stats,
)
def cascade_search(
self,
query: str,
source_path: Path,
k: int = 10,
coarse_k: int = 100,
options: Optional[SearchOptions] = None,
strategy: Optional[Literal["binary", "hybrid", "binary_rerank", "dense_rerank", "staged"]] = None,
) -> ChainSearchResult:
"""Unified cascade search entry point with strategy selection.
Provides a single interface for cascade search with configurable strategy:
- "binary": Uses binary vector coarse ranking + dense fine ranking (fastest)
- "hybrid": Uses FTS+SPLADE+Vector coarse ranking + cross-encoder reranking (original)
- "binary_rerank": Uses binary vector coarse ranking + cross-encoder reranking (best balance)
- "dense_rerank": Uses dense vector coarse ranking + cross-encoder reranking
- "staged": 4-stage pipeline: binary -> LSP expand -> clustering -> optional rerank
The strategy is determined with the following priority:
1. The `strategy` parameter (e.g., from CLI --cascade-strategy option)
2. Config `cascade_strategy` setting from settings.json
3. Default: "binary"
Args:
query: Natural language or keyword query string
source_path: Starting directory path
k: Number of final results to return (default 10)
coarse_k: Number of coarse candidates from first stage (default 100)
options: Search configuration (uses defaults if None)
strategy: Cascade strategy - "binary", "hybrid", "binary_rerank", "dense_rerank", or "staged".
Returns:
ChainSearchResult with reranked results and statistics
Examples:
>>> engine = ChainSearchEngine(registry, mapper, config=config)
>>> # Use binary cascade (default, fastest)
>>> result = engine.cascade_search("auth", Path("D:/project"))
>>> # Use hybrid cascade (original behavior)
>>> result = engine.cascade_search("auth", Path("D:/project"), strategy="hybrid")
>>> # Use binary + cross-encoder (best balance of speed and quality)
>>> result = engine.cascade_search("auth", Path("D:/project"), strategy="binary_rerank")
>>> # Use 4-stage pipeline (binary + LSP expand + clustering + optional rerank)
>>> result = engine.cascade_search("auth", Path("D:/project"), strategy="staged")
"""
# Strategy priority: parameter > config > default
effective_strategy = strategy
valid_strategies = ("binary", "hybrid", "binary_rerank", "dense_rerank", "staged")
if effective_strategy is None:
# Not passed via parameter, check config
if self._config is not None:
config_strategy = getattr(self._config, "cascade_strategy", None)
if config_strategy in valid_strategies:
effective_strategy = config_strategy
# If still not set, apply default
if effective_strategy not in valid_strategies:
effective_strategy = "binary"
if effective_strategy == "binary":
return self.binary_cascade_search(query, source_path, k, coarse_k, options)
elif effective_strategy == "binary_rerank":
return self.binary_rerank_cascade_search(query, source_path, k, coarse_k, options)
elif effective_strategy == "dense_rerank":
return self.dense_rerank_cascade_search(query, source_path, k, coarse_k, options)
elif effective_strategy == "staged":
return self.staged_cascade_search(query, source_path, k, coarse_k, options)
else:
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
def staged_cascade_search(
self,
query: str,
source_path: Path,
k: int = 10,
coarse_k: int = 100,
options: Optional[SearchOptions] = None,
) -> ChainSearchResult:
"""Execute 4-stage cascade search pipeline with binary, LSP expansion, clustering, and optional reranking.
Staged cascade search process:
1. Stage 1 (Binary Coarse): Fast binary vector search using Hamming distance
to quickly filter to coarse_k candidates (256-bit binary vectors)
2. Stage 2 (LSP Expansion): Expand coarse candidates using GraphExpander to
include related symbols (definitions, references, callers/callees)
3. Stage 3 (Clustering): Use configurable clustering strategy to group similar
results and select representative results from each cluster
4. Stage 4 (Optional Rerank): If config.enable_staged_rerank is True, apply
cross-encoder reranking for final precision
This approach combines the speed of binary search with graph-based context
expansion and diversity-preserving clustering for high-quality results.
Performance characteristics:
- Stage 1: O(N) binary search with SIMD acceleration (~8ms)
- Stage 2: O(k * d) graph traversal where d is expansion depth
- Stage 3: O(n^2) clustering on expanded candidates
- Stage 4: Optional cross-encoder reranking (API call)
Args:
query: Natural language or keyword query string
source_path: Starting directory path
k: Number of final results to return (default 10)
coarse_k: Number of coarse candidates from first stage (default 100)
options: Search configuration (uses defaults if None)
Returns:
ChainSearchResult with per-stage statistics
Examples:
>>> engine = ChainSearchEngine(registry, mapper, config=config)
>>> result = engine.staged_cascade_search(
... "authentication handler",
... Path("D:/project/src"),
... k=10,
... coarse_k=100
... )
>>> for r in result.results:
... print(f"{r.path}: {r.score:.3f}")
"""
if not NUMPY_AVAILABLE:
self.logger.warning(
"NumPy not available, falling back to hybrid cascade search"
)
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
options = options or SearchOptions()
start_time = time.time()
stats = SearchStats()
# Per-stage timing stats
stage_times: Dict[str, float] = {}
stage_counts: Dict[str, int] = {}
# Use config defaults if available
if self._config is not None:
if hasattr(self._config, "cascade_coarse_k"):
coarse_k = coarse_k or self._config.cascade_coarse_k
if hasattr(self._config, "cascade_fine_k"):
k = k or self._config.cascade_fine_k
# Step 1: Find starting index
start_index = self._find_start_index(source_path)
if not start_index:
self.logger.warning(f"No index found for {source_path}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Step 2: Collect all index paths
index_paths = self._collect_index_paths(start_index, options.depth)
stats.dirs_searched = len(index_paths)
if not index_paths:
self.logger.warning(f"No indexes collected from {start_index}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# ========== Stage 1: Binary Coarse Search ==========
stage1_start = time.time()
coarse_results, index_root = self._stage1_binary_search(
query, index_paths, coarse_k, stats
)
stage_times["stage1_binary_ms"] = (time.time() - stage1_start) * 1000
stage_counts["stage1_candidates"] = len(coarse_results)
self.logger.debug(
"Staged Stage 1: Binary search found %d candidates in %.2fms",
len(coarse_results), stage_times["stage1_binary_ms"]
)
if not coarse_results:
self.logger.debug("No binary candidates found, falling back to hybrid cascade")
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# ========== Stage 2: LSP Graph Expansion ==========
stage2_start = time.time()
expanded_results = self._stage2_lsp_expand(coarse_results, index_root)
stage_times["stage2_expand_ms"] = (time.time() - stage2_start) * 1000
stage_counts["stage2_expanded"] = len(expanded_results)
self.logger.debug(
"Staged Stage 2: LSP expansion %d -> %d results in %.2fms",
len(coarse_results), len(expanded_results), stage_times["stage2_expand_ms"]
)
# ========== Stage 3: Clustering and Representative Selection ==========
stage3_start = time.time()
clustered_results = self._stage3_cluster_prune(expanded_results, k * 2)
stage_times["stage3_cluster_ms"] = (time.time() - stage3_start) * 1000
stage_counts["stage3_clustered"] = len(clustered_results)
self.logger.debug(
"Staged Stage 3: Clustering %d -> %d representatives in %.2fms",
len(expanded_results), len(clustered_results), stage_times["stage3_cluster_ms"]
)
# ========== Stage 4: Optional Cross-Encoder Reranking ==========
enable_rerank = False
if self._config is not None:
enable_rerank = getattr(self._config, "enable_staged_rerank", False)
if enable_rerank:
stage4_start = time.time()
final_results = self._stage4_optional_rerank(query, clustered_results, k)
stage_times["stage4_rerank_ms"] = (time.time() - stage4_start) * 1000
stage_counts["stage4_reranked"] = len(final_results)
self.logger.debug(
"Staged Stage 4: Reranking %d -> %d results in %.2fms",
len(clustered_results), len(final_results), stage_times["stage4_rerank_ms"]
)
else:
# Skip reranking, just take top-k by score
final_results = sorted(
clustered_results, key=lambda r: r.score, reverse=True
)[:k]
stage_counts["stage4_reranked"] = len(final_results)
# Deduplicate by path (keep highest score)
path_to_result: Dict[str, SearchResult] = {}
for result in final_results:
if result.path not in path_to_result or result.score > path_to_result[result.path].score:
path_to_result[result.path] = result
final_results = list(path_to_result.values())[:k]
# Optional: grouping of similar results
if options.group_results:
from codexlens.search.ranking import group_similar_results
final_results = group_similar_results(
final_results, score_threshold_abs=options.grouping_threshold
)
stats.files_matched = len(final_results)
stats.time_ms = (time.time() - start_time) * 1000
# Add per-stage stats to errors field (as JSON for now, will be proper field later)
stage_stats_json = json.dumps({
"stage_times": stage_times,
"stage_counts": stage_counts,
})
stats.errors.append(f"STAGE_STATS:{stage_stats_json}")
self.logger.debug(
"Staged cascade search complete: %d results in %.2fms "
"(stage1=%.1fms, stage2=%.1fms, stage3=%.1fms)",
len(final_results),
stats.time_ms,
stage_times.get("stage1_binary_ms", 0),
stage_times.get("stage2_expand_ms", 0),
stage_times.get("stage3_cluster_ms", 0),
)
return ChainSearchResult(
query=query,
results=final_results,
symbols=[],
stats=stats,
)
def _stage1_binary_search(
self,
query: str,
index_paths: List[Path],
coarse_k: int,
stats: SearchStats,
) -> Tuple[List[SearchResult], Optional[Path]]:
"""Stage 1: Binary vector coarse search using Hamming distance.
Reuses the binary coarse search logic from binary_cascade_search.
Args:
query: Search query string
index_paths: List of index database paths to search
coarse_k: Number of coarse candidates to retrieve
stats: SearchStats to update with errors
Returns:
Tuple of (list of SearchResult objects, index_root path or None)
"""
# Initialize binary embedding backend
try:
from codexlens.indexing.embedding import BinaryEmbeddingBackend
except ImportError as exc:
self.logger.warning(
"BinaryEmbeddingBackend not available: %s", exc
)
return [], None
# Try centralized BinarySearcher first (preferred for mmap indexes)
index_root = index_paths[0].parent if index_paths else None
coarse_candidates: List[Tuple[int, int, Path]] = [] # (chunk_id, distance, index_path)
used_centralized = False
if index_root:
binary_searcher = self._get_centralized_binary_searcher(index_root)
if binary_searcher is not None:
try:
from codexlens.semantic.embedder import Embedder
embedder = Embedder()
query_dense = embedder.embed_to_numpy([query])[0]
results = binary_searcher.search(query_dense, top_k=coarse_k)
for chunk_id, distance in results:
coarse_candidates.append((chunk_id, distance, index_root))
if coarse_candidates:
used_centralized = True
self.logger.debug(
"Stage 1 centralized binary search: %d candidates", len(results)
)
except Exception as exc:
self.logger.debug(f"Centralized binary search failed: {exc}")
if not used_centralized:
# Fallback to per-directory binary indexes
use_gpu = True
if self._config is not None:
use_gpu = getattr(self._config, "embedding_use_gpu", True)
try:
binary_backend = BinaryEmbeddingBackend(use_gpu=use_gpu)
query_binary = binary_backend.embed_packed([query])[0]
except Exception as exc:
self.logger.warning(f"Failed to generate binary query embedding: {exc}")
return [], index_root
for index_path in index_paths:
try:
binary_index = self._get_or_create_binary_index(index_path)
if binary_index is None or binary_index.count() == 0:
continue
ids, distances = binary_index.search(query_binary, coarse_k)
for chunk_id, dist in zip(ids, distances):
coarse_candidates.append((chunk_id, dist, index_path))
except Exception as exc:
self.logger.debug(
"Binary search failed for %s: %s", index_path, exc
)
if not coarse_candidates:
return [], index_root
# Sort by Hamming distance and take top coarse_k
coarse_candidates.sort(key=lambda x: x[1])
coarse_candidates = coarse_candidates[:coarse_k]
# Build SearchResult objects from candidates
coarse_results: List[SearchResult] = []
# Group candidates by index path for efficient retrieval
candidates_by_index: Dict[Path, List[int]] = {}
for chunk_id, _, idx_path in coarse_candidates:
if idx_path not in candidates_by_index:
candidates_by_index[idx_path] = []
candidates_by_index[idx_path].append(chunk_id)
# Retrieve chunk content
import sqlite3
central_meta_path = index_root / VECTORS_META_DB_NAME if index_root else None
central_meta_store = None
if central_meta_path and central_meta_path.exists():
central_meta_store = VectorMetadataStore(central_meta_path)
for idx_path, chunk_ids in candidates_by_index.items():
try:
chunks_data = []
if central_meta_store:
chunks_data = central_meta_store.get_chunks_by_ids(chunk_ids)
if not chunks_data and used_centralized:
meta_db_path = idx_path / VECTORS_META_DB_NAME
if meta_db_path.exists():
meta_store = VectorMetadataStore(meta_db_path)
chunks_data = meta_store.get_chunks_by_ids(chunk_ids)
if not chunks_data:
try:
conn = sqlite3.connect(str(idx_path))
conn.row_factory = sqlite3.Row
placeholders = ",".join("?" * len(chunk_ids))
cursor = conn.execute(
f"""
SELECT id, file_path, content, metadata, category
FROM semantic_chunks
WHERE id IN ({placeholders})
""",
chunk_ids
)
chunks_data = [
{
"id": row["id"],
"file_path": row["file_path"],
"content": row["content"],
"metadata": row["metadata"],
"category": row["category"],
}
for row in cursor.fetchall()
]
conn.close()
except Exception:
pass
for chunk in chunks_data:
chunk_id = chunk.get("id") or chunk.get("chunk_id")
distance = next(
(d for cid, d, _ in coarse_candidates if cid == chunk_id),
256
)
score = 1.0 - (distance / 256.0)
content = chunk.get("content", "")
# Extract symbol info from metadata if available
metadata = chunk.get("metadata")
symbol_name = None
symbol_kind = None
start_line = None
end_line = None
if metadata:
try:
meta_dict = json.loads(metadata) if isinstance(metadata, str) else metadata
symbol_name = meta_dict.get("symbol_name")
symbol_kind = meta_dict.get("symbol_kind")
start_line = meta_dict.get("start_line")
end_line = meta_dict.get("end_line")
except Exception:
pass
result = SearchResult(
path=chunk.get("file_path", ""),
score=float(score),
excerpt=content[:500] if content else "",
content=content,
symbol_name=symbol_name,
symbol_kind=symbol_kind,
start_line=start_line,
end_line=end_line,
)
coarse_results.append(result)
except Exception as exc:
self.logger.debug(
"Failed to retrieve chunks from %s: %s", idx_path, exc
)
stats.errors.append(f"Stage 1 chunk retrieval failed for {idx_path}: {exc}")
return coarse_results, index_root
def _stage2_lsp_expand(
self,
coarse_results: List[SearchResult],
index_root: Optional[Path],
) -> List[SearchResult]:
"""Stage 2: LSP-based graph expansion using GraphExpander.
Expands coarse results with related symbols (definitions, references,
callers, callees) using precomputed graph neighbors.
Args:
coarse_results: Results from Stage 1 binary search
index_root: Root path of the index (for graph database access)
Returns:
Combined list of original results plus expanded related results
"""
if not coarse_results or index_root is None:
return coarse_results
try:
from codexlens.search.graph_expander import GraphExpander
# Get expansion depth from config
depth = 2
if self._config is not None:
depth = getattr(self._config, "graph_expansion_depth", 2)
expander = GraphExpander(self.mapper, config=self._config)
# Expand top results (limit expansion to avoid explosion)
max_expand = min(10, len(coarse_results))
max_related = 50
related_results = expander.expand(
coarse_results,
depth=depth,
max_expand=max_expand,
max_related=max_related,
)
if related_results:
self.logger.debug(
"Stage 2 expanded %d base results to %d related symbols",
len(coarse_results), len(related_results)
)
# Combine: original results + related results
# Keep original results first (higher relevance)
combined = list(coarse_results)
seen_keys = {(r.path, r.symbol_name, r.start_line) for r in coarse_results}
for related in related_results:
key = (related.path, related.symbol_name, related.start_line)
if key not in seen_keys:
seen_keys.add(key)
combined.append(related)
return combined
except ImportError as exc:
self.logger.debug("GraphExpander not available: %s", exc)
return coarse_results
except Exception as exc:
self.logger.debug("Stage 2 LSP expansion failed: %s", exc)
return coarse_results
def _stage3_cluster_prune(
self,
expanded_results: List[SearchResult],
target_count: int,
) -> List[SearchResult]:
"""Stage 3: Cluster expanded results and select representatives.
Uses the extensible clustering infrastructure from codexlens.search.clustering
to group similar results and select the best representative from each cluster.
Args:
expanded_results: Results from Stage 2 expansion
target_count: Target number of representative results
Returns:
List of representative results (one per cluster)
"""
if not expanded_results:
return []
# If few results, skip clustering
if len(expanded_results) <= target_count:
return expanded_results
try:
from codexlens.search.clustering import (
ClusteringConfig,
get_strategy,
)
# Get clustering config from config
strategy_name = "auto"
min_cluster_size = 3
if self._config is not None:
strategy_name = getattr(self._config, "staged_clustering_strategy", "auto")
min_cluster_size = getattr(self._config, "staged_clustering_min_size", 3)
# Get embeddings for clustering
# Try to get dense embeddings from results' content
embeddings = self._get_embeddings_for_clustering(expanded_results)
if embeddings is None or len(embeddings) == 0:
# No embeddings available, fall back to score-based selection
self.logger.debug("No embeddings for clustering, using score-based selection")
return sorted(
expanded_results, key=lambda r: r.score, reverse=True
)[:target_count]
# Create clustering config
config = ClusteringConfig(
min_cluster_size=min(min_cluster_size, max(2, len(expanded_results) // 5)),
min_samples=2,
metric="cosine",
)
# Get strategy with fallback
strategy = get_strategy(strategy_name, config, fallback=True)
# Cluster and select representatives
representatives = strategy.fit_predict(embeddings, expanded_results)
self.logger.debug(
"Stage 3 clustered %d results into %d representatives using %s",
len(expanded_results), len(representatives), type(strategy).__name__
)
# If clustering returned too few, supplement with top-scored unclustered
if len(representatives) < target_count:
rep_paths = {r.path for r in representatives}
remaining = [r for r in expanded_results if r.path not in rep_paths]
remaining_sorted = sorted(remaining, key=lambda r: r.score, reverse=True)
representatives.extend(remaining_sorted[:target_count - len(representatives)])
return representatives[:target_count]
except ImportError as exc:
self.logger.debug("Clustering not available: %s", exc)
return sorted(
expanded_results, key=lambda r: r.score, reverse=True
)[:target_count]
except Exception as exc:
self.logger.debug("Stage 3 clustering failed: %s", exc)
return sorted(
expanded_results, key=lambda r: r.score, reverse=True
)[:target_count]
def _stage4_optional_rerank(
self,
query: str,
clustered_results: List[SearchResult],
k: int,
) -> List[SearchResult]:
"""Stage 4: Optional cross-encoder reranking.
Applies cross-encoder reranking if enabled in config.
Args:
query: Search query string
clustered_results: Results from Stage 3 clustering
k: Number of final results to return
Returns:
Reranked results sorted by cross-encoder score
"""
if not clustered_results:
return []
# Use existing _cross_encoder_rerank method
return self._cross_encoder_rerank(query, clustered_results, k)
def _get_embeddings_for_clustering(
self,
results: List[SearchResult],
) -> Optional["np.ndarray"]:
"""Get dense embeddings for clustering results.
Tries to generate embeddings from result content for clustering.
Args:
results: List of SearchResult objects
Returns:
NumPy array of embeddings or None if not available
"""
if not NUMPY_AVAILABLE:
return None
if not results:
return None
try:
from codexlens.semantic.factory import get_embedder
# Get embedding settings from config
embedding_backend = "fastembed"
embedding_model = "code"
use_gpu = True
if self._config is not None:
embedding_backend = getattr(self._config, "embedding_backend", "fastembed")
embedding_model = getattr(self._config, "embedding_model", "code")
use_gpu = getattr(self._config, "embedding_use_gpu", True)
# Create embedder
if embedding_backend == "litellm":
embedder = get_embedder(backend="litellm", model=embedding_model)
else:
embedder = get_embedder(backend="fastembed", profile=embedding_model, use_gpu=use_gpu)
# Extract text content from results
texts = []
for result in results:
# Use content if available, otherwise use excerpt
text = result.content or result.excerpt or ""
if not text and result.path:
text = result.path
texts.append(text[:2000]) # Limit text length
# Generate embeddings
embeddings = embedder.embed_to_numpy(texts)
return embeddings
except ImportError as exc:
self.logger.debug("Embedder not available for clustering: %s", exc)
return None
except Exception as exc:
self.logger.debug("Failed to generate embeddings for clustering: %s", exc)
return None
def binary_rerank_cascade_search(
self,
query: str,
source_path: Path,
k: int = 10,
coarse_k: int = 100,
options: Optional[SearchOptions] = None,
) -> ChainSearchResult:
"""Execute binary cascade search with cross-encoder reranking.
Combines the speed of binary vector coarse search with the quality of
cross-encoder reranking for the best balance of speed and accuracy.
Binary + Reranker cascade process:
1. Stage 1 (Coarse): Fast binary vector search using Hamming distance
to quickly filter to coarse_k candidates (256-dim binary, 32 bytes/vector)
2. Stage 2 (Fine): Cross-encoder reranking for precise semantic ranking
of candidates using query-document attention
This approach is typically faster than hybrid_cascade_search while
achieving similar or better quality through cross-encoder reranking.
Performance characteristics:
- Binary search: O(N) with SIMD-accelerated XOR + popcount (~8ms)
- Cross-encoder: Applied to top coarse_k candidates (~15-20s for API)
- Total: Faster coarse + high-quality fine = best balance
Args:
query: Natural language or keyword query string
source_path: Starting directory path
k: Number of final results to return (default 10)
coarse_k: Number of coarse candidates from first stage (default 100)
options: Search configuration (uses defaults if None)
Returns:
ChainSearchResult with cross-encoder reranked results and statistics
Examples:
>>> engine = ChainSearchEngine(registry, mapper, config=config)
>>> result = engine.binary_rerank_cascade_search(
... "how to authenticate users",
... Path("D:/project/src"),
... k=10,
... coarse_k=100
... )
>>> for r in result.results:
... print(f"{r.path}: {r.score:.3f}")
"""
if not NUMPY_AVAILABLE:
self.logger.warning(
"NumPy not available, falling back to hybrid cascade search"
)
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
options = options or SearchOptions()
start_time = time.time()
stats = SearchStats()
# Use config defaults if available
if self._config is not None:
if hasattr(self._config, "cascade_coarse_k"):
coarse_k = coarse_k or self._config.cascade_coarse_k
if hasattr(self._config, "cascade_fine_k"):
k = k or self._config.cascade_fine_k
# Step 1: Find starting index
start_index = self._find_start_index(source_path)
if not start_index:
self.logger.warning(f"No index found for {source_path}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Step 2: Collect all index paths
index_paths = self._collect_index_paths(start_index, options.depth)
stats.dirs_searched = len(index_paths)
if not index_paths:
self.logger.warning(f"No indexes collected from {start_index}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Initialize binary embedding backend
try:
from codexlens.indexing.embedding import BinaryEmbeddingBackend
except ImportError as exc:
self.logger.warning(
"BinaryEmbeddingBackend not available: %s, falling back to hybrid cascade",
exc
)
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# Step 4: Binary coarse search (same as binary_cascade_search)
binary_coarse_time = time.time()
coarse_candidates: List[Tuple[int, int, Path]] = []
# Try centralized BinarySearcher first (preferred for mmap indexes)
# The index root is the parent of the first index path
index_root = index_paths[0].parent if index_paths else None
used_centralized = False
if index_root:
binary_searcher = self._get_centralized_binary_searcher(index_root)
if binary_searcher is not None:
try:
# BinarySearcher expects dense vector, not packed binary
from codexlens.semantic.embedder import Embedder
embedder = Embedder()
query_dense = embedder.embed_to_numpy([query])[0]
results = binary_searcher.search(query_dense, top_k=coarse_k)
for chunk_id, distance in results:
coarse_candidates.append((chunk_id, distance, index_root))
# Only mark as used if we got actual results
if coarse_candidates:
used_centralized = True
self.logger.debug(
"Binary coarse search (centralized): %d candidates in %.2fms",
len(results), (time.time() - binary_coarse_time) * 1000
)
except Exception as exc:
self.logger.debug(f"Centralized binary search failed: {exc}")
if not used_centralized:
# Get GPU preference from config
use_gpu = True
if self._config is not None:
use_gpu = getattr(self._config, "embedding_use_gpu", True)
try:
binary_backend = BinaryEmbeddingBackend(use_gpu=use_gpu)
query_binary = binary_backend.embed_packed([query])[0]
except Exception as exc:
self.logger.warning(f"Failed to generate binary query embedding: {exc}")
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# Fallback to per-directory binary indexes
for index_path in index_paths:
try:
binary_index = self._get_or_create_binary_index(index_path)
if binary_index is None or binary_index.count() == 0:
continue
# BinaryANNIndex returns (ids, distances) arrays
ids, distances = binary_index.search(query_binary, coarse_k)
for chunk_id, dist in zip(ids, distances):
coarse_candidates.append((chunk_id, dist, index_path))
except Exception as exc:
self.logger.debug(
"Binary search failed for %s: %s", index_path, exc
)
if not coarse_candidates:
self.logger.info("No binary candidates found, falling back to hybrid cascade for reranking")
# Fall back to hybrid_cascade_search which uses FTS+Vector coarse + cross-encoder rerank
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# Sort by Hamming distance and take top coarse_k
coarse_candidates.sort(key=lambda x: x[1])
coarse_candidates = coarse_candidates[:coarse_k]
self.logger.debug(
"Binary coarse search: %d candidates in %.2fms",
len(coarse_candidates), (time.time() - binary_coarse_time) * 1000
)
# Step 5: Build SearchResult objects for cross-encoder reranking
# Group candidates by index path for efficient retrieval
candidates_by_index: Dict[Path, List[int]] = {}
for chunk_id, distance, index_path in coarse_candidates:
if index_path not in candidates_by_index:
candidates_by_index[index_path] = []
candidates_by_index[index_path].append(chunk_id)
# Retrieve chunk content for reranking
# Always use centralized VectorMetadataStore since chunks are stored there
import sqlite3
coarse_results: List[SearchResult] = []
# Find the centralized metadata store path (project root)
# index_root was computed earlier, use it for chunk retrieval
central_meta_path = index_root / VECTORS_META_DB_NAME if index_root else None
central_meta_store = None
if central_meta_path and central_meta_path.exists():
central_meta_store = VectorMetadataStore(central_meta_path)
for index_path, chunk_ids in candidates_by_index.items():
try:
chunks_data = []
if central_meta_store:
# Try centralized VectorMetadataStore first (preferred)
chunks_data = central_meta_store.get_chunks_by_ids(chunk_ids)
if not chunks_data and used_centralized:
# Fallback to per-index-path meta store
meta_db_path = index_path / VECTORS_META_DB_NAME
if meta_db_path.exists():
meta_store = VectorMetadataStore(meta_db_path)
chunks_data = meta_store.get_chunks_by_ids(chunk_ids)
if not chunks_data:
# Final fallback: query semantic_chunks table directly
# This handles per-directory indexes with semantic_chunks table
try:
conn = sqlite3.connect(str(index_path))
conn.row_factory = sqlite3.Row
placeholders = ",".join("?" * len(chunk_ids))
cursor = conn.execute(
f"""
SELECT id, file_path, content, metadata, category
FROM semantic_chunks
WHERE id IN ({placeholders})
""",
chunk_ids
)
chunks_data = [
{
"id": row["id"],
"file_path": row["file_path"],
"content": row["content"],
"metadata": row["metadata"],
"category": row["category"],
}
for row in cursor.fetchall()
]
conn.close()
except Exception:
pass # Skip if table doesn't exist
for chunk in chunks_data:
# Find the Hamming distance for this chunk
chunk_id = chunk.get("id") or chunk.get("chunk_id")
distance = next(
(d for cid, d, _ in coarse_candidates if cid == chunk_id),
256
)
# Initial score from Hamming distance (will be replaced by reranker)
score = 1.0 - (distance / 256.0)
content = chunk.get("content", "")
result = SearchResult(
path=chunk.get("file_path", ""),
score=float(score),
excerpt=content[:500] if content else "",
content=content,
)
coarse_results.append(result)
except Exception as exc:
self.logger.debug(
"Failed to retrieve chunks from %s: %s", index_path, exc
)
if not coarse_results:
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query, results=[], symbols=[], stats=stats
)
self.logger.debug(
"Retrieved %d chunks for cross-encoder reranking", len(coarse_results)
)
# Step 6: Cross-encoder reranking (same as hybrid_cascade_search)
rerank_time = time.time()
reranked_results = self._cross_encoder_rerank(query, coarse_results, top_k=k)
self.logger.debug(
"Cross-encoder reranking: %d results in %.2fms",
len(reranked_results), (time.time() - rerank_time) * 1000
)
# Deduplicate by path (keep highest score)
path_to_result: Dict[str, SearchResult] = {}
for result in reranked_results:
if result.path not in path_to_result or result.score > path_to_result[result.path].score:
path_to_result[result.path] = result
final_results = list(path_to_result.values())[:k]
stats.files_matched = len(final_results)
stats.time_ms = (time.time() - start_time) * 1000
self.logger.debug(
"Binary+Rerank cascade search complete: %d results in %.2fms",
len(final_results),
stats.time_ms,
)
return ChainSearchResult(
query=query,
results=final_results,
symbols=[],
stats=stats,
)
def dense_rerank_cascade_search(
self,
query: str,
source_path: Path,
k: int = 10,
coarse_k: int = 100,
options: Optional[SearchOptions] = None,
) -> ChainSearchResult:
"""Execute dense cascade search with cross-encoder reranking.
Combines dense vector coarse search (HNSW) with cross-encoder reranking
for comparison with binary_rerank strategy.
Dense + Reranker cascade process:
1. Stage 1 (Coarse): Dense vector search using HNSW (cosine similarity)
to get coarse_k candidates (2048-dim float32)
2. Stage 2 (Fine): Cross-encoder reranking for precise semantic ranking
Args:
query: Natural language or keyword query string
source_path: Starting directory path
k: Number of final results to return (default 10)
coarse_k: Number of coarse candidates from first stage (default 100)
options: Search configuration (uses defaults if None)
Returns:
ChainSearchResult with cross-encoder reranked results and statistics
"""
if not NUMPY_AVAILABLE:
self.logger.warning(
"NumPy not available, falling back to hybrid cascade search"
)
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
options = options or SearchOptions()
start_time = time.time()
stats = SearchStats()
# Use config defaults if available
if self._config is not None:
if hasattr(self._config, "cascade_coarse_k"):
coarse_k = coarse_k or self._config.cascade_coarse_k
if hasattr(self._config, "cascade_fine_k"):
k = k or self._config.cascade_fine_k
# Step 1: Find starting index
start_index = self._find_start_index(source_path)
if not start_index:
self.logger.warning(f"No index found for {source_path}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Step 2: Collect all index paths
index_paths = self._collect_index_paths(start_index, options.depth)
stats.dirs_searched = len(index_paths)
if not index_paths:
self.logger.warning(f"No indexes collected from {start_index}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Step 3: Find centralized HNSW index and read model config
from codexlens.config import VECTORS_HNSW_NAME
central_hnsw_path = None
index_root = start_index.parent
current_dir = index_root
for _ in range(10): # Limit search depth
candidate = current_dir / VECTORS_HNSW_NAME
if candidate.exists():
central_hnsw_path = candidate
index_root = current_dir # Update to where HNSW was found
break
parent = current_dir.parent
if parent == current_dir: # Reached root
break
current_dir = parent
# Step 4: Generate query dense embedding using same model as centralized index
# Read embedding config to match the model used during indexing
dense_coarse_time = time.time()
try:
from codexlens.semantic.factory import get_embedder
# Get embedding settings from centralized index config (preferred) or fallback to self._config
embedding_backend = "litellm" # Default to API for dense
embedding_model = "qwen3-embedding-sf" # Default model
use_gpu = True
# Try to read model config from centralized index's embeddings_config table
central_index_db = index_root / "_index.db"
if central_index_db.exists():
try:
from codexlens.semantic.vector_store import VectorStore
with VectorStore(central_index_db) as vs:
model_config = vs.get_model_config()
if model_config:
embedding_backend = model_config.get("backend", embedding_backend)
embedding_model = model_config.get("model_name", embedding_model)
self.logger.debug(
"Read model config from centralized index: %s/%s",
embedding_backend, embedding_model
)
except Exception as e:
self.logger.debug("Failed to read centralized model config: %s", e)
# Fallback to self._config if not read from index
if self._config is not None:
if embedding_backend == "litellm" and embedding_model == "qwen3-embedding-sf":
# Only use config values if we didn't read from centralized index
config_backend = getattr(self._config, "embedding_backend", None)
config_model = getattr(self._config, "embedding_model", None)
if config_backend:
embedding_backend = config_backend
if config_model:
embedding_model = config_model
use_gpu = getattr(self._config, "embedding_use_gpu", True)
# Create embedder matching index configuration
if embedding_backend == "litellm":
embedder = get_embedder(backend="litellm", model=embedding_model)
else:
embedder = get_embedder(backend="fastembed", profile=embedding_model, use_gpu=use_gpu)
query_dense = embedder.embed_to_numpy([query])[0]
self.logger.debug(f"Dense query embedding: {query_dense.shape[0]}-dim via {embedding_backend}/{embedding_model}")
except Exception as exc:
self.logger.warning(f"Failed to generate dense query embedding: {exc}")
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# Step 5: Dense coarse search using centralized HNSW index
coarse_candidates: List[Tuple[int, float, Path]] = [] # (chunk_id, distance, index_path)
if central_hnsw_path is not None:
# Use centralized index
try:
from codexlens.semantic.ann_index import ANNIndex
ann_index = ANNIndex.create_central(
index_root=index_root,
dim=query_dense.shape[0],
)
if ann_index.load() and ann_index.count() > 0:
# Search centralized HNSW index
ids, distances = ann_index.search(query_dense, top_k=coarse_k)
for chunk_id, dist in zip(ids, distances):
coarse_candidates.append((chunk_id, dist, index_root / "_index.db"))
self.logger.debug(
"Centralized dense search: %d candidates from %s",
len(ids), central_hnsw_path
)
except Exception as exc:
self.logger.debug(
"Centralized dense search failed for %s: %s", central_hnsw_path, exc
)
# Fallback: try per-directory HNSW indexes if centralized not found
if not coarse_candidates:
for index_path in index_paths:
try:
# Load HNSW index
from codexlens.semantic.ann_index import ANNIndex
ann_index = ANNIndex(index_path, dim=query_dense.shape[0])
if not ann_index.load():
continue
if ann_index.count() == 0:
continue
# Search HNSW index
ids, distances = ann_index.search(query_dense, top_k=coarse_k)
for chunk_id, dist in zip(ids, distances):
coarse_candidates.append((chunk_id, dist, index_path))
except Exception as exc:
self.logger.debug(
"Dense search failed for %s: %s", index_path, exc
)
if not coarse_candidates:
self.logger.info("No dense candidates found, falling back to hybrid cascade")
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# Sort by distance (ascending for cosine distance) and take top coarse_k
coarse_candidates.sort(key=lambda x: x[1])
coarse_candidates = coarse_candidates[:coarse_k]
self.logger.debug(
"Dense coarse search: %d candidates in %.2fms",
len(coarse_candidates), (time.time() - dense_coarse_time) * 1000
)
# Step 6: Build SearchResult objects for cross-encoder reranking
candidates_by_index: Dict[Path, List[int]] = {}
for chunk_id, distance, index_path in coarse_candidates:
if index_path not in candidates_by_index:
candidates_by_index[index_path] = []
candidates_by_index[index_path].append(chunk_id)
# Retrieve chunk content for reranking
import sqlite3
coarse_results: List[SearchResult] = []
for index_path, chunk_ids in candidates_by_index.items():
try:
# For centralized index, use _vectors_meta.db for chunk metadata
# which contains file_path, content, start_line, end_line
if central_hnsw_path is not None and index_path == index_root / "_index.db":
# Use centralized metadata from _vectors_meta.db
meta_db_path = index_root / "_vectors_meta.db"
if meta_db_path.exists():
conn = sqlite3.connect(str(meta_db_path))
conn.row_factory = sqlite3.Row
placeholders = ",".join("?" * len(chunk_ids))
cursor = conn.execute(
f"""
SELECT chunk_id, file_path, content, start_line, end_line
FROM chunk_metadata
WHERE chunk_id IN ({placeholders})
""",
chunk_ids
)
chunks_data = [
{
"id": row["chunk_id"],
"file_path": row["file_path"],
"content": row["content"],
"metadata": json.dumps({
"start_line": row["start_line"],
"end_line": row["end_line"]
}),
"category": "code" if row["file_path"].endswith(('.py', '.ts', '.js', '.java', '.go', '.rs', '.cpp', '.c')) else "doc",
}
for row in cursor.fetchall()
]
conn.close()
else:
chunks_data = []
else:
# Fall back to per-directory semantic_chunks table
conn = sqlite3.connect(str(index_path))
conn.row_factory = sqlite3.Row
placeholders = ",".join("?" * len(chunk_ids))
cursor = conn.execute(
f"""
SELECT id, file_path, content, metadata, category
FROM semantic_chunks
WHERE id IN ({placeholders})
""",
chunk_ids
)
chunks_data = [
{
"id": row["id"],
"file_path": row["file_path"],
"content": row["content"],
"metadata": row["metadata"],
"category": row["category"],
}
for row in cursor.fetchall()
]
conn.close()
for chunk in chunks_data:
chunk_id = chunk.get("id")
distance = next(
(d for cid, d, _ in coarse_candidates if cid == chunk_id),
1.0
)
# Convert cosine distance to score (clamp to [0, 1] for Pydantic validation)
# Cosine distance can be > 1 for anti-correlated vectors, causing negative scores
score = max(0.0, 1.0 - distance)
content = chunk.get("content", "")
result = SearchResult(
path=chunk.get("file_path", ""),
score=float(score),
excerpt=content[:500] if content else "",
content=content,
)
coarse_results.append(result)
except Exception as exc:
self.logger.debug(
"Failed to retrieve chunks from %s: %s", index_path, exc
)
if not coarse_results:
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query, results=[], symbols=[], stats=stats
)
self.logger.debug(
"Retrieved %d chunks for cross-encoder reranking", len(coarse_results)
)
# Step 6: Cross-encoder reranking
rerank_time = time.time()
reranked_results = self._cross_encoder_rerank(query, coarse_results, top_k=k)
self.logger.debug(
"Cross-encoder reranking: %d results in %.2fms",
len(reranked_results), (time.time() - rerank_time) * 1000
)
# Deduplicate by path (keep highest score)
path_to_result: Dict[str, SearchResult] = {}
for result in reranked_results:
if result.path not in path_to_result or result.score > path_to_result[result.path].score:
path_to_result[result.path] = result
final_results = list(path_to_result.values())[:k]
stats.files_matched = len(final_results)
stats.time_ms = (time.time() - start_time) * 1000
self.logger.debug(
"Dense+Rerank cascade search complete: %d results in %.2fms",
len(final_results),
stats.time_ms,
)
return ChainSearchResult(
query=query,
results=final_results,
symbols=[],
stats=stats,
)
def _get_or_create_binary_index(self, index_path: Path) -> Optional[Any]:
"""Get or create a BinaryANNIndex for the given index path.
.. deprecated::
This method uses the deprecated BinaryANNIndex. For centralized indexes,
use _get_centralized_binary_searcher() instead.
Attempts to load an existing binary index from disk. If not found,
returns None (binary index should be built during indexing).
Args:
index_path: Path to the _index.db file
Returns:
BinaryANNIndex instance or None if not available
"""
try:
import warnings
# Suppress deprecation warning since we're using it intentionally for legacy support
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
from codexlens.semantic.ann_index import BinaryANNIndex
binary_index = BinaryANNIndex(index_path, dim=256)
if binary_index.load():
return binary_index
return None
except Exception as exc:
self.logger.debug("Failed to load binary index for %s: %s", index_path, exc)
return None
def _get_centralized_binary_searcher(self, index_root: Path) -> Optional[Any]:
"""Get centralized BinarySearcher for memory-mapped binary vectors.
This is the preferred method for centralized indexes, providing faster
search via memory-mapped files.
Args:
index_root: Root directory containing centralized index files
Returns:
BinarySearcher instance or None if not available
"""
try:
from codexlens.search.binary_searcher import BinarySearcher
binary_searcher = BinarySearcher(index_root)
if binary_searcher.load():
self.logger.debug(
"Using centralized BinarySearcher with %d vectors (mmap=%s)",
binary_searcher.vector_count,
binary_searcher.is_memmap
)
return binary_searcher
return None
except Exception as exc:
self.logger.debug("Failed to load centralized binary searcher: %s", exc)
return None
def _compute_cosine_similarity(
self,
query_vec: "np.ndarray",
doc_vec: "np.ndarray",
) -> float:
"""Compute cosine similarity between query and document vectors.
Args:
query_vec: Query embedding vector
doc_vec: Document embedding vector
Returns:
Cosine similarity score in range [-1, 1]
"""
if not NUMPY_AVAILABLE:
return 0.0
# Ensure same shape
min_len = min(len(query_vec), len(doc_vec))
q = query_vec[:min_len]
d = doc_vec[:min_len]
# Compute cosine similarity
dot_product = np.dot(q, d)
norm_q = np.linalg.norm(q)
norm_d = np.linalg.norm(d)
if norm_q == 0 or norm_d == 0:
return 0.0
return float(dot_product / (norm_q * norm_d))
def _compute_cosine_similarity_batch(
self,
query_vec: "np.ndarray",
doc_matrix: "np.ndarray",
) -> "np.ndarray":
"""Compute cosine similarity between query and multiple document vectors.
Uses vectorized matrix operations for efficient batch computation.
Args:
query_vec: Query embedding vector of shape (dim,)
doc_matrix: Document embeddings matrix of shape (n_docs, dim)
Returns:
Array of cosine similarity scores of shape (n_docs,)
"""
if not NUMPY_AVAILABLE:
return np.zeros(doc_matrix.shape[0])
# Ensure query is 1D
if query_vec.ndim > 1:
query_vec = query_vec.flatten()
# Handle dimension mismatch by truncating to smaller dimension
min_dim = min(len(query_vec), doc_matrix.shape[1])
q = query_vec[:min_dim]
docs = doc_matrix[:, :min_dim]
# Compute query norm once
norm_q = np.linalg.norm(q)
if norm_q == 0:
return np.zeros(docs.shape[0])
# Normalize query
q_normalized = q / norm_q
# Compute document norms (vectorized)
doc_norms = np.linalg.norm(docs, axis=1)
# Avoid division by zero
nonzero_mask = doc_norms > 0
scores = np.zeros(docs.shape[0], dtype=np.float32)
if np.any(nonzero_mask):
# Normalize documents with non-zero norms
docs_normalized = docs[nonzero_mask] / doc_norms[nonzero_mask, np.newaxis]
# Batch dot product: (n_docs, dim) @ (dim,) = (n_docs,)
scores[nonzero_mask] = docs_normalized @ q_normalized
return scores
def _build_results_from_candidates(
self,
candidates: List[Tuple[int, int, Path]],
index_paths: List[Path],
stats: SearchStats,
query: str,
start_time: float,
use_centralized: bool = False,
) -> ChainSearchResult:
"""Build ChainSearchResult from binary candidates using Hamming distance scores.
Used as fallback when dense embeddings are not available.
Args:
candidates: List of (chunk_id, hamming_distance, index_path) tuples
index_paths: List of all searched index paths
stats: SearchStats to update
query: Original query string
start_time: Search start time for timing
use_centralized: If True, index_path is the index_root directory
and VectorMetadataStore should be used instead of SQLiteStore
Returns:
ChainSearchResult with results scored by Hamming distance
"""
results: List[SearchResult] = []
# Group by index path
candidates_by_index: Dict[Path, List[Tuple[int, int]]] = {}
for chunk_id, distance, index_path in candidates:
if index_path not in candidates_by_index:
candidates_by_index[index_path] = []
candidates_by_index[index_path].append((chunk_id, distance))
for index_path, chunk_tuples in candidates_by_index.items():
try:
chunk_ids = [c[0] for c in chunk_tuples]
# Use VectorMetadataStore for centralized search, SQLiteStore for per-directory
if use_centralized:
# index_path is actually index_root directory for centralized search
meta_db_path = index_path / VECTORS_META_DB_NAME
if not meta_db_path.exists():
self.logger.debug(
"VectorMetadataStore not found at %s, skipping", meta_db_path
)
continue
meta_store = VectorMetadataStore(meta_db_path)
chunks_data = meta_store.get_chunks_by_ids(chunk_ids)
else:
store = SQLiteStore(index_path)
chunks_data = store.get_chunks_by_ids(chunk_ids)
chunk_content: Dict[int, Dict[str, Any]] = {
c["id"]: c for c in chunks_data
}
for chunk_id, distance in chunk_tuples:
chunk_info = chunk_content.get(chunk_id)
if chunk_info is None:
continue
# Convert Hamming distance to score (lower distance = higher score)
# Max Hamming distance for 256-bit is 256
score = 1.0 - (distance / 256.0)
excerpt = chunk_info.get("content", "")[:500]
result = SearchResult(
path=chunk_info.get("file_path", ""),
score=float(score),
excerpt=excerpt,
)
results.append(result)
except Exception as exc:
self.logger.debug(
"Failed to build results from %s: %s", index_path, exc
)
# Deduplicate by path
path_to_result: Dict[str, SearchResult] = {}
for result in results:
if result.path not in path_to_result or result.score > path_to_result[result.path].score:
path_to_result[result.path] = result
final_results = sorted(
path_to_result.values(),
key=lambda r: r.score,
reverse=True,
)
stats.files_matched = len(final_results)
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=final_results,
symbols=[],
stats=stats,
)
def _cross_encoder_rerank(
self,
query: str,
results: List[SearchResult],
top_k: int,
) -> List[SearchResult]:
"""Rerank results using cross-encoder model.
Args:
query: Search query string
results: Candidate results to rerank
top_k: Number of top results to return
Returns:
Reranked results sorted by cross-encoder score
"""
if not results:
return []
# Try to get reranker from config or create new one
reranker = None
try:
from codexlens.semantic.reranker import (
check_reranker_available,
get_reranker,
)
# Determine backend and model from config
backend = "onnx"
model_name = None
use_gpu = True
if self._config is not None:
backend = getattr(self._config, "reranker_backend", "onnx") or "onnx"
model_name = getattr(self._config, "reranker_model", None)
use_gpu = getattr(self._config, "embedding_use_gpu", True)
ok, err = check_reranker_available(backend)
if not ok:
self.logger.debug("Reranker backend unavailable (%s): %s", backend, err)
return results[:top_k]
# Create reranker
kwargs = {}
if backend == "onnx":
kwargs["use_gpu"] = use_gpu
elif backend == "api":
# Pass max_input_tokens for adaptive batching
max_tokens = getattr(self._config, "reranker_max_input_tokens", None)
if max_tokens:
kwargs["max_input_tokens"] = max_tokens
reranker = get_reranker(backend=backend, model_name=model_name, **kwargs)
except ImportError as exc:
self.logger.debug("Reranker not available: %s", exc)
return results[:top_k]
except Exception as exc:
self.logger.debug("Failed to initialize reranker: %s", exc)
return results[:top_k]
# Use cross_encoder_rerank from ranking module
from codexlens.search.ranking import cross_encoder_rerank
# Get chunk_type weights and test_file_penalty from config
chunk_type_weights = None
test_file_penalty = 0.0
if self._config is not None:
chunk_type_weights = getattr(self._config, "reranker_chunk_type_weights", None)
test_file_penalty = getattr(self._config, "reranker_test_file_penalty", 0.0)
return cross_encoder_rerank(
query=query,
results=results,
reranker=reranker,
top_k=top_k,
batch_size=32,
chunk_type_weights=chunk_type_weights,
test_file_penalty=test_file_penalty,
)
def search_files_only(self, query: str,
source_path: Path,
options: Optional[SearchOptions] = None) -> List[str]:
"""Search and return only matching file paths.
Faster than full search when excerpts are not needed.
Args:
query: FTS5 search query string
source_path: Starting directory path
options: Search configuration (uses defaults if None)
Returns:
List of file paths as strings
Examples:
>>> engine = ChainSearchEngine(registry, mapper)
>>> paths = engine.search_files_only("TODO", Path("D:/project"))
>>> print(f"Found {len(paths)} files with TODOs")
"""
options = options or SearchOptions()
options.files_only = True
result = self.search(query, source_path, options)
return [r.path for r in result.results]
def search_symbols(self, name: str,
source_path: Path,
kind: Optional[str] = None,
options: Optional[SearchOptions] = None) -> List[Symbol]:
"""Chain symbol search across directory hierarchy.
Args:
name: Symbol name pattern (partial match supported)
source_path: Starting directory path
kind: Optional symbol kind filter (e.g., 'function', 'class')
options: Search configuration (uses defaults if None)
Returns:
List of Symbol objects sorted by name
Examples:
>>> engine = ChainSearchEngine(registry, mapper)
>>> funcs = engine.search_symbols("init", Path("D:/project"), kind="function")
>>> for sym in funcs[:10]:
... print(f"{sym.name} ({sym.kind}): lines {sym.range}")
"""
options = options or SearchOptions()
start_index = self._find_start_index(source_path)
if not start_index:
self.logger.warning(f"No index found for {source_path}")
return []
# Fast path: project-wide global symbol index (avoids chain traversal).
if self._config is None or getattr(self._config, "global_symbol_index_enabled", True):
try:
# Avoid relying on index_to_source() here; use the same logic as _find_start_index
# to determine the effective search root directory.
search_root = source_path.resolve()
exact_index = self.mapper.source_to_index_db(search_root)
if not exact_index.exists():
nearest = self.registry.find_nearest_index(search_root)
if nearest:
search_root = nearest.source_path
project = self.registry.find_by_source_path(str(search_root))
if project:
global_db_path = Path(project["index_root"]) / GlobalSymbolIndex.DEFAULT_DB_NAME
if global_db_path.exists():
query_limit = max(int(options.total_limit) * 10, int(options.total_limit))
with GlobalSymbolIndex(global_db_path, project_id=int(project["id"])) as global_index:
candidates = global_index.search(name=name, kind=kind, limit=query_limit)
# Apply depth constraint relative to the start index directory.
filtered: List[Symbol] = []
for sym in candidates:
if not sym.file:
continue
try:
root_str = str(search_root)
file_dir_str = str(Path(sym.file).parent)
# Normalize Windows long-path prefix (\\?\) if present.
if root_str.startswith("\\\\?\\"):
root_str = root_str[4:]
if file_dir_str.startswith("\\\\?\\"):
file_dir_str = file_dir_str[4:]
root_cmp = root_str.lower().rstrip("\\/")
dir_cmp = file_dir_str.lower().rstrip("\\/")
# Guard against Windows cross-drive comparisons (ValueError).
if os.name == "nt":
root_drive, _ = os.path.splitdrive(root_cmp)
dir_drive, _ = os.path.splitdrive(dir_cmp)
if root_drive and dir_drive and root_drive != dir_drive:
self.logger.debug(
"Skipping symbol due to cross-drive path (root=%s file=%s name=%s)",
root_cmp,
sym.file,
sym.name,
)
continue
if os.path.commonpath([root_cmp, dir_cmp]) != root_cmp:
continue
rel = os.path.relpath(dir_cmp, root_cmp)
rel_depth = 0 if rel == "." else len(rel.split(os.sep))
except ValueError as exc:
self.logger.debug(
"Skipping symbol due to path operation failure (root=%s file=%s name=%s): %s",
str(search_root),
sym.file,
sym.name,
exc,
)
continue
except Exception as exc:
self.logger.debug(
"Skipping symbol due to unexpected path error (root=%s file=%s name=%s): %s",
str(search_root),
sym.file,
sym.name,
exc,
)
continue
if options.depth >= 0 and rel_depth > options.depth:
continue
filtered.append(sym)
if filtered:
# Match existing semantics: dedupe by (name, kind, range), sort by name.
seen = set()
unique_symbols: List[Symbol] = []
for sym in filtered:
key = (sym.name, sym.kind, sym.range)
if key in seen:
continue
seen.add(key)
unique_symbols.append(sym)
unique_symbols.sort(key=lambda s: s.name)
return unique_symbols[: options.total_limit]
except Exception as exc:
self.logger.debug("Global symbol index fast path failed: %s", exc)
index_paths = self._collect_index_paths(start_index, options.depth)
if not index_paths:
return []
return self._search_symbols_parallel(
index_paths, name, kind, options.total_limit
)
def search_references(
self,
symbol_name: str,
source_path: Optional[Path] = None,
depth: int = -1,
limit: int = 100,
) -> List[ReferenceResult]:
"""Find all references to a symbol across the project.
Searches the code_relationships table in all index databases to find
where the given symbol is referenced (called, imported, inherited, etc.).
Args:
symbol_name: Fully qualified or simple name of the symbol to find references to
source_path: Starting path for search (default: workspace root from registry)
depth: Search depth (-1 = unlimited, 0 = current dir only)
limit: Maximum results to return (default 100)
Returns:
List of ReferenceResult objects sorted by file path and line number
Examples:
>>> engine = ChainSearchEngine(registry, mapper)
>>> refs = engine.search_references("authenticate", Path("D:/project/src"))
>>> for ref in refs[:10]:
... print(f"{ref.file_path}:{ref.line} ({ref.relationship_type})")
"""
import sqlite3
from concurrent.futures import as_completed
# Determine starting path
if source_path is None:
# Try to get workspace root from registry
mappings = self.registry.list_mappings()
if mappings:
source_path = Path(mappings[0].source_path)
else:
self.logger.warning("No source path provided and no mappings in registry")
return []
# Find starting index
start_index = self._find_start_index(source_path)
if not start_index:
self.logger.warning(f"No index found for {source_path}")
return []
# Collect all index paths
index_paths = self._collect_index_paths(start_index, depth)
if not index_paths:
self.logger.debug(f"No indexes collected from {start_index}")
return []
self.logger.debug(
"Searching %d indexes for references to '%s'",
len(index_paths), symbol_name
)
# Search in parallel
all_results: List[ReferenceResult] = []
executor = self._get_executor()
def search_single_index(index_path: Path) -> List[ReferenceResult]:
"""Search a single index for references."""
results: List[ReferenceResult] = []
try:
conn = sqlite3.connect(str(index_path), check_same_thread=False)
conn.row_factory = sqlite3.Row
# Query code_relationships for references to this symbol
# Match either target_qualified_name containing the symbol name
# or an exact match on the last component
# Try full_path first (new schema), fallback to path (old schema)
try:
rows = conn.execute(
"""
SELECT DISTINCT
f.full_path as source_file,
cr.source_line,
cr.relationship_type,
f.content
FROM code_relationships cr
JOIN symbols s ON s.id = cr.source_symbol_id
JOIN files f ON f.id = s.file_id
WHERE cr.target_qualified_name LIKE ?
OR cr.target_qualified_name LIKE ?
OR cr.target_qualified_name = ?
ORDER BY f.full_path, cr.source_line
LIMIT ?
""",
(
f"%{symbol_name}", # Ends with symbol name
f"%.{symbol_name}", # Qualified name ending with .symbol_name
symbol_name, # Exact match
limit,
)
).fetchall()
except sqlite3.OperationalError:
# Fallback for old schema with 'path' column
rows = conn.execute(
"""
SELECT DISTINCT
f.path as source_file,
cr.source_line,
cr.relationship_type,
f.content
FROM code_relationships cr
JOIN symbols s ON s.id = cr.source_symbol_id
JOIN files f ON f.id = s.file_id
WHERE cr.target_qualified_name LIKE ?
OR cr.target_qualified_name LIKE ?
OR cr.target_qualified_name = ?
ORDER BY f.path, cr.source_line
LIMIT ?
""",
(
f"%{symbol_name}", # Ends with symbol name
f"%.{symbol_name}", # Qualified name ending with .symbol_name
symbol_name, # Exact match
limit,
)
).fetchall()
for row in rows:
file_path = row["source_file"]
line = row["source_line"] or 1
rel_type = row["relationship_type"]
content = row["content"] or ""
# Extract context (3 lines around reference)
context = self._extract_context(content, line, context_lines=3)
results.append(ReferenceResult(
file_path=file_path,
line=line,
column=0, # Column info not stored in code_relationships
context=context,
relationship_type=rel_type,
))
conn.close()
except sqlite3.DatabaseError as exc:
self.logger.debug(
"Failed to search references in %s: %s", index_path, exc
)
except Exception as exc:
self.logger.debug(
"Unexpected error searching references in %s: %s", index_path, exc
)
return results
# Submit parallel searches
futures = {
executor.submit(search_single_index, idx_path): idx_path
for idx_path in index_paths
}
for future in as_completed(futures):
try:
results = future.result()
all_results.extend(results)
except Exception as exc:
idx_path = futures[future]
self.logger.debug(
"Reference search failed for %s: %s", idx_path, exc
)
# Deduplicate by (file_path, line)
seen: set = set()
unique_results: List[ReferenceResult] = []
for ref in all_results:
key = (ref.file_path, ref.line)
if key not in seen:
seen.add(key)
unique_results.append(ref)
# Sort by file path and line
unique_results.sort(key=lambda r: (r.file_path, r.line))
# Apply limit
return unique_results[:limit]
def _extract_context(
self,
content: str,
line: int,
context_lines: int = 3
) -> str:
"""Extract lines around a given line number from file content.
Args:
content: Full file content
line: Target line number (1-based)
context_lines: Number of lines to include before and after
Returns:
Context snippet as a string
"""
if not content:
return ""
lines = content.splitlines()
total_lines = len(lines)
if line < 1 or line > total_lines:
return ""
# Calculate range (0-indexed internally)
start = max(0, line - 1 - context_lines)
end = min(total_lines, line + context_lines)
context = lines[start:end]
return "\n".join(context)
# === Internal Methods ===
def _find_start_index(self, source_path: Path) -> Optional[Path]:
"""Find index database path for source directory.
Attempts exact match first, then searches for nearest ancestor index.
Args:
source_path: Source directory path
Returns:
Path to _index.db file, or None if not found
"""
source_path = source_path.resolve()
# Try exact match first
exact_index = self.mapper.source_to_index_db(source_path)
if exact_index.exists():
self.logger.debug(f"Found exact index: {exact_index}")
return exact_index
# Try nearest ancestor via registry
nearest = self.registry.find_nearest_index(source_path)
if nearest:
self.logger.debug(f"Found nearest index: {nearest.index_path}")
return nearest.index_path
self.logger.warning(f"No index found for {source_path}")
return None
def _collect_index_paths(self, start_index: Path,
depth: int) -> List[Path]:
"""Recursively collect all subdirectory index paths.
Traverses directory tree via subdirs table in each _index.db,
respecting depth limit.
Args:
start_index: Starting _index.db path
depth: Maximum depth (-1 = unlimited, 0 = current only)
Returns:
List of _index.db paths to search
"""
collected = []
visited = set()
def _collect_recursive(index_path: Path, current_depth: int):
# Normalize path to avoid duplicates
normalized = index_path.resolve()
if normalized in visited:
return
visited.add(normalized)
# Add current index
if normalized.exists():
collected.append(normalized)
else:
self.logger.debug(f"Index does not exist: {normalized}")
return
# Check depth limit
if depth >= 0 and current_depth >= depth:
return
# Read subdirs and recurse
try:
with DirIndexStore(normalized) as store:
subdirs = store.get_subdirs()
for subdir in subdirs:
_collect_recursive(subdir.index_path, current_depth + 1)
except Exception as exc:
self.logger.warning(f"Failed to read subdirs from {normalized}: {exc}")
_collect_recursive(start_index, 0)
self.logger.info(f"Collected {len(collected)} indexes (depth={depth})")
return collected
def _search_parallel(self, index_paths: List[Path],
query: str,
options: SearchOptions) -> tuple[List[SearchResult], SearchStats]:
"""Search multiple indexes in parallel using shared ThreadPoolExecutor.
Args:
index_paths: List of _index.db paths to search
query: FTS5 query string
options: Search configuration
Returns:
Tuple of (all results, search statistics)
"""
all_results = []
stats = SearchStats()
# Force single-threaded execution for vector/hybrid search to avoid GPU crashes
# DirectML/ONNX have threading issues when multiple threads access GPU resources
effective_workers = options.max_workers
if options.enable_vector or options.hybrid_mode:
effective_workers = 1
self.logger.debug("Using single-threaded mode for vector search (GPU safety)")
# Pre-load embedder to avoid initialization overhead per-search
try:
from codexlens.semantic.embedder import get_embedder
get_embedder(profile="code", use_gpu=True)
except Exception:
pass # Ignore pre-load failures
executor = self._get_executor(effective_workers)
# Submit all search tasks
future_to_path = {
executor.submit(
self._search_single_index,
idx_path,
query,
options.limit_per_dir,
options.files_only,
options.include_semantic,
options.hybrid_mode,
options.enable_fuzzy,
options.enable_vector,
options.pure_vector,
options.enable_splade,
options.hybrid_weights
): idx_path
for idx_path in index_paths
}
# Collect results as they complete
for future in as_completed(future_to_path):
idx_path = future_to_path[future]
try:
results = future.result()
all_results.extend(results)
self.logger.debug(f"Got {len(results)} results from {idx_path.parent.name}")
except Exception as exc:
error_msg = f"Search failed for {idx_path}: {exc}"
self.logger.error(error_msg)
stats.errors.append(error_msg)
return all_results, stats
def _search_single_index(self, index_path: Path,
query: str,
limit: int,
files_only: bool = False,
include_semantic: bool = False,
hybrid_mode: bool = False,
enable_fuzzy: bool = True,
enable_vector: bool = False,
pure_vector: bool = False,
enable_splade: bool = False,
hybrid_weights: Optional[Dict[str, float]] = None) -> List[SearchResult]:
"""Search a single index database.
Handles exceptions gracefully, returning empty list on failure.
Args:
index_path: Path to _index.db file
query: FTS5 query string (for FTS) or natural language query (for vector)
limit: Maximum results from this index
files_only: If True, skip snippet generation for faster search
include_semantic: If True, also search semantic keywords and merge results
hybrid_mode: If True, use hybrid search with RRF fusion
enable_fuzzy: Enable fuzzy FTS in hybrid mode
enable_vector: Enable vector semantic search
pure_vector: If True, only use vector search without FTS fallback
enable_splade: If True, force SPLADE sparse neural search
hybrid_weights: Custom RRF weights for hybrid search
Returns:
List of SearchResult objects (empty on error)
"""
try:
# Use hybrid search if enabled
if hybrid_mode:
hybrid_engine = HybridSearchEngine(weights=hybrid_weights)
fts_results = hybrid_engine.search(
index_path,
query,
limit=limit,
enable_fuzzy=enable_fuzzy,
enable_vector=enable_vector,
pure_vector=pure_vector,
enable_splade=enable_splade,
)
else:
# Single-FTS search (exact or fuzzy mode)
with DirIndexStore(index_path) as store:
# Get FTS results
if files_only:
# Fast path: return paths only without snippets
paths = store.search_files_only(query, limit=limit)
fts_results = [SearchResult(path=p, score=0.0, excerpt="") for p in paths]
else:
# Use fuzzy FTS if enable_fuzzy=True (mode="fuzzy"), otherwise exact FTS
if enable_fuzzy:
fts_results = store.search_fts_fuzzy(
query, limit=limit, return_full_content=True
)
else:
fts_results = store.search_fts_exact(
query, limit=limit, return_full_content=True
)
# Optionally add semantic keyword results
if include_semantic:
try:
semantic_matches = store.search_semantic_keywords(query)
# Convert semantic matches to SearchResult with 0.8x weight
for file_entry, keywords in semantic_matches:
# Create excerpt from keywords
excerpt = f"Keywords: {', '.join(keywords[:5])}"
# Use a base score of 10.0 for semantic matches, weighted by 0.8
semantic_result = SearchResult(
path=str(file_entry.full_path),
score=10.0 * 0.8,
excerpt=excerpt
)
fts_results.append(semantic_result)
except Exception as sem_exc:
self.logger.debug(f"Semantic search error in {index_path}: {sem_exc}")
return fts_results
except Exception as exc:
self.logger.debug(f"Search error in {index_path}: {exc}")
return []
def _filter_by_extension(self, results: List[SearchResult],
code_only: bool = False,
exclude_extensions: Optional[List[str]] = None) -> List[SearchResult]:
"""Filter search results by file extension.
Args:
results: Search results to filter
code_only: If True, exclude non-code files (md, txt, json, yaml, xml, etc.)
exclude_extensions: List of extensions to exclude (e.g., ["md", "txt"])
Returns:
Filtered results
"""
# Non-code file extensions (same as MCP tool smart-search.ts)
NON_CODE_EXTENSIONS = {
'md', 'txt', 'json', 'yaml', 'yml', 'xml', 'csv', 'log',
'ini', 'cfg', 'conf', 'toml', 'env', 'properties',
'html', 'htm', 'svg', 'png', 'jpg', 'jpeg', 'gif', 'ico', 'webp',
'pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx',
'lock', 'sum', 'mod',
}
# Build exclusion set
excluded_exts = set()
if exclude_extensions:
# Normalize extensions (remove leading dots, lowercase)
excluded_exts = {ext.lower().lstrip('.') for ext in exclude_extensions}
if code_only:
excluded_exts.update(NON_CODE_EXTENSIONS)
if not excluded_exts:
return results
# Filter results
filtered = []
for result in results:
path_str = result.path
if not path_str:
continue
# Extract extension from path
if '.' in path_str:
ext = path_str.rsplit('.', 1)[-1].lower()
if ext in excluded_exts:
continue # Skip this result
filtered.append(result)
return filtered
def _merge_and_rank(self, results: List[SearchResult],
limit: int, offset: int = 0) -> List[SearchResult]:
"""Aggregate, deduplicate, and rank results.
Process:
1. Deduplicate by path (keep highest score)
2. Sort by score descending
3. Apply offset and limit for pagination
Args:
results: Raw results from all indexes
limit: Maximum results to return
offset: Number of results to skip (pagination offset)
Returns:
Deduplicated and ranked results with pagination
"""
# Deduplicate by path, keeping best score
path_to_result: Dict[str, SearchResult] = {}
for result in results:
path = result.path
if path not in path_to_result or result.score > path_to_result[path].score:
path_to_result[path] = result
# Sort by score descending
unique_results = list(path_to_result.values())
unique_results.sort(key=lambda r: r.score, reverse=True)
# Apply offset and limit for pagination
return unique_results[offset:offset + limit]
def _search_symbols_parallel(self, index_paths: List[Path],
name: str,
kind: Optional[str],
limit: int) -> List[Symbol]:
"""Search symbols across multiple indexes in parallel.
Args:
index_paths: List of _index.db paths to search
name: Symbol name pattern
kind: Optional symbol kind filter
limit: Total symbol limit
Returns:
Deduplicated and sorted symbols
"""
all_symbols = []
executor = self._get_executor()
# Submit all symbol search tasks
future_to_path = {
executor.submit(
self._search_symbols_single,
idx_path,
name,
kind
): idx_path
for idx_path in index_paths
}
# Collect results
for future in as_completed(future_to_path):
try:
symbols = future.result()
all_symbols.extend(symbols)
except Exception as exc:
self.logger.error(f"Symbol search failed: {exc}")
# Deduplicate by (name, kind, range)
seen = set()
unique_symbols = []
for sym in all_symbols:
key = (sym.name, sym.kind, sym.range)
if key not in seen:
seen.add(key)
unique_symbols.append(sym)
# Sort by name
unique_symbols.sort(key=lambda s: s.name)
return unique_symbols[:limit]
def _search_symbols_single(self, index_path: Path,
name: str,
kind: Optional[str]) -> List[Symbol]:
"""Search symbols in a single index.
Args:
index_path: Path to _index.db file
name: Symbol name pattern
kind: Optional symbol kind filter
Returns:
List of Symbol objects (empty on error)
"""
try:
with DirIndexStore(index_path) as store:
return store.search_symbols(name, kind=kind)
except Exception as exc:
self.logger.debug(f"Symbol search error in {index_path}: {exc}")
return []
# === Convenience Functions ===
def quick_search(query: str,
source_path: Path,
depth: int = -1) -> List[SearchResult]:
"""Quick search convenience function with automatic initialization.
Creates temporary registry and mapper instances for one-off searches.
For repeated searches, create a ChainSearchEngine instance directly.
Args:
query: FTS5 search query string
source_path: Starting directory path
depth: Maximum search depth (-1 = unlimited)
Returns:
List of SearchResult objects sorted by relevance
Examples:
>>> from pathlib import Path
>>> results = quick_search("authentication", Path("D:/project/src"))
>>> print(f"Found {len(results)} matches")
"""
registry = RegistryStore()
registry.initialize()
mapper = PathMapper()
with ChainSearchEngine(registry, mapper) as engine:
options = SearchOptions(depth=depth)
result = engine.search(query, source_path, options)
registry.close()
return result.results