Add unit tests for LspGraphBuilder class

- Implement comprehensive unit tests for the LspGraphBuilder class to validate its functionality in building code association graphs.
- Tests cover various scenarios including single level graph expansion, max nodes and depth boundaries, concurrent expansion limits, document symbol caching, error handling during node expansion, and edge cases such as empty seed lists and self-referencing nodes.
- Utilize pytest and asyncio for asynchronous testing and mocking of LspBridge methods.
This commit is contained in:
catlog22
2026-01-20 12:49:31 +08:00
parent 1376dc71d9
commit 2f3a14e946
33 changed files with 8303 additions and 716 deletions

View File

@@ -0,0 +1,5 @@
"""Real interface tests for LSP integration.
These tests require VSCode Bridge to be running.
See test_lsp_real_interface.py for details.
"""

View File

@@ -0,0 +1,162 @@
#!/usr/bin/env python
"""Direct comparison: standalone manager vs direct subprocess."""
import asyncio
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
async def test_direct():
"""Direct subprocess test that WORKS."""
print("\n=== DIRECT SUBPROCESS TEST ===")
process = await asyncio.create_subprocess_exec(
'pyright-langserver', '--stdio',
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(Path(__file__).parent.parent.parent),
)
def encode(msg):
body = json.dumps(msg).encode('utf-8')
header = f'Content-Length: {len(body)}\r\n\r\n'.encode('ascii')
return header + body
async def read_message(timeout=5.0):
content_length = 0
while True:
try:
line = await asyncio.wait_for(process.stdout.readline(), timeout=timeout)
except asyncio.TimeoutError:
return None
if not line:
return None
line_str = line.decode('ascii').strip()
if not line_str:
break
if line_str.lower().startswith('content-length:'):
content_length = int(line_str.split(':')[1].strip())
if content_length == 0:
return None
body = await process.stdout.readexactly(content_length)
return json.loads(body.decode('utf-8'))
# Initialize
init = {
'jsonrpc': '2.0', 'id': 1, 'method': 'initialize',
'params': {
'processId': 12345,
'rootUri': 'file:///D:/Claude_dms3/codex-lens',
'rootPath': 'D:/Claude_dms3/codex-lens',
'capabilities': {
'textDocument': {
'synchronization': {'dynamicRegistration': False},
'documentSymbol': {'hierarchicalDocumentSymbolSupport': True},
},
'workspace': {'configuration': True, 'workspaceFolders': True},
},
'workspaceFolders': [{'uri': 'file:///D:/Claude_dms3/codex-lens', 'name': 'codex-lens'}],
'initializationOptions': {},
}
}
process.stdin.write(encode(init))
await process.stdin.drain()
while True:
msg = await read_message(5.0)
if msg is None or msg.get('id') == 1:
print(f" Got initialize response")
break
# Initialized
process.stdin.write(encode({'jsonrpc': '2.0', 'method': 'initialized', 'params': {}}))
await process.stdin.drain()
print(" Sent initialized")
# didOpen with simple content
did_open = {
'jsonrpc': '2.0', 'method': 'textDocument/didOpen',
'params': {
'textDocument': {
'uri': 'file:///D:/Claude_dms3/codex-lens/simple.py',
'languageId': 'python',
'version': 1,
'text': 'def hello():\n pass\n'
}
}
}
process.stdin.write(encode(did_open))
await process.stdin.drain()
print(" Sent didOpen")
# Read and respond to configuration requests
print(" Waiting for messages...")
for i in range(15):
msg = await read_message(2.0)
if msg is None:
continue
method = msg.get('method')
print(f" RECV: id={msg.get('id')}, method={method}")
if method == 'workspace/configuration':
process.stdin.write(encode({'jsonrpc': '2.0', 'id': msg['id'], 'result': [{}]}))
await process.stdin.drain()
if method == 'textDocument/publishDiagnostics':
break
# documentSymbol
doc_sym = {
'jsonrpc': '2.0', 'id': 2, 'method': 'textDocument/documentSymbol',
'params': {'textDocument': {'uri': 'file:///D:/Claude_dms3/codex-lens/simple.py'}}
}
process.stdin.write(encode(doc_sym))
await process.stdin.drain()
print(" Sent documentSymbol")
for i in range(5):
msg = await read_message(3.0)
if msg is None:
continue
if msg.get('id') == 2:
result = msg.get('result', [])
print(f" GOT {len(result)} SYMBOLS!")
break
process.terminate()
await process.wait()
async def test_manager():
"""Standalone manager test that FAILS."""
print("\n=== STANDALONE MANAGER TEST ===")
from codexlens.lsp.standalone_manager import StandaloneLspManager
workspace = Path(__file__).parent.parent.parent
manager = StandaloneLspManager(
workspace_root=str(workspace),
timeout=30.0
)
await manager.start()
simple_file = workspace / "simple.py"
simple_file.write_text('def hello():\n pass\n')
try:
symbols = await manager.get_document_symbols(str(simple_file))
print(f" GOT {len(symbols)} SYMBOLS!")
finally:
simple_file.unlink(missing_ok=True)
await manager.stop()
async def main():
await test_direct()
await test_manager()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,63 @@
#!/usr/bin/env python
"""Test concurrent read loop behavior."""
import asyncio
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
import logging
logging.basicConfig(level=logging.DEBUG, format='%(name)s - %(levelname)s - %(message)s')
from codexlens.lsp.standalone_manager import StandaloneLspManager
async def test():
workspace = Path(__file__).parent.parent.parent
manager = StandaloneLspManager(
workspace_root=str(workspace),
timeout=30.0
)
await manager.start()
# Get server for a simple file
simple_content = "def hello():\n pass\n"
simple_file = workspace / "test_simple.py"
simple_file.write_text(simple_content)
try:
print("\n=== Getting server ===")
state = await manager._get_server(str(simple_file))
print(f"Server state: initialized={state.initialized if state else 'None'}")
print("\n=== Sending didOpen ===")
await manager._send_notification(state, "textDocument/didOpen", {
"textDocument": {
"uri": simple_file.as_uri(),
"languageId": "python",
"version": 1,
"text": simple_content,
}
})
print("\n=== Waiting 5 seconds - watch for server requests ===")
for i in range(5):
print(f" Tick {i+1}...")
await asyncio.sleep(1.0)
print("\n=== Sending documentSymbol ===")
result = await manager._send_request(
state,
"textDocument/documentSymbol",
{"textDocument": {"uri": simple_file.as_uri()}},
timeout=10.0
)
print(f"Result: {result}")
finally:
simple_file.unlink(missing_ok=True)
await manager.stop()
if __name__ == "__main__":
asyncio.run(test())

