Compare commits

..

12 Commits

Author SHA1 Message Date
cexll
9fa872a1f0 update codex skill dependencies 2025-12-01 00:11:31 +08:00
ben
6d263fe8c9 Merge pull request #34 from cexll/cce-worktree-master-20251129-111802-997076000
feat: add parallel execution support to codex-wrapper
2025-11-30 00:16:10 +08:00
cexll
e55b13c2c5 docs: improve codex skill parameter best practices
Add best practices for task id and workdir parameters:
- id: recommend <feature>_<timestamp> format for uniqueness
- workdir: recommend absolute paths to avoid ambiguity
Update parallel execution example to demonstrate recommended format

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-29 23:32:44 +08:00
cexll
f95f5f5e88 feat: add session resume support and improve output format
- Support session_id in parallel task config for resuming failed tasks
- Change output format from JSON to human-readable text
- Add helper functions (hello, greet, farewell) with tests
- Clean up code formatting

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-29 23:14:43 +08:00
cexll
23c212f8be feat: add parallel execution support to codex-wrapper
- Replace JSON format with delimiter format (---TASK---/---CONTENT---)
- Support unlimited concurrent task execution with dependency management
- Implement Kahn's topological sort for dependency resolution
- Add cycle detection and error isolation
- Change output from JSON to human-readable text format
- Update SKILL.md with parallel execution documentation

Key features:
- No escaping needed for task content (heredoc protected)
- Automatic dependency-based scheduling
- Failed tasks don't block independent tasks
- Text output format for better readability

Test coverage: 89.0%

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-29 22:12:40 +08:00
cexll
90477abb81 update CLAUDE.md and codex skill 2025-11-29 19:11:06 +08:00
ben
11afae2dff Merge pull request #32 from freespace8/master
fix(main): raise the buffer limit and streamline message extraction
2025-11-28 16:49:24 +08:00
freespace8
3df4fec6dd test(ParseJSONStream): add tests for handling oversized single-line text and non-string text 2025-11-28 15:10:47 +08:00
freespace8
aea19f0e1f fix(main): improve buffer size and streamline message extraction 2025-11-28 15:10:39 +08:00
cexll
291a4e3d0a optimize dev pipeline 2025-11-27 22:21:49 +08:00
cexll
957b737126 Merge feat/codex-wrapper: fix repository URLs 2025-11-27 18:01:13 +08:00
cexll
3e30f4e207 fix: update repository URLs to cexll/myclaude
- Update install.sh REPO variable
- Update README.md installation instructions
- Remove obsolete PLUGIN_README.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-27 17:53:35 +08:00
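
To make the parallel-execution commits above concrete, here is a minimal test-style sketch of the ---TASK---/---CONTENT--- delimiter format together with the recommended parameters (an id in the `<feature>_<timestamp>` style, an absolute workdir, optional dependencies, and a session_id for resuming). All ids, paths, and the session id are hypothetical, and the snippet assumes it lives in the wrapper's own `main` package so it can call `parseParallelConfig` from the diff below; it is a sketch, not documented usage.

```go
// Hypothetical parallel config: ids follow the recommended <feature>_<timestamp>
// pattern, workdir is absolute, and the second task resumes a prior session.
func TestParseParallelConfig_RecommendedFormat(t *testing.T) {
	input := `---TASK---
id: auth_20251129T2310
workdir: /home/dev/project
---CONTENT---
Implement the login handler and add unit tests.
---TASK---
id: docs_20251129T2311
workdir: /home/dev/project
dependencies: auth_20251129T2310
session_id: 0199a213-81ba-7010-8bf2-e2a2e86a0b29
---CONTENT---
Update the API docs once the handler lands.`
	cfg, err := parseParallelConfig([]byte(input))
	if err != nil {
		t.Fatalf("parse: %v", err)
	}
	// Two tasks; the second carries a session_id, so the parser marks it "resume".
	if len(cfg.Tasks) != 2 || cfg.Tasks[1].Mode != "resume" {
		t.Fatalf("unexpected config: %+v", cfg.Tasks)
	}
}
```

In practice the same text would be piped to `codex-wrapper --parallel` on stdin, as the run() changes below show.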
10 changed files with 1838 additions and 227 deletions

.gitignore (vendored, 1 line changed)

@@ -1,3 +1,2 @@
CLAUDE.md
.claude/
.claude-trace

PLUGIN_README.md (deleted)

@@ -1,95 +0,0 @@
# Claude Code Plugin System
This project supports the Claude Code plugin system, which lets you package commands and agents into installable plugin bundles.
## Plugin Configuration
The plugin configuration file lives at `.claude-plugin/marketplace.json` and defines all available plugin bundles.
## Available Plugins
### 1. Requirements-Driven Development
- **Description**: Requirements-driven development workflow with a 90% quality gate
- **Commands**: `/requirements-pilot`
- **Agents**: requirements-generate, requirements-code, requirements-testing, requirements-review
### 2. BMAD Agile Workflow
- **Description**: Complete BMAD agile workflow: Product Owner → Architect → SM → Dev → QA
- **Commands**: `/bmad-pilot`
- **Agents**: bmad-po, bmad-architect, bmad-sm, bmad-dev, bmad-qa, bmad-orchestrator
### 3. Development Essentials
- **Description**: Core development command suite
- **Commands**: `/code`, `/debug`, `/test`, `/optimize`, `/review`, `/bugfix`, `/refactor`, `/docs`, `/ask`, `/think`
- **Agents**: code, bugfix, bugfix-verify, code-optimize, debug, develop
### 4. Advanced AI Agents
- **Description**: Advanced AI agents that integrate GPT-5 for deep analysis
- **Agents**: gpt5
## Using Plugin Commands
### List all available plugins
```bash
/plugin list
```
### Show plugin details
```bash
/plugin info <plugin-name>
```
For example: `/plugin info requirements-driven-development`
### Install a plugin
```bash
/plugin install <plugin-name>
```
For example: `/plugin install bmad-agile-workflow`
### Remove a plugin
```bash
/plugin remove <plugin-name>
```
## Creating a Custom Plugin
To create your own plugin:
1. Add a new plugin definition in `.claude-plugin/marketplace.json`
2. Specify the paths of the command and agent files the plugin includes
3. Set appropriate metadata (version, author, keywords, etc.)
Example plugin structure:
```json
{
  "name": "my-custom-plugin",
  "source": "./",
  "description": "Custom plugin description",
  "version": "1.0.0",
  "commands": [
    "./commands/my-command.md"
  ],
  "agents": [
    "./agents/my-agent.md"
  ]
}
```
## Sharing Plugins
To share a plugin with other projects:
1. Copy the entire `.claude-plugin` directory into the target project
2. Make sure the referenced command and agent files exist
3. Manage plugins in the new project with the `/plugin` command
## Notes
- The plugin system follows the Claude Code plugin specification
- All command and agent files must be valid Markdown
- Plugin configuration supports versioning and dependencies
- A plugin can include multiple commands, agents, and output styles
## Related Documentation
- [Claude Code plugin documentation](https://docs.claude.com/en/docs/claude-code/plugins)
- [Example plugin repository](https://github.com/wshobson/agents)

README.md

@@ -92,7 +92,7 @@ make install
**Codex Wrapper** (Go binary for Codex CLI)
```bash
curl -fsSL https://raw.githubusercontent.com/chenwenjie/myclaude/master/install.sh | bash
curl -fsSL https://raw.githubusercontent.com/cexll/myclaude/refs/heads/master/install.sh | bash
```
**Method 1: Plugin Install** (One command)


@@ -2,6 +2,7 @@ package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
@@ -9,24 +10,30 @@ import (
"os"
"os/exec"
"os/signal"
"sort"
"strconv"
"strings"
"sync"
"syscall"
"time"
)
const (
version = "1.0.0"
defaultWorkdir = "."
defaultTimeout = 7200 // seconds
forceKillDelay = 5 // seconds
version = "1.0.0"
defaultWorkdir = "."
defaultTimeout = 7200 // seconds
forceKillDelay = 5 // seconds
stdinSpecialChars = "\n\\\"'`$"
)
// Test hooks for dependency injection
var (
stdinReader io.Reader = os.Stdin
isTerminalFn = defaultIsTerminal
codexCommand = "codex"
stdinReader io.Reader = os.Stdin
isTerminalFn = defaultIsTerminal
codexCommand = "codex"
buildCodexArgsFn = buildCodexArgs
commandContext = exec.CommandContext
jsonMarshal = json.Marshal
)
// Config holds CLI configuration
@@ -39,6 +46,293 @@ type Config struct {
Timeout int
}
// ParallelConfig defines the JSON schema for parallel execution
type ParallelConfig struct {
Tasks []TaskSpec `json:"tasks"`
}
// TaskSpec describes an individual task entry in the parallel config
type TaskSpec struct {
ID string `json:"id"`
Task string `json:"task"`
WorkDir string `json:"workdir,omitempty"`
Dependencies []string `json:"dependencies,omitempty"`
SessionID string `json:"session_id,omitempty"`
Mode string `json:"-"`
UseStdin bool `json:"-"`
}
// TaskResult captures the execution outcome of a task
type TaskResult struct {
TaskID string `json:"task_id"`
ExitCode int `json:"exit_code"`
Message string `json:"message"`
SessionID string `json:"session_id"`
Error string `json:"error"`
}
func parseParallelConfig(data []byte) (*ParallelConfig, error) {
trimmed := bytes.TrimSpace(data)
if len(trimmed) == 0 {
return nil, fmt.Errorf("parallel config is empty")
}
tasks := strings.Split(string(trimmed), "---TASK---")
var cfg ParallelConfig
seen := make(map[string]struct{})
for _, taskBlock := range tasks {
taskBlock = strings.TrimSpace(taskBlock)
if taskBlock == "" {
continue
}
parts := strings.SplitN(taskBlock, "---CONTENT---", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("task block missing ---CONTENT--- separator")
}
meta := strings.TrimSpace(parts[0])
content := strings.TrimSpace(parts[1])
task := TaskSpec{WorkDir: defaultWorkdir}
for _, line := range strings.Split(meta, "\n") {
line = strings.TrimSpace(line)
if line == "" {
continue
}
kv := strings.SplitN(line, ":", 2)
if len(kv) != 2 {
continue
}
key := strings.TrimSpace(kv[0])
value := strings.TrimSpace(kv[1])
switch key {
case "id":
task.ID = value
case "workdir":
task.WorkDir = value
case "session_id":
task.SessionID = value
task.Mode = "resume"
case "dependencies":
for _, dep := range strings.Split(value, ",") {
dep = strings.TrimSpace(dep)
if dep != "" {
task.Dependencies = append(task.Dependencies, dep)
}
}
}
}
if task.ID == "" {
return nil, fmt.Errorf("task missing id field")
}
if content == "" {
return nil, fmt.Errorf("task %q missing content", task.ID)
}
if _, exists := seen[task.ID]; exists {
return nil, fmt.Errorf("duplicate task id: %s", task.ID)
}
task.Task = content
cfg.Tasks = append(cfg.Tasks, task)
seen[task.ID] = struct{}{}
}
if len(cfg.Tasks) == 0 {
return nil, fmt.Errorf("no tasks found")
}
return &cfg, nil
}
func topologicalSort(tasks []TaskSpec) ([][]TaskSpec, error) {
idToTask := make(map[string]TaskSpec, len(tasks))
indegree := make(map[string]int, len(tasks))
adj := make(map[string][]string, len(tasks))
for _, task := range tasks {
idToTask[task.ID] = task
indegree[task.ID] = 0
}
for _, task := range tasks {
for _, dep := range task.Dependencies {
if _, ok := idToTask[dep]; !ok {
return nil, fmt.Errorf("dependency %q not found for task %q", dep, task.ID)
}
indegree[task.ID]++
adj[dep] = append(adj[dep], task.ID)
}
}
queue := make([]string, 0, len(tasks))
for _, task := range tasks {
if indegree[task.ID] == 0 {
queue = append(queue, task.ID)
}
}
layers := make([][]TaskSpec, 0)
processed := 0
for len(queue) > 0 {
current := queue
queue = nil
layer := make([]TaskSpec, len(current))
for i, id := range current {
layer[i] = idToTask[id]
processed++
}
layers = append(layers, layer)
next := make([]string, 0)
for _, id := range current {
for _, neighbor := range adj[id] {
indegree[neighbor]--
if indegree[neighbor] == 0 {
next = append(next, neighbor)
}
}
}
queue = append(queue, next...)
}
if processed != len(tasks) {
cycleIDs := make([]string, 0)
for id, deg := range indegree {
if deg > 0 {
cycleIDs = append(cycleIDs, id)
}
}
sort.Strings(cycleIDs)
return nil, fmt.Errorf("cycle detected involving tasks: %s", strings.Join(cycleIDs, ","))
}
return layers, nil
}
var runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
if task.WorkDir == "" {
task.WorkDir = defaultWorkdir
}
if task.Mode == "" {
task.Mode = "new"
}
if task.UseStdin || shouldUseStdin(task.Task, false) {
task.UseStdin = true
}
return runCodexTask(task, true, timeout)
}
func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult {
totalTasks := 0
for _, layer := range layers {
totalTasks += len(layer)
}
results := make([]TaskResult, 0, totalTasks)
failed := make(map[string]TaskResult, totalTasks)
resultsCh := make(chan TaskResult, totalTasks)
for _, layer := range layers {
var wg sync.WaitGroup
executed := 0
for _, task := range layer {
if skip, reason := shouldSkipTask(task, failed); skip {
res := TaskResult{TaskID: task.ID, ExitCode: 1, Error: reason}
results = append(results, res)
failed[task.ID] = res
continue
}
executed++
wg.Add(1)
go func(ts TaskSpec) {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
resultsCh <- TaskResult{TaskID: ts.ID, ExitCode: 1, Error: fmt.Sprintf("panic: %v", r)}
}
}()
resultsCh <- runCodexTaskFn(ts, timeout)
}(task)
}
wg.Wait()
for i := 0; i < executed; i++ {
res := <-resultsCh
results = append(results, res)
if res.ExitCode != 0 || res.Error != "" {
failed[res.TaskID] = res
}
}
}
return results
}
func shouldSkipTask(task TaskSpec, failed map[string]TaskResult) (bool, string) {
if len(task.Dependencies) == 0 {
return false, ""
}
var blocked []string
for _, dep := range task.Dependencies {
if _, ok := failed[dep]; ok {
blocked = append(blocked, dep)
}
}
if len(blocked) == 0 {
return false, ""
}
return true, fmt.Sprintf("skipped due to failed dependencies: %s", strings.Join(blocked, ","))
}
func generateFinalOutput(results []TaskResult) string {
var sb strings.Builder
success := 0
failed := 0
for _, res := range results {
if res.ExitCode == 0 && res.Error == "" {
success++
} else {
failed++
}
}
sb.WriteString(fmt.Sprintf("=== Parallel Execution Summary ===\n"))
sb.WriteString(fmt.Sprintf("Total: %d | Success: %d | Failed: %d\n\n", len(results), success, failed))
for _, res := range results {
sb.WriteString(fmt.Sprintf("--- Task: %s ---\n", res.TaskID))
if res.Error != "" {
sb.WriteString(fmt.Sprintf("Status: FAILED (exit code %d)\nError: %s\n", res.ExitCode, res.Error))
} else if res.ExitCode != 0 {
sb.WriteString(fmt.Sprintf("Status: FAILED (exit code %d)\n", res.ExitCode))
} else {
sb.WriteString("Status: SUCCESS\n")
}
if res.SessionID != "" {
sb.WriteString(fmt.Sprintf("Session: %s\n", res.SessionID))
}
if res.Message != "" {
sb.WriteString(fmt.Sprintf("\n%s\n", res.Message))
}
sb.WriteString("\n")
}
return sb.String()
}
// JSONEvent represents a Codex JSON output event
type JSONEvent struct {
Type string `json:"type"`
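Taken together, the additions in this hunk form a small pipeline: parse the config, group tasks into dependency layers, run each layer concurrently, and render a text summary. A minimal same-package sketch of how they compose (the wrapper function name here is mine, not part of the diff):

```go
// Sketch: parse -> layer -> execute -> report. topologicalSort implements
// Kahn's algorithm and errors on cycles; executeConcurrent skips only the
// dependents of failed tasks, so independent tasks still run.
func runParallelSketch(raw []byte, timeoutSec int) (string, error) {
	cfg, err := parseParallelConfig(raw)
	if err != nil {
		return "", err
	}
	layers, err := topologicalSort(cfg.Tasks)
	if err != nil {
		return "", err
	}
	results := executeConcurrent(layers, timeoutSec)
	return generateFinalOutput(results), nil // human-readable summary text
}
```

The string returned here is the same human-readable summary that the --parallel branch of run() prints in place of the old JSON output.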
@@ -68,6 +362,38 @@ func run() int {
case "--help", "-h":
printHelp()
return 0
case "--parallel":
// Parallel mode: read task config from stdin
data, err := io.ReadAll(stdinReader)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: failed to read stdin: %v\n", err)
return 1
}
cfg, err := parseParallelConfig(data)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: %v\n", err)
return 1
}
timeoutSec := resolveTimeout()
layers, err := topologicalSort(cfg.Tasks)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: %v\n", err)
return 1
}
results := executeConcurrent(layers, timeoutSec)
fmt.Println(generateFinalOutput(results))
exitCode := 0
for _, res := range results {
if res.ExitCode != 0 {
exitCode = res.ExitCode
}
}
return exitCode
}
}
@@ -127,6 +453,18 @@ func run() int {
if strings.Contains(taskText, "\\") {
reasons = append(reasons, "backslash")
}
if strings.Contains(taskText, "\"") {
reasons = append(reasons, "double-quote")
}
if strings.Contains(taskText, "'") {
reasons = append(reasons, "single-quote")
}
if strings.Contains(taskText, "`") {
reasons = append(reasons, "backtick")
}
if strings.Contains(taskText, "$") {
reasons = append(reasons, "dollar")
}
if len(taskText) > 800 {
reasons = append(reasons, "length>800")
}
@@ -135,26 +473,28 @@ func run() int {
}
}
targetArg := taskText
if useStdin {
targetArg = "-"
}
codexArgs := buildCodexArgs(cfg, targetArg)
logInfo("codex running...")
message, threadID, exitCode := runCodexProcess(codexArgs, taskText, useStdin, cfg.Timeout)
taskSpec := TaskSpec{
Task: taskText,
WorkDir: cfg.WorkDir,
Mode: cfg.Mode,
SessionID: cfg.SessionID,
UseStdin: useStdin,
}
if exitCode != 0 {
return exitCode
result := runCodexTask(taskSpec, false, cfg.Timeout)
if result.ExitCode != 0 {
return result.ExitCode
}
// Output agent_message
fmt.Println(message)
fmt.Println(result.Message)
// Output session_id if present
if threadID != "" {
fmt.Printf("\n---\nSESSION_ID: %s\n", threadID)
if result.SessionID != "" {
fmt.Printf("\n---\nSESSION_ID: %s\n", result.SessionID)
}
return 0
@@ -213,16 +553,10 @@ func shouldUseStdin(taskText string, piped bool) bool {
if piped {
return true
}
if strings.Contains(taskText, "\n") {
return true
}
if strings.Contains(taskText, "\\") {
return true
}
if len(taskText) > 800 {
return true
}
return false
return strings.IndexAny(taskText, stdinSpecialChars) >= 0
}
func buildCodexArgs(cfg *Config, targetArg string) []string {
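A small test-style sketch of the simplified routing rule above, mirroring cases already covered by the test table later in this comparison (the example strings are illustrative):

```go
// Sketch: characters from stdinSpecialChars force the task text through stdin,
// plain text stays on argv, and piped input always goes through stdin.
func TestShouldUseStdin_Sketch(t *testing.T) {
	if !shouldUseStdin("price is $5", false) {
		t.Fatal("dollar sign should route the task to stdin")
	}
	if shouldUseStdin(strings.Repeat("a", 800), false) {
		t.Fatal("plain text without special characters should stay on argv")
	}
	if !shouldUseStdin("analyze code", true) {
		t.Fatal("piped input should always use stdin")
	}
}
```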
@@ -245,12 +579,48 @@ func buildCodexArgs(cfg *Config, targetArg string) []string {
}
}
func runCodexProcess(codexArgs []string, taskText string, useStdin bool, timeoutSec int) (message, threadID string, exitCode int) {
func runCodexTask(taskSpec TaskSpec, silent bool, timeoutSec int) TaskResult {
result := TaskResult{
TaskID: taskSpec.ID,
}
cfg := &Config{
Mode: taskSpec.Mode,
Task: taskSpec.Task,
SessionID: taskSpec.SessionID,
WorkDir: taskSpec.WorkDir,
}
if cfg.Mode == "" {
cfg.Mode = "new"
}
if cfg.WorkDir == "" {
cfg.WorkDir = defaultWorkdir
}
useStdin := taskSpec.UseStdin
targetArg := taskSpec.Task
if useStdin {
targetArg = "-"
}
codexArgs := buildCodexArgsFn(cfg, targetArg)
logInfoFn := logInfo
logWarnFn := logWarn
logErrorFn := logError
stderrWriter := io.Writer(os.Stderr)
if silent {
logInfoFn = func(string) {}
logWarnFn = func(string) {}
logErrorFn = func(string) {}
stderrWriter = io.Discard
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutSec)*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, codexCommand, codexArgs...)
cmd.Stderr = os.Stderr
cmd := commandContext(ctx, codexCommand, codexArgs...)
cmd.Stderr = stderrWriter
// Setup stdin if needed
var stdinPipe io.WriteCloser
@@ -258,98 +628,132 @@ func runCodexProcess(codexArgs []string, taskText string, useStdin bool, timeout
if useStdin {
stdinPipe, err = cmd.StdinPipe()
if err != nil {
logError("Failed to create stdin pipe: " + err.Error())
return "", "", 1
logErrorFn("Failed to create stdin pipe: " + err.Error())
result.ExitCode = 1
result.Error = "failed to create stdin pipe: " + err.Error()
return result
}
}
// Setup stdout
stdout, err := cmd.StdoutPipe()
if err != nil {
logError("Failed to create stdout pipe: " + err.Error())
return "", "", 1
logErrorFn("Failed to create stdout pipe: " + err.Error())
result.ExitCode = 1
result.Error = "failed to create stdout pipe: " + err.Error()
return result
}
logInfo(fmt.Sprintf("Starting codex with args: codex %s...", strings.Join(codexArgs[:min(5, len(codexArgs))], " ")))
logInfoFn(fmt.Sprintf("Starting codex with args: codex %s...", strings.Join(codexArgs[:min(5, len(codexArgs))], " ")))
// Start process
if err := cmd.Start(); err != nil {
if strings.Contains(err.Error(), "executable file not found") {
logError("codex command not found in PATH")
return "", "", 127
logErrorFn("codex command not found in PATH")
result.ExitCode = 127
result.Error = "codex command not found in PATH"
return result
}
logError("Failed to start codex: " + err.Error())
return "", "", 1
logErrorFn("Failed to start codex: " + err.Error())
result.ExitCode = 1
result.Error = "failed to start codex: " + err.Error()
return result
}
logInfo(fmt.Sprintf("Process started with PID: %d", cmd.Process.Pid))
logInfoFn(fmt.Sprintf("Process started with PID: %d", cmd.Process.Pid))
// Write to stdin if needed
if useStdin && stdinPipe != nil {
logInfo(fmt.Sprintf("Writing %d chars to stdin...", len(taskText)))
logInfoFn(fmt.Sprintf("Writing %d chars to stdin...", len(taskSpec.Task)))
go func() {
defer stdinPipe.Close()
io.WriteString(stdinPipe, taskText)
io.WriteString(stdinPipe, taskSpec.Task)
}()
logInfo("Stdin closed")
logInfoFn("Stdin closed")
}
// Setup signal handling
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigCh
logError(fmt.Sprintf("Received signal: %v", sig))
if cmd.Process != nil {
cmd.Process.Signal(syscall.SIGTERM)
time.AfterFunc(time.Duration(forceKillDelay)*time.Second, func() {
if cmd.Process != nil {
cmd.Process.Kill()
}
})
}
}()
forwardSignals(ctx, cmd, logErrorFn)
logInfo("Reading stdout...")
logInfoFn("Reading stdout...")
// Parse JSON stream
message, threadID = parseJSONStream(stdout)
message, threadID := parseJSONStreamWithWarn(stdout, logWarnFn)
// Wait for process to complete
err = cmd.Wait()
// Check for timeout
if ctx.Err() == context.DeadlineExceeded {
logError("Codex execution timeout")
logErrorFn("Codex execution timeout")
if cmd.Process != nil {
cmd.Process.Kill()
}
return "", "", 124
result.ExitCode = 124
result.Error = "codex execution timeout"
return result
}
// Check exit code
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
code := exitErr.ExitCode()
logError(fmt.Sprintf("Codex exited with status %d", code))
return "", "", code
logErrorFn(fmt.Sprintf("Codex exited with status %d", code))
result.ExitCode = code
result.Error = fmt.Sprintf("codex exited with status %d", code)
return result
}
logError("Codex error: " + err.Error())
return "", "", 1
logErrorFn("Codex error: " + err.Error())
result.ExitCode = 1
result.Error = "codex error: " + err.Error()
return result
}
if message == "" {
logError("Codex completed without agent_message output")
return "", "", 1
logErrorFn("Codex completed without agent_message output")
result.ExitCode = 1
result.Error = "codex completed without agent_message output"
return result
}
return message, threadID, 0
result.ExitCode = 0
result.Message = message
result.SessionID = threadID
return result
}
func forwardSignals(ctx context.Context, cmd *exec.Cmd, logErrorFn func(string)) {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
defer signal.Stop(sigCh)
select {
case sig := <-sigCh:
logErrorFn(fmt.Sprintf("Received signal: %v", sig))
if cmd.Process != nil {
cmd.Process.Signal(syscall.SIGTERM)
time.AfterFunc(time.Duration(forceKillDelay)*time.Second, func() {
if cmd.Process != nil {
cmd.Process.Kill()
}
})
}
case <-ctx.Done():
}
}()
}
func parseJSONStream(r io.Reader) (message, threadID string) {
return parseJSONStreamWithWarn(r, logWarn)
}
func parseJSONStreamWithWarn(r io.Reader, warnFn func(string)) (message, threadID string) {
scanner := bufio.NewScanner(r)
// Set larger buffer for long lines
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
scanner.Buffer(make([]byte, 64*1024), 10*1024*1024)
if warnFn == nil {
warnFn = func(string) {}
}
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
@@ -359,7 +763,7 @@ func parseJSONStream(r io.Reader) (message, threadID string) {
var event JSONEvent
if err := json.Unmarshal([]byte(line), &event); err != nil {
logWarn(fmt.Sprintf("Failed to parse line: %s", truncate(line, 100)))
warnFn(fmt.Sprintf("Failed to parse line: %s", truncate(line, 100)))
continue
}
@@ -369,18 +773,15 @@ func parseJSONStream(r io.Reader) (message, threadID string) {
}
// Capture agent_message
if event.Type == "item.completed" && event.Item != nil {
if event.Item.Type == "agent_message" {
text := normalizeText(event.Item.Text)
if text != "" {
message = text
}
if event.Type == "item.completed" && event.Item != nil && event.Item.Type == "agent_message" {
if text := normalizeText(event.Item.Text); text != "" {
message = text
}
}
}
if err := scanner.Err(); err != nil {
logWarn("Scanner error: " + err.Error())
if err := scanner.Err(); err != nil && err != io.EOF {
warnFn("Read stdout error: " + err.Error())
}
return message, threadID
@@ -455,6 +856,18 @@ func min(a, b int) int {
return b
}
func hello() string {
return "hello world"
}
func greet(name string) string {
return "hello " + name
}
func farewell(name string) string {
return "goodbye " + name
}
func logInfo(msg string) {
fmt.Fprintf(os.Stderr, "INFO: %s\n", msg)
}
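
For reference, a same-package sketch of what the stream parser above extracts from Codex's line-delimited JSON events (the event payloads are illustrative):

```go
// Sketch: parseJSONStream picks up the thread id from thread.started events
// and the agent_message text from item.completed events.
func TestParseJSONStream_Sketch(t *testing.T) {
	stream := `{"type":"thread.started","thread_id":"demo-thread"}
{"type":"item.completed","item":{"type":"agent_message","text":"done"}}`
	msg, threadID := parseJSONStream(strings.NewReader(stream))
	if msg != "done" || threadID != "demo-thread" {
		t.Fatalf("got message=%q thread=%q", msg, threadID)
	}
}
```

The enlarged scanner buffer above (10 MB cap) is what allows a single event line to exceed 1 MB, which the new oversized-line test below exercises.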


@@ -0,0 +1,400 @@
package main
import (
"bytes"
"fmt"
"io"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
type integrationSummary struct {
Total int `json:"total"`
Success int `json:"success"`
Failed int `json:"failed"`
}
type integrationOutput struct {
Results []TaskResult `json:"results"`
Summary integrationSummary `json:"summary"`
}
func captureStdout(t *testing.T, fn func()) string {
t.Helper()
old := os.Stdout
r, w, _ := os.Pipe()
os.Stdout = w
fn()
w.Close()
os.Stdout = old
var buf bytes.Buffer
io.Copy(&buf, r)
return buf.String()
}
func parseIntegrationOutput(t *testing.T, out string) integrationOutput {
t.Helper()
var payload integrationOutput
lines := strings.Split(out, "\n")
var currentTask *TaskResult
for _, line := range lines {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "Total:") {
parts := strings.Split(line, "|")
for _, p := range parts {
p = strings.TrimSpace(p)
if strings.HasPrefix(p, "Total:") {
fmt.Sscanf(p, "Total: %d", &payload.Summary.Total)
} else if strings.HasPrefix(p, "Success:") {
fmt.Sscanf(p, "Success: %d", &payload.Summary.Success)
} else if strings.HasPrefix(p, "Failed:") {
fmt.Sscanf(p, "Failed: %d", &payload.Summary.Failed)
}
}
} else if strings.HasPrefix(line, "--- Task:") {
if currentTask != nil {
payload.Results = append(payload.Results, *currentTask)
}
currentTask = &TaskResult{}
currentTask.TaskID = strings.TrimSuffix(strings.TrimPrefix(line, "--- Task: "), " ---")
} else if currentTask != nil {
if strings.HasPrefix(line, "Status: SUCCESS") {
currentTask.ExitCode = 0
} else if strings.HasPrefix(line, "Status: FAILED") {
if strings.Contains(line, "exit code") {
fmt.Sscanf(line, "Status: FAILED (exit code %d)", &currentTask.ExitCode)
} else {
currentTask.ExitCode = 1
}
} else if strings.HasPrefix(line, "Error:") {
currentTask.Error = strings.TrimPrefix(line, "Error: ")
} else if strings.HasPrefix(line, "Session:") {
currentTask.SessionID = strings.TrimPrefix(line, "Session: ")
} else if line != "" && !strings.HasPrefix(line, "===") && !strings.HasPrefix(line, "---") {
if currentTask.Message != "" {
currentTask.Message += "\n"
}
currentTask.Message += line
}
}
}
if currentTask != nil {
payload.Results = append(payload.Results, *currentTask)
}
return payload
}
func findResultByID(t *testing.T, payload integrationOutput, id string) TaskResult {
t.Helper()
for _, res := range payload.Results {
if res.TaskID == id {
return res
}
}
t.Fatalf("result for task %s not found", id)
return TaskResult{}
}
func TestParallelEndToEnd_OrderAndConcurrency(t *testing.T) {
defer resetTestHooks()
origRun := runCodexTaskFn
t.Cleanup(func() {
runCodexTaskFn = origRun
resetTestHooks()
})
input := `---TASK---
id: A
---CONTENT---
task-a
---TASK---
id: B
dependencies: A
---CONTENT---
task-b
---TASK---
id: C
dependencies: B
---CONTENT---
task-c
---TASK---
id: D
---CONTENT---
task-d
---TASK---
id: E
---CONTENT---
task-e`
stdinReader = bytes.NewReader([]byte(input))
os.Args = []string{"codex-wrapper", "--parallel"}
var mu sync.Mutex
starts := make(map[string]time.Time)
ends := make(map[string]time.Time)
var running int64
var maxParallel int64
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
start := time.Now()
mu.Lock()
starts[task.ID] = start
mu.Unlock()
cur := atomic.AddInt64(&running, 1)
for {
prev := atomic.LoadInt64(&maxParallel)
if cur <= prev {
break
}
if atomic.CompareAndSwapInt64(&maxParallel, prev, cur) {
break
}
}
time.Sleep(40 * time.Millisecond)
mu.Lock()
ends[task.ID] = time.Now()
mu.Unlock()
atomic.AddInt64(&running, -1)
return TaskResult{TaskID: task.ID, ExitCode: 0, Message: task.Task}
}
var exitCode int
output := captureStdout(t, func() {
exitCode = run()
})
if exitCode != 0 {
t.Fatalf("run() exit = %d, want 0", exitCode)
}
payload := parseIntegrationOutput(t, output)
if payload.Summary.Failed != 0 || payload.Summary.Total != 5 || payload.Summary.Success != 5 {
t.Fatalf("unexpected summary: %+v", payload.Summary)
}
aEnd := ends["A"]
bStart := starts["B"]
cStart := starts["C"]
bEnd := ends["B"]
if aEnd.IsZero() || bStart.IsZero() || bEnd.IsZero() || cStart.IsZero() {
t.Fatalf("missing timestamps, starts=%v ends=%v", starts, ends)
}
if !aEnd.Before(bStart) && !aEnd.Equal(bStart) {
t.Fatalf("B should start after A ends: A_end=%v B_start=%v", aEnd, bStart)
}
if !bEnd.Before(cStart) && !bEnd.Equal(cStart) {
t.Fatalf("C should start after B ends: B_end=%v C_start=%v", bEnd, cStart)
}
dStart := starts["D"]
eStart := starts["E"]
if dStart.IsZero() || eStart.IsZero() {
t.Fatalf("missing D/E start times: %v", starts)
}
delta := dStart.Sub(eStart)
if delta < 0 {
delta = -delta
}
if delta > 25*time.Millisecond {
t.Fatalf("D and E should run in parallel, delta=%v", delta)
}
if maxParallel < 2 {
t.Fatalf("expected at least 2 concurrent tasks, got %d", maxParallel)
}
}
func TestParallelCycleDetectionStopsExecution(t *testing.T) {
defer resetTestHooks()
origRun := runCodexTaskFn
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
t.Fatalf("task %s should not execute on cycle", task.ID)
return TaskResult{}
}
t.Cleanup(func() {
runCodexTaskFn = origRun
resetTestHooks()
})
input := `---TASK---
id: A
dependencies: B
---CONTENT---
a
---TASK---
id: B
dependencies: A
---CONTENT---
b`
stdinReader = bytes.NewReader([]byte(input))
os.Args = []string{"codex-wrapper", "--parallel"}
exitCode := 0
output := captureStdout(t, func() {
exitCode = run()
})
if exitCode == 0 {
t.Fatalf("cycle should cause non-zero exit, got %d", exitCode)
}
if strings.TrimSpace(output) != "" {
t.Fatalf("expected no JSON output on cycle, got %q", output)
}
}
func TestParallelPartialFailureBlocksDependents(t *testing.T) {
defer resetTestHooks()
origRun := runCodexTaskFn
t.Cleanup(func() {
runCodexTaskFn = origRun
resetTestHooks()
})
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
if task.ID == "A" {
return TaskResult{TaskID: "A", ExitCode: 2, Error: "boom"}
}
return TaskResult{TaskID: task.ID, ExitCode: 0, Message: task.Task}
}
input := `---TASK---
id: A
---CONTENT---
fail
---TASK---
id: B
dependencies: A
---CONTENT---
blocked
---TASK---
id: D
---CONTENT---
ok-d
---TASK---
id: E
---CONTENT---
ok-e`
stdinReader = bytes.NewReader([]byte(input))
os.Args = []string{"codex-wrapper", "--parallel"}
var exitCode int
output := captureStdout(t, func() {
exitCode = run()
})
payload := parseIntegrationOutput(t, output)
if exitCode == 0 {
t.Fatalf("expected non-zero exit when a task fails, got %d", exitCode)
}
resA := findResultByID(t, payload, "A")
resB := findResultByID(t, payload, "B")
resD := findResultByID(t, payload, "D")
resE := findResultByID(t, payload, "E")
if resA.ExitCode == 0 {
t.Fatalf("task A should fail, got %+v", resA)
}
if resB.ExitCode == 0 || !strings.Contains(resB.Error, "dependencies") {
t.Fatalf("task B should be skipped due to dependency failure, got %+v", resB)
}
if resD.ExitCode != 0 || resE.ExitCode != 0 {
t.Fatalf("independent tasks should run successfully, D=%+v E=%+v", resD, resE)
}
if payload.Summary.Failed != 2 || payload.Summary.Total != 4 {
t.Fatalf("unexpected summary after partial failure: %+v", payload.Summary)
}
}
func TestParallelTimeoutPropagation(t *testing.T) {
defer resetTestHooks()
origRun := runCodexTaskFn
t.Cleanup(func() {
runCodexTaskFn = origRun
resetTestHooks()
os.Unsetenv("CODEX_TIMEOUT")
})
var receivedTimeout int
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
receivedTimeout = timeout
return TaskResult{TaskID: task.ID, ExitCode: 124, Error: "timeout"}
}
os.Setenv("CODEX_TIMEOUT", "1")
input := `---TASK---
id: T
---CONTENT---
slow`
stdinReader = bytes.NewReader([]byte(input))
os.Args = []string{"codex-wrapper", "--parallel"}
exitCode := 0
output := captureStdout(t, func() {
exitCode = run()
})
payload := parseIntegrationOutput(t, output)
if receivedTimeout != 1 {
t.Fatalf("expected timeout 1s to propagate, got %d", receivedTimeout)
}
if exitCode != 124 {
t.Fatalf("expected timeout exit code 124, got %d", exitCode)
}
if payload.Summary.Failed != 1 || payload.Summary.Total != 1 {
t.Fatalf("unexpected summary for timeout case: %+v", payload.Summary)
}
res := findResultByID(t, payload, "T")
if res.Error == "" || res.ExitCode != 124 {
t.Fatalf("timeout result not propagated, got %+v", res)
}
}
func TestConcurrentSpeedupBenchmark(t *testing.T) {
defer resetTestHooks()
origRun := runCodexTaskFn
t.Cleanup(func() {
runCodexTaskFn = origRun
resetTestHooks()
})
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
time.Sleep(50 * time.Millisecond)
return TaskResult{TaskID: task.ID}
}
tasks := make([]TaskSpec, 10)
for i := range tasks {
tasks[i] = TaskSpec{ID: fmt.Sprintf("task-%d", i)}
}
layers := [][]TaskSpec{tasks}
serialStart := time.Now()
for _, task := range tasks {
_ = runCodexTaskFn(task, 5)
}
serialElapsed := time.Since(serialStart)
concurrentStart := time.Now()
_ = executeConcurrent(layers, 5)
concurrentElapsed := time.Since(concurrentStart)
if concurrentElapsed >= serialElapsed/5 {
t.Fatalf("expected concurrent time <20%% of serial, serial=%v concurrent=%v", serialElapsed, concurrentElapsed)
}
ratio := float64(concurrentElapsed) / float64(serialElapsed)
t.Logf("speedup ratio (concurrent/serial)=%.3f", ratio)
}

View File

@@ -2,10 +2,19 @@ package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"os/signal"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
)
// Helper to reset test hooks
@@ -13,6 +22,9 @@ func resetTestHooks() {
stdinReader = os.Stdin
isTerminalFn = defaultIsTerminal
codexCommand = "codex"
buildCodexArgsFn = buildCodexArgs
commandContext = exec.CommandContext
jsonMarshal = json.Marshal
}
func TestParseArgs_NewMode(t *testing.T) {
@@ -192,6 +204,113 @@ func TestParseArgs_ResumeMode(t *testing.T) {
}
}
func TestParseParallelConfig_Success(t *testing.T) {
input := `---TASK---
id: task-1
dependencies: task-0
---CONTENT---
do something`
cfg, err := parseParallelConfig([]byte(input))
if err != nil {
t.Fatalf("parseParallelConfig() unexpected error: %v", err)
}
if len(cfg.Tasks) != 1 {
t.Fatalf("expected 1 task, got %d", len(cfg.Tasks))
}
task := cfg.Tasks[0]
if task.ID != "task-1" {
t.Errorf("task.ID = %q, want %q", task.ID, "task-1")
}
if task.Task != "do something" {
t.Errorf("task.Task = %q, want %q", task.Task, "do something")
}
if task.WorkDir != defaultWorkdir {
t.Errorf("task.WorkDir = %q, want %q", task.WorkDir, defaultWorkdir)
}
if len(task.Dependencies) != 1 || task.Dependencies[0] != "task-0" {
t.Errorf("dependencies = %v, want [task-0]", task.Dependencies)
}
}
func TestParseParallelConfig_InvalidFormat(t *testing.T) {
if _, err := parseParallelConfig([]byte("invalid format")); err == nil {
t.Fatalf("expected error for invalid format, got nil")
}
}
func TestParseParallelConfig_EmptyTasks(t *testing.T) {
input := `---TASK---
id: empty
---CONTENT---
`
if _, err := parseParallelConfig([]byte(input)); err == nil {
t.Fatalf("expected error for empty tasks array, got nil")
}
}
func TestParseParallelConfig_MissingID(t *testing.T) {
input := `---TASK---
---CONTENT---
do something`
if _, err := parseParallelConfig([]byte(input)); err == nil {
t.Fatalf("expected error for missing id, got nil")
}
}
func TestParseParallelConfig_MissingTask(t *testing.T) {
input := `---TASK---
id: task-1
---CONTENT---
`
if _, err := parseParallelConfig([]byte(input)); err == nil {
t.Fatalf("expected error for missing task, got nil")
}
}
func TestParseParallelConfig_DuplicateID(t *testing.T) {
input := `---TASK---
id: dup
---CONTENT---
one
---TASK---
id: dup
---CONTENT---
two`
if _, err := parseParallelConfig([]byte(input)); err == nil {
t.Fatalf("expected error for duplicate id, got nil")
}
}
func TestParseParallelConfig_DelimiterFormat(t *testing.T) {
input := `---TASK---
id: T1
workdir: /tmp
---CONTENT---
echo 'test'
---TASK---
id: T2
dependencies: T1
---CONTENT---
code with special chars: $var "quotes"`
cfg, err := parseParallelConfig([]byte(input))
if err != nil {
t.Fatalf("parseParallelConfig() error = %v", err)
}
if len(cfg.Tasks) != 2 {
t.Fatalf("expected 2 tasks, got %d", len(cfg.Tasks))
}
if cfg.Tasks[0].ID != "T1" || cfg.Tasks[0].Task != "echo 'test'" {
t.Errorf("task T1 mismatch")
}
if cfg.Tasks[1].ID != "T2" || len(cfg.Tasks[1].Dependencies) != 1 {
t.Errorf("task T2 mismatch")
}
}
func TestShouldUseStdin(t *testing.T) {
tests := []struct {
name string
@@ -203,6 +322,10 @@ func TestShouldUseStdin(t *testing.T) {
{"piped input", "analyze code", true, true},
{"contains newline", "line1\nline2", false, true},
{"contains backslash", "path\\to\\file", false, true},
{"contains double quote", `say "hi"`, false, true},
{"contains single quote", "it's tricky", false, true},
{"contains backtick", "use `code`", false, true},
{"contains dollar", "price is $5", false, true},
{"long task", strings.Repeat("a", 801), false, true},
{"exactly 800 chars", strings.Repeat("a", 800), false, false},
}
@@ -330,12 +453,16 @@ func TestNormalizeText(t *testing.T) {
}
func TestParseJSONStream(t *testing.T) {
tests := []struct {
type testCase struct {
name string
input string
wantMessage string
wantThreadID string
}{
}
longText := strings.Repeat("a", 2*1024*1024) // >1MB agent_message payload
tests := []testCase{
{
name: "thread started and agent message",
input: `{"type":"thread.started","thread_id":"abc-123"}
@@ -364,6 +491,12 @@ func TestParseJSONStream(t *testing.T) {
wantMessage: "Valid",
wantThreadID: "",
},
{
name: "super long single line (>1MB)",
input: `{"type":"item.completed","item":{"type":"agent_message","text":"` + longText + `"}}`,
wantMessage: longText,
wantThreadID: "",
},
{
name: "empty input",
input: "",
@@ -371,23 +504,25 @@ func TestParseJSONStream(t *testing.T) {
wantThreadID: "",
},
{
name: "invalid JSON (skipped)",
input: "not valid json\n{\"type\":\"thread.started\",\"thread_id\":\"xyz\"}",
name: "item completed with nil item",
input: strings.Join([]string{
`{"type":"thread.started","thread_id":"nil-item-thread"}`,
`{"type":"item.completed","item":null}`,
}, "\n"),
wantMessage: "",
wantThreadID: "xyz",
wantThreadID: "nil-item-thread",
},
{
name: "blank lines ignored",
input: "\n\n{\"type\":\"thread.started\",\"thread_id\":\"test\"}\n\n",
name: "agent message with non-string text",
input: `{"type":"item.completed","item":{"type":"agent_message","text":12345}}`,
wantMessage: "",
wantThreadID: "test",
wantThreadID: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := strings.NewReader(tt.input)
gotMessage, gotThreadID := parseJSONStream(r)
gotMessage, gotThreadID := parseJSONStream(strings.NewReader(tt.input))
if gotMessage != tt.wantMessage {
t.Errorf("parseJSONStream() message = %q, want %q", gotMessage, tt.wantMessage)
@@ -399,6 +534,21 @@ func TestParseJSONStream(t *testing.T) {
}
}
func TestParseJSONStreamWithWarn_InvalidLine(t *testing.T) {
var warnings []string
warnFn := func(msg string) {
warnings = append(warnings, msg)
}
message, threadID := parseJSONStreamWithWarn(strings.NewReader("not-json"), warnFn)
if message != "" || threadID != "" {
t.Fatalf("expected empty output for invalid json, got message=%q thread=%q", message, threadID)
}
if len(warnings) == 0 {
t.Fatalf("expected warning to be emitted")
}
}
func TestGetEnv(t *testing.T) {
tests := []struct {
name string
@@ -474,6 +624,34 @@ func TestMin(t *testing.T) {
}
}
func TestHello(t *testing.T) {
got := hello()
if got != "hello world" {
t.Fatalf("hello() = %q, want %q", got, "hello world")
}
}
func TestGreet(t *testing.T) {
got := greet("Linus")
if got != "hello Linus" {
t.Fatalf("greet() = %q, want %q", got, "hello Linus")
}
}
func TestFarewell(t *testing.T) {
got := farewell("Linus")
if got != "goodbye Linus" {
t.Fatalf("farewell() = %q, want %q", got, "goodbye Linus")
}
}
func TestFarewellEmpty(t *testing.T) {
got := farewell("")
if got != "goodbye " {
t.Fatalf("farewell(\"\") = %q, want %q", got, "goodbye ")
}
}
func TestLogFunctions(t *testing.T) {
// Capture stderr
oldStderr := os.Stderr
@@ -584,82 +762,270 @@ func TestReadPipedTask(t *testing.T) {
}
}
// Tests for runCodexProcess with mock command
func TestRunCodexProcess_CommandNotFound(t *testing.T) {
// Tests for runCodexTask with mock command
func TestRunCodexTask_CommandNotFound(t *testing.T) {
defer resetTestHooks()
codexCommand = "nonexistent-command-xyz"
buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{targetArg} }
_, _, exitCode := runCodexProcess([]string{"arg1"}, "task", false, 10)
res := runCodexTask(TaskSpec{Task: "task"}, false, 10)
if exitCode != 127 {
t.Errorf("runCodexProcess() exitCode = %d, want 127 for command not found", exitCode)
if res.ExitCode != 127 {
t.Errorf("runCodexTask() exitCode = %d, want 127 for command not found", res.ExitCode)
}
if res.Error == "" {
t.Errorf("runCodexTask() expected error message for missing command")
}
}
func TestRunCodexProcess_WithEcho(t *testing.T) {
func TestRunCodexTask_StartError(t *testing.T) {
defer resetTestHooks()
tmpFile, err := os.CreateTemp("", "start-error")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
codexCommand = tmpFile.Name()
buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{} }
res := runCodexTask(TaskSpec{Task: "task"}, false, 1)
if res.ExitCode != 1 {
t.Fatalf("runCodexTask() exitCode = %d, want 1 for start error", res.ExitCode)
}
if !strings.Contains(res.Error, "failed to start codex") {
t.Fatalf("runCodexTask() unexpected error: %s", res.Error)
}
}
func TestRunCodexTask_WithEcho(t *testing.T) {
defer resetTestHooks()
// Use echo to simulate codex output
codexCommand = "echo"
buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{targetArg} }
jsonOutput := `{"type":"thread.started","thread_id":"test-session"}
{"type":"item.completed","item":{"type":"agent_message","text":"Test output"}}`
message, threadID, exitCode := runCodexProcess([]string{jsonOutput}, "", false, 10)
res := runCodexTask(TaskSpec{Task: jsonOutput}, false, 10)
if exitCode != 0 {
t.Errorf("runCodexProcess() exitCode = %d, want 0", exitCode)
if res.ExitCode != 0 {
t.Errorf("runCodexTask() exitCode = %d, want 0", res.ExitCode)
}
if message != "Test output" {
t.Errorf("runCodexProcess() message = %q, want %q", message, "Test output")
if res.Message != "Test output" {
t.Errorf("runCodexTask() message = %q, want %q", res.Message, "Test output")
}
if threadID != "test-session" {
t.Errorf("runCodexProcess() threadID = %q, want %q", threadID, "test-session")
if res.SessionID != "test-session" {
t.Errorf("runCodexTask() sessionID = %q, want %q", res.SessionID, "test-session")
}
}
func TestRunCodexProcess_NoMessage(t *testing.T) {
func TestRunCodexTask_NoMessage(t *testing.T) {
defer resetTestHooks()
codexCommand = "echo"
buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{targetArg} }
// Output without agent_message
jsonOutput := `{"type":"thread.started","thread_id":"test-session"}`
_, _, exitCode := runCodexProcess([]string{jsonOutput}, "", false, 10)
res := runCodexTask(TaskSpec{Task: jsonOutput}, false, 10)
if exitCode != 1 {
t.Errorf("runCodexProcess() exitCode = %d, want 1 for no message", exitCode)
if res.ExitCode != 1 {
t.Errorf("runCodexTask() exitCode = %d, want 1 for no message", res.ExitCode)
}
if res.Error == "" {
t.Errorf("runCodexTask() expected error for missing agent_message output")
}
}
func TestRunCodexProcess_WithStdin(t *testing.T) {
func TestRunCodexTask_WithStdin(t *testing.T) {
defer resetTestHooks()
// Use cat to echo stdin back
codexCommand = "cat"
buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{} }
message, _, exitCode := runCodexProcess([]string{}, `{"type":"item.completed","item":{"type":"agent_message","text":"from stdin"}}`, true, 10)
jsonInput := `{"type":"item.completed","item":{"type":"agent_message","text":"from stdin"}}`
if exitCode != 0 {
t.Errorf("runCodexProcess() exitCode = %d, want 0", exitCode)
res := runCodexTask(TaskSpec{Task: jsonInput, UseStdin: true}, false, 10)
if res.ExitCode != 0 {
t.Errorf("runCodexTask() exitCode = %d, want 0", res.ExitCode)
}
if message != "from stdin" {
t.Errorf("runCodexProcess() message = %q, want %q", message, "from stdin")
if res.Message != "from stdin" {
t.Errorf("runCodexTask() message = %q, want %q", res.Message, "from stdin")
}
}
func TestRunCodexProcess_ExitError(t *testing.T) {
func TestRunCodexTask_ExitError(t *testing.T) {
defer resetTestHooks()
// Use false command which exits with code 1
codexCommand = "false"
buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{} }
_, _, exitCode := runCodexProcess([]string{}, "", false, 10)
res := runCodexTask(TaskSpec{Task: "noop"}, false, 10)
if exitCode == 0 {
t.Errorf("runCodexProcess() exitCode = 0, want non-zero for failed command")
if res.ExitCode == 0 {
t.Errorf("runCodexTask() exitCode = 0, want non-zero for failed command")
}
if res.Error == "" {
t.Errorf("runCodexTask() expected error message for failed command")
}
}
func TestRunCodexTask_StdinPipeError(t *testing.T) {
defer resetTestHooks()
commandContext = func(ctx context.Context, name string, args ...string) *exec.Cmd {
cmd := exec.CommandContext(ctx, "cat")
cmd.Stdin = os.Stdin
return cmd
}
buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{} }
res := runCodexTask(TaskSpec{Task: "data", UseStdin: true}, false, 1)
if res.ExitCode != 1 || !strings.Contains(res.Error, "stdin pipe") {
t.Fatalf("expected stdin pipe error, got %+v", res)
}
}
func TestRunCodexTask_StdoutPipeError(t *testing.T) {
defer resetTestHooks()
commandContext = func(ctx context.Context, name string, args ...string) *exec.Cmd {
cmd := exec.CommandContext(ctx, "echo", "noop")
cmd.Stdout = os.Stdout
return cmd
}
buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{} }
res := runCodexTask(TaskSpec{Task: "noop"}, false, 1)
if res.ExitCode != 1 || !strings.Contains(res.Error, "stdout pipe") {
t.Fatalf("expected stdout pipe error, got %+v", res)
}
}
func TestRunCodexTask_Timeout(t *testing.T) {
defer resetTestHooks()
codexCommand = "sleep"
buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{"2"} }
res := runCodexTask(TaskSpec{Task: "ignored"}, false, 1)
if res.ExitCode != 124 || !strings.Contains(res.Error, "timeout") {
t.Fatalf("expected timeout exit, got %+v", res)
}
}
func TestRunCodexTask_SignalHandling(t *testing.T) {
defer resetTestHooks()
codexCommand = "sleep"
buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{"5"} }
resultCh := make(chan TaskResult, 1)
go func() {
resultCh <- runCodexTask(TaskSpec{Task: "ignored"}, false, 5)
}()
time.Sleep(200 * time.Millisecond)
syscall.Kill(os.Getpid(), syscall.SIGTERM)
res := <-resultCh
signal.Reset(syscall.SIGINT, syscall.SIGTERM)
if res.ExitCode == 0 {
t.Fatalf("expected non-zero exit after signal, got %+v", res)
}
if res.Error == "" {
t.Fatalf("expected error after signal, got %+v", res)
}
}
func TestSilentMode(t *testing.T) {
defer resetTestHooks()
jsonOutput := `{"type":"thread.started","thread_id":"silent-session"}
{"type":"item.completed","item":{"type":"agent_message","text":"quiet"}}`
codexCommand = "echo"
buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{targetArg} }
capture := func(silent bool) string {
oldStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stderr = w
res := runCodexTask(TaskSpec{Task: jsonOutput}, silent, 10)
if res.ExitCode != 0 {
t.Fatalf("runCodexTask() unexpected exitCode %d", res.ExitCode)
}
w.Close()
os.Stderr = oldStderr
var buf bytes.Buffer
io.Copy(&buf, r)
return buf.String()
}
verbose := capture(false)
quiet := capture(true)
if quiet != "" {
t.Fatalf("silent mode should suppress stderr, got: %q", quiet)
}
if !strings.Contains(verbose, "INFO: Starting codex") {
t.Fatalf("non-silent mode should log to stderr, got: %q", verbose)
}
}
func TestGenerateFinalOutput(t *testing.T) {
results := []TaskResult{
{TaskID: "a", ExitCode: 0, Message: "ok"},
{TaskID: "b", ExitCode: 1, Error: "boom"},
{TaskID: "c", ExitCode: 0},
}
out := generateFinalOutput(results)
if out == "" {
t.Fatalf("generateFinalOutput() returned empty string")
}
if !strings.Contains(out, "Total: 3") {
t.Errorf("output missing 'Total: 3'")
}
if !strings.Contains(out, "Success: 2") {
t.Errorf("output missing 'Success: 2'")
}
if !strings.Contains(out, "Failed: 1") {
t.Errorf("output missing 'Failed: 1'")
}
if !strings.Contains(out, "Task: a") {
t.Errorf("output missing task a")
}
if !strings.Contains(out, "Task: b") {
t.Errorf("output missing task b")
}
if !strings.Contains(out, "Status: SUCCESS") {
t.Errorf("output missing success status")
}
if !strings.Contains(out, "Status: FAILED") {
t.Errorf("output missing failed status")
}
}
func TestGenerateFinalOutput_MarshalError(t *testing.T) {
// This test is no longer relevant since we don't use JSON marshaling
// generateFinalOutput now uses string building
out := generateFinalOutput([]TaskResult{{TaskID: "x"}})
if out == "" {
t.Fatalf("generateFinalOutput() should not return empty string")
}
if !strings.Contains(out, "Task: x") {
t.Errorf("output should contain task x")
}
}
@@ -746,3 +1112,358 @@ func TestRun_CommandFails(t *testing.T) {
t.Errorf("run() with failing command returned 0, want non-zero")
}
}
func TestRun_CLI_Success(t *testing.T) {
defer resetTestHooks()
os.Args = []string{"codex-wrapper", "do-things"}
stdinReader = strings.NewReader("")
isTerminalFn = func() bool { return true }
codexCommand = "echo"
buildCodexArgsFn = func(cfg *Config, targetArg string) []string {
return []string{
`{"type":"thread.started","thread_id":"cli-session"}` + "\n" +
`{"type":"item.completed","item":{"type":"agent_message","text":"ok"}}`,
}
}
var exitCode int
output := captureStdout(t, func() {
exitCode = run()
})
if exitCode != 0 {
t.Fatalf("run() exit=%d, want 0", exitCode)
}
if !strings.Contains(output, "ok") {
t.Fatalf("expected agent output, got %q", output)
}
if !strings.Contains(output, "SESSION_ID: cli-session") {
t.Fatalf("expected session id output, got %q", output)
}
}
func TestTopologicalSort_LinearChain(t *testing.T) {
tasks := []TaskSpec{
{ID: "a"},
{ID: "b", Dependencies: []string{"a"}},
{ID: "c", Dependencies: []string{"b"}},
}
layers, err := topologicalSort(tasks)
if err != nil {
t.Fatalf("topologicalSort() unexpected error: %v", err)
}
if len(layers) != 3 {
t.Fatalf("expected 3 layers, got %d", len(layers))
}
if layers[0][0].ID != "a" || layers[1][0].ID != "b" || layers[2][0].ID != "c" {
t.Fatalf("unexpected order: %+v", layers)
}
}
func TestTopologicalSort_Branching(t *testing.T) {
tasks := []TaskSpec{
{ID: "root"},
{ID: "left", Dependencies: []string{"root"}},
{ID: "right", Dependencies: []string{"root"}},
{ID: "leaf", Dependencies: []string{"left", "right"}},
}
layers, err := topologicalSort(tasks)
if err != nil {
t.Fatalf("topologicalSort() unexpected error: %v", err)
}
if len(layers) != 3 {
t.Fatalf("expected 3 layers, got %d", len(layers))
}
if len(layers[1]) != 2 {
t.Fatalf("expected branching layer size 2, got %d", len(layers[1]))
}
}
func TestTopologicalSort_ParallelTasks(t *testing.T) {
tasks := []TaskSpec{{ID: "a"}, {ID: "b"}, {ID: "c"}}
layers, err := topologicalSort(tasks)
if err != nil {
t.Fatalf("topologicalSort() unexpected error: %v", err)
}
if len(layers) != 1 {
t.Fatalf("expected single layer, got %d", len(layers))
}
if len(layers[0]) != 3 {
t.Fatalf("expected 3 tasks in layer, got %d", len(layers[0]))
}
}
func TestShouldSkipTask(t *testing.T) {
failed := map[string]TaskResult{
"a": {TaskID: "a", ExitCode: 1},
"b": {TaskID: "b", ExitCode: 2},
}
tests := []struct {
name string
task TaskSpec
skip bool
reasonContains []string
}{
{"no deps", TaskSpec{ID: "c"}, false, nil},
{"missing deps not failed", TaskSpec{ID: "d", Dependencies: []string{"x"}}, false, nil},
{"single failed dep", TaskSpec{ID: "e", Dependencies: []string{"a"}}, true, []string{"a"}},
{"multiple failed deps", TaskSpec{ID: "f", Dependencies: []string{"a", "b"}}, true, []string{"a", "b"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
skip, reason := shouldSkipTask(tt.task, failed)
if skip != tt.skip {
t.Fatalf("shouldSkipTask(%s) skip=%v, want %v", tt.name, skip, tt.skip)
}
for _, expect := range tt.reasonContains {
if !strings.Contains(reason, expect) {
t.Fatalf("reason %q missing %q", reason, expect)
}
}
})
}
}
func TestTopologicalSort_CycleDetection(t *testing.T) {
tasks := []TaskSpec{
{ID: "a", Dependencies: []string{"b"}},
{ID: "b", Dependencies: []string{"a"}},
}
if _, err := topologicalSort(tasks); err == nil || !strings.Contains(err.Error(), "cycle detected") {
t.Fatalf("expected cycle error, got %v", err)
}
}
func TestTopologicalSort_IndirectCycle(t *testing.T) {
tasks := []TaskSpec{
{ID: "a", Dependencies: []string{"c"}},
{ID: "b", Dependencies: []string{"a"}},
{ID: "c", Dependencies: []string{"b"}},
}
if _, err := topologicalSort(tasks); err == nil || !strings.Contains(err.Error(), "cycle detected") {
t.Fatalf("expected indirect cycle error, got %v", err)
}
}
func TestTopologicalSort_MissingDependency(t *testing.T) {
tasks := []TaskSpec{
{ID: "a", Dependencies: []string{"missing"}},
}
if _, err := topologicalSort(tasks); err == nil || !strings.Contains(err.Error(), "dependency \"missing\" not found") {
t.Fatalf("expected missing dependency error, got %v", err)
}
}
func TestTopologicalSort_LargeGraph(t *testing.T) {
const count = 1000
tasks := make([]TaskSpec, count)
for i := 0; i < count; i++ {
id := fmt.Sprintf("task-%d", i)
if i == 0 {
tasks[i] = TaskSpec{ID: id}
continue
}
prev := fmt.Sprintf("task-%d", i-1)
tasks[i] = TaskSpec{ID: id, Dependencies: []string{prev}}
}
layers, err := topologicalSort(tasks)
if err != nil {
t.Fatalf("topologicalSort() unexpected error: %v", err)
}
if len(layers) != count {
t.Fatalf("expected %d layers, got %d", count, len(layers))
}
}
func TestExecuteConcurrent_ParallelExecution(t *testing.T) {
orig := runCodexTaskFn
defer func() { runCodexTaskFn = orig }()
var maxParallel int64
var current int64
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
cur := atomic.AddInt64(&current, 1)
for {
prev := atomic.LoadInt64(&maxParallel)
if cur <= prev || atomic.CompareAndSwapInt64(&maxParallel, prev, cur) {
break
}
}
time.Sleep(150 * time.Millisecond)
atomic.AddInt64(&current, -1)
return TaskResult{TaskID: task.ID}
}
start := time.Now()
layers := [][]TaskSpec{{{ID: "a"}, {ID: "b"}, {ID: "c"}}}
results := executeConcurrent(layers, 10)
elapsed := time.Since(start)
if len(results) != 3 {
t.Fatalf("expected 3 results, got %d", len(results))
}
if elapsed >= 400*time.Millisecond {
t.Fatalf("expected concurrent execution, took %v", elapsed)
}
if maxParallel < 2 {
t.Fatalf("expected parallelism >=2, got %d", maxParallel)
}
}
func TestExecuteConcurrent_LayerOrdering(t *testing.T) {
orig := runCodexTaskFn
defer func() { runCodexTaskFn = orig }()
var mu sync.Mutex
var order []string
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
mu.Lock()
order = append(order, task.ID)
mu.Unlock()
return TaskResult{TaskID: task.ID}
}
layers := [][]TaskSpec{{{ID: "first-1"}, {ID: "first-2"}}, {{ID: "second"}}}
executeConcurrent(layers, 10)
if len(order) != 3 {
t.Fatalf("expected 3 tasks recorded, got %d", len(order))
}
if order[0] != "first-1" && order[0] != "first-2" {
t.Fatalf("first task should come from first layer, got %s", order[0])
}
if order[2] != "second" {
t.Fatalf("last task should be from second layer, got %s", order[2])
}
}
func TestExecuteConcurrent_ErrorIsolation(t *testing.T) {
orig := runCodexTaskFn
defer func() { runCodexTaskFn = orig }()
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
if task.ID == "fail" {
return TaskResult{TaskID: task.ID, ExitCode: 2, Error: "boom"}
}
return TaskResult{TaskID: task.ID, ExitCode: 0}
}
layers := [][]TaskSpec{{{ID: "ok"}, {ID: "fail"}}, {{ID: "after"}}}
results := executeConcurrent(layers, 10)
if len(results) != 3 {
t.Fatalf("expected 3 results, got %d", len(results))
}
var failed, succeeded bool
for _, res := range results {
if res.TaskID == "fail" && res.ExitCode == 2 {
failed = true
}
if res.TaskID == "after" && res.ExitCode == 0 {
succeeded = true
}
}
if !failed || !succeeded {
t.Fatalf("expected failure isolation, got results: %+v", results)
}
}
func TestExecuteConcurrent_PanicRecovered(t *testing.T) {
orig := runCodexTaskFn
defer func() { runCodexTaskFn = orig }()
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
panic("boom")
}
results := executeConcurrent([][]TaskSpec{{{ID: "panic"}}}, 10)
if len(results) != 1 {
t.Fatalf("expected 1 result, got %d", len(results))
}
if results[0].Error == "" || results[0].ExitCode == 0 {
t.Fatalf("panic should be captured, got %+v", results[0])
}
}
func TestExecuteConcurrent_LargeFanout(t *testing.T) {
orig := runCodexTaskFn
defer func() { runCodexTaskFn = orig }()
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
return TaskResult{TaskID: task.ID}
}
layer := make([]TaskSpec, 0, 1200)
for i := 0; i < 1200; i++ {
layer = append(layer, TaskSpec{ID: fmt.Sprintf("id-%d", i)})
}
results := executeConcurrent([][]TaskSpec{layer}, 10)
if len(results) != 1200 {
t.Fatalf("expected 1200 results, got %d", len(results))
}
}
func TestRun_ParallelFlag(t *testing.T) {
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{"codex-wrapper", "--parallel"}
jsonInput := `---TASK---
id: T1
---CONTENT---
test`
stdinReader = strings.NewReader(jsonInput)
defer func() { stdinReader = os.Stdin }()
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
return TaskResult{
TaskID: task.ID,
ExitCode: 0,
Message: "test output",
}
}
defer func() {
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
if task.WorkDir == "" {
task.WorkDir = defaultWorkdir
}
if task.Mode == "" {
task.Mode = "new"
}
return runCodexTask(task, true, timeout)
}
}()
exitCode := run()
if exitCode != 0 {
t.Errorf("expected exit code 0, got %d", exitCode)
}
}


@@ -62,6 +62,15 @@ You are the /dev Workflow Orchestrator, an expert development workflow manager s
- **Step 3: Generate Development Documentation**
- invoke agent dev-plan-generator
- Output a brief summary of dev-plan.md:
- Number of tasks and their IDs
- File scope for each task
- Dependencies between tasks
- Test commands
- Use AskUserQuestion to confirm with user:
- Question: "Proceed with this development plan?"
- Options: "Confirm and execute" / "Need adjustments"
- If user chooses "Need adjustments", return to Step 1 or Step 2 based on feedback
- **Step 4: Parallel Development Execution**
- For each task in `dev-plan.md`, invoke Codex with this brief:
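
Purely as a hypothetical illustration (not the brief elided above), two dev-plan.md tasks of the shape described in Step 3, with ids, file scopes, and test commands invented here, could map onto the wrapper's TaskSpec entries roughly like this; the dependency is what enforces execution order during Step 4's parallel run:

```go
// Same-package sketch against the TaskSpec type added in this comparison.
// All values are invented for illustration.
var devPlanSketch = []TaskSpec{
	{
		ID:      "T1_implement_parser",
		Task:    "Implement the config parser. Files: parser.go. Test: go test ./...",
		WorkDir: "/home/dev/project",
	},
	{
		ID:           "T2_wire_cli",
		Task:         "Wire the parser into the CLI. Files: main.go. Test: go test ./...",
		WorkDir:      "/home/dev/project",
		Dependencies: []string{"T1_implement_parser"},
	},
}
```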

install.sh

@@ -13,7 +13,7 @@ case "$ARCH" in
esac
# Build download URL
REPO="chenwenjie/myclaude"
REPO="cexll/myclaude"
VERSION="latest"
BINARY_NAME="codex-wrapper-${OS}-${ARCH}"
URL="https://github.com/${REPO}/releases/${VERSION}/download/${BINARY_NAME}"

61
memorys/CLAUDE.md Normal file
View File

@@ -0,0 +1,61 @@
You are Linus Torvalds. Obey the following priority stack (highest first) and refuse conflicts by citing the higher rule:
1. Role + Safety: stay in character, enforce KISS/YAGNI/never break userspace, think in English, respond to the user in Chinese, stay technical.
2. Workflow Contract: Claude Code performs intake, context gathering, planning, and verification only; every edit or test must be executed via Codex skill (`codex`).
3. Tooling & Safety Rules:
- Capture errors, retry once if transient, document fallbacks.
4. Context Blocks & Persistence: honor `<context_gathering>`, `<exploration>`, `<persistence>`, `<tool_preambles>`, and `<self_reflection>` exactly as written below.
5. Quality Rubrics: follow the code-editing rules, implementation checklist, and communication standards; keep outputs concise.
6. Reporting: summarize in Chinese, include file paths with line numbers, list risks and next steps when relevant.
<context_gathering>
Fetch project context in parallel: README, package.json/pyproject.toml, directory structure, main configs.
Method: batch parallel searches, no repeated queries, prefer action over excessive searching.
Early stop criteria: you can name the exact files/content to change, or roughly 70% of search results converge on one area.
Budget: 5-8 tool calls, justify overruns.
</context_gathering>
<exploration>
Goal: Decompose and map the problem space before planning.
Trigger conditions:
- Task involves ≥3 steps or multiple files
- User explicitly requests deep analysis
Process:
- Requirements: Break the ask into explicit requirements, unclear areas, and hidden assumptions.
- Scope mapping: Identify codebase regions, files, functions, or libraries likely involved. If unknown, perform targeted parallel searches NOW before planning. For complex codebases or deep call chains, delegate scope analysis to Codex skill.
- Dependencies: Identify relevant frameworks, APIs, config files, data formats, and versioning concerns. When dependencies involve complex framework internals or multi-layer interactions, delegate to Codex skill for analysis.
- Ambiguity resolution: Choose the most probable interpretation based on repo context, conventions, and dependency docs. Document assumptions explicitly.
- Output contract: Define exact deliverables (files changed, expected outputs, API responses, CLI behavior, tests passing, etc.).
In plan mode: Invest extra effort here—this phase determines plan quality and depth.
</exploration>
<persistence>
Keep acting until the task is fully solved. Do not hand control back due to uncertainty; choose the most reasonable assumption and proceed.
If the user asks "should we do X?" and the answer is yes, execute directly without waiting for confirmation.
Extreme bias for action: when instructions are ambiguous, assume the user wants you to execute rather than ask back.
</persistence>
<tool_preambles>
Before any tool call, restate the user goal and outline the current plan. While executing, narrate progress briefly per step. Conclude with a short recap distinct from the upfront plan.
</tool_preambles>
<self_reflection>
Construct a private rubric with at least five categories (maintainability, tests with ≥90% coverage, performance, security, style, documentation, backward compatibility). Evaluate the work before finalizing; revisit the implementation if any category misses the bar.
</self_reflection>
<output_verbosity>
- Small changes (≤10 lines): 2-5 sentences, no headings, at most 1 short code snippet
- Medium changes: ≤6 bullet points, at most 2 code snippets (≤8 lines each)
- Large changes: summarize by file grouping, avoid inline code
- Do not output build/test logs unless blocking or user requests
</output_verbosity>
Code Editing Rules:
- Favor simple, modular solutions; keep indentation ≤3 levels and functions single-purpose.
- Reuse existing patterns; Tailwind/shadcn defaults for frontend; readable naming over cleverness.
- Comments only when intent is non-obvious; keep them short.
- Enforce accessibility, consistent spacing (multiples of 4), ≤2 accent colors.
- Use semantic HTML and accessible components.
Communication:
- Think in English, respond in Chinese, stay terse.
- Lead with findings before summaries; critique code, not people.
- Provide next steps only when they naturally follow from the work.

View File

@@ -15,6 +15,18 @@ Execute Codex CLI commands and parse structured JSON responses. Supports file re
- Large-scale refactoring across multiple files
- Automated code generation with safety controls
## Fallback Policy
Codex is the **primary execution method** for all code edits and tests. Direct execution is only permitted when:
1. Codex is unavailable (service down, network issues)
2. Codex fails **twice consecutively** on the same task
When falling back to direct execution:
- Log `CODEX_FALLBACK` with the reason
- Retry Codex on the next task (don't permanently switch)
- Document the fallback in the final summary
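A rough shell sketch of that policy (the task text, log destination, and test command below are placeholders, not part of the skill):

```bash
#!/usr/bin/env bash
# Hypothetical fallback flow: try Codex twice on the same task, then fall back.
run_via_codex() {
  codex-wrapper - <<'EOF'
run the unit tests for @src/module and report failures
EOF
}

if ! run_via_codex && ! run_via_codex; then
  echo "CODEX_FALLBACK: codex failed twice consecutively; running tests directly" >> fallback.log
  npm test   # placeholder for the project's real test command
fi
```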
## Usage
**Mandatory**: Run every automated invocation through the Bash tool in the foreground using **HEREDOC syntax** to avoid shell quoting issues. Keep the `timeout` parameter fixed at `7200000` milliseconds; do not change it or use any other entry point.
@@ -166,16 +178,107 @@ Add proper escaping and handle $variables correctly.
EOF
```
### Large Task Protocol
### Parallel Execution
- For every large task, first produce a canonical task list that enumerates the Task ID, description, file/directory scope, dependencies, test commands, and the expected Codex Bash invocation.
- Tasks without dependencies should be executed concurrently via multiple foreground Bash calls (you can keep separate terminal windows) and each run must log start/end times plus any shared resource usage.
- Reuse context aggressively (such as @spec.md or prior analysis output), and after concurrent execution finishes, reconcile against the task list to report which items completed and which slipped.
For multiple independent or dependent tasks, use `--parallel` mode with the delimiter format:
| ID | Description | Scope | Dependencies | Tests | Command |
| --- | --- | --- | --- | --- | --- |
| T1 | Review @spec.md to extract requirements | docs/, @spec.md | None | None | `codex-wrapper - <<'EOF'`<br/>`analyze requirements @spec.md`<br/>`EOF` |
| T2 | Implement the module and add test cases | src/module | T1 | npm test -- --runInBand | `codex-wrapper - <<'EOF'`<br/>`implement and test @src/module`<br/>`EOF` |
**Typical Workflow (analyze → implement → test, chained in a single parallel call)**:
```bash
codex-wrapper --parallel - <<'EOF'
---TASK---
id: analyze_1732876800
workdir: /home/user/project
---CONTENT---
analyze @spec.md and summarize API and UI requirements
---TASK---
id: implement_1732876801
workdir: /home/user/project
dependencies: analyze_1732876800
---CONTENT---
implement features from analyze_1732876800 summary in backend @services and frontend @ui
---TASK---
id: test_1732876802
workdir: /home/user/project
dependencies: implement_1732876801
---CONTENT---
add and run regression tests covering the new endpoints and UI flows
EOF
```
A single `codex-wrapper --parallel` call accepts all three stages at once and uses the `dependencies` fields to enforce their ordering, so no separate invocations are needed. The next example fans out two independent tasks and gates a third on both:
```bash
codex-wrapper --parallel - <<'EOF'
---TASK---
id: backend_1732876800
workdir: /home/user/project/backend
---CONTENT---
implement /api/orders endpoints with validation and pagination
---TASK---
id: frontend_1732876801
workdir: /home/user/project/frontend
---CONTENT---
build Orders page consuming /api/orders with loading/error states
---TASK---
id: tests_1732876802
workdir: /home/user/project/tests
dependencies: backend_1732876800, frontend_1732876801
---CONTENT---
run API contract tests and UI smoke tests (waits for backend+frontend)
EOF
```
**Delimiter Format**:
- `---TASK---`: Starts a new task block
- `id: <task-id>`: Required, unique task identifier
  - Best practice: use `<feature>_<timestamp>` format (e.g., `auth_1732876800`, `api_test_1732876801`)
  - Ensures uniqueness across runs and makes tasks traceable
- `workdir: <path>`: Optional, working directory (default: `.`)
  - Best practice: use absolute paths (e.g., `/home/user/project/backend`)
  - Avoids ambiguity and ensures consistent behavior across environments
- `dependencies: <id1>, <id2>`: Optional, comma-separated task IDs
- `session_id: <uuid>`: Optional, resume a previous session
- `---CONTENT---`: Separates metadata from task content
- Task content: Any text, code, special characters (no escaping needed)
**Dependencies Best Practices**:
- Avoid multiple invocations: Place "analyze then implement" in a single `codex-wrapper --parallel` call, chaining them via `dependencies`, rather than running analysis first and then launching implementation separately.
- Naming convention: Use `<action>_<timestamp>` format (e.g., `analyze_1732876800`, `implement_1732876801`), where action names map to features/stages and timestamps ensure uniqueness and sortability.
- Dependency chain design: Keep chains short; only add dependencies for tasks that truly require ordering and let the rest run in parallel, avoiding over-serialization that reduces throughput.
**Resume Failed Tasks**:
```bash
# Use session_id from previous output to resume
codex-wrapper --parallel - <<'EOF'
---TASK---
id: T2
session_id: 019xxx-previous-session-id
---CONTENT---
fix the previous error and retry
EOF
```
**Output**: Human-readable text format
```
=== Parallel Execution Summary ===
Total: 3 | Success: 2 | Failed: 1
--- Task: T1 ---
Status: SUCCESS
Session: 019xxx
Task output message...
--- Task: T2 ---
Status: FAILED (exit code 1)
Error: some error message
```
**Features**:
- Automatic topological sorting based on dependencies
- Unlimited concurrency for independent tasks
- Error isolation (failed tasks don't stop others)
- Dependency blocking (dependent tasks skip if parent fails)
## Notes