agent-pool.test.ts 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. /**
  2. * Agent Pool Tests
  3. *
  4. * TDD: Tests written first, implementation follows
  5. */
  6. import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
  7. // Mock undici before importing agent-pool
  8. vi.mock("undici", () => ({
  9. Agent: vi.fn().mockImplementation((options) => ({
  10. options,
  11. close: vi.fn().mockResolvedValue(undefined),
  12. destroy: vi.fn().mockResolvedValue(undefined),
  13. })),
  14. ProxyAgent: vi.fn().mockImplementation((options) => ({
  15. options,
  16. close: vi.fn().mockResolvedValue(undefined),
  17. destroy: vi.fn().mockResolvedValue(undefined),
  18. })),
  19. }));
  20. vi.mock("fetch-socks", () => ({
  21. socksDispatcher: vi.fn().mockImplementation((proxy, options) => ({
  22. proxy,
  23. options,
  24. close: vi.fn().mockResolvedValue(undefined),
  25. destroy: vi.fn().mockResolvedValue(undefined),
  26. })),
  27. }));
  28. import {
  29. type AgentPool,
  30. AgentPoolImpl,
  31. generateAgentCacheKey,
  32. getGlobalAgentPool,
  33. resetGlobalAgentPool,
  34. type AgentPoolConfig,
  35. } from "@/lib/proxy-agent/agent-pool";
  36. describe("generateAgentCacheKey", () => {
  37. it("should generate correct cache key for direct connection", () => {
  38. const key = generateAgentCacheKey({
  39. endpointUrl: "https://api.anthropic.com/v1/messages",
  40. proxyUrl: null,
  41. enableHttp2: false,
  42. });
  43. expect(key).toBe("https://api.anthropic.com|direct|h1");
  44. });
  45. it("should generate correct cache key with proxy", () => {
  46. const key = generateAgentCacheKey({
  47. endpointUrl: "https://api.openai.com/v1/chat/completions",
  48. proxyUrl: "http://proxy.example.com:8080",
  49. enableHttp2: false,
  50. });
  51. expect(key).toBe("https://api.openai.com|http://proxy.example.com:8080|h1");
  52. });
  53. it("should generate correct cache key with HTTP/2 enabled", () => {
  54. const key = generateAgentCacheKey({
  55. endpointUrl: "https://api.anthropic.com/v1/messages",
  56. proxyUrl: null,
  57. enableHttp2: true,
  58. });
  59. expect(key).toBe("https://api.anthropic.com|direct|h2");
  60. });
  61. it("should generate correct cache key with proxy and HTTP/2", () => {
  62. const key = generateAgentCacheKey({
  63. endpointUrl: "https://api.anthropic.com/v1/messages",
  64. proxyUrl: "https://secure-proxy.example.com:443",
  65. enableHttp2: true,
  66. });
  67. // URL API strips default port 443 for HTTPS
  68. expect(key).toBe("https://api.anthropic.com|https://secure-proxy.example.com|h2");
  69. });
  70. it("should use origin only (strip path and query)", () => {
  71. const key = generateAgentCacheKey({
  72. endpointUrl: "https://api.anthropic.com/v1/messages?key=value",
  73. proxyUrl: null,
  74. enableHttp2: false,
  75. });
  76. expect(key).toBe("https://api.anthropic.com|direct|h1");
  77. });
  78. it("should handle different ports", () => {
  79. const key = generateAgentCacheKey({
  80. endpointUrl: "https://api.example.com:8443/v1/messages",
  81. proxyUrl: null,
  82. enableHttp2: false,
  83. });
  84. expect(key).toBe("https://api.example.com:8443|direct|h1");
  85. });
  86. it("should differentiate HTTP and HTTPS", () => {
  87. const httpKey = generateAgentCacheKey({
  88. endpointUrl: "http://api.example.com/v1/messages",
  89. proxyUrl: null,
  90. enableHttp2: false,
  91. });
  92. const httpsKey = generateAgentCacheKey({
  93. endpointUrl: "https://api.example.com/v1/messages",
  94. proxyUrl: null,
  95. enableHttp2: false,
  96. });
  97. expect(httpKey).not.toBe(httpsKey);
  98. expect(httpKey).toBe("http://api.example.com|direct|h1");
  99. expect(httpsKey).toBe("https://api.example.com|direct|h1");
  100. });
  101. });
  102. describe("AgentPool", () => {
  // Pool under test; a fresh instance is assigned before every test case.
  let pool: AgentPool;

  // Baseline configuration used for `pool` and spread into per-test variants.
  const defaultConfig: AgentPoolConfig = {
    maxTotalAgents: 10,
    agentTtlMs: 5 * 60 * 1000, // 5 minutes
    connectionIdleTimeoutMs: 60 * 1000, // 1 minute
    cleanupIntervalMs: 30 * 1000, // 30 seconds
  };

  // Fake timers are installed first so the pool is constructed under them.
  beforeEach(() => {
    vi.useFakeTimers();
    pool = new AgentPoolImpl(defaultConfig);
  });

  // Shut the pool down, then restore real timers and clear recorded mock calls.
  afterEach(async () => {
    await pool.shutdown();
    vi.useRealTimers();
    vi.clearAllMocks();
  });
  describe("caching behavior", () => {
    type AgentRequest = { endpointUrl: string; proxyUrl: string | null; enableHttp2: boolean };

    // Fresh baseline request on every call: Anthropic endpoint, no proxy, HTTP/1.1.
    const anthropicDirect = (): AgentRequest => ({
      endpointUrl: "https://api.anthropic.com/v1/messages",
      proxyUrl: null,
      enableHttp2: false,
    });

    it("should reuse Agent for same endpoint", async () => {
      const request = anthropicDirect();

      const first = await pool.getAgent(request);
      const second = await pool.getAgent(request);

      expect(first.cacheKey).toBe(second.cacheKey);
      expect(first.agent).toBe(second.agent);
      // Only the first lookup creates the agent; the second is served from cache.
      expect(first.isNew).toBe(true);
      expect(second.isNew).toBe(false);
    });

    it("should create different Agent for different endpoints", async () => {
      const anthropic = await pool.getAgent(anthropicDirect());
      const openai = await pool.getAgent({
        ...anthropicDirect(),
        endpointUrl: "https://api.openai.com/v1/chat/completions",
      });

      expect(anthropic.cacheKey).not.toBe(openai.cacheKey);
      expect(anthropic.agent).not.toBe(openai.agent);
      expect(anthropic.isNew).toBe(true);
      expect(openai.isNew).toBe(true);
    });

    it("should create different Agent for different proxy configs", async () => {
      const direct = await pool.getAgent(anthropicDirect());
      const proxied = await pool.getAgent({
        ...anthropicDirect(),
        proxyUrl: "http://proxy.example.com:8080",
      });

      expect(direct.cacheKey).not.toBe(proxied.cacheKey);
      expect(direct.agent).not.toBe(proxied.agent);
    });

    it("should create different Agent for HTTP/2 vs HTTP/1.1", async () => {
      const http1 = await pool.getAgent(anthropicDirect());
      const http2 = await pool.getAgent({ ...anthropicDirect(), enableHttp2: true });

      expect(http1.cacheKey).not.toBe(http2.cacheKey);
      expect(http1.agent).not.toBe(http2.agent);
    });

    it("should track request count", async () => {
      const request = anthropicDirect();

      // Three sequential lookups of the same key: one miss followed by two hits.
      for (let attempt = 0; attempt < 3; attempt += 1) {
        await pool.getAgent(request);
      }

      const { totalRequests, cacheHits, cacheMisses } = pool.getPoolStats();
      expect(totalRequests).toBe(3);
      expect(cacheHits).toBe(2);
      expect(cacheMisses).toBe(1);
    });
  });
  describe("health management", () => {
    it("should create new Agent after marking unhealthy", async () => {
      const request = {
        endpointUrl: "https://api.anthropic.com/v1/messages",
        proxyUrl: null,
        enableHttp2: false,
      };

      const original = await pool.getAgent(request);
      pool.markUnhealthy(original.cacheKey, "SSL certificate error");
      const replacement = await pool.getAgent(request);

      expect(replacement.isNew).toBe(true);
      expect(replacement.agent).not.toBe(original.agent);
    });

    it("should not hang when evicting an unhealthy agent whose close() never resolves", async () => {
      // Note: beforeEach installs fake timers, but this case relies on a real
      // setTimeout for its anti-hang assertion.
      await pool.shutdown();
      vi.useRealTimers();
      const realPool = new AgentPoolImpl(defaultConfig);

      // Rejects with Error("timeout") if `promise` has not settled within `ms`;
      // the timer is always cleared once the race settles.
      const withTimeout = <T>(promise: Promise<T>, ms: number): Promise<T> => {
        let timer: ReturnType<typeof setTimeout> | undefined;
        const deadline = new Promise<T>((_resolve, reject) => {
          timer = setTimeout(() => reject(new Error("timeout")), ms);
        });
        return Promise.race([promise, deadline]).finally(() => {
          if (timer) clearTimeout(timer);
        });
      };

      try {
        const request = {
          endpointUrl: "https://api.anthropic.com/v1/messages",
          proxyUrl: null,
          enableHttp2: true,
        };
        const original = await realPool.getAgent(request);
        const staleAgent = original.agent as unknown as {
          close?: () => Promise<void>;
          destroy?: unknown;
        };

        // Force the close() branch: simulate a dispatcher that does not support destroy().
        staleAgent.destroy = undefined;

        // Simulate close() staying pending for a long time, e.g. while it waits
        // for in-flight requests to finish.
        let closeInvoked = false;
        staleAgent.close = () => {
          closeInvoked = true;
          return new Promise<void>(() => {});
        };

        realPool.markUnhealthy(original.cacheKey, "test-hang-close");
        const replacement = await withTimeout(realPool.getAgent(request), 500);

        expect(replacement.isNew).toBe(true);
        expect(replacement.agent).not.toBe(original.agent);
        // Assert: even while close() is pending, getAgent() is not blocked and
        // close() is still invoked.
        expect(closeInvoked).toBe(true);
      } finally {
        await realPool.shutdown();
        // Put fake timers back so the shared afterEach runs under the usual setup.
        vi.useFakeTimers();
      }
    });

    it("should track unhealthy agents in stats", async () => {
      const { cacheKey } = await pool.getAgent({
        endpointUrl: "https://api.anthropic.com/v1/messages",
        proxyUrl: null,
        enableHttp2: false,
      });

      pool.markUnhealthy(cacheKey, "SSL certificate error");

      expect(pool.getPoolStats().unhealthyAgents).toBe(1);
    });

    it("should evict all Agents for endpoint on evictEndpoint", async () => {
      // Two agents for the Anthropic origin (different HTTP versions) plus one
      // agent for an unrelated origin.
      const seededRequests = [
        {
          endpointUrl: "https://api.anthropic.com/v1/messages",
          proxyUrl: null,
          enableHttp2: false,
        },
        {
          endpointUrl: "https://api.anthropic.com/v1/messages",
          proxyUrl: null,
          enableHttp2: true,
        },
        {
          endpointUrl: "https://api.openai.com/v1/chat/completions",
          proxyUrl: null,
          enableHttp2: false,
        },
      ];
      for (const request of seededRequests) {
        await pool.getAgent(request);
      }
      expect(pool.getPoolStats().cacheSize).toBe(3);

      await pool.evictEndpoint("https://api.anthropic.com");

      const { cacheSize, evictedAgents } = pool.getPoolStats();
      expect(cacheSize).toBe(1);
      expect(evictedAgents).toBe(2);
    });
  });
  describe("expiration cleanup", () => {
    // Each case builds its own pool with a tweaked config. The pool is shut
    // down in `finally` so it is released even when an assertion fails.

    it("should cleanup expired Agents", async () => {
      const shortTtlPool = new AgentPoolImpl({
        ...defaultConfig,
        agentTtlMs: 1000, // 1 second TTL
      });

      try {
        await shortTtlPool.getAgent({
          endpointUrl: "https://api.anthropic.com/v1/messages",
          proxyUrl: null,
          enableHttp2: false,
        });
        expect(shortTtlPool.getPoolStats().cacheSize).toBe(1);

        // Advance time past TTL
        vi.advanceTimersByTime(2000);

        const cleaned = await shortTtlPool.cleanup();
        expect(cleaned).toBe(1);
        expect(shortTtlPool.getPoolStats().cacheSize).toBe(0);
      } finally {
        await shortTtlPool.shutdown();
      }
    });

    it("should not cleanup recently used Agents", async () => {
      const shortTtlPool = new AgentPoolImpl({
        ...defaultConfig,
        agentTtlMs: 1000,
      });
      const params = {
        endpointUrl: "https://api.anthropic.com/v1/messages",
        proxyUrl: null,
        enableHttp2: false,
      };

      try {
        await shortTtlPool.getAgent(params);

        // Advance time but not past TTL
        vi.advanceTimersByTime(500);

        // Use the agent again (updates lastUsedAt)
        await shortTtlPool.getAgent(params);

        // Advance time again; only 500ms have passed since the last use.
        vi.advanceTimersByTime(500);

        const cleaned = await shortTtlPool.cleanup();
        expect(cleaned).toBe(0);
        expect(shortTtlPool.getPoolStats().cacheSize).toBe(1);
      } finally {
        await shortTtlPool.shutdown();
      }
    });

    it("should implement LRU eviction when max size reached", async () => {
      const smallPool = new AgentPoolImpl({
        ...defaultConfig,
        maxTotalAgents: 2,
      });

      try {
        // Create 3 agents (exceeds max of 2), spacing the requests 100ms apart.
        await smallPool.getAgent({
          endpointUrl: "https://api1.example.com/v1",
          proxyUrl: null,
          enableHttp2: false,
        });
        vi.advanceTimersByTime(100);
        await smallPool.getAgent({
          endpointUrl: "https://api2.example.com/v1",
          proxyUrl: null,
          enableHttp2: false,
        });
        vi.advanceTimersByTime(100);
        await smallPool.getAgent({
          endpointUrl: "https://api3.example.com/v1",
          proxyUrl: null,
          enableHttp2: false,
        });

        // Should have evicted the oldest (LRU). Only the size bound is
        // asserted here, not which entry was removed.
        const stats = smallPool.getPoolStats();
        expect(stats.cacheSize).toBeLessThanOrEqual(2);
      } finally {
        await smallPool.shutdown();
      }
    });
  });
  describe("proxy support", () => {
    // Both cases target the same endpoint; only the proxy scheme differs.
    const endpointUrl = "https://api.anthropic.com/v1/messages";

    it("should create ProxyAgent for HTTP proxy", async () => {
      const { isNew, cacheKey } = await pool.getAgent({
        endpointUrl,
        proxyUrl: "http://proxy.example.com:8080",
        enableHttp2: false,
      });

      expect(isNew).toBe(true);
      expect(cacheKey).toContain("http://proxy.example.com:8080");
    });

    it("should create SOCKS dispatcher for SOCKS proxy", async () => {
      const { isNew, cacheKey } = await pool.getAgent({
        endpointUrl,
        proxyUrl: "socks5://proxy.example.com:1080",
        enableHttp2: false,
      });

      expect(isNew).toBe(true);
      expect(cacheKey).toContain("socks5://proxy.example.com:1080");
    });
  });
  describe("pool stats", () => {
    it("should return accurate pool statistics", async () => {
      // Request sequence: Anthropic twice (miss, then hit), then OpenAI once (miss).
      const endpointSequence = [
        "https://api.anthropic.com/v1/messages",
        "https://api.anthropic.com/v1/messages",
        "https://api.openai.com/v1/chat/completions",
      ];
      for (const endpointUrl of endpointSequence) {
        await pool.getAgent({ endpointUrl, proxyUrl: null, enableHttp2: false });
      }

      const { cacheSize, totalRequests, cacheHits, cacheMisses, hitRate } = pool.getPoolStats();
      expect(cacheSize).toBe(2);
      expect(totalRequests).toBe(3);
      expect(cacheHits).toBe(1);
      expect(cacheMisses).toBe(2);
      // One hit out of three requests.
      expect(hitRate).toBeCloseTo(1 / 3, 2);
    });
  });
  describe("shutdown", () => {
    it("should close all agents on shutdown", async () => {
      await pool.getAgent({
        endpointUrl: "https://api.anthropic.com/v1/messages",
        proxyUrl: null,
        enableHttp2: false,
      });
      await pool.getAgent({
        endpointUrl: "https://api.openai.com/v1/chat/completions",
        proxyUrl: null,
        enableHttp2: false,
      });

      await pool.shutdown();

      const stats = pool.getPoolStats();
      expect(stats.cacheSize).toBe(0);
    });

    it("should prefer destroy over close to avoid hanging on in-flight streaming requests", async () => {
      const result = await pool.getAgent({
        endpointUrl: "https://api.anthropic.com/v1/messages",
        proxyUrl: null,
        enableHttp2: true,
      });
      const agent = result.agent as unknown as {
        close: () => Promise<void>;
        destroy: () => Promise<void>;
      };

      // undici's Agent/ProxyAgent are mocked at the top of this file, so
      // destroy/close must be vi.fn mocks for the assertions below to mean
      // anything. These checks are unconditional on purpose: if either method
      // is missing or is not a mock, the test fails instead of passing with no
      // assertions executed.
      expect(vi.isMockFunction(agent.destroy)).toBe(true);
      expect(vi.isMockFunction(agent.close)).toBe(true);

      // Simulate close() staying pending for a long time, e.g. while it waits
      // for in-flight requests to finish.
      vi.mocked(agent.close).mockImplementation(() => new Promise<void>(() => {}));

      await pool.shutdown();

      // destroy must be preferred, so a hanging close() cannot stall shutdown/evict.
      expect(agent.destroy).toHaveBeenCalled();
      expect(agent.close).not.toHaveBeenCalled();
    });
  });
  451. });
  452. describe("getGlobalAgentPool", () => {
  453. afterEach(async () => {
  454. await resetGlobalAgentPool();
  455. });
  456. it("should return singleton instance", () => {
  457. const pool1 = getGlobalAgentPool();
  458. const pool2 = getGlobalAgentPool();
  459. expect(pool1).toBe(pool2);
  460. });
  461. it("should create new instance after reset", async () => {
  462. const pool1 = getGlobalAgentPool();
  463. await resetGlobalAgentPool();
  464. const pool2 = getGlobalAgentPool();
  465. expect(pool1).not.toBe(pool2);
  466. });
  467. });