message-write-buffer.test.ts 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
  2. type EnvSnapshot = Partial<Record<string, string | undefined>>;
  3. function snapshotEnv(keys: string[]): EnvSnapshot {
  4. const snapshot: EnvSnapshot = {};
  5. for (const key of keys) {
  6. snapshot[key] = process.env[key];
  7. }
  8. return snapshot;
  9. }
  10. function restoreEnv(snapshot: EnvSnapshot) {
  11. for (const [key, value] of Object.entries(snapshot)) {
  12. if (value === undefined) {
  13. delete process.env[key];
  14. } else {
  15. process.env[key] = value;
  16. }
  17. }
  18. }
  19. function toSqlText(query: { toQuery: (config: any) => { sql: string; params: unknown[] } }) {
  20. return query.toQuery({
  21. escapeName: (name: string) => `"${name}"`,
  22. escapeParam: (index: number) => `$${index}`,
  23. escapeString: (value: string) => `'${value}'`,
  24. paramStartIndex: { value: 1 },
  25. });
  26. }
  27. function createDeferred<T>() {
  28. let resolve!: (value: T) => void;
  29. let reject!: (error: unknown) => void;
  30. const promise = new Promise<T>((res, rej) => {
  31. resolve = res;
  32. reject = rej;
  33. });
  34. return { promise, resolve, reject };
  35. }
  36. describe("message_request 异步批量写入", () => {
  37. const envKeys = [
  38. "NODE_ENV",
  39. "DSN",
  40. "MESSAGE_REQUEST_WRITE_MODE",
  41. "MESSAGE_REQUEST_ASYNC_FLUSH_INTERVAL_MS",
  42. "MESSAGE_REQUEST_ASYNC_BATCH_SIZE",
  43. "MESSAGE_REQUEST_ASYNC_MAX_PENDING",
  44. ];
  45. const originalEnv = snapshotEnv(envKeys);
  46. const executeMock = vi.fn(async () => []);
  47. beforeEach(() => {
  48. vi.resetModules();
  49. executeMock.mockClear();
  50. process.env.NODE_ENV = "test";
  51. process.env.DSN = "postgres://postgres:postgres@localhost:5432/claude_code_hub_test";
  52. process.env.MESSAGE_REQUEST_ASYNC_FLUSH_INTERVAL_MS = "60000";
  53. process.env.MESSAGE_REQUEST_ASYNC_BATCH_SIZE = "1000";
  54. process.env.MESSAGE_REQUEST_ASYNC_MAX_PENDING = "1000";
  55. vi.doMock("@/drizzle/db", () => ({
  56. db: {
  57. execute: executeMock,
  58. // 避免 tests/setup.ts 的 afterAll 清理逻辑因 mock 缺失 select 而报错
  59. select: () => ({
  60. from: () => ({
  61. where: async () => [],
  62. }),
  63. }),
  64. },
  65. }));
  66. });
  67. afterEach(() => {
  68. restoreEnv(originalEnv);
  69. });
  70. it("sync 模式下不应入队/写库", async () => {
  71. process.env.MESSAGE_REQUEST_WRITE_MODE = "sync";
  72. const { enqueueMessageRequestUpdate, flushMessageRequestWriteBuffer } = await import(
  73. "@/repository/message-write-buffer"
  74. );
  75. enqueueMessageRequestUpdate(1, { durationMs: 123 });
  76. await flushMessageRequestWriteBuffer();
  77. expect(executeMock).not.toHaveBeenCalled();
  78. });
  79. it("async 模式下应合并同一 id 的多次更新并批量写入", async () => {
  80. process.env.MESSAGE_REQUEST_WRITE_MODE = "async";
  81. const {
  82. enqueueMessageRequestUpdate,
  83. flushMessageRequestWriteBuffer,
  84. stopMessageRequestWriteBuffer,
  85. } = await import("@/repository/message-write-buffer");
  86. enqueueMessageRequestUpdate(42, { durationMs: 100 });
  87. enqueueMessageRequestUpdate(42, { statusCode: 200, ttfbMs: 10 });
  88. await flushMessageRequestWriteBuffer();
  89. await stopMessageRequestWriteBuffer();
  90. expect(executeMock).toHaveBeenCalledTimes(1);
  91. const query = executeMock.mock.calls[0]?.[0];
  92. const built = toSqlText(query);
  93. expect(built.sql).toContain("UPDATE message_request");
  94. expect(built.sql).toContain("duration_ms");
  95. expect(built.sql).toContain("status_code");
  96. expect(built.sql).toContain("ttfb_ms");
  97. expect(built.sql).toContain("updated_at");
  98. expect(built.sql).toContain("deleted_at IS NULL");
  99. });
  100. it("应对 costUsd/providerChain 做显式类型转换(numeric/jsonb)", async () => {
  101. process.env.MESSAGE_REQUEST_WRITE_MODE = "async";
  102. const { enqueueMessageRequestUpdate, stopMessageRequestWriteBuffer } = await import(
  103. "@/repository/message-write-buffer"
  104. );
  105. enqueueMessageRequestUpdate(7, {
  106. costUsd: "0.000123",
  107. providerChain: [{ id: 1, name: "p1" }],
  108. });
  109. await stopMessageRequestWriteBuffer();
  110. expect(executeMock).toHaveBeenCalledTimes(1);
  111. const query = executeMock.mock.calls[0]?.[0];
  112. const built = toSqlText(query);
  113. expect(built.sql).toContain("::numeric");
  114. expect(built.sql).toContain("::jsonb");
  115. });
  116. it("stop 应等待 in-flight flush 完成", async () => {
  117. process.env.MESSAGE_REQUEST_WRITE_MODE = "async";
  118. const deferred = createDeferred<unknown[]>();
  119. executeMock.mockImplementationOnce(async () => deferred.promise);
  120. const { enqueueMessageRequestUpdate, stopMessageRequestWriteBuffer } = await import(
  121. "@/repository/message-write-buffer"
  122. );
  123. enqueueMessageRequestUpdate(1, { durationMs: 123 });
  124. const stopPromise = stopMessageRequestWriteBuffer();
  125. expect(executeMock).toHaveBeenCalledTimes(1);
  126. const raced = await Promise.race([
  127. stopPromise.then(() => "stopped"),
  128. Promise.resolve("pending"),
  129. ]);
  130. expect(raced).toBe("pending");
  131. deferred.resolve([]);
  132. await stopPromise;
  133. });
  134. it("flush 进行中 enqueue 的更新应最终落库", async () => {
  135. process.env.MESSAGE_REQUEST_WRITE_MODE = "async";
  136. const firstExecute = createDeferred<unknown[]>();
  137. executeMock.mockImplementationOnce(async () => firstExecute.promise);
  138. executeMock.mockImplementationOnce(async () => []);
  139. const {
  140. enqueueMessageRequestUpdate,
  141. flushMessageRequestWriteBuffer,
  142. stopMessageRequestWriteBuffer,
  143. } = await import("@/repository/message-write-buffer");
  144. enqueueMessageRequestUpdate(42, { durationMs: 100 });
  145. const flushPromise = flushMessageRequestWriteBuffer();
  146. expect(executeMock).toHaveBeenCalledTimes(1);
  147. // 在第一次写入尚未完成时,追加同一请求的后续 patch
  148. enqueueMessageRequestUpdate(42, { statusCode: 200 });
  149. firstExecute.resolve([]);
  150. await flushPromise;
  151. await stopMessageRequestWriteBuffer();
  152. expect(executeMock).toHaveBeenCalledTimes(2);
  153. const secondQuery = executeMock.mock.calls[1]?.[0];
  154. const built = toSqlText(secondQuery);
  155. expect(built.sql).toContain("status_code");
  156. });
  157. it("DB 写入失败重试时不应覆盖更晚的 patch", async () => {
  158. process.env.MESSAGE_REQUEST_WRITE_MODE = "async";
  159. const firstExecute = createDeferred<unknown[]>();
  160. executeMock.mockImplementationOnce(async () => firstExecute.promise);
  161. executeMock.mockImplementationOnce(async () => []);
  162. const {
  163. enqueueMessageRequestUpdate,
  164. flushMessageRequestWriteBuffer,
  165. stopMessageRequestWriteBuffer,
  166. } = await import("@/repository/message-write-buffer");
  167. enqueueMessageRequestUpdate(7, { durationMs: 100 });
  168. const flushPromise = flushMessageRequestWriteBuffer();
  169. expect(executeMock).toHaveBeenCalledTimes(1);
  170. // 在第一次 flush 的 in-flight 期间写入“更晚”的字段
  171. enqueueMessageRequestUpdate(7, { statusCode: 500 });
  172. firstExecute.reject(new Error("db down"));
  173. await flushPromise;
  174. // 触发下一次 flush:应同时包含 duration/statusCode
  175. await flushMessageRequestWriteBuffer();
  176. await stopMessageRequestWriteBuffer();
  177. expect(executeMock).toHaveBeenCalledTimes(2);
  178. const secondQuery = executeMock.mock.calls[1]?.[0];
  179. const built = toSqlText(secondQuery);
  180. expect(built.sql).toContain("duration_ms");
  181. expect(built.sql).toContain("status_code");
  182. });
  183. it("队列溢出时应优先丢弃非终态更新(尽量保留 durationMs)", async () => {
  184. process.env.MESSAGE_REQUEST_WRITE_MODE = "async";
  185. process.env.MESSAGE_REQUEST_ASYNC_MAX_PENDING = "100";
  186. const { enqueueMessageRequestUpdate, stopMessageRequestWriteBuffer } = await import(
  187. "@/repository/message-write-buffer"
  188. );
  189. enqueueMessageRequestUpdate(1001, { statusCode: 200 }); // 非终态(无 durationMs)
  190. for (let i = 0; i < 100; i++) {
  191. enqueueMessageRequestUpdate(2000 + i, { durationMs: i });
  192. }
  193. await stopMessageRequestWriteBuffer();
  194. expect(executeMock).toHaveBeenCalledTimes(1);
  195. const query = executeMock.mock.calls[0]?.[0];
  196. const built = toSqlText(query);
  197. expect(built.params).toContain(2000);
  198. expect(built.params).toContain(2099);
  199. expect(built.params).not.toContain(1001);
  200. });
  201. });