View File

@@ -0,0 +1,58 @@
#!/usr/bin/env python
"""Debug script to check pyright LSP configuration requests."""
import asyncio
import logging
import sys
from pathlib import Path
# Enable DEBUG logging
logging.basicConfig(
level=logging.DEBUG,
format='%(name)s - %(levelname)s - %(message)s',
stream=sys.stdout
)
# Add source to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
from codexlens.lsp.standalone_manager import StandaloneLspManager
async def test():
workspace = Path(__file__).parent.parent.parent
manager = StandaloneLspManager(
workspace_root=str(workspace),
timeout=60.0
)
await manager.start()
# Wait a bit after start to see if any requests come in
print("Waiting 3 seconds after start to see server requests...")
await asyncio.sleep(3)
# Try to get symbols for a simpler file
test_file = str(workspace / "tests" / "real" / "debug_lsp.py")
print(f"Testing with: {test_file}")
# Let's see if we can check what pyright sees
print("Checking server state...")
state = manager._servers.get("python")
if state:
print(f" - Process running: {state.process.returncode is None}")
print(f" - Initialized: {state.initialized}")
print(f" - Pending requests: {list(state.pending_requests.keys())}")
try:
symbols = await manager.get_document_symbols(test_file)
print(f"Got {len(symbols)} symbols")
for s in symbols[:5]:
print(f" - {s}")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
await manager.stop()
if __name__ == "__main__":
asyncio.run(test())

View File

@@ -0,0 +1,63 @@
#!/usr/bin/env python
"""Debug script to test StandaloneLspManager directly."""
import asyncio
import logging
import sys
from pathlib import Path
# Add source to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
# Enable debug logging
logging.basicConfig(level=logging.DEBUG, format="%(levelname)s: %(name)s: %(message)s")
from codexlens.lsp.standalone_manager import StandaloneLspManager
async def test_standalone_manager():
"""Test StandaloneLspManager directly."""
workspace = Path(__file__).parent.parent.parent
test_file = workspace / "src" / "codexlens" / "lsp" / "lsp_bridge.py"
print(f"Workspace: {workspace}")
print(f"Test file: {test_file}")
print()
manager = StandaloneLspManager(workspace_root=str(workspace), timeout=30.0)
print("Starting manager...")
await manager.start()
print(f"Configs loaded: {list(manager._configs.keys())}")
print(f"Servers running: {list(manager._servers.keys())}")
# Try to get the server for the test file
print(f"\nGetting server for {test_file.name}...")
server = await manager._get_server(str(test_file))
if server:
print(f"Server: {server.config.display_name}")
print(f"Initialized: {server.initialized}")
print(f"Capabilities: {list(server.capabilities.keys())}")
else:
print("Failed to get server!")
# Try to get document symbols
print(f"\nGetting document symbols for {test_file.name}...")
try:
symbols = await manager.get_document_symbols(str(test_file))
print(f"Found {len(symbols)} symbols")
for sym in symbols[:5]:
print(f" - {sym.get('name', '?')} ({sym.get('kind', '?')})")
except Exception as e:
print(f"Error getting symbols: {e}")
print("\nStopping manager...")
await manager.stop()
print("Done!")
if __name__ == "__main__":
asyncio.run(test_standalone_manager())

