sse.test.ts 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import { afterEach, describe, expect, test } from "bun:test"
  2. import { parseSSE } from "../../src/control-plane/sse"
  3. import { resetDatabase } from "../fixture/db"
  // After every test in this file, await the shared database fixture reset.
  afterEach(async function restoreDatabase(): Promise<void> {
    await resetDatabase()
  })
  /**
   * Builds a ReadableStream that emits each string in `chunks` as its own
   * UTF-8 encoded Uint8Array chunk, in order, and then closes.
   *
   * All chunks are enqueued inside `start`, so the stream is already fully
   * populated and closed by the time the constructor returns.
   *
   * @param chunks Text fragments to emit, one stream chunk per array entry.
   * @returns A stream yielding the encoded fragments followed by end-of-stream.
   */
  function stream(chunks: string[]): ReadableStream<Uint8Array> {
    const utf8 = new TextEncoder()
    return new ReadableStream<Uint8Array>({
      start(sink) {
        for (const text of chunks) {
          sink.enqueue(utf8.encode(text))
        }
        sink.close()
      },
    })
  }
  describe("control-plane/sse", () => {
    /**
     * Runs `parseSSE` over a stream built from `chunks`, using a signal that is
     * never aborted, and returns every event handed to the callback in order.
     */
    async function collect(chunks: string[]): Promise<unknown[]> {
      const received: unknown[] = []
      const abort = new AbortController()
      await parseSSE(stream(chunks), abort.signal, (event) => received.push(event))
      return received
    }

    test("parses JSON events with CRLF and multiline data blocks", async () => {
      // First chunk: one JSON event on a single data line, CRLF-terminated.
      const single = 'data: {"type":"one","properties":{"ok":true}}\r\n\r\n'
      // Second chunk: one JSON document split across two consecutive data lines.
      const multiline = 'data: {"type":"two",\r\ndata: "properties":{"n":2}}\r\n\r\n'

      const events = await collect([single, multiline])

      expect(events).toEqual([
        { type: "one", properties: { ok: true } },
        { type: "two", properties: { n: 2 } },
      ])
    })

    test("falls back to sse.message for non-json payload", async () => {
      // The data field is not JSON; the event also carries id and retry fields.
      const events = await collect(["id: abc\nretry: 1500\ndata: hello world\n\n"])

      const fallback = {
        type: "sse.message",
        properties: {
          data: "hello world",
          id: "abc",
          retry: 1500,
        },
      }
      expect(events).toEqual([fallback])
    })
  })