feat: Add code analysis and LLM action templates with detailed configurations and examples

- Introduced a comprehensive code analysis action template that integrates code exploration and analysis capabilities.
- Added an LLM action template for seamless integration of LLM calls with customizable prompts and tools.
- Implemented a benchmark script that compares multiple search methods across speed, result quality, ranking stability, and coverage (see the illustrative sketch after this list).
- Provided preset configurations for common analysis tasks and LLM actions, enhancing usability and flexibility.
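A minimal sketch of the comparison loop such a benchmark might run (illustrative only; compare_methods, engine.search, and its method parameter are assumed names, not the actual script's API):

import time

def compare_methods(engine, queries, methods=("dense_rerank", "fts")):
    """Time each search method over the same query set (illustrative)."""
    report = {}
    for method in methods:
        latencies, counts = [], []
        for query in queries:
            start = time.perf_counter()
            results = engine.search(query, method=method)  # assumed API
            latencies.append((time.perf_counter() - start) * 1000)
            counts.append(len(results))
        report[method] = {
            "avg_ms": sum(latencies) / len(latencies),
            "avg_results": sum(counts) / len(counts),
        }
    return report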
catlog22
2026-01-03 17:37:49 +08:00
parent 6a45035e3f
commit be498acf59
10 changed files with 3076 additions and 217 deletions

View File

@@ -432,75 +432,55 @@ def search(
limit: int = typer.Option(20, "--limit", "-n", min=1, max=500, help="Max results."),
depth: int = typer.Option(-1, "--depth", "-d", help="Search depth (-1 = unlimited, 0 = current only)."),
files_only: bool = typer.Option(False, "--files-only", "-f", help="Return only file paths without content snippets."),
method: str = typer.Option("hybrid", "--method", "-m", help="Search method: fts, vector, splade, hybrid, cascade."),
method: str = typer.Option("dense_rerank", "--method", "-m", help="Search method: 'dense_rerank' (semantic, default), 'fts' (exact keyword)."),
use_fuzzy: bool = typer.Option(False, "--use-fuzzy", help="Enable fuzzy matching in FTS method."),
# Hidden advanced options for backward compatibility
weights: Optional[str] = typer.Option(
None,
"--weights", "-w",
help="RRF weights as key=value pairs (e.g., 'splade=0.4,vector=0.6' or 'fts=0.4,vector=0.6'). Default: auto-detect based on available backends."
hidden=True,
help="[Advanced] RRF weights as key=value pairs."
),
cascade_strategy: Optional[str] = typer.Option(
None,
"--cascade-strategy",
help="Cascade search strategy: 'binary' (fast binary+dense) or 'hybrid' (FTS+cross-encoder). Only used with --method cascade."
hidden=True,
help="[Advanced] Cascade strategy for --method cascade."
),
# Hidden deprecated parameter for backward compatibility
mode: Optional[str] = typer.Option(None, "--mode", hidden=True, help="[DEPRECATED] Use --method instead."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Search indexed file contents using hybrid retrieval.
"""Search indexed file contents.
Uses chain search across directory indexes.
Use --depth to limit search recursion (0 = current dir only).
Search Methods:
-- fts: Full-text search using FTS5 (unicode61 tokenizer). Use --use-fuzzy for typo tolerance.
-- vector: Pure semantic vector search - for natural language queries.
-- splade: SPLADE sparse neural search - semantic term expansion.
-- hybrid: RRF fusion of sparse + dense search (default) - best recall.
-- cascade: Two-stage retrieval (binary coarse + dense rerank) - fast + accurate.
+- dense_rerank (default): Semantic search using Dense embedding coarse retrieval +
+  Cross-encoder reranking. Best for natural language queries and code understanding.
+- fts: Full-text search using FTS5 (unicode61 tokenizer). Best for exact code
+  identifiers like function/class names. Use --use-fuzzy for typo tolerance.
Method Selection Guide:
- Code identifiers (function/class names): fts
-- Natural language queries: vector or hybrid
+- Natural language queries: dense_rerank (default)
- Typo-tolerant search: fts --use-fuzzy
-- Best overall quality: hybrid (default)
-- Large codebase performance: cascade
-Vector Search Requirements:
-Vector, hybrid, and cascade methods require pre-generated embeddings.
+Requirements:
+The dense_rerank method requires pre-generated embeddings.
Use 'codexlens embeddings-generate' to create embeddings first.
-Hybrid Mode Weights:
-Use --weights to adjust RRF fusion weights:
-- SPLADE mode: 'splade=0.4,vector=0.6' (default)
-- FTS mode: 'fts=0.4,vector=0.6' (default)
Examples:
-# Default hybrid search
-codexlens search "authentication"
+# Default semantic search (dense_rerank)
+codexlens search "authentication logic"
# Exact code identifier search
codexlens search "authenticate_user" --method fts
# Typo-tolerant fuzzy search
codexlens search "authentcate" --method fts --use-fuzzy
-# Pure semantic search
-codexlens search "how to verify user credentials" --method vector
-# SPLADE sparse neural search
-codexlens search "user login flow" --method splade
-# Fast cascade retrieval for large codebases (binary strategy)
-codexlens search "authentication" --method cascade
-# Cascade with cross-encoder reranking (hybrid strategy)
-codexlens search "authentication" --method cascade --cascade-strategy hybrid
-# Hybrid with custom weights
-codexlens search "authentication" --method hybrid --weights splade=0.5,vector=0.5
"""
_configure_logging(verbose, json_mode)
search_path = path.expanduser().resolve()
@@ -538,29 +518,33 @@ def search(
# Configure search (load settings from file)
config = Config.load()
-# Validate method
-valid_methods = ["fts", "vector", "splade", "hybrid", "cascade"]
+# Validate method - simplified interface exposes only dense_rerank and fts
+# Other methods (vector, splade, hybrid, cascade) are hidden but still work for backward compatibility
+valid_methods = ["fts", "dense_rerank", "vector", "splade", "hybrid", "cascade"]
if actual_method not in valid_methods:
if json_mode:
print_json(success=False, error=f"Invalid method: {actual_method}. Must be one of: {', '.join(valid_methods)}")
print_json(success=False, error=f"Invalid method: {actual_method}. Use 'dense_rerank' (semantic) or 'fts' (exact keyword).")
else:
console.print(f"[red]Invalid method:[/red] {actual_method}")
console.print(f"[dim]Valid methods: {', '.join(valid_methods)}[/dim]")
console.print("[dim]Use 'dense_rerank' (semantic, default) or 'fts' (exact keyword)[/dim]")
raise typer.Exit(code=1)
-# Validate cascade_strategy if provided
-if cascade_strategy is not None:
-valid_strategies = ["binary", "hybrid"]
-if cascade_strategy not in valid_strategies:
+# Map dense_rerank to cascade method internally
+internal_cascade_strategy = cascade_strategy
+if actual_method == "dense_rerank":
+actual_method = "cascade"
+internal_cascade_strategy = "dense_rerank"
+# Validate cascade_strategy if provided (for advanced users)
+if internal_cascade_strategy is not None:
+valid_strategies = ["binary", "hybrid", "binary_rerank", "dense_rerank"]
+if internal_cascade_strategy not in valid_strategies:
if json_mode:
print_json(success=False, error=f"Invalid cascade strategy: {cascade_strategy}. Must be one of: {', '.join(valid_strategies)}")
print_json(success=False, error=f"Invalid cascade strategy: {internal_cascade_strategy}. Must be one of: {', '.join(valid_strategies)}")
else:
console.print(f"[red]Invalid cascade strategy:[/red] {cascade_strategy}")
console.print(f"[red]Invalid cascade strategy:[/red] {internal_cascade_strategy}")
console.print(f"[dim]Valid strategies: {', '.join(valid_strategies)}[/dim]")
raise typer.Exit(code=1)
# Warn if using cascade_strategy with non-cascade method
if actual_method != "cascade" and not json_mode:
console.print(f"[yellow]Warning: --cascade-strategy is only effective with --method cascade[/yellow]")
# Parse custom weights if provided
hybrid_weights = None
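# Expected --weights format (per the option help): comma-separated key=value
# pairs, e.g. "splade=0.4,vector=0.6" -> {"splade": 0.4, "vector": 0.6}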
@@ -693,7 +677,7 @@ def search(
else:
# Dispatch to cascade_search for cascade method
if actual_method == "cascade":
-result = engine.cascade_search(query, search_path, k=limit, options=options, strategy=cascade_strategy)
+result = engine.cascade_search(query, search_path, k=limit, options=options, strategy=internal_cascade_strategy)
else:
result = engine.search(query, search_path, options)
results_list = [

View File

@@ -164,6 +164,10 @@ class Config:
embedding_strategy: str = "latency_aware" # round_robin, latency_aware, weighted_random
embedding_cooldown: float = 60.0 # Default cooldown seconds for rate-limited endpoints
# API concurrency settings
api_max_workers: int = 4 # Max concurrent API calls for embedding/reranking
api_batch_size: int = 8 # Batch size for API requests
def __post_init__(self) -> None:
try:
self.data_dir = self.data_dir.expanduser().resolve()
@@ -276,6 +280,10 @@ class Config:
"coarse_k": self.cascade_coarse_k,
"fine_k": self.cascade_fine_k,
},
"api": {
"max_workers": self.api_max_workers,
"batch_size": self.api_batch_size,
},
}
with open(self.settings_path, "w", encoding="utf-8") as f:
json.dump(settings, f, indent=2)
@@ -348,11 +356,11 @@ class Config:
cascade = settings.get("cascade", {})
if "strategy" in cascade:
strategy = cascade["strategy"]
if strategy in {"binary", "hybrid"}:
if strategy in {"binary", "hybrid", "binary_rerank", "dense_rerank"}:
self.cascade_strategy = strategy
else:
log.warning(
"Invalid cascade strategy in %s: %r (expected 'binary' or 'hybrid')",
"Invalid cascade strategy in %s: %r (expected 'binary', 'hybrid', 'binary_rerank', or 'dense_rerank')",
self.settings_path,
strategy,
)
@@ -360,6 +368,13 @@ class Config:
self.cascade_coarse_k = cascade["coarse_k"]
if "fine_k" in cascade:
self.cascade_fine_k = cascade["fine_k"]
# Load API settings
api = settings.get("api", {})
if "max_workers" in api:
self.api_max_workers = api["max_workers"]
if "batch_size" in api:
self.api_batch_size = api["batch_size"]
except Exception as exc:
log.warning(
"Failed to load settings from %s (%s): %s",

View File

@@ -797,13 +797,15 @@ class ChainSearchEngine:
k: int = 10,
coarse_k: int = 100,
options: Optional[SearchOptions] = None,
strategy: Optional[Literal["binary", "hybrid"]] = None,
strategy: Optional[Literal["binary", "hybrid", "binary_rerank", "dense_rerank"]] = None,
) -> ChainSearchResult:
"""Unified cascade search entry point with strategy selection.
Provides a single interface for cascade search with configurable strategy:
- "binary": Uses binary vector coarse ranking + dense fine ranking (faster)
- "binary": Uses binary vector coarse ranking + dense fine ranking (fastest)
- "hybrid": Uses FTS+SPLADE+Vector coarse ranking + cross-encoder reranking (original)
- "binary_rerank": Uses binary vector coarse ranking + cross-encoder reranking (best balance)
- "dense_rerank": Uses dense vector coarse ranking + cross-encoder reranking
The strategy is determined with the following priority:
1. The `strategy` parameter (e.g., from CLI --cascade-strategy option)
@@ -816,36 +818,585 @@ class ChainSearchEngine:
k: Number of final results to return (default 10)
coarse_k: Number of coarse candidates from first stage (default 100)
options: Search configuration (uses defaults if None)
strategy: Cascade strategy - "binary" or "hybrid". Overrides config if provided.
strategy: Cascade strategy - "binary", "hybrid", or "binary_rerank".
Returns:
ChainSearchResult with reranked results and statistics
Examples:
>>> engine = ChainSearchEngine(registry, mapper, config=config)
>>> # Use binary cascade (default, faster)
>>> # Use binary cascade (default, fastest)
>>> result = engine.cascade_search("auth", Path("D:/project"))
>>> # Use hybrid cascade (original behavior)
>>> result = engine.cascade_search("auth", Path("D:/project"), strategy="hybrid")
>>> # Use binary + cross-encoder (best balance of speed and quality)
>>> result = engine.cascade_search("auth", Path("D:/project"), strategy="binary_rerank")
"""
# Strategy priority: parameter > config > default
effective_strategy = strategy
valid_strategies = ("binary", "hybrid", "binary_rerank", "dense_rerank")
if effective_strategy is None:
# Not passed via parameter, check config
if self._config is not None:
config_strategy = getattr(self._config, "cascade_strategy", None)
if config_strategy in ("binary", "hybrid"):
if config_strategy in valid_strategies:
effective_strategy = config_strategy
# If still not set, apply default
if effective_strategy not in ("binary", "hybrid"):
if effective_strategy not in valid_strategies:
effective_strategy = "binary"
if effective_strategy == "binary":
return self.binary_cascade_search(query, source_path, k, coarse_k, options)
elif effective_strategy == "binary_rerank":
return self.binary_rerank_cascade_search(query, source_path, k, coarse_k, options)
elif effective_strategy == "dense_rerank":
return self.dense_rerank_cascade_search(query, source_path, k, coarse_k, options)
else:
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
def binary_rerank_cascade_search(
self,
query: str,
source_path: Path,
k: int = 10,
coarse_k: int = 100,
options: Optional[SearchOptions] = None,
) -> ChainSearchResult:
"""Execute binary cascade search with cross-encoder reranking.
Combines the speed of binary vector coarse search with the quality of
cross-encoder reranking for the best balance of speed and accuracy.
Binary + Reranker cascade process:
1. Stage 1 (Coarse): Fast binary vector search using Hamming distance
to quickly filter to coarse_k candidates (256-dim binary, 32 bytes/vector)
2. Stage 2 (Fine): Cross-encoder reranking for precise semantic ranking
of candidates using query-document attention
This approach is typically faster than hybrid_cascade_search while
achieving similar or better quality through cross-encoder reranking.
Performance characteristics:
- Binary search: O(N) with SIMD-accelerated XOR + popcount (~8ms)
- Cross-encoder: Applied to top coarse_k candidates (~15-20s for API)
- Total: Faster coarse + high-quality fine = best balance
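Hamming scoring sketch (illustrative doctest, not the SIMD kernel itself):
>>> import numpy as np
>>> a = np.zeros(32, dtype=np.uint8)        # packed 256-bit code, all zeros
>>> b = np.full(32, 0x0F, dtype=np.uint8)   # 4 bits set per byte
>>> int(np.unpackbits(a ^ b).sum())         # Hamming distance over 256 bits
128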
Args:
query: Natural language or keyword query string
source_path: Starting directory path
k: Number of final results to return (default 10)
coarse_k: Number of coarse candidates from first stage (default 100)
options: Search configuration (uses defaults if None)
Returns:
ChainSearchResult with cross-encoder reranked results and statistics
Examples:
>>> engine = ChainSearchEngine(registry, mapper, config=config)
>>> result = engine.binary_rerank_cascade_search(
... "how to authenticate users",
... Path("D:/project/src"),
... k=10,
... coarse_k=100
... )
>>> for r in result.results:
... print(f"{r.path}: {r.score:.3f}")
"""
if not NUMPY_AVAILABLE:
self.logger.warning(
"NumPy not available, falling back to hybrid cascade search"
)
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
options = options or SearchOptions()
start_time = time.time()
stats = SearchStats()
# Use config defaults if available
if self._config is not None:
if hasattr(self._config, "cascade_coarse_k"):
coarse_k = coarse_k or self._config.cascade_coarse_k
if hasattr(self._config, "cascade_fine_k"):
k = k or self._config.cascade_fine_k
# Step 1: Find starting index
start_index = self._find_start_index(source_path)
if not start_index:
self.logger.warning(f"No index found for {source_path}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Step 2: Collect all index paths
index_paths = self._collect_index_paths(start_index, options.depth)
stats.dirs_searched = len(index_paths)
if not index_paths:
self.logger.warning(f"No indexes collected from {start_index}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Step 3: Initialize binary embedding backend
try:
from codexlens.indexing.embedding import BinaryEmbeddingBackend
except ImportError as exc:
self.logger.warning(
"BinaryEmbeddingBackend not available: %s, falling back to hybrid cascade",
exc
)
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# Step 4: Binary coarse search (same as binary_cascade_search)
binary_coarse_time = time.time()
coarse_candidates: List[Tuple[int, int, Path]] = []
# Try centralized BinarySearcher first (preferred for mmap indexes)
# The index root is the parent of the first index path
index_root = index_paths[0].parent if index_paths else None
used_centralized = False
if index_root:
binary_searcher = self._get_centralized_binary_searcher(index_root)
if binary_searcher is not None:
try:
# BinarySearcher expects dense vector, not packed binary
from codexlens.semantic.embedder import Embedder
embedder = Embedder()
query_dense = embedder.embed_to_numpy([query])[0]
results = binary_searcher.search(query_dense, top_k=coarse_k)
for chunk_id, distance in results:
coarse_candidates.append((chunk_id, distance, index_root))
# Only mark as used if we got actual results
if coarse_candidates:
used_centralized = True
self.logger.debug(
"Binary coarse search (centralized): %d candidates in %.2fms",
len(results), (time.time() - binary_coarse_time) * 1000
)
except Exception as exc:
self.logger.debug(f"Centralized binary search failed: {exc}")
if not used_centralized:
# Get GPU preference from config
use_gpu = True
if self._config is not None:
use_gpu = getattr(self._config, "embedding_use_gpu", True)
try:
binary_backend = BinaryEmbeddingBackend(use_gpu=use_gpu)
query_binary = binary_backend.embed_packed([query])[0]
except Exception as exc:
self.logger.warning(f"Failed to generate binary query embedding: {exc}")
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# Fallback to per-directory binary indexes
for index_path in index_paths:
try:
binary_index = self._get_or_create_binary_index(index_path)
if binary_index is None or binary_index.count() == 0:
continue
# BinaryANNIndex returns (ids, distances) arrays
ids, distances = binary_index.search(query_binary, coarse_k)
for chunk_id, dist in zip(ids, distances):
coarse_candidates.append((chunk_id, dist, index_path))
except Exception as exc:
self.logger.debug(
"Binary search failed for %s: %s", index_path, exc
)
if not coarse_candidates:
self.logger.info("No binary candidates found, falling back to hybrid cascade for reranking")
# Fall back to hybrid_cascade_search which uses FTS+Vector coarse + cross-encoder rerank
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# Sort by Hamming distance and take top coarse_k
coarse_candidates.sort(key=lambda x: x[1])
coarse_candidates = coarse_candidates[:coarse_k]
self.logger.debug(
"Binary coarse search: %d candidates in %.2fms",
len(coarse_candidates), (time.time() - binary_coarse_time) * 1000
)
# Step 5: Build SearchResult objects for cross-encoder reranking
# Group candidates by index path for efficient retrieval
candidates_by_index: Dict[Path, List[int]] = {}
for chunk_id, distance, index_path in coarse_candidates:
if index_path not in candidates_by_index:
candidates_by_index[index_path] = []
candidates_by_index[index_path].append(chunk_id)
# Retrieve chunk content for reranking
# Always use centralized VectorMetadataStore since chunks are stored there
import sqlite3
coarse_results: List[SearchResult] = []
# Find the centralized metadata store path (project root)
# index_root was computed earlier, use it for chunk retrieval
central_meta_path = index_root / VECTORS_META_DB_NAME if index_root else None
central_meta_store = None
if central_meta_path and central_meta_path.exists():
central_meta_store = VectorMetadataStore(central_meta_path)
for index_path, chunk_ids in candidates_by_index.items():
try:
chunks_data = []
if central_meta_store:
# Try centralized VectorMetadataStore first (preferred)
chunks_data = central_meta_store.get_chunks_by_ids(chunk_ids)
if not chunks_data and used_centralized:
# Fallback to per-index-path meta store
meta_db_path = index_path / VECTORS_META_DB_NAME
if meta_db_path.exists():
meta_store = VectorMetadataStore(meta_db_path)
chunks_data = meta_store.get_chunks_by_ids(chunk_ids)
if not chunks_data:
# Final fallback: query semantic_chunks table directly
# This handles per-directory indexes with semantic_chunks table
try:
conn = sqlite3.connect(str(index_path))
conn.row_factory = sqlite3.Row
placeholders = ",".join("?" * len(chunk_ids))
cursor = conn.execute(
f"""
SELECT id, file_path, content, metadata, category
FROM semantic_chunks
WHERE id IN ({placeholders})
""",
chunk_ids
)
chunks_data = [
{
"id": row["id"],
"file_path": row["file_path"],
"content": row["content"],
"metadata": row["metadata"],
"category": row["category"],
}
for row in cursor.fetchall()
]
conn.close()
except Exception:
pass # Skip if table doesn't exist
for chunk in chunks_data:
# Find the Hamming distance for this chunk
chunk_id = chunk.get("id") or chunk.get("chunk_id")
distance = next(
(d for cid, d, _ in coarse_candidates if cid == chunk_id),
256
)
# Initial score from Hamming distance (will be replaced by reranker)
score = 1.0 - (distance / 256.0)
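# e.g. distance 0 -> score 1.0, 64 -> 0.75, 128 -> 0.5 (256-bit codes)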
content = chunk.get("content", "")
result = SearchResult(
path=chunk.get("file_path", ""),
score=float(score),
excerpt=content[:500] if content else "",
content=content,
)
coarse_results.append(result)
except Exception as exc:
self.logger.debug(
"Failed to retrieve chunks from %s: %s", index_path, exc
)
if not coarse_results:
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query, results=[], symbols=[], stats=stats
)
self.logger.debug(
"Retrieved %d chunks for cross-encoder reranking", len(coarse_results)
)
# Step 6: Cross-encoder reranking (same as hybrid_cascade_search)
rerank_time = time.time()
reranked_results = self._cross_encoder_rerank(query, coarse_results, top_k=k)
self.logger.debug(
"Cross-encoder reranking: %d results in %.2fms",
len(reranked_results), (time.time() - rerank_time) * 1000
)
# Deduplicate by path (keep highest score)
path_to_result: Dict[str, SearchResult] = {}
for result in reranked_results:
if result.path not in path_to_result or result.score > path_to_result[result.path].score:
path_to_result[result.path] = result
final_results = list(path_to_result.values())[:k]
stats.files_matched = len(final_results)
stats.time_ms = (time.time() - start_time) * 1000
self.logger.debug(
"Binary+Rerank cascade search complete: %d results in %.2fms",
len(final_results),
stats.time_ms,
)
return ChainSearchResult(
query=query,
results=final_results,
symbols=[],
stats=stats,
)
def dense_rerank_cascade_search(
self,
query: str,
source_path: Path,
k: int = 10,
coarse_k: int = 100,
options: Optional[SearchOptions] = None,
) -> ChainSearchResult:
"""Execute dense cascade search with cross-encoder reranking.
Combines dense vector coarse search (HNSW) with cross-encoder reranking
for comparison with binary_rerank strategy.
Dense + Reranker cascade process:
1. Stage 1 (Coarse): Dense vector search using HNSW (cosine similarity)
to get coarse_k candidates (2048-dim float32)
2. Stage 2 (Fine): Cross-encoder reranking for precise semantic ranking
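Scoring sketch (illustrative): for unit vectors, cosine distance is
1 - cosine similarity, and the score used below is 1 - distance:
>>> import numpy as np
>>> q = np.array([0.6, 0.8]); d = np.array([1.0, 0.0])
>>> dist = 1.0 - float(q @ d)   # cosine distance = 0.4
>>> round(1.0 - dist, 2)        # score
0.6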
Args:
query: Natural language or keyword query string
source_path: Starting directory path
k: Number of final results to return (default 10)
coarse_k: Number of coarse candidates from first stage (default 100)
options: Search configuration (uses defaults if None)
Returns:
ChainSearchResult with cross-encoder reranked results and statistics
"""
if not NUMPY_AVAILABLE:
self.logger.warning(
"NumPy not available, falling back to hybrid cascade search"
)
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
options = options or SearchOptions()
start_time = time.time()
stats = SearchStats()
# Use config defaults if available
if self._config is not None:
if hasattr(self._config, "cascade_coarse_k"):
coarse_k = coarse_k or self._config.cascade_coarse_k
if hasattr(self._config, "cascade_fine_k"):
k = k or self._config.cascade_fine_k
# Step 1: Find starting index
start_index = self._find_start_index(source_path)
if not start_index:
self.logger.warning(f"No index found for {source_path}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Step 2: Collect all index paths
index_paths = self._collect_index_paths(start_index, options.depth)
stats.dirs_searched = len(index_paths)
if not index_paths:
self.logger.warning(f"No indexes collected from {start_index}")
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query,
results=[],
symbols=[],
stats=stats
)
# Step 3: Generate query dense embedding using same model as index
# Read embedding config to match the model used during indexing
dense_coarse_time = time.time()
try:
from codexlens.semantic.factory import get_embedder
# Get embedding settings from config
embedding_backend = "litellm" # Default to API for dense
embedding_model = "qwen3-embedding-sf" # Default model
use_gpu = True
if self._config is not None:
embedding_backend = getattr(self._config, "embedding_backend", "litellm")
embedding_model = getattr(self._config, "embedding_model", "qwen3-embedding-sf")
use_gpu = getattr(self._config, "embedding_use_gpu", True)
# Create embedder matching index configuration
if embedding_backend == "litellm":
embedder = get_embedder(backend="litellm", model=embedding_model)
else:
embedder = get_embedder(backend="fastembed", profile=embedding_model, use_gpu=use_gpu)
query_dense = embedder.embed_to_numpy([query])[0]
self.logger.debug(f"Dense query embedding: {query_dense.shape[0]}-dim via {embedding_backend}/{embedding_model}")
except Exception as exc:
self.logger.warning(f"Failed to generate dense query embedding: {exc}")
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# Step 4: Dense coarse search using HNSW indexes
coarse_candidates: List[Tuple[int, float, Path]] = [] # (chunk_id, distance, index_path)
index_root = index_paths[0].parent if index_paths else None
for index_path in index_paths:
try:
# Load HNSW index
from codexlens.semantic.ann_index import ANNIndex
ann_index = ANNIndex(index_path, dim=query_dense.shape[0])
if not ann_index.load():
continue
if ann_index.count() == 0:
continue
# Search HNSW index
ids, distances = ann_index.search(query_dense, top_k=coarse_k)
for chunk_id, dist in zip(ids, distances):
coarse_candidates.append((chunk_id, dist, index_path))
except Exception as exc:
self.logger.debug(
"Dense search failed for %s: %s", index_path, exc
)
if not coarse_candidates:
self.logger.info("No dense candidates found, falling back to hybrid cascade")
return self.hybrid_cascade_search(query, source_path, k, coarse_k, options)
# Sort by distance (ascending for cosine distance) and take top coarse_k
coarse_candidates.sort(key=lambda x: x[1])
coarse_candidates = coarse_candidates[:coarse_k]
self.logger.debug(
"Dense coarse search: %d candidates in %.2fms",
len(coarse_candidates), (time.time() - dense_coarse_time) * 1000
)
# Step 5: Build SearchResult objects for cross-encoder reranking
candidates_by_index: Dict[Path, List[int]] = {}
for chunk_id, distance, index_path in coarse_candidates:
if index_path not in candidates_by_index:
candidates_by_index[index_path] = []
candidates_by_index[index_path].append(chunk_id)
# Retrieve chunk content for reranking
import sqlite3
coarse_results: List[SearchResult] = []
for index_path, chunk_ids in candidates_by_index.items():
try:
# Query semantic_chunks table directly
conn = sqlite3.connect(str(index_path))
conn.row_factory = sqlite3.Row
placeholders = ",".join("?" * len(chunk_ids))
cursor = conn.execute(
f"""
SELECT id, file_path, content, metadata, category
FROM semantic_chunks
WHERE id IN ({placeholders})
""",
chunk_ids
)
chunks_data = [
{
"id": row["id"],
"file_path": row["file_path"],
"content": row["content"],
"metadata": row["metadata"],
"category": row["category"],
}
for row in cursor.fetchall()
]
conn.close()
for chunk in chunks_data:
chunk_id = chunk.get("id")
distance = next(
(d for cid, d, _ in coarse_candidates if cid == chunk_id),
1.0
)
# Convert cosine distance to score
score = 1.0 - distance
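# e.g. cosine distance 0.25 -> score 0.75; identical vectors -> 1.0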
content = chunk.get("content", "")
result = SearchResult(
path=chunk.get("file_path", ""),
score=float(score),
excerpt=content[:500] if content else "",
content=content,
)
coarse_results.append(result)
except Exception as exc:
self.logger.debug(
"Failed to retrieve chunks from %s: %s", index_path, exc
)
if not coarse_results:
stats.time_ms = (time.time() - start_time) * 1000
return ChainSearchResult(
query=query, results=[], symbols=[], stats=stats
)
self.logger.debug(
"Retrieved %d chunks for cross-encoder reranking", len(coarse_results)
)
# Step 6: Cross-encoder reranking
rerank_time = time.time()
reranked_results = self._cross_encoder_rerank(query, coarse_results, top_k=k)
self.logger.debug(
"Cross-encoder reranking: %d results in %.2fms",
len(reranked_results), (time.time() - rerank_time) * 1000
)
# Deduplicate by path (keep highest score)
path_to_result: Dict[str, SearchResult] = {}
for result in reranked_results:
if result.path not in path_to_result or result.score > path_to_result[result.path].score:
path_to_result[result.path] = result
final_results = list(path_to_result.values())[:k]
stats.files_matched = len(final_results)
stats.time_ms = (time.time() - start_time) * 1000
self.logger.debug(
"Dense+Rerank cascade search complete: %d results in %.2fms",
len(final_results),
stats.time_ms,
)
return ChainSearchResult(
query=query,
results=final_results,
symbols=[],
stats=stats,
)
def _get_or_create_binary_index(self, index_path: Path) -> Optional[Any]:
"""Get or create a BinaryANNIndex for the given index path.