"""Approximate Nearest Neighbor (ANN) index using HNSW algorithm.
|
|
|
|
Provides O(log N) similarity search using hnswlib's Hierarchical Navigable Small World graphs.
|
|
Falls back to brute-force search when hnswlib is not available.
|
|
|
|
Key features:
|
|
- HNSW index for fast approximate nearest neighbor search
|
|
- Persistent index storage (saved alongside SQLite database)
|
|
- Incremental vector addition and deletion
|
|
- Thread-safe operations
|
|
- Cosine similarity metric
|
|
- Support for centralized storage mode (single index at project root)
|
|
"""

from __future__ import annotations

import logging
import threading
from pathlib import Path
from typing import List, Optional, Tuple

from codexlens.errors import StorageError
from codexlens.config import VECTORS_HNSW_NAME

from . import SEMANTIC_AVAILABLE

if SEMANTIC_AVAILABLE:
    import numpy as np

logger = logging.getLogger(__name__)

# Try to import hnswlib (optional dependency)
try:
    import hnswlib

    HNSWLIB_AVAILABLE = True
except ImportError:
    HNSWLIB_AVAILABLE = False

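# Quick-reference sketch of the dense index class below (comments only, so
# nothing runs at import time; the path, IDs, and embedding array are made-up
# illustrative values):
#
#     index = ANNIndex(Path("project/_index.db"), dim=768)
#     if not index.load():                       # no .hnsw file yet
#         index.add_vectors([1, 2], embeddings)  # embeddings: (2, 768) float32
#         index.save()
#     ids, dists = index.search(query_vec, top_k=5)
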
class ANNIndex:
    """HNSW-based approximate nearest neighbor index for vector similarity search.

    Performance characteristics:
    - Build time: O(N log N), where N is the number of vectors
    - Search time: O(log N), approximate
    - Memory: ~(d * 4 + M * 2 * 4) bytes per vector (M=16, d=dimension)

    Index parameters:
    - space: cosine (cosine similarity metric)
    - M: 16 (max connections per node - balance between speed and recall)
    - ef_construction: 200 (search width during build - higher = better quality)
    - ef: 50 (search width during query - higher = better recall)
    """

    def __init__(
        self,
        index_path: Path,
        dim: int,
        initial_capacity: int = 50000,
        auto_save: bool = False,
        expansion_threshold: float = 0.8,
    ) -> None:
        """Initialize the ANN index.

        Args:
            index_path: Path to the SQLite database (the HNSW file is saved
                alongside it as <db_stem>_vectors.hnsw)
            dim: Dimension of embedding vectors
            initial_capacity: Initial maximum element capacity (default: 50000)
            auto_save: Whether to automatically save the index after mutating
                operations (default: False)
            expansion_threshold: Capacity usage ratio that triggers
                auto-expansion (default: 0.8)

        Raises:
            ImportError: If required dependencies are not available
            ValueError: If the dimension, capacity, or threshold is invalid
        """
        if not SEMANTIC_AVAILABLE:
            raise ImportError(
                "Semantic search dependencies not available. "
                "Install with: pip install codexlens[semantic]"
            )

        if not HNSWLIB_AVAILABLE:
            raise ImportError(
                "hnswlib is required for ANN index. "
                "Install with: pip install hnswlib"
            )

        if dim <= 0:
            raise ValueError(f"Invalid dimension: {dim}")

        if initial_capacity <= 0:
            raise ValueError(f"Invalid initial capacity: {initial_capacity}")

        if not 0.0 < expansion_threshold < 1.0:
            raise ValueError(
                f"Invalid expansion threshold: {expansion_threshold}. "
                "Must be between 0 and 1."
            )

        self.index_path = Path(index_path)
        self.dim = dim

        # Derive the HNSW index path from the database path, e.g.
        # /path/to/_index.db -> /path/to/_index_vectors.hnsw.
        # This ensures a unique HNSW file for each database.
        db_stem = self.index_path.stem  # e.g., "_index" or "tmp123"
        self.hnsw_path = self.index_path.parent / f"{db_stem}_vectors.hnsw"

        # HNSW parameters
        self.space = "cosine"  # Cosine similarity metric
        self.M = 16  # Max connections per node (16 is a good balance)
        self.ef_construction = 200  # Build-time search width (higher = better quality)
        self.ef = 50  # Query-time search width (higher = better recall)

        # Memory management parameters
        self._auto_save = auto_save
        self._expansion_threshold = expansion_threshold

        # Thread safety
        self._lock = threading.RLock()

        # HNSW index instance (created lazily)
        self._index: Optional[hnswlib.Index] = None
        self._max_elements = initial_capacity  # Initial capacity
        self._current_count = 0  # Track number of vectors

        logger.info(
            f"Initialized ANNIndex with capacity={initial_capacity}, "
            f"auto_save={auto_save}, expansion_threshold={expansion_threshold}"
        )

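    # Construction sketch for the constructor above (comments only, nothing
    # runs at import time; "index.db" is a hypothetical path and the sketch
    # assumes the optional hnswlib/numpy dependencies are installed):
    #
    #     index = ANNIndex(Path("index.db"), dim=768, initial_capacity=10_000)
    #     index.load()  # False on first run; the index starts empty
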
    @classmethod
    def create_central(
        cls,
        index_root: Path,
        dim: int,
        initial_capacity: int = 50000,
        auto_save: bool = False,
        expansion_threshold: float = 0.8,
    ) -> "ANNIndex":
        """Create a centralized ANN index at the project index root.

        Creates a single shared HNSW index file at the project root rather
        than per-directory indexes. Use this for projects that want all dense
        vectors stored in one central location.

        Args:
            index_root: Root directory for the index (e.g., .codexlens/<project_hash>/)
            dim: Dimension of embedding vectors
            initial_capacity: Initial maximum element capacity (default: 50000)
            auto_save: Whether to automatically save the index after mutating
                operations (default: False)
            expansion_threshold: Capacity usage ratio that triggers
                auto-expansion (default: 0.8)

        Returns:
            ANNIndex instance configured for centralized storage

        Example:
            >>> index = ANNIndex.create_central(Path(".codexlens/abc123"), dim=768)
            >>> index.hnsw_path  # Returns: .codexlens/abc123/_vectors.hnsw
        """
        # Bypass __init__ so hnsw_path can be set directly to
        # index_root / VECTORS_HNSW_NAME instead of being derived from a
        # database path.
        instance = cls.__new__(cls)

        if not SEMANTIC_AVAILABLE:
            raise ImportError(
                "Semantic search dependencies not available. "
                "Install with: pip install codexlens[semantic]"
            )

        if not HNSWLIB_AVAILABLE:
            raise ImportError(
                "hnswlib is required for ANN index. "
                "Install with: pip install hnswlib"
            )

        if dim <= 0:
            raise ValueError(f"Invalid dimension: {dim}")

        if initial_capacity <= 0:
            raise ValueError(f"Invalid initial capacity: {initial_capacity}")

        if not 0.0 < expansion_threshold < 1.0:
            raise ValueError(
                f"Invalid expansion threshold: {expansion_threshold}. "
                "Must be between 0 and 1."
            )

        instance.index_path = index_root
        instance.dim = dim

        # Centralized mode: use VECTORS_HNSW_NAME directly at index_root
        instance.hnsw_path = index_root / VECTORS_HNSW_NAME

        # HNSW parameters
        instance.space = "cosine"
        instance.M = 16
        instance.ef_construction = 200
        instance.ef = 50

        # Memory management parameters
        instance._auto_save = auto_save
        instance._expansion_threshold = expansion_threshold

        # Thread safety
        instance._lock = threading.RLock()

        # HNSW index instance (created lazily)
        instance._index = None
        instance._max_elements = initial_capacity
        instance._current_count = 0

        logger.info(
            f"Initialized centralized ANNIndex at {instance.hnsw_path} with "
            f"capacity={initial_capacity}, auto_save={auto_save}"
        )

        return instance

    def _ensure_index(self) -> None:
        """Ensure the HNSW index is initialized (lazy initialization)."""
        if self._index is None:
            self._index = hnswlib.Index(space=self.space, dim=self.dim)
            self._index.init_index(
                max_elements=self._max_elements,
                ef_construction=self.ef_construction,
                M=self.M,
            )
            self._index.set_ef(self.ef)
            self._current_count = 0
            logger.debug(f"Created new HNSW index with capacity {self._max_elements}")

    def _auto_expand_if_needed(self, additional_count: int) -> None:
        """Auto-expand index capacity if the usage threshold is reached.

        Args:
            additional_count: Number of vectors about to be added

        Note:
            Called internally by add_vectors; the caller must hold self._lock.
        """
        usage_ratio = (self._current_count + additional_count) / self._max_elements

        if usage_ratio >= self._expansion_threshold:
            # New capacity: double the current capacity, or enough to fit the
            # incoming vectors, whichever is larger
            new_capacity = max(
                self._max_elements * 2,
                self._current_count + additional_count,
            )

            logger.info(
                f"Expanding index capacity: {self._max_elements} -> {new_capacity} "
                f"(usage: {usage_ratio:.1%}, threshold: {self._expansion_threshold:.1%})"
            )

            self._index.resize_index(new_capacity)
            self._max_elements = new_capacity

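    # Worked numbers for the expansion rule above: with initial_capacity=50000
    # and expansion_threshold=0.8, the first add_vectors call that pushes
    # (count + additional) / 50000 to 0.8 or more resizes the index to
    # max(50000 * 2, count + additional) = 100000; the next expansion then
    # triggers around 80000 vectors, doubling to 200000, and so on.
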
    def add_vectors(self, ids: List[int], vectors: np.ndarray) -> None:
        """Add vectors to the index.

        Args:
            ids: List of vector IDs (must be unique)
            vectors: Numpy array of shape (N, dim) where N = len(ids)

        Raises:
            ValueError: If shapes don't match or vectors are invalid
            StorageError: If the index operation fails
        """
        if len(ids) == 0:
            return

        if vectors.shape[0] != len(ids):
            raise ValueError(
                f"Number of vectors ({vectors.shape[0]}) must match number of IDs ({len(ids)})"
            )

        if vectors.shape[1] != self.dim:
            raise ValueError(
                f"Vector dimension ({vectors.shape[1]}) must match index dimension ({self.dim})"
            )

        with self._lock:
            try:
                self._ensure_index()

                # Auto-expand if the usage threshold is reached
                self._auto_expand_if_needed(len(ids))

                # Ensure vectors are C-contiguous float32 (hnswlib requirement)
                if not vectors.flags["C_CONTIGUOUS"] or vectors.dtype != np.float32:
                    vectors = np.ascontiguousarray(vectors, dtype=np.float32)

                # Add vectors to the index
                self._index.add_items(vectors, ids)
                self._current_count += len(ids)

                logger.debug(
                    f"Added {len(ids)} vectors to index "
                    f"(total: {self._current_count}/{self._max_elements})"
                )

                # Auto-save if enabled
                if self._auto_save:
                    self.save()

            except Exception as e:
                raise StorageError(f"Failed to add vectors to ANN index: {e}") from e

    def remove_vectors(self, ids: List[int]) -> None:
        """Remove vectors from the index by marking them as deleted.

        Note: hnswlib uses soft deletion (mark_deleted). Vectors are not
        physically removed but are excluded from search results.

        Args:
            ids: List of vector IDs to remove

        Raises:
            StorageError: If the index operation fails
        """
        if len(ids) == 0:
            return

        with self._lock:
            try:
                if self._index is None or self._current_count == 0:
                    return  # Nothing to remove

                # Mark vectors as deleted
                deleted_count = 0
                for vec_id in ids:
                    try:
                        self._index.mark_deleted(vec_id)
                        deleted_count += 1
                    except RuntimeError:
                        # ID not found - ignore (idempotent deletion)
                        pass

                logger.debug(f"Marked {deleted_count}/{len(ids)} vectors as deleted")

                # Auto-save if enabled
                if self._auto_save and deleted_count > 0:
                    self.save()

            except Exception as e:
                raise StorageError(f"Failed to remove vectors from ANN index: {e}") from e

    def search(
        self, query: np.ndarray, top_k: int = 10
    ) -> Tuple[List[int], List[float]]:
        """Search for nearest neighbors.

        Args:
            query: Query vector of shape (dim,) or (1, dim)
            top_k: Number of nearest neighbors to return

        Returns:
            Tuple of (ids, distances) where:
            - ids: List of vector IDs ordered by similarity
            - distances: List of cosine distances (lower = more similar)

        Raises:
            ValueError: If the query shape is invalid
            StorageError: If the search operation fails
        """
        # Validate query shape
        if query.ndim == 1:
            query = query.reshape(1, -1)

        if query.shape[0] != 1:
            raise ValueError(
                f"Query must be a single vector, got shape {query.shape}"
            )

        if query.shape[1] != self.dim:
            raise ValueError(
                f"Query dimension ({query.shape[1]}) must match index dimension ({self.dim})"
            )

        with self._lock:
            try:
                if self._index is None or self._current_count == 0:
                    return [], []  # Empty index

                # Clamp k: hnswlib raises if k exceeds the element count
                k = min(top_k, self._current_count)

                # Perform kNN search
                labels, distances = self._index.knn_query(query, k=k)

                # Flatten and convert (knn_query returns 2D arrays)
                ids = labels[0].tolist()
                dists = distances[0].tolist()

                return ids, dists

            except Exception as e:
                raise StorageError(f"Failed to search ANN index: {e}") from e

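    # Round-trip sketch for add_vectors/search above (illustrative values,
    # dim=4; "demo.db" is a hypothetical path):
    #
    #     idx = ANNIndex(Path("demo.db"), dim=4)
    #     vecs = np.eye(4, dtype=np.float32)         # four orthogonal unit vectors
    #     idx.add_vectors([10, 11, 12, 13], vecs)
    #     ids, dists = idx.search(vecs[2], top_k=2)  # ids[0] == 12, dists[0] ~= 0.0
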
    def save(self) -> None:
        """Save the index to disk at self.hnsw_path.

        Raises:
            StorageError: If the save operation fails
        """
        with self._lock:
            try:
                if self._index is None or self._current_count == 0:
                    logger.debug("Skipping save: index is empty")
                    return  # Nothing to save

                # Ensure parent directory exists
                self.hnsw_path.parent.mkdir(parents=True, exist_ok=True)

                # Save index
                self._index.save_index(str(self.hnsw_path))

                logger.debug(
                    f"Saved index to {self.hnsw_path} "
                    f"({self._current_count} vectors, capacity: {self._max_elements})"
                )

            except Exception as e:
                raise StorageError(f"Failed to save ANN index: {e}") from e

    def load(self) -> bool:
        """Load the index from disk.

        Returns:
            True if the index was loaded successfully, False if the index
            file doesn't exist

        Raises:
            StorageError: If the load operation fails
        """
        with self._lock:
            try:
                if not self.hnsw_path.exists():
                    logger.debug(f"Index file not found: {self.hnsw_path}")
                    return False  # Missing file is not an error

                # Create a fresh index object for loading (don't call init_index first)
                self._index = hnswlib.Index(space=self.space, dim=self.dim)

                # Load the index from disk. max_elements here is only the
                # initial allocation; the index can expand later.
                self._index.load_index(str(self.hnsw_path), max_elements=self._max_elements)

                # Update count and capacity from the loaded index
                self._current_count = self._index.get_current_count()
                self._max_elements = self._index.get_max_elements()

                # Set query-time ef parameter
                self._index.set_ef(self.ef)

                logger.info(
                    f"Loaded index from {self.hnsw_path} "
                    f"({self._current_count} vectors, capacity: {self._max_elements})"
                )

                return True

            except Exception as e:
                raise StorageError(f"Failed to load ANN index: {e}") from e

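    # Persistence sketch for save()/load() above (hypothetical paths):
    #
    #     idx = ANNIndex(Path("/tmp/demo.db"), dim=768)
    #     idx.add_vectors(ids, vecs)
    #     idx.save()                                   # writes /tmp/demo_vectors.hnsw
    #
    #     idx2 = ANNIndex(Path("/tmp/demo.db"), dim=768)
    #     assert idx2.load() and idx2.count() == idx.count()
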
    def count(self) -> int:
        """Get the number of vectors in the index.

        Returns:
            Number of vectors currently in the index
        """
        with self._lock:
            return self._current_count

    @property
    def capacity(self) -> int:
        """Get the current maximum capacity of the index.

        Returns:
            Maximum number of vectors the index can hold before expansion
        """
        with self._lock:
            return self._max_elements

    @property
    def usage_ratio(self) -> float:
        """Get the current usage ratio (count / capacity).

        Returns:
            Usage ratio between 0.0 and 1.0
        """
        with self._lock:
            if self._max_elements == 0:
                return 0.0
            return self._current_count / self._max_elements

    @property
    def is_loaded(self) -> bool:
        """Check whether the index is loaded and ready for use.

        Returns:
            True if the index is loaded, False otherwise
        """
        with self._lock:
            return self._index is not None and self._current_count > 0


class BinaryANNIndex:
    """Binary vector ANN index using Hamming distance for fast coarse retrieval.

    .. deprecated::
        This class is deprecated. Use :class:`codexlens.search.binary_searcher.BinarySearcher`
        instead, which provides faster memory-mapped search with centralized storage.

    Optimized for binary vectors (256-bit / 32 bytes per vector).
    Uses a packed binary representation for memory efficiency.

    Performance characteristics:
    - Storage: 32 bytes per vector (vs ~8KB for 2048-dim float32 vectors)
    - Distance: Hamming distance via XOR + popcount (CPU-efficient)
    - Search: O(N) brute-force scan with vectorized distance computation

    Index parameters:
    - dim: Binary vector dimension (default: 256)
    - packed_dim: Packed size in bytes (dim / 8 = 32 for 256-bit)

    Usage:
        index = BinaryANNIndex(index_path, dim=256)
        index.add_vectors([1, 2, 3], packed_vectors)  # List of 32-byte packed vectors
        ids, distances = index.search(query_packed, top_k=10)
    """

    DEFAULT_DIM = 256  # Default binary vector dimension

    def __init__(
        self,
        index_path: Path,
        dim: int = 256,
        initial_capacity: int = 100000,
        auto_save: bool = False,
    ) -> None:
        """Initialize the binary ANN index.

        Args:
            index_path: Path to the database (the index is saved alongside it
                as <db_stem>_binary_vectors.bin)
            dim: Dimension of binary vectors (default: 256)
            initial_capacity: Initial capacity hint (default: 100000)
            auto_save: Whether to automatically save the index after mutating
                operations

        Raises:
            ImportError: If required dependencies are not available
            ValueError: If the dimension is invalid
        """
        if not SEMANTIC_AVAILABLE:
            raise ImportError(
                "Semantic search dependencies not available. "
                "Install with: pip install codexlens[semantic]"
            )

        import warnings

        warnings.warn(
            "BinaryANNIndex is deprecated. Use codexlens.search.binary_searcher.BinarySearcher "
            "instead for faster memory-mapped search with centralized storage.",
            DeprecationWarning,
            stacklevel=2,
        )

        if dim <= 0 or dim % 8 != 0:
            raise ValueError(
                f"Invalid dimension: {dim}. Must be positive and divisible by 8."
            )

        self.index_path = Path(index_path)
        self.dim = dim
        self.packed_dim = dim // 8  # 32 bytes for 256-bit vectors

        # Derive the binary index path from the database path
        db_stem = self.index_path.stem
        self.binary_path = self.index_path.parent / f"{db_stem}_binary_vectors.bin"

        # Memory management
        self._auto_save = auto_save
        self._initial_capacity = initial_capacity

        # Thread safety
        self._lock = threading.RLock()

        # In-memory storage: id -> packed binary vector
        self._vectors: dict[int, bytes] = {}
        self._id_list: list[int] = []  # Ordered list for efficient iteration

        # Cached numpy arrays for vectorized search (invalidated on add/remove)
        self._vectors_matrix: Optional[np.ndarray] = None
        self._ids_array: Optional[np.ndarray] = None
        self._cache_valid: bool = False

        logger.info(
            f"Initialized BinaryANNIndex with dim={dim}, packed_dim={self.packed_dim}"
        )

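    # Packing sketch: one 256-bit binary vector becomes 32 packed bytes
    # (illustrative comment; the bit values are made up):
    #
    #     bits = np.random.randint(0, 2, size=256).astype(np.uint8)  # 0/1 values
    #     packed = np.packbits(bits).tobytes()                       # len(packed) == 32
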
    def add_vectors(self, ids: List[int], vectors: List[bytes]) -> None:
        """Add packed binary vectors to the index.

        Args:
            ids: List of vector IDs (must be unique)
            vectors: List of packed binary vectors (each packed_dim bytes)

        Raises:
            ValueError: If shapes don't match or vectors are invalid
            StorageError: If the index operation fails
        """
        if len(ids) == 0:
            return

        if len(vectors) != len(ids):
            raise ValueError(
                f"Number of vectors ({len(vectors)}) must match number of IDs ({len(ids)})"
            )

        # Validate vector sizes
        for i, vec in enumerate(vectors):
            if len(vec) != self.packed_dim:
                raise ValueError(
                    f"Vector {i} has size {len(vec)}, expected {self.packed_dim}"
                )

        with self._lock:
            try:
                for vec_id, vec in zip(ids, vectors):
                    if vec_id not in self._vectors:
                        self._id_list.append(vec_id)
                    self._vectors[vec_id] = vec

                # Invalidate cache on modification
                self._cache_valid = False

                logger.debug(
                    f"Added {len(ids)} binary vectors to index (total: {len(self._vectors)})"
                )

                if self._auto_save:
                    self.save()

            except Exception as e:
                raise StorageError(f"Failed to add vectors to Binary ANN index: {e}") from e

    def add_vectors_numpy(self, ids: List[int], vectors: np.ndarray) -> None:
        """Add unpacked binary vectors (0/1 values) to the index.

        Convenience method that packs the vectors before adding.

        Args:
            ids: List of vector IDs (must be unique)
            vectors: Numpy array of shape (N, dim) with binary values (0 or 1)

        Raises:
            ValueError: If shapes don't match
            StorageError: If the index operation fails
        """
        if len(ids) == 0:
            return

        if vectors.shape[0] != len(ids):
            raise ValueError(
                f"Number of vectors ({vectors.shape[0]}) must match number of IDs ({len(ids)})"
            )

        if vectors.shape[1] != self.dim:
            raise ValueError(
                f"Vector dimension ({vectors.shape[1]}) must match index dimension ({self.dim})"
            )

        # Pack each row of 0/1 values into packed_dim bytes
        packed_vectors = []
        for i in range(vectors.shape[0]):
            packed = np.packbits(vectors[i].astype(np.uint8)).tobytes()
            packed_vectors.append(packed)

        self.add_vectors(ids, packed_vectors)

    def remove_vectors(self, ids: List[int]) -> None:
        """Remove vectors from the index.

        Args:
            ids: List of vector IDs to remove

        Raises:
            StorageError: If the index operation fails

        Note:
            Optimized for batch deletion: the ID list is rebuilt once with a
            set-based filter instead of an O(N) list.remove() call per ID.
        """
        if len(ids) == 0:
            return

        with self._lock:
            try:
                # Use a set for O(1) membership checks during filtering
                ids_to_remove = set(ids)
                removed_count = 0

                # Remove from the dictionary - O(1) per deletion
                for vec_id in ids_to_remove:
                    if vec_id in self._vectors:
                        del self._vectors[vec_id]
                        removed_count += 1

                # Rebuild the ID list once - O(N) total instead of O(N) per removal
                if removed_count > 0:
                    self._id_list = [id_ for id_ in self._id_list if id_ not in ids_to_remove]
                    # Invalidate cache on modification
                    self._cache_valid = False

                logger.debug(f"Removed {removed_count}/{len(ids)} vectors from index")

                if self._auto_save and removed_count > 0:
                    self.save()

            except Exception as e:
                raise StorageError(
                    f"Failed to remove vectors from Binary ANN index: {e}"
                ) from e

    def _build_cache(self) -> None:
        """Build the numpy cache used for vectorized search.

        Pre-computes a contiguous numpy array of all packed vectors for
        efficient batch distance computation. Called lazily on the first
        search after a modification.
        """
        if self._cache_valid:
            return

        n_vectors = len(self._id_list)
        if n_vectors == 0:
            self._vectors_matrix = None
            self._ids_array = None
            self._cache_valid = True
            return

        # Build a contiguous numpy array of all packed vectors,
        # shape (n_vectors, packed_dim), dtype uint8
        self._vectors_matrix = np.empty((n_vectors, self.packed_dim), dtype=np.uint8)
        self._ids_array = np.array(self._id_list, dtype=np.int64)

        for i, vec_id in enumerate(self._id_list):
            vec_bytes = self._vectors[vec_id]
            self._vectors_matrix[i] = np.frombuffer(vec_bytes, dtype=np.uint8)

        self._cache_valid = True
        logger.debug(f"Built vectorized cache for {n_vectors} binary vectors")

    def search(
        self, query: bytes, top_k: int = 10
    ) -> Tuple[List[int], List[int]]:
        """Search for nearest neighbors using Hamming distance.

        Uses vectorized batch computation for the O(N) scan, with numpy
        arrays pre-computed and cached for efficient repeated queries.

        Args:
            query: Packed binary query vector (packed_dim bytes)
            top_k: Number of nearest neighbors to return

        Returns:
            Tuple of (ids, distances) where:
            - ids: List of vector IDs ordered by Hamming distance (ascending)
            - distances: List of Hamming distances (lower = more similar)

        Raises:
            ValueError: If the query size is invalid
            StorageError: If the search operation fails
        """
        if len(query) != self.packed_dim:
            raise ValueError(
                f"Query size ({len(query)}) must match packed_dim ({self.packed_dim})"
            )

        with self._lock:
            try:
                if len(self._vectors) == 0:
                    return [], []

                # Build the cache if needed (lazy initialization)
                self._build_cache()

                if self._vectors_matrix is None or self._ids_array is None:
                    return [], []

                # Vectorized Hamming distance computation:
                # 1. Convert the query to a numpy array
                query_arr = np.frombuffer(query, dtype=np.uint8)

                # 2. Broadcast XOR: (packed_dim,) against (n_vectors, packed_dim)
                #    -> differing bits, shape (n_vectors, packed_dim)
                xor_result = np.bitwise_xor(query_arr, self._vectors_matrix)

                # 3. Vectorized popcount via a 256-entry lookup table
                #    (np.unpackbits is slow for large arrays)
                popcount_lut = np.array(
                    [bin(i).count("1") for i in range(256)], dtype=np.uint8
                )
                bit_counts = popcount_lut[xor_result]

                # 4. Sum across packed bytes -> Hamming distance per vector
                distances = bit_counts.sum(axis=1)

                # 5. Top-k via argpartition (O(N) instead of O(N log N) full sort)
                n_vectors = len(distances)
                k = min(top_k, n_vectors)

                if k == n_vectors:
                    # No partitioning needed; sort everything
                    sorted_indices = np.argsort(distances)
                else:
                    # Partial sort: partition out the k smallest, then sort only those
                    partition_indices = np.argpartition(distances, k)[:k]
                    top_k_distances = distances[partition_indices]
                    sorted_order = np.argsort(top_k_distances)
                    sorted_indices = partition_indices[sorted_order]

                # 6. Return results
                result_ids = self._ids_array[sorted_indices].tolist()
                result_dists = distances[sorted_indices].tolist()

                return result_ids, result_dists

            except Exception as e:
                raise StorageError(f"Failed to search Binary ANN index: {e}") from e

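    # Worked example of the XOR + popcount step above (one byte shown):
    #
    #     query byte   0b1100_1010
    #     stored byte  0b1000_1110
    #     XOR          0b0100_0100  -> popcount_lut[0b0100_0100] == 2
    #
    # The two vectors differ in exactly 2 of these 8 bit positions; summing
    # the per-byte popcounts over all 32 packed bytes yields the full 256-bit
    # Hamming distance.
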
    def search_numpy(
        self, query: np.ndarray, top_k: int = 10
    ) -> Tuple[List[int], List[int]]:
        """Search with an unpacked binary query vector.

        Convenience method that packs the query before searching.

        Args:
            query: Binary query vector of shape (dim,) with values 0 or 1
            top_k: Number of nearest neighbors to return

        Returns:
            Tuple of (ids, distances)

        Raises:
            ValueError: If the query dimension is invalid
        """
        if query.ndim == 2:
            query = query.flatten()

        if len(query) != self.dim:
            raise ValueError(
                f"Query dimension ({len(query)}) must match index dimension ({self.dim})"
            )

        packed_query = np.packbits(query.astype(np.uint8)).tobytes()
        return self.search(packed_query, top_k)

    def search_batch(
        self, queries: List[bytes], top_k: int = 10
    ) -> List[Tuple[List[int], List[int]]]:
        """Batch search for multiple queries.

        Args:
            queries: List of packed binary query vectors
            top_k: Number of nearest neighbors to return per query

        Returns:
            List of (ids, distances) tuples, one per query
        """
        results = []
        for query in queries:
            ids, dists = self.search(query, top_k)
            results.append((ids, dists))
        return results

    def save(self) -> None:
        """Save the index to disk.

        Binary format:
        - 4 bytes: magic number (0x42494E56 = "BINV")
        - 4 bytes: version (1)
        - 4 bytes: dim
        - 4 bytes: packed_dim
        - 4 bytes: num_vectors
        - For each vector:
          - 4 bytes: id
          - packed_dim bytes: vector data

        Raises:
            StorageError: If the save operation fails
        """
        with self._lock:
            try:
                if len(self._vectors) == 0:
                    logger.debug("Skipping save: index is empty")
                    return

                # Ensure parent directory exists
                self.binary_path.parent.mkdir(parents=True, exist_ok=True)

                with open(self.binary_path, "wb") as f:
                    # Header
                    f.write(b"BINV")  # Magic number
                    f.write(np.array([1], dtype=np.uint32).tobytes())  # Version
                    f.write(np.array([self.dim], dtype=np.uint32).tobytes())
                    f.write(np.array([self.packed_dim], dtype=np.uint32).tobytes())
                    f.write(np.array([len(self._vectors)], dtype=np.uint32).tobytes())

                    # Vectors
                    for vec_id in self._id_list:
                        f.write(np.array([vec_id], dtype=np.uint32).tobytes())
                        f.write(self._vectors[vec_id])

                logger.debug(
                    f"Saved binary index to {self.binary_path} "
                    f"({len(self._vectors)} vectors)"
                )

            except Exception as e:
                raise StorageError(f"Failed to save Binary ANN index: {e}") from e

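    # Header-parsing sketch for the on-disk format documented above, using
    # only the stdlib struct module. This is a hedged illustration: the
    # "<4sIIII" layout assumes the little-endian byte order that
    # np.uint32.tobytes() produces on typical x86/ARM platforms.
    #
    #     import struct
    #     with open(binary_path, "rb") as f:
    #         magic, version, dim, packed_dim, n = struct.unpack("<4sIIII", f.read(20))
    #         assert magic == b"BINV" and version == 1
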
    def load(self) -> bool:
        """Load the index from disk.

        Returns:
            True if the index was loaded successfully, False if the index
            file doesn't exist

        Raises:
            StorageError: If the load operation fails
        """
        with self._lock:
            try:
                if not self.binary_path.exists():
                    logger.debug(f"Binary index file not found: {self.binary_path}")
                    return False

                with open(self.binary_path, "rb") as f:
                    # Read header
                    magic = f.read(4)
                    if magic != b"BINV":
                        raise StorageError(
                            "Invalid binary index file: bad magic number"
                        )

                    version = np.frombuffer(f.read(4), dtype=np.uint32)[0]
                    if version != 1:
                        raise StorageError(
                            f"Unsupported binary index version: {version}"
                        )

                    file_dim = np.frombuffer(f.read(4), dtype=np.uint32)[0]
                    file_packed_dim = np.frombuffer(f.read(4), dtype=np.uint32)[0]
                    num_vectors = np.frombuffer(f.read(4), dtype=np.uint32)[0]

                    if file_dim != self.dim or file_packed_dim != self.packed_dim:
                        raise StorageError(
                            f"Dimension mismatch: file has dim={file_dim}, "
                            f"packed_dim={file_packed_dim}, "
                            f"expected dim={self.dim}, packed_dim={self.packed_dim}"
                        )

                    # Clear existing data
                    self._vectors.clear()
                    self._id_list.clear()
                    self._cache_valid = False

                    # Read vectors
                    for _ in range(num_vectors):
                        vec_id = np.frombuffer(f.read(4), dtype=np.uint32)[0]
                        vec_data = f.read(self.packed_dim)
                        self._vectors[int(vec_id)] = vec_data
                        self._id_list.append(int(vec_id))

                logger.info(
                    f"Loaded binary index from {self.binary_path} "
                    f"({len(self._vectors)} vectors)"
                )

                return True

            except StorageError:
                raise
            except Exception as e:
                raise StorageError(f"Failed to load Binary ANN index: {e}") from e

    def count(self) -> int:
        """Get the number of vectors in the index.

        Returns:
            Number of vectors currently in the index
        """
        with self._lock:
            return len(self._vectors)

    @property
    def is_loaded(self) -> bool:
        """Check whether the index has vectors.

        Returns:
            True if the index has vectors, False otherwise
        """
        with self._lock:
            return len(self._vectors) > 0

    def get_vector(self, vec_id: int) -> Optional[bytes]:
        """Get a specific vector by ID.

        Args:
            vec_id: Vector ID to retrieve

        Returns:
            Packed binary vector, or None if not found
        """
        with self._lock:
            return self._vectors.get(vec_id)

    def clear(self) -> None:
        """Clear all vectors from the index."""
        with self._lock:
            self._vectors.clear()
            self._id_list.clear()
            self._vectors_matrix = None
            self._ids_array = None
            self._cache_valid = False
            logger.debug("Cleared binary index")


def create_ann_index(
    index_path: Path,
    index_type: str = "hnsw",
    dim: int = 2048,
    **kwargs,
) -> ANNIndex | BinaryANNIndex:
    """Factory function to create an ANN index.

    Args:
        index_path: Path to the database file
        index_type: Type of index - "hnsw" for dense vectors, "binary" for binary vectors
        dim: Vector dimension (default: 2048 for dense, 256 for binary)
        **kwargs: Additional arguments passed to the index constructor

    Returns:
        ANNIndex for dense vectors or BinaryANNIndex for binary vectors

    Raises:
        ValueError: If index_type is invalid

    Example:
        >>> # Dense vector index (HNSW)
        >>> dense_index = create_ann_index(path, index_type="hnsw", dim=2048)
        >>> dense_index.add_vectors(ids, dense_vectors)
        >>>
        >>> # Binary vector index (Hamming distance)
        >>> binary_index = create_ann_index(path, index_type="binary", dim=256)
        >>> binary_index.add_vectors(ids, packed_vectors)
    """
    index_type = index_type.lower()

    if index_type == "hnsw":
        return ANNIndex(index_path=index_path, dim=dim, **kwargs)
    elif index_type == "binary":
        # Default to 256 for binary if the dense default was left unchanged
        if dim == 2048:
            dim = 256
        return BinaryANNIndex(index_path=index_path, dim=dim, **kwargs)
    else:
        raise ValueError(
            f"Invalid index_type: {index_type}. Must be 'hnsw' or 'binary'."
        )
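

# Minimal end-to-end sketch of the factory above. This block is an
# illustration, not part of the library API: it assumes the optional hnswlib
# and numpy dependencies are installed, and it writes a throwaway index under
# a temporary directory.
if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        db_path = Path(tmp) / "demo.db"
        index = create_ann_index(db_path, index_type="hnsw", dim=64)

        # 100 random float32 vectors with IDs 0..99
        rng = np.random.default_rng(0)
        vectors = rng.random((100, 64), dtype=np.float32)
        index.add_vectors(list(range(100)), vectors)
        index.save()  # writes {tmp}/demo_vectors.hnsw

        # Querying with a stored vector should return its own ID first,
        # at cosine distance ~0
        ids, dists = index.search(vectors[0], top_k=3)
        print(f"nearest to vector 0: ids={ids}, distances={dists}")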