mirror of
https://github.com/catlog22/Claude-Code-Workflow.git
synced 2026-02-05 01:50:27 +08:00
- Implemented GraphExpander to enhance search results with related symbols using precomputed neighbors. - Added CrossEncoderReranker for second-stage search ranking, allowing for improved result scoring. - Created migrations to establish necessary database tables for relationships and graph neighbors. - Developed tests for graph expansion functionality, ensuring related results are populated correctly. - Enhanced performance benchmarks for cross-encoder reranking latency and graph expansion overhead. - Updated schema cleanup tests to reflect changes in versioning and deprecated fields. - Added new test cases for Treesitter parser to validate relationship extraction with alias resolution.
605 lines
20 KiB
Python
605 lines
20 KiB
Python
"""Comprehensive tests for CodexLens search functionality.

Tests cover:
- FTS5 text search (basic, phrase, boolean, wildcard)
- Chain search across directories
- Symbol search (by name, kind, filters)
- Files-only search mode
- Edge cases and error handling
"""
|
|
|
|
import tempfile
|
|
import pytest
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from codexlens.storage.sqlite_store import SQLiteStore
|
|
from codexlens.storage.dir_index import DirIndexStore
|
|
from codexlens.storage.registry import RegistryStore
|
|
from codexlens.storage.path_mapper import PathMapper
|
|
from codexlens.search import (
|
|
ChainSearchEngine,
|
|
SearchOptions,
|
|
SearchStats,
|
|
ChainSearchResult,
|
|
quick_search,
|
|
)
|
|
from codexlens.entities import IndexedFile, Symbol, SearchResult
|
|
|
|
|
|
# === Fixtures ===
|
|
|
|
@pytest.fixture
def temp_dir():
    """Yield a fresh temporary directory as a ``Path``.

    The directory is created before the test runs and cleaned up once the
    fixture is torn down.
    """
    scratch = tempfile.TemporaryDirectory()
    try:
        yield Path(scratch.name)
    finally:
        # Mirrors what leaving a ``with TemporaryDirectory()`` block does.
        scratch.cleanup()
|
|
|
|
|
|
@pytest.fixture
def sample_files():
    """Return ``(IndexedFile, source_text)`` pairs used to seed test indexes.

    Three small Python modules are described (auth, database, utils); each
    entry pairs the file's metadata and declared symbols with the text that
    is handed to the store for indexing.
    """
    auth_source = """
def authenticate(username, password):
    '''Authenticate user with credentials.'''
    user = find_user(username)
    if user and check_password(user, password):
        return create_token(user)
    return None

def verify_token(token):
    '''Verify JWT token validity.'''
    try:
        payload = decode_token(token)
        return payload
    except TokenExpired:
        return None

class AuthManager:
    '''Manages authentication state.'''
    def __init__(self):
        self.sessions = {}

    def login(self, user):
        token = authenticate(user.name, user.password)
        self.sessions[user.id] = token
        return token
"""

    database_source = """
def connect(host, port, database):
    '''Establish database connection.'''
    return Connection(host, port, database)

def query(connection, sql, params=None):
    '''Execute SQL query and return results.'''
    cursor = connection.cursor()
    cursor.execute(sql, params or [])
    return cursor.fetchall()

class DatabasePool:
    '''Connection pool for database.'''
    def __init__(self, size=10):
        self.pool = []
        self.size = size

    def get_connection(self):
        if self.pool:
            return self.pool.pop()
        return connect()
"""

    utils_source = """
def format_date(date, fmt='%Y-%m-%d'):
    return date.strftime(fmt)

def parse_json(data):
    '''Parse JSON string to dictionary.'''
    import json
    return json.loads(data)

def hash_password(password, salt=None):
    '''Hash password using bcrypt.'''
    import hashlib
    salt = salt or generate_salt()
    return hashlib.sha256((password + salt).encode()).hexdigest()
"""

    auth_file = IndexedFile(
        path="/project/src/auth.py",
        language="python",
        symbols=[
            Symbol(name="authenticate", kind="function", range=(1, 10)),
            Symbol(name="verify_token", kind="function", range=(12, 20)),
            Symbol(name="AuthManager", kind="class", range=(22, 50)),
        ],
    )
    database_file = IndexedFile(
        path="/project/src/database.py",
        language="python",
        symbols=[
            Symbol(name="connect", kind="function", range=(1, 5)),
            Symbol(name="query", kind="function", range=(7, 15)),
            Symbol(name="DatabasePool", kind="class", range=(17, 40)),
        ],
    )
    utils_file = IndexedFile(
        path="/project/src/utils.py",
        language="python",
        symbols=[
            Symbol(name="format_date", kind="function", range=(1, 3)),
            Symbol(name="parse_json", kind="function", range=(5, 10)),
            Symbol(name="hash_password", kind="function", range=(12, 18)),
        ],
    )

    return [
        (auth_file, auth_source),
        (database_file, database_source),
        (utils_file, utils_source),
    ]
