Browse Source

refactor(repo): migrate usage-logs summary + my-usage to usage_ledger

ding113 1 week ago
parent
commit
44be131a45
2 changed files with 66 additions and 31 deletions
  1. 13 13
      src/actions/my-usage.ts
  2. 53 18
      src/repository/usage-logs.ts

+ 13 - 13
src/actions/my-usage.ts

@@ -3,7 +3,7 @@
 import { fromZonedTime } from "date-fns-tz";
 import { and, eq, gte, isNull, lt, sql } from "drizzle-orm";
 import { db } from "@/drizzle/db";
-import { messageRequest } from "@/drizzle/schema";
+import { messageRequest, usageLedger } from "@/drizzle/schema";
 import { getSession } from "@/lib/auth";
 import { logger } from "@/lib/logger";
 import { resolveKeyConcurrentSessionLimit } from "@/lib/rate-limit/concurrent-session-limit";
@@ -11,6 +11,7 @@ import type { DailyResetMode } from "@/lib/rate-limit/time-utils";
 import { SessionTracker } from "@/lib/session-tracker";
 import type { CurrencyCode } from "@/lib/utils";
 import { resolveSystemTimezone } from "@/lib/utils/timezone";
+import { LEDGER_BILLING_CONDITION } from "@/repository/_shared/ledger-conditions";
 import { EXCLUDE_WARMUP_CONDITION } from "@/repository/_shared/message-request-conditions";
 import { getSystemSettings } from "@/repository/system-config";
 import {
@@ -359,24 +360,23 @@ export async function getMyTodayStats(): Promise<ActionResult<MyTodayStats>> {
 
     const breakdown = await db
       .select({
-        model: messageRequest.model,
-        originalModel: messageRequest.originalModel,
+        model: usageLedger.model,
+        originalModel: usageLedger.originalModel,
         calls: sql<number>`count(*)::int`,
-        costUsd: sql<string>`COALESCE(sum(${messageRequest.costUsd}), 0)`,
-        inputTokens: sql<number>`COALESCE(sum(${messageRequest.inputTokens}), 0)::double precision`,
-        outputTokens: sql<number>`COALESCE(sum(${messageRequest.outputTokens}), 0)::double precision`,
+        costUsd: sql<string>`COALESCE(sum(${usageLedger.costUsd}), 0)`,
+        inputTokens: sql<number>`COALESCE(sum(${usageLedger.inputTokens}), 0)::double precision`,
+        outputTokens: sql<number>`COALESCE(sum(${usageLedger.outputTokens}), 0)::double precision`,
       })
-      .from(messageRequest)
+      .from(usageLedger)
       .where(
         and(
-          eq(messageRequest.key, session.key.key),
-          isNull(messageRequest.deletedAt),
-          EXCLUDE_WARMUP_CONDITION,
-          gte(messageRequest.createdAt, timeRange.startTime),
-          lt(messageRequest.createdAt, timeRange.endTime)
+          eq(usageLedger.key, session.key.key),
+          LEDGER_BILLING_CONDITION,
+          gte(usageLedger.createdAt, timeRange.startTime),
+          lt(usageLedger.createdAt, timeRange.endTime)
         )
       )
-      .groupBy(messageRequest.model, messageRequest.originalModel);
+      .groupBy(usageLedger.model, usageLedger.originalModel);
 
     let totalCalls = 0;
     let totalInputTokens = 0;

+ 53 - 18
src/repository/usage-logs.ts

@@ -1,12 +1,13 @@
 import "server-only";
 
-import { and, desc, eq, isNull, sql } from "drizzle-orm";
+import { and, desc, eq, gte, isNull, lt, sql } from "drizzle-orm";
 import { db } from "@/drizzle/db";
-import { keys as keysTable, messageRequest, providers, users } from "@/drizzle/schema";
+import { keys as keysTable, messageRequest, providers, usageLedger, users } from "@/drizzle/schema";
 import { TTLMap } from "@/lib/cache/ttl-map";
 import { buildUnifiedSpecialSettings } from "@/lib/utils/special-settings";
 import type { ProviderChainItem } from "@/types/message";
 import type { SpecialSetting } from "@/types/special-settings";
+import { LEDGER_BILLING_CONDITION } from "./_shared/ledger-conditions";
 import { escapeLike } from "./_shared/like";
 import { EXCLUDE_WARMUP_CONDITION } from "./_shared/message-request-conditions";
 import { buildUsageLogConditions } from "./_shared/usage-log-filters";
@@ -716,10 +717,10 @@ export async function findUsageLogsStats(
 ): Promise<UsageLogSummary> {
   const { userId, keyId, providerId } = filters;
 
-  const conditions = [isNull(messageRequest.deletedAt)];
+  const conditions = [LEDGER_BILLING_CONDITION];
 
   if (userId !== undefined) {
-    conditions.push(eq(messageRequest.userId, userId));
+    conditions.push(eq(usageLedger.userId, userId));
   }
 
   if (keyId !== undefined) {
@@ -727,32 +728,66 @@ export async function findUsageLogsStats(
   }
 
   if (providerId !== undefined) {
-    conditions.push(eq(messageRequest.providerId, providerId));
+    conditions.push(eq(usageLedger.providerId, providerId));
   }
 
-  conditions.push(...buildUsageLogConditions(filters));
+  const trimmedSessionId = filters.sessionId?.trim();
+  if (trimmedSessionId) {
+    conditions.push(eq(usageLedger.sessionId, trimmedSessionId));
+  }
+
+  if (filters.startTime !== undefined) {
+    conditions.push(gte(usageLedger.createdAt, new Date(filters.startTime)));
+  }
+
+  if (filters.endTime !== undefined) {
+    conditions.push(lt(usageLedger.createdAt, new Date(filters.endTime)));
+  }
 
-  const statsConditions = [...conditions, EXCLUDE_WARMUP_CONDITION];
+  if (filters.statusCode !== undefined) {
+    conditions.push(eq(usageLedger.statusCode, filters.statusCode));
+  } else if (filters.excludeStatusCode200) {
+    conditions.push(sql`(${usageLedger.statusCode} IS NULL OR ${usageLedger.statusCode} <> 200)`);
+  }
+
+  if (filters.model) {
+    conditions.push(eq(usageLedger.model, filters.model));
+  }
+
+  if (filters.endpoint) {
+    conditions.push(eq(usageLedger.endpoint, filters.endpoint));
+  }
+
+  if (filters.minRetryCount !== undefined) {
+    conditions.push(
+      sql`GREATEST(COALESCE(jsonb_array_length(${messageRequest.providerChain}) - 1, 0), 0) >= ${filters.minRetryCount}`
+    );
+  }
 
   const baseQuery = db
     .select({
       totalRequests: sql<number>`count(*)::double precision`,
-      totalCost: sql<string>`COALESCE(sum(${messageRequest.costUsd}), 0)`,
-      totalInputTokens: sql<number>`COALESCE(sum(${messageRequest.inputTokens})::double precision, 0::double precision)`,
-      totalOutputTokens: sql<number>`COALESCE(sum(${messageRequest.outputTokens})::double precision, 0::double precision)`,
-      totalCacheCreationTokens: sql<number>`COALESCE(sum(${messageRequest.cacheCreationInputTokens})::double precision, 0::double precision)`,
-      totalCacheReadTokens: sql<number>`COALESCE(sum(${messageRequest.cacheReadInputTokens})::double precision, 0::double precision)`,
-      totalCacheCreation5mTokens: sql<number>`COALESCE(sum(${messageRequest.cacheCreation5mInputTokens})::double precision, 0::double precision)`,
-      totalCacheCreation1hTokens: sql<number>`COALESCE(sum(${messageRequest.cacheCreation1hInputTokens})::double precision, 0::double precision)`,
+      totalCost: sql<string>`COALESCE(sum(${usageLedger.costUsd}), 0)`,
+      totalInputTokens: sql<number>`COALESCE(sum(${usageLedger.inputTokens})::double precision, 0::double precision)`,
+      totalOutputTokens: sql<number>`COALESCE(sum(${usageLedger.outputTokens})::double precision, 0::double precision)`,
+      totalCacheCreationTokens: sql<number>`COALESCE(sum(${usageLedger.cacheCreationInputTokens})::double precision, 0::double precision)`,
+      totalCacheReadTokens: sql<number>`COALESCE(sum(${usageLedger.cacheReadInputTokens})::double precision, 0::double precision)`,
+      totalCacheCreation5mTokens: sql<number>`COALESCE(sum(${usageLedger.cacheCreation5mInputTokens})::double precision, 0::double precision)`,
+      totalCacheCreation1hTokens: sql<number>`COALESCE(sum(${usageLedger.cacheCreation1hInputTokens})::double precision, 0::double precision)`,
     })
-    .from(messageRequest);
+    .from(usageLedger);
 
-  const query =
+  const queryByKey =
     keyId !== undefined
-      ? baseQuery.innerJoin(keysTable, eq(messageRequest.key, keysTable.key))
+      ? baseQuery.innerJoin(keysTable, eq(usageLedger.key, keysTable.key))
       : baseQuery;
 
-  const [summaryResult] = await query.where(and(...statsConditions));
+  const query =
+    filters.minRetryCount !== undefined
+      ? queryByKey.innerJoin(messageRequest, eq(usageLedger.requestId, messageRequest.id))
+      : queryByKey;
+
+  const [summaryResult] = await query.where(and(...conditions));
 
   const totalRequests = summaryResult?.totalRequests ?? 0;
   const totalCost = parseFloat(summaryResult?.totalCost ?? "0");