Просмотр исходного кода

fix: honor retry budget for endpoint timeouts

Ding 3 недели назад
Родитель
Коммит
b7bad4e10a

+ 29 - 11
src/app/v1/_lib/proxy/forwarder.ts

@@ -262,6 +262,10 @@ function resolveMaxAttemptsForProvider(
   return clampRetryAttempts(provider.maxRetryAttempts);
 }
 
+function buildEndpointAttemptKey(endpointId: number | null, endpointUrl: string): string { // Builds the string key that identifies one endpoint; returns `id:<endpointId>` or `url:<endpointUrl>`.
+  return endpointId != null ? `id:${endpointId}` : `url:${endpointUrl}`; // `!= null` also rejects undefined; the URL is the fallback identity when no id exists, and the distinct prefixes keep id keys and URL keys apart.
+}
+
 /**
  * undici request 超时配置(毫秒)
  *
@@ -665,8 +669,12 @@ export class ProxyForwarder {
         });
       }
 
-      let endpointAttemptsEvaluated = 0;
-      let allEndpointAttemptsTimedOut = true;
+      const endpointCandidateKeys = new Set(
+        endpointCandidates.map((endpoint) =>
+          buildEndpointAttemptKey(endpoint.endpointId, endpoint.baseUrl)
+        )
+      );
+      const timedOutEndpointKeys = new Set<string>();
 
       // Endpoint stickiness: track current endpoint index separately from attemptCount
       // - SYSTEM_ERROR (network error): advance to next endpoint
@@ -1026,11 +1034,11 @@ export class ProxyForwarder {
               : lastError.message;
 
           const isTimeoutError = lastError instanceof ProxyError && lastError.statusCode === 524;
-          if (attemptCount <= endpointCandidates.length) {
-            endpointAttemptsEvaluated = attemptCount;
-            if (!isTimeoutError) {
-              allEndpointAttemptsTimedOut = false;
-            }
+
+          if (isTimeoutError) {
+            timedOutEndpointKeys.add(
+              buildEndpointAttemptKey(activeEndpoint.endpointId, activeEndpoint.baseUrl)
+            );
           }
 
           if (activeEndpoint.endpointId != null) {
@@ -1646,10 +1654,10 @@ export class ProxyForwarder {
             if (
               !isMcpRequest &&
               statusCode === 524 &&
-              endpointCandidates.length > 0 &&
-              endpointAttemptsEvaluated >= endpointCandidates.length &&
-              allEndpointAttemptsTimedOut &&
-              currentProvider.providerVendorId
+              currentProvider.providerVendorId &&
+              endpointCandidateKeys.size > 0 &&
+              timedOutEndpointKeys.size >= endpointCandidateKeys.size &&
+              !willRetry
             ) {
               // Record to decision chain BEFORE triggering vendor-type circuit breaker
               session.addProviderToChain(currentProvider, {
@@ -1736,6 +1744,16 @@ export class ProxyForwarder {
 
             // 未耗尽重试次数:等待 100ms 后继续重试当前供应商
             if (willRetry) {
+              if (statusCode === 524) {
+                currentEndpointIndex++;
+                logger.debug("ProxyForwarder: Advancing endpoint index due to upstream timeout", {
+                  providerId: currentProvider.id,
+                  previousEndpointIndex: currentEndpointIndex - 1,
+                  newEndpointIndex: currentEndpointIndex,
+                  maxEndpointIndex: endpointCandidates.length - 1,
+                });
+              }
+
               await new Promise((resolve) => setTimeout(resolve, 100));
               continue;
             }

+ 79 - 0
tests/unit/proxy/proxy-forwarder-hedge-first-byte.test.ts

@@ -343,6 +343,85 @@ describe("ProxyForwarder - first-byte hedge scheduling", () => {
     }
   });
 
+  test("characterization: hedge still launches alternative provider when maxRetryAttempts > 1", async () => { // Pins current behavior: with maxRetryAttempts=3 on both providers, a slow first byte from p1 still launches a hedge attempt on p2, and p2 wins.
+    vi.useFakeTimers(); // Time only moves when the test advances it explicitly below.
+
+    try {
+      const provider1 = createProvider({
+        id: 1,
+        name: "p1",
+        maxRetryAttempts: 3, // > 1 on purpose: this is the condition named in the test title.
+        firstByteTimeoutStreamingMs: 100, // NOTE(review): presumably the first-byte threshold that triggers the hedge — the test advances exactly 100ms before asserting; confirm.
+      });
+      const provider2 = createProvider({
+        id: 2,
+        name: "p2",
+        maxRetryAttempts: 3,
+        firstByteTimeoutStreamingMs: 100,
+      });
+      const session = createSession();
+      session.setProvider(provider1); // The request starts on p1.
+
+      mocks.pickRandomProviderWithExclusion.mockResolvedValueOnce(provider2); // The single alternative-provider pick resolves to p2.
+
+      const doForward = vi.spyOn(
+        ProxyForwarder as unknown as {
+          doForward: (...args: unknown[]) => Promise<Response>;
+        },
+        "doForward"
+      );
+
+      const controller1 = new AbortController();
+      const controller2 = new AbortController();
+
+      doForward.mockImplementationOnce(async (attemptSession) => { // First doForward call (p1): streaming response whose first chunk is delayed 220ms.
+        const runtime = attemptSession as ProxySession & AttemptRuntime;
+        runtime.responseController = controller1; // Expose the controller on the attempt session so the abort assertion below can observe it.
+        runtime.clearResponseTimeout = vi.fn();
+        return createStreamingResponse({
+          label: "p1",
+          firstChunkDelayMs: 220,
+          controller: controller1,
+        });
+      });
+
+      doForward.mockImplementationOnce(async (attemptSession) => { // Second doForward call (p2): first chunk after only 40ms.
+        const runtime = attemptSession as ProxySession & AttemptRuntime;
+        runtime.responseController = controller2;
+        runtime.clearResponseTimeout = vi.fn();
+        return createStreamingResponse({
+          label: "p2",
+          firstChunkDelayMs: 40,
+          controller: controller2,
+        });
+      });
+
+      const responsePromise = ProxyForwarder.send(session); // Not awaited yet: the fake clock has to be driven first.
+
+      await vi.advanceTimersByTimeAsync(100); // Advance to the 100ms mark.
+
+      expect(doForward).toHaveBeenCalledTimes(2); // Both the original and the hedge attempt have been started.
+      expect(mocks.pickRandomProviderWithExclusion).toHaveBeenCalledTimes(1);
+
+      const chainBeforeWinner = session.getProviderChain();
+      expect(chainBeforeWinner).toEqual(
+        expect.arrayContaining([
+          expect.objectContaining({ reason: "hedge_triggered", id: 1 }),
+          expect.objectContaining({ reason: "hedge_launched", id: 2 }),
+        ])
+      );
+
+      await vi.advanceTimersByTimeAsync(50); // 150ms total. NOTE(review): presumably enough for p2's 40ms first chunk but not p1's 220ms — confirm against createStreamingResponse.
+      const response = await responsePromise;
+
+      expect(await response.text()).toContain('"provider":"p2"'); // The returned body comes from p2.
+      expect(controller1.signal.aborted).toBe(true); // The losing p1 attempt was aborted.
+      expect(session.provider?.id).toBe(2); // The session ends up bound to the winning provider.
+    } finally {
+      vi.useRealTimers(); // Restore real timers even when an assertion fails.
+    }
+  });
+
   test("first provider can still win after hedge started if it emits first chunk earlier than fallback", async () => {
     vi.useFakeTimers();
 

+ 137 - 0
tests/unit/proxy/proxy-forwarder-retry-limit.test.ts

@@ -722,6 +722,143 @@ describe("ProxyForwarder - retry limit enforcement", () => {
       vi.useRealTimers();
     }
   });
+
+  test("524 with endpoint pool: should keep retrying until maxRetryAttempts before vendor_type_all_timeout", async () => { // Every attempt fails with 524 against a two-endpoint pool; all 5 budgeted attempts must run before the single vendor_type_all_timeout record.
+    vi.useFakeTimers(); // Pending timers are drained below with runAllTimersAsync instead of real waiting.
+
+    try {
+      const session = createSession();
+      const provider = createProvider({
+        providerType: "claude",
+        providerVendorId: 123,
+        maxRetryAttempts: 5, // Retry budget under test: larger than the number of endpoints (2).
+      });
+      session.setProvider(provider);
+
+      mocks.getPreferredProviderEndpoints.mockResolvedValue([ // Endpoint pool with two candidates, ids 1 and 2.
+        makeEndpoint({
+          id: 1,
+          vendorId: 123,
+          providerType: "claude",
+          url: "https://ep1.example.com",
+          lastProbeLatencyMs: 100,
+        }),
+        makeEndpoint({
+          id: 2,
+          vendorId: 123,
+          providerType: "claude",
+          url: "https://ep2.example.com",
+          lastProbeLatencyMs: 200,
+        }),
+      ]);
+
+      vi.mocked(categorizeErrorAsync).mockResolvedValue(ErrorCategory.PROVIDER_ERROR); // Every error is categorized as PROVIDER_ERROR.
+
+      const doForward = vi.spyOn(
+        ProxyForwarder as unknown as { doForward: (...args: unknown[]) => unknown },
+        "doForward"
+      );
+
+      doForward.mockImplementation(async () => { // Every forward attempt throws a ProxyError with status 524.
+        throw new ProxyError("provider timeout", 524, {
+          body: "",
+          providerId: provider.id,
+          providerName: provider.name,
+        });
+      });
+
+      const sendPromise = ProxyForwarder.send(session);
+      let caughtError: Error | null = null;
+      sendPromise.catch((error) => { // Attach the handler before driving timers so the rejection is captured for the assertions below.
+        caughtError = error as Error;
+      });
+      await vi.runAllTimersAsync(); // Run all pending fake timers until none remain.
+
+      expect(caughtError).not.toBeNull();
+      expect(caughtError).toBeInstanceOf(ProxyError);
+      expect(doForward).toHaveBeenCalledTimes(5); // One forward call per attempt of the budget.
+
+      const chain = session.getProviderChain();
+      expect(chain).toHaveLength(5);
+      expect(chain.map((item) => item.endpointId)).toEqual([1, 2, 2, 2, 2]); // First attempt on endpoint 1, all remaining attempts on endpoint 2 (the last candidate).
+
+      const vendorTimeoutItems = chain.filter((item) => item.reason === "vendor_type_all_timeout");
+      expect(vendorTimeoutItems).toHaveLength(1); // Recorded once, not once per timed-out attempt.
+      expect(vendorTimeoutItems[0]?.attemptNumber).toBe(5); // Recorded on the final attempt, i.e. only after the budget is exhausted.
+
+      expect(mocks.recordVendorTypeAllEndpointsTimeout).toHaveBeenCalledTimes(1);
+    } finally {
+      vi.useRealTimers(); // Restore real timers even when an assertion fails.
+    }
+  });
+
+  test("524 with endpoint pool: maxRetryAttempts=2 should stop at budget exhaustion", async () => { // Same all-524 scenario with a budget equal to the pool size (2): exactly two attempts, one per endpoint.
+    vi.useFakeTimers(); // Pending timers are drained below with runAllTimersAsync instead of real waiting.
+
+    try {
+      const session = createSession();
+      const provider = createProvider({
+        providerType: "claude",
+        providerVendorId: 123,
+        maxRetryAttempts: 2, // Retry budget under test: equal to the number of endpoints.
+      });
+      session.setProvider(provider);
+
+      mocks.getPreferredProviderEndpoints.mockResolvedValue([ // Endpoint pool with two candidates, ids 1 and 2.
+        makeEndpoint({
+          id: 1,
+          vendorId: 123,
+          providerType: "claude",
+          url: "https://ep1.example.com",
+          lastProbeLatencyMs: 100,
+        }),
+        makeEndpoint({
+          id: 2,
+          vendorId: 123,
+          providerType: "claude",
+          url: "https://ep2.example.com",
+          lastProbeLatencyMs: 200,
+        }),
+      ]);
+
+      vi.mocked(categorizeErrorAsync).mockResolvedValue(ErrorCategory.PROVIDER_ERROR); // Every error is categorized as PROVIDER_ERROR.
+
+      const doForward = vi.spyOn(
+        ProxyForwarder as unknown as { doForward: (...args: unknown[]) => unknown },
+        "doForward"
+      );
+
+      doForward.mockImplementation(async () => { // Every forward attempt throws a ProxyError with status 524.
+        throw new ProxyError("provider timeout", 524, {
+          body: "",
+          providerId: provider.id,
+          providerName: provider.name,
+        });
+      });
+
+      const sendPromise = ProxyForwarder.send(session);
+      let caughtError: Error | null = null;
+      sendPromise.catch((error) => { // Attach the handler before driving timers so the rejection is captured for the assertions below.
+        caughtError = error as Error;
+      });
+      await vi.runAllTimersAsync(); // Run all pending fake timers until none remain.
+
+      expect(caughtError).not.toBeNull();
+      expect(caughtError).toBeInstanceOf(ProxyError);
+      expect(doForward).toHaveBeenCalledTimes(2); // No attempt beyond the budget of 2.
+
+      const chain = session.getProviderChain();
+      expect(chain.map((item) => item.endpointId)).toEqual([1, 2]); // One attempt per endpoint, in pool order.
+
+      const vendorTimeoutItems = chain.filter((item) => item.reason === "vendor_type_all_timeout");
+      expect(vendorTimeoutItems).toHaveLength(1);
+      expect(vendorTimeoutItems[0]?.attemptNumber).toBe(2); // Recorded on the last budgeted attempt.
+
+      expect(mocks.recordVendorTypeAllEndpointsTimeout).toHaveBeenCalledTimes(1);
+    } finally {
+      vi.useRealTimers(); // Restore real timers even when an assertion fails.
+    }
+  });
 });
 
 describe("ProxyForwarder - endpoint stickiness on retry", () => {