feat: enhance search, ranking, reranker and CLI tooling across ccw and codex-lens

Major improvements to smart-search, chain-search cascade, ranking pipeline,
reranker factory, CLI history store, codex-lens integration, and uv-manager.
Simplify command-generator skill by inlining phases. Add comprehensive tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
catlog22
2026-03-16 20:35:08 +08:00
parent 1cd96b90e8
commit 5a4b18d9b1
73 changed files with 14684 additions and 2442 deletions

View File

@@ -11,12 +11,25 @@ Common Fixtures:
- sample_code_files: Factory for creating sample code files
"""
import pytest
import tempfile
import shutil
from pathlib import Path
from typing import Dict, Any
import sqlite3
import shutil
import tempfile
import warnings
from pathlib import Path
from typing import Any, Dict
import pytest
# Silence known Click deprecation warnings raised while importing/invoking the
# CLI under test, so suites configured with -W error do not fail on them.
warnings.filterwarnings(
    "ignore",
    message=r"'BaseCommand' is deprecated and will be removed in Click 9\.0\..*",
    category=DeprecationWarning,
)
warnings.filterwarnings(
    "ignore",
    message=r"The '__version__' attribute is deprecated and will be removed in Click 9\.1\..*",
    category=DeprecationWarning,
)
@pytest.fixture

View File

@@ -98,6 +98,23 @@ class TestANNIndex:
assert ids[0] == 1 # ID of first vector
assert distances[0] < 0.01 # Very small distance (almost identical)
@pytest.mark.skipif(
    not _hnswlib_available(),
    reason="hnswlib not installed"
)
def test_search_clamps_top_k_to_available_vectors(self, temp_db, sample_vectors, sample_ids):
    """Search should clamp top_k to the loaded vector count.

    Only 3 vectors are loaded, then top_k=10 is requested; the index must
    return exactly 3 hits instead of erroring or padding the result.
    """
    from codexlens.semantic.ann_index import ANNIndex
    index = ANNIndex(temp_db, dim=384)
    index.add_vectors(sample_ids[:3], sample_vectors[:3])
    ids, distances = index.search(sample_vectors[0], top_k=10)
    assert len(ids) == 3
    assert len(distances) == 3
    # Nearest neighbour of vector 0 is itself; the fixture assigns it id 1.
    assert ids[0] == 1
@pytest.mark.skipif(
not _hnswlib_available(),
reason="hnswlib not installed"

File diff suppressed because it is too large. [Load Diff]

View File

@@ -0,0 +1,350 @@
from __future__ import annotations
import importlib.util
import json
import sys
from pathlib import Path
from types import SimpleNamespace
MODULE_PATH = Path(__file__).resolve().parents[1] / "benchmarks" / "compare_ccw_smart_search_stage2.py"
MODULE_NAME = "compare_ccw_smart_search_stage2_test_module"
MODULE_SPEC = importlib.util.spec_from_file_location(MODULE_NAME, MODULE_PATH)
assert MODULE_SPEC is not None and MODULE_SPEC.loader is not None
benchmark = importlib.util.module_from_spec(MODULE_SPEC)
sys.modules[MODULE_NAME] = benchmark
MODULE_SPEC.loader.exec_module(benchmark)
class _FakeChainResult:
    """Minimal stand-in for a chain-search result: wraps paths as .results items."""

    def __init__(self, paths: list[str]) -> None:
        self.results = [SimpleNamespace(path=item) for item in paths]


class _FakeEngine:
    """Engine double that records every search()/cascade_search() invocation
    and replays canned result paths."""

    def __init__(
        self,
        *,
        search_paths: list[str] | None = None,
        cascade_paths: list[str] | None = None,
    ) -> None:
        self.search_paths = search_paths or []
        self.cascade_paths = cascade_paths or []
        self.search_calls: list[dict[str, object]] = []
        self.cascade_calls: list[dict[str, object]] = []

    def search(self, query: str, source_path: Path, options: object) -> _FakeChainResult:
        call = {"query": query, "source_path": source_path, "options": options}
        self.search_calls.append(call)
        return _FakeChainResult(self.search_paths)

    def cascade_search(
        self,
        query: str,
        source_path: Path,
        *,
        k: int,
        coarse_k: int,
        options: object,
        strategy: str,
    ) -> _FakeChainResult:
        call = {
            "query": query,
            "source_path": source_path,
            "k": k,
            "coarse_k": coarse_k,
            "options": options,
            "strategy": strategy,
        }
        self.cascade_calls.append(call)
        return _FakeChainResult(self.cascade_paths)
def test_strategy_specs_include_baselines_before_stage2_modes() -> None:
    """Baseline methods must come before staged stage-2 strategies in spec order."""
    specs = benchmark._strategy_specs(
        ["realtime", "static_global_graph"],
        include_dense_baseline=True,
        baseline_methods=["auto", "fts", "hybrid"],
    )
    expected_order = [
        "auto",
        "fts",
        "hybrid",
        "dense_rerank",
        "staged:realtime",
        "staged:static_global_graph",
    ]
    assert [spec.strategy_key for spec in specs] == expected_order
def test_select_effective_method_matches_cli_auto_routing() -> None:
    """Auto routing mirrors the CLI: keyword-ish queries go to fts, natural
    language to dense_rerank, identifier-flavoured prose to hybrid."""
    route = benchmark._select_effective_method
    keyword_queries = [
        "find_descendant_project_roots",
        "build dist artifact output",
        "embedding backend fastembed local litellm api config",
        "get_reranker factory onnx backend selection",
    ]
    for query in keyword_queries:
        assert route(query, "auto") == "fts"
    assert route("how does the authentication flow work", "auto") == "dense_rerank"
    assert route("how smart_search keyword routing works", "auto") == "hybrid"
def test_filter_dataset_by_query_match_uses_case_insensitive_substring() -> None:
    """Filtering matches substrings case-insensitively; None keeps everything."""
    dataset = [
        {"query": "embedding backend fastembed local litellm api config", "relevant_paths": ["a"]},
        {"query": "get_reranker factory onnx backend selection", "relevant_paths": ["b"]},
        {"query": "how does smart search route keyword queries", "relevant_paths": ["c"]},
    ]

    def queries(items):
        return [entry["query"] for entry in items]

    assert queries(benchmark._filter_dataset_by_query_match(dataset, "BACKEND")) == [
        "embedding backend fastembed local litellm api config",
        "get_reranker factory onnx backend selection",
    ]
    assert queries(benchmark._filter_dataset_by_query_match(dataset, "FASTEMBED")) == [
        "embedding backend fastembed local litellm api config",
    ]
    assert queries(benchmark._filter_dataset_by_query_match(dataset, None)) == queries(dataset)
def test_apply_query_limit_runs_after_filtering() -> None:
    """The query limit caps the filtered dataset, not the raw one."""
    dataset = [
        {"query": "executeHybridMode dense_rerank semantic smart_search", "relevant_paths": ["a"]},
        {"query": "embedding backend fastembed local litellm api config", "relevant_paths": ["b"]},
        {"query": "reranker backend onnx api legacy configuration", "relevant_paths": ["c"]},
    ]
    backend_matches = benchmark._filter_dataset_by_query_match(dataset, "backend")
    capped = benchmark._apply_query_limit(backend_matches, 1)
    assert [entry["query"] for entry in capped] == [
        "embedding backend fastembed local litellm api config",
    ]
def test_make_progress_payload_reports_partial_completion() -> None:
    """Progress payload must reflect completed query/run counts mid-benchmark."""
    # Minimal CLI-args stand-in: only the fields _make_progress_payload reads.
    args = SimpleNamespace(
        queries_file=Path("queries.jsonl"),
        k=10,
        coarse_k=100,
    )
    strategy_specs = [
        benchmark.StrategySpec(strategy_key="auto", strategy="auto", stage2_mode=None),
        benchmark.StrategySpec(strategy_key="dense_rerank", strategy="dense_rerank", stage2_mode=None),
    ]
    # One finished evaluation carrying a single completed "auto" run.
    evaluations = [
        benchmark.QueryEvaluation(
            query="embedding backend fastembed local litellm api config",
            intent="config",
            notes=None,
            relevant_paths=["codex-lens/src/codexlens/config.py"],
            runs={
                "auto": benchmark.StrategyRun(
                    strategy_key="auto",
                    strategy="auto",
                    stage2_mode=None,
                    effective_method="fts",
                    execution_method="fts",
                    latency_ms=123.0,
                    topk_paths=["config.py"],
                    first_hit_rank=1,
                    hit_at_k=True,
                    recall_at_k=1.0,
                    generated_artifact_count=0,
                    test_file_count=0,
                    error=None,
                )
            },
        )
    ]
    # Snapshot taken after 1 of 3 queries and 2 of 6 runs.
    payload = benchmark._make_progress_payload(
        args=args,
        source_root=Path("D:/repo"),
        strategy_specs=strategy_specs,
        evaluations=evaluations,
        query_index=1,
        total_queries=3,
        run_index=2,
        total_runs=6,
        current_query="embedding backend fastembed local litellm api config",
        current_strategy_key="complete",
    )
    assert payload["status"] == "running"
    assert payload["progress"]["completed_queries"] == 1
    assert payload["progress"]["completed_runs"] == 2
    assert payload["progress"]["total_runs"] == 6
    assert payload["strategy_keys"] == ["auto", "dense_rerank"]
    assert payload["evaluations"][0]["runs"]["auto"]["effective_method"] == "fts"
def test_write_final_outputs_updates_progress_snapshot(tmp_path: Path) -> None:
    """The final payload is mirrored verbatim to both results and progress files."""
    results_file = tmp_path / "results.json"
    snapshot_file = tmp_path / "progress.json"
    payload = {
        "status": "completed",
        "query_count": 1,
        "strategies": {"auto": {"effective_methods": {"fts": 1}}},
    }
    benchmark._write_final_outputs(
        output_path=results_file,
        progress_output=snapshot_file,
        payload=payload,
    )
    for written in (results_file, snapshot_file):
        assert json.loads(written.read_text(encoding="utf-8")) == payload
def test_build_parser_defaults_reranker_gpu_to_disabled() -> None:
    """GPU flags default off and the local ONNX reranker model is preselected."""
    args = benchmark.build_parser().parse_args([])
    assert args.embedding_use_gpu is False
    assert args.reranker_use_gpu is False
    assert args.reranker_model == benchmark.DEFAULT_LOCAL_ONNX_RERANKER_MODEL
def test_build_strategy_runtime_clones_config(monkeypatch, tmp_path: Path) -> None:
    """_build_strategy_runtime must copy the base config, not share it."""

    class _FakeRegistry:
        # Records that the runtime builder called initialize().
        def __init__(self) -> None:
            self.initialized = False

        def initialize(self) -> None:
            self.initialized = True

    class _FakeMapper:
        pass

    class _FakeEngine:
        # Captures the dependencies injected by _build_strategy_runtime.
        def __init__(self, *, registry, mapper, config) -> None:
            self.registry = registry
            self.mapper = mapper
            self.config = config

    # Substitute the heavy project components with the local fakes above.
    monkeypatch.setattr(benchmark, "RegistryStore", _FakeRegistry)
    monkeypatch.setattr(benchmark, "PathMapper", _FakeMapper)
    monkeypatch.setattr(benchmark, "ChainSearchEngine", _FakeEngine)
    base_config = benchmark.Config(data_dir=tmp_path, reranker_use_gpu=False)
    strategy_spec = benchmark.StrategySpec(strategy_key="dense_rerank", strategy="dense_rerank", stage2_mode=None)
    runtime = benchmark._build_strategy_runtime(base_config, strategy_spec)
    assert runtime.strategy_spec == strategy_spec
    # A distinct Config object proves per-strategy cloning (no shared mutation).
    assert runtime.config is not base_config
    assert runtime.config.reranker_use_gpu is False
    assert runtime.registry.initialized is True
    assert runtime.engine.config is runtime.config
def test_run_strategy_routes_auto_keyword_queries_to_fts_search() -> None:
    """Auto strategy with a keyword-style query must use plain FTS search."""
    engine = _FakeEngine(
        search_paths=[
            "D:/repo/src/codexlens/storage/registry.py",
            # Same module under build/: counts as a generated artifact below.
            "D:/repo/build/lib/codexlens/storage/registry.py",
        ]
    )
    config = SimpleNamespace(cascade_strategy="staged", staged_stage2_mode="realtime")
    relevant = {benchmark._normalize_path_key("D:/repo/src/codexlens/storage/registry.py")}
    run = benchmark._run_strategy(
        engine,
        config,
        strategy_spec=benchmark.StrategySpec(strategy_key="auto", strategy="auto", stage2_mode=None),
        # Identifier-like query -> auto routing should pick fts.
        query="find_descendant_project_roots",
        source_path=Path("D:/repo"),
        k=5,
        coarse_k=20,
        relevant=relevant,
    )
    # Exactly one plain search, no cascade dispatch.
    assert len(engine.search_calls) == 1
    assert len(engine.cascade_calls) == 0
    assert run.effective_method == "fts"
    assert run.execution_method == "fts"
    assert run.hit_at_k is True
    assert run.generated_artifact_count == 1
    assert run.test_file_count == 0
def test_run_strategy_uses_cascade_for_dense_rerank_and_restores_config() -> None:
    """dense_rerank dispatches via cascade_search and leaves config untouched."""
    engine = _FakeEngine(cascade_paths=["D:/repo/src/tools/smart-search.ts"])
    config = SimpleNamespace(cascade_strategy="staged", staged_stage2_mode="static_global_graph")
    relevant = {benchmark._normalize_path_key("D:/repo/src/tools/smart-search.ts")}
    run = benchmark._run_strategy(
        engine,
        config,
        strategy_spec=benchmark.StrategySpec(
            strategy_key="dense_rerank",
            strategy="dense_rerank",
            stage2_mode=None,
        ),
        # Natural-language query, but strategy is forced to dense_rerank.
        query="how does smart search route keyword queries",
        source_path=Path("D:/repo"),
        k=5,
        coarse_k=20,
        relevant=relevant,
    )
    # Only the cascade path may be exercised.
    assert len(engine.search_calls) == 0
    assert len(engine.cascade_calls) == 1
    assert engine.cascade_calls[0]["strategy"] == "dense_rerank"
    assert run.effective_method == "dense_rerank"
    assert run.execution_method == "cascade"
    assert run.hit_at_k is True
    # Any temporary strategy overrides must be rolled back afterwards.
    assert config.cascade_strategy == "staged"
    assert config.staged_stage2_mode == "static_global_graph"
def test_summarize_runs_tracks_effective_method_and_artifact_pressure() -> None:
    """_summarize_runs aggregates method counts plus artifact/test-file stats."""
    summary = benchmark._summarize_runs(
        [
            # Hit via fts; one generated artifact in its result list.
            benchmark.StrategyRun(
                strategy_key="auto",
                strategy="auto",
                stage2_mode=None,
                effective_method="fts",
                execution_method="fts",
                latency_ms=10.0,
                topk_paths=["a"],
                first_hit_rank=1,
                hit_at_k=True,
                recall_at_k=1.0,
                generated_artifact_count=1,
                test_file_count=0,
                error=None,
            ),
            # Miss via hybrid; two test files in its result list.
            benchmark.StrategyRun(
                strategy_key="auto",
                strategy="auto",
                stage2_mode=None,
                effective_method="hybrid",
                execution_method="hybrid",
                latency_ms=30.0,
                topk_paths=["b"],
                first_hit_rank=None,
                hit_at_k=False,
                recall_at_k=0.0,
                generated_artifact_count=0,
                test_file_count=2,
                error=None,
            ),
        ]
    )
    assert summary["effective_methods"] == {"fts": 1, "hybrid": 1}
    assert summary["runs_with_generated_artifacts"] == 1
    assert summary["runs_with_test_files"] == 1
    # Averages are over all runs (2): 1/2 artifacts, 2/2 test files.
    assert summary["avg_generated_artifact_count"] == 0.5
    assert summary["avg_test_file_count"] == 1.0

View File

@@ -0,0 +1,83 @@
"""Unit tests for Config .env overrides for final search ranking penalties."""
from __future__ import annotations
import tempfile
from pathlib import Path
import pytest
from codexlens.config import Config
@pytest.fixture
def temp_config_dir() -> Path:
    """Create temporary directory for config data_dir."""
    # ignore_cleanup_errors guards against handles still held at teardown
    # (common on Windows with SQLite files).
    tmpdir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
    yield Path(tmpdir.name)
    try:
        tmpdir.cleanup()
    except (PermissionError, OSError):
        # Best-effort: leave the directory behind rather than fail the test.
        pass
def test_search_penalty_env_overrides_apply(temp_config_dir: Path) -> None:
    """Valid numeric .env values override the ranking penalty defaults."""
    config = Config(data_dir=temp_config_dir)
    (temp_config_dir / ".env").write_text(
        "TEST_FILE_PENALTY=0.25\nGENERATED_FILE_PENALTY=0.4\n",
        encoding="utf-8",
    )
    config.load_settings()
    assert config.test_file_penalty == 0.25
    assert config.generated_file_penalty == 0.4
def test_reranker_gpu_env_override_apply(temp_config_dir: Path) -> None:
    """RERANKER_USE_GPU=false in .env disables the reranker GPU flag."""
    config = Config(data_dir=temp_config_dir)
    (temp_config_dir / ".env").write_text(
        "RERANKER_USE_GPU=false\n",
        encoding="utf-8",
    )
    config.load_settings()
    assert config.reranker_use_gpu is False
def test_search_penalty_env_overrides_invalid_ignored(temp_config_dir: Path) -> None:
    """Unparseable penalty values are ignored, keeping the documented defaults."""
    config = Config(data_dir=temp_config_dir)
    (temp_config_dir / ".env").write_text(
        "TEST_FILE_PENALTY=oops\nGENERATED_FILE_PENALTY=nope\n",
        encoding="utf-8",
    )
    config.load_settings()
    # Defaults survive the bogus overrides; unrelated flags are untouched.
    assert config.test_file_penalty == 0.15
    assert config.generated_file_penalty == 0.35
    assert config.reranker_use_gpu is True

View File

@@ -0,0 +1,204 @@
import gc
import gc
import shutil
import sqlite3
import tempfile
import time
from pathlib import Path
import pytest
import codexlens.cli.embedding_manager as embedding_manager
from codexlens.cli.embedding_manager import get_embedding_stats_summary, get_embeddings_status
@pytest.fixture
def status_temp_dir() -> Path:
    """Yield a scratch directory; retry removal to dodge Windows file locks."""
    temp_path = Path(tempfile.mkdtemp())
    try:
        yield temp_path
    finally:
        # Encourage release of lingering sqlite handles before rmtree.
        gc.collect()
        attempts = 5
        while attempts:
            attempts -= 1
            try:
                if temp_path.exists():
                    shutil.rmtree(temp_path)
            except PermissionError:
                time.sleep(0.1)
            else:
                break
def _create_index_db(index_path: Path, files: list[str], embedded_files: list[str] | None = None) -> None:
    """Create a minimal _index.db with a files table and optional chunks.

    Args:
        index_path: Destination database path; parent dirs are created.
        files: Relative file paths inserted into the ``files`` table.
        embedded_files: When not None, a ``semantic_chunks`` table is created
            and one chunk row is inserted per listed file, marking it embedded.
    """
    index_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(index_path)
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            CREATE TABLE files (
                id INTEGER PRIMARY KEY,
                path TEXT NOT NULL UNIQUE,
                content TEXT,
                language TEXT,
                hash TEXT
            )
            """
        )
        cursor.executemany(
            "INSERT INTO files (path, content, language, hash) VALUES (?, ?, ?, ?)",
            [(file_path, "", "python", f"hash-{idx}") for idx, file_path in enumerate(files)],
        )
        if embedded_files is not None:
            cursor.execute(
                """
                CREATE TABLE semantic_chunks (
                    id INTEGER PRIMARY KEY,
                    file_path TEXT NOT NULL,
                    content TEXT,
                    embedding BLOB,
                    metadata TEXT,
                    category TEXT
                )
                """
            )
            cursor.executemany(
                "INSERT INTO semantic_chunks (file_path, content, embedding, metadata, category) VALUES (?, ?, ?, ?, ?)",
                [(file_path, "chunk", b"vec", "{}", "code") for file_path in embedded_files],
            )
        conn.commit()
    finally:
        # Close explicitly: `with sqlite3.connect(...)` only manages the
        # transaction, not the connection. A leaked handle keeps the file
        # locked on Windows — the reason the fixtures need rmtree retry loops.
        conn.close()
