mirror of
https://github.com/cexll/myclaude.git
synced 2026-02-05 02:30:26 +08:00
- Use cmd.exe /c to execute .bat/.cmd on Windows - Set USERPROFILE alongside HOME for os.UserHomeDir() - Use setTempDirEnv to set TEMP/TMP on Windows - Replace chmod-based tests with cross-platform alternatives - Fix concurrent speedup benchmark with fair comparison - Add output/ to gitignore Generated with SWE-Agent.ai Co-Authored-By: SWE-Agent.ai <noreply@swe-agent.ai>
1486 lines
38 KiB
Go
1486 lines
38 KiB
Go
package executor
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"os/signal"
|
|
"runtime"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
backend "codeagent-wrapper/internal/backend"
|
|
config "codeagent-wrapper/internal/config"
|
|
ilogger "codeagent-wrapper/internal/logger"
|
|
parser "codeagent-wrapper/internal/parser"
|
|
utils "codeagent-wrapper/internal/utils"
|
|
)
|
|
|
|
const postMessageTerminateDelay = 1 * time.Second
|
|
const forceKillWaitTimeout = 5 * time.Second
|
|
|
|
// Defaults duplicated from wrapper for module decoupling.
|
|
const (
|
|
defaultWorkdir = "."
|
|
defaultCoverageTarget = 90.0
|
|
defaultBackendName = "codex"
|
|
|
|
codexLogLineLimit = 1000
|
|
stderrCaptureLimit = 4 * 1024
|
|
)
|
|
|
|
const (
|
|
// stdout close reasons
|
|
stdoutCloseReasonWait = "wait-done"
|
|
stdoutCloseReasonDrain = "drain-timeout"
|
|
stdoutCloseReasonCtx = "context-cancel"
|
|
stdoutDrainTimeout = 500 * time.Millisecond
|
|
)
|
|
|
|
// Hook points (tests can override inside this package).
|
|
var (
|
|
selectBackendFn = backend.Select
|
|
commandContext = exec.CommandContext
|
|
terminateCommandFn = terminateCommand
|
|
)
|
|
|
|
var forceKillDelay atomic.Int32
|
|
|
|
func init() {
|
|
forceKillDelay.Store(5) // seconds - default value
|
|
}
|
|
|
|
type (
|
|
Backend = backend.Backend
|
|
Config = config.Config
|
|
Logger = ilogger.Logger
|
|
)
|
|
|
|
type minimalClaudeSettings = backend.MinimalClaudeSettings
|
|
|
|
func loadMinimalClaudeSettings() minimalClaudeSettings { return backend.LoadMinimalClaudeSettings() }
|
|
|
|
func loadGeminiEnv() map[string]string { return backend.LoadGeminiEnv() }
|
|
|
|
func NewLogger() (*Logger, error) { return ilogger.NewLogger() }
|
|
|
|
func NewLoggerWithSuffix(suffix string) (*Logger, error) { return ilogger.NewLoggerWithSuffix(suffix) }
|
|
|
|
func setLogger(l *Logger) { ilogger.SetLogger(l) }
|
|
|
|
func closeLogger() error { return ilogger.CloseLogger() }
|
|
|
|
func activeLogger() *Logger { return ilogger.ActiveLogger() }
|
|
|
|
func logInfo(msg string) { ilogger.LogInfo(msg) }
|
|
|
|
func logWarn(msg string) { ilogger.LogWarn(msg) }
|
|
|
|
func logError(msg string) { ilogger.LogError(msg) }
|
|
|
|
func logConcurrencyPlanning(limit, total int) { ilogger.LogConcurrencyPlanning(limit, total) }
|
|
|
|
func logConcurrencyState(event, taskID string, active, limit int) {
|
|
ilogger.LogConcurrencyState(event, taskID, active, limit)
|
|
}
|
|
|
|
func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(string), onMessage func(), onComplete func()) (message, threadID string) {
|
|
return parser.ParseJSONStreamInternal(r, warnFn, infoFn, onMessage, onComplete)
|
|
}
|
|
|
|
func sanitizeOutput(s string) string { return utils.SanitizeOutput(s) }
|
|
|
|
func safeTruncate(s string, maxLen int) string { return utils.SafeTruncate(s, maxLen) }
|
|
|
|
func min(a, b int) int { return utils.Min(a, b) }
|
|
|
|
// commandRunner abstracts exec.Cmd for testability
|
|
type commandRunner interface {
|
|
Start() error
|
|
Wait() error
|
|
StdoutPipe() (io.ReadCloser, error)
|
|
StderrPipe() (io.ReadCloser, error)
|
|
StdinPipe() (io.WriteCloser, error)
|
|
SetStderr(io.Writer)
|
|
SetDir(string)
|
|
SetEnv(env map[string]string)
|
|
Process() processHandle
|
|
}
|
|
|
|
// processHandle abstracts os.Process for testability
|
|
type processHandle interface {
|
|
Pid() int
|
|
Kill() error
|
|
Signal(os.Signal) error
|
|
}
|
|
|
|
// realCmd implements commandRunner using exec.Cmd
|
|
type realCmd struct {
|
|
cmd *exec.Cmd
|
|
}
|
|
|
|
func (r *realCmd) Start() error {
|
|
if r.cmd == nil {
|
|
return errors.New("command is nil")
|
|
}
|
|
return r.cmd.Start()
|
|
}
|
|
|
|
func (r *realCmd) Wait() error {
|
|
if r.cmd == nil {
|
|
return errors.New("command is nil")
|
|
}
|
|
return r.cmd.Wait()
|
|
}
|
|
|
|
func (r *realCmd) StdoutPipe() (io.ReadCloser, error) {
|
|
if r.cmd == nil {
|
|
return nil, errors.New("command is nil")
|
|
}
|
|
return r.cmd.StdoutPipe()
|
|
}
|
|
|
|
func (r *realCmd) StderrPipe() (io.ReadCloser, error) {
|
|
if r.cmd == nil {
|
|
return nil, errors.New("command is nil")
|
|
}
|
|
return r.cmd.StderrPipe()
|
|
}
|
|
|
|
func (r *realCmd) StdinPipe() (io.WriteCloser, error) {
|
|
if r.cmd == nil {
|
|
return nil, errors.New("command is nil")
|
|
}
|
|
return r.cmd.StdinPipe()
|
|
}
|
|
|
|
func (r *realCmd) SetStderr(w io.Writer) {
|
|
if r.cmd != nil {
|
|
r.cmd.Stderr = w
|
|
}
|
|
}
|
|
|
|
func (r *realCmd) SetDir(dir string) {
|
|
if r.cmd != nil {
|
|
r.cmd.Dir = dir
|
|
}
|
|
}
|
|
|
|
func (r *realCmd) SetEnv(env map[string]string) {
|
|
if r == nil || r.cmd == nil || len(env) == 0 {
|
|
return
|
|
}
|
|
|
|
merged := make(map[string]string, len(env)+len(os.Environ()))
|
|
for _, kv := range os.Environ() {
|
|
if kv == "" {
|
|
continue
|
|
}
|
|
idx := strings.IndexByte(kv, '=')
|
|
if idx <= 0 {
|
|
continue
|
|
}
|
|
merged[kv[:idx]] = kv[idx+1:]
|
|
}
|
|
for _, kv := range r.cmd.Env {
|
|
if kv == "" {
|
|
continue
|
|
}
|
|
idx := strings.IndexByte(kv, '=')
|
|
if idx <= 0 {
|
|
continue
|
|
}
|
|
merged[kv[:idx]] = kv[idx+1:]
|
|
}
|
|
for k, v := range env {
|
|
if strings.TrimSpace(k) == "" {
|
|
continue
|
|
}
|
|
merged[k] = v
|
|
}
|
|
|
|
keys := make([]string, 0, len(merged))
|
|
for k := range merged {
|
|
keys = append(keys, k)
|
|
}
|
|
sort.Strings(keys)
|
|
|
|
out := make([]string, 0, len(keys))
|
|
for _, k := range keys {
|
|
out = append(out, k+"="+merged[k])
|
|
}
|
|
r.cmd.Env = out
|
|
}
|
|
|
|
func (r *realCmd) Process() processHandle {
|
|
if r == nil || r.cmd == nil || r.cmd.Process == nil {
|
|
return nil
|
|
}
|
|
return &realProcess{proc: r.cmd.Process}
|
|
}
|
|
|
|
// realProcess implements processHandle using os.Process
|
|
type realProcess struct {
|
|
proc *os.Process
|
|
}
|
|
|
|
func (p *realProcess) Pid() int {
|
|
if p == nil || p.proc == nil {
|
|
return 0
|
|
}
|
|
return p.proc.Pid
|
|
}
|
|
|
|
func (p *realProcess) Kill() error {
|
|
if p == nil || p.proc == nil {
|
|
return nil
|
|
}
|
|
return p.proc.Kill()
|
|
}
|
|
|
|
func (p *realProcess) Signal(sig os.Signal) error {
|
|
if p == nil || p.proc == nil {
|
|
return nil
|
|
}
|
|
return p.proc.Signal(sig)
|
|
}
|
|
|
|
// newCommandRunner creates a new commandRunner (test hook injection point)
|
|
var newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner {
|
|
if runtime.GOOS == "windows" {
|
|
lowerName := strings.ToLower(strings.TrimSpace(name))
|
|
if strings.HasSuffix(lowerName, ".bat") || strings.HasSuffix(lowerName, ".cmd") {
|
|
cmdArgs := make([]string, 0, 2+len(args))
|
|
cmdArgs = append(cmdArgs, "/c", name)
|
|
cmdArgs = append(cmdArgs, args...)
|
|
return &realCmd{cmd: commandContext(ctx, "cmd.exe", cmdArgs...)}
|
|
}
|
|
}
|
|
return &realCmd{cmd: commandContext(ctx, name, args...)}
|
|
}
|
|
|
|
type parseResult struct {
|
|
message string
|
|
threadID string
|
|
}
|
|
|
|
type taskLoggerContextKey struct{}
|
|
|
|
func withTaskLogger(ctx context.Context, logger *Logger) context.Context {
|
|
if ctx == nil || logger == nil {
|
|
return ctx
|
|
}
|
|
return context.WithValue(ctx, taskLoggerContextKey{}, logger)
|
|
}
|
|
|
|
func taskLoggerFromContext(ctx context.Context) *Logger {
|
|
if ctx == nil {
|
|
return nil
|
|
}
|
|
logger, _ := ctx.Value(taskLoggerContextKey{}).(*Logger)
|
|
return logger
|
|
}
|
|
|
|
type taskLoggerHandle struct {
|
|
logger *Logger
|
|
path string
|
|
shared bool
|
|
closeFn func()
|
|
}
|
|
|
|
func newTaskLoggerHandle(taskID string) taskLoggerHandle {
|
|
taskLogger, err := NewLoggerWithSuffix(taskID)
|
|
if err == nil {
|
|
return taskLoggerHandle{
|
|
logger: taskLogger,
|
|
path: taskLogger.Path(),
|
|
closeFn: func() { _ = taskLogger.Close() },
|
|
}
|
|
}
|
|
|
|
msg := fmt.Sprintf("Failed to create task logger for %s: %v, using main logger", taskID, err)
|
|
mainLogger := activeLogger()
|
|
if mainLogger != nil {
|
|
logWarn(msg)
|
|
return taskLoggerHandle{
|
|
logger: mainLogger,
|
|
path: mainLogger.Path(),
|
|
shared: true,
|
|
}
|
|
}
|
|
|
|
fmt.Fprintln(os.Stderr, msg)
|
|
return taskLoggerHandle{}
|
|
}
|
|
|
|
// defaultRunCodexTaskFn is the default implementation of runCodexTaskFn (exposed for test reset)
|
|
func DefaultRunCodexTaskFn(task TaskSpec, timeout int) TaskResult {
|
|
if task.WorkDir == "" {
|
|
task.WorkDir = defaultWorkdir
|
|
}
|
|
if task.Mode == "" {
|
|
task.Mode = "new"
|
|
}
|
|
if strings.TrimSpace(task.PromptFile) != "" {
|
|
prompt, err := ReadAgentPromptFile(task.PromptFile, false)
|
|
if err != nil {
|
|
return TaskResult{TaskID: task.ID, ExitCode: 1, Error: "failed to read prompt file: " + err.Error()}
|
|
}
|
|
task.Task = WrapTaskWithAgentPrompt(prompt, task.Task)
|
|
}
|
|
if task.UseStdin || ShouldUseStdin(task.Task, false) {
|
|
task.UseStdin = true
|
|
}
|
|
|
|
backendName := task.Backend
|
|
if backendName == "" {
|
|
backendName = defaultBackendName
|
|
}
|
|
|
|
backend, err := selectBackendFn(backendName)
|
|
if err != nil {
|
|
return TaskResult{TaskID: task.ID, ExitCode: 1, Error: err.Error()}
|
|
}
|
|
task.Backend = backend.Name()
|
|
|
|
parentCtx := task.Context
|
|
if parentCtx == nil {
|
|
parentCtx = context.Background()
|
|
}
|
|
return RunCodexTaskWithContext(parentCtx, task, backend, "", nil, nil, false, true, timeout)
|
|
}
|
|
|
|
func TopologicalSort(tasks []TaskSpec) ([][]TaskSpec, error) {
|
|
idToTask := make(map[string]TaskSpec, len(tasks))
|
|
indegree := make(map[string]int, len(tasks))
|
|
adj := make(map[string][]string, len(tasks))
|
|
|
|
for _, task := range tasks {
|
|
idToTask[task.ID] = task
|
|
indegree[task.ID] = 0
|
|
}
|
|
|
|
for _, task := range tasks {
|
|
for _, dep := range task.Dependencies {
|
|
if _, ok := idToTask[dep]; !ok {
|
|
return nil, fmt.Errorf("dependency %q not found for task %q", dep, task.ID)
|
|
}
|
|
indegree[task.ID]++
|
|
adj[dep] = append(adj[dep], task.ID)
|
|
}
|
|
}
|
|
|
|
queue := make([]string, 0, len(tasks))
|
|
for _, task := range tasks {
|
|
if indegree[task.ID] == 0 {
|
|
queue = append(queue, task.ID)
|
|
}
|
|
}
|
|
|
|
layers := make([][]TaskSpec, 0)
|
|
processed := 0
|
|
|
|
for len(queue) > 0 {
|
|
current := queue
|
|
queue = nil
|
|
layer := make([]TaskSpec, len(current))
|
|
for i, id := range current {
|
|
layer[i] = idToTask[id]
|
|
processed++
|
|
}
|
|
layers = append(layers, layer)
|
|
|
|
next := make([]string, 0)
|
|
for _, id := range current {
|
|
for _, neighbor := range adj[id] {
|
|
indegree[neighbor]--
|
|
if indegree[neighbor] == 0 {
|
|
next = append(next, neighbor)
|
|
}
|
|
}
|
|
}
|
|
queue = append(queue, next...)
|
|
}
|
|
|
|
if processed != len(tasks) {
|
|
cycleIDs := make([]string, 0)
|
|
for id, deg := range indegree {
|
|
if deg > 0 {
|
|
cycleIDs = append(cycleIDs, id)
|
|
}
|
|
}
|
|
sort.Strings(cycleIDs)
|
|
return nil, fmt.Errorf("cycle detected involving tasks: %s", strings.Join(cycleIDs, ","))
|
|
}
|
|
|
|
return layers, nil
|
|
}
|
|
|
|
func ExecuteConcurrent(layers [][]TaskSpec, timeout int, runTask func(TaskSpec, int) TaskResult) []TaskResult {
|
|
maxWorkers := config.ResolveMaxParallelWorkers()
|
|
return ExecuteConcurrentWithContext(context.Background(), layers, timeout, maxWorkers, runTask)
|
|
}
|
|
|
|
func ExecuteConcurrentWithContext(parentCtx context.Context, layers [][]TaskSpec, timeout int, maxWorkers int, runTask func(TaskSpec, int) TaskResult) []TaskResult {
|
|
if runTask == nil {
|
|
runTask = DefaultRunCodexTaskFn
|
|
}
|
|
|
|
totalTasks := 0
|
|
for _, layer := range layers {
|
|
totalTasks += len(layer)
|
|
}
|
|
|
|
results := make([]TaskResult, 0, totalTasks)
|
|
failed := make(map[string]TaskResult, totalTasks)
|
|
resultsCh := make(chan TaskResult, totalTasks)
|
|
|
|
var startPrintMu sync.Mutex
|
|
bannerPrinted := false
|
|
|
|
printTaskStart := func(taskID, logPath string, shared bool) {
|
|
if logPath == "" {
|
|
return
|
|
}
|
|
startPrintMu.Lock()
|
|
if !bannerPrinted {
|
|
fmt.Fprintln(os.Stderr, "=== Starting Parallel Execution ===")
|
|
bannerPrinted = true
|
|
}
|
|
label := "Log"
|
|
if shared {
|
|
label = "Log (shared)"
|
|
}
|
|
fmt.Fprintf(os.Stderr, "Task %s: %s: %s\n", taskID, label, logPath)
|
|
startPrintMu.Unlock()
|
|
}
|
|
|
|
ctx := parentCtx
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
workerLimit := maxWorkers
|
|
if workerLimit < 0 {
|
|
workerLimit = 0
|
|
}
|
|
|
|
var sem chan struct{}
|
|
if workerLimit > 0 {
|
|
sem = make(chan struct{}, workerLimit)
|
|
}
|
|
|
|
logConcurrencyPlanning(workerLimit, totalTasks)
|
|
|
|
acquireSlot := func() bool {
|
|
if sem == nil {
|
|
return true
|
|
}
|
|
select {
|
|
case sem <- struct{}{}:
|
|
return true
|
|
case <-ctx.Done():
|
|
return false
|
|
}
|
|
}
|
|
|
|
releaseSlot := func() {
|
|
if sem == nil {
|
|
return
|
|
}
|
|
select {
|
|
case <-sem:
|
|
default:
|
|
}
|
|
}
|
|
|
|
var activeWorkers int64
|
|
|
|
for _, layer := range layers {
|
|
var wg sync.WaitGroup
|
|
executed := 0
|
|
|
|
for _, task := range layer {
|
|
if skip, reason := shouldSkipTask(task, failed); skip {
|
|
res := TaskResult{TaskID: task.ID, ExitCode: 1, Error: reason}
|
|
results = append(results, res)
|
|
failed[task.ID] = res
|
|
continue
|
|
}
|
|
|
|
if ctx.Err() != nil {
|
|
res := cancelledTaskResult(task.ID, ctx)
|
|
results = append(results, res)
|
|
failed[task.ID] = res
|
|
continue
|
|
}
|
|
|
|
executed++
|
|
wg.Add(1)
|
|
go func(ts TaskSpec) {
|
|
defer wg.Done()
|
|
var taskLogPath string
|
|
handle := taskLoggerHandle{}
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
resultsCh <- TaskResult{TaskID: ts.ID, ExitCode: 1, Error: fmt.Sprintf("panic: %v", r), LogPath: taskLogPath, sharedLog: handle.shared}
|
|
}
|
|
}()
|
|
|
|
if !acquireSlot() {
|
|
resultsCh <- cancelledTaskResult(ts.ID, ctx)
|
|
return
|
|
}
|
|
defer releaseSlot()
|
|
|
|
current := atomic.AddInt64(&activeWorkers, 1)
|
|
logConcurrencyState("start", ts.ID, int(current), workerLimit)
|
|
defer func() {
|
|
after := atomic.AddInt64(&activeWorkers, -1)
|
|
logConcurrencyState("done", ts.ID, int(after), workerLimit)
|
|
}()
|
|
|
|
handle = newTaskLoggerHandle(ts.ID)
|
|
taskLogPath = handle.path
|
|
if handle.closeFn != nil {
|
|
defer handle.closeFn()
|
|
}
|
|
|
|
taskCtx := ctx
|
|
if handle.logger != nil {
|
|
taskCtx = withTaskLogger(ctx, handle.logger)
|
|
}
|
|
ts.Context = taskCtx
|
|
|
|
printTaskStart(ts.ID, taskLogPath, handle.shared)
|
|
|
|
res := runTask(ts, timeout)
|
|
if taskLogPath != "" {
|
|
if res.LogPath == "" || (handle.shared && handle.logger != nil && res.LogPath == handle.logger.Path()) {
|
|
res.LogPath = taskLogPath
|
|
}
|
|
}
|
|
// 只有当最终的 LogPath 确实是共享 logger 的路径时才标记为 shared
|
|
if handle.shared && handle.logger != nil && res.LogPath == handle.logger.Path() {
|
|
res.sharedLog = true
|
|
}
|
|
resultsCh <- res
|
|
}(task)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
for i := 0; i < executed; i++ {
|
|
res := <-resultsCh
|
|
results = append(results, res)
|
|
if res.ExitCode != 0 || res.Error != "" {
|
|
failed[res.TaskID] = res
|
|
}
|
|
}
|
|
}
|
|
|
|
return results
|
|
}
|
|
|
|
func cancelledTaskResult(taskID string, ctx context.Context) TaskResult {
|
|
exitCode := 130
|
|
msg := "execution cancelled"
|
|
if ctx != nil && errors.Is(ctx.Err(), context.DeadlineExceeded) {
|
|
exitCode = 124
|
|
msg = "execution timeout"
|
|
}
|
|
return TaskResult{TaskID: taskID, ExitCode: exitCode, Error: msg}
|
|
}
|
|
|
|
func shouldSkipTask(task TaskSpec, failed map[string]TaskResult) (bool, string) {
|
|
if len(task.Dependencies) == 0 {
|
|
return false, ""
|
|
}
|
|
|
|
var blocked []string
|
|
for _, dep := range task.Dependencies {
|
|
if _, ok := failed[dep]; ok {
|
|
blocked = append(blocked, dep)
|
|
}
|
|
}
|
|
|
|
if len(blocked) == 0 {
|
|
return false, ""
|
|
}
|
|
|
|
return true, fmt.Sprintf("skipped due to failed dependencies: %s", strings.Join(blocked, ","))
|
|
}
|
|
|
|
// getStatusSymbols returns status symbols based on ASCII mode.
|
|
func getStatusSymbols() (success, warning, failed string) {
|
|
if os.Getenv("CODEAGENT_ASCII_MODE") == "true" {
|
|
return "PASS", "WARN", "FAIL"
|
|
}
|
|
return "✓", "⚠️", "✗"
|
|
}
|
|
|
|
func GenerateFinalOutput(results []TaskResult) string {
|
|
return GenerateFinalOutputWithMode(results, true) // default to summary mode
|
|
}
|
|
|
|
// generateFinalOutputWithMode generates output based on mode
|
|
// summaryOnly=true: structured report - every token has value
|
|
// summaryOnly=false: full output with complete messages (legacy behavior)
|
|
func GenerateFinalOutputWithMode(results []TaskResult, summaryOnly bool) string {
|
|
var sb strings.Builder
|
|
successSymbol, warningSymbol, failedSymbol := getStatusSymbols()
|
|
|
|
reportCoverageTarget := defaultCoverageTarget
|
|
for _, res := range results {
|
|
if res.CoverageTarget > 0 {
|
|
reportCoverageTarget = res.CoverageTarget
|
|
break
|
|
}
|
|
}
|
|
|
|
// Count results by status
|
|
success := 0
|
|
failed := 0
|
|
belowTarget := 0
|
|
for _, res := range results {
|
|
if res.ExitCode == 0 && res.Error == "" {
|
|
success++
|
|
target := res.CoverageTarget
|
|
if target <= 0 {
|
|
target = reportCoverageTarget
|
|
}
|
|
if res.Coverage != "" && target > 0 && res.CoverageNum < target {
|
|
belowTarget++
|
|
}
|
|
} else {
|
|
failed++
|
|
}
|
|
}
|
|
|
|
if summaryOnly {
|
|
// Header
|
|
sb.WriteString("=== Execution Report ===\n")
|
|
sb.WriteString(fmt.Sprintf("%d tasks | %d passed | %d failed", len(results), success, failed))
|
|
if belowTarget > 0 {
|
|
sb.WriteString(fmt.Sprintf(" | %d below %.0f%%", belowTarget, reportCoverageTarget))
|
|
}
|
|
sb.WriteString("\n\n")
|
|
|
|
// Task Results - each task gets: Did + Files + Tests + Coverage
|
|
sb.WriteString("## Task Results\n")
|
|
|
|
for _, res := range results {
|
|
taskID := sanitizeOutput(res.TaskID)
|
|
coverage := sanitizeOutput(res.Coverage)
|
|
keyOutput := sanitizeOutput(res.KeyOutput)
|
|
logPath := sanitizeOutput(res.LogPath)
|
|
filesChanged := sanitizeOutput(strings.Join(res.FilesChanged, ", "))
|
|
|
|
target := res.CoverageTarget
|
|
if target <= 0 {
|
|
target = reportCoverageTarget
|
|
}
|
|
|
|
isSuccess := res.ExitCode == 0 && res.Error == ""
|
|
isBelowTarget := isSuccess && coverage != "" && target > 0 && res.CoverageNum < target
|
|
|
|
if isSuccess && !isBelowTarget {
|
|
// Passed task: one block with Did/Files/Tests
|
|
sb.WriteString(fmt.Sprintf("\n### %s %s", taskID, successSymbol))
|
|
if coverage != "" {
|
|
sb.WriteString(fmt.Sprintf(" %s", coverage))
|
|
}
|
|
sb.WriteString("\n")
|
|
|
|
if keyOutput != "" {
|
|
sb.WriteString(fmt.Sprintf("Did: %s\n", keyOutput))
|
|
}
|
|
if len(res.FilesChanged) > 0 {
|
|
sb.WriteString(fmt.Sprintf("Files: %s\n", filesChanged))
|
|
}
|
|
if res.TestsPassed > 0 {
|
|
sb.WriteString(fmt.Sprintf("Tests: %d passed\n", res.TestsPassed))
|
|
}
|
|
if logPath != "" {
|
|
sb.WriteString(fmt.Sprintf("Log: %s\n", logPath))
|
|
}
|
|
|
|
} else if isSuccess && isBelowTarget {
|
|
// Below target: add Gap info
|
|
sb.WriteString(fmt.Sprintf("\n### %s %s %s (below %.0f%%)\n", taskID, warningSymbol, coverage, target))
|
|
|
|
if keyOutput != "" {
|
|
sb.WriteString(fmt.Sprintf("Did: %s\n", keyOutput))
|
|
}
|
|
if len(res.FilesChanged) > 0 {
|
|
sb.WriteString(fmt.Sprintf("Files: %s\n", filesChanged))
|
|
}
|
|
if res.TestsPassed > 0 {
|
|
sb.WriteString(fmt.Sprintf("Tests: %d passed\n", res.TestsPassed))
|
|
}
|
|
// Extract what's missing from coverage
|
|
gap := sanitizeOutput(extractCoverageGap(res.Message))
|
|
if gap != "" {
|
|
sb.WriteString(fmt.Sprintf("Gap: %s\n", gap))
|
|
}
|
|
if logPath != "" {
|
|
sb.WriteString(fmt.Sprintf("Log: %s\n", logPath))
|
|
}
|
|
|
|
} else {
|
|
// Failed task: show error detail
|
|
sb.WriteString(fmt.Sprintf("\n### %s %s FAILED\n", taskID, failedSymbol))
|
|
sb.WriteString(fmt.Sprintf("Exit code: %d\n", res.ExitCode))
|
|
if errText := sanitizeOutput(res.Error); errText != "" {
|
|
sb.WriteString(fmt.Sprintf("Error: %s\n", errText))
|
|
}
|
|
// Show context from output (last meaningful lines)
|
|
detail := sanitizeOutput(extractErrorDetail(res.Message, 300))
|
|
if detail != "" {
|
|
sb.WriteString(fmt.Sprintf("Detail: %s\n", detail))
|
|
}
|
|
if logPath != "" {
|
|
sb.WriteString(fmt.Sprintf("Log: %s\n", logPath))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Summary section
|
|
sb.WriteString("\n## Summary\n")
|
|
sb.WriteString(fmt.Sprintf("- %d/%d completed successfully\n", success, len(results)))
|
|
|
|
if belowTarget > 0 || failed > 0 {
|
|
var needFix []string
|
|
var needCoverage []string
|
|
for _, res := range results {
|
|
if res.ExitCode != 0 || res.Error != "" {
|
|
taskID := sanitizeOutput(res.TaskID)
|
|
reason := sanitizeOutput(res.Error)
|
|
if reason == "" && res.ExitCode != 0 {
|
|
reason = fmt.Sprintf("exit code %d", res.ExitCode)
|
|
}
|
|
reason = safeTruncate(reason, 50)
|
|
needFix = append(needFix, fmt.Sprintf("%s (%s)", taskID, reason))
|
|
continue
|
|
}
|
|
|
|
target := res.CoverageTarget
|
|
if target <= 0 {
|
|
target = reportCoverageTarget
|
|
}
|
|
if res.Coverage != "" && target > 0 && res.CoverageNum < target {
|
|
needCoverage = append(needCoverage, sanitizeOutput(res.TaskID))
|
|
}
|
|
}
|
|
if len(needFix) > 0 {
|
|
sb.WriteString(fmt.Sprintf("- Fix: %s\n", strings.Join(needFix, ", ")))
|
|
}
|
|
if len(needCoverage) > 0 {
|
|
sb.WriteString(fmt.Sprintf("- Coverage: %s\n", strings.Join(needCoverage, ", ")))
|
|
}
|
|
}
|
|
|
|
} else {
|
|
// Legacy full output mode
|
|
sb.WriteString("=== Parallel Execution Summary ===\n")
|
|
sb.WriteString(fmt.Sprintf("Total: %d | Success: %d | Failed: %d\n\n", len(results), success, failed))
|
|
|
|
for _, res := range results {
|
|
taskID := sanitizeOutput(res.TaskID)
|
|
sb.WriteString(fmt.Sprintf("--- Task: %s ---\n", taskID))
|
|
if res.Error != "" {
|
|
sb.WriteString(fmt.Sprintf("Status: FAILED (exit code %d)\nError: %s\n", res.ExitCode, sanitizeOutput(res.Error)))
|
|
} else if res.ExitCode != 0 {
|
|
sb.WriteString(fmt.Sprintf("Status: FAILED (exit code %d)\n", res.ExitCode))
|
|
} else {
|
|
sb.WriteString("Status: SUCCESS\n")
|
|
}
|
|
if res.Coverage != "" {
|
|
sb.WriteString(fmt.Sprintf("Coverage: %s\n", sanitizeOutput(res.Coverage)))
|
|
}
|
|
if res.SessionID != "" {
|
|
sb.WriteString(fmt.Sprintf("Session: %s\n", sanitizeOutput(res.SessionID)))
|
|
}
|
|
if res.LogPath != "" {
|
|
logPath := sanitizeOutput(res.LogPath)
|
|
if res.sharedLog {
|
|
sb.WriteString(fmt.Sprintf("Log: %s (shared)\n", logPath))
|
|
} else {
|
|
sb.WriteString(fmt.Sprintf("Log: %s\n", logPath))
|
|
}
|
|
}
|
|
if res.Message != "" {
|
|
message := sanitizeOutput(res.Message)
|
|
if message != "" {
|
|
sb.WriteString(fmt.Sprintf("\n%s\n", message))
|
|
}
|
|
}
|
|
sb.WriteString("\n")
|
|
}
|
|
}
|
|
|
|
return sb.String()
|
|
}
|
|
|
|
func buildCodexArgs(cfg *Config, targetArg string) []string {
|
|
if cfg == nil {
|
|
panic("buildCodexArgs: nil config")
|
|
}
|
|
|
|
var resumeSessionID string
|
|
isResume := cfg.Mode == "resume"
|
|
if isResume {
|
|
resumeSessionID = strings.TrimSpace(cfg.SessionID)
|
|
if resumeSessionID == "" {
|
|
logError("invalid config: resume mode requires non-empty session_id")
|
|
isResume = false
|
|
}
|
|
}
|
|
|
|
args := []string{"e"}
|
|
|
|
// Default to bypass sandbox unless CODEX_BYPASS_SANDBOX=false
|
|
if cfg.Yolo || config.EnvFlagDefaultTrue("CODEX_BYPASS_SANDBOX") {
|
|
logWarn("YOLO mode or CODEX_BYPASS_SANDBOX enabled: running without approval/sandbox protection")
|
|
args = append(args, "--dangerously-bypass-approvals-and-sandbox")
|
|
}
|
|
|
|
if model := strings.TrimSpace(cfg.Model); model != "" {
|
|
args = append(args, "--model", model)
|
|
}
|
|
|
|
if reasoningEffort := strings.TrimSpace(cfg.ReasoningEffort); reasoningEffort != "" {
|
|
args = append(args, "-c", "model_reasoning_effort="+reasoningEffort)
|
|
}
|
|
|
|
args = append(args, "--skip-git-repo-check")
|
|
|
|
if isResume {
|
|
return append(args,
|
|
"--json",
|
|
"resume",
|
|
resumeSessionID,
|
|
targetArg,
|
|
)
|
|
}
|
|
|
|
return append(args,
|
|
"-C", cfg.WorkDir,
|
|
"--json",
|
|
targetArg,
|
|
)
|
|
}
|
|
|
|
func RunCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, backend Backend, defaultCommandName string, defaultArgsBuilder func(*Config, string) []string, customArgs []string, useCustomArgs bool, silent bool, timeoutSec int) TaskResult {
|
|
taskCtx := taskSpec.Context
|
|
if parentCtx == nil {
|
|
parentCtx = taskCtx
|
|
}
|
|
if parentCtx == nil {
|
|
parentCtx = context.Background()
|
|
}
|
|
|
|
result := TaskResult{TaskID: taskSpec.ID}
|
|
injectedLogger := taskLoggerFromContext(taskCtx)
|
|
if injectedLogger == nil {
|
|
injectedLogger = taskLoggerFromContext(parentCtx)
|
|
}
|
|
logger := injectedLogger
|
|
|
|
cfg := &Config{
|
|
Mode: taskSpec.Mode,
|
|
Task: taskSpec.Task,
|
|
SessionID: taskSpec.SessionID,
|
|
WorkDir: taskSpec.WorkDir,
|
|
Model: taskSpec.Model,
|
|
ReasoningEffort: taskSpec.ReasoningEffort,
|
|
SkipPermissions: taskSpec.SkipPermissions,
|
|
Backend: defaultBackendName,
|
|
}
|
|
|
|
commandName := strings.TrimSpace(defaultCommandName)
|
|
if commandName == "" {
|
|
commandName = defaultBackendName
|
|
}
|
|
argsBuilder := defaultArgsBuilder
|
|
if argsBuilder == nil {
|
|
argsBuilder = buildCodexArgs
|
|
}
|
|
if backend != nil {
|
|
commandName = backend.Command()
|
|
argsBuilder = backend.BuildArgs
|
|
cfg.Backend = backend.Name()
|
|
} else if taskSpec.Backend != "" {
|
|
cfg.Backend = taskSpec.Backend
|
|
} else if commandName != "" {
|
|
cfg.Backend = commandName
|
|
}
|
|
|
|
if cfg.Mode == "" {
|
|
cfg.Mode = "new"
|
|
}
|
|
if cfg.WorkDir == "" {
|
|
cfg.WorkDir = defaultWorkdir
|
|
}
|
|
|
|
if cfg.Mode == "resume" && strings.TrimSpace(cfg.SessionID) == "" {
|
|
result.ExitCode = 1
|
|
result.Error = "resume mode requires non-empty session_id"
|
|
return result
|
|
}
|
|
|
|
var fileEnv map[string]string
|
|
if cfg.Backend == "claude" {
|
|
settings := loadMinimalClaudeSettings()
|
|
fileEnv = settings.Env
|
|
if cfg.Mode != "resume" && strings.TrimSpace(cfg.Model) == "" && settings.Model != "" {
|
|
cfg.Model = settings.Model
|
|
}
|
|
}
|
|
|
|
// Load gemini env from ~/.gemini/.env if exists
|
|
if cfg.Backend == "gemini" {
|
|
fileEnv = loadGeminiEnv()
|
|
if cfg.Mode != "resume" && strings.TrimSpace(cfg.Model) == "" {
|
|
if model := fileEnv["GEMINI_MODEL"]; model != "" {
|
|
cfg.Model = model
|
|
}
|
|
}
|
|
}
|
|
|
|
useStdin := taskSpec.UseStdin
|
|
targetArg := taskSpec.Task
|
|
if useStdin {
|
|
targetArg = "-"
|
|
}
|
|
|
|
var codexArgs []string
|
|
if useCustomArgs {
|
|
codexArgs = customArgs
|
|
} else {
|
|
codexArgs = argsBuilder(cfg, targetArg)
|
|
}
|
|
|
|
prefixMsg := func(msg string) string {
|
|
if taskSpec.ID == "" {
|
|
return msg
|
|
}
|
|
return fmt.Sprintf("[Task: %s] %s", taskSpec.ID, msg)
|
|
}
|
|
|
|
var logInfoFn func(string)
|
|
var logWarnFn func(string)
|
|
var logErrorFn func(string)
|
|
|
|
if silent {
|
|
// Silent mode: only persist to file when available; avoid stderr noise.
|
|
logInfoFn = func(msg string) {
|
|
if logger != nil {
|
|
logger.Info(prefixMsg(msg))
|
|
}
|
|
}
|
|
logWarnFn = func(msg string) {
|
|
if logger != nil {
|
|
logger.Warn(prefixMsg(msg))
|
|
}
|
|
}
|
|
logErrorFn = func(msg string) {
|
|
if logger != nil {
|
|
logger.Error(prefixMsg(msg))
|
|
}
|
|
}
|
|
} else {
|
|
logInfoFn = func(msg string) { logInfo(prefixMsg(msg)) }
|
|
logWarnFn = func(msg string) { logWarn(prefixMsg(msg)) }
|
|
logErrorFn = func(msg string) { logError(prefixMsg(msg)) }
|
|
}
|
|
|
|
stderrBuf := &tailBuffer{limit: stderrCaptureLimit}
|
|
|
|
var stdoutLogger *logWriter
|
|
var stderrLogger *logWriter
|
|
|
|
var tempLogger *Logger
|
|
if logger == nil && silent && activeLogger() == nil {
|
|
if l, err := NewLogger(); err == nil {
|
|
setLogger(l)
|
|
tempLogger = l
|
|
logger = l
|
|
}
|
|
}
|
|
defer func() {
|
|
if tempLogger != nil {
|
|
_ = closeLogger()
|
|
}
|
|
}()
|
|
defer func() {
|
|
if result.LogPath != "" || logger == nil {
|
|
return
|
|
}
|
|
result.LogPath = logger.Path()
|
|
}()
|
|
if logger == nil {
|
|
logger = activeLogger()
|
|
}
|
|
if logger != nil {
|
|
result.LogPath = logger.Path()
|
|
}
|
|
|
|
if !silent {
|
|
// Note: Empty prefix ensures backend output is logged as-is without any wrapper format.
|
|
// This preserves the original stdout/stderr content from codex/claude/gemini backends.
|
|
// Trade-off: Reduces distinguishability between stdout/stderr in logs, but maintains
|
|
// output fidelity which is critical for debugging backend-specific issues.
|
|
stdoutLogger = newLogWriter("", codexLogLineLimit)
|
|
stderrLogger = newLogWriter("", codexLogLineLimit)
|
|
}
|
|
|
|
ctx := parentCtx
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second)
|
|
defer cancel()
|
|
ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
|
|
defer stop()
|
|
|
|
attachStderr := func(msg string) string {
|
|
return fmt.Sprintf("%s; stderr: %s", msg, stderrBuf.String())
|
|
}
|
|
|
|
cmd := newCommandRunner(ctx, commandName, codexArgs...)
|
|
|
|
if len(fileEnv) > 0 {
|
|
cmd.SetEnv(fileEnv)
|
|
}
|
|
|
|
envBackend := backend
|
|
if envBackend == nil && cfg.Backend != "" {
|
|
if b, err := selectBackendFn(cfg.Backend); err == nil {
|
|
envBackend = b
|
|
}
|
|
}
|
|
|
|
if envBackend != nil {
|
|
baseURL, apiKey := config.ResolveBackendConfig(cfg.Backend)
|
|
if agentName := strings.TrimSpace(taskSpec.Agent); agentName != "" {
|
|
agentBackend, _, _, _, agentBaseURL, agentAPIKey, _, err := config.ResolveAgentConfig(agentName)
|
|
if err == nil {
|
|
if strings.EqualFold(strings.TrimSpace(agentBackend), strings.TrimSpace(cfg.Backend)) {
|
|
baseURL, apiKey = agentBaseURL, agentAPIKey
|
|
}
|
|
}
|
|
}
|
|
if injected := envBackend.Env(baseURL, apiKey); len(injected) > 0 {
|
|
cmd.SetEnv(injected)
|
|
// Log injected env vars with masked API keys (to file and stderr)
|
|
for k, v := range injected {
|
|
msg := fmt.Sprintf("Env: %s=%s", k, maskSensitiveValue(k, v))
|
|
logInfoFn(msg)
|
|
fmt.Fprintln(os.Stderr, " "+msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
// For backends that don't support -C flag (claude, gemini), set working directory via cmd.Dir
|
|
// Codex passes workdir via -C flag, so we skip setting Dir for it to avoid conflicts
|
|
if cfg.Mode != "resume" && commandName != "codex" && cfg.WorkDir != "" {
|
|
cmd.SetDir(cfg.WorkDir)
|
|
}
|
|
|
|
stderrWriters := []io.Writer{stderrBuf}
|
|
if stderrLogger != nil {
|
|
stderrWriters = append(stderrWriters, stderrLogger)
|
|
}
|
|
|
|
// For gemini backend, filter noisy stderr output
|
|
var stderrFilter *filteringWriter
|
|
if !silent {
|
|
stderrOut := io.Writer(os.Stderr)
|
|
if cfg.Backend == "gemini" {
|
|
stderrFilter = newFilteringWriter(os.Stderr, geminiNoisePatterns)
|
|
stderrOut = stderrFilter
|
|
} else if cfg.Backend == "codex" {
|
|
stderrFilter = newFilteringWriter(os.Stderr, codexNoisePatterns)
|
|
stderrOut = stderrFilter
|
|
}
|
|
stderrWriters = append([]io.Writer{stderrOut}, stderrWriters...)
|
|
}
|
|
stderr, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
logErrorFn("Failed to create stderr pipe: " + err.Error())
|
|
result.ExitCode = 1
|
|
result.Error = attachStderr("failed to create stderr pipe: " + err.Error())
|
|
return result
|
|
}
|
|
|
|
var stdinPipe io.WriteCloser
|
|
if useStdin {
|
|
stdinPipe, err = cmd.StdinPipe()
|
|
if err != nil {
|
|
logErrorFn("Failed to create stdin pipe: " + err.Error())
|
|
result.ExitCode = 1
|
|
result.Error = attachStderr("failed to create stdin pipe: " + err.Error())
|
|
closeWithReason(stderr, "stdin-pipe-failed")
|
|
return result
|
|
}
|
|
}
|
|
|
|
stderrDone := make(chan error, 1)
|
|
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
logErrorFn("Failed to create stdout pipe: " + err.Error())
|
|
result.ExitCode = 1
|
|
result.Error = attachStderr("failed to create stdout pipe: " + err.Error())
|
|
closeWithReason(stderr, "stdout-pipe-failed")
|
|
if stdinPipe != nil {
|
|
_ = stdinPipe.Close()
|
|
}
|
|
return result
|
|
}
|
|
|
|
stdoutReader := io.Reader(stdout)
|
|
if stdoutLogger != nil {
|
|
stdoutReader = io.TeeReader(stdout, stdoutLogger)
|
|
}
|
|
|
|
// Start parse goroutine BEFORE starting the command to avoid race condition
|
|
// where fast-completing commands close stdout before parser starts reading
|
|
messageSeen := make(chan struct{}, 1)
|
|
completeSeen := make(chan struct{}, 1)
|
|
parseCh := make(chan parseResult, 1)
|
|
go func() {
|
|
msg, tid := parseJSONStreamInternal(stdoutReader, logWarnFn, logInfoFn, func() {
|
|
select {
|
|
case messageSeen <- struct{}{}:
|
|
default:
|
|
}
|
|
}, func() {
|
|
select {
|
|
case completeSeen <- struct{}{}:
|
|
default:
|
|
}
|
|
})
|
|
select {
|
|
case completeSeen <- struct{}{}:
|
|
default:
|
|
}
|
|
parseCh <- parseResult{message: msg, threadID: tid}
|
|
}()
|
|
|
|
logInfoFn(fmt.Sprintf("Starting %s with args: %s %s...", commandName, commandName, strings.Join(codexArgs[:min(5, len(codexArgs))], " ")))
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
closeWithReason(stdout, "start-failed")
|
|
closeWithReason(stderr, "start-failed")
|
|
if stdinPipe != nil {
|
|
_ = stdinPipe.Close()
|
|
}
|
|
if strings.Contains(err.Error(), "executable file not found") {
|
|
msg := fmt.Sprintf("%s command not found in PATH", commandName)
|
|
logErrorFn(msg)
|
|
result.ExitCode = 127
|
|
result.Error = attachStderr(msg)
|
|
return result
|
|
}
|
|
logErrorFn("Failed to start " + commandName + ": " + err.Error())
|
|
result.ExitCode = 1
|
|
result.Error = attachStderr("failed to start " + commandName + ": " + err.Error())
|
|
return result
|
|
}
|
|
|
|
logInfoFn(fmt.Sprintf("Starting %s with PID: %d", commandName, cmd.Process().Pid()))
|
|
if logger != nil {
|
|
logInfoFn(fmt.Sprintf("Log capturing to: %s", logger.Path()))
|
|
}
|
|
|
|
// Start stderr drain AFTER we know the command started, but BEFORE cmd.Wait can close the pipe.
|
|
go func() {
|
|
_, copyErr := io.Copy(io.MultiWriter(stderrWriters...), stderr)
|
|
if stderrFilter != nil {
|
|
stderrFilter.Flush()
|
|
}
|
|
stderrDone <- copyErr
|
|
}()
|
|
|
|
if useStdin && stdinPipe != nil {
|
|
logInfoFn(fmt.Sprintf("Writing %d chars to stdin...", len(taskSpec.Task)))
|
|
go func(data string) {
|
|
defer stdinPipe.Close()
|
|
_, _ = io.WriteString(stdinPipe, data)
|
|
}(taskSpec.Task)
|
|
logInfoFn("Stdin closed")
|
|
}
|
|
|
|
waitCh := make(chan error, 1)
|
|
go func() { waitCh <- cmd.Wait() }()
|
|
|
|
var (
|
|
waitErr error
|
|
forceKillTimer *forceKillTimer
|
|
ctxCancelled bool
|
|
messageTimer *time.Timer
|
|
messageTimerCh <-chan time.Time
|
|
forcedAfterComplete bool
|
|
terminated bool
|
|
messageSeenObserved bool
|
|
completeSeenObserved bool
|
|
)
|
|
|
|
waitLoop:
|
|
for {
|
|
select {
|
|
case err := <-waitCh:
|
|
waitErr = err
|
|
break waitLoop
|
|
case <-ctx.Done():
|
|
ctxCancelled = true
|
|
logErrorFn(cancelReason(commandName, ctx))
|
|
if !terminated {
|
|
if timer := terminateCommandFn(cmd); timer != nil {
|
|
forceKillTimer = timer
|
|
terminated = true
|
|
}
|
|
}
|
|
for {
|
|
select {
|
|
case err := <-waitCh:
|
|
waitErr = err
|
|
break waitLoop
|
|
case <-time.After(forceKillWaitTimeout):
|
|
if proc := cmd.Process(); proc != nil {
|
|
_ = proc.Kill()
|
|
}
|
|
}
|
|
}
|
|
case <-messageTimerCh:
|
|
forcedAfterComplete = true
|
|
messageTimerCh = nil
|
|
if !terminated {
|
|
logWarnFn(fmt.Sprintf("%s output parsed; terminating lingering backend", commandName))
|
|
if timer := terminateCommandFn(cmd); timer != nil {
|
|
forceKillTimer = timer
|
|
terminated = true
|
|
}
|
|
}
|
|
// Close pipes to unblock stream readers, then wait for process exit.
|
|
closeWithReason(stdout, "terminate")
|
|
closeWithReason(stderr, "terminate")
|
|
for {
|
|
select {
|
|
case err := <-waitCh:
|
|
waitErr = err
|
|
break waitLoop
|
|
case <-time.After(forceKillWaitTimeout):
|
|
if proc := cmd.Process(); proc != nil {
|
|
_ = proc.Kill()
|
|
}
|
|
}
|
|
}
|
|
case <-completeSeen:
|
|
completeSeenObserved = true
|
|
if messageTimer != nil {
|
|
continue
|
|
}
|
|
messageTimer = time.NewTimer(postMessageTerminateDelay)
|
|
messageTimerCh = messageTimer.C
|
|
case <-messageSeen:
|
|
messageSeenObserved = true
|
|
}
|
|
}
|
|
|
|
if messageTimer != nil {
|
|
if !messageTimer.Stop() {
|
|
select {
|
|
case <-messageTimer.C:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
if forceKillTimer != nil {
|
|
forceKillTimer.Stop()
|
|
}
|
|
|
|
var parsed parseResult
|
|
switch {
|
|
case ctxCancelled:
|
|
closeWithReason(stdout, stdoutCloseReasonCtx)
|
|
parsed = <-parseCh
|
|
case messageSeenObserved || completeSeenObserved:
|
|
closeWithReason(stdout, stdoutCloseReasonWait)
|
|
parsed = <-parseCh
|
|
default:
|
|
drainTimer := time.NewTimer(stdoutDrainTimeout)
|
|
defer drainTimer.Stop()
|
|
|
|
select {
|
|
case parsed = <-parseCh:
|
|
closeWithReason(stdout, stdoutCloseReasonWait)
|
|
case <-messageSeen:
|
|
closeWithReason(stdout, stdoutCloseReasonWait)
|
|
parsed = <-parseCh
|
|
case <-completeSeen:
|
|
closeWithReason(stdout, stdoutCloseReasonWait)
|
|
parsed = <-parseCh
|
|
case <-drainTimer.C:
|
|
closeWithReason(stdout, stdoutCloseReasonDrain)
|
|
parsed = <-parseCh
|
|
}
|
|
}
|
|
|
|
closeWithReason(stderr, stdoutCloseReasonWait)
|
|
// Wait for stderr drain so stderrBuf / stderrLogger are not accessed concurrently.
|
|
// Important: cmd.Wait can block on internal stderr copying if cmd.Stderr is a non-file writer.
|
|
// We use StderrPipe and drain ourselves to avoid that deadlock class (common when children inherit pipes).
|
|
<-stderrDone
|
|
|
|
if ctxErr := ctx.Err(); ctxErr != nil {
|
|
if errors.Is(ctxErr, context.DeadlineExceeded) {
|
|
result.ExitCode = 124
|
|
result.Error = attachStderr(fmt.Sprintf("%s execution timeout", commandName))
|
|
return result
|
|
}
|
|
result.ExitCode = 130
|
|
result.Error = attachStderr("execution cancelled")
|
|
return result
|
|
}
|
|
|
|
if waitErr != nil {
|
|
if forcedAfterComplete && parsed.message != "" {
|
|
logWarnFn(fmt.Sprintf("%s terminated after delivering output", commandName))
|
|
} else {
|
|
if exitErr, ok := waitErr.(*exec.ExitError); ok {
|
|
code := exitErr.ExitCode()
|
|
logErrorFn(fmt.Sprintf("%s exited with status %d", commandName, code))
|
|
result.ExitCode = code
|
|
result.Error = attachStderr(fmt.Sprintf("%s exited with status %d", commandName, code))
|
|
return result
|
|
}
|
|
logErrorFn(commandName + " error: " + waitErr.Error())
|
|
result.ExitCode = 1
|
|
result.Error = attachStderr(commandName + " error: " + waitErr.Error())
|
|
return result
|
|
}
|
|
}
|
|
|
|
message := parsed.message
|
|
threadID := parsed.threadID
|
|
if message == "" {
|
|
logErrorFn(fmt.Sprintf("%s completed without agent_message output", commandName))
|
|
result.ExitCode = 1
|
|
result.Error = attachStderr(fmt.Sprintf("%s completed without agent_message output", commandName))
|
|
return result
|
|
}
|
|
|
|
if stdoutLogger != nil {
|
|
stdoutLogger.Flush()
|
|
}
|
|
if stderrLogger != nil {
|
|
stderrLogger.Flush()
|
|
}
|
|
|
|
result.ExitCode = 0
|
|
result.Message = message
|
|
result.SessionID = threadID
|
|
if result.LogPath == "" && injectedLogger != nil {
|
|
result.LogPath = injectedLogger.Path()
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func cancelReason(commandName string, ctx context.Context) string {
|
|
if ctx == nil {
|
|
return "Context cancelled"
|
|
}
|
|
|
|
if commandName == "" {
|
|
commandName = defaultBackendName
|
|
}
|
|
|
|
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
|
|
return fmt.Sprintf("%s execution timeout", commandName)
|
|
}
|
|
|
|
return fmt.Sprintf("Execution cancelled, terminating %s process", commandName)
|
|
}
|
|
|
|
type stdoutReasonCloser interface {
|
|
CloseWithReason(string) error
|
|
}
|
|
|
|
func closeWithReason(rc io.ReadCloser, reason string) {
|
|
if rc == nil {
|
|
return
|
|
}
|
|
if c, ok := rc.(stdoutReasonCloser); ok {
|
|
_ = c.CloseWithReason(reason)
|
|
return
|
|
}
|
|
_ = rc.Close()
|
|
}
|
|
|
|
type forceKillTimer struct {
|
|
timer *time.Timer
|
|
done chan struct{}
|
|
stopped atomic.Bool
|
|
drained atomic.Bool
|
|
}
|
|
|
|
func (t *forceKillTimer) Stop() {
|
|
if t == nil || t.timer == nil {
|
|
return
|
|
}
|
|
if !t.timer.Stop() {
|
|
<-t.done
|
|
t.drained.Store(true)
|
|
}
|
|
t.stopped.Store(true)
|
|
}
|
|
|
|
func terminateCommand(cmd commandRunner) *forceKillTimer {
|
|
if cmd == nil {
|
|
return nil
|
|
}
|
|
proc := cmd.Process()
|
|
if proc == nil {
|
|
return nil
|
|
}
|
|
|
|
_ = sendTermSignal(proc)
|
|
|
|
done := make(chan struct{}, 1)
|
|
timer := time.AfterFunc(time.Duration(forceKillDelay.Load())*time.Second, func() {
|
|
if p := cmd.Process(); p != nil {
|
|
_ = p.Kill()
|
|
}
|
|
close(done)
|
|
})
|
|
|
|
return &forceKillTimer{timer: timer, done: done}
|
|
}
|
|
|
|
// maskSensitiveValue masks sensitive values like API keys for logging.
|
|
// Values containing "key", "token", or "secret" (case-insensitive) are masked.
|
|
// For values longer than 8 chars: shows first 4 + **** + last 4.
|
|
// For shorter values: shows only ****.
|
|
func maskSensitiveValue(key, value string) string {
|
|
keyLower := strings.ToLower(key)
|
|
if strings.Contains(keyLower, "key") || strings.Contains(keyLower, "token") || strings.Contains(keyLower, "secret") {
|
|
if len(value) > 8 {
|
|
return value[:4] + "****" + value[len(value)-4:]
|
|
} else if len(value) > 0 {
|
|
return "****"
|
|
}
|
|
}
|
|
return value
|
|
}
|