response-handler-endpoint-circuit-isolation.test.ts 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. /**
  2. * Tests for endpoint circuit breaker isolation in response-handler.ts
  3. *
  4. * Verifies that key-level errors (fake 200, non-200 HTTP, stream abort) do NOT
  5. * call recordEndpointFailure. Only forwarder-level failures (timeout, network
  6. * error) and probe failures should penalize the endpoint circuit breaker.
  7. *
  8. * Streaming success DOES call recordEndpointSuccess (regression guard).
  9. */
  10. import { beforeEach, describe, expect, it, vi } from "vitest";
  11. import type { ModelPriceData } from "@/types/model-price";
  12. // Track async tasks for draining
  13. const asyncTasks: Promise<void>[] = [];
  14. vi.mock("@/lib/async-task-manager", () => ({
  15. AsyncTaskManager: {
  16. register: (_taskId: string, promise: Promise<void>) => {
  17. asyncTasks.push(promise);
  18. return new AbortController();
  19. },
  20. cleanup: () => {},
  21. cancel: () => {},
  22. },
  23. }));
  24. vi.mock("@/lib/logger", () => ({
  25. logger: {
  26. debug: () => {},
  27. info: () => {},
  28. warn: () => {},
  29. error: () => {},
  30. trace: () => {},
  31. },
  32. }));
  33. vi.mock("@/lib/price-sync/cloud-price-updater", () => ({
  34. requestCloudPriceTableSync: () => {},
  35. }));
  36. vi.mock("@/repository/model-price", () => ({
  37. findLatestPriceByModel: vi.fn(),
  38. }));
  39. vi.mock("@/repository/system-config", () => ({
  40. getSystemSettings: vi.fn(),
  41. }));
  42. vi.mock("@/repository/message", () => ({
  43. updateMessageRequestCost: vi.fn(),
  44. updateMessageRequestDetails: vi.fn(),
  45. updateMessageRequestDuration: vi.fn(),
  46. }));
  47. vi.mock("@/lib/session-manager", () => ({
  48. SessionManager: {
  49. updateSessionUsage: vi.fn(),
  50. storeSessionResponse: vi.fn(),
  51. extractCodexPromptCacheKey: vi.fn(),
  52. updateSessionWithCodexCacheKey: vi.fn(),
  53. },
  54. }));
  55. vi.mock("@/lib/rate-limit", () => ({
  56. RateLimitService: {
  57. trackCost: vi.fn(),
  58. trackUserDailyCost: vi.fn(),
  59. decrementLeaseBudget: vi.fn(),
  60. },
  61. }));
  62. vi.mock("@/lib/session-tracker", () => ({
  63. SessionTracker: {
  64. refreshSession: vi.fn(),
  65. },
  66. }));
  67. vi.mock("@/lib/proxy-status-tracker", () => ({
  68. ProxyStatusTracker: {
  69. getInstance: () => ({
  70. endRequest: () => {},
  71. }),
  72. },
  73. }));
  74. // Mock circuit breakers with tracked spies (vi.hoisted to avoid TDZ with vi.mock hoisting)
  75. const { mockRecordFailure, mockRecordEndpointFailure, mockRecordEndpointSuccess } = vi.hoisted(
  76. () => ({
  77. mockRecordFailure: vi.fn(),
  78. mockRecordEndpointFailure: vi.fn(),
  79. mockRecordEndpointSuccess: vi.fn(),
  80. })
  81. );
  82. vi.mock("@/lib/circuit-breaker", () => ({
  83. recordFailure: mockRecordFailure,
  84. }));
  85. vi.mock("@/lib/endpoint-circuit-breaker", () => ({
  86. recordEndpointFailure: mockRecordEndpointFailure,
  87. recordEndpointSuccess: mockRecordEndpointSuccess,
  88. resetEndpointCircuit: vi.fn(),
  89. }));
  90. import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler";
  91. import { ProxySession } from "@/app/v1/_lib/proxy/session";
  92. import { setDeferredStreamingFinalization } from "@/app/v1/_lib/proxy/stream-finalization";
  93. import { getSystemSettings } from "@/repository/system-config";
  94. import { findLatestPriceByModel } from "@/repository/model-price";
  95. import { updateMessageRequestDetails, updateMessageRequestDuration } from "@/repository/message";
  96. import { SessionManager } from "@/lib/session-manager";
  97. import { RateLimitService } from "@/lib/rate-limit";
  98. import { SessionTracker } from "@/lib/session-tracker";
  99. const testPriceData: ModelPriceData = {
  100. input_cost_per_token: 0.000003,
  101. output_cost_per_token: 0.000015,
  102. };
  103. function createSession(opts?: { sessionId?: string | null }): ProxySession {
  104. const session = Object.create(ProxySession.prototype) as ProxySession;
  105. const provider = {
  106. id: 1,
  107. name: "test-provider",
  108. providerType: "claude" as const,
  109. baseUrl: "https://api.test.com",
  110. priority: 10,
  111. weight: 1,
  112. costMultiplier: 1,
  113. groupTag: "default",
  114. isEnabled: true,
  115. models: [],
  116. createdAt: new Date(),
  117. updatedAt: new Date(),
  118. streamingIdleTimeoutMs: 0,
  119. dailyResetTime: "00:00",
  120. dailyResetMode: "fixed",
  121. };
  122. const user = { id: 123, name: "test-user", dailyResetTime: "00:00", dailyResetMode: "fixed" };
  123. const key = { id: 456, name: "test-key", dailyResetTime: "00:00", dailyResetMode: "fixed" };
  124. Object.assign(session, {
  125. request: { message: {}, log: "(test)", model: "test-model" },
  126. startTime: Date.now(),
  127. method: "POST",
  128. requestUrl: new URL("http://localhost/v1/messages"),
  129. headers: new Headers(),
  130. headerLog: "",
  131. userAgent: null,
  132. context: {},
  133. clientAbortSignal: null,
  134. userName: "test-user",
  135. authState: { user, key, apiKey: "sk-test", success: true },
  136. provider,
  137. messageContext: {
  138. id: 1,
  139. createdAt: new Date(),
  140. user,
  141. key,
  142. apiKey: "sk-test",
  143. },
  144. sessionId: opts?.sessionId ?? null,
  145. requestSequence: 1,
  146. originalFormat: "claude",
  147. providerType: null,
  148. originalModelName: null,
  149. originalUrlPathname: null,
  150. providerChain: [],
  151. cacheTtlResolved: null,
  152. context1mApplied: false,
  153. specialSettings: [],
  154. cachedPriceData: undefined,
  155. cachedBillingModelSource: undefined,
  156. isHeaderModified: () => false,
  157. getContext1mApplied: () => false,
  158. getOriginalModel: () => "test-model",
  159. getCurrentModel: () => "test-model",
  160. getProviderChain: () => session.providerChain,
  161. getCachedPriceDataByBillingSource: async () => testPriceData,
  162. recordTtfb: () => 100,
  163. ttfbMs: null,
  164. getRequestSequence: () => 1,
  165. addProviderToChain: function (
  166. this: ProxySession & { providerChain: unknown[] },
  167. prov: {
  168. id: number;
  169. name: string;
  170. providerType: string;
  171. priority: number;
  172. weight: number;
  173. costMultiplier: number;
  174. groupTag: string;
  175. providerVendorId?: string;
  176. }
  177. ) {
  178. this.providerChain.push({
  179. id: prov.id,
  180. name: prov.name,
  181. vendorId: prov.providerVendorId,
  182. providerType: prov.providerType,
  183. priority: prov.priority,
  184. weight: prov.weight,
  185. costMultiplier: prov.costMultiplier,
  186. groupTag: prov.groupTag,
  187. timestamp: Date.now(),
  188. });
  189. },
  190. });
  191. // Helper setters
  192. (session as { setOriginalModel(m: string | null): void }).setOriginalModel = function (
  193. m: string | null
  194. ) {
  195. (this as { originalModelName: string | null }).originalModelName = m;
  196. };
  197. (session as { setSessionId(s: string): void }).setSessionId = function (s: string) {
  198. (this as { sessionId: string | null }).sessionId = s;
  199. };
  200. (session as { setProvider(p: unknown): void }).setProvider = function (p: unknown) {
  201. (this as { provider: unknown }).provider = p;
  202. };
  203. (session as { setAuthState(a: unknown): void }).setAuthState = function (a: unknown) {
  204. (this as { authState: unknown }).authState = a;
  205. };
  206. (session as { setMessageContext(c: unknown): void }).setMessageContext = function (c: unknown) {
  207. (this as { messageContext: unknown }).messageContext = c;
  208. };
  209. session.setOriginalModel("test-model");
  210. return session;
  211. }
  212. function setDeferredMeta(session: ProxySession, endpointId: number | null = 42) {
  213. setDeferredStreamingFinalization(session, {
  214. providerId: 1,
  215. providerName: "test-provider",
  216. providerPriority: 10,
  217. attemptNumber: 1,
  218. totalProvidersAttempted: 1,
  219. isFirstAttempt: true,
  220. isFailoverSuccess: false,
  221. endpointId,
  222. endpointUrl: "https://api.test.com",
  223. upstreamStatusCode: 200,
  224. });
  225. }
  226. /** Create an SSE stream that emits a fake-200 error body (valid HTTP 200 but error in content). */
  227. function createFake200StreamResponse(): Response {
  228. const body = `data: ${JSON.stringify({ error: { message: "invalid api key" } })}\n\n`;
  229. const encoder = new TextEncoder();
  230. const stream = new ReadableStream<Uint8Array>({
  231. start(controller) {
  232. controller.enqueue(encoder.encode(body));
  233. controller.close();
  234. },
  235. });
  236. return new Response(stream, {
  237. status: 200,
  238. headers: { "content-type": "text/event-stream" },
  239. });
  240. }
  241. /** Create an SSE stream that returns non-200 HTTP status with error body. */
  242. function createNon200StreamResponse(statusCode: number): Response {
  243. const body = `data: ${JSON.stringify({ error: "rate limit exceeded" })}\n\n`;
  244. const encoder = new TextEncoder();
  245. const stream = new ReadableStream<Uint8Array>({
  246. start(controller) {
  247. controller.enqueue(encoder.encode(body));
  248. controller.close();
  249. },
  250. });
  251. return new Response(stream, {
  252. status: statusCode,
  253. headers: { "content-type": "text/event-stream" },
  254. });
  255. }
  256. /** Create a successful SSE stream with usage data. */
  257. function createSuccessStreamResponse(): Response {
  258. const sseText = `event: message_delta\ndata: ${JSON.stringify({ usage: { input_tokens: 100, output_tokens: 50 } })}\n\n`;
  259. const encoder = new TextEncoder();
  260. const stream = new ReadableStream<Uint8Array>({
  261. start(controller) {
  262. controller.enqueue(encoder.encode(sseText));
  263. controller.close();
  264. },
  265. });
  266. return new Response(stream, {
  267. status: 200,
  268. headers: { "content-type": "text/event-stream" },
  269. });
  270. }
  271. async function drainAsyncTasks(): Promise<void> {
  272. const tasks = asyncTasks.splice(0, asyncTasks.length);
  273. await Promise.all(tasks);
  274. }
  275. function setupCommonMocks() {
  276. vi.mocked(getSystemSettings).mockResolvedValue({
  277. billingModelSource: "original",
  278. streamBufferEnabled: false,
  279. streamBufferMode: "none",
  280. streamBufferSize: 0,
  281. } as ReturnType<typeof getSystemSettings> extends Promise<infer T> ? T : never);
  282. vi.mocked(findLatestPriceByModel).mockResolvedValue({
  283. id: 1,
  284. modelName: "test-model",
  285. priceData: testPriceData,
  286. createdAt: new Date(),
  287. updatedAt: new Date(),
  288. });
  289. vi.mocked(updateMessageRequestDetails).mockResolvedValue(undefined);
  290. vi.mocked(updateMessageRequestDuration).mockResolvedValue(undefined);
  291. vi.mocked(SessionManager.storeSessionResponse).mockResolvedValue(undefined);
  292. vi.mocked(RateLimitService.trackCost).mockResolvedValue(undefined);
  293. vi.mocked(RateLimitService.trackUserDailyCost).mockResolvedValue(undefined);
  294. vi.mocked(RateLimitService.decrementLeaseBudget).mockResolvedValue({
  295. success: true,
  296. newRemaining: 10,
  297. });
  298. vi.mocked(SessionTracker.refreshSession).mockResolvedValue(undefined);
  299. mockRecordFailure.mockResolvedValue(undefined);
  300. mockRecordEndpointFailure.mockResolvedValue(undefined);
  301. mockRecordEndpointSuccess.mockResolvedValue(undefined);
  302. }
  303. beforeEach(() => {
  304. vi.clearAllMocks();
  305. asyncTasks.splice(0, asyncTasks.length);
  306. });
  307. describe("Endpoint circuit breaker isolation", () => {
  308. beforeEach(() => {
  309. setupCommonMocks();
  310. });
  311. it("fake-200 error should call recordFailure but NOT recordEndpointFailure", async () => {
  312. const session = createSession();
  313. setDeferredMeta(session, 42);
  314. const response = createFake200StreamResponse();
  315. await ProxyResponseHandler.dispatch(session, response);
  316. await drainAsyncTasks();
  317. expect(mockRecordFailure).toHaveBeenCalledWith(
  318. 1,
  319. expect.objectContaining({ message: expect.stringContaining("FAKE_200") })
  320. );
  321. expect(mockRecordEndpointFailure).not.toHaveBeenCalled();
  322. });
  323. it("non-200 HTTP status should call recordFailure but NOT recordEndpointFailure", async () => {
  324. const session = createSession();
  325. // Set upstream status to 429 in deferred meta
  326. setDeferredStreamingFinalization(session, {
  327. providerId: 1,
  328. providerName: "test-provider",
  329. providerPriority: 10,
  330. attemptNumber: 1,
  331. totalProvidersAttempted: 1,
  332. isFirstAttempt: true,
  333. isFailoverSuccess: false,
  334. endpointId: 42,
  335. endpointUrl: "https://api.test.com",
  336. upstreamStatusCode: 429,
  337. });
  338. const response = createNon200StreamResponse(429);
  339. await ProxyResponseHandler.dispatch(session, response);
  340. await drainAsyncTasks();
  341. expect(mockRecordFailure).toHaveBeenCalledWith(1, expect.any(Error));
  342. expect(mockRecordEndpointFailure).not.toHaveBeenCalled();
  343. });
  344. it("streaming success DOES call recordEndpointSuccess (regression guard)", async () => {
  345. const session = createSession();
  346. setDeferredMeta(session, 42);
  347. const response = createSuccessStreamResponse();
  348. await ProxyResponseHandler.dispatch(session, response);
  349. await drainAsyncTasks();
  350. expect(mockRecordEndpointSuccess).toHaveBeenCalledWith(42);
  351. expect(mockRecordEndpointFailure).not.toHaveBeenCalled();
  352. });
  353. it("streaming success without endpointId should NOT call any endpoint circuit breaker function", async () => {
  354. const session = createSession();
  355. setDeferredMeta(session, null);
  356. const response = createSuccessStreamResponse();
  357. await ProxyResponseHandler.dispatch(session, response);
  358. await drainAsyncTasks();
  359. expect(mockRecordEndpointSuccess).not.toHaveBeenCalled();
  360. expect(mockRecordEndpointFailure).not.toHaveBeenCalled();
  361. });
  362. });