Implement ANN index using HNSW algorithm and update related tests

- Added ANNIndex class for approximate nearest neighbor search using HNSW.
- Integrated the ANN index with VectorStore: O(log N) HNSW search with a brute-force fallback (see the usage sketch below).
- Added a test suite for the ANN index covering adding, searching, saving, and loading vectors.
- Updated existing tests to reflect the new search-performance characteristics.
- Improved error handling for file operations in tests to ensure compatibility with Windows file locks.
- Adjusted hybrid search performance assertions for increased stability in CI environments.
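
A minimal usage sketch of the store-level API added here (all names are taken from the diff below; the index path is illustrative and the semantic extras plus hnswlib are assumed to be installed):

# Hedged sketch of the new ANN-accelerated VectorStore.
from codexlens.semantic.vector_store import VectorStore

store = VectorStore("project/_index.db")   # HNSW file: project/_index_vectors.hnsw
store.rebuild_ann_index()                  # migrate pre-existing SQLite rows into HNSW
print(store.ann_available, store.ann_count)
store.close()                              # releases SQLite + HNSW resources (Windows-safe)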
Author: catlog22
Date:   2025-12-19 10:35:29 +08:00
Parent: 9f6e6852da
Commit: 5e91ba6c60
15 changed files with 1463 additions and 172 deletions


@@ -260,7 +260,6 @@ class HybridSearchEngine:
from codexlens.semantic.embedder import Embedder
from codexlens.semantic.vector_store import VectorStore
-embedder = Embedder(profile="code") # Use code-optimized model
vector_store = VectorStore(index_path)
# Check if vector store has data
@@ -272,6 +271,22 @@ class HybridSearchEngine:
)
return []
# Auto-detect embedding dimension and select appropriate profile
detected_dim = vector_store.dimension
if detected_dim is None:
self.logger.info("Vector store dimension unknown, using default profile")
profile = "code" # Default fallback
elif detected_dim == 384:
profile = "fast"
elif detected_dim == 768:
profile = "code"
elif detected_dim == 1024:
profile = "multilingual" # or balanced, both are 1024
else:
profile = "code" # Default fallback
embedder = Embedder(profile=profile)
# Generate query embedding
query_embedding = embedder.embed_single(query)

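For reference, the dimension-to-profile mapping added above can be exercised in isolation; a small sketch (the helper name is hypothetical, the mapping mirrors the hunk):

# Hypothetical helper mirroring the auto-detection logic added above.
def select_profile(detected_dim):
    if detected_dim == 384:
        return "fast"
    if detected_dim == 768:
        return "code"
    if detected_dim == 1024:
        return "multilingual"  # or "balanced" - both are 1024-dim
    return "code"  # default fallback, also used when the dimension is unknown

assert select_profile(384) == "fast"
assert select_profile(None) == "code"
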
codexlens/semantic/ann_index.py (new file)

@@ -0,0 +1,310 @@
"""Approximate Nearest Neighbor (ANN) index using HNSW algorithm.
Provides O(log N) similarity search using hnswlib's Hierarchical Navigable Small World graphs.
VectorStore falls back to brute-force search when hnswlib is not available; ANNIndex itself requires hnswlib.
Key features:
- HNSW index for fast approximate nearest neighbor search
- Persistent index storage (saved alongside SQLite database)
- Incremental vector addition and deletion
- Thread-safe operations
- Cosine similarity metric
"""
from __future__ import annotations
import threading
from pathlib import Path
from typing import List, Optional, Tuple
from codexlens.errors import StorageError
from . import SEMANTIC_AVAILABLE
if SEMANTIC_AVAILABLE:
import numpy as np
# Try to import hnswlib (optional dependency)
try:
import hnswlib
HNSWLIB_AVAILABLE = True
except ImportError:
HNSWLIB_AVAILABLE = False
class ANNIndex:
"""HNSW-based approximate nearest neighbor index for vector similarity search.
Performance characteristics:
- Build time: O(N log N) where N is number of vectors
- Search time: O(log N) approximate
- Memory: ~(d * 4 + M * 2 * 4) bytes per vector (M=16, d=dimension)
Index parameters:
- space: cosine (cosine similarity metric)
- M: 16 (max connections per node - balance between speed and recall)
- ef_construction: 200 (search width during build - higher = better quality)
- ef: 50 (search width during query - higher = better recall)
"""
def __init__(self, index_path: Path, dim: int) -> None:
"""Initialize ANN index.
Args:
index_path: Path to SQLite database (index is saved alongside as <db_stem>_vectors.hnsw)
dim: Dimension of embedding vectors
Raises:
ImportError: If required dependencies are not available
ValueError: If dimension is invalid
"""
if not SEMANTIC_AVAILABLE:
raise ImportError(
"Semantic search dependencies not available. "
"Install with: pip install codexlens[semantic]"
)
if not HNSWLIB_AVAILABLE:
raise ImportError(
"hnswlib is required for ANN index. "
"Install with: pip install hnswlib"
)
if dim <= 0:
raise ValueError(f"Invalid dimension: {dim}")
self.index_path = Path(index_path)
self.dim = dim
# Derive HNSW index path from database path
# e.g., /path/to/_index.db -> /path/to/_index_vectors.hnsw
# This ensures unique HNSW files for each database
db_stem = self.index_path.stem # e.g., "_index" or "tmp123"
self.hnsw_path = self.index_path.parent / f"{db_stem}_vectors.hnsw"
# HNSW parameters
self.space = "cosine" # Cosine similarity metric
self.M = 16 # Max connections per node (16 is good balance)
self.ef_construction = 200 # Build-time search width (higher = better quality)
self.ef = 50 # Query-time search width (higher = better recall)
# Thread safety
self._lock = threading.RLock()
# HNSW index instance
self._index: Optional[hnswlib.Index] = None
self._max_elements = 1000000 # Initial capacity (auto-resizes)
self._current_count = 0 # Track number of vectors
def _ensure_index(self) -> None:
"""Ensure HNSW index is initialized (lazy initialization)."""
if self._index is None:
self._index = hnswlib.Index(space=self.space, dim=self.dim)
self._index.init_index(
max_elements=self._max_elements,
ef_construction=self.ef_construction,
M=self.M,
)
self._index.set_ef(self.ef)
self._current_count = 0
def add_vectors(self, ids: List[int], vectors: np.ndarray) -> None:
"""Add vectors to the index.
Args:
ids: List of vector IDs (must be unique)
vectors: Numpy array of shape (N, dim) where N = len(ids)
Raises:
ValueError: If shapes don't match or vectors are invalid
StorageError: If index operation fails
"""
if len(ids) == 0:
return
if vectors.shape[0] != len(ids):
raise ValueError(
f"Number of vectors ({vectors.shape[0]}) must match number of IDs ({len(ids)})"
)
if vectors.shape[1] != self.dim:
raise ValueError(
f"Vector dimension ({vectors.shape[1]}) must match index dimension ({self.dim})"
)
with self._lock:
try:
self._ensure_index()
# Resize index if needed
if self._current_count + len(ids) > self._max_elements:
new_max = max(
self._max_elements * 2,
self._current_count + len(ids)
)
self._index.resize_index(new_max)
self._max_elements = new_max
# Ensure vectors are C-contiguous float32 (hnswlib requirement)
if not vectors.flags['C_CONTIGUOUS'] or vectors.dtype != np.float32:
vectors = np.ascontiguousarray(vectors, dtype=np.float32)
# Add vectors to index
self._index.add_items(vectors, ids)
self._current_count += len(ids)
except Exception as e:
raise StorageError(f"Failed to add vectors to ANN index: {e}")
def remove_vectors(self, ids: List[int]) -> None:
"""Remove vectors from the index by marking them as deleted.
Note: hnswlib uses soft deletion (mark_deleted). Vectors are not
physically removed but will be excluded from search results.
Args:
ids: List of vector IDs to remove
Raises:
StorageError: If index operation fails
"""
if len(ids) == 0:
return
with self._lock:
try:
if self._index is None or self._current_count == 0:
return # Nothing to remove
# Mark vectors as deleted
for vec_id in ids:
try:
self._index.mark_deleted(vec_id)
except RuntimeError:
# ID not found - ignore (idempotent deletion)
pass
except Exception as e:
raise StorageError(f"Failed to remove vectors from ANN index: {e}")
def search(
self, query: np.ndarray, top_k: int = 10
) -> Tuple[List[int], List[float]]:
"""Search for nearest neighbors.
Args:
query: Query vector of shape (dim,) or (1, dim)
top_k: Number of nearest neighbors to return
Returns:
Tuple of (ids, distances) where:
- ids: List of vector IDs ordered by similarity
- distances: List of cosine distances (lower = more similar)
Raises:
ValueError: If query shape is invalid
StorageError: If search operation fails
"""
# Validate query shape
if query.ndim == 1:
query = query.reshape(1, -1)
if query.shape[0] != 1:
raise ValueError(
f"Query must be a single vector, got shape {query.shape}"
)
if query.shape[1] != self.dim:
raise ValueError(
f"Query dimension ({query.shape[1]}) must match index dimension ({self.dim})"
)
with self._lock:
try:
if self._index is None or self._current_count == 0:
return [], [] # Empty index
# Perform kNN search
labels, distances = self._index.knn_query(query, k=top_k)
# Convert to lists and flatten (knn_query returns 2D arrays)
ids = labels[0].tolist()
dists = distances[0].tolist()
return ids, dists
except Exception as e:
raise StorageError(f"Failed to search ANN index: {e}")
def save(self) -> None:
"""Save index to disk.
Index is saved alongside the database as <db_stem>_vectors.hnsw (self.hnsw_path).
Raises:
StorageError: If save operation fails
"""
with self._lock:
try:
if self._index is None or self._current_count == 0:
return # Nothing to save
# Ensure parent directory exists
self.hnsw_path.parent.mkdir(parents=True, exist_ok=True)
# Save index
self._index.save_index(str(self.hnsw_path))
except Exception as e:
raise StorageError(f"Failed to save ANN index: {e}")
def load(self) -> bool:
"""Load index from disk.
Returns:
True if index was loaded successfully, False if index file doesn't exist
Raises:
StorageError: If load operation fails
"""
with self._lock:
try:
if not self.hnsw_path.exists():
return False # Index file doesn't exist (not an error)
# Create fresh index object for loading (don't call init_index first)
self._index = hnswlib.Index(space=self.space, dim=self.dim)
# Load index from disk
self._index.load_index(str(self.hnsw_path), max_elements=self._max_elements)
# Update count from loaded index
self._current_count = self._index.get_current_count()
# Set query-time ef parameter
self._index.set_ef(self.ef)
return True
except Exception as e:
raise StorageError(f"Failed to load ANN index: {e}")
def count(self) -> int:
"""Get number of vectors in the index.
Returns:
Number of vectors currently in the index
"""
with self._lock:
return self._current_count
@property
def is_loaded(self) -> bool:
"""Check if index is loaded and ready for use.
Returns:
True if index is loaded, False otherwise
"""
with self._lock:
return self._index is not None and self._current_count > 0

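A minimal round-trip sketch of the class above (paths are illustrative; requires numpy and hnswlib; all method names appear in the listing):

import numpy as np
from pathlib import Path
from codexlens.semantic.ann_index import ANNIndex

index = ANNIndex(Path("tmp/_index.db"), dim=768)      # HNSW file: tmp/_index_vectors.hnsw
vectors = np.random.rand(100, 768).astype(np.float32)
index.add_vectors(list(range(100)), vectors)
ids, distances = index.search(vectors[0], top_k=5)    # cosine distance: lower = closer
index.save()

reloaded = ANNIndex(Path("tmp/_index.db"), dim=768)
assert reloaded.load() and reloaded.count() == 100
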
codexlens/semantic/vector_store.py

@@ -1,14 +1,16 @@
"""Vector storage and similarity search for semantic chunks.
Optimized for high-performance similarity search using:
-- Cached embedding matrix for batch operations
-- NumPy vectorized cosine similarity (100x+ faster than loops)
+- HNSW index for O(log N) approximate nearest neighbor search (primary)
+- Cached embedding matrix for batch operations (fallback)
+- NumPy vectorized cosine similarity (fallback, 100x+ faster than loops)
- Lazy content loading (only fetch for top-k results)
"""
from __future__ import annotations
import json
import logging
import sqlite3
import threading
from pathlib import Path
@@ -22,6 +24,16 @@ from . import SEMANTIC_AVAILABLE
if SEMANTIC_AVAILABLE:
import numpy as np
# Try to import ANN index (optional hnswlib dependency)
try:
from codexlens.semantic.ann_index import ANNIndex, HNSWLIB_AVAILABLE
except ImportError:
HNSWLIB_AVAILABLE = False
ANNIndex = None
logger = logging.getLogger(__name__)
def _cosine_similarity(a: List[float], b: List[float]) -> float:
"""Compute cosine similarity between two vectors."""
@@ -41,15 +53,19 @@ def _cosine_similarity(a: List[float], b: List[float]) -> float:
class VectorStore:
"""SQLite-based vector storage with optimized cosine similarity search.
"""SQLite-based vector storage with HNSW-accelerated similarity search.
Performance optimizations:
- Embedding matrix cached in memory for batch similarity computation
- NumPy vectorized operations instead of Python loops
- HNSW index for O(log N) approximate nearest neighbor search
- Embedding matrix cached in memory for batch similarity computation (fallback)
- NumPy vectorized operations instead of Python loops (fallback)
- Lazy content loading - only fetch full content for top-k results
- Thread-safe cache invalidation
"""
# Default embedding dimension (used when creating new index)
DEFAULT_DIM = 768
def __init__(self, db_path: str | Path) -> None:
if not SEMANTIC_AVAILABLE:
raise ImportError(
@@ -60,14 +76,20 @@ class VectorStore:
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
-# Embedding cache for fast similarity search
+# Embedding cache for fast similarity search (fallback)
self._cache_lock = threading.RLock()
self._embedding_matrix: Optional[np.ndarray] = None
self._embedding_norms: Optional[np.ndarray] = None
self._chunk_ids: Optional[List[int]] = None
self._cache_version: int = 0
# ANN index for O(log N) search
self._ann_index: Optional[ANNIndex] = None
self._ann_dim: Optional[int] = None
self._ann_write_lock = threading.Lock() # Protects ANN index modifications
self._init_schema()
self._init_ann_index()
def _init_schema(self) -> None:
"""Initialize vector storage schema."""
@@ -90,6 +112,118 @@ class VectorStore:
""")
conn.commit()
def _init_ann_index(self) -> None:
"""Initialize ANN index (lazy loading from existing data)."""
if not HNSWLIB_AVAILABLE:
logger.debug("hnswlib not available, using brute-force search")
return
# Try to detect embedding dimension from existing data
dim = self._detect_embedding_dim()
if dim is None:
# No data yet, will initialize on first add
logger.debug("No embeddings found, ANN index will be created on first add")
return
self._ann_dim = dim
try:
self._ann_index = ANNIndex(self.db_path, dim)
if self._ann_index.load():
logger.debug(
"Loaded ANN index with %d vectors", self._ann_index.count()
)
else:
# Index file doesn't exist, try to build from SQLite data
logger.debug("ANN index file not found, rebuilding from SQLite")
self._rebuild_ann_index_internal()
except Exception as e:
logger.warning("Failed to initialize ANN index: %s", e)
self._ann_index = None
def _detect_embedding_dim(self) -> Optional[int]:
"""Detect embedding dimension from existing data."""
with sqlite3.connect(self.db_path) as conn:
row = conn.execute(
"SELECT embedding FROM semantic_chunks LIMIT 1"
).fetchone()
if row and row[0]:
# Embedding is stored as float32 blob
blob = row[0]
return len(blob) // np.dtype(np.float32).itemsize
return None
@property
def dimension(self) -> Optional[int]:
"""Return the dimension of embeddings in the store.
Returns:
Embedding dimension if available, None if store is empty.
"""
if self._ann_dim is not None:
return self._ann_dim
self._ann_dim = self._detect_embedding_dim()
return self._ann_dim
def _rebuild_ann_index_internal(self) -> int:
"""Internal method to rebuild ANN index from SQLite data."""
if self._ann_index is None:
return 0
with sqlite3.connect(self.db_path) as conn:
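# Memory-map the database file (up to ~30 GB) to speed up bulk embedding reads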
conn.execute("PRAGMA mmap_size = 30000000000")
rows = conn.execute(
"SELECT id, embedding FROM semantic_chunks"
).fetchall()
if not rows:
return 0
# Extract IDs and embeddings
ids = [r[0] for r in rows]
embeddings = np.vstack([
np.frombuffer(r[1], dtype=np.float32) for r in rows
])
# Add to ANN index
self._ann_index.add_vectors(ids, embeddings)
self._ann_index.save()
logger.info("Rebuilt ANN index with %d vectors", len(ids))
return len(ids)
def rebuild_ann_index(self) -> int:
"""Rebuild HNSW index from all chunks in SQLite.
Use this method to:
- Migrate existing data to use ANN search
- Repair corrupted index
- Reclaim space after many deletions
Returns:
Number of vectors indexed.
"""
if not HNSWLIB_AVAILABLE:
logger.warning("hnswlib not available, cannot rebuild ANN index")
return 0
# Detect dimension
dim = self._detect_embedding_dim()
if dim is None:
logger.warning("No embeddings found, cannot rebuild ANN index")
return 0
self._ann_dim = dim
# Create new index
try:
self._ann_index = ANNIndex(self.db_path, dim)
return self._rebuild_ann_index_internal()
except Exception as e:
logger.error("Failed to rebuild ANN index: %s", e)
self._ann_index = None
return 0
def _invalidate_cache(self) -> None:
"""Invalidate the embedding cache (thread-safe)."""
with self._cache_lock:
@@ -137,6 +271,40 @@ class VectorStore:
return True
def _ensure_ann_index(self, dim: int) -> bool:
"""Ensure ANN index is initialized with correct dimension.
This method is thread-safe and uses double-checked locking.
Args:
dim: Embedding dimension
Returns:
True if ANN index is ready, False otherwise
"""
if not HNSWLIB_AVAILABLE:
return False
# Fast path: index already initialized (no lock needed)
if self._ann_index is not None:
return True
# Slow path: acquire lock for initialization
with self._ann_write_lock:
# Double-check after acquiring lock
if self._ann_index is not None:
return True
try:
self._ann_dim = dim
self._ann_index = ANNIndex(self.db_path, dim)
self._ann_index.load() # Try to load existing
return True
except Exception as e:
logger.warning("Failed to initialize ANN index: %s", e)
self._ann_index = None
return False
def add_chunk(self, chunk: SemanticChunk, file_path: str) -> int:
"""Add a single chunk with its embedding.
@@ -146,7 +314,8 @@ class VectorStore:
if chunk.embedding is None:
raise ValueError("Chunk must have embedding before adding to store")
-embedding_blob = np.array(chunk.embedding, dtype=np.float32).tobytes()
+embedding_arr = np.array(chunk.embedding, dtype=np.float32)
+embedding_blob = embedding_arr.tobytes()
metadata_json = json.dumps(chunk.metadata) if chunk.metadata else None
with sqlite3.connect(self.db_path) as conn:
@@ -160,6 +329,15 @@ class VectorStore:
conn.commit()
chunk_id = cursor.lastrowid or 0
# Add to ANN index
if self._ensure_ann_index(len(chunk.embedding)):
with self._ann_write_lock:
try:
self._ann_index.add_vectors([chunk_id], embedding_arr.reshape(1, -1))
self._ann_index.save()
except Exception as e:
logger.warning("Failed to add to ANN index: %s", e)
# Invalidate cache after modification
self._invalidate_cache()
return chunk_id
@@ -175,16 +353,23 @@ class VectorStore:
# Prepare batch data
batch_data = []
embeddings_list = []
for chunk in chunks:
if chunk.embedding is None:
raise ValueError("All chunks must have embeddings")
-embedding_blob = np.array(chunk.embedding, dtype=np.float32).tobytes()
+embedding_arr = np.array(chunk.embedding, dtype=np.float32)
+embedding_blob = embedding_arr.tobytes()
metadata_json = json.dumps(chunk.metadata) if chunk.metadata else None
batch_data.append((file_path, chunk.content, embedding_blob, metadata_json))
embeddings_list.append(embedding_arr)
-# Batch insert
+# Batch insert to SQLite
with sqlite3.connect(self.db_path) as conn:
-cursor = conn.executemany(
+# Get starting ID before insert
+row = conn.execute("SELECT MAX(id) FROM semantic_chunks").fetchone()
+start_id = (row[0] or 0) + 1
+conn.executemany(
"""
INSERT INTO semantic_chunks (file_path, content, embedding, metadata)
VALUES (?, ?, ?, ?)
@@ -192,9 +377,77 @@ class VectorStore:
batch_data
)
conn.commit()
-# Get inserted IDs (approximate - assumes sequential)
-last_id = cursor.lastrowid or 0
-ids = list(range(last_id - len(chunks) + 1, last_id + 1))
+# Calculate inserted IDs based on starting ID
+ids = list(range(start_id, start_id + len(chunks)))
# Add to ANN index
if embeddings_list and self._ensure_ann_index(len(embeddings_list[0])):
with self._ann_write_lock:
try:
embeddings_matrix = np.vstack(embeddings_list)
self._ann_index.add_vectors(ids, embeddings_matrix)
self._ann_index.save()
except Exception as e:
logger.warning("Failed to add batch to ANN index: %s", e)
# Invalidate cache after modification
self._invalidate_cache()
return ids
def add_chunks_batch(
self, chunks_with_paths: List[Tuple[SemanticChunk, str]]
) -> List[int]:
"""Batch insert chunks from multiple files in a single transaction.
This method is optimized for bulk operations during index generation.
Args:
chunks_with_paths: List of (chunk, file_path) tuples
Returns:
List of inserted chunk IDs
"""
if not chunks_with_paths:
return []
# Prepare batch data
batch_data = []
embeddings_list = []
for chunk, file_path in chunks_with_paths:
if chunk.embedding is None:
raise ValueError("All chunks must have embeddings")
embedding_arr = np.array(chunk.embedding, dtype=np.float32)
embedding_blob = embedding_arr.tobytes()
metadata_json = json.dumps(chunk.metadata) if chunk.metadata else None
batch_data.append((file_path, chunk.content, embedding_blob, metadata_json))
embeddings_list.append(embedding_arr)
# Batch insert to SQLite in single transaction
with sqlite3.connect(self.db_path) as conn:
# Get starting ID before insert
row = conn.execute("SELECT MAX(id) FROM semantic_chunks").fetchone()
start_id = (row[0] or 0) + 1
conn.executemany(
"""
INSERT INTO semantic_chunks (file_path, content, embedding, metadata)
VALUES (?, ?, ?, ?)
""",
batch_data
)
conn.commit()
# Calculate inserted IDs based on starting ID
ids = list(range(start_id, start_id + len(chunks_with_paths)))
# Add to ANN index
if embeddings_list and self._ensure_ann_index(len(embeddings_list[0])):
with self._ann_write_lock:
try:
embeddings_matrix = np.vstack(embeddings_list)
self._ann_index.add_vectors(ids, embeddings_matrix)
self._ann_index.save()
except Exception as e:
logger.warning("Failed to add batch to ANN index: %s", e)
# Invalidate cache after modification
self._invalidate_cache()
@@ -206,6 +459,17 @@ class VectorStore:
Returns:
Number of deleted chunks.
"""
# Get chunk IDs before deletion (for ANN index)
chunk_ids_to_delete = []
if self._ann_index is not None:
with sqlite3.connect(self.db_path) as conn:
rows = conn.execute(
"SELECT id FROM semantic_chunks WHERE file_path = ?",
(file_path,)
).fetchall()
chunk_ids_to_delete = [r[0] for r in rows]
# Delete from SQLite
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"DELETE FROM semantic_chunks WHERE file_path = ?",
@@ -214,6 +478,15 @@ class VectorStore:
conn.commit()
deleted = cursor.rowcount
# Remove from ANN index
if deleted > 0 and self._ann_index is not None and chunk_ids_to_delete:
with self._ann_write_lock:
try:
self._ann_index.remove_vectors(chunk_ids_to_delete)
self._ann_index.save()
except Exception as e:
logger.warning("Failed to remove from ANN index: %s", e)
if deleted > 0:
self._invalidate_cache()
return deleted
@@ -227,10 +500,8 @@ class VectorStore:
) -> List[SearchResult]:
"""Find chunks most similar to query embedding.
-Optimized with:
-- Vectorized NumPy similarity computation (100x+ faster)
-- Cached embedding matrix (avoids repeated DB reads)
-- Lazy content loading (only fetch for top-k results)
+Uses HNSW index for O(log N) search when available, falls back to
+brute-force NumPy search otherwise.
Args:
query_embedding: Query vector.
@@ -241,6 +512,96 @@ class VectorStore:
Returns:
List of SearchResult ordered by similarity (highest first).
"""
query_vec = np.array(query_embedding, dtype=np.float32)
# Try HNSW search first (O(log N))
if (
HNSWLIB_AVAILABLE
and self._ann_index is not None
and self._ann_index.is_loaded
and self._ann_index.count() > 0
):
try:
return self._search_with_ann(
query_vec, top_k, min_score, return_full_content
)
except Exception as e:
logger.warning("ANN search failed, falling back to brute-force: %s", e)
# Fallback to brute-force search (O(N))
return self._search_brute_force(
query_vec, top_k, min_score, return_full_content
)
def _search_with_ann(
self,
query_vec: np.ndarray,
top_k: int,
min_score: float,
return_full_content: bool,
) -> List[SearchResult]:
"""Search using HNSW index (O(log N)).
Args:
query_vec: Query vector as numpy array
top_k: Maximum results to return
min_score: Minimum similarity score (0-1)
return_full_content: If True, return full code block content
Returns:
List of SearchResult ordered by similarity (highest first)
"""
# Limit top_k to available vectors to prevent hnswlib error
ann_count = self._ann_index.count()
effective_top_k = min(top_k, ann_count) if ann_count > 0 else 0
if effective_top_k == 0:
return []
# HNSW search returns (ids, distances)
# For cosine space: distance = 1 - similarity
ids, distances = self._ann_index.search(query_vec, effective_top_k)
if not ids:
return []
# Convert distances to similarity scores
scores = [1.0 - d for d in distances]
# Filter by min_score
filtered = [
(chunk_id, score)
for chunk_id, score in zip(ids, scores)
if score >= min_score
]
if not filtered:
return []
top_ids = [f[0] for f in filtered]
top_scores = [f[1] for f in filtered]
# Fetch content from SQLite
return self._fetch_results_by_ids(top_ids, top_scores, return_full_content)
def _search_brute_force(
self,
query_vec: np.ndarray,
top_k: int,
min_score: float,
return_full_content: bool,
) -> List[SearchResult]:
"""Brute-force search using NumPy (O(N) fallback).
Args:
query_vec: Query vector as numpy array
top_k: Maximum results to return
min_score: Minimum similarity score (0-1)
return_full_content: If True, return full code block content
Returns:
List of SearchResult ordered by similarity (highest first)
"""
with self._cache_lock:
# Refresh cache if needed
if self._embedding_matrix is None:
@@ -248,7 +609,7 @@ class VectorStore:
return [] # No data
# Vectorized cosine similarity
-query_vec = np.array(query_embedding, dtype=np.float32).reshape(1, -1)
+query_vec = query_vec.reshape(1, -1)
query_norm = np.linalg.norm(query_vec)
if query_norm == 0:
return []
@@ -370,3 +731,41 @@ class VectorStore:
def clear_cache(self) -> None:
"""Manually clear the embedding cache."""
self._invalidate_cache()
@property
def ann_available(self) -> bool:
"""Check if ANN index is available and ready."""
return (
HNSWLIB_AVAILABLE
and self._ann_index is not None
and self._ann_index.is_loaded
)
@property
def ann_count(self) -> int:
"""Get number of vectors in ANN index."""
if self._ann_index is not None:
return self._ann_index.count()
return 0
def close(self) -> None:
"""Close the vector store and release resources.
This ensures SQLite connections are closed and ANN index is cleared,
allowing temporary files to be deleted on Windows.
"""
with self._cache_lock:
self._embedding_matrix = None
self._embedding_norms = None
self._chunk_ids = None
with self._ann_write_lock:
self._ann_index = None
def __enter__(self) -> "VectorStore":
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Context manager exit - close resources."""
self.close()
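
A closing usage sketch tying the pieces together (the SemanticChunk import path, its constructor, and the similarity-search method name are assumptions - their definitions sit outside the visible hunks):

import numpy as np
from codexlens.semantic.vector_store import VectorStore
# Assumed import path; the diff references SemanticChunk only by name.
from codexlens.semantic.chunker import SemanticChunk

with VectorStore("project/_index.db") as store:        # __exit__ calls close()
    chunk = SemanticChunk(content="def add(a, b): return a + b",
                          embedding=np.random.rand(768).tolist(),
                          metadata={"symbol": "add"})  # field names from the diff
    store.add_chunk(chunk, "src/math_utils.py")        # also updates the HNSW index
    # Search method name assumed; parameter names match the hunk above.
    results = store.search(np.random.rand(768).tolist(), top_k=10, min_score=0.25)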