def _create_vectors_meta_db(meta_path: Path, embedded_files: list[str], binary_vector_count: int = 0) -> None:
    """Create a centralized _vectors_meta.db with chunk metadata and binary vectors.

    Args:
        meta_path: Destination database path; parent dirs are created.
        embedded_files: One ``chunk_metadata`` row per path (chunk_id from 1).
        binary_vector_count: Number of placeholder rows in ``binary_vectors``.
    """
    meta_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(meta_path)
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            CREATE TABLE chunk_metadata (
                chunk_id INTEGER PRIMARY KEY,
                file_path TEXT NOT NULL,
                content TEXT,
                start_line INTEGER,
                end_line INTEGER,
                category TEXT,
                metadata TEXT,
                source_index_db TEXT
            )
            """
        )
        cursor.execute(
            """
            CREATE TABLE binary_vectors (
                chunk_id INTEGER PRIMARY KEY,
                vector BLOB NOT NULL
            )
            """
        )
        cursor.executemany(
            """
            INSERT INTO chunk_metadata (
                chunk_id, file_path, content, start_line, end_line, category, metadata, source_index_db
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            [
                (idx, file_path, "chunk", 1, 1, "code", "{}", str(meta_path.parent / "_index.db"))
                for idx, file_path in enumerate(embedded_files, start=1)
            ],
        )
        cursor.executemany(
            "INSERT INTO binary_vectors (chunk_id, vector) VALUES (?, ?)",
            [(idx, b"\x01") for idx in range(1, binary_vector_count + 1)],
        )
        conn.commit()
    finally:
        # Close explicitly: the sqlite3 connection context manager only wraps
        # the transaction; an open handle keeps the DB file locked on Windows.
        conn.close()
def test_root_status_does_not_inherit_child_embeddings(
    monkeypatch: pytest.MonkeyPatch, status_temp_dir: Path
) -> None:
    """Root coverage must stay 0 when only a child index has embeddings."""
    workspace = status_temp_dir / "workspace"
    workspace.mkdir()
    _create_index_db(workspace / "_index.db", ["a.py", "b.py"])
    _create_index_db(workspace / "child" / "_index.db", ["child.py"], embedded_files=["child.py"])
    # Model info exists only for the child index; the root must report None.
    monkeypatch.setattr(
        embedding_manager,
        "_get_model_info_from_index",
        lambda index_path: {
            "model_profile": "fast",
            "model_name": "unit-test-model",
            "embedding_dim": 384,
            "backend": "fastembed",
            "created_at": "2026-03-13T00:00:00Z",
            "updated_at": "2026-03-13T00:00:00Z",
        } if index_path.parent.name == "child" else None,
    )
    status = get_embeddings_status(workspace)
    assert status["success"] is True
    result = status["result"]
    # Root-level figures exclude the child subtree...
    assert result["coverage_percent"] == 0.0
    assert result["files_with_embeddings"] == 0
    assert result["root"]["has_embeddings"] is False
    assert result["model_info"] is None
    # ...while the subtree rollup still reflects the embedded child.
    assert result["subtree"]["indexes_with_embeddings"] == 1
    assert result["subtree"]["coverage_percent"] > 0
def test_root_status_uses_validated_centralized_metadata(status_temp_dir: Path) -> None:
    """A populated central meta DB plus HNSW file marks the root as embedded."""
    workspace = status_temp_dir / "workspace"
    workspace.mkdir()
    _create_index_db(workspace / "_index.db", ["a.py", "b.py"])
    _create_vectors_meta_db(workspace / "_vectors_meta.db", ["a.py"])
    (workspace / "_vectors.hnsw").write_bytes(b"hnsw")

    status = get_embeddings_status(workspace)

    assert status["success"] is True
    result = status["result"]
    # One of two files embedded -> 50% coverage, a single chunk.
    for key, expected in (
        ("coverage_percent", 50.0),
        ("files_with_embeddings", 1),
        ("total_chunks", 1),
    ):
        assert result[key] == expected
    assert result["root"]["has_embeddings"] is True
    assert result["root"]["storage_mode"] == "centralized"
    assert result["centralized"]["dense_ready"] is True
    assert result["centralized"]["usable"] is True
def test_embedding_stats_summary_skips_ignored_artifact_indexes(status_temp_dir: Path) -> None:
    """Indexes under dist/ and .workflow/ are excluded from the stats summary."""
    workspace = status_temp_dir / "workspace"
    workspace.mkdir()
    # Two legitimate indexes plus two artifact-directory indexes.
    _create_index_db(workspace / "_index.db", ["root.py"])
    _create_index_db(workspace / "src" / "_index.db", ["src.py"])
    _create_index_db(workspace / "dist" / "_index.db", ["bundle.py"], embedded_files=["bundle.py"])
    _create_index_db(workspace / ".workflow" / "_index.db", ["trace.py"], embedded_files=["trace.py"])

    summary = get_embedding_stats_summary(workspace)

    assert summary["success"] is True
    result = summary["result"]
    assert result["total_indexes"] == 2
    reported = {
        Path(entry["path"]).relative_to(workspace).as_posix()
        for entry in result["indexes"]
    }
    assert reported == {"_index.db", "src/_index.db"}
