Add comprehensive tests for schema cleanup migration and search comparison

- Implement tests for migration 005 to verify removal of deprecated fields in the database schema.
- Ensure that new databases are created with a clean schema.
- Validate that keywords are correctly extracted from the normalized file_keywords table.
- Test symbol insertion without deprecated fields and subdir operations without direct_files.
- Create a detailed search comparison test to evaluate vector search vs hybrid search performance.
- Add a script for reindexing projects to extract code relationships and verify GraphAnalyzer functionality.
- Include a test script to check TreeSitter parser availability and relationship extraction from sample files.
Commit df23975a0b (parent 3da0ef2adb) by catlog22, 2025-12-16 19:27:05 +08:00. 61 changed files with 13114 additions and 366 deletions.


@@ -469,3 +469,144 @@ class TestDualFTSPerformance:
assert len(results) > 0, "Should find matches in fuzzy FTS"
finally:
store.close()
def test_fuzzy_substring_matching(self, populated_db):
"""Test fuzzy search finds partial token matches with trigram."""
store = DirIndexStore(populated_db)
store.initialize()
try:
# Check if trigram is available
with store._get_connection() as conn:
cursor = conn.execute(
"SELECT sql FROM sqlite_master WHERE name='files_fts_fuzzy'"
)
            row = cursor.fetchone()  # May be None if the fuzzy FTS table is absent
            fts_sql = row[0] if row else ""
            has_trigram = 'trigram' in fts_sql.lower()
if not has_trigram:
pytest.skip("Trigram tokenizer not available, skipping fuzzy substring test")
# Search for partial token "func" should match "function0", "function1", etc.
cursor = conn.execute(
"""SELECT full_path, bm25(files_fts_fuzzy) as score
FROM files_fts_fuzzy
WHERE files_fts_fuzzy MATCH 'func'
ORDER BY score
LIMIT 10"""
)
results = cursor.fetchall()
# With trigram, should find matches
assert len(results) > 0, "Fuzzy search with trigram should find partial token matches"
# Verify results contain expected files with "function" in content
for path, score in results:
assert "file" in path # All test files named "test/fileN.py"
assert score < 0 # BM25 scores are negative
finally:
store.close()
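# A minimal demo of why 'func' matches 'function0' under FTS5's trigram
# tokenizer (assumes SQLite >= 3.34 compiled with FTS5; illustrative only,
# not part of the test suite):
def _demo_trigram_substring_match():
    import sqlite3
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE VIRTUAL TABLE demo_fts USING fts5(content, tokenize='trigram')"
        )
        conn.execute("INSERT INTO demo_fts VALUES ('def function0(): pass')")
        # The query 'func' is decomposed into trigrams ('fun', 'unc') that
        # also occur in 'function0', so the partial token matches.
        rows = conn.execute(
            "SELECT content FROM demo_fts WHERE demo_fts MATCH 'func'"
        ).fetchall()
        assert rows, "trigram FTS should match the partial token"
    except sqlite3.OperationalError:
        pass  # This SQLite build lacks the trigram tokenizer
    finally:
        conn.close()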
class TestMigrationRecovery:
"""Tests for migration failure recovery and edge cases."""
@pytest.fixture
def corrupted_v2_db(self):
"""Create v2 database with incomplete migration state."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = Path(f.name)
conn = sqlite3.connect(db_path)
try:
# Create v2 schema with some data
conn.executescript("""
PRAGMA user_version = 2;
CREATE TABLE files (
path TEXT PRIMARY KEY,
content TEXT,
language TEXT
);
INSERT INTO files VALUES ('test.py', 'content', 'python');
CREATE VIRTUAL TABLE files_fts USING fts5(
path, content, language,
content='files', content_rowid='rowid'
);
""")
conn.commit()
finally:
conn.close()
yield db_path
if db_path.exists():
db_path.unlink()
def test_migration_preserves_data_on_failure(self, corrupted_v2_db):
"""Test that data is preserved if migration encounters issues."""
# Read original data
conn = sqlite3.connect(corrupted_v2_db)
cursor = conn.execute("SELECT path, content FROM files")
original_data = cursor.fetchall()
conn.close()
# Attempt migration (may fail or succeed)
store = DirIndexStore(corrupted_v2_db)
try:
store.initialize()
except Exception:
# Even if migration fails, original data should be intact
pass
finally:
store.close()
# Verify data still exists
conn = sqlite3.connect(corrupted_v2_db)
try:
# Check schema version to determine column name
cursor = conn.execute("PRAGMA user_version")
version = cursor.fetchone()[0]
if version >= 4:
# Migration succeeded, use new column name
cursor = conn.execute("SELECT full_path, content FROM files WHERE full_path='test.py'")
else:
# Migration failed, use old column name
cursor = conn.execute("SELECT path, content FROM files WHERE path='test.py'")
result = cursor.fetchone()
# Data should still be there
assert result is not None, "Data should be preserved after migration attempt"
finally:
conn.close()
def test_migration_idempotent_after_partial_failure(self, corrupted_v2_db):
"""Test migration can be retried after partial failure."""
store1 = DirIndexStore(corrupted_v2_db)
store2 = DirIndexStore(corrupted_v2_db)
try:
# First attempt
try:
store1.initialize()
except Exception:
pass # May fail partially
# Second attempt should succeed or fail gracefully
store2.initialize() # Should not crash
# Verify database is in usable state
with store2._get_connection() as conn:
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row[0] for row in cursor.fetchall()]
# Should have files table (either old or new schema)
assert 'files' in tables
finally:
store1.close()
store2.close()
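# The retry test above depends on migrations being idempotent. A minimal
# sketch of the version-guarded pattern that makes a re-run safe (an
# illustrative pattern, not DirIndexStore's actual migration code;
# RENAME COLUMN needs SQLite >= 3.25):
def _migrate_files_to_v3(conn):
    version = conn.execute("PRAGMA user_version").fetchone()[0]
    if version >= 3:
        return  # Already migrated; a retry is a no-op
    conn.executescript("""
        BEGIN;
        ALTER TABLE files RENAME COLUMN path TO full_path;
        PRAGMA user_version = 3;
        COMMIT;
    """)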


@@ -701,3 +701,72 @@ class TestHybridSearchFullCoverage:
store.close()
if db_path.exists():
db_path.unlink()
class TestHybridSearchWithVectorMock:
"""Tests for hybrid search with mocked vector search."""
@pytest.fixture
def mock_vector_db(self):
"""Create database with vector search mocked."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = Path(f.name)
store = DirIndexStore(db_path)
store.initialize()
# Index sample files
files = {
"auth/login.py": "def login_user(username, password): authenticate()",
"auth/logout.py": "def logout_user(session): cleanup_session()",
"user/profile.py": "class UserProfile: def get_data(): pass"
}
with store._get_connection() as conn:
for path, content in files.items():
name = path.split('/')[-1]
conn.execute(
"""INSERT INTO files (name, full_path, content, language, mtime)
VALUES (?, ?, ?, ?, ?)""",
(name, path, content, "python", 0.0)
)
conn.commit()
yield db_path
store.close()
if db_path.exists():
db_path.unlink()
def test_hybrid_with_vector_enabled(self, mock_vector_db):
"""Test hybrid search with vector search enabled (mocked)."""
        from unittest.mock import patch
# Mock the vector search to return fake results
mock_vector_results = [
SearchResult(path="auth/login.py", score=0.95, content_snippet="login"),
SearchResult(path="user/profile.py", score=0.75, content_snippet="profile")
]
engine = HybridSearchEngine()
# Mock vector search method if it exists
        # Patch whichever vector hook the engine exposes: the private
        # _search_vector method if present, otherwise the module-level function.
        if hasattr(engine, '_search_vector'):
            vector_patch = patch.object(engine, '_search_vector', return_value=mock_vector_results)
        else:
            vector_patch = patch('codexlens.search.hybrid_search.vector_search', return_value=mock_vector_results)
        with vector_patch:
results = engine.search(
mock_vector_db,
"login",
limit=10,
enable_fuzzy=True,
enable_vector=True # ENABLE vector search
)
# Should get results from RRF fusion of exact + fuzzy + vector
assert isinstance(results, list)
assert len(results) > 0, "Hybrid search with vector should return results"
# Results should have fusion scores
for result in results:
assert hasattr(result, 'score')
assert result.score > 0 # RRF fusion scores are positive
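# The positive-score assertion holds because Reciprocal Rank Fusion adds
# 1/(k + rank) per source list. A minimal sketch of RRF (k=60 and equal
# weights are assumptions; the engine's actual constants may differ):
from collections import defaultdict

def _rrf_fuse(rankings, k=60):
    scores = defaultdict(float)
    for ranked_paths in rankings:
        for rank, path in enumerate(ranked_paths, start=1):
            scores[path] += 1.0 / (k + rank)  # Each contribution is positive
    return dict(scores)

# Fusing exact FTS, fuzzy FTS, and (mocked) vector rankings by path:
_fused = _rrf_fuse([
    ["auth/login.py", "user/profile.py"],
    ["auth/login.py", "auth/logout.py"],
    ["auth/login.py", "user/profile.py"],
])
# auth/login.py ranks first: it holds rank 1 in all three lists.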


@@ -0,0 +1,324 @@
"""Tests for pure vector search functionality."""
import pytest
import sqlite3
import tempfile
from pathlib import Path
from codexlens.search.hybrid_search import HybridSearchEngine
from codexlens.storage.dir_index import DirIndexStore
# Check if semantic dependencies are available
try:
from codexlens.semantic import SEMANTIC_AVAILABLE
SEMANTIC_DEPS_AVAILABLE = SEMANTIC_AVAILABLE
except ImportError:
SEMANTIC_DEPS_AVAILABLE = False
class TestPureVectorSearch:
"""Tests for pure vector search mode."""
@pytest.fixture
def sample_db(self):
"""Create sample database with files."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = Path(f.name)
store = DirIndexStore(db_path)
store.initialize()
# Add sample files
files = {
"auth.py": "def authenticate_user(username, password): pass",
"login.py": "def login_handler(credentials): pass",
"user.py": "class User: pass",
}
with store._get_connection() as conn:
for path, content in files.items():
conn.execute(
"""INSERT INTO files (name, full_path, content, language, mtime)
VALUES (?, ?, ?, ?, ?)""",
(path, path, content, "python", 0.0)
)
conn.commit()
yield db_path
store.close()
if db_path.exists():
db_path.unlink()
def test_pure_vector_without_embeddings(self, sample_db):
"""Test pure_vector mode returns empty when no embeddings exist."""
engine = HybridSearchEngine()
results = engine.search(
sample_db,
"authentication",
limit=10,
enable_vector=True,
pure_vector=True,
)
# Should return empty list because no embeddings exist
assert isinstance(results, list)
assert len(results) == 0, \
"Pure vector search should return empty when no embeddings exist"
def test_vector_with_fallback(self, sample_db):
"""Test vector mode (with fallback) returns FTS results when no embeddings."""
engine = HybridSearchEngine()
results = engine.search(
sample_db,
"authenticate",
limit=10,
enable_vector=True,
pure_vector=False, # Allow FTS fallback
)
# Should return FTS results even without embeddings
assert isinstance(results, list)
assert len(results) > 0, \
"Vector mode with fallback should return FTS results"
# Verify results come from exact FTS
paths = [r.path for r in results]
assert "auth.py" in paths, "Should find auth.py via FTS"
def test_pure_vector_invalid_config(self, sample_db):
"""Test pure_vector=True but enable_vector=False logs warning."""
engine = HybridSearchEngine()
# Invalid: pure_vector=True but enable_vector=False
results = engine.search(
sample_db,
"test",
limit=10,
enable_vector=False,
pure_vector=True,
)
# Should fallback to exact search
assert isinstance(results, list)
def test_hybrid_mode_ignores_pure_vector(self, sample_db):
"""Test hybrid mode works normally (ignores pure_vector)."""
engine = HybridSearchEngine()
results = engine.search(
sample_db,
"authenticate",
limit=10,
enable_fuzzy=True,
enable_vector=False,
pure_vector=False, # Should be ignored in hybrid
)
# Should return results from exact + fuzzy
assert isinstance(results, list)
assert len(results) > 0
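# The four tests above pin down the fallback contract. Summarised as a tiny
# dispatch table (illustrative; the engine's internal control flow may differ):
def _resolve_search_mode(enable_vector, pure_vector, has_embeddings):
    if pure_vector and not enable_vector:
        return "exact"          # Invalid combination falls back to exact FTS
    if enable_vector and pure_vector:
        return "vector" if has_embeddings else "empty"  # No FTS fallback
    if enable_vector:
        return "vector" if has_embeddings else "fts"    # Fallback allowed
    return "fts"                # Plain exact/fuzzy search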
@pytest.mark.skipif(not SEMANTIC_DEPS_AVAILABLE, reason="Semantic dependencies not available")
class TestPureVectorWithEmbeddings:
"""Tests for pure vector search with actual embeddings."""
@pytest.fixture
def db_with_embeddings(self):
"""Create database with embeddings."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = Path(f.name)
store = DirIndexStore(db_path)
store.initialize()
# Add sample files
files = {
"auth/authentication.py": """
def authenticate_user(username: str, password: str) -> bool:
'''Verify user credentials against database.'''
return check_password(username, password)
def check_password(user: str, pwd: str) -> bool:
'''Check if password matches stored hash.'''
return True
""",
"auth/login.py": """
def login_handler(credentials: dict) -> bool:
'''Handle user login request.'''
username = credentials.get('username')
password = credentials.get('password')
return authenticate_user(username, password)
""",
}
with store._get_connection() as conn:
for path, content in files.items():
name = path.split('/')[-1]
conn.execute(
"""INSERT INTO files (name, full_path, content, language, mtime)
VALUES (?, ?, ?, ?, ?)""",
(name, path, content, "python", 0.0)
)
conn.commit()
# Generate embeddings
try:
from codexlens.semantic.embedder import Embedder
from codexlens.semantic.vector_store import VectorStore
from codexlens.semantic.chunker import Chunker, ChunkConfig
embedder = Embedder(profile="fast") # Use fast model for testing
vector_store = VectorStore(db_path)
chunker = Chunker(config=ChunkConfig(max_chunk_size=1000))
with sqlite3.connect(db_path) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute("SELECT full_path, content FROM files").fetchall()
for row in rows:
chunks = chunker.chunk_sliding_window(
row["content"],
file_path=row["full_path"],
language="python"
)
for chunk in chunks:
chunk.embedding = embedder.embed_single(chunk.content)
if chunks:
vector_store.add_chunks(chunks, row["full_path"])
except Exception as exc:
pytest.skip(f"Failed to generate embeddings: {exc}")
yield db_path
store.close()
if db_path.exists():
db_path.unlink()
def test_pure_vector_with_embeddings(self, db_with_embeddings):
"""Test pure vector search returns results when embeddings exist."""
engine = HybridSearchEngine()
results = engine.search(
db_with_embeddings,
"how to verify user credentials", # Natural language query
limit=10,
enable_vector=True,
pure_vector=True,
)
# Should return results from vector search only
assert isinstance(results, list)
assert len(results) > 0, "Pure vector search should return results"
# Results should have semantic relevance
for result in results:
assert result.score > 0
assert result.path is not None
def test_compare_pure_vs_hybrid(self, db_with_embeddings):
"""Compare pure vector vs hybrid search results."""
engine = HybridSearchEngine()
# Pure vector search
pure_results = engine.search(
db_with_embeddings,
"verify credentials",
limit=10,
enable_vector=True,
pure_vector=True,
)
# Hybrid search
hybrid_results = engine.search(
db_with_embeddings,
"verify credentials",
limit=10,
enable_fuzzy=True,
enable_vector=True,
pure_vector=False,
)
# Both should return results
assert len(pure_results) > 0, "Pure vector should find results"
assert len(hybrid_results) > 0, "Hybrid should find results"
# Hybrid may have more results (FTS + vector)
# But pure should still be useful for semantic queries
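# Pure vector mode ranks purely by embedding similarity. A brute-force
# cosine-similarity sketch of the underlying computation (assumes numpy and
# dense float vectors; the real VectorStore may use another layout or index):
import numpy as np

def _cosine_top_k(query_vec, chunk_vecs, k=10):
    q = query_vec / np.linalg.norm(query_vec)
    m = chunk_vecs / np.linalg.norm(chunk_vecs, axis=1, keepdims=True)
    sims = m @ q                   # Cosine similarity per stored chunk
    top = np.argsort(-sims)[:k]    # Indices of the k most similar chunks
    return top, sims[top]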
class TestSearchModeComparison:
"""Compare different search modes."""
@pytest.fixture
def comparison_db(self):
"""Create database for mode comparison."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = Path(f.name)
store = DirIndexStore(db_path)
store.initialize()
files = {
"auth.py": "def authenticate(): pass",
"login.py": "def login(): pass",
}
with store._get_connection() as conn:
for path, content in files.items():
conn.execute(
"""INSERT INTO files (name, full_path, content, language, mtime)
VALUES (?, ?, ?, ?, ?)""",
(path, path, content, "python", 0.0)
)
conn.commit()
yield db_path
store.close()
if db_path.exists():
db_path.unlink()
def test_mode_comparison_without_embeddings(self, comparison_db):
"""Compare all search modes without embeddings."""
engine = HybridSearchEngine()
query = "authenticate"
# Test each mode
modes = [
("exact", False, False, False),
("fuzzy", True, False, False),
("vector", False, True, False), # With fallback
("pure_vector", False, True, True), # No fallback
]
results = {}
for mode_name, fuzzy, vector, pure in modes:
result = engine.search(
comparison_db,
query,
limit=10,
enable_fuzzy=fuzzy,
enable_vector=vector,
pure_vector=pure,
)
results[mode_name] = len(result)
# Assertions
assert results["exact"] > 0, "Exact should find results"
assert results["fuzzy"] >= results["exact"], "Fuzzy should find at least as many"
assert results["vector"] > 0, "Vector with fallback should find results (from FTS)"
assert results["pure_vector"] == 0, "Pure vector should return empty (no embeddings)"
# Log comparison
print("\nMode comparison (without embeddings):")
for mode, count in results.items():
print(f" {mode}: {count} results")
if __name__ == "__main__":
pytest.main([__file__, "-v", "-s"])


@@ -424,3 +424,62 @@ class TestMinTokenLength:
# Should include "a" and "B"
assert "a" in result or "aB" in result
assert "B" in result or "aB" in result
class TestComplexBooleanQueries:
"""Tests for complex boolean query parsing."""
@pytest.fixture
def parser(self):
return QueryParser()
def test_nested_boolean_and_or(self, parser):
"""Test parser preserves nested boolean logic: (A OR B) AND C."""
query = "(login OR logout) AND user"
expanded = parser.preprocess_query(query)
# Should preserve parentheses and boolean operators
assert "(" in expanded
assert ")" in expanded
assert "AND" in expanded
assert "OR" in expanded
def test_mixed_operators_with_expansion(self, parser):
"""Test CamelCase expansion doesn't break boolean operators."""
query = "UserAuth AND (login OR logout)"
expanded = parser.preprocess_query(query)
# Should expand UserAuth but preserve operators
assert "User" in expanded or "Auth" in expanded
assert "AND" in expanded
assert "OR" in expanded
assert "(" in expanded
def test_quoted_phrases_with_boolean(self, parser):
"""Test quoted phrases preserved with boolean operators."""
query = '"user authentication" AND login'
expanded = parser.preprocess_query(query)
# Quoted phrase should remain intact
assert '"user authentication"' in expanded or '"' in expanded
assert "AND" in expanded
def test_not_operator_preservation(self, parser):
"""Test NOT operator is preserved correctly."""
query = "login NOT logout"
expanded = parser.preprocess_query(query)
assert "NOT" in expanded
assert "login" in expanded
assert "logout" in expanded
def test_complex_nested_three_levels(self, parser):
"""Test deeply nested boolean logic: ((A OR B) AND C) OR D."""
query = "((UserAuth OR login) AND session) OR token"
expanded = parser.preprocess_query(query)
# Should handle multiple nesting levels
assert expanded.count("(") >= 2 # At least 2 opening parens
assert expanded.count(")") >= 2 # At least 2 closing parens


@@ -0,0 +1,306 @@
"""
Test migration 005: Schema cleanup for unused/redundant fields.
Tests that migration 005 successfully removes:
1. semantic_metadata.keywords (replaced by file_keywords)
2. symbols.token_count (unused)
3. symbols.symbol_type (redundant with kind)
4. subdirs.direct_files (unused)
"""
import sqlite3
import tempfile
from pathlib import Path
import pytest
from codexlens.storage.dir_index import DirIndexStore
from codexlens.entities import Symbol
class TestSchemaCleanupMigration:
"""Test schema cleanup migration (v4 -> v5)."""
def test_migration_from_v4_to_v5(self):
"""Test that migration successfully removes deprecated fields."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "_index.db"
store = DirIndexStore(db_path)
# Create v4 schema manually (with deprecated fields)
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Set schema version to 4
cursor.execute("PRAGMA user_version = 4")
# Create v4 schema with deprecated fields
cursor.execute("""
CREATE TABLE files (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
full_path TEXT UNIQUE NOT NULL,
language TEXT,
content TEXT,
mtime REAL,
line_count INTEGER
)
""")
cursor.execute("""
CREATE TABLE subdirs (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
index_path TEXT NOT NULL,
files_count INTEGER DEFAULT 0,
direct_files INTEGER DEFAULT 0,
last_updated REAL
)
""")
cursor.execute("""
CREATE TABLE symbols (
id INTEGER PRIMARY KEY,
file_id INTEGER REFERENCES files(id) ON DELETE CASCADE,
name TEXT NOT NULL,
kind TEXT NOT NULL,
start_line INTEGER,
end_line INTEGER,
token_count INTEGER,
symbol_type TEXT
)
""")
cursor.execute("""
CREATE TABLE semantic_metadata (
id INTEGER PRIMARY KEY,
file_id INTEGER UNIQUE REFERENCES files(id) ON DELETE CASCADE,
summary TEXT,
keywords TEXT,
purpose TEXT,
llm_tool TEXT,
generated_at REAL
)
""")
cursor.execute("""
CREATE TABLE keywords (
id INTEGER PRIMARY KEY,
keyword TEXT NOT NULL UNIQUE
)
""")
cursor.execute("""
CREATE TABLE file_keywords (
file_id INTEGER NOT NULL,
keyword_id INTEGER NOT NULL,
PRIMARY KEY (file_id, keyword_id),
FOREIGN KEY (file_id) REFERENCES files (id) ON DELETE CASCADE,
FOREIGN KEY (keyword_id) REFERENCES keywords (id) ON DELETE CASCADE
)
""")
# Insert test data
cursor.execute(
"INSERT INTO files (name, full_path, language, content, mtime, line_count) VALUES (?, ?, ?, ?, ?, ?)",
("test.py", "/test/test.py", "python", "def test(): pass", 1234567890.0, 1)
)
file_id = cursor.lastrowid
cursor.execute(
"INSERT INTO symbols (file_id, name, kind, start_line, end_line, token_count, symbol_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
(file_id, "test", "function", 1, 1, 10, "function")
)
cursor.execute(
"INSERT INTO semantic_metadata (file_id, summary, keywords, purpose, llm_tool, generated_at) VALUES (?, ?, ?, ?, ?, ?)",
(file_id, "Test function", '["test", "example"]', "Testing", "gemini", 1234567890.0)
)
cursor.execute(
"INSERT INTO subdirs (name, index_path, files_count, direct_files, last_updated) VALUES (?, ?, ?, ?, ?)",
("subdir", "/test/subdir/_index.db", 5, 2, 1234567890.0)
)
conn.commit()
conn.close()
# Now initialize store - this should trigger migration
store.initialize()
# Verify schema version is now 5
conn = store._get_connection()
version_row = conn.execute("PRAGMA user_version").fetchone()
assert version_row[0] == 5, f"Expected schema version 5, got {version_row[0]}"
# Check that deprecated columns are removed
# 1. Check semantic_metadata doesn't have keywords column
cursor = conn.execute("PRAGMA table_info(semantic_metadata)")
columns = {row[1] for row in cursor.fetchall()}
assert "keywords" not in columns, "semantic_metadata.keywords should be removed"
assert "summary" in columns, "semantic_metadata.summary should exist"
assert "purpose" in columns, "semantic_metadata.purpose should exist"
# 2. Check symbols doesn't have token_count or symbol_type
cursor = conn.execute("PRAGMA table_info(symbols)")
columns = {row[1] for row in cursor.fetchall()}
assert "token_count" not in columns, "symbols.token_count should be removed"
assert "symbol_type" not in columns, "symbols.symbol_type should be removed"
assert "kind" in columns, "symbols.kind should exist"
# 3. Check subdirs doesn't have direct_files
cursor = conn.execute("PRAGMA table_info(subdirs)")
columns = {row[1] for row in cursor.fetchall()}
assert "direct_files" not in columns, "subdirs.direct_files should be removed"
assert "files_count" in columns, "subdirs.files_count should exist"
# 4. Verify data integrity - data should be preserved
semantic = store.get_semantic_metadata(file_id)
assert semantic is not None, "Semantic metadata should be preserved"
assert semantic["summary"] == "Test function"
assert semantic["purpose"] == "Testing"
# Keywords should now come from file_keywords table (empty after migration since we didn't populate it)
assert isinstance(semantic["keywords"], list)
store.close()
def test_new_database_has_clean_schema(self):
"""Test that new databases are created with clean schema (v5)."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "_index.db"
store = DirIndexStore(db_path)
store.initialize()
conn = store._get_connection()
# Verify schema version is 5
version_row = conn.execute("PRAGMA user_version").fetchone()
assert version_row[0] == 5
# Check that new schema doesn't have deprecated columns
cursor = conn.execute("PRAGMA table_info(semantic_metadata)")
columns = {row[1] for row in cursor.fetchall()}
assert "keywords" not in columns
cursor = conn.execute("PRAGMA table_info(symbols)")
columns = {row[1] for row in cursor.fetchall()}
assert "token_count" not in columns
assert "symbol_type" not in columns
cursor = conn.execute("PRAGMA table_info(subdirs)")
columns = {row[1] for row in cursor.fetchall()}
assert "direct_files" not in columns
store.close()
def test_semantic_metadata_keywords_from_normalized_table(self):
"""Test that keywords are read from file_keywords table, not JSON column."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "_index.db"
store = DirIndexStore(db_path)
store.initialize()
# Add a file
file_id = store.add_file(
name="test.py",
full_path="/test/test.py",
content="def test(): pass",
language="python",
symbols=[]
)
# Add semantic metadata with keywords
store.add_semantic_metadata(
file_id=file_id,
summary="Test function",
keywords=["test", "example", "function"],
purpose="Testing",
llm_tool="gemini"
)
# Retrieve and verify keywords come from normalized table
semantic = store.get_semantic_metadata(file_id)
assert semantic is not None
assert sorted(semantic["keywords"]) == ["example", "function", "test"]
# Verify keywords are in normalized tables
conn = store._get_connection()
keyword_count = conn.execute(
"""SELECT COUNT(*) FROM file_keywords WHERE file_id = ?""",
(file_id,)
).fetchone()[0]
assert keyword_count == 3
store.close()
def test_symbols_insert_without_deprecated_fields(self):
"""Test that symbols can be inserted without token_count and symbol_type."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "_index.db"
store = DirIndexStore(db_path)
store.initialize()
# Add file with symbols
symbols = [
Symbol(name="test_func", kind="function", range=(1, 5)),
Symbol(name="TestClass", kind="class", range=(7, 20)),
]
file_id = store.add_file(
name="test.py",
full_path="/test/test.py",
content="def test_func(): pass\n\nclass TestClass:\n pass",
language="python",
symbols=symbols
)
# Verify symbols were inserted
conn = store._get_connection()
symbol_rows = conn.execute(
"SELECT name, kind, start_line, end_line FROM symbols WHERE file_id = ?",
(file_id,)
).fetchall()
assert len(symbol_rows) == 2
assert symbol_rows[0]["name"] == "test_func"
assert symbol_rows[0]["kind"] == "function"
assert symbol_rows[1]["name"] == "TestClass"
assert symbol_rows[1]["kind"] == "class"
store.close()
def test_subdir_operations_without_direct_files(self):
"""Test that subdir operations work without direct_files field."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "_index.db"
store = DirIndexStore(db_path)
store.initialize()
# Register subdir (direct_files parameter is ignored)
store.register_subdir(
name="subdir",
index_path="/test/subdir/_index.db",
files_count=10,
direct_files=5 # This should be ignored
)
# Retrieve and verify
subdir = store.get_subdir("subdir")
assert subdir is not None
assert subdir.name == "subdir"
assert subdir.files_count == 10
assert not hasattr(subdir, "direct_files") # Should not have this attribute
# Update stats (direct_files parameter is ignored)
store.update_subdir_stats("subdir", files_count=15, direct_files=7)
# Verify update
subdir = store.get_subdir("subdir")
assert subdir.files_count == 15
store.close()
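# Migration 005 drops columns. SQLite gained DROP COLUMN only in 3.35, so
# the portable approach is the classic table rebuild; a minimal sketch for
# the symbols table (assumed approach, not necessarily the store's exact SQL):
def _drop_deprecated_symbol_columns(conn):
    conn.executescript("""
        BEGIN;
        CREATE TABLE symbols_new (
            id INTEGER PRIMARY KEY,
            file_id INTEGER REFERENCES files(id) ON DELETE CASCADE,
            name TEXT NOT NULL,
            kind TEXT NOT NULL,
            start_line INTEGER,
            end_line INTEGER
        );
        INSERT INTO symbols_new
            SELECT id, file_id, name, kind, start_line, end_line FROM symbols;
        DROP TABLE symbols;
        ALTER TABLE symbols_new RENAME TO symbols;
        PRAGMA user_version = 5;
        COMMIT;
    """)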
if __name__ == "__main__":
pytest.main([__file__, "-v"])


@@ -0,0 +1,529 @@
"""Comprehensive comparison test for vector search vs hybrid search.
This test diagnoses why vector search returns empty results and compares
performance between different search modes.
"""
import json
import sqlite3
import tempfile
import time
from pathlib import Path
from typing import Dict, List, Any
import pytest
from codexlens.entities import SearchResult
from codexlens.search.hybrid_search import HybridSearchEngine
from codexlens.storage.dir_index import DirIndexStore
# Check semantic search availability
try:
from codexlens.semantic.embedder import Embedder
from codexlens.semantic.vector_store import VectorStore
from codexlens.semantic import SEMANTIC_AVAILABLE
SEMANTIC_DEPS_AVAILABLE = SEMANTIC_AVAILABLE
except ImportError:
SEMANTIC_DEPS_AVAILABLE = False
class TestSearchComparison:
"""Comprehensive comparison of search modes."""
@pytest.fixture
def sample_project_db(self):
"""Create sample project database with semantic chunks."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = Path(f.name)
store = DirIndexStore(db_path)
store.initialize()
# Sample files with varied content for testing
sample_files = {
"src/auth/authentication.py": """
def authenticate_user(username: str, password: str) -> bool:
'''Authenticate user with credentials using bcrypt hashing.
This function validates user credentials against the database
and returns True if authentication succeeds.
'''
hashed = hash_password(password)
return verify_credentials(username, hashed)
def hash_password(password: str) -> str:
'''Hash password using bcrypt algorithm.'''
import bcrypt
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def verify_credentials(user: str, pwd_hash: str) -> bool:
'''Verify user credentials against database.'''
# Database verification logic
return True
""",
"src/auth/authorization.py": """
def authorize_action(user_id: int, resource: str, action: str) -> bool:
'''Authorize user action on resource using role-based access control.
Checks if user has permission to perform action on resource
based on their assigned roles.
'''
roles = get_user_roles(user_id)
permissions = get_role_permissions(roles)
return has_permission(permissions, resource, action)
def get_user_roles(user_id: int) -> List[str]:
'''Fetch user roles from database.'''
return ["user", "admin"]
def has_permission(permissions, resource, action) -> bool:
'''Check if permissions allow action on resource.'''
return True
""",
"src/models/user.py": """
from dataclasses import dataclass
from typing import Optional
@dataclass
class User:
'''User model representing application users.
Stores user profile information and authentication state.
'''
id: int
username: str
email: str
password_hash: str
is_active: bool = True
def authenticate(self, password: str) -> bool:
'''Authenticate this user with password.'''
from auth.authentication import verify_credentials
return verify_credentials(self.username, password)
def has_role(self, role: str) -> bool:
'''Check if user has specific role.'''
return True
""",
"src/api/user_api.py": """
from flask import Flask, request, jsonify
from models.user import User
app = Flask(__name__)
@app.route('/api/user/<int:user_id>', methods=['GET'])
def get_user(user_id: int):
'''Get user by ID from database.
Returns user profile information as JSON.
'''
user = User.query.get(user_id)
return jsonify(user.to_dict())
@app.route('/api/user/login', methods=['POST'])
def login():
'''User login endpoint using username and password.
Authenticates user and returns session token.
'''
data = request.json
username = data.get('username')
password = data.get('password')
if authenticate_user(username, password):
token = generate_session_token(username)
return jsonify({'token': token})
return jsonify({'error': 'Invalid credentials'}), 401
""",
"tests/test_auth.py": """
import pytest
from auth.authentication import authenticate_user, hash_password
class TestAuthentication:
'''Test authentication functionality.'''
def test_authenticate_valid_user(self):
'''Test authentication with valid credentials.'''
assert authenticate_user("testuser", "password123") == True
def test_authenticate_invalid_user(self):
'''Test authentication with invalid credentials.'''
assert authenticate_user("invalid", "wrong") == False
def test_password_hashing(self):
'''Test password hashing produces unique hashes.'''
hash1 = hash_password("password")
hash2 = hash_password("password")
assert hash1 != hash2 # Salts should differ
""",
}
# Insert files into database
with store._get_connection() as conn:
for file_path, content in sample_files.items():
name = file_path.split('/')[-1]
lang = "python"
conn.execute(
"""INSERT INTO files (name, full_path, content, language, mtime)
VALUES (?, ?, ?, ?, ?)""",
(name, file_path, content, lang, time.time())
)
conn.commit()
yield db_path
store.close()
if db_path.exists():
db_path.unlink()
def _check_semantic_chunks_table(self, db_path: Path) -> Dict[str, Any]:
"""Check if semantic_chunks table exists and has data."""
with sqlite3.connect(db_path) as conn:
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='semantic_chunks'"
)
table_exists = cursor.fetchone() is not None
chunk_count = 0
if table_exists:
cursor = conn.execute("SELECT COUNT(*) FROM semantic_chunks")
chunk_count = cursor.fetchone()[0]
return {
"table_exists": table_exists,
"chunk_count": chunk_count,
}
def _create_vector_index(self, db_path: Path) -> Dict[str, Any]:
"""Create vector embeddings for indexed files."""
if not SEMANTIC_DEPS_AVAILABLE:
return {
"success": False,
"error": "Semantic dependencies not available",
"chunks_created": 0,
}
try:
from codexlens.semantic.chunker import Chunker, ChunkConfig
# Initialize embedder and vector store
embedder = Embedder(profile="code")
vector_store = VectorStore(db_path)
chunker = Chunker(config=ChunkConfig(max_chunk_size=2000))
# Read files from database
with sqlite3.connect(db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("SELECT full_path, content FROM files")
files = cursor.fetchall()
chunks_created = 0
for file_row in files:
file_path = file_row["full_path"]
content = file_row["content"]
# Create semantic chunks using sliding window
chunks = chunker.chunk_sliding_window(
content,
file_path=file_path,
language="python"
)
# Generate embeddings
for chunk in chunks:
embedding = embedder.embed_single(chunk.content)
chunk.embedding = embedding
# Store chunks
if chunks: # Only store if we have chunks
vector_store.add_chunks(chunks, file_path)
chunks_created += len(chunks)
return {
"success": True,
"chunks_created": chunks_created,
"files_processed": len(files),
}
except Exception as exc:
return {
"success": False,
"error": str(exc),
"chunks_created": 0,
}
def _run_search_mode(
self,
db_path: Path,
query: str,
mode: str,
limit: int = 10,
) -> Dict[str, Any]:
"""Run search in specified mode and collect metrics."""
engine = HybridSearchEngine()
# Map mode to parameters
if mode == "exact":
enable_fuzzy, enable_vector = False, False
elif mode == "fuzzy":
enable_fuzzy, enable_vector = True, False
elif mode == "vector":
enable_fuzzy, enable_vector = False, True
elif mode == "hybrid":
enable_fuzzy, enable_vector = True, True
else:
raise ValueError(f"Invalid mode: {mode}")
# Measure search time
start_time = time.time()
try:
results = engine.search(
db_path,
query,
limit=limit,
enable_fuzzy=enable_fuzzy,
enable_vector=enable_vector,
)
elapsed_ms = (time.time() - start_time) * 1000
return {
"success": True,
"mode": mode,
"query": query,
"result_count": len(results),
"elapsed_ms": elapsed_ms,
"results": [
{
"path": r.path,
"score": r.score,
"excerpt": r.excerpt[:100] if r.excerpt else "",
"source": getattr(r, "search_source", None),
}
for r in results[:5] # Top 5 results
],
}
except Exception as exc:
elapsed_ms = (time.time() - start_time) * 1000
return {
"success": False,
"mode": mode,
"query": query,
"error": str(exc),
"elapsed_ms": elapsed_ms,
"result_count": 0,
}
@pytest.mark.skipif(not SEMANTIC_DEPS_AVAILABLE, reason="Semantic dependencies not available")
def test_full_search_comparison_with_vectors(self, sample_project_db):
"""Complete search comparison test with vector embeddings."""
db_path = sample_project_db
# Step 1: Check initial state
print("\n=== Step 1: Checking initial database state ===")
initial_state = self._check_semantic_chunks_table(db_path)
print(f"Table exists: {initial_state['table_exists']}")
print(f"Chunk count: {initial_state['chunk_count']}")
# Step 2: Create vector index
print("\n=== Step 2: Creating vector embeddings ===")
vector_result = self._create_vector_index(db_path)
print(f"Success: {vector_result['success']}")
if vector_result['success']:
print(f"Chunks created: {vector_result['chunks_created']}")
print(f"Files processed: {vector_result['files_processed']}")
else:
print(f"Error: {vector_result.get('error', 'Unknown')}")
# Step 3: Verify vector index was created
print("\n=== Step 3: Verifying vector index ===")
final_state = self._check_semantic_chunks_table(db_path)
print(f"Table exists: {final_state['table_exists']}")
print(f"Chunk count: {final_state['chunk_count']}")
# Step 4: Run comparison tests
print("\n=== Step 4: Running search mode comparison ===")
test_queries = [
"authenticate user credentials", # Semantic query
"authentication", # Keyword query
"password hashing bcrypt", # Multi-term query
]
comparison_results = []
for query in test_queries:
print(f"\n--- Query: '{query}' ---")
for mode in ["exact", "fuzzy", "vector", "hybrid"]:
result = self._run_search_mode(db_path, query, mode, limit=10)
comparison_results.append(result)
print(f"\n{mode.upper()} mode:")
print(f" Success: {result['success']}")
print(f" Results: {result['result_count']}")
print(f" Time: {result['elapsed_ms']:.2f}ms")
if result['success'] and result['result_count'] > 0:
print(f" Top result: {result['results'][0]['path']}")
print(f" Score: {result['results'][0]['score']:.3f}")
print(f" Source: {result['results'][0]['source']}")
elif not result['success']:
print(f" Error: {result.get('error', 'Unknown')}")
# Step 5: Generate comparison report
print("\n=== Step 5: Comparison Summary ===")
# Group by mode
mode_stats = {}
for result in comparison_results:
mode = result['mode']
if mode not in mode_stats:
mode_stats[mode] = {
"total_searches": 0,
"successful_searches": 0,
"total_results": 0,
"total_time_ms": 0,
"empty_results": 0,
}
stats = mode_stats[mode]
stats["total_searches"] += 1
if result['success']:
stats["successful_searches"] += 1
stats["total_results"] += result['result_count']
if result['result_count'] == 0:
stats["empty_results"] += 1
stats["total_time_ms"] += result['elapsed_ms']
# Print summary table
print("\nMode | Queries | Success | Avg Results | Avg Time | Empty Results")
print("-" * 75)
for mode in ["exact", "fuzzy", "vector", "hybrid"]:
if mode in mode_stats:
stats = mode_stats[mode]
avg_results = stats["total_results"] / stats["total_searches"]
avg_time = stats["total_time_ms"] / stats["total_searches"]
print(
f"{mode:9} | {stats['total_searches']:7} | "
f"{stats['successful_searches']:7} | {avg_results:11.1f} | "
f"{avg_time:8.1f}ms | {stats['empty_results']:13}"
)
# Assertions
assert initial_state is not None
if vector_result['success']:
assert final_state['chunk_count'] > 0, "Vector index should contain chunks"
# Find vector search results
vector_results = [r for r in comparison_results if r['mode'] == 'vector']
if vector_results:
# At least one vector search should return results if index was created
has_vector_results = any(r.get('result_count', 0) > 0 for r in vector_results)
if not has_vector_results:
print("\n⚠️ WARNING: Vector index created but vector search returned no results!")
print("This indicates a potential issue with vector search implementation.")
def test_search_comparison_without_vectors(self, sample_project_db):
"""Search comparison test without vector embeddings (baseline)."""
db_path = sample_project_db
print("\n=== Testing search without vector embeddings ===")
# Check state
state = self._check_semantic_chunks_table(db_path)
print(f"Semantic chunks table exists: {state['table_exists']}")
print(f"Chunk count: {state['chunk_count']}")
# Run exact and fuzzy searches only
test_queries = ["authentication", "user password", "bcrypt hash"]
for query in test_queries:
print(f"\n--- Query: '{query}' ---")
for mode in ["exact", "fuzzy"]:
result = self._run_search_mode(db_path, query, mode, limit=10)
print(f"{mode.upper()}: {result['result_count']} results in {result['elapsed_ms']:.2f}ms")
if result['success'] and result['result_count'] > 0:
print(f" Top: {result['results'][0]['path']} (score: {result['results'][0]['score']:.3f})")
# Test vector search without embeddings (should return empty)
print(f"\n--- Testing vector search without embeddings ---")
vector_result = self._run_search_mode(db_path, "authentication", "vector", limit=10)
print(f"Vector search result count: {vector_result['result_count']}")
print(f"This is expected to be 0 without embeddings: {vector_result['result_count'] == 0}")
assert vector_result['result_count'] == 0, \
"Vector search should return empty results when no embeddings exist"
class TestDiagnostics:
"""Diagnostic tests to identify specific issues."""
@pytest.fixture
def empty_db(self):
"""Create empty database."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = Path(f.name)
store = DirIndexStore(db_path)
store.initialize()
store.close()
yield db_path
if db_path.exists():
db_path.unlink()
def test_diagnose_empty_database(self, empty_db):
"""Diagnose behavior with empty database."""
engine = HybridSearchEngine()
print("\n=== Diagnosing empty database ===")
# Test all modes
for mode_config in [
("exact", False, False),
("fuzzy", True, False),
("vector", False, True),
("hybrid", True, True),
]:
mode, enable_fuzzy, enable_vector = mode_config
try:
results = engine.search(
empty_db,
"test",
limit=10,
enable_fuzzy=enable_fuzzy,
enable_vector=enable_vector,
)
print(f"{mode}: {len(results)} results (OK)")
assert isinstance(results, list)
assert len(results) == 0
except Exception as exc:
print(f"{mode}: ERROR - {exc}")
# Should not raise errors, should return empty list
pytest.fail(f"Search mode '{mode}' raised exception on empty database: {exc}")
@pytest.mark.skipif(not SEMANTIC_DEPS_AVAILABLE, reason="Semantic dependencies not available")
def test_diagnose_embedder_initialization(self):
"""Test embedder initialization and embedding generation."""
print("\n=== Diagnosing embedder ===")
try:
embedder = Embedder(profile="code")
print(f"✓ Embedder initialized (model: {embedder.model_name})")
print(f" Embedding dimension: {embedder.embedding_dim}")
# Test embedding generation
test_text = "def authenticate_user(username, password):"
embedding = embedder.embed_single(test_text)
print(f"✓ Generated embedding (length: {len(embedding)})")
print(f" Sample values: {embedding[:5]}")
assert len(embedding) == embedder.embedding_dim
assert all(isinstance(v, float) for v in embedding)
except Exception as exc:
print(f"✗ Embedder error: {exc}")
raise
if __name__ == "__main__":
# Run tests with pytest
pytest.main([__file__, "-v", "-s"])