mirror of
https://github.com/catlog22/Claude-Code-Workflow.git
synced 2026-02-14 02:42:04 +08:00
fix(sqlite-store): add periodic cleanup timer for connection pool
Implement background timer to proactively clean stale connections every 5 minutes, preventing indefinite accumulation. Solution-ID: SOL-1735392000002 Issue-ID: ISS-1766921318981-3 Task-ID: T1
This commit is contained in:
@@ -24,6 +24,8 @@ class SQLiteStore:
|
|||||||
MAX_POOL_SIZE = 32
|
MAX_POOL_SIZE = 32
|
||||||
# Idle timeout in seconds (10 minutes)
|
# Idle timeout in seconds (10 minutes)
|
||||||
IDLE_TIMEOUT = 600
|
IDLE_TIMEOUT = 600
|
||||||
|
# Periodic cleanup interval in seconds (5 minutes)
|
||||||
|
CLEANUP_INTERVAL = 300
|
||||||
|
|
||||||
def __init__(self, db_path: str | Path) -> None:
|
def __init__(self, db_path: str | Path) -> None:
|
||||||
self.db_path = Path(db_path)
|
self.db_path = Path(db_path)
|
||||||
@@ -33,6 +35,9 @@ class SQLiteStore:
|
|||||||
# Pool stores (connection, last_access_time) tuples
|
# Pool stores (connection, last_access_time) tuples
|
||||||
self._pool: Dict[int, Tuple[sqlite3.Connection, float]] = {}
|
self._pool: Dict[int, Tuple[sqlite3.Connection, float]] = {}
|
||||||
self._pool_generation = 0
|
self._pool_generation = 0
|
||||||
|
self._cleanup_timer: threading.Timer | None = None
|
||||||
|
self._cleanup_stop_event = threading.Event()
|
||||||
|
self._start_cleanup_timer()
|
||||||
|
|
||||||
def _get_connection(self) -> sqlite3.Connection:
|
def _get_connection(self) -> sqlite3.Connection:
|
||||||
"""Get or create a thread-local database connection."""
|
"""Get or create a thread-local database connection."""
|
||||||
@@ -95,9 +100,44 @@ class SQLiteStore:
|
|||||||
pass
|
pass
|
||||||
del self._pool[tid]
|
del self._pool[tid]
|
||||||
|
|
||||||
|
def _start_cleanup_timer(self) -> None:
|
||||||
|
if self.CLEANUP_INTERVAL <= 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._cleanup_stop_event.clear()
|
||||||
|
|
||||||
|
def tick() -> None:
|
||||||
|
if self._cleanup_stop_event.is_set():
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
with self._pool_lock:
|
||||||
|
self._cleanup_stale_connections()
|
||||||
|
finally:
|
||||||
|
with self._pool_lock:
|
||||||
|
if self._cleanup_stop_event.is_set():
|
||||||
|
self._cleanup_timer = None
|
||||||
|
return
|
||||||
|
|
||||||
|
self._cleanup_timer = threading.Timer(self.CLEANUP_INTERVAL, tick)
|
||||||
|
self._cleanup_timer.daemon = True
|
||||||
|
self._cleanup_timer.start()
|
||||||
|
|
||||||
|
self._cleanup_timer = threading.Timer(self.CLEANUP_INTERVAL, tick)
|
||||||
|
self._cleanup_timer.daemon = True
|
||||||
|
self._cleanup_timer.start()
|
||||||
|
|
||||||
|
def _stop_cleanup_timer(self) -> None:
|
||||||
|
self._cleanup_stop_event.set()
|
||||||
|
with self._pool_lock:
|
||||||
|
if self._cleanup_timer is not None:
|
||||||
|
self._cleanup_timer.cancel()
|
||||||
|
self._cleanup_timer = None
|
||||||
|
|
||||||
def close(self) -> None:
|
def close(self) -> None:
|
||||||
"""Close all pooled connections."""
|
"""Close all pooled connections."""
|
||||||
with self._lock:
|
with self._lock:
|
||||||
|
self._stop_cleanup_timer()
|
||||||
with self._pool_lock:
|
with self._pool_lock:
|
||||||
for conn, _ in self._pool.values():
|
for conn, _ in self._pool.values():
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|||||||
61
codex-lens/tests/test_sqlite_store.py
Normal file
61
codex-lens/tests/test_sqlite_store.py
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
"""Tests for SQLiteStore connection pool behavior."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from codexlens.storage.sqlite_store import SQLiteStore
|
||||||
|
|
||||||
|
|
||||||
|
def test_periodic_cleanup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||||
|
"""Periodic timer should proactively clean up stale thread connections."""
|
||||||
|
monkeypatch.setattr(SQLiteStore, "CLEANUP_INTERVAL", 0.2)
|
||||||
|
|
||||||
|
store = SQLiteStore(tmp_path / "periodic_cleanup.db")
|
||||||
|
store.initialize()
|
||||||
|
|
||||||
|
cleanup_called = threading.Event()
|
||||||
|
original_cleanup = store._cleanup_stale_connections
|
||||||
|
|
||||||
|
def wrapped_cleanup() -> None:
|
||||||
|
cleanup_called.set()
|
||||||
|
original_cleanup()
|
||||||
|
|
||||||
|
monkeypatch.setattr(store, "_cleanup_stale_connections", wrapped_cleanup)
|
||||||
|
|
||||||
|
created: list[int] = []
|
||||||
|
lock = threading.Lock()
|
||||||
|
main_tid = threading.get_ident()
|
||||||
|
|
||||||
|
def worker() -> None:
|
||||||
|
store._get_connection()
|
||||||
|
with lock:
|
||||||
|
created.append(threading.get_ident())
|
||||||
|
|
||||||
|
try:
|
||||||
|
threads = [threading.Thread(target=worker) for _ in range(10)]
|
||||||
|
for t in threads:
|
||||||
|
t.start()
|
||||||
|
for t in threads:
|
||||||
|
t.join()
|
||||||
|
|
||||||
|
# Ensure we created thread-local connections without reaching MAX_POOL_SIZE.
|
||||||
|
assert len(store._pool) >= 2
|
||||||
|
assert all(tid in store._pool for tid in created)
|
||||||
|
|
||||||
|
# Wait for periodic cleanup to run and prune dead thread connections.
|
||||||
|
assert cleanup_called.wait(timeout=3)
|
||||||
|
deadline = time.time() + 3
|
||||||
|
while time.time() < deadline and any(tid in store._pool for tid in created):
|
||||||
|
time.sleep(0.05)
|
||||||
|
|
||||||
|
assert all(tid not in store._pool for tid in created)
|
||||||
|
assert set(store._pool.keys()).issubset({main_tid})
|
||||||
|
finally:
|
||||||
|
store.close()
|
||||||
|
|
||||||
|
|
||||||
Reference in New Issue
Block a user