mirror of
https://github.com/catlog22/Claude-Code-Workflow.git
synced 2026-02-05 01:50:27 +08:00
370 lines
13 KiB
Python
370 lines
13 KiB
Python
"""Incremental indexer for processing file changes."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
|
|
from codexlens.config import Config
|
|
from codexlens.parsers.factory import ParserFactory
|
|
from codexlens.storage.dir_index import DirIndexStore
|
|
from codexlens.storage.global_index import GlobalSymbolIndex
|
|
from codexlens.storage.path_mapper import PathMapper
|
|
from codexlens.storage.registry import RegistryStore
|
|
|
|
from .events import ChangeType, FileEvent, IndexResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class FileIndexResult:
|
|
"""Result of indexing a single file."""
|
|
path: Path
|
|
symbols_count: int
|
|
success: bool
|
|
error: Optional[str] = None
|
|
|
|
|
|
class IncrementalIndexer:
|
|
"""Incremental indexer for processing file change events.
|
|
|
|
Processes file events (create, modify, delete, move) and updates
|
|
the corresponding index databases incrementally.
|
|
|
|
Reuses existing infrastructure:
|
|
- ParserFactory for symbol extraction
|
|
- DirIndexStore for per-directory storage
|
|
- GlobalSymbolIndex for cross-file symbols
|
|
- PathMapper for source-to-index path conversion
|
|
|
|
Example:
|
|
indexer = IncrementalIndexer(registry, mapper, config)
|
|
result = indexer.process_changes([
|
|
FileEvent(Path("foo.py"), ChangeType.MODIFIED, time.time()),
|
|
])
|
|
print(f"Indexed {result.files_indexed} files")
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
registry: RegistryStore,
|
|
mapper: PathMapper,
|
|
config: Optional[Config] = None,
|
|
) -> None:
|
|
"""Initialize incremental indexer.
|
|
|
|
Args:
|
|
registry: Global project registry
|
|
mapper: Path mapper for source-to-index conversion
|
|
config: CodexLens configuration (uses defaults if None)
|
|
"""
|
|
self.registry = registry
|
|
self.mapper = mapper
|
|
self.config = config or Config()
|
|
self.parser_factory = ParserFactory(self.config)
|
|
|
|
self._global_index: Optional[GlobalSymbolIndex] = None
|
|
self._dir_stores: dict[Path, DirIndexStore] = {}
|
|
self._lock = __import__("threading").RLock()
|
|
|
|
def _get_global_index(self, index_root: Path, source_root: Optional[Path] = None) -> Optional[GlobalSymbolIndex]:
|
|
"""Get or create global symbol index.
|
|
|
|
Args:
|
|
index_root: Root directory containing the global symbol index DB
|
|
source_root: Source directory root for looking up project_id from registry
|
|
"""
|
|
if not self.config.global_symbol_index_enabled:
|
|
return None
|
|
|
|
if self._global_index is None:
|
|
global_db_path = index_root / GlobalSymbolIndex.DEFAULT_DB_NAME
|
|
if global_db_path.exists():
|
|
# Get project_id from registry using source_root
|
|
project_id = 0 # Default fallback
|
|
if source_root:
|
|
project_info = self.registry.get_project(source_root)
|
|
if project_info:
|
|
project_id = project_info.id
|
|
self._global_index = GlobalSymbolIndex(global_db_path, project_id=project_id)
|
|
|
|
return self._global_index
|
|
|
|
def _get_dir_store(self, dir_path: Path) -> Optional[DirIndexStore]:
|
|
"""Get DirIndexStore for a directory, if indexed."""
|
|
with self._lock:
|
|
if dir_path in self._dir_stores:
|
|
return self._dir_stores[dir_path]
|
|
|
|
index_db = self.mapper.source_to_index_db(dir_path)
|
|
if not index_db.exists():
|
|
logger.debug("No index found for directory: %s", dir_path)
|
|
return None
|
|
|
|
# Get index root for global index
|
|
source_root = self.mapper.get_project_root(dir_path) or dir_path
|
|
index_root = self.mapper.source_to_index_dir(source_root)
|
|
global_index = self._get_global_index(index_root, source_root=source_root)
|
|
|
|
store = DirIndexStore(
|
|
index_db,
|
|
config=self.config,
|
|
global_index=global_index,
|
|
)
|
|
self._dir_stores[dir_path] = store
|
|
return store
|
|
|
|
def process_changes(self, events: List[FileEvent]) -> IndexResult:
|
|
"""Process a batch of file change events.
|
|
|
|
Args:
|
|
events: List of file events to process
|
|
|
|
Returns:
|
|
IndexResult with statistics
|
|
"""
|
|
result = IndexResult()
|
|
|
|
for event in events:
|
|
try:
|
|
if event.change_type == ChangeType.CREATED:
|
|
file_result = self._index_file(event.path)
|
|
if file_result.success:
|
|
result.files_indexed += 1
|
|
result.symbols_added += file_result.symbols_count
|
|
else:
|
|
result.errors.append(file_result.error or f"Failed to index: {event.path}")
|
|
|
|
elif event.change_type == ChangeType.MODIFIED:
|
|
file_result = self._index_file(event.path)
|
|
if file_result.success:
|
|
result.files_indexed += 1
|
|
result.symbols_added += file_result.symbols_count
|
|
else:
|
|
result.errors.append(file_result.error or f"Failed to index: {event.path}")
|
|
|
|
elif event.change_type == ChangeType.DELETED:
|
|
self._remove_file(event.path)
|
|
result.files_removed += 1
|
|
|
|
elif event.change_type == ChangeType.MOVED:
|
|
# Remove from old location, add at new location
|
|
if event.old_path:
|
|
self._remove_file(event.old_path)
|
|
result.files_removed += 1
|
|
file_result = self._index_file(event.path)
|
|
if file_result.success:
|
|
result.files_indexed += 1
|
|
result.symbols_added += file_result.symbols_count
|
|
else:
|
|
result.errors.append(file_result.error or f"Failed to index: {event.path}")
|
|
|
|
except Exception as exc:
|
|
error_msg = f"Error processing {event.path}: {type(exc).__name__}: {exc}"
|
|
logger.error(error_msg)
|
|
result.errors.append(error_msg)
|
|
|
|
return result
|
|
|
|
def _index_file(self, path: Path) -> FileIndexResult:
|
|
"""Index a single file.
|
|
|
|
Args:
|
|
path: Path to the file to index
|
|
|
|
Returns:
|
|
FileIndexResult with status
|
|
"""
|
|
path = Path(path).resolve()
|
|
|
|
# Check if file exists
|
|
if not path.exists():
|
|
return FileIndexResult(
|
|
path=path,
|
|
symbols_count=0,
|
|
success=False,
|
|
error=f"File not found: {path}",
|
|
)
|
|
|
|
# Check if language is supported
|
|
language = self.config.language_for_path(path)
|
|
if not language:
|
|
return FileIndexResult(
|
|
path=path,
|
|
symbols_count=0,
|
|
success=False,
|
|
error=f"Unsupported language for: {path}",
|
|
)
|
|
|
|
# Get directory store
|
|
dir_path = path.parent
|
|
store = self._get_dir_store(dir_path)
|
|
if store is None:
|
|
return FileIndexResult(
|
|
path=path,
|
|
symbols_count=0,
|
|
success=False,
|
|
error=f"Directory not indexed: {dir_path}",
|
|
)
|
|
|
|
# Read file content with fallback encodings
|
|
try:
|
|
content = path.read_text(encoding="utf-8")
|
|
except UnicodeDecodeError:
|
|
logger.debug("UTF-8 decode failed for %s, using fallback with errors='ignore'", path)
|
|
try:
|
|
content = path.read_text(encoding="utf-8", errors="ignore")
|
|
except Exception as exc:
|
|
return FileIndexResult(
|
|
path=path,
|
|
symbols_count=0,
|
|
success=False,
|
|
error=f"Failed to read file: {exc}",
|
|
)
|
|
except Exception as exc:
|
|
return FileIndexResult(
|
|
path=path,
|
|
symbols_count=0,
|
|
success=False,
|
|
error=f"Failed to read file: {exc}",
|
|
)
|
|
|
|
# Parse symbols
|
|
try:
|
|
parser = self.parser_factory.get_parser(language)
|
|
indexed_file = parser.parse(content, path)
|
|
except Exception as exc:
|
|
error_msg = f"Failed to parse {path}: {type(exc).__name__}: {exc}"
|
|
logger.error(error_msg)
|
|
return FileIndexResult(
|
|
path=path,
|
|
symbols_count=0,
|
|
success=False,
|
|
error=error_msg,
|
|
)
|
|
|
|
# Update store with retry logic for transient database errors
|
|
max_retries = 3
|
|
for attempt in range(max_retries):
|
|
try:
|
|
store.add_file(
|
|
name=path.name,
|
|
full_path=str(path),
|
|
content=content,
|
|
language=language,
|
|
symbols=indexed_file.symbols,
|
|
relationships=indexed_file.relationships,
|
|
)
|
|
|
|
# Update merkle root
|
|
store.update_merkle_root()
|
|
|
|
logger.debug("Indexed file: %s (%d symbols)", path, len(indexed_file.symbols))
|
|
|
|
return FileIndexResult(
|
|
path=path,
|
|
symbols_count=len(indexed_file.symbols),
|
|
success=True,
|
|
)
|
|
|
|
except __import__("sqlite3").OperationalError as exc:
|
|
# Transient database errors (e.g., database locked)
|
|
if attempt < max_retries - 1:
|
|
import time
|
|
wait_time = 0.1 * (2 ** attempt) # Exponential backoff
|
|
logger.debug("Database operation failed (attempt %d/%d), retrying in %.2fs: %s",
|
|
attempt + 1, max_retries, wait_time, exc)
|
|
time.sleep(wait_time)
|
|
continue
|
|
else:
|
|
error_msg = f"Failed to store {path} after {max_retries} attempts: {exc}"
|
|
logger.error(error_msg)
|
|
return FileIndexResult(
|
|
path=path,
|
|
symbols_count=0,
|
|
success=False,
|
|
error=error_msg,
|
|
)
|
|
except Exception as exc:
|
|
error_msg = f"Failed to store {path}: {type(exc).__name__}: {exc}"
|
|
logger.error(error_msg)
|
|
return FileIndexResult(
|
|
path=path,
|
|
symbols_count=0,
|
|
success=False,
|
|
error=error_msg,
|
|
)
|
|
|
|
# Should never reach here
|
|
return FileIndexResult(
|
|
path=path,
|
|
symbols_count=0,
|
|
success=False,
|
|
error="Unexpected error in indexing loop",
|
|
)
|
|
|
|
def _remove_file(self, path: Path) -> bool:
|
|
"""Remove a file from the index.
|
|
|
|
Args:
|
|
path: Path to the file to remove
|
|
|
|
Returns:
|
|
True if removed successfully
|
|
"""
|
|
path = Path(path).resolve()
|
|
dir_path = path.parent
|
|
|
|
store = self._get_dir_store(dir_path)
|
|
if store is None:
|
|
logger.debug("Cannot remove file, directory not indexed: %s", dir_path)
|
|
return False
|
|
|
|
# Retry logic for transient database errors
|
|
max_retries = 3
|
|
for attempt in range(max_retries):
|
|
try:
|
|
store.remove_file(str(path))
|
|
store.update_merkle_root()
|
|
logger.debug("Removed file from index: %s", path)
|
|
return True
|
|
|
|
except __import__("sqlite3").OperationalError as exc:
|
|
# Transient database errors (e.g., database locked)
|
|
if attempt < max_retries - 1:
|
|
import time
|
|
wait_time = 0.1 * (2 ** attempt) # Exponential backoff
|
|
logger.debug("Database operation failed (attempt %d/%d), retrying in %.2fs: %s",
|
|
attempt + 1, max_retries, wait_time, exc)
|
|
time.sleep(wait_time)
|
|
continue
|
|
else:
|
|
logger.error("Failed to remove %s after %d attempts: %s", path, max_retries, exc)
|
|
return False
|
|
except Exception as exc:
|
|
logger.error("Failed to remove %s: %s", path, exc)
|
|
return False
|
|
|
|
# Should never reach here
|
|
return False
|
|
|
|
def close(self) -> None:
|
|
"""Close all open stores."""
|
|
with self._lock:
|
|
for store in self._dir_stores.values():
|
|
try:
|
|
store.close()
|
|
except Exception:
|
|
pass
|
|
self._dir_stores.clear()
|
|
|
|
if self._global_index:
|
|
try:
|
|
self._global_index.close()
|
|
except Exception:
|
|
pass
|
|
self._global_index = None
|