| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- // This file is auto-generated by @hey-api/openapi-ts
- import type { Config } from "./types.gen.js"
- export type ServerSentEventsOptions<TData = unknown> = Omit<RequestInit, "method"> &
- Pick<Config, "method" | "responseTransformer" | "responseValidator"> & {
- /**
- * Fetch API implementation. You can use this option to provide a custom
- * fetch instance.
- *
- * @default globalThis.fetch
- */
- fetch?: typeof fetch
- /**
- * Implementing clients can call request interceptors inside this hook.
- */
- onRequest?: (url: string, init: RequestInit) => Promise<Request>
- /**
- * Callback invoked when a network or parsing error occurs during streaming.
- *
- * This option applies only if the endpoint returns a stream of events.
- *
- * @param error The error that occurred.
- */
- onSseError?: (error: unknown) => void
- /**
- * Callback invoked when an event is streamed from the server.
- *
- * This option applies only if the endpoint returns a stream of events.
- *
- * @param event Event streamed from the server.
- * @returns Nothing (void).
- */
- onSseEvent?: (event: StreamEvent<TData>) => void
- serializedBody?: RequestInit["body"]
- /**
- * Default retry delay in milliseconds.
- *
- * This option applies only if the endpoint returns a stream of events.
- *
- * @default 3000
- */
- sseDefaultRetryDelay?: number
- /**
- * Maximum number of retry attempts before giving up.
- */
- sseMaxRetryAttempts?: number
- /**
- * Maximum retry delay in milliseconds.
- *
- * Applies only when exponential backoff is used.
- *
- * This option applies only if the endpoint returns a stream of events.
- *
- * @default 30000
- */
- sseMaxRetryDelay?: number
- /**
- * Optional sleep function for retry backoff.
- *
- * Defaults to using `setTimeout`.
- */
- sseSleepFn?: (ms: number) => Promise<void>
- url: string
- }
- export interface StreamEvent<TData = unknown> {
- data: TData
- event?: string
- id?: string
- retry?: number
- }
- export type ServerSentEventsResult<TData = unknown, TReturn = void, TNext = unknown> = {
- stream: AsyncGenerator<TData extends Record<string, unknown> ? TData[keyof TData] : TData, TReturn, TNext>
- }
- export const createSseClient = <TData = unknown>({
- onRequest,
- onSseError,
- onSseEvent,
- responseTransformer,
- responseValidator,
- sseDefaultRetryDelay,
- sseMaxRetryAttempts,
- sseMaxRetryDelay,
- sseSleepFn,
- url,
- ...options
- }: ServerSentEventsOptions): ServerSentEventsResult<TData> => {
- let lastEventId: string | undefined
- const sleep = sseSleepFn ?? ((ms: number) => new Promise((resolve) => setTimeout(resolve, ms)))
- const createStream = async function* () {
- let retryDelay: number = sseDefaultRetryDelay ?? 3000
- let attempt = 0
- const signal = options.signal ?? new AbortController().signal
- while (true) {
- if (signal.aborted) break
- attempt++
- const headers =
- options.headers instanceof Headers
- ? options.headers
- : new Headers(options.headers as Record<string, string> | undefined)
- if (lastEventId !== undefined) {
- headers.set("Last-Event-ID", lastEventId)
- }
- try {
- const requestInit: RequestInit = {
- redirect: "follow",
- ...options,
- body: options.serializedBody,
- headers,
- signal,
- }
- let request = new Request(url, requestInit)
- if (onRequest) {
- request = await onRequest(url, requestInit)
- }
- // fetch must be assigned here, otherwise it would throw the error:
- // TypeError: Failed to execute 'fetch' on 'Window': Illegal invocation
- const _fetch = options.fetch ?? globalThis.fetch
- const response = await _fetch(request)
- if (!response.ok) throw new Error(`SSE failed: ${response.status} ${response.statusText}`)
- if (!response.body) throw new Error("No body in SSE response")
- const reader = response.body.pipeThrough(new TextDecoderStream()).getReader()
- let buffer = ""
- const abortHandler = () => {
- try {
- reader.cancel()
- } catch {
- // noop
- }
- }
- signal.addEventListener("abort", abortHandler)
- try {
- while (true) {
- const { done, value } = await reader.read()
- if (done) break
- buffer += value
- const chunks = buffer.split("\n\n")
- buffer = chunks.pop() ?? ""
- for (const chunk of chunks) {
- const lines = chunk.split("\n")
- const dataLines: Array<string> = []
- let eventName: string | undefined
- for (const line of lines) {
- if (line.startsWith("data:")) {
- dataLines.push(line.replace(/^data:\s*/, ""))
- } else if (line.startsWith("event:")) {
- eventName = line.replace(/^event:\s*/, "")
- } else if (line.startsWith("id:")) {
- lastEventId = line.replace(/^id:\s*/, "")
- } else if (line.startsWith("retry:")) {
- const parsed = Number.parseInt(line.replace(/^retry:\s*/, ""), 10)
- if (!Number.isNaN(parsed)) {
- retryDelay = parsed
- }
- }
- }
- let data: unknown
- let parsedJson = false
- if (dataLines.length) {
- const rawData = dataLines.join("\n")
- try {
- data = JSON.parse(rawData)
- parsedJson = true
- } catch {
- data = rawData
- }
- }
- if (parsedJson) {
- if (responseValidator) {
- await responseValidator(data)
- }
- if (responseTransformer) {
- data = await responseTransformer(data)
- }
- }
- onSseEvent?.({
- data,
- event: eventName,
- id: lastEventId,
- retry: retryDelay,
- })
- if (dataLines.length) {
- yield data as any
- }
- }
- }
- } finally {
- signal.removeEventListener("abort", abortHandler)
- reader.releaseLock()
- }
- break // exit loop on normal completion
- } catch (error) {
- // connection failed or aborted; retry after delay
- onSseError?.(error)
- if (sseMaxRetryAttempts !== undefined && attempt >= sseMaxRetryAttempts) {
- break // stop after firing error
- }
- // exponential backoff: double retry each attempt, cap at 30s
- const backoff = Math.min(retryDelay * 2 ** (attempt - 1), sseMaxRetryDelay ?? 30000)
- await sleep(backoff)
- }
- }
- }
- const stream = createStream()
- return { stream }
- }
|