decoder.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package decoders
  2. import (
  3. "bufio"
  4. "bytes"
  5. "io"
  6. "github.com/sst/opencode-sdk-go/packages/ssestream"
  7. )
  8. // UnboundedDecoder is an SSE decoder that uses bufio.Reader instead of bufio.Scanner
  9. // to avoid the 32MB token size limit. This is a workaround for large SSE events until
  10. // the upstream Stainless SDK is fixed.
  11. //
  12. // This decoder handles SSE events of unlimited size by reading line-by-line with
  13. // bufio.Reader.ReadBytes('\n'), which dynamically grows the buffer as needed.
  14. type UnboundedDecoder struct {
  15. reader *bufio.Reader
  16. closer io.ReadCloser
  17. evt ssestream.Event
  18. err error
  19. }
  20. // NewUnboundedDecoder creates a new unbounded SSE decoder with a 1MB initial buffer size
  21. func NewUnboundedDecoder(rc io.ReadCloser) ssestream.Decoder {
  22. reader := bufio.NewReaderSize(rc, 1024*1024) // 1MB initial buffer
  23. return &UnboundedDecoder{
  24. reader: reader,
  25. closer: rc,
  26. }
  27. }
  28. // Next reads and decodes the next SSE event from the stream
  29. func (d *UnboundedDecoder) Next() bool {
  30. if d.err != nil {
  31. return false
  32. }
  33. event := ""
  34. data := bytes.NewBuffer(nil)
  35. for {
  36. line, err := d.reader.ReadBytes('\n')
  37. if err != nil {
  38. if err == io.EOF && len(line) == 0 {
  39. return false
  40. }
  41. if err != io.EOF {
  42. d.err = err
  43. return false
  44. }
  45. }
  46. // Remove trailing newline characters
  47. line = bytes.TrimRight(line, "\r\n")
  48. // Empty line indicates end of event
  49. if len(line) == 0 {
  50. if data.Len() > 0 || event != "" {
  51. d.evt = ssestream.Event{
  52. Type: event,
  53. Data: data.Bytes(),
  54. }
  55. return true
  56. }
  57. continue
  58. }
  59. // Skip comments (lines starting with ':')
  60. if line[0] == ':' {
  61. continue
  62. }
  63. // Parse field
  64. name, value, found := bytes.Cut(line, []byte(":"))
  65. if !found {
  66. // Field with no value
  67. continue
  68. }
  69. // Remove leading space from value
  70. if len(value) > 0 && value[0] == ' ' {
  71. value = value[1:]
  72. }
  73. switch string(name) {
  74. case "":
  75. // An empty line in the form ": something" is a comment and should be ignored
  76. continue
  77. case "event":
  78. event = string(value)
  79. case "data":
  80. _, d.err = data.Write(value)
  81. if d.err != nil {
  82. return false
  83. }
  84. _, d.err = data.WriteRune('\n')
  85. if d.err != nil {
  86. return false
  87. }
  88. }
  89. }
  90. }
  91. // Event returns the current event
  92. func (d *UnboundedDecoder) Event() ssestream.Event {
  93. return d.evt
  94. }
  95. // Close closes the underlying reader
  96. func (d *UnboundedDecoder) Close() error {
  97. return d.closer.Close()
  98. }
  99. // Err returns any error that occurred during decoding
  100. func (d *UnboundedDecoder) Err() error {
  101. return d.err
  102. }