Refactor team edict agent and task schemas; remove deprecated files

- Deleted Zhongshu Planner agent documentation as it is no longer needed.
- Removed agent instruction documentation to streamline task assignment process.
- Eliminated tasks schema file to simplify task management.
- Updated Codex Lens installation instructions to use 'uv' for pip commands.
- Bumped version to 0.4.1 in pyproject.toml and adjusted dependencies.
- Enhanced API embedding with text truncation and automatic batch splitting on 413 errors.
- Improved indexing pipeline with metadata registration and progress reporting.
- Converted index_project and index_update functions to async for better performance.
This commit is contained in:
catlog22
2026-03-19 15:17:48 +08:00
parent 00672ec8e5
commit 54071473fc
32 changed files with 511 additions and 4205 deletions

View File

@@ -31,13 +31,13 @@ That's it. Claude Code will auto-discover the tools: `index_project` → `search
```bash
# Standard install (includes vector search + API clients)
pip install codexlens-search
uv pip install codexlens-search
# With MCP server for Claude Code
pip install codexlens-search[mcp]
uv pip install codexlens-search[mcp]
```
Optional extras for advanced use:
Optional extras:
| Extra | Description |
|-------|-------------|
@@ -123,7 +123,7 @@ Format: `url|key|model,url|key|model,...`
### Local Models (Offline, No API)
```bash
pip install codexlens-search[mcp]
uv pip install codexlens-search[mcp]
codexlens-search download-models
```
@@ -195,6 +195,8 @@ codexlens-search download-models
| `CODEXLENS_FTS_TOP_K` | `50` | FTS results per method |
| `CODEXLENS_FUSION_K` | `60` | RRF fusion k parameter |
| `CODEXLENS_RERANKER_TOP_K` | `20` | Results to rerank |
| `CODEXLENS_EMBED_BATCH_SIZE` | `32` | Max texts per API batch (auto-splits on 413) |
| `CODEXLENS_EMBED_MAX_TOKENS` | `8192` | Max tokens per text (truncate if exceeded, 0=no limit) |
| `CODEXLENS_INDEX_WORKERS` | `2` | Parallel indexing workers |
| `CODEXLENS_MAX_FILE_SIZE` | `1000000` | Max file size in bytes |
@@ -215,7 +217,7 @@ Query → [Embedder] → query vector
```bash
git clone https://github.com/catlog22/codexlens-search.git
cd codexlens-search
pip install -e ".[dev]"
uv pip install -e ".[dev]"
pytest
```

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "codexlens-search"
version = "0.3.0"
version = "0.4.1"
description = "Lightweight semantic code search engine — 2-stage vector + FTS + RRF fusion + MCP server"
requires-python = ">=3.10"
dependencies = [

View File

@@ -91,6 +91,8 @@ def create_config_from_env(db_path: str | Path, **overrides: object) -> "Config"
kwargs["embed_api_concurrency"] = int(os.environ["CODEXLENS_EMBED_API_CONCURRENCY"])
if os.environ.get("CODEXLENS_EMBED_API_MAX_TOKENS"):
kwargs["embed_api_max_tokens_per_batch"] = int(os.environ["CODEXLENS_EMBED_API_MAX_TOKENS"])
if os.environ.get("CODEXLENS_EMBED_MAX_TOKENS"):
kwargs["embed_max_tokens"] = int(os.environ["CODEXLENS_EMBED_MAX_TOKENS"])
# Reranker API env vars
if os.environ.get("CODEXLENS_RERANKER_API_URL"):
kwargs["reranker_api_url"] = os.environ["CODEXLENS_RERANKER_API_URL"]

View File

@@ -10,7 +10,7 @@ class Config:
# Embedding
embed_model: str = "BAAI/bge-small-en-v1.5"
embed_dim: int = 384
embed_batch_size: int = 64
embed_batch_size: int = 32
# API embedding (optional — overrides local fastembed when set)
embed_api_url: str = "" # e.g. "https://api.openai.com/v1"
@@ -19,7 +19,8 @@ class Config:
# Multi-endpoint: list of {"url": "...", "key": "...", "model": "..."} dicts
embed_api_endpoints: list[dict[str, str]] = None # type: ignore[assignment]
embed_api_concurrency: int = 4
embed_api_max_tokens_per_batch: int = 8192
embed_api_max_tokens_per_batch: int = 32768
embed_max_tokens: int = 8192 # max tokens per single text (0 = no limit)
# Model download / cache
model_cache_dir: str = "" # empty = fastembed default cache

View File

@@ -95,6 +95,16 @@ class APIEmbedder(BaseEmbedder):
"""Rough token estimate: ~4 chars per token for code."""
return max(1, len(text) // 4)
def _truncate_text(self, text: str) -> str:
"""Truncate text to embed_max_tokens if configured."""
max_tokens = self._config.embed_max_tokens
if max_tokens <= 0:
return text
max_chars = max_tokens * 4 # inverse of _estimate_tokens
if len(text) > max_chars:
return text[:max_chars]
return text
def _pack_batches(
self, texts: list[str]
) -> list[list[tuple[int, str]]]:
@@ -189,14 +199,35 @@ class APIEmbedder(BaseEmbedder):
# -- Public interface ---------------------------------------------
def embed_single(self, text: str) -> np.ndarray:
    """Embed one text, truncating it to the configured token limit first.

    Routes the single-item request through the next endpoint in the
    rotation and returns the sole resulting vector.
    """
    clipped = self._truncate_text(text)
    chosen = self._next_endpoint()
    vectors = self._call_api([clipped], chosen)
    return vectors[0]
def _call_api_with_split(
self,
texts: list[str],
endpoint: "_Endpoint",
) -> list[np.ndarray]:
"""Call API with automatic batch splitting on 413 errors."""
try:
return self._call_api(texts, endpoint)
except Exception as exc:
if "413" in str(exc) and len(texts) > 1:
mid = len(texts) // 2
logger.info("413 received, splitting batch %d%d + %d", len(texts), mid, len(texts) - mid)
left = self._call_api_with_split(texts[:mid], endpoint)
right = self._call_api_with_split(texts[mid:], endpoint)
return left + right
raise
def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
if not texts:
return []
# 0. Truncate texts exceeding model context
texts = [self._truncate_text(t) for t in texts]
# 1. Pack into token-aware batches
packed = self._pack_batches(texts)
@@ -205,7 +236,7 @@ class APIEmbedder(BaseEmbedder):
batch_texts = [t for _, t in packed[0]]
batch_indices = [i for i, _ in packed[0]]
endpoint = self._next_endpoint()
vecs = self._call_api(batch_texts, endpoint)
vecs = self._call_api_with_split(batch_texts, endpoint)
results: dict[int, np.ndarray] = {}
for idx, vec in zip(batch_indices, vecs):
results[idx] = vec
@@ -220,7 +251,7 @@ class APIEmbedder(BaseEmbedder):
batch_texts = [t for _, t in batch]
batch_indices = [i for i, _ in batch]
endpoint = self._next_endpoint()
future = self._executor.submit(self._call_api, batch_texts, endpoint)
future = self._executor.submit(self._call_api_with_split, batch_texts, endpoint)
futures.append(future)
batch_index_map.append(batch_indices)

View File

@@ -604,6 +604,7 @@ class IndexingPipeline:
max_chunk_chars: int = _DEFAULT_MAX_CHUNK_CHARS,
chunk_overlap: int = _DEFAULT_CHUNK_OVERLAP,
max_file_size: int = 50_000,
progress_callback: callable | None = None,
) -> IndexStats:
"""Reconcile index state against a current file list.
@@ -637,19 +638,38 @@ class IndexingPipeline:
for rel in removed:
self.remove_file(rel)
# Index new and changed files
total_files = 0
total_chunks = 0
# Collect files needing update
files_to_index: list[Path] = []
for rel, fpath in current_rel_paths.items():
stats = self.index_file(
fpath,
try:
text = fpath.read_text(encoding="utf-8", errors="replace")
except Exception:
continue
content_hash = self._content_hash(text)
if meta.file_needs_update(rel, content_hash):
# Remove old data if previously indexed
if meta.get_file_hash(rel) is not None:
meta.mark_file_deleted(rel)
self._fts.delete_by_path(rel)
files_to_index.append(fpath)
# Batch index via parallel pipeline
if files_to_index:
# Set starting chunk ID from metadata
start_id = self._next_chunk_id()
batch_stats = self._index_files_with_metadata(
files_to_index,
root=root,
max_chunk_chars=max_chunk_chars,
chunk_overlap=chunk_overlap,
max_file_size=max_file_size,
start_chunk_id=start_id,
progress_callback=progress_callback,
)
total_files += stats.files_processed
total_chunks += stats.chunks_created
total_files = batch_stats.files_processed
total_chunks = batch_stats.chunks_created
else:
total_files = 0
total_chunks = 0
duration = time.monotonic() - t0
result = IndexStats(
@@ -665,6 +685,157 @@ class IndexingPipeline:
)
return result
def _index_files_with_metadata(
    self,
    files: list[Path],
    *,
    root: Path | None = None,
    max_chunk_chars: int = _DEFAULT_MAX_CHUNK_CHARS,
    chunk_overlap: int = _DEFAULT_CHUNK_OVERLAP,
    max_file_size: int = 50_000,
    start_chunk_id: int = 0,
    progress_callback: callable | None = None,
) -> IndexStats:
    """Batch index files using the parallel pipeline, registering metadata.

    Like index_files() but also registers each file and its chunks
    in the MetadataStore for incremental tracking.

    Args:
        files: Files to index.
        root: Root for relative paths.
        max_chunk_chars: Max chars per chunk.
        chunk_overlap: Overlap between chunks.
        max_file_size: Skip files whose text exceeds this many chars
            (0 = no limit).  Fix: sync() passes this keyword, but it
            was missing from the signature, raising TypeError.
        start_chunk_id: Starting chunk ID.
        progress_callback: Optional callback(files_done, total_files) for progress.

    Returns:
        IndexStats covering the files processed in this batch.

    Raises:
        Exception: The first error recorded by a worker thread, if any.
    """
    meta = self._require_metadata()
    if not files:
        return IndexStats()
    t0 = time.monotonic()

    # Bounded queues apply back-pressure between the chunking (this
    # thread), embedding, and index-writing stages.
    embed_queue: queue.Queue = queue.Queue(maxsize=4)
    index_queue: queue.Queue = queue.Queue(maxsize=4)
    worker_errors: list[Exception] = []
    error_lock = threading.Lock()

    def _record_error(exc: Exception) -> None:
        # Worker threads report failures here instead of dying silently.
        with error_lock:
            worker_errors.append(exc)

    embed_thread = threading.Thread(
        target=self._embed_worker,
        args=(embed_queue, index_queue, _record_error),
        daemon=True, name="sync-embed",
    )
    index_thread = threading.Thread(
        target=self._index_worker,
        args=(index_queue, _record_error),
        daemon=True, name="sync-index",
    )
    embed_thread.start()
    index_thread.start()

    chunk_id = start_chunk_id
    files_processed = 0
    chunks_created = 0
    total_files = len(files)

    # Cross-file chunk accumulator for optimal API batch utilization:
    # chunks from consecutive files share a batch until either the item
    # count or the (estimated) token budget would overflow.
    max_batch_items = self._config.embed_batch_size
    max_batch_tokens = self._config.embed_api_max_tokens_per_batch
    buf_ids: list[int] = []
    buf_texts: list[str] = []
    buf_paths: list[str] = []
    buf_lines: list[tuple[int, int]] = []
    buf_tokens = 0

    def _flush_buffer() -> None:
        # Hand the accumulated batch to the embed worker (copies, since
        # the buffers are cleared and reused immediately after).
        nonlocal buf_ids, buf_texts, buf_paths, buf_lines, buf_tokens
        if buf_ids:
            embed_queue.put((list(buf_ids), list(buf_texts), list(buf_paths), list(buf_lines)))
            buf_ids.clear()
            buf_texts.clear()
            buf_paths.clear()
            buf_lines.clear()
            buf_tokens = 0

    for fpath in files:
        exclude_reason = is_file_excluded(fpath, self._config)
        if exclude_reason:
            logger.debug("Skipping %s: %s", fpath, exclude_reason)
            if progress_callback:
                progress_callback(files_processed, total_files)
            continue
        try:
            text = fpath.read_text(encoding="utf-8", errors="replace")
        except Exception as exc:
            logger.debug("Skipping %s: %s", fpath, exc)
            if progress_callback:
                progress_callback(files_processed, total_files)
            continue
        # Honour the size cap (previously accepted by sync() but never
        # applied here).
        if max_file_size > 0 and len(text) > max_file_size:
            logger.debug("Skipping %s: exceeds max_file_size (%d chars)", fpath, len(text))
            if progress_callback:
                progress_callback(files_processed, total_files)
            continue
        rel_path = str(fpath.relative_to(root)) if root else str(fpath)
        content_hash = self._content_hash(text)
        file_chunks = self._smart_chunk(text, rel_path, max_chunk_chars, chunk_overlap)
        if not file_chunks:
            # Nothing to embed, but still track the file so incremental
            # sync does not revisit it.
            meta.register_file(rel_path, content_hash, fpath.stat().st_mtime)
            continue
        files_processed += 1
        file_chunk_ids = []
        for chunk_text, path, sl, el in file_chunks:
            # Rough token estimate: ~4 chars per token for code.
            chunk_tokens = max(1, len(chunk_text) // 4)
            # Flush if adding this chunk would exceed batch limits
            if buf_ids and (
                len(buf_ids) >= max_batch_items
                or buf_tokens + chunk_tokens > max_batch_tokens
            ):
                _flush_buffer()
            buf_ids.append(chunk_id)
            buf_texts.append(chunk_text)
            buf_paths.append(path)
            buf_lines.append((sl, el))
            buf_tokens += chunk_tokens
            file_chunk_ids.append((chunk_id, chunk_text))
            chunk_id += 1
        chunks_created += len(file_chunk_ids)
        # Register metadata per file
        meta.register_file(rel_path, content_hash, fpath.stat().st_mtime)
        chunk_id_hashes = [
            (cid, self._content_hash(ct)) for cid, ct in file_chunk_ids
        ]
        meta.register_chunks(rel_path, chunk_id_hashes)
        if progress_callback:
            progress_callback(files_processed, total_files)

    # Final flush for remaining chunks, then shut the pipeline down.
    _flush_buffer()
    embed_queue.put(_SENTINEL)
    embed_thread.join()
    index_thread.join()
    self._binary_store.save()
    self._ann_index.save()
    duration = time.monotonic() - t0
    if worker_errors:
        raise worker_errors[0]
    return IndexStats(
        files_processed=files_processed,
        chunks_created=chunks_created,
        duration_seconds=round(duration, 2),
    )
def compact(self) -> None:
"""Rebuild indexes excluding tombstoned chunk IDs.

View File

@@ -58,11 +58,12 @@ Tuning: CODEXLENS_BINARY_TOP_K, _ANN_TOP_K, _FTS_TOP_K, _FUSION_K,
"""
from __future__ import annotations
import asyncio
import logging
import threading
from pathlib import Path
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp import Context, FastMCP
from codexlens_search.bridge import (
DEFAULT_EXCLUDES,
@@ -139,8 +140,9 @@ def search_code(project_path: str, query: str, top_k: int = 10) -> str:
# ---------------------------------------------------------------------------
@mcp.tool()
def index_project(
project_path: str, glob_pattern: str = "**/*", force: bool = False
async def index_project(
project_path: str, glob_pattern: str = "**/*", force: bool = False,
ctx: Context | None = None,
) -> str:
"""Build or rebuild the search index for a project.
@@ -167,7 +169,27 @@ def index_project(
if p.is_file() and not should_exclude(p.relative_to(root), DEFAULT_EXCLUDES)
]
stats = indexing.sync(file_paths, root=root)
if ctx:
await ctx.report_progress(0, len(file_paths), f"Scanning {len(file_paths)} files...")
# Progress callback bridging sync pipeline → async MCP context
loop = asyncio.get_event_loop()
def _progress(done: int, total: int) -> None:
if ctx:
asyncio.run_coroutine_threadsafe(
ctx.report_progress(done, total, f"Indexed {done}/{total} files"),
loop,
)
stats = indexing.sync(file_paths, root=root, progress_callback=_progress)
if ctx:
await ctx.report_progress(
stats.files_processed, stats.files_processed,
f"Done: {stats.files_processed} files, {stats.chunks_created} chunks"
)
return (
f"Indexed {stats.files_processed} files, "
f"{stats.chunks_created} chunks in {stats.duration_seconds:.1f}s. "
@@ -208,7 +230,10 @@ def index_status(project_path: str) -> str:
@mcp.tool()
def index_update(project_path: str, glob_pattern: str = "**/*") -> str:
async def index_update(
project_path: str, glob_pattern: str = "**/*",
ctx: Context | None = None,
) -> str:
"""Incrementally sync the index with current project files.
Only re-indexes files that changed since last indexing.
@@ -231,7 +256,19 @@ def index_update(project_path: str, glob_pattern: str = "**/*") -> str:
if p.is_file() and not should_exclude(p.relative_to(root), DEFAULT_EXCLUDES)
]
stats = indexing.sync(file_paths, root=root)
if ctx:
await ctx.report_progress(0, len(file_paths), f"Scanning {len(file_paths)} files...")
loop = asyncio.get_event_loop()
def _progress(done: int, total: int) -> None:
if ctx:
asyncio.run_coroutine_threadsafe(
ctx.report_progress(done, total, f"Synced {done}/{total} files"),
loop,
)
stats = indexing.sync(file_paths, root=root, progress_callback=_progress)
return (
f"Synced {stats.files_processed} files, "
f"{stats.chunks_created} chunks in {stats.duration_seconds:.1f}s."