diff --git a/codeagent-wrapper/config.go b/codeagent-wrapper/config.go index d64cd5c..078fcdb 100644 --- a/codeagent-wrapper/config.go +++ b/codeagent-wrapper/config.go @@ -41,6 +41,7 @@ type TaskResult struct { Message string `json:"message"` SessionID string `json:"session_id"` Error string `json:"error"` + LogPath string `json:"log_path"` } var backendRegistry = map[string]Backend{ diff --git a/codeagent-wrapper/executor.go b/codeagent-wrapper/executor.go index cc7e4ca..1c86a20 100644 --- a/codeagent-wrapper/executor.go +++ b/codeagent-wrapper/executor.go @@ -11,10 +11,105 @@ import ( "sort" "strings" "sync" + "sync/atomic" "syscall" "time" ) +// commandRunner abstracts exec.Cmd for testability +type commandRunner interface { + Start() error + Wait() error + StdoutPipe() (io.ReadCloser, error) + StdinPipe() (io.WriteCloser, error) + SetStderr(io.Writer) + Process() processHandle +} + +// processHandle abstracts os.Process for testability +type processHandle interface { + Pid() int + Kill() error + Signal(os.Signal) error +} + +// realCmd implements commandRunner using exec.Cmd +type realCmd struct { + cmd *exec.Cmd +} + +func (r *realCmd) Start() error { + if r.cmd == nil { + return errors.New("command is nil") + } + return r.cmd.Start() +} + +func (r *realCmd) Wait() error { + if r.cmd == nil { + return errors.New("command is nil") + } + return r.cmd.Wait() +} + +func (r *realCmd) StdoutPipe() (io.ReadCloser, error) { + if r.cmd == nil { + return nil, errors.New("command is nil") + } + return r.cmd.StdoutPipe() +} + +func (r *realCmd) StdinPipe() (io.WriteCloser, error) { + if r.cmd == nil { + return nil, errors.New("command is nil") + } + return r.cmd.StdinPipe() +} + +func (r *realCmd) SetStderr(w io.Writer) { + if r.cmd != nil { + r.cmd.Stderr = w + } +} + +func (r *realCmd) Process() processHandle { + if r == nil || r.cmd == nil || r.cmd.Process == nil { + return nil + } + return &realProcess{proc: r.cmd.Process} +} + +// realProcess implements processHandle using os.Process +type realProcess struct { + proc *os.Process +} + +func (p *realProcess) Pid() int { + if p == nil || p.proc == nil { + return 0 + } + return p.proc.Pid +} + +func (p *realProcess) Kill() error { + if p == nil || p.proc == nil { + return nil + } + return p.proc.Kill() +} + +func (p *realProcess) Signal(sig os.Signal) error { + if p == nil || p.proc == nil { + return nil + } + return p.proc.Signal(sig) +} + +// newCommandRunner creates a new commandRunner (test hook injection point) +var newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { + return &realCmd{cmd: commandContext(ctx, name, args...)} +} + type parseResult struct { message string threadID string @@ -196,6 +291,9 @@ func generateFinalOutput(results []TaskResult) string { if res.SessionID != "" { sb.WriteString(fmt.Sprintf("Session: %s\n", res.SessionID)) } + if res.LogPath != "" { + sb.WriteString(fmt.Sprintf("Log: %s\n", res.LogPath)) + } if res.Message != "" { sb.WriteString(fmt.Sprintf("\n%s\n", res.Message)) } @@ -335,7 +433,7 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo return fmt.Sprintf("%s; stderr: %s", msg, stderrBuf.String()) } - cmd := commandContext(ctx, codexCommand, codexArgs...) + cmd := newCommandRunner(ctx, codexCommand, codexArgs...) stderrWriters := []io.Writer{stderrBuf} if stderrLogger != nil { @@ -345,9 +443,9 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo stderrWriters = append([]io.Writer{os.Stderr}, stderrWriters...) } if len(stderrWriters) == 1 { - cmd.Stderr = stderrWriters[0] + cmd.SetStderr(stderrWriters[0]) } else { - cmd.Stderr = io.MultiWriter(stderrWriters...) + cmd.SetStderr(io.MultiWriter(stderrWriters...)) } var stdinPipe io.WriteCloser @@ -391,7 +489,7 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo return result } - logInfoFn(fmt.Sprintf("Starting %s with PID: %d", codexCommand, cmd.Process.Pid)) + logInfoFn(fmt.Sprintf("Starting %s with PID: %d", codexCommand, cmd.Process().Pid())) if logger := activeLogger(); logger != nil { logInfoFn(fmt.Sprintf("Log capturing to: %s", logger.Path())) } @@ -475,11 +573,14 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo result.ExitCode = 0 result.Message = message result.SessionID = threadID + if logger := activeLogger(); logger != nil { + result.LogPath = logger.Path() + } return result } -func forwardSignals(ctx context.Context, cmd *exec.Cmd, logErrorFn func(string)) { +func forwardSignals(ctx context.Context, cmd commandRunner, logErrorFn func(string)) { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) @@ -488,11 +589,11 @@ func forwardSignals(ctx context.Context, cmd *exec.Cmd, logErrorFn func(string)) select { case sig := <-sigCh: logErrorFn(fmt.Sprintf("Received signal: %v", sig)) - if cmd.Process != nil { - _ = cmd.Process.Signal(syscall.SIGTERM) - time.AfterFunc(time.Duration(forceKillDelay)*time.Second, func() { - if cmd.Process != nil { - _ = cmd.Process.Kill() + if proc := cmd.Process(); proc != nil { + _ = proc.Signal(syscall.SIGTERM) + time.AfterFunc(time.Duration(forceKillDelay.Load())*time.Second, func() { + if p := cmd.Process(); p != nil { + _ = p.Kill() } }) } @@ -513,16 +614,60 @@ func cancelReason(ctx context.Context) string { return "Execution cancelled, terminating codex process" } -func terminateProcess(cmd *exec.Cmd) *time.Timer { - if cmd == nil || cmd.Process == nil { +type forceKillTimer struct { + timer *time.Timer + done chan struct{} + stopped atomic.Bool + drained atomic.Bool +} + +func (t *forceKillTimer) Stop() { + if t == nil || t.timer == nil { + return + } + if !t.timer.Stop() { + <-t.done + t.drained.Store(true) + } + t.stopped.Store(true) +} + +func terminateCommand(cmd commandRunner) *forceKillTimer { + if cmd == nil { + return nil + } + proc := cmd.Process() + if proc == nil { return nil } - _ = cmd.Process.Signal(syscall.SIGTERM) + _ = proc.Signal(syscall.SIGTERM) - return time.AfterFunc(time.Duration(forceKillDelay)*time.Second, func() { - if cmd.Process != nil { - _ = cmd.Process.Kill() + done := make(chan struct{}, 1) + timer := time.AfterFunc(time.Duration(forceKillDelay.Load())*time.Second, func() { + if p := cmd.Process(); p != nil { + _ = p.Kill() + } + close(done) + }) + + return &forceKillTimer{timer: timer, done: done} +} + +func terminateProcess(cmd commandRunner) *time.Timer { + if cmd == nil { + return nil + } + proc := cmd.Process() + if proc == nil { + return nil + } + + _ = proc.Signal(syscall.SIGTERM) + + return time.AfterFunc(time.Duration(forceKillDelay.Load())*time.Second, func() { + if p := cmd.Process(); p != nil { + _ = p.Kill() } }) } diff --git a/codeagent-wrapper/logger_test.go b/codeagent-wrapper/logger_test.go index 2e5406c..74a5ea2 100644 --- a/codeagent-wrapper/logger_test.go +++ b/codeagent-wrapper/logger_test.go @@ -171,7 +171,7 @@ func TestRunLoggerTerminateProcessActive(t *testing.T) { t.Skipf("cannot start sleep command: %v", err) } - timer := terminateProcess(cmd) + timer := terminateProcess(&realCmd{cmd: cmd}) if timer == nil { t.Fatalf("terminateProcess returned nil timer for active process") } @@ -197,7 +197,7 @@ func TestRunTerminateProcessNil(t *testing.T) { if timer := terminateProcess(nil); timer != nil { t.Fatalf("terminateProcess(nil) should return nil timer") } - if timer := terminateProcess(&exec.Cmd{}); timer != nil { + if timer := terminateProcess(&realCmd{cmd: &exec.Cmd{}}); timer != nil { t.Fatalf("terminateProcess with nil process should return nil timer") } } diff --git a/codeagent-wrapper/main.go b/codeagent-wrapper/main.go index 18c72ed..a6e8252 100644 --- a/codeagent-wrapper/main.go +++ b/codeagent-wrapper/main.go @@ -6,8 +6,10 @@ import ( "io" "os" "os/exec" + "os/signal" "strings" "sync/atomic" + "time" ) const ( @@ -19,6 +21,12 @@ const ( stderrCaptureLimit = 4 * 1024 defaultBackendName = "codex" wrapperName = "codeagent-wrapper" + + // stdout close reasons + stdoutCloseReasonWait = "wait-done" + stdoutCloseReasonDrain = "drain-timeout" + stdoutCloseReasonCtx = "context-cancel" + stdoutDrainTimeout = 100 * time.Millisecond ) // Test hooks for dependency injection @@ -29,13 +37,68 @@ var ( cleanupHook func() loggerPtr atomic.Pointer[Logger] - buildCodexArgsFn = buildCodexArgs - selectBackendFn = selectBackend - commandContext = exec.CommandContext - jsonMarshal = json.Marshal - forceKillDelay = 5 // seconds - made variable for testability + buildCodexArgsFn = buildCodexArgs + selectBackendFn = selectBackend + commandContext = exec.CommandContext + jsonMarshal = json.Marshal + cleanupLogsFn = cleanupOldLogs + signalNotifyFn = signal.Notify + signalStopFn = signal.Stop + terminateCommandFn = terminateCommand ) +var forceKillDelay atomic.Int32 + +func init() { + forceKillDelay.Store(5) // seconds - default value +} + +func runStartupCleanup() { + if cleanupLogsFn == nil { + return + } + defer func() { + if r := recover(); r != nil { + logWarn(fmt.Sprintf("cleanupOldLogs panic: %v", r)) + } + }() + if _, err := cleanupLogsFn(); err != nil { + logWarn(fmt.Sprintf("cleanupOldLogs error: %v", err)) + } +} + +func runCleanupMode() int { + if cleanupLogsFn == nil { + fmt.Fprintln(os.Stderr, "Cleanup failed: log cleanup function not configured") + return 1 + } + + stats, err := cleanupLogsFn() + if err != nil { + fmt.Fprintf(os.Stderr, "Cleanup failed: %v\n", err) + return 1 + } + + fmt.Println("Cleanup completed") + fmt.Printf("Files scanned: %d\n", stats.Scanned) + fmt.Printf("Files deleted: %d\n", stats.Deleted) + if len(stats.DeletedFiles) > 0 { + for _, f := range stats.DeletedFiles { + fmt.Printf(" - %s\n", f) + } + } + fmt.Printf("Files kept: %d\n", stats.Kept) + if len(stats.KeptFiles) > 0 { + for _, f := range stats.KeptFiles { + fmt.Printf(" - %s\n", f) + } + } + if stats.Errors > 0 { + fmt.Printf("Deletion errors: %d\n", stats.Errors) + } + return 0 +} + func main() { exitCode := run() os.Exit(exitCode) @@ -52,6 +115,8 @@ func run() (exitCode int) { case "--help", "-h": printHelp() return 0 + case "--cleanup": + return runCleanupMode() } } diff --git a/codeagent-wrapper/main_test.go b/codeagent-wrapper/main_test.go index 2d1344f..f72f00c 100644 --- a/codeagent-wrapper/main_test.go +++ b/codeagent-wrapper/main_test.go @@ -2002,7 +2002,7 @@ func TestRunCodexTask_SignalHandling(t *testing.T) { func TestForwardSignals_ContextCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - forwardSignals(ctx, &exec.Cmd{}, func(string) {}) + forwardSignals(ctx, &realCmd{cmd: &exec.Cmd{}}, func(string) {}) cancel() time.Sleep(10 * time.Millisecond) } @@ -3070,13 +3070,13 @@ func TestRunForwardSignals(t *testing.T) { t.Skip("sleep command not available on Windows") } - cmd := exec.Command("sleep", "5") - if err := cmd.Start(); err != nil { + execCmd := exec.Command("sleep", "5") + if err := execCmd.Start(); err != nil { t.Skipf("unable to start sleep command: %v", err) } defer func() { - _ = cmd.Process.Kill() - cmd.Wait() + _ = execCmd.Process.Kill() + execCmd.Wait() }() ctx, cancel := context.WithCancel(context.Background()) @@ -3099,6 +3099,7 @@ func TestRunForwardSignals(t *testing.T) { var mu sync.Mutex var logs []string + cmd := &realCmd{cmd: execCmd} forwardSignals(ctx, cmd, func(msg string) { mu.Lock() defer mu.Unlock()