Add comprehensive tests for vector/semantic search functionality

- Implement full coverage tests for Embedder model loading and embedding generation
- Add CRUD operations and caching tests for VectorStore
- Include cosine similarity computation tests
- Validate semantic search accuracy and relevance through various queries
- Establish performance benchmarks for embedding and search operations
- Ensure edge cases and error handling are covered
- Test thread safety and concurrent access scenarios
- Verify availability of semantic search dependencies
This commit is contained in:
catlog22
2025-12-14 17:17:09 +08:00
parent 8d542b8e45
commit 79a2953862
47 changed files with 11208 additions and 4336 deletions

View File

@@ -0,0 +1,83 @@
# Semantic Search Integration
## Overview
The ChainSearchEngine now supports semantic keyword search in addition to FTS5 full-text search.
## Usage
### Enable Semantic Search
```python
from pathlib import Path
from codexlens.search.chain_search import ChainSearchEngine, SearchOptions
from codexlens.storage.registry import RegistryStore
from codexlens.storage.path_mapper import PathMapper
# Initialize
registry = RegistryStore()
registry.initialize()
mapper = PathMapper()
engine = ChainSearchEngine(registry, mapper)
# Create options with semantic search enabled
options = SearchOptions(
include_semantic=True, # Enable semantic keyword search
total_limit=50
)
# Execute search
result = engine.search("authentication", Path("./src"), options)
# Results include both FTS and semantic matches
for r in result.results:
print(f"{r.path}: {r.score:.2f} - {r.excerpt}")
```
### How It Works
1. **FTS Search**: Traditional full-text search using SQLite FTS5
2. **Semantic Search**: Searches the `semantic_metadata.keywords` field
3. **Result Merging**: Semantic results are added with 0.8x weight
- FTS results: BM25 score from SQLite
- Semantic results: Base score of 10.0 * 0.8 = 8.0
4. **Deduplication**: `_merge_and_rank()` deduplicates by path, keeping highest score
### Result Format
- **FTS results**: Regular excerpt from matched content
- **Semantic results**: `Keywords: keyword1, keyword2, keyword3, ...`
### Prerequisites
Files must have semantic metadata generated via:
```bash
codex-lens enhance . --tool gemini
```
This uses CCW CLI to generate summaries, keywords, and purpose descriptions.
## Implementation Details
### Changes Made
1. **SearchOptions**: Added `include_semantic: bool = False` parameter
2. **_search_parallel()**: Passes `include_semantic` to worker threads
3. **_search_single_index()**:
- Accepts `include_semantic` parameter
- Calls `DirIndexStore.search_semantic_keywords()` when enabled
- Converts semantic matches to `SearchResult` objects
- Applies 0.8x weight to semantic scores
### Score Weighting
```python
# FTS result (from BM25)
SearchResult(path="...", score=12.5, excerpt="...")
# Semantic result (fixed weighted score)
SearchResult(path="...", score=8.0, excerpt="Keywords: ...")
```
The 0.8x weight ensures semantic matches rank slightly lower than direct FTS matches
but still appear in relevant results.

View File

@@ -0,0 +1,19 @@
Executing gemini (analysis mode)...
Loaded cached credentials.
[STARTUP] StartupProfiler.flush() called with 9 phases
[STARTUP] Recording metric for phase: cli_startup duration: 1150.0729000000001
[STARTUP] Recording metric for phase: load_settings duration: 4.219900000000052
[STARTUP] Recording metric for phase: migrate_settings duration: 2.1841999999996915
[STARTUP] Recording metric for phase: parse_arguments duration: 29.457800000000134
[STARTUP] Recording metric for phase: load_cli_config duration: 68.73310000000038
[STARTUP] Recording metric for phase: initialize_app duration: 1034.8242
[STARTUP] Recording metric for phase: authenticate duration: 1029.4676
[STARTUP] Recording metric for phase: discover_tools duration: 4.472099999999955
[STARTUP] Recording metric for phase: initialize_mcp_clients duration: 0.6972999999998137
Got it. I'm ready for your first command.
✓ Completed in 16.1s
ID: 1765691168543-gemini
Continue: ccw cli exec "..." --resume 1765691168543-gemini

View File

@@ -0,0 +1,22 @@
=== STDOUT ===
Executing gemini (analysis mode)...
Loaded cached credentials.
[STARTUP] StartupProfiler.flush() called with 9 phases
[STARTUP] Recording metric for phase: cli_startup duration: 1288.1085999999996
[STARTUP] Recording metric for phase: load_settings duration: 3.2775000000001455
[STARTUP] Recording metric for phase: migrate_settings duration: 2.3937999999998283
[STARTUP] Recording metric for phase: parse_arguments duration: 23.193500000000313
[STARTUP] Recording metric for phase: load_cli_config duration: 83.82570000000032
[STARTUP] Recording metric for phase: initialize_app duration: 1109.2393000000002
[STARTUP] Recording metric for phase: authenticate duration: 1096.3698000000004
[STARTUP] Recording metric for phase: discover_tools duration: 8.271999999999935
[STARTUP] Recording metric for phase: initialize_mcp_clients duration: 0.9225999999998749
Setup complete. I am ready for your first command.
✓ Completed in 19.6s
ID: 1765690404300-gemini
Continue: ccw cli exec "..." --resume 1765690404300-gemini
=== STDERR ===

View File

@@ -0,0 +1,25 @@
PURPOSE: Generate semantic summaries and search keywords for code files
TASK:
- For each code block, generate a concise summary (1-2 sentences)
- Extract 5-10 relevant search keywords
- Identify the functional purpose/category
MODE: analysis
EXPECTED: JSON format output
=== CODE BLOCKS ===
[FILE: auth.py]
```python
def auth(): pass
```
=== OUTPUT FORMAT ===
Return ONLY valid JSON (no markdown, no explanation):
{
"files": {
"<file_path>": {
"summary": "Brief description of what this code does",
"keywords": ["keyword1", "keyword2", ...],
"purpose": "category like: auth, api, util, ui, data, config, test"
}
}
}

View File

@@ -0,0 +1,19 @@
Executing gemini (analysis mode)...
Loaded cached credentials.
[STARTUP] StartupProfiler.flush() called with 9 phases
[STARTUP] Recording metric for phase: cli_startup duration: 1197.5227999999997
[STARTUP] Recording metric for phase: load_settings duration: 2.119999999999891
[STARTUP] Recording metric for phase: migrate_settings duration: 1.401600000000144
[STARTUP] Recording metric for phase: parse_arguments duration: 18.296000000000276
[STARTUP] Recording metric for phase: load_cli_config duration: 56.0604000000003
[STARTUP] Recording metric for phase: initialize_app duration: 1109.9696999999996
[STARTUP] Recording metric for phase: authenticate duration: 1104.0013
[STARTUP] Recording metric for phase: discover_tools duration: 3.9744999999993524
[STARTUP] Recording metric for phase: initialize_mcp_clients duration: 0.8747000000003027
Setup complete. I am ready for your first command.
✓ Completed in 16.0s
ID: 1765690668720-gemini
Continue: ccw cli exec "..." --resume 1765690668720-gemini

View File

@@ -1098,3 +1098,132 @@ def clean(
else:
console.print(f"[red]Clean failed (unexpected):[/red] {exc}")
raise typer.Exit(code=1)
@app.command("semantic-list")
def semantic_list(
path: Path = typer.Option(Path("."), "--path", "-p", help="Project path to list metadata from."),
offset: int = typer.Option(0, "--offset", "-o", min=0, help="Number of records to skip."),
limit: int = typer.Option(50, "--limit", "-n", min=1, max=100, help="Maximum records to return."),
tool_filter: Optional[str] = typer.Option(None, "--tool", "-t", help="Filter by LLM tool (gemini/qwen)."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""List semantic metadata entries for indexed files.
Shows files that have LLM-generated summaries and keywords.
Results are aggregated from all index databases in the project.
"""
_configure_logging(verbose)
base_path = path.expanduser().resolve()
registry: Optional[RegistryStore] = None
try:
registry = RegistryStore()
registry.initialize()
mapper = PathMapper()
project_info = registry.find_project(base_path)
if not project_info:
raise CodexLensError(f"No index found for: {base_path}. Run 'codex-lens init' first.")
index_dir = mapper.source_to_index_dir(base_path)
if not index_dir.exists():
raise CodexLensError(f"Index directory not found: {index_dir}")
all_results: list = []
total_count = 0
index_files = sorted(index_dir.rglob("_index.db"))
for db_path in index_files:
try:
store = DirIndexStore(db_path)
store.initialize()
results, count = store.list_semantic_metadata(
offset=0,
limit=1000,
llm_tool=tool_filter,
)
source_dir = mapper.index_to_source(db_path.parent)
for r in results:
r["source_dir"] = str(source_dir)
all_results.extend(results)
total_count += count
store.close()
except Exception as e:
if verbose:
console.print(f"[yellow]Warning: Error reading {db_path}: {e}[/yellow]")
all_results.sort(key=lambda x: x["generated_at"], reverse=True)
paginated = all_results[offset : offset + limit]
result = {
"path": str(base_path),
"total": total_count,
"offset": offset,
"limit": limit,
"count": len(paginated),
"entries": paginated,
}
if json_mode:
print_json(success=True, result=result)
else:
if not paginated:
console.print("[yellow]No semantic metadata found.[/yellow]")
console.print("Run 'codex-lens enhance' to generate metadata for indexed files.")
else:
table = Table(title=f"Semantic Metadata ({total_count} total)")
table.add_column("File", style="cyan", max_width=40)
table.add_column("Language", style="dim")
table.add_column("Purpose", max_width=30)
table.add_column("Keywords", max_width=25)
table.add_column("Tool")
for entry in paginated:
keywords_str = ", ".join(entry["keywords"][:3])
if len(entry["keywords"]) > 3:
keywords_str += f" (+{len(entry['keywords']) - 3})"
table.add_row(
entry["file_name"],
entry["language"] or "-",
(entry["purpose"] or "-")[:30],
keywords_str or "-",
entry["llm_tool"] or "-",
)
console.print(table)
if total_count > len(paginated):
console.print(
f"[dim]Showing {offset + 1}-{offset + len(paginated)} of {total_count}. "
"Use --offset and --limit for pagination.[/dim]"
)
except StorageError as exc:
if json_mode:
print_json(success=False, error=f"Storage error: {exc}")
else:
console.print(f"[red]Semantic-list failed (storage):[/red] {exc}")
raise typer.Exit(code=1)
except CodexLensError as exc:
if json_mode:
print_json(success=False, error=str(exc))
else:
console.print(f"[red]Semantic-list failed:[/red] {exc}")
raise typer.Exit(code=1)
except Exception as exc:
if json_mode:
print_json(success=False, error=f"Unexpected error: {exc}")
else:
console.print(f"[red]Semantic-list failed (unexpected):[/red] {exc}")
raise typer.Exit(code=1)
finally:
if registry is not None:
registry.close()

View File

@@ -78,6 +78,11 @@ class Config:
}
)
llm_enabled: bool = False
llm_tool: str = "gemini"
llm_timeout_ms: int = 300000
llm_batch_size: int = 5
def __post_init__(self) -> None:
try:
self.data_dir = self.data_dir.expanduser().resolve()

View File

@@ -30,6 +30,7 @@ class SearchOptions:
total_limit: Total result limit across all directories
include_symbols: Whether to include symbol search results
files_only: Return only file paths without excerpts
include_semantic: Whether to include semantic keyword search results
"""
depth: int = -1
max_workers: int = 8
@@ -37,6 +38,7 @@ class SearchOptions:
total_limit: int = 100
include_symbols: bool = False
files_only: bool = False
include_semantic: bool = False
@dataclass
@@ -378,7 +380,8 @@ class ChainSearchEngine:
idx_path,
query,
options.limit_per_dir,
options.files_only
options.files_only,
options.include_semantic
): idx_path
for idx_path in index_paths
}
@@ -400,7 +403,8 @@ class ChainSearchEngine:
def _search_single_index(self, index_path: Path,
query: str,
limit: int,
files_only: bool = False) -> List[SearchResult]:
files_only: bool = False,
include_semantic: bool = False) -> List[SearchResult]:
"""Search a single index database.
Handles exceptions gracefully, returning empty list on failure.
@@ -410,18 +414,40 @@ class ChainSearchEngine:
query: FTS5 query string
limit: Maximum results from this index
files_only: If True, skip snippet generation for faster search
include_semantic: If True, also search semantic keywords and merge results
Returns:
List of SearchResult objects (empty on error)
"""
try:
with DirIndexStore(index_path) as store:
# Get FTS results
if files_only:
# Fast path: return paths only without snippets
paths = store.search_files_only(query, limit=limit)
return [SearchResult(path=p, score=0.0, excerpt="") for p in paths]
fts_results = [SearchResult(path=p, score=0.0, excerpt="") for p in paths]
else:
return store.search_fts(query, limit=limit)
fts_results = store.search_fts(query, limit=limit)
# Optionally add semantic keyword results
if include_semantic:
try:
semantic_matches = store.search_semantic_keywords(query)
# Convert semantic matches to SearchResult with 0.8x weight
for file_entry, keywords in semantic_matches:
# Create excerpt from keywords
excerpt = f"Keywords: {', '.join(keywords[:5])}"
# Use a base score of 10.0 for semantic matches, weighted by 0.8
semantic_result = SearchResult(
path=str(file_entry.full_path),
score=10.0 * 0.8,
excerpt=excerpt
)
fts_results.append(semantic_result)
except Exception as sem_exc:
self.logger.debug(f"Semantic search error in {index_path}: {sem_exc}")
return fts_results
except Exception as exc:
self.logger.debug(f"Search error in {index_path}: {exc}")
return []

View File

@@ -32,4 +32,38 @@ def check_semantic_available() -> tuple[bool, str | None]:
"""Check if semantic search dependencies are available."""
return SEMANTIC_AVAILABLE, _import_error
__all__ = ["SEMANTIC_AVAILABLE", "SEMANTIC_BACKEND", "check_semantic_available"]
# Export LLM enhancement classes
try:
from .llm_enhancer import (
LLMEnhancer,
LLMConfig,
SemanticMetadata,
FileData,
EnhancedSemanticIndexer,
create_enhancer,
create_enhanced_indexer,
)
LLM_AVAILABLE = True
except ImportError:
LLM_AVAILABLE = False
LLMEnhancer = None # type: ignore
LLMConfig = None # type: ignore
SemanticMetadata = None # type: ignore
FileData = None # type: ignore
EnhancedSemanticIndexer = None # type: ignore
create_enhancer = None # type: ignore
create_enhanced_indexer = None # type: ignore
__all__ = [
"SEMANTIC_AVAILABLE",
"SEMANTIC_BACKEND",
"check_semantic_available",
"LLM_AVAILABLE",
"LLMEnhancer",
"LLMConfig",
"SemanticMetadata",
"FileData",
"EnhancedSemanticIndexer",
"create_enhancer",
"create_enhanced_indexer",
]

View File

@@ -0,0 +1,667 @@
"""LLM-based semantic enhancement using CCW CLI.
This module provides LLM-generated descriptions that are then embedded
by fastembed for improved semantic search. The flow is:
Code → LLM Summary → fastembed embedding → VectorStore → semantic search
LLM-generated summaries match natural language queries better than raw code.
"""
from __future__ import annotations
import json
import logging
import os
import subprocess
import shutil
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from codexlens.entities import SemanticChunk, Symbol
if TYPE_CHECKING:
from .embedder import Embedder
from .vector_store import VectorStore
logger = logging.getLogger(__name__)
@dataclass
class SemanticMetadata:
"""LLM-generated semantic metadata for a file or symbol."""
summary: str
keywords: List[str]
purpose: str
file_path: Optional[str] = None
symbol_name: Optional[str] = None
llm_tool: Optional[str] = None
@dataclass
class FileData:
"""File data for LLM processing."""
path: str
content: str
language: str
symbols: List[Symbol] = field(default_factory=list)
@dataclass
class LLMConfig:
"""Configuration for LLM enhancement.
Tool selection can be overridden via environment variables:
- CCW_CLI_SECONDARY_TOOL: Primary tool for LLM calls (default: gemini)
- CCW_CLI_FALLBACK_TOOL: Fallback tool if primary fails (default: qwen)
"""
tool: str = field(default_factory=lambda: os.environ.get("CCW_CLI_SECONDARY_TOOL", "gemini"))
fallback_tool: str = field(default_factory=lambda: os.environ.get("CCW_CLI_FALLBACK_TOOL", "qwen"))
timeout_ms: int = 300000
batch_size: int = 5
max_content_chars: int = 8000 # Max chars per file in batch prompt
enabled: bool = True
class LLMEnhancer:
"""LLM-based semantic enhancement using CCW CLI.
Generates code summaries and search keywords by calling
external LLM tools (gemini, qwen) via CCW CLI subprocess.
"""
PROMPT_TEMPLATE = '''PURPOSE: Generate semantic summaries and search keywords for code files
TASK:
- For each code block, generate a concise summary (1-2 sentences)
- Extract 5-10 relevant search keywords
- Identify the functional purpose/category
MODE: analysis
EXPECTED: JSON format output
=== CODE BLOCKS ===
{code_blocks}
=== OUTPUT FORMAT ===
Return ONLY valid JSON (no markdown, no explanation):
{{
"files": {{
"<file_path>": {{
"summary": "Brief description of what this code does",
"keywords": ["keyword1", "keyword2", ...],
"purpose": "category like: auth, api, util, ui, data, config, test"
}}
}}
}}'''
def __init__(self, config: LLMConfig | None = None) -> None:
"""Initialize LLM enhancer.
Args:
config: LLM configuration, uses defaults if None
"""
self.config = config or LLMConfig()
self._ccw_available: Optional[bool] = None
def check_available(self) -> bool:
"""Check if CCW CLI tool is available."""
if self._ccw_available is not None:
return self._ccw_available
self._ccw_available = shutil.which("ccw") is not None
if not self._ccw_available:
logger.warning("CCW CLI not found in PATH, LLM enhancement disabled")
return self._ccw_available
def enhance_files(
self,
files: List[FileData],
working_dir: Optional[Path] = None,
) -> Dict[str, SemanticMetadata]:
"""Enhance multiple files with LLM-generated semantic metadata.
Processes files in batches to manage token limits and API costs.
Args:
files: List of file data to process
working_dir: Optional working directory for CCW CLI
Returns:
Dict mapping file paths to SemanticMetadata
"""
if not self.config.enabled:
logger.debug("LLM enhancement disabled by config")
return {}
if not self.check_available():
return {}
if not files:
return {}
results: Dict[str, SemanticMetadata] = {}
batch_size = self.config.batch_size
for i in range(0, len(files), batch_size):
batch = files[i:i + batch_size]
try:
batch_results = self._process_batch(batch, working_dir)
results.update(batch_results)
logger.debug(
"Processed batch %d/%d: %d files enhanced",
i // batch_size + 1,
(len(files) + batch_size - 1) // batch_size,
len(batch_results),
)
except Exception as e:
logger.warning(
"Batch %d failed, continuing: %s",
i // batch_size + 1,
e,
)
continue
return results
def enhance_file(
self,
path: str,
content: str,
language: str,
working_dir: Optional[Path] = None,
) -> SemanticMetadata:
"""Enhance a single file with LLM-generated semantic metadata.
Convenience method that wraps enhance_files for single file processing.
Args:
path: File path
content: File content
language: Programming language
working_dir: Optional working directory for CCW CLI
Returns:
SemanticMetadata for the file
Raises:
ValueError: If enhancement fails
"""
file_data = FileData(path=path, content=content, language=language)
results = self.enhance_files([file_data], working_dir)
if path not in results:
# Return default metadata if enhancement failed
return SemanticMetadata(
summary=f"Code file written in {language}",
keywords=[language, "code"],
purpose="unknown",
file_path=path,
llm_tool=self.config.tool,
)
return results[path]
def _process_batch(
self,
files: List[FileData],
working_dir: Optional[Path] = None,
) -> Dict[str, SemanticMetadata]:
"""Process a single batch of files."""
prompt = self._build_batch_prompt(files)
# Try primary tool first
result = self._invoke_ccw_cli(
prompt,
tool=self.config.tool,
working_dir=working_dir,
)
# Fallback to secondary tool if primary fails
if not result["success"] and self.config.fallback_tool:
logger.debug(
"Primary tool %s failed, trying fallback %s",
self.config.tool,
self.config.fallback_tool,
)
result = self._invoke_ccw_cli(
prompt,
tool=self.config.fallback_tool,
working_dir=working_dir,
)
if not result["success"]:
logger.warning("LLM call failed: %s", result.get("stderr", "unknown error"))
return {}
return self._parse_response(result["stdout"], self.config.tool)
def _build_batch_prompt(self, files: List[FileData]) -> str:
"""Build prompt for batch processing."""
code_blocks_parts: List[str] = []
for file_data in files:
# Truncate content if too long
content = file_data.content
if len(content) > self.config.max_content_chars:
content = content[:self.config.max_content_chars] + "\n... [truncated]"
# Format code block
lang_hint = file_data.language or "text"
code_block = f'''[FILE: {file_data.path}]
```{lang_hint}
{content}
```'''
code_blocks_parts.append(code_block)
code_blocks = "\n\n".join(code_blocks_parts)
return self.PROMPT_TEMPLATE.format(code_blocks=code_blocks)
def _invoke_ccw_cli(
self,
prompt: str,
tool: str = "gemini",
working_dir: Optional[Path] = None,
) -> Dict[str, Any]:
"""Invoke CCW CLI tool via subprocess.
Args:
prompt: The prompt to send to LLM
tool: Tool name (gemini, qwen, codex)
working_dir: Optional working directory
Returns:
Dict with success, stdout, stderr, exit_code
"""
import sys
import os
timeout_seconds = (self.config.timeout_ms / 1000) + 30
# Build base arguments
base_args = [
"cli", "exec",
prompt, # Direct string argument
"--tool", tool,
"--mode", "analysis",
"--timeout", str(self.config.timeout_ms),
]
if working_dir:
base_args.extend(["--cd", str(working_dir)])
try:
if sys.platform == "win32":
# On Windows, ccw is a .CMD wrapper that requires shell
# Instead, directly invoke node with the ccw.js script
ccw_path = shutil.which("ccw")
if ccw_path and ccw_path.lower().endswith(".cmd"):
# Find the ccw.js script location
npm_dir = Path(ccw_path).parent
ccw_js = npm_dir / "node_modules" / "ccw" / "bin" / "ccw.js"
if ccw_js.exists():
cmd = ["node", str(ccw_js)] + base_args
else:
# Fallback to shell execution
cmd_str = "ccw " + " ".join(f'"{a}"' if " " in a else a for a in base_args)
result = subprocess.run(
cmd_str, shell=True, capture_output=True, text=True,
timeout=timeout_seconds, cwd=working_dir,
encoding="utf-8", errors="replace",
)
return {
"success": result.returncode == 0,
"stdout": result.stdout,
"stderr": result.stderr,
"exit_code": result.returncode,
}
else:
cmd = ["ccw"] + base_args
else:
cmd = ["ccw"] + base_args
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout_seconds,
cwd=working_dir,
encoding="utf-8",
errors="replace",
)
return {
"success": result.returncode == 0,
"stdout": result.stdout,
"stderr": result.stderr,
"exit_code": result.returncode,
}
except subprocess.TimeoutExpired:
logger.warning("CCW CLI timeout after %ds", self.config.timeout_ms / 1000)
return {
"success": False,
"stdout": "",
"stderr": "timeout",
"exit_code": -1,
}
except FileNotFoundError:
logger.warning("CCW CLI not found - ensure 'ccw' is in PATH")
return {
"success": False,
"stdout": "",
"stderr": "ccw command not found",
"exit_code": -1,
}
except Exception as e:
logger.warning("CCW CLI invocation failed: %s", e)
return {
"success": False,
"stdout": "",
"stderr": str(e),
"exit_code": -1,
}
def _parse_response(
self,
stdout: str,
tool: str,
) -> Dict[str, SemanticMetadata]:
"""Parse LLM response into SemanticMetadata objects.
Args:
stdout: Raw stdout from CCW CLI
tool: Tool name used for generation
Returns:
Dict mapping file paths to SemanticMetadata
"""
results: Dict[str, SemanticMetadata] = {}
# Extract JSON from response (may be wrapped in markdown or other text)
json_str = self._extract_json(stdout)
if not json_str:
logger.warning("No JSON found in LLM response")
return results
try:
data = json.loads(json_str)
except json.JSONDecodeError as e:
logger.warning("Failed to parse LLM response JSON: %s", e)
return results
# Handle expected format: {"files": {"path": {...}}}
files_data = data.get("files", data)
if not isinstance(files_data, dict):
logger.warning("Unexpected response format: expected dict")
return results
for file_path, metadata in files_data.items():
if not isinstance(metadata, dict):
continue
try:
results[file_path] = SemanticMetadata(
summary=metadata.get("summary", ""),
keywords=metadata.get("keywords", []),
purpose=metadata.get("purpose", ""),
file_path=file_path,
llm_tool=tool,
)
except Exception as e:
logger.debug("Failed to parse metadata for %s: %s", file_path, e)
continue
return results
def _extract_json(self, text: str) -> Optional[str]:
"""Extract JSON object from text that may contain markdown or other content."""
# Try to find JSON object boundaries
text = text.strip()
# Remove markdown code blocks if present
if text.startswith("```"):
lines = text.split("\n")
# Remove first line (```json or ```)
lines = lines[1:]
# Find closing ```
for i, line in enumerate(lines):
if line.strip() == "```":
lines = lines[:i]
break
text = "\n".join(lines)
# Find JSON object
start = text.find("{")
if start == -1:
return None
# Find matching closing brace
depth = 0
end = start
for i, char in enumerate(text[start:], start):
if char == "{":
depth += 1
elif char == "}":
depth -= 1
if depth == 0:
end = i + 1
break
if depth != 0:
return None
return text[start:end]
def create_enhancer(
tool: str = "gemini",
timeout_ms: int = 300000,
batch_size: int = 5,
enabled: bool = True,
) -> LLMEnhancer:
"""Factory function to create LLM enhancer with custom config."""
config = LLMConfig(
tool=tool,
timeout_ms=timeout_ms,
batch_size=batch_size,
enabled=enabled,
)
return LLMEnhancer(config)
class EnhancedSemanticIndexer:
"""Integrates LLM enhancement with fastembed vector search.
Flow:
1. Code files → LLM generates summaries/keywords
2. Summaries → fastembed generates embeddings
3. Embeddings → VectorStore for similarity search
This produces better semantic search because:
- LLM summaries are natural language descriptions
- Natural language queries match summaries better than raw code
- Keywords expand search coverage
"""
def __init__(
self,
enhancer: LLMEnhancer,
embedder: "Embedder",
vector_store: "VectorStore",
) -> None:
"""Initialize enhanced semantic indexer.
Args:
enhancer: LLM enhancer for generating summaries
embedder: Fastembed embedder for vector generation
vector_store: Vector storage for similarity search
"""
self.enhancer = enhancer
self.embedder = embedder
self.vector_store = vector_store
def index_files(
self,
files: List[FileData],
working_dir: Optional[Path] = None,
) -> int:
"""Index files with LLM-enhanced semantic search.
Args:
files: List of file data to index
working_dir: Optional working directory for LLM calls
Returns:
Number of files successfully indexed
"""
if not files:
return 0
# Step 1: Generate LLM summaries
logger.info("Generating LLM summaries for %d files...", len(files))
metadata_map = self.enhancer.enhance_files(files, working_dir)
if not metadata_map:
logger.warning("No LLM metadata generated, falling back to raw code")
return self._index_raw_code(files)
# Step 2: Create semantic chunks from LLM summaries
chunks_to_embed: List[SemanticChunk] = []
file_paths: List[str] = []
for file_data in files:
metadata = metadata_map.get(file_data.path)
if metadata:
# Use LLM-generated summary + keywords for embedding
embeddable_text = self._create_embeddable_text(metadata, file_data)
chunk = SemanticChunk(
content=embeddable_text,
embedding=None,
metadata={
"file": file_data.path,
"language": file_data.language,
"summary": metadata.summary,
"keywords": metadata.keywords,
"purpose": metadata.purpose,
"llm_tool": metadata.llm_tool,
"strategy": "llm_enhanced",
},
)
else:
# Fallback: use truncated raw code
chunk = SemanticChunk(
content=file_data.content[:2000],
embedding=None,
metadata={
"file": file_data.path,
"language": file_data.language,
"strategy": "raw_code",
},
)
chunks_to_embed.append(chunk)
file_paths.append(file_data.path)
# Step 3: Generate embeddings
logger.info("Generating embeddings for %d chunks...", len(chunks_to_embed))
texts = [chunk.content for chunk in chunks_to_embed]
embeddings = self.embedder.embed(texts)
# Step 4: Store in vector store
indexed_count = 0
for chunk, embedding, file_path in zip(chunks_to_embed, embeddings, file_paths):
chunk.embedding = embedding
try:
self.vector_store.add_chunk(chunk, file_path)
indexed_count += 1
except Exception as e:
logger.debug("Failed to store chunk for %s: %s", file_path, e)
logger.info("Successfully indexed %d/%d files", indexed_count, len(files))
return indexed_count
def _create_embeddable_text(
self,
metadata: SemanticMetadata,
file_data: FileData,
) -> str:
"""Create text optimized for embedding from LLM metadata.
Combines summary, keywords, and purpose into a single string
that will produce good semantic matches for natural language queries.
"""
parts = []
# Summary is the primary content
if metadata.summary:
parts.append(metadata.summary)
# Purpose adds categorical context
if metadata.purpose:
parts.append(f"Category: {metadata.purpose}")
# Keywords expand search coverage
if metadata.keywords:
parts.append(f"Keywords: {', '.join(metadata.keywords)}")
# Add file name for context
parts.append(f"File: {Path(file_data.path).name}")
return "\n".join(parts)
def _index_raw_code(self, files: List[FileData]) -> int:
"""Fallback: index raw code without LLM enhancement."""
indexed_count = 0
for file_data in files:
# Truncate to reasonable size
content = file_data.content[:2000]
chunk = SemanticChunk(
content=content,
embedding=None,
metadata={
"file": file_data.path,
"language": file_data.language,
"strategy": "raw_code",
},
)
try:
embedding = self.embedder.embed_single(content)
chunk.embedding = embedding
self.vector_store.add_chunk(chunk, file_data.path)
indexed_count += 1
except Exception as e:
logger.debug("Failed to index %s: %s", file_data.path, e)
return indexed_count
def create_enhanced_indexer(
vector_store_path: Path,
llm_tool: str = "gemini",
llm_enabled: bool = True,
) -> EnhancedSemanticIndexer:
"""Factory function to create an enhanced semantic indexer.
Args:
vector_store_path: Path for the vector store database
llm_tool: LLM tool to use (gemini, qwen)
llm_enabled: Whether to enable LLM enhancement
Returns:
Configured EnhancedSemanticIndexer instance
"""
from .embedder import Embedder
from .vector_store import VectorStore
enhancer = create_enhancer(tool=llm_tool, enabled=llm_enabled)
embedder = Embedder()
vector_store = VectorStore(vector_store_path)
return EnhancedSemanticIndexer(enhancer, embedder, vector_store)

View File

@@ -347,6 +347,222 @@ class DirIndexStore:
row = conn.execute("SELECT COUNT(*) AS c FROM files").fetchone()
return int(row["c"]) if row else 0
# === Semantic Metadata ===
def add_semantic_metadata(
self,
file_id: int,
summary: str,
keywords: List[str],
purpose: str,
llm_tool: str
) -> None:
"""Add or update semantic metadata for a file.
Args:
file_id: File ID from files table
summary: LLM-generated summary
keywords: List of keywords
purpose: Purpose/role of the file
llm_tool: Tool used to generate metadata (gemini/qwen)
"""
with self._lock:
conn = self._get_connection()
import json
import time
keywords_json = json.dumps(keywords)
generated_at = time.time()
conn.execute(
"""
INSERT INTO semantic_metadata(file_id, summary, keywords, purpose, llm_tool, generated_at)
VALUES(?, ?, ?, ?, ?, ?)
ON CONFLICT(file_id) DO UPDATE SET
summary=excluded.summary,
keywords=excluded.keywords,
purpose=excluded.purpose,
llm_tool=excluded.llm_tool,
generated_at=excluded.generated_at
""",
(file_id, summary, keywords_json, purpose, llm_tool, generated_at),
)
conn.commit()
def get_semantic_metadata(self, file_id: int) -> Optional[Dict[str, Any]]:
"""Get semantic metadata for a file.
Args:
file_id: File ID from files table
Returns:
Dict with summary, keywords, purpose, llm_tool, generated_at, or None if not found
"""
with self._lock:
conn = self._get_connection()
row = conn.execute(
"""
SELECT summary, keywords, purpose, llm_tool, generated_at
FROM semantic_metadata WHERE file_id=?
""",
(file_id,),
).fetchone()
if not row:
return None
import json
return {
"summary": row["summary"],
"keywords": json.loads(row["keywords"]) if row["keywords"] else [],
"purpose": row["purpose"],
"llm_tool": row["llm_tool"],
"generated_at": float(row["generated_at"]) if row["generated_at"] else 0.0,
}
def get_files_without_semantic(self) -> List[FileEntry]:
"""Get all files that don't have semantic metadata.
Returns:
List of FileEntry objects without semantic metadata
"""
with self._lock:
conn = self._get_connection()
rows = conn.execute(
"""
SELECT f.id, f.name, f.full_path, f.language, f.mtime, f.line_count
FROM files f
LEFT JOIN semantic_metadata sm ON f.id = sm.file_id
WHERE sm.id IS NULL
ORDER BY f.name
"""
).fetchall()
return [
FileEntry(
id=int(row["id"]),
name=row["name"],
full_path=Path(row["full_path"]),
language=row["language"],
mtime=float(row["mtime"]) if row["mtime"] else 0.0,
line_count=int(row["line_count"]) if row["line_count"] else 0,
)
for row in rows
]
def search_semantic_keywords(self, keyword: str) -> List[Tuple[FileEntry, List[str]]]:
"""Search files by semantic keywords.
Args:
keyword: Keyword to search for (case-insensitive)
Returns:
List of (FileEntry, keywords) tuples where keyword matches
"""
with self._lock:
conn = self._get_connection()
keyword_pattern = f"%{keyword}%"
rows = conn.execute(
"""
SELECT f.id, f.name, f.full_path, f.language, f.mtime, f.line_count, sm.keywords
FROM files f
JOIN semantic_metadata sm ON f.id = sm.file_id
WHERE sm.keywords LIKE ? COLLATE NOCASE
ORDER BY f.name
""",
(keyword_pattern,),
).fetchall()
import json
results = []
for row in rows:
file_entry = FileEntry(
id=int(row["id"]),
name=row["name"],
full_path=Path(row["full_path"]),
language=row["language"],
mtime=float(row["mtime"]) if row["mtime"] else 0.0,
line_count=int(row["line_count"]) if row["line_count"] else 0,
)
keywords = json.loads(row["keywords"]) if row["keywords"] else []
results.append((file_entry, keywords))
return results
def list_semantic_metadata(
self,
offset: int = 0,
limit: int = 50,
llm_tool: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], int]:
"""List all semantic metadata with file information.
Args:
offset: Number of records to skip (for pagination)
limit: Maximum records to return (max 100)
llm_tool: Optional filter by LLM tool used
Returns:
Tuple of (list of metadata dicts, total count)
"""
import json
with self._lock:
conn = self._get_connection()
base_query = """
SELECT f.id as file_id, f.name as file_name, f.full_path,
f.language, f.line_count,
sm.summary, sm.keywords, sm.purpose,
sm.llm_tool, sm.generated_at
FROM files f
JOIN semantic_metadata sm ON f.id = sm.file_id
"""
count_query = """
SELECT COUNT(*) as total
FROM files f
JOIN semantic_metadata sm ON f.id = sm.file_id
"""
params: List[Any] = []
if llm_tool:
base_query += " WHERE sm.llm_tool = ?"
count_query += " WHERE sm.llm_tool = ?"
params.append(llm_tool)
base_query += " ORDER BY sm.generated_at DESC LIMIT ? OFFSET ?"
params.extend([min(limit, 100), offset])
count_params = [llm_tool] if llm_tool else []
total_row = conn.execute(count_query, count_params).fetchone()
total = int(total_row["total"]) if total_row else 0
rows = conn.execute(base_query, params).fetchall()
results = []
for row in rows:
results.append({
"file_id": int(row["file_id"]),
"file_name": row["file_name"],
"full_path": row["full_path"],
"language": row["language"],
"line_count": int(row["line_count"]) if row["line_count"] else 0,
"summary": row["summary"],
"keywords": json.loads(row["keywords"]) if row["keywords"] else [],
"purpose": row["purpose"],
"llm_tool": row["llm_tool"],
"generated_at": float(row["generated_at"]) if row["generated_at"] else 0.0,
})
return results, total
# === Subdirectory Links ===
def register_subdir(
@@ -748,12 +964,28 @@ class DirIndexStore:
"""
)
# Semantic metadata table
conn.execute(
"""
CREATE TABLE IF NOT EXISTS semantic_metadata (
id INTEGER PRIMARY KEY,
file_id INTEGER UNIQUE REFERENCES files(id) ON DELETE CASCADE,
summary TEXT,
keywords TEXT,
purpose TEXT,
llm_tool TEXT,
generated_at REAL
)
"""
)
# Indexes
conn.execute("CREATE INDEX IF NOT EXISTS idx_files_name ON files(name)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_files_path ON files(full_path)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_subdirs_name ON subdirs(name)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_symbols_name ON symbols(name)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_symbols_file ON symbols(file_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_semantic_file ON semantic_metadata(file_id)")
except sqlite3.DatabaseError as exc:
raise StorageError(f"Failed to create schema: {exc}") from exc

View File

@@ -0,0 +1,831 @@
"""Tests for LLM-based semantic enhancement functionality.
Tests cover:
- LLMConfig and data classes
- LLMEnhancer initialization and configuration
- Prompt building and JSON parsing
- Batch processing logic
- CCW CLI invocation (mocked)
- EnhancedSemanticIndexer integration
- Error handling and fallback behavior
"""
import json
import tempfile
from pathlib import Path
from typing import Dict, Any
from unittest.mock import MagicMock, patch, PropertyMock
import pytest
from codexlens.entities import SemanticChunk, Symbol
from codexlens.semantic.llm_enhancer import (
SemanticMetadata,
FileData,
LLMConfig,
LLMEnhancer,
EnhancedSemanticIndexer,
create_enhancer,
create_enhanced_indexer,
)
# === Data Class Tests ===
class TestSemanticMetadata:
"""Tests for SemanticMetadata dataclass."""
def test_basic_creation(self):
"""Test creating SemanticMetadata with required fields."""
metadata = SemanticMetadata(
summary="Authentication handler",
keywords=["auth", "login", "jwt"],
purpose="auth",
)
assert metadata.summary == "Authentication handler"
assert metadata.keywords == ["auth", "login", "jwt"]
assert metadata.purpose == "auth"
assert metadata.file_path is None
assert metadata.symbol_name is None
assert metadata.llm_tool is None
def test_full_creation(self):
"""Test creating SemanticMetadata with all fields."""
metadata = SemanticMetadata(
summary="User login function",
keywords=["login", "user"],
purpose="auth",
file_path="/test/auth.py",
symbol_name="login",
llm_tool="gemini",
)
assert metadata.file_path == "/test/auth.py"
assert metadata.symbol_name == "login"
assert metadata.llm_tool == "gemini"
def test_empty_keywords(self):
"""Test creating SemanticMetadata with empty keywords."""
metadata = SemanticMetadata(
summary="Empty",
keywords=[],
purpose="",
)
assert metadata.keywords == []
class TestFileData:
"""Tests for FileData dataclass."""
def test_basic_creation(self):
"""Test creating FileData with required fields."""
data = FileData(
path="/test/file.py",
content="def hello(): pass",
language="python",
)
assert data.path == "/test/file.py"
assert data.content == "def hello(): pass"
assert data.language == "python"
assert data.symbols == []
def test_with_symbols(self):
"""Test creating FileData with symbols."""
symbols = [
Symbol(name="hello", kind="function", range=(1, 1)),
Symbol(name="MyClass", kind="class", range=(3, 10)),
]
data = FileData(
path="/test/file.py",
content="code",
language="python",
symbols=symbols,
)
assert len(data.symbols) == 2
assert data.symbols[0].name == "hello"
class TestLLMConfig:
"""Tests for LLMConfig dataclass."""
def test_default_values(self):
"""Test default configuration values."""
config = LLMConfig()
assert config.tool == "gemini"
assert config.fallback_tool == "qwen"
assert config.timeout_ms == 300000
assert config.batch_size == 5
assert config.max_content_chars == 8000
assert config.enabled is True
def test_custom_values(self):
"""Test custom configuration values."""
config = LLMConfig(
tool="qwen",
fallback_tool="gemini",
timeout_ms=600000,
batch_size=10,
max_content_chars=4000,
enabled=False,
)
assert config.tool == "qwen"
assert config.fallback_tool == "gemini"
assert config.timeout_ms == 600000
assert config.batch_size == 10
assert config.max_content_chars == 4000
assert config.enabled is False
@patch.dict("os.environ", {"CCW_CLI_SECONDARY_TOOL": "codex", "CCW_CLI_FALLBACK_TOOL": "gemini"})
def test_env_override(self):
"""Test environment variable override."""
config = LLMConfig()
assert config.tool == "codex"
assert config.fallback_tool == "gemini"
# === LLMEnhancer Tests ===
class TestLLMEnhancerInit:
"""Tests for LLMEnhancer initialization."""
def test_default_init(self):
"""Test default initialization."""
enhancer = LLMEnhancer()
assert enhancer.config is not None
assert enhancer.config.tool == "gemini"
assert enhancer._ccw_available is None
def test_custom_config(self):
"""Test initialization with custom config."""
config = LLMConfig(tool="qwen", batch_size=3)
enhancer = LLMEnhancer(config)
assert enhancer.config.tool == "qwen"
assert enhancer.config.batch_size == 3
class TestLLMEnhancerAvailability:
"""Tests for CCW CLI availability check."""
@patch("shutil.which")
def test_ccw_available(self, mock_which):
"""Test CCW available returns True."""
mock_which.return_value = "/usr/bin/ccw"
enhancer = LLMEnhancer()
result = enhancer.check_available()
assert result is True
assert enhancer._ccw_available is True
mock_which.assert_called_with("ccw")
@patch("shutil.which")
def test_ccw_not_available(self, mock_which):
"""Test CCW not available returns False."""
mock_which.return_value = None
enhancer = LLMEnhancer()
result = enhancer.check_available()
assert result is False
assert enhancer._ccw_available is False
@patch("shutil.which")
def test_ccw_availability_cached(self, mock_which):
"""Test availability result is cached."""
mock_which.return_value = "/usr/bin/ccw"
enhancer = LLMEnhancer()
# First call
enhancer.check_available()
# Second call
enhancer.check_available()
# which should only be called once
mock_which.assert_called_once()
class TestPromptBuilding:
"""Tests for prompt building."""
def test_build_single_file_prompt(self):
"""Test prompt building with single file."""
enhancer = LLMEnhancer()
files = [
FileData(
path="/test/auth.py",
content="def login(): pass",
language="python",
)
]
prompt = enhancer._build_batch_prompt(files)
assert "[FILE: /test/auth.py]" in prompt
assert "```python" in prompt
assert "def login(): pass" in prompt
assert "PURPOSE:" in prompt
assert "JSON format output" in prompt
def test_build_multiple_files_prompt(self):
"""Test prompt building with multiple files."""
enhancer = LLMEnhancer()
files = [
FileData(path="/test/a.py", content="def a(): pass", language="python"),
FileData(path="/test/b.js", content="function b() {}", language="javascript"),
]
prompt = enhancer._build_batch_prompt(files)
assert "[FILE: /test/a.py]" in prompt
assert "[FILE: /test/b.js]" in prompt
assert "```python" in prompt
assert "```javascript" in prompt
def test_build_prompt_truncates_long_content(self):
"""Test prompt truncates long content."""
config = LLMConfig(max_content_chars=100)
enhancer = LLMEnhancer(config)
long_content = "x" * 200
files = [FileData(path="/test/long.py", content=long_content, language="python")]
prompt = enhancer._build_batch_prompt(files)
assert "... [truncated]" in prompt
assert "x" * 200 not in prompt
class TestJSONParsing:
"""Tests for JSON response parsing."""
def test_parse_valid_response(self):
"""Test parsing valid JSON response."""
enhancer = LLMEnhancer()
response = json.dumps({
"files": {
"/test/auth.py": {
"summary": "Authentication handler",
"keywords": ["auth", "login"],
"purpose": "auth",
}
}
})
result = enhancer._parse_response(response, "gemini")
assert "/test/auth.py" in result
assert result["/test/auth.py"].summary == "Authentication handler"
assert result["/test/auth.py"].keywords == ["auth", "login"]
assert result["/test/auth.py"].purpose == "auth"
assert result["/test/auth.py"].llm_tool == "gemini"
def test_parse_response_with_markdown(self):
"""Test parsing response wrapped in markdown."""
enhancer = LLMEnhancer()
response = '''```json
{
"files": {
"/test/file.py": {
"summary": "Test file",
"keywords": ["test"],
"purpose": "test"
}
}
}
```'''
result = enhancer._parse_response(response, "qwen")
assert "/test/file.py" in result
assert result["/test/file.py"].summary == "Test file"
def test_parse_response_multiple_files(self):
"""Test parsing response with multiple files."""
enhancer = LLMEnhancer()
response = json.dumps({
"files": {
"/test/a.py": {"summary": "File A", "keywords": ["a"], "purpose": "util"},
"/test/b.py": {"summary": "File B", "keywords": ["b"], "purpose": "api"},
}
})
result = enhancer._parse_response(response, "gemini")
assert len(result) == 2
assert result["/test/a.py"].summary == "File A"
assert result["/test/b.py"].summary == "File B"
def test_parse_invalid_json(self):
"""Test parsing invalid JSON returns empty dict."""
enhancer = LLMEnhancer()
response = "not valid json at all"
result = enhancer._parse_response(response, "gemini")
assert result == {}
def test_parse_empty_response(self):
"""Test parsing empty response returns empty dict."""
enhancer = LLMEnhancer()
result = enhancer._parse_response("", "gemini")
assert result == {}
class TestJSONExtraction:
"""Tests for JSON extraction from mixed text."""
def test_extract_json_from_plain(self):
"""Test extracting JSON from plain text."""
enhancer = LLMEnhancer()
text = '{"key": "value"}'
result = enhancer._extract_json(text)
assert result == '{"key": "value"}'
def test_extract_json_from_markdown(self):
"""Test extracting JSON from markdown code block."""
enhancer = LLMEnhancer()
text = '''```json
{"key": "value"}
```'''
result = enhancer._extract_json(text)
assert result == '{"key": "value"}'
def test_extract_json_with_surrounding_text(self):
"""Test extracting JSON with surrounding text."""
enhancer = LLMEnhancer()
text = 'Here is the result: {"key": "value"} That is all.'
result = enhancer._extract_json(text)
assert result == '{"key": "value"}'
def test_extract_nested_json(self):
"""Test extracting nested JSON."""
enhancer = LLMEnhancer()
text = '{"outer": {"inner": "value"}}'
result = enhancer._extract_json(text)
assert '"outer"' in result
assert '"inner"' in result
def test_extract_no_json(self):
"""Test extracting from text without JSON."""
enhancer = LLMEnhancer()
text = "No JSON here at all"
result = enhancer._extract_json(text)
assert result is None
def test_extract_malformed_json(self):
"""Test extracting malformed JSON returns None."""
enhancer = LLMEnhancer()
text = '{"key": "value"' # Missing closing brace
result = enhancer._extract_json(text)
assert result is None
class TestEnhanceFiles:
"""Tests for enhance_files method."""
@patch.object(LLMEnhancer, "check_available", return_value=False)
def test_enhance_files_ccw_not_available(self, mock_check):
"""Test enhance_files returns empty when CCW not available."""
enhancer = LLMEnhancer()
files = [FileData(path="/test/a.py", content="code", language="python")]
result = enhancer.enhance_files(files)
assert result == {}
def test_enhance_files_disabled(self):
"""Test enhance_files returns empty when disabled."""
config = LLMConfig(enabled=False)
enhancer = LLMEnhancer(config)
files = [FileData(path="/test/a.py", content="code", language="python")]
result = enhancer.enhance_files(files)
assert result == {}
@patch.object(LLMEnhancer, "check_available", return_value=True)
def test_enhance_files_empty_list(self, mock_check):
"""Test enhance_files with empty list returns empty dict."""
enhancer = LLMEnhancer()
result = enhancer.enhance_files([])
assert result == {}
@patch.object(LLMEnhancer, "check_available", return_value=True)
@patch.object(LLMEnhancer, "_invoke_ccw_cli")
def test_enhance_files_success(self, mock_invoke, mock_check):
"""Test enhance_files successful processing."""
mock_invoke.return_value = {
"success": True,
"stdout": json.dumps({
"files": {
"/test/auth.py": {
"summary": "Auth module",
"keywords": ["auth"],
"purpose": "auth",
}
}
}),
"stderr": "",
"exit_code": 0,
}
enhancer = LLMEnhancer()
files = [FileData(path="/test/auth.py", content="def login(): pass", language="python")]
result = enhancer.enhance_files(files)
assert "/test/auth.py" in result
assert result["/test/auth.py"].summary == "Auth module"
@patch.object(LLMEnhancer, "check_available", return_value=True)
@patch.object(LLMEnhancer, "_invoke_ccw_cli")
def test_enhance_files_fallback(self, mock_invoke, mock_check):
"""Test enhance_files falls back to secondary tool."""
# First call fails, second succeeds
mock_invoke.side_effect = [
{"success": False, "stdout": "", "stderr": "error", "exit_code": 1},
{
"success": True,
"stdout": json.dumps({
"files": {
"/test/file.py": {
"summary": "Fallback result",
"keywords": ["fallback"],
"purpose": "util",
}
}
}),
"stderr": "",
"exit_code": 0,
},
]
enhancer = LLMEnhancer()
files = [FileData(path="/test/file.py", content="code", language="python")]
result = enhancer.enhance_files(files)
assert "/test/file.py" in result
assert result["/test/file.py"].summary == "Fallback result"
assert mock_invoke.call_count == 2
class TestEnhanceFile:
"""Tests for enhance_file single file method."""
@patch.object(LLMEnhancer, "enhance_files")
def test_enhance_file_success(self, mock_enhance_files):
"""Test enhance_file returns metadata on success."""
mock_enhance_files.return_value = {
"/test/auth.py": SemanticMetadata(
summary="Auth module",
keywords=["auth", "login"],
purpose="auth",
file_path="/test/auth.py",
llm_tool="gemini",
)
}
enhancer = LLMEnhancer()
result = enhancer.enhance_file("/test/auth.py", "def login(): pass", "python")
assert result.summary == "Auth module"
assert result.keywords == ["auth", "login"]
@patch.object(LLMEnhancer, "enhance_files")
def test_enhance_file_fallback_on_failure(self, mock_enhance_files):
"""Test enhance_file returns default metadata on failure."""
mock_enhance_files.return_value = {} # Enhancement failed
enhancer = LLMEnhancer()
result = enhancer.enhance_file("/test/file.py", "code", "python")
assert "python" in result.summary.lower()
assert "python" in result.keywords
assert result.purpose == "unknown"
class TestBatchProcessing:
"""Tests for batch processing."""
@patch.object(LLMEnhancer, "check_available", return_value=True)
@patch.object(LLMEnhancer, "_process_batch")
def test_batch_processing(self, mock_process, mock_check):
"""Test files are processed in batches."""
mock_process.return_value = {}
config = LLMConfig(batch_size=2)
enhancer = LLMEnhancer(config)
files = [
FileData(path=f"/test/file{i}.py", content="code", language="python")
for i in range(5)
]
enhancer.enhance_files(files)
# 5 files with batch_size=2 should result in 3 batches
assert mock_process.call_count == 3
@patch.object(LLMEnhancer, "check_available", return_value=True)
@patch.object(LLMEnhancer, "_process_batch")
def test_batch_continues_on_error(self, mock_process, mock_check):
"""Test batch processing continues on error."""
# First batch fails, second succeeds
mock_process.side_effect = [
Exception("Batch 1 failed"),
{"/test/file2.py": SemanticMetadata(summary="OK", keywords=[], purpose="")},
]
config = LLMConfig(batch_size=1)
enhancer = LLMEnhancer(config)
files = [
FileData(path="/test/file1.py", content="code", language="python"),
FileData(path="/test/file2.py", content="code", language="python"),
]
result = enhancer.enhance_files(files)
# Should still get results from second batch
assert "/test/file2.py" in result
# === CCW CLI Invocation Tests ===
class TestCCWInvocation:
"""Tests for CCW CLI invocation."""
@patch("subprocess.run")
@patch("shutil.which", return_value="/usr/bin/ccw")
def test_invoke_success(self, mock_which, mock_run):
"""Test successful CCW CLI invocation."""
mock_run.return_value = MagicMock(
returncode=0,
stdout='{"files": {}}',
stderr="",
)
enhancer = LLMEnhancer()
result = enhancer._invoke_ccw_cli("test prompt", tool="gemini")
assert result["success"] is True
assert result["exit_code"] == 0
@patch("subprocess.run")
@patch("shutil.which", return_value="/usr/bin/ccw")
def test_invoke_failure(self, mock_which, mock_run):
"""Test failed CCW CLI invocation."""
mock_run.return_value = MagicMock(
returncode=1,
stdout="",
stderr="Error occurred",
)
enhancer = LLMEnhancer()
result = enhancer._invoke_ccw_cli("test prompt", tool="gemini")
assert result["success"] is False
assert result["exit_code"] == 1
@patch("subprocess.run")
@patch("shutil.which", return_value="/usr/bin/ccw")
def test_invoke_timeout(self, mock_which, mock_run):
"""Test CCW CLI timeout handling."""
import subprocess
mock_run.side_effect = subprocess.TimeoutExpired(cmd="ccw", timeout=300)
enhancer = LLMEnhancer()
result = enhancer._invoke_ccw_cli("test prompt", tool="gemini")
assert result["success"] is False
assert "timeout" in result["stderr"]
@patch("subprocess.run")
@patch("shutil.which", return_value=None)
def test_invoke_ccw_not_found(self, mock_which, mock_run):
"""Test CCW CLI not found handling."""
mock_run.side_effect = FileNotFoundError()
enhancer = LLMEnhancer()
result = enhancer._invoke_ccw_cli("test prompt", tool="gemini")
assert result["success"] is False
assert "not found" in result["stderr"]
# === EnhancedSemanticIndexer Tests ===
class TestEnhancedSemanticIndexer:
"""Tests for EnhancedSemanticIndexer integration."""
@pytest.fixture
def mock_enhancer(self):
"""Create mock LLM enhancer."""
enhancer = MagicMock(spec=LLMEnhancer)
enhancer.enhance_files.return_value = {
"/test/auth.py": SemanticMetadata(
summary="Authentication handler",
keywords=["auth", "login", "jwt"],
purpose="auth",
file_path="/test/auth.py",
llm_tool="gemini",
)
}
return enhancer
@pytest.fixture
def mock_embedder(self):
"""Create mock embedder."""
embedder = MagicMock()
embedder.embed.return_value = [[0.1] * 384]
embedder.embed_single.return_value = [0.1] * 384
return embedder
@pytest.fixture
def mock_vector_store(self):
"""Create mock vector store."""
store = MagicMock()
store.add_chunk.return_value = 1
return store
def test_index_files_empty_list(self, mock_enhancer, mock_embedder, mock_vector_store):
"""Test indexing empty file list."""
indexer = EnhancedSemanticIndexer(mock_enhancer, mock_embedder, mock_vector_store)
result = indexer.index_files([])
assert result == 0
mock_enhancer.enhance_files.assert_not_called()
def test_index_files_with_llm_enhancement(self, mock_enhancer, mock_embedder, mock_vector_store):
"""Test indexing with LLM enhancement."""
indexer = EnhancedSemanticIndexer(mock_enhancer, mock_embedder, mock_vector_store)
files = [FileData(path="/test/auth.py", content="def login(): pass", language="python")]
result = indexer.index_files(files)
assert result == 1
mock_enhancer.enhance_files.assert_called_once()
mock_embedder.embed.assert_called_once()
mock_vector_store.add_chunk.assert_called_once()
def test_index_files_fallback_to_raw_code(self, mock_embedder, mock_vector_store):
"""Test indexing falls back to raw code when LLM fails."""
mock_enhancer = MagicMock(spec=LLMEnhancer)
mock_enhancer.enhance_files.return_value = {} # No enhancement
indexer = EnhancedSemanticIndexer(mock_enhancer, mock_embedder, mock_vector_store)
files = [FileData(path="/test/file.py", content="code", language="python")]
result = indexer.index_files(files)
assert result == 1
mock_embedder.embed_single.assert_called()
def test_create_embeddable_text(self, mock_enhancer, mock_embedder, mock_vector_store):
"""Test embeddable text creation."""
indexer = EnhancedSemanticIndexer(mock_enhancer, mock_embedder, mock_vector_store)
metadata = SemanticMetadata(
summary="Handles user authentication",
keywords=["auth", "login", "user"],
purpose="auth",
)
file_data = FileData(path="/test/auth.py", content="code", language="python")
text = indexer._create_embeddable_text(metadata, file_data)
assert "Handles user authentication" in text
assert "auth" in text.lower()
assert "Keywords:" in text
assert "auth.py" in text
# === Factory Function Tests ===
class TestFactoryFunctions:
"""Tests for factory functions."""
def test_create_enhancer_default(self):
"""Test create_enhancer with defaults."""
enhancer = create_enhancer()
assert enhancer.config.tool == "gemini"
assert enhancer.config.enabled is True
def test_create_enhancer_custom(self):
"""Test create_enhancer with custom params."""
enhancer = create_enhancer(
tool="qwen",
timeout_ms=600000,
batch_size=10,
enabled=False,
)
assert enhancer.config.tool == "qwen"
assert enhancer.config.timeout_ms == 600000
assert enhancer.config.batch_size == 10
assert enhancer.config.enabled is False
@pytest.mark.skipif(
not pytest.importorskip("codexlens.semantic", reason="semantic not available"),
reason="Semantic dependencies not installed"
)
def test_create_enhanced_indexer(self, tmp_path):
"""Test create_enhanced_indexer factory."""
try:
from codexlens.semantic import SEMANTIC_AVAILABLE
if not SEMANTIC_AVAILABLE:
pytest.skip("Semantic dependencies not installed")
db_path = tmp_path / "semantic.db"
indexer = create_enhanced_indexer(db_path, llm_tool="gemini", llm_enabled=False)
assert indexer.enhancer is not None
assert indexer.embedder is not None
assert indexer.vector_store is not None
except ImportError:
pytest.skip("Semantic dependencies not installed")
# === Edge Cases ===
class TestEdgeCases:
"""Tests for edge cases."""
def test_semantic_metadata_with_special_chars(self):
"""Test metadata with special characters."""
metadata = SemanticMetadata(
summary='Test "quoted" and \'single\' quotes',
keywords=["special", "chars", "test's"],
purpose="test",
)
assert '"quoted"' in metadata.summary
assert "test's" in metadata.keywords
def test_file_data_with_unicode(self):
"""Test FileData with unicode content."""
data = FileData(
path="/test/中文.py",
content="def 你好(): return '世界'",
language="python",
)
assert "中文" in data.path
assert "你好" in data.content
@patch.object(LLMEnhancer, "check_available", return_value=True)
@patch.object(LLMEnhancer, "_invoke_ccw_cli")
def test_enhance_with_very_long_content(self, mock_invoke, mock_check):
"""Test enhancement with very long content."""
mock_invoke.return_value = {
"success": True,
"stdout": json.dumps({"files": {}}),
"stderr": "",
"exit_code": 0,
}
config = LLMConfig(max_content_chars=100)
enhancer = LLMEnhancer(config)
long_content = "x" * 10000
files = [FileData(path="/test/long.py", content=long_content, language="python")]
enhancer.enhance_files(files)
# Should not crash, content should be truncated in prompt
mock_invoke.assert_called_once()
def test_parse_response_with_missing_fields(self):
"""Test parsing response with missing fields."""
enhancer = LLMEnhancer()
response = json.dumps({
"files": {
"/test/file.py": {
"summary": "Only summary provided",
# keywords and purpose missing
}
}
})
result = enhancer._parse_response(response, "gemini")
assert "/test/file.py" in result
assert result["/test/file.py"].summary == "Only summary provided"
assert result["/test/file.py"].keywords == []
assert result["/test/file.py"].purpose == ""

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,747 @@
"""Full coverage tests for vector/semantic search functionality.
Tests cover:
- Embedder model loading and embedding generation
- VectorStore CRUD operations and caching
- Cosine similarity computation
- Semantic search accuracy and relevance
- Performance benchmarks
- Edge cases and error handling
- Thread safety and concurrent access
"""
import json
import tempfile
import threading
import time
from pathlib import Path
from typing import List
import pytest
from codexlens.entities import SemanticChunk, Symbol, SearchResult
from codexlens.semantic import SEMANTIC_AVAILABLE, check_semantic_available
# Skip all tests if semantic dependencies not available
pytestmark = pytest.mark.skipif(
not SEMANTIC_AVAILABLE,
reason="Semantic search dependencies not installed (pip install codexlens[semantic])"
)
# === Fixtures ===
@pytest.fixture
def temp_db(tmp_path):
"""Create temporary database path."""
return tmp_path / "test_semantic.db"
@pytest.fixture
def embedder():
"""Create Embedder instance."""
from codexlens.semantic.embedder import Embedder
return Embedder()
@pytest.fixture
def vector_store(temp_db):
"""Create VectorStore instance."""
from codexlens.semantic.vector_store import VectorStore
return VectorStore(temp_db)
@pytest.fixture
def sample_code_chunks():
"""Sample code chunks for testing."""
return [
{
"content": "def authenticate(username, password): return check_credentials(username, password)",
"metadata": {"symbol_name": "authenticate", "symbol_kind": "function", "start_line": 1, "end_line": 1, "language": "python"},
},
{
"content": "class DatabaseConnection:\n def connect(self, host, port): pass\n def execute(self, query): pass",
"metadata": {"symbol_name": "DatabaseConnection", "symbol_kind": "class", "start_line": 1, "end_line": 3, "language": "python"},
},
{
"content": "async function fetchUserData(userId) { return await api.get('/users/' + userId); }",
"metadata": {"symbol_name": "fetchUserData", "symbol_kind": "function", "start_line": 1, "end_line": 1, "language": "javascript"},
},
{
"content": "def calculate_sum(numbers): return sum(numbers)",
"metadata": {"symbol_name": "calculate_sum", "symbol_kind": "function", "start_line": 1, "end_line": 1, "language": "python"},
},
{
"content": "class UserProfile:\n def __init__(self, name, email):\n self.name = name\n self.email = email",
"metadata": {"symbol_name": "UserProfile", "symbol_kind": "class", "start_line": 1, "end_line": 4, "language": "python"},
},
]
# === Embedder Tests ===
class TestEmbedder:
"""Tests for Embedder class."""
def test_embedder_initialization(self, embedder):
"""Test embedder initializes correctly."""
assert embedder.model_name == "BAAI/bge-small-en-v1.5"
assert embedder.EMBEDDING_DIM == 384
assert embedder._model is None # Lazy loading
def test_embed_single_returns_correct_dimension(self, embedder):
"""Test single embedding has correct dimension."""
text = "def hello(): print('world')"
embedding = embedder.embed_single(text)
assert isinstance(embedding, list)
assert len(embedding) == 384
assert all(isinstance(x, float) for x in embedding)
def test_embed_batch_returns_correct_count(self, embedder):
"""Test batch embedding returns correct number of embeddings."""
texts = [
"def foo(): pass",
"def bar(): pass",
"def baz(): pass",
]
embeddings = embedder.embed(texts)
assert len(embeddings) == len(texts)
assert all(len(e) == 384 for e in embeddings)
def test_embed_empty_string(self, embedder):
"""Test embedding empty string."""
embedding = embedder.embed_single("")
assert len(embedding) == 384
def test_embed_unicode_text(self, embedder):
"""Test embedding unicode text."""
text = "def 你好(): return '世界'"
embedding = embedder.embed_single(text)
assert len(embedding) == 384
def test_embed_long_text(self, embedder):
"""Test embedding long text."""
text = "def process(): pass\n" * 100
embedding = embedder.embed_single(text)
assert len(embedding) == 384
def test_embed_special_characters(self, embedder):
"""Test embedding text with special characters."""
text = "def test(): return {'key': 'value', '@decorator': True}"
embedding = embedder.embed_single(text)
assert len(embedding) == 384
def test_lazy_model_loading(self, embedder):
"""Test model loads lazily on first embed call."""
assert embedder._model is None
embedder.embed_single("test")
assert embedder._model is not None
def test_model_reuse(self, embedder):
"""Test model is reused across multiple calls."""
embedder.embed_single("test1")
model_ref = embedder._model
embedder.embed_single("test2")
assert embedder._model is model_ref # Same instance
class TestEmbeddingSimilarity:
"""Tests for embedding similarity."""
def test_identical_text_similarity(self, embedder):
"""Test identical text has similarity ~1.0."""
from codexlens.semantic.vector_store import _cosine_similarity
text = "def calculate_sum(a, b): return a + b"
emb1 = embedder.embed_single(text)
emb2 = embedder.embed_single(text)
similarity = _cosine_similarity(emb1, emb2)
assert similarity > 0.99, "Identical text should have ~1.0 similarity"
def test_similar_code_high_similarity(self, embedder):
"""Test similar code has high similarity."""
from codexlens.semantic.vector_store import _cosine_similarity
code1 = "def add(a, b): return a + b"
code2 = "def sum_numbers(x, y): return x + y"
emb1 = embedder.embed_single(code1)
emb2 = embedder.embed_single(code2)
similarity = _cosine_similarity(emb1, emb2)
assert similarity > 0.6, "Similar functions should have high similarity"
def test_different_code_lower_similarity(self, embedder):
"""Test different code has lower similarity than similar code."""
from codexlens.semantic.vector_store import _cosine_similarity
code1 = "def add(a, b): return a + b"
code2 = "def sum_numbers(x, y): return x + y"
code3 = "class UserAuth: def login(self, user, pwd): pass"
emb1 = embedder.embed_single(code1)
emb2 = embedder.embed_single(code2)
emb3 = embedder.embed_single(code3)
sim_similar = _cosine_similarity(emb1, emb2)
sim_different = _cosine_similarity(emb1, emb3)
assert sim_similar > sim_different, "Similar code should have higher similarity"
def test_zero_vector_similarity(self):
"""Test cosine similarity with zero vector."""
from codexlens.semantic.vector_store import _cosine_similarity
zero_vec = [0.0] * 384
normal_vec = [1.0] * 384
similarity = _cosine_similarity(zero_vec, normal_vec)
assert similarity == 0.0, "Zero vector should have 0 similarity"
# === VectorStore Tests ===
class TestVectorStoreCRUD:
"""Tests for VectorStore CRUD operations."""
def test_add_chunk(self, vector_store, embedder):
"""Test adding a single chunk."""
chunk = SemanticChunk(
content="def test(): pass",
metadata={"language": "python"},
)
chunk.embedding = embedder.embed_single(chunk.content)
chunk_id = vector_store.add_chunk(chunk, "/test/file.py")
assert chunk_id > 0
assert vector_store.count_chunks() == 1
def test_add_chunk_without_embedding_raises(self, vector_store):
"""Test adding chunk without embedding raises error."""
chunk = SemanticChunk(content="def test(): pass", metadata={})
with pytest.raises(ValueError, match="must have embedding"):
vector_store.add_chunk(chunk, "/test/file.py")
def test_add_chunks_batch(self, vector_store, embedder, sample_code_chunks):
"""Test batch adding chunks."""
chunks = []
for data in sample_code_chunks:
chunk = SemanticChunk(content=data["content"], metadata=data["metadata"])
chunk.embedding = embedder.embed_single(chunk.content)
chunks.append(chunk)
ids = vector_store.add_chunks(chunks, "/test/multi.py")
assert len(ids) == len(chunks)
assert vector_store.count_chunks() == len(chunks)
def test_add_empty_batch(self, vector_store):
"""Test adding empty batch returns empty list."""
ids = vector_store.add_chunks([], "/test/empty.py")
assert ids == []
def test_delete_file_chunks(self, vector_store, embedder):
"""Test deleting chunks by file path."""
# Add chunks for two files
chunk1 = SemanticChunk(content="def a(): pass", metadata={})
chunk1.embedding = embedder.embed_single(chunk1.content)
vector_store.add_chunk(chunk1, "/test/file1.py")
chunk2 = SemanticChunk(content="def b(): pass", metadata={})
chunk2.embedding = embedder.embed_single(chunk2.content)
vector_store.add_chunk(chunk2, "/test/file2.py")
assert vector_store.count_chunks() == 2
# Delete one file's chunks
deleted = vector_store.delete_file_chunks("/test/file1.py")
assert deleted == 1
assert vector_store.count_chunks() == 1
def test_delete_nonexistent_file(self, vector_store):
"""Test deleting non-existent file returns 0."""
deleted = vector_store.delete_file_chunks("/nonexistent/file.py")
assert deleted == 0
def test_count_chunks_empty(self, vector_store):
"""Test count on empty store."""
assert vector_store.count_chunks() == 0
class TestVectorStoreSearch:
"""Tests for VectorStore search functionality."""
def test_search_similar_basic(self, vector_store, embedder, sample_code_chunks):
"""Test basic similarity search."""
# Add chunks
for data in sample_code_chunks:
chunk = SemanticChunk(content=data["content"], metadata=data["metadata"])
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/file.py")
# Search
query = "function to authenticate user login"
query_embedding = embedder.embed_single(query)
results = vector_store.search_similar(query_embedding, top_k=3)
assert len(results) > 0
assert all(isinstance(r, SearchResult) for r in results)
# Top result should be auth-related
assert "authenticate" in results[0].excerpt.lower() or "auth" in results[0].path.lower()
def test_search_respects_top_k(self, vector_store, embedder, sample_code_chunks):
"""Test search respects top_k parameter."""
# Add all chunks
for data in sample_code_chunks:
chunk = SemanticChunk(content=data["content"], metadata=data["metadata"])
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/file.py")
query_embedding = embedder.embed_single("code")
results_2 = vector_store.search_similar(query_embedding, top_k=2)
results_5 = vector_store.search_similar(query_embedding, top_k=5)
assert len(results_2) <= 2
assert len(results_5) <= 5
def test_search_min_score_filtering(self, vector_store, embedder):
"""Test min_score filtering."""
chunk = SemanticChunk(
content="def hello(): print('hello world')",
metadata={},
)
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/hello.py")
query_embedding = embedder.embed_single("database connection pool")
results_no_filter = vector_store.search_similar(query_embedding, min_score=0.0)
results_high_filter = vector_store.search_similar(query_embedding, min_score=0.9)
assert len(results_no_filter) >= len(results_high_filter)
def test_search_returns_sorted_by_score(self, vector_store, embedder, sample_code_chunks):
"""Test results are sorted by score descending."""
for data in sample_code_chunks:
chunk = SemanticChunk(content=data["content"], metadata=data["metadata"])
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/file.py")
query_embedding = embedder.embed_single("function")
results = vector_store.search_similar(query_embedding, top_k=5)
if len(results) > 1:
for i in range(len(results) - 1):
assert results[i].score >= results[i + 1].score
def test_search_includes_metadata(self, vector_store, embedder):
"""Test search results include metadata."""
chunk = SemanticChunk(
content="def test_function(): pass",
metadata={
"symbol_name": "test_function",
"symbol_kind": "function",
"start_line": 10,
"end_line": 15,
},
)
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/func.py")
query_embedding = embedder.embed_single("test function")
results = vector_store.search_similar(query_embedding, top_k=1)
assert len(results) == 1
assert results[0].symbol_name == "test_function"
assert results[0].symbol_kind == "function"
assert results[0].start_line == 10
assert results[0].end_line == 15
def test_search_empty_store_returns_empty(self, vector_store, embedder):
"""Test search on empty store returns empty list."""
query_embedding = embedder.embed_single("anything")
results = vector_store.search_similar(query_embedding)
assert results == []
def test_search_with_return_full_content_false(self, vector_store, embedder):
"""Test search with return_full_content=False."""
chunk = SemanticChunk(
content="def long_function(): " + "pass\n" * 100,
metadata={},
)
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/long.py")
query_embedding = embedder.embed_single("function")
results = vector_store.search_similar(
query_embedding, top_k=1, return_full_content=False
)
assert len(results) == 1
assert results[0].content is None
assert results[0].excerpt is not None
class TestVectorStoreCache:
"""Tests for VectorStore caching behavior."""
def test_cache_invalidation_on_add(self, vector_store, embedder):
"""Test cache is invalidated when chunks are added."""
chunk1 = SemanticChunk(content="def a(): pass", metadata={})
chunk1.embedding = embedder.embed_single(chunk1.content)
vector_store.add_chunk(chunk1, "/test/a.py")
# Trigger cache population
query_embedding = embedder.embed_single("function")
vector_store.search_similar(query_embedding)
initial_version = vector_store._cache_version
# Add another chunk
chunk2 = SemanticChunk(content="def b(): pass", metadata={})
chunk2.embedding = embedder.embed_single(chunk2.content)
vector_store.add_chunk(chunk2, "/test/b.py")
assert vector_store._cache_version > initial_version
assert vector_store._embedding_matrix is None
def test_cache_invalidation_on_delete(self, vector_store, embedder):
"""Test cache is invalidated when chunks are deleted."""
chunk = SemanticChunk(content="def a(): pass", metadata={})
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/a.py")
# Trigger cache population
query_embedding = embedder.embed_single("function")
vector_store.search_similar(query_embedding)
initial_version = vector_store._cache_version
# Delete chunk
vector_store.delete_file_chunks("/test/a.py")
assert vector_store._cache_version > initial_version
def test_manual_cache_clear(self, vector_store, embedder):
"""Test manual cache clearing."""
chunk = SemanticChunk(content="def a(): pass", metadata={})
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/a.py")
# Trigger cache population
query_embedding = embedder.embed_single("function")
vector_store.search_similar(query_embedding)
assert vector_store._embedding_matrix is not None
vector_store.clear_cache()
assert vector_store._embedding_matrix is None
# === Semantic Search Accuracy Tests ===
class TestSemanticSearchAccuracy:
"""Tests for semantic search accuracy and relevance."""
def test_auth_query_finds_auth_code(self, vector_store, embedder, sample_code_chunks):
"""Test authentication query finds auth code."""
for data in sample_code_chunks:
chunk = SemanticChunk(content=data["content"], metadata=data["metadata"])
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/file.py")
query = "user authentication login"
query_embedding = embedder.embed_single(query)
results = vector_store.search_similar(query_embedding, top_k=1)
assert len(results) > 0
assert "authenticate" in results[0].excerpt.lower()
def test_database_query_finds_db_code(self, vector_store, embedder, sample_code_chunks):
"""Test database query finds database code."""
for data in sample_code_chunks:
chunk = SemanticChunk(content=data["content"], metadata=data["metadata"])
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/file.py")
query = "database connection execute query"
query_embedding = embedder.embed_single(query)
results = vector_store.search_similar(query_embedding, top_k=1)
assert len(results) > 0
assert "database" in results[0].excerpt.lower() or "connect" in results[0].excerpt.lower()
def test_math_query_finds_calculation_code(self, vector_store, embedder, sample_code_chunks):
"""Test math query finds calculation code."""
for data in sample_code_chunks:
chunk = SemanticChunk(content=data["content"], metadata=data["metadata"])
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/file.py")
query = "sum numbers add calculation"
query_embedding = embedder.embed_single(query)
results = vector_store.search_similar(query_embedding, top_k=1)
assert len(results) > 0
assert "sum" in results[0].excerpt.lower() or "calculate" in results[0].excerpt.lower()
# === Performance Tests ===
class TestVectorSearchPerformance:
"""Performance tests for vector search."""
def test_embedding_performance(self, embedder):
"""Test embedding generation performance."""
text = "def calculate_sum(a, b): return a + b"
# Warm up
embedder.embed_single(text)
# Measure
start = time.perf_counter()
iterations = 10
for _ in range(iterations):
embedder.embed_single(text)
elapsed = time.perf_counter() - start
avg_ms = (elapsed / iterations) * 1000
assert avg_ms < 100, f"Single embedding should be <100ms, got {avg_ms:.2f}ms"
def test_batch_embedding_performance(self, embedder):
"""Test batch embedding performance."""
texts = [f"def function_{i}(): pass" for i in range(50)]
# Warm up
embedder.embed(texts[:5])
# Measure
start = time.perf_counter()
embedder.embed(texts)
elapsed = time.perf_counter() - start
total_ms = elapsed * 1000
per_text_ms = total_ms / len(texts)
assert per_text_ms < 20, f"Per-text embedding should be <20ms, got {per_text_ms:.2f}ms"
def test_search_performance_small(self, vector_store, embedder):
"""Test search performance with small dataset."""
# Add 100 chunks
for i in range(100):
chunk = SemanticChunk(
content=f"def function_{i}(): return {i}",
metadata={"index": i},
)
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, f"/test/file_{i}.py")
query_embedding = embedder.embed_single("function return value")
# Warm up
vector_store.search_similar(query_embedding)
# Measure
start = time.perf_counter()
iterations = 10
for _ in range(iterations):
vector_store.search_similar(query_embedding)
elapsed = time.perf_counter() - start
avg_ms = (elapsed / iterations) * 1000
assert avg_ms < 50, f"Search with 100 chunks should be <50ms, got {avg_ms:.2f}ms"
def test_search_performance_medium(self, vector_store, embedder):
"""Test search performance with medium dataset."""
# Add 500 chunks in batch
chunks = []
for i in range(500):
chunk = SemanticChunk(
content=f"def function_{i}(x): return x * {i}",
metadata={"index": i},
)
chunk.embedding = embedder.embed_single(chunk.content)
chunks.append(chunk)
vector_store.add_chunks(chunks, "/test/bulk.py")
query_embedding = embedder.embed_single("multiply value")
# Warm up
vector_store.search_similar(query_embedding)
# Measure
start = time.perf_counter()
iterations = 5
for _ in range(iterations):
vector_store.search_similar(query_embedding)
elapsed = time.perf_counter() - start
avg_ms = (elapsed / iterations) * 1000
assert avg_ms < 100, f"Search with 500 chunks should be <100ms, got {avg_ms:.2f}ms"
# === Thread Safety Tests ===
class TestThreadSafety:
"""Tests for thread safety."""
def test_concurrent_searches(self, vector_store, embedder, sample_code_chunks):
"""Test concurrent searches are thread-safe."""
# Populate store
for data in sample_code_chunks:
chunk = SemanticChunk(content=data["content"], metadata=data["metadata"])
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/file.py")
results_list = []
errors = []
def search_task(query):
try:
query_embedding = embedder.embed_single(query)
results = vector_store.search_similar(query_embedding, top_k=3)
results_list.append(len(results))
except Exception as e:
errors.append(str(e))
queries = ["authentication", "database", "function", "class", "async"]
threads = [threading.Thread(target=search_task, args=(q,)) for q in queries]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0, f"Errors during concurrent search: {errors}"
assert len(results_list) == len(queries)
def test_concurrent_add_and_search(self, vector_store, embedder):
"""Test concurrent add and search operations."""
errors = []
def add_task(idx):
try:
chunk = SemanticChunk(
content=f"def task_{idx}(): pass",
metadata={"idx": idx},
)
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, f"/test/task_{idx}.py")
except Exception as e:
errors.append(f"Add error: {e}")
def search_task():
try:
query_embedding = embedder.embed_single("function task")
vector_store.search_similar(query_embedding)
except Exception as e:
errors.append(f"Search error: {e}")
threads = []
for i in range(10):
threads.append(threading.Thread(target=add_task, args=(i,)))
threads.append(threading.Thread(target=search_task))
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0, f"Errors during concurrent ops: {errors}"
# === Edge Cases ===
class TestEdgeCases:
"""Tests for edge cases."""
def test_very_short_content(self, vector_store, embedder):
"""Test handling very short content."""
chunk = SemanticChunk(content="x", metadata={})
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/short.py")
query_embedding = embedder.embed_single("x")
results = vector_store.search_similar(query_embedding)
assert len(results) == 1
def test_special_characters_in_path(self, vector_store, embedder):
"""Test handling special characters in file path."""
chunk = SemanticChunk(content="def test(): pass", metadata={})
chunk.embedding = embedder.embed_single(chunk.content)
special_path = "/test/path with spaces/file-name_v2.py"
vector_store.add_chunk(chunk, special_path)
query_embedding = embedder.embed_single("test function")
results = vector_store.search_similar(query_embedding)
assert len(results) == 1
assert results[0].path == special_path
def test_json_metadata_special_chars(self, vector_store, embedder):
"""Test metadata with special JSON characters."""
metadata = {
"description": 'Test "quoted" text with \'single\' quotes',
"path": "C:\\Users\\test\\file.py",
"tags": ["tag1", "tag2"],
}
chunk = SemanticChunk(content="def test(): pass", metadata=metadata)
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/special.py")
query_embedding = embedder.embed_single("test")
results = vector_store.search_similar(query_embedding)
assert len(results) == 1
assert results[0].metadata["description"] == metadata["description"]
def test_search_zero_top_k(self, vector_store, embedder):
"""Test search with top_k=0."""
chunk = SemanticChunk(content="def test(): pass", metadata={})
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/file.py")
query_embedding = embedder.embed_single("test")
results = vector_store.search_similar(query_embedding, top_k=0)
assert results == []
def test_search_very_high_min_score(self, vector_store, embedder):
"""Test search with very high min_score filters all results."""
chunk = SemanticChunk(content="def hello(): print('world')", metadata={})
chunk.embedding = embedder.embed_single(chunk.content)
vector_store.add_chunk(chunk, "/test/hello.py")
# Query something unrelated with very high threshold
query_embedding = embedder.embed_single("database connection")
results = vector_store.search_similar(query_embedding, min_score=0.99)
# Should filter out since unrelated
assert len(results) == 0
# === Availability Check Tests ===
class TestAvailabilityCheck:
"""Tests for semantic availability checking."""
def test_check_semantic_available(self):
"""Test check_semantic_available function."""
available, error = check_semantic_available()
assert available is True
assert error is None
def test_semantic_available_flag(self):
"""Test SEMANTIC_AVAILABLE flag is True when deps installed."""
assert SEMANTIC_AVAILABLE is True