def test_root_status_ignores_empty_centralized_artifacts(status_temp_dir: Path) -> None:
    """Zero-row centralized stores must not count as usable embeddings."""
    workspace = status_temp_dir / "workspace"
    workspace.mkdir()
    _create_index_db(workspace / "_index.db", ["a.py", "b.py"])
    # Metadata DB exists but holds no chunk rows; artifact files are stubs.
    _create_vectors_meta_db(workspace / "_vectors_meta.db", [])
    (workspace / "_vectors.hnsw").write_bytes(b"hnsw")
    (workspace / "_binary_vectors.mmap").write_bytes(b"mmap")

    status = get_embeddings_status(workspace)

    assert status["success"] is True
    result = status["result"]
    assert result["coverage_percent"] == 0.0
    assert result["files_with_embeddings"] == 0
    assert result["root"]["has_embeddings"] is False
    centralized = result["centralized"]
    assert centralized["chunk_metadata_rows"] == 0
    assert centralized["binary_vector_rows"] == 0
    assert centralized["usable"] is False

View File

@@ -833,6 +833,36 @@ class TestHybridSearchAdaptiveWeights:
assert captured["weights"]["vector"] > 0.6
def test_default_engine_weights_keep_lsp_graph_backend_available(self):
    """Legacy public defaults should not discard LSP graph fusion weights internally."""
    from unittest.mock import patch
    engine = HybridSearchEngine()
    # One canned result per backend so fusion sees every source.
    results_map = {
        "exact": [SearchResult(path="a.py", score=10.0, excerpt="a")],
        "fuzzy": [SearchResult(path="b.py", score=9.0, excerpt="b")],
        "vector": [SearchResult(path="c.py", score=0.9, excerpt="c")],
        "lsp_graph": [SearchResult(path="d.py", score=0.8, excerpt="d")],
    }
    captured = {}
    from codexlens.search import ranking as ranking_module
    # Wrap RRF so the weights actually handed to fusion can be inspected.
    def capture_rrf(map_in, weights_in, k=60):
        captured["weights"] = dict(weights_in)
        return ranking_module.reciprocal_rank_fusion(map_in, weights_in, k=k)
    with patch.object(HybridSearchEngine, "_search_parallel", return_value=results_map), patch(
        "codexlens.search.hybrid_search.reranking_fusion" if False else "codexlens.search.hybrid_search.reciprocal_rank_fusion",
        side_effect=capture_rrf,
    ):
        engine.search(Path("dummy.db"), "auth flow", enable_vector=True, enable_lsp_graph=True)
    # Public defaults stay unchanged, yet lsp_graph still gets positive weight.
    assert engine.weights == HybridSearchEngine.DEFAULT_WEIGHTS
    assert "lsp_graph" in captured["weights"]
    assert captured["weights"]["lsp_graph"] > 0.0
def test_reranking_enabled(self, tmp_path):
"""Reranking runs only when explicitly enabled via config."""
from unittest.mock import patch

View File

@@ -93,7 +93,8 @@ def test_get_cross_encoder_reranker_uses_factory_backend_onnx_gpu_flag(
enable_reranking=True,
enable_cross_encoder_rerank=True,
reranker_backend="onnx",
embedding_use_gpu=False,
embedding_use_gpu=True,
reranker_use_gpu=False,
)
engine = HybridSearchEngine(config=config)
@@ -109,6 +110,58 @@ def test_get_cross_encoder_reranker_uses_factory_backend_onnx_gpu_flag(
assert get_args["kwargs"]["use_gpu"] is False
def test_get_cross_encoder_reranker_uses_cpu_device_for_legacy_when_reranker_gpu_disabled(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path,
) -> None:
    """Legacy backend must receive device="cpu" when reranker GPU is disabled,
    even while embedding GPU is enabled."""
    calls: dict[str, object] = {}

    def fake_check_reranker_available(backend: str):
        calls["check_backend"] = backend
        return True, None

    # Unique object so identity (not equality) proves the factory result is
    # returned unchanged.
    sentinel = object()

    def fake_get_reranker(*, backend: str, model_name=None, device=None, **kwargs):
        # Capture the factory arguments for the assertions below.
        calls["get_args"] = {
            "backend": backend,
            "model_name": model_name,
            "device": device,
            "kwargs": kwargs,
        }
        return sentinel

    monkeypatch.setattr(
        "codexlens.semantic.reranker.check_reranker_available",
        fake_check_reranker_available,
    )
    monkeypatch.setattr(
        "codexlens.semantic.reranker.get_reranker",
        fake_get_reranker,
    )
    # embedding_use_gpu=True vs reranker_use_gpu=False: the reranker flag wins.
    config = Config(
        data_dir=tmp_path / "legacy-cpu",
        enable_reranking=True,
        enable_cross_encoder_rerank=True,
        reranker_backend="legacy",
        reranker_model="dummy-model",
        embedding_use_gpu=True,
        reranker_use_gpu=False,
    )
    engine = HybridSearchEngine(config=config)
    reranker = engine._get_cross_encoder_reranker()
    assert reranker is sentinel
    assert calls["check_backend"] == "legacy"
    get_args = calls["get_args"]
    assert isinstance(get_args, dict)
    assert get_args["backend"] == "legacy"
    assert get_args["model_name"] == "dummy-model"
    assert get_args["device"] == "cpu"
def test_get_cross_encoder_reranker_returns_none_when_backend_unavailable(
monkeypatch: pytest.MonkeyPatch,
tmp_path,

View File

@@ -150,6 +150,30 @@ class TestHybridSearchBackends:
assert "exact" in backends
assert "vector" in backends
def test_search_lexical_priority_query_skips_vector_backend(self, temp_paths, mock_config):
    """Config/env/factory queries should stay lexical-first in hybrid mode."""
    engine = HybridSearchEngine(config=mock_config)
    index_path = temp_paths / "_index.db"
    with patch.object(engine, "_search_parallel") as mock_parallel:
        mock_parallel.return_value = {
            "exact": [SearchResult(path="config.py", score=10.0, excerpt="exact")],
            "fuzzy": [SearchResult(path="env_config.py", score=8.0, excerpt="fuzzy")],
        }
        results = engine.search(
            index_path,
            "embedding backend fastembed local litellm api config",
            enable_fuzzy=True,
            enable_vector=True,
        )
        assert len(results) >= 1
        # Third positional arg of _search_parallel is the backend list used.
        backends = mock_parallel.call_args[0][2]
        assert "exact" in backends
        assert "fuzzy" in backends
        # Vector is dropped for lexical-priority queries even though enabled.
        assert "vector" not in backends
def test_search_pure_vector(self, temp_paths, mock_config):
"""Pure vector mode should only use vector backend."""
engine = HybridSearchEngine(config=mock_config)
@@ -257,6 +281,39 @@ class TestHybridSearchFusion:
mock_rerank.assert_called_once()
def test_search_lexical_priority_query_skips_expensive_reranking(self, temp_paths, mock_config):
    """Lexical-priority queries should bypass embedder and cross-encoder reranking."""
    # Enable every reranking knob: the routing itself must skip them.
    mock_config.enable_reranking = True
    mock_config.enable_cross_encoder_rerank = True
    mock_config.reranking_top_k = 50
    mock_config.reranker_top_k = 20
    engine = HybridSearchEngine(config=mock_config)
    index_path = temp_paths / "_index.db"
    with patch.object(engine, "_search_parallel") as mock_parallel:
        mock_parallel.return_value = {
            "exact": [SearchResult(path="config.py", score=10.0, excerpt="code")],
            "fuzzy": [SearchResult(path="env_config.py", score=9.0, excerpt="env vars")],
        }
        with patch("codexlens.search.hybrid_search.rerank_results") as mock_rerank, patch(
            "codexlens.search.hybrid_search.cross_encoder_rerank"
        ) as mock_cross_encoder, patch.object(
            engine,
            "_get_cross_encoder_reranker",
        ) as mock_get_reranker:
            results = engine.search(
                index_path,
                "get_reranker factory onnx backend selection",
                enable_fuzzy=True,
                enable_vector=True,
            )
            assert len(results) >= 1
            # None of the reranking machinery may even be constructed.
            mock_rerank.assert_not_called()
            mock_cross_encoder.assert_not_called()
            mock_get_reranker.assert_not_called()
def test_search_category_filtering(self, temp_paths, mock_config):
"""Category filtering should separate code/doc results by intent."""
mock_config.enable_category_filter = True
@@ -316,6 +373,217 @@ class TestSearchParallel:
mock_fuzzy.assert_called_once()
class TestCentralizedMetadataFetch:
    """Tests for centralized metadata retrieval helpers."""

    def test_fetch_from_vector_meta_store_clamps_negative_scores(self, temp_paths, mock_config, monkeypatch):
        """Scores below zero from the meta store must be clamped to 0.0."""
        engine = HybridSearchEngine(config=mock_config)

        class FakeMetaStore:
            # Context-manager stub that returns one canned chunk row.
            def __init__(self, _path):
                pass

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc, tb):
                return False

            def get_chunks_by_ids(self, _chunk_ids, category=None):
                assert category is None
                return [
                    {
                        "chunk_id": 7,
                        "file_path": "src/app.py",
                        "content": "def app(): pass",
                        "metadata": {},
                        "start_line": 1,
                        "end_line": 1,
                    }
                ]

        import codexlens.storage.vector_meta_store as vector_meta_store
        monkeypatch.setattr(vector_meta_store, "VectorMetadataStore", FakeMetaStore)
        # Chunk 7 carries a slightly negative similarity score on input.
        results = engine._fetch_from_vector_meta_store(
            temp_paths / "_vectors_meta.db",
            [7],
            {7: -0.01},
        )
        assert len(results) == 1
        assert results[0].path == "src/app.py"
        # Negative score clamped to zero rather than propagated.
        assert results[0].score == 0.0
class TestCentralizedVectorCaching:
    """Tests for centralized vector search runtime caches."""

    def test_search_vector_centralized_reuses_cached_resources(
        self,
        temp_paths,
        mock_config,
    ):
        """Store/embedder/ANN index are created once and reused across index DBs."""
        engine = HybridSearchEngine(config=mock_config)
        hnsw_path = temp_paths / "_vectors.hnsw"
        hnsw_path.write_bytes(b"hnsw")
        vector_store_opened: List[Path] = []

        class FakeVectorStore:
            # Records which path the store was opened with.
            def __init__(self, path):
                vector_store_opened.append(Path(path))

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc, tb):
                return False

            def get_model_config(self):
                return {
                    "backend": "fastembed",
                    "model_name": "BAAI/bge-small-en-v1.5",
                    "model_profile": "fast",
                    "embedding_dim": 384,
                }

        class FakeEmbedder:
            embedding_dim = 384

            def __init__(self):
                self.embed_calls: List[str] = []

            def embed_single(self, query):
                self.embed_calls.append(query)
                return [0.1, 0.2, 0.3]

        class FakeAnnIndex:
            # Counts load()/search() invocations to prove caching works.
            def __init__(self):
                self.load_calls = 0
                self.search_calls = 0

            def load(self):
                self.load_calls += 1
                return True

            def count(self):
                return 3

            def search(self, _query_vec, top_k):
                self.search_calls += 1
                # With limit=5 the engine asks the ANN for top_k=10
                # (presumably limit*2 over-fetch — confirm in engine code).
                assert top_k == 10
                return [7], [0.2]

        fake_embedder = FakeEmbedder()
        fake_ann_index = FakeAnnIndex()
        with patch("codexlens.semantic.vector_store.VectorStore", FakeVectorStore), patch(
            "codexlens.semantic.factory.get_embedder",
            return_value=fake_embedder,
        ) as mock_get_embedder, patch(
            "codexlens.semantic.ann_index.ANNIndex.create_central",
            return_value=fake_ann_index,
        ) as mock_create_central, patch.object(
            engine,
            "_fetch_chunks_by_ids_centralized",
            return_value=[SearchResult(path="src/app.py", score=0.8, excerpt="hit")],
        ) as mock_fetch:
            # Two searches from different child index DBs against the same
            # central HNSW file.
            first = engine._search_vector_centralized(
                temp_paths / "child-a" / "_index.db",
                hnsw_path,
                "smart search routing",
                limit=5,
            )
            second = engine._search_vector_centralized(
                temp_paths / "child-b" / "_index.db",
                hnsw_path,
                "smart search routing",
                limit=5,
            )
            assert [result.path for result in first] == ["src/app.py"]
            assert [result.path for result in second] == ["src/app.py"]
            # Store opened once, against the root _index.db next to the HNSW.
            assert vector_store_opened == [temp_paths / "_index.db"]
            # Embedder and ANN index constructed/loaded exactly once...
            assert mock_get_embedder.call_count == 1
            assert mock_create_central.call_count == 1
            assert fake_ann_index.load_calls == 1
            assert fake_embedder.embed_calls == ["smart search routing"]
            # ...while search and metadata fetch still ran per call.
            assert fake_ann_index.search_calls == 2
            assert mock_fetch.call_count == 2

    def test_search_vector_centralized_respects_embedding_use_gpu(
        self,
        temp_paths,
        mock_config,
    ):
        """get_embedder must receive use_gpu=False from config (plus the
        backend/profile advertised by the vector store)."""
        engine = HybridSearchEngine(config=mock_config)
        hnsw_path = temp_paths / "_vectors.hnsw"
        hnsw_path.write_bytes(b"hnsw")

        class FakeVectorStore:
            def __init__(self, _path):
                pass

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc, tb):
                return False

            def get_model_config(self):
                return {
                    "backend": "fastembed",
                    "model_name": "BAAI/bge-small-en-v1.5",
                    "model_profile": "code",
                    "embedding_dim": 384,
                }

        class FakeEmbedder:
            embedding_dim = 384

            def embed_single(self, _query):
                return [0.1, 0.2]

        class FakeAnnIndex:
            def load(self):
                return True

            def count(self):
                return 1

            def search(self, _query_vec, top_k):
                # limit=3 -> top_k=6 requested from the ANN.
                assert top_k == 6
                return [9], [0.1]

        with patch("codexlens.semantic.vector_store.VectorStore", FakeVectorStore), patch(
            "codexlens.semantic.factory.get_embedder",
            return_value=FakeEmbedder(),
        ) as mock_get_embedder, patch(
            "codexlens.semantic.ann_index.ANNIndex.create_central",
            return_value=FakeAnnIndex(),
        ), patch.object(
            engine,
            "_fetch_chunks_by_ids_centralized",
            return_value=[SearchResult(path="src/app.py", score=0.9, excerpt="hit")],
        ):
            results = engine._search_vector_centralized(
                temp_paths / "_index.db",
                hnsw_path,
                "semantic query",
                limit=3,
            )
            assert len(results) == 1
            assert mock_get_embedder.call_count == 1
            assert mock_get_embedder.call_args.kwargs == {
                "backend": "fastembed",
                "profile": "code",
                "use_gpu": False,
            }
