Optimize SQLite FTS storage and pooling

This commit is contained in:
catlog22
2025-12-12 17:40:03 +08:00
parent 6a39f7e69d
commit 92d2085b64
4 changed files with 489 additions and 158 deletions

View File

@@ -137,6 +137,7 @@ def init(
languages = _parse_languages(language)
base_path = path.expanduser().resolve()
store: SQLiteStore | None = None
try:
# Determine database location
if use_global:
@@ -197,6 +198,9 @@ def init(
print_json(success=False, error=str(exc))
else:
raise typer.Exit(code=1)
finally:
if store is not None:
store.close()
@app.command()
@@ -214,6 +218,7 @@ def search(
"""
_configure_logging(verbose)
store: SQLiteStore | None = None
try:
store, db_path = _get_store_for_path(Path.cwd(), use_global)
store.initialize()
@@ -229,6 +234,9 @@ def search(
else:
console.print(f"[red]Search failed:[/red] {exc}")
raise typer.Exit(code=1)
finally:
if store is not None:
store.close()
@app.command()
@@ -252,6 +260,7 @@ def symbol(
"""
_configure_logging(verbose)
store: SQLiteStore | None = None
try:
store, db_path = _get_store_for_path(Path.cwd(), use_global)
store.initialize()
@@ -267,6 +276,9 @@ def symbol(
else:
console.print(f"[red]Symbol lookup failed:[/red] {exc}")
raise typer.Exit(code=1)
finally:
if store is not None:
store.close()
@app.command()
@@ -316,6 +328,7 @@ def status(
"""
_configure_logging(verbose)
store: SQLiteStore | None = None
try:
store, db_path = _get_store_for_path(Path.cwd(), use_global)
store.initialize()
@@ -330,6 +343,9 @@ def status(
else:
console.print(f"[red]Status failed:[/red] {exc}")
raise typer.Exit(code=1)
finally:
if store is not None:
store.close()
@app.command()
@@ -351,6 +367,7 @@ def update(
config = Config()
factory = ParserFactory(config)
store: SQLiteStore | None = None
try:
store, db_path = _get_store_for_path(Path.cwd(), use_global)
store.initialize()
@@ -427,6 +444,9 @@ def update(
else:
console.print(f"[red]Update failed:[/red] {exc}")
raise typer.Exit(code=1)
finally:
if store is not None:
store.close()
@app.command()

View File

@@ -14,201 +14,256 @@ from codexlens.errors import StorageError
class SQLiteStore:
"""SQLiteStore providing FTS5 search and symbol lookup."""
"""SQLiteStore providing FTS5 search and symbol lookup.
Implements thread-local connection pooling for improved performance.
"""
def __init__(self, db_path: str | Path) -> None:
    """Bind the store to *db_path*; no connection is opened yet."""
    self.db_path = Path(db_path)
    # Coarse re-entrant lock serializing public store operations.
    self._lock = threading.RLock()
    # Thread-local mirror of the pooled connection (fast path in _get_connection).
    self._local = threading.local()
    # Guards _pool and _pool_generation.
    self._pool_lock = threading.Lock()
    # Map of thread ident -> open sqlite3 connection.
    self._pool: Dict[int, sqlite3.Connection] = {}
    # Bumped by close() so stale thread-local connections are re-created lazily.
    self._pool_generation = 0
def _get_connection(self) -> sqlite3.Connection:
    """Get or create a thread-local database connection.

    Connections are cached per thread ident in ``self._pool`` and mirrored
    in ``self._local``.  ``self._pool_generation`` invalidates the cached
    thread-local reference after ``close()`` has bumped the counter.
    """
    thread_id = threading.get_ident()
    # Fast path: reuse the thread-local connection if it still belongs to
    # the current pool generation (i.e. close() has not run since).
    if getattr(self._local, "generation", None) == self._pool_generation:
        conn = getattr(self._local, "conn", None)
        if conn is not None:
            return conn
    # Slow path: look up (or create) this thread's pooled connection.
    with self._pool_lock:
        conn = self._pool.get(thread_id)
        if conn is None:
            conn = sqlite3.connect(self.db_path, check_same_thread=False)
            conn.row_factory = sqlite3.Row
            # WAL allows concurrent readers with one writer; NORMAL sync
            # is the usual safe/fast tradeoff under WAL.
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA synchronous=NORMAL")
            conn.execute("PRAGMA foreign_keys=ON")
            self._pool[thread_id] = conn
        self._local.conn = conn
        self._local.generation = self._pool_generation
        return conn
def close(self) -> None:
    """Close all pooled connections.

    Safe to call from any thread; connections opened by other threads are
    closed here and those threads will lazily reconnect on next use.
    """
    with self._lock:
        with self._pool_lock:
            for conn in self._pool.values():
                conn.close()
            self._pool.clear()
            # Invalidate every thread's cached connection: _get_connection
            # compares its stored generation against this counter.
            self._pool_generation += 1
            # Reset this thread's local cache so it does not hand back a
            # just-closed connection.
            if hasattr(self._local, "conn"):
                self._local.conn = None
            if hasattr(self._local, "generation"):
                self._local.generation = self._pool_generation
def __enter__(self) -> SQLiteStore:
    """Initialize the schema on entry so the store is ready for use."""
    self.initialize()
    return self
def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
    """Close pooled connections on exit; exceptions are not suppressed."""
    self.close()
def initialize(self) -> None:
    """Create the database directory, schema and FTS layout if missing.

    Note: this span contained interleaved old/new diff lines (a botched
    merge); this is the reconstructed pooled-connection version.
    """
    with self._lock:
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        conn = self._get_connection()
        self._create_schema(conn)
        # Upgrade/verify files_fts as an external-content FTS5 table.
        self._ensure_fts_external_content(conn)
def add_file(self, indexed_file: IndexedFile, content: str) -> None:
    """Insert or update a file row plus its symbols.

    The FTS index is maintained by triggers on ``files`` (see
    _create_fts_triggers), so no manual files_fts writes happen here.
    This span contained interleaved old/new diff lines; this is the
    reconstructed post-change version.
    """
    with self._lock:
        conn = self._get_connection()
        path = str(Path(indexed_file.path).resolve())
        language = indexed_file.language
        # Record the on-disk mtime when available so incremental updates
        # can skip unchanged files.
        mtime = Path(path).stat().st_mtime if Path(path).exists() else None
        line_count = content.count("\n") + 1
        conn.execute(
            """
            INSERT INTO files(path, language, content, mtime, line_count)
            VALUES(?, ?, ?, ?, ?)
            ON CONFLICT(path) DO UPDATE SET
                language=excluded.language,
                content=excluded.content,
                mtime=excluded.mtime,
                line_count=excluded.line_count
            """,
            (path, language, content, mtime, line_count),
        )
        row = conn.execute("SELECT id FROM files WHERE path=?", (path,)).fetchone()
        if not row:
            raise StorageError(f"Failed to read file id for {path}")
        file_id = int(row["id"])
        # Replace this file's symbols wholesale.
        conn.execute("DELETE FROM symbols WHERE file_id=?", (file_id,))
        if indexed_file.symbols:
            conn.executemany(
                """
                INSERT INTO symbols(file_id, name, kind, start_line, end_line)
                VALUES(?, ?, ?, ?, ?)
                """,
                [
                    (file_id, s.name, s.kind, s.range[0], s.range[1])
                    for s in indexed_file.symbols
                ],
            )
        conn.commit()
def remove_file(self, path: str | Path) -> bool:
    """Remove a file from the index.

    Returns True if the file was removed, False if it didn't exist.
    This span contained interleaved old/new diff lines; this is the
    reconstructed post-change version.
    """
    with self._lock:
        conn = self._get_connection()
        resolved_path = str(Path(path).resolve())
        row = conn.execute(
            "SELECT id FROM files WHERE path=?", (resolved_path,)
        ).fetchone()
        if not row:
            return False
        file_id = int(row["id"])
        # The AFTER DELETE trigger removes the files_fts entry; symbols are
        # presumably removed via FK cascade (foreign_keys=ON is set per
        # connection) — confirm the symbols schema declares ON DELETE CASCADE.
        conn.execute("DELETE FROM files WHERE id=?", (file_id,))
        conn.commit()
        return True
def file_exists(self, path: str | Path) -> bool:
    """Check if a file exists in the index (by resolved absolute path)."""
    with self._lock:
        conn = self._get_connection()
        resolved_path = str(Path(path).resolve())
        row = conn.execute(
            "SELECT 1 FROM files WHERE path=?", (resolved_path,)
        ).fetchone()
        return row is not None
def get_file_mtime(self, path: str | Path) -> float | None:
    """Get the stored mtime for a file, or None if not indexed.

    Fix: the original tested ``row["mtime"]`` for truthiness, so a stored
    mtime of exactly 0.0 was reported as missing; compare against None.
    """
    with self._lock:
        conn = self._get_connection()
        resolved_path = str(Path(path).resolve())
        row = conn.execute(
            "SELECT mtime FROM files WHERE path=?", (resolved_path,)
        ).fetchone()
        return float(row["mtime"]) if row and row["mtime"] is not None else None
def search_fts(self, query: str, *, limit: int = 20, offset: int = 0) -> List[SearchResult]:
    """Full-text search over indexed file contents, best matches first.

    Fix: the snippet ellipsis was written as a double-quoted SQL token
    (``"..."``); SQLite only treats that as a string literal via a legacy
    fallback — use a proper single-quoted literal instead.
    """
    with self._lock:
        conn = self._get_connection()
        try:
            rows = conn.execute(
                """
                SELECT rowid, path, bm25(files_fts) AS rank,
                       snippet(files_fts, 2, '[bold red]', '[/bold red]', '...', 20) AS excerpt
                FROM files_fts
                WHERE files_fts MATCH ?
                ORDER BY rank
                LIMIT ? OFFSET ?
                """,
                (query, limit, offset),
            ).fetchall()
        except sqlite3.DatabaseError as exc:
            raise StorageError(f"FTS search failed: {exc}") from exc
        results: List[SearchResult] = []
        for row in rows:
            # bm25() returns more-negative-is-better; negate so callers see
            # higher-is-better scores, clamped at zero.
            rank = float(row["rank"]) if row["rank"] is not None else 0.0
            score = max(0.0, -rank)
            results.append(
                SearchResult(
                    path=row["path"],
                    score=score,
                    excerpt=row["excerpt"],
                )
            )
        return results
def search_files_only(
    self, query: str, *, limit: int = 20, offset: int = 0
) -> List[str]:
    """Search indexed file contents and return only matching file paths."""
    with self._lock:
        conn = self._get_connection()
        try:
            cursor = conn.execute(
                """
                SELECT path
                FROM files_fts
                WHERE files_fts MATCH ?
                ORDER BY bm25(files_fts)
                LIMIT ? OFFSET ?
                """,
                (query, limit, offset),
            )
            hits = cursor.fetchall()
        except sqlite3.DatabaseError as exc:
            raise StorageError(f"FTS search failed: {exc}") from exc
        # Paths only — callers that need scores/excerpts use search_fts.
        return [hit["path"] for hit in hits]
def search_symbols(
    self, name: str, *, kind: Optional[str] = None, limit: int = 50
) -> List[Symbol]:
    """Substring-match symbols by name, optionally filtered by kind.

    This span contained interleaved old/new diff lines; this is the
    reconstructed pooled-connection version.
    """
    pattern = f"%{name}%"
    with self._lock:
        conn = self._get_connection()
        if kind:
            rows = conn.execute(
                """
                SELECT name, kind, start_line, end_line
                FROM symbols
                WHERE name LIKE ? AND kind=?
                ORDER BY name
                LIMIT ?
                """,
                (pattern, kind, limit),
            ).fetchall()
        else:
            rows = conn.execute(
                """
                SELECT name, kind, start_line, end_line
                FROM symbols
                WHERE name LIKE ?
                ORDER BY name
                LIMIT ?
                """,
                (pattern, limit),
            ).fetchall()
        return [
            Symbol(name=row["name"], kind=row["kind"], range=(row["start_line"], row["end_line"]))
            for row in rows
        ]
def stats(self) -> Dict[str, Any]:
    """Return aggregate index statistics.

    Keys: ``files`` (row count), ``symbols`` (row count), ``languages``
    (language -> file count, descending) and ``db_path``.
    """
    with self._lock:
        conn = self._get_connection()
        file_count = conn.execute("SELECT COUNT(*) AS c FROM files").fetchone()["c"]
        symbol_count = conn.execute("SELECT COUNT(*) AS c FROM symbols").fetchone()["c"]
        lang_rows = conn.execute(
            "SELECT language, COUNT(*) AS c FROM files GROUP BY language ORDER BY c DESC"
        ).fetchall()
        languages = {row["language"]: row["c"] for row in lang_rows}
        return {
            "files": int(file_count),
            "symbols": int(symbol_count),
            "languages": languages,
            "db_path": str(self.db_path),
        }
def _connect(self) -> sqlite3.Connection:
    """Legacy method for backward compatibility.

    Older call sites opened a fresh connection here; now delegates to the
    per-thread pool so all code paths share one connection per thread.
    """
    return self._get_connection()
def _create_schema(self, conn: sqlite3.Connection) -> None:
try:
@@ -224,15 +279,6 @@ class SQLiteStore:
)
"""
)
conn.execute(
"""
CREATE VIRTUAL TABLE IF NOT EXISTS files_fts USING fts5(
path UNINDEXED,
language UNINDEXED,
content
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS symbols (
@@ -247,6 +293,110 @@ class SQLiteStore:
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_symbols_name ON symbols(name)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_symbols_kind ON symbols(kind)")
conn.commit()
except sqlite3.DatabaseError as exc:
raise StorageError(f"Failed to initialize database schema: {exc}") from exc
def _ensure_fts_external_content(self, conn: sqlite3.Connection) -> None:
    """Ensure files_fts is an FTS5 external-content table (no content duplication)."""
    try:
        ddl_row = conn.execute(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name='files_fts'"
        ).fetchone()
        ddl = str(ddl_row["sql"]) if ddl_row and ddl_row["sql"] else None
        # No FTS table yet: create the external-content layout from scratch.
        if ddl is None:
            self._create_external_fts(conn)
            conn.commit()
            return
        # Already external-content (any quoting style): just make sure the
        # sync triggers exist, then we are done.
        markers = ("content='files'", 'content="files"', "content=files")
        if any(marker in ddl for marker in markers):
            self._create_fts_triggers(conn)
            conn.commit()
            return
        # Legacy layout that duplicates file content: migrate in place.
        self._migrate_fts_to_external(conn)
    except sqlite3.DatabaseError as exc:
        raise StorageError(f"Failed to ensure FTS schema: {exc}") from exc
def _create_external_fts(self, conn: sqlite3.Connection) -> None:
    """Create files_fts as an FTS5 external-content table over ``files``.

    With content='files' the FTS index stores no copy of the text; rows
    are resolved against ``files`` via content_rowid='id'.
    """
    conn.execute(
        """
        CREATE VIRTUAL TABLE files_fts USING fts5(
            path UNINDEXED,
            language UNINDEXED,
            content,
            content='files',
            content_rowid='id'
        )
        """
    )
    # Triggers keep the FTS index in sync with the files table.
    self._create_fts_triggers(conn)
def _create_fts_triggers(self, conn: sqlite3.Connection) -> None:
    """Install triggers that mirror files-table changes into files_fts.

    External-content FTS5 tables are not updated automatically; the
    'delete' command rows below are the documented way to remove index
    entries for an external-content table.
    """
    # INSERT on files -> index the new row.
    conn.execute(
        """
        CREATE TRIGGER IF NOT EXISTS files_ai AFTER INSERT ON files BEGIN
            INSERT INTO files_fts(rowid, path, language, content)
            VALUES(new.id, new.path, new.language, new.content);
        END
        """
    )
    # DELETE on files -> issue an FTS5 'delete' for the old row.
    conn.execute(
        """
        CREATE TRIGGER IF NOT EXISTS files_ad AFTER DELETE ON files BEGIN
            INSERT INTO files_fts(files_fts, rowid, path, language, content)
            VALUES('delete', old.id, old.path, old.language, old.content);
        END
        """
    )
    # UPDATE on files -> delete the old index entry, then add the new one.
    conn.execute(
        """
        CREATE TRIGGER IF NOT EXISTS files_au AFTER UPDATE ON files BEGIN
            INSERT INTO files_fts(files_fts, rowid, path, language, content)
            VALUES('delete', old.id, old.path, old.language, old.content);
            INSERT INTO files_fts(rowid, path, language, content)
            VALUES(new.id, new.path, new.language, new.content);
        END
        """
    )
def _migrate_fts_to_external(self, conn: sqlite3.Connection) -> None:
    """Migrate legacy files_fts (with duplicated content) to external content.

    Strategy: drop old triggers, rename the legacy table aside, create the
    external-content table, rebuild its index from ``files``, then drop the
    legacy copy.  On failure, a best-effort rollback restores the legacy
    table before re-raising.
    """
    try:
        conn.execute("BEGIN")
        conn.execute("DROP TRIGGER IF EXISTS files_ai")
        conn.execute("DROP TRIGGER IF EXISTS files_ad")
        conn.execute("DROP TRIGGER IF EXISTS files_au")
        conn.execute("ALTER TABLE files_fts RENAME TO files_fts_legacy")
        self._create_external_fts(conn)
        # FTS5 'rebuild' repopulates the index from the content table.
        conn.execute("INSERT INTO files_fts(files_fts) VALUES('rebuild')")
        conn.execute("DROP TABLE files_fts_legacy")
        conn.commit()
    except sqlite3.DatabaseError:
        # Best-effort restore: each step may itself fail (e.g. the rename
        # never happened), so every cleanup is individually guarded.
        try:
            conn.rollback()
        except Exception:
            pass
        try:
            conn.execute("DROP TABLE IF EXISTS files_fts")
        except Exception:
            pass
        try:
            conn.execute("ALTER TABLE files_fts_legacy RENAME TO files_fts")
            conn.commit()
        except Exception:
            pass
        raise
    # Reclaim the space freed by dropping the duplicated content; VACUUM
    # is optional, so failures are ignored.
    try:
        conn.execute("VACUUM")
    except sqlite3.DatabaseError:
        pass

View File

@@ -0,0 +1 @@
"""CodexLens test suite."""

View File

@@ -0,0 +1,160 @@
"""Tests for CodexLens storage."""
import sqlite3
import pytest
import tempfile
from pathlib import Path
from codexlens.storage.sqlite_store import SQLiteStore
from codexlens.entities import IndexedFile, Symbol
@pytest.fixture
def temp_db():
    """Yield an initialized SQLiteStore backed by a throwaway database file."""
    with tempfile.TemporaryDirectory() as workdir:
        store = SQLiteStore(Path(workdir) / "test.db")
        store.initialize()
        yield store
        # Teardown: release pooled connections before the directory vanishes.
        store.close()
class TestSQLiteStore:
    """Tests for SQLiteStore."""

    def test_initialize(self, temp_db):
        """Test database initialization."""
        # A fresh store reports an empty index.
        stats = temp_db.stats()
        assert stats["files"] == 0
        assert stats["symbols"] == 0

    def test_fts_uses_external_content(self, temp_db):
        """FTS should be configured as external-content to avoid duplication."""
        conn = temp_db._get_connection()
        # Inspect the DDL recorded in sqlite_master for the FTS table.
        row = conn.execute(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name='files_fts'"
        ).fetchone()
        assert row is not None
        assert "content='files'" in row["sql"] or "content=files" in row["sql"]

    def test_add_file(self, temp_db):
        """Test adding a file to the index."""
        indexed_file = IndexedFile(
            path="/test/file.py",
            language="python",
            symbols=[
                Symbol(name="hello", kind="function", range=(1, 1)),
            ],
            chunks=[],
        )
        temp_db.add_file(indexed_file, "def hello():\n    pass")
        stats = temp_db.stats()
        assert stats["files"] == 1
        assert stats["symbols"] == 1

    def test_remove_file(self, temp_db):
        """Test removing a file from the index."""
        indexed_file = IndexedFile(
            path="/test/file.py",
            language="python",
            symbols=[],
            chunks=[],
        )
        temp_db.add_file(indexed_file, "# test")
        assert temp_db.file_exists("/test/file.py")
        # remove_file returns True on success, and the file is gone after.
        assert temp_db.remove_file("/test/file.py")
        assert not temp_db.file_exists("/test/file.py")

    def test_search_fts(self, temp_db):
        """Test FTS search."""
        indexed_file = IndexedFile(
            path="/test/file.py",
            language="python",
            symbols=[],
            chunks=[],
        )
        temp_db.add_file(indexed_file, "def hello_world():\n    print('hello')")
        results = temp_db.search_fts("hello")
        assert len(results) == 1
        # Stored paths are resolved to absolute form, so compare resolved.
        assert str(Path("/test/file.py").resolve()) == results[0].path

    def test_search_symbols(self, temp_db):
        """Test symbol search."""
        indexed_file = IndexedFile(
            path="/test/file.py",
            language="python",
            symbols=[
                Symbol(name="hello_world", kind="function", range=(1, 1)),
                Symbol(name="goodbye", kind="function", range=(3, 3)),
            ],
            chunks=[],
        )
        temp_db.add_file(indexed_file, "def hello_world():\n    pass\ndef goodbye():\n    pass")
        # Substring match: "hello" matches hello_world but not goodbye.
        results = temp_db.search_symbols("hello")
        assert len(results) == 1
        assert results[0].name == "hello_world"

    def test_connection_reuse(self, temp_db):
        """Test that connections are reused within the same thread."""
        conn1 = temp_db._get_connection()
        conn2 = temp_db._get_connection()
        assert conn1 is conn2

    def test_migrate_legacy_fts_to_external(self, tmp_path):
        """Existing databases should be migrated to external-content FTS."""
        db_path = tmp_path / "legacy.db"
        # Hand-build a legacy database: plain FTS table with duplicated content.
        with sqlite3.connect(db_path) as conn:
            conn.execute(
                """
                CREATE TABLE files (
                    id INTEGER PRIMARY KEY,
                    path TEXT UNIQUE NOT NULL,
                    language TEXT NOT NULL,
                    content TEXT NOT NULL,
                    mtime REAL,
                    line_count INTEGER
                )
                """
            )
            conn.execute(
                """
                CREATE VIRTUAL TABLE files_fts USING fts5(
                    path UNINDEXED,
                    language UNINDEXED,
                    content
                )
                """
            )
            conn.execute(
                """
                INSERT INTO files(path, language, content, mtime, line_count)
                VALUES(?, ?, ?, ?, ?)
                """,
                (str(Path("/test/file.py").resolve()), "python", "def hello():\n    pass", None, 2),
            )
            file_id = conn.execute("SELECT id FROM files").fetchone()[0]
            conn.execute(
                "INSERT INTO files_fts(rowid, path, language, content) VALUES(?, ?, ?, ?)",
                (file_id, str(Path("/test/file.py").resolve()), "python", "def hello():\n    pass"),
            )
            conn.commit()
        # Opening the store should trigger the in-place migration.
        store = SQLiteStore(db_path)
        store.initialize()
        try:
            # Search still works after migration...
            results = store.search_fts("hello")
            assert len(results) == 1
            # ...and the FTS table is now external-content.
            conn = store._get_connection()
            row = conn.execute(
                "SELECT sql FROM sqlite_master WHERE type='table' AND name='files_fts'"
            ).fetchone()
            assert row is not None
            assert "content='files'" in row["sql"] or "content=files" in row["sql"]
        finally:
            store.close()