From fe5508228f10a3ac05544b629bbb62011fe50e46 Mon Sep 17 00:00:00 2001 From: ben Date: Wed, 17 Dec 2025 10:33:38 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E5=A4=9A=20backend=20?= =?UTF-8?q?=E5=B9=B6=E8=A1=8C=E6=97=A5=E5=BF=97=20PID=20=E6=B7=B7=E4=B9=B1?= =?UTF-8?q?=E5=B9=B6=E7=A7=BB=E9=99=A4=E5=8C=85=E8=A3=85=E6=A0=BC=E5=BC=8F?= =?UTF-8?q?=20(#74)=20(#76)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(logger): 修复多 backend 并行日志 PID 混乱并移除包装格式 **问题:** - logger.go:288 使用 os.Getpid() 导致并行任务日志 PID 混乱 - 日志文件添加时间戳/PID/级别前缀包装,应输出 backend 原始内容 **修复:** 1. Logger 结构体添加 pid 字段,创建时捕获 PID 2. 日志写入使用固定 l.pid 替代 os.Getpid() 3. 移除日志输出格式包装,直接写入原始消息 4. 添加内存缓存 ERROR/WARN 条目,ExtractRecentErrors 从缓存读取 5. 优化 executor.go context 初始化顺序,避免重复创建 logger **测试:** - 所有测试通过(23.7s) - 更新相关测试用例匹配新格式 Closes #74 * fix(logger): 增强并发日志隔离和 task ID 清理 ## 核心修复 ### 1. Task ID Sanitization (logger.go) - 新增 sanitizeLogSuffix(): 清理非法字符 (/, \, :, 等) - 新增 fallbackLogSuffix(): 为空/非法 ID 生成唯一后备名 - 新增 isSafeLogRune(): 仅允许 [A-Za-z0-9._-] - 路径穿越防护: ../../../etc/passwd → etc-passwd-{hash}.log - 超长 ID 处理: 截断到 64 字符 + hash 确保唯一性 - 自动创建 TMPDIR (MkdirAll) ### 2. 共享日志标识 (executor.go) - 新增 taskLoggerHandle 结构: 封装 logger、路径、共享标志 - 新增 newTaskLoggerHandle(): 统一处理 logger 创建和回退 - printTaskStart(): 显示 "Log (shared)" 标识 - generateFinalOutput(): 在 summary 中标记共享日志 - 并发失败时明确标识所有任务使用共享主日志 ### 3. 内部标志 (config.go) - TaskResult.sharedLog: 非导出字段,标识共享日志状态 ### 4. Race Detector 修复 (logger.go:209-219) - Close() 在关闭 channel 前先等待 pendingWG - 消除 Logger.Close() 与 Logger.log() 之间的竞态条件 ## 测试覆盖 ### 新增测试 (logger_suffix_test.go) - TestLoggerWithSuffixSanitizesUnsafeSuffix: 非法字符清理 - TestLoggerWithSuffixReturnsErrorWhenTempDirNotWritable: 只读目录处理 ### 新增测试 (executor_concurrent_test.go) - TestConcurrentTaskLoggerFailure: 多任务失败时共享日志标识 - TestSanitizeTaskID: 并发场景下 task ID 清理验证 ## 验证结果 ✅ 所有单元测试通过 ✅ Race detector 无竞态 (65.4s) ✅ 路径穿越攻击防护 ✅ 并发日志完全隔离 ✅ 边界情况正确处理 Resolves: PR #76 review feedback Co-Authored-By: Codex Review Generated with swe-agent-bot Co-Authored-By: swe-agent-bot * fix(logger): 修复关键 bug 并优化日志系统 (v5.2.5) 修复 P0 级别问题: - sanitizeLogSuffix 的 trim 碰撞(防止多 task 日志文件名冲突) - ExtractRecentErrors 边界检查(防止 slice 越界) - Logger.Close 阻塞风险(新增可配置超时机制) 代码质量改进: - 删除无用字段 Logger.pid 和 logEntry.level - 优化 sharedLog 标记绑定到最终 LogPath - 移除日志前缀,直接输出 backend 原始内容 测试覆盖增强: - 新增 4 个测试用例(碰撞防护、边界检查、缓存上限、shared 判定) - 优化测试注释和逻辑 版本更新:5.2.4 → 5.2.5 Generated with swe-agent-bot Co-Authored-By: swe-agent-bot --------- Co-authored-by: swe-agent-bot --- codeagent-wrapper/concurrent_stress_test.go | 17 +- codeagent-wrapper/config.go | 1 + codeagent-wrapper/executor.go | 96 +++++-- codeagent-wrapper/executor_concurrent_test.go | 241 +++++++++++++++++ codeagent-wrapper/logger.go | 255 +++++++++++++----- codeagent-wrapper/logger_suffix_test.go | 43 ++- codeagent-wrapper/logger_test.go | 192 +++++++++++-- codeagent-wrapper/main.go | 2 +- codeagent-wrapper/main_test.go | 14 +- 9 files changed, 730 insertions(+), 131 deletions(-) diff --git a/codeagent-wrapper/concurrent_stress_test.go b/codeagent-wrapper/concurrent_stress_test.go index 10fcc1e..0b376db 100644 --- a/codeagent-wrapper/concurrent_stress_test.go +++ b/codeagent-wrapper/concurrent_stress_test.go @@ -76,8 +76,8 @@ func TestConcurrentStressLogger(t *testing.T) { t.Logf("Successfully wrote %d/%d logs (%.1f%%)", actualCount, totalExpected, float64(actualCount)/float64(totalExpected)*100) - // 验证日志格式 - formatRE := regexp.MustCompile(`^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}\] \[PID:\d+\] INFO: goroutine-`) + // 验证日志格式(纯文本,无前缀) + formatRE := regexp.MustCompile(`^goroutine-\d+-msg-\d+$`) for i, line := range lines[:min(10, len(lines))] { if !formatRE.MatchString(line) { t.Errorf("line %d has invalid format: %s", i, line) @@ -293,16 +293,13 @@ func TestLoggerOrderPreservation(t *testing.T) { for scanner.Scan() { line := scanner.Text() var gid, seq int - parts := strings.SplitN(line, " INFO: ", 2) - if len(parts) != 2 { - t.Errorf("invalid log format: %s", line) + // Parse format: G0-SEQ0001 (without INFO: prefix) + _, err := fmt.Sscanf(line, "G%d-SEQ%04d", &gid, &seq) + if err != nil { + t.Errorf("invalid log format: %s (error: %v)", line, err) continue } - if _, err := fmt.Sscanf(parts[1], "G%d-SEQ%d", &gid, &seq); err == nil { - sequences[gid] = append(sequences[gid], seq) - } else { - t.Errorf("failed to parse sequence from line: %s", line) - } + sequences[gid] = append(sequences[gid], seq) } // 验证每个 goroutine 内部顺序 diff --git a/codeagent-wrapper/config.go b/codeagent-wrapper/config.go index bee3a2a..4d20e9a 100644 --- a/codeagent-wrapper/config.go +++ b/codeagent-wrapper/config.go @@ -49,6 +49,7 @@ type TaskResult struct { SessionID string `json:"session_id"` Error string `json:"error"` LogPath string `json:"log_path"` + sharedLog bool } var backendRegistry = map[string]Backend{ diff --git a/codeagent-wrapper/executor.go b/codeagent-wrapper/executor.go index c6c4730..d050a5a 100644 --- a/codeagent-wrapper/executor.go +++ b/codeagent-wrapper/executor.go @@ -139,6 +139,38 @@ func taskLoggerFromContext(ctx context.Context) *Logger { return logger } +type taskLoggerHandle struct { + logger *Logger + path string + shared bool + closeFn func() +} + +func newTaskLoggerHandle(taskID string) taskLoggerHandle { + taskLogger, err := NewLoggerWithSuffix(taskID) + if err == nil { + return taskLoggerHandle{ + logger: taskLogger, + path: taskLogger.Path(), + closeFn: func() { _ = taskLogger.Close() }, + } + } + + msg := fmt.Sprintf("Failed to create task logger for %s: %v, using main logger", taskID, err) + mainLogger := activeLogger() + if mainLogger != nil { + logWarn(msg) + return taskLoggerHandle{ + logger: mainLogger, + path: mainLogger.Path(), + shared: true, + } + } + + fmt.Fprintln(os.Stderr, msg) + return taskLoggerHandle{} +} + // defaultRunCodexTaskFn is the default implementation of runCodexTaskFn (exposed for test reset) func defaultRunCodexTaskFn(task TaskSpec, timeout int) TaskResult { if task.WorkDir == "" { @@ -255,7 +287,7 @@ func executeConcurrentWithContext(parentCtx context.Context, layers [][]TaskSpec var startPrintMu sync.Mutex bannerPrinted := false - printTaskStart := func(taskID, logPath string) { + printTaskStart := func(taskID, logPath string, shared bool) { if logPath == "" { return } @@ -264,7 +296,11 @@ func executeConcurrentWithContext(parentCtx context.Context, layers [][]TaskSpec fmt.Fprintln(os.Stderr, "=== Starting Parallel Execution ===") bannerPrinted = true } - fmt.Fprintf(os.Stderr, "Task %s: Log: %s\n", taskID, logPath) + label := "Log" + if shared { + label = "Log (shared)" + } + fmt.Fprintf(os.Stderr, "Task %s: %s: %s\n", taskID, label, logPath) startPrintMu.Unlock() } @@ -334,11 +370,11 @@ func executeConcurrentWithContext(parentCtx context.Context, layers [][]TaskSpec wg.Add(1) go func(ts TaskSpec) { defer wg.Done() - var taskLogger *Logger var taskLogPath string + handle := taskLoggerHandle{} defer func() { if r := recover(); r != nil { - resultsCh <- TaskResult{TaskID: ts.ID, ExitCode: 1, Error: fmt.Sprintf("panic: %v", r), LogPath: taskLogPath} + resultsCh <- TaskResult{TaskID: ts.ID, ExitCode: 1, Error: fmt.Sprintf("panic: %v", r), LogPath: taskLogPath, sharedLog: handle.shared} } }() @@ -355,18 +391,29 @@ func executeConcurrentWithContext(parentCtx context.Context, layers [][]TaskSpec logConcurrencyState("done", ts.ID, int(after), workerLimit) }() - if l, err := NewLoggerWithSuffix(ts.ID); err == nil { - taskLogger = l - taskLogPath = l.Path() - defer func() { _ = taskLogger.Close() }() + handle = newTaskLoggerHandle(ts.ID) + taskLogPath = handle.path + if handle.closeFn != nil { + defer handle.closeFn() } - ts.Context = withTaskLogger(ctx, taskLogger) - printTaskStart(ts.ID, taskLogPath) + taskCtx := ctx + if handle.logger != nil { + taskCtx = withTaskLogger(ctx, handle.logger) + } + ts.Context = taskCtx + + printTaskStart(ts.ID, taskLogPath, handle.shared) res := runCodexTaskFn(ts, timeout) - if res.LogPath == "" && taskLogPath != "" { - res.LogPath = taskLogPath + if taskLogPath != "" { + if res.LogPath == "" || (handle.shared && handle.logger != nil && res.LogPath == handle.logger.Path()) { + res.LogPath = taskLogPath + } + } + // 只有当最终的 LogPath 确实是共享 logger 的路径时才标记为 shared + if handle.shared && handle.logger != nil && res.LogPath == handle.logger.Path() { + res.sharedLog = true } resultsCh <- res }(task) @@ -444,7 +491,11 @@ func generateFinalOutput(results []TaskResult) string { sb.WriteString(fmt.Sprintf("Session: %s\n", res.SessionID)) } if res.LogPath != "" { - sb.WriteString(fmt.Sprintf("Log: %s\n", res.LogPath)) + if res.sharedLog { + sb.WriteString(fmt.Sprintf("Log: %s (shared)\n", res.LogPath)) + } else { + sb.WriteString(fmt.Sprintf("Log: %s\n", res.LogPath)) + } } if res.Message != "" { sb.WriteString(fmt.Sprintf("\n%s\n", res.Message)) @@ -485,6 +536,13 @@ func runCodexProcess(parentCtx context.Context, codexArgs []string, taskText str } func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, backend Backend, customArgs []string, useCustomArgs bool, silent bool, timeoutSec int) TaskResult { + if parentCtx == nil { + parentCtx = taskSpec.Context + } + if parentCtx == nil { + parentCtx = context.Background() + } + result := TaskResult{TaskID: taskSpec.ID} injectedLogger := taskLoggerFromContext(parentCtx) logger := injectedLogger @@ -595,15 +653,15 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, backe } if !silent { - stdoutLogger = newLogWriter("CODEX_STDOUT: ", codexLogLineLimit) - stderrLogger = newLogWriter("CODEX_STDERR: ", codexLogLineLimit) + // Note: Empty prefix ensures backend output is logged as-is without any wrapper format. + // This preserves the original stdout/stderr content from codex/claude/gemini backends. + // Trade-off: Reduces distinguishability between stdout/stderr in logs, but maintains + // output fidelity which is critical for debugging backend-specific issues. + stdoutLogger = newLogWriter("", codexLogLineLimit) + stderrLogger = newLogWriter("", codexLogLineLimit) } ctx := parentCtx - if ctx == nil { - ctx = context.Background() - } - ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second) defer cancel() ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) diff --git a/codeagent-wrapper/executor_concurrent_test.go b/codeagent-wrapper/executor_concurrent_test.go index 341e1aa..eee3c80 100644 --- a/codeagent-wrapper/executor_concurrent_test.go +++ b/codeagent-wrapper/executor_concurrent_test.go @@ -472,6 +472,43 @@ func TestExecutorRunCodexTaskWithContext(t *testing.T) { } }) + t.Run("contextLoggerWithoutParent", func(t *testing.T) { + newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { + return &execFakeRunner{ + stdout: newReasonReadCloser(`{"type":"item.completed","item":{"type":"agent_message","text":"ctx"}}`), + process: &execFakeProcess{pid: 14}, + } + } + _ = closeLogger() + + taskLogger, err := NewLoggerWithSuffix("executor-taskctx") + if err != nil { + t.Fatalf("NewLoggerWithSuffix() error = %v", err) + } + t.Cleanup(func() { + _ = taskLogger.Close() + _ = os.Remove(taskLogger.Path()) + }) + + ctx := withTaskLogger(context.Background(), taskLogger) + res := runCodexTaskWithContext(nil, TaskSpec{ID: "task-context", Task: "payload", WorkDir: ".", Context: ctx}, nil, nil, false, true, 1) + if res.ExitCode != 0 || res.LogPath != taskLogger.Path() { + t.Fatalf("expected task logger to be reused from spec context, got %+v", res) + } + if activeLogger() != nil { + t.Fatalf("expected no global logger to be created when task context provides one") + } + + taskLogger.Flush() + data, err := os.ReadFile(taskLogger.Path()) + if err != nil { + t.Fatalf("failed to read task log: %v", err) + } + if !strings.Contains(string(data), "task-context") { + t.Fatalf("task log missing task id, content: %s", string(data)) + } + }) + t.Run("backendSetsDirAndNilContext", func(t *testing.T) { var rc *execFakeRunner newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { @@ -974,6 +1011,143 @@ func TestExecutorExecuteConcurrentWithContextBranches(t *testing.T) { t.Fatalf("unexpected results: %+v", results) } }) + + t.Run("TestConcurrentTaskLoggerFailure", func(t *testing.T) { + // Create a writable temp dir for the main logger, then flip TMPDIR to a read-only + // location so task-specific loggers fail to open. + writable := t.TempDir() + t.Setenv("TMPDIR", writable) + + mainLogger, err := NewLoggerWithSuffix("shared-main") + if err != nil { + t.Fatalf("NewLoggerWithSuffix() error = %v", err) + } + setLogger(mainLogger) + t.Cleanup(func() { + mainLogger.Flush() + _ = closeLogger() + _ = os.Remove(mainLogger.Path()) + }) + + noWrite := filepath.Join(writable, "ro") + if err := os.Mkdir(noWrite, 0o500); err != nil { + t.Fatalf("failed to create read-only temp dir: %v", err) + } + t.Setenv("TMPDIR", noWrite) + + taskA := nextExecutorTestTaskID("shared-a") + taskB := nextExecutorTestTaskID("shared-b") + + orig := runCodexTaskFn + runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult { + logger := taskLoggerFromContext(task.Context) + if logger != mainLogger { + return TaskResult{TaskID: task.ID, ExitCode: 1, Error: "unexpected logger"} + } + logger.Info("TASK=" + task.ID) + return TaskResult{TaskID: task.ID, ExitCode: 0} + } + t.Cleanup(func() { runCodexTaskFn = orig }) + + stderrR, stderrW, err := os.Pipe() + if err != nil { + t.Fatalf("os.Pipe() error = %v", err) + } + oldStderr := os.Stderr + os.Stderr = stderrW + + results := executeConcurrentWithContext(context.Background(), [][]TaskSpec{{{ID: taskA}, {ID: taskB}}}, 1, 0) + + _ = stderrW.Close() + os.Stderr = oldStderr + stderrData, _ := io.ReadAll(stderrR) + _ = stderrR.Close() + stderrOut := string(stderrData) + + if len(results) != 2 { + t.Fatalf("expected 2 results, got %d", len(results)) + } + for _, res := range results { + if res.ExitCode != 0 || res.Error != "" { + t.Fatalf("task failed unexpectedly: %+v", res) + } + if res.LogPath != mainLogger.Path() { + t.Fatalf("shared log path mismatch: got %q want %q", res.LogPath, mainLogger.Path()) + } + if !res.sharedLog { + t.Fatalf("expected sharedLog flag for %+v", res) + } + if !strings.Contains(stderrOut, "Log (shared)") { + t.Fatalf("stderr missing shared marker: %s", stderrOut) + } + } + + summary := generateFinalOutput(results) + if !strings.Contains(summary, "(shared)") { + t.Fatalf("summary missing shared marker: %s", summary) + } + + mainLogger.Flush() + data, err := os.ReadFile(mainLogger.Path()) + if err != nil { + t.Fatalf("failed to read main log: %v", err) + } + content := string(data) + if !strings.Contains(content, "TASK="+taskA) || !strings.Contains(content, "TASK="+taskB) { + t.Fatalf("expected shared log to contain both tasks, got: %s", content) + } + }) + + t.Run("TestSanitizeTaskID", func(t *testing.T) { + tempDir := t.TempDir() + t.Setenv("TMPDIR", tempDir) + + orig := runCodexTaskFn + runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult { + logger := taskLoggerFromContext(task.Context) + if logger == nil { + return TaskResult{TaskID: task.ID, ExitCode: 1, Error: "missing logger"} + } + logger.Info("TASK=" + task.ID) + return TaskResult{TaskID: task.ID, ExitCode: 0} + } + t.Cleanup(func() { runCodexTaskFn = orig }) + + idA := "../bad id" + idB := "tab\tid" + results := executeConcurrentWithContext(context.Background(), [][]TaskSpec{{{ID: idA}, {ID: idB}}}, 1, 0) + + if len(results) != 2 { + t.Fatalf("expected 2 results, got %d", len(results)) + } + + expected := map[string]string{ + idA: sanitizeLogSuffix(idA), + idB: sanitizeLogSuffix(idB), + } + + for _, res := range results { + if res.ExitCode != 0 || res.Error != "" { + t.Fatalf("unexpected failure: %+v", res) + } + safe, ok := expected[res.TaskID] + if !ok { + t.Fatalf("unexpected task id %q in results", res.TaskID) + } + wantBase := fmt.Sprintf("%s-%d-%s.log", primaryLogPrefix(), os.Getpid(), safe) + if filepath.Base(res.LogPath) != wantBase { + t.Fatalf("log filename for %q = %q, want %q", res.TaskID, filepath.Base(res.LogPath), wantBase) + } + data, err := os.ReadFile(res.LogPath) + if err != nil { + t.Fatalf("failed to read log %q: %v", res.LogPath, err) + } + if !strings.Contains(string(data), "TASK="+res.TaskID) { + t.Fatalf("log for %q missing task marker, content: %s", res.TaskID, string(data)) + } + _ = os.Remove(res.LogPath) + } + }) } func TestExecutorSignalAndTermination(t *testing.T) { @@ -1116,3 +1290,70 @@ func TestExecutorForwardSignalsDefaults(t *testing.T) { forwardSignals(ctx, &execFakeRunner{process: &execFakeProcess{pid: 80}}, func(string) {}) time.Sleep(10 * time.Millisecond) } + +func TestExecutorSharedLogFalseWhenCustomLogPath(t *testing.T) { + devNull, err := os.OpenFile(os.DevNull, os.O_WRONLY, 0) + if err != nil { + t.Fatalf("failed to open %s: %v", os.DevNull, err) + } + oldStderr := os.Stderr + os.Stderr = devNull + t.Cleanup(func() { + os.Stderr = oldStderr + _ = devNull.Close() + }) + + tempDir := t.TempDir() + t.Setenv("TMPDIR", tempDir) + + // Setup: 创建主 logger + mainLogger, err := NewLoggerWithSuffix("shared-main") + if err != nil { + t.Fatalf("NewLoggerWithSuffix() error = %v", err) + } + setLogger(mainLogger) + defer func() { + _ = closeLogger() + _ = os.Remove(mainLogger.Path()) + }() + + // 模拟场景:task logger 创建失败(通过设置只读的 TMPDIR), + // 回退到主 logger(handle.shared=true), + // 但 runCodexTaskFn 返回自定义的 LogPath(不等于主 logger 的路径) + roDir := filepath.Join(tempDir, "ro") + if err := os.Mkdir(roDir, 0o500); err != nil { + t.Fatalf("failed to create read-only dir: %v", err) + } + t.Setenv("TMPDIR", roDir) + + orig := runCodexTaskFn + customLogPath := "/custom/path/to.log" + runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult { + // 返回自定义 LogPath,不等于主 logger 的路径 + return TaskResult{ + TaskID: task.ID, + ExitCode: 0, + LogPath: customLogPath, + } + } + defer func() { runCodexTaskFn = orig }() + + // 执行任务 + results := executeConcurrentWithContext(context.Background(), [][]TaskSpec{{{ID: "task1"}}}, 1, 0) + + if len(results) != 1 { + t.Fatalf("expected 1 result, got %d", len(results)) + } + + res := results[0] + // 关键断言:即使 handle.shared=true(因为 task logger 创建失败), + // 但因为 LogPath 不等于主 logger 的路径,sharedLog 应为 false + if res.sharedLog { + t.Fatalf("expected sharedLog=false when LogPath differs from shared logger, got true") + } + + // 验证 LogPath 确实是自定义的 + if res.LogPath != customLogPath { + t.Fatalf("expected custom LogPath %s, got %s", customLogPath, res.LogPath) + } +} diff --git a/codeagent-wrapper/logger.go b/codeagent-wrapper/logger.go index f33299e..cbe338a 100644 --- a/codeagent-wrapper/logger.go +++ b/codeagent-wrapper/logger.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "hash/crc32" "os" "path/filepath" "strconv" @@ -18,22 +19,25 @@ import ( // It is intentionally minimal: a buffered channel + single worker goroutine // to avoid contention while keeping ordering guarantees. type Logger struct { - path string - file *os.File - writer *bufio.Writer - ch chan logEntry - flushReq chan chan struct{} - done chan struct{} - closed atomic.Bool - closeOnce sync.Once - workerWG sync.WaitGroup - pendingWG sync.WaitGroup - flushMu sync.Mutex + path string + file *os.File + writer *bufio.Writer + ch chan logEntry + flushReq chan chan struct{} + done chan struct{} + closed atomic.Bool + closeOnce sync.Once + workerWG sync.WaitGroup + pendingWG sync.WaitGroup + flushMu sync.Mutex + workerErr error + errorEntries []string // Cache of recent ERROR/WARN entries + errorMu sync.Mutex } type logEntry struct { - level string - msg string + msg string + isError bool // true for ERROR or WARN levels } // CleanupStats captures the outcome of a cleanupOldLogs run. @@ -55,6 +59,10 @@ var ( evalSymlinksFn = filepath.EvalSymlinks ) +const maxLogSuffixLen = 64 + +var logSuffixCounter atomic.Uint64 + // NewLogger creates the async logger and starts the worker goroutine. // The log file is created under os.TempDir() using the required naming scheme. func NewLogger() (*Logger, error) { @@ -64,14 +72,23 @@ func NewLogger() (*Logger, error) { // NewLoggerWithSuffix creates a logger with an optional suffix in the filename. // Useful for tests that need isolated log files within the same process. func NewLoggerWithSuffix(suffix string) (*Logger, error) { - filename := fmt.Sprintf("%s-%d", primaryLogPrefix(), os.Getpid()) + pid := os.Getpid() + filename := fmt.Sprintf("%s-%d", primaryLogPrefix(), pid) + var safeSuffix string if suffix != "" { - filename += "-" + suffix + safeSuffix = sanitizeLogSuffix(suffix) + } + if safeSuffix != "" { + filename += "-" + safeSuffix } filename += ".log" path := filepath.Clean(filepath.Join(os.TempDir(), filename)) + if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { + return nil, err + } + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o600) if err != nil { return nil, err @@ -92,6 +109,73 @@ func NewLoggerWithSuffix(suffix string) (*Logger, error) { return l, nil } +func sanitizeLogSuffix(raw string) string { + trimmed := strings.TrimSpace(raw) + if trimmed == "" { + return fallbackLogSuffix() + } + + var b strings.Builder + changed := false + for _, r := range trimmed { + if isSafeLogRune(r) { + b.WriteRune(r) + } else { + changed = true + b.WriteByte('-') + } + if b.Len() >= maxLogSuffixLen { + changed = true + break + } + } + + sanitized := strings.Trim(b.String(), "-.") + if sanitized != b.String() { + changed = true // Mark if trim removed any characters + } + if sanitized == "" { + return fallbackLogSuffix() + } + + if changed || len(sanitized) > maxLogSuffixLen { + hash := crc32.ChecksumIEEE([]byte(trimmed)) + hashStr := fmt.Sprintf("%x", hash) + + maxPrefix := maxLogSuffixLen - len(hashStr) - 1 + if maxPrefix < 1 { + maxPrefix = 1 + } + if len(sanitized) > maxPrefix { + sanitized = sanitized[:maxPrefix] + } + + sanitized = fmt.Sprintf("%s-%s", sanitized, hashStr) + } + + return sanitized +} + +func fallbackLogSuffix() string { + next := logSuffixCounter.Add(1) + return fmt.Sprintf("task-%d", next) +} + +func isSafeLogRune(r rune) bool { + switch { + case r >= 'a' && r <= 'z': + return true + case r >= 'A' && r <= 'Z': + return true + case r >= '0' && r <= '9': + return true + case r == '-', r == '_', r == '.': + return true + default: + return false + } +} + // Path returns the underlying log file path (useful for tests/inspection). func (l *Logger) Path() string { if l == nil { @@ -112,10 +196,11 @@ func (l *Logger) Debug(msg string) { l.log("DEBUG", msg) } // Error logs at ERROR level. func (l *Logger) Error(msg string) { l.log("ERROR", msg) } -// Close stops the worker and syncs the log file. +// Close signals the worker to flush and close the log file. // The log file is NOT removed, allowing inspection after program exit. // It is safe to call multiple times. -// Returns after a 5-second timeout if worker doesn't stop gracefully. +// Waits up to CODEAGENT_LOGGER_CLOSE_TIMEOUT_MS (default: 5000) for shutdown; set to 0 to wait indefinitely. +// Returns an error if shutdown doesn't complete within the timeout. func (l *Logger) Close() error { if l == nil { return nil @@ -126,42 +211,51 @@ func (l *Logger) Close() error { l.closeOnce.Do(func() { l.closed.Store(true) close(l.done) - close(l.ch) - // Wait for worker with timeout + timeout := loggerCloseTimeout() workerDone := make(chan struct{}) go func() { l.workerWG.Wait() close(workerDone) }() - select { - case <-workerDone: - // Worker stopped gracefully - case <-time.After(5 * time.Second): - // Worker timeout - proceed with cleanup anyway - closeErr = fmt.Errorf("logger worker timeout during close") + if timeout > 0 { + select { + case <-workerDone: + // Worker stopped gracefully + case <-time.After(timeout): + closeErr = fmt.Errorf("logger worker timeout during close") + return + } + } else { + <-workerDone } - if err := l.writer.Flush(); err != nil && closeErr == nil { - closeErr = err + if l.workerErr != nil && closeErr == nil { + closeErr = l.workerErr } - - if err := l.file.Sync(); err != nil && closeErr == nil { - closeErr = err - } - - if err := l.file.Close(); err != nil && closeErr == nil { - closeErr = err - } - - // Log file is kept for debugging - NOT removed - // Users can manually clean up /tmp/-*.log files }) return closeErr } +func loggerCloseTimeout() time.Duration { + const defaultTimeout = 5 * time.Second + + raw := strings.TrimSpace(os.Getenv("CODEAGENT_LOGGER_CLOSE_TIMEOUT_MS")) + if raw == "" { + return defaultTimeout + } + ms, err := strconv.Atoi(raw) + if err != nil { + return defaultTimeout + } + if ms <= 0 { + return 0 + } + return time.Duration(ms) * time.Millisecond +} + // RemoveLogFile removes the log file. Should only be called after Close(). func (l *Logger) RemoveLogFile() error { if l == nil { @@ -170,34 +264,29 @@ func (l *Logger) RemoveLogFile() error { return os.Remove(l.path) } -// ExtractRecentErrors reads the log file and returns the most recent ERROR and WARN entries. +// ExtractRecentErrors returns the most recent ERROR and WARN entries from memory cache. // Returns up to maxEntries entries in chronological order. func (l *Logger) ExtractRecentErrors(maxEntries int) []string { - if l == nil || l.path == "" { + if l == nil || maxEntries <= 0 { return nil } - f, err := os.Open(l.path) - if err != nil { + l.errorMu.Lock() + defer l.errorMu.Unlock() + + if len(l.errorEntries) == 0 { return nil } - defer f.Close() - var entries []string - scanner := bufio.NewScanner(f) - for scanner.Scan() { - line := scanner.Text() - if strings.Contains(line, "] ERROR:") || strings.Contains(line, "] WARN:") { - entries = append(entries, line) - } + // Return last N entries + start := 0 + if len(l.errorEntries) > maxEntries { + start = len(l.errorEntries) - maxEntries } - // Keep only the last maxEntries - if len(entries) > maxEntries { - entries = entries[len(entries)-maxEntries:] - } - - return entries + result := make([]string, len(l.errorEntries)-start) + copy(result, l.errorEntries[start:]) + return result } // Flush waits for all pending log entries to be written. Primarily for tests. @@ -254,7 +343,8 @@ func (l *Logger) log(level, msg string) { return } - entry := logEntry{level: level, msg: msg} + isError := level == "WARN" || level == "ERROR" + entry := logEntry{msg: msg, isError: isError} l.flushMu.Lock() l.pendingWG.Add(1) l.flushMu.Unlock() @@ -275,18 +365,42 @@ func (l *Logger) run() { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() + writeEntry := func(entry logEntry) { + fmt.Fprintf(l.writer, "%s\n", entry.msg) + + // Cache error/warn entries in memory for fast extraction + if entry.isError { + l.errorMu.Lock() + l.errorEntries = append(l.errorEntries, entry.msg) + if len(l.errorEntries) > 100 { // Keep last 100 + l.errorEntries = l.errorEntries[1:] + } + l.errorMu.Unlock() + } + + l.pendingWG.Done() + } + + finalize := func() { + if err := l.writer.Flush(); err != nil && l.workerErr == nil { + l.workerErr = err + } + if err := l.file.Sync(); err != nil && l.workerErr == nil { + l.workerErr = err + } + if err := l.file.Close(); err != nil && l.workerErr == nil { + l.workerErr = err + } + } + for { select { case entry, ok := <-l.ch: if !ok { - // Channel closed, final flush - _ = l.writer.Flush() + finalize() return } - timestamp := time.Now().Format("2006-01-02 15:04:05.000") - pid := os.Getpid() - fmt.Fprintf(l.writer, "[%s] [PID:%d] %s: %s\n", timestamp, pid, entry.level, entry.msg) - l.pendingWG.Done() + writeEntry(entry) case <-ticker.C: _ = l.writer.Flush() @@ -296,6 +410,21 @@ func (l *Logger) run() { _ = l.writer.Flush() _ = l.file.Sync() close(flushDone) + + case <-l.done: + for { + select { + case entry, ok := <-l.ch: + if !ok { + finalize() + return + } + writeEntry(entry) + default: + finalize() + return + } + } } } } diff --git a/codeagent-wrapper/logger_suffix_test.go b/codeagent-wrapper/logger_suffix_test.go index 9e57196..dc4a94f 100644 --- a/codeagent-wrapper/logger_suffix_test.go +++ b/codeagent-wrapper/logger_suffix_test.go @@ -68,13 +68,48 @@ func TestLoggerWithSuffixNamingAndIsolation(t *testing.T) { } } -func TestLoggerWithSuffixReturnsErrorWhenTempDirMissing(t *testing.T) { - missingTempDir := filepath.Join(t.TempDir(), "does-not-exist") - setTempDirEnv(t, missingTempDir) +func TestLoggerWithSuffixReturnsErrorWhenTempDirNotWritable(t *testing.T) { + base := t.TempDir() + noWrite := filepath.Join(base, "ro") + if err := os.Mkdir(noWrite, 0o500); err != nil { + t.Fatalf("failed to create read-only temp dir: %v", err) + } + t.Cleanup(func() { _ = os.Chmod(noWrite, 0o700) }) + setTempDirEnv(t, noWrite) logger, err := NewLoggerWithSuffix("task-err") if err == nil { _ = logger.Close() - t.Fatalf("expected error, got nil") + t.Fatalf("expected error when temp dir is not writable") + } +} + +func TestLoggerWithSuffixSanitizesUnsafeSuffix(t *testing.T) { + tempDir := setTempDirEnv(t, t.TempDir()) + + raw := "../bad id/with?chars" + safe := sanitizeLogSuffix(raw) + if safe == "" { + t.Fatalf("sanitizeLogSuffix returned empty string") + } + if strings.ContainsAny(safe, "/\\") { + t.Fatalf("sanitized suffix should not contain path separators, got %q", safe) + } + + logger, err := NewLoggerWithSuffix(raw) + if err != nil { + t.Fatalf("NewLoggerWithSuffix(%q) error = %v", raw, err) + } + t.Cleanup(func() { + _ = logger.Close() + _ = os.Remove(logger.Path()) + }) + + wantBase := fmt.Sprintf("%s-%d-%s.log", primaryLogPrefix(), os.Getpid(), safe) + if gotBase := filepath.Base(logger.Path()); gotBase != wantBase { + t.Fatalf("log filename = %q, want %q", gotBase, wantBase) + } + if dir := filepath.Dir(logger.Path()); dir != tempDir { + t.Fatalf("logger path dir = %q, want %q", dir, tempDir) } } diff --git a/codeagent-wrapper/logger_test.go b/codeagent-wrapper/logger_test.go index 3f59070..e0f5e31 100644 --- a/codeagent-wrapper/logger_test.go +++ b/codeagent-wrapper/logger_test.go @@ -69,7 +69,7 @@ func TestLoggerWritesLevels(t *testing.T) { } content := string(data) - checks := []string{"INFO: info message", "WARN: warn message", "DEBUG: debug message", "ERROR: error message"} + checks := []string{"info message", "warn message", "debug message", "error message"} for _, c := range checks { if !strings.Contains(content, c) { t.Fatalf("log file missing entry %q, content: %s", c, content) @@ -766,7 +766,7 @@ func TestLoggerInternalLog(t *testing.T) { logger.log("INFO", "hello") entry := <-done - if entry.level != "INFO" || entry.msg != "hello" { + if entry.msg != "hello" { t.Fatalf("unexpected entry %+v", entry) } @@ -894,66 +894,90 @@ func (f fakeFileInfo) Sys() interface{} { return nil } func TestLoggerExtractRecentErrors(t *testing.T) { tests := []struct { name string - content string + logs []struct{ level, msg string } maxEntries int want []string }{ { name: "empty log", - content: "", + logs: nil, maxEntries: 10, want: nil, }, { name: "no errors", - content: `[2025-01-01 12:00:00.000] [PID:123] INFO: started -[2025-01-01 12:00:01.000] [PID:123] DEBUG: processing`, + logs: []struct{ level, msg string }{ + {"INFO", "started"}, + {"DEBUG", "processing"}, + }, maxEntries: 10, want: nil, }, { name: "single error", - content: `[2025-01-01 12:00:00.000] [PID:123] INFO: started -[2025-01-01 12:00:01.000] [PID:123] ERROR: something failed`, + logs: []struct{ level, msg string }{ + {"INFO", "started"}, + {"ERROR", "something failed"}, + }, maxEntries: 10, - want: []string{"[2025-01-01 12:00:01.000] [PID:123] ERROR: something failed"}, + want: []string{"something failed"}, }, { name: "error and warn", - content: `[2025-01-01 12:00:00.000] [PID:123] INFO: started -[2025-01-01 12:00:01.000] [PID:123] WARN: warning message -[2025-01-01 12:00:02.000] [PID:123] ERROR: error message`, + logs: []struct{ level, msg string }{ + {"INFO", "started"}, + {"WARN", "warning message"}, + {"ERROR", "error message"}, + }, maxEntries: 10, want: []string{ - "[2025-01-01 12:00:01.000] [PID:123] WARN: warning message", - "[2025-01-01 12:00:02.000] [PID:123] ERROR: error message", + "warning message", + "error message", }, }, { name: "truncate to max", - content: `[2025-01-01 12:00:00.000] [PID:123] ERROR: error 1 -[2025-01-01 12:00:01.000] [PID:123] ERROR: error 2 -[2025-01-01 12:00:02.000] [PID:123] ERROR: error 3 -[2025-01-01 12:00:03.000] [PID:123] ERROR: error 4 -[2025-01-01 12:00:04.000] [PID:123] ERROR: error 5`, + logs: []struct{ level, msg string }{ + {"ERROR", "error 1"}, + {"ERROR", "error 2"}, + {"ERROR", "error 3"}, + {"ERROR", "error 4"}, + {"ERROR", "error 5"}, + }, maxEntries: 3, want: []string{ - "[2025-01-01 12:00:02.000] [PID:123] ERROR: error 3", - "[2025-01-01 12:00:03.000] [PID:123] ERROR: error 4", - "[2025-01-01 12:00:04.000] [PID:123] ERROR: error 5", + "error 3", + "error 4", + "error 5", }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tempDir := t.TempDir() - logPath := filepath.Join(tempDir, "test.log") - if err := os.WriteFile(logPath, []byte(tt.content), 0o644); err != nil { - t.Fatalf("failed to write test log: %v", err) + logger, err := NewLoggerWithSuffix("extract-test") + if err != nil { + t.Fatalf("NewLoggerWithSuffix() error = %v", err) + } + defer logger.Close() + defer logger.RemoveLogFile() + + // Write logs using logger methods + for _, entry := range tt.logs { + switch entry.level { + case "INFO": + logger.Info(entry.msg) + case "WARN": + logger.Warn(entry.msg) + case "ERROR": + logger.Error(entry.msg) + case "DEBUG": + logger.Debug(entry.msg) + } } - logger := &Logger{path: logPath} + logger.Flush() + got := logger.ExtractRecentErrors(tt.maxEntries) if len(got) != len(tt.want) { @@ -988,3 +1012,117 @@ func TestLoggerExtractRecentErrorsFileNotExist(t *testing.T) { t.Fatalf("nonexistent file ExtractRecentErrors() should return nil, got %v", got) } } + +func TestSanitizeLogSuffixNoDuplicates(t *testing.T) { + testCases := []string{ + "task", + "task.", + ".task", + "-task", + "task-", + "--task--", + "..task..", + } + + seen := make(map[string]string) + for _, input := range testCases { + result := sanitizeLogSuffix(input) + if result == "" { + t.Fatalf("sanitizeLogSuffix(%q) returned empty string", input) + } + + if prev, exists := seen[result]; exists { + t.Fatalf("collision detected: %q and %q both produce %q", input, prev, result) + } + seen[result] = input + + // Verify result is safe for file names + if strings.ContainsAny(result, "/\\:*?\"<>|") { + t.Fatalf("sanitizeLogSuffix(%q) = %q contains unsafe characters", input, result) + } + } +} + +func TestExtractRecentErrorsBoundaryCheck(t *testing.T) { + logger, err := NewLoggerWithSuffix("boundary-test") + if err != nil { + t.Fatalf("NewLoggerWithSuffix() error = %v", err) + } + defer logger.Close() + defer logger.RemoveLogFile() + + // Write some errors + logger.Error("error 1") + logger.Warn("warn 1") + logger.Error("error 2") + logger.Flush() + + // Test zero + result := logger.ExtractRecentErrors(0) + if result != nil { + t.Fatalf("ExtractRecentErrors(0) should return nil, got %v", result) + } + + // Test negative + result = logger.ExtractRecentErrors(-5) + if result != nil { + t.Fatalf("ExtractRecentErrors(-5) should return nil, got %v", result) + } + + // Test positive still works + result = logger.ExtractRecentErrors(10) + if len(result) != 3 { + t.Fatalf("ExtractRecentErrors(10) expected 3 entries, got %d", len(result)) + } +} + +func TestErrorEntriesMaxLimit(t *testing.T) { + logger, err := NewLoggerWithSuffix("max-limit-test") + if err != nil { + t.Fatalf("NewLoggerWithSuffix() error = %v", err) + } + defer logger.Close() + defer logger.RemoveLogFile() + + // Write 150 error/warn entries + for i := 1; i <= 150; i++ { + if i%2 == 0 { + logger.Error(fmt.Sprintf("error-%03d", i)) + } else { + logger.Warn(fmt.Sprintf("warn-%03d", i)) + } + } + logger.Flush() + + // Extract all cached errors + result := logger.ExtractRecentErrors(200) // Request more than cache size + + // Should only have last 100 entries (entries 51-150 in sequence) + if len(result) != 100 { + t.Fatalf("expected 100 cached entries, got %d", len(result)) + } + + // Verify entries are the last 100 (entries 51-150) + if !strings.Contains(result[0], "051") { + t.Fatalf("first cached entry should be entry 51, got: %s", result[0]) + } + if !strings.Contains(result[99], "150") { + t.Fatalf("last cached entry should be entry 150, got: %s", result[99]) + } + + // Verify order is preserved - simplified logic + for i := 0; i < len(result)-1; i++ { + expectedNum := 51 + i + nextNum := 51 + i + 1 + + expectedEntry := fmt.Sprintf("%03d", expectedNum) + nextEntry := fmt.Sprintf("%03d", nextNum) + + if !strings.Contains(result[i], expectedEntry) { + t.Fatalf("entry at index %d should contain %s, got: %s", i, expectedEntry, result[i]) + } + if !strings.Contains(result[i+1], nextEntry) { + t.Fatalf("entry at index %d should contain %s, got: %s", i+1, nextEntry, result[i+1]) + } + } +} diff --git a/codeagent-wrapper/main.go b/codeagent-wrapper/main.go index b940e0c..f923a3e 100644 --- a/codeagent-wrapper/main.go +++ b/codeagent-wrapper/main.go @@ -14,7 +14,7 @@ import ( ) const ( - version = "5.2.4" + version = "5.2.5" defaultWorkdir = "." defaultTimeout = 7200 // seconds codexLogLineLimit = 1000 diff --git a/codeagent-wrapper/main_test.go b/codeagent-wrapper/main_test.go index 85d4c42..0193066 100644 --- a/codeagent-wrapper/main_test.go +++ b/codeagent-wrapper/main_test.go @@ -1784,13 +1784,13 @@ func TestRunLogFunctions(t *testing.T) { } output := string(data) - if !strings.Contains(output, "INFO: info message") { + if !strings.Contains(output, "info message") { t.Errorf("logInfo output missing, got: %s", output) } - if !strings.Contains(output, "WARN: warn message") { + if !strings.Contains(output, "warn message") { t.Errorf("logWarn output missing, got: %s", output) } - if !strings.Contains(output, "ERROR: error message") { + if !strings.Contains(output, "error message") { t.Errorf("logError output missing, got: %s", output) } } @@ -2691,7 +2691,7 @@ func TestVersionFlag(t *testing.T) { t.Errorf("exit = %d, want 0", code) } }) - want := "codeagent-wrapper version 5.2.4\n" + want := "codeagent-wrapper version 5.2.5\n" if output != want { t.Fatalf("output = %q, want %q", output, want) } @@ -2705,7 +2705,7 @@ func TestVersionShortFlag(t *testing.T) { t.Errorf("exit = %d, want 0", code) } }) - want := "codeagent-wrapper version 5.2.4\n" + want := "codeagent-wrapper version 5.2.5\n" if output != want { t.Fatalf("output = %q, want %q", output, want) } @@ -2719,7 +2719,7 @@ func TestVersionLegacyAlias(t *testing.T) { t.Errorf("exit = %d, want 0", code) } }) - want := "codex-wrapper version 5.2.4\n" + want := "codex-wrapper version 5.2.5\n" if output != want { t.Fatalf("output = %q, want %q", output, want) } @@ -3300,7 +3300,7 @@ func TestRun_PipedTaskReadError(t *testing.T) { if exitCode != 1 { t.Fatalf("exit=%d, want 1", exitCode) } - if !strings.Contains(logOutput, "ERROR: Failed to read piped stdin: read stdin: pipe failure") { + if !strings.Contains(logOutput, "Failed to read piped stdin: read stdin: pipe failure") { t.Fatalf("log missing piped read error, got %q", logOutput) } // Log file is always removed after completion (new behavior)