feat: Return complete method blocks in FTS search results

- Add helper methods to locate match lines and find containing symbols
- Modify search_fts, search_fts_exact, search_fts_fuzzy to return complete
  code blocks (functions/methods/classes) instead of short snippets
- Join with files table to get full content and file_id
- Query symbols table to find the smallest symbol containing the match
- Fall back to context lines when no symbol contains the match
- Add return_full_content and context_lines parameters for flexibility
- Include start_line, end_line, symbol_name, symbol_kind in SearchResult

This improves search result quality by returning semantically meaningful
code blocks rather than arbitrary 20-byte snippets.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
catlog22
2025-12-19 11:38:39 +08:00
parent e17e9a6473
commit 69049e3f45

View File

@@ -9,6 +9,8 @@ Each directory maintains its own _index.db with:
from __future__ import annotations
import logging
import re
import sqlite3
import threading
from dataclasses import dataclass
@@ -67,6 +69,7 @@ class DirIndexStore:
self.db_path = Path(db_path).resolve()
self._lock = threading.RLock()
self._conn: Optional[sqlite3.Connection] = None
self.logger = logging.getLogger(__name__)
def initialize(self) -> None:
"""Create database and schema if not exists."""
@@ -281,7 +284,12 @@ class DirIndexStore:
# Insert new relationships
relationship_rows = []
skipped_relationships = []
for rel in relationships:
# Extract simple name from fully qualified name (e.g., "MyClass.my_method" -> "my_method")
# This handles cases where GraphAnalyzer generates qualified names but symbols table stores simple names
source_symbol_simple = rel.source_symbol.split(".")[-1] if "." in rel.source_symbol else rel.source_symbol
# Find symbol_id by name and file
symbol_row = conn.execute(
"""
@@ -289,14 +297,14 @@ class DirIndexStore:
WHERE file_id=? AND name=? AND start_line<=? AND end_line>=?
LIMIT 1
""",
(file_id, rel.source_symbol, rel.source_line, rel.source_line),
(file_id, source_symbol_simple, rel.source_line, rel.source_line),
).fetchone()
if not symbol_row:
# Try matching by name only
# Try matching by simple name only
symbol_row = conn.execute(
"SELECT id FROM symbols WHERE file_id=? AND name=? LIMIT 1",
(file_id, rel.source_symbol),
(file_id, source_symbol_simple),
).fetchone()
if symbol_row:
@@ -307,6 +315,18 @@ class DirIndexStore:
rel.source_line,
rel.target_file,
))
else:
# Log warning when symbol lookup fails
skipped_relationships.append(rel.source_symbol)
# Log skipped relationships for debugging
if skipped_relationships:
self.logger.warning(
"Failed to find source symbol IDs for %d relationships in %s: %s",
len(skipped_relationships),
file_path_str,
", ".join(set(skipped_relationships))
)
if relationship_rows:
conn.executemany(
@@ -1103,12 +1123,120 @@ class DirIndexStore:
enhanced_words = [f"{word}*" if not word.endswith('*') else word for word in words]
return ' '.join(enhanced_words)
def search_fts(self, query: str, limit: int = 20, enhance_query: bool = False) -> List[SearchResult]:
"""Full-text search in current directory files.
def _find_match_lines(self, content: str, query: str) -> List[int]:
"""Find line numbers where query terms match.
Args:
content: File content
query: Search query (FTS5 format)
Returns:
List of 1-based line numbers containing matches
"""
# Extract search terms from FTS query (remove operators)
terms = re.findall(r'["\']([^"\']+)["\']|(\w+)', query)
search_terms = [t[0] or t[1] for t in terms if t[0] or t[1]]
# Filter out FTS operators
fts_operators = {'AND', 'OR', 'NOT', 'NEAR'}
search_terms = [t for t in search_terms if t.upper() not in fts_operators]
if not search_terms:
return [1] # Default to first line
lines = content.split('\n')
match_lines = []
for i, line in enumerate(lines, 1):
line_lower = line.lower()
for term in search_terms:
# Handle wildcard suffix
term_clean = term.rstrip('*').lower()
if term_clean and term_clean in line_lower:
match_lines.append(i)
break
return match_lines if match_lines else [1]
def _find_containing_symbol(
self, conn: sqlite3.Connection, file_id: int, line_num: int
) -> Optional[Tuple[int, int, str, str]]:
"""Find the symbol that contains the given line number.
Args:
conn: Database connection
file_id: File ID in database
line_num: 1-based line number
Returns:
Tuple of (start_line, end_line, symbol_name, symbol_kind) or None
"""
row = conn.execute(
"""
SELECT start_line, end_line, name, kind
FROM symbols
WHERE file_id = ? AND start_line <= ? AND end_line >= ?
ORDER BY (end_line - start_line) ASC
LIMIT 1
""",
(file_id, line_num, line_num),
).fetchone()
if row:
return (row["start_line"], row["end_line"], row["name"], row["kind"])
return None
def _extract_code_block(
self,
content: str,
start_line: int,
end_line: int,
match_line: Optional[int] = None,
context_lines: int = 5,
) -> Tuple[str, int, int]:
"""Extract code block from content.
If start_line/end_line are provided (from symbol), use them.
Otherwise, extract context around match_line.
Args:
content: Full file content
start_line: 1-based start line (from symbol or calculated)
end_line: 1-based end line (from symbol or calculated)
match_line: 1-based line where match occurred (for context extraction)
context_lines: Number of lines before/after match when no symbol
Returns:
Tuple of (code_block, actual_start_line, actual_end_line)
"""
lines = content.split('\n')
total_lines = len(lines)
# Clamp to valid range
start_line = max(1, start_line)
end_line = min(total_lines, end_line)
# Extract block (convert to 0-based index)
block_lines = lines[start_line - 1:end_line]
block_content = '\n'.join(block_lines)
return block_content, start_line, end_line
def search_fts(
self,
query: str,
limit: int = 20,
enhance_query: bool = False,
return_full_content: bool = True,
context_lines: int = 10,
) -> List[SearchResult]:
"""Full-text search in current directory files with complete method blocks.
Uses files_fts_exact (unicode61 tokenizer) for exact token matching.
For fuzzy/substring search, use search_fts_fuzzy() instead.
Returns complete code blocks (functions/methods/classes) containing the match,
rather than just a short snippet.
Best Practice (from industry analysis of Codanna/Code-Index-MCP):
- Default: Respects exact user input without modification
- Users can manually add wildcards (e.g., "loadPack*") for prefix matching
@@ -1120,9 +1248,11 @@ class DirIndexStore:
limit: Maximum results to return
enhance_query: If True, automatically add prefix wildcards for simple queries.
Default False to respect exact user input.
return_full_content: If True, include full code block in content field
context_lines: Lines of context when no symbol contains the match
Returns:
List of SearchResult objects sorted by relevance
List of SearchResult objects with complete code blocks
Raises:
StorageError: If FTS search fails
@@ -1134,11 +1264,13 @@ class DirIndexStore:
with self._lock:
conn = self._get_connection()
try:
# Join with files table to get content and file_id
rows = conn.execute(
"""
SELECT rowid, full_path, bm25(files_fts_exact) AS rank,
snippet(files_fts_exact, 2, '[bold red]', '[/bold red]', '...', 20) AS excerpt
SELECT f.id AS file_id, f.full_path, f.content,
bm25(files_fts_exact) AS rank
FROM files_fts_exact
JOIN files f ON files_fts_exact.rowid = f.id
WHERE files_fts_exact MATCH ?
ORDER BY rank
LIMIT ?
@@ -1150,26 +1282,73 @@ class DirIndexStore:
results: List[SearchResult] = []
for row in rows:
file_id = row["file_id"]
file_path = row["full_path"]
content = row["content"] or ""
rank = float(row["rank"]) if row["rank"] is not None else 0.0
score = abs(rank) if rank < 0 else 0.0
# Find matching lines
match_lines = self._find_match_lines(content, final_query)
first_match_line = match_lines[0] if match_lines else 1
# Find symbol containing the first match
symbol_info = self._find_containing_symbol(conn, file_id, first_match_line)
if symbol_info:
start_line, end_line, symbol_name, symbol_kind = symbol_info
else:
# No symbol found, use context around match
lines = content.split('\n')
total_lines = len(lines)
start_line = max(1, first_match_line - context_lines)
end_line = min(total_lines, first_match_line + context_lines)
symbol_name = None
symbol_kind = None
# Extract code block
block_content, start_line, end_line = self._extract_code_block(
content, start_line, end_line
)
# Generate excerpt (first 200 chars)
excerpt = block_content[:200] + "..." if len(block_content) > 200 else block_content
results.append(
SearchResult(
path=row["full_path"],
path=file_path,
score=score,
excerpt=row["excerpt"],
excerpt=excerpt,
content=block_content if return_full_content else None,
start_line=start_line,
end_line=end_line,
symbol_name=symbol_name,
symbol_kind=symbol_kind,
)
)
return results
def search_fts_exact(self, query: str, limit: int = 20) -> List[SearchResult]:
"""Full-text search using exact token matching (unicode61 tokenizer).
def search_fts_exact(
self,
query: str,
limit: int = 20,
return_full_content: bool = True,
context_lines: int = 10,
) -> List[SearchResult]:
"""Full-text search using exact token matching with complete method blocks.
Returns complete code blocks (functions/methods/classes) containing the match,
rather than just a short snippet. If no symbol contains the match, returns
context lines around the match.
Args:
query: FTS5 query string
limit: Maximum results to return
return_full_content: If True, include full code block in content field
context_lines: Lines of context when no symbol contains the match
Returns:
List of SearchResult objects sorted by relevance
List of SearchResult objects with complete code blocks
Raises:
StorageError: If FTS search fails
@@ -1177,11 +1356,13 @@ class DirIndexStore:
with self._lock:
conn = self._get_connection()
try:
# Join with files table to get content and file_id
rows = conn.execute(
"""
SELECT rowid, full_path, bm25(files_fts_exact) AS rank,
snippet(files_fts_exact, 2, '[bold red]', '[/bold red]', '...', 20) AS excerpt
SELECT f.id AS file_id, f.full_path, f.content,
bm25(files_fts_exact) AS rank
FROM files_fts_exact
JOIN files f ON files_fts_exact.rowid = f.id
WHERE files_fts_exact MATCH ?
ORDER BY rank
LIMIT ?
@@ -1193,26 +1374,73 @@ class DirIndexStore:
results: List[SearchResult] = []
for row in rows:
file_id = row["file_id"]
file_path = row["full_path"]
content = row["content"] or ""
rank = float(row["rank"]) if row["rank"] is not None else 0.0
score = abs(rank) if rank < 0 else 0.0
# Find matching lines
match_lines = self._find_match_lines(content, query)
first_match_line = match_lines[0] if match_lines else 1
# Find symbol containing the first match
symbol_info = self._find_containing_symbol(conn, file_id, first_match_line)
if symbol_info:
start_line, end_line, symbol_name, symbol_kind = symbol_info
else:
# No symbol found, use context around match
lines = content.split('\n')
total_lines = len(lines)
start_line = max(1, first_match_line - context_lines)
end_line = min(total_lines, first_match_line + context_lines)
symbol_name = None
symbol_kind = None
# Extract code block
block_content, start_line, end_line = self._extract_code_block(
content, start_line, end_line
)
# Generate excerpt (first 200 chars)
excerpt = block_content[:200] + "..." if len(block_content) > 200 else block_content
results.append(
SearchResult(
path=row["full_path"],
path=file_path,
score=score,
excerpt=row["excerpt"],
excerpt=excerpt,
content=block_content if return_full_content else None,
start_line=start_line,
end_line=end_line,
symbol_name=symbol_name,
symbol_kind=symbol_kind,
)
)
return results
def search_fts_fuzzy(self, query: str, limit: int = 20) -> List[SearchResult]:
"""Full-text search using fuzzy/substring matching (trigram or extended unicode61 tokenizer).
def search_fts_fuzzy(
self,
query: str,
limit: int = 20,
return_full_content: bool = True,
context_lines: int = 10,
) -> List[SearchResult]:
"""Full-text search using fuzzy/substring matching with complete method blocks.
Returns complete code blocks (functions/methods/classes) containing the match,
rather than just a short snippet. If no symbol contains the match, returns
context lines around the match.
Args:
query: FTS5 query string
limit: Maximum results to return
return_full_content: If True, include full code block in content field
context_lines: Lines of context when no symbol contains the match
Returns:
List of SearchResult objects sorted by relevance
List of SearchResult objects with complete code blocks
Raises:
StorageError: If FTS search fails
@@ -1220,11 +1448,13 @@ class DirIndexStore:
with self._lock:
conn = self._get_connection()
try:
# Join with files table to get content and file_id
rows = conn.execute(
"""
SELECT rowid, full_path, bm25(files_fts_fuzzy) AS rank,
snippet(files_fts_fuzzy, 2, '[bold red]', '[/bold red]', '...', 20) AS excerpt
SELECT f.id AS file_id, f.full_path, f.content,
bm25(files_fts_fuzzy) AS rank
FROM files_fts_fuzzy
JOIN files f ON files_fts_fuzzy.rowid = f.id
WHERE files_fts_fuzzy MATCH ?
ORDER BY rank
LIMIT ?
@@ -1236,13 +1466,48 @@ class DirIndexStore:
results: List[SearchResult] = []
for row in rows:
file_id = row["file_id"]
file_path = row["full_path"]
content = row["content"] or ""
rank = float(row["rank"]) if row["rank"] is not None else 0.0
score = abs(rank) if rank < 0 else 0.0
# Find matching lines
match_lines = self._find_match_lines(content, query)
first_match_line = match_lines[0] if match_lines else 1
# Find symbol containing the first match
symbol_info = self._find_containing_symbol(conn, file_id, first_match_line)
if symbol_info:
start_line, end_line, symbol_name, symbol_kind = symbol_info
else:
# No symbol found, use context around match
lines = content.split('\n')
total_lines = len(lines)
start_line = max(1, first_match_line - context_lines)
end_line = min(total_lines, first_match_line + context_lines)
symbol_name = None
symbol_kind = None
# Extract code block
block_content, start_line, end_line = self._extract_code_block(
content, start_line, end_line
)
# Generate excerpt (first 200 chars)
excerpt = block_content[:200] + "..." if len(block_content) > 200 else block_content
results.append(
SearchResult(
path=row["full_path"],
path=file_path,
score=score,
excerpt=row["excerpt"],
excerpt=excerpt,
content=block_content if return_full_content else None,
start_line=start_line,
end_line=end_line,
symbol_name=symbol_name,
symbol_kind=symbol_kind,
)
)
return results