diff --git a/codex-wrapper/main.go b/codex-wrapper/main.go index 56d13b7..4837704 100644 --- a/codex-wrapper/main.go +++ b/codex-wrapper/main.go @@ -24,9 +24,9 @@ const ( // Test hooks for dependency injection var ( - stdinReader io.Reader = os.Stdin - isTerminalFn = defaultIsTerminal - codexCommand = "codex" + stdinReader io.Reader = os.Stdin + isTerminalFn = defaultIsTerminal + codexCommand = "codex" ) // Config holds CLI configuration @@ -347,9 +347,7 @@ func runCodexProcess(codexArgs []string, taskText string, useStdin bool, timeout func parseJSONStream(r io.Reader) (message, threadID string) { scanner := bufio.NewScanner(r) - // Set larger buffer for long lines - buf := make([]byte, 0, 64*1024) - scanner.Buffer(buf, 1024*1024) + scanner.Buffer(make([]byte, 64*1024), 10*1024*1024) for scanner.Scan() { line := strings.TrimSpace(scanner.Text()) @@ -369,18 +367,15 @@ func parseJSONStream(r io.Reader) (message, threadID string) { } // Capture agent_message - if event.Type == "item.completed" && event.Item != nil { - if event.Item.Type == "agent_message" { - text := normalizeText(event.Item.Text) - if text != "" { - message = text - } + if event.Type == "item.completed" && event.Item != nil && event.Item.Type == "agent_message" { + if text := normalizeText(event.Item.Text); text != "" { + message = text } } } - if err := scanner.Err(); err != nil { - logWarn("Scanner error: " + err.Error()) + if err := scanner.Err(); err != nil && err != io.EOF { + logWarn("Read stdout error: " + err.Error()) } return message, threadID diff --git a/codex-wrapper/main_test.go b/codex-wrapper/main_test.go index 112330b..ab123cb 100644 --- a/codex-wrapper/main_test.go +++ b/codex-wrapper/main_test.go @@ -330,12 +330,16 @@ func TestNormalizeText(t *testing.T) { } func TestParseJSONStream(t *testing.T) { - tests := []struct { + type testCase struct { name string input string wantMessage string wantThreadID string - }{ + } + + longText := strings.Repeat("a", 2*1024*1024) // >1MB agent_message payload + + tests := []testCase{ { name: "thread started and agent message", input: `{"type":"thread.started","thread_id":"abc-123"} @@ -364,6 +368,12 @@ func TestParseJSONStream(t *testing.T) { wantMessage: "Valid", wantThreadID: "", }, + { + name: "super long single line (>1MB)", + input: `{"type":"item.completed","item":{"type":"agent_message","text":"` + longText + `"}}`, + wantMessage: longText, + wantThreadID: "", + }, { name: "empty input", input: "", @@ -371,23 +381,25 @@ func TestParseJSONStream(t *testing.T) { wantThreadID: "", }, { - name: "invalid JSON (skipped)", - input: "not valid json\n{\"type\":\"thread.started\",\"thread_id\":\"xyz\"}", + name: "item completed with nil item", + input: strings.Join([]string{ + `{"type":"thread.started","thread_id":"nil-item-thread"}`, + `{"type":"item.completed","item":null}`, + }, "\n"), wantMessage: "", - wantThreadID: "xyz", + wantThreadID: "nil-item-thread", }, { - name: "blank lines ignored", - input: "\n\n{\"type\":\"thread.started\",\"thread_id\":\"test\"}\n\n", + name: "agent message with non-string text", + input: `{"type":"item.completed","item":{"type":"agent_message","text":12345}}`, wantMessage: "", - wantThreadID: "test", + wantThreadID: "", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := strings.NewReader(tt.input) - gotMessage, gotThreadID := parseJSONStream(r) + gotMessage, gotThreadID := parseJSONStream(strings.NewReader(tt.input)) if gotMessage != tt.wantMessage { t.Errorf("parseJSONStream() message = %q, want %q", gotMessage, tt.wantMessage)