proxy-forwarder-nonok-body-hang.test.ts 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. import { createServer } from "node:http";
  2. import type { Socket } from "node:net";
  3. import { describe, expect, test, vi } from "vitest";
  4. import { ProxyForwarder } from "@/app/v1/_lib/proxy/forwarder";
  5. import { ProxyError } from "@/app/v1/_lib/proxy/errors";
  6. import { ProxySession } from "@/app/v1/_lib/proxy/session";
  7. import type { Provider } from "@/types/provider";
  // Created through vi.hoisted so the vi.mock factory for "@/lib/config" can reference it.
  const mocks = vi.hoisted(() => ({
    // Async stub that always resolves to false.
    isHttp2Enabled: vi.fn(async () => false),
  }));
  // Keep every real export of "@/lib/config" and swap in only the hoisted isHttp2Enabled stub.
  vi.mock("@/lib/config", async (importOriginal) => ({
    ...(await importOriginal<typeof import("@/lib/config")>()),
    isHttp2Enabled: mocks.isHttp2Enabled,
  }));
  // Replace the logger module with an object holding one independent spy per listed level.
  vi.mock("@/lib/logger", () => {
    // Declared inside the factory so the factory does not depend on module-level bindings.
    const levels = ["debug", "info", "warn", "trace", "error", "fatal"] as const;
    return {
      logger: Object.fromEntries(levels.map((level) => [level, vi.fn()])),
    };
  });
  /**
   * Builds a complete Provider fixture for tests.
   *
   * @param overrides - Fields that replace the defaults below; applied last, so they always win.
   * @returns A new Provider object made of the defaults merged with `overrides`.
   */
  function createProvider(overrides: Partial<Provider> = {}): Provider {
    const defaults: Provider = {
      // Identity and endpoint
      id: 1,
      name: "p1",
      url: "http://127.0.0.1:1",
      key: "k",
      providerVendorId: null,
      isEnabled: true,

      // Selection
      weight: 1,
      priority: 0,
      groupPriorities: null,
      costMultiplier: 1,
      groupTag: null,
      providerType: "openai-compatible",
      preserveClientIp: false,
      modelRedirects: null,
      allowedModels: null,
      mcpPassthroughType: "none",
      mcpPassthroughUrl: null,

      // Spending limits
      limit5hUsd: null,
      limitDailyUsd: null,
      dailyResetMode: "fixed",
      dailyResetTime: "00:00",
      limitWeeklyUsd: null,
      limitMonthlyUsd: null,
      limitTotalUsd: null,
      totalCostResetAt: null,
      limitConcurrentSessions: 0,

      // Retry and circuit breaker
      maxRetryAttempts: null,
      circuitBreakerFailureThreshold: 5,
      circuitBreakerOpenDuration: 1_800_000,
      circuitBreakerHalfOpenSuccessThreshold: 2,

      // Outbound proxy
      proxyUrl: null,
      proxyFallbackToDirect: false,

      // Timeouts (milliseconds)
      firstByteTimeoutStreamingMs: 30_000,
      streamingIdleTimeoutMs: 10_000,
      requestTimeoutNonStreamingMs: 1_000,

      // Presentation
      websiteUrl: null,
      faviconUrl: null,

      // Per-provider preferences
      cacheTtlPreference: null,
      context1mPreference: null,
      codexReasoningEffortPreference: null,
      codexReasoningSummaryPreference: null,
      codexTextVerbosityPreference: null,
      codexParallelToolCallsPreference: null,
      anthropicMaxTokensPreference: null,
      anthropicThinkingBudgetPreference: null,
      geminiGoogleSearchPreference: null,

      // Rate counters
      tpm: 0,
      rpm: 0,
      rpd: 0,
      cc: 0,

      // Timestamps
      createdAt: new Date(),
      updatedAt: new Date(),
      deletedAt: null,
    };

    return { ...defaults, ...overrides };
  }
  /**
   * Builds a ProxySession fixture without running the ProxySession constructor.
   *
   * The object is created from `ProxySession.prototype`, so prototype methods are
   * available, and its own fields are filled in by hand.
   *
   * @param params - Optional settings; `clientAbortSignal` is stored on the session (null when omitted).
   * @returns The hand-assembled session, typed as ProxySession.
   */
  function createSession(params?: { clientAbortSignal?: AbortSignal | null }): ProxySession {
    const requestHeaders = new Headers();
    const modelName = "gpt-5.2";

    const fields = {
      startTime: Date.now(),
      method: "POST",
      requestUrl: new URL("https://example.com/v1/chat/completions"),
      headers: requestHeaders,
      originalHeaders: new Headers(requestHeaders),
      headerLog: JSON.stringify(Object.fromEntries(requestHeaders.entries())),
      request: {
        model: modelName,
        log: "(test)",
        message: {
          model: modelName,
          messages: [{ role: "user", content: "hi" }],
        },
      },
      userAgent: null,
      context: null,
      clientAbortSignal: params?.clientAbortSignal ?? null,
      userName: "test-user",
      authState: { success: true, user: null, key: null, apiKey: null },
      provider: null,
      messageContext: null,
      sessionId: null,
      requestSequence: 1,
      originalFormat: "claude",
      providerType: null,
      originalModelName: null,
      originalUrlPathname: null,
      providerChain: [],
      cacheTtlResolved: null,
      context1mApplied: false,
      specialSettings: [],
      cachedPriceData: undefined,
      cachedBillingModelSource: undefined,
      // Installed as an own property, so it takes precedence over any prototype member of that name.
      isHeaderModified: () => false,
    };

    return Object.assign(Object.create(ProxySession.prototype), fields) as ProxySession;
  }
  /**
   * Starts a loopback HTTP server that imitates a misbehaving upstream.
   *
   * Every request is answered with status 403 and a JSON chunk, but the response is
   * never ended, so a client that waits for the full body (for example via
   * `response.text()`) would wait forever.
   *
   * @returns `baseUrl` of the listening server and a `close` function that destroys
   *          all open sockets and then stops the server.
   * @throws Error("Failed to get server address") if the bound address is not an address object.
   */
  async function startServer(): Promise<{ baseUrl: string; close: () => Promise<void> }> {
    const liveSockets = new Set<Socket>();

    const server = createServer((req, res) => {
      // Send headers and a first chunk, then deliberately never call res.end().
      res.writeHead(403, { "content-type": "application/json" });
      res.write(JSON.stringify({ error: { message: "forbidden" } }));

      // When the request is aborted or closed, tear the response down so no hanging
      // connection lingers in the test process (reduces flakiness).
      const destroyResponse = (): void => {
        try {
          res.destroy();
        } catch {
          // best effort
        }
      };
      req.on("aborted", destroyResponse);
      req.on("close", destroyResponse);
    });

    // Track every open socket so close() can destroy them explicitly.
    server.on("connection", (socket) => {
      liveSockets.add(socket);
      socket.on("close", () => liveSockets.delete(socket));
    });

    // Port 0 lets the OS pick a free port.
    await new Promise<void>((resolve, reject) => {
      server.once("error", reject);
      server.listen(0, "127.0.0.1", () => resolve());
    });

    const address = server.address();
    if (!address || typeof address === "string") {
      throw new Error("Failed to get server address");
    }
    const baseUrl = `http://127.0.0.1:${address.port}`;

    const close = async (): Promise<void> => {
      // server.close() only stops accepting new connections; destroy the existing
      // sockets first so shutdown cannot hang on the never-ending responses.
      liveSockets.forEach((socket) => {
        try {
          socket.destroy();
        } catch {
          // best effort
        }
      });
      liveSockets.clear();
      await new Promise<void>((resolve) => server.close(() => resolve()));
    };

    return { baseUrl, close };
  }
  describe("ProxyForwarder - non-ok response body hang", () => {
    /** Upper bound for a single doForward call before the test declares it hung. */
    const FORWARD_DEADLINE_MS = 2_000;

    /** How a watched promise ended: fulfilled, rejected, or still pending at the deadline. */
    type ForwardOutcome =
      | { type: "resolved" }
      | { type: "rejected"; error: unknown }
      | { type: "timeout" };

    /**
     * Waits for `pending` to settle, but for at most `deadlineMs` milliseconds.
     *
     * Never rejects: a rejection of `pending` is reported as `{ type: "rejected", error }`.
     * The deadline timer is cleared as soon as the race is decided, so no timer
     * stays pending after the test has finished.
     *
     * @param pending - The promise to watch.
     * @param deadlineMs - Maximum time to wait, in milliseconds.
     * @returns The outcome of the race.
     */
    async function settleWithinDeadline(
      pending: Promise<unknown>,
      deadlineMs: number
    ): Promise<ForwardOutcome> {
      let timer: ReturnType<typeof setTimeout> | undefined;
      const deadline = new Promise<ForwardOutcome>((resolve) => {
        timer = setTimeout(() => resolve({ type: "timeout" }), deadlineMs);
      });
      const settled = pending.then(
        (): ForwardOutcome => ({ type: "resolved" }),
        (error: unknown): ForwardOutcome => ({ type: "rejected", error })
      );
      try {
        return await Promise.race([settled, deadline]);
      } finally {
        clearTimeout(timer);
      }
    }

    /**
     * Shared scenario: forward one request to an upstream that answers 403 and never
     * finishes its body, then assert that doForward rejects with a ProxyError carrying
     * status 403 instead of hanging.
     *
     * @param providerOverrides - Extra provider fields layered on top of the scenario
     *        defaults (`url` of the test server, 200 ms non-streaming request timeout).
     * @param hangMessage - Error message thrown when doForward has not settled within the deadline.
     */
    async function expectForwardRejectsWith403(
      providerOverrides: Partial<Provider>,
      hangMessage: string
    ): Promise<void> {
      const { baseUrl, close } = await startServer();
      const clientAbortController = new AbortController();
      try {
        const provider = createProvider({
          url: baseUrl,
          requestTimeoutNonStreamingMs: 200,
          ...providerOverrides,
        });
        const session = createSession({ clientAbortSignal: clientAbortController.signal });
        session.setProvider(provider);

        // Call doForward directly to isolate a single forwarding attempt from the
        // retry / provider-switching logic in send(). The cast only exists to reach
        // the method from the test.
        const doForward = (
          ProxyForwarder as unknown as {
            doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown;
          }
        ).doForward;
        const forwardPromise = doForward.call(
          ProxyForwarder,
          session,
          provider,
          baseUrl
        ) as Promise<Response>;

        const outcome = await settleWithinDeadline(forwardPromise, FORWARD_DEADLINE_MS);
        if (outcome.type === "timeout") {
          // Safety net: abort the in-flight request so a regression cannot hang the whole suite.
          clientAbortController.abort(new Error("test_timeout"));
          throw new Error(hangMessage);
        }

        expect(outcome.type).toBe("rejected");
        const error = outcome.type === "rejected" ? outcome.error : null;
        expect(error).toBeInstanceOf(ProxyError);
        expect((error as ProxyError).statusCode).toBe(403);
      } finally {
        await close();
      }
    }

    test("HTTP 4xx/5xx 在 body 不结束时也应被超时中断,避免请求悬挂", async () => {
      await expectForwardRejectsWith403(
        {},
        "doForward 超时未返回:可能存在非 ok 响应体读取悬挂问题"
      );
    });

    test("代理失败降级到直连后也必须恢复 response timeout,避免非 ok 响应体读取悬挂", async () => {
      await expectForwardRejectsWith403(
        {
          proxyUrl: "http://127.0.0.1:1", // unusable proxy, meant to trigger the direct-connection fallback
          proxyFallbackToDirect: true,
        },
        "doForward 超时未返回:可能存在代理降级后 response timeout 未恢复的问题"
      );
    });
  });