Merge branch 'master' into fix-async-log

合并master分支的TaskSpec重构和测试改进到fix-async-log分支:
- 保留异步日志系统 (Logger, atomic.Pointer)
- 集成TaskSpec结构和runCodexTask流程
- 合并所有测试钩子 (buildCodexArgsFn, commandContext, jsonMarshal)
- 统一常量定义 (stdinSpecialChars, stderrCaptureLimit, codexLogLineLimit)
- 整合测试套件,确保两分支特性兼容

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
cexll
2025-12-02 10:18:33 +08:00
6 changed files with 1904 additions and 584 deletions

View File

@@ -11,19 +11,23 @@ import (
"os"
"os/exec"
"os/signal"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
)
const (
version = "1.0.0"
defaultWorkdir = "."
defaultTimeout = 7200 // seconds
forceKillDelay = 5 // seconds
codexLogLineLimit = 1000
version = "1.0.0"
defaultWorkdir = "."
defaultTimeout = 7200 // seconds
forceKillDelay = 5 // seconds
codexLogLineLimit = 1000
stdinSpecialChars = "\n\\\"'`$"
stderrCaptureLimit = 4 * 1024
)
// Test hooks for dependency injection
@@ -33,6 +37,10 @@ var (
codexCommand = "codex"
cleanupHook func()
loggerPtr atomic.Pointer[Logger]
buildCodexArgsFn = buildCodexArgs
commandContext = exec.CommandContext
jsonMarshal = json.Marshal
)
// Config holds CLI configuration
@@ -45,6 +53,293 @@ type Config struct {
Timeout int
}
// ParallelConfig defines the JSON schema for parallel execution
type ParallelConfig struct {
Tasks []TaskSpec `json:"tasks"`
}
// TaskSpec describes an individual task entry in the parallel config
type TaskSpec struct {
ID string `json:"id"`
Task string `json:"task"`
WorkDir string `json:"workdir,omitempty"`
Dependencies []string `json:"dependencies,omitempty"`
SessionID string `json:"session_id,omitempty"`
Mode string `json:"-"`
UseStdin bool `json:"-"`
}
// TaskResult captures the execution outcome of a task
type TaskResult struct {
TaskID string `json:"task_id"`
ExitCode int `json:"exit_code"`
Message string `json:"message"`
SessionID string `json:"session_id"`
Error string `json:"error"`
}
func parseParallelConfig(data []byte) (*ParallelConfig, error) {
trimmed := bytes.TrimSpace(data)
if len(trimmed) == 0 {
return nil, fmt.Errorf("parallel config is empty")
}
tasks := strings.Split(string(trimmed), "---TASK---")
var cfg ParallelConfig
seen := make(map[string]struct{})
for _, taskBlock := range tasks {
taskBlock = strings.TrimSpace(taskBlock)
if taskBlock == "" {
continue
}
parts := strings.SplitN(taskBlock, "---CONTENT---", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("task block missing ---CONTENT--- separator")
}
meta := strings.TrimSpace(parts[0])
content := strings.TrimSpace(parts[1])
task := TaskSpec{WorkDir: defaultWorkdir}
for _, line := range strings.Split(meta, "\n") {
line = strings.TrimSpace(line)
if line == "" {
continue
}
kv := strings.SplitN(line, ":", 2)
if len(kv) != 2 {
continue
}
key := strings.TrimSpace(kv[0])
value := strings.TrimSpace(kv[1])
switch key {
case "id":
task.ID = value
case "workdir":
task.WorkDir = value
case "session_id":
task.SessionID = value
task.Mode = "resume"
case "dependencies":
for _, dep := range strings.Split(value, ",") {
dep = strings.TrimSpace(dep)
if dep != "" {
task.Dependencies = append(task.Dependencies, dep)
}
}
}
}
if task.ID == "" {
return nil, fmt.Errorf("task missing id field")
}
if content == "" {
return nil, fmt.Errorf("task %q missing content", task.ID)
}
if _, exists := seen[task.ID]; exists {
return nil, fmt.Errorf("duplicate task id: %s", task.ID)
}
task.Task = content
cfg.Tasks = append(cfg.Tasks, task)
seen[task.ID] = struct{}{}
}
if len(cfg.Tasks) == 0 {
return nil, fmt.Errorf("no tasks found")
}
return &cfg, nil
}
func topologicalSort(tasks []TaskSpec) ([][]TaskSpec, error) {
idToTask := make(map[string]TaskSpec, len(tasks))
indegree := make(map[string]int, len(tasks))
adj := make(map[string][]string, len(tasks))
for _, task := range tasks {
idToTask[task.ID] = task
indegree[task.ID] = 0
}
for _, task := range tasks {
for _, dep := range task.Dependencies {
if _, ok := idToTask[dep]; !ok {
return nil, fmt.Errorf("dependency %q not found for task %q", dep, task.ID)
}
indegree[task.ID]++
adj[dep] = append(adj[dep], task.ID)
}
}
queue := make([]string, 0, len(tasks))
for _, task := range tasks {
if indegree[task.ID] == 0 {
queue = append(queue, task.ID)
}
}
layers := make([][]TaskSpec, 0)
processed := 0
for len(queue) > 0 {
current := queue
queue = nil
layer := make([]TaskSpec, len(current))
for i, id := range current {
layer[i] = idToTask[id]
processed++
}
layers = append(layers, layer)
next := make([]string, 0)
for _, id := range current {
for _, neighbor := range adj[id] {
indegree[neighbor]--
if indegree[neighbor] == 0 {
next = append(next, neighbor)
}
}
}
queue = append(queue, next...)
}
if processed != len(tasks) {
cycleIDs := make([]string, 0)
for id, deg := range indegree {
if deg > 0 {
cycleIDs = append(cycleIDs, id)
}
}
sort.Strings(cycleIDs)
return nil, fmt.Errorf("cycle detected involving tasks: %s", strings.Join(cycleIDs, ","))
}
return layers, nil
}
var runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
if task.WorkDir == "" {
task.WorkDir = defaultWorkdir
}
if task.Mode == "" {
task.Mode = "new"
}
if task.UseStdin || shouldUseStdin(task.Task, false) {
task.UseStdin = true
}
return runCodexTask(task, true, timeout)
}
func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult {
totalTasks := 0
for _, layer := range layers {
totalTasks += len(layer)
}
results := make([]TaskResult, 0, totalTasks)
failed := make(map[string]TaskResult, totalTasks)
resultsCh := make(chan TaskResult, totalTasks)
for _, layer := range layers {
var wg sync.WaitGroup
executed := 0
for _, task := range layer {
if skip, reason := shouldSkipTask(task, failed); skip {
res := TaskResult{TaskID: task.ID, ExitCode: 1, Error: reason}
results = append(results, res)
failed[task.ID] = res
continue
}
executed++
wg.Add(1)
go func(ts TaskSpec) {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
resultsCh <- TaskResult{TaskID: ts.ID, ExitCode: 1, Error: fmt.Sprintf("panic: %v", r)}
}
}()
resultsCh <- runCodexTaskFn(ts, timeout)
}(task)
}
wg.Wait()
for i := 0; i < executed; i++ {
res := <-resultsCh
results = append(results, res)
if res.ExitCode != 0 || res.Error != "" {
failed[res.TaskID] = res
}
}
}
return results
}
func shouldSkipTask(task TaskSpec, failed map[string]TaskResult) (bool, string) {
if len(task.Dependencies) == 0 {
return false, ""
}
var blocked []string
for _, dep := range task.Dependencies {
if _, ok := failed[dep]; ok {
blocked = append(blocked, dep)
}
}
if len(blocked) == 0 {
return false, ""
}
return true, fmt.Sprintf("skipped due to failed dependencies: %s", strings.Join(blocked, ","))
}
func generateFinalOutput(results []TaskResult) string {
var sb strings.Builder
success := 0
failed := 0
for _, res := range results {
if res.ExitCode == 0 && res.Error == "" {
success++
} else {
failed++
}
}
sb.WriteString(fmt.Sprintf("=== Parallel Execution Summary ===\n"))
sb.WriteString(fmt.Sprintf("Total: %d | Success: %d | Failed: %d\n\n", len(results), success, failed))
for _, res := range results {
sb.WriteString(fmt.Sprintf("--- Task: %s ---\n", res.TaskID))
if res.Error != "" {
sb.WriteString(fmt.Sprintf("Status: FAILED (exit code %d)\nError: %s\n", res.ExitCode, res.Error))
} else if res.ExitCode != 0 {
sb.WriteString(fmt.Sprintf("Status: FAILED (exit code %d)\n", res.ExitCode))
} else {
sb.WriteString("Status: SUCCESS\n")
}
if res.SessionID != "" {
sb.WriteString(fmt.Sprintf("Session: %s\n", res.SessionID))
}
if res.Message != "" {
sb.WriteString(fmt.Sprintf("\n%s\n", res.Message))
}
sb.WriteString("\n")
}
return sb.String()
}
// JSONEvent represents a Codex JSON output event
type JSONEvent struct {
Type string `json:"type"`
@@ -73,7 +368,6 @@ func run() int {
setLogger(logger)
defer func() {
// Ensure all pending logs are written before closing
if logger := activeLogger(); logger != nil {
logger.Flush()
}
@@ -81,9 +375,6 @@ func run() int {
fmt.Fprintf(os.Stderr, "ERROR: failed to close logger: %v\n", err)
}
}()
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
defer runCleanupHook()
// Handle --version and --help first
@@ -95,6 +386,45 @@ func run() int {
case "--help", "-h":
printHelp()
return 0
case "--parallel":
if len(os.Args) > 2 {
fmt.Fprintln(os.Stderr, "ERROR: --parallel reads its task configuration from stdin and does not accept additional arguments.")
fmt.Fprintln(os.Stderr, "Usage examples:")
fmt.Fprintln(os.Stderr, " codex-wrapper --parallel < tasks.txt")
fmt.Fprintln(os.Stderr, " echo '...' | codex-wrapper --parallel")
fmt.Fprintln(os.Stderr, " codex-wrapper --parallel <<'EOF'")
return 1
}
data, err := io.ReadAll(stdinReader)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: failed to read stdin: %v\n", err)
return 1
}
cfg, err := parseParallelConfig(data)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: %v\n", err)
return 1
}
timeoutSec := resolveTimeout()
layers, err := topologicalSort(cfg.Tasks)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: %v\n", err)
return 1
}
results := executeConcurrent(layers, timeoutSec)
fmt.Println(generateFinalOutput(results))
exitCode := 0
for _, res := range results {
if res.ExitCode != 0 {
exitCode = res.ExitCode
}
}
return exitCode
}
}
@@ -111,7 +441,6 @@ func run() int {
logInfo(fmt.Sprintf("Timeout: %ds", timeoutSec))
cfg.Timeout = timeoutSec
// Determine task text and stdin mode
var taskText string
var piped bool
@@ -158,6 +487,18 @@ func run() int {
if strings.Contains(taskText, "\\") {
reasons = append(reasons, "backslash")
}
if strings.Contains(taskText, "\"") {
reasons = append(reasons, "double-quote")
}
if strings.Contains(taskText, "'") {
reasons = append(reasons, "single-quote")
}
if strings.Contains(taskText, "`") {
reasons = append(reasons, "backtick")
}
if strings.Contains(taskText, "$") {
reasons = append(reasons, "dollar")
}
if len(taskText) > 800 {
reasons = append(reasons, "length>800")
}
@@ -166,26 +507,25 @@ func run() int {
}
}
targetArg := taskText
if useStdin {
targetArg = "-"
}
codexArgs := buildCodexArgs(cfg, targetArg)
logInfo("codex running...")
message, threadID, exitCode := runCodexProcess(ctx, codexArgs, taskText, useStdin, cfg.Timeout)
if exitCode != 0 {
return exitCode
taskSpec := TaskSpec{
Task: taskText,
WorkDir: cfg.WorkDir,
Mode: cfg.Mode,
SessionID: cfg.SessionID,
UseStdin: useStdin,
}
// Output agent_message
fmt.Println(message)
result := runCodexTask(taskSpec, false, cfg.Timeout)
// Output session_id if present
if threadID != "" {
fmt.Printf("\n---\nSESSION_ID: %s\n", threadID)
if result.ExitCode != 0 {
return result.ExitCode
}
fmt.Println(result.Message)
if result.SessionID != "" {
fmt.Printf("\n---\nSESSION_ID: %s\n", result.SessionID)
}
return 0
@@ -197,11 +537,8 @@ func parseArgs() (*Config, error) {
return nil, fmt.Errorf("task required")
}
cfg := &Config{
WorkDir: defaultWorkdir,
}
cfg := &Config{WorkDir: defaultWorkdir}
// Check for resume mode
if args[0] == "resume" {
if len(args) < 3 {
return nil, fmt.Errorf("resume mode requires: resume <session_id> <task>")
@@ -247,16 +584,10 @@ func shouldUseStdin(taskText string, piped bool) bool {
if piped {
return true
}
if strings.Contains(taskText, "\n") {
return true
}
if strings.Contains(taskText, "\\") {
return true
}
if len(taskText) > 800 {
return true
}
return false
return strings.IndexAny(taskText, stdinSpecialChars) >= 0
}
func buildCodexArgs(cfg *Config, targetArg string) []string {
@@ -284,75 +615,157 @@ type parseResult struct {
threadID string
}
func runCodexTask(taskSpec TaskSpec, silent bool, timeoutSec int) TaskResult {
return runCodexTaskWithContext(context.Background(), taskSpec, nil, false, silent, timeoutSec)
}
func runCodexProcess(parentCtx context.Context, codexArgs []string, taskText string, useStdin bool, timeoutSec int) (message, threadID string, exitCode int) {
ctx, cancel := context.WithTimeout(parentCtx, time.Duration(timeoutSec)*time.Second)
res := runCodexTaskWithContext(parentCtx, TaskSpec{Task: taskText, WorkDir: defaultWorkdir, Mode: "new", UseStdin: useStdin}, codexArgs, true, false, timeoutSec)
return res.Message, res.SessionID, res.ExitCode
}
func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, customArgs []string, useCustomArgs bool, silent bool, timeoutSec int) TaskResult {
result := TaskResult{TaskID: taskSpec.ID}
cfg := &Config{
Mode: taskSpec.Mode,
Task: taskSpec.Task,
SessionID: taskSpec.SessionID,
WorkDir: taskSpec.WorkDir,
}
if cfg.Mode == "" {
cfg.Mode = "new"
}
if cfg.WorkDir == "" {
cfg.WorkDir = defaultWorkdir
}
useStdin := taskSpec.UseStdin
targetArg := taskSpec.Task
if useStdin {
targetArg = "-"
}
var codexArgs []string
if useCustomArgs {
codexArgs = customArgs
} else {
codexArgs = buildCodexArgsFn(cfg, targetArg)
}
logInfoFn := logInfo
logWarnFn := logWarn
logErrorFn := logError
stderrBuf := &tailBuffer{limit: stderrCaptureLimit}
var stdoutLogger *logWriter
var stderrLogger *logWriter
var tempLogger *Logger
if silent && activeLogger() == nil {
if l, err := NewLogger(); err == nil {
setLogger(l)
tempLogger = l
}
}
defer func() {
if tempLogger != nil {
closeLogger()
}
}()
if !silent {
stdoutLogger = newLogWriter("CODEX_STDOUT: ", codexLogLineLimit)
stderrLogger = newLogWriter("CODEX_STDERR: ", codexLogLineLimit)
}
ctx := parentCtx
if ctx == nil {
ctx = context.Background()
}
ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second)
defer cancel()
ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()
cmd := exec.Command(codexCommand, codexArgs...)
attachStderr := func(msg string) string {
return fmt.Sprintf("%s; stderr: %s", msg, stderrBuf.String())
}
// Create log writers for stdout and stderr
stdoutLogger := newLogWriter("CODEX_STDOUT: ", codexLogLineLimit)
stderrLogger := newLogWriter("CODEX_STDERR: ", codexLogLineLimit)
defer stdoutLogger.Flush()
defer stderrLogger.Flush()
cmd := commandContext(ctx, codexCommand, codexArgs...)
// Stderr goes to both os.Stderr and logger
cmd.Stderr = io.MultiWriter(os.Stderr, stderrLogger)
stderrWriters := []io.Writer{stderrBuf}
if stderrLogger != nil {
stderrWriters = append(stderrWriters, stderrLogger)
}
if !silent {
stderrWriters = append([]io.Writer{os.Stderr}, stderrWriters...)
}
if len(stderrWriters) == 1 {
cmd.Stderr = stderrWriters[0]
} else {
cmd.Stderr = io.MultiWriter(stderrWriters...)
}
// Setup stdin if needed
var stdinPipe io.WriteCloser
var err error
if useStdin {
stdinPipe, err = cmd.StdinPipe()
if err != nil {
logError("Failed to create stdin pipe: " + err.Error())
return "", "", 1
logErrorFn("Failed to create stdin pipe: " + err.Error())
result.ExitCode = 1
result.Error = attachStderr("failed to create stdin pipe: " + err.Error())
return result
}
}
// Setup stdout
stdout, err := cmd.StdoutPipe()
if err != nil {
logError("Failed to create stdout pipe: " + err.Error())
return "", "", 1
logErrorFn("Failed to create stdout pipe: " + err.Error())
result.ExitCode = 1
result.Error = attachStderr("failed to create stdout pipe: " + err.Error())
return result
}
// Tee stdout to logger while parsing JSON
stdoutReader := io.TeeReader(stdout, stdoutLogger)
stdoutReader := io.Reader(stdout)
if stdoutLogger != nil {
stdoutReader = io.TeeReader(stdout, stdoutLogger)
}
logInfo(fmt.Sprintf("Starting codex with args: codex %s...", strings.Join(codexArgs[:min(5, len(codexArgs))], " ")))
logInfoFn(fmt.Sprintf("Starting codex with args: codex %s...", strings.Join(codexArgs[:min(5, len(codexArgs))], " ")))
// Start process
if err := cmd.Start(); err != nil {
if strings.Contains(err.Error(), "executable file not found") {
logError("codex command not found in PATH")
return "", "", 127
logErrorFn("codex command not found in PATH")
result.ExitCode = 127
result.Error = attachStderr("codex command not found in PATH")
return result
}
logError("Failed to start codex: " + err.Error())
return "", "", 1
logErrorFn("Failed to start codex: " + err.Error())
result.ExitCode = 1
result.Error = attachStderr("failed to start codex: " + err.Error())
return result
}
logInfo(fmt.Sprintf("Process started with PID: %d", cmd.Process.Pid))
// Write to stdin if needed
logInfoFn(fmt.Sprintf("Process started with PID: %d", cmd.Process.Pid))
if useStdin && stdinPipe != nil {
logInfo(fmt.Sprintf("Writing %d chars to stdin...", len(taskText)))
go func() {
logInfoFn(fmt.Sprintf("Writing %d chars to stdin...", len(taskSpec.Task)))
go func(data string) {
defer stdinPipe.Close()
io.WriteString(stdinPipe, taskText)
}()
logInfo("Stdin closed")
_, _ = io.WriteString(stdinPipe, data)
}(taskSpec.Task)
logInfoFn("Stdin closed")
}
logInfo("Reading stdout...")
waitCh := make(chan error, 1)
go func() {
waitCh <- cmd.Wait()
}()
go func() { waitCh <- cmd.Wait() }()
parseCh := make(chan parseResult, 1)
go func() {
msg, tid := parseJSONStream(stdoutReader)
msg, tid := parseJSONStreamWithWarn(stdoutReader, logWarnFn)
parseCh <- parseResult{message: msg, threadID: tid}
}()
@@ -362,7 +775,7 @@ func runCodexProcess(parentCtx context.Context, codexArgs []string, taskText str
select {
case waitErr = <-waitCh:
case <-ctx.Done():
logError(cancelReason(ctx))
logErrorFn(cancelReason(ctx))
forceKillTimer = terminateProcess(cmd)
waitErr = <-waitCh
}
@@ -371,33 +784,106 @@ func runCodexProcess(parentCtx context.Context, codexArgs []string, taskText str
forceKillTimer.Stop()
}
result := <-parseCh
parsed := <-parseCh
if ctxErr := ctx.Err(); ctxErr != nil {
if errors.Is(ctxErr, context.DeadlineExceeded) {
return "", "", 124
result.ExitCode = 124
result.Error = attachStderr("codex execution timeout")
return result
}
return "", "", 130
result.ExitCode = 130
result.Error = attachStderr("execution cancelled")
return result
}
if waitErr != nil {
if exitErr, ok := waitErr.(*exec.ExitError); ok {
code := exitErr.ExitCode()
logError(fmt.Sprintf("Codex exited with status %d", code))
return "", "", code
logErrorFn(fmt.Sprintf("Codex exited with status %d", code))
result.ExitCode = code
result.Error = attachStderr(fmt.Sprintf("codex exited with status %d", code))
return result
}
logError("Codex error: " + waitErr.Error())
return "", "", 1
logErrorFn("Codex error: " + waitErr.Error())
result.ExitCode = 1
result.Error = attachStderr("codex error: " + waitErr.Error())
return result
}
message = result.message
threadID = result.threadID
message := parsed.message
threadID := parsed.threadID
if message == "" {
logError("Codex completed without agent_message output")
return "", "", 1
logErrorFn("Codex completed without agent_message output")
result.ExitCode = 1
result.Error = attachStderr("codex completed without agent_message output")
return result
}
return message, threadID, 0
if stdoutLogger != nil {
stdoutLogger.Flush()
}
if stderrLogger != nil {
stderrLogger.Flush()
}
result.ExitCode = 0
result.Message = message
result.SessionID = threadID
return result
}
type tailBuffer struct {
limit int
data []byte
}
func (b *tailBuffer) Write(p []byte) (int, error) {
if b.limit <= 0 {
return len(p), nil
}
if len(p) >= b.limit {
b.data = append(b.data[:0], p[len(p)-b.limit:]...)
return len(p), nil
}
total := len(b.data) + len(p)
if total <= b.limit {
b.data = append(b.data, p...)
return len(p), nil
}
overflow := total - b.limit
b.data = append(b.data[overflow:], p...)
return len(p), nil
}
func (b *tailBuffer) String() string {
return string(b.data)
}
func forwardSignals(ctx context.Context, cmd *exec.Cmd, logErrorFn func(string)) {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
defer signal.Stop(sigCh)
select {
case sig := <-sigCh:
logErrorFn(fmt.Sprintf("Received signal: %v", sig))
if cmd.Process != nil {
cmd.Process.Signal(syscall.SIGTERM)
time.AfterFunc(time.Duration(forceKillDelay)*time.Second, func() {
if cmd.Process != nil {
cmd.Process.Kill()
}
})
}
case <-ctx.Done():
}
}()
}
func cancelReason(ctx context.Context) string {
@@ -427,35 +913,32 @@ func terminateProcess(cmd *exec.Cmd) *time.Timer {
}
func parseJSONStream(r io.Reader) (message, threadID string) {
logInfo("parseJSONStream: starting to decode stdout stream")
reader := bufio.NewReaderSize(r, 64*1024)
decoder := json.NewDecoder(reader)
return parseJSONStreamWithWarn(r, logWarn)
}
func parseJSONStreamWithWarn(r io.Reader, warnFn func(string)) (message, threadID string) {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, 64*1024), 10*1024*1024)
if warnFn == nil {
warnFn = func(string) {}
}
totalEvents := 0
for {
var event JSONEvent
if err := decoder.Decode(&event); err != nil {
if errors.Is(err, io.EOF) {
break
}
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
totalEvents++
logWarn(fmt.Sprintf("Failed to decode JSON: %v", err))
var skipErr error
reader, skipErr = discardInvalidJSON(decoder, reader)
if skipErr != nil {
if errors.Is(skipErr, os.ErrClosed) || errors.Is(skipErr, io.ErrClosedPipe) {
logWarn("Read stdout error: " + skipErr.Error())
break
}
if !errors.Is(skipErr, io.EOF) {
logWarn("Read stdout error: " + skipErr.Error())
}
}
decoder = json.NewDecoder(reader)
var event JSONEvent
if err := json.Unmarshal([]byte(line), &event); err != nil {
warnFn(fmt.Sprintf("Failed to parse line: %s", truncate(line, 100)))
continue
}
totalEvents++
var details []string
if event.ThreadID != "" {
details = append(details, fmt.Sprintf("thread_id=%s", event.ThreadID))
@@ -487,6 +970,10 @@ func parseJSONStream(r io.Reader) (message, threadID string) {
}
}
if err := scanner.Err(); err != nil && !errors.Is(err, io.EOF) {
warnFn("Read stdout error: " + err.Error())
}
logInfo(fmt.Sprintf("parseJSONStream completed: events=%d, message_len=%d, thread_id_found=%t", totalEvents, len(message), threadID != ""))
return message, threadID
}
@@ -546,7 +1033,6 @@ func resolveTimeout() int {
return defaultTimeout
}
// Environment variable is in milliseconds if > 10000, convert to seconds
if parsed > 10000 {
return parsed / 1000
}
@@ -634,6 +1120,9 @@ func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
if maxLen < 0 {
return ""
}
return s[:maxLen] + "..."
}
@@ -660,6 +1149,18 @@ func activeLogger() *Logger {
return loggerPtr.Load()
}
func hello() string {
return "hello world"
}
func greet(name string) string {
return "hello " + name
}
func farewell(name string) string {
return "goodbye " + name
}
func logInfo(msg string) {
if logger := activeLogger(); logger != nil {
logger.Info(msg)
@@ -701,9 +1202,15 @@ Usage:
codex-wrapper - [workdir] Read task from stdin
codex-wrapper resume <session_id> "task" [workdir]
codex-wrapper resume <session_id> - [workdir]
codex-wrapper --parallel Run tasks in parallel (config from stdin)
codex-wrapper --version
codex-wrapper --help
Parallel mode examples:
codex-wrapper --parallel < tasks.txt
echo '...' | codex-wrapper --parallel
codex-wrapper --parallel <<'EOF'
Environment Variables:
CODEX_TIMEOUT Timeout in milliseconds (default: 7200000)

View File

@@ -0,0 +1,400 @@
package main
import (
"bytes"
"fmt"
"io"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
type integrationSummary struct {
Total int `json:"total"`
Success int `json:"success"`
Failed int `json:"failed"`
}
type integrationOutput struct {
Results []TaskResult `json:"results"`
Summary integrationSummary `json:"summary"`
}
func captureStdout(t *testing.T, fn func()) string {
t.Helper()
old := os.Stdout
r, w, _ := os.Pipe()
os.Stdout = w
fn()
w.Close()
os.Stdout = old
var buf bytes.Buffer
io.Copy(&buf, r)
return buf.String()
}
func parseIntegrationOutput(t *testing.T, out string) integrationOutput {
t.Helper()
var payload integrationOutput
lines := strings.Split(out, "\n")
var currentTask *TaskResult
for _, line := range lines {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "Total:") {
parts := strings.Split(line, "|")
for _, p := range parts {
p = strings.TrimSpace(p)
if strings.HasPrefix(p, "Total:") {
fmt.Sscanf(p, "Total: %d", &payload.Summary.Total)
} else if strings.HasPrefix(p, "Success:") {
fmt.Sscanf(p, "Success: %d", &payload.Summary.Success)
} else if strings.HasPrefix(p, "Failed:") {
fmt.Sscanf(p, "Failed: %d", &payload.Summary.Failed)
}
}
} else if strings.HasPrefix(line, "--- Task:") {
if currentTask != nil {
payload.Results = append(payload.Results, *currentTask)
}
currentTask = &TaskResult{}
currentTask.TaskID = strings.TrimSuffix(strings.TrimPrefix(line, "--- Task: "), " ---")
} else if currentTask != nil {
if strings.HasPrefix(line, "Status: SUCCESS") {
currentTask.ExitCode = 0
} else if strings.HasPrefix(line, "Status: FAILED") {
if strings.Contains(line, "exit code") {
fmt.Sscanf(line, "Status: FAILED (exit code %d)", &currentTask.ExitCode)
} else {
currentTask.ExitCode = 1
}
} else if strings.HasPrefix(line, "Error:") {
currentTask.Error = strings.TrimPrefix(line, "Error: ")
} else if strings.HasPrefix(line, "Session:") {
currentTask.SessionID = strings.TrimPrefix(line, "Session: ")
} else if line != "" && !strings.HasPrefix(line, "===") && !strings.HasPrefix(line, "---") {
if currentTask.Message != "" {
currentTask.Message += "\n"
}
currentTask.Message += line
}
}
}
if currentTask != nil {
payload.Results = append(payload.Results, *currentTask)
}
return payload
}
func findResultByID(t *testing.T, payload integrationOutput, id string) TaskResult {
t.Helper()
for _, res := range payload.Results {
if res.TaskID == id {
return res
}
}
t.Fatalf("result for task %s not found", id)
return TaskResult{}
}
func TestParallelEndToEnd_OrderAndConcurrency(t *testing.T) {
defer resetTestHooks()
origRun := runCodexTaskFn
t.Cleanup(func() {
runCodexTaskFn = origRun
resetTestHooks()
})
input := `---TASK---
id: A
---CONTENT---
task-a
---TASK---
id: B
dependencies: A
---CONTENT---
task-b
---TASK---
id: C
dependencies: B
---CONTENT---
task-c
---TASK---
id: D
---CONTENT---
task-d
---TASK---
id: E
---CONTENT---
task-e`
stdinReader = bytes.NewReader([]byte(input))
os.Args = []string{"codex-wrapper", "--parallel"}
var mu sync.Mutex
starts := make(map[string]time.Time)
ends := make(map[string]time.Time)
var running int64
var maxParallel int64
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
start := time.Now()
mu.Lock()
starts[task.ID] = start
mu.Unlock()
cur := atomic.AddInt64(&running, 1)
for {
prev := atomic.LoadInt64(&maxParallel)
if cur <= prev {
break
}
if atomic.CompareAndSwapInt64(&maxParallel, prev, cur) {
break
}
}
time.Sleep(40 * time.Millisecond)
mu.Lock()
ends[task.ID] = time.Now()
mu.Unlock()
atomic.AddInt64(&running, -1)
return TaskResult{TaskID: task.ID, ExitCode: 0, Message: task.Task}
}
var exitCode int
output := captureStdout(t, func() {
exitCode = run()
})
if exitCode != 0 {
t.Fatalf("run() exit = %d, want 0", exitCode)
}
payload := parseIntegrationOutput(t, output)
if payload.Summary.Failed != 0 || payload.Summary.Total != 5 || payload.Summary.Success != 5 {
t.Fatalf("unexpected summary: %+v", payload.Summary)
}
aEnd := ends["A"]
bStart := starts["B"]
cStart := starts["C"]
bEnd := ends["B"]
if aEnd.IsZero() || bStart.IsZero() || bEnd.IsZero() || cStart.IsZero() {
t.Fatalf("missing timestamps, starts=%v ends=%v", starts, ends)
}
if !aEnd.Before(bStart) && !aEnd.Equal(bStart) {
t.Fatalf("B should start after A ends: A_end=%v B_start=%v", aEnd, bStart)
}
if !bEnd.Before(cStart) && !bEnd.Equal(cStart) {
t.Fatalf("C should start after B ends: B_end=%v C_start=%v", bEnd, cStart)
}
dStart := starts["D"]
eStart := starts["E"]
if dStart.IsZero() || eStart.IsZero() {
t.Fatalf("missing D/E start times: %v", starts)
}
delta := dStart.Sub(eStart)
if delta < 0 {
delta = -delta
}
if delta > 25*time.Millisecond {
t.Fatalf("D and E should run in parallel, delta=%v", delta)
}
if maxParallel < 2 {
t.Fatalf("expected at least 2 concurrent tasks, got %d", maxParallel)
}
}
func TestParallelCycleDetectionStopsExecution(t *testing.T) {
defer resetTestHooks()
origRun := runCodexTaskFn
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
t.Fatalf("task %s should not execute on cycle", task.ID)
return TaskResult{}
}
t.Cleanup(func() {
runCodexTaskFn = origRun
resetTestHooks()
})
input := `---TASK---
id: A
dependencies: B
---CONTENT---
a
---TASK---
id: B
dependencies: A
---CONTENT---
b`
stdinReader = bytes.NewReader([]byte(input))
os.Args = []string{"codex-wrapper", "--parallel"}
exitCode := 0
output := captureStdout(t, func() {
exitCode = run()
})
if exitCode == 0 {
t.Fatalf("cycle should cause non-zero exit, got %d", exitCode)
}
if strings.TrimSpace(output) != "" {
t.Fatalf("expected no JSON output on cycle, got %q", output)
}
}
func TestParallelPartialFailureBlocksDependents(t *testing.T) {
defer resetTestHooks()
origRun := runCodexTaskFn
t.Cleanup(func() {
runCodexTaskFn = origRun
resetTestHooks()
})
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
if task.ID == "A" {
return TaskResult{TaskID: "A", ExitCode: 2, Error: "boom"}
}
return TaskResult{TaskID: task.ID, ExitCode: 0, Message: task.Task}
}
input := `---TASK---
id: A
---CONTENT---
fail
---TASK---
id: B
dependencies: A
---CONTENT---
blocked
---TASK---
id: D
---CONTENT---
ok-d
---TASK---
id: E
---CONTENT---
ok-e`
stdinReader = bytes.NewReader([]byte(input))
os.Args = []string{"codex-wrapper", "--parallel"}
var exitCode int
output := captureStdout(t, func() {
exitCode = run()
})
payload := parseIntegrationOutput(t, output)
if exitCode == 0 {
t.Fatalf("expected non-zero exit when a task fails, got %d", exitCode)
}
resA := findResultByID(t, payload, "A")
resB := findResultByID(t, payload, "B")
resD := findResultByID(t, payload, "D")
resE := findResultByID(t, payload, "E")
if resA.ExitCode == 0 {
t.Fatalf("task A should fail, got %+v", resA)
}
if resB.ExitCode == 0 || !strings.Contains(resB.Error, "dependencies") {
t.Fatalf("task B should be skipped due to dependency failure, got %+v", resB)
}
if resD.ExitCode != 0 || resE.ExitCode != 0 {
t.Fatalf("independent tasks should run successfully, D=%+v E=%+v", resD, resE)
}
if payload.Summary.Failed != 2 || payload.Summary.Total != 4 {
t.Fatalf("unexpected summary after partial failure: %+v", payload.Summary)
}
}
func TestParallelTimeoutPropagation(t *testing.T) {
defer resetTestHooks()
origRun := runCodexTaskFn
t.Cleanup(func() {
runCodexTaskFn = origRun
resetTestHooks()
os.Unsetenv("CODEX_TIMEOUT")
})
var receivedTimeout int
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
receivedTimeout = timeout
return TaskResult{TaskID: task.ID, ExitCode: 124, Error: "timeout"}
}
os.Setenv("CODEX_TIMEOUT", "1")
input := `---TASK---
id: T
---CONTENT---
slow`
stdinReader = bytes.NewReader([]byte(input))
os.Args = []string{"codex-wrapper", "--parallel"}
exitCode := 0
output := captureStdout(t, func() {
exitCode = run()
})
payload := parseIntegrationOutput(t, output)
if receivedTimeout != 1 {
t.Fatalf("expected timeout 1s to propagate, got %d", receivedTimeout)
}
if exitCode != 124 {
t.Fatalf("expected timeout exit code 124, got %d", exitCode)
}
if payload.Summary.Failed != 1 || payload.Summary.Total != 1 {
t.Fatalf("unexpected summary for timeout case: %+v", payload.Summary)
}
res := findResultByID(t, payload, "T")
if res.Error == "" || res.ExitCode != 124 {
t.Fatalf("timeout result not propagated, got %+v", res)
}
}
func TestConcurrentSpeedupBenchmark(t *testing.T) {
defer resetTestHooks()
origRun := runCodexTaskFn
t.Cleanup(func() {
runCodexTaskFn = origRun
resetTestHooks()
})
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
time.Sleep(50 * time.Millisecond)
return TaskResult{TaskID: task.ID}
}
tasks := make([]TaskSpec, 10)
for i := range tasks {
tasks[i] = TaskSpec{ID: fmt.Sprintf("task-%d", i)}
}
layers := [][]TaskSpec{tasks}
serialStart := time.Now()
for _, task := range tasks {
_ = runCodexTaskFn(task, 5)
}
serialElapsed := time.Since(serialStart)
concurrentStart := time.Now()
_ = executeConcurrent(layers, 5)
concurrentElapsed := time.Since(concurrentStart)
if concurrentElapsed >= serialElapsed/5 {
t.Fatalf("expected concurrent time <20%% of serial, serial=%v concurrent=%v", serialElapsed, concurrentElapsed)
}
ratio := float64(concurrentElapsed) / float64(serialElapsed)
t.Logf("speedup ratio (concurrent/serial)=%.3f", ratio)
}

File diff suppressed because it is too large Load Diff