Browse Source

refactor(repo): migrate leaderboard.ts read paths to usage_ledger

ding113 3 weeks ago
parent
commit
2111684c87
1 changed files with 68 additions and 77 deletions
  1. 68 77
      src/repository/leaderboard.ts

+ 68 - 77
src/repository/leaderboard.ts

@@ -2,10 +2,10 @@
 
 import { and, desc, eq, isNull, sql } from "drizzle-orm";
 import { db } from "@/drizzle/db";
-import { messageRequest, providers, users } from "@/drizzle/schema";
+import { providers, usageLedger, users } from "@/drizzle/schema";
 import { resolveSystemTimezone } from "@/lib/utils/timezone";
 import type { ProviderType } from "@/types/provider";
-import { EXCLUDE_WARMUP_CONDITION } from "./_shared/message-request-conditions";
+import { LEDGER_BILLING_CONDITION } from "./_shared/ledger-conditions";
 import { getSystemSettings } from "./system-config";
 
 /**
@@ -166,7 +166,7 @@ function buildDateCondition(
     const endExclusiveLocal = sql`(${dateRange.endDate}::date + INTERVAL '1 day')`;
     const start = sql`(${startLocal} AT TIME ZONE ${timezone})`;
     const endExclusive = sql`(${endExclusiveLocal} AT TIME ZONE ${timezone})`;
-    return sql`${messageRequest.createdAt} >= ${start} AND ${messageRequest.createdAt} < ${endExclusive}`;
+    return sql`${usageLedger.createdAt} >= ${start} AND ${usageLedger.createdAt} < ${endExclusive}`;
   }
 
   switch (period) {
@@ -177,23 +177,23 @@ function buildDateCondition(
       const endExclusiveLocal = sql`(${startLocal} + INTERVAL '1 day')`;
       const start = sql`(${startLocal} AT TIME ZONE ${timezone})`;
       const endExclusive = sql`(${endExclusiveLocal} AT TIME ZONE ${timezone})`;
-      return sql`${messageRequest.createdAt} >= ${start} AND ${messageRequest.createdAt} < ${endExclusive}`;
+      return sql`${usageLedger.createdAt} >= ${start} AND ${usageLedger.createdAt} < ${endExclusive}`;
     }
     case "last24h":
-      return sql`${messageRequest.createdAt} >= (CURRENT_TIMESTAMP - INTERVAL '24 hours')`;
+      return sql`${usageLedger.createdAt} >= (CURRENT_TIMESTAMP - INTERVAL '24 hours')`;
     case "weekly": {
       const startLocal = sql`DATE_TRUNC('week', ${nowLocal})`;
       const endExclusiveLocal = sql`(${startLocal} + INTERVAL '1 week')`;
       const start = sql`(${startLocal} AT TIME ZONE ${timezone})`;
       const endExclusive = sql`(${endExclusiveLocal} AT TIME ZONE ${timezone})`;
-      return sql`${messageRequest.createdAt} >= ${start} AND ${messageRequest.createdAt} < ${endExclusive}`;
+      return sql`${usageLedger.createdAt} >= ${start} AND ${usageLedger.createdAt} < ${endExclusive}`;
     }
     case "monthly": {
       const startLocal = sql`DATE_TRUNC('month', ${nowLocal})`;
       const endExclusiveLocal = sql`(${startLocal} + INTERVAL '1 month')`;
       const start = sql`(${startLocal} AT TIME ZONE ${timezone})`;
       const endExclusive = sql`(${endExclusiveLocal} AT TIME ZONE ${timezone})`;
-      return sql`${messageRequest.createdAt} >= ${start} AND ${messageRequest.createdAt} < ${endExclusive}`;
+      return sql`${usageLedger.createdAt} >= ${start} AND ${usageLedger.createdAt} < ${endExclusive}`;
     }
     default:
       return sql`1=1`;
@@ -210,8 +210,7 @@ async function findLeaderboardWithTimezone(
   userFilters?: UserLeaderboardFilters
 ): Promise<LeaderboardEntry[]> {
   const whereConditions = [
-    isNull(messageRequest.deletedAt),
-    EXCLUDE_WARMUP_CONDITION,
+    LEDGER_BILLING_CONDITION,
     buildDateCondition(period, timezone, dateRange),
   ];
 
@@ -242,25 +241,25 @@ async function findLeaderboardWithTimezone(
 
   const rankings = await db
     .select({
-      userId: messageRequest.userId,
+      userId: usageLedger.userId,
       userName: users.name,
       totalRequests: sql<number>`count(*)::double precision`,
-      totalCost: sql<string>`COALESCE(sum(${messageRequest.costUsd}), 0)`,
+      totalCost: sql<string>`COALESCE(sum(${usageLedger.costUsd}), 0)`,
       totalTokens: sql<number>`COALESCE(
         sum(
-          ${messageRequest.inputTokens} +
-          ${messageRequest.outputTokens} +
-          COALESCE(${messageRequest.cacheCreationInputTokens}, 0) +
-          COALESCE(${messageRequest.cacheReadInputTokens}, 0)
+          ${usageLedger.inputTokens} +
+          ${usageLedger.outputTokens} +
+          COALESCE(${usageLedger.cacheCreationInputTokens}, 0) +
+          COALESCE(${usageLedger.cacheReadInputTokens}, 0)
         )::double precision,
         0::double precision
       )`,
     })
-    .from(messageRequest)
-    .innerJoin(users, and(sql`${messageRequest.userId} = ${users.id}`, isNull(users.deletedAt)))
+    .from(usageLedger)
+    .innerJoin(users, and(sql`${usageLedger.userId} = ${users.id}`, isNull(users.deletedAt)))
     .where(and(...whereConditions))
-    .groupBy(messageRequest.userId, users.name)
-    .orderBy(desc(sql`sum(${messageRequest.costUsd})`));
+    .groupBy(usageLedger.userId, users.name)
+    .orderBy(desc(sql`sum(${usageLedger.costUsd})`));
 
   return rankings.map((entry) => ({
     userId: entry.userId,
@@ -394,58 +393,57 @@ async function findProviderLeaderboardWithTimezone(
   providerType?: ProviderType
 ): Promise<ProviderLeaderboardEntry[]> {
   const whereConditions = [
-    isNull(messageRequest.deletedAt),
-    EXCLUDE_WARMUP_CONDITION,
+    LEDGER_BILLING_CONDITION,
     buildDateCondition(period, timezone, dateRange),
     providerType ? eq(providers.providerType, providerType) : undefined,
   ];
 
   const rankings = await db
     .select({
-      providerId: messageRequest.providerId,
+      providerId: usageLedger.finalProviderId,
       providerName: providers.name,
       totalRequests: sql<number>`count(*)::double precision`,
-      totalCost: sql<string>`COALESCE(sum(${messageRequest.costUsd}), 0)`,
+      totalCost: sql<string>`COALESCE(sum(${usageLedger.costUsd}), 0)`,
       totalTokens: sql<number>`COALESCE(
         sum(
-          ${messageRequest.inputTokens} +
-          ${messageRequest.outputTokens} +
-          COALESCE(${messageRequest.cacheCreationInputTokens}, 0) +
-          COALESCE(${messageRequest.cacheReadInputTokens}, 0)
+          ${usageLedger.inputTokens} +
+          ${usageLedger.outputTokens} +
+          COALESCE(${usageLedger.cacheCreationInputTokens}, 0) +
+          COALESCE(${usageLedger.cacheReadInputTokens}, 0)
         )::double precision,
         0::double precision
       )`,
       successRate: sql<number>`COALESCE(
-        count(CASE WHEN ${messageRequest.errorMessage} IS NULL OR ${messageRequest.errorMessage} = '' THEN 1 END)::double precision
+        count(CASE WHEN ${usageLedger.isSuccess} THEN 1 END)::double precision
         / NULLIF(count(*)::double precision, 0),
         0::double precision
       )`,
-      avgTtfbMs: sql<number>`COALESCE(avg(${messageRequest.ttfbMs})::double precision, 0::double precision)`,
+      avgTtfbMs: sql<number>`COALESCE(avg(${usageLedger.ttfbMs})::double precision, 0::double precision)`,
       avgTokensPerSecond: sql<number>`COALESCE(
         avg(
           CASE
-            WHEN ${messageRequest.outputTokens} > 0
-              AND ${messageRequest.durationMs} IS NOT NULL
-              AND ${messageRequest.ttfbMs} IS NOT NULL
-              AND ${messageRequest.ttfbMs} < ${messageRequest.durationMs}
-              AND (${messageRequest.durationMs} - ${messageRequest.ttfbMs}) >= 100
-            THEN (${messageRequest.outputTokens}::double precision)
-              / ((${messageRequest.durationMs} - ${messageRequest.ttfbMs}) / 1000.0)
+            WHEN ${usageLedger.outputTokens} > 0
+              AND ${usageLedger.durationMs} IS NOT NULL
+              AND ${usageLedger.ttfbMs} IS NOT NULL
+              AND ${usageLedger.ttfbMs} < ${usageLedger.durationMs}
+              AND (${usageLedger.durationMs} - ${usageLedger.ttfbMs}) >= 100
+            THEN (${usageLedger.outputTokens}::double precision)
+              / ((${usageLedger.durationMs} - ${usageLedger.ttfbMs}) / 1000.0)
           END
         )::double precision,
         0::double precision
       )`,
     })
-    .from(messageRequest)
+    .from(usageLedger)
     .innerJoin(
       providers,
-      and(sql`${messageRequest.providerId} = ${providers.id}`, isNull(providers.deletedAt))
+      and(sql`${usageLedger.finalProviderId} = ${providers.id}`, isNull(providers.deletedAt))
     )
     .where(
       and(...whereConditions.filter((c): c is NonNullable<(typeof whereConditions)[number]> => !!c))
     )
-    .groupBy(messageRequest.providerId, providers.name)
-    .orderBy(desc(sql`sum(${messageRequest.costUsd})`));
+    .groupBy(usageLedger.finalProviderId, providers.name)
+    .orderBy(desc(sql`sum(${usageLedger.costUsd})`));
 
   return rankings.map((entry) => {
     const totalCost = parseFloat(entry.totalCost);
@@ -480,19 +478,19 @@ async function findProviderCacheHitRateLeaderboardWithTimezone(
   providerType?: ProviderType
 ): Promise<ProviderCacheHitRateLeaderboardEntry[]> {
   const totalInputTokensExpr = sql<number>`(
-    COALESCE(${messageRequest.inputTokens}, 0)::double precision +
-    COALESCE(${messageRequest.cacheCreationInputTokens}, 0)::double precision +
-    COALESCE(${messageRequest.cacheReadInputTokens}, 0)::double precision
+    COALESCE(${usageLedger.inputTokens}, 0)::double precision +
+    COALESCE(${usageLedger.cacheCreationInputTokens}, 0)::double precision +
+    COALESCE(${usageLedger.cacheReadInputTokens}, 0)::double precision
   )`;
 
   const cacheRequiredCondition = sql`(
-    COALESCE(${messageRequest.cacheCreationInputTokens}, 0) > 0
-    OR COALESCE(${messageRequest.cacheReadInputTokens}, 0) > 0
+    COALESCE(${usageLedger.cacheCreationInputTokens}, 0) > 0
+    OR COALESCE(${usageLedger.cacheReadInputTokens}, 0) > 0
   )`;
 
   const sumTotalInputTokens = sql<number>`COALESCE(sum(${totalInputTokensExpr})::double precision, 0::double precision)`;
-  const sumCacheReadTokens = sql<number>`COALESCE(sum(COALESCE(${messageRequest.cacheReadInputTokens}, 0))::double precision, 0::double precision)`;
-  const sumCacheCreationCost = sql<string>`COALESCE(sum(CASE WHEN COALESCE(${messageRequest.cacheCreationInputTokens}, 0) > 0 THEN ${messageRequest.costUsd} ELSE 0 END), 0)`;
+  const sumCacheReadTokens = sql<number>`COALESCE(sum(COALESCE(${usageLedger.cacheReadInputTokens}, 0))::double precision, 0::double precision)`;
+  const sumCacheCreationCost = sql<string>`COALESCE(sum(CASE WHEN COALESCE(${usageLedger.cacheCreationInputTokens}, 0) > 0 THEN ${usageLedger.costUsd} ELSE 0 END), 0)`;
 
   const cacheHitRateExpr = sql<number>`COALESCE(
     ${sumCacheReadTokens} / NULLIF(${sumTotalInputTokens}, 0::double precision),
@@ -500,8 +498,7 @@ async function findProviderCacheHitRateLeaderboardWithTimezone(
   )`;
 
   const whereConditions = [
-    isNull(messageRequest.deletedAt),
-    EXCLUDE_WARMUP_CONDITION,
+    LEDGER_BILLING_CONDITION,
     buildDateCondition(period, timezone, dateRange),
     cacheRequiredCondition,
     providerType ? eq(providers.providerType, providerType) : undefined,
@@ -509,24 +506,24 @@ async function findProviderCacheHitRateLeaderboardWithTimezone(
 
   const rankings = await db
     .select({
-      providerId: messageRequest.providerId,
+      providerId: usageLedger.finalProviderId,
       providerName: providers.name,
       totalRequests: sql<number>`count(*)::double precision`,
-      totalCost: sql<string>`COALESCE(sum(${messageRequest.costUsd}), 0)`,
+      totalCost: sql<string>`COALESCE(sum(${usageLedger.costUsd}), 0)`,
       cacheReadTokens: sumCacheReadTokens,
       cacheCreationCost: sumCacheCreationCost,
       totalInputTokens: sumTotalInputTokens,
       cacheHitRate: cacheHitRateExpr,
     })
-    .from(messageRequest)
+    .from(usageLedger)
     .innerJoin(
       providers,
-      and(sql`${messageRequest.providerId} = ${providers.id}`, isNull(providers.deletedAt))
+      and(sql`${usageLedger.finalProviderId} = ${providers.id}`, isNull(providers.deletedAt))
     )
     .where(
       and(...whereConditions.filter((c): c is NonNullable<(typeof whereConditions)[number]> => !!c))
     )
-    .groupBy(messageRequest.providerId, providers.name)
+    .groupBy(usageLedger.finalProviderId, providers.name)
     .orderBy(desc(cacheHitRateExpr), desc(sql`count(*)`));
 
   // Model-level cache hit breakdown per provider
@@ -534,11 +531,11 @@ async function findProviderCacheHitRateLeaderboardWithTimezone(
   const billingModelSource = systemSettings.billingModelSource;
   const modelField =
     billingModelSource === "original"
-      ? sql<string>`COALESCE(${messageRequest.originalModel}, ${messageRequest.model})`
-      : sql<string>`COALESCE(${messageRequest.model}, ${messageRequest.originalModel})`;
+      ? sql<string>`COALESCE(${usageLedger.originalModel}, ${usageLedger.model})`
+      : sql<string>`COALESCE(${usageLedger.model}, ${usageLedger.originalModel})`;
 
   const modelTotalInput = sql<number>`COALESCE(sum(${totalInputTokensExpr})::double precision, 0::double precision)`;
-  const modelCacheRead = sql<number>`COALESCE(sum(COALESCE(${messageRequest.cacheReadInputTokens}, 0))::double precision, 0::double precision)`;
+  const modelCacheRead = sql<number>`COALESCE(sum(COALESCE(${usageLedger.cacheReadInputTokens}, 0))::double precision, 0::double precision)`;
   const modelCacheHitRate = sql<number>`COALESCE(
     ${modelCacheRead} / NULLIF(${modelTotalInput}, 0::double precision),
     0::double precision
@@ -546,22 +543,22 @@ async function findProviderCacheHitRateLeaderboardWithTimezone(
 
   const modelRows = await db
     .select({
-      providerId: messageRequest.providerId,
+      providerId: usageLedger.finalProviderId,
       model: modelField,
       totalRequests: sql<number>`count(*)::double precision`,
       cacheReadTokens: modelCacheRead,
       totalInputTokens: modelTotalInput,
       cacheHitRate: modelCacheHitRate,
     })
-    .from(messageRequest)
+    .from(usageLedger)
     .innerJoin(
       providers,
-      and(sql`${messageRequest.providerId} = ${providers.id}`, isNull(providers.deletedAt))
+      and(sql`${usageLedger.finalProviderId} = ${providers.id}`, isNull(providers.deletedAt))
     )
     .where(
       and(...whereConditions.filter((c): c is NonNullable<(typeof whereConditions)[number]> => !!c))
     )
-    .groupBy(messageRequest.providerId, modelField)
+    .groupBy(usageLedger.finalProviderId, modelField)
     .orderBy(desc(modelCacheHitRate), desc(sql`count(*)`));
 
   // Group model stats by providerId
@@ -672,37 +669,31 @@ async function findModelLeaderboardWithTimezone(
   // redirected: 优先使用 model(重定向后的实际模型),回退到 originalModel
   const modelField =
     billingModelSource === "original"
-      ? sql<string>`COALESCE(${messageRequest.originalModel}, ${messageRequest.model})`
-      : sql<string>`COALESCE(${messageRequest.model}, ${messageRequest.originalModel})`;
+      ? sql<string>`COALESCE(${usageLedger.originalModel}, ${usageLedger.model})`
+      : sql<string>`COALESCE(${usageLedger.model}, ${usageLedger.originalModel})`;
 
   const rankings = await db
     .select({
       model: modelField,
       totalRequests: sql<number>`count(*)::double precision`,
-      totalCost: sql<string>`COALESCE(sum(${messageRequest.costUsd}), 0)`,
+      totalCost: sql<string>`COALESCE(sum(${usageLedger.costUsd}), 0)`,
       totalTokens: sql<number>`COALESCE(
         sum(
-          ${messageRequest.inputTokens} +
-          ${messageRequest.outputTokens} +
-          COALESCE(${messageRequest.cacheCreationInputTokens}, 0) +
-          COALESCE(${messageRequest.cacheReadInputTokens}, 0)
+          ${usageLedger.inputTokens} +
+          ${usageLedger.outputTokens} +
+          COALESCE(${usageLedger.cacheCreationInputTokens}, 0) +
+          COALESCE(${usageLedger.cacheReadInputTokens}, 0)
         )::double precision,
         0::double precision
       )`,
       successRate: sql<number>`COALESCE(
-        count(CASE WHEN ${messageRequest.errorMessage} IS NULL OR ${messageRequest.errorMessage} = '' THEN 1 END)::double precision
+        count(CASE WHEN ${usageLedger.isSuccess} THEN 1 END)::double precision
         / NULLIF(count(*)::double precision, 0),
         0::double precision
       )`,
     })
-    .from(messageRequest)
-    .where(
-      and(
-        isNull(messageRequest.deletedAt),
-        EXCLUDE_WARMUP_CONDITION,
-        buildDateCondition(period, timezone, dateRange)
-      )
-    )
+    .from(usageLedger)
+    .where(and(LEDGER_BILLING_CONDITION, buildDateCondition(period, timezone, dateRange)))
     .groupBy(modelField)
     .orderBy(desc(sql`count(*)`)); // 按请求数排序