chore: move ccw-skill-hub to standalone repository

Migrated ccw-skill-hub to D:/ccw-skill-hub as an independent git project.
Removed nested git repos (ccw/frontend/ccw-skill-hub, skill-hub-repo, skill-hub-temp).
catlog22 committed 2026-02-24 11:57:26 +08:00
parent 6f0bbe84ea
commit 61e313a0c1
35 changed files with 3189 additions and 362 deletions

View File

@@ -0,0 +1,70 @@
# Security scanning workflow for codex-lens
# Runs pip-audit to check for known vulnerabilities in dependencies
name: Security Scan
on:
# Run on push to main branch
push:
branches:
- main
- master
# Run weekly on Sundays at 00:00 UTC
schedule:
- cron: '0 0 * * 0'
# Allow manual trigger
workflow_dispatch:
jobs:
security-audit:
name: Dependency Vulnerability Scan
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
cache: 'pip'
- name: Install pip-audit
run: |
python -m pip install --upgrade pip
pip install pip-audit
- name: Run pip-audit on requirements.in
run: pip-audit --requirement requirements.in
continue-on-error: false
- name: Run pip-audit on pyproject.toml dependencies
run: pip-audit .
continue-on-error: false
- name: Check for safety issues
run: |
pip install safety
safety check --json || true
continue-on-error: true
bandit-security:
name: Code Security Linting
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
- name: Install bandit
run: pip install bandit[toml]
- name: Run bandit security linter
run: bandit -r src/ -ll -i
continue-on-error: true

View File

@@ -0,0 +1,38 @@
# Dependency Management
This project uses setuptools with `pyproject.toml` for dependency management.
## Locking Dependencies
To generate a fully pinned `requirements.txt` from `requirements.in`:
```bash
# Install pip-tools
pip install pip-tools
# Compile requirements
pip-compile requirements.in --output-file=requirements.txt
# To upgrade dependencies
pip-compile --upgrade requirements.in --output-file=requirements.txt
```
## Version Constraints
This project uses **pessimistic version constraints** via the PEP 440 compatible-release operator (`~=`) for its dependency specifications:
- `typer~=0.9.0` means: `>=0.9.0, ==0.9.*`
- Allows bugfix updates (0.9.0, 0.9.1, 0.9.2) but not feature/minor updates (0.10.0)
This provides stability while allowing automatic patch updates.
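For a quick local check of what a compatible-release constraint admits, the `packaging` library (already present wherever pip is installed) can evaluate specifiers directly. A minimal sketch:

```python
# Sketch: checking PEP 440 compatible-release semantics with `packaging`.
from packaging.specifiers import SpecifierSet
from packaging.version import Version

spec = SpecifierSet("~=0.9.0")  # equivalent to >=0.9.0, ==0.9.*

for candidate in ("0.9.0", "0.9.2", "0.10.0", "1.0.0"):
    verdict = "allowed" if Version(candidate) in spec else "rejected"
    print(f"{candidate}: {verdict}")
# 0.9.0: allowed, 0.9.2: allowed, 0.10.0: rejected, 1.0.0: rejected
```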
## Security Scanning
The project includes automated security scanning via GitHub Actions:
- Runs on every push to the main or master branch
- Runs weekly (Sundays at 00:00 UTC)
- Can be triggered manually
The scan uses:
- `pip-audit`: Checks for known vulnerabilities in dependencies
- `bandit`: Security linter for Python code
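The same checks can be reproduced locally before pushing. A sketch driving both tools from one script (assumes `pip-audit` and `bandit` are installed, e.g. `pip install pip-audit bandit[toml]`; flags mirror the workflow above):

```python
# Sketch: run the CI security checks locally with the same flags as CI.
import subprocess
import sys

CHECKS = [
    ["pip-audit", "--requirement", "requirements.in"],  # dependency CVE scan
    ["bandit", "-r", "src/", "-ll", "-i"],              # code security lint
]

failed = False
for cmd in CHECKS:
    print("$", " ".join(cmd))
    failed |= subprocess.run(cmd).returncode != 0

sys.exit(1 if failed else 0)
```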

View File

@@ -13,95 +13,95 @@ authors = [
{ name = "CodexLens contributors" }
]
dependencies = [
"typer>=0.9",
"rich>=13",
"pydantic>=2.0",
"tree-sitter>=0.20",
"tree-sitter-python>=0.25",
"tree-sitter-javascript>=0.25",
"tree-sitter-typescript>=0.23",
"pathspec>=0.11",
"watchdog>=3.0",
"typer~=0.9.0",
"rich~=13.0.0",
"pydantic~=2.0.0",
"tree-sitter~=0.20.0",
"tree-sitter-python~=0.25.0",
"tree-sitter-javascript~=0.25.0",
"tree-sitter-typescript~=0.23.0",
"pathspec~=0.11.0",
"watchdog~=3.0.0",
# ast-grep for pattern-based AST matching (PyO3 bindings)
# ast-grep-py 0.40+ supports Python 3.13
"ast-grep-py>=0.40.0",
"ast-grep-py~=0.40.0",
]
[project.optional-dependencies]
# Semantic search using fastembed (ONNX-based, lightweight ~200MB)
semantic = [
"numpy>=1.24",
"fastembed>=0.2",
"hnswlib>=0.8.0",
"numpy~=1.24.0",
"fastembed~=0.2.0",
"hnswlib~=0.8.0",
]
# GPU acceleration for semantic search (NVIDIA CUDA)
# Install with: pip install codexlens[semantic-gpu]
semantic-gpu = [
"numpy>=1.24",
"fastembed>=0.2",
"hnswlib>=0.8.0",
"onnxruntime-gpu>=1.15.0", # CUDA support
"numpy~=1.24.0",
"fastembed~=0.2.0",
"hnswlib~=0.8.0",
"onnxruntime-gpu~=1.15.0", # CUDA support
]
# GPU acceleration for Windows (DirectML - supports NVIDIA/AMD/Intel)
# Install with: pip install codexlens[semantic-directml]
semantic-directml = [
"numpy>=1.24",
"fastembed>=0.2",
"hnswlib>=0.8.0",
"onnxruntime-directml>=1.15.0", # DirectML support
"numpy~=1.24.0",
"fastembed~=0.2.0",
"hnswlib~=0.8.0",
"onnxruntime-directml~=1.15.0", # DirectML support
]
# Cross-encoder reranking (second-stage, optional)
# Install with: pip install codexlens[reranker] (default: ONNX backend)
reranker-onnx = [
"optimum>=1.16",
"onnxruntime>=1.15",
"transformers>=4.36",
"optimum~=1.16.0",
"onnxruntime~=1.15.0",
"transformers~=4.36.0",
]
# Remote reranking via HTTP API
reranker-api = [
"httpx>=0.25",
"httpx~=0.25.0",
]
# LLM-based reranking via ccw-litellm
reranker-litellm = [
"ccw-litellm>=0.1",
"ccw-litellm~=0.1.0",
]
# Legacy sentence-transformers CrossEncoder reranker
reranker-legacy = [
"sentence-transformers>=2.2",
"sentence-transformers~=2.2.0",
]
# Backward-compatible alias for default reranker backend
reranker = [
"optimum>=1.16",
"onnxruntime>=1.15",
"transformers>=4.36",
"optimum~=1.16.0",
"onnxruntime~=1.15.0",
"transformers~=4.36.0",
]
# Encoding detection for non-UTF8 files
encoding = [
"chardet>=5.0",
"chardet~=5.0.0",
]
# Clustering for staged hybrid search (HDBSCAN + sklearn)
clustering = [
"hdbscan>=0.8.1",
"scikit-learn>=1.3.0",
"hdbscan~=0.8.1",
"scikit-learn~=1.3.0",
]
# Full features including tiktoken for accurate token counting
full = [
"tiktoken>=0.5.0",
"tiktoken~=0.5.0",
]
# Language Server Protocol support
lsp = [
"pygls>=1.3.0",
"pygls~=1.3.0",
]
[project.scripts]

View File

@@ -0,0 +1,22 @@
# Core dependencies for codex-lens
# This file tracks direct dependencies only
# Run: pip-compile requirements.in --output-file=requirements.txt
typer~=0.9.0
rich~=13.0.0
pydantic~=2.0.0
tree-sitter~=0.20.0
tree-sitter-python~=0.25.0
tree-sitter-javascript~=0.25.0
tree-sitter-typescript~=0.23.0
pathspec~=0.11.0
watchdog~=3.0.0
ast-grep-py~=0.40.0
# Semantic search dependencies
numpy~=1.24.0
fastembed~=0.2.0
hnswlib~=0.8.0
# LSP support
pygls~=1.3.0

codex-lens/src/.gitignore
View File

@@ -0,0 +1 @@
.ace-tool/

View File

@@ -1,4 +1,42 @@
"""Embedding Manager - Manage semantic embeddings for code indexes."""
"""Embedding Manager - Manage semantic embeddings for code indexes.
This module provides functions for generating and managing semantic embeddings
for code indexes, supporting both fastembed and litellm backends.
Example Usage:
Generate embeddings for a single index:
>>> from pathlib import Path
>>> from codexlens.cli.embedding_manager import generate_embeddings
>>> result = generate_embeddings(
... index_path=Path("path/to/_index.db"),
... force=True
... )
>>> if result["success"]:
... print(f"Generated {result['total_chunks_created']} embeddings")
Generate embeddings for an entire project with centralized index:
>>> from codexlens.cli.embedding_manager import generate_dense_embeddings_centralized
>>> result = generate_dense_embeddings_centralized(
... index_root=Path("path/to/project"),
... force=True,
... progress_callback=lambda msg: print(msg)
... )
Check if embeddings exist:
>>> from codexlens.cli.embedding_manager import check_index_embeddings
>>> status = check_index_embeddings(Path("path/to/_index.db"))
>>> print(status["result"]["has_embeddings"])
Backward Compatibility:
The deprecated `discover_all_index_dbs()` function is maintained for compatibility.
`generate_embeddings_recursive()` is deprecated but functional; use
`generate_dense_embeddings_centralized()` instead.
The `EMBEDDING_BATCH_SIZE` constant is kept as a reference but actual batch size
is calculated dynamically via `calculate_dynamic_batch_size()`.
"""
import gc
import json
@@ -53,11 +91,11 @@ def calculate_dynamic_batch_size(config, embedder) -> int:
- Utilization factor (default 80% to leave headroom)
Args:
config: Config object with api_batch_size_* settings
embedder: Embedding model object with max_tokens property
config: Config object with api_batch_size_* settings.
embedder: Embedding model object with max_tokens property.
Returns:
Calculated batch size, clamped to [1, api_batch_size_max]
int: Calculated batch size, clamped to [1, api_batch_size_max].
"""
# If dynamic calculation is disabled, return static value
if not getattr(config, 'api_batch_size_dynamic', False):
@@ -147,8 +185,12 @@ def _cleanup_fastembed_resources() -> None:
try:
from codexlens.semantic.embedder import clear_embedder_cache
clear_embedder_cache()
except Exception:
except (ImportError, AttributeError):
# Expected when semantic module unavailable or cache function doesn't exist
pass
except Exception as exc:
# Log unexpected errors but don't fail cleanup
logger.debug(f"Unexpected error during fastembed cleanup: {exc}")
def _generate_chunks_from_cursor(
@@ -201,9 +243,18 @@ def _generate_chunks_from_cursor(
total_files += 1
for chunk in chunks:
yield (chunk, file_path)
except (OSError, UnicodeDecodeError) as e:
# File access or encoding errors
logger.error(f"Failed to read file {file_path}: {e}")
failed_files.append((file_path, f"File read error: {e}"))
except ValueError as e:
# Chunking configuration errors
logger.error(f"Chunking config error for {file_path}: {e}")
failed_files.append((file_path, f"Chunking error: {e}"))
except Exception as e:
logger.error(f"Failed to chunk {file_path}: {e}")
failed_files.append((file_path, str(e)))
# Other unexpected errors
logger.error(f"Unexpected error processing {file_path}: {e}")
failed_files.append((file_path, f"Unexpected error: {e}"))
def _create_token_aware_batches(
@@ -371,8 +422,153 @@ def _get_embedding_defaults() -> tuple[str, str, bool, List, str, float]:
config.embedding_strategy,
config.embedding_cooldown,
)
except Exception:
except (ImportError, AttributeError, OSError, ValueError) as exc:
# Config not available or malformed - use defaults
logger.debug(f"Using default embedding config (config load failed): {exc}")
return "fastembed", "code", True, [], "latency_aware", 60.0
except Exception as exc:
# Unexpected error - still use defaults but log
logger.warning(f"Unexpected error loading embedding config: {exc}")
return "fastembed", "code", True, [], "latency_aware", 60.0
def _apply_embedding_config_defaults(
embedding_backend: Optional[str],
model_profile: Optional[str],
use_gpu: Optional[bool],
endpoints: Optional[List],
strategy: Optional[str],
cooldown: Optional[float],
) -> tuple[str, str, bool, List, str, float]:
"""Apply config defaults to embedding parameters.
This helper function reduces code duplication across embedding generation
functions by centralizing the default value application logic.
Args:
embedding_backend: Embedding backend (fastembed/litellm) or None for default
model_profile: Model profile/name or None for default
use_gpu: GPU flag or None for default
endpoints: API endpoints list or None for default
strategy: Selection strategy or None for default
cooldown: Cooldown seconds or None for default
Returns:
Tuple of (backend, model, use_gpu, endpoints, strategy, cooldown) with
defaults applied where None was passed.
"""
(default_backend, default_model, default_gpu,
default_endpoints, default_strategy, default_cooldown) = _get_embedding_defaults()
backend = embedding_backend if embedding_backend is not None else default_backend
model = model_profile if model_profile is not None else default_model
gpu = use_gpu if use_gpu is not None else default_gpu
eps = endpoints if endpoints is not None else default_endpoints
strat = strategy if strategy is not None else default_strategy
cool = cooldown if cooldown is not None else default_cooldown
return backend, model, gpu, eps, strat, cool
def _calculate_max_workers(
embedding_backend: str,
endpoints: Optional[List],
max_workers: Optional[int],
) -> int:
"""Calculate optimal max_workers based on backend and endpoint count.
Args:
embedding_backend: The embedding backend being used
endpoints: List of API endpoints (for litellm multi-endpoint mode)
max_workers: Explicitly specified max_workers or None for auto-calculation
Returns:
Calculated or specified max_workers value
"""
if max_workers is not None:
return max_workers
endpoint_count = len(endpoints) if endpoints else 1
# Set dynamic max_workers default based on backend type and endpoint count
# - FastEmbed: CPU-bound, sequential is optimal (1 worker)
# - LiteLLM single endpoint: 4 workers default
# - LiteLLM multi-endpoint: workers = endpoint_count * 2 (to saturate all APIs)
if embedding_backend == "litellm":
if endpoint_count > 1:
return endpoint_count * 2 # No cap, scale with endpoints
else:
return 4
else:
return 1
def _initialize_embedder_and_chunker(
embedding_backend: str,
model_profile: str,
use_gpu: bool,
endpoints: Optional[List],
strategy: str,
cooldown: float,
chunk_size: int,
overlap: int,
) -> tuple:
"""Initialize embedder and chunker for embedding generation.
This helper function reduces code duplication by centralizing embedder
and chunker initialization logic.
Args:
embedding_backend: The embedding backend (fastembed/litellm)
model_profile: Model profile or name
use_gpu: Whether to use GPU acceleration
endpoints: Optional API endpoints for load balancing
strategy: Selection strategy for multi-endpoint mode
cooldown: Cooldown seconds for rate-limited endpoints
chunk_size: Maximum chunk size in characters
overlap: Overlap size in characters
Returns:
Tuple of (embedder, chunker, endpoint_count)
Raises:
ValueError: If embedding_backend is invalid
"""
from codexlens.semantic.factory import get_embedder as get_embedder_factory
from codexlens.semantic.chunker import Chunker, ChunkConfig
from codexlens.config import Config
# Initialize embedder using factory (supports fastembed, litellm, and rotational)
# For fastembed: model_profile is a profile name (fast/code/multilingual/balanced)
# For litellm: model_profile is a model name (e.g., qwen3-embedding)
# For multi-endpoint: endpoints list enables load balancing
if embedding_backend == "fastembed":
embedder = get_embedder_factory(backend="fastembed", profile=model_profile, use_gpu=use_gpu)
elif embedding_backend == "litellm":
embedder = get_embedder_factory(
backend="litellm",
model=model_profile,
endpoints=endpoints if endpoints else None,
strategy=strategy,
cooldown=cooldown,
)
else:
raise ValueError(f"Invalid embedding backend: {embedding_backend}. Must be 'fastembed' or 'litellm'.")
# skip_token_count=True: Use fast estimation (len/4) instead of expensive tiktoken
# This significantly reduces CPU usage with minimal impact on metadata accuracy
# Load chunk stripping config from settings
chunk_cfg = Config.load()
chunker = Chunker(config=ChunkConfig(
max_chunk_size=chunk_size,
overlap=overlap,
skip_token_count=True,
strip_comments=getattr(chunk_cfg, 'chunk_strip_comments', True),
strip_docstrings=getattr(chunk_cfg, 'chunk_strip_docstrings', True),
))
endpoint_count = len(endpoints) if endpoints else 1
return embedder, chunker, endpoint_count
def generate_embeddings(
@@ -397,16 +593,16 @@ def generate_embeddings(
LiteLLM backend to improve throughput.
Args:
index_path: Path to _index.db file
index_path: Path to _index.db file.
embedding_backend: Embedding backend to use (fastembed or litellm).
Defaults to config setting.
model_profile: Model profile for fastembed (fast, code, multilingual, balanced)
or model name for litellm (e.g., qwen3-embedding).
Defaults to config setting.
force: If True, regenerate even if embeddings exist
chunk_size: Maximum chunk size in characters
overlap: Overlap size in characters for sliding window chunking (default: 200)
progress_callback: Optional callback for progress updates
force: If True, regenerate even if embeddings exist.
chunk_size: Maximum chunk size in characters.
overlap: Overlap size in characters for sliding window chunking (default: 200).
progress_callback: Optional callback for progress updates.
use_gpu: Whether to use GPU acceleration (fastembed only).
Defaults to config setting.
max_tokens_per_batch: Maximum tokens per batch for token-aware batching.
@@ -420,40 +616,22 @@ def generate_embeddings(
cooldown: Default cooldown seconds for rate-limited endpoints.
Returns:
Result dictionary with generation statistics
Dict[str, Any]: Result dictionary with generation statistics.
Contains keys: success, error (if failed), files_processed,
total_chunks_created, execution_time, etc.
Raises:
ValueError: If embedding_backend is invalid.
ImportError: If semantic module is not available.
"""
# Get defaults from config if not specified
(default_backend, default_model, default_gpu,
default_endpoints, default_strategy, default_cooldown) = _get_embedding_defaults()
# Apply config defaults
embedding_backend, model_profile, use_gpu, endpoints, strategy, cooldown = \
_apply_embedding_config_defaults(
embedding_backend, model_profile, use_gpu, endpoints, strategy, cooldown
)
if embedding_backend is None:
embedding_backend = default_backend
if model_profile is None:
model_profile = default_model
if use_gpu is None:
use_gpu = default_gpu
if endpoints is None:
endpoints = default_endpoints
if strategy is None:
strategy = default_strategy
if cooldown is None:
cooldown = default_cooldown
# Calculate endpoint count for worker scaling
endpoint_count = len(endpoints) if endpoints else 1
# Set dynamic max_workers default based on backend type and endpoint count
# - FastEmbed: CPU-bound, sequential is optimal (1 worker)
# - LiteLLM single endpoint: 4 workers default
# - LiteLLM multi-endpoint: workers = endpoint_count * 2 (to saturate all APIs)
if max_workers is None:
if embedding_backend == "litellm":
if endpoint_count > 1:
max_workers = endpoint_count * 2 # No cap, scale with endpoints
else:
max_workers = 4
else:
max_workers = 1
# Calculate max_workers
max_workers = _calculate_max_workers(embedding_backend, endpoints, max_workers)
backend_available, backend_error = is_embedding_backend_available(embedding_backend)
if not backend_available:
@@ -487,51 +665,23 @@ def generate_embeddings(
with sqlite3.connect(index_path) as conn:
conn.execute("DELETE FROM semantic_chunks")
conn.commit()
except sqlite3.DatabaseError as e:
return {
"success": False,
"error": f"Database error clearing chunks: {str(e)}",
}
except Exception as e:
return {
"success": False,
"error": f"Failed to clear existing chunks: {str(e)}",
}
# Initialize components
# Initialize embedder and chunker using helper
try:
# Import factory function to support both backends
from codexlens.semantic.factory import get_embedder as get_embedder_factory
from codexlens.semantic.vector_store import VectorStore
from codexlens.semantic.chunker import Chunker, ChunkConfig
# Initialize embedder using factory (supports fastembed, litellm, and rotational)
# For fastembed: model_profile is a profile name (fast/code/multilingual/balanced)
# For litellm: model_profile is a model name (e.g., qwen3-embedding)
# For multi-endpoint: endpoints list enables load balancing
if embedding_backend == "fastembed":
embedder = get_embedder_factory(backend="fastembed", profile=model_profile, use_gpu=use_gpu)
elif embedding_backend == "litellm":
embedder = get_embedder_factory(
backend="litellm",
model=model_profile,
endpoints=endpoints if endpoints else None,
strategy=strategy,
cooldown=cooldown,
)
else:
return {
"success": False,
"error": f"Invalid embedding backend: {embedding_backend}. Must be 'fastembed' or 'litellm'.",
}
# skip_token_count=True: Use fast estimation (len/4) instead of expensive tiktoken
# This significantly reduces CPU usage with minimal impact on metadata accuracy
# Load chunk stripping config from settings
from codexlens.config import Config
chunk_cfg = Config.load()
chunker = Chunker(config=ChunkConfig(
max_chunk_size=chunk_size,
overlap=overlap,
skip_token_count=True,
strip_comments=getattr(chunk_cfg, 'chunk_strip_comments', True),
strip_docstrings=getattr(chunk_cfg, 'chunk_strip_docstrings', True),
))
embedder, chunker, endpoint_count = _initialize_embedder_and_chunker(
embedding_backend, model_profile, use_gpu, endpoints, strategy, cooldown,
chunk_size, overlap
)
# Log embedder info with endpoint count for multi-endpoint mode
if progress_callback:
@@ -547,10 +697,17 @@ def generate_embeddings(
if progress_callback and batch_config.api_batch_size_dynamic:
progress_callback(f"Dynamic batch size: {effective_batch_size} (model max_tokens={getattr(embedder, 'max_tokens', 8192)})")
except Exception as e:
except (ImportError, ValueError) as e:
# Missing dependency or invalid configuration
return {
"success": False,
"error": f"Failed to initialize components: {str(e)}",
"error": f"Failed to initialize embedding components: {str(e)}",
}
except Exception as e:
# Other unexpected errors
return {
"success": False,
"error": f"Unexpected error initializing components: {str(e)}",
}
# --- STREAMING PROCESSING ---
@@ -814,8 +971,8 @@ def generate_embeddings(
try:
_cleanup_fastembed_resources()
gc.collect()
except Exception:
pass
except Exception as cleanup_exc:
logger.debug(f"Cleanup error during exception handling: {cleanup_exc}")
return {"success": False, "error": f"Failed to read or process files: {str(e)}"}
elapsed_time = time.time() - start_time
@@ -825,8 +982,8 @@ def generate_embeddings(
try:
_cleanup_fastembed_resources()
gc.collect()
except Exception:
pass
except Exception as cleanup_exc:
logger.debug(f"Cleanup error during finalization: {cleanup_exc}")
return {
"success": True,
@@ -922,7 +1079,8 @@ def build_centralized_binary_vectors_from_existing(
}
# We count per-dim later after selecting a target dim.
except Exception:
except (sqlite3.DatabaseError, ValueError, TypeError):
# Skip corrupted or malformed indexes
continue
if not dims_seen:
@@ -971,7 +1129,8 @@ def build_centralized_binary_vectors_from_existing(
"SELECT COUNT(*) FROM semantic_chunks WHERE embedding IS NOT NULL AND length(embedding) > 0"
).fetchone()
total_chunks += int(row[0] if row else 0)
except Exception:
except (sqlite3.DatabaseError, ValueError, TypeError):
# Skip corrupted or malformed indexes
continue
if not total_chunks:
@@ -987,7 +1146,7 @@ def build_centralized_binary_vectors_from_existing(
# Prepare output files / DB.
try:
import numpy as np
except Exception as exc:
except ImportError as exc:
return {"success": False, "error": f"numpy required to build binary vectors: {exc}"}
store = VectorMetadataStore(vectors_meta_path)
@@ -1243,35 +1402,14 @@ def generate_embeddings_recursive(
stacklevel=2
)
# Get defaults from config if not specified
(default_backend, default_model, default_gpu,
default_endpoints, default_strategy, default_cooldown) = _get_embedding_defaults()
# Apply config defaults
embedding_backend, model_profile, use_gpu, endpoints, strategy, cooldown = \
_apply_embedding_config_defaults(
embedding_backend, model_profile, use_gpu, endpoints, strategy, cooldown
)
if embedding_backend is None:
embedding_backend = default_backend
if model_profile is None:
model_profile = default_model
if use_gpu is None:
use_gpu = default_gpu
if endpoints is None:
endpoints = default_endpoints
if strategy is None:
strategy = default_strategy
if cooldown is None:
cooldown = default_cooldown
# Calculate endpoint count for worker scaling
endpoint_count = len(endpoints) if endpoints else 1
# Set dynamic max_workers default based on backend type and endpoint count
if max_workers is None:
if embedding_backend == "litellm":
if endpoint_count > 1:
max_workers = endpoint_count * 2 # No cap, scale with endpoints
else:
max_workers = 4
else:
max_workers = 1
# Calculate max_workers
max_workers = _calculate_max_workers(embedding_backend, endpoints, max_workers)
# Discover all _index.db files (using internal helper to avoid double deprecation warning)
index_files = _discover_index_dbs_internal(index_root)
@@ -1401,34 +1539,14 @@ def generate_dense_embeddings_centralized(
"""
from codexlens.config import VECTORS_HNSW_NAME
# Get defaults from config if not specified
(default_backend, default_model, default_gpu,
default_endpoints, default_strategy, default_cooldown) = _get_embedding_defaults()
# Apply config defaults
embedding_backend, model_profile, use_gpu, endpoints, strategy, cooldown = \
_apply_embedding_config_defaults(
embedding_backend, model_profile, use_gpu, endpoints, strategy, cooldown
)
if embedding_backend is None:
embedding_backend = default_backend
if model_profile is None:
model_profile = default_model
if use_gpu is None:
use_gpu = default_gpu
if endpoints is None:
endpoints = default_endpoints
if strategy is None:
strategy = default_strategy
if cooldown is None:
cooldown = default_cooldown
# Calculate endpoint count for worker scaling
endpoint_count = len(endpoints) if endpoints else 1
if max_workers is None:
if embedding_backend == "litellm":
if endpoint_count > 1:
max_workers = endpoint_count * 2
else:
max_workers = 4
else:
max_workers = 1
# Calculate max_workers
max_workers = _calculate_max_workers(embedding_backend, endpoints, max_workers)
backend_available, backend_error = is_embedding_backend_available(embedding_backend)
if not backend_available:
@@ -1470,38 +1588,18 @@ def generate_dense_embeddings_centralized(
"error": f"Centralized vector index already exists at {central_hnsw_path}. Use --force to regenerate.",
}
# Initialize embedder
# Initialize embedder and chunker using helper
try:
from codexlens.semantic.factory import get_embedder as get_embedder_factory
from codexlens.semantic.chunker import Chunker, ChunkConfig
from codexlens.semantic.ann_index import ANNIndex
if embedding_backend == "fastembed":
embedder = get_embedder_factory(backend="fastembed", profile=model_profile, use_gpu=use_gpu)
elif embedding_backend == "litellm":
embedder = get_embedder_factory(
backend="litellm",
model=model_profile,
endpoints=endpoints if endpoints else None,
strategy=strategy,
cooldown=cooldown,
)
else:
return {
"success": False,
"error": f"Invalid embedding backend: {embedding_backend}",
}
embedder, chunker, endpoint_count = _initialize_embedder_and_chunker(
embedding_backend, model_profile, use_gpu, endpoints, strategy, cooldown,
chunk_size, overlap
)
# Load chunk stripping config from settings
# Load chunk stripping config for batch size calculation
from codexlens.config import Config
chunk_cfg = Config.load()
chunker = Chunker(config=ChunkConfig(
max_chunk_size=chunk_size,
overlap=overlap,
skip_token_count=True,
strip_comments=getattr(chunk_cfg, 'chunk_strip_comments', True),
strip_docstrings=getattr(chunk_cfg, 'chunk_strip_docstrings', True),
))
batch_config = Config.load()
if progress_callback:
if endpoint_count > 1:
@@ -1509,7 +1607,6 @@ def generate_dense_embeddings_centralized(
progress_callback(f"Using model: {embedder.model_name} ({embedder.embedding_dim} dimensions)")
# Calculate dynamic batch size based on model capacity
batch_config = chunk_cfg # Reuse already loaded config
effective_batch_size = calculate_dynamic_batch_size(batch_config, embedder)
if progress_callback and batch_config.api_batch_size_dynamic:

View File

@@ -120,8 +120,12 @@ def load_env_file(env_path: Path) -> Dict[str, str]:
if result:
key, value = result
env_vars[key] = value
except Exception as exc:
except (OSError, UnicodeDecodeError) as exc:
# File access errors or encoding issues are expected and logged
log.warning("Failed to load .env file %s: %s", env_path, exc)
except Exception as exc:
# Other unexpected errors are also logged but indicate a code issue
log.warning("Unexpected error loading .env file %s: %s", env_path, exc)
return env_vars

View File

@@ -0,0 +1,278 @@
"""Pytest configuration and shared fixtures for codex-lens tests.
This module provides common fixtures and test utilities to reduce code duplication
across the test suite. Using fixtures ensures consistent test setup and makes tests
more maintainable.
Common Fixtures:
- temp_dir: Temporary directory for test files
- sample_index_db: Sample index database with test data
- mock_config: Mock configuration object
- sample_code_factory: Factory for creating sample code files
"""
import pytest
import tempfile
import shutil
from pathlib import Path
from typing import Dict, Any
import sqlite3
@pytest.fixture
def temp_dir():
"""Create a temporary directory for test files.
The directory is automatically cleaned up after the test.
Yields:
Path: Path to the temporary directory.
"""
temp_path = Path(tempfile.mkdtemp())
yield temp_path
# Cleanup
if temp_path.exists():
shutil.rmtree(temp_path)
@pytest.fixture
def sample_index_db(temp_dir):
"""Create a sample index database with test data.
The database has a basic schema with a files table populated
with sample rows.
Args:
temp_dir: Temporary directory fixture.
Yields:
Path: Path to the sample index database.
"""
db_path = temp_dir / "_index.db"
# Create database with basic schema
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Files table
cursor.execute("""
CREATE TABLE files (
id INTEGER PRIMARY KEY,
path TEXT NOT NULL UNIQUE,
content TEXT,
language TEXT,
hash TEXT,
indexed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Insert sample files
sample_files = [
("test.py", "def hello():\n print('world')", "python", "hash1"),
("test.js", "function hello() { console.log('world'); }", "javascript", "hash2"),
("README.md", "# Test Project", "markdown", "hash3"),
]
cursor.executemany(
"INSERT INTO files (path, content, language, hash) VALUES (?, ?, ?, ?)",
sample_files
)
conn.commit()
conn.close()
yield db_path
@pytest.fixture
def mock_config():
"""Create a mock configuration object with default values.
Returns:
Mock: Mock object with common config attributes.
"""
from unittest.mock import Mock
config = Mock()
config.index_path = Path("/tmp/test_index")
config.chunk_size = 2000
config.overlap = 200
config.embedding_backend = "fastembed"
config.embedding_model = "code"
config.max_results = 10
return config
@pytest.fixture
def sample_code_factory(temp_dir):
"""Factory for creating sample code files.
Args:
temp_dir: Temporary directory fixture.
Returns:
callable: Function that creates sample code files.
"""
def _create_file(filename: str, content: str, language: str = "python") -> Path:
"""Create a sample code file.
Args:
filename: Name of the file to create.
content: Content of the file.
language: Programming language label (default: python); currently unused by the factory.
Returns:
Path: Path to the created file.
"""
file_path = temp_dir / filename
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content)
return file_path
return _create_file
@pytest.fixture
def sample_python_code():
"""Sample Python code for testing.
Returns:
str: Sample Python code snippet.
"""
return '''
def calculate_sum(a: int, b: int) -> int:
"""Calculate the sum of two integers."""
return a + b
class Calculator:
"""A simple calculator class."""
def __init__(self):
self.value = 0
def add(self, x: int) -> None:
"""Add a value to the calculator."""
self.value += x
if __name__ == "__main__":
calc = Calculator()
calc.add(5)
print(f"Result: {calc.value}")
'''
@pytest.fixture
def sample_javascript_code():
"""Sample JavaScript code for testing.
Returns:
str: Sample JavaScript code snippet.
"""
return '''
// Simple utility functions
function add(a, b) {
return a + b;
}
const Calculator = class {
constructor() {
this.value = 0;
}
add(x) {
this.value += x;
}
};
// Example usage
const calc = new Calculator();
calc.add(5);
console.log(`Result: ${calc.value}`);
'''
class CodeSampleFactory:
"""Factory class for generating various code samples.
This class provides methods to generate code samples in different
languages with various patterns (classes, functions, imports, etc.).
"""
@staticmethod
def python_function(name: str = "example", docstring: bool = True) -> str:
"""Generate a Python function sample.
Args:
name: Function name.
docstring: Whether to include docstring.
Returns:
str: Python function code.
"""
doc = f' """Example function."""\n' if docstring else ''
return f'''
def {name}(param1: str, param2: int = 10) -> str:
{doc} return param1 * param2
'''.strip()
@staticmethod
def python_class(name: str = "Example") -> str:
"""Generate a Python class sample.
Args:
name: Class name.
Returns:
str: Python class code.
"""
return f'''
class {name}:
"""Example class."""
def __init__(self, value: int = 0):
self.value = value
def increment(self) -> None:
"""Increment the value."""
self.value += 1
'''.strip()
@staticmethod
def javascript_function(name: str = "example") -> str:
"""Generate a JavaScript function sample.
Args:
name: Function name.
Returns:
str: JavaScript function code.
"""
return f'''function {name}(param1, param2 = 10) {{
return param1 * param2;
}}'''.strip()
@staticmethod
def typescript_interface(name: str = "Example") -> str:
"""Generate a TypeScript interface sample.
Args:
name: Interface name.
Returns:
str: TypeScript interface code.
"""
return f'''interface {name} {{
id: number;
name: string;
getValue(): number;
}}'''.strip()
@pytest.fixture
def code_sample_factory():
"""Create a code sample factory instance.
Returns:
CodeSampleFactory: Factory for generating code samples.
"""
return CodeSampleFactory()
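# Usage sketch (illustrative only; a real test would live in a test module,
# not in conftest.py). pytest injects the fixtures above by parameter name:
def _example_fixture_usage(sample_index_db, sample_code_factory, code_sample_factory):
    conn = sqlite3.connect(sample_index_db)
    file_count = conn.execute("SELECT COUNT(*) FROM files").fetchone()[0]
    conn.close()
    assert file_count == 3  # the three sample rows inserted above
    path = sample_code_factory("pkg/mod.py", code_sample_factory.python_function("demo"))
    assert path.read_text().startswith("def demo(")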

View File

@@ -0,0 +1,101 @@
"""LSP Edge Case Tests.
This module tests edge cases and error conditions in LSP (Language Server Protocol)
operations, including timeout handling, protocol errors, and connection failures.
Test Coverage:
- Timeout scenarios for LSP operations
- Protocol errors and malformed responses
- Connection failures and recovery
- Concurrent request handling
"""
import pytest
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
import time
class TestLSPTimeouts:
"""Test timeout handling in LSP operations."""
def test_hover_request_timeout(self):
"""Test that hover requests timeout appropriately after configured duration."""
# This is a placeholder for actual timeout testing
# Implementation requires mocking an LSP client with a delayed response
pytest.skip("Requires LSP server fixture setup")
def test_definition_request_timeout(self):
"""Test that go-to-definition requests timeout appropriately."""
pytest.skip("Requires LSP server fixture setup")
def test_references_request_timeout(self):
"""Test that find-references requests timeout appropriately."""
pytest.skip("Requires LSP server fixture setup")
def test_concurrent_requests_with_timeout(self):
"""Test behavior when multiple requests exceed timeout threshold."""
pytest.skip("Requires LSP server fixture setup")
class TestLSPProtocolErrors:
"""Test handling of LSP protocol errors."""
def test_malformed_json_response(self):
"""Test handling of malformed JSON in LSP responses."""
pytest.skip("Requires LSP client fixture")
def test_invalid_method_error(self):
"""Test handling of unknown/invalid method calls."""
pytest.skip("Requires LSP client fixture")
def test_missing_required_params(self):
"""Test handling of responses with missing required parameters."""
pytest.skip("Requires LSP client fixture")
def test_null_result_handling(self):
"""Test that null results from LSP are handled gracefully."""
pytest.skip("Requires LSP client fixture")
class TestLSPConnectionFailures:
"""Test LSP connection failure scenarios."""
def test_server_not_found(self):
"""Test behavior when LSP server is not available."""
pytest.skip("Requires LSP client fixture")
def test_connection_dropped_mid_request(self):
"""Test handling of dropped connections during active requests."""
pytest.skip("Requires LSP client fixture")
def test_connection_retry_logic(self):
"""Test that connection retry logic works as expected."""
pytest.skip("Requires LSP client fixture")
def test_server_startup_failure(self):
"""Test handling of LSP server startup failures."""
pytest.skip("Requires LSP server fixture")
class TestLSPResourceLimits:
"""Test LSP behavior under resource constraints."""
def test_large_file_handling(self):
"""Test LSP operations on very large source files."""
pytest.skip("Requires test file fixtures")
def test_memory_pressure(self):
"""Test LSP behavior under memory pressure."""
pytest.skip("Requires memory simulation")
def test_concurrent_request_limits(self):
"""Test handling of too many concurrent LSP requests."""
pytest.skip("Requires LSP client fixture")
# TODO: Implement actual tests using pytest fixtures and LSP mock objects
# The test infrastructure needs to be set up with:
# - LSP server fixture (maybe using pygls test server)
# - LSP client fixture with configurable delays/errors
# - Test file fixtures with various code patterns
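# A minimal sketch of what the delayed-response client fixture could look like
# (the `request` attribute and the 5s delay are assumptions for illustration,
# not an existing codexlens interface):
@pytest.fixture
def slow_lsp_client():
    """Mock LSP client whose requests outlast any reasonable timeout."""
    client = MagicMock()

    def _slow_request(*args, **kwargs):
        time.sleep(5)  # longer than a typical per-request timeout
        return None

    client.request = Mock(side_effect=_slow_request)
    return client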

View File

@@ -0,0 +1,125 @@
"""Incremental Indexer File Event Processing Tests.
This module tests the file event processing in the incremental indexer,
covering all file system event types (CREATED, MODIFIED, DELETED, MOVED).
Test Coverage:
- CREATED events: New files being indexed
- MODIFIED events: Changed files being re-indexed
- DELETED events: Removed files being handled
- MOVED events: File renames being tracked
- Batch processing of multiple events
"""
import pytest
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
import tempfile
import shutil
class TestCreatedEvents:
"""Test handling of CREATED file events."""
def test_new_file_indexed(self):
"""Test that newly created files are properly indexed."""
pytest.skip("Requires incremental indexer fixture")
def test_created_in_subdirectory(self):
"""Test that files created in subdirectories are indexed."""
pytest.skip("Requires incremental indexer fixture")
def test_batch_created_events(self):
"""Test handling multiple files created simultaneously."""
pytest.skip("Requires incremental indexer fixture")
class TestModifiedEvents:
"""Test handling of MODIFIED file events."""
def test_file_content_updated(self):
"""Test that file content changes trigger re-indexing."""
pytest.skip("Requires incremental indexer fixture")
def test_metadata_only_change(self):
"""Test handling of metadata-only changes (permissions, etc)."""
pytest.skip("Requires incremental indexer fixture")
def test_rapid_modifications(self):
"""Test handling of rapid successive modifications to same file."""
pytest.skip("Requires incremental indexer fixture")
class TestDeletedEvents:
"""Test handling of DELETED file events."""
def test_file_removed_from_index(self):
"""Test that deleted files are removed from the index."""
pytest.skip("Requires incremental indexer fixture")
def test_directory_deleted(self):
"""Test handling of directory deletion events."""
pytest.skip("Requires incremental indexer fixture")
def test_delete_non_indexed_file(self):
"""Test handling deletion of files that were never indexed."""
pytest.skip("Requires incremental indexer fixture")
class TestMovedEvents:
"""Test handling of MOVED/RENAMED file events."""
def test_file_renamed(self):
"""Test that renamed files are tracked in the index."""
pytest.skip("Requires incremental indexer fixture")
def test_file_moved_to_subdirectory(self):
"""Test that files moved to subdirectories are tracked."""
pytest.skip("Requires incremental indexer fixture")
def test_file_moved_out_of_watch_root(self):
"""Test handling of files moved outside the watch directory."""
pytest.skip("Requires incremental indexer fixture")
def test_directory_renamed(self):
"""Test handling of directory rename events."""
pytest.skip("Requires incremental indexer fixture")
class TestEventBatching:
"""Test batching and deduplication of file events."""
def test_duplicate_events_deduplicated(self):
"""Test that duplicate events for the same file are deduplicated."""
pytest.skip("Requires incremental indexer fixture")
def test_event_ordering_preserved(self):
"""Test that events are processed in the correct order."""
pytest.skip("Requires incremental indexer fixture")
def test_mixed_event_types_batch(self):
"""Test handling a batch with mixed event types."""
pytest.skip("Requires incremental indexer fixture")
class TestErrorHandling:
"""Test error handling in file event processing."""
def test_unreadable_file_skipped(self):
"""Test that unreadable files are handled gracefully."""
pytest.skip("Requires incremental indexer fixture")
def test_corrupted_event_continues(self):
"""Test that processing continues after a corrupted event."""
pytest.skip("Requires incremental indexer fixture")
def test_indexer_error_recovery(self):
"""Test recovery from indexer errors during event processing."""
pytest.skip("Requires incremental indexer fixture")
# TODO: Implement actual tests using pytest fixtures and the incremental indexer
# The test infrastructure needs:
# - IncrementalIndexer fixture with mock filesystem watcher
# - Temporary directory fixtures for test files
# - Mock event queue for controlled event injection
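# One possible shape for controlled event injection (illustrative; the event
# record is a plain dict here because the indexer's event type is not assumed):
@pytest.fixture
def file_event_factory(tmp_path):
    """Build simple file-event records against a temporary watch root."""
    def _make(kind: str, name: str, content: str = "") -> dict:
        path = tmp_path / name
        if kind in ("created", "modified"):
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content)
        return {"kind": kind, "path": path}
    return _make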

View File

@@ -0,0 +1,114 @@
"""Database Migration Tests.
This module tests the database migration system for the codex-lens index,
ensuring that forward and backward compatibility is maintained across schema versions.
Test Coverage:
- Forward migrations: Old schema to new schema
- Backward compatibility: New code can read old schemas
- Migration rollback capabilities
- Data integrity during migrations
- Edge cases (empty databases, corrupted data, etc.)
"""
import pytest
import sqlite3
from pathlib import Path
import tempfile
import json
class TestForwardMigrations:
"""Test upgrading from older schema versions to newer ones."""
def test_v0_to_v1_migration(self):
"""Test migration from schema v0 to v1."""
pytest.skip("Requires migration infrastructure setup")
def test_v1_to_v2_migration(self):
"""Test migration from schema v1 to v2."""
pytest.skip("Requires migration infrastructure setup")
def test_migration_preserves_data(self):
"""Test that migration preserves existing data."""
pytest.skip("Requires migration infrastructure setup")
def test_migration_adds_new_columns(self):
"""Test that new columns are added with correct defaults."""
pytest.skip("Requires migration infrastructure setup")
class TestBackwardCompatibility:
"""Test that newer code can read and work with older database schemas."""
def test_new_code_reads_old_schema(self):
"""Test that current code can read old schema databases."""
pytest.skip("Requires old schema fixture")
def test_new_code_writes_to_old_schema(self):
"""Test that current code handles writes to old schema gracefully."""
pytest.skip("Requires old schema fixture")
def test_old_code_rejects_new_schema(self):
"""Test that old code fails appropriately on new schemas."""
pytest.skip("Requires old code fixture")
class TestMigrationRollback:
"""Test rollback capabilities for failed migrations."""
def test_failed_migration_rolls_back(self):
"""Test that failed migrations are rolled back completely."""
pytest.skip("Requires migration infrastructure setup")
def test_partial_migration_recovery(self):
"""Test recovery from partially completed migrations."""
pytest.skip("Requires migration infrastructure setup")
def test_rollback_preserves_original_data(self):
"""Test that rollback restores original state."""
pytest.skip("Requires migration infrastructure setup")
class TestMigrationEdgeCases:
"""Test migration behavior in edge cases."""
def test_empty_database_migration(self):
"""Test migration of an empty database."""
pytest.skip("Requires migration infrastructure setup")
def test_large_database_migration(self):
"""Test migration of a large database."""
pytest.skip("Requires migration infrastructure setup")
def test_corrupted_database_handling(self):
"""Test handling of corrupted databases during migration."""
pytest.skip("Requires migration infrastructure setup")
def test_concurrent_migration_protection(self):
"""Test that concurrent migrations are prevented."""
pytest.skip("Requires migration infrastructure setup")
class TestSchemaVersionTracking:
"""Test schema version tracking and detection."""
def test_version_table_exists(self):
"""Test that version tracking table exists and is populated."""
pytest.skip("Requires migration infrastructure setup")
def test_version_auto_detection(self):
"""Test that schema version is auto-detected from database."""
pytest.skip("Requires migration infrastructure setup")
def test_version_update_after_migration(self):
"""Test that version is updated correctly after migration."""
pytest.skip("Requires migration infrastructure setup")
# TODO: Implement actual tests using pytest fixtures
# The test infrastructure needs:
# - Migration runner fixture that can apply and rollback migrations
# - Old schema fixtures (pre-built databases with known schemas)
# - Temporary database fixtures for isolated testing
# - Mock data generators for various schema versions
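# A sketch of the version-tracking primitive these fixtures could build on
# (illustrative; codex-lens may track schema versions differently). SQLite's
# PRAGMA user_version provides a per-database integer slot, defaulting to 0:
@pytest.fixture
def versioned_db(tmp_path):
    """Temporary index database stamped with a known schema version."""
    db_path = tmp_path / "_index.db"
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA user_version = 1")
    conn.execute("CREATE TABLE files (id INTEGER PRIMARY KEY, path TEXT)")
    conn.commit()
    conn.close()
    return db_path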