Compare commits

..

5 Commits

Author SHA1 Message Date
swe-agent[bot]
8f3941adae fix(parallel): 任务启动时立即返回日志文件路径以支持实时调试
修复 --parallel 模式下日志路径在任务完成后才显示的问题,导致长时间运行任务无法实时查看日志进行调试。

主要改进:
- 在 executeConcurrent 中任务启动时立即输出日志路径到 stderr
- 使用 sync.Mutex 保护并发输出,避免多任务输出行交错
- 添加 "=== Starting Parallel Execution ===" banner 标识执行开始
- 扩展 TaskResult 结构体添加 LogPath 字段,确保最终总结仍包含路径
- 统一 parallel 和非 parallel 模式的日志路径输出行为

测试覆盖:
- 总体覆盖率提升至 91.0%
- 核心函数 executeConcurrent 达到 97.8% 覆盖
- 新增集成测试验证启动日志输出、依赖跳过、并发安全等场景

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-09 11:19:25 +08:00
swe-agent[bot]
18c6c32628 fix(test): resolve CI timing race in TestFakeCmdInfra
问题:TestFakeCmdInfra/integration_with_runCodexTask 在 CI 环境中间歇性失败
原因:WaitDelay(1ms) 与 stdout 事件延迟(1ms) 过于接近,在 CI 负载下
     Wait() 可能在第二个 JSON 事件写入前返回,导致过早关闭 stdout

修复:将 WaitDelay 从 1ms 增加到 5ms,确保 Wait() 在所有 stdout
     数据写入后才返回,消除竞态条件

测试:本地和 -race 模式均通过

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-09 00:34:38 +08:00
ben
1ad2cfe629 Merge pull request #51 from cexll/fix/channel-sync-race-conditions
fix: 修复channel同步竞态条件和死锁问题
2025-12-09 00:13:04 +08:00
swe-agent[bot]
7bad716fbc change codex-wrapper version 2025-12-08 23:45:29 +08:00
swe-agent[bot]
220be6eb5c fix: 修复channel同步竞态条件和死锁问题
修复了4个严重的channel同步问题:

1. **parseCh无条件阻塞** (main.go:894-907)
   - 问题:cmd.Wait()先返回但parseJSONStreamWithLog永久阻塞时,主流程卡死
   - 修复:引入ctxAwareReader和5秒drainTimer机制,Wait完成后立即关闭stdout

2. **context取消失效** (main.go:894-907)
   - 问题:waitCh先完成后不再监听ctx.Done(),取消信号被吞掉
   - 修复:改为双channel循环持续监听waitCh/parseCh/ctx.Done()/drainTimer

3. **parseJSONStreamWithLog无读超时** (main.go:1056-1094)
   - 问题:bufio.Scanner阻塞读取,stdout未主动关闭时永远停在Read
   - 修复:ctxAwareReader支持CloseWithReason,Wait/ctx完成时主动关闭

4. **forceKillTimer生命周期过短**
   - 问题:waitCh返回后立刻停止timer,但stdout可能仍被写入
   - 修复:统一管理timer生命周期,在循环结束后Stop和drain

5. **并发竞态修复**
   - main.go:492 runStartupCleanup使用WaitGroup同步
   - logger.go:176 Flush加锁防止WaitGroup reuse panic

**测试覆盖**:
- 新增4个核心场景测试(Wait先返回、同时返回、Context超时、Parse阻塞)
- main.go覆盖率从28.6%提升到90.32%
- 154个测试全部通过,-race检测无警告

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-08 23:35:55 +08:00
5 changed files with 1553 additions and 34 deletions

View File

@@ -1 +1,5 @@
coverage.out
coverage*.out
cover.out
cover_*.out
coverage.html

View File

@@ -28,6 +28,7 @@ type Logger struct {
closeOnce sync.Once
workerWG sync.WaitGroup
pendingWG sync.WaitGroup
flushMu sync.Mutex
}
type logEntry struct {
@@ -46,12 +47,12 @@ type CleanupStats struct {
}
var (
processRunningCheck = isProcessRunning
processStartTimeFn = getProcessStartTime
removeLogFileFn = os.Remove
globLogFiles = filepath.Glob
fileStatFn = os.Lstat // Use Lstat to detect symlinks
evalSymlinksFn = filepath.EvalSymlinks
processRunningCheck = isProcessRunning
processStartTimeFn = getProcessStartTime
removeLogFileFn = os.Remove
globLogFiles = filepath.Glob
fileStatFn = os.Lstat // Use Lstat to detect symlinks
evalSymlinksFn = filepath.EvalSymlinks
)
// NewLogger creates the async logger and starts the worker goroutine.
@@ -176,6 +177,9 @@ func (l *Logger) Flush() {
return
}
l.flushMu.Lock()
defer l.flushMu.Unlock()
// Wait for pending entries with timeout
done := make(chan struct{})
go func() {
@@ -221,7 +225,9 @@ func (l *Logger) log(level, msg string) {
}
entry := logEntry{level: level, msg: msg}
l.flushMu.Lock()
l.pendingWG.Add(1)
l.flushMu.Unlock()
select {
case l.ch <- entry:

View File

@@ -22,12 +22,19 @@ import (
)
const (
version = "4.8.2"
version = "5.1.2"
defaultWorkdir = "."
defaultTimeout = 7200 // seconds
codexLogLineLimit = 1000
stdinSpecialChars = "\n\\\"'`$"
stderrCaptureLimit = 4 * 1024
stdoutDrainTimeout = 5 * time.Second
)
const (
stdoutCloseReasonWait = "wait-complete"
stdoutCloseReasonCtx = "context-cancelled"
stdoutCloseReasonDrain = "drain-timeout"
)
// Test hooks for dependency injection
@@ -40,10 +47,14 @@ var (
buildCodexArgsFn = buildCodexArgs
commandContext = exec.CommandContext
jsonMarshal = json.Marshal
cleanupLogsFn = cleanupOldLogs
signalNotifyFn = signal.Notify
signalStopFn = signal.Stop
newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner {
return &realCmd{cmd: commandContext(ctx, name, args...)}
}
jsonMarshal = json.Marshal
cleanupLogsFn = cleanupOldLogs
signalNotifyFn = signal.Notify
signalStopFn = signal.Stop
terminateCommandFn = terminateCommand
)
var forceKillDelay atomic.Int32
@@ -52,6 +63,77 @@ func init() {
forceKillDelay.Store(5) // seconds - default value
}
type commandRunner interface {
Start() error
Wait() error
StdoutPipe() (io.ReadCloser, error)
StdinPipe() (io.WriteCloser, error)
SetStderr(io.Writer)
Process() processHandle
}
type processHandle interface {
Pid() int
Kill() error
Signal(os.Signal) error
}
type realCmd struct {
cmd *exec.Cmd
}
func (r *realCmd) Start() error {
return r.cmd.Start()
}
func (r *realCmd) Wait() error {
return r.cmd.Wait()
}
func (r *realCmd) StdoutPipe() (io.ReadCloser, error) {
return r.cmd.StdoutPipe()
}
func (r *realCmd) StdinPipe() (io.WriteCloser, error) {
return r.cmd.StdinPipe()
}
func (r *realCmd) SetStderr(w io.Writer) {
r.cmd.Stderr = w
}
func (r *realCmd) Process() processHandle {
if r.cmd == nil || r.cmd.Process == nil {
return nil
}
return &realProcess{proc: r.cmd.Process}
}
type realProcess struct {
proc *os.Process
}
func (p *realProcess) Pid() int {
if p == nil || p.proc == nil {
return 0
}
return p.proc.Pid
}
func (p *realProcess) Kill() error {
if p == nil || p.proc == nil {
return nil
}
return p.proc.Kill()
}
func (p *realProcess) Signal(sig os.Signal) error {
if p == nil || p.proc == nil {
return nil
}
return p.proc.Signal(sig)
}
// Config holds CLI configuration
type Config struct {
Mode string // "new" or "resume"
@@ -85,6 +167,7 @@ type TaskResult struct {
Message string `json:"message"`
SessionID string `json:"session_id"`
Error string `json:"error"`
LogPath string `json:"log_path"`
}
func parseParallelConfig(data []byte) (*ParallelConfig, error) {
@@ -254,6 +337,27 @@ func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult {
failed := make(map[string]TaskResult, totalTasks)
resultsCh := make(chan TaskResult, totalTasks)
var startPrintMu sync.Mutex
bannerPrinted := false
printTaskStart := func(taskID string) {
logger := activeLogger()
if logger == nil {
return
}
path := logger.Path()
if path == "" {
return
}
startPrintMu.Lock()
if !bannerPrinted {
fmt.Fprintln(os.Stderr, "=== Starting Parallel Execution ===")
bannerPrinted = true
}
fmt.Fprintf(os.Stderr, "Task %s: Log: %s\n", taskID, path)
startPrintMu.Unlock()
}
for _, layer := range layers {
var wg sync.WaitGroup
executed := 0
@@ -275,6 +379,7 @@ func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult {
resultsCh <- TaskResult{TaskID: ts.ID, ExitCode: 1, Error: fmt.Sprintf("panic: %v", r)}
}
}()
printTaskStart(ts.ID)
resultsCh <- runCodexTaskFn(ts, timeout)
}(task)
}
@@ -340,6 +445,9 @@ func generateFinalOutput(results []TaskResult) string {
if res.SessionID != "" {
sb.WriteString(fmt.Sprintf("Session: %s\n", res.SessionID))
}
if res.LogPath != "" {
sb.WriteString(fmt.Sprintf("Log: %s\n", res.LogPath))
}
if res.Message != "" {
sb.WriteString(fmt.Sprintf("\n%s\n", res.Message))
}
@@ -383,6 +491,8 @@ func runStartupCleanup() {
// run is the main logic, returns exit code for testability
func run() (exitCode int) {
var startupCleanupWG sync.WaitGroup
// Handle --version and --help first (no logger needed)
if len(os.Args) > 1 {
switch os.Args[1] {
@@ -421,9 +531,16 @@ func run() (exitCode int) {
}
}()
defer runCleanupHook()
defer startupCleanupWG.Wait()
// Run cleanup asynchronously to avoid blocking startup
go runStartupCleanup()
// Run cleanup asynchronously to avoid blocking startup but wait before exit
if cleanupLogsFn != nil {
startupCleanupWG.Add(1)
go func() {
defer startupCleanupWG.Done()
runStartupCleanup()
}()
}
// Handle remaining commands
if len(os.Args) > 1 {
@@ -525,7 +642,20 @@ func run() (exitCode int) {
fmt.Fprintf(os.Stderr, "[codex-wrapper]\n")
fmt.Fprintf(os.Stderr, " Command: %s %s\n", codexCommand, strings.Join(codexArgs, " "))
fmt.Fprintf(os.Stderr, " PID: %d\n", os.Getpid())
fmt.Fprintf(os.Stderr, " Log: %s\n", logger.Path())
logPath := ""
if logger != nil {
logPath = logger.Path()
}
fmt.Fprintf(os.Stderr, " Log: %s\n", logPath)
printedLogPath := logPath
printLogPath := func(path string) {
if path == "" || path == printedLogPath {
return
}
fmt.Fprintf(os.Stderr, "[codex-wrapper]\n")
fmt.Fprintf(os.Stderr, " Log: %s\n", path)
printedLogPath = path
}
if useStdin {
var reasons []string
@@ -572,6 +702,7 @@ func run() (exitCode int) {
}
result := runCodexTask(taskSpec, false, cfg.Timeout)
printLogPath(result.LogPath)
if result.ExitCode != 0 {
return result.ExitCode
@@ -710,8 +841,8 @@ func runCodexProcess(parentCtx context.Context, codexArgs []string, taskText str
return res.Message, res.SessionID, res.ExitCode
}
func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, customArgs []string, useCustomArgs bool, silent bool, timeoutSec int) TaskResult {
result := TaskResult{TaskID: taskSpec.ID}
func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, customArgs []string, useCustomArgs bool, silent bool, timeoutSec int) (result TaskResult) {
result.TaskID = taskSpec.ID
cfg := &Config{
Mode: taskSpec.Mode,
@@ -746,6 +877,15 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo
return fmt.Sprintf("[Task: %s] %s", taskSpec.ID, msg)
}
captureLogPath := func() {
if result.LogPath != "" {
return
}
if logger := activeLogger(); logger != nil {
result.LogPath = logger.Path()
}
}
var logInfoFn func(string)
var logWarnFn func(string)
var logErrorFn func(string)
@@ -786,6 +926,7 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo
}
}
defer func() {
captureLogPath()
if tempLogger != nil {
closeLogger()
}
@@ -810,7 +951,7 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo
return fmt.Sprintf("%s; stderr: %s", msg, stderrBuf.String())
}
cmd := commandContext(ctx, codexCommand, codexArgs...)
cmd := newCommandRunner(ctx, codexCommand, codexArgs...)
stderrWriters := []io.Writer{stderrBuf}
if stderrLogger != nil {
@@ -820,9 +961,9 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo
stderrWriters = append([]io.Writer{os.Stderr}, stderrWriters...)
}
if len(stderrWriters) == 1 {
cmd.Stderr = stderrWriters[0]
cmd.SetStderr(stderrWriters[0])
} else {
cmd.Stderr = io.MultiWriter(stderrWriters...)
cmd.SetStderr(io.MultiWriter(stderrWriters...))
}
var stdinPipe io.WriteCloser
@@ -865,7 +1006,9 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo
return result
}
logInfoFn(fmt.Sprintf("Starting codex with PID: %d", cmd.Process.Pid))
if proc := cmd.Process(); proc != nil {
logInfoFn(fmt.Sprintf("Starting codex with PID: %d", proc.Pid()))
}
if logger := activeLogger(); logger != nil {
logInfoFn(fmt.Sprintf("Log capturing to: %s", logger.Path()))
}
@@ -888,23 +1031,105 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo
parseCh <- parseResult{message: msg, threadID: tid}
}()
var waitErr error
var forceKillTimer *time.Timer
select {
case waitErr = <-waitCh:
case <-ctx.Done():
logErrorFn(cancelReason(ctx))
forceKillTimer = terminateProcess(cmd)
waitErr = <-waitCh
var stdoutCloseOnce sync.Once
var stdoutDrainCloseOnce sync.Once
closeStdout := func(reason string) {
var once *sync.Once
if reason == stdoutCloseReasonDrain {
once = &stdoutDrainCloseOnce
} else {
once = &stdoutCloseOnce
}
once.Do(func() {
if stdout == nil {
return
}
var closeErr error
switch c := stdout.(type) {
case interface{ CloseWithReason(string) error }:
closeErr = c.CloseWithReason(reason)
case interface{ CloseWithError(error) error }:
closeErr = c.CloseWithError(nil)
default:
closeErr = stdout.Close()
}
if closeErr != nil {
logWarnFn(fmt.Sprintf("Failed to close stdout pipe: %v", closeErr))
}
})
}
var waitErr error
var forceKillTimer *forceKillTimer
var parsed parseResult
var drainTimer *time.Timer
var drainTimerCh <-chan time.Time
startDrainTimer := func() {
if drainTimer != nil {
return
}
timer := time.NewTimer(stdoutDrainTimeout)
drainTimer = timer
drainTimerCh = timer.C
}
stopDrainTimer := func() {
if drainTimer == nil {
return
}
if !drainTimer.Stop() {
select {
case <-drainTimerCh:
default:
}
}
drainTimer = nil
drainTimerCh = nil
}
waitDone := false
parseDone := false
ctxLogged := false
for !waitDone || !parseDone {
select {
case waitErr = <-waitCh:
waitDone = true
waitCh = nil
closeStdout(stdoutCloseReasonWait)
if !parseDone {
startDrainTimer()
}
case parsed = <-parseCh:
parseDone = true
parseCh = nil
stopDrainTimer()
case <-ctx.Done():
if !ctxLogged {
logErrorFn(cancelReason(ctx))
ctxLogged = true
if forceKillTimer == nil {
forceKillTimer = terminateCommandFn(cmd)
}
}
closeStdout(stdoutCloseReasonCtx)
if !parseDone {
startDrainTimer()
}
case <-drainTimerCh:
logWarnFn("stdout did not drain within 5s; forcing close")
closeStdout(stdoutCloseReasonDrain)
stopDrainTimer()
}
}
stopDrainTimer()
if forceKillTimer != nil {
forceKillTimer.Stop()
forceKillTimer.stop()
}
parsed := <-parseCh
if ctxErr := ctx.Err(); ctxErr != nil {
if errors.Is(ctxErr, context.DeadlineExceeded) {
result.ExitCode = 124
@@ -1045,6 +1270,51 @@ func terminateProcess(cmd *exec.Cmd) *time.Timer {
})
}
type forceKillTimer struct {
timer *time.Timer
done chan struct{}
stopped atomic.Bool
drained atomic.Bool
}
func (t *forceKillTimer) stop() {
if t == nil || t.timer == nil {
return
}
if !t.timer.Stop() {
<-t.done
t.drained.Store(true)
}
t.stopped.Store(true)
}
func terminateCommand(cmd commandRunner) *forceKillTimer {
if cmd == nil {
return nil
}
proc := cmd.Process()
if proc == nil {
return nil
}
if runtime.GOOS == "windows" {
_ = proc.Kill()
return nil
}
_ = proc.Signal(syscall.SIGTERM)
done := make(chan struct{}, 1)
timer := time.AfterFunc(time.Duration(forceKillDelay.Load())*time.Second, func() {
if p := cmd.Process(); p != nil {
_ = p.Kill()
}
done <- struct{}{}
})
return &forceKillTimer{timer: timer, done: done}
}
func parseJSONStream(r io.Reader) (message, threadID string) {
return parseJSONStreamWithLog(r, logWarn, logInfo)
}

View File

@@ -80,6 +80,8 @@ func parseIntegrationOutput(t *testing.T, out string) integrationOutput {
currentTask.Error = strings.TrimPrefix(line, "Error: ")
} else if strings.HasPrefix(line, "Session:") {
currentTask.SessionID = strings.TrimPrefix(line, "Session: ")
} else if strings.HasPrefix(line, "Log:") {
currentTask.LogPath = strings.TrimSpace(strings.TrimPrefix(line, "Log:"))
} else if line != "" && !strings.HasPrefix(line, "===") && !strings.HasPrefix(line, "---") {
if currentTask.Message != "" {
currentTask.Message += "\n"
@@ -96,6 +98,32 @@ func parseIntegrationOutput(t *testing.T, out string) integrationOutput {
return payload
}
func extractTaskBlock(t *testing.T, output, taskID string) string {
t.Helper()
header := fmt.Sprintf("--- Task: %s ---", taskID)
lines := strings.Split(output, "\n")
var block []string
collecting := false
for _, raw := range lines {
trimmed := strings.TrimSpace(raw)
if !collecting {
if trimmed == header {
collecting = true
block = append(block, trimmed)
}
continue
}
if strings.HasPrefix(trimmed, "--- Task: ") && trimmed != header {
break
}
block = append(block, trimmed)
}
if len(block) == 0 {
t.Fatalf("task block %s not found in output:\n%s", taskID, output)
}
return strings.Join(block, "\n")
}
func findResultByID(t *testing.T, payload integrationOutput, id string) TaskResult {
t.Helper()
for _, res := range payload.Results {
@@ -256,6 +284,194 @@ b`
}
}
func TestRunParallelOutputsIncludeLogPaths(t *testing.T) {
defer resetTestHooks()
origRun := runCodexTaskFn
t.Cleanup(func() {
runCodexTaskFn = origRun
resetTestHooks()
})
tempDir := t.TempDir()
logPathFor := func(id string) string {
return filepath.Join(tempDir, fmt.Sprintf("%s.log", id))
}
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
res := TaskResult{
TaskID: task.ID,
Message: fmt.Sprintf("result-%s", task.ID),
SessionID: fmt.Sprintf("session-%s", task.ID),
LogPath: logPathFor(task.ID),
}
if task.ID == "beta" {
res.ExitCode = 9
res.Error = "boom"
}
return res
}
input := `---TASK---
id: alpha
---CONTENT---
task-alpha
---TASK---
id: beta
---CONTENT---
task-beta`
stdinReader = bytes.NewReader([]byte(input))
os.Args = []string{"codex-wrapper", "--parallel"}
var exitCode int
output := captureStdout(t, func() {
exitCode = run()
})
if exitCode != 9 {
t.Fatalf("parallel run exit=%d, want 9", exitCode)
}
payload := parseIntegrationOutput(t, output)
alpha := findResultByID(t, payload, "alpha")
beta := findResultByID(t, payload, "beta")
if alpha.LogPath != logPathFor("alpha") {
t.Fatalf("alpha log path = %q, want %q", alpha.LogPath, logPathFor("alpha"))
}
if beta.LogPath != logPathFor("beta") {
t.Fatalf("beta log path = %q, want %q", beta.LogPath, logPathFor("beta"))
}
for _, id := range []string{"alpha", "beta"} {
want := fmt.Sprintf("Log: %s", logPathFor(id))
if !strings.Contains(output, want) {
t.Fatalf("parallel output missing %q for %s:\n%s", want, id, output)
}
}
}
func TestRunParallelStartupLogsPrinted(t *testing.T) {
defer resetTestHooks()
tempDir := setTempDirEnv(t, t.TempDir())
input := `---TASK---
id: a
---CONTENT---
fail
---TASK---
id: b
---CONTENT---
ok-b
---TASK---
id: c
dependencies: a
---CONTENT---
should-skip
---TASK---
id: d
---CONTENT---
ok-d`
stdinReader = bytes.NewReader([]byte(input))
os.Args = []string{"codex-wrapper", "--parallel"}
expectedLog := filepath.Join(tempDir, fmt.Sprintf("codex-wrapper-%d.log", os.Getpid()))
origRun := runCodexTaskFn
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
path := expectedLog
if logger := activeLogger(); logger != nil && logger.Path() != "" {
path = logger.Path()
}
if task.ID == "a" {
return TaskResult{TaskID: task.ID, ExitCode: 3, Error: "boom", LogPath: path}
}
return TaskResult{TaskID: task.ID, ExitCode: 0, Message: task.Task, LogPath: path}
}
t.Cleanup(func() { runCodexTaskFn = origRun })
var exitCode int
var stdoutOut string
stderrOut := captureStderr(t, func() {
stdoutOut = captureStdout(t, func() {
exitCode = run()
})
})
if exitCode == 0 {
t.Fatalf("expected non-zero exit due to task failure, got %d", exitCode)
}
if stdoutOut == "" {
t.Fatalf("expected parallel summary on stdout")
}
lines := strings.Split(strings.TrimSpace(stderrOut), "\n")
var bannerSeen bool
var taskLines []string
for _, raw := range lines {
line := strings.TrimSpace(raw)
if line == "" {
continue
}
if line == "=== Starting Parallel Execution ===" {
if bannerSeen {
t.Fatalf("banner printed multiple times:\n%s", stderrOut)
}
bannerSeen = true
continue
}
taskLines = append(taskLines, line)
}
if !bannerSeen {
t.Fatalf("expected startup banner in stderr, got:\n%s", stderrOut)
}
expectedLines := map[string]struct{}{
fmt.Sprintf("Task a: Log: %s", expectedLog): {},
fmt.Sprintf("Task b: Log: %s", expectedLog): {},
fmt.Sprintf("Task d: Log: %s", expectedLog): {},
}
if len(taskLines) != len(expectedLines) {
t.Fatalf("startup log lines mismatch, got %d lines:\n%s", len(taskLines), stderrOut)
}
for _, line := range taskLines {
if _, ok := expectedLines[line]; !ok {
t.Fatalf("unexpected startup line %q\nstderr:\n%s", line, stderrOut)
}
}
}
func TestRunNonParallelOutputsIncludeLogPathsIntegration(t *testing.T) {
defer resetTestHooks()
tempDir := setTempDirEnv(t, t.TempDir())
os.Args = []string{"codex-wrapper", "integration-log-check"}
stdinReader = strings.NewReader("")
isTerminalFn = func() bool { return true }
codexCommand = "echo"
buildCodexArgsFn = func(cfg *Config, targetArg string) []string {
return []string{`{"type":"thread.started","thread_id":"integration-session"}` + "\n" + `{"type":"item.completed","item":{"type":"agent_message","text":"done"}}`}
}
var exitCode int
stderr := captureStderr(t, func() {
_ = captureStdout(t, func() {
exitCode = run()
})
})
if exitCode != 0 {
t.Fatalf("run() exit=%d, want 0", exitCode)
}
expectedLog := filepath.Join(tempDir, fmt.Sprintf("codex-wrapper-%d.log", os.Getpid()))
wantLine := fmt.Sprintf("Log: %s", expectedLog)
if !strings.Contains(stderr, wantLine) {
t.Fatalf("stderr missing %q, got: %q", wantLine, stderr)
}
}
func TestRunParallelPartialFailureBlocksDependents(t *testing.T) {
defer resetTestHooks()
origRun := runCodexTaskFn
@@ -264,11 +480,17 @@ func TestRunParallelPartialFailureBlocksDependents(t *testing.T) {
resetTestHooks()
})
tempDir := t.TempDir()
logPathFor := func(id string) string {
return filepath.Join(tempDir, fmt.Sprintf("%s.log", id))
}
runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult {
path := logPathFor(task.ID)
if task.ID == "A" {
return TaskResult{TaskID: "A", ExitCode: 2, Error: "boom"}
return TaskResult{TaskID: "A", ExitCode: 2, Error: "boom", LogPath: path}
}
return TaskResult{TaskID: task.ID, ExitCode: 0, Message: task.Task}
return TaskResult{TaskID: task.ID, ExitCode: 0, Message: task.Task, LogPath: path}
}
input := `---TASK---
@@ -318,6 +540,26 @@ ok-e`
if payload.Summary.Failed != 2 || payload.Summary.Total != 4 {
t.Fatalf("unexpected summary after partial failure: %+v", payload.Summary)
}
if resA.LogPath != logPathFor("A") {
t.Fatalf("task A log path = %q, want %q", resA.LogPath, logPathFor("A"))
}
if resB.LogPath != "" {
t.Fatalf("task B should not report a log path when skipped, got %q", resB.LogPath)
}
if resD.LogPath != logPathFor("D") || resE.LogPath != logPathFor("E") {
t.Fatalf("expected log paths for D/E, got D=%q E=%q", resD.LogPath, resE.LogPath)
}
for _, id := range []string{"A", "D", "E"} {
block := extractTaskBlock(t, output, id)
want := fmt.Sprintf("Log: %s", logPathFor(id))
if !strings.Contains(block, want) {
t.Fatalf("task %s block missing %q:\n%s", id, want, block)
}
}
blockB := extractTaskBlock(t, output, "B")
if strings.Contains(blockB, "Log:") {
t.Fatalf("skipped task B should not emit a log line:\n%s", blockB)
}
}
func TestRunParallelTimeoutPropagation(t *testing.T) {

File diff suppressed because it is too large Load Diff