# =============================================================================
# Tests: _search_lsp_graph
# =============================================================================

View File

@@ -0,0 +1,674 @@
import json
from typer.testing import CliRunner
import codexlens.cli.commands as commands
from codexlens.cli.commands import app
import codexlens.cli.embedding_manager as embedding_manager
from codexlens.config import Config
from codexlens.entities import SearchResult
from codexlens.search.chain_search import ChainSearchResult, SearchStats
def test_index_status_json_preserves_legacy_embeddings_contract(
    monkeypatch,
    tmp_path,
) -> None:
    """`index status --json` must keep legacy embeddings keys alongside the
    newer status payload, so existing consumers do not break."""
    workspace = tmp_path / "workspace"
    workspace.mkdir()
    (workspace / "_index.db").touch()
    # Shape of the old get_embedding_stats_summary contract.
    legacy_summary = {
        "total_indexes": 3,
        "indexes_with_embeddings": 1,
        "total_chunks": 42,
        "indexes": [
            {
                "project": "child",
                "path": str(workspace / "child" / "_index.db"),
                "has_embeddings": True,
                "total_chunks": 42,
                "total_files": 1,
                "coverage_percent": 100.0,
            }
        ],
    }
    # Shape of the newer get_embeddings_status contract.
    root_status = {
        "total_indexes": 3,
        "total_files": 2,
        "files_with_embeddings": 0,
        "files_without_embeddings": 2,
        "total_chunks": 0,
        "coverage_percent": 0.0,
        "indexes_with_embeddings": 1,
        "indexes_without_embeddings": 2,
        "model_info": None,
        "root": {
            "index_path": str(workspace / "_index.db"),
            "exists": False,
            "total_files": 2,
            "files_with_embeddings": 0,
            "files_without_embeddings": 2,
            "total_chunks": 0,
            "coverage_percent": 0.0,
            "has_embeddings": False,
            "storage_mode": "none",
        },
        "subtree": {
            "total_indexes": 3,
            "total_files": 3,
            "files_with_embeddings": 1,
            "files_without_embeddings": 2,
            "total_chunks": 42,
            "coverage_percent": 33.3,
            "indexes_with_embeddings": 1,
            "indexes_without_embeddings": 2,
        },
        "centralized": {
            "dense_index_exists": False,
            "binary_index_exists": False,
            "dense_ready": False,
            "binary_ready": False,
            "usable": False,
            "chunk_metadata_rows": 0,
            "binary_vector_rows": 0,
            "files_with_embeddings": 0,
        },
    }
    # Stub both embedding info providers with the canned payloads above.
    monkeypatch.setattr(
        embedding_manager,
        "get_embeddings_status",
        lambda _index_root: {"success": True, "result": root_status},
    )
    monkeypatch.setattr(
        embedding_manager,
        "get_embedding_stats_summary",
        lambda _index_root: {"success": True, "result": legacy_summary},
    )
    # Fake registry/mapper so the command needs no real index infrastructure.
    monkeypatch.setattr(
        commands,
        "RegistryStore",
        type(
            "FakeRegistryStore",
            (),
            {
                "initialize": lambda self: None,
                "close": lambda self: None,
            },
        ),
    )
    monkeypatch.setattr(
        commands,
        "PathMapper",
        type(
            "FakePathMapper",
            (),
            {
                "source_to_index_db": lambda self, _target_path: workspace / "_index.db",
            },
        ),
    )
    runner = CliRunner()
    result = runner.invoke(app, ["index", "status", str(workspace), "--json"])
    assert result.exit_code == 0, result.output
    payload = json.loads(result.stdout)
    body = payload["result"]
    # Legacy keys mirror the summary; new keys carry the root status.
    assert body["embeddings"] == legacy_summary
    assert body["embeddings_error"] is None
    assert body["embeddings_status"] == root_status
    assert body["embeddings_status_error"] is None
    assert body["embeddings_summary"] == legacy_summary
def test_search_json_preserves_dense_rerank_method_label(
    monkeypatch,
    tmp_path,
) -> None:
    """`--method dense_rerank` must dispatch via cascade_search and keep its label."""
    workspace = tmp_path / "workspace"
    workspace.mkdir()

    engine_result = ChainSearchResult(
        query="greet function",
        results=[
            SearchResult(
                path=str(workspace / "src" / "app.py"),
                score=0.97,
                excerpt="def greet(name):",
                content="def greet(name):\n return f'hello {name}'\n",
            )
        ],
        symbols=[],
        stats=SearchStats(dirs_searched=2, files_matched=1, time_ms=12.5),
    )

    recorded: dict[str, object] = {}
    monkeypatch.setattr(
        commands.Config, "load", staticmethod(lambda: Config(data_dir=tmp_path / "data"))
    )
    monkeypatch.setattr(
        commands,
        "RegistryStore",
        type("FakeRegistryStore", (), {"initialize": lambda self: None, "close": lambda self: None}),
    )
    monkeypatch.setattr(commands, "PathMapper", type("FakePathMapper", (), {}))

    class FakeChainSearchEngine:
        def __init__(self, registry, mapper, config=None):
            recorded["registry"] = registry
            recorded["mapper"] = mapper
            recorded["config"] = config

        def search(self, *_args, **_kwargs):
            raise AssertionError("dense_rerank should dispatch via cascade_search")

        def cascade_search(self, query, source_path, k=10, options=None, strategy=None):
            recorded["query"] = query
            recorded["source_path"] = source_path
            recorded["limit"] = k
            recorded["options"] = options
            recorded["strategy"] = strategy
            return engine_result

    monkeypatch.setattr(commands, "ChainSearchEngine", FakeChainSearchEngine)

    outcome = CliRunner().invoke(
        app,
        ["search", "greet function", "--path", str(workspace), "--method", "dense_rerank", "--json"],
    )

    assert outcome.exit_code == 0, outcome.output
    body = json.loads(outcome.stdout)["result"]
    assert body["method"] == "dense_rerank"
    assert body["count"] == 1
    assert body["results"][0]["path"] == str(workspace / "src" / "app.py")
    # The CLI must forward the strategy name and its default result budget.
    assert recorded["strategy"] == "dense_rerank"
    assert recorded["limit"] == 20
def test_search_json_auto_routes_keyword_queries_to_fts(
    monkeypatch,
    tmp_path,
) -> None:
    """Auto method selection should send identifier-like queries to plain FTS."""
    workspace = tmp_path / "workspace"
    workspace.mkdir()

    engine_result = ChainSearchResult(
        query="windowsHide",
        results=[
            SearchResult(
                path=str(workspace / "src" / "spawn.ts"),
                score=0.91,
                excerpt="windowsHide: true",
                content="spawn('node', [], { windowsHide: true })",
            )
        ],
        symbols=[],
        stats=SearchStats(dirs_searched=2, files_matched=1, time_ms=8.0),
    )

    seen: dict[str, object] = {}
    monkeypatch.setattr(
        commands.Config, "load", staticmethod(lambda: Config(data_dir=tmp_path / "data"))
    )
    monkeypatch.setattr(
        commands,
        "RegistryStore",
        type("FakeRegistryStore", (), {"initialize": lambda self: None, "close": lambda self: None}),
    )
    monkeypatch.setattr(commands, "PathMapper", type("FakePathMapper", (), {}))

    class FakeChainSearchEngine:
        def __init__(self, registry, mapper, config=None):
            seen["config"] = config

        def search(self, query, source_path, options=None):
            seen.update(query=query, source_path=source_path, options=options)
            return engine_result

        def cascade_search(self, *_args, **_kwargs):
            raise AssertionError("auto keyword queries should not dispatch to cascade_search")

    monkeypatch.setattr(commands, "ChainSearchEngine", FakeChainSearchEngine)

    outcome = CliRunner().invoke(
        app,
        ["search", "windowsHide", "--path", str(workspace), "--json"],
    )

    assert outcome.exit_code == 0, outcome.output
    body = json.loads(outcome.stdout)["result"]
    assert body["method"] == "fts"
    # FTS routing disables the vector and hybrid stages entirely.
    assert seen["options"].enable_vector is False
    assert seen["options"].hybrid_mode is False
def test_search_json_auto_routes_mixed_queries_to_hybrid(
    monkeypatch,
    tmp_path,
) -> None:
    """Mixed natural-language + identifier queries should auto-route to hybrid."""
    workspace = tmp_path / "workspace"
    workspace.mkdir()

    engine_result = ChainSearchResult(
        query="how does my_function work",
        results=[
            SearchResult(
                path=str(workspace / "src" / "app.py"),
                score=0.81,
                excerpt="def my_function():",
                content="def my_function():\n return 1\n",
            )
        ],
        symbols=[],
        stats=SearchStats(dirs_searched=2, files_matched=1, time_ms=10.0),
    )

    observed: dict[str, object] = {}
    monkeypatch.setattr(
        commands.Config, "load", staticmethod(lambda: Config(data_dir=tmp_path / "data"))
    )
    monkeypatch.setattr(
        commands,
        "RegistryStore",
        type("FakeRegistryStore", (), {"initialize": lambda self: None, "close": lambda self: None}),
    )
    monkeypatch.setattr(commands, "PathMapper", type("FakePathMapper", (), {}))

    class FakeChainSearchEngine:
        def __init__(self, registry, mapper, config=None):
            observed["config"] = config

        def search(self, query, source_path, options=None):
            observed.update(query=query, source_path=source_path, options=options)
            return engine_result

        def cascade_search(self, *_args, **_kwargs):
            raise AssertionError("mixed auto queries should not dispatch to cascade_search")

    monkeypatch.setattr(commands, "ChainSearchEngine", FakeChainSearchEngine)

    outcome = CliRunner().invoke(
        app,
        ["search", "how does my_function work", "--path", str(workspace), "--json"],
    )

    assert outcome.exit_code == 0, outcome.output
    body = json.loads(outcome.stdout)["result"]
    assert body["method"] == "hybrid"
    # Hybrid enables vectors but must not escalate into the cascade path.
    assert observed["options"].enable_vector is True
    assert observed["options"].hybrid_mode is True
    assert observed["options"].enable_cascade is False
def test_search_json_auto_routes_generated_artifact_queries_to_fts(
    monkeypatch,
    tmp_path,
) -> None:
    """Queries naming build artifacts (dist/bundle) auto-route to plain FTS."""
    workspace = tmp_path / "workspace"
    workspace.mkdir()

    engine_result = ChainSearchResult(
        query="dist bundle output",
        results=[
            SearchResult(
                path=str(workspace / "dist" / "bundle.js"),
                score=0.77,
                excerpt="bundle output",
                content="console.log('bundle')",
            )
        ],
        symbols=[],
        stats=SearchStats(dirs_searched=2, files_matched=1, time_ms=9.0),
    )

    seen: dict[str, object] = {}
    monkeypatch.setattr(
        commands.Config, "load", staticmethod(lambda: Config(data_dir=tmp_path / "data"))
    )
    monkeypatch.setattr(
        commands,
        "RegistryStore",
        type("FakeRegistryStore", (), {"initialize": lambda self: None, "close": lambda self: None}),
    )
    monkeypatch.setattr(commands, "PathMapper", type("FakePathMapper", (), {}))

    class FakeChainSearchEngine:
        def __init__(self, registry, mapper, config=None):
            seen["config"] = config

        def search(self, query, source_path, options=None):
            seen.update(query=query, source_path=source_path, options=options)
            return engine_result

        def cascade_search(self, *_args, **_kwargs):
            raise AssertionError("generated artifact auto queries should not dispatch to cascade_search")

    monkeypatch.setattr(commands, "ChainSearchEngine", FakeChainSearchEngine)

    outcome = CliRunner().invoke(
        app,
        ["search", "dist bundle output", "--path", str(workspace), "--json"],
    )

    assert outcome.exit_code == 0, outcome.output
    body = json.loads(outcome.stdout)["result"]
    assert body["method"] == "fts"
    assert seen["options"].enable_vector is False
    assert seen["options"].hybrid_mode is False
