feat: Optimize FTS search with batch symbol fetching and improved excerpt generation

This commit is contained in:
catlog22
2025-12-19 15:35:31 +08:00
parent 2f0cce0089
commit 3428642d04

View File

@@ -1221,6 +1221,237 @@ class DirIndexStore:
return block_content, start_line, end_line
def _batch_fetch_symbols(
self, conn: sqlite3.Connection, file_ids: List[int]
) -> Dict[int, List[Tuple[int, int, str, str]]]:
"""Batch fetch all symbols for multiple files in a single query.
Args:
conn: Database connection
file_ids: List of file IDs to fetch symbols for
Returns:
Dictionary mapping file_id to list of (start_line, end_line, name, kind) tuples
"""
if not file_ids:
return {}
# Build placeholder string for IN clause
placeholders = ','.join('?' for _ in file_ids)
rows = conn.execute(
f"""
SELECT file_id, start_line, end_line, name, kind
FROM symbols
WHERE file_id IN ({placeholders})
ORDER BY file_id, (end_line - start_line) ASC
""",
file_ids,
).fetchall()
# Organize symbols by file_id
symbols_by_file: Dict[int, List[Tuple[int, int, str, str]]] = {fid: [] for fid in file_ids}
for row in rows:
symbols_by_file[row["file_id"]].append(
(row["start_line"], row["end_line"], row["name"], row["kind"])
)
return symbols_by_file
def _find_containing_symbol_from_cache(
self, symbols: List[Tuple[int, int, str, str]], line_num: int
) -> Optional[Tuple[int, int, str, str]]:
"""Find the smallest symbol containing the given line number from cached symbols.
Args:
symbols: List of (start_line, end_line, name, kind) tuples, sorted by size
line_num: 1-based line number
Returns:
Tuple of (start_line, end_line, symbol_name, symbol_kind) or None
"""
for start_line, end_line, name, kind in symbols:
if start_line <= line_num <= end_line:
return (start_line, end_line, name, kind)
return None
def _generate_centered_excerpt(
self, content: str, match_line: int, start_line: int, end_line: int, max_chars: int = 200
) -> str:
"""Generate excerpt centered around the match line.
Args:
content: Full file content
match_line: 1-based line where match occurred
start_line: 1-based start line of the code block
end_line: 1-based end line of the code block
max_chars: Maximum characters for excerpt
Returns:
Excerpt string centered around the match
"""
lines = content.split('\n')
total_lines = len(lines)
# Ensure match_line is within bounds
match_line = max(1, min(match_line, total_lines))
# Calculate context window (2 lines before, 2 lines after the match)
ctx_start = max(start_line, match_line - 2)
ctx_end = min(end_line, match_line + 2)
# Extract and join lines
excerpt_lines = lines[ctx_start - 1:ctx_end]
excerpt = '\n'.join(excerpt_lines)
# Truncate if too long
if len(excerpt) > max_chars:
excerpt = excerpt[:max_chars] + "..."
return excerpt
def _search_internal(
self,
query: str,
fts_table: str,
limit: int = 20,
return_full_content: bool = False,
context_lines: int = 10,
) -> List[SearchResult]:
"""Internal unified search implementation for all FTS modes.
Optimizations:
- Fast path: Direct FTS query with snippet() for location-only results
- Full content path: Batch fetch symbols to eliminate N+1 queries
- Centered excerpt generation for better context
Args:
query: FTS5 query string
fts_table: FTS table name ('files_fts_exact' or 'files_fts_fuzzy')
limit: Maximum results to return
return_full_content: If True, include full code block in content field
context_lines: Lines of context when no symbol contains the match
Returns:
List of SearchResult objects
"""
with self._lock:
conn = self._get_connection()
# Fast path: location-only results (no content processing)
if not return_full_content:
try:
rows = conn.execute(
f"""
SELECT rowid, full_path, bm25({fts_table}) AS rank,
snippet({fts_table}, 2, '', '', '...', 30) AS excerpt
FROM {fts_table}
WHERE {fts_table} MATCH ?
ORDER BY rank
LIMIT ?
""",
(query, limit),
).fetchall()
except sqlite3.DatabaseError as exc:
raise StorageError(f"FTS search failed: {exc}") from exc
results: List[SearchResult] = []
for row in rows:
rank = float(row["rank"]) if row["rank"] is not None else 0.0
score = abs(rank) if rank < 0 else 0.0
results.append(
SearchResult(
path=row["full_path"],
score=score,
excerpt=row["excerpt"],
)
)
return results
# Full content path with batch optimization
# Step 1: Get file_ids and ranks (lightweight query)
try:
id_rows = conn.execute(
f"""
SELECT rowid AS file_id, bm25({fts_table}) AS rank
FROM {fts_table}
WHERE {fts_table} MATCH ?
ORDER BY rank
LIMIT ?
""",
(query, limit),
).fetchall()
except sqlite3.DatabaseError as exc:
raise StorageError(f"FTS search failed: {exc}") from exc
if not id_rows:
return []
file_ids = [row["file_id"] for row in id_rows]
ranks_by_id = {row["file_id"]: row["rank"] for row in id_rows}
# Step 2: Batch fetch all symbols for matched files (eliminates N+1)
symbols_by_file = self._batch_fetch_symbols(conn, file_ids)
# Step 3: Process each file on-demand (reduces memory)
results: List[SearchResult] = []
for file_id in file_ids:
# Fetch file content on-demand
file_row = conn.execute(
"SELECT full_path, content FROM files WHERE id = ?",
(file_id,),
).fetchone()
if not file_row:
continue
file_path = file_row["full_path"]
content = file_row["content"] or ""
rank = ranks_by_id.get(file_id, 0.0)
score = abs(rank) if rank < 0 else 0.0
# Find matching lines
match_lines = self._find_match_lines(content, query)
first_match_line = match_lines[0] if match_lines else 1
# Find symbol from cached symbols (no extra SQL query)
file_symbols = symbols_by_file.get(file_id, [])
symbol_info = self._find_containing_symbol_from_cache(file_symbols, first_match_line)
if symbol_info:
start_line, end_line, symbol_name, symbol_kind = symbol_info
else:
# No symbol found, use context around match
lines = content.split('\n')
total_lines = len(lines)
start_line = max(1, first_match_line - context_lines)
end_line = min(total_lines, first_match_line + context_lines)
symbol_name = None
symbol_kind = None
# Extract code block
block_content, start_line, end_line = self._extract_code_block(
content, start_line, end_line
)
# Generate centered excerpt (improved quality)
excerpt = self._generate_centered_excerpt(
content, first_match_line, start_line, end_line
)
results.append(
SearchResult(
path=file_path,
score=score,
excerpt=excerpt,
content=block_content,
start_line=start_line,
end_line=end_line,
symbol_name=symbol_name,
symbol_kind=symbol_kind,
)
)
return results
def search_fts(
self,
query: str,
@@ -1255,107 +1486,14 @@ class DirIndexStore:
Raises:
StorageError: If FTS search fails
"""
# Only enhance query if explicitly requested (not default behavior)
# Best practice: Let users control wildcards manually
final_query = self._enhance_fts_query(query) if enhance_query else query
with self._lock:
conn = self._get_connection()
# Fast path: location-only results (no content processing)
if not return_full_content:
try:
rows = conn.execute(
"""
SELECT rowid, full_path, bm25(files_fts_exact) AS rank,
snippet(files_fts_exact, 2, '', '', '...', 30) AS excerpt
FROM files_fts_exact
WHERE files_fts_exact MATCH ?
ORDER BY rank
LIMIT ?
""",
(final_query, limit),
).fetchall()
except sqlite3.DatabaseError as exc:
raise StorageError(f"FTS search failed: {exc}") from exc
results: List[SearchResult] = []
for row in rows:
rank = float(row["rank"]) if row["rank"] is not None else 0.0
score = abs(rank) if rank < 0 else 0.0
results.append(
SearchResult(
path=row["full_path"],
score=score,
excerpt=row["excerpt"],
)
)
return results
# Full content path: fetch content and find containing symbols
try:
rows = conn.execute(
"""
SELECT f.id AS file_id, f.full_path, f.content,
bm25(files_fts_exact) AS rank
FROM files_fts_exact
JOIN files f ON files_fts_exact.rowid = f.id
WHERE files_fts_exact MATCH ?
ORDER BY rank
LIMIT ?
""",
(final_query, limit),
).fetchall()
except sqlite3.DatabaseError as exc:
raise StorageError(f"FTS search failed: {exc}") from exc
results: List[SearchResult] = []
for row in rows:
file_id = row["file_id"]
file_path = row["full_path"]
content = row["content"] or ""
rank = float(row["rank"]) if row["rank"] is not None else 0.0
score = abs(rank) if rank < 0 else 0.0
# Find matching lines
match_lines = self._find_match_lines(content, final_query)
first_match_line = match_lines[0] if match_lines else 1
# Find symbol containing the first match
symbol_info = self._find_containing_symbol(conn, file_id, first_match_line)
if symbol_info:
start_line, end_line, symbol_name, symbol_kind = symbol_info
else:
# No symbol found, use context around match
lines = content.split('\n')
total_lines = len(lines)
start_line = max(1, first_match_line - context_lines)
end_line = min(total_lines, first_match_line + context_lines)
symbol_name = None
symbol_kind = None
# Extract code block
block_content, start_line, end_line = self._extract_code_block(
content, start_line, end_line
)
# Generate excerpt (first 200 chars)
excerpt = block_content[:200] + "..." if len(block_content) > 200 else block_content
results.append(
SearchResult(
path=file_path,
score=score,
excerpt=excerpt,
content=block_content,
start_line=start_line,
end_line=end_line,
symbol_name=symbol_name,
symbol_kind=symbol_kind,
)
)
return results
return self._search_internal(
query=final_query,
fts_table='files_fts_exact',
limit=limit,
return_full_content=return_full_content,
context_lines=context_lines,
)
def search_fts_exact(
self,
@@ -1379,103 +1517,13 @@ class DirIndexStore:
Raises:
StorageError: If FTS search fails
"""
with self._lock:
conn = self._get_connection()
# Fast path: location-only results (no content processing)
if not return_full_content:
try:
rows = conn.execute(
"""
SELECT rowid, full_path, bm25(files_fts_exact) AS rank,
snippet(files_fts_exact, 2, '', '', '...', 30) AS excerpt
FROM files_fts_exact
WHERE files_fts_exact MATCH ?
ORDER BY rank
LIMIT ?
""",
(query, limit),
).fetchall()
except sqlite3.DatabaseError as exc:
raise StorageError(f"FTS exact search failed: {exc}") from exc
results: List[SearchResult] = []
for row in rows:
rank = float(row["rank"]) if row["rank"] is not None else 0.0
score = abs(rank) if rank < 0 else 0.0
results.append(
SearchResult(
path=row["full_path"],
score=score,
excerpt=row["excerpt"],
)
)
return results
# Full content path: fetch content and find containing symbols
try:
rows = conn.execute(
"""
SELECT f.id AS file_id, f.full_path, f.content,
bm25(files_fts_exact) AS rank
FROM files_fts_exact
JOIN files f ON files_fts_exact.rowid = f.id
WHERE files_fts_exact MATCH ?
ORDER BY rank
LIMIT ?
""",
(query, limit),
).fetchall()
except sqlite3.DatabaseError as exc:
raise StorageError(f"FTS exact search failed: {exc}") from exc
results: List[SearchResult] = []
for row in rows:
file_id = row["file_id"]
file_path = row["full_path"]
content = row["content"] or ""
rank = float(row["rank"]) if row["rank"] is not None else 0.0
score = abs(rank) if rank < 0 else 0.0
# Find matching lines
match_lines = self._find_match_lines(content, query)
first_match_line = match_lines[0] if match_lines else 1
# Find symbol containing the first match
symbol_info = self._find_containing_symbol(conn, file_id, first_match_line)
if symbol_info:
start_line, end_line, symbol_name, symbol_kind = symbol_info
else:
# No symbol found, use context around match
lines = content.split('\n')
total_lines = len(lines)
start_line = max(1, first_match_line - context_lines)
end_line = min(total_lines, first_match_line + context_lines)
symbol_name = None
symbol_kind = None
# Extract code block
block_content, start_line, end_line = self._extract_code_block(
content, start_line, end_line
)
# Generate excerpt (first 200 chars)
excerpt = block_content[:200] + "..." if len(block_content) > 200 else block_content
results.append(
SearchResult(
path=file_path,
score=score,
excerpt=excerpt,
content=block_content,
start_line=start_line,
end_line=end_line,
symbol_name=symbol_name,
symbol_kind=symbol_kind,
)
)
return results
return self._search_internal(
query=query,
fts_table='files_fts_exact',
limit=limit,
return_full_content=return_full_content,
context_lines=context_lines,
)
def search_fts_fuzzy(
self,
@@ -1499,103 +1547,13 @@ class DirIndexStore:
Raises:
StorageError: If FTS search fails
"""
with self._lock:
conn = self._get_connection()
# Fast path: location-only results (no content processing)
if not return_full_content:
try:
rows = conn.execute(
"""
SELECT rowid, full_path, bm25(files_fts_fuzzy) AS rank,
snippet(files_fts_fuzzy, 2, '', '', '...', 30) AS excerpt
FROM files_fts_fuzzy
WHERE files_fts_fuzzy MATCH ?
ORDER BY rank
LIMIT ?
""",
(query, limit),
).fetchall()
except sqlite3.DatabaseError as exc:
raise StorageError(f"FTS fuzzy search failed: {exc}") from exc
results: List[SearchResult] = []
for row in rows:
rank = float(row["rank"]) if row["rank"] is not None else 0.0
score = abs(rank) if rank < 0 else 0.0
results.append(
SearchResult(
path=row["full_path"],
score=score,
excerpt=row["excerpt"],
)
)
return results
# Full content path: fetch content and find containing symbols
try:
rows = conn.execute(
"""
SELECT f.id AS file_id, f.full_path, f.content,
bm25(files_fts_fuzzy) AS rank
FROM files_fts_fuzzy
JOIN files f ON files_fts_fuzzy.rowid = f.id
WHERE files_fts_fuzzy MATCH ?
ORDER BY rank
LIMIT ?
""",
(query, limit),
).fetchall()
except sqlite3.DatabaseError as exc:
raise StorageError(f"FTS fuzzy search failed: {exc}") from exc
results: List[SearchResult] = []
for row in rows:
file_id = row["file_id"]
file_path = row["full_path"]
content = row["content"] or ""
rank = float(row["rank"]) if row["rank"] is not None else 0.0
score = abs(rank) if rank < 0 else 0.0
# Find matching lines
match_lines = self._find_match_lines(content, query)
first_match_line = match_lines[0] if match_lines else 1
# Find symbol containing the first match
symbol_info = self._find_containing_symbol(conn, file_id, first_match_line)
if symbol_info:
start_line, end_line, symbol_name, symbol_kind = symbol_info
else:
# No symbol found, use context around match
lines = content.split('\n')
total_lines = len(lines)
start_line = max(1, first_match_line - context_lines)
end_line = min(total_lines, first_match_line + context_lines)
symbol_name = None
symbol_kind = None
# Extract code block
block_content, start_line, end_line = self._extract_code_block(
content, start_line, end_line
)
# Generate excerpt (first 200 chars)
excerpt = block_content[:200] + "..." if len(block_content) > 200 else block_content
results.append(
SearchResult(
path=file_path,
score=score,
excerpt=excerpt,
content=block_content,
start_line=start_line,
end_line=end_line,
symbol_name=symbol_name,
symbol_kind=symbol_kind,
)
)
return results
return self._search_internal(
query=query,
fts_table='files_fts_fuzzy',
limit=limit,
return_full_content=return_full_content,
context_lines=context_lines,
)
def search_files_only(self, query: str, limit: int = 20) -> List[str]:
"""Fast FTS search returning only file paths (no snippet generation).