Add benchmark results for fast3 and fast4, implement KeepAliveLspBridge, and add tests for staged strategies

- Added new benchmark result files: compare_2026-02-09_score_fast3.json and compare_2026-02-09_score_fast4.json.
- Implemented KeepAliveLspBridge to maintain a persistent LSP connection across multiple queries, improving performance.
- Created unit tests for staged clustering strategies in test_staged_stage3_fast_strategies.py, ensuring correct behavior of the score and dir_rr strategies (a standalone sketch of the round-robin selection they exercise follows this list).
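As a rough standalone illustration of what the dir_rr checks assert: the helper below is hypothetical and only mirrors the round-robin selection added in this commit; the real tests live in test_staged_stage3_fast_strategies.py.

# Hypothetical mirror of the dir_rr (directory round-robin) selection;
# names here are illustrative, not the project's API.
from pathlib import Path
from typing import Dict, List, Tuple


def dir_round_robin(results: List[Tuple[str, float]], target: int) -> List[Tuple[str, float]]:
    """Pick up to `target` results, rotating across parent directories."""
    ordered = sorted(results, key=lambda r: r[1], reverse=True)
    buckets: Dict[str, List[Tuple[str, float]]] = {}
    order: List[str] = []
    for path, score in ordered:
        d = str(Path(path).parent).lower()
        if d not in buckets:
            buckets[d] = []
            order.append(d)
        buckets[d].append((path, score))
    out: List[Tuple[str, float]] = []
    while len(out) < target:
        progressed = False
        for d in order:
            if not buckets[d]:
                continue
            out.append(buckets[d].pop(0))
            progressed = True
            if len(out) >= target:
                break
        if not progressed:
            break
    return out


def test_dir_rr_alternates_directories() -> None:
    results = [("a/x.py", 0.9), ("a/y.py", 0.8), ("b/z.py", 0.7)]
    picked = dir_round_robin(results, target=2)
    # Round-robin should take the best hit from each directory first.
    assert [p for p, _ in picked] == ["a/x.py", "b/z.py"]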
This commit is contained in:
catlog22
2026-02-09 20:45:29 +08:00
parent c62d26183b
commit 4344e79e68
64 changed files with 6154 additions and 123 deletions

View File

@@ -3486,6 +3486,81 @@ def index_binary(
console.print(f" [dim]... and {len(errors_list) - 3} more[/dim]")
@index_app.command("binary-mmap")
def index_binary_mmap(
path: Annotated[Path, typer.Argument(help="Project directory (indexed) or _index.db file")],
force: Annotated[bool, typer.Option("--force", "-f", help="Force rebuild binary mmap + metadata")] = False,
embedding_dim: Annotated[Optional[int], typer.Option("--embedding-dim", help="Only use embeddings with this dimension (e.g. 768)")] = None,
json_mode: Annotated[bool, typer.Option("--json", help="Output JSON response")] = False,
verbose: Annotated[bool, typer.Option("--verbose", "-v", help="Enable verbose logging")] = False,
) -> None:
"""Build centralized `_binary_vectors.mmap` from existing embeddings (no model calls).
This command enables the staged binary coarse search without regenerating
embeddings and without triggering global model locks. It:
- scans distributed semantic_chunks.embedding blobs under the index root
- assigns global chunk_ids
- writes `<index_root>/_binary_vectors.mmap` (+ `.meta.json`)
- writes `<index_root>/_vectors_meta.db` (chunk_metadata + binary_vectors)
"""
_configure_logging(verbose, json_mode)
from codexlens.cli.embedding_manager import build_centralized_binary_vectors_from_existing
target_path = path.expanduser().resolve()
# Resolve index_root similar to other index commands.
if target_path.is_file() and target_path.name == "_index.db":
index_root = target_path.parent
else:
registry = RegistryStore()
try:
registry.initialize()
mapper = PathMapper()
index_db = mapper.source_to_index_db(target_path)
if not index_db.exists():
msg = f"No index found for {target_path}"
if json_mode:
print_json(success=False, error=msg)
else:
console.print(f"[red]Error:[/red] {msg}")
console.print("Run `codexlens index init` first to create an index.")
raise typer.Exit(code=1)
index_root = index_db.parent
finally:
registry.close()
def progress_update(message: str) -> None:
if json_mode:
return
console.print(f"[dim]{message}[/dim]")
result = build_centralized_binary_vectors_from_existing(
index_root,
force=force,
embedding_dim=embedding_dim,
progress_callback=progress_update,
)
if json_mode:
print_json(**result)
return
if not result.get("success"):
console.print(f"[red]Error:[/red] {result.get('error', 'Unknown error')}")
hint = result.get("hint")
if hint:
console.print(f"[dim]{hint}[/dim]")
raise typer.Exit(code=1)
data = result.get("result", {})
console.print("\n[green]Binary mmap build complete[/green]")
console.print(f" Index root: {data.get('index_root')}")
console.print(f" Chunks written: {data.get('chunks_written'):,}")
console.print(f" Binary mmap: {data.get('binary_mmap')}")
console.print(f" Meta DB: {data.get('vectors_meta_db')}")
# ==================== Index Status Command ====================
@index_app.command("status")

View File

@@ -860,6 +860,294 @@ def _discover_index_dbs_internal(index_root: Path) -> List[Path]:
return sorted(index_root.rglob("_index.db"))
def build_centralized_binary_vectors_from_existing(
index_root: Path,
*,
force: bool = False,
embedding_dim: Optional[int] = None,
progress_callback: Optional[callable] = None,
) -> Dict[str, Any]:
"""Build centralized binary vectors + metadata from existing semantic_chunks embeddings.
This is a fast-path for enabling the staged binary coarse search without
regenerating embeddings (and without triggering global model locks).
It scans all distributed `_index.db` files under `index_root`, reads
existing `semantic_chunks.embedding` blobs, assigns new global chunk_ids,
and writes:
- `<index_root>/_binary_vectors.mmap` (+ `.meta.json`)
- `<index_root>/_vectors_meta.db` (chunk_metadata + binary_vectors)
"""
from codexlens.config import BINARY_VECTORS_MMAP_NAME, VECTORS_META_DB_NAME
from codexlens.storage.vector_meta_store import VectorMetadataStore
index_root = Path(index_root).resolve()
vectors_meta_path = index_root / VECTORS_META_DB_NAME
mmap_path = index_root / BINARY_VECTORS_MMAP_NAME
meta_path = mmap_path.with_suffix(".meta.json")
index_files = _discover_index_dbs_internal(index_root)
if not index_files:
return {"success": False, "error": f"No _index.db files found under {index_root}"}
if progress_callback:
progress_callback(f"Scanning {len(index_files)} index databases for existing embeddings...")
# First pass: detect embedding dims present.
dims_seen: Dict[int, int] = {}
selected_config: Optional[Dict[str, Any]] = None
for index_path in index_files:
try:
with sqlite3.connect(index_path) as conn:
conn.row_factory = sqlite3.Row
has_table = conn.execute(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='semantic_chunks'"
).fetchone()
if not has_table:
continue
dim_row = conn.execute(
"SELECT backend, model_profile, model_name, embedding_dim FROM embeddings_config WHERE id=1"
).fetchone()
if dim_row and dim_row[3]:
dim_val = int(dim_row[3])
dims_seen[dim_val] = dims_seen.get(dim_val, 0) + 1
if selected_config is None:
selected_config = {
"backend": dim_row[0],
"model_profile": dim_row[1],
"model_name": dim_row[2],
"embedding_dim": dim_val,
}
# We count per-dim later after selecting a target dim.
except Exception:
continue
if not dims_seen:
return {"success": False, "error": "No embeddings_config found under index_root"}
if embedding_dim is None:
# Default: pick the most common embedding dim across indexes.
embedding_dim = max(dims_seen.items(), key=lambda kv: kv[1])[0]
embedding_dim = int(embedding_dim)
if progress_callback and len(dims_seen) > 1:
progress_callback(f"Mixed embedding dims detected, selecting dim={embedding_dim} (seen={dims_seen})")
# Re-detect the selected model config for this dim (do not reuse an arbitrary first-seen config).
selected_config = None
# Second pass: count only chunks matching selected dim.
total_chunks = 0
for index_path in index_files:
try:
with sqlite3.connect(index_path) as conn:
conn.row_factory = sqlite3.Row
has_table = conn.execute(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='semantic_chunks'"
).fetchone()
if not has_table:
continue
dim_row = conn.execute(
"SELECT backend, model_profile, model_name, embedding_dim FROM embeddings_config WHERE id=1"
).fetchone()
dim_val = int(dim_row[3]) if dim_row and dim_row[3] else None
if dim_val != embedding_dim:
continue
if selected_config is None:
selected_config = {
"backend": dim_row[0],
"model_profile": dim_row[1],
"model_name": dim_row[2],
"embedding_dim": dim_val,
}
row = conn.execute(
"SELECT COUNT(*) FROM semantic_chunks WHERE embedding IS NOT NULL AND length(embedding) > 0"
).fetchone()
total_chunks += int(row[0] if row else 0)
except Exception:
continue
if not total_chunks:
return {
"success": False,
"error": f"No existing embeddings found for embedding_dim={embedding_dim}",
"dims_seen": dims_seen,
}
if progress_callback:
progress_callback(f"Found {total_chunks} embedded chunks (dim={embedding_dim}). Building binary vectors...")
# Prepare output files / DB.
try:
import numpy as np
except Exception as exc:
return {"success": False, "error": f"numpy required to build binary vectors: {exc}"}
store = VectorMetadataStore(vectors_meta_path)
store._ensure_schema()
if force:
try:
store.clear()
except Exception:
pass
try:
store.clear_binary_vectors()
except Exception:
pass
try:
if mmap_path.exists():
mmap_path.unlink()
except Exception:
pass
try:
if meta_path.exists():
meta_path.unlink()
except Exception:
pass
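# One sign bit per embedding dimension, packed into whole bytes (e.g. 768 dims -> 96 bytes).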
bytes_per_vec = (int(embedding_dim) + 7) // 8
mmap = np.memmap(
str(mmap_path),
dtype=np.uint8,
mode="w+",
shape=(int(total_chunks), int(bytes_per_vec)),
)
chunk_ids: List[int] = []
chunks_batch: List[Dict[str, Any]] = []
bin_ids_batch: List[int] = []
bin_vecs_batch: List[bytes] = []
batch_limit = 500
global_id = 1
write_idx = 0
skipped_indexes: Dict[str, int] = {}
for index_path in index_files:
try:
with sqlite3.connect(index_path) as conn:
conn.row_factory = sqlite3.Row
has_table = conn.execute(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='semantic_chunks'"
).fetchone()
if not has_table:
continue
dim_row = conn.execute(
"SELECT embedding_dim FROM embeddings_config WHERE id=1"
).fetchone()
dim_val = int(dim_row[0]) if dim_row and dim_row[0] else None
if dim_val != embedding_dim:
skipped_indexes[str(index_path)] = dim_val or -1
continue
rows = conn.execute(
"SELECT file_path, content, embedding, metadata, category FROM semantic_chunks "
"WHERE embedding IS NOT NULL AND length(embedding) > 0"
).fetchall()
for row in rows:
emb = np.frombuffer(row["embedding"], dtype=np.float32)
if emb.size != int(embedding_dim):
continue
packed = np.packbits((emb > 0).astype(np.uint8))
if packed.size != bytes_per_vec:
continue
mmap[write_idx] = packed
write_idx += 1
cid = global_id
global_id += 1
chunk_ids.append(cid)
meta_raw = row["metadata"]
meta_dict: Dict[str, Any] = {}
if meta_raw:
try:
meta_dict = json.loads(meta_raw) if isinstance(meta_raw, str) else dict(meta_raw)
except Exception:
meta_dict = {}
chunks_batch.append(
{
"chunk_id": cid,
"file_path": row["file_path"],
"content": row["content"],
"start_line": meta_dict.get("start_line"),
"end_line": meta_dict.get("end_line"),
"category": row["category"],
"metadata": meta_dict,
"source_index_db": str(index_path),
}
)
bin_ids_batch.append(cid)
bin_vecs_batch.append(packed.tobytes())
if len(chunks_batch) >= batch_limit:
store.add_chunks(chunks_batch)
store.add_binary_vectors(bin_ids_batch, bin_vecs_batch)
chunks_batch = []
bin_ids_batch = []
bin_vecs_batch = []
except Exception:
continue
if chunks_batch:
store.add_chunks(chunks_batch)
store.add_binary_vectors(bin_ids_batch, bin_vecs_batch)
mmap.flush()
del mmap
# If we skipped inconsistent vectors, truncate metadata to actual write count.
chunk_ids = chunk_ids[:write_idx]
# Write sidecar metadata.
with open(meta_path, "w", encoding="utf-8") as f:
json.dump(
{
"shape": [int(write_idx), int(bytes_per_vec)],
"chunk_ids": chunk_ids,
"embedding_dim": int(embedding_dim),
"backend": (selected_config or {}).get("backend"),
"model_profile": (selected_config or {}).get("model_profile"),
"model_name": (selected_config or {}).get("model_name"),
},
f,
)
if progress_callback:
progress_callback(f"Binary vectors ready: {mmap_path} (rows={write_idx})")
return {
"success": True,
"result": {
"index_root": str(index_root),
"index_files_scanned": len(index_files),
"chunks_total": int(total_chunks),
"chunks_written": int(write_idx),
"embedding_dim": int(embedding_dim),
"bytes_per_vector": int(bytes_per_vec),
"skipped_indexes": len(skipped_indexes),
"vectors_meta_db": str(vectors_meta_path),
"binary_mmap": str(mmap_path),
"binary_meta_json": str(meta_path),
},
}
def discover_all_index_dbs(index_root: Path) -> List[Path]:
"""Recursively find all _index.db files in an index tree.
@@ -1804,4 +2092,4 @@ def check_global_model_lock(
"has_conflict": has_conflict,
"locked_config": locked_config,
"target_config": {"backend": target_backend, "model": target_model},
}
}
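To make the staged binary coarse search these artifacts enable concrete, here is a minimal sketch of Hamming-distance ranking over the packed sign-bit vectors. It assumes the (rows, bytes_per_vec) uint8 memmap built above and is illustrative only; the real search lives in BinarySearcher, and a production version would popcount via a lookup table rather than unpackbits.

# Sketch: Hamming-distance coarse search over packed sign-bit vectors.
import numpy as np

def binary_coarse_search(query_dense: np.ndarray, vectors: np.ndarray, top_k: int = 50):
    """Return (row_index, hamming_distance) pairs for the top_k nearest rows."""
    # Same packing as the builder: one sign bit per dimension.
    query_bits = np.packbits((query_dense > 0).astype(np.uint8))
    # XOR the query row against every stored row (broadcasts over rows).
    xor = np.bitwise_xor(vectors, query_bits)
    # Per-row popcount = Hamming distance.
    distances = np.unpackbits(xor, axis=1).sum(axis=1)
    order = np.argsort(distances)[:top_k]
    return [(int(i), int(distances[i])) for i in order]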

View File

@@ -153,7 +153,7 @@ class Config:
staged_realtime_lsp_max_concurrent: int = 2 # Max concurrent LSP requests during graph expansion
staged_realtime_lsp_warmup_s: float = 3.0 # Wait for server analysis after opening seed docs
staged_realtime_lsp_resolve_symbols: bool = False # If True, resolves symbol names via documentSymbol (slower)
-staged_clustering_strategy: str = "auto" # "auto", "hdbscan", "dbscan", "frequency", "noop"
+staged_clustering_strategy: str = "auto" # "auto", "hdbscan", "dbscan", "frequency", "noop", "score", "dir_rr", "path"
staged_clustering_min_size: int = 3 # Minimum cluster size for Stage 3 grouping
enable_staged_rerank: bool = True # Enable optional cross-encoder reranking in Stage 4
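A minimal sketch of opting into one of the new fast strategies; the Config import path and no-argument construction are assumptions.

# Sketch: pick the score-only fast path for Stage 3; "dir_rr" and "path"
# are the other strategies added here. Import path is an assumption.
from codexlens.config import Config

config = Config()
config.staged_clustering_strategy = "score"  # skip clustering, keep top-scored results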

View File

@@ -0,0 +1,135 @@
"""Keep-alive wrapper for Standalone LSP servers in synchronous workflows.
The staged realtime pipeline calls into LSP from synchronous code paths.
Creating a fresh asyncio loop per query (via asyncio.run) forces language
servers to start/stop every time, which is slow and can trigger shutdown
timeouts on Windows.
This module runs an asyncio event loop in a background thread and keeps a
single LspBridge (and its StandaloneLspManager + subprocesses) alive across
multiple queries. Callers submit coroutines that operate on the shared bridge.
"""
from __future__ import annotations
import atexit
import asyncio
import threading
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional, TypeVar
from codexlens.lsp.lsp_bridge import LspBridge
T = TypeVar("T")
@dataclass(frozen=True)
class KeepAliveKey:
workspace_root: str
config_file: Optional[str]
timeout: float
class KeepAliveLspBridge:
"""Runs a shared LspBridge on a dedicated event loop thread."""
def __init__(self, *, workspace_root: str, config_file: Optional[str], timeout: float) -> None:
self._key = KeepAliveKey(workspace_root=workspace_root, config_file=config_file, timeout=float(timeout))
self._lock = threading.RLock()
self._call_lock = threading.RLock()
self._ready = threading.Event()
self._thread: Optional[threading.Thread] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._bridge: Optional[LspBridge] = None
self._stopped = False
atexit.register(self.stop)
@property
def key(self) -> KeepAliveKey:
return self._key
def start(self) -> None:
with self._lock:
if self._stopped:
raise RuntimeError("KeepAliveLspBridge is stopped")
if self._thread is not None and self._thread.is_alive():
return
self._ready.clear()
thread = threading.Thread(target=self._run, name="codexlens-lsp-keepalive", daemon=True)
self._thread = thread
thread.start()
if not self._ready.wait(timeout=10.0):
raise RuntimeError("Timed out starting LSP keep-alive loop")
def stop(self) -> None:
with self._lock:
if self._stopped:
return
self._stopped = True
loop = self._loop
bridge = self._bridge
thread = self._thread
if loop is not None and bridge is not None:
try:
fut = asyncio.run_coroutine_threadsafe(bridge.close(), loop)
fut.result(timeout=5.0)
except Exception:
pass
try:
loop.call_soon_threadsafe(loop.stop)
except Exception:
pass
if thread is not None:
try:
thread.join(timeout=5.0)
except Exception:
pass
def run(self, fn: Callable[[LspBridge], Awaitable[T]], *, timeout: Optional[float] = None) -> T:
"""Run an async function against the shared LspBridge and return its result."""
self.start()
loop = self._loop
bridge = self._bridge
if loop is None or bridge is None:
raise RuntimeError("Keep-alive loop not initialized")
async def _call() -> T:
return await fn(bridge)
# Serialize bridge usage to avoid overlapping LSP request storms.
with self._call_lock:
fut = asyncio.run_coroutine_threadsafe(_call(), loop)
return fut.result(timeout=float(timeout or self._key.timeout) + 1.0)
def _run(self) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
bridge = LspBridge(
workspace_root=self._key.workspace_root,
config_file=self._key.config_file,
timeout=self._key.timeout,
)
with self._lock:
self._loop = loop
self._bridge = bridge
self._ready.set()
try:
loop.run_forever()
finally:
try:
if self._bridge is not None:
loop.run_until_complete(self._bridge.close())
except Exception:
pass
try:
loop.close()
except Exception:
pass
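A minimal usage sketch: the workspace path is hypothetical, and get_document_symbols is the same bridge call the staged pipeline uses during warmup.

# Sketch: reuse one live bridge (and its language servers) across queries.
from codexlens.lsp.keepalive_bridge import KeepAliveLspBridge
from codexlens.lsp.lsp_bridge import LspBridge

keepalive = KeepAliveLspBridge(
    workspace_root="/path/to/project",  # hypothetical workspace
    config_file=None,
    timeout=30.0,
)

async def fetch_symbols(bridge: LspBridge):
    return await bridge.get_document_symbols("/path/to/project/main.py")

symbols = keepalive.run(fetch_symbols)  # servers stay warm between calls
keepalive.stop()  # also registered via atexit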

View File

@@ -103,6 +103,7 @@ class StandaloneLspManager:
self._configs: Dict[str, ServerConfig] = {} # language_id -> ServerConfig
self._read_tasks: Dict[str, asyncio.Task] = {} # language_id -> read task
self._stderr_tasks: Dict[str, asyncio.Task] = {} # language_id -> stderr read task
self._processor_tasks: Dict[str, asyncio.Task] = {} # language_id -> message processor task
self._lock = asyncio.Lock()
def _find_config_file(self) -> Optional[Path]:
@@ -269,7 +270,7 @@ class StandaloneLspManager:
)
# Start the message processor task to handle queued messages
-asyncio.create_task(self._process_messages(language_id))
+self._processor_tasks[language_id] = asyncio.create_task(self._process_messages(language_id))
# Initialize the server - now uses queue for reading responses
await self._initialize_server(state)
@@ -311,6 +312,15 @@ class StandaloneLspManager:
except asyncio.CancelledError:
pass
# Cancel message processor task
processor_task = self._processor_tasks.pop(language_id, None)
if processor_task:
processor_task.cancel()
try:
await processor_task
except asyncio.CancelledError:
pass
# Send shutdown request
try:
await self._send_request(state, "shutdown", None, timeout=5.0)
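The change follows the usual asyncio discipline of holding a reference to every background task so shutdown can cancel and await it; a generic sketch of the pattern, independent of the LSP manager:

# Sketch: track background tasks so shutdown can cancel and await them,
# instead of firing create_task() and dropping the handle.
import asyncio
from typing import Dict

class Worker:
    def __init__(self) -> None:
        self._tasks: Dict[str, asyncio.Task] = {}

    def start(self, name: str) -> None:
        # Keep the handle; an unreferenced task can be garbage-collected mid-flight.
        self._tasks[name] = asyncio.create_task(self._loop(name))

    async def _loop(self, name: str) -> None:
        while True:
            await asyncio.sleep(1.0)

    async def stop(self, name: str) -> None:
        task = self._tasks.pop(name, None)
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass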

View File

@@ -66,6 +66,10 @@ class BinarySearcher:
self._binary_matrix: Optional[np.ndarray] = None
self._is_memmap = False
self._loaded = False
self._embedding_dim: Optional[int] = None
self._backend: Optional[str] = None
self._model: Optional[str] = None
self._model_profile: Optional[str] = None
def load(self) -> bool:
"""Load binary vectors using memory-mapped file or database fallback.
@@ -90,6 +94,10 @@ class BinarySearcher:
shape = tuple(meta['shape'])
self._chunk_ids = np.array(meta['chunk_ids'], dtype=np.int64)
self._embedding_dim = meta.get("embedding_dim")
self._backend = meta.get("backend")
self._model = meta.get("model") or meta.get("model_name")
self._model_profile = meta.get("model_profile")
# Memory-map the binary matrix (read-only)
self._binary_matrix = np.memmap(
@@ -141,6 +149,10 @@ class BinarySearcher:
self._binary_matrix = np.vstack(binary_arrays)
self._is_memmap = False
self._loaded = True
self._embedding_dim = None
self._backend = None
self._model = None
self._model_profile = None
logger.info(
"Loaded %d binary vectors from DB (%d bytes each)",
@@ -261,6 +273,26 @@ class BinarySearcher:
"""Get number of loaded binary vectors."""
return len(self._chunk_ids) if self._chunk_ids is not None else 0
@property
def embedding_dim(self) -> Optional[int]:
"""Embedding dimension used to build these binary vectors (if known)."""
return int(self._embedding_dim) if self._embedding_dim is not None else None
@property
def backend(self) -> Optional[str]:
"""Embedding backend used to build these vectors (if known)."""
return self._backend
@property
def model(self) -> Optional[str]:
"""Embedding model name used to build these vectors (if known)."""
return self._model
@property
def model_profile(self) -> Optional[str]:
"""Embedding profile name (fastembed) used to build these vectors (if known)."""
return self._model_profile
@property
def is_memmap(self) -> bool:
"""Check if using memory-mapped file (vs in-memory array)."""

View File

@@ -13,6 +13,7 @@ from typing import List, Optional, Dict, Any, Literal, Tuple, TYPE_CHECKING
import json
import logging
import os
import threading
import time
from codexlens.entities import SearchResult, Symbol
@@ -32,7 +33,7 @@ from codexlens.storage.global_index import GlobalSymbolIndex
from codexlens.storage.path_mapper import PathMapper
from codexlens.storage.sqlite_store import SQLiteStore
from codexlens.storage.vector_meta_store import VectorMetadataStore
from codexlens.config import VECTORS_META_DB_NAME
from codexlens.config import BINARY_VECTORS_MMAP_NAME, VECTORS_META_DB_NAME
from codexlens.search.hybrid_search import HybridSearchEngine
@@ -165,6 +166,9 @@ class ChainSearchEngine:
self._max_workers = max_workers
self._executor: Optional[ThreadPoolExecutor] = None
self._config = config
self._realtime_lsp_keepalive_lock = threading.RLock()
self._realtime_lsp_keepalive = None
self._realtime_lsp_keepalive_key = None
def _get_executor(self, max_workers: Optional[int] = None) -> ThreadPoolExecutor:
"""Get or create the shared thread pool executor.
@@ -187,6 +191,15 @@ class ChainSearchEngine:
if self._executor is not None:
self._executor.shutdown(wait=True)
self._executor = None
with self._realtime_lsp_keepalive_lock:
keepalive = self._realtime_lsp_keepalive
self._realtime_lsp_keepalive = None
self._realtime_lsp_keepalive_key = None
if keepalive is not None:
try:
keepalive.stop()
except Exception:
pass
def __enter__(self) -> "ChainSearchEngine":
"""Context manager entry."""
@@ -838,7 +851,11 @@ class ChainSearchEngine:
# ========== Stage 1: Binary Coarse Search ==========
stage1_start = time.time()
coarse_results, index_root = self._stage1_binary_search(
-query, index_paths, coarse_k, stats
+query,
+index_paths,
+coarse_k,
+stats,
+index_root=start_index.parent,
)
stage_times["stage1_binary_ms"] = (time.time() - stage1_start) * 1000
stage_counts["stage1_candidates"] = len(coarse_results)
@@ -849,14 +866,47 @@ class ChainSearchEngine:
)
if not coarse_results:
self.logger.debug("No binary candidates found, falling back to standard search")
return self.search(query, source_path, options=options)
# Keep the staged pipeline running even when Stage 1 yields no candidates.
# This makes "realtime LSP graph → clustering → rerank" comparable across queries.
self.logger.debug(
"No Stage 1 candidates found; seeding staged pipeline with FTS results"
)
stage1_fallback_start = time.time()
try:
seed_opts = SearchOptions(
depth=options.depth,
max_workers=options.max_workers,
limit_per_dir=max(10, int(coarse_k)),
total_limit=int(coarse_k),
include_symbols=True,
enable_vector=False,
hybrid_mode=False,
enable_cascade=False,
)
seed = self.search(query, source_path, options=seed_opts)
coarse_results = list(seed.results or [])[: int(coarse_k)]
stage_counts["stage1_fallback_used"] = 1
except Exception as exc:
self.logger.debug("Stage 1 fallback seeding failed: %r", exc)
coarse_results = []
stage_times["stage1_fallback_search_ms"] = (time.time() - stage1_fallback_start) * 1000
stage_counts["stage1_candidates"] = len(coarse_results)
if not coarse_results:
return ChainSearchResult(query=query, results=[], symbols=[], stats=stats)
# ========== Stage 2: LSP Graph Expansion ==========
stage2_start = time.time()
expanded_results = self._stage2_lsp_expand(coarse_results, index_root, query=query)
stage_times["stage2_expand_ms"] = (time.time() - stage2_start) * 1000
stage_counts["stage2_expanded"] = len(expanded_results)
try:
stage2_unique_paths = len({(r.path or "").lower() for r in expanded_results if getattr(r, "path", None)})
except Exception:
stage2_unique_paths = 0
stage_counts["stage2_unique_paths"] = stage2_unique_paths
stage_counts["stage2_duplicate_paths"] = max(0, len(expanded_results) - stage2_unique_paths)
self.logger.debug(
"Staged Stage 2: LSP expansion %d -> %d results in %.2fms",
@@ -868,6 +918,11 @@ class ChainSearchEngine:
clustered_results = self._stage3_cluster_prune(expanded_results, k * 2)
stage_times["stage3_cluster_ms"] = (time.time() - stage3_start) * 1000
stage_counts["stage3_clustered"] = len(clustered_results)
if self._config is not None:
try:
stage_counts["stage3_strategy"] = str(getattr(self._config, "staged_clustering_strategy", "auto") or "auto")
except Exception:
pass
self.logger.debug(
"Staged Stage 3: Clustering %d -> %d representatives in %.2fms",
@@ -944,6 +999,8 @@ class ChainSearchEngine:
index_paths: List[Path],
coarse_k: int,
stats: SearchStats,
*,
index_root: Optional[Path] = None,
) -> Tuple[List[SearchResult], Optional[Path]]:
"""Stage 1: Binary vector coarse search using Hamming distance.
@@ -967,8 +1024,12 @@ class ChainSearchEngine:
)
return [], None
-# Try centralized BinarySearcher first (preferred for mmap indexes)
-index_root = index_paths[0].parent if index_paths else None
# Try centralized BinarySearcher first (preferred for mmap indexes).
# Centralized binary vectors live at a project index root (where `index binary-mmap`
# was run), which may be an ancestor of the nearest `_index.db` directory.
index_root = Path(index_root).resolve() if index_root is not None else (index_paths[0].parent if index_paths else None)
if index_root is not None:
index_root = self._find_nearest_binary_mmap_root(index_root)
coarse_candidates: List[Tuple[int, float, Path]] = [] # (chunk_id, distance, index_path)
used_centralized = False
using_dense_fallback = False
@@ -977,9 +1038,26 @@ class ChainSearchEngine:
binary_searcher = self._get_centralized_binary_searcher(index_root)
if binary_searcher is not None:
try:
-from codexlens.semantic.embedder import Embedder
-embedder = Embedder()
-query_dense = embedder.embed_to_numpy([query])[0]
use_gpu = True
if self._config is not None:
use_gpu = getattr(self._config, "embedding_use_gpu", True)
query_dense = None
backend = getattr(binary_searcher, "backend", None)
model = getattr(binary_searcher, "model", None)
profile = getattr(binary_searcher, "model_profile", None) or "code"
if backend == "litellm":
try:
from codexlens.semantic.factory import get_embedder as get_factory_embedder
embedder = get_factory_embedder(backend="litellm", model=model or "code")
query_dense = embedder.embed_to_numpy([query])[0]
except Exception:
query_dense = None
if query_dense is None:
from codexlens.semantic.embedder import get_embedder
embedder = get_embedder(profile=str(profile), use_gpu=use_gpu)
query_dense = embedder.embed_to_numpy([query])[0]
results = binary_searcher.search(query_dense, top_k=coarse_k)
for chunk_id, distance in results:
@@ -1531,34 +1609,26 @@ class ChainSearchEngine:
if not seed_nodes:
return coarse_results
-        async def expand_graph():
-            async with LspBridge(
-                workspace_root=str(workspace_root),
-                config_file=str(lsp_config_file) if lsp_config_file else None,
-                timeout=timeout_s,
-            ) as bridge:
-                # Warm up analysis: open seed docs and wait a bit so references/call hierarchy are populated.
-                if warmup_s > 0:
-                    for seed in seed_nodes[:3]:
-                        try:
-                            await bridge.get_document_symbols(seed.file_path)
-                        except Exception:
-                            continue
-                    try:
-                        warmup_budget = min(warmup_s, max(0.0, timeout_s * 0.1))
-                        await asyncio.sleep(min(warmup_budget, max(0.0, timeout_s - 0.5)))
-                    except Exception:
-                        pass
-                builder = LspGraphBuilder(
-                    max_depth=max_depth,
-                    max_nodes=max_nodes,
-                    max_concurrent=max(1, max_concurrent),
-                    resolve_symbols=resolve_symbols,
-                )
-                return await builder.build_from_seeds(seed_nodes, bridge)
-        def run_coro_blocking():
-            return asyncio.run(asyncio.wait_for(expand_graph(), timeout=timeout_s))
+        async def expand_graph(bridge: LspBridge):
+            # Warm up analysis: open seed docs and wait a bit so references/call hierarchy are populated.
+            if warmup_s > 0:
+                for seed in seed_nodes[:3]:
+                    try:
+                        await bridge.get_document_symbols(seed.file_path)
+                    except Exception:
+                        continue
+                try:
+                    warmup_budget = min(warmup_s, max(0.0, timeout_s * 0.1))
+                    await asyncio.sleep(min(warmup_budget, max(0.0, timeout_s - 0.5)))
+                except Exception:
+                    pass
+            builder = LspGraphBuilder(
+                max_depth=max_depth,
+                max_nodes=max_nodes,
+                max_concurrent=max(1, max_concurrent),
+                resolve_symbols=resolve_symbols,
+            )
+            return await builder.build_from_seeds(seed_nodes, bridge)
try:
try:
@@ -1569,9 +1639,43 @@ class ChainSearchEngine:
if has_running_loop:
with ThreadPoolExecutor(max_workers=1) as executor:
-graph = executor.submit(run_coro_blocking).result(timeout=timeout_s + 1.0)
async def _expand_once():
async with LspBridge(
workspace_root=str(workspace_root),
config_file=str(lsp_config_file) if lsp_config_file else None,
timeout=timeout_s,
) as bridge:
return await expand_graph(bridge)
def _run():
return asyncio.run(asyncio.wait_for(_expand_once(), timeout=timeout_s))
graph = executor.submit(_run).result(timeout=timeout_s + 1.0)
else:
-graph = run_coro_blocking()
from codexlens.lsp.keepalive_bridge import KeepAliveKey, KeepAliveLspBridge
key = KeepAliveKey(
workspace_root=str(workspace_root),
config_file=str(lsp_config_file) if lsp_config_file else None,
timeout=float(timeout_s),
)
with self._realtime_lsp_keepalive_lock:
keepalive = self._realtime_lsp_keepalive
if keepalive is None or self._realtime_lsp_keepalive_key != key:
if keepalive is not None:
try:
keepalive.stop()
except Exception:
pass
keepalive = KeepAliveLspBridge(
workspace_root=key.workspace_root,
config_file=key.config_file,
timeout=key.timeout,
)
self._realtime_lsp_keepalive = keepalive
self._realtime_lsp_keepalive_key = key
graph = keepalive.run(expand_graph, timeout=timeout_s)
except Exception as exc:
self.logger.debug("Stage 2 (realtime) expansion failed: %r", exc)
return coarse_results
@@ -1705,6 +1809,57 @@ class ChainSearchEngine:
if len(expanded_results) <= target_count:
return expanded_results
strategy_name = "auto"
if self._config is not None:
strategy_name = getattr(self._config, "staged_clustering_strategy", "auto") or "auto"
strategy_name = str(strategy_name).strip().lower()
if strategy_name in {"noop", "none", "off"}:
return sorted(expanded_results, key=lambda r: r.score, reverse=True)[:target_count]
if strategy_name in {"score", "top", "rank"}:
return sorted(expanded_results, key=lambda r: r.score, reverse=True)[:target_count]
if strategy_name in {"path", "file"}:
best_by_path: Dict[str, SearchResult] = {}
for r in expanded_results:
if not r.path:
continue
key = str(r.path).lower()
if key not in best_by_path or r.score > best_by_path[key].score:
best_by_path[key] = r
candidates = list(best_by_path.values()) or expanded_results
candidates.sort(key=lambda r: r.score, reverse=True)
return candidates[:target_count]
if strategy_name in {"dir_rr", "rr_dir", "round_robin_dir"}:
results_sorted = sorted(expanded_results, key=lambda r: r.score, reverse=True)
buckets: Dict[str, List[SearchResult]] = {}
dir_order: List[str] = []
for r in results_sorted:
try:
d = str(Path(r.path).parent).lower()
except Exception:
d = ""
if d not in buckets:
buckets[d] = []
dir_order.append(d)
buckets[d].append(r)
out: List[SearchResult] = []
while len(out) < target_count:
progressed = False
for d in dir_order:
if not buckets.get(d):
continue
out.append(buckets[d].pop(0))
progressed = True
if len(out) >= target_count:
break
if not progressed:
break
return out
try:
from codexlens.search.clustering import (
ClusteringConfig,
@@ -2550,6 +2705,31 @@ class ChainSearchEngine:
self.logger.debug("Failed to load centralized binary searcher: %s", exc)
return None
def _find_nearest_binary_mmap_root(self, index_root: Path, *, max_levels: int = 10) -> Path:
"""Walk up index_root parents to find the nearest centralized binary mmap.
Centralized staged-binary artifacts are stored at a project index root
(e.g. `.../project/src/_binary_vectors.mmap`), but staged search often starts
from the nearest ancestor `_index.db` path, which can be nested deeper.
This helper makes Stage 1 robust by locating the nearest ancestor directory
that contains the centralized `_binary_vectors.mmap`.
"""
current_dir = Path(index_root).resolve()
for _ in range(max(0, int(max_levels)) + 1):
try:
if (current_dir / BINARY_VECTORS_MMAP_NAME).exists():
return current_dir
except Exception:
return Path(index_root).resolve()
parent = current_dir.parent
if parent == current_dir:
break
current_dir = parent
return Path(index_root).resolve()
def _compute_cosine_similarity(
self,
query_vec: "np.ndarray",