Преглед изворни кода

fix: 优化数据库连接池与请求日志写入性能 (#503)

* fix: 优化数据库连接池与请求日志写入性能

* fix: 修复异步写缓冲的停机与重试边界
Ding пре 1 месец
родитељ
комит
e557bd3d12

+ 18 - 0
.env.example

@@ -8,6 +8,24 @@ AUTO_MIGRATE=true
 # 数据库连接字符串(仅用于本地开发或非 Docker Compose 部署)
 DSN="postgres://user:password@host:port/db_name"
 
+# PostgreSQL 连接池配置(postgres.js)
+# 说明:
+# - 这些值是“每个应用进程”的连接池上限;k8s 多副本时需要按副本数分摊
+# - 默认值:生产环境 20,开发环境 10(可按需覆盖)
+DB_POOL_MAX=20
+DB_POOL_IDLE_TIMEOUT=20                  # 空闲连接回收(秒)
+DB_POOL_CONNECT_TIMEOUT=10               # 建立连接超时(秒)
+
+# message_request 写入模式
+# - async:异步批量写入(默认,降低 DB 写放大与连接占用)
+# - sync:同步写入(兼容旧行为,但高并发下会增加请求尾部阻塞)
+MESSAGE_REQUEST_WRITE_MODE=async
+
+# message_request 异步批量参数(可选)
+MESSAGE_REQUEST_ASYNC_FLUSH_INTERVAL_MS=250
+MESSAGE_REQUEST_ASYNC_BATCH_SIZE=200
+MESSAGE_REQUEST_ASYNC_MAX_PENDING=5000
+
 # 数据库配置(Docker Compose 部署时使用)
 DB_USER=postgres
 DB_PASSWORD=your-secure-password_change-me

+ 7 - 0
README.md

@@ -275,6 +275,13 @@ Docker Compose 是**首选部署方式**,自动配置数据库、Redis 和应
 | ------------------------------------------ | ------------------------ | ---------------------------------------------------------------------------- |
 | `ADMIN_TOKEN`                              | `change-me`              | 后台登录令牌,部署前必须修改。                                               |
| `DSN`                                      | -                        | PostgreSQL 连接串,如 `postgres://user:pass@host:5432/db`。                  |
+| `DB_POOL_MAX`                              | 生产环境 `20` / 开发 `10` | PostgreSQL 连接池上限(每进程);高并发可提高,k8s 多副本需结合 `max_connections` 分摊。 |
+| `DB_POOL_IDLE_TIMEOUT`                     | `20`                     | 空闲连接回收(秒);避免连接长期占用。                                       |
+| `DB_POOL_CONNECT_TIMEOUT`                  | `10`                     | 建立连接超时(秒);避免网络异常时卡住连接获取。                             |
+| `MESSAGE_REQUEST_WRITE_MODE`               | `async`                  | 请求日志写入模式:`async` 异步批量(默认);`sync` 同步写入(更实时但更慢)。 |
+| `MESSAGE_REQUEST_ASYNC_FLUSH_INTERVAL_MS`  | `250`                    | 异步批量写入 flush 间隔(毫秒)。                                            |
+| `MESSAGE_REQUEST_ASYNC_BATCH_SIZE`         | `200`                    | 单次批量写入最大条数(避免单条 SQL 过大)。                                  |
+| `MESSAGE_REQUEST_ASYNC_MAX_PENDING`        | `5000`                   | 内存队列上限(防止 DB 异常时无限增长;超限将丢弃最旧更新并告警)。           |
 | `AUTO_MIGRATE`                             | `true`                   | 启动时自动执行 Drizzle 迁移;生产环境可关闭以人工控制。                      |
 | `REDIS_URL`                                | `redis://localhost:6379` | Redis 地址,支持 `rediss://` 用于 TLS。                                      |
 | `REDIS_TLS_REJECT_UNAUTHORIZED`            | `true`                   | 是否验证 Redis TLS 证书;设为 `false` 可跳过验证(用于自签/共享证书)。      |

+ 15 - 8
src/app/v1/_lib/proxy/response-handler.ts

@@ -1,4 +1,5 @@
 import { AsyncTaskManager } from "@/lib/async-task-manager";
+import { getEnvConfig } from "@/lib/config/env.schema";
 import { logger } from "@/lib/logger";
 import { ProxyStatusTracker } from "@/lib/proxy-status-tracker";
 import { RateLimitService } from "@/lib/rate-limit";
@@ -1967,14 +1968,20 @@ async function persistRequestFailure(options: {
       context1mApplied: session.getContext1mApplied(),
     });
 
-    logger.info("ResponseHandler: Successfully persisted request failure", {
-      taskId,
-      phase,
-      messageId: messageContext.id,
-      duration,
-      statusCode,
-      errorMessage,
-    });
+    const isAsyncWrite = getEnvConfig().MESSAGE_REQUEST_WRITE_MODE !== "sync";
+    logger.info(
+      isAsyncWrite
+        ? "ResponseHandler: Request failure persistence enqueued"
+        : "ResponseHandler: Successfully persisted request failure",
+      {
+        taskId,
+        phase,
+        messageId: messageContext.id,
+        duration,
+        statusCode,
+        errorMessage,
+      }
+    );
   } catch (dbError) {
     logger.error("ResponseHandler: Failed to persist request failure", {
       taskId,

+ 11 - 2
src/drizzle/db.ts

@@ -2,18 +2,27 @@ import 'server-only';
 
 import { drizzle, type PostgresJsDatabase } from 'drizzle-orm/postgres-js';
 import postgres from 'postgres';
+import { getEnvConfig } from '@/lib/config/env.schema';
 import * as schema from './schema';
 
 let dbInstance: PostgresJsDatabase<typeof schema> | null = null;
 
 function createDbInstance(): PostgresJsDatabase<typeof schema> {
-  const connectionString = process.env.DSN;
+  const env = getEnvConfig();
+  const connectionString = env.DSN;
 
   if (!connectionString) {
     throw new Error('DSN environment variable is not set');
   }
 
-  const client = postgres(connectionString);
+  // postgres.js 默认 max=10,在高并发下容易出现查询排队
+  // 这里采用“生产环境默认更大、同时可通过 env 覆盖”的策略,兼容单机与 k8s 多副本
+  const defaultMax = env.NODE_ENV === 'production' ? 20 : 10;
+  const client = postgres(connectionString, {
+    max: env.DB_POOL_MAX ?? defaultMax,
+    idle_timeout: env.DB_POOL_IDLE_TIMEOUT ?? 20,
+    connect_timeout: env.DB_POOL_CONNECT_TIMEOUT ?? 10,
+  });
   return drizzle(client, { schema });
 }
 

+ 12 - 0
src/instrumentation.ts

@@ -84,6 +84,18 @@ export async function register() {
             error: error instanceof Error ? error.message : String(error),
           });
         }
+
+        // 尽力将 message_request 的异步批量更新刷入数据库(避免终止时丢失尾部日志)
+        try {
+          const { stopMessageRequestWriteBuffer } = await import(
+            "@/repository/message-write-buffer"
+          );
+          await stopMessageRequestWriteBuffer();
+        } catch (error) {
+          logger.warn("[Instrumentation] Failed to stop message request write buffer", {
+            error: error instanceof Error ? error.message : String(error),
+          });
+        }
       };
 
       process.once("SIGTERM", () => {

+ 58 - 0
src/lib/config/env.schema.ts

@@ -9,6 +9,18 @@ import { z } from "zod";
  */
 const booleanTransform = (s: string) => s !== "false" && s !== "0";
 
+/**
+ * 可选数值解析(支持字符串)
+ * - undefined/null/空字符串 -> undefined
+ * - 其他 -> 交给 z.coerce.number 处理
+ */
+const optionalNumber = (schema: z.ZodNumber) =>
+  z.preprocess((val) => {
+    if (val === undefined || val === null || val === "") return undefined;
+    if (typeof val === "string") return Number(val);
+    return val;
+  }, schema.optional());
+
 /**
  * 环境变量验证schema
  */
@@ -20,6 +32,52 @@ export const EnvSchema = z.object({
     if (val.includes("user:password@host:port")) return undefined; // 占位符模板
     return val;
   }, z.string().url("数据库URL格式无效").optional()),
+  // PostgreSQL 连接池配置(postgres.js)
+  // - 多副本部署(k8s)需要结合数据库 max_connections 分摊配置
+  // - 这些值为“每个应用进程”的连接池上限
+  DB_POOL_MAX: optionalNumber(
+    z.number().int().min(1, "DB_POOL_MAX 不能小于 1").max(200, "DB_POOL_MAX 不能大于 200")
+  ),
+  // 空闲连接回收(秒)
+  DB_POOL_IDLE_TIMEOUT: optionalNumber(
+    z
+      .number()
+      .min(0, "DB_POOL_IDLE_TIMEOUT 不能小于 0")
+      .max(3600, "DB_POOL_IDLE_TIMEOUT 不能大于 3600")
+  ),
+  // 建连超时(秒)
+  DB_POOL_CONNECT_TIMEOUT: optionalNumber(
+    z
+      .number()
+      .min(1, "DB_POOL_CONNECT_TIMEOUT 不能小于 1")
+      .max(120, "DB_POOL_CONNECT_TIMEOUT 不能大于 120")
+  ),
+  // message_request 写入模式
+  // - sync:同步写入(兼容旧行为,但高并发下会增加请求尾部阻塞)
+  // - async:异步批量写入(默认,降低 DB 写放大与连接占用)
+  MESSAGE_REQUEST_WRITE_MODE: z.enum(["sync", "async"]).default("async"),
+  // 异步批量写入参数
+  MESSAGE_REQUEST_ASYNC_FLUSH_INTERVAL_MS: optionalNumber(
+    z
+      .number()
+      .int()
+      .min(10, "MESSAGE_REQUEST_ASYNC_FLUSH_INTERVAL_MS 不能小于 10")
+      .max(60000, "MESSAGE_REQUEST_ASYNC_FLUSH_INTERVAL_MS 不能大于 60000")
+  ),
+  MESSAGE_REQUEST_ASYNC_BATCH_SIZE: optionalNumber(
+    z
+      .number()
+      .int()
+      .min(1, "MESSAGE_REQUEST_ASYNC_BATCH_SIZE 不能小于 1")
+      .max(2000, "MESSAGE_REQUEST_ASYNC_BATCH_SIZE 不能大于 2000")
+  ),
+  MESSAGE_REQUEST_ASYNC_MAX_PENDING: optionalNumber(
+    z
+      .number()
+      .int()
+      .min(100, "MESSAGE_REQUEST_ASYNC_MAX_PENDING 不能小于 100")
+      .max(200000, "MESSAGE_REQUEST_ASYNC_MAX_PENDING 不能大于 200000")
+  ),
   ADMIN_TOKEN: z.preprocess((val) => {
     // 空字符串或 "change-me" 占位符转为 undefined
     if (!val || typeof val !== "string") return undefined;

+ 352 - 0
src/repository/message-write-buffer.ts

@@ -0,0 +1,352 @@
+import "server-only";
+
+import type { SQL } from "drizzle-orm";
+import { sql } from "drizzle-orm";
+import { db } from "@/drizzle/db";
+import { getEnvConfig } from "@/lib/config/env.schema";
+import { logger } from "@/lib/logger";
+import type { CreateMessageRequestData } from "@/types/message";
+
+export type MessageRequestUpdatePatch = {
+  durationMs?: number;
+  costUsd?: string;
+  statusCode?: number;
+  inputTokens?: number;
+  outputTokens?: number;
+  ttfbMs?: number | null;
+  cacheCreationInputTokens?: number;
+  cacheReadInputTokens?: number;
+  cacheCreation5mInputTokens?: number;
+  cacheCreation1hInputTokens?: number;
+  cacheTtlApplied?: string | null;
+  providerChain?: CreateMessageRequestData["provider_chain"];
+  errorMessage?: string;
+  errorStack?: string;
+  errorCause?: string;
+  model?: string;
+  providerId?: number;
+  context1mApplied?: boolean;
+};
+
+type MessageRequestUpdateRecord = {
+  id: number;
+  patch: MessageRequestUpdatePatch;
+};
+
+type WriterConfig = {
+  flushIntervalMs: number;
+  batchSize: number;
+  maxPending: number;
+};
+
+const COLUMN_MAP: Record<keyof MessageRequestUpdatePatch, string> = {
+  durationMs: "duration_ms",
+  costUsd: "cost_usd",
+  statusCode: "status_code",
+  inputTokens: "input_tokens",
+  outputTokens: "output_tokens",
+  ttfbMs: "ttfb_ms",
+  cacheCreationInputTokens: "cache_creation_input_tokens",
+  cacheReadInputTokens: "cache_read_input_tokens",
+  cacheCreation5mInputTokens: "cache_creation_5m_input_tokens",
+  cacheCreation1hInputTokens: "cache_creation_1h_input_tokens",
+  cacheTtlApplied: "cache_ttl_applied",
+  providerChain: "provider_chain",
+  errorMessage: "error_message",
+  errorStack: "error_stack",
+  errorCause: "error_cause",
+  model: "model",
+  providerId: "provider_id",
+  context1mApplied: "context_1m_applied",
+};
+
+function loadWriterConfig(): WriterConfig {
+  const env = getEnvConfig();
+  return {
+    flushIntervalMs: env.MESSAGE_REQUEST_ASYNC_FLUSH_INTERVAL_MS ?? 250,
+    batchSize: env.MESSAGE_REQUEST_ASYNC_BATCH_SIZE ?? 200,
+    maxPending: env.MESSAGE_REQUEST_ASYNC_MAX_PENDING ?? 5000,
+  };
+}
+
+function takeBatch(map: Map<number, MessageRequestUpdatePatch>, batchSize: number) {
+  const items: MessageRequestUpdateRecord[] = [];
+  for (const [id, patch] of map) {
+    items.push({ id, patch });
+    map.delete(id);
+    if (items.length >= batchSize) {
+      break;
+    }
+  }
+  return items;
+}
+
+function buildBatchUpdateSql(updates: MessageRequestUpdateRecord[]): SQL | null {
+  if (updates.length === 0) {
+    return null;
+  }
+
+  const ids = updates.map((u) => u.id);
+
+  const setClauses: SQL[] = [];
+  for (const [key, columnName] of Object.entries(COLUMN_MAP) as Array<
+    [keyof MessageRequestUpdatePatch, string]
+  >) {
+    const cases: SQL[] = [];
+    for (const update of updates) {
+      const value = update.patch[key];
+      if (value === undefined) {
+        continue;
+      }
+
+      if (key === "providerChain") {
+        if (value === null) {
+          cases.push(sql`WHEN ${update.id} THEN NULL`);
+          continue;
+        }
+        const json = JSON.stringify(value);
+        cases.push(sql`WHEN ${update.id} THEN ${json}::jsonb`);
+        continue;
+      }
+
+      if (key === "costUsd") {
+        // numeric 类型,显式 cast 避免隐式类型推断异常
+        cases.push(sql`WHEN ${update.id} THEN ${value}::numeric`);
+        continue;
+      }
+
+      cases.push(sql`WHEN ${update.id} THEN ${value}`);
+    }
+
+    if (cases.length === 0) {
+      continue;
+    }
+
+    const col = sql.identifier(columnName);
+    setClauses.push(sql`${col} = CASE id ${sql.join(cases, sql` `)} ELSE ${col} END`);
+  }
+
+  // 没有任何可更新字段时跳过(避免无意义写入)
+  if (setClauses.length === 0) {
+    return null;
+  }
+
+  // 所有更新统一刷新 updated_at
+  setClauses.push(sql`${sql.identifier("updated_at")} = NOW()`);
+
+  const idList = sql.join(
+    ids.map((id) => sql`${id}`),
+    sql`, `
+  );
+
+  return sql`
+    UPDATE message_request
+    SET ${sql.join(setClauses, sql`, `)}
+    WHERE id IN (${idList}) AND deleted_at IS NULL
+  `;
+}
+
+class MessageRequestWriteBuffer {
+  private readonly config: WriterConfig;
+  private readonly pending = new Map<number, MessageRequestUpdatePatch>();
+  private flushTimer: NodeJS.Timeout | null = null;
+  private flushAgainAfterCurrent = false;
+  private flushInFlight: Promise<void> | null = null;
+  private stopping = false;
+
+  constructor(config: WriterConfig) {
+    this.config = config;
+  }
+
+  enqueue(id: number, patch: MessageRequestUpdatePatch): void {
+    const existing = this.pending.get(id) ?? {};
+    const merged: MessageRequestUpdatePatch = { ...existing };
+    for (const [k, v] of Object.entries(patch) as Array<
+      [keyof MessageRequestUpdatePatch, MessageRequestUpdatePatch[keyof MessageRequestUpdatePatch]]
+    >) {
+      if (v !== undefined) {
+        merged[k] = v as never;
+      }
+    }
+    this.pending.set(id, merged);
+
+    // 队列上限保护:DB 异常时避免无限增长导致 OOM
+    if (this.pending.size > this.config.maxPending) {
+      // 优先丢弃非“终态”更新(没有 durationMs 的条目),尽量保留请求完成信息
+      let droppedId: number | undefined;
+      let droppedPatch: MessageRequestUpdatePatch | undefined;
+
+      for (const [candidateId, candidatePatch] of this.pending) {
+        if (candidatePatch.durationMs === undefined) {
+          droppedId = candidateId;
+          droppedPatch = candidatePatch;
+          break;
+        }
+      }
+
+      if (droppedId === undefined) {
+        const first = this.pending.entries().next().value as
+          | [number, MessageRequestUpdatePatch]
+          | undefined;
+        if (first) {
+          droppedId = first[0];
+          droppedPatch = first[1];
+        }
+      }
+
+      if (droppedId !== undefined) {
+        this.pending.delete(droppedId);
+        logger.warn("[MessageRequestWriteBuffer] Pending queue overflow, dropping update", {
+          maxPending: this.config.maxPending,
+          droppedId,
+          droppedHasDurationMs: droppedPatch?.durationMs !== undefined,
+          currentPending: this.pending.size,
+        });
+      }
+    }
+
+    // flush 过程中有新任务:标记需要再跑一轮(避免刚好 flush 完成时遗漏)
+    if (this.flushInFlight) {
+      this.flushAgainAfterCurrent = true;
+      return;
+    }
+
+    // 停止阶段不再调度 timer,避免阻止进程退出
+    if (!this.stopping) {
+      this.ensureFlushTimer();
+    }
+
+    // 达到批量阈值时尽快 flush,降低 durationMs 为空的“悬挂时间”
+    if (this.pending.size >= this.config.batchSize) {
+      void this.flush();
+    }
+  }
+
+  private ensureFlushTimer(): void {
+    if (this.stopping || this.flushTimer) {
+      return;
+    }
+
+    this.flushTimer = setTimeout(() => {
+      this.flushTimer = null;
+      void this.flush();
+    }, this.config.flushIntervalMs);
+  }
+
+  private clearFlushTimer(): void {
+    if (this.flushTimer) {
+      clearTimeout(this.flushTimer);
+      this.flushTimer = null;
+    }
+  }
+
+  async flush(): Promise<void> {
+    if (this.flushInFlight) {
+      this.flushAgainAfterCurrent = true;
+      return this.flushInFlight;
+    }
+
+    // 进入 flush:先清理 timer,避免重复调度
+    this.clearFlushTimer();
+
+    this.flushInFlight = (async () => {
+      do {
+        this.flushAgainAfterCurrent = false;
+
+        while (this.pending.size > 0) {
+          const batch = takeBatch(this.pending, this.config.batchSize);
+          const query = buildBatchUpdateSql(batch);
+          if (!query) {
+            continue;
+          }
+
+          try {
+            await db.execute(query);
+          } catch (error) {
+            // 失败重试:将 batch 放回队列
+            // 合并策略:保留“更新更晚”的字段(existing 优先),避免覆盖新数据
+            for (const item of batch) {
+              const existing = this.pending.get(item.id) ?? {};
+              this.pending.set(item.id, { ...item.patch, ...existing });
+            }
+
+            logger.error("[MessageRequestWriteBuffer] Flush failed, will retry later", {
+              error: error instanceof Error ? error.message : String(error),
+              pending: this.pending.size,
+              batchSize: batch.length,
+            });
+
+            // DB 异常时不在当前循环内死磕,留待下一次 timer/手动 flush
+            break;
+          }
+        }
+      } while (this.flushAgainAfterCurrent);
+    })().finally(() => {
+      this.flushInFlight = null;
+      // 如果还有积压:运行态下继续用 timer 退避重试;停止阶段不再调度 timer
+      if (this.pending.size > 0 && !this.stopping) {
+        this.ensureFlushTimer();
+      }
+    });
+
+    await this.flushInFlight;
+  }
+
+  async stop(): Promise<void> {
+    this.stopping = true;
+    this.clearFlushTimer();
+    await this.flush();
+    // stop 期间尽量补刷一次,避免极小概率竞态导致的 tail 更新残留
+    if (this.pending.size > 0) {
+      await this.flush();
+    }
+  }
+}
+
+let _buffer: MessageRequestWriteBuffer | null = null;
+let _bufferState: "running" | "stopping" | "stopped" = "running";
+
+function getBuffer(): MessageRequestWriteBuffer | null {
+  if (!_buffer) {
+    if (_bufferState !== "running") {
+      return null;
+    }
+    _buffer = new MessageRequestWriteBuffer(loadWriterConfig());
+  }
+  return _buffer;
+}
+
+export function enqueueMessageRequestUpdate(id: number, patch: MessageRequestUpdatePatch): void {
+  // 只在 async 模式下启用队列,避免额外内存/定时器开销
+  if (getEnvConfig().MESSAGE_REQUEST_WRITE_MODE !== "async") {
+    return;
+  }
+  const buffer = getBuffer();
+  if (!buffer) {
+    return;
+  }
+  buffer.enqueue(id, patch);
+}
+
+export async function flushMessageRequestWriteBuffer(): Promise<void> {
+  if (!_buffer) {
+    return;
+  }
+  await _buffer.flush();
+}
+
+export async function stopMessageRequestWriteBuffer(): Promise<void> {
+  if (_bufferState === "stopped") {
+    return;
+  }
+  _bufferState = "stopping";
+
+  if (!_buffer) {
+    _bufferState = "stopped";
+    return;
+  }
+
+  await _buffer.stop();
+  _buffer = null;
+  _bufferState = "stopped";
+}

+ 17 - 0
src/repository/message.ts

@@ -3,9 +3,11 @@
 import { and, asc, desc, eq, gt, inArray, isNull, lt, sql } from "drizzle-orm";
 import { db } from "@/drizzle/db";
 import { keys as keysTable, messageRequest, providers, users } from "@/drizzle/schema";
+import { getEnvConfig } from "@/lib/config/env.schema";
 import { formatCostForStorage } from "@/lib/utils/currency";
 import type { CreateMessageRequestData, MessageRequest } from "@/types/message";
 import { toMessageRequest } from "./_shared/transformers";
+import { enqueueMessageRequestUpdate } from "./message-write-buffer";
 
 /**
  * 创建消息请求记录
@@ -67,6 +69,11 @@ export async function createMessageRequest(
  * 更新消息请求的耗时
  */
 export async function updateMessageRequestDuration(id: number, durationMs: number): Promise<void> {
+  if (getEnvConfig().MESSAGE_REQUEST_WRITE_MODE === "async") {
+    enqueueMessageRequestUpdate(id, { durationMs });
+    return;
+  }
+
   await db
     .update(messageRequest)
     .set({
@@ -88,6 +95,11 @@ export async function updateMessageRequestCost(
     return;
   }
 
+  if (getEnvConfig().MESSAGE_REQUEST_WRITE_MODE === "async") {
+    enqueueMessageRequestUpdate(id, { costUsd: formattedCost });
+    return;
+  }
+
   await db
     .update(messageRequest)
     .set({
@@ -121,6 +133,11 @@ export async function updateMessageRequestDetails(
     context1mApplied?: boolean; // 是否应用了1M上下文窗口
   }
 ): Promise<void> {
+  if (getEnvConfig().MESSAGE_REQUEST_WRITE_MODE === "async") {
+    enqueueMessageRequestUpdate(id, details);
+    return;
+  }
+
   const updateData: Record<string, unknown> = {
     updatedAt: new Date(),
   };

+ 108 - 0
tests/unit/drizzle/db-pool-config.test.ts

@@ -0,0 +1,108 @@
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+
+type EnvSnapshot = Partial<Record<string, string | undefined>>;
+
+function snapshotEnv(keys: string[]): EnvSnapshot {
+  const snapshot: EnvSnapshot = {};
+  for (const key of keys) {
+    snapshot[key] = process.env[key];
+  }
+  return snapshot;
+}
+
+function restoreEnv(snapshot: EnvSnapshot) {
+  for (const [key, value] of Object.entries(snapshot)) {
+    if (value === undefined) {
+      delete process.env[key];
+    } else {
+      process.env[key] = value;
+    }
+  }
+}
+
+describe("drizzle/db 连接池配置", () => {
+  const envKeys = [
+    "NODE_ENV",
+    "DSN",
+    "DB_POOL_MAX",
+    "DB_POOL_IDLE_TIMEOUT",
+    "DB_POOL_CONNECT_TIMEOUT",
+    "MESSAGE_REQUEST_WRITE_MODE",
+  ];
+
+  const postgresMock = vi.fn();
+  const drizzleMock = vi.fn(() => ({ __db: true }));
+
+  const originalEnv = snapshotEnv(envKeys);
+
+  beforeEach(() => {
+    vi.resetModules();
+    postgresMock.mockReset();
+    drizzleMock.mockReset();
+
+    // 确保每个用例有一致的基础环境
+    process.env.DSN = "postgres://postgres:postgres@localhost:5432/claude_code_hub_test";
+    process.env.MESSAGE_REQUEST_WRITE_MODE = "async";
+    delete process.env.DB_POOL_MAX;
+    delete process.env.DB_POOL_IDLE_TIMEOUT;
+    delete process.env.DB_POOL_CONNECT_TIMEOUT;
+
+    vi.doMock("postgres", () => ({ default: postgresMock }));
+    vi.doMock("drizzle-orm/postgres-js", () => ({
+      drizzle: drizzleMock,
+    }));
+  });
+
+  afterEach(() => {
+    restoreEnv(originalEnv);
+  });
+
+  it("生产环境默认 max=20、idle_timeout=20、connect_timeout=10", async () => {
+    process.env.NODE_ENV = "production";
+
+    const { getDb } = await import("@/drizzle/db");
+    getDb();
+
+    expect(postgresMock).toHaveBeenCalledWith(
+      process.env.DSN,
+      expect.objectContaining({
+        max: 20,
+        idle_timeout: 20,
+        connect_timeout: 10,
+      })
+    );
+  });
+
+  it("开发环境默认 max=10", async () => {
+    process.env.NODE_ENV = "development";
+
+    const { getDb } = await import("@/drizzle/db");
+    getDb();
+
+    expect(postgresMock).toHaveBeenCalledWith(
+      process.env.DSN,
+      expect.objectContaining({
+        max: 10,
+      })
+    );
+  });
+
+  it("支持通过 env 覆盖连接池参数", async () => {
+    process.env.NODE_ENV = "production";
+    process.env.DB_POOL_MAX = "50";
+    process.env.DB_POOL_IDLE_TIMEOUT = "30";
+    process.env.DB_POOL_CONNECT_TIMEOUT = "5";
+
+    const { getDb } = await import("@/drizzle/db");
+    getDb();
+
+    expect(postgresMock).toHaveBeenCalledWith(
+      process.env.DSN,
+      expect.objectContaining({
+        max: 50,
+        idle_timeout: 30,
+        connect_timeout: 5,
+      })
+    );
+  });
+});

+ 265 - 0
tests/unit/repository/message-write-buffer.test.ts

@@ -0,0 +1,265 @@
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+
+type EnvSnapshot = Partial<Record<string, string | undefined>>;
+
+function snapshotEnv(keys: string[]): EnvSnapshot {
+  const snapshot: EnvSnapshot = {};
+  for (const key of keys) {
+    snapshot[key] = process.env[key];
+  }
+  return snapshot;
+}
+
+function restoreEnv(snapshot: EnvSnapshot) {
+  for (const [key, value] of Object.entries(snapshot)) {
+    if (value === undefined) {
+      delete process.env[key];
+    } else {
+      process.env[key] = value;
+    }
+  }
+}
+
+function toSqlText(query: { toQuery: (config: any) => { sql: string; params: unknown[] } }) {
+  return query.toQuery({
+    escapeName: (name: string) => `"${name}"`,
+    escapeParam: (index: number) => `$${index}`,
+    escapeString: (value: string) => `'${value}'`,
+    paramStartIndex: { value: 1 },
+  });
+}
+
+function createDeferred<T>() {
+  let resolve!: (value: T) => void;
+  let reject!: (error: unknown) => void;
+  const promise = new Promise<T>((res, rej) => {
+    resolve = res;
+    reject = rej;
+  });
+  return { promise, resolve, reject };
+}
+
+describe("message_request 异步批量写入", () => {
+  const envKeys = [
+    "NODE_ENV",
+    "DSN",
+    "MESSAGE_REQUEST_WRITE_MODE",
+    "MESSAGE_REQUEST_ASYNC_FLUSH_INTERVAL_MS",
+    "MESSAGE_REQUEST_ASYNC_BATCH_SIZE",
+    "MESSAGE_REQUEST_ASYNC_MAX_PENDING",
+  ];
+  const originalEnv = snapshotEnv(envKeys);
+
+  const executeMock = vi.fn(async () => []);
+
+  beforeEach(() => {
+    vi.resetModules();
+    executeMock.mockClear();
+
+    process.env.NODE_ENV = "test";
+    process.env.DSN = "postgres://postgres:postgres@localhost:5432/claude_code_hub_test";
+    process.env.MESSAGE_REQUEST_ASYNC_FLUSH_INTERVAL_MS = "60000";
+    process.env.MESSAGE_REQUEST_ASYNC_BATCH_SIZE = "1000";
+    process.env.MESSAGE_REQUEST_ASYNC_MAX_PENDING = "1000";
+
+    vi.doMock("@/drizzle/db", () => ({
+      db: {
+        execute: executeMock,
+        // 避免 tests/setup.ts 的 afterAll 清理逻辑因 mock 缺失 select 而报错
+        select: () => ({
+          from: () => ({
+            where: async () => [],
+          }),
+        }),
+      },
+    }));
+  });
+
+  afterEach(() => {
+    restoreEnv(originalEnv);
+  });
+
+  it("sync 模式下不应入队/写库", async () => {
+    process.env.MESSAGE_REQUEST_WRITE_MODE = "sync";
+
+    const { enqueueMessageRequestUpdate, flushMessageRequestWriteBuffer } = await import(
+      "@/repository/message-write-buffer"
+    );
+
+    enqueueMessageRequestUpdate(1, { durationMs: 123 });
+    await flushMessageRequestWriteBuffer();
+
+    expect(executeMock).not.toHaveBeenCalled();
+  });
+
+  it("async 模式下应合并同一 id 的多次更新并批量写入", async () => {
+    process.env.MESSAGE_REQUEST_WRITE_MODE = "async";
+
+    const {
+      enqueueMessageRequestUpdate,
+      flushMessageRequestWriteBuffer,
+      stopMessageRequestWriteBuffer,
+    } = await import("@/repository/message-write-buffer");
+
+    enqueueMessageRequestUpdate(42, { durationMs: 100 });
+    enqueueMessageRequestUpdate(42, { statusCode: 200, ttfbMs: 10 });
+
+    await flushMessageRequestWriteBuffer();
+    await stopMessageRequestWriteBuffer();
+
+    expect(executeMock).toHaveBeenCalledTimes(1);
+
+    const query = executeMock.mock.calls[0]?.[0];
+    const built = toSqlText(query);
+
+    expect(built.sql).toContain("UPDATE message_request");
+    expect(built.sql).toContain("duration_ms");
+    expect(built.sql).toContain("status_code");
+    expect(built.sql).toContain("ttfb_ms");
+    expect(built.sql).toContain("updated_at");
+    expect(built.sql).toContain("deleted_at IS NULL");
+  });
+
+  it("应对 costUsd/providerChain 做显式类型转换(numeric/jsonb)", async () => {
+    process.env.MESSAGE_REQUEST_WRITE_MODE = "async";
+
+    const { enqueueMessageRequestUpdate, stopMessageRequestWriteBuffer } = await import(
+      "@/repository/message-write-buffer"
+    );
+
+    enqueueMessageRequestUpdate(7, {
+      costUsd: "0.000123",
+      providerChain: [{ id: 1, name: "p1" }],
+    });
+
+    await stopMessageRequestWriteBuffer();
+
+    expect(executeMock).toHaveBeenCalledTimes(1);
+
+    const query = executeMock.mock.calls[0]?.[0];
+    const built = toSqlText(query);
+
+    expect(built.sql).toContain("::numeric");
+    expect(built.sql).toContain("::jsonb");
+  });
+
+  it("stop 应等待 in-flight flush 完成", async () => {
+    process.env.MESSAGE_REQUEST_WRITE_MODE = "async";
+
+    const deferred = createDeferred<unknown[]>();
+    executeMock.mockImplementationOnce(async () => deferred.promise);
+
+    const { enqueueMessageRequestUpdate, stopMessageRequestWriteBuffer } = await import(
+      "@/repository/message-write-buffer"
+    );
+
+    enqueueMessageRequestUpdate(1, { durationMs: 123 });
+
+    const stopPromise = stopMessageRequestWriteBuffer();
+
+    expect(executeMock).toHaveBeenCalledTimes(1);
+
+    const raced = await Promise.race([
+      stopPromise.then(() => "stopped"),
+      Promise.resolve("pending"),
+    ]);
+    expect(raced).toBe("pending");
+
+    deferred.resolve([]);
+    await stopPromise;
+  });
+
+  it("flush 进行中 enqueue 的更新应最终落库", async () => {
+    process.env.MESSAGE_REQUEST_WRITE_MODE = "async";
+
+    const firstExecute = createDeferred<unknown[]>();
+    executeMock.mockImplementationOnce(async () => firstExecute.promise);
+    executeMock.mockImplementationOnce(async () => []);
+
+    const {
+      enqueueMessageRequestUpdate,
+      flushMessageRequestWriteBuffer,
+      stopMessageRequestWriteBuffer,
+    } = await import("@/repository/message-write-buffer");
+
+    enqueueMessageRequestUpdate(42, { durationMs: 100 });
+
+    const flushPromise = flushMessageRequestWriteBuffer();
+    expect(executeMock).toHaveBeenCalledTimes(1);
+
+    // 在第一次写入尚未完成时,追加同一请求的后续 patch
+    enqueueMessageRequestUpdate(42, { statusCode: 200 });
+
+    firstExecute.resolve([]);
+
+    await flushPromise;
+    await stopMessageRequestWriteBuffer();
+
+    expect(executeMock).toHaveBeenCalledTimes(2);
+
+    const secondQuery = executeMock.mock.calls[1]?.[0];
+    const built = toSqlText(secondQuery);
+    expect(built.sql).toContain("status_code");
+  });
+
+  it("DB 写入失败重试时不应覆盖更晚的 patch", async () => {
+    process.env.MESSAGE_REQUEST_WRITE_MODE = "async";
+
+    const firstExecute = createDeferred<unknown[]>();
+    executeMock.mockImplementationOnce(async () => firstExecute.promise);
+    executeMock.mockImplementationOnce(async () => []);
+
+    const {
+      enqueueMessageRequestUpdate,
+      flushMessageRequestWriteBuffer,
+      stopMessageRequestWriteBuffer,
+    } = await import("@/repository/message-write-buffer");
+
+    enqueueMessageRequestUpdate(7, { durationMs: 100 });
+
+    const flushPromise = flushMessageRequestWriteBuffer();
+    expect(executeMock).toHaveBeenCalledTimes(1);
+
+    // 在第一次 flush 的 in-flight 期间写入“更晚”的字段
+    enqueueMessageRequestUpdate(7, { statusCode: 500 });
+
+    firstExecute.reject(new Error("db down"));
+    await flushPromise;
+
+    // 触发下一次 flush:应同时包含 duration/statusCode
+    await flushMessageRequestWriteBuffer();
+    await stopMessageRequestWriteBuffer();
+
+    expect(executeMock).toHaveBeenCalledTimes(2);
+
+    const secondQuery = executeMock.mock.calls[1]?.[0];
+    const built = toSqlText(secondQuery);
+    expect(built.sql).toContain("duration_ms");
+    expect(built.sql).toContain("status_code");
+  });
+
+  it("队列溢出时应优先丢弃非终态更新(尽量保留 durationMs)", async () => {
+    process.env.MESSAGE_REQUEST_WRITE_MODE = "async";
+    process.env.MESSAGE_REQUEST_ASYNC_MAX_PENDING = "100";
+
+    const { enqueueMessageRequestUpdate, stopMessageRequestWriteBuffer } = await import(
+      "@/repository/message-write-buffer"
+    );
+
+    enqueueMessageRequestUpdate(1001, { statusCode: 200 }); // 非终态(无 durationMs)
+    for (let i = 0; i < 100; i++) {
+      enqueueMessageRequestUpdate(2000 + i, { durationMs: i });
+    }
+
+    await stopMessageRequestWriteBuffer();
+
+    expect(executeMock).toHaveBeenCalledTimes(1);
+
+    const query = executeMock.mock.calls[0]?.[0];
+    const built = toSqlText(query);
+
+    expect(built.params).toContain(2000);
+    expect(built.params).toContain(2099);
+    expect(built.params).not.toContain(1001);
+  });
+});