feat: add async logging to temp file with lifecycle management

Implement async logging system that writes to /tmp/codex-wrapper-{pid}.log during execution and auto-deletes on exit.

- Add Logger with buffered channel (cap 100) + single worker goroutine
- Support INFO/DEBUG/ERROR levels
- Graceful shutdown via signal.NotifyContext
- File cleanup on normal/signal exit
- Test coverage: 90.4%

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
cexll
2025-11-29 22:40:19 +08:00
parent 11afae2dff
commit 246674c388
4 changed files with 985 additions and 99 deletions

View File

@@ -2,8 +2,10 @@ package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
@@ -11,6 +13,7 @@ import (
"os/signal"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
)
@@ -27,6 +30,8 @@ var (
stdinReader io.Reader = os.Stdin
isTerminalFn = defaultIsTerminal
codexCommand = "codex"
cleanupHook func()
loggerPtr atomic.Pointer[Logger]
)
// Config holds CLI configuration
@@ -59,6 +64,23 @@ func main() {
// run is the main logic, returns exit code for testability
func run() int {
logger, err := NewLogger()
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: failed to initialize logger: %v\n", err)
return 1
}
setLogger(logger)
defer func() {
if err := closeLogger(); err != nil {
fmt.Fprintf(os.Stderr, "ERROR: failed to close logger: %v\n", err)
}
}()
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
defer runCleanupHook()
// Handle --version and --help first
if len(os.Args) > 1 {
switch os.Args[1] {
@@ -102,7 +124,11 @@ func run() int {
}
piped = !isTerminal()
} else {
pipedTask := readPipedTask()
pipedTask, err := readPipedTask()
if err != nil {
logError("Failed to read piped stdin: " + err.Error())
return 1
}
piped = pipedTask != ""
if piped {
taskText = pipedTask
@@ -143,7 +169,7 @@ func run() int {
codexArgs := buildCodexArgs(cfg, targetArg)
logInfo("codex running...")
message, threadID, exitCode := runCodexProcess(codexArgs, taskText, useStdin, cfg.Timeout)
message, threadID, exitCode := runCodexProcess(ctx, codexArgs, taskText, useStdin, cfg.Timeout)
if exitCode != 0 {
return exitCode
@@ -194,19 +220,22 @@ func parseArgs() (*Config, error) {
return cfg, nil
}
func readPipedTask() string {
func readPipedTask() (string, error) {
if isTerminal() {
logInfo("Stdin is tty, skipping pipe read")
return ""
return "", nil
}
logInfo("Reading from stdin pipe...")
data, err := io.ReadAll(stdinReader)
if err != nil || len(data) == 0 {
if err != nil {
return "", fmt.Errorf("read stdin: %w", err)
}
if len(data) == 0 {
logInfo("Stdin pipe returned empty data")
return ""
return "", nil
}
logInfo(fmt.Sprintf("Read %d bytes from stdin pipe", len(data)))
return string(data)
return string(data), nil
}
func shouldUseStdin(taskText string, piped bool) bool {
@@ -245,11 +274,16 @@ func buildCodexArgs(cfg *Config, targetArg string) []string {
}
}
func runCodexProcess(codexArgs []string, taskText string, useStdin bool, timeoutSec int) (message, threadID string, exitCode int) {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutSec)*time.Second)
type parseResult struct {
message string
threadID string
}
func runCodexProcess(parentCtx context.Context, codexArgs []string, taskText string, useStdin bool, timeoutSec int) (message, threadID string, exitCode int) {
ctx, cancel := context.WithTimeout(parentCtx, time.Duration(timeoutSec)*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, codexCommand, codexArgs...)
cmd := exec.Command(codexCommand, codexArgs...)
cmd.Stderr = os.Stderr
// Setup stdin if needed
@@ -293,50 +327,55 @@ func runCodexProcess(codexArgs []string, taskText string, useStdin bool, timeout
logInfo("Stdin closed")
}
// Setup signal handling
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigCh
logError(fmt.Sprintf("Received signal: %v", sig))
if cmd.Process != nil {
cmd.Process.Signal(syscall.SIGTERM)
time.AfterFunc(time.Duration(forceKillDelay)*time.Second, func() {
if cmd.Process != nil {
cmd.Process.Kill()
}
})
}
}()
logInfo("Reading stdout...")
// Parse JSON stream
message, threadID = parseJSONStream(stdout)
waitCh := make(chan error, 1)
go func() {
waitCh <- cmd.Wait()
}()
// Wait for process to complete
err = cmd.Wait()
parseCh := make(chan parseResult, 1)
go func() {
msg, tid := parseJSONStream(stdout)
parseCh <- parseResult{message: msg, threadID: tid}
}()
// Check for timeout
if ctx.Err() == context.DeadlineExceeded {
logError("Codex execution timeout")
if cmd.Process != nil {
cmd.Process.Kill()
}
return "", "", 124
var waitErr error
var forceKillTimer *time.Timer
select {
case waitErr = <-waitCh:
case <-ctx.Done():
logError(cancelReason(ctx))
forceKillTimer = terminateProcess(cmd)
waitErr = <-waitCh
}
// Check exit code
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
if forceKillTimer != nil {
forceKillTimer.Stop()
}
result := <-parseCh
if ctxErr := ctx.Err(); ctxErr != nil {
if errors.Is(ctxErr, context.DeadlineExceeded) {
return "", "", 124
}
return "", "", 130
}
if waitErr != nil {
if exitErr, ok := waitErr.(*exec.ExitError); ok {
code := exitErr.ExitCode()
logError(fmt.Sprintf("Codex exited with status %d", code))
return "", "", code
}
logError("Codex error: " + err.Error())
logError("Codex error: " + waitErr.Error())
return "", "", 1
}
message = result.message
threadID = result.threadID
if message == "" {
logError("Codex completed without agent_message output")
return "", "", 1
@@ -345,40 +384,98 @@ func runCodexProcess(codexArgs []string, taskText string, useStdin bool, timeout
return message, threadID, 0
}
func cancelReason(ctx context.Context) string {
if ctx == nil {
return "Context cancelled"
}
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return "Codex execution timeout"
}
return "Execution cancelled, terminating codex process"
}
func terminateProcess(cmd *exec.Cmd) *time.Timer {
if cmd == nil || cmd.Process == nil {
return nil
}
_ = cmd.Process.Signal(syscall.SIGTERM)
return time.AfterFunc(time.Duration(forceKillDelay)*time.Second, func() {
if cmd.Process != nil {
_ = cmd.Process.Kill()
}
})
}
func parseJSONStream(r io.Reader) (message, threadID string) {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, 64*1024), 10*1024*1024)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
reader := bufio.NewReaderSize(r, 64*1024)
decoder := json.NewDecoder(reader)
for {
var event JSONEvent
if err := json.Unmarshal([]byte(line), &event); err != nil {
logWarn(fmt.Sprintf("Failed to parse line: %s", truncate(line, 100)))
if err := decoder.Decode(&event); err != nil {
if errors.Is(err, io.EOF) {
break
}
logWarn(fmt.Sprintf("Failed to decode JSON: %v", err))
var skipErr error
reader, skipErr = discardInvalidJSON(decoder, reader)
if skipErr != nil {
if errors.Is(skipErr, os.ErrClosed) || errors.Is(skipErr, io.ErrClosedPipe) {
logWarn("Read stdout error: " + skipErr.Error())
break
}
if !errors.Is(skipErr, io.EOF) {
logWarn("Read stdout error: " + skipErr.Error())
}
}
decoder = json.NewDecoder(reader)
continue
}
// Capture thread_id
if event.Type == "thread.started" {
switch event.Type {
case "thread.started":
threadID = event.ThreadID
}
// Capture agent_message
if event.Type == "item.completed" && event.Item != nil && event.Item.Type == "agent_message" {
if text := normalizeText(event.Item.Text); text != "" {
message = text
case "item.completed":
if event.Item != nil && event.Item.Type == "agent_message" {
if text := normalizeText(event.Item.Text); text != "" {
message = text
}
}
}
}
if err := scanner.Err(); err != nil && err != io.EOF {
logWarn("Read stdout error: " + err.Error())
return message, threadID
}
func discardInvalidJSON(decoder *json.Decoder, reader *bufio.Reader) (*bufio.Reader, error) {
var buffered bytes.Buffer
if decoder != nil {
if buf := decoder.Buffered(); buf != nil {
_, _ = buffered.ReadFrom(buf)
}
}
return message, threadID
line, err := reader.ReadBytes('\n')
buffered.Write(line)
data := buffered.Bytes()
newline := bytes.IndexByte(data, '\n')
if newline == -1 {
return reader, err
}
remaining := data[newline+1:]
if len(remaining) == 0 {
return reader, err
}
return bufio.NewReader(io.MultiReader(bytes.NewReader(remaining), reader)), err
}
func normalizeText(text interface{}) string {
@@ -450,18 +547,55 @@ func min(a, b int) int {
return b
}
func setLogger(l *Logger) {
loggerPtr.Store(l)
}
func closeLogger() error {
logger := loggerPtr.Swap(nil)
if logger == nil {
return nil
}
return logger.Close()
}
func activeLogger() *Logger {
return loggerPtr.Load()
}
func logInfo(msg string) {
if logger := activeLogger(); logger != nil {
logger.Info(msg)
return
}
fmt.Fprintf(os.Stderr, "INFO: %s\n", msg)
}
func logWarn(msg string) {
if logger := activeLogger(); logger != nil {
logger.Warn(msg)
return
}
fmt.Fprintf(os.Stderr, "WARN: %s\n", msg)
}
func logError(msg string) {
if logger := activeLogger(); logger != nil {
logger.Error(msg)
return
}
fmt.Fprintf(os.Stderr, "ERROR: %s\n", msg)
}
func runCleanupHook() {
if logger := activeLogger(); logger != nil {
logger.Flush()
}
if cleanupHook != nil {
cleanupHook()
}
}
func printHelp() {
help := `codex-wrapper - Go wrapper for Codex CLI