Просмотр исходного кода

fix(proxy): prevent TypeError in NON_RETRYABLE_CLIENT_ERROR handler with transport errors

Fixes a regression where native transport errors (SocketError, UND_ERR_SOCKET)
could be misclassified as NON_RETRYABLE_CLIENT_ERROR by error rule matching,
causing the error handler to crash with "TypeError: getDetailedErrorMessage is
not a function" instead of properly rethrowing the original error.

Changes:
1. errors.ts: Add transport-error guard in categorizeErrorAsync() at priority 1.5
   - Detects SocketError, undici error codes (UND_ERR_SOCKET, ECONNREFUSED, etc.)
   - Returns SYSTEM_ERROR before error rule matching to prevent misclassification
   - Preserves correct failover behavior for network-level failures

2. forwarder.ts: Bifurcate NON_RETRYABLE_CLIENT_ERROR branch on instanceof check
   - ProxyError path: uses full ProxyError fields (statusCode, upstreamError, etc.)
   - Plain Error path: uses only base Error fields, avoids calling non-existent methods
   - Both paths preserve original error rethrow and provider chain recording

3. Add regression tests:
   - Test A: Verifies plain SocketError + forced NON_RETRYABLE_CLIENT_ERROR doesn't throw TypeError
   - Test B: Verifies categorizeErrorAsync returns SYSTEM_ERROR for transport errors

All 657 proxy unit tests pass. Type check and lint pass.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
ding113 1 месяц назад
Родитель
Сommit
a3fd666e46

+ 50 - 0
src/app/v1/_lib/proxy/errors.ts

@@ -717,6 +717,50 @@ export function isClientAbortError(error: Error): boolean {
   return abortMessages.some((msg) => error.message.includes(msg));
 }
 
+/**
+ * Transport error detection
+ *
+ * Detects native undici/fetch transport errors that should always be classified
+ * as SYSTEM_ERROR regardless of error rule matching.
+ *
+ * These errors indicate network-level failures (DNS, connection, timeout) rather
+ * than application-level issues, and must not be misclassified by error rules
+ * that might match their message content.
+ *
+ * @param error - Error to check
+ * @returns true if error is a transport error
+ */
+function isTransportError(error: Error): boolean {
+  const TRANSPORT_ERROR_CODES = new Set([
+    "UND_ERR_SOCKET",
+    "UND_ERR_CONNECT_TIMEOUT",
+    "UND_ERR_HEADERS_TIMEOUT",
+    "UND_ERR_BODY_TIMEOUT",
+    "ECONNREFUSED",
+    "ECONNRESET",
+    "ETIMEDOUT",
+    "ENOTFOUND",
+    "EAI_AGAIN",
+  ]);
+
+  const TRANSPORT_MESSAGE_SIGNATURES = ["other side closed", "fetch failed"];
+
+  // Check error name
+  if (error.name === "SocketError") return true;
+
+  // Check error code on error itself or cause
+  const code =
+    (error as Error & { code?: string }).code ??
+    (error as Error & { cause?: { code?: string } }).cause?.code;
+  if (code && TRANSPORT_ERROR_CODES.has(code)) return true;
+
+  // Check message for known transport signatures
+  const msg = error.message.toLowerCase();
+  if (TRANSPORT_MESSAGE_SIGNATURES.some((sig) => msg.includes(sig))) return true;
+
+  return false;
+}
+
 /**
  * 限流错误类 - 携带详细的限流上下文信息
  *
@@ -863,6 +907,12 @@ export async function categorizeErrorAsync(error: Error): Promise<ErrorCategory>
     return ErrorCategory.CLIENT_ABORT; // 客户端主动中断
   }
 
+  // 优先级 1.5: Native transport errors — must not be matched by error rules
+  // These are always SYSTEM_ERROR regardless of message content
+  if (isTransportError(error)) {
+    return ErrorCategory.SYSTEM_ERROR;
+  }
+
   // 优先级 2: 不可重试的客户端输入错误检测(白名单模式)
   // 使用异步版本确保错误规则已加载
   if (await isNonRetryableClientErrorAsync(error)) {

+ 63 - 36
src/app/v1/_lib/proxy/forwarder.ts

@@ -1350,8 +1350,6 @@ export class ProxyForwarder {
 
           // ⭐ 3. 不可重试的客户端输入错误处理(不计入熔断器,不重试,立即返回)
           if (errorCategory === ErrorCategory.NON_RETRYABLE_CLIENT_ERROR) {
-            const proxyError = lastError as ProxyError;
-            const statusCode = proxyError.statusCode;
             const detectionResult = await getErrorDetectionResultAsync(lastError);
             const matchedRule =
               detectionResult.matched &&
@@ -1370,42 +1368,71 @@ export class ProxyForwarder {
                   }
                 : undefined;
 
-            logger.warn("ProxyForwarder: Non-retryable client error, stopping immediately", {
-              providerId: currentProvider.id,
-              providerName: currentProvider.name,
-              statusCode: statusCode,
-              statusCodeInferred: proxyError.upstreamError?.statusCodeInferred ?? false,
-              error: errorMessage,
-              attemptNumber: attemptCount,
-              totalProvidersAttempted,
-              reason:
-                "White-listed client error (prompt length, content filter, PDF limit, or thinking format)",
-            });
+            if (lastError instanceof ProxyError) {
+              // Original path: full ProxyError fields available
+              logger.warn("ProxyForwarder: Non-retryable client error, stopping immediately", {
+                providerId: currentProvider.id,
+                providerName: currentProvider.name,
+                statusCode: lastError.statusCode,
+                statusCodeInferred: lastError.upstreamError?.statusCodeInferred ?? false,
+                error: errorMessage,
+                attemptNumber: attemptCount,
+                totalProvidersAttempted,
+                reason:
+                  "White-listed client error (prompt length, content filter, PDF limit, or thinking format)",
+              });
 
-            // 记录到决策链(标记为不可重试的客户端错误)
-            // 注意:不调用 recordFailure(),因为这不是供应商的问题,是客户端输入问题
-            session.addProviderToChain(currentProvider, {
-              ...endpointAudit,
-              reason: "client_error_non_retryable", // 新增的 reason 值
-              circuitState: getCircuitState(currentProvider.id),
-              attemptNumber: attemptCount,
-              errorMessage: errorMessage,
-              statusCode: statusCode,
-              statusCodeInferred: proxyError.upstreamError?.statusCodeInferred ?? false,
-              errorDetails: {
-                provider: {
-                  id: currentProvider.id,
-                  name: currentProvider.name,
-                  statusCode: statusCode,
-                  statusText: proxyError.message,
-                  upstreamBody: proxyError.upstreamError?.body,
-                  upstreamParsed: proxyError.upstreamError?.parsed,
+              // 记录到决策链(标记为不可重试的客户端错误)
+              // 注意:不调用 recordFailure(),因为这不是供应商的问题,是客户端输入问题
+              session.addProviderToChain(currentProvider, {
+                ...endpointAudit,
+                reason: "client_error_non_retryable", // 新增的 reason 值
+                circuitState: getCircuitState(currentProvider.id),
+                attemptNumber: attemptCount,
+                errorMessage: errorMessage,
+                statusCode: lastError.statusCode,
+                statusCodeInferred: lastError.upstreamError?.statusCodeInferred ?? false,
+                errorDetails: {
+                  provider: {
+                    id: currentProvider.id,
+                    name: currentProvider.name,
+                    statusCode: lastError.statusCode,
+                    statusText: lastError.message,
+                    upstreamBody: lastError.upstreamError?.body,
+                    upstreamParsed: lastError.upstreamError?.parsed,
+                  },
+                  clientError: lastError.getDetailedErrorMessage(),
+                  matchedRule,
+                  request: buildRequestDetails(session),
                 },
-                clientError: proxyError.getDetailedErrorMessage(),
-                matchedRule,
-                request: buildRequestDetails(session),
-              },
-            });
+              });
+            } else {
+              // Plain Error path: omit ProxyError-only fields
+              logger.warn(
+                "ProxyForwarder: Non-retryable client error (plain error), stopping immediately",
+                {
+                  providerId: currentProvider.id,
+                  providerName: currentProvider.name,
+                  error: lastError.message,
+                  attemptNumber: attemptCount,
+                  totalProvidersAttempted,
+                  reason: "White-listed client error matched by error rule",
+                }
+              );
+
+              session.addProviderToChain(currentProvider, {
+                ...endpointAudit,
+                reason: "client_error_non_retryable",
+                circuitState: getCircuitState(currentProvider.id),
+                attemptNumber: attemptCount,
+                errorMessage: lastError.message,
+                errorDetails: {
+                  clientError: lastError.message,
+                  matchedRule,
+                  request: buildRequestDetails(session),
+                },
+              });
+            }
 
             // 立即抛出错误,不重试,不切换供应商
             // 白名单错误不计入熔断器,因为是客户端输入问题,不是供应商故障

+ 43 - 39
tests/integration/usage-ledger.test.ts

@@ -278,45 +278,49 @@ run("usage ledger integration", () => {
   });
 
   describe("backfill", () => {
-    test("backfill copies non-warmup message_request rows when ledger rows are missing", {
-      timeout: 60_000,
-    }, async () => {
-      const userId = nextUserId();
-      const providerId = nextProviderId();
-      const keepA = await insertMessageRequestRow({
-        key: nextKey("backfill-a"),
-        userId,
-        providerId,
-        costUsd: "1.100000000000000",
-      });
-      const keepB = await insertMessageRequestRow({
-        key: nextKey("backfill-b"),
-        userId,
-        providerId,
-        costUsd: "2.200000000000000",
-      });
-      const warmup = await insertMessageRequestRow({
-        key: nextKey("backfill-warmup"),
-        userId,
-        providerId,
-        blockedBy: "warmup",
-      });
-
-      await db.delete(usageLedger).where(inArray(usageLedger.requestId, [keepA, keepB, warmup]));
-
-      const summary = await backfillUsageLedger();
-      expect(summary.totalProcessed).toBeGreaterThanOrEqual(2);
-
-      const rows = await db
-        .select({ requestId: usageLedger.requestId })
-        .from(usageLedger)
-        .where(inArray(usageLedger.requestId, [keepA, keepB, warmup]));
-      const requestIds = rows.map((row) => row.requestId);
-
-      expect(requestIds).toContain(keepA);
-      expect(requestIds).toContain(keepB);
-      expect(requestIds).not.toContain(warmup);
-    });
+    test(
+      "backfill copies non-warmup message_request rows when ledger rows are missing",
+      {
+        timeout: 60_000,
+      },
+      async () => {
+        const userId = nextUserId();
+        const providerId = nextProviderId();
+        const keepA = await insertMessageRequestRow({
+          key: nextKey("backfill-a"),
+          userId,
+          providerId,
+          costUsd: "1.100000000000000",
+        });
+        const keepB = await insertMessageRequestRow({
+          key: nextKey("backfill-b"),
+          userId,
+          providerId,
+          costUsd: "2.200000000000000",
+        });
+        const warmup = await insertMessageRequestRow({
+          key: nextKey("backfill-warmup"),
+          userId,
+          providerId,
+          blockedBy: "warmup",
+        });
+
+        await db.delete(usageLedger).where(inArray(usageLedger.requestId, [keepA, keepB, warmup]));
+
+        const summary = await backfillUsageLedger();
+        expect(summary.totalProcessed).toBeGreaterThanOrEqual(2);
+
+        const rows = await db
+          .select({ requestId: usageLedger.requestId })
+          .from(usageLedger)
+          .where(inArray(usageLedger.requestId, [keepA, keepB, warmup]));
+        const requestIds = rows.map((row) => row.requestId);
+
+        expect(requestIds).toContain(keepA);
+        expect(requestIds).toContain(keepB);
+        expect(requestIds).not.toContain(warmup);
+      }
+    );
 
     test("backfill is idempotent when running twice", { timeout: 60_000 }, async () => {
       const requestId = await insertMessageRequestRow({

+ 54 - 0
tests/unit/proxy/proxy-forwarder-retry-limit.test.ts

@@ -1082,3 +1082,57 @@ describe("ProxyForwarder - endpoint stickiness on retry", () => {
     }
   });
 });
+
+describe("NON_RETRYABLE_CLIENT_ERROR regression tests", () => {
+  beforeEach(() => {
+    vi.clearAllMocks();
+    mocks.getPreferredProviderEndpoints.mockResolvedValue([
+      makeEndpoint({ id: 1, vendorId: 1, providerType: "claude", url: "https://ep1.example.com" }),
+    ]);
+  });
+
+  test("NON_RETRYABLE_CLIENT_ERROR with plain SocketError does not throw TypeError", async () => {
+    // 1. Build a synthetic native transport error
+    const socketErr = Object.assign(new Error("other side closed"), {
+      name: "SocketError",
+      code: "UND_ERR_SOCKET",
+    });
+
+    // 2. doForward always throws it
+    const doForward = vi.spyOn(ProxyForwarder as unknown as { doForward: unknown }, "doForward");
+    doForward.mockRejectedValue(socketErr);
+
+    // 3. Force categorizeErrorAsync to return NON_RETRYABLE_CLIENT_ERROR (simulates regression)
+    vi.mocked(categorizeErrorAsync).mockResolvedValue(ErrorCategory.NON_RETRYABLE_CLIENT_ERROR);
+
+    const session = createSession(new URL("https://example.com/v1/messages"));
+    const provider = createProvider({ id: 1, providerType: "claude", providerVendorId: 1 });
+    session.setProvider(provider);
+
+    // 4. Expect the original SocketError to be rethrown, NOT a TypeError
+    await expect(ProxyForwarder.send(session)).rejects.toSatisfy((e: unknown) => {
+      expect(e).toBe(socketErr); // same object reference
+      return true;
+    });
+
+    // 5. Provider chain should have been recorded
+    expect(session.getProviderChain()).toHaveLength(1);
+    expect(session.getProviderChain()[0].reason).toBe("client_error_non_retryable");
+  });
+
+  test("categorizeErrorAsync returns SYSTEM_ERROR for SocketError even if rule would match", async () => {
+    // Import real categorizeErrorAsync (not the mock from the forwarder test)
+    const { categorizeErrorAsync: realCategorize, ErrorCategory: RealErrorCategory } =
+      await vi.importActual<typeof import("@/app/v1/_lib/proxy/errors")>(
+        "@/app/v1/_lib/proxy/errors"
+      );
+
+    const socketErr = Object.assign(new Error("other side closed"), {
+      name: "SocketError",
+      code: "UND_ERR_SOCKET",
+    });
+
+    const category = await realCategorize(socketErr);
+    expect(category).toBe(RealErrorCategory.SYSTEM_ERROR);
+  });
+});