serverSentEvents.gen.ts 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. // This file is auto-generated by @hey-api/openapi-ts
  2. import type { Config } from "./types.gen.js"
  3. export type ServerSentEventsOptions<TData = unknown> = Omit<RequestInit, "method"> &
  4. Pick<Config, "method" | "responseTransformer" | "responseValidator"> & {
  5. /**
  6. * Fetch API implementation. You can use this option to provide a custom
  7. * fetch instance.
  8. *
  9. * @default globalThis.fetch
  10. */
  11. fetch?: typeof fetch
  12. /**
  13. * Implementing clients can call request interceptors inside this hook.
  14. */
  15. onRequest?: (url: string, init: RequestInit) => Promise<Request>
  16. /**
  17. * Callback invoked when a network or parsing error occurs during streaming.
  18. *
  19. * This option applies only if the endpoint returns a stream of events.
  20. *
  21. * @param error The error that occurred.
  22. */
  23. onSseError?: (error: unknown) => void
  24. /**
  25. * Callback invoked when an event is streamed from the server.
  26. *
  27. * This option applies only if the endpoint returns a stream of events.
  28. *
  29. * @param event Event streamed from the server.
  30. * @returns Nothing (void).
  31. */
  32. onSseEvent?: (event: StreamEvent<TData>) => void
  33. serializedBody?: RequestInit["body"]
  34. /**
  35. * Default retry delay in milliseconds.
  36. *
  37. * This option applies only if the endpoint returns a stream of events.
  38. *
  39. * @default 3000
  40. */
  41. sseDefaultRetryDelay?: number
  42. /**
  43. * Maximum number of retry attempts before giving up.
  44. */
  45. sseMaxRetryAttempts?: number
  46. /**
  47. * Maximum retry delay in milliseconds.
  48. *
  49. * Applies only when exponential backoff is used.
  50. *
  51. * This option applies only if the endpoint returns a stream of events.
  52. *
  53. * @default 30000
  54. */
  55. sseMaxRetryDelay?: number
  56. /**
  57. * Optional sleep function for retry backoff.
  58. *
  59. * Defaults to using `setTimeout`.
  60. */
  61. sseSleepFn?: (ms: number) => Promise<void>
  62. url: string
  63. }
  64. export interface StreamEvent<TData = unknown> {
  65. data: TData
  66. event?: string
  67. id?: string
  68. retry?: number
  69. }
  70. export type ServerSentEventsResult<TData = unknown, TReturn = void, TNext = unknown> = {
  71. stream: AsyncGenerator<TData extends Record<string, unknown> ? TData[keyof TData] : TData, TReturn, TNext>
  72. }
  73. export const createSseClient = <TData = unknown>({
  74. onRequest,
  75. onSseError,
  76. onSseEvent,
  77. responseTransformer,
  78. responseValidator,
  79. sseDefaultRetryDelay,
  80. sseMaxRetryAttempts,
  81. sseMaxRetryDelay,
  82. sseSleepFn,
  83. url,
  84. ...options
  85. }: ServerSentEventsOptions): ServerSentEventsResult<TData> => {
  86. let lastEventId: string | undefined
  87. const sleep = sseSleepFn ?? ((ms: number) => new Promise((resolve) => setTimeout(resolve, ms)))
  88. const createStream = async function* () {
  89. let retryDelay: number = sseDefaultRetryDelay ?? 3000
  90. let attempt = 0
  91. const signal = options.signal ?? new AbortController().signal
  92. while (true) {
  93. if (signal.aborted) break
  94. attempt++
  95. const headers =
  96. options.headers instanceof Headers
  97. ? options.headers
  98. : new Headers(options.headers as Record<string, string> | undefined)
  99. if (lastEventId !== undefined) {
  100. headers.set("Last-Event-ID", lastEventId)
  101. }
  102. try {
  103. const requestInit: RequestInit = {
  104. redirect: "follow",
  105. ...options,
  106. body: options.serializedBody,
  107. headers,
  108. signal,
  109. }
  110. let request = new Request(url, requestInit)
  111. if (onRequest) {
  112. request = await onRequest(url, requestInit)
  113. }
  114. // fetch must be assigned here, otherwise it would throw the error:
  115. // TypeError: Failed to execute 'fetch' on 'Window': Illegal invocation
  116. const _fetch = options.fetch ?? globalThis.fetch
  117. const response = await _fetch(request)
  118. if (!response.ok) throw new Error(`SSE failed: ${response.status} ${response.statusText}`)
  119. if (!response.body) throw new Error("No body in SSE response")
  120. const reader = response.body.pipeThrough(new TextDecoderStream()).getReader()
  121. let buffer = ""
  122. const abortHandler = () => {
  123. try {
  124. reader.cancel()
  125. } catch {
  126. // noop
  127. }
  128. }
  129. signal.addEventListener("abort", abortHandler)
  130. try {
  131. while (true) {
  132. const { done, value } = await reader.read()
  133. if (done) break
  134. buffer += value
  135. const chunks = buffer.split("\n\n")
  136. buffer = chunks.pop() ?? ""
  137. for (const chunk of chunks) {
  138. const lines = chunk.split("\n")
  139. const dataLines: Array<string> = []
  140. let eventName: string | undefined
  141. for (const line of lines) {
  142. if (line.startsWith("data:")) {
  143. dataLines.push(line.replace(/^data:\s*/, ""))
  144. } else if (line.startsWith("event:")) {
  145. eventName = line.replace(/^event:\s*/, "")
  146. } else if (line.startsWith("id:")) {
  147. lastEventId = line.replace(/^id:\s*/, "")
  148. } else if (line.startsWith("retry:")) {
  149. const parsed = Number.parseInt(line.replace(/^retry:\s*/, ""), 10)
  150. if (!Number.isNaN(parsed)) {
  151. retryDelay = parsed
  152. }
  153. }
  154. }
  155. let data: unknown
  156. let parsedJson = false
  157. if (dataLines.length) {
  158. const rawData = dataLines.join("\n")
  159. try {
  160. data = JSON.parse(rawData)
  161. parsedJson = true
  162. } catch {
  163. data = rawData
  164. }
  165. }
  166. if (parsedJson) {
  167. if (responseValidator) {
  168. await responseValidator(data)
  169. }
  170. if (responseTransformer) {
  171. data = await responseTransformer(data)
  172. }
  173. }
  174. onSseEvent?.({
  175. data,
  176. event: eventName,
  177. id: lastEventId,
  178. retry: retryDelay,
  179. })
  180. if (dataLines.length) {
  181. yield data as any
  182. }
  183. }
  184. }
  185. } finally {
  186. signal.removeEventListener("abort", abortHandler)
  187. reader.releaseLock()
  188. }
  189. break // exit loop on normal completion
  190. } catch (error) {
  191. // connection failed or aborted; retry after delay
  192. onSseError?.(error)
  193. if (sseMaxRetryAttempts !== undefined && attempt >= sseMaxRetryAttempts) {
  194. break // stop after firing error
  195. }
  196. // exponential backoff: double retry each attempt, cap at 30s
  197. const backoff = Math.min(retryDelay * 2 ** (attempt - 1), sseMaxRetryDelay ?? 30000)
  198. await sleep(backoff)
  199. }
  200. }
  201. }
  202. const stream = createStream()
  203. return { stream }
  204. }