// circuit-breaker.test.ts
  1. import { afterEach, describe, expect, test, vi } from "vitest";
  /**
   * Circuit state record exchanged with the mocked
   * `@/lib/redis/circuit-breaker-state` module (`loadCircuitState`,
   * `loadAllCircuitStates`, `saveCircuitState`).
   */
  type SavedCircuitState = {
    /** Failure counter; the tests expect 0 once the breaker has been reset. */
    failureCount: number;
    /** Epoch-millisecond timestamp (tests seed it from `Date.now()`), or `null` after a reset. */
    lastFailureTime: number | null;
    /** Current breaker state. */
    circuitState:
      | "closed"
      | "open"
      | "half-open";
    /** Epoch-millisecond timestamp until which the breaker stays open, or `null` when not open. */
    circuitOpenUntil: number | null;
    /** Presumably the successes counted while half-open — TODO confirm against the implementation. */
    halfOpenSuccessCount: number;
  };
  /**
   * Per-provider breaker settings returned by the mocked
   * `loadProviderCircuitConfig` and exposed as `DEFAULT_CIRCUIT_BREAKER_CONFIG`.
   */
  type CircuitBreakerConfig = {
    /** Failure threshold; the tests in this file treat a value of 0 as "breaker disabled". */
    failureThreshold: number;
    /** Open period in milliseconds (the tests expect `circuitOpenUntil === Date.now() + openDuration`). */
    openDuration: number;
    /** Presumably the successes required in half-open before closing — TODO confirm. */
    halfOpenSuccessThreshold: number;
  };
  /**
   * Builds a stand-in logger whose six level methods are independent `vi.fn()` mocks.
   *
   * @returns An object with `debug`, `info`, `warn`, `trace`, `error` and `fatal` mocks.
   */
  function createLoggerMock() {
    const debug = vi.fn();
    const info = vi.fn();
    const warn = vi.fn();
    const trace = vi.fn();
    const error = vi.fn();
    const fatal = vi.fn();
    return { debug, info, warn, trace, error, fatal };
  }
  /**
   * Switches vitest to fake timers and pins the system clock to
   * 2026-01-01T00:00:00.000Z.
   */
  function setupFakeTime(): void {
    const frozenNow = new Date("2026-01-01T00:00:00.000Z");
    vi.useFakeTimers();
    vi.setSystemTime(frozenNow);
  }
  /**
   * Registers `vi.doMock` replacements for the four modules mocked in this file:
   * the logger, the Redis circuit-state store, the circuit-config store and the
   * pub/sub helpers.
   *
   * Any function supplied in `options` is used as-is; every omitted one falls back
   * to a `vi.fn` default (null state, empty map, no-op save/publish, `null` from
   * subscribe, and the default config from the config loader).
   *
   * @param options Optional overrides, grouped per mocked module.
   */
  function setupCircuitBreakerMocks(options?: {
    redis?: {
      loadCircuitState?: (providerId: number) => Promise<SavedCircuitState | null>;
      loadAllCircuitStates?: (providerIds: number[]) => Promise<Map<number, SavedCircuitState>>;
      saveCircuitState?: (providerId: number, state: SavedCircuitState) => Promise<void>;
    };
    config?: {
      defaultConfig?: CircuitBreakerConfig;
      loadProviderCircuitConfig?: (providerId: number) => Promise<CircuitBreakerConfig>;
    };
    pubsub?: {
      publishCacheInvalidation?: (channel: string, message?: string) => Promise<void>;
      subscribeCacheInvalidation?: (
        channel: string,
        callback: (message: string) => void
      ) => Promise<(() => void) | null>;
    };
  }): void {
    const redis = options?.redis;
    const config = options?.config;
    const pubsub = options?.pubsub;

    const defaultConfig: CircuitBreakerConfig = config?.defaultConfig ?? {
      failureThreshold: 5,
      openDuration: 1800000,
      halfOpenSuccessThreshold: 2,
    };

    // Fallback mocks are created inside each factory, i.e. only when the factory runs.
    vi.doMock("@/lib/logger", () => {
      const logger = createLoggerMock();
      return { logger };
    });

    vi.doMock("@/lib/redis/circuit-breaker-state", () => {
      const loadCircuitState = redis?.loadCircuitState ?? vi.fn(async () => null);
      const loadAllCircuitStates = redis?.loadAllCircuitStates ?? vi.fn(async () => new Map());
      const saveCircuitState = redis?.saveCircuitState ?? vi.fn(async () => {});
      return { loadCircuitState, loadAllCircuitStates, saveCircuitState };
    });

    vi.doMock("@/lib/redis/circuit-breaker-config", () => {
      const loadProviderCircuitConfig =
        config?.loadProviderCircuitConfig ?? vi.fn(async () => defaultConfig);
      return {
        DEFAULT_CIRCUIT_BREAKER_CONFIG: defaultConfig,
        loadProviderCircuitConfig,
      };
    });

    vi.doMock("@/lib/redis/pubsub", () => {
      const publishCacheInvalidation = pubsub?.publishCacheInvalidation ?? vi.fn(async () => {});
      const subscribeCacheInvalidation =
        pubsub?.subscribeCacheInvalidation ?? vi.fn(async () => null);
      return { publishCacheInvalidation, subscribeCacheInvalidation };
    });
  }
  // Tests in this file install fake timers via setupFakeTime(); undo that after every test.
  afterEach(function restoreRealTimers(): void {
    vi.useRealTimers();
  });
  71. describe("circuit-breaker", () => {
  // failureThreshold=0 means the breaker is disabled: even when Redis reports OPEN the
  // request must not be blocked, and the state is reset to CLOSED automatically.
  test("failureThreshold=0 时应视为禁用:即便 Redis 为 OPEN 也不应阻止请求,并自动复位为 CLOSED", async () => {
    setupFakeTime();
    vi.resetModules();

    // The clock is pinned by setupFakeTime, so a single Date.now() read serves both timestamps.
    const now = Date.now();
    let persisted: SavedCircuitState | null = {
      failureCount: 10,
      lastFailureTime: now - 1000,
      circuitState: "open",
      circuitOpenUntil: now + 300000,
      halfOpenSuccessCount: 0,
    };
    const loadCircuitState = vi.fn(async () => persisted);
    const saveCircuitState = vi.fn(async (_providerId: number, next: SavedCircuitState) => {
      persisted = next;
    });
    const loadProviderCircuitConfig = vi.fn(async () => ({
      failureThreshold: 0,
      openDuration: 1800000,
      halfOpenSuccessThreshold: 2,
    }));

    setupCircuitBreakerMocks({
      redis: { loadCircuitState, saveCircuitState },
      config: { loadProviderCircuitConfig },
    });

    const { getCircuitState, isCircuitOpen } = await import("@/lib/circuit-breaker");

    const blocked = await isCircuitOpen(1);
    expect(blocked).toBe(false);
    expect(getCircuitState(1)).toBe("closed");

    // Inspect the state passed to the most recent save call.
    const saveCalls = saveCircuitState.mock.calls;
    const lastState = saveCalls[saveCalls.length - 1]?.[1] as SavedCircuitState | undefined;
    expect(lastState?.circuitState).toBe("closed");
    expect(lastState?.failureCount).toBe(0);
    expect(lastState?.lastFailureTime).toBeNull();
    expect(lastState?.circuitOpenUntil).toBeNull();
    expect(lastState?.halfOpenSuccessCount).toBe(0);
  });
  // getAllHealthStatusAsync: with failureThreshold=0 it must force CLOSED and write that back to Redis.
  test("getAllHealthStatusAsync: failureThreshold=0 时应强制返回 CLOSED 并写回 Redis", async () => {
    setupFakeTime();
    vi.resetModules();

    const openState: SavedCircuitState = {
      failureCount: 10,
      lastFailureTime: Date.now() - 1000,
      circuitState: "open",
      circuitOpenUntil: Date.now() + 300000,
      halfOpenSuccessCount: 0,
    };

    // Keep the last written state on a holder object rather than in a `let` initialised
    // to null: the only assignment happens inside the mock callback, so TypeScript would
    // keep the variable narrowed to `null` here and reject the property reads below.
    const written: { state: SavedCircuitState | null } = { state: null };
    const saveStateMock = vi.fn(async (_providerId: number, state: SavedCircuitState) => {
      written.state = state;
    });

    setupCircuitBreakerMocks({
      redis: {
        loadCircuitState: vi.fn(async () => null),
        loadAllCircuitStates: vi.fn(async () => new Map([[1, openState]])),
        saveCircuitState: saveStateMock,
      },
      config: {
        loadProviderCircuitConfig: vi.fn(async () => ({
          failureThreshold: 0,
          openDuration: 1800000,
          halfOpenSuccessThreshold: 2,
        })),
      },
    });

    const { getAllHealthStatusAsync } = await import("@/lib/circuit-breaker");
    const status = await getAllHealthStatusAsync([1], { forceRefresh: true });

    expect(status[1]?.circuitState).toBe("closed");
    expect(written.state?.circuitState).toBe("closed");
    expect(written.state?.failureCount).toBe(0);
    expect(written.state?.lastFailureTime).toBeNull();
    expect(written.state?.circuitOpenUntil).toBeNull();
    expect(written.state?.halfOpenSuccessCount).toBe(0);
  });
  // getAllHealthStatusAsync: when Redis holds no state, any non-CLOSED in-memory state
  // must be cleared (so no stale value is shown or used for filtering).
  test("getAllHealthStatusAsync: Redis 无状态时应清理内存中的非 CLOSED 状态(避免展示/筛选残留)", async () => {
    setupFakeTime();
    vi.resetModules();

    const openState: SavedCircuitState = {
      failureCount: 10,
      lastFailureTime: Date.now() - 1000,
      circuitState: "open",
      circuitOpenUntil: Date.now() + 300000,
      halfOpenSuccessCount: 0,
    };

    // First bulk load reports provider 1 as OPEN; every later load returns an empty map.
    const loadAllCircuitStates = vi
      .fn(async () => new Map())
      .mockResolvedValueOnce(new Map([[1, openState]]));
    const providerConfig = vi.fn(async () => ({
      failureThreshold: 5,
      openDuration: 1800000,
      halfOpenSuccessThreshold: 2,
    }));

    setupCircuitBreakerMocks({
      redis: {
        loadCircuitState: vi.fn(async () => null),
        loadAllCircuitStates,
        saveCircuitState: vi.fn(async () => {}),
      },
      config: { loadProviderCircuitConfig: providerConfig },
    });

    const { getAllHealthStatusAsync, getCircuitState } = await import("@/lib/circuit-breaker");

    const beforeClear = await getAllHealthStatusAsync([1], { forceRefresh: true });
    expect(beforeClear[1]?.circuitState).toBe("open");
    expect(getCircuitState(1)).toBe("open");

    const afterClear = await getAllHealthStatusAsync([1], { forceRefresh: true });
    expect(afterClear[1]?.circuitState).toBe("closed");
    expect(getCircuitState(1)).toBe("closed");
  });
  // recordFailure: when the breaker is already OPEN, circuitOpenUntil must not be reset
  // (otherwise each new failure would extend the open period).
  test("recordFailure: 已处于 OPEN 时不应重置 circuitOpenUntil(避免延长熔断时间)", async () => {
    setupFakeTime();
    vi.resetModules();

    // The persisted state lives on a holder object: a `let … = null` that is only
    // reassigned inside the save mock would stay narrowed to `null` for the type
    // checker, making the property reads in the assertions below a compile error.
    const persisted: { state: SavedCircuitState | null } = { state: null };
    const loadStateMock = vi.fn(async () => persisted.state);
    const saveStateMock = vi.fn(async (_providerId: number, state: SavedCircuitState) => {
      persisted.state = state;
    });

    // Modules mocked for the alert that is asserted at the end of this test.
    const sendAlertMock = vi.fn(async () => {});
    vi.doMock("@/lib/notification/notifier", () => ({
      sendCircuitBreakerAlert: sendAlertMock,
    }));
    vi.doMock("@/drizzle/schema", () => ({
      providers: { id: "id", name: "name" },
    }));
    vi.doMock("drizzle-orm", () => ({ eq: vi.fn(() => ({})) }));
    vi.doMock("@/drizzle/db", () => ({
      db: {
        select: vi.fn(() => ({
          from: vi.fn(() => ({
            where: vi.fn(() => ({
              limit: vi.fn(async () => [{ name: "Test Provider" }]),
            })),
          })),
        })),
      },
    }));

    setupCircuitBreakerMocks({
      redis: {
        loadCircuitState: loadStateMock,
        saveCircuitState: saveStateMock,
      },
      config: {
        loadProviderCircuitConfig: vi.fn(async () => ({
          failureThreshold: 2,
          openDuration: 300000,
          halfOpenSuccessThreshold: 2,
        })),
      },
    });

    const { recordFailure } = await import("@/lib/circuit-breaker");

    // Two failures reach the threshold of 2 and open the breaker.
    await recordFailure(1, new Error("boom"));
    await recordFailure(1, new Error("boom"));
    expect(persisted.state?.circuitState).toBe("open");
    const openUntil = persisted.state?.circuitOpenUntil;
    expect(openUntil).toBe(Date.now() + 300000);

    // A further failure one second later must leave the deadline untouched.
    vi.advanceTimersByTime(1000);
    await recordFailure(1, new Error("boom"));
    expect(persisted.state?.circuitOpenUntil).toBe(openUntil);

    // Once the threshold is reached, recordFailure fires an asynchronous alert
    // (dynamic import, non-blocking). Switch back to real timers so the event loop can
    // advance; otherwise the pending task could leak mocks into later tests.
    vi.useRealTimers();
    await expect.poll(() => sendAlertMock.mock.calls.length, { timeout: 1000 }).toBe(1);
  });
  // When loading the config fails, the default config should be cached so the config
  // store is not queried again on the next call.
  test("配置加载失败时应缓存默认配置,避免重复请求配置存储", async () => {
    setupFakeTime();
    vi.resetModules();

    const fallbackConfig: CircuitBreakerConfig = {
      failureThreshold: 100,
      openDuration: 1800000,
      halfOpenSuccessThreshold: 2,
    };
    // Loader that always rejects.
    const failingLoader = vi.fn(async (): Promise<CircuitBreakerConfig> => {
      throw new Error("redis down");
    });

    setupCircuitBreakerMocks({
      config: {
        defaultConfig: fallbackConfig,
        loadProviderCircuitConfig: failingLoader,
      },
    });

    const { recordFailure } = await import("@/lib/circuit-breaker");
    for (let attempt = 0; attempt < 2; attempt++) {
      await recordFailure(1, new Error("boom"));
    }

    expect(failingLoader).toHaveBeenCalledTimes(1);
  });
  // Concurrent config loads should be merged while one is in flight, so the config
  // store is queried only once.
  test("并发加载配置时应进行 in-flight 合并,避免重复请求配置存储", async () => {
    setupFakeTime();
    vi.resetModules();

    const loadDelayMs = 50;
    // Loader that resolves only after `loadDelayMs` of (fake) time has elapsed.
    const slowLoader = vi.fn(
      async () =>
        await new Promise<CircuitBreakerConfig>((resolve) => {
          setTimeout(() => {
            resolve({
              failureThreshold: 100,
              openDuration: 1800000,
              halfOpenSuccessThreshold: 2,
            });
          }, loadDelayMs);
        })
    );

    setupCircuitBreakerMocks({
      config: { loadProviderCircuitConfig: slowLoader },
    });

    const { recordFailure } = await import("@/lib/circuit-breaker");

    // Start both calls before the loader has had a chance to resolve.
    const pending = [
      recordFailure(1, new Error("boom")),
      recordFailure(1, new Error("boom")),
    ];
    await vi.advanceTimersByTimeAsync(loadDelayMs);
    await Promise.all(pending);

    expect(slowLoader).toHaveBeenCalledTimes(1);
  });
  // After a config-invalidation notification the cached config must be dropped and
  // loaded again (cross-instance consistency).
  test("收到配置失效通知后应清除配置缓存并触发重新加载(跨实例一致性)", async () => {
    setupFakeTime();
    const originalCi = process.env.CI;
    process.env.CI = "false";
    try {
      vi.resetModules();

      // The subscription callback is captured on a holder object. A `let … = null`
      // that is only assigned inside the mock stays narrowed to `null` for the type
      // checker, so invoking it (even with `!`) is a call on `never` and fails to compile.
      const invalidation: { handler: ((message: string) => void) | null } = { handler: null };

      const loadProviderCircuitConfigMock = vi
        .fn()
        .mockResolvedValueOnce({
          failureThreshold: 5,
          openDuration: 1800000,
          halfOpenSuccessThreshold: 2,
        })
        .mockResolvedValueOnce({
          failureThreshold: 0,
          openDuration: 1800000,
          halfOpenSuccessThreshold: 2,
        });
      const subscribeCacheInvalidationMock = vi.fn(
        async (_channel: string, cb: (message: string) => void) => {
          invalidation.handler = cb;
          return () => {};
        }
      );

      setupCircuitBreakerMocks({
        config: {
          loadProviderCircuitConfig: loadProviderCircuitConfigMock,
        },
        pubsub: {
          subscribeCacheInvalidation: subscribeCacheInvalidationMock,
        },
      });

      const { recordFailure } = await import("@/lib/circuit-breaker");

      await recordFailure(1, new Error("boom"));
      expect(loadProviderCircuitConfigMock).toHaveBeenCalledTimes(1);
      expect(subscribeCacheInvalidationMock).toHaveBeenCalledTimes(1);

      // The assertion guarantees a handler at runtime; the optional call satisfies the compiler.
      expect(invalidation.handler).not.toBeNull();
      invalidation.handler?.(JSON.stringify({ providerIds: [1] }));

      await recordFailure(1, new Error("boom"));
      expect(loadProviderCircuitConfigMock).toHaveBeenCalledTimes(2);
    } finally {
      // Restore the CI environment variable exactly as it was before the test.
      if (originalCi === undefined) {
        delete process.env.CI;
      } else {
        process.env.CI = originalCi;
      }
    }
  });
  // If an invalidation arrives while the config is still loading, the load must be
  // retried so that the stale config is not written back into the cache.
  test("失效通知发生在配置加载期间时应重试,避免把旧配置写回缓存", async () => {
    setupFakeTime();
    const originalCi = process.env.CI;
    process.env.CI = "false";
    try {
      vi.resetModules();

      // The subscription callback is captured on a holder object. A `let … = null`
      // that is only assigned inside the mock stays narrowed to `null` for the type
      // checker, so invoking it (even with `!`) is a call on `never` and fails to compile.
      const invalidation: { handler: ((message: string) => void) | null } = { handler: null };

      // Promise whose resolution is driven manually by the test.
      const deferred = <T>() => {
        let resolve!: (value: T) => void;
        const promise = new Promise<T>((res) => {
          resolve = res;
        });
        return { promise, resolve };
      };
      const first = deferred<CircuitBreakerConfig>();
      const second = deferred<CircuitBreakerConfig>();

      const loadProviderCircuitConfigMock = vi
        .fn()
        .mockImplementationOnce(async () => await first.promise)
        .mockImplementationOnce(async () => await second.promise);
      const subscribeCacheInvalidationMock = vi.fn(
        async (_channel: string, cb: (message: string) => void) => {
          invalidation.handler = cb;
          return () => {};
        }
      );

      setupCircuitBreakerMocks({
        config: {
          loadProviderCircuitConfig: loadProviderCircuitConfigMock,
        },
        pubsub: {
          subscribeCacheInvalidation: subscribeCacheInvalidationMock,
        },
      });

      const { getProviderHealthInfo, recordFailure } = await import("@/lib/circuit-breaker");

      const failurePromise = recordFailure(1, new Error("boom"));

      // recordFailure first awaits getOrCreateHealth (which includes the Redis sync);
      // yield a few microtasks here so the config load gets started.
      for (let i = 0; i < 5 && loadProviderCircuitConfigMock.mock.calls.length === 0; i++) {
        await Promise.resolve();
      }
      expect(loadProviderCircuitConfigMock).toHaveBeenCalledTimes(1);

      // Invalidate while the first load is still pending, then let that load finish.
      expect(invalidation.handler).not.toBeNull();
      invalidation.handler?.(JSON.stringify({ providerIds: [1] }));
      first.resolve({
        failureThreshold: 5,
        openDuration: 1800000,
        halfOpenSuccessThreshold: 2,
      });

      // Yield again until the second load has been issued.
      for (let i = 0; i < 5 && loadProviderCircuitConfigMock.mock.calls.length < 2; i++) {
        await Promise.resolve();
      }
      expect(loadProviderCircuitConfigMock).toHaveBeenCalledTimes(2);

      second.resolve({
        failureThreshold: 0,
        openDuration: 1800000,
        halfOpenSuccessThreshold: 2,
      });
      await failurePromise;

      // The second config has failureThreshold 0; the test expects no failure to be counted.
      const { health } = await getProviderHealthInfo(1);
      expect(health.failureCount).toBe(0);
    } finally {
      // Restore the CI environment variable exactly as it was before the test.
      if (originalCi === undefined) {
        delete process.env.CI;
      } else {
        process.env.CI = originalCi;
      }
    }
  });
  409. });