def test_auto_select_search_method_prefers_fts_for_lexical_config_queries() -> None:
    """Config/factory phrasing routes to FTS; conceptual questions stay dense."""
    lexical_queries = (
        "embedding backend fastembed local litellm api config",
        "get_reranker factory onnx backend selection",
    )
    for query in lexical_queries:
        assert commands._auto_select_search_method(query) == "fts"
    assert commands._auto_select_search_method("how to authenticate users safely?") == "dense_rerank"
def test_search_json_fts_zero_results_uses_filesystem_fallback(
    monkeypatch,
    tmp_path,
) -> None:
    """An empty indexed FTS result should trigger the ripgrep filesystem fallback."""
    workspace = tmp_path / "workspace"
    workspace.mkdir()

    empty_indexed = ChainSearchResult(
        query="find_descendant_project_roots",
        results=[],
        symbols=[],
        stats=SearchStats(dirs_searched=3, files_matched=0, time_ms=7.5),
    )
    fallback_hit = SearchResult(
        path=str(workspace / "src" / "registry.py"),
        score=1.0,
        excerpt="def find_descendant_project_roots(...):",
        content=None,
        metadata={
            "filesystem_fallback": True,
            "backend": "ripgrep-fallback",
            "stale_index_suspected": True,
        },
        start_line=12,
        end_line=12,
    )

    observed: dict[str, object] = {"fallback_calls": 0}
    monkeypatch.setattr(
        commands.Config, "load", staticmethod(lambda: Config(data_dir=tmp_path / "data"))
    )
    monkeypatch.setattr(
        commands,
        "RegistryStore",
        type("FakeRegistryStore", (), {"initialize": lambda self: None, "close": lambda self: None}),
    )
    monkeypatch.setattr(commands, "PathMapper", type("FakePathMapper", (), {}))

    class FakeChainSearchEngine:
        def __init__(self, registry, mapper, config=None):
            observed["config"] = config

        def search(self, query, source_path, options=None):
            observed.update(query=query, source_path=source_path, options=options)
            return empty_indexed

        def cascade_search(self, *_args, **_kwargs):
            raise AssertionError("fts zero-result queries should not dispatch to cascade_search")

    def fake_fallback(query, source_path, *, limit, config, code_only=False, exclude_extensions=None):
        observed["fallback_calls"] = int(observed["fallback_calls"]) + 1
        observed["fallback_query"] = query
        observed["fallback_path"] = source_path
        observed["fallback_limit"] = limit
        observed["fallback_code_only"] = code_only
        observed["fallback_exclude_extensions"] = exclude_extensions
        return {
            "results": [fallback_hit],
            "time_ms": 2.5,
            "fallback": {
                "backend": "ripgrep-fallback",
                "stale_index_suspected": True,
                "reason": "Indexed FTS search returned no results; filesystem fallback used.",
            },
        }

    monkeypatch.setattr(commands, "ChainSearchEngine", FakeChainSearchEngine)
    monkeypatch.setattr(commands, "_filesystem_fallback_search", fake_fallback)

    outcome = CliRunner().invoke(
        app,
        ["search", "find_descendant_project_roots", "--method", "fts", "--path", str(workspace), "--json"],
    )

    assert outcome.exit_code == 0, outcome.output
    body = json.loads(outcome.stdout)["result"]
    assert body["method"] == "fts"
    assert body["count"] == 1
    assert body["results"][0]["path"] == str(workspace / "src" / "registry.py")
    assert body["results"][0]["excerpt"] == "def find_descendant_project_roots(...):"
    assert body["stats"]["files_matched"] == 1
    # Indexed time (7.5) plus fallback time (2.5) should be merged into stats.
    assert body["stats"]["time_ms"] == 10.0
    assert body["fallback"] == {
        "backend": "ripgrep-fallback",
        "stale_index_suspected": True,
        "reason": "Indexed FTS search returned no results; filesystem fallback used.",
    }
    assert observed["fallback_calls"] == 1
    assert observed["fallback_query"] == "find_descendant_project_roots"
    assert observed["fallback_path"] == workspace
    assert observed["fallback_limit"] == 20
    assert observed["options"].enable_vector is False
    assert observed["options"].hybrid_mode is False
def test_search_json_hybrid_zero_results_does_not_use_filesystem_fallback(
    monkeypatch,
    tmp_path,
) -> None:
    """Hybrid searches with zero hits must NOT fall back to the filesystem scan."""
    workspace = tmp_path / "workspace"
    workspace.mkdir()

    empty_indexed = ChainSearchResult(
        query="how does my_function work",
        results=[],
        symbols=[],
        stats=SearchStats(dirs_searched=4, files_matched=0, time_ms=11.0),
    )

    observed: dict[str, object] = {"fallback_calls": 0}
    monkeypatch.setattr(
        commands.Config, "load", staticmethod(lambda: Config(data_dir=tmp_path / "data"))
    )
    monkeypatch.setattr(
        commands,
        "RegistryStore",
        type("FakeRegistryStore", (), {"initialize": lambda self: None, "close": lambda self: None}),
    )
    monkeypatch.setattr(commands, "PathMapper", type("FakePathMapper", (), {}))

    class FakeChainSearchEngine:
        def __init__(self, registry, mapper, config=None):
            observed["config"] = config

        def search(self, query, source_path, options=None):
            observed.update(query=query, source_path=source_path, options=options)
            return empty_indexed

        def cascade_search(self, *_args, **_kwargs):
            raise AssertionError("hybrid queries should not dispatch to cascade_search")

    def fake_fallback(*_args, **_kwargs):
        observed["fallback_calls"] = int(observed["fallback_calls"]) + 1
        return None

    monkeypatch.setattr(commands, "ChainSearchEngine", FakeChainSearchEngine)
    monkeypatch.setattr(commands, "_filesystem_fallback_search", fake_fallback)

    outcome = CliRunner().invoke(
        app,
        ["search", "how does my_function work", "--path", str(workspace), "--json"],
    )

    assert outcome.exit_code == 0, outcome.output
    body = json.loads(outcome.stdout)["result"]
    assert body["method"] == "hybrid"
    assert body["count"] == 0
    # No fallback key and no fallback invocation: hybrid zero-hit is a final answer.
    assert "fallback" not in body
    assert body["stats"]["files_matched"] == 0
    assert body["stats"]["time_ms"] == 11.0
    assert observed["fallback_calls"] == 0
    assert observed["options"].enable_vector is True
    assert observed["options"].hybrid_mode is True
def test_filesystem_fallback_search_prefers_source_definitions_for_keyword_queries(
    monkeypatch,
    tmp_path,
) -> None:
    """Definition hits outrank references, which outrank test files, in the rg fallback."""
    workspace = tmp_path / "workspace"
    workspace.mkdir()
    source_path = workspace / "src" / "registry.py"
    test_path = workspace / "tests" / "test_registry.py"
    ref_path = workspace / "src" / "chain_search.py"

    def rg_match(path, text, line_number):
        # Shape mirrors ripgrep's --json "match" event records.
        return {
            "type": "match",
            "data": {
                "path": {"text": str(path)},
                "lines": {"text": text},
                "line_number": line_number,
            },
        }

    match_lines = [
        rg_match(test_path, "def test_find_descendant_project_roots_returns_nested_project_roots():\n", 12),
        rg_match(source_path, "def find_descendant_project_roots(self, source_root: Path) -> List[DirMapping]:\n", 48),
        rg_match(ref_path, "descendant_roots = self.registry.find_descendant_project_roots(source_root)\n", 91),
    ]
    stdout_payload = "\n".join(json.dumps(line) for line in match_lines)
    fake_proc = type(
        "FakeCompletedProcess",
        (),
        {"returncode": 0, "stdout": stdout_payload, "stderr": ""},
    )()

    monkeypatch.setattr(commands.shutil, "which", lambda _name: "rg")
    monkeypatch.setattr(commands.subprocess, "run", lambda *_args, **_kwargs: fake_proc)

    fallback = commands._filesystem_fallback_search(
        "find_descendant_project_roots",
        workspace,
        limit=5,
        config=Config(data_dir=tmp_path / "data"),
    )

    assert fallback is not None
    assert fallback["fallback"]["backend"] == "ripgrep-fallback"
    # Expected ordering: definition first, then reference, then the test file.
    ordered_paths = [hit.path for hit in fallback["results"][:3]]
    assert ordered_paths == [str(source_path), str(ref_path), str(test_path)]
    first, second, third = fallback["results"][:3]
    assert first.score > second.score > third.score
def test_clean_json_reports_partial_success_when_locked_files_remain(
    monkeypatch,
    tmp_path,
) -> None:
    """`clean --json` should report partial=True when locked index files survive."""
    workspace = tmp_path / "workspace"
    project_index = tmp_path / "indexes" / "workspace"
    project_index.mkdir(parents=True)
    (project_index / "_index.db").write_text("db", encoding="utf-8")
    locked_path = project_index / "nested" / "_index.db"
    locked_path.parent.mkdir(parents=True)
    locked_path.write_text("locked", encoding="utf-8")

    recorded: dict[str, object] = {}

    class FakePathMapper:
        def __init__(self):
            self.index_root = tmp_path / "indexes"

        def source_to_index_dir(self, source_path):
            recorded["mapped_source"] = source_path
            return project_index

    class FakeRegistryStore:
        def initialize(self):
            recorded["registry_initialized"] = True

        def unregister_project(self, source_path):
            recorded["unregistered_project"] = source_path
            return True

        def close(self):
            recorded["registry_closed"] = True

    def fake_remove_tree(target):
        # Simulate a removal blocked by an OS-level file lock.
        recorded["removed_target"] = target
        return {
            "removed": False,
            "partial": True,
            "locked_paths": [str(locked_path)],
            "remaining_path": str(project_index),
            "errors": [],
        }

    monkeypatch.setattr(commands, "PathMapper", FakePathMapper)
    monkeypatch.setattr(commands, "RegistryStore", FakeRegistryStore)
    monkeypatch.setattr(commands, "_remove_tree_best_effort", fake_remove_tree)

    outcome = CliRunner().invoke(app, ["clean", str(workspace), "--json"])

    assert outcome.exit_code == 0, outcome.output
    payload = json.loads(outcome.stdout)
    body = payload["result"]
    assert payload["success"] is True
    assert body["cleaned"] == str(workspace.resolve())
    assert body["index_path"] == str(project_index)
    assert body["partial"] is True
    assert body["locked_paths"] == [str(locked_path)]
    assert body["remaining_path"] == str(project_index)
    assert recorded["registry_initialized"] is True
    assert recorded["registry_closed"] is True
    assert recorded["unregistered_project"] == workspace.resolve()
    assert recorded["removed_target"] == project_index

View File

