# Files
# Claude-Code-Workflow/codex-lens/build/lib/codexlens/lsp/standalone_manager.py
#
# 1160 lines
# 41 KiB
# Python

"""Standalone Language Server Manager for direct LSP communication.
This module provides direct communication with language servers via JSON-RPC over stdio,
eliminating the need for VSCode Bridge. Similar to cclsp architecture.
Features:
- Direct subprocess spawning of language servers
- JSON-RPC 2.0 communication over stdin/stdout
- Multi-language support via configuration file (lsp-servers.json)
- Process lifecycle management with auto-restart
- Compatible interface with existing LspBridge
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
@dataclass
class ServerConfig:
    """Static settings describing how to launch one language server.

    Instances are built from entries of the JSON configuration file; the
    first four fields are required, the rest carry defaults.
    """

    # --- required -----------------------------------------------------
    language_id: str  # key used to look the server up (e.g. "python")
    display_name: str  # human-readable name used in log messages
    extensions: List[str]  # file extensions served by this server
    command: List[str]  # argv used to spawn the server process

    # --- optional -----------------------------------------------------
    enabled: bool = True
    initialization_options: Dict[str, Any] = field(default_factory=dict)
    settings: Dict[str, Any] = field(default_factory=dict)
    root_dir: str = "."
    timeout: int = 30000  # ms
    restart_interval: int = 5000  # ms
    max_restarts: int = 3
@dataclass
class ServerState:
    """Mutable runtime bookkeeping for one spawned language server."""

    # --- supplied when the process is spawned ---------------------------
    config: ServerConfig
    process: asyncio.subprocess.Process
    reader: asyncio.StreamReader  # the server's stdout
    writer: asyncio.StreamWriter  # the server's stdin

    # --- evolves while the server runs ----------------------------------
    request_id: int = 0  # last JSON-RPC request id that was handed out
    initialized: bool = False
    capabilities: Dict[str, Any] = field(default_factory=dict)
    # request id -> future resolved when the matching response arrives
    pending_requests: Dict[int, asyncio.Future] = field(default_factory=dict)
    restart_count: int = 0
    # Producer/consumer hand-off: the continuous reader puts every decoded
    # message here and the message processor takes them off again.
    message_queue: asyncio.Queue = field(default_factory=asyncio.Queue)
class StandaloneLspManager:
    """Manager for direct language server communication.

    Spawns language servers as subprocesses and communicates via JSON-RPC
    over stdin/stdout. No VSCode or GUI dependency required.

    Servers are started on demand, the first time a request is made for a
    file whose extension maps to a configured language. Each running server
    owns three background tasks: a stdout reader (producer), a message
    processor (consumer) and a stderr drain.

    Example:
        manager = StandaloneLspManager(workspace_root="/path/to/project")
        await manager.start()
        definition = await manager.get_definition(
            file_path="src/main.py",
            line=10,
            character=5
        )
        await manager.stop()
    """

    DEFAULT_CONFIG_FILE = "lsp-servers.json"
    # Seconds to wait for the reply to the `initialize` request.
    INITIALIZE_TIMEOUT = 30.0
    # Seconds to wait for the reply to `shutdown`, and for the process to
    # exit after it has been asked to terminate.
    SHUTDOWN_TIMEOUT = 5.0

    def __init__(
        self,
        workspace_root: Optional[str] = None,
        config_file: Optional[str] = None,
        timeout: float = 30.0,
    ):
        """Initialize StandaloneLspManager.

        Args:
            workspace_root: Root directory of the workspace (used for rootUri).
                Defaults to the current working directory.
            config_file: Path to lsp-servers.json configuration file
            timeout: Default timeout for LSP requests in seconds
        """
        self.workspace_root = Path(workspace_root or os.getcwd()).resolve()
        self.config_file = config_file
        self.timeout = timeout
        self._servers: Dict[str, ServerState] = {}  # language_id -> ServerState
        self._extension_map: Dict[str, str] = {}  # extension -> language_id
        self._configs: Dict[str, ServerConfig] = {}  # language_id -> ServerConfig
        self._read_tasks: Dict[str, asyncio.Task] = {}  # language_id -> read task
        self._stderr_tasks: Dict[str, asyncio.Task] = {}  # language_id -> stderr read task
        # language_id -> message processor task; tracked so it can be
        # cancelled together with the reader instead of leaking.
        self._processor_tasks: Dict[str, asyncio.Task] = {}
        self._lock = asyncio.Lock()

    # ========== Configuration ==========

    def _find_config_file(self) -> Optional[Path]:
        """Find the lsp-servers.json configuration file.

        Search order:
        1. Explicit config_file parameter
        2. {workspace_root}/lsp-servers.json
        3. {workspace_root}/.codexlens/lsp-servers.json
        4. Package default (four directory levels above this module)

        Returns:
            The first candidate that is an existing regular file, or None.
        """
        candidates: List[Path] = []
        if self.config_file:
            candidates.append(Path(self.config_file))
        candidates.extend([
            self.workspace_root / self.DEFAULT_CONFIG_FILE,
            self.workspace_root / ".codexlens" / self.DEFAULT_CONFIG_FILE,
            Path(__file__).parent.parent.parent.parent / self.DEFAULT_CONFIG_FILE,  # package root
        ])
        # is_file() rather than exists(): a directory that happens to carry
        # the config name must not shadow a real file further down the list.
        return next((path for path in candidates if path.is_file()), None)

    @staticmethod
    def _normalize_extension(ext: str) -> str:
        """Return *ext* lower-cased and without leading dots.

        ``get_language_id`` looks extensions up without the dot, so ".py"
        and "py" in the configuration must map to the same key.
        """
        return ext.strip().lstrip(".").lower()

    def _load_config(self) -> None:
        """Load language server configuration from JSON file.

        Populates ``self._configs`` and ``self._extension_map``. Disabled
        servers and entries without a ``languageId`` are skipped. Any
        problem reading or parsing the file is logged and leaves the
        manager with whatever configuration it already had.
        """
        config_path = self._find_config_file()
        if not config_path:
            logger.warning(f"No {self.DEFAULT_CONFIG_FILE} found, using empty config")
            return

        try:
            with open(config_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception as e:
            logger.error(f"Failed to load config from {config_path}: {e}")
            return

        if not isinstance(data, dict):
            logger.error(
                f"Failed to load config from {config_path}: "
                f"top-level JSON value must be an object"
            )
            return

        # Parse defaults
        defaults = data.get("defaults") or {}
        default_timeout = defaults.get("timeout", 30000)
        default_restart_interval = defaults.get("restartInterval", 5000)
        default_max_restarts = defaults.get("maxRestarts", 3)

        # Parse servers
        for server_data in data.get("servers") or []:
            if not isinstance(server_data, dict):
                continue
            if not server_data.get("enabled", True):
                continue
            language_id = server_data.get("languageId", "")
            if not language_id:
                continue

            config = ServerConfig(
                language_id=language_id,
                display_name=server_data.get("displayName", language_id),
                extensions=server_data.get("extensions", []),
                command=server_data.get("command", []),
                enabled=server_data.get("enabled", True),
                initialization_options=server_data.get("initializationOptions", {}),
                settings=server_data.get("settings", {}),
                root_dir=server_data.get("rootDir", defaults.get("rootDir", ".")),
                timeout=server_data.get("timeout", default_timeout),
                restart_interval=server_data.get("restartInterval", default_restart_interval),
                max_restarts=server_data.get("maxRestarts", default_max_restarts),
            )
            self._configs[language_id] = config

            # Build extension map
            for ext in config.extensions:
                self._extension_map[self._normalize_extension(ext)] = language_id

        logger.info(f"Loaded {len(self._configs)} language server configs from {config_path}")

    def get_language_id(self, file_path: str) -> Optional[str]:
        """Get language ID for a file based on extension.

        Args:
            file_path: Path to the file

        Returns:
            Language ID (e.g., "python", "typescript") or None if unknown
        """
        ext = Path(file_path).suffix.lstrip(".").lower()
        return self._extension_map.get(ext)

    # ========== Lifecycle ==========

    async def start(self) -> None:
        """Initialize the manager and load configuration.

        This does NOT start any language servers yet - they are started
        on-demand when first needed for a file type.
        """
        self._load_config()
        logger.info(f"StandaloneLspManager started for workspace: {self.workspace_root}")

    async def stop(self) -> None:
        """Stop all running language servers and cleanup."""
        async with self._lock:
            for language_id in list(self._servers.keys()):
                await self._stop_server(language_id)
        logger.info("StandaloneLspManager stopped")

    async def _start_server(self, language_id: str) -> Optional[ServerState]:
        """Start a language server for the given language.

        The caller is expected to hold ``self._lock``.

        Args:
            language_id: The language ID (e.g., "python")

        Returns:
            ServerState if successful, None on failure. On failure nothing
            is left behind: the process is terminated, the background tasks
            are cancelled and the server is not registered.
        """
        config = self._configs.get(language_id)
        if not config:
            logger.error(f"No configuration for language: {language_id}")
            return None
        if not config.command:
            logger.error(f"No command specified for {language_id}")
            return None

        logger.info(f"Starting {config.display_name}: {' '.join(config.command)}")
        try:
            # Spawn the language server process
            process = await asyncio.create_subprocess_exec(
                *config.command,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(self.workspace_root),
            )
        except FileNotFoundError:
            logger.error(
                f"Language server not found: {config.command[0]}. "
                f"Install it with the appropriate package manager."
            )
            return None
        except Exception as e:
            logger.error(f"Failed to start {language_id}: {e}")
            return None

        if process.stdin is None or process.stdout is None:
            logger.error(f"Failed to get stdin/stdout for {language_id}")
            await self._terminate_process(process)
            return None

        state = ServerState(
            config=config,
            process=process,
            reader=process.stdout,
            writer=process.stdin,
        )
        # The background tasks look the state up by language id, so it has
        # to be registered before they are created.
        self._servers[language_id] = state

        try:
            # Start reading stderr in background (prevents pipe buffer from filling up)
            if process.stderr:
                self._stderr_tasks[language_id] = asyncio.create_task(
                    self._read_stderr(language_id, process.stderr)
                )
            # CRITICAL: Start the continuous reader task IMMEDIATELY before any communication
            # This ensures no messages are lost during initialization handshake
            self._read_tasks[language_id] = asyncio.create_task(
                self._continuous_reader(language_id)
            )
            # Start the message processor task to handle queued messages
            self._processor_tasks[language_id] = asyncio.create_task(
                self._process_messages(language_id)
            )
            # Initialize the server - responses arrive through the queue
            await self._initialize_server(state)
        except asyncio.CancelledError:
            self._servers.pop(language_id, None)
            await self._dispose_server(language_id, state, "server start cancelled")
            raise
        except Exception as e:
            logger.error(f"Failed to start {language_id}: {e}")
            self._servers.pop(language_id, None)
            await self._dispose_server(language_id, state, "server failed to start")
            return None

        logger.info(f"{config.display_name} started and initialized")
        return state

    async def _stop_server(self, language_id: str) -> None:
        """Stop a language server.

        Performs the shutdown/exit handshake while the reader and processor
        tasks are still alive (otherwise the shutdown response could never
        be received), then cancels the tasks and terminates the process.
        """
        state = self._servers.pop(language_id, None)
        if not state:
            return

        if state.process.returncode is None:
            # Best effort: both helpers log and swallow their own failures.
            try:
                await self._send_request(
                    state, "shutdown", None, timeout=self.SHUTDOWN_TIMEOUT
                )
                await self._send_notification(state, "exit", None)
            except Exception as e:
                logger.debug(f"Shutdown handshake failed for {language_id}: {e}")

        await self._dispose_server(language_id, state, "language server stopped")
        logger.info(f"Stopped {state.config.display_name}")

    async def _dispose_server(self, language_id: str, state: ServerState, reason: str) -> None:
        """Release everything that belongs to an (already unregistered) server."""
        await self._cancel_server_tasks(language_id)
        self._fail_pending_requests(state, reason)
        try:
            state.writer.close()
        except Exception as e:
            logger.debug(f"Error closing stdin for {language_id}: {e}")
        await self._terminate_process(state.process)

    async def _cancel_server_tasks(self, language_id: str) -> None:
        """Cancel and await the reader, processor and stderr tasks of a server."""
        registries = (self._read_tasks, self._processor_tasks, self._stderr_tasks)
        for registry in registries:
            task = registry.pop(language_id, None)
            if task is None:
                continue
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception as e:
                logger.debug(f"Background task for {language_id} ended with error: {e}")

    def _fail_pending_requests(self, state: ServerState, reason: str) -> None:
        """Resolve every outstanding request future with an error.

        Callers blocked in ``_send_request`` then return immediately instead
        of waiting for their timeout.
        """
        for future in state.pending_requests.values():
            if not future.done():
                future.set_exception(RuntimeError(reason))
        state.pending_requests.clear()

    async def _terminate_process(self, process: asyncio.subprocess.Process) -> None:
        """Terminate *process*, escalating to kill after SHUTDOWN_TIMEOUT seconds."""
        if process.returncode is not None:
            return
        try:
            process.terminate()
        except ProcessLookupError:
            return  # exited between the returncode check and terminate()
        try:
            await asyncio.wait_for(process.wait(), timeout=self.SHUTDOWN_TIMEOUT)
        except asyncio.TimeoutError:
            try:
                process.kill()
            except ProcessLookupError:
                return
            await process.wait()

    async def _get_server(self, file_path: str) -> Optional[ServerState]:
        """Get or start the appropriate language server for a file.

        Args:
            file_path: Path to the file being operated on

        Returns:
            ServerState for the appropriate language server, or None
        """
        language_id = self.get_language_id(file_path)
        if not language_id:
            logger.debug(f"No language server configured for: {file_path}")
            return None

        async with self._lock:
            state = self._servers.get(language_id)
            if state is not None:
                # Check if process is still running
                if state.process.returncode is None:
                    return state
                # Process died: drop it and its background tasks before
                # spawning a replacement.
                del self._servers[language_id]
                await self._dispose_server(language_id, state, "language server exited")
            # Start new server
            return await self._start_server(language_id)

    async def _initialize_server(self, state: ServerState) -> None:
        """Send initialize request and wait for response via the message queue.

        The continuous reader and message processor are already running, so we just
        send the request and wait for the response via pending_requests.

        Raises:
            RuntimeError: If no response arrives within INITIALIZE_TIMEOUT.
            Exception: If the server answers with a JSON-RPC error.
        """
        root_uri = self.workspace_root.as_uri()
        # Simplified params matching direct test that works
        params = {
            "processId": None,  # Use None like direct test
            "rootUri": root_uri,
            "rootPath": str(self.workspace_root),
            "capabilities": {
                "textDocument": {
                    "documentSymbol": {
                        "hierarchicalDocumentSymbolSupport": True,
                    },
                },
                "workspace": {
                    "configuration": True,
                },
            },
            "workspaceFolders": [
                {
                    "uri": root_uri,
                    "name": self.workspace_root.name,
                }
            ],
        }

        state.request_id += 1
        init_request_id = state.request_id
        # Create future for the response
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        state.pending_requests[init_request_id] = future

        init_message = {
            "jsonrpc": "2.0",
            "id": init_request_id,
            "method": "initialize",
            "params": params,
        }
        try:
            logger.debug(f"Sending initialize request id={init_request_id}")
            state.writer.write(self._encode_message(init_message))
            await state.writer.drain()
            # Wait for response (will be routed by _process_messages)
            init_result = await asyncio.wait_for(future, timeout=self.INITIALIZE_TIMEOUT)
        except asyncio.TimeoutError:
            raise RuntimeError("Initialize request timed out") from None
        finally:
            # Also runs on cancellation so no stale future is left behind.
            state.pending_requests.pop(init_request_id, None)

        if not isinstance(init_result, dict):
            init_result = {}

        # Store capabilities
        state.capabilities = init_result.get("capabilities", {})
        state.initialized = True
        logger.debug(f"Initialize response received, capabilities: {len(state.capabilities)} keys")

        # Send initialized notification
        await self._send_notification(state, "initialized", {})
        # Give time for server to process initialized and send any requests
        # The message processor will handle workspace/configuration automatically
        await asyncio.sleep(0.5)

    # ========== Wire protocol ==========

    def _encode_message(self, content: Dict[str, Any]) -> bytes:
        """Encode a JSON-RPC message with LSP headers."""
        body = json.dumps(content).encode("utf-8")
        header = f"Content-Length: {len(body)}\r\n\r\n"
        return header.encode("ascii") + body

    async def _read_message(self, reader: asyncio.StreamReader) -> Tuple[Optional[Dict[str, Any]], bool]:
        """Read a JSON-RPC message from the stream.

        Returns:
            Tuple of (message, stream_closed). If stream_closed is True, the reader loop
            should exit. If False and message is None, it was just a timeout
            (or a header block without a Content-Length).
        """
        try:
            # Read headers
            content_length = 0
            while True:
                try:
                    line = await asyncio.wait_for(reader.readline(), timeout=1.0)
                except asyncio.TimeoutError:
                    # Timeout is not an error - just no message available yet
                    return None, False
                if not line:
                    # Empty read means stream closed
                    return None, True
                line_str = line.decode("ascii").strip()
                if not line_str:
                    break  # Empty line = end of headers
                logger.debug(f"Read header line: {repr(line_str[:80])}")
                if line_str.lower().startswith("content-length:"):
                    content_length = int(line_str.split(":", 1)[1].strip())

            if content_length == 0:
                return None, False

            # Read body
            body = await reader.readexactly(content_length)
            return json.loads(body.decode("utf-8")), False
        except asyncio.IncompleteReadError:
            return None, True
        except Exception as e:
            logger.error(f"Error reading message: {e}")
            return None, True

    async def _continuous_reader(self, language_id: str) -> None:
        """Continuously read messages from language server and put them in the queue.

        This is the PRODUCER in the producer-consumer pattern. It starts IMMEDIATELY
        after subprocess creation and runs until the stream ends, a fatal I/O
        error occurs, or the task is cancelled. Malformed messages are logged
        and skipped.
        """
        state = self._servers.get(language_id)
        if not state:
            return
        logger.debug(f"Continuous reader started for {language_id}")

        try:
            while True:
                try:
                    # Read headers with timeout
                    content_length = 0
                    while True:
                        try:
                            line = await asyncio.wait_for(state.reader.readline(), timeout=5.0)
                        except asyncio.TimeoutError:
                            continue  # Keep waiting for data
                        if not line:
                            logger.debug(f"Continuous reader for {language_id}: EOF")
                            return
                        line_str = line.decode("ascii").strip()
                        if not line_str:
                            break  # End of headers
                        if line_str.lower().startswith("content-length:"):
                            content_length = int(line_str.split(":", 1)[1].strip())

                    if content_length == 0:
                        continue

                    # Read body
                    body = await state.reader.readexactly(content_length)
                    message = json.loads(body.decode("utf-8"))

                    # Put message in queue for processing
                    await state.message_queue.put(message)
                    if isinstance(message, dict):
                        msg_id = message.get("id", "none")
                        msg_method = message.get("method", "none")
                        logger.debug(f"Queued message: id={msg_id}, method={msg_method}")
                except asyncio.CancelledError:
                    raise
                except asyncio.IncompleteReadError:
                    logger.debug(f"Continuous reader for {language_id}: IncompleteReadError")
                    return
                except OSError as e:
                    # A broken pipe does not heal; retrying would only spin.
                    logger.error(f"Fatal error in continuous reader for {language_id}: {e}")
                    return
                except Exception as e:
                    logger.error(f"Error in continuous reader for {language_id}: {e}")
                    await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            logger.debug(f"Continuous reader cancelled for {language_id}")
            raise
        except Exception as e:
            logger.error(f"Fatal error in continuous reader for {language_id}: {e}")

    async def _process_messages(self, language_id: str) -> None:
        """Process messages from the queue and route them appropriately.

        This is the CONSUMER in the producer-consumer pattern. It handles:
        - Responses to our requests - resolves the matching pending future
        - Server requests (workspace/configuration, etc.) - responds immediately
        - Notifications (window/logMessage, etc.) - logs them

        A failure while handling one message is logged and does not stop
        the loop; only cancellation ends it.
        """
        state = self._servers.get(language_id)
        if not state:
            return
        logger.debug(f"Message processor started for {language_id}")

        try:
            while True:
                # Get message from queue (blocks until available)
                message = await state.message_queue.get()
                try:
                    await self._dispatch_message(language_id, state, message)
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    logger.error(f"Error in message processor for {language_id}: {e}")
                finally:
                    state.message_queue.task_done()
        except asyncio.CancelledError:
            logger.debug(f"Message processor cancelled for {language_id}")
            raise

    async def _dispatch_message(
        self,
        language_id: str,
        state: ServerState,
        message: Dict[str, Any],
    ) -> None:
        """Route one decoded message to the response/request/notification path."""
        msg_id = message.get("id")
        method = message.get("method", "")

        # Response (has id but no method)
        if msg_id is not None and not method:
            future = state.pending_requests.pop(msg_id, None)
            if future is None:
                logger.debug(f"No pending request for response id={msg_id}")
                return
            if future.done():
                # The waiter timed out or was cancelled; setting a result
                # now would raise InvalidStateError.
                logger.debug(f"Dropping late response id={msg_id}")
                return
            error = message.get("error")
            if error is not None:
                text = error.get("message", "Unknown error") if isinstance(error, dict) else str(error)
                future.set_exception(Exception(text))
            else:
                future.set_result(message.get("result"))
            logger.debug(f"Response routed to pending request id={msg_id}")
        # Server request (has both id and method) - needs response
        elif msg_id is not None:
            logger.info(f"Server request: {method} (id={msg_id})")
            await self._handle_server_request(state, message)
        # Notification (has method but no id)
        elif method:
            self._handle_server_message(language_id, message)

    async def _read_stderr(self, language_id: str, stderr: asyncio.StreamReader) -> None:
        """Background task to read stderr from a language server.

        This prevents the stderr pipe buffer from filling up, which would
        cause the language server process to block and stop responding.
        """
        try:
            while True:
                line = await stderr.readline()
                if not line:
                    break
                text = line.decode("utf-8", errors="replace").rstrip()
                if text:
                    # Log stderr output at warning level for visibility
                    logger.warning(f"[{language_id}] {text}")
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.debug(f"Error reading stderr for {language_id}: {e}")

    def _handle_server_message(self, language_id: str, message: Dict[str, Any]) -> None:
        """Handle notifications from the language server.

        Only window/logMessage and window/showMessage are acted upon (they
        are forwarded to the logger); everything else is ignored.
        """
        method = message.get("method", "")
        params = message.get("params") or {}

        if method == "window/logMessage":
            level = params.get("type", 4)  # 1=error, 2=warn, 3=info, 4=log
            text = params.get("message", "")
            if level == 1:
                logger.error(f"[{language_id}] {text}")
            elif level == 2:
                logger.warning(f"[{language_id}] {text}")
            else:
                logger.debug(f"[{language_id}] {text}")
        elif method == "window/showMessage":
            text = params.get("message", "")
            logger.info(f"[{language_id}] {text}")

    @staticmethod
    def _configuration_for_section(section: str) -> Dict[str, Any]:
        """Return the settings object sent for one workspace/configuration item.

        A fresh dict is built on every call so responses never share state.
        Unknown sections get an empty object.
        """
        if section == "python":
            # Python-specific settings for pyright
            return {
                "pythonPath": "python",
                "analysis": {
                    "autoSearchPaths": True,
                    "useLibraryCodeForTypes": True,
                    "diagnosticMode": "workspace",
                },
            }
        if section == "python.analysis":
            return {
                "autoSearchPaths": True,
                "useLibraryCodeForTypes": True,
                "diagnosticMode": "workspace",
                "typeCheckingMode": "basic",
            }
        return {}

    async def _handle_server_request(self, state: ServerState, message: Dict[str, Any]) -> None:
        """Handle requests from the language server that need a response.

        workspace/configuration is answered with one settings object per
        requested item; every other request (including unrecognised ones)
        is acknowledged with a null result.
        """
        request_id = message["id"]
        method = message.get("method", "")
        params = message.get("params") or {}
        logger.info(f"SERVER REQUEST: {method} (id={request_id}) params={params}")

        result: Any = None
        if method == "workspace/configuration":
            # Return configuration items for each requested scope
            items = params.get("items", [])
            sections = [item.get("section", "") for item in items]
            result = [self._configuration_for_section(section) for section in sections]
            logger.info(f"Responding to workspace/configuration with {len(result)} items for sections: {sections}")
        elif method in ("client/registerCapability", "window/workDoneProgress/create"):
            # Accept capability registration / progress token creation
            result = None
        else:
            logger.debug(f"Unhandled server request: {method}")

        # Send response
        response = {
            "jsonrpc": "2.0",
            "id": request_id,
            "result": result,
        }
        try:
            state.writer.write(self._encode_message(response))
            await state.writer.drain()
            logger.debug(f"Sent response to server request {method} (id={request_id})")
        except Exception as e:
            logger.error(f"Failed to respond to server request {method}: {e}")

    async def _send_request(
        self,
        state: ServerState,
        method: str,
        params: Optional[Dict[str, Any]],
        timeout: Optional[float] = None,
    ) -> Any:
        """Send a request to the language server and wait for response.

        Args:
            state: Server state
            method: LSP method name (e.g., "textDocument/definition")
            params: Request parameters
            timeout: Request timeout in seconds; falls back to the manager
                default when None (or 0)

        Returns:
            Response result, or None if the request timed out, could not be
            sent, or the server answered with an error.
        """
        state.request_id += 1
        request_id = state.request_id
        message = {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method,
            "params": params or {},
        }
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        state.pending_requests[request_id] = future

        try:
            logger.debug(f"Sending request id={request_id}, method={method}")
            state.writer.write(self._encode_message(message))
            await state.writer.drain()
            return await asyncio.wait_for(future, timeout=timeout or self.timeout)
        except asyncio.TimeoutError:
            logger.warning(f"Request timed out: {method}")
            return None
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error(f"Request failed: {method} - {e}")
            return None
        finally:
            # Runs on success, failure and cancellation alike, so a late
            # response never finds a dead future in the table.
            state.pending_requests.pop(request_id, None)

    async def _send_notification(
        self,
        state: ServerState,
        method: str,
        params: Optional[Dict[str, Any]],
    ) -> None:
        """Send a notification to the language server (no response expected).

        Failures are logged, not raised.
        """
        message = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
        }
        try:
            encoded = self._encode_message(message)
            logger.debug(f"Sending notification: {method} ({len(encoded)} bytes)")
            state.writer.write(encoded)
            await state.writer.drain()
            logger.debug(f"Notification sent: {method}")
        except Exception as e:
            logger.error(f"Failed to send notification {method}: {e}")

    # ========== Parameter helpers ==========

    def _to_text_document_identifier(self, file_path: str) -> Dict[str, str]:
        """Create TextDocumentIdentifier from file path."""
        uri = Path(file_path).resolve().as_uri()
        return {"uri": uri}

    def _to_position(self, line: int, character: int) -> Dict[str, int]:
        """Create LSP Position (0-indexed) from 1-indexed line/character."""
        return {
            "line": max(0, line - 1),  # Convert 1-indexed to 0-indexed
            "character": max(0, character - 1),
        }

    def _position_params(self, file_path: str, line: int, character: int) -> Dict[str, Any]:
        """Build the textDocument/position pair shared by positional requests."""
        return {
            "textDocument": self._to_text_document_identifier(file_path),
            "position": self._to_position(line, character),
        }

    @staticmethod
    def _uri_to_file_path(uri: str) -> str:
        """Convert a ``file://`` URI to a filesystem path string.

        Percent-escapes are decoded, the leading slash of POSIX paths is
        kept, and the slash in front of a Windows drive letter is dropped.
        Strings that are not ``file`` URIs are returned unchanged.
        """
        from urllib.parse import unquote, urlparse

        parsed = urlparse(uri)
        if parsed.scheme != "file":
            return uri
        path = unquote(parsed.path)
        if parsed.netloc:
            # UNC-style authority: file://server/share/x -> //server/share/x
            return f"//{parsed.netloc}{path}"
        if len(path) >= 3 and path[0] == "/" and path[1].isalpha() and path[2] == ":":
            return path[1:]  # "/C:/dir/file" -> "C:/dir/file"
        return path

    async def _open_document(self, state: ServerState, file_path: str) -> None:
        """Send textDocument/didOpen notification.

        NOTE(review): this is sent on every public call, even for a document
        that was opened before; the LSP specification expects a didClose in
        between. Left as-is because re-opening is also what refreshes the
        server's copy of a file that changed on disk.
        """
        resolved_path = Path(file_path).resolve()
        try:
            content = resolved_path.read_text(encoding="utf-8")
        except Exception as e:
            logger.error(f"Failed to read file {file_path}: {e}")
            return

        # Detect language ID from extension
        language_id = self.get_language_id(file_path) or "plaintext"
        logger.debug(f"Opening document: {resolved_path.name} ({len(content)} chars)")
        await self._send_notification(state, "textDocument/didOpen", {
            "textDocument": {
                "uri": resolved_path.as_uri(),
                "languageId": language_id,
                "version": 1,
                "text": content,
            }
        })
        # Give the language server a brief moment to process the file
        # The message queue handles any server requests automatically
        await asyncio.sleep(0.5)

    # ========== Public LSP Methods ==========

    async def get_definition(
        self,
        file_path: str,
        line: int,
        character: int,
    ) -> Optional[Dict[str, Any]]:
        """Get definition location for symbol at position.

        Args:
            file_path: Path to the source file
            line: Line number (1-indexed)
            character: Character position (1-indexed)

        Returns:
            Location dict with ``uri`` and ``range`` (the first one when the
            server returns several), or None
        """
        state = await self._get_server(file_path)
        if not state:
            return None

        # Open document first
        await self._open_document(state, file_path)
        result = await self._send_request(
            state,
            "textDocument/definition",
            self._position_params(file_path, line, character),
        )
        if not result:
            return None

        # Handle single location or array
        if isinstance(result, list):
            result = result[0]
        if not isinstance(result, dict):
            return None

        # Handle LocationLink vs Location
        if "targetUri" in result:
            return {
                "uri": result["targetUri"],
                "range": result.get("targetRange", result.get("targetSelectionRange", {})),
            }
        return result

    async def get_references(
        self,
        file_path: str,
        line: int,
        character: int,
        include_declaration: bool = True,
    ) -> List[Dict[str, Any]]:
        """Get all references to symbol at position.

        Args:
            file_path: Path to the source file
            line: Line number (1-indexed)
            character: Character position (1-indexed)
            include_declaration: Whether to include the declaration

        Returns:
            List of Location dicts with uri and range
        """
        state = await self._get_server(file_path)
        if not state:
            return []

        # Open document first
        await self._open_document(state, file_path)
        params = self._position_params(file_path, line, character)
        params["context"] = {"includeDeclaration": include_declaration}
        result = await self._send_request(state, "textDocument/references", params)
        if not result or not isinstance(result, list):
            return []
        return result

    async def get_hover(
        self,
        file_path: str,
        line: int,
        character: int,
    ) -> Optional[str]:
        """Get hover documentation for symbol at position.

        Args:
            file_path: Path to the source file
            line: Line number (1-indexed)
            character: Character position (1-indexed)

        Returns:
            Hover content as string, or None
        """
        state = await self._get_server(file_path)
        if not state:
            return None

        # Open document first
        await self._open_document(state, file_path)
        result = await self._send_request(
            state,
            "textDocument/hover",
            self._position_params(file_path, line, character),
        )
        if not result or not isinstance(result, dict):
            return None

        contents = result.get("contents")
        if not contents:
            return None
        # Parse contents (can be string, MarkedString, MarkupContent, or array)
        return self._parse_hover_contents(contents)

    def _parse_hover_contents(self, contents: Any) -> Optional[str]:
        """Parse hover contents into string.

        Accepts a plain string, a dict carrying ``value`` (or ``contents``),
        or a list mixing both; list items are joined with blank lines.
        Returns None for any other type.
        """
        if isinstance(contents, str):
            return contents
        if isinstance(contents, dict):
            # MarkupContent or MarkedString
            return contents.get("value", contents.get("contents", ""))
        if isinstance(contents, list):
            parts = []
            for item in contents:
                if isinstance(item, str):
                    parts.append(item)
                elif isinstance(item, dict):
                    parts.append(item.get("value", ""))
            return "\n\n".join(p for p in parts if p)
        return None

    async def get_document_symbols(
        self,
        file_path: str,
    ) -> List[Dict[str, Any]]:
        """Get all symbols in a document.

        Args:
            file_path: Path to the source file

        Returns:
            List of DocumentSymbol or SymbolInformation dicts
        """
        state = await self._get_server(file_path)
        if not state:
            return []

        # Open document first
        await self._open_document(state, file_path)
        result = await self._send_request(state, "textDocument/documentSymbol", {
            "textDocument": self._to_text_document_identifier(file_path),
        })
        if not result or not isinstance(result, list):
            return []
        return result

    async def get_call_hierarchy_items(
        self,
        file_path: str,
        line: int,
        character: int,
        wait_for_analysis: float = 2.0,
    ) -> List[Dict[str, Any]]:
        """Prepare call hierarchy items for a position.

        Args:
            file_path: Path to the source file
            line: Line number (1-indexed)
            character: Character position (1-indexed)
            wait_for_analysis: Time to wait for server analysis (seconds)

        Returns:
            List of CallHierarchyItem dicts; empty when the server does not
            advertise ``callHierarchyProvider``
        """
        state = await self._get_server(file_path)
        if not state:
            return []

        # Check if call hierarchy is supported
        if not state.capabilities.get("callHierarchyProvider"):
            return []

        # Open document first
        await self._open_document(state, file_path)
        # Wait for language server to complete analysis
        # This is critical for Pyright to return valid call hierarchy items
        if wait_for_analysis > 0:
            await asyncio.sleep(wait_for_analysis)

        result = await self._send_request(
            state,
            "textDocument/prepareCallHierarchy",
            self._position_params(file_path, line, character),
        )
        if not result or not isinstance(result, list):
            return []
        return result

    async def _call_hierarchy_request(
        self,
        method: str,
        item: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        """Send a callHierarchy/* request to the server owning *item*'s file."""
        # Determine language from item's uri
        file_path = self._uri_to_file_path(item.get("uri", ""))
        state = await self._get_server(file_path)
        if not state:
            return []
        result = await self._send_request(state, method, {"item": item})
        if not result or not isinstance(result, list):
            return []
        return result

    async def get_incoming_calls(
        self,
        item: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        """Get incoming calls for a call hierarchy item.

        Args:
            item: CallHierarchyItem from get_call_hierarchy_items

        Returns:
            List of CallHierarchyIncomingCall dicts
        """
        return await self._call_hierarchy_request("callHierarchy/incomingCalls", item)

    async def get_outgoing_calls(
        self,
        item: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        """Get outgoing calls for a call hierarchy item.

        Args:
            item: CallHierarchyItem from get_call_hierarchy_items

        Returns:
            List of CallHierarchyOutgoingCall dicts
        """
        return await self._call_hierarchy_request("callHierarchy/outgoingCalls", item)

    async def __aenter__(self) -> "StandaloneLspManager":
        """Async context manager entry."""
        await self.start()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Async context manager exit - stop all servers."""
        await self.stop()
# Manual smoke test: run this module directly to exercise a few requests
# against whichever language server is configured for Python files.
if __name__ == "__main__":

    async def test_standalone_manager():
        """Run a handful of LSP queries against this very file and print the results."""
        print("Testing StandaloneLspManager...")
        print()

        # This module doubles as the file under test.
        target = Path(__file__).resolve()
        print(f"Test file: {target}")
        print()

        codex_lens_root = target.parent.parent.parent.parent  # codex-lens root
        async with StandaloneLspManager(
            workspace_root=str(codex_lens_root),
            timeout=30.0,
        ) as manager:
            target_path = str(target)

            print("1. Testing get_document_symbols...")
            symbols = await manager.get_document_symbols(target_path)
            print(f" Found {len(symbols)} symbols")
            for symbol in symbols[:5]:
                print(f" - {symbol.get('name', '?')} (kind={symbol.get('kind', '?')})")
            print()

            print("2. Testing get_definition...")
            # Probe position (line 11, column 8) is intended to hit the
            # 'asyncio' import.
            definition = await manager.get_definition(target_path, 11, 8)
            print(f" Definition: {definition}" if definition else " No definition found")
            print()

            print("3. Testing get_hover...")
            hover = await manager.get_hover(target_path, 11, 8)
            print(f" Hover: {hover[:200]}..." if hover else " No hover info")
            print()

            print("4. Testing get_references...")
            references = await manager.get_references(target_path, 50, 10)
            print(f" Found {len(references)} references")
            for reference in references[:3]:
                print(f" - {reference}")
            print()

        print("Test complete!")

    # Run the test
    # Note: On Windows, use default ProactorEventLoop (supports subprocess creation)
    asyncio.run(test_standalone_manager())