From 18aff260a0c8300e199af26471659fee9c507314 Mon Sep 17 00:00:00 2001 From: catlog22 Date: Thu, 19 Mar 2026 17:47:53 +0800 Subject: [PATCH] feat: Enhance search functionality with quality tiers and scoped indexing - Updated `search_code` function to include a `quality` parameter for search quality tiers: "fast", "balanced", "thorough", and "auto". - Introduced `search_scope` function to limit search results to a specific directory scope. - Added `index_scope` function for indexing a specific directory without re-indexing the entire project. - Refactored `SearchPipeline` to support quality-based routing in the `search` method. - Implemented `Shard` and `ShardManager` classes to manage multiple index shards with LRU eviction and efficient file routing. - Added debounce functionality in `IncrementalIndexer` to batch file events and reduce redundant processing. - Enhanced `FileWatcher` to integrate with `IncrementalIndexer` for improved event handling. --- .claude/agents/team-supervisor.md | 30 +- .claude/agents/team-worker.md | 115 ++++--- .../team-arch-opt/roles/coordinator/role.md | 2 +- .../team-brainstorm/roles/coordinator/role.md | 2 +- .claude/skills/team-coordinate/SKILL.md | 2 +- .../team-coordinate/roles/coordinator/role.md | 4 +- .../phases/03-content-generation.md | 4 +- .../roles/coordinator/commands/monitor.md | 6 +- .../roles/coordinator/role.md | 2 +- .../team-frontend/roles/coordinator/role.md | 2 +- .../roles/coordinator/commands/dispatch.md | 2 +- .../team-iterdev/roles/coordinator/role.md | 4 +- .../roles/coordinator/commands/monitor.md | 2 +- .../roles/coordinator/commands/dispatch.md | 34 +- .../roles/coordinator/commands/monitor.md | 4 +- .../roles/coordinator/commands/dispatch.md | 2 +- .../team-planex/roles/coordinator/role.md | 2 +- .../roles/coordinator/commands/dispatch.md | 44 +-- .../roles/coordinator/commands/monitor.md | 4 +- .../roles/coordinator/role.md | 4 +- .../roles/coordinator/commands/dispatch.md | 2 +- 
.../roles/coordinator/commands/pause.md | 2 +- .../roles/coordinator/commands/resume.md | 2 +- .../roles/coordinator/role.md | 2 +- .../roles/coordinator/commands/monitor.md | 3 +- .../team-ultra-analyze/role-specs/analyst.md | 89 ------ .../role-specs/discussant.md | 106 ------- .../team-ultra-analyze/role-specs/explorer.md | 73 ----- .../role-specs/synthesizer.md | 77 ----- .../roles/coordinator/commands/monitor.md | 51 ++- .../roles/coordinator/role.md | 36 ++- .../team-ux-improve/role-specs/tester.md | 5 +- .../roles/coordinator/commands/dispatch.md | 35 +-- codex-lens-v2/src/codexlens_search/bridge.py | 99 ++++-- codex-lens-v2/src/codexlens_search/config.py | 13 +- .../src/codexlens_search/core/binary.py | 7 + .../src/codexlens_search/core/factory.py | 33 +- .../src/codexlens_search/core/faiss_index.py | 34 +- .../src/codexlens_search/core/shard.py | 178 +++++++++++ .../codexlens_search/core/shard_manager.py | 250 +++++++++++++++ .../src/codexlens_search/indexing/metadata.py | 147 ++++++++- .../src/codexlens_search/indexing/pipeline.py | 186 +++++++++-- .../src/codexlens_search/mcp_server.py | 126 +++++++- .../src/codexlens_search/search/pipeline.py | 290 ++++++++++++++---- .../codexlens_search/watcher/file_watcher.py | 22 ++ .../watcher/incremental_indexer.py | 56 ++++ 46 files changed, 1537 insertions(+), 658 deletions(-) delete mode 100644 .claude/skills/team-ultra-analyze/role-specs/analyst.md delete mode 100644 .claude/skills/team-ultra-analyze/role-specs/discussant.md delete mode 100644 .claude/skills/team-ultra-analyze/role-specs/explorer.md delete mode 100644 .claude/skills/team-ultra-analyze/role-specs/synthesizer.md create mode 100644 codex-lens-v2/src/codexlens_search/core/shard.py create mode 100644 codex-lens-v2/src/codexlens_search/core/shard_manager.py diff --git a/.claude/agents/team-supervisor.md b/.claude/agents/team-supervisor.md index aeec1fbc..1f7a53bc 100644 --- a/.claude/agents/team-supervisor.md +++ b/.claude/agents/team-supervisor.md 
@@ -40,7 +40,7 @@ Parse the following fields from your prompt: | `role_spec` | Yes | Path to supervisor role.md | | `session` | Yes | Session folder path | | `session_id` | Yes | Session ID for message bus operations | -| `team_name` | Yes | Team name for SendMessage | +| `team_name` | Yes | Team name (used by Agent spawn for message routing; NOT used directly in SendMessage calls) | | `requirement` | Yes | Original task/requirement description | | `recovery` | No | `true` if respawned after crash — triggers recovery protocol | @@ -94,14 +94,13 @@ team_msg(operation="get_state", session_id=) // all roles ``` - Record which roles have completed, their key_findings, decisions - Read `/wisdom/*.md` — absorb accumulated team knowledge -- Read `/team-session.json` — understand pipeline mode, stages +- Read `/session.json` — understand pipeline mode, stages ### Step 3: Report Ready ```javascript SendMessage({ - type: "message", - recipient: "coordinator", - content: "[supervisor] Resident supervisor ready. Baseline loaded for session . Awaiting checkpoint assignments.", + to: "coordinator", + message: "[supervisor] Resident supervisor ready. Baseline loaded for session . Awaiting checkpoint assignments.", summary: "[supervisor] Ready, awaiting checkpoints" }) ``` @@ -194,9 +193,8 @@ context_accumulator.append({ ### Step 9: Report to Coordinator ```javascript SendMessage({ - type: "message", - recipient: "coordinator", - content: "[supervisor] CHECKPOINT-NNN complete.\nVerdict: (score: )\nFindings: \nRisks: logged\nQuality trend: \nArtifact: ", + to: "coordinator", + message: "[supervisor] CHECKPOINT-NNN complete.\nVerdict: (score: )\nFindings: \nRisks: logged\nQuality trend: \nArtifact: ", summary: "[supervisor] CHECKPOINT-NNN: " }) ``` @@ -220,17 +218,23 @@ If spawned with `recovery: true` in prompt: ## Shutdown Protocol -When receiving a `shutdown_request` message: +When a new conversation turn delivers a message containing `type: "shutdown_request"`: + +1. 
Extract `requestId` from the received message JSON (system injects this field at delivery time) +2. Respond via SendMessage: ```javascript SendMessage({ - type: "shutdown_response", - request_id: "", - approve: true + to: "coordinator", + message: { + type: "shutdown_response", + request_id: "", + approve: true + } }) ``` -Agent terminates. +Agent terminates after sending response. --- diff --git a/.claude/agents/team-worker.md b/.claude/agents/team-worker.md index 840bce26..3213acf5 100644 --- a/.claude/agents/team-worker.md +++ b/.claude/agents/team-worker.md @@ -2,8 +2,8 @@ name: team-worker description: | Unified worker agent for team-lifecycle. Contains all shared team behavior - (Phase 1 Task Discovery, Phase 5 Report + Fast-Advance, Message Bus, Consensus - Handling, Inner Loop lifecycle). Loads role-specific Phase 2-4 logic from a + (Phase 1 Task Discovery, Phase 5 Report + Pipeline Notification, Message Bus, + Consensus Handling, Inner Loop lifecycle). Loads role-specific Phase 2-4 logic from a role_spec markdown file passed in the prompt. Examples: @@ -21,7 +21,7 @@ color: green You are a **team-lifecycle worker agent**. You execute a specific role within a team pipeline. Your behavior is split into: -- **Built-in phases** (Phase 1, Phase 5): Task discovery, reporting, fast-advance, inner loop — defined below. +- **Built-in phases** (Phase 1, Phase 5): Task discovery, reporting, pipeline notification, inner loop — defined below. - **Role-specific phases** (Phase 2-4): Loaded from a role_spec markdown file. --- @@ -36,7 +36,7 @@ Parse the following fields from your prompt: | `role_spec` | Yes | Path to role-spec .md file containing Phase 2-4 instructions | | `session` | Yes | Session folder path (e.g., `.workflow/.team/TLS-xxx-2026-02-27`) | | `session_id` | Yes | Session ID (folder name, e.g., `TLS-xxx-2026-02-27`). 
Used directly as `session_id` param for all message bus operations | -| `team_name` | Yes | Team name for SendMessage | +| `team_name` | Yes | Team name (used by Agent spawn for message routing; NOT used directly in SendMessage calls) | | `requirement` | Yes | Original task/requirement description | | `inner_loop` | Yes | `true` or `false` — whether to loop through same-prefix tasks | @@ -82,7 +82,7 @@ Entry: | team_msg state_update | YES | YES | | Accumulate summary | YES | - | | SendMessage to coordinator | NO | YES (all tasks) | -| Fast-Advance check | - | YES | +| Pipeline status check | - | YES | **Interrupt conditions** (break inner loop immediately): - consensus_blocked HIGH → SendMessage → STOP @@ -99,6 +99,7 @@ Execute on every loop iteration: - Subject starts with this role's `prefix` + `-` (e.g., `DRAFT-`, `IMPL-`) - Status is `pending` - `blockedBy` list is empty (all dependencies resolved) + - **Owner matches** `agent_name` from prompt (e.g., task owner "explorer-1" matches agent_name "explorer-1"). This prevents parallel workers from claiming each other's tasks. - If role has `additional_prefixes` (e.g., reviewer handles REVIEW-* + QUALITY-* + IMPROVE-*), check all prefixes 3. **No matching tasks?** - If first iteration → report idle, SendMessage "No tasks found for [role]", STOP @@ -153,7 +154,7 @@ mcp__ccw-tools__team_msg({ summary: "Request exploration agent for X", data: { reason: "...", scope: "..." } }) -SendMessage({ recipient: "coordinator", content: "..." }) +SendMessage({ to: "coordinator", message: "...", summary: "Request agent delegation" }) ``` ### Consensus Handling @@ -180,7 +181,7 @@ Discussion: /discussions/-discussion.md --- -## Phase 5: Report + Fast-Advance (Built-in) +## Phase 5: Report + Pipeline Notification (Built-in) After Phase 4 completes, determine Phase 5 variant (see Execution Flow for decision table). @@ -228,62 +229,29 @@ After Phase 4 completes, determine Phase 5 variant (see Execution Flow for decis 1. 
**TaskUpdate**: Mark current task `completed` 2. **Message Bus**: Log state_update (same call as Phase 5-L step 2) -3. **Compile final report** and **SendMessage** to coordinator: +3. **Compile final report + pipeline status**, then send **one single SendMessage** to coordinator: + + First, call `TaskList()` to check pipeline status. Then compose and send: + ```javascript SendMessage({ - type: "message", - recipient: "coordinator", - content: "[] ", + to: "coordinator", + message: "[] Final report:\n\n\nPipeline status: ", summary: "[] Final report delivered" }) ``` - Report contents: tasks completed (count + list), artifacts produced (paths), files modified (with evidence), discuss results (verdicts + ratings), key decisions (from context_accumulator), verification summary, warnings/issues. -4. **Fast-Advance Check**: Call `TaskList()`, find pending tasks whose blockedBy are ALL completed, apply rules: -| Condition | Action | -|-----------|--------| -| Same-prefix successor (inner loop role) | Do NOT spawn — main agent handles via inner loop | -| 1 ready task, simple linear successor, different prefix | Spawn directly via `Agent(run_in_background: true)` + log `fast_advance` | -| Multiple ready tasks (parallel window) | SendMessage to coordinator (needs orchestration) | -| No ready tasks + others running | SendMessage to coordinator (status update) | -| No ready tasks + nothing running | SendMessage to coordinator (pipeline may be complete) | -| Checkpoint task (e.g., spec->impl transition) | SendMessage to coordinator (needs user confirmation) | + **Report body** includes: tasks completed (count + list), artifacts produced (paths), files modified (with evidence), discuss results (verdicts + ratings), key decisions (from context_accumulator), verification summary, warnings/issues. 
-### Fast-Advance Spawn + **Status line** (append to same message based on TaskList scan): -When fast-advancing to a different-prefix successor: + | Condition | Status line | + |-----------|-------------| + | 1+ ready tasks (unblocked) | `"Tasks unblocked: . Ready for next stage."` | + | No ready tasks + others running | `"All my tasks done. Other tasks still running."` | + | No ready tasks + nothing running | `"All my tasks done. Pipeline may be complete."` | -``` -Agent({ - subagent_type: "team-worker", - description: "Spawn worker", - team_name: , - name: "", - run_in_background: true, - prompt: `## Role Assignment -role: -role_spec: /role-specs/.md -session: -session_id: -team_name: -requirement: -inner_loop: ` -}) -``` - -After spawning, MUST log to message bus (passive log, NOT a SendMessage): - -``` -mcp__ccw-tools__team_msg( - operation="log", - session_id=, - from=, - type="fast_advance", - summary="[] fast-advanced → spawned for " -) -``` - -Coordinator reads this on next callback to reconcile `active_workers`. + **IMPORTANT**: Send exactly ONE SendMessage per Phase 5-F. Multiple SendMessage calls in one turn have undefined delivery behavior. Do NOT spawn agents — coordinator handles all spawning. --- @@ -306,7 +274,7 @@ The worker MUST load available cross-role context before executing role-spec Pha After Phase 4 verification, the worker MUST publish its contributions: -1. **Artifact**: Write deliverable to `/artifacts/--.md` +1. **Artifact**: Write deliverable to the path specified by role_spec Phase 4. If role_spec does not specify a path, use default: `/artifacts/--.md` 2. **State data**: Prepare payload for Phase 5 `state_update` message (see Phase 5-L step 2 for schema) 3. **Wisdom**: Append new patterns to `learnings.md`, decisions to `decisions.md`, issues to `issues.md` 4. **Context accumulator** (inner_loop only): Append summary (see Phase 5-L step 3 for schema). Maintain full accumulator for context continuity across iterations. 
@@ -324,9 +292,18 @@ Load in Phase 2 to inform execution. Contribute in Phase 4/5 with discoveries. --- -## Message Bus Protocol +## Communication Protocols -Always use `mcp__ccw-tools__team_msg` for team communication. +### Addressing Convention + +- **SendMessage**: For triggering coordinator turns (auto-delivered). Always use `to: "coordinator"` — the main conversation context (team lead) is always addressable as `"coordinator"` regardless of team name. +- **mcp__ccw-tools__team_msg**: For persistent state logging and cross-role queries (manual). Uses `session_id`, not team_name. + +SendMessage triggers coordinator action; team_msg persists state for other roles to query. Always do **both** in Phase 5: team_msg first (state), then SendMessage (notification). + +### Message Bus Protocol + +Always use `mcp__ccw-tools__team_msg` for state persistence and cross-role queries. ### log (with state_update) — Primary for Phase 5 @@ -380,11 +357,33 @@ ccw team log --session-id --from --type --json | Process own prefix tasks | Process other role's prefix tasks | | SendMessage to coordinator | Directly communicate with other workers | | Use CLI tools for analysis/exploration | Create tasks for other roles | -| Fast-advance simple successors | Spawn parallel worker batches | +| Notify coordinator of unblocked tasks | Spawn agents (workers cannot call Agent) | | Write to own artifacts + wisdom | Modify resources outside own scope | --- +## Shutdown Handling + +When a new conversation turn delivers a message containing `type: "shutdown_request"`: + +1. Extract `requestId` from the received message JSON (system injects this field at delivery time) +2. Respond via SendMessage: + +```javascript +SendMessage({ + to: "coordinator", + message: { + type: "shutdown_response", + request_id: "", + approve: true + } +}) +``` + +Agent terminates after sending response. 
Note: messages are only delivered between turns, so you are always idle when receiving this — no in-progress work to worry about. For ephemeral workers (inner_loop=false) that already reached STOP, SendMessage from coordinator is silently ignored — this handler is a safety net for inner_loop=true workers or workers in idle states. + +--- + ## Error Handling | Scenario | Resolution | diff --git a/.claude/skills/team-arch-opt/roles/coordinator/role.md b/.claude/skills/team-arch-opt/roles/coordinator/role.md index 8cb70aac..f8c3d81d 100644 --- a/.claude/skills/team-arch-opt/roles/coordinator/role.md +++ b/.claude/skills/team-arch-opt/roles/coordinator/role.md @@ -103,7 +103,7 @@ TEXT-LEVEL ONLY. No source code reading. Delegate to @commands/dispatch.md: 1. Read dependency graph and parallel mode from session.json 2. Topological sort tasks -3. Create tasks via TaskCreate with blockedBy +3. Create tasks via TaskCreate, then set dependencies via TaskUpdate({ addBlockedBy }) 4. Update session.json with task count ## Phase 4: Spawn-and-Stop diff --git a/.claude/skills/team-brainstorm/roles/coordinator/role.md b/.claude/skills/team-brainstorm/roles/coordinator/role.md index 6f1fec98..db14de0a 100644 --- a/.claude/skills/team-brainstorm/roles/coordinator/role.md +++ b/.claude/skills/team-brainstorm/roles/coordinator/role.md @@ -99,7 +99,7 @@ TEXT-LEVEL ONLY. No source code reading. Delegate to @commands/dispatch.md: 1. Read pipeline mode and angles from session.json -2. Create tasks for selected pipeline with correct blockedBy +2. Create tasks for selected pipeline, then set dependencies via TaskUpdate({ addBlockedBy }) 3. 
Update session.json with task count ## Phase 4: Spawn-and-Stop diff --git a/.claude/skills/team-coordinate/SKILL.md b/.claude/skills/team-coordinate/SKILL.md index b03e5085..85396764 100644 --- a/.claude/skills/team-coordinate/SKILL.md +++ b/.claude/skills/team-coordinate/SKILL.md @@ -241,7 +241,7 @@ Coordinator supports `resume` / `continue` for interrupted sessions: 3. Audit TaskList -> reconcile session state <-> task status 4. Reset in_progress -> pending (interrupted tasks) 5. Rebuild team and spawn needed workers only -6. Create missing tasks with correct blockedBy +6. Create missing tasks, set dependencies via TaskUpdate({ addBlockedBy }) 7. Kick first executable task -> Phase 4 coordination loop --- diff --git a/.claude/skills/team-coordinate/roles/coordinator/role.md b/.claude/skills/team-coordinate/roles/coordinator/role.md index 8b8082a1..a759f82c 100644 --- a/.claude/skills/team-coordinate/roles/coordinator/role.md +++ b/.claude/skills/team-coordinate/roles/coordinator/role.md @@ -144,7 +144,7 @@ For callback/check/resume/adapt/complete: load `@commands/monitor.md` and execut 4. Detect fast-advance orphans (in_progress without recent activity) -> reset to pending 5. Determine remaining pipeline from reconciled state 6. Rebuild team if disbanded (TeamCreate + spawn needed workers only) -7. Create missing tasks with correct blockedBy dependencies +7. Create missing tasks, set dependencies via TaskUpdate({ addBlockedBy }) 8. Verify dependency chain integrity 9. Update session file with reconciled state 10. Kick first executable task's worker -> Phase 4 @@ -278,7 +278,7 @@ mcp__ccw-tools__team_msg({ Delegate to `@commands/dispatch.md` which creates the full task chain: 1. Reads dependency_graph from task-analysis.json 2. Topological sorts tasks -3. Creates tasks via TaskCreate with correct blockedBy +3. Creates tasks via TaskCreate, then sets dependencies via TaskUpdate({ addBlockedBy }) 4. Assigns owner based on role mapping from task-analysis.json 5. 
Includes `Session: ` in every task description 6. Sets InnerLoop flag for multi-task roles diff --git a/.claude/skills/team-designer/phases/03-content-generation.md b/.claude/skills/team-designer/phases/03-content-generation.md index 11a811a4..9558fb24 100644 --- a/.claude/skills/team-designer/phases/03-content-generation.md +++ b/.claude/skills/team-designer/phases/03-content-generation.md @@ -77,7 +77,7 @@ If `.workflow/.team/${teamConfig.sessionPrefix}-*/team-session.json` exists: ## Phase 3: Dispatch - Execute `commands/dispatch.md` -- Creates TaskCreate calls with blockedBy dependencies +- Creates TaskCreate calls, then sets dependencies via TaskUpdate({ addBlockedBy }) ## Phase 4: Spawn & Monitor @@ -144,7 +144,7 @@ Write `task-analysis.json` to session directory: Template — includes: - Topological sort from dependency graph -- TaskCreate with blockedBy +- TaskCreate + TaskUpdate({ addBlockedBy }) for dependencies - Task description template (PURPOSE/TASK/CONTEXT/EXPECTED/CONSTRAINTS) ### coordinator/commands/monitor.md diff --git a/.claude/skills/team-frontend-debug/roles/coordinator/commands/monitor.md b/.claude/skills/team-frontend-debug/roles/coordinator/commands/monitor.md index 7ee95961..7381a5c5 100644 --- a/.claude/skills/team-frontend-debug/roles/coordinator/commands/monitor.md +++ b/.claude/skills/team-frontend-debug/roles/coordinator/commands/monitor.md @@ -44,10 +44,10 @@ Analyzer needs more evidence. Create supplemental reproduction task. 1. Parse Analyzer's evidence request (dimensions, specific actions) 2. Create REPRODUCE-002 task: - TaskCreate with description from Analyzer's request - - blockedBy: [] (can start immediately) + - TaskUpdate to set owner (no blockedBy — can start immediately) 3. Create ANALYZE-002 task: - - blockedBy: [REPRODUCE-002] - - Update FIX-001 blockedBy to include ANALYZE-002 + - TaskCreate + TaskUpdate with addBlockedBy: [REPRODUCE-002] + - TaskUpdate FIX-001 with addBlockedBy to include ANALYZE-002 4. 
Update team-session.json with new tasks 5. -> handleSpawnNext diff --git a/.claude/skills/team-frontend-debug/roles/coordinator/role.md b/.claude/skills/team-frontend-debug/roles/coordinator/role.md index 590dfaf9..c00f3c3a 100644 --- a/.claude/skills/team-frontend-debug/roles/coordinator/role.md +++ b/.claude/skills/team-frontend-debug/roles/coordinator/role.md @@ -98,7 +98,7 @@ Delegate to @commands/dispatch.md: 1. Read dependency graph from task-analysis.json 2. Read specs/pipelines.md for debug-pipeline task registry 3. Topological sort tasks -4. Create tasks via TaskCreate with blockedBy +4. Create tasks via TaskCreate, then set blockedBy via TaskUpdate 5. Update team-session.json ## Phase 4: Spawn-and-Stop diff --git a/.claude/skills/team-frontend/roles/coordinator/role.md b/.claude/skills/team-frontend/roles/coordinator/role.md index 87d12386..794ec6c1 100644 --- a/.claude/skills/team-frontend/roles/coordinator/role.md +++ b/.claude/skills/team-frontend/roles/coordinator/role.md @@ -115,7 +115,7 @@ mcp__ccw-tools__team_msg({ Delegate to @commands/dispatch.md: 1. Read specs/pipelines.md for selected pipeline task registry -2. Create tasks via TaskCreate with blockedBy +2. Create tasks via TaskCreate, then set blockedBy via TaskUpdate 3. 
Update session.json ## Phase 4: Spawn-and-Stop diff --git a/.claude/skills/team-iterdev/roles/coordinator/commands/dispatch.md b/.claude/skills/team-iterdev/roles/coordinator/commands/dispatch.md index be4215fa..edc897ad 100644 --- a/.claude/skills/team-iterdev/roles/coordinator/commands/dispatch.md +++ b/.claude/skills/team-iterdev/roles/coordinator/commands/dispatch.md @@ -227,7 +227,7 @@ Verify task chain integrity: | Check | Method | Expected | |-------|--------|----------| | Task count correct | TaskList count | patch: 2, sprint: 4, multi: 5+ | -| Dependencies correct | Trace blockedBy graph | Acyclic, correct ordering | +| Dependencies correct | Trace addBlockedBy graph | Acyclic, correct ordering | | No circular dependencies | Trace full graph | Acyclic | | Structured descriptions | Each has PURPOSE/TASK/CONTEXT/EXPECTED | All present | diff --git a/.claude/skills/team-iterdev/roles/coordinator/role.md b/.claude/skills/team-iterdev/roles/coordinator/role.md index bb0ca040..38e5accc 100644 --- a/.claude/skills/team-iterdev/roles/coordinator/role.md +++ b/.claude/skills/team-iterdev/roles/coordinator/role.md @@ -111,13 +111,13 @@ mcp__ccw-tools__team_msg({ Delegate to @commands/dispatch.md: 1. Read specs/pipelines.md for selected pipeline task registry -2. Create tasks via TaskCreate with blockedBy +2. Create tasks via TaskCreate, then TaskUpdate with addBlockedBy 3. Update task-ledger.json ## Phase 4: Spawn-and-Stop Delegate to @commands/monitor.md#handleSpawnNext: -1. Find ready tasks (pending + blockedBy resolved) +1. Find ready tasks (pending + all addBlockedBy dependencies resolved) 2. Spawn team-worker agents (see SKILL.md Spawn Template) 3. Output status summary 4. 
STOP diff --git a/.claude/skills/team-lifecycle-v4/roles/coordinator/commands/monitor.md b/.claude/skills/team-lifecycle-v4/roles/coordinator/commands/monitor.md index 9d01293c..88a1999c 100644 --- a/.claude/skills/team-lifecycle-v4/roles/coordinator/commands/monitor.md +++ b/.claude/skills/team-lifecycle-v4/roles/coordinator/commands/monitor.md @@ -105,7 +105,7 @@ Pipeline done. Generate report and completion action. 1. Shutdown resident supervisor (if active): ``` - SendMessage({ type: "shutdown_request", recipient: "supervisor", content: "Pipeline complete" }) + SendMessage({ to: "supervisor", message: { type: "shutdown_request", reason: "Pipeline complete" } }) ``` 2. Generate summary (deliverables, stats, discussions) 3. Read session.completion_action: diff --git a/.claude/skills/team-perf-opt/roles/coordinator/commands/dispatch.md b/.claude/skills/team-perf-opt/roles/coordinator/commands/dispatch.md index 68598f9d..89b3ffc1 100644 --- a/.claude/skills/team-perf-opt/roles/coordinator/commands/dispatch.md +++ b/.claude/skills/team-perf-opt/roles/coordinator/commands/dispatch.md @@ -27,7 +27,6 @@ Every task description uses structured format for clarity: ``` TaskCreate({ subject: "", - owner: "", description: "PURPOSE: | Success: TASK: - @@ -44,9 +43,9 @@ CONSTRAINTS: --- InnerLoop: BranchId: ", - blockedBy: [], status: "pending" }) +TaskUpdate({ taskId: "", addBlockedBy: [], owner: "" }) ``` ### Mode Router @@ -106,9 +105,9 @@ EXPECTED: /artifacts/optimization-plan.md | Priority-ordered with impro CONSTRAINTS: Focus on highest-impact optimizations | Risk assessment required | Non-overlapping file targets per OPT-ID --- InnerLoop: false", - blockedBy: ["PROFILE-001"], status: "pending" }) +TaskUpdate({ taskId: "STRATEGY-001", addBlockedBy: ["PROFILE-001"] }) ``` **IMPL-001** (optimizer, Stage 3): @@ -130,9 +129,9 @@ EXPECTED: Modified source files + validation passing | Optimizations applied wit CONSTRAINTS: Preserve existing behavior | Minimal changes per 
optimization | Follow code conventions --- InnerLoop: true", - blockedBy: ["STRATEGY-001"], status: "pending" }) +TaskUpdate({ taskId: "IMPL-001", addBlockedBy: ["STRATEGY-001"] }) ``` **BENCH-001** (benchmarker, Stage 4 - parallel): @@ -154,9 +153,9 @@ EXPECTED: /artifacts/benchmark-results.json | Per-metric comparison wit CONSTRAINTS: Must compare against baseline | Flag any regressions --- InnerLoop: false", - blockedBy: ["IMPL-001"], status: "pending" }) +TaskUpdate({ taskId: "BENCH-001", addBlockedBy: ["IMPL-001"] }) ``` **REVIEW-001** (reviewer, Stage 4 - parallel): @@ -178,9 +177,9 @@ EXPECTED: /artifacts/review-report.md | Per-dimension findings with sev CONSTRAINTS: Focus on optimization changes only | Provide specific file:line references --- InnerLoop: false", - blockedBy: ["IMPL-001"], status: "pending" }) +TaskUpdate({ taskId: "REVIEW-001", addBlockedBy: ["IMPL-001"] }) ``` --- @@ -207,11 +206,16 @@ For each target index `i` (0-based), with prefix char `P = pipeline_prefix_chars // Create session subdirectory for this pipeline Bash("mkdir -p /artifacts/pipelines/

") -TaskCreate({ subject: "PROFILE-

01", ... }) // blockedBy: [] -TaskCreate({ subject: "STRATEGY-

01", ... }) // blockedBy: ["PROFILE-

01"] -TaskCreate({ subject: "IMPL-

01", ... }) // blockedBy: ["STRATEGY-

01"] -TaskCreate({ subject: "BENCH-

01", ... }) // blockedBy: ["IMPL-

01"] -TaskCreate({ subject: "REVIEW-

01", ... }) // blockedBy: ["IMPL-

01"] +TaskCreate({ subject: "PROFILE-

01", ... }) +TaskCreate({ subject: "STRATEGY-

01", ... }) +TaskCreate({ subject: "IMPL-

01", ... }) +TaskCreate({ subject: "BENCH-

01", ... }) +TaskCreate({ subject: "REVIEW-

01", ... }) +// Then set dependencies via TaskUpdate: +TaskUpdate({ taskId: "STRATEGY-

01", addBlockedBy: ["PROFILE-

01"] }) +TaskUpdate({ taskId: "IMPL-

01", addBlockedBy: ["STRATEGY-

01"] }) +TaskUpdate({ taskId: "BENCH-

01", addBlockedBy: ["IMPL-

01"] }) +TaskUpdate({ taskId: "REVIEW-

01", addBlockedBy: ["IMPL-

01"] }) ``` Task descriptions follow same template as single mode, with additions: @@ -295,9 +299,9 @@ CONSTRAINTS: Only implement this branch's optimization | Do not touch files outs --- InnerLoop: false BranchId: B{NN}", - blockedBy: ["STRATEGY-001"], status: "pending" }) +TaskUpdate({ taskId: "IMPL-B{NN}", addBlockedBy: ["STRATEGY-001"] }) TaskCreate({ subject: "BENCH-B{NN}", @@ -316,9 +320,9 @@ CONSTRAINTS: Only benchmark this branch's metrics --- InnerLoop: false BranchId: B{NN}", - blockedBy: ["IMPL-B{NN}"], status: "pending" }) +TaskUpdate({ taskId: "BENCH-B{NN}", addBlockedBy: ["IMPL-B{NN}"] }) TaskCreate({ subject: "REVIEW-B{NN}", @@ -337,9 +341,9 @@ CONSTRAINTS: Only review this branch's changes --- InnerLoop: false BranchId: B{NN}", - blockedBy: ["IMPL-B{NN}"], status: "pending" }) +TaskUpdate({ taskId: "REVIEW-B{NN}", addBlockedBy: ["IMPL-B{NN}"] }) ``` 7. Update session.json: @@ -355,7 +359,7 @@ Verify task chain integrity: | Check | Method | Expected | |-------|--------|----------| | Task count correct | TaskList count | single: 5, auto/fan-out: 2 (pre-CP-2.5), independent: 5*M | -| Dependencies correct | Trace dependency graph | Acyclic, correct blockedBy | +| Dependencies correct | Trace dependency graph | Acyclic, correct addBlockedBy | | No circular dependencies | Trace dependency graph | Acyclic | | Task IDs use correct prefixes | Pattern check | Match naming rules per mode | | Structured descriptions complete | Each has PURPOSE/TASK/CONTEXT/EXPECTED/CONSTRAINTS | All present | diff --git a/.claude/skills/team-perf-opt/roles/coordinator/commands/monitor.md b/.claude/skills/team-perf-opt/roles/coordinator/commands/monitor.md index c435fc27..a4f198fe 100644 --- a/.claude/skills/team-perf-opt/roles/coordinator/commands/monitor.md +++ b/.claude/skills/team-perf-opt/roles/coordinator/commands/monitor.md @@ -172,7 +172,6 @@ CONSTRAINTS: Targeted fixes only | Do not touch other branches --- InnerLoop: false BranchId: B{NN}", - blockedBy: [], status: 
"pending" }) ``` @@ -186,7 +185,6 @@ Create new BENCH and REVIEW with retry suffix: TaskCreate({ subject: "FIX-{P}01-{cycle}", ...same pattern with pipeline prefix... - blockedBy: [], status: "pending" }) ``` @@ -310,7 +308,7 @@ Triggered by user "revise [feedback]" command. 1. Parse target task ID and optional feedback 2. Detect branch/pipeline from task ID pattern 3. Create revision task with same role but updated requirements, scoped to branch -4. Set blockedBy to empty (immediate execution) +4. Skip addBlockedBy (no dependencies, immediate execution) 5. Cascade: create new downstream tasks within same branch only 6. Proceed to handleSpawnNext diff --git a/.claude/skills/team-planex/roles/coordinator/commands/dispatch.md b/.claude/skills/team-planex/roles/coordinator/commands/dispatch.md index d6125c6f..5202d0a2 100644 --- a/.claude/skills/team-planex/roles/coordinator/commands/dispatch.md +++ b/.claude/skills/team-planex/roles/coordinator/commands/dispatch.md @@ -32,7 +32,7 @@ Execution method: ## Instructions 1. Parse input to get issue list 2. For each issue: call issue-plan-agent → write solution artifact -3. After each solution: create EXEC-* task (owner: executor) with solution_file path +3. After each solution: create EXEC-* task with solution_file path, then TaskUpdate to set owner: executor 4. After all issues: send all_planned signal InnerLoop: true`, diff --git a/.claude/skills/team-planex/roles/coordinator/role.md b/.claude/skills/team-planex/roles/coordinator/role.md index e66f42ad..5130bcf6 100644 --- a/.claude/skills/team-planex/roles/coordinator/role.md +++ b/.claude/skills/team-planex/roles/coordinator/role.md @@ -46,7 +46,7 @@ For callback/check/resume: load `@commands/monitor.md` and execute the appropria 1. Parse new input (Issue IDs / `--text` / `--plan`) 2. Get current max PLAN-* sequence from `TaskList` -3. `TaskCreate` new PLAN-00N task (owner: planner) +3. `TaskCreate` new PLAN-00N task, then `TaskUpdate` to set owner: planner 4. 
If planner already sent `all_planned` (check team_msg) -> `SendMessage` to planner to re-enter loop 5. STOP diff --git a/.claude/skills/team-quality-assurance/roles/coordinator/commands/dispatch.md b/.claude/skills/team-quality-assurance/roles/coordinator/commands/dispatch.md index e5b9765b..2b2aeb74 100644 --- a/.claude/skills/team-quality-assurance/roles/coordinator/commands/dispatch.md +++ b/.claude/skills/team-quality-assurance/roles/coordinator/commands/dispatch.md @@ -1,16 +1,16 @@ # Dispatch Tasks -Create task chains from dependency graph with proper blockedBy relationships. +Create task chains from dependency graph with proper addBlockedBy relationships. ## Workflow 1. Read task-analysis.json -> extract pipeline_mode and dependency_graph 2. Read specs/pipelines.md -> get task registry for selected pipeline -3. Topological sort tasks (respect blockedBy) +3. Topological sort tasks (respect addBlockedBy) 4. Validate all owners exist in role registry (SKILL.md) 5. For each task (in order): - TaskCreate with structured description (see template below) - - TaskUpdate with blockedBy + owner assignment + - TaskUpdate with addBlockedBy + owner assignment 6. Update session.json with pipeline.tasks_total 7. 
Validate chain (no orphans, no cycles, all refs valid) @@ -38,51 +38,51 @@ RoleSpec: ~ or /.claude/skills/team-quality-assurance/roles//rol ### Discovery Mode ``` SCOUT-001 (scout): Multi-perspective issue scanning - blockedBy: [] + addBlockedBy: [] QASTRAT-001 (strategist): Test strategy formulation - blockedBy: [SCOUT-001] + addBlockedBy: [SCOUT-001] QAGEN-001 (generator): L1 unit test generation - blockedBy: [QASTRAT-001], meta: layer=L1 + addBlockedBy: [QASTRAT-001], meta: layer=L1 QARUN-001 (executor): L1 test execution + fix cycles - blockedBy: [QAGEN-001], inner_loop: true, meta: layer=L1 + addBlockedBy: [QAGEN-001], inner_loop: true, meta: layer=L1 QAANA-001 (analyst): Quality analysis report - blockedBy: [QARUN-001] + addBlockedBy: [QARUN-001] ``` ### Testing Mode ``` QASTRAT-001 (strategist): Test strategy formulation - blockedBy: [] + addBlockedBy: [] QAGEN-L1-001 (generator): L1 unit test generation - blockedBy: [QASTRAT-001], meta: layer=L1 + addBlockedBy: [QASTRAT-001], meta: layer=L1 QARUN-L1-001 (executor): L1 test execution + fix cycles - blockedBy: [QAGEN-L1-001], inner_loop: true, meta: layer=L1 + addBlockedBy: [QAGEN-L1-001], inner_loop: true, meta: layer=L1 QAGEN-L2-001 (generator): L2 integration test generation - blockedBy: [QARUN-L1-001], meta: layer=L2 + addBlockedBy: [QARUN-L1-001], meta: layer=L2 QARUN-L2-001 (executor): L2 test execution + fix cycles - blockedBy: [QAGEN-L2-001], inner_loop: true, meta: layer=L2 + addBlockedBy: [QAGEN-L2-001], inner_loop: true, meta: layer=L2 QAANA-001 (analyst): Quality analysis report - blockedBy: [QARUN-L2-001] + addBlockedBy: [QARUN-L2-001] ``` ### Full Mode ``` SCOUT-001 (scout): Multi-perspective issue scanning - blockedBy: [] + addBlockedBy: [] QASTRAT-001 (strategist): Test strategy formulation - blockedBy: [SCOUT-001] + addBlockedBy: [SCOUT-001] QAGEN-L1-001 (generator-1): L1 unit test generation - blockedBy: [QASTRAT-001], meta: layer=L1 + addBlockedBy: [QASTRAT-001], meta: layer=L1 QAGEN-L2-001 
(generator-2): L2 integration test generation - blockedBy: [QASTRAT-001], meta: layer=L2 + addBlockedBy: [QASTRAT-001], meta: layer=L2 QARUN-L1-001 (executor-1): L1 test execution + fix cycles - blockedBy: [QAGEN-L1-001], inner_loop: true, meta: layer=L1 + addBlockedBy: [QAGEN-L1-001], inner_loop: true, meta: layer=L1 QARUN-L2-001 (executor-2): L2 test execution + fix cycles - blockedBy: [QAGEN-L2-001], inner_loop: true, meta: layer=L2 + addBlockedBy: [QAGEN-L2-001], inner_loop: true, meta: layer=L2 QAANA-001 (analyst): Quality analysis report - blockedBy: [QARUN-L1-001, QARUN-L2-001] + addBlockedBy: [QARUN-L1-001, QARUN-L2-001] SCOUT-002 (scout): Regression scan after fixes - blockedBy: [QAANA-001] + addBlockedBy: [QAANA-001] ``` ## InnerLoop Flag Rules diff --git a/.claude/skills/team-quality-assurance/roles/coordinator/commands/monitor.md b/.claude/skills/team-quality-assurance/roles/coordinator/commands/monitor.md index 0e9bb888..b6155adf 100644 --- a/.claude/skills/team-quality-assurance/roles/coordinator/commands/monitor.md +++ b/.claude/skills/team-quality-assurance/roles/coordinator/commands/monitor.md @@ -72,9 +72,9 @@ EXPECTED: /results/run--gc-.json CONSTRAINTS: Read-only execution --- InnerLoop: false -RoleSpec: ~ or /.claude/skills/team-quality-assurance/roles/executor/role.md", - blockedBy: ["QAGEN-fix-"] +RoleSpec: ~ or /.claude/skills/team-quality-assurance/roles/executor/role.md" }) +TaskUpdate({ taskId: "QARUN-gc-", addBlockedBy: ["QAGEN-fix-"] }) ``` 6. -> handleSpawnNext diff --git a/.claude/skills/team-quality-assurance/roles/coordinator/role.md b/.claude/skills/team-quality-assurance/roles/coordinator/role.md index f4c4d5fb..d2ece6f9 100644 --- a/.claude/skills/team-quality-assurance/roles/coordinator/role.md +++ b/.claude/skills/team-quality-assurance/roles/coordinator/role.md @@ -111,13 +111,13 @@ Delegate to @commands/dispatch.md: 1. Read dependency graph from task-analysis.json 2. 
Read specs/pipelines.md for selected pipeline's task registry 3. Topological sort tasks -4. Create tasks via TaskCreate with blockedBy +4. Create tasks via TaskCreate, then TaskUpdate with addBlockedBy 5. Update session.json ## Phase 4: Spawn-and-Stop Delegate to @commands/monitor.md#handleSpawnNext: -1. Find ready tasks (pending + blockedBy resolved) +1. Find ready tasks (pending + all addBlockedBy dependencies resolved) 2. Spawn team-worker agents (see SKILL.md Spawn Template) 3. Output status summary 4. STOP diff --git a/.claude/skills/team-roadmap-dev/roles/coordinator/commands/dispatch.md b/.claude/skills/team-roadmap-dev/roles/coordinator/commands/dispatch.md index bf28dc98..314cec84 100644 --- a/.claude/skills/team-roadmap-dev/roles/coordinator/commands/dispatch.md +++ b/.claude/skills/team-roadmap-dev/roles/coordinator/commands/dispatch.md @@ -150,7 +150,7 @@ mcp__ccw-tools__team_msg({ operation: "log", session_id: sessionId, from: "coordinator", to: "all", type: "phase_started", - ref: `${sessionFolder}/roadmap.md` + data: { ref: `${sessionFolder}/roadmap.md` } }) ``` diff --git a/.claude/skills/team-roadmap-dev/roles/coordinator/commands/pause.md b/.claude/skills/team-roadmap-dev/roles/coordinator/commands/pause.md index 217a8067..d3f2caa7 100644 --- a/.claude/skills/team-roadmap-dev/roles/coordinator/commands/pause.md +++ b/.claude/skills/team-roadmap-dev/roles/coordinator/commands/pause.md @@ -58,7 +58,7 @@ mcp__ccw-tools__team_msg({ operation: "log", session_id: sessionId, from: "coordinator", to: "all", type: "phase_paused", - ref: `${sessionFolder}/state.md` + data: { ref: `${sessionFolder}/state.md` } }) ``` diff --git a/.claude/skills/team-roadmap-dev/roles/coordinator/commands/resume.md b/.claude/skills/team-roadmap-dev/roles/coordinator/commands/resume.md index 2b43cac7..2ec33313 100644 --- a/.claude/skills/team-roadmap-dev/roles/coordinator/commands/resume.md +++ b/.claude/skills/team-roadmap-dev/roles/coordinator/commands/resume.md @@ -70,7 
+70,7 @@ mcp__ccw-tools__team_msg({ operation: "log", session_id: sessionId, from: "coordinator", to: "all", type: "phase_started", - ref: `${sessionFolder}/state.md` + data: { ref: `${sessionFolder}/state.md` } }) ``` diff --git a/.claude/skills/team-roadmap-dev/roles/coordinator/role.md b/.claude/skills/team-roadmap-dev/roles/coordinator/role.md index a6f831c2..d31d910d 100644 --- a/.claude/skills/team-roadmap-dev/roles/coordinator/role.md +++ b/.claude/skills/team-roadmap-dev/roles/coordinator/role.md @@ -139,7 +139,7 @@ mcp__ccw-tools__team_msg({ from: "coordinator", to: , type: , - ref: + data: { ref: } }) ``` diff --git a/.claude/skills/team-tech-debt/roles/coordinator/commands/monitor.md b/.claude/skills/team-tech-debt/roles/coordinator/commands/monitor.md index 9b081052..747780f3 100644 --- a/.claude/skills/team-tech-debt/roles/coordinator/commands/monitor.md +++ b/.claude/skills/team-tech-debt/roles/coordinator/commands/monitor.md @@ -70,7 +70,8 @@ Worker completed. Process and advance. Fix-Verify Task Creation: ``` TaskCreate({ subject: "TDFIX-fix-", description: "PURPOSE: Fix regressions | Session: " }) - TaskCreate({ subject: "TDVAL-recheck-", description: "...", blockedBy: ["TDFIX-fix-"] }) + TaskCreate({ subject: "TDVAL-recheck-", description: "..." }) + TaskUpdate({ taskId: "TDVAL-recheck-", addBlockedBy: ["TDFIX-fix-"] }) ``` 7. -> handleSpawnNext diff --git a/.claude/skills/team-ultra-analyze/role-specs/analyst.md b/.claude/skills/team-ultra-analyze/role-specs/analyst.md deleted file mode 100644 index cbe887ef..00000000 --- a/.claude/skills/team-ultra-analyze/role-specs/analyst.md +++ /dev/null @@ -1,89 +0,0 @@ ---- -prefix: ANALYZE -inner_loop: false -additional_prefixes: [ANALYZE-fix] -message_types: - success: analysis_ready - error: error ---- - -# Deep Analyst - -Perform deep multi-perspective analysis on exploration results via CLI tools. Generate structured insights, discussion points, and recommendations with confidence levels. 
- -## Phase 2: Context Loading - -| Input | Source | Required | -|-------|--------|----------| -| Task description | From task subject/description | Yes | -| Session path | Extracted from task description | Yes | -| Exploration results | `/explorations/*.json` | Yes | - -1. Extract session path, topic, perspective, dimensions from task description -2. Detect direction-fix mode: `type:\s*direction-fix` with `adjusted_focus:\s*(.+)` -3. Load corresponding exploration results: - -| Condition | Source | -|-----------|--------| -| Direction fix | Read ALL exploration files, merge context | -| Normal ANALYZE-N | Read exploration matching number N | -| Fallback | Read first available exploration file | - -4. Select CLI tool by perspective: - -| Perspective | CLI Tool | Rule Template | -|-------------|----------|---------------| -| technical | gemini | analysis-analyze-code-patterns | -| architectural | claude | analysis-review-architecture | -| business | codex | analysis-analyze-code-patterns | -| domain_expert | gemini | analysis-analyze-code-patterns | -| direction-fix (any) | gemini | analysis-diagnose-bug-root-cause | - -## Phase 3: Deep Analysis via CLI - -Build analysis prompt with exploration context: - -``` -PURPOSE: ' from perspective"> - '"> -Success: Actionable insights with confidence levels and evidence references - -PRIOR EXPLORATION CONTEXT: -- Key files: -- Patterns found: -- Key findings: - -TASK: -- -- Generate structured findings with confidence levels (high/medium/low) -- Identify discussion points requiring user input -- List open questions needing further exploration - -MODE: analysis -CONTEXT: @**/* | Topic: -EXPECTED: Structured analysis with: key_insights, key_findings, discussion_points, open_questions, recommendations -CONSTRAINTS: Focus on perspective | -``` - -Execute: `ccw cli -p "" --tool --mode analysis --rule ` - -## Phase 4: Result Aggregation - -Write analysis output to `/analyses/analysis-.json`: - -```json -{ - "perspective": "", - 
"dimensions": ["", ""], - "is_direction_fix": false, - "key_insights": [{"insight": "...", "confidence": "high", "evidence": "file:line"}], - "key_findings": [{"finding": "...", "file_ref": "...", "impact": "..."}], - "discussion_points": ["..."], - "open_questions": ["..."], - "recommendations": [{"action": "...", "rationale": "...", "priority": "high"}], - "_metadata": {"cli_tool": "...", "cli_rule": "...", "perspective": "...", "timestamp": "..."} -} -``` - -Update `/wisdom/.msg/meta.json` under `analyst` namespace: -- Read existing -> merge `{ "analyst": { perspective, insight_count, finding_count, is_direction_fix } }` -> write back diff --git a/.claude/skills/team-ultra-analyze/role-specs/discussant.md b/.claude/skills/team-ultra-analyze/role-specs/discussant.md deleted file mode 100644 index 52f181cb..00000000 --- a/.claude/skills/team-ultra-analyze/role-specs/discussant.md +++ /dev/null @@ -1,106 +0,0 @@ ---- -prefix: DISCUSS -inner_loop: false -message_types: - success: discussion_processed - error: error ---- - -# Discussant - -Process analysis results and user feedback. Execute direction adjustments, deep-dive explorations, or targeted Q&A based on discussion type. Update discussion timeline. - -## Phase 2: Context Loading - -| Input | Source | Required | -|-------|--------|----------| -| Task description | From task subject/description | Yes | -| Session path | Extracted from task description | Yes | -| Analysis results | `/analyses/*.json` | Yes | -| Exploration results | `/explorations/*.json` | No | - -1. Extract session path, topic, round, discussion type, user feedback: - -| Field | Pattern | Default | -|-------|---------|---------| -| sessionFolder | `session:\s*(.+)` | required | -| topic | `topic:\s*(.+)` | required | -| round | `round:\s*(\d+)` | 1 | -| discussType | `type:\s*(.+)` | "initial" | -| userFeedback | `user_feedback:\s*(.+)` | empty | - -2. Read all analysis and exploration results -3. 
Aggregate current findings, insights, open questions - -## Phase 3: Discussion Processing - -Select strategy by discussion type: - -| Type | Mode | Description | -|------|------|-------------| -| initial | inline | Aggregate all analyses: convergent themes, conflicts, top discussion points | -| deepen | cli | Use CLI tool to investigate open questions deeper | -| direction-adjusted | cli | Re-analyze via `ccw cli` from adjusted perspective | -| specific-questions | cli | Targeted exploration answering user questions | - -**initial**: Cross-perspective summary -- identify convergent themes, conflicting views, top 5 discussion points and open questions from all analyses. - -**deepen**: Use CLI tool for deep investigation: -```javascript -Bash({ - command: `ccw cli -p "PURPOSE: Investigate open questions and uncertain insights; success = evidence-based findings -TASK: • Focus on open questions: • Find supporting evidence • Validate uncertain insights • Document findings -MODE: analysis -CONTEXT: @**/* | Memory: Session , previous analyses -EXPECTED: JSON output with investigation results | Write to /discussions/deepen-.json -CONSTRAINTS: Evidence-based analysis only -" --tool gemini --mode analysis --rule analysis-trace-code-execution`, - run_in_background: false -}) -``` - -**direction-adjusted**: CLI re-analysis from adjusted focus: -```javascript -Bash({ - command: `ccw cli -p "Re-analyze '' with adjusted focus on ''" --tool gemini --mode analysis`, - run_in_background: false -}) -``` - -**specific-questions**: Use CLI tool for targeted Q&A: -```javascript -Bash({ - command: `ccw cli -p "PURPOSE: Answer specific user questions about ; success = clear, evidence-based answers -TASK: • Answer: • Provide code references • Explain context -MODE: analysis -CONTEXT: @**/* | Memory: Session -EXPECTED: JSON output with answers and evidence | Write to /discussions/questions-.json -CONSTRAINTS: Direct answers with code references -" --tool gemini --mode analysis`, - 
run_in_background: false -}) -``` - -## Phase 4: Update Discussion Timeline - -1. Write round content to `/discussions/discussion-round-.json`: -```json -{ - "round": 1, "type": "initial", "user_feedback": "...", - "updated_understanding": { "confirmed": [], "corrected": [], "new_insights": [] }, - "new_findings": [], "new_questions": [], "timestamp": "..." -} -``` - -2. Append round section to `/discussion.md`: -```markdown -### Round - Discussion () -#### Type: -#### User Input: -#### Updated Understanding -**Confirmed**: | **Corrected**: | **New Insights**: -#### New Findings / Open Questions -``` - -Update `/wisdom/.msg/meta.json` under `discussant` namespace: -- Read existing -> merge `{ "discussant": { round, type, new_insight_count, corrected_count } }` -> write back diff --git a/.claude/skills/team-ultra-analyze/role-specs/explorer.md b/.claude/skills/team-ultra-analyze/role-specs/explorer.md deleted file mode 100644 index 14460891..00000000 --- a/.claude/skills/team-ultra-analyze/role-specs/explorer.md +++ /dev/null @@ -1,73 +0,0 @@ ---- -prefix: EXPLORE -inner_loop: false -message_types: - success: exploration_ready - error: error ---- - -# Codebase Explorer - -Explore codebase structure through cli-explore-agent, collecting structured context (files, patterns, findings) for downstream analysis. One explorer per analysis perspective. - -## Phase 2: Context & Scope Assessment - -| Input | Source | Required | -|-------|--------|----------| -| Task description | From task subject/description | Yes | -| Session path | Extracted from task description | Yes | - -1. Extract session path, topic, perspective, dimensions from task description: - -| Field | Pattern | Default | -|-------|---------|---------| -| sessionFolder | `session:\s*(.+)` | required | -| topic | `topic:\s*(.+)` | required | -| perspective | `perspective:\s*(.+)` | "general" | -| dimensions | `dimensions:\s*(.+)` | "general" | - -2. Determine exploration number from task subject (EXPLORE-N) -3. 
Build exploration strategy by perspective: - -| Perspective | Focus | Search Depth | -|-------------|-------|-------------| -| general | Overall codebase structure and patterns | broad | -| technical | Implementation details, code patterns, feasibility | medium | -| architectural | System design, module boundaries, interactions | broad | -| business | Business logic, domain models, value flows | medium | -| domain_expert | Domain patterns, standards, best practices | deep | - -## Phase 3: Codebase Exploration - -Use CLI tool for codebase exploration: - -```javascript -Bash({ - command: `ccw cli -p "PURPOSE: Explore codebase for from perspective; success = structured findings with relevant files and patterns -TASK: • Run module depth analysis • Search for topic-related patterns • Identify key files and their relationships • Extract architectural insights -MODE: analysis -CONTEXT: @**/* | Memory: Session , perspective -EXPECTED: JSON output with: relevant_files (path, relevance, summary), patterns, key_findings, module_map, questions_for_analysis, _metadata (perspective, search_queries, timestamp) -CONSTRAINTS: Focus on angle - | Write to /explorations/exploration-.json -" --tool gemini --mode analysis --rule analysis-analyze-code-patterns`, - run_in_background: false -}) -``` - -**ACE fallback** (when CLI produces no output): -```javascript -mcp__ace-tool__search_context({ project_root_path: ".", query: " " }) -``` - -## Phase 4: Result Validation - -| Check | Method | Action on Failure | -|-------|--------|-------------------| -| Output file exists | Read output path | Create empty result, run ACE fallback | -| Has relevant_files | Array length > 0 | Trigger ACE supplementary search | -| Has key_findings | Array length > 0 | Note partial results, proceed | - -Write validated exploration to `/explorations/exploration-.json`. 
- -Update `/wisdom/.msg/meta.json` under `explorer` namespace: -- Read existing -> merge `{ "explorer": { perspective, file_count, finding_count } }` -> write back diff --git a/.claude/skills/team-ultra-analyze/role-specs/synthesizer.md b/.claude/skills/team-ultra-analyze/role-specs/synthesizer.md deleted file mode 100644 index f2029441..00000000 --- a/.claude/skills/team-ultra-analyze/role-specs/synthesizer.md +++ /dev/null @@ -1,77 +0,0 @@ ---- -prefix: SYNTH -inner_loop: false -message_types: - success: synthesis_ready - error: error ---- - -# Synthesizer - -Integrate all explorations, analyses, and discussions into final conclusions. Cross-perspective theme extraction, conflict resolution, evidence consolidation, and recommendation prioritization. Pure integration role -- no external tools or CLI calls. - -## Phase 2: Context Loading - -| Input | Source | Required | -|-------|--------|----------| -| Task description | From task subject/description | Yes | -| Session path | Extracted from task description | Yes | -| All artifacts | `/explorations/*.json`, `analyses/*.json`, `discussions/*.json` | Yes | -| Decision trail | From wisdom/.msg/meta.json | No | - -1. Extract session path and topic from task description -2. Read all exploration, analysis, and discussion round files -3. Load decision trail and current understanding from meta.json -4. Select synthesis strategy: - -| Condition | Strategy | -|-----------|----------| -| Single analysis, no discussions | simple (Quick mode summary) | -| Multiple analyses, >2 discussion rounds | deep (track evolution) | -| Default | standard (cross-perspective integration) | - -## Phase 3: Cross-Perspective Synthesis - -Execute synthesis across four dimensions: - -**1. Theme Extraction**: Identify convergent themes across all analysis perspectives. Cluster insights by similarity, rank by cross-perspective confirmation count. - -**2. Conflict Resolution**: Identify contradictions between perspectives. 
Present both sides with trade-off analysis when irreconcilable. - -**3. Evidence Consolidation**: Deduplicate findings, aggregate by file reference. Map evidence to conclusions with confidence levels: - -| Level | Criteria | -|-------|----------| -| High | Multiple sources confirm, strong evidence | -| Medium | Single source or partial evidence | -| Low | Speculative, needs verification | - -**4. Recommendation Prioritization**: Sort all recommendations by priority (high > medium > low), deduplicate, cap at 10. - -Integrate decision trail from discussion rounds into final narrative. - -## Phase 4: Write Conclusions - -1. Write `/conclusions.json`: -```json -{ - "session_id": "...", "topic": "...", "completed": "ISO-8601", - "summary": "Executive summary...", - "key_conclusions": [{"point": "...", "evidence": "...", "confidence": "high"}], - "recommendations": [{"action": "...", "rationale": "...", "priority": "high"}], - "open_questions": ["..."], - "decision_trail": [{"round": 1, "decision": "...", "context": "..."}], - "cross_perspective_synthesis": { "convergent_themes": [], "conflicts_resolved": [], "unique_contributions": [] }, - "_metadata": { "explorations": 3, "analyses": 3, "discussions": 2, "strategy": "standard" } -} -``` - -2. 
Append conclusions section to `/discussion.md`: -```markdown -## Conclusions -### Summary / Key Conclusions / Recommendations / Remaining Questions -## Decision Trail / Current Understanding (Final) / Session Statistics -``` - -Update `/wisdom/.msg/meta.json` under `synthesizer` namespace: -- Read existing -> merge `{ "synthesizer": { conclusion_count, recommendation_count, open_question_count } }` -> write back diff --git a/.claude/skills/team-ultra-analyze/roles/coordinator/commands/monitor.md b/.claude/skills/team-ultra-analyze/roles/coordinator/commands/monitor.md index 91a1129f..6b79edf8 100644 --- a/.claude/skills/team-ultra-analyze/roles/coordinator/commands/monitor.md +++ b/.claude/skills/team-ultra-analyze/roles/coordinator/commands/monitor.md @@ -40,18 +40,28 @@ MAX_ROUNDS = pipeline_mode === 'deep' ? 5 Triggered when a worker sends completion message (via SendMessage callback). -1. Parse message to identify role and task ID: +1. Parse message to identify role, then resolve completed tasks: -| Message Pattern | Role Detection | -|----------------|---------------| -| `[explorer]` or task ID `EXPLORE-*` | explorer | -| `[analyst]` or task ID `ANALYZE-*` | analyst | -| `[discussant]` or task ID `DISCUSS-*` | discussant | -| `[synthesizer]` or task ID `SYNTH-*` | synthesizer | + **Role detection** (from message tag at start of body): -2. 
Mark task as completed: + | Message starts with | Role | Handler | + |---------------------|------|---------| + | `[explorer]` | explorer | handleCallback | + | `[analyst]` | analyst | handleCallback | + | `[discussant]` | discussant | handleCallback | + | `[synthesizer]` | synthesizer | handleCallback | + | `[supervisor]` | supervisor | Log checkpoint result, verify CHECKPOINT task completed, proceed to handleSpawnNext | + + **Task ID resolution** (do NOT parse from message — use TaskList): + - Call `TaskList()` and find tasks matching the detected role's prefix + - Tasks with status `completed` that were not previously tracked = newly completed tasks + - This is reliable even when a worker reports multiple tasks (inner_loop) or when message format varies + +2. Verify task completion (worker already marks completed in Phase 5): ``` +TaskGet({ taskId: "" }) +// If still "in_progress" (worker failed to mark) → fallback: TaskUpdate({ taskId: "", status: "completed" }) ``` @@ -112,7 +122,7 @@ ELSE: |----------|--------| | "Continue deeper" | Create new DISCUSS-`` task (pending, no blockedBy). Record decision in discussion.md. Proceed to handleSpawnNext | | "Adjust direction" | AskUserQuestion for new focus. Create ANALYZE-fix-`` task (pending). Create DISCUSS-`` task (pending, blockedBy ANALYZE-fix-``). Record direction change in discussion.md. Proceed to handleSpawnNext | -| "Done" | Create SYNTH-001 task (pending, blockedBy last DISCUSS). Record decision in discussion.md. Proceed to handleSpawnNext | +| "Done" | Check if SYNTH-001 already exists (from dispatch): if yes, ensure blockedBy is updated to reference last DISCUSS task; if no, create SYNTH-001 (pending, blockedBy last DISCUSS). Record decision in discussion.md. 
Proceed to handleSpawnNext | **Dynamic task creation templates**: @@ -160,8 +170,11 @@ InnerLoop: false" TaskUpdate({ taskId: "ANALYZE-fix-", owner: "analyst" }) ``` -SYNTH-001 (created dynamically in deep mode): +SYNTH-001 (created dynamically — check existence first): ``` +// Guard: only create if SYNTH-001 doesn't exist yet (dispatch may have pre-created it) +const existingSynth = TaskList().find(t => t.subject === 'SYNTH-001') +if (!existingSynth) { TaskCreate({ subject: "SYNTH-001", description: "PURPOSE: Integrate all analysis into final conclusions | Success: Executive summary with recommendations @@ -179,6 +192,8 @@ CONSTRAINTS: Pure integration, no new exploration --- InnerLoop: false" }) +} +// Always update blockedBy to reference the last DISCUSS task (whether pre-existing or newly created) TaskUpdate({ taskId: "SYNTH-001", addBlockedBy: [""], owner: "synthesizer" }) ``` @@ -211,10 +226,10 @@ Find and spawn the next ready tasks. | Task Prefix | Role | Role Spec | |-------------|------|-----------| -| `EXPLORE-*` | explorer | `~ or /.claude/skills/team-ultra-analyze/role-specs/explorer.md` | -| `ANALYZE-*` | analyst | `~ or /.claude/skills/team-ultra-analyze/role-specs/analyst.md` | -| `DISCUSS-*` | discussant | `~ or /.claude/skills/team-ultra-analyze/role-specs/discussant.md` | -| `SYNTH-*` | synthesizer | `~ or /.claude/skills/team-ultra-analyze/role-specs/synthesizer.md` | +| `EXPLORE-*` | explorer | `/roles/explorer/role.md` | +| `ANALYZE-*` | analyst | `/roles/analyst/role.md` | +| `DISCUSS-*` | discussant | `/roles/discussant/role.md` | +| `SYNTH-*` | synthesizer | `/roles/synthesizer/role.md` | 3. Spawn team-worker for each ready task: @@ -227,7 +242,7 @@ Agent({ run_in_background: true, prompt: `## Role Assignment role: -role_spec: ~ or /.claude/skills/team-ultra-analyze/role-specs/.md +role_spec: /roles//role.md session: session_id: team_name: ultra-analyze @@ -298,11 +313,11 @@ Triggered when all pipeline tasks are completed. 
| deep | All EXPLORE + ANALYZE + all DISCUSS-N + SYNTH-001 completed | 1. Verify all tasks completed. If any not completed, return to handleSpawnNext -2. If all completed, transition to coordinator Phase 5 +2. If all completed, **inline-execute coordinator Phase 5** (shutdown workers → report → completion action). Do NOT STOP here — continue directly into Phase 5 within the same turn. ## Phase 4: State Persistence -After every handler execution: +After every handler execution **except handleComplete**: 1. Update session.json with current state: - `discussion_round`: current round count @@ -311,6 +326,8 @@ After every handler execution: 2. Verify task list consistency (no orphan tasks, no broken dependencies) 3. **STOP** and wait for next event +> **handleComplete exception**: handleComplete does NOT STOP — it transitions directly to coordinator Phase 5. + ## Error Handling | Scenario | Resolution | diff --git a/.claude/skills/team-ultra-analyze/roles/coordinator/role.md b/.claude/skills/team-ultra-analyze/roles/coordinator/role.md index 9cc6a274..bd72cd90 100644 --- a/.claude/skills/team-ultra-analyze/roles/coordinator/role.md +++ b/.claude/skills/team-ultra-analyze/roles/coordinator/role.md @@ -44,13 +44,21 @@ When coordinator is invoked, detect invocation type: | Detection | Condition | Handler | |-----------|-----------|---------| -| Worker callback | Message contains role tag [explorer], [analyst], [discussant], [synthesizer] | -> handleCallback (monitor.md) | +| Worker callback | Message content starts with `[explorer]`, `[analyst]`, `[discussant]`, or `[synthesizer]` (role tag at beginning of message body) | -> handleCallback (monitor.md) | +| Supervisor callback | Message content starts with `[supervisor]` | -> handleSupervisorReport (log checkpoint result, proceed to handleSpawnNext if tasks unblocked) | +| Idle notification | System notification that a teammate went idle (does NOT start with a role tag — typically says "Agent X is now idle") | -> 
**IGNORE** (do not handleCallback; idle is normal after every turn) | +| Shutdown response | Message content is a JSON object containing `shutdown_response` (parse as structured data, not string) | -> handleShutdownResponse (see Phase 5) | | Status check | Arguments contain "check" or "status" | -> handleCheck (monitor.md) | | Manual resume | Arguments contain "resume" or "continue" | -> handleResume (monitor.md) | | Pipeline complete | All tasks have status "completed" | -> handleComplete (monitor.md) | | Interrupted session | Active/paused session exists | -> Phase 0 | | New session | None of above | -> Phase 1 | +**Message format discrimination**: +- **String messages starting with `[]`**: Worker/supervisor completion reports → route to handleCallback or handleSupervisorReport +- **JSON object messages** (contain `type:` field): Structured protocol messages (shutdown_response) → route by `type` field +- **Other strings without role tags**: System idle notifications → IGNORE + For callback/check/resume/complete: load `@commands/monitor.md` and execute matched handler, then STOP. ### Router Implementation @@ -167,7 +175,31 @@ All subsequent coordination is handled by `commands/monitor.md` handlers trigger --- -## Phase 5: Report + Completion Action +## Phase 5: Shutdown Workers + Report + Completion Action + +### Shutdown All Workers + +Before reporting, gracefully shut down all active teammates. This is a **multi-turn** process: + +1. Read team config: `~/.claude/teams/ultra-analyze/config.json` +2. Build shutdown tracking list: `pending_shutdown = []` +3. For each member in pending_shutdown, send shutdown request: + ```javascript + SendMessage({ + to: "", + message: { type: "shutdown_request", reason: "Pipeline complete" } + }) + ``` +4. **STOP** — wait for responses. Each `shutdown_response` triggers a new coordinator turn. +5. 
On each subsequent turn (shutdown_response received): + - Remove responder from `pending_shutdown` + - If `pending_shutdown` is empty → proceed to **Report** section below + - If not empty → **STOP** again, wait for remaining responses +6. If a member is unresponsive after 2 follow-ups, remove from tracking and proceed + +**Note**: Workers that completed Phase 5-F and reached STOP may have already terminated. SendMessage to a terminated agent is silently ignored — this is safe. Only resident agents (e.g., supervisor) require explicit shutdown. + +### Report 1. Load session state -> count completed tasks, calculate duration 2. List deliverables: diff --git a/.claude/skills/team-ux-improve/role-specs/tester.md b/.claude/skills/team-ux-improve/role-specs/tester.md index 0233edb2..191b73bd 100644 --- a/.claude/skills/team-ux-improve/role-specs/tester.md +++ b/.claude/skills/team-ux-improve/role-specs/tester.md @@ -157,8 +157,7 @@ team_msg(operation="log", session_id=, from="tester", If pass rate < 95%, send fix_required message: ``` SendMessage({ - recipient: "coordinator", - type: "message", - content: "[tester] Test validation incomplete. Pass rate: %. Manual review needed." + to: "coordinator", + message: "[tester] Test validation incomplete. Pass rate: %. Manual review needed." 
}) ``` diff --git a/.claude/skills/team-ux-improve/roles/coordinator/commands/dispatch.md b/.claude/skills/team-ux-improve/roles/coordinator/commands/dispatch.md index 1f09e5f0..04c3c754 100644 --- a/.claude/skills/team-ux-improve/roles/coordinator/commands/dispatch.md +++ b/.claude/skills/team-ux-improve/roles/coordinator/commands/dispatch.md @@ -30,7 +30,6 @@ Every task description uses structured format for clarity: ``` TaskCreate({ subject: "", - owner: "", description: "PURPOSE: | Success: TASK: - @@ -46,9 +45,9 @@ EXPECTED: + CONSTRAINTS: --- InnerLoop: -", - blockedBy: [] +" }) +TaskUpdate({ taskId: "", addBlockedBy: [], owner: "" }) ``` ### Standard Pipeline Tasks @@ -57,7 +56,6 @@ InnerLoop: ``` TaskCreate({ subject: "SCAN-001", - owner: "scanner", description: "PURPOSE: Scan UI components to identify interaction issues (unresponsive buttons, missing feedback, state not refreshing) | Success: Complete issue report with file:line references and severity classification TASK: - Detect framework (React/Vue) from project structure @@ -73,17 +71,15 @@ CONTEXT: EXPECTED: artifacts/scan-report.md with structured issue list (severity: High/Medium/Low, file:line, description, category) CONSTRAINTS: Focus on interaction issues only, exclude styling/layout problems --- -InnerLoop: false", - blockedBy: [], - status: "pending" +InnerLoop: false" }) +TaskUpdate({ taskId: "SCAN-001", owner: "scanner" }) ``` **DIAG-001: Root Cause Diagnosis** ``` TaskCreate({ subject: "DIAG-001", - owner: "diagnoser", description: "PURPOSE: Diagnose root causes of identified UI issues | Success: Complete diagnosis report with fix recommendations for each issue TASK: - Load scan report from artifacts/scan-report.md @@ -100,17 +96,15 @@ CONTEXT: EXPECTED: artifacts/diagnosis.md with root cause analysis (issue ID, root cause, pattern type, fix recommendation) CONSTRAINTS: Focus on actionable root causes, provide specific fix strategies --- -InnerLoop: false", - blockedBy: ["SCAN-001"], - 
status: "pending" +InnerLoop: false" }) +TaskUpdate({ taskId: "DIAG-001", addBlockedBy: ["SCAN-001"], owner: "diagnoser" }) ``` **DESIGN-001: Solution Design** ``` TaskCreate({ subject: "DESIGN-001", - owner: "designer", description: "PURPOSE: Design feedback mechanisms and state management solutions for identified issues | Success: Complete implementation guide with code patterns and examples TASK: - Load diagnosis report from artifacts/diagnosis.md @@ -128,17 +122,15 @@ CONTEXT: EXPECTED: artifacts/design-guide.md with implementation guide (issue ID, solution design, code patterns, state management examples, UI binding templates) CONSTRAINTS: Solutions must be framework-appropriate, provide complete working examples --- -InnerLoop: false", - blockedBy: ["DIAG-001"], - status: "pending" +InnerLoop: false" }) +TaskUpdate({ taskId: "DESIGN-001", addBlockedBy: ["DIAG-001"], owner: "designer" }) ``` **IMPL-001: Code Implementation** ``` TaskCreate({ subject: "IMPL-001", - owner: "implementer", description: "PURPOSE: Generate fix code with proper state management, event handling, and UI feedback bindings | Success: All fixes implemented and validated TASK: - Load design guide from artifacts/design-guide.md @@ -158,17 +150,15 @@ CONTEXT: EXPECTED: artifacts/fixes/ directory with all fix files, implementation summary in artifacts/fixes/README.md CONSTRAINTS: Maintain existing code style, ensure backward compatibility, validate all changes --- -InnerLoop: true", - blockedBy: ["DESIGN-001"], - status: "pending" +InnerLoop: true" }) +TaskUpdate({ taskId: "IMPL-001", addBlockedBy: ["DESIGN-001"], owner: "implementer" }) ``` **TEST-001: Test Validation** ``` TaskCreate({ subject: "TEST-001", - owner: "tester", description: "PURPOSE: Generate and run tests to verify fixes (loading states, error handling, state updates) | Success: Pass rate >= 95%, all critical fixes validated TASK: - Detect test framework (Jest/Vitest) from project @@ -187,10 +177,9 @@ CONTEXT: EXPECTED: 
artifacts/test-report.md with test results (pass/fail counts, coverage metrics, fix iterations, remaining issues) CONSTRAINTS: Pass rate threshold: 95%, max fix iterations: 5 --- -InnerLoop: false", - blockedBy: ["IMPL-001"], - status: "pending" +InnerLoop: false" }) +TaskUpdate({ taskId: "TEST-001", addBlockedBy: ["IMPL-001"], owner: "tester" }) ``` --- diff --git a/codex-lens-v2/src/codexlens_search/bridge.py b/codex-lens-v2/src/codexlens_search/bridge.py index 42eced7c..0590b694 100644 --- a/codex-lens-v2/src/codexlens_search/bridge.py +++ b/codex-lens-v2/src/codexlens_search/bridge.py @@ -124,6 +124,19 @@ def create_config_from_env(db_path: str | Path, **overrides: object) -> "Config" kwargs["hnsw_ef"] = int(os.environ["CODEXLENS_HNSW_EF"]) if os.environ.get("CODEXLENS_HNSW_M"): kwargs["hnsw_M"] = int(os.environ["CODEXLENS_HNSW_M"]) + # Tier config from env + if os.environ.get("CODEXLENS_TIER_HOT_HOURS"): + kwargs["tier_hot_hours"] = int(os.environ["CODEXLENS_TIER_HOT_HOURS"]) + if os.environ.get("CODEXLENS_TIER_COLD_HOURS"): + kwargs["tier_cold_hours"] = int(os.environ["CODEXLENS_TIER_COLD_HOURS"]) + # Search quality tier from env + if os.environ.get("CODEXLENS_SEARCH_QUALITY"): + kwargs["default_search_quality"] = os.environ["CODEXLENS_SEARCH_QUALITY"] + # Shard config from env + if os.environ.get("CODEXLENS_NUM_SHARDS"): + kwargs["num_shards"] = int(os.environ["CODEXLENS_NUM_SHARDS"]) + if os.environ.get("CODEXLENS_MAX_LOADED_SHARDS"): + kwargs["max_loaded_shards"] = int(os.environ["CODEXLENS_MAX_LOADED_SHARDS"]) resolved = Path(db_path).resolve() kwargs["metadata_db_path"] = str(resolved / "metadata.db") return Config(**kwargs) @@ -143,28 +156,8 @@ def _create_config(args: argparse.Namespace) -> "Config": return create_config_from_env(args.db_path, **overrides) -def create_pipeline( - db_path: str | Path, - config: "Config | None" = None, -) -> tuple: - """Construct pipeline components from db_path and config. 
- - Returns (indexing_pipeline, search_pipeline, config). - Used by both CLI bridge and MCP server. - """ - from codexlens_search.config import Config - from codexlens_search.core.factory import create_ann_index, create_binary_index - from codexlens_search.indexing.metadata import MetadataStore - from codexlens_search.indexing.pipeline import IndexingPipeline - from codexlens_search.search.fts import FTSEngine - from codexlens_search.search.pipeline import SearchPipeline - - if config is None: - config = create_config_from_env(db_path) - resolved = Path(db_path).resolve() - resolved.mkdir(parents=True, exist_ok=True) - - # Select embedder: API if configured, otherwise local fastembed +def _create_embedder(config: "Config"): + """Create embedder based on config, auto-detecting embed_dim from API.""" if config.embed_api_url: from codexlens_search.embed.api import APIEmbedder embedder = APIEmbedder(config) @@ -179,13 +172,11 @@ def create_pipeline( else: from codexlens_search.embed.local import FastEmbedEmbedder embedder = FastEmbedEmbedder(config) + return embedder - binary_store = create_binary_index(resolved, config.embed_dim, config) - ann_index = create_ann_index(resolved, config.embed_dim, config) - fts = FTSEngine(resolved / "fts.db") - metadata = MetadataStore(resolved / "metadata.db") - # Select reranker: API if configured, otherwise local fastembed +def _create_reranker(config: "Config"): + """Create reranker based on config.""" if config.reranker_api_url: from codexlens_search.rerank.api import APIReranker reranker = APIReranker(config) @@ -193,6 +184,60 @@ def create_pipeline( else: from codexlens_search.rerank.local import FastEmbedReranker reranker = FastEmbedReranker(config) + return reranker + + +def create_pipeline( + db_path: str | Path, + config: "Config | None" = None, +) -> tuple: + """Construct pipeline components from db_path and config. + + Returns (indexing_pipeline, search_pipeline, config). + Used by both CLI bridge and MCP server. 
+ + When config.num_shards > 1, returns a ShardManager-backed pipeline + where indexing and search are delegated to the ShardManager. + The returned tuple is (shard_manager, shard_manager, config) so that + callers can use shard_manager.sync() and shard_manager.search(). + """ + from codexlens_search.config import Config + + if config is None: + config = create_config_from_env(db_path) + resolved = Path(db_path).resolve() + resolved.mkdir(parents=True, exist_ok=True) + + embedder = _create_embedder(config) + reranker = _create_reranker(config) + + # Sharded mode: delegate to ShardManager + if config.num_shards > 1: + from codexlens_search.core.shard_manager import ShardManager + manager = ShardManager( + num_shards=config.num_shards, + db_path=resolved, + config=config, + embedder=embedder, + reranker=reranker, + ) + log.info( + "Using ShardManager with %d shards (max_loaded=%d)", + config.num_shards, config.max_loaded_shards, + ) + return manager, manager, config + + # Single-shard mode: original behavior, no ShardManager overhead + from codexlens_search.core.factory import create_ann_index, create_binary_index + from codexlens_search.indexing.metadata import MetadataStore + from codexlens_search.indexing.pipeline import IndexingPipeline + from codexlens_search.search.fts import FTSEngine + from codexlens_search.search.pipeline import SearchPipeline + + binary_store = create_binary_index(resolved, config.embed_dim, config) + ann_index = create_ann_index(resolved, config.embed_dim, config) + fts = FTSEngine(resolved / "fts.db") + metadata = MetadataStore(resolved / "metadata.db") indexing = IndexingPipeline( embedder=embedder, diff --git a/codex-lens-v2/src/codexlens_search/config.py b/codex-lens-v2/src/codexlens_search/config.py index 6b7dadd8..f56df7e4 100644 --- a/codex-lens-v2/src/codexlens_search/config.py +++ b/codex-lens-v2/src/codexlens_search/config.py @@ -47,7 +47,7 @@ class Config: # Backend selection: 'auto', 'faiss', 'hnswlib' ann_backend: str = "auto" 
- binary_backend: str = "auto" + binary_backend: str = "faiss" # Indexing pipeline index_workers: int = 2 # number of parallel indexing workers @@ -77,6 +77,17 @@ class Config: # Metadata store metadata_db_path: str = "" # empty = no metadata tracking + # Data tiering (hot/warm/cold) + tier_hot_hours: int = 24 # files accessed within this window are 'hot' + tier_cold_hours: int = 168 # files not accessed for this long are 'cold' + + # Search quality tier: 'fast', 'balanced', 'thorough', 'auto' + default_search_quality: str = "auto" + + # Shard partitioning + num_shards: int = 1 # 1 = single partition (no sharding), >1 = hash-based sharding + max_loaded_shards: int = 4 # LRU limit for loaded shards in ShardManager + # FTS fts_top_k: int = 50 diff --git a/codex-lens-v2/src/codexlens_search/core/binary.py b/codex-lens-v2/src/codexlens_search/core/binary.py index e8ba543e..fe94997f 100644 --- a/codex-lens-v2/src/codexlens_search/core/binary.py +++ b/codex-lens-v2/src/codexlens_search/core/binary.py @@ -15,6 +15,13 @@ logger = logging.getLogger(__name__) class BinaryStore(BaseBinaryIndex): """Persistent binary vector store using numpy memmap. + .. deprecated:: + Prefer ``FAISSBinaryIndex`` for binary coarse search. This class is + retained as a numpy-only fallback for environments where FAISS is not + available. New code should use ``create_binary_index()`` from + ``codexlens_search.core.factory`` which selects the best backend + automatically. + Stores binary-quantized float32 vectors as packed uint8 arrays on disk. Supports fast coarse search via XOR + popcount Hamming distance. 
""" diff --git a/codex-lens-v2/src/codexlens_search/core/factory.py b/codex-lens-v2/src/codexlens_search/core/factory.py index ed7758e6..6ad952bb 100644 --- a/codex-lens-v2/src/codexlens_search/core/factory.py +++ b/codex-lens-v2/src/codexlens_search/core/factory.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import warnings from pathlib import Path from codexlens_search.config import Config @@ -97,14 +98,29 @@ def create_binary_index( backend = config.binary_backend if backend == "faiss": - from codexlens_search.core.faiss_index import FAISSBinaryIndex - return FAISSBinaryIndex(path, dim, config) + if _FAISS_AVAILABLE: + from codexlens_search.core.faiss_index import FAISSBinaryIndex + return FAISSBinaryIndex(path, dim, config) + # FAISS explicitly requested but not installed: fall back with warning + from codexlens_search.core.binary import BinaryStore + warnings.warn( + "binary_backend='faiss' but FAISS is not installed. " + "Falling back to deprecated numpy BinaryStore. " + "Install faiss-cpu or faiss-gpu for the recommended binary backend.", + DeprecationWarning, + stacklevel=2, + ) + logger.warning( + "binary_backend='faiss' but FAISS not available, " + "falling back to deprecated numpy BinaryStore." + ) + return BinaryStore(path, dim, config) if backend == "hnswlib": from codexlens_search.core.binary import BinaryStore return BinaryStore(path, dim, config) - # auto: try faiss first, then numpy-based BinaryStore + # auto: try faiss first, then numpy-based BinaryStore (deprecated fallback) if _FAISS_AVAILABLE: from codexlens_search.core.faiss_index import FAISSBinaryIndex logger.info("Auto-selected FAISS binary backend") @@ -112,5 +128,14 @@ def create_binary_index( # numpy BinaryStore is always available (no extra deps) from codexlens_search.core.binary import BinaryStore - logger.info("Auto-selected numpy BinaryStore backend") + warnings.warn( + "Falling back to numpy BinaryStore because FAISS is not installed. 
" + "BinaryStore is deprecated; install faiss-cpu or faiss-gpu for better performance.", + DeprecationWarning, + stacklevel=2, + ) + logger.warning( + "FAISS not available, falling back to deprecated numpy BinaryStore. " + "Install faiss-cpu or faiss-gpu for the recommended binary backend." + ) return BinaryStore(path, dim, config) diff --git a/codex-lens-v2/src/codexlens_search/core/faiss_index.py b/codex-lens-v2/src/codexlens_search/core/faiss_index.py index e3277527..6298de51 100644 --- a/codex-lens-v2/src/codexlens_search/core/faiss_index.py +++ b/codex-lens-v2/src/codexlens_search/core/faiss_index.py @@ -71,10 +71,23 @@ class FAISSANNIndex(BaseANNIndex): self.load() def load(self) -> None: - """Load index from disk or initialize a fresh one.""" + """Load index from disk or initialize a fresh one. + + Uses IO_FLAG_MMAP for zero-copy memory-mapped loading when available, + falling back to regular read_index() on older faiss versions. + """ with self._lock: if self._index_path.exists(): - idx = faiss.read_index(str(self._index_path)) + try: + idx = faiss.read_index( + str(self._index_path), faiss.IO_FLAG_MMAP + ) + except (AttributeError, RuntimeError, Exception) as exc: + logger.debug( + "MMAP load failed, falling back to regular read: %s", + exc, + ) + idx = faiss.read_index(str(self._index_path)) logger.debug( "Loaded FAISS ANN index from %s (%d items)", self._index_path, idx.ntotal, @@ -201,10 +214,23 @@ class FAISSBinaryIndex(BaseBinaryIndex): return np.packbits(binary).reshape(1, -1) def load(self) -> None: - """Load binary index from disk or initialize a fresh one.""" + """Load binary index from disk or initialize a fresh one. + + Uses IO_FLAG_MMAP for zero-copy memory-mapped loading when available, + falling back to regular read_index_binary() on older faiss versions. 
+ """ with self._lock: if self._index_path.exists(): - idx = faiss.read_index_binary(str(self._index_path)) + try: + idx = faiss.read_index_binary( + str(self._index_path), faiss.IO_FLAG_MMAP + ) + except (AttributeError, RuntimeError, Exception) as exc: + logger.debug( + "MMAP load failed, falling back to regular read: %s", + exc, + ) + idx = faiss.read_index_binary(str(self._index_path)) logger.debug( "Loaded FAISS binary index from %s (%d items)", self._index_path, idx.ntotal, diff --git a/codex-lens-v2/src/codexlens_search/core/shard.py b/codex-lens-v2/src/codexlens_search/core/shard.py new file mode 100644 index 00000000..2f3527df --- /dev/null +++ b/codex-lens-v2/src/codexlens_search/core/shard.py @@ -0,0 +1,178 @@ +"""Single index partition (shard) that owns FTS, binary, ANN, and metadata stores.""" +from __future__ import annotations + +import logging +from pathlib import Path + +from codexlens_search.config import Config +from codexlens_search.core.base import BaseANNIndex, BaseBinaryIndex +from codexlens_search.embed.base import BaseEmbedder +from codexlens_search.indexing.metadata import MetadataStore +from codexlens_search.indexing.pipeline import IndexingPipeline, IndexStats +from codexlens_search.rerank import BaseReranker +from codexlens_search.search.fts import FTSEngine +from codexlens_search.search.pipeline import SearchPipeline, SearchResult + +logger = logging.getLogger(__name__) + + +class Shard: + """A complete index partition with its own FTS, binary, ANN, and metadata stores. + + Components are lazy-loaded on first access and can be explicitly unloaded + to release memory. The embedder and reranker are shared across shards + (passed in from ShardManager) since they are expensive to instantiate. 
+ """ + + def __init__( + self, + shard_id: int, + db_path: str | Path, + config: Config, + ) -> None: + self._shard_id = shard_id + self._shard_dir = Path(db_path).resolve() / f"shard_{shard_id}" + self._config = config + + # Lazy-loaded components (created on _ensure_loaded) + self._fts: FTSEngine | None = None + self._binary_store: BaseBinaryIndex | None = None + self._ann_index: BaseANNIndex | None = None + self._metadata: MetadataStore | None = None + self._indexing: IndexingPipeline | None = None + self._search: SearchPipeline | None = None + self._loaded = False + + @property + def shard_id(self) -> int: + return self._shard_id + + @property + def is_loaded(self) -> bool: + return self._loaded + + def _ensure_loaded( + self, + embedder: BaseEmbedder, + reranker: BaseReranker, + ) -> None: + """Lazy-create all per-shard components if not yet loaded.""" + if self._loaded: + return + + from codexlens_search.core.factory import create_ann_index, create_binary_index + + self._shard_dir.mkdir(parents=True, exist_ok=True) + + self._fts = FTSEngine(self._shard_dir / "fts.db") + self._binary_store = create_binary_index( + self._shard_dir, self._config.embed_dim, self._config + ) + self._ann_index = create_ann_index( + self._shard_dir, self._config.embed_dim, self._config + ) + self._metadata = MetadataStore(self._shard_dir / "metadata.db") + + self._indexing = IndexingPipeline( + embedder=embedder, + binary_store=self._binary_store, + ann_index=self._ann_index, + fts=self._fts, + config=self._config, + metadata=self._metadata, + ) + + self._search = SearchPipeline( + embedder=embedder, + binary_store=self._binary_store, + ann_index=self._ann_index, + reranker=reranker, + fts=self._fts, + config=self._config, + metadata_store=self._metadata, + ) + + self._loaded = True + logger.debug("Shard %d loaded from %s", self._shard_id, self._shard_dir) + + def unload(self) -> None: + """Release memory by closing connections and dropping references.""" + if not self._loaded: + 
return + + if self._metadata is not None: + self._metadata.close() + + self._fts = None + self._binary_store = None + self._ann_index = None + self._metadata = None + self._indexing = None + self._search = None + self._loaded = False + logger.debug("Shard %d unloaded", self._shard_id) + + def load( + self, + embedder: BaseEmbedder, + reranker: BaseReranker, + ) -> None: + """Explicitly load shard components.""" + self._ensure_loaded(embedder, reranker) + + def save(self) -> None: + """Persist binary and ANN indexes to disk.""" + if not self._loaded: + return + if self._binary_store is not None: + self._binary_store.save() + if self._ann_index is not None: + self._ann_index.save() + + def search( + self, + query: str, + embedder: BaseEmbedder, + reranker: BaseReranker, + quality: str | None = None, + top_k: int | None = None, + ) -> list[SearchResult]: + """Search this shard's index. + + Args: + query: Search query string. + embedder: Shared embedder instance. + reranker: Shared reranker instance. + quality: Search quality tier. + top_k: Maximum results to return. + + Returns: + List of SearchResult from this shard. + """ + self._ensure_loaded(embedder, reranker) + assert self._search is not None + return self._search.search(query, top_k=top_k, quality=quality) + + def sync( + self, + files: list[Path], + root: Path | None, + embedder: BaseEmbedder, + reranker: BaseReranker, + **kwargs: object, + ) -> IndexStats: + """Sync this shard's index with the given files. + + Args: + files: Files that belong to this shard. + root: Root directory for relative paths. + embedder: Shared embedder instance. + reranker: Shared reranker instance. + **kwargs: Forwarded to IndexingPipeline.sync(). + + Returns: + IndexStats for this shard's sync operation. 
+ """ + self._ensure_loaded(embedder, reranker) + assert self._indexing is not None + return self._indexing.sync(files, root=root, **kwargs) diff --git a/codex-lens-v2/src/codexlens_search/core/shard_manager.py b/codex-lens-v2/src/codexlens_search/core/shard_manager.py new file mode 100644 index 00000000..639aa3f2 --- /dev/null +++ b/codex-lens-v2/src/codexlens_search/core/shard_manager.py @@ -0,0 +1,250 @@ +"""ShardManager: manages multiple Shard instances with LRU eviction.""" +from __future__ import annotations + +import logging +import threading +from collections import OrderedDict +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path + +from codexlens_search.config import Config +from codexlens_search.core.shard import Shard +from codexlens_search.embed.base import BaseEmbedder +from codexlens_search.indexing.pipeline import IndexStats +from codexlens_search.rerank import BaseReranker +from codexlens_search.search.fusion import reciprocal_rank_fusion +from codexlens_search.search.pipeline import SearchResult + +logger = logging.getLogger(__name__) + + +class ShardManager: + """Manages multiple Shard instances with hash-based file routing and LRU eviction. + + Files are deterministically routed to shards via hash(path) % num_shards. + Search queries all shards in parallel and merges results via RRF fusion. + At most max_loaded_shards are kept in memory; least-recently-used shards + are unloaded when the limit is exceeded. 
+ """ + + def __init__( + self, + num_shards: int, + db_path: str | Path, + config: Config, + embedder: BaseEmbedder, + reranker: BaseReranker, + ) -> None: + if num_shards < 1: + raise ValueError("num_shards must be >= 1") + + self._num_shards = num_shards + self._db_path = Path(db_path).resolve() + self._config = config + self._embedder = embedder + self._reranker = reranker + self._max_loaded = config.max_loaded_shards + + # Create all Shard objects (lazy-loaded, no I/O yet) + self._shards: dict[int, Shard] = { + i: Shard(i, self._db_path, config) + for i in range(num_shards) + } + + # LRU tracking: keys are shard_ids, most-recently-used at end + self._loaded_order: OrderedDict[int, None] = OrderedDict() + self._lru_lock = threading.Lock() + + @property + def num_shards(self) -> int: + return self._num_shards + + def route_file(self, path: str) -> int: + """Deterministically route a file path to a shard ID. + + Uses hash(path) % num_shards for uniform distribution. + """ + return hash(path) % self._num_shards + + def get_shard(self, shard_id: int) -> Shard: + """Return the Shard instance for a given shard_id.""" + if shard_id not in self._shards: + raise ValueError( + f"Invalid shard_id {shard_id}, valid range: 0-{self._num_shards - 1}" + ) + return self._shards[shard_id] + + def _ensure_loaded(self, shard_id: int) -> Shard: + """Load a shard if needed, applying LRU eviction policy. + + Thread-safe: protects OrderedDict mutations with a lock. + Returns the loaded Shard. 
+ """ + shard = self._shards[shard_id] + + with self._lru_lock: + # Mark as most-recently-used + if shard_id in self._loaded_order: + self._loaded_order.move_to_end(shard_id) + else: + self._loaded_order[shard_id] = None + + # Load if not already loaded + if not shard.is_loaded: + shard.load(self._embedder, self._reranker) + + # Evict LRU shards if over limit + while len(self._loaded_order) > self._max_loaded: + evict_id, _ = self._loaded_order.popitem(last=False) + evict_shard = self._shards[evict_id] + if evict_shard.is_loaded: + logger.info("LRU evicting shard %d", evict_id) + evict_shard.unload() + + return shard + + def sync( + self, + files: list[Path], + root: Path | None = None, + **kwargs: object, + ) -> IndexStats: + """Sync index with files, routing each file to its shard. + + Groups files by shard via route_file(), then syncs each shard + with its subset of files. + + Args: + files: Current list of files to index. + root: Root directory for relative paths. + **kwargs: Forwarded to Shard.sync(). + + Returns: + Aggregated IndexStats across all shards. 
+ """ + # Group files by shard + shard_files: dict[int, list[Path]] = {i: [] for i in range(self._num_shards)} + for fpath in files: + rel = str(fpath.relative_to(root)) if root else str(fpath) + shard_id = self.route_file(rel) + shard_files[shard_id].append(fpath) + + total_files = 0 + total_chunks = 0 + total_duration = 0.0 + + for shard_id, shard_file_list in shard_files.items(): + if not shard_file_list: + continue + self._ensure_loaded(shard_id) + shard = self._shards[shard_id] + stats = shard.sync( + shard_file_list, + root=root, + embedder=self._embedder, + reranker=self._reranker, + **kwargs, + ) + total_files += stats.files_processed + total_chunks += stats.chunks_created + total_duration += stats.duration_seconds + + return IndexStats( + files_processed=total_files, + chunks_created=total_chunks, + duration_seconds=round(total_duration, 2), + ) + + def search( + self, + query: str, + quality: str | None = None, + top_k: int | None = None, + ) -> list[SearchResult]: + """Search all shards in parallel, merge results via RRF fusion. + + Each shard returns its own ranked results. Cross-shard merging + uses reciprocal_rank_fusion with equal weights across shards. + Per-shard top_k is increased to compensate for cross-shard dilution. + + Args: + query: Search query string. + quality: Search quality tier. + top_k: Maximum final results to return. + + Returns: + Merged list of SearchResult ordered by relevance. 
+ """ + cfg = self._config + final_top_k = top_k if top_k is not None else cfg.reranker_top_k + + # Increase per-shard top_k to get enough candidates for cross-shard RRF + per_shard_top_k = max(final_top_k, final_top_k * 2) + + # Load all shards for search + for shard_id in range(self._num_shards): + self._ensure_loaded(shard_id) + + # Parallel search across shards + shard_results: dict[int, list[SearchResult]] = {} + + def _search_shard(sid: int) -> tuple[int, list[SearchResult]]: + shard = self._shards[sid] + results = shard.search( + query, + embedder=self._embedder, + reranker=self._reranker, + quality=quality, + top_k=per_shard_top_k, + ) + return sid, results + + with ThreadPoolExecutor(max_workers=min(self._num_shards, 4)) as pool: + futures = [pool.submit(_search_shard, sid) for sid in range(self._num_shards)] + for future in futures: + try: + sid, results = future.result() + shard_results[sid] = results + except Exception: + logger.warning("Shard search failed", exc_info=True) + + # If only one shard returned results, no merging needed + non_empty = {k: v for k, v in shard_results.items() if v} + if not non_empty: + return [] + if len(non_empty) == 1: + results = list(non_empty.values())[0] + return results[:final_top_k] + + # Cross-shard RRF merge + # Build ranked lists keyed by shard name, with (doc_id, score) tuples + # Use a global result map to look up SearchResult by a unique key + # Since doc_ids are shard-local, we need a composite key + rrf_input: dict[str, list[tuple[int, float]]] = {} + global_results: dict[int, SearchResult] = {} + global_id = 0 + + for sid, results in non_empty.items(): + ranked: list[tuple[int, float]] = [] + for r in results: + global_results[global_id] = r + ranked.append((global_id, r.score)) + global_id += 1 + rrf_input[f"shard_{sid}"] = ranked + + fused = reciprocal_rank_fusion(rrf_input, k=cfg.fusion_k) + + merged: list[SearchResult] = [] + for gid, fused_score in fused[:final_top_k]: + result = global_results[gid] + 
merged.append(SearchResult( + id=result.id, + path=result.path, + score=fused_score, + snippet=result.snippet, + line=result.line, + end_line=result.end_line, + content=result.content, + )) + + return merged diff --git a/codex-lens-v2/src/codexlens_search/indexing/metadata.py b/codex-lens-v2/src/codexlens_search/indexing/metadata.py index 3d963631..c10b4b96 100644 --- a/codex-lens-v2/src/codexlens_search/indexing/metadata.py +++ b/codex-lens-v2/src/codexlens_search/indexing/metadata.py @@ -2,6 +2,7 @@ from __future__ import annotations import sqlite3 +import time from pathlib import Path @@ -9,7 +10,8 @@ class MetadataStore: """Tracks file-to-chunk mappings and deleted chunk IDs (tombstones). Tables: - files - file_path (PK), content_hash, last_modified + files - file_path (PK), content_hash, last_modified, file_size, + tier ('hot'/'warm'/'cold'), last_accessed (epoch float) chunks - chunk_id (PK), file_path (FK CASCADE), chunk_hash deleted_chunks - chunk_id (PK) for tombstone tracking """ @@ -19,13 +21,18 @@ class MetadataStore: self._conn.execute("PRAGMA foreign_keys = ON") self._conn.execute("PRAGMA journal_mode = WAL") self._create_tables() + self._migrate_size_column() + self._migrate_tier_columns() def _create_tables(self) -> None: self._conn.executescript(""" CREATE TABLE IF NOT EXISTS files ( file_path TEXT PRIMARY KEY, content_hash TEXT NOT NULL, - last_modified REAL NOT NULL + last_modified REAL NOT NULL, + file_size INTEGER NOT NULL DEFAULT 0, + tier TEXT NOT NULL DEFAULT 'warm', + last_accessed REAL ); CREATE TABLE IF NOT EXISTS chunks ( @@ -41,14 +48,48 @@ class MetadataStore: """) self._conn.commit() + def _migrate_size_column(self) -> None: + """Add file_size column if missing (for pre-existing DBs).""" + cols = { + row[1] + for row in self._conn.execute("PRAGMA table_info(files)").fetchall() + } + if "file_size" not in cols: + self._conn.execute( + "ALTER TABLE files ADD COLUMN file_size INTEGER NOT NULL DEFAULT 0" + ) + self._conn.commit() + + def 
_migrate_tier_columns(self) -> None:
+        """Add tier and last_accessed columns if missing (for pre-existing DBs)."""
+        cols = {
+            row[1]
+            for row in self._conn.execute("PRAGMA table_info(files)").fetchall()
+        }
+        if "tier" not in cols:
+            self._conn.execute(
+                "ALTER TABLE files ADD COLUMN tier TEXT NOT NULL DEFAULT 'warm'"
+            )
+        if "last_accessed" not in cols:
+            self._conn.execute(
+                "ALTER TABLE files ADD COLUMN last_accessed REAL"
+            )
+        if "tier" not in cols or "last_accessed" not in cols:
+            self._conn.commit()
+
     def register_file(
-        self, file_path: str, content_hash: str, mtime: float
+        self,
+        file_path: str,
+        content_hash: str,
+        mtime: float,
+        file_size: int = 0,
     ) -> None:
         """Insert or update a file record."""
         self._conn.execute(
-            "INSERT OR REPLACE INTO files (file_path, content_hash, last_modified) "
-            "VALUES (?, ?, ?)",
-            (file_path, content_hash, mtime),
+            "INSERT INTO files (file_path, content_hash, last_modified, file_size) "
+            "VALUES (?, ?, ?, ?) ON CONFLICT(file_path) DO UPDATE SET "
+            "content_hash=excluded.content_hash, last_modified=excluded.last_modified, file_size=excluded.file_size",
+            (file_path, content_hash, mtime, file_size),
         )
         self._conn.commit()
 
@@ -121,6 +162,24 @@ class MetadataStore:
             return True  # New file
         return stored != content_hash
 
+    def file_needs_update_fast(
+        self, file_path: str, mtime: float, size: int
+    ) -> bool:
+        """Fast pre-check using mtime and file size (no content read needed).
+
+        Returns True if the file appears changed or is not yet tracked.
+        When mtime and size both match stored values, the file is assumed
+        unchanged (~1000x faster than content-hash comparison).
+        """
+        row = self._conn.execute(
+            "SELECT last_modified, file_size FROM files WHERE file_path = ?",
+            (file_path,),
+        ).fetchone()
+        if row is None:
+            return True  # New file
+        stored_mtime, stored_size = row
+        return stored_mtime != mtime or stored_size != size
+
     def compact_deleted(self) -> set[int]:
         """Return deleted IDs and clear the deleted_chunks table.
@@ -161,5 +220,81 @@ class MetadataStore: ).fetchone() return row[0] if row[0] is not None else -1 + # ------------------------------------------------------------------ + # Tier management + # ------------------------------------------------------------------ + + def record_access(self, file_path: str) -> None: + """Update last_accessed timestamp for a file.""" + self._conn.execute( + "UPDATE files SET last_accessed = ? WHERE file_path = ?", + (time.time(), file_path), + ) + self._conn.commit() + + def record_access_batch(self, file_paths: list[str]) -> None: + """Batch-update last_accessed timestamps for multiple files.""" + if not file_paths: + return + now = time.time() + self._conn.executemany( + "UPDATE files SET last_accessed = ? WHERE file_path = ?", + [(now, fp) for fp in file_paths], + ) + self._conn.commit() + + def classify_tiers( + self, hot_threshold_hours: int = 24, cold_threshold_hours: int = 168 + ) -> None: + """Reclassify all files into hot/warm/cold tiers based on last_accessed. + + - hot: last_accessed within hot_threshold_hours + - cold: last_accessed older than cold_threshold_hours (or never accessed) + - warm: everything in between + """ + now = time.time() + hot_cutoff = now - hot_threshold_hours * 3600 + cold_cutoff = now - cold_threshold_hours * 3600 + + # Hot: recently accessed + self._conn.execute( + "UPDATE files SET tier = 'hot' " + "WHERE last_accessed IS NOT NULL AND last_accessed >= ?", + (hot_cutoff,), + ) + # Cold: not accessed for a long time, or never accessed + self._conn.execute( + "UPDATE files SET tier = 'cold' " + "WHERE last_accessed IS NULL " + "OR (last_accessed < ? AND last_accessed < ?)", + (cold_cutoff, hot_cutoff), + ) + # Warm: between hot and cold cutoffs + self._conn.execute( + "UPDATE files SET tier = 'warm' " + "WHERE last_accessed IS NOT NULL " + "AND last_accessed >= ? 
AND last_accessed < ?", + (cold_cutoff, hot_cutoff), + ) + self._conn.commit() + + def get_files_by_tier(self, tier: str) -> list[str]: + """Return file paths in the specified tier ('hot', 'warm', or 'cold').""" + rows = self._conn.execute( + "SELECT file_path FROM files WHERE tier = ?", (tier,) + ).fetchall() + return [r[0] for r in rows] + + def get_cold_files(self) -> list[str]: + """Return file paths in the 'cold' tier.""" + return self.get_files_by_tier("cold") + + def get_file_tier(self, file_path: str) -> str | None: + """Return the tier for a specific file, or None if not tracked.""" + row = self._conn.execute( + "SELECT tier FROM files WHERE file_path = ?", (file_path,) + ).fetchone() + return row[0] if row else None + def close(self) -> None: self._conn.close() diff --git a/codex-lens-v2/src/codexlens_search/indexing/pipeline.py b/codex-lens-v2/src/codexlens_search/indexing/pipeline.py index b38ad96d..64172c23 100644 --- a/codex-lens-v2/src/codexlens_search/indexing/pipeline.py +++ b/codex-lens-v2/src/codexlens_search/indexing/pipeline.py @@ -17,8 +17,7 @@ from pathlib import Path import numpy as np from codexlens_search.config import Config -from codexlens_search.core.binary import BinaryStore -from codexlens_search.core.index import ANNIndex +from codexlens_search.core.base import BaseANNIndex, BaseBinaryIndex from codexlens_search.embed.base import BaseEmbedder from codexlens_search.indexing.metadata import MetadataStore from codexlens_search.search.fts import FTSEngine @@ -100,8 +99,8 @@ class IndexingPipeline: def __init__( self, embedder: BaseEmbedder, - binary_store: BinaryStore, - ann_index: ANNIndex, + binary_store: BaseBinaryIndex, + ann_index: BaseANNIndex, fts: FTSEngine, config: Config, metadata: MetadataStore | None = None, @@ -463,6 +462,94 @@ class IndexingPipeline: meta = self._require_metadata() return meta.max_chunk_id() + 1 + def index_files_fts_only( + self, + files: list[Path], + *, + root: Path | None = None, + max_chunk_chars: int 
= _DEFAULT_MAX_CHUNK_CHARS, + chunk_overlap: int = _DEFAULT_CHUNK_OVERLAP, + ) -> IndexStats: + """Index files into FTS5 only, without embedding or vector indexing. + + Chunks files using the same logic as the full pipeline, then inserts + directly into FTS. No embedding computation, no binary/ANN store writes. + + Args: + files: List of file paths to index. + root: Optional root for computing relative paths. + max_chunk_chars: Maximum characters per chunk. + chunk_overlap: Character overlap between consecutive chunks. + + Returns: + IndexStats with counts and timing. + """ + if not files: + return IndexStats() + + meta = self._require_metadata() + t0 = time.monotonic() + chunk_id = self._next_chunk_id() + files_processed = 0 + chunks_created = 0 + + for fpath in files: + exclude_reason = is_file_excluded(fpath, self._config) + if exclude_reason: + logger.debug("Skipping %s: %s", fpath, exclude_reason) + continue + try: + text = fpath.read_text(encoding="utf-8", errors="replace") + except Exception as exc: + logger.debug("Skipping %s: %s", fpath, exc) + continue + + rel_path = str(fpath.relative_to(root)) if root else str(fpath) + content_hash = self._content_hash(text) + + # Skip unchanged files + if not meta.file_needs_update(rel_path, content_hash): + continue + + # Remove old FTS data if file was previously indexed + if meta.get_file_hash(rel_path) is not None: + meta.mark_file_deleted(rel_path) + self._fts.delete_by_path(rel_path) + + file_chunks = self._smart_chunk(text, rel_path, max_chunk_chars, chunk_overlap) + if not file_chunks: + st = fpath.stat() + meta.register_file(rel_path, content_hash, st.st_mtime, st.st_size) + continue + + files_processed += 1 + fts_docs = [] + chunk_id_hashes = [] + for chunk_text, path, sl, el in file_chunks: + fts_docs.append((chunk_id, path, chunk_text, sl, el)) + chunk_id_hashes.append((chunk_id, self._content_hash(chunk_text))) + chunk_id += 1 + + self._fts.add_documents(fts_docs) + chunks_created += len(fts_docs) + + # 
Register metadata + st = fpath.stat() + meta.register_file(rel_path, content_hash, st.st_mtime, st.st_size) + meta.register_chunks(rel_path, chunk_id_hashes) + + duration = time.monotonic() - t0 + stats = IndexStats( + files_processed=files_processed, + chunks_created=chunks_created, + duration_seconds=round(duration, 2), + ) + logger.info( + "FTS-only indexing complete: %d files, %d chunks in %.1fs", + stats.files_processed, stats.chunks_created, stats.duration_seconds, + ) + return stats + def index_file( self, file_path: Path, @@ -522,7 +609,8 @@ class IndexingPipeline: file_chunks = self._smart_chunk(text, rel_path, max_chunk_chars, chunk_overlap) if not file_chunks: # Register file with no chunks - meta.register_file(rel_path, content_hash, file_path.stat().st_mtime) + st = file_path.stat() + meta.register_file(rel_path, content_hash, st.st_mtime, st.st_size) return IndexStats( files_processed=1, duration_seconds=round(time.monotonic() - t0, 2), @@ -556,7 +644,8 @@ class IndexingPipeline: self._fts.add_documents(fts_docs) # Register in metadata - meta.register_file(rel_path, content_hash, file_path.stat().st_mtime) + st = file_path.stat() + meta.register_file(rel_path, content_hash, st.st_mtime, st.st_size) chunk_id_hashes = [ (batch_ids[i], self._content_hash(batch_texts[i])) for i in range(len(batch_ids)) @@ -605,6 +694,7 @@ class IndexingPipeline: chunk_overlap: int = _DEFAULT_CHUNK_OVERLAP, max_file_size: int = 50_000, progress_callback: callable | None = None, + tier: str = "full", ) -> IndexStats: """Reconcile index state against a current file list. @@ -617,6 +707,9 @@ class IndexingPipeline: max_chunk_chars: Maximum characters per chunk. chunk_overlap: Character overlap between consecutive chunks. max_file_size: Skip files larger than this (bytes). + tier: Indexing tier - 'full' (default) runs the full pipeline + with embedding, 'fts_only' runs FTS-only indexing without + embedding or vector stores. Returns: Aggregated IndexStats for all operations. 
@@ -638,33 +731,72 @@ class IndexingPipeline: for rel in removed: self.remove_file(rel) - # Collect files needing update + # Collect files needing update using 4-level detection: + # Level 1: set diff (removed files) - handled above + # Level 2: mtime + size fast pre-check via stat() + # Level 3: content hash only when mtime/size mismatch files_to_index: list[Path] = [] for rel, fpath in current_rel_paths.items(): + # Level 2: stat-based fast check + try: + st = fpath.stat() + except OSError: + continue + if not meta.file_needs_update_fast(rel, st.st_mtime, st.st_size): + # mtime + size match stored values -> skip (no read needed) + continue + + # Level 3: mtime/size changed -> verify with content hash try: text = fpath.read_text(encoding="utf-8", errors="replace") except Exception: continue content_hash = self._content_hash(text) - if meta.file_needs_update(rel, content_hash): - # Remove old data if previously indexed - if meta.get_file_hash(rel) is not None: - meta.mark_file_deleted(rel) - self._fts.delete_by_path(rel) - files_to_index.append(fpath) + if not meta.file_needs_update(rel, content_hash): + # Content unchanged despite mtime/size change -> update metadata only + meta.register_file(rel, content_hash, st.st_mtime, st.st_size) + continue - # Batch index via parallel pipeline + # File genuinely changed -> remove old data and queue for re-index + if meta.get_file_hash(rel) is not None: + meta.mark_file_deleted(rel) + self._fts.delete_by_path(rel) + files_to_index.append(fpath) + + # Sort files by data tier priority: hot first, then warm, then cold if files_to_index: - # Set starting chunk ID from metadata - start_id = self._next_chunk_id() - batch_stats = self._index_files_with_metadata( - files_to_index, - root=root, - max_chunk_chars=max_chunk_chars, - chunk_overlap=chunk_overlap, - start_chunk_id=start_id, - progress_callback=progress_callback, - ) + _tier_priority = {"hot": 0, "warm": 1, "cold": 2} + def _tier_sort_key(fp: Path) -> int: + rel = 
str(fp.relative_to(root)) if root else str(fp) + t = meta.get_file_tier(rel) + return _tier_priority.get(t or "warm", 1) + files_to_index.sort(key=_tier_sort_key) + + # Reclassify data tiers after sync detection + meta.classify_tiers( + self._config.tier_hot_hours, self._config.tier_cold_hours + ) + + # Batch index via parallel pipeline or FTS-only + if files_to_index: + if tier == "fts_only": + batch_stats = self.index_files_fts_only( + files_to_index, + root=root, + max_chunk_chars=max_chunk_chars, + chunk_overlap=chunk_overlap, + ) + else: + # Full pipeline with embedding + start_id = self._next_chunk_id() + batch_stats = self._index_files_with_metadata( + files_to_index, + root=root, + max_chunk_chars=max_chunk_chars, + chunk_overlap=chunk_overlap, + start_chunk_id=start_id, + progress_callback=progress_callback, + ) total_files = batch_stats.files_processed total_chunks = batch_stats.chunks_created else: @@ -781,7 +913,8 @@ class IndexingPipeline: file_chunks = self._smart_chunk(text, rel_path, max_chunk_chars, chunk_overlap) if not file_chunks: - meta.register_file(rel_path, content_hash, fpath.stat().st_mtime) + st = fpath.stat() + meta.register_file(rel_path, content_hash, st.st_mtime, st.st_size) continue files_processed += 1 @@ -806,7 +939,8 @@ class IndexingPipeline: chunks_created += len(file_chunk_ids) # Register metadata per file - meta.register_file(rel_path, content_hash, fpath.stat().st_mtime) + st = fpath.stat() + meta.register_file(rel_path, content_hash, st.st_mtime, st.st_size) chunk_id_hashes = [ (cid, self._content_hash(ct)) for cid, ct in file_chunk_ids ] diff --git a/codex-lens-v2/src/codexlens_search/mcp_server.py b/codex-lens-v2/src/codexlens_search/mcp_server.py index 37ac6df6..b5448960 100644 --- a/codex-lens-v2/src/codexlens_search/mcp_server.py +++ b/codex-lens-v2/src/codexlens_search/mcp_server.py @@ -102,13 +102,20 @@ def _get_pipelines(project_path: str) -> tuple: # 
--------------------------------------------------------------------------- @mcp.tool() -def search_code(project_path: str, query: str, top_k: int = 10) -> str: +def search_code( + project_path: str, query: str, top_k: int = 10, quality: str = "auto" +) -> str: """Semantic code search with hybrid fusion (vector + FTS + reranking). Args: project_path: Absolute path to the project root directory. query: Natural language or code search query. top_k: Maximum number of results to return (default 10). + quality: Search quality tier (default "auto"): + - "fast": FTS-only + rerank (no embedding needed, fastest) + - "balanced": FTS + binary coarse search + rerank + - "thorough": Full 2-stage vector + FTS + reranking (best quality) + - "auto": Uses "thorough" if vector index exists, else "fast" Returns: Search results as formatted text with file paths, line numbers, scores, and code snippets. @@ -121,15 +128,75 @@ def search_code(project_path: str, query: str, top_k: int = 10) -> str: if not (db_path / "metadata.db").exists(): return f"Error: no index found at {db_path}. Run index_project first." + valid_qualities = ("fast", "balanced", "thorough", "auto") + if quality not in valid_qualities: + return f"Error: invalid quality '{quality}'. Must be one of: {', '.join(valid_qualities)}" + _, search, _ = _get_pipelines(project_path) - results = search.search(query, top_k=top_k) + results = search.search(query, top_k=top_k, quality=quality) if not results: return "No results found." 
lines = [] for i, r in enumerate(results, 1): - lines.append(f"## Result {i} — {r.path} (L{r.line}-{r.end_line}, score: {r.score:.4f})") + lines.append(f"## Result {i} -- {r.path} (L{r.line}-{r.end_line}, score: {r.score:.4f})") + lines.append(f"```\n{r.content}\n```") + lines.append("") + return "\n".join(lines) + + +@mcp.tool() +def search_scope( + project_path: str, + query: str, + scope_path: str, + top_k: int = 10, + quality: str = "auto", +) -> str: + """Search within a specific directory scope of a project. + + Runs a normal search then filters results to only include files + under the specified scope path. + + Args: + project_path: Absolute path to the project root directory. + query: Natural language or code search query. + scope_path: Relative directory path to limit search scope (e.g. "src/auth"). + top_k: Maximum number of scoped results to return (default 10). + quality: Search quality tier ("fast", "balanced", "thorough", "auto"). + + Returns: + Search results filtered to the scope path. + """ + root = Path(project_path).resolve() + if not root.is_dir(): + return f"Error: project path not found: {root}" + + db_path = _db_path_for_project(project_path) + if not (db_path / "metadata.db").exists(): + return f"Error: no index found at {db_path}. Run index_project first." + + # Normalize scope path for prefix matching + scope = scope_path.replace("\\", "/").strip("/") + + _, search, _ = _get_pipelines(project_path) + # Fetch more results than top_k to account for filtering + all_results = search.search(query, top_k=top_k * 5, quality=quality) + + # Filter by scope path prefix + scoped = [ + r for r in all_results + if r.path.replace("\\", "/").startswith(scope + "/") + or r.path.replace("\\", "/") == scope + ] + + if not scoped: + return f"No results found in scope '{scope_path}'." 
+ + lines = [] + for i, r in enumerate(scoped[:top_k], 1): + lines.append(f"## Result {i} -- {r.path} (L{r.line}-{r.end_line}, score: {r.score:.4f})") lines.append(f"```\n{r.content}\n```") lines.append("") return "\n".join(lines) @@ -275,6 +342,59 @@ async def index_update( ) +@mcp.tool() +def index_scope( + project_path: str, + scope_path: str, + glob_pattern: str = "**/*", + tier: str = "full", +) -> str: + """Index a specific directory scope within a project. + + Useful for quickly indexing a subdirectory (e.g. after editing files + in a specific module) without re-indexing the entire project. + + Args: + project_path: Absolute path to the project root directory. + scope_path: Relative directory path to index (e.g. "src/auth"). + glob_pattern: Glob pattern for files within scope (default "**/*"). + tier: Indexing tier - "full" (default) runs full pipeline with + embedding, "fts_only" indexes text only (faster, no vectors). + + Returns: + Indexing summary for the scoped directory. + """ + root = Path(project_path).resolve() + if not root.is_dir(): + return f"Error: project path not found: {root}" + + scope_dir = root / scope_path + if not scope_dir.is_dir(): + return f"Error: scope directory not found: {scope_dir}" + + valid_tiers = ("full", "fts_only") + if tier not in valid_tiers: + return f"Error: invalid tier '{tier}'. Must be one of: {', '.join(valid_tiers)}" + + indexing, _, _ = _get_pipelines(project_path) + + file_paths = [ + p for p in scope_dir.glob(glob_pattern) + if p.is_file() and not should_exclude(p.relative_to(root), DEFAULT_EXCLUDES) + ] + + if not file_paths: + return f"No files found in {scope_path} matching '{glob_pattern}'." + + stats = indexing.sync(file_paths, root=root, tier=tier) + tier_label = "FTS-only" if tier == "fts_only" else "full" + return ( + f"Indexed {stats.files_processed} files ({tier_label}), " + f"{stats.chunks_created} chunks in {stats.duration_seconds:.1f}s. 
" + f"Scope: {scope_path}" + ) + + # --------------------------------------------------------------------------- # File discovery # --------------------------------------------------------------------------- diff --git a/codex-lens-v2/src/codexlens_search/search/pipeline.py b/codex-lens-v2/src/codexlens_search/search/pipeline.py index b766779c..0331a11f 100644 --- a/codex-lens-v2/src/codexlens_search/search/pipeline.py +++ b/codex-lens-v2/src/codexlens_search/search/pipeline.py @@ -7,7 +7,7 @@ from dataclasses import dataclass import numpy as np from ..config import Config -from ..core import ANNIndex, BinaryStore +from ..core.base import BaseANNIndex, BaseBinaryIndex from ..embed import BaseEmbedder from ..indexing.metadata import MetadataStore from ..rerank import BaseReranker @@ -21,6 +21,8 @@ from .fusion import ( _log = logging.getLogger(__name__) +_VALID_QUALITIES = ("fast", "balanced", "thorough", "auto") + @dataclass class SearchResult: @@ -37,8 +39,8 @@ class SearchPipeline: def __init__( self, embedder: BaseEmbedder, - binary_store: BinaryStore, - ann_index: ANNIndex, + binary_store: BaseBinaryIndex, + ann_index: BaseANNIndex, reranker: BaseReranker, fts: FTSEngine, config: Config, @@ -52,6 +54,15 @@ class SearchPipeline: self._config = config self._metadata_store = metadata_store + # -- Helper: check if vector index has data ---------------------------- + + def _has_vector_index(self) -> bool: + """Check if the binary store has any indexed entries.""" + try: + return len(self._binary_store) > 0 + except Exception: + return False + # -- Helper: vector search (binary coarse + ANN fine) ----------------- def _vector_search( @@ -84,6 +95,21 @@ class SearchPipeline: ] return vector_results + # -- Helper: binary coarse search only -------------------------------- + + def _binary_coarse_search( + self, query_vec: np.ndarray + ) -> list[tuple[int, float]]: + """Run binary coarse search only (no ANN fine search).""" + cfg = self._config + candidate_ids, distances 
= self._binary_store.coarse_search( + query_vec, top_k=cfg.binary_top_k + ) + return [ + (int(doc_id), float(dist)) + for doc_id, dist in zip(candidate_ids, distances) + ] + # -- Helper: FTS search (exact + fuzzy) ------------------------------ def _fts_search( @@ -95,55 +121,12 @@ class SearchPipeline: fuzzy_results = self._fts.fuzzy_search(query, top_k=cfg.fts_top_k) return exact_results, fuzzy_results - # -- Main search entry point ----------------------------------------- + # -- Helper: filter deleted IDs --------------------------------------- - def search(self, query: str, top_k: int | None = None) -> list[SearchResult]: - cfg = self._config - final_top_k = top_k if top_k is not None else cfg.reranker_top_k - - # 1. Detect intent -> adaptive weights - intent = detect_query_intent(query) - weights = get_adaptive_weights(intent, cfg.fusion_weights) - - # 2. Embed query - query_vec = self._embedder.embed_single(query) - - # 3. Parallel vector + FTS search - vector_results: list[tuple[int, float]] = [] - exact_results: list[tuple[int, float]] = [] - fuzzy_results: list[tuple[int, float]] = [] - - with ThreadPoolExecutor(max_workers=2) as pool: - vec_future = pool.submit(self._vector_search, query_vec) - fts_future = pool.submit(self._fts_search, query) - - # Collect vector results - try: - vector_results = vec_future.result() - except Exception: - _log.warning("Vector search failed, using empty results", exc_info=True) - - # Collect FTS results - try: - exact_results, fuzzy_results = fts_future.result() - except Exception: - _log.warning("FTS search failed, using empty results", exc_info=True) - - # 4. 
RRF fusion - fusion_input: dict[str, list[tuple[int, float]]] = {} - if vector_results: - fusion_input["vector"] = vector_results - if exact_results: - fusion_input["exact"] = exact_results - if fuzzy_results: - fusion_input["fuzzy"] = fuzzy_results - - if not fusion_input: - return [] - - fused = reciprocal_rank_fusion(fusion_input, weights=weights, k=cfg.fusion_k) - - # 4b. Filter out deleted IDs (tombstone filtering) + def _filter_deleted( + self, fused: list[tuple[int, float]] + ) -> list[tuple[int, float]]: + """Remove tombstoned chunk IDs from results.""" if self._metadata_store is not None: deleted_ids = self._metadata_store.get_deleted_ids() if deleted_ids: @@ -152,16 +135,30 @@ class SearchPipeline: for doc_id, score in fused if doc_id not in deleted_ids ] + return fused - # 5. Rerank top candidates - rerank_ids = [doc_id for doc_id, _ in fused[:50]] - contents = [self._fts.get_content(doc_id) for doc_id in rerank_ids] - rerank_scores = self._reranker.score_pairs(query, contents) + # -- Helper: rerank and build results --------------------------------- - # 6. 
Sort by rerank score, build SearchResult list - ranked = sorted( - zip(rerank_ids, rerank_scores), key=lambda x: x[1], reverse=True - ) + def _rerank_and_build( + self, + query: str, + fused: list[tuple[int, float]], + final_top_k: int, + use_reranker: bool = True, + ) -> list[SearchResult]: + """Rerank candidates (optionally) and build SearchResult list.""" + if not fused: + return [] + + if use_reranker: + rerank_ids = [doc_id for doc_id, _ in fused[:50]] + contents = [self._fts.get_content(doc_id) for doc_id in rerank_ids] + rerank_scores = self._reranker.score_pairs(query, contents) + ranked = sorted( + zip(rerank_ids, rerank_scores), key=lambda x: x[1], reverse=True + ) + else: + ranked = fused results: list[SearchResult] = [] for doc_id, score in ranked[:final_top_k]: @@ -179,3 +176,178 @@ class SearchPipeline: ) ) return results + + # -- Helper: record access for tier tracking -------------------------- + + def _record_access(self, results: list[SearchResult]) -> None: + """Record file access for data tier tracking.""" + if results and self._metadata_store is not None: + unique_paths = list({r.path for r in results}) + try: + self._metadata_store.record_access_batch(unique_paths) + except Exception: + _log.debug("Failed to record access for tier tracking", exc_info=True) + + # -- Quality-routed search methods ------------------------------------ + + def _search_fast( + self, query: str, final_top_k: int + ) -> list[SearchResult]: + """FTS-only search with reranking. 
No embedding needed.""" + exact_results, fuzzy_results = self._fts_search(query) + + fusion_input: dict[str, list[tuple[int, float]]] = {} + if exact_results: + fusion_input["exact"] = exact_results + if fuzzy_results: + fusion_input["fuzzy"] = fuzzy_results + + if not fusion_input: + return [] + + fused = reciprocal_rank_fusion( + fusion_input, weights={"exact": 0.7, "fuzzy": 0.3}, + k=self._config.fusion_k, + ) + fused = self._filter_deleted(fused) + return self._rerank_and_build(query, fused, final_top_k, use_reranker=True) + + def _search_balanced( + self, query: str, final_top_k: int + ) -> list[SearchResult]: + """FTS + binary coarse search with RRF fusion and reranking. + + Embeds the query for binary coarse search but skips ANN fine search. + """ + intent = detect_query_intent(query) + weights = get_adaptive_weights(intent, self._config.fusion_weights) + + query_vec = self._embedder.embed_single(query) + + # Parallel: binary coarse + FTS + coarse_results: list[tuple[int, float]] = [] + exact_results: list[tuple[int, float]] = [] + fuzzy_results: list[tuple[int, float]] = [] + + with ThreadPoolExecutor(max_workers=2) as pool: + coarse_future = pool.submit(self._binary_coarse_search, query_vec) + fts_future = pool.submit(self._fts_search, query) + + try: + coarse_results = coarse_future.result() + except Exception: + _log.warning("Binary coarse search failed", exc_info=True) + + try: + exact_results, fuzzy_results = fts_future.result() + except Exception: + _log.warning("FTS search failed", exc_info=True) + + fusion_input: dict[str, list[tuple[int, float]]] = {} + if coarse_results: + fusion_input["vector"] = coarse_results + if exact_results: + fusion_input["exact"] = exact_results + if fuzzy_results: + fusion_input["fuzzy"] = fuzzy_results + + if not fusion_input: + return [] + + fused = reciprocal_rank_fusion(fusion_input, weights=weights, k=self._config.fusion_k) + fused = self._filter_deleted(fused) + return self._rerank_and_build(query, fused, 
final_top_k, use_reranker=True) + + def _search_thorough( + self, query: str, final_top_k: int + ) -> list[SearchResult]: + """Full 2-stage vector + FTS + reranking pipeline (original behavior).""" + cfg = self._config + + intent = detect_query_intent(query) + weights = get_adaptive_weights(intent, cfg.fusion_weights) + + query_vec = self._embedder.embed_single(query) + + # Parallel vector + FTS search + vector_results: list[tuple[int, float]] = [] + exact_results: list[tuple[int, float]] = [] + fuzzy_results: list[tuple[int, float]] = [] + + with ThreadPoolExecutor(max_workers=2) as pool: + vec_future = pool.submit(self._vector_search, query_vec) + fts_future = pool.submit(self._fts_search, query) + + try: + vector_results = vec_future.result() + except Exception: + _log.warning("Vector search failed, using empty results", exc_info=True) + + try: + exact_results, fuzzy_results = fts_future.result() + except Exception: + _log.warning("FTS search failed, using empty results", exc_info=True) + + fusion_input: dict[str, list[tuple[int, float]]] = {} + if vector_results: + fusion_input["vector"] = vector_results + if exact_results: + fusion_input["exact"] = exact_results + if fuzzy_results: + fusion_input["fuzzy"] = fuzzy_results + + if not fusion_input: + return [] + + fused = reciprocal_rank_fusion(fusion_input, weights=weights, k=cfg.fusion_k) + fused = self._filter_deleted(fused) + return self._rerank_and_build(query, fused, final_top_k, use_reranker=True) + + # -- Main search entry point ----------------------------------------- + + def search( + self, + query: str, + top_k: int | None = None, + quality: str | None = None, + ) -> list[SearchResult]: + """Search with quality-based routing. + + Args: + query: Search query string. + top_k: Maximum results to return. 
+ quality: Search quality tier: + - 'fast': FTS-only + rerank (no embedding, no vector search) + - 'balanced': FTS + binary coarse + rerank (no ANN fine search) + - 'thorough': Full 2-stage vector + FTS + reranking + - 'auto': Selects 'thorough' if vectors exist, else 'fast' + - None: Uses config.default_search_quality + + Returns: + List of SearchResult ordered by relevance. + """ + cfg = self._config + final_top_k = top_k if top_k is not None else cfg.reranker_top_k + + # Resolve quality tier + effective_quality = quality or cfg.default_search_quality + if effective_quality not in _VALID_QUALITIES: + _log.warning( + "Invalid search quality '%s', falling back to 'auto'", + effective_quality, + ) + effective_quality = "auto" + + # Auto-detect: use thorough if vector index has data, else fast + if effective_quality == "auto": + effective_quality = "thorough" if self._has_vector_index() else "fast" + + if effective_quality == "fast": + results = self._search_fast(query, final_top_k) + elif effective_quality == "balanced": + results = self._search_balanced(query, final_top_k) + else: + results = self._search_thorough(query, final_top_k) + + self._record_access(results) + return results diff --git a/codex-lens-v2/src/codexlens_search/watcher/file_watcher.py b/codex-lens-v2/src/codexlens_search/watcher/file_watcher.py index 7fd8473f..20289373 100644 --- a/codex-lens-v2/src/codexlens_search/watcher/file_watcher.py +++ b/codex-lens-v2/src/codexlens_search/watcher/file_watcher.py @@ -20,6 +20,7 @@ from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer from .events import ChangeType, FileEvent, WatcherConfig +from .incremental_indexer import IncrementalIndexer logger = logging.getLogger(__name__) @@ -261,3 +262,24 @@ class FileWatcher: if output: sys.stdout.write(output + "\n") sys.stdout.flush() + + @classmethod + def create_with_indexer( + cls, + root_path: Path, + config: WatcherConfig, + indexer: IncrementalIndexer, + ) -> 
"FileWatcher": + """Create a FileWatcher wired to an IncrementalIndexer's async path. + + Uses ``indexer.process_events_async()`` as the callback so that + events are debounced and batched within the indexer before + processing, preventing redundant per-file pipeline startups. + + Example:: + + indexer = IncrementalIndexer(pipeline, root=root) + watcher = FileWatcher.create_with_indexer(root, config, indexer) + watcher.start() + """ + return cls(root_path, config, indexer.process_events_async) diff --git a/codex-lens-v2/src/codexlens_search/watcher/incremental_indexer.py b/codex-lens-v2/src/codexlens_search/watcher/incremental_indexer.py index d0d3f265..159d891c 100644 --- a/codex-lens-v2/src/codexlens_search/watcher/incremental_indexer.py +++ b/codex-lens-v2/src/codexlens_search/watcher/incremental_indexer.py @@ -4,10 +4,13 @@ Ported from codex-lens v1 with simplifications: - Uses IndexingPipeline.index_file() / remove_file() directly - No v1-specific Config, ParserFactory, DirIndexStore dependencies - Per-file error isolation: one failure does not stop batch processing +- Debounce batching: process_events_async() buffers events and flushes + after a configurable window to prevent redundant per-file pipeline startups """ from __future__ import annotations import logging +import threading from dataclasses import dataclass, field from pathlib import Path from typing import List, Optional @@ -60,6 +63,7 @@ class IncrementalIndexer: pipeline: IndexingPipeline, *, root: Optional[Path] = None, + debounce_window_ms: int = 500, ) -> None: """Initialize the incremental indexer. @@ -67,9 +71,15 @@ class IncrementalIndexer: pipeline: The indexing pipeline with metadata store configured. root: Optional project root for computing relative paths. If None, absolute paths are used as identifiers. + debounce_window_ms: Milliseconds to buffer events before flushing + in process_events_async(). Default 500ms. 
""" self._pipeline = pipeline self._root = root + self._debounce_window_ms = debounce_window_ms + self._event_buffer: List[FileEvent] = [] + self._buffer_lock = threading.Lock() + self._flush_timer: Optional[threading.Timer] = None def process_events(self, events: List[FileEvent]) -> BatchResult: """Process a batch of file events with per-file error isolation. @@ -107,6 +117,52 @@ class IncrementalIndexer: return result + def process_events_async(self, events: List[FileEvent]) -> None: + """Buffer events and flush after the debounce window expires. + + Non-blocking: events are accumulated in an internal buffer. + When no new events arrive within *debounce_window_ms*, the buffer + is flushed and all accumulated events are processed as a single + batch via process_events(). + + Args: + events: List of file events to buffer. + """ + with self._buffer_lock: + self._event_buffer.extend(events) + + # Cancel previous timer and start a new one (true debounce) + if self._flush_timer is not None: + self._flush_timer.cancel() + + self._flush_timer = threading.Timer( + self._debounce_window_ms / 1000.0, + self._flush_buffer, + ) + self._flush_timer.daemon = True + self._flush_timer.start() + + def _flush_buffer(self) -> None: + """Flush the event buffer and process all accumulated events.""" + with self._buffer_lock: + if not self._event_buffer: + return + events = list(self._event_buffer) + self._event_buffer.clear() + self._flush_timer = None + + # Deduplicate: keep the last event per path + seen: dict[Path, FileEvent] = {} + for event in events: + seen[event.path] = event + deduped = list(seen.values()) + + logger.debug( + "Flushing debounce buffer: %d events (%d after dedup)", + len(events), len(deduped), + ) + self.process_events(deduped) + def _handle_index(self, event: FileEvent, result: BatchResult) -> None: """Index a created or modified file.""" stats = self._pipeline.index_file(