Merge pull request #32 from freespace8/master

fix(main): 提升缓冲区限制并简化消息提取流程
This commit is contained in:
ben
2025-11-28 16:49:24 +08:00
committed by GitHub
2 changed files with 31 additions and 24 deletions

View File

@@ -24,9 +24,9 @@ const (
// Test hooks for dependency injection
var (
stdinReader io.Reader = os.Stdin
isTerminalFn = defaultIsTerminal
codexCommand = "codex"
stdinReader io.Reader = os.Stdin
isTerminalFn = defaultIsTerminal
codexCommand = "codex"
)
// Config holds CLI configuration
@@ -347,9 +347,7 @@ func runCodexProcess(codexArgs []string, taskText string, useStdin bool, timeout
func parseJSONStream(r io.Reader) (message, threadID string) {
scanner := bufio.NewScanner(r)
// Set larger buffer for long lines
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
scanner.Buffer(make([]byte, 64*1024), 10*1024*1024)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
@@ -369,18 +367,15 @@ func parseJSONStream(r io.Reader) (message, threadID string) {
}
// Capture agent_message
if event.Type == "item.completed" && event.Item != nil {
if event.Item.Type == "agent_message" {
text := normalizeText(event.Item.Text)
if text != "" {
message = text
}
if event.Type == "item.completed" && event.Item != nil && event.Item.Type == "agent_message" {
if text := normalizeText(event.Item.Text); text != "" {
message = text
}
}
}
if err := scanner.Err(); err != nil {
logWarn("Scanner error: " + err.Error())
if err := scanner.Err(); err != nil && err != io.EOF {
logWarn("Read stdout error: " + err.Error())
}
return message, threadID

View File

@@ -330,12 +330,16 @@ func TestNormalizeText(t *testing.T) {
}
func TestParseJSONStream(t *testing.T) {
tests := []struct {
type testCase struct {
name string
input string
wantMessage string
wantThreadID string
}{
}
longText := strings.Repeat("a", 2*1024*1024) // >1MB agent_message payload
tests := []testCase{
{
name: "thread started and agent message",
input: `{"type":"thread.started","thread_id":"abc-123"}
@@ -364,6 +368,12 @@ func TestParseJSONStream(t *testing.T) {
wantMessage: "Valid",
wantThreadID: "",
},
{
name: "super long single line (>1MB)",
input: `{"type":"item.completed","item":{"type":"agent_message","text":"` + longText + `"}}`,
wantMessage: longText,
wantThreadID: "",
},
{
name: "empty input",
input: "",
@@ -371,23 +381,25 @@ func TestParseJSONStream(t *testing.T) {
wantThreadID: "",
},
{
name: "invalid JSON (skipped)",
input: "not valid json\n{\"type\":\"thread.started\",\"thread_id\":\"xyz\"}",
name: "item completed with nil item",
input: strings.Join([]string{
`{"type":"thread.started","thread_id":"nil-item-thread"}`,
`{"type":"item.completed","item":null}`,
}, "\n"),
wantMessage: "",
wantThreadID: "xyz",
wantThreadID: "nil-item-thread",
},
{
name: "blank lines ignored",
input: "\n\n{\"type\":\"thread.started\",\"thread_id\":\"test\"}\n\n",
name: "agent message with non-string text",
input: `{"type":"item.completed","item":{"type":"agent_message","text":12345}}`,
wantMessage: "",
wantThreadID: "test",
wantThreadID: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := strings.NewReader(tt.input)
gotMessage, gotThreadID := parseJSONStream(r)
gotMessage, gotThreadID := parseJSONStream(strings.NewReader(tt.input))
if gotMessage != tt.wantMessage {
t.Errorf("parseJSONStream() message = %q, want %q", gotMessage, tt.wantMessage)