feat: Enhance JSON streaming parsing and UI updates

- Added a function to parse JSON streaming content in core-memory.js, extracting readable text from messages.
- Updated memory detail view to utilize the new parsing function for content and summary.
- Introduced an enableReview option in rules-manager.js, allowing users to toggle review functionality in rule creation.
- Simplified skill creation modal in skills-manager.js by removing generation type selection UI.
- Improved CLI executor to handle tool calls for file writing, ensuring proper output parsing.
- Adjusted CLI command tests to set timeout to 0 for immediate execution.
- Updated file watcher to implement a true debounce mechanism and added a pending queue status for UI updates.
- Enhanced watcher manager to handle queue changes and provide JSON output for better integration with TypeScript backend.
- Established TypeScript naming conventions documentation to standardize code style across the project.
This commit is contained in:
catlog22
2026-01-07 21:51:26 +08:00
parent e9fb7be85f
commit 05514631f2
19 changed files with 1346 additions and 173 deletions

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
@@ -28,7 +29,7 @@ class FileEvent:
@dataclass
class WatcherConfig:
"""Configuration for file watcher."""
debounce_ms: int = 1000
debounce_ms: int = 60000 # Default 60 seconds for debounce
ignored_patterns: Set[str] = field(default_factory=lambda: {
# Version control
".git", ".svn", ".hg",
@@ -50,13 +51,26 @@ class WatcherConfig:
languages: Optional[List[str]] = None # None = all supported
@dataclass
class PendingQueueStatus:
"""Status of pending file changes queue."""
file_count: int = 0
files: List[str] = field(default_factory=list) # Limited to 20 files
countdown_seconds: int = 0
last_event_time: Optional[float] = None
@dataclass
class IndexResult:
"""Result of processing file changes."""
files_indexed: int = 0
files_removed: int = 0
symbols_added: int = 0
symbols_removed: int = 0
files_success: List[str] = field(default_factory=list)
files_failed: List[str] = field(default_factory=list)
errors: List[str] = field(default_factory=list)
timestamp: float = field(default_factory=time.time)
@dataclass

View File

@@ -11,11 +11,15 @@ from typing import Callable, Dict, List, Optional
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from .events import ChangeType, FileEvent, WatcherConfig
from .events import ChangeType, FileEvent, WatcherConfig, PendingQueueStatus
from ..config import Config
logger = logging.getLogger(__name__)
# Maximum queue size to prevent unbounded memory growth
# When exceeded, forces immediate flush to avoid memory exhaustion
MAX_QUEUE_SIZE = 50000
class _CodexLensHandler(FileSystemEventHandler):
"""Internal handler for watchdog events."""
@@ -112,8 +116,12 @@ class FileWatcher:
self._event_queue: List[FileEvent] = []
self._queue_lock = threading.Lock()
# Debounce thread
self._debounce_thread: Optional[threading.Thread] = None
# Debounce timer (true debounce - waits after last event)
self._flush_timer: Optional[threading.Timer] = None
self._last_event_time: float = 0
# Queue change callbacks for real-time UI updates
self._queue_change_callbacks: List[Callable[[PendingQueueStatus], None]] = []
# Config instance for language checking
self._codexlens_config = Config()
@@ -138,16 +146,57 @@ class FileWatcher:
return language is not None
def _on_raw_event(self, event: FileEvent) -> None:
"""Handle raw event from watchdog handler."""
"""Handle raw event from watchdog handler with true debounce."""
force_flush = False
with self._queue_lock:
# Check queue size limit to prevent memory exhaustion
if len(self._event_queue) >= MAX_QUEUE_SIZE:
logger.warning(
"Event queue limit (%d) reached, forcing immediate flush",
MAX_QUEUE_SIZE
)
if self._flush_timer:
self._flush_timer.cancel()
self._flush_timer = None
force_flush = True
self._event_queue.append(event)
# Debouncing is handled by background thread
self._last_event_time = time.time()
# Cancel previous timer and schedule new one (true debounce)
# Skip if we're about to force flush
if not force_flush:
if self._flush_timer:
self._flush_timer.cancel()
self._flush_timer = threading.Timer(
self.config.debounce_ms / 1000.0,
self._flush_events
)
self._flush_timer.daemon = True
self._flush_timer.start()
# Force flush outside lock to avoid deadlock
if force_flush:
self._flush_events()
# Notify queue change (outside lock to avoid deadlock)
self._notify_queue_change()
def _debounce_loop(self) -> None:
"""Background thread for debounced event batching."""
"""Background thread for checking flush signal file."""
signal_file = self.root_path / '.codexlens' / 'flush.signal'
while self._running:
time.sleep(self.config.debounce_ms / 1000.0)
self._flush_events()
time.sleep(1.0) # Check every second
# Check for flush signal file
if signal_file.exists():
try:
signal_file.unlink()
logger.info("Flush signal detected, triggering immediate index")
self.flush_now()
except Exception as e:
logger.warning("Failed to handle flush signal: %s", e)
def _flush_events(self) -> None:
"""Flush queued events with deduplication."""
@@ -162,6 +211,10 @@ class FileWatcher:
events = list(deduped.values())
self._event_queue.clear()
self._last_event_time = 0 # Reset after flush
# Notify queue cleared
self._notify_queue_change()
if events:
try:
@@ -169,6 +222,50 @@ class FileWatcher:
except Exception as exc:
logger.error("Error in on_changes callback: %s", exc)
def flush_now(self) -> None:
"""Immediately flush pending queue (manual trigger)."""
with self._queue_lock:
if self._flush_timer:
self._flush_timer.cancel()
self._flush_timer = None
self._flush_events()
def get_pending_queue_status(self) -> PendingQueueStatus:
"""Get current pending queue status for UI display."""
with self._queue_lock:
file_count = len(self._event_queue)
files = [str(e.path.name) for e in self._event_queue[:20]]
# Calculate countdown
if self._last_event_time > 0 and file_count > 0:
elapsed = time.time() - self._last_event_time
remaining = max(0, self.config.debounce_ms / 1000.0 - elapsed)
countdown = int(remaining)
else:
countdown = 0
return PendingQueueStatus(
file_count=file_count,
files=files,
countdown_seconds=countdown,
last_event_time=self._last_event_time if file_count > 0 else None
)
def register_queue_change_callback(
self, callback: Callable[[PendingQueueStatus], None]
) -> None:
"""Register callback for queue change notifications."""
self._queue_change_callbacks.append(callback)
def _notify_queue_change(self) -> None:
"""Notify all registered callbacks of queue change."""
status = self.get_pending_queue_status()
for callback in self._queue_change_callbacks:
try:
callback(status)
except Exception as e:
logger.error("Queue change callback error: %s", e)
def start(self) -> None:
"""Start watching the directory.
@@ -190,13 +287,13 @@ class FileWatcher:
self._stop_event.clear()
self._observer.start()
# Start debounce thread
self._debounce_thread = threading.Thread(
# Start signal check thread (for flush.signal file)
self._signal_check_thread = threading.Thread(
target=self._debounce_loop,
daemon=True,
name="FileWatcher-Debounce",
name="FileWatcher-SignalCheck",
)
self._debounce_thread.start()
self._signal_check_thread.start()
logger.info("Started watching: %s", self.root_path)
@@ -212,15 +309,20 @@ class FileWatcher:
self._running = False
self._stop_event.set()
# Cancel pending flush timer
if self._flush_timer:
self._flush_timer.cancel()
self._flush_timer = None
if self._observer:
self._observer.stop()
self._observer.join(timeout=5.0)
self._observer = None
# Wait for debounce thread to finish
if self._debounce_thread and self._debounce_thread.is_alive():
self._debounce_thread.join(timeout=2.0)
self._debounce_thread = None
# Wait for signal check thread to finish
if hasattr(self, '_signal_check_thread') and self._signal_check_thread and self._signal_check_thread.is_alive():
self._signal_check_thread.join(timeout=2.0)
self._signal_check_thread = None
# Flush any remaining events
self._flush_events()

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import json
import logging
import signal
import threading
@@ -13,7 +14,7 @@ from codexlens.config import Config
from codexlens.storage.path_mapper import PathMapper
from codexlens.storage.registry import RegistryStore
from .events import FileEvent, IndexResult, WatcherConfig, WatcherStats
from .events import FileEvent, IndexResult, PendingQueueStatus, WatcherConfig, WatcherStats
from .file_watcher import FileWatcher
from .incremental_indexer import IncrementalIndexer
@@ -36,44 +37,68 @@ class WatcherManager:
config: Optional[Config] = None,
watcher_config: Optional[WatcherConfig] = None,
on_indexed: Optional[Callable[[IndexResult], None]] = None,
on_queue_change: Optional[Callable[[PendingQueueStatus], None]] = None,
) -> None:
self.root_path = Path(root_path).resolve()
self.config = config or Config()
self.watcher_config = watcher_config or WatcherConfig()
self.on_indexed = on_indexed
self.on_queue_change = on_queue_change
self._registry: Optional[RegistryStore] = None
self._mapper: Optional[PathMapper] = None
self._watcher: Optional[FileWatcher] = None
self._indexer: Optional[IncrementalIndexer] = None
self._running = False
self._stop_event = threading.Event()
self._lock = threading.RLock()
# Statistics
self._stats = WatcherStats()
self._original_sigint = None
self._original_sigterm = None
# Index history for tracking recent results
self._index_history: List[IndexResult] = []
self._max_history_size = 10
def _handle_changes(self, events: List[FileEvent]) -> None:
"""Handle file change events from watcher."""
if not self._indexer or not events:
return
logger.info("Processing %d file changes", len(events))
result = self._indexer.process_changes(events)
# Update stats
self._stats.events_processed += len(events)
self._stats.last_event_time = time.time()
# Save to history
self._index_history.append(result)
if len(self._index_history) > self._max_history_size:
self._index_history.pop(0)
if result.files_indexed > 0 or result.files_removed > 0:
logger.info(
"Indexed %d files, removed %d files, %d errors",
result.files_indexed, result.files_removed, len(result.errors)
)
# Output JSON for TypeScript backend parsing
result_data = {
"files_indexed": result.files_indexed,
"files_removed": result.files_removed,
"symbols_added": result.symbols_added,
"symbols_removed": result.symbols_removed,
"files_success": result.files_success[:20], # Limit output
"files_failed": result.files_failed[:20],
"errors": result.errors[:10],
"timestamp": result.timestamp
}
print(f"[INDEX_RESULT] {json.dumps(result_data)}", flush=True)
if self.on_indexed:
try:
self.on_indexed(result)
@@ -128,7 +153,11 @@ class WatcherManager:
self._watcher = FileWatcher(
self.root_path, self.watcher_config, self._handle_changes
)
# Register queue change callback for real-time UI updates
if self.on_queue_change:
self._watcher.register_queue_change_callback(self._on_queue_change_wrapper)
# Install signal handlers
self._install_signal_handlers()
@@ -192,3 +221,35 @@ class WatcherManager:
last_event_time=self._stats.last_event_time,
is_running=self._running,
)
def _on_queue_change_wrapper(self, status: PendingQueueStatus) -> None:
"""Wrapper for queue change callback with JSON output."""
# Output JSON for TypeScript backend parsing
status_data = {
"file_count": status.file_count,
"files": status.files,
"countdown_seconds": status.countdown_seconds,
"last_event_time": status.last_event_time
}
print(f"[QUEUE_STATUS] {json.dumps(status_data)}", flush=True)
if self.on_queue_change:
try:
self.on_queue_change(status)
except Exception as exc:
logger.error("Error in on_queue_change callback: %s", exc)
def flush_now(self) -> None:
"""Immediately flush pending queue (manual trigger)."""
if self._watcher:
self._watcher.flush_now()
def get_pending_queue_status(self) -> Optional[PendingQueueStatus]:
"""Get current pending queue status."""
if self._watcher:
return self._watcher.get_pending_queue_status()
return None
def get_index_history(self, limit: int = 5) -> List[IndexResult]:
"""Get recent index history."""
return self._index_history[-limit:]