"""Semantic search API with RRF fusion. This module provides the semantic_search() function for combining vector, structural, and keyword search with configurable fusion strategies. """ from __future__ import annotations import logging from pathlib import Path from typing import List, Optional from .models import SemanticResult from .utils import resolve_project logger = logging.getLogger(__name__) def semantic_search( project_root: str, query: str, mode: str = "fusion", vector_weight: float = 0.5, structural_weight: float = 0.3, keyword_weight: float = 0.2, fusion_strategy: str = "rrf", kind_filter: Optional[List[str]] = None, limit: int = 20, include_match_reason: bool = False, ) -> List[SemanticResult]: """Semantic search - combining vector and structural search. This function provides a high-level API for semantic code search, combining vector similarity, structural (symbol + relationships), and keyword-based search methods with configurable fusion. Args: project_root: Project root directory query: Natural language query mode: Search mode - vector: Vector search only - structural: Structural search only (symbol + relationships) - fusion: Fusion search (default) vector_weight: Vector search weight [0, 1] (default 0.5) structural_weight: Structural search weight [0, 1] (default 0.3) keyword_weight: Keyword search weight [0, 1] (default 0.2) fusion_strategy: Fusion strategy (maps to chain_search.py) - rrf: Reciprocal Rank Fusion (recommended, default) - staged: Staged cascade -> staged_cascade_search - binary: Binary rerank cascade -> binary_cascade_search - hybrid: Hybrid cascade -> hybrid_cascade_search kind_filter: Symbol type filter (e.g., ["function", "class"]) limit: Max return count (default 20) include_match_reason: Generate match reason (heuristic, not LLM) Returns: Results sorted by fusion_score Degradation: - No vector index: vector_score=None, uses FTS + structural search - No relationship data: structural_score=None, vector search only Examples: >>> results = semantic_search( ... "/path/to/project", ... "authentication handler", ... mode="fusion", ... fusion_strategy="rrf" ... ) >>> for r in results: ... 
print(f"{r.symbol_name}: {r.fusion_score:.3f}") """ # Validate and resolve project path project_path = resolve_project(project_root) # Normalize weights to sum to 1.0 total_weight = vector_weight + structural_weight + keyword_weight if total_weight > 0: vector_weight = vector_weight / total_weight structural_weight = structural_weight / total_weight keyword_weight = keyword_weight / total_weight else: # Default to equal weights if all zero vector_weight = structural_weight = keyword_weight = 1.0 / 3.0 # Initialize search infrastructure try: from codexlens.config import Config from codexlens.storage.registry import RegistryStore from codexlens.storage.path_mapper import PathMapper from codexlens.search.chain_search import ChainSearchEngine, SearchOptions except ImportError as exc: logger.error("Failed to import search dependencies: %s", exc) return [] # Load config config = Config.load() # Get or create registry and mapper try: registry = RegistryStore.default() mapper = PathMapper(registry) except Exception as exc: logger.error("Failed to initialize search infrastructure: %s", exc) return [] # Build search options based on mode search_options = _build_search_options( mode=mode, vector_weight=vector_weight, structural_weight=structural_weight, keyword_weight=keyword_weight, limit=limit, ) # Execute search based on fusion_strategy try: with ChainSearchEngine(registry, mapper, config=config) as engine: chain_result = _execute_search( engine=engine, query=query, source_path=project_path, fusion_strategy=fusion_strategy, options=search_options, limit=limit, ) except Exception as exc: logger.error("Search execution failed: %s", exc) return [] # Transform results to SemanticResult semantic_results = _transform_results( results=chain_result.results, mode=mode, vector_weight=vector_weight, structural_weight=structural_weight, keyword_weight=keyword_weight, kind_filter=kind_filter, include_match_reason=include_match_reason, query=query, ) return semantic_results[:limit] def _build_search_options( mode: str, vector_weight: float, structural_weight: float, keyword_weight: float, limit: int, ) -> "SearchOptions": """Build SearchOptions based on mode and weights. 


def _build_search_options(
    mode: str,
    vector_weight: float,
    structural_weight: float,
    keyword_weight: float,
    limit: int,
) -> "SearchOptions":
    """Build SearchOptions based on mode and weights.

    Args:
        mode: Search mode (vector, structural, fusion)
        vector_weight: Vector search weight
        structural_weight: Structural search weight
        keyword_weight: Keyword search weight
        limit: Result limit

    Returns:
        Configured SearchOptions
    """
    from codexlens.search.chain_search import SearchOptions

    # Default options
    options = SearchOptions(
        total_limit=limit * 2,  # Fetch extra for filtering
        limit_per_dir=limit,
        include_symbols=True,  # Always include symbols for structural
    )

    if mode == "vector":
        # Pure vector mode
        options.hybrid_mode = True
        options.enable_vector = True
        options.pure_vector = True
        options.enable_fuzzy = False
    elif mode == "structural":
        # Structural only - use FTS + symbols
        options.hybrid_mode = True
        options.enable_vector = False
        options.enable_fuzzy = True
        options.include_symbols = True
    else:
        # Fusion mode (default)
        options.hybrid_mode = True
        options.enable_vector = vector_weight > 0
        options.enable_fuzzy = keyword_weight > 0
        options.include_symbols = structural_weight > 0

        # Set custom weights for RRF
        if options.enable_vector and keyword_weight > 0:
            options.hybrid_weights = {
                "vector": vector_weight,
                "exact": keyword_weight * 0.7,
                "fuzzy": keyword_weight * 0.3,
            }

    return options


def _execute_search(
    engine: "ChainSearchEngine",
    query: str,
    source_path: Path,
    fusion_strategy: str,
    options: "SearchOptions",
    limit: int,
) -> "ChainSearchResult":
    """Execute search using appropriate strategy.

    Maps fusion_strategy to ChainSearchEngine methods:
        - rrf: Standard hybrid search with RRF fusion
        - staged: staged_cascade_search
        - binary: binary_cascade_search
        - hybrid: hybrid_cascade_search

    Args:
        engine: ChainSearchEngine instance
        query: Search query
        source_path: Project root path
        fusion_strategy: Strategy name
        options: Search options
        limit: Result limit

    Returns:
        ChainSearchResult from the search
    """
    from codexlens.search.chain_search import ChainSearchResult

    if fusion_strategy == "staged":
        # Use staged cascade search (4-stage pipeline)
        return engine.staged_cascade_search(
            query=query,
            source_path=source_path,
            k=limit,
            coarse_k=limit * 5,
            options=options,
        )
    elif fusion_strategy == "binary":
        # Use binary cascade search (binary coarse + dense fine)
        return engine.binary_cascade_search(
            query=query,
            source_path=source_path,
            k=limit,
            coarse_k=limit * 5,
            options=options,
        )
    elif fusion_strategy == "hybrid":
        # Use hybrid cascade search (FTS+SPLADE+Vector + cross-encoder)
        return engine.hybrid_cascade_search(
            query=query,
            source_path=source_path,
            k=limit,
            coarse_k=limit * 5,
            options=options,
        )
    else:
        # Default: rrf - Standard search with RRF fusion
        return engine.search(
            query=query,
            source_path=source_path,
            options=options,
        )


def _transform_results(
    results: List,
    mode: str,
    vector_weight: float,
    structural_weight: float,
    keyword_weight: float,
    kind_filter: Optional[List[str]],
    include_match_reason: bool,
    query: str,
) -> List[SemanticResult]:
    """Transform ChainSearchEngine results to SemanticResult.

    Args:
        results: List of SearchResult objects
        mode: Search mode
        vector_weight: Vector weight used
        structural_weight: Structural weight used
        keyword_weight: Keyword weight used
        kind_filter: Optional symbol kind filter
        include_match_reason: Whether to generate match reasons
        query: Original query (for match reason generation)

    Returns:
        List of SemanticResult objects
    """
    semantic_results = []

    for result in results:
        # Extract symbol info
        symbol_name = getattr(result, "symbol_name", None)
        symbol_kind = getattr(result, "symbol_kind", None)
        start_line = getattr(result, "start_line", None)

        # Use symbol object if available
        if hasattr(result, "symbol") and result.symbol:
            symbol_name = symbol_name or result.symbol.name
            symbol_kind = symbol_kind or result.symbol.kind
            if hasattr(result.symbol, "range") and result.symbol.range:
                start_line = start_line or result.symbol.range[0]

        # Filter by kind if specified
        if kind_filter and symbol_kind:
            if symbol_kind.lower() not in [k.lower() for k in kind_filter]:
                continue

        # Determine scores based on mode and metadata
        metadata = getattr(result, "metadata", {}) or {}
        fusion_score = result.score

        # Try to extract source scores from metadata
        source_scores = metadata.get("source_scores", {})
        vector_score: Optional[float] = None
        structural_score: Optional[float] = None

        if mode == "vector":
            # In pure vector mode, the main score is the vector score
            vector_score = result.score
            structural_score = None
        elif mode == "structural":
            # In structural mode, no vector score
            vector_score = None
            structural_score = result.score
        else:
            # Fusion mode - try to extract individual scores
            if "vector" in source_scores:
                vector_score = source_scores["vector"]
            elif metadata.get("fusion_method") == "simple_weighted":
                # From weighted fusion
                vector_score = source_scores.get("vector")

            # Structural score approximation (from exact/fuzzy FTS)
            fts_scores = []
            if "exact" in source_scores:
                fts_scores.append(source_scores["exact"])
            if "fuzzy" in source_scores:
                fts_scores.append(source_scores["fuzzy"])
            if "splade" in source_scores:
                fts_scores.append(source_scores["splade"])
            if fts_scores:
                structural_score = max(fts_scores)

        # Build snippet
        snippet = getattr(result, "excerpt", "") or getattr(result, "content", "")
        if len(snippet) > 500:
            snippet = snippet[:500] + "..."

        # Generate match reason if requested
        match_reason = None
        if include_match_reason:
            match_reason = _generate_match_reason(
                query=query,
                symbol_name=symbol_name,
                symbol_kind=symbol_kind,
                snippet=snippet,
                vector_score=vector_score,
                structural_score=structural_score,
            )

        semantic_result = SemanticResult(
            symbol_name=symbol_name or Path(result.path).stem,
            kind=symbol_kind or "unknown",
            file_path=result.path,
            line=start_line or 1,
            vector_score=vector_score,
            structural_score=structural_score,
            fusion_score=fusion_score,
            snippet=snippet,
            match_reason=match_reason,
        )
        semantic_results.append(semantic_result)

    # Sort by fusion_score descending
    semantic_results.sort(key=lambda r: r.fusion_score, reverse=True)

    return semantic_results


def _generate_match_reason(
    query: str,
    symbol_name: Optional[str],
    symbol_kind: Optional[str],
    snippet: str,
    vector_score: Optional[float],
    structural_score: Optional[float],
) -> str:
    """Generate human-readable match reason heuristically.

    This is a simple heuristic-based approach, not LLM-powered.

    Args:
        query: Original search query
        symbol_name: Symbol name if available
        symbol_kind: Symbol kind if available
        snippet: Code snippet
        vector_score: Vector similarity score
        structural_score: Structural match score

    Returns:
        Human-readable explanation string
    """
    reasons = []

    # Check for direct name match
    query_lower = query.lower()
    query_words = set(query_lower.split())

    if symbol_name:
        name_lower = symbol_name.lower()
        # Direct substring match
        if query_lower in name_lower or name_lower in query_lower:
            reasons.append(f"Symbol name '{symbol_name}' matches query")
        # Word overlap
        name_words = set(_split_camel_case(symbol_name).lower().split())
        overlap = query_words & name_words
        if overlap and not reasons:
            reasons.append(f"Symbol name contains: {', '.join(overlap)}")

    # Check snippet for keyword matches
    snippet_lower = snippet.lower()
    matching_words = [w for w in query_words if w in snippet_lower and len(w) > 2]
    if matching_words and len(reasons) < 2:
        reasons.append(f"Code contains keywords: {', '.join(matching_words[:3])}")

    # Add score-based reasoning
    if vector_score is not None and vector_score > 0.7:
        reasons.append("High semantic similarity")
    elif vector_score is not None and vector_score > 0.5:
        reasons.append("Moderate semantic similarity")

    if structural_score is not None and structural_score > 0.8:
        reasons.append("Strong structural match")

    # Symbol kind context
    if symbol_kind and len(reasons) < 3:
        reasons.append(f"Matched {symbol_kind}")

    if not reasons:
        reasons.append("Partial relevance based on content analysis")

    return "; ".join(reasons[:3])


def _split_camel_case(name: str) -> str:
    """Split camelCase and PascalCase to words.

    Args:
        name: Symbol name in camelCase or PascalCase

    Returns:
        Space-separated words
    """
    import re

    # Insert space before uppercase letters
    result = re.sub(r"([a-z])([A-Z])", r"\1 \2", name)
    # Insert space before uppercase followed by lowercase
    result = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1 \2", result)
    # Replace underscores with spaces
    result = result.replace("_", " ")
    return result
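

# Minimal, optional smoke test for the heuristic helpers above. This is an
# illustrative sketch rather than part of the public API: the sample symbol
# names and snippet are made up, and nothing here touches an index, a project
# directory, or the search infrastructure.
if __name__ == "__main__":
    for _name in ("AuthTokenHandler", "refresh_access_token", "HTTPServerV2"):
        # Show how camelCase / snake_case names are split for word-overlap checks
        print(f"{_name!r} -> {_split_camel_case(_name)!r}")

    # Exercise the heuristic match-reason generator with fabricated scores
    print(
        _generate_match_reason(
            query="authentication handler",
            symbol_name="AuthTokenHandler",
            symbol_kind="class",
            snippet="class AuthTokenHandler:\n    def handle(self, request): ...",
            vector_score=0.82,
            structural_score=0.4,
        )
    )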