feat: add configuration options to tune reranker weights and test file penalty, enhancing semantic search

catlog22
2026-01-13 10:44:26 +08:00
parent bf06f4ddcc
commit 8c2d39d517
9 changed files with 1043 additions and 23 deletions

View File

@@ -294,6 +294,7 @@ const i18n = {
'codexlens.envGroup.reranker': 'Reranker Configuration',
'codexlens.envGroup.concurrency': 'Concurrency Settings',
'codexlens.envGroup.cascade': 'Cascade Search Settings',
'codexlens.envGroup.chunking': 'Chunking Options',
'codexlens.envGroup.llm': 'LLM Features',
// Environment variable field labels
'codexlens.envField.backend': 'Backend',
@@ -313,6 +314,10 @@ const i18n = {
'codexlens.envField.searchStrategy': 'Search Strategy',
'codexlens.envField.coarseK': 'Coarse K (1st stage)',
'codexlens.envField.fineK': 'Fine K (final)',
'codexlens.envField.stripComments': 'Strip Comments',
'codexlens.envField.stripDocstrings': 'Strip Docstrings',
'codexlens.envField.testFilePenalty': 'Test File Penalty',
'codexlens.envField.docstringWeight': 'Docstring Weight',
'codexlens.usingApiReranker': 'Using API Reranker',
'codexlens.currentModel': 'Current Model',
'codexlens.localModels': 'Local Models',
@@ -2443,6 +2448,7 @@ const i18n = {
'codexlens.envGroup.reranker': '重排序配置',
'codexlens.envGroup.concurrency': '并发设置',
'codexlens.envGroup.cascade': '级联搜索设置',
'codexlens.envGroup.chunking': '分块选项',
'codexlens.envGroup.llm': 'LLM 功能',
// 环境变量字段标签
'codexlens.envField.backend': '后端',
@@ -2462,6 +2468,10 @@ const i18n = {
'codexlens.envField.searchStrategy': '搜索策略',
'codexlens.envField.coarseK': '粗筛 K (第一阶段)',
'codexlens.envField.fineK': '精筛 K (最终)',
'codexlens.envField.stripComments': '去除注释',
'codexlens.envField.stripDocstrings': '去除文档字符串',
'codexlens.envField.testFilePenalty': '测试文件惩罚',
'codexlens.envField.docstringWeight': '文档字符串权重',
'codexlens.usingApiReranker': '使用 API 重排序',
'codexlens.currentModel': '当前模型',
'codexlens.localModels': '本地模型',

View File

@@ -1109,6 +1109,16 @@ var ENV_VAR_GROUPS = {
'CODEXLENS_CASCADE_COARSE_K': { labelKey: 'codexlens.envField.coarseK', type: 'number', placeholder: '100', default: '100', settingsPath: 'cascade.coarse_k', min: 10, max: 500 },
'CODEXLENS_CASCADE_FINE_K': { labelKey: 'codexlens.envField.fineK', type: 'number', placeholder: '10', default: '10', settingsPath: 'cascade.fine_k', min: 1, max: 100 }
}
},
chunking: {
labelKey: 'codexlens.envGroup.chunking',
icon: 'scissors',
vars: {
'CHUNK_STRIP_COMMENTS': { labelKey: 'codexlens.envField.stripComments', type: 'select', options: ['true', 'false'], default: 'true', settingsPath: 'chunking.strip_comments' },
'CHUNK_STRIP_DOCSTRINGS': { labelKey: 'codexlens.envField.stripDocstrings', type: 'select', options: ['true', 'false'], default: 'true', settingsPath: 'chunking.strip_docstrings' },
'RERANKER_TEST_FILE_PENALTY': { labelKey: 'codexlens.envField.testFilePenalty', type: 'number', placeholder: '0.0', default: '0.0', settingsPath: 'reranker.test_file_penalty', min: 0, max: 1, step: 0.1 },
'RERANKER_DOCSTRING_WEIGHT': { labelKey: 'codexlens.envField.docstringWeight', type: 'number', placeholder: '1.0', default: '1.0', settingsPath: 'reranker.docstring_weight', min: 0, max: 1, step: 0.1 }
}
}
};

View File

@@ -0,0 +1,318 @@
#!/usr/bin/env python
"""Debug script to trace semantic search (dense_rerank) flow step by step."""
import json
import logging
import sqlite3
import sys
from pathlib import Path
from typing import Any, Dict, List, Tuple
# Add src to path
sys.path.insert(0, str(Path(__file__).parent / "src"))
# Configure detailed logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s | %(levelname)-5s | %(name)s | %(message)s",
datefmt="%H:%M:%S",
)
# Enable debug for specific modules
for name in ["codexlens.search", "codexlens.semantic", "codexlens.indexing"]:
logging.getLogger(name).setLevel(logging.DEBUG)
logger = logging.getLogger("debug_semantic")
def load_config() -> Dict[str, Any]:
"""Load config from codexlens settings."""
config_path = Path.home() / ".codexlens" / "config.json"
if config_path.exists():
with open(config_path) as f:
return json.load(f)
return {}
def inspect_hnsw_index(index_root: Path) -> Dict[str, Any]:
"""Inspect centralized HNSW index metadata."""
hnsw_path = index_root / "_vectors.hnsw"
meta_path = index_root / "_vectors_meta.db"
result = {
"hnsw_exists": hnsw_path.exists(),
"meta_exists": meta_path.exists(),
"hnsw_size_mb": round(hnsw_path.stat().st_size / (1024*1024), 2) if hnsw_path.exists() else 0,
}
if meta_path.exists():
conn = sqlite3.connect(str(meta_path))
cursor = conn.execute("SELECT COUNT(*) FROM chunk_metadata")
result["total_chunks"] = cursor.fetchone()[0]
# Sample file paths
cursor = conn.execute("""
SELECT DISTINCT file_path FROM chunk_metadata
ORDER BY file_path LIMIT 20
""")
result["sample_files"] = [row[0] for row in cursor.fetchall()]
# Check if tests vs src
cursor = conn.execute("""
SELECT
CASE
WHEN file_path LIKE '%tests%' OR file_path LIKE '%test_%' THEN 'test'
ELSE 'src'
END as category,
COUNT(*) as count
FROM chunk_metadata
GROUP BY category
""")
result["category_distribution"] = {row[0]: row[1] for row in cursor.fetchall()}
conn.close()
return result
def run_dense_search(query: str, index_root: Path, top_k: int = 50) -> List[Tuple[int, float, str, str, int, int]]:
"""Execute dense vector search and return candidates with details."""
from codexlens.semantic.ann_index import ANNIndex
from codexlens.semantic.factory import get_embedder
from codexlens.semantic.vector_store import VectorStore
logger.info("=" * 60)
logger.info("STAGE 1: Dense Embedding Generation")
logger.info("=" * 60)
# Read model config from index
index_db = index_root / "_index.db"
embedding_model = "qwen3-embedding-sf"
embedding_backend = "litellm"
if index_db.exists():
try:
with VectorStore(index_db) as vs:
model_config = vs.get_model_config()
if model_config:
embedding_backend = model_config.get("backend", embedding_backend)
embedding_model = model_config.get("model_name", embedding_model)
logger.info(f"Model config from index: {embedding_backend}/{embedding_model}")
except Exception as e:
logger.warning(f"Failed to read model config: {e}")
# Generate query embedding
embedder = get_embedder(backend=embedding_backend, model=embedding_model)
query_embedding = embedder.embed_to_numpy([query])[0]
logger.info(f"Query: {query!r}")
logger.info(f"Query embedding dim: {query_embedding.shape[0]}")
logger.info(f"Query embedding norm: {(query_embedding**2).sum()**0.5:.4f}")
# Load HNSW index
logger.info("=" * 60)
logger.info("STAGE 2: HNSW Vector Search (Coarse)")
logger.info("=" * 60)
ann_index = ANNIndex.create_central(
index_root=index_root,
dim=query_embedding.shape[0],
)
if not ann_index.load():
logger.error("Failed to load HNSW index")
return []
logger.info(f"HNSW index count: {ann_index.count()}")
# Execute search
ids, distances = ann_index.search(query_embedding, top_k=top_k)
logger.info(f"Found {len(ids)} candidates")
# Get chunk details
candidates = []
meta_path = index_root / "_vectors_meta.db"
if meta_path.exists():
conn = sqlite3.connect(str(meta_path))
conn.row_factory = sqlite3.Row
for chunk_id, distance in zip(ids, distances):
cursor = conn.execute("""
SELECT file_path, content, start_line, end_line
FROM chunk_metadata WHERE chunk_id = ?
""", (int(chunk_id),))
row = cursor.fetchone()
if row:
candidates.append((
int(chunk_id),
float(distance),
row["file_path"],
row["content"][:200] if row["content"] else "",
row["start_line"],
row["end_line"],
))
conn.close()
# Print top candidates
logger.info("\nTop 20 Dense Search Candidates:")
logger.info("-" * 80)
for i, (cid, dist, path, content, start, end) in enumerate(candidates[:20]):
score = max(0, 1 - dist)
is_test = "tests/" in path or "test_" in Path(path).name
marker = "[TEST]" if is_test else "[SRC]"
logger.info(f"{i+1:2d}. {marker} dist={dist:.4f} score={score:.4f}")
logger.info(f" {path}:{start}-{end}")
logger.info(f" {content[:100]}...")
logger.info("")
return candidates
def run_reranking(query: str, candidates: List[Tuple], top_k: int = 10) -> List[Tuple[str, float, float, float, bool, str]]:
"""Execute cross-encoder reranking on candidates."""
from codexlens.semantic.reranker import get_reranker, check_reranker_available
logger.info("=" * 60)
logger.info("STAGE 3: Cross-Encoder Reranking")
logger.info("=" * 60)
# Check reranker availability
config = load_config()
backend = config.get("reranker_backend", "api")
model = config.get("reranker_model", "Qwen/Qwen3-Reranker-8B")
logger.info(f"Reranker backend: {backend}")
logger.info(f"Reranker model: {model}")
ok, err = check_reranker_available(backend)
if not ok:
logger.error(f"Reranker not available: {err}")
return []
reranker = get_reranker(backend=backend, model_name=model)
# Prepare pairs for reranking
pairs = []
for cid, dist, path, content, start, end in candidates[:50]: # Top 50 for reranking
doc_text = content if content else path
pairs.append((query, doc_text))
logger.info(f"Reranking {len(pairs)} candidates...")
# Execute reranking
scores = reranker.score_pairs(pairs, batch_size=32)
# Combine scores
results = []
for i, (cid, dist, path, content, start, end) in enumerate(candidates[:len(scores)]):
dense_score = max(0, 1 - dist)
rerank_score = scores[i]
combined = 0.5 * dense_score + 0.5 * rerank_score
is_test = "tests/" in path or "test_" in Path(path).name
results.append((path, dense_score, rerank_score, combined, is_test, content[:100]))
# Sort by combined score
results.sort(key=lambda x: x[3], reverse=True)
logger.info("\nTop 20 Reranked Results:")
logger.info("-" * 100)
logger.info(f"{'Rank':>4} {'Type':^6} {'Dense':^8} {'Rerank':^8} {'Combined':^8} Path")
logger.info("-" * 100)
for i, (path, dense, rerank, combined, is_test, content) in enumerate(results[:20]):
marker = "TEST" if is_test else "SRC"
logger.info(f"{i+1:4d} [{marker:^4}] {dense:8.4f} {rerank:8.4f} {combined:8.4f} {path}")
return results[:top_k]
def analyze_problem(candidates: List[Tuple], results: List[Tuple]):
"""Analyze why tests might rank higher than src files."""
logger.info("=" * 60)
logger.info("ANALYSIS: Why Tests Rank Higher?")
logger.info("=" * 60)
# Count test vs src in dense candidates
test_in_dense = sum(1 for c in candidates[:50] if "tests/" in c[2] or "test_" in Path(c[2]).name)
src_in_dense = 50 - test_in_dense
logger.info(f"\nDense Search (top 50):")
logger.info(f" - Test files: {test_in_dense} ({test_in_dense*2}%)")
logger.info(f" - Src files: {src_in_dense} ({src_in_dense*2}%)")
# Average scores by category
test_dense_scores = [max(0, 1-c[1]) for c in candidates[:50] if "tests/" in c[2] or "test_" in Path(c[2]).name]
src_dense_scores = [max(0, 1-c[1]) for c in candidates[:50] if not ("tests/" in c[2] or "test_" in Path(c[2]).name)]
if test_dense_scores:
logger.info(f"\nDense Score Averages:")
logger.info(f" - Test files: {sum(test_dense_scores)/len(test_dense_scores):.4f}")
if src_dense_scores:
logger.info(f" - Src files: {sum(src_dense_scores)/len(src_dense_scores):.4f}")
# Check rerank score distribution
test_results = [r for r in results if r[4]]
src_results = [r for r in results if not r[4]]
if test_results and src_results:
logger.info(f"\nRerank Score Averages:")
logger.info(f" - Test files: {sum(r[2] for r in test_results)/len(test_results):.4f}")
logger.info(f" - Src files: {sum(r[2] for r in src_results)/len(src_results):.4f}")
logger.info("\n" + "=" * 60)
logger.info("HYPOTHESIS:")
logger.info("=" * 60)
if test_in_dense > src_in_dense:
logger.info("→ Problem is at DENSE SEARCH stage")
logger.info(" Test files have embeddings closer to query")
logger.info(" Possible causes:")
logger.info(" 1. Test files mention implementation concepts in comments/docstrings")
logger.info(" 2. Embedding model doesn't distinguish between tests and implementation")
logger.info(" 3. Test file chunks are more frequent in the index")
else:
logger.info("→ Problem may be at RERANKING stage")
logger.info(" Reranker gives higher scores to test content")
def main():
query = "文件索引和嵌入向量生成的实现逻辑"
index_root = Path(r"C:\Users\dyw\.codexlens\indexes\D\Claude_dms3")
logger.info("=" * 60)
logger.info("DEBUG: Semantic Search Analysis")
logger.info("=" * 60)
logger.info(f"Query: {query}")
logger.info(f"Index root: {index_root}")
logger.info("")
# Step 1: Inspect index
logger.info("STEP 0: Index Inspection")
logger.info("-" * 60)
index_info = inspect_hnsw_index(index_root)
for k, v in index_info.items():
if k == "sample_files":
logger.info(f" {k}:")
for f in v[:10]:
logger.info(f" - {f}")
elif k == "category_distribution":
logger.info(f" {k}:")
for cat, count in v.items():
logger.info(f" - {cat}: {count}")
else:
logger.info(f" {k}: {v}")
logger.info("")
# Step 2: Dense search
candidates = run_dense_search(query, index_root, top_k=100)
if not candidates:
logger.error("No candidates from dense search")
return
# Step 3: Reranking
results = run_reranking(query, candidates, top_k=20)
# Step 4: Analyze
analyze_problem(candidates, results)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,276 @@
#!/usr/bin/env python
"""Debug script v2: Trace the full semantic search flow with detailed logging."""
import json
import logging
import sqlite3
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Tuple
# Add src to path
sys.path.insert(0, str(Path(__file__).parent / "src"))
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)-5s | %(message)s",
datefmt="%H:%M:%S",
)
logger = logging.getLogger("debug")
def count_chunks_by_category(index_root: Path) -> Dict[str, int]:
"""Count chunks by category (src vs test) across all indexes."""
counts = defaultdict(int)
for db_path in index_root.rglob("_index.db"):
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.execute("""
SELECT file_path FROM semantic_chunks
""")
for row in cursor:
path = row[0]
if "tests" in path or "test_" in Path(path).name:
counts["test"] += 1
else:
counts["src"] += 1
conn.close()
except:
pass
return dict(counts)
def run_dense_search_with_trace(query: str, source_path: Path) -> List[Dict]:
"""Run dense search with detailed tracing."""
from codexlens.config import Config
from codexlens.search.chain_search import ChainSearchEngine, SearchOptions
from codexlens.storage.registry import Registry
from codexlens.storage.path_mapper import PathMapper
# Load config
config = Config.load()
registry = Registry(config.data_dir)
mapper = PathMapper(config.data_dir)
# Create search engine with verbose logging
engine = ChainSearchEngine(registry, mapper, config=config)
engine.logger.setLevel(logging.DEBUG)
# Set up handler to capture all log output
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
engine.logger.addHandler(handler)
# Execute cascade search with dense_rerank strategy
options = SearchOptions(depth=-1) # Search all subdirectories
logger.info("=" * 70)
logger.info("Executing dense_rerank cascade search...")
logger.info(f"Query: {query}")
logger.info(f"Source: {source_path}")
logger.info("=" * 70)
result = engine.cascade_search(
query=query,
source_path=source_path,
k=20,
coarse_k=100,
options=options,
strategy="dense_rerank"
)
# Analyze results
logger.info("\n" + "=" * 70)
logger.info("SEARCH RESULTS ANALYSIS")
logger.info("=" * 70)
test_count = 0
src_count = 0
results_detail = []
for i, r in enumerate(result.results):
is_test = "tests" in r.path or "test_" in Path(r.path).name
if is_test:
test_count += 1
category = "TEST"
else:
src_count += 1
category = "SRC"
# Get metadata scores if available
pre_ce_score = r.metadata.get("pre_cross_encoder_score", r.score)
ce_score = r.metadata.get("cross_encoder_score", 0)
ce_prob = r.metadata.get("cross_encoder_prob", 0)
results_detail.append({
"rank": i + 1,
"category": category,
"path": r.path,
"score": r.score,
"pre_ce_score": pre_ce_score,
"ce_score": ce_score,
"ce_prob": ce_prob,
"excerpt": r.excerpt[:100] if r.excerpt else "",
})
logger.info(f"{i+1:2d}. [{category:4s}] score={r.score:.4f} pre_ce={pre_ce_score:.4f} ce={ce_score:.4f}")
logger.info(f" {r.path}")
if r.excerpt:
logger.info(f" {r.excerpt[:80]}...")
logger.info("")
logger.info(f"\nSummary: {src_count} SRC files, {test_count} TEST files in top {len(result.results)}")
logger.info(f"Search time: {result.stats.time_ms:.2f}ms")
return results_detail
def compare_coarse_candidates():
"""Compare coarse candidates before and after reranking."""
from codexlens.config import Config
from codexlens.semantic.factory import get_embedder
from codexlens.semantic.ann_index import ANNIndex
query = "文件索引和嵌入向量生成的实现逻辑"
config = Config.load()
# Generate query embedding
embedder = get_embedder(backend="litellm", model="qwen3-embedding-sf")
query_embedding = embedder.embed_to_numpy([query])[0]
logger.info("=" * 70)
logger.info("COARSE CANDIDATE ANALYSIS (per directory)")
logger.info("=" * 70)
# Scan all HNSW indexes
index_root = Path(r"C:\Users\dyw\.codexlens\indexes\D\Claude_dms3\codex-lens")
all_candidates = []
for hnsw_path in index_root.rglob("_index_vectors.hnsw"):
db_path = hnsw_path.parent / "_index.db"
if not db_path.exists():
continue
try:
ann_index = ANNIndex(db_path, dim=query_embedding.shape[0])
if not ann_index.load() or ann_index.count() == 0:
continue
ids, distances = ann_index.search(query_embedding, top_k=10)
# Get file paths from chunks
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
dir_name = hnsw_path.parent.relative_to(index_root)
for chunk_id, dist in zip(ids, distances):
cursor = conn.execute("""
SELECT file_path, content FROM semantic_chunks WHERE id = ?
""", (int(chunk_id),))
row = cursor.fetchone()
if row:
is_test = "tests" in row["file_path"] or "test_" in Path(row["file_path"]).name
all_candidates.append({
"dir": str(dir_name),
"chunk_id": int(chunk_id),
"distance": float(dist),
"score": max(0, 1 - float(dist)),
"is_test": is_test,
"file_path": row["file_path"],
"content_preview": row["content"][:100] if row["content"] else ""
})
conn.close()
except Exception as e:
logger.warning(f"Error processing {hnsw_path}: {e}")
# Sort by distance (closest first)
all_candidates.sort(key=lambda x: x["distance"])
logger.info(f"\nTotal coarse candidates across all directories: {len(all_candidates)}")
# Analyze distribution
test_candidates = [c for c in all_candidates if c["is_test"]]
src_candidates = [c for c in all_candidates if not c["is_test"]]
logger.info(f"Test files: {len(test_candidates)}")
logger.info(f"Src files: {len(src_candidates)}")
if test_candidates:
avg_test_dist = sum(c["distance"] for c in test_candidates) / len(test_candidates)
logger.info(f"Avg test distance: {avg_test_dist:.4f}")
if src_candidates:
avg_src_dist = sum(c["distance"] for c in src_candidates) / len(src_candidates)
logger.info(f"Avg src distance: {avg_src_dist:.4f}")
logger.info("\nTop 30 candidates (combined from all directories):")
logger.info("-" * 90)
for i, c in enumerate(all_candidates[:30]):
cat = "TEST" if c["is_test"] else "SRC"
logger.info(f"{i+1:2d}. [{cat:4s}] dist={c['distance']:.4f} score={c['score']:.4f} dir={c['dir']}")
logger.info(f" {Path(c['file_path']).name}")
return all_candidates
def main():
logger.info("=" * 70)
logger.info("SEMANTIC SEARCH DEBUG SESSION")
logger.info("=" * 70)
# Step 1: Count chunks distribution
index_root = Path(r"C:\Users\dyw\.codexlens\indexes\D\Claude_dms3\codex-lens")
counts = count_chunks_by_category(index_root)
logger.info(f"\nChunk distribution in index:")
logger.info(f" - Test chunks: {counts.get('test', 0)}")
logger.info(f" - Src chunks: {counts.get('src', 0)}")
# Step 2: Compare coarse candidates
logger.info("\n")
candidates = compare_coarse_candidates()
# Step 3: Run full search
logger.info("\n")
query = "文件索引和嵌入向量生成的实现逻辑"
source_path = Path(r"D:\Claude_dms3\codex-lens")
results = run_dense_search_with_trace(query, source_path)
# Summary
logger.info("\n" + "=" * 70)
logger.info("ROOT CAUSE ANALYSIS")
logger.info("=" * 70)
test_in_top10 = sum(1 for r in results[:10] if r["category"] == "TEST")
src_in_top10 = 10 - test_in_top10
logger.info(f"\nTop 10 results: {src_in_top10} SRC, {test_in_top10} TEST")
if test_in_top10 > src_in_top10:
logger.info("\nPROBLEM: Test files dominate top results")
logger.info("\nPossible causes:")
logger.info(" 1. Test files mention implementation concepts explicitly")
logger.info(" (e.g., docstrings describe what they test)")
logger.info(" 2. Embedding model treats test descriptions as similar to")
logger.info(" implementation descriptions")
logger.info(" 3. Cross-encoder reranker gives higher scores to")
logger.info(" descriptive test content over implementation code")
# Check if coarse candidates already favor tests
test_in_coarse_top30 = sum(1 for c in candidates[:30] if c["is_test"])
if test_in_coarse_top30 > 15:
logger.info(f"\n → Dense coarse search already favors tests")
logger.info(f" ({test_in_coarse_top30}/30 test files in coarse top-30)")
logger.info(f" Problem is at EMBEDDING/DENSE SEARCH stage")
else:
logger.info(f"\n → Coarse search is balanced ({test_in_coarse_top30}/30 tests)")
logger.info(f" Problem is at CROSS-ENCODER RERANKING stage")
if __name__ == "__main__":
main()

View File

@@ -141,6 +141,12 @@ class Config:
reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
reranker_top_k: int = 50
reranker_max_input_tokens: int = 8192 # Maximum tokens for reranker API batching
reranker_chunk_type_weights: Optional[Dict[str, float]] = None # Weights for chunk types: {"code": 1.0, "docstring": 0.7}
reranker_test_file_penalty: float = 0.0 # Penalty for test files (0.0-1.0, e.g., 0.2 = 20% reduction)
# Chunk stripping configuration (for semantic embedding)
chunk_strip_comments: bool = True # Strip comments from code chunks
chunk_strip_docstrings: bool = True # Strip docstrings from code chunks
# Cascade search configuration (two-stage retrieval)
enable_cascade_search: bool = False # Enable cascade search (coarse + fine ranking)
@@ -545,6 +551,35 @@ class Config:
except ValueError:
log.warning("Invalid RERANKER_MAX_INPUT_TOKENS in .env: %r", reranker_max_tokens)
# Reranker tuning from environment
test_penalty = get_env("RERANKER_TEST_FILE_PENALTY")
if test_penalty:
try:
self.reranker_test_file_penalty = float(test_penalty)
log.debug("Overriding reranker_test_file_penalty from .env: %s", self.reranker_test_file_penalty)
except ValueError:
log.warning("Invalid RERANKER_TEST_FILE_PENALTY in .env: %r", test_penalty)
docstring_weight = get_env("RERANKER_DOCSTRING_WEIGHT")
if docstring_weight:
try:
weight = float(docstring_weight)
self.reranker_chunk_type_weights = {"code": 1.0, "docstring": weight}
log.debug("Overriding reranker docstring weight from .env: %s", weight)
except ValueError:
log.warning("Invalid RERANKER_DOCSTRING_WEIGHT in .env: %r", docstring_weight)
# Chunk stripping from environment
strip_comments = get_env("CHUNK_STRIP_COMMENTS")
if strip_comments:
self.chunk_strip_comments = strip_comments.lower() in ("true", "1", "yes")
log.debug("Overriding chunk_strip_comments from .env: %s", self.chunk_strip_comments)
strip_docstrings = get_env("CHUNK_STRIP_DOCSTRINGS")
if strip_docstrings:
self.chunk_strip_docstrings = strip_docstrings.lower() in ("true", "1", "yes")
log.debug("Overriding chunk_strip_docstrings from .env: %s", self.chunk_strip_docstrings)
@classmethod
def load(cls) -> "Config":
"""Load config with settings from file."""

View File

@@ -45,6 +45,12 @@ ENV_VARS = {
# General configuration
"CODEXLENS_DATA_DIR": "Custom data directory path",
"CODEXLENS_DEBUG": "Enable debug mode (true/false)",
# Chunking configuration
"CHUNK_STRIP_COMMENTS": "Strip comments from code chunks for embedding: true/false (default: true)",
"CHUNK_STRIP_DOCSTRINGS": "Strip docstrings from code chunks for embedding: true/false (default: true)",
# Reranker tuning
"RERANKER_TEST_FILE_PENALTY": "Penalty for test files in reranking: 0.0-1.0 (default: 0.0)",
"RERANKER_DOCSTRING_WEIGHT": "Weight for docstring chunks in reranking: 0.0-1.0 (default: 1.0)",
}
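For reference, a hypothetical .env fragment exercising the four new variables (the values shown are illustrative, not the documented defaults of true/true/0.0/1.0):

# Chunking: strip comments and docstrings before embedding
CHUNK_STRIP_COMMENTS=true
CHUNK_STRIP_DOCSTRINGS=true
# Reranker tuning: demote test files by 20%, down-weight docstring chunks
RERANKER_TEST_FILE_PENALTY=0.2
RERANKER_DOCSTRING_WEIGHT=0.7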

View File

@@ -1816,12 +1816,22 @@ class ChainSearchEngine:
# Use cross_encoder_rerank from ranking module
from codexlens.search.ranking import cross_encoder_rerank
# Get chunk_type weights and test_file_penalty from config
chunk_type_weights = None
test_file_penalty = 0.0
if self._config is not None:
chunk_type_weights = getattr(self._config, "reranker_chunk_type_weights", None)
test_file_penalty = getattr(self._config, "reranker_test_file_penalty", 0.0)
return cross_encoder_rerank(
query=query,
results=results,
reranker=reranker,
top_k=top_k,
batch_size=32,
chunk_type_weights=chunk_type_weights,
test_file_penalty=test_file_penalty,
)
def search_files_only(self, query: str,

View File

@@ -613,11 +613,24 @@ def cross_encoder_rerank(
reranker: Any,
top_k: int = 50,
batch_size: int = 32,
chunk_type_weights: Optional[Dict[str, float]] = None,
test_file_penalty: float = 0.0,
) -> List[SearchResult]:
"""Second-stage reranking using a cross-encoder model.
This function is dependency-agnostic: callers can pass any object that exposes
a compatible `score_pairs(pairs, batch_size=...)` method.
Args:
query: Search query string
results: List of search results to rerank
reranker: Cross-encoder model with score_pairs or predict method
top_k: Number of top results to rerank
batch_size: Batch size for reranking
chunk_type_weights: Optional weights for different chunk types.
Example: {"code": 1.0, "docstring": 0.7} - reduce docstring influence
test_file_penalty: Penalty applied to test files (0.0-1.0).
Example: 0.2 means test files get 20% score reduction
"""
if not results:
return []
@@ -667,13 +680,50 @@ def cross_encoder_rerank(
reranked_results: List[SearchResult] = []
# Helper to detect test files
def is_test_file(path: str) -> bool:
if not path:
return False
basename = path.split("/")[-1].split("\\")[-1]
return (
basename.startswith("test_") or
basename.endswith("_test.py") or
basename.endswith(".test.ts") or
basename.endswith(".test.js") or
basename.endswith(".spec.ts") or
basename.endswith(".spec.js") or
"/tests/" in path or
"\\tests\\" in path or
"/test/" in path or
"\\test\\" in path
)
for idx, result in enumerate(results):
if idx < rerank_count:
prev_score = float(result.score)
ce_score = scores[idx]
ce_prob = probs[idx]
# Base combined score
combined_score = 0.5 * prev_score + 0.5 * ce_prob
# Apply chunk_type weight adjustment
if chunk_type_weights:
chunk_type = None
if result.chunk and hasattr(result.chunk, "metadata"):
chunk_type = result.chunk.metadata.get("chunk_type")
elif result.metadata:
chunk_type = result.metadata.get("chunk_type")
if chunk_type and chunk_type in chunk_type_weights:
weight = chunk_type_weights[chunk_type]
# Apply weight to CE contribution only
combined_score = 0.5 * prev_score + 0.5 * ce_prob * weight
# Apply test file penalty
if test_file_penalty > 0 and is_test_file(result.path):
combined_score = combined_score * (1.0 - test_file_penalty)
reranked_results.append(
SearchResult(
path=result.path,

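To make the new scoring concrete: any object exposing a compatible score_pairs(pairs, batch_size=...) method can serve as the reranker, and with a docstring weight of 0.7 and a test file penalty of 0.2 the combined score works out as in this illustrative sketch (the stub class and the numbers are hypothetical, not part of the commit):

class StubReranker:
    """Minimal stand-in satisfying the score_pairs contract described above."""
    def score_pairs(self, pairs, batch_size=32):
        # One relevance score per (query, document) pair.
        return [0.9 for _ in pairs]

# Worked example for a docstring chunk coming from a test file,
# assuming a dense prev_score of 0.8 and a cross-encoder probability of 0.9:
prev_score, ce_prob = 0.8, 0.9
docstring_weight = 0.7          # chunk_type_weights["docstring"]
test_penalty = 0.2              # test_file_penalty

combined = 0.5 * prev_score + 0.5 * ce_prob * docstring_weight   # 0.4 + 0.315 = 0.715
combined *= (1.0 - test_penalty)                                  # 0.715 * 0.8 = 0.572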
View File

@@ -43,6 +43,250 @@ class ChunkConfig:
strategy: str = "auto" # Chunking strategy: auto, symbol, sliding_window, hybrid
min_chunk_size: int = 50 # Minimum chunk size
skip_token_count: bool = False # Skip expensive token counting (use char/4 estimate)
strip_comments: bool = True # Remove comments from chunk content for embedding
strip_docstrings: bool = True # Remove docstrings from chunk content for embedding
preserve_original: bool = True # Store original content in metadata when stripping
class CommentStripper:
"""Remove comments from source code while preserving structure."""
@staticmethod
def strip_python_comments(content: str) -> str:
"""Strip Python comments (# style) but preserve docstrings.
Args:
content: Python source code
Returns:
Code with comments removed
"""
lines = content.splitlines(keepends=True)
result_lines: List[str] = []
in_string = False
string_char = None
for line in lines:
new_line = []
i = 0
while i < len(line):
char = line[i]
# Handle string literals
if char in ('"', "'") and not in_string:
# Check for triple quotes
if line[i:i+3] in ('"""', "'''"):
in_string = True
string_char = line[i:i+3]
new_line.append(line[i:i+3])
i += 3
continue
else:
in_string = True
string_char = char
elif in_string:
if string_char and len(string_char) == 3:
if line[i:i+3] == string_char:
in_string = False
new_line.append(line[i:i+3])
i += 3
string_char = None
continue
elif char == string_char:
# Check for escape
if i > 0 and line[i-1] != '\\':
in_string = False
string_char = None
# Handle comments (only outside strings)
if char == '#' and not in_string:
# Rest of line is comment, skip it
new_line.append('\n' if line.endswith('\n') else '')
break
new_line.append(char)
i += 1
result_lines.append(''.join(new_line))
return ''.join(result_lines)
@staticmethod
def strip_c_style_comments(content: str) -> str:
"""Strip C-style comments (// and /* */) from code.
Args:
content: Source code with C-style comments
Returns:
Code with comments removed
"""
result = []
i = 0
in_string = False
string_char = None
in_multiline_comment = False
while i < len(content):
# Handle multi-line comment end
if in_multiline_comment:
if content[i:i+2] == '*/':
in_multiline_comment = False
i += 2
continue
i += 1
continue
char = content[i]
# Handle string literals
if char in ('"', "'", '`') and not in_string:
in_string = True
string_char = char
result.append(char)
i += 1
continue
elif in_string:
result.append(char)
if char == string_char and (i == 0 or content[i-1] != '\\'):
in_string = False
string_char = None
i += 1
continue
# Handle comments
if content[i:i+2] == '//':
# Single line comment - skip to end of line
while i < len(content) and content[i] != '\n':
i += 1
if i < len(content):
result.append('\n')
i += 1
continue
if content[i:i+2] == '/*':
in_multiline_comment = True
i += 2
continue
result.append(char)
i += 1
return ''.join(result)
@classmethod
def strip_comments(cls, content: str, language: str) -> str:
"""Strip comments based on language.
Args:
content: Source code content
language: Programming language
Returns:
Code with comments removed
"""
if language == "python":
return cls.strip_python_comments(content)
elif language in {"javascript", "typescript", "java", "c", "cpp", "go", "rust"}:
return cls.strip_c_style_comments(content)
return content
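# Illustration only (not part of this diff): expected behaviour of CommentStripper
# on a small Python snippet -- '#' comments are dropped while the docstring is kept.
_sample = (
    "def add(a, b):\n"
    '    """Add two numbers."""\n'
    "    return a + b  # inline comment\n"
)
# CommentStripper.strip_comments(_sample, "python") yields roughly:
#     def add(a, b):
#         """Add two numbers."""
#         return a + b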
class DocstringStripper:
"""Remove docstrings from source code."""
@staticmethod
def strip_python_docstrings(content: str) -> str:
"""Strip Python docstrings (triple-quoted strings at module/class/function level).
Args:
content: Python source code
Returns:
Code with docstrings removed
"""
lines = content.splitlines(keepends=True)
result_lines: List[str] = []
i = 0
while i < len(lines):
line = lines[i]
stripped = line.strip()
# Check for docstring start
if stripped.startswith('"""') or stripped.startswith("'''"):
quote_type = '"""' if stripped.startswith('"""') else "'''"
# Single line docstring
if stripped.count(quote_type) >= 2:
# Skip this line (docstring)
i += 1
continue
# Multi-line docstring - skip until closing
i += 1
while i < len(lines):
if quote_type in lines[i]:
i += 1
break
i += 1
continue
result_lines.append(line)
i += 1
return ''.join(result_lines)
@staticmethod
def strip_jsdoc_comments(content: str) -> str:
"""Strip JSDoc comments (/** ... */) from code.
Args:
content: JavaScript/TypeScript source code
Returns:
Code with JSDoc comments removed
"""
result = []
i = 0
in_jsdoc = False
while i < len(content):
if in_jsdoc:
if content[i:i+2] == '*/':
in_jsdoc = False
i += 2
continue
i += 1
continue
# Check for JSDoc start (/** but not /*)
if content[i:i+3] == '/**':
in_jsdoc = True
i += 3
continue
result.append(content[i])
i += 1
return ''.join(result)
@classmethod
def strip_docstrings(cls, content: str, language: str) -> str:
"""Strip docstrings based on language.
Args:
content: Source code content
language: Programming language
Returns:
Code with docstrings removed
"""
if language == "python":
return cls.strip_python_docstrings(content)
elif language in {"javascript", "typescript"}:
return cls.strip_jsdoc_comments(content)
return content
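# Illustration only (not part of this diff): DocstringStripper removes the
# triple-quoted docstring line but leaves the surrounding code untouched.
_doc_sample = (
    "def add(a, b):\n"
    '    """Add two numbers."""\n'
    "    return a + b\n"
)
# DocstringStripper.strip_docstrings(_doc_sample, "python") yields:
#     def add(a, b):
#         return a + b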
class Chunker:
@@ -51,6 +295,33 @@ class Chunker:
def __init__(self, config: ChunkConfig | None = None) -> None:
self.config = config or ChunkConfig()
self._tokenizer = get_default_tokenizer()
self._comment_stripper = CommentStripper()
self._docstring_stripper = DocstringStripper()
def _process_content(self, content: str, language: str) -> Tuple[str, Optional[str]]:
"""Process chunk content by stripping comments/docstrings if configured.
Args:
content: Original chunk content
language: Programming language
Returns:
Tuple of (processed_content, original_content_if_preserved)
"""
original = content if self.config.preserve_original else None
processed = content
if self.config.strip_comments:
processed = self._comment_stripper.strip_comments(processed, language)
if self.config.strip_docstrings:
processed = self._docstring_stripper.strip_docstrings(processed, language)
# If nothing changed, don't store original
if processed == content:
original = None
return processed, original
def _estimate_token_count(self, text: str) -> int:
"""Estimate token count based on config.
@@ -120,30 +391,45 @@ class Chunker:
sub_chunk.metadata["symbol_name"] = symbol.name
sub_chunk.metadata["symbol_kind"] = symbol.kind
sub_chunk.metadata["strategy"] = "symbol_split"
sub_chunk.metadata["chunk_type"] = "code"
sub_chunk.metadata["parent_symbol_range"] = (start_line, end_line)
chunks.extend(sub_chunks)
else:
# Process content (strip comments/docstrings if configured)
processed_content, original_content = self._process_content(chunk_content, language)
# Skip if processed content is too small
if len(processed_content.strip()) < self.config.min_chunk_size:
continue
# Calculate token count if not provided
token_count = None
if symbol_token_counts and symbol.name in symbol_token_counts:
token_count = symbol_token_counts[symbol.name]
else:
token_count = self._estimate_token_count(chunk_content)
token_count = self._estimate_token_count(processed_content)
metadata = {
"file": str(file_path),
"language": language,
"symbol_name": symbol.name,
"symbol_kind": symbol.kind,
"start_line": start_line,
"end_line": end_line,
"strategy": "symbol",
"chunk_type": "code",
"token_count": token_count,
}
# Store original content if it was modified
if original_content is not None:
metadata["original_content"] = original_content
chunks.append(SemanticChunk(
content=chunk_content,
content=processed_content,
embedding=None,
metadata={
"file": str(file_path),
"language": language,
"symbol_name": symbol.name,
"symbol_kind": symbol.kind,
"start_line": start_line,
"end_line": end_line,
"strategy": "symbol",
"token_count": token_count,
}
metadata=metadata
))
return chunks
@@ -188,7 +474,19 @@ class Chunker:
chunk_content = "".join(lines[start:end])
if len(chunk_content.strip()) >= self.config.min_chunk_size:
token_count = self._estimate_token_count(chunk_content)
# Process content (strip comments/docstrings if configured)
processed_content, original_content = self._process_content(chunk_content, language)
# Skip if processed content is too small
if len(processed_content.strip()) < self.config.min_chunk_size:
# Move window forward
step = lines_per_chunk - overlap_lines
if step <= 0:
step = 1
start += step
continue
token_count = self._estimate_token_count(processed_content)
# Calculate correct line numbers
if line_mapping:
@@ -200,18 +498,25 @@ class Chunker:
start_line = start + 1
end_line = end
metadata = {
"file": str(file_path),
"language": language,
"chunk_index": chunk_idx,
"start_line": start_line,
"end_line": end_line,
"strategy": "sliding_window",
"chunk_type": "code",
"token_count": token_count,
}
# Store original content if it was modified
if original_content is not None:
metadata["original_content"] = original_content
chunks.append(SemanticChunk(
content=chunk_content,
content=processed_content,
embedding=None,
metadata={
"file": str(file_path),
"language": language,
"chunk_index": chunk_idx,
"start_line": start_line,
"end_line": end_line,
"strategy": "sliding_window",
"token_count": token_count,
}
metadata=metadata
))
chunk_idx += 1
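
Putting the chunking changes together, a minimal usage sketch (the module path is assumed; _process_content is the private helper added above and is called directly here only for illustration):

from codexlens.semantic.chunker import Chunker  # assumed module path

chunker = Chunker()  # ChunkConfig defaults: strip_comments=True, strip_docstrings=True, preserve_original=True
source = (
    "def add(a, b):\n"
    '    """Add two numbers."""\n'
    "    return a + b  # inline comment\n"
)
processed, original = chunker._process_content(source, "python")
# processed: comment and docstring removed, ready for embedding
# original: the unmodified source, kept because the content changed and preserve_original is True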