fix codex wrapper async log

This commit is contained in:
cexll
2025-12-02 16:54:43 +08:00
parent cfc64e8515
commit 3bc8342929
3 changed files with 52 additions and 49 deletions

View File

@@ -19,13 +19,12 @@ type Logger struct {
file *os.File file *os.File
writer *bufio.Writer writer *bufio.Writer
ch chan logEntry ch chan logEntry
flushReq chan struct{} flushReq chan chan struct{}
done chan struct{} done chan struct{}
closed atomic.Bool closed atomic.Bool
closeOnce sync.Once closeOnce sync.Once
workerWG sync.WaitGroup workerWG sync.WaitGroup
pendingWG sync.WaitGroup pendingWG sync.WaitGroup
flushMu sync.Mutex
} }
type logEntry struct { type logEntry struct {
@@ -60,7 +59,7 @@ func NewLoggerWithSuffix(suffix string) (*Logger, error) {
file: f, file: f,
writer: bufio.NewWriterSize(f, 4096), writer: bufio.NewWriterSize(f, 4096),
ch: make(chan logEntry, 1000), ch: make(chan logEntry, 1000),
flushReq: make(chan struct{}, 1), flushReq: make(chan chan struct{}, 1),
done: make(chan struct{}), done: make(chan struct{}),
} }
@@ -174,16 +173,10 @@ func (l *Logger) Flush() {
} }
// Trigger writer flush // Trigger writer flush
select {
case l.flushReq <- struct{}{}:
// Wait for flush to complete (with mutex)
flushDone := make(chan struct{}) flushDone := make(chan struct{})
go func() { select {
l.flushMu.Lock() case l.flushReq <- flushDone:
l.flushMu.Unlock() // Wait for flush to complete
close(flushDone)
}()
select { select {
case <-flushDone: case <-flushDone:
// Flush completed // Flush completed
@@ -210,11 +203,9 @@ func (l *Logger) log(level, msg string) {
select { select {
case l.ch <- entry: case l.ch <- entry:
// Successfully sent to channel
case <-l.done: case <-l.done:
l.pendingWG.Done() // Logger is closing, drop this entry
return
default:
// Channel is full; drop the entry to avoid blocking callers.
l.pendingWG.Done() l.pendingWG.Done()
return return
} }
@@ -242,11 +233,11 @@ func (l *Logger) run() {
case <-ticker.C: case <-ticker.C:
l.writer.Flush() l.writer.Flush()
case <-l.flushReq: case flushDone := <-l.flushReq:
// Explicit flush request // Explicit flush request - flush writer and sync to disk
l.flushMu.Lock()
l.writer.Flush() l.writer.Flush()
l.flushMu.Unlock() l.file.Sync()
close(flushDone)
} }
} }
} }

View File