@@ -5,7 +5,10 @@ from pathlib import Path
from unittest.mock import MagicMock
from codexlens.config import Config
from codexlens.storage.index_tree import IndexTreeBuilder
from codexlens.storage.dir_index import DirIndexStore
from codexlens.storage.index_tree import DirBuildResult, IndexTreeBuilder
from codexlens.storage.path_mapper import PathMapper
from codexlens.storage.registry import RegistryStore
def _relative_dirs(source_root: Path, dirs_by_depth: dict[int, list[Path]]) -> set[str]:
@@ -145,3 +148,148 @@ def test_builder_loads_saved_ignore_and_extension_filters_by_default(tmp_path: P
assert [path.name for path in source_files] == ["app.ts"]
assert "frontend/dist" not in discovered_dirs
def test_prune_stale_project_dirs_removes_ignored_artifact_mappings(tmp_path: Path) -> None:
    """Registry mappings for ignored artifact dirs (dist/) are pruned from the project."""
    workspace = tmp_path / "workspace"
    src_dir = workspace / "src"
    dist_dir = workspace / "dist"
    for directory in (src_dir, dist_dir):
        directory.mkdir(parents=True)
    (src_dir / "app.py").write_text("print('ok')\n", encoding="utf-8")
    (dist_dir / "bundle.py").write_text("print('artifact')\n", encoding="utf-8")

    mapper = PathMapper(index_root=tmp_path / "indexes")
    registry = RegistryStore(db_path=tmp_path / "registry.db")
    registry.initialize()
    project = registry.register_project(workspace, mapper.source_to_index_dir(workspace))
    # Register all three dirs, including the soon-to-be-ignored dist/ mapping.
    for depth, directory in ((0, workspace), (1, src_dir), (1, dist_dir)):
        registry.register_dir(project.id, directory, mapper.source_to_index_db(directory), depth=depth)

    builder = IndexTreeBuilder(
        registry=registry,
        mapper=mapper,
        config=Config(data_dir=tmp_path / "data"),
        incremental=False,
    )
    pruned = builder._prune_stale_project_dirs(
        project_id=project.id,
        source_root=workspace,
        dirs_by_depth=builder._collect_dirs_by_depth(workspace),
    )
    remaining = {mapping.source_path.resolve() for mapping in registry.get_project_dirs(project.id)}
    registry.close()

    assert dist_dir.resolve() in pruned
    assert workspace.resolve() in remaining
    assert src_dir.resolve() in remaining
    assert dist_dir.resolve() not in remaining
def test_force_full_build_prunes_stale_ignored_mappings(tmp_path: Path) -> None:
    """A force-full build drops registry mappings for dirs that are now ignored."""
    workspace = tmp_path / "workspace"
    src_dir = workspace / "src"
    dist_dir = workspace / "dist"
    for directory in (src_dir, dist_dir):
        directory.mkdir(parents=True)
    (src_dir / "app.py").write_text("print('ok')\n", encoding="utf-8")
    (dist_dir / "bundle.py").write_text("print('artifact')\n", encoding="utf-8")

    mapper = PathMapper(index_root=tmp_path / "indexes")
    registry = RegistryStore(db_path=tmp_path / "registry.db")
    registry.initialize()
    project = registry.register_project(workspace, mapper.source_to_index_dir(workspace))
    registry.register_dir(project.id, workspace, mapper.source_to_index_db(workspace), depth=0)
    # Stale mapping: dist/ is registered but will be ignored by the new build.
    registry.register_dir(project.id, dist_dir, mapper.source_to_index_db(dist_dir), depth=1)

    builder = IndexTreeBuilder(
        registry=registry,
        mapper=mapper,
        config=Config(
            data_dir=tmp_path / "data",
            global_symbol_index_enabled=False,
        ),
        incremental=False,
    )

    def fake_build_level_parallel(
        dirs: list[Path],
        languages,
        workers,
        *,
        source_root: Path,
        project_id: int,
        global_index_db_path: Path,
    ) -> list[DirBuildResult]:
        # Stub out the real per-directory indexing; only src/ "contains" a file.
        built = []
        for dir_path in dirs:
            built.append(
                DirBuildResult(
                    source_path=dir_path,
                    index_path=mapper.source_to_index_db(dir_path),
                    files_count=1 if dir_path == src_dir else 0,
                    symbols_count=0,
                    subdirs=[],
                )
            )
        return built

    builder._build_level_parallel = fake_build_level_parallel  # type: ignore[method-assign]
    builder._link_children_to_parent = MagicMock()

    build_result = builder.build(workspace, force_full=True, workers=1)
    remaining = {mapping.source_path.resolve() for mapping in registry.get_project_dirs(project.id)}
    registry.close()

    assert build_result.total_dirs == 2
    assert workspace.resolve() in remaining
    assert src_dir.resolve() in remaining
    assert dist_dir.resolve() not in remaining
def test_force_full_build_rewrites_directory_db_and_drops_stale_ignored_subdirs(
    tmp_path: Path,
) -> None:
    """Force-full rebuild rewrites the root dir DB so stale ignored subdir links vanish."""
    project_root = tmp_path / "project"
    src_dir = project_root / "src"
    build_dir = project_root / "build"
    for directory in (src_dir, build_dir):
        directory.mkdir(parents=True)
    (src_dir / "app.py").write_text("print('ok')\n", encoding="utf-8")
    (build_dir / "generated.py").write_text("print('artifact')\n", encoding="utf-8")

    mapper = PathMapper(index_root=tmp_path / "indexes")
    registry = RegistryStore(db_path=tmp_path / "registry.db")
    registry.initialize()
    config = Config(
        data_dir=tmp_path / "data",
        global_symbol_index_enabled=False,
    )

    # Seed the root index DB with a stale link to the soon-to-be-ignored build dir.
    root_index_db = mapper.source_to_index_db(project_root)
    with DirIndexStore(root_index_db, config=config) as store:
        store.register_subdir(
            name="build",
            index_path=mapper.source_to_index_db(build_dir),
            files_count=1,
        )

    builder = IndexTreeBuilder(
        registry=registry,
        mapper=mapper,
        config=config,
        incremental=False,
    )
    build_result = builder.build(project_root, force_full=True, workers=1)

    with DirIndexStore(root_index_db, config=config) as store:
        subdir_names = [link.name for link in store.get_subdirs()]
    registry.close()

    assert build_result.total_dirs == 2
    assert subdir_names == ["src"]

View File

@@ -24,13 +24,24 @@ from codexlens.entities import SearchResult
from codexlens.search.ranking import (
DEFAULT_WEIGHTS,
QueryIntent,
apply_path_penalties,
extract_explicit_path_hints,
cross_encoder_rerank,
adjust_weights_by_intent,
apply_symbol_boost,
detect_query_intent,
filter_results_by_category,
get_rrf_weights,
group_similar_results,
is_auxiliary_reference_path,
is_generated_artifact_path,
is_test_file,
normalize_weights,
query_prefers_lexical_search,
query_targets_auxiliary_files,
query_targets_generated_files,
query_targets_test_files,
rebalance_noisy_results,
reciprocal_rank_fusion,
simple_weighted_fusion,
)
@@ -73,6 +84,7 @@ class TestDetectQueryIntent:
def test_detect_keyword_intent(self):
    """Identifier-shaped queries (CamelCase, camelCase, snake_case, ::) classify as KEYWORD."""
    for query in ("MyClassName", "windowsHide", "my_function_name", "foo::bar"):
        assert detect_query_intent(query) == QueryIntent.KEYWORD
@@ -91,6 +103,25 @@ class TestDetectQueryIntent:
assert detect_query_intent("") == QueryIntent.MIXED
assert detect_query_intent(" ") == QueryIntent.MIXED
def test_query_targets_test_files(self):
    """Queries that explicitly mention tests should opt out of test penalties."""
    for query in ("how do tests cover auth flow?", "spec fixtures for parser"):
        assert query_targets_test_files(query)
    assert not query_targets_test_files("windowsHide")
def test_query_targets_generated_files(self):
    """Queries that explicitly mention build artifacts skip the generated penalty."""
    for query in ("inspect dist bundle output", "generated artifacts under build"):
        assert query_targets_generated_files(query)
    assert not query_targets_generated_files("cache invalidation strategy")
def test_query_prefers_lexical_search(self):
    """Config/env/factory wording routes lexical-first; conceptual questions do not."""
    lexical_queries = (
        "embedding backend fastembed local litellm api config",
        "get_reranker factory onnx backend selection",
        "EMBEDDING_BACKEND and RERANKER_BACKEND environment variables",
    )
    for query in lexical_queries:
        assert query_prefers_lexical_search(query)
    assert not query_prefers_lexical_search("how does smart search route keyword queries")
# =============================================================================
# Tests: adjust_weights_by_intent
@@ -129,6 +160,427 @@ class TestAdjustWeightsByIntent:
assert adjusted["exact"] == pytest.approx(0.3, abs=0.01)
class TestPathPenalties:
"""Tests for lightweight path-based ranking penalties."""
def test_is_test_file(self):
    """Test-suite paths are flagged across POSIX and Windows separators and suffixes."""
    for flagged in (
        "/repo/tests/test_auth.py",
        "D:\\repo\\src\\auth.spec.ts",
        "/repo/frontend/src/pages/discoverypage.test.tsx",
        "/repo/frontend/src/pages/discoverypage.spec.jsx",
    ):
        assert is_test_file(flagged)
    assert not is_test_file("/repo/src/auth.py")
def test_is_generated_artifact_path(self):
    """dist/, generated/, and .next/ outputs are flagged as build artifacts."""
    for flagged in (
        "/repo/dist/app.js",
        "/repo/src/generated/client.ts",
        "D:\\repo\\frontend\\.next\\server.js",
    ):
        assert is_generated_artifact_path(flagged)
    assert not is_generated_artifact_path("/repo/src/auth.py")
def test_is_auxiliary_reference_path(self):
    """examples/, benchmarks/, and tools/ files count as auxiliary references."""
    for flagged in (
        "/repo/examples/auth_demo.py",
        "/repo/benchmarks/search_eval.py",
        "/repo/tools/debug_search.py",
    ):
        assert is_auxiliary_reference_path(flagged)
    assert not is_auxiliary_reference_path("/repo/src/auth.py")
def test_query_targets_auxiliary_files(self):
    """Only queries that mention examples/benchmarks target auxiliary files."""
    for query in ("show smart search examples", "benchmark smart search"):
        assert query_targets_auxiliary_files(query)
    assert not query_targets_auxiliary_files("smart search routing")
def test_apply_path_penalties_demotes_test_files(self):
    """A higher-scoring test file drops below the production file."""
    ranked = apply_path_penalties(
        [
            _make_result(path="/repo/tests/test_auth.py", score=10.0),
            _make_result(path="/repo/src/auth.py", score=9.0),
        ],
        "authenticate user",
        test_file_penalty=0.15,
    )
    assert ranked[0].path == "/repo/src/auth.py"
    assert ranked[1].metadata["path_penalty_reasons"] == ["test_file"]
def test_apply_path_penalties_more_aggressively_demotes_tests_for_keyword_queries(self):
    """Identifier queries apply the harsher 0.55 test-file multiplier."""
    ranked = apply_path_penalties(
        [
            _make_result(path="/repo/tests/test_auth.py", score=5.0),
            _make_result(path="/repo/src/auth.py", score=4.0),
        ],
        "find_descendant_project_roots",
        test_file_penalty=0.15,
    )
    assert ranked[0].path == "/repo/src/auth.py"
    demoted = ranked[1].metadata
    assert demoted["path_penalty_reasons"] == ["test_file"]
    assert demoted["path_penalty_multiplier"] == pytest.approx(0.55)
    assert demoted["path_rank_multiplier"] == pytest.approx(0.55)
def test_apply_path_penalties_more_aggressively_demotes_tests_for_semantic_queries(self):
    """Semantic questions apply the milder 0.75 test-file multiplier."""
    ranked = apply_path_penalties(
        [
            _make_result(path="/repo/tests/test_auth.py", score=5.0),
            _make_result(path="/repo/src/auth.py", score=4.1),
        ],
        "how does auth routing work",
        test_file_penalty=0.15,
    )
    assert ranked[0].path == "/repo/src/auth.py"
    demoted = ranked[1].metadata
    assert demoted["path_penalty_reasons"] == ["test_file"]
    assert demoted["path_penalty_multiplier"] == pytest.approx(0.75)
def test_apply_path_penalties_boosts_source_definitions_for_identifier_queries(self):
    """A source file whose excerpt defines the queried identifier gets a 2.0x boost."""
    ranked = apply_path_penalties(
        [
            _make_result(
                path="/repo/tests/test_registry.py",
                score=4.2,
                excerpt='query="find_descendant_project_roots"',
            ),
            _make_result(
                path="/repo/src/registry.py",
                score=3.0,
                excerpt="def find_descendant_project_roots(self, source_root: Path) -> list[str]:",
            ),
        ],
        "find_descendant_project_roots",
        test_file_penalty=0.15,
    )
    assert ranked[0].path == "/repo/src/registry.py"
    boosted = ranked[0].metadata
    assert boosted["path_boost_reasons"] == ["source_definition"]
    assert boosted["path_boost_multiplier"] == pytest.approx(2.0)
    assert boosted["path_rank_multiplier"] == pytest.approx(2.0)
    assert ranked[1].metadata["path_penalty_reasons"] == ["test_file"]
def test_apply_path_penalties_boosts_source_paths_for_semantic_feature_queries(self):
    """Source paths sharing topic tokens with a semantic query get a 1.35x boost."""
    ranked = apply_path_penalties(
        [
            _make_result(
                path="/repo/tests/smart-search-intent.test.js",
                score=0.832,
                excerpt="describes how smart search routes keyword queries",
            ),
            _make_result(
                path="/repo/src/tools/smart-search.ts",
                score=0.555,
                excerpt="smart search keyword routing logic",
            ),
        ],
        "how does smart search route keyword queries",
        test_file_penalty=0.15,
    )
    assert ranked[0].path == "/repo/src/tools/smart-search.ts"
    boosted = ranked[0].metadata
    assert boosted["path_boost_reasons"] == ["source_path_topic_overlap"]
    assert boosted["path_boost_multiplier"] == pytest.approx(1.35)
    assert boosted["path_boost_overlap_tokens"] == ["smart", "search"]
    assert ranked[1].metadata["path_penalty_reasons"] == ["test_file"]
def test_apply_path_penalties_strongly_boosts_keyword_basename_overlap(self):
    """Keyword queries naming a file's basename apply the strong 4.5x boost."""
    ranked = apply_path_penalties(
        [
            _make_result(
                path="/repo/src/tools/core-memory.ts",
                score=0.04032417772512223,
                excerpt="memory listing helpers",
            ),
            _make_result(
                path="/repo/src/tools/smart-search.ts",
                score=0.009836065573770493,
                excerpt="smart search keyword routing logic",
            ),
        ],
        "executeHybridMode dense_rerank semantic smart_search",
        test_file_penalty=0.15,
    )
    assert ranked[0].path == "/repo/src/tools/smart-search.ts"
    boosted = ranked[0].metadata
    assert boosted["path_boost_reasons"] == ["source_path_topic_overlap"]
    assert boosted["path_boost_multiplier"] == pytest.approx(4.5)
    assert boosted["path_boost_overlap_tokens"] == ["smart", "search"]
def test_extract_explicit_path_hints_ignores_generic_platform_terms(self):
    """Only the explicit feature token survives; generic platform words yield no hints."""
    hints = extract_explicit_path_hints("parse CodexLens JSON output strip ANSI smart_search")
    assert hints == [["smart", "search"]]
def test_apply_path_penalties_prefers_explicit_feature_hint_over_platform_terms(self):
    """An explicit smart_search hint outweighs overlap with generic platform terms."""
    ranked = apply_path_penalties(
        [
            _make_result(
                path="/repo/src/tools/codex-lens-lsp.ts",
                score=0.045,
                excerpt="CodexLens LSP bridge",
            ),
            _make_result(
                path="/repo/src/tools/smart-search.ts",
                score=0.03,
                excerpt="parse JSON output and strip ANSI for plain-text fallback",
            ),
        ],
        "parse CodexLens JSON output strip ANSI smart_search",
        test_file_penalty=0.15,
    )
    assert ranked[0].path == "/repo/src/tools/smart-search.ts"
    boosted = ranked[0].metadata
    assert boosted["path_boost_reasons"] == ["source_path_topic_overlap"]
    assert boosted["path_boost_overlap_tokens"] == ["smart", "search"]
def test_apply_path_penalties_strongly_boosts_lexical_config_modules(self):
    """Lexical config queries give config.py the maximal 5.0x topic boost."""
    ranked = apply_path_penalties(
        [
            _make_result(
                path="/repo/src/tools/smart-search.ts",
                score=22.07,
                excerpt="embedding backend local api config routing",
            ),
            _make_result(
                path="/repo/src/codexlens/config.py",
                score=4.88,
                excerpt="embedding_backend = 'fastembed'",
            ),
        ],
        "embedding backend fastembed local litellm api config",
        test_file_penalty=0.15,
    )
    assert ranked[0].path == "/repo/src/codexlens/config.py"
    boosted = ranked[0].metadata
    assert boosted["path_boost_reasons"] == ["source_path_topic_overlap"]
    assert boosted["path_boost_multiplier"] == pytest.approx(5.0)
    assert boosted["path_boost_overlap_tokens"] == ["config"]
def test_apply_path_penalties_more_aggressively_demotes_tests_for_explicit_feature_queries(self):
    """Explicit feature queries use the harsher 0.55 test-file multiplier."""
    ranked = apply_path_penalties(
        [
            _make_result(
                path="/repo/tests/smart-search-intent.test.js",
                score=1.0,
                excerpt="smart search intent coverage",
            ),
            _make_result(
                path="/repo/src/tools/smart-search.ts",
                score=0.58,
                excerpt="plain-text JSON fallback for smart search",
            ),
        ],
        "parse CodexLens JSON output strip ANSI smart_search",
        test_file_penalty=0.15,
    )
    assert ranked[0].path == "/repo/src/tools/smart-search.ts"
    demoted = ranked[1].metadata
    assert demoted["path_penalty_reasons"] == ["test_file"]
    assert demoted["path_penalty_multiplier"] == pytest.approx(0.55)
def test_apply_path_penalties_demotes_generated_artifacts(self):
    """A higher-scoring dist/ artifact sinks below its source counterpart."""
    ranked = apply_path_penalties(
        [
            _make_result(path="/repo/dist/auth.js", score=10.0),
            _make_result(path="/repo/src/auth.ts", score=9.0),
        ],
        "authenticate user",
        generated_file_penalty=0.35,
    )
    assert ranked[0].path == "/repo/src/auth.ts"
    assert ranked[1].metadata["path_penalty_reasons"] == ["generated_artifact"]
def test_apply_path_penalties_more_aggressively_demotes_generated_artifacts_for_explicit_feature_queries(self):
    """Explicit feature queries shrink generated artifacts to a 0.4 multiplier."""
    ranked = apply_path_penalties(
        [
            _make_result(
                path="/repo/dist/tools/smart-search.js",
                score=1.0,
                excerpt="built smart search output",
            ),
            _make_result(
                path="/repo/src/tools/smart-search.ts",
                score=0.45,
                excerpt="plain-text JSON fallback for smart search",
            ),
        ],
        "parse CodexLens JSON output strip ANSI smart_search",
        generated_file_penalty=0.35,
    )
    assert ranked[0].path == "/repo/src/tools/smart-search.ts"
    demoted = ranked[1].metadata
    assert demoted["path_penalty_reasons"] == ["generated_artifact"]
    assert demoted["path_penalty_multiplier"] == pytest.approx(0.4)
def test_apply_path_penalties_demotes_auxiliary_reference_files(self):
results = [
_make_result(path="/repo/examples/simple_search_comparison.py", score=10.0),
_make_result(path="/repo/src/search/router.py", score=9.0),
]
penalized = apply_path_penalties(
results,
"how does smart search route keyword queries",
test_file_penalty=0.15,
)
assert penalized[0].path == "/repo/src/search/router.py"
assert penalized[1].metadata["path_penalty_reasons"] == ["auxiliary_file"]
def test_apply_path_penalties_more_aggressively_demotes_auxiliary_files_for_explicit_feature_queries(self):
results = [
_make_result(
path="/repo/benchmarks/smart_search_demo.py",
score=1.0,
excerpt="demo for smart search fallback",
),
_make_result(
path="/repo/src/tools/smart-search.ts",
score=0.52,
excerpt="plain-text JSON fallback for smart search",
),
]
penalized = apply_path_penalties(
results,
"parse CodexLens JSON output strip ANSI smart_search",
test_file_penalty=0.15,
)
assert penalized[0].path == "/repo/src/tools/smart-search.ts"
assert penalized[1].metadata["path_penalty_reasons"] == ["auxiliary_file"]
assert penalized[1].metadata["path_penalty_multiplier"] == pytest.approx(0.5)
def test_apply_path_penalties_skips_when_query_targets_tests(self):
results = [
_make_result(path="/repo/tests/test_auth.py", score=10.0),
_make_result(path="/repo/src/auth.py", score=9.0),
]
penalized = apply_path_penalties(
results,
"auth tests",
test_file_penalty=0.15,
)
assert penalized[0].path == "/repo/tests/test_auth.py"
def test_apply_path_penalties_skips_generated_penalty_when_query_targets_artifacts(self):
results = [
_make_result(path="/repo/dist/auth.js", score=10.0),
_make_result(path="/repo/src/auth.ts", score=9.0),
]
penalized = apply_path_penalties(
results,
"dist auth bundle",
generated_file_penalty=0.35,
)
assert penalized[0].path == "/repo/dist/auth.js"
def test_rebalance_noisy_results_pushes_explicit_feature_query_noise_behind_source_files(self):
results = [
_make_result(path="/repo/src/tools/smart-search.ts", score=0.9),
_make_result(path="/repo/tests/smart-search-intent.test.tsx", score=0.8),
_make_result(path="/repo/src/core/cli-routes.ts", score=0.7),
_make_result(path="/repo/dist/tools/smart-search.js", score=0.6),
_make_result(path="/repo/benchmarks/smart_search_demo.py", score=0.5),
]
rebalanced = rebalance_noisy_results(
results,
"parse CodexLens JSON output strip ANSI smart_search",
)
assert [item.path for item in rebalanced[:2]] == [
"/repo/src/tools/smart-search.ts",
"/repo/src/core/cli-routes.ts",
]
def test_rebalance_noisy_results_preserves_tests_when_query_targets_them(self):
results = [
_make_result(path="/repo/tests/smart-search-intent.test.tsx", score=0.9),
_make_result(path="/repo/src/tools/smart-search.ts", score=0.8),
]
rebalanced = rebalance_noisy_results(results, "smart search tests")
assert [item.path for item in rebalanced] == [
"/repo/tests/smart-search-intent.test.tsx",
"/repo/src/tools/smart-search.ts",
]
def test_apply_path_penalties_skips_auxiliary_penalty_when_query_targets_examples(self):
results = [
_make_result(path="/repo/examples/simple_search_comparison.py", score=10.0),
_make_result(path="/repo/src/search/router.py", score=9.0),
]
penalized = apply_path_penalties(
results,
"smart search examples",
test_file_penalty=0.15,
)
assert penalized[0].path == "/repo/examples/simple_search_comparison.py"
class TestCrossEncoderRerank:
    """Tests for cross-encoder reranking edge cases."""

    def test_cross_encoder_rerank_preserves_strong_source_candidates_for_semantic_feature_queries(self):
        """Source files with near-zero cross-encoder scores keep a floor score.

        The dummy reranker gives the test file a strong score and the source
        file an almost-zero one; the floor metadata plus subsequent path
        penalties should still surface the source file first.
        """
        class DummyReranker:
            # Stand-in for a real cross-encoder: one fixed score per pair,
            # deliberately scoring the source file near zero.
            def score_pairs(self, pairs, batch_size=32):
                _ = (pairs, batch_size)
                return [0.8323705792427063, 1.2463066923373844e-05]

        reranked = cross_encoder_rerank(
            "how does smart search route keyword queries",
            [
                _make_result(
                    path="/repo/tests/smart-search-intent.test.js",
                    score=0.5989155769348145,
                    excerpt="describes how smart search routes keyword queries",
                ),
                _make_result(
                    path="/repo/src/tools/smart-search.ts",
                    score=0.554444432258606,
                    excerpt="smart search keyword routing logic",
                ),
            ],
            DummyReranker(),
            top_k=2,
        )
        # Path penalties run after reranking, demoting the test file further.
        reranked = apply_path_penalties(
            reranked,
            "how does smart search route keyword queries",
            test_file_penalty=0.15,
        )
        assert reranked[0].path == "/repo/src/tools/smart-search.ts"
        assert reranked[0].metadata["cross_encoder_floor_reason"] == "semantic_source_path_overlap"
        assert reranked[0].metadata["cross_encoder_floor_overlap_tokens"] == ["smart", "search"]
        assert reranked[0].metadata["path_boost_reasons"] == ["source_path_topic_overlap"]
        assert reranked[1].metadata["path_penalty_reasons"] == ["test_file"]
# =============================================================================
# Tests: get_rrf_weights
# =============================================================================

View File

@@ -67,3 +67,60 @@ def test_find_nearest_index(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) ->
assert found is not None
assert found.id == mapping.id
def test_find_descendant_project_roots_returns_nested_project_roots(tmp_path: Path) -> None:
    """Descendant lookup returns only project roots nested under the given root.

    Registers a workspace with two nested child projects plus one unrelated
    project outside the workspace, then checks only the children come back,
    in registration order.
    """
    db_path = tmp_path / "registry.db"
    workspace_root = tmp_path / "workspace"
    child_a = workspace_root / "packages" / "app-a"
    child_b = workspace_root / "tools" / "app-b"
    # Sibling of the workspace; must not appear among its descendants.
    outside_root = tmp_path / "external"
    with RegistryStore(db_path=db_path) as store:
        workspace_project = store.register_project(
            workspace_root,
            tmp_path / "indexes" / "workspace",
        )
        child_a_project = store.register_project(
            child_a,
            tmp_path / "indexes" / "workspace" / "packages" / "app-a",
        )
        child_b_project = store.register_project(
            child_b,
            tmp_path / "indexes" / "workspace" / "tools" / "app-b",
        )
        outside_project = store.register_project(
            outside_root,
            tmp_path / "indexes" / "external",
        )
        # Each project gets a depth-0 dir mapping pointing at its _index.db.
        store.register_dir(
            workspace_project.id,
            workspace_root,
            tmp_path / "indexes" / "workspace" / "_index.db",
            depth=0,
        )
        child_a_mapping = store.register_dir(
            child_a_project.id,
            child_a,
            tmp_path / "indexes" / "workspace" / "packages" / "app-a" / "_index.db",
            depth=0,
        )
        child_b_mapping = store.register_dir(
            child_b_project.id,
            child_b,
            tmp_path / "indexes" / "workspace" / "tools" / "app-b" / "_index.db",
            depth=0,
        )
        store.register_dir(
            outside_project.id,
            outside_root,
            tmp_path / "indexes" / "external" / "_index.db",
            depth=0,
        )
        descendants = store.find_descendant_project_roots(workspace_root)
        # Only the nested children are returned; the workspace root itself and
        # the external project are excluded.
        assert [mapping.index_path for mapping in descendants] == [
            child_a_mapping.index_path,
            child_b_mapping.index_path,
        ]

View File

@@ -313,3 +313,89 @@ def test_onnx_reranker_scores_pairs_with_sigmoid_normalization(
expected = [1.0 / (1.0 + math.exp(-float(i))) for i in range(len(pairs))]
assert scores == pytest.approx(expected, rel=1e-6, abs=1e-6)
def test_onnx_reranker_splits_tuple_providers_into_provider_options(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Tuple provider entries are split into parallel providers/provider_options.

    Stubs out onnxruntime/optimum/transformers in sys.modules so get_reranker
    can build an ONNXReranker without the real libraries, then verifies that a
    mixed list of (name, options) tuples and bare provider names is normalized
    into separate `providers` and `provider_options` lists for optimum.
    """
    import numpy as np

    # Records the kwargs ORTModelForSequenceClassification.from_pretrained receives.
    captured: dict[str, object] = {}

    # Minimal module stubs; only the attributes the reranker touches exist.
    dummy_onnxruntime = types.ModuleType("onnxruntime")
    dummy_optimum = types.ModuleType("optimum")
    dummy_optimum.__path__ = []
    dummy_optimum_ort = types.ModuleType("optimum.onnxruntime")

    class DummyModelOutput:
        def __init__(self, logits: np.ndarray) -> None:
            self.logits = logits

    class DummyModel:
        input_names = ["input_ids", "attention_mask"]

        def __call__(self, **inputs):
            # Zero logits -> sigmoid gives exactly 0.5 per pair.
            batch = int(inputs["input_ids"].shape[0])
            return DummyModelOutput(logits=np.zeros((batch, 1), dtype=np.float32))

    class DummyORTModelForSequenceClassification:
        @classmethod
        def from_pretrained(
            cls,
            model_name: str,
            providers=None,
            provider_options=None,
            **kwargs,
        ):
            captured["model_name"] = model_name
            captured["providers"] = providers
            captured["provider_options"] = provider_options
            captured["kwargs"] = kwargs
            return DummyModel()

    dummy_optimum_ort.ORTModelForSequenceClassification = DummyORTModelForSequenceClassification

    dummy_transformers = types.ModuleType("transformers")

    class DummyAutoTokenizer:
        model_max_length = 512

        @classmethod
        def from_pretrained(cls, model_name: str, **kwargs):
            _ = model_name, kwargs
            return cls()

        def __call__(self, *, text, text_pair, return_tensors, **kwargs):
            _ = text_pair, kwargs
            # The ONNX path must request numpy tensors, not torch.
            assert return_tensors == "np"
            batch = len(text)
            return {
                "input_ids": np.zeros((batch, 4), dtype=np.int64),
                "attention_mask": np.ones((batch, 4), dtype=np.int64),
            }

    dummy_transformers.AutoTokenizer = DummyAutoTokenizer

    # Install the stubs so imports inside get_reranker resolve to them.
    monkeypatch.setitem(sys.modules, "onnxruntime", dummy_onnxruntime)
    monkeypatch.setitem(sys.modules, "optimum", dummy_optimum)
    monkeypatch.setitem(sys.modules, "optimum.onnxruntime", dummy_optimum_ort)
    monkeypatch.setitem(sys.modules, "transformers", dummy_transformers)

    reranker = get_reranker(
        backend="onnx",
        model_name="dummy-model",
        use_gpu=True,
        providers=[
            ("DmlExecutionProvider", {"device_id": 1}),
            "CPUExecutionProvider",
        ],
    )

    assert isinstance(reranker, ONNXReranker)
    # Zero logits through sigmoid normalization -> 0.5.
    scores = reranker.score_pairs([("q", "d")], batch_size=1)
    assert scores == pytest.approx([0.5])
    assert captured["model_name"] == "dummy-model"
    # Tuple entry split: names go to `providers`, dicts to `provider_options`,
    # with bare names padded with an empty options dict.
    assert captured["providers"] == ["DmlExecutionProvider", "CPUExecutionProvider"]
    assert captured["provider_options"] == [{"device_id": 1}, {}]

View File

@@ -428,6 +428,51 @@ class TestIndexPathCollection:
assert len(paths) == 1
engine.close()
def test_collect_skips_ignored_artifact_indexes(self, mock_registry, mock_mapper, temp_dir):
"""Test collection skips dist/build-style artifact subtrees."""
root_dir = temp_dir / "project"
root_dir.mkdir()
root_db = root_dir / "_index.db"
root_store = DirIndexStore(root_db)
root_store.initialize()
src_dir = root_dir / "src"
src_dir.mkdir()
src_db = src_dir / "_index.db"
src_store = DirIndexStore(src_db)
src_store.initialize()
dist_dir = root_dir / "dist"
dist_dir.mkdir()
dist_db = dist_dir / "_index.db"
dist_store = DirIndexStore(dist_db)
dist_store.initialize()
workflow_dir = root_dir / ".workflow"
workflow_dir.mkdir()
workflow_db = workflow_dir / "_index.db"
workflow_store = DirIndexStore(workflow_db)
workflow_store.initialize()
root_store.register_subdir(name="src", index_path=src_db)
root_store.register_subdir(name="dist", index_path=dist_db)
root_store.register_subdir(name=".workflow", index_path=workflow_db)
root_store.close()
src_store.close()
dist_store.close()
workflow_store.close()
engine = ChainSearchEngine(mock_registry, mock_mapper)
paths = engine._collect_index_paths(root_db, depth=-1)
assert {path.relative_to(root_dir).as_posix() for path in paths} == {
"_index.db",
"src/_index.db",
}
engine.close()
class TestResultMergeAndRank:
"""Tests for _merge_and_rank method."""
@@ -490,6 +535,36 @@ class TestResultMergeAndRank:
assert merged == []
engine.close()
def test_merge_applies_test_file_penalty_for_non_test_query(self, mock_registry, mock_mapper):
"""Non-test queries should lightly demote test files during merge."""
engine = ChainSearchEngine(mock_registry, mock_mapper)
results = [
SearchResult(path="/repo/tests/test_auth.py", score=10.0, excerpt="match 1"),
SearchResult(path="/repo/src/auth.py", score=9.0, excerpt="match 2"),
]
merged = engine._merge_and_rank(results, limit=10, query="authenticate users")
assert merged[0].path == "/repo/src/auth.py"
assert merged[1].metadata["path_penalty_reasons"] == ["test_file"]
engine.close()
def test_merge_applies_generated_file_penalty_for_non_artifact_query(self, mock_registry, mock_mapper):
"""Non-artifact queries should lightly demote generated/build results during merge."""
engine = ChainSearchEngine(mock_registry, mock_mapper)
results = [
SearchResult(path="/repo/dist/auth.js", score=10.0, excerpt="match 1"),
SearchResult(path="/repo/src/auth.ts", score=9.0, excerpt="match 2"),
]
merged = engine._merge_and_rank(results, limit=10, query="authenticate users")
assert merged[0].path == "/repo/src/auth.ts"
assert merged[1].metadata["path_penalty_reasons"] == ["generated_artifact"]
engine.close()
# === Hierarchical Chain Search Tests ===

View File

@@ -400,15 +400,20 @@ class TestStage4OptionalRerank:
"""Tests for Stage 4: Optional cross-encoder reranking."""
def test_stage4_reranks_with_reranker(
self, mock_registry, mock_mapper, mock_config
self, mock_registry, mock_mapper, temp_paths
):
"""Test _stage4_optional_rerank uses _cross_encoder_rerank."""
engine = ChainSearchEngine(mock_registry, mock_mapper, config=mock_config)
"""Test _stage4_optional_rerank overfetches before final trim."""
config = Config(data_dir=temp_paths / "data")
config.reranker_top_k = 4
config.reranking_top_k = 4
engine = ChainSearchEngine(mock_registry, mock_mapper, config=config)
results = [
SearchResult(path="a.py", score=0.9, excerpt="a"),
SearchResult(path="b.py", score=0.8, excerpt="b"),
SearchResult(path="c.py", score=0.7, excerpt="c"),
SearchResult(path="d.py", score=0.6, excerpt="d"),
SearchResult(path="e.py", score=0.5, excerpt="e"),
]
# Mock the _cross_encoder_rerank method that _stage4 calls
@@ -416,12 +421,14 @@ class TestStage4OptionalRerank:
mock_rerank.return_value = [
SearchResult(path="c.py", score=0.95, excerpt="c"),
SearchResult(path="a.py", score=0.85, excerpt="a"),
SearchResult(path="d.py", score=0.83, excerpt="d"),
SearchResult(path="e.py", score=0.81, excerpt="e"),
]
reranked = engine._stage4_optional_rerank("query", results, k=2)
mock_rerank.assert_called_once_with("query", results, 2)
assert len(reranked) <= 2
mock_rerank.assert_called_once_with("query", results, 4)
assert len(reranked) == 4
# First result should be reranked winner
assert reranked[0].path == "c.py"
@@ -633,6 +640,113 @@ class TestStagedCascadeIntegration:
a_result = next(r for r in result.results if r.path == "a.py")
assert a_result.score == 0.9
    def test_staged_cascade_expands_stage3_target_for_rerank_budget(
        self, mock_registry, mock_mapper, temp_paths
    ):
        """Test staged cascade preserves enough Stage 3 reps for rerank budget."""
        config = Config(data_dir=temp_paths / "data")
        config.enable_staged_rerank = True
        # Rerank budget (6) deliberately exceeds the final k (2) requested below.
        config.reranker_top_k = 6
        config.reranking_top_k = 6
        engine = ChainSearchEngine(mock_registry, mock_mapper, config=config)
        # Eight candidates with slightly decreasing scores so Stage 3 has more
        # than the final k available to keep.
        expanded_results = [
            SearchResult(path=f"src/file-{index}.ts", score=1.0 - (index * 0.01), excerpt="x")
            for index in range(8)
        ]
        # Stub every stage of the cascade so only the stage-to-stage plumbing runs.
        with patch.object(engine, "_find_start_index") as mock_find:
            mock_find.return_value = temp_paths / "index" / "_index.db"
            with patch.object(engine, "_collect_index_paths") as mock_collect:
                mock_collect.return_value = [temp_paths / "index" / "_index.db"]
                with patch.object(engine, "_stage1_binary_search") as mock_stage1:
                    mock_stage1.return_value = (
                        [SearchResult(path="seed.ts", score=0.9, excerpt="seed")],
                        temp_paths / "index",
                    )
                    with patch.object(engine, "_stage2_lsp_expand") as mock_stage2:
                        mock_stage2.return_value = expanded_results
                        with patch.object(engine, "_stage3_cluster_prune") as mock_stage3:
                            mock_stage3.return_value = expanded_results[:6]
                            with patch.object(engine, "_stage4_optional_rerank") as mock_stage4:
                                mock_stage4.return_value = expanded_results[:2]
                                engine.staged_cascade_search(
                                    "query",
                                    temp_paths / "src",
                                    k=2,
                                    coarse_k=20,
                                )
        # Stage 3 must be asked for the full rerank budget (6), not the final k (2).
        mock_stage3.assert_called_once_with(
            expanded_results,
            6,
            query="query",
        )
    def test_staged_cascade_overfetches_rerank_before_final_trim(
        self, mock_registry, mock_mapper, temp_paths
    ):
        """Test staged rerank keeps enough candidates for path penalties to work."""
        config = Config(data_dir=temp_paths / "data")
        config.enable_staged_rerank = True
        # Rerank budget (4) exceeds the final k (2) so penalties applied after
        # reranking still have enough candidates to reorder.
        config.reranker_top_k = 4
        config.reranking_top_k = 4
        config.test_file_penalty = 0.15
        config.generated_file_penalty = 0.35
        engine = ChainSearchEngine(mock_registry, mock_mapper, config=config)
        src_primary = str(temp_paths / "src" / "tools" / "smart-search.ts")
        src_secondary = str(temp_paths / "src" / "tools" / "codex-lens.ts")
        test_primary = str(temp_paths / "tests" / "integration" / "cli-routes.test.ts")
        test_secondary = str(
            temp_paths / "frontend" / "tests" / "e2e" / "prompt-memory.spec.ts"
        )
        query = "parse CodexLens JSON output strip ANSI smart_search"
        # Test files outscore source files; only path penalties after reranking
        # can restore the intended order.
        clustered_results = [
            SearchResult(path=test_primary, score=0.98, excerpt="test"),
            SearchResult(path=test_secondary, score=0.97, excerpt="test"),
            SearchResult(path=src_primary, score=0.96, excerpt="source"),
            SearchResult(path=src_secondary, score=0.95, excerpt="source"),
        ]
        # Stub the cascade stages so only the rerank/trim plumbing is exercised.
        with patch.object(engine, "_find_start_index") as mock_find:
            mock_find.return_value = temp_paths / "index" / "_index.db"
            with patch.object(engine, "_collect_index_paths") as mock_collect:
                mock_collect.return_value = [temp_paths / "index" / "_index.db"]
                with patch.object(engine, "_stage1_binary_search") as mock_stage1:
                    mock_stage1.return_value = (
                        [SearchResult(path=src_primary, score=0.9, excerpt="seed")],
                        temp_paths / "index",
                    )
                    with patch.object(engine, "_stage2_lsp_expand") as mock_stage2:
                        mock_stage2.return_value = clustered_results
                        with patch.object(engine, "_stage3_cluster_prune") as mock_stage3:
                            mock_stage3.return_value = clustered_results
                            with patch.object(engine, "_cross_encoder_rerank") as mock_rerank:
                                mock_rerank.return_value = clustered_results
                                result = engine.staged_cascade_search(
                                    query,
                                    temp_paths / "src",
                                    k=2,
                                    coarse_k=20,
                                )
        # Rerank is invoked with the full overfetch budget (4), not the final k.
        mock_rerank.assert_called_once_with(query, clustered_results, 4)
        # After penalties, the two source files win the final top-2.
        assert [item.path for item in result.results] == [src_primary, src_secondary]
# =============================================================================
# Graceful Degradation Tests