From 4e2df6a80e68f0f6d656b80a1ac9ac4af20394e1 Mon Sep 17 00:00:00 2001 From: ben Date: Sun, 21 Dec 2025 14:10:40 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20Parser=E9=87=8D=E5=A4=8D=E8=A7=A3?= =?UTF-8?q?=E6=9E=90=E4=BC=98=E5=8C=96=20+=20=E4=B8=A5=E9=87=8Dbug?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20+=20PR=20#86=E5=85=BC=E5=AE=B9=E6=80=A7=20?= =?UTF-8?q?(#88)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Merging parser optimization with critical bug fixes and PR #86 compatibility. Supersedes #84. --- codeagent-wrapper/main_test.go | 65 ++++++++++++++ codeagent-wrapper/parser.go | 160 ++++++++++++++++++++------------- 2 files changed, 165 insertions(+), 60 deletions(-) diff --git a/codeagent-wrapper/main_test.go b/codeagent-wrapper/main_test.go index 0193066..e5fc37b 100644 --- a/codeagent-wrapper/main_test.go +++ b/codeagent-wrapper/main_test.go @@ -1582,6 +1582,34 @@ func TestBackendParseJSONStream_ClaudeEvents(t *testing.T) { } } +func TestBackendParseJSONStream_ClaudeEvents_ItemDoesNotForceCodex(t *testing.T) { + tests := []struct { + name string + input string + }{ + { + name: "null item", + input: `{"type":"result","result":"OK","session_id":"abc123","item":null}`, + }, + { + name: "empty object item", + input: `{"type":"result","subtype":"x","result":"OK","session_id":"abc123","item":{}}`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + message, threadID := parseJSONStream(strings.NewReader(tt.input)) + if message != "OK" { + t.Fatalf("message=%q, want %q", message, "OK") + } + if threadID != "abc123" { + t.Fatalf("threadID=%q, want %q", threadID, "abc123") + } + }) + } +} + func TestBackendParseJSONStream_GeminiEvents(t *testing.T) { input := `{"type":"init","session_id":"xyz789"} {"type":"message","role":"assistant","content":"Hi","delta":true,"session_id":"xyz789"} @@ -1598,6 +1626,43 @@ func TestBackendParseJSONStream_GeminiEvents(t *testing.T) { } } +func TestBackendParseJSONStream_GeminiEvents_DeltaFalseStillDetected(t *testing.T) { + input := `{"type":"init","session_id":"xyz789"} +{"type":"message","content":"Hi","delta":false,"session_id":"xyz789"} +{"type":"result","status":"success","session_id":"xyz789"}` + + message, threadID := parseJSONStream(strings.NewReader(input)) + + if message != "Hi" { + t.Fatalf("message=%q, want %q", message, "Hi") + } + if threadID != "xyz789" { + t.Fatalf("threadID=%q, want %q", threadID, "xyz789") + } +} + +func TestBackendParseJSONStream_GeminiEvents_OnMessageTriggeredOnStatus(t *testing.T) { + input := `{"type":"init","session_id":"xyz789"} +{"type":"message","role":"assistant","content":"Hi","delta":true,"session_id":"xyz789"} +{"type":"message","content":" there","delta":true} +{"type":"result","status":"success","session_id":"xyz789"}` + + var called int + message, threadID := parseJSONStreamInternal(strings.NewReader(input), nil, nil, func() { + called++ + }) + + if message != "Hi there" { + t.Fatalf("message=%q, want %q", message, "Hi there") + } + if threadID != "xyz789" { + t.Fatalf("threadID=%q, want %q", threadID, "xyz789") + } + if called != 1 { + t.Fatalf("onMessage called=%d, want %d", called, 1) + } +} + func TestBackendParseJSONStreamWithWarn_InvalidLine(t *testing.T) { var warnings []string warnFn := func(msg string) { warnings = append(warnings, msg) } diff --git a/codeagent-wrapper/parser.go b/codeagent-wrapper/parser.go index 79388f9..ecf27e6 100644 --- a/codeagent-wrapper/parser.go +++ b/codeagent-wrapper/parser.go @@ -67,6 +67,34 @@ type codexHeader struct { } `json:"item,omitempty"` } +// UnifiedEvent combines all backend event formats into a single structure +// to avoid multiple JSON unmarshal operations per event +type UnifiedEvent struct { + // Common fields + Type string `json:"type"` + + // Codex-specific fields + ThreadID string `json:"thread_id,omitempty"` + Item json.RawMessage `json:"item,omitempty"` // Lazy parse + + // Claude-specific fields + Subtype string `json:"subtype,omitempty"` + SessionID string `json:"session_id,omitempty"` + Result string `json:"result,omitempty"` + + // Gemini-specific fields + Role string `json:"role,omitempty"` + Content string `json:"content,omitempty"` + Delta *bool `json:"delta,omitempty"` + Status string `json:"status,omitempty"` +} + +// ItemContent represents the parsed item.text field for Codex events +type ItemContent struct { + Type string `json:"type"` + Text interface{} `json:"text"` +} + func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(string), onMessage func()) (message, threadID string) { reader := bufio.NewReaderSize(r, jsonLineReaderSize) @@ -112,71 +140,77 @@ func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(strin continue } - var codex codexHeader - if err := json.Unmarshal(line, &codex); err == nil { - isCodex := codex.ThreadID != "" || (codex.Item != nil && codex.Item.Type != "") - if isCodex { - var details []string - if codex.ThreadID != "" { - details = append(details, fmt.Sprintf("thread_id=%s", codex.ThreadID)) - } - if codex.Item != nil && codex.Item.Type != "" { - details = append(details, fmt.Sprintf("item_type=%s", codex.Item.Type)) - } - if len(details) > 0 { - infoFn(fmt.Sprintf("Parsed event #%d type=%s (%s)", totalEvents, codex.Type, strings.Join(details, ", "))) - } else { - infoFn(fmt.Sprintf("Parsed event #%d type=%s", totalEvents, codex.Type)) - } + // Single unmarshal for all backend types + var event UnifiedEvent + if err := json.Unmarshal(line, &event); err != nil { + warnFn(fmt.Sprintf("Failed to parse event: %s", truncateBytes(line, 100))) + continue + } - switch codex.Type { - case "thread.started": - threadID = codex.ThreadID - infoFn(fmt.Sprintf("thread.started event thread_id=%s", threadID)) - case "item.completed": - itemType := "" - if codex.Item != nil { - itemType = codex.Item.Type + // Detect backend type by field presence + isCodex := event.ThreadID != "" + if !isCodex && len(event.Item) > 0 { + var itemHeader struct { + Type string `json:"type"` + } + if json.Unmarshal(event.Item, &itemHeader) == nil && itemHeader.Type != "" { + isCodex = true + } + } + isClaude := event.Subtype != "" || event.Result != "" + isGemini := event.Role != "" || event.Delta != nil || event.Status != "" + + // Handle Codex events + if isCodex { + var details []string + if event.ThreadID != "" { + details = append(details, fmt.Sprintf("thread_id=%s", event.ThreadID)) + } + + if len(details) > 0 { + infoFn(fmt.Sprintf("Parsed event #%d type=%s (%s)", totalEvents, event.Type, strings.Join(details, ", "))) + } else { + infoFn(fmt.Sprintf("Parsed event #%d type=%s", totalEvents, event.Type)) + } + + switch event.Type { + case "thread.started": + threadID = event.ThreadID + infoFn(fmt.Sprintf("thread.started event thread_id=%s", threadID)) + + case "item.completed": + var itemType string + if len(event.Item) > 0 { + var itemHeader struct { + Type string `json:"type"` } + if err := json.Unmarshal(event.Item, &itemHeader); err == nil { + itemType = itemHeader.Type + } + } - if itemType == "agent_message" { - var event JSONEvent - if err := json.Unmarshal(line, &event); err != nil { - warnFn(fmt.Sprintf("Failed to parse Codex event: %s", truncateBytes(line, 100))) - continue - } - - normalized := "" - if event.Item != nil { - normalized = normalizeText(event.Item.Text) - } + if itemType == "agent_message" && len(event.Item) > 0 { + // Lazy parse: only parse item content when needed + var item ItemContent + if err := json.Unmarshal(event.Item, &item); err == nil { + normalized := normalizeText(item.Text) infoFn(fmt.Sprintf("item.completed event item_type=%s message_len=%d", itemType, len(normalized))) if normalized != "" { codexMessage = normalized notifyMessage() } } else { - infoFn(fmt.Sprintf("item.completed event item_type=%s", itemType)) + warnFn(fmt.Sprintf("Failed to parse item content: %s", err.Error())) } + } else { + infoFn(fmt.Sprintf("item.completed event item_type=%s", itemType)) } - continue } - } - - var raw map[string]json.RawMessage - if err := json.Unmarshal(line, &raw); err != nil { - warnFn(fmt.Sprintf("Failed to parse line: %s", truncateBytes(line, 100))) continue } - switch { - case hasKey(raw, "subtype") || hasKey(raw, "result"): - var event ClaudeEvent - if err := json.Unmarshal(line, &event); err != nil { - warnFn(fmt.Sprintf("Failed to parse Claude event: %s", truncateBytes(line, 100))) - continue - } - + // Handle Claude events + if isClaude { if event.SessionID != "" && threadID == "" { threadID = event.SessionID } @@ -187,28 +221,34 @@ func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(strin claudeMessage = event.Result notifyMessage() } + continue + } - case hasKey(raw, "role") || hasKey(raw, "delta"): - var event GeminiEvent - if err := json.Unmarshal(line, &event); err != nil { - warnFn(fmt.Sprintf("Failed to parse Gemini event: %s", truncateBytes(line, 100))) - continue - } - + // Handle Gemini events + if isGemini { if event.SessionID != "" && threadID == "" { threadID = event.SessionID } if event.Content != "" { geminiBuffer.WriteString(event.Content) + } + + if event.Status != "" { notifyMessage() } - infoFn(fmt.Sprintf("Parsed Gemini event #%d type=%s role=%s delta=%t status=%s content_len=%d", totalEvents, event.Type, event.Role, event.Delta, event.Status, len(event.Content))) + delta := false + if event.Delta != nil { + delta = *event.Delta + } - default: - warnFn(fmt.Sprintf("Unknown event format: %s", truncateBytes(line, 100))) + infoFn(fmt.Sprintf("Parsed Gemini event #%d type=%s role=%s delta=%t status=%s content_len=%d", totalEvents, event.Type, event.Role, delta, event.Status, len(event.Content))) + continue } + + // Unknown event format + warnFn(fmt.Sprintf("Unknown event format: %s", truncateBytes(line, 100))) } switch {