Преглед изворни кода

fix(circuit-breaker): prevent endpoint circuit from never recovering

Three bugs caused a permanent open circuit death spiral:
1. recordEndpointFailure reset circuitOpenUntil on every call while open
2. Probe success never fed back into the circuit breaker
3. failureCount never reset because open state was never exited

Now the open timer is set only on the initial transition to open, a
successful probe resets the circuit (via getEndpointCircuitStateSync +
resetEndpointCircuit), and new tests cover death-spiral prevention and
the sync state accessor.
ding113 пре 1 недеља
родитељ
комит
632cb856

+ 29 - 21
src/lib/endpoint-circuit-breaker.ts

@@ -142,32 +142,36 @@ export async function recordEndpointFailure(endpointId: number, error: Error): P
   health.lastFailureTime = Date.now();
 
   if (config.failureThreshold > 0 && health.failureCount >= config.failureThreshold) {
-    health.circuitState = "open";
-    health.circuitOpenUntil = Date.now() + config.openDuration;
-    health.halfOpenSuccessCount = 0;
+    if (health.circuitState !== "open") {
+      // Only set timer and alert on initial transition (closed->open or half-open->open)
+      health.circuitState = "open";
+      health.circuitOpenUntil = Date.now() + config.openDuration;
+      health.halfOpenSuccessCount = 0;
 
-    const retryAt = new Date(health.circuitOpenUntil).toISOString();
+      const retryAt = new Date(health.circuitOpenUntil).toISOString();
 
-    logger.warn("[EndpointCircuitBreaker] Endpoint circuit opened", {
-      endpointId,
-      failureCount: health.failureCount,
-      threshold: config.failureThreshold,
-      errorMessage: error.message,
-    });
+      logger.warn("[EndpointCircuitBreaker] Endpoint circuit opened", {
+        endpointId,
+        failureCount: health.failureCount,
+        threshold: config.failureThreshold,
+        errorMessage: error.message,
+      });
 
-    // Async alert (non-blocking)
-    triggerEndpointCircuitBreakerAlert(
-      endpointId,
-      health.failureCount,
-      retryAt,
-      error.message
-    ).catch((err) => {
-      logger.error({
-        action: "trigger_endpoint_circuit_breaker_alert_error",
+      // Async alert (non-blocking)
+      triggerEndpointCircuitBreakerAlert(
         endpointId,
-        error: err instanceof Error ? err.message : String(err),
+        health.failureCount,
+        retryAt,
+        error.message
+      ).catch((err) => {
+        logger.error({
+          action: "trigger_endpoint_circuit_breaker_alert_error",
+          endpointId,
+          error: err instanceof Error ? err.message : String(err),
+        });
       });
-    });
+    }
+    // If already open: failureCount is updated above, but timer stays fixed — no death spiral
   }
 
   persistStateToRedis(endpointId, health);
@@ -200,6 +204,10 @@ export async function recordEndpointSuccess(endpointId: number): Promise<void> {
   }
 }
 
+export function getEndpointCircuitStateSync(endpointId: number): EndpointCircuitState { // Synchronous read of the circuit state stored in healthMap for endpointId.
+  return healthMap.get(endpointId)?.circuitState ?? "closed"; // No healthMap entry for this id -> reports "closed".
+} // NOTE(review): only healthMap is consulted here; whether state persisted elsewhere (e.g. Redis) is already loaded into it is not visible in this hunk — confirm.
+
 export async function resetEndpointCircuit(endpointId: number): Promise<void> {
   const health = getOrCreateHealthSync(endpointId);
   health.circuitState = "closed";

+ 11 - 1
src/lib/provider-endpoints/probe.ts

@@ -1,7 +1,7 @@
 import "server-only";
 
 import net from "node:net";
-import { recordEndpointFailure } from "@/lib/endpoint-circuit-breaker";
+import { getEndpointCircuitStateSync, recordEndpointFailure, resetEndpointCircuit } from "@/lib/endpoint-circuit-breaker";
 import { logger } from "@/lib/logger";
 import { findProviderEndpointById, recordProviderEndpointProbeResult } from "@/repository";
 import type { ProviderEndpoint, ProviderEndpointProbeSource } from "@/types/provider";
@@ -229,6 +229,16 @@ export async function probeProviderEndpointAndRecordByEndpoint(input: {
       ? `HTTP ${result.statusCode}`
       : result.errorType || "probe_failed";
     await recordEndpointFailure(input.endpoint.id, new Error(message));
+  } else {
+    // Probe success: reset circuit breaker if endpoint was open/half-open
+    const currentState = getEndpointCircuitStateSync(input.endpoint.id);
+    if (currentState !== "closed") {
+      await resetEndpointCircuit(input.endpoint.id);
+      logger.info("[EndpointProbe] Probe success, circuit reset", {
+        endpointId: input.endpoint.id,
+        previousState: currentState,
+      });
+    }
   }
 
   // Always record probe results to history table (removed filtering logic)

+ 70 - 0
tests/unit/lib/endpoint-circuit-breaker.test.ts

@@ -227,4 +227,74 @@ describe("endpoint-circuit-breaker", () => {
       endpointUrl: "https://custom.example.com/v1/chat/completions",
     });
   });
+
+  test("recordEndpointFailure should NOT reset circuitOpenUntil when already open", async () => { // Regression test: a failure recorded while the circuit is open must not push circuitOpenUntil forward.
+    vi.useFakeTimers(); // NOTE(review): fake timers are not restored inside this test — confirm an afterEach calls vi.useRealTimers().
+    vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); // Pin the clock so the Date.now()-based expectation below is exact.
+
+    vi.resetModules(); // Make the dynamic import below evaluate a fresh module instance with the mocks registered here.
+
+    let redisState: SavedEndpointCircuitState | null = null;
+    const saveMock = vi.fn(async (_endpointId: number, state: SavedEndpointCircuitState) => { // Captures the most recently saved state for inspection.
+      redisState = state;
+    });
+
+    vi.doMock("@/lib/logger", () => ({ logger: createLoggerMock() }));
+    vi.doMock("@/lib/notification/notifier", () => ({
+      sendCircuitBreakerAlert: vi.fn(async () => {}),
+    }));
+    vi.doMock("@/lib/redis/endpoint-circuit-breaker-state", () => ({
+      loadEndpointCircuitState: vi.fn(async () => redisState), // Loads return whatever saveMock captured last.
+      saveEndpointCircuitState: saveMock,
+      deleteEndpointCircuitState: vi.fn(async () => {}),
+    }));
+
+    const { recordEndpointFailure, isEndpointCircuitOpen } = await import(
+      "@/lib/endpoint-circuit-breaker"
+    );
+
+    // Record 3 failures to open the circuit
+    // NOTE(review): assumes the default failureThreshold is 3 — confirm against the circuit breaker config.
+    await recordEndpointFailure(100, new Error("fail"));
+    await recordEndpointFailure(100, new Error("fail"));
+    await recordEndpointFailure(100, new Error("fail"));
+
+    expect(await isEndpointCircuitOpen(100)).toBe(true);
+    const originalOpenUntil = redisState!.circuitOpenUntil;
+    expect(originalOpenUntil).toBe(Date.now() + 300000); // NOTE(review): 300000 ms is presumably the default openDuration — confirm.
+
+    // Advance 1 min and record another failure — timer must NOT reset
+    vi.advanceTimersByTime(60_000);
+    await recordEndpointFailure(100, new Error("fail again"));
+
+    expect(redisState!.circuitState).toBe("open");
+    expect(redisState!.circuitOpenUntil).toBe(originalOpenUntil); // unchanged!
+    expect(redisState!.failureCount).toBe(4); // The count still increments while open: 3 opening failures + 1 more.
+  });
+
+  test("getEndpointCircuitStateSync returns correct state for known and unknown endpoints", async () => { // Covers the "closed" fallback for an unseen id and the "open" state after repeated failures.
+    vi.resetModules(); // Make the dynamic import below evaluate a fresh module instance with the mocks registered here.
+
+    vi.doMock("@/lib/logger", () => ({ logger: createLoggerMock() }));
+    vi.doMock("@/lib/notification/notifier", () => ({
+      sendCircuitBreakerAlert: vi.fn(async () => {}),
+    }));
+    vi.doMock("@/lib/redis/endpoint-circuit-breaker-state", () => ({
+      loadEndpointCircuitState: vi.fn(async () => null), // The mocked store never returns persisted state.
+      saveEndpointCircuitState: vi.fn(async () => {}),
+      deleteEndpointCircuitState: vi.fn(async () => {}),
+    }));
+
+    const { getEndpointCircuitStateSync, recordEndpointFailure } = await import(
+      "@/lib/endpoint-circuit-breaker"
+    );
+
+    // Unknown endpoint returns "closed"
+    expect(getEndpointCircuitStateSync(9999)).toBe("closed");
+
+    // After opening the circuit, sync accessor reflects "open"
+    // NOTE(review): assumes three failures reach the default failureThreshold — confirm against the config.
+    await recordEndpointFailure(200, new Error("a"));
+    await recordEndpointFailure(200, new Error("b"));
+    await recordEndpointFailure(200, new Error("c"));
+    expect(getEndpointCircuitStateSync(200)).toBe("open");
+  });
 });