@@ -360,6 +360,19 @@ func main() {
// run is the main logic, returns exit code for testability // run is the main logic, returns exit code for testability
func run() (exitCode int) { func run() (exitCode int) {
// Handle --version and --help first (no logger needed)
if len(os.Args) > 1 {
switch os.Args[1] {
case "--version", "-v":
fmt.Printf("codex-wrapper version %s\n", version)
return 0
case "--help", "-h":
printHelp()
return 0
}
}
// Initialize logger for all other commands
logger, err := NewLogger() logger, err := NewLogger()
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: failed to initialize logger: %v\n", err) fmt.Fprintf(os.Stderr, "ERROR: failed to initialize logger: %v\n", err)
@@ -375,25 +388,18 @@ func run() (exitCode int) {
if err := closeLogger(); err != nil { if err := closeLogger(); err != nil {
fmt.Fprintf(os.Stderr, "ERROR: failed to close logger: %v\n", err) fmt.Fprintf(os.Stderr, "ERROR: failed to close logger: %v\n", err)
} }
if exitCode == 0 && logger != nil { // Always remove log file after completion
if logger != nil {
if err := logger.RemoveLogFile(); err != nil && !os.IsNotExist(err) { if err := logger.RemoveLogFile(); err != nil && !os.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "ERROR: failed to remove logger file: %v\n", err) // Silently ignore removal errors
} }
} else if exitCode != 0 && logger != nil {
fmt.Fprintf(os.Stderr, "Log file retained at: %s\n", logger.Path())
} }
}() }()
defer runCleanupHook() defer runCleanupHook()
// Handle --version and --help first // Handle remaining commands
if len(os.Args) > 1 { if len(os.Args) > 1 {
switch os.Args[1] { switch os.Args[1] {
case "--version", "-v":
fmt.Printf("codex-wrapper version %s\n", version)
return 0
case "--help", "-h":
printHelp()
return 0
case "--parallel": case "--parallel":
if len(os.Args) > 2 { if len(os.Args) > 2 {
fmt.Fprintln(os.Stderr, "ERROR: --parallel reads its task configuration from stdin and does not accept additional arguments.") fmt.Fprintln(os.Stderr, "ERROR: --parallel reads its task configuration from stdin and does not accept additional arguments.")
@@ -438,6 +444,12 @@ func run() (exitCode int) {
logInfo("Script started") logInfo("Script started")
// Print startup information to stderr
fmt.Fprintf(os.Stderr, "[codex-wrapper]\n")
fmt.Fprintf(os.Stderr, " Command: %s\n", strings.Join(os.Args, " "))
fmt.Fprintf(os.Stderr, " PID: %d\n", os.Getpid())
fmt.Fprintf(os.Stderr, " Log: %s\n", logger.Path())
cfg, err := parseArgs() cfg, err := parseArgs()
if err != nil { if err != nil {
logError(err.Error()) logError(err.Error())
@@ -1210,24 +1222,18 @@ func farewell(name string) string {
} }
func logInfo(msg string) { func logInfo(msg string) {
fmt.Fprintf(os.Stderr, "INFO: %s\n", msg)
if logger := activeLogger(); logger != nil { if logger := activeLogger(); logger != nil {
logger.Info(msg) logger.Info(msg)
} }
} }
func logWarn(msg string) { func logWarn(msg string) {
fmt.Fprintf(os.Stderr, "WARN: %s\n", msg)
if logger := activeLogger(); logger != nil { if logger := activeLogger(); logger != nil {
logger.Warn(msg) logger.Warn(msg)
} }
} }
func logError(msg string) { func logError(msg string) {
fmt.Fprintf(os.Stderr, "ERROR: %s\n", msg)
if logger := activeLogger(); logger != nil { if logger := activeLogger(); logger != nil {
logger.Error(msg) logger.Error(msg)
} }

View File

@@ -788,11 +788,13 @@ func TestSilentMode(t *testing.T) {
verbose := capture(false) verbose := capture(false)
quiet := capture(true) quiet := capture(true)
// After refactoring, logs are only written to file, not stderr
// Both silent and non-silent modes should produce no stderr output
if quiet != "" { if quiet != "" {
t.Fatalf("silent mode should suppress stderr, got: %q", quiet) t.Fatalf("silent mode should suppress stderr, got: %q", quiet)
} }
if !strings.Contains(verbose, "INFO: Starting codex") { if verbose != "" {
t.Fatalf("non-silent mode should log to stderr, got: %q", verbose) t.Fatalf("non-silent mode should also suppress stderr (logs go to file), got: %q", verbose)
} }
} }
@@ -1136,10 +1138,10 @@ func TestRun_ExplicitStdinReadError(t *testing.T) {
if !strings.Contains(logOutput, "Failed to read stdin: broken stdin") { if !strings.Contains(logOutput, "Failed to read stdin: broken stdin") {
t.Fatalf("log missing read error entry, got %q", logOutput) t.Fatalf("log missing read error entry, got %q", logOutput)
} }
if _, err := os.Stat(logPath); os.IsNotExist(err) { // Log file is always removed after completion (new behavior)
t.Fatalf("log file should exist") if _, err := os.Stat(logPath); !os.IsNotExist(err) {
t.Fatalf("log file should be removed after completion")
} }
defer os.Remove(logPath)
} }
func TestRun_CommandFails(t *testing.T) { func TestRun_CommandFails(t *testing.T) {
@@ -1220,10 +1222,10 @@ func TestRun_PipedTaskReadError(t *testing.T) {
if !strings.Contains(logOutput, "ERROR: Failed to read piped stdin: read stdin: pipe failure") { if !strings.Contains(logOutput, "ERROR: Failed to read piped stdin: read stdin: pipe failure") {
t.Fatalf("log missing piped read error, got %q", logOutput) t.Fatalf("log missing piped read error, got %q", logOutput)
} }
if _, err := os.Stat(logPath); os.IsNotExist(err) { // Log file is always removed after completion (new behavior)
t.Fatalf("log file should exist") if _, err := os.Stat(logPath); !os.IsNotExist(err) {
t.Fatalf("log file should be removed after completion")
} }
defer os.Remove(logPath)
} }
func TestRun_PipedTaskSuccess(t *testing.T) { func TestRun_PipedTaskSuccess(t *testing.T) {
@@ -1325,17 +1327,21 @@ printf '%s\n' '{"type":"item.completed","item":{"type":"agent_message","text":"l
if exitCode != 130 { if exitCode != 130 {
t.Fatalf("exit code = %d, want 130", exitCode) t.Fatalf("exit code = %d, want 130", exitCode)
} }
if _, err := os.Stat(logPath); os.IsNotExist(err) { // Log file is always removed after completion (new behavior)
t.Fatalf("log file should exist after signal exit") if _, err := os.Stat(logPath); !os.IsNotExist(err) {
t.Fatalf("log file should be removed after completion")
} }
defer os.Remove(logPath)
} }
func TestRun_CleanupHookAlwaysCalled(t *testing.T) { func TestRun_CleanupHookAlwaysCalled(t *testing.T) {
defer resetTestHooks() defer resetTestHooks()
called := false called := false
cleanupHook = func() { called = true } cleanupHook = func() { called = true }
os.Args = []string{"codex-wrapper", "--version"} // Use a command that goes through normal flow, not --version which returns early
codexCommand = "echo"
buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{`{"type":"thread.started","thread_id":"x"}
{"type":"item.completed","item":{"type":"agent_message","text":"ok"}}`} }
os.Args = []string{"codex-wrapper", "task"}
if exitCode := run(); exitCode != 0 { if exitCode := run(); exitCode != 0 {
t.Fatalf("exit = %d, want 0", exitCode) t.Fatalf("exit = %d, want 0", exitCode)
} }