mirror of
https://github.com/cexll/myclaude.git
synced 2026-02-10 03:14:32 +08:00
Compare commits
6 Commits
freespace8
...
fix/parall
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8f3941adae | ||
|
|
18c6c32628 | ||
|
|
1ad2cfe629 | ||
|
|
7bad716fbc | ||
|
|
220be6eb5c | ||
|
|
ead11d6996 |
4
codex-wrapper/.gitignore
vendored
4
codex-wrapper/.gitignore
vendored
@@ -1 +1,5 @@
|
|||||||
coverage.out
|
coverage.out
|
||||||
|
coverage*.out
|
||||||
|
cover.out
|
||||||
|
cover_*.out
|
||||||
|
coverage.html
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ type Logger struct {
|
|||||||
closeOnce sync.Once
|
closeOnce sync.Once
|
||||||
workerWG sync.WaitGroup
|
workerWG sync.WaitGroup
|
||||||
pendingWG sync.WaitGroup
|
pendingWG sync.WaitGroup
|
||||||
|
flushMu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type logEntry struct {
|
type logEntry struct {
|
||||||
@@ -46,12 +47,12 @@ type CleanupStats struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
processRunningCheck = isProcessRunning
|
processRunningCheck = isProcessRunning
|
||||||
processStartTimeFn = getProcessStartTime
|
processStartTimeFn = getProcessStartTime
|
||||||
removeLogFileFn = os.Remove
|
removeLogFileFn = os.Remove
|
||||||
globLogFiles = filepath.Glob
|
globLogFiles = filepath.Glob
|
||||||
fileStatFn = os.Lstat // Use Lstat to detect symlinks
|
fileStatFn = os.Lstat // Use Lstat to detect symlinks
|
||||||
evalSymlinksFn = filepath.EvalSymlinks
|
evalSymlinksFn = filepath.EvalSymlinks
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewLogger creates the async logger and starts the worker goroutine.
|
// NewLogger creates the async logger and starts the worker goroutine.
|
||||||
@@ -176,6 +177,9 @@ func (l *Logger) Flush() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
l.flushMu.Lock()
|
||||||
|
defer l.flushMu.Unlock()
|
||||||
|
|
||||||
// Wait for pending entries with timeout
|
// Wait for pending entries with timeout
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
@@ -221,7 +225,9 @@ func (l *Logger) log(level, msg string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
entry := logEntry{level: level, msg: msg}
|
entry := logEntry{level: level, msg: msg}
|
||||||
|
l.flushMu.Lock()
|
||||||
l.pendingWG.Add(1)
|
l.pendingWG.Add(1)
|
||||||
|
l.flushMu.Unlock()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case l.ch <- entry:
|
case l.ch <- entry:
|
||||||
|
|||||||
@@ -22,12 +22,19 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
version = "4.8.2"
|
version = "5.1.2"
|
||||||
defaultWorkdir = "."
|
defaultWorkdir = "."
|
||||||
defaultTimeout = 7200 // seconds
|
defaultTimeout = 7200 // seconds
|
||||||
codexLogLineLimit = 1000
|
codexLogLineLimit = 1000
|
||||||
stdinSpecialChars = "\n\\\"'`$"
|
stdinSpecialChars = "\n\\\"'`$"
|
||||||
stderrCaptureLimit = 4 * 1024
|
stderrCaptureLimit = 4 * 1024
|
||||||
|
stdoutDrainTimeout = 5 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
stdoutCloseReasonWait = "wait-complete"
|
||||||
|
stdoutCloseReasonCtx = "context-cancelled"
|
||||||
|
stdoutCloseReasonDrain = "drain-timeout"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Test hooks for dependency injection
|
// Test hooks for dependency injection
|
||||||
@@ -40,10 +47,14 @@ var (
|
|||||||
|
|
||||||
buildCodexArgsFn = buildCodexArgs
|
buildCodexArgsFn = buildCodexArgs
|
||||||
commandContext = exec.CommandContext
|
commandContext = exec.CommandContext
|
||||||
jsonMarshal = json.Marshal
|
newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner {
|
||||||
cleanupLogsFn = cleanupOldLogs
|
return &realCmd{cmd: commandContext(ctx, name, args...)}
|
||||||
signalNotifyFn = signal.Notify
|
}
|
||||||
signalStopFn = signal.Stop
|
jsonMarshal = json.Marshal
|
||||||
|
cleanupLogsFn = cleanupOldLogs
|
||||||
|
signalNotifyFn = signal.Notify
|
||||||
|
signalStopFn = signal.Stop
|
||||||
|
terminateCommandFn = terminateCommand
|
||||||
)
|
)
|
||||||
|
|
||||||
var forceKillDelay atomic.Int32
|
var forceKillDelay atomic.Int32
|
||||||
@@ -52,6 +63,77 @@ func init() {
|
|||||||
forceKillDelay.Store(5) // seconds - default value
|
forceKillDelay.Store(5) // seconds - default value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type commandRunner interface {
|
||||||
|
Start() error
|
||||||
|
Wait() error
|
||||||
|
StdoutPipe() (io.ReadCloser, error)
|
||||||
|
StdinPipe() (io.WriteCloser, error)
|
||||||
|
SetStderr(io.Writer)
|
||||||
|
Process() processHandle
|
||||||
|
}
|
||||||
|
|
||||||
|
type processHandle interface {
|
||||||
|
Pid() int
|
||||||
|
Kill() error
|
||||||
|
Signal(os.Signal) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type realCmd struct {
|
||||||
|
cmd *exec.Cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *realCmd) Start() error {
|
||||||
|
return r.cmd.Start()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *realCmd) Wait() error {
|
||||||
|
return r.cmd.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *realCmd) StdoutPipe() (io.ReadCloser, error) {
|
||||||
|
return r.cmd.StdoutPipe()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *realCmd) StdinPipe() (io.WriteCloser, error) {
|
||||||
|
return r.cmd.StdinPipe()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *realCmd) SetStderr(w io.Writer) {
|
||||||
|
r.cmd.Stderr = w
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *realCmd) Process() processHandle {
|
||||||
|
if r.cmd == nil || r.cmd.Process == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &realProcess{proc: r.cmd.Process}
|
||||||
|
}
|
||||||
|
|
||||||
|
type realProcess struct {
|
||||||
|
proc *os.Process
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *realProcess) Pid() int {
|
||||||
|
if p == nil || p.proc == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return p.proc.Pid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *realProcess) Kill() error {
|
||||||
|
if p == nil || p.proc == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return p.proc.Kill()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *realProcess) Signal(sig os.Signal) error {
|
||||||
|
if p == nil || p.proc == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return p.proc.Signal(sig)
|
||||||
|
}
|
||||||
|
|
||||||
// Config holds CLI configuration
|
// Config holds CLI configuration
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Mode string // "new" or "resume"
|
Mode string // "new" or "resume"
|
||||||
@@ -85,6 +167,7 @@ type TaskResult struct {
|
|||||||
Message string `json:"message"`
|
Message string `json:"message"`
|
||||||
SessionID string `json:"session_id"`
|
SessionID string `json:"session_id"`
|
||||||
Error string `json:"error"`
|
Error string `json:"error"`
|
||||||
|
LogPath string `json:"log_path"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseParallelConfig(data []byte) (*ParallelConfig, error) {
|
func parseParallelConfig(data []byte) (*ParallelConfig, error) {
|
||||||
@@ -254,6 +337,27 @@ func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult {
|
|||||||
failed := make(map[string]TaskResult, totalTasks)
|
failed := make(map[string]TaskResult, totalTasks)
|
||||||
resultsCh := make(chan TaskResult, totalTasks)
|
resultsCh := make(chan TaskResult, totalTasks)
|
||||||
|
|
||||||
|
var startPrintMu sync.Mutex
|
||||||
|
bannerPrinted := false
|
||||||
|
|
||||||
|
printTaskStart := func(taskID string) {
|
||||||
|
logger := activeLogger()
|
||||||
|
if logger == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
path := logger.Path()
|
||||||
|
if path == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
startPrintMu.Lock()
|
||||||
|
if !bannerPrinted {
|
||||||
|
fmt.Fprintln(os.Stderr, "=== Starting Parallel Execution ===")
|
||||||
|
bannerPrinted = true
|
||||||
|
}
|
||||||
|
fmt.Fprintf(os.Stderr, "Task %s: Log: %s\n", taskID, path)
|
||||||
|
startPrintMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
for _, layer := range layers {
|
for _, layer := range layers {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
executed := 0
|
executed := 0
|
||||||
@@ -275,6 +379,7 @@ func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult {
|
|||||||
resultsCh <- TaskResult{TaskID: ts.ID, ExitCode: 1, Error: fmt.Sprintf("panic: %v", r)}
|
resultsCh <- TaskResult{TaskID: ts.ID, ExitCode: 1, Error: fmt.Sprintf("panic: %v", r)}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
printTaskStart(ts.ID)
|
||||||
resultsCh <- runCodexTaskFn(ts, timeout)
|
resultsCh <- runCodexTaskFn(ts, timeout)
|
||||||
}(task)
|
}(task)
|
||||||
}
|
}
|
||||||
@@ -340,6 +445,9 @@ func generateFinalOutput(results []TaskResult) string {
|
|||||||
if res.SessionID != "" {
|
if res.SessionID != "" {
|
||||||
sb.WriteString(fmt.Sprintf("Session: %s\n", res.SessionID))
|
sb.WriteString(fmt.Sprintf("Session: %s\n", res.SessionID))
|
||||||
}
|
}
|
||||||
|
if res.LogPath != "" {
|
||||||
|
sb.WriteString(fmt.Sprintf("Log: %s\n", res.LogPath))
|
||||||
|
}
|
||||||
if res.Message != "" {
|
if res.Message != "" {
|
||||||
sb.WriteString(fmt.Sprintf("\n%s\n", res.Message))
|
sb.WriteString(fmt.Sprintf("\n%s\n", res.Message))
|
||||||
}
|
}
|
||||||
@@ -383,6 +491,8 @@ func runStartupCleanup() {
|
|||||||
|
|
||||||
// run is the main logic, returns exit code for testability
|
// run is the main logic, returns exit code for testability
|
||||||
func run() (exitCode int) {
|
func run() (exitCode int) {
|
||||||
|
var startupCleanupWG sync.WaitGroup
|
||||||
|
|
||||||
// Handle --version and --help first (no logger needed)
|
// Handle --version and --help first (no logger needed)
|
||||||
if len(os.Args) > 1 {
|
if len(os.Args) > 1 {
|
||||||
switch os.Args[1] {
|
switch os.Args[1] {
|
||||||
@@ -421,9 +531,16 @@ func run() (exitCode int) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
defer runCleanupHook()
|
defer runCleanupHook()
|
||||||
|
defer startupCleanupWG.Wait()
|
||||||
|
|
||||||
// Run cleanup asynchronously to avoid blocking startup
|
// Run cleanup asynchronously to avoid blocking startup but wait before exit
|
||||||
go runStartupCleanup()
|
if cleanupLogsFn != nil {
|
||||||
|
startupCleanupWG.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer startupCleanupWG.Done()
|
||||||
|
runStartupCleanup()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// Handle remaining commands
|
// Handle remaining commands
|
||||||
if len(os.Args) > 1 {
|
if len(os.Args) > 1 {
|
||||||
@@ -525,7 +642,20 @@ func run() (exitCode int) {
|
|||||||
fmt.Fprintf(os.Stderr, "[codex-wrapper]\n")
|
fmt.Fprintf(os.Stderr, "[codex-wrapper]\n")
|
||||||
fmt.Fprintf(os.Stderr, " Command: %s %s\n", codexCommand, strings.Join(codexArgs, " "))
|
fmt.Fprintf(os.Stderr, " Command: %s %s\n", codexCommand, strings.Join(codexArgs, " "))
|
||||||
fmt.Fprintf(os.Stderr, " PID: %d\n", os.Getpid())
|
fmt.Fprintf(os.Stderr, " PID: %d\n", os.Getpid())
|
||||||
fmt.Fprintf(os.Stderr, " Log: %s\n", logger.Path())
|
logPath := ""
|
||||||
|
if logger != nil {
|
||||||
|
logPath = logger.Path()
|
||||||
|
}
|
||||||
|
fmt.Fprintf(os.Stderr, " Log: %s\n", logPath)
|
||||||
|
printedLogPath := logPath
|
||||||
|
printLogPath := func(path string) {
|
||||||
|
if path == "" || path == printedLogPath {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fmt.Fprintf(os.Stderr, "[codex-wrapper]\n")
|
||||||
|
fmt.Fprintf(os.Stderr, " Log: %s\n", path)
|
||||||
|
printedLogPath = path
|
||||||
|
}
|
||||||
|
|
||||||
if useStdin {
|
if useStdin {
|
||||||
var reasons []string
|
var reasons []string
|
||||||
@@ -572,6 +702,7 @@ func run() (exitCode int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
result := runCodexTask(taskSpec, false, cfg.Timeout)
|
result := runCodexTask(taskSpec, false, cfg.Timeout)
|
||||||
|
printLogPath(result.LogPath)
|
||||||
|
|
||||||
if result.ExitCode != 0 {
|
if result.ExitCode != 0 {
|
||||||
return result.ExitCode
|
return result.ExitCode
|
||||||
@@ -710,8 +841,8 @@ func runCodexProcess(parentCtx context.Context, codexArgs []string, taskText str
|
|||||||
return res.Message, res.SessionID, res.ExitCode
|
return res.Message, res.SessionID, res.ExitCode
|
||||||
}
|
}
|
||||||
|
|
||||||
func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, customArgs []string, useCustomArgs bool, silent bool, timeoutSec int) TaskResult {
|
func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, customArgs []string, useCustomArgs bool, silent bool, timeoutSec int) (result TaskResult) {
|
||||||
result := TaskResult{TaskID: taskSpec.ID}
|
result.TaskID = taskSpec.ID
|
||||||
|
|
||||||
cfg := &Config{
|
cfg := &Config{
|
||||||
Mode: taskSpec.Mode,
|
Mode: taskSpec.Mode,
|
||||||
@@ -746,6 +877,15 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo
|
|||||||
return fmt.Sprintf("[Task: %s] %s", taskSpec.ID, msg)
|
return fmt.Sprintf("[Task: %s] %s", taskSpec.ID, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
captureLogPath := func() {
|
||||||
|
if result.LogPath != "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if logger := activeLogger(); logger != nil {
|
||||||
|
result.LogPath = logger.Path()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var logInfoFn func(string)
|
var logInfoFn func(string)
|
||||||
var logWarnFn func(string)
|
var logWarnFn func(string)
|
||||||
var logErrorFn func(string)
|
var logErrorFn func(string)
|
||||||
@@ -786,6 +926,7 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
|
captureLogPath()
|
||||||
if tempLogger != nil {
|
if tempLogger != nil {
|
||||||
closeLogger()
|
closeLogger()
|
||||||
}
|
}
|
||||||
@@ -810,7 +951,7 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo
|
|||||||
return fmt.Sprintf("%s; stderr: %s", msg, stderrBuf.String())
|
return fmt.Sprintf("%s; stderr: %s", msg, stderrBuf.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd := commandContext(ctx, codexCommand, codexArgs...)
|
cmd := newCommandRunner(ctx, codexCommand, codexArgs...)
|
||||||
|
|
||||||
stderrWriters := []io.Writer{stderrBuf}
|
stderrWriters := []io.Writer{stderrBuf}
|
||||||
if stderrLogger != nil {
|
if stderrLogger != nil {
|
||||||
@@ -820,9 +961,9 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo
|
|||||||
stderrWriters = append([]io.Writer{os.Stderr}, stderrWriters...)
|
stderrWriters = append([]io.Writer{os.Stderr}, stderrWriters...)
|
||||||
}
|
}
|
||||||
if len(stderrWriters) == 1 {
|
if len(stderrWriters) == 1 {
|
||||||
cmd.Stderr = stderrWriters[0]
|
cmd.SetStderr(stderrWriters[0])
|
||||||
} else {
|
} else {
|
||||||
cmd.Stderr = io.MultiWriter(stderrWriters...)
|
cmd.SetStderr(io.MultiWriter(stderrWriters...))
|
||||||
}
|
}
|
||||||
|
|
||||||
var stdinPipe io.WriteCloser
|
var stdinPipe io.WriteCloser
|
||||||
@@ -865,7 +1006,9 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
logInfoFn(fmt.Sprintf("Starting codex with PID: %d", cmd.Process.Pid))
|
if proc := cmd.Process(); proc != nil {
|
||||||
|
logInfoFn(fmt.Sprintf("Starting codex with PID: %d", proc.Pid()))
|
||||||
|
}
|
||||||
if logger := activeLogger(); logger != nil {
|
if logger := activeLogger(); logger != nil {
|
||||||
logInfoFn(fmt.Sprintf("Log capturing to: %s", logger.Path()))
|
logInfoFn(fmt.Sprintf("Log capturing to: %s", logger.Path()))
|
||||||
}
|
}
|
||||||
@@ -888,23 +1031,105 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo
|
|||||||
parseCh <- parseResult{message: msg, threadID: tid}
|
parseCh <- parseResult{message: msg, threadID: tid}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var waitErr error
|
var stdoutCloseOnce sync.Once
|
||||||
var forceKillTimer *time.Timer
|
var stdoutDrainCloseOnce sync.Once
|
||||||
|
closeStdout := func(reason string) {
|
||||||
select {
|
var once *sync.Once
|
||||||
case waitErr = <-waitCh:
|
if reason == stdoutCloseReasonDrain {
|
||||||
case <-ctx.Done():
|
once = &stdoutDrainCloseOnce
|
||||||
logErrorFn(cancelReason(ctx))
|
} else {
|
||||||
forceKillTimer = terminateProcess(cmd)
|
once = &stdoutCloseOnce
|
||||||
waitErr = <-waitCh
|
}
|
||||||
|
once.Do(func() {
|
||||||
|
if stdout == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var closeErr error
|
||||||
|
switch c := stdout.(type) {
|
||||||
|
case interface{ CloseWithReason(string) error }:
|
||||||
|
closeErr = c.CloseWithReason(reason)
|
||||||
|
case interface{ CloseWithError(error) error }:
|
||||||
|
closeErr = c.CloseWithError(nil)
|
||||||
|
default:
|
||||||
|
closeErr = stdout.Close()
|
||||||
|
}
|
||||||
|
if closeErr != nil {
|
||||||
|
logWarnFn(fmt.Sprintf("Failed to close stdout pipe: %v", closeErr))
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var waitErr error
|
||||||
|
var forceKillTimer *forceKillTimer
|
||||||
|
|
||||||
|
var parsed parseResult
|
||||||
|
|
||||||
|
var drainTimer *time.Timer
|
||||||
|
var drainTimerCh <-chan time.Time
|
||||||
|
startDrainTimer := func() {
|
||||||
|
if drainTimer != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
timer := time.NewTimer(stdoutDrainTimeout)
|
||||||
|
drainTimer = timer
|
||||||
|
drainTimerCh = timer.C
|
||||||
|
}
|
||||||
|
stopDrainTimer := func() {
|
||||||
|
if drainTimer == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !drainTimer.Stop() {
|
||||||
|
select {
|
||||||
|
case <-drainTimerCh:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drainTimer = nil
|
||||||
|
drainTimerCh = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
waitDone := false
|
||||||
|
parseDone := false
|
||||||
|
ctxLogged := false
|
||||||
|
|
||||||
|
for !waitDone || !parseDone {
|
||||||
|
select {
|
||||||
|
case waitErr = <-waitCh:
|
||||||
|
waitDone = true
|
||||||
|
waitCh = nil
|
||||||
|
closeStdout(stdoutCloseReasonWait)
|
||||||
|
if !parseDone {
|
||||||
|
startDrainTimer()
|
||||||
|
}
|
||||||
|
case parsed = <-parseCh:
|
||||||
|
parseDone = true
|
||||||
|
parseCh = nil
|
||||||
|
stopDrainTimer()
|
||||||
|
case <-ctx.Done():
|
||||||
|
if !ctxLogged {
|
||||||
|
logErrorFn(cancelReason(ctx))
|
||||||
|
ctxLogged = true
|
||||||
|
if forceKillTimer == nil {
|
||||||
|
forceKillTimer = terminateCommandFn(cmd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
closeStdout(stdoutCloseReasonCtx)
|
||||||
|
if !parseDone {
|
||||||
|
startDrainTimer()
|
||||||
|
}
|
||||||
|
case <-drainTimerCh:
|
||||||
|
logWarnFn("stdout did not drain within 5s; forcing close")
|
||||||
|
closeStdout(stdoutCloseReasonDrain)
|
||||||
|
stopDrainTimer()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stopDrainTimer()
|
||||||
|
|
||||||
if forceKillTimer != nil {
|
if forceKillTimer != nil {
|
||||||
forceKillTimer.Stop()
|
forceKillTimer.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
parsed := <-parseCh
|
|
||||||
|
|
||||||
if ctxErr := ctx.Err(); ctxErr != nil {
|
if ctxErr := ctx.Err(); ctxErr != nil {
|
||||||
if errors.Is(ctxErr, context.DeadlineExceeded) {
|
if errors.Is(ctxErr, context.DeadlineExceeded) {
|
||||||
result.ExitCode = 124
|
result.ExitCode = 124
|
||||||
@@ -1045,6 +1270,51 @@ func terminateProcess(cmd *exec.Cmd) *time.Timer {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type forceKillTimer struct {
|
||||||
|
timer *time.Timer
|
||||||
|
done chan struct{}
|
||||||
|
stopped atomic.Bool
|
||||||
|
drained atomic.Bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *forceKillTimer) stop() {
|
||||||
|
if t == nil || t.timer == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !t.timer.Stop() {
|
||||||
|
<-t.done
|
||||||
|
t.drained.Store(true)
|
||||||
|
}
|
||||||
|
t.stopped.Store(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func terminateCommand(cmd commandRunner) *forceKillTimer {
|
||||||
|
if cmd == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
proc := cmd.Process()
|
||||||
|
if proc == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if runtime.GOOS == "windows" {
|
||||||
|
_ = proc.Kill()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = proc.Signal(syscall.SIGTERM)
|
||||||
|
|
||||||
|
done := make(chan struct{}, 1)
|
||||||
|
timer := time.AfterFunc(time.Duration(forceKillDelay.Load())*time.Second, func() {
|
||||||
|
if p := cmd.Process(); p != nil {
|
||||||
|
_ = p.Kill()
|
||||||
|
}
|
||||||
|
done <- struct{}{}
|
||||||
|
})
|
||||||
|
|
||||||
|
return &forceKillTimer{timer: timer, done: done}
|
||||||
|
}
|
||||||
|
|
||||||
func parseJSONStream(r io.Reader) (message, threadID string) {
|
func parseJSONStream(r io.Reader) (message, threadID string) {
|
||||||
return parseJSONStreamWithLog(r, logWarn, logInfo)
|
return parseJSONStreamWithLog(r, logWarn, logInfo)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -80,6 +80,8 @@ func parseIntegrationOutput(t *testing.T, out string) integrationOutput {
|
|||||||
currentTask.Error = strings.TrimPrefix(line, "Error: ")
|
currentTask.Error = strings.TrimPrefix(line, "Error: ")
|
||||||
} else if strings.HasPrefix(line, "Session:") {
|
} else if strings.HasPrefix(line, "Session:") {
|
||||||
currentTask.SessionID = strings.TrimPrefix(line, "Session: ")
|
currentTask.SessionID = strings.TrimPrefix(line, "Session: ")
|
||||||
|
} else if strings.HasPrefix(line, "Log:") {
|
||||||
|
currentTask.LogPath = strings.TrimSpace(strings.TrimPrefix(line, "Log:"))
|
||||||
} else if line != "" && !strings.HasPrefix(line, "===") && !strings.HasPrefix(line, "---") {
|
} else if line != "" && !strings.HasPrefix(line, "===") && !strings.HasPrefix(line, "---") {
|
||||||
if currentTask.Message != "" {
|
if currentTask.Message != "" {
|
||||||
currentTask.Message += "\n"
|
currentTask.Message += "\n"
|
||||||
@@ -96,6 +98,32 @@ func parseIntegrationOutput(t *testing.T, out string) integrationOutput {
|
|||||||
return payload
|
return payload
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func extractTaskBlock(t *testing.T, output, taskID string) string {
|
||||||
|
t.Helper()
|
||||||
|
header := fmt.Sprintf("--- Task: %s ---", taskID)
|
||||||
|
lines := strings.Split(output, "\n")
|
||||||
|
var block []string
|
||||||
|
collecting := false
|
||||||
|
for _, raw := range lines {
|
||||||
|
trimmed := strings.TrimSpace(raw)
|
||||||
|
if !collecting {
|
||||||
|
if trimmed == header {
|
||||||
|
collecting = true
|
||||||
|
block = append(block, trimmed)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(trimmed, "--- Task: ") && trimmed != header {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
block = append(block, trimmed)
|
||||||
|
}
|
||||||
|
if len(block) == 0 {
|
||||||
|
t.Fatalf("task block %s not found in output:\n%s", taskID, output)
|
||||||
|
}
|
||||||
|
return strings.Join(block, "\n")
|
||||||
|
}
|
||||||
|
|
||||||
func findResultByID(t *testing.T, payload integrationOutput, id string) TaskResult {
|
func findResultByID(t *testing.T, payload integrationOutput, id string) TaskResult {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
for _, res := range payload.Results {
|
for _, res := range payload.Results {
|
||||||
@@ -256,6 +284,194 @@ b`
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRunParallelOutputsIncludeLogPaths(t *testing.T) {
|
||||||
|
defer resetTestHooks()
|
||||||
|
origRun := runCodexTaskFn
|
||||||
|
t.Cleanup(func() {
|
||||||
|
runCodexTaskFn = origRun
|
||||||
|
resetTestHooks()
|
||||||
|
})
|
||||||
|
|
||||||
|
tempDir := t.TempDir()
|
||||||
|
logPathFor := func(id string) string {
|
||||||
|
return filepath.Join(tempDir, fmt.Sprintf("%s.log", id))
|
||||||
|
}
|
||||||
|
|
||||||
|
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
|
||||||
|
res := TaskResult{
|
||||||
|
TaskID: task.ID,
|
||||||
|
Message: fmt.Sprintf("result-%s", task.ID),
|
||||||
|
SessionID: fmt.Sprintf("session-%s", task.ID),
|
||||||
|
LogPath: logPathFor(task.ID),
|
||||||
|
}
|
||||||
|
if task.ID == "beta" {
|
||||||
|
res.ExitCode = 9
|
||||||
|
res.Error = "boom"
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
input := `---TASK---
|
||||||
|
id: alpha
|
||||||
|
---CONTENT---
|
||||||
|
task-alpha
|
||||||
|
---TASK---
|
||||||
|
id: beta
|
||||||
|
---CONTENT---
|
||||||
|
task-beta`
|
||||||
|
stdinReader = bytes.NewReader([]byte(input))
|
||||||
|
os.Args = []string{"codex-wrapper", "--parallel"}
|
||||||
|
|
||||||
|
var exitCode int
|
||||||
|
output := captureStdout(t, func() {
|
||||||
|
exitCode = run()
|
||||||
|
})
|
||||||
|
|
||||||
|
if exitCode != 9 {
|
||||||
|
t.Fatalf("parallel run exit=%d, want 9", exitCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
payload := parseIntegrationOutput(t, output)
|
||||||
|
alpha := findResultByID(t, payload, "alpha")
|
||||||
|
beta := findResultByID(t, payload, "beta")
|
||||||
|
|
||||||
|
if alpha.LogPath != logPathFor("alpha") {
|
||||||
|
t.Fatalf("alpha log path = %q, want %q", alpha.LogPath, logPathFor("alpha"))
|
||||||
|
}
|
||||||
|
if beta.LogPath != logPathFor("beta") {
|
||||||
|
t.Fatalf("beta log path = %q, want %q", beta.LogPath, logPathFor("beta"))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, id := range []string{"alpha", "beta"} {
|
||||||
|
want := fmt.Sprintf("Log: %s", logPathFor(id))
|
||||||
|
if !strings.Contains(output, want) {
|
||||||
|
t.Fatalf("parallel output missing %q for %s:\n%s", want, id, output)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunParallelStartupLogsPrinted(t *testing.T) {
|
||||||
|
defer resetTestHooks()
|
||||||
|
|
||||||
|
tempDir := setTempDirEnv(t, t.TempDir())
|
||||||
|
input := `---TASK---
|
||||||
|
id: a
|
||||||
|
---CONTENT---
|
||||||
|
fail
|
||||||
|
---TASK---
|
||||||
|
id: b
|
||||||
|
---CONTENT---
|
||||||
|
ok-b
|
||||||
|
---TASK---
|
||||||
|
id: c
|
||||||
|
dependencies: a
|
||||||
|
---CONTENT---
|
||||||
|
should-skip
|
||||||
|
---TASK---
|
||||||
|
id: d
|
||||||
|
---CONTENT---
|
||||||
|
ok-d`
|
||||||
|
stdinReader = bytes.NewReader([]byte(input))
|
||||||
|
os.Args = []string{"codex-wrapper", "--parallel"}
|
||||||
|
|
||||||
|
expectedLog := filepath.Join(tempDir, fmt.Sprintf("codex-wrapper-%d.log", os.Getpid()))
|
||||||
|
|
||||||
|
origRun := runCodexTaskFn
|
||||||
|
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
|
||||||
|
path := expectedLog
|
||||||
|
if logger := activeLogger(); logger != nil && logger.Path() != "" {
|
||||||
|
path = logger.Path()
|
||||||
|
}
|
||||||
|
if task.ID == "a" {
|
||||||
|
return TaskResult{TaskID: task.ID, ExitCode: 3, Error: "boom", LogPath: path}
|
||||||
|
}
|
||||||
|
return TaskResult{TaskID: task.ID, ExitCode: 0, Message: task.Task, LogPath: path}
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { runCodexTaskFn = origRun })
|
||||||
|
|
||||||
|
var exitCode int
|
||||||
|
var stdoutOut string
|
||||||
|
stderrOut := captureStderr(t, func() {
|
||||||
|
stdoutOut = captureStdout(t, func() {
|
||||||
|
exitCode = run()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
if exitCode == 0 {
|
||||||
|
t.Fatalf("expected non-zero exit due to task failure, got %d", exitCode)
|
||||||
|
}
|
||||||
|
if stdoutOut == "" {
|
||||||
|
t.Fatalf("expected parallel summary on stdout")
|
||||||
|
}
|
||||||
|
|
||||||
|
lines := strings.Split(strings.TrimSpace(stderrOut), "\n")
|
||||||
|
var bannerSeen bool
|
||||||
|
var taskLines []string
|
||||||
|
for _, raw := range lines {
|
||||||
|
line := strings.TrimSpace(raw)
|
||||||
|
if line == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if line == "=== Starting Parallel Execution ===" {
|
||||||
|
if bannerSeen {
|
||||||
|
t.Fatalf("banner printed multiple times:\n%s", stderrOut)
|
||||||
|
}
|
||||||
|
bannerSeen = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
taskLines = append(taskLines, line)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bannerSeen {
|
||||||
|
t.Fatalf("expected startup banner in stderr, got:\n%s", stderrOut)
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedLines := map[string]struct{}{
|
||||||
|
fmt.Sprintf("Task a: Log: %s", expectedLog): {},
|
||||||
|
fmt.Sprintf("Task b: Log: %s", expectedLog): {},
|
||||||
|
fmt.Sprintf("Task d: Log: %s", expectedLog): {},
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(taskLines) != len(expectedLines) {
|
||||||
|
t.Fatalf("startup log lines mismatch, got %d lines:\n%s", len(taskLines), stderrOut)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, line := range taskLines {
|
||||||
|
if _, ok := expectedLines[line]; !ok {
|
||||||
|
t.Fatalf("unexpected startup line %q\nstderr:\n%s", line, stderrOut)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunNonParallelOutputsIncludeLogPathsIntegration(t *testing.T) {
|
||||||
|
defer resetTestHooks()
|
||||||
|
|
||||||
|
tempDir := setTempDirEnv(t, t.TempDir())
|
||||||
|
os.Args = []string{"codex-wrapper", "integration-log-check"}
|
||||||
|
stdinReader = strings.NewReader("")
|
||||||
|
isTerminalFn = func() bool { return true }
|
||||||
|
codexCommand = "echo"
|
||||||
|
buildCodexArgsFn = func(cfg *Config, targetArg string) []string {
|
||||||
|
return []string{`{"type":"thread.started","thread_id":"integration-session"}` + "\n" + `{"type":"item.completed","item":{"type":"agent_message","text":"done"}}`}
|
||||||
|
}
|
||||||
|
|
||||||
|
var exitCode int
|
||||||
|
stderr := captureStderr(t, func() {
|
||||||
|
_ = captureStdout(t, func() {
|
||||||
|
exitCode = run()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
if exitCode != 0 {
|
||||||
|
t.Fatalf("run() exit=%d, want 0", exitCode)
|
||||||
|
}
|
||||||
|
expectedLog := filepath.Join(tempDir, fmt.Sprintf("codex-wrapper-%d.log", os.Getpid()))
|
||||||
|
wantLine := fmt.Sprintf("Log: %s", expectedLog)
|
||||||
|
if !strings.Contains(stderr, wantLine) {
|
||||||
|
t.Fatalf("stderr missing %q, got: %q", wantLine, stderr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRunParallelPartialFailureBlocksDependents(t *testing.T) {
|
func TestRunParallelPartialFailureBlocksDependents(t *testing.T) {
|
||||||
defer resetTestHooks()
|
defer resetTestHooks()
|
||||||
origRun := runCodexTaskFn
|
origRun := runCodexTaskFn
|
||||||
@@ -264,11 +480,17 @@ func TestRunParallelPartialFailureBlocksDependents(t *testing.T) {
|
|||||||
resetTestHooks()
|
resetTestHooks()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
tempDir := t.TempDir()
|
||||||
|
logPathFor := func(id string) string {
|
||||||
|
return filepath.Join(tempDir, fmt.Sprintf("%s.log", id))
|
||||||
|
}
|
||||||
|
|
||||||
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
|
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
|
||||||
|
path := logPathFor(task.ID)
|
||||||
if task.ID == "A" {
|
if task.ID == "A" {
|
||||||
return TaskResult{TaskID: "A", ExitCode: 2, Error: "boom"}
|
return TaskResult{TaskID: "A", ExitCode: 2, Error: "boom", LogPath: path}
|
||||||
}
|
}
|
||||||
return TaskResult{TaskID: task.ID, ExitCode: 0, Message: task.Task}
|
return TaskResult{TaskID: task.ID, ExitCode: 0, Message: task.Task, LogPath: path}
|
||||||
}
|
}
|
||||||
|
|
||||||
input := `---TASK---
|
input := `---TASK---
|
||||||
@@ -318,6 +540,26 @@ ok-e`
|
|||||||
if payload.Summary.Failed != 2 || payload.Summary.Total != 4 {
|
if payload.Summary.Failed != 2 || payload.Summary.Total != 4 {
|
||||||
t.Fatalf("unexpected summary after partial failure: %+v", payload.Summary)
|
t.Fatalf("unexpected summary after partial failure: %+v", payload.Summary)
|
||||||
}
|
}
|
||||||
|
if resA.LogPath != logPathFor("A") {
|
||||||
|
t.Fatalf("task A log path = %q, want %q", resA.LogPath, logPathFor("A"))
|
||||||
|
}
|
||||||
|
if resB.LogPath != "" {
|
||||||
|
t.Fatalf("task B should not report a log path when skipped, got %q", resB.LogPath)
|
||||||
|
}
|
||||||
|
if resD.LogPath != logPathFor("D") || resE.LogPath != logPathFor("E") {
|
||||||
|
t.Fatalf("expected log paths for D/E, got D=%q E=%q", resD.LogPath, resE.LogPath)
|
||||||
|
}
|
||||||
|
for _, id := range []string{"A", "D", "E"} {
|
||||||
|
block := extractTaskBlock(t, output, id)
|
||||||
|
want := fmt.Sprintf("Log: %s", logPathFor(id))
|
||||||
|
if !strings.Contains(block, want) {
|
||||||
|
t.Fatalf("task %s block missing %q:\n%s", id, want, block)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
blockB := extractTaskBlock(t, output, "B")
|
||||||
|
if strings.Contains(blockB, "Log:") {
|
||||||
|
t.Fatalf("skipped task B should not emit a log line:\n%s", blockB)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRunParallelTimeoutPropagation(t *testing.T) {
|
func TestRunParallelTimeoutPropagation(t *testing.T) {
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user