View File

@@ -0,0 +1,93 @@
#!/usr/bin/env python
"""Direct test of pyright-langserver communication."""
import asyncio
import json
import sys
async def test_pyright():
print("Starting pyright-langserver...")
process = await asyncio.create_subprocess_exec(
"pyright-langserver", "--stdio",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Build initialize request
init_msg = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"processId": 1234,
"rootUri": "file:///D:/Claude_dms3/codex-lens",
"rootPath": "D:/Claude_dms3/codex-lens",
"capabilities": {
"textDocument": {
"documentSymbol": {"hierarchicalDocumentSymbolSupport": True}
},
"workspace": {"configuration": True}
},
"workspaceFolders": [
{"uri": "file:///D:/Claude_dms3/codex-lens", "name": "codex-lens"}
]
}
}
body = json.dumps(init_msg).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
print(f"Sending initialize request ({len(body)} bytes)...")
process.stdin.write(header + body)
await process.stdin.drain()
# Read responses
print("Reading responses...")
for i in range(20):
try:
line = await asyncio.wait_for(process.stdout.readline(), timeout=2.0)
if not line:
print(" (empty line - stream closed)")
break
line_str = line.decode("ascii").strip()
print(f" Header: {line_str}")
if line_str.lower().startswith("content-length:"):
content_length = int(line_str.split(":")[1].strip())
# Read empty line
await process.stdout.readline()
# Read body
body_data = await process.stdout.readexactly(content_length)
msg = json.loads(body_data.decode("utf-8"))
print(f" Message: id={msg.get('id', 'none')}, method={msg.get('method', 'none')}")
if msg.get("id") == 1:
print(f" >>> GOT INITIALIZE RESPONSE!")
print(f" >>> Capabilities: {list(msg.get('result', {}).get('capabilities', {}).keys())[:10]}...")
# Send initialized notification
print("\nSending 'initialized' notification...")
init_notif = {"jsonrpc": "2.0", "method": "initialized", "params": {}}
body2 = json.dumps(init_notif).encode("utf-8")
header2 = f"Content-Length: {len(body2)}\r\n\r\n".encode("ascii")
process.stdin.write(header2 + body2)
await process.stdin.drain()
# Wait a moment for any server requests
print("Waiting for server requests...")
await asyncio.sleep(1.0)
continue # Keep reading to see if workspace/configuration comes
if msg.get("method") == "workspace/configuration":
print(f" >>> GOT workspace/configuration REQUEST!")
print(f" >>> Params: {msg.get('params')}")
except asyncio.TimeoutError:
print(" (timeout waiting for more data)")
break
process.terminate()
await process.wait()
print("Done.")
if __name__ == "__main__":
asyncio.run(test_pyright())

View File

@@ -0,0 +1,58 @@
#!/usr/bin/env python
"""Minimal test that mimics the working direct test."""
import asyncio
import json
import sys
from pathlib import Path
# Add source to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
async def test_minimal():
"""Minimal test using the standalone manager."""
from codexlens.lsp.standalone_manager import StandaloneLspManager
workspace = Path(__file__).parent.parent.parent
manager = StandaloneLspManager(
workspace_root=str(workspace),
timeout=60.0
)
await manager.start()
# Get server state
server_state = await manager._get_server(str(workspace / "tests" / "real" / "minimal_test.py"))
if not server_state:
print("Failed to get server state")
await manager.stop()
return
print(f"Server initialized: {server_state.initialized}")
print(f"Server capabilities: {list(server_state.capabilities.keys())[:5]}...")
# Wait for any background messages
print("Waiting 5 seconds for background messages...")
await asyncio.sleep(5)
# Now send a documentSymbol request manually
print("Sending documentSymbol request...")
result = await manager._send_request(
server_state,
"textDocument/documentSymbol",
{"textDocument": {"uri": (workspace / "tests" / "real" / "minimal_test.py").resolve().as_uri()}},
timeout=30.0
)
print(f"Result: {result}")
await manager.stop()
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s')
asyncio.run(test_minimal())

View File

@@ -0,0 +1,313 @@
#!/usr/bin/env python
"""Quick real interface test script for LSP Bridge (Standalone Mode).
Usage:
python tests/real/quick_test.py
Requires: pyright-langserver installed (npm install -g pyright)
"""
import asyncio
import shutil
import sys
from pathlib import Path
# Add source to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
from codexlens.lsp.lsp_bridge import LspBridge
from codexlens.lsp.lsp_graph_builder import LspGraphBuilder
from codexlens.hybrid_search.data_structures import CodeSymbolNode, Range
# Test file - the LSP bridge source itself
TEST_FILE = Path(__file__).parent.parent.parent / "src" / "codexlens" / "lsp" / "lsp_bridge.py"
WORKSPACE_ROOT = Path(__file__).parent.parent.parent # codex-lens root
def check_pyright():
"""Check if pyright-langserver is available."""
return shutil.which("pyright-langserver") is not None
async def test_get_definition():
"""Test get_definition."""
print("\n" + "=" * 60)
print("TEST: get_definition")
print("=" * 60)
symbol = CodeSymbolNode(
id=f"{TEST_FILE}:LspBridge:96",
name="LspBridge",
kind="class",
file_path=str(TEST_FILE),
range=Range(start_line=96, start_character=6, end_line=96, end_character=15),
)
print(f"Symbol: {symbol.name}")
print(f"File: {symbol.file_path}")
print(f"Position: line {symbol.range.start_line}, char {symbol.range.start_character}")
async with LspBridge(workspace_root=str(WORKSPACE_ROOT), timeout=30.0) as bridge:
result = await bridge.get_definition(symbol)
if result:
print(f"\n[OK] SUCCESS: Definition found at {result.file_path}:{result.line}")
else:
print(f"\n[WARN] No definition found (may be expected for class declaration)")
return result is not None
async def test_get_references():
"""Test get_references."""
print("\n" + "=" * 60)
print("TEST: get_references")
print("=" * 60)
symbol = CodeSymbolNode(
id=f"{TEST_FILE}:get_references:200",
name="get_references",
kind="method",
file_path=str(TEST_FILE),
range=Range(start_line=200, start_character=10, end_line=200, end_character=24),
)
print(f"Symbol: {symbol.name}")
print(f"File: {Path(symbol.file_path).name}")
print(f"Position: line {symbol.range.start_line}")
async with LspBridge(workspace_root=str(WORKSPACE_ROOT), timeout=30.0) as bridge:
refs = await bridge.get_references(symbol)
print(f"\n[OK] Found {len(refs)} references:")
for i, ref in enumerate(refs[:10]):
print(f" [{i+1}] {Path(ref.file_path).name}:{ref.line}")
if len(refs) > 10:
print(f" ... and {len(refs) - 10} more")
return len(refs) >= 0
async def test_get_hover():
"""Test get_hover."""
print("\n" + "=" * 60)
print("TEST: get_hover")
print("=" * 60)
symbol = CodeSymbolNode(
id=f"{TEST_FILE}:LspBridge:96",
name="LspBridge",
kind="class",
file_path=str(TEST_FILE),
range=Range(start_line=96, start_character=6, end_line=96, end_character=15),
)
print(f"Symbol: {symbol.name}")
async with LspBridge(workspace_root=str(WORKSPACE_ROOT), timeout=30.0) as bridge:
hover = await bridge.get_hover(symbol)
if hover:
preview = hover[:300].replace('\n', '\n ')
print(f"\n[OK] Hover info ({len(hover)} chars):")
print(f" {preview}...")
else:
print(f"\n[WARN] No hover info available")
return hover is not None
async def test_get_document_symbols():
"""Test get_document_symbols."""
print("\n" + "=" * 60)
print("TEST: get_document_symbols")
print("=" * 60)
file_path = str(TEST_FILE)
print(f"File: {Path(file_path).name}")
async with LspBridge(workspace_root=str(WORKSPACE_ROOT), timeout=30.0) as bridge:
symbols = await bridge.get_document_symbols(file_path)
print(f"\n[OK] Found {len(symbols)} symbols:")
# Group by kind
by_kind = {}
for sym in symbols:
kind = sym.get("kind", "unknown")
by_kind[kind] = by_kind.get(kind, 0) + 1
for kind, count in sorted(by_kind.items()):
print(f" {kind}: {count}")
print("\nSample symbols:")
for sym in symbols[:15]:
name = sym.get("name", "?")
kind = sym.get("kind", "?")
range_data = sym.get("range", {})
start = range_data.get("start", {})
line = start.get("line", 0) + 1
print(f" - {name} ({kind}) at line {line}")
return len(symbols) > 0
async def test_graph_expansion():
"""Test graph expansion."""
print("\n" + "=" * 60)
print("TEST: Graph Expansion (LspGraphBuilder)")
print("=" * 60)
seed = CodeSymbolNode(
id=f"{TEST_FILE}:LspBridge:96",
name="LspBridge",
kind="class",
file_path=str(TEST_FILE),
range=Range(start_line=96, start_character=6, end_line=96, end_character=15),
)
print(f"Seed: {seed.name} in {Path(seed.file_path).name}:{seed.range.start_line}")
print("Settings: max_depth=1, max_nodes=20")
builder = LspGraphBuilder(max_depth=1, max_nodes=20)
async with LspBridge(workspace_root=str(WORKSPACE_ROOT), timeout=30.0) as bridge:
graph = await builder.build_from_seeds([seed], bridge)
print(f"\n[OK] Graph expansion complete:")
print(f" Nodes: {len(graph.nodes)}")
print(f" Edges: {len(graph.edges)}")
if graph.nodes:
print("\nNodes found:")
for node_id, node in list(graph.nodes.items())[:15]:
print(f" - {node.name} ({node.kind}) in {Path(node.file_path).name}:{node.range.start_line}")
if graph.edges:
print(f"\nEdges (first 10):")
for edge in list(graph.edges)[:10]:
src = graph.nodes.get(edge.source_id)
tgt = graph.nodes.get(edge.target_id)
src_name = src.name if src else edge.source_id[:20]
tgt_name = tgt.name if tgt else edge.target_id[:20]
print(f" - {src_name} --[{edge.relation}]--> {tgt_name}")
return len(graph.nodes) >= 1
async def test_cache_performance():
"""Test cache performance."""
print("\n" + "=" * 60)
print("TEST: Cache Performance")
print("=" * 60)
symbol = CodeSymbolNode(
id=f"{TEST_FILE}:LspBridge:96",
name="LspBridge",
kind="class",
file_path=str(TEST_FILE),
range=Range(start_line=96, start_character=6, end_line=96, end_character=15),
)
import time
async with LspBridge(workspace_root=str(WORKSPACE_ROOT), timeout=30.0) as bridge:
# First call - cache miss
start = time.perf_counter()
await bridge.get_references(symbol)
first_time = (time.perf_counter() - start) * 1000
# Second call - cache hit
start = time.perf_counter()
await bridge.get_references(symbol)
second_time = (time.perf_counter() - start) * 1000
print(f"\nFirst call (cache miss): {first_time:.2f}ms")
print(f"Second call (cache hit): {second_time:.2f}ms")
print(f"Speedup: {first_time/max(second_time, 0.001):.1f}x")
print(f"Cache entries: {len(bridge.cache)}")
if second_time < first_time:
print("\n[OK] Cache is working correctly")
else:
print("\n[WARN] Cache may not be effective")
return second_time < first_time
async def run_all_tests():
"""Run all tests."""
print("=" * 60)
print("CODEX-LENS LSP REAL INTERFACE TESTS (Standalone Mode)")
print("=" * 60)
print(f"Test file: {TEST_FILE}")
print(f"Workspace: {WORKSPACE_ROOT}")
print(f"Mode: Standalone (direct language server communication)")
results = {}
tests = [
("get_definition", test_get_definition),
("get_references", test_get_references),
("get_hover", test_get_hover),
("get_document_symbols", test_get_document_symbols),
("graph_expansion", test_graph_expansion),
("cache_performance", test_cache_performance),
]
for name, test_fn in tests:
try:
results[name] = await test_fn()
except Exception as e:
print(f"\n[FAIL] FAILED: {e}")
import traceback
traceback.print_exc()
results[name] = False
# Summary
print("\n" + "=" * 60)
print("SUMMARY")
print("=" * 60)
passed = sum(1 for v in results.values() if v)
total = len(results)
for name, result in results.items():
status = "[PASS]" if result else "[FAIL]"
print(f" {status}: {name}")
print(f"\nResult: {passed}/{total} tests passed")
return passed == total
def main():
"""Main entry point."""
print("Checking pyright-langserver availability...")
if not check_pyright():
print("\n" + "=" * 60)
print("ERROR: pyright-langserver not available")
print("=" * 60)
print()
print("To run these tests:")
print(" 1. Install pyright: npm install -g pyright")
print(" 2. Verify: pyright-langserver --version")
print(" 3. Run this script again")
print()
sys.exit(1)
print("[OK] pyright-langserver is available!")
print()
# Run tests
# Note: On Windows, we use the default ProactorEventLoop (not SelectorEventLoop)
# because ProactorEventLoop supports subprocess creation which is required for LSP
success = asyncio.run(run_all_tests())
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,424 @@
"""Real interface tests for LSP Bridge using Standalone Mode.
These tests require:
1. Language servers installed (pyright-langserver, typescript-language-server)
2. A Python/TypeScript project in the workspace
Run with: pytest tests/real/ -v -s
"""
import asyncio
import os
import sys
import pytest
from pathlib import Path
# Add source to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
from codexlens.lsp.lsp_bridge import LspBridge, Location, HAS_AIOHTTP
from codexlens.lsp.lsp_graph_builder import LspGraphBuilder
from codexlens.hybrid_search.data_structures import CodeSymbolNode, Range
# Test configuration - adjust these paths to match your setup
TEST_PYTHON_FILE = Path(__file__).parent.parent.parent / "src" / "codexlens" / "lsp" / "lsp_bridge.py"
TEST_TYPESCRIPT_FILE = Path(__file__).parent.parent.parent.parent / "ccw-vscode-bridge" / "src" / "extension.ts"
WORKSPACE_ROOT = Path(__file__).parent.parent.parent # codex-lens root
def is_pyright_available() -> bool:
"""Check if pyright-langserver is installed."""
import shutil
return shutil.which("pyright-langserver") is not None
def is_typescript_server_available() -> bool:
"""Check if typescript-language-server is installed."""
import shutil
return shutil.which("typescript-language-server") is not None
# Skip all tests if pyright not available
pytestmark = pytest.mark.skipif(
not is_pyright_available(),
reason="pyright-langserver not installed. Install with: npm install -g pyright"
)
class TestRealLspBridgeStandalone:
"""Real interface tests for LspBridge in Standalone Mode."""
@pytest.fixture
def bridge(self):
"""Create real LspBridge instance in standalone mode."""
return LspBridge(
workspace_root=str(WORKSPACE_ROOT),
timeout=30.0,
use_vscode_bridge=False, # Use standalone mode
)
@pytest.fixture
def python_symbol(self):
"""Create a symbol pointing to LspBridge class."""
return CodeSymbolNode(
id=f"{TEST_PYTHON_FILE}:LspBridge:96",
name="LspBridge",
kind="class",
file_path=str(TEST_PYTHON_FILE),
range=Range(start_line=96, start_character=6, end_line=96, end_character=15),
)
@pytest.fixture
def python_method_symbol(self):
"""Create a symbol pointing to get_references method."""
return CodeSymbolNode(
id=f"{TEST_PYTHON_FILE}:get_references:200",
name="get_references",
kind="method",
file_path=str(TEST_PYTHON_FILE),
range=Range(start_line=200, start_character=10, end_line=200, end_character=24),
)
@pytest.mark.asyncio
async def test_real_get_definition(self, bridge, python_symbol):
"""Test get_definition against real Python file."""
print(f"\n>>> Testing get_definition for {python_symbol.name}")
print(f" File: {python_symbol.file_path}")
print(f" Position: line {python_symbol.range.start_line}, char {python_symbol.range.start_character}")
async with bridge:
definition = await bridge.get_definition(python_symbol)
print(f" Result: {definition}")
# Definition should exist (class definition)
if definition:
print(f" ✓ Found definition at {definition.file_path}:{definition.line}")
assert definition.file_path.endswith(".py")
assert definition.line > 0
else:
print(" ⚠ No definition found (may be expected for class declarations)")
@pytest.mark.asyncio
async def test_real_get_references(self, bridge, python_method_symbol):
"""Test get_references against real Python file."""
print(f"\n>>> Testing get_references for {python_method_symbol.name}")
print(f" File: {python_method_symbol.file_path}")
print(f" Position: line {python_method_symbol.range.start_line}")
async with bridge:
refs = await bridge.get_references(python_method_symbol)
print(f" Found {len(refs)} references:")
for i, ref in enumerate(refs[:5]): # Show first 5
print(f" [{i+1}] {Path(ref.file_path).name}:{ref.line}")
if len(refs) > 5:
print(f" ... and {len(refs) - 5} more")
# Should find at least the definition itself
assert len(refs) >= 0, "References query should succeed (may be empty)"
@pytest.mark.asyncio
async def test_real_get_hover(self, bridge, python_symbol):
"""Test get_hover against real Python file."""
print(f"\n>>> Testing get_hover for {python_symbol.name}")
async with bridge:
hover = await bridge.get_hover(python_symbol)
if hover:
print(f" ✓ Hover info ({len(hover)} chars):")
preview = hover[:200].replace('\n', '\\n')
print(f" {preview}...")
assert len(hover) > 0
else:
print(" ⚠ No hover info available")
@pytest.mark.asyncio
async def test_real_get_document_symbols(self, bridge):
"""Test get_document_symbols against real Python file."""
file_path = str(TEST_PYTHON_FILE)
print(f"\n>>> Testing get_document_symbols")
print(f" File: {file_path}")
async with bridge:
symbols = await bridge.get_document_symbols(file_path)
print(f" Found {len(symbols)} symbols:")
# Group by kind
by_kind = {}
for sym in symbols:
kind = sym.get("kind", "unknown")
by_kind[kind] = by_kind.get(kind, 0) + 1
for kind, count in sorted(by_kind.items()):
print(f" {kind}: {count}")
# Show some sample symbols
print(" Sample symbols:")
for sym in symbols[:10]:
name = sym.get("name", "?")
kind = sym.get("kind", "?")
range_data = sym.get("range", {})
start = range_data.get("start", {})
line = start.get("line", 0) + 1
print(f" - {name} ({kind}) at line {line}")
assert len(symbols) > 0, "Should find symbols in Python file"
@pytest.mark.asyncio
async def test_real_get_call_hierarchy(self, bridge, python_method_symbol):
"""Test get_call_hierarchy against real Python file."""
print(f"\n>>> Testing get_call_hierarchy for {python_method_symbol.name}")
async with bridge:
calls = await bridge.get_call_hierarchy(python_method_symbol)
print(f" Found {len(calls)} call hierarchy items:")
for i, call in enumerate(calls[:10]):
print(f" [{i+1}] {call.name} in {Path(call.file_path).name}:{call.range.start_line}")
# May be empty if call hierarchy not supported or no callers
print(f" ✓ Call hierarchy query completed")
@pytest.mark.asyncio
async def test_real_cache_behavior(self, bridge, python_symbol):
"""Test that cache actually works with real requests."""
print(f"\n>>> Testing cache behavior")
async with bridge:
# First call - should hit language server
print(" First call (cache miss expected)...")
refs1 = await bridge.get_references(python_symbol)
cache_size_after_first = len(bridge.cache)
print(f" Cache size after first call: {cache_size_after_first}")
# Second call - should hit cache
print(" Second call (cache hit expected)...")
refs2 = await bridge.get_references(python_symbol)
cache_size_after_second = len(bridge.cache)
print(f" Cache size after second call: {cache_size_after_second}")
assert cache_size_after_first > 0, "Cache should have entries after first call"
assert cache_size_after_second == cache_size_after_first, "Cache size should not change on hit"
assert refs1 == refs2, "Results should be identical"
print(" ✓ Cache working correctly")
class TestRealLspGraphBuilderStandalone:
"""Real interface tests for LspGraphBuilder with Standalone Mode."""
@pytest.fixture
def seed_node(self):
"""Create a seed node for graph expansion."""
return CodeSymbolNode(
id=f"{TEST_PYTHON_FILE}:LspBridge:96",
name="LspBridge",
kind="class",
file_path=str(TEST_PYTHON_FILE),
range=Range(start_line=96, start_character=6, end_line=96, end_character=15),
)
@pytest.mark.asyncio
async def test_real_graph_expansion(self, seed_node):
"""Test real graph expansion from a Python class."""
print(f"\n>>> Testing graph expansion from {seed_node.name}")
print(f" Seed: {seed_node.file_path}:{seed_node.range.start_line}")
builder = LspGraphBuilder(max_depth=1, max_nodes=20)
async with LspBridge(
workspace_root=str(WORKSPACE_ROOT),
timeout=30.0,
) as bridge:
graph = await builder.build_from_seeds([seed_node], bridge)
print(f" Graph results:")
print(f" Nodes: {len(graph.nodes)}")
print(f" Edges: {len(graph.edges)}")
if graph.nodes:
print(f" Node details:")
for node_id, node in list(graph.nodes.items())[:10]:
print(f" - {node.name} ({node.kind}) in {Path(node.file_path).name}:{node.range.start_line}")
if graph.edges:
print(f" Edge details:")
for edge in list(graph.edges)[:10]:
print(f" - {edge.source_id[:30]}... --[{edge.relation}]--> {edge.target_id[:30]}...")
# We should have at least the seed node
assert len(graph.nodes) >= 1, "Graph should contain at least the seed node"
print(" ✓ Graph expansion completed")
@pytest.mark.asyncio
async def test_real_multi_seed_expansion(self):
"""Test graph expansion from multiple seeds."""
print(f"\n>>> Testing multi-seed graph expansion")
seeds = [
CodeSymbolNode(
id=f"{TEST_PYTHON_FILE}:Location:35",
name="Location",
kind="class",
file_path=str(TEST_PYTHON_FILE),
range=Range(start_line=35, start_character=6, end_line=35, end_character=14),
),
CodeSymbolNode(
id=f"{TEST_PYTHON_FILE}:CacheEntry:81",
name="CacheEntry",
kind="class",
file_path=str(TEST_PYTHON_FILE),
range=Range(start_line=81, start_character=6, end_line=81, end_character=16),
),
]
print(f" Seeds: {[s.name for s in seeds]}")
builder = LspGraphBuilder(max_depth=1, max_nodes=30)
async with LspBridge(
workspace_root=str(WORKSPACE_ROOT),
timeout=30.0,
) as bridge:
graph = await builder.build_from_seeds(seeds, bridge)
print(f" Graph results:")
print(f" Nodes: {len(graph.nodes)}")
print(f" Edges: {len(graph.edges)}")
# Should have at least the seed nodes
assert len(graph.nodes) >= len(seeds), f"Graph should contain at least {len(seeds)} seed nodes"
print(" ✓ Multi-seed expansion completed")
class TestRealHybridSearchIntegrationStandalone:
"""Real integration tests with HybridSearchEngine."""
@pytest.mark.asyncio
async def test_real_lsp_search_pipeline(self):
"""Test the full LSP search pipeline with real LSP."""
print(f"\n>>> Testing full LSP search pipeline")
# Create mock seeds (normally from vector/splade search)
seeds = [
CodeSymbolNode(
id=f"{TEST_PYTHON_FILE}:LspBridge:96",
name="LspBridge",
kind="class",
file_path=str(TEST_PYTHON_FILE),
range=Range(start_line=96, start_character=6, end_line=96, end_character=15),
),
]
print(f" Starting with {len(seeds)} seed(s)")
builder = LspGraphBuilder(max_depth=2, max_nodes=50)
async with LspBridge(
workspace_root=str(WORKSPACE_ROOT),
timeout=30.0,
) as bridge:
graph = await builder.build_from_seeds(seeds, bridge)
print(f" Expanded to {len(graph.nodes)} nodes")
# Simulate conversion to SearchResult format
results = []
for node_id, node in graph.nodes.items():
if node.id not in [s.id for s in seeds]: # Exclude seeds
results.append({
"path": node.file_path,
"symbol_name": node.name,
"symbol_kind": node.kind,
"start_line": node.range.start_line,
"end_line": node.range.end_line,
})
print(f" Generated {len(results)} search results (excluding seeds)")
if results:
print(" Sample results:")
for r in results[:5]:
print(f" - {r['symbol_name']} ({r['symbol_kind']}) at {Path(r['path']).name}:{r['start_line']}")
print(" ✓ Full pipeline completed")
# TypeScript tests (if available)
@pytest.mark.skipif(
not is_typescript_server_available() or not TEST_TYPESCRIPT_FILE.exists(),
reason="TypeScript language server or test file not available"
)
class TestRealTypescriptLspStandalone:
"""Real tests against TypeScript files."""
@pytest.fixture
def ts_symbol(self):
"""Create a symbol in the TypeScript extension file."""
return CodeSymbolNode(
id=f"{TEST_TYPESCRIPT_FILE}:activate:12",
name="activate",
kind="function",
file_path=str(TEST_TYPESCRIPT_FILE),
range=Range(start_line=12, start_character=16, end_line=12, end_character=24),
)
@pytest.mark.asyncio
async def test_real_typescript_definition(self, ts_symbol):
"""Test LSP definition lookup in TypeScript."""
print(f"\n>>> Testing TypeScript definition for {ts_symbol.name}")
async with LspBridge(
workspace_root=str(TEST_TYPESCRIPT_FILE.parent.parent),
timeout=30.0,
) as bridge:
definition = await bridge.get_definition(ts_symbol)
if definition:
print(f" ✓ Found: {definition.file_path}:{definition.line}")
else:
print(" ⚠ No definition found (TypeScript LSP may not be active)")
@pytest.mark.asyncio
async def test_real_typescript_document_symbols(self):
"""Test document symbols in TypeScript."""
print(f"\n>>> Testing TypeScript document symbols")
async with LspBridge(
workspace_root=str(TEST_TYPESCRIPT_FILE.parent.parent),
timeout=30.0,
) as bridge:
symbols = await bridge.get_document_symbols(str(TEST_TYPESCRIPT_FILE))
print(f" Found {len(symbols)} symbols")
for sym in symbols[:5]:
print(f" - {sym.get('name')} ({sym.get('kind')})")
# TypeScript files should have symbols
if symbols:
print(" ✓ TypeScript symbols retrieved")
else:
print(" ⚠ No symbols found (TypeScript LSP may not be active)")
if __name__ == "__main__":
# Allow running directly
if is_pyright_available():
print("Pyright language server is available")
print("Running tests...")
pytest.main([__file__, "-v", "-s"])
else:
print("=" * 60)
print("Pyright language server NOT available")
print("=" * 60)
print()
print("To run these tests:")
print("1. Install pyright: npm install -g pyright")
print("2. Install typescript-language-server: npm install -g typescript-language-server")
print("3. Run: pytest tests/real/ -v -s")
print()
sys.exit(1)