Implement SPLADE sparse encoder and associated database migrations

- Added `splade_encoder.py` for ONNX-optimized SPLADE encoding, including methods for encoding text and batch processing.
- Created `SPLADE_IMPLEMENTATION.md` to document the SPLADE encoder's functionality, design patterns, and integration points.
- Introduced migration script `migration_009_add_splade.py` to add SPLADE metadata and posting list tables to the database.
- Developed `splade_index.py` for managing the SPLADE inverted index, supporting efficient sparse vector retrieval.
- Added verification script `verify_watcher.py` to test FileWatcher event filtering and debouncing functionality.
catlog22
2026-01-01 17:41:22 +08:00
parent 520f2d26f2
commit 5bb01755bc
16 changed files with 3122 additions and 2792 deletions

SPLADE_IMPLEMENTATION.md

@@ -0,0 +1,225 @@
# SPLADE Encoder Implementation
## Overview
Created `splade_encoder.py` - a complete ONNX-optimized SPLADE sparse encoder for code search.
## File Location
`src/codexlens/semantic/splade_encoder.py` (474 lines)
## Key Components
### 1. Dependency Checking
**Function**: `check_splade_available() -> Tuple[bool, Optional[str]]`
- Validates numpy, onnxruntime, optimum, transformers availability
- Returns (True, None) if all dependencies present
- Returns (False, error_message) with install instructions if missing
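A minimal usage sketch of this check, assuming the module imports as `codexlens.semantic.splade_encoder` (inferred from the file location above):
```python
from codexlens.semantic.splade_encoder import check_splade_available

ok, err = check_splade_available()
if not ok:
    # err names the missing package and includes a pip install hint
    raise ImportError(err)
```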
### 2. Caching System
**Global Cache**: Thread-safe singleton pattern
- `_splade_cache: Dict[str, SpladeEncoder]` - Global encoder cache
- `_cache_lock: threading.RLock()` - Thread safety lock
**Factory Function**: `get_splade_encoder(...) -> SpladeEncoder`
- Cache key includes: model_name, gpu/cpu, max_length, sparsity_threshold
- Pre-loads model on first access
- Returns cached instance on subsequent calls
**Cleanup Function**: `clear_splade_cache() -> None`
- Releases ONNX resources
- Clears model and tokenizer references
- Prevents memory leaks
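A short sketch of the intended cache behavior (same assumed import path as above):
```python
from codexlens.semantic.splade_encoder import get_splade_encoder, clear_splade_cache

# Same configuration -> same cached instance (model loaded only once)
enc_a = get_splade_encoder(use_gpu=False)
enc_b = get_splade_encoder(use_gpu=False)
assert enc_a is enc_b

# Different configuration -> separate cache entry
enc_c = get_splade_encoder(use_gpu=False, sparsity_threshold=0.05)
assert enc_c is not enc_a

clear_splade_cache()  # releases ONNX model/tokenizer references
```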
### 3. SpladeEncoder Class
#### Initialization Parameters
- `model_name: str` - Default: "naver/splade-cocondenser-ensembledistil"
- `use_gpu: bool` - Enable GPU acceleration (default: True)
- `max_length: int` - Max sequence length (default: 512)
- `sparsity_threshold: float` - Min weight threshold (default: 0.01)
- `providers: Optional[List]` - Explicit ONNX providers (overrides use_gpu)
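Direct construction with an explicit provider override, a sketch; the `(name, options)` tuple format mirrors what `get_optimal_providers(..., with_device_options=True)` produces:
```python
from codexlens.semantic.splade_encoder import SpladeEncoder

encoder = SpladeEncoder(
    model_name="naver/splade-cocondenser-ensembledistil",
    use_gpu=False,
    max_length=256,
    sparsity_threshold=0.05,
    providers=[("CPUExecutionProvider", {})],  # overrides use_gpu
)
```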
#### Core Methods
**`_load_model()`**: Lazy loading with GPU support
- Uses `optimum.onnxruntime.ORTModelForMaskedLM`
- Falls back to CPU if GPU unavailable
- Integrates with `gpu_support.get_optimal_providers()`
- Handles device_id options for DirectML/CUDA
**`_splade_activation(logits, attention_mask)`**: Static method
- Formula: `log(1 + ReLU(logits)) * attention_mask`
- Input: (batch, seq_len, vocab_size)
- Output: (batch, seq_len, vocab_size)
**`_max_pooling(splade_repr)`**: Static method
- Max pooling over sequence dimension
- Input: (batch, seq_len, vocab_size)
- Output: (batch, vocab_size)
**`_to_sparse_dict(dense_vec)`**: Conversion helper
- Filters by sparsity_threshold
- Returns: `Dict[int, float]` mapping token_id to weight
**`encode_text(text: str) -> Dict[int, float]`**: Single text encoding
- Tokenizes input with truncation/padding
- Forward pass through ONNX model
- Applies SPLADE activation + max pooling
- Returns sparse vector
**`encode_batch(texts: List[str], batch_size: int = 32) -> List[Dict[int, float]]`**: Batch encoding
- Processes in batches for memory efficiency
- Returns list of sparse vectors
#### Properties
**`vocab_size: int`**: Vocabulary size (~30k for BERT)
- Cached after first model load
- Returns tokenizer length
#### Debugging Methods
**`get_token(token_id: int) -> str`**
- Converts token_id to human-readable string
- Uses tokenizer.decode()
**`get_top_tokens(sparse_vec: Dict[int, float], top_k: int = 10) -> List[Tuple[str, float]]`**
- Extracts top-k highest-weight tokens
- Returns (token_string, weight) pairs
- Useful for understanding model focus
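Putting the debugging helpers together, a hedged example (the surfaced tokens are illustrative, not measured output):
```python
from codexlens.semantic.splade_encoder import get_splade_encoder

encoder = get_splade_encoder(use_gpu=False)
sparse_vec = encoder.encode_text("def authenticate(user, password): ...")
for token, weight in encoder.get_top_tokens(sparse_vec, top_k=5):
    print(f"{token!r}\t{weight:.3f}")
# Expected to surface terms like 'authenticate' and 'password', plus expansion tokens
```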
## Design Patterns Followed
### 1. From `onnx_reranker.py`
✓ ONNX loading with provider detection
✓ Lazy model initialization
✓ Thread-safe loading with RLock
✓ Signature inspection for backward compatibility
✓ Fallback for older Optimum versions
✓ Static helper methods for numerical operations
### 2. From `embedder.py`
✓ Global cache with thread safety
✓ Factory function pattern (get_splade_encoder)
✓ Cache cleanup function (clear_splade_cache)
✓ GPU provider configuration
✓ Batch processing support
### 3. From `gpu_support.py`
✓ `get_optimal_providers(use_gpu, with_device_options=True)`
✓ Device ID options for DirectML/CUDA
✓ Provider tuple format: (provider_name, options_dict)
## SPLADE Algorithm
### Activation Formula
```python
import numpy as np

# Step 1: ReLU activation
relu_logits = np.maximum(0, logits)
# Step 2: log(1 + x) transformation
log_relu = np.log1p(relu_logits)
# Step 3: Apply attention mask, expanded to the vocab dimension
splade_repr = log_relu * np.expand_dims(attention_mask, axis=-1)
# Step 4: Max pooling over the sequence dimension
splade_vec = np.max(splade_repr, axis=1)
# Step 5: Sparsify by threshold (single-item batch)
sparse_dict = {
    int(token_id): float(weight)
    for token_id, weight in enumerate(splade_vec[0])
    if weight > threshold
}
```
### Output Format
- Sparse dictionary: `{token_id: weight}`
- Token IDs: 0 to vocab_size-1 (typically ~30,000)
- Weights: Float values > sparsity_threshold
- Interpretable: Can decode token_ids to strings
## Integration Points
### With `splade_index.py`
- `SpladeIndex.add_posting(chunk_id, sparse_vec: Dict[int, float])`
- `SpladeIndex.search(query_sparse: Dict[int, float])`
- Encoder produces the sparse vectors consumed by index
### With Indexing Pipeline
```python
encoder = get_splade_encoder(use_gpu=True)
# Single document
sparse_vec = encoder.encode_text("def main():\n print('hello')")
index.add_posting(chunk_id=1, sparse_vec=sparse_vec)
# Batch indexing
texts = ["code chunk 1", "code chunk 2", ...]
sparse_vecs = encoder.encode_batch(texts, batch_size=64)
postings = [(chunk_id, vec) for chunk_id, vec in enumerate(sparse_vecs)]
index.add_postings_batch(postings)
```
### With Search Pipeline
```python
encoder = get_splade_encoder(use_gpu=True)
query_sparse = encoder.encode_text("authentication function")
results = index.search(query_sparse, limit=50, min_score=0.5)
```
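Conceptually, the ranking that `index.search` performs is a dot product over shared vocabulary dimensions. A minimal scoring sketch follows; this is an assumption about what `SpladeIndex` computes internally, not its actual implementation:
```python
from typing import Dict

def sparse_dot(query: Dict[int, float], doc: Dict[int, float]) -> float:
    """Score = sum of query_weight * doc_weight over shared token ids."""
    small, large = (query, doc) if len(query) <= len(doc) else (doc, query)
    return sum(w * large.get(token_id, 0.0) for token_id, w in small.items())
```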
## Dependencies
Required packages:
- `numpy` - Numerical operations
- `onnxruntime` - ONNX model execution (CPU)
- `onnxruntime-gpu` - ONNX with GPU support (optional)
- `optimum[onnxruntime]` - Hugging Face ONNX optimization
- `transformers` - Tokenizer and model loading
Install command:
```bash
# CPU only
pip install numpy onnxruntime optimum[onnxruntime] transformers
# With GPU support
pip install numpy onnxruntime-gpu optimum[onnxruntime-gpu] transformers
```
## Testing Status
✓ Python syntax validation passed
✓ Module import successful
✓ Dependency checking works correctly
✗ Full functional test pending (requires optimum installation)
## Next Steps
1. Install dependencies for functional testing
2. Create unit tests in `tests/semantic/test_splade_encoder.py` (a starter sketch follows this list)
3. Benchmark encoding performance (CPU vs GPU)
4. Integrate with codex-lens indexing pipeline
5. Add SPLADE option to semantic search configuration
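A starter test sketch for step 2, hypothetical and assuming pytest plus the `codexlens.semantic.splade_encoder` import path:
```python
# tests/semantic/test_splade_encoder.py (sketch)
import pytest

from codexlens.semantic.splade_encoder import (
    check_splade_available,
    get_splade_encoder,
)

pytestmark = pytest.mark.skipif(
    not check_splade_available()[0],
    reason="SPLADE dependencies (onnxruntime/optimum/transformers) not installed",
)


def test_encode_text_returns_sparse_dict():
    encoder = get_splade_encoder(use_gpu=False)
    vec = encoder.encode_text("def add(a, b):\n    return a + b")
    assert vec, "expected at least one non-zero weight"
    assert all(isinstance(token_id, int) for token_id in vec)
    assert all(weight > encoder.sparsity_threshold for weight in vec.values())


def test_encode_batch_matches_input_length():
    encoder = get_splade_encoder(use_gpu=False)
    texts = ["print('hello')", "class Config: pass"]
    assert len(encoder.encode_batch(texts, batch_size=2)) == len(texts)
```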
## Performance Considerations
### Memory Usage
- Model size: ~100MB (ONNX optimized)
- Sparse vectors: ~100-500 non-zero entries per document
- Batch size: 32 recommended (adjust based on GPU memory)
### Speed Benchmarks (Expected)
- CPU encoding: ~10-20 docs/sec
- GPU encoding (CUDA): ~100-200 docs/sec
- GPU encoding (DirectML): ~50-100 docs/sec
### Sparsity Analysis
- Threshold 0.01: ~200-400 tokens per document
- Threshold 0.05: ~100-200 tokens per document
- Threshold 0.10: ~50-100 tokens per document
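To verify these figures on your own corpus, a quick measurement sketch (encode once with threshold 0, then count active tokens at each cutoff):
```python
from codexlens.semantic.splade_encoder import get_splade_encoder

encoder = get_splade_encoder(use_gpu=False, sparsity_threshold=0.0)
document_text = "def main():\n    print('hello')"  # substitute a real code chunk
vec = encoder.encode_text(document_text)
for threshold in (0.01, 0.05, 0.10):
    active = sum(1 for weight in vec.values() if weight > threshold)
    print(f"threshold={threshold}: {active} active tokens")
```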
## References
- SPLADE paper: https://arxiv.org/abs/2107.05720
- SPLADE v2: https://arxiv.org/abs/2109.10086
- Naver model: https://huggingface.co/naver/splade-cocondenser-ensembledistil

src/codexlens/semantic/splade_encoder.py

@@ -0,0 +1,474 @@
"""ONNX-optimized SPLADE sparse encoder for code search.
This module provides SPLADE (SParse Lexical AnD Expansion) encoding using ONNX Runtime
for efficient sparse vector generation. SPLADE produces vocabulary-aligned sparse vectors
that combine the interpretability of BM25 with neural relevance modeling.
Install (CPU):
pip install onnxruntime optimum[onnxruntime] transformers
Install (GPU):
pip install onnxruntime-gpu optimum[onnxruntime-gpu] transformers
"""
from __future__ import annotations
import logging
import threading
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
def check_splade_available() -> Tuple[bool, Optional[str]]:
"""Check whether SPLADE dependencies are available.
Returns:
Tuple of (available: bool, error_message: Optional[str])
"""
try:
import numpy # noqa: F401
except ImportError as exc:
return False, f"numpy not available: {exc}. Install with: pip install numpy"
try:
import onnxruntime # noqa: F401
except ImportError as exc:
return (
False,
f"onnxruntime not available: {exc}. Install with: pip install onnxruntime",
)
try:
from optimum.onnxruntime import ORTModelForMaskedLM # noqa: F401
except ImportError as exc:
return (
False,
f"optimum[onnxruntime] not available: {exc}. Install with: pip install optimum[onnxruntime]",
)
try:
from transformers import AutoTokenizer # noqa: F401
except ImportError as exc:
return (
False,
f"transformers not available: {exc}. Install with: pip install transformers",
)
return True, None
# Global cache for SPLADE encoders (singleton pattern)
_splade_cache: Dict[str, "SpladeEncoder"] = {}
_cache_lock = threading.RLock()
def get_splade_encoder(
model_name: str = "naver/splade-cocondenser-ensembledistil",
use_gpu: bool = True,
max_length: int = 512,
sparsity_threshold: float = 0.01,
) -> "SpladeEncoder":
"""Get or create cached SPLADE encoder (thread-safe singleton).
This function provides a significant performance improvement by reusing
SpladeEncoder instances across multiple searches, avoiding repeated model
loading overhead.
Args:
model_name: SPLADE model name (default: naver/splade-cocondenser-ensembledistil)
use_gpu: If True, use GPU acceleration when available
max_length: Maximum sequence length for tokenization
sparsity_threshold: Minimum weight to include in sparse vector
Returns:
Cached SpladeEncoder instance for the given configuration
"""
global _splade_cache
# Cache key includes all configuration parameters
cache_key = f"{model_name}:{'gpu' if use_gpu else 'cpu'}:{max_length}:{sparsity_threshold}"
with _cache_lock:
encoder = _splade_cache.get(cache_key)
if encoder is not None:
return encoder
# Create new encoder and cache it
encoder = SpladeEncoder(
model_name=model_name,
use_gpu=use_gpu,
max_length=max_length,
sparsity_threshold=sparsity_threshold,
)
# Pre-load model to ensure it's ready
encoder._load_model()
_splade_cache[cache_key] = encoder
return encoder
def clear_splade_cache() -> None:
"""Clear the SPLADE encoder cache and release ONNX resources.
This function ensures proper cleanup of ONNX model resources to prevent
memory leaks when encoders are no longer needed.
"""
global _splade_cache
with _cache_lock:
# Release ONNX resources before clearing cache
for encoder in _splade_cache.values():
if encoder._model is not None:
del encoder._model
encoder._model = None
if encoder._tokenizer is not None:
del encoder._tokenizer
encoder._tokenizer = None
_splade_cache.clear()
class SpladeEncoder:
"""ONNX-optimized SPLADE sparse encoder.
Produces sparse vectors with vocabulary-aligned dimensions.
Output: Dict[int, float] mapping token_id to weight.
SPLADE activation formula:
splade_repr = log(1 + ReLU(logits)) * attention_mask
splade_vec = max_pooling(splade_repr, axis=sequence_length)
References:
- SPLADE: https://arxiv.org/abs/2107.05720
- SPLADE v2: https://arxiv.org/abs/2109.10086
"""
DEFAULT_MODEL = "naver/splade-cocondenser-ensembledistil"
def __init__(
self,
model_name: str = DEFAULT_MODEL,
use_gpu: bool = True,
max_length: int = 512,
sparsity_threshold: float = 0.01,
providers: Optional[List[Any]] = None,
) -> None:
"""Initialize SPLADE encoder.
Args:
model_name: SPLADE model name (default: naver/splade-cocondenser-ensembledistil)
use_gpu: If True, use GPU acceleration when available
max_length: Maximum sequence length for tokenization
sparsity_threshold: Minimum weight to include in sparse vector
providers: Explicit ONNX providers list (overrides use_gpu)
"""
self.model_name = (model_name or self.DEFAULT_MODEL).strip()
if not self.model_name:
raise ValueError("model_name cannot be blank")
self.use_gpu = bool(use_gpu)
self.max_length = int(max_length) if max_length > 0 else 512
self.sparsity_threshold = float(sparsity_threshold)
self.providers = providers
self._tokenizer: Any | None = None
self._model: Any | None = None
self._vocab_size: int | None = None
self._lock = threading.RLock()
def _load_model(self) -> None:
"""Lazy load ONNX model and tokenizer."""
if self._model is not None and self._tokenizer is not None:
return
ok, err = check_splade_available()
if not ok:
raise ImportError(err)
with self._lock:
if self._model is not None and self._tokenizer is not None:
return
from inspect import signature
from optimum.onnxruntime import ORTModelForMaskedLM
from transformers import AutoTokenizer
if self.providers is None:
from .gpu_support import get_optimal_providers
# Include device_id options for DirectML/CUDA selection when available
self.providers = get_optimal_providers(
use_gpu=self.use_gpu, with_device_options=True
)
# Some Optimum versions accept `providers`, others accept a single `provider`
# Prefer passing the full providers list, with a conservative fallback
model_kwargs: dict[str, Any] = {}
try:
params = signature(ORTModelForMaskedLM.from_pretrained).parameters
if "providers" in params:
model_kwargs["providers"] = self.providers
elif "provider" in params:
provider_name = "CPUExecutionProvider"
if self.providers:
first = self.providers[0]
provider_name = first[0] if isinstance(first, tuple) else str(first)
model_kwargs["provider"] = provider_name
except Exception:
model_kwargs = {}
try:
self._model = ORTModelForMaskedLM.from_pretrained(
self.model_name,
**model_kwargs,
)
logger.debug(f"SPLADE model loaded: {self.model_name}")
except TypeError:
# Fallback for older Optimum versions: retry without provider arguments
self._model = ORTModelForMaskedLM.from_pretrained(self.model_name)
logger.warning(
"Optimum version doesn't support provider parameters. "
"Upgrade optimum for GPU acceleration: pip install --upgrade optimum"
)
self._tokenizer = AutoTokenizer.from_pretrained(self.model_name, use_fast=True)
# Cache vocabulary size
self._vocab_size = len(self._tokenizer)
logger.debug(f"SPLADE tokenizer loaded: vocab_size={self._vocab_size}")
@staticmethod
def _splade_activation(logits: Any, attention_mask: Any) -> Any:
"""Apply SPLADE activation function to model outputs.
Formula: log(1 + ReLU(logits)) * attention_mask
Args:
logits: Model output logits (batch, seq_len, vocab_size)
attention_mask: Attention mask (batch, seq_len)
Returns:
SPLADE representations (batch, seq_len, vocab_size)
"""
import numpy as np
# ReLU activation
relu_logits = np.maximum(0, logits)
# Log(1 + x) transformation
log_relu = np.log1p(relu_logits)
# Apply attention mask (expand to match vocab dimension)
# attention_mask: (batch, seq_len) -> (batch, seq_len, 1)
mask_expanded = np.expand_dims(attention_mask, axis=-1)
# Element-wise multiplication
splade_repr = log_relu * mask_expanded
return splade_repr
@staticmethod
def _max_pooling(splade_repr: Any) -> Any:
"""Max pooling over sequence length dimension.
Args:
splade_repr: SPLADE representations (batch, seq_len, vocab_size)
Returns:
Pooled sparse vectors (batch, vocab_size)
"""
import numpy as np
# Max pooling over sequence dimension (axis=1)
return np.max(splade_repr, axis=1)
def _to_sparse_dict(self, dense_vec: Any) -> Dict[int, float]:
"""Convert dense vector to sparse dictionary.
Args:
dense_vec: Dense vector (vocab_size,)
Returns:
Sparse dictionary {token_id: weight} with weights above threshold
"""
import numpy as np
# Find non-zero indices above threshold
nonzero_indices = np.where(dense_vec > self.sparsity_threshold)[0]
# Create sparse dictionary
sparse_dict = {
int(idx): float(dense_vec[idx])
for idx in nonzero_indices
}
return sparse_dict
def encode_text(self, text: str) -> Dict[int, float]:
"""Encode text to sparse vector {token_id: weight}.
Args:
text: Input text to encode
Returns:
Sparse vector as dictionary mapping token_id to weight
"""
self._load_model()
if self._model is None or self._tokenizer is None:
raise RuntimeError("Model not loaded")
import numpy as np
# Tokenize input
encoded = self._tokenizer(
text,
padding=True,
truncation=True,
max_length=self.max_length,
return_tensors="np",
)
# Forward pass through model
outputs = self._model(**encoded)
# Extract logits
if hasattr(outputs, "logits"):
logits = outputs.logits
elif isinstance(outputs, dict) and "logits" in outputs:
logits = outputs["logits"]
elif isinstance(outputs, (list, tuple)) and outputs:
logits = outputs[0]
else:
raise RuntimeError("Unexpected model output format")
# Apply SPLADE activation
attention_mask = encoded["attention_mask"]
splade_repr = self._splade_activation(logits, attention_mask)
# Max pooling over sequence length
splade_vec = self._max_pooling(splade_repr)
# Convert to sparse dictionary (single item batch)
sparse_dict = self._to_sparse_dict(splade_vec[0])
return sparse_dict
def encode_batch(self, texts: List[str], batch_size: int = 32) -> List[Dict[int, float]]:
"""Batch encode texts to sparse vectors.
Args:
texts: List of input texts to encode
batch_size: Batch size for encoding (default: 32)
Returns:
List of sparse vectors as dictionaries
"""
if not texts:
return []
self._load_model()
if self._model is None or self._tokenizer is None:
raise RuntimeError("Model not loaded")
import numpy as np
results: List[Dict[int, float]] = []
# Process in batches
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i + batch_size]
# Tokenize batch
encoded = self._tokenizer(
batch_texts,
padding=True,
truncation=True,
max_length=self.max_length,
return_tensors="np",
)
# Forward pass through model
outputs = self._model(**encoded)
# Extract logits
if hasattr(outputs, "logits"):
logits = outputs.logits
elif isinstance(outputs, dict) and "logits" in outputs:
logits = outputs["logits"]
elif isinstance(outputs, (list, tuple)) and outputs:
logits = outputs[0]
else:
raise RuntimeError("Unexpected model output format")
# Apply SPLADE activation
attention_mask = encoded["attention_mask"]
splade_repr = self._splade_activation(logits, attention_mask)
# Max pooling over sequence length
splade_vecs = self._max_pooling(splade_repr)
# Convert each vector to sparse dictionary
for vec in splade_vecs:
sparse_dict = self._to_sparse_dict(vec)
results.append(sparse_dict)
return results
@property
def vocab_size(self) -> int:
"""Return vocabulary size (~30k for BERT-based models).
Returns:
Vocabulary size (number of tokens in tokenizer)
"""
if self._vocab_size is not None:
return self._vocab_size
self._load_model()
return self._vocab_size or 0
def get_token(self, token_id: int) -> str:
"""Convert token_id to string (for debugging).
Args:
token_id: Token ID to convert
Returns:
Token string
"""
self._load_model()
if self._tokenizer is None:
raise RuntimeError("Tokenizer not loaded")
return self._tokenizer.decode([token_id])
def get_top_tokens(self, sparse_vec: Dict[int, float], top_k: int = 10) -> List[Tuple[str, float]]:
"""Get top-k tokens with highest weights from sparse vector.
Useful for debugging and understanding what the model is focusing on.
Args:
sparse_vec: Sparse vector as {token_id: weight}
top_k: Number of top tokens to return
Returns:
List of (token_string, weight) tuples, sorted by weight descending
"""
self._load_model()
if not sparse_vec:
return []
# Sort by weight descending
sorted_items = sorted(sparse_vec.items(), key=lambda x: x[1], reverse=True)
# Take top-k and convert token_ids to strings
top_items = sorted_items[:top_k]
return [
(self.get_token(token_id), weight)
for token_id, weight in top_items
]