|
|
|
|
|
|
@pytest.fixture
def populated_store(temp_dir, sample_files):
    """Yield an initialized ``SQLiteStore`` loaded with every sample file.

    The database lives at ``temp_dir / "_index.db"``. The store is closed on
    teardown, and also when setup itself fails part-way, so a broken
    ``initialize()``/``add_file()`` never leaves an open handle on a file
    inside the temporary directory that is about to be removed.
    """
    db_path = temp_dir / "_index.db"
    store = SQLiteStore(db_path)
    try:
        store.initialize()
        for indexed_file, content in sample_files:
            store.add_file(indexed_file, content)
        yield store
    finally:
        store.close()
|
|
|
|
|
|
@pytest.fixture
def populated_dir_store(temp_dir, sample_files):
    """Yield an initialized ``DirIndexStore`` loaded with every sample file.

    The store is driven the same way the chain-search tests in this module
    drive it: ``initialize()`` first, then ``add_file`` with the keyword
    arguments ``name``, ``full_path``, ``content``, ``language`` and
    ``symbols``. The store is closed on teardown or if setup fails.
    """
    db_path = temp_dir / "_index.db"
    store = DirIndexStore(db_path)
    try:
        store.initialize()
        for indexed_file, content in sample_files:
            # NOTE(review): assumes IndexedFile exposes its constructor
            # arguments (path, language, symbols) as attributes - confirm.
            store.add_file(
                name=Path(indexed_file.path).name,
                full_path=indexed_file.path,
                content=content,
                language=indexed_file.language,
                symbols=indexed_file.symbols,
            )
        yield store
    finally:
        store.close()
|
|
|
|
|
|
# === FTS5 Search Tests ===
|
|
|
|
class TestFTS5BasicSearch:
    """Basic full-text queries against the populated store."""

    def test_single_term_search(self, populated_store):
        """A single term finds at least one hit whose path mentions "auth"."""
        hits = populated_store.search_fts("authenticate")
        assert len(hits) >= 1
        assert any("auth" in hit.path.lower() for hit in hits)

    def test_case_insensitive_search(self, populated_store):
        """The same word in different letter cases yields equally many hits."""
        hit_counts = {
            len(populated_store.search_fts(spelling))
            for spelling in ("database", "DATABASE", "DataBase")
        }
        # Identical counts collapse into a single set element.
        assert len(hit_counts) == 1

    def test_partial_word_search(self, populated_store):
        """A trailing-wildcard query returns at least one hit."""
        # Expected to match authenticate, authentication, AuthManager, etc.
        hits = populated_store.search_fts("auth*")
        assert len(hits) >= 1

    def test_multiple_terms_search(self, populated_store):
        """Two space-separated terms (implicit AND) still produce a hit."""
        hits = populated_store.search_fts("user password")
        assert len(hits) >= 1

    def test_no_results_search(self, populated_store):
        """A term that occurs nowhere yields an empty result."""
        hits = populated_store.search_fts("nonexistent_xyz_term")
        assert len(hits) == 0

    def test_search_with_limit(self, populated_store):
        """``limit=1`` caps the number of returned hits at one."""
        hits = populated_store.search_fts("def", limit=1)
        assert len(hits) <= 1

    def test_search_returns_excerpt(self, populated_store):
        """Every returned hit exposes an ``excerpt`` attribute."""
        hits = populated_store.search_fts("authenticate")
        assert len(hits) >= 1
        assert all(hasattr(hit, 'excerpt') for hit in hits)
