fix(parser): 修复 bufio.Scanner token too long 错误 (#64)

## 问题
- 执行 rg 等命令时,如果匹配到 minified 文件,单行输出可能超过 10MB
- 旧实现使用 bufio.Scanner,遇到超长行会报错并中止整个解析
- 导致后续的 agent_message 无法读取,任务失败

## 修复
1. **parser.go**:
   - 移除 bufio.Scanner,改用 bufio.Reader + readLineWithLimit
   - 超长行(>10MB)会被跳过但继续处理后续事件
   - 添加 codexHeader 轻量级解析,只在 agent_message 时完整解析

2. **utils.go**:
   - 修复 logWriter 内存膨胀问题
   - 添加 writeLimited 方法限制缓冲区大小

3. **测试**:
   - parser_token_too_long_test.go: 验证超长行处理
   - log_writer_limit_test.go: 验证日志缓冲限制

## 测试结果
-  TestParseJSONStream_SkipsOverlongLineAndContinues
-  TestLogWriterWriteLimitsBuffer
-  完整测试套件通过

Fixes #64

Generated with swe-agent-bot

Co-Authored-By: swe-agent-bot <agent@swe-agent.ai>
This commit is contained in:
swe-agent[bot]
2025-12-15 13:19:51 +08:00
parent ff301507fe
commit 6f4f4e701b
4 changed files with 269 additions and 74 deletions

View File

@@ -0,0 +1,39 @@
package main
import (
"os"
"strings"
"testing"
)
func TestLogWriterWriteLimitsBuffer(t *testing.T) {
defer resetTestHooks()
logger, err := NewLogger()
if err != nil {
t.Fatalf("NewLogger error: %v", err)
}
setLogger(logger)
defer closeLogger()
lw := newLogWriter("P:", 10)
_, _ = lw.Write([]byte(strings.Repeat("a", 100)))
if lw.buf.Len() != 10 {
t.Fatalf("logWriter buffer len=%d, want %d", lw.buf.Len(), 10)
}
if !lw.dropped {
t.Fatalf("expected logWriter to drop overlong line bytes")
}
lw.Flush()
logger.Flush()
data, err := os.ReadFile(logger.Path())
if err != nil {
t.Fatalf("ReadFile error: %v", err)
}
if !strings.Contains(string(data), "P:aaaaaaa...") {
t.Fatalf("log output missing truncated entry, got %q", string(data))
}
}

View File

@@ -53,9 +53,22 @@ func parseJSONStreamWithLog(r io.Reader, warnFn func(string), infoFn func(string
return parseJSONStreamInternal(r, warnFn, infoFn, nil)
}
const (
jsonLineReaderSize = 64 * 1024
jsonLineMaxBytes = 10 * 1024 * 1024
jsonLinePreviewBytes = 256
)
type codexHeader struct {
Type string `json:"type"`
ThreadID string `json:"thread_id,omitempty"`
Item *struct {
Type string `json:"type"`
} `json:"item,omitempty"`
}
func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(string), onMessage func()) (message, threadID string) {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, 64*1024), 10*1024*1024)
reader := bufio.NewReaderSize(r, jsonLineReaderSize)
if warnFn == nil {
warnFn = func(string) {}
@@ -78,79 +91,89 @@ func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(strin
geminiBuffer strings.Builder
)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
for {
line, tooLong, err := readLineWithLimit(reader, jsonLineMaxBytes, jsonLinePreviewBytes)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
warnFn("Read stdout error: " + err.Error())
break
}
line = bytes.TrimSpace(line)
if len(line) == 0 {
continue
}
totalEvents++
var raw map[string]json.RawMessage
if err := json.Unmarshal([]byte(line), &raw); err != nil {
warnFn(fmt.Sprintf("Failed to parse line: %s", truncate(line, 100)))
if tooLong {
warnFn(fmt.Sprintf("Skipped overlong JSON line (> %d bytes): %s", jsonLineMaxBytes, truncateBytes(line, 100)))
continue
}
hasItemType := false
if rawItem, ok := raw["item"]; ok {
var itemMap map[string]json.RawMessage
if err := json.Unmarshal(rawItem, &itemMap); err == nil {
if _, ok := itemMap["type"]; ok {
hasItemType = true
var codex codexHeader
if err := json.Unmarshal(line, &codex); err == nil {
isCodex := codex.ThreadID != "" || (codex.Item != nil && codex.Item.Type != "")
if isCodex {
var details []string
if codex.ThreadID != "" {
details = append(details, fmt.Sprintf("thread_id=%s", codex.ThreadID))
}
if codex.Item != nil && codex.Item.Type != "" {
details = append(details, fmt.Sprintf("item_type=%s", codex.Item.Type))
}
if len(details) > 0 {
infoFn(fmt.Sprintf("Parsed event #%d type=%s (%s)", totalEvents, codex.Type, strings.Join(details, ", ")))
} else {
infoFn(fmt.Sprintf("Parsed event #%d type=%s", totalEvents, codex.Type))
}
switch codex.Type {
case "thread.started":
threadID = codex.ThreadID
infoFn(fmt.Sprintf("thread.started event thread_id=%s", threadID))
case "item.completed":
itemType := ""
if codex.Item != nil {
itemType = codex.Item.Type
}
if itemType == "agent_message" {
var event JSONEvent
if err := json.Unmarshal(line, &event); err != nil {
warnFn(fmt.Sprintf("Failed to parse Codex event: %s", truncateBytes(line, 100)))
continue
}
normalized := ""
if event.Item != nil {
normalized = normalizeText(event.Item.Text)
}
infoFn(fmt.Sprintf("item.completed event item_type=%s message_len=%d", itemType, len(normalized)))
if normalized != "" {
codexMessage = normalized
notifyMessage()
}
} else {
infoFn(fmt.Sprintf("item.completed event item_type=%s", itemType))
}
}
continue
}
}
isCodex := hasItemType
if !isCodex {
if _, ok := raw["thread_id"]; ok {
isCodex = true
}
var raw map[string]json.RawMessage
if err := json.Unmarshal(line, &raw); err != nil {
warnFn(fmt.Sprintf("Failed to parse line: %s", truncateBytes(line, 100)))
continue
}
switch {
case isCodex:
var event JSONEvent
if err := json.Unmarshal([]byte(line), &event); err != nil {
warnFn(fmt.Sprintf("Failed to parse Codex event: %s", truncate(line, 100)))
continue
}
var details []string
if event.ThreadID != "" {
details = append(details, fmt.Sprintf("thread_id=%s", event.ThreadID))
}
if event.Item != nil && event.Item.Type != "" {
details = append(details, fmt.Sprintf("item_type=%s", event.Item.Type))
}
if len(details) > 0 {
infoFn(fmt.Sprintf("Parsed event #%d type=%s (%s)", totalEvents, event.Type, strings.Join(details, ", ")))
} else {
infoFn(fmt.Sprintf("Parsed event #%d type=%s", totalEvents, event.Type))
}
switch event.Type {
case "thread.started":
threadID = event.ThreadID
infoFn(fmt.Sprintf("thread.started event thread_id=%s", threadID))
case "item.completed":
var itemType string
var normalized string
if event.Item != nil {
itemType = event.Item.Type
normalized = normalizeText(event.Item.Text)
}
infoFn(fmt.Sprintf("item.completed event item_type=%s message_len=%d", itemType, len(normalized)))
if event.Item != nil && event.Item.Type == "agent_message" && normalized != "" {
codexMessage = normalized
notifyMessage()
}
}
case hasKey(raw, "subtype") || hasKey(raw, "result"):
var event ClaudeEvent
if err := json.Unmarshal([]byte(line), &event); err != nil {
warnFn(fmt.Sprintf("Failed to parse Claude event: %s", truncate(line, 100)))
if err := json.Unmarshal(line, &event); err != nil {
warnFn(fmt.Sprintf("Failed to parse Claude event: %s", truncateBytes(line, 100)))
continue
}
@@ -167,8 +190,8 @@ func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(strin
case hasKey(raw, "role") || hasKey(raw, "delta"):
var event GeminiEvent
if err := json.Unmarshal([]byte(line), &event); err != nil {
warnFn(fmt.Sprintf("Failed to parse Gemini event: %s", truncate(line, 100)))
if err := json.Unmarshal(line, &event); err != nil {
warnFn(fmt.Sprintf("Failed to parse Gemini event: %s", truncateBytes(line, 100)))
continue
}
@@ -184,14 +207,10 @@ func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(strin
infoFn(fmt.Sprintf("Parsed Gemini event #%d type=%s role=%s delta=%t status=%s content_len=%d", totalEvents, event.Type, event.Role, event.Delta, event.Status, len(event.Content)))
default:
warnFn(fmt.Sprintf("Unknown event format: %s", truncate(line, 100)))
warnFn(fmt.Sprintf("Unknown event format: %s", truncateBytes(line, 100)))
}
}
if err := scanner.Err(); err != nil && !errors.Is(err, io.EOF) {
warnFn("Read stdout error: " + err.Error())
}
switch {
case geminiBuffer.Len() > 0:
message = geminiBuffer.String()
@@ -236,6 +255,79 @@ func discardInvalidJSON(decoder *json.Decoder, reader *bufio.Reader) (*bufio.Rea
return bufio.NewReader(io.MultiReader(bytes.NewReader(remaining), reader)), err
}
func readLineWithLimit(r *bufio.Reader, maxBytes int, previewBytes int) (line []byte, tooLong bool, err error) {
if r == nil {
return nil, false, errors.New("reader is nil")
}
if maxBytes <= 0 {
return nil, false, errors.New("maxBytes must be > 0")
}
if previewBytes < 0 {
previewBytes = 0
}
part, isPrefix, err := r.ReadLine()
if err != nil {
return nil, false, err
}
if !isPrefix {
if len(part) > maxBytes {
return part[:min(len(part), previewBytes)], true, nil
}
return part, false, nil
}
preview := make([]byte, 0, min(previewBytes, len(part)))
if previewBytes > 0 {
preview = append(preview, part[:min(previewBytes, len(part))]...)
}
buf := make([]byte, 0, min(maxBytes, len(part)*2))
total := 0
if len(part) > maxBytes {
tooLong = true
} else {
buf = append(buf, part...)
total = len(part)
}
for isPrefix {
part, isPrefix, err = r.ReadLine()
if err != nil {
return nil, tooLong, err
}
if previewBytes > 0 && len(preview) < previewBytes {
preview = append(preview, part[:min(previewBytes-len(preview), len(part))]...)
}
if !tooLong {
if total+len(part) > maxBytes {
tooLong = true
continue
}
buf = append(buf, part...)
total += len(part)
}
}
if tooLong {
return preview, true, nil
}
return buf, false, nil
}
func truncateBytes(b []byte, maxLen int) string {
if len(b) <= maxLen {
return string(b)
}
if maxLen < 0 {
return ""
}
return string(b[:maxLen]) + "..."
}
func normalizeText(text interface{}) string {
switch v := text.(type) {
case string:

View File

@@ -0,0 +1,31 @@
package main
import (
"strings"
"testing"
)
func TestParseJSONStream_SkipsOverlongLineAndContinues(t *testing.T) {
// Exceed the 10MB bufio.Scanner limit in parseJSONStreamInternal.
tooLong := strings.Repeat("a", 11*1024*1024)
input := strings.Join([]string{
`{"type":"item.completed","item":{"type":"other_type","text":"` + tooLong + `"}}`,
`{"type":"thread.started","thread_id":"t-1"}`,
`{"type":"item.completed","item":{"type":"agent_message","text":"ok"}}`,
}, "\n")
var warns []string
warnFn := func(msg string) { warns = append(warns, msg) }
gotMessage, gotThreadID := parseJSONStreamInternal(strings.NewReader(input), warnFn, nil, nil)
if gotMessage != "ok" {
t.Fatalf("message=%q, want %q (warns=%v)", gotMessage, "ok", warns)
}
if gotThreadID != "t-1" {
t.Fatalf("threadID=%q, want %q (warns=%v)", gotThreadID, "t-1", warns)
}
if len(warns) == 0 || !strings.Contains(warns[0], "Skipped overlong JSON line") {
t.Fatalf("expected warning about overlong JSON line, got %v", warns)
}
}

View File

@@ -78,6 +78,7 @@ type logWriter struct {
prefix string
maxLen int
buf bytes.Buffer
dropped bool
}
func newLogWriter(prefix string, maxLen int) *logWriter {
@@ -94,12 +95,12 @@ func (lw *logWriter) Write(p []byte) (int, error) {
total := len(p)
for len(p) > 0 {
if idx := bytes.IndexByte(p, '\n'); idx >= 0 {
lw.buf.Write(p[:idx])
lw.writeLimited(p[:idx])
lw.logLine(true)
p = p[idx+1:]
continue
}
lw.buf.Write(p)
lw.writeLimited(p)
break
}
return total, nil
@@ -117,21 +118,53 @@ func (lw *logWriter) logLine(force bool) {
return
}
line := lw.buf.String()
dropped := lw.dropped
lw.dropped = false
lw.buf.Reset()
if line == "" && !force {
return
}
if lw.maxLen > 0 && len(line) > lw.maxLen {
cutoff := lw.maxLen
if cutoff > 3 {
line = line[:cutoff-3] + "..."
} else {
line = line[:cutoff]
if lw.maxLen > 0 {
if dropped {
if lw.maxLen > 3 {
line = line[:min(len(line), lw.maxLen-3)] + "..."
} else {
line = line[:min(len(line), lw.maxLen)]
}
} else if len(line) > lw.maxLen {
cutoff := lw.maxLen
if cutoff > 3 {
line = line[:cutoff-3] + "..."
} else {
line = line[:cutoff]
}
}
}
logInfo(lw.prefix + line)
}
func (lw *logWriter) writeLimited(p []byte) {
if lw == nil || len(p) == 0 {
return
}
if lw.maxLen <= 0 {
lw.buf.Write(p)
return
}
remaining := lw.maxLen - lw.buf.Len()
if remaining <= 0 {
lw.dropped = true
return
}
if len(p) <= remaining {
lw.buf.Write(p)
return
}
lw.buf.Write(p[:remaining])
lw.dropped = true
}
type tailBuffer struct {
limit int
data []byte