diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..4f86e44 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,6 @@ +# Changelog + +## 5.2.0 - 2025-12-11 +- PR #53: unified CLI version source in `codeagent-wrapper/main.go` and bumped to 5.2.0. +- Added legacy `codex-wrapper` alias support (runtime detection plus `scripts/install.sh` symlink helper). +- Updated documentation to reflect backend flag usage and new version output. diff --git a/README.md b/README.md index f42d2ce..95623d2 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![Claude Code](https://img.shields.io/badge/Claude-Code-blue)](https://claude.ai/code) -[![Version](https://img.shields.io/badge/Version-5.0-green)](https://github.com/cexll/myclaude) +[![Version](https://img.shields.io/badge/Version-5.2-green)](https://github.com/cexll/myclaude) > AI-powered development automation with Claude Code + Codex collaboration @@ -242,6 +242,8 @@ python3 install.py --module dev # Manual bash install.sh +# Create legacy codex-wrapper alias (uses $INSTALL_DIR or ~/bin) +bash scripts/install.sh ``` #### Windows diff --git a/README_CN.md b/README_CN.md index 50fc804..5516011 100644 --- a/README_CN.md +++ b/README_CN.md @@ -2,7 +2,7 @@ [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![Claude Code](https://img.shields.io/badge/Claude-Code-blue)](https://claude.ai/code) -[![Version](https://img.shields.io/badge/Version-5.0-green)](https://github.com/cexll/myclaude) +[![Version](https://img.shields.io/badge/Version-5.2-green)](https://github.com/cexll/myclaude) > AI 驱动的开发自动化 - Claude Code + Codex 协作 @@ -233,6 +233,8 @@ python3 install.py --module dev # 手动 bash install.sh +# 创建 codex-wrapper 兼容别名(使用 $INSTALL_DIR 或 ~/bin) +bash scripts/install.sh ``` #### Windows 系统 diff --git a/codeagent-wrapper/backend.go b/codeagent-wrapper/backend.go index 59b8f5e..7a5092c 100644 --- a/codeagent-wrapper/backend.go +++ b/codeagent-wrapper/backend.go @@ -29,14 +29,29 @@ func (ClaudeBackend) BuildArgs(cfg *Config, targetArg string) []string { if cfg == nil { return nil } - // claude -p --dangerously-skip-permissions --output-format stream-json --verbose - args := []string{ - "-p", - "--dangerously-skip-permissions", - "--output-format", "stream-json", - "--verbose", + args := []string{"-p"} + + // Default to skip permissions for Claude backend + if !cfg.SkipPermissions { + args = append(args, "--dangerously-skip-permissions") } - return append(args, targetArg) + + workdir := cfg.WorkDir + if workdir == "" { + workdir = defaultWorkdir + } + + if cfg.Mode == "resume" { + if cfg.SessionID != "" { + args = append(args, "--session-id", cfg.SessionID) + } + } else { + args = append(args, "-C", workdir) + } + + args = append(args, "--output-format", "stream-json", "--verbose", targetArg) + + return args } type GeminiBackend struct{} @@ -49,6 +64,22 @@ func (GeminiBackend) BuildArgs(cfg *Config, targetArg string) []string { if cfg == nil { return nil } - // gemini -o stream-json -y -p - return []string{"-o", "stream-json", "-y", "-p", targetArg} + args := []string{"-o", "stream-json", "-y"} + + workdir := cfg.WorkDir + if workdir == "" { + workdir = defaultWorkdir + } + + if cfg.Mode == "resume" { + if cfg.SessionID != "" { + args = append(args, "--session-id", cfg.SessionID) + } + } else { + args = append(args, "-C", workdir) + } + + args = append(args, "-p", targetArg) + + return args } diff --git a/codeagent-wrapper/backend_test.go b/codeagent-wrapper/backend_test.go new file mode 100644 index 0000000..abb550b --- /dev/null +++ b/codeagent-wrapper/backend_test.go @@ -0,0 +1,122 @@ +package main + +import ( + "reflect" + "testing" +) + +func TestClaudeBuildArgs_ModesAndPermissions(t *testing.T) { + backend := ClaudeBackend{} + + t.Run("new mode uses workdir without skip by default", func(t *testing.T) { + cfg := &Config{Mode: "new", WorkDir: "/repo"} + got := backend.BuildArgs(cfg, "todo") + want := []string{"-p", "-C", "/repo", "--output-format", "stream-json", "--verbose", "todo"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %v, want %v", got, want) + } + }) + + t.Run("new mode opt-in skip permissions with default workdir", func(t *testing.T) { + cfg := &Config{Mode: "new", SkipPermissions: true} + got := backend.BuildArgs(cfg, "-") + want := []string{"-p", "--dangerously-skip-permissions", "-C", defaultWorkdir, "--output-format", "stream-json", "--verbose", "-"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %v, want %v", got, want) + } + }) + + t.Run("resume mode uses session id and omits workdir", func(t *testing.T) { + cfg := &Config{Mode: "resume", SessionID: "sid-123", WorkDir: "/ignored"} + got := backend.BuildArgs(cfg, "resume-task") + want := []string{"-p", "--session-id", "sid-123", "--output-format", "stream-json", "--verbose", "resume-task"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %v, want %v", got, want) + } + }) + + t.Run("resume mode without session still returns base flags", func(t *testing.T) { + cfg := &Config{Mode: "resume", WorkDir: "/ignored"} + got := backend.BuildArgs(cfg, "follow-up") + want := []string{"-p", "--output-format", "stream-json", "--verbose", "follow-up"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %v, want %v", got, want) + } + }) + + t.Run("nil config returns nil", func(t *testing.T) { + if backend.BuildArgs(nil, "ignored") != nil { + t.Fatalf("nil config should return nil args") + } + }) +} + +func TestClaudeBuildArgs_GeminiAndCodexModes(t *testing.T) { + t.Run("gemini new mode defaults workdir", func(t *testing.T) { + backend := GeminiBackend{} + cfg := &Config{Mode: "new", WorkDir: "/workspace"} + got := backend.BuildArgs(cfg, "task") + want := []string{"-o", "stream-json", "-y", "-C", "/workspace", "-p", "task"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %v, want %v", got, want) + } + }) + + t.Run("gemini resume mode uses session id", func(t *testing.T) { + backend := GeminiBackend{} + cfg := &Config{Mode: "resume", SessionID: "sid-999"} + got := backend.BuildArgs(cfg, "resume") + want := []string{"-o", "stream-json", "-y", "--session-id", "sid-999", "-p", "resume"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %v, want %v", got, want) + } + }) + + t.Run("gemini resume mode without session omits identifier", func(t *testing.T) { + backend := GeminiBackend{} + cfg := &Config{Mode: "resume"} + got := backend.BuildArgs(cfg, "resume") + want := []string{"-o", "stream-json", "-y", "-p", "resume"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %v, want %v", got, want) + } + }) + + t.Run("gemini nil config returns nil", func(t *testing.T) { + backend := GeminiBackend{} + if backend.BuildArgs(nil, "ignored") != nil { + t.Fatalf("nil config should return nil args") + } + }) + + t.Run("codex build args passthrough remains intact", func(t *testing.T) { + backend := CodexBackend{} + cfg := &Config{Mode: "new", WorkDir: "/tmp"} + got := backend.BuildArgs(cfg, "task") + want := []string{"e", "--skip-git-repo-check", "-C", "/tmp", "--json", "task"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %v, want %v", got, want) + } + }) +} + +func TestClaudeBuildArgs_BackendMetadata(t *testing.T) { + tests := []struct { + backend Backend + name string + command string + }{ + {backend: CodexBackend{}, name: "codex", command: "codex"}, + {backend: ClaudeBackend{}, name: "claude", command: "claude"}, + {backend: GeminiBackend{}, name: "gemini", command: "gemini"}, + } + + for _, tt := range tests { + if got := tt.backend.Name(); got != tt.name { + t.Fatalf("Name() = %s, want %s", got, tt.name) + } + if got := tt.backend.Command(); got != tt.command { + t.Fatalf("Command() = %s, want %s", got, tt.command) + } + } +} diff --git a/codeagent-wrapper/concurrent_stress_test.go b/codeagent-wrapper/concurrent_stress_test.go index ac31137..10fcc1e 100644 --- a/codeagent-wrapper/concurrent_stress_test.go +++ b/codeagent-wrapper/concurrent_stress_test.go @@ -2,11 +2,13 @@ package main import ( "bufio" + "context" "fmt" "os" "regexp" "strings" "sync" + "sync/atomic" "testing" "time" ) @@ -319,3 +321,106 @@ func TestLoggerOrderPreservation(t *testing.T) { t.Logf("Order preservation test: all %d goroutines maintained sequence order", len(sequences)) } + +func TestConcurrentWorkerPoolLimit(t *testing.T) { + orig := runCodexTaskFn + defer func() { runCodexTaskFn = orig }() + + logger, err := NewLoggerWithSuffix("pool-limit") + if err != nil { + t.Fatal(err) + } + setLogger(logger) + t.Cleanup(func() { + _ = closeLogger() + _ = logger.RemoveLogFile() + }) + + var active int64 + var maxSeen int64 + runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult { + if task.Context == nil { + t.Fatalf("context not propagated for task %s", task.ID) + } + cur := atomic.AddInt64(&active, 1) + for { + prev := atomic.LoadInt64(&maxSeen) + if cur <= prev || atomic.CompareAndSwapInt64(&maxSeen, prev, cur) { + break + } + } + select { + case <-task.Context.Done(): + atomic.AddInt64(&active, -1) + return TaskResult{TaskID: task.ID, ExitCode: 130, Error: "context cancelled"} + case <-time.After(30 * time.Millisecond): + } + atomic.AddInt64(&active, -1) + return TaskResult{TaskID: task.ID} + } + + layers := [][]TaskSpec{{{ID: "t1"}, {ID: "t2"}, {ID: "t3"}, {ID: "t4"}, {ID: "t5"}}} + results := executeConcurrentWithContext(context.Background(), layers, 5, 2) + + if len(results) != 5 { + t.Fatalf("unexpected result count: got %d", len(results)) + } + if maxSeen > 2 { + t.Fatalf("worker pool exceeded limit: saw %d active workers", maxSeen) + } + + logger.Flush() + data, err := os.ReadFile(logger.Path()) + if err != nil { + t.Fatalf("failed to read log file: %v", err) + } + content := string(data) + if !strings.Contains(content, "worker_limit=2") { + t.Fatalf("concurrency planning log missing, content: %s", content) + } + if !strings.Contains(content, "parallel: start") { + t.Fatalf("concurrency start logs missing, content: %s", content) + } +} + +func TestConcurrentCancellationPropagation(t *testing.T) { + orig := runCodexTaskFn + defer func() { runCodexTaskFn = orig }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult { + if task.Context == nil { + t.Fatalf("context not propagated for task %s", task.ID) + } + select { + case <-task.Context.Done(): + return TaskResult{TaskID: task.ID, ExitCode: 130, Error: "context cancelled"} + case <-time.After(200 * time.Millisecond): + return TaskResult{TaskID: task.ID} + } + } + + layers := [][]TaskSpec{{{ID: "a"}, {ID: "b"}, {ID: "c"}}} + go func() { + time.Sleep(50 * time.Millisecond) + cancel() + }() + + results := executeConcurrentWithContext(ctx, layers, 1, 2) + if len(results) != 3 { + t.Fatalf("unexpected result count: got %d", len(results)) + } + + cancelled := 0 + for _, res := range results { + if res.ExitCode != 0 { + cancelled++ + } + } + + if cancelled == 0 { + t.Fatalf("expected cancellation to propagate, got results: %+v", results) + } +} diff --git a/codeagent-wrapper/config.go b/codeagent-wrapper/config.go index 078fcdb..054c9bd 100644 --- a/codeagent-wrapper/config.go +++ b/codeagent-wrapper/config.go @@ -2,36 +2,43 @@ package main import ( "bytes" + "context" "fmt" "os" + "strconv" "strings" ) // Config holds CLI configuration type Config struct { - Mode string // "new" or "resume" - Task string - SessionID string - WorkDir string - ExplicitStdin bool - Timeout int - Backend string + Mode string // "new" or "resume" + Task string + SessionID string + WorkDir string + ExplicitStdin bool + Timeout int + Backend string + SkipPermissions bool + MaxParallelWorkers int } // ParallelConfig defines the JSON schema for parallel execution type ParallelConfig struct { - Tasks []TaskSpec `json:"tasks"` + Tasks []TaskSpec `json:"tasks"` + GlobalBackend string `json:"backend,omitempty"` } // TaskSpec describes an individual task entry in the parallel config type TaskSpec struct { - ID string `json:"id"` - Task string `json:"task"` - WorkDir string `json:"workdir,omitempty"` - Dependencies []string `json:"dependencies,omitempty"` - SessionID string `json:"session_id,omitempty"` - Mode string `json:"-"` - UseStdin bool `json:"-"` + ID string `json:"id"` + Task string `json:"task"` + WorkDir string `json:"workdir,omitempty"` + Dependencies []string `json:"dependencies,omitempty"` + SessionID string `json:"session_id,omitempty"` + Backend string `json:"backend,omitempty"` + Mode string `json:"-"` + UseStdin bool `json:"-"` + Context context.Context `json:"-"` } // TaskResult captures the execution outcome of a task @@ -61,6 +68,32 @@ func selectBackend(name string) (Backend, error) { return nil, fmt.Errorf("unsupported backend %q", name) } +func envFlagEnabled(key string) bool { + val, ok := os.LookupEnv(key) + if !ok { + return false + } + val = strings.TrimSpace(strings.ToLower(val)) + switch val { + case "", "0", "false", "no", "off": + return false + default: + return true + } +} + +func parseBoolFlag(val string, defaultValue bool) bool { + val = strings.TrimSpace(strings.ToLower(val)) + switch val { + case "1", "true", "yes", "on": + return true + case "0", "false", "no", "off": + return false + default: + return defaultValue + } +} + func parseParallelConfig(data []byte) (*ParallelConfig, error) { trimmed := bytes.TrimSpace(data) if len(trimmed) == 0 { @@ -106,6 +139,8 @@ func parseParallelConfig(data []byte) (*ParallelConfig, error) { case "session_id": task.SessionID = value task.Mode = "resume" + case "backend": + task.Backend = value case "dependencies": for _, dep := range strings.Split(value, ",") { dep = strings.TrimSpace(dep) @@ -116,6 +151,10 @@ func parseParallelConfig(data []byte) (*ParallelConfig, error) { } } + if task.Mode == "" { + task.Mode = "new" + } + if task.ID == "" { return nil, fmt.Errorf("task missing id field") } @@ -145,6 +184,7 @@ func parseArgs() (*Config, error) { } backendName := defaultBackendName + skipPermissions := envFlagEnabled("CODEAGENT_SKIP_PERMISSIONS") filtered := make([]string, 0, len(args)) for i := 0; i < len(args); i++ { arg := args[i] @@ -163,6 +203,15 @@ func parseArgs() (*Config, error) { } backendName = value continue + case arg == "--skip-permissions", arg == "--dangerously-skip-permissions": + skipPermissions = true + continue + case strings.HasPrefix(arg, "--skip-permissions="): + skipPermissions = parseBoolFlag(strings.TrimPrefix(arg, "--skip-permissions="), skipPermissions) + continue + case strings.HasPrefix(arg, "--dangerously-skip-permissions="): + skipPermissions = parseBoolFlag(strings.TrimPrefix(arg, "--dangerously-skip-permissions="), skipPermissions) + continue } filtered = append(filtered, arg) } @@ -172,7 +221,8 @@ func parseArgs() (*Config, error) { } args = filtered - cfg := &Config{WorkDir: defaultWorkdir, Backend: backendName} + cfg := &Config{WorkDir: defaultWorkdir, Backend: backendName, SkipPermissions: skipPermissions} + cfg.MaxParallelWorkers = resolveMaxParallelWorkers() if args[0] == "resume" { if len(args) < 3 { @@ -196,3 +246,18 @@ func parseArgs() (*Config, error) { return cfg, nil } + +func resolveMaxParallelWorkers() int { + raw := strings.TrimSpace(os.Getenv("CODEAGENT_MAX_PARALLEL_WORKERS")) + if raw == "" { + return 0 + } + + value, err := strconv.Atoi(raw) + if err != nil || value < 0 { + logWarn(fmt.Sprintf("Invalid CODEAGENT_MAX_PARALLEL_WORKERS=%q, falling back to unlimited", raw)) + return 0 + } + + return value +} diff --git a/codeagent-wrapper/executor.go b/codeagent-wrapper/executor.go index 0e70afb..9c58d2f 100644 --- a/codeagent-wrapper/executor.go +++ b/codeagent-wrapper/executor.go @@ -126,7 +126,22 @@ var runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult { task.UseStdin = true } - return runCodexTask(task, true, timeout) + backendName := task.Backend + if backendName == "" { + backendName = defaultBackendName + } + + backend, err := selectBackendFn(backendName) + if err != nil { + return TaskResult{TaskID: task.ID, ExitCode: 1, Error: err.Error()} + } + task.Backend = backend.Name() + + parentCtx := task.Context + if parentCtx == nil { + parentCtx = context.Background() + } + return runCodexTaskWithContext(parentCtx, task, backend, nil, false, true, timeout) } func topologicalSort(tasks []TaskSpec) ([][]TaskSpec, error) { @@ -196,6 +211,11 @@ func topologicalSort(tasks []TaskSpec) ([][]TaskSpec, error) { } func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult { + maxWorkers := resolveMaxParallelWorkers() + return executeConcurrentWithContext(context.Background(), layers, timeout, maxWorkers) +} + +func executeConcurrentWithContext(parentCtx context.Context, layers [][]TaskSpec, timeout int, maxWorkers int) []TaskResult { totalTasks := 0 for _, layer := range layers { totalTasks += len(layer) @@ -226,6 +246,49 @@ func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult { startPrintMu.Unlock() } + ctx := parentCtx + if ctx == nil { + ctx = context.Background() + } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + workerLimit := maxWorkers + if workerLimit < 0 { + workerLimit = 0 + } + + var sem chan struct{} + if workerLimit > 0 { + sem = make(chan struct{}, workerLimit) + } + + logConcurrencyPlanning(workerLimit, totalTasks) + + acquireSlot := func() bool { + if sem == nil { + return true + } + select { + case sem <- struct{}{}: + return true + case <-ctx.Done(): + return false + } + } + + releaseSlot := func() { + if sem == nil { + return + } + select { + case <-sem: + default: + } + } + + var activeWorkers int64 + for _, layer := range layers { var wg sync.WaitGroup executed := 0 @@ -238,6 +301,13 @@ func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult { continue } + if ctx.Err() != nil { + res := cancelledTaskResult(task.ID, ctx) + results = append(results, res) + failed[task.ID] = res + continue + } + executed++ wg.Add(1) go func(ts TaskSpec) { @@ -247,6 +317,21 @@ func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult { resultsCh <- TaskResult{TaskID: ts.ID, ExitCode: 1, Error: fmt.Sprintf("panic: %v", r)} } }() + + if !acquireSlot() { + resultsCh <- cancelledTaskResult(ts.ID, ctx) + return + } + defer releaseSlot() + + current := atomic.AddInt64(&activeWorkers, 1) + logConcurrencyState("start", ts.ID, int(current), workerLimit) + defer func() { + after := atomic.AddInt64(&activeWorkers, -1) + logConcurrencyState("done", ts.ID, int(after), workerLimit) + }() + + ts.Context = ctx printTaskStart(ts.ID) resultsCh <- runCodexTaskFn(ts, timeout) }(task) @@ -266,6 +351,16 @@ func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult { return results } +func cancelledTaskResult(taskID string, ctx context.Context) TaskResult { + exitCode := 130 + msg := "execution cancelled" + if ctx != nil && errors.Is(ctx.Err(), context.DeadlineExceeded) { + exitCode = 124 + msg = "execution timeout" + } + return TaskResult{TaskID: taskID, ExitCode: exitCode, Error: msg} +} + func shouldSkipTask(task TaskSpec, failed map[string]TaskResult) (bool, string) { if len(task.Dependencies) == 0 { return false, "" @@ -346,15 +441,15 @@ func buildCodexArgs(cfg *Config, targetArg string) []string { } func runCodexTask(taskSpec TaskSpec, silent bool, timeoutSec int) TaskResult { - return runCodexTaskWithContext(context.Background(), taskSpec, nil, false, silent, timeoutSec) + return runCodexTaskWithContext(context.Background(), taskSpec, nil, nil, false, silent, timeoutSec) } func runCodexProcess(parentCtx context.Context, codexArgs []string, taskText string, useStdin bool, timeoutSec int) (message, threadID string, exitCode int) { - res := runCodexTaskWithContext(parentCtx, TaskSpec{Task: taskText, WorkDir: defaultWorkdir, Mode: "new", UseStdin: useStdin}, codexArgs, true, false, timeoutSec) + res := runCodexTaskWithContext(parentCtx, TaskSpec{Task: taskText, WorkDir: defaultWorkdir, Mode: "new", UseStdin: useStdin}, nil, codexArgs, true, false, timeoutSec) return res.Message, res.SessionID, res.ExitCode } -func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, customArgs []string, useCustomArgs bool, silent bool, timeoutSec int) TaskResult { +func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, backend Backend, customArgs []string, useCustomArgs bool, silent bool, timeoutSec int) TaskResult { result := TaskResult{TaskID: taskSpec.ID} setLogPath := func() { if result.LogPath != "" { @@ -372,6 +467,19 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo WorkDir: taskSpec.WorkDir, Backend: defaultBackendName, } + + commandName := codexCommand + argsBuilder := buildCodexArgsFn + if backend != nil { + commandName = backend.Command() + argsBuilder = backend.BuildArgs + cfg.Backend = backend.Name() + } else if taskSpec.Backend != "" { + cfg.Backend = taskSpec.Backend + } else if commandName != "" { + cfg.Backend = commandName + } + if cfg.Mode == "" { cfg.Mode = "new" } @@ -389,7 +497,7 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo if useCustomArgs { codexArgs = customArgs } else { - codexArgs = buildCodexArgsFn(cfg, targetArg) + codexArgs = argsBuilder(cfg, targetArg) } prefixMsg := func(msg string) string { @@ -467,7 +575,7 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo return fmt.Sprintf("%s; stderr: %s", msg, stderrBuf.String()) } - cmd := newCommandRunner(ctx, codexCommand, codexArgs...) + cmd := newCommandRunner(ctx, commandName, codexArgs...) stderrWriters := []io.Writer{stderrBuf} if stderrLogger != nil { @@ -507,23 +615,23 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo stdoutReader = io.TeeReader(stdout, stdoutLogger) } - logInfoFn(fmt.Sprintf("Starting %s with args: %s %s...", codexCommand, codexCommand, strings.Join(codexArgs[:min(5, len(codexArgs))], " "))) + logInfoFn(fmt.Sprintf("Starting %s with args: %s %s...", commandName, commandName, strings.Join(codexArgs[:min(5, len(codexArgs))], " "))) if err := cmd.Start(); err != nil { if strings.Contains(err.Error(), "executable file not found") { - msg := fmt.Sprintf("%s command not found in PATH", codexCommand) + msg := fmt.Sprintf("%s command not found in PATH", commandName) logErrorFn(msg) result.ExitCode = 127 result.Error = attachStderr(msg) return result } - logErrorFn("Failed to start " + codexCommand + ": " + err.Error()) + logErrorFn("Failed to start " + commandName + ": " + err.Error()) result.ExitCode = 1 - result.Error = attachStderr("failed to start " + codexCommand + ": " + err.Error()) + result.Error = attachStderr("failed to start " + commandName + ": " + err.Error()) return result } - logInfoFn(fmt.Sprintf("Starting %s with PID: %d", codexCommand, cmd.Process().Pid())) + logInfoFn(fmt.Sprintf("Starting %s with PID: %d", commandName, cmd.Process().Pid())) if logger := activeLogger(); logger != nil { logInfoFn(fmt.Sprintf("Log capturing to: %s", logger.Path())) } @@ -560,7 +668,7 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo case waitErr = <-waitCh: case <-ctx.Done(): ctxCancelled = true - logErrorFn(cancelReason(ctx)) + logErrorFn(cancelReason(commandName, ctx)) forceKillTimer = terminateCommandFn(cmd) waitErr = <-waitCh } @@ -592,7 +700,7 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo if ctxErr := ctx.Err(); ctxErr != nil { if errors.Is(ctxErr, context.DeadlineExceeded) { result.ExitCode = 124 - result.Error = attachStderr(fmt.Sprintf("%s execution timeout", codexCommand)) + result.Error = attachStderr(fmt.Sprintf("%s execution timeout", commandName)) return result } result.ExitCode = 130 @@ -603,23 +711,23 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo if waitErr != nil { if exitErr, ok := waitErr.(*exec.ExitError); ok { code := exitErr.ExitCode() - logErrorFn(fmt.Sprintf("%s exited with status %d", codexCommand, code)) + logErrorFn(fmt.Sprintf("%s exited with status %d", commandName, code)) result.ExitCode = code - result.Error = attachStderr(fmt.Sprintf("%s exited with status %d", codexCommand, code)) + result.Error = attachStderr(fmt.Sprintf("%s exited with status %d", commandName, code)) return result } - logErrorFn(codexCommand + " error: " + waitErr.Error()) + logErrorFn(commandName + " error: " + waitErr.Error()) result.ExitCode = 1 - result.Error = attachStderr(codexCommand + " error: " + waitErr.Error()) + result.Error = attachStderr(commandName + " error: " + waitErr.Error()) return result } message := parsed.message threadID := parsed.threadID if message == "" { - logErrorFn(fmt.Sprintf("%s completed without agent_message output", codexCommand)) + logErrorFn(fmt.Sprintf("%s completed without agent_message output", commandName)) result.ExitCode = 1 - result.Error = attachStderr(fmt.Sprintf("%s completed without agent_message output", codexCommand)) + result.Error = attachStderr(fmt.Sprintf("%s completed without agent_message output", commandName)) return result } @@ -671,16 +779,20 @@ func forwardSignals(ctx context.Context, cmd commandRunner, logErrorFn func(stri }() } -func cancelReason(ctx context.Context) string { +func cancelReason(commandName string, ctx context.Context) string { if ctx == nil { return "Context cancelled" } - if errors.Is(ctx.Err(), context.DeadlineExceeded) { - return fmt.Sprintf("%s execution timeout", codexCommand) + if commandName == "" { + commandName = codexCommand } - return "Execution cancelled, terminating codex process" + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + return fmt.Sprintf("%s execution timeout", commandName) + } + + return fmt.Sprintf("Execution cancelled, terminating %s process", commandName) } type stdoutReasonCloser interface { diff --git a/codeagent-wrapper/executor_concurrent_test.go b/codeagent-wrapper/executor_concurrent_test.go new file mode 100644 index 0000000..57aed24 --- /dev/null +++ b/codeagent-wrapper/executor_concurrent_test.go @@ -0,0 +1,576 @@ +package main + +import ( + "bytes" + "context" + "errors" + "io" + "os" + "os/exec" + "strings" + "sync" + "sync/atomic" + "syscall" + "testing" + "time" +) + +type execFakeProcess struct { + pid int + signals []os.Signal + killed atomic.Int32 + mu sync.Mutex +} + +func (p *execFakeProcess) Pid() int { return p.pid } +func (p *execFakeProcess) Kill() error { + p.killed.Add(1) + return nil +} +func (p *execFakeProcess) Signal(sig os.Signal) error { + p.mu.Lock() + p.signals = append(p.signals, sig) + p.mu.Unlock() + return nil +} + +type writeCloserStub struct { + bytes.Buffer + closed atomic.Bool +} + +func (w *writeCloserStub) Close() error { + w.closed.Store(true) + return nil +} + +type reasonReadCloser struct { + r io.Reader + closed []string + mu sync.Mutex + closedC chan struct{} +} + +func newReasonReadCloser(data string) *reasonReadCloser { + return &reasonReadCloser{r: strings.NewReader(data), closedC: make(chan struct{}, 1)} +} + +func (rc *reasonReadCloser) Read(p []byte) (int, error) { return rc.r.Read(p) } +func (rc *reasonReadCloser) Close() error { rc.record("close"); return nil } +func (rc *reasonReadCloser) CloseWithReason(reason string) error { + rc.record(reason) + return nil +} + +func (rc *reasonReadCloser) record(reason string) { + rc.mu.Lock() + rc.closed = append(rc.closed, reason) + rc.mu.Unlock() + select { + case rc.closedC <- struct{}{}: + default: + } +} + +type execFakeRunner struct { + stdout io.ReadCloser + process processHandle + stdin io.WriteCloser + waitErr error + waitDelay time.Duration + startErr error + stdoutErr error + stdinErr error + allowNilProcess bool + started atomic.Bool +} + +func (f *execFakeRunner) Start() error { + if f.startErr != nil { + return f.startErr + } + f.started.Store(true) + return nil +} +func (f *execFakeRunner) Wait() error { + if f.waitDelay > 0 { + time.Sleep(f.waitDelay) + } + return f.waitErr +} +func (f *execFakeRunner) StdoutPipe() (io.ReadCloser, error) { + if f.stdoutErr != nil { + return nil, f.stdoutErr + } + if f.stdout == nil { + f.stdout = io.NopCloser(strings.NewReader("")) + } + return f.stdout, nil +} +func (f *execFakeRunner) StdinPipe() (io.WriteCloser, error) { + if f.stdinErr != nil { + return nil, f.stdinErr + } + if f.stdin != nil { + return f.stdin, nil + } + return &writeCloserStub{}, nil +} +func (f *execFakeRunner) SetStderr(io.Writer) {} +func (f *execFakeRunner) Process() processHandle { + if f.process != nil { + return f.process + } + if f.allowNilProcess { + return nil + } + return &execFakeProcess{pid: 1} +} + +func TestExecutorHelperCoverage(t *testing.T) { + t.Run("realCmdAndProcess", func(t *testing.T) { + rc := &realCmd{} + if err := rc.Start(); err == nil { + t.Fatalf("expected error for nil command") + } + if err := rc.Wait(); err == nil { + t.Fatalf("expected error for nil command") + } + if _, err := rc.StdoutPipe(); err == nil { + t.Fatalf("expected error for nil command") + } + if _, err := rc.StdinPipe(); err == nil { + t.Fatalf("expected error for nil command") + } + rc.SetStderr(io.Discard) + if rc.Process() != nil { + t.Fatalf("expected nil process") + } + rcWithCmd := &realCmd{cmd: &exec.Cmd{}} + rcWithCmd.SetStderr(io.Discard) + echoCmd := exec.Command("echo", "ok") + rcProc := &realCmd{cmd: echoCmd} + stdoutPipe, err := rcProc.StdoutPipe() + if err != nil { + t.Fatalf("StdoutPipe error: %v", err) + } + stdinPipe, err := rcProc.StdinPipe() + if err != nil { + t.Fatalf("StdinPipe error: %v", err) + } + rcProc.SetStderr(io.Discard) + if err := rcProc.Start(); err != nil { + t.Fatalf("Start failed: %v", err) + } + _, _ = stdinPipe.Write([]byte{}) + _ = stdinPipe.Close() + procHandle := rcProc.Process() + if procHandle == nil { + t.Fatalf("expected process handle") + } + _ = procHandle.Signal(syscall.SIGTERM) + _ = procHandle.Kill() + _ = rcProc.Wait() + _, _ = io.ReadAll(stdoutPipe) + + rp := &realProcess{} + if rp.Pid() != 0 { + t.Fatalf("nil process should have pid 0") + } + if rp.Kill() != nil { + t.Fatalf("nil process Kill should be nil") + } + if rp.Signal(syscall.SIGTERM) != nil { + t.Fatalf("nil process Signal should be nil") + } + rpLive := &realProcess{proc: &os.Process{Pid: 99}} + if rpLive.Pid() != 99 { + t.Fatalf("expected pid 99, got %d", rpLive.Pid()) + } + _ = rpLive.Kill() + _ = rpLive.Signal(syscall.SIGTERM) + }) + + t.Run("topologicalSortAndSkip", func(t *testing.T) { + layers, err := topologicalSort([]TaskSpec{{ID: "root"}, {ID: "child", Dependencies: []string{"root"}}}) + if err != nil || len(layers) != 2 { + t.Fatalf("unexpected topological sort result: layers=%d err=%v", len(layers), err) + } + if _, err := topologicalSort([]TaskSpec{{ID: "cycle", Dependencies: []string{"cycle"}}}); err == nil { + t.Fatalf("expected cycle detection error") + } + + failed := map[string]TaskResult{"root": {ExitCode: 1}} + if skip, _ := shouldSkipTask(TaskSpec{ID: "child", Dependencies: []string{"root"}}, failed); !skip { + t.Fatalf("should skip when dependency failed") + } + if skip, _ := shouldSkipTask(TaskSpec{ID: "leaf"}, failed); skip { + t.Fatalf("should not skip task without dependencies") + } + if skip, _ := shouldSkipTask(TaskSpec{ID: "child-ok", Dependencies: []string{"root"}}, map[string]TaskResult{}); skip { + t.Fatalf("should not skip when dependencies succeeded") + } + }) + + t.Run("cancelledTaskResult", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + res := cancelledTaskResult("t1", ctx) + if res.ExitCode != 130 { + t.Fatalf("expected cancel exit code, got %d", res.ExitCode) + } + + timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 0) + defer timeoutCancel() + res = cancelledTaskResult("t2", timeoutCtx) + if res.ExitCode != 124 { + t.Fatalf("expected timeout exit code, got %d", res.ExitCode) + } + }) + + t.Run("generateFinalOutputAndArgs", func(t *testing.T) { + out := generateFinalOutput([]TaskResult{ + {TaskID: "ok", ExitCode: 0}, + {TaskID: "fail", ExitCode: 1, Error: "boom"}, + }) + if !strings.Contains(out, "ok") || !strings.Contains(out, "fail") { + t.Fatalf("unexpected summary output: %s", out) + } + out = generateFinalOutput([]TaskResult{{TaskID: "rich", ExitCode: 0, SessionID: "sess", LogPath: "/tmp/log", Message: "hello"}}) + if !strings.Contains(out, "Session: sess") || !strings.Contains(out, "Log: /tmp/log") || !strings.Contains(out, "hello") { + t.Fatalf("rich output missing fields: %s", out) + } + + args := buildCodexArgs(&Config{Mode: "new", WorkDir: "/tmp"}, "task") + if len(args) == 0 || args[3] != "/tmp" { + t.Fatalf("unexpected codex args: %+v", args) + } + args = buildCodexArgs(&Config{Mode: "resume", SessionID: "sess"}, "target") + if args[3] != "resume" || args[4] != "sess" { + t.Fatalf("unexpected resume args: %+v", args) + } + }) + + t.Run("executeConcurrentWrapper", func(t *testing.T) { + orig := runCodexTaskFn + defer func() { runCodexTaskFn = orig }() + runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult { + return TaskResult{TaskID: task.ID, ExitCode: 0, Message: "done"} + } + os.Setenv("CODEAGENT_MAX_PARALLEL_WORKERS", "1") + defer os.Unsetenv("CODEAGENT_MAX_PARALLEL_WORKERS") + + results := executeConcurrent([][]TaskSpec{{{ID: "wrap"}}}, 1) + if len(results) != 1 || results[0].TaskID != "wrap" { + t.Fatalf("unexpected wrapper results: %+v", results) + } + + unbounded := executeConcurrentWithContext(context.Background(), [][]TaskSpec{{{ID: "unbounded"}}}, 1, 0) + if len(unbounded) != 1 || unbounded[0].ExitCode != 0 { + t.Fatalf("unexpected unbounded result: %+v", unbounded) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + cancelled := executeConcurrentWithContext(ctx, [][]TaskSpec{{{ID: "cancel"}}}, 1, 1) + if cancelled[0].ExitCode == 0 { + t.Fatalf("expected cancelled result, got %+v", cancelled[0]) + } + }) +} + +func TestExecutorRunCodexTaskWithContext(t *testing.T) { + origRunner := newCommandRunner + defer func() { newCommandRunner = origRunner }() + + t.Run("success", func(t *testing.T) { + var firstStdout *reasonReadCloser + newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { + rc := newReasonReadCloser(`{"type":"item.completed","item":{"type":"agent_message","text":"hello"}}`) + if firstStdout == nil { + firstStdout = rc + } + return &execFakeRunner{stdout: rc, process: &execFakeProcess{pid: 1234}} + } + + res := runCodexTaskWithContext(context.Background(), TaskSpec{ID: "task-1", Task: "payload", WorkDir: "."}, nil, nil, false, false, 1) + if res.Error != "" || res.Message != "hello" || res.ExitCode != 0 { + t.Fatalf("unexpected result: %+v", res) + } + + select { + case <-firstStdout.closedC: + case <-time.After(1 * time.Second): + t.Fatalf("stdout not closed with reason") + } + + orig := runCodexTaskFn + runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult { + return TaskResult{TaskID: task.ID, ExitCode: 0, Message: "ok"} + } + t.Cleanup(func() { runCodexTaskFn = orig }) + + if res := runCodexTask(TaskSpec{Task: "task-text", WorkDir: "."}, true, 1); res.ExitCode != 0 { + t.Fatalf("runCodexTask failed: %+v", res) + } + + msg, threadID, code := runCodexProcess(context.Background(), []string{"arg"}, "content", false, 1) + if code != 0 || msg == "" { + t.Fatalf("runCodexProcess unexpected result: msg=%q code=%d threadID=%s", msg, code, threadID) + } + }) + + t.Run("startErrors", func(t *testing.T) { + newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { + return &execFakeRunner{startErr: errors.New("executable file not found"), process: &execFakeProcess{pid: 1}} + } + res := runCodexTaskWithContext(context.Background(), TaskSpec{Task: "payload", WorkDir: "."}, nil, nil, false, false, 1) + if res.ExitCode != 127 { + t.Fatalf("expected missing executable exit code, got %d", res.ExitCode) + } + + newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { + return &execFakeRunner{startErr: errors.New("start failed"), process: &execFakeProcess{pid: 2}} + } + res = runCodexTaskWithContext(context.Background(), TaskSpec{Task: "payload", WorkDir: "."}, nil, nil, false, false, 1) + if res.ExitCode == 0 { + t.Fatalf("expected non-zero exit on start failure") + } + }) + + t.Run("timeoutAndPipes", func(t *testing.T) { + newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { + return &execFakeRunner{ + stdout: newReasonReadCloser(`{"type":"item.completed","item":{"type":"agent_message","text":"slow"}}`), + process: &execFakeProcess{pid: 5}, + waitDelay: 20 * time.Millisecond, + } + } + res := runCodexTaskWithContext(context.Background(), TaskSpec{Task: "payload", WorkDir: ".", UseStdin: true}, nil, nil, false, false, 0) + if res.ExitCode == 0 { + t.Fatalf("expected timeout result, got %+v", res) + } + }) + + t.Run("pipeErrors", func(t *testing.T) { + newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { + return &execFakeRunner{stdoutErr: errors.New("stdout fail"), process: &execFakeProcess{pid: 6}} + } + res := runCodexTaskWithContext(context.Background(), TaskSpec{Task: "payload", WorkDir: "."}, nil, nil, false, false, 1) + if res.ExitCode == 0 { + t.Fatalf("expected failure on stdout pipe error") + } + + newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { + return &execFakeRunner{stdinErr: errors.New("stdin fail"), process: &execFakeProcess{pid: 7}} + } + res = runCodexTaskWithContext(context.Background(), TaskSpec{Task: "payload", WorkDir: ".", UseStdin: true}, nil, nil, false, false, 1) + if res.ExitCode == 0 { + t.Fatalf("expected failure on stdin pipe error") + } + }) + + t.Run("waitExitError", func(t *testing.T) { + err := exec.Command("false").Run() + exitErr, _ := err.(*exec.ExitError) + if exitErr == nil { + t.Fatalf("expected exec.ExitError") + } + newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { + return &execFakeRunner{ + stdout: newReasonReadCloser(`{"type":"item.completed","item":{"type":"agent_message","text":"ignored"}}`), + process: &execFakeProcess{pid: 8}, + waitErr: exitErr, + } + } + res := runCodexTaskWithContext(context.Background(), TaskSpec{Task: "payload", WorkDir: "."}, nil, nil, false, false, 1) + if res.ExitCode == 0 { + t.Fatalf("expected non-zero exit on wait error") + } + }) + + t.Run("contextCancelled", func(t *testing.T) { + newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { + return &execFakeRunner{ + stdout: newReasonReadCloser(`{"type":"item.completed","item":{"type":"agent_message","text":"cancel"}}`), + process: &execFakeProcess{pid: 9}, + waitDelay: 10 * time.Millisecond, + } + } + ctx, cancel := context.WithCancel(context.Background()) + cancel() + res := runCodexTaskWithContext(ctx, TaskSpec{Task: "payload", WorkDir: "."}, nil, nil, false, false, 1) + if res.ExitCode == 0 { + t.Fatalf("expected cancellation result") + } + }) + + t.Run("silentLogger", func(t *testing.T) { + newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { + return &execFakeRunner{ + stdout: newReasonReadCloser(`{"type":"item.completed","item":{"type":"agent_message","text":"quiet"}}`), + process: &execFakeProcess{pid: 10}, + } + } + _ = closeLogger() + res := runCodexTaskWithContext(context.Background(), TaskSpec{Task: "payload", WorkDir: "."}, nil, nil, false, true, 1) + if res.ExitCode != 0 || res.LogPath == "" { + t.Fatalf("expected success with temp logger, got %+v", res) + } + _ = closeLogger() + }) + + t.Run("missingMessage", func(t *testing.T) { + newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { + return &execFakeRunner{ + stdout: newReasonReadCloser(`{"type":"item.completed","item":{"type":"task","text":"noop"}}`), + process: &execFakeProcess{pid: 11}, + } + } + res := runCodexTaskWithContext(context.Background(), TaskSpec{Task: "payload", WorkDir: "."}, nil, nil, false, false, 1) + if res.ExitCode == 0 { + t.Fatalf("expected failure when no agent_message returned") + } + }) +} + +func TestExecutorSignalAndTermination(t *testing.T) { + forceKillDelay.Store(0) + defer forceKillDelay.Store(5) + + proc := &execFakeProcess{pid: 42} + cmd := &execFakeRunner{process: proc} + + origNotify := signalNotifyFn + origStop := signalStopFn + defer func() { + signalNotifyFn = origNotify + signalStopFn = origStop + }() + + signalNotifyFn = func(c chan<- os.Signal, sigs ...os.Signal) { + go func() { c <- syscall.SIGINT }() + } + signalStopFn = func(c chan<- os.Signal) {} + + forwardSignals(context.Background(), cmd, func(string) {}) + time.Sleep(20 * time.Millisecond) + + proc.mu.Lock() + signalled := len(proc.signals) + proc.mu.Unlock() + if signalled == 0 { + t.Fatalf("process did not receive signal") + } + if proc.killed.Load() == 0 { + t.Fatalf("process was not killed after signal") + } + + timer := terminateProcess(cmd) + if timer == nil { + t.Fatalf("terminateProcess returned nil timer") + } + timer.Stop() + + ft := terminateCommand(cmd) + if ft == nil { + t.Fatalf("terminateCommand returned nil") + } + ft.Stop() + + cmdKill := &execFakeRunner{process: &execFakeProcess{pid: 50}} + ftKill := terminateCommand(cmdKill) + time.Sleep(10 * time.Millisecond) + if p, ok := cmdKill.process.(*execFakeProcess); ok && p.killed.Load() == 0 { + t.Fatalf("terminateCommand did not kill process") + } + ftKill.Stop() + + cmdKill2 := &execFakeRunner{process: &execFakeProcess{pid: 51}} + timer2 := terminateProcess(cmdKill2) + time.Sleep(10 * time.Millisecond) + if p, ok := cmdKill2.process.(*execFakeProcess); ok && p.killed.Load() == 0 { + t.Fatalf("terminateProcess did not kill process") + } + timer2.Stop() + + if terminateCommand(nil) != nil { + t.Fatalf("terminateCommand should return nil for nil cmd") + } + if terminateCommand(&execFakeRunner{allowNilProcess: true}) != nil { + t.Fatalf("terminateCommand should return nil when process is nil") + } + if terminateProcess(nil) != nil { + t.Fatalf("terminateProcess should return nil for nil cmd") + } + if terminateProcess(&execFakeRunner{allowNilProcess: true}) != nil { + t.Fatalf("terminateProcess should return nil when process is nil") + } + + signalNotifyFn = func(c chan<- os.Signal, sigs ...os.Signal) {} + ctxDone, cancelDone := context.WithCancel(context.Background()) + cancelDone() + forwardSignals(ctxDone, &execFakeRunner{process: &execFakeProcess{pid: 70}}, func(string) {}) +} + +func TestExecutorCancelReasonAndCloseWithReason(t *testing.T) { + if reason := cancelReason("", nil); !strings.Contains(reason, "Context") { + t.Fatalf("unexpected cancelReason for nil ctx: %s", reason) + } + ctx, cancel := context.WithTimeout(context.Background(), 0) + defer cancel() + if !strings.Contains(cancelReason("cmd", ctx), "timeout") { + t.Fatalf("expected timeout reason") + } + cancelCtx, cancelFn := context.WithCancel(context.Background()) + cancelFn() + if !strings.Contains(cancelReason("cmd", cancelCtx), "Execution cancelled") { + t.Fatalf("expected cancellation reason") + } + if !strings.Contains(cancelReason("", cancelCtx), "codex") { + t.Fatalf("expected default command name in cancel reason") + } + + rc := &reasonReadCloser{r: strings.NewReader("data"), closedC: make(chan struct{}, 1)} + closeWithReason(rc, "why") + select { + case <-rc.closedC: + default: + t.Fatalf("CloseWithReason was not called") + } + + plain := io.NopCloser(strings.NewReader("x")) + closeWithReason(plain, "noop") + closeWithReason(nil, "noop") +} + +func TestExecutorForceKillTimerStop(t *testing.T) { + done := make(chan struct{}, 1) + ft := &forceKillTimer{timer: time.AfterFunc(50*time.Millisecond, func() { done <- struct{}{} }), done: done} + ft.Stop() + + done2 := make(chan struct{}, 1) + ft2 := &forceKillTimer{timer: time.AfterFunc(0, func() { done2 <- struct{}{} }), done: done2} + time.Sleep(10 * time.Millisecond) + ft2.Stop() + + var nilTimer *forceKillTimer + nilTimer.Stop() + (&forceKillTimer{}).Stop() +} + +func TestExecutorForwardSignalsDefaults(t *testing.T) { + origNotify := signalNotifyFn + origStop := signalStopFn + signalNotifyFn = nil + signalStopFn = nil + defer func() { + signalNotifyFn = origNotify + signalStopFn = origStop + }() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + forwardSignals(ctx, &execFakeRunner{process: &execFakeProcess{pid: 80}}, func(string) {}) + time.Sleep(10 * time.Millisecond) +} diff --git a/codeagent-wrapper/logger.go b/codeagent-wrapper/logger.go index 9c638bf..a4da52e 100644 --- a/codeagent-wrapper/logger.go +++ b/codeagent-wrapper/logger.go @@ -478,3 +478,26 @@ func parsePIDFromLog(path string) (int, bool) { return 0, false } + +func logConcurrencyPlanning(limit, total int) { + logger := activeLogger() + if logger == nil { + return + } + logger.Info(fmt.Sprintf("parallel: worker_limit=%s total_tasks=%d", renderWorkerLimit(limit), total)) +} + +func logConcurrencyState(event, taskID string, active, limit int) { + logger := activeLogger() + if logger == nil { + return + } + logger.Debug(fmt.Sprintf("parallel: %s task=%s active=%d limit=%s", event, taskID, active, renderWorkerLimit(limit))) +} + +func renderWorkerLimit(limit int) string { + if limit <= 0 { + return "unbounded" + } + return strconv.Itoa(limit) +} diff --git a/codeagent-wrapper/logger_test.go b/codeagent-wrapper/logger_test.go index 74a5ea2..ebe139c 100644 --- a/codeagent-wrapper/logger_test.go +++ b/codeagent-wrapper/logger_test.go @@ -477,13 +477,13 @@ func TestRunCleanupOldLogsPerformanceBound(t *testing.T) { } func TestRunCleanupOldLogsCoverageSuite(t *testing.T) { - TestRunParseJSONStream_CoverageSuite(t) + TestBackendParseJSONStream_CoverageSuite(t) } // Reuse the existing coverage suite so the focused TestLogger run still exercises // the rest of the codebase and keeps coverage high. func TestRunLoggerCoverageSuite(t *testing.T) { - TestRunParseJSONStream_CoverageSuite(t) + TestBackendParseJSONStream_CoverageSuite(t) } func TestRunCleanupOldLogsKeepsCurrentProcessLog(t *testing.T) { diff --git a/codeagent-wrapper/main.go b/codeagent-wrapper/main.go index 1f26c3b..e81f333 100644 --- a/codeagent-wrapper/main.go +++ b/codeagent-wrapper/main.go @@ -14,7 +14,7 @@ import ( ) const ( - version = "5.0.0" + version = "5.2.0" defaultWorkdir = "." defaultTimeout = 7200 // seconds codexLogLineLimit = 1000 @@ -47,6 +47,8 @@ var ( signalStopFn = signal.Stop terminateCommandFn = terminateCommand defaultBuildArgsFn = buildCodexArgs + runTaskFn = runCodexTask + exitFn = os.Exit ) var forceKillDelay atomic.Int32 @@ -103,7 +105,7 @@ func runCleanupMode() int { func main() { exitCode := run() - os.Exit(exitCode) + exitFn(exitCode) } // run is the main logic, returns exit code for testability @@ -153,16 +155,59 @@ func run() (exitCode int) { // Handle remaining commands if len(os.Args) > 1 { - switch os.Args[1] { - case "--parallel": - if len(os.Args) > 2 { - fmt.Fprintln(os.Stderr, "ERROR: --parallel reads its task configuration from stdin and does not accept additional arguments.") + args := os.Args[1:] + parallelIndex := -1 + for i, arg := range args { + if arg == "--parallel" { + parallelIndex = i + break + } + } + + if parallelIndex != -1 { + backendName := defaultBackendName + var extras []string + + for i := 0; i < len(args); i++ { + arg := args[i] + switch { + case arg == "--parallel": + continue + case arg == "--backend": + if i+1 >= len(args) { + fmt.Fprintln(os.Stderr, "ERROR: --backend flag requires a value") + return 1 + } + backendName = args[i+1] + i++ + case strings.HasPrefix(arg, "--backend="): + value := strings.TrimPrefix(arg, "--backend=") + if value == "" { + fmt.Fprintln(os.Stderr, "ERROR: --backend flag requires a value") + return 1 + } + backendName = value + default: + extras = append(extras, arg) + } + } + + if len(extras) > 0 { + fmt.Fprintln(os.Stderr, "ERROR: --parallel reads its task configuration from stdin; only --backend is allowed.") fmt.Fprintln(os.Stderr, "Usage examples:") fmt.Fprintf(os.Stderr, " %s --parallel < tasks.txt\n", name) fmt.Fprintf(os.Stderr, " echo '...' | %s --parallel\n", name) fmt.Fprintf(os.Stderr, " %s --parallel <<'EOF'\n", name) return 1 } + + backend, err := selectBackendFn(backendName) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR: %v\n", err) + return 1 + } + backendName = backend.Name() + data, err := io.ReadAll(stdinReader) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: failed to read stdin: %v\n", err) @@ -175,6 +220,13 @@ func run() (exitCode int) { return 1 } + cfg.GlobalBackend = backendName + for i := range cfg.Tasks { + if strings.TrimSpace(cfg.Tasks[i].Backend) == "" { + cfg.Tasks[i].Backend = backendName + } + } + timeoutSec := resolveTimeout() layers, err := topologicalSort(cfg.Tasks) if err != nil { @@ -318,7 +370,7 @@ func run() (exitCode int) { UseStdin: useStdin, } - result := runCodexTask(taskSpec, false, cfg.Timeout) + result := runTaskFn(taskSpec, false, cfg.Timeout) if result.ExitCode != 0 { return result.ExitCode diff --git a/codeagent-wrapper/main_test.go b/codeagent-wrapper/main_test.go index f72f00c..e7384c5 100644 --- a/codeagent-wrapper/main_test.go +++ b/codeagent-wrapper/main_test.go @@ -39,6 +39,9 @@ func resetTestHooks() { jsonMarshal = json.Marshal forceKillDelay.Store(5) closeLogger() + executablePathFn = os.Executable + runTaskFn = runCodexTask + exitFn = os.Exit } type capturedStdout struct { @@ -835,7 +838,7 @@ func TestRunCodexTask_ContextTimeout(t *testing.T) { } defer func() { terminateCommandFn = terminateCommand }() - result := runCodexTaskWithContext(ctx, TaskSpec{Task: "ctx-timeout", WorkDir: defaultWorkdir}, nil, false, false, 60) + result := runCodexTaskWithContext(ctx, TaskSpec{Task: "ctx-timeout", WorkDir: defaultWorkdir}, nil, nil, false, false, 60) if result.ExitCode != 124 { t.Fatalf("exit code = %d, want 124 (%s)", result.ExitCode, result.Error) @@ -869,7 +872,7 @@ func TestRunCodexTask_ContextTimeout(t *testing.T) { } } -func TestRunParseArgs_NewMode(t *testing.T) { +func TestBackendParseArgs_NewMode(t *testing.T) { tests := []struct { name string args []string @@ -931,7 +934,7 @@ func TestRunParseArgs_NewMode(t *testing.T) { } } -func TestRunParseArgs_ResumeMode(t *testing.T) { +func TestBackendParseArgs_ResumeMode(t *testing.T) { tests := []struct { name string args []string @@ -980,7 +983,7 @@ func TestRunParseArgs_ResumeMode(t *testing.T) { } } -func TestRunParseArgs_BackendFlag(t *testing.T) { +func TestBackendParseArgs_BackendFlag(t *testing.T) { tests := []struct { name string args []string @@ -1034,7 +1037,100 @@ func TestRunParseArgs_BackendFlag(t *testing.T) { } } -func TestRunParseParallelConfig_Success(t *testing.T) { +func TestBackendParseArgs_SkipPermissions(t *testing.T) { + const envKey = "CODEAGENT_SKIP_PERMISSIONS" + t.Cleanup(func() { os.Unsetenv(envKey) }) + + os.Setenv(envKey, "true") + os.Args = []string{"codeagent-wrapper", "task"} + cfg, err := parseArgs() + if err != nil { + t.Fatalf("parseArgs() unexpected error: %v", err) + } + if !cfg.SkipPermissions { + t.Fatalf("SkipPermissions should default to true when env is set") + } + + os.Args = []string{"codeagent-wrapper", "--skip-permissions=false", "task"} + cfg, err = parseArgs() + if err != nil { + t.Fatalf("parseArgs() unexpected error: %v", err) + } + if cfg.SkipPermissions { + t.Fatalf("SkipPermissions should be false when flag overrides env") + } + + os.Args = []string{"codeagent-wrapper", "--skip-permissions", "task"} + cfg, err = parseArgs() + if err != nil { + t.Fatalf("parseArgs() unexpected error: %v", err) + } + if !cfg.SkipPermissions { + t.Fatalf("SkipPermissions should be true for plain --skip-permissions flag") + } + + os.Args = []string{"codeagent-wrapper", "--dangerously-skip-permissions", "task"} + cfg, err = parseArgs() + if err != nil { + t.Fatalf("parseArgs() unexpected error: %v", err) + } + if !cfg.SkipPermissions { + t.Fatalf("SkipPermissions should be true for dangerous flag") + } + + os.Args = []string{"codeagent-wrapper", "--dangerously-skip-permissions=false", "task"} + cfg, err = parseArgs() + if err != nil { + t.Fatalf("parseArgs() unexpected error: %v", err) + } + if cfg.SkipPermissions { + t.Fatalf("SkipPermissions should be false when dangerous flag is set to false") + } +} + +func TestBackendParseBoolFlag(t *testing.T) { + tests := []struct { + name string + val string + def bool + want bool + }{ + {"true literal", "true", false, true}, + {"false literal", "false", true, false}, + {"default on unknown", "maybe", true, true}, + {"empty uses default", "", false, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := parseBoolFlag(tt.val, tt.def); got != tt.want { + t.Fatalf("parseBoolFlag(%q,%v) = %v, want %v", tt.val, tt.def, got, tt.want) + } + }) + } +} + +func TestBackendEnvFlagEnabled(t *testing.T) { + const key = "TEST_FLAG_ENABLED" + t.Cleanup(func() { os.Unsetenv(key) }) + + os.Unsetenv(key) + if envFlagEnabled(key) { + t.Fatalf("envFlagEnabled should be false when unset") + } + + os.Setenv(key, "true") + if !envFlagEnabled(key) { + t.Fatalf("envFlagEnabled should be true for 'true'") + } + + os.Setenv(key, "no") + if envFlagEnabled(key) { + t.Fatalf("envFlagEnabled should be false for 'no'") + } +} + +func TestParallelParseConfig_Success(t *testing.T) { input := `---TASK--- id: task-1 dependencies: task-0 @@ -1054,13 +1150,37 @@ do something` } } -func TestRunParseParallelConfig_InvalidFormat(t *testing.T) { +func TestParallelParseConfig_Backend(t *testing.T) { + input := `---TASK--- +id: task-1 +backend: gemini +session_id: sess-123 +---CONTENT--- +do something` + + cfg, err := parseParallelConfig([]byte(input)) + if err != nil { + t.Fatalf("parseParallelConfig() unexpected error: %v", err) + } + if len(cfg.Tasks) != 1 { + t.Fatalf("expected 1 task, got %d", len(cfg.Tasks)) + } + task := cfg.Tasks[0] + if task.Backend != "gemini" { + t.Fatalf("backend = %q, want gemini", task.Backend) + } + if task.Mode != "resume" || task.SessionID != "sess-123" { + t.Fatalf("expected resume mode with session, got mode=%q session=%q", task.Mode, task.SessionID) + } +} + +func TestParallelParseConfig_InvalidFormat(t *testing.T) { if _, err := parseParallelConfig([]byte("invalid format")); err == nil { t.Fatalf("expected error for invalid format, got nil") } } -func TestRunParseParallelConfig_EmptyTasks(t *testing.T) { +func TestParallelParseConfig_EmptyTasks(t *testing.T) { input := `---TASK--- id: empty ---CONTENT--- @@ -1070,7 +1190,7 @@ id: empty } } -func TestRunParseParallelConfig_MissingID(t *testing.T) { +func TestParallelParseConfig_MissingID(t *testing.T) { input := `---TASK--- ---CONTENT--- do something` @@ -1079,7 +1199,7 @@ do something` } } -func TestRunParseParallelConfig_MissingTask(t *testing.T) { +func TestParallelParseConfig_MissingTask(t *testing.T) { input := `---TASK--- id: task-1 ---CONTENT--- @@ -1089,7 +1209,7 @@ id: task-1 } } -func TestRunParseParallelConfig_DuplicateID(t *testing.T) { +func TestParallelParseConfig_DuplicateID(t *testing.T) { input := `---TASK--- id: dup ---CONTENT--- @@ -1103,7 +1223,7 @@ two` } } -func TestRunParseParallelConfig_DelimiterFormat(t *testing.T) { +func TestParallelParseConfig_DelimiterFormat(t *testing.T) { input := `---TASK--- id: T1 workdir: /tmp @@ -1181,7 +1301,7 @@ func TestRunBuildCodexArgs_ResumeMode(t *testing.T) { } } -func TestSelectBackend(t *testing.T) { +func TestBackendSelectBackend(t *testing.T) { tests := []struct { name string in string @@ -1216,13 +1336,13 @@ func TestSelectBackend(t *testing.T) { } } -func TestSelectBackend_Invalid(t *testing.T) { +func TestBackendSelectBackend_Invalid(t *testing.T) { if _, err := selectBackend("unknown"); err == nil { t.Fatalf("expected error for invalid backend") } } -func TestSelectBackend_DefaultOnEmpty(t *testing.T) { +func TestBackendSelectBackend_DefaultOnEmpty(t *testing.T) { backend, err := selectBackend("") if err != nil { t.Fatalf("selectBackend(\"\") error = %v", err) @@ -1251,7 +1371,7 @@ func TestBackendBuildArgs_ClaudeBackend(t *testing.T) { backend := ClaudeBackend{} cfg := &Config{Mode: "new", WorkDir: defaultWorkdir} got := backend.BuildArgs(cfg, "todo") - want := []string{"-p", "--dangerously-skip-permissions", "--output-format", "stream-json", "--verbose", "todo"} + want := []string{"-p", "-C", defaultWorkdir, "--output-format", "stream-json", "--verbose", "todo"} if len(got) != len(want) { t.Fatalf("length mismatch") } @@ -1272,7 +1392,7 @@ func TestClaudeBackendBuildArgs_OutputValidation(t *testing.T) { target := "ensure-flags" args := backend.BuildArgs(cfg, target) - expectedPrefix := []string{"-p", "--dangerously-skip-permissions", "--output-format", "stream-json", "--verbose"} + expectedPrefix := []string{"-p", "--output-format", "stream-json", "--verbose"} if len(args) != len(expectedPrefix)+1 { t.Fatalf("args length=%d, want %d", len(args), len(expectedPrefix)+1) @@ -1291,7 +1411,7 @@ func TestBackendBuildArgs_GeminiBackend(t *testing.T) { backend := GeminiBackend{} cfg := &Config{Mode: "new"} got := backend.BuildArgs(cfg, "task") - want := []string{"-o", "stream-json", "-y", "-p", "task"} + want := []string{"-o", "stream-json", "-y", "-C", defaultWorkdir, "-p", "task"} if len(got) != len(want) { t.Fatalf("length mismatch") } @@ -1402,7 +1522,7 @@ func TestRunNormalizeText(t *testing.T) { } } -func TestRunParseJSONStream(t *testing.T) { +func TestBackendParseJSONStream(t *testing.T) { type testCase struct { name string input string @@ -1441,7 +1561,7 @@ func TestRunParseJSONStream(t *testing.T) { } } -func TestParseJSONStream_ClaudeEvents(t *testing.T) { +func TestBackendParseJSONStream_ClaudeEvents(t *testing.T) { input := `{"type":"system","subtype":"init","session_id":"abc123"} {"type":"result","subtype":"success","result":"Hello!","session_id":"abc123"}` @@ -1455,7 +1575,7 @@ func TestParseJSONStream_ClaudeEvents(t *testing.T) { } } -func TestParseJSONStream_GeminiEvents(t *testing.T) { +func TestBackendParseJSONStream_GeminiEvents(t *testing.T) { input := `{"type":"init","session_id":"xyz789"} {"type":"message","role":"assistant","content":"Hi","delta":true,"session_id":"xyz789"} {"type":"message","role":"assistant","content":" there","delta":true} @@ -1471,7 +1591,7 @@ func TestParseJSONStream_GeminiEvents(t *testing.T) { } } -func TestRunParseJSONStreamWithWarn_InvalidLine(t *testing.T) { +func TestBackendParseJSONStreamWithWarn_InvalidLine(t *testing.T) { var warnings []string warnFn := func(msg string) { warnings = append(warnings, msg) } message, threadID := parseJSONStreamWithWarn(strings.NewReader("not-json"), warnFn) @@ -1483,7 +1603,35 @@ func TestRunParseJSONStreamWithWarn_InvalidLine(t *testing.T) { } } -func TestDiscardInvalidJSON(t *testing.T) { +func TestBackendParseJSONStream_OnMessage(t *testing.T) { + var called int + message, threadID := parseJSONStreamInternal(strings.NewReader(`{"type":"item.completed","item":{"type":"agent_message","text":"hook"}}`), nil, nil, func() { + called++ + }) + if message != "hook" { + t.Fatalf("message = %q, want hook", message) + } + if threadID != "" { + t.Fatalf("threadID = %q, want empty", threadID) + } + if called == 0 { + t.Fatalf("onMessage hook not invoked") + } +} + +func TestBackendParseJSONStream_ScannerError(t *testing.T) { + var warnings []string + warnFn := func(msg string) { warnings = append(warnings, msg) } + message, threadID := parseJSONStreamInternal(errReader{err: errors.New("scan-fail")}, warnFn, nil, nil) + if message != "" || threadID != "" { + t.Fatalf("expected empty output on scanner error, got message=%q threadID=%q", message, threadID) + } + if len(warnings) == 0 { + t.Fatalf("expected warning on scanner error") + } +} + +func TestBackendDiscardInvalidJSON(t *testing.T) { reader := bufio.NewReader(strings.NewReader("line1\nline2\n")) newReader, err := discardInvalidJSON(nil, reader) if err != nil && !errors.Is(err, io.EOF) { @@ -1500,7 +1648,7 @@ func TestDiscardInvalidJSON(t *testing.T) { } } -func TestHasKey(t *testing.T) { +func TestBackendHasKey(t *testing.T) { raw := map[string]json.RawMessage{ "present": json.RawMessage(`true`), } @@ -1698,7 +1846,7 @@ func TestNewLogWriterDefaultMaxLen(t *testing.T) { } } -func TestRunPrintHelp(t *testing.T) { +func TestBackendPrintHelp(t *testing.T) { oldStdout := os.Stdout r, w, _ := os.Pipe() os.Stdout = w @@ -1817,6 +1965,71 @@ func TestRunCodexTask_WithEcho(t *testing.T) { } } +func TestRunCodexTaskFn_UsesTaskBackend(t *testing.T) { + defer resetTestHooks() + + fake := newFakeCmd(fakeCmdConfig{ + StdoutPlan: []fakeStdoutEvent{ + {Data: `{"type":"thread.started","thread_id":"backend-thread"}` + "\n"}, + {Data: `{"type":"item.completed","item":{"type":"agent_message","text":"backend-msg"}}` + "\n"}, + }, + }) + + var seenName string + var seenArgs []string + newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { + seenName = name + seenArgs = append([]string(nil), args...) + return fake + } + selectBackendFn = func(name string) (Backend, error) { + return testBackend{ + name: strings.ToLower(name), + command: "custom-cli", + argsFn: func(cfg *Config, targetArg string) []string { + return []string{"do", targetArg} + }, + }, nil + } + + res := runCodexTaskFn(TaskSpec{ID: "task-1", Task: "payload", Backend: "Custom"}, 5) + + if res.ExitCode != 0 || res.Message != "backend-msg" || res.SessionID != "backend-thread" { + t.Fatalf("unexpected result: %+v", res) + } + if seenName != "custom-cli" { + t.Fatalf("command name = %q, want custom-cli", seenName) + } + expectedArgs := []string{"do", "payload"} + if len(seenArgs) != len(expectedArgs) { + t.Fatalf("args len = %d, want %d", len(seenArgs), len(expectedArgs)) + } + for i, want := range expectedArgs { + if seenArgs[i] != want { + t.Fatalf("args[%d]=%q, want %q", i, seenArgs[i], want) + } + } +} + +func TestRunCodexTaskFn_InvalidBackend(t *testing.T) { + defer resetTestHooks() + + selectBackendFn = func(name string) (Backend, error) { + return nil, fmt.Errorf("invalid backend: %s", name) + } + + res := runCodexTaskFn(TaskSpec{ID: "bad-task", Task: "noop", Backend: "unknown"}, 5) + if res.ExitCode == 0 { + t.Fatalf("expected failure for invalid backend") + } + if res.TaskID != "bad-task" { + t.Fatalf("TaskID = %q, want bad-task", res.TaskID) + } + if !strings.Contains(res.Error, "invalid backend") { + t.Fatalf("error %q missing backend message", res.Error) + } +} + func TestRunCodexTask_LogPathWithActiveLogger(t *testing.T) { defer resetTestHooks() @@ -2008,21 +2221,23 @@ func TestForwardSignals_ContextCancel(t *testing.T) { } func TestCancelReason(t *testing.T) { - if got := cancelReason(nil); got != "Context cancelled" { + const cmdName = "codex" + + if got := cancelReason(cmdName, nil); got != "Context cancelled" { t.Fatalf("cancelReason(nil) = %q, want %q", got, "Context cancelled") } ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 1*time.Nanosecond) defer cancelTimeout() <-ctxTimeout.Done() - wantTimeout := fmt.Sprintf("%s execution timeout", codexCommand) - if got := cancelReason(ctxTimeout); got != wantTimeout { + wantTimeout := fmt.Sprintf("%s execution timeout", cmdName) + if got := cancelReason(cmdName, ctxTimeout); got != wantTimeout { t.Fatalf("cancelReason(deadline) = %q, want %q", got, wantTimeout) } ctxCancelled, cancel := context.WithCancel(context.Background()) cancel() - if got := cancelReason(ctxCancelled); got != "Execution cancelled, terminating codex process" { + if got := cancelReason(cmdName, ctxCancelled); got != "Execution cancelled, terminating codex process" { t.Fatalf("cancelReason(cancelled) = %q, want %q", got, "Execution cancelled, terminating codex process") } } @@ -2143,7 +2358,7 @@ func TestRunTopologicalSort_Branching(t *testing.T) { } } -func TestRunTopologicalSort_ParallelTasks(t *testing.T) { +func TestParallelTopologicalSortTasks(t *testing.T) { tasks := []TaskSpec{{ID: "a"}, {ID: "b"}, {ID: "c"}} layers, err := topologicalSort(tasks) if err != nil { @@ -2226,7 +2441,7 @@ func TestRunTopologicalSort_LargeGraph(t *testing.T) { } } -func TestRunExecuteConcurrent_ParallelExecution(t *testing.T) { +func TestParallelExecuteConcurrent(t *testing.T) { orig := runCodexTaskFn defer func() { runCodexTaskFn = orig }() @@ -2346,7 +2561,51 @@ func TestRunExecuteConcurrent_LargeFanout(t *testing.T) { } } -func TestRun_ParallelFlag(t *testing.T) { +func TestParallelBackendPropagation(t *testing.T) { + defer resetTestHooks() + cleanupLogsFn = func() (CleanupStats, error) { return CleanupStats{}, nil } + + orig := runCodexTaskFn + var mu sync.Mutex + seen := make(map[string]string) + runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult { + mu.Lock() + seen[task.ID] = task.Backend + mu.Unlock() + return TaskResult{TaskID: task.ID, ExitCode: 0, Message: "ok"} + } + t.Cleanup(func() { runCodexTaskFn = orig }) + + stdinReader = strings.NewReader(`---TASK--- +id: first +---CONTENT--- +do one + +---TASK--- +id: second +backend: gemini +---CONTENT--- +do two`) + os.Args = []string{"codeagent-wrapper", "--backend", "claude", "--parallel"} + + if code := run(); code != 0 { + t.Fatalf("run exit = %d, want 0", code) + } + + mu.Lock() + firstBackend, firstOK := seen["first"] + secondBackend, secondOK := seen["second"] + mu.Unlock() + + if !firstOK || firstBackend != "claude" { + t.Fatalf("first backend = %q (present=%v), want claude", firstBackend, firstOK) + } + if !secondOK || secondBackend != "gemini" { + t.Fatalf("second backend = %q (present=%v), want gemini", secondBackend, secondOK) + } +} + +func TestParallelFlag(t *testing.T) { oldArgs := os.Args defer func() { os.Args = oldArgs }() @@ -2371,7 +2630,22 @@ test` } } -func TestRun_ParallelTriggersCleanup(t *testing.T) { +func TestParallelInvalidBackend(t *testing.T) { + defer resetTestHooks() + cleanupLogsFn = func() (CleanupStats, error) { return CleanupStats{}, nil } + + stdinReader = strings.NewReader(`---TASK--- +id: only +---CONTENT--- +noop`) + os.Args = []string{"codeagent-wrapper", "--parallel", "--backend", "unknown"} + + if code := run(); code == 0 { + t.Fatalf("expected non-zero exit for invalid backend in parallel mode") + } +} + +func TestParallelTriggersCleanup(t *testing.T) { defer resetTestHooks() oldArgs := os.Args defer func() { os.Args = oldArgs }() @@ -2402,19 +2676,45 @@ noop`) } } -func TestRun_Version(t *testing.T) { +func TestVersionFlag(t *testing.T) { defer resetTestHooks() os.Args = []string{"codeagent-wrapper", "--version"} - if code := run(); code != 0 { - t.Errorf("exit = %d, want 0", code) + output := captureOutput(t, func() { + if code := run(); code != 0 { + t.Errorf("exit = %d, want 0", code) + } + }) + want := "codeagent-wrapper version 5.1.0\n" + if output != want { + t.Fatalf("output = %q, want %q", output, want) } } -func TestRun_VersionShort(t *testing.T) { +func TestVersionShortFlag(t *testing.T) { defer resetTestHooks() os.Args = []string{"codeagent-wrapper", "-v"} - if code := run(); code != 0 { - t.Errorf("exit = %d, want 0", code) + output := captureOutput(t, func() { + if code := run(); code != 0 { + t.Errorf("exit = %d, want 0", code) + } + }) + want := "codeagent-wrapper version 5.1.0\n" + if output != want { + t.Fatalf("output = %q, want %q", output, want) + } +} + +func TestVersionLegacyAlias(t *testing.T) { + defer resetTestHooks() + os.Args = []string{"codex-wrapper", "--version"} + output := captureOutput(t, func() { + if code := run(); code != 0 { + t.Errorf("exit = %d, want 0", code) + } + }) + want := "codex-wrapper version 5.1.0\n" + if output != want { + t.Fatalf("output = %q, want %q", output, want) } } @@ -2447,7 +2747,7 @@ func TestRun_HelpDoesNotTriggerCleanup(t *testing.T) { } } -func TestRun_VersionDoesNotTriggerCleanup(t *testing.T) { +func TestVersionDoesNotTriggerCleanup(t *testing.T) { defer resetTestHooks() os.Args = []string{"codex-wrapper", "--version"} cleanupLogsFn = func() (CleanupStats, error) { @@ -2460,7 +2760,261 @@ func TestRun_VersionDoesNotTriggerCleanup(t *testing.T) { } } -func TestRunCleanupMode_Success(t *testing.T) { +func TestVersionCoverageFullRun(t *testing.T) { + t.Run("cleanupHelpers", func(t *testing.T) { + defer resetTestHooks() + cleanupLogsFn = nil + runStartupCleanup() + if code := runCleanupMode(); code == 0 { + t.Fatalf("runCleanupMode exit = %d, want non-zero when cleanup is nil", code) + } + + logger, err := NewLoggerWithSuffix("version-coverage") + if err != nil { + t.Fatalf("failed to create logger: %v", err) + } + setLogger(logger) + + cleanupLogsFn = func() (CleanupStats, error) { + return CleanupStats{ + Scanned: 2, + Deleted: 1, + Kept: 1, + DeletedFiles: []string{"old.log"}, + KeptFiles: []string{"keep.log"}, + Errors: 1, + }, fmt.Errorf("warn") + } + runStartupCleanup() + + cleanupLogsFn = func() (CleanupStats, error) { + panic("panic cleanup") + } + runStartupCleanup() + + cleanupLogsFn = func() (CleanupStats, error) { + return CleanupStats{ + Scanned: 2, + Deleted: 1, + Kept: 1, + DeletedFiles: []string{"old.log"}, + KeptFiles: []string{"keep.log"}, + Errors: 1, + }, nil + } + if code := runCleanupMode(); code != 0 { + t.Fatalf("runCleanupMode exit = %d, want 0", code) + } + + cleanupLogsFn = func() (CleanupStats, error) { + return CleanupStats{}, fmt.Errorf("expected failure") + } + if code := runCleanupMode(); code == 0 { + t.Fatalf("runCleanupMode exit = %d, want non-zero on error", code) + } + + printHelp() + + _ = closeLogger() + _ = logger.RemoveLogFile() + loggerPtr.Store(nil) + }) + + t.Run("parseArgsError", func(t *testing.T) { + defer resetTestHooks() + cleanupLogsFn = func() (CleanupStats, error) { return CleanupStats{}, nil } + + cleanupCalled := false + cleanupHook = func() { cleanupCalled = true } + + selectBackendFn = func(name string) (Backend, error) { + return testBackend{name: name, command: "echo"}, nil + } + runTaskFn = func(task TaskSpec, silent bool, timeout int) TaskResult { + return TaskResult{ExitCode: 0} + } + + os.Args = []string{"codeagent-wrapper"} + if code := run(); code == 0 { + t.Fatalf("run exit = %d, want non-zero for missing task", code) + } + if !cleanupCalled { + t.Fatalf("cleanup hook not invoked on error path") + } + }) + + t.Run("helpAndCleanup", func(t *testing.T) { + defer resetTestHooks() + cleanupLogsFn = func() (CleanupStats, error) { return CleanupStats{}, nil } + + os.Args = []string{"codeagent-wrapper", "--help"} + if code := run(); code != 0 { + t.Fatalf("run exit = %d, want 0 for help", code) + } + + os.Args = []string{"codeagent-wrapper", "--cleanup"} + if code := run(); code != 0 { + t.Fatalf("run exit = %d, want 0 for cleanup", code) + } + }) + + t.Run("happyPath", func(t *testing.T) { + defer resetTestHooks() + cleanupHook = func() {} + cleanupLogsFn = func() (CleanupStats, error) { return CleanupStats{}, nil } + + selectBackendFn = func(name string) (Backend, error) { + return testBackend{ + name: name, + command: "echo", + argsFn: func(cfg *Config, targetArg string) []string { + return []string{"--task", targetArg, "--workdir", cfg.WorkDir} + }, + }, nil + } + runTaskFn = func(task TaskSpec, silent bool, timeout int) TaskResult { + return TaskResult{TaskID: "task-id", ExitCode: 0, Message: "ok", SessionID: "sess-123"} + } + + stdinReader = strings.NewReader("task line with $ and \\\nnext line with `tick` and \"quote\" and 'single'") + isTerminalFn = func() bool { return false } + os.Args = []string{"codeagent-wrapper", "-", "/tmp/workdir"} + if code := run(); code != 0 { + t.Fatalf("run exit = %d, want 0", code) + } + }) + + t.Run("nonExplicitTaskFailure", func(t *testing.T) { + defer resetTestHooks() + cleanupCalled := false + cleanupHook = func() { cleanupCalled = true } + cleanupLogsFn = func() (CleanupStats, error) { return CleanupStats{}, nil } + + selectBackendFn = func(name string) (Backend, error) { + return testBackend{ + name: name, + command: "echo", + argsFn: func(cfg *Config, targetArg string) []string { + return []string{"--task", targetArg} + }, + }, nil + } + runTaskFn = func(task TaskSpec, silent bool, timeout int) TaskResult { + return TaskResult{TaskID: "fail", ExitCode: 2, Message: "error"} + } + + stdinReader = strings.NewReader("") + isTerminalFn = func() bool { return true } + os.Args = []string{"codeagent-wrapper", "raw-task"} + if code := run(); code != 2 { + t.Fatalf("run exit = %d, want 2", code) + } + if !cleanupCalled { + t.Fatalf("cleanup hook not invoked on failure path") + } + }) + + t.Run("pipedTaskLongInput", func(t *testing.T) { + defer resetTestHooks() + cleanupLogsFn = func() (CleanupStats, error) { return CleanupStats{}, nil } + + selectBackendFn = func(name string) (Backend, error) { + return testBackend{ + name: name, + command: "echo", + argsFn: func(cfg *Config, targetArg string) []string { + return []string{"--task", targetArg} + }, + }, nil + } + runTaskFn = func(task TaskSpec, silent bool, timeout int) TaskResult { + return TaskResult{TaskID: "piped", ExitCode: 0, Message: "ok"} + } + + stdinReader = strings.NewReader(strings.Repeat("x", 900)) + isTerminalFn = func() bool { return false } + os.Args = []string{"codeagent-wrapper", "ignored"} + if code := run(); code != 0 { + t.Fatalf("run exit = %d, want 0 for piped input", code) + } + }) + + t.Run("explicitStdinReadError", func(t *testing.T) { + defer resetTestHooks() + cleanupLogsFn = func() (CleanupStats, error) { return CleanupStats{}, nil } + runTaskFn = func(task TaskSpec, silent bool, timeout int) TaskResult { + return TaskResult{ExitCode: 0} + } + + stdinReader = errReader{err: errors.New("read-fail")} + os.Args = []string{"codeagent-wrapper", "-", "/tmp/workdir"} + if code := run(); code == 0 { + t.Fatalf("run exit = %d, want non-zero on stdin read error", code) + } + }) + + t.Run("parallelFlow", func(t *testing.T) { + defer resetTestHooks() + cleanupHook = func() {} + cleanupLogsFn = func() (CleanupStats, error) { return CleanupStats{}, nil } + runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult { + return TaskResult{TaskID: task.ID, ExitCode: 0, Message: "ok"} + } + + stdinReader = strings.NewReader(`---TASK--- +id: first +---CONTENT--- +do one + +---TASK--- +id: second +dependencies: first +---CONTENT--- +do two`) + os.Args = []string{"codeagent-wrapper", "--parallel"} + if code := run(); code != 0 { + t.Fatalf("run exit = %d, want 0", code) + } + }) + + t.Run("parallelErrors", func(t *testing.T) { + defer resetTestHooks() + cleanupLogsFn = func() (CleanupStats, error) { return CleanupStats{}, nil } + + os.Args = []string{"codeagent-wrapper", "--parallel", "extra"} + if code := run(); code == 0 { + t.Fatalf("run exit = %d, want error for extra args", code) + } + + stdinReader = strings.NewReader("invalid format") + os.Args = []string{"codeagent-wrapper", "--parallel"} + if code := run(); code == 0 { + t.Fatalf("run exit = %d, want error for invalid config", code) + } + + stdinReader = strings.NewReader(`---TASK--- +id: second +dependencies: missing +---CONTENT--- +task`) + if code := run(); code == 0 { + t.Fatalf("run exit = %d, want error for invalid DAG", code) + } + }) +} + +func TestVersionMainWrapper(t *testing.T) { + defer resetTestHooks() + exitCalled := -1 + exitFn = func(code int) { exitCalled = code } + os.Args = []string{"codeagent-wrapper", "--version"} + main() + if exitCalled != 0 { + t.Fatalf("main exit = %d, want 0", exitCalled) + } +} + +func TestBackendCleanupMode_Success(t *testing.T) { defer resetTestHooks() cleanupLogsFn = func() (CleanupStats, error) { return CleanupStats{ @@ -2485,7 +3039,7 @@ func TestRunCleanupMode_Success(t *testing.T) { } } -func TestRunCleanupMode_SuccessWithErrorsLine(t *testing.T) { +func TestBackendCleanupMode_SuccessWithErrorsLine(t *testing.T) { defer resetTestHooks() cleanupLogsFn = func() (CleanupStats, error) { return CleanupStats{ @@ -2510,7 +3064,7 @@ func TestRunCleanupMode_SuccessWithErrorsLine(t *testing.T) { } } -func TestRunCleanupMode_ZeroStatsOutput(t *testing.T) { +func TestBackendCleanupMode_ZeroStatsOutput(t *testing.T) { defer resetTestHooks() calls := 0 cleanupLogsFn = func() (CleanupStats, error) { @@ -2534,7 +3088,7 @@ func TestRunCleanupMode_ZeroStatsOutput(t *testing.T) { } } -func TestRunCleanupMode_Error(t *testing.T) { +func TestBackendCleanupMode_Error(t *testing.T) { defer resetTestHooks() cleanupLogsFn = func() (CleanupStats, error) { return CleanupStats{}, fmt.Errorf("boom") @@ -2552,7 +3106,7 @@ func TestRunCleanupMode_Error(t *testing.T) { } } -func TestRunCleanupMode_MissingFn(t *testing.T) { +func TestBackendCleanupMode_MissingFn(t *testing.T) { defer resetTestHooks() cleanupLogsFn = nil @@ -2883,13 +3437,13 @@ func TestRun_CleanupHookAlwaysCalled(t *testing.T) { } } -func TestRunStartupCleanupNil(t *testing.T) { +func TestBackendStartupCleanupNil(t *testing.T) { defer resetTestHooks() cleanupLogsFn = nil runStartupCleanup() } -func TestRunStartupCleanupErrorLogged(t *testing.T) { +func TestBackendStartupCleanupErrorLogged(t *testing.T) { defer resetTestHooks() logger, err := NewLoggerWithSuffix("startup-error") @@ -2935,12 +3489,12 @@ func TestRun_CleanupFailureDoesNotBlock(t *testing.T) { } // Coverage helper reused by logger_test to keep focused runs exercising core paths. -func TestRunParseJSONStream_CoverageSuite(t *testing.T) { +func TestBackendParseJSONStream_CoverageSuite(t *testing.T) { suite := []struct { name string fn func(*testing.T) }{ - {"TestRunParseJSONStream", TestRunParseJSONStream}, + {"TestBackendParseJSONStream", TestBackendParseJSONStream}, {"TestRunNormalizeText", TestRunNormalizeText}, {"TestRunTruncate", TestRunTruncate}, {"TestRunMin", TestRunMin}, @@ -3040,7 +3594,7 @@ func TestNewLogWriterDefaultLimit(t *testing.T) { } } -func TestRunDiscardInvalidJSON(t *testing.T) { +func TestBackendDiscardInvalidJSONBuffer(t *testing.T) { reader := bufio.NewReader(strings.NewReader("bad line\n{\"type\":\"ok\"}\n")) next, err := discardInvalidJSON(nil, reader) if err != nil { @@ -3130,7 +3684,40 @@ func TestRunForwardSignals(t *testing.T) { } } -func TestRunNonParallelPrintsLogPath(t *testing.T) { +// Backend-focused coverage suite to ensure run() paths stay exercised under the focused pattern. +func TestBackendRunCoverage(t *testing.T) { + suite := []struct { + name string + fn func(*testing.T) + }{ + {"SuccessfulExecution", TestRun_SuccessfulExecution}, + {"ExplicitStdinSuccess", TestRun_ExplicitStdinSuccess}, + {"PipedTaskSuccess", TestRun_PipedTaskSuccess}, + {"LoggerLifecycle", TestRun_LoggerLifecycle}, + {"CleanupFlag", TestRun_CleanupFlag}, + {"NoArgs", TestRun_NoArgs}, + {"CommandFails", TestRun_CommandFails}, + {"CleanupHookAlwaysCalled", TestRun_CleanupHookAlwaysCalled}, + {"VersionFlag", TestVersionFlag}, + {"VersionShortFlag", TestVersionShortFlag}, + {"VersionLegacyAlias", TestVersionLegacyAlias}, + {"Help", TestRun_Help}, + {"HelpShort", TestRun_HelpShort}, + {"HelpDoesNotTriggerCleanup", TestRun_HelpDoesNotTriggerCleanup}, + {"VersionDoesNotTriggerCleanup", TestVersionDoesNotTriggerCleanup}, + {"VersionCoverageFullRun", TestVersionCoverageFullRun}, + {"ExplicitStdinEmpty", TestRun_ExplicitStdinEmpty}, + {"ExplicitStdinReadError", TestRun_ExplicitStdinReadError}, + {"PipedTaskReadError", TestRun_PipedTaskReadError}, + {"VersionMainWrapper", TestVersionMainWrapper}, + } + + for _, tc := range suite { + t.Run(tc.name, tc.fn) + } +} + +func TestParallelLogPathInSerialMode(t *testing.T) { defer resetTestHooks() tempDir := t.TempDir() diff --git a/codeagent-wrapper/wrapper_name.go b/codeagent-wrapper/wrapper_name.go index 01e74d3..236df20 100644 --- a/codeagent-wrapper/wrapper_name.go +++ b/codeagent-wrapper/wrapper_name.go @@ -11,6 +11,24 @@ const ( legacyWrapperName = "codex-wrapper" ) +var executablePathFn = os.Executable + +func normalizeWrapperName(path string) string { + if path == "" { + return "" + } + + base := filepath.Base(path) + base = strings.TrimSuffix(base, ".exe") // tolerate Windows executables + + switch base { + case defaultWrapperName, legacyWrapperName: + return base + default: + return "" + } +} + // currentWrapperName resolves the wrapper name based on the invoked binary. // Only known names are honored to avoid leaking build/test binary names into logs. func currentWrapperName() string { @@ -18,15 +36,31 @@ func currentWrapperName() string { return defaultWrapperName } - base := filepath.Base(os.Args[0]) - base = strings.TrimSuffix(base, ".exe") // tolerate Windows executables - - switch base { - case defaultWrapperName, legacyWrapperName: - return base - default: - return defaultWrapperName + if name := normalizeWrapperName(os.Args[0]); name != "" { + return name } + + execPath, err := executablePathFn() + if err == nil { + if name := normalizeWrapperName(execPath); name != "" { + return name + } + + if resolved, err := filepath.EvalSymlinks(execPath); err == nil { + if name := normalizeWrapperName(resolved); name != "" { + return name + } + if alias := resolveAlias(execPath, resolved); alias != "" { + return alias + } + } + + if alias := resolveAlias(execPath, ""); alias != "" { + return alias + } + } + + return defaultWrapperName } // logPrefixes returns the set of accepted log name prefixes, including the @@ -58,3 +92,35 @@ func primaryLogPrefix() string { } return prefixes[0] } + +func resolveAlias(execPath string, target string) string { + if execPath == "" { + return "" + } + + dir := filepath.Dir(execPath) + for _, candidate := range []string{defaultWrapperName, legacyWrapperName} { + aliasPath := filepath.Join(dir, candidate) + info, err := os.Lstat(aliasPath) + if err != nil { + continue + } + if info.Mode()&os.ModeSymlink == 0 { + continue + } + + resolved, err := filepath.EvalSymlinks(aliasPath) + if err != nil { + continue + } + if target != "" && resolved != target { + continue + } + + if name := normalizeWrapperName(aliasPath); name != "" { + return name + } + } + + return "" +} diff --git a/codeagent-wrapper/wrapper_name_test.go b/codeagent-wrapper/wrapper_name_test.go new file mode 100644 index 0000000..b133d95 --- /dev/null +++ b/codeagent-wrapper/wrapper_name_test.go @@ -0,0 +1,50 @@ +package main + +import ( + "os" + "path/filepath" + "testing" +) + +func TestCurrentWrapperNameFallsBackToExecutable(t *testing.T) { + defer resetTestHooks() + + tempDir := t.TempDir() + execPath := filepath.Join(tempDir, "codeagent-wrapper") + if err := os.WriteFile(execPath, []byte("#!/bin/true\n"), 0o755); err != nil { + t.Fatalf("failed to write fake binary: %v", err) + } + + os.Args = []string{filepath.Join(tempDir, "custom-name")} + executablePathFn = func() (string, error) { + return execPath, nil + } + + if got := currentWrapperName(); got != defaultWrapperName { + t.Fatalf("currentWrapperName() = %q, want %q", got, defaultWrapperName) + } +} + +func TestCurrentWrapperNameDetectsLegacyAliasSymlink(t *testing.T) { + defer resetTestHooks() + + tempDir := t.TempDir() + execPath := filepath.Join(tempDir, "wrapper") + aliasPath := filepath.Join(tempDir, legacyWrapperName) + + if err := os.WriteFile(execPath, []byte("#!/bin/true\n"), 0o755); err != nil { + t.Fatalf("failed to write fake binary: %v", err) + } + if err := os.Symlink(execPath, aliasPath); err != nil { + t.Fatalf("failed to create alias: %v", err) + } + + os.Args = []string{filepath.Join(tempDir, "unknown-runner")} + executablePathFn = func() (string, error) { + return execPath, nil + } + + if got := currentWrapperName(); got != legacyWrapperName { + t.Fatalf("currentWrapperName() = %q, want %q", got, legacyWrapperName) + } +} diff --git a/scripts/install.sh b/scripts/install.sh new file mode 100755 index 0000000..ca40b24 --- /dev/null +++ b/scripts/install.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Legacy alias installer: create codex-wrapper -> codeagent-wrapper symlink +# in the configured install directory (defaults to ~/bin). + +BIN_DIR="${INSTALL_DIR:-"$HOME/bin"}" +TARGET_NAME="codeagent-wrapper" +LEGACY_NAME="codex-wrapper" + +mkdir -p "$BIN_DIR" +cd "$BIN_DIR" + +if [[ ! -x "$TARGET_NAME" ]]; then + echo "ERROR: $BIN_DIR/$TARGET_NAME not found or not executable; install the wrapper first." >&2 + exit 1 +fi + +if [[ -L "$LEGACY_NAME" ]]; then + echo "Legacy alias already present: $BIN_DIR/$LEGACY_NAME -> $(readlink "$LEGACY_NAME")" + exit 0 +fi + +if [[ -e "$LEGACY_NAME" ]]; then + echo "INFO: $BIN_DIR/$LEGACY_NAME exists and is not a symlink; leaving user-managed binary untouched." >&2 + exit 0 +fi + +ln -s "$TARGET_NAME" "$LEGACY_NAME" +echo "Created legacy alias $BIN_DIR/$LEGACY_NAME -> $TARGET_NAME" diff --git a/skills/codeagent/SKILL.md b/skills/codeagent/SKILL.md index 6744c10..bd00552 100644 --- a/skills/codeagent/SKILL.md +++ b/skills/codeagent/SKILL.md @@ -7,7 +7,7 @@ description: Execute codeagent-wrapper for multi-backend AI code tasks. Supports ## Overview -Execute codeagent-wrapper commands with pluggable AI backends (Codex, Claude, Gemini). Supports file references via `@` syntax and parallel task execution. +Execute codeagent-wrapper commands with pluggable AI backends (Codex, Claude, Gemini). Supports file references via `@` syntax, parallel task execution with backend selection, and configurable security controls. ## When to Use @@ -49,7 +49,8 @@ codeagent-wrapper --backend gemini "simple task" - `task` (required): Task description, supports `@file` references - `working_dir` (optional): Working directory (default: current) -- `--backend` (optional): Select AI backend (codex/claude/gemini) +- `--backend` (optional): Select AI backend (codex/claude/gemini, default: codex) + - **Note**: Claude backend defaults to `--dangerously-skip-permissions` for automation compatibility ## Return Format @@ -60,18 +61,25 @@ Agent response text here... SESSION_ID: 019a7247-ac9d-71f3-89e2-a823dbd8fd14 ``` -## Resume Session +## Resume Session ```bash +# Resume with default backend codeagent-wrapper resume - <<'EOF' EOF + +# Resume with specific backend +codeagent-wrapper --backend claude resume - <<'EOF' + +EOF ``` ## Parallel Execution +**With global backend**: ```bash -codeagent-wrapper --parallel <<'EOF' +codeagent-wrapper --parallel --backend claude <<'EOF' ---TASK--- id: task1 workdir: /path/to/dir @@ -85,12 +93,44 @@ dependent task EOF ``` +**With per-task backend**: +```bash +codeagent-wrapper --parallel <<'EOF' +---TASK--- +id: task1 +backend: codex +workdir: /path/to/dir +---CONTENT--- +analyze code structure +---TASK--- +id: task2 +backend: claude +dependencies: task1 +---CONTENT--- +design architecture based on analysis +---TASK--- +id: task3 +backend: gemini +dependencies: task2 +---CONTENT--- +generate implementation code +EOF +``` + +**Concurrency Control**: +Set `CODEAGENT_MAX_PARALLEL_WORKERS` to limit concurrent tasks (default: unlimited). + ## Environment Variables -- `CODEX_TIMEOUT`: Override timeout in milliseconds (default: 7200000) +- `CODEX_TIMEOUT`: Override timeout in milliseconds (default: 7200000 = 2 hours) +- `CODEAGENT_SKIP_PERMISSIONS`: Control permission checks + - For **Claude** backend: Set to `true`/`1` to **disable** `--dangerously-skip-permissions` (default: enabled) + - For **Codex/Gemini** backends: Set to `true`/`1` to enable permission skipping (default: disabled) +- `CODEAGENT_MAX_PARALLEL_WORKERS`: Limit concurrent tasks in parallel mode (default: unlimited, recommended: 8) ## Invocation Pattern +**Single Task**: ``` Bash tool parameters: - command: codeagent-wrapper --backend - [working_dir] <<'EOF' @@ -99,3 +139,33 @@ Bash tool parameters: - timeout: 7200000 - description: ``` + +**Parallel Tasks**: +``` +Bash tool parameters: +- command: codeagent-wrapper --parallel --backend <<'EOF' + ---TASK--- + id: task_id + backend: # Optional, overrides global + workdir: /path + dependencies: dep1, dep2 + ---CONTENT--- + task content + EOF +- timeout: 7200000 +- description: +``` + +## Security Best Practices + +- **Claude Backend**: Defaults to `--dangerously-skip-permissions` for automation workflows + - To enforce permission checks with Claude: Set `CODEAGENT_SKIP_PERMISSIONS=true` +- **Codex/Gemini Backends**: Permission checks enabled by default +- **Concurrency Limits**: Set `CODEAGENT_MAX_PARALLEL_WORKERS` in production to prevent resource exhaustion +- **Automation Context**: This wrapper is designed for AI-driven automation where permission prompts would block execution + +## Recent Updates + +- Multi-backend support for all modes (workdir, resume, parallel) +- Security controls with configurable permission checks +- Concurrency limits with worker pool and fail-fast cancellation