|
|
|
|
|
|
class TestFTS5AdvancedSearch:
    """Tests for advanced FTS5 search features."""

    def test_phrase_search(self, populated_store):
        """Test exact phrase search with quotes."""
        results = populated_store.search_fts('"verify_token"')
        assert len(results) >= 1

    def test_boolean_or_search(self, populated_store):
        """Test OR boolean search."""
        results = populated_store.search_fts("authenticate OR database")
        # Should find files containing either term
        assert len(results) >= 2

    def test_boolean_not_search(self, populated_store):
        """Test NOT boolean search."""
        all_results = populated_store.search_fts("def")
        not_results = populated_store.search_fts("def NOT authenticate")
        # Excluding a term can never enlarge the result set.
        assert len(not_results) <= len(all_results)

    def test_prefix_search(self, populated_store):
        """Test prefix search with asterisk."""
        results = populated_store.search_fts("connect*")
        # Should match connect, connection, etc.
        assert len(results) >= 1

    def test_special_characters_in_query(self, populated_store):
        """Test search handles special characters gracefully."""
        # The call must not raise. Whether anything matches is unspecified,
        # so only the shape of the return value is checked.
        results = populated_store.search_fts("__init__")
        assert isinstance(results, list)

    def test_unicode_search(self, temp_dir):
        """Test search with unicode content."""
        store = SQLiteStore(temp_dir / "_index.db")
        # try/finally so a failing assertion does not leave the database
        # open while the temporary directory is being removed.
        try:
            store.initialize()

            indexed_file = IndexedFile(
                path="/test/unicode.py",
                language="python",
                symbols=[Symbol(name="世界", kind="function", range=(1, 1))],
            )
            store.add_file(indexed_file, "def 世界(): return '你好世界'")

            results = store.search_fts("世界")
            assert len(results) == 1
        finally:
            store.close()
|
|
|
|
|
|
class TestFTS5Pagination:
    """Tests for FTS5 search pagination."""

    def test_offset_pagination(self, temp_dir):
        """Test search with offset for pagination."""
        store = SQLiteStore(temp_dir / "_index.db")
        # try/finally so the store is closed even when an assertion fails.
        try:
            store.initialize()

            # Add multiple files that all contain the search term.
            for i in range(10):
                indexed_file = IndexedFile(
                    path=f"/test/file{i}.py",
                    language="python",
                    symbols=[],
                )
                store.add_file(indexed_file, f"searchable content number {i}")

            page1 = store.search_fts("searchable", limit=3, offset=0)
            page2 = store.search_fts("searchable", limit=3, offset=3)
            page3 = store.search_fts("searchable", limit=3, offset=6)

            paths1 = {r.path for r in page1}
            paths2 = {r.path for r in page2}
            paths3 = {r.path for r in page3}

            # Each page should have different results - check every pair,
            # not only the adjacent ones.
            assert paths1.isdisjoint(paths2)
            assert paths2.isdisjoint(paths3)
            assert paths1.isdisjoint(paths3)
        finally:
            store.close()

    def test_offset_beyond_results(self, populated_store):
        """Test offset beyond available results."""
        results = populated_store.search_fts("authenticate", limit=10, offset=1000)
        assert len(results) == 0
|
|
|
|
|
|
# === Symbol Search Tests ===
|
|
|
|
class TestSymbolSearch:
    """Tests for symbol search functionality."""

    def test_search_by_name(self, populated_store):
        """Test symbol search by name."""
        results = populated_store.search_symbols("auth")
        assert len(results) >= 1
        assert any("auth" in s.name.lower() for s in results)

    def test_search_by_kind_function(self, populated_store):
        """Test symbol search filtered by kind=function."""
        results = populated_store.search_symbols("", kind="function")
        # Guard against a vacuous pass: all() is True for an empty result,
        # and the sample data declares several function symbols.
        assert len(results) >= 1
        assert all(s.kind == "function" for s in results)

    def test_search_by_kind_class(self, populated_store):
        """Test symbol search filtered by kind=class."""
        results = populated_store.search_symbols("", kind="class")
        assert all(s.kind == "class" for s in results)
        # The sample classes are AuthManager and DatabasePool.
        assert any("Manager" in s.name or "Pool" in s.name for s in results)

    def test_search_symbols_with_limit(self, populated_store):
        """Test symbol search respects limit."""
        results = populated_store.search_symbols("", limit=2)
        assert len(results) <= 2

    def test_search_symbols_returns_range(self, populated_store):
        """Test symbol search results include line range."""
        results = populated_store.search_symbols("authenticate")
        assert len(results) >= 1
        for sym in results:
            assert hasattr(sym, 'range')
            assert len(sym.range) == 2
            # Start line must not come after end line.
            assert sym.range[0] <= sym.range[1]
