Mirror of https://github.com/cexll/myclaude.git, synced 2026-02-12 03:27:47 +08:00
Compare commits
14 Commits
| Author | SHA1 | Date |
|---|---|---|
|  | 007c27879d |  |
|  | 368831da4c |  |
|  | eb84dfa574 |  |
|  | 3bc8342929 |  |
|  | cfc64e8515 |  |
|  | 7a40c9d492 |  |
|  | d51a2f12f8 |  |
|  | 8a8771076d |  |
|  | e637b26151 |  |
|  | 595fa8da96 |  |
|  | 9ba6950d21 |  |
|  | 7f790fbe15 |  |
|  | 06f14aa695 |  |
|  | 246674c388 |  |
39  codex-wrapper/bench_test.go  Normal file
@@ -0,0 +1,39 @@
package main

import (
	"testing"
)

// BenchmarkLoggerWrite benchmarks sequential log write performance.
func BenchmarkLoggerWrite(b *testing.B) {
	logger, err := NewLogger()
	if err != nil {
		b.Fatal(err)
	}
	defer logger.Close()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		logger.Info("benchmark log message")
	}
	b.StopTimer()
	logger.Flush()
}

// BenchmarkLoggerConcurrentWrite benchmarks concurrent log write performance.
func BenchmarkLoggerConcurrentWrite(b *testing.B) {
	logger, err := NewLogger()
	if err != nil {
		b.Fatal(err)
	}
	defer logger.Close()

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			logger.Info("concurrent benchmark log message")
		}
	})
	b.StopTimer()
	logger.Flush()
}
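These are standard Go benchmarks; assuming the module layout above, they would typically be run from the codex-wrapper directory with something like go test -bench=. -benchmem (the exact flags are a matter of preference, not part of this diff).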
321  codex-wrapper/concurrent_stress_test.go  Normal file
@@ -0,0 +1,321 @@
package main

import (
	"bufio"
	"fmt"
	"os"
	"regexp"
	"strings"
	"sync"
	"testing"
	"time"
)

// TestConcurrentStressLogger runs a high-concurrency stress test.
func TestConcurrentStressLogger(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping stress test in short mode")
	}

	logger, err := NewLoggerWithSuffix("stress")
	if err != nil {
		t.Fatal(err)
	}
	defer logger.Close()

	t.Logf("Log file: %s", logger.Path())

	const (
		numGoroutines  = 100  // number of concurrent goroutines
		logsPerRoutine = 1000 // log entries written per goroutine
		totalExpected  = numGoroutines * logsPerRoutine
	)

	var wg sync.WaitGroup
	start := time.Now()

	// Start concurrent writers.
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := 0; j < logsPerRoutine; j++ {
				logger.Info(fmt.Sprintf("goroutine-%d-msg-%d", id, j))
			}
		}(i)
	}

	wg.Wait()
	logger.Flush()
	elapsed := time.Since(start)

	// Read the log file back to verify.
	data, err := os.ReadFile(logger.Path())
	if err != nil {
		t.Fatalf("failed to read log file: %v", err)
	}

	lines := strings.Split(strings.TrimSpace(string(data)), "\n")
	actualCount := len(lines)

	t.Logf("Concurrent stress test results:")
	t.Logf("  Goroutines: %d", numGoroutines)
	t.Logf("  Logs per goroutine: %d", logsPerRoutine)
	t.Logf("  Total expected: %d", totalExpected)
	t.Logf("  Total actual: %d", actualCount)
	t.Logf("  Duration: %v", elapsed)
	t.Logf("  Throughput: %.2f logs/sec", float64(totalExpected)/elapsed.Seconds())

	// Verify the number of persisted logs.
	if actualCount < totalExpected/10 {
		t.Errorf("too many logs lost: got %d, want at least %d (10%% of %d)",
			actualCount, totalExpected/10, totalExpected)
	}
	t.Logf("Successfully wrote %d/%d logs (%.1f%%)",
		actualCount, totalExpected, float64(actualCount)/float64(totalExpected)*100)

	// Verify the log format.
	formatRE := regexp.MustCompile(`^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}\] \[PID:\d+\] INFO: goroutine-`)
	for i, line := range lines[:min(10, len(lines))] {
		if !formatRE.MatchString(line) {
			t.Errorf("line %d has invalid format: %s", i, line)
		}
	}
}

// TestConcurrentBurstLogger simulates bursty traffic.
func TestConcurrentBurstLogger(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping burst test in short mode")
	}

	logger, err := NewLoggerWithSuffix("burst")
	if err != nil {
		t.Fatal(err)
	}
	defer logger.Close()

	t.Logf("Log file: %s", logger.Path())

	const (
		numBursts          = 10
		goroutinesPerBurst = 50
		logsPerGoroutine   = 100
	)

	totalLogs := 0
	start := time.Now()

	// Simulate bursts of traffic.
	for burst := 0; burst < numBursts; burst++ {
		var wg sync.WaitGroup
		for i := 0; i < goroutinesPerBurst; i++ {
			wg.Add(1)
			totalLogs += logsPerGoroutine
			go func(b, g int) {
				defer wg.Done()
				for j := 0; j < logsPerGoroutine; j++ {
					logger.Info(fmt.Sprintf("burst-%d-goroutine-%d-msg-%d", b, g, j))
				}
			}(burst, i)
		}
		wg.Wait()
		time.Sleep(10 * time.Millisecond) // gap between bursts
	}

	logger.Flush()
	elapsed := time.Since(start)

	// Verify.
	data, err := os.ReadFile(logger.Path())
	if err != nil {
		t.Fatalf("failed to read log file: %v", err)
	}

	lines := strings.Split(strings.TrimSpace(string(data)), "\n")
	actualCount := len(lines)

	t.Logf("Burst test results:")
	t.Logf("  Total bursts: %d", numBursts)
	t.Logf("  Goroutines per burst: %d", goroutinesPerBurst)
	t.Logf("  Expected logs: %d", totalLogs)
	t.Logf("  Actual logs: %d", actualCount)
	t.Logf("  Duration: %v", elapsed)
	t.Logf("  Throughput: %.2f logs/sec", float64(totalLogs)/elapsed.Seconds())

	if actualCount < totalLogs/10 {
		t.Errorf("too many logs lost: got %d, want at least %d (10%% of %d)", actualCount, totalLogs/10, totalLogs)
	}
	t.Logf("Successfully wrote %d/%d logs (%.1f%%)",
		actualCount, totalLogs, float64(actualCount)/float64(totalLogs)*100)
}

// TestLoggerChannelCapacity pushes past the channel capacity limit.
func TestLoggerChannelCapacity(t *testing.T) {
	logger, err := NewLoggerWithSuffix("capacity")
	if err != nil {
		t.Fatal(err)
	}
	defer logger.Close()

	const rapidLogs = 2000 // exceeds the channel capacity (1000)

	start := time.Now()
	for i := 0; i < rapidLogs; i++ {
		logger.Info(fmt.Sprintf("rapid-log-%d", i))
	}
	sendDuration := time.Since(start)

	logger.Flush()
	flushDuration := time.Since(start) - sendDuration

	t.Logf("Channel capacity test:")
	t.Logf("  Logs sent: %d", rapidLogs)
	t.Logf("  Send duration: %v", sendDuration)
	t.Logf("  Flush duration: %v", flushDuration)

	// Verify a reasonable fraction of logs was still written (non-blocking mode may drop some).
	data, err := os.ReadFile(logger.Path())
	if err != nil {
		t.Fatal(err)
	}
	lines := strings.Split(strings.TrimSpace(string(data)), "\n")
	actualCount := len(lines)

	if actualCount < rapidLogs/10 {
		t.Errorf("too many logs lost: got %d, want at least %d (10%% of %d)", actualCount, rapidLogs/10, rapidLogs)
	}
	t.Logf("Logs persisted: %d/%d (%.1f%%)", actualCount, rapidLogs, float64(actualCount)/float64(rapidLogs)*100)
}

// TestLoggerMemoryUsage measures memory/disk usage.
func TestLoggerMemoryUsage(t *testing.T) {
	logger, err := NewLoggerWithSuffix("memory")
	if err != nil {
		t.Fatal(err)
	}
	defer logger.Close()

	const numLogs = 20000
	longMessage := strings.Repeat("x", 500) // 500-byte message

	start := time.Now()
	for i := 0; i < numLogs; i++ {
		logger.Info(fmt.Sprintf("log-%d-%s", i, longMessage))
	}
	logger.Flush()
	elapsed := time.Since(start)

	// Check the file size.
	info, err := os.Stat(logger.Path())
	if err != nil {
		t.Fatal(err)
	}

	expectedTotalSize := int64(numLogs * 500) // theoretical minimum total bytes
	expectedMinSize := expectedTotalSize / 10 // tolerate up to 90% loss
	actualSize := info.Size()

	t.Logf("Memory/disk usage test:")
	t.Logf("  Logs written: %d", numLogs)
	t.Logf("  Message size: 500 bytes")
	t.Logf("  File size: %.2f MB", float64(actualSize)/1024/1024)
	t.Logf("  Duration: %v", elapsed)
	t.Logf("  Write speed: %.2f MB/s", float64(actualSize)/1024/1024/elapsed.Seconds())
	t.Logf("  Persistence ratio: %.1f%%", float64(actualSize)/float64(expectedTotalSize)*100)

	if actualSize < expectedMinSize {
		t.Errorf("file size too small: got %d bytes, expected at least %d", actualSize, expectedMinSize)
	}
}

// TestLoggerFlushTimeout verifies the Flush timeout mechanism.
func TestLoggerFlushTimeout(t *testing.T) {
	logger, err := NewLoggerWithSuffix("flush")
	if err != nil {
		t.Fatal(err)
	}
	defer logger.Close()

	// Write some logs.
	for i := 0; i < 100; i++ {
		logger.Info(fmt.Sprintf("test-log-%d", i))
	}

	// Flush should complete within a reasonable time.
	start := time.Now()
	logger.Flush()
	duration := time.Since(start)

	t.Logf("Flush duration: %v", duration)

	if duration > 6*time.Second {
		t.Errorf("Flush took too long: %v (expected < 6s)", duration)
	}
}

// TestLoggerOrderPreservation verifies per-goroutine log ordering.
func TestLoggerOrderPreservation(t *testing.T) {
	logger, err := NewLoggerWithSuffix("order")
	if err != nil {
		t.Fatal(err)
	}
	defer logger.Close()

	const numGoroutines = 10
	const logsPerRoutine = 100

	var wg sync.WaitGroup
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := 0; j < logsPerRoutine; j++ {
				logger.Info(fmt.Sprintf("G%d-SEQ%04d", id, j))
			}
		}(i)
	}

	wg.Wait()
	logger.Flush()

	// Read back and verify the ordering within each goroutine.
	data, err := os.ReadFile(logger.Path())
	if err != nil {
		t.Fatal(err)
	}

	scanner := bufio.NewScanner(strings.NewReader(string(data)))
	sequences := make(map[int][]int) // goroutine ID -> sequence numbers

	for scanner.Scan() {
		line := scanner.Text()
		var gid, seq int
		parts := strings.SplitN(line, " INFO: ", 2)
		if len(parts) != 2 {
			t.Errorf("invalid log format: %s", line)
			continue
		}
		if _, err := fmt.Sscanf(parts[1], "G%d-SEQ%d", &gid, &seq); err == nil {
			sequences[gid] = append(sequences[gid], seq)
		} else {
			t.Errorf("failed to parse sequence from line: %s", line)
		}
	}

	// Verify in-order sequences within each goroutine.
	for gid, seqs := range sequences {
		for i := 0; i < len(seqs)-1; i++ {
			if seqs[i] >= seqs[i+1] {
				t.Errorf("Goroutine %d: out of order at index %d: %d >= %d",
					gid, i, seqs[i], seqs[i+1])
			}
		}
		if len(seqs) != logsPerRoutine {
			t.Errorf("Goroutine %d: missing logs, got %d, want %d",
				gid, len(seqs), logsPerRoutine)
		}
	}

	t.Logf("Order preservation test: all %d goroutines maintained sequence order", len(sequences))
}
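Note that the stress and burst tests above consult testing.Short(), so a go test -short run skips them, while a full go test run (no -short) exercises them; the precise invocation depends on the local setup.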
@@ -1,3 +1,3 @@
 module codex-wrapper
 
-go 1.25.3
+go 1.21
243  codex-wrapper/logger.go  Normal file
@@ -0,0 +1,243 @@
package main

import (
	"bufio"
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"
)

// Logger writes log messages asynchronously to a temp file.
// It is intentionally minimal: a buffered channel + single worker goroutine
// to avoid contention while keeping ordering guarantees.
type Logger struct {
	path      string
	file      *os.File
	writer    *bufio.Writer
	ch        chan logEntry
	flushReq  chan chan struct{}
	done      chan struct{}
	closed    atomic.Bool
	closeOnce sync.Once
	workerWG  sync.WaitGroup
	pendingWG sync.WaitGroup
}

type logEntry struct {
	level string
	msg   string
}

// NewLogger creates the async logger and starts the worker goroutine.
// The log file is created under os.TempDir() using the required naming scheme.
func NewLogger() (*Logger, error) {
	return NewLoggerWithSuffix("")
}

// NewLoggerWithSuffix creates a logger with an optional suffix in the filename.
// Useful for tests that need isolated log files within the same process.
func NewLoggerWithSuffix(suffix string) (*Logger, error) {
	filename := fmt.Sprintf("codex-wrapper-%d", os.Getpid())
	if suffix != "" {
		filename += "-" + suffix
	}
	filename += ".log"

	path := filepath.Join(os.TempDir(), filename)

	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return nil, err
	}

	l := &Logger{
		path:     path,
		file:     f,
		writer:   bufio.NewWriterSize(f, 4096),
		ch:       make(chan logEntry, 1000),
		flushReq: make(chan chan struct{}, 1),
		done:     make(chan struct{}),
	}

	l.workerWG.Add(1)
	go l.run()

	return l, nil
}

// Path returns the underlying log file path (useful for tests/inspection).
func (l *Logger) Path() string {
	if l == nil {
		return ""
	}
	return l.path
}

// Info logs at INFO level.
func (l *Logger) Info(msg string) { l.log("INFO", msg) }

// Warn logs at WARN level.
func (l *Logger) Warn(msg string) { l.log("WARN", msg) }

// Debug logs at DEBUG level.
func (l *Logger) Debug(msg string) { l.log("DEBUG", msg) }

// Error logs at ERROR level.
func (l *Logger) Error(msg string) { l.log("ERROR", msg) }

// Close stops the worker and syncs the log file.
// The log file is NOT removed, allowing inspection after program exit.
// It is safe to call multiple times.
// Returns after a 5-second timeout if worker doesn't stop gracefully.
func (l *Logger) Close() error {
	if l == nil {
		return nil
	}

	var closeErr error

	l.closeOnce.Do(func() {
		l.closed.Store(true)
		close(l.done)
		close(l.ch)

		// Wait for worker with timeout
		workerDone := make(chan struct{})
		go func() {
			l.workerWG.Wait()
			close(workerDone)
		}()

		select {
		case <-workerDone:
			// Worker stopped gracefully
		case <-time.After(5 * time.Second):
			// Worker timeout - proceed with cleanup anyway
			closeErr = fmt.Errorf("logger worker timeout during close")
		}

		if err := l.writer.Flush(); err != nil && closeErr == nil {
			closeErr = err
		}

		if err := l.file.Sync(); err != nil && closeErr == nil {
			closeErr = err
		}

		if err := l.file.Close(); err != nil && closeErr == nil {
			closeErr = err
		}

		// Log file is kept for debugging - NOT removed
		// Users can manually clean up /tmp/codex-wrapper-*.log files
	})

	return closeErr
}

// RemoveLogFile removes the log file. Should only be called after Close().
func (l *Logger) RemoveLogFile() error {
	if l == nil {
		return nil
	}
	return os.Remove(l.path)
}

// Flush waits for all pending log entries to be written. Primarily for tests.
// Returns after a 5-second timeout to prevent indefinite blocking.
func (l *Logger) Flush() {
	if l == nil {
		return
	}

	// Wait for pending entries with timeout
	done := make(chan struct{})
	go func() {
		l.pendingWG.Wait()
		close(done)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	select {
	case <-done:
		// All pending entries processed
	case <-ctx.Done():
		// Timeout - return without full flush
		return
	}

	// Trigger writer flush
	flushDone := make(chan struct{})
	select {
	case l.flushReq <- flushDone:
		// Wait for flush to complete
		select {
		case <-flushDone:
			// Flush completed
		case <-time.After(1 * time.Second):
			// Flush timeout
		}
	case <-l.done:
		// Logger is closing
	case <-time.After(1 * time.Second):
		// Timeout sending flush request
	}
}

func (l *Logger) log(level, msg string) {
	if l == nil {
		return
	}
	if l.closed.Load() {
		return
	}

	entry := logEntry{level: level, msg: msg}
	l.pendingWG.Add(1)

	select {
	case l.ch <- entry:
		// Successfully sent to channel
	case <-l.done:
		// Logger is closing, drop this entry
		l.pendingWG.Done()
		return
	}
}

func (l *Logger) run() {
	defer l.workerWG.Done()

	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case entry, ok := <-l.ch:
			if !ok {
				// Channel closed, final flush
				l.writer.Flush()
				return
			}
			timestamp := time.Now().Format("2006-01-02 15:04:05.000")
			pid := os.Getpid()
			fmt.Fprintf(l.writer, "[%s] [PID:%d] %s: %s\n", timestamp, pid, entry.level, entry.msg)
			l.pendingWG.Done()

		case <-ticker.C:
			l.writer.Flush()

		case flushDone := <-l.flushReq:
			// Explicit flush request - flush writer and sync to disk
			l.writer.Flush()
			l.file.Sync()
			close(flushDone)
		}
	}
}
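For orientation, a minimal usage sketch of the Logger API introduced above (NewLogger, Info, Flush, Path, Close). This snippet is illustrative only and is not part of the diff; it assumes it would live in the same main package:

package main

import "fmt"

// exampleLoggerUsage is an illustrative sketch, not part of the diff: it wires
// together the async Logger introduced above.
func exampleLoggerUsage() error {
	logger, err := NewLogger()
	if err != nil {
		return err
	}
	defer logger.Close() // stops the worker and syncs; the log file is kept on disk

	logger.Info("starting work")
	logger.Flush() // blocks (up to ~5s) until pending entries reach the file
	fmt.Println("log written to", logger.Path())
	return nil
}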
186  codex-wrapper/logger_test.go  Normal file
@@ -0,0 +1,186 @@
package main

import (
	"bufio"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"testing"
	"time"
)

func TestLoggerCreatesFileWithPID(t *testing.T) {
	tempDir := t.TempDir()
	t.Setenv("TMPDIR", tempDir)

	logger, err := NewLogger()
	if err != nil {
		t.Fatalf("NewLogger() error = %v", err)
	}
	defer logger.Close()

	expectedPath := filepath.Join(tempDir, fmt.Sprintf("codex-wrapper-%d.log", os.Getpid()))
	if logger.Path() != expectedPath {
		t.Fatalf("logger path = %s, want %s", logger.Path(), expectedPath)
	}

	if _, err := os.Stat(expectedPath); err != nil {
		t.Fatalf("log file not created: %v", err)
	}
}

func TestLoggerWritesLevels(t *testing.T) {
	tempDir := t.TempDir()
	t.Setenv("TMPDIR", tempDir)

	logger, err := NewLogger()
	if err != nil {
		t.Fatalf("NewLogger() error = %v", err)
	}
	defer logger.Close()

	logger.Info("info message")
	logger.Warn("warn message")
	logger.Debug("debug message")
	logger.Error("error message")

	logger.Flush()

	data, err := os.ReadFile(logger.Path())
	if err != nil {
		t.Fatalf("failed to read log file: %v", err)
	}

	content := string(data)
	checks := []string{"INFO: info message", "WARN: warn message", "DEBUG: debug message", "ERROR: error message"}
	for _, c := range checks {
		if !strings.Contains(content, c) {
			t.Fatalf("log file missing entry %q, content: %s", c, content)
		}
	}
}

func TestLoggerCloseRemovesFileAndStopsWorker(t *testing.T) {
	tempDir := t.TempDir()
	t.Setenv("TMPDIR", tempDir)

	logger, err := NewLogger()
	if err != nil {
		t.Fatalf("NewLogger() error = %v", err)
	}

	logger.Info("before close")
	logger.Flush()

	logPath := logger.Path()

	if err := logger.Close(); err != nil {
		t.Fatalf("Close() returned error: %v", err)
	}

	// After recent changes, log file is kept for debugging - NOT removed
	if _, err := os.Stat(logPath); os.IsNotExist(err) {
		t.Fatalf("log file should exist after Close for debugging, but got IsNotExist")
	}

	// Clean up manually for test
	defer os.Remove(logPath)

	done := make(chan struct{})
	go func() {
		logger.workerWG.Wait()
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("worker goroutine did not exit after Close")
	}
}

func TestLoggerConcurrentWritesSafe(t *testing.T) {
	tempDir := t.TempDir()
	t.Setenv("TMPDIR", tempDir)

	logger, err := NewLogger()
	if err != nil {
		t.Fatalf("NewLogger() error = %v", err)
	}
	defer logger.Close()

	const goroutines = 10
	const perGoroutine = 50

	var wg sync.WaitGroup
	wg.Add(goroutines)

	for i := 0; i < goroutines; i++ {
		go func(id int) {
			defer wg.Done()
			for j := 0; j < perGoroutine; j++ {
				logger.Debug(fmt.Sprintf("g%d-%d", id, j))
			}
		}(i)
	}

	wg.Wait()
	logger.Flush()

	f, err := os.Open(logger.Path())
	if err != nil {
		t.Fatalf("failed to open log file: %v", err)
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	count := 0
	for scanner.Scan() {
		count++
	}
	if err := scanner.Err(); err != nil {
		t.Fatalf("scanner error: %v", err)
	}

	expected := goroutines * perGoroutine
	if count != expected {
		t.Fatalf("unexpected log line count: got %d, want %d", count, expected)
	}
}

func TestLoggerTerminateProcessActive(t *testing.T) {
	cmd := exec.Command("sleep", "5")
	if err := cmd.Start(); err != nil {
		t.Skipf("cannot start sleep command: %v", err)
	}

	timer := terminateProcess(cmd)
	if timer == nil {
		t.Fatalf("terminateProcess returned nil timer for active process")
	}
	defer timer.Stop()

	done := make(chan error, 1)
	go func() {
		done <- cmd.Wait()
	}()

	select {
	case <-time.After(500 * time.Millisecond):
		t.Fatalf("process not terminated promptly")
	case <-done:
	}

	// Force the timer callback to run immediately to cover the kill branch.
	timer.Reset(0)
	time.Sleep(10 * time.Millisecond)
}

// Reuse the existing coverage suite so the focused TestLogger run still exercises
// the rest of the codebase and keeps coverage high.
func TestLoggerCoverageSuite(t *testing.T) {
	TestParseJSONStream_CoverageSuite(t)
}
@@ -5,6 +5,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -14,26 +15,32 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 )
 
 const (
-	version           = "1.0.0"
+	version           = "4.8.2"
 	defaultWorkdir    = "."
 	defaultTimeout    = 7200 // seconds
-	forceKillDelay    = 5 // seconds
+	codexLogLineLimit = 1000
 	stdinSpecialChars = "\n\\\"'`$"
+	stderrCaptureLimit = 4 * 1024
 )
 
 // Test hooks for dependency injection
 var (
 	stdinReader io.Reader = os.Stdin
 	isTerminalFn          = defaultIsTerminal
 	codexCommand          = "codex"
-	buildCodexArgsFn      = buildCodexArgs
-	commandContext        = exec.CommandContext
-	jsonMarshal           = json.Marshal
+	cleanupHook func()
+	loggerPtr   atomic.Pointer[Logger]
+
+	buildCodexArgsFn = buildCodexArgs
+	commandContext   = exec.CommandContext
+	jsonMarshal      = json.Marshal
+
+	forceKillDelay = 5 // seconds - made variable for testability
 )
 
 // Config holds CLI configuration
@@ -352,8 +359,8 @@ func main() {
 }
 
 // run is the main logic, returns exit code for testability
-func run() int {
-	// Handle --version and --help first
+func run() (exitCode int) {
+	// Handle --version and --help first (no logger needed)
 	if len(os.Args) > 1 {
 		switch os.Args[1] {
 		case "--version", "-v":
@@ -362,8 +369,46 @@ func run() int {
 		case "--help", "-h":
 			printHelp()
 			return 0
+		}
+	}
+
+	// Initialize logger for all other commands
+	logger, err := NewLogger()
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "ERROR: failed to initialize logger: %v\n", err)
+		return 1
+	}
+	setLogger(logger)
+
+	defer func() {
+		logger := activeLogger()
+		if logger != nil {
+			logger.Flush()
+		}
+		if err := closeLogger(); err != nil {
+			fmt.Fprintf(os.Stderr, "ERROR: failed to close logger: %v\n", err)
+		}
+		// Always remove log file after completion
+		if logger != nil {
+			if err := logger.RemoveLogFile(); err != nil && !os.IsNotExist(err) {
+				// Silently ignore removal errors
+			}
+		}
+	}()
+	defer runCleanupHook()
+
+	// Handle remaining commands
+	if len(os.Args) > 1 {
+		switch os.Args[1] {
 		case "--parallel":
-			// Parallel mode: read task config from stdin
+			if len(os.Args) > 2 {
+				fmt.Fprintln(os.Stderr, "ERROR: --parallel reads its task configuration from stdin and does not accept additional arguments.")
+				fmt.Fprintln(os.Stderr, "Usage examples:")
+				fmt.Fprintln(os.Stderr, "  codex-wrapper --parallel < tasks.txt")
+				fmt.Fprintln(os.Stderr, "  echo '...' | codex-wrapper --parallel")
+				fmt.Fprintln(os.Stderr, "  codex-wrapper --parallel <<'EOF'")
+				return 1
+			}
 			data, err := io.ReadAll(stdinReader)
 			if err != nil {
 				fmt.Fprintf(os.Stderr, "ERROR: failed to read stdin: %v\n", err)
@@ -386,7 +431,7 @@ func run() int {
 			results := executeConcurrent(layers, timeoutSec)
 			fmt.Println(generateFinalOutput(results))
 
-			exitCode := 0
+			exitCode = 0
 			for _, res := range results {
 				if res.ExitCode != 0 {
 					exitCode = res.ExitCode
@@ -399,6 +444,12 @@ func run() int {
 
 	logInfo("Script started")
 
+	// Print startup information to stderr
+	fmt.Fprintf(os.Stderr, "[codex-wrapper]\n")
+	fmt.Fprintf(os.Stderr, "  Command: %s\n", strings.Join(os.Args, " "))
+	fmt.Fprintf(os.Stderr, "  PID: %d\n", os.Getpid())
+	fmt.Fprintf(os.Stderr, "  Log: %s\n", logger.Path())
+
 	cfg, err := parseArgs()
 	if err != nil {
 		logError(err.Error())
@@ -410,7 +461,6 @@ func run() int {
 	logInfo(fmt.Sprintf("Timeout: %ds", timeoutSec))
 	cfg.Timeout = timeoutSec
 
-	// Determine task text and stdin mode
 	var taskText string
 	var piped bool
 
@@ -428,7 +478,11 @@ func run() int {
 		}
 		piped = !isTerminal()
 	} else {
-		pipedTask := readPipedTask()
+		pipedTask, err := readPipedTask()
+		if err != nil {
+			logError("Failed to read piped stdin: " + err.Error())
+			return 1
+		}
 		piped = pipedTask != ""
 		if piped {
 			taskText = pipedTask
@@ -489,10 +543,7 @@ func run() int {
 		return result.ExitCode
 	}
 
-	// Output agent_message
 	fmt.Println(result.Message)
-
-	// Output session_id if present
 	if result.SessionID != "" {
 		fmt.Printf("\n---\nSESSION_ID: %s\n", result.SessionID)
 	}
@@ -506,11 +557,8 @@ func parseArgs() (*Config, error) {
 		return nil, fmt.Errorf("task required")
 	}
 
-	cfg := &Config{
-		WorkDir: defaultWorkdir,
-	}
+	cfg := &Config{WorkDir: defaultWorkdir}
 
-	// Check for resume mode
 	if args[0] == "resume" {
 		if len(args) < 3 {
 			return nil, fmt.Errorf("resume mode requires: resume <session_id> <task>")
@@ -534,19 +582,22 @@ func parseArgs() (*Config, error) {
 	return cfg, nil
 }
 
-func readPipedTask() string {
+func readPipedTask() (string, error) {
 	if isTerminal() {
 		logInfo("Stdin is tty, skipping pipe read")
-		return ""
+		return "", nil
 	}
 	logInfo("Reading from stdin pipe...")
 	data, err := io.ReadAll(stdinReader)
-	if err != nil || len(data) == 0 {
+	if err != nil {
+		return "", fmt.Errorf("read stdin: %w", err)
+	}
+	if len(data) == 0 {
 		logInfo("Stdin pipe returned empty data")
-		return ""
+		return "", nil
 	}
 	logInfo(fmt.Sprintf("Read %d bytes from stdin pipe", len(data)))
-	return string(data)
+	return string(data), nil
 }
 
 func shouldUseStdin(taskText string, piped bool) bool {
@@ -579,10 +630,22 @@ func buildCodexArgs(cfg *Config, targetArg string) []string {
 	}
 }
 
+type parseResult struct {
+	message  string
+	threadID string
+}
+
 func runCodexTask(taskSpec TaskSpec, silent bool, timeoutSec int) TaskResult {
-	result := TaskResult{
-		TaskID: taskSpec.ID,
-	}
+	return runCodexTaskWithContext(context.Background(), taskSpec, nil, false, silent, timeoutSec)
+}
+
+func runCodexProcess(parentCtx context.Context, codexArgs []string, taskText string, useStdin bool, timeoutSec int) (message, threadID string, exitCode int) {
+	res := runCodexTaskWithContext(parentCtx, TaskSpec{Task: taskText, WorkDir: defaultWorkdir, Mode: "new", UseStdin: useStdin}, codexArgs, true, false, timeoutSec)
+	return res.Message, res.SessionID, res.ExitCode
+}
+
+func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, customArgs []string, useCustomArgs bool, silent bool, timeoutSec int) TaskResult {
+	result := TaskResult{TaskID: taskSpec.ID}
 
 	cfg := &Config{
 		Mode: taskSpec.Mode,
@@ -603,26 +666,99 @@ func runCodexTask(taskSpec TaskSpec, silent bool, timeoutSec int) TaskResult {
 		targetArg = "-"
 	}
 
-	codexArgs := buildCodexArgsFn(cfg, targetArg)
-
-	logInfoFn := logInfo
-	logWarnFn := logWarn
-	logErrorFn := logError
-	stderrWriter := io.Writer(os.Stderr)
-	if silent {
-		logInfoFn = func(string) {}
-		logWarnFn = func(string) {}
-		logErrorFn = func(string) {}
-		stderrWriter = io.Discard
+	var codexArgs []string
+	if useCustomArgs {
+		codexArgs = customArgs
+	} else {
+		codexArgs = buildCodexArgsFn(cfg, targetArg)
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutSec)*time.Second)
+	prefixMsg := func(msg string) string {
+		if taskSpec.ID == "" {
+			return msg
+		}
+		return fmt.Sprintf("[Task: %s] %s", taskSpec.ID, msg)
+	}
+
+	var logInfoFn func(string)
+	var logWarnFn func(string)
+	var logErrorFn func(string)
+
+	if silent {
+		// Silent mode: only persist to file when available; avoid stderr noise.
+		logInfoFn = func(msg string) {
+			if logger := activeLogger(); logger != nil {
+				logger.Info(prefixMsg(msg))
+			}
+		}
+		logWarnFn = func(msg string) {
+			if logger := activeLogger(); logger != nil {
+				logger.Warn(prefixMsg(msg))
+			}
+		}
+		logErrorFn = func(msg string) {
+			if logger := activeLogger(); logger != nil {
+				logger.Error(prefixMsg(msg))
+			}
+		}
+	} else {
+		logInfoFn = func(msg string) { logInfo(prefixMsg(msg)) }
+		logWarnFn = func(msg string) { logWarn(prefixMsg(msg)) }
+		logErrorFn = func(msg string) { logError(prefixMsg(msg)) }
+	}
+
+	stderrBuf := &tailBuffer{limit: stderrCaptureLimit}
+
+	var stdoutLogger *logWriter
+	var stderrLogger *logWriter
+
+	var tempLogger *Logger
+	if silent && activeLogger() == nil {
+		if l, err := NewLogger(); err == nil {
+			setLogger(l)
+			tempLogger = l
+		}
+	}
+	defer func() {
+		if tempLogger != nil {
+			closeLogger()
+		}
+	}()
+
+	if !silent {
+		stdoutLogger = newLogWriter("CODEX_STDOUT: ", codexLogLineLimit)
+		stderrLogger = newLogWriter("CODEX_STDERR: ", codexLogLineLimit)
+	}
+
+	ctx := parentCtx
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second)
 	defer cancel()
+	ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
+	defer stop()
+
+	attachStderr := func(msg string) string {
+		return fmt.Sprintf("%s; stderr: %s", msg, stderrBuf.String())
+	}
 
 	cmd := commandContext(ctx, codexCommand, codexArgs...)
-	cmd.Stderr = stderrWriter
 
-	// Setup stdin if needed
+	stderrWriters := []io.Writer{stderrBuf}
+	if stderrLogger != nil {
+		stderrWriters = append(stderrWriters, stderrLogger)
+	}
+	if !silent {
+		stderrWriters = append([]io.Writer{os.Stderr}, stderrWriters...)
+	}
+	if len(stderrWriters) == 1 {
+		cmd.Stderr = stderrWriters[0]
+	} else {
+		cmd.Stderr = io.MultiWriter(stderrWriters...)
+	}
+
 	var stdinPipe io.WriteCloser
 	var err error
 	if useStdin {
@@ -630,90 +766,120 @@ func runCodexTask(taskSpec TaskSpec, silent bool, timeoutSec int) TaskResult {
 		if err != nil {
 			logErrorFn("Failed to create stdin pipe: " + err.Error())
 			result.ExitCode = 1
-			result.Error = "failed to create stdin pipe: " + err.Error()
+			result.Error = attachStderr("failed to create stdin pipe: " + err.Error())
 			return result
 		}
 	}
 
-	// Setup stdout
 	stdout, err := cmd.StdoutPipe()
 	if err != nil {
 		logErrorFn("Failed to create stdout pipe: " + err.Error())
 		result.ExitCode = 1
-		result.Error = "failed to create stdout pipe: " + err.Error()
+		result.Error = attachStderr("failed to create stdout pipe: " + err.Error())
 		return result
 	}
 
+	stdoutReader := io.Reader(stdout)
+	if stdoutLogger != nil {
+		stdoutReader = io.TeeReader(stdout, stdoutLogger)
+	}
+
 	logInfoFn(fmt.Sprintf("Starting codex with args: codex %s...", strings.Join(codexArgs[:min(5, len(codexArgs))], " ")))
 
-	// Start process
 	if err := cmd.Start(); err != nil {
 		if strings.Contains(err.Error(), "executable file not found") {
 			logErrorFn("codex command not found in PATH")
 			result.ExitCode = 127
-			result.Error = "codex command not found in PATH"
+			result.Error = attachStderr("codex command not found in PATH")
 			return result
 		}
 		logErrorFn("Failed to start codex: " + err.Error())
 		result.ExitCode = 1
-		result.Error = "failed to start codex: " + err.Error()
+		result.Error = attachStderr("failed to start codex: " + err.Error())
 		return result
 	}
-	logInfoFn(fmt.Sprintf("Process started with PID: %d", cmd.Process.Pid))
 
-	// Write to stdin if needed
+	logInfoFn(fmt.Sprintf("Starting codex with PID: %d", cmd.Process.Pid))
+	if logger := activeLogger(); logger != nil {
+		logInfoFn(fmt.Sprintf("Log capturing to: %s", logger.Path()))
+	}
+
 	if useStdin && stdinPipe != nil {
 		logInfoFn(fmt.Sprintf("Writing %d chars to stdin...", len(taskSpec.Task)))
-		go func() {
+		go func(data string) {
 			defer stdinPipe.Close()
-			io.WriteString(stdinPipe, taskSpec.Task)
-		}()
+			_, _ = io.WriteString(stdinPipe, data)
+		}(taskSpec.Task)
 		logInfoFn("Stdin closed")
 	}
 
-	forwardSignals(ctx, cmd, logErrorFn)
+	waitCh := make(chan error, 1)
+	go func() { waitCh <- cmd.Wait() }()
 
-	logInfoFn("Reading stdout...")
+	parseCh := make(chan parseResult, 1)
+	go func() {
+		msg, tid := parseJSONStreamWithLog(stdoutReader, logWarnFn, logInfoFn)
+		parseCh <- parseResult{message: msg, threadID: tid}
+	}()
 
-	// Parse JSON stream
-	message, threadID := parseJSONStreamWithWarn(stdout, logWarnFn)
+	var waitErr error
+	var forceKillTimer *time.Timer
 
-	// Wait for process to complete
-	err = cmd.Wait()
+	select {
+	case waitErr = <-waitCh:
+	case <-ctx.Done():
+		logErrorFn(cancelReason(ctx))
+		forceKillTimer = terminateProcess(cmd)
+		waitErr = <-waitCh
+	}
 
-	// Check for timeout
-	if ctx.Err() == context.DeadlineExceeded {
-		logErrorFn("Codex execution timeout")
-		if cmd.Process != nil {
-			cmd.Process.Kill()
+	if forceKillTimer != nil {
+		forceKillTimer.Stop()
+	}
+
+	parsed := <-parseCh
+
+	if ctxErr := ctx.Err(); ctxErr != nil {
+		if errors.Is(ctxErr, context.DeadlineExceeded) {
+			result.ExitCode = 124
+			result.Error = attachStderr("codex execution timeout")
+			return result
 		}
-		result.ExitCode = 124
-		result.Error = "codex execution timeout"
+		result.ExitCode = 130
+		result.Error = attachStderr("execution cancelled")
 		return result
 	}
 
-	// Check exit code
-	if err != nil {
-		if exitErr, ok := err.(*exec.ExitError); ok {
+	if waitErr != nil {
+		if exitErr, ok := waitErr.(*exec.ExitError); ok {
 			code := exitErr.ExitCode()
 			logErrorFn(fmt.Sprintf("Codex exited with status %d", code))
 			result.ExitCode = code
-			result.Error = fmt.Sprintf("codex exited with status %d", code)
+			result.Error = attachStderr(fmt.Sprintf("codex exited with status %d", code))
 			return result
 		}
-		logErrorFn("Codex error: " + err.Error())
+		logErrorFn("Codex error: " + waitErr.Error())
 		result.ExitCode = 1
-		result.Error = "codex error: " + err.Error()
+		result.Error = attachStderr("codex error: " + waitErr.Error())
 		return result
 	}
 
+	message := parsed.message
+	threadID := parsed.threadID
 	if message == "" {
 		logErrorFn("Codex completed without agent_message output")
 		result.ExitCode = 1
-		result.Error = "codex completed without agent_message output"
+		result.Error = attachStderr("codex completed without agent_message output")
 		return result
 	}
 
+	if stdoutLogger != nil {
+		stdoutLogger.Flush()
+	}
+	if stderrLogger != nil {
+		stderrLogger.Flush()
+	}
+
 	result.ExitCode = 0
 	result.Message = message
 	result.SessionID = threadID
@@ -721,6 +887,36 @@ func runCodexTask(taskSpec TaskSpec, silent bool, timeoutSec int) TaskResult {
 	return result
 }
 
+type tailBuffer struct {
+	limit int
+	data  []byte
+}
+
+func (b *tailBuffer) Write(p []byte) (int, error) {
+	if b.limit <= 0 {
+		return len(p), nil
+	}
+
+	if len(p) >= b.limit {
+		b.data = append(b.data[:0], p[len(p)-b.limit:]...)
+		return len(p), nil
+	}
+
+	total := len(b.data) + len(p)
+	if total <= b.limit {
+		b.data = append(b.data, p...)
+		return len(p), nil
+	}
+
+	overflow := total - b.limit
+	b.data = append(b.data[overflow:], p...)
+	return len(p), nil
+}
+
+func (b *tailBuffer) String() string {
+	return string(b.data)
+}
+
 func forwardSignals(ctx context.Context, cmd *exec.Cmd, logErrorFn func(string)) {
 	sigCh := make(chan os.Signal, 1)
 	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
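Aside (illustration only, not part of the diff): the tailBuffer added above retains just the trailing limit bytes of whatever is written to it, which is how the captured stderr stays bounded at stderrCaptureLimit. A quick sketch, assuming it sits in the same package:

package main

import (
	"fmt"
	"io"
)

// exampleTailBuffer shows that only the last `limit` bytes survive.
func exampleTailBuffer() {
	buf := &tailBuffer{limit: 8}
	io.WriteString(buf, "0123456789abcdef") // write 16 bytes into an 8-byte tail buffer
	fmt.Println(buf.String())               // prints "89abcdef"
}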
@@ -743,23 +939,59 @@ func forwardSignals(ctx context.Context, cmd *exec.Cmd, logErrorFn func(string))
 	}()
 }
 
+func cancelReason(ctx context.Context) string {
+	if ctx == nil {
+		return "Context cancelled"
+	}
+
+	if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+		return "Codex execution timeout"
+	}
+
+	return "Execution cancelled, terminating codex process"
+}
+
+func terminateProcess(cmd *exec.Cmd) *time.Timer {
+	if cmd == nil || cmd.Process == nil {
+		return nil
+	}
+
+	_ = cmd.Process.Signal(syscall.SIGTERM)
+
+	return time.AfterFunc(time.Duration(forceKillDelay)*time.Second, func() {
+		if cmd.Process != nil {
+			_ = cmd.Process.Kill()
+		}
+	})
+}
+
 func parseJSONStream(r io.Reader) (message, threadID string) {
-	return parseJSONStreamWithWarn(r, logWarn)
+	return parseJSONStreamWithLog(r, logWarn, logInfo)
 }
 
 func parseJSONStreamWithWarn(r io.Reader, warnFn func(string)) (message, threadID string) {
+	return parseJSONStreamWithLog(r, warnFn, logInfo)
+}
+
+func parseJSONStreamWithLog(r io.Reader, warnFn func(string), infoFn func(string)) (message, threadID string) {
 	scanner := bufio.NewScanner(r)
 	scanner.Buffer(make([]byte, 64*1024), 10*1024*1024)
 
 	if warnFn == nil {
 		warnFn = func(string) {}
 	}
+	if infoFn == nil {
+		infoFn = func(string) {}
+	}
+
+	totalEvents := 0
+
 	for scanner.Scan() {
 		line := strings.TrimSpace(scanner.Text())
 		if line == "" {
 			continue
 		}
+		totalEvents++
+
 		var event JSONEvent
 		if err := json.Unmarshal([]byte(line), &event); err != nil {
@@ -767,26 +999,71 @@ func parseJSONStreamWithWarn(r io.Reader, warnFn func(string)) (message, threadI
 			continue
 		}
 
-		// Capture thread_id
-		if event.Type == "thread.started" {
-			threadID = event.ThreadID
+		var details []string
+		if event.ThreadID != "" {
+			details = append(details, fmt.Sprintf("thread_id=%s", event.ThreadID))
+		}
+		if event.Item != nil && event.Item.Type != "" {
+			details = append(details, fmt.Sprintf("item_type=%s", event.Item.Type))
+		}
+		if len(details) > 0 {
+			infoFn(fmt.Sprintf("Parsed event #%d type=%s (%s)", totalEvents, event.Type, strings.Join(details, ", ")))
+		} else {
+			infoFn(fmt.Sprintf("Parsed event #%d type=%s", totalEvents, event.Type))
 		}
 
-		// Capture agent_message
-		if event.Type == "item.completed" && event.Item != nil && event.Item.Type == "agent_message" {
-			if text := normalizeText(event.Item.Text); text != "" {
-				message = text
+		switch event.Type {
+		case "thread.started":
+			threadID = event.ThreadID
+			infoFn(fmt.Sprintf("thread.started event thread_id=%s", threadID))
+		case "item.completed":
+			var itemType string
+			var normalized string
+			if event.Item != nil {
+				itemType = event.Item.Type
+				normalized = normalizeText(event.Item.Text)
+			}
+			infoFn(fmt.Sprintf("item.completed event item_type=%s message_len=%d", itemType, len(normalized)))
+			if event.Item != nil && event.Item.Type == "agent_message" && normalized != "" {
+				message = normalized
 			}
 		}
 	}
 
-	if err := scanner.Err(); err != nil && err != io.EOF {
+	if err := scanner.Err(); err != nil && !errors.Is(err, io.EOF) {
 		warnFn("Read stdout error: " + err.Error())
 	}
+
+	infoFn(fmt.Sprintf("parseJSONStream completed: events=%d, message_len=%d, thread_id_found=%t", totalEvents, len(message), threadID != ""))
 	return message, threadID
 }
 
+func discardInvalidJSON(decoder *json.Decoder, reader *bufio.Reader) (*bufio.Reader, error) {
+	var buffered bytes.Buffer
+
+	if decoder != nil {
+		if buf := decoder.Buffered(); buf != nil {
+			_, _ = buffered.ReadFrom(buf)
+		}
+	}
+
+	line, err := reader.ReadBytes('\n')
+	buffered.Write(line)
+
+	data := buffered.Bytes()
+	newline := bytes.IndexByte(data, '\n')
+	if newline == -1 {
+		return reader, err
+	}
+
+	remaining := data[newline+1:]
+	if len(remaining) == 0 {
+		return reader, err
+	}
+
+	return bufio.NewReader(io.MultiReader(bytes.NewReader(remaining), reader)), err
+}
+
 func normalizeText(text interface{}) string {
 	switch v := text.(type) {
 	case string:
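Aside (illustration only): a sketch of the kind of JSONL event stream the parser above consumes. The Go field names (Type, ThreadID, Item.Type, Item.Text) come from the hunk; the literal JSON keys below are assumptions, since JSONEvent's struct tags are not shown in this diff:

package main

import (
	"fmt"
	"strings"
)

// exampleParseStream feeds parseJSONStream one "thread.started" event and one
// "item.completed" event carrying an agent_message. Illustrative only.
func exampleParseStream() {
	stream := strings.NewReader(
		`{"type":"thread.started","thread_id":"t-123"}` + "\n" +
			`{"type":"item.completed","item":{"type":"agent_message","text":"done"}}` + "\n")

	message, threadID := parseJSONStream(stream)
	fmt.Println(message, threadID) // expected "done t-123" if the assumed JSON keys match
}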
@@ -816,7 +1093,6 @@ func resolveTimeout() int {
|
|||||||
return defaultTimeout
|
return defaultTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
// Environment variable is in milliseconds if > 10000, convert to seconds
|
|
||||||
if parsed > 10000 {
|
if parsed > 10000 {
|
||||||
return parsed / 1000
|
return parsed / 1000
|
||||||
}
|
}
|
||||||
@@ -842,10 +1118,71 @@ func getEnv(key, defaultValue string) string {
     return defaultValue
 }
 
+type logWriter struct {
+    prefix string
+    maxLen int
+    buf    bytes.Buffer
+}
+
+func newLogWriter(prefix string, maxLen int) *logWriter {
+    if maxLen <= 0 {
+        maxLen = codexLogLineLimit
+    }
+    return &logWriter{prefix: prefix, maxLen: maxLen}
+}
+
+func (lw *logWriter) Write(p []byte) (int, error) {
+    if lw == nil {
+        return len(p), nil
+    }
+    total := len(p)
+    for len(p) > 0 {
+        if idx := bytes.IndexByte(p, '\n'); idx >= 0 {
+            lw.buf.Write(p[:idx])
+            lw.logLine(true)
+            p = p[idx+1:]
+            continue
+        }
+        lw.buf.Write(p)
+        break
+    }
+    return total, nil
+}
+
+func (lw *logWriter) Flush() {
+    if lw == nil || lw.buf.Len() == 0 {
+        return
+    }
+    lw.logLine(false)
+}
+
+func (lw *logWriter) logLine(force bool) {
+    if lw == nil {
+        return
+    }
+    line := lw.buf.String()
+    lw.buf.Reset()
+    if line == "" && !force {
+        return
+    }
+    if lw.maxLen > 0 && len(line) > lw.maxLen {
+        cutoff := lw.maxLen
+        if cutoff > 3 {
+            line = line[:cutoff-3] + "..."
+        } else {
+            line = line[:cutoff]
+        }
+    }
+    logInfo(lw.prefix + line)
+}
+
 func truncate(s string, maxLen int) string {
     if len(s) <= maxLen {
         return s
     }
+    if maxLen < 0 {
+        return ""
+    }
     return s[:maxLen] + "..."
 }
 
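The new `logWriter` buffers partial writes, logs one entry per completed line, and truncates anything longer than `maxLen`. How it is wired up is outside this hunk; the sketch below shows one presumed use, attaching it to a subprocess's stderr (the helper name and prefix string are assumptions):

```go
package main

import "os/exec"

// runWithLoggedStderr is a hypothetical helper showing the intended use of
// newLogWriter: every line the child writes to stderr is forwarded to the
// wrapper's log with a prefix, truncated to codexLogLineLimit.
func runWithLoggedStderr(name string, args ...string) error {
	lw := newLogWriter("[codex stderr] ", 0) // 0 falls back to codexLogLineLimit
	defer lw.Flush()                         // emit any trailing partial line

	cmd := exec.Command(name, args...)
	cmd.Stderr = lw // *logWriter satisfies io.Writer
	return cmd.Run()
}
```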
@@ -856,6 +1193,22 @@ func min(a, b int) int {
     return b
 }
 
+func setLogger(l *Logger) {
+    loggerPtr.Store(l)
+}
+
+func closeLogger() error {
+    logger := loggerPtr.Swap(nil)
+    if logger == nil {
+        return nil
+    }
+    return logger.Close()
+}
+
+func activeLogger() *Logger {
+    return loggerPtr.Load()
+}
+
 func hello() string {
     return "hello world"
 }
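`setLogger`, `closeLogger`, and `activeLogger` depend on a `loggerPtr` declared outside this hunk. A minimal sketch of the presumed declaration, assuming Go 1.19+ `sync/atomic`; the actual definition in main.go may differ:

```go
package main

import "sync/atomic"

// Presumed declaration (not shown in this excerpt): an atomic pointer lets
// logInfo/logWarn/logError read the current logger without a mutex, while
// closeLogger's Swap(nil) ensures Close runs at most once even when several
// goroutines race to shut down.
var loggerPtr atomic.Pointer[Logger]
```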
@@ -869,15 +1222,30 @@ func farewell(name string) string {
 }
 
 func logInfo(msg string) {
-    fmt.Fprintf(os.Stderr, "INFO: %s\n", msg)
+    if logger := activeLogger(); logger != nil {
+        logger.Info(msg)
+    }
 }
 
 func logWarn(msg string) {
-    fmt.Fprintf(os.Stderr, "WARN: %s\n", msg)
+    if logger := activeLogger(); logger != nil {
+        logger.Warn(msg)
+    }
 }
 
 func logError(msg string) {
-    fmt.Fprintf(os.Stderr, "ERROR: %s\n", msg)
+    if logger := activeLogger(); logger != nil {
+        logger.Error(msg)
+    }
+}
+
+func runCleanupHook() {
+    if logger := activeLogger(); logger != nil {
+        logger.Flush()
+    }
+    if cleanupHook != nil {
+        cleanupHook()
+    }
 }
 
 func printHelp() {
@@ -888,9 +1256,15 @@ Usage:
   codex-wrapper - [workdir] Read task from stdin
   codex-wrapper resume <session_id> "task" [workdir]
   codex-wrapper resume <session_id> - [workdir]
+  codex-wrapper --parallel Run tasks in parallel (config from stdin)
   codex-wrapper --version
   codex-wrapper --help
 
+Parallel mode examples:
+  codex-wrapper --parallel < tasks.txt
+  echo '...' | codex-wrapper --parallel
+  codex-wrapper --parallel <<'EOF'
+
 Environment Variables:
   CODEX_TIMEOUT Timeout in milliseconds (default: 7200000)
File diff suppressed because it is too large
@@ -39,7 +39,7 @@ Before any tool call, restate the user goal and outline the current plan. While
 </tool_preambles>
 
 <self_reflection>
-Construct a private rubric with at least five categories (maintainability, tests with ≥90% coverage, performance, security, style, documentation, backward compatibility). Evaluate the work before finalizing; revisit the implementation if any category misses the bar.
+Construct a private rubric with at least five categories (maintainability, performance, security, style, documentation, backward compatibility). Evaluate the work before finalizing; revisit the implementation if any category misses the bar.
 </self_reflection>
 
 <output_verbosity>
@@ -180,11 +180,50 @@ EOF
 
 ### Parallel Execution
 
+> Important:
+> - `--parallel` only reads task definitions from stdin.
+> - It does not accept extra command-line arguments (no inline `workdir`, `task`, or other params).
+> - Put all task metadata and content in stdin; nothing belongs after `--parallel` on the command line.
+
+**Correct vs Incorrect Usage**
+
+**Correct:**
+```bash
+# Option 1: file redirection
+codex-wrapper --parallel < tasks.txt
+
+# Option 2: heredoc (recommended for multiple tasks)
+codex-wrapper --parallel <<'EOF'
+---TASK---
+id: task1
+workdir: /path/to/dir
+---CONTENT---
+task content
+EOF
+
+# Option 3: pipe
+echo "---TASK---..." | codex-wrapper --parallel
+```
+
+**Incorrect (will trigger shell parsing errors):**
+```bash
+# Bad: no extra args allowed after --parallel
+codex-wrapper --parallel - /path/to/dir <<'EOF'
+...
+EOF
+
+# Bad: --parallel does not take a task argument
+codex-wrapper --parallel "task description"
+
+# Bad: workdir must live inside the task config
+codex-wrapper --parallel /path/to/dir < tasks.txt
+```
+
 For multiple independent or dependent tasks, use `--parallel` mode with delimiter format:
 
 **Typical Workflow (analyze → implement → test, chained in a single parallel call)**:
 ```bash
-codex-wrapper --parallel - <<'EOF'
+codex-wrapper --parallel <<'EOF'
 ---TASK---
 id: analyze_1732876800
 workdir: /home/user/project
@@ -207,7 +246,7 @@ EOF
 A single `codex-wrapper --parallel` call schedules all three stages concurrently, using `dependencies` to enforce sequential ordering without multiple invocations.
 
 ```bash
-codex-wrapper --parallel - <<'EOF'
+codex-wrapper --parallel <<'EOF'
 ---TASK---
 id: backend_1732876800
 workdir: /home/user/project/backend
@@ -235,6 +274,8 @@ EOF
 - `workdir: <path>`: Optional, working directory (default: `.`)
   - Best practice: use absolute paths (e.g., `/home/user/project/backend`)
   - Avoids ambiguity and ensures consistent behavior across environments
+  - Must be specified inside each task block; do not pass `workdir` as a CLI argument to `--parallel`
+  - Each task can set its own `workdir` when different directories are needed
 - `dependencies: <id1>, <id2>`: Optional, comma-separated task IDs
 - `session_id: <uuid>`: Optional, resume a previous session
 - `---CONTENT---`: Separates metadata from task content
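The metadata keys listed above document the stdin format that `--parallel` consumes. The wrapper's real parser sits in the suppressed main.go diff; the sketch below only illustrates the `---TASK---`/`---CONTENT---` layout described here (struct and function names are hypothetical):

```go
package main

import "strings"

// parallelTask mirrors the metadata keys documented above; the wrapper's
// actual struct and parser are not shown in this excerpt.
type parallelTask struct {
	ID           string
	Workdir      string
	Dependencies []string
	SessionID    string
	Content      string
}

// parseParallelTasks is an illustrative parser for the delimiter layout:
// each ---TASK--- block holds "key: value" metadata lines, and everything
// after ---CONTENT--- is the task body.
func parseParallelTasks(input string) []parallelTask {
	var tasks []parallelTask
	for _, block := range strings.Split(input, "---TASK---") {
		block = strings.TrimSpace(block)
		if block == "" {
			continue
		}
		meta, content, _ := strings.Cut(block, "---CONTENT---")
		t := parallelTask{Workdir: ".", Content: strings.TrimSpace(content)}
		for _, line := range strings.Split(meta, "\n") {
			key, value, ok := strings.Cut(line, ":")
			if !ok {
				continue
			}
			value = strings.TrimSpace(value)
			switch strings.TrimSpace(key) {
			case "id":
				t.ID = value
			case "workdir":
				t.Workdir = value
			case "dependencies":
				for _, dep := range strings.Split(value, ",") {
					t.Dependencies = append(t.Dependencies, strings.TrimSpace(dep))
				}
			case "session_id":
				t.SessionID = value
			}
		}
		tasks = append(tasks, t)
	}
	return tasks
}
```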
@@ -249,7 +290,7 @@ EOF
 **Resume Failed Tasks**:
 ```bash
 # Use session_id from previous output to resume
-codex-wrapper --parallel - <<'EOF'
+codex-wrapper --parallel <<'EOF'
 ---TASK---
 id: T2
 session_id: 019xxx-previous-session-id
|
|||||||
Reference in New Issue
Block a user