// response-handler-gemini-stream-passthrough-timeouts.test.ts
  1. import { createServer } from "node:http";
  2. import type { Socket } from "node:net";
  3. import { beforeEach, describe, expect, test, vi } from "vitest";
  4. import { ProxyForwarder } from "@/app/v1/_lib/proxy/forwarder";
  5. import { resolveEndpointPolicy } from "@/app/v1/_lib/proxy/endpoint-policy";
  6. import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler";
  7. import { ProxySession } from "@/app/v1/_lib/proxy/session";
  8. import { SessionManager } from "@/lib/session-manager";
  9. import type { Provider } from "@/types/provider";
  /** Promises passed to the mocked AsyncTaskManager.register, collected so tests can await them. */
  const asyncTasks: Array<Promise<void>> = [];

  /** Mock functions created through vi.hoisted so the vi.mock factories in this file can use them. */
  const mocks = vi.hoisted(() => ({
    isHttp2Enabled: vi.fn(async () => false),
  }));

  // Every test starts with isHttp2Enabled resolving to false again.
  beforeEach(() => {
    mocks.isHttp2Enabled.mockReset().mockResolvedValue(false);
  });
  // Keep the real config module, but answer isHttp2Enabled through the hoisted mock.
  vi.mock("@/lib/config", async (importOriginal) => ({
    ...(await importOriginal<typeof import("@/lib/config")>()),
    isHttp2Enabled: mocks.isHttp2Enabled,
  }));

  // ResponseFixer.process hands the response back unchanged.
  vi.mock("@/app/v1/_lib/proxy/response-fixer", () => {
    const ResponseFixer = {
      process: async (_session: unknown, response: Response) => response,
    };
    return { ResponseFixer };
  });

  // AsyncTaskManager.register records the task promise in asyncTasks; cleanup/cancel do nothing.
  vi.mock("@/lib/async-task-manager", () => {
    const AsyncTaskManager = {
      register: (_taskId: string, promise: Promise<void>) => {
        asyncTasks.push(promise);
        return new AbortController();
      },
      cleanup: () => {},
      cancel: () => {},
    };
    return { AsyncTaskManager };
  });

  // Logger methods are replaced by mock functions.
  vi.mock("@/lib/logger", () => {
    const logger = {
      debug: vi.fn(),
      info: vi.fn(),
      warn: vi.fn(),
      trace: vi.fn(),
      error: vi.fn(),
    };
    return { logger };
  });

  // Message repository writes are replaced by mock functions.
  vi.mock("@/repository/message", () => {
    const updateMessageRequestCost = vi.fn();
    const updateMessageRequestDetails = vi.fn();
    const updateMessageRequestDuration = vi.fn();
    return {
      updateMessageRequestCost,
      updateMessageRequestDetails,
      updateMessageRequestDuration,
    };
  });

  // System settings always resolve with billingModelSource "original".
  vi.mock("@/repository/system-config", () => {
    const getSystemSettings = vi.fn(async () => ({ billingModelSource: "original" }));
    return { getSystemSettings };
  });

  // Price lookups always resolve with zero input/output token cost.
  vi.mock("@/repository/model-price", () => {
    const findLatestPriceByModel = vi.fn(async () => ({
      priceData: { input_cost_per_token: 0, output_cost_per_token: 0 },
    }));
    return { findLatestPriceByModel };
  });

  // SessionManager is replaced by mock functions; the storeSession*Meta/Headers/Settings
  // mocks are async and resolve to undefined.
  vi.mock("@/lib/session-manager", () => {
    const SessionManager = {
      storeSessionResponse: vi.fn(),
      updateSessionUsage: vi.fn(),
      clearSessionProvider: vi.fn(),
      storeSessionUpstreamRequestMeta: vi.fn(async () => undefined),
      storeSessionSpecialSettings: vi.fn(async () => undefined),
      storeSessionRequestHeaders: vi.fn(async () => undefined),
      storeSessionResponseHeaders: vi.fn(async () => undefined),
      storeSessionUpstreamResponseMeta: vi.fn(async () => undefined),
    };
    return { SessionManager };
  });

  // ProxyStatusTracker.getInstance() returns an object whose endRequest does nothing.
  vi.mock("@/lib/proxy-status-tracker", () => {
    const ProxyStatusTracker = {
      getInstance: () => ({
        endRequest: () => {},
      }),
    };
    return { ProxyStatusTracker };
  });
  /**
   * Builds a Provider fixture for the tests in this file.
   *
   * @param overrides - Fields that replace the defaults below; applied last, so any key
   * given here wins.
   * @returns A provider of type "gemini" with a 100 ms streaming first-byte timeout and
   * the streaming idle / non-streaming request timeouts set to 0, unless overridden.
   */
  function createProvider(overrides: Partial<Provider> = {}): Provider {
    const defaults: Provider = {
      // Identity and routing
      id: 1,
      name: "p1",
      url: "http://127.0.0.1:1",
      key: "k",
      providerVendorId: null,
      isEnabled: true,
      weight: 1,
      priority: 0,
      groupPriorities: null,
      costMultiplier: 1,
      groupTag: null,
      providerType: "gemini",
      preserveClientIp: false,
      modelRedirects: null,
      allowedModels: null,
      mcpPassthroughType: "none",
      mcpPassthroughUrl: null,

      // Spending limits
      limit5hUsd: null,
      limitDailyUsd: null,
      dailyResetMode: "fixed",
      dailyResetTime: "00:00",
      limitWeeklyUsd: null,
      limitMonthlyUsd: null,
      limitTotalUsd: null,
      totalCostResetAt: null,
      limitConcurrentSessions: 0,

      // Retry and circuit breaker
      maxRetryAttempts: null,
      circuitBreakerFailureThreshold: 5,
      circuitBreakerOpenDuration: 1_800_000,
      circuitBreakerHalfOpenSuccessThreshold: 2,

      // Outbound proxy
      proxyUrl: null,
      proxyFallbackToDirect: false,

      // Timeouts (the values the tests in this file override)
      firstByteTimeoutStreamingMs: 100,
      streamingIdleTimeoutMs: 0,
      requestTimeoutNonStreamingMs: 0,

      // Presentation and per-vendor preferences
      websiteUrl: null,
      faviconUrl: null,
      cacheTtlPreference: null,
      context1mPreference: null,
      codexReasoningEffortPreference: null,
      codexReasoningSummaryPreference: null,
      codexTextVerbosityPreference: null,
      codexParallelToolCallsPreference: null,
      anthropicMaxTokensPreference: null,
      anthropicThinkingBudgetPreference: null,
      geminiGoogleSearchPreference: null,

      // Rate limits
      tpm: 0,
      rpm: 0,
      rpd: 0,
      cc: 0,

      // Timestamps
      createdAt: new Date(),
      updatedAt: new Date(),
      deletedAt: null,
    };

    return { ...defaults, ...overrides };
  }
  /**
   * Builds a ProxySession fixture without running the ProxySession constructor: an object
   * is created from ProxySession.prototype and the fields below are assigned onto it.
   *
   * @param params.clientAbortSignal - Signal stored as the session's clientAbortSignal.
   * @param params.messageId - Stored as messageContext.id.
   * @param params.userId - Stored as messageContext.user.id.
   * @returns The populated session: a streaming "gemini-2.0-flash" request with no provider set.
   */
  function createSession(params: {
    clientAbortSignal: AbortSignal;
    messageId: number;
    userId: number;
  }): ProxySession {
    const { clientAbortSignal, messageId, userId } = params;
    const headers = new Headers();

    const fields = {
      startTime: Date.now(),
      method: "POST",
      requestUrl: new URL("https://example.com/v1/chat/completions"),
      headers,
      originalHeaders: new Headers(headers),
      headerLog: JSON.stringify(Object.fromEntries(headers.entries())),
      request: {
        model: "gemini-2.0-flash",
        log: "(test)",
        message: {
          model: "gemini-2.0-flash",
          stream: true,
          messages: [{ role: "user", content: "hi" }],
        },
      },
      userAgent: null,
      context: null,
      clientAbortSignal,
      userName: "test-user",
      authState: { success: true, user: null, key: null, apiKey: null },
      provider: null,
      messageContext: {
        id: messageId,
        createdAt: new Date(),
        user: { id: userId, name: "u1" },
      },
      sessionId: null,
      requestSequence: 1,
      originalFormat: "gemini",
      providerType: null,
      originalModelName: null,
      originalUrlPathname: null,
      providerChain: [],
      cacheTtlResolved: null,
      context1mApplied: false,
      specialSettings: [],
      cachedPriceData: undefined,
      cachedBillingModelSource: undefined,
      endpointPolicy: resolveEndpointPolicy("/v1/chat/completions"),
      isHeaderModified: () => false,
    };

    // Prototype-only instance: gets ProxySession's methods while skipping its constructor.
    const session = Object.create(ProxySession.prototype);
    Object.assign(session, fields);
    return session as ProxySession;
  }
  /**
   * Starts an HTTP server on an ephemeral 127.0.0.1 port to act as the fake upstream.
   *
   * @param handler - Passed straight to `createServer`.
   * NOTE(review): `Parameters<typeof createServer>[0]` is resolved against one overload of
   * `createServer`; confirm it yields the request-listener type under this project's typings.
   * @returns `baseUrl` of the listening server and a `close` function that destroys every
   * tracked socket before closing the server, so connections left open by a handler do not
   * keep it alive.
   */
  async function startSseServer(handler: Parameters<typeof createServer>[0]): Promise<{
    baseUrl: string;
    close: () => Promise<void>;
  }> {
    const server = createServer(handler);

    // Track live sockets so close() can tear them down explicitly.
    const openSockets = new Set<Socket>();
    server.on("connection", (socket) => {
      openSockets.add(socket);
      socket.on("close", () => {
        openSockets.delete(socket);
      });
    });

    const baseUrl = await new Promise<string>((resolve, reject) => {
      server.once("error", reject);
      server.listen(0, "127.0.0.1", () => {
        const address = server.address();
        if (address && typeof address === "object") {
          resolve(`http://127.0.0.1:${address.port}`);
        } else {
          reject(new Error("Failed to get server address"));
        }
      });
    });

    const close = async (): Promise<void> => {
      openSockets.forEach((socket) => {
        try {
          socket.destroy();
        } catch {
          // ignore
        }
      });
      openSockets.clear();
      await new Promise<void>((resolve) => {
        server.close(() => resolve());
      });
    };

    return { baseUrl, close };
  }
  /**
   * Races one `reader.read()` against a timer so a hung stream cannot stall a test.
   *
   * @param reader - Reader to pull a single chunk from.
   * @param timeoutMs - How long to wait for the read to settle.
   * @returns `{ ok: true, value }` when the read resolved, `{ ok: true, error }` when it
   * rejected (a rejection still means the read finished), or
   * `{ ok: false, reason: "timeout" }` when the read had not settled after `timeoutMs`.
   * On timeout the pending read is left outstanding; the caller decides how to abort it.
   */
  async function readWithTimeout(
    reader: ReadableStreamDefaultReader<Uint8Array>,
    timeoutMs: number
  ): Promise<
    | { ok: true; value: ReadableStreamReadResult<Uint8Array> }
    | { ok: true; error: unknown }
    | { ok: false; reason: "timeout" }
  > {
    let timer: ReturnType<typeof setTimeout> | undefined;

    const pendingRead = reader
      .read()
      .then((value) => ({ ok: true as const, value }))
      .catch((error: unknown) => ({ ok: true as const, error }));

    const deadline = new Promise<{ ok: false; reason: "timeout" }>((resolve) => {
      timer = setTimeout(() => resolve({ ok: false as const, reason: "timeout" }), timeoutMs);
    });

    try {
      return await Promise.race([pendingRead, deadline]);
    } finally {
      // Disarm the timer once the race is decided; otherwise every call that wins the
      // race leaves a timer pending for up to timeoutMs.
      clearTimeout(timer);
    }
  }
  /**
   * End-to-end checks of the Gemini streaming passthrough against a local fake upstream:
   * first-byte timeout, clearing of that timeout after the first chunk, streaming idle
   * timeout, and session-provider cleanup after a client abort.
   */
  describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => {
    /** Response headers every fake upstream in this suite sends before any body data. */
    const sseHeaders = {
      "content-type": "text/event-stream",
      "cache-control": "no-cache",
      connection: "keep-alive",
    };

    /**
     * Calls ProxyForwarder.doForward for the session and passes the upstream response to
     * ProxyResponseHandler.dispatch, returning what dispatch produces for the client.
     */
    async function forwardAndDispatch(session: ProxySession, provider: Provider, baseUrl: string) {
      // doForward is reached through a structural cast, as it is not called via
      // ProxyForwarder's declared type here.
      const { doForward } = ProxyForwarder as unknown as {
        doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown;
      };
      const upstreamResponse = (await doForward.call(
        ProxyForwarder,
        session,
        provider,
        baseUrl
      )) as Response;
      return ProxyResponseHandler.dispatch(session, upstreamResponse);
    }

    /** Aborts the client signal, stops the fake upstream and waits for registered async tasks. */
    async function teardown(
      clientAbortController: AbortController,
      close: () => Promise<void>
    ): Promise<void> {
      clientAbortController.abort(new Error("test_cleanup"));
      await close();
      await Promise.allSettled(asyncTasks);
    }

    test("不应在仅收到 headers 时清除首字节超时:无首块数据时应在窗口内中断避免悬挂", async () => {
      asyncTasks.length = 0;
      const { baseUrl, close } = await startSseServer((_req, res) => {
        res.writeHead(200, sseHeaders);
        res.flushHeaders();
        // Send no body at all and keep the connection open.
      });
      const clientAbortController = new AbortController();
      try {
        const provider = createProvider({
          url: baseUrl,
          firstByteTimeoutStreamingMs: 200,
        });
        const session = createSession({
          clientAbortSignal: clientAbortController.signal,
          messageId: 1,
          userId: 1,
        });
        session.setProvider(provider);

        const clientResponse = await forwardAndDispatch(session, provider, baseUrl);
        const reader = clientResponse.body?.getReader();
        expect(reader).toBeTruthy();
        if (!reader) throw new Error("Missing body reader");

        const startedAt = Date.now();
        const firstRead = await readWithTimeout(reader, 1500);
        if (!firstRead.ok) {
          clientAbortController.abort(new Error("test_timeout"));
          throw new Error("首字节超时未生效:读首块数据在 1.5s 内仍未返回(可能仍会卡死)");
        }

        // The read must have ended because of the timeout/abort (done=true or a thrown error).
        const ended =
          ("value" in firstRead && firstRead.value.done === true) || "error" in firstRead;
        expect(ended).toBe(true);

        // responseController must have been aborted, i.e. the first-byte timeout fired.
        const sessionWithController = session as unknown as {
          responseController?: AbortController;
        };
        expect(sessionWithController.responseController?.signal.aborted).toBe(true);

        // Coarse timing check: it must not return immediately, otherwise an unrelated
        // early exit could make this test pass for the wrong reason.
        const elapsed = Date.now() - startedAt;
        expect(elapsed).toBeGreaterThanOrEqual(120);
      } finally {
        await teardown(clientAbortController, close);
      }
    });

    test("收到首块数据后应清除首字节超时:后续 chunk 即使晚于 firstByteTimeout 也不应被误中断", async () => {
      asyncTasks.length = 0;
      const { baseUrl, close } = await startSseServer((_req, res) => {
        res.writeHead(200, sseHeaders);
        res.flushHeaders();
        res.write('data: {"x":1}\n\n');
        setTimeout(() => {
          try {
            res.write('data: {"x":2}\n\n');
            res.end();
          } catch {
            // ignore
          }
        }, 150);
      });
      const clientAbortController = new AbortController();
      try {
        const provider = createProvider({
          url: baseUrl,
          firstByteTimeoutStreamingMs: 100,
          streamingIdleTimeoutMs: 0,
        });
        const session = createSession({
          clientAbortSignal: clientAbortController.signal,
          messageId: 2,
          userId: 1,
        });
        session.setProvider(provider);

        const clientResponse = await forwardAndDispatch(session, provider, baseUrl);

        // Guard against a hang; the guard timer is cleared once the race is decided so it
        // does not stay armed after the test.
        let guardTimer: ReturnType<typeof setTimeout> | undefined;
        const fullText = await Promise.race([
          clientResponse.text(),
          new Promise<"timeout">((resolve) => {
            guardTimer = setTimeout(() => resolve("timeout"), 1500);
          }),
        ]).finally(() => clearTimeout(guardTimer));
        if (fullText === "timeout") {
          clientAbortController.abort(new Error("test_timeout"));
          throw new Error("读取透传响应超时(可能仍会卡死)");
        }

        // The second chunk is sent at 150 ms. If the first-byte timeout were not cleared,
        // the stream would be aborted at about 100 ms and the second chunk never arrive.
        expect(fullText).toContain('"x":2');
      } finally {
        await teardown(clientAbortController, close);
      }
    });

    test("中途静默超过 streamingIdleTimeoutMs 时应中断,避免 200 跑到一半卡死", async () => {
      asyncTasks.length = 0;
      const { baseUrl, close } = await startSseServer((_req, res) => {
        res.writeHead(200, sseHeaders);
        res.flushHeaders();
        res.write('data: {"x":1}\n\n');
        // Send nothing further and never end the connection.
      });
      const clientAbortController = new AbortController();
      try {
        const provider = createProvider({
          url: baseUrl,
          firstByteTimeoutStreamingMs: 1000,
          streamingIdleTimeoutMs: 120,
        });
        const session = createSession({
          clientAbortSignal: clientAbortController.signal,
          messageId: 3,
          userId: 1,
        });
        session.setProvider(provider);

        const clientResponse = await forwardAndDispatch(session, provider, baseUrl);
        const reader = clientResponse.body?.getReader();
        expect(reader).toBeTruthy();
        if (!reader) throw new Error("Missing body reader");

        const first = await readWithTimeout(reader, 1000);
        expect(first.ok).toBe(true);
        if (!("value" in first)) {
          throw new Error("首块数据读取异常:预期拿到 value,但得到 error");
        }
        expect(first.value.done).toBe(false);

        // Once the idle timeout fires, the next read must finish within a reasonable
        // time (done=true or a thrown error are both acceptable).
        const second = await readWithTimeout(reader, 1500);
        if (!second.ok) {
          clientAbortController.abort(new Error("test_timeout"));
          throw new Error("流式静默超时未生效:读后续数据在 1.5s 内仍未返回(可能仍会卡死)");
        }
      } finally {
        await teardown(clientAbortController, close);
      }
    });

    test("客户端中断流式透传后应清理 session provider 绑定,避免下次继续复用旧供应商", async () => {
      asyncTasks.length = 0;
      const { baseUrl, close } = await startSseServer((_req, res) => {
        res.writeHead(200, sseHeaders);
        res.flushHeaders();
        res.write('data: {"x":1}\n\n');
        setTimeout(() => {
          try {
            res.write('data: {"x":2}\n\n');
          } catch {
            // ignore
          }
        }, 1000);
      });
      const clientAbortController = new AbortController();
      vi.mocked(SessionManager.clearSessionProvider).mockResolvedValue(undefined);
      try {
        const provider = createProvider({
          url: baseUrl,
          firstByteTimeoutStreamingMs: 1000,
          streamingIdleTimeoutMs: 0,
        });
        const session = createSession({
          clientAbortSignal: clientAbortController.signal,
          messageId: 4,
          userId: 1,
        });
        session.setProvider(provider);
        session.setSessionId("gemini-abort-session");

        const clientResponse = await forwardAndDispatch(session, provider, baseUrl);
        const reader = clientResponse.body?.getReader();
        expect(reader).toBeTruthy();
        if (!reader) throw new Error("Missing body reader");

        const first = await reader.read();
        expect(first.done).toBe(false);

        // Abort as the client after the first chunk, then wait for the registered
        // background tasks before checking the cleanup call.
        clientAbortController.abort(new Error("client_cancelled"));
        await Promise.allSettled(asyncTasks);
        expect(vi.mocked(SessionManager.clearSessionProvider)).toHaveBeenCalledWith(
          "gemini-abort-session"
        );
      } finally {
        await teardown(clientAbortController, close);
      }
    });
  });