mirror of
https://github.com/catlog22/Claude-Code-Workflow.git
synced 2026-02-05 01:50:27 +08:00
376 lines
12 KiB
Python
376 lines
12 KiB
Python
"""Graph builder for code association graphs via LSP."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Any, Dict, List, Optional, Set, Tuple
|
|
|
|
from codexlens.hybrid_search.data_structures import (
|
|
CallHierarchyItem,
|
|
CodeAssociationGraph,
|
|
CodeSymbolNode,
|
|
Range,
|
|
)
|
|
from codexlens.lsp.lsp_bridge import (
|
|
Location,
|
|
LspBridge,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class LspGraphBuilder:
|
|
"""Builds code association graph by expanding from seed symbols using LSP."""
|
|
|
|
def __init__(
|
|
self,
|
|
max_depth: int = 2,
|
|
max_nodes: int = 100,
|
|
max_concurrent: int = 10,
|
|
):
|
|
"""Initialize GraphBuilder.
|
|
|
|
Args:
|
|
max_depth: Maximum depth for BFS expansion from seeds.
|
|
max_nodes: Maximum number of nodes in the graph.
|
|
max_concurrent: Maximum concurrent LSP requests.
|
|
"""
|
|
self.max_depth = max_depth
|
|
self.max_nodes = max_nodes
|
|
self.max_concurrent = max_concurrent
|
|
# Cache for document symbols per file (avoids per-location hover queries)
|
|
self._document_symbols_cache: Dict[str, List[Dict[str, Any]]] = {}
|
|
|
|
async def build_from_seeds(
|
|
self,
|
|
seeds: List[CodeSymbolNode],
|
|
lsp_bridge: LspBridge,
|
|
) -> CodeAssociationGraph:
|
|
"""Build association graph by BFS expansion from seeds.
|
|
|
|
For each seed:
|
|
1. Get references via LSP
|
|
2. Get call hierarchy via LSP
|
|
3. Add nodes and edges to graph
|
|
4. Continue expanding until max_depth or max_nodes reached
|
|
|
|
Args:
|
|
seeds: Initial seed symbols to expand from.
|
|
lsp_bridge: LSP bridge for querying language servers.
|
|
|
|
Returns:
|
|
CodeAssociationGraph with expanded nodes and relationships.
|
|
"""
|
|
graph = CodeAssociationGraph()
|
|
visited: Set[str] = set()
|
|
semaphore = asyncio.Semaphore(self.max_concurrent)
|
|
|
|
# Initialize queue with seeds at depth 0
|
|
queue: List[Tuple[CodeSymbolNode, int]] = [(s, 0) for s in seeds]
|
|
|
|
# Add seed nodes to graph
|
|
for seed in seeds:
|
|
graph.add_node(seed)
|
|
|
|
# BFS expansion
|
|
while queue and len(graph.nodes) < self.max_nodes:
|
|
# Take a batch of nodes from queue
|
|
batch_size = min(self.max_concurrent, len(queue))
|
|
batch = queue[:batch_size]
|
|
queue = queue[batch_size:]
|
|
|
|
# Expand nodes in parallel
|
|
tasks = [
|
|
self._expand_node(
|
|
node, depth, graph, lsp_bridge, visited, semaphore
|
|
)
|
|
for node, depth in batch
|
|
]
|
|
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# Process results and add new nodes to queue
|
|
for result in results:
|
|
if isinstance(result, Exception):
|
|
logger.warning("Error expanding node: %s", result)
|
|
continue
|
|
if result:
|
|
# Add new nodes to queue if not at max depth
|
|
for new_node, new_depth in result:
|
|
if (
|
|
new_depth <= self.max_depth
|
|
and len(graph.nodes) < self.max_nodes
|
|
):
|
|
queue.append((new_node, new_depth))
|
|
|
|
return graph
|
|
|
|
async def _expand_node(
|
|
self,
|
|
node: CodeSymbolNode,
|
|
depth: int,
|
|
graph: CodeAssociationGraph,
|
|
lsp_bridge: LspBridge,
|
|
visited: Set[str],
|
|
semaphore: asyncio.Semaphore,
|
|
) -> List[Tuple[CodeSymbolNode, int]]:
|
|
"""Expand a single node, return new nodes to process.
|
|
|
|
Args:
|
|
node: Node to expand.
|
|
depth: Current depth in BFS.
|
|
graph: Graph to add nodes and edges to.
|
|
lsp_bridge: LSP bridge for queries.
|
|
visited: Set of visited node IDs.
|
|
semaphore: Semaphore for concurrency control.
|
|
|
|
Returns:
|
|
List of (new_node, new_depth) tuples to add to queue.
|
|
"""
|
|
# Skip if already visited or at max depth
|
|
if node.id in visited:
|
|
return []
|
|
if depth > self.max_depth:
|
|
return []
|
|
if len(graph.nodes) >= self.max_nodes:
|
|
return []
|
|
|
|
visited.add(node.id)
|
|
new_nodes: List[Tuple[CodeSymbolNode, int]] = []
|
|
|
|
async with semaphore:
|
|
# Get relationships in parallel
|
|
try:
|
|
refs_task = lsp_bridge.get_references(node)
|
|
calls_task = lsp_bridge.get_call_hierarchy(node)
|
|
|
|
refs, calls = await asyncio.gather(
|
|
refs_task, calls_task, return_exceptions=True
|
|
)
|
|
|
|
# Handle reference results
|
|
if isinstance(refs, Exception):
|
|
logger.debug(
|
|
"Failed to get references for %s: %s", node.id, refs
|
|
)
|
|
refs = []
|
|
|
|
# Handle call hierarchy results
|
|
if isinstance(calls, Exception):
|
|
logger.debug(
|
|
"Failed to get call hierarchy for %s: %s",
|
|
node.id,
|
|
calls,
|
|
)
|
|
calls = []
|
|
|
|
# Process references
|
|
for ref in refs:
|
|
if len(graph.nodes) >= self.max_nodes:
|
|
break
|
|
|
|
ref_node = await self._location_to_node(ref, lsp_bridge)
|
|
if ref_node and ref_node.id != node.id:
|
|
if ref_node.id not in graph.nodes:
|
|
graph.add_node(ref_node)
|
|
new_nodes.append((ref_node, depth + 1))
|
|
# Use add_edge since both nodes should exist now
|
|
graph.add_edge(node.id, ref_node.id, "references")
|
|
|
|
# Process call hierarchy (incoming calls)
|
|
for call in calls:
|
|
if len(graph.nodes) >= self.max_nodes:
|
|
break
|
|
|
|
call_node = await self._call_hierarchy_to_node(
|
|
call, lsp_bridge
|
|
)
|
|
if call_node and call_node.id != node.id:
|
|
if call_node.id not in graph.nodes:
|
|
graph.add_node(call_node)
|
|
new_nodes.append((call_node, depth + 1))
|
|
# Incoming call: call_node calls node
|
|
graph.add_edge(call_node.id, node.id, "calls")
|
|
|
|
except Exception as e:
|
|
logger.warning(
|
|
"Error during node expansion for %s: %s", node.id, e
|
|
)
|
|
|
|
return new_nodes
|
|
|
|
def clear_cache(self) -> None:
|
|
"""Clear the document symbols cache.
|
|
|
|
Call this between searches to free memory and ensure fresh data.
|
|
"""
|
|
self._document_symbols_cache.clear()
|
|
|
|
async def _get_symbol_at_location(
|
|
self,
|
|
file_path: str,
|
|
line: int,
|
|
lsp_bridge: LspBridge,
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""Find symbol at location using cached document symbols.
|
|
|
|
This is much more efficient than individual hover queries because
|
|
document symbols are fetched once per file and cached.
|
|
|
|
Args:
|
|
file_path: Path to the source file.
|
|
line: Line number (1-based).
|
|
lsp_bridge: LSP bridge for fetching document symbols.
|
|
|
|
Returns:
|
|
Symbol dictionary with name, kind, range, etc., or None if not found.
|
|
"""
|
|
# Get or fetch document symbols for this file
|
|
if file_path not in self._document_symbols_cache:
|
|
symbols = await lsp_bridge.get_document_symbols(file_path)
|
|
self._document_symbols_cache[file_path] = symbols
|
|
|
|
symbols = self._document_symbols_cache[file_path]
|
|
|
|
# Find symbol containing this line (best match = smallest range)
|
|
best_match: Optional[Dict[str, Any]] = None
|
|
best_range_size = float("inf")
|
|
|
|
for symbol in symbols:
|
|
sym_range = symbol.get("range", {})
|
|
start = sym_range.get("start", {})
|
|
end = sym_range.get("end", {})
|
|
|
|
# LSP ranges are 0-based, our line is 1-based
|
|
start_line = start.get("line", 0) + 1
|
|
end_line = end.get("line", 0) + 1
|
|
|
|
if start_line <= line <= end_line:
|
|
range_size = end_line - start_line
|
|
if range_size < best_range_size:
|
|
best_match = symbol
|
|
best_range_size = range_size
|
|
|
|
return best_match
|
|
|
|
async def _location_to_node(
|
|
self,
|
|
location: Location,
|
|
lsp_bridge: LspBridge,
|
|
) -> Optional[CodeSymbolNode]:
|
|
"""Convert LSP location to CodeSymbolNode.
|
|
|
|
Uses cached document symbols instead of individual hover queries
|
|
for better performance.
|
|
|
|
Args:
|
|
location: LSP location to convert.
|
|
lsp_bridge: LSP bridge for additional queries.
|
|
|
|
Returns:
|
|
CodeSymbolNode or None if conversion fails.
|
|
"""
|
|
try:
|
|
file_path = location.file_path
|
|
start_line = location.line
|
|
|
|
# Try to find symbol info from cached document symbols (fast)
|
|
symbol_info = await self._get_symbol_at_location(
|
|
file_path, start_line, lsp_bridge
|
|
)
|
|
|
|
if symbol_info:
|
|
name = symbol_info.get("name", f"symbol_L{start_line}")
|
|
kind = symbol_info.get("kind", "unknown")
|
|
|
|
# Extract range from symbol if available
|
|
sym_range = symbol_info.get("range", {})
|
|
start = sym_range.get("start", {})
|
|
end = sym_range.get("end", {})
|
|
|
|
location_range = Range(
|
|
start_line=start.get("line", start_line - 1) + 1,
|
|
start_character=start.get("character", location.character - 1) + 1,
|
|
end_line=end.get("line", start_line - 1) + 1,
|
|
end_character=end.get("character", location.character - 1) + 1,
|
|
)
|
|
else:
|
|
# Fallback to basic node without symbol info
|
|
name = f"symbol_L{start_line}"
|
|
kind = "unknown"
|
|
location_range = Range(
|
|
start_line=location.line,
|
|
start_character=location.character,
|
|
end_line=location.line,
|
|
end_character=location.character,
|
|
)
|
|
|
|
node_id = self._create_node_id(file_path, name, start_line)
|
|
|
|
return CodeSymbolNode(
|
|
id=node_id,
|
|
name=name,
|
|
kind=kind,
|
|
file_path=file_path,
|
|
range=location_range,
|
|
docstring="", # Skip hover for performance
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.debug("Failed to convert location to node: %s", e)
|
|
return None
|
|
|
|
async def _call_hierarchy_to_node(
|
|
self,
|
|
call_item: CallHierarchyItem,
|
|
lsp_bridge: LspBridge,
|
|
) -> Optional[CodeSymbolNode]:
|
|
"""Convert CallHierarchyItem to CodeSymbolNode.
|
|
|
|
Args:
|
|
call_item: Call hierarchy item to convert.
|
|
lsp_bridge: LSP bridge (unused, kept for API consistency).
|
|
|
|
Returns:
|
|
CodeSymbolNode or None if conversion fails.
|
|
"""
|
|
try:
|
|
file_path = call_item.file_path
|
|
name = call_item.name
|
|
start_line = call_item.range.start_line
|
|
# CallHierarchyItem.kind is already a string
|
|
kind = call_item.kind
|
|
|
|
node_id = self._create_node_id(file_path, name, start_line)
|
|
|
|
return CodeSymbolNode(
|
|
id=node_id,
|
|
name=name,
|
|
kind=kind,
|
|
file_path=file_path,
|
|
range=call_item.range,
|
|
docstring=call_item.detail or "",
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.debug(
|
|
"Failed to convert call hierarchy item to node: %s", e
|
|
)
|
|
return None
|
|
|
|
def _create_node_id(
|
|
self, file_path: str, name: str, line: int
|
|
) -> str:
|
|
"""Create unique node ID.
|
|
|
|
Args:
|
|
file_path: Path to the file.
|
|
name: Symbol name.
|
|
line: Line number (0-based).
|
|
|
|
Returns:
|
|
Unique node ID string.
|
|
"""
|
|
return f"{file_path}:{name}:{line}"
|