|
|
@@ -8,6 +8,7 @@ import {
|
|
|
} from "@/lib/constants/usage-logs.constants";
|
|
|
import { logger } from "@/lib/logger";
|
|
|
import { readLiveChainBatch } from "@/lib/redis/live-chain-store";
|
|
|
+import { RedisKVStore } from "@/lib/redis/redis-kv-store";
|
|
|
import { getRetryCount } from "@/lib/utils/provider-chain-formatter";
|
|
|
import { isProviderFinalized } from "@/lib/utils/provider-display";
|
|
|
import {
|
|
|
@@ -39,6 +40,245 @@ let filterOptionsCache: {
|
|
|
expiresAt: number;
|
|
|
} | null = null;
|
|
|
|
|
|
// Number of rows fetched from the database per export batch.
const USAGE_LOGS_EXPORT_BATCH_SIZE = 500;
// How long an export job (status record and generated CSV) is retained.
const USAGE_LOGS_EXPORT_JOB_TTL_MS = 15 * 60 * 1000;
// Same TTL in seconds, the unit the Redis stores below expect.
const USAGE_LOGS_EXPORT_JOB_TTL_SECONDS = Math.floor(USAGE_LOGS_EXPORT_JOB_TTL_MS / 1000);
|
|
|
// Column headers for the exported CSV. Order must stay in sync with the cell
// order produced by buildCsvRows.
const CSV_HEADERS = [
  "Time",
  "User",
  "Key",
  "Provider",
  "Model",
  "Original Model",
  "Endpoint",
  "Status Code",
  "Input Tokens",
  "Output Tokens",
  "Cache Write 5m",
  "Cache Write 1h",
  "Cache Read",
  "Total Tokens",
  "Cost (USD)",
  "Duration (ms)",
  "Session ID",
  "Retry Count",
] as const;
|
|
|
+
|
|
|
// Non-null session shape produced by getSession(); used for permission checks.
type UsageLogsSession = NonNullable<Awaited<ReturnType<typeof getSession>>>;
|
|
|
+
|
|
|
/**
 * Client-facing snapshot of an export job's progress.
 */
export interface UsageLogsExportStatus {
  /** Unique identifier returned by startUsageLogsExport. */
  jobId: string;
  /** Lifecycle state of the job. */
  status: "queued" | "running" | "completed" | "failed";
  /** Rows written to the CSV so far. */
  processedRows: number;
  /** Estimated total rows; may be revised upward while the job runs. */
  totalRows: number;
  /** 0-100; capped at 99 until the final batch is processed. */
  progressPercent: number;
  /** Populated when the job fails. */
  error?: string;
}
|
|
|
+
|
|
|
/**
 * Redis-persisted job record: the public status plus the id of the user who
 * started the export, used to block cross-user access to jobs.
 */
interface UsageLogsExportJobRecord extends UsageLogsExportStatus {
  ownerUserId: number;
}
|
|
|
+
|
|
|
// Job status records, keyed by jobId; entries expire with the job TTL.
const usageLogsExportStatusStore = new RedisKVStore<UsageLogsExportJobRecord>({
  prefix: "cch:usage-logs:export:status:",
  defaultTtlSeconds: USAGE_LOGS_EXPORT_JOB_TTL_SECONDS,
});

// Generated CSV payloads, keyed by usageLogsExportCsvKey(jobId); same TTL.
const usageLogsExportCsvStore = new RedisKVStore<string>({
  prefix: "cch:usage-logs:export:csv:",
  defaultTtlSeconds: USAGE_LOGS_EXPORT_JOB_TTL_SECONDS,
});
|
|
|
+
|
|
|
+function usageLogsExportCsvKey(jobId: string): string {
|
|
|
+ return `${jobId}:csv`;
|
|
|
+}
|
|
|
+
|
|
|
+function resolveUsageLogFiltersForSession(
|
|
|
+ session: UsageLogsSession,
|
|
|
+ filters: Omit<UsageLogFilters, "userId" | "page" | "pageSize">
|
|
|
+): Omit<UsageLogFilters, "page" | "pageSize"> {
|
|
|
+ return session.user.role === "admin" ? filters : { ...filters, userId: session.user.id };
|
|
|
+}
|
|
|
+
|
|
|
+function toUsageLogsExportStatus(job: UsageLogsExportJobRecord): UsageLogsExportStatus {
|
|
|
+ return {
|
|
|
+ jobId: job.jobId,
|
|
|
+ status: job.status,
|
|
|
+ processedRows: job.processedRows,
|
|
|
+ totalRows: job.totalRows,
|
|
|
+ progressPercent: job.progressPercent,
|
|
|
+ error: job.error,
|
|
|
+ };
|
|
|
+}
|
|
|
+
|
|
|
+function getUsageLogsExportJob(
|
|
|
+ session: UsageLogsSession,
|
|
|
+ job: UsageLogsExportJobRecord | null,
|
|
|
+ _jobId: string
|
|
|
+): UsageLogsExportJobRecord | null {
|
|
|
+ if (!job || job.ownerUserId !== session.user.id) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return job;
|
|
|
+}
|
|
|
+
|
|
|
+function buildCsvRows(logs: UsageLogRow[]): string[] {
|
|
|
+ return logs.map((log) => {
|
|
|
+ const retryCount = log.providerChain ? getRetryCount(log.providerChain) : 0;
|
|
|
+ return [
|
|
|
+ log.createdAt ? new Date(log.createdAt).toISOString() : "",
|
|
|
+ escapeCsvField(log.userName),
|
|
|
+ escapeCsvField(log.keyName),
|
|
|
+ escapeCsvField(log.providerName ?? ""),
|
|
|
+ escapeCsvField(log.model ?? ""),
|
|
|
+ escapeCsvField(log.originalModel ?? ""),
|
|
|
+ escapeCsvField(log.endpoint ?? ""),
|
|
|
+ log.statusCode?.toString() ?? "",
|
|
|
+ log.inputTokens?.toString() ?? "0",
|
|
|
+ log.outputTokens?.toString() ?? "0",
|
|
|
+ log.cacheCreation5mInputTokens?.toString() ?? "0",
|
|
|
+ log.cacheCreation1hInputTokens?.toString() ?? "0",
|
|
|
+ log.cacheReadInputTokens?.toString() ?? "0",
|
|
|
+ log.totalTokens.toString(),
|
|
|
+ log.costUsd ?? "0",
|
|
|
+ log.durationMs?.toString() ?? "",
|
|
|
+ escapeCsvField(log.sessionId ?? ""),
|
|
|
+ retryCount.toString(),
|
|
|
+ ].join(",");
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+function buildUsageLogsExportProgress(
|
|
|
+ processedRows: number,
|
|
|
+ totalRows: number,
|
|
|
+ hasMore: boolean
|
|
|
+): Pick<UsageLogsExportStatus, "processedRows" | "totalRows" | "progressPercent"> {
|
|
|
+ const effectiveTotalRows = Math.max(totalRows, hasMore ? processedRows + 1 : processedRows);
|
|
|
+ const progressPercent =
|
|
|
+ effectiveTotalRows <= 0
|
|
|
+ ? 100
|
|
|
+ : hasMore
|
|
|
+ ? Math.min(99, Math.floor((processedRows / effectiveTotalRows) * 100))
|
|
|
+ : 100;
|
|
|
+
|
|
|
+ return {
|
|
|
+ processedRows,
|
|
|
+ totalRows: effectiveTotalRows,
|
|
|
+ progressPercent,
|
|
|
+ };
|
|
|
+}
|
|
|
+
|
|
|
+async function buildUsageLogsExportCsv(
|
|
|
+ filters: Omit<UsageLogFilters, "page" | "pageSize">,
|
|
|
+ onProgress?: (
|
|
|
+ progress: Pick<UsageLogsExportStatus, "processedRows" | "totalRows" | "progressPercent">
|
|
|
+ ) => Promise<void> | void
|
|
|
+): Promise<string> {
|
|
|
+ const initialResult = await findUsageLogsWithDetails({ ...filters, page: 1, pageSize: 1 });
|
|
|
+ let estimatedTotalRows = initialResult.total;
|
|
|
+
|
|
|
+ if (estimatedTotalRows === 0) {
|
|
|
+ const stats = await findUsageLogsStats(filters);
|
|
|
+ estimatedTotalRows = stats.totalRequests;
|
|
|
+ }
|
|
|
+
|
|
|
+ const csvLines = [CSV_HEADERS.join(",")];
|
|
|
+ let cursor: UsageLogBatchFilters["cursor"] | undefined;
|
|
|
+ let processedRows = 0;
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ const batch = await findUsageLogsBatch({
|
|
|
+ ...filters,
|
|
|
+ cursor,
|
|
|
+ limit: USAGE_LOGS_EXPORT_BATCH_SIZE,
|
|
|
+ });
|
|
|
+
|
|
|
+ if (batch.logs.length > 0) {
|
|
|
+ csvLines.push(...buildCsvRows(batch.logs));
|
|
|
+ processedRows += batch.logs.length;
|
|
|
+ }
|
|
|
+
|
|
|
+ const progress = buildUsageLogsExportProgress(processedRows, estimatedTotalRows, batch.hasMore);
|
|
|
+ estimatedTotalRows = progress.totalRows;
|
|
|
+ await onProgress?.(progress);
|
|
|
+
|
|
|
+ if (!batch.hasMore || !batch.nextCursor) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ cursor = batch.nextCursor;
|
|
|
+ }
|
|
|
+
|
|
|
+ return `\uFEFF${csvLines.join("\n")}`;
|
|
|
+}
|
|
|
+
|
|
|
/**
 * Executes a previously registered export job: marks it running, builds the
 * CSV (persisting progress to Redis after each batch), stores the finished
 * CSV payload, and records the terminal status (completed / failed).
 *
 * Every status write re-reads the job record first; if the record has expired
 * or been removed, the job silently stops rather than resurrecting the key.
 */
async function runUsageLogsExportJob(
  jobId: string,
  filters: Omit<UsageLogFilters, "page" | "pageSize">
): Promise<void> {
  const existingJob = await usageLogsExportStatusStore.get(jobId);
  if (!existingJob) {
    // Job record expired (TTL) or was never created; nothing to do.
    return;
  }

  await usageLogsExportStatusStore.set(jobId, {
    ...existingJob,
    status: "running",
    error: undefined,
  });

  try {
    const csv = await buildUsageLogsExportCsv(filters, async (progress) => {
      // Persist incremental progress so status pollers see live percentages.
      const currentJob = await usageLogsExportStatusStore.get(jobId);
      if (!currentJob) {
        return;
      }

      await usageLogsExportStatusStore.set(jobId, {
        ...currentJob,
        status: "running",
        ...progress,
      });
    });

    const currentJob = await usageLogsExportStatusStore.get(jobId);
    if (!currentJob) {
      // Record expired while the CSV was being built; drop the result.
      return;
    }

    const csvStored = await usageLogsExportCsvStore.set(usageLogsExportCsvKey(jobId), csv);
    if (!csvStored) {
      // CSV payload could not be persisted; surface as a failed job.
      await usageLogsExportStatusStore.set(jobId, {
        ...currentJob,
        status: "failed",
        progressPercent: 0,
        error: "Failed to persist CSV to Redis",
      });
      return;
    }

    await usageLogsExportStatusStore.set(jobId, {
      ...currentJob,
      status: "completed",
      progressPercent: 100,
      error: undefined,
    });
  } catch (error) {
    logger.error("Failed to run usage logs export job:", error);
    const currentJob = await usageLogsExportStatusStore.get(jobId);
    if (!currentJob) {
      return;
    }

    // Mark the job failed, resetting progress; the error message is surfaced
    // to the client via getUsageLogsExportStatus / downloadUsageLogsExport.
    await usageLogsExportStatusStore.set(jobId, {
      ...currentJob,
      status: "failed",
      progressPercent: 0,
      error: error instanceof Error ? error.message : "Export failed",
    });
  }
}
|
|
|
+
|
|
|
/**
|
|
|
* 获取使用日志(根据权限过滤)
|
|
|
*/
|
|
|
@@ -77,16 +317,8 @@ export async function exportUsageLogs(
|
|
|
return { ok: false, error: "未登录" };
|
|
|
}
|
|
|
|
|
|
- // 如果不是 admin,强制过滤为当前用户
|
|
|
- const finalFilters: UsageLogFilters =
|
|
|
- session.user.role === "admin"
|
|
|
- ? { ...filters, page: 1, pageSize: 10000 }
|
|
|
- : { ...filters, userId: session.user.id, page: 1, pageSize: 10000 };
|
|
|
-
|
|
|
- const result = await findUsageLogsWithDetails(finalFilters);
|
|
|
-
|
|
|
- // 生成 CSV
|
|
|
- const csv = generateCsv(result.logs);
|
|
|
+ const finalFilters = resolveUsageLogFiltersForSession(session, filters);
|
|
|
+ const csv = await buildUsageLogsExportCsv(finalFilters);
|
|
|
|
|
|
return { ok: true, data: csv };
|
|
|
} catch (error) {
|
|
|
@@ -96,74 +328,117 @@ export async function exportUsageLogs(
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/**
|
|
|
- * 生成 CSV 字符串
|
|
|
- */
|
|
|
-function generateCsv(logs: UsageLogRow[]): string {
|
|
|
- const headers = [
|
|
|
- "Time",
|
|
|
- "User",
|
|
|
- "Key",
|
|
|
- "Provider",
|
|
|
- "Model",
|
|
|
- "Original Model",
|
|
|
- "Endpoint",
|
|
|
- "Status Code",
|
|
|
- "Input Tokens",
|
|
|
- "Output Tokens",
|
|
|
- "Cache Write 5m",
|
|
|
- "Cache Write 1h",
|
|
|
- "Cache Read",
|
|
|
- "Total Tokens",
|
|
|
- "Cost (USD)",
|
|
|
- "Duration (ms)",
|
|
|
- "Session ID",
|
|
|
- "Retry Count",
|
|
|
- ];
|
|
|
-
|
|
|
- const rows = logs.map((log) => {
|
|
|
- const retryCount = log.providerChain ? getRetryCount(log.providerChain) : 0;
|
|
|
- return [
|
|
|
- log.createdAt ? new Date(log.createdAt).toISOString() : "",
|
|
|
- escapeCsvField(log.userName),
|
|
|
- escapeCsvField(log.keyName),
|
|
|
- escapeCsvField(log.providerName ?? ""),
|
|
|
- escapeCsvField(log.model ?? ""),
|
|
|
- escapeCsvField(log.originalModel ?? ""),
|
|
|
- escapeCsvField(log.endpoint ?? ""),
|
|
|
- log.statusCode?.toString() ?? "",
|
|
|
- log.inputTokens?.toString() ?? "0",
|
|
|
- log.outputTokens?.toString() ?? "0",
|
|
|
- log.cacheCreation5mInputTokens?.toString() ?? "0",
|
|
|
- log.cacheCreation1hInputTokens?.toString() ?? "0",
|
|
|
- log.cacheReadInputTokens?.toString() ?? "0",
|
|
|
- log.totalTokens.toString(),
|
|
|
- log.costUsd ?? "0",
|
|
|
- log.durationMs?.toString() ?? "",
|
|
|
- escapeCsvField(log.sessionId ?? ""),
|
|
|
- retryCount.toString(),
|
|
|
- ];
|
|
|
- });
|
|
|
+export async function startUsageLogsExport(
|
|
|
+ filters: Omit<UsageLogFilters, "userId" | "page" | "pageSize">
|
|
|
+): Promise<ActionResult<{ jobId: string }>> {
|
|
|
+ try {
|
|
|
+ const session = await getSession();
|
|
|
+ if (!session) {
|
|
|
+ return { ok: false, error: "未登录" };
|
|
|
+ }
|
|
|
+
|
|
|
+ const jobId = crypto.randomUUID();
|
|
|
+ const finalFilters = resolveUsageLogFiltersForSession(session, filters);
|
|
|
+
|
|
|
+ const stored = await usageLogsExportStatusStore.set(jobId, {
|
|
|
+ jobId,
|
|
|
+ ownerUserId: session.user.id,
|
|
|
+ status: "queued",
|
|
|
+ processedRows: 0,
|
|
|
+ totalRows: 0,
|
|
|
+ progressPercent: 0,
|
|
|
+ });
|
|
|
+
|
|
|
+ if (!stored) {
|
|
|
+ return { ok: false, error: "Export job initialization failed" };
|
|
|
+ }
|
|
|
|
|
|
- // 添加 BOM 以支持 Excel 正确识别 UTF-8
|
|
|
- const bom = "\uFEFF";
|
|
|
- const csvContent = [headers.join(","), ...rows.map((row) => row.join(","))].join("\n");
|
|
|
+ // Defer to next tick so the action returns the jobId immediately.
|
|
|
+ // Safe for self-hosted Bun server (long-lived process); NOT suitable for serverless.
|
|
|
+ setTimeout(() => {
|
|
|
+ void runUsageLogsExportJob(jobId, finalFilters);
|
|
|
+ }, 0);
|
|
|
|
|
|
- return bom + csvContent;
|
|
|
+ return { ok: true, data: { jobId } };
|
|
|
+ } catch (error) {
|
|
|
+ logger.error("Failed to start usage logs export:", error);
|
|
|
+ const message = error instanceof Error ? error.message : "Failed to start export";
|
|
|
+ return { ok: false, error: message };
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+export async function getUsageLogsExportStatus(
|
|
|
+ jobId: string
|
|
|
+): Promise<ActionResult<UsageLogsExportStatus>> {
|
|
|
+ try {
|
|
|
+ const session = await getSession();
|
|
|
+ if (!session) {
|
|
|
+ return { ok: false, error: "未登录" };
|
|
|
+ }
|
|
|
+
|
|
|
+ const job = getUsageLogsExportJob(session, await usageLogsExportStatusStore.get(jobId), jobId);
|
|
|
+ if (!job) {
|
|
|
+ return { ok: false, error: "Export job not found or expired" };
|
|
|
+ }
|
|
|
+
|
|
|
+ return { ok: true, data: toUsageLogsExportStatus(job) };
|
|
|
+ } catch (error) {
|
|
|
+ logger.error("Failed to get usage logs export status:", error);
|
|
|
+ const message = error instanceof Error ? error.message : "Failed to get export status";
|
|
|
+ return { ok: false, error: message };
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+export async function downloadUsageLogsExport(jobId: string): Promise<ActionResult<string>> {
|
|
|
+ try {
|
|
|
+ const session = await getSession();
|
|
|
+ if (!session) {
|
|
|
+ return { ok: false, error: "未登录" };
|
|
|
+ }
|
|
|
+
|
|
|
+ const job = getUsageLogsExportJob(session, await usageLogsExportStatusStore.get(jobId), jobId);
|
|
|
+ if (!job) {
|
|
|
+ return { ok: false, error: "Export job not found or expired" };
|
|
|
+ }
|
|
|
+
|
|
|
+ if (job.status === "failed") {
|
|
|
+ return { ok: false, error: job.error || "Export failed" };
|
|
|
+ }
|
|
|
+
|
|
|
+ if (job.status !== "completed") {
|
|
|
+ return { ok: false, error: "Export not yet completed" };
|
|
|
+ }
|
|
|
+
|
|
|
+ const csv = await usageLogsExportCsvStore.get(usageLogsExportCsvKey(jobId));
|
|
|
+ if (!csv) {
|
|
|
+ return { ok: false, error: "Export file not found or expired" };
|
|
|
+ }
|
|
|
+
|
|
|
+ return { ok: true, data: csv };
|
|
|
+ } catch (error) {
|
|
|
+ logger.error("Failed to download usage logs export:", error);
|
|
|
+ const message = error instanceof Error ? error.message : "Failed to download export";
|
|
|
+ return { ok: false, error: message };
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 转义 CSV 字段(防止 CSV 公式注入攻击)
|
|
|
*/
|
|
|
function escapeCsvField(field: string): string {
|
|
|
- // Prevent CSV formula injection by prefixing dangerous characters
|
|
|
- const dangerousChars = ["=", "+", "-", "@", "\t", "\r"];
|
|
|
+ const dangerousChars = ["=", "+", "-", "@"];
|
|
|
+ const trimmedField = field.trimStart();
|
|
|
let safeField = field;
|
|
|
- if (dangerousChars.some((char) => field.startsWith(char))) {
|
|
|
- safeField = `'${field}`; // Prefix with single quote to prevent formula execution
|
|
|
+ if (trimmedField && dangerousChars.some((char) => trimmedField.startsWith(char))) {
|
|
|
+ safeField = `'${field}`;
|
|
|
}
|
|
|
|
|
|
- if (safeField.includes(",") || safeField.includes('"') || safeField.includes("\n")) {
|
|
|
+ if (
|
|
|
+ safeField.includes(",") ||
|
|
|
+ safeField.includes('"') ||
|
|
|
+ safeField.includes("\n") ||
|
|
|
+ safeField.includes("\r")
|
|
|
+ ) {
|
|
|
return `"${safeField.replace(/"/g, '""')}"`;
|
|
|
}
|
|
|
return safeField;
|