diff --git a/codeagent-wrapper/codeagent-wrapper.test b/codeagent-wrapper/codeagent-wrapper.test new file mode 100755 index 0000000..f9217d6 Binary files /dev/null and b/codeagent-wrapper/codeagent-wrapper.test differ diff --git a/codeagent-wrapper/executor.go b/codeagent-wrapper/executor.go index 1c86a20..20b23e0 100644 --- a/codeagent-wrapper/executor.go +++ b/codeagent-wrapper/executor.go @@ -205,6 +205,27 @@ func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult { failed := make(map[string]TaskResult, totalTasks) resultsCh := make(chan TaskResult, totalTasks) + var startPrintMu sync.Mutex + bannerPrinted := false + + printTaskStart := func(taskID string) { + logger := activeLogger() + if logger == nil { + return + } + path := logger.Path() + if path == "" { + return + } + startPrintMu.Lock() + if !bannerPrinted { + fmt.Fprintln(os.Stderr, "=== Starting Parallel Execution ===") + bannerPrinted = true + } + fmt.Fprintf(os.Stderr, "Task %s: Log: %s\n", taskID, path) + startPrintMu.Unlock() + } + for _, layer := range layers { var wg sync.WaitGroup executed := 0 @@ -226,6 +247,7 @@ func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult { resultsCh <- TaskResult{TaskID: ts.ID, ExitCode: 1, Error: fmt.Sprintf("panic: %v", r)} } }() + printTaskStart(ts.ID) resultsCh <- runCodexTaskFn(ts, timeout) }(task) } @@ -334,6 +356,14 @@ func runCodexProcess(parentCtx context.Context, codexArgs []string, taskText str func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, customArgs []string, useCustomArgs bool, silent bool, timeoutSec int) TaskResult { result := TaskResult{TaskID: taskSpec.ID} + setLogPath := func() { + if result.LogPath != "" { + return + } + if logger := activeLogger(); logger != nil { + result.LogPath = logger.Path() + } + } cfg := &Config{ Mode: taskSpec.Mode, @@ -413,6 +443,10 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo _ = closeLogger() } }() + defer setLogPath() + if logger := activeLogger(); logger != nil { + result.LogPath = logger.Path() + } if !silent { stdoutLogger = newLogWriter("CODEX_STDOUT: ", codexLogLineLimit) @@ -506,20 +540,28 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo waitCh := make(chan error, 1) go func() { waitCh <- cmd.Wait() }() + messageSeen := make(chan struct{}, 1) parseCh := make(chan parseResult, 1) go func() { - msg, tid := parseJSONStreamWithLog(stdoutReader, logWarnFn, logInfoFn) + msg, tid := parseJSONStreamInternal(stdoutReader, logWarnFn, logInfoFn, func() { + select { + case messageSeen <- struct{}{}: + default: + } + }) parseCh <- parseResult{message: msg, threadID: tid} }() var waitErr error - var forceKillTimer *time.Timer + var forceKillTimer *forceKillTimer + var ctxCancelled bool select { case waitErr = <-waitCh: case <-ctx.Done(): + ctxCancelled = true logErrorFn(cancelReason(ctx)) - forceKillTimer = terminateProcess(cmd) + forceKillTimer = terminateCommandFn(cmd) waitErr = <-waitCh } @@ -527,7 +569,25 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo forceKillTimer.Stop() } - parsed := <-parseCh + var parsed parseResult + if ctxCancelled { + closeWithReason(stdout, stdoutCloseReasonCtx) + parsed = <-parseCh + } else { + drainTimer := time.NewTimer(stdoutDrainTimeout) + defer drainTimer.Stop() + + select { + case parsed = <-parseCh: + closeWithReason(stdout, stdoutCloseReasonWait) + case <-messageSeen: + closeWithReason(stdout, stdoutCloseReasonWait) + parsed = <-parseCh + case <-drainTimer.C: + closeWithReason(stdout, stdoutCloseReasonDrain) + parsed = <-parseCh + } + } if ctxErr := ctx.Err(); ctxErr != nil { if errors.Is(ctxErr, context.DeadlineExceeded) { @@ -582,10 +642,14 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo func forwardSignals(ctx context.Context, cmd commandRunner, logErrorFn func(string)) { sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + if signalNotifyFn != nil { + signalNotifyFn(sigCh, syscall.SIGINT, syscall.SIGTERM) + } go func() { - defer signal.Stop(sigCh) + if signalStopFn != nil { + defer signalStopFn(sigCh) + } select { case sig := <-sigCh: logErrorFn(fmt.Sprintf("Received signal: %v", sig)) @@ -614,6 +678,21 @@ func cancelReason(ctx context.Context) string { return "Execution cancelled, terminating codex process" } +type stdoutReasonCloser interface { + CloseWithReason(string) error +} + +func closeWithReason(rc io.ReadCloser, reason string) { + if rc == nil { + return + } + if c, ok := rc.(stdoutReasonCloser); ok { + _ = c.CloseWithReason(reason) + return + } + _ = rc.Close() +} + type forceKillTimer struct { timer *time.Timer done chan struct{} diff --git a/codeagent-wrapper/logger.go b/codeagent-wrapper/logger.go index c7102c6..9c638bf 100644 --- a/codeagent-wrapper/logger.go +++ b/codeagent-wrapper/logger.go @@ -64,7 +64,7 @@ func NewLogger() (*Logger, error) { // NewLoggerWithSuffix creates a logger with an optional suffix in the filename. // Useful for tests that need isolated log files within the same process. func NewLoggerWithSuffix(suffix string) (*Logger, error) { - filename := fmt.Sprintf("codeagent-wrapper-%d", os.Getpid()) + filename := fmt.Sprintf("%s-%d", primaryLogPrefix(), os.Getpid()) if suffix != "" { filename += "-" + suffix } @@ -156,7 +156,7 @@ func (l *Logger) Close() error { } // Log file is kept for debugging - NOT removed - // Users can manually clean up /tmp/codeagent-wrapper-*.log files + // Users can manually clean up /tmp/-*.log files }) return closeErr @@ -246,16 +246,16 @@ func (l *Logger) run() { defer ticker.Stop() for { - select { - case entry, ok := <-l.ch: - if !ok { - // Channel closed, final flush - _ = l.writer.Flush() - return - } - timestamp := time.Now().Format("2006-01-02 15:04:05.000") - pid := os.Getpid() - fmt.Fprintf(l.writer, "[%s] [PID:%d] %s: %s\n", timestamp, pid, entry.level, entry.msg) + select { + case entry, ok := <-l.ch: + if !ok { + // Channel closed, final flush + _ = l.writer.Flush() + return + } + timestamp := time.Now().Format("2006-01-02 15:04:05.000") + pid := os.Getpid() + fmt.Fprintf(l.writer, "[%s] [PID:%d] %s: %s\n", timestamp, pid, entry.level, entry.msg) l.pendingWG.Done() case <-ticker.C: @@ -270,7 +270,7 @@ func (l *Logger) run() { } } -// cleanupOldLogs scans os.TempDir() for codex-wrapper-*.log files and removes those +// cleanupOldLogs scans os.TempDir() for wrapper log files and removes those // whose owning process is no longer running (i.e., orphaned logs). // It includes safety checks for: // - PID reuse: Compares file modification time with process start time @@ -278,12 +278,28 @@ func (l *Logger) run() { func cleanupOldLogs() (CleanupStats, error) { var stats CleanupStats tempDir := os.TempDir() - pattern := filepath.Join(tempDir, "codex-wrapper-*.log") - matches, err := globLogFiles(pattern) - if err != nil { - logWarn(fmt.Sprintf("cleanupOldLogs: failed to list logs: %v", err)) - return stats, fmt.Errorf("cleanupOldLogs: %w", err) + prefixes := logPrefixes() + if len(prefixes) == 0 { + prefixes = []string{defaultWrapperName} + } + + seen := make(map[string]struct{}) + var matches []string + for _, prefix := range prefixes { + pattern := filepath.Join(tempDir, fmt.Sprintf("%s-*.log", prefix)) + found, err := globLogFiles(pattern) + if err != nil { + logWarn(fmt.Sprintf("cleanupOldLogs: failed to list logs: %v", err)) + return stats, fmt.Errorf("cleanupOldLogs: %w", err) + } + for _, path := range found { + if _, ok := seen[path]; ok { + continue + } + seen[path] = struct{}{} + matches = append(matches, path) + } } var removeErr error @@ -428,28 +444,37 @@ func isPIDReused(logPath string, pid int) bool { func parsePIDFromLog(path string) (int, bool) { name := filepath.Base(path) - if !strings.HasPrefix(name, "codex-wrapper-") || !strings.HasSuffix(name, ".log") { - return 0, false + prefixes := logPrefixes() + if len(prefixes) == 0 { + prefixes = []string{defaultWrapperName} } - core := strings.TrimSuffix(strings.TrimPrefix(name, "codex-wrapper-"), ".log") - if core == "" { - return 0, false + for _, prefix := range prefixes { + prefixWithDash := fmt.Sprintf("%s-", prefix) + if !strings.HasPrefix(name, prefixWithDash) || !strings.HasSuffix(name, ".log") { + continue + } + + core := strings.TrimSuffix(strings.TrimPrefix(name, prefixWithDash), ".log") + if core == "" { + continue + } + + pidPart := core + if idx := strings.IndexRune(core, '-'); idx != -1 { + pidPart = core[:idx] + } + + if pidPart == "" { + continue + } + + pid, err := strconv.Atoi(pidPart) + if err != nil || pid <= 0 { + continue + } + return pid, true } - pidPart := core - if idx := strings.IndexRune(core, '-'); idx != -1 { - pidPart = core[:idx] - } - - if pidPart == "" { - return 0, false - } - - pid, err := strconv.Atoi(pidPart) - if err != nil || pid <= 0 { - return 0, false - } - - return pid, true + return 0, false } diff --git a/codeagent-wrapper/main.go b/codeagent-wrapper/main.go index a6e8252..1f26c3b 100644 --- a/codeagent-wrapper/main.go +++ b/codeagent-wrapper/main.go @@ -7,20 +7,21 @@ import ( "os" "os/exec" "os/signal" + "reflect" "strings" "sync/atomic" "time" ) const ( - version = "5.0.0" - defaultWorkdir = "." - defaultTimeout = 7200 // seconds - codexLogLineLimit = 1000 - stdinSpecialChars = "\n\\\"'`$" - stderrCaptureLimit = 4 * 1024 - defaultBackendName = "codex" - wrapperName = "codeagent-wrapper" + version = "5.0.0" + defaultWorkdir = "." + defaultTimeout = 7200 // seconds + codexLogLineLimit = 1000 + stdinSpecialChars = "\n\\\"'`$" + stderrCaptureLimit = 4 * 1024 + defaultBackendName = "codex" + defaultCodexCommand = "codex" // stdout close reasons stdoutCloseReasonWait = "wait-done" @@ -33,7 +34,7 @@ const ( var ( stdinReader io.Reader = os.Stdin isTerminalFn = defaultIsTerminal - codexCommand = "codex" + codexCommand = defaultCodexCommand cleanupHook func() loggerPtr atomic.Pointer[Logger] @@ -45,6 +46,7 @@ var ( signalNotifyFn = signal.Notify signalStopFn = signal.Stop terminateCommandFn = terminateCommand + defaultBuildArgsFn = buildCodexArgs ) var forceKillDelay atomic.Int32 @@ -106,11 +108,12 @@ func main() { // run is the main logic, returns exit code for testability func run() (exitCode int) { + name := currentWrapperName() // Handle --version and --help first (no logger needed) if len(os.Args) > 1 { switch os.Args[1] { case "--version", "-v": - fmt.Printf("%s version %s\n", wrapperName, version) + fmt.Printf("%s version %s\n", name, version) return 0 case "--help", "-h": printHelp() @@ -145,6 +148,9 @@ func run() (exitCode int) { }() defer runCleanupHook() + // Clean up stale logs from previous runs. + runStartupCleanup() + // Handle remaining commands if len(os.Args) > 1 { switch os.Args[1] { @@ -152,9 +158,9 @@ func run() (exitCode int) { if len(os.Args) > 2 { fmt.Fprintln(os.Stderr, "ERROR: --parallel reads its task configuration from stdin and does not accept additional arguments.") fmt.Fprintln(os.Stderr, "Usage examples:") - fmt.Fprintf(os.Stderr, " %s --parallel < tasks.txt\n", wrapperName) - fmt.Fprintf(os.Stderr, " echo '...' | %s --parallel\n", wrapperName) - fmt.Fprintf(os.Stderr, " %s --parallel <<'EOF'\n", wrapperName) + fmt.Fprintf(os.Stderr, " %s --parallel < tasks.txt\n", name) + fmt.Fprintf(os.Stderr, " echo '...' | %s --parallel\n", name) + fmt.Fprintf(os.Stderr, " %s --parallel <<'EOF'\n", name) return 1 } data, err := io.ReadAll(stdinReader) @@ -204,10 +210,19 @@ func run() (exitCode int) { logError(err.Error()) return 1 } - // Wire selected backend into runtime hooks for the rest of the execution. - codexCommand = backend.Command() - buildCodexArgsFn = backend.BuildArgs cfg.Backend = backend.Name() + + cmdInjected := codexCommand != defaultCodexCommand + argsInjected := buildCodexArgsFn != nil && reflect.ValueOf(buildCodexArgsFn).Pointer() != reflect.ValueOf(defaultBuildArgsFn).Pointer() + + // Wire selected backend into runtime hooks for the rest of the execution, + // but preserve any injected test hooks for the default backend. + if backend.Name() != defaultBackendName || !cmdInjected { + codexCommand = backend.Command() + } + if backend.Name() != defaultBackendName || !argsInjected { + buildCodexArgsFn = backend.BuildArgs + } logInfo(fmt.Sprintf("Selected backend: %s", backend.Name())) timeoutSec := resolveTimeout() @@ -253,7 +268,7 @@ func run() (exitCode int) { codexArgs := buildCodexArgsFn(cfg, targetArg) // Print startup information to stderr - fmt.Fprintf(os.Stderr, "[%s]\n", wrapperName) + fmt.Fprintf(os.Stderr, "[%s]\n", name) fmt.Fprintf(os.Stderr, " Backend: %s\n", cfg.Backend) fmt.Fprintf(os.Stderr, " Command: %s %s\n", codexCommand, strings.Join(codexArgs, " ")) fmt.Fprintf(os.Stderr, " PID: %d\n", os.Getpid()) @@ -361,22 +376,23 @@ func runCleanupHook() { } func printHelp() { - help := `codeagent-wrapper - Go wrapper for AI CLI backends + name := currentWrapperName() + help := fmt.Sprintf(`%[1]s - Go wrapper for AI CLI backends Usage: - codeagent-wrapper "task" [workdir] - codeagent-wrapper --backend claude "task" [workdir] - codeagent-wrapper - [workdir] Read task from stdin - codeagent-wrapper resume "task" [workdir] - codeagent-wrapper resume - [workdir] - codeagent-wrapper --parallel Run tasks in parallel (config from stdin) - codeagent-wrapper --version - codeagent-wrapper --help + %[1]s "task" [workdir] + %[1]s --backend claude "task" [workdir] + %[1]s - [workdir] Read task from stdin + %[1]s resume "task" [workdir] + %[1]s resume - [workdir] + %[1]s --parallel Run tasks in parallel (config from stdin) + %[1]s --version + %[1]s --help Parallel mode examples: - codeagent-wrapper --parallel < tasks.txt - echo '...' | codeagent-wrapper --parallel - codeagent-wrapper --parallel <<'EOF' + %[1]s --parallel < tasks.txt + echo '...' | %[1]s --parallel + %[1]s --parallel <<'EOF' Environment Variables: CODEX_TIMEOUT Timeout in milliseconds (default: 7200000) @@ -387,6 +403,6 @@ Exit Codes: 124 Timeout 127 backend command not found 130 Interrupted (Ctrl+C) - * Passthrough from backend process` + * Passthrough from backend process`, name) fmt.Println(help) } diff --git a/codeagent-wrapper/parser.go b/codeagent-wrapper/parser.go index 1e794da..7f97ff3 100644 --- a/codeagent-wrapper/parser.go +++ b/codeagent-wrapper/parser.go @@ -50,6 +50,10 @@ func parseJSONStreamWithWarn(r io.Reader, warnFn func(string)) (message, threadI } func parseJSONStreamWithLog(r io.Reader, warnFn func(string), infoFn func(string)) (message, threadID string) { + return parseJSONStreamInternal(r, warnFn, infoFn, nil) +} + +func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(string), onMessage func()) (message, threadID string) { scanner := bufio.NewScanner(r) scanner.Buffer(make([]byte, 64*1024), 10*1024*1024) @@ -60,6 +64,12 @@ func parseJSONStreamWithLog(r io.Reader, warnFn func(string), infoFn func(string infoFn = func(string) {} } + notifyMessage := func() { + if onMessage != nil { + onMessage() + } + } + totalEvents := 0 var ( @@ -133,6 +143,7 @@ func parseJSONStreamWithLog(r io.Reader, warnFn func(string), infoFn func(string infoFn(fmt.Sprintf("item.completed event item_type=%s message_len=%d", itemType, len(normalized))) if event.Item != nil && event.Item.Type == "agent_message" && normalized != "" { codexMessage = normalized + notifyMessage() } } @@ -151,6 +162,7 @@ func parseJSONStreamWithLog(r io.Reader, warnFn func(string), infoFn func(string if event.Result != "" { claudeMessage = event.Result + notifyMessage() } case hasKey(raw, "role") || hasKey(raw, "delta"): @@ -166,6 +178,7 @@ func parseJSONStreamWithLog(r io.Reader, warnFn func(string), infoFn func(string if event.Content != "" { geminiBuffer.WriteString(event.Content) + notifyMessage() } infoFn(fmt.Sprintf("Parsed Gemini event #%d type=%s role=%s delta=%t status=%s content_len=%d", totalEvents, event.Type, event.Role, event.Delta, event.Status, len(event.Content))) diff --git a/codeagent-wrapper/wrapper_name.go b/codeagent-wrapper/wrapper_name.go new file mode 100644 index 0000000..01e74d3 --- /dev/null +++ b/codeagent-wrapper/wrapper_name.go @@ -0,0 +1,60 @@ +package main + +import ( + "os" + "path/filepath" + "strings" +) + +const ( + defaultWrapperName = "codeagent-wrapper" + legacyWrapperName = "codex-wrapper" +) + +// currentWrapperName resolves the wrapper name based on the invoked binary. +// Only known names are honored to avoid leaking build/test binary names into logs. +func currentWrapperName() string { + if len(os.Args) == 0 { + return defaultWrapperName + } + + base := filepath.Base(os.Args[0]) + base = strings.TrimSuffix(base, ".exe") // tolerate Windows executables + + switch base { + case defaultWrapperName, legacyWrapperName: + return base + default: + return defaultWrapperName + } +} + +// logPrefixes returns the set of accepted log name prefixes, including the +// current wrapper name and legacy aliases. +func logPrefixes() []string { + prefixes := []string{currentWrapperName(), defaultWrapperName, legacyWrapperName} + seen := make(map[string]struct{}, len(prefixes)) + var unique []string + for _, prefix := range prefixes { + if prefix == "" { + continue + } + if _, ok := seen[prefix]; ok { + continue + } + seen[prefix] = struct{}{} + unique = append(unique, prefix) + } + return unique +} + +// primaryLogPrefix returns the preferred filename prefix for log files. +// Defaults to the current wrapper name when available, otherwise falls back +// to the canonical default name. +func primaryLogPrefix() string { + prefixes := logPrefixes() + if len(prefixes) == 0 { + return defaultWrapperName + } + return prefixes[0] +}