feat: Add centralized vector storage and metadata management for embeddings

catlog22
2026-01-02 17:18:23 +08:00
parent 9157c5c78b
commit 0b6e9db8e4
5 changed files with 534 additions and 11 deletions

View File

@@ -2005,6 +2005,12 @@ def embeddings_generate(
),
json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose output."),
centralized: bool = typer.Option(
False,
"--centralized",
"-c",
help="Use centralized vector storage (single HNSW index at project root).",
),
) -> None:
"""Generate semantic embeddings for code search.
@@ -2012,6 +2018,10 @@ def embeddings_generate(
semantic search capabilities. Embeddings are stored in the same
database as the FTS index.
Storage Modes:
- Default: Per-directory HNSW indexes alongside _index.db files
- Centralized: Single HNSW index at project root (_vectors.hnsw)
Embedding Backend Options:
- fastembed: Local ONNX-based embeddings (default, no API calls)
- litellm: Remote API embeddings via ccw-litellm (requires API keys)
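
For orientation, the on-disk layout implied by the two storage modes looks roughly like this (a sketch assembled from names appearing in this commit; the per-directory HNSW file name is not shown in the diff, and _vectors_meta.db is introduced by the metadata store added later in this commit):

    Default:      each indexed directory keeps its own _index.db plus a per-directory HNSW index
    Centralized:  <index root>/_vectors.hnsw        single HNSW index for the whole tree
                  <index root>/_vectors_meta.db     centralized chunk metadata (assumed location)
                  per-directory _index.db files     FTS index, unchanged
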
@@ -2033,12 +2043,14 @@ def embeddings_generate(
codexlens embeddings-generate ~/.codexlens/indexes/project/_index.db # Specific index
codexlens embeddings-generate ~/projects/my-app --backend litellm --model text-embedding-3-small # Use LiteLLM
codexlens embeddings-generate ~/projects/my-app --model fast --force # Regenerate with fast profile
codexlens embeddings-generate ~/projects/my-app --centralized # Centralized vector storage
"""
_configure_logging(verbose, json_mode)
from codexlens.cli.embedding_manager import (
generate_embeddings,
generate_embeddings_recursive,
generate_dense_embeddings_centralized,
scan_for_model_conflicts,
check_global_model_lock,
set_locked_model_config,
@@ -2099,7 +2111,11 @@ def embeddings_generate(
console.print(f" {msg}")
console.print(f"[bold]Generating embeddings[/bold]")
if use_recursive:
if centralized:
effective_root = index_root if index_root else (index_path.parent if index_path else target_path)
console.print(f"Index root: [dim]{effective_root}[/dim]")
console.print(f"Mode: [green]Centralized[/green]")
elif use_recursive:
console.print(f"Index root: [dim]{index_root}[/dim]")
console.print(f"Mode: [yellow]Recursive[/yellow]")
else:
@@ -2179,7 +2195,20 @@ def embeddings_generate(
console.print("[yellow]Cancelled.[/yellow] Use --force to skip this prompt.")
raise typer.Exit(code=0)
if use_recursive:
if centralized:
# Centralized mode: single HNSW index at project root
if not index_root:
index_root = index_path.parent if index_path else target_path
result = generate_dense_embeddings_centralized(
index_root,
embedding_backend=backend,
model_profile=model,
force=force,
chunk_size=chunk_size,
progress_callback=progress_update,
max_workers=max_workers,
)
elif use_recursive:
result = generate_embeddings_recursive(
index_root,
embedding_backend=backend,
@@ -2225,7 +2254,18 @@ def embeddings_generate(
# This prevents using different models for future indexes
set_locked_model_config(backend, model)
if use_recursive:
if centralized:
# Centralized mode output
elapsed = data.get("elapsed_time", 0)
console.print(f"[green]✓[/green] Centralized embeddings generated successfully!")
console.print(f" Model: {data.get('model_name', model)}")
console.print(f" Chunks created: {data['chunks_created']:,}")
console.print(f" Files processed: {data['files_processed']}")
if data.get("files_failed", 0) > 0:
console.print(f" [yellow]Files failed: {data['files_failed']}[/yellow]")
console.print(f" Central index: {data.get('central_index_path', 'N/A')}")
console.print(f" Time: {elapsed:.1f}s")
elif use_recursive:
# Recursive mode output
console.print(f"[green]✓[/green] Recursive embeddings generation complete!")
console.print(f" Indexes processed: {data['indexes_processed']}")
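
For reference, the centralized output branch above reads the following keys from the data payload. A minimal sketch of the assumed shape (key names are taken from the console output code; the example values are hypothetical, and the exact return structure of generate_dense_embeddings_centralized is not shown in this diff):

    data = {
        "model_name": "fast",            # printed via data.get('model_name', model)
        "chunks_created": 12456,
        "files_processed": 830,
        "files_failed": 2,               # only printed when greater than 0
        "central_index_path": "<index root>/_vectors.hnsw",   # printed as 'N/A' if missing
        "elapsed_time": 42.7,            # seconds
    }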

View File

@@ -17,6 +17,11 @@ except ImportError:
def is_embedding_backend_available(_backend: str): # type: ignore[no-redef]
return False, "codexlens.semantic not available"
try:
from codexlens.config import VECTORS_META_DB_NAME
except ImportError:
VECTORS_META_DB_NAME = "_vectors_meta.db"
try:
from codexlens.search.ranking import get_file_category
except ImportError:
@@ -1277,10 +1282,38 @@ def generate_dense_embeddings_centralized(
}
# Store chunk metadata in a centralized metadata database
vectors_meta_path = index_root / "VECTORS_META_DB_NAME"
# Note: The metadata is already stored in individual _index.db semantic_chunks tables
# For now, we rely on the existing per-index storage for metadata lookup
# A future enhancement could consolidate metadata into _vectors_meta.db
vectors_meta_path = index_root / VECTORS_META_DB_NAME
if chunk_id_to_info:
if progress_callback:
progress_callback(f"Storing {len(chunk_id_to_info)} chunk metadata records...")
try:
from codexlens.storage.vector_meta_store import VectorMetadataStore
with VectorMetadataStore(vectors_meta_path) as meta_store:
# Convert chunk_id_to_info dict to list of dicts for batch insert
chunks_to_store = []
for cid, info in chunk_id_to_info.items():
metadata = info.get("metadata", {})
chunks_to_store.append({
"chunk_id": cid,
"file_path": info["file_path"],
"content": info["content"],
"start_line": metadata.get("start_line"),
"end_line": metadata.get("end_line"),
"category": info.get("category"),
"metadata": metadata,
"source_index_db": None, # Not tracked per-chunk currently
})
meta_store.add_chunks(chunks_to_store)
if progress_callback:
progress_callback(f"Saved metadata to {vectors_meta_path}")
except Exception as e:
logger.warning("Failed to store vector metadata: %s", e)
# Non-fatal: continue without centralized metadata
elapsed_time = time.time() - start_time
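
The metadata-storage block above assumes each chunk_id_to_info entry carries the fields read inside the loop. A minimal sketch of that assumed shape (values are hypothetical):

    chunk_id_to_info = {
        42: {
            "file_path": "src/example.py",
            "content": "def handler(event): ...",
            "category": "code",
            "metadata": {"start_line": 10, "end_line": 24},
        },
    }
    # Each entry becomes one chunk_metadata row via VectorMetadataStore.add_chunks().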

View File

@@ -664,10 +664,15 @@ class HybridSearchEngine:
scores: List[float],
category: Optional[str] = None,
) -> List[SearchResult]:
"""Fetch chunk metadata from all _index.db files for centralized search.
"""Fetch chunk metadata from centralized _vectors_meta.db for fast lookup.
This method uses the centralized VectorMetadataStore for O(1) lookup
instead of traversing all _index.db files (O(n) where n = number of indexes).
Falls back to the legacy per-index lookup if centralized metadata is unavailable.
Args:
index_root: Root directory containing _index.db files
index_root: Root directory containing _vectors_meta.db
chunk_ids: List of chunk IDs from ANN search
scores: Corresponding similarity scores
category: Optional category filter
@@ -675,12 +680,123 @@ class HybridSearchEngine:
Returns:
List of SearchResult objects
"""
import sqlite3
import json
from codexlens.config import VECTORS_META_DB_NAME
# Build score map
score_map = {cid: score for cid, score in zip(chunk_ids, scores)}
# Try centralized metadata store first (fast path)
vectors_meta_path = index_root / VECTORS_META_DB_NAME
if vectors_meta_path.exists():
try:
return self._fetch_from_vector_meta_store(
vectors_meta_path, chunk_ids, score_map, category
)
except Exception as e:
self.logger.debug(
"Centralized metadata lookup failed, falling back: %s", e
)
# Fallback: traverse _index.db files (legacy path)
return self._fetch_chunks_by_ids_legacy(
index_root, chunk_ids, score_map, category
)
def _fetch_from_vector_meta_store(
self,
meta_db_path: Path,
chunk_ids: List[int],
score_map: Dict[int, float],
category: Optional[str] = None,
) -> List[SearchResult]:
"""Fetch chunks from centralized VectorMetadataStore.
Args:
meta_db_path: Path to _vectors_meta.db
chunk_ids: List of chunk IDs to fetch
score_map: Mapping of chunk_id to score
category: Optional category filter
Returns:
List of SearchResult objects
"""
from codexlens.storage.vector_meta_store import VectorMetadataStore
results = []
with VectorMetadataStore(meta_db_path) as meta_store:
rows = meta_store.get_chunks_by_ids(chunk_ids, category=category)
for row in rows:
chunk_id = row["chunk_id"]
file_path = row["file_path"]
content = row["content"] or ""
metadata = row.get("metadata") or {}
start_line = row.get("start_line")
end_line = row.get("end_line")
score = score_map.get(chunk_id, 0.0)
# Build excerpt
excerpt = content[:200] + "..." if len(content) > 200 else content
# Extract symbol information
symbol_name = metadata.get("symbol_name")
symbol_kind = metadata.get("symbol_kind")
# Build Symbol object if available
symbol = None
if symbol_name and symbol_kind and start_line and end_line:
try:
from codexlens.entities import Symbol
symbol = Symbol(
name=symbol_name,
kind=symbol_kind,
range=(start_line, end_line)
)
except Exception:
pass
results.append(SearchResult(
path=file_path,
score=score,
excerpt=excerpt,
content=content,
symbol=symbol,
metadata=metadata,
start_line=start_line,
end_line=end_line,
symbol_name=symbol_name,
symbol_kind=symbol_kind,
))
# Sort by score descending
results.sort(key=lambda r: r.score, reverse=True)
return results
def _fetch_chunks_by_ids_legacy(
self,
index_root: Path,
chunk_ids: List[int],
score_map: Dict[int, float],
category: Optional[str] = None,
) -> List[SearchResult]:
"""Legacy fallback: fetch chunk metadata by traversing all _index.db files.
This is the O(n) fallback path used when centralized metadata is unavailable.
Args:
index_root: Root directory containing _index.db files
chunk_ids: List of chunk IDs from ANN search
score_map: Mapping of chunk_id to score
category: Optional category filter
Returns:
List of SearchResult objects
"""
import sqlite3
import json
# Find all _index.db files
index_files = list(index_root.rglob("_index.db"))
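
As a quick sanity check of the fast path, the centralized metadata database can also be inspected directly with sqlite3. A sketch assuming the chunk_metadata schema defined in the new vector_meta_store module below (the file name comes from VECTORS_META_DB_NAME, i.e. _vectors_meta.db; the index root path is hypothetical):

    import sqlite3
    from pathlib import Path

    index_root = Path("/path/to/index/root")   # wherever _vectors_meta.db was written
    conn = sqlite3.connect(str(index_root / "_vectors_meta.db"))
    conn.row_factory = sqlite3.Row
    rows = conn.execute(
        "SELECT chunk_id, file_path, start_line, end_line, category "
        "FROM chunk_metadata LIMIT 5"
    ).fetchall()
    for row in rows:
        print(dict(row))
    conn.close()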

View File

@@ -7,6 +7,7 @@ from .path_mapper import PathMapper
from .registry import RegistryStore, ProjectInfo, DirMapping
from .dir_index import DirIndexStore, SubdirLink, FileEntry
from .index_tree import IndexTreeBuilder, BuildResult, DirBuildResult
from .vector_meta_store import VectorMetadataStore
__all__ = [
# Legacy (workspace-local)
@@ -25,5 +26,7 @@ __all__ = [
"IndexTreeBuilder",
"BuildResult",
"DirBuildResult",
# Vector metadata
"VectorMetadataStore",
]
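
With the export above, callers can import the store from the storage package root. A minimal usage sketch (the database path is hypothetical):

    from codexlens.storage import VectorMetadataStore

    with VectorMetadataStore("/path/to/_vectors_meta.db") as store:
        print(store.get_chunk_count())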

View File

@@ -0,0 +1,331 @@
"""Central storage for vector metadata.

This module provides a centralized SQLite database for storing chunk metadata
associated with centralized vector indexes. Instead of traversing all _index.db
files to fetch chunk metadata, this provides O(1) lookup by chunk ID.
"""

from __future__ import annotations

import json
import logging
import sqlite3
import threading
from pathlib import Path
from typing import Any, Dict, List, Optional

from codexlens.errors import StorageError

logger = logging.getLogger(__name__)


class VectorMetadataStore:
    """Store and retrieve chunk metadata for centralized vector search.

    This class provides efficient storage and retrieval of chunk metadata
    for the centralized vector index architecture. All chunk metadata is
    stored in a single _vectors_meta.db file at the project root, enabling
    fast lookups without traversing multiple _index.db files.

    Schema:
        chunk_metadata:
            - chunk_id: INTEGER PRIMARY KEY - Global chunk ID
            - file_path: TEXT NOT NULL - Path to source file
            - content: TEXT - Chunk text content
            - start_line: INTEGER - Start line in source file
            - end_line: INTEGER - End line in source file
            - category: TEXT - Content category (code/doc)
            - metadata: TEXT - JSON-encoded additional metadata
            - source_index_db: TEXT - Path to source _index.db file
    """

    def __init__(self, db_path: Path | str) -> None:
        """Initialize VectorMetadataStore.

        Args:
            db_path: Path to SQLite database file.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        # Thread-safe connection management
        self._lock = threading.RLock()
        self._local = threading.local()

    def _get_connection(self) -> sqlite3.Connection:
        """Get or create a thread-local database connection.

        Each thread gets its own connection to ensure thread safety.
        """
        conn = getattr(self._local, "conn", None)
        if conn is None:
            conn = sqlite3.connect(
                str(self.db_path),
                timeout=30.0,
                check_same_thread=True,
            )
            conn.row_factory = sqlite3.Row
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA synchronous=NORMAL")
            conn.execute("PRAGMA mmap_size=1073741824")  # 1GB mmap
            self._local.conn = conn
        return conn

    def _ensure_schema(self) -> None:
        """Create tables if they don't exist."""
        with self._lock:
            conn = self._get_connection()
            try:
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS chunk_metadata (
                        chunk_id INTEGER PRIMARY KEY,
                        file_path TEXT NOT NULL,
                        content TEXT,
                        start_line INTEGER,
                        end_line INTEGER,
                        category TEXT,
                        metadata TEXT,
                        source_index_db TEXT
                    )
                ''')
                conn.execute(
                    'CREATE INDEX IF NOT EXISTS idx_chunk_file_path '
                    'ON chunk_metadata(file_path)'
                )
                conn.execute(
                    'CREATE INDEX IF NOT EXISTS idx_chunk_category '
                    'ON chunk_metadata(category)'
                )
                conn.commit()
                logger.debug("VectorMetadataStore schema created/verified")
            except sqlite3.Error as e:
                raise StorageError(
                    f"Failed to create schema: {e}",
                    db_path=str(self.db_path),
                    operation="_ensure_schema"
                ) from e

    def add_chunk(
        self,
        chunk_id: int,
        file_path: str,
        content: str,
        start_line: Optional[int] = None,
        end_line: Optional[int] = None,
        category: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        source_index_db: Optional[str] = None,
    ) -> None:
        """Add a single chunk's metadata.

        Args:
            chunk_id: Global unique chunk ID.
            file_path: Path to source file.
            content: Chunk text content.
            start_line: Start line in source file.
            end_line: End line in source file.
            category: Content category (code/doc).
            metadata: Additional metadata dictionary.
            source_index_db: Path to source _index.db file.
        """
        with self._lock:
            conn = self._get_connection()
            try:
                metadata_json = json.dumps(metadata) if metadata else None
                conn.execute(
                    '''
                    INSERT OR REPLACE INTO chunk_metadata
                    (chunk_id, file_path, content, start_line, end_line,
                     category, metadata, source_index_db)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''',
                    (chunk_id, file_path, content, start_line, end_line,
                     category, metadata_json, source_index_db)
                )
                conn.commit()
            except sqlite3.Error as e:
                raise StorageError(
                    f"Failed to add chunk {chunk_id}: {e}",
                    db_path=str(self.db_path),
                    operation="add_chunk"
                ) from e

    def add_chunks(self, chunks: List[Dict[str, Any]]) -> None:
        """Batch insert chunk metadata.

        Args:
            chunks: List of dictionaries with keys:
                - chunk_id (required): Global unique chunk ID
                - file_path (required): Path to source file
                - content: Chunk text content
                - start_line: Start line in source file
                - end_line: End line in source file
                - category: Content category (code/doc)
                - metadata: Additional metadata dictionary
                - source_index_db: Path to source _index.db file
        """
        if not chunks:
            return
        with self._lock:
            conn = self._get_connection()
            try:
                batch_data = []
                for chunk in chunks:
                    metadata = chunk.get("metadata")
                    metadata_json = json.dumps(metadata) if metadata else None
                    batch_data.append((
                        chunk["chunk_id"],
                        chunk["file_path"],
                        chunk.get("content"),
                        chunk.get("start_line"),
                        chunk.get("end_line"),
                        chunk.get("category"),
                        metadata_json,
                        chunk.get("source_index_db"),
                    ))
                conn.executemany(
                    '''
                    INSERT OR REPLACE INTO chunk_metadata
                    (chunk_id, file_path, content, start_line, end_line,
                     category, metadata, source_index_db)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''',
                    batch_data
                )
                conn.commit()
                logger.debug("Batch inserted %d chunk metadata records", len(chunks))
            except sqlite3.Error as e:
                raise StorageError(
                    f"Failed to batch insert chunks: {e}",
                    db_path=str(self.db_path),
                    operation="add_chunks"
                ) from e

    def get_chunks_by_ids(
        self,
        chunk_ids: List[int],
        category: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Retrieve chunks by their IDs - the key optimization.

        This is the primary method that replaces traversing all _index.db files.
        Provides O(1) lookup by chunk ID instead of O(n) where n is the number
        of index databases.

        Args:
            chunk_ids: List of chunk IDs to retrieve.
            category: Optional category filter ('code' or 'doc').

        Returns:
            List of dictionaries with chunk metadata:
                - chunk_id: Global chunk ID
                - file_path: Path to source file
                - content: Chunk text content
                - start_line: Start line in source file
                - end_line: End line in source file
                - category: Content category
                - metadata: Parsed metadata dictionary
                - source_index_db: Source _index.db path
        """
        if not chunk_ids:
            return []
        with self._lock:
            conn = self._get_connection()
            try:
                placeholders = ",".join("?" * len(chunk_ids))
                if category:
                    query = f'''
                        SELECT chunk_id, file_path, content, start_line, end_line,
                               category, metadata, source_index_db
                        FROM chunk_metadata
                        WHERE chunk_id IN ({placeholders}) AND category = ?
                    '''
                    params = list(chunk_ids) + [category]
                else:
                    query = f'''
                        SELECT chunk_id, file_path, content, start_line, end_line,
                               category, metadata, source_index_db
                        FROM chunk_metadata
                        WHERE chunk_id IN ({placeholders})
                    '''
                    params = list(chunk_ids)
                rows = conn.execute(query, params).fetchall()
                results = []
                for row in rows:
                    metadata = None
                    if row["metadata"]:
                        try:
                            metadata = json.loads(row["metadata"])
                        except json.JSONDecodeError:
                            metadata = {}
                    results.append({
                        "chunk_id": row["chunk_id"],
                        "file_path": row["file_path"],
                        "content": row["content"],
                        "start_line": row["start_line"],
                        "end_line": row["end_line"],
                        "category": row["category"],
                        "metadata": metadata or {},
                        "source_index_db": row["source_index_db"],
                    })
                return results
            except sqlite3.Error as e:
                logger.error("Failed to get chunks by IDs: %s", e)
                return []

    def get_chunk_count(self) -> int:
        """Get total number of chunks in store.

        Returns:
            Total chunk count.
        """
        with self._lock:
            conn = self._get_connection()
            try:
                row = conn.execute(
                    "SELECT COUNT(*) FROM chunk_metadata"
                ).fetchone()
                return row[0] if row else 0
            except sqlite3.Error:
                return 0

    def clear(self) -> None:
        """Clear all metadata."""
        with self._lock:
            conn = self._get_connection()
            try:
                conn.execute("DELETE FROM chunk_metadata")
                conn.commit()
                logger.info("Cleared all chunk metadata")
            except sqlite3.Error as e:
                raise StorageError(
                    f"Failed to clear metadata: {e}",
                    db_path=str(self.db_path),
                    operation="clear"
                ) from e

    def close(self) -> None:
        """Close database connection."""
        with self._lock:
            conn = getattr(self._local, "conn", None)
            if conn is not None:
                conn.close()
                self._local.conn = None

    def __enter__(self) -> "VectorMetadataStore":
        """Context manager entry."""
        self._ensure_schema()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit."""
        self.close()
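
To close, a minimal end-to-end sketch of the new store as exercised by the write path (embedding generation) and the read path (hybrid search). The calls are the ones defined above; the database path and the example values are hypothetical:

    from pathlib import Path
    from codexlens.storage.vector_meta_store import VectorMetadataStore

    meta_path = Path("/tmp/demo/_vectors_meta.db")   # hypothetical location

    # Write path: batch-insert chunk metadata after embeddings are generated.
    with VectorMetadataStore(meta_path) as store:
        store.add_chunks([
            {
                "chunk_id": 1,
                "file_path": "src/app.py",
                "content": "def main(): ...",
                "start_line": 1,
                "end_line": 3,
                "category": "code",
                "metadata": {"symbol_name": "main", "symbol_kind": "function"},
            },
        ])

    # Read path: resolve chunk IDs returned by ANN search back to metadata.
    with VectorMetadataStore(meta_path) as store:
        for row in store.get_chunks_by_ids([1], category="code"):
            print(row["file_path"], row["start_line"], row["end_line"])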