fix: Parser重复解析优化 + 严重bug修复 + PR #86兼容性 (#88)

Merging parser optimization with critical bug fixes and PR #86 compatibility. Supersedes #84.
This commit is contained in:
ben
2025-12-21 14:10:40 +08:00
committed by GitHub
parent a30f434b5d
commit 4e2df6a80e
2 changed files with 165 additions and 60 deletions

View File

@@ -1582,6 +1582,34 @@ func TestBackendParseJSONStream_ClaudeEvents(t *testing.T) {
}
}
func TestBackendParseJSONStream_ClaudeEvents_ItemDoesNotForceCodex(t *testing.T) {
tests := []struct {
name string
input string
}{
{
name: "null item",
input: `{"type":"result","result":"OK","session_id":"abc123","item":null}`,
},
{
name: "empty object item",
input: `{"type":"result","subtype":"x","result":"OK","session_id":"abc123","item":{}}`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
message, threadID := parseJSONStream(strings.NewReader(tt.input))
if message != "OK" {
t.Fatalf("message=%q, want %q", message, "OK")
}
if threadID != "abc123" {
t.Fatalf("threadID=%q, want %q", threadID, "abc123")
}
})
}
}
func TestBackendParseJSONStream_GeminiEvents(t *testing.T) {
input := `{"type":"init","session_id":"xyz789"}
{"type":"message","role":"assistant","content":"Hi","delta":true,"session_id":"xyz789"}
@@ -1598,6 +1626,43 @@ func TestBackendParseJSONStream_GeminiEvents(t *testing.T) {
}
}
func TestBackendParseJSONStream_GeminiEvents_DeltaFalseStillDetected(t *testing.T) {
input := `{"type":"init","session_id":"xyz789"}
{"type":"message","content":"Hi","delta":false,"session_id":"xyz789"}
{"type":"result","status":"success","session_id":"xyz789"}`
message, threadID := parseJSONStream(strings.NewReader(input))
if message != "Hi" {
t.Fatalf("message=%q, want %q", message, "Hi")
}
if threadID != "xyz789" {
t.Fatalf("threadID=%q, want %q", threadID, "xyz789")
}
}
func TestBackendParseJSONStream_GeminiEvents_OnMessageTriggeredOnStatus(t *testing.T) {
input := `{"type":"init","session_id":"xyz789"}
{"type":"message","role":"assistant","content":"Hi","delta":true,"session_id":"xyz789"}
{"type":"message","content":" there","delta":true}
{"type":"result","status":"success","session_id":"xyz789"}`
var called int
message, threadID := parseJSONStreamInternal(strings.NewReader(input), nil, nil, func() {
called++
})
if message != "Hi there" {
t.Fatalf("message=%q, want %q", message, "Hi there")
}
if threadID != "xyz789" {
t.Fatalf("threadID=%q, want %q", threadID, "xyz789")
}
if called != 1 {
t.Fatalf("onMessage called=%d, want %d", called, 1)
}
}
func TestBackendParseJSONStreamWithWarn_InvalidLine(t *testing.T) {
var warnings []string
warnFn := func(msg string) { warnings = append(warnings, msg) }

View File

@@ -67,6 +67,34 @@ type codexHeader struct {
} `json:"item,omitempty"`
}
// UnifiedEvent combines all backend event formats into a single structure
// to avoid multiple JSON unmarshal operations per event
type UnifiedEvent struct {
// Common fields
Type string `json:"type"`
// Codex-specific fields
ThreadID string `json:"thread_id,omitempty"`
Item json.RawMessage `json:"item,omitempty"` // Lazy parse
// Claude-specific fields
Subtype string `json:"subtype,omitempty"`
SessionID string `json:"session_id,omitempty"`
Result string `json:"result,omitempty"`
// Gemini-specific fields
Role string `json:"role,omitempty"`
Content string `json:"content,omitempty"`
Delta *bool `json:"delta,omitempty"`
Status string `json:"status,omitempty"`
}
// ItemContent represents the parsed item.text field for Codex events
type ItemContent struct {
Type string `json:"type"`
Text interface{} `json:"text"`
}
func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(string), onMessage func()) (message, threadID string) {
reader := bufio.NewReaderSize(r, jsonLineReaderSize)
@@ -112,71 +140,77 @@ func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(strin
continue
}
var codex codexHeader
if err := json.Unmarshal(line, &codex); err == nil {
isCodex := codex.ThreadID != "" || (codex.Item != nil && codex.Item.Type != "")
if isCodex {
var details []string
if codex.ThreadID != "" {
details = append(details, fmt.Sprintf("thread_id=%s", codex.ThreadID))
}
if codex.Item != nil && codex.Item.Type != "" {
details = append(details, fmt.Sprintf("item_type=%s", codex.Item.Type))
}
if len(details) > 0 {
infoFn(fmt.Sprintf("Parsed event #%d type=%s (%s)", totalEvents, codex.Type, strings.Join(details, ", ")))
} else {
infoFn(fmt.Sprintf("Parsed event #%d type=%s", totalEvents, codex.Type))
}
// Single unmarshal for all backend types
var event UnifiedEvent
if err := json.Unmarshal(line, &event); err != nil {
warnFn(fmt.Sprintf("Failed to parse event: %s", truncateBytes(line, 100)))
continue
}
switch codex.Type {
case "thread.started":
threadID = codex.ThreadID
infoFn(fmt.Sprintf("thread.started event thread_id=%s", threadID))
case "item.completed":
itemType := ""
if codex.Item != nil {
itemType = codex.Item.Type
// Detect backend type by field presence
isCodex := event.ThreadID != ""
if !isCodex && len(event.Item) > 0 {
var itemHeader struct {
Type string `json:"type"`
}
if json.Unmarshal(event.Item, &itemHeader) == nil && itemHeader.Type != "" {
isCodex = true
}
}
isClaude := event.Subtype != "" || event.Result != ""
isGemini := event.Role != "" || event.Delta != nil || event.Status != ""
// Handle Codex events
if isCodex {
var details []string
if event.ThreadID != "" {
details = append(details, fmt.Sprintf("thread_id=%s", event.ThreadID))
}
if len(details) > 0 {
infoFn(fmt.Sprintf("Parsed event #%d type=%s (%s)", totalEvents, event.Type, strings.Join(details, ", ")))
} else {
infoFn(fmt.Sprintf("Parsed event #%d type=%s", totalEvents, event.Type))
}
switch event.Type {
case "thread.started":
threadID = event.ThreadID
infoFn(fmt.Sprintf("thread.started event thread_id=%s", threadID))
case "item.completed":
var itemType string
if len(event.Item) > 0 {
var itemHeader struct {
Type string `json:"type"`
}
if err := json.Unmarshal(event.Item, &itemHeader); err == nil {
itemType = itemHeader.Type
}
}
if itemType == "agent_message" {
var event JSONEvent
if err := json.Unmarshal(line, &event); err != nil {
warnFn(fmt.Sprintf("Failed to parse Codex event: %s", truncateBytes(line, 100)))
continue
}
normalized := ""
if event.Item != nil {
normalized = normalizeText(event.Item.Text)
}
if itemType == "agent_message" && len(event.Item) > 0 {
// Lazy parse: only parse item content when needed
var item ItemContent
if err := json.Unmarshal(event.Item, &item); err == nil {
normalized := normalizeText(item.Text)
infoFn(fmt.Sprintf("item.completed event item_type=%s message_len=%d", itemType, len(normalized)))
if normalized != "" {
codexMessage = normalized
notifyMessage()
}
} else {
infoFn(fmt.Sprintf("item.completed event item_type=%s", itemType))
warnFn(fmt.Sprintf("Failed to parse item content: %s", err.Error()))
}
} else {
infoFn(fmt.Sprintf("item.completed event item_type=%s", itemType))
}
continue
}
}
var raw map[string]json.RawMessage
if err := json.Unmarshal(line, &raw); err != nil {
warnFn(fmt.Sprintf("Failed to parse line: %s", truncateBytes(line, 100)))
continue
}
switch {
case hasKey(raw, "subtype") || hasKey(raw, "result"):
var event ClaudeEvent
if err := json.Unmarshal(line, &event); err != nil {
warnFn(fmt.Sprintf("Failed to parse Claude event: %s", truncateBytes(line, 100)))
continue
}
// Handle Claude events
if isClaude {
if event.SessionID != "" && threadID == "" {
threadID = event.SessionID
}
@@ -187,28 +221,34 @@ func parseJSONStreamInternal(r io.Reader, warnFn func(string), infoFn func(strin
claudeMessage = event.Result
notifyMessage()
}
continue
}
case hasKey(raw, "role") || hasKey(raw, "delta"):
var event GeminiEvent
if err := json.Unmarshal(line, &event); err != nil {
warnFn(fmt.Sprintf("Failed to parse Gemini event: %s", truncateBytes(line, 100)))
continue
}
// Handle Gemini events
if isGemini {
if event.SessionID != "" && threadID == "" {
threadID = event.SessionID
}
if event.Content != "" {
geminiBuffer.WriteString(event.Content)
}
if event.Status != "" {
notifyMessage()
}
infoFn(fmt.Sprintf("Parsed Gemini event #%d type=%s role=%s delta=%t status=%s content_len=%d", totalEvents, event.Type, event.Role, event.Delta, event.Status, len(event.Content)))
delta := false
if event.Delta != nil {
delta = *event.Delta
}
default:
warnFn(fmt.Sprintf("Unknown event format: %s", truncateBytes(line, 100)))
infoFn(fmt.Sprintf("Parsed Gemini event #%d type=%s role=%s delta=%t status=%s content_len=%d", totalEvents, event.Type, event.Role, delta, event.Status, len(event.Content)))
continue
}
// Unknown event format
warnFn(fmt.Sprintf("Unknown event format: %s", truncateBytes(line, 100)))
}
switch {