Add TypeScript LSP setup guide and enhance debugging tests

- Created a comprehensive guide for setting up TypeScript LSP in Claude Code, detailing installation methods, configuration, and troubleshooting.
- Added multiple debugging test scripts to validate LSP communication with pyright, including direct communication tests, configuration checks, and document symbol retrieval.
- Implemented error handling and logging for better visibility during LSP interactions.
This commit is contained in:
catlog22
2026-01-20 14:53:18 +08:00
parent 2f3a14e946
commit 9c9b1ad01c
8 changed files with 1432 additions and 115 deletions

View File

@@ -42,10 +42,10 @@ class ServerConfig:
max_restarts: int = 3
@dataclass
@dataclass
class ServerState:
"""State of a running language server."""
config: ServerConfig
process: asyncio.subprocess.Process
reader: asyncio.StreamReader
@@ -55,6 +55,8 @@ class ServerState:
capabilities: Dict[str, Any] = field(default_factory=dict)
pending_requests: Dict[int, asyncio.Future] = field(default_factory=dict)
restart_count: int = 0
# Queue for producer-consumer pattern - continuous reading puts messages here
message_queue: asyncio.Queue = field(default_factory=asyncio.Queue)
class StandaloneLspManager:
@@ -253,20 +255,24 @@ class StandaloneLspManager:
self._servers[language_id] = state
# Start reading responses in background
self._read_tasks[language_id] = asyncio.create_task(
self._read_responses(language_id)
)
# Start reading stderr in background (prevents pipe buffer from filling up)
if process.stderr:
self._stderr_tasks[language_id] = asyncio.create_task(
self._read_stderr(language_id, process.stderr)
)
# Initialize the server
# CRITICAL: Start the continuous reader task IMMEDIATELY before any communication
# This ensures no messages are lost during initialization handshake
self._read_tasks[language_id] = asyncio.create_task(
self._continuous_reader(language_id)
)
# Start the message processor task to handle queued messages
asyncio.create_task(self._process_messages(language_id))
# Initialize the server - now uses queue for reading responses
await self._initialize_server(state)
logger.info(f"{config.display_name} started and initialized")
return state
@@ -353,49 +359,25 @@ class StandaloneLspManager:
return await self._start_server(language_id)
async def _initialize_server(self, state: ServerState) -> None:
"""Send initialize request to language server."""
"""Send initialize request and wait for response via the message queue.
The continuous reader and message processor are already running, so we just
send the request and wait for the response via pending_requests.
"""
root_uri = self.workspace_root.as_uri()
# Simplified params matching direct test that works
params = {
"processId": os.getpid(),
"processId": None, # Use None like direct test
"rootUri": root_uri,
"rootPath": str(self.workspace_root),
"capabilities": {
"textDocument": {
"synchronization": {
"dynamicRegistration": False,
"willSave": False,
"willSaveWaitUntil": False,
"didSave": True,
},
"completion": {
"dynamicRegistration": False,
"completionItem": {
"snippetSupport": False,
"documentationFormat": ["plaintext", "markdown"],
},
},
"hover": {
"dynamicRegistration": False,
"contentFormat": ["plaintext", "markdown"],
},
"definition": {
"dynamicRegistration": False,
"linkSupport": False,
},
"references": {
"dynamicRegistration": False,
},
"documentSymbol": {
"dynamicRegistration": False,
"hierarchicalDocumentSymbolSupport": True,
},
"callHierarchy": {
"dynamicRegistration": False,
},
},
"workspace": {
"workspaceFolders": True,
"configuration": True,
},
},
@@ -405,17 +387,49 @@ class StandaloneLspManager:
"name": self.workspace_root.name,
}
],
"initializationOptions": state.config.initialization_options,
}
result = await self._send_request(state, "initialize", params)
if result:
state.capabilities = result.get("capabilities", {})
state.initialized = True
# Send initialize request and wait for response via queue
state.request_id += 1
init_request_id = state.request_id
# Send initialized notification
await self._send_notification(state, "initialized", {})
# Create future for the response
future: asyncio.Future = asyncio.get_event_loop().create_future()
state.pending_requests[init_request_id] = future
# Send the request
init_message = {
"jsonrpc": "2.0",
"id": init_request_id,
"method": "initialize",
"params": params,
}
encoded = self._encode_message(init_message)
logger.debug(f"Sending initialize request id={init_request_id}")
state.writer.write(encoded)
await state.writer.drain()
# Wait for response (will be routed by _process_messages)
try:
init_result = await asyncio.wait_for(future, timeout=30.0)
except asyncio.TimeoutError:
state.pending_requests.pop(init_request_id, None)
raise RuntimeError("Initialize request timed out")
if init_result is None:
init_result = {}
# Store capabilities
state.capabilities = init_result.get("capabilities", {})
state.initialized = True
logger.debug(f"Initialize response received, capabilities: {len(state.capabilities)} keys")
# Send initialized notification
await self._send_notification(state, "initialized", {})
# Give time for server to process initialized and send any requests
# The message processor will handle workspace/configuration automatically
await asyncio.sleep(0.5)
def _encode_message(self, content: Dict[str, Any]) -> bytes:
"""Encode a JSON-RPC message with LSP headers."""
@@ -465,63 +479,121 @@ class StandaloneLspManager:
except Exception as e:
logger.error(f"Error reading message: {e}")
return None, True
async def _read_responses(self, language_id: str) -> None:
"""Background task to read responses from a language server."""
async def _continuous_reader(self, language_id: str) -> None:
"""Continuously read messages from language server and put them in the queue.
This is the PRODUCER in the producer-consumer pattern. It starts IMMEDIATELY
after subprocess creation and runs continuously until shutdown. This ensures
no messages are ever lost, even during initialization handshake.
"""
state = self._servers.get(language_id)
if not state:
return
logger.debug(f"Continuous reader started for {language_id}")
try:
while True:
# Yield to allow other tasks to run
await asyncio.sleep(0)
try:
# Read headers with timeout
content_length = 0
while True:
try:
line = await asyncio.wait_for(state.reader.readline(), timeout=5.0)
except asyncio.TimeoutError:
continue # Keep waiting for data
message, stream_closed = await self._read_message(state.reader)
if not line:
logger.debug(f"Continuous reader for {language_id}: EOF")
return
if stream_closed:
logger.debug(f"Read loop for {language_id}: stream closed")
break
line_str = line.decode("ascii").strip()
if not line_str:
break # End of headers
if message is None:
# Just a timeout, continue waiting
logger.debug(f"Read loop for {language_id}: timeout, continuing...")
continue
if line_str.lower().startswith("content-length:"):
content_length = int(line_str.split(":")[1].strip())
# Log all incoming messages for debugging
msg_id = message.get("id", "none")
msg_method = message.get("method", "none")
logger.debug(f"Received message: id={msg_id}, method={msg_method}")
if content_length == 0:
continue
# Handle response (has id but no method)
if "id" in message and "method" not in message:
request_id = message["id"]
logger.debug(f"Received response id={request_id}, pending={list(state.pending_requests.keys())}")
if request_id in state.pending_requests:
future = state.pending_requests.pop(request_id)
# Read body
body = await state.reader.readexactly(content_length)
message = json.loads(body.decode("utf-8"))
# Put message in queue for processing
await state.message_queue.put(message)
msg_id = message.get("id", "none")
msg_method = message.get("method", "none")
logger.debug(f"Queued message: id={msg_id}, method={msg_method}")
except asyncio.IncompleteReadError:
logger.debug(f"Continuous reader for {language_id}: IncompleteReadError")
return
except Exception as e:
logger.error(f"Error in continuous reader for {language_id}: {e}")
await asyncio.sleep(0.1)
except asyncio.CancelledError:
logger.debug(f"Continuous reader cancelled for {language_id}")
except Exception as e:
logger.error(f"Fatal error in continuous reader for {language_id}: {e}")
async def _process_messages(self, language_id: str) -> None:
"""Process messages from the queue and route them appropriately.
This is the CONSUMER in the producer-consumer pattern. It handles:
- Server requests (workspace/configuration, etc.) - responds immediately
- Notifications (window/logMessage, etc.) - logs them
- Responses to our requests are NOT handled here - they're consumed by _wait_for_response
"""
state = self._servers.get(language_id)
if not state:
return
logger.debug(f"Message processor started for {language_id}")
try:
while True:
# Get message from queue (blocks until available)
message = await state.message_queue.get()
msg_id = message.get("id")
method = message.get("method", "")
# Response (has id but no method) - put back for _wait_for_response to consume
if msg_id is not None and not method:
# This is a response to one of our requests
if msg_id in state.pending_requests:
future = state.pending_requests.pop(msg_id)
if "error" in message:
future.set_exception(
Exception(message["error"].get("message", "Unknown error"))
)
else:
future.set_result(message.get("result"))
logger.debug(f"Response routed to pending request id={msg_id}")
else:
logger.debug(f"No pending request for id={request_id}")
logger.debug(f"No pending request for response id={msg_id}")
# Handle server request (has both id and method) - needs response
elif "id" in message and "method" in message:
logger.info(f"Server request received: {message.get('method')} with id={message.get('id')}")
# Server request (has both id and method) - needs response
elif msg_id is not None and method:
logger.info(f"Server request: {method} (id={msg_id})")
await self._handle_server_request(state, message)
# Handle notification from server (has method but no id)
elif "method" in message:
# Notification (has method but no id)
elif method:
self._handle_server_message(language_id, message)
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"Error in read loop for {language_id}: {e}")
state.message_queue.task_done()
except asyncio.CancelledError:
logger.debug(f"Message processor cancelled for {language_id}")
except Exception as e:
logger.error(f"Error in message processor for {language_id}: {e}")
async def _read_stderr(self, language_id: str, stderr: asyncio.StreamReader) -> None:
"""Background task to read stderr from a language server.
@@ -732,9 +804,9 @@ class StandaloneLspManager:
}
})
# Give the language server time to process the file and send any requests
# The read loop running in background will handle workspace/configuration requests
await asyncio.sleep(2.0)
# Give the language server a brief moment to process the file
# The message queue handles any server requests automatically
await asyncio.sleep(0.5)
# ========== Public LSP Methods ==========

View File

@@ -0,0 +1,149 @@
#!/usr/bin/env python
"""Compare manager read behavior vs direct read."""
import asyncio
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
from codexlens.lsp.standalone_manager import StandaloneLspManager
async def direct_test():
"""Direct communication - this works."""
workspace = Path(__file__).parent.parent.parent
print("\n=== DIRECT TEST ===")
process = await asyncio.create_subprocess_exec(
"pyright-langserver", "--stdio",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(workspace),
)
def encode_message(content):
body = json.dumps(content).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n"
return header.encode("ascii") + body
async def send(message):
encoded = encode_message(message)
process.stdin.write(encoded)
await process.stdin.drain()
msg_desc = message.get('method') or f"response id={message.get('id')}"
print(f" SENT: {msg_desc}")
async def read_one():
content_length = 0
while True:
line = await asyncio.wait_for(process.stdout.readline(), timeout=3.0)
if not line:
return None
line_str = line.decode("ascii").strip()
if not line_str:
break
if line_str.lower().startswith("content-length:"):
content_length = int(line_str.split(":")[1].strip())
if content_length == 0:
return None
body = await process.stdout.readexactly(content_length)
return json.loads(body.decode("utf-8"))
# Initialize
print(" Sending initialize...")
await send({
"jsonrpc": "2.0", "id": 1, "method": "initialize",
"params": {
"processId": None,
"rootUri": workspace.as_uri(),
"capabilities": {"workspace": {"configuration": True}},
"workspaceFolders": [{"uri": workspace.as_uri(), "name": workspace.name}],
},
})
# Read until response
while True:
msg = await read_one()
if msg and msg.get("id") == 1:
print(f" Initialize response OK")
break
elif msg:
print(f" Notification: {msg.get('method')}")
# Send initialized
print(" Sending initialized...")
await send({"jsonrpc": "2.0", "method": "initialized", "params": {}})
# Check for workspace/configuration
print(" Checking for workspace/configuration (3s timeout)...")
try:
for i in range(10):
msg = await read_one()
if msg:
method = msg.get("method")
msg_id = msg.get("id")
print(f" RECV: {method or 'response'} (id={msg_id})")
if method == "workspace/configuration":
print(" SUCCESS: workspace/configuration received!")
break
except asyncio.TimeoutError:
print(" TIMEOUT: No more messages")
process.terminate()
await process.wait()
async def manager_test():
"""Manager communication - investigating why this doesn't work."""
workspace = Path(__file__).parent.parent.parent
print("\n=== MANAGER TEST ===")
manager = StandaloneLspManager(
workspace_root=str(workspace),
timeout=60.0
)
await manager.start()
# Just check if server initialized
state = manager._servers.get("python")
if state:
print(f" Server initialized: {state.initialized}")
print(f" Capabilities: {len(state.capabilities)} keys")
else:
# Force initialization by getting server for a Python file
print(" Getting server for Python file...")
test_file = workspace / "tests" / "real" / "debug_compare.py"
state = await manager._get_server(str(test_file))
if state:
print(f" Server initialized: {state.initialized}")
# Try to read directly from state.reader
if state:
print("\n Direct read test from state.reader:")
print(f" state.reader is: {type(state.reader)}")
print(f" state.reader at_eof: {state.reader.at_eof()}")
# Check if there's data available
try:
line = await asyncio.wait_for(state.reader.readline(), timeout=1.0)
if line:
print(f" Got line: {line[:50]}...")
else:
print(f" readline returned empty (EOF)")
except asyncio.TimeoutError:
print(f" readline timed out (no data)")
await manager.stop()
async def main():
await direct_test()
await manager_test()
print("\n=== DONE ===")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,216 @@
#!/usr/bin/env python
"""Test if pyright sends workspace/configuration after initialized."""
import asyncio
import json
import sys
from pathlib import Path
# Add source to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
async def read_message_direct(reader):
"""Read a JSON-RPC message - direct blocking read, no timeout."""
content_length = 0
while True:
line = await reader.readline()
if not line:
return None
line_str = line.decode("ascii").strip()
if not line_str:
break
if line_str.lower().startswith("content-length:"):
content_length = int(line_str.split(":")[1].strip())
if content_length == 0:
return None
body = await reader.readexactly(content_length)
return json.loads(body.decode("utf-8"))
async def main():
workspace = Path(__file__).parent.parent.parent
print(f"Workspace: {workspace}")
# Start pyright - exactly like in direct test
process = await asyncio.create_subprocess_exec(
"pyright-langserver", "--stdio",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(workspace),
)
def encode_message(content):
body = json.dumps(content).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n"
return header.encode("ascii") + body
async def send(message):
encoded = encode_message(message)
process.stdin.write(encoded)
await process.stdin.drain()
method_or_resp = message.get('method') or f"response id={message.get('id')}"
print(f"SENT: {method_or_resp} ({len(encoded)} bytes)")
# Start stderr reader
async def read_stderr():
while True:
line = await process.stderr.readline()
if not line:
break
print(f"[stderr] {line.decode('utf-8', errors='replace').rstrip()}")
asyncio.create_task(read_stderr())
print("\n=== INITIALIZE ===")
await send({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"processId": None,
"rootUri": workspace.as_uri(),
"rootPath": str(workspace),
"capabilities": {
"workspace": {"configuration": True},
},
"workspaceFolders": [{"uri": workspace.as_uri(), "name": workspace.name}],
},
})
# Read until we get initialize response
print("Reading initialize response...")
while True:
msg = await asyncio.wait_for(read_message_direct(process.stdout), timeout=10)
if msg is None:
break
method = msg.get("method")
msg_id = msg.get("id")
if method:
print(f"RECV: {method} (notification)")
else:
print(f"RECV: response id={msg_id}")
if msg_id == 1:
print("Initialize OK!")
break
print("\n=== SEND INITIALIZED ===")
await send({
"jsonrpc": "2.0",
"method": "initialized",
"params": {},
})
# Now, here's the key test - will we receive workspace/configuration?
print("\n=== WAIT FOR workspace/configuration ===")
print("Reading with 5 second timeout...")
try:
for i in range(10):
msg = await asyncio.wait_for(read_message_direct(process.stdout), timeout=2)
if msg is None:
print("EOF")
break
method = msg.get("method")
msg_id = msg.get("id")
print(f"RECV: method={method}, id={msg_id}")
# Respond to server requests
if msg_id is not None and method:
if method == "workspace/configuration":
print(" -> Got workspace/configuration! Responding...")
await send({
"jsonrpc": "2.0",
"id": msg_id,
"result": [{} for _ in msg.get("params", {}).get("items", [])],
})
else:
print(f" -> Responding to {method}")
await send({"jsonrpc": "2.0", "id": msg_id, "result": None})
except asyncio.TimeoutError:
print("No more messages (timeout)")
print("\n=== Now start background read task like manager does ===")
# Store references like manager does
reader = process.stdout # This is how manager does it
writer = process.stdin
# Start background read task
async def bg_read_loop():
print("[BG] Read loop started")
try:
while True:
await asyncio.sleep(0)
try:
msg = await asyncio.wait_for(read_message_direct(reader), timeout=1.0)
if msg is None:
print("[BG] Stream closed")
break
bg_method = msg.get('method') or f"response id={msg.get('id')}"
print(f"[BG] RECV: {bg_method}")
# Handle server requests
method = msg.get("method")
msg_id = msg.get("id")
if msg_id is not None and method:
print(f"[BG] Responding to {method}")
await send({"jsonrpc": "2.0", "id": msg_id, "result": None})
except asyncio.TimeoutError:
print("[BG] timeout")
except asyncio.CancelledError:
print("[BG] Cancelled")
read_task = asyncio.create_task(bg_read_loop())
# Wait a moment
await asyncio.sleep(0.5)
# Now send didOpen and documentSymbol like manager does
print("\n=== SEND didOpen ===")
test_file = workspace / "tests" / "real" / "debug_config.py"
await send({
"jsonrpc": "2.0",
"method": "textDocument/didOpen",
"params": {
"textDocument": {
"uri": test_file.as_uri(),
"languageId": "python",
"version": 1,
"text": test_file.read_text(),
},
},
})
# Wait for processing
await asyncio.sleep(2)
print("\n=== SEND documentSymbol ===")
await send({
"jsonrpc": "2.0",
"id": 2,
"method": "textDocument/documentSymbol",
"params": {"textDocument": {"uri": test_file.as_uri()}},
})
# Wait for response
print("Waiting for documentSymbol response (max 30s)...")
deadline = asyncio.get_event_loop().time() + 30
while asyncio.get_event_loop().time() < deadline:
await asyncio.sleep(0.5)
# The background task will print when it receives the response
read_task.cancel()
try:
await read_task
except asyncio.CancelledError:
pass
process.terminate()
print("\nDone!")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,320 @@
#!/usr/bin/env python
"""Minimal direct test of pyright LSP communication."""
import asyncio
import json
import sys
from pathlib import Path
async def send_message(writer, message):
"""Send a JSON-RPC message."""
body = json.dumps(message).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
writer.write(header + body)
await writer.drain()
print(f"SENT: {message.get('method', 'response')} (id={message.get('id', 'N/A')})")
async def read_message(reader):
"""Read a JSON-RPC message."""
# Read headers
content_length = 0
while True:
line = await reader.readline()
if not line:
return None
line_str = line.decode("ascii").strip()
if not line_str:
break
if line_str.lower().startswith("content-length:"):
content_length = int(line_str.split(":")[1].strip())
if content_length == 0:
return None
# Read body
body = await reader.readexactly(content_length)
return json.loads(body.decode("utf-8"))
async def main():
workspace = Path(__file__).parent.parent.parent
test_file = workspace / "tests" / "real" / "debug_direct.py"
print(f"Workspace: {workspace}")
print(f"Test file: {test_file}")
print()
# Start pyright
print("Starting pyright-langserver...")
process = await asyncio.create_subprocess_exec(
"pyright-langserver", "--stdio",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(workspace),
)
# Start stderr reader
async def read_stderr():
while True:
line = await process.stderr.readline()
if not line:
break
print(f"[stderr] {line.decode('utf-8', errors='replace').rstrip()}")
stderr_task = asyncio.create_task(read_stderr())
try:
# 1. Send initialize
print("\n=== INITIALIZE ===")
await send_message(process.stdin, {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"processId": None,
"rootUri": workspace.as_uri(),
"rootPath": str(workspace),
"capabilities": {
"textDocument": {
"documentSymbol": {
"hierarchicalDocumentSymbolSupport": True,
},
},
"workspace": {
"configuration": True,
},
},
"workspaceFolders": [{"uri": workspace.as_uri(), "name": workspace.name}],
},
})
# Read all messages until we get initialize response
print("\n=== READING RESPONSES ===")
init_done = False
for i in range(20):
try:
msg = await asyncio.wait_for(read_message(process.stdout), timeout=5.0)
if msg is None:
print("EOF")
break
method = msg.get("method", "")
msg_id = msg.get("id", "N/A")
if method:
print(f"RECV: {method} (id={msg_id})")
# Handle server requests
if msg_id != "N/A":
if method == "workspace/configuration":
print(" -> Responding to workspace/configuration")
items = msg.get("params", {}).get("items", [])
await send_message(process.stdin, {
"jsonrpc": "2.0",
"id": msg_id,
"result": [{"pythonPath": "python"} for _ in items],
})
elif method == "client/registerCapability":
print(" -> Responding to client/registerCapability")
await send_message(process.stdin, {
"jsonrpc": "2.0",
"id": msg_id,
"result": None,
})
elif method == "window/workDoneProgress/create":
print(" -> Responding to window/workDoneProgress/create")
await send_message(process.stdin, {
"jsonrpc": "2.0",
"id": msg_id,
"result": None,
})
else:
print(f"RECV: response (id={msg_id})")
if msg_id == 1:
print(" -> Initialize response received!")
caps = list(msg.get("result", {}).get("capabilities", {}).keys())
print(f" -> Capabilities: {caps[:5]}...")
init_done = True
break
except asyncio.TimeoutError:
print(f" Timeout waiting for message {i+1}")
break
if not init_done:
print("ERROR: Initialize failed")
return
# 2. Send initialized notification
print("\n=== INITIALIZED ===")
await send_message(process.stdin, {
"jsonrpc": "2.0",
"method": "initialized",
"params": {},
})
# Read any messages pyright sends after initialized
print("\n=== READING POST-INITIALIZED MESSAGES ===")
for i in range(10):
try:
msg = await asyncio.wait_for(read_message(process.stdout), timeout=2.0)
if msg is None:
break
method = msg.get("method", "")
msg_id = msg.get("id", "N/A")
print(f"RECV: {method or 'response'} (id={msg_id})")
# Handle server requests
if msg_id != "N/A" and method:
if method == "workspace/configuration":
print(" -> Responding to workspace/configuration")
items = msg.get("params", {}).get("items", [])
await send_message(process.stdin, {
"jsonrpc": "2.0",
"id": msg_id,
"result": [{"pythonPath": "python"} for _ in items],
})
elif method == "client/registerCapability":
print(" -> Responding to client/registerCapability")
await send_message(process.stdin, {
"jsonrpc": "2.0",
"id": msg_id,
"result": None,
})
elif method == "window/workDoneProgress/create":
print(" -> Responding to window/workDoneProgress/create")
await send_message(process.stdin, {
"jsonrpc": "2.0",
"id": msg_id,
"result": None,
})
except asyncio.TimeoutError:
print(f" No more messages (timeout)")
break
# 3. Send didOpen
print("\n=== DIDOPEN ===")
content = test_file.read_text(encoding="utf-8")
await send_message(process.stdin, {
"jsonrpc": "2.0",
"method": "textDocument/didOpen",
"params": {
"textDocument": {
"uri": test_file.as_uri(),
"languageId": "python",
"version": 1,
"text": content,
},
},
})
# Read any messages
print("\n=== READING POST-DIDOPEN MESSAGES ===")
for i in range(10):
try:
msg = await asyncio.wait_for(read_message(process.stdout), timeout=2.0)
if msg is None:
break
method = msg.get("method", "")
msg_id = msg.get("id", "N/A")
print(f"RECV: {method or 'response'} (id={msg_id})")
# Handle server requests
if msg_id != "N/A" and method:
if method == "workspace/configuration":
print(" -> Responding to workspace/configuration")
items = msg.get("params", {}).get("items", [])
await send_message(process.stdin, {
"jsonrpc": "2.0",
"id": msg_id,
"result": [{"pythonPath": "python"} for _ in items],
})
else:
print(f" -> Responding with null to {method}")
await send_message(process.stdin, {
"jsonrpc": "2.0",
"id": msg_id,
"result": None,
})
except asyncio.TimeoutError:
print(f" No more messages (timeout)")
break
# 4. Send documentSymbol request
print("\n=== DOCUMENTSYMBOL ===")
await send_message(process.stdin, {
"jsonrpc": "2.0",
"id": 2,
"method": "textDocument/documentSymbol",
"params": {
"textDocument": {"uri": test_file.as_uri()},
},
})
# Wait for response
print("\n=== READING DOCUMENTSYMBOL RESPONSE ===")
for i in range(20):
try:
msg = await asyncio.wait_for(read_message(process.stdout), timeout=5.0)
if msg is None:
break
method = msg.get("method", "")
msg_id = msg.get("id", "N/A")
if method:
print(f"RECV: {method} (id={msg_id})")
# Handle server requests
if msg_id != "N/A":
if method == "workspace/configuration":
print(" -> Responding to workspace/configuration")
items = msg.get("params", {}).get("items", [])
await send_message(process.stdin, {
"jsonrpc": "2.0",
"id": msg_id,
"result": [{"pythonPath": "python"} for _ in items],
})
else:
print(f" -> Responding with null to {method}")
await send_message(process.stdin, {
"jsonrpc": "2.0",
"id": msg_id,
"result": None,
})
else:
print(f"RECV: response (id={msg_id})")
if msg_id == 2:
result = msg.get("result", [])
print(f" -> DocumentSymbol response: {len(result)} symbols")
for sym in result[:5]:
print(f" - {sym.get('name')} ({sym.get('kind')})")
break
except asyncio.TimeoutError:
print(f" Timeout {i+1}")
if i >= 5:
break
print("\n=== DONE ===")
finally:
stderr_task.cancel()
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=5.0)
except asyncio.TimeoutError:
process.kill()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,123 @@
#!/usr/bin/env python
"""Debug exactly what's happening with reads after initialized."""
import asyncio
import json
from pathlib import Path
async def main():
workspace = Path(__file__).parent.parent.parent
print(f"Workspace: {workspace}")
# Start pyright
process = await asyncio.create_subprocess_exec(
"pyright-langserver", "--stdio",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(workspace),
)
# Helper to encode message
def encode(content):
body = json.dumps(content).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n"
return header.encode("ascii") + body
# Helper to send
async def send(msg):
encoded = encode(msg)
process.stdin.write(encoded)
await process.stdin.drain()
method = msg.get("method") or f"response-{msg.get('id')}"
print(f"SENT: {method}")
# Helper to read one message
async def read_one(timeout=3.0):
content_length = 0
while True:
try:
print(f" readline(timeout={timeout})...")
line = await asyncio.wait_for(process.stdout.readline(), timeout=timeout)
print(f" got line: {repr(line[:50] if len(line) > 50 else line)}")
except asyncio.TimeoutError:
print(f" TIMEOUT on readline")
return None
if not line:
print(f" EOF")
return None
line_str = line.decode("ascii").strip()
if not line_str:
break # End of headers
if line_str.lower().startswith("content-length:"):
content_length = int(line_str.split(":")[1].strip())
if content_length == 0:
return None
body = await process.stdout.readexactly(content_length)
return json.loads(body.decode("utf-8"))
# Start stderr reader
async def read_stderr():
while True:
line = await process.stderr.readline()
if not line:
break
print(f"[stderr] {line.decode('utf-8', errors='replace').rstrip()}")
asyncio.create_task(read_stderr())
print("\n=== INITIALIZE ===")
await send({
"jsonrpc": "2.0", "id": 1, "method": "initialize",
"params": {
"processId": None,
"rootUri": workspace.as_uri(),
"capabilities": {"workspace": {"configuration": True}},
"workspaceFolders": [{"uri": workspace.as_uri(), "name": workspace.name}],
},
})
# Read until initialize response
print("\n=== READING UNTIL INITIALIZE RESPONSE ===")
while True:
msg = await read_one()
if msg and msg.get("id") == 1 and "method" not in msg:
print(f"Got initialize response")
break
elif msg:
print(f"Got notification: {msg.get('method')}")
print("\n=== SEND INITIALIZED ===")
await send({"jsonrpc": "2.0", "method": "initialized", "params": {}})
print("\n=== NOW TRY TO READ WORKSPACE/CONFIGURATION ===")
print("Attempting reads with 2s timeout each...")
for i in range(3):
print(f"\n--- Read attempt {i+1} ---")
msg = await read_one(timeout=2.0)
if msg:
method = msg.get("method", "")
msg_id = msg.get("id")
print(f"SUCCESS: method={method}, id={msg_id}")
if method and msg_id is not None:
# Respond to server request
print(f"Responding to {method}")
await send({"jsonrpc": "2.0", "id": msg_id, "result": [{}]})
else:
print(f"No message (timeout or EOF)")
break
print("\n=== CLEANUP ===")
process.terminate()
await process.wait()
print("Done")
if __name__ == "__main__":
asyncio.run(main())