浏览代码

fix(proxy): 修复请求卡死(AgentPool 驱逐阻塞) (#759)

* fix(proxy): 修复 Gemini 流式透传超时链路卡死

- Gemini SSE 透传不再在仅收到 headers 时清除首字节超时
- 首块数据到达后清除首字节超时,并支持 streamingIdleTimeoutMs 静默超时中断
- stats 任务失败时尽量落库/结束追踪,避免请求长期停留在 requesting
- 添加回归测试覆盖无首块/首块后延迟/中途静默三种场景

* chore: format code (fix-hang-stuck-requesting-v2-9058885)

* fix(proxy): 修复请求卡死(AgentPool 驱逐阻塞 + 流式透传健壮性)

- AgentPool 驱逐时优先 destroy,避免 close 等待 in-flight 导致 getAgent/cleanup 卡住\n- Gemini SSE 透传 stats 读取增加内存上限、完善 abort reason 判定与清理日志\n- 补齐/加强回归测试

* test(proxy): 补齐 AgentPool 驱逐卡死回归用例

* fix(proxy): 按 AI review 加固 stats 读取/清理

* test(proxy): 回归用例结束后恢复 fake timers

* fix(proxy): AgentPool 驱逐不等待 destroy/close

- closeAgent 触发 destroy/close 后不 await,避免驱逐路径被卡住\n- Gemini 透传 stats:仅在收到非空 chunk 后清首字节超时\n- 回归测试:覆盖 close 无 destroy 且永不返回的场景

* test(proxy): 强化 idle watchdog 与 mock 隔离

- 透传 stats:done=false 时也会重置 idle timer(避免 value 异常导致 watchdog 不工作)\n- 回归测试:reset hoisted isHttp2Enabled mock,避免跨用例污染

* test(proxy): 修复 AgentPool shutdown 用例 close mock

* test: 稳定 endpoint-circuit-breaker 告警用例(避免 CI/bun 串扰)

* test: 彻底避免 endpoint-circuit-breaker 异步告警串扰(CI/bun)

---------

Co-authored-by: tesgth032 <[email protected]>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
tesgth032 3 天之前
父节点
当前提交
f6bb0f0034

+ 284 - 33
src/app/v1/_lib/proxy/response-handler.ts

@@ -971,64 +971,188 @@ export class ProxyResponseHandler {
           }
         );
 
-        // ⭐ gemini 透传立即清除首字节超时:透传路径收到响应即视为首字节到达
-        const sessionWithCleanup = session as typeof session & {
-          clearResponseTimeout?: () => void;
-        };
-        if (sessionWithCleanup.clearResponseTimeout) {
-          sessionWithCleanup.clearResponseTimeout();
-          // ⭐ 同步记录 TTFB,与首字节超时口径一致
-          session.recordTtfb();
-          logger.debug(
-            "[ResponseHandler] Gemini passthrough: First byte timeout cleared on response received",
-            {
-              providerId: provider.id,
-              providerName: provider.name,
-            }
-          );
-        }
+        // 注意:不要在“仅收到响应头”时清除首字节超时。
+        // 背景:部分上游可能会快速返回 200 + SSE headers,但随后长时间不发送任何 body 数据。
+        // 若在 headers 阶段就 clearResponseTimeout,会导致首字节超时失效,客户端与服务端都会表现为一直“请求中”。
+        // 透传场景下,我们在后台 stats 读取到第一块数据时再清除超时(与非透传路径口径一致)。
 
         const responseForStats = response.clone();
         const statusCode = response.status;
 
         const taskId = `stream-passthrough-${messageContext.id}`;
         const statsPromise = (async () => {
+          const sessionWithCleanup = session as typeof session & {
+            clearResponseTimeout?: () => void;
+          };
+          const sessionWithController = session as typeof session & {
+            responseController?: AbortController;
+          };
+
+          let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
+          // 保护:避免透传 stats 任务把超大响应体无界缓存在内存中(DoS/OOM 风险)
+          // 说明:这里用于统计/结算的内容仅保留“尾部窗口”(最近 MAX_STATS_BUFFER_BYTES),用于尽可能解析 usage/假200。
+          // 若响应体极大,仍会完整 drain 上游(reader.read),但不再累计完整字符串。
+          const MAX_STATS_BUFFER_BYTES = 10 * 1024 * 1024; // 10MB
+          const MAX_STATS_BUFFER_CHUNKS = 8192;
+          const chunks: string[] = [];
+          const chunkBytes: number[] = [];
+          let chunkHead = 0;
+          let bufferedBytes = 0;
+          let wasTruncated = false;
+
+          const joinChunks = (): string => {
+            if (chunkHead <= 0) return chunks.join("");
+            return chunks.slice(chunkHead).join("");
+          };
+
+          const pushChunk = (text: string, bytes: number) => {
+            if (!text) return;
+            chunks.push(text);
+            chunkBytes.push(bytes);
+            bufferedBytes += bytes;
+
+            // 仅保留尾部窗口,避免内存无界增长
+            while (bufferedBytes > MAX_STATS_BUFFER_BYTES && chunkHead < chunkBytes.length) {
+              bufferedBytes -= chunkBytes[chunkHead] ?? 0;
+              chunks[chunkHead] = "";
+              chunkBytes[chunkHead] = 0;
+              chunkHead += 1;
+              wasTruncated = true;
+            }
+
+            // 定期压缩数组,避免 head 指针过大导致 slice/join 性能退化
+            if (chunkHead > 4096) {
+              chunks.splice(0, chunkHead);
+              chunkBytes.splice(0, chunkHead);
+              chunkHead = 0;
+            }
+
+            // 防御:限制 chunk 数量,避免大量超小 chunk 导致对象/数组膨胀(即使总字节数已受限)
+            const keptCount = chunks.length - chunkHead;
+            if (keptCount > MAX_STATS_BUFFER_CHUNKS) {
+              const joined = joinChunks();
+              chunks.length = 0;
+              chunkBytes.length = 0;
+              chunkHead = 0;
+              chunks.push(joined);
+              chunkBytes.push(bufferedBytes);
+            }
+          };
+          const decoder = new TextDecoder();
+          let isFirstChunk = true;
+          let streamEndedNormally = false;
+          let responseTimeoutCleared = false;
+          let abortReason: string | undefined;
+
+          // 静默期 Watchdog:透传也需要支持中途卡住(无新数据推送)
+          const idleTimeoutMs =
+            provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity;
+          let idleTimeoutId: NodeJS.Timeout | null = null;
+          const clearIdleTimer = () => {
+            if (idleTimeoutId) {
+              clearTimeout(idleTimeoutId);
+              idleTimeoutId = null;
+            }
+          };
+          const startIdleTimer = () => {
+            if (idleTimeoutMs === Infinity) return;
+            clearIdleTimer();
+            idleTimeoutId = setTimeout(() => {
+              abortReason = "STREAM_IDLE_TIMEOUT";
+              logger.warn("[ResponseHandler] Gemini passthrough streaming idle timeout triggered", {
+                taskId,
+                providerId: provider.id,
+                providerName: provider.name,
+                idleTimeoutMs,
+                chunksCollected: Math.max(0, chunks.length - chunkHead),
+                bufferedBytes,
+                wasTruncated,
+              });
+              // 终止上游连接:让透传到客户端的连接也尽快结束,避免永久悬挂占用资源
+              try {
+                sessionWithController.responseController?.abort(new Error("streaming_idle"));
+              } catch {
+                // ignore
+              }
+            }, idleTimeoutMs);
+          };
+
+          const clearResponseTimeoutOnce = (firstChunkSize?: number) => {
+            if (responseTimeoutCleared) return;
+            if (!sessionWithCleanup.clearResponseTimeout) return;
+            sessionWithCleanup.clearResponseTimeout();
+            responseTimeoutCleared = true;
+            if (firstChunkSize != null) {
+              logger.debug(
+                "[ResponseHandler] Gemini passthrough: First chunk received, response timeout cleared",
+                {
+                  taskId,
+                  providerId: provider.id,
+                  providerName: provider.name,
+                  firstChunkSize,
+                }
+              );
+            }
+          };
+
+          const flushAndJoin = (): string => {
+            const flushed = decoder.decode();
+            if (flushed) pushChunk(flushed, 0);
+            return joinChunks();
+          };
+
           try {
-            const reader = responseForStats.body?.getReader();
-            if (!reader) return;
+            const body = responseForStats.body;
+            if (!body) return;
+            reader = body.getReader();
 
             // 注意:即使 STORE_SESSION_RESPONSE_BODY=false(不写入 Redis),这里也会在内存中累积完整流内容:
             // - 用于解析 usage/cost 与内部结算(例如“假 200”检测)
             // 因此该开关仅影响“是否持久化”,不用于控制流式内存占用。
-            const chunks: string[] = [];
-            const decoder = new TextDecoder();
-            let isFirstChunk = true;
-            let streamEndedNormally = false;
-
             while (true) {
               if (session.clientAbortSignal?.aborted) break;
 
               const { done, value } = await reader.read();
               if (done) {
-                streamEndedNormally = true;
+                const wasResponseControllerAborted =
+                  sessionWithController.responseController?.signal.aborted ?? false;
+                const clientAborted = session.clientAbortSignal?.aborted ?? false;
+
+                // abort -> nodeStreamToWebStreamSafe 可能会把错误吞掉并 close(),导致 done=true;
+                // 这里必须结合 abort signal 判断是否为“自然结束”。
+                if (wasResponseControllerAborted || clientAborted) {
+                  streamEndedNormally = false;
+                  if (!abortReason) {
+                    abortReason = clientAborted ? "CLIENT_ABORTED" : "STREAM_RESPONSE_TIMEOUT";
+                  }
+                } else {
+                  streamEndedNormally = true;
+                }
                 break;
               }
-              if (value) {
+
+              const chunkSize = value?.byteLength ?? 0;
+              if (value && chunkSize > 0) {
                 if (isFirstChunk) {
                   isFirstChunk = false;
                   session.recordTtfb();
+                  clearResponseTimeoutOnce(chunkSize);
                 }
-                chunks.push(decoder.decode(value, { stream: true }));
+                pushChunk(decoder.decode(value, { stream: true }), chunkSize);
+              }
+
+              // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠)
+              if (!isFirstChunk) {
+                startIdleTimer();
               }
             }
 
-            const flushed = decoder.decode();
-            if (flushed) chunks.push(flushed);
-            const allContent = chunks.join("");
+            clearIdleTimer();
+            const allContent = flushAndJoin();
             const clientAborted = session.clientAbortSignal?.aborted ?? false;
 
             // 存储响应体到 Redis(5分钟过期)
-            if (session.sessionId) {
+            if (session.sessionId && !wasTruncated) {
               void SessionManager.storeSessionResponse(
                 session.sessionId,
                 allContent,
@@ -1036,6 +1160,13 @@ export class ProxyResponseHandler {
               ).catch((err) => {
                 logger.error("[ResponseHandler] Failed to store stream passthrough response:", err);
               });
+            } else if (session.sessionId && wasTruncated) {
+              logger.warn("[ResponseHandler] Skip storing passthrough response: body too large", {
+                taskId,
+                providerId: provider.id,
+                providerName: provider.name,
+                maxBytes: MAX_STATS_BUFFER_BYTES,
+              });
             }
 
             // 使用共享的统计处理方法
@@ -1045,7 +1176,8 @@ export class ProxyResponseHandler {
               allContent,
               statusCode,
               streamEndedNormally,
-              clientAborted
+              clientAborted,
+              abortReason
             );
             await finalizeRequestStats(
               session,
@@ -1056,10 +1188,129 @@ export class ProxyResponseHandler {
               finalized.providerIdForPersistence ?? undefined
             );
           } catch (error) {
-            if (!isClientAbortError(error as Error)) {
-              logger.error("[ResponseHandler] Gemini passthrough stats task failed:", error);
+            const err = error instanceof Error ? error : new Error(String(error));
+            const clientAborted = session.clientAbortSignal?.aborted ?? false;
+            const isResponseControllerAborted =
+              sessionWithController.responseController?.signal.aborted ?? false;
+            const isIdleTimeout = !!err.message?.includes("streaming_idle");
+
+            abortReason =
+              abortReason ??
+              (clientAborted
+                ? "CLIENT_ABORTED"
+                : isIdleTimeout
+                  ? "STREAM_IDLE_TIMEOUT"
+                  : isResponseControllerAborted
+                    ? "STREAM_RESPONSE_TIMEOUT"
+                    : "STREAM_PROCESSING_ERROR");
+
+            // 透传的 stats 任务失败时,必须尽量落库并结束追踪,避免请求长期停留在“requesting”
+            logger.error("[ResponseHandler] Gemini passthrough stats task failed", {
+              taskId,
+              providerId: provider.id,
+              providerName: provider.name,
+              messageId: messageContext.id,
+              clientAborted,
+              isResponseControllerAborted,
+              isIdleTimeout,
+              abortReason,
+              errorName: err.name,
+              errorMessage: err.message || "(empty message)",
+            });
+
+            try {
+              clearIdleTimer();
+              const allContent = flushAndJoin();
+              const duration = Date.now() - session.startTime;
+
+              const finalized = await finalizeDeferredStreamingFinalizationIfNeeded(
+                session,
+                allContent,
+                statusCode,
+                false,
+                clientAborted,
+                abortReason
+              );
+
+              await finalizeRequestStats(
+                session,
+                allContent,
+                finalized.effectiveStatusCode,
+                duration,
+                finalized.errorMessage ?? abortReason,
+                finalized.providerIdForPersistence ?? undefined
+              );
+            } catch (finalizeError) {
+              await persistRequestFailure({
+                session,
+                messageContext,
+                statusCode: statusCode && statusCode >= 400 ? statusCode : 502,
+                error: finalizeError,
+                taskId,
+                phase: "stream",
+              });
             }
           } finally {
+            clearIdleTimer();
+            // 兜底:在流结束/中断后清理首字节超时,避免定时器泄漏
+            // 注意:不应在流仍可能继续时清理(否则会让首字节超时失效)
+            try {
+              const wasResponseControllerAborted =
+                sessionWithController.responseController?.signal.aborted ?? false;
+              const clientAborted = session.clientAbortSignal?.aborted ?? false;
+              const shouldClearTimeout =
+                responseTimeoutCleared ||
+                streamEndedNormally ||
+                wasResponseControllerAborted ||
+                clientAborted;
+              if (shouldClearTimeout) {
+                clearResponseTimeoutOnce();
+              }
+            } catch (e) {
+              logger.warn(
+                "[ResponseHandler] Gemini passthrough: Failed to clear response timeout",
+                {
+                  taskId,
+                  providerId: provider.id,
+                  providerName: provider.name,
+                  error: e instanceof Error ? e.message : String(e),
+                }
+              );
+            }
+            try {
+              // 取消 tee 分支,避免 stats 任务提前退出时 backpressure 影响客户端透传
+              const cancelPromise = reader?.cancel();
+              if (cancelPromise) {
+                cancelPromise.catch((err) => {
+                  logger.warn(
+                    "[ResponseHandler] Gemini passthrough: Failed to cancel stats reader",
+                    {
+                      taskId,
+                      providerId: provider.id,
+                      providerName: provider.name,
+                      error: err instanceof Error ? err.message : String(err),
+                    }
+                  );
+                });
+              }
+            } catch (e) {
+              logger.warn("[ResponseHandler] Gemini passthrough: Failed to cancel stats reader", {
+                taskId,
+                providerId: provider.id,
+                providerName: provider.name,
+                error: e instanceof Error ? e.message : String(e),
+              });
+            }
+            try {
+              reader?.releaseLock();
+            } catch (e) {
+              logger.warn("[ResponseHandler] Gemini passthrough: Failed to release reader lock", {
+                taskId,
+                providerId: provider.id,
+                providerName: provider.name,
+                error: e instanceof Error ? e.message : String(e),
+              });
+            }
             AsyncTaskManager.cleanup(taskId);
           }
         })();

+ 31 - 4
src/lib/proxy-agent/agent-pool.ts

@@ -356,11 +356,38 @@ export class AgentPoolImpl implements AgentPool {
   }
 
   private async closeAgent(agent: Dispatcher, key: string): Promise<void> {
+    // 防御性处理:极端情况下(例如 mock/第三方 dispatcher 异常)可能传入空值
+    if (!agent) return;
+
     try {
-      if (typeof agent.close === "function") {
-        await agent.close();
-      } else if (typeof agent.destroy === "function") {
-        await agent.destroy();
+      // 注意:优先 destroy。undici 的 close() 可能会等待 in-flight 请求结束(流式/卡住时会导致长期阻塞),
+      // 从而让 getAgent/evictEndpoint/cleanup 也被卡住,最终表现为“所有请求都卡在 requesting”。
+      // destroy() 会强制关闭底层连接,更适合作为驱逐/清理时的兜底手段。
+      const operation =
+        typeof agent.destroy === "function"
+          ? ("destroy" as const)
+          : typeof agent.close === "function"
+            ? ("close" as const)
+            : null;
+
+      // 关键点:驱逐/清理路径不能等待 in-flight(否则会把 getAgent() 也阻塞住,导致全局“requesting”)
+      // 因此这里发起 destroy/close 后不 await,仅记录异常,确保 eviction 始终快速返回。
+      if (operation === "destroy") {
+        agent.destroy().catch((error) => {
+          logger.warn("AgentPool: Error closing agent", {
+            key,
+            operation,
+            error: error instanceof Error ? error.message : String(error),
+          });
+        });
+      } else if (operation === "close") {
+        agent.close().catch((error) => {
+          logger.warn("AgentPool: Error closing agent", {
+            key,
+            operation,
+            error: error instanceof Error ? error.message : String(error),
+          });
+        });
       }
     } catch (error) {
       logger.warn("AgentPool: Error closing agent", {

+ 13 - 0
tests/unit/lib/endpoint-circuit-breaker.test.ts

@@ -40,6 +40,10 @@ describe("endpoint-circuit-breaker", () => {
     });
 
     vi.doMock("@/lib/logger", () => ({ logger: createLoggerMock() }));
+    const sendAlertMock = vi.fn(async () => {});
+    vi.doMock("@/lib/notification/notifier", () => ({
+      sendCircuitBreakerAlert: sendAlertMock,
+    }));
     vi.doMock("@/lib/redis/endpoint-circuit-breaker-state", () => ({
       loadEndpointCircuitState: loadMock,
       saveEndpointCircuitState: saveMock,
@@ -88,6 +92,15 @@ describe("endpoint-circuit-breaker", () => {
 
     await resetEndpointCircuit(1);
     expect(deleteMock).toHaveBeenCalledWith(1);
+
+    // 说明:recordEndpointFailure 在达到阈值后会触发异步告警(dynamic import + await)。
+    // 在 CI/bun 环境下,告警 Promise 可能在下一个测试开始后才完成,从而“借用”后续用例的 module mock,
+    // 导致 sendAlertMock 被额外调用而产生偶发失败。这里用真实计时器让事件循环前进,确保告警任务尽快落地。
+    vi.useRealTimers();
+    const startedAt = Date.now();
+    while (sendAlertMock.mock.calls.length === 0 && Date.now() - startedAt < 1000) {
+      await new Promise<void>((resolve) => setTimeout(resolve, 0));
+    }
   });
 
   test("recordEndpointSuccess: closed 且 failureCount>0 时应清零", async () => {

+ 86 - 0
tests/unit/lib/proxy-agent/agent-pool.test.ts

@@ -235,6 +235,64 @@ describe("AgentPool", () => {
       expect(result2.agent).not.toBe(result1.agent);
     });
 
+    it("should not hang when evicting an unhealthy agent whose close() never resolves", async () => {
+      // 说明:beforeEach 使用了 fake timers,但此用例需要依赖真实 setTimeout 做“防卡死”断言
+      await pool.shutdown();
+      vi.useRealTimers();
+
+      const realPool = new AgentPoolImpl(defaultConfig);
+
+      const withTimeout = async <T>(promise: Promise<T>, ms: number): Promise<T> => {
+        let timeoutId: ReturnType<typeof setTimeout> | null = null;
+        try {
+          return await Promise.race([
+            promise,
+            new Promise<T>((_, reject) => {
+              timeoutId = setTimeout(() => reject(new Error("timeout")), ms);
+            }),
+          ]);
+        } finally {
+          if (timeoutId) clearTimeout(timeoutId);
+        }
+      };
+
+      try {
+        const params = {
+          endpointUrl: "https://api.anthropic.com/v1/messages",
+          proxyUrl: null,
+          enableHttp2: true,
+        };
+
+        const result1 = await realPool.getAgent(params);
+        const agent1 = result1.agent as unknown as {
+          close?: () => Promise<void>;
+          destroy?: unknown;
+        };
+
+        // 强制走 close() 分支:模拟某些 dispatcher 不支持 destroy()
+        agent1.destroy = undefined;
+
+        // 模拟:close 可能因等待 in-flight 请求结束而长期不返回
+        let closeCalled = false;
+        agent1.close = () => {
+          closeCalled = true;
+          return new Promise<void>(() => {});
+        };
+
+        realPool.markUnhealthy(result1.cacheKey, "test-hang-close");
+
+        const result2 = await withTimeout(realPool.getAgent(params), 500);
+        expect(result2.isNew).toBe(true);
+        expect(result2.agent).not.toBe(result1.agent);
+
+        // 断言:即使 close() 处于 pending,也不会阻塞 getAgent(),且会触发 close 调用
+        expect(closeCalled).toBe(true);
+      } finally {
+        await realPool.shutdown();
+        vi.useFakeTimers();
+      }
+    });
+
     it("should track unhealthy agents in stats", async () => {
       const params = {
         endpointUrl: "https://api.anthropic.com/v1/messages",
@@ -443,6 +501,34 @@ describe("AgentPool", () => {
       const stats = pool.getPoolStats();
       expect(stats.cacheSize).toBe(0);
     });
+
+    it("should prefer destroy over close to avoid hanging on in-flight streaming requests", async () => {
+      const result = await pool.getAgent({
+        endpointUrl: "https://api.anthropic.com/v1/messages",
+        proxyUrl: null,
+        enableHttp2: true,
+      });
+
+      const agent = result.agent as unknown as {
+        close?: () => Promise<void>;
+        destroy?: () => Promise<void>;
+      };
+
+      // 模拟:close 可能因等待 in-flight 请求结束而长期不返回
+      if (typeof agent.close === "function") {
+        vi.mocked(agent.close).mockImplementation(() => new Promise<void>(() => {}));
+      }
+
+      await pool.shutdown();
+
+      // destroy 应被优先调用(避免 close 挂死导致 shutdown/evict 卡住)
+      if (typeof agent.destroy === "function") {
+        expect(agent.destroy).toHaveBeenCalled();
+      }
+      if (typeof agent.close === "function") {
+        expect(agent.close).not.toHaveBeenCalled();
+      }
+    });
   });
 });
 

+ 458 - 0
tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts

@@ -0,0 +1,458 @@
+import { createServer } from "node:http";
+import type { Socket } from "node:net";
+import { beforeEach, describe, expect, test, vi } from "vitest";
+import { ProxyForwarder } from "@/app/v1/_lib/proxy/forwarder";
+import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler";
+import { ProxySession } from "@/app/v1/_lib/proxy/session";
+import type { Provider } from "@/types/provider";
+
+const asyncTasks: Promise<void>[] = [];
+
+const mocks = vi.hoisted(() => {
+  return {
+    isHttp2Enabled: vi.fn(async () => false),
+  };
+});
+
+beforeEach(() => {
+  mocks.isHttp2Enabled.mockReset();
+  mocks.isHttp2Enabled.mockResolvedValue(false);
+});
+
+vi.mock("@/lib/config", async (importOriginal) => {
+  const actual = await importOriginal<typeof import("@/lib/config")>();
+  return {
+    ...actual,
+    isHttp2Enabled: mocks.isHttp2Enabled,
+  };
+});
+
+vi.mock("@/app/v1/_lib/proxy/response-fixer", () => ({
+  ResponseFixer: {
+    process: async (_session: unknown, response: Response) => response,
+  },
+}));
+
+vi.mock("@/lib/async-task-manager", () => ({
+  AsyncTaskManager: {
+    register: (_taskId: string, promise: Promise<void>) => {
+      asyncTasks.push(promise);
+      return new AbortController();
+    },
+    cleanup: () => {},
+    cancel: () => {},
+  },
+}));
+
+vi.mock("@/lib/logger", () => ({
+  logger: {
+    debug: vi.fn(),
+    info: vi.fn(),
+    warn: vi.fn(),
+    trace: vi.fn(),
+    error: vi.fn(),
+  },
+}));
+
+vi.mock("@/repository/message", () => ({
+  updateMessageRequestCost: vi.fn(),
+  updateMessageRequestDetails: vi.fn(),
+  updateMessageRequestDuration: vi.fn(),
+}));
+
+vi.mock("@/repository/system-config", () => ({
+  getSystemSettings: vi.fn(async () => ({ billingModelSource: "original" })),
+}));
+
+vi.mock("@/repository/model-price", () => ({
+  findLatestPriceByModel: vi.fn(async () => ({
+    priceData: { input_cost_per_token: 0, output_cost_per_token: 0 },
+  })),
+}));
+
+vi.mock("@/lib/session-manager", () => ({
+  SessionManager: {
+    storeSessionResponse: vi.fn(),
+    updateSessionUsage: vi.fn(),
+  },
+}));
+
+vi.mock("@/lib/proxy-status-tracker", () => ({
+  ProxyStatusTracker: {
+    getInstance: () => ({
+      endRequest: () => {},
+    }),
+  },
+}));
+
+function createProvider(overrides: Partial<Provider> = {}): Provider {
+  return {
+    id: 1,
+    name: "p1",
+    url: "http://127.0.0.1:1",
+    key: "k",
+    providerVendorId: null,
+    isEnabled: true,
+    weight: 1,
+    priority: 0,
+    groupPriorities: null,
+    costMultiplier: 1,
+    groupTag: null,
+    providerType: "gemini",
+    preserveClientIp: false,
+    modelRedirects: null,
+    allowedModels: null,
+    mcpPassthroughType: "none",
+    mcpPassthroughUrl: null,
+    limit5hUsd: null,
+    limitDailyUsd: null,
+    dailyResetMode: "fixed",
+    dailyResetTime: "00:00",
+    limitWeeklyUsd: null,
+    limitMonthlyUsd: null,
+    limitTotalUsd: null,
+    totalCostResetAt: null,
+    limitConcurrentSessions: 0,
+    maxRetryAttempts: null,
+    circuitBreakerFailureThreshold: 5,
+    circuitBreakerOpenDuration: 1_800_000,
+    circuitBreakerHalfOpenSuccessThreshold: 2,
+    proxyUrl: null,
+    proxyFallbackToDirect: false,
+    firstByteTimeoutStreamingMs: 100,
+    streamingIdleTimeoutMs: 0,
+    requestTimeoutNonStreamingMs: 0,
+    websiteUrl: null,
+    faviconUrl: null,
+    cacheTtlPreference: null,
+    context1mPreference: null,
+    codexReasoningEffortPreference: null,
+    codexReasoningSummaryPreference: null,
+    codexTextVerbosityPreference: null,
+    codexParallelToolCallsPreference: null,
+    anthropicMaxTokensPreference: null,
+    anthropicThinkingBudgetPreference: null,
+    geminiGoogleSearchPreference: null,
+    tpm: 0,
+    rpm: 0,
+    rpd: 0,
+    cc: 0,
+    createdAt: new Date(),
+    updatedAt: new Date(),
+    deletedAt: null,
+    ...overrides,
+  };
+}
+
+function createSession(params: {
+  clientAbortSignal: AbortSignal;
+  messageId: number;
+  userId: number;
+}): ProxySession {
+  const headers = new Headers();
+  const session = Object.create(ProxySession.prototype);
+
+  Object.assign(session, {
+    startTime: Date.now(),
+    method: "POST",
+    requestUrl: new URL("https://example.com/v1/chat/completions"),
+    headers,
+    originalHeaders: new Headers(headers),
+    headerLog: JSON.stringify(Object.fromEntries(headers.entries())),
+    request: {
+      model: "gemini-2.0-flash",
+      log: "(test)",
+      message: {
+        model: "gemini-2.0-flash",
+        stream: true,
+        messages: [{ role: "user", content: "hi" }],
+      },
+    },
+    userAgent: null,
+    context: null,
+    clientAbortSignal: params.clientAbortSignal,
+    userName: "test-user",
+    authState: { success: true, user: null, key: null, apiKey: null },
+    provider: null,
+    messageContext: {
+      id: params.messageId,
+      createdAt: new Date(),
+      user: { id: params.userId, name: "u1" },
+    },
+    sessionId: null,
+    requestSequence: 1,
+    originalFormat: "gemini",
+    providerType: null,
+    originalModelName: null,
+    originalUrlPathname: null,
+    providerChain: [],
+    cacheTtlResolved: null,
+    context1mApplied: false,
+    specialSettings: [],
+    cachedPriceData: undefined,
+    cachedBillingModelSource: undefined,
+    isHeaderModified: () => false,
+  });
+
+  return session as ProxySession;
+}
+
+async function startSseServer(handler: Parameters<typeof createServer>[0]): Promise<{
+  baseUrl: string;
+  close: () => Promise<void>;
+}> {
+  const sockets = new Set<Socket>();
+  const server = createServer(handler);
+
+  server.on("connection", (socket) => {
+    sockets.add(socket);
+    socket.on("close", () => sockets.delete(socket));
+  });
+
+  const baseUrl = await new Promise<string>((resolve, reject) => {
+    server.once("error", reject);
+    server.listen(0, "127.0.0.1", () => {
+      const addr = server.address();
+      if (!addr || typeof addr === "string") {
+        reject(new Error("Failed to get server address"));
+        return;
+      }
+      resolve(`http://127.0.0.1:${addr.port}`);
+    });
+  });
+
+  const close = async () => {
+    for (const socket of sockets) {
+      try {
+        socket.destroy();
+      } catch {
+        // ignore
+      }
+    }
+    sockets.clear();
+    await new Promise<void>((resolve) => server.close(() => resolve()));
+  };
+
+  return { baseUrl, close };
+}
+
+async function readWithTimeout(
+  reader: ReadableStreamDefaultReader<Uint8Array>,
+  timeoutMs: number
+): Promise<
+  | { ok: true; value: ReadableStreamReadResult<Uint8Array> }
+  | { ok: true; error: unknown }
+  | { ok: false; reason: "timeout" }
+> {
+  const result = await Promise.race([
+    reader
+      .read()
+      .then((value) => ({ ok: true as const, value }))
+      .catch((error) => ({ ok: true as const, error })),
+    new Promise<{ ok: false; reason: "timeout" }>((resolve) =>
+      setTimeout(() => resolve({ ok: false as const, reason: "timeout" }), timeoutMs)
+    ),
+  ]);
+  return result;
+}
+
+describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => {
+  test("不应在仅收到 headers 时清除首字节超时:无首块数据时应在窗口内中断避免悬挂", async () => {
+    asyncTasks.length = 0;
+    const { baseUrl, close } = await startSseServer((_req, res) => {
+      res.writeHead(200, {
+        "content-type": "text/event-stream",
+        "cache-control": "no-cache",
+        connection: "keep-alive",
+      });
+      res.flushHeaders();
+      // 不发送任何 body,保持连接不结束
+    });
+
+    const clientAbortController = new AbortController();
+    try {
+      const provider = createProvider({
+        url: baseUrl,
+        firstByteTimeoutStreamingMs: 200,
+      });
+      const session = createSession({
+        clientAbortSignal: clientAbortController.signal,
+        messageId: 1,
+        userId: 1,
+      });
+      session.setProvider(provider);
+
+      const doForward = (
+        ProxyForwarder as unknown as {
+          doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown;
+        }
+      ).doForward;
+
+      const upstreamResponse = (await doForward.call(
+        ProxyForwarder,
+        session,
+        provider,
+        baseUrl
+      )) as Response;
+
+      const clientResponse = await ProxyResponseHandler.dispatch(session, upstreamResponse);
+      const reader = clientResponse.body?.getReader();
+      expect(reader).toBeTruthy();
+      if (!reader) throw new Error("Missing body reader");
+
+      const startedAt = Date.now();
+      const firstRead = await readWithTimeout(reader, 1500);
+      if (!firstRead.ok) {
+        clientAbortController.abort(new Error("test_timeout"));
+        throw new Error("首字节超时未生效:读首块数据在 1.5s 内仍未返回(可能仍会卡死)");
+      }
+
+      // 断言:应由超时/中断导致读取结束(done=true 或抛错均可)
+      const ended = ("value" in firstRead && firstRead.value.done === true) || "error" in firstRead;
+      expect(ended).toBe(true);
+
+      // 断言:responseController 应已触发 abort(即首字节超时生效)
+      const sessionWithController = session as unknown as { responseController?: AbortController };
+      expect(sessionWithController.responseController?.signal.aborted).toBe(true);
+
+      // 粗略时间断言:不应立即返回(避免“无关早退”导致假阳性)
+      const elapsed = Date.now() - startedAt;
+      expect(elapsed).toBeGreaterThanOrEqual(120);
+    } finally {
+      clientAbortController.abort(new Error("test_cleanup"));
+      await close();
+      await Promise.allSettled(asyncTasks);
+    }
+  });
+
+  test("收到首块数据后应清除首字节超时:后续 chunk 即使晚于 firstByteTimeout 也不应被误中断", async () => {
+    asyncTasks.length = 0;
+    const { baseUrl, close } = await startSseServer((_req, res) => {
+      res.writeHead(200, {
+        "content-type": "text/event-stream",
+        "cache-control": "no-cache",
+        connection: "keep-alive",
+      });
+      res.flushHeaders();
+      res.write('data: {"x":1}\n\n');
+      setTimeout(() => {
+        try {
+          res.write('data: {"x":2}\n\n');
+          res.end();
+        } catch {
+          // ignore
+        }
+      }, 150);
+    });
+
+    const clientAbortController = new AbortController();
+    try {
+      const provider = createProvider({
+        url: baseUrl,
+        firstByteTimeoutStreamingMs: 100,
+        streamingIdleTimeoutMs: 0,
+      });
+      const session = createSession({
+        clientAbortSignal: clientAbortController.signal,
+        messageId: 2,
+        userId: 1,
+      });
+      session.setProvider(provider);
+
+      const doForward = (
+        ProxyForwarder as unknown as {
+          doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown;
+        }
+      ).doForward;
+
+      const upstreamResponse = (await doForward.call(
+        ProxyForwarder,
+        session,
+        provider,
+        baseUrl
+      )) as Response;
+
+      const clientResponse = await ProxyResponseHandler.dispatch(session, upstreamResponse);
+      const fullText = await Promise.race([
+        clientResponse.text(),
+        new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 1500)),
+      ]);
+      if (fullText === "timeout") {
+        clientAbortController.abort(new Error("test_timeout"));
+        throw new Error("读取透传响应超时(可能仍会卡死)");
+      }
+
+      // 第二块数据在 150ms 发送,若首字节超时未被清除,则 100ms 左右就会被中断拿不到第二块
+      expect(fullText).toContain('"x":2');
+    } finally {
+      clientAbortController.abort(new Error("test_cleanup"));
+      await close();
+      await Promise.allSettled(asyncTasks);
+    }
+  });
+
+  test("中途静默超过 streamingIdleTimeoutMs 时应中断,避免 200 跑到一半卡死", async () => {
+    asyncTasks.length = 0;
+    const { baseUrl, close } = await startSseServer((_req, res) => {
+      res.writeHead(200, {
+        "content-type": "text/event-stream",
+        "cache-control": "no-cache",
+        connection: "keep-alive",
+      });
+      res.flushHeaders();
+      res.write('data: {"x":1}\n\n');
+      // 不再发送数据,也不结束连接
+    });
+
+    const clientAbortController = new AbortController();
+    try {
+      const provider = createProvider({
+        url: baseUrl,
+        firstByteTimeoutStreamingMs: 1000,
+        streamingIdleTimeoutMs: 120,
+      });
+      const session = createSession({
+        clientAbortSignal: clientAbortController.signal,
+        messageId: 3,
+        userId: 1,
+      });
+      session.setProvider(provider);
+
+      const doForward = (
+        ProxyForwarder as unknown as {
+          doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown;
+        }
+      ).doForward;
+
+      const upstreamResponse = (await doForward.call(
+        ProxyForwarder,
+        session,
+        provider,
+        baseUrl
+      )) as Response;
+
+      const clientResponse = await ProxyResponseHandler.dispatch(session, upstreamResponse);
+      const reader = clientResponse.body?.getReader();
+      expect(reader).toBeTruthy();
+      if (!reader) throw new Error("Missing body reader");
+
+      const first = await readWithTimeout(reader, 1000);
+      expect(first.ok).toBe(true);
+      if (!("value" in first)) {
+        throw new Error("首块数据读取异常:预期拿到 value,但得到 error");
+      }
+      expect(first.value.done).toBe(false);
+
+      // 静默超时触发后,后续 read 应该在合理时间内结束(done=true 或抛错均可)
+      const second = await readWithTimeout(reader, 1500);
+      if (!second.ok) {
+        clientAbortController.abort(new Error("test_timeout"));
+        throw new Error("流式静默超时未生效:读后续数据在 1.5s 内仍未返回(可能仍会卡死)");
+      }
+    } finally {
+      clientAbortController.abort(new Error("test_cleanup"));
+      await close();
+      await Promise.allSettled(asyncTasks);
+    }
+  });
+});