// proxy-forwarder-large-chunked-response.test.ts
  1. import { createServer } from "node:http";
  2. import type { Socket } from "node:net";
  3. import { describe, expect, test, vi } from "vitest";
  4. import { ProxyForwarder } from "@/app/v1/_lib/proxy/forwarder";
  5. import { resolveEndpointPolicy } from "@/app/v1/_lib/proxy/endpoint-policy";
  6. import { ProxySession } from "@/app/v1/_lib/proxy/session";
  7. import type { Provider } from "@/types/provider";
// Hoisted mock state: vitest statically lifts vi.mock() factories above all
// imports, so anything a factory references must itself come from vi.hoisted().
const mocks = vi.hoisted(() => {
  return {
    // Force the HTTP/1.1 code path in the forwarder (no HTTP/2 negotiation).
    isHttp2Enabled: vi.fn(async () => false),
  };
});
vi.mock("@/lib/config", async (importOriginal) => {
  // Keep the real config module intact and override only the HTTP/2 probe.
  const actual = await importOriginal<typeof import("@/lib/config")>();
  return {
    ...actual,
    isHttp2Enabled: mocks.isHttp2Enabled,
  };
});
// Silence all logger output for the duration of the test run.
vi.mock("@/lib/logger", () => ({
  logger: {
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    trace: vi.fn(),
    error: vi.fn(),
    fatal: vi.fn(),
  },
}));
  30. function createProvider(overrides: Partial<Provider> = {}): Provider {
  31. return {
  32. id: 1,
  33. name: "test-chunked",
  34. url: "http://127.0.0.1:1",
  35. key: "k",
  36. providerVendorId: null,
  37. isEnabled: true,
  38. weight: 1,
  39. priority: 0,
  40. groupPriorities: null,
  41. costMultiplier: 1,
  42. groupTag: null,
  43. providerType: "openai-compatible",
  44. preserveClientIp: false,
  45. modelRedirects: null,
  46. allowedModels: null,
  47. mcpPassthroughType: "none",
  48. mcpPassthroughUrl: null,
  49. limit5hUsd: null,
  50. limitDailyUsd: null,
  51. dailyResetMode: "fixed",
  52. dailyResetTime: "00:00",
  53. limitWeeklyUsd: null,
  54. limitMonthlyUsd: null,
  55. limitTotalUsd: null,
  56. totalCostResetAt: null,
  57. limitConcurrentSessions: 0,
  58. maxRetryAttempts: null,
  59. circuitBreakerFailureThreshold: 5,
  60. circuitBreakerOpenDuration: 1_800_000,
  61. circuitBreakerHalfOpenSuccessThreshold: 2,
  62. proxyUrl: null,
  63. proxyFallbackToDirect: false,
  64. firstByteTimeoutStreamingMs: 30_000,
  65. streamingIdleTimeoutMs: 10_000,
  66. requestTimeoutNonStreamingMs: 0,
  67. websiteUrl: null,
  68. faviconUrl: null,
  69. cacheTtlPreference: null,
  70. context1mPreference: null,
  71. codexReasoningEffortPreference: null,
  72. codexReasoningSummaryPreference: null,
  73. codexTextVerbosityPreference: null,
  74. codexParallelToolCallsPreference: null,
  75. anthropicMaxTokensPreference: null,
  76. anthropicThinkingBudgetPreference: null,
  77. anthropicAdaptiveThinking: null,
  78. geminiGoogleSearchPreference: null,
  79. tpm: 0,
  80. rpm: 0,
  81. rpd: 0,
  82. cc: 0,
  83. createdAt: new Date(),
  84. updatedAt: new Date(),
  85. deletedAt: null,
  86. ...overrides,
  87. };
  88. }
  89. function createSession(params?: { clientAbortSignal?: AbortSignal | null }): ProxySession {
  90. const headers = new Headers();
  91. const session = Object.create(ProxySession.prototype);
  92. Object.assign(session, {
  93. startTime: Date.now(),
  94. method: "POST",
  95. requestUrl: new URL("https://example.com/v1/chat/completions"),
  96. headers,
  97. originalHeaders: new Headers(headers),
  98. headerLog: JSON.stringify(Object.fromEntries(headers.entries())),
  99. request: {
  100. model: "gpt-5.2",
  101. log: "(test)",
  102. message: {
  103. model: "gpt-5.2",
  104. messages: [{ role: "user", content: "hi" }],
  105. },
  106. },
  107. userAgent: null,
  108. context: null,
  109. clientAbortSignal: params?.clientAbortSignal ?? null,
  110. userName: "test-user",
  111. authState: { success: true, user: null, key: null, apiKey: null },
  112. provider: null,
  113. messageContext: null,
  114. sessionId: null,
  115. requestSequence: 1,
  116. originalFormat: "claude",
  117. providerType: null,
  118. originalModelName: null,
  119. originalUrlPathname: null,
  120. providerChain: [],
  121. cacheTtlResolved: null,
  122. context1mApplied: false,
  123. specialSettings: [],
  124. cachedPriceData: undefined,
  125. cachedBillingModelSource: undefined,
  126. endpointPolicy: resolveEndpointPolicy("/v1/chat/completions"),
  127. isHeaderModified: () => false,
  128. });
  129. return session as ProxySession;
  130. }
  131. /**
  132. * Start a local server that returns 200 + application/json + chunked body (no Content-Length).
  133. * The body is larger than 32 KiB to trigger the truncated path in readResponseTextUpTo.
  134. */
  135. async function startChunkedServer(
  136. bodySize: number
  137. ): Promise<{ baseUrl: string; close: () => Promise<void> }> {
  138. const sockets = new Set<Socket>();
  139. const server = createServer((_req, res) => {
  140. // Chunked transfer encoding: write headers without Content-Length,
  141. // then write body in multiple chunks.
  142. res.writeHead(200, { "content-type": "application/json" });
  143. // Build a valid JSON body larger than bodySize.
  144. // Use a simple structure: {"data":"AAAA..."}
  145. const padding = "A".repeat(bodySize);
  146. const body = JSON.stringify({ data: padding });
  147. // Write in ~4KB chunks to simulate realistic chunked transfer
  148. const chunkSize = 4096;
  149. let offset = 0;
  150. const writeNext = () => {
  151. while (offset < body.length) {
  152. const slice = body.slice(offset, offset + chunkSize);
  153. offset += chunkSize;
  154. if (!res.write(slice)) {
  155. res.once("drain", writeNext);
  156. return;
  157. }
  158. }
  159. res.end();
  160. };
  161. writeNext();
  162. });
  163. server.on("connection", (socket) => {
  164. sockets.add(socket);
  165. socket.on("close", () => sockets.delete(socket));
  166. });
  167. const baseUrl = await new Promise<string>((resolve, reject) => {
  168. server.once("error", reject);
  169. server.listen(0, "127.0.0.1", () => {
  170. const addr = server.address();
  171. if (!addr || typeof addr === "string") {
  172. reject(new Error("Failed to get server address"));
  173. return;
  174. }
  175. resolve(`http://127.0.0.1:${addr.port}`);
  176. });
  177. });
  178. const close = async () => {
  179. for (const socket of sockets) {
  180. try {
  181. socket.destroy();
  182. } catch {
  183. // ignore
  184. }
  185. }
  186. sockets.clear();
  187. await new Promise<void>((resolve) => server.close(() => resolve()));
  188. };
  189. return { baseUrl, close };
  190. }
  191. describe("ProxyForwarder - large chunked non-streaming response", () => {
  192. test("200 + chunked + no Content-Length + >32KiB body must not hang on body inspection", async () => {
  193. // 64 KiB body: well above the 32 KiB inspection limit to trigger truncated + cancel
  194. const { baseUrl, close } = await startChunkedServer(64 * 1024);
  195. const clientAbortController = new AbortController();
  196. try {
  197. const provider = createProvider({
  198. url: baseUrl,
  199. // Disable response timeout so the only thing that can hang is readResponseTextUpTo
  200. requestTimeoutNonStreamingMs: 0,
  201. });
  202. const session = createSession({ clientAbortSignal: clientAbortController.signal });
  203. session.setProvider(provider);
  204. const doForward = (
  205. ProxyForwarder as unknown as {
  206. doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown;
  207. }
  208. ).doForward;
  209. const forwardPromise = doForward.call(
  210. ProxyForwarder,
  211. session,
  212. provider,
  213. baseUrl
  214. ) as Promise<Response>;
  215. const result = await Promise.race([
  216. forwardPromise.then(
  217. (response) => ({ type: "resolved" as const, response: response as Response }),
  218. (error) => ({ type: "rejected" as const, error })
  219. ),
  220. new Promise<{ type: "timeout" }>((resolve) =>
  221. setTimeout(() => resolve({ type: "timeout" as const }), 5_000)
  222. ),
  223. ]);
  224. if (result.type === "timeout") {
  225. clientAbortController.abort(new Error("test_timeout"));
  226. throw new Error(
  227. "doForward timed out: readResponseTextUpTo likely blocking on reader.cancel() for large chunked response"
  228. );
  229. }
  230. // doForward should resolve successfully (200 response)
  231. expect(result.type).toBe("resolved");
  232. const response = (result as { type: "resolved"; response: Response }).response;
  233. expect(response.status).toBe(200);
  234. // The response body must be fully readable by the client
  235. const bodyText = await response.text();
  236. expect(bodyText.length).toBeGreaterThan(64 * 1024);
  237. const parsed = JSON.parse(bodyText);
  238. expect(parsed.data).toBeDefined();
  239. } finally {
  240. await close();
  241. }
  242. });
  243. });