|
|
|
|
|
|
# === Chain Search Tests ===
|
|
|
|
class TestChainSearchEngine:
    """Tests for ChainSearchEngine."""

    @staticmethod
    def _build_index(temp_dir, content, symbols=None):
        """Create ``_index.db`` in *temp_dir* holding a single ``test.py`` entry.

        Args:
            temp_dir: Directory that receives the database file.
            content: Text stored as the content of ``test.py``.
            symbols: Optional symbol list; forwarded to ``add_file`` only
                when given, so callers that omit it exercise the default.

        Returns:
            Path of the created database file.
        """
        db_path = temp_dir / "_index.db"
        add_kwargs = {
            "name": "test.py",
            "full_path": str(temp_dir / "test.py"),
            "content": content,
            "language": "python",
        }
        if symbols is not None:
            add_kwargs["symbols"] = symbols

        store = DirIndexStore(db_path)
        # Always release the store, even if indexing fails, so the engine
        # under test never races with a leaked writer handle.
        try:
            store.initialize()
            store.add_file(**add_kwargs)
        finally:
            store.close()
        return db_path

    @pytest.fixture
    def mock_registry(self):
        """Create a mock registry."""
        registry = MagicMock(spec=RegistryStore)
        registry.find_nearest_index.return_value = None
        return registry

    @pytest.fixture
    def mock_mapper(self):
        """Create a mock path mapper."""
        return MagicMock(spec=PathMapper)

    def test_search_no_index_found(self, mock_registry, mock_mapper):
        """Test search when no index is found."""
        mock_mapper.source_to_index_db.return_value = Path("/nonexistent/_index.db")

        engine = ChainSearchEngine(mock_registry, mock_mapper)
        result = engine.search("test", Path("/nonexistent"))

        assert result.results == []
        assert result.symbols == []
        assert result.stats.dirs_searched == 0

    def test_search_options_depth(self, mock_registry, mock_mapper, temp_dir):
        """Test search respects depth option."""
        # Create a simple index structure
        db_path = self._build_index(temp_dir, "test content searchable")
        mock_mapper.source_to_index_db.return_value = db_path

        engine = ChainSearchEngine(mock_registry, mock_mapper)
        options = SearchOptions(depth=0)  # Only current dir

        result = engine.search("test", temp_dir, options)

        # With depth=0, should only search current directory
        assert result.stats.dirs_searched <= 1

    def test_search_files_only(self, mock_registry, mock_mapper, temp_dir):
        """Test search_files_only returns only paths."""
        db_path = self._build_index(temp_dir, "searchable content here")
        mock_mapper.source_to_index_db.return_value = db_path

        engine = ChainSearchEngine(mock_registry, mock_mapper)
        paths = engine.search_files_only("searchable", temp_dir)

        assert isinstance(paths, list)
        for p in paths:
            assert isinstance(p, str)

    def test_search_symbols_engine(self, mock_registry, mock_mapper, temp_dir):
        """Test symbol search through engine."""
        db_path = self._build_index(
            temp_dir,
            "def my_function(): pass",
            symbols=[Symbol(name="my_function", kind="function", range=(1, 5))],
        )
        mock_mapper.source_to_index_db.return_value = db_path

        engine = ChainSearchEngine(mock_registry, mock_mapper)
        symbols = engine.search_symbols("my_func", temp_dir)

        assert len(symbols) >= 1
        assert symbols[0].name == "my_function"

    def test_search_result_stats(self, mock_registry, mock_mapper, temp_dir):
        """Test search result includes proper stats."""
        db_path = self._build_index(temp_dir, "content to search")
        mock_mapper.source_to_index_db.return_value = db_path

        engine = ChainSearchEngine(mock_registry, mock_mapper)
        result = engine.search("content", temp_dir)

        assert result.stats.time_ms >= 0
        assert result.stats.dirs_searched >= 0
        assert isinstance(result.stats.errors, list)
|
|
|
|
|
|
class TestSearchOptions:
    """Checks on the ``SearchOptions`` configuration object."""

    def test_default_options(self):
        """A bare ``SearchOptions()`` carries the documented defaults."""
        defaults = SearchOptions()
        expected_numbers = {
            "depth": -1,
            "max_workers": 8,
            "limit_per_dir": 10,
            "total_limit": 100,
        }
        for field_name, expected in expected_numbers.items():
            assert getattr(defaults, field_name) == expected
        # Flags are compared by identity, not mere falsiness.
        assert defaults.include_symbols is False
        assert defaults.files_only is False

    def test_custom_options(self):
        """Values passed to the constructor are stored unchanged."""
        numbers = {
            "depth": 3,
            "max_workers": 4,
            "limit_per_dir": 5,
            "total_limit": 50,
        }
        custom = SearchOptions(include_symbols=True, files_only=True, **numbers)
        for field_name, expected in numbers.items():
            assert getattr(custom, field_name) == expected
        assert custom.include_symbols is True
        assert custom.files_only is True
|
|
|
|
|
|
# === Edge Cases and Error Handling ===
|
|
|
|
class TestSearchEdgeCases:
    """Edge case tests for search functionality."""

    def test_empty_query(self, populated_store):
        """Test search with empty query."""
        # Rejecting the query with an exception is acceptable. Only the call
        # sits inside the try so a failing assertion is not swallowed by the
        # broad except clause.
        try:
            results = populated_store.search_fts("")
        except Exception:
            return
        assert isinstance(results, list)

    def test_whitespace_query(self, populated_store):
        """Test search with whitespace-only query."""
        # Same contract as the empty query: raise, or return a list.
        try:
            results = populated_store.search_fts(" ")
        except Exception:
            return
        assert isinstance(results, list)

    def test_very_long_query(self, populated_store):
        """Test search with very long query."""
        long_query = "function " * 100  # Repeat valid word
        # Very long queries may be rejected; otherwise a list is required.
        try:
            results = populated_store.search_fts(long_query)
        except Exception:
            return
        assert isinstance(results, list)

    def test_special_sql_characters(self, populated_store):
        """Test search handles SQL-like characters safely."""
        # These should not cause SQL injection - may raise FTS syntax errors
        queries = ["test", "function*", "test OR data"]
        for q in queries:
            results = populated_store.search_fts(q)
            assert isinstance(results, list)

    def test_search_reopened_store(self, temp_dir, sample_files):
        """Test search works after store is reopened."""
        db_path = temp_dir / "_index.db"
        first_file, first_content = sample_files[0]

        store = SQLiteStore(db_path)
        try:
            store.initialize()
            store.add_file(first_file, first_content)
        finally:
            store.close()

        # Reopen and search; close the second handle even on failure.
        store2 = SQLiteStore(db_path)
        try:
            store2.initialize()
            results = store2.search_fts("authenticate")
            assert len(results) >= 1
        finally:
            store2.close()

    def test_concurrent_searches(self, populated_store):
        """Test multiple concurrent searches."""
        import threading

        results = []
        errors = []

        def search_task(query):
            # Exceptions raised in a worker thread would otherwise be lost,
            # so they are collected and asserted on from the main thread.
            try:
                r = populated_store.search_fts(query)
                results.append(len(r))
            except Exception as e:
                errors.append(e)

        threads = [
            threading.Thread(target=search_task, args=(query,))
            for query in ("authenticate", "database", "password")
        ]

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Comparing to [] makes the failure message show the exceptions.
        assert errors == []
        assert len(results) == 3
|
|
|
|
|
|
class TestChainSearchResult:
    """Checks on the ``ChainSearchResult`` container."""

    def test_result_structure(self):
        """A result built from empty inputs exposes every expected field."""
        empty_stats = SearchStats()
        chain_result = ChainSearchResult(
            query="test", results=[], symbols=[], stats=empty_stats
        )

        assert chain_result.query == "test"
        assert chain_result.results == []
        # related_results is not passed in, so this checks its default.
        assert chain_result.related_results == []
        assert chain_result.symbols == []
        assert chain_result.stats.dirs_searched == 0
|
|
|
|
|
|
class TestSearchStats:
    """Checks on the ``SearchStats`` container."""

    def test_default_stats(self):
        """A bare ``SearchStats()`` starts with zero counters and no errors."""
        fresh = SearchStats()
        assert fresh.dirs_searched == 0
        assert fresh.files_matched == 0
        assert fresh.time_ms == 0
        assert fresh.errors == []

    def test_stats_with_errors(self):
        """Error messages handed to the constructor are retained."""
        messages = ["Error 1", "Error 2"]
        with_errors = SearchStats(errors=messages)
        assert len(with_errors.errors) == 2
|