Files

348 lines
11 KiB
Python

"""File system watcher using watchdog library."""
from __future__ import annotations
import logging
import threading
import time
from pathlib import Path
from typing import Callable, Dict, List, Optional
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from .events import ChangeType, FileEvent, WatcherConfig, PendingQueueStatus
from ..config import Config
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Maximum number of events buffered before a flush is forced, to prevent
# unbounded memory growth. Once the queue already holds this many events, the
# next incoming event is still appended but triggers an immediate flush
# instead of waiting for the debounce timer.
MAX_QUEUE_SIZE = 50000
class _CodexLensHandler(FileSystemEventHandler):
    """Internal handler for watchdog events.

    Converts watchdog file events into ``FileEvent`` objects and forwards the
    ones that pass the owning watcher's ``_should_index_file`` filter to the
    ``on_event`` callback. Directory events are ignored.
    """

    def __init__(
        self,
        watcher: "FileWatcher",
        on_event: Callable[[FileEvent], None],
    ) -> None:
        """Store the owning watcher (used for filtering) and the event sink.

        Args:
            watcher: Watcher whose ``_should_index_file`` decides which paths
                are forwarded.
            on_event: Callback invoked with each accepted ``FileEvent``.
        """
        super().__init__()
        self._watcher = watcher
        self._on_event = on_event

    def on_created(self, event) -> None:
        """Forward a file creation as ``ChangeType.CREATED``."""
        if event.is_directory:
            return
        self._emit(event.src_path, ChangeType.CREATED)

    def on_modified(self, event) -> None:
        """Forward a file modification as ``ChangeType.MODIFIED``."""
        if event.is_directory:
            return
        self._emit(event.src_path, ChangeType.MODIFIED)

    def on_deleted(self, event) -> None:
        """Forward a file deletion as ``ChangeType.DELETED``."""
        if event.is_directory:
            return
        self._emit(event.src_path, ChangeType.DELETED)

    def on_moved(self, event) -> None:
        """Forward a file move/rename.

        Emits ``ChangeType.MOVED`` for the destination (with ``old_path`` set
        to the source). If the destination is filtered out, the source is
        reported as ``ChangeType.DELETED`` instead, so that a rename from an
        indexable path to a non-indexable one is not silently lost.
        """
        if event.is_directory:
            return
        forwarded = self._emit(
            event.dest_path, ChangeType.MOVED, old_path=event.src_path
        )
        if not forwarded:
            # The file left the indexable set; _emit re-applies the filter to
            # the source, so nothing is sent if it was never indexable either.
            self._emit(event.src_path, ChangeType.DELETED)

    def _emit(
        self,
        path: str,
        change_type: ChangeType,
        old_path: Optional[str] = None,
    ) -> bool:
        """Build a ``FileEvent`` for *path* and pass it to the callback.

        Args:
            path: Path the event refers to (destination path for moves).
            change_type: Kind of change being reported.
            old_path: Previous path for move events, otherwise ``None``.

        Returns:
            True if the event was forwarded, False if the path was rejected
            by the watcher's indexing filter.
        """
        path_obj = Path(path)
        # Filter out files that should not be indexed
        if not self._watcher._should_index_file(path_obj):
            return False
        event = FileEvent(
            path=path_obj,
            change_type=change_type,
            timestamp=time.time(),
            old_path=Path(old_path) if old_path else None,
        )
        self._on_event(event)
        return True
class FileWatcher:
    """File system watcher for monitoring directory changes.

    Uses the watchdog library for cross-platform file system monitoring.
    Accepted events are queued, debounced and de-duplicated per path, then
    forwarded in batches to the ``on_changes`` callback.

    Example:
        def handle_changes(events: List[FileEvent]) -> None:
            for event in events:
                print(f"{event.change_type}: {event.path}")

        watcher = FileWatcher(Path("."), WatcherConfig(), handle_changes)
        watcher.start()
        watcher.wait()  # Block until stopped
    """

    def __init__(
        self,
        root_path: Path,
        config: WatcherConfig,
        on_changes: Callable[[List[FileEvent]], None],
    ) -> None:
        """Initialize file watcher.

        Args:
            root_path: Directory to watch recursively
            config: Watcher configuration
            on_changes: Callback invoked with batched events
        """
        self.root_path = Path(root_path).resolve()
        self.config = config
        self.on_changes = on_changes
        self._observer: Optional[Observer] = None
        self._running = False
        self._stop_event = threading.Event()
        self._lock = threading.RLock()
        # Event queue for batching
        self._event_queue: List[FileEvent] = []
        self._queue_lock = threading.Lock()
        # Debounce timer (true debounce - waits after last event)
        self._flush_timer: Optional[threading.Timer] = None
        self._last_event_time: float = 0
        # Queue change callbacks for real-time UI updates
        self._queue_change_callbacks: List[Callable[[PendingQueueStatus], None]] = []
        # Background thread polling for the flush.signal file (set by start())
        self._signal_check_thread: Optional[threading.Thread] = None
        # Config instance for language checking
        self._codexlens_config = Config()

    def _should_index_file(self, path: Path) -> bool:
        """Check if file should be indexed based on extension and ignore patterns.

        Ignore patterns are compared against the path components below
        ``root_path`` so that the names of the root's own parent directories
        cannot cause every file to be filtered out. Paths that are not under
        ``root_path`` are matched on all of their components.

        Args:
            path: File path to check

        Returns:
            True if file should be indexed, False otherwise
        """
        try:
            parts = path.relative_to(self.root_path).parts
        except ValueError:
            # Not located under the watched root (or a relative path)
            parts = path.parts
        # Check against ignore patterns
        if any(pattern in parts for pattern in self.config.ignored_patterns):
            return False
        # Check extension against supported languages
        return self._codexlens_config.language_for_path(path) is not None

    def _on_raw_event(self, event: FileEvent) -> None:
        """Handle raw event from watchdog handler with true debounce.

        The event is appended to the queue and the debounce timer is
        restarted. If the queue had already reached ``MAX_QUEUE_SIZE`` the
        timer is dropped and the queue is flushed immediately instead.
        """
        force_flush = False
        with self._queue_lock:
            # Check queue size limit to prevent memory exhaustion
            if len(self._event_queue) >= MAX_QUEUE_SIZE:
                logger.warning(
                    "Event queue limit (%d) reached, forcing immediate flush",
                    MAX_QUEUE_SIZE
                )
                if self._flush_timer:
                    self._flush_timer.cancel()
                    self._flush_timer = None
                force_flush = True
            self._event_queue.append(event)
            self._last_event_time = time.time()
            # Cancel previous timer and schedule new one (true debounce)
            # Skip if we're about to force flush
            if not force_flush:
                if self._flush_timer:
                    self._flush_timer.cancel()
                self._flush_timer = threading.Timer(
                    self.config.debounce_ms / 1000.0,
                    self._flush_events
                )
                self._flush_timer.daemon = True
                self._flush_timer.start()
        # Force flush outside lock to avoid deadlock
        if force_flush:
            self._flush_events()
        # Notify queue change (outside lock to avoid deadlock)
        self._notify_queue_change()

    def _debounce_loop(self) -> None:
        """Background thread for checking flush signal file.

        Polls ``<root>/.codexlens/flush.signal`` about once per second. When
        the file exists it is removed and the pending queue is flushed.
        The loop ends as soon as the watcher is stopped.
        """
        signal_file = self.root_path / '.codexlens' / 'flush.signal'
        while self._running:
            # Interruptible one-second pause: returns True as soon as stop()
            # sets the event, so shutdown does not wait out a full sleep.
            if self._stop_event.wait(1.0):
                break
            try:
                # Check for flush signal file
                if not signal_file.exists():
                    continue
                signal_file.unlink()
                logger.info("Flush signal detected, triggering immediate index")
                self.flush_now()
            except Exception as e:
                logger.warning("Failed to handle flush signal: %s", e)

    def _flush_events(self) -> None:
        """Flush queued events with deduplication.

        Keeps only the latest queued event per path, clears the queue, then
        notifies queue listeners and invokes ``on_changes`` with the batch.
        Exceptions raised by ``on_changes`` are logged and not propagated.
        """
        with self._queue_lock:
            if not self._event_queue:
                return
            # Deduplicate: keep latest event per path
            deduped: Dict[Path, FileEvent] = {}
            for event in self._event_queue:
                deduped[event.path] = event
            events = list(deduped.values())
            self._event_queue.clear()
            self._last_event_time = 0  # Reset after flush
        # Notify queue cleared (outside the lock: it re-acquires it)
        self._notify_queue_change()
        if events:
            try:
                self.on_changes(events)
            except Exception as exc:
                logger.exception("Error in on_changes callback: %s", exc)

    def flush_now(self) -> None:
        """Immediately flush pending queue (manual trigger)."""
        with self._queue_lock:
            if self._flush_timer:
                self._flush_timer.cancel()
                self._flush_timer = None
        # Must run after releasing _queue_lock: _flush_events acquires it and
        # the lock is not reentrant.
        self._flush_events()

    def get_pending_queue_status(self) -> PendingQueueStatus:
        """Get current pending queue status for UI display.

        Returns:
            Snapshot with the number of queued events, the file names of the
            first 20 queued events, the whole seconds left until the debounce
            window elapses, and the time of the last event (``None`` when the
            queue is empty).
        """
        with self._queue_lock:
            file_count = len(self._event_queue)
            files = [str(e.path.name) for e in self._event_queue[:20]]
            # Calculate countdown
            if self._last_event_time > 0 and file_count > 0:
                elapsed = time.time() - self._last_event_time
                remaining = max(0, self.config.debounce_ms / 1000.0 - elapsed)
                countdown = int(remaining)
            else:
                countdown = 0
            return PendingQueueStatus(
                file_count=file_count,
                files=files,
                countdown_seconds=countdown,
                last_event_time=self._last_event_time if file_count > 0 else None
            )

    def register_queue_change_callback(
        self, callback: Callable[[PendingQueueStatus], None]
    ) -> None:
        """Register callback for queue change notifications."""
        self._queue_change_callbacks.append(callback)

    def _notify_queue_change(self) -> None:
        """Notify all registered callbacks of queue change.

        Exceptions raised by a callback are logged and do not prevent the
        remaining callbacks from running.
        """
        if not self._queue_change_callbacks:
            # Nobody is listening: skip building the status snapshot, which
            # would otherwise take the queue lock on every single event.
            return
        status = self.get_pending_queue_status()
        for callback in self._queue_change_callbacks:
            try:
                callback(status)
            except Exception as e:
                logger.exception("Queue change callback error: %s", e)

    def start(self) -> None:
        """Start watching the directory.

        Non-blocking. Use wait() to block until stopped.

        Raises:
            ValueError: If the root path does not exist.
        """
        with self._lock:
            if self._running:
                logger.warning("Watcher already running")
                return
            if not self.root_path.exists():
                raise ValueError(f"Root path does not exist: {self.root_path}")
            observer = Observer()
            handler = _CodexLensHandler(self, self._on_raw_event)
            observer.schedule(handler, str(self.root_path), recursive=True)
            # Start the observer before publishing any state: if it fails to
            # start, the watcher must not be left reporting itself as running.
            observer.start()
            self._observer = observer
            self._running = True
            self._stop_event.clear()
            # Start signal check thread (for flush.signal file)
            self._signal_check_thread = threading.Thread(
                target=self._debounce_loop,
                daemon=True,
                name="FileWatcher-SignalCheck",
            )
            self._signal_check_thread.start()
            logger.info("Started watching: %s", self.root_path)

    def stop(self) -> None:
        """Stop watching the directory.

        Gracefully stops the observer and flushes remaining events.
        """
        with self._lock:
            if not self._running:
                return
            self._running = False
            self._stop_event.set()
            # Stop the observer first so that no late event can re-arm the
            # debounce timer after it has been cancelled below.
            if self._observer:
                self._observer.stop()
                self._observer.join(timeout=5.0)
                self._observer = None
            # Cancel pending flush timer (guarded like every other access)
            with self._queue_lock:
                if self._flush_timer:
                    self._flush_timer.cancel()
                    self._flush_timer = None
            # Wait for signal check thread to finish
            signal_thread = self._signal_check_thread
            if signal_thread is not None and signal_thread.is_alive():
                signal_thread.join(timeout=2.0)
            self._signal_check_thread = None
            # Flush any remaining events
            self._flush_events()
            logger.info("Stopped watching: %s", self.root_path)

    def wait(self) -> None:
        """Block until watcher is stopped.

        Use Ctrl+C or call stop() from another thread to unblock.
        """
        try:
            while self._running:
                self._stop_event.wait(timeout=1.0)
        except KeyboardInterrupt:
            logger.info("Received interrupt, stopping watcher...")
            self.stop()

    @property
    def is_running(self) -> bool:
        """Check if watcher is currently running."""
        return self._running