feat: Implement Skills Manager View and Notifier Module

- Added `skills-manager.js` for managing Claude Code skills with functionalities for loading, displaying, and editing skills.
- Introduced a Notifier module in `notifier.ts` for CLI to server communication, enabling notifications for UI updates on data changes.
- Created comprehensive documentation for the Chain Search implementation, including usage examples and performance tips.
- Developed a test suite for the Chain Search engine, covering basic search, quick search, symbol search, and files-only search functionalities.
This commit is contained in:
catlog22
2025-12-14 11:12:48 +08:00
parent 08dc0a0348
commit ac43cf85ec
26 changed files with 3827 additions and 2005 deletions

View File

@@ -5,17 +5,22 @@ from __future__ import annotations
import json
import logging
import os
import shutil
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
import typer
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from rich.table import Table
from codexlens.config import Config, WorkspaceConfig, find_workspace_root
from codexlens.config import Config
from codexlens.entities import IndexedFile, SearchResult, Symbol
from codexlens.errors import CodexLensError
from codexlens.parsers.factory import ParserFactory
from codexlens.storage.sqlite_store import SQLiteStore
from codexlens.storage.path_mapper import PathMapper
from codexlens.storage.registry import RegistryStore, ProjectInfo
from codexlens.storage.index_tree import IndexTreeBuilder
from codexlens.search.chain_search import ChainSearchEngine, SearchOptions
from .output import (
console,
@@ -46,106 +51,20 @@ def _parse_languages(raw: Optional[List[str]]) -> Optional[List[str]]:
return langs or None
def _load_gitignore(base_path: Path) -> List[str]:
gitignore = base_path / ".gitignore"
if not gitignore.exists():
return []
try:
return [line.strip() for line in gitignore.read_text(encoding="utf-8").splitlines() if line.strip()]
except OSError:
return []
def _get_index_root() -> Path:
"""Get the index root directory from config or default."""
env_override = os.getenv("CODEXLENS_INDEX_DIR")
if env_override:
return Path(env_override).expanduser().resolve()
return Path.home() / ".codexlens" / "indexes"
def _iter_source_files(
base_path: Path,
config: Config,
languages: Optional[List[str]] = None,
) -> Iterable[Path]:
ignore_dirs = {".git", ".venv", "venv", "node_modules", "__pycache__", ".codexlens"}
# Cache for PathSpec objects per directory
pathspec_cache: Dict[Path, Optional[Any]] = {}
def get_pathspec_for_dir(dir_path: Path) -> Optional[Any]:
"""Get PathSpec for a directory, loading .gitignore if present."""
if dir_path in pathspec_cache:
return pathspec_cache[dir_path]
ignore_patterns = _load_gitignore(dir_path)
if not ignore_patterns:
pathspec_cache[dir_path] = None
return None
try:
from pathspec import PathSpec
from pathspec.patterns.gitwildmatch import GitWildMatchPattern
pathspec = PathSpec.from_lines(GitWildMatchPattern, ignore_patterns)
pathspec_cache[dir_path] = pathspec
return pathspec
except Exception:
pathspec_cache[dir_path] = None
return None
for root, dirs, files in os.walk(base_path):
dirs[:] = [d for d in dirs if d not in ignore_dirs and not d.startswith(".")]
root_path = Path(root)
# Get pathspec for current directory
pathspec = get_pathspec_for_dir(root_path)
for file in files:
if file.startswith("."):
continue
full_path = root_path / file
rel = full_path.relative_to(root_path)
if pathspec and pathspec.match_file(str(rel)):
continue
language_id = config.language_for_path(full_path)
if not language_id:
continue
if languages and language_id not in languages:
continue
yield full_path
def _get_store_for_path(path: Path, use_global: bool = False) -> tuple[SQLiteStore, Path]:
"""Get SQLiteStore for a path, using workspace-local or global database.
Returns (store, db_path) tuple.
"""
if use_global:
config = Config()
config.ensure_runtime_dirs()
return SQLiteStore(config.db_path), config.db_path
# Try to find existing workspace
workspace = WorkspaceConfig.from_path(path)
if workspace:
return SQLiteStore(workspace.db_path), workspace.db_path
# Fall back to global config
config = Config()
config.ensure_runtime_dirs()
return SQLiteStore(config.db_path), config.db_path
def _is_safe_to_clean(target_dir: Path) -> bool:
"""Verify directory is a CodexLens directory before deletion.
Checks for presence of .codexlens directory or index.db file.
"""
if not target_dir.exists():
return True
# Check if it's the .codexlens directory itself
if target_dir.name == ".codexlens":
# Verify it contains index.db or cache directory
return (target_dir / "index.db").exists() or (target_dir / "cache").exists()
# Check if it contains .codexlens subdirectory
return (target_dir / ".codexlens").exists()
def _get_registry_path() -> Path:
"""Get the registry database path."""
env_override = os.getenv("CODEXLENS_DATA_DIR")
if env_override:
return Path(env_override).expanduser().resolve() / "registry.db"
return Path.home() / ".codexlens" / "registry.db"
@app.command()
@@ -157,112 +76,98 @@ def init(
"-l",
help="Limit indexing to specific languages (repeat or comma-separated).",
),
use_global: bool = typer.Option(False, "--global", "-g", help="Use global database instead of workspace-local."),
workers: int = typer.Option(4, "--workers", "-w", min=1, max=16, help="Parallel worker processes."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Initialize or rebuild the index for a directory.
Creates a .codexlens/ directory in the project root to store index data.
Use --global to use the global database at ~/.codexlens/ instead.
Indexes are stored in ~/.codexlens/indexes/ with mirrored directory structure.
Set CODEXLENS_INDEX_DIR to customize the index location.
"""
_configure_logging(verbose)
config = Config()
factory = ParserFactory(config)
languages = _parse_languages(language)
base_path = path.expanduser().resolve()
store: SQLiteStore | None = None
registry: RegistryStore | None = None
try:
# Determine database location
if use_global:
config.ensure_runtime_dirs()
db_path = config.db_path
workspace_root = None
else:
# Create workspace-local .codexlens directory
workspace = WorkspaceConfig.create_at(base_path)
db_path = workspace.db_path
workspace_root = workspace.workspace_root
registry = RegistryStore()
registry.initialize()
mapper = PathMapper()
store = SQLiteStore(db_path)
store.initialize()
builder = IndexTreeBuilder(registry, mapper, config)
files = list(_iter_source_files(base_path, config, languages))
indexed_count = 0
symbol_count = 0
console.print(f"[bold]Building index for:[/bold] {base_path}")
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("{task.completed}/{task.total} files"),
TimeElapsedColumn(),
console=console,
) as progress:
task = progress.add_task("Indexing", total=len(files))
for file_path in files:
progress.advance(task)
try:
text = file_path.read_text(encoding="utf-8", errors="ignore")
lang_id = config.language_for_path(file_path) or "unknown"
parser = factory.get_parser(lang_id)
indexed_file = parser.parse(text, file_path)
store.add_file(indexed_file, text)
indexed_count += 1
symbol_count += len(indexed_file.symbols)
except Exception as exc:
logging.debug("Failed to index %s: %s", file_path, exc)
continue
build_result = builder.build(
source_root=base_path,
languages=languages,
workers=workers,
)
result = {
"path": str(base_path),
"files_indexed": indexed_count,
"symbols_indexed": symbol_count,
"files_indexed": build_result.total_files,
"dirs_indexed": build_result.total_dirs,
"index_root": str(build_result.index_root),
"project_id": build_result.project_id,
"languages": languages or sorted(config.supported_languages.keys()),
"db_path": str(db_path),
"workspace_root": str(workspace_root) if workspace_root else None,
"errors": len(build_result.errors),
}
if json_mode:
print_json(success=True, result=result)
else:
render_status(result)
console.print(f"[green]OK[/green] Indexed [bold]{build_result.total_files}[/bold] files in [bold]{build_result.total_dirs}[/bold] directories")
console.print(f" Index root: {build_result.index_root}")
if build_result.errors:
console.print(f" [yellow]Warnings:[/yellow] {len(build_result.errors)} errors")
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))
else:
console.print(f"[red]Init failed:[/red] {exc}")
raise typer.Exit(code=1)
finally:
if store is not None:
store.close()
if registry is not None:
registry.close()
@app.command()
def search(
query: str = typer.Argument(..., help="FTS query to run."),
path: Path = typer.Option(Path("."), "--path", "-p", help="Directory to search from."),
limit: int = typer.Option(20, "--limit", "-n", min=1, max=500, help="Max results."),
depth: int = typer.Option(-1, "--depth", "-d", help="Search depth (-1 = unlimited, 0 = current only)."),
files_only: bool = typer.Option(False, "--files-only", "-f", help="Return only file paths without content snippets."),
use_global: bool = typer.Option(False, "--global", "-g", help="Use global database instead of workspace-local."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Search indexed file contents using SQLite FTS5.
Searches the workspace-local .codexlens/index.db by default.
Use --global to search the global database at ~/.codexlens/.
Use --files-only to return only matching file paths.
Uses chain search across directory indexes.
Use --depth to limit search recursion (0 = current dir only).
"""
_configure_logging(verbose)
search_path = path.expanduser().resolve()
store: SQLiteStore | None = None
registry: RegistryStore | None = None
try:
store, db_path = _get_store_for_path(Path.cwd(), use_global)
store.initialize()
registry = RegistryStore()
registry.initialize()
mapper = PathMapper()
engine = ChainSearchEngine(registry, mapper)
options = SearchOptions(
depth=depth,
total_limit=limit,
files_only=files_only,
)
if files_only:
file_paths = store.search_files_only(query, limit=limit)
file_paths = engine.search_files_only(query, search_path, options)
payload = {"query": query, "count": len(file_paths), "files": file_paths}
if json_mode:
print_json(success=True, result=payload)
@@ -270,12 +175,24 @@ def search(
for fp in file_paths:
console.print(fp)
else:
results = store.search_fts(query, limit=limit)
payload = {"query": query, "count": len(results), "results": results}
result = engine.search(query, search_path, options)
payload = {
"query": query,
"count": len(result.results),
"results": [{"path": r.path, "score": r.score, "excerpt": r.excerpt} for r in result.results],
"stats": {
"dirs_searched": result.stats.dirs_searched,
"files_matched": result.stats.files_matched,
"time_ms": result.stats.time_ms,
},
}
if json_mode:
print_json(success=True, result=payload)
else:
render_search_results(results)
render_search_results(result.results)
if verbose:
console.print(f"[dim]Searched {result.stats.dirs_searched} directories in {result.stats.time_ms:.1f}ms[/dim]")
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))
@@ -283,13 +200,14 @@ def search(
console.print(f"[red]Search failed:[/red] {exc}")
raise typer.Exit(code=1)
finally:
if store is not None:
store.close()
if registry is not None:
registry.close()
@app.command()
def symbol(
name: str = typer.Argument(..., help="Symbol name to look up."),
path: Path = typer.Option(Path("."), "--path", "-p", help="Directory to search from."),
kind: Optional[str] = typer.Option(
None,
"--kind",
@@ -297,27 +215,31 @@ def symbol(
help="Filter by kind (function|class|method).",
),
limit: int = typer.Option(50, "--limit", "-n", min=1, max=500, help="Max symbols."),
use_global: bool = typer.Option(False, "--global", "-g", help="Use global database instead of workspace-local."),
depth: int = typer.Option(-1, "--depth", "-d", help="Search depth (-1 = unlimited)."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Look up symbols by name and optional kind.
Searches the workspace-local .codexlens/index.db by default.
Use --global to search the global database at ~/.codexlens/.
"""
"""Look up symbols by name and optional kind."""
_configure_logging(verbose)
search_path = path.expanduser().resolve()
store: SQLiteStore | None = None
registry: RegistryStore | None = None
try:
store, db_path = _get_store_for_path(Path.cwd(), use_global)
store.initialize()
syms = store.search_symbols(name, kind=kind, limit=limit)
registry = RegistryStore()
registry.initialize()
mapper = PathMapper()
engine = ChainSearchEngine(registry, mapper)
options = SearchOptions(depth=depth, total_limit=limit)
syms = engine.search_symbols(name, search_path, kind=kind, options=options)
payload = {"name": name, "kind": kind, "count": len(syms), "symbols": syms}
if json_mode:
print_json(success=True, result=payload)
else:
render_symbols(syms)
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))
@@ -325,8 +247,8 @@ def symbol(
console.print(f"[red]Symbol lookup failed:[/red] {exc}")
raise typer.Exit(code=1)
finally:
if store is not None:
store.close()
if registry is not None:
registry.close()
@app.command()
@@ -365,26 +287,54 @@ def inspect(
@app.command()
def status(
use_global: bool = typer.Option(False, "--global", "-g", help="Use global database instead of workspace-local."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Show index statistics.
Shows statistics for the workspace-local .codexlens/index.db by default.
Use --global to show the global database at ~/.codexlens/.
"""
"""Show index status and configuration."""
_configure_logging(verbose)
store: SQLiteStore | None = None
registry: RegistryStore | None = None
try:
store, db_path = _get_store_for_path(Path.cwd(), use_global)
store.initialize()
stats = store.stats()
registry = RegistryStore()
registry.initialize()
mapper = PathMapper()
# Get all projects
projects = registry.list_projects()
# Calculate total stats
total_files = sum(p.total_files for p in projects)
total_dirs = sum(p.total_dirs for p in projects)
# Get index root size
index_root = mapper.index_root
index_size = 0
if index_root.exists():
for f in index_root.rglob("*"):
if f.is_file():
index_size += f.stat().st_size
stats = {
"index_root": str(index_root),
"registry_path": str(_get_registry_path()),
"projects_count": len(projects),
"total_files": total_files,
"total_dirs": total_dirs,
"index_size_bytes": index_size,
"index_size_mb": round(index_size / (1024 * 1024), 2),
}
if json_mode:
print_json(success=True, result=stats)
else:
render_status(stats)
console.print("[bold]CodexLens Status[/bold]")
console.print(f" Index Root: {stats['index_root']}")
console.print(f" Registry: {stats['registry_path']}")
console.print(f" Projects: {stats['projects_count']}")
console.print(f" Total Files: {stats['total_files']}")
console.print(f" Total Directories: {stats['total_dirs']}")
console.print(f" Index Size: {stats['index_size_mb']} MB")
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))
@@ -392,153 +342,423 @@ def status(
console.print(f"[red]Status failed:[/red] {exc}")
raise typer.Exit(code=1)
finally:
if store is not None:
store.close()
if registry is not None:
registry.close()
@app.command()
def update(
files: List[str] = typer.Argument(..., help="File paths to update in the index."),
use_global: bool = typer.Option(False, "--global", "-g", help="Use global database instead of workspace-local."),
def projects(
action: str = typer.Argument("list", help="Action: list, show, remove"),
project_path: Optional[Path] = typer.Argument(None, help="Project path (for show/remove)."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Incrementally update specific files in the index.
"""Manage registered projects in the global registry.
Pass one or more file paths to update. Files that no longer exist
will be removed from the index. New or modified files will be re-indexed.
This is much faster than re-running init for large codebases when
only a few files have changed.
Actions:
- list: Show all registered projects
- show <path>: Show details for a specific project
- remove <path>: Remove a project from the registry
"""
_configure_logging(verbose)
config = Config()
factory = ParserFactory(config)
store: SQLiteStore | None = None
registry: RegistryStore | None = None
try:
store, db_path = _get_store_for_path(Path.cwd(), use_global)
store.initialize()
registry = RegistryStore()
registry.initialize()
updated = 0
removed = 0
skipped = 0
errors = []
for file_str in files:
file_path = Path(file_str).resolve()
# Check if file exists on disk
if not file_path.exists():
# File was deleted - remove from index
if store.remove_file(file_path):
removed += 1
logging.debug("Removed deleted file: %s", file_path)
if action == "list":
project_list = registry.list_projects()
if json_mode:
result = [
{
"id": p.id,
"source_root": str(p.source_root),
"index_root": str(p.index_root),
"total_files": p.total_files,
"total_dirs": p.total_dirs,
"status": p.status,
}
for p in project_list
]
print_json(success=True, result=result)
else:
if not project_list:
console.print("[yellow]No projects registered.[/yellow]")
else:
skipped += 1
logging.debug("File not in index: %s", file_path)
continue
table = Table(title="Registered Projects")
table.add_column("ID", style="dim")
table.add_column("Source Root")
table.add_column("Files", justify="right")
table.add_column("Dirs", justify="right")
table.add_column("Status")
# Check if file is supported
language_id = config.language_for_path(file_path)
if not language_id:
skipped += 1
logging.debug("Unsupported file type: %s", file_path)
continue
for p in project_list:
table.add_row(
str(p.id),
str(p.source_root),
str(p.total_files),
str(p.total_dirs),
p.status,
)
console.print(table)
# Check if file needs update (compare mtime)
current_mtime = file_path.stat().st_mtime
stored_mtime = store.get_file_mtime(file_path)
elif action == "show":
if not project_path:
raise typer.BadParameter("Project path required for 'show' action")
if stored_mtime is not None and abs(current_mtime - stored_mtime) < 0.001:
skipped += 1
logging.debug("File unchanged: %s", file_path)
continue
project_path = project_path.expanduser().resolve()
project_info = registry.get_project(project_path)
# Re-index the file
try:
text = file_path.read_text(encoding="utf-8", errors="ignore")
parser = factory.get_parser(language_id)
indexed_file = parser.parse(text, file_path)
store.add_file(indexed_file, text)
updated += 1
logging.debug("Updated file: %s", file_path)
except Exception as exc:
errors.append({"file": str(file_path), "error": str(exc)})
logging.debug("Failed to update %s: %s", file_path, exc)
if not project_info:
if json_mode:
print_json(success=False, error=f"Project not found: {project_path}")
else:
console.print(f"[red]Project not found:[/red] {project_path}")
raise typer.Exit(code=1)
result = {
"updated": updated,
"removed": removed,
"skipped": skipped,
"errors": errors,
"db_path": str(db_path),
}
if json_mode:
result = {
"id": project_info.id,
"source_root": str(project_info.source_root),
"index_root": str(project_info.index_root),
"total_files": project_info.total_files,
"total_dirs": project_info.total_dirs,
"status": project_info.status,
"created_at": project_info.created_at,
"last_indexed": project_info.last_indexed,
}
print_json(success=True, result=result)
else:
console.print(f"[bold]Project:[/bold] {project_info.source_root}")
console.print(f" ID: {project_info.id}")
console.print(f" Index Root: {project_info.index_root}")
console.print(f" Files: {project_info.total_files}")
console.print(f" Directories: {project_info.total_dirs}")
console.print(f" Status: {project_info.status}")
# Show directory breakdown
dirs = registry.get_project_dirs(project_info.id)
if dirs:
console.print(f"\n [bold]Indexed Directories:[/bold] {len(dirs)}")
for d in dirs[:10]:
console.print(f" - {d.source_path.name}/ ({d.files_count} files)")
if len(dirs) > 10:
console.print(f" ... and {len(dirs) - 10} more")
elif action == "remove":
if not project_path:
raise typer.BadParameter("Project path required for 'remove' action")
project_path = project_path.expanduser().resolve()
removed = registry.unregister_project(project_path)
if removed:
mapper = PathMapper()
index_root = mapper.source_to_index_dir(project_path)
if index_root.exists():
shutil.rmtree(index_root)
if json_mode:
print_json(success=True, result={"removed": str(project_path)})
else:
console.print(f"[green]Removed:[/green] {project_path}")
else:
if json_mode:
print_json(success=False, error=f"Project not found: {project_path}")
else:
console.print(f"[yellow]Project not found:[/yellow] {project_path}")
if json_mode:
print_json(success=True, result=result)
else:
console.print(f"[green]Updated:[/green] {updated} files")
console.print(f"[yellow]Removed:[/yellow] {removed} files")
console.print(f"[dim]Skipped:[/dim] {skipped} files")
if errors:
console.print(f"[red]Errors:[/red] {len(errors)}")
for err in errors[:5]:
console.print(f" - {err['file']}: {err['error']}")
raise typer.BadParameter(f"Unknown action: {action}. Use list, show, or remove.")
except typer.BadParameter:
raise
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))
else:
console.print(f"[red]Update failed:[/red] {exc}")
console.print(f"[red]Projects command failed:[/red] {exc}")
raise typer.Exit(code=1)
finally:
if store is not None:
store.close()
if registry is not None:
registry.close()
@app.command()
def config(
action: str = typer.Argument("show", help="Action: show, set, migrate"),
key: Optional[str] = typer.Argument(None, help="Config key (for set action)."),
value: Optional[str] = typer.Argument(None, help="Config value (for set action)."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Manage CodexLens configuration.
Actions:
- show: Display current configuration
- set <key> <value>: Set configuration value
- migrate <new_path>: Migrate indexes to new location
Config keys:
- index_dir: Directory to store indexes (default: ~/.codexlens/indexes)
"""
_configure_logging(verbose)
config_file = Path.home() / ".codexlens" / "config.json"
def load_config() -> Dict[str, Any]:
if config_file.exists():
return json.loads(config_file.read_text(encoding="utf-8"))
return {}
def save_config(cfg: Dict[str, Any]) -> None:
config_file.parent.mkdir(parents=True, exist_ok=True)
config_file.write_text(json.dumps(cfg, indent=2), encoding="utf-8")
try:
if action == "show":
cfg = load_config()
current_index_dir = os.getenv("CODEXLENS_INDEX_DIR") or cfg.get("index_dir") or str(Path.home() / ".codexlens" / "indexes")
result = {
"config_file": str(config_file),
"index_dir": current_index_dir,
"env_override": os.getenv("CODEXLENS_INDEX_DIR"),
}
if json_mode:
print_json(success=True, result=result)
else:
console.print("[bold]CodexLens Configuration[/bold]")
console.print(f" Config File: {result['config_file']}")
console.print(f" Index Directory: {result['index_dir']}")
if result['env_override']:
console.print(f" [dim](Override via CODEXLENS_INDEX_DIR)[/dim]")
elif action == "set":
if not key:
raise typer.BadParameter("Config key required for 'set' action")
if not value:
raise typer.BadParameter("Config value required for 'set' action")
cfg = load_config()
if key == "index_dir":
new_path = Path(value).expanduser().resolve()
cfg["index_dir"] = str(new_path)
save_config(cfg)
if json_mode:
print_json(success=True, result={"key": key, "value": str(new_path)})
else:
console.print(f"[green]Set {key}=[/green] {new_path}")
console.print("[yellow]Note: Existing indexes remain at old location. Use 'config migrate' to move them.[/yellow]")
else:
raise typer.BadParameter(f"Unknown config key: {key}")
elif action == "migrate":
if not key:
raise typer.BadParameter("New path required for 'migrate' action")
new_path = Path(key).expanduser().resolve()
mapper = PathMapper()
old_path = mapper.index_root
if not old_path.exists():
if json_mode:
print_json(success=False, error="No indexes to migrate")
else:
console.print("[yellow]No indexes to migrate.[/yellow]")
return
# Create new directory
new_path.mkdir(parents=True, exist_ok=True)
# Count items to migrate
items = list(old_path.iterdir())
migrated = 0
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("{task.completed}/{task.total}"),
TimeElapsedColumn(),
console=console,
) as progress:
task = progress.add_task("Migrating indexes", total=len(items))
for item in items:
dest = new_path / item.name
if item.is_dir():
shutil.copytree(item, dest, dirs_exist_ok=True)
else:
shutil.copy2(item, dest)
migrated += 1
progress.advance(task)
# Update config
cfg = load_config()
cfg["index_dir"] = str(new_path)
save_config(cfg)
# Update registry paths
registry = RegistryStore()
registry.initialize()
registry.update_index_paths(old_path, new_path)
registry.close()
result = {
"migrated_from": str(old_path),
"migrated_to": str(new_path),
"items_migrated": migrated,
}
if json_mode:
print_json(success=True, result=result)
else:
console.print(f"[green]Migrated {migrated} items to:[/green] {new_path}")
console.print("[dim]Old indexes can be manually deleted after verifying migration.[/dim]")
else:
raise typer.BadParameter(f"Unknown action: {action}. Use show, set, or migrate.")
except typer.BadParameter:
raise
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))
else:
console.print(f"[red]Config command failed:[/red] {exc}")
raise typer.Exit(code=1)
@app.command()
def clean(
path: Path = typer.Argument(Path("."), exists=True, file_okay=False, dir_okay=True, help="Project root to clean."),
use_global: bool = typer.Option(False, "--global", "-g", help="Clean global database instead of workspace-local."),
path: Optional[Path] = typer.Argument(None, help="Project path to clean (removes project index)."),
all_indexes: bool = typer.Option(False, "--all", "-a", help="Remove all indexes."),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
"""Remove CodexLens index data.
Removes the .codexlens/ directory from the project root.
Use --global to clean the global database at ~/.codexlens/.
Without arguments, shows current index size.
With path, removes that project's indexes.
With --all, removes all indexes (use with caution).
"""
_configure_logging(verbose)
base_path = path.expanduser().resolve()
try:
if use_global:
config = Config()
import shutil
if config.index_dir.exists():
if not _is_safe_to_clean(config.index_dir):
raise CodexLensError(f"Safety check failed: {config.index_dir} does not appear to be a CodexLens directory")
shutil.rmtree(config.index_dir)
result = {"cleaned": str(config.index_dir), "type": "global"}
else:
workspace = WorkspaceConfig.from_path(base_path)
if workspace and workspace.codexlens_dir.exists():
import shutil
if not _is_safe_to_clean(workspace.codexlens_dir):
raise CodexLensError(f"Safety check failed: {workspace.codexlens_dir} does not appear to be a CodexLens directory")
shutil.rmtree(workspace.codexlens_dir)
result = {"cleaned": str(workspace.codexlens_dir), "type": "workspace"}
else:
result = {"cleaned": None, "type": "workspace", "message": "No workspace found"}
mapper = PathMapper()
index_root = mapper.index_root
if json_mode:
print_json(success=True, result=result)
else:
if result.get("cleaned"):
console.print(f"[green]Cleaned:[/green] {result['cleaned']}")
if all_indexes:
# Remove everything
if not index_root.exists():
if json_mode:
print_json(success=True, result={"cleaned": None, "message": "No indexes to clean"})
else:
console.print("[yellow]No indexes to clean.[/yellow]")
return
# Calculate size before removal
total_size = 0
for f in index_root.rglob("*"):
if f.is_file():
total_size += f.stat().st_size
# Remove registry first
registry_path = _get_registry_path()
if registry_path.exists():
registry_path.unlink()
# Remove all indexes
shutil.rmtree(index_root)
result = {
"cleaned": str(index_root),
"size_freed_mb": round(total_size / (1024 * 1024), 2),
}
if json_mode:
print_json(success=True, result=result)
else:
console.print("[yellow]No workspace index found to clean.[/yellow]")
console.print(f"[green]Removed all indexes:[/green] {result['size_freed_mb']} MB freed")
elif path:
# Remove specific project
project_path = path.expanduser().resolve()
project_index = mapper.source_to_index_dir(project_path)
if not project_index.exists():
if json_mode:
print_json(success=False, error=f"No index found for: {project_path}")
else:
console.print(f"[yellow]No index found for:[/yellow] {project_path}")
return
# Calculate size
total_size = 0
for f in project_index.rglob("*"):
if f.is_file():
total_size += f.stat().st_size
# Remove from registry
registry = RegistryStore()
registry.initialize()
registry.unregister_project(project_path)
registry.close()
# Remove indexes
shutil.rmtree(project_index)
result = {
"cleaned": str(project_path),
"index_path": str(project_index),
"size_freed_mb": round(total_size / (1024 * 1024), 2),
}
if json_mode:
print_json(success=True, result=result)
else:
console.print(f"[green]Removed indexes for:[/green] {project_path}")
console.print(f" Freed: {result['size_freed_mb']} MB")
else:
# Show current status
if not index_root.exists():
if json_mode:
print_json(success=True, result={"index_root": str(index_root), "exists": False})
else:
console.print("[yellow]No indexes found.[/yellow]")
return
total_size = 0
for f in index_root.rglob("*"):
if f.is_file():
total_size += f.stat().st_size
registry = RegistryStore()
registry.initialize()
projects = registry.list_projects()
registry.close()
result = {
"index_root": str(index_root),
"projects_count": len(projects),
"total_size_mb": round(total_size / (1024 * 1024), 2),
}
if json_mode:
print_json(success=True, result=result)
else:
console.print("[bold]Index Status[/bold]")
console.print(f" Location: {result['index_root']}")
console.print(f" Projects: {result['projects_count']}")
console.print(f" Total Size: {result['total_size_mb']} MB")
console.print("\n[dim]Use 'clean <path>' to remove a specific project or 'clean --all' to remove everything.[/dim]")
except Exception as exc:
if json_mode:
print_json(success=False, error=str(exc))