From 6f4f4e701b2f3e90299febdf3da16d8b311f5d6f Mon Sep 17 00:00:00 2001 From: "swe-agent[bot]" <0+swe-agent[bot]@users.noreply.github.com> Date: Mon, 15 Dec 2025 13:19:51 +0800 Subject: [PATCH 1/3] =?UTF-8?q?fix(parser):=20=E4=BF=AE=E5=A4=8D=20bufio.S?= =?UTF-8?q?canner=20token=20too=20long=20=E9=94=99=E8=AF=AF=20(#64)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 问题 - 执行 rg 等命令时,如果匹配到 minified 文件,单行输出可能超过 10MB - 旧实现使用 bufio.Scanner,遇到超长行会报错并中止整个解析 - 导致后续的 agent_message 无法读取,任务失败 ## 修复 1. **parser.go**: - 移除 bufio.Scanner,改用 bufio.Reader + readLineWithLimit - 超长行(>10MB)会被跳过但继续处理后续事件 - 添加 codexHeader 轻量级解析,只在 agent_message 时完整解析 2. **utils.go**: - 修复 logWriter 内存膨胀问题 - 添加 writeLimited 方法限制缓冲区大小 3. **测试**: - parser_token_too_long_test.go: 验证超长行处理 - log_writer_limit_test.go: 验证日志缓冲限制 ## 测试结果 - ✅ TestParseJSONStream_SkipsOverlongLineAndContinues - ✅ TestLogWriterWriteLimitsBuffer - ✅ 完整测试套件通过 Fixes #64 Generated with swe-agent-bot Co-Authored-By: swe-agent-bot --- codeagent-wrapper/log_writer_limit_test.go | 39 +++ codeagent-wrapper/parser.go | 224 ++++++++++++------ .../parser_token_too_long_test.go | 31 +++ codeagent-wrapper/utils.go | 49 +++- 4 files changed, 269 insertions(+), 74 deletions(-) create mode 100644 codeagent-wrapper/log_writer_limit_test.go create mode 100644 codeagent-wrapper/parser_token_too_long_test.go diff --git a/codeagent-wrapper/log_writer_limit_test.go b/codeagent-wrapper/log_writer_limit_test.go new file mode 100644 index 0000000..9f51c07 --- /dev/null +++ b/codeagent-wrapper/log_writer_limit_test.go @@ -0,0 +1,39 @@ +package main + +import ( + "os" + "strings" + "testing" +) + +func TestLogWriterWriteLimitsBuffer(t *testing.T) { + defer resetTestHooks() + + logger, err := NewLogger() + if err != nil { + t.Fatalf("NewLogger error: %v", err) + } + setLogger(logger) + defer closeLogger() + + lw := newLogWriter("P:", 10) + _, _ = lw.Write([]byte(strings.Repeat("a", 100))) + + if lw.buf.Len() != 10 { + t.Fatalf("logWriter buffer len=%d, want %d", lw.buf.Len(), 10) + } + if !lw.dropped { + t.Fatalf("expected logWriter to drop overlong line bytes") + } + + lw.Flush() + logger.Flush() + data, err := os.ReadFile(logger.Path()) + if err != nil { + t.Fatalf("ReadFile error: %v", err) + } + if !strings.Contains(string(data), "P:aaaaaaa...") { + t.Fatalf("log output missing truncated entry, got %q", string(data)) + } +} + diff --git a/codeagent-wrapper/parser.go b/codeagent-wrapper/parser.go index 7f97ff3..79388f9 100644 --- a/codeagent-wrapper/parser.go +++ b/codeagent-wrapper/parser.go @@ -53,9 +53,22 @@ func parseJSONStreamWithLog(r io.Reader, warnFn func(string), infoFn func(string return parseJSONStreamInternal(r, warnFn, infoFn, nil) } +const ( + jsonLineReaderSize = 64 * 1024 + jsonLineMaxBytes = 10 * 1024 * 1024 + jsonLinePreviewBytes = 256 +) + +type codexHeader struct { + Type string `json:"type"` + ThreadID string `json:"thread_id,omitempty"` + Item *struct { + Type string `json:"type"` + } `json:"item,omitempty"` +} + func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(string), onMessage func()) (message, threadID string) { - scanner := bufio.NewScanner(r) - scanner.Buffer(make([]byte, 64*1024), 10*1024*1024) + reader := bufio.NewReaderSize(r, jsonLineReaderSize) if warnFn == nil { warnFn = func(string) {} @@ -78,79 +91,89 @@ func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(strin geminiBuffer strings.Builder ) - for scanner.Scan() { - line := strings.TrimSpace(scanner.Text()) - if line == "" { + for { + line, tooLong, err := readLineWithLimit(reader, jsonLineMaxBytes, jsonLinePreviewBytes) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + warnFn("Read stdout error: " + err.Error()) + break + } + + line = bytes.TrimSpace(line) + if len(line) == 0 { continue } totalEvents++ - var raw map[string]json.RawMessage - if err := json.Unmarshal([]byte(line), &raw); err != nil { - warnFn(fmt.Sprintf("Failed to parse line: %s", truncate(line, 100))) + if tooLong { + warnFn(fmt.Sprintf("Skipped overlong JSON line (> %d bytes): %s", jsonLineMaxBytes, truncateBytes(line, 100))) continue } - hasItemType := false - if rawItem, ok := raw["item"]; ok { - var itemMap map[string]json.RawMessage - if err := json.Unmarshal(rawItem, &itemMap); err == nil { - if _, ok := itemMap["type"]; ok { - hasItemType = true + var codex codexHeader + if err := json.Unmarshal(line, &codex); err == nil { + isCodex := codex.ThreadID != "" || (codex.Item != nil && codex.Item.Type != "") + if isCodex { + var details []string + if codex.ThreadID != "" { + details = append(details, fmt.Sprintf("thread_id=%s", codex.ThreadID)) } + if codex.Item != nil && codex.Item.Type != "" { + details = append(details, fmt.Sprintf("item_type=%s", codex.Item.Type)) + } + if len(details) > 0 { + infoFn(fmt.Sprintf("Parsed event #%d type=%s (%s)", totalEvents, codex.Type, strings.Join(details, ", "))) + } else { + infoFn(fmt.Sprintf("Parsed event #%d type=%s", totalEvents, codex.Type)) + } + + switch codex.Type { + case "thread.started": + threadID = codex.ThreadID + infoFn(fmt.Sprintf("thread.started event thread_id=%s", threadID)) + case "item.completed": + itemType := "" + if codex.Item != nil { + itemType = codex.Item.Type + } + + if itemType == "agent_message" { + var event JSONEvent + if err := json.Unmarshal(line, &event); err != nil { + warnFn(fmt.Sprintf("Failed to parse Codex event: %s", truncateBytes(line, 100))) + continue + } + + normalized := "" + if event.Item != nil { + normalized = normalizeText(event.Item.Text) + } + infoFn(fmt.Sprintf("item.completed event item_type=%s message_len=%d", itemType, len(normalized))) + if normalized != "" { + codexMessage = normalized + notifyMessage() + } + } else { + infoFn(fmt.Sprintf("item.completed event item_type=%s", itemType)) + } + } + continue } } - isCodex := hasItemType - if !isCodex { - if _, ok := raw["thread_id"]; ok { - isCodex = true - } + var raw map[string]json.RawMessage + if err := json.Unmarshal(line, &raw); err != nil { + warnFn(fmt.Sprintf("Failed to parse line: %s", truncateBytes(line, 100))) + continue } switch { - case isCodex: - var event JSONEvent - if err := json.Unmarshal([]byte(line), &event); err != nil { - warnFn(fmt.Sprintf("Failed to parse Codex event: %s", truncate(line, 100))) - continue - } - - var details []string - if event.ThreadID != "" { - details = append(details, fmt.Sprintf("thread_id=%s", event.ThreadID)) - } - if event.Item != nil && event.Item.Type != "" { - details = append(details, fmt.Sprintf("item_type=%s", event.Item.Type)) - } - if len(details) > 0 { - infoFn(fmt.Sprintf("Parsed event #%d type=%s (%s)", totalEvents, event.Type, strings.Join(details, ", "))) - } else { - infoFn(fmt.Sprintf("Parsed event #%d type=%s", totalEvents, event.Type)) - } - - switch event.Type { - case "thread.started": - threadID = event.ThreadID - infoFn(fmt.Sprintf("thread.started event thread_id=%s", threadID)) - case "item.completed": - var itemType string - var normalized string - if event.Item != nil { - itemType = event.Item.Type - normalized = normalizeText(event.Item.Text) - } - infoFn(fmt.Sprintf("item.completed event item_type=%s message_len=%d", itemType, len(normalized))) - if event.Item != nil && event.Item.Type == "agent_message" && normalized != "" { - codexMessage = normalized - notifyMessage() - } - } - case hasKey(raw, "subtype") || hasKey(raw, "result"): var event ClaudeEvent - if err := json.Unmarshal([]byte(line), &event); err != nil { - warnFn(fmt.Sprintf("Failed to parse Claude event: %s", truncate(line, 100))) + if err := json.Unmarshal(line, &event); err != nil { + warnFn(fmt.Sprintf("Failed to parse Claude event: %s", truncateBytes(line, 100))) continue } @@ -167,8 +190,8 @@ func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(strin case hasKey(raw, "role") || hasKey(raw, "delta"): var event GeminiEvent - if err := json.Unmarshal([]byte(line), &event); err != nil { - warnFn(fmt.Sprintf("Failed to parse Gemini event: %s", truncate(line, 100))) + if err := json.Unmarshal(line, &event); err != nil { + warnFn(fmt.Sprintf("Failed to parse Gemini event: %s", truncateBytes(line, 100))) continue } @@ -184,14 +207,10 @@ func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(strin infoFn(fmt.Sprintf("Parsed Gemini event #%d type=%s role=%s delta=%t status=%s content_len=%d", totalEvents, event.Type, event.Role, event.Delta, event.Status, len(event.Content))) default: - warnFn(fmt.Sprintf("Unknown event format: %s", truncate(line, 100))) + warnFn(fmt.Sprintf("Unknown event format: %s", truncateBytes(line, 100))) } } - if err := scanner.Err(); err != nil && !errors.Is(err, io.EOF) { - warnFn("Read stdout error: " + err.Error()) - } - switch { case geminiBuffer.Len() > 0: message = geminiBuffer.String() @@ -236,6 +255,79 @@ func discardInvalidJSON(decoder *json.Decoder, reader *bufio.Reader) (*bufio.Rea return bufio.NewReader(io.MultiReader(bytes.NewReader(remaining), reader)), err } +func readLineWithLimit(r *bufio.Reader, maxBytes int, previewBytes int) (line []byte, tooLong bool, err error) { + if r == nil { + return nil, false, errors.New("reader is nil") + } + if maxBytes <= 0 { + return nil, false, errors.New("maxBytes must be > 0") + } + if previewBytes < 0 { + previewBytes = 0 + } + + part, isPrefix, err := r.ReadLine() + if err != nil { + return nil, false, err + } + + if !isPrefix { + if len(part) > maxBytes { + return part[:min(len(part), previewBytes)], true, nil + } + return part, false, nil + } + + preview := make([]byte, 0, min(previewBytes, len(part))) + if previewBytes > 0 { + preview = append(preview, part[:min(previewBytes, len(part))]...) + } + + buf := make([]byte, 0, min(maxBytes, len(part)*2)) + total := 0 + if len(part) > maxBytes { + tooLong = true + } else { + buf = append(buf, part...) + total = len(part) + } + + for isPrefix { + part, isPrefix, err = r.ReadLine() + if err != nil { + return nil, tooLong, err + } + + if previewBytes > 0 && len(preview) < previewBytes { + preview = append(preview, part[:min(previewBytes-len(preview), len(part))]...) + } + + if !tooLong { + if total+len(part) > maxBytes { + tooLong = true + continue + } + buf = append(buf, part...) + total += len(part) + } + } + + if tooLong { + return preview, true, nil + } + return buf, false, nil +} + +func truncateBytes(b []byte, maxLen int) string { + if len(b) <= maxLen { + return string(b) + } + if maxLen < 0 { + return "" + } + return string(b[:maxLen]) + "..." +} + func normalizeText(text interface{}) string { switch v := text.(type) { case string: diff --git a/codeagent-wrapper/parser_token_too_long_test.go b/codeagent-wrapper/parser_token_too_long_test.go new file mode 100644 index 0000000..ed91cd2 --- /dev/null +++ b/codeagent-wrapper/parser_token_too_long_test.go @@ -0,0 +1,31 @@ +package main + +import ( + "strings" + "testing" +) + +func TestParseJSONStream_SkipsOverlongLineAndContinues(t *testing.T) { + // Exceed the 10MB bufio.Scanner limit in parseJSONStreamInternal. + tooLong := strings.Repeat("a", 11*1024*1024) + + input := strings.Join([]string{ + `{"type":"item.completed","item":{"type":"other_type","text":"` + tooLong + `"}}`, + `{"type":"thread.started","thread_id":"t-1"}`, + `{"type":"item.completed","item":{"type":"agent_message","text":"ok"}}`, + }, "\n") + + var warns []string + warnFn := func(msg string) { warns = append(warns, msg) } + + gotMessage, gotThreadID := parseJSONStreamInternal(strings.NewReader(input), warnFn, nil, nil) + if gotMessage != "ok" { + t.Fatalf("message=%q, want %q (warns=%v)", gotMessage, "ok", warns) + } + if gotThreadID != "t-1" { + t.Fatalf("threadID=%q, want %q (warns=%v)", gotThreadID, "t-1", warns) + } + if len(warns) == 0 || !strings.Contains(warns[0], "Skipped overlong JSON line") { + t.Fatalf("expected warning about overlong JSON line, got %v", warns) + } +} diff --git a/codeagent-wrapper/utils.go b/codeagent-wrapper/utils.go index 3f4fa89..7f504c1 100644 --- a/codeagent-wrapper/utils.go +++ b/codeagent-wrapper/utils.go @@ -78,6 +78,7 @@ type logWriter struct { prefix string maxLen int buf bytes.Buffer + dropped bool } func newLogWriter(prefix string, maxLen int) *logWriter { @@ -94,12 +95,12 @@ func (lw *logWriter) Write(p []byte) (int, error) { total := len(p) for len(p) > 0 { if idx := bytes.IndexByte(p, '\n'); idx >= 0 { - lw.buf.Write(p[:idx]) + lw.writeLimited(p[:idx]) lw.logLine(true) p = p[idx+1:] continue } - lw.buf.Write(p) + lw.writeLimited(p) break } return total, nil @@ -117,21 +118,53 @@ func (lw *logWriter) logLine(force bool) { return } line := lw.buf.String() + dropped := lw.dropped + lw.dropped = false lw.buf.Reset() if line == "" && !force { return } - if lw.maxLen > 0 && len(line) > lw.maxLen { - cutoff := lw.maxLen - if cutoff > 3 { - line = line[:cutoff-3] + "..." - } else { - line = line[:cutoff] + if lw.maxLen > 0 { + if dropped { + if lw.maxLen > 3 { + line = line[:min(len(line), lw.maxLen-3)] + "..." + } else { + line = line[:min(len(line), lw.maxLen)] + } + } else if len(line) > lw.maxLen { + cutoff := lw.maxLen + if cutoff > 3 { + line = line[:cutoff-3] + "..." + } else { + line = line[:cutoff] + } } } logInfo(lw.prefix + line) } +func (lw *logWriter) writeLimited(p []byte) { + if lw == nil || len(p) == 0 { + return + } + if lw.maxLen <= 0 { + lw.buf.Write(p) + return + } + + remaining := lw.maxLen - lw.buf.Len() + if remaining <= 0 { + lw.dropped = true + return + } + if len(p) <= remaining { + lw.buf.Write(p) + return + } + lw.buf.Write(p[:remaining]) + lw.dropped = true +} + type tailBuffer struct { limit int data []byte From 0c93bbe5742643b3e27ed35ab22a6cd3a995fbf2 Mon Sep 17 00:00:00 2001 From: "swe-agent[bot]" <0+swe-agent[bot]@users.noreply.github.com> Date: Mon, 15 Dec 2025 13:23:26 +0800 Subject: [PATCH 2/3] change version --- codeagent-wrapper/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codeagent-wrapper/main.go b/codeagent-wrapper/main.go index 7d642e9..1659566 100644 --- a/codeagent-wrapper/main.go +++ b/codeagent-wrapper/main.go @@ -14,7 +14,7 @@ import ( ) const ( - version = "5.2.2" + version = "5.2.3" defaultWorkdir = "." defaultTimeout = 7200 // seconds codexLogLineLimit = 1000 From b1156038decabbefc25d9a5b9cf91a60a24b7a30 Mon Sep 17 00:00:00 2001 From: "swe-agent[bot]" <0+swe-agent[bot]@users.noreply.github.com> Date: Mon, 15 Dec 2025 14:13:03 +0800 Subject: [PATCH 3/3] =?UTF-8?q?test:=20=E5=90=8C=E6=AD=A5=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E4=B8=AD=E7=9A=84=E7=89=88=E6=9C=AC=E5=8F=B7=E8=87=B3?= =?UTF-8?q?=205.2.3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复 CI 失败:将 main_test.go 中的版本期望值从 5.2.2 更新为 5.2.3, 与 main.go 中的实际版本号保持一致。 修改文件: - codeagent-wrapper/main_test.go:2693 (TestVersionFlag) - codeagent-wrapper/main_test.go:2707 (TestVersionShortFlag) - codeagent-wrapper/main_test.go:2721 (TestVersionLegacyAlias) Generated with swe-agent-bot Co-Authored-By: swe-agent-bot --- codeagent-wrapper/main_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/codeagent-wrapper/main_test.go b/codeagent-wrapper/main_test.go index 6360464..c3b8948 100644 --- a/codeagent-wrapper/main_test.go +++ b/codeagent-wrapper/main_test.go @@ -2690,7 +2690,7 @@ func TestVersionFlag(t *testing.T) { t.Errorf("exit = %d, want 0", code) } }) - want := "codeagent-wrapper version 5.2.2\n" + want := "codeagent-wrapper version 5.2.3\n" if output != want { t.Fatalf("output = %q, want %q", output, want) } @@ -2704,7 +2704,7 @@ func TestVersionShortFlag(t *testing.T) { t.Errorf("exit = %d, want 0", code) } }) - want := "codeagent-wrapper version 5.2.2\n" + want := "codeagent-wrapper version 5.2.3\n" if output != want { t.Fatalf("output = %q, want %q", output, want) } @@ -2718,7 +2718,7 @@ func TestVersionLegacyAlias(t *testing.T) { t.Errorf("exit = %d, want 0", code) } }) - want := "codex-wrapper version 5.2.2\n" + want := "codex-wrapper version 5.2.3\n" if output != want { t.Fatalf("output = %q, want %q", output, want) }