fix(codexlens): Fix constructor and path handling issues

1. GlobalSymbolIndex constructor: Add project_id parameter lookup
   - Get project_id from registry using source_root
   - Pass project_id to GlobalSymbolIndex constructor

2. Binary cascade search path handling:
   - Add VectorMetadataStore import for centralized search
   - Fix _build_results_from_candidates to handle centralized mode
   - Use VectorMetadataStore for metadata, source_index_db for embeddings
   - Properly distinguish between index_root and index_path

3. Dense reranking for centralized search:
   - Get chunk metadata from _vectors_meta.db
   - Group chunks by source_index_db
   - Retrieve dense embeddings from respective _index.db files
This commit is contained in:
catlog22
2026-01-03 11:47:07 +08:00
parent a364a10d6a
commit 740bd1b61e
2 changed files with 135 additions and 32 deletions

View File

@@ -30,6 +30,8 @@ from codexlens.storage.dir_index import DirIndexStore, SubdirLink
from codexlens.storage.global_index import GlobalSymbolIndex
from codexlens.storage.path_mapper import PathMapper
from codexlens.storage.sqlite_store import SQLiteStore
from codexlens.storage.vector_meta_store import VectorMetadataStore
from codexlens.config import VECTORS_META_DB_NAME
from codexlens.search.hybrid_search import HybridSearchEngine
@@ -49,6 +51,8 @@ class SearchOptions:
enable_fuzzy: Enable fuzzy FTS in hybrid mode (default True)
enable_vector: Enable vector semantic search (default False)
pure_vector: If True, only use vector search without FTS fallback (default False)
enable_splade: Enable SPLADE sparse neural search (default False)
enable_cascade: Enable cascade (binary+dense) two-stage retrieval (default False)
hybrid_weights: Custom RRF weights for hybrid search (optional)
group_results: Enable grouping of similar results (default False)
grouping_threshold: Score threshold for grouping similar results (default 0.01)
@@ -64,6 +68,8 @@ class SearchOptions:
enable_fuzzy: bool = True
enable_vector: bool = False
pure_vector: bool = False
enable_splade: bool = False
enable_cascade: bool = False
hybrid_weights: Optional[Dict[str, float]] = None
group_results: bool = False
grouping_threshold: float = 0.01
@@ -622,7 +628,8 @@ class ChainSearchEngine:
)
# Fall back to using Hamming distance as score
return self._build_results_from_candidates(
coarse_candidates[:k], index_paths, stats, query, start_time
coarse_candidates[:k], index_paths, stats, query, start_time,
use_centralized=used_centralized
)
# Group candidates by index path for batch retrieval
@@ -634,30 +641,96 @@ class ChainSearchEngine:
# Retrieve dense embeddings and compute cosine similarity
scored_results: List[Tuple[float, SearchResult]] = []
import sqlite3
for index_path, chunk_ids in candidates_by_index.items():
try:
# Read directly from semantic_chunks table (where cascade-index stores data)
import sqlite3
conn = sqlite3.connect(str(index_path))
conn.row_factory = sqlite3.Row
# Collect valid rows and dense vectors for batch processing
valid_rows: List[Dict[str, Any]] = []
dense_vectors: List["np.ndarray"] = []
placeholders = ",".join("?" * len(chunk_ids))
rows = conn.execute(
f"SELECT id, file_path, content, embedding_dense FROM semantic_chunks WHERE id IN ({placeholders})",
chunk_ids
).fetchall()
conn.close()
if used_centralized:
# Centralized mode: index_path is actually index_root directory
# Dense embeddings are in per-directory _index.db files
# referenced by source_index_db in chunk_metadata
meta_db_path = index_path / VECTORS_META_DB_NAME
if not meta_db_path.exists():
self.logger.debug(
"VectorMetadataStore not found at %s, skipping dense reranking", meta_db_path
)
continue
# Batch processing: collect all valid embeddings first
valid_rows = []
dense_vectors = []
for row in rows:
dense_bytes = row["embedding_dense"]
if dense_bytes is not None:
valid_rows.append(row)
dense_vectors.append(np.frombuffer(dense_bytes, dtype=np.float32))
# Get chunk metadata with source_index_db references
meta_store = VectorMetadataStore(meta_db_path)
chunks_meta = meta_store.get_chunks_by_ids(chunk_ids)
# Group chunks by source_index_db
chunks_by_source: Dict[str, List[Dict[str, Any]]] = {}
for chunk in chunks_meta:
source_db = chunk.get("source_index_db")
if source_db:
if source_db not in chunks_by_source:
chunks_by_source[source_db] = []
chunks_by_source[source_db].append(chunk)
# Retrieve dense embeddings from each source_index_db
for source_db, source_chunks in chunks_by_source.items():
try:
source_chunk_ids = [c["chunk_id"] for c in source_chunks]
conn = sqlite3.connect(source_db)
conn.row_factory = sqlite3.Row
placeholders = ",".join("?" * len(source_chunk_ids))
# Try semantic_chunks first (newer schema), fall back to chunks
try:
rows = conn.execute(
f"SELECT id, embedding_dense FROM semantic_chunks WHERE id IN ({placeholders})",
source_chunk_ids
).fetchall()
except sqlite3.OperationalError:
rows = conn.execute(
f"SELECT id, embedding_dense FROM chunks WHERE id IN ({placeholders})",
source_chunk_ids
).fetchall()
conn.close()
# Build dense vector lookup
dense_lookup = {row["id"]: row["embedding_dense"] for row in rows}
# Process chunks with their embeddings
for chunk in source_chunks:
chunk_id = chunk["chunk_id"]
dense_bytes = dense_lookup.get(chunk_id)
if dense_bytes is not None:
valid_rows.append({
"id": chunk_id,
"file_path": chunk["file_path"],
"content": chunk["content"],
})
dense_vectors.append(np.frombuffer(dense_bytes, dtype=np.float32))
except Exception as exc:
self.logger.debug(
"Failed to get dense embeddings from %s: %s", source_db, exc
)
else:
# Per-directory mode: index_path is the _index.db file
conn = sqlite3.connect(str(index_path))
conn.row_factory = sqlite3.Row
placeholders = ",".join("?" * len(chunk_ids))
rows = conn.execute(
f"SELECT id, file_path, content, embedding_dense FROM semantic_chunks WHERE id IN ({placeholders})",
chunk_ids
).fetchall()
conn.close()
for row in rows:
dense_bytes = row["embedding_dense"]
if dense_bytes is not None:
valid_rows.append(dict(row))
dense_vectors.append(np.frombuffer(dense_bytes, dtype=np.float32))
# Skip if no dense embeddings found
if not dense_vectors:
continue
@@ -670,9 +743,9 @@ class ChainSearchEngine:
# Create search results
for i, row in enumerate(valid_rows):
score = float(scores[i])
excerpt = (row["content"] or "")[:500]
excerpt = (row.get("content") or "")[:500]
result = SearchResult(
path=row["file_path"] or "",
path=row.get("file_path") or "",
score=score,
excerpt=excerpt,
)
@@ -919,6 +992,7 @@ class ChainSearchEngine:
stats: SearchStats,
query: str,
start_time: float,
use_centralized: bool = False,
) -> ChainSearchResult:
"""Build ChainSearchResult from binary candidates using Hamming distance scores.
@@ -930,6 +1004,8 @@ class ChainSearchEngine:
stats: SearchStats to update
query: Original query string
start_time: Search start time for timing
use_centralized: If True, index_path is the index_root directory
and VectorMetadataStore should be used instead of SQLiteStore
Returns:
ChainSearchResult with results scored by Hamming distance
@@ -945,9 +1021,22 @@ class ChainSearchEngine:
for index_path, chunk_tuples in candidates_by_index.items():
try:
store = SQLiteStore(index_path)
chunk_ids = [c[0] for c in chunk_tuples]
chunks_data = store.get_chunks_by_ids(chunk_ids)
# Use VectorMetadataStore for centralized search, SQLiteStore for per-directory
if use_centralized:
# index_path is actually index_root directory for centralized search
meta_db_path = index_path / VECTORS_META_DB_NAME
if not meta_db_path.exists():
self.logger.debug(
"VectorMetadataStore not found at %s, skipping", meta_db_path
)
continue
meta_store = VectorMetadataStore(meta_db_path)
chunks_data = meta_store.get_chunks_by_ids(chunk_ids)
else:
store = SQLiteStore(index_path)
chunks_data = store.get_chunks_by_ids(chunk_ids)
chunk_content: Dict[int, Dict[str, Any]] = {
c["id"]: c for c in chunks_data
@@ -1341,6 +1430,7 @@ class ChainSearchEngine:
options.enable_fuzzy,
options.enable_vector,
options.pure_vector,
options.enable_splade,
options.hybrid_weights
): idx_path
for idx_path in index_paths
@@ -1369,6 +1459,7 @@ class ChainSearchEngine:
enable_fuzzy: bool = True,
enable_vector: bool = False,
pure_vector: bool = False,
enable_splade: bool = False,
hybrid_weights: Optional[Dict[str, float]] = None) -> List[SearchResult]:
"""Search a single index database.
@@ -1384,6 +1475,7 @@ class ChainSearchEngine:
enable_fuzzy: Enable fuzzy FTS in hybrid mode
enable_vector: Enable vector semantic search
pure_vector: If True, only use vector search without FTS fallback
enable_splade: If True, force SPLADE sparse neural search
hybrid_weights: Custom RRF weights for hybrid search
Returns:
@@ -1400,6 +1492,7 @@ class ChainSearchEngine:
enable_fuzzy=enable_fuzzy,
enable_vector=enable_vector,
pure_vector=pure_vector,
enable_splade=enable_splade,
)
else:
# Single-FTS search (exact or fuzzy mode)

View File

@@ -70,16 +70,27 @@ class IncrementalIndexer:
self._dir_stores: dict[Path, DirIndexStore] = {}
self._lock = __import__("threading").RLock()
def _get_global_index(self, index_root: Path) -> Optional[GlobalSymbolIndex]:
"""Get or create global symbol index."""
def _get_global_index(self, index_root: Path, source_root: Optional[Path] = None) -> Optional[GlobalSymbolIndex]:
"""Get or create global symbol index.
Args:
index_root: Root directory containing the global symbol index DB
source_root: Source directory root for looking up project_id from registry
"""
if not self.config.global_symbol_index_enabled:
return None
if self._global_index is None:
global_db_path = index_root / GlobalSymbolIndex.DEFAULT_DB_NAME
if global_db_path.exists():
self._global_index = GlobalSymbolIndex(global_db_path)
# Get project_id from registry using source_root
project_id = 0 # Default fallback
if source_root:
project_info = self.registry.get_project(source_root)
if project_info:
project_id = project_info.id
self._global_index = GlobalSymbolIndex(global_db_path, project_id=project_id)
return self._global_index
def _get_dir_store(self, dir_path: Path) -> Optional[DirIndexStore]:
@@ -94,10 +105,9 @@ class IncrementalIndexer:
return None
# Get index root for global index
index_root = self.mapper.source_to_index_dir(
self.mapper.get_project_root(dir_path) or dir_path
)
global_index = self._get_global_index(index_root)
source_root = self.mapper.get_project_root(dir_path) or dir_path
index_root = self.mapper.source_to_index_dir(source_root)
global_index = self._get_global_index(index_root, source_root=source_root)
store = DirIndexStore(
index_db,