feat: Enhance embedding generation to track current index path and improve metadata retrieval

This commit is contained in:
catlog22
2026-01-02 19:18:26 +08:00
parent 0b6e9db8e4
commit c268b531aa
6 changed files with 175 additions and 136 deletions

View File

@@ -17,8 +17,8 @@ Usage:
# Use specific embedding model
python generate_embeddings.py /path/to/_index.db --model code
# Generate embeddings for all indexes in a directory
python generate_embeddings.py --scan ~/.codexlens/indexes
# Generate centralized embeddings for all indexes in a directory
python generate_embeddings.py --centralized ~/.codexlens/indexes
# Force regeneration
python generate_embeddings.py /path/to/_index.db --force
@@ -27,8 +27,8 @@ Usage:
import argparse
import logging
import sys
import warnings
from pathlib import Path
from typing import List
# Configure logging
logging.basicConfig(
@@ -42,7 +42,7 @@ logger = logging.getLogger(__name__)
try:
from codexlens.cli.embedding_manager import (
generate_embeddings,
generate_embeddings_recursive,
generate_dense_embeddings_centralized,
)
from codexlens.semantic import SEMANTIC_AVAILABLE
except ImportError as exc:
@@ -135,13 +135,20 @@ def main():
parser.add_argument(
"index_path",
type=Path,
help="Path to _index.db file or directory to scan"
help="Path to _index.db file or directory for centralized mode"
)
parser.add_argument(
"--centralized",
"-c",
action="store_true",
help="Use centralized vector storage (single HNSW index at project root)"
)
parser.add_argument(
"--scan",
action="store_true",
help="Scan directory tree for all _index.db files"
help="(Deprecated) Use --centralized instead"
)
parser.add_argument(
@@ -203,14 +210,25 @@ def main():
logger.error(f"Path not found: {index_path}")
sys.exit(1)
# Determine if scanning or single file
if args.scan or index_path.is_dir():
# Scan mode - use recursive implementation
# Handle deprecated --scan flag
use_centralized = args.centralized
if args.scan:
warnings.warn(
"--scan is deprecated, use --centralized instead",
DeprecationWarning
)
logger.warning("--scan is deprecated. Use --centralized instead.")
use_centralized = True
# Determine if using centralized mode or single file
if use_centralized or index_path.is_dir():
# Centralized mode - single HNSW index at project root
if index_path.is_file():
logger.error("--scan requires a directory path")
logger.error("--centralized requires a directory path")
sys.exit(1)
result = generate_embeddings_recursive(
logger.info(f"Generating centralized embeddings for: {index_path}")
result = generate_dense_embeddings_centralized(
index_root=index_path,
model_profile=args.model,
force=args.force,
@@ -225,13 +243,14 @@ def main():
# Log summary
data = result["result"]
logger.info(f"\n{'='*60}")
logger.info("BATCH PROCESSING COMPLETE")
logger.info("CENTRALIZED EMBEDDING COMPLETE")
logger.info(f"{'='*60}")
logger.info(f"Indexes processed: {data['indexes_successful']}/{data['indexes_processed']}")
logger.info(f"Total chunks created: {data['total_chunks_created']}")
logger.info(f"Total files processed: {data['total_files_processed']}")
if data['total_files_failed'] > 0:
logger.warning(f"Total files failed: {data['total_files_failed']}")
logger.info(f"Total chunks created: {data['chunks_created']}")
logger.info(f"Total files processed: {data['files_processed']}")
if data.get('files_failed', 0) > 0:
logger.warning(f"Total files failed: {data['files_failed']}")
logger.info(f"Central index: {data.get('central_index_path', 'N/A')}")
logger.info(f"Time: {data.get('elapsed_time', 0):.1f}s")
else:
# Single index mode
@@ -250,7 +269,7 @@ def main():
logger.error(f"Failed: {result.get('error', 'Unknown error')}")
sys.exit(1)
logger.info("\n Embeddings generation complete!")
logger.info("\nv Embeddings generation complete!")
logger.info("\nYou can now use vector search:")
logger.info(" codexlens search 'your query' --mode pure-vector")

View File

@@ -1990,12 +1990,6 @@ def embeddings_generate(
"--chunk-size",
help="Maximum chunk size in characters.",
),
recursive: bool = typer.Option(
False,
"--recursive",
"-r",
help="Recursively process all _index.db files in directory tree.",
),
max_workers: int = typer.Option(
1,
"--max-workers",
@@ -2049,7 +2043,6 @@ def embeddings_generate(
from codexlens.cli.embedding_manager import (
generate_embeddings,
generate_embeddings_recursive,
generate_dense_embeddings_centralized,
scan_for_model_conflicts,
check_global_model_lock,
@@ -2070,25 +2063,21 @@ def embeddings_generate(
# Resolve path
target_path = path.expanduser().resolve()
# Determine if we should use recursive mode
use_recursive = False
# Determine index path or root for centralized mode
index_path = None
index_root = None
if target_path.is_file() and target_path.name == "_index.db":
# Direct index file
index_path = target_path
if recursive:
# Use parent directory for recursive processing
use_recursive = True
index_root = target_path.parent
index_root = target_path.parent
elif target_path.is_dir():
if recursive:
# Recursive mode: process all _index.db files in directory tree
use_recursive = True
# Directory: Try to find index for this project
if centralized:
# Centralized mode uses directory as root
index_root = target_path
else:
# Non-recursive: Try to find index for this project
# Single index mode: find the specific index
registry = RegistryStore()
try:
registry.initialize()
@@ -2099,6 +2088,7 @@ def embeddings_generate(
console.print(f"[red]Error:[/red] No index found for {target_path}")
console.print("Run 'codexlens init' first to create an index")
raise typer.Exit(code=1)
index_root = index_path.parent
finally:
registry.close()
else:
@@ -2115,9 +2105,6 @@ def embeddings_generate(
effective_root = index_root if index_root else (index_path.parent if index_path else target_path)
console.print(f"Index root: [dim]{effective_root}[/dim]")
console.print(f"Mode: [green]Centralized[/green]")
elif use_recursive:
console.print(f"Index root: [dim]{index_root}[/dim]")
console.print(f"Mode: [yellow]Recursive[/yellow]")
else:
console.print(f"Index: [dim]{index_path}[/dim]")
console.print(f"Backend: [cyan]{backend}[/cyan]")
@@ -2154,7 +2141,7 @@ def embeddings_generate(
# Pre-check for model conflicts (only if not forcing)
if not force:
# Determine the index root for conflict scanning
scan_root = index_root if use_recursive else (index_path.parent if index_path else None)
scan_root = index_root if index_root else (index_path.parent if index_path else None)
if scan_root:
conflict_result = scan_for_model_conflicts(scan_root, backend, model)
@@ -2208,16 +2195,6 @@ def embeddings_generate(
progress_callback=progress_update,
max_workers=max_workers,
)
elif use_recursive:
result = generate_embeddings_recursive(
index_root,
embedding_backend=backend,
model_profile=model,
force=force,
chunk_size=chunk_size,
progress_callback=progress_update,
max_workers=max_workers,
)
else:
result = generate_embeddings(
index_path,
@@ -2257,7 +2234,7 @@ def embeddings_generate(
if centralized:
# Centralized mode output
elapsed = data.get("elapsed_time", 0)
console.print(f"[green][/green] Centralized embeddings generated successfully!")
console.print(f"[green]v[/green] Centralized embeddings generated successfully!")
console.print(f" Model: {data.get('model_name', model)}")
console.print(f" Chunks created: {data['chunks_created']:,}")
console.print(f" Files processed: {data['files_processed']}")
@@ -2265,32 +2242,11 @@ def embeddings_generate(
console.print(f" [yellow]Files failed: {data['files_failed']}[/yellow]")
console.print(f" Central index: {data.get('central_index_path', 'N/A')}")
console.print(f" Time: {elapsed:.1f}s")
elif use_recursive:
# Recursive mode output
console.print(f"[green]✓[/green] Recursive embeddings generation complete!")
console.print(f" Indexes processed: {data['indexes_processed']}")
console.print(f" Indexes successful: {data['indexes_successful']}")
if data['indexes_failed'] > 0:
console.print(f" [yellow]Indexes failed: {data['indexes_failed']}[/yellow]")
console.print(f" Total chunks created: {data['total_chunks_created']:,}")
console.print(f" Total files processed: {data['total_files_processed']}")
if data['total_files_failed'] > 0:
console.print(f" [yellow]Total files failed: {data['total_files_failed']}[/yellow]")
console.print(f" Model profile: {data['model_profile']}")
# Show details if verbose
if verbose and data.get('details'):
console.print("\n[dim]Index details:[/dim]")
for detail in data['details']:
status_icon = "[green]✓[/green]" if detail['success'] else "[red]✗[/red]"
console.print(f" {status_icon} {detail['path']}")
if not detail['success'] and detail.get('error'):
console.print(f" [dim]Error: {detail['error']}[/dim]")
else:
# Single index mode output
elapsed = data["elapsed_time"]
console.print(f"[green][/green] Embeddings generated successfully!")
console.print(f"[green]v[/green] Embeddings generated successfully!")
console.print(f" Model: {data['model_name']}")
console.print(f" Chunks created: {data['chunks_created']:,}")
console.print(f" Files processed: {data['files_processed']}")

View File

@@ -848,8 +848,10 @@ def generate_embeddings(
}
def discover_all_index_dbs(index_root: Path) -> List[Path]:
"""Recursively find all _index.db files in an index tree.
def _discover_index_dbs_internal(index_root: Path) -> List[Path]:
"""Internal helper to find all _index.db files (no deprecation warning).
Used internally by generate_dense_embeddings_centralized.
Args:
index_root: Root directory to scan for _index.db files
@@ -863,6 +865,30 @@ def discover_all_index_dbs(index_root: Path) -> List[Path]:
return sorted(index_root.rglob("_index.db"))
def discover_all_index_dbs(index_root: Path) -> List[Path]:
"""Recursively find all _index.db files in an index tree.
.. deprecated::
This function is deprecated. Use centralized indexing with
``generate_dense_embeddings_centralized`` instead, which handles
index discovery internally.
Args:
index_root: Root directory to scan for _index.db files
Returns:
Sorted list of paths to _index.db files
"""
import warnings
warnings.warn(
"discover_all_index_dbs is deprecated. Use centralized indexing with "
"generate_dense_embeddings_centralized instead.",
DeprecationWarning,
stacklevel=2
)
return _discover_index_dbs_internal(index_root)
def find_all_indexes(scan_dir: Path) -> List[Path]:
"""Find all _index.db files in directory tree.
@@ -896,6 +922,11 @@ def generate_embeddings_recursive(
) -> Dict[str, any]:
"""Generate embeddings for all index databases in a project recursively.
.. deprecated::
This function is deprecated. Use ``generate_dense_embeddings_centralized``
instead, which creates a single centralized vector index for the entire project
rather than per-directory indexes.
Args:
index_root: Root index directory containing _index.db files
embedding_backend: Embedding backend to use (fastembed or litellm).
@@ -921,6 +952,14 @@ def generate_embeddings_recursive(
Returns:
Aggregated result dictionary with generation statistics
"""
import warnings
warnings.warn(
"generate_embeddings_recursive is deprecated. Use "
"generate_dense_embeddings_centralized instead for centralized indexing.",
DeprecationWarning,
stacklevel=2
)
# Get defaults from config if not specified
(default_backend, default_model, default_gpu,
default_endpoints, default_strategy, default_cooldown) = _get_embedding_defaults()
@@ -951,8 +990,8 @@ def generate_embeddings_recursive(
else:
max_workers = 1
# Discover all _index.db files
index_files = discover_all_index_dbs(index_root)
# Discover all _index.db files (using internal helper to avoid double deprecation warning)
index_files = _discover_index_dbs_internal(index_root)
if not index_files:
return {
@@ -1120,7 +1159,7 @@ def generate_dense_embeddings_centralized(
return {"success": False, "error": backend_error or "Embedding backend not available"}
# Discover all _index.db files
index_files = discover_all_index_dbs(index_root)
index_files = _discover_index_dbs_internal(index_root)
if not index_files:
return {
@@ -1197,6 +1236,8 @@ def generate_dense_embeddings_centralized(
# Track chunk ID to file_path mapping for metadata
chunk_id_to_info: Dict[int, Dict[str, Any]] = {}
next_chunk_id = 1
# Track current index_path for source_index_db field
current_index_path: Optional[str] = None
for idx, index_path in enumerate(index_files, 1):
if progress_callback:
@@ -1206,6 +1247,9 @@ def generate_dense_embeddings_centralized(
rel_path = index_path
progress_callback(f"Processing {idx}/{len(index_files)}: {rel_path}")
# Track current index_path for source_index_db
current_index_path = str(index_path)
try:
with sqlite3.connect(index_path) as conn:
conn.row_factory = sqlite3.Row
@@ -1250,6 +1294,7 @@ def generate_dense_embeddings_centralized(
"content": chunk.content,
"metadata": chunk.metadata,
"category": get_file_category(file_path) or "code",
"source_index_db": current_index_path,
}
total_chunks_created += 1
@@ -1303,7 +1348,7 @@ def generate_dense_embeddings_centralized(
"end_line": metadata.get("end_line"),
"category": info.get("category"),
"metadata": metadata,
"source_index_db": None, # Not tracked per-chunk currently
"source_index_db": info.get("source_index_db"),
})
meta_store.add_chunks(chunks_to_store)
@@ -1348,7 +1393,7 @@ def get_embeddings_status(index_root: Path) -> Dict[str, any]:
Returns:
Aggregated status with coverage statistics, model info, and timestamps
"""
index_files = discover_all_index_dbs(index_root)
index_files = _discover_index_dbs_internal(index_root)
if not index_files:
return {
@@ -1517,7 +1562,7 @@ def scan_for_model_conflicts(
- conflicts: List of conflicting index paths with their configs
- indexes_with_embeddings: Count of indexes that have embeddings
"""
index_files = discover_all_index_dbs(index_root)
index_files = _discover_index_dbs_internal(index_root)
if not index_files:
return {

View File

@@ -693,8 +693,10 @@ class HybridSearchEngine:
vectors_meta_path, chunk_ids, score_map, category
)
except Exception as e:
self.logger.debug(
"Centralized metadata lookup failed, falling back: %s", e
self.logger.warning(
"Centralized metadata lookup failed, falling back to legacy traversal: %s. "
"Consider regenerating embeddings with: codexlens embeddings-generate --centralized",
e
)
# Fallback: traverse _index.db files (legacy path)

View File

@@ -282,10 +282,27 @@ def get_optimal_providers(use_gpu: bool = True, with_device_options: bool = Fals
return ["CPUExecutionProvider"]
gpu_info = detect_gpu()
# Check if GPU was requested but not available - log warning
if not gpu_info.gpu_available:
try:
import onnxruntime as ort
available_providers = ort.get_available_providers()
except ImportError:
available_providers = []
logger.warning(
"GPU acceleration was requested, but no supported GPU provider (CUDA, DirectML) "
f"was found. Available providers: {available_providers}. Falling back to CPU."
)
else:
# Log which GPU provider is being used
gpu_providers = [p for p in gpu_info.onnx_providers if p != "CPUExecutionProvider"]
if gpu_providers:
logger.info(f"Using {gpu_providers[0]} for ONNX GPU acceleration")
if not with_device_options:
return gpu_info.onnx_providers
# Build providers with device_id options for GPU providers
device_id = get_selected_device_id()
providers = []

View File

@@ -232,55 +232,55 @@ class VectorMetadataStore:
if not chunk_ids:
return []
with self._lock:
conn = self._get_connection()
try:
placeholders = ",".join("?" * len(chunk_ids))
# No lock needed for reads: WAL mode + thread-local connections ensure safety
conn = self._get_connection()
try:
placeholders = ",".join("?" * len(chunk_ids))
if category:
query = f'''
SELECT chunk_id, file_path, content, start_line, end_line,
category, metadata, source_index_db
FROM chunk_metadata
WHERE chunk_id IN ({placeholders}) AND category = ?
'''
params = list(chunk_ids) + [category]
else:
query = f'''
SELECT chunk_id, file_path, content, start_line, end_line,
category, metadata, source_index_db
FROM chunk_metadata
WHERE chunk_id IN ({placeholders})
'''
params = list(chunk_ids)
if category:
query = f'''
SELECT chunk_id, file_path, content, start_line, end_line,
category, metadata, source_index_db
FROM chunk_metadata
WHERE chunk_id IN ({placeholders}) AND category = ?
'''
params = list(chunk_ids) + [category]
else:
query = f'''
SELECT chunk_id, file_path, content, start_line, end_line,
category, metadata, source_index_db
FROM chunk_metadata
WHERE chunk_id IN ({placeholders})
'''
params = list(chunk_ids)
rows = conn.execute(query, params).fetchall()
rows = conn.execute(query, params).fetchall()
results = []
for row in rows:
metadata = None
if row["metadata"]:
try:
metadata = json.loads(row["metadata"])
except json.JSONDecodeError:
metadata = {}
results = []
for row in rows:
metadata = None
if row["metadata"]:
try:
metadata = json.loads(row["metadata"])
except json.JSONDecodeError:
metadata = {}
results.append({
"chunk_id": row["chunk_id"],
"file_path": row["file_path"],
"content": row["content"],
"start_line": row["start_line"],
"end_line": row["end_line"],
"category": row["category"],
"metadata": metadata or {},
"source_index_db": row["source_index_db"],
})
results.append({
"chunk_id": row["chunk_id"],
"file_path": row["file_path"],
"content": row["content"],
"start_line": row["start_line"],
"end_line": row["end_line"],
"category": row["category"],
"metadata": metadata or {},
"source_index_db": row["source_index_db"],
})
return results
return results
except sqlite3.Error as e:
logger.error("Failed to get chunks by IDs: %s", e)
return []
except sqlite3.Error as e:
logger.error("Failed to get chunks by IDs: %s", e)
return []
def get_chunk_count(self) -> int:
"""Get total number of chunks in store.
@@ -288,15 +288,15 @@ class VectorMetadataStore:
Returns:
Total chunk count.
"""
with self._lock:
conn = self._get_connection()
try:
row = conn.execute(
"SELECT COUNT(*) FROM chunk_metadata"
).fetchone()
return row[0] if row else 0
except sqlite3.Error:
return 0
# No lock needed for reads: WAL mode + thread-local connections ensure safety
conn = self._get_connection()
try:
row = conn.execute(
"SELECT COUNT(*) FROM chunk_metadata"
).fetchone()
return row[0] if row else 0
except sqlite3.Error:
return 0
def clear(self) -> None:
"""Clear all metadata."""