| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- package decoders
- import (
- "bufio"
- "bytes"
- "io"
- "github.com/sst/opencode-sdk-go/packages/ssestream"
- )
- // UnboundedDecoder is an SSE decoder that uses bufio.Reader instead of bufio.Scanner
- // to avoid the 32MB token size limit. This is a workaround for large SSE events until
- // the upstream Stainless SDK is fixed.
- //
- // This decoder handles SSE events of unlimited size by reading line-by-line with
- // bufio.Reader.ReadBytes('\n'), which dynamically grows the buffer as needed.
- type UnboundedDecoder struct {
- reader *bufio.Reader
- closer io.ReadCloser
- evt ssestream.Event
- err error
- }
- // NewUnboundedDecoder creates a new unbounded SSE decoder with a 1MB initial buffer size
- func NewUnboundedDecoder(rc io.ReadCloser) ssestream.Decoder {
- reader := bufio.NewReaderSize(rc, 1024*1024) // 1MB initial buffer
- return &UnboundedDecoder{
- reader: reader,
- closer: rc,
- }
- }
- // Next reads and decodes the next SSE event from the stream
- func (d *UnboundedDecoder) Next() bool {
- if d.err != nil {
- return false
- }
- event := ""
- data := bytes.NewBuffer(nil)
- for {
- line, err := d.reader.ReadBytes('\n')
- if err != nil {
- if err == io.EOF && len(line) == 0 {
- return false
- }
- if err != io.EOF {
- d.err = err
- return false
- }
- }
- // Remove trailing newline characters
- line = bytes.TrimRight(line, "\r\n")
- // Empty line indicates end of event
- if len(line) == 0 {
- if data.Len() > 0 || event != "" {
- d.evt = ssestream.Event{
- Type: event,
- Data: data.Bytes(),
- }
- return true
- }
- continue
- }
- // Skip comments (lines starting with ':')
- if line[0] == ':' {
- continue
- }
- // Parse field
- name, value, found := bytes.Cut(line, []byte(":"))
- if !found {
- // Field with no value
- continue
- }
- // Remove leading space from value
- if len(value) > 0 && value[0] == ' ' {
- value = value[1:]
- }
- switch string(name) {
- case "":
- // An empty line in the form ": something" is a comment and should be ignored
- continue
- case "event":
- event = string(value)
- case "data":
- _, d.err = data.Write(value)
- if d.err != nil {
- return false
- }
- _, d.err = data.WriteRune('\n')
- if d.err != nil {
- return false
- }
- }
- }
- }
- // Event returns the current event
- func (d *UnboundedDecoder) Event() ssestream.Event {
- return d.evt
- }
- // Close closes the underlying reader
- func (d *UnboundedDecoder) Close() error {
- return d.closer.Close()
- }
- // Err returns any error that occurred during decoding
- func (d *UnboundedDecoder) Err() error {
- return d.err
- }
|