Add tests and implement functionality for staged cascade search and LSP expansion

- Introduced a new JSON file for verbose output of the Codex Lens search results.
- Added unit tests for binary search functionality in `test_stage1_binary_search_uses_chunk_lines.py`.
- Implemented regression tests for staged cascade Stage 2 expansion depth in `test_staged_cascade_lsp_depth.py`.
- Created unit tests for staged cascade Stage 2 realtime LSP graph expansion in `test_staged_cascade_realtime_lsp.py`.
- Enhanced the ChainSearchEngine to respect configuration settings for staged LSP depth and improve search accuracy.
This commit is contained in:
catlog22
2026-02-08 21:54:42 +08:00
parent 166211dcd4
commit b9b2932f50
20 changed files with 1882 additions and 283 deletions

415
codex-lens/_tmp_search.json Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,95 @@
{
"success": true,
"result": {
"query": "class Config",
"method": "cascade",
"count": 10,
"results": [
{
"path": "D:\\Claude_dms3\\codex-lens\\src\\codexlens\\hybrid_search\\data_structures.py",
"score": 0.06081658330145309,
"excerpt": " @classmethod\n def from_dict(cls, data: Dict[str, Any]) -> \"CallHierarchyItem\":\n return cls(\n name=data[\"name\"],\n kind=data[\"kind\"],\n file_path=data[\"file...",
"content": " @classmethod\n def from_dict(cls, data: Dict[str, Any]) -> \"CallHierarchyItem\":\n return cls(\n name=data[\"name\"],\n kind=data[\"kind\"],\n file_path=data[\"file_path\"],\n range=Range.from_dict(data[\"range\"]),\n detail=data.get(\"detail\"),\n )\n\n\n@dataclass\nclass CodeSymbolNode:\n\n id: str\n name: str\n kind: str\n file_path: str\n range: Range\n embedding: Optional[List[float]] = None\n raw_code: str = \"\"\n docstring: str = \"\"\n score: float = 0.0\n\n def __post_init__(self) -> None:\n if not self.id:\n raise ValueError(\"id cannot be empty\")\n if not self.name:\n raise ValueError(\"name cannot be empty\")\n if not self.kind:\n raise ValueError(\"kind cannot be empty\")\n if not self.file_path:\n raise ValueError(\"file_path cannot be empty\")\n\n def __hash__(self) -> int:\n return hash(self.id)\n\n def __eq__(self, other: object) -> bool:\n if not isinstance(other, CodeSymbolNode):\n return False\n return self.id == other.id\n\n def to_dict(self) -> Dict[str, Any]:\n",
"source": null,
"symbol": null
},
{
"path": "D:\\Claude_dms3\\codex-lens\\src\\codexlens\\cli\\commands.py",
"score": 0.056576452190618645,
"excerpt": "from rich.table import Table\n\nfrom codexlens.config import Config\nfrom codexlens.entities import IndexedFile, SearchResult, Symbol\nfrom codexlens.errors import CodexLensError, ConfigError, ParseError,...",
"content": "import os\nimport shutil\nimport sqlite3\nfrom pathlib import Path\nfrom typing import Annotated, Any, Dict, Iterable, List, Optional\n\nimport typer\nfrom rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn\nfrom rich.table import Table\n\nfrom codexlens.config import Config\nfrom codexlens.entities import IndexedFile, SearchResult, Symbol\nfrom codexlens.errors import CodexLensError, ConfigError, ParseError, StorageError, SearchError\nfrom codexlens.parsers.factory import ParserFactory\nfrom codexlens.storage.path_mapper import PathMapper\nfrom codexlens.storage.registry import RegistryStore, ProjectInfo\nfrom codexlens.storage.index_tree import IndexTreeBuilder\nfrom codexlens.storage.dir_index import DirIndexStore\nfrom codexlens.search.chain_search import ChainSearchEngine, SearchOptions\nfrom codexlens.watcher import WatcherManager, WatcherConfig\n",
"source": null,
"symbol": null
},
{
"path": "D:\\Claude_dms3\\codex-lens\\src\\codexlens\\config.py",
"score": 0.05655744432847353,
"excerpt": "\"\"\"Configuration system for CodexLens.\"\"\"\n\nfrom __future__ import annotations",
"content": "\"\"\"Configuration system for CodexLens.\"\"\"\n\nfrom __future__ import annotations\n\nimport json\nimport logging\nimport os\nfrom dataclasses import dataclass, field\nfrom functools import cached_property\nfrom pathlib import Path\nfrom typing import Any, Dict, List, Optional",
"source": null,
"symbol": null
},
{
"path": "D:\\Claude_dms3\\codex-lens\\src\\codexlens\\search\\chain_search.py",
"score": 0.049219375000264694,
"excerpt": "\nfrom concurrent.futures import ThreadPoolExecutor, as_completed\nfrom dataclasses import dataclass, field\nfrom pathlib import Path\nfrom typing import List, Optional, Dict, Any, Literal, Tuple, TYPE_CH...",
"content": "\"\"\"Chain search engine for recursive multi-directory searching.\n\nProvides parallel search across directory hierarchies using indexed _index.db files.\nSupports depth-limited traversal, result aggregation, and symbol search.\n\"\"\"\n\nfrom __future__ import annotations\n\nfrom concurrent.futures import ThreadPoolExecutor, as_completed\nfrom dataclasses import dataclass, field\nfrom pathlib import Path\nfrom typing import List, Optional, Dict, Any, Literal, Tuple, TYPE_CHECKING\nimport json\nimport logging\nimport os\nimport time\n\nfrom codexlens.entities import SearchResult, Symbol\n\nif TYPE_CHECKING:",
"source": null,
"symbol": null
},
{
"path": "D:\\Claude_dms3\\codex-lens\\src\\codexlens\\indexing\\embedding.py",
"score": 0.047931429239828446,
"excerpt": " def __init__(\n self,\n model_name: Optional[str] = None,\n use_gpu: bool = True,\n expand_dim: bool = True,\n ) -> None:\n from codexlens.semantic import SEMANTIC_...",
"content": " def __init__(\n self,\n model_name: Optional[str] = None,\n use_gpu: bool = True,\n expand_dim: bool = True,\n ) -> None:\n from codexlens.semantic import SEMANTIC_AVAILABLE\n\n if not SEMANTIC_AVAILABLE:\n raise ImportError(\n \"Semantic search dependencies not available. \"\n \"Install with: pip install codexlens[semantic]\"\n )\n\n self._model_name = model_name or self.DEFAULT_MODEL\n self._use_gpu = use_gpu\n self._expand_dim = expand_dim\n self._model = None\n self._native_dim: Optional[int] = None\n\n \n self._expansion_matrix: Optional[np.ndarray] = None\n\n @property\n def model_name(self) -> str:\n return self._model_name\n\n @property\n def embedding_dim(self) -> int:\n if self._expand_dim:\n return self.TARGET_DIM\n \n if self._native_dim is not None:\n return self._native_dim\n \n model_dims = {\n \"BAAI/bge-large-en-v1.5\": 1024,\n \"BAAI/bge-base-en-v1.5\": 768,\n \"BAAI/bge-small-en-v1.5\": 384,\n \"intfloat/multilingual-e5-large\": 1024,\n }\n return model_dims.get(self._model_name, 1024)\n\n @property\n def max_tokens(self) -> int:\n return 512 \n\n",
"source": null,
"symbol": null
},
{
"path": "D:\\Claude_dms3\\codex-lens\\src\\codexlens\\semantic\\rotational_embedder.py",
"score": 0.04283104206542711,
"excerpt": "import threading\nimport time\nfrom dataclasses import dataclass, field\nfrom enum import Enum\nfrom typing import Any, Dict, Iterable, List, Optional",
"content": "Provides intelligent load balancing across multiple LiteLLM embedding endpoints\nto maximize throughput while respecting rate limits.\n\"\"\"\n\nfrom __future__ import annotations\n\nimport logging\nimport random\nimport threading\nimport time\nfrom dataclasses import dataclass, field\nfrom enum import Enum\nfrom typing import Any, Dict, Iterable, List, Optional\n\nimport numpy as np\n\nfrom .base import BaseEmbedder\n\nlogger = logging.getLogger(__name__)\n\n",
"source": null,
"symbol": null
},
{
"path": "D:\\Claude_dms3\\codex-lens\\src\\codexlens\\lsp\\standalone_manager.py",
"score": 0.036886112765573215,
"excerpt": "- Direct subprocess spawning of language servers\n- JSON-RPC 2.0 communication over stdin/stdout\n- Multi-language support via configuration file (lsp-servers.json)\n- Process lifecycle management with a...",
"content": "\"\"\"Standalone Language Server Manager for direct LSP communication.\n\nThis module provides direct communication with language servers via JSON-RPC over stdio,\neliminating the need for VSCode Bridge. Similar to cclsp architecture.\n\nFeatures:\n- Direct subprocess spawning of language servers\n- JSON-RPC 2.0 communication over stdin/stdout\n- Multi-language support via configuration file (lsp-servers.json)\n- Process lifecycle management with auto-restart\n- Compatible interface with existing LspBridge\n\"\"\"\n\nfrom __future__ import annotations\n\nimport asyncio\nimport json\nimport logging\nimport os",
"source": null,
"symbol": null
},
{
"path": "D:\\Claude_dms3\\codex-lens\\src\\codexlens\\api\\models.py",
"score": 0.03448209080810879,
"excerpt": " container: Containing class/module (if any)\n score: Match score for ranking\n return {k: v for k, v in asdict(self).items() if v is not None}\n\n\n# =================================...",
"content": " container: Containing class/module (if any)\n score: Match score for ranking\n return {k: v for k, v in asdict(self).items() if v is not None}\n\n\n# =============================================================================\n# Section 4.4: find_references dataclasses\n# =============================================================================\n\n@dataclass\nclass ReferenceResult:\n file_path: str\n line: int\n column: int\n context_line: str\n relationship: str # call | import | type_annotation | inheritance\n\n def to_dict(self) -> dict:\n return asdict(self)\n\n\n@dataclass\nclass GroupedReferences:\n definition: DefinitionResult\n references: List[ReferenceResult] = field(default_factory=list)\n\n def to_dict(self) -> dict:\n return {\n \"definition\": self.definition.to_dict(),\n \"references\": [r.to_dict() for r in self.references],\n }\n\n\n",
"source": null,
"symbol": null
},
{
"path": "D:\\Claude_dms3\\codex-lens\\src\\codexlens\\parsers\\treesitter_parser.py",
"score": 0.03341093379138448,
"excerpt": "\n if TREE_SITTER_AVAILABLE:\n self._initialize_parser()\n\n def _initialize_parser(self) -> None:\n if TreeSitterParser is None or TreeSitterLanguage is None:\n retur...",
"content": "\n if TREE_SITTER_AVAILABLE:\n self._initialize_parser()\n\n def _initialize_parser(self) -> None:\n if TreeSitterParser is None or TreeSitterLanguage is None:\n return\n\n try:\n \n if self.language_id == \"python\":\n import tree_sitter_python\n self._language = TreeSitterLanguage(tree_sitter_python.language())\n elif self.language_id == \"javascript\":\n import tree_sitter_javascript\n self._language = TreeSitterLanguage(tree_sitter_javascript.language())\n elif self.language_id == \"typescript\":\n import tree_sitter_typescript\n \n if self.path is not None and self.path.suffix.lower() == \".tsx\":\n self._language = TreeSitterLanguage(tree_sitter_typescript.language_tsx())\n else:\n self._language = TreeSitterLanguage(tree_sitter_typescript.language_typescript())\n else:\n return\n\n \n self._parser = TreeSitterParser()\n if hasattr(self._parser, \"set_language\"):\n self._parser.set_language(self._language) \n else:\n self._parser.language = self._language \n\n except Exception:\n \n self._parser = None\n self._language = None\n\n def is_available(self) -> bool:\n return self._parser is not None and self._language is not None\n\n def _parse_tree(self, text: str) -> Optional[tuple[bytes, TreeSitterNode]]:\n if not self.is_available() or self._parser is None:\n",
"source": null,
"symbol": null
},
{
"path": "D:\\Claude_dms3\\codex-lens\\src\\codexlens\\watcher\\incremental_indexer.py",
"score": 0.029568673189485736,
"excerpt": "\nimport logging\nfrom dataclasses import dataclass\nfrom pathlib import Path\nfrom typing import List, Optional",
"content": "\"\"\"Incremental indexer for processing file changes.\"\"\"\n\nfrom __future__ import annotations\n\nimport logging\nfrom dataclasses import dataclass\nfrom pathlib import Path\nfrom typing import List, Optional\n\nfrom codexlens.config import Config\nfrom codexlens.parsers.factory import ParserFactory\nfrom codexlens.storage.dir_index import DirIndexStore\nfrom codexlens.storage.global_index import GlobalSymbolIndex\nfrom codexlens.storage.path_mapper import PathMapper\nfrom codexlens.storage.registry import RegistryStore\n",
"source": null,
"symbol": null
}
],
"stats": {
"dirs_searched": 17,
"files_matched": 10,
"time_ms": 6667.8361892700195
}
}
}

View File

@@ -455,6 +455,12 @@ def search(
hidden=True,
help="[Advanced] Cascade strategy for --method cascade."
),
staged_stage2_mode: Optional[str] = typer.Option(
None,
"--staged-stage2-mode",
hidden=True,
help="[Advanced] Stage 2 expansion mode for cascade strategy 'staged': precomputed | realtime.",
),
# Hidden deprecated parameter for backward compatibility
mode: Optional[str] = typer.Option(None, "--mode", hidden=True, help="[DEPRECATED] Use --method instead."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
@@ -545,7 +551,7 @@ def search(
# Validate cascade_strategy if provided (for advanced users)
if internal_cascade_strategy is not None:
valid_strategies = ["binary", "hybrid", "binary_rerank", "dense_rerank"]
valid_strategies = ["binary", "hybrid", "binary_rerank", "dense_rerank", "staged"]
if internal_cascade_strategy not in valid_strategies:
if json_mode:
print_json(success=False, error=f"Invalid cascade strategy: {internal_cascade_strategy}. Must be one of: {', '.join(valid_strategies)}")
@@ -606,6 +612,18 @@ def search(
engine = ChainSearchEngine(registry, mapper, config=config)
# Optional staged cascade overrides (only meaningful for cascade strategy 'staged')
if staged_stage2_mode is not None:
stage2 = staged_stage2_mode.strip().lower()
if stage2 not in {"precomputed", "realtime"}:
msg = "Invalid --staged-stage2-mode. Must be: precomputed | realtime."
if json_mode:
print_json(success=False, error=msg)
else:
console.print(f"[red]{msg}[/red]")
raise typer.Exit(code=1)
config.staged_stage2_mode = stage2
# Map method to SearchOptions flags
# fts: FTS-only search (optionally with fuzzy)
# vector: Pure vector semantic search
@@ -986,6 +1004,103 @@ def status(
registry.close()
@app.command(name="lsp-status")
def lsp_status(
    path: Path = typer.Option(Path("."), "--path", "-p", help="Workspace root for LSP probing."),
    probe_file: Optional[Path] = typer.Option(
        None,
        "--probe-file",
        help="Optional file path to probe (starts the matching language server and prints capabilities).",
    ),
    json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
    """Show standalone LSP configuration and optionally probe a language server.

    This exercises the existing LSP server selection/startup path in StandaloneLspManager.

    Args:
        path: Workspace root used to initialize the LSP manager.
        probe_file: If given, start the matching server and report its capabilities.
        json_mode: Emit a machine-readable JSON payload instead of console text.
        verbose: Enable debug logging.

    Raises:
        typer.Exit: With code 1 when the manager or probe fails.
    """
    _configure_logging(verbose, json_mode)

    # Deferred imports keep CLI startup fast for the common non-LSP commands.
    import asyncio
    import shutil

    from codexlens.lsp.standalone_manager import StandaloneLspManager

    workspace_root = path.expanduser().resolve()
    probe_path = probe_file.expanduser().resolve() if probe_file is not None else None

    async def _run():
        # Collect configured servers and (optionally) probe one live server.
        manager = StandaloneLspManager(workspace_root=str(workspace_root))
        await manager.start()
        servers = []
        for language_id, cfg in sorted(manager._configs.items()):  # type: ignore[attr-defined]
            cmd0 = cfg.command[0] if cfg.command else None
            servers.append(
                {
                    "language_id": language_id,
                    "display_name": cfg.display_name,
                    "extensions": list(cfg.extensions),
                    "command": list(cfg.command),
                    # shutil.which reports whether the server binary is on PATH.
                    "command_available": bool(shutil.which(cmd0)) if cmd0 else False,
                }
            )
        probe = None
        if probe_path is not None:
            state = await manager._get_server(str(probe_path))
            if state is None:
                probe = {
                    "file": str(probe_path),
                    "ok": False,
                    "error": "No language server configured/available for this file.",
                }
            else:
                probe = {
                    "file": str(probe_path),
                    "ok": True,
                    "language_id": state.config.language_id,
                    "display_name": state.config.display_name,
                    "initialized": bool(state.initialized),
                    "capabilities": state.capabilities,
                }
        await manager.stop()
        return {"workspace_root": str(workspace_root), "servers": servers, "probe": probe}

    try:
        payload = asyncio.run(_run())
    except Exception as exc:
        if json_mode:
            print_json(success=False, error=f"LSP status failed: {exc}")
        else:
            console.print(f"[red]LSP status failed:[/red] {exc}")
        raise typer.Exit(code=1)

    if json_mode:
        print_json(success=True, result=payload)
        return

    console.print("[bold]CodexLens LSP Status[/bold]")
    console.print(f"  Workspace: {payload['workspace_root']}")
    console.print("\n[bold]Configured Servers:[/bold]")
    for s in payload["servers"]:
        # BUG FIX: both branches previously produced the empty string, so the
        # availability of a server binary was invisible in console output.
        ok = "✓" if s["command_available"] else "✗"
        console.print(f"  {ok} {s['display_name']} ({s['language_id']}) -> {s['command'][0] if s['command'] else ''}")
        console.print(f"    Extensions: {', '.join(s['extensions'])}")
    if payload["probe"] is not None:
        probe = payload["probe"]
        console.print("\n[bold]Probe:[/bold]")
        if not probe.get("ok"):
            console.print(f"{probe.get('file')}")
            console.print(f"  {probe.get('error')}")
        else:
            console.print(f"{probe.get('file')}")
            console.print(f"  Server: {probe.get('display_name')} ({probe.get('language_id')})")
            console.print(f"  Initialized: {probe.get('initialized')}")
@app.command()
def projects(
action: str = typer.Argument("list", help="Action: list, show, remove"),
@@ -3962,4 +4077,3 @@ def index_migrate_deprecated(
json_mode=json_mode,
verbose=verbose,
)

View File

@@ -145,6 +145,11 @@ class Config:
# Staged cascade search configuration (4-stage pipeline)
staged_coarse_k: int = 200 # Number of coarse candidates from Stage 1 binary search
staged_lsp_depth: int = 2 # LSP relationship expansion depth in Stage 2
staged_stage2_mode: str = "precomputed" # "precomputed" (graph_neighbors) | "realtime" (LSP)
staged_realtime_lsp_timeout_s: float = 10.0 # Max time budget for realtime LSP expansion
staged_realtime_lsp_max_nodes: int = 100 # Node cap for realtime graph expansion
staged_realtime_lsp_warmup_s: float = 2.0 # Wait for server analysis after opening seed docs
staged_realtime_lsp_resolve_symbols: bool = False # If True, resolves symbol names via documentSymbol (slower)
staged_clustering_strategy: str = "auto" # "auto", "hdbscan", "dbscan", "frequency", "noop"
staged_clustering_min_size: int = 3 # Minimum cluster size for Stage 3 grouping
enable_staged_rerank: bool = True # Enable optional cross-encoder reranking in Stage 4

View File

@@ -20,6 +20,7 @@ from collections import OrderedDict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from urllib.parse import unquote
if TYPE_CHECKING:
from codexlens.lsp.standalone_manager import StandaloneLspManager
@@ -62,12 +63,14 @@ class Location:
"""
# Handle VSCode URI format (file:///path/to/file)
uri = data.get("uri", data.get("file_path", ""))
if uri.startswith("file:///"):
# Windows: file:///C:/path -> C:/path
# Unix: file:///path -> /path
file_path = uri[8:] if uri[8:9].isalpha() and uri[9:10] == ":" else uri[7:]
elif uri.startswith("file://"):
file_path = uri[7:]
if uri.startswith("file://"):
# Strip scheme and decode percent-encoding (e.g. file:///d%3A/...).
# Keep behavior compatible with both Windows and Unix paths.
raw = unquote(uri[7:]) # keep leading slash for Unix paths
# Windows: file:///C:/... or file:///c%3A/... -> C:/...
if raw.startswith("/") and len(raw) > 2 and raw[2] == ":":
raw = raw[1:]
file_path = raw
else:
file_path = uri

View File

@@ -28,6 +28,7 @@ class LspGraphBuilder:
max_depth: int = 2,
max_nodes: int = 100,
max_concurrent: int = 10,
resolve_symbols: bool = True,
):
"""Initialize GraphBuilder.
@@ -35,10 +36,12 @@ class LspGraphBuilder:
max_depth: Maximum depth for BFS expansion from seeds.
max_nodes: Maximum number of nodes in the graph.
max_concurrent: Maximum concurrent LSP requests.
resolve_symbols: If False, skip documentSymbol lookups and create lightweight nodes.
"""
self.max_depth = max_depth
self.max_nodes = max_nodes
self.max_concurrent = max_concurrent
self.resolve_symbols = resolve_symbols
# Cache for document symbols per file (avoids per-location hover queries)
self._document_symbols_cache: Dict[str, List[Dict[str, Any]]] = {}
@@ -276,9 +279,11 @@ class LspGraphBuilder:
start_line = location.line
# Try to find symbol info from cached document symbols (fast)
symbol_info = await self._get_symbol_at_location(
file_path, start_line, lsp_bridge
)
symbol_info = None
if self.resolve_symbols:
symbol_info = await self._get_symbol_at_location(
file_path, start_line, lsp_bridge
)
if symbol_info:
name = symbol_info.get("name", f"symbol_L{start_line}")

View File

@@ -1094,15 +1094,15 @@ class ChainSearchEngine:
metadata = chunk.get("metadata")
symbol_name = None
symbol_kind = None
start_line = None
end_line = None
start_line = chunk.get("start_line")
end_line = chunk.get("end_line")
if metadata:
try:
meta_dict = json.loads(metadata) if isinstance(metadata, str) else metadata
symbol_name = meta_dict.get("symbol_name")
symbol_kind = meta_dict.get("symbol_kind")
start_line = meta_dict.get("start_line")
end_line = meta_dict.get("end_line")
start_line = meta_dict.get("start_line", start_line)
end_line = meta_dict.get("end_line", end_line)
except Exception:
pass
@@ -1130,10 +1130,11 @@ class ChainSearchEngine:
coarse_results: List[SearchResult],
index_root: Optional[Path],
) -> List[SearchResult]:
"""Stage 2: LSP-based graph expansion using GraphExpander.
"""Stage 2: LSP/graph expansion for staged cascade.
Expands coarse results with related symbols (definitions, references,
callers, callees) using precomputed graph neighbors.
Supports two modes via Config.staged_stage2_mode:
- "precomputed" (default): GraphExpander over per-dir `graph_neighbors` table
- "realtime": on-demand graph expansion via live LSP servers (LspBridge + LspGraphBuilder)
Args:
coarse_results: Results from Stage 1 binary search
@@ -1146,44 +1147,14 @@ class ChainSearchEngine:
return coarse_results
try:
from codexlens.search.graph_expander import GraphExpander
# Get expansion depth from config
depth = 2
mode = "precomputed"
if self._config is not None:
depth = getattr(self._config, "graph_expansion_depth", 2)
mode = (getattr(self._config, "staged_stage2_mode", "precomputed") or "precomputed").strip().lower()
expander = GraphExpander(self.mapper, config=self._config)
if mode in {"realtime", "live"}:
return self._stage2_realtime_lsp_expand(coarse_results, index_root=index_root)
# Expand top results (limit expansion to avoid explosion)
max_expand = min(10, len(coarse_results))
max_related = 50
related_results = expander.expand(
coarse_results,
depth=depth,
max_expand=max_expand,
max_related=max_related,
)
if related_results:
self.logger.debug(
"Stage 2 expanded %d base results to %d related symbols",
len(coarse_results), len(related_results)
)
# Combine: original results + related results
# Keep original results first (higher relevance)
combined = list(coarse_results)
seen_keys = {(r.path, r.symbol_name, r.start_line) for r in coarse_results}
for related in related_results:
key = (related.path, related.symbol_name, related.start_line)
if key not in seen_keys:
seen_keys.add(key)
combined.append(related)
return combined
return self._stage2_precomputed_graph_expand(coarse_results, index_root=index_root)
except ImportError as exc:
self.logger.debug("GraphExpander not available: %s", exc)
@@ -1192,6 +1163,238 @@ class ChainSearchEngine:
self.logger.debug("Stage 2 LSP expansion failed: %s", exc)
return coarse_results
def _stage2_precomputed_graph_expand(
    self,
    coarse_results: List[SearchResult],
    *,
    index_root: Path,
) -> List[SearchResult]:
    """Stage 2 (precomputed): expand via GraphExpander over the per-dir
    `graph_neighbors` table.

    Args:
        coarse_results: Stage 1 results to expand.
        index_root: Index root for the search (kept for interface symmetry
            with the realtime variant).

    Returns:
        Coarse results followed by deduplicated related symbols.
    """
    from codexlens.search.graph_expander import GraphExpander

    # Resolve the expansion depth: staged_lsp_depth wins, then the generic
    # graph_expansion_depth, then the hard default of 2.
    cfg = self._config
    expansion_depth = 2
    if cfg is not None:
        expansion_depth = getattr(
            cfg,
            "staged_lsp_depth",
            getattr(cfg, "graph_expansion_depth", 2),
        )
    try:
        expansion_depth = int(expansion_depth)
    except Exception:
        expansion_depth = 2

    graph_expander = GraphExpander(self.mapper, config=cfg)
    # Cap the number of seeds expanded and related symbols accepted so the
    # result set cannot explode.
    seed_limit = min(10, len(coarse_results))
    related = graph_expander.expand(
        coarse_results,
        depth=expansion_depth,
        max_expand=seed_limit,
        max_related=50,
    )
    if related:
        self.logger.debug(
            "Stage 2 (precomputed) expanded %d base results to %d related symbols",
            len(coarse_results), len(related)
        )
    return self._combine_stage2_results(coarse_results, related)
def _stage2_realtime_lsp_expand(
    self,
    coarse_results: List[SearchResult],
    *,
    index_root: Path,
) -> List[SearchResult]:
    """Stage 2 (realtime): compute expansion graph via live LSP servers.

    Builds lightweight seed nodes from the top coarse results, then drives
    LspGraphBuilder over a live LspBridge session to discover related
    symbols, and merges them after the original results.

    Args:
        coarse_results: Stage 1 results used as graph seeds.
        index_root: Index root; mapped back to a source dir to pick the
            LSP workspace root.

    Returns:
        Coarse results plus deduplicated realtime-expanded symbols; the
        unchanged coarse results on any failure or timeout.
    """
    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    from codexlens.hybrid_search.data_structures import CodeSymbolNode, Range
    from codexlens.lsp import LspBridge, LspGraphBuilder

    # Defaults used when no config is attached; each config read below is
    # guarded with `or default` so falsy/None values fall back safely.
    max_depth = 2
    timeout_s = 10.0
    max_nodes = 100
    warmup_s = 2.0
    resolve_symbols = False
    if self._config is not None:
        max_depth = int(getattr(self._config, "staged_lsp_depth", 2) or 2)
        timeout_s = float(getattr(self._config, "staged_realtime_lsp_timeout_s", 10.0) or 10.0)
        max_nodes = int(getattr(self._config, "staged_realtime_lsp_max_nodes", 100) or 100)
        warmup_s = float(getattr(self._config, "staged_realtime_lsp_warmup_s", 2.0) or 0.0)
        resolve_symbols = bool(getattr(self._config, "staged_realtime_lsp_resolve_symbols", False))

    # Map the index root back to its source dir; fall back to the first
    # result's parent directory when the mapping fails.
    try:
        source_root = self.mapper.index_to_source(index_root)
    except Exception:
        source_root = Path(coarse_results[0].path).resolve().parent
    workspace_root = self._find_lsp_workspace_root(source_root)

    # Build seed nodes from at most 10 coarse results.
    max_expand = min(10, len(coarse_results))
    seed_nodes: List[CodeSymbolNode] = []
    seed_ids: set[str] = set()
    for seed in list(coarse_results)[:max_expand]:
        if not seed.path:
            continue
        name = seed.symbol_name or Path(seed.path).stem
        kind = seed.symbol_kind or "unknown"
        start_line = int(seed.start_line or 1)
        end_line = int(seed.end_line or start_line)
        # Locate the symbol's column on its start line so LSP position-based
        # queries (references/call hierarchy) hit the identifier.
        start_character = 1
        try:
            if seed.symbol_name and start_line >= 1:
                line_text = Path(seed.path).read_text(encoding="utf-8", errors="ignore").splitlines()[start_line - 1]
                idx = line_text.find(seed.symbol_name)
                if idx >= 0:
                    start_character = idx + 1  # 1-based for StandaloneLspManager
        except Exception:
            start_character = 1
        node_id = f"{seed.path}:{name}:{start_line}"
        seed_ids.add(node_id)
        seed_nodes.append(
            CodeSymbolNode(
                id=node_id,
                name=name,
                kind=kind,
                file_path=seed.path,
                range=Range(
                    start_line=start_line,
                    start_character=start_character,
                    end_line=end_line,
                    end_character=1,
                ),
                raw_code=seed.content or "",
                docstring=seed.excerpt or "",
            )
        )
    if not seed_nodes:
        return coarse_results

    async def expand_graph():
        async with LspBridge(workspace_root=str(workspace_root), timeout=timeout_s) as bridge:
            # Warm up analysis: open seed docs and wait a bit so references/call hierarchy are populated.
            if warmup_s > 0:
                for seed in seed_nodes[:3]:
                    try:
                        await bridge.get_document_symbols(seed.file_path)
                    except Exception:
                        continue
                try:
                    # Never sleep past the overall budget (leave 0.5s headroom).
                    await asyncio.sleep(min(warmup_s, max(0.0, timeout_s - 0.5)))
                except Exception:
                    pass
            builder = LspGraphBuilder(
                max_depth=max_depth,
                max_nodes=max_nodes,
                resolve_symbols=resolve_symbols,
            )
            return await builder.build_from_seeds(seed_nodes, bridge)

    def run_coro_blocking():
        # Fresh event loop + hard timeout around the whole expansion.
        return asyncio.run(asyncio.wait_for(expand_graph(), timeout=timeout_s))

    try:
        # asyncio.run() cannot be called from inside a running loop, so when
        # one is active, run the coroutine on a dedicated worker thread.
        try:
            asyncio.get_running_loop()
            has_running_loop = True
        except RuntimeError:
            has_running_loop = False
        if has_running_loop:
            with ThreadPoolExecutor(max_workers=1) as executor:
                graph = executor.submit(run_coro_blocking).result(timeout=timeout_s + 1.0)
        else:
            graph = run_coro_blocking()
    except Exception as exc:
        # Realtime expansion is best-effort: degrade to the coarse results.
        self.logger.debug("Stage 2 (realtime) expansion failed: %s", exc)
        return coarse_results

    # Convert non-seed graph nodes into SearchResult entries.
    related_results: List[SearchResult] = []
    for node_id, node in getattr(graph, "nodes", {}).items():
        if node_id in seed_ids or getattr(node, "id", "") in seed_ids:
            continue
        try:
            start_line = int(getattr(node.range, "start_line", 1) or 1)
            end_line = int(getattr(node.range, "end_line", start_line) or start_line)
        except Exception:
            start_line, end_line = 1, 1
        related_results.append(
            SearchResult(
                path=node.file_path,
                score=0.5,  # fixed mid score: related-but-not-directly-matched
                excerpt=None,
                content=getattr(node, "raw_code", "") or None,
                symbol_name=node.name,
                symbol_kind=node.kind,
                start_line=start_line,
                end_line=end_line,
                metadata={"stage2_mode": "realtime", "lsp_node_id": node_id},
            )
        )
    if related_results:
        self.logger.debug(
            "Stage 2 (realtime) expanded %d base results to %d related symbols",
            len(coarse_results), len(related_results)
        )
    return self._combine_stage2_results(coarse_results, related_results)
def _combine_stage2_results(
self,
coarse_results: List[SearchResult],
related_results: List[SearchResult],
) -> List[SearchResult]:
combined = list(coarse_results)
seen_keys = {(r.path, r.symbol_name, r.start_line) for r in coarse_results}
for related in related_results:
key = (related.path, related.symbol_name, related.start_line)
if key not in seen_keys:
seen_keys.add(key)
combined.append(related)
return combined
def _find_lsp_workspace_root(self, start_path: Path) -> Path:
"""Best-effort workspace root selection for LSP initialization.
Many language servers (e.g. Pyright) use workspace-relative include/exclude
patterns, so using a deep subdir (like "src") as root can break reference
and call-hierarchy queries.
"""
start = Path(start_path).resolve()
if start.is_file():
start = start.parent
# Prefer an explicit LSP config file in the workspace.
for current in [start, *list(start.parents)]:
try:
if (current / "lsp-servers.json").is_file():
return current
except OSError:
continue
# Fallback heuristics for project root markers.
for current in [start, *list(start.parents)]:
try:
if (current / ".git").exists() or (current / "pyproject.toml").is_file():
return current
except OSError:
continue
return start
def _stage3_cluster_prune(
self,
expanded_results: List[SearchResult],

View File

@@ -0,0 +1,65 @@
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
from codexlens.config import VECTORS_META_DB_NAME, Config
from codexlens.search.chain_search import ChainSearchEngine, SearchStats
from codexlens.storage.path_mapper import PathMapper
from codexlens.storage.registry import RegistryStore
def test_stage1_binary_search_prefers_chunk_start_line(tmp_path: Path) -> None:
    """Stage 1 must surface chunk-level start/end lines on coarse results."""
    registry = RegistryStore(db_path=tmp_path / "registry.db")
    registry.initialize()
    mapper = PathMapper(index_root=tmp_path / "indexes")
    engine = ChainSearchEngine(registry, mapper, config=Config(data_dir=tmp_path / "data"))
    try:
        # Fake index layout: an empty _index.db plus a vectors-metadata db so
        # the staged pipeline considers this root eligible.
        fake_root = tmp_path / "fake_index_root"
        fake_root.mkdir(parents=True, exist_ok=True)
        db_file = fake_root / "_index.db"
        db_file.write_text("", encoding="utf-8")
        (fake_root / VECTORS_META_DB_NAME).write_text("", encoding="utf-8")

        class _StubBinarySearcher:
            # Always reports chunk 123 with a raw distance of 10.
            def search(self, query_dense, top_k: int):
                _ = query_dense
                _ = top_k
                return [(123, 10)]

        class _StubEmbedder:
            # Minimal embedding: one vector, one dimension.
            def embed_to_numpy(self, texts):
                _ = texts
                return [[0.0]]

        meta_store = MagicMock()
        meta_store.get_chunks_by_ids.return_value = [
            {
                "chunk_id": 123,
                "file_path": str(tmp_path / "a.py"),
                "content": "def a():\n return 1\n",
                "start_line": 12,
                "end_line": 14,
                "metadata": {},
                "category": "code",
            }
        ]

        with patch.object(engine, "_get_centralized_binary_searcher", return_value=_StubBinarySearcher()):
            with patch("codexlens.search.chain_search.VectorMetadataStore", return_value=meta_store):
                with patch("codexlens.semantic.embedder.Embedder", return_value=_StubEmbedder()):
                    coarse_results, returned_root = engine._stage1_binary_search(
                        "a",
                        [db_file],
                        coarse_k=1,
                        stats=SearchStats(),
                    )

        assert returned_root == fake_root
        assert len(coarse_results) == 1
        only = coarse_results[0]
        assert only.start_line == 12
        assert only.end_line == 14
    finally:
        engine.close()

View File

@@ -0,0 +1,168 @@
"""Regression tests for staged cascade Stage 2 expansion depth.
Staged cascade is documented as:
coarse (binary) → LSP/graph expansion → clustering → optional rerank
This test ensures Stage 2 respects Config.staged_lsp_depth (not unrelated
graph_expansion_depth settings).
"""
from __future__ import annotations
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from codexlens.config import Config
from codexlens.entities import CodeRelationship, RelationshipType, SearchResult, Symbol
from codexlens.search.chain_search import ChainSearchEngine
from codexlens.storage.dir_index import DirIndexStore
from codexlens.storage.index_tree import _compute_graph_neighbors
from codexlens.storage.path_mapper import PathMapper
from codexlens.storage.registry import RegistryStore
@pytest.fixture()
def temp_paths() -> Path:
    """Yield a fresh temporary directory root; clean up best-effort on teardown."""
    scratch = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
    yield Path(scratch.name)
    # Windows can keep SQLite file handles alive briefly after close; a failed
    # cleanup here must not fail the test itself.
    try:
        scratch.cleanup()
    except (PermissionError, OSError):
        pass
def _create_index_with_neighbors(root: Path) -> tuple[PathMapper, Path, Path, str]:
    """Build a tiny indexed project with the call chain ``a() -> b() -> c()``.

    Three separate files are used so staged_cascade_search's final
    "deduplicate by path" step doesn't collapse all expanded symbols into a
    single file result.

    Returns:
        ``(mapper, project_root, file_a, content_a)`` so the caller can seed
        Stage 1 with symbol ``a`` and probe how far Stage 2 expands.
    """
    project_root = root / "project"
    project_root.mkdir(parents=True, exist_ok=True)
    index_root = root / "indexes"
    mapper = PathMapper(index_root=index_root)
    index_db_path = mapper.source_to_index_db(project_root)
    index_db_path.parent.mkdir(parents=True, exist_ok=True)

    # (file name, file content, symbol defined, symbol called or None).
    specs = [
        ("a.py", "def a():\n b()\n", "a", "b"),
        ("b.py", "def b():\n c()\n", "b", "c"),
        ("c.py", "def c():\n return 1\n", "c", None),
    ]
    paths = {name: project_root / name for name, _, _, _ in specs}

    config = Config(data_dir=root / "data")
    store = DirIndexStore(index_db_path, config=config)
    store.initialize()
    for name, content, defined, called in specs:
        path = paths[name]
        path.write_text(content, encoding="utf-8")
        symbols = [Symbol(name=defined, kind="function", range=(1, 2), file=str(path))]
        relationships = []
        if called is not None:
            relationships.append(
                CodeRelationship(
                    source_symbol=defined,
                    target_symbol=called,
                    relationship_type=RelationshipType.CALL,
                    source_file=str(path),
                    # Caller convention: "x" lives in "x.py".
                    target_file=str(paths[f"{called}.py"]),
                    source_line=2,
                )
            )
        store.add_file(
            name=name,
            full_path=path,
            content=content,
            language="python",
            symbols=symbols,
            relationships=relationships,
        )
    _compute_graph_neighbors(store)
    store.close()
    return mapper, project_root, paths["a.py"], specs[0][1]
def test_staged_cascade_stage2_uses_staged_lsp_depth(temp_paths: Path) -> None:
    """Stage 2 must expand by ``staged_lsp_depth`` hops, not ``graph_expansion_depth``."""
    mapper, project_root, file_path, content = _create_index_with_neighbors(temp_paths)
    index_db_path = mapper.source_to_index_db(project_root)
    registry = RegistryStore(db_path=temp_paths / "registry.db")
    registry.initialize()
    # Intentionally conflicting depths: staged_lsp_depth should win for staged cascade.
    config = Config(
        data_dir=temp_paths / "data",
        staged_lsp_depth=1,
        graph_expansion_depth=2,
        enable_staged_rerank=False,
        staged_clustering_strategy="noop",
    )
    engine = ChainSearchEngine(registry, mapper, config=config)
    try:
        seed = SearchResult(
            path=str(file_path.resolve()),
            score=1.0,
            excerpt="",
            content=content,
            start_line=1,
            end_line=2,
            symbol_name="a",
            symbol_kind="function",
        )
        # Bypass the binary vector infrastructure entirely; the patched Stage 1
        # output is sufficient to exercise Stage 2 behavior.
        with patch("codexlens.search.chain_search.NUMPY_AVAILABLE", True), \
             patch.object(engine, "_find_start_index", return_value=index_db_path), \
             patch.object(engine, "_collect_index_paths", return_value=[index_db_path]), \
             patch.object(
                 engine,
                 "_stage1_binary_search",
                 return_value=([seed], index_db_path.parent),
             ):
            outcome = engine.staged_cascade_search(
                query="test",
                source_path=project_root,
                k=3,
                coarse_k=10,
            )
        found = {r.symbol_name for r in outcome.results if r.symbol_name}
        assert "b" in found
        # With staged_lsp_depth=1, Stage 2 should NOT include 2-hop neighbor "c".
        assert "c" not in found
    finally:
        engine.close()

View File

@@ -0,0 +1,98 @@
"""Unit tests for staged cascade Stage 2 realtime LSP graph expansion.
These tests mock out the live LSP components (LspBridge + LspGraphBuilder)
so they can run without external language servers installed.
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from codexlens.config import Config
from codexlens.entities import SearchResult
from codexlens.hybrid_search.data_structures import CodeAssociationGraph, CodeSymbolNode, Range
from codexlens.search.chain_search import ChainSearchEngine
from codexlens.storage.path_mapper import PathMapper
from codexlens.storage.registry import RegistryStore
class _DummyBridge:
    """Minimal stand-in for LspBridge: an async context manager that reports no symbols."""

    def __init__(self, *args, **kwargs) -> None:
        # Accept and ignore whatever the real bridge's constructor takes.
        pass

    async def get_document_symbols(self, file_path: str):
        # Pretend the language server knows nothing about any file.
        _ = file_path
        return []

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        return None
def test_stage2_realtime_mode_expands_and_combines(tmp_path: Path) -> None:
    """Realtime Stage 2 should merge coarse hits with live LSP-graph neighbors."""
    registry = RegistryStore(db_path=tmp_path / "registry.db")
    registry.initialize()
    mapper = PathMapper(index_root=tmp_path / "indexes")
    config = Config(
        data_dir=tmp_path / "data",
        staged_stage2_mode="realtime",
        staged_lsp_depth=1,
        staged_realtime_lsp_timeout_s=1.0,
        staged_realtime_lsp_max_nodes=10,
        staged_realtime_lsp_warmup_s=0.0,
    )
    engine = ChainSearchEngine(registry, mapper, config=config)
    try:
        seed_path = str(tmp_path / "a.py")
        coarse_hits = [
            SearchResult(
                path=seed_path,
                score=1.0,
                excerpt="def a(): pass",
                content="def a():\n pass\n",
                symbol_name="a",
                symbol_kind="function",
                start_line=1,
                end_line=2,
            )
        ]
        # Pre-built association graph the mocked builder will return: the seed
        # symbol "a" plus one related symbol "b" in another file.
        graph = CodeAssociationGraph()
        seed_id = f"{seed_path}:a:1"
        graph.nodes[seed_id] = CodeSymbolNode(
            id=seed_id,
            name="a",
            kind="function",
            file_path=seed_path,
            range=Range(start_line=1, start_character=1, end_line=2, end_character=1),
        )
        neighbor_path = str(tmp_path / "b.py")
        neighbor_id = f"{neighbor_path}:b:1"
        graph.nodes[neighbor_id] = CodeSymbolNode(
            id=neighbor_id,
            name="b",
            kind="function",
            file_path=neighbor_path,
            range=Range(start_line=1, start_character=1, end_line=1, end_character=1),
            raw_code="def b():\n return 1\n",
        )
        fake_builder = MagicMock()
        fake_builder.build_from_seeds = AsyncMock(return_value=graph)
        with patch("codexlens.lsp.LspBridge", _DummyBridge), \
             patch("codexlens.lsp.LspGraphBuilder", return_value=fake_builder) as builder_cls:
            # Avoid needing a real index_to_source mapping.
            engine.mapper.index_to_source = MagicMock(return_value=tmp_path)
            expanded = engine._stage2_lsp_expand(
                coarse_hits, index_root=tmp_path / "fake_index_root"
            )
        assert builder_cls.call_args is not None
        assert builder_cls.call_args.kwargs.get("resolve_symbols") is False
        found = {r.symbol_name for r in expanded if r.symbol_name}
        # Both the original coarse hit and the LSP-expanded neighbor survive.
        assert "a" in found
        assert "b" in found
    finally:
        engine.close()

View File

@@ -760,6 +760,24 @@ class TestLocationParsing:
assert loc.line == 1
assert loc.character == 1
def test_location_from_file_uri_windows_percent_encoded_drive(self):
"""Parse Location from percent-encoded Windows drive URIs (pyright-style)."""
from codexlens.lsp.lsp_bridge import Location
data = {
"uri": "file:///d%3A/Claude_dms3/codex-lens/src/codexlens/api/semantic.py",
"range": {
"start": {"line": 18, "character": 3},
"end": {"line": 18, "character": 10},
},
}
loc = Location.from_lsp_response(data)
assert loc.file_path == "d:/Claude_dms3/codex-lens/src/codexlens/api/semantic.py"
assert loc.line == 19 # 0-based -> 1-based
assert loc.character == 4
def test_location_from_direct_fields(self):
"""Parse Location from direct field format."""
from codexlens.lsp.lsp_bridge import Location