From 9a45732a398dc3638127a5774151000a7201ce96 Mon Sep 17 00:00:00 2001
From: catlog22
Date: Mon, 29 Dec 2025 18:16:03 +0800
Subject: [PATCH] test(codex-lens): add connection pool stress tests

Solution-ID: SOL-1735410004
Issue-ID: ISS-1766921318981-24
Task-ID: T3
---
 codex-lens/tests/test_storage_concurrency.py | 96 ++++++++++++++++++++
 1 file changed, 96 insertions(+)

diff --git a/codex-lens/tests/test_storage_concurrency.py b/codex-lens/tests/test_storage_concurrency.py
index 6f2608a5..8a7b35f3 100644
--- a/codex-lens/tests/test_storage_concurrency.py
+++ b/codex-lens/tests/test_storage_concurrency.py
@@ -600,3 +600,99 @@ class TestConcurrentWrites:
         assert not errors
         stats = writable_store.stats()
         assert stats["files"] == writer_threads * writes_per_writer
+
+
+class TestConnectionPooling:
+    """Stress tests for SQLiteStore thread-local connection pooling."""
+
+    def test_pool_size_never_exceeds_max_during_sequential_churn(self, writable_store):
+        """Pool should remain bounded when threads churn and stale connections are cleaned."""
+        max_pool_size = writable_store.MAX_POOL_SIZE
+
+        def make_thread():
+            def worker():
+                writable_store._get_connection()
+
+            t = threading.Thread(target=worker)
+            t.start()
+            t.join()
+
+        for _ in range(max_pool_size + 50):
+            make_thread()
+            writable_store._cleanup_stale_connections()
+            assert len(writable_store._pool) <= max_pool_size
+
+    def test_pool_shrinks_after_threads_terminate(self, writable_store):
+        """After threads terminate, cleanup should remove their pooled connections."""
+        thread_count = 20
+        barrier = threading.Barrier(thread_count + 1)
+        created = []
+        errors = []
+        lock = threading.Lock()
+        current_tid = threading.get_ident()
+
+        def worker():
+            try:
+                writable_store._get_connection()
+                with lock:
+                    created.append(threading.get_ident())
+                barrier.wait(timeout=5)
+            except Exception as exc:
+                with lock:
+                    errors.append(exc)
+
+        threads = [threading.Thread(target=worker) for _ in range(thread_count)]
+        for t in threads:
+            t.start()
+
+        barrier.wait(timeout=5)
+        assert not errors
+        assert len(writable_store._pool) >= thread_count
+
+        for t in threads:
+            t.join()
+
+        writable_store._cleanup_stale_connections()
+        assert all(tid not in writable_store._pool for tid in created)
+        assert set(writable_store._pool.keys()).issubset({current_tid})
+
+    def test_connection_identity_remains_stable_for_active_thread(self, writable_store):
+        """An active thread should keep using the same connection object."""
+        main_conn = writable_store._get_connection()
+        errors = []
+        lock = threading.Lock()
+
+        def worker():
+            try:
+                writable_store._get_connection()
+            except Exception as exc:
+                with lock:
+                    errors.append(exc)
+
+        threads = [threading.Thread(target=worker) for _ in range(15)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+        assert not errors
+        assert writable_store._get_connection() is main_conn
+
+    def test_close_invalidates_connections_and_generation(self, tmp_path):
+        """close() should clear the pool and force new connections via generation increment."""
+        store = SQLiteStore(tmp_path / "pool-close.db")
+        store.initialize()
+        try:
+            conn_before = store._get_connection()
+            generation_before = store._pool_generation
+
+            store.close()
+
+            assert store._pool_generation == generation_before + 1
+            assert store._pool == {}
+
+            conn_after = store._get_connection()
+            assert conn_after is not conn_before
+            assert getattr(store._local, "generation", None) == store._pool_generation
+        finally:
+            store.close()
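
For orientation, the tests above exercise a thread-local pooling scheme on SQLiteStore. What follows is a minimal sketch of that pattern, assuming the attribute names the tests touch (_pool, _pool_generation, _local, MAX_POOL_SIZE, _get_connection, _cleanup_stale_connections) and plausible semantics; it is not the actual codex-lens implementation, which may differ in bounds, locking, and eviction policy.

import sqlite3
import threading


class SQLiteStore:
    """Sketch: thread-local connection pool with generation-based invalidation.

    All names and behaviors here are assumptions inferred from the tests,
    not the real codex-lens SQLiteStore.
    """

    MAX_POOL_SIZE = 32  # assumed bound; the real value lives in codex-lens

    def __init__(self, path):
        self._path = str(path)
        self._pool = {}  # thread id -> (Thread object, Connection)
        self._pool_lock = threading.Lock()
        self._pool_generation = 0  # bumped by close() to invalidate cached handles
        self._local = threading.local()

    def initialize(self):
        pass  # schema creation omitted in this sketch

    def _get_connection(self):
        # Reuse the cached handle only if it belongs to the current generation,
        # so a thread keeps a stable connection identity until close() is called.
        cached = getattr(self._local, "conn", None)
        if cached is not None and getattr(self._local, "generation", None) == self._pool_generation:
            return cached
        conn = sqlite3.connect(self._path, check_same_thread=False)
        self._local.conn = conn
        self._local.generation = self._pool_generation
        with self._pool_lock:
            self._pool[threading.get_ident()] = (threading.current_thread(), conn)
        return conn

    def _cleanup_stale_connections(self):
        # Evict entries whose owning thread has exited; this is what keeps the
        # pool bounded under thread churn in the tests above.
        with self._pool_lock:
            dead = [tid for tid, (t, _) in self._pool.items() if not t.is_alive()]
            for tid in dead:
                _, conn = self._pool.pop(tid)
                conn.close()

    def close(self):
        with self._pool_lock:
            for _, conn in self._pool.values():
                conn.close()
            self._pool = {}
            self._pool_generation += 1

The generation counter is the piece the last test pins down: rather than chasing every thread's cached handle, close() simply increments the counter, and each thread's next _get_connection() call notices the mismatch and opens a fresh connection.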