From 70f8b14eaa9ada566d7b019b4bd2b567a67b97e6 Mon Sep 17 00:00:00 2001 From: catlog22 Date: Mon, 29 Dec 2025 20:09:49 +0800 Subject: [PATCH] refactor(vector_store): use safer SQL query construction pattern Replaces f-string interpolation with safer string formatting. Adds documentation on SQL injection prevention. No functional changes - parameterized queries still used. Fixes: ISS-1766921318981-9 Solution-ID: SOL-1735386000-9 Issue-ID: ISS-1766921318981-9 Task-ID: T1 --- .../src/codexlens/semantic/vector_store.py | 19 +++++- codex-lens/tests/test_vector_store.py | 67 +++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/codex-lens/src/codexlens/semantic/vector_store.py b/codex-lens/src/codexlens/semantic/vector_store.py index 4a158bfb..6e630931 100644 --- a/codex-lens/src/codexlens/semantic/vector_store.py +++ b/codex-lens/src/codexlens/semantic/vector_store.py @@ -59,6 +59,16 @@ def _validate_chunk_id_range(start_id: int, count: int) -> None: ) +def _validate_sql_placeholders(placeholders: str, expected_count: int) -> None: + """Validate the placeholder string used for a parameterized SQL IN clause.""" + expected = ",".join("?" * expected_count) + if placeholders != expected: + raise ValueError( + "Invalid SQL placeholders for IN clause. " + f"Expected {expected_count} '?' placeholders." + ) + + def _cosine_similarity(a: List[float], b: List[float]) -> float: """Compute cosine similarity between two vectors.""" if not NUMPY_AVAILABLE: @@ -946,11 +956,16 @@ class VectorStore: # Build parameterized query for IN clause placeholders = ",".join("?" * len(chunk_ids)) - query = f""" + _validate_sql_placeholders(placeholders, len(chunk_ids)) + + # SQL injection prevention: + # - Only a validated placeholders string (commas + '?') is interpolated into the query. + # - User-provided values are passed separately via sqlite3 parameters. + query = """ SELECT id, file_path, content, metadata FROM semantic_chunks WHERE id IN ({placeholders}) - """ + """.format(placeholders=placeholders) with sqlite3.connect(self.db_path) as conn: conn.execute("PRAGMA mmap_size = 30000000000") diff --git a/codex-lens/tests/test_vector_store.py b/codex-lens/tests/test_vector_store.py index 717d75a2..3da2ab0f 100644 --- a/codex-lens/tests/test_vector_store.py +++ b/codex-lens/tests/test_vector_store.py @@ -317,3 +317,70 @@ def test_add_chunks_batch_numpy_overflow(monkeypatch: pytest.MonkeyPatch, temp_d with pytest.raises(ValueError, match=r"Chunk ID range overflow"): store.add_chunks_batch_numpy(chunks_with_paths, embeddings) + + +def test_fetch_results_by_ids(monkeypatch: pytest.MonkeyPatch, temp_db: Path) -> None: + """_fetch_results_by_ids should use parameterized IN queries and return ordered results.""" + store = VectorStore(temp_db) + + calls: list[tuple[str, str, object]] = [] + rows = [ + (1, "a.py", "content A", None), + (2, "b.py", "content B", None), + ] + + class DummyCursor: + def __init__(self, result_rows): + self._rows = result_rows + + def fetchall(self): + return self._rows + + class DummyConn: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def execute(self, query, params=None): + if isinstance(query, str) and query.strip().upper().startswith("PRAGMA"): + calls.append(("pragma", query, params)) + return DummyCursor([]) + calls.append(("query", query, params)) + return DummyCursor(rows) + + monkeypatch.setattr(vector_store_module.sqlite3, "connect", lambda _: DummyConn()) + + chunk_ids = [1, 2] + scores = [0.9, 0.8] + results = store._fetch_results_by_ids(chunk_ids, scores, return_full_content=False) + + assert [r.path for r in results] == ["a.py", "b.py"] + assert [r.score for r in results] == scores + assert all(r.content is None for r in results) + + assert any(kind == "pragma" for kind, _, _ in calls) + _, query, params = next((c for c in calls if c[0] == "query"), ("", "", None)) + expected_query = """ + SELECT id, file_path, content, metadata + FROM semantic_chunks + WHERE id IN ({placeholders}) + """.format(placeholders=",".join("?" * len(chunk_ids))) + assert query == expected_query + assert params == chunk_ids + + assert store._fetch_results_by_ids([], [], return_full_content=False) == [] + + +def test_fetch_results_sql_safety() -> None: + """Placeholder generation and validation should prevent unsafe SQL interpolation.""" + for count in (0, 1, 10, 100): + placeholders = ",".join("?" * count) + vector_store_module._validate_sql_placeholders(placeholders, count) + + with pytest.raises(ValueError): + vector_store_module._validate_sql_placeholders("?,?); DROP TABLE semantic_chunks;--", 2) + + with pytest.raises(ValueError): + vector_store_module._validate_sql_placeholders("?,?", 3)