// proxy-forwarder-nonok-body-hang.test.ts (9.2 KB)
  1. import { createServer } from "node:http";
  2. import type { Socket } from "node:net";
  3. import { describe, expect, test, vi } from "vitest";
  4. import { ProxyForwarder } from "@/app/v1/_lib/proxy/forwarder";
  5. import { resolveEndpointPolicy } from "@/app/v1/_lib/proxy/endpoint-policy";
  6. import { ProxyError } from "@/app/v1/_lib/proxy/errors";
  7. import { ProxySession } from "@/app/v1/_lib/proxy/session";
  8. import type { Provider } from "@/types/provider";
  // Mock handles are created through vi.hoisted so they already exist when the
  // vi.mock factories further down run and reference them.
  const mocks = vi.hoisted(() => ({
    // Always reports HTTP/2 as disabled.
    isHttp2Enabled: vi.fn(async () => false),
  }));
  // Partial mock of "@/lib/config": every real export is kept, and only
  // isHttp2Enabled is swapped for the hoisted mock defined above.
  vi.mock("@/lib/config", async (importOriginal) => {
    const realConfig = await importOriginal<typeof import("@/lib/config")>();
    return { ...realConfig, isHttp2Enabled: mocks.isHttp2Enabled };
  });
  // Replaces the logger module with one whose level methods are all mock functions,
  // so nothing is written to the real logger while the tests run.
  vi.mock("@/lib/logger", () => {
    const logger = {
      debug: vi.fn(),
      info: vi.fn(),
      warn: vi.fn(),
      trace: vi.fn(),
      error: vi.fn(),
      fatal: vi.fn(),
    };
    return { logger };
  });
  /**
   * Builds a fully populated `Provider` fixture.
   *
   * @param overrides - Fields that replace the fixture defaults. They are spread
   *   last, so any key present in `overrides` wins over the default below.
   * @returns The default fixture merged with `overrides`.
   */
  function createProvider(overrides: Partial<Provider> = {}): Provider {
    const defaults: Provider = {
      // Identity and routing
      id: 1,
      name: "p1",
      url: "http://127.0.0.1:1",
      key: "k",
      providerVendorId: null,
      isEnabled: true,
      weight: 1,
      priority: 0,
      groupPriorities: null,
      costMultiplier: 1,
      groupTag: null,
      providerType: "openai-compatible",
      preserveClientIp: false,
      modelRedirects: null,
      allowedModels: null,
      mcpPassthroughType: "none",
      mcpPassthroughUrl: null,

      // Spending limits and reset schedule
      limit5hUsd: null,
      limitDailyUsd: null,
      dailyResetMode: "fixed",
      dailyResetTime: "00:00",
      limitWeeklyUsd: null,
      limitMonthlyUsd: null,
      limitTotalUsd: null,
      totalCostResetAt: null,
      limitConcurrentSessions: 0,

      // Retry and circuit breaker
      maxRetryAttempts: null,
      circuitBreakerFailureThreshold: 5,
      circuitBreakerOpenDuration: 1_800_000,
      circuitBreakerHalfOpenSuccessThreshold: 2,

      // Outbound proxy
      proxyUrl: null,
      proxyFallbackToDirect: false,

      // Timeouts
      firstByteTimeoutStreamingMs: 30_000,
      streamingIdleTimeoutMs: 10_000,
      requestTimeoutNonStreamingMs: 1_000,

      // Presentation and per-vendor preferences
      websiteUrl: null,
      faviconUrl: null,
      cacheTtlPreference: null,
      context1mPreference: null,
      codexReasoningEffortPreference: null,
      codexReasoningSummaryPreference: null,
      codexTextVerbosityPreference: null,
      codexParallelToolCallsPreference: null,
      anthropicMaxTokensPreference: null,
      anthropicThinkingBudgetPreference: null,
      geminiGoogleSearchPreference: null,

      // Rate limits
      tpm: 0,
      rpm: 0,
      rpd: 0,
      cc: 0,

      // Bookkeeping timestamps
      createdAt: new Date(),
      updatedAt: new Date(),
      deletedAt: null,
    };

    return { ...defaults, ...overrides };
  }
  /**
   * Creates a `ProxySession` fixture without invoking the `ProxySession` constructor:
   * an object is created on `ProxySession.prototype` and the fields below are
   * copied onto it.
   *
   * @param params - Optional settings. `clientAbortSignal` is stored on the session;
   *   when omitted (or null/undefined) the session gets `null`.
   * @returns The assembled object, typed as `ProxySession`.
   */
  function createSession(params?: { clientAbortSignal?: AbortSignal | null }): ProxySession {
    const headers = new Headers();
    const modelName = "gpt-5.2";

    const fields = {
      startTime: Date.now(),
      method: "POST",
      requestUrl: new URL("https://example.com/v1/chat/completions"),
      headers,
      // Independent copy, so changes to `headers` do not show up here.
      originalHeaders: new Headers(headers),
      headerLog: JSON.stringify(Object.fromEntries(headers.entries())),
      request: {
        model: modelName,
        log: "(test)",
        message: {
          model: modelName,
          messages: [{ role: "user", content: "hi" }],
        },
      },
      userAgent: null,
      context: null,
      clientAbortSignal: params?.clientAbortSignal ?? null,
      userName: "test-user",
      authState: { success: true, user: null, key: null, apiKey: null },
      provider: null,
      messageContext: null,
      sessionId: null,
      requestSequence: 1,
      originalFormat: "claude",
      providerType: null,
      originalModelName: null,
      originalUrlPathname: null,
      providerChain: [],
      cacheTtlResolved: null,
      context1mApplied: false,
      specialSettings: [],
      cachedPriceData: undefined,
      cachedBillingModelSource: undefined,
      endpointPolicy: resolveEndpointPolicy("/v1/chat/completions"),
      isHeaderModified: () => false,
    };

    // Prototype methods are inherited from ProxySession.prototype; the own
    // properties come from `fields`.
    return Object.assign(Object.create(ProxySession.prototype), fields) as ProxySession;
  }
  /**
   * Starts a loopback HTTP server that simulates a misbehaving upstream: every
   * request is answered with status 403 and one JSON chunk, but the response is
   * never ended, so a client that waits for the full body waits indefinitely.
   *
   * @returns `baseUrl` - `http://127.0.0.1:<port>` of the listening server (the port
   *   is picked by the OS); `close` - destroys every tracked socket and resolves once
   *   the server has stopped.
   * @throws Rejects if the server emits `error` before it is listening or if its
   *   bound address cannot be read.
   */
  async function startServer(): Promise<{ baseUrl: string; close: () => Promise<void> }> {
    const sockets = new Set<Socket>();

    const server = createServer((req, res) => {
      // Send the status line and a first body chunk, then deliberately never call res.end().
      res.writeHead(403, { "content-type": "application/json" });
      res.write(JSON.stringify({ error: { message: "forbidden" } }));

      // Once the request is closed, drop the response too so no half-open response
      // lingers in the test process. 'close' alone is enough: the 'aborted' event is
      // deprecated in Node.js in favour of 'close'.
      req.once("close", () => {
        res.destroy();
      });
    });

    // Track live sockets so close() can tear them down explicitly.
    server.on("connection", (socket) => {
      sockets.add(socket);
      socket.on("close", () => sockets.delete(socket));
    });

    const baseUrl = await new Promise<string>((resolve, reject) => {
      server.once("error", reject);
      // Port 0 lets the OS choose a free port.
      server.listen(0, "127.0.0.1", () => {
        const addr = server.address();
        if (!addr || typeof addr === "string") {
          reject(new Error("Failed to get server address"));
          return;
        }
        resolve(`http://127.0.0.1:${addr.port}`);
      });
    });

    const close = async (): Promise<void> => {
      // server.close() only stops accepting new connections; the hanging responses
      // keep their sockets open, so destroy them first or close() would never finish.
      for (const socket of sockets) {
        socket.destroy();
      }
      sockets.clear();
      // Any error handed to the close callback is intentionally ignored (best-effort teardown).
      await new Promise<void>((resolve) => server.close(() => resolve()));
    };

    return { baseUrl, close };
  }
  /** How long (ms) a test waits for doForward to settle before declaring a hang. */
  const FORWARD_WATCHDOG_MS = 2_000;

  /** Result of racing one doForward call against the watchdog timer. */
  type ForwardOutcome =
    | { type: "resolved" }
    | { type: "rejected"; error: unknown }
    | { type: "timeout" };

  /**
   * Calls `ProxyForwarder.doForward` once and reports whether it resolved, rejected,
   * or failed to settle within `FORWARD_WATCHDOG_MS`.
   *
   * doForward is invoked directly (instead of send()) to isolate a single forwarding
   * attempt from send()'s retry / provider-switching logic. It is not reachable through
   * ProxyForwarder's public type, hence the cast.
   *
   * @param session - Session passed to doForward.
   * @param provider - Provider passed to doForward.
   * @param baseUrl - Base URL passed to doForward.
   * @returns The outcome of the race. The watchdog timer is always cleared before
   *   returning, so no timer outlives the test.
   */
  async function raceForwardAgainstWatchdog(
    session: ProxySession,
    provider: Provider,
    baseUrl: string
  ): Promise<ForwardOutcome> {
    const doForward = (
      ProxyForwarder as unknown as {
        doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown;
      }
    ).doForward;
    const forwardPromise = doForward.call(
      ProxyForwarder,
      session,
      provider,
      baseUrl
    ) as Promise<Response>;

    let watchdog: ReturnType<typeof setTimeout> | undefined;
    try {
      return await Promise.race<ForwardOutcome>([
        forwardPromise.then(
          () => ({ type: "resolved" as const }),
          (error: unknown) => ({ type: "rejected" as const, error })
        ),
        new Promise<ForwardOutcome>((resolve) => {
          watchdog = setTimeout(() => resolve({ type: "timeout" as const }), FORWARD_WATCHDOG_MS);
        }),
      ]);
    } finally {
      // Without this the 2 s timer stays pending after the race has already been decided.
      clearTimeout(watchdog);
    }
  }

  /**
   * Shared scenario: forward one request to the never-ending 403 upstream and expect
   * doForward to reject with a `ProxyError` carrying status 403 instead of hanging.
   *
   * @param providerOverrides - Extra provider fields layered on top of the scenario
   *   defaults (`url` of the local server, `requestTimeoutNonStreamingMs: 200`).
   * @param hangMessage - Error message thrown when doForward does not settle in time.
   */
  async function expectForbiddenWithoutHang(
    providerOverrides: Partial<Provider>,
    hangMessage: string
  ): Promise<void> {
    const { baseUrl, close } = await startServer();
    const clientAbortController = new AbortController();
    try {
      const provider = createProvider({
        url: baseUrl,
        requestTimeoutNonStreamingMs: 200,
        ...providerOverrides,
      });
      const session = createSession({ clientAbortSignal: clientAbortController.signal });
      session.setProvider(provider);

      const outcome = await raceForwardAgainstWatchdog(session, provider, baseUrl);

      if (outcome.type === "timeout") {
        // Safety net: abort the in-flight request so a regression cannot hang the whole suite.
        clientAbortController.abort(new Error("test_timeout"));
        throw new Error(hangMessage);
      }

      expect(outcome.type).toBe("rejected");
      const error = outcome.type === "rejected" ? outcome.error : null;
      expect(error).toBeInstanceOf(ProxyError);
      expect((error as ProxyError).statusCode).toBe(403);
    } finally {
      await close();
    }
  }

  describe("ProxyForwarder - non-ok response body hang", () => {
    // A 4xx/5xx response whose body never ends must still be cut off by the timeout.
    test("HTTP 4xx/5xx 在 body 不结束时也应被超时中断,避免请求悬挂", async () => {
      await expectForbiddenWithoutHang(
        {},
        "doForward 超时未返回:可能存在非 ok 响应体读取悬挂问题"
      );
    });

    // After an unusable proxy makes the forwarder fall back to a direct connection,
    // the response timeout must still apply.
    test("代理失败降级到直连后也必须恢复 response timeout,避免非 ok 响应体读取悬挂", async () => {
      await expectForbiddenWithoutHang(
        {
          proxyUrl: "http://127.0.0.1:1", // unusable proxy, triggers fallbackToDirect
          proxyFallbackToDirect: true,
        },
        "doForward 超时未返回:可能存在代理降级后 response timeout 未恢复的问题"
      );
    });
  });