| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458 |
- import { createServer } from "node:http";
- import type { Socket } from "node:net";
- import { beforeEach, describe, expect, test, vi } from "vitest";
- import { ProxyForwarder } from "@/app/v1/_lib/proxy/forwarder";
- import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler";
- import { ProxySession } from "@/app/v1/_lib/proxy/session";
- import type { Provider } from "@/types/provider";
// Promises handed to the mocked AsyncTaskManager.register. Each test empties
// this list up front and awaits everything in it during teardown.
const asyncTasks: Promise<void>[] = [];

// Created through vi.hoisted so the stub can be referenced from the
// "@/lib/config" mock factory below.
const mocks = vi.hoisted(() => ({
  isHttp2Enabled: vi.fn(async () => false),
}));

// Restore the "HTTP/2 disabled" default before every test.
beforeEach(() => {
  mocks.isHttp2Enabled.mockReset();
  mocks.isHttp2Enabled.mockResolvedValue(false);
});

// Keep the real config module, replacing only isHttp2Enabled with the stub.
vi.mock("@/lib/config", async (importOriginal) => ({
  ...(await importOriginal<typeof import("@/lib/config")>()),
  isHttp2Enabled: mocks.isHttp2Enabled,
}));

// ResponseFixer becomes a pass-through: the response is returned untouched.
vi.mock("@/app/v1/_lib/proxy/response-fixer", () => ({
  ResponseFixer: {
    async process(_session: unknown, response: Response) {
      return response;
    },
  },
}));

// register() records the task promise in asyncTasks; cleanup/cancel are no-ops.
vi.mock("@/lib/async-task-manager", () => ({
  AsyncTaskManager: {
    register(_taskId: string, promise: Promise<void>) {
      asyncTasks.push(promise);
      return new AbortController();
    },
    cleanup() {},
    cancel() {},
  },
}));

// Silence logging: every level is an independent spy.
vi.mock("@/lib/logger", () => ({
  logger: {
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    trace: vi.fn(),
    error: vi.fn(),
  },
}));

// Message-request persistence is replaced by bare spies.
vi.mock("@/repository/message", () => ({
  updateMessageRequestCost: vi.fn(),
  updateMessageRequestDetails: vi.fn(),
  updateMessageRequestDuration: vi.fn(),
}));

// System settings always resolve to billingModelSource "original".
vi.mock("@/repository/system-config", () => ({
  getSystemSettings: vi.fn(async () => ({ billingModelSource: "original" })),
}));

// Price lookup always resolves to zero-cost input/output token prices.
vi.mock("@/repository/model-price", () => ({
  findLatestPriceByModel: vi.fn(async () => ({
    priceData: { input_cost_per_token: 0, output_cost_per_token: 0 },
  })),
}));

// Session storage calls are replaced by bare spies.
vi.mock("@/lib/session-manager", () => ({
  SessionManager: {
    storeSessionResponse: vi.fn(),
    updateSessionUsage: vi.fn(),
  },
}));

// getInstance() hands back a fresh tracker whose endRequest does nothing.
vi.mock("@/lib/proxy-status-tracker", () => ({
  ProxyStatusTracker: {
    getInstance() {
      return {
        endRequest() {},
      };
    },
  },
}));
/**
 * Builds a `Provider` fixture for the tests in this file.
 *
 * A full set of default field values is created first, then `overrides` is
 * spread on top, so any field supplied by the caller replaces the default.
 *
 * @param overrides - Fields that should differ from the defaults.
 * @returns A new `Provider` object; `createdAt`/`updatedAt` are fresh `Date`s per call.
 */
function createProvider(overrides: Partial<Provider> = {}): Provider {
  const defaults: Provider = {
    id: 1,
    name: "p1",
    // Placeholder upstream address; tests pass the real server URL via overrides.
    url: "http://127.0.0.1:1",
    key: "k",
    providerVendorId: null,
    isEnabled: true,
    weight: 1,
    priority: 0,
    groupPriorities: null,
    costMultiplier: 1,
    groupTag: null,
    providerType: "gemini",
    preserveClientIp: false,
    modelRedirects: null,
    allowedModels: null,
    mcpPassthroughType: "none",
    mcpPassthroughUrl: null,
    // Spending limits: all unset.
    limit5hUsd: null,
    limitDailyUsd: null,
    dailyResetMode: "fixed",
    dailyResetTime: "00:00",
    limitWeeklyUsd: null,
    limitMonthlyUsd: null,
    limitTotalUsd: null,
    totalCostResetAt: null,
    limitConcurrentSessions: 0,
    maxRetryAttempts: null,
    circuitBreakerFailureThreshold: 5,
    circuitBreakerOpenDuration: 1_800_000,
    circuitBreakerHalfOpenSuccessThreshold: 2,
    proxyUrl: null,
    proxyFallbackToDirect: false,
    // Timeout settings (milliseconds, per the field names).
    firstByteTimeoutStreamingMs: 100,
    streamingIdleTimeoutMs: 0,
    requestTimeoutNonStreamingMs: 0,
    websiteUrl: null,
    faviconUrl: null,
    // Per-provider preferences: all unset.
    cacheTtlPreference: null,
    context1mPreference: null,
    codexReasoningEffortPreference: null,
    codexReasoningSummaryPreference: null,
    codexTextVerbosityPreference: null,
    codexParallelToolCallsPreference: null,
    anthropicMaxTokensPreference: null,
    anthropicThinkingBudgetPreference: null,
    geminiGoogleSearchPreference: null,
    tpm: 0,
    rpm: 0,
    rpd: 0,
    cc: 0,
    createdAt: new Date(),
    updatedAt: new Date(),
    deletedAt: null,
  };

  return { ...defaults, ...overrides };
}
/**
 * Builds a `ProxySession` fixture without invoking the class constructor.
 *
 * The object is created from `ProxySession.prototype` (so prototype methods
 * such as `setProvider` are available) and its fields are then assigned
 * directly with values describing a streaming Gemini POST request.
 *
 * @param params.clientAbortSignal - Signal stored on the session as the client's abort signal.
 * @param params.messageId - Stored as `messageContext.id`.
 * @param params.userId - Stored as `messageContext.user.id`.
 * @returns The populated session object.
 */
function createSession(params: {
  clientAbortSignal: AbortSignal;
  messageId: number;
  userId: number;
}): ProxySession {
  const { clientAbortSignal, messageId, userId } = params;
  const requestHeaders = new Headers();
  const modelName = "gemini-2.0-flash";

  const fields = {
    startTime: Date.now(),
    method: "POST",
    requestUrl: new URL("https://example.com/v1/chat/completions"),
    headers: requestHeaders,
    // Independent copy so the two header sets are distinct objects.
    originalHeaders: new Headers(requestHeaders),
    headerLog: JSON.stringify(Object.fromEntries(requestHeaders.entries())),
    request: {
      model: modelName,
      log: "(test)",
      message: {
        model: modelName,
        stream: true,
        messages: [{ role: "user", content: "hi" }],
      },
    },
    userAgent: null,
    context: null,
    clientAbortSignal,
    userName: "test-user",
    authState: { success: true, user: null, key: null, apiKey: null },
    provider: null,
    messageContext: {
      id: messageId,
      createdAt: new Date(),
      user: { id: userId, name: "u1" },
    },
    sessionId: null,
    requestSequence: 1,
    originalFormat: "gemini",
    providerType: null,
    originalModelName: null,
    originalUrlPathname: null,
    providerChain: [],
    cacheTtlResolved: null,
    context1mApplied: false,
    specialSettings: [],
    cachedPriceData: undefined,
    cachedBillingModelSource: undefined,
    // Assigned as an own property, so it shadows any prototype method of the same name.
    isHeaderModified: () => false,
  };

  const session: ProxySession = Object.create(ProxySession.prototype);
  Object.assign(session, fields);
  return session;
}
/**
 * Starts a throwaway HTTP server on an ephemeral loopback port and resolves
 * once it is listening.
 *
 * The handler is typed as a request listener. Deriving the type with
 * `Parameters<typeof createServer>[0]` resolves against the last overload of
 * `createServer`, whose first parameter is the options object, so callback
 * arguments (`req`, `res`) would not be typed as request/response objects.
 *
 * @param handler - Request listener invoked for every incoming request.
 * @returns `baseUrl` (`http://127.0.0.1:<port>`) and `close`, which destroys
 *   every socket still open and resolves after the server has shut down.
 * @throws Rejects if the server emits `error` before listening, or if the
 *   bound address cannot be read.
 */
async function startSseServer(handler: import("node:http").RequestListener): Promise<{
  baseUrl: string;
  close: () => Promise<void>;
}> {
  // Every accepted socket is tracked so close() can tear down connections that
  // a handler deliberately leaves open (a response that never ends).
  const sockets = new Set<Socket>();
  const server = createServer(handler);
  server.on("connection", (socket) => {
    sockets.add(socket);
    socket.on("close", () => sockets.delete(socket));
  });

  const baseUrl = await new Promise<string>((resolve, reject) => {
    server.once("error", reject);
    // Port 0 lets the OS pick a free port.
    server.listen(0, "127.0.0.1", () => {
      const addr = server.address();
      if (!addr || typeof addr === "string") {
        reject(new Error("Failed to get server address"));
        return;
      }
      resolve(`http://127.0.0.1:${addr.port}`);
    });
  });

  const close = async () => {
    for (const socket of sockets) {
      try {
        socket.destroy();
      } catch {
        // Best effort: a socket that fails to destroy must not block shutdown.
      }
    }
    sockets.clear();
    // server.close() only completes once no connections remain, hence the
    // explicit socket teardown above.
    await new Promise<void>((resolve) => server.close(() => resolve()));
  };

  return { baseUrl, close };
}
/**
 * Performs one `reader.read()` and races it against a timer.
 *
 * `ok` reports whether the read settled before the deadline, not whether it
 * succeeded: a rejected read yields `{ ok: true, error }`.
 *
 * The timer is cleared as soon as the race settles, so a read that finishes
 * early does not leave a pending timer behind for the remainder of
 * `timeoutMs`. On timeout the underlying `read()` is left pending; this
 * function does not cancel the reader.
 *
 * @param reader - Reader to pull a single chunk from.
 * @param timeoutMs - Deadline in milliseconds.
 * @returns `{ ok: true, value }` when the read resolved, `{ ok: true, error }`
 *   when it rejected, or `{ ok: false, reason: "timeout" }` when the deadline
 *   passed first.
 */
async function readWithTimeout(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  timeoutMs: number
): Promise<
  | { ok: true; value: ReadableStreamReadResult<Uint8Array> }
  | { ok: true; error: unknown }
  | { ok: false; reason: "timeout" }
> {
  // Fold rejection into a resolved value so the race itself never rejects.
  const settledRead = reader
    .read()
    .then((value) => ({ ok: true as const, value }))
    .catch((error: unknown) => ({ ok: true as const, error }));

  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<{ ok: false; reason: "timeout" }>((resolve) => {
    timer = setTimeout(() => resolve({ ok: false as const, reason: "timeout" }), timeoutMs);
  });

  try {
    return await Promise.race([settledRead, deadline]);
  } finally {
    // Harmless if the timer already fired.
    clearTimeout(timer);
  }
}
// End-to-end checks of the streaming timeouts on the Gemini passthrough path:
// each test runs a real local HTTP server as the upstream, forwards a session
// to it, and observes what the client-facing response body does.
describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => {
  // Signature used to reach ProxyForwarder.doForward through a cast.
  type ForwardFn = (this: typeof ProxyForwarder, ...args: unknown[]) => unknown;

  // Headers sent by every fake upstream; a fresh object per call.
  const sseHeaders = () => ({
    "content-type": "text/event-stream",
    "cache-control": "no-cache",
    connection: "keep-alive",
  });

  // Builds provider + session, forwards to `baseUrl`, and dispatches the
  // upstream response, returning the session and the client-facing response.
  async function forwardAndDispatch(options: {
    baseUrl: string;
    messageId: number;
    clientAbortSignal: AbortSignal;
    timeouts: Partial<Provider>;
  }) {
    const { baseUrl, messageId, clientAbortSignal, timeouts } = options;
    const provider = createProvider({ url: baseUrl, ...timeouts });
    const session = createSession({ clientAbortSignal, messageId, userId: 1 });
    session.setProvider(provider);

    const { doForward } = ProxyForwarder as unknown as { doForward: ForwardFn };
    const upstreamResponse = (await doForward.call(
      ProxyForwarder,
      session,
      provider,
      baseUrl
    )) as Response;
    const clientResponse = await ProxyResponseHandler.dispatch(session, upstreamResponse);
    return { session, clientResponse };
  }

  // Common teardown: abort the client side, stop the server, then wait for
  // every background task registered during the test.
  async function teardown(controller: AbortController, close: () => Promise<void>) {
    controller.abort(new Error("test_cleanup"));
    await close();
    await Promise.allSettled(asyncTasks);
  }

  test("不应在仅收到 headers 时清除首字节超时:无首块数据时应在窗口内中断避免悬挂", async () => {
    asyncTasks.length = 0;
    const { baseUrl, close } = await startSseServer((_req, res) => {
      res.writeHead(200, sseHeaders());
      res.flushHeaders();
      // Send no body at all and keep the connection open.
    });
    const clientAbortController = new AbortController();

    try {
      const { session, clientResponse } = await forwardAndDispatch({
        baseUrl,
        messageId: 1,
        clientAbortSignal: clientAbortController.signal,
        timeouts: { firstByteTimeoutStreamingMs: 200 },
      });

      const reader = clientResponse.body?.getReader();
      expect(reader).toBeTruthy();
      if (!reader) throw new Error("Missing body reader");

      const startedAt = Date.now();
      const firstRead = await readWithTimeout(reader, 1500);
      if (!firstRead.ok) {
        clientAbortController.abort(new Error("test_timeout"));
        throw new Error("首字节超时未生效:读首块数据在 1.5s 内仍未返回(可能仍会卡死)");
      }

      // The read must have ended because of the timeout/abort; either
      // done=true or a thrown error is acceptable.
      const ended = "error" in firstRead || firstRead.value.done === true;
      expect(ended).toBe(true);

      // responseController must have been aborted, i.e. the first-byte
      // timeout actually fired.
      const { responseController } = session as unknown as {
        responseController?: AbortController;
      };
      expect(responseController?.signal.aborted).toBe(true);

      // Rough timing check: the read must not return immediately, which
      // would indicate an unrelated early exit (false positive).
      const elapsed = Date.now() - startedAt;
      expect(elapsed).toBeGreaterThanOrEqual(120);
    } finally {
      await teardown(clientAbortController, close);
    }
  });

  test("收到首块数据后应清除首字节超时:后续 chunk 即使晚于 firstByteTimeout 也不应被误中断", async () => {
    asyncTasks.length = 0;
    const { baseUrl, close } = await startSseServer((_req, res) => {
      res.writeHead(200, sseHeaders());
      res.flushHeaders();
      res.write('data: {"x":1}\n\n');
      // Second chunk arrives 150ms later, then the stream ends.
      setTimeout(() => {
        try {
          res.write('data: {"x":2}\n\n');
          res.end();
        } catch {
          // ignore
        }
      }, 150);
    });
    const clientAbortController = new AbortController();

    try {
      const { clientResponse } = await forwardAndDispatch({
        baseUrl,
        messageId: 2,
        clientAbortSignal: clientAbortController.signal,
        timeouts: { firstByteTimeoutStreamingMs: 100, streamingIdleTimeoutMs: 0 },
      });

      const giveUp = new Promise<"timeout">((resolve) =>
        setTimeout(() => resolve("timeout"), 1500)
      );
      const fullText = await Promise.race([clientResponse.text(), giveUp]);
      if (fullText === "timeout") {
        clientAbortController.abort(new Error("test_timeout"));
        throw new Error("读取透传响应超时(可能仍会卡死)");
      }

      // The second chunk is sent at 150ms. If the first-byte timeout had not
      // been cleared, the stream would be cut at about 100ms and the second
      // chunk would never arrive.
      expect(fullText).toContain('"x":2');
    } finally {
      await teardown(clientAbortController, close);
    }
  });

  test("中途静默超过 streamingIdleTimeoutMs 时应中断,避免 200 跑到一半卡死", async () => {
    asyncTasks.length = 0;
    const { baseUrl, close } = await startSseServer((_req, res) => {
      res.writeHead(200, sseHeaders());
      res.flushHeaders();
      res.write('data: {"x":1}\n\n');
      // Send nothing further and never end the connection.
    });
    const clientAbortController = new AbortController();

    try {
      const { clientResponse } = await forwardAndDispatch({
        baseUrl,
        messageId: 3,
        clientAbortSignal: clientAbortController.signal,
        timeouts: { firstByteTimeoutStreamingMs: 1000, streamingIdleTimeoutMs: 120 },
      });

      const reader = clientResponse.body?.getReader();
      expect(reader).toBeTruthy();
      if (!reader) throw new Error("Missing body reader");

      const first = await readWithTimeout(reader, 1000);
      expect(first.ok).toBe(true);
      if (!("value" in first)) {
        throw new Error("首块数据读取异常:预期拿到 value,但得到 error");
      }
      expect(first.value.done).toBe(false);

      // Once the idle timeout fires, the next read must finish within a
      // reasonable time; either done=true or a thrown error is acceptable.
      const second = await readWithTimeout(reader, 1500);
      if (!second.ok) {
        clientAbortController.abort(new Error("test_timeout"));
        throw new Error("流式静默超时未生效:读后续数据在 1.5s 内仍未返回(可能仍会卡死)");
      }
    } finally {
      await teardown(clientAbortController, close);
    }
  });
});
|