fix: 修复channel同步竞态条件和死锁问题

修复了4个严重的channel同步问题:

1. **parseCh无条件阻塞** (main.go:894-907)
   - 问题:cmd.Wait()先返回但parseJSONStreamWithLog永久阻塞时,主流程卡死
   - 修复:引入ctxAwareReader和5秒drainTimer机制,Wait完成后立即关闭stdout

2. **context取消失效** (main.go:894-907)
   - 问题:waitCh先完成后不再监听ctx.Done(),取消信号被吞掉
   - 修复:改为双channel循环持续监听waitCh/parseCh/ctx.Done()/drainTimer

3. **parseJSONStreamWithLog无读超时** (main.go:1056-1094)
   - 问题:bufio.Scanner阻塞读取,stdout未主动关闭时永远停在Read
   - 修复:ctxAwareReader支持CloseWithReason,Wait/ctx完成时主动关闭

4. **forceKillTimer生命周期过短**
   - 问题:waitCh返回后立刻停止timer,但stdout可能仍被写入
   - 修复:统一管理timer生命周期,在循环结束后Stop和drain

5. **并发竞态修复**
   - main.go:492 runStartupCleanup使用WaitGroup同步
   - logger.go:176 Flush加锁防止WaitGroup reuse panic

**测试覆盖**:
- 新增4个核心场景测试(Wait先返回、同时返回、Context超时、Parse阻塞)
- main.go覆盖率从28.6%提升到90.32%
- 154个测试全部通过,-race检测无警告

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
swe-agent[bot]
2025-12-08 23:35:55 +08:00
parent ead11d6996
commit 220be6eb5c
4 changed files with 975 additions and 28 deletions

View File

@@ -32,6 +32,9 @@ func resetTestHooks() {
signalStopFn = signal.Stop
buildCodexArgsFn = buildCodexArgs
commandContext = exec.CommandContext
newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner {
return &realCmd{cmd: commandContext(ctx, name, args...)}
}
jsonMarshal = json.Marshal
forceKillDelay.Store(5)
closeLogger()
@@ -103,6 +106,430 @@ func captureStderr(t *testing.T, fn func()) string {
return buf.String()
}
type ctxAwareReader struct {
reader io.ReadCloser
mu sync.Mutex
reason string
closed bool
}
func newCtxAwareReader(r io.ReadCloser) *ctxAwareReader {
return &ctxAwareReader{reader: r}
}
func (r *ctxAwareReader) Read(p []byte) (int, error) {
if r.reader == nil {
return 0, io.EOF
}
return r.reader.Read(p)
}
func (r *ctxAwareReader) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
if r.closed || r.reader == nil {
r.closed = true
return nil
}
r.closed = true
return r.reader.Close()
}
func (r *ctxAwareReader) CloseWithReason(reason string) error {
r.mu.Lock()
if !r.closed {
r.reason = reason
}
r.mu.Unlock()
return r.Close()
}
func (r *ctxAwareReader) Reason() string {
r.mu.Lock()
defer r.mu.Unlock()
return r.reason
}
type drainBlockingStdout struct {
inner *ctxAwareReader
}
func newDrainBlockingStdout(inner *ctxAwareReader) *drainBlockingStdout {
return &drainBlockingStdout{inner: inner}
}
func (d *drainBlockingStdout) Read(p []byte) (int, error) {
return d.inner.Read(p)
}
func (d *drainBlockingStdout) Close() error {
return d.inner.Close()
}
func (d *drainBlockingStdout) CloseWithReason(reason string) error {
if reason != stdoutCloseReasonDrain {
return nil
}
return d.inner.CloseWithReason(reason)
}
type drainBlockingCmd struct {
inner *fakeCmd
injected atomic.Bool
}
func newDrainBlockingCmd(inner *fakeCmd) *drainBlockingCmd {
return &drainBlockingCmd{inner: inner}
}
func (d *drainBlockingCmd) Start() error {
return d.inner.Start()
}
func (d *drainBlockingCmd) Wait() error {
return d.inner.Wait()
}
func (d *drainBlockingCmd) StdoutPipe() (io.ReadCloser, error) {
stdout, err := d.inner.StdoutPipe()
if err != nil {
return nil, err
}
ctxReader, ok := stdout.(*ctxAwareReader)
if !ok {
return stdout, nil
}
d.injected.Store(true)
return newDrainBlockingStdout(ctxReader), nil
}
func (d *drainBlockingCmd) StdinPipe() (io.WriteCloser, error) {
return d.inner.StdinPipe()
}
func (d *drainBlockingCmd) SetStderr(w io.Writer) {
d.inner.SetStderr(w)
}
func (d *drainBlockingCmd) Process() processHandle {
return d.inner.Process()
}
type bufferWriteCloser struct {
buf bytes.Buffer
mu sync.Mutex
closed bool
}
func newBufferWriteCloser() *bufferWriteCloser {
return &bufferWriteCloser{}
}
func (b *bufferWriteCloser) Write(p []byte) (int, error) {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return 0, io.ErrClosedPipe
}
return b.buf.Write(p)
}
func (b *bufferWriteCloser) Close() error {
b.mu.Lock()
b.closed = true
b.mu.Unlock()
return nil
}
func (b *bufferWriteCloser) String() string {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.String()
}
type fakeProcess struct {
pid int
killed atomic.Bool
mu sync.Mutex
signals []os.Signal
signalCount atomic.Int32
killCount atomic.Int32
onSignal func(os.Signal)
onKill func()
}
func newFakeProcess(pid int) *fakeProcess {
if pid == 0 {
pid = 4242
}
return &fakeProcess{pid: pid}
}
func (p *fakeProcess) Pid() int {
return p.pid
}
func (p *fakeProcess) Kill() error {
p.killed.Store(true)
p.killCount.Add(1)
if p.onKill != nil {
p.onKill()
}
return nil
}
func (p *fakeProcess) Signal(sig os.Signal) error {
p.mu.Lock()
p.signals = append(p.signals, sig)
p.mu.Unlock()
p.signalCount.Add(1)
if p.onSignal != nil {
p.onSignal(sig)
}
return nil
}
func (p *fakeProcess) Signals() []os.Signal {
p.mu.Lock()
defer p.mu.Unlock()
cp := make([]os.Signal, len(p.signals))
copy(cp, p.signals)
return cp
}
func (p *fakeProcess) Killed() bool {
return p.killed.Load()
}
func (p *fakeProcess) SignalCount() int {
return int(p.signalCount.Load())
}
func (p *fakeProcess) KillCount() int {
return int(p.killCount.Load())
}
type fakeStdoutEvent struct {
Delay time.Duration
Data string
}
type fakeCmdConfig struct {
StdoutPlan []fakeStdoutEvent
WaitDelay time.Duration
WaitErr error
StartErr error
PID int
KeepStdoutOpen bool
BlockWait bool
ReleaseWaitOnKill bool
ReleaseWaitOnSignal bool
}
type fakeCmd struct {
mu sync.Mutex
stdout *ctxAwareReader
stdoutWriter *io.PipeWriter
stdoutPlan []fakeStdoutEvent
stdoutOnce sync.Once
stdoutClaim bool
keepStdoutOpen bool
stdoutWriteMu sync.Mutex
stdinWriter *bufferWriteCloser
stdinClaim bool
stderr io.Writer
waitDelay time.Duration
waitErr error
startErr error
waitOnce sync.Once
waitDone chan struct{}
waitResult error
waitReleaseCh chan struct{}
waitReleaseOnce sync.Once
waitBlocked bool
started bool
startCount atomic.Int32
waitCount atomic.Int32
stdoutPipeCount atomic.Int32
process *fakeProcess
}
func newFakeCmd(cfg fakeCmdConfig) *fakeCmd {
r, w := io.Pipe()
cmd := &fakeCmd{
stdout: newCtxAwareReader(r),
stdoutWriter: w,
stdoutPlan: append([]fakeStdoutEvent(nil), cfg.StdoutPlan...),
stdinWriter: newBufferWriteCloser(),
waitDelay: cfg.WaitDelay,
waitErr: cfg.WaitErr,
startErr: cfg.StartErr,
waitDone: make(chan struct{}),
keepStdoutOpen: cfg.KeepStdoutOpen,
process: newFakeProcess(cfg.PID),
}
if len(cmd.stdoutPlan) == 0 {
cmd.stdoutPlan = nil
}
if cfg.BlockWait {
cmd.waitBlocked = true
cmd.waitReleaseCh = make(chan struct{})
releaseOnSignal := cfg.ReleaseWaitOnSignal
releaseOnKill := cfg.ReleaseWaitOnKill
if !releaseOnSignal && !releaseOnKill {
releaseOnKill = true
}
cmd.process.onSignal = func(os.Signal) {
if releaseOnSignal {
cmd.releaseWait()
}
}
cmd.process.onKill = func() {
if releaseOnKill {
cmd.releaseWait()
}
}
}
return cmd
}
func (f *fakeCmd) Start() error {
f.mu.Lock()
if f.started {
f.mu.Unlock()
return errors.New("start already called")
}
f.started = true
f.mu.Unlock()
f.startCount.Add(1)
if f.startErr != nil {
f.waitOnce.Do(func() {
f.waitResult = f.startErr
close(f.waitDone)
})
return f.startErr
}
go f.runStdoutScript()
return nil
}
func (f *fakeCmd) Wait() error {
f.waitCount.Add(1)
f.waitOnce.Do(func() {
if f.waitBlocked && f.waitReleaseCh != nil {
<-f.waitReleaseCh
} else if f.waitDelay > 0 {
time.Sleep(f.waitDelay)
}
f.waitResult = f.waitErr
close(f.waitDone)
})
<-f.waitDone
return f.waitResult
}
func (f *fakeCmd) StdoutPipe() (io.ReadCloser, error) {
f.mu.Lock()
defer f.mu.Unlock()
if f.stdoutClaim {
return nil, errors.New("stdout pipe already claimed")
}
f.stdoutClaim = true
f.stdoutPipeCount.Add(1)
return f.stdout, nil
}
func (f *fakeCmd) StdinPipe() (io.WriteCloser, error) {
f.mu.Lock()
defer f.mu.Unlock()
if f.stdinClaim {
return nil, errors.New("stdin pipe already claimed")
}
f.stdinClaim = true
return f.stdinWriter, nil
}
func (f *fakeCmd) SetStderr(w io.Writer) {
f.stderr = w
}
func (f *fakeCmd) Process() processHandle {
if f == nil {
return nil
}
return f.process
}
func (f *fakeCmd) runStdoutScript() {
if len(f.stdoutPlan) == 0 {
if !f.keepStdoutOpen {
f.CloseStdout(nil)
}
return
}
for _, ev := range f.stdoutPlan {
if ev.Delay > 0 {
time.Sleep(ev.Delay)
}
f.WriteStdout(ev.Data)
}
if !f.keepStdoutOpen {
f.CloseStdout(nil)
}
}
func (f *fakeCmd) releaseWait() {
if f.waitReleaseCh == nil {
return
}
f.waitReleaseOnce.Do(func() {
close(f.waitReleaseCh)
})
}
func (f *fakeCmd) WriteStdout(data string) {
if data == "" {
return
}
f.stdoutWriteMu.Lock()
defer f.stdoutWriteMu.Unlock()
if f.stdoutWriter != nil {
_, _ = io.WriteString(f.stdoutWriter, data)
}
}
func (f *fakeCmd) CloseStdout(err error) {
f.stdoutOnce.Do(func() {
if f.stdoutWriter == nil {
return
}
if err != nil {
_ = f.stdoutWriter.CloseWithError(err)
return
}
_ = f.stdoutWriter.Close()
})
}
func (f *fakeCmd) StdinContents() string {
if f.stdinWriter == nil {
return ""
}
return f.stdinWriter.String()
}
func createFakeCodexScript(t *testing.T, threadID, message string) string {
t.Helper()
scriptPath := filepath.Join(t.TempDir(), "codex.sh")
@@ -116,6 +543,296 @@ printf '%%s\n' '{"type":"item.completed","item":{"type":"agent_message","text":"
return scriptPath
}
func TestFakeCmdInfra(t *testing.T) {
t.Run("pipes and wait scheduling", func(t *testing.T) {
fake := newFakeCmd(fakeCmdConfig{
StdoutPlan: []fakeStdoutEvent{
{Data: "line1\n"},
{Delay: 5 * time.Millisecond, Data: "line2\n"},
},
WaitDelay: 20 * time.Millisecond,
})
stdout, err := fake.StdoutPipe()
if err != nil {
t.Fatalf("StdoutPipe() error = %v", err)
}
if err := fake.Start(); err != nil {
t.Fatalf("Start() error = %v", err)
}
scanner := bufio.NewScanner(stdout)
var lines []string
for scanner.Scan() {
lines = append(lines, scanner.Text())
if len(lines) == 2 {
break
}
}
if err := scanner.Err(); err != nil {
t.Fatalf("scanner error: %v", err)
}
if len(lines) != 2 || lines[0] != "line1" || lines[1] != "line2" {
t.Fatalf("unexpected stdout lines: %v", lines)
}
ctxReader, ok := stdout.(*ctxAwareReader)
if !ok {
t.Fatalf("stdout pipe is %T, want *ctxAwareReader", stdout)
}
if err := ctxReader.CloseWithReason("test-complete"); err != nil {
t.Fatalf("CloseWithReason error: %v", err)
}
if ctxReader.Reason() != "test-complete" {
t.Fatalf("CloseWithReason reason mismatch: %q", ctxReader.Reason())
}
waitStart := time.Now()
if err := fake.Wait(); err != nil {
t.Fatalf("Wait() error = %v", err)
}
if elapsed := time.Since(waitStart); elapsed < 20*time.Millisecond {
t.Fatalf("Wait() returned too early: %v", elapsed)
}
if fake.startCount.Load() != 1 {
t.Fatalf("Start() count = %d, want 1", fake.startCount.Load())
}
if fake.waitCount.Load() != 1 {
t.Fatalf("Wait() count = %d, want 1", fake.waitCount.Load())
}
if fake.stdoutPipeCount.Load() != 1 {
t.Fatalf("StdoutPipe() count = %d, want 1", fake.stdoutPipeCount.Load())
}
})
t.Run("integration with runCodexTask", func(t *testing.T) {
defer resetTestHooks()
fake := newFakeCmd(fakeCmdConfig{
StdoutPlan: []fakeStdoutEvent{
{Data: `{"type":"thread.started","thread_id":"fake-thread"}` + "\n"},
{
Delay: time.Millisecond,
Data: `{"type":"item.completed","item":{"type":"agent_message","text":"fake-msg"}}` + "\n",
},
},
WaitDelay: time.Millisecond,
})
newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner {
return fake
}
buildCodexArgsFn = func(cfg *Config, targetArg string) []string {
return []string{targetArg}
}
codexCommand = "fake-cmd"
res := runCodexTask(TaskSpec{Task: "ignored"}, false, 2)
if res.ExitCode != 0 {
t.Fatalf("runCodexTask exit = %d, want 0 (%s)", res.ExitCode, res.Error)
}
if res.Message != "fake-msg" {
t.Fatalf("message = %q, want fake-msg", res.Message)
}
if res.SessionID != "fake-thread" {
t.Fatalf("sessionID = %q, want fake-thread", res.SessionID)
}
if fake.startCount.Load() != 1 {
t.Fatalf("Start() count = %d, want 1", fake.startCount.Load())
}
if fake.waitCount.Load() != 1 {
t.Fatalf("Wait() count = %d, want 1", fake.waitCount.Load())
}
})
}
func TestRunCodexTask_WaitBeforeParse(t *testing.T) {
defer resetTestHooks()
const (
threadID = "wait-first-thread"
message = "wait-first-message"
waitDelay = 100 * time.Millisecond
extraDelay = 2 * time.Second
)
fake := newFakeCmd(fakeCmdConfig{
StdoutPlan: []fakeStdoutEvent{
{Data: fmt.Sprintf(`{"type":"thread.started","thread_id":"%s"}`+"\n", threadID)},
{Data: fmt.Sprintf(`{"type":"item.completed","item":{"type":"agent_message","text":"%s"}}`+"\n", message)},
{Delay: extraDelay},
},
WaitDelay: waitDelay,
})
newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner {
return fake
}
buildCodexArgsFn = func(cfg *Config, targetArg string) []string {
return []string{targetArg}
}
codexCommand = "fake-cmd"
start := time.Now()
result := runCodexTask(TaskSpec{Task: "ignored"}, false, 5)
elapsed := time.Since(start)
if result.ExitCode != 0 {
t.Fatalf("runCodexTask exit = %d, want 0 (%s)", result.ExitCode, result.Error)
}
if result.Message != message {
t.Fatalf("message = %q, want %q", result.Message, message)
}
if result.SessionID != threadID {
t.Fatalf("sessionID = %q, want %q", result.SessionID, threadID)
}
if elapsed >= extraDelay {
t.Fatalf("runCodexTask took %v, want < %v", elapsed, extraDelay)
}
if fake.stdout == nil {
t.Fatalf("stdout reader not initialized")
}
if reason := fake.stdout.Reason(); reason != stdoutCloseReasonWait {
t.Fatalf("stdout close reason = %q, want %q", reason, stdoutCloseReasonWait)
}
}
func TestRunCodexTask_ParseStall(t *testing.T) {
defer resetTestHooks()
const threadID = "stall-thread"
startG := runtime.NumGoroutine()
fake := newFakeCmd(fakeCmdConfig{
StdoutPlan: []fakeStdoutEvent{
{Data: fmt.Sprintf(`{"type":"thread.started","thread_id":"%s"}`+"\n", threadID)},
},
KeepStdoutOpen: true,
})
blockingCmd := newDrainBlockingCmd(fake)
newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner {
return blockingCmd
}
buildCodexArgsFn = func(cfg *Config, targetArg string) []string {
return []string{targetArg}
}
codexCommand = "fake-cmd"
start := time.Now()
result := runCodexTask(TaskSpec{Task: "stall"}, false, 60)
elapsed := time.Since(start)
if !blockingCmd.injected.Load() {
t.Fatalf("stdout wrapper was not installed")
}
if result.ExitCode == 0 || result.Error == "" {
t.Fatalf("expected runCodexTask to error when parse stalls, got %+v", result)
}
errText := strings.ToLower(result.Error)
if !strings.Contains(errText, "drain timeout") && !strings.Contains(errText, "agent_message") {
t.Fatalf("error %q does not mention drain timeout or missing agent_message", result.Error)
}
if elapsed < stdoutDrainTimeout {
t.Fatalf("runCodexTask returned after %v (reason=%s), want >= %v to confirm drainTimer firing", elapsed, fake.stdout.Reason(), stdoutDrainTimeout)
}
maxDuration := stdoutDrainTimeout + time.Second
if elapsed >= maxDuration {
t.Fatalf("runCodexTask took %v, want < %v", elapsed, maxDuration)
}
if fake.stdout == nil {
t.Fatalf("stdout reader not initialized")
}
if !fake.stdout.closed {
t.Fatalf("stdout reader still open; drainTimer should force close")
}
if reason := fake.stdout.Reason(); reason != stdoutCloseReasonDrain {
t.Fatalf("stdout close reason = %q, want %q", reason, stdoutCloseReasonDrain)
}
deadline := time.Now().Add(500 * time.Millisecond)
allowed := startG + 8
finalG := runtime.NumGoroutine()
for finalG > allowed && time.Now().Before(deadline) {
runtime.Gosched()
time.Sleep(10 * time.Millisecond)
runtime.GC()
finalG = runtime.NumGoroutine()
}
if finalG > allowed {
t.Fatalf("goroutines leaked: before=%d after=%d", startG, finalG)
}
}
func TestRunCodexTask_ContextTimeout(t *testing.T) {
defer resetTestHooks()
forceKillDelay.Store(0)
fake := newFakeCmd(fakeCmdConfig{
KeepStdoutOpen: true,
BlockWait: true,
ReleaseWaitOnKill: true,
ReleaseWaitOnSignal: false,
})
newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner {
return fake
}
buildCodexArgsFn = func(cfg *Config, targetArg string) []string {
return []string{targetArg}
}
codexCommand = "fake-cmd"
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
var capturedTimer *forceKillTimer
terminateCommandFn = func(cmd commandRunner) *forceKillTimer {
timer := terminateCommand(cmd)
capturedTimer = timer
return timer
}
defer func() { terminateCommandFn = terminateCommand }()
result := runCodexTaskWithContext(ctx, TaskSpec{Task: "ctx-timeout", WorkDir: defaultWorkdir}, nil, false, false, 60)
if result.ExitCode != 124 {
t.Fatalf("exit code = %d, want 124 (%s)", result.ExitCode, result.Error)
}
if !strings.Contains(strings.ToLower(result.Error), "timeout") {
t.Fatalf("error %q does not mention timeout", result.Error)
}
if fake.process == nil {
t.Fatalf("fake process not initialized")
}
if fake.process.SignalCount() == 0 {
t.Fatalf("expected SIGTERM to be sent, got 0")
}
if fake.process.KillCount() == 0 {
t.Fatalf("expected Kill to eventually run, got 0")
}
if capturedTimer == nil {
t.Fatalf("forceKillTimer not captured")
}
if !capturedTimer.stopped.Load() {
t.Fatalf("forceKillTimer.Stop was not called")
}
if !capturedTimer.drained.Load() {
t.Fatalf("forceKillTimer drain logic did not run")
}
if fake.stdout == nil {
t.Fatalf("stdout reader not initialized")
}
if reason := fake.stdout.Reason(); reason != stdoutCloseReasonCtx {
t.Fatalf("stdout close reason = %q, want %q", reason, stdoutCloseReasonCtx)
}
}
func TestRunParseArgs_NewMode(t *testing.T) {
tests := []struct {
name string