Enhance semantic search capabilities and configuration

- Added category support for programming and documentation languages in Config.
- Implemented category-based filtering in HybridSearchEngine to improve search relevance based on query intent.
- Introduced functions for filtering results by category and determining file categories based on extensions.
- Updated VectorStore to include a category column in the database schema and modified chunk addition methods to support category tagging.
- Enhanced the WatcherConfig to ignore additional common directories and files.
- Created a benchmark script that compares the performance of the Binary Cascade, SPLADE, and Vector semantic search methods, including detailed result analysis and overlap comparison.
catlog22
2026-01-02 15:01:20 +08:00
parent 92ed2524b7
commit 54fb7afdb2
7 changed files with 803 additions and 51 deletions

View File

@@ -0,0 +1,489 @@
"""Compare Binary Cascade, SPLADE, and Vector semantic search methods.
This script compares the three semantic retrieval approaches:
1. Binary Cascade: 256-bit binary vectors for coarse ranking
2. SPLADE: Sparse learned representations with inverted index
3. Vector Dense: Full semantic embeddings with cosine similarity
"""
import sys
import time
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from codexlens.storage.dir_index import DirIndexStore
from codexlens.storage.splade_index import SpladeIndex
from codexlens.semantic.vector_store import VectorStore
def get_filename(path: str) -> str:
"""Extract filename from path."""
if "\\" in path:
return path.split("\\")[-1]
elif "/" in path:
return path.split("/")[-1]
return path
def find_splade_db(index_root: Path) -> Path | None:
    """Find the SPLADE database by searching the directory tree."""
    # Check root first
    if (index_root / "_splade.db").exists():
        return index_root / "_splade.db"
    # Otherwise return the first match in subdirectories, if any
    return next(index_root.rglob("_splade.db"), None)
def find_binary_indexes(index_root: Path) -> list[Path]:
"""Find all binary index files."""
return list(index_root.rglob("_index_binary_vectors.bin"))
# Test queries for semantic search comparison
TEST_QUERIES = [
"how to search code semantically",
"embedding generation for files",
"hybrid search with multiple backends",
"parse python source code",
"database storage for vectors",
]
# Index paths
INDEX_ROOT = Path(r"C:\Users\dyw\.codexlens\indexes\D\Claude_dms3\codex-lens")
def test_vector_search(query: str, limit: int = 10):
"""Test dense vector search."""
try:
from codexlens.semantic.factory import get_embedder
# Find an index with embeddings
all_results = []
total_time = 0
for index_db in INDEX_ROOT.rglob("_index.db"):
vector_store = VectorStore(index_db)
if vector_store.count_chunks() == 0:
continue
# Get embedder based on stored config
model_config = vector_store.get_model_config()
if model_config:
backend = model_config.get("backend", "fastembed")
model_name = model_config["model_name"]
model_profile = model_config["model_profile"]
if backend == "litellm":
embedder = get_embedder(backend="litellm", model=model_name)
else:
embedder = get_embedder(backend="fastembed", profile=model_profile)
else:
embedder = get_embedder(backend="fastembed", profile="code")
start = time.perf_counter()
query_embedding = embedder.embed_single(query)
results = vector_store.search_similar(
query_embedding=query_embedding,
top_k=limit,
min_score=0.0,
return_full_content=True,
)
total_time += (time.perf_counter() - start) * 1000
all_results.extend(results)
            # Stop after the first index that yields results
if results:
break
# Sort by score and limit
all_results.sort(key=lambda x: x.score, reverse=True)
return all_results[:limit], total_time, None
except Exception as e:
return [], 0, str(e)
def test_splade_search(query: str, limit: int = 10):
"""Test SPLADE sparse search."""
try:
from codexlens.semantic.splade_encoder import get_splade_encoder, check_splade_available
ok, err = check_splade_available()
if not ok:
return [], 0, f"SPLADE not available: {err}"
splade_db_path = find_splade_db(INDEX_ROOT)
if not splade_db_path:
return [], 0, "SPLADE database not found"
splade_index = SpladeIndex(splade_db_path)
if not splade_index.has_index():
return [], 0, "SPLADE index not initialized"
start = time.perf_counter()
encoder = get_splade_encoder()
query_sparse = encoder.encode_text(query)
raw_results = splade_index.search(query_sparse, limit=limit, min_score=0.0)
if not raw_results:
elapsed = (time.perf_counter() - start) * 1000
return [], elapsed, None
# Get chunk details
chunk_ids = [chunk_id for chunk_id, _ in raw_results]
score_map = {chunk_id: score for chunk_id, score in raw_results}
rows = splade_index.get_chunks_by_ids(chunk_ids)
elapsed = (time.perf_counter() - start) * 1000
# Build result objects
results = []
for row in rows:
chunk_id = row["id"]
results.append({
"path": row["file_path"],
"score": score_map.get(chunk_id, 0.0),
"content": row["content"][:200] + "..." if len(row["content"]) > 200 else row["content"],
})
# Sort by score
results.sort(key=lambda x: x["score"], reverse=True)
return results, elapsed, None
except Exception as e:
return [], 0, str(e)
def test_binary_cascade_search(query: str, limit: int = 10):
"""Test binary cascade search (binary coarse + dense fine ranking)."""
try:
from codexlens.semantic.ann_index import BinaryANNIndex
from codexlens.indexing.embedding import CascadeEmbeddingBackend
import numpy as np
import sqlite3
# Find binary indexes
binary_indexes = find_binary_indexes(INDEX_ROOT)
if not binary_indexes:
return [], 0, "No binary indexes found. Run 'codexlens cascade-index' first."
start = time.perf_counter()
# Initialize cascade backend for query encoding
cascade_backend = CascadeEmbeddingBackend()
# Encode query to binary and dense
binary_embeddings, dense_embeddings = cascade_backend.encode_cascade([query], batch_size=1)
query_binary = binary_embeddings[0]
query_dense = dense_embeddings[0]
all_results = []
for binary_index_path in binary_indexes:
# Find corresponding index.db
index_db = binary_index_path.parent / "_index.db"
if not index_db.exists():
continue
# Check if cascade embeddings exist
conn = sqlite3.connect(index_db)
conn.row_factory = sqlite3.Row
try:
cursor = conn.execute(
"SELECT COUNT(*) FROM semantic_chunks WHERE embedding_binary IS NOT NULL"
)
binary_count = cursor.fetchone()[0]
if binary_count == 0:
conn.close()
continue
except Exception:
conn.close()
continue
# Stage 1: Binary coarse search
binary_index = BinaryANNIndex(index_db, dim=256)
try:
binary_index.load()
except Exception:
conn.close()
continue
# Pack query for binary search
from codexlens.indexing.embedding import pack_binary_embedding
query_binary_packed = pack_binary_embedding(query_binary)
# Get top candidates
coarse_limit = min(limit * 10, 100)
# search returns (ids, distances) tuple
coarse_ids, coarse_distances = binary_index.search(query_binary_packed, top_k=coarse_limit)
if not coarse_ids:
conn.close()
continue
# Stage 2: Dense reranking
chunk_ids = coarse_ids
placeholders = ",".join("?" * len(chunk_ids))
cursor = conn.execute(
f"""
SELECT id, file_path, content, embedding_dense
FROM semantic_chunks
WHERE id IN ({placeholders}) AND embedding_dense IS NOT NULL
""",
chunk_ids
)
rows = cursor.fetchall()
# Compute dense scores
for row in rows:
chunk_id = row["id"]
file_path = row["file_path"]
content = row["content"]
dense_blob = row["embedding_dense"]
if dense_blob:
dense_vec = np.frombuffer(dense_blob, dtype=np.float32)
# Cosine similarity
score = float(np.dot(query_dense, dense_vec) / (
np.linalg.norm(query_dense) * np.linalg.norm(dense_vec) + 1e-8
))
else:
score = 0.0
all_results.append({
"path": file_path,
"score": score,
"content": content[:200] + "..." if len(content) > 200 else content,
})
conn.close()
# Sort by dense score and limit
all_results.sort(key=lambda x: x["score"], reverse=True)
final_results = all_results[:limit]
elapsed = (time.perf_counter() - start) * 1000
return final_results, elapsed, None
except ImportError as e:
return [], 0, f"Import error: {e}"
except Exception as e:
import traceback
return [], 0, f"{str(e)}\n{traceback.format_exc()}"
def print_results(method_name: str, results, elapsed: float, error: str | None = None):
"""Print search results in a formatted way."""
print(f"\n{'='*60}")
print(f"Method: {method_name}")
print(f"{'='*60}")
if error:
print(f"ERROR: {error}")
return
print(f"Results: {len(results)}, Time: {elapsed:.1f}ms")
print("-" * 60)
for i, r in enumerate(results[:5], 1):
if isinstance(r, dict):
path = r.get("path", "?")
score = r.get("score", 0)
content = r.get("content", "")[:80]
else:
path = getattr(r, "path", "?")
score = getattr(r, "score", 0)
content = getattr(r, "content", "")[:80] if hasattr(r, "content") else ""
filename = get_filename(path)
print(f" {i}. [{score:.4f}] {filename}")
if content:
# Sanitize content for console output
safe_content = content.encode('ascii', 'replace').decode('ascii')
print(f" {safe_content}...")
def compare_overlap(results1, results2, name1: str, name2: str):
"""Compare result overlap between two methods."""
def get_paths(results):
paths = set()
for r in results[:10]:
if isinstance(r, dict):
paths.add(r.get("path", ""))
else:
paths.add(getattr(r, "path", ""))
return paths
paths1 = get_paths(results1)
paths2 = get_paths(results2)
if not paths1 or not paths2:
return 0.0
overlap = len(paths1 & paths2)
union = len(paths1 | paths2)
jaccard = overlap / union if union > 0 else 0.0
print(f" {name1} vs {name2}: {overlap} common files (Jaccard: {jaccard:.2f})")
return jaccard
def main():
print("=" * 70)
print("SEMANTIC SEARCH METHODS COMPARISON")
print("Binary Cascade vs SPLADE vs Vector Dense")
print("=" * 70)
# Check prerequisites
print("\n[Prerequisites Check]")
print(f" Index Root: {INDEX_ROOT}")
splade_db = find_splade_db(INDEX_ROOT)
print(f" SPLADE DB: {splade_db} - {'EXISTS' if splade_db else 'NOT FOUND'}")
binary_indexes = find_binary_indexes(INDEX_ROOT)
print(f" Binary Indexes: {len(binary_indexes)} found")
for bi in binary_indexes[:3]:
print(f" - {bi.parent.name}/{bi.name}")
if len(binary_indexes) > 3:
print(f" ... and {len(binary_indexes) - 3} more")
# Aggregate statistics
all_results = {
"binary": {"total_results": 0, "total_time": 0, "queries": 0, "errors": []},
"splade": {"total_results": 0, "total_time": 0, "queries": 0, "errors": []},
"vector": {"total_results": 0, "total_time": 0, "queries": 0, "errors": []},
}
overlap_scores = {"binary_splade": [], "binary_vector": [], "splade_vector": []}
for query in TEST_QUERIES:
print(f"\n{'#'*70}")
print(f"QUERY: \"{query}\"")
print("#" * 70)
# Test each method
binary_results, binary_time, binary_err = test_binary_cascade_search(query)
splade_results, splade_time, splade_err = test_splade_search(query)
vector_results, vector_time, vector_err = test_vector_search(query)
# Print results
print_results("Binary Cascade (256-bit + Dense Rerank)", binary_results, binary_time, binary_err)
print_results("SPLADE (Sparse Learned)", splade_results, splade_time, splade_err)
print_results("Vector Dense (Semantic Embeddings)", vector_results, vector_time, vector_err)
# Update statistics
if not binary_err:
all_results["binary"]["total_results"] += len(binary_results)
all_results["binary"]["total_time"] += binary_time
all_results["binary"]["queries"] += 1
else:
all_results["binary"]["errors"].append(binary_err)
if not splade_err:
all_results["splade"]["total_results"] += len(splade_results)
all_results["splade"]["total_time"] += splade_time
all_results["splade"]["queries"] += 1
else:
all_results["splade"]["errors"].append(splade_err)
if not vector_err:
all_results["vector"]["total_results"] += len(vector_results)
all_results["vector"]["total_time"] += vector_time
all_results["vector"]["queries"] += 1
else:
all_results["vector"]["errors"].append(vector_err)
# Compare overlap
print("\n[Result Overlap Analysis]")
if binary_results and splade_results:
j = compare_overlap(binary_results, splade_results, "Binary", "SPLADE")
overlap_scores["binary_splade"].append(j)
if binary_results and vector_results:
j = compare_overlap(binary_results, vector_results, "Binary", "Vector")
overlap_scores["binary_vector"].append(j)
if splade_results and vector_results:
j = compare_overlap(splade_results, vector_results, "SPLADE", "Vector")
overlap_scores["splade_vector"].append(j)
# Print summary
print("\n" + "=" * 70)
print("SUMMARY STATISTICS")
print("=" * 70)
for method, stats in all_results.items():
queries = stats["queries"]
if queries > 0:
avg_results = stats["total_results"] / queries
avg_time = stats["total_time"] / queries
print(f"\n{method.upper()}:")
print(f" Successful queries: {queries}/{len(TEST_QUERIES)}")
print(f" Avg results: {avg_results:.1f}")
print(f" Avg time: {avg_time:.1f}ms")
else:
print(f"\n{method.upper()}: No successful queries")
if stats["errors"]:
# Show truncated error
err = stats["errors"][0]
if len(err) > 200:
err = err[:200] + "..."
print(f" Error: {err}")
print("\n[Average Overlap Scores]")
for pair, scores in overlap_scores.items():
if scores:
avg = sum(scores) / len(scores)
print(f" {pair}: {avg:.3f}")
print("\n" + "=" * 70)
print("ANALYSIS")
print("=" * 70)
# Analyze working methods
working_methods = [m for m, s in all_results.items() if s["queries"] > 0]
if len(working_methods) == 3:
# All methods working - compare quality
print("\nAll three methods working. Quality comparison:")
# Compare avg results
print("\n Result Coverage (higher = more recall):")
for m in ["vector", "splade", "binary"]:
stats = all_results[m]
if stats["queries"] > 0:
avg = stats["total_results"] / stats["queries"]
print(f" {m.upper()}: {avg:.1f} results/query")
# Compare speed
print("\n Speed (lower = faster):")
for m in ["binary", "splade", "vector"]:
stats = all_results[m]
if stats["queries"] > 0:
avg = stats["total_time"] / stats["queries"]
print(f" {m.upper()}: {avg:.1f}ms")
# Recommend fusion strategy
print("\n Recommended Fusion Strategy:")
print(" For quality-focused hybrid search:")
print(" 1. Run all three in parallel")
print(" 2. Use RRF fusion with weights:")
print(" - Vector: 0.4 (best semantic understanding)")
print(" - SPLADE: 0.35 (learned sparse representations)")
print(" - Binary: 0.25 (fast coarse filtering)")
print(" 3. Apply CrossEncoder reranking on top-50")
elif len(working_methods) >= 2:
print(f"\n{len(working_methods)} methods working: {', '.join(working_methods)}")
print("Consider fixing missing method for complete hybrid search.")
else:
print(f"\nOnly {working_methods[0] if working_methods else 'no'} method(s) working.")
print("Check your index setup.")
if __name__ == "__main__":
main()
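The fusion strategy recommended in the ANALYSIS output amounts to weighted reciprocal rank fusion. A minimal, self-contained sketch (a hypothetical helper, not the codexlens implementation; inputs are simplified to ranked path lists):

def weighted_rrf(results_map, weights, k=60, top_n=50):
    """Fuse ranked lists: score(path) = sum over methods of w / (k + rank)."""
    scores = {}
    for method, ranked_paths in results_map.items():
        w = weights.get(method, 0.0)
        for rank, path in enumerate(ranked_paths, start=1):
            scores[path] = scores.get(path, 0.0) + w / (k + rank)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)[:top_n]

# Toy ranked lists standing in for the three methods' outputs
fused = weighted_rrf(
    {
        "vector": ["engine.py", "store.py", "README.md"],
        "splade": ["store.py", "engine.py"],
        "binary": ["engine.py", "cli.py"],
    },
    weights={"vector": 0.4, "splade": 0.35, "binary": 0.25},
)
print(fused[0][0])  # engine.py wins: it ranks highly across all three methods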

View File

@@ -17,6 +17,20 @@ except ImportError:
def is_embedding_backend_available(_backend: str): # type: ignore[no-redef]
return False, "codexlens.semantic not available"
try:
from codexlens.search.ranking import get_file_category
except ImportError:
def get_file_category(path: str): # type: ignore[no-redef]
"""Fallback: map common extensions to category."""
ext = Path(path).suffix.lower()
code_exts = {".py", ".js", ".jsx", ".ts", ".tsx", ".java", ".go", ".c", ".cpp", ".rs"}
doc_exts = {".md", ".mdx", ".txt", ".rst"}
if ext in code_exts:
return "code"
elif ext in doc_exts:
return "doc"
return None
logger = logging.getLogger(__name__)
# Embedding batch size - larger values improve throughput on modern hardware
@@ -24,6 +38,22 @@ logger = logging.getLogger(__name__)
EMBEDDING_BATCH_SIZE = 256
def _build_categories_from_batch(chunk_batch: List[Tuple[Any, str]]) -> List[str]:
"""Build categories list from chunk batch for index-level category filtering.
Args:
chunk_batch: List of (chunk, file_path) tuples
Returns:
List of category strings ('code' or 'doc'), defaulting to 'code' for unknown
"""
categories = []
for _, file_path in chunk_batch:
cat = get_file_category(file_path)
categories.append(cat if cat else "code") # Default to 'code' for unknown extensions
return categories
def _cleanup_fastembed_resources() -> None:
"""Best-effort cleanup for fastembed/ONNX resources (no-op for other backends)."""
try:
@@ -577,8 +607,9 @@ def generate_embeddings(
batch_contents = [chunk.content for chunk, _ in chunk_batch]
embeddings_numpy = embedder.embed_to_numpy(batch_contents, batch_size=EMBEDDING_BATCH_SIZE)
# Store embeddings
vector_store.add_chunks_batch_numpy(chunk_batch, embeddings_numpy)
# Store embeddings with category
categories = _build_categories_from_batch(chunk_batch)
vector_store.add_chunks_batch_numpy(chunk_batch, embeddings_numpy, categories=categories)
files_seen.update(batch_files)
total_chunks_created += len(chunk_batch)
@@ -630,7 +661,8 @@ def generate_embeddings(
batch_num, chunk_batch, embeddings_numpy, batch_files, error = f.result()
if embeddings_numpy is not None and error is None:
# Write to DB in main thread (no contention)
vector_store.add_chunks_batch_numpy(chunk_batch, embeddings_numpy)
categories = _build_categories_from_batch(chunk_batch)
vector_store.add_chunks_batch_numpy(chunk_batch, embeddings_numpy, categories=categories)
total_chunks_created += len(chunk_batch)
files_seen.update(batch_files)
total_files_processed = len(files_seen)
@@ -667,7 +699,8 @@ def generate_embeddings(
try:
batch_num, chunk_batch, embeddings_numpy, batch_files, error = future.result()
if embeddings_numpy is not None and error is None:
vector_store.add_chunks_batch_numpy(chunk_batch, embeddings_numpy)
categories = _build_categories_from_batch(chunk_batch)
vector_store.add_chunks_batch_numpy(chunk_batch, embeddings_numpy, categories=categories)
total_chunks_created += len(chunk_batch)
files_seen.update(batch_files)
total_files_processed = len(files_seen)
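For a concrete sense of the tagging, here is a small sketch of what `_build_categories_from_batch` returns (run in the context of this module; the chunk objects are stand-ins since only the file path is inspected):

from types import SimpleNamespace

chunk = SimpleNamespace(content="def foo(): ...")  # stand-in for a real chunk
chunk_batch = [
    (chunk, "src/codexlens/search/engine.py"),
    (chunk, "docs/usage.md"),
    (chunk, "config.yaml"),  # extension not in either category map
]
print(_build_categories_from_batch(chunk_batch))
# ['code', 'doc', 'code'] -- unknown extensions default to 'code'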

View File

@@ -67,15 +67,21 @@ class Config:
venv_path: Path = field(default_factory=lambda: _default_global_dir() / "venv")
supported_languages: Dict[str, Dict[str, Any]] = field(
default_factory=lambda: {
"python": {"extensions": [".py"], "tree_sitter_language": "python"},
"javascript": {"extensions": [".js", ".jsx"], "tree_sitter_language": "javascript"},
"typescript": {"extensions": [".ts", ".tsx"], "tree_sitter_language": "typescript"},
"java": {"extensions": [".java"], "tree_sitter_language": "java"},
"go": {"extensions": [".go"], "tree_sitter_language": "go"},
"zig": {"extensions": [".zig"], "tree_sitter_language": "zig"},
"objective-c": {"extensions": [".m", ".mm"], "tree_sitter_language": "objc"},
"markdown": {"extensions": [".md", ".mdx"], "tree_sitter_language": None},
"text": {"extensions": [".txt"], "tree_sitter_language": None},
# Source code languages (category: "code")
"python": {"extensions": [".py"], "tree_sitter_language": "python", "category": "code"},
"javascript": {"extensions": [".js", ".jsx"], "tree_sitter_language": "javascript", "category": "code"},
"typescript": {"extensions": [".ts", ".tsx"], "tree_sitter_language": "typescript", "category": "code"},
"java": {"extensions": [".java"], "tree_sitter_language": "java", "category": "code"},
"go": {"extensions": [".go"], "tree_sitter_language": "go", "category": "code"},
"zig": {"extensions": [".zig"], "tree_sitter_language": "zig", "category": "code"},
"objective-c": {"extensions": [".m", ".mm"], "tree_sitter_language": "objc", "category": "code"},
"c": {"extensions": [".c", ".h"], "tree_sitter_language": "c", "category": "code"},
"cpp": {"extensions": [".cc", ".cpp", ".hpp", ".cxx"], "tree_sitter_language": "cpp", "category": "code"},
"rust": {"extensions": [".rs"], "tree_sitter_language": "rust", "category": "code"},
# Documentation languages (category: "doc")
"markdown": {"extensions": [".md", ".mdx"], "tree_sitter_language": None, "category": "doc"},
"text": {"extensions": [".txt"], "tree_sitter_language": None, "category": "doc"},
"rst": {"extensions": [".rst"], "tree_sitter_language": None, "category": "doc"},
}
)
parsing_rules: Dict[str, Dict[str, Any]] = field(
@@ -141,6 +147,9 @@ class Config:
fusion_method: str = "rrf" # "simple" (weighted sum) or "rrf" (reciprocal rank fusion)
rrf_k: int = 60 # RRF constant (default 60)
# Category-based filtering to separate code/doc results
enable_category_filter: bool = True # Enable code/doc result separation
# Multi-endpoint configuration for litellm backend
embedding_endpoints: List[Dict[str, Any]] = field(default_factory=list)
# List of endpoint configs: [{"model": "...", "api_key": "...", "api_base": "...", "weight": 1.0}]
@@ -210,6 +219,14 @@ class Config:
return language_id
return None
def category_for_path(self, path: str | Path) -> str | None:
"""Get file category ('code' or 'doc') from a file path."""
language = self.language_for_path(path)
if language is None:
return None
spec = self.supported_languages.get(language, {})
return spec.get("category")
def rules_for_language(self, language_id: str) -> Dict[str, Any]:
"""Get parsing rules for a specific language, falling back to defaults."""
return {**self.parsing_rules.get("default", {}), **self.parsing_rules.get(language_id, {})}
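A quick illustration of the new helper, assuming a `Config` instance with the defaults above is in scope:

config = Config()
print(config.category_for_path("src/main.rs"))    # "code"
print(config.category_for_path("docs/intro.md"))  # "doc"
print(config.category_for_path("notes.xyz"))      # None: unsupported extension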

View File

@@ -35,8 +35,11 @@ from codexlens.entities import SearchResult
from codexlens.search.ranking import (
DEFAULT_WEIGHTS,
FTS_FALLBACK_WEIGHTS,
QueryIntent,
apply_symbol_boost,
cross_encoder_rerank,
detect_query_intent,
filter_results_by_category,
get_rrf_weights,
reciprocal_rank_fusion,
rerank_results,
@@ -131,6 +134,16 @@ class HybridSearchEngine:
except OSError:
return []
# Detect query intent early for category filtering at index level
query_intent = detect_query_intent(query)
# Map intent to category for vector search:
# - KEYWORD (code intent) -> filter to 'code' only
# - SEMANTIC (doc intent) -> no filter (allow docs to surface)
# - MIXED -> no filter (allow all)
vector_category: Optional[str] = None
if query_intent == QueryIntent.KEYWORD:
vector_category = "code"
# Determine which backends to use
backends = {}
@@ -183,7 +196,7 @@ class HybridSearchEngine:
# Execute parallel searches
with timer("parallel_search_total", self.logger):
results_map = self._search_parallel(index_path, query, backends, limit)
results_map = self._search_parallel(index_path, query, backends, limit, vector_category)
# Provide helpful message if pure-vector mode returns no results
if pure_vector and enable_vector and len(results_map.get("vector", [])) == 0:
@@ -263,6 +276,19 @@ class HybridSearchEngine:
top_k=self._config.reranker_top_k,
)
# Apply category filtering to avoid code/doc pollution
# This ensures KEYWORD queries return code files, SEMANTIC queries prefer docs
enable_category_filter = (
self._config is None
or getattr(self._config, 'enable_category_filter', True)
)
if enable_category_filter and not pure_vector:
with timer("category_filter", self.logger):
query_intent = detect_query_intent(query)
fused_results = filter_results_by_category(
fused_results, query_intent, allow_mixed=True
)
# Apply final limit
return fused_results[:limit]
@@ -361,6 +387,7 @@ class HybridSearchEngine:
query: str,
backends: Dict[str, bool],
limit: int,
category: Optional[str] = None,
) -> Dict[str, List[SearchResult]]:
"""Execute parallel searches across enabled backends.
@@ -369,6 +396,7 @@ class HybridSearchEngine:
query: FTS5 query string
backends: Dictionary of backend name to enabled flag
limit: Results limit per backend
category: Optional category filter for vector search ('code' or 'doc')
Returns:
Dictionary mapping source name to results list
@@ -399,7 +427,7 @@ class HybridSearchEngine:
if backends.get("vector"):
submit_times["vector"] = time.perf_counter()
future = executor.submit(
self._search_vector, index_path, query, limit
self._search_vector, index_path, query, limit, category
)
future_to_source[future] = "vector"
@@ -490,7 +518,7 @@ class HybridSearchEngine:
return []
def _search_vector(
self, index_path: Path, query: str, limit: int
self, index_path: Path, query: str, limit: int, category: Optional[str] = None
) -> List[SearchResult]:
"""Execute vector similarity search using semantic embeddings.
@@ -498,6 +526,7 @@ class HybridSearchEngine:
index_path: Path to _index.db file
query: Natural language query string
limit: Maximum results
category: Optional category filter ('code' or 'doc')
Returns:
List of SearchResult objects ordered by semantic similarity
@@ -616,6 +645,7 @@ class HybridSearchEngine:
top_k=limit,
min_score=0.0, # Return all results, let RRF handle filtering
return_full_content=True,
category=category,
)
self.logger.debug(
"[TIMING] vector_similarity_search: %.2fms (%d results)",

View File

@@ -9,7 +9,8 @@ from __future__ import annotations
import re
import math
from enum import Enum
from typing import Any, Dict, List
from pathlib import Path
from typing import Any, Dict, List, Optional
from codexlens.entities import SearchResult, AdditionalLocation
@@ -132,6 +133,83 @@ def get_rrf_weights(
return adjust_weights_by_intent(detect_query_intent(query), base_weights)
# File extensions to category mapping for fast lookup
_EXT_TO_CATEGORY: Dict[str, str] = {
# Code extensions
".py": "code", ".js": "code", ".jsx": "code", ".ts": "code", ".tsx": "code",
".java": "code", ".go": "code", ".zig": "code", ".m": "code", ".mm": "code",
".c": "code", ".h": "code", ".cc": "code", ".cpp": "code", ".hpp": "code", ".cxx": "code",
".rs": "code",
# Doc extensions
".md": "doc", ".mdx": "doc", ".txt": "doc", ".rst": "doc",
}
def get_file_category(path: str) -> Optional[str]:
"""Get file category ('code' or 'doc') from path extension.
Args:
path: File path string
Returns:
'code', 'doc', or None if unknown
"""
ext = Path(path).suffix.lower()
return _EXT_TO_CATEGORY.get(ext)
def filter_results_by_category(
results: List[SearchResult],
intent: QueryIntent,
allow_mixed: bool = True,
) -> List[SearchResult]:
"""Filter results by category based on query intent.
Strategy:
- KEYWORD (code intent): Only return code files
- SEMANTIC (doc intent): Prefer docs, but allow code if allow_mixed=True
- MIXED: Return all results
Args:
results: List of SearchResult objects
intent: Query intent from detect_query_intent()
allow_mixed: If True, SEMANTIC intent includes code files with lower priority
Returns:
Filtered and re-ranked list of SearchResult objects
"""
if not results or intent == QueryIntent.MIXED:
return results
code_results = []
doc_results = []
unknown_results = []
for r in results:
category = get_file_category(r.path)
if category == "code":
code_results.append(r)
elif category == "doc":
doc_results.append(r)
else:
unknown_results.append(r)
if intent == QueryIntent.KEYWORD:
# Code intent: return only code files + unknown (might be code)
filtered = code_results + unknown_results
elif intent == QueryIntent.SEMANTIC:
if allow_mixed:
# Semantic intent with mixed: docs first, then code
filtered = doc_results + code_results + unknown_results
else:
# Semantic intent strict: only docs
filtered = doc_results + unknown_results
else:
filtered = results
return filtered
def simple_weighted_fusion(
results_map: Dict[str, List[SearchResult]],
weights: Dict[str, float] = None,
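To see the reordering in practice, a small sketch with stand-in result objects (`SimpleNamespace` substitutes for `SearchResult`, which only needs a `path` attribute here):

from types import SimpleNamespace

from codexlens.search.ranking import QueryIntent, filter_results_by_category

hits = [SimpleNamespace(path=p) for p in ("README.md", "engine.py", "Makefile")]

# KEYWORD intent keeps code plus unknown extensions (they might be code)
print([r.path for r in filter_results_by_category(hits, QueryIntent.KEYWORD)])
# ['engine.py', 'Makefile']

# SEMANTIC intent with allow_mixed=True reorders docs ahead of code
print([r.path for r in filter_results_by_category(hits, QueryIntent.SEMANTIC)])
# ['README.md', 'engine.py', 'Makefile']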

View File

@@ -155,6 +155,7 @@ class VectorStore:
content TEXT NOT NULL,
embedding BLOB NOT NULL,
metadata TEXT,
category TEXT DEFAULT 'code',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
@@ -162,6 +163,10 @@ class VectorStore:
CREATE INDEX IF NOT EXISTS idx_chunks_file
ON semantic_chunks(file_path)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_chunks_category
ON semantic_chunks(category)
""")
# Model configuration table - tracks which model generated the embeddings
conn.execute("""
CREATE TABLE IF NOT EXISTS embeddings_config (
@@ -177,6 +182,8 @@ class VectorStore:
# Migration: Add backend column to existing tables
self._migrate_backend_column(conn)
# Migration: Add category column
self._migrate_category_column(conn)
conn.commit()
@@ -197,6 +204,28 @@ class VectorStore:
ADD COLUMN backend TEXT NOT NULL DEFAULT 'fastembed'
""")
def _migrate_category_column(self, conn: sqlite3.Connection) -> None:
"""Add category column to existing semantic_chunks table if not present.
Args:
conn: Active SQLite connection
"""
# Check if category column exists
cursor = conn.execute("PRAGMA table_info(semantic_chunks)")
columns = [row[1] for row in cursor.fetchall()]
if 'category' not in columns:
logger.info("Migrating semantic_chunks table: adding category column")
conn.execute("""
ALTER TABLE semantic_chunks
ADD COLUMN category TEXT DEFAULT 'code'
""")
# Create index for fast category filtering
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_chunks_category
ON semantic_chunks(category)
""")
def _init_ann_index(self) -> None:
"""Initialize ANN index (lazy loading from existing data)."""
if not HNSWLIB_AVAILABLE:
@@ -390,9 +419,16 @@ class VectorStore:
self._ann_index = None
return False
def add_chunk(self, chunk: SemanticChunk, file_path: str) -> int:
def add_chunk(
self, chunk: SemanticChunk, file_path: str, category: str = "code"
) -> int:
"""Add a single chunk with its embedding.
Args:
chunk: SemanticChunk with embedding
file_path: Path to the source file
category: File category ('code' or 'doc'), default 'code'
Returns:
The inserted chunk ID.
"""
@@ -406,10 +442,10 @@ class VectorStore:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"""
INSERT INTO semantic_chunks (file_path, content, embedding, metadata)
VALUES (?, ?, ?, ?)
INSERT INTO semantic_chunks (file_path, content, embedding, metadata, category)
VALUES (?, ?, ?, ?, ?)
""",
(file_path, chunk.content, embedding_blob, metadata_json)
(file_path, chunk.content, embedding_blob, metadata_json, category)
)
conn.commit()
chunk_id = cursor.lastrowid or 0
@@ -427,9 +463,16 @@ class VectorStore:
self._invalidate_cache()
return chunk_id
def add_chunks(self, chunks: List[SemanticChunk], file_path: str) -> List[int]:
def add_chunks(
self, chunks: List[SemanticChunk], file_path: str, category: str = "code"
) -> List[int]:
"""Add multiple chunks with embeddings (batch insert).
Args:
chunks: List of SemanticChunk objects with embeddings
file_path: Path to the source file
category: File category ('code' or 'doc'), default 'code'
Returns:
List of inserted chunk IDs.
"""
@@ -445,7 +488,7 @@ class VectorStore:
embedding_arr = np.array(chunk.embedding, dtype=np.float32)
embedding_blob = embedding_arr.tobytes()
metadata_json = json.dumps(chunk.metadata) if chunk.metadata else None
batch_data.append((file_path, chunk.content, embedding_blob, metadata_json))
batch_data.append((file_path, chunk.content, embedding_blob, metadata_json, category))
embeddings_list.append(embedding_arr)
# Batch insert to SQLite
@@ -456,8 +499,8 @@ class VectorStore:
conn.executemany(
"""
INSERT INTO semantic_chunks (file_path, content, embedding, metadata)
VALUES (?, ?, ?, ?)
INSERT INTO semantic_chunks (file_path, content, embedding, metadata, category)
VALUES (?, ?, ?, ?, ?)
""",
batch_data
)
@@ -484,6 +527,7 @@ class VectorStore:
chunks_with_paths: List[Tuple[SemanticChunk, str]],
update_ann: bool = True,
auto_save_ann: bool = True,
categories: Optional[List[str]] = None,
) -> List[int]:
"""Batch insert chunks from multiple files in a single transaction.
@@ -494,6 +538,8 @@ class VectorStore:
update_ann: If True, update ANN index with new vectors (default: True)
auto_save_ann: If True, save ANN index after update (default: True).
Set to False for bulk inserts to reduce I/O overhead.
categories: Optional list of categories per chunk. If None, defaults to 'code'.
If provided, must match length of chunks_with_paths.
Returns:
List of inserted chunk IDs
@@ -503,10 +549,17 @@ class VectorStore:
batch_size = len(chunks_with_paths)
# Validate categories if provided
if categories is not None and len(categories) != batch_size:
raise ValueError(
f"categories length ({len(categories)}) must match "
f"chunks_with_paths length ({batch_size})"
)
# Prepare batch data
batch_data = []
embeddings_list = []
for chunk, file_path in chunks_with_paths:
for i, (chunk, file_path) in enumerate(chunks_with_paths):
if chunk.embedding is None:
raise ValueError("All chunks must have embeddings")
# Optimize: avoid repeated np.array() if already numpy
@@ -516,7 +569,8 @@ class VectorStore:
embedding_arr = np.array(chunk.embedding, dtype=np.float32)
embedding_blob = embedding_arr.tobytes()
metadata_json = json.dumps(chunk.metadata) if chunk.metadata else None
batch_data.append((file_path, chunk.content, embedding_blob, metadata_json))
category = categories[i] if categories else "code"
batch_data.append((file_path, chunk.content, embedding_blob, metadata_json, category))
embeddings_list.append(embedding_arr)
# Batch insert to SQLite in single transaction
@@ -529,8 +583,8 @@ class VectorStore:
conn.executemany(
"""
INSERT INTO semantic_chunks (file_path, content, embedding, metadata)
VALUES (?, ?, ?, ?)
INSERT INTO semantic_chunks (file_path, content, embedding, metadata, category)
VALUES (?, ?, ?, ?, ?)
""",
batch_data
)
@@ -565,6 +619,7 @@ class VectorStore:
embeddings_matrix: np.ndarray,
update_ann: bool = True,
auto_save_ann: bool = True,
categories: Optional[List[str]] = None,
) -> List[int]:
"""Batch insert chunks with pre-computed numpy embeddings matrix.
@@ -576,6 +631,7 @@ class VectorStore:
embeddings_matrix: Pre-computed embeddings as (N, D) numpy array
update_ann: If True, update ANN index with new vectors (default: True)
auto_save_ann: If True, save ANN index after update (default: True)
categories: Optional list of categories per chunk. If None, defaults to 'code'.
Returns:
List of inserted chunk IDs
@@ -591,6 +647,13 @@ class VectorStore:
f"{embeddings_matrix.shape[0]} embeddings"
)
# Validate categories if provided
if categories is not None and len(categories) != batch_size:
raise ValueError(
f"categories length ({len(categories)}) must match "
f"chunks_with_paths length ({batch_size})"
)
# Ensure float32 format
embeddings_matrix = embeddings_matrix.astype(np.float32)
@@ -600,7 +663,8 @@ class VectorStore:
embedding_arr = embeddings_matrix[i]
embedding_blob = embedding_arr.tobytes()
metadata_json = json.dumps(chunk.metadata) if chunk.metadata else None
batch_data.append((file_path, chunk.content, embedding_blob, metadata_json))
category = categories[i] if categories else "code"
batch_data.append((file_path, chunk.content, embedding_blob, metadata_json, category))
# Batch insert to SQLite in single transaction
with sqlite3.connect(self.db_path) as conn:
@@ -612,8 +676,8 @@ class VectorStore:
conn.executemany(
"""
INSERT INTO semantic_chunks (file_path, content, embedding, metadata)
VALUES (?, ?, ?, ?)
INSERT INTO semantic_chunks (file_path, content, embedding, metadata, category)
VALUES (?, ?, ?, ?, ?)
""",
batch_data
)
@@ -765,6 +829,7 @@ class VectorStore:
top_k: int = 10,
min_score: float = 0.0,
return_full_content: bool = True,
category: Optional[str] = None,
) -> List[SearchResult]:
"""Find chunks most similar to query embedding.
@@ -776,6 +841,7 @@ class VectorStore:
top_k: Maximum results to return.
min_score: Minimum cosine similarity score in [0.0, 1.0].
return_full_content: If True, return full code block content.
category: Optional category filter ('code' or 'doc'). If None, returns all.
Returns:
List of SearchResult ordered by similarity (highest first).
@@ -796,14 +862,14 @@ class VectorStore:
):
try:
return self._search_with_ann(
query_vec, top_k, min_score, return_full_content
query_vec, top_k, min_score, return_full_content, category
)
except Exception as e:
logger.warning("ANN search failed, falling back to brute-force: %s", e)
# Fallback to brute-force search (O(N))
return self._search_brute_force(
query_vec, top_k, min_score, return_full_content
query_vec, top_k, min_score, return_full_content, category
)
def _search_with_ann(
@@ -812,6 +878,7 @@ class VectorStore:
top_k: int,
min_score: float,
return_full_content: bool,
category: Optional[str] = None,
) -> List[SearchResult]:
"""Search using HNSW index (O(log N)).
@@ -820,13 +887,16 @@ class VectorStore:
top_k: Maximum results to return
min_score: Minimum cosine similarity score in [0.0, 1.0]
return_full_content: If True, return full code block content
category: Optional category filter ('code' or 'doc')
Returns:
List of SearchResult ordered by similarity (highest first)
"""
# Limit top_k to available vectors to prevent hnswlib error
ann_count = self._ann_index.count()
effective_top_k = min(top_k, ann_count) if ann_count > 0 else 0
# When category filtering, fetch more candidates to compensate for filtering
fetch_k = top_k * 3 if category else top_k
effective_top_k = min(fetch_k, ann_count) if ann_count > 0 else 0
if effective_top_k == 0:
return []
@@ -875,8 +945,12 @@ class VectorStore:
top_ids = [f[0] for f in filtered]
top_scores = [f[1] for f in filtered]
# Fetch content from SQLite
return self._fetch_results_by_ids(top_ids, top_scores, return_full_content)
# Fetch content from SQLite with category filtering
results = self._fetch_results_by_ids(
top_ids, top_scores, return_full_content, category
)
# Apply final limit after category filtering
return results[:top_k]
def _search_brute_force(
self,
@@ -884,6 +958,7 @@ class VectorStore:
top_k: int,
min_score: float,
return_full_content: bool,
category: Optional[str] = None,
) -> List[SearchResult]:
"""Brute-force search using NumPy (O(N) fallback).
@@ -892,6 +967,7 @@ class VectorStore:
top_k: Maximum results to return
min_score: Minimum cosine similarity score in [0.0, 1.0]
return_full_content: If True, return full code block content
category: Optional category filter ('code' or 'doc')
Returns:
List of SearchResult ordered by similarity (highest first)
@@ -926,27 +1002,31 @@ class VectorStore:
if len(valid_indices) == 0:
return []
# Sort by score descending and take top_k
# When category filtering, fetch more candidates to compensate for filtering
fetch_k = top_k * 3 if category else top_k
# Sort by score descending and take top candidates
valid_scores = scores[valid_indices]
sorted_order = np.argsort(valid_scores)[::-1][:top_k]
sorted_order = np.argsort(valid_scores)[::-1][:fetch_k]
top_indices = valid_indices[sorted_order]
top_scores = valid_scores[sorted_order]
# Get chunk IDs for top results
top_ids = [self._chunk_ids[i] for i in top_indices]
# Fetch content only for top-k results (lazy loading)
# Fetch content only for top-k results (lazy loading) with category filtering
results = self._fetch_results_by_ids(
top_ids, top_scores.tolist(), return_full_content
top_ids, top_scores.tolist(), return_full_content, category
)
return results
# Apply final limit after category filtering
return results[:top_k]
def _fetch_results_by_ids(
self,
chunk_ids: List[int],
scores: List[float],
return_full_content: bool,
category: Optional[str] = None,
) -> List[SearchResult]:
"""Fetch full result data for specific chunk IDs.
@@ -954,6 +1034,7 @@ class VectorStore:
chunk_ids: List of chunk IDs to fetch.
scores: Corresponding similarity scores.
return_full_content: Whether to include full content.
category: Optional category filter ('code' or 'doc').
Returns:
List of SearchResult objects.
@@ -968,15 +1049,25 @@ class VectorStore:
# SQL injection prevention:
# - Only a validated placeholders string (commas + '?') is interpolated into the query.
# - User-provided values are passed separately via sqlite3 parameters.
# - Category filter is added as a separate parameter
if category:
query = """
SELECT id, file_path, content, metadata
FROM semantic_chunks
WHERE id IN ({placeholders}) AND category = ?
""".format(placeholders=placeholders)
params = list(chunk_ids) + [category]
else:
query = """
SELECT id, file_path, content, metadata
FROM semantic_chunks
WHERE id IN ({placeholders})
""".format(placeholders=placeholders)
params = chunk_ids
with sqlite3.connect(self.db_path) as conn:
conn.execute("PRAGMA mmap_size = 30000000000")
rows = conn.execute(query, chunk_ids).fetchall()
rows = conn.execute(query, params).fetchall()
# Build ID -> row mapping
id_to_row = {r[0]: r for r in rows}
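Put together, the write and read paths look roughly like this (an illustrative sketch; `doc_chunks` and `query_vec` are placeholders for real `SemanticChunk` objects and a query embedding):

store = VectorStore(Path("_index.db"))

# Write path: tag a whole file's chunks with its category
store.add_chunks(doc_chunks, "docs/guide.md", category="doc")

# Read path: restrict similarity search to documentation chunks;
# the store over-fetches 3x candidates, filters by category, then re-limits
results = store.search_similar(query_embedding=query_vec, top_k=10, category="doc")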

View File

@@ -30,8 +30,22 @@ class WatcherConfig:
"""Configuration for file watcher."""
debounce_ms: int = 1000
ignored_patterns: Set[str] = field(default_factory=lambda: {
".git", ".venv", "venv", "node_modules",
"__pycache__", ".codexlens", ".idea", ".vscode",
# Version control
".git", ".svn", ".hg",
# Python environments & cache
".venv", "venv", "env", "__pycache__", ".pytest_cache", ".mypy_cache", ".ruff_cache",
# Node.js
"node_modules", "bower_components", ".npm", ".yarn",
# Build artifacts
"dist", "build", "out", "target", "bin", "obj", "_build", "coverage", "htmlcov",
# IDE & Editor
".idea", ".vscode", ".vs", ".eclipse",
# CodexLens internal
".codexlens",
        # Tool & framework caches
        ".cache", ".parcel-cache", ".turbo", ".next", ".nuxt",
# Logs & temp
"logs", "tmp", "temp",
})
languages: Optional[List[str]] = None # None = all supported
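Because `ignored_patterns` is a plain set field, the defaults can be extended or trimmed per project. A brief sketch, assuming the dataclass above:

config = WatcherConfig(debounce_ms=500)
config.ignored_patterns |= {"vendor", ".tox"}  # project-specific additions
config.ignored_patterns.discard("logs")        # re-include a default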