Просмотр исходного кода

Fix "bufio.Scanner token too long" error by replacing Scanner with Reader in SSE (#3531)

Khang Ha (Kelvin) 3 месяцев назад
Родитель
Сommit
4b5e447961

+ 7 - 0
packages/tui/cmd/opencode/main.go

@@ -13,9 +13,11 @@ import (
 	flag "github.com/spf13/pflag"
 	"github.com/sst/opencode-sdk-go"
 	"github.com/sst/opencode-sdk-go/option"
+	"github.com/sst/opencode-sdk-go/packages/ssestream"
 	"github.com/sst/opencode/internal/api"
 	"github.com/sst/opencode/internal/app"
 	"github.com/sst/opencode/internal/clipboard"
+	"github.com/sst/opencode/internal/decoders"
 	"github.com/sst/opencode/internal/tui"
 	"github.com/sst/opencode/internal/util"
 	"golang.org/x/sync/errgroup"
@@ -61,6 +63,11 @@ func main() {
 		}
 	}
 
+	// Register custom SSE decoder to handle large events (>32MB)
+	// This is a workaround for the bufio.Scanner token size limit in the auto-generated SDK
+	// See: packages/tui/internal/decoders/decoder.go
+	ssestream.RegisterDecoder("text/event-stream", decoders.NewUnboundedDecoder)
+
 	httpClient := opencode.NewClient(
 		option.WithBaseURL(url),
 	)

+ 118 - 0
packages/tui/internal/decoders/decoder.go

@@ -0,0 +1,118 @@
+package decoders
+
+import (
+	"bufio"
+	"bytes"
+	"io"
+
+	"github.com/sst/opencode-sdk-go/packages/ssestream"
+)
+
+// UnboundedDecoder is an SSE decoder that uses bufio.Reader instead of bufio.Scanner
+// to avoid the 32MB token size limit. This is a workaround for large SSE events until
+// the upstream Stainless SDK is fixed.
+//
+// This decoder handles SSE events of unlimited size by reading line-by-line with
+// bufio.Reader.ReadBytes('\n'), which dynamically grows the buffer as needed.
+type UnboundedDecoder struct {
+	reader *bufio.Reader
+	closer io.ReadCloser
+	evt    ssestream.Event
+	err    error
+}
+
+// NewUnboundedDecoder creates a new unbounded SSE decoder with a 1MB initial buffer size
+func NewUnboundedDecoder(rc io.ReadCloser) ssestream.Decoder {
+	reader := bufio.NewReaderSize(rc, 1024*1024) // 1MB initial buffer
+	return &UnboundedDecoder{
+		reader: reader,
+		closer: rc,
+	}
+}
+
+// Next reads and decodes the next SSE event from the stream
+func (d *UnboundedDecoder) Next() bool {
+	if d.err != nil {
+		return false
+	}
+
+	event := ""
+	data := bytes.NewBuffer(nil)
+
+	for {
+		line, err := d.reader.ReadBytes('\n')
+		if err != nil {
+			if err == io.EOF && len(line) == 0 {
+				return false
+			}
+			if err != io.EOF {
+				d.err = err
+				return false
+			}
+		}
+
+		// Remove trailing newline characters
+		line = bytes.TrimRight(line, "\r\n")
+
+		// Empty line indicates end of event
+		if len(line) == 0 {
+			if data.Len() > 0 || event != "" {
+				d.evt = ssestream.Event{
+					Type: event,
+					Data: data.Bytes(),
+				}
+				return true
+			}
+			continue
+		}
+
+		// Skip comments (lines starting with ':')
+		if line[0] == ':' {
+			continue
+		}
+
+		// Parse field
+		name, value, found := bytes.Cut(line, []byte(":"))
+		if !found {
+			// Field with no value
+			continue
+		}
+
+		// Remove leading space from value
+		if len(value) > 0 && value[0] == ' ' {
+			value = value[1:]
+		}
+
+		switch string(name) {
+		case "":
+			// An empty line in the form ": something" is a comment and should be ignored
+			continue
+		case "event":
+			event = string(value)
+		case "data":
+			_, d.err = data.Write(value)
+			if d.err != nil {
+				return false
+			}
+			_, d.err = data.WriteRune('\n')
+			if d.err != nil {
+				return false
+			}
+		}
+	}
+}
+
+// Event returns the current event
+func (d *UnboundedDecoder) Event() ssestream.Event {
+	return d.evt
+}
+
+// Close closes the underlying reader
+func (d *UnboundedDecoder) Close() error {
+	return d.closer.Close()
+}
+
+// Err returns any error that occurred during decoding
+func (d *UnboundedDecoder) Err() error {
+	return d.err
+}

+ 194 - 0
packages/tui/internal/decoders/decoder_test.go

@@ -0,0 +1,194 @@
+package decoders
+
+import (
+	"bytes"
+	"io"
+	"strings"
+	"testing"
+)
+
+func TestUnboundedDecoder_SmallEvent(t *testing.T) {
+	data := "event: test\ndata: hello world\n\n"
+	rc := io.NopCloser(strings.NewReader(data))
+	decoder := NewUnboundedDecoder(rc)
+
+	if !decoder.Next() {
+		t.Fatal("Expected Next() to return true")
+	}
+
+	evt := decoder.Event()
+	if evt.Type != "test" {
+		t.Errorf("Expected event type 'test', got '%s'", evt.Type)
+	}
+	if string(evt.Data) != "hello world\n" {
+		t.Errorf("Expected data 'hello world\\n', got '%s'", string(evt.Data))
+	}
+
+	if decoder.Next() {
+		t.Error("Expected Next() to return false at end of stream")
+	}
+
+	if err := decoder.Err(); err != nil {
+		t.Errorf("Expected no error, got %v", err)
+	}
+}
+
+func TestUnboundedDecoder_LargeEvent(t *testing.T) {
+	// Create a large event (50MB)
+	size := 50 * 1024 * 1024
+	largeData := strings.Repeat("x", size)
+
+	var buf bytes.Buffer
+	buf.WriteString("event: large\n")
+	buf.WriteString("data: ")
+	buf.WriteString(largeData)
+	buf.WriteString("\n\n")
+
+	rc := io.NopCloser(&buf)
+	decoder := NewUnboundedDecoder(rc)
+
+	if !decoder.Next() {
+		t.Fatal("Expected Next() to return true")
+	}
+
+	evt := decoder.Event()
+	if evt.Type != "large" {
+		t.Errorf("Expected event type 'large', got '%s'", evt.Type)
+	}
+
+	expectedData := largeData + "\n"
+	if string(evt.Data) != expectedData {
+		t.Errorf("Data size mismatch: expected %d, got %d", len(expectedData), len(evt.Data))
+	}
+
+	if decoder.Next() {
+		t.Error("Expected Next() to return false at end of stream")
+	}
+
+	if err := decoder.Err(); err != nil {
+		t.Errorf("Expected no error, got %v", err)
+	}
+}
+
+func TestUnboundedDecoder_MultipleEvents(t *testing.T) {
+	data := "event: first\ndata: data1\n\nevent: second\ndata: data2\n\n"
+	rc := io.NopCloser(strings.NewReader(data))
+	decoder := NewUnboundedDecoder(rc)
+
+	// First event
+	if !decoder.Next() {
+		t.Fatal("Expected Next() to return true for first event")
+	}
+	evt := decoder.Event()
+	if evt.Type != "first" {
+		t.Errorf("Expected event type 'first', got '%s'", evt.Type)
+	}
+	if string(evt.Data) != "data1\n" {
+		t.Errorf("Expected data 'data1\\n', got '%s'", string(evt.Data))
+	}
+
+	// Second event
+	if !decoder.Next() {
+		t.Fatal("Expected Next() to return true for second event")
+	}
+	evt = decoder.Event()
+	if evt.Type != "second" {
+		t.Errorf("Expected event type 'second', got '%s'", evt.Type)
+	}
+	if string(evt.Data) != "data2\n" {
+		t.Errorf("Expected data 'data2\\n', got '%s'", string(evt.Data))
+	}
+
+	// No more events
+	if decoder.Next() {
+		t.Error("Expected Next() to return false at end of stream")
+	}
+
+	if err := decoder.Err(); err != nil {
+		t.Errorf("Expected no error, got %v", err)
+	}
+}
+
+func TestUnboundedDecoder_MultilineData(t *testing.T) {
+	data := "event: multiline\ndata: line1\ndata: line2\ndata: line3\n\n"
+	rc := io.NopCloser(strings.NewReader(data))
+	decoder := NewUnboundedDecoder(rc)
+
+	if !decoder.Next() {
+		t.Fatal("Expected Next() to return true")
+	}
+
+	evt := decoder.Event()
+	if evt.Type != "multiline" {
+		t.Errorf("Expected event type 'multiline', got '%s'", evt.Type)
+	}
+
+	expectedData := "line1\nline2\nline3\n"
+	if string(evt.Data) != expectedData {
+		t.Errorf("Expected data '%s', got '%s'", expectedData, string(evt.Data))
+	}
+}
+
+func TestUnboundedDecoder_Comments(t *testing.T) {
+	data := ": this is a comment\nevent: test\n: another comment\ndata: hello\n\n"
+	rc := io.NopCloser(strings.NewReader(data))
+	decoder := NewUnboundedDecoder(rc)
+
+	if !decoder.Next() {
+		t.Fatal("Expected Next() to return true")
+	}
+
+	evt := decoder.Event()
+	if evt.Type != "test" {
+		t.Errorf("Expected event type 'test', got '%s'", evt.Type)
+	}
+	if string(evt.Data) != "hello\n" {
+		t.Errorf("Expected data 'hello\\n', got '%s'", string(evt.Data))
+	}
+}
+
+func TestUnboundedDecoder_NoEventType(t *testing.T) {
+	data := "data: hello\n\n"
+	rc := io.NopCloser(strings.NewReader(data))
+	decoder := NewUnboundedDecoder(rc)
+
+	if !decoder.Next() {
+		t.Fatal("Expected Next() to return true")
+	}
+
+	evt := decoder.Event()
+	if evt.Type != "" {
+		t.Errorf("Expected empty event type, got '%s'", evt.Type)
+	}
+	if string(evt.Data) != "hello\n" {
+		t.Errorf("Expected data 'hello\\n', got '%s'", string(evt.Data))
+	}
+}
+
+func BenchmarkUnboundedDecoder_LargeEvent(b *testing.B) {
+	// Create a 10MB event for benchmarking
+	size := 10 * 1024 * 1024
+	largeData := strings.Repeat("x", size)
+
+	var buf bytes.Buffer
+	buf.WriteString("event: bench\n")
+	buf.WriteString("data: ")
+	buf.WriteString(largeData)
+	buf.WriteString("\n\n")
+
+	data := buf.Bytes()
+
+	b.ResetTimer()
+	b.SetBytes(int64(len(data)))
+
+	for i := 0; i < b.N; i++ {
+		rc := io.NopCloser(bytes.NewReader(data))
+		decoder := NewUnboundedDecoder(rc)
+
+		if !decoder.Next() {
+			b.Fatal("Expected Next() to return true")
+		}
+
+		_ = decoder.Event()
+	}
+}