From 042a99dbe3c34a837f9dc7fc2304bdad7569ac52 Mon Sep 17 00:00:00 2001 From: catlog22 Date: Mon, 29 Dec 2025 17:59:08 +0800 Subject: [PATCH] test(codex-lens): add concurrent read operation tests Solution-ID: SOL-1735410004 Issue-ID: ISS-1766921318981-24 Task-ID: T1 --- codex-lens/tests/test_storage_concurrency.py | 347 +++++++++++++++++++ 1 file changed, 347 insertions(+) create mode 100644 codex-lens/tests/test_storage_concurrency.py diff --git a/codex-lens/tests/test_storage_concurrency.py b/codex-lens/tests/test_storage_concurrency.py new file mode 100644 index 00000000..7338c6de --- /dev/null +++ b/codex-lens/tests/test_storage_concurrency.py @@ -0,0 +1,347 @@ +"""Concurrency tests for CodexLens storage managers.""" + +from __future__ import annotations + +import threading +import time +import tempfile +from pathlib import Path + +import pytest + +from codexlens.entities import IndexedFile, Symbol +from codexlens.storage.dir_index import DirIndexStore +from codexlens.storage.registry import RegistryStore +from codexlens.storage.sqlite_store import SQLiteStore + + +@pytest.fixture(scope="module") +def populated_store(): + """Create a SQLiteStore populated with 1000+ files across multiple directories.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "concurrency.db" + store = SQLiteStore(db_path) + store.initialize() + + files = [] + for i in range(1000): + path = f"/test/dir_{i % 25}/file_{i}.py" + content = f"# token_{i}\n\ndef func_{i}():\n return {i}\n" + symbols = [Symbol(name=f"func_{i}", kind="function", range=(1, 1))] + files.append((IndexedFile(path=path, language="python", symbols=symbols), content)) + + store.add_files(files) + yield store + store.close() + + +@pytest.fixture() +def registry_store(tmp_path): + """Create a RegistryStore in a temporary database with a single registered project.""" + db_path = tmp_path / "registry.db" + store = RegistryStore(db_path) + store.initialize() + store.register_project(source_root=tmp_path / "src", index_root=tmp_path / "idx") + yield store + store.close() + + +@pytest.fixture() +def dir_index_store(tmp_path): + """Create a DirIndexStore for concurrency tests.""" + db_path = tmp_path / "_index.db" + store = DirIndexStore(db_path) + store.initialize() + + # Seed a few entries for read tests + for i in range(10): + store.add_file( + name=f"file_{i}.py", + full_path=tmp_path / f"file_{i}.py", + content=f"# dir-index token_{i}\nprint({i})\n", + language="python", + symbols=[Symbol(name=f"sym_{i}", kind="function", range=(1, 1))], + ) + + yield store + store.close() + + +class TestConcurrentReads: + """Concurrent read tests for storage managers.""" + + def test_concurrent_stats_same_query_consistent(self, populated_store): + """Concurrent reads from 10 threads accessing the same stats query.""" + results = [] + errors = [] + lock = threading.Lock() + + def worker(): + try: + stats = populated_store.stats() + with lock: + results.append(stats) + except Exception as exc: + with lock: + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + assert len(results) == 10 + assert all(r["files"] == 1000 for r in results) + assert all(r["symbols"] == 1000 for r in results) + + def test_concurrent_file_exists_same_file(self, populated_store): + """Concurrent reads from 10 threads checking the same file path.""" + target = "/test/dir_0/file_0.py" + results = [] + errors = [] + lock = threading.Lock() + + def 
worker(): + try: + ok = populated_store.file_exists(target) + with lock: + results.append(ok) + except Exception as exc: + with lock: + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + assert results == [True] * 10 + + def test_concurrent_search_fts_same_token_consistent(self, populated_store): + """Concurrent reads from 10 threads searching the same FTS token.""" + results = [] + errors = [] + lock = threading.Lock() + + def worker(): + try: + matches = populated_store.search_fts("token_42") + with lock: + results.append(len(matches)) + except Exception as exc: + with lock: + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + assert results == [1] * 10 + + def test_concurrent_search_fts_different_tokens(self, populated_store): + """Concurrent reads from 20 threads searching different tokens.""" + results = {} + errors = [] + lock = threading.Lock() + + def worker(i: int): + try: + matches = populated_store.search_fts(f"token_{i}") + with lock: + results[i] = len(matches) + except Exception as exc: + with lock: + errors.append(exc) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(20)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + assert len(results) == 20 + assert all(results[i] == 1 for i in range(20)) + + def test_connection_pool_thread_local_isolation(self, populated_store): + """Each thread should get a dedicated connection object.""" + conn_ids = [] + errors = [] + lock = threading.Lock() + + def worker(): + try: + conn = populated_store._get_connection() + with lock: + conn_ids.append(id(conn)) + except Exception as exc: + with lock: + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + assert len(set(conn_ids)) == len(conn_ids) + + def test_connection_reuse_within_thread(self, populated_store): + """Connections should be reused within the same thread.""" + errors = [] + + def worker(): + try: + c1 = populated_store._get_connection() + c2 = populated_store._get_connection() + assert c1 is c2 + except Exception as exc: + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + + def test_pool_cleanup_removes_dead_thread_connections(self, populated_store): + """cleanup_stale_connections should remove connections for terminated threads.""" + created = [] + lock = threading.Lock() + current_tid = threading.get_ident() + + def worker(): + conn = populated_store._get_connection() + with lock: + created.append(threading.get_ident()) + # allow the thread to end quickly + + threads = [threading.Thread(target=worker) for _ in range(15)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Ensure pool has entries for the threads we created + assert len(populated_store._pool) >= 10 + + populated_store._cleanup_stale_connections() + # Main thread connection may remain active; all terminated thread connections must be removed. 
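+        # Because the populated_store fixture is module-scoped, worker threads
+        # spawned by earlier tests in this class have also exited by now, so after
+        # cleanup the pool should hold at most the main (pytest) thread's connection.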
+ assert all(tid not in populated_store._pool for tid in created) + assert set(populated_store._pool.keys()).issubset({current_tid}) + + def test_pool_size_respects_max_after_sequential_load(self, populated_store): + """Pool should stay within MAX_POOL_SIZE once stale threads are cleaned up.""" + max_pool_size = populated_store.MAX_POOL_SIZE + + def make_thread(): + def worker(): + populated_store._get_connection() + + t = threading.Thread(target=worker) + t.start() + t.join() + + # Create more than MAX_POOL_SIZE thread connections sequentially. + for _ in range(max_pool_size + 8): + make_thread() + + populated_store._cleanup_stale_connections() + assert len(populated_store._pool) <= max_pool_size + + def test_read_throughput_measurement(self, populated_store): + """Measure simple read throughput scaling by thread count.""" + target_paths = [f"/test/dir_{i % 25}/file_{i}.py" for i in range(200)] + + def run(thread_count: int) -> float: + per_thread = 200 + errors = [] + + def worker(offset: int): + try: + for j in range(per_thread): + populated_store.file_exists(target_paths[(offset + j) % len(target_paths)]) + except Exception as exc: + errors.append(exc) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(thread_count)] + start = time.time() + for t in threads: + t.start() + for t in threads: + t.join() + duration = max(time.time() - start, 1e-6) + + assert not errors + total_ops = thread_count * per_thread + return total_ops / duration + + qps_1 = run(1) + qps_5 = run(5) + qps_10 = run(10) + qps_20 = run(20) + + # Sanity: throughput is measurable (no zeros). Do not assert strict scaling + # due to platform/GIL variability. + assert qps_1 > 0 + assert qps_5 > 0 + assert qps_10 > 0 + assert qps_20 > 0 + + def test_registry_store_concurrent_list_projects(self, registry_store): + """RegistryStore should support concurrent read access across threads.""" + results = [] + errors = [] + lock = threading.Lock() + + def worker(): + try: + projects = registry_store.list_projects() + with lock: + results.append(len(projects)) + except Exception as exc: + with lock: + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + assert results == [1] * 10 + + def test_dir_index_store_concurrent_list_files(self, dir_index_store): + """DirIndexStore should support concurrent read listing via its internal lock.""" + results = [] + errors = [] + lock = threading.Lock() + + def worker(): + try: + files = dir_index_store.list_files() + with lock: + results.append(len(files)) + except Exception as exc: + with lock: + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + assert results == [10] * 10
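
Note on the pool internals these tests exercise: the pool assertions reach into
SQLiteStore's private surface (_pool, MAX_POOL_SIZE, _get_connection,
_cleanup_stale_connections). The sketch below is a minimal, illustrative
thread-local connection pool with that surface, assuming a sqlite3 backend; the
attribute names are taken from the tests and the implementation is not the
actual codexlens SQLiteStore.

# Illustrative sketch only: a thread-local sqlite3 connection pool with the
# surface the tests above poke at. The real codexlens SQLiteStore may differ.
import sqlite3
import threading


class ThreadLocalPoolSketch:
    MAX_POOL_SIZE = 16  # assumed cap; the tests only read this attribute

    def __init__(self, db_path):
        self.db_path = str(db_path)
        self._pool = {}  # thread ident -> sqlite3.Connection
        self._pool_lock = threading.Lock()

    def _get_connection(self):
        # One connection per thread, created lazily and reused on later calls
        # from the same thread (matches test_connection_reuse_within_thread).
        tid = threading.get_ident()
        with self._pool_lock:
            conn = self._pool.get(tid)
            if conn is None:
                # check_same_thread=False so cleanup can close connections
                # left behind by threads that have already exited.
                conn = sqlite3.connect(self.db_path, check_same_thread=False)
                self._pool[tid] = conn
            return conn

    def _cleanup_stale_connections(self):
        # Drop pool entries whose owning thread is no longer alive
        # (matches test_pool_cleanup_removes_dead_thread_connections).
        alive = {t.ident for t in threading.enumerate()}
        with self._pool_lock:
            for tid in list(self._pool):
                if tid not in alive:
                    self._pool.pop(tid).close()

Coupling the tests to private attributes keeps the assertions precise at the
cost of breaking if the pooling strategy changes; for the same reason the
throughput test only checks that measurements are non-zero rather than
asserting strict scaling, which the GIL and platform scheduling make unstable.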