Files
Claude-Code-Workflow/codex-lens/debug_semantic_search.py

319 lines
11 KiB
Python

#!/usr/bin/env python
"""Debug script to trace semantic search (dense_rerank) flow step by step."""
import json
import logging
import sqlite3
import sys
from pathlib import Path
from typing import Any, Dict, List, Tuple
# Add src to path
sys.path.insert(0, str(Path(__file__).parent / "src"))
# Configure detailed logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s | %(levelname)-5s | %(name)s | %(message)s",
datefmt="%H:%M:%S",
)
# Enable debug for specific modules
for name in ["codexlens.search", "codexlens.semantic", "codexlens.indexing"]:
logging.getLogger(name).setLevel(logging.DEBUG)
logger = logging.getLogger("debug_semantic")
def load_config() -> Dict[str, Any]:
"""Load config from codexlens settings."""
config_path = Path.home() / ".codexlens" / "config.json"
if config_path.exists():
with open(config_path) as f:
return json.load(f)
return {}
def inspect_hnsw_index(index_root: Path) -> Dict[str, Any]:
"""Inspect centralized HNSW index metadata."""
hnsw_path = index_root / "_vectors.hnsw"
meta_path = index_root / "_vectors_meta.db"
result = {
"hnsw_exists": hnsw_path.exists(),
"meta_exists": meta_path.exists(),
"hnsw_size_mb": round(hnsw_path.stat().st_size / (1024*1024), 2) if hnsw_path.exists() else 0,
}
if meta_path.exists():
conn = sqlite3.connect(str(meta_path))
cursor = conn.execute("SELECT COUNT(*) FROM chunk_metadata")
result["total_chunks"] = cursor.fetchone()[0]
# Sample file paths
cursor = conn.execute("""
SELECT DISTINCT file_path FROM chunk_metadata
ORDER BY file_path LIMIT 20
""")
result["sample_files"] = [row[0] for row in cursor.fetchall()]
# Check if tests vs src
cursor = conn.execute("""
SELECT
CASE
WHEN file_path LIKE '%tests%' OR file_path LIKE '%test_%' THEN 'test'
ELSE 'src'
END as category,
COUNT(*) as count
FROM chunk_metadata
GROUP BY category
""")
result["category_distribution"] = {row[0]: row[1] for row in cursor.fetchall()}
conn.close()
return result
def run_dense_search(query: str, index_root: Path, top_k: int = 50) -> List[Tuple[int, float, str]]:
"""Execute dense vector search and return candidates with details."""
from codexlens.semantic.ann_index import ANNIndex
from codexlens.semantic.factory import get_embedder
from codexlens.semantic.vector_store import VectorStore
logger.info("=" * 60)
logger.info("STAGE 1: Dense Embedding Generation")
logger.info("=" * 60)
# Read model config from index
index_db = index_root / "_index.db"
embedding_model = "qwen3-embedding-sf"
embedding_backend = "litellm"
if index_db.exists():
try:
with VectorStore(index_db) as vs:
model_config = vs.get_model_config()
if model_config:
embedding_backend = model_config.get("backend", embedding_backend)
embedding_model = model_config.get("model_name", embedding_model)
logger.info(f"Model config from index: {embedding_backend}/{embedding_model}")
except Exception as e:
logger.warning(f"Failed to read model config: {e}")
# Generate query embedding
embedder = get_embedder(backend=embedding_backend, model=embedding_model)
query_embedding = embedder.embed_to_numpy([query])[0]
logger.info(f"Query: {query!r}")
logger.info(f"Query embedding dim: {query_embedding.shape[0]}")
logger.info(f"Query embedding norm: {(query_embedding**2).sum()**0.5:.4f}")
# Load HNSW index
logger.info("=" * 60)
logger.info("STAGE 2: HNSW Vector Search (Coarse)")
logger.info("=" * 60)
ann_index = ANNIndex.create_central(
index_root=index_root,
dim=query_embedding.shape[0],
)
if not ann_index.load():
logger.error("Failed to load HNSW index")
return []
logger.info(f"HNSW index count: {ann_index.count()}")
# Execute search
ids, distances = ann_index.search(query_embedding, top_k=top_k)
logger.info(f"Found {len(ids)} candidates")
# Get chunk details
candidates = []
meta_path = index_root / "_vectors_meta.db"
if meta_path.exists():
conn = sqlite3.connect(str(meta_path))
conn.row_factory = sqlite3.Row
for chunk_id, distance in zip(ids, distances):
cursor = conn.execute("""
SELECT file_path, content, start_line, end_line
FROM chunk_metadata WHERE chunk_id = ?
""", (int(chunk_id),))
row = cursor.fetchone()
if row:
candidates.append((
int(chunk_id),
float(distance),
row["file_path"],
row["content"][:200] if row["content"] else "",
row["start_line"],
row["end_line"],
))
conn.close()
# Print top candidates
logger.info("\nTop 20 Dense Search Candidates:")
logger.info("-" * 80)
for i, (cid, dist, path, content, start, end) in enumerate(candidates[:20]):
score = max(0, 1 - dist)
is_test = "tests/" in path or "test_" in Path(path).name
marker = "[TEST]" if is_test else "[SRC]"
logger.info(f"{i+1:2d}. {marker} dist={dist:.4f} score={score:.4f}")
logger.info(f" {path}:{start}-{end}")
logger.info(f" {content[:100]}...")
logger.info("")
return candidates
def run_reranking(query: str, candidates: List[Tuple], top_k: int = 10) -> List[Tuple[str, float, float]]:
"""Execute cross-encoder reranking on candidates."""
from codexlens.semantic.reranker import get_reranker, check_reranker_available
logger.info("=" * 60)
logger.info("STAGE 3: Cross-Encoder Reranking")
logger.info("=" * 60)
# Check reranker availability
config = load_config()
backend = config.get("reranker_backend", "api")
model = config.get("reranker_model", "Qwen/Qwen3-Reranker-8B")
logger.info(f"Reranker backend: {backend}")
logger.info(f"Reranker model: {model}")
ok, err = check_reranker_available(backend)
if not ok:
logger.error(f"Reranker not available: {err}")
return []
reranker = get_reranker(backend=backend, model_name=model)
# Prepare pairs for reranking
pairs = []
for cid, dist, path, content, start, end in candidates[:50]: # Top 50 for reranking
doc_text = content if content else path
pairs.append((query, doc_text))
logger.info(f"Reranking {len(pairs)} candidates...")
# Execute reranking
scores = reranker.score_pairs(pairs, batch_size=32)
# Combine scores
results = []
for i, (cid, dist, path, content, start, end) in enumerate(candidates[:len(scores)]):
dense_score = max(0, 1 - dist)
rerank_score = scores[i]
combined = 0.5 * dense_score + 0.5 * rerank_score
is_test = "tests/" in path or "test_" in Path(path).name
results.append((path, dense_score, rerank_score, combined, is_test, content[:100]))
# Sort by combined score
results.sort(key=lambda x: x[3], reverse=True)
logger.info("\nTop 20 Reranked Results:")
logger.info("-" * 100)
logger.info(f"{'Rank':>4} {'Type':^6} {'Dense':^8} {'Rerank':^8} {'Combined':^8} Path")
logger.info("-" * 100)
for i, (path, dense, rerank, combined, is_test, content) in enumerate(results[:20]):
marker = "TEST" if is_test else "SRC"
logger.info(f"{i+1:4d} [{marker:^4}] {dense:8.4f} {rerank:8.4f} {combined:8.4f} {path}")
return results[:top_k]
def analyze_problem(candidates: List[Tuple], results: List[Tuple]):
"""Analyze why tests might rank higher than src files."""
logger.info("=" * 60)
logger.info("ANALYSIS: Why Tests Rank Higher?")
logger.info("=" * 60)
# Count test vs src in dense candidates
test_in_dense = sum(1 for c in candidates[:50] if "tests/" in c[2] or "test_" in Path(c[2]).name)
src_in_dense = 50 - test_in_dense
logger.info(f"\nDense Search (top 50):")
logger.info(f" - Test files: {test_in_dense} ({test_in_dense*2}%)")
logger.info(f" - Src files: {src_in_dense} ({src_in_dense*2}%)")
# Average scores by category
test_dense_scores = [max(0, 1-c[1]) for c in candidates[:50] if "tests/" in c[2] or "test_" in Path(c[2]).name]
src_dense_scores = [max(0, 1-c[1]) for c in candidates[:50] if not ("tests/" in c[2] or "test_" in Path(c[2]).name)]
if test_dense_scores:
logger.info(f"\nDense Score Averages:")
logger.info(f" - Test files: {sum(test_dense_scores)/len(test_dense_scores):.4f}")
if src_dense_scores:
logger.info(f" - Src files: {sum(src_dense_scores)/len(src_dense_scores):.4f}")
# Check rerank score distribution
test_results = [r for r in results if r[4]]
src_results = [r for r in results if not r[4]]
if test_results and src_results:
logger.info(f"\nRerank Score Averages:")
logger.info(f" - Test files: {sum(r[2] for r in test_results)/len(test_results):.4f}")
logger.info(f" - Src files: {sum(r[2] for r in src_results)/len(src_results):.4f}")
logger.info("\n" + "=" * 60)
logger.info("HYPOTHESIS:")
logger.info("=" * 60)
if test_in_dense > src_in_dense:
logger.info("→ Problem is at DENSE SEARCH stage")
logger.info(" Test files have embeddings closer to query")
logger.info(" Possible causes:")
logger.info(" 1. Test files mention implementation concepts in comments/docstrings")
logger.info(" 2. Embedding model doesn't distinguish between tests and implementation")
logger.info(" 3. Test file chunks are more frequent in the index")
else:
logger.info("→ Problem may be at RERANKING stage")
logger.info(" Reranker gives higher scores to test content")
def main():
query = "文件索引和嵌入向量生成的实现逻辑"
index_root = Path(r"C:\Users\dyw\.codexlens\indexes\D\Claude_dms3")
logger.info("=" * 60)
logger.info("DEBUG: Semantic Search Analysis")
logger.info("=" * 60)
logger.info(f"Query: {query}")
logger.info(f"Index root: {index_root}")
logger.info("")
# Step 1: Inspect index
logger.info("STEP 0: Index Inspection")
logger.info("-" * 60)
index_info = inspect_hnsw_index(index_root)
for k, v in index_info.items():
if k == "sample_files":
logger.info(f" {k}:")
for f in v[:10]:
logger.info(f" - {f}")
elif k == "category_distribution":
logger.info(f" {k}:")
for cat, count in v.items():
logger.info(f" - {cat}: {count}")
else:
logger.info(f" {k}: {v}")
logger.info("")
# Step 2: Dense search
candidates = run_dense_search(query, index_root, top_k=100)
if not candidates:
logger.error("No candidates from dense search")
return
# Step 3: Reranking
results = run_reranking(query, candidates, top_k=20)
# Step 4: Analyze
analyze_problem(candidates, results)
if __name__ == "__main__":
main()