Переглянути джерело

修复:熔断器禁用后仍拦截供应商 (#837)

* 修复:熔断器禁用后仍拦截供应商

- failureThreshold<=0 视为禁用:检测到非 closed 状态时强制复位并写回 Redis
- recordFailure 在 OPEN 状态不再重置 circuitOpenUntil,避免熔断窗口被延长
- 补充单测覆盖 isCircuitOpen / getAllHealthStatusAsync / recordFailure

* 优化:降低熔断配置强制刷新频率

- isCircuitOpen 在非 closed 状态下按最小间隔强刷配置,减少持续 OPEN 时的 Redis 压力
- recordFailure 在已 OPEN 时不再重复拉取配置,仅持久化计数

* 修复:批量刷新时清理 Redis 缺失的残留熔断状态

- getAllHealthStatusAsync 在批量刷新时,如果 Redis 无该 provider 状态,则清理内存中的非 closed 状态
- 补充单测覆盖该清理逻辑,避免健康状态展示/筛选残留

* 重构:抽取禁用熔断器处理并稳定异步告警测试

- 提取 handleDisabledCircuitBreaker/needsHealthResetToClosed,消除多处重复逻辑
- 单测用 expect.poll 等待告警触发,避免忙等待导致的潜在不稳定

* 优化:缓存默认熔断配置并并行预取批量配置

* 优化:批量强刷配置增加并发上限

* 测试:抽取熔断器测试通用 mocks

* 熔断器:60s 强刷 + Pub/Sub 配置失效 + 禁用时主动闭合

* 测试:兼容 CI 环境的配置失效订阅用例

* 优化:Pub/Sub 订阅失败指数退避

---------

Co-authored-by: tesgth032 <[email protected]>
tesgth032 1 місяць тому
батько
коміт
aa2dabd554

+ 49 - 4
src/actions/providers.ts

@@ -12,7 +12,9 @@ import { publishProviderCacheInvalidation } from "@/lib/cache/provider-cache";
 import {
   clearConfigCache,
   clearProviderState,
+  forceCloseCircuitState,
   getAllHealthStatusAsync,
+  publishCircuitBreakerConfigInvalidation,
   resetCircuit,
 } from "@/lib/circuit-breaker";
 import { PROVIDER_GROUP, PROVIDER_TIMEOUT_DEFAULTS } from "@/lib/constants/provider.constants";
@@ -777,9 +779,14 @@ export async function editProvider(
           openDuration: provider.circuitBreakerOpenDuration,
           halfOpenSuccessThreshold: provider.circuitBreakerHalfOpenSuccessThreshold,
         });
-        // 清除内存缓存,强制下次读取最新配置
-        clearConfigCache(providerId);
+        // 清除配置缓存并广播(跨实例立即生效)
+        await publishCircuitBreakerConfigInvalidation(providerId);
         logger.debug("editProvider:config_synced_to_redis", { providerId });
+
+        // 若管理员禁用熔断器(threshold<=0),则应立即解除 OPEN/HALF-OPEN 拦截(跨实例)
+        if (provider.circuitBreakerFailureThreshold <= 0) {
+          await forceCloseCircuitState(providerId, { reason: "circuit_breaker_disabled" });
+        }
       } catch (error) {
         logger.warn("editProvider:redis_sync_failed", {
           providerId,
@@ -1930,7 +1937,6 @@ export async function applyProviderBatchPatch(
       for (const id of effectiveProviderIds) {
         try {
           await deleteProviderCircuitConfig(id);
-          clearConfigCache(id);
         } catch (error) {
           logger.warn("applyProviderBatchPatch:cb_cache_invalidation_failed", {
             providerId: id,
@@ -1938,6 +1944,23 @@ export async function applyProviderBatchPatch(
           });
         }
       }
+
+      // 清除配置缓存并广播(跨实例立即生效)
+      await publishCircuitBreakerConfigInvalidation(effectiveProviderIds);
+
+      // 若本次补丁将熔断器禁用(threshold<=0),则应立即解除 OPEN/HALF-OPEN 拦截(跨实例)
+      const nextFailureThreshold = updatesResult.data.circuit_breaker_failure_threshold;
+      if (typeof nextFailureThreshold === "number" && nextFailureThreshold <= 0) {
+        const batchSize = 20;
+        for (let i = 0; i < effectiveProviderIds.length; i += batchSize) {
+          const batch = effectiveProviderIds.slice(i, i + batchSize);
+          await Promise.all(
+            batch.map((providerId) =>
+              forceCloseCircuitState(providerId, { reason: "circuit_breaker_disabled" })
+            )
+          );
+        }
+      }
     }
 
     const appliedAt = new Date(nowMs).toISOString();
@@ -2055,7 +2078,6 @@ export async function undoProviderPatch(
       for (const providerId of snapshot.providerIds) {
         try {
           await deleteProviderCircuitConfig(providerId);
-          clearConfigCache(providerId);
         } catch (error) {
           logger.warn("undoProviderPatch:cb_cache_invalidation_failed", {
             providerId,
@@ -2063,6 +2085,29 @@ export async function undoProviderPatch(
           });
         }
       }
+
+      // 清除配置缓存并广播(跨实例立即生效)
+      await publishCircuitBreakerConfigInvalidation(snapshot.providerIds);
+
+      // 若撤销后变为禁用(threshold<=0),则应立即解除 OPEN/HALF-OPEN 拦截(跨实例)
+      const disabledProviderIds = snapshot.providerIds.filter((providerId) => {
+        const preimage = snapshot.preimage[providerId];
+        if (!preimage) return false;
+        const nextFailureThreshold = preimage.circuitBreakerFailureThreshold;
+        return typeof nextFailureThreshold === "number" && nextFailureThreshold <= 0;
+      });
+
+      if (disabledProviderIds.length > 0) {
+        const batchSize = 20;
+        for (let i = 0; i < disabledProviderIds.length; i += batchSize) {
+          const batch = disabledProviderIds.slice(i, i + batchSize);
+          await Promise.all(
+            batch.map((providerId) =>
+              forceCloseCircuitState(providerId, { reason: "circuit_breaker_disabled" })
+            )
+          );
+        }
+      }
     }
 
     return {

+ 403 - 45
src/lib/circuit-breaker.ts

@@ -27,6 +27,7 @@ import {
   loadCircuitState,
   saveCircuitState,
 } from "@/lib/redis/circuit-breaker-state";
+import { publishCacheInvalidation, subscribeCacheInvalidation } from "@/lib/redis/pubsub";
 
 // 修复:导出 ProviderHealth 类型,供其他模块使用
 export interface ProviderHealth {
@@ -46,9 +47,215 @@ const healthMap = new Map<number, ProviderHealth>();
 // 配置缓存 TTL(5 分钟)
 const CONFIG_CACHE_TTL = 5 * 60 * 1000;
 
+// 非 closed 状态下,为了及时响应管理员禁用配置,最小间隔强制刷新一次配置(避免每次调用都打 Redis)
+const NON_CLOSED_CONFIG_FORCE_RELOAD_INTERVAL_MS = 60_000;
+
+export const CHANNEL_CIRCUIT_BREAKER_CONFIG_UPDATED = "cch:cache:circuit_breaker_config:updated";
+
+// getAllHealthStatusAsync 中批量强制刷新配置时的并发批大小(避免瞬时放大 Redis/配置存储压力)
+const CONFIG_FORCE_RELOAD_BATCH_SIZE = 20;
+
 // 标记已从 Redis 加载过状态的供应商(避免重复加载)
 const loadedFromRedis = new Set<number>();
 
+// 配置加载去抖:同一 provider 同时只允许一个配置加载任务
+const configLoadInFlight = new Map<number, Promise<CircuitBreakerConfig>>();
+
+// 配置缓存版本号:用于避免“失效事件”被 in-flight 旧结果覆盖
+const configCacheVersion = new Map<number, number>();
+
+let configInvalidationSubscriptionInitialized = false;
+let configInvalidationSubscriptionPromise: Promise<void> | null = null;
+
+function bumpConfigCacheVersion(providerId: number): number {
+  const next = (configCacheVersion.get(providerId) ?? 0) + 1;
+  configCacheVersion.set(providerId, next);
+  return next;
+}
+
+function getConfigCacheVersion(providerId: number): number {
+  return configCacheVersion.get(providerId) ?? 0;
+}
+
+function parseConfigInvalidationProviderIds(message: string): number[] | null {
+  // 兼容:纯数字字符串(做上限保护,避免误把时间戳当作 providerId 导致内存膨胀)
+  const trimmed = message.trim();
+  const asNumber = Number.parseInt(trimmed, 10);
+  if (
+    Number.isFinite(asNumber) &&
+    `${asNumber}` === trimmed &&
+    asNumber > 0 &&
+    asNumber <= 1_000_000_000
+  ) {
+    return [asNumber];
+  }
+
+  try {
+    const parsed = JSON.parse(message) as unknown;
+    if (!parsed || typeof parsed !== "object") return null;
+
+    const obj = parsed as {
+      providerId?: unknown;
+      providerIds?: unknown;
+    };
+
+    if (
+      typeof obj.providerId === "number" &&
+      Number.isFinite(obj.providerId) &&
+      Number.isInteger(obj.providerId) &&
+      obj.providerId > 0 &&
+      obj.providerId <= 1_000_000_000
+    ) {
+      return [obj.providerId];
+    }
+
+    if (Array.isArray(obj.providerIds)) {
+      const ids = obj.providerIds
+        .map((v) => (typeof v === "number" ? v : Number.NaN))
+        .filter((v) => Number.isFinite(v) && Number.isInteger(v) && v > 0 && v <= 1_000_000_000);
+      return ids.length > 0 ? ids : null;
+    }
+
+    return null;
+  } catch {
+    return null;
+  }
+}
+
+async function ensureConfigInvalidationSubscription(): Promise<void> {
+  if (configInvalidationSubscriptionInitialized) return;
+  if (configInvalidationSubscriptionPromise) return configInvalidationSubscriptionPromise;
+
+  configInvalidationSubscriptionPromise = (async () => {
+    // CI/build 阶段跳过,避免订阅超时拖慢检查
+    if (process.env.CI === "true" || process.env.NEXT_PHASE === "phase-production-build") {
+      configInvalidationSubscriptionInitialized = true;
+      return;
+    }
+
+    // Edge runtime 跳过(不支持 ioredis)
+    if (typeof process !== "undefined" && process.env.NEXT_RUNTIME === "edge") {
+      configInvalidationSubscriptionInitialized = true;
+      return;
+    }
+
+    const cleanup = await subscribeCacheInvalidation(
+      CHANNEL_CIRCUIT_BREAKER_CONFIG_UPDATED,
+      (message) => {
+        const ids = parseConfigInvalidationProviderIds(message);
+        if (!ids) return;
+
+        for (const providerId of ids) {
+          clearConfigCache(providerId);
+        }
+
+        logger.debug("[CircuitBreaker] Config cache invalidated via pub/sub", {
+          count: ids.length,
+        });
+      }
+    );
+
+    if (!cleanup) return;
+    configInvalidationSubscriptionInitialized = true;
+  })().finally(() => {
+    configInvalidationSubscriptionPromise = null;
+  });
+
+  return configInvalidationSubscriptionPromise;
+}
+
+async function loadProviderConfigDeduped(providerId: number): Promise<CircuitBreakerConfig> {
+  const existing = configLoadInFlight.get(providerId);
+  if (existing) return existing;
+
+  const promise = loadProviderCircuitConfig(providerId);
+  configLoadInFlight.set(providerId, promise);
+
+  promise.then(
+    () => {
+      if (configLoadInFlight.get(providerId) === promise) {
+        configLoadInFlight.delete(providerId);
+      }
+    },
+    () => {
+      if (configLoadInFlight.get(providerId) === promise) {
+        configLoadInFlight.delete(providerId);
+      }
+    }
+  );
+
+  return promise;
+}
+
+export async function publishCircuitBreakerConfigInvalidation(
+  providerIds: number | number[]
+): Promise<void> {
+  const ids = Array.isArray(providerIds) ? providerIds : [providerIds];
+  if (ids.length === 0) return;
+
+  for (const providerId of ids) {
+    clearConfigCache(providerId);
+  }
+
+  await publishCacheInvalidation(
+    CHANNEL_CIRCUIT_BREAKER_CONFIG_UPDATED,
+    JSON.stringify({ providerIds: ids })
+  );
+  logger.debug("[CircuitBreaker] Published config cache invalidation", { count: ids.length });
+}
+
+function isCircuitBreakerDisabled(config: CircuitBreakerConfig): boolean {
+  return !Number.isFinite(config.failureThreshold) || config.failureThreshold <= 0;
+}
+
+function resetHealthToClosed(health: ProviderHealth): void {
+  health.circuitState = "closed";
+  health.failureCount = 0;
+  health.lastFailureTime = null;
+  health.circuitOpenUntil = null;
+  health.halfOpenSuccessCount = 0;
+}
+
+function isCircuitStateOpen(health: ProviderHealth): boolean {
+  return health.circuitState === "open";
+}
+
+function needsHealthResetToClosed(health: ProviderHealth): boolean {
+  return (
+    health.circuitState !== "closed" ||
+    health.failureCount !== 0 ||
+    health.lastFailureTime !== null ||
+    health.circuitOpenUntil !== null ||
+    health.halfOpenSuccessCount !== 0
+  );
+}
+
+function handleDisabledCircuitBreaker(
+  providerId: number,
+  health: ProviderHealth,
+  config: CircuitBreakerConfig
+): boolean {
+  if (!isCircuitBreakerDisabled(config)) {
+    return false;
+  }
+
+  if (!needsHealthResetToClosed(health)) {
+    return true;
+  }
+
+  const previousState = health.circuitState;
+  resetHealthToClosed(health);
+  logger.info(
+    `[CircuitBreaker] Provider ${providerId} circuit forced closed because circuit breaker is disabled`,
+    {
+      providerId,
+      previousState,
+    }
+  );
+  persistStateToRedis(providerId, health);
+  return true;
+}
+
 /**
  * 获取或创建供应商的健康状态(同步版本,用于内部)
  */
@@ -159,30 +366,67 @@ function persistStateToRedis(providerId: number, health: ProviderHealth): void {
  * 获取供应商的熔断器配置(带缓存)
  * 缓存策略:内存缓存 5 分钟,避免频繁查询 Redis
  */
-async function getProviderConfig(providerId: number): Promise<CircuitBreakerConfig> {
-  const health = await getOrCreateHealth(providerId);
+async function getProviderConfigForHealth(
+  providerId: number,
+  health: ProviderHealth,
+  options?: { forceReload?: boolean }
+): Promise<CircuitBreakerConfig> {
+  // 异步初始化订阅(不阻塞主流程)
+  void ensureConfigInvalidationSubscription();
 
+  const forceReload = options?.forceReload ?? false;
   // 检查内存缓存是否有效
   const now = Date.now();
-  if (health.config && health.configLoadedAt && now - health.configLoadedAt < CONFIG_CACHE_TTL) {
+  if (
+    !forceReload &&
+    health.config &&
+    health.configLoadedAt &&
+    now - health.configLoadedAt < CONFIG_CACHE_TTL
+  ) {
     return health.config;
   }
 
-  // 从 Redis/数据库加载配置
-  try {
-    const config = await loadProviderCircuitConfig(providerId);
-    health.config = config;
-    health.configLoadedAt = now;
-    return config;
-  } catch (error) {
-    logger.warn(
-      `[CircuitBreaker] Failed to load config for provider ${providerId}, using default`,
-      {
-        error: error instanceof Error ? error.message : String(error),
+  // 从 Redis/数据库加载配置(in-flight 合并 + 版本号防止失效竞态)
+  for (let attempt = 0; attempt < 2; attempt++) {
+    const startedAt = Date.now();
+    const versionAtStart = getConfigCacheVersion(providerId);
+
+    try {
+      const config = await loadProviderConfigDeduped(providerId);
+
+      if (getConfigCacheVersion(providerId) !== versionAtStart) {
+        // 失效事件在加载期间发生,重试一次(避免把旧结果写回缓存)
+        if (attempt < 1) continue;
+        return config;
       }
-    );
-    return DEFAULT_CIRCUIT_BREAKER_CONFIG;
+
+      health.config = config;
+      health.configLoadedAt = startedAt;
+      return config;
+    } catch (error) {
+      // 如果加载期间发生失效事件,允许重试一次再降级
+      if (getConfigCacheVersion(providerId) !== versionAtStart && attempt < 1) {
+        continue;
+      }
+
+      logger.warn(
+        `[CircuitBreaker] Failed to load config for provider ${providerId}, using default`,
+        {
+          error: error instanceof Error ? error.message : String(error),
+        }
+      );
+
+      // 缓存默认配置,避免配置读取失败时在高频路径反复打 Redis/数据库
+      health.config = DEFAULT_CIRCUIT_BREAKER_CONFIG;
+      health.configLoadedAt = startedAt;
+      return health.config;
+    }
   }
+
+  // 理论上不应到达这里,兜底返回默认配置
+  health.config = DEFAULT_CIRCUIT_BREAKER_CONFIG;
+  health.configLoadedAt = Date.now();
+  return health.config;
 }
 
 /**
@@ -193,7 +437,7 @@ export async function getProviderHealthInfo(providerId: number): Promise<{
   config: CircuitBreakerConfig;
 }> {
   const health = await getOrCreateHealth(providerId);
-  const config = await getProviderConfig(providerId);
+  const config = await getProviderConfigForHealth(providerId, health);
   return { health, config };
 }
 
@@ -207,9 +451,19 @@ export async function isCircuitOpen(providerId: number): Promise<boolean> {
     return false;
   }
 
+  const now = Date.now();
+  const config = await getProviderConfigForHealth(providerId, health, {
+    forceReload:
+      health.configLoadedAt === null ||
+      now - health.configLoadedAt > NON_CLOSED_CONFIG_FORCE_RELOAD_INTERVAL_MS,
+  });
+  if (handleDisabledCircuitBreaker(providerId, health, config)) {
+    return false;
+  }
+
   if (health.circuitState === "open") {
     // 检查是否可以转为半开状态
-    if (health.circuitOpenUntil && Date.now() > health.circuitOpenUntil) {
+    if (health.circuitOpenUntil && now > health.circuitOpenUntil) {
       health.circuitState = "half-open";
       health.halfOpenSuccessCount = 0;
       logger.info(`[CircuitBreaker] Provider ${providerId} transitioned to half-open`);
@@ -229,7 +483,11 @@ export async function isCircuitOpen(providerId: number): Promise<boolean> {
  */
 export async function recordFailure(providerId: number, error: Error): Promise<void> {
   const health = await getOrCreateHealth(providerId);
-  const config = await getProviderConfig(providerId);
+  const config = await getProviderConfigForHealth(providerId, health);
+
+  if (handleDisabledCircuitBreaker(providerId, health, config)) {
+    return;
+  }
 
   health.failureCount++;
   health.lastFailureTime = Date.now();
@@ -244,35 +502,55 @@ export async function recordFailure(providerId: number, error: Error): Promise<v
     }
   );
 
+  if (health.circuitState === "open") {
+    // 已经 OPEN:不应重复开闸/重置 openUntil;只记录计数并持久化(避免失败风暴下重复拉取配置)
+    persistStateToRedis(providerId, health);
+    return;
+  }
+
   // 检查是否需要打开熔断器
   // failureThreshold = 0 表示禁用熔断器
-  if (config.failureThreshold > 0 && health.failureCount >= config.failureThreshold) {
-    health.circuitState = "open";
-    health.circuitOpenUntil = Date.now() + config.openDuration;
-    health.halfOpenSuccessCount = 0;
+  if (health.failureCount >= config.failureThreshold) {
+    const latestConfig = await getProviderConfigForHealth(providerId, health, {
+      forceReload: true,
+    });
+    if (handleDisabledCircuitBreaker(providerId, health, latestConfig)) {
+      return;
+    }
 
-    const retryAt = new Date(health.circuitOpenUntil).toISOString();
+    if (health.failureCount < latestConfig.failureThreshold) {
+      persistStateToRedis(providerId, health);
+      return;
+    }
 
-    logger.error(
-      `[CircuitBreaker] Provider ${providerId} circuit opened after ${health.failureCount} failures, will retry at ${retryAt}`,
-      {
-        providerId,
-        failureCount: health.failureCount,
-        openDuration: config.openDuration,
-        retryAt,
-      }
-    );
+    if (!isCircuitStateOpen(health)) {
+      health.circuitState = "open";
+      health.circuitOpenUntil = Date.now() + latestConfig.openDuration;
+      health.halfOpenSuccessCount = 0;
+
+      const retryAt = new Date(health.circuitOpenUntil).toISOString();
 
-    // 异步发送熔断器告警(不阻塞主流程)
-    triggerCircuitBreakerAlert(providerId, health.failureCount, retryAt, error.message).catch(
-      (err) => {
-        logger.error({
-          action: "trigger_circuit_breaker_alert_error",
+      logger.error(
+        `[CircuitBreaker] Provider ${providerId} circuit opened after ${health.failureCount} failures, will retry at ${retryAt}`,
+        {
           providerId,
-          error: err instanceof Error ? err.message : String(err),
-        });
-      }
-    );
+          failureCount: health.failureCount,
+          openDuration: latestConfig.openDuration,
+          retryAt,
+        }
+      );
+
+      // 异步发送熔断器告警(不阻塞主流程)
+      triggerCircuitBreakerAlert(providerId, health.failureCount, retryAt, error.message).catch(
+        (err) => {
+          logger.error({
+            action: "trigger_circuit_breaker_alert_error",
+            providerId,
+            error: err instanceof Error ? err.message : String(err),
+          });
+        }
+      );
+    }
   }
 
   // 持久化状态变更到 Redis
@@ -333,9 +611,13 @@ async function triggerCircuitBreakerAlert(
  */
 export async function recordSuccess(providerId: number): Promise<void> {
   const health = await getOrCreateHealth(providerId);
-  const config = await getProviderConfig(providerId);
+  const config = await getProviderConfigForHealth(providerId, health);
   let stateChanged = false;
 
+  if (handleDisabledCircuitBreaker(providerId, health, config)) {
+    return;
+  }
+
   if (health.circuitState === "half-open") {
     // 半开状态下成功
     health.halfOpenSuccessCount++;
@@ -486,8 +768,22 @@ export async function getAllHealthStatusAsync(
         });
       }
 
-      // Mark IDs without Redis state as "loaded" to prevent repeated queries
+      // Mark IDs without Redis state as "loaded" to prevent repeated queries.
+      // If Redis has no state but memory is non-closed, force-reset to avoid stale states.
       for (const id of needsRefresh) {
+        if (!redisStates.has(id)) {
+          const health = healthMap.get(id);
+          if (health && health.circuitState !== "closed") {
+            resetHealthToClosed(health);
+            logger.info(
+              `[CircuitBreaker] Provider ${id} reset to closed (Redis state missing on batch load)`,
+              {
+                providerId: id,
+              }
+            );
+          }
+        }
+
         loadedFromRedis.add(id);
       }
     } catch (error) {
@@ -497,6 +793,23 @@ export async function getAllHealthStatusAsync(
     }
   }
 
+  const nonClosedIds = providerIds.filter((providerId) => {
+    const health = healthMap.get(providerId);
+    return health && health.circuitState !== "closed";
+  });
+  const forcedConfigMap = new Map<number, CircuitBreakerConfig>();
+  for (let i = 0; i < nonClosedIds.length; i += CONFIG_FORCE_RELOAD_BATCH_SIZE) {
+    const batch = nonClosedIds.slice(i, i + CONFIG_FORCE_RELOAD_BATCH_SIZE);
+    await Promise.all(
+      batch.map(async (providerId) => {
+        const health = healthMap.get(providerId);
+        if (!health) return;
+        const config = await getProviderConfigForHealth(providerId, health, { forceReload: true });
+        forcedConfigMap.set(providerId, config);
+      })
+    );
+  }
+
   // Only include status for requested providers (not all in healthMap)
   for (const providerId of providerIds) {
     let health = healthMap.get(providerId);
@@ -515,6 +828,16 @@ export async function getAllHealthStatusAsync(
       healthMap.set(providerId, health);
     }
 
+    if (health.circuitState !== "closed") {
+      const config =
+        forcedConfigMap.get(providerId) ??
+        (await getProviderConfigForHealth(providerId, health, { forceReload: true }));
+      if (handleDisabledCircuitBreaker(providerId, health, config)) {
+        status[providerId] = { ...health };
+        continue;
+      }
+    }
+
     // Check and update expired circuit breaker status
     if (health.circuitState === "open") {
       if (health.circuitOpenUntil && now > health.circuitOpenUntil) {
@@ -560,6 +883,36 @@ export function resetCircuit(providerId: number): void {
   persistStateToRedis(providerId, health);
 }
 
+/**
+ * 强制将熔断器状态关闭并写回 Redis(跨实例立即生效)
+ * 典型使用场景:管理员禁用熔断器配置后,应立即解除 OPEN/HALF-OPEN 拦截。
+ */
+export async function forceCloseCircuitState(
+  providerId: number,
+  options?: { reason?: string }
+): Promise<void> {
+  const health = healthMap.get(providerId);
+  const previousState = health?.circuitState ?? null;
+
+  if (health) {
+    resetHealthToClosed(health);
+  }
+
+  await saveCircuitState(providerId, {
+    failureCount: 0,
+    lastFailureTime: null,
+    circuitState: "closed",
+    circuitOpenUntil: null,
+    halfOpenSuccessCount: 0,
+  });
+
+  logger.info(`[CircuitBreaker] Provider ${providerId} circuit forced closed`, {
+    providerId,
+    previousState,
+    reason: options?.reason,
+  });
+}
+
 /**
  * 将熔断器从 OPEN 状态转换到 HALF_OPEN 状态(用于智能探测)
  * 比直接 resetCircuit 更安全,允许通过 HALF_OPEN 阶段验证恢复
@@ -605,12 +958,15 @@ export function tripToHalfOpen(providerId: number): boolean {
  * 清除供应商的配置缓存(供应商更新后调用)
  */
 export function clearConfigCache(providerId: number): void {
+  bumpConfigCacheVersion(providerId);
+  configLoadInFlight.delete(providerId);
+
   const health = healthMap.get(providerId);
   if (health) {
     health.config = null;
     health.configLoadedAt = null;
-    logger.debug(`[CircuitBreaker] Cleared config cache for provider ${providerId}`);
   }
+  logger.debug(`[CircuitBreaker] Cleared config cache for provider ${providerId}`);
 }
 
 /**
@@ -621,6 +977,8 @@ export async function clearProviderState(providerId: number): Promise<void> {
   // 清除内存状态
   healthMap.delete(providerId);
   loadedFromRedis.delete(providerId);
+  configLoadInFlight.delete(providerId);
+  configCacheVersion.delete(providerId);
 
   // 清除 Redis 状态
   const { deleteCircuitState } = await import("@/lib/redis/circuit-breaker-state");

+ 46 - 1
src/lib/redis/__tests__/pubsub.test.ts

@@ -80,8 +80,10 @@ describe("Redis Pub/Sub cache invalidation", () => {
     expect(base.duplicate).toHaveBeenCalledTimes(1);
     expect(subscriber.subscribe).toHaveBeenCalledWith("test-channel");
 
-    subscriber.emit("message", "test-channel", Date.now().toString());
+    const message = Date.now().toString();
+    subscriber.emit("message", "test-channel", message);
     expect(onInvalidate).toHaveBeenCalledTimes(1);
+    expect(onInvalidate).toHaveBeenCalledWith(message);
 
     cleanup!();
     subscriber.emit("message", "test-channel", Date.now().toString());
@@ -158,6 +160,49 @@ describe("Redis Pub/Sub cache invalidation", () => {
     expect(onInvalidate).not.toHaveBeenCalled();
   });
 
+  test("subscribeCacheInvalidation: should backoff reconnect attempts after connection error", async () => {
+    vi.useFakeTimers();
+    vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
+
+    try {
+      const base = new MockRedis();
+      const subscriber1 = new MockRedis();
+      const subscriber2 = new MockRedis();
+      base.duplicate.mockReturnValueOnce(subscriber1).mockReturnValueOnce(subscriber2);
+      subscriber2.subscribe.mockResolvedValue(1);
+
+      const { getRedisClient } = await import("@/lib/redis/client");
+      (getRedisClient as unknown as ReturnType<typeof vi.fn>).mockReturnValue(base);
+
+      const { subscribeCacheInvalidation } = await import("@/lib/redis/pubsub");
+
+      const cleanup1Promise = subscribeCacheInvalidation("test-channel", vi.fn());
+      subscriber1.emit("error", new Error("Connection refused"));
+
+      const cleanup1 = await cleanup1Promise;
+      expect(cleanup1).toBeNull();
+      expect(base.duplicate).toHaveBeenCalledTimes(1);
+
+      const cleanup2Promise = subscribeCacheInvalidation("test-channel", vi.fn());
+      expect(base.duplicate).toHaveBeenCalledTimes(1);
+
+      await vi.advanceTimersByTimeAsync(999);
+      expect(base.duplicate).toHaveBeenCalledTimes(1);
+
+      await vi.advanceTimersByTimeAsync(1);
+      expect(base.duplicate).toHaveBeenCalledTimes(2);
+
+      subscriber2.status = "ready";
+      subscriber2.emit("ready");
+
+      const cleanup2 = await cleanup2Promise;
+      expect(cleanup2).not.toBeNull();
+      cleanup2!();
+    } finally {
+      vi.useRealTimers();
+    }
+  });
+
   test("subscribeCacheInvalidation: should rollback callback when subscribe fails and allow retry", async () => {
     const base = new MockRedis();
     const subscriber = new MockRedis();

+ 122 - 55
src/lib/redis/pubsub.ts

@@ -10,7 +10,7 @@ export const CHANNEL_SENSITIVE_WORDS_UPDATED = "cch:cache:sensitive_words:update
 // API Key 集合发生变化(典型:创建新 key)时,通知各实例重建 Vacuum Filter,避免误拒绝
 export const CHANNEL_API_KEYS_UPDATED = "cch:cache:api_keys:updated";
 
-type CacheInvalidationCallback = () => void;
+type CacheInvalidationCallback = (message: string) => void;
 
 let subscriberClient: Redis | null = null;
 let subscriberReady: Promise<Redis> | null = null;
@@ -19,6 +19,21 @@ const subscribedChannels = new Set<string>();
 
 let resubscribeInFlight: Promise<void> | null = null;
 
+const SUBSCRIBER_CONNECT_TIMEOUT_MS = 10000;
+const SUBSCRIBER_CONNECT_BACKOFF_BASE_MS = 1000;
+const SUBSCRIBER_CONNECT_BACKOFF_MAX_MS = 60000;
+
+let subscriberConnectFailures = 0;
+let subscriberNextConnectAt = 0;
+
+function computeSubscriberConnectBackoffMs(consecutiveFailures: number): number {
+  if (!Number.isFinite(consecutiveFailures) || consecutiveFailures <= 0) return 0;
+
+  const exponent = Math.min(consecutiveFailures - 1, 10);
+  const backoffMs = SUBSCRIBER_CONNECT_BACKOFF_BASE_MS * 2 ** exponent;
+  return Math.min(SUBSCRIBER_CONNECT_BACKOFF_MAX_MS, backoffMs);
+}
+
 async function resubscribeAll(sub: Redis): Promise<void> {
   if (resubscribeInFlight) return resubscribeInFlight;
 
@@ -62,13 +77,25 @@ async function resubscribeAll(sub: Redis): Promise<void> {
 function ensureSubscriber(baseClient: Redis): Promise<Redis> {
   if (subscriberReady) return subscriberReady;
 
+  const now = Date.now();
+  const startDelayMs = Math.max(0, subscriberNextConnectAt - now);
+
   subscriberReady = new Promise<Redis>((resolve, reject) => {
-    const sub = baseClient.duplicate();
+    let sub: Redis | null = null;
     let timeoutId: ReturnType<typeof setTimeout> | null = null;
+    let startDelayTimeoutId: ReturnType<typeof setTimeout> | null = null;
+    let settled = false;
 
     function cleanup(): void {
-      sub.off("ready", onReady);
-      sub.off("error", onError);
+      if (startDelayTimeoutId) {
+        clearTimeout(startDelayTimeoutId);
+        startDelayTimeoutId = null;
+      }
+
+      if (sub) {
+        sub.off("ready", onReady);
+        sub.off("error", onError);
+      }
 
       if (timeoutId) {
         clearTimeout(timeoutId);
@@ -77,10 +104,24 @@ function ensureSubscriber(baseClient: Redis): Promise<Redis> {
     }
 
     function fail(error: Error): void {
+      if (settled) return;
+      settled = true;
+
       cleanup();
       subscriberReady = null;
+
+      subscriberConnectFailures++;
+      const backoffMs = computeSubscriberConnectBackoffMs(subscriberConnectFailures);
+      subscriberNextConnectAt = Date.now() + backoffMs;
+
+      logger.warn("[RedisPubSub] Subscriber connection failed", {
+        error,
+        consecutiveFailures: subscriberConnectFailures,
+        nextRetryAt: new Date(subscriberNextConnectAt).toISOString(),
+        backoffMs,
+      });
       try {
-        sub.disconnect();
+        sub?.disconnect();
       } catch {
         // ignore
       }
@@ -88,23 +129,37 @@ function ensureSubscriber(baseClient: Redis): Promise<Redis> {
     }
 
     function onReady(): void {
+      if (settled) return;
+      settled = true;
+
       cleanup();
-      subscriberClient = sub;
+      if (!sub) {
+        subscriberReady = null;
+        reject(new Error("Redis subscriber connection ready without client"));
+        return;
+      }
+
+      const readySub = sub;
+
+      subscriberClient = readySub;
       subscribedChannels.clear();
 
-      sub.on("error", (error) =>
+      subscriberConnectFailures = 0;
+      subscriberNextConnectAt = 0;
+
+      readySub.on("error", (error) =>
         logger.warn("[RedisPubSub] Subscriber connection error", { error })
       );
-      sub.on("close", () => subscribedChannels.clear());
-      sub.on("end", () => subscribedChannels.clear());
-      sub.on("ready", () => void resubscribeAll(sub));
+      readySub.on("close", () => subscribedChannels.clear());
+      readySub.on("end", () => subscribedChannels.clear());
+      readySub.on("ready", () => void resubscribeAll(readySub));
 
-      sub.on("message", (channel: string) => {
+      readySub.on("message", (channel: string, message: string) => {
         const callbacks = subscriptions.get(channel);
         if (!callbacks || callbacks.size === 0) return;
         for (const cb of callbacks) {
           try {
-            cb();
+            cb(message);
           } catch (error) {
             logger.error("[RedisPubSub] Callback error", { channel, error });
           }
@@ -112,23 +167,33 @@ function ensureSubscriber(baseClient: Redis): Promise<Redis> {
       });
 
       logger.info("[RedisPubSub] Subscriber connection ready");
-      resolve(sub);
+      resolve(readySub);
     }
 
     function onError(error: Error): void {
-      logger.warn("[RedisPubSub] Subscriber connection error", { error });
       fail(error);
     }
 
-    sub.once("ready", onReady);
-    sub.once("error", onError);
+    function startConnection(): void {
+      if (settled) return;
 
-    // Timeout 10 seconds
-    timeoutId = setTimeout(() => {
-      if (sub.status !== "ready") {
-        fail(new Error("Redis subscriber connection timeout"));
-      }
-    }, 10000);
+      sub = baseClient.duplicate();
+      sub.once("ready", onReady);
+      sub.once("error", onError);
+
+      // Timeout 10 seconds
+      timeoutId = setTimeout(() => {
+        if (sub?.status !== "ready") {
+          fail(new Error("Redis subscriber connection timeout"));
+        }
+      }, SUBSCRIBER_CONNECT_TIMEOUT_MS);
+    }
+
+    if (startDelayMs > 0) {
+      startDelayTimeoutId = setTimeout(startConnection, startDelayMs);
+    } else {
+      startConnection();
+    }
   });
 
   return subscriberReady;
@@ -137,12 +202,12 @@ function ensureSubscriber(baseClient: Redis): Promise<Redis> {
 /**
  * Publish cache invalidation (silent fail, auto-degrade)
  */
-export async function publishCacheInvalidation(channel: string): Promise<void> {
+export async function publishCacheInvalidation(channel: string, message?: string): Promise<void> {
   const redis = getRedisClient();
   if (!redis) return;
 
   try {
-    await redis.publish(channel, Date.now().toString());
+    await redis.publish(channel, message ?? Date.now().toString());
   } catch (error) {
     logger.warn("[RedisPubSub] Failed to publish cache invalidation", { channel, error });
   }
@@ -159,43 +224,45 @@ export async function subscribeCacheInvalidation(
   const baseClient = getRedisClient();
   if (!baseClient) return null;
 
+  let sub: Redis;
   try {
-    const sub = await ensureSubscriber(baseClient);
-
-    const callbacks = subscriptions.get(channel) ?? new Set<CacheInvalidationCallback>();
-    callbacks.add(callback);
-    subscriptions.set(channel, callbacks);
+    sub = await ensureSubscriber(baseClient);
+  } catch {
+    // ensureSubscriber 内部已记录连接失败与 backoff 信息,避免这里按 channel 重复刷屏
+    return null;
+  }
 
-    if (!subscribedChannels.has(channel)) {
-      try {
-        await sub.subscribe(channel);
-        subscribedChannels.add(channel);
-        logger.info("[RedisPubSub] Subscribed to channel", { channel });
-      } catch (error) {
-        callbacks.delete(callback);
-        if (callbacks.size === 0) {
-          subscriptions.delete(channel);
-        }
-        throw error;
+  const callbacks = subscriptions.get(channel) ?? new Set<CacheInvalidationCallback>();
+  callbacks.add(callback);
+  subscriptions.set(channel, callbacks);
+
+  if (!subscribedChannels.has(channel)) {
+    try {
+      await sub.subscribe(channel);
+      subscribedChannels.add(channel);
+      logger.info("[RedisPubSub] Subscribed to channel", { channel });
+    } catch (error) {
+      callbacks.delete(callback);
+      if (callbacks.size === 0) {
+        subscriptions.delete(channel);
       }
+      logger.warn("[RedisPubSub] Failed to subscribe channel", { channel, error });
+      return null;
     }
+  }
 
-    return () => {
-      const cbs = subscriptions.get(channel);
-      if (!cbs) return;
+  return () => {
+    const cbs = subscriptions.get(channel);
+    if (!cbs) return;
 
-      cbs.delete(callback);
+    cbs.delete(callback);
 
-      if (cbs.size === 0) {
-        subscriptions.delete(channel);
-        subscribedChannels.delete(channel);
-        if (subscriberClient) {
-          void subscriberClient.unsubscribe(channel);
-        }
+    if (cbs.size === 0) {
+      subscriptions.delete(channel);
+      subscribedChannels.delete(channel);
+      if (subscriberClient) {
+        void subscriberClient.unsubscribe(channel);
       }
-    };
-  } catch (error) {
-    logger.warn("[RedisPubSub] Failed to subscribe cache invalidation", { channel, error });
-    return null;
-  }
+    }
+  };
 }

+ 498 - 0
tests/unit/lib/circuit-breaker.test.ts

@@ -0,0 +1,498 @@
+import { afterEach, describe, expect, test, vi } from "vitest";
+
+/**
+ * Shape of the circuit state that the mocked Redis state module loads and saves in these tests.
+ */
+type SavedCircuitState = {
+  failureCount: number;
+  // Fixtures below populate this from Date.now(), i.e. an epoch-millisecond timestamp.
+  lastFailureTime: number | null;
+  circuitState: "closed" | "open" | "half-open";
+  // Compared against `Date.now() + openDuration` below, i.e. an epoch-millisecond deadline.
+  circuitOpenUntil: number | null;
+  halfOpenSuccessCount: number;
+};
+
+/**
+ * Per-provider circuit breaker settings returned by the mocked config loader.
+ */
+type CircuitBreakerConfig = {
+  // The tests below expect a value of 0 to behave as "breaker disabled".
+  failureThreshold: number;
+  // Milliseconds: the OPEN-window test expects circuitOpenUntil === Date.now() + openDuration.
+  openDuration: number;
+  halfOpenSuccessThreshold: number;
+};
+
+/**
+ * Builds a stand-in for the project logger in which every log level is its own vitest spy.
+ */
+function createLoggerMock() {
+  const debug = vi.fn();
+  const info = vi.fn();
+  const warn = vi.fn();
+  const trace = vi.fn();
+  const error = vi.fn();
+  const fatal = vi.fn();
+
+  return { debug, info, warn, trace, error, fatal };
+}
+
+/**
+ * Switches vitest to fake timers and pins the system clock to a fixed instant,
+ * so Date.now()-based expectations in the tests are exact.
+ */
+function setupFakeTime(): void {
+  const frozenNow = new Date("2026-01-01T00:00:00.000Z");
+
+  vi.useFakeTimers();
+  vi.setSystemTime(frozenNow);
+}
+
+/**
+ * Registers vi.doMock factories for the logger, the Redis state module, the Redis config
+ * module and the pub/sub module that "@/lib/circuit-breaker" is imported against in each test.
+ *
+ * Any function supplied through `options` is used as-is; every omitted one falls back to a
+ * no-op spy (null state, empty state map, the default config, no subscription).
+ */
+function setupCircuitBreakerMocks(options?: {
+  redis?: {
+    loadCircuitState?: (providerId: number) => Promise<SavedCircuitState | null>;
+    loadAllCircuitStates?: (providerIds: number[]) => Promise<Map<number, SavedCircuitState>>;
+    saveCircuitState?: (providerId: number, state: SavedCircuitState) => Promise<void>;
+  };
+  config?: {
+    defaultConfig?: CircuitBreakerConfig;
+    loadProviderCircuitConfig?: (providerId: number) => Promise<CircuitBreakerConfig>;
+  };
+  pubsub?: {
+    publishCacheInvalidation?: (channel: string, message?: string) => Promise<void>;
+    subscribeCacheInvalidation?: (
+      channel: string,
+      callback: (message: string) => void
+    ) => Promise<(() => void) | null>;
+  };
+}): void {
+  const redisOverrides = options?.redis;
+  const configOverrides = options?.config;
+  const pubsubOverrides = options?.pubsub;
+
+  const fallbackConfig: CircuitBreakerConfig = configOverrides?.defaultConfig ?? {
+    failureThreshold: 5,
+    openDuration: 1800000,
+    halfOpenSuccessThreshold: 2,
+  };
+
+  vi.doMock("@/lib/logger", () => {
+    return { logger: createLoggerMock() };
+  });
+
+  // Fallback spies are created inside the factories, i.e. only when the module is first imported.
+  vi.doMock("@/lib/redis/circuit-breaker-state", () => {
+    return {
+      loadCircuitState: redisOverrides?.loadCircuitState ?? vi.fn(async () => null),
+      loadAllCircuitStates: redisOverrides?.loadAllCircuitStates ?? vi.fn(async () => new Map()),
+      saveCircuitState: redisOverrides?.saveCircuitState ?? vi.fn(async () => {}),
+    };
+  });
+
+  vi.doMock("@/lib/redis/circuit-breaker-config", () => {
+    return {
+      DEFAULT_CIRCUIT_BREAKER_CONFIG: fallbackConfig,
+      loadProviderCircuitConfig:
+        configOverrides?.loadProviderCircuitConfig ?? vi.fn(async () => fallbackConfig),
+    };
+  });
+
+  vi.doMock("@/lib/redis/pubsub", () => {
+    return {
+      publishCacheInvalidation:
+        pubsubOverrides?.publishCacheInvalidation ?? vi.fn(async () => {}),
+      subscribeCacheInvalidation:
+        pubsubOverrides?.subscribeCacheInvalidation ?? vi.fn(async () => null),
+    };
+  });
+}
+
+// Tests install fake timers through setupFakeTime(); put the real timers back after each one.
+afterEach(() => {
+  vi.useRealTimers();
+});
+
+describe("circuit-breaker", () => {
+  // Disabled breaker (failureThreshold 0) with an OPEN state coming from Redis:
+  // isCircuitOpen must not block, and the state must be reset to CLOSED both in memory
+  // and in the copy written back through saveCircuitState.
+  test("failureThreshold=0 时应视为禁用:即便 Redis 为 OPEN 也不应阻止请求,并自动复位为 CLOSED", async () => {
+    setupFakeTime();
+
+    vi.resetModules();
+
+    let redisState: SavedCircuitState | null = {
+      failureCount: 10,
+      lastFailureTime: Date.now() - 1000,
+      circuitState: "open",
+      circuitOpenUntil: Date.now() + 300000,
+      halfOpenSuccessCount: 0,
+    };
+
+    // The two spies emulate a Redis round trip: whatever is saved is what the next load returns.
+    const loadStateMock = vi.fn(async () => redisState);
+    const saveStateMock = vi.fn(async (_providerId: number, state: SavedCircuitState) => {
+      redisState = state;
+    });
+
+    setupCircuitBreakerMocks({
+      redis: {
+        loadCircuitState: loadStateMock,
+        saveCircuitState: saveStateMock,
+      },
+      config: {
+        loadProviderCircuitConfig: vi.fn(async () => ({
+          failureThreshold: 0,
+          openDuration: 1800000,
+          halfOpenSuccessThreshold: 2,
+        })),
+      },
+    });
+
+    // Import only after the mocks are registered so the module binds to them.
+    const { getCircuitState, isCircuitOpen } = await import("@/lib/circuit-breaker");
+
+    expect(await isCircuitOpen(1)).toBe(false);
+    expect(getCircuitState(1)).toBe("closed");
+
+    // Fail with a clear message if nothing was written back, then inspect the latest write.
+    expect(saveStateMock).toHaveBeenCalled();
+    const lastState = saveStateMock.mock.lastCall?.[1];
+    expect(lastState?.circuitState).toBe("closed");
+    expect(lastState?.failureCount).toBe(0);
+    expect(lastState?.lastFailureTime).toBeNull();
+    expect(lastState?.circuitOpenUntil).toBeNull();
+    expect(lastState?.halfOpenSuccessCount).toBe(0);
+  });
+
+  // Batch refresh with a disabled breaker: an OPEN state loaded from Redis must be reported
+  // as CLOSED and the reset state must be written back.
+  test("getAllHealthStatusAsync: failureThreshold=0 时应强制返回 CLOSED 并写回 Redis", async () => {
+    setupFakeTime();
+
+    vi.resetModules();
+
+    const openState: SavedCircuitState = {
+      failureCount: 10,
+      lastFailureTime: Date.now() - 1000,
+      circuitState: "open",
+      circuitOpenUntil: Date.now() + 300000,
+      halfOpenSuccessCount: 0,
+    };
+
+    // The written state is read from the spy's recorded calls rather than from a `let`
+    // assigned inside the callback: TypeScript would narrow such a variable to `null`
+    // at the assertions below and reject the property reads.
+    const saveStateMock = vi.fn(async (_providerId: number, _state: SavedCircuitState) => {});
+
+    setupCircuitBreakerMocks({
+      redis: {
+        loadCircuitState: vi.fn(async () => null),
+        loadAllCircuitStates: vi.fn(async () => new Map([[1, openState]])),
+        saveCircuitState: saveStateMock,
+      },
+      config: {
+        loadProviderCircuitConfig: vi.fn(async () => ({
+          failureThreshold: 0,
+          openDuration: 1800000,
+          halfOpenSuccessThreshold: 2,
+        })),
+      },
+    });
+
+    // Import only after the mocks are registered so the module binds to them.
+    const { getAllHealthStatusAsync } = await import("@/lib/circuit-breaker");
+
+    const status = await getAllHealthStatusAsync([1], { forceRefresh: true });
+    expect(status[1]?.circuitState).toBe("closed");
+
+    const savedState = saveStateMock.mock.lastCall?.[1];
+    expect(savedState?.circuitState).toBe("closed");
+    expect(savedState?.failureCount).toBe(0);
+    expect(savedState?.lastFailureTime).toBeNull();
+    expect(savedState?.circuitOpenUntil).toBeNull();
+    expect(savedState?.halfOpenSuccessCount).toBe(0);
+  });
+
+  // When a batch refresh finds no state in Redis for a provider, a non-CLOSED state still held
+  // in memory must be cleared so it does not linger in health displays or filters.
+  test("getAllHealthStatusAsync: Redis 无状态时应清理内存中的非 CLOSED 状态(避免展示/筛选残留)", async () => {
+    setupFakeTime();
+
+    vi.resetModules();
+
+    const openState: SavedCircuitState = {
+      failureCount: 10,
+      lastFailureTime: Date.now() - 1000,
+      circuitState: "open",
+      circuitOpenUntil: Date.now() + 300000,
+      halfOpenSuccessCount: 0,
+    };
+
+    // First batch load returns an OPEN state for provider 1; every later load returns nothing.
+    let loadCalls = 0;
+    const loadAllCircuitStatesMock = vi.fn(async () => {
+      loadCalls++;
+      if (loadCalls === 1) {
+        return new Map([[1, openState]]);
+      }
+      return new Map();
+    });
+
+    setupCircuitBreakerMocks({
+      redis: {
+        loadCircuitState: vi.fn(async () => null),
+        loadAllCircuitStates: loadAllCircuitStatesMock,
+        saveCircuitState: vi.fn(async () => {}),
+      },
+      config: {
+        loadProviderCircuitConfig: vi.fn(async () => ({
+          failureThreshold: 5,
+          openDuration: 1800000,
+          halfOpenSuccessThreshold: 2,
+        })),
+      },
+    });
+
+    // Import only after the mocks are registered so the module binds to them.
+    const { getAllHealthStatusAsync, getCircuitState } = await import("@/lib/circuit-breaker");
+
+    // Seed the in-memory state with OPEN via the first refresh.
+    const first = await getAllHealthStatusAsync([1], { forceRefresh: true });
+    expect(first[1]?.circuitState).toBe("open");
+    expect(getCircuitState(1)).toBe("open");
+
+    // Second refresh sees an empty Redis result and must drop the stale OPEN state.
+    const second = await getAllHealthStatusAsync([1], { forceRefresh: true });
+    expect(second[1]?.circuitState).toBe("closed");
+    expect(getCircuitState(1)).toBe("closed");
+  });
+
+  // Once the circuit is OPEN, further failures must not push circuitOpenUntil forward,
+  // otherwise the open window would be extended by every failing request.
+  test("recordFailure: 已处于 OPEN 时不应重置 circuitOpenUntil(避免延长熔断时间)", async () => {
+    setupFakeTime();
+
+    vi.resetModules();
+
+    // The two spies emulate a Redis round trip: whatever is saved is what the next load returns.
+    let redisState: SavedCircuitState | null = null;
+    const loadStateMock = vi.fn(async () => redisState);
+    const saveStateMock = vi.fn(async (_providerId: number, state: SavedCircuitState) => {
+      redisState = state;
+    });
+    // Assertions read the latest persisted state from the spy. Reading `redisState` directly
+    // here would not type-check: outside the closures TypeScript narrows it to `null`.
+    const lastSavedState = () => saveStateMock.mock.lastCall?.[1];
+
+    const sendAlertMock = vi.fn(async () => {});
+
+    // Stub everything the alert path touches (notifier, schema, query builder, db).
+    vi.doMock("@/lib/notification/notifier", () => ({
+      sendCircuitBreakerAlert: sendAlertMock,
+    }));
+    vi.doMock("@/drizzle/schema", () => ({
+      providers: { id: "id", name: "name" },
+    }));
+    vi.doMock("drizzle-orm", () => ({ eq: vi.fn(() => ({})) }));
+    vi.doMock("@/drizzle/db", () => ({
+      db: {
+        select: vi.fn(() => ({
+          from: vi.fn(() => ({
+            where: vi.fn(() => ({
+              limit: vi.fn(async () => [{ name: "Test Provider" }]),
+            })),
+          })),
+        })),
+      },
+    }));
+
+    setupCircuitBreakerMocks({
+      redis: {
+        loadCircuitState: loadStateMock,
+        saveCircuitState: saveStateMock,
+      },
+      config: {
+        loadProviderCircuitConfig: vi.fn(async () => ({
+          failureThreshold: 2,
+          openDuration: 300000,
+          halfOpenSuccessThreshold: 2,
+        })),
+      },
+    });
+
+    // Import only after the mocks are registered so the module binds to them.
+    const { recordFailure } = await import("@/lib/circuit-breaker");
+
+    // Two failures reach the threshold of 2 and open the circuit.
+    await recordFailure(1, new Error("boom"));
+    await recordFailure(1, new Error("boom"));
+
+    expect(lastSavedState()?.circuitState).toBe("open");
+    const openUntil = lastSavedState()?.circuitOpenUntil;
+    expect(openUntil).toBe(Date.now() + 300000);
+
+    // Move the fake clock so a recomputed deadline would differ from the original one.
+    vi.advanceTimersByTime(1000);
+
+    await recordFailure(1, new Error("boom"));
+    expect(lastSavedState()?.circuitOpenUntil).toBe(openUntil);
+
+    // Reaching the threshold triggers an asynchronous, non-blocking alert (dynamic import).
+    // Switch back to real timers so the event loop can advance; a dangling task could
+    // otherwise leak mocks into the following tests.
+    vi.useRealTimers();
+    await expect.poll(() => sendAlertMock.mock.calls.length, { timeout: 1000 }).toBe(1);
+  });
+
+  // When the config loader throws, the default config must be cached so the config store
+  // is not queried again on every subsequent call.
+  test("配置加载失败时应缓存默认配置,避免重复请求配置存储", async () => {
+    setupFakeTime();
+
+    vi.resetModules();
+
+    // Loader that always rejects, simulating an unavailable config store.
+    const loadProviderCircuitConfigMock = vi.fn(async () => {
+      throw new Error("redis down");
+    });
+
+    setupCircuitBreakerMocks({
+      config: {
+        // High threshold so the two failures below cannot open the circuit.
+        defaultConfig: {
+          failureThreshold: 100,
+          openDuration: 1800000,
+          halfOpenSuccessThreshold: 2,
+        },
+        loadProviderCircuitConfig: loadProviderCircuitConfigMock,
+      },
+    });
+
+    // Import only after the mocks are registered so the module binds to them.
+    const { recordFailure } = await import("@/lib/circuit-breaker");
+
+    await recordFailure(1, new Error("boom"));
+    await recordFailure(1, new Error("boom"));
+
+    // Only the first call may hit the loader; the second must be served from the cache.
+    expect(loadProviderCircuitConfigMock).toHaveBeenCalledTimes(1);
+  });
+
+  // Concurrent callers asking for the same provider config must share one in-flight load
+  // instead of each querying the config store.
+  test("并发加载配置时应进行 in-flight 合并,避免重复请求配置存储", async () => {
+    setupFakeTime();
+
+    vi.resetModules();
+
+    // Loader that resolves after a 50 ms (fake) timer, keeping the load pending
+    // while the second caller arrives.
+    const loadProviderCircuitConfigMock = vi.fn(
+      async () =>
+        await new Promise<CircuitBreakerConfig>((resolve) => {
+          setTimeout(() => {
+            resolve({
+              failureThreshold: 100,
+              openDuration: 1800000,
+              halfOpenSuccessThreshold: 2,
+            });
+          }, 50);
+        })
+    );
+
+    setupCircuitBreakerMocks({
+      config: {
+        loadProviderCircuitConfig: loadProviderCircuitConfigMock,
+      },
+    });
+
+    // Import only after the mocks are registered so the module binds to them.
+    const { recordFailure } = await import("@/lib/circuit-breaker");
+
+    // Start both calls without awaiting so they overlap.
+    const p1 = recordFailure(1, new Error("boom"));
+    const p2 = recordFailure(1, new Error("boom"));
+
+    // Fire the loader's fake 50 ms timer, then let both calls finish.
+    await vi.advanceTimersByTimeAsync(50);
+    await Promise.all([p1, p2]);
+
+    expect(loadProviderCircuitConfigMock).toHaveBeenCalledTimes(1);
+  });
+
+  // A config invalidation notice received over pub/sub must drop the cached config so that
+  // the next operation reloads it (cross-instance consistency).
+  test("收到配置失效通知后应清除配置缓存并触发重新加载(跨实例一致性)", async () => {
+    setupFakeTime();
+
+    // The test runs with CI forced to "false" and restores the original value in `finally`.
+    const originalCi = process.env.CI;
+    process.env.CI = "false";
+
+    try {
+      vi.resetModules();
+
+      // First load returns an enabled breaker, the reload returns a disabled one.
+      const loadProviderCircuitConfigMock = vi
+        .fn()
+        .mockResolvedValueOnce({
+          failureThreshold: 5,
+          openDuration: 1800000,
+          halfOpenSuccessThreshold: 2,
+        })
+        .mockResolvedValueOnce({
+          failureThreshold: 0,
+          openDuration: 1800000,
+          halfOpenSuccessThreshold: 2,
+        });
+
+      // The registered callback is read back from the spy's recorded arguments. Capturing it
+      // in a `let` assigned inside this function would leave the variable narrowed to `null`
+      // at the call site and force a non-null assertion that does not type-check.
+      const subscribeCacheInvalidationMock = vi.fn(
+        async (_channel: string, _cb: (message: string) => void) => {
+          return () => {};
+        }
+      );
+
+      setupCircuitBreakerMocks({
+        config: {
+          loadProviderCircuitConfig: loadProviderCircuitConfigMock,
+        },
+        pubsub: {
+          subscribeCacheInvalidation: subscribeCacheInvalidationMock,
+        },
+      });
+
+      // Import only after the mocks are registered so the module binds to them.
+      const { recordFailure } = await import("@/lib/circuit-breaker");
+
+      await recordFailure(1, new Error("boom"));
+      expect(loadProviderCircuitConfigMock).toHaveBeenCalledTimes(1);
+
+      expect(subscribeCacheInvalidationMock).toHaveBeenCalledTimes(1);
+      const onInvalidation = subscribeCacheInvalidationMock.mock.lastCall?.[1];
+      if (!onInvalidation) {
+        throw new Error("subscribeCacheInvalidation was not given a callback");
+      }
+
+      // Simulate another instance announcing that provider 1's config changed.
+      onInvalidation(JSON.stringify({ providerIds: [1] }));
+
+      await recordFailure(1, new Error("boom"));
+      expect(loadProviderCircuitConfigMock).toHaveBeenCalledTimes(2);
+    } finally {
+      if (originalCi === undefined) {
+        delete process.env.CI;
+      } else {
+        process.env.CI = originalCi;
+      }
+    }
+  });
+
+  // If an invalidation notice arrives while a config load is still in flight, the result of
+  // that load is stale: the loader must be called again instead of caching the old config.
+  test("失效通知发生在配置加载期间时应重试,避免把旧配置写回缓存", async () => {
+    setupFakeTime();
+
+    // The test runs with CI forced to "false" and restores the original value in `finally`.
+    const originalCi = process.env.CI;
+    process.env.CI = "false";
+
+    try {
+      vi.resetModules();
+
+      // Promise whose settlement is controlled from the test body.
+      const deferred = <T>() => {
+        let resolve!: (value: T) => void;
+        let reject!: (reason?: unknown) => void;
+        const promise = new Promise<T>((res, rej) => {
+          resolve = res;
+          reject = rej;
+        });
+        return { promise, resolve, reject };
+      };
+
+      const first = deferred<CircuitBreakerConfig>();
+      const second = deferred<CircuitBreakerConfig>();
+
+      // First call hangs on `first`, the retry hangs on `second`.
+      const loadProviderCircuitConfigMock = vi
+        .fn()
+        .mockImplementationOnce(async () => await first.promise)
+        .mockImplementationOnce(async () => await second.promise);
+
+      // The registered callback is read back from the spy's recorded arguments. Capturing it
+      // in a `let` assigned inside this function would leave the variable narrowed to `null`
+      // at the call site and force a non-null assertion that does not type-check.
+      const subscribeCacheInvalidationMock = vi.fn(
+        async (_channel: string, _cb: (message: string) => void) => {
+          return () => {};
+        }
+      );
+
+      setupCircuitBreakerMocks({
+        config: {
+          loadProviderCircuitConfig: loadProviderCircuitConfigMock,
+        },
+        pubsub: {
+          subscribeCacheInvalidation: subscribeCacheInvalidationMock,
+        },
+      });
+
+      // Import only after the mocks are registered so the module binds to them.
+      const { getProviderHealthInfo, recordFailure } = await import("@/lib/circuit-breaker");
+
+      const failurePromise = recordFailure(1, new Error("boom"));
+
+      // recordFailure first awaits getOrCreateHealth (which includes the Redis sync), so yield
+      // a few microtasks here to let the config load start.
+      for (let i = 0; i < 5 && loadProviderCircuitConfigMock.mock.calls.length === 0; i++) {
+        await Promise.resolve();
+      }
+
+      expect(loadProviderCircuitConfigMock).toHaveBeenCalledTimes(1);
+      const onInvalidation = subscribeCacheInvalidationMock.mock.lastCall?.[1];
+      if (!onInvalidation) {
+        throw new Error("subscribeCacheInvalidation was not given a callback");
+      }
+
+      // Invalidate while the first load is still pending, then let that load finish
+      // with the now-stale config.
+      onInvalidation(JSON.stringify({ providerIds: [1] }));
+
+      first.resolve({
+        failureThreshold: 5,
+        openDuration: 1800000,
+        halfOpenSuccessThreshold: 2,
+      });
+
+      // Yield until the retry has been issued.
+      for (let i = 0; i < 5 && loadProviderCircuitConfigMock.mock.calls.length < 2; i++) {
+        await Promise.resolve();
+      }
+      expect(loadProviderCircuitConfigMock).toHaveBeenCalledTimes(2);
+
+      // The retry returns the fresh config, in which the breaker is disabled.
+      second.resolve({
+        failureThreshold: 0,
+        openDuration: 1800000,
+        halfOpenSuccessThreshold: 2,
+      });
+
+      await failurePromise;
+
+      // The test expects no failure to be counted once the disabled config is in effect.
+      const { health } = await getProviderHealthInfo(1);
+      expect(health.failureCount).toBe(0);
+    } finally {
+      if (originalCi === undefined) {
+        delete process.env.CI;
+      } else {
+        process.env.CI = originalCi;
+      }
+    }
+  });
+});