feat(codexlens): add CodexLens code indexing platform with incremental updates

- Add CodexLens Python package with SQLite FTS5 search and tree-sitter parsing
- Implement workspace-local index storage (.codexlens/ directory)
- Add incremental update CLI command for efficient file-level index refresh
- Integrate CodexLens with CCW tools (codex_lens action: update)
- Add CodexLens Auto-Sync hook template for automatic index updates on file changes
- Add CodexLens status card in CCW Dashboard CLI Manager with install/init buttons
- Add server APIs: /api/codexlens/status, /api/codexlens/bootstrap, /api/codexlens/init

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Author: catlog22
Date: 2025-12-12 15:02:32 +08:00
Parent: b74a90b416
Commit: a393601ec5
31 changed files with 2718 additions and 27 deletions
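
For orientation, a minimal sketch of the indexing flow the new package exposes, mirroring the init and search commands in codexlens/cli/commands.py below (the sample file path is hypothetical):

from pathlib import Path
from codexlens.config import Config
from codexlens.parsers.factory import ParserFactory
from codexlens.storage.sqlite_store import SQLiteStore

config = Config()  # global defaults; workspace-local use goes through WorkspaceConfig
factory = ParserFactory(config)
store = SQLiteStore(Path(".codexlens/index.db"))  # workspace-local database location
store.initialize()

source = Path("src/example.py")  # hypothetical file to index
text = source.read_text(encoding="utf-8", errors="ignore")
language = config.language_for_path(source) or "unknown"
indexed = factory.get_parser(language).parse(text, source)
store.add_file(indexed, text)

for result in store.search_fts("example", limit=5):
    print(result.path, result.score)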

codex-lens/pyproject.toml

@@ -0,0 +1,34 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "codex-lens"
version = "0.1.0"
description = "CodexLens multi-modal code analysis platform"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "MIT" }
authors = [
{ name = "CodexLens contributors" }
]
dependencies = [
"typer>=0.9",
"rich>=13",
"pydantic>=2.0",
"tree-sitter>=0.20",
"pathspec>=0.11",
]
[project.optional-dependencies]
semantic = [
"numpy>=1.24",
"sentence-transformers>=2.2",
]
[project.urls]
Homepage = "https://github.com/openai/codex-lens"
[tool.setuptools]
package-dir = { "" = "src" }

codex-lens/src/codex_lens.egg-info/PKG-INFO

@@ -0,0 +1,17 @@
Metadata-Version: 2.4
Name: codex-lens
Version: 0.1.0
Summary: CodexLens multi-modal code analysis platform
Author: CodexLens contributors
License: MIT
Project-URL: Homepage, https://github.com/openai/codex-lens
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: typer>=0.9
Requires-Dist: rich>=13
Requires-Dist: pydantic>=2.0
Requires-Dist: tree-sitter>=0.20
Requires-Dist: pathspec>=0.11
Provides-Extra: semantic
Requires-Dist: numpy>=1.24; extra == "semantic"
Requires-Dist: sentence-transformers>=2.2; extra == "semantic"

codex-lens/src/codex_lens.egg-info/SOURCES.txt

@@ -0,0 +1,23 @@
pyproject.toml
src/codex_lens.egg-info/PKG-INFO
src/codex_lens.egg-info/SOURCES.txt
src/codex_lens.egg-info/dependency_links.txt
src/codex_lens.egg-info/requires.txt
src/codex_lens.egg-info/top_level.txt
src/codexlens/__init__.py
src/codexlens/__main__.py
src/codexlens/config.py
src/codexlens/entities.py
src/codexlens/errors.py
src/codexlens/cli/__init__.py
src/codexlens/cli/commands.py
src/codexlens/cli/output.py
src/codexlens/parsers/__init__.py
src/codexlens/parsers/factory.py
src/codexlens/semantic/__init__.py
src/codexlens/semantic/chunker.py
src/codexlens/semantic/embedder.py
src/codexlens/semantic/vector_store.py
src/codexlens/storage/__init__.py
src/codexlens/storage/file_cache.py
src/codexlens/storage/sqlite_store.py

codex-lens/src/codex_lens.egg-info/dependency_links.txt

@@ -0,0 +1 @@

codex-lens/src/codex_lens.egg-info/requires.txt

@@ -0,0 +1,9 @@
typer>=0.9
rich>=13
pydantic>=2.0
tree-sitter>=0.20
pathspec>=0.11
[semantic]
numpy>=1.24
sentence-transformers>=2.2

codex-lens/src/codex_lens.egg-info/top_level.txt

@@ -0,0 +1 @@
codexlens

codex-lens/src/codexlens/__init__.py

@@ -0,0 +1,28 @@
"""CodexLens package."""
from __future__ import annotations
from . import config, entities, errors
from .config import Config
from .entities import IndexedFile, SearchResult, SemanticChunk, Symbol
from .errors import CodexLensError, ConfigError, ParseError, SearchError, StorageError
__version__ = "0.1.0"
__all__ = [
"__version__",
"config",
"entities",
"errors",
"Config",
"IndexedFile",
"SearchResult",
"SemanticChunk",
"Symbol",
"CodexLensError",
"ConfigError",
"ParseError",
"StorageError",
"SearchError",
]

codex-lens/src/codexlens/__main__.py

@@ -0,0 +1,14 @@
"""Module entrypoint for `python -m codexlens`."""
from __future__ import annotations
from codexlens.cli import app
def main() -> None:
app()
if __name__ == "__main__":
main()

codex-lens/src/codexlens/cli/__init__.py

@@ -0,0 +1,8 @@
"""CLI package for CodexLens."""
from __future__ import annotations
from .commands import app
__all__ = ["app"]

codex-lens/src/codexlens/cli/commands.py

@@ -0,0 +1,475 @@
"""Typer commands for CodexLens."""
from __future__ import annotations
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
import typer
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from codexlens.config import Config, WorkspaceConfig, find_workspace_root
from codexlens.entities import IndexedFile, SearchResult, Symbol
from codexlens.errors import CodexLensError
from codexlens.parsers.factory import ParserFactory
from codexlens.storage.sqlite_store import SQLiteStore
from .output import (
console,
print_json,
render_file_inspect,
render_search_results,
render_status,
render_symbols,
)
app = typer.Typer(help="CodexLens CLI — local code indexing and search.")
def _configure_logging(verbose: bool) -> None:
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(level=level, format="%(levelname)s %(message)s")
def _parse_languages(raw: Optional[List[str]]) -> Optional[List[str]]:
if not raw:
return None
langs: List[str] = []
for item in raw:
for part in item.split(","):
part = part.strip()
if part:
langs.append(part)
return langs or None
def _load_gitignore(base_path: Path) -> List[str]:
gitignore = base_path / ".gitignore"
if not gitignore.exists():
return []
try:
return [line.strip() for line in gitignore.read_text(encoding="utf-8").splitlines() if line.strip()]
except OSError:
return []
def _iter_source_files(
base_path: Path,
config: Config,
languages: Optional[List[str]] = None,
) -> Iterable[Path]:
ignore_dirs = {".git", ".venv", "venv", "node_modules", "__pycache__", ".codexlens"}
ignore_patterns = _load_gitignore(base_path)
pathspec = None
if ignore_patterns:
try:
from pathspec import PathSpec
from pathspec.patterns.gitwildmatch import GitWildMatchPattern
pathspec = PathSpec.from_lines(GitWildMatchPattern, ignore_patterns)
except Exception:
pathspec = None
for root, dirs, files in os.walk(base_path):
dirs[:] = [d for d in dirs if d not in ignore_dirs and not d.startswith(".")]
root_path = Path(root)
for file in files:
if file.startswith("."):
continue
full_path = root_path / file
rel = full_path.relative_to(base_path)
if pathspec and pathspec.match_file(str(rel)):
continue
language_id = config.language_for_path(full_path)
if not language_id:
continue
if languages and language_id not in languages:
continue
yield full_path
def _get_store_for_path(path: Path, use_global: bool = False) -> tuple[SQLiteStore, Path]:
"""Get SQLiteStore for a path, using workspace-local or global database.
Returns (store, db_path) tuple.
"""
if use_global:
config = Config()
config.ensure_runtime_dirs()
return SQLiteStore(config.db_path), config.db_path
# Try to find existing workspace
workspace = WorkspaceConfig.from_path(path)
if workspace:
return SQLiteStore(workspace.db_path), workspace.db_path
# Fall back to global config
config = Config()
config.ensure_runtime_dirs()
return SQLiteStore(config.db_path), config.db_path
@app.command()
def init(
path: Path = typer.Argument(Path("."), exists=True, file_okay=False, dir_okay=True, help="Project root to index."),
language: Optional[List[str]] = typer.Option(
None,
"--language",
"-l",
help="Limit indexing to specific languages (repeat or comma-separated).",
),
use_global: bool = typer.Option(False, "--global", "-g", help="Use global database instead of workspace-local."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Initialize or rebuild the index for a directory.
Creates a .codexlens/ directory in the project root to store index data.
Use --global to use the global database at ~/.codexlens/ instead.
"""
_configure_logging(verbose)
config = Config()
factory = ParserFactory(config)
languages = _parse_languages(language)
base_path = path.expanduser().resolve()
try:
# Determine database location
if use_global:
config.ensure_runtime_dirs()
db_path = config.db_path
workspace_root = None
else:
# Create workspace-local .codexlens directory
workspace = WorkspaceConfig.create_at(base_path)
db_path = workspace.db_path
workspace_root = workspace.workspace_root
store = SQLiteStore(db_path)
store.initialize()
files = list(_iter_source_files(base_path, config, languages))
indexed_count = 0
symbol_count = 0
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("{task.completed}/{task.total} files"),
TimeElapsedColumn(),
console=console,
) as progress:
task = progress.add_task("Indexing", total=len(files))
for file_path in files:
progress.advance(task)
try:
text = file_path.read_text(encoding="utf-8", errors="ignore")
lang_id = config.language_for_path(file_path) or "unknown"
parser = factory.get_parser(lang_id)
indexed_file = parser.parse(text, file_path)
store.add_file(indexed_file, text)
indexed_count += 1
symbol_count += len(indexed_file.symbols)
except Exception as exc:
logging.debug("Failed to index %s: %s", file_path, exc)
continue
result = {
"path": str(base_path),
"files_indexed": indexed_count,
"symbols_indexed": symbol_count,
"languages": languages or sorted(config.supported_languages.keys()),
"db_path": str(db_path),
"workspace_root": str(workspace_root) if workspace_root else None,
}
if json_mode:
print_json(success=True, result=result)
else:
render_status(result)
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))
else:
console.print(f"[red]Init failed:[/red] {exc}")
raise typer.Exit(code=1)
@app.command()
def search(
query: str = typer.Argument(..., help="FTS query to run."),
limit: int = typer.Option(20, "--limit", "-n", min=1, max=500, help="Max results."),
use_global: bool = typer.Option(False, "--global", "-g", help="Use global database instead of workspace-local."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Search indexed file contents using SQLite FTS5.
Searches the workspace-local .codexlens/index.db by default.
Use --global to search the global database at ~/.codexlens/.
"""
_configure_logging(verbose)
try:
store, db_path = _get_store_for_path(Path.cwd(), use_global)
store.initialize()
results = store.search_fts(query, limit=limit)
payload = {"query": query, "count": len(results), "results": results}
if json_mode:
print_json(success=True, result=payload)
else:
render_search_results(results)
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))
else:
console.print(f"[red]Search failed:[/red] {exc}")
raise typer.Exit(code=1)
@app.command()
def symbol(
name: str = typer.Argument(..., help="Symbol name to look up."),
kind: Optional[str] = typer.Option(
None,
"--kind",
"-k",
help="Filter by kind (function|class|method).",
),
limit: int = typer.Option(50, "--limit", "-n", min=1, max=500, help="Max symbols."),
use_global: bool = typer.Option(False, "--global", "-g", help="Use global database instead of workspace-local."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Look up symbols by name and optional kind.
Searches the workspace-local .codexlens/index.db by default.
Use --global to search the global database at ~/.codexlens/.
"""
_configure_logging(verbose)
try:
store, db_path = _get_store_for_path(Path.cwd(), use_global)
store.initialize()
syms = store.search_symbols(name, kind=kind, limit=limit)
payload = {"name": name, "kind": kind, "count": len(syms), "symbols": syms}
if json_mode:
print_json(success=True, result=payload)
else:
render_symbols(syms)
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))
else:
console.print(f"[red]Symbol lookup failed:[/red] {exc}")
raise typer.Exit(code=1)
@app.command()
def inspect(
file: Path = typer.Argument(..., exists=True, dir_okay=False, help="File to analyze."),
symbols: bool = typer.Option(True, "--symbols/--no-symbols", help="Show discovered symbols."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Analyze a single file and display symbols."""
_configure_logging(verbose)
config = Config()
factory = ParserFactory(config)
file_path = file.expanduser().resolve()
try:
text = file_path.read_text(encoding="utf-8", errors="ignore")
language_id = config.language_for_path(file_path) or "unknown"
parser = factory.get_parser(language_id)
indexed = parser.parse(text, file_path)
payload = {"file": indexed, "content_lines": len(text.splitlines())}
if json_mode:
print_json(success=True, result=payload)
else:
if symbols:
render_file_inspect(indexed.path, indexed.language, indexed.symbols)
else:
render_status({"file": indexed.path, "language": indexed.language})
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))
else:
console.print(f"[red]Inspect failed:[/red] {exc}")
raise typer.Exit(code=1)
@app.command()
def status(
use_global: bool = typer.Option(False, "--global", "-g", help="Use global database instead of workspace-local."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Show index statistics.
Shows statistics for the workspace-local .codexlens/index.db by default.
Use --global to show the global database at ~/.codexlens/.
"""
_configure_logging(verbose)
try:
store, db_path = _get_store_for_path(Path.cwd(), use_global)
store.initialize()
stats = store.stats()
if json_mode:
print_json(success=True, result=stats)
else:
render_status(stats)
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))
else:
console.print(f"[red]Status failed:[/red] {exc}")
raise typer.Exit(code=1)
@app.command()
def update(
files: List[str] = typer.Argument(..., help="File paths to update in the index."),
use_global: bool = typer.Option(False, "--global", "-g", help="Use global database instead of workspace-local."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Incrementally update specific files in the index.
Pass one or more file paths to update. Files that no longer exist
will be removed from the index. New or modified files will be re-indexed.
This is much faster than re-running init for large codebases when
only a few files have changed.
"""
_configure_logging(verbose)
config = Config()
factory = ParserFactory(config)
try:
store, db_path = _get_store_for_path(Path.cwd(), use_global)
store.initialize()
updated = 0
removed = 0
skipped = 0
errors = []
for file_str in files:
file_path = Path(file_str).resolve()
# Check if file exists on disk
if not file_path.exists():
# File was deleted - remove from index
if store.remove_file(file_path):
removed += 1
logging.debug("Removed deleted file: %s", file_path)
else:
skipped += 1
logging.debug("File not in index: %s", file_path)
continue
# Check if file is supported
language_id = config.language_for_path(file_path)
if not language_id:
skipped += 1
logging.debug("Unsupported file type: %s", file_path)
continue
# Check if file needs update (compare mtime)
current_mtime = file_path.stat().st_mtime
stored_mtime = store.get_file_mtime(file_path)
if stored_mtime is not None and abs(current_mtime - stored_mtime) < 0.001:
skipped += 1
logging.debug("File unchanged: %s", file_path)
continue
# Re-index the file
try:
text = file_path.read_text(encoding="utf-8", errors="ignore")
parser = factory.get_parser(language_id)
indexed_file = parser.parse(text, file_path)
store.add_file(indexed_file, text)
updated += 1
logging.debug("Updated file: %s", file_path)
except Exception as exc:
errors.append({"file": str(file_path), "error": str(exc)})
logging.debug("Failed to update %s: %s", file_path, exc)
result = {
"updated": updated,
"removed": removed,
"skipped": skipped,
"errors": errors,
"db_path": str(db_path),
}
if json_mode:
print_json(success=True, result=result)
else:
console.print(f"[green]Updated:[/green] {updated} files")
console.print(f"[yellow]Removed:[/yellow] {removed} files")
console.print(f"[dim]Skipped:[/dim] {skipped} files")
if errors:
console.print(f"[red]Errors:[/red] {len(errors)}")
for err in errors[:5]:
console.print(f" - {err['file']}: {err['error']}")
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))
else:
console.print(f"[red]Update failed:[/red] {exc}")
raise typer.Exit(code=1)
@app.command()
def clean(
path: Path = typer.Argument(Path("."), exists=True, file_okay=False, dir_okay=True, help="Project root to clean."),
use_global: bool = typer.Option(False, "--global", "-g", help="Clean global database instead of workspace-local."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Remove CodexLens index data.
Removes the .codexlens/ directory from the project root.
Use --global to clean the global database at ~/.codexlens/.
"""
_configure_logging(verbose)
base_path = path.expanduser().resolve()
try:
if use_global:
config = Config()
import shutil
if config.index_dir.exists():
shutil.rmtree(config.index_dir)
result = {"cleaned": str(config.index_dir), "type": "global"}
else:
workspace = WorkspaceConfig.from_path(base_path)
if workspace and workspace.codexlens_dir.exists():
import shutil
shutil.rmtree(workspace.codexlens_dir)
result = {"cleaned": str(workspace.codexlens_dir), "type": "workspace"}
else:
result = {"cleaned": None, "type": "workspace", "message": "No workspace found"}
if json_mode:
print_json(success=True, result=result)
else:
if result.get("cleaned"):
console.print(f"[green]Cleaned:[/green] {result['cleaned']}")
else:
console.print("[yellow]No workspace index found to clean.[/yellow]")
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))
else:
console.print(f"[red]Clean failed:[/red] {exc}")
raise typer.Exit(code=1)
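
The commands can also be exercised programmatically through typer's test runner; a minimal sketch (the target file path is hypothetical):

from typer.testing import CliRunner
from codexlens.cli import app

runner = CliRunner()
# Incrementally refresh one file and print the JSON payload produced by `update`.
result = runner.invoke(app, ["update", "src/example.py", "--json"])
print(result.output)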

codex-lens/src/codexlens/cli/output.py

@@ -0,0 +1,91 @@
"""Rich and JSON output helpers for CodexLens CLI."""
from __future__ import annotations
import json
from dataclasses import asdict, is_dataclass
from pathlib import Path
from typing import Any, Iterable, Mapping, Sequence
from rich.console import Console
from rich.table import Table
from rich.text import Text
from codexlens.entities import SearchResult, Symbol
console = Console()
def _to_jsonable(value: Any) -> Any:
if value is None:
return None
if hasattr(value, "model_dump"):
return value.model_dump()
if is_dataclass(value):
return asdict(value)
if isinstance(value, Path):
return str(value)
if isinstance(value, Mapping):
return {k: _to_jsonable(v) for k, v in value.items()}
if isinstance(value, (list, tuple, set)):
return [_to_jsonable(v) for v in value]
return value
def print_json(*, success: bool, result: Any = None, error: str | None = None) -> None:
payload: dict[str, Any] = {"success": success}
if success:
payload["result"] = _to_jsonable(result)
else:
payload["error"] = error or "Unknown error"
console.print_json(json.dumps(payload, ensure_ascii=False))
def render_search_results(results: Sequence[SearchResult], *, title: str = "Search Results") -> None:
table = Table(title=title, show_lines=False)
table.add_column("Path", style="cyan", no_wrap=True)
table.add_column("Score", style="magenta", justify="right")
table.add_column("Excerpt", style="white")
for res in results:
excerpt = res.excerpt or ""
table.add_row(res.path, f"{res.score:.3f}", excerpt)
console.print(table)
def render_symbols(symbols: Sequence[Symbol], *, title: str = "Symbols") -> None:
table = Table(title=title)
table.add_column("Name", style="green")
table.add_column("Kind", style="yellow")
table.add_column("Range", style="white", justify="right")
for sym in symbols:
start, end = sym.range
table.add_row(sym.name, sym.kind, f"{start}-{end}")
console.print(table)
def render_status(stats: Mapping[str, Any]) -> None:
table = Table(title="Index Status")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="white")
for key, value in stats.items():
if isinstance(value, Mapping):
value_text = ", ".join(f"{k}:{v}" for k, v in value.items())
elif isinstance(value, (list, tuple)):
value_text = ", ".join(str(v) for v in value)
else:
value_text = str(value)
table.add_row(str(key), value_text)
console.print(table)
def render_file_inspect(path: str, language: str, symbols: Iterable[Symbol]) -> None:
header = Text.assemble(("File: ", "bold"), (path, "cyan"), (" Language: ", "bold"), (language, "green"))
console.print(header)
render_symbols(list(symbols), title="Discovered Symbols")
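
A small sketch of how these helpers compose (the values are illustrative):

from codexlens.cli.output import print_json, render_search_results
from codexlens.entities import SearchResult

hits = [SearchResult(path="src/example.py", score=1.25, excerpt="def example(): ...")]
render_search_results(hits, title="Demo")  # Rich table on stdout
print_json(success=True, result={"query": "example", "count": len(hits), "results": hits})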

codex-lens/src/codexlens/config.py

@@ -0,0 +1,190 @@
"""Configuration system for CodexLens."""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from .errors import ConfigError
# Workspace-local directory name
WORKSPACE_DIR_NAME = ".codexlens"
def _default_global_dir() -> Path:
"""Get global CodexLens data directory."""
env_override = os.getenv("CODEXLENS_DATA_DIR")
if env_override:
return Path(env_override).expanduser().resolve()
return (Path.home() / ".codexlens").resolve()
def find_workspace_root(start_path: Path) -> Optional[Path]:
"""Find the workspace root by looking for .codexlens directory.
Searches from start_path upward to find an existing .codexlens directory.
Returns None if not found.
"""
current = start_path.resolve()
# Search up to filesystem root
while current != current.parent:
workspace_dir = current / WORKSPACE_DIR_NAME
if workspace_dir.is_dir():
return current
current = current.parent
# Check root as well
workspace_dir = current / WORKSPACE_DIR_NAME
if workspace_dir.is_dir():
return current
return None
@dataclass
class Config:
"""Runtime configuration for CodexLens.
- data_dir: Base directory for all persistent CodexLens data.
- venv_path: Optional virtualenv used for language tooling.
- supported_languages: Language IDs and their associated file extensions.
- parsing_rules: Per-language parsing and chunking hints.
"""
data_dir: Path = field(default_factory=_default_global_dir)
venv_path: Path = field(default_factory=lambda: _default_global_dir() / "venv")
supported_languages: Dict[str, Dict[str, Any]] = field(
default_factory=lambda: {
"python": {"extensions": [".py"], "tree_sitter_language": "python"},
"javascript": {"extensions": [".js", ".jsx"], "tree_sitter_language": "javascript"},
"typescript": {"extensions": [".ts", ".tsx"], "tree_sitter_language": "typescript"},
"java": {"extensions": [".java"], "tree_sitter_language": "java"},
"go": {"extensions": [".go"], "tree_sitter_language": "go"},
"zig": {"extensions": [".zig"], "tree_sitter_language": "zig"},
"objective-c": {"extensions": [".m", ".mm"], "tree_sitter_language": "objc"},
}
)
parsing_rules: Dict[str, Dict[str, Any]] = field(
default_factory=lambda: {
"default": {
"max_chunk_chars": 4000,
"max_chunk_lines": 200,
"overlap_lines": 20,
}
}
)
def __post_init__(self) -> None:
try:
self.data_dir = self.data_dir.expanduser().resolve()
self.venv_path = self.venv_path.expanduser().resolve()
self.data_dir.mkdir(parents=True, exist_ok=True)
except Exception as exc:
raise ConfigError(f"Failed to initialize data_dir at {self.data_dir}: {exc}") from exc
@property
def cache_dir(self) -> Path:
"""Directory for transient caches."""
return self.data_dir / "cache"
@property
def index_dir(self) -> Path:
"""Directory where index artifacts are stored."""
return self.data_dir / "index"
@property
def db_path(self) -> Path:
"""Default SQLite index path."""
return self.index_dir / "codexlens.db"
def ensure_runtime_dirs(self) -> None:
"""Create standard runtime directories if missing."""
for directory in (self.cache_dir, self.index_dir):
try:
directory.mkdir(parents=True, exist_ok=True)
except Exception as exc:
raise ConfigError(f"Failed to create directory {directory}: {exc}") from exc
def language_for_path(self, path: str | Path) -> str | None:
"""Infer a supported language ID from a file path."""
extension = Path(path).suffix.lower()
for language_id, spec in self.supported_languages.items():
extensions: List[str] = spec.get("extensions", [])
if extension in extensions:
return language_id
return None
def rules_for_language(self, language_id: str) -> Dict[str, Any]:
"""Get parsing rules for a specific language, falling back to defaults."""
return {**self.parsing_rules.get("default", {}), **self.parsing_rules.get(language_id, {})}
@dataclass
class WorkspaceConfig:
"""Workspace-local configuration for CodexLens.
Stores index data in project/.codexlens/ directory.
"""
workspace_root: Path
def __post_init__(self) -> None:
self.workspace_root = Path(self.workspace_root).resolve()
@property
def codexlens_dir(self) -> Path:
"""The .codexlens directory in workspace root."""
return self.workspace_root / WORKSPACE_DIR_NAME
@property
def db_path(self) -> Path:
"""SQLite index path for this workspace."""
return self.codexlens_dir / "index.db"
@property
def cache_dir(self) -> Path:
"""Cache directory for this workspace."""
return self.codexlens_dir / "cache"
def initialize(self) -> None:
"""Create the .codexlens directory structure."""
try:
self.codexlens_dir.mkdir(parents=True, exist_ok=True)
self.cache_dir.mkdir(parents=True, exist_ok=True)
# Create .gitignore to exclude cache but keep index
gitignore_path = self.codexlens_dir / ".gitignore"
if not gitignore_path.exists():
gitignore_path.write_text(
"# CodexLens workspace data\n"
"cache/\n"
"*.log\n"
)
except Exception as exc:
raise ConfigError(f"Failed to initialize workspace at {self.codexlens_dir}: {exc}") from exc
def exists(self) -> bool:
"""Check if workspace is already initialized."""
return self.codexlens_dir.is_dir() and self.db_path.exists()
@classmethod
def from_path(cls, path: Path) -> Optional["WorkspaceConfig"]:
"""Create WorkspaceConfig from a path by finding workspace root.
Returns None if no workspace found.
"""
root = find_workspace_root(path)
if root is None:
return None
return cls(workspace_root=root)
@classmethod
def create_at(cls, path: Path) -> "WorkspaceConfig":
"""Create a new workspace at the given path."""
config = cls(workspace_root=path)
config.initialize()
return config
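
A sketch of the expected workspace resolution, assuming a project rooted at ./myproject:

from pathlib import Path
from codexlens.config import Config, WorkspaceConfig, find_workspace_root

root = Path("myproject")                      # hypothetical project root
workspace = WorkspaceConfig.from_path(root)   # None until a .codexlens/ directory exists
if workspace is None:
    workspace = WorkspaceConfig.create_at(root)   # creates myproject/.codexlens/

print(workspace.db_path)                      # .../myproject/.codexlens/index.db
print(find_workspace_root(root / "src"))      # walks upward and resolves to myproject

config = Config()                             # global fallback under ~/.codexlens (or CODEXLENS_DATA_DIR)
print(config.language_for_path("app/main.ts"))  # "typescript"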

codex-lens/src/codexlens/entities.py

@@ -0,0 +1,73 @@
"""Pydantic entity models for CodexLens."""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Tuple
from pydantic import BaseModel, Field, field_validator
class Symbol(BaseModel):
"""A code symbol discovered in a file."""
name: str = Field(..., min_length=1)
kind: str = Field(..., min_length=1)
range: Tuple[int, int] = Field(..., description="(start_line, end_line), 1-based inclusive")
@field_validator("range")
@classmethod
def validate_range(cls, value: Tuple[int, int]) -> Tuple[int, int]:
if len(value) != 2:
raise ValueError("range must be a (start_line, end_line) tuple")
start_line, end_line = value
if start_line < 1 or end_line < 1:
raise ValueError("range lines must be >= 1")
if end_line < start_line:
raise ValueError("end_line must be >= start_line")
return value
class SemanticChunk(BaseModel):
"""A semantically meaningful chunk of content, optionally embedded."""
content: str = Field(..., min_length=1)
embedding: Optional[List[float]] = Field(default=None, description="Vector embedding for semantic search")
metadata: Dict[str, Any] = Field(default_factory=dict)
@field_validator("embedding")
@classmethod
def validate_embedding(cls, value: Optional[List[float]]) -> Optional[List[float]]:
if value is None:
return value
if not value:
raise ValueError("embedding cannot be empty when provided")
return value
class IndexedFile(BaseModel):
"""An indexed source file with symbols and optional semantic chunks."""
path: str = Field(..., min_length=1)
language: str = Field(..., min_length=1)
symbols: List[Symbol] = Field(default_factory=list)
chunks: List[SemanticChunk] = Field(default_factory=list)
@field_validator("path", "language")
@classmethod
def strip_and_validate_nonempty(cls, value: str) -> str:
cleaned = value.strip()
if not cleaned:
raise ValueError("value cannot be blank")
return cleaned
class SearchResult(BaseModel):
"""A unified search result for lexical or semantic search."""
path: str = Field(..., min_length=1)
score: float = Field(..., ge=0.0)
excerpt: Optional[str] = None
symbol: Optional[Symbol] = None
chunk: Optional[SemanticChunk] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
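
A minimal sketch of constructing these models; the validators reject blank strings and inverted ranges:

from codexlens.entities import IndexedFile, SearchResult, SemanticChunk, Symbol

sym = Symbol(name="parse", kind="function", range=(10, 42))
chunk = SemanticChunk(content="def parse(): ...", metadata={"strategy": "symbol"})
indexed = IndexedFile(path="src/example.py", language="python", symbols=[sym], chunks=[chunk])
hit = SearchResult(path=indexed.path, score=0.9, excerpt="def parse(): ...", symbol=sym)
print(indexed.model_dump()["symbols"][0]["range"])  # (10, 42)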

codex-lens/src/codexlens/errors.py

@@ -0,0 +1,24 @@
"""CodexLens exception hierarchy."""
from __future__ import annotations
class CodexLensError(Exception):
"""Base class for all CodexLens errors."""
class ConfigError(CodexLensError):
"""Raised when configuration is invalid or cannot be loaded."""
class ParseError(CodexLensError):
"""Raised when parsing or indexing a file fails."""
class StorageError(CodexLensError):
"""Raised when reading/writing index storage fails."""
class SearchError(CodexLensError):
"""Raised when a search operation fails."""

codex-lens/src/codexlens/parsers/__init__.py

@@ -0,0 +1,8 @@
"""Parsers for CodexLens."""
from __future__ import annotations
from .factory import ParserFactory
__all__ = ["ParserFactory"]

codex-lens/src/codexlens/parsers/factory.py

@@ -0,0 +1,154 @@
"""Parser factory for CodexLens.
The project currently ships lightweight regex-based parsers per language.
They can be swapped for tree-sitter based parsers later without changing
CLI or storage interfaces.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Protocol
from codexlens.config import Config
from codexlens.entities import IndexedFile, Symbol
class Parser(Protocol):
def parse(self, text: str, path: Path) -> IndexedFile: ...
@dataclass
class SimpleRegexParser:
language_id: str
def parse(self, text: str, path: Path) -> IndexedFile:
symbols: List[Symbol] = []
if self.language_id == "python":
symbols = _parse_python_symbols(text)
elif self.language_id in {"javascript", "typescript"}:
symbols = _parse_js_ts_symbols(text)
elif self.language_id == "java":
symbols = _parse_java_symbols(text)
elif self.language_id == "go":
symbols = _parse_go_symbols(text)
else:
symbols = _parse_generic_symbols(text)
return IndexedFile(
path=str(path.resolve()),
language=self.language_id,
symbols=symbols,
chunks=[],
)
class ParserFactory:
def __init__(self, config: Config) -> None:
self.config = config
self._parsers: Dict[str, Parser] = {}
def get_parser(self, language_id: str) -> Parser:
if language_id not in self._parsers:
self._parsers[language_id] = SimpleRegexParser(language_id)
return self._parsers[language_id]
_PY_CLASS_RE = re.compile(r"^\s*class\s+([A-Za-z_]\w*)\b")
_PY_DEF_RE = re.compile(r"^\s*def\s+([A-Za-z_]\w*)\s*\(")
def _parse_python_symbols(text: str) -> List[Symbol]:
symbols: List[Symbol] = []
current_class_indent: Optional[int] = None
for i, line in enumerate(text.splitlines(), start=1):
if _PY_CLASS_RE.match(line):
name = _PY_CLASS_RE.match(line).group(1)
current_class_indent = len(line) - len(line.lstrip(" "))
symbols.append(Symbol(name=name, kind="class", range=(i, i)))
continue
def_match = _PY_DEF_RE.match(line)
if def_match:
name = def_match.group(1)
indent = len(line) - len(line.lstrip(" "))
kind = "method" if current_class_indent is not None and indent > current_class_indent else "function"
symbols.append(Symbol(name=name, kind=kind, range=(i, i)))
continue
if current_class_indent is not None:
indent = len(line) - len(line.lstrip(" "))
if line.strip() and indent <= current_class_indent:
current_class_indent = None
return symbols
_JS_FUNC_RE = re.compile(r"^\s*(?:export\s+)?function\s+([A-Za-z_$][\w$]*)\s*\(")
_JS_CLASS_RE = re.compile(r"^\s*(?:export\s+)?class\s+([A-Za-z_$][\w$]*)\b")
def _parse_js_ts_symbols(text: str) -> List[Symbol]:
symbols: List[Symbol] = []
for i, line in enumerate(text.splitlines(), start=1):
func_match = _JS_FUNC_RE.match(line)
if func_match:
symbols.append(Symbol(name=func_match.group(1), kind="function", range=(i, i)))
continue
class_match = _JS_CLASS_RE.match(line)
if class_match:
symbols.append(Symbol(name=class_match.group(1), kind="class", range=(i, i)))
return symbols
_JAVA_CLASS_RE = re.compile(r"^\s*(?:public\s+)?class\s+([A-Za-z_]\w*)\b")
_JAVA_METHOD_RE = re.compile(
r"^\s*(?:public|private|protected|static|\s)+[\w<>\[\]]+\s+([A-Za-z_]\w*)\s*\("
)
def _parse_java_symbols(text: str) -> List[Symbol]:
symbols: List[Symbol] = []
for i, line in enumerate(text.splitlines(), start=1):
class_match = _JAVA_CLASS_RE.match(line)
if class_match:
symbols.append(Symbol(name=class_match.group(1), kind="class", range=(i, i)))
continue
method_match = _JAVA_METHOD_RE.match(line)
if method_match:
symbols.append(Symbol(name=method_match.group(1), kind="method", range=(i, i)))
return symbols
_GO_FUNC_RE = re.compile(r"^\s*func\s+(?:\([^)]+\)\s+)?([A-Za-z_]\w*)\s*\(")
_GO_TYPE_RE = re.compile(r"^\s*type\s+([A-Za-z_]\w*)\s+(?:struct|interface)\b")
def _parse_go_symbols(text: str) -> List[Symbol]:
symbols: List[Symbol] = []
for i, line in enumerate(text.splitlines(), start=1):
type_match = _GO_TYPE_RE.match(line)
if type_match:
symbols.append(Symbol(name=type_match.group(1), kind="class", range=(i, i)))
continue
func_match = _GO_FUNC_RE.match(line)
if func_match:
symbols.append(Symbol(name=func_match.group(1), kind="function", range=(i, i)))
return symbols
_GENERIC_DEF_RE = re.compile(r"^\s*(?:def|function|func)\s+([A-Za-z_]\w*)\b")
_GENERIC_CLASS_RE = re.compile(r"^\s*(?:class|struct|interface)\s+([A-Za-z_]\w*)\b")
def _parse_generic_symbols(text: str) -> List[Symbol]:
symbols: List[Symbol] = []
for i, line in enumerate(text.splitlines(), start=1):
class_match = _GENERIC_CLASS_RE.match(line)
if class_match:
symbols.append(Symbol(name=class_match.group(1), kind="class", range=(i, i)))
continue
def_match = _GENERIC_DEF_RE.match(line)
if def_match:
symbols.append(Symbol(name=def_match.group(1), kind="function", range=(i, i)))
return symbols
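
A sketch of the regex parsers on an in-memory snippet (the file name is hypothetical):

from pathlib import Path
from codexlens.config import Config
from codexlens.parsers.factory import ParserFactory

snippet = "class Greeter:\n    def hello(self):\n        return 'hi'\n"
parser = ParserFactory(Config()).get_parser("python")
indexed = parser.parse(snippet, Path("greeter.py"))
for sym in indexed.symbols:
    print(sym.kind, sym.name, sym.range)  # class Greeter (1, 1) / method hello (2, 2)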

codex-lens/src/codexlens/semantic/__init__.py

@@ -0,0 +1,31 @@
"""Optional semantic search module for CodexLens.
Install with: pip install codexlens[semantic]
"""
from __future__ import annotations
SEMANTIC_AVAILABLE = False
_import_error: str | None = None
try:
import numpy as np
try:
from fastembed import TextEmbedding
SEMANTIC_BACKEND = "fastembed"
except ImportError:
try:
from sentence_transformers import SentenceTransformer
SEMANTIC_BACKEND = "sentence-transformers"
except ImportError:
raise ImportError("Neither fastembed nor sentence-transformers available")
SEMANTIC_AVAILABLE = True
except ImportError as e:
_import_error = str(e)
SEMANTIC_BACKEND = None
def check_semantic_available() -> tuple[bool, str | None]:
"""Check if semantic search dependencies are available."""
return SEMANTIC_AVAILABLE, _import_error
__all__ = ["SEMANTIC_AVAILABLE", "SEMANTIC_BACKEND", "check_semantic_available"]

codex-lens/src/codexlens/semantic/chunker.py

@@ -0,0 +1,130 @@
"""Code chunking strategies for semantic search."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
from codexlens.entities import SemanticChunk, Symbol
@dataclass
class ChunkConfig:
"""Configuration for chunking strategies."""
max_chunk_size: int = 1000 # Max characters per chunk
overlap: int = 100 # Overlap for sliding window
min_chunk_size: int = 50 # Minimum chunk size
class Chunker:
"""Chunk code files for semantic embedding."""
def __init__(self, config: ChunkConfig | None = None) -> None:
self.config = config or ChunkConfig()
def chunk_by_symbol(
self,
content: str,
symbols: List[Symbol],
file_path: str | Path,
language: str,
) -> List[SemanticChunk]:
"""Chunk code by extracted symbols (functions, classes).
Each symbol becomes one chunk with its full content.
"""
chunks: List[SemanticChunk] = []
lines = content.splitlines(keepends=True)
for symbol in symbols:
start_line, end_line = symbol.range
# Convert to 0-indexed
start_idx = max(0, start_line - 1)
end_idx = min(len(lines), end_line)
chunk_content = "".join(lines[start_idx:end_idx])
if len(chunk_content.strip()) < self.config.min_chunk_size:
continue
chunks.append(SemanticChunk(
content=chunk_content,
embedding=None,
metadata={
"file": str(file_path),
"language": language,
"symbol_name": symbol.name,
"symbol_kind": symbol.kind,
"start_line": start_line,
"end_line": end_line,
"strategy": "symbol",
}
))
return chunks
def chunk_sliding_window(
self,
content: str,
file_path: str | Path,
language: str,
) -> List[SemanticChunk]:
"""Chunk code using sliding window approach.
Used for files without clear symbol boundaries or very long functions.
"""
chunks: List[SemanticChunk] = []
lines = content.splitlines(keepends=True)
if not lines:
return chunks
# Calculate lines per chunk based on average line length
avg_line_len = len(content) / max(len(lines), 1)
lines_per_chunk = max(10, int(self.config.max_chunk_size / max(avg_line_len, 1)))
overlap_lines = max(2, int(self.config.overlap / max(avg_line_len, 1)))
start = 0
chunk_idx = 0
while start < len(lines):
end = min(start + lines_per_chunk, len(lines))
chunk_content = "".join(lines[start:end])
if len(chunk_content.strip()) >= self.config.min_chunk_size:
chunks.append(SemanticChunk(
content=chunk_content,
embedding=None,
metadata={
"file": str(file_path),
"language": language,
"chunk_index": chunk_idx,
"start_line": start + 1,
"end_line": end,
"strategy": "sliding_window",
}
))
chunk_idx += 1
# Move window, accounting for overlap
start = end - overlap_lines
if start >= len(lines) - overlap_lines:
break
return chunks
def chunk_file(
self,
content: str,
symbols: List[Symbol],
file_path: str | Path,
language: str,
) -> List[SemanticChunk]:
"""Chunk a file using the best strategy.
Uses symbol-based chunking if symbols available,
falls back to sliding window for files without symbols.
"""
if symbols:
return self.chunk_by_symbol(content, symbols, file_path, language)
return self.chunk_sliding_window(content, file_path, language)
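
A sketch of symbol-driven chunking; min_chunk_size is lowered so the tiny example is not filtered out:

from codexlens.entities import Symbol
from codexlens.semantic.chunker import ChunkConfig, Chunker

source = "def add(a, b):\n    return a + b\n\ndef sub(a, b):\n    return a - b\n"
symbols = [
    Symbol(name="add", kind="function", range=(1, 2)),
    Symbol(name="sub", kind="function", range=(4, 5)),
]
chunker = Chunker(ChunkConfig(min_chunk_size=10))
chunks = chunker.chunk_file(source, symbols, "math_utils.py", "python")
for chunk in chunks:
    print(chunk.metadata["symbol_name"], repr(chunk.content))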

codex-lens/src/codexlens/semantic/embedder.py

@@ -0,0 +1,67 @@
"""Embedder for semantic code search."""
from __future__ import annotations
from typing import Iterable, List
from . import SEMANTIC_AVAILABLE, SEMANTIC_BACKEND
if SEMANTIC_AVAILABLE:
import numpy as np
class Embedder:
"""Generate embeddings for code chunks using fastembed or sentence-transformers."""
MODEL_NAME = "BAAI/bge-small-en-v1.5"
EMBEDDING_DIM = 384
def __init__(self, model_name: str | None = None) -> None:
if not SEMANTIC_AVAILABLE:
raise ImportError(
"Semantic search dependencies not available. "
"Install with: pip install codexlens[semantic]"
)
self.model_name = model_name or self.MODEL_NAME
self._model = None
self._backend = SEMANTIC_BACKEND
def _load_model(self) -> None:
"""Lazy load the embedding model."""
if self._model is not None:
return
if self._backend == "fastembed":
from fastembed import TextEmbedding
self._model = TextEmbedding(model_name=self.model_name)
else:
from sentence_transformers import SentenceTransformer
self._model = SentenceTransformer(self.model_name)
def embed(self, texts: str | Iterable[str]) -> List[List[float]]:
"""Generate embeddings for one or more texts.
Args:
texts: Single text or iterable of texts to embed.
Returns:
List of embedding vectors (each is a list of floats).
"""
self._load_model()
if isinstance(texts, str):
texts = [texts]
else:
texts = list(texts)
if self._backend == "fastembed":
embeddings = list(self._model.embed(texts))
return [emb.tolist() for emb in embeddings]
else:
embeddings = self._model.encode(texts)
return embeddings.tolist()
def embed_single(self, text: str) -> List[float]:
"""Generate embedding for a single text."""
return self.embed(text)[0]
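
A sketch of the embedder, assuming the optional semantic extra is installed (the first call downloads the model):

from codexlens.semantic import SEMANTIC_AVAILABLE
from codexlens.semantic.embedder import Embedder

if SEMANTIC_AVAILABLE:
    embedder = Embedder()  # defaults to BAAI/bge-small-en-v1.5
    vectors = embedder.embed(["def add(a, b): return a + b"])
    print(len(vectors), len(vectors[0]))  # 1 384
else:
    print("pip install codexlens[semantic]")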

codex-lens/src/codexlens/semantic/vector_store.py

@@ -0,0 +1,166 @@
"""Vector storage and similarity search for semantic chunks."""
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from codexlens.entities import SearchResult, SemanticChunk
from codexlens.errors import StorageError
from . import SEMANTIC_AVAILABLE
if SEMANTIC_AVAILABLE:
import numpy as np
def _cosine_similarity(a: List[float], b: List[float]) -> float:
"""Compute cosine similarity between two vectors."""
if not SEMANTIC_AVAILABLE:
raise ImportError("numpy required for vector operations")
a_arr = np.array(a)
b_arr = np.array(b)
norm_a = np.linalg.norm(a_arr)
norm_b = np.linalg.norm(b_arr)
if norm_a == 0 or norm_b == 0:
return 0.0
return float(np.dot(a_arr, b_arr) / (norm_a * norm_b))
class VectorStore:
"""SQLite-based vector storage with cosine similarity search."""
def __init__(self, db_path: str | Path) -> None:
if not SEMANTIC_AVAILABLE:
raise ImportError(
"Semantic search dependencies not available. "
"Install with: pip install codexlens[semantic]"
)
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_schema()
def _init_schema(self) -> None:
"""Initialize vector storage schema."""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS semantic_chunks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_path TEXT NOT NULL,
content TEXT NOT NULL,
embedding BLOB NOT NULL,
metadata TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_chunks_file
ON semantic_chunks(file_path)
""")
conn.commit()
def add_chunk(self, chunk: SemanticChunk, file_path: str) -> int:
"""Add a single chunk with its embedding.
Returns:
The inserted chunk ID.
"""
if chunk.embedding is None:
raise ValueError("Chunk must have embedding before adding to store")
embedding_blob = np.array(chunk.embedding, dtype=np.float32).tobytes()
metadata_json = json.dumps(chunk.metadata) if chunk.metadata else None
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"""
INSERT INTO semantic_chunks (file_path, content, embedding, metadata)
VALUES (?, ?, ?, ?)
""",
(file_path, chunk.content, embedding_blob, metadata_json)
)
conn.commit()
return cursor.lastrowid or 0
def add_chunks(self, chunks: List[SemanticChunk], file_path: str) -> List[int]:
"""Add multiple chunks with embeddings.
Returns:
List of inserted chunk IDs.
"""
ids = []
for chunk in chunks:
ids.append(self.add_chunk(chunk, file_path))
return ids
def delete_file_chunks(self, file_path: str) -> int:
"""Delete all chunks for a file.
Returns:
Number of deleted chunks.
"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"DELETE FROM semantic_chunks WHERE file_path = ?",
(file_path,)
)
conn.commit()
return cursor.rowcount
def search_similar(
self,
query_embedding: List[float],
top_k: int = 10,
min_score: float = 0.0,
) -> List[SearchResult]:
"""Find chunks most similar to query embedding.
Args:
query_embedding: Query vector.
top_k: Maximum results to return.
min_score: Minimum similarity score (0-1).
Returns:
List of SearchResult ordered by similarity (highest first).
"""
results: List[Tuple[float, SearchResult]] = []
with sqlite3.connect(self.db_path) as conn:
rows = conn.execute(
"SELECT id, file_path, content, embedding, metadata FROM semantic_chunks"
).fetchall()
for row_id, file_path, content, embedding_blob, metadata_json in rows:
stored_embedding = np.frombuffer(embedding_blob, dtype=np.float32).tolist()
score = _cosine_similarity(query_embedding, stored_embedding)
if score >= min_score:
metadata = json.loads(metadata_json) if metadata_json else {}
# Build excerpt
excerpt = content[:200] + "..." if len(content) > 200 else content
results.append((score, SearchResult(
path=file_path,
score=score,
excerpt=excerpt,
symbol=None,
)))
# Sort by score descending
results.sort(key=lambda x: x[0], reverse=True)
return [r for _, r in results[:top_k]]
def count_chunks(self) -> int:
"""Count total chunks in store."""
with sqlite3.connect(self.db_path) as conn:
row = conn.execute("SELECT COUNT(*) FROM semantic_chunks").fetchone()
return row[0] if row else 0
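
A sketch of the similarity search using hand-made vectors, so no embedding model is needed (numpy from the semantic extra is still required; paths are illustrative):

from codexlens.entities import SemanticChunk
from codexlens.semantic.vector_store import VectorStore

store = VectorStore(".codexlens/vectors.db")
chunk = SemanticChunk(
    content="def add(a, b): return a + b",
    embedding=[1.0, 0.0, 0.0],
    metadata={"symbol_name": "add"},
)
store.add_chunk(chunk, "src/math_utils.py")
for hit in store.search_similar([0.9, 0.1, 0.0], top_k=3):
    print(f"{hit.score:.3f}", hit.path, hit.excerpt)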

codex-lens/src/codexlens/storage/__init__.py

@@ -0,0 +1,8 @@
"""Storage backends for CodexLens."""
from __future__ import annotations
from .sqlite_store import SQLiteStore
__all__ = ["SQLiteStore"]

codex-lens/src/codexlens/storage/file_cache.py

@@ -0,0 +1,32 @@
"""Simple filesystem cache helpers."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
@dataclass
class FileCache:
"""Caches file mtimes for incremental indexing."""
cache_path: Path
def load_mtime(self, path: Path) -> Optional[float]:
try:
key = self._key_for(path)
record = (self.cache_path / key).read_text(encoding="utf-8")
return float(record)
except Exception:
return None
def store_mtime(self, path: Path, mtime: float) -> None:
self.cache_path.mkdir(parents=True, exist_ok=True)
key = self._key_for(path)
(self.cache_path / key).write_text(str(mtime), encoding="utf-8")
def _key_for(self, path: Path) -> str:
safe = str(path).replace(":", "_").replace("\\", "_").replace("/", "_")
return f"{safe}.mtime"

codex-lens/src/codexlens/storage/sqlite_store.py

@@ -0,0 +1,252 @@
"""SQLite storage for CodexLens indexing and search."""
from __future__ import annotations
import json
import sqlite3
import threading
from dataclasses import asdict
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
from codexlens.entities import IndexedFile, SearchResult, Symbol
from codexlens.errors import StorageError
class SQLiteStore:
"""SQLiteStore providing FTS5 search and symbol lookup."""
def __init__(self, db_path: str | Path) -> None:
self.db_path = Path(db_path)
self._lock = threading.RLock()
def initialize(self) -> None:
with self._lock:
self.db_path.parent.mkdir(parents=True, exist_ok=True)
with self._connect() as conn:
self._create_schema(conn)
def add_file(self, indexed_file: IndexedFile, content: str) -> None:
with self._lock:
with self._connect() as conn:
path = str(Path(indexed_file.path).resolve())
language = indexed_file.language
mtime = Path(path).stat().st_mtime if Path(path).exists() else None
line_count = content.count("\n") + 1
conn.execute(
"""
INSERT INTO files(path, language, content, mtime, line_count)
VALUES(?, ?, ?, ?, ?)
ON CONFLICT(path) DO UPDATE SET
language=excluded.language,
content=excluded.content,
mtime=excluded.mtime,
line_count=excluded.line_count
""",
(path, language, content, mtime, line_count),
)
row = conn.execute("SELECT id FROM files WHERE path=?", (path,)).fetchone()
if not row:
raise StorageError(f"Failed to read file id for {path}")
file_id = int(row["id"])
conn.execute(
"INSERT OR REPLACE INTO files_fts(rowid, path, language, content) VALUES(?, ?, ?, ?)",
(file_id, path, language, content),
)
conn.execute("DELETE FROM symbols WHERE file_id=?", (file_id,))
if indexed_file.symbols:
conn.executemany(
"""
INSERT INTO symbols(file_id, name, kind, start_line, end_line)
VALUES(?, ?, ?, ?, ?)
""",
[
(file_id, s.name, s.kind, s.range[0], s.range[1])
for s in indexed_file.symbols
],
)
def remove_file(self, path: str | Path) -> bool:
"""Remove a file from the index.
Returns True if the file was removed, False if it didn't exist.
"""
with self._lock:
with self._connect() as conn:
resolved_path = str(Path(path).resolve())
# Get file_id first
row = conn.execute(
"SELECT id FROM files WHERE path=?", (resolved_path,)
).fetchone()
if not row:
return False
file_id = int(row["id"])
# Delete from FTS index
conn.execute("DELETE FROM files_fts WHERE rowid=?", (file_id,))
# Delete symbols (CASCADE should handle this, but be explicit)
conn.execute("DELETE FROM symbols WHERE file_id=?", (file_id,))
# Delete file record
conn.execute("DELETE FROM files WHERE id=?", (file_id,))
return True
def file_exists(self, path: str | Path) -> bool:
"""Check if a file exists in the index."""
with self._lock:
with self._connect() as conn:
resolved_path = str(Path(path).resolve())
row = conn.execute(
"SELECT 1 FROM files WHERE path=?", (resolved_path,)
).fetchone()
return row is not None
def get_file_mtime(self, path: str | Path) -> float | None:
"""Get the stored mtime for a file, or None if not indexed."""
with self._lock:
with self._connect() as conn:
resolved_path = str(Path(path).resolve())
row = conn.execute(
"SELECT mtime FROM files WHERE path=?", (resolved_path,)
).fetchone()
return float(row["mtime"]) if row and row["mtime"] else None
def search_fts(self, query: str, *, limit: int = 20, offset: int = 0) -> List[SearchResult]:
with self._lock:
with self._connect() as conn:
try:
rows = conn.execute(
"""
SELECT rowid, path, bm25(files_fts) AS rank,
snippet(files_fts, 2, '[bold red]', '[/bold red]', '', 20) AS excerpt
FROM files_fts
WHERE files_fts MATCH ?
ORDER BY rank
LIMIT ? OFFSET ?
""",
(query, limit, offset),
).fetchall()
except sqlite3.DatabaseError as exc:
raise StorageError(f"FTS search failed: {exc}") from exc
results: List[SearchResult] = []
for row in rows:
# BM25 returns negative values where more negative = better match
# Convert to positive score where higher = better
rank = float(row["rank"]) if row["rank"] is not None else 0.0
score = max(0.0, -rank) # Negate to make positive, clamp at 0
results.append(
SearchResult(
path=row["path"],
score=score,
excerpt=row["excerpt"],
)
)
return results
def search_symbols(
self, name: str, *, kind: Optional[str] = None, limit: int = 50
) -> List[Symbol]:
pattern = f"%{name}%"
with self._lock:
with self._connect() as conn:
if kind:
rows = conn.execute(
"""
SELECT name, kind, start_line, end_line
FROM symbols
WHERE name LIKE ? AND kind=?
ORDER BY name
LIMIT ?
""",
(pattern, kind, limit),
).fetchall()
else:
rows = conn.execute(
"""
SELECT name, kind, start_line, end_line
FROM symbols
WHERE name LIKE ?
ORDER BY name
LIMIT ?
""",
(pattern, limit),
).fetchall()
return [
Symbol(name=row["name"], kind=row["kind"], range=(row["start_line"], row["end_line"]))
for row in rows
]
def stats(self) -> Dict[str, Any]:
with self._lock:
with self._connect() as conn:
file_count = conn.execute("SELECT COUNT(*) AS c FROM files").fetchone()["c"]
symbol_count = conn.execute("SELECT COUNT(*) AS c FROM symbols").fetchone()["c"]
lang_rows = conn.execute(
"SELECT language, COUNT(*) AS c FROM files GROUP BY language ORDER BY c DESC"
).fetchall()
languages = {row["language"]: row["c"] for row in lang_rows}
return {
"files": int(file_count),
"symbols": int(symbol_count),
"languages": languages,
"db_path": str(self.db_path),
}
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path, check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
return conn
def _create_schema(self, conn: sqlite3.Connection) -> None:
try:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY,
path TEXT UNIQUE NOT NULL,
language TEXT NOT NULL,
content TEXT NOT NULL,
mtime REAL,
line_count INTEGER
)
"""
)
conn.execute(
"""
CREATE VIRTUAL TABLE IF NOT EXISTS files_fts USING fts5(
path UNINDEXED,
language UNINDEXED,
content
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS symbols (
id INTEGER PRIMARY KEY,
file_id INTEGER NOT NULL REFERENCES files(id) ON DELETE CASCADE,
name TEXT NOT NULL,
kind TEXT NOT NULL,
start_line INTEGER NOT NULL,
end_line INTEGER NOT NULL
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_symbols_name ON symbols(name)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_symbols_kind ON symbols(kind)")
except sqlite3.DatabaseError as exc:
raise StorageError(f"Failed to initialize database schema: {exc}") from exc