From 683409464c28a94d1d590b869999036e03e32fa6 Mon Sep 17 00:00:00 2001 From: cexll Date: Sun, 1 Mar 2026 22:14:16 +0800 Subject: [PATCH] feat: add harness skill with hooks install/uninstall support (#156) Add multi-session autonomous agent harness with progress checkpointing, failure recovery, task dependencies, and post-completion self-reflection. - Add harness module to config.json (copy_dir with hooks.json) - Add 7 hook scripts: stop, sessionstart, teammateidle, subagentstop, claim, renew, self-reflect-stop + shared _harness_common.py - Fix self-reflect-stop: only triggers when harness was initialized (checks harness-tasks.json existence), not on every session - Add unmerge_hooks_from_settings() to uninstall.py for clean hook removal - Add unit tests (57 tests) and E2E test (100 tasks + 5 self-reflect) Generated with SWE-Agent.ai Co-Authored-By: SWE-Agent.ai --- config.json | 12 + skills/harness/SKILL.md | 63 +- skills/harness/hooks/_harness_common.py | 410 ++++++++++ skills/harness/hooks/harness-claim.py | 301 ++++++++ skills/harness/hooks/harness-renew.py | 214 +++++ skills/harness/hooks/harness-sessionstart.py | 186 +++++ skills/harness/hooks/harness-stop.py | 314 ++++++++ skills/harness/hooks/harness-subagentstop.py | 137 ++++ skills/harness/hooks/harness-teammateidle.py | 160 ++++ skills/harness/hooks/hooks.json | 60 ++ skills/harness/hooks/self-reflect-stop.py | 210 +++++ skills/harness/tests/e2e-100tasks.sh | 178 +++++ skills/harness/tests/test_hooks.py | 774 +++++++++++++++++++ uninstall.py | 42 + 14 files changed, 3051 insertions(+), 10 deletions(-) create mode 100644 skills/harness/hooks/_harness_common.py create mode 100755 skills/harness/hooks/harness-claim.py create mode 100755 skills/harness/hooks/harness-renew.py create mode 100755 skills/harness/hooks/harness-sessionstart.py create mode 100755 skills/harness/hooks/harness-stop.py create mode 100755 skills/harness/hooks/harness-subagentstop.py create mode 100755 
skills/harness/hooks/harness-teammateidle.py create mode 100644 skills/harness/hooks/hooks.json create mode 100644 skills/harness/hooks/self-reflect-stop.py create mode 100755 skills/harness/tests/e2e-100tasks.sh create mode 100644 skills/harness/tests/test_hooks.py diff --git a/config.json b/config.json index d8907aa..be6a1d6 100644 --- a/config.json +++ b/config.json @@ -196,6 +196,18 @@ } ] }, + "harness": { + "enabled": false, + "description": "Multi-session autonomous agent harness with progress checkpointing, failure recovery, task dependencies, and post-completion self-reflection", + "operations": [ + { + "type": "copy_dir", + "source": "skills/harness", + "target": "skills/harness", + "description": "Install harness skill with hooks (Stop, SessionStart, TeammateIdle, SubagentStop, self-reflect)" + } + ] + }, "claudekit": { "enabled": false, "description": "ClaudeKit workflow: skills/do + global hooks (pre-bash, inject-spec, log-prompt)", diff --git a/skills/harness/SKILL.md b/skills/harness/SKILL.md index 3ae40cf..4e90acd 100644 --- a/skills/harness/SKILL.md +++ b/skills/harness/SKILL.md @@ -26,6 +26,15 @@ Executable protocol enabling any agent task to run continuously across multiple /harness add "task description" # Add a task to the list ``` +## Activation Marker + +Hooks only take effect when `.harness-active` marker file exists in the harness root (same directory as `harness-tasks.json`). +Hook registration is configured in `hooks/hooks.json`. + +- `/harness init` and `/harness run` MUST create this marker: `touch <harness-root>/.harness-active` +- When all tasks complete (no pending/in_progress/retryable left), remove it: `rm <harness-root>/.harness-active` +- Without this marker, all hooks are no-ops — they exit 0 immediately + ## Progress Persistence (Dual-File System) Maintain two files in the project working directory: @@ -54,6 +63,7 @@ Free-text log of all agent actions across sessions. Never truncate. 
"version": 2, "created": "2025-07-01T10:00:00Z", "session_config": { + "concurrency_mode": "exclusive", "max_tasks_per_session": 20, "max_sessions": 50 }, @@ -126,6 +136,8 @@ Free-text log of all agent actions across sessions. Never truncate. Task statuses: `pending` → `in_progress` (transient, set only during active execution) → `completed` or `failed`. A task found as `in_progress` at session start means the previous session was interrupted — handle via Context Window Recovery Protocol. +In concurrent mode (see Concurrency Control), tasks may also carry claim metadata: `claimed_by` and `lease_expires_at` (ISO timestamp). + **Session boundary**: A session starts when the agent begins executing the Session Start protocol and ends when a Stopping Condition is met or the context window resets. Each session gets a unique `SESSION-N` identifier (N = `session_count` after increment). ## Concurrency Control @@ -134,7 +146,23 @@ Before modifying `harness-tasks.json`, acquire an exclusive lock using portable ```bash # Acquire lock (fail fast if another agent is running) -LOCKDIR="/tmp/harness-$(printf '%s' "$(pwd)" | shasum -a 256 2>/dev/null || sha256sum | cut -c1-8).lock" +# Lock key must be stable even if invoked from a subdirectory. +ROOT="$PWD" +SEARCH="$PWD" +while [ "$SEARCH" != "/" ] && [ ! -f "$SEARCH/harness-tasks.json" ]; do + SEARCH="$(dirname "$SEARCH")" +done +if [ -f "$SEARCH/harness-tasks.json" ]; then + ROOT="$SEARCH" +fi + +PWD_HASH="$( + printf '%s' "$ROOT" | + (shasum -a 256 2>/dev/null || sha256sum 2>/dev/null) | + awk '{print $1}' | + cut -c1-16 +)" +LOCKDIR="/tmp/harness-${PWD_HASH:-unknown}.lock" if ! mkdir "$LOCKDIR" 2>/dev/null; then # Check if lock holder is still alive LOCK_PID=$(cat "$LOCKDIR/pid" 2>/dev/null) @@ -158,7 +186,16 @@ trap 'rm -rf "$LOCKDIR"' EXIT Log lock acquisition: `[timestamp] [SESSION-N] LOCK acquired (pid=)` Log lock release: `[timestamp] [SESSION-N] LOCK released` -The lock is held for the entire session. 
The `trap EXIT` handler releases it automatically on normal exit, errors, or signals. Never release the lock between tasks within a session. +Modes: + +- **Exclusive (default)**: hold the lock for the entire session (the `trap EXIT` handler releases it automatically). Any second session in the same state root fails fast. +- **Concurrent (opt-in via `session_config.concurrency_mode: "concurrent"`)**: treat this as a **state transaction lock**. Hold it only while reading/modifying/writing `harness-tasks.json` (including `.bak`/`.tmp`) and appending to `harness-progress.txt`. Release it immediately before doing real work. + +Concurrent mode invariants: + +- All workers MUST point at the same state root (the directory that contains `harness-tasks.json`). If you are using separate worktrees/clones, pin it explicitly (e.g., `HARNESS_STATE_ROOT=/abs/path/to/state-root`). +- Task selection is advisory; the real gate is **atomic claim** under the lock: set `status="in_progress"`, set `claimed_by` (stable worker id, e.g., `HARNESS_WORKER_ID`), set `lease_expires_at`. If claim fails (already `in_progress` with a valid lease), pick another eligible task and retry. +- Never run two workers in the same git working directory. Use separate worktrees/clones. Otherwise rollback (`git reset --hard` / `git clean -fd`) will destroy other workers. ## Infinite Loop Protocol @@ -166,7 +203,7 @@ The lock is held for the entire session. The `trap EXIT` handler releases it aut 1. **Read state**: Read last 200 lines of `harness-progress.txt` + full `harness-tasks.json`. If JSON is unparseable, see JSON corruption recovery in Error Handling. 2. **Read git**: Run `git log --oneline -20` and `git diff --stat` to detect uncommitted work -3. **Acquire lock**: Fail if another session is active +3. **Acquire lock** (mode-dependent): Exclusive mode fails if another session is active. Concurrent mode uses the lock only for state transactions. 4. 
**Recover interrupted tasks** (see Context Window Recovery below) 5. **Health check**: Run `harness-init.sh` if it exists 6. **Track session**: Increment `session_count` in JSON. Check `session_count` against `max_sessions` — if reached, log STATS and STOP. Initialize per-session task counter to 0. @@ -189,13 +226,13 @@ Then pick the next task in this priority order: For each task, execute this exact sequence: -1. **Claim**: Record `started_at_commit` = current HEAD hash. Set status to `in_progress`, log `Starting [] (base=<hash>)` +1. **Claim** (atomic, under lock): Record `started_at_commit` = current HEAD hash. Set status to `in_progress`, set `claimed_by`, set `lease_expires_at`, log `Starting [<task-id>] <title> (base=<hash>)`. If the task is already claimed (`in_progress` with a valid lease), pick another eligible task and retry. 2. **Execute with checkpoints**: Perform the work. After each significant step, log: ``` [timestamp] [SESSION-N] CHECKPOINT [task-id] step=M/N "description of what was done" ``` - Also append to the task's `checkpoints` array: `{ "step": M, "total": N, "description": "...", "timestamp": "ISO" }` -3. **Validate**: Run the task's `validation.command` wrapped with `timeout`: `timeout <timeout_seconds> <command>`. If no validation command, skip. Before running, verify the command exists (e.g., `command -v <binary>`) — if missing, treat as `ENV_SETUP` error. + Also append to the task's `checkpoints` array: `{ "step": M, "total": N, "description": "...", "timestamp": "ISO" }`. In concurrent mode, renew the lease at each checkpoint (push `lease_expires_at` forward). +3. **Validate**: Run the task's `validation.command` with a timeout wrapper (prefer `timeout`; on macOS use `gtimeout` from coreutils). If `validation.command` is empty/null, log `ERROR [<task-id>] [CONFIG] Missing validation.command` and STOP — do not declare completion without an objective check. 
Before running, verify the command exists (e.g., `command -v <binary>`) — if missing, treat as `ENV_SETUP` error. - Command exits 0 → PASS - Command exits non-zero → FAIL - Command exceeds timeout → TIMEOUT @@ -217,6 +254,9 @@ For each task, execute this exact sequence: When a new session starts and finds a task with `status: "in_progress"`: +- Exclusive mode: treat this as an interrupted previous session and run the Recovery Protocol below. +- Concurrent mode: only recover a task if either (a) `claimed_by` matches this worker, or (b) `lease_expires_at` is in the past (stale lease). Otherwise, treat it as owned by another worker and do not modify it. + 1. **Check git state**: ```bash git diff --stat # Uncommitted changes? @@ -243,6 +283,7 @@ Each error category has a default recovery strategy: | Category | Default Recovery | Agent Action | |----------|-----------------|--------------| | `ENV_SETUP` | Re-run init, then STOP if still failing | Run `harness-init.sh` again immediately. If fails twice, log and stop — environment is broken | +| `CONFIG` | STOP (requires human fix) | Log the config error precisely (file + field), then STOP. Do not guess or auto-mutate task metadata | | `TASK_EXEC` | Rollback via `git reset --hard <started_at_commit>`, retry | Verify `started_at_commit` exists (`git cat-file -t <hash>`). If missing, mark failed at max_attempts. Otherwise reset, run `on_failure.cleanup` if defined, retry if attempts < max_attempts | | `TEST_FAIL` | Rollback via `git reset --hard <started_at_commit>`, retry | Reset to `started_at_commit`, analyze test output to identify fix, retry with targeted changes | | `TIMEOUT` | Kill process, execute cleanup, retry | Wrap validation with `timeout <seconds> <command>`. 
On timeout, run `on_failure.cleanup`, retry (consider splitting task if repeated) | @@ -251,7 +292,7 @@ Each error category has a default recovery strategy: **JSON corruption**: If `harness-tasks.json` cannot be parsed, check for `harness-tasks.json.bak` (written before each modification). If backup exists and is valid, restore from it. If no valid backup, log `ERROR [ENV_SETUP] harness-tasks.json corrupted and unrecoverable` and STOP — task metadata (validation commands, dependencies, cleanup) cannot be reconstructed from logs alone. -**Backup protocol**: Before every write to `harness-tasks.json`, copy the current file to `harness-tasks.json.bak`. +**Backup protocol**: Before every write to `harness-tasks.json`, copy the current file to `harness-tasks.json.bak`. Write updates atomically: write JSON to `harness-tasks.json.tmp` then `mv` it into place (readers should never see a partial file). ## Environment Initialization @@ -279,7 +320,7 @@ All log entries use grep-friendly format on a single line: Types: `INIT`, `Starting`, `Completed`, `ERROR`, `CHECKPOINT`, `ROLLBACK`, `RECOVERY`, `STATS`, `LOCK`, `WARN` -Error categories: `ENV_SETUP`, `TASK_EXEC`, `TEST_FAIL`, `TIMEOUT`, `DEPENDENCY`, `SESSION_TIMEOUT` +Error categories: `ENV_SETUP`, `CONFIG`, `TASK_EXEC`, `TEST_FAIL`, `TIMEOUT`, `DEPENDENCY`, `SESSION_TIMEOUT` Filtering: ```bash @@ -293,7 +334,7 @@ grep "RECOVERY" harness-progress.txt # All recovery actions ## Session Statistics -At session end, update `harness-tasks.json`: increment `session_count`, set `last_session` to current timestamp. Then append: +At session end, update `harness-tasks.json`: set `last_session` to current timestamp. (Do NOT increment `session_count` here — it is incremented at Session Start.) Then append: ``` [timestamp] [SESSION-N] STATS tasks_total=10 completed=7 failed=1 pending=2 blocked=0 attempts_total=12 checkpoints=23 @@ -321,9 +362,11 @@ Does NOT acquire the lock (read-only operation). 
## Add Command (`/harness add`) -Append a new task to `harness-tasks.json` with auto-incremented id (`task-NNN`), status `pending`, default `max_attempts: 3`, empty `depends_on`, and no validation command. Prompt user for optional fields: `priority`, `depends_on`, `validation.command`, `timeout_seconds`. Requires lock acquisition (modifies JSON). +Append a new task to `harness-tasks.json` with auto-incremented id (`task-NNN`), status `pending`, default `max_attempts: 3`, empty `depends_on`, and no validation command (required before the task can be completed). Prompt user for optional fields: `priority`, `depends_on`, `validation.command`, `timeout_seconds`. Requires lock acquisition (modifies JSON). ## Tool Dependencies Requires: Bash, file read/write, git. All harness operations must be executed from the project root directory. Does NOT require: specific MCP servers, programming languages, or test frameworks. + +Concurrent mode requires isolated working directories (`git worktree` or separate clones). Do not run concurrent workers in the same working tree. diff --git a/skills/harness/hooks/_harness_common.py b/skills/harness/hooks/_harness_common.py new file mode 100644 index 0000000..aa7da4e --- /dev/null +++ b/skills/harness/hooks/_harness_common.py @@ -0,0 +1,410 @@ +"""Shared utilities for harness hooks. + +Consolidates duplicated logic: payload reading, state root discovery, +JSON I/O, lock primitives, task eligibility, and ISO time helpers. + +Ported from Codex harness hooks to Claude Code. 
+""" +from __future__ import annotations + +import datetime as _dt +import hashlib +import json +import os +import shutil +import sys +import time +from pathlib import Path +from typing import Any, Optional + +# --------------------------------------------------------------------------- +# Time helpers +# --------------------------------------------------------------------------- + +def utc_now() -> _dt.datetime: + return _dt.datetime.now(tz=_dt.timezone.utc) + + +def iso_z(dt: _dt.datetime) -> str: + dt = dt.astimezone(_dt.timezone.utc).replace(microsecond=0) + return dt.isoformat().replace("+00:00", "Z") + + +def parse_iso(ts: Any) -> Optional[_dt.datetime]: + if not isinstance(ts, str) or not ts.strip(): + return None + s = ts.strip() + if s.endswith("Z"): + s = s[:-1] + "+00:00" + try: + dt = _dt.datetime.fromisoformat(s) + except Exception: + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=_dt.timezone.utc) + return dt.astimezone(_dt.timezone.utc) + + +# --------------------------------------------------------------------------- +# Hook payload +# --------------------------------------------------------------------------- + +def read_hook_payload() -> dict[str, Any]: + """Read JSON payload from stdin (sent by Claude Code to command hooks).""" + raw = sys.stdin.read() + if not raw.strip(): + return {} + try: + data = json.loads(raw) + return data if isinstance(data, dict) else {} + except Exception: + return {} + + +def maybe_log_hook_event(root: Path, payload: dict[str, Any], hook_script: str) -> None: + """Optionally append a compact hook execution record to HARNESS_HOOK_LOG. + + This is opt-in debugging: when HARNESS_HOOK_LOG is unset, it is a no-op. + Call this only after the .harness-active guard passes. 
+ """ + log_path = os.environ.get("HARNESS_HOOK_LOG") + if not log_path: + return + + entry: dict[str, Any] = { + "ts": iso_z(utc_now()), + "hook_script": hook_script, + "hook_event_name": payload.get("hook_event_name"), + "harness_root": str(root), + } + for k in ( + "session_id", + "cwd", + "source", + "reason", + "teammate_name", + "team_name", + "agent_id", + "agent_type", + "stop_hook_active", + ): + if k in payload: + entry[k] = payload.get(k) + + try: + Path(log_path).expanduser().parent.mkdir(parents=True, exist_ok=True) + with Path(log_path).expanduser().open("a", encoding="utf-8") as f: + f.write(json.dumps(entry, ensure_ascii=False) + "\n") + except Exception: + return + + +# --------------------------------------------------------------------------- +# State root discovery +# --------------------------------------------------------------------------- + +def find_harness_root(payload: dict[str, Any]) -> Optional[Path]: + """Locate the directory containing harness-tasks.json. + + Search order: + 1. HARNESS_STATE_ROOT env var + 2. CLAUDE_PROJECT_DIR env var (+ parents) + 3. 
payload["cwd"] / os.getcwd() (+ parents) + """ + env_root = os.environ.get("HARNESS_STATE_ROOT") + if env_root: + p = Path(env_root) + if (p / "harness-tasks.json").is_file(): + try: + return p.resolve() + except Exception: + return p + + candidates: list[Path] = [] + env_dir = os.environ.get("CLAUDE_PROJECT_DIR") + if env_dir: + candidates.append(Path(env_dir)) + cwd = payload.get("cwd") or os.getcwd() + candidates.append(Path(cwd)) + + seen: set[str] = set() + for base in candidates: + try: + base = base.resolve() + except Exception: + continue + if str(base) in seen: + continue + seen.add(str(base)) + for parent in [base, *list(base.parents)[:8]]: + if (parent / "harness-tasks.json").is_file(): + return parent + return None + + +def is_harness_active(root: Path) -> bool: + """True when .harness-active marker exists (hooks are live).""" + return (root / ".harness-active").is_file() + + +# --------------------------------------------------------------------------- +# JSON I/O +# --------------------------------------------------------------------------- + +def load_json(path: Path) -> dict[str, Any]: + with path.open("r", encoding="utf-8") as f: + data = json.load(f) + if not isinstance(data, dict): + raise ValueError(f"{path.name} must be a JSON object") + return data + + +def atomic_write_json(path: Path, data: dict[str, Any]) -> None: + """Write JSON atomically: backup -> tmp -> rename.""" + bak = path.with_name(f"{path.name}.bak") + tmp = path.with_name(f"{path.name}.tmp") + shutil.copy2(path, bak) + tmp.write_text( + json.dumps(data, ensure_ascii=False, indent=2) + "\n", + encoding="utf-8", + ) + os.replace(tmp, path) + + +def tail_text(path: Path, max_bytes: int = 200_000) -> str: + """Read the last max_bytes of a text file.""" + with path.open("rb") as f: + try: + f.seek(0, os.SEEK_END) + size = f.tell() + f.seek(max(0, size - max_bytes), os.SEEK_SET) + except Exception: + f.seek(0, os.SEEK_SET) + chunk = f.read() + return chunk.decode("utf-8", 
errors="replace") + + +# --------------------------------------------------------------------------- +# Lock primitives (mkdir-based, POSIX-portable) +# --------------------------------------------------------------------------- + +def lockdir_for_root(root: Path) -> Path: + h = hashlib.sha256(str(root).encode("utf-8")).hexdigest()[:16] + return Path("/tmp") / f"harness-{h}.lock" + + +def _pid_alive(pid: int) -> bool: + try: + os.kill(pid, 0) + return True + except Exception: + return False + + +def _read_pid(lockdir: Path) -> Optional[int]: + try: + raw = (lockdir / "pid").read_text("utf-8").strip() + return int(raw) if raw else None + except Exception: + return None + + +def acquire_lock(lockdir: Path, timeout_seconds: float = 5.0) -> None: + deadline = time.time() + timeout_seconds + missing_pid_since: Optional[float] = None + while True: + try: + lockdir.mkdir(mode=0o700) + (lockdir / "pid").write_text(str(os.getpid()), encoding="utf-8") + return + except FileExistsError: + pid = _read_pid(lockdir) + if pid is None: + if missing_pid_since is None: + missing_pid_since = time.time() + if time.time() - missing_pid_since < 1.0: + if time.time() >= deadline: + raise TimeoutError("lock busy (pid missing)") + time.sleep(0.05) + continue + else: + missing_pid_since = None + if _pid_alive(pid): + if time.time() >= deadline: + raise TimeoutError(f"lock busy (pid={pid})") + time.sleep(0.05) + continue + + stale = lockdir.with_name( + f"{lockdir.name}.stale.{os.getpid()}.{int(time.time())}" + ) + try: + lockdir.rename(stale) + except Exception: + if time.time() >= deadline: + raise TimeoutError("lock contention") + time.sleep(0.05) + continue + shutil.rmtree(stale, ignore_errors=True) + missing_pid_since = None + continue + + +def release_lock(lockdir: Path) -> None: + shutil.rmtree(lockdir, ignore_errors=True) + + +# --------------------------------------------------------------------------- +# Task helpers +# 
--------------------------------------------------------------------------- + +def priority_rank(v: Any) -> int: + return {"P0": 0, "P1": 1, "P2": 2}.get(str(v or ""), 9) + + +def task_attempts(t: dict[str, Any]) -> int: + try: + return int(t.get("attempts") or 0) + except Exception: + return 0 + + +def task_max_attempts(t: dict[str, Any]) -> int: + try: + v = t.get("max_attempts") + return int(v) if v is not None else 3 + except Exception: + return 3 + + +def deps_completed(t: dict[str, Any], completed_ids: set[str]) -> bool: + deps = t.get("depends_on") or [] + if not isinstance(deps, list): + return False + return all(str(d) in completed_ids for d in deps) + + +def parse_tasks(state: dict[str, Any]) -> list[dict[str, Any]]: + """Extract validated task list from state dict.""" + tasks_raw = state.get("tasks") or [] + if not isinstance(tasks_raw, list): + raise ValueError("tasks must be a list") + return [t for t in tasks_raw if isinstance(t, dict)] + + +def completed_ids(tasks: list[dict[str, Any]]) -> set[str]: + return { + str(t.get("id", "")) + for t in tasks + if str(t.get("status", "")) == "completed" + } + + +def eligible_tasks( + tasks: list[dict[str, Any]], +) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + """Return (pending_eligible, retryable) sorted by priority then id.""" + done = completed_ids(tasks) + + pending = [ + t for t in tasks + if str(t.get("status", "")) == "pending" and deps_completed(t, done) + ] + retry = [ + t for t in tasks + if str(t.get("status", "")) == "failed" + and task_attempts(t) < task_max_attempts(t) + and deps_completed(t, done) + ] + + def key(t: dict[str, Any]) -> tuple[int, str]: + return (priority_rank(t.get("priority")), str(t.get("id", ""))) + + pending.sort(key=key) + retry.sort(key=key) + return pending, retry + + +def pick_next( + pending: list[dict[str, Any]], retry: list[dict[str, Any]] +) -> Optional[dict[str, Any]]: + return pending[0] if pending else (retry[0] if retry else None) + + +def 
status_counts(tasks: list[dict[str, Any]]) -> dict[str, int]: + counts: dict[str, int] = {} + for t in tasks: + s = str(t.get("status") or "pending") + counts[s] = counts.get(s, 0) + 1 + return counts + + +def reap_stale_leases( + tasks: list[dict[str, Any]], now: _dt.datetime +) -> bool: + """Reset in_progress tasks with expired leases to failed. Returns True if any changed.""" + changed = False + for t in tasks: + if str(t.get("status", "")) != "in_progress": + continue + exp = parse_iso(t.get("lease_expires_at")) + if exp is None or exp > now: + continue + + t["attempts"] = task_attempts(t) + 1 + err = f"[SESSION_TIMEOUT] Lease expired (claimed_by={t.get('claimed_by')})" + log = t.get("error_log") + if isinstance(log, list): + log.append(err) + else: + t["error_log"] = [err] + + t["status"] = "failed" + t.pop("claimed_by", None) + t.pop("lease_expires_at", None) + t.pop("claimed_at", None) + changed = True + return changed + + +# --------------------------------------------------------------------------- +# Session config helpers +# --------------------------------------------------------------------------- + +def get_session_config(state: dict[str, Any]) -> dict[str, Any]: + cfg = state.get("session_config") or {} + return cfg if isinstance(cfg, dict) else {} + + +def is_concurrent(cfg: dict[str, Any]) -> bool: + return str(cfg.get("concurrency_mode") or "exclusive") == "concurrent" + + +# --------------------------------------------------------------------------- +# Hook output helpers +# --------------------------------------------------------------------------- + +def emit_block(reason: str) -> None: + """Print a JSON block decision to stdout and exit 0.""" + print(json.dumps({"decision": "block", "reason": reason}, ensure_ascii=False)) + + +def emit_allow(reason: str = "") -> None: + """Print a JSON allow decision to stdout and exit 0.""" + out: dict[str, Any] = {"decision": "allow"} + if reason: + out["reason"] = reason + print(json.dumps(out, 
ensure_ascii=False)) + + +def emit_context(context: str) -> None: + """Inject additional context via hookSpecificOutput.""" + print(json.dumps( + {"hookSpecificOutput": {"additionalContext": context}}, + ensure_ascii=False, + )) + + +def emit_json(data: dict[str, Any]) -> None: + """Print arbitrary JSON to stdout.""" + print(json.dumps(data, ensure_ascii=False)) diff --git a/skills/harness/hooks/harness-claim.py b/skills/harness/hooks/harness-claim.py new file mode 100755 index 0000000..bf20a4c --- /dev/null +++ b/skills/harness/hooks/harness-claim.py @@ -0,0 +1,301 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import datetime as _dt +import hashlib +import json +import os +import shutil +import socket +import sys +import time +from pathlib import Path +from typing import Any, Optional + + +def _utc_now() -> _dt.datetime: + return _dt.datetime.now(tz=_dt.timezone.utc) + + +def _iso_z(dt: _dt.datetime) -> str: + dt = dt.astimezone(_dt.timezone.utc).replace(microsecond=0) + return dt.isoformat().replace("+00:00", "Z") + + +def _parse_iso(ts: Any) -> Optional[_dt.datetime]: + if not isinstance(ts, str) or not ts.strip(): + return None + s = ts.strip() + if s.endswith("Z"): + s = s[:-1] + "+00:00" + try: + dt = _dt.datetime.fromisoformat(s) + except Exception: + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=_dt.timezone.utc) + return dt.astimezone(_dt.timezone.utc) + + +def _read_payload() -> dict[str, Any]: + raw = sys.stdin.read() + if not raw.strip(): + return {} + try: + data = json.loads(raw) + return data if isinstance(data, dict) else {} + except Exception: + return {} + + +def _find_state_root(payload: dict[str, Any]) -> Optional[Path]: + state_root = os.environ.get("HARNESS_STATE_ROOT") + if state_root: + p = Path(state_root) + if (p / "harness-tasks.json").is_file(): + try: + return p.resolve() + except Exception: + return p + + candidates: list[Path] = [] + env_dir = os.environ.get("CLAUDE_PROJECT_DIR") + if env_dir: + 
candidates.append(Path(env_dir)) + + cwd = payload.get("cwd") or os.getcwd() + candidates.append(Path(cwd)) + + seen: set[str] = set() + for base in candidates: + try: + base = base.resolve() + except Exception: + continue + if str(base) in seen: + continue + seen.add(str(base)) + for parent in [base, *list(base.parents)[:8]]: + if (parent / "harness-tasks.json").is_file(): + return parent + return None + + +def _lockdir_for_root(root: Path) -> Path: + h = hashlib.sha256(str(root).encode("utf-8")).hexdigest()[:16] + return Path("/tmp") / f"harness-{h}.lock" + + +def _pid_alive(pid: int) -> bool: + try: + os.kill(pid, 0) + return True + except Exception: + return False + + +def _read_pid(lockdir: Path) -> Optional[int]: + try: + raw = (lockdir / "pid").read_text("utf-8").strip() + return int(raw) if raw else None + except Exception: + return None + + +def _acquire_lock(lockdir: Path, timeout_seconds: float) -> None: + deadline = time.time() + timeout_seconds + missing_pid_since: Optional[float] = None + while True: + try: + lockdir.mkdir(mode=0o700) + (lockdir / "pid").write_text(str(os.getpid()), encoding="utf-8") + return + except FileExistsError: + pid = _read_pid(lockdir) + if pid is None: + if missing_pid_since is None: + missing_pid_since = time.time() + if time.time() - missing_pid_since < 1.0: + if time.time() >= deadline: + raise TimeoutError("lock busy (pid missing)") + time.sleep(0.05) + continue + else: + missing_pid_since = None + if _pid_alive(pid): + if time.time() >= deadline: + raise TimeoutError(f"lock busy (pid={pid})") + time.sleep(0.05) + continue + + stale = lockdir.with_name(f"{lockdir.name}.stale.{os.getpid()}.{int(time.time())}") + try: + lockdir.rename(stale) + except Exception: + if time.time() >= deadline: + raise TimeoutError("lock contention") + time.sleep(0.05) + continue + shutil.rmtree(stale, ignore_errors=True) + missing_pid_since = None + continue + + +def _release_lock(lockdir: Path) -> None: + shutil.rmtree(lockdir, 
ignore_errors=True) + + +def _load_state(path: Path) -> dict[str, Any]: + with path.open("r", encoding="utf-8") as f: + data = json.load(f) + if not isinstance(data, dict): + raise ValueError("harness-tasks.json must be an object") + return data + + +def _atomic_write_json(path: Path, data: dict[str, Any]) -> None: + bak = path.with_name(f"{path.name}.bak") + tmp = path.with_name(f"{path.name}.tmp") + shutil.copy2(path, bak) + tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + os.replace(tmp, path) + + +def _priority_rank(v: Any) -> int: + return {"P0": 0, "P1": 1, "P2": 2}.get(str(v or ""), 9) + + +def _eligible_tasks(tasks: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + completed = {str(t.get("id", "")) for t in tasks if str(t.get("status", "")) == "completed"} + + def deps_ok(t: dict[str, Any]) -> bool: + deps = t.get("depends_on") or [] + if not isinstance(deps, list): + return False + return all(str(d) in completed for d in deps) + + def attempts(t: dict[str, Any]) -> int: + try: + return int(t.get("attempts") or 0) + except Exception: + return 0 + + def max_attempts(t: dict[str, Any]) -> int: + try: + v = t.get("max_attempts") + return int(v) if v is not None else 3 + except Exception: + return 3 + + pending = [t for t in tasks if str(t.get("status", "")) == "pending" and deps_ok(t)] + retry = [ + t + for t in tasks + if str(t.get("status", "")) == "failed" + and attempts(t) < max_attempts(t) + and deps_ok(t) + ] + + def key(t: dict[str, Any]) -> tuple[int, str]: + return (_priority_rank(t.get("priority")), str(t.get("id", ""))) + + pending.sort(key=key) + retry.sort(key=key) + return pending, retry + + +def _reap_stale_leases(tasks: list[dict[str, Any]], now: _dt.datetime) -> bool: + changed = False + for t in tasks: + if str(t.get("status", "")) != "in_progress": + continue + exp = _parse_iso(t.get("lease_expires_at")) + if exp is None or exp > now: + continue + + try: + t["attempts"] = 
int(t.get("attempts") or 0) + 1 + except Exception: + t["attempts"] = 1 + + err = f"[SESSION_TIMEOUT] Lease expired (claimed_by={t.get('claimed_by')})" + log = t.get("error_log") + if isinstance(log, list): + log.append(err) + else: + t["error_log"] = [err] + + t["status"] = "failed" + t.pop("claimed_by", None) + t.pop("lease_expires_at", None) + t.pop("claimed_at", None) + changed = True + return changed + + +def main() -> int: + payload = _read_payload() + root = _find_state_root(payload) + if root is None: + print(json.dumps({"claimed": False, "error": "state root not found"}, ensure_ascii=False)) + return 0 + + tasks_path = root / "harness-tasks.json" + lockdir = _lockdir_for_root(root) + + timeout_s = float(os.environ.get("HARNESS_LOCK_TIMEOUT_SECONDS") or "5") + _acquire_lock(lockdir, timeout_s) + try: + state = _load_state(tasks_path) + session_config = state.get("session_config") or {} + if not isinstance(session_config, dict): + session_config = {} + concurrency_mode = str(session_config.get("concurrency_mode") or "exclusive") + is_concurrent = concurrency_mode == "concurrent" + tasks_raw = state.get("tasks") or [] + if not isinstance(tasks_raw, list): + raise ValueError("tasks must be a list") + tasks = [t for t in tasks_raw if isinstance(t, dict)] + + now = _utc_now() + if _reap_stale_leases(tasks, now): + state["tasks"] = tasks + _atomic_write_json(tasks_path, state) + + pending, retry = _eligible_tasks(tasks) + task = pending[0] if pending else (retry[0] if retry else None) + if task is None: + print(json.dumps({"claimed": False}, ensure_ascii=False)) + return 0 + + worker_id = os.environ.get("HARNESS_WORKER_ID") or "" + if is_concurrent and not worker_id: + print(json.dumps({"claimed": False, "error": "missing HARNESS_WORKER_ID"}, ensure_ascii=False)) + return 0 + if not worker_id: + worker_id = f"{socket.gethostname()}:{os.getpid()}" + lease_seconds = int(os.environ.get("HARNESS_LEASE_SECONDS") or "1800") + exp = now + 
_dt.timedelta(seconds=lease_seconds) + + task["status"] = "in_progress" + task["claimed_by"] = worker_id + task["claimed_at"] = _iso_z(now) + task["lease_expires_at"] = _iso_z(exp) + state["tasks"] = tasks + _atomic_write_json(tasks_path, state) + + out = { + "claimed": True, + "worker_id": worker_id, + "task_id": str(task.get("id") or ""), + "title": str(task.get("title") or ""), + "lease_expires_at": task["lease_expires_at"], + } + print(json.dumps(out, ensure_ascii=False)) + return 0 + finally: + _release_lock(lockdir) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/harness/hooks/harness-renew.py b/skills/harness/hooks/harness-renew.py new file mode 100755 index 0000000..de493e0 --- /dev/null +++ b/skills/harness/hooks/harness-renew.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import datetime as _dt +import hashlib +import json +import os +import shutil +import sys +import time +from pathlib import Path +from typing import Any, Optional + + +def _utc_now() -> _dt.datetime: + return _dt.datetime.now(tz=_dt.timezone.utc) + + +def _iso_z(dt: _dt.datetime) -> str: + dt = dt.astimezone(_dt.timezone.utc).replace(microsecond=0) + return dt.isoformat().replace("+00:00", "Z") + + +def _read_payload() -> dict[str, Any]: + raw = sys.stdin.read() + if not raw.strip(): + return {} + try: + data = json.loads(raw) + return data if isinstance(data, dict) else {} + except Exception: + return {} + + +def _find_state_root(payload: dict[str, Any]) -> Optional[Path]: + state_root = os.environ.get("HARNESS_STATE_ROOT") + if state_root: + p = Path(state_root) + if (p / "harness-tasks.json").is_file(): + try: + return p.resolve() + except Exception: + return p + + candidates: list[Path] = [] + env_dir = os.environ.get("CLAUDE_PROJECT_DIR") + if env_dir: + candidates.append(Path(env_dir)) + + cwd = payload.get("cwd") or os.getcwd() + candidates.append(Path(cwd)) + + seen: set[str] = set() + for base in candidates: + 
try: + base = base.resolve() + except Exception: + continue + if str(base) in seen: + continue + seen.add(str(base)) + for parent in [base, *list(base.parents)[:8]]: + if (parent / "harness-tasks.json").is_file(): + return parent + return None + + +def _lockdir_for_root(root: Path) -> Path: + h = hashlib.sha256(str(root).encode("utf-8")).hexdigest()[:16] + return Path("/tmp") / f"harness-{h}.lock" + + +def _pid_alive(pid: int) -> bool: + try: + os.kill(pid, 0) + return True + except Exception: + return False + + +def _read_pid(lockdir: Path) -> Optional[int]: + try: + raw = (lockdir / "pid").read_text("utf-8").strip() + return int(raw) if raw else None + except Exception: + return None + + +def _acquire_lock(lockdir: Path, timeout_seconds: float) -> None: + deadline = time.time() + timeout_seconds + missing_pid_since: Optional[float] = None + while True: + try: + lockdir.mkdir(mode=0o700) + (lockdir / "pid").write_text(str(os.getpid()), encoding="utf-8") + return + except FileExistsError: + pid = _read_pid(lockdir) + if pid is None: + if missing_pid_since is None: + missing_pid_since = time.time() + if time.time() - missing_pid_since < 1.0: + if time.time() >= deadline: + raise TimeoutError("lock busy (pid missing)") + time.sleep(0.05) + continue + else: + missing_pid_since = None + if _pid_alive(pid): + if time.time() >= deadline: + raise TimeoutError(f"lock busy (pid={pid})") + time.sleep(0.05) + continue + + stale = lockdir.with_name(f"{lockdir.name}.stale.{os.getpid()}.{int(time.time())}") + try: + lockdir.rename(stale) + except Exception: + if time.time() >= deadline: + raise TimeoutError("lock contention") + time.sleep(0.05) + continue + shutil.rmtree(stale, ignore_errors=True) + missing_pid_since = None + continue + + +def _release_lock(lockdir: Path) -> None: + shutil.rmtree(lockdir, ignore_errors=True) + + +def _load_state(path: Path) -> dict[str, Any]: + with path.open("r", encoding="utf-8") as f: + data = json.load(f) + if not isinstance(data, dict): + 
raise ValueError("harness-tasks.json must be an object") + return data + + +def _atomic_write_json(path: Path, data: dict[str, Any]) -> None: + bak = path.with_name(f"{path.name}.bak") + tmp = path.with_name(f"{path.name}.tmp") + shutil.copy2(path, bak) + tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + os.replace(tmp, path) + + +def main() -> int: + payload = _read_payload() + root = _find_state_root(payload) + if root is None: + print(json.dumps({"renewed": False, "error": "state root not found"}, ensure_ascii=False)) + return 0 + + task_id = os.environ.get("HARNESS_TASK_ID") or str(payload.get("task_id") or "").strip() + if not task_id: + print(json.dumps({"renewed": False, "error": "missing task_id"}, ensure_ascii=False)) + return 0 + + worker_id = os.environ.get("HARNESS_WORKER_ID") or "" + if not worker_id: + print(json.dumps({"renewed": False, "error": "missing HARNESS_WORKER_ID"}, ensure_ascii=False)) + return 0 + lease_seconds = int(os.environ.get("HARNESS_LEASE_SECONDS") or "1800") + + tasks_path = root / "harness-tasks.json" + lockdir = _lockdir_for_root(root) + + timeout_s = float(os.environ.get("HARNESS_LOCK_TIMEOUT_SECONDS") or "5") + try: + _acquire_lock(lockdir, timeout_s) + except Exception as e: + print(json.dumps({"renewed": False, "error": str(e)}, ensure_ascii=False)) + return 0 + + try: + state = _load_state(tasks_path) + tasks_raw = state.get("tasks") or [] + if not isinstance(tasks_raw, list): + raise ValueError("tasks must be a list") + tasks = [t for t in tasks_raw if isinstance(t, dict)] + + task = next((t for t in tasks if str(t.get("id") or "") == task_id), None) + if task is None: + print(json.dumps({"renewed": False, "error": "task not found"}, ensure_ascii=False)) + return 0 + + if str(task.get("status") or "") != "in_progress": + print(json.dumps({"renewed": False, "error": "task not in_progress"}, ensure_ascii=False)) + return 0 + + claimed_by = str(task.get("claimed_by") or "") + if 
claimed_by and claimed_by != worker_id: + print(json.dumps({"renewed": False, "error": "task owned by other worker"}, ensure_ascii=False)) + return 0 + + now = _utc_now() + exp = now + _dt.timedelta(seconds=lease_seconds) + task["lease_expires_at"] = _iso_z(exp) + task["claimed_by"] = worker_id + state["tasks"] = tasks + _atomic_write_json(tasks_path, state) + + print(json.dumps({"renewed": True, "task_id": task_id, "lease_expires_at": task["lease_expires_at"]}, ensure_ascii=False)) + return 0 + except Exception as e: + print(json.dumps({"renewed": False, "error": str(e)}, ensure_ascii=False)) + return 0 + finally: + _release_lock(lockdir) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/harness/hooks/harness-sessionstart.py b/skills/harness/hooks/harness-sessionstart.py new file mode 100755 index 0000000..ee8bc61 --- /dev/null +++ b/skills/harness/hooks/harness-sessionstart.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path +from typing import Any, Optional + + +def _read_hook_payload() -> dict[str, Any]: + raw = sys.stdin.read() + if not raw.strip(): + return {} + try: + data = json.loads(raw) + return data if isinstance(data, dict) else {} + except Exception: + return {"_invalid_json": True} + + +def _find_harness_root(payload: dict[str, Any]) -> Optional[Path]: + state_root = os.environ.get("HARNESS_STATE_ROOT") + if state_root: + p = Path(state_root) + if (p / "harness-tasks.json").is_file(): + try: + return p.resolve() + except Exception: + return p + + candidates: list[Path] = [] + env_dir = os.environ.get("CLAUDE_PROJECT_DIR") + if env_dir: + candidates.append(Path(env_dir)) + + cwd = payload.get("cwd") or os.getcwd() + candidates.append(Path(cwd)) + + seen: set[str] = set() + for base in candidates: + try: + base = base.resolve() + except Exception: + continue + if str(base) in seen: + continue + seen.add(str(base)) + for parent in 
[base, *list(base.parents)[:8]]: + if (parent / "harness-tasks.json").is_file(): + return parent + return None + + +def _load_json(path: Path) -> dict[str, Any]: + with path.open("r", encoding="utf-8") as f: + data = json.load(f) + if not isinstance(data, dict): + raise ValueError(f"{path.name} must be a JSON object") + return data + + +def _tail_text(path: Path, max_bytes: int = 8192) -> str: + with path.open("rb") as f: + try: + f.seek(0, os.SEEK_END) + size = f.tell() + f.seek(max(0, size - max_bytes), os.SEEK_SET) + except Exception: + f.seek(0, os.SEEK_SET) + chunk = f.read() + return chunk.decode("utf-8", errors="replace") + + +def _priority_rank(v: Any) -> int: + return {"P0": 0, "P1": 1, "P2": 2}.get(str(v or ""), 9) + + +def _pick_next_eligible(tasks: list[dict[str, Any]]) -> Optional[dict[str, Any]]: + completed = {str(t.get("id", "")) for t in tasks if str(t.get("status", "")) == "completed"} + + def deps_ok(t: dict[str, Any]) -> bool: + deps = t.get("depends_on") or [] + if not isinstance(deps, list): + return False + return all(str(d) in completed for d in deps) + + def attempts(t: dict[str, Any]) -> int: + try: + return int(t.get("attempts") or 0) + except Exception: + return 0 + + def max_attempts(t: dict[str, Any]) -> int: + try: + v = t.get("max_attempts") + return int(v) if v is not None else 3 + except Exception: + return 3 + + pending = [t for t in tasks if str(t.get("status", "")) == "pending" and deps_ok(t)] + retry = [ + t + for t in tasks + if str(t.get("status", "")) == "failed" + and attempts(t) < max_attempts(t) + and deps_ok(t) + ] + + def key(t: dict[str, Any]) -> tuple[int, str]: + return (_priority_rank(t.get("priority")), str(t.get("id", ""))) + + pending.sort(key=key) + retry.sort(key=key) + return pending[0] if pending else (retry[0] if retry else None) + + +def _is_harness_active(root: Path) -> bool: + """Check if harness skill is actively running (marker file exists).""" + return (root / ".harness-active").is_file() + + +def 
main() -> int: + payload = _read_hook_payload() + root = _find_harness_root(payload) + if root is None: + return 0 + + # Guard: only active when harness skill is triggered + if not _is_harness_active(root): + return 0 + + tasks_path = root / "harness-tasks.json" + progress_path = root / "harness-progress.txt" + + try: + state = _load_json(tasks_path) + tasks_raw = state.get("tasks") or [] + if not isinstance(tasks_raw, list): + raise ValueError("tasks must be a list") + tasks = [t for t in tasks_raw if isinstance(t, dict)] + except Exception as e: + context = f"HARNESS: CONFIG error: cannot read {tasks_path.name}: {e}" + print(json.dumps({"hookSpecificOutput": {"additionalContext": context}}, ensure_ascii=False)) + return 0 + + counts: dict[str, int] = {} + for t in tasks: + s = str(t.get("status") or "pending") + counts[s] = counts.get(s, 0) + 1 + + next_task = _pick_next_eligible(tasks) + next_hint = "" + if next_task is not None: + tid = str(next_task.get("id") or "") + title = str(next_task.get("title") or "").strip() + next_hint = f" next={tid}{(': ' + title) if title else ''}" + + last_stats = "" + if progress_path.is_file(): + tail = _tail_text(progress_path) + lines = [ln.strip() for ln in tail.splitlines() if ln.strip()] + for ln in reversed(lines[-200:]): + if " STATS " in f" {ln} " or ln.endswith(" STATS"): + last_stats = ln + break + if not last_stats and lines: + last_stats = lines[-1] + if len(last_stats) > 220: + last_stats = last_stats[:217] + "..." 
+ + summary = ( + "HARNESS: " + + " ".join(f"{k}={v}" for k, v in sorted(counts.items())) + + f" total={len(tasks)}" + + next_hint + ).strip() + if last_stats: + summary += f"\nHARNESS: last_log={last_stats}" + + print(json.dumps({"hookSpecificOutput": {"additionalContext": summary}}, ensure_ascii=False)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/harness/hooks/harness-stop.py b/skills/harness/hooks/harness-stop.py new file mode 100755 index 0000000..909c491 --- /dev/null +++ b/skills/harness/hooks/harness-stop.py @@ -0,0 +1,314 @@ +#!/usr/bin/env python3 +"""Harness Stop hook — blocks Claude from stopping when eligible tasks remain. + +Uses `stop_hook_active` field and a consecutive-block counter to prevent +infinite loops. If the hook blocks N times in a row without any task +completing, it allows the stop with a warning. +""" +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path +from typing import Any, Optional + +MAX_CONSECUTIVE_BLOCKS = 8 # safety valve + + +def _read_hook_payload() -> dict[str, Any]: + raw = sys.stdin.read() + if not raw.strip(): + return {} + try: + data = json.loads(raw) + return data if isinstance(data, dict) else {} + except Exception: + return {"_invalid_json": True} + + +def _find_harness_root(payload: dict[str, Any]) -> Optional[Path]: + state_root = os.environ.get("HARNESS_STATE_ROOT") + if state_root: + p = Path(state_root) + if (p / "harness-tasks.json").is_file(): + try: + return p.resolve() + except Exception: + return p + + candidates: list[Path] = [] + env_dir = os.environ.get("CLAUDE_PROJECT_DIR") + if env_dir: + candidates.append(Path(env_dir)) + cwd = payload.get("cwd") or os.getcwd() + candidates.append(Path(cwd)) + + seen: set[str] = set() + for base in candidates: + try: + base = base.resolve() + except Exception: + continue + if str(base) in seen: + continue + seen.add(str(base)) + for parent in [base, *list(base.parents)[:8]]: + 
if (parent / "harness-tasks.json").is_file(): + return parent + return None + + +def _load_json(path: Path) -> dict[str, Any]: + with path.open("r", encoding="utf-8") as f: + data = json.load(f) + if not isinstance(data, dict): + raise ValueError(f"{path.name} must be a JSON object") + return data + + +def _tail_text(path: Path, max_bytes: int = 200_000) -> str: + with path.open("rb") as f: + try: + f.seek(0, os.SEEK_END) + size = f.tell() + f.seek(max(0, size - max_bytes), os.SEEK_SET) + except Exception: + f.seek(0, os.SEEK_SET) + chunk = f.read() + return chunk.decode("utf-8", errors="replace") + + +def _priority_rank(v: Any) -> int: + return {"P0": 0, "P1": 1, "P2": 2}.get(str(v or ""), 9) + + +def _deps_completed(t: dict[str, Any], completed: set[str]) -> bool: + deps = t.get("depends_on") or [] + if not isinstance(deps, list): + return False + return all(str(d) in completed for d in deps) + + +def _attempts(t: dict[str, Any]) -> int: + try: + return int(t.get("attempts") or 0) + except Exception: + return 0 + + +def _max_attempts(t: dict[str, Any]) -> int: + try: + v = t.get("max_attempts") + return int(v) if v is not None else 3 + except Exception: + return 3 + + +def _pick_next(pending: list[dict[str, Any]], retry: list[dict[str, Any]]) -> Optional[dict[str, Any]]: + def key(t: dict[str, Any]) -> tuple[int, str]: + return (_priority_rank(t.get("priority")), str(t.get("id", ""))) + pending.sort(key=key) + retry.sort(key=key) + return pending[0] if pending else (retry[0] if retry else None) + + +def _block_counter_path(root: Path) -> Path: + return root / ".harness-stop-counter" + + +def _read_block_counter(root: Path) -> tuple[int, int]: + """Returns (consecutive_blocks, last_completed_count).""" + p = _block_counter_path(root) + try: + raw = p.read_text("utf-8").strip() + parts = raw.split(",") + return int(parts[0]), int(parts[1]) if len(parts) > 1 else 0 + except Exception: + return 0, 0 + + +def _write_block_counter(root: Path, blocks: int, completed: 
int) -> None: + p = _block_counter_path(root) + tmp = p.with_name(f"{p.name}.tmp.{os.getpid()}") + try: + tmp.write_text(f"{blocks},{completed}", encoding="utf-8") + os.replace(tmp, p) + except Exception: + try: + tmp.unlink(missing_ok=True) + except Exception: + pass + + +def _reset_block_counter(root: Path) -> None: + p = _block_counter_path(root) + try: + p.unlink(missing_ok=True) + except Exception: + pass + + +def _is_harness_active(root: Path) -> bool: + """Check if harness skill is actively running (marker file exists).""" + return (root / ".harness-active").is_file() + + +def main() -> int: + payload = _read_hook_payload() + + # Safety: if stop_hook_active is True, Claude is already continuing + # from a previous Stop hook block. Check if we should allow stop + # to prevent infinite loops. + stop_hook_active = payload.get("stop_hook_active", False) + + root = _find_harness_root(payload) + if root is None: + return 0 # no harness project, allow stop + + # Guard: only active when harness skill is triggered + if not _is_harness_active(root): + return 0 + + tasks_path = root / "harness-tasks.json" + progress_path = root / "harness-progress.txt" + try: + state = _load_json(tasks_path) + tasks_raw = state.get("tasks") or [] + if not isinstance(tasks_raw, list): + raise ValueError("tasks must be a list") + tasks = [t for t in tasks_raw if isinstance(t, dict)] + except Exception as e: + if stop_hook_active: + sys.stderr.write( + "HARNESS: WARN — harness-tasks.json 无法解析且 stop_hook_active=True," + "为避免无限循环,本次允许停止。\n" + ) + return 0 + reason = ( + "HARNESS: 检测到配置损坏,无法解析 harness-tasks.json。\n" + f"HARNESS: error={e}\n" + "按 SKILL.md 的 JSON corruption 恢复:优先用 harness-tasks.json.bak 还原;无法还原则停止并要求人工修复。" + ) + print(json.dumps({"decision": "block", "reason": reason}, ensure_ascii=False)) + return 0 + + session_config = state.get("session_config") or {} + if not isinstance(session_config, dict): + session_config = {} + + concurrency_mode = 
str(session_config.get("concurrency_mode") or "exclusive") + is_concurrent = concurrency_mode == "concurrent" + worker_id = os.environ.get("HARNESS_WORKER_ID") or None + + # Check session limits + try: + session_count = int(state.get("session_count") or 0) + except Exception: + session_count = 0 + try: + max_sessions = int(session_config.get("max_sessions") or 0) + except Exception: + max_sessions = 0 + if max_sessions > 0 and session_count >= max_sessions: + _reset_block_counter(root) + return 0 # session limit reached, allow stop + + # Check per-session task limit + try: + max_tasks_per_session = int(session_config.get("max_tasks_per_session") or 0) + except Exception: + max_tasks_per_session = 0 + if not is_concurrent and max_tasks_per_session > 0 and session_count > 0 and progress_path.is_file(): + tail = _tail_text(progress_path) + tag = f"[SESSION-{session_count}]" + finished = 0 + for ln in tail.splitlines(): + if tag not in ln: + continue + if " Completed [" in ln or (" ERROR [" in ln and "[task-" in ln): + finished += 1 + if finished >= max_tasks_per_session: + _reset_block_counter(root) + return 0 # per-session limit reached, allow stop + + # Compute eligible tasks + counts: dict[str, int] = {} + for t in tasks: + s = str(t.get("status") or "pending") + counts[s] = counts.get(s, 0) + 1 + + completed_ids = {str(t.get("id", "")) for t in tasks if str(t.get("status", "")) == "completed"} + completed_count = len(completed_ids) + + pending_eligible = [t for t in tasks if str(t.get("status", "")) == "pending" and _deps_completed(t, completed_ids)] + retryable = [ + t for t in tasks + if str(t.get("status", "")) == "failed" + and _attempts(t) < _max_attempts(t) + and _deps_completed(t, completed_ids) + ] + in_progress_any = [t for t in tasks if str(t.get("status", "")) == "in_progress"] + if is_concurrent and worker_id: + in_progress_blocking = [ + t for t in in_progress_any + if str(t.get("claimed_by") or "") == worker_id or not t.get("claimed_by") + ] + else: 
+ in_progress_blocking = in_progress_any + + # If nothing left to do, allow stop + if not pending_eligible and not retryable and not in_progress_blocking: + _reset_block_counter(root) + try: + (root / ".harness-active").unlink(missing_ok=True) + except Exception: + pass + return 0 + + # Safety valve: track consecutive blocks without progress + prev_blocks, prev_completed = _read_block_counter(root) + if completed_count > prev_completed: + # Progress was made, reset counter + prev_blocks = 0 + consecutive = prev_blocks + 1 + _write_block_counter(root, consecutive, completed_count) + + if stop_hook_active and consecutive > MAX_CONSECUTIVE_BLOCKS: + # Too many consecutive blocks without progress — allow stop to prevent infinite loop + _reset_block_counter(root) + sys.stderr.write( + f"HARNESS: WARN — Stop hook blocked {consecutive} times without progress. " + "Allowing stop to prevent infinite loop. Check task definitions and validation commands.\n" + ) + return 0 + + # Block the stop — tasks remain + next_task = _pick_next(pending_eligible, retryable) + next_hint = "" + if next_task is not None: + tid = str(next_task.get("id") or "") + title = str(next_task.get("title") or "").strip() + next_hint = f"next={tid}{(': ' + title) if title else ''}" + + summary = ( + "HARNESS: 未满足停止条件,继续执行。\n" + + "HARNESS: " + + " ".join(f"{k}={v}" for k, v in sorted(counts.items())) + + f" total={len(tasks)}" + + (f" {next_hint}" if next_hint else "") + ).strip() + + reason = ( + summary + + "\n" + + "请按 SKILL.md 的 Task Selection Algorithm 选择下一个 eligible 任务,并完整执行 Task Execution Cycle:" + "Claim → Checkpoint → Validate → Record outcome → STATS(如需)→ Continue。" + ) + + print(json.dumps({"decision": "block", "reason": reason}, ensure_ascii=False)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/harness/hooks/harness-subagentstop.py b/skills/harness/hooks/harness-subagentstop.py new file mode 100755 index 0000000..5080018 --- /dev/null +++ 
b/skills/harness/hooks/harness-subagentstop.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +"""Harness SubagentStop hook — blocks subagents from stopping when they +have assigned harness tasks still in progress. + +Uses the same decision format as Stop hooks: + {"decision": "block", "reason": "..."} +""" +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path +from typing import Any, Optional + + +def _read_hook_payload() -> dict[str, Any]: + raw = sys.stdin.read() + if not raw.strip(): + return {} + try: + data = json.loads(raw) + return data if isinstance(data, dict) else {} + except Exception: + return {} + + +def _find_harness_root(payload: dict[str, Any]) -> Optional[Path]: + state_root = os.environ.get("HARNESS_STATE_ROOT") + if state_root: + p = Path(state_root) + if (p / "harness-tasks.json").is_file(): + try: + return p.resolve() + except Exception: + return p + candidates: list[Path] = [] + env_dir = os.environ.get("CLAUDE_PROJECT_DIR") + if env_dir: + candidates.append(Path(env_dir)) + cwd = payload.get("cwd") or os.getcwd() + candidates.append(Path(cwd)) + seen: set[str] = set() + for base in candidates: + try: + base = base.resolve() + except Exception: + continue + if str(base) in seen: + continue + seen.add(str(base)) + for parent in [base, *list(base.parents)[:8]]: + if (parent / "harness-tasks.json").is_file(): + return parent + return None + + +def _load_json(path: Path) -> dict[str, Any]: + with path.open("r", encoding="utf-8") as f: + data = json.load(f) + if not isinstance(data, dict): + raise ValueError(f"{path.name} must be a JSON object") + return data + + +def _is_harness_active(root: Path) -> bool: + """Check if harness skill is actively running (marker file exists).""" + return (root / ".harness-active").is_file() + + +def main() -> int: + payload = _read_hook_payload() + + # Safety: respect stop_hook_active to prevent infinite loops + if payload.get("stop_hook_active", False): + return 0 + + root 
= _find_harness_root(payload) + if root is None: + return 0 # no harness project, allow stop + + # Guard: only active when harness skill is triggered + if not _is_harness_active(root): + return 0 + + tasks_path = root / "harness-tasks.json" + try: + state = _load_json(tasks_path) + session_config = state.get("session_config") or {} + if not isinstance(session_config, dict): + session_config = {} + is_concurrent = str(session_config.get("concurrency_mode") or "exclusive") == "concurrent" + tasks_raw = state.get("tasks") or [] + if not isinstance(tasks_raw, list): + return 0 + tasks = [t for t in tasks_raw if isinstance(t, dict)] + except Exception: + return 0 + + in_progress = [t for t in tasks if str(t.get("status", "")) == "in_progress"] + worker_id = str(os.environ.get("HARNESS_WORKER_ID") or "").strip() + agent_id = str(payload.get("agent_id") or "").strip() + teammate_name = str(payload.get("teammate_name") or "").strip() + identities = {x for x in (worker_id, agent_id, teammate_name) if x} + + if is_concurrent and in_progress and not identities: + reason = ( + "HARNESS: concurrent 模式缺少 worker identity(HARNESS_WORKER_ID/agent_id)。" + "为避免误停导致任务悬空,本次阻止停止。" + ) + print(json.dumps({"decision": "block", "reason": reason}, ensure_ascii=False)) + return 0 + + if is_concurrent: + owned = [ + t for t in in_progress + if str(t.get("claimed_by") or "") in identities + ] if identities else [] + else: + owned = in_progress + + # Only block when this subagent still owns in-progress work. 
+ if owned: + tid = str(owned[0].get("id") or "") + title = str(owned[0].get("title") or "") + reason = ( + f"HARNESS: 子代理仍有进行中的任务 [{tid}] {title}。" + "请完成当前任务的验证和记录后再停止。" + ) + print(json.dumps({"decision": "block", "reason": reason}, ensure_ascii=False)) + return 0 + + return 0 # all done, allow stop + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/harness/hooks/harness-teammateidle.py b/skills/harness/hooks/harness-teammateidle.py new file mode 100755 index 0000000..b1e5955 --- /dev/null +++ b/skills/harness/hooks/harness-teammateidle.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +"""Harness TeammateIdle hook — prevents teammates from going idle when +harness tasks remain eligible for execution. + +Exit code 2 + stderr message keeps the teammate working. +Exit code 0 allows the teammate to go idle. +""" +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path +from typing import Any, Optional + + +def _read_hook_payload() -> dict[str, Any]: + raw = sys.stdin.read() + if not raw.strip(): + return {} + try: + data = json.loads(raw) + return data if isinstance(data, dict) else {} + except Exception: + return {} + + +def _find_harness_root(payload: dict[str, Any]) -> Optional[Path]: + state_root = os.environ.get("HARNESS_STATE_ROOT") + if state_root: + p = Path(state_root) + if (p / "harness-tasks.json").is_file(): + try: + return p.resolve() + except Exception: + return p + candidates: list[Path] = [] + env_dir = os.environ.get("CLAUDE_PROJECT_DIR") + if env_dir: + candidates.append(Path(env_dir)) + cwd = payload.get("cwd") or os.getcwd() + candidates.append(Path(cwd)) + seen: set[str] = set() + for base in candidates: + try: + base = base.resolve() + except Exception: + continue + if str(base) in seen: + continue + seen.add(str(base)) + for parent in [base, *list(base.parents)[:8]]: + if (parent / "harness-tasks.json").is_file(): + return parent + return None + + +def _load_json(path: Path) -> 
dict[str, Any]: + with path.open("r", encoding="utf-8") as f: + data = json.load(f) + if not isinstance(data, dict): + raise ValueError(f"{path.name} must be a JSON object") + return data + + +def _priority_rank(v: Any) -> int: + return {"P0": 0, "P1": 1, "P2": 2}.get(str(v or ""), 9) + + +def _is_harness_active(root: Path) -> bool: + """Check if harness skill is actively running (marker file exists).""" + return (root / ".harness-active").is_file() + + +def main() -> int: + payload = _read_hook_payload() + root = _find_harness_root(payload) + if root is None: + return 0 # no harness project, allow idle + + # Guard: only active when harness skill is triggered + if not _is_harness_active(root): + return 0 + + tasks_path = root / "harness-tasks.json" + try: + state = _load_json(tasks_path) + tasks_raw = state.get("tasks") or [] + if not isinstance(tasks_raw, list): + return 0 + tasks = [t for t in tasks_raw if isinstance(t, dict)] + except Exception: + return 0 # can't read state, allow idle + + completed = {str(t.get("id", "")) for t in tasks if str(t.get("status", "")) == "completed"} + + def deps_ok(t: dict[str, Any]) -> bool: + deps = t.get("depends_on") or [] + if not isinstance(deps, list): + return False + return all(str(d) in completed for d in deps) + + def attempts(t: dict[str, Any]) -> int: + try: + return int(t.get("attempts") or 0) + except Exception: + return 0 + + def max_attempts(t: dict[str, Any]) -> int: + try: + v = t.get("max_attempts") + return int(v) if v is not None else 3 + except Exception: + return 3 + + pending = [t for t in tasks if str(t.get("status", "")) == "pending" and deps_ok(t)] + retryable = [ + t for t in tasks + if str(t.get("status", "")) == "failed" + and attempts(t) < max_attempts(t) + and deps_ok(t) + ] + def key(t: dict[str, Any]) -> tuple[int, str]: + return (_priority_rank(t.get("priority")), str(t.get("id", ""))) + pending.sort(key=key) + retryable.sort(key=key) + in_progress = [t for t in tasks if str(t.get("status", 
"")) == "in_progress"] + + # Check if this teammate owns any in-progress tasks + worker_id = os.environ.get("HARNESS_WORKER_ID") or "" + teammate_name = payload.get("teammate_name", "") + owned = [ + t for t in in_progress + if str(t.get("claimed_by") or "") in (worker_id, teammate_name) + ] if (worker_id or teammate_name) else [] + + if owned: + tid = str(owned[0].get("id") or "") + title = str(owned[0].get("title") or "") + sys.stderr.write( + f"HARNESS: 你仍有进行中的任务 [{tid}] {title}。" + "请继续执行或完成该任务后再停止。\n" + ) + return 2 # block idle + + if pending or retryable: + next_t = pending[0] if pending else retryable[0] + tid = str(next_t.get("id") or "") + title = str(next_t.get("title") or "") + sys.stderr.write( + f"HARNESS: 仍有 {len(pending)} 个待执行 + {len(retryable)} 个可重试任务。" + f"下一个: [{tid}] {title}。请继续执行。\n" + ) + return 2 # block idle + + return 0 # all done, allow idle + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/harness/hooks/hooks.json b/skills/harness/hooks/hooks.json new file mode 100644 index 0000000..090e084 --- /dev/null +++ b/skills/harness/hooks/hooks.json @@ -0,0 +1,60 @@ +{ + "description": "Harness hooks: prevent premature stop, self-reflection iteration, inject task context on session start, keep teammates working, block subagent stop when tasks remain", + "hooks": { + "Stop": [ + { + "hooks": [ + { + "type": "command", + "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/harness-stop.py\"", + "timeout": 10 + } + ] + }, + { + "hooks": [ + { + "type": "command", + "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/self-reflect-stop.py\"", + "timeout": 15, + "statusMessage": "Self-reflecting on task completion..." 
+ } + ] + } + ], + "SessionStart": [ + { + "matcher": "startup|resume|compact", + "hooks": [ + { + "type": "command", + "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/harness-sessionstart.py\"", + "timeout": 10 + } + ] + } + ], + "TeammateIdle": [ + { + "hooks": [ + { + "type": "command", + "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/harness-teammateidle.py\"", + "timeout": 10 + } + ] + } + ], + "SubagentStop": [ + { + "hooks": [ + { + "type": "command", + "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/harness-subagentstop.py\"", + "timeout": 10 + } + ] + } + ] + } +} diff --git a/skills/harness/hooks/self-reflect-stop.py b/skills/harness/hooks/self-reflect-stop.py new file mode 100644 index 0000000..94344b3 --- /dev/null +++ b/skills/harness/hooks/self-reflect-stop.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +"""Self-reflection Stop hook — harness 任务循环完成后注入自省 prompt。 + +仅在以下条件同时满足时生效: + 1. harness-tasks.json 存在(harness 曾被初始化) + 2. .harness-active 不存在(harness 任务已全部完成) + +当 harness 未曾启动时,本 hook 是完全的 no-op。 + +配置: + - REFLECT_MAX_ITERATIONS 环境变量(默认 5) + - 设为 0 可禁用 +""" +from __future__ import annotations + +import json +import os +import sys +import tempfile +from pathlib import Path +from typing import Any, Optional + +# Add hooks directory to sys.path for _harness_common import +sys.path.insert(0, str(Path(__file__).resolve().parent)) +try: + import _harness_common as hc +except ImportError: + hc = None # type: ignore[assignment] + +DEFAULT_MAX_ITERATIONS = 5 + + +def _read_payload() -> dict[str, Any]: + raw = sys.stdin.read() + if not raw.strip(): + return {} + try: + data = json.loads(raw) + return data if isinstance(data, dict) else {} + except Exception: + return {} + + +def _find_harness_root(payload: dict[str, Any]) -> Optional[Path]: + """查找 harness-tasks.json 所在的目录。存在则说明 harness 曾被使用。""" + if hc is not None: + return hc.find_harness_root(payload) + + # Fallback: inline discovery if _harness_common not available + candidates: list[Path] = [] + 
state_root = os.environ.get("HARNESS_STATE_ROOT") + if state_root: + p = Path(state_root) + if (p / "harness-tasks.json").is_file(): + try: + return p.resolve() + except Exception: + return p + env_dir = os.environ.get("CLAUDE_PROJECT_DIR") + if env_dir: + candidates.append(Path(env_dir)) + cwd = payload.get("cwd") or os.getcwd() + candidates.append(Path(cwd)) + seen: set[str] = set() + for base in candidates: + try: + base = base.resolve() + except Exception: + continue + if str(base) in seen: + continue + seen.add(str(base)) + for parent in [base, *list(base.parents)[:8]]: + if (parent / "harness-tasks.json").is_file(): + return parent + return None + + +def _counter_path(session_id: str) -> Path: + """每个 session 独立计数文件。""" + return Path(tempfile.gettempdir()) / f"claude-reflect-{session_id}" + + +def _read_counter(session_id: str) -> int: + p = _counter_path(session_id) + try: + return int(p.read_text("utf-8").strip().split("\n")[0]) + except Exception: + return 0 + + +def _write_counter(session_id: str, count: int) -> None: + p = _counter_path(session_id) + try: + p.write_text(str(count), encoding="utf-8") + except Exception: + pass + + +def _extract_original_prompt(transcript_path: str, max_bytes: int = 100_000) -> str: + """从 transcript JSONL 中提取第一条用户消息作为原始 prompt。""" + try: + p = Path(transcript_path) + if not p.is_file(): + return "" + with p.open("r", encoding="utf-8") as f: + # JSONL 格式,逐行解析找第一条 user message + for line in f: + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + except Exception: + continue + if not isinstance(entry, dict): + continue + # Claude Code transcript 格式:role + content + role = entry.get("role") or entry.get("type", "") + if role == "user": + content = entry.get("content", "") + if isinstance(content, list): + # content 可能是 list of blocks + texts = [] + for block in content: + if isinstance(block, dict): + t = block.get("text", "") + if t: + texts.append(t) + elif isinstance(block, str): + 
texts.append(block) + content = "\n".join(texts) + if isinstance(content, str) and content.strip(): + # 截断过长的 prompt + if len(content) > 2000: + content = content[:2000] + "..." + return content.strip() + except Exception: + pass + return "" + + +def main() -> int: + payload = _read_payload() + session_id = payload.get("session_id", "") + if not session_id: + return 0 # 无 session_id,放行 + + # 守卫 1:harness 从未初始化过 → 完全不触发自检 + root = _find_harness_root(payload) + if root is None: + return 0 # harness 未曾使用,不触发自省 + + # 守卫 2:harness 仍活跃 → 由 harness-stop.py 全权管理 + if (root / ".harness-active").is_file(): + return 0 + + # 读取最大迭代次数 + try: + max_iter = int(os.environ.get("REFLECT_MAX_ITERATIONS", DEFAULT_MAX_ITERATIONS)) + except (ValueError, TypeError): + max_iter = DEFAULT_MAX_ITERATIONS + + # 禁用 + if max_iter <= 0: + return 0 + + # 读取当前计数 + count = _read_counter(session_id) + + # 超过最大次数,放行 + if count >= max_iter: + return 0 + + # 递增计数 + _write_counter(session_id, count + 1) + + # 提取原始 prompt + transcript_path = payload.get("transcript_path", "") + original_prompt = _extract_original_prompt(transcript_path) + last_message = payload.get("last_assistant_message", "") + if last_message and len(last_message) > 3000: + last_message = last_message[:3000] + "..." + + # 构建自省 prompt + parts = [ + f"[Self-Reflect] 迭代 {count + 1}/{max_iter} — 请在继续之前进行自省检查:", + ] + + if original_prompt: + parts.append(f"\n📋 原始请求:\n{original_prompt}") + + parts.append( + "\n🔍 自省清单:" + "\n1. 对照原始请求,逐项确认每个需求点是否已完整实现" + "\n2. 检查是否有遗漏的边界情况、错误处理或异常场景" + "\n3. 代码质量:是否有可以改进的地方(可读性、性能、安全性)" + "\n4. 是否需要补充测试或文档" + "\n5. 
最终确认:所有改动是否一致且不互相冲突" + "\n\n如果一切已完成,简要总结成果即可结束。如果发现问题,继续修复。" + ) + + reason = "\n".join(parts) + + print(json.dumps({"decision": "block", "reason": reason}, ensure_ascii=False)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/harness/tests/e2e-100tasks.sh b/skills/harness/tests/e2e-100tasks.sh new file mode 100755 index 0000000..5d09171 --- /dev/null +++ b/skills/harness/tests/e2e-100tasks.sh @@ -0,0 +1,178 @@ +#!/usr/bin/env bash +set -euo pipefail + +# E2E test: 100 harness tasks + 5 self-reflection iterations via claude -p +# Usage: bash e2e-100tasks.sh + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +PROJECT_DIR="$(mktemp -d /tmp/harness-e2e-XXXXXX)" +LOG_FILE="${PROJECT_DIR}/test-output.log" + +echo "=== Harness E2E Test: 100 tasks + 5 self-reflect ===" +echo "Project dir: ${PROJECT_DIR}" +echo "" + +# --- 1. Generate harness-tasks.json with 100 trivial tasks --- +python3 - "${PROJECT_DIR}" <<'PYEOF' +import json, sys + +root = sys.argv[1] +tasks = [] +for i in range(1, 101): + tid = f"task-{i:03d}" + tasks.append({ + "id": tid, + "title": f"Create file {tid}.txt", + "status": "pending", + "priority": "P1", + "depends_on": [], + "attempts": 0, + "max_attempts": 3, + "started_at_commit": None, + "validation": { + "command": f"test -f {tid}.txt && grep -q 'done-{tid}' {tid}.txt", + "timeout_seconds": 10 + }, + "on_failure": {"cleanup": None}, + "error_log": [], + "checkpoints": [], + "completed_at": None + }) + +state = { + "version": 2, + "created": "2026-03-01T00:00:00Z", + "session_config": { + "concurrency_mode": "exclusive", + "max_tasks_per_session": 100, + "max_sessions": 50, + "max_reflect_iterations": 5 + }, + "tasks": tasks, + "session_count": 0, + "last_session": None +} + +with open(f"{root}/harness-tasks.json", "w") as f: + json.dump(state, f, indent=2, ensure_ascii=False) + +print(f"Generated {len(tasks)} tasks") +PYEOF + +# --- 2. 
Create progress log --- +touch "${PROJECT_DIR}/harness-progress.txt" + +# --- 3. Create .harness-active marker --- +touch "${PROJECT_DIR}/.harness-active" + +# --- 4. Init git repo (required for harness commit tracking) --- +cd "${PROJECT_DIR}" +git init -q +git add harness-tasks.json harness-progress.txt .harness-active +git commit -q -m "harness init" + +echo "Setup complete. Running claude -p ..." +echo "" + +# --- 5. Build the prompt --- +PROMPT="$(cat <<'PROMPT_EOF' +You are in a project with a harness setup. Run /harness run to execute all tasks. + +The project is at the current working directory. There are 100 tasks in harness-tasks.json. +Each task requires creating a file: for task-001, create task-001.txt with content "done-task-001". + +Execute the harness infinite loop protocol: +1. Read harness-tasks.json and harness-progress.txt +2. Pick next eligible task by priority +3. For each task: create the file with the required content, run validation, mark completed +4. Continue until all tasks are done +5. After completion, the self-reflect stop hook will trigger 5 times — complete those iterations + +IMPORTANT: Do NOT use any skill tools. Just directly create files and update harness state. +For efficiency, you can batch multiple file creations in a single command. +After creating files, update harness-tasks.json to mark them completed. +Do all work directly — no planning mode, no subagents. +PROMPT_EOF +)" + +# --- 6. 
Run claude -p --- +START_TIME=$(date +%s) + +cd "${PROJECT_DIR}" +unset CLAUDECODE +REFLECT_MAX_ITERATIONS=5 \ +HARNESS_STATE_ROOT="${PROJECT_DIR}" \ +claude -p "${PROMPT}" \ + --model sonnet \ + --dangerously-skip-permissions \ + --disable-slash-commands \ + --no-session-persistence \ + --max-budget-usd 5 \ + --allowedTools 'Bash(*)' 'Read' 'Write' 'Glob' 'Grep' 'Edit' \ + 2>&1 | tee "${LOG_FILE}" + +END_TIME=$(date +%s) +ELAPSED=$((END_TIME - START_TIME)) + +echo "" +echo "=== Test Results ===" +echo "Duration: ${ELAPSED}s" +echo "" + +# --- 7. Verify results --- +python3 - "${PROJECT_DIR}" <<'VERIFY_EOF' +import json, sys, os +from pathlib import Path + +root = Path(sys.argv[1]) +tasks_path = root / "harness-tasks.json" +progress_path = root / "harness-progress.txt" + +# Check task files created +created = 0 +for i in range(1, 101): + tid = f"task-{i:03d}" + fpath = root / f"{tid}.txt" + if fpath.is_file(): + content = fpath.read_text().strip() + if f"done-{tid}" in content: + created += 1 + +# Check task statuses +with tasks_path.open() as f: + state = json.load(f) +tasks = state.get("tasks", []) +completed = sum(1 for t in tasks if t.get("status") == "completed") +failed = sum(1 for t in tasks if t.get("status") == "failed") +pending = sum(1 for t in tasks if t.get("status") == "pending") +in_progress = sum(1 for t in tasks if t.get("status") == "in_progress") + +# Check .harness-active removed +marker_removed = not (root / ".harness-active").is_file() + +# Check progress log +progress_lines = 0 +if progress_path.is_file(): + progress_lines = len([l for l in progress_path.read_text().splitlines() if l.strip()]) + +print(f"Files created: {created}/100") +print(f"Tasks completed: {completed}/100") +print(f"Tasks failed: {failed}") +print(f"Tasks pending: {pending}") +print(f"Tasks in_progress: {in_progress}") +print(f"Marker removed: {marker_removed}") +print(f"Progress log lines: {progress_lines}") +print() + +if created >= 95 and completed >= 95: + 
print("PASS: >= 95% tasks completed successfully") + sys.exit(0) +else: + print(f"PARTIAL: {created} files, {completed} completed") + print("Check the log for details") + sys.exit(1) +VERIFY_EOF + +echo "" +echo "Log: ${LOG_FILE}" +echo "Project: ${PROJECT_DIR}" diff --git a/skills/harness/tests/test_hooks.py b/skills/harness/tests/test_hooks.py new file mode 100644 index 0000000..78b4c77 --- /dev/null +++ b/skills/harness/tests/test_hooks.py @@ -0,0 +1,774 @@ +#!/usr/bin/env python3 +"""Unit tests for harness hook scripts. + +Tests the activation guard (.harness-active marker), task state logic, +and edge cases for all 4 hooks: Stop, SessionStart, TeammateIdle, SubagentStop. +""" +from __future__ import annotations + +import json +import os +import subprocess +import sys +import tempfile +import unittest +from pathlib import Path + +HOOKS_DIR = Path(__file__).resolve().parent.parent / "hooks" +STOP_HOOK = HOOKS_DIR / "harness-stop.py" +SESSION_HOOK = HOOKS_DIR / "harness-sessionstart.py" +IDLE_HOOK = HOOKS_DIR / "harness-teammateidle.py" +SUBAGENT_HOOK = HOOKS_DIR / "harness-subagentstop.py" + + +def build_hook_env(env_extra: dict | None = None) -> dict[str, str]: + """Build an isolated environment for hook subprocesses.""" + env = os.environ.copy() + # Clear harness env vars to avoid interference + env.pop("HARNESS_STATE_ROOT", None) + env.pop("HARNESS_WORKER_ID", None) + env.pop("CLAUDE_PROJECT_DIR", None) + if env_extra: + env.update(env_extra) + return env + + +def run_hook(script: Path, payload: dict, env_extra: dict | None = None) -> tuple[int, str, str]: + """Run a hook script with JSON payload on stdin. 
Returns (exit_code, stdout, stderr).""" + env = build_hook_env(env_extra) + proc = subprocess.run( + [sys.executable, str(script)], + input=json.dumps(payload), + capture_output=True, + text=True, + timeout=10, + env=env, + ) + return proc.returncode, proc.stdout.strip(), proc.stderr.strip() + + +def write_tasks(root: Path, tasks: list[dict], **extra) -> None: + state = {"tasks": tasks, **extra} + (root / "harness-tasks.json").write_text(json.dumps(state), encoding="utf-8") + + +def activate(root: Path) -> None: + (root / ".harness-active").touch() + + +def deactivate(root: Path) -> None: + p = root / ".harness-active" + if p.exists(): + p.unlink() + + +# --------------------------------------------------------------------------- +# Activation Guard Tests (shared across all hooks) +# --------------------------------------------------------------------------- +class TestActivationGuard(unittest.TestCase): + """All hooks must be no-ops when .harness-active is absent.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.root = Path(self.tmpdir) + write_tasks(self.root, [ + {"id": "t1", "title": "Pending task", "status": "pending", "priority": "P0", "depends_on": []}, + ]) + (self.root / "harness-progress.txt").write_text("[SESSION-1] INIT\n") + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def _payload(self, **extra): + return {"cwd": self.tmpdir, **extra} + + def test_stop_inactive_allows(self): + """Stop hook allows stop when .harness-active is absent.""" + deactivate(self.root) + code, stdout, stderr = run_hook(STOP_HOOK, self._payload()) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_stop_active_blocks(self): + """Stop hook blocks when .harness-active is present and tasks remain.""" + activate(self.root) + code, stdout, stderr = run_hook(STOP_HOOK, self._payload()) + self.assertEqual(code, 0) + data = json.loads(stdout) + self.assertEqual(data["decision"], "block") + + def 
test_sessionstart_inactive_noop(self): + """SessionStart hook produces no output when inactive.""" + deactivate(self.root) + code, stdout, stderr = run_hook(SESSION_HOOK, self._payload()) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_sessionstart_active_injects(self): + """SessionStart hook injects context when active.""" + activate(self.root) + code, stdout, stderr = run_hook(SESSION_HOOK, self._payload()) + self.assertEqual(code, 0) + data = json.loads(stdout) + self.assertIn("additionalContext", data.get("hookSpecificOutput", {})) + + def test_teammateidle_inactive_allows(self): + """TeammateIdle hook allows idle when inactive.""" + deactivate(self.root) + code, stdout, stderr = run_hook(IDLE_HOOK, self._payload()) + self.assertEqual(code, 0) + self.assertEqual(stderr, "") + + def test_teammateidle_active_blocks(self): + """TeammateIdle hook blocks idle when active and tasks remain.""" + activate(self.root) + code, stdout, stderr = run_hook(IDLE_HOOK, self._payload()) + self.assertEqual(code, 2) + self.assertIn("HARNESS", stderr) + + def test_subagentstop_inactive_allows(self): + """SubagentStop hook allows stop when inactive.""" + deactivate(self.root) + code, stdout, stderr = run_hook(SUBAGENT_HOOK, self._payload()) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_subagentstop_active_blocks(self): + """SubagentStop hook blocks when active and tasks in progress.""" + write_tasks(self.root, [ + {"id": "t1", "title": "Working task", "status": "in_progress", "priority": "P0", "depends_on": []}, + ]) + activate(self.root) + code, stdout, stderr = run_hook(SUBAGENT_HOOK, self._payload()) + self.assertEqual(code, 0) + data = json.loads(stdout) + self.assertEqual(data["decision"], "block") + + +# --------------------------------------------------------------------------- +# No Harness Root Tests +# --------------------------------------------------------------------------- +class TestNoHarnessRoot(unittest.TestCase): + 
"""All hooks must be no-ops when no harness-tasks.json exists.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_stop_no_root(self): + code, stdout, _ = run_hook(STOP_HOOK, {"cwd": self.tmpdir}) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_sessionstart_no_root(self): + code, stdout, _ = run_hook(SESSION_HOOK, {"cwd": self.tmpdir}) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_teammateidle_no_root(self): + code, _, stderr = run_hook(IDLE_HOOK, {"cwd": self.tmpdir}) + self.assertEqual(code, 0) + self.assertEqual(stderr, "") + + def test_subagentstop_no_root(self): + code, stdout, _ = run_hook(SUBAGENT_HOOK, {"cwd": self.tmpdir}) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + +# --------------------------------------------------------------------------- +# Stop Hook — Task State Logic +# --------------------------------------------------------------------------- +class TestStopHookTaskLogic(unittest.TestCase): + """Stop hook task selection, completion detection, and safety valve.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.root = Path(self.tmpdir) + (self.root / "harness-progress.txt").write_text("") + activate(self.root) + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def _payload(self, **extra): + return {"cwd": self.tmpdir, **extra} + + def test_all_completed_allows_stop(self): + """When all tasks are completed, stop is allowed.""" + write_tasks(self.root, [ + {"id": "t1", "status": "completed"}, + {"id": "t2", "status": "completed"}, + ]) + code, stdout, _ = run_hook(STOP_HOOK, self._payload()) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + self.assertFalse((self.root / ".harness-active").exists()) + + def test_pending_with_unmet_deps_allows_stop(self): + """Pending tasks with unmet dependencies don't block 
stop.""" + write_tasks(self.root, [ + {"id": "t1", "status": "failed", "attempts": 3, "max_attempts": 3}, + {"id": "t2", "status": "pending", "depends_on": ["t1"]}, + ]) + code, stdout, _ = run_hook(STOP_HOOK, self._payload()) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_retryable_failed_blocks(self): + """Failed task with attempts < max_attempts blocks stop.""" + write_tasks(self.root, [ + {"id": "t1", "status": "failed", "attempts": 1, "max_attempts": 3, "priority": "P0", "depends_on": [], "title": "Retry me"}, + ]) + code, stdout, _ = run_hook(STOP_HOOK, self._payload()) + data = json.loads(stdout) + self.assertEqual(data["decision"], "block") + self.assertIn("Retry me", data["reason"]) + + def test_exhausted_retries_allows_stop(self): + """Failed task with attempts >= max_attempts allows stop.""" + write_tasks(self.root, [ + {"id": "t1", "status": "failed", "attempts": 3, "max_attempts": 3, "depends_on": []}, + ]) + code, stdout, _ = run_hook(STOP_HOOK, self._payload()) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_in_progress_blocks(self): + """In-progress tasks block stop.""" + write_tasks(self.root, [ + {"id": "t1", "status": "in_progress", "priority": "P0"}, + ]) + code, stdout, _ = run_hook(STOP_HOOK, self._payload()) + data = json.loads(stdout) + self.assertEqual(data["decision"], "block") + + def test_session_limit_allows_stop(self): + """Session limit reached allows stop even with pending tasks.""" + write_tasks(self.root, [ + {"id": "t1", "status": "pending", "depends_on": [], "priority": "P0"}, + ], session_count=5, session_config={"max_sessions": 5}) + code, stdout, _ = run_hook(STOP_HOOK, self._payload()) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_max_tasks_per_session_limit_allows_stop(self): + """Per-session completed-task cap allows stop when reached.""" + write_tasks(self.root, [ + {"id": "t1", "status": "pending", "depends_on": [], "priority": "P0"}, + ], 
session_count=2, session_config={"max_tasks_per_session": 1}) + (self.root / "harness-progress.txt").write_text("[SESSION-2] Completed [task-1]\n") + code, stdout, _ = run_hook(STOP_HOOK, self._payload()) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_concurrent_other_worker_in_progress_allows_stop(self): + """Concurrent mode should not block on another worker's in-progress task.""" + write_tasks(self.root, [ + {"id": "t1", "status": "in_progress", "claimed_by": "worker-a", "priority": "P0"}, + ], session_config={"concurrency_mode": "concurrent"}) + code, stdout, _ = run_hook( + STOP_HOOK, self._payload(), + env_extra={"HARNESS_WORKER_ID": "worker-b"}, + ) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_priority_ordering_in_block_reason(self): + """Block reason shows highest priority task as next.""" + write_tasks(self.root, [ + {"id": "t1", "status": "pending", "priority": "P2", "depends_on": [], "title": "Low"}, + {"id": "t2", "status": "pending", "priority": "P0", "depends_on": [], "title": "High"}, + ]) + code, stdout, _ = run_hook(STOP_HOOK, self._payload()) + data = json.loads(stdout) + self.assertIn("t2", data["reason"]) + self.assertIn("High", data["reason"]) + + def test_stop_hook_active_safety_valve(self): + """After MAX_CONSECUTIVE_BLOCKS with stop_hook_active, allows stop.""" + write_tasks(self.root, [ + {"id": "t1", "status": "pending", "depends_on": [], "priority": "P0"}, + ]) + (self.root / ".harness-stop-counter").write_text("9,0") + code, stdout, stderr = run_hook(STOP_HOOK, self._payload(stop_hook_active=True)) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + self.assertIn("WARN", stderr) + + def test_stop_hook_active_below_threshold_blocks(self): + """Below MAX_CONSECUTIVE_BLOCKS with stop_hook_active still blocks.""" + write_tasks(self.root, [ + {"id": "t1", "status": "pending", "depends_on": [], "priority": "P0"}, + ]) + (self.root / ".harness-stop-counter").write_text("2,0") + code, 
stdout, _ = run_hook(STOP_HOOK, self._payload(stop_hook_active=True)) + data = json.loads(stdout) + self.assertEqual(data["decision"], "block") + + def test_progress_resets_block_counter(self): + """When completed count increases, block counter resets.""" + write_tasks(self.root, [ + {"id": "t1", "status": "completed"}, + {"id": "t2", "status": "pending", "depends_on": [], "priority": "P0"}, + ]) + (self.root / ".harness-stop-counter").write_text("7,0") + code, stdout, _ = run_hook(STOP_HOOK, self._payload(stop_hook_active=True)) + data = json.loads(stdout) + self.assertEqual(data["decision"], "block") + counter = (self.root / ".harness-stop-counter").read_text().strip() + self.assertEqual(counter, "1,1") + + def test_corrupt_json_with_stop_hook_active_allows(self): + """Corrupt config + stop_hook_active should allow stop to avoid loop.""" + (self.root / "harness-tasks.json").write_text("{invalid json") + code, stdout, stderr = run_hook(STOP_HOOK, self._payload(stop_hook_active=True)) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + self.assertIn("WARN", stderr) + + +# --------------------------------------------------------------------------- +# SessionStart Hook — Context Injection +# --------------------------------------------------------------------------- +class TestSessionStartHook(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.root = Path(self.tmpdir) + activate(self.root) + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def _payload(self): + return {"cwd": self.tmpdir} + + def test_summary_includes_counts(self): + write_tasks(self.root, [ + {"id": "t1", "status": "completed"}, + {"id": "t2", "status": "pending", "depends_on": ["t1"]}, + {"id": "t3", "status": "failed", "depends_on": []}, + ]) + (self.root / "harness-progress.txt").write_text("[SESSION-1] STATS total=3\n") + code, stdout, _ = run_hook(SESSION_HOOK, self._payload()) + data = json.loads(stdout) + ctx = 
data["hookSpecificOutput"]["additionalContext"] + self.assertIn("completed=1", ctx) + self.assertIn("pending=1", ctx) + self.assertIn("failed=1", ctx) + self.assertIn("total=3", ctx) + + def test_next_task_hint(self): + write_tasks(self.root, [ + {"id": "t1", "status": "completed"}, + {"id": "t2", "status": "pending", "priority": "P0", "depends_on": ["t1"], "title": "Do stuff"}, + ]) + (self.root / "harness-progress.txt").write_text("") + code, stdout, _ = run_hook(SESSION_HOOK, self._payload()) + data = json.loads(stdout) + ctx = data["hookSpecificOutput"]["additionalContext"] + self.assertIn("next=t2", ctx) + self.assertIn("Do stuff", ctx) + + def test_empty_tasks_no_crash(self): + write_tasks(self.root, []) + (self.root / "harness-progress.txt").write_text("") + code, stdout, _ = run_hook(SESSION_HOOK, self._payload()) + self.assertEqual(code, 0) + data = json.loads(stdout) + self.assertIn("total=0", data["hookSpecificOutput"]["additionalContext"]) + + def test_corrupt_json_reports_error(self): + (self.root / "harness-tasks.json").write_text("{invalid json") + (self.root / "harness-progress.txt").write_text("") + code, stdout, _ = run_hook(SESSION_HOOK, self._payload()) + self.assertEqual(code, 0) + data = json.loads(stdout) + self.assertIn("error", data["hookSpecificOutput"]["additionalContext"].lower()) + + def test_invalid_attempt_fields_no_crash(self): + write_tasks(self.root, [ + {"id": "t1", "status": "failed", "attempts": "oops", "max_attempts": "bad", "depends_on": []}, + ]) + (self.root / "harness-progress.txt").write_text("") + code, stdout, _ = run_hook(SESSION_HOOK, self._payload()) + self.assertEqual(code, 0) + data = json.loads(stdout) + self.assertIn("total=1", data["hookSpecificOutput"]["additionalContext"]) + + +# --------------------------------------------------------------------------- +# TeammateIdle Hook — Ownership & Task State +# --------------------------------------------------------------------------- +class 
TestTeammateIdleHook(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.root = Path(self.tmpdir) + activate(self.root) + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_owned_in_progress_blocks(self): + """Teammate with in-progress task is blocked from going idle.""" + write_tasks(self.root, [ + {"id": "t1", "status": "in_progress", "claimed_by": "alice", "title": "My task"}, + ]) + code, _, stderr = run_hook(IDLE_HOOK, {"cwd": self.tmpdir, "teammate_name": "alice"}) + self.assertEqual(code, 2) + self.assertIn("t1", stderr) + + def test_unowned_in_progress_allows(self): + """Teammate without owned tasks and no pending allows idle.""" + write_tasks(self.root, [ + {"id": "t1", "status": "in_progress", "claimed_by": "bob"}, + ]) + code, _, stderr = run_hook(IDLE_HOOK, {"cwd": self.tmpdir, "teammate_name": "alice"}) + self.assertEqual(code, 0) + + def test_pending_tasks_block(self): + """Pending eligible tasks block idle even without ownership.""" + write_tasks(self.root, [ + {"id": "t1", "status": "pending", "depends_on": [], "title": "Next up"}, + ]) + code, _, stderr = run_hook(IDLE_HOOK, {"cwd": self.tmpdir}) + self.assertEqual(code, 2) + self.assertIn("t1", stderr) + + def test_all_completed_allows(self): + """All tasks completed allows idle.""" + write_tasks(self.root, [ + {"id": "t1", "status": "completed"}, + {"id": "t2", "status": "completed"}, + ]) + code, _, stderr = run_hook(IDLE_HOOK, {"cwd": self.tmpdir}) + self.assertEqual(code, 0) + self.assertEqual(stderr, "") + + def test_failed_retryable_blocks(self): + """Retryable failed tasks block idle.""" + write_tasks(self.root, [ + {"id": "t1", "status": "failed", "attempts": 1, "max_attempts": 3, "depends_on": [], "title": "Retry"}, + ]) + code, _, stderr = run_hook(IDLE_HOOK, {"cwd": self.tmpdir}) + self.assertEqual(code, 2) + self.assertIn("t1", stderr) + + def test_worker_id_env_matches(self): + """HARNESS_WORKER_ID env var 
matches claimed_by.""" + write_tasks(self.root, [ + {"id": "t1", "status": "in_progress", "claimed_by": "w-123"}, + ]) + code, _, stderr = run_hook( + IDLE_HOOK, {"cwd": self.tmpdir}, + env_extra={"HARNESS_WORKER_ID": "w-123"}, + ) + self.assertEqual(code, 2) + self.assertIn("t1", stderr) + + +# --------------------------------------------------------------------------- +# SubagentStop Hook — Stop Guard & stop_hook_active +# --------------------------------------------------------------------------- +class TestSubagentStopHook(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.root = Path(self.tmpdir) + activate(self.root) + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_in_progress_blocks(self): + write_tasks(self.root, [ + {"id": "t1", "status": "in_progress", "title": "Working"}, + ]) + code, stdout, _ = run_hook(SUBAGENT_HOOK, {"cwd": self.tmpdir}) + data = json.loads(stdout) + self.assertEqual(data["decision"], "block") + self.assertIn("Working", data["reason"]) + + def test_pending_allows(self): + write_tasks(self.root, [ + {"id": "t1", "status": "completed"}, + {"id": "t2", "status": "pending", "depends_on": ["t1"], "title": "Next"}, + ]) + code, stdout, _ = run_hook(SUBAGENT_HOOK, {"cwd": self.tmpdir}) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_all_done_allows(self): + write_tasks(self.root, [ + {"id": "t1", "status": "completed"}, + {"id": "t2", "status": "completed"}, + ]) + code, stdout, _ = run_hook(SUBAGENT_HOOK, {"cwd": self.tmpdir}) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_stop_hook_active_allows(self): + """stop_hook_active=True bypasses all checks to prevent infinite loop.""" + write_tasks(self.root, [ + {"id": "t1", "status": "in_progress"}, + ]) + code, stdout, _ = run_hook(SUBAGENT_HOOK, {"cwd": self.tmpdir, "stop_hook_active": True}) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def 
test_blocked_deps_not_counted(self): + """Pending tasks with unmet deps don't trigger block.""" + write_tasks(self.root, [ + {"id": "t1", "status": "failed", "attempts": 3, "max_attempts": 3}, + {"id": "t2", "status": "pending", "depends_on": ["t1"]}, + ]) + code, stdout, _ = run_hook(SUBAGENT_HOOK, {"cwd": self.tmpdir}) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_concurrent_owned_in_progress_blocks(self): + write_tasks(self.root, [ + {"id": "t1", "status": "in_progress", "claimed_by": "worker-a", "title": "Mine"}, + ], session_config={"concurrency_mode": "concurrent"}) + code, stdout, _ = run_hook( + SUBAGENT_HOOK, {"cwd": self.tmpdir}, + env_extra={"HARNESS_WORKER_ID": "worker-a"}, + ) + data = json.loads(stdout) + self.assertEqual(data["decision"], "block") + self.assertIn("Mine", data["reason"]) + + def test_concurrent_other_worker_in_progress_allows(self): + write_tasks(self.root, [ + {"id": "t1", "status": "in_progress", "claimed_by": "worker-a", "title": "Other"}, + ], session_config={"concurrency_mode": "concurrent"}) + code, stdout, _ = run_hook( + SUBAGENT_HOOK, {"cwd": self.tmpdir}, + env_extra={"HARNESS_WORKER_ID": "worker-b"}, + ) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_concurrent_missing_identity_blocks(self): + write_tasks(self.root, [ + {"id": "t1", "status": "in_progress", "claimed_by": "worker-a", "title": "Other"}, + ], session_config={"concurrency_mode": "concurrent"}) + code, stdout, _ = run_hook(SUBAGENT_HOOK, {"cwd": self.tmpdir}) + data = json.loads(stdout) + self.assertEqual(data["decision"], "block") + self.assertIn("worker identity", data["reason"]) + + +# --------------------------------------------------------------------------- +# Edge Cases +# --------------------------------------------------------------------------- +class TestEdgeCases(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.root = Path(self.tmpdir) + + def tearDown(self): + 
import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_empty_stdin(self): + """Hooks handle empty stdin gracefully.""" + write_tasks(self.root, [{"id": "t1", "status": "pending", "depends_on": []}]) + activate(self.root) + for hook in [STOP_HOOK, SESSION_HOOK, IDLE_HOOK, SUBAGENT_HOOK]: + proc = subprocess.run( + [sys.executable, str(hook)], + input="", + capture_output=True, text=True, timeout=10, + cwd=self.tmpdir, + env=build_hook_env(), + ) + self.assertIn(proc.returncode, {0, 2}, f"{hook.name} failed on empty stdin") + self.assertNotIn("Traceback", proc.stderr) + + def test_invalid_json_stdin(self): + """Hooks handle invalid JSON stdin gracefully.""" + write_tasks(self.root, [{"id": "t1", "status": "pending", "depends_on": []}]) + activate(self.root) + for hook in [STOP_HOOK, SESSION_HOOK, IDLE_HOOK, SUBAGENT_HOOK]: + proc = subprocess.run( + [sys.executable, str(hook)], + input="not json at all", + capture_output=True, text=True, timeout=10, + cwd=self.tmpdir, + env=build_hook_env(), + ) + self.assertIn(proc.returncode, {0, 2}, f"{hook.name} crashed on invalid JSON") + self.assertNotIn("Traceback", proc.stderr) + + def test_harness_state_root_env(self): + """HARNESS_STATE_ROOT env var is respected.""" + write_tasks(self.root, [ + {"id": "t1", "status": "pending", "depends_on": [], "priority": "P0"}, + ]) + activate(self.root) + (self.root / "harness-progress.txt").write_text("") + code, stdout, _ = run_hook( + STOP_HOOK, {"cwd": "/nonexistent"}, + env_extra={"HARNESS_STATE_ROOT": self.tmpdir}, + ) + data = json.loads(stdout) + self.assertEqual(data["decision"], "block") + + def test_tasks_not_a_list(self): + """Hooks handle tasks field being non-list.""" + (self.root / "harness-tasks.json").write_text('{"tasks": "not a list"}') + activate(self.root) + (self.root / "harness-progress.txt").write_text("") + code, stdout, _ = run_hook(STOP_HOOK, {"cwd": self.tmpdir}) + self.assertEqual(code, 0) + data = json.loads(stdout) + 
self.assertEqual(data["decision"], "block") + + +# --------------------------------------------------------------------------- +# Self-Reflect Stop Hook — Only triggers after harness completes +# --------------------------------------------------------------------------- +REFLECT_HOOK = HOOKS_DIR / "self-reflect-stop.py" + + +class TestSelfReflectStopHook(unittest.TestCase): + """self-reflect-stop.py must only trigger when harness was used and completed.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.root = Path(self.tmpdir) + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + # Clean up counter files + for p in Path(tempfile.gettempdir()).glob("claude-reflect-test-*"): + try: + p.unlink() + except Exception: + pass + + def _payload(self, session_id="test-reflect-001", **extra): + return {"cwd": self.tmpdir, "session_id": session_id, **extra} + + def test_no_harness_root_is_noop(self): + """When harness-tasks.json doesn't exist, hook is a complete no-op.""" + code, stdout, stderr = run_hook(REFLECT_HOOK, self._payload()) + self.assertEqual(code, 0) + self.assertEqual(stdout, "", "Should produce no output when harness never used") + + def test_harness_active_defers(self): + """When .harness-active exists, hook defers to harness-stop.py.""" + write_tasks(self.root, [ + {"id": "t1", "status": "pending", "depends_on": []}, + ]) + activate(self.root) + code, stdout, _ = run_hook(REFLECT_HOOK, self._payload()) + self.assertEqual(code, 0) + self.assertEqual(stdout, "", "Should not self-reflect while harness is active") + + def test_harness_completed_triggers_reflection(self): + """When harness-tasks.json exists but .harness-active removed, triggers self-reflection.""" + write_tasks(self.root, [ + {"id": "t1", "status": "completed"}, + ]) + deactivate(self.root) + sid = "test-reflect-trigger" + code, stdout, _ = run_hook(REFLECT_HOOK, self._payload(session_id=sid)) + self.assertEqual(code, 0) + data = 
json.loads(stdout) + self.assertEqual(data["decision"], "block") + self.assertIn("Self-Reflect", data["reason"]) + + def test_counter_increments(self): + """Each invocation increments the iteration counter.""" + write_tasks(self.root, [{"id": "t1", "status": "completed"}]) + deactivate(self.root) + sid = "test-reflect-counter" + + # First call: iteration 1 + code, stdout, _ = run_hook(REFLECT_HOOK, self._payload(session_id=sid)) + data = json.loads(stdout) + self.assertIn("1/5", data["reason"]) + + # Second call: iteration 2 + code, stdout, _ = run_hook(REFLECT_HOOK, self._payload(session_id=sid)) + data = json.loads(stdout) + self.assertIn("2/5", data["reason"]) + + def test_max_iterations_allows_stop(self): + """After max iterations, hook allows stop (no output).""" + write_tasks(self.root, [{"id": "t1", "status": "completed"}]) + deactivate(self.root) + sid = "test-reflect-max" + + # Write counter at max + counter_path = Path(tempfile.gettempdir()) / f"claude-reflect-{sid}" + counter_path.write_text("5", encoding="utf-8") + + code, stdout, _ = run_hook(REFLECT_HOOK, self._payload(session_id=sid)) + self.assertEqual(code, 0) + self.assertEqual(stdout, "", "Should allow stop after max iterations") + + def test_disabled_via_env(self): + """REFLECT_MAX_ITERATIONS=0 disables self-reflection.""" + write_tasks(self.root, [{"id": "t1", "status": "completed"}]) + deactivate(self.root) + code, stdout, _ = run_hook( + REFLECT_HOOK, + self._payload(session_id="test-reflect-disabled"), + env_extra={"REFLECT_MAX_ITERATIONS": "0"}, + ) + self.assertEqual(code, 0) + self.assertEqual(stdout, "", "Should be disabled when max=0") + + def test_no_session_id_is_noop(self): + """Missing session_id makes hook a no-op.""" + write_tasks(self.root, [{"id": "t1", "status": "completed"}]) + deactivate(self.root) + code, stdout, _ = run_hook(REFLECT_HOOK, {"cwd": self.tmpdir}) + self.assertEqual(code, 0) + self.assertEqual(stdout, "") + + def test_empty_stdin_no_crash(self): + """Empty 
stdin doesn't crash.""" + write_tasks(self.root, [{"id": "t1", "status": "completed"}]) + activate(self.root) + proc = subprocess.run( + [sys.executable, str(REFLECT_HOOK)], + input="", + capture_output=True, text=True, timeout=10, + cwd=self.tmpdir, + env=build_hook_env(), + ) + self.assertEqual(proc.returncode, 0) + self.assertNotIn("Traceback", proc.stderr) + + def test_harness_state_root_env_respected(self): + """HARNESS_STATE_ROOT env var is used for root discovery.""" + write_tasks(self.root, [{"id": "t1", "status": "completed"}]) + deactivate(self.root) + sid = "test-reflect-env" + code, stdout, _ = run_hook( + REFLECT_HOOK, + {"cwd": "/nonexistent", "session_id": sid}, + env_extra={"HARNESS_STATE_ROOT": self.tmpdir}, + ) + data = json.loads(stdout) + self.assertEqual(data["decision"], "block") + + +if __name__ == "__main__": + unittest.main() diff --git a/uninstall.py b/uninstall.py index 23d1ab1..7b10a54 100755 --- a/uninstall.py +++ b/uninstall.py @@ -12,6 +12,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Set DEFAULT_INSTALL_DIR = "~/.claude" +SETTINGS_FILE = "settings.json" # Files created by installer itself (not by modules) INSTALLER_FILES = ["install.log", "installed_modules.json", "installed_modules.json.bak"] @@ -80,6 +81,42 @@ def load_config(install_dir: Path) -> Dict[str, Any]: return {} +def unmerge_hooks_from_settings(module_name: str, install_dir: Path) -> bool: + """Remove hooks tagged with __module__=module_name from settings.json.""" + settings_path = install_dir / SETTINGS_FILE + if not settings_path.exists(): + return False + + try: + with settings_path.open("r", encoding="utf-8") as f: + settings = json.load(f) + except (json.JSONDecodeError, OSError): + return False + + if "hooks" not in settings: + return False + + modified = False + for hook_type in list(settings["hooks"].keys()): + original_len = len(settings["hooks"][hook_type]) + settings["hooks"][hook_type] = [ + entry for entry in 
settings["hooks"][hook_type] + if entry.get("__module__") != module_name + ] + if len(settings["hooks"][hook_type]) < original_len: + modified = True + # Remove empty hook type arrays + if not settings["hooks"][hook_type]: + del settings["hooks"][hook_type] + + if modified: + with settings_path.open("w", encoding="utf-8") as f: + json.dump(settings, f, indent=2, ensure_ascii=False) + f.write("\n") + + return modified + + def get_module_files(module_name: str, config: Dict[str, Any]) -> Set[str]: """Extract files/dirs that a module installs based on config.json operations.""" files: Set[str] = set() @@ -261,6 +298,11 @@ def main(argv: Optional[List[str]] = None) -> int: except OSError as e: print(f" ✗ Failed to remove {item}: {e}", file=sys.stderr) + # Remove module hooks from settings.json + for m in selected: + if unmerge_hooks_from_settings(m, install_dir): + print(f" ✓ Removed hooks for module '{m}' from settings.json") + # Update installed_modules.json status_file = install_dir / "installed_modules.json" if status_file.exists() and selected != list(installed_modules.keys()):