Browse Source

refactor(repo): migrate message.ts session aggregation to usage_ledger

- aggregateSessionStats: read from usageLedger instead of messageRequest
- aggregateMultipleSessionStats: read from usageLedger instead of messageRequest
- Use LEDGER_BILLING_CONDITION instead of EXCLUDE_WARMUP_CONDITION + FILTER
- Use finalProviderId for provider sub-queries
- Remove deletedAt checks (ledger has no soft-delete)
- Detail-view functions unchanged (still on messageRequest)
ding113 2 weeks ago
parent
commit
4c9a6921ed
2 changed files with 96 additions and 73 deletions
  1. 31 0
      .sisyphus/evidence/task-11-session-agg-migrated.txt
  2. 65 73
      src/repository/message.ts

+ 31 - 0
.sisyphus/evidence/task-11-session-agg-migrated.txt

@@ -0,0 +1,31 @@
+Task 11: Migrate message.ts session aggregation to usage_ledger
+================================================================
+
+File changed: src/repository/message.ts
+
+Functions migrated:
+1. aggregateSessionStats() - single session stats
+2. aggregateMultipleSessionStats() - batch session stats
+
+Changes per function:
+- .from(messageRequest) -> .from(usageLedger)
+- messageRequest.X -> usageLedger.X for all aggregated columns
+- EXCLUDE_WARMUP_CONDITION -> LEDGER_BILLING_CONDITION (warmup already excluded at trigger level)
+- Removed isNull(messageRequest.deletedAt) (no deletedAt on ledger)
+- Provider sub-query: messageRequest.providerId -> usageLedger.finalProviderId
+- Cache TTL sub-query: messageRequest.cacheTtlApplied -> usageLedger.cacheTtlApplied
+- Model sub-query: messageRequest.model -> usageLedger.model
+- FILTER (WHERE EXCLUDE_WARMUP_CONDITION) removed from aggregates (ledger has no warmup rows)
+
+NOT migrated (detail-view, stays on messageRequest):
+- userInfo sub-query (step 4) - needs userAgent, apiType, key join
+- findMessageRequestById, findMessageRequestBySessionId, etc.
+
+Import changes:
+- Added: usageLedger from schema
+- Added: LEDGER_BILLING_CONDITION from ledger-conditions
+- Kept: messageRequest, EXCLUDE_WARMUP_CONDITION (used by detail functions)
+
+Return types: UNCHANGED
+
+Typecheck: PASS (tsgo exit code 0, 0.51s)

+ 65 - 73
src/repository/message.ts

@@ -2,11 +2,12 @@
 
 import { and, asc, desc, eq, gt, inArray, isNull, lt, sql } from "drizzle-orm";
 import { db } from "@/drizzle/db";
-import { keys as keysTable, messageRequest, providers, users } from "@/drizzle/schema";
+import { keys as keysTable, messageRequest, providers, usageLedger, users } from "@/drizzle/schema";
 import { getEnvConfig } from "@/lib/config/env.schema";
 import { formatCostForStorage } from "@/lib/utils/currency";
 import type { CreateMessageRequestData, MessageRequest, ProviderChainItem } from "@/types/message";
 import type { SpecialSetting } from "@/types/special-settings";
+import { LEDGER_BILLING_CONDITION } from "./_shared/ledger-conditions";
 import { EXCLUDE_WARMUP_CONDITION } from "./_shared/message-request-conditions";
 import { toMessageRequest } from "./_shared/transformers";
 import { enqueueMessageRequestUpdate } from "./message-write-buffer";
@@ -382,69 +383,63 @@ export async function aggregateSessionStats(sessionId: string): Promise<{
   apiType: string | null;
   cacheTtlApplied: string | null;
 } | null> {
-  // 1. 聚合统计
+  // 1. 聚合统计(从 usageLedger 读取,warmup 已在触发器层面排除)
   const [stats] = await db
     .select({
-      // Session 存在性:包含所有请求(含 warmup)
-      totalCount: sql<number>`count(*)::double precision`,
-      // Session 统计:排除 warmup(不计入任何统计)
-      requestCount: sql<number>`count(*) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})::double precision`,
-      totalCostUsd: sql<string>`COALESCE(sum(${messageRequest.costUsd}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION}), 0)`,
-      totalInputTokens: sql<number>`COALESCE(sum(${messageRequest.inputTokens}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})::double precision, 0::double precision)`,
-      totalOutputTokens: sql<number>`COALESCE(sum(${messageRequest.outputTokens}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})::double precision, 0::double precision)`,
-      totalCacheCreationTokens: sql<number>`COALESCE(sum(${messageRequest.cacheCreationInputTokens}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})::double precision, 0::double precision)`,
-      totalCacheReadTokens: sql<number>`COALESCE(sum(${messageRequest.cacheReadInputTokens}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})::double precision, 0::double precision)`,
-      totalDurationMs: sql<number>`COALESCE(sum(${messageRequest.durationMs}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})::double precision, 0::double precision)`,
-      firstRequestAt: sql<Date>`min(${messageRequest.createdAt}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})`,
-      lastRequestAt: sql<Date>`max(${messageRequest.createdAt}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})`,
+      requestCount: sql<number>`count(*)::double precision`,
+      totalCostUsd: sql<string>`COALESCE(sum(${usageLedger.costUsd}), 0)`,
+      totalInputTokens: sql<number>`COALESCE(sum(${usageLedger.inputTokens})::double precision, 0::double precision)`,
+      totalOutputTokens: sql<number>`COALESCE(sum(${usageLedger.outputTokens})::double precision, 0::double precision)`,
+      totalCacheCreationTokens: sql<number>`COALESCE(sum(${usageLedger.cacheCreationInputTokens})::double precision, 0::double precision)`,
+      totalCacheReadTokens: sql<number>`COALESCE(sum(${usageLedger.cacheReadInputTokens})::double precision, 0::double precision)`,
+      totalDurationMs: sql<number>`COALESCE(sum(${usageLedger.durationMs})::double precision, 0::double precision)`,
+      firstRequestAt: sql<Date>`min(${usageLedger.createdAt})`,
+      lastRequestAt: sql<Date>`max(${usageLedger.createdAt})`,
     })
-    .from(messageRequest)
-    .where(and(eq(messageRequest.sessionId, sessionId), isNull(messageRequest.deletedAt)));
+    .from(usageLedger)
+    .where(and(eq(usageLedger.sessionId, sessionId), LEDGER_BILLING_CONDITION));
 
-  if (!stats || stats.totalCount === 0) {
+  if (!stats || stats.requestCount === 0) {
     return null;
   }
 
   // 2. 查询供应商列表(去重)
   const providerList = await db
     .selectDistinct({
-      providerId: messageRequest.providerId,
+      providerId: usageLedger.finalProviderId,
       providerName: providers.name,
     })
-    .from(messageRequest)
-    .leftJoin(providers, eq(messageRequest.providerId, providers.id))
+    .from(usageLedger)
+    .leftJoin(providers, eq(usageLedger.finalProviderId, providers.id))
     .where(
       and(
-        eq(messageRequest.sessionId, sessionId),
-        isNull(messageRequest.deletedAt),
-        EXCLUDE_WARMUP_CONDITION,
-        sql`${messageRequest.providerId} IS NOT NULL`
+        eq(usageLedger.sessionId, sessionId),
+        LEDGER_BILLING_CONDITION,
+        sql`${usageLedger.finalProviderId} IS NOT NULL`
       )
     );
 
   // 3. 查询模型列表(去重)
   const modelList = await db
-    .selectDistinct({ model: messageRequest.model })
-    .from(messageRequest)
+    .selectDistinct({ model: usageLedger.model })
+    .from(usageLedger)
     .where(
       and(
-        eq(messageRequest.sessionId, sessionId),
-        isNull(messageRequest.deletedAt),
-        EXCLUDE_WARMUP_CONDITION,
-        sql`${messageRequest.model} IS NOT NULL`
+        eq(usageLedger.sessionId, sessionId),
+        LEDGER_BILLING_CONDITION,
+        sql`${usageLedger.model} IS NOT NULL`
       )
     );
 
-  // 3.1 查询 Cache TTL 列表(去重,用于显示缓存时间信息
+  // 3.1 查询 Cache TTL 列表(去重)
   const cacheTtlList = await db
-    .selectDistinct({ cacheTtl: messageRequest.cacheTtlApplied })
-    .from(messageRequest)
+    .selectDistinct({ cacheTtl: usageLedger.cacheTtlApplied })
+    .from(usageLedger)
     .where(
       and(
-        eq(messageRequest.sessionId, sessionId),
-        isNull(messageRequest.deletedAt),
-        EXCLUDE_WARMUP_CONDITION,
-        sql`${messageRequest.cacheTtlApplied} IS NOT NULL`
+        eq(usageLedger.sessionId, sessionId),
+        LEDGER_BILLING_CONDITION,
+        sql`${usageLedger.cacheTtlApplied} IS NOT NULL`
       )
     );
 
@@ -539,23 +534,23 @@ export async function aggregateMultipleSessionStats(sessionIds: string[]): Promi
     return [];
   }
 
-  // 1. 批量聚合统计(单次查询)
+  // 1. 批量聚合统计(从 usageLedger,单次查询)
   const statsResults = await db
     .select({
-      sessionId: messageRequest.sessionId,
-      requestCount: sql<number>`count(*) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})::double precision`,
-      totalCostUsd: sql<string>`COALESCE(sum(${messageRequest.costUsd}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION}), 0)`,
-      totalInputTokens: sql<number>`COALESCE(sum(${messageRequest.inputTokens}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})::double precision, 0::double precision)`,
-      totalOutputTokens: sql<number>`COALESCE(sum(${messageRequest.outputTokens}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})::double precision, 0::double precision)`,
-      totalCacheCreationTokens: sql<number>`COALESCE(sum(${messageRequest.cacheCreationInputTokens}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})::double precision, 0::double precision)`,
-      totalCacheReadTokens: sql<number>`COALESCE(sum(${messageRequest.cacheReadInputTokens}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})::double precision, 0::double precision)`,
-      totalDurationMs: sql<number>`COALESCE(sum(${messageRequest.durationMs}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})::double precision, 0::double precision)`,
-      firstRequestAt: sql<Date>`min(${messageRequest.createdAt}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})`,
-      lastRequestAt: sql<Date>`max(${messageRequest.createdAt}) FILTER (WHERE ${EXCLUDE_WARMUP_CONDITION})`,
+      sessionId: usageLedger.sessionId,
+      requestCount: sql<number>`count(*)::double precision`,
+      totalCostUsd: sql<string>`COALESCE(sum(${usageLedger.costUsd}), 0)`,
+      totalInputTokens: sql<number>`COALESCE(sum(${usageLedger.inputTokens})::double precision, 0::double precision)`,
+      totalOutputTokens: sql<number>`COALESCE(sum(${usageLedger.outputTokens})::double precision, 0::double precision)`,
+      totalCacheCreationTokens: sql<number>`COALESCE(sum(${usageLedger.cacheCreationInputTokens})::double precision, 0::double precision)`,
+      totalCacheReadTokens: sql<number>`COALESCE(sum(${usageLedger.cacheReadInputTokens})::double precision, 0::double precision)`,
+      totalDurationMs: sql<number>`COALESCE(sum(${usageLedger.durationMs})::double precision, 0::double precision)`,
+      firstRequestAt: sql<Date>`min(${usageLedger.createdAt})`,
+      lastRequestAt: sql<Date>`max(${usageLedger.createdAt})`,
     })
-    .from(messageRequest)
-    .where(and(inArray(messageRequest.sessionId, sessionIds), isNull(messageRequest.deletedAt)))
-    .groupBy(messageRequest.sessionId);
+    .from(usageLedger)
+    .where(and(inArray(usageLedger.sessionId, sessionIds), LEDGER_BILLING_CONDITION))
+    .groupBy(usageLedger.sessionId);
 
   // 创建 sessionId → stats 的 Map
   const statsMap = new Map(statsResults.map((s) => [s.sessionId, s]));
@@ -563,18 +558,17 @@ export async function aggregateMultipleSessionStats(sessionIds: string[]): Promi
   // 2. 批量查询供应商列表(按 session 分组)
   const providerResults = await db
     .selectDistinct({
-      sessionId: messageRequest.sessionId,
-      providerId: messageRequest.providerId,
+      sessionId: usageLedger.sessionId,
+      providerId: usageLedger.finalProviderId,
       providerName: providers.name,
     })
-    .from(messageRequest)
-    .leftJoin(providers, eq(messageRequest.providerId, providers.id))
+    .from(usageLedger)
+    .leftJoin(providers, eq(usageLedger.finalProviderId, providers.id))
     .where(
       and(
-        inArray(messageRequest.sessionId, sessionIds),
-        isNull(messageRequest.deletedAt),
-        EXCLUDE_WARMUP_CONDITION,
-        sql`${messageRequest.providerId} IS NOT NULL`
+        inArray(usageLedger.sessionId, sessionIds),
+        LEDGER_BILLING_CONDITION,
+        sql`${usageLedger.finalProviderId} IS NOT NULL`
       )
     );
 
@@ -596,16 +590,15 @@ export async function aggregateMultipleSessionStats(sessionIds: string[]): Promi
   // 3. 批量查询模型列表(按 session 分组)
   const modelResults = await db
     .selectDistinct({
-      sessionId: messageRequest.sessionId,
-      model: messageRequest.model,
+      sessionId: usageLedger.sessionId,
+      model: usageLedger.model,
     })
-    .from(messageRequest)
+    .from(usageLedger)
     .where(
       and(
-        inArray(messageRequest.sessionId, sessionIds),
-        isNull(messageRequest.deletedAt),
-        EXCLUDE_WARMUP_CONDITION,
-        sql`${messageRequest.model} IS NOT NULL`
+        inArray(usageLedger.sessionId, sessionIds),
+        LEDGER_BILLING_CONDITION,
+        sql`${usageLedger.model} IS NOT NULL`
       )
     );
 
@@ -624,16 +617,15 @@ export async function aggregateMultipleSessionStats(sessionIds: string[]): Promi
   // 3.1 批量查询 Cache TTL 列表(按 session 分组)
   const cacheTtlResults = await db
     .selectDistinct({
-      sessionId: messageRequest.sessionId,
-      cacheTtl: messageRequest.cacheTtlApplied,
+      sessionId: usageLedger.sessionId,
+      cacheTtl: usageLedger.cacheTtlApplied,
     })
-    .from(messageRequest)
+    .from(usageLedger)
     .where(
       and(
-        inArray(messageRequest.sessionId, sessionIds),
-        isNull(messageRequest.deletedAt),
-        EXCLUDE_WARMUP_CONDITION,
-        sql`${messageRequest.cacheTtlApplied} IS NOT NULL`
+        inArray(usageLedger.sessionId, sessionIds),
+        LEDGER_BILLING_CONDITION,
+        sql`${usageLedger.cacheTtlApplied} IS NOT NULL`
       )
     );