Remove outdated tests for CodexLens and LiteLLM client, refactor Smart Search MCP usage tests to use new command structure, and clean up unified vector index tests.

This commit is contained in:
catlog22
2026-03-18 11:35:51 +08:00
parent ad9d3f94e0
commit df69f997e4
45 changed files with 64 additions and 11170 deletions

View File

@@ -1,226 +0,0 @@
# Memory Embedder Implementation Summary
## Overview
Created a Python script (`memory_embedder.py`) that bridges CCW to CodexLens semantic search by generating and searching embeddings for memory chunks stored in CCW's SQLite database.
## Files Created
### 1. `memory_embedder.py` (Main Script)
**Location**: `D:\Claude_dms3\ccw\scripts\memory_embedder.py`
**Features**:
- Reuses the CodexLens embedder factory: `from codexlens.semantic.factory import get_embedder`
- Uses jina-embeddings-v2-base-code (768 dimensions)
- Three commands: `embed`, `search`, `status`
- JSON output for easy integration
- Batch processing for efficiency
- Graceful error handling
**Commands**:
1. **embed** - Generate embeddings
```bash
python memory_embedder.py embed <db_path> [options]
Options:
  --source-id ID    # Only process specific source
  --batch-size N    # Batch size (default: 8)
  --force           # Re-embed existing chunks
```
2. **search** - Semantic search
```bash
python memory_embedder.py search <db_path> <query> [options]
Options:
  --top-k N         # Number of results (default: 10)
  --min-score F     # Minimum score (default: 0.3)
  --type TYPE       # Filter by source type
```
3. **status** - Get statistics
```bash
python memory_embedder.py status <db_path>
```
### 2. `README-memory-embedder.md` (Documentation)
**Location**: `D:\Claude_dms3\ccw\scripts\README-memory-embedder.md`
**Contents**:
- Feature overview
- Requirements and installation
- Detailed usage examples
- Database path reference
- TypeScript integration guide
- Performance metrics
- Source type descriptions
### 3. `memory-embedder-example.ts` (Integration Example)
**Location**: `D:\Claude_dms3\ccw\scripts\memory-embedder-example.ts`
**Exported Functions**:
- `embedChunks(dbPath, options)` - Generate embeddings
- `searchMemory(dbPath, query, options)` - Semantic search
- `getEmbeddingStatus(dbPath)` - Get status
**Example Usage**:
```typescript
import { searchMemory, embedChunks, getEmbeddingStatus } from './memory-embedder-example';
// Check status
const status = getEmbeddingStatus(dbPath);

// Generate embeddings
const result = embedChunks(dbPath, { batchSize: 16 });

// Search
const matches = searchMemory(dbPath, 'authentication', {
  topK: 5,
  minScore: 0.5,
  sourceType: 'workflow'
});
```
## Technical Implementation
### Database Schema
Uses existing `memory_chunks` table:
```sql
CREATE TABLE memory_chunks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source_id TEXT NOT NULL,
    source_type TEXT NOT NULL,
    chunk_index INTEGER NOT NULL,
    content TEXT NOT NULL,
    embedding BLOB,
    metadata TEXT,
    created_at TEXT NOT NULL,
    UNIQUE(source_id, chunk_index)
);
```
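The script only fills the `embedding` column; a minimal, hypothetical sketch of how a row enters this table (the `insert_chunk` helper is not part of `memory_embedder.py`) — `INSERT OR IGNORE` makes re-runs idempotent under the `UNIQUE(source_id, chunk_index)` constraint:

```python
import sqlite3
from datetime import datetime, timezone

# Hypothetical helper, shown only to illustrate the schema above.
# INSERT OR IGNORE silently skips a (source_id, chunk_index) pair
# that already exists, so re-chunking the same source is safe.
def insert_chunk(conn, source_id, source_type, chunk_index, content):
    conn.execute(
        "INSERT OR IGNORE INTO memory_chunks "
        "(source_id, source_type, chunk_index, content, created_at) "
        "VALUES (?, ?, ?, ?, ?)",
        (source_id, source_type, chunk_index, content,
         datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()
```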
### Embedding Storage
- Format: `float32` bytes (numpy array)
- Dimension: 768 (jina-embeddings-v2-base-code)
- Storage: `np.array(emb, dtype=np.float32).tobytes()`
- Loading: `np.frombuffer(blob, dtype=np.float32)`
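The round trip can be sketched directly with numpy (self-contained; the constant vector here is a stand-in for real embedder output):

```python
import numpy as np

# Store: 768 float32 values -> 768 * 4 = 3072 raw bytes for the BLOB column.
embedding = [0.1] * 768
blob = np.array(embedding, dtype=np.float32).tobytes()

# Load: bytes from the BLOB column -> identical float32 vector.
restored = np.frombuffer(blob, dtype=np.float32)

assert restored.shape == (768,)
assert np.allclose(restored, embedding)
```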
### Similarity Search
- Algorithm: Cosine similarity
- Formula: `np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))`
- Default threshold: 0.3
- Sorting: Descending by score
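A minimal sketch of the scoring step on toy 3-dimensional vectors (real embeddings are 768-dimensional):

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity as used for scoring; range [-1, 1]."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

query = np.array([1.0, 0.0, 1.0], dtype=np.float32)
chunk = np.array([1.0, 1.0, 0.0], dtype=np.float32)

# dot = 1, norms = sqrt(2) each -> score = 1 / 2 = 0.5
score = cosine_similarity(query, chunk)
assert abs(score - 0.5) < 1e-6
```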
### Source Types
- `core_memory`: Strategic architectural context
- `workflow`: Session-based development history
- `cli_history`: Command execution logs
### Restore Commands
Generated automatically for each match:
- core_memory/cli_history: `ccw memory export <source_id>`
- workflow: `ccw session resume <source_id>`
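The mapping is a plain conditional, mirroring `_get_restore_command` in `memory_embedder.py`:

```python
def restore_command(source_id: str, source_type: str) -> str:
    """Map a search match back to the CCW command that restores it."""
    if source_type in ("core_memory", "cli_history"):
        return f"ccw memory export {source_id}"
    if source_type == "workflow":
        return f"ccw session resume {source_id}"
    # Unknown types yield a commented-out placeholder rather than a command.
    return f"# Unknown source type: {source_type}"
```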
## Dependencies
### Required
- `numpy`: Array operations and cosine similarity
- `codex-lens[semantic]`: Embedding generation
### Installation
```bash
pip install numpy codex-lens[semantic]
```
## Testing
### Script Validation
```bash
# Syntax check
python -m py_compile scripts/memory_embedder.py # OK
# Help output
python scripts/memory_embedder.py --help # Works
python scripts/memory_embedder.py embed --help # Works
python scripts/memory_embedder.py search --help # Works
python scripts/memory_embedder.py status --help # Works
# Status test
python scripts/memory_embedder.py status <db_path> # Works
```
### Error Handling
- Missing database: FileNotFoundError with clear message
- Missing CodexLens: ImportError with installation instructions
- Missing numpy: ImportError with installation instructions
- Database errors: JSON error response with success=false
- Missing table: Graceful error with JSON output
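All of these surface through the same JSON envelope, so callers never have to scrape tracebacks. A minimal sketch of the convention (the `error_response` helper and the path are illustrative, not part of the script):

```python
import json

# Every failure becomes a JSON object with success=false, so callers can
# always json.loads() the output regardless of what went wrong.
def error_response(message: str) -> str:
    return json.dumps({"success": False, "error": message}, indent=2)

payload = json.loads(error_response("Database not found: /tmp/missing.db"))
assert payload["success"] is False
```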
## Performance
- **Embedding speed**: ~8 chunks/second (batch size 8)
- **Search speed**: ~0.1-0.5 seconds for 1000 chunks
- **Model loading**: ~0.8 seconds (cached after first use via CodexLens singleton)
- **Batch processing**: Configurable batch size (default: 8)
## Output Format
All commands output JSON for easy parsing:
### Embed Result
```json
{
  "success": true,
  "chunks_processed": 50,
  "chunks_failed": 0,
  "elapsed_time": 12.34
}
```
### Search Result
```json
{
  "success": true,
  "matches": [
    {
      "source_id": "WFS-20250101-auth",
      "source_type": "workflow",
      "chunk_index": 2,
      "content": "Implemented JWT...",
      "score": 0.8542,
      "restore_command": "ccw session resume WFS-20250101-auth"
    }
  ]
}
```
### Status Result
```json
{
  "total_chunks": 150,
  "embedded_chunks": 100,
  "pending_chunks": 50,
  "by_type": {
    "core_memory": {"total": 80, "embedded": 60, "pending": 20}
  }
}
```
## Next Steps
1. **TypeScript Integration**: Add to CCW's core memory routes
2. **CLI Command**: Create `ccw memory search` command
3. **Automatic Embedding**: Trigger embedding on memory creation
4. **Index Management**: Add rebuild/optimize commands
5. **Cluster Search**: Integrate with session clusters
## Code Quality
- ✅ Single responsibility per function
- ✅ Clear, descriptive naming
- ✅ Explicit error handling
- ✅ No premature abstractions
- ✅ Minimal debug output (essential logging only)
- ✅ ASCII-only characters (no emojis)
- ✅ GBK encoding compatible
- ✅ Type hints for all functions
- ✅ Comprehensive docstrings

View File

@@ -1,135 +0,0 @@
# Memory Embedder - Quick Reference
## Installation
```bash
pip install numpy codex-lens[semantic]
```
## Commands
### Status
```bash
python scripts/memory_embedder.py status <db_path>
```
### Embed All
```bash
python scripts/memory_embedder.py embed <db_path>
```
### Embed Specific Source
```bash
python scripts/memory_embedder.py embed <db_path> --source-id CMEM-20250101-120000
```
### Re-embed (Force)
```bash
python scripts/memory_embedder.py embed <db_path> --force
```
### Search
```bash
python scripts/memory_embedder.py search <db_path> "authentication flow"
```
### Advanced Search
```bash
python scripts/memory_embedder.py search <db_path> "rate limiting" \
  --top-k 5 \
  --min-score 0.5 \
  --type workflow
```
## Database Path
Find your database:
```bash
# Linux/Mac
~/.ccw/projects/<project-id>/core-memory/core_memory.db
# Windows
%USERPROFILE%\.ccw\projects\<project-id>\core-memory\core_memory.db
```
## TypeScript Integration
```typescript
import { execSync } from 'child_process';
// Status
const status = JSON.parse(
  execSync(`python scripts/memory_embedder.py status "${dbPath}"`, {
    encoding: 'utf-8'
  })
);

// Embed
const result = JSON.parse(
  execSync(`python scripts/memory_embedder.py embed "${dbPath}"`, {
    encoding: 'utf-8'
  })
);

// Search
const matches = JSON.parse(
  execSync(
    `python scripts/memory_embedder.py search "${dbPath}" "query"`,
    { encoding: 'utf-8' }
  )
);
```
## Output Examples
### Status
```json
{
  "total_chunks": 150,
  "embedded_chunks": 100,
  "pending_chunks": 50,
  "by_type": {
    "core_memory": {"total": 80, "embedded": 60, "pending": 20}
  }
}
```
### Embed
```json
{
  "success": true,
  "chunks_processed": 50,
  "chunks_failed": 0,
  "elapsed_time": 12.34
}
```
### Search
```json
{
  "success": true,
  "matches": [
    {
      "source_id": "WFS-20250101-auth",
      "source_type": "workflow",
      "chunk_index": 2,
      "content": "Implemented JWT authentication...",
      "score": 0.8542,
      "restore_command": "ccw session resume WFS-20250101-auth"
    }
  ]
}
```
## Source Types
- `core_memory` - Strategic architectural context
- `workflow` - Session-based development history
- `cli_history` - Command execution logs
## Performance
- Embedding: ~8 chunks/second
- Search: ~0.1-0.5s for 1000 chunks
- Model load: ~0.8s (cached)
- Batch size: 8 (default, configurable)

View File

@@ -1,157 +0,0 @@
# Memory Embedder
Bridge CCW to CodexLens semantic search by generating and searching embeddings for memory chunks.
## Features
- **Generate embeddings** for memory chunks using CodexLens's jina-embeddings-v2-base-code (768 dim)
- **Semantic search** across all memory types (core_memory, workflow, cli_history)
- **Status tracking** to monitor embedding progress
- **Batch processing** for efficient embedding generation
- **Restore commands** included in search results
## Requirements
```bash
pip install numpy codex-lens[semantic]
```
## Usage
### 1. Check Status
```bash
python scripts/memory_embedder.py status <db_path>
```
Example output:
```json
{
  "total_chunks": 150,
  "embedded_chunks": 100,
  "pending_chunks": 50,
  "by_type": {
    "core_memory": {"total": 80, "embedded": 60, "pending": 20},
    "workflow": {"total": 50, "embedded": 30, "pending": 20},
    "cli_history": {"total": 20, "embedded": 10, "pending": 10}
  }
}
```
### 2. Generate Embeddings
Embed all unembedded chunks:
```bash
python scripts/memory_embedder.py embed <db_path>
```
Embed specific source:
```bash
python scripts/memory_embedder.py embed <db_path> --source-id CMEM-20250101-120000
```
Re-embed all chunks (force):
```bash
python scripts/memory_embedder.py embed <db_path> --force
```
Adjust batch size (default 8):
```bash
python scripts/memory_embedder.py embed <db_path> --batch-size 16
```
Example output:
```json
{
  "success": true,
  "chunks_processed": 50,
  "chunks_failed": 0,
  "elapsed_time": 12.34
}
```
### 3. Semantic Search
Basic search:
```bash
python scripts/memory_embedder.py search <db_path> "authentication flow"
```
Advanced search:
```bash
python scripts/memory_embedder.py search <db_path> "rate limiting" \
  --top-k 5 \
  --min-score 0.5 \
  --type workflow
```
Example output:
```json
{
  "success": true,
  "matches": [
    {
      "source_id": "WFS-20250101-auth",
      "source_type": "workflow",
      "chunk_index": 2,
      "content": "Implemented JWT-based authentication...",
      "score": 0.8542,
      "restore_command": "ccw session resume WFS-20250101-auth"
    }
  ]
}
```
## Database Path
The database is located in CCW's storage directory:
- **Windows**: `%USERPROFILE%\.ccw\projects\<project-id>\core-memory\core_memory.db`
- **Linux/Mac**: `~/.ccw/projects/<project-id>/core-memory/core_memory.db`
Find your project's database:
```bash
ccw memory list # Shows project path
# Then look in: ~/.ccw/projects/<hashed-path>/core-memory/core_memory.db
```
## Integration with CCW
This script is designed to be called from CCW's TypeScript code:
```typescript
import { execSync } from 'child_process';

// Embed chunks (quote the path in case it contains spaces)
const result = execSync(
  `python scripts/memory_embedder.py embed "${dbPath}"`,
  { encoding: 'utf-8' }
);
const { success, chunks_processed } = JSON.parse(result);

// Search
const searchResult = execSync(
  `python scripts/memory_embedder.py search "${dbPath}" "${query}" --top-k 10`,
  { encoding: 'utf-8' }
);
const { matches } = JSON.parse(searchResult);
```
## Performance
- **Embedding speed**: ~8 chunks/second (batch size 8)
- **Search speed**: ~0.1-0.5 seconds for 1000 chunks
- **Model loading**: ~0.8 seconds (cached after first use)
## Source Types
- `core_memory`: Strategic architectural context
- `workflow`: Session-based development history
- `cli_history`: Command execution logs
## Restore Commands
Search results include restore commands:
- **core_memory/cli_history**: `ccw memory export <source_id>`
- **workflow**: `ccw session resume <source_id>`

View File

@@ -1,184 +0,0 @@
/**
* Example: Using Memory Embedder from TypeScript
*
* This shows how to integrate the Python memory embedder script
* into CCW's TypeScript codebase.
*/
import { execSync } from 'child_process';
import { join } from 'path';
interface EmbedResult {
success: boolean;
chunks_processed: number;
chunks_failed: number;
elapsed_time: number;
}
interface SearchMatch {
source_id: string;
source_type: 'core_memory' | 'workflow' | 'cli_history';
chunk_index: number;
content: string;
score: number;
restore_command: string;
}
interface SearchResult {
success: boolean;
matches: SearchMatch[];
error?: string;
}
interface StatusResult {
total_chunks: number;
embedded_chunks: number;
pending_chunks: number;
by_type: Record<string, { total: number; embedded: number; pending: number }>;
}
/**
* Get path to memory embedder script
*/
function getEmbedderScript(): string {
return join(__dirname, 'memory_embedder.py');
}
/**
* Execute memory embedder command
*/
function execEmbedder(args: string[]): string {
const script = getEmbedderScript();
const command = `python "${script}" ${args.join(' ')}`;
try {
return execSync(command, {
encoding: 'utf-8',
maxBuffer: 10 * 1024 * 1024 // 10MB buffer
});
} catch (error: any) {
// Try to parse error output as JSON
if (error.stdout) {
return error.stdout;
}
throw new Error(`Embedder failed: ${error.message}`);
}
}
/**
* Generate embeddings for memory chunks
*/
export function embedChunks(
dbPath: string,
options: {
sourceId?: string;
batchSize?: number;
force?: boolean;
} = {}
): EmbedResult {
const args = ['embed', `"${dbPath}"`];
if (options.sourceId) {
args.push('--source-id', options.sourceId);
}
if (options.batchSize) {
args.push('--batch-size', String(options.batchSize));
}
if (options.force) {
args.push('--force');
}
const output = execEmbedder(args);
return JSON.parse(output);
}
/**
* Search memory chunks semantically
*/
export function searchMemory(
dbPath: string,
query: string,
options: {
topK?: number;
minScore?: number;
sourceType?: 'core_memory' | 'workflow' | 'cli_history';
} = {}
): SearchResult {
const args = ['search', `"${dbPath}"`, `"${query}"`];
if (options.topK) {
args.push('--top-k', String(options.topK));
}
if (options.minScore !== undefined) {
args.push('--min-score', String(options.minScore));
}
if (options.sourceType) {
args.push('--type', options.sourceType);
}
const output = execEmbedder(args);
return JSON.parse(output);
}
/**
* Get embedding status
*/
export function getEmbeddingStatus(dbPath: string): StatusResult {
const args = ['status', `"${dbPath}"`];
const output = execEmbedder(args);
return JSON.parse(output);
}
// ============================================================================
// Example Usage
// ============================================================================
async function exampleUsage() {
const dbPath = join(process.env.HOME || process.env.USERPROFILE || '', '.ccw/projects/myproject/core-memory/core_memory.db');
// 1. Check status
console.log('Checking embedding status...');
const status = getEmbeddingStatus(dbPath);
console.log(`Total chunks: ${status.total_chunks}`);
console.log(`Embedded: ${status.embedded_chunks}`);
console.log(`Pending: ${status.pending_chunks}`);
// 2. Generate embeddings if needed
if (status.pending_chunks > 0) {
console.log('\nGenerating embeddings...');
const embedResult = embedChunks(dbPath, { batchSize: 16 });
console.log(`Processed: ${embedResult.chunks_processed}`);
console.log(`Time: ${embedResult.elapsed_time}s`);
}
// 3. Search for relevant memories
console.log('\nSearching for authentication-related memories...');
const searchResult = searchMemory(dbPath, 'authentication flow', {
topK: 5,
minScore: 0.5
});
if (searchResult.success) {
console.log(`Found ${searchResult.matches.length} matches:`);
for (const match of searchResult.matches) {
console.log(`\n- ${match.source_id} (score: ${match.score})`);
console.log(` Type: ${match.source_type}`);
console.log(` Restore: ${match.restore_command}`);
console.log(` Content: ${match.content.substring(0, 100)}...`);
}
}
// 4. Search specific source type
console.log('\nSearching workflows only...');
const workflowSearch = searchMemory(dbPath, 'API implementation', {
sourceType: 'workflow',
topK: 3
});
console.log(`Found ${workflowSearch.matches.length} workflow matches`);
}
// Run example if executed directly
if (require.main === module) {
exampleUsage().catch(console.error);
}

View File

@@ -1,428 +0,0 @@
#!/usr/bin/env python3
"""
Memory Embedder - Bridge CCW to CodexLens semantic search
This script generates and searches embeddings for memory chunks stored in CCW's
SQLite database using CodexLens's embedder.
Usage:
python memory_embedder.py embed <db_path> [--source-id ID] [--batch-size N] [--force]
python memory_embedder.py search <db_path> <query> [--top-k N] [--min-score F] [--type TYPE]
python memory_embedder.py status <db_path>
"""
import argparse
import json
import sqlite3
import sys
import time
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
try:
import numpy as np
except ImportError:
print("Error: numpy is required. Install with: pip install numpy", file=sys.stderr)
sys.exit(1)
try:
from codexlens.semantic.factory import get_embedder as get_embedder_factory
from codexlens.semantic.factory import clear_embedder_cache
from codexlens.config import Config as CodexLensConfig
except ImportError:
print("Error: CodexLens not found. Install with: pip install codex-lens[semantic]", file=sys.stderr)
sys.exit(1)
class MemoryEmbedder:
"""Generate and search embeddings for memory chunks."""
def __init__(self, db_path: str):
"""Initialize embedder with database path."""
self.db_path = Path(db_path)
if not self.db_path.exists():
raise FileNotFoundError(f"Database not found: {db_path}")
self.conn = sqlite3.connect(str(self.db_path))
self.conn.row_factory = sqlite3.Row
# Load CodexLens configuration for embedding settings
try:
self._config = CodexLensConfig.load()
except Exception as e:
print(f"Warning: Could not load CodexLens config, using defaults. Error: {e}", file=sys.stderr)
self._config = CodexLensConfig() # Use default config
# Lazy-load embedder to avoid ~0.8s model loading for status command
self._embedder = None
self._embedding_dim = None
@property
def embedding_dim(self) -> int:
"""Get embedding dimension from the embedder."""
if self._embedding_dim is None:
# Access embedder to get its dimension
self._embedding_dim = self.embedder.embedding_dim
return self._embedding_dim
@property
def embedder(self):
"""Lazy-load the embedder on first access using CodexLens config."""
if self._embedder is None:
# Use CodexLens configuration settings
backend = self._config.embedding_backend
model = self._config.embedding_model
use_gpu = self._config.embedding_use_gpu
# Use factory to create embedder based on backend type
if backend == "fastembed":
self._embedder = get_embedder_factory(
backend="fastembed",
profile=model,
use_gpu=use_gpu
)
elif backend == "litellm":
# For litellm backend, also pass endpoints if configured
endpoints = self._config.embedding_endpoints
strategy = self._config.embedding_strategy
cooldown = self._config.embedding_cooldown
self._embedder = get_embedder_factory(
backend="litellm",
model=model,
endpoints=endpoints if endpoints else None,
strategy=strategy,
cooldown=cooldown,
)
else:
# Fallback to fastembed with code profile
self._embedder = get_embedder_factory(
backend="fastembed",
profile="code",
use_gpu=True
)
return self._embedder
def close(self):
"""Close database connection."""
if self.conn:
self.conn.close()
def embed_chunks(
self,
source_id: Optional[str] = None,
batch_size: int = 8,
force: bool = False
) -> Dict[str, Any]:
"""
Generate embeddings for unembedded chunks.
Args:
source_id: Only process chunks from this source
batch_size: Number of chunks to process in each batch
force: Re-embed chunks that already have embeddings
Returns:
Result dict with success, chunks_processed, chunks_failed, elapsed_time
"""
start_time = time.time()
# Build query
query = "SELECT id, source_id, source_type, chunk_index, content FROM memory_chunks"
params = []
if force:
# Process all chunks (with optional source filter)
if source_id:
query += " WHERE source_id = ?"
params.append(source_id)
else:
# Only process chunks without embeddings
query += " WHERE embedding IS NULL"
if source_id:
query += " AND source_id = ?"
params.append(source_id)
query += " ORDER BY id"
cursor = self.conn.cursor()
cursor.execute(query, params)
chunks_processed = 0
chunks_failed = 0
batch = []
batch_ids = []
for row in cursor:
batch.append(row["content"])
batch_ids.append(row["id"])
# Process batch when full
if len(batch) >= batch_size:
processed, failed = self._process_batch(batch, batch_ids)
chunks_processed += processed
chunks_failed += failed
batch = []
batch_ids = []
# Process remaining chunks
if batch:
processed, failed = self._process_batch(batch, batch_ids)
chunks_processed += processed
chunks_failed += failed
elapsed_time = time.time() - start_time
return {
"success": chunks_failed == 0,
"chunks_processed": chunks_processed,
"chunks_failed": chunks_failed,
"elapsed_time": round(elapsed_time, 2)
}
def _process_batch(self, texts: List[str], ids: List[int]) -> Tuple[int, int]:
"""Process a batch of texts and update embeddings."""
try:
# Generate embeddings for batch
embeddings = self.embedder.embed(texts)
processed = 0
failed = 0
# Update database
cursor = self.conn.cursor()
for chunk_id, embedding in zip(ids, embeddings):
try:
# Convert to numpy array and store as bytes
emb_array = np.array(embedding, dtype=np.float32)
emb_bytes = emb_array.tobytes()
cursor.execute(
"UPDATE memory_chunks SET embedding = ? WHERE id = ?",
(emb_bytes, chunk_id)
)
processed += 1
except Exception as e:
print(f"Error updating chunk {chunk_id}: {e}", file=sys.stderr)
failed += 1
self.conn.commit()
return processed, failed
except Exception as e:
print(f"Error processing batch: {e}", file=sys.stderr)
return 0, len(ids)
def search(
self,
query: str,
top_k: int = 10,
min_score: float = 0.3,
source_type: Optional[str] = None
) -> Dict[str, Any]:
"""
Perform semantic search on memory chunks.
Args:
query: Search query text
top_k: Number of results to return
min_score: Minimum similarity score (0-1)
source_type: Filter by source type (core_memory, workflow, cli_history)
Returns:
Result dict with success and matches list
"""
try:
# Generate query embedding
query_embedding = self.embedder.embed_single(query)
query_array = np.array(query_embedding, dtype=np.float32)
# Build database query
sql = """
SELECT id, source_id, source_type, chunk_index, content, embedding
FROM memory_chunks
WHERE embedding IS NOT NULL
"""
params = []
if source_type:
sql += " AND source_type = ?"
params.append(source_type)
cursor = self.conn.cursor()
cursor.execute(sql, params)
# Calculate similarities
matches = []
for row in cursor:
# Load embedding from bytes
emb_bytes = row["embedding"]
emb_array = np.frombuffer(emb_bytes, dtype=np.float32)
# Cosine similarity
score = float(
np.dot(query_array, emb_array) /
(np.linalg.norm(query_array) * np.linalg.norm(emb_array))
)
if score >= min_score:
# Generate restore command
restore_command = self._get_restore_command(
row["source_id"],
row["source_type"]
)
matches.append({
"source_id": row["source_id"],
"source_type": row["source_type"],
"chunk_index": row["chunk_index"],
"content": row["content"],
"score": round(score, 4),
"restore_command": restore_command
})
# Sort by score and limit
matches.sort(key=lambda x: x["score"], reverse=True)
matches = matches[:top_k]
return {
"success": True,
"matches": matches
}
except Exception as e:
return {
"success": False,
"error": str(e),
"matches": []
}
def _get_restore_command(self, source_id: str, source_type: str) -> str:
"""Generate restore command for a source."""
if source_type in ("core_memory", "cli_history"):
return f"ccw memory export {source_id}"
elif source_type == "workflow":
return f"ccw session resume {source_id}"
else:
return f"# Unknown source type: {source_type}"
def get_status(self) -> Dict[str, Any]:
"""Get embedding status statistics."""
cursor = self.conn.cursor()
# Total chunks
cursor.execute("SELECT COUNT(*) as count FROM memory_chunks")
total_chunks = cursor.fetchone()["count"]
# Embedded chunks
cursor.execute("SELECT COUNT(*) as count FROM memory_chunks WHERE embedding IS NOT NULL")
embedded_chunks = cursor.fetchone()["count"]
# By type
cursor.execute("""
SELECT
source_type,
COUNT(*) as total,
SUM(CASE WHEN embedding IS NOT NULL THEN 1 ELSE 0 END) as embedded
FROM memory_chunks
GROUP BY source_type
""")
by_type = {}
for row in cursor:
by_type[row["source_type"]] = {
"total": row["total"],
"embedded": row["embedded"],
"pending": row["total"] - row["embedded"]
}
return {
"total_chunks": total_chunks,
"embedded_chunks": embedded_chunks,
"pending_chunks": total_chunks - embedded_chunks,
"by_type": by_type
}
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Memory Embedder - Bridge CCW to CodexLens semantic search"
)
subparsers = parser.add_subparsers(dest="command", help="Command to execute")
subparsers.required = True
# Embed command
embed_parser = subparsers.add_parser("embed", help="Generate embeddings for chunks")
embed_parser.add_argument("db_path", help="Path to SQLite database")
embed_parser.add_argument("--source-id", help="Only process chunks from this source")
embed_parser.add_argument("--batch-size", type=int, default=8, help="Batch size (default: 8)")
embed_parser.add_argument("--force", action="store_true", help="Re-embed existing chunks")
# Search command
search_parser = subparsers.add_parser("search", help="Semantic search")
search_parser.add_argument("db_path", help="Path to SQLite database")
search_parser.add_argument("query", help="Search query")
search_parser.add_argument("--top-k", type=int, default=10, help="Number of results (default: 10)")
search_parser.add_argument("--min-score", type=float, default=0.3, help="Minimum score (default: 0.3)")
search_parser.add_argument("--type", dest="source_type", help="Filter by source type")
# Status command
status_parser = subparsers.add_parser("status", help="Get embedding status")
status_parser.add_argument("db_path", help="Path to SQLite database")
args = parser.parse_args()
try:
embedder = MemoryEmbedder(args.db_path)
if args.command == "embed":
result = embedder.embed_chunks(
source_id=args.source_id,
batch_size=args.batch_size,
force=args.force
)
print(json.dumps(result, indent=2))
elif args.command == "search":
result = embedder.search(
query=args.query,
top_k=args.top_k,
min_score=args.min_score,
source_type=args.source_type
)
print(json.dumps(result, indent=2))
elif args.command == "status":
result = embedder.get_status()
print(json.dumps(result, indent=2))
embedder.close()
# Exit with error code if operation failed
if "success" in result and not result["success"]:
# Clean up ONNX resources before exit
clear_embedder_cache()
sys.exit(1)
# Clean up ONNX resources to ensure process can exit cleanly
# This releases fastembed/ONNX Runtime threads that would otherwise
# prevent the Python interpreter from shutting down
clear_embedder_cache()
except Exception as e:
# Clean up ONNX resources even on error
try:
clear_embedder_cache()
except Exception:
pass
print(json.dumps({
"success": False,
"error": str(e)
}, indent=2), file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,245 +0,0 @@
#!/usr/bin/env python3
"""
Test script for memory_embedder.py
Creates a temporary database with test data and verifies all commands work.
"""
import json
import sqlite3
import tempfile
import subprocess
from pathlib import Path
from datetime import datetime
def create_test_database():
"""Create a temporary database with test chunks."""
# Create temp file
temp_db = tempfile.NamedTemporaryFile(suffix='.db', delete=False)
temp_db.close()
conn = sqlite3.connect(temp_db.name)
cursor = conn.cursor()
# Create schema
cursor.execute("""
CREATE TABLE memory_chunks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_id TEXT NOT NULL,
source_type TEXT NOT NULL,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
embedding BLOB,
metadata TEXT,
created_at TEXT NOT NULL,
UNIQUE(source_id, chunk_index)
)
""")
# Insert test data
test_chunks = [
("CMEM-20250101-001", "core_memory", 0, "Implemented authentication using JWT tokens with refresh mechanism"),
("CMEM-20250101-001", "core_memory", 1, "Added rate limiting to API endpoints using Redis"),
("WFS-20250101-auth", "workflow", 0, "Created login endpoint with password hashing"),
("WFS-20250101-auth", "workflow", 1, "Implemented session management with token rotation"),
("CLI-20250101-001", "cli_history", 0, "Executed database migration for user table"),
]
now = datetime.now().isoformat()
for source_id, source_type, chunk_index, content in test_chunks:
cursor.execute(
"""
INSERT INTO memory_chunks (source_id, source_type, chunk_index, content, created_at)
VALUES (?, ?, ?, ?, ?)
""",
(source_id, source_type, chunk_index, content, now)
)
conn.commit()
conn.close()
return temp_db.name
def run_command(args):
"""Run memory_embedder.py with given arguments."""
script = Path(__file__).parent / "memory_embedder.py"
cmd = ["python", str(script)] + args
result = subprocess.run(
cmd,
capture_output=True,
text=True
)
return result.returncode, result.stdout, result.stderr
def test_status(db_path):
"""Test status command."""
print("Testing status command...")
returncode, stdout, stderr = run_command(["status", db_path])
if returncode != 0:
print(f"[FAIL] Status failed: {stderr}")
return False
result = json.loads(stdout)
expected_total = 5
if result["total_chunks"] != expected_total:
print(f"[FAIL] Expected {expected_total} chunks, got {result['total_chunks']}")
return False
if result["embedded_chunks"] != 0:
print(f"[FAIL] Expected 0 embedded chunks, got {result['embedded_chunks']}")
return False
print(f"[PASS] Status OK: {result['total_chunks']} total, {result['embedded_chunks']} embedded")
return True
def test_embed(db_path):
"""Test embed command."""
print("\nTesting embed command...")
returncode, stdout, stderr = run_command(["embed", db_path, "--batch-size", "2"])
if returncode != 0:
print(f"[FAIL] Embed failed: {stderr}")
return False
result = json.loads(stdout)
if not result["success"]:
print(f"[FAIL] Embed unsuccessful")
return False
if result["chunks_processed"] != 5:
print(f"[FAIL] Expected 5 processed, got {result['chunks_processed']}")
return False
if result["chunks_failed"] != 0:
print(f"[FAIL] Expected 0 failed, got {result['chunks_failed']}")
return False
print(f"[PASS] Embed OK: {result['chunks_processed']} processed in {result['elapsed_time']}s")
return True
def test_search(db_path):
"""Test search command."""
print("\nTesting search command...")
returncode, stdout, stderr = run_command([
"search", db_path, "authentication JWT",
"--top-k", "3",
"--min-score", "0.3"
])
if returncode != 0:
print(f"[FAIL] Search failed: {stderr}")
return False
result = json.loads(stdout)
if not result["success"]:
print(f"[FAIL] Search unsuccessful: {result.get('error', 'Unknown error')}")
return False
if len(result["matches"]) == 0:
print(f"[FAIL] Expected at least 1 match, got 0")
return False
print(f"[PASS] Search OK: {len(result['matches'])} matches found")
# Show top match
top_match = result["matches"][0]
print(f" Top match: {top_match['source_id']} (score: {top_match['score']})")
print(f" Content: {top_match['content'][:60]}...")
return True
def test_source_filter(db_path):
"""Test search with source type filter."""
print("\nTesting source type filter...")
returncode, stdout, stderr = run_command([
"search", db_path, "authentication",
"--type", "workflow"
])
if returncode != 0:
print(f"[FAIL] Filtered search failed: {stderr}")
return False
result = json.loads(stdout)
if not result["success"]:
print(f"[FAIL] Filtered search unsuccessful")
return False
# Verify all matches are workflow type
for match in result["matches"]:
if match["source_type"] != "workflow":
print(f"[FAIL] Expected workflow type, got {match['source_type']}")
return False
print(f"[PASS] Filter OK: {len(result['matches'])} workflow matches")
return True
def main():
"""Run all tests."""
print("Memory Embedder Test Suite")
print("=" * 60)
# Create test database
print("\nCreating test database...")
db_path = create_test_database()
print(f"[PASS] Database created: {db_path}")
try:
# Run tests
tests = [
("Status", test_status),
("Embed", test_embed),
("Search", test_search),
("Source Filter", test_source_filter),
]
passed = 0
failed = 0
for name, test_func in tests:
try:
if test_func(db_path):
passed += 1
else:
failed += 1
except Exception as e:
print(f"[FAIL] {name} crashed: {e}")
failed += 1
# Summary
print("\n" + "=" * 60)
print(f"Results: {passed} passed, {failed} failed")
if failed == 0:
print("[PASS] All tests passed!")
return 0
else:
print("[FAIL] Some tests failed")
return 1
finally:
# Cleanup
import os
try:
os.unlink(db_path)
print("\n[PASS] Cleaned up test database")
except OSError:
pass
if __name__ == "__main__":
raise SystemExit(main())


@@ -1,473 +0,0 @@
#!/usr/bin/env python3
"""
Unified Memory Embedder - Bridge CCW to CodexLens VectorStore (HNSW)
Uses CodexLens VectorStore for HNSW-indexed vector storage and search,
replacing full-table-scan cosine similarity with sub-10ms approximate
nearest neighbor lookups.
Protocol: JSON via stdin/stdout
Operations: embed, search, search_by_vector, status, reindex
Usage:
echo '{"operation":"embed","store_path":"...","chunks":[...]}' | python unified_memory_embedder.py
echo '{"operation":"search","store_path":"...","query":"..."}' | python unified_memory_embedder.py
echo '{"operation":"status","store_path":"..."}' | python unified_memory_embedder.py
echo '{"operation":"reindex","store_path":"..."}' | python unified_memory_embedder.py
"""
import json
import sys
import time
from pathlib import Path
from typing import List, Dict, Any, Optional
try:
import numpy as np
except ImportError:
print(json.dumps({
"success": False,
"error": "numpy is required. Install with: pip install numpy"
}))
sys.exit(1)
try:
from codexlens.semantic.factory import get_embedder, clear_embedder_cache
from codexlens.semantic.vector_store import VectorStore
from codexlens.entities import SemanticChunk
except ImportError:
print(json.dumps({
"success": False,
"error": "CodexLens not found. Install with: pip install codex-lens[semantic]"
}))
sys.exit(1)
# Valid category values for filtering
VALID_CATEGORIES = {"core_memory", "cli_history", "workflow", "entity", "pattern"}
class UnifiedMemoryEmbedder:
"""Unified embedder backed by CodexLens VectorStore (HNSW)."""
def __init__(self, store_path: str):
"""
Initialize with path to VectorStore database directory.
Args:
store_path: Directory containing vectors.db and vectors.hnsw
"""
self.store_path = Path(store_path)
self.store_path.mkdir(parents=True, exist_ok=True)
db_path = str(self.store_path / "vectors.db")
self.store = VectorStore(db_path)
# Lazy-load embedder to avoid ~0.8s model loading for status command
self._embedder = None
@property
def embedder(self):
"""Lazy-load the embedder on first access."""
if self._embedder is None:
self._embedder = get_embedder(
backend="fastembed",
profile="code",
use_gpu=True
)
return self._embedder
def embed(self, chunks: List[Dict[str, Any]], batch_size: int = 8) -> Dict[str, Any]:
"""
Embed chunks and insert into VectorStore.
Each chunk dict must contain:
- content: str
- source_id: str
- source_type: str (e.g. "core_memory", "workflow", "cli_history")
- category: str (e.g. "core_memory", "cli_history", "workflow", "entity", "pattern")
Optional fields:
- chunk_index: int (default 0)
- metadata: dict (additional metadata)
Args:
chunks: List of chunk dicts to embed
batch_size: Number of chunks to embed per batch
Returns:
Result dict with success, chunks_processed, chunks_failed, elapsed_time
"""
start_time = time.time()
chunks_processed = 0
chunks_failed = 0
if not chunks:
return {
"success": True,
"chunks_processed": 0,
"chunks_failed": 0,
"elapsed_time": 0.0
}
# Process in batches
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
texts = [c["content"] for c in batch]
try:
# Batch embed
embeddings = self.embedder.embed_to_numpy(texts)
# Build SemanticChunks and insert
semantic_chunks = []
for j, chunk_data in enumerate(batch):
category = chunk_data.get("category", chunk_data.get("source_type", "core_memory"))
if category not in VALID_CATEGORIES:
category = "core_memory"
source_id = chunk_data.get("source_id", "")
chunk_index = chunk_data.get("chunk_index", 0)
extra_meta = chunk_data.get("metadata", {})
# Build metadata dict for VectorStore
metadata = {
"source_id": source_id,
"source_type": chunk_data.get("source_type", ""),
"chunk_index": chunk_index,
**extra_meta
}
sc = SemanticChunk(
content=chunk_data["content"],
embedding=embeddings[j].tolist(),
metadata=metadata
)
semantic_chunks.append((sc, source_id, category))
# Insert into VectorStore
for sc, file_path, category in semantic_chunks:
try:
self.store.add_chunk(sc, file_path=file_path, category=category)
chunks_processed += 1
except Exception as e:
print(f"Error inserting chunk: {e}", file=sys.stderr)
chunks_failed += 1
except Exception as e:
print(f"Error embedding batch starting at {i}: {e}", file=sys.stderr)
chunks_failed += len(batch)
elapsed_time = time.time() - start_time
return {
"success": chunks_failed == 0,
"chunks_processed": chunks_processed,
"chunks_failed": chunks_failed,
"elapsed_time": round(elapsed_time, 3)
}
def search(
self,
query: str,
top_k: int = 10,
min_score: float = 0.3,
category: Optional[str] = None
) -> Dict[str, Any]:
"""
Search VectorStore using HNSW index.
Args:
query: Search query text
top_k: Number of results
min_score: Minimum similarity threshold
category: Optional category filter
Returns:
Result dict with success and matches list
"""
try:
start_time = time.time()
# Generate query embedding (embed_to_numpy accepts single string)
query_emb = self.embedder.embed_to_numpy(query)[0].tolist()
# Search via VectorStore HNSW
results = self.store.search_similar(
query_emb,
top_k=top_k,
min_score=min_score,
category=category
)
elapsed_time = time.time() - start_time
matches = []
for result in results:
meta = result.metadata if result.metadata else {}
if isinstance(meta, str):
try:
meta = json.loads(meta)
except (json.JSONDecodeError, TypeError):
meta = {}
matches.append({
"content": result.content or result.excerpt or "",
"score": round(float(result.score), 4),
"source_id": meta.get("source_id", result.path or ""),
"source_type": meta.get("source_type", ""),
"chunk_index": meta.get("chunk_index", 0),
"category": meta.get("category", ""),
"metadata": meta
})
return {
"success": True,
"matches": matches,
"elapsed_time": round(elapsed_time, 3),
"total_searched": len(results)
}
except Exception as e:
return {
"success": False,
"matches": [],
"error": str(e)
}
def search_by_vector(
self,
vector: List[float],
top_k: int = 10,
min_score: float = 0.3,
category: Optional[str] = None
) -> Dict[str, Any]:
"""
Search VectorStore using a pre-computed embedding vector (no re-embedding).
Args:
vector: Pre-computed embedding vector (list of floats)
top_k: Number of results
min_score: Minimum similarity threshold
category: Optional category filter
Returns:
Result dict with success and matches list
"""
try:
start_time = time.time()
# Search via VectorStore HNSW directly with provided vector
results = self.store.search_similar(
vector,
top_k=top_k,
min_score=min_score,
category=category
)
elapsed_time = time.time() - start_time
matches = []
for result in results:
meta = result.metadata if result.metadata else {}
if isinstance(meta, str):
try:
meta = json.loads(meta)
except (json.JSONDecodeError, TypeError):
meta = {}
matches.append({
"content": result.content or result.excerpt or "",
"score": round(float(result.score), 4),
"source_id": meta.get("source_id", result.path or ""),
"source_type": meta.get("source_type", ""),
"chunk_index": meta.get("chunk_index", 0),
"category": meta.get("category", ""),
"metadata": meta
})
return {
"success": True,
"matches": matches,
"elapsed_time": round(elapsed_time, 3),
"total_searched": len(results)
}
except Exception as e:
return {
"success": False,
"matches": [],
"error": str(e)
}
def status(self) -> Dict[str, Any]:
"""
Get VectorStore index status.
Returns:
Status dict with total_chunks, hnsw_available, dimension, etc.
"""
try:
total_chunks = self.store.count_chunks()
hnsw_available = self.store.ann_available
hnsw_count = self.store.ann_count
dimension = self.store.dimension or 768
# Count per category from SQLite
categories = {}
try:
import sqlite3
db_path = str(self.store_path / "vectors.db")
with sqlite3.connect(db_path) as conn:
rows = conn.execute(
"SELECT category, COUNT(*) FROM semantic_chunks GROUP BY category"
).fetchall()
for row in rows:
categories[row[0] or "unknown"] = row[1]
except Exception:
pass
return {
"success": True,
"total_chunks": total_chunks,
"hnsw_available": hnsw_available,
"hnsw_count": hnsw_count,
"dimension": dimension,
"categories": categories,
"model_config": {
"backend": "fastembed",
"profile": "code",
"dimension": 768,
"max_tokens": 8192
}
}
except Exception as e:
return {
"success": False,
"total_chunks": 0,
"hnsw_available": False,
"hnsw_count": 0,
"dimension": 0,
"error": str(e)
}
def reindex(self) -> Dict[str, Any]:
"""
Rebuild HNSW index from scratch.
Returns:
Result dict with success and timing
"""
try:
start_time = time.time()
self.store.rebuild_ann_index()
elapsed_time = time.time() - start_time
return {
"success": True,
"hnsw_count": self.store.ann_count,
"elapsed_time": round(elapsed_time, 3)
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def main():
"""Main entry point. Reads JSON from stdin, writes JSON to stdout."""
try:
raw_input = sys.stdin.read()
if not raw_input.strip():
print(json.dumps({
"success": False,
"error": "No input provided. Send JSON via stdin."
}))
sys.exit(1)
request = json.loads(raw_input)
except json.JSONDecodeError as e:
print(json.dumps({
"success": False,
"error": f"Invalid JSON input: {e}"
}))
sys.exit(1)
operation = request.get("operation")
store_path = request.get("store_path")
if not operation:
print(json.dumps({
"success": False,
"error": "Missing required field: operation"
}))
sys.exit(1)
if not store_path:
print(json.dumps({
"success": False,
"error": "Missing required field: store_path"
}))
sys.exit(1)
try:
embedder = UnifiedMemoryEmbedder(store_path)
if operation == "embed":
chunks = request.get("chunks", [])
batch_size = request.get("batch_size", 8)
result = embedder.embed(chunks, batch_size=batch_size)
elif operation == "search":
query = request.get("query", "")
if not query:
result = {"success": False, "error": "Missing required field: query", "matches": []}
else:
top_k = request.get("top_k", 10)
min_score = request.get("min_score", 0.3)
category = request.get("category")
result = embedder.search(query, top_k=top_k, min_score=min_score, category=category)
elif operation == "search_by_vector":
vector = request.get("vector", [])
if not vector:
result = {"success": False, "error": "Missing required field: vector", "matches": []}
else:
top_k = request.get("top_k", 10)
min_score = request.get("min_score", 0.3)
category = request.get("category")
result = embedder.search_by_vector(vector, top_k=top_k, min_score=min_score, category=category)
elif operation == "status":
result = embedder.status()
elif operation == "reindex":
result = embedder.reindex()
else:
result = {
"success": False,
"error": f"Unknown operation: {operation}. Valid: embed, search, search_by_vector, status, reindex"
}
print(json.dumps(result))
# Clean up ONNX resources to ensure process can exit cleanly
clear_embedder_cache()
except Exception as e:
try:
clear_embedder_cache()
except Exception:
pass
print(json.dumps({
"success": False,
"error": str(e)
}))
sys.exit(1)
if __name__ == "__main__":
main()
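For reference, the stdin/stdout JSON protocol above can be sketched from the caller's side. The `store_path` value here is a placeholder, and the response is a hand-built sample shaped like the output of `UnifiedMemoryEmbedder.search()`, not a real invocation of the script:

```python
import json

# Build a "search" request for the stdin/stdout protocol.
# In a real run this JSON string would be piped to the script's stdin.
request = {
    "operation": "search",
    "store_path": "/path/to/vector_store",  # placeholder: dir with vectors.db
    "query": "authentication JWT",
    "top_k": 5,
    "min_score": 0.3,
}
payload = json.dumps(request)

# A response carries success, matches, elapsed_time, and total_searched;
# this sample mirrors one match as built in the search() method above.
sample_response = json.dumps({
    "success": True,
    "matches": [
        {"content": "JWT auth middleware", "score": 0.8123,
         "source_id": "workflow-42", "source_type": "workflow",
         "chunk_index": 0, "category": "workflow", "metadata": {}},
    ],
    "elapsed_time": 0.012,
    "total_searched": 1,
})
result = json.loads(sample_response)
for m in result["matches"]:
    print(f"{m['score']:.4f}  {m['source_id']}")  # prints: 0.8123  workflow-42
```

In practice the payload would be written to the script's stdin (e.g. via `subprocess.run` with `input=payload`) and the single JSON line on stdout parsed the same way.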