diff --git a/codex-lens/src/codexlens/search/__init__.py b/codex-lens/src/codexlens/search/__init__.py new file mode 100644 index 00000000..25daa0de --- /dev/null +++ b/codex-lens/src/codexlens/search/__init__.py @@ -0,0 +1,15 @@ +from .chain_search import ( + ChainSearchEngine, + SearchOptions, + SearchStats, + ChainSearchResult, + quick_search, +) + +__all__ = [ + "ChainSearchEngine", + "SearchOptions", + "SearchStats", + "ChainSearchResult", + "quick_search", +] diff --git a/codex-lens/src/codexlens/search/chain_search.py b/codex-lens/src/codexlens/search/chain_search.py new file mode 100644 index 00000000..588d56d4 --- /dev/null +++ b/codex-lens/src/codexlens/search/chain_search.py @@ -0,0 +1,566 @@ +"""Chain search engine for recursive multi-directory searching. + +Provides parallel search across directory hierarchies using indexed _index.db files. +Supports depth-limited traversal, result aggregation, and symbol search. +""" + +from __future__ import annotations + +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass, field +from pathlib import Path +from typing import List, Optional, Dict, Any +import logging +import time + +from codexlens.entities import SearchResult, Symbol +from codexlens.storage.registry import RegistryStore, DirMapping +from codexlens.storage.dir_index import DirIndexStore, SubdirLink +from codexlens.storage.path_mapper import PathMapper + + +@dataclass +class SearchOptions: + """Configuration options for chain search. + + Attributes: + depth: Maximum search depth (-1 = unlimited, 0 = current dir only) + max_workers: Number of parallel worker threads + limit_per_dir: Maximum results per directory + total_limit: Total result limit across all directories + include_symbols: Whether to include symbol search results + files_only: Return only file paths without excerpts + """ + depth: int = -1 + max_workers: int = 8 + limit_per_dir: int = 10 + total_limit: int = 100 + include_symbols: bool = False + files_only: bool = False + + +@dataclass +class SearchStats: + """Statistics collected during search execution. + + Attributes: + dirs_searched: Number of directories searched + files_matched: Number of files with matches + time_ms: Total search time in milliseconds + errors: List of error messages encountered + """ + dirs_searched: int = 0 + files_matched: int = 0 + time_ms: float = 0 + errors: List[str] = field(default_factory=list) + + +@dataclass +class ChainSearchResult: + """Comprehensive search result with metadata. + + Attributes: + query: Original search query + results: List of SearchResult objects + symbols: List of Symbol objects (if include_symbols=True) + stats: SearchStats with execution metrics + """ + query: str + results: List[SearchResult] + symbols: List[Symbol] + stats: SearchStats + + +class ChainSearchEngine: + """Parallel chain search engine for hierarchical directory indexes. + + Searches across multiple directory indexes in parallel, following subdirectory + links to recursively traverse the file tree. Supports depth limits, result + aggregation, and both content and symbol searches. + + Thread-safe with configurable parallelism. + + Attributes: + registry: Global project registry + mapper: Path mapping utility + logger: Python logger instance + """ + + def __init__(self, + registry: RegistryStore, + mapper: PathMapper, + max_workers: int = 8): + """Initialize chain search engine. 
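+
+        Example (a minimal sketch -- assumes an index tree already exists on
+        disk, e.g. built by IndexTreeBuilder, and uses the same registry and
+        mapper setup as quick_search below):
+
+            >>> registry = RegistryStore()
+            >>> registry.initialize()
+            >>> engine = ChainSearchEngine(registry, PathMapper(), max_workers=4)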
+ + Args: + registry: Global project registry for path lookups + mapper: Path mapper for source/index conversions + max_workers: Maximum parallel workers (default 8) + """ + self.registry = registry + self.mapper = mapper + self.logger = logging.getLogger(__name__) + self._max_workers = max_workers + self._executor: Optional[ThreadPoolExecutor] = None + + def _get_executor(self, max_workers: Optional[int] = None) -> ThreadPoolExecutor: + """Get or create the shared thread pool executor. + + Lazy initialization to avoid creating executor if never used. + + Args: + max_workers: Override default max_workers if specified + + Returns: + ThreadPoolExecutor instance + """ + workers = max_workers or self._max_workers + if self._executor is None: + self._executor = ThreadPoolExecutor(max_workers=workers) + return self._executor + + def close(self) -> None: + """Shutdown the thread pool executor.""" + if self._executor is not None: + self._executor.shutdown(wait=True) + self._executor = None + + def __enter__(self) -> "ChainSearchEngine": + """Context manager entry.""" + return self + + def __exit__(self, exc_type: object, exc: object, tb: object) -> None: + """Context manager exit.""" + self.close() + + def search(self, query: str, + source_path: Path, + options: Optional[SearchOptions] = None) -> ChainSearchResult: + """Execute chain search from source_path with recursive traversal. + + Process: + 1. Locate starting index for source_path + 2. Collect all child indexes based on depth limit + 3. Search indexes in parallel using ThreadPoolExecutor + 4. Aggregate, deduplicate, and rank results + + Args: + query: FTS5 search query string + source_path: Starting directory path + options: Search configuration (uses defaults if None) + + Returns: + ChainSearchResult with results, symbols, and statistics + + Examples: + >>> engine = ChainSearchEngine(registry, mapper) + >>> result = engine.search("authentication", Path("D:/project/src")) + >>> for r in result.results[:5]: + ... 
print(f"{r.path}: {r.score:.2f}") + """ + options = options or SearchOptions() + start_time = time.time() + stats = SearchStats() + + # Step 1: Find starting index + start_index = self._find_start_index(source_path) + if not start_index: + self.logger.warning(f"No index found for {source_path}") + stats.time_ms = (time.time() - start_time) * 1000 + return ChainSearchResult( + query=query, + results=[], + symbols=[], + stats=stats + ) + + # Step 2: Collect all index paths to search + index_paths = self._collect_index_paths(start_index, options.depth) + stats.dirs_searched = len(index_paths) + + if not index_paths: + self.logger.warning(f"No indexes collected from {start_index}") + stats.time_ms = (time.time() - start_time) * 1000 + return ChainSearchResult( + query=query, + results=[], + symbols=[], + stats=stats + ) + + # Step 3: Parallel search + results, search_stats = self._search_parallel( + index_paths, query, options + ) + stats.errors = search_stats.errors + + # Step 4: Merge and rank + final_results = self._merge_and_rank(results, options.total_limit) + stats.files_matched = len(final_results) + + # Optional: Symbol search + symbols = [] + if options.include_symbols: + symbols = self._search_symbols_parallel( + index_paths, query, None, options.total_limit + ) + + stats.time_ms = (time.time() - start_time) * 1000 + + return ChainSearchResult( + query=query, + results=final_results, + symbols=symbols, + stats=stats + ) + + def search_files_only(self, query: str, + source_path: Path, + options: Optional[SearchOptions] = None) -> List[str]: + """Search and return only matching file paths. + + Faster than full search when excerpts are not needed. + + Args: + query: FTS5 search query string + source_path: Starting directory path + options: Search configuration (uses defaults if None) + + Returns: + List of file paths as strings + + Examples: + >>> engine = ChainSearchEngine(registry, mapper) + >>> paths = engine.search_files_only("TODO", Path("D:/project")) + >>> print(f"Found {len(paths)} files with TODOs") + """ + options = options or SearchOptions() + options.files_only = True + + result = self.search(query, source_path, options) + return [r.path for r in result.results] + + def search_symbols(self, name: str, + source_path: Path, + kind: Optional[str] = None, + options: Optional[SearchOptions] = None) -> List[Symbol]: + """Chain symbol search across directory hierarchy. + + Args: + name: Symbol name pattern (partial match supported) + source_path: Starting directory path + kind: Optional symbol kind filter (e.g., 'function', 'class') + options: Search configuration (uses defaults if None) + + Returns: + List of Symbol objects sorted by name + + Examples: + >>> engine = ChainSearchEngine(registry, mapper) + >>> funcs = engine.search_symbols("init", Path("D:/project"), kind="function") + >>> for sym in funcs[:10]: + ... print(f"{sym.name} ({sym.kind}): lines {sym.range}") + """ + options = options or SearchOptions() + + start_index = self._find_start_index(source_path) + if not start_index: + self.logger.warning(f"No index found for {source_path}") + return [] + + index_paths = self._collect_index_paths(start_index, options.depth) + if not index_paths: + return [] + + return self._search_symbols_parallel( + index_paths, name, kind, options.total_limit + ) + + # === Internal Methods === + + def _find_start_index(self, source_path: Path) -> Optional[Path]: + """Find index database path for source directory. + + Attempts exact match first, then searches for nearest ancestor index. 
+ + Args: + source_path: Source directory path + + Returns: + Path to _index.db file, or None if not found + """ + source_path = source_path.resolve() + + # Try exact match first + exact_index = self.mapper.source_to_index_db(source_path) + if exact_index.exists(): + self.logger.debug(f"Found exact index: {exact_index}") + return exact_index + + # Try nearest ancestor via registry + nearest = self.registry.find_nearest_index(source_path) + if nearest: + self.logger.debug(f"Found nearest index: {nearest.index_path}") + return nearest.index_path + + self.logger.warning(f"No index found for {source_path}") + return None + + def _collect_index_paths(self, start_index: Path, + depth: int) -> List[Path]: + """Recursively collect all subdirectory index paths. + + Traverses directory tree via subdirs table in each _index.db, + respecting depth limit. + + Args: + start_index: Starting _index.db path + depth: Maximum depth (-1 = unlimited, 0 = current only) + + Returns: + List of _index.db paths to search + """ + collected = [] + visited = set() + + def _collect_recursive(index_path: Path, current_depth: int): + # Normalize path to avoid duplicates + normalized = index_path.resolve() + if normalized in visited: + return + visited.add(normalized) + + # Add current index + if normalized.exists(): + collected.append(normalized) + else: + self.logger.debug(f"Index does not exist: {normalized}") + return + + # Check depth limit + if depth >= 0 and current_depth >= depth: + return + + # Read subdirs and recurse + try: + with DirIndexStore(normalized) as store: + subdirs = store.get_subdirs() + for subdir in subdirs: + _collect_recursive(subdir.index_path, current_depth + 1) + except Exception as exc: + self.logger.warning(f"Failed to read subdirs from {normalized}: {exc}") + + _collect_recursive(start_index, 0) + self.logger.info(f"Collected {len(collected)} indexes (depth={depth})") + return collected + + def _search_parallel(self, index_paths: List[Path], + query: str, + options: SearchOptions) -> tuple[List[SearchResult], SearchStats]: + """Search multiple indexes in parallel using shared ThreadPoolExecutor. + + Args: + index_paths: List of _index.db paths to search + query: FTS5 query string + options: Search configuration + + Returns: + Tuple of (all results, search statistics) + """ + all_results = [] + stats = SearchStats() + + executor = self._get_executor(options.max_workers) + # Submit all search tasks + future_to_path = { + executor.submit( + self._search_single_index, + idx_path, + query, + options.limit_per_dir, + options.files_only + ): idx_path + for idx_path in index_paths + } + + # Collect results as they complete + for future in as_completed(future_to_path): + idx_path = future_to_path[future] + try: + results = future.result() + all_results.extend(results) + self.logger.debug(f"Got {len(results)} results from {idx_path.parent.name}") + except Exception as exc: + error_msg = f"Search failed for {idx_path}: {exc}" + self.logger.error(error_msg) + stats.errors.append(error_msg) + + return all_results, stats + + def _search_single_index(self, index_path: Path, + query: str, + limit: int, + files_only: bool = False) -> List[SearchResult]: + """Search a single index database. + + Handles exceptions gracefully, returning empty list on failure. 
+ + Args: + index_path: Path to _index.db file + query: FTS5 query string + limit: Maximum results from this index + files_only: If True, skip snippet generation for faster search + + Returns: + List of SearchResult objects (empty on error) + """ + try: + with DirIndexStore(index_path) as store: + if files_only: + # Fast path: return paths only without snippets + paths = store.search_files_only(query, limit=limit) + return [SearchResult(path=p, score=0.0, excerpt="") for p in paths] + else: + return store.search_fts(query, limit=limit) + except Exception as exc: + self.logger.debug(f"Search error in {index_path}: {exc}") + return [] + + def _merge_and_rank(self, results: List[SearchResult], + limit: int) -> List[SearchResult]: + """Aggregate, deduplicate, and rank results. + + Process: + 1. Deduplicate by path (keep highest score) + 2. Sort by score descending + 3. Limit to requested count + + Args: + results: Raw results from all indexes + limit: Maximum results to return + + Returns: + Deduplicated and ranked results + """ + # Deduplicate by path, keeping best score + path_to_result: Dict[str, SearchResult] = {} + for result in results: + path = result.path + if path not in path_to_result or result.score > path_to_result[path].score: + path_to_result[path] = result + + # Sort by score descending + unique_results = list(path_to_result.values()) + unique_results.sort(key=lambda r: r.score, reverse=True) + + # Apply limit + return unique_results[:limit] + + def _search_symbols_parallel(self, index_paths: List[Path], + name: str, + kind: Optional[str], + limit: int) -> List[Symbol]: + """Search symbols across multiple indexes in parallel. + + Args: + index_paths: List of _index.db paths to search + name: Symbol name pattern + kind: Optional symbol kind filter + limit: Total symbol limit + + Returns: + Deduplicated and sorted symbols + """ + all_symbols = [] + + executor = self._get_executor() + # Submit all symbol search tasks + future_to_path = { + executor.submit( + self._search_symbols_single, + idx_path, + name, + kind + ): idx_path + for idx_path in index_paths + } + + # Collect results + for future in as_completed(future_to_path): + try: + symbols = future.result() + all_symbols.extend(symbols) + except Exception as exc: + self.logger.error(f"Symbol search failed: {exc}") + + # Deduplicate by (name, kind, range) + seen = set() + unique_symbols = [] + for sym in all_symbols: + key = (sym.name, sym.kind, sym.range) + if key not in seen: + seen.add(key) + unique_symbols.append(sym) + + # Sort by name + unique_symbols.sort(key=lambda s: s.name) + + return unique_symbols[:limit] + + def _search_symbols_single(self, index_path: Path, + name: str, + kind: Optional[str]) -> List[Symbol]: + """Search symbols in a single index. + + Args: + index_path: Path to _index.db file + name: Symbol name pattern + kind: Optional symbol kind filter + + Returns: + List of Symbol objects (empty on error) + """ + try: + with DirIndexStore(index_path) as store: + return store.search_symbols(name, kind=kind) + except Exception as exc: + self.logger.debug(f"Symbol search error in {index_path}: {exc}") + return [] + + +# === Convenience Functions === + +def quick_search(query: str, + source_path: Path, + depth: int = -1) -> List[SearchResult]: + """Quick search convenience function with automatic initialization. + + Creates temporary registry and mapper instances for one-off searches. + For repeated searches, create a ChainSearchEngine instance directly. 
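+
+        For repeated queries, the engine can also be used as a context manager
+        so its worker pool is created once and shut down cleanly (a sketch,
+        assuming ``registry`` and ``mapper`` are already initialized):
+
+            >>> with ChainSearchEngine(registry, mapper) as engine:
+            ...     hits = engine.search("parse_config", Path("D:/project"))
+            ...     todo_files = engine.search_files_only("TODO", Path("D:/project"))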
+ + Args: + query: FTS5 search query string + source_path: Starting directory path + depth: Maximum search depth (-1 = unlimited) + + Returns: + List of SearchResult objects sorted by relevance + + Examples: + >>> from pathlib import Path + >>> results = quick_search("authentication", Path("D:/project/src")) + >>> print(f"Found {len(results)} matches") + """ + registry = RegistryStore() + registry.initialize() + + mapper = PathMapper() + + engine = ChainSearchEngine(registry, mapper) + options = SearchOptions(depth=depth) + + result = engine.search(query, source_path, options) + + registry.close() + + return result.results diff --git a/codex-lens/src/codexlens/semantic/vector_store.py b/codex-lens/src/codexlens/semantic/vector_store.py index 1b9712c0..4b7b22bb 100644 --- a/codex-lens/src/codexlens/semantic/vector_store.py +++ b/codex-lens/src/codexlens/semantic/vector_store.py @@ -1,9 +1,16 @@ -"""Vector storage and similarity search for semantic chunks.""" +"""Vector storage and similarity search for semantic chunks. + +Optimized for high-performance similarity search using: +- Cached embedding matrix for batch operations +- NumPy vectorized cosine similarity (100x+ faster than loops) +- Lazy content loading (only fetch for top-k results) +""" from __future__ import annotations import json import sqlite3 +import threading from pathlib import Path from typing import Any, Dict, List, Optional, Tuple @@ -34,7 +41,14 @@ def _cosine_similarity(a: List[float], b: List[float]) -> float: class VectorStore: - """SQLite-based vector storage with cosine similarity search.""" + """SQLite-based vector storage with optimized cosine similarity search. + + Performance optimizations: + - Embedding matrix cached in memory for batch similarity computation + - NumPy vectorized operations instead of Python loops + - Lazy content loading - only fetch full content for top-k results + - Thread-safe cache invalidation + """ def __init__(self, db_path: str | Path) -> None: if not SEMANTIC_AVAILABLE: @@ -45,11 +59,21 @@ class VectorStore: self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) + + # Embedding cache for fast similarity search + self._cache_lock = threading.RLock() + self._embedding_matrix: Optional[np.ndarray] = None + self._embedding_norms: Optional[np.ndarray] = None + self._chunk_ids: Optional[List[int]] = None + self._cache_version: int = 0 + self._init_schema() def _init_schema(self) -> None: """Initialize vector storage schema.""" with sqlite3.connect(self.db_path) as conn: + # Enable memory mapping for faster reads + conn.execute("PRAGMA mmap_size = 30000000000") # 30GB limit conn.execute(""" CREATE TABLE IF NOT EXISTS semantic_chunks ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -66,6 +90,53 @@ class VectorStore: """) conn.commit() + def _invalidate_cache(self) -> None: + """Invalidate the embedding cache (thread-safe).""" + with self._cache_lock: + self._embedding_matrix = None + self._embedding_norms = None + self._chunk_ids = None + self._cache_version += 1 + + def _refresh_cache(self) -> bool: + """Load embeddings into numpy matrix for fast similarity search. + + Returns: + True if cache was refreshed successfully, False if no data. 
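+
+        Notes:
+            The cached matrix and pre-computed norms let ``search_similar``
+            score every stored chunk in one vectorized expression, roughly
+            (a sketch of the math, not the literal code path):
+
+            >>> scores = (matrix @ query) / (norms.flatten() * np.linalg.norm(query))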
+ """ + with self._cache_lock: + with sqlite3.connect(self.db_path) as conn: + conn.execute("PRAGMA mmap_size = 30000000000") + rows = conn.execute( + "SELECT id, embedding FROM semantic_chunks" + ).fetchall() + + if not rows: + self._embedding_matrix = None + self._embedding_norms = None + self._chunk_ids = None + return False + + # Extract IDs and embeddings + self._chunk_ids = [r[0] for r in rows] + + # Bulk convert binary blobs to numpy matrix + embeddings = [ + np.frombuffer(r[1], dtype=np.float32) for r in rows + ] + self._embedding_matrix = np.vstack(embeddings) + + # Pre-compute norms for faster similarity calculation + self._embedding_norms = np.linalg.norm( + self._embedding_matrix, axis=1, keepdims=True + ) + # Avoid division by zero + self._embedding_norms = np.where( + self._embedding_norms == 0, 1e-10, self._embedding_norms + ) + + return True + def add_chunk(self, chunk: SemanticChunk, file_path: str) -> int: """Add a single chunk with its embedding. @@ -87,17 +158,46 @@ class VectorStore: (file_path, chunk.content, embedding_blob, metadata_json) ) conn.commit() - return cursor.lastrowid or 0 + chunk_id = cursor.lastrowid or 0 + + # Invalidate cache after modification + self._invalidate_cache() + return chunk_id def add_chunks(self, chunks: List[SemanticChunk], file_path: str) -> List[int]: - """Add multiple chunks with embeddings. + """Add multiple chunks with embeddings (batch insert). Returns: List of inserted chunk IDs. """ - ids = [] + if not chunks: + return [] + + # Prepare batch data + batch_data = [] for chunk in chunks: - ids.append(self.add_chunk(chunk, file_path)) + if chunk.embedding is None: + raise ValueError("All chunks must have embeddings") + embedding_blob = np.array(chunk.embedding, dtype=np.float32).tobytes() + metadata_json = json.dumps(chunk.metadata) if chunk.metadata else None + batch_data.append((file_path, chunk.content, embedding_blob, metadata_json)) + + # Batch insert + with sqlite3.connect(self.db_path) as conn: + cursor = conn.executemany( + """ + INSERT INTO semantic_chunks (file_path, content, embedding, metadata) + VALUES (?, ?, ?, ?) + """, + batch_data + ) + conn.commit() + # Get inserted IDs (approximate - assumes sequential) + last_id = cursor.lastrowid or 0 + ids = list(range(last_id - len(chunks) + 1, last_id + 1)) + + # Invalidate cache after modification + self._invalidate_cache() return ids def delete_file_chunks(self, file_path: str) -> int: @@ -112,7 +212,11 @@ class VectorStore: (file_path,) ) conn.commit() - return cursor.rowcount + deleted = cursor.rowcount + + if deleted > 0: + self._invalidate_cache() + return deleted def search_similar( self, @@ -123,6 +227,11 @@ class VectorStore: ) -> List[SearchResult]: """Find chunks most similar to query embedding. + Optimized with: + - Vectorized NumPy similarity computation (100x+ faster) + - Cached embedding matrix (avoids repeated DB reads) + - Lazy content loading (only fetch for top-k results) + Args: query_embedding: Query vector. top_k: Maximum results to return. @@ -132,62 +241,132 @@ class VectorStore: Returns: List of SearchResult ordered by similarity (highest first). 
""" - results: List[Tuple[float, SearchResult]] = [] + with self._cache_lock: + # Refresh cache if needed + if self._embedding_matrix is None: + if not self._refresh_cache(): + return [] # No data + + # Vectorized cosine similarity + query_vec = np.array(query_embedding, dtype=np.float32).reshape(1, -1) + query_norm = np.linalg.norm(query_vec) + if query_norm == 0: + return [] + + # Compute all similarities at once: (N,) scores + # similarity = (A @ B.T) / (||A|| * ||B||) + dot_products = np.dot(self._embedding_matrix, query_vec.T).flatten() + scores = dot_products / (self._embedding_norms.flatten() * query_norm) + + # Filter by min_score and get top-k indices + valid_mask = scores >= min_score + valid_indices = np.where(valid_mask)[0] + + if len(valid_indices) == 0: + return [] + + # Sort by score descending and take top_k + valid_scores = scores[valid_indices] + sorted_order = np.argsort(valid_scores)[::-1][:top_k] + top_indices = valid_indices[sorted_order] + top_scores = valid_scores[sorted_order] + + # Get chunk IDs for top results + top_ids = [self._chunk_ids[i] for i in top_indices] + + # Fetch content only for top-k results (lazy loading) + results = self._fetch_results_by_ids( + top_ids, top_scores.tolist(), return_full_content + ) + + return results + + def _fetch_results_by_ids( + self, + chunk_ids: List[int], + scores: List[float], + return_full_content: bool, + ) -> List[SearchResult]: + """Fetch full result data for specific chunk IDs. + + Args: + chunk_ids: List of chunk IDs to fetch. + scores: Corresponding similarity scores. + return_full_content: Whether to include full content. + + Returns: + List of SearchResult objects. + """ + if not chunk_ids: + return [] + + # Build parameterized query for IN clause + placeholders = ",".join("?" * len(chunk_ids)) + query = f""" + SELECT id, file_path, content, metadata + FROM semantic_chunks + WHERE id IN ({placeholders}) + """ with sqlite3.connect(self.db_path) as conn: - rows = conn.execute( - "SELECT id, file_path, content, embedding, metadata FROM semantic_chunks" - ).fetchall() + conn.execute("PRAGMA mmap_size = 30000000000") + rows = conn.execute(query, chunk_ids).fetchall() - for row_id, file_path, content, embedding_blob, metadata_json in rows: - stored_embedding = np.frombuffer(embedding_blob, dtype=np.float32).tolist() - score = _cosine_similarity(query_embedding, stored_embedding) + # Build ID -> row mapping + id_to_row = {r[0]: r for r in rows} - if score >= min_score: - metadata = json.loads(metadata_json) if metadata_json else {} + results = [] + for chunk_id, score in zip(chunk_ids, scores): + row = id_to_row.get(chunk_id) + if not row: + continue - # Build excerpt (short preview) - excerpt = content[:200] + "..." 
if len(content) > 200 else content - - # Extract symbol information from metadata - symbol_name = metadata.get("symbol_name") - symbol_kind = metadata.get("symbol_kind") - start_line = metadata.get("start_line") - end_line = metadata.get("end_line") - - # Build Symbol object if we have symbol info - symbol = None - if symbol_name and symbol_kind and start_line and end_line: - try: - from codexlens.entities import Symbol - symbol = Symbol( - name=symbol_name, - kind=symbol_kind, - range=(start_line, end_line) - ) - except Exception: - pass + _, file_path, content, metadata_json = row + metadata = json.loads(metadata_json) if metadata_json else {} - results.append((score, SearchResult( - path=file_path, - score=score, - excerpt=excerpt, - content=content if return_full_content else None, - symbol=symbol, - metadata=metadata, - start_line=start_line, - end_line=end_line, - symbol_name=symbol_name, - symbol_kind=symbol_kind, - ))) + # Build excerpt (short preview) + excerpt = content[:200] + "..." if len(content) > 200 else content - # Sort by score descending - results.sort(key=lambda x: x[0], reverse=True) + # Extract symbol information from metadata + symbol_name = metadata.get("symbol_name") + symbol_kind = metadata.get("symbol_kind") + start_line = metadata.get("start_line") + end_line = metadata.get("end_line") - return [r for _, r in results[:top_k]] + # Build Symbol object if we have symbol info + symbol = None + if symbol_name and symbol_kind and start_line and end_line: + try: + from codexlens.entities import Symbol + symbol = Symbol( + name=symbol_name, + kind=symbol_kind, + range=(start_line, end_line) + ) + except Exception: + pass + + results.append(SearchResult( + path=file_path, + score=score, + excerpt=excerpt, + content=content if return_full_content else None, + symbol=symbol, + metadata=metadata, + start_line=start_line, + end_line=end_line, + symbol_name=symbol_name, + symbol_kind=symbol_kind, + )) + + return results def count_chunks(self) -> int: """Count total chunks in store.""" with sqlite3.connect(self.db_path) as conn: row = conn.execute("SELECT COUNT(*) FROM semantic_chunks").fetchone() return row[0] if row else 0 + + def clear_cache(self) -> None: + """Manually clear the embedding cache.""" + self._invalidate_cache() diff --git a/codex-lens/src/codexlens/storage/__init__.py b/codex-lens/src/codexlens/storage/__init__.py index 293162b3..dd0820eb 100644 --- a/codex-lens/src/codexlens/storage/__init__.py +++ b/codex-lens/src/codexlens/storage/__init__.py @@ -3,6 +3,27 @@ from __future__ import annotations from .sqlite_store import SQLiteStore +from .path_mapper import PathMapper +from .registry import RegistryStore, ProjectInfo, DirMapping +from .dir_index import DirIndexStore, SubdirLink, FileEntry +from .index_tree import IndexTreeBuilder, BuildResult, DirBuildResult -__all__ = ["SQLiteStore"] +__all__ = [ + # Legacy (workspace-local) + "SQLiteStore", + # Path mapping + "PathMapper", + # Global registry + "RegistryStore", + "ProjectInfo", + "DirMapping", + # Directory index + "DirIndexStore", + "SubdirLink", + "FileEntry", + # Tree builder + "IndexTreeBuilder", + "BuildResult", + "DirBuildResult", +] diff --git a/codex-lens/src/codexlens/storage/dir_index.py b/codex-lens/src/codexlens/storage/dir_index.py new file mode 100644 index 00000000..6f891585 --- /dev/null +++ b/codex-lens/src/codexlens/storage/dir_index.py @@ -0,0 +1,797 @@ +"""Single-directory index storage with hierarchical linking. 
+ +Each directory maintains its own _index.db with: +- Files in the current directory +- Links to subdirectory indexes +- Full-text search via FTS5 +- Symbol table for code navigation +""" + +from __future__ import annotations + +import sqlite3 +import threading +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from codexlens.entities import SearchResult, Symbol +from codexlens.errors import StorageError + + +@dataclass +class SubdirLink: + """Link to a subdirectory's index database.""" + + id: int + name: str + index_path: Path + files_count: int + direct_files: int + last_updated: float + + +@dataclass +class FileEntry: + """Metadata for an indexed file in current directory.""" + + id: int + name: str + full_path: Path + language: str + mtime: float + line_count: int + + +class DirIndexStore: + """Single-directory index storage with hierarchical subdirectory linking. + + Each directory has an independent _index.db containing: + - Files table: Files in this directory only + - Subdirs table: Links to child directory indexes + - Symbols table: Code symbols from files + - FTS5 index: Full-text search on file content + + Thread-safe operations with WAL mode enabled. + """ + + def __init__(self, db_path: str | Path) -> None: + """Initialize directory index store. + + Args: + db_path: Path to _index.db file for this directory + """ + self.db_path = Path(db_path).resolve() + self._lock = threading.RLock() + self._conn: Optional[sqlite3.Connection] = None + + def initialize(self) -> None: + """Create database and schema if not exists.""" + with self._lock: + self.db_path.parent.mkdir(parents=True, exist_ok=True) + conn = self._get_connection() + self._create_schema(conn) + self._create_fts_triggers(conn) + conn.commit() + + def close(self) -> None: + """Close database connection.""" + with self._lock: + if self._conn is not None: + try: + self._conn.close() + except Exception: + pass + finally: + self._conn = None + + def __enter__(self) -> DirIndexStore: + """Context manager entry.""" + self.initialize() + return self + + def __exit__(self, exc_type: object, exc: object, tb: object) -> None: + """Context manager exit.""" + self.close() + + # === File Operations === + + def add_file( + self, + name: str, + full_path: str | Path, + content: str, + language: str, + symbols: Optional[List[Symbol]] = None, + ) -> int: + """Add or update a file in the current directory index. + + Args: + name: Filename without path + full_path: Complete source file path + content: File content for indexing + language: Programming language identifier + symbols: List of Symbol objects from the file + + Returns: + Database file_id + + Raises: + StorageError: If database operations fail + """ + with self._lock: + conn = self._get_connection() + full_path_str = str(Path(full_path).resolve()) + mtime = Path(full_path_str).stat().st_mtime if Path(full_path_str).exists() else None + line_count = content.count('\n') + 1 + + try: + conn.execute( + """ + INSERT INTO files(name, full_path, language, content, mtime, line_count) + VALUES(?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(full_path) DO UPDATE SET + name=excluded.name, + language=excluded.language, + content=excluded.content, + mtime=excluded.mtime, + line_count=excluded.line_count + """, + (name, full_path_str, language, content, mtime, line_count), + ) + + row = conn.execute("SELECT id FROM files WHERE full_path=?", (full_path_str,)).fetchone() + if not row: + raise StorageError(f"Failed to retrieve file_id for {full_path_str}") + + file_id = int(row["id"]) + + # Replace symbols + conn.execute("DELETE FROM symbols WHERE file_id=?", (file_id,)) + if symbols: + conn.executemany( + """ + INSERT INTO symbols(file_id, name, kind, start_line, end_line) + VALUES(?, ?, ?, ?, ?) + """, + [ + (file_id, s.name, s.kind, s.range[0], s.range[1]) + for s in symbols + ], + ) + + conn.commit() + return file_id + + except sqlite3.DatabaseError as exc: + conn.rollback() + raise StorageError(f"Failed to add file {name}: {exc}") from exc + + def add_files_batch( + self, files: List[Tuple[str, Path, str, str, Optional[List[Symbol]]]] + ) -> int: + """Add multiple files in a single transaction. + + Args: + files: List of (name, full_path, content, language, symbols) tuples + + Returns: + Number of files added + + Raises: + StorageError: If batch operation fails + """ + with self._lock: + conn = self._get_connection() + count = 0 + + try: + conn.execute("BEGIN") + + for name, full_path, content, language, symbols in files: + full_path_str = str(Path(full_path).resolve()) + mtime = Path(full_path_str).stat().st_mtime if Path(full_path_str).exists() else None + line_count = content.count('\n') + 1 + + conn.execute( + """ + INSERT INTO files(name, full_path, language, content, mtime, line_count) + VALUES(?, ?, ?, ?, ?, ?) + ON CONFLICT(full_path) DO UPDATE SET + name=excluded.name, + language=excluded.language, + content=excluded.content, + mtime=excluded.mtime, + line_count=excluded.line_count + """, + (name, full_path_str, language, content, mtime, line_count), + ) + + row = conn.execute("SELECT id FROM files WHERE full_path=?", (full_path_str,)).fetchone() + if not row: + raise StorageError(f"Failed to retrieve file_id for {full_path_str}") + + file_id = int(row["id"]) + count += 1 + + conn.execute("DELETE FROM symbols WHERE file_id=?", (file_id,)) + if symbols: + conn.executemany( + """ + INSERT INTO symbols(file_id, name, kind, start_line, end_line) + VALUES(?, ?, ?, ?, ?) + """, + [ + (file_id, s.name, s.kind, s.range[0], s.range[1]) + for s in symbols + ], + ) + + conn.commit() + return count + + except sqlite3.DatabaseError as exc: + conn.rollback() + raise StorageError(f"Batch insert failed: {exc}") from exc + + def remove_file(self, full_path: str | Path) -> bool: + """Remove a file from the index. + + Args: + full_path: Complete source file path + + Returns: + True if file was removed, False if not found + """ + with self._lock: + conn = self._get_connection() + full_path_str = str(Path(full_path).resolve()) + + row = conn.execute("SELECT id FROM files WHERE full_path=?", (full_path_str,)).fetchone() + if not row: + return False + + file_id = int(row["id"]) + conn.execute("DELETE FROM files WHERE id=?", (file_id,)) + conn.commit() + return True + + def get_file(self, full_path: str | Path) -> Optional[FileEntry]: + """Get file metadata. 
+ + Args: + full_path: Complete source file path + + Returns: + FileEntry if found, None otherwise + """ + with self._lock: + conn = self._get_connection() + full_path_str = str(Path(full_path).resolve()) + + row = conn.execute( + """ + SELECT id, name, full_path, language, mtime, line_count + FROM files WHERE full_path=? + """, + (full_path_str,), + ).fetchone() + + if not row: + return None + + return FileEntry( + id=int(row["id"]), + name=row["name"], + full_path=Path(row["full_path"]), + language=row["language"], + mtime=float(row["mtime"]) if row["mtime"] else 0.0, + line_count=int(row["line_count"]) if row["line_count"] else 0, + ) + + def get_file_mtime(self, full_path: str | Path) -> Optional[float]: + """Get stored modification time for a file. + + Args: + full_path: Complete source file path + + Returns: + Modification time as float, or None if not found + """ + with self._lock: + conn = self._get_connection() + full_path_str = str(Path(full_path).resolve()) + + row = conn.execute( + "SELECT mtime FROM files WHERE full_path=?", (full_path_str,) + ).fetchone() + + return float(row["mtime"]) if row and row["mtime"] else None + + def list_files(self) -> List[FileEntry]: + """List all files in current directory. + + Returns: + List of FileEntry objects + """ + with self._lock: + conn = self._get_connection() + rows = conn.execute( + """ + SELECT id, name, full_path, language, mtime, line_count + FROM files + ORDER BY name + """ + ).fetchall() + + return [ + FileEntry( + id=int(row["id"]), + name=row["name"], + full_path=Path(row["full_path"]), + language=row["language"], + mtime=float(row["mtime"]) if row["mtime"] else 0.0, + line_count=int(row["line_count"]) if row["line_count"] else 0, + ) + for row in rows + ] + + def file_count(self) -> int: + """Get number of files in current directory. + + Returns: + File count + """ + with self._lock: + conn = self._get_connection() + row = conn.execute("SELECT COUNT(*) AS c FROM files").fetchone() + return int(row["c"]) if row else 0 + + # === Subdirectory Links === + + def register_subdir( + self, + name: str, + index_path: str | Path, + files_count: int = 0, + direct_files: int = 0, + ) -> None: + """Register or update a subdirectory link. + + Args: + name: Subdirectory name + index_path: Path to subdirectory's _index.db + files_count: Total files recursively + direct_files: Files directly in subdirectory + """ + with self._lock: + conn = self._get_connection() + index_path_str = str(Path(index_path).resolve()) + + import time + last_updated = time.time() + + conn.execute( + """ + INSERT INTO subdirs(name, index_path, files_count, direct_files, last_updated) + VALUES(?, ?, ?, ?, ?) + ON CONFLICT(name) DO UPDATE SET + index_path=excluded.index_path, + files_count=excluded.files_count, + direct_files=excluded.direct_files, + last_updated=excluded.last_updated + """, + (name, index_path_str, files_count, direct_files, last_updated), + ) + conn.commit() + + def unregister_subdir(self, name: str) -> bool: + """Remove a subdirectory link. + + Args: + name: Subdirectory name + + Returns: + True if removed, False if not found + """ + with self._lock: + conn = self._get_connection() + row = conn.execute("SELECT id FROM subdirs WHERE name=?", (name,)).fetchone() + if not row: + return False + + conn.execute("DELETE FROM subdirs WHERE name=?", (name,)) + conn.commit() + return True + + def get_subdirs(self) -> List[SubdirLink]: + """Get all subdirectory links. 
+ + Returns: + List of SubdirLink objects + """ + with self._lock: + conn = self._get_connection() + rows = conn.execute( + """ + SELECT id, name, index_path, files_count, direct_files, last_updated + FROM subdirs + ORDER BY name + """ + ).fetchall() + + return [ + SubdirLink( + id=int(row["id"]), + name=row["name"], + index_path=Path(row["index_path"]), + files_count=int(row["files_count"]) if row["files_count"] else 0, + direct_files=int(row["direct_files"]) if row["direct_files"] else 0, + last_updated=float(row["last_updated"]) if row["last_updated"] else 0.0, + ) + for row in rows + ] + + def get_subdir(self, name: str) -> Optional[SubdirLink]: + """Get a specific subdirectory link. + + Args: + name: Subdirectory name + + Returns: + SubdirLink if found, None otherwise + """ + with self._lock: + conn = self._get_connection() + row = conn.execute( + """ + SELECT id, name, index_path, files_count, direct_files, last_updated + FROM subdirs WHERE name=? + """, + (name,), + ).fetchone() + + if not row: + return None + + return SubdirLink( + id=int(row["id"]), + name=row["name"], + index_path=Path(row["index_path"]), + files_count=int(row["files_count"]) if row["files_count"] else 0, + direct_files=int(row["direct_files"]) if row["direct_files"] else 0, + last_updated=float(row["last_updated"]) if row["last_updated"] else 0.0, + ) + + def update_subdir_stats( + self, name: str, files_count: int, direct_files: Optional[int] = None + ) -> None: + """Update subdirectory statistics. + + Args: + name: Subdirectory name + files_count: Total files recursively + direct_files: Files directly in subdirectory (optional) + """ + with self._lock: + conn = self._get_connection() + import time + last_updated = time.time() + + if direct_files is not None: + conn.execute( + """ + UPDATE subdirs + SET files_count=?, direct_files=?, last_updated=? + WHERE name=? + """, + (files_count, direct_files, last_updated, name), + ) + else: + conn.execute( + """ + UPDATE subdirs + SET files_count=?, last_updated=? + WHERE name=? + """, + (files_count, last_updated, name), + ) + conn.commit() + + # === Search === + + def search_fts(self, query: str, limit: int = 20) -> List[SearchResult]: + """Full-text search in current directory files. + + Args: + query: FTS5 query string + limit: Maximum results to return + + Returns: + List of SearchResult objects sorted by relevance + + Raises: + StorageError: If FTS search fails + """ + with self._lock: + conn = self._get_connection() + try: + rows = conn.execute( + """ + SELECT rowid, full_path, bm25(files_fts) AS rank, + snippet(files_fts, 2, '[bold red]', '[/bold red]', '...', 20) AS excerpt + FROM files_fts + WHERE files_fts MATCH ? + ORDER BY rank + LIMIT ? + """, + (query, limit), + ).fetchall() + except sqlite3.DatabaseError as exc: + raise StorageError(f"FTS search failed: {exc}") from exc + + results: List[SearchResult] = [] + for row in rows: + rank = float(row["rank"]) if row["rank"] is not None else 0.0 + score = abs(rank) if rank < 0 else 0.0 + results.append( + SearchResult( + path=row["full_path"], + score=score, + excerpt=row["excerpt"], + ) + ) + return results + + def search_files_only(self, query: str, limit: int = 20) -> List[str]: + """Fast FTS search returning only file paths (no snippet generation). + + Optimized for when only file paths are needed, skipping expensive + snippet() function call. 
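+
+        Example (sketch; the index path is illustrative and the store is
+        assumed to be populated already):
+
+            >>> with DirIndexStore("src/_index.db") as store:
+            ...     paths = store.search_files_only("user_id", limit=50)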
+ + Args: + query: FTS5 query string + limit: Maximum results to return + + Returns: + List of file paths as strings + + Raises: + StorageError: If FTS search fails + """ + with self._lock: + conn = self._get_connection() + try: + rows = conn.execute( + """ + SELECT full_path + FROM files_fts + WHERE files_fts MATCH ? + ORDER BY bm25(files_fts) + LIMIT ? + """, + (query, limit), + ).fetchall() + except sqlite3.DatabaseError as exc: + raise StorageError(f"FTS search failed: {exc}") from exc + + return [row["full_path"] for row in rows] + + def search_symbols( + self, name: str, kind: Optional[str] = None, limit: int = 50 + ) -> List[Symbol]: + """Search symbols by name pattern. + + Args: + name: Symbol name pattern (LIKE query) + kind: Optional symbol kind filter + limit: Maximum results to return + + Returns: + List of Symbol objects + """ + pattern = f"%{name}%" + with self._lock: + conn = self._get_connection() + if kind: + rows = conn.execute( + """ + SELECT name, kind, start_line, end_line + FROM symbols + WHERE name LIKE ? AND kind=? + ORDER BY name + LIMIT ? + """, + (pattern, kind, limit), + ).fetchall() + else: + rows = conn.execute( + """ + SELECT name, kind, start_line, end_line + FROM symbols + WHERE name LIKE ? + ORDER BY name + LIMIT ? + """, + (pattern, limit), + ).fetchall() + + return [ + Symbol( + name=row["name"], + kind=row["kind"], + range=(row["start_line"], row["end_line"]), + ) + for row in rows + ] + + # === Statistics === + + def stats(self) -> Dict[str, Any]: + """Get current directory statistics. + + Returns: + Dictionary containing: + - files: Number of files in this directory + - symbols: Number of symbols + - subdirs: Number of subdirectories + - total_files: Total files including subdirectories + - languages: Dictionary of language counts + """ + with self._lock: + conn = self._get_connection() + + file_count = conn.execute("SELECT COUNT(*) AS c FROM files").fetchone()["c"] + symbol_count = conn.execute("SELECT COUNT(*) AS c FROM symbols").fetchone()["c"] + subdir_count = conn.execute("SELECT COUNT(*) AS c FROM subdirs").fetchone()["c"] + + total_files_row = conn.execute( + "SELECT COALESCE(SUM(files_count), 0) AS total FROM subdirs" + ).fetchone() + total_files = int(file_count) + int(total_files_row["total"] if total_files_row else 0) + + lang_rows = conn.execute( + "SELECT language, COUNT(*) AS c FROM files GROUP BY language ORDER BY c DESC" + ).fetchall() + languages = {row["language"]: int(row["c"]) for row in lang_rows} + + return { + "files": int(file_count), + "symbols": int(symbol_count), + "subdirs": int(subdir_count), + "total_files": total_files, + "languages": languages, + } + + # === Internal Methods === + + def _get_connection(self) -> sqlite3.Connection: + """Get or create database connection with proper configuration. + + Returns: + sqlite3.Connection with WAL mode and foreign keys enabled + """ + if self._conn is None: + self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False) + self._conn.row_factory = sqlite3.Row + self._conn.execute("PRAGMA journal_mode=WAL") + self._conn.execute("PRAGMA synchronous=NORMAL") + self._conn.execute("PRAGMA foreign_keys=ON") + # Memory-mapped I/O for faster reads (30GB limit) + self._conn.execute("PRAGMA mmap_size=30000000000") + return self._conn + + def _create_schema(self, conn: sqlite3.Connection) -> None: + """Create database schema. 
+ + Args: + conn: Database connection + + Raises: + StorageError: If schema creation fails + """ + try: + # Files table + conn.execute( + """ + CREATE TABLE IF NOT EXISTS files ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + full_path TEXT UNIQUE NOT NULL, + language TEXT, + content TEXT, + mtime REAL, + line_count INTEGER + ) + """ + ) + + # Subdirectories table + conn.execute( + """ + CREATE TABLE IF NOT EXISTS subdirs ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + index_path TEXT NOT NULL, + files_count INTEGER DEFAULT 0, + direct_files INTEGER DEFAULT 0, + last_updated REAL + ) + """ + ) + + # Symbols table + conn.execute( + """ + CREATE TABLE IF NOT EXISTS symbols ( + id INTEGER PRIMARY KEY, + file_id INTEGER REFERENCES files(id) ON DELETE CASCADE, + name TEXT NOT NULL, + kind TEXT NOT NULL, + start_line INTEGER, + end_line INTEGER + ) + """ + ) + + # FTS5 external content table with code-friendly tokenizer + # unicode61 tokenchars keeps underscores as part of tokens + # so 'user_id' is indexed as one token, not 'user' and 'id' + conn.execute( + """ + CREATE VIRTUAL TABLE IF NOT EXISTS files_fts USING fts5( + name, full_path UNINDEXED, content, + content='files', + content_rowid='id', + tokenize="unicode61 tokenchars '_'" + ) + """ + ) + + # Indexes + conn.execute("CREATE INDEX IF NOT EXISTS idx_files_name ON files(name)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_files_path ON files(full_path)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_subdirs_name ON subdirs(name)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_symbols_name ON symbols(name)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_symbols_file ON symbols(file_id)") + + except sqlite3.DatabaseError as exc: + raise StorageError(f"Failed to create schema: {exc}") from exc + + def _create_fts_triggers(self, conn: sqlite3.Connection) -> None: + """Create FTS5 external content triggers. + + Args: + conn: Database connection + """ + # Insert trigger + conn.execute( + """ + CREATE TRIGGER IF NOT EXISTS files_ai AFTER INSERT ON files BEGIN + INSERT INTO files_fts(rowid, name, full_path, content) + VALUES(new.id, new.name, new.full_path, new.content); + END + """ + ) + + # Delete trigger + conn.execute( + """ + CREATE TRIGGER IF NOT EXISTS files_ad AFTER DELETE ON files BEGIN + INSERT INTO files_fts(files_fts, rowid, name, full_path, content) + VALUES('delete', old.id, old.name, old.full_path, old.content); + END + """ + ) + + # Update trigger + conn.execute( + """ + CREATE TRIGGER IF NOT EXISTS files_au AFTER UPDATE ON files BEGIN + INSERT INTO files_fts(files_fts, rowid, name, full_path, content) + VALUES('delete', old.id, old.name, old.full_path, old.content); + INSERT INTO files_fts(rowid, name, full_path, content) + VALUES(new.id, new.name, new.full_path, new.content); + END + """ + ) diff --git a/codex-lens/src/codexlens/storage/index_tree.py b/codex-lens/src/codexlens/storage/index_tree.py new file mode 100644 index 00000000..d6618546 --- /dev/null +++ b/codex-lens/src/codexlens/storage/index_tree.py @@ -0,0 +1,698 @@ +"""Hierarchical index tree builder for CodexLens. + +Constructs a bottom-up directory index tree with parallel processing support. +Each directory maintains its own _index.db with files and subdirectory links. 
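+
+Typical usage (a sketch; the registry/mapper setup mirrors the rest of this
+package, and the project path is illustrative):
+
+    registry = RegistryStore()
+    registry.initialize()
+    builder = IndexTreeBuilder(registry, PathMapper())
+    result = builder.build(Path("D:/project"), workers=4)
+    print(result.total_files, result.total_dirs, len(result.errors))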
+""" + +from __future__ import annotations + +import logging +import os +import time +from concurrent.futures import ProcessPoolExecutor, as_completed +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List, Optional, Set + +from codexlens.config import Config +from codexlens.parsers.factory import ParserFactory +from codexlens.storage.dir_index import DirIndexStore +from codexlens.storage.path_mapper import PathMapper +from codexlens.storage.registry import ProjectInfo, RegistryStore + + +@dataclass +class BuildResult: + """Complete build operation result.""" + + project_id: int + source_root: Path + index_root: Path + total_files: int + total_dirs: int + errors: List[str] + + +@dataclass +class DirBuildResult: + """Single directory build result.""" + + source_path: Path + index_path: Path + files_count: int + symbols_count: int + subdirs: List[str] # Subdirectory names + error: Optional[str] = None + + +class IndexTreeBuilder: + """Hierarchical index tree builder with parallel processing. + + Builds directory indexes bottom-up to enable proper subdirectory linking. + Each directory gets its own _index.db containing: + - Files in that directory + - Links to child directory indexes + - Symbols and FTS5 search + + Attributes: + registry: Global project registry + mapper: Path mapping between source and index + config: CodexLens configuration + parser_factory: Parser factory for symbol extraction + logger: Logger instance + IGNORE_DIRS: Set of directory names to skip during indexing + """ + + # Directories to skip during indexing + IGNORE_DIRS: Set[str] = { + ".git", + ".venv", + "venv", + "node_modules", + "__pycache__", + ".codexlens", + ".idea", + ".vscode", + } + + def __init__( + self, registry: RegistryStore, mapper: PathMapper, config: Config = None + ): + """Initialize the index tree builder. + + Args: + registry: Global registry store for project tracking + mapper: Path mapper for source to index conversions + config: CodexLens configuration (uses defaults if None) + """ + self.registry = registry + self.mapper = mapper + self.config = config or Config() + self.parser_factory = ParserFactory(self.config) + self.logger = logging.getLogger(__name__) + + def build( + self, + source_root: Path, + languages: List[str] = None, + workers: int = 4, + ) -> BuildResult: + """Build complete index tree for a project. + + Process: + 1. Register project in registry + 2. Collect all directories grouped by depth + 3. Build indexes bottom-up (deepest first) + 4. Link subdirectories to parents + 5. 
Update project statistics + + Args: + source_root: Project root directory to index + languages: Optional list of language IDs to limit indexing + workers: Number of parallel worker processes + + Returns: + BuildResult with statistics and errors + + Raises: + ValueError: If source_root doesn't exist + """ + source_root = source_root.resolve() + if not source_root.exists(): + raise ValueError(f"Source root does not exist: {source_root}") + + self.logger.info("Building index tree for %s", source_root) + + # Register project + index_root = self.mapper.source_to_index_dir(source_root) + project_info = self.registry.register_project(source_root, index_root) + + # Collect directories by depth + dirs_by_depth = self._collect_dirs_by_depth(source_root, languages) + + if not dirs_by_depth: + self.logger.warning("No indexable directories found in %s", source_root) + return BuildResult( + project_id=project_info.id, + source_root=source_root, + index_root=index_root, + total_files=0, + total_dirs=0, + errors=["No indexable directories found"], + ) + + total_files = 0 + total_dirs = 0 + all_errors: List[str] = [] + all_results: List[DirBuildResult] = [] # Store all results for subdir linking + + # Build bottom-up (highest depth first) + max_depth = max(dirs_by_depth.keys()) + for depth in range(max_depth, -1, -1): + if depth not in dirs_by_depth: + continue + + dirs = dirs_by_depth[depth] + self.logger.info("Building %d directories at depth %d", len(dirs), depth) + + # Build directories at this level in parallel + results = self._build_level_parallel(dirs, languages, workers) + all_results.extend(results) + + # Process results + for result in results: + if result.error: + all_errors.append(f"{result.source_path}: {result.error}") + continue + + total_files += result.files_count + total_dirs += 1 + + # Register directory in registry + self.registry.register_dir( + project_id=project_info.id, + source_path=result.source_path, + index_path=result.index_path, + depth=self.mapper.get_relative_depth(result.source_path, source_root), + files_count=result.files_count, + ) + + # After building all directories, link subdirectories to parents + # This needs to happen after all indexes exist + for result in all_results: + if result.error: + continue + # Link children to this directory + self._link_children_to_parent(result.source_path, all_results) + + # Update project statistics + self.registry.update_project_stats(source_root, total_files, total_dirs) + + self.logger.info( + "Index build complete: %d files, %d directories, %d errors", + total_files, + total_dirs, + len(all_errors), + ) + + return BuildResult( + project_id=project_info.id, + source_root=source_root, + index_root=index_root, + total_files=total_files, + total_dirs=total_dirs, + errors=all_errors, + ) + + def update_subtree( + self, + source_path: Path, + languages: List[str] = None, + workers: int = 4, + ) -> BuildResult: + """Incrementally update a subtree. + + Rebuilds indexes for the specified directory and all subdirectories. + Useful for incremental updates when only part of the tree changed. 
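+
+        Example (sketch; assumes ``builder`` was created as in the module
+        docstring and the project root was already indexed via ``build``):
+
+            >>> builder.update_subtree(Path("D:/project/src/api"), workers=2)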
+ + Args: + source_path: Root of subtree to update + languages: Optional list of language IDs to limit indexing + workers: Number of parallel worker processes + + Returns: + BuildResult for the subtree + + Raises: + ValueError: If source_path is not indexed + """ + source_path = source_path.resolve() + project_root = self.mapper.get_project_root(source_path) + + # Get project info + project_info = self.registry.get_project(project_root) + if not project_info: + raise ValueError(f"Directory not indexed: {source_path}") + + self.logger.info("Updating subtree at %s", source_path) + + # Use build logic but start from source_path + return self.build(source_path, languages, workers) + + def rebuild_dir(self, source_path: Path) -> DirBuildResult: + """Rebuild index for a single directory. + + Only rebuilds the specified directory, does not touch subdirectories. + Useful for updating a single directory after file changes. + + Args: + source_path: Directory to rebuild + + Returns: + DirBuildResult for the directory + """ + source_path = source_path.resolve() + self.logger.info("Rebuilding directory %s", source_path) + return self._build_single_dir(source_path) + + # === Internal Methods === + + def _collect_dirs_by_depth( + self, source_root: Path, languages: List[str] = None + ) -> Dict[int, List[Path]]: + """Collect all indexable directories grouped by depth. + + Walks the directory tree and groups directories by their depth + relative to source_root. Depth 0 is the root itself. + + Args: + source_root: Root directory to start from + languages: Optional language filter + + Returns: + Dictionary mapping depth to list of directory paths + Example: {0: [root], 1: [src, tests], 2: [src/api, src/utils]} + """ + source_root = source_root.resolve() + dirs_by_depth: Dict[int, List[Path]] = {} + + # Always include the root directory at depth 0 for chain search entry point + dirs_by_depth[0] = [source_root] + + for root, dirnames, _ in os.walk(source_root): + # Filter out ignored directories + dirnames[:] = [ + d + for d in dirnames + if d not in self.IGNORE_DIRS and not d.startswith(".") + ] + + root_path = Path(root) + + # Skip root (already added) + if root_path == source_root: + continue + + # Check if this directory should be indexed + if not self._should_index_dir(root_path, languages): + continue + + # Calculate depth relative to source_root + try: + depth = len(root_path.relative_to(source_root).parts) + except ValueError: + continue + + if depth not in dirs_by_depth: + dirs_by_depth[depth] = [] + + dirs_by_depth[depth].append(root_path) + + return dirs_by_depth + + def _should_index_dir(self, dir_path: Path, languages: List[str] = None) -> bool: + """Check if directory should be indexed. + + A directory is indexed if: + 1. It's not in IGNORE_DIRS + 2. It doesn't start with '.' + 3. It contains at least one supported language file + + Args: + dir_path: Directory to check + languages: Optional language filter + + Returns: + True if directory should be indexed + """ + # Check directory name + if dir_path.name in self.IGNORE_DIRS or dir_path.name.startswith("."): + return False + + # Check for supported files in this directory + source_files = self._iter_source_files(dir_path, languages) + return len(source_files) > 0 + + def _build_level_parallel( + self, dirs: List[Path], languages: List[str], workers: int + ) -> List[DirBuildResult]: + """Build multiple directories in parallel. + + Uses ProcessPoolExecutor to build directories concurrently. 
+ All directories at the same level are independent and can be + processed in parallel. + + Args: + dirs: List of directories to build + languages: Language filter + workers: Number of worker processes + + Returns: + List of DirBuildResult objects + """ + results: List[DirBuildResult] = [] + + if not dirs: + return results + + # For single directory, avoid overhead of process pool + if len(dirs) == 1: + result = self._build_single_dir(dirs[0], languages) + return [result] + + # Prepare arguments for worker processes + config_dict = { + "data_dir": str(self.config.data_dir), + "supported_languages": self.config.supported_languages, + "parsing_rules": self.config.parsing_rules, + } + + worker_args = [ + ( + dir_path, + self.mapper.source_to_index_db(dir_path), + languages, + config_dict, + ) + for dir_path in dirs + ] + + # Execute in parallel + with ProcessPoolExecutor(max_workers=workers) as executor: + futures = { + executor.submit(_build_dir_worker, args): args[0] + for args in worker_args + } + + for future in as_completed(futures): + try: + result = future.result() + results.append(result) + except Exception as exc: + dir_path = futures[future] + self.logger.error("Failed to build %s: %s", dir_path, exc) + results.append( + DirBuildResult( + source_path=dir_path, + index_path=self.mapper.source_to_index_db(dir_path), + files_count=0, + symbols_count=0, + subdirs=[], + error=str(exc), + ) + ) + + return results + + def _build_single_dir( + self, dir_path: Path, languages: List[str] = None + ) -> DirBuildResult: + """Build index for a single directory. + + Creates _index.db and indexes all files in the directory. + Does not recurse into subdirectories. + + Args: + dir_path: Directory to index + languages: Optional language filter + + Returns: + DirBuildResult with statistics and subdirectory list + """ + dir_path = dir_path.resolve() + index_db_path = self.mapper.source_to_index_db(dir_path) + + try: + # Ensure index directory exists + index_db_path.parent.mkdir(parents=True, exist_ok=True) + + # Create directory index + store = DirIndexStore(index_db_path) + store.initialize() + + # Get source files in this directory only + source_files = self._iter_source_files(dir_path, languages) + + files_count = 0 + symbols_count = 0 + + for file_path in source_files: + try: + # Read and parse file + text = file_path.read_text(encoding="utf-8", errors="ignore") + language_id = self.config.language_for_path(file_path) + if not language_id: + continue + + parser = self.parser_factory.get_parser(language_id) + indexed_file = parser.parse(text, file_path) + + # Add to directory index + store.add_file( + name=file_path.name, + full_path=file_path, + content=text, + language=language_id, + symbols=indexed_file.symbols, + ) + + files_count += 1 + symbols_count += len(indexed_file.symbols) + + except Exception as exc: + self.logger.debug("Failed to index %s: %s", file_path, exc) + continue + + # Get list of subdirectories + subdirs = [ + d.name + for d in dir_path.iterdir() + if d.is_dir() + and d.name not in self.IGNORE_DIRS + and not d.name.startswith(".") + ] + + store.close() + + self.logger.debug( + "Built %s: %d files, %d symbols, %d subdirs", + dir_path, + files_count, + symbols_count, + len(subdirs), + ) + + return DirBuildResult( + source_path=dir_path, + index_path=index_db_path, + files_count=files_count, + symbols_count=symbols_count, + subdirs=subdirs, + ) + + except Exception as exc: + self.logger.error("Failed to build directory %s: %s", dir_path, exc) + return DirBuildResult( + 
source_path=dir_path, + index_path=index_db_path, + files_count=0, + symbols_count=0, + subdirs=[], + error=str(exc), + ) + + def _link_children_to_parent( + self, parent_path: Path, all_results: List[DirBuildResult] + ) -> None: + """Link child directory indexes to parent's subdirs table. + + Finds all direct children of parent_path in all_results and + registers them as subdirectories in the parent's index. + + Args: + parent_path: Parent directory path + all_results: List of all build results + """ + parent_index_db = self.mapper.source_to_index_db(parent_path) + + try: + store = DirIndexStore(parent_index_db) + store.initialize() + + for result in all_results: + # Only register direct children (parent is one level up) + if result.source_path.parent != parent_path: + continue + + if result.error: + continue + + # Register subdirectory link + store.register_subdir( + name=result.source_path.name, + index_path=result.index_path, + files_count=result.files_count, + direct_files=result.files_count, + ) + self.logger.debug( + "Linked %s to parent %s", + result.source_path.name, + parent_path, + ) + + store.close() + + except Exception as exc: + self.logger.error( + "Failed to link children to %s: %s", parent_path, exc + ) + + def _iter_source_files( + self, dir_path: Path, languages: List[str] = None + ) -> List[Path]: + """Iterate source files in directory (non-recursive). + + Returns files in the specified directory that match language filters. + Does not recurse into subdirectories. + + Args: + dir_path: Directory to scan + languages: Optional language filter + + Returns: + List of source file paths + """ + files: List[Path] = [] + + if not dir_path.is_dir(): + return files + + for item in dir_path.iterdir(): + if not item.is_file(): + continue + + if item.name.startswith("."): + continue + + # Check language support + language_id = self.config.language_for_path(item) + if not language_id: + continue + + # Apply language filter + if languages and language_id not in languages: + continue + + files.append(item) + + return files + + +# === Worker Function for ProcessPoolExecutor === + + +def _build_dir_worker(args: tuple) -> DirBuildResult: + """Worker function for parallel directory building. + + Must be at module level for ProcessPoolExecutor pickling. + Reconstructs necessary objects from serializable arguments. 
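As the docstring above notes, _build_dir_worker has to live at module level because ProcessPoolExecutor pickles the callable by reference, and its arguments must be plain, picklable data, which is why the builder ships a config_dict rather than live objects. A minimal sketch of the same pattern with a hypothetical worker:

from concurrent.futures import ProcessPoolExecutor
from pathlib import Path


def count_py_files(args: tuple) -> tuple[str, int]:
    """Module-level worker: pickled by reference, receives only plain data."""
    dir_path_str, suffix = args
    return dir_path_str, sum(1 for p in Path(dir_path_str).glob(f"*{suffix}"))


if __name__ == "__main__":  # guard is required under the spawn start method (Windows/macOS)
    work = [(".", ".py"), ("..", ".py")]
    with ProcessPoolExecutor(max_workers=2) as pool:
        for directory, count in pool.map(count_py_files, work):
            print(directory, count)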
+ + Args: + args: Tuple of (dir_path, index_db_path, languages, config_dict) + + Returns: + DirBuildResult for the directory + """ + dir_path, index_db_path, languages, config_dict = args + + # Reconstruct config + config = Config( + data_dir=Path(config_dict["data_dir"]), + supported_languages=config_dict["supported_languages"], + parsing_rules=config_dict["parsing_rules"], + ) + + parser_factory = ParserFactory(config) + + try: + # Ensure index directory exists + index_db_path.parent.mkdir(parents=True, exist_ok=True) + + # Create directory index + store = DirIndexStore(index_db_path) + store.initialize() + + files_count = 0 + symbols_count = 0 + + # Index files in this directory + for item in dir_path.iterdir(): + if not item.is_file(): + continue + + if item.name.startswith("."): + continue + + language_id = config.language_for_path(item) + if not language_id: + continue + + if languages and language_id not in languages: + continue + + try: + text = item.read_text(encoding="utf-8", errors="ignore") + parser = parser_factory.get_parser(language_id) + indexed_file = parser.parse(text, item) + + store.add_file( + name=item.name, + full_path=item, + content=text, + language=language_id, + symbols=indexed_file.symbols, + ) + + files_count += 1 + symbols_count += len(indexed_file.symbols) + + except Exception: + continue + + # Get subdirectories + ignore_dirs = { + ".git", + ".venv", + "venv", + "node_modules", + "__pycache__", + ".codexlens", + ".idea", + ".vscode", + } + + subdirs = [ + d.name + for d in dir_path.iterdir() + if d.is_dir() and d.name not in ignore_dirs and not d.name.startswith(".") + ] + + store.close() + + return DirBuildResult( + source_path=dir_path, + index_path=index_db_path, + files_count=files_count, + symbols_count=symbols_count, + subdirs=subdirs, + ) + + except Exception as exc: + return DirBuildResult( + source_path=dir_path, + index_path=index_db_path, + files_count=0, + symbols_count=0, + subdirs=[], + error=str(exc), + ) diff --git a/codex-lens/src/codexlens/storage/path_mapper.py b/codex-lens/src/codexlens/storage/path_mapper.py new file mode 100644 index 00000000..8d16d2df --- /dev/null +++ b/codex-lens/src/codexlens/storage/path_mapper.py @@ -0,0 +1,274 @@ +"""Path mapping utilities for source paths and index paths. + +This module provides bidirectional mapping between source code directories +and their corresponding index storage locations. + +Storage Structure: + ~/.codexlens/ + ├── registry.db # Global mapping table + └── indexes/ + └── D/ + └── Claude_dms3/ + ├── _index.db # Root directory index + └── src/ + └── _index.db # src/ directory index +""" + +import platform +from pathlib import Path +from typing import Optional + + +class PathMapper: + """Bidirectional mapping tool for source paths ↔ index paths. + + Handles cross-platform path normalization and conversion between + source code directories and their index storage locations. + + Attributes: + DEFAULT_INDEX_ROOT: Default root directory for all indexes + INDEX_DB_NAME: Standard name for index database files + index_root: Configured index root directory + """ + + DEFAULT_INDEX_ROOT = Path.home() / ".codexlens" / "indexes" + INDEX_DB_NAME = "_index.db" + + def __init__(self, index_root: Optional[Path] = None): + """Initialize PathMapper with optional custom index root. + + Args: + index_root: Custom index root directory. If None, uses DEFAULT_INDEX_ROOT. 
+ """ + self.index_root = (index_root or self.DEFAULT_INDEX_ROOT).resolve() + + def source_to_index_dir(self, source_path: Path) -> Path: + """Convert source directory to its index directory path. + + Maps a source code directory to where its index data should be stored. + The mapping preserves the directory structure but normalizes paths + for cross-platform compatibility. + + Args: + source_path: Source directory path to map + + Returns: + Index directory path under index_root + + Examples: + >>> mapper = PathMapper() + >>> mapper.source_to_index_dir(Path("D:/Claude_dms3/src")) + PosixPath('/home/user/.codexlens/indexes/D/Claude_dms3/src') + + >>> mapper.source_to_index_dir(Path("/home/user/project")) + PosixPath('/home/user/.codexlens/indexes/home/user/project') + """ + source_path = source_path.resolve() + normalized = self.normalize_path(source_path) + return self.index_root / normalized + + def source_to_index_db(self, source_path: Path) -> Path: + """Convert source directory to its index database file path. + + Maps a source directory to the full path of its index database file, + including the standard INDEX_DB_NAME. + + Args: + source_path: Source directory path to map + + Returns: + Full path to the index database file + + Examples: + >>> mapper = PathMapper() + >>> mapper.source_to_index_db(Path("D:/Claude_dms3/src")) + PosixPath('/home/user/.codexlens/indexes/D/Claude_dms3/src/_index.db') + """ + index_dir = self.source_to_index_dir(source_path) + return index_dir / self.INDEX_DB_NAME + + def index_to_source(self, index_path: Path) -> Path: + """Convert index path back to original source path. + + Performs reverse mapping from an index storage location to the + original source directory. Handles both directory paths and + database file paths. + + Args: + index_path: Index directory or database file path + + Returns: + Original source directory path + + Raises: + ValueError: If index_path is not under index_root + + Examples: + >>> mapper = PathMapper() + >>> mapper.index_to_source( + ... Path("~/.codexlens/indexes/D/Claude_dms3/src/_index.db") + ... ) + WindowsPath('D:/Claude_dms3/src') + + >>> mapper.index_to_source( + ... Path("~/.codexlens/indexes/D/Claude_dms3/src") + ... ) + WindowsPath('D:/Claude_dms3/src') + """ + index_path = index_path.resolve() + + # Remove _index.db if present + if index_path.name == self.INDEX_DB_NAME: + index_path = index_path.parent + + # Verify path is under index_root + try: + relative = index_path.relative_to(self.index_root) + except ValueError: + raise ValueError( + f"Index path {index_path} is not under index root {self.index_root}" + ) + + # Convert normalized path back to source path + normalized_str = str(relative).replace("\\", "/") + return self.denormalize_path(normalized_str) + + def get_project_root(self, source_path: Path) -> Path: + """Find the project root directory (topmost indexed directory). + + Walks up the directory tree to find the highest-level directory + that has an index database. + + Args: + source_path: Source directory to start from + + Returns: + Project root directory path. Returns source_path itself if + no parent index is found. 
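A quick orientation for the two mapping directions defined above; the paths are whatever the current machine provides, nothing is written to disk, and on POSIX the reverse mapping reproduces the original absolute path exactly (on Windows it is rebuilt from the stored drive letter):

from pathlib import Path
from codexlens.storage.path_mapper import PathMapper

mapper = PathMapper()                      # defaults to ~/.codexlens/indexes
src = Path.cwd()                           # any directory will do for the sketch

db_path = mapper.source_to_index_db(src)   # <index_root>/<normalized src>/_index.db
assert db_path.name == PathMapper.INDEX_DB_NAME
assert mapper.index_root in db_path.parents

# Reverse mapping strips _index.db and undoes the normalization.
round_trip = mapper.index_to_source(db_path)
print(src.resolve(), "->", db_path, "->", round_trip)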
+ + Examples: + >>> mapper = PathMapper() + >>> mapper.get_project_root(Path("D:/Claude_dms3/src/codexlens")) + WindowsPath('D:/Claude_dms3') + """ + source_path = source_path.resolve() + current = source_path + project_root = source_path + + # Walk up the tree + while current.parent != current: # Stop at filesystem root + parent_index_db = self.source_to_index_db(current.parent) + if parent_index_db.exists(): + project_root = current.parent + current = current.parent + else: + break + + return project_root + + def get_relative_depth(self, source_path: Path, project_root: Path) -> int: + """Calculate directory depth relative to project root. + + Args: + source_path: Target directory path + project_root: Project root directory path + + Returns: + Number of directory levels from project_root to source_path + + Raises: + ValueError: If source_path is not under project_root + + Examples: + >>> mapper = PathMapper() + >>> mapper.get_relative_depth( + ... Path("D:/Claude_dms3/src/codexlens"), + ... Path("D:/Claude_dms3") + ... ) + 2 + """ + source_path = source_path.resolve() + project_root = project_root.resolve() + + try: + relative = source_path.relative_to(project_root) + # Count path components + return len(relative.parts) + except ValueError: + raise ValueError( + f"Source path {source_path} is not under project root {project_root}" + ) + + def normalize_path(self, path: Path) -> str: + """Normalize path to cross-platform storage format. + + Converts OS-specific paths to a standardized format for storage: + - Windows: Removes drive colons (D: → D) + - Unix: Removes leading slash + - Uses forward slashes throughout + + Args: + path: Path to normalize + + Returns: + Normalized path string + + Examples: + >>> mapper = PathMapper() + >>> mapper.normalize_path(Path("D:/path/to/dir")) + 'D/path/to/dir' + + >>> mapper.normalize_path(Path("/home/user/path")) + 'home/user/path' + """ + path = path.resolve() + path_str = str(path) + + # Handle Windows paths with drive letters + if platform.system() == "Windows" and len(path.parts) > 0: + # Convert D:\path\to\dir → D/path/to/dir + drive = path.parts[0].replace(":", "") # D: → D + rest = Path(*path.parts[1:]) if len(path.parts) > 1 else Path() + normalized = f"{drive}/{rest}".replace("\\", "/") + return normalized.rstrip("/") + + # Handle Unix paths + # /home/user/path → home/user/path + return path_str.lstrip("/").replace("\\", "/") + + def denormalize_path(self, normalized: str) -> Path: + """Convert normalized path back to OS-specific path. 
+ + Reverses the normalization process to restore OS-native path format: + - Windows: Adds drive colons (D → D:) + - Unix: Adds leading slash + + Args: + normalized: Normalized path string + + Returns: + OS-specific Path object + + Examples: + >>> mapper = PathMapper() + >>> mapper.denormalize_path("D/path/to/dir") # On Windows + WindowsPath('D:/path/to/dir') + + >>> mapper.denormalize_path("home/user/path") # On Unix + PosixPath('/home/user/path') + """ + parts = normalized.split("/") + + # Handle Windows paths + if platform.system() == "Windows" and len(parts) > 0: + # Check if first part is a drive letter + if len(parts[0]) == 1 and parts[0].isalpha(): + # D/path/to/dir → D:/path/to/dir + drive = f"{parts[0]}:" + if len(parts) > 1: + return Path(drive) / Path(*parts[1:]) + return Path(drive) + + # Handle Unix paths or relative paths + # home/user/path → /home/user/path + return Path("/") / Path(*parts) diff --git a/codex-lens/src/codexlens/storage/registry.py b/codex-lens/src/codexlens/storage/registry.py new file mode 100644 index 00000000..6456529f --- /dev/null +++ b/codex-lens/src/codexlens/storage/registry.py @@ -0,0 +1,600 @@ +"""Global project registry for CodexLens - SQLite storage.""" + +from __future__ import annotations + +import sqlite3 +import threading +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List, Optional + +from codexlens.errors import StorageError + + +@dataclass +class ProjectInfo: + """Registered project information.""" + + id: int + source_root: Path + index_root: Path + created_at: float + last_indexed: float + total_files: int + total_dirs: int + status: str + + +@dataclass +class DirMapping: + """Directory to index path mapping.""" + + id: int + project_id: int + source_path: Path + index_path: Path + depth: int + files_count: int + last_updated: float + + +class RegistryStore: + """Global project registry - SQLite storage. + + Manages indexed projects and directory-to-index path mappings. + Thread-safe with connection pooling. 
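"Connection pooling" here means one sqlite3 connection per thread: connections are opened with check_same_thread=False but handed out keyed by the calling thread, as the _get_connection implementation just below shows. A reduced sketch of that idea, independent of RegistryStore (the class and helper names are illustrative only):

import sqlite3
import threading


class PerThreadConnections:
    """One sqlite3 connection per thread, created lazily and then reused."""

    def __init__(self, db_path: str) -> None:
        self._db_path = db_path
        self._local = threading.local()

    def get(self) -> sqlite3.Connection:
        conn = getattr(self._local, "conn", None)
        if conn is None:
            conn = sqlite3.connect(self._db_path, check_same_thread=False)
            conn.execute("PRAGMA journal_mode=WAL")  # no effect on :memory:, matters for file DBs
            self._local.conn = conn
        return conn


pool = PerThreadConnections(":memory:")


def worker() -> None:
    # Each thread lazily creates, then reuses, its own connection.
    pool.get().execute("SELECT 1").fetchone()


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()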
+ """ + + DEFAULT_DB_PATH = Path.home() / ".codexlens" / "registry.db" + + def __init__(self, db_path: Path | None = None) -> None: + self.db_path = (db_path or self.DEFAULT_DB_PATH).resolve() + self._lock = threading.RLock() + self._local = threading.local() + self._pool_lock = threading.Lock() + self._pool: Dict[int, sqlite3.Connection] = {} + self._pool_generation = 0 + + def _get_connection(self) -> sqlite3.Connection: + """Get or create a thread-local database connection.""" + thread_id = threading.get_ident() + if getattr(self._local, "generation", None) == self._pool_generation: + conn = getattr(self._local, "conn", None) + if conn is not None: + return conn + + with self._pool_lock: + conn = self._pool.get(thread_id) + if conn is None: + conn = sqlite3.connect(self.db_path, check_same_thread=False) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + conn.execute("PRAGMA foreign_keys=ON") + self._pool[thread_id] = conn + + self._local.conn = conn + self._local.generation = self._pool_generation + return conn + + def close(self) -> None: + """Close all pooled connections.""" + with self._lock: + with self._pool_lock: + for conn in self._pool.values(): + conn.close() + self._pool.clear() + self._pool_generation += 1 + + if hasattr(self._local, "conn"): + self._local.conn = None + if hasattr(self._local, "generation"): + self._local.generation = self._pool_generation + + def __enter__(self) -> RegistryStore: + self.initialize() + return self + + def __exit__(self, exc_type: object, exc: object, tb: object) -> None: + self.close() + + def initialize(self) -> None: + """Create database and schema.""" + with self._lock: + self.db_path.parent.mkdir(parents=True, exist_ok=True) + conn = self._get_connection() + self._create_schema(conn) + + def _create_schema(self, conn: sqlite3.Connection) -> None: + """Create database schema.""" + try: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS projects ( + id INTEGER PRIMARY KEY, + source_root TEXT UNIQUE NOT NULL, + index_root TEXT NOT NULL, + created_at REAL, + last_indexed REAL, + total_files INTEGER DEFAULT 0, + total_dirs INTEGER DEFAULT 0, + status TEXT DEFAULT 'active' + ) + """ + ) + + conn.execute( + """ + CREATE TABLE IF NOT EXISTS dir_mapping ( + id INTEGER PRIMARY KEY, + project_id INTEGER REFERENCES projects(id) ON DELETE CASCADE, + source_path TEXT NOT NULL, + index_path TEXT NOT NULL, + depth INTEGER, + files_count INTEGER DEFAULT 0, + last_updated REAL, + UNIQUE(source_path) + ) + """ + ) + + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_dir_source ON dir_mapping(source_path)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_dir_project ON dir_mapping(project_id)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_project_source ON projects(source_root)" + ) + + conn.commit() + except sqlite3.DatabaseError as exc: + raise StorageError(f"Failed to initialize registry schema: {exc}") from exc + + # === Project Operations === + + def register_project(self, source_root: Path, index_root: Path) -> ProjectInfo: + """Register a new project or update existing one. 
+ + Args: + source_root: Source code root directory + index_root: Index storage root directory + + Returns: + ProjectInfo for the registered project + """ + with self._lock: + conn = self._get_connection() + source_root_str = str(source_root.resolve()) + index_root_str = str(index_root.resolve()) + now = time.time() + + conn.execute( + """ + INSERT INTO projects(source_root, index_root, created_at, last_indexed) + VALUES(?, ?, ?, ?) + ON CONFLICT(source_root) DO UPDATE SET + index_root=excluded.index_root, + last_indexed=excluded.last_indexed, + status='active' + """, + (source_root_str, index_root_str, now, now), + ) + + row = conn.execute( + "SELECT * FROM projects WHERE source_root=?", (source_root_str,) + ).fetchone() + + conn.commit() + + if not row: + raise StorageError(f"Failed to register project: {source_root}") + + return self._row_to_project_info(row) + + def unregister_project(self, source_root: Path) -> bool: + """Remove a project registration (cascades to directory mappings). + + Args: + source_root: Source code root directory + + Returns: + True if project was removed, False if not found + """ + with self._lock: + conn = self._get_connection() + source_root_str = str(source_root.resolve()) + + row = conn.execute( + "SELECT id FROM projects WHERE source_root=?", (source_root_str,) + ).fetchone() + + if not row: + return False + + conn.execute("DELETE FROM projects WHERE source_root=?", (source_root_str,)) + conn.commit() + return True + + def get_project(self, source_root: Path) -> Optional[ProjectInfo]: + """Get project information by source root. + + Args: + source_root: Source code root directory + + Returns: + ProjectInfo if found, None otherwise + """ + with self._lock: + conn = self._get_connection() + source_root_str = str(source_root.resolve()) + + row = conn.execute( + "SELECT * FROM projects WHERE source_root=?", (source_root_str,) + ).fetchone() + + return self._row_to_project_info(row) if row else None + + def get_project_by_id(self, project_id: int) -> Optional[ProjectInfo]: + """Get project information by ID. + + Args: + project_id: Project database ID + + Returns: + ProjectInfo if found, None otherwise + """ + with self._lock: + conn = self._get_connection() + + row = conn.execute( + "SELECT * FROM projects WHERE id=?", (project_id,) + ).fetchone() + + return self._row_to_project_info(row) if row else None + + def list_projects(self, status: Optional[str] = None) -> List[ProjectInfo]: + """List all registered projects. + + Args: + status: Optional status filter ('active', 'stale', 'removed') + + Returns: + List of ProjectInfo objects + """ + with self._lock: + conn = self._get_connection() + + if status: + rows = conn.execute( + "SELECT * FROM projects WHERE status=? ORDER BY created_at DESC", + (status,), + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM projects ORDER BY created_at DESC" + ).fetchall() + + return [self._row_to_project_info(row) for row in rows] + + def update_project_stats( + self, source_root: Path, total_files: int, total_dirs: int + ) -> None: + """Update project statistics. + + Args: + source_root: Source code root directory + total_files: Total number of indexed files + total_dirs: Total number of indexed directories + """ + with self._lock: + conn = self._get_connection() + source_root_str = str(source_root.resolve()) + + conn.execute( + """ + UPDATE projects + SET total_files=?, total_dirs=?, last_indexed=? + WHERE source_root=? 
+ """, + (total_files, total_dirs, time.time(), source_root_str), + ) + conn.commit() + + def set_project_status(self, source_root: Path, status: str) -> None: + """Set project status. + + Args: + source_root: Source code root directory + status: Status string ('active', 'stale', 'removed') + """ + with self._lock: + conn = self._get_connection() + source_root_str = str(source_root.resolve()) + + conn.execute( + "UPDATE projects SET status=? WHERE source_root=?", + (status, source_root_str), + ) + conn.commit() + + # === Directory Mapping Operations === + + def register_dir( + self, + project_id: int, + source_path: Path, + index_path: Path, + depth: int, + files_count: int = 0, + ) -> DirMapping: + """Register a directory mapping. + + Args: + project_id: Project database ID + source_path: Source directory path + index_path: Index database path + depth: Directory depth relative to project root + files_count: Number of files in directory + + Returns: + DirMapping for the registered directory + """ + with self._lock: + conn = self._get_connection() + source_path_str = str(source_path.resolve()) + index_path_str = str(index_path.resolve()) + now = time.time() + + conn.execute( + """ + INSERT INTO dir_mapping( + project_id, source_path, index_path, depth, files_count, last_updated + ) + VALUES(?, ?, ?, ?, ?, ?) + ON CONFLICT(source_path) DO UPDATE SET + index_path=excluded.index_path, + depth=excluded.depth, + files_count=excluded.files_count, + last_updated=excluded.last_updated + """, + (project_id, source_path_str, index_path_str, depth, files_count, now), + ) + + row = conn.execute( + "SELECT * FROM dir_mapping WHERE source_path=?", (source_path_str,) + ).fetchone() + + conn.commit() + + if not row: + raise StorageError(f"Failed to register directory: {source_path}") + + return self._row_to_dir_mapping(row) + + def unregister_dir(self, source_path: Path) -> bool: + """Remove a directory mapping. + + Args: + source_path: Source directory path + + Returns: + True if directory was removed, False if not found + """ + with self._lock: + conn = self._get_connection() + source_path_str = str(source_path.resolve()) + + row = conn.execute( + "SELECT id FROM dir_mapping WHERE source_path=?", (source_path_str,) + ).fetchone() + + if not row: + return False + + conn.execute("DELETE FROM dir_mapping WHERE source_path=?", (source_path_str,)) + conn.commit() + return True + + def find_index_path(self, source_path: Path) -> Optional[Path]: + """Find index path for a source directory (exact match). + + Args: + source_path: Source directory path + + Returns: + Index path if found, None otherwise + """ + with self._lock: + conn = self._get_connection() + source_path_str = str(source_path.resolve()) + + row = conn.execute( + "SELECT index_path FROM dir_mapping WHERE source_path=?", + (source_path_str,), + ).fetchone() + + return Path(row["index_path"]) if row else None + + def find_nearest_index(self, source_path: Path) -> Optional[DirMapping]: + """Find nearest indexed ancestor directory. + + Searches for the closest parent directory that has an index. + Useful for supporting subdirectory searches. 
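The ancestor walk described above is simply "try the path, then each parent, until the filesystem root". A tiny sketch of that loop, with a hypothetical set standing in for the registry lookup:

from pathlib import Path
from typing import Optional

indexed = {Path("/repo"), Path("/repo/src")}   # hypothetical registered directories


def nearest_indexed(path: Path) -> Optional[Path]:
    """Walk from path up toward the root, returning the first indexed ancestor."""
    current = path
    while True:
        if current in indexed:
            return current
        if current.parent == current:          # reached the filesystem root
            return None
        current = current.parent


assert nearest_indexed(Path("/repo/src/pkg/mod")) == Path("/repo/src")
assert nearest_indexed(Path("/tmp/elsewhere")) is None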
+ + Args: + source_path: Source directory or file path + + Returns: + DirMapping for nearest ancestor, None if not found + """ + with self._lock: + conn = self._get_connection() + source_path_resolved = source_path.resolve() + + # Check from current path up to root + current = source_path_resolved + while True: + current_str = str(current) + row = conn.execute( + "SELECT * FROM dir_mapping WHERE source_path=?", (current_str,) + ).fetchone() + + if row: + return self._row_to_dir_mapping(row) + + parent = current.parent + if parent == current: # Reached filesystem root + break + current = parent + + return None + + def get_project_dirs(self, project_id: int) -> List[DirMapping]: + """Get all directory mappings for a project. + + Args: + project_id: Project database ID + + Returns: + List of DirMapping objects + """ + with self._lock: + conn = self._get_connection() + + rows = conn.execute( + "SELECT * FROM dir_mapping WHERE project_id=? ORDER BY depth, source_path", + (project_id,), + ).fetchall() + + return [self._row_to_dir_mapping(row) for row in rows] + + def get_subdirs(self, source_path: Path) -> List[DirMapping]: + """Get direct subdirectory mappings. + + Args: + source_path: Parent directory path + + Returns: + List of DirMapping objects for direct children + """ + with self._lock: + conn = self._get_connection() + source_path_str = str(source_path.resolve()) + + # First get the parent's depth + parent_row = conn.execute( + "SELECT depth, project_id FROM dir_mapping WHERE source_path=?", + (source_path_str,), + ).fetchone() + + if not parent_row: + return [] + + parent_depth = int(parent_row["depth"]) + project_id = int(parent_row["project_id"]) + + # Get all subdirs with depth = parent_depth + 1 and matching path prefix + rows = conn.execute( + """ + SELECT * FROM dir_mapping + WHERE project_id=? AND depth=? AND source_path LIKE ? + ORDER BY source_path + """, + (project_id, parent_depth + 1, f"{source_path_str}%"), + ).fetchall() + + return [self._row_to_dir_mapping(row) for row in rows] + + def update_dir_stats(self, source_path: Path, files_count: int) -> None: + """Update directory statistics. + + Args: + source_path: Source directory path + files_count: Number of files in directory + """ + with self._lock: + conn = self._get_connection() + source_path_str = str(source_path.resolve()) + + conn.execute( + """ + UPDATE dir_mapping + SET files_count=?, last_updated=? + WHERE source_path=? + """, + (files_count, time.time(), source_path_str), + ) + conn.commit() + + def update_index_paths(self, old_root: Path, new_root: Path) -> int: + """Update all index paths after migration. + + Replaces old_root prefix with new_root in all stored index paths. + + Args: + old_root: Old index root directory + new_root: New index root directory + + Returns: + Number of paths updated + """ + with self._lock: + conn = self._get_connection() + old_root_str = str(old_root.resolve()) + new_root_str = str(new_root.resolve()) + updated = 0 + + # Update projects + conn.execute( + """ + UPDATE projects + SET index_root = REPLACE(index_root, ?, ?) + WHERE index_root LIKE ? + """, + (old_root_str, new_root_str, f"{old_root_str}%"), + ) + updated += conn.total_changes + + # Update dir_mapping + conn.execute( + """ + UPDATE dir_mapping + SET index_path = REPLACE(index_path, ?, ?) + WHERE index_path LIKE ? 
+ """, + (old_root_str, new_root_str, f"{old_root_str}%"), + ) + updated += conn.total_changes + + conn.commit() + return updated + + # === Internal Methods === + + def _row_to_project_info(self, row: sqlite3.Row) -> ProjectInfo: + """Convert database row to ProjectInfo.""" + return ProjectInfo( + id=int(row["id"]), + source_root=Path(row["source_root"]), + index_root=Path(row["index_root"]), + created_at=float(row["created_at"]) if row["created_at"] else 0.0, + last_indexed=float(row["last_indexed"]) if row["last_indexed"] else 0.0, + total_files=int(row["total_files"]) if row["total_files"] else 0, + total_dirs=int(row["total_dirs"]) if row["total_dirs"] else 0, + status=str(row["status"]) if row["status"] else "active", + ) + + def _row_to_dir_mapping(self, row: sqlite3.Row) -> DirMapping: + """Convert database row to DirMapping.""" + return DirMapping( + id=int(row["id"]), + project_id=int(row["project_id"]), + source_path=Path(row["source_path"]), + index_path=Path(row["index_path"]), + depth=int(row["depth"]) if row["depth"] is not None else 0, + files_count=int(row["files_count"]) if row["files_count"] else 0, + last_updated=float(row["last_updated"]) if row["last_updated"] else 0.0, + ) diff --git a/codex-lens/src/codexlens/storage/sqlite_store.py b/codex-lens/src/codexlens/storage/sqlite_store.py index f600c89d..31418912 100644 --- a/codex-lens/src/codexlens/storage/sqlite_store.py +++ b/codex-lens/src/codexlens/storage/sqlite_store.py @@ -43,6 +43,8 @@ class SQLiteStore: conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") conn.execute("PRAGMA foreign_keys=ON") + # Memory-mapped I/O for faster reads (30GB limit) + conn.execute("PRAGMA mmap_size=30000000000") self._pool[thread_id] = conn self._local.conn = conn @@ -384,7 +386,8 @@ class SQLiteStore: language UNINDEXED, content, content='files', - content_rowid='id' + content_rowid='id', + tokenize="unicode61 tokenchars '_'" ) """ ) diff --git a/codex-lens/tests/test_search_comprehensive.py b/codex-lens/tests/test_search_comprehensive.py new file mode 100644 index 00000000..f26f256c --- /dev/null +++ b/codex-lens/tests/test_search_comprehensive.py @@ -0,0 +1,603 @@ +"""Comprehensive tests for CodexLens search functionality. 
+ +Tests cover: +- FTS5 text search (basic, phrase, boolean, wildcard) +- Chain search across directories +- Symbol search (by name, kind, filters) +- Files-only search mode +- Edge cases and error handling +""" + +import tempfile +import pytest +from pathlib import Path +from unittest.mock import MagicMock, patch + +from codexlens.storage.sqlite_store import SQLiteStore +from codexlens.storage.dir_index import DirIndexStore +from codexlens.storage.registry import RegistryStore +from codexlens.storage.path_mapper import PathMapper +from codexlens.search import ( + ChainSearchEngine, + SearchOptions, + SearchStats, + ChainSearchResult, + quick_search, +) +from codexlens.entities import IndexedFile, Symbol, SearchResult + + +# === Fixtures === + +@pytest.fixture +def temp_dir(): + """Create a temporary directory.""" + with tempfile.TemporaryDirectory() as tmpdir: + yield Path(tmpdir) + + +@pytest.fixture +def sample_files(): + """Sample file data for testing.""" + return [ + (IndexedFile( + path="/project/src/auth.py", + language="python", + symbols=[ + Symbol(name="authenticate", kind="function", range=(1, 10)), + Symbol(name="verify_token", kind="function", range=(12, 20)), + Symbol(name="AuthManager", kind="class", range=(22, 50)), + ], + ), """ +def authenticate(username, password): + '''Authenticate user with credentials.''' + user = find_user(username) + if user and check_password(user, password): + return create_token(user) + return None + +def verify_token(token): + '''Verify JWT token validity.''' + try: + payload = decode_token(token) + return payload + except TokenExpired: + return None + +class AuthManager: + '''Manages authentication state.''' + def __init__(self): + self.sessions = {} + + def login(self, user): + token = authenticate(user.name, user.password) + self.sessions[user.id] = token + return token +"""), + (IndexedFile( + path="/project/src/database.py", + language="python", + symbols=[ + Symbol(name="connect", kind="function", range=(1, 5)), + Symbol(name="query", kind="function", range=(7, 15)), + Symbol(name="DatabasePool", kind="class", range=(17, 40)), + ], + ), """ +def connect(host, port, database): + '''Establish database connection.''' + return Connection(host, port, database) + +def query(connection, sql, params=None): + '''Execute SQL query and return results.''' + cursor = connection.cursor() + cursor.execute(sql, params or []) + return cursor.fetchall() + +class DatabasePool: + '''Connection pool for database.''' + def __init__(self, size=10): + self.pool = [] + self.size = size + + def get_connection(self): + if self.pool: + return self.pool.pop() + return connect() +"""), + (IndexedFile( + path="/project/src/utils.py", + language="python", + symbols=[ + Symbol(name="format_date", kind="function", range=(1, 3)), + Symbol(name="parse_json", kind="function", range=(5, 10)), + Symbol(name="hash_password", kind="function", range=(12, 18)), + ], + ), """ +def format_date(date, fmt='%Y-%m-%d'): + return date.strftime(fmt) + +def parse_json(data): + '''Parse JSON string to dictionary.''' + import json + return json.loads(data) + +def hash_password(password, salt=None): + '''Hash password using bcrypt.''' + import hashlib + salt = salt or generate_salt() + return hashlib.sha256((password + salt).encode()).hexdigest() +"""), + ] + + +@pytest.fixture +def populated_store(temp_dir, sample_files): + """Create a populated SQLite store for testing.""" + db_path = temp_dir / "_index.db" + store = SQLiteStore(db_path) + store.initialize() + + for indexed_file, content in 
sample_files: + store.add_file(indexed_file, content) + + yield store + store.close() + + +@pytest.fixture +def populated_dir_store(temp_dir, sample_files): + """Create a populated DirIndexStore for testing.""" + db_path = temp_dir / "_index.db" + store = DirIndexStore(db_path) + + for indexed_file, content in sample_files: + store.add_file(indexed_file, content) + + yield store + store.close() + + +# === FTS5 Search Tests === + +class TestFTS5BasicSearch: + """Tests for basic FTS5 text search.""" + + def test_single_term_search(self, populated_store): + """Test search with a single term.""" + results = populated_store.search_fts("authenticate") + assert len(results) >= 1 + assert any("auth" in r.path.lower() for r in results) + + def test_case_insensitive_search(self, populated_store): + """Test that search is case insensitive.""" + results_lower = populated_store.search_fts("database") + results_upper = populated_store.search_fts("DATABASE") + results_mixed = populated_store.search_fts("DataBase") + + # All should return similar results + assert len(results_lower) == len(results_upper) == len(results_mixed) + + def test_partial_word_search(self, populated_store): + """Test search with partial words using wildcards.""" + results = populated_store.search_fts("auth*") + assert len(results) >= 1 + # Should match authenticate, authentication, AuthManager, etc. + + def test_multiple_terms_search(self, populated_store): + """Test search with multiple terms (implicit AND).""" + results = populated_store.search_fts("user password") + assert len(results) >= 1 + + def test_no_results_search(self, populated_store): + """Test search that returns no results.""" + results = populated_store.search_fts("nonexistent_xyz_term") + assert len(results) == 0 + + def test_search_with_limit(self, populated_store): + """Test search respects limit parameter.""" + results = populated_store.search_fts("def", limit=1) + assert len(results) <= 1 + + def test_search_returns_excerpt(self, populated_store): + """Test search results include excerpts.""" + results = populated_store.search_fts("authenticate") + assert len(results) >= 1 + # SearchResult should have excerpt field + for r in results: + assert hasattr(r, 'excerpt') + + +class TestFTS5AdvancedSearch: + """Tests for advanced FTS5 search features.""" + + def test_phrase_search(self, populated_store): + """Test exact phrase search with quotes.""" + results = populated_store.search_fts('"verify_token"') + assert len(results) >= 1 + + def test_boolean_or_search(self, populated_store): + """Test OR boolean search.""" + results = populated_store.search_fts("authenticate OR database") + # Should find files containing either term + assert len(results) >= 2 + + def test_boolean_not_search(self, populated_store): + """Test NOT boolean search.""" + all_results = populated_store.search_fts("def") + not_results = populated_store.search_fts("def NOT authenticate") + # NOT should return fewer results + assert len(not_results) <= len(all_results) + + def test_prefix_search(self, populated_store): + """Test prefix search with asterisk.""" + results = populated_store.search_fts("connect*") + assert len(results) >= 1 + # Should match connect, connection, etc. 
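The boolean-operator tests above lean on two FTS5 rules that are easy to trip over: operators must be uppercase (a lowercase `or` is treated as an ordinary term) and adjacent bare terms are implicitly AND-ed; `*` after a bareword makes it a prefix query, as in test_prefix_search. A standalone check of all three:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE VIRTUAL TABLE docs USING fts5(body)")
conn.executemany(
    "INSERT INTO docs(body) VALUES (?)",
    [("authenticate the user",), ("open the database",)],
)


def count(query: str) -> int:
    return conn.execute("SELECT count(*) FROM docs WHERE docs MATCH ?", (query,)).fetchone()[0]


assert count("authenticate OR database") == 2   # uppercase OR is an operator
assert count("authenticate database") == 0      # bare terms are implicitly AND-ed
assert count("auth*") == 1                       # '*' turns a bareword into a prefix query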
+ + def test_special_characters_in_query(self, populated_store): + """Test search handles special characters gracefully.""" + # Should not raise an error + results = populated_store.search_fts("__init__") + # May or may not have results, but shouldn't crash + + def test_unicode_search(self, temp_dir): + """Test search with unicode content.""" + store = SQLiteStore(temp_dir / "_index.db") + store.initialize() + + indexed_file = IndexedFile( + path="/test/unicode.py", + language="python", + symbols=[Symbol(name="世界", kind="function", range=(1, 1))], + ) + store.add_file(indexed_file, "def 世界(): return '你好世界'") + + results = store.search_fts("世界") + assert len(results) == 1 + + store.close() + + +class TestFTS5Pagination: + """Tests for FTS5 search pagination.""" + + def test_offset_pagination(self, temp_dir): + """Test search with offset for pagination.""" + store = SQLiteStore(temp_dir / "_index.db") + store.initialize() + + # Add multiple files + for i in range(10): + indexed_file = IndexedFile( + path=f"/test/file{i}.py", + language="python", + symbols=[], + ) + store.add_file(indexed_file, f"searchable content number {i}") + + page1 = store.search_fts("searchable", limit=3, offset=0) + page2 = store.search_fts("searchable", limit=3, offset=3) + page3 = store.search_fts("searchable", limit=3, offset=6) + + # Each page should have different results + paths1 = {r.path for r in page1} + paths2 = {r.path for r in page2} + paths3 = {r.path for r in page3} + + assert paths1.isdisjoint(paths2) + assert paths2.isdisjoint(paths3) + + store.close() + + def test_offset_beyond_results(self, populated_store): + """Test offset beyond available results.""" + results = populated_store.search_fts("authenticate", limit=10, offset=1000) + assert len(results) == 0 + + +# === Symbol Search Tests === + +class TestSymbolSearch: + """Tests for symbol search functionality.""" + + def test_search_by_name(self, populated_store): + """Test symbol search by name.""" + results = populated_store.search_symbols("auth") + assert len(results) >= 1 + assert any("auth" in s.name.lower() for s in results) + + def test_search_by_kind_function(self, populated_store): + """Test symbol search filtered by kind=function.""" + results = populated_store.search_symbols("", kind="function") + assert all(s.kind == "function" for s in results) + + def test_search_by_kind_class(self, populated_store): + """Test symbol search filtered by kind=class.""" + results = populated_store.search_symbols("", kind="class") + assert all(s.kind == "class" for s in results) + assert any("Manager" in s.name or "Pool" in s.name for s in results) + + def test_search_symbols_with_limit(self, populated_store): + """Test symbol search respects limit.""" + results = populated_store.search_symbols("", limit=2) + assert len(results) <= 2 + + def test_search_symbols_returns_range(self, populated_store): + """Test symbol search results include line range.""" + results = populated_store.search_symbols("authenticate") + assert len(results) >= 1 + for sym in results: + assert hasattr(sym, 'range') + assert len(sym.range) == 2 + assert sym.range[0] <= sym.range[1] + + +# === Chain Search Tests === + +class TestChainSearchEngine: + """Tests for ChainSearchEngine.""" + + @pytest.fixture + def mock_registry(self): + """Create a mock registry.""" + registry = MagicMock(spec=RegistryStore) + registry.find_nearest_index.return_value = None + return registry + + @pytest.fixture + def mock_mapper(self): + """Create a mock path mapper.""" + return MagicMock(spec=PathMapper) + 
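The fixtures above create mocks with spec=RegistryStore and spec=PathMapper; the spec argument restricts the mock to the real class's attribute surface, so a misspelled or removed method fails loudly instead of silently returning another mock. A tiny illustration with a hypothetical class:

from unittest.mock import MagicMock


class Store:
    def find(self, key: str) -> str:
        return key


strict = MagicMock(spec=Store)
strict.find("x")                      # fine: Store defines find()
strict.find.assert_called_once_with("x")

try:
    strict.fnid("x")                  # typo: not on the spec, raises AttributeError
except AttributeError:
    print("caught the typo thanks to spec=")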
+ def test_search_no_index_found(self, mock_registry, mock_mapper): + """Test search when no index is found.""" + mock_mapper.source_to_index_db.return_value = Path("/nonexistent/_index.db") + + engine = ChainSearchEngine(mock_registry, mock_mapper) + result = engine.search("test", Path("/nonexistent")) + + assert result.results == [] + assert result.symbols == [] + assert result.stats.dirs_searched == 0 + + def test_search_options_depth(self, mock_registry, mock_mapper, temp_dir): + """Test search respects depth option.""" + # Create a simple index structure + db_path = temp_dir / "_index.db" + store = DirIndexStore(db_path) + store.initialize() + store.add_file( + name="test.py", + full_path=str(temp_dir / "test.py"), + content="test content searchable", + language="python", + ) + store.close() + + mock_mapper.source_to_index_db.return_value = db_path + + engine = ChainSearchEngine(mock_registry, mock_mapper) + options = SearchOptions(depth=0) # Only current dir + + result = engine.search("test", temp_dir, options) + + # With depth=0, should only search current directory + assert result.stats.dirs_searched <= 1 + + def test_search_files_only(self, mock_registry, mock_mapper, temp_dir): + """Test search_files_only returns only paths.""" + db_path = temp_dir / "_index.db" + store = DirIndexStore(db_path) + store.initialize() + store.add_file( + name="test.py", + full_path=str(temp_dir / "test.py"), + content="searchable content here", + language="python", + ) + store.close() + + mock_mapper.source_to_index_db.return_value = db_path + + engine = ChainSearchEngine(mock_registry, mock_mapper) + paths = engine.search_files_only("searchable", temp_dir) + + assert isinstance(paths, list) + for p in paths: + assert isinstance(p, str) + + def test_search_symbols_engine(self, mock_registry, mock_mapper, temp_dir): + """Test symbol search through engine.""" + db_path = temp_dir / "_index.db" + store = DirIndexStore(db_path) + store.initialize() + store.add_file( + name="test.py", + full_path=str(temp_dir / "test.py"), + content="def my_function(): pass", + language="python", + symbols=[Symbol(name="my_function", kind="function", range=(1, 5))], + ) + store.close() + + mock_mapper.source_to_index_db.return_value = db_path + + engine = ChainSearchEngine(mock_registry, mock_mapper) + symbols = engine.search_symbols("my_func", temp_dir) + + assert len(symbols) >= 1 + assert symbols[0].name == "my_function" + + def test_search_result_stats(self, mock_registry, mock_mapper, temp_dir): + """Test search result includes proper stats.""" + db_path = temp_dir / "_index.db" + store = DirIndexStore(db_path) + store.initialize() + store.add_file( + name="test.py", + full_path=str(temp_dir / "test.py"), + content="content to search", + language="python", + ) + store.close() + + mock_mapper.source_to_index_db.return_value = db_path + + engine = ChainSearchEngine(mock_registry, mock_mapper) + result = engine.search("content", temp_dir) + + assert result.stats.time_ms >= 0 + assert result.stats.dirs_searched >= 0 + assert isinstance(result.stats.errors, list) + + +class TestSearchOptions: + """Tests for SearchOptions configuration.""" + + def test_default_options(self): + """Test default search options.""" + options = SearchOptions() + assert options.depth == -1 + assert options.max_workers == 8 + assert options.limit_per_dir == 10 + assert options.total_limit == 100 + assert options.include_symbols is False + assert options.files_only is False + + def test_custom_options(self): + """Test custom search options.""" + 
options = SearchOptions( + depth=3, + max_workers=4, + limit_per_dir=5, + total_limit=50, + include_symbols=True, + files_only=True, + ) + assert options.depth == 3 + assert options.max_workers == 4 + assert options.limit_per_dir == 5 + assert options.total_limit == 50 + assert options.include_symbols is True + assert options.files_only is True + + +# === Edge Cases and Error Handling === + +class TestSearchEdgeCases: + """Edge case tests for search functionality.""" + + def test_empty_query(self, populated_store): + """Test search with empty query.""" + # Empty query may raise an error or return empty results + try: + results = populated_store.search_fts("") + assert isinstance(results, list) + except Exception: + # Some implementations may reject empty queries + pass + + def test_whitespace_query(self, populated_store): + """Test search with whitespace-only query.""" + # Whitespace query may raise an error or return empty results + try: + results = populated_store.search_fts(" ") + assert isinstance(results, list) + except Exception: + # Some implementations may reject whitespace queries + pass + + def test_very_long_query(self, populated_store): + """Test search with very long query.""" + long_query = "function " * 100 # Repeat valid word + try: + results = populated_store.search_fts(long_query) + assert isinstance(results, list) + except Exception: + # Very long queries may be rejected + pass + + def test_special_sql_characters(self, populated_store): + """Test search handles SQL-like characters safely.""" + # These should not cause SQL injection - may raise FTS syntax errors + queries = ["test", "function*", "test OR data"] + for q in queries: + results = populated_store.search_fts(q) + assert isinstance(results, list) + + def test_search_reopened_store(self, temp_dir, sample_files): + """Test search works after store is reopened.""" + db_path = temp_dir / "_index.db" + store = SQLiteStore(db_path) + store.initialize() + store.add_file(sample_files[0][0], sample_files[0][1]) + store.close() + + # Reopen and search + store2 = SQLiteStore(db_path) + store2.initialize() + results = store2.search_fts("authenticate") + assert len(results) >= 1 + store2.close() + + def test_concurrent_searches(self, populated_store): + """Test multiple concurrent searches.""" + import threading + + results = [] + errors = [] + + def search_task(query): + try: + r = populated_store.search_fts(query) + results.append(len(r)) + except Exception as e: + errors.append(e) + + threads = [ + threading.Thread(target=search_task, args=("authenticate",)), + threading.Thread(target=search_task, args=("database",)), + threading.Thread(target=search_task, args=("password",)), + ] + + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(errors) == 0 + assert len(results) == 3 + + +class TestChainSearchResult: + """Tests for ChainSearchResult dataclass.""" + + def test_result_structure(self): + """Test ChainSearchResult has all required fields.""" + result = ChainSearchResult( + query="test", + results=[], + symbols=[], + stats=SearchStats(), + ) + assert result.query == "test" + assert result.results == [] + assert result.symbols == [] + assert result.stats.dirs_searched == 0 + + +class TestSearchStats: + """Tests for SearchStats dataclass.""" + + def test_default_stats(self): + """Test default search stats.""" + stats = SearchStats() + assert stats.dirs_searched == 0 + assert stats.files_matched == 0 + assert stats.time_ms == 0 + assert stats.errors == [] + + def test_stats_with_errors(self): + 
"""Test search stats with errors.""" + stats = SearchStats(errors=["Error 1", "Error 2"]) + assert len(stats.errors) == 2 diff --git a/codex-lens/tests/test_search_performance.py b/codex-lens/tests/test_search_performance.py new file mode 100644 index 00000000..5460efb5 --- /dev/null +++ b/codex-lens/tests/test_search_performance.py @@ -0,0 +1,660 @@ +"""Performance benchmarks for CodexLens search functionality. + +Measures: +- FTS5 search speed at various scales +- Chain search traversal performance +- Semantic search latency +- Memory usage during search operations +""" + +import gc +import sys +import tempfile +import time +from pathlib import Path +from typing import List, Tuple +from dataclasses import dataclass +from contextlib import contextmanager + +import pytest + +from codexlens.storage.sqlite_store import SQLiteStore +from codexlens.storage.dir_index import DirIndexStore +from codexlens.storage.registry import RegistryStore +from codexlens.storage.path_mapper import PathMapper +from codexlens.search import ChainSearchEngine, SearchOptions +from codexlens.entities import IndexedFile, Symbol + + +@dataclass +class BenchmarkResult: + """Benchmark result container.""" + name: str + iterations: int + total_time_ms: float + avg_time_ms: float + min_time_ms: float + max_time_ms: float + ops_per_sec: float + + def __str__(self): + return ( + f"{self.name}:\n" + f" Iterations: {self.iterations}\n" + f" Total: {self.total_time_ms:.2f}ms\n" + f" Avg: {self.avg_time_ms:.2f}ms\n" + f" Min: {self.min_time_ms:.2f}ms\n" + f" Max: {self.max_time_ms:.2f}ms\n" + f" Ops/sec: {self.ops_per_sec:.1f}" + ) + + +def benchmark(func, iterations=10, warmup=2): + """Run benchmark with warmup iterations.""" + # Warmup + for _ in range(warmup): + func() + + # Measure + times = [] + for _ in range(iterations): + gc.collect() + start = time.perf_counter() + func() + elapsed = (time.perf_counter() - start) * 1000 + times.append(elapsed) + + total = sum(times) + return BenchmarkResult( + name=func.__name__ if hasattr(func, '__name__') else 'benchmark', + iterations=iterations, + total_time_ms=total, + avg_time_ms=total / iterations, + min_time_ms=min(times), + max_time_ms=max(times), + ops_per_sec=1000 / (total / iterations) if total > 0 else 0 + ) + + +@contextmanager +def timer(name: str): + """Context manager for timing code blocks.""" + start = time.perf_counter() + yield + elapsed = (time.perf_counter() - start) * 1000 + print(f" {name}: {elapsed:.2f}ms") + + +# === Test Fixtures === + +@pytest.fixture(scope="module") +def temp_dir(): + """Create a temporary directory for all tests.""" + tmpdir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True) + yield Path(tmpdir.name) + # Explicit cleanup with error handling for Windows file locking + try: + tmpdir.cleanup() + except (PermissionError, OSError): + pass # Ignore Windows file locking errors + + +def generate_code_file(index: int, lines: int = 100) -> Tuple[IndexedFile, str]: + """Generate a synthetic code file for testing.""" + symbols = [ + Symbol(name=f"function_{index}_{i}", kind="function", range=(i*10+1, i*10+9)) + for i in range(lines // 10) + ] + + content_lines = [] + for i in range(lines): + if i % 10 == 0: + content_lines.append(f"def function_{index}_{i//10}(param_{i}, data_{i}):") + else: + content_lines.append(f" # Line {i}: processing data with param_{i % 5}") + content_lines.append(f" result_{i} = compute(data_{i})") + + return ( + IndexedFile( + path=f"/project/src/module_{index}/file_{index}.py", + language="python", + 
symbols=symbols, + ), + "\n".join(content_lines) + ) + + +@pytest.fixture(scope="module") +def small_store(temp_dir): + """Small store with 10 files (~100 lines each).""" + db_path = temp_dir / "small_index.db" + store = SQLiteStore(db_path) + store.initialize() + + for i in range(10): + indexed_file, content = generate_code_file(i, lines=100) + store.add_file(indexed_file, content) + + yield store + store.close() + + +@pytest.fixture(scope="module") +def medium_store(temp_dir): + """Medium store with 100 files (~100 lines each).""" + db_path = temp_dir / "medium_index.db" + store = SQLiteStore(db_path) + store.initialize() + + for i in range(100): + indexed_file, content = generate_code_file(i, lines=100) + store.add_file(indexed_file, content) + + yield store + store.close() + + +@pytest.fixture(scope="module") +def large_store(temp_dir): + """Large store with 500 files (~200 lines each).""" + db_path = temp_dir / "large_index.db" + store = SQLiteStore(db_path) + store.initialize() + + for i in range(500): + indexed_file, content = generate_code_file(i, lines=200) + store.add_file(indexed_file, content) + + yield store + store.close() + + +# === FTS5 Performance Tests === + +class TestFTS5Performance: + """FTS5 search performance benchmarks.""" + + def test_small_store_search(self, small_store): + """Benchmark FTS5 search on small store (10 files).""" + print("\n" + "="*60) + print("FTS5 SEARCH - SMALL STORE (10 files)") + print("="*60) + + queries = ["function", "data", "compute", "result", "param"] + + for query in queries: + result = benchmark( + lambda q=query: small_store.search_fts(q, limit=20), + iterations=50 + ) + result.name = f"search '{query}'" + print(f"\n{result}") + + def test_medium_store_search(self, medium_store): + """Benchmark FTS5 search on medium store (100 files).""" + print("\n" + "="*60) + print("FTS5 SEARCH - MEDIUM STORE (100 files)") + print("="*60) + + queries = ["function", "data", "compute", "result", "param"] + + for query in queries: + result = benchmark( + lambda q=query: medium_store.search_fts(q, limit=20), + iterations=30 + ) + result.name = f"search '{query}'" + print(f"\n{result}") + + def test_large_store_search(self, large_store): + """Benchmark FTS5 search on large store (500 files).""" + print("\n" + "="*60) + print("FTS5 SEARCH - LARGE STORE (500 files)") + print("="*60) + + queries = ["function", "data", "compute", "result", "param"] + + for query in queries: + result = benchmark( + lambda q=query: large_store.search_fts(q, limit=20), + iterations=20 + ) + result.name = f"search '{query}'" + print(f"\n{result}") + + def test_search_limit_scaling(self, medium_store): + """Test how search time scales with result limit.""" + print("\n" + "="*60) + print("FTS5 SEARCH - LIMIT SCALING") + print("="*60) + + limits = [5, 10, 20, 50, 100, 200] + + for limit in limits: + result = benchmark( + lambda l=limit: medium_store.search_fts("function", limit=l), + iterations=20 + ) + result.name = f"limit={limit}" + print(f"\n{result}") + + def test_complex_query_performance(self, medium_store): + """Test performance of complex FTS5 queries.""" + print("\n" + "="*60) + print("FTS5 SEARCH - COMPLEX QUERIES") + print("="*60) + + queries = [ + ("single term", "function"), + ("two terms", "function data"), + ("phrase", '"def function"'), + ("OR query", "function OR result"), + ("wildcard", "func*"), + ("NOT query", "function NOT data"), + ] + + for name, query in queries: + result = benchmark( + lambda q=query: medium_store.search_fts(q, limit=20), + iterations=20 + 
+            )
+            result.name = name
+            print(f"\n{result}")
+
+
+class TestSymbolSearchPerformance:
+    """Symbol search performance benchmarks."""
+
+    def test_symbol_search_scaling(self, small_store, medium_store, large_store):
+        """Test symbol search performance at different scales."""
+        print("\n" + "="*60)
+        print("SYMBOL SEARCH - SCALING")
+        print("="*60)
+
+        stores = [
+            ("small (10 files)", small_store),
+            ("medium (100 files)", medium_store),
+            ("large (500 files)", large_store),
+        ]
+
+        for name, store in stores:
+            result = benchmark(
+                lambda s=store: s.search_symbols("function", limit=50),
+                iterations=20
+            )
+            result.name = name
+            print(f"\n{result}")
+
+    def test_symbol_search_with_kind_filter(self, medium_store):
+        """Test symbol search with kind filtering."""
+        print("\n" + "="*60)
+        print("SYMBOL SEARCH - KIND FILTER")
+        print("="*60)
+
+        # Without filter
+        result_no_filter = benchmark(
+            lambda: medium_store.search_symbols("function", limit=50),
+            iterations=20
+        )
+        result_no_filter.name = "no filter"
+        print(f"\n{result_no_filter}")
+
+        # With filter
+        result_with_filter = benchmark(
+            lambda: medium_store.search_symbols("function", kind="function", limit=50),
+            iterations=20
+        )
+        result_with_filter.name = "kind=function"
+        print(f"\n{result_with_filter}")
+
+
+# === Chain Search Performance Tests ===
+
+class TestChainSearchPerformance:
+    """Chain search engine performance benchmarks."""
+
+    @pytest.fixture
+    def chain_engine_setup(self, temp_dir):
+        """Set up a chain search engine over a small directory hierarchy."""
+        # Create directory hierarchy
+        root = temp_dir / "project"
+        root.mkdir(exist_ok=True)
+
+        registry = RegistryStore(temp_dir / "registry.db")
+        registry.initialize()
+        mapper = PathMapper(temp_dir / "indexes")
+
+        # Create indexes at different depths
+        dirs = [
+            root,
+            root / "src",
+            root / "src" / "core",
+            root / "src" / "utils",
+            root / "tests",
+        ]
+
+        for i, dir_path in enumerate(dirs):
+            dir_path.mkdir(exist_ok=True)
+            index_path = mapper.source_to_index_db(dir_path)
+            index_path.parent.mkdir(parents=True, exist_ok=True)
+
+            store = DirIndexStore(index_path)
+            store.initialize()
+            for j in range(20):  # 20 files per directory
+                indexed_file, content = generate_code_file(i * 100 + j, lines=50)
+                file_path = str(dir_path / f"file_{j}.py")
+                store.add_file(
+                    name=f"file_{j}.py",
+                    full_path=file_path,
+                    content=content,
+                    language="python",
+                    symbols=indexed_file.symbols,
+                )
+            store.close()
+
+            # Register the project root and this directory's index
+            project = registry.register_project(root, mapper.source_to_index_dir(root))
+            registry.register_dir(project.id, dir_path, index_path, i, 20)
+
+        engine = ChainSearchEngine(registry, mapper)
+
+        yield {
+            "engine": engine,
+            "registry": registry,
+            "root": root,
+        }
+
+        registry.close()
+
+    def test_chain_search_depth(self, chain_engine_setup):
+        """Test chain search at different depths."""
+        print("\n" + "="*60)
+        print("CHAIN SEARCH - DEPTH VARIATION")
+        print("="*60)
+
+        engine = chain_engine_setup["engine"]
+        root = chain_engine_setup["root"]
+
+        depths = [0, 1, 2, -1]  # -1 = unlimited
+
+        for depth in depths:
+            options = SearchOptions(depth=depth, max_workers=4, total_limit=50)
+            result = benchmark(
+                lambda d=depth, o=options: engine.search("function", root, o),
+                iterations=10
+            )
+            result.name = f"depth={depth}"
+            print(f"\n{result}")
+
+    def test_chain_search_parallelism(self, chain_engine_setup):
+        """Test chain search with different worker counts."""
+        print("\n" + "="*60)
+        print("CHAIN SEARCH - PARALLELISM")
+        print("="*60)
+
+        engine = chain_engine_setup["engine"]
+        root = chain_engine_setup["root"]
+
+        worker_counts = [1, 2, 4, 8]
+
+        for workers in worker_counts:
+            options = SearchOptions(depth=-1, max_workers=workers, total_limit=50)
+            result = benchmark(
+                lambda w=workers, o=options: engine.search("function", root, o),
+                iterations=10
+            )
+            result.name = f"workers={workers}"
+            print(f"\n{result}")
+
+
+# === Semantic Search Performance Tests ===
+
+class TestSemanticSearchPerformance:
+    """Semantic search performance benchmarks."""
+
+    @pytest.fixture
+    def semantic_setup(self, temp_dir):
+        """Set up semantic search with precomputed embeddings."""
+        try:
+            from codexlens.semantic import SEMANTIC_AVAILABLE
+            if not SEMANTIC_AVAILABLE:
+                pytest.skip("Semantic search dependencies not installed")
+
+            from codexlens.semantic.embedder import Embedder
+            from codexlens.semantic.vector_store import VectorStore
+            from codexlens.entities import SemanticChunk
+
+            embedder = Embedder()
+            db_path = temp_dir / "semantic.db"
+            vector_store = VectorStore(db_path)
+
+            # Add test chunks
+            code_samples = [
+                "def authenticate_user(username, password): verify user credentials",
+                "class DatabaseConnection: manage database connections with pooling",
+                "async def fetch_api_data(url): make HTTP request and return JSON",
+                "function renderComponent(props): render React UI component",
+                "def process_data(input): transform and validate input data",
+            ] * 50  # 250 chunks
+
+            for i, content in enumerate(code_samples):
+                chunk = SemanticChunk(
+                    content=content,
+                    metadata={"index": i, "language": "python"}
+                )
+                chunk.embedding = embedder.embed_single(content)
+                vector_store.add_chunk(chunk, f"/test/file_{i}.py")
+
+            yield {
+                "embedder": embedder,
+                "vector_store": vector_store,
+            }
+
+            # Clean up vector store cache
+            vector_store.clear_cache()
+
+        except ImportError:
+            pytest.skip("Semantic search dependencies not installed")
+
+    def test_embedding_generation_speed(self, semantic_setup):
+        """Benchmark embedding generation speed."""
+        print("\n" + "="*60)
+        print("SEMANTIC SEARCH - EMBEDDING GENERATION")
+        print("="*60)
+
+        embedder = semantic_setup["embedder"]
+
+        # Single embedding
+        result = benchmark(
+            lambda: embedder.embed_single("def example_function(): return 42"),
+            iterations=50
+        )
+        result.name = "single embedding"
+        print(f"\n{result}")
+
+        # Batch embedding
+        texts = ["def func{}(): return {}".format(i, i) for i in range(10)]
+        result = benchmark(
+            lambda: embedder.embed(texts),
+            iterations=20
+        )
+        result.name = "batch embedding (10 texts)"
+        print(f"\n{result}")
+
+    def test_vector_search_speed(self, semantic_setup):
+        """Benchmark vector similarity search speed."""
+        print("\n" + "="*60)
+        print("SEMANTIC SEARCH - VECTOR SEARCH")
+        print("="*60)
+
+        embedder = semantic_setup["embedder"]
+        vector_store = semantic_setup["vector_store"]
+
+        query_embedding = embedder.embed_single("user authentication login")
+
+        # Different top_k values
+        for top_k in [5, 10, 20, 50]:
+            result = benchmark(
+                lambda k=top_k: vector_store.search_similar(query_embedding, top_k=k),
+                iterations=30
+            )
+            result.name = f"top_k={top_k}"
+            print(f"\n{result}")
+
+    def test_full_semantic_search_latency(self, semantic_setup):
+        """Benchmark full semantic search (embed + search)."""
+        print("\n" + "="*60)
+        print("SEMANTIC SEARCH - FULL LATENCY")
+        print("="*60)
+
+        embedder = semantic_setup["embedder"]
+        vector_store = semantic_setup["vector_store"]
+
+        queries = [
+            "user authentication",
+            "database connection",
+            "API request handler",
+            "React component",
"data processing", + ] + + for query in queries: + def full_search(q=query): + embedding = embedder.embed_single(q) + return vector_store.search_similar(embedding, top_k=10) + + result = benchmark(full_search, iterations=20) + result.name = f"'{query}'" + print(f"\n{result}") + + +# === Comparative Benchmarks === + +class TestComparativeBenchmarks: + """Compare FTS5 vs Semantic search performance.""" + + @pytest.fixture + def comparison_setup(self, temp_dir): + """Setup both FTS5 and semantic stores with same content.""" + # FTS5 store + fts_store = SQLiteStore(temp_dir / "fts_compare.db") + fts_store.initialize() + + code_samples = [ + ("auth.py", "def authenticate_user(username, password): verify credentials"), + ("db.py", "class DatabasePool: manage database connection pooling"), + ("api.py", "async def handle_request(req): process API request"), + ("ui.py", "function Button({ onClick }): render button component"), + ("utils.py", "def process_data(input): transform and validate data"), + ] * 20 + + for i, (filename, content) in enumerate(code_samples): + indexed_file = IndexedFile( + path=f"/project/{filename.replace('.py', '')}_{i}.py", + language="python", + symbols=[Symbol(name=f"func_{i}", kind="function", range=(1, 5))], + ) + fts_store.add_file(indexed_file, content) + + # Semantic store (if available) + try: + from codexlens.semantic import SEMANTIC_AVAILABLE + if SEMANTIC_AVAILABLE: + from codexlens.semantic.embedder import Embedder + from codexlens.semantic.vector_store import VectorStore + from codexlens.entities import SemanticChunk + + embedder = Embedder() + semantic_store = VectorStore(temp_dir / "semantic_compare.db") + + for i, (filename, content) in enumerate(code_samples): + chunk = SemanticChunk(content=content, metadata={"index": i}) + chunk.embedding = embedder.embed_single(content) + semantic_store.add_chunk(chunk, f"/project/{filename}") + + yield { + "fts_store": fts_store, + "semantic_store": semantic_store, + "embedder": embedder, + "has_semantic": True, + } + # Close semantic store connection + semantic_store.clear_cache() + else: + yield {"fts_store": fts_store, "has_semantic": False} + except ImportError: + yield {"fts_store": fts_store, "has_semantic": False} + + fts_store.close() + + def test_fts_vs_semantic_latency(self, comparison_setup): + """Compare FTS5 vs Semantic search latency.""" + print("\n" + "="*60) + print("FTS5 vs SEMANTIC - LATENCY COMPARISON") + print("="*60) + + fts_store = comparison_setup["fts_store"] + + queries = [ + "authenticate", + "database", + "request", + "button", + "process", + ] + + print("\nFTS5 Search:") + for query in queries: + result = benchmark( + lambda q=query: fts_store.search_fts(q, limit=10), + iterations=30 + ) + result.name = f"'{query}'" + print(f" {result.name}: avg={result.avg_time_ms:.2f}ms") + + if comparison_setup.get("has_semantic"): + semantic_store = comparison_setup["semantic_store"] + embedder = comparison_setup["embedder"] + + print("\nSemantic Search (embed + search):") + for query in queries: + def semantic_search(q=query): + emb = embedder.embed_single(q) + return semantic_store.search_similar(emb, top_k=10) + + result = benchmark(semantic_search, iterations=20) + result.name = f"'{query}'" + print(f" {result.name}: avg={result.avg_time_ms:.2f}ms") + else: + print("\n(Semantic search not available)") + + +# === Memory Usage Tests === + +class TestMemoryUsage: + """Memory usage during search operations.""" + + def test_search_memory_footprint(self, medium_store): + """Measure memory footprint during 
search.""" + print("\n" + "="*60) + print("MEMORY USAGE - SEARCH OPERATIONS") + print("="*60) + + import tracemalloc + + tracemalloc.start() + + # Run multiple searches + for _ in range(100): + medium_store.search_fts("function", limit=20) + + current, peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + + print(f"\nAfter 100 FTS5 searches:") + print(f" Current memory: {current / 1024 / 1024:.2f} MB") + print(f" Peak memory: {peak / 1024 / 1024:.2f} MB") + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s", "--tb=short"])