From 5d5652c2c5690fb3df3dc4ed57358f97c2dc6663 Mon Sep 17 00:00:00 2001
From: catlog22
Date: Mon, 29 Dec 2025 18:50:22 +0800
Subject: [PATCH] fix(sqlite-store): improve thread tracking in connection
 cleanup

Add fallback validation to detect dead threads missed by
threading.enumerate(), ensuring all stale connections are cleaned.

Solution-ID: SOL-1735392000002
Issue-ID: ISS-1766921318981-3
Task-ID: T2
---
 .../src/codexlens/storage/sqlite_store.py | 47 ++++++++++++----
 codex-lens/tests/test_sqlite_store.py     | 55 +++++++++++++++++++
 2 files changed, 92 insertions(+), 10 deletions(-)

diff --git a/codex-lens/src/codexlens/storage/sqlite_store.py b/codex-lens/src/codexlens/storage/sqlite_store.py
index b6780146..369feef7 100644
--- a/codex-lens/src/codexlens/storage/sqlite_store.py
+++ b/codex-lens/src/codexlens/storage/sqlite_store.py
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 import json
+import logging
 import sqlite3
 import threading
 import time
@@ -13,6 +14,8 @@ from typing import Any, Dict, Iterable, List, Optional, Tuple
 from codexlens.entities import IndexedFile, SearchResult, Symbol
 from codexlens.errors import StorageError
 
+logger = logging.getLogger(__name__)
+
 
 class SQLiteStore:
     """SQLiteStore providing FTS5 search and symbol lookup.
@@ -47,11 +50,16 @@ class SQLiteStore:
         if getattr(self._local, "generation", None) == self._pool_generation:
             conn = getattr(self._local, "conn", None)
             if conn is not None:
-                # Update last access time
                 with self._pool_lock:
-                    if thread_id in self._pool:
-                        self._pool[thread_id] = (conn, current_time)
-                return conn
+                    pool_entry = self._pool.get(thread_id)
+                    if pool_entry is not None:
+                        pooled_conn, _ = pool_entry
+                        self._pool[thread_id] = (pooled_conn, current_time)
+                        self._local.conn = pooled_conn
+                        return pooled_conn
+
+            # Thread-local connection is stale (e.g., cleaned up by timer).
+            self._local.conn = None
 
         with self._pool_lock:
             pool_entry = self._pool.get(thread_id)
@@ -84,21 +92,40 @@ class SQLiteStore:
         active_threads = {t.ident for t in threading.enumerate() if t.ident is not None}
 
         # Find connections to remove: dead threads or idle timeout exceeded
-        stale_ids = []
+        stale_ids: list[tuple[int, str]] = []
         for tid, (conn, last_access) in list(self._pool.items()):
-            is_dead_thread = tid not in active_threads
-            is_idle = (current_time - last_access) > self.IDLE_TIMEOUT
-            if is_dead_thread or is_idle:
-                stale_ids.append(tid)
+            try:
+                is_dead_thread = tid not in active_threads
+                is_idle = (current_time - last_access) > self.IDLE_TIMEOUT
+
+                is_invalid_connection = False
+                if not is_dead_thread and not is_idle:
+                    try:
+                        conn.execute("SELECT 1").fetchone()
+                    except sqlite3.ProgrammingError:
+                        is_invalid_connection = True
+                    except sqlite3.Error:
+                        is_invalid_connection = True
+
+                if is_invalid_connection:
+                    stale_ids.append((tid, "invalid_connection"))
+                elif is_dead_thread:
+                    stale_ids.append((tid, "dead_thread"))
+                elif is_idle:
+                    stale_ids.append((tid, "idle_timeout"))
+            except Exception:
+                # Never break cleanup for a single bad entry.
+                continue
 
         # Close and remove stale connections
-        for tid in stale_ids:
+        for tid, reason in stale_ids:
             try:
                 conn, _ = self._pool[tid]
                 conn.close()
             except Exception:
                 pass
             del self._pool[tid]
+            logger.debug("Cleaned SQLiteStore connection for thread_id=%s (%s)", tid, reason)
 
     def _start_cleanup_timer(self) -> None:
         if self.CLEANUP_INTERVAL <= 0:
diff --git a/codex-lens/tests/test_sqlite_store.py b/codex-lens/tests/test_sqlite_store.py
index 3dc972a4..3f4a037a 100644
--- a/codex-lens/tests/test_sqlite_store.py
+++ b/codex-lens/tests/test_sqlite_store.py
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import logging
 import threading
 import time
 from pathlib import Path
@@ -59,3 +60,57 @@ def test_periodic_cleanup(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> No
     store.close()
 
 
+def test_cleanup_robustness(
+    monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture, tmp_path: Path
+) -> None:
+    """Cleanup should handle dead threads, idle timeouts, and invalid connections."""
+    monkeypatch.setattr(SQLiteStore, "CLEANUP_INTERVAL", 0)
+    caplog.set_level(logging.DEBUG, logger="codexlens.storage.sqlite_store")
+
+    store = SQLiteStore(tmp_path / "cleanup_robustness.db")
+    store.initialize()
+
+    try:
+        # Invalid connection: active thread but pooled connection is already closed.
+        conn = store._get_connection()
+        conn.close()
+        with store._pool_lock:
+            store._pool[threading.get_ident()] = (conn, time.time())
+        store._cleanup_stale_connections()
+
+        assert "invalid_connection" in caplog.text
+        assert threading.get_ident() not in store._pool
+
+        # Ensure next access recreates a working connection after cleanup.
+        fresh_conn = store._get_connection()
+        assert fresh_conn is not conn
+
+        # Idle timeout cleanup should be logged distinctly.
+        with store._pool_lock:
+            store._pool[threading.get_ident()] = (fresh_conn, time.time() - store.IDLE_TIMEOUT - 1)
+        store._cleanup_stale_connections()
+
+        assert "idle_timeout" in caplog.text
+        assert threading.get_ident() not in store._pool
+
+        # Dead thread cleanup should be logged distinctly.
+        created: list[int] = []
+
+        def worker() -> None:
+            store._get_connection()
+            created.append(threading.get_ident())
+
+        t = threading.Thread(target=worker)
+        t.start()
+        t.join()
+
+        dead_tid = created[0]
+        assert dead_tid in store._pool
+        with store._pool_lock:
+            store._cleanup_stale_connections()
+
+        assert "dead_thread" in caplog.text
+        assert dead_tid not in store._pool
+    finally:
+        store.close()
+