Files
myclaude/skills/do/scripts/task.py
cexll 97dfa907d9 feat(skills): add per-task skill spec auto-detection and injection
Replace external inject-spec.py hook with built-in zero-config skill
detection in codeagent-wrapper. The system auto-detects project type
from fingerprint files (go.mod, package.json, etc.), maps to installed
skills, and injects SKILL.md content directly into sub-agent prompts.

Key changes:
- Add DetectProjectSkills/ResolveSkillContent in executor/prompt.go
- Add Skills field to TaskSpec with parallel config parsing
- Add --skills CLI flag for explicit override
- Update /do SKILL.md Phase 4 with per-task skill examples
- Remove on-stop.py global hook (not needed)
- Replace inject-spec.py with no-op (detection now internal)
- Add 20 unit tests covering detection, resolution, budget, security

Security: path traversal protection via validSkillName regex,
16K char budget with tag overhead accounting, CRLF normalization.

Generated with SWE-Agent.ai

Co-Authored-By: SWE-Agent.ai <noreply@swe-agent.ai>
2026-02-09 11:06:36 +08:00

435 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Task Directory Management CLI for do skill workflow.
Commands:
create <title> - Create a new task directory with task.md
start <task-dir> - Set current task pointer
finish - Clear current task pointer
list - List active tasks
status - Show current task status
update-phase <N> - Update current phase
"""
import argparse
import os
import random
import re
import string
import subprocess
import sys
from datetime import datetime
from pathlib import Path
# Directory constants
DIR_TASKS = ".claude/do-tasks"
FILE_CURRENT_TASK = ".current-task"
FILE_TASK_MD = "task.md"
PHASE_NAMES = {
1: "Understand",
2: "Clarify",
3: "Design",
4: "Implement",
5: "Complete",
}
def get_project_root() -> str:
"""Get project root from env or cwd."""
return os.environ.get("CLAUDE_PROJECT_DIR", os.getcwd())
def get_tasks_dir(project_root: str) -> str:
"""Get tasks directory path."""
return os.path.join(project_root, DIR_TASKS)
def get_current_task_file(project_root: str) -> str:
"""Get current task pointer file path."""
return os.path.join(project_root, DIR_TASKS, FILE_CURRENT_TASK)
def generate_task_id() -> str:
"""Generate short task ID: MMDD-XXXX format."""
date_part = datetime.now().strftime("%m%d")
random_part = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4))
return f"{date_part}-{random_part}"
def read_task_md(task_md_path: str) -> dict | None:
"""Read task.md and parse YAML frontmatter + body."""
if not os.path.exists(task_md_path):
return None
try:
with open(task_md_path, "r", encoding="utf-8") as f:
content = f.read()
except Exception:
return None
# Parse YAML frontmatter
match = re.match(r'^---\n(.*?)\n---\n(.*)$', content, re.DOTALL)
if not match:
return None
frontmatter_str = match.group(1)
body = match.group(2)
# Simple YAML parsing (no external deps)
frontmatter = {}
for line in frontmatter_str.split('\n'):
if ':' in line:
key, value = line.split(':', 1)
key = key.strip()
value = value.strip()
# Handle quoted strings
if value.startswith('"') and value.endswith('"'):
value = value[1:-1]
elif value == 'true':
value = True
elif value == 'false':
value = False
elif value.isdigit():
value = int(value)
frontmatter[key] = value
return {"frontmatter": frontmatter, "body": body}
def write_task_md(task_md_path: str, frontmatter: dict, body: str) -> bool:
"""Write task.md with YAML frontmatter + body."""
try:
lines = ["---"]
for key, value in frontmatter.items():
if isinstance(value, bool):
lines.append(f"{key}: {str(value).lower()}")
elif isinstance(value, int):
lines.append(f"{key}: {value}")
elif isinstance(value, str) and ('<' in value or '>' in value or ':' in value):
lines.append(f'{key}: "{value}"')
else:
lines.append(f'{key}: "{value}"' if isinstance(value, str) else f"{key}: {value}")
lines.append("---")
lines.append("")
lines.append(body)
with open(task_md_path, "w", encoding="utf-8") as f:
f.write('\n'.join(lines))
return True
except Exception:
return False
def create_worktree(project_root: str, task_id: str) -> str:
"""Create a git worktree for the task. Returns the worktree directory path."""
# Get git root
result = subprocess.run(
["git", "-C", project_root, "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode != 0:
raise RuntimeError(f"Not a git repository: {project_root}")
git_root = result.stdout.strip()
# Calculate paths
worktree_dir = os.path.join(git_root, ".worktrees", f"do-{task_id}")
branch_name = f"do/{task_id}"
# Create worktree with new branch
result = subprocess.run(
["git", "-C", git_root, "worktree", "add", "-b", branch_name, worktree_dir],
capture_output=True,
text=True,
)
if result.returncode != 0:
raise RuntimeError(f"Failed to create worktree: {result.stderr}")
return worktree_dir
def create_task(title: str, use_worktree: bool = False) -> dict:
"""Create a new task directory with task.md."""
project_root = get_project_root()
tasks_dir = get_tasks_dir(project_root)
os.makedirs(tasks_dir, exist_ok=True)
task_id = generate_task_id()
task_dir = os.path.join(tasks_dir, task_id)
os.makedirs(task_dir, exist_ok=True)
# Create worktree if requested
worktree_dir = ""
if use_worktree:
try:
worktree_dir = create_worktree(project_root, task_id)
except RuntimeError as e:
print(f"Warning: {e}", file=sys.stderr)
use_worktree = False
frontmatter = {
"id": task_id,
"title": title,
"status": "in_progress",
"current_phase": 1,
"phase_name": PHASE_NAMES[1],
"max_phases": 5,
"use_worktree": use_worktree,
"worktree_dir": worktree_dir,
"created_at": datetime.now().isoformat(),
"completion_promise": "<promise>DO_COMPLETE</promise>",
}
body = f"""# Requirements
{title}
## Context
## Progress
"""
task_md_path = os.path.join(task_dir, FILE_TASK_MD)
write_task_md(task_md_path, frontmatter, body)
current_task_file = get_current_task_file(project_root)
relative_task_dir = os.path.relpath(task_dir, project_root)
with open(current_task_file, "w", encoding="utf-8") as f:
f.write(relative_task_dir)
return {
"task_dir": task_dir,
"relative_path": relative_task_dir,
"task_data": frontmatter,
"worktree_dir": worktree_dir,
}
def get_current_task(project_root: str) -> str | None:
"""Read current task directory path."""
current_task_file = get_current_task_file(project_root)
if not os.path.exists(current_task_file):
return None
try:
with open(current_task_file, "r", encoding="utf-8") as f:
content = f.read().strip()
return content if content else None
except Exception:
return None
def start_task(task_dir: str) -> bool:
"""Set current task pointer."""
project_root = get_project_root()
tasks_dir = get_tasks_dir(project_root)
if os.path.isabs(task_dir):
full_path = task_dir
relative_path = os.path.relpath(task_dir, project_root)
else:
if not task_dir.startswith(DIR_TASKS):
full_path = os.path.join(tasks_dir, task_dir)
relative_path = os.path.join(DIR_TASKS, task_dir)
else:
full_path = os.path.join(project_root, task_dir)
relative_path = task_dir
if not os.path.exists(full_path):
print(f"Error: Task directory not found: {full_path}", file=sys.stderr)
return False
current_task_file = get_current_task_file(project_root)
os.makedirs(os.path.dirname(current_task_file), exist_ok=True)
with open(current_task_file, "w", encoding="utf-8") as f:
f.write(relative_path)
return True
def finish_task() -> bool:
"""Clear current task pointer."""
project_root = get_project_root()
current_task_file = get_current_task_file(project_root)
if os.path.exists(current_task_file):
os.remove(current_task_file)
return True
def list_tasks() -> list[dict]:
"""List all task directories."""
project_root = get_project_root()
tasks_dir = get_tasks_dir(project_root)
if not os.path.exists(tasks_dir):
return []
tasks = []
current_task = get_current_task(project_root)
for entry in sorted(os.listdir(tasks_dir), reverse=True):
entry_path = os.path.join(tasks_dir, entry)
if not os.path.isdir(entry_path):
continue
task_md_path = os.path.join(entry_path, FILE_TASK_MD)
if not os.path.exists(task_md_path):
continue
parsed = read_task_md(task_md_path)
if parsed:
task_data = parsed["frontmatter"]
else:
task_data = {"id": entry, "title": entry, "status": "unknown"}
relative_path = os.path.join(DIR_TASKS, entry)
task_data["path"] = relative_path
task_data["is_current"] = current_task == relative_path
tasks.append(task_data)
return tasks
def get_status() -> dict | None:
"""Get current task status."""
project_root = get_project_root()
current_task = get_current_task(project_root)
if not current_task:
return None
task_dir = os.path.join(project_root, current_task)
task_md_path = os.path.join(task_dir, FILE_TASK_MD)
parsed = read_task_md(task_md_path)
if not parsed:
return None
task_data = parsed["frontmatter"]
task_data["path"] = current_task
return task_data
def update_phase(phase: int) -> bool:
"""Update current task phase."""
project_root = get_project_root()
current_task = get_current_task(project_root)
if not current_task:
print("Error: No active task.", file=sys.stderr)
return False
task_dir = os.path.join(project_root, current_task)
task_md_path = os.path.join(task_dir, FILE_TASK_MD)
parsed = read_task_md(task_md_path)
if not parsed:
print("Error: task.md not found or invalid.", file=sys.stderr)
return False
frontmatter = parsed["frontmatter"]
frontmatter["current_phase"] = phase
frontmatter["phase_name"] = PHASE_NAMES.get(phase, f"Phase {phase}")
if not write_task_md(task_md_path, frontmatter, parsed["body"]):
print("Error: Failed to write task.md.", file=sys.stderr)
return False
return True
def main():
parser = argparse.ArgumentParser(
description="Task directory management for do skill workflow"
)
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# create command
create_parser = subparsers.add_parser("create", help="Create a new task")
create_parser.add_argument("title", nargs="+", help="Task title")
create_parser.add_argument("--worktree", action="store_true", help="Enable worktree mode")
# start command
start_parser = subparsers.add_parser("start", help="Set current task")
start_parser.add_argument("task_dir", help="Task directory path")
# finish command
subparsers.add_parser("finish", help="Clear current task")
# list command
subparsers.add_parser("list", help="List all tasks")
# status command
subparsers.add_parser("status", help="Show current task status")
# update-phase command
phase_parser = subparsers.add_parser("update-phase", help="Update current phase")
phase_parser.add_argument("phase", type=int, help="Phase number (1-5)")
args = parser.parse_args()
if args.command == "create":
title = " ".join(args.title)
result = create_task(title, args.worktree)
print(f"Created task: {result['relative_path']}")
print(f"Task ID: {result['task_data']['id']}")
print(f"Phase: 1/{result['task_data']['max_phases']} (Understand)")
print(f"Worktree: {result['task_data']['use_worktree']}")
elif args.command == "start":
if start_task(args.task_dir):
print(f"Started task: {args.task_dir}")
else:
sys.exit(1)
elif args.command == "finish":
if finish_task():
print("Task finished, current task cleared.")
else:
sys.exit(1)
elif args.command == "list":
tasks = list_tasks()
if not tasks:
print("No tasks found.")
else:
for task in tasks:
marker = "* " if task.get("is_current") else " "
phase = task.get("current_phase", "?")
max_phase = task.get("max_phases", 5)
status = task.get("status", "unknown")
print(f"{marker}{task['id']} [{status}] phase {phase}/{max_phase}")
print(f" {task.get('title', 'No title')}")
elif args.command == "status":
status = get_status()
if not status:
print("No active task.")
else:
print(f"Task: {status['id']}")
print(f"Title: {status.get('title', 'No title')}")
print(f"Status: {status.get('status', 'unknown')}")
print(f"Phase: {status.get('current_phase', '?')}/{status.get('max_phases', 5)}")
print(f"Worktree: {status.get('use_worktree', False)}")
print(f"Path: {status['path']}")
elif args.command == "update-phase":
if update_phase(args.phase):
phase_name = PHASE_NAMES.get(args.phase, f"Phase {args.phase}")
print(f"Updated to phase {args.phase} ({phase_name})")
else:
sys.exit(1)
else:
parser.print_help()
sys.exit(1)
if __name__ == "__main__":
main()