| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866 |
- /**
- * Agent Pool Tests
- *
- * TDD: Tests written first, implementation follows
- */
- import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
- // Mock undici before importing agent-pool
- vi.mock("undici", () => {
-   // Agent and ProxyAgent are replaced by the same stub shape: it echoes the
-   // constructor options and exposes close()/destroy() mocks that resolve.
-   const stubDispatcher = (options: unknown) => {
-     const close = vi.fn().mockResolvedValue(undefined);
-     const destroy = vi.fn().mockResolvedValue(undefined);
-     return { options, close, destroy };
-   };
-   return {
-     Agent: vi.fn().mockImplementation(stubDispatcher),
-     ProxyAgent: vi.fn().mockImplementation(stubDispatcher),
-   };
- });
- vi.mock("fetch-socks", () => {
-   // Stub for socksDispatcher(proxy, options): records both arguments and
-   // offers close()/destroy() mocks that resolve immediately.
-   const socksDispatcher = vi.fn().mockImplementation((proxy, options) => {
-     const close = vi.fn().mockResolvedValue(undefined);
-     const destroy = vi.fn().mockResolvedValue(undefined);
-     return { proxy, options, close, destroy };
-   });
-   return { socksDispatcher };
- });
- import {
- type AgentPool,
- AgentPoolImpl,
- generateAgentCacheKey,
- getGlobalAgentPool,
- resetGlobalAgentPool,
- type AgentPoolConfig,
- } from "@/lib/proxy-agent/agent-pool";
- describe("generateAgentCacheKey", () => {
-   // The expectations below pin the key as three "|"-separated parts:
-   // endpoint origin, proxy (or the literal "direct"), and "h1"/"h2".
-   const anthropicMessages = "https://api.anthropic.com/v1/messages";
-   const singleKeyCases = [
-     {
-       title: "should generate correct cache key for direct connection",
-       input: { endpointUrl: anthropicMessages, proxyUrl: null, enableHttp2: false },
-       expected: "https://api.anthropic.com|direct|h1",
-     },
-     {
-       title: "should generate correct cache key with proxy",
-       input: {
-         endpointUrl: "https://api.openai.com/v1/chat/completions",
-         proxyUrl: "http://proxy.example.com:8080",
-         enableHttp2: false,
-       },
-       expected: "https://api.openai.com|http://proxy.example.com:8080|h1",
-     },
-     {
-       title: "should generate correct cache key with HTTP/2 enabled",
-       input: { endpointUrl: anthropicMessages, proxyUrl: null, enableHttp2: true },
-       expected: "https://api.anthropic.com|direct|h2",
-     },
-     {
-       title: "should generate correct cache key with proxy and HTTP/2",
-       // The explicit default HTTPS port (:443) is expected to be absent from the key.
-       input: {
-         endpointUrl: anthropicMessages,
-         proxyUrl: "https://secure-proxy.example.com:443",
-         enableHttp2: true,
-       },
-       expected: "https://api.anthropic.com|https://secure-proxy.example.com|h2",
-     },
-     {
-       title: "should use origin only (strip path and query)",
-       input: {
-         endpointUrl: "https://api.anthropic.com/v1/messages?key=value",
-         proxyUrl: null,
-         enableHttp2: false,
-       },
-       expected: "https://api.anthropic.com|direct|h1",
-     },
-     {
-       title: "should handle different ports",
-       input: {
-         endpointUrl: "https://api.example.com:8443/v1/messages",
-         proxyUrl: null,
-         enableHttp2: false,
-       },
-       expected: "https://api.example.com:8443|direct|h1",
-     },
-   ];
-   // One test per table row; titles are registered exactly as listed above.
-   for (const { title, input, expected } of singleKeyCases) {
-     it(title, () => {
-       expect(generateAgentCacheKey(input)).toBe(expected);
-     });
-   }
-   it("should differentiate HTTP and HTTPS", () => {
-     // Same host and path, only the scheme differs.
-     const plainKey = generateAgentCacheKey({
-       endpointUrl: "http://api.example.com/v1/messages",
-       proxyUrl: null,
-       enableHttp2: false,
-     });
-     const secureKey = generateAgentCacheKey({
-       endpointUrl: "https://api.example.com/v1/messages",
-       proxyUrl: null,
-       enableHttp2: false,
-     });
-     expect(plainKey).not.toBe(secureKey);
-     expect(plainKey).toBe("http://api.example.com|direct|h1");
-     expect(secureKey).toBe("https://api.example.com|direct|h1");
-   });
- });
- describe("AgentPool", () => {
- // Pool under test: rebuilt by beforeEach for every case and shut down in afterEach.
- let pool: AgentPool;
- // Baseline configuration; individual tests spread it and override single fields.
- const defaultConfig: AgentPoolConfig = {
-   maxTotalAgents: 10,
-   agentTtlMs: 300000, // 5 minutes
-   connectionIdleTimeoutMs: 60000, // 1 minute
-   cleanupIntervalMs: 30000, // 30 seconds
- };
- beforeEach(() => {
-   // Fake timers are installed before the pool is constructed.
-   vi.useFakeTimers();
-   pool = new AgentPoolImpl(defaultConfig);
- });
- afterEach(async () => {
-   try {
-     await pool.shutdown();
-   } finally {
-     // Restore real timers and reset mock call history even when shutdown()
-     // rejects, so a failing teardown cannot leak fake timers into later tests.
-     vi.useRealTimers();
-     vi.clearAllMocks();
-   }
- });
- describe("caching behavior", () => {
-   const anthropicUrl = "https://api.anthropic.com/v1/messages";
-   const openAiUrl = "https://api.openai.com/v1/chat/completions";
-   // Fresh descriptor for a proxy-less HTTP/1.1 request; a new object on every call.
-   const directH1 = (endpointUrl: string) => ({
-     endpointUrl,
-     proxyUrl: null,
-     enableHttp2: false,
-   });
-   it("should reuse Agent for same endpoint", async () => {
-     const request = directH1(anthropicUrl);
-     const first = await pool.getAgent(request);
-     const second = await pool.getAgent(request);
-     // Same key and the very same dispatcher object; only the first call reports isNew.
-     expect(first.cacheKey).toBe(second.cacheKey);
-     expect(first.agent).toBe(second.agent);
-     expect(first.isNew).toBe(true);
-     expect(second.isNew).toBe(false);
-   });
-   it("should create different Agent for different endpoints", async () => {
-     const anthropic = await pool.getAgent(directH1(anthropicUrl));
-     const openAi = await pool.getAgent(directH1(openAiUrl));
-     expect(anthropic.cacheKey).not.toBe(openAi.cacheKey);
-     expect(anthropic.agent).not.toBe(openAi.agent);
-     expect(anthropic.isNew).toBe(true);
-     expect(openAi.isNew).toBe(true);
-   });
-   it("should create different Agent for different proxy configs", async () => {
-     const withoutProxy = await pool.getAgent(directH1(anthropicUrl));
-     const viaProxy = await pool.getAgent({
-       ...directH1(anthropicUrl),
-       proxyUrl: "http://proxy.example.com:8080",
-     });
-     expect(withoutProxy.cacheKey).not.toBe(viaProxy.cacheKey);
-     expect(withoutProxy.agent).not.toBe(viaProxy.agent);
-   });
-   it("should create different Agent for HTTP/2 vs HTTP/1.1", async () => {
-     const overH1 = await pool.getAgent(directH1(anthropicUrl));
-     const overH2 = await pool.getAgent({ ...directH1(anthropicUrl), enableHttp2: true });
-     expect(overH1.cacheKey).not.toBe(overH2.cacheKey);
-     expect(overH1.agent).not.toBe(overH2.agent);
-   });
-   it("should track request count", async () => {
-     const request = directH1(anthropicUrl);
-     // Three sequential lookups of the same descriptor: one miss, then two hits.
-     for (let call = 0; call < 3; call += 1) {
-       await pool.getAgent(request);
-     }
-     const stats = pool.getPoolStats();
-     expect(stats.totalRequests).toBe(3);
-     expect(stats.cacheHits).toBe(2);
-     expect(stats.cacheMisses).toBe(1);
-   });
- });
- describe("health management", () => {
-   const anthropicUrl = "https://api.anthropic.com/v1/messages";
-   it("should create new Agent after marking unhealthy", async () => {
-     const request = { endpointUrl: anthropicUrl, proxyUrl: null, enableHttp2: false };
-     const original = await pool.getAgent(request);
-     pool.markUnhealthy(original.cacheKey, "SSL certificate error");
-     // The flagged dispatcher must not be handed out again.
-     const replacement = await pool.getAgent(request);
-     expect(replacement.isNew).toBe(true);
-     expect(replacement.agent).not.toBe(original.agent);
-   });
-   it("should not hang when evicting an unhealthy agent whose close() never resolves", async () => {
-     // beforeEach installs fake timers, but the anti-hang guard below relies on a real setTimeout.
-     await pool.shutdown();
-     vi.useRealTimers();
-     const realPool = new AgentPoolImpl(defaultConfig);
-     // Rejects with Error("timeout") unless `work` settles within `limitMs`; the timer is always cleared.
-     const raceAgainstTimeout = <T>(work: Promise<T>, limitMs: number): Promise<T> => {
-       let timer: ReturnType<typeof setTimeout> | undefined;
-       const deadline = new Promise<T>((_resolve, reject) => {
-         timer = setTimeout(() => reject(new Error("timeout")), limitMs);
-       });
-       return Promise.race([work, deadline]).finally(() => clearTimeout(timer));
-     };
-     try {
-       const request = { endpointUrl: anthropicUrl, proxyUrl: null, enableHttp2: true };
-       const stuck = await realPool.getAgent(request);
-       const stuckAgent = stuck.agent as unknown as {
-         close?: () => Promise<void>;
-         destroy?: unknown;
-       };
-       // Force the close() branch: simulate a dispatcher that does not support destroy().
-       stuckAgent.destroy = undefined;
-       // Simulate a close() that never returns, e.g. because it waits for in-flight requests.
-       const hangingClose = vi.fn(() => new Promise<void>(() => {}));
-       stuckAgent.close = hangingClose;
-       realPool.markUnhealthy(stuck.cacheKey, "test-hang-close");
-       const replacement = await raceAgainstTimeout(realPool.getAgent(request), 500);
-       expect(replacement.isNew).toBe(true);
-       expect(replacement.agent).not.toBe(stuck.agent);
-       // close() was triggered, yet its pending promise did not block getAgent().
-       expect(hangingClose).toHaveBeenCalled();
-     } finally {
-       await realPool.shutdown();
-       vi.useFakeTimers();
-     }
-   });
-   it("should track unhealthy agents in stats", async () => {
-     const { cacheKey } = await pool.getAgent({
-       endpointUrl: anthropicUrl,
-       proxyUrl: null,
-       enableHttp2: false,
-     });
-     pool.markUnhealthy(cacheKey, "SSL certificate error");
-     expect(pool.getPoolStats().unhealthyAgents).toBe(1);
-   });
-   it("should evict all Agents for endpoint on evictEndpoint", async () => {
-     // Two agents for the Anthropic origin (HTTP/1.1 and HTTP/2) plus one for another origin.
-     const requests = [
-       { endpointUrl: anthropicUrl, proxyUrl: null, enableHttp2: false },
-       { endpointUrl: anthropicUrl, proxyUrl: null, enableHttp2: true },
-       {
-         endpointUrl: "https://api.openai.com/v1/chat/completions",
-         proxyUrl: null,
-         enableHttp2: false,
-       },
-     ];
-     for (const request of requests) {
-       await pool.getAgent(request);
-     }
-     expect(pool.getPoolStats().cacheSize).toBe(3);
-     await pool.evictEndpoint("https://api.anthropic.com");
-     const afterEviction = pool.getPoolStats();
-     expect(afterEviction.cacheSize).toBe(1);
-     expect(afterEviction.evictedAgents).toBe(2);
-   });
- });
- describe("expiration cleanup", () => {
-   const anthropicUrl = "https://api.anthropic.com/v1/messages";
-   // Descriptor for a proxy-less HTTP/1.1 request to the given endpoint.
-   const direct = (endpointUrl: string) => ({ endpointUrl, proxyUrl: null, enableHttp2: false });
-   /**
-    * Runs `body` against a dedicated pool built from defaultConfig plus `overrides`.
-    * The pool is shut down in a finally block, so a failing assertion inside
-    * `body` no longer leaves the pool un-shut-down.
-    */
-   const withPool = async (
-     overrides: Partial<AgentPoolConfig>,
-     body: (scopedPool: AgentPoolImpl) => Promise<void>,
-   ): Promise<void> => {
-     const scopedPool = new AgentPoolImpl({ ...defaultConfig, ...overrides });
-     try {
-       await body(scopedPool);
-     } finally {
-       await scopedPool.shutdown();
-     }
-   };
-   it("should cleanup expired Agents", async () => {
-     // 1 second TTL
-     await withPool({ agentTtlMs: 1000 }, async (shortTtlPool) => {
-       const { cacheKey, dispatcherId } = await shortTtlPool.getAgent(direct(anthropicUrl));
-       expect(shortTtlPool.getPoolStats().cacheSize).toBe(1);
-       // Release the agent (simulates request completion)
-       shortTtlPool.releaseAgent(cacheKey, dispatcherId);
-       // Advance time past TTL
-       vi.advanceTimersByTime(2000);
-       const cleaned = await shortTtlPool.cleanup();
-       expect(cleaned).toBe(1);
-       expect(shortTtlPool.getPoolStats().cacheSize).toBe(0);
-     });
-   });
-   it("should not cleanup recently used Agents", async () => {
-     await withPool({ agentTtlMs: 1000 }, async (shortTtlPool) => {
-       const params = direct(anthropicUrl);
-       await shortTtlPool.getAgent(params);
-       // Advance time but not past TTL
-       vi.advanceTimersByTime(500);
-       // Use the agent again (updates lastUsedAt)
-       await shortTtlPool.getAgent(params);
-       // Advance time again
-       vi.advanceTimersByTime(500);
-       const cleaned = await shortTtlPool.cleanup();
-       expect(cleaned).toBe(0);
-       expect(shortTtlPool.getPoolStats().cacheSize).toBe(1);
-     });
-   });
-   it("should implement LRU eviction when max size reached", async () => {
-     await withPool({ maxTotalAgents: 2 }, async (smallPool) => {
-       // Create 3 agents (exceeds max of 2), 100 ms apart so their creation times differ.
-       await smallPool.getAgent(direct("https://api1.example.com/v1"));
-       vi.advanceTimersByTime(100);
-       await smallPool.getAgent(direct("https://api2.example.com/v1"));
-       vi.advanceTimersByTime(100);
-       await smallPool.getAgent(direct("https://api3.example.com/v1"));
-       // Only the size bound is asserted here; which entry was evicted is not checked.
-       const stats = smallPool.getPoolStats();
-       expect(stats.cacheSize).toBeLessThanOrEqual(2);
-     });
-   });
- });
- describe("reference counting", () => {
-   // Fresh descriptor for a proxy-less HTTP/1.1 request to the Anthropic endpoint.
-   const anthropicDirect = () => ({
-     endpointUrl: "https://api.anthropic.com/v1/messages",
-     proxyUrl: null,
-     enableHttp2: false,
-   });
-   /**
-    * Runs `body` against a dedicated pool built from defaultConfig plus `overrides`.
-    * The pool is shut down in a finally block, so a failing assertion inside
-    * `body` no longer leaves the pool un-shut-down.
-    */
-   const withPool = async (
-     overrides: Partial<AgentPoolConfig>,
-     body: (scopedPool: AgentPoolImpl) => Promise<void>,
-   ): Promise<void> => {
-     const scopedPool = new AgentPoolImpl({ ...defaultConfig, ...overrides });
-     try {
-       await body(scopedPool);
-     } finally {
-       await scopedPool.shutdown();
-     }
-   };
-   it("should prevent expiration of agents with active requests", async () => {
-     await withPool({ agentTtlMs: 1000 }, async (shortTtlPool) => {
-       // getAgent increments activeRequests
-       await shortTtlPool.getAgent(anthropicDirect());
-       expect(shortTtlPool.getPoolStats().activeRequests).toBe(1);
-       // Advance past TTL
-       vi.advanceTimersByTime(2000);
-       // Should NOT be evicted because activeRequests > 0
-       const cleaned = await shortTtlPool.cleanup();
-       expect(cleaned).toBe(0);
-       expect(shortTtlPool.getPoolStats().cacheSize).toBe(1);
-     });
-   });
-   it("should allow expiration after all requests are released", async () => {
-     await withPool({ agentTtlMs: 1000 }, async (shortTtlPool) => {
-       const { cacheKey, dispatcherId } = await shortTtlPool.getAgent(anthropicDirect());
-       shortTtlPool.releaseAgent(cacheKey, dispatcherId);
-       expect(shortTtlPool.getPoolStats().activeRequests).toBe(0);
-       // Advance past TTL
-       vi.advanceTimersByTime(2000);
-       const cleaned = await shortTtlPool.cleanup();
-       expect(cleaned).toBe(1);
-       expect(shortTtlPool.getPoolStats().cacheSize).toBe(0);
-     });
-   });
-   it("should track multiple sequential requests correctly", async () => {
-     await withPool({ agentTtlMs: 1000 }, async (shortTtlPool) => {
-       const params = anthropicDirect();
-       // 3 sequential requests to same agent (exercises the cache-hit path)
-       const r1 = await shortTtlPool.getAgent(params);
-       await shortTtlPool.getAgent(params);
-       await shortTtlPool.getAgent(params);
-       expect(shortTtlPool.getPoolStats().activeRequests).toBe(3);
-       // Release 2
-       shortTtlPool.releaseAgent(r1.cacheKey, r1.dispatcherId);
-       shortTtlPool.releaseAgent(r1.cacheKey, r1.dispatcherId);
-       expect(shortTtlPool.getPoolStats().activeRequests).toBe(1);
-       // Advance past TTL - should NOT be evicted
-       vi.advanceTimersByTime(2000);
-       const cleaned1 = await shortTtlPool.cleanup();
-       expect(cleaned1).toBe(0);
-       // Release last request
-       shortTtlPool.releaseAgent(r1.cacheKey, r1.dispatcherId);
-       expect(shortTtlPool.getPoolStats().activeRequests).toBe(0);
-       // Now advance past TTL - should be evicted
-       vi.advanceTimersByTime(2000);
-       const cleaned2 = await shortTtlPool.cleanup();
-       expect(cleaned2).toBe(1);
-     });
-   });
-   it("should count pending-creation waiters in activeRequests", async () => {
-     // Regression case: concurrent requests issued while the first creation is still in flight.
-     // Real timers are used and createAgent is blocked through a spy, so the 2nd and 3rd
-     // getAgent calls are issued before creation completes. If such waiters were not counted,
-     // the expect(3) below would drop to 1 or 2.
-     vi.useRealTimers();
-     try {
-       await withPool({ agentTtlMs: 60_000 }, async (concurrentPool) => {
-         const params = anthropicDirect();
-         // Deferred that controls when createAgent completes. Initialised with a no-op
-         // (instead of null) so the later call type-checks without optional chaining;
-         // the Promise executor runs synchronously and installs the real resolver.
-         let releaseCreate: () => void = () => {};
-         const createBlocker = new Promise<void>((resolve) => {
-           releaseCreate = resolve;
-         });
-         // biome-ignore lint/suspicious/noExplicitAny: spying on a private method
-         const createSpy = vi.spyOn(concurrentPool as any, "createAgent");
-         try {
-           createSpy.mockImplementationOnce(async () => {
-             await createBlocker;
-             return {
-               close: vi.fn().mockResolvedValue(undefined),
-               destroy: vi.fn().mockResolvedValue(undefined),
-               options: {},
-             };
-           });
-           // Fire 3 requests at once: the first starts the creation, the other two must wait for it.
-           const p1 = concurrentPool.getAgent(params);
-           const p2 = concurrentPool.getAgent(params);
-           const p3 = concurrentPool.getAgent(params);
-           // Yield a couple of microtasks so p2/p3 have reached their waiting point.
-           await Promise.resolve();
-           await Promise.resolve();
-           // Let the creation finish.
-           releaseCreate();
-           const [r1, r2, r3] = await Promise.all([p1, p2, p3]);
-           // All three callers must receive the same cacheKey and dispatcherId.
-           expect(r2.cacheKey).toBe(r1.cacheKey);
-           expect(r3.cacheKey).toBe(r1.cacheKey);
-           expect(r2.dispatcherId).toBe(r1.dispatcherId);
-           expect(r3.dispatcherId).toBe(r1.dispatcherId);
-           // Key assertion: every concurrent waiter counts towards activeRequests.
-           expect(concurrentPool.getPoolStats().activeRequests).toBe(3);
-           // Three releases bring the counter back to zero...
-           concurrentPool.releaseAgent(r1.cacheKey, r1.dispatcherId);
-           concurrentPool.releaseAgent(r2.cacheKey, r2.dispatcherId);
-           concurrentPool.releaseAgent(r3.cacheKey, r3.dispatcherId);
-           expect(concurrentPool.getPoolStats().activeRequests).toBe(0);
-           // ...and one extra release is a no-op rather than going negative.
-           concurrentPool.releaseAgent(r1.cacheKey, r1.dispatcherId);
-           expect(concurrentPool.getPoolStats().activeRequests).toBe(0);
-         } finally {
-           // Unblock a still-pending creation (resolving twice is harmless) and
-           // restore the spy even when an assertion above failed.
-           releaseCreate();
-           createSpy.mockRestore();
-         }
-       });
-     } finally {
-       vi.useFakeTimers();
-     }
-   });
-   it("should ignore releaseAgent from stale dispatcher generation", async () => {
-     // Regression case: same cacheKey but the dispatcher was rebuilt; a release carrying
-     // the old dispatcherId must not decrement the new instance's counter.
-     await withPool({ agentTtlMs: 1000 }, async (regenPool) => {
-       const params = anthropicDirect();
-       // First-generation dispatcher
-       const r1 = await regenPool.getAgent(params);
-       expect(regenPool.getPoolStats().activeRequests).toBe(1);
-       // Flag it unhealthy so the next getAgent has to build a replacement.
-       regenPool.markUnhealthy(r1.cacheKey, "simulated SSL failure");
-       // Second-generation dispatcher (same cacheKey, different dispatcherId)
-       const r2 = await regenPool.getAgent(params);
-       expect(r2.cacheKey).toBe(r1.cacheKey);
-       expect(r2.dispatcherId).not.toBe(r1.dispatcherId);
-       expect(regenPool.getPoolStats().activeRequests).toBe(1);
-       // Releasing with the first-generation dispatcherId must be ignored.
-       regenPool.releaseAgent(r1.cacheKey, r1.dispatcherId);
-       expect(regenPool.getPoolStats().activeRequests).toBe(1);
-       // Releasing with the second-generation dispatcherId brings the counter to 0.
-       regenPool.releaseAgent(r2.cacheKey, r2.dispatcherId);
-       expect(regenPool.getPoolStats().activeRequests).toBe(0);
-     });
-   });
-   it("should refresh lastUsedAt on release", async () => {
-     await withPool({ agentTtlMs: 1000 }, async (shortTtlPool) => {
-       const { cacheKey, dispatcherId } = await shortTtlPool.getAgent(anthropicDirect());
-       // Advance 800ms (close to TTL but not past)
-       vi.advanceTimersByTime(800);
-       // Release refreshes lastUsedAt
-       shortTtlPool.releaseAgent(cacheKey, dispatcherId);
-       // Advance another 500ms (total 1300ms from start, but only 500ms from release)
-       vi.advanceTimersByTime(500);
-       // Should NOT be evicted (TTL reset by release)
-       const cleaned = await shortTtlPool.cleanup();
-       expect(cleaned).toBe(0);
-     });
-   });
-   it("should force-expire after hard upper bound regardless of active requests", async () => {
-     await withPool({ agentTtlMs: 1000 }, async (pool2) => {
-       // getAgent increments activeRequests (never released)
-       await pool2.getAgent(anthropicDirect());
-       expect(pool2.getPoolStats().activeRequests).toBe(1);
-       // Advance past 30-minute hard upper bound
-       vi.advanceTimersByTime(31 * 60 * 1000);
-       const cleaned = await pool2.cleanup();
-       expect(cleaned).toBe(1);
-       expect(pool2.getPoolStats().cacheSize).toBe(0);
-     });
-   });
-   it("should be a no-op when releasing non-existent key", () => {
-     // Should not throw
-     pool.releaseAgent("nonexistent-key", "disp-1");
-     expect(pool.getPoolStats().activeRequests).toBe(0);
-   });
-   it("should not go below zero on over-release", async () => {
-     const { cacheKey, dispatcherId } = await pool.getAgent(anthropicDirect());
-     pool.releaseAgent(cacheKey, dispatcherId);
-     // Release again when already at 0
-     pool.releaseAgent(cacheKey, dispatcherId);
-     expect(pool.getPoolStats().activeRequests).toBe(0);
-   });
-   it("should include activeRequests in pool stats", async () => {
-     const params = anthropicDirect();
-     expect(pool.getPoolStats().activeRequests).toBe(0);
-     await pool.getAgent(params);
-     expect(pool.getPoolStats().activeRequests).toBe(1);
-     await pool.getAgent(params);
-     expect(pool.getPoolStats().activeRequests).toBe(2);
-     const { cacheKey, dispatcherId } = await pool.getAgent(params);
-     expect(pool.getPoolStats().activeRequests).toBe(3);
-     pool.releaseAgent(cacheKey, dispatcherId);
-     expect(pool.getPoolStats().activeRequests).toBe(2);
-   });
- });
- describe("proxy support", () => {
-   // One case per proxy scheme. The assertions only check that a new entry is
-   // created and that the proxy URL appears in its cache key.
-   const proxyCases = [
-     {
-       title: "should create ProxyAgent for HTTP proxy",
-       proxyUrl: "http://proxy.example.com:8080",
-     },
-     {
-       title: "should create SOCKS dispatcher for SOCKS proxy",
-       proxyUrl: "socks5://proxy.example.com:1080",
-     },
-   ];
-   for (const { title, proxyUrl } of proxyCases) {
-     it(title, async () => {
-       const { isNew, cacheKey } = await pool.getAgent({
-         endpointUrl: "https://api.anthropic.com/v1/messages",
-         proxyUrl,
-         enableHttp2: false,
-       });
-       expect(isNew).toBe(true);
-       expect(cacheKey).toContain(proxyUrl);
-     });
-   }
- });
- describe("pool stats", () => {
-   it("should return accurate pool statistics", async () => {
-     // Two lookups of one endpoint followed by one of another:
-     // expected 2 cached agents, 1 hit and 2 misses out of 3 requests.
-     const endpointSequence = [
-       "https://api.anthropic.com/v1/messages",
-       "https://api.anthropic.com/v1/messages",
-       "https://api.openai.com/v1/chat/completions",
-     ];
-     for (const endpointUrl of endpointSequence) {
-       await pool.getAgent({ endpointUrl, proxyUrl: null, enableHttp2: false });
-     }
-     const snapshot = pool.getPoolStats();
-     expect(snapshot.cacheSize).toBe(2);
-     expect(snapshot.totalRequests).toBe(3);
-     expect(snapshot.cacheHits).toBe(1);
-     expect(snapshot.cacheMisses).toBe(2);
-     expect(snapshot.hitRate).toBeCloseTo(1 / 3, 2);
-   });
- });
- describe("shutdown", () => {
-   it("should close all agents on shutdown", async () => {
-     const endpoints = [
-       "https://api.anthropic.com/v1/messages",
-       "https://api.openai.com/v1/chat/completions",
-     ];
-     for (const endpointUrl of endpoints) {
-       await pool.getAgent({ endpointUrl, proxyUrl: null, enableHttp2: false });
-     }
-     await pool.shutdown();
-     // Nothing may remain cached once shutdown has completed.
-     expect(pool.getPoolStats().cacheSize).toBe(0);
-   });
-   it("should prefer destroy over close to avoid hanging on in-flight streaming requests", async () => {
-     const { agent: pooledAgent } = await pool.getAgent({
-       endpointUrl: "https://api.anthropic.com/v1/messages",
-       proxyUrl: null,
-       enableHttp2: true,
-     });
-     const agent = pooledAgent as unknown as {
-       close?: () => Promise<void>;
-       destroy?: () => Promise<void>;
-     };
-     // undici's Agent/ProxyAgent are mocked at the top of this file, so destroy/close
-     // are expected to be vi.fn mocks; the call assertions below depend on that.
-     if (typeof agent.destroy === "function") {
-       expect(vi.isMockFunction(agent.destroy)).toBe(true);
-     }
-     if (typeof agent.close === "function") {
-       expect(vi.isMockFunction(agent.close)).toBe(true);
-       // Simulate a close() that never returns, e.g. because it waits for in-flight requests.
-       vi.mocked(agent.close).mockImplementation(() => new Promise<void>(() => {}));
-     }
-     await pool.shutdown();
-     // destroy should be the method invoked, so a hanging close() cannot stall shutdown/eviction.
-     if (typeof agent.destroy === "function") {
-       expect(agent.destroy).toHaveBeenCalled();
-     }
-     if (typeof agent.close === "function") {
-       expect(agent.close).not.toHaveBeenCalled();
-     }
-   });
- });
- });
- describe("getGlobalAgentPool", () => {
-   // Reset the shared instance after every case so the tests stay independent.
-   afterEach(async () => {
-     await resetGlobalAgentPool();
-   });
-   it("should return singleton instance", () => {
-     const firstLookup = getGlobalAgentPool();
-     const secondLookup = getGlobalAgentPool();
-     expect(firstLookup).toBe(secondLookup);
-   });
-   it("should create new instance after reset", async () => {
-     const beforeReset = getGlobalAgentPool();
-     await resetGlobalAgentPool();
-     const afterReset = getGlobalAgentPool();
-     // A reset must discard the previous instance rather than hand it out again.
-     expect(beforeReset).not.toBe(afterReset);
-   });
- });
|