test(codex-lens): add connection pool stress tests

Solution-ID: SOL-1735410004
Issue-ID: ISS-1766921318981-24
Task-ID: T3
This commit is contained in:
catlog22
2025-12-29 18:16:03 +08:00
parent 015b46e58b
commit 9a45732a39

View File

@@ -600,3 +600,99 @@ class TestConcurrentWrites:
assert not errors
stats = writable_store.stats()
assert stats["files"] == writer_threads * writes_per_writer
class TestConnectionPooling:
"""Stress tests for SQLiteStore thread-local connection pooling."""
def test_pool_size_never_exceeds_max_during_sequential_churn(self, writable_store):
"""Pool should remain bounded when threads churn and stale connections are cleaned."""
max_pool_size = writable_store.MAX_POOL_SIZE
def make_thread():
def worker():
writable_store._get_connection()
t = threading.Thread(target=worker)
t.start()
t.join()
for _ in range(max_pool_size + 50):
make_thread()
writable_store._cleanup_stale_connections()
assert len(writable_store._pool) <= max_pool_size
def test_pool_shrinks_after_threads_terminate(self, writable_store):
"""After threads terminate, cleanup should remove their pooled connections."""
thread_count = 20
barrier = threading.Barrier(thread_count + 1)
created = []
errors = []
lock = threading.Lock()
current_tid = threading.get_ident()
def worker():
try:
writable_store._get_connection()
with lock:
created.append(threading.get_ident())
barrier.wait(timeout=5)
except Exception as exc:
with lock:
errors.append(exc)
threads = [threading.Thread(target=worker) for _ in range(thread_count)]
for t in threads:
t.start()
barrier.wait(timeout=5)
assert not errors
assert len(writable_store._pool) >= thread_count
for t in threads:
t.join()
writable_store._cleanup_stale_connections()
assert all(tid not in writable_store._pool for tid in created)
assert set(writable_store._pool.keys()).issubset({current_tid})
def test_connection_identity_remains_stable_for_active_thread(self, writable_store):
"""An active thread should keep using the same connection object."""
main_conn = writable_store._get_connection()
errors = []
lock = threading.Lock()
def worker():
try:
writable_store._get_connection()
except Exception as exc:
with lock:
errors.append(exc)
threads = [threading.Thread(target=worker) for _ in range(15)]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors
assert writable_store._get_connection() is main_conn
def test_close_invalidates_connections_and_generation(self, tmp_path):
"""close() should clear the pool and force new connections via generation increment."""
store = SQLiteStore(tmp_path / "pool-close.db")
store.initialize()
try:
conn_before = store._get_connection()
generation_before = store._pool_generation
store.close()
assert store._pool_generation == generation_before + 1
assert store._pool == {}
conn_after = store._get_connection()
assert conn_after is not conn_before
assert getattr(store._local, "generation", None) == store._pool_generation
finally:
store.close()