From b958a1ea968f1ef566e1d10360861a73fbaddda1 Mon Sep 17 00:00:00 2001 From: catlog22 Date: Mon, 29 Dec 2025 18:43:55 +0800 Subject: [PATCH] fix(sqlite-store): add periodic cleanup timer for connection pool Implement background timer to proactively clean stale connections every 5 minutes, preventing indefinite accumulation. Solution-ID: SOL-1735392000002 Issue-ID: ISS-1766921318981-3 Task-ID: T1 --- .../src/codexlens/storage/sqlite_store.py | 40 ++++++++++++ codex-lens/tests/test_sqlite_store.py | 61 +++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 codex-lens/tests/test_sqlite_store.py diff --git a/codex-lens/src/codexlens/storage/sqlite_store.py b/codex-lens/src/codexlens/storage/sqlite_store.py index e88ee7f6..b6780146 100644 --- a/codex-lens/src/codexlens/storage/sqlite_store.py +++ b/codex-lens/src/codexlens/storage/sqlite_store.py @@ -24,6 +24,8 @@ class SQLiteStore: MAX_POOL_SIZE = 32 # Idle timeout in seconds (10 minutes) IDLE_TIMEOUT = 600 + # Periodic cleanup interval in seconds (5 minutes) + CLEANUP_INTERVAL = 300 def __init__(self, db_path: str | Path) -> None: self.db_path = Path(db_path) @@ -33,6 +35,9 @@ class SQLiteStore: # Pool stores (connection, last_access_time) tuples self._pool: Dict[int, Tuple[sqlite3.Connection, float]] = {} self._pool_generation = 0 + self._cleanup_timer: threading.Timer | None = None + self._cleanup_stop_event = threading.Event() + self._start_cleanup_timer() def _get_connection(self) -> sqlite3.Connection: """Get or create a thread-local database connection.""" @@ -95,9 +100,44 @@ class SQLiteStore: pass del self._pool[tid] + def _start_cleanup_timer(self) -> None: + if self.CLEANUP_INTERVAL <= 0: + return + + self._cleanup_stop_event.clear() + + def tick() -> None: + if self._cleanup_stop_event.is_set(): + return + + try: + with self._pool_lock: + self._cleanup_stale_connections() + finally: + with self._pool_lock: + if self._cleanup_stop_event.is_set(): + self._cleanup_timer = None + return + + self._cleanup_timer = threading.Timer(self.CLEANUP_INTERVAL, tick) + self._cleanup_timer.daemon = True + self._cleanup_timer.start() + + self._cleanup_timer = threading.Timer(self.CLEANUP_INTERVAL, tick) + self._cleanup_timer.daemon = True + self._cleanup_timer.start() + + def _stop_cleanup_timer(self) -> None: + self._cleanup_stop_event.set() + with self._pool_lock: + if self._cleanup_timer is not None: + self._cleanup_timer.cancel() + self._cleanup_timer = None + def close(self) -> None: """Close all pooled connections.""" with self._lock: + self._stop_cleanup_timer() with self._pool_lock: for conn, _ in self._pool.values(): conn.close() diff --git a/codex-lens/tests/test_sqlite_store.py b/codex-lens/tests/test_sqlite_store.py new file mode 100644 index 00000000..3dc972a4 --- /dev/null +++ b/codex-lens/tests/test_sqlite_store.py @@ -0,0 +1,61 @@ +"""Tests for SQLiteStore connection pool behavior.""" + +from __future__ import annotations + +import threading +import time +from pathlib import Path + +import pytest + +from codexlens.storage.sqlite_store import SQLiteStore + + +def test_periodic_cleanup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + """Periodic timer should proactively clean up stale thread connections.""" + monkeypatch.setattr(SQLiteStore, "CLEANUP_INTERVAL", 0.2) + + store = SQLiteStore(tmp_path / "periodic_cleanup.db") + store.initialize() + + cleanup_called = threading.Event() + original_cleanup = store._cleanup_stale_connections + + def wrapped_cleanup() -> None: + cleanup_called.set() + original_cleanup() + + monkeypatch.setattr(store, "_cleanup_stale_connections", wrapped_cleanup) + + created: list[int] = [] + lock = threading.Lock() + main_tid = threading.get_ident() + + def worker() -> None: + store._get_connection() + with lock: + created.append(threading.get_ident()) + + try: + threads = [threading.Thread(target=worker) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Ensure we created thread-local connections without reaching MAX_POOL_SIZE. + assert len(store._pool) >= 2 + assert all(tid in store._pool for tid in created) + + # Wait for periodic cleanup to run and prune dead thread connections. + assert cleanup_called.wait(timeout=3) + deadline = time.time() + 3 + while time.time() < deadline and any(tid in store._pool for tid in created): + time.sleep(0.05) + + assert all(tid not in store._pool for tid in created) + assert set(store._pool.keys()).issubset({main_tid}) + finally: + store.close() + +