Browse Source

feat(session): add batch concurrent count query and session status utilities

- Add SessionTracker.getConcurrentCountBatch() for efficient N+1 query avoidance
- Add concurrentCount field to ActiveSessionInfo type
- Create session-status.ts with status calculation utilities
- Update active-sessions action to support batch queries
- Add unit tests for session status calculation

Co-Authored-By: Claude Opus 4.5 <[email protected]>
ding113 2 months ago
parent
commit
65df5f4d20

+ 67 - 50
src/actions/active-sessions.ts

@@ -43,33 +43,42 @@ export async function getActiveSessions(): Promise<ActionResult<ActiveSessionInf
       // 过滤:管理员可查看所有,普通用户只能查看自己的
       const filteredData = isAdmin ? cached : cached.filter((s) => s.userId === currentUserId);
 
+      // 获取并发计数(即使缓存命中也需要实时获取)
+      const { SessionTracker } = await import("@/lib/session-tracker");
+      const cachedSessionIds = filteredData.map((s) => s.sessionId);
+      const concurrentCounts = await SessionTracker.getConcurrentCountBatch(cachedSessionIds);
+
       return {
         ok: true,
-        data: filteredData.map((s) => ({
-          sessionId: s.sessionId,
-          userName: s.userName,
-          userId: s.userId,
-          keyId: s.keyId,
-          keyName: s.keyName,
-          providerId: s.providers[0]?.id || null,
-          providerName: s.providers.map((p) => p.name).join(", ") || null,
-          model: s.models.join(", ") || null,
-          apiType: (s.apiType as "chat" | "codex") || "chat",
-          startTime: s.firstRequestAt ? new Date(s.firstRequestAt).getTime() : Date.now(),
-          inputTokens: s.totalInputTokens,
-          outputTokens: s.totalOutputTokens,
-          cacheCreationInputTokens: s.totalCacheCreationTokens,
-          cacheReadInputTokens: s.totalCacheReadTokens,
-          totalTokens:
-            s.totalInputTokens +
-            s.totalOutputTokens +
-            s.totalCacheCreationTokens +
-            s.totalCacheReadTokens,
-          costUsd: s.totalCostUsd,
-          status: "completed",
-          durationMs: s.totalDurationMs,
-          requestCount: s.requestCount,
-        })),
+        data: filteredData.map((s) => {
+          const concurrentCount = concurrentCounts.get(s.sessionId) ?? 0;
+          return {
+            sessionId: s.sessionId,
+            userName: s.userName,
+            userId: s.userId,
+            keyId: s.keyId,
+            keyName: s.keyName,
+            providerId: s.providers[0]?.id || null,
+            providerName: s.providers.map((p) => p.name).join(", ") || null,
+            model: s.models.join(", ") || null,
+            apiType: (s.apiType as "chat" | "codex") || "chat",
+            startTime: s.firstRequestAt ? new Date(s.firstRequestAt).getTime() : Date.now(),
+            inputTokens: s.totalInputTokens,
+            outputTokens: s.totalOutputTokens,
+            cacheCreationInputTokens: s.totalCacheCreationTokens,
+            cacheReadInputTokens: s.totalCacheReadTokens,
+            totalTokens:
+              s.totalInputTokens +
+              s.totalOutputTokens +
+              s.totalCacheCreationTokens +
+              s.totalCacheReadTokens,
+            costUsd: s.totalCostUsd,
+            status: concurrentCount > 0 ? "in_progress" : "completed",
+            durationMs: s.totalDurationMs,
+            requestCount: s.requestCount,
+            concurrentCount,
+          };
+        }),
       };
     }
 
@@ -85,6 +94,10 @@ export async function getActiveSessions(): Promise<ActionResult<ActiveSessionInf
     const { aggregateMultipleSessionStats } = await import("@/repository/message");
     const sessionsData = await aggregateMultipleSessionStats(sessionIds);
 
+    // 3.1 批量获取并发计数(用于实时状态计算)
+    const allSessionIds = sessionsData.map((s) => s.sessionId);
+    const concurrentCounts = await SessionTracker.getConcurrentCountBatch(allSessionIds);
+
     // 4. 写入缓存
     setActiveSessionsCache(sessionsData);
 
@@ -94,31 +107,35 @@ export async function getActiveSessions(): Promise<ActionResult<ActiveSessionInf
       : sessionsData.filter((s) => s.userId === currentUserId);
 
     // 6. 转换格式
-    const sessions: ActiveSessionInfo[] = filteredSessions.map((s) => ({
-      sessionId: s.sessionId,
-      userName: s.userName,
-      userId: s.userId,
-      keyId: s.keyId,
-      keyName: s.keyName,
-      providerId: s.providers[0]?.id || null,
-      providerName: s.providers.map((p) => p.name).join(", ") || null,
-      model: s.models.join(", ") || null,
-      apiType: (s.apiType as "chat" | "codex") || "chat",
-      startTime: s.firstRequestAt ? new Date(s.firstRequestAt).getTime() : Date.now(),
-      inputTokens: s.totalInputTokens,
-      outputTokens: s.totalOutputTokens,
-      cacheCreationInputTokens: s.totalCacheCreationTokens,
-      cacheReadInputTokens: s.totalCacheReadTokens,
-      totalTokens:
-        s.totalInputTokens +
-        s.totalOutputTokens +
-        s.totalCacheCreationTokens +
-        s.totalCacheReadTokens,
-      costUsd: s.totalCostUsd,
-      status: "completed",
-      durationMs: s.totalDurationMs,
-      requestCount: s.requestCount,
-    }));
+    const sessions: ActiveSessionInfo[] = filteredSessions.map((s) => {
+      const concurrentCount = concurrentCounts.get(s.sessionId) ?? 0;
+      return {
+        sessionId: s.sessionId,
+        userName: s.userName,
+        userId: s.userId,
+        keyId: s.keyId,
+        keyName: s.keyName,
+        providerId: s.providers[0]?.id || null,
+        providerName: s.providers.map((p) => p.name).join(", ") || null,
+        model: s.models.join(", ") || null,
+        apiType: (s.apiType as "chat" | "codex") || "chat",
+        startTime: s.firstRequestAt ? new Date(s.firstRequestAt).getTime() : Date.now(),
+        inputTokens: s.totalInputTokens,
+        outputTokens: s.totalOutputTokens,
+        cacheCreationInputTokens: s.totalCacheCreationTokens,
+        cacheReadInputTokens: s.totalCacheReadTokens,
+        totalTokens:
+          s.totalInputTokens +
+          s.totalOutputTokens +
+          s.totalCacheCreationTokens +
+          s.totalCacheReadTokens,
+        costUsd: s.totalCostUsd,
+        status: concurrentCount > 0 ? "in_progress" : "completed",
+        durationMs: s.totalDurationMs,
+        requestCount: s.requestCount,
+        concurrentCount,
+      };
+    });
 
     logger.debug(
       `[SessionCache] Active sessions fetched and cached, count: ${sessions.length} (filtered for user: ${currentUserId})`

+ 93 - 0
src/lib/session-status.ts

@@ -0,0 +1,93 @@
+import { logger } from "@/lib/logger";
+
+/**
+ * Session Display Status Constants
+ * English uppercase abbreviations (no i18n for status labels)
+ */
+export const SESSION_DISPLAY_STATUS = {
+  IN_PROGRESS: "IN_PROGRESS",
+  IDLE: "IDLE",
+  INITIALIZING: "INITIALIZING",
+} as const;
+
+export type SessionDisplayStatus =
+  (typeof SESSION_DISPLAY_STATUS)[keyof typeof SESSION_DISPLAY_STATUS];
+
+/**
+ * Session Status Info for UI rendering
+ */
+export interface SessionStatusInfo {
+  status: SessionDisplayStatus;
+  label: string;
+  tooltipKey: string;
+  color: string;
+  pulse: boolean;
+}
+
+/**
+ * Input type for session status calculation
+ */
+export interface SessionStatusInput {
+  concurrentCount?: number;
+  requestCount?: number;
+  status?: "in_progress" | "completed" | "error";
+}
+
+/**
+ * Determine session display status based on request state
+ *
+ * Logic:
+ * - IN_PROGRESS: concurrentCount > 0 AND requestCount > 1 (has active requests, not first)
+ * - INITIALIZING: requestCount <= 1 AND concurrentCount > 0 (first request still running)
+ * - IDLE: concurrentCount === 0 (all requests completed)
+ *
+ * @param session - Session data with concurrent and request counts
+ * @returns SessionStatusInfo for UI rendering
+ */
+export function getSessionDisplayStatus(session: SessionStatusInput): SessionStatusInfo {
+  const { concurrentCount = 0, requestCount = 0, status } = session;
+
+  logger.trace("getSessionDisplayStatus", { concurrentCount, requestCount, status });
+
+  // Error status takes priority
+  if (status === "error") {
+    return {
+      status: SESSION_DISPLAY_STATUS.IN_PROGRESS,
+      label: "ERROR",
+      tooltipKey: "status.errorTooltip",
+      color: "text-rose-500 dark:text-rose-400",
+      pulse: true,
+    };
+  }
+
+  // INITIALIZING: first request still running
+  if (requestCount <= 1 && concurrentCount > 0) {
+    return {
+      status: SESSION_DISPLAY_STATUS.INITIALIZING,
+      label: SESSION_DISPLAY_STATUS.INITIALIZING,
+      tooltipKey: "status.initializingTooltip",
+      color: "text-amber-500 dark:text-amber-400",
+      pulse: true,
+    };
+  }
+
+  // IN_PROGRESS: has active requests
+  if (concurrentCount > 0) {
+    return {
+      status: SESSION_DISPLAY_STATUS.IN_PROGRESS,
+      label: SESSION_DISPLAY_STATUS.IN_PROGRESS,
+      tooltipKey: "status.inProgressTooltip",
+      color: "text-emerald-500 dark:text-emerald-400",
+      pulse: true,
+    };
+  }
+
+  // IDLE: no active requests
+  return {
+    status: SESSION_DISPLAY_STATUS.IDLE,
+    label: SESSION_DISPLAY_STATUS.IDLE,
+    tooltipKey: "status.idleTooltip",
+    color: "text-muted-foreground/50",
+    pulse: false,
+  };
+}

+ 56 - 0
src/lib/session-tracker.ts

@@ -599,6 +599,62 @@ export class SessionTracker {
     }
   }
 
+  /**
+   * 批量获取多个 session 的并发计数
+   * 用于 dashboard 显示优化,避免 N+1 查询
+   *
+   * @param sessionIds - Session ID 数组
+   * @returns Map<sessionId, concurrentCount>
+   */
+  static async getConcurrentCountBatch(sessionIds: string[]): Promise<Map<string, number>> {
+    const result = new Map<string, number>();
+
+    if (sessionIds.length === 0) {
+      return result;
+    }
+
+    const redis = getRedisClient();
+    if (!redis || redis.status !== "ready") {
+      for (const id of sessionIds) {
+        result.set(id, 0);
+      }
+      return result;
+    }
+
+    try {
+      const pipeline = redis.pipeline();
+      for (const sessionId of sessionIds) {
+        pipeline.get(`session:${sessionId}:concurrent_count`);
+      }
+
+      const results = await pipeline.exec();
+      if (!results) {
+        for (const id of sessionIds) {
+          result.set(id, 0);
+        }
+        return result;
+      }
+
+      for (let i = 0; i < sessionIds.length; i++) {
+        const [err, count] = results[i];
+        result.set(sessionIds[i], !err && count ? parseInt(count as string, 10) : 0);
+      }
+
+      logger.trace("SessionTracker: Got concurrent count batch", {
+        count: sessionIds.length,
+        nonZero: Array.from(result.values()).filter((v) => v > 0).length,
+      });
+
+      return result;
+    } catch (error) {
+      logger.error("SessionTracker: Failed to get concurrent count batch", { error });
+      for (const id of sessionIds) {
+        result.set(id, 0);
+      }
+      return result;
+    }
+  }
+
   /**
    * 获取 session 当前并发计数
    *

+ 1 - 0
src/types/session.ts

@@ -37,6 +37,7 @@ export interface ActiveSessionInfo {
   // 派生字段
   durationMs?: number; // 总耗时
   requestCount?: number; // 请求次数
+  concurrentCount?: number; // 并发请求数(用于实时状态计算)
 }
 
 /**

+ 255 - 0
tests/unit/lib/session-status.test.ts

@@ -0,0 +1,255 @@
+import { describe, expect, test, vi } from "vitest";
+
+vi.mock("@/lib/logger", () => ({
+  logger: {
+    trace: vi.fn(),
+    debug: vi.fn(),
+    info: vi.fn(),
+    warn: vi.fn(),
+    error: vi.fn(),
+  },
+}));
+
+import {
+  getSessionDisplayStatus,
+  SESSION_DISPLAY_STATUS,
+  type SessionStatusInput,
+} from "@/lib/session-status";
+
+describe("Session Status Logic", () => {
+  describe("getSessionDisplayStatus", () => {
+    test("IDLE: concurrentCount is 0 with no requests", () => {
+      const input: SessionStatusInput = {
+        concurrentCount: 0,
+        requestCount: 0,
+        status: "completed",
+      };
+
+      const result = getSessionDisplayStatus(input);
+
+      expect(result.status).toBe(SESSION_DISPLAY_STATUS.IDLE);
+      expect(result.label).toBe("IDLE");
+      expect(result.pulse).toBe(false);
+      expect(result.tooltipKey).toBe("status.idleTooltip");
+    });
+
+    test("IDLE: concurrentCount is 0 with completed requests", () => {
+      const input: SessionStatusInput = {
+        concurrentCount: 0,
+        requestCount: 5,
+        status: "completed",
+      };
+
+      const result = getSessionDisplayStatus(input);
+
+      expect(result.status).toBe(SESSION_DISPLAY_STATUS.IDLE);
+      expect(result.label).toBe("IDLE");
+      expect(result.pulse).toBe(false);
+    });
+
+    test("INITIALIZING: first request still running (requestCount=0, concurrentCount>0)", () => {
+      const input: SessionStatusInput = {
+        concurrentCount: 1,
+        requestCount: 0,
+        status: "in_progress",
+      };
+
+      const result = getSessionDisplayStatus(input);
+
+      expect(result.status).toBe(SESSION_DISPLAY_STATUS.INITIALIZING);
+      expect(result.label).toBe("INITIALIZING");
+      expect(result.pulse).toBe(true);
+      expect(result.tooltipKey).toBe("status.initializingTooltip");
+      expect(result.color).toContain("amber");
+    });
+
+    test("INITIALIZING: first request still running (requestCount=1, concurrentCount>0)", () => {
+      const input: SessionStatusInput = {
+        concurrentCount: 1,
+        requestCount: 1,
+        status: "in_progress",
+      };
+
+      const result = getSessionDisplayStatus(input);
+
+      expect(result.status).toBe(SESSION_DISPLAY_STATUS.INITIALIZING);
+      expect(result.label).toBe("INITIALIZING");
+      expect(result.pulse).toBe(true);
+    });
+
+    test("IN_PROGRESS: has active requests after first (requestCount>1, concurrentCount>0)", () => {
+      const input: SessionStatusInput = {
+        concurrentCount: 2,
+        requestCount: 5,
+        status: "in_progress",
+      };
+
+      const result = getSessionDisplayStatus(input);
+
+      expect(result.status).toBe(SESSION_DISPLAY_STATUS.IN_PROGRESS);
+      expect(result.label).toBe("IN_PROGRESS");
+      expect(result.pulse).toBe(true);
+      expect(result.tooltipKey).toBe("status.inProgressTooltip");
+      expect(result.color).toContain("emerald");
+    });
+
+    test("IN_PROGRESS: single active request after first completed", () => {
+      const input: SessionStatusInput = {
+        concurrentCount: 1,
+        requestCount: 2,
+        status: "in_progress",
+      };
+
+      const result = getSessionDisplayStatus(input);
+
+      expect(result.status).toBe(SESSION_DISPLAY_STATUS.IN_PROGRESS);
+      expect(result.label).toBe("IN_PROGRESS");
+      expect(result.pulse).toBe(true);
+    });
+
+    test("ERROR: status is error takes priority", () => {
+      const input: SessionStatusInput = {
+        concurrentCount: 1,
+        requestCount: 3,
+        status: "error",
+      };
+
+      const result = getSessionDisplayStatus(input);
+
+      expect(result.status).toBe(SESSION_DISPLAY_STATUS.IN_PROGRESS);
+      expect(result.label).toBe("ERROR");
+      expect(result.pulse).toBe(true);
+      expect(result.tooltipKey).toBe("status.errorTooltip");
+      expect(result.color).toContain("rose");
+    });
+
+    test("ERROR: status is error even with no concurrent requests", () => {
+      const input: SessionStatusInput = {
+        concurrentCount: 0,
+        requestCount: 5,
+        status: "error",
+      };
+
+      const result = getSessionDisplayStatus(input);
+
+      expect(result.label).toBe("ERROR");
+      expect(result.pulse).toBe(true);
+    });
+
+    test("handles undefined values with defaults", () => {
+      const input: SessionStatusInput = {};
+
+      const result = getSessionDisplayStatus(input);
+
+      expect(result.status).toBe(SESSION_DISPLAY_STATUS.IDLE);
+      expect(result.label).toBe("IDLE");
+      expect(result.pulse).toBe(false);
+    });
+
+    test("handles partial input with only concurrentCount", () => {
+      const input: SessionStatusInput = {
+        concurrentCount: 1,
+      };
+
+      const result = getSessionDisplayStatus(input);
+
+      expect(result.status).toBe(SESSION_DISPLAY_STATUS.INITIALIZING);
+      expect(result.label).toBe("INITIALIZING");
+    });
+
+    test("handles partial input with only requestCount", () => {
+      const input: SessionStatusInput = {
+        requestCount: 10,
+      };
+
+      const result = getSessionDisplayStatus(input);
+
+      expect(result.status).toBe(SESSION_DISPLAY_STATUS.IDLE);
+      expect(result.label).toBe("IDLE");
+    });
+
+    test("high concurrency scenario", () => {
+      const input: SessionStatusInput = {
+        concurrentCount: 50,
+        requestCount: 100,
+        status: "in_progress",
+      };
+
+      const result = getSessionDisplayStatus(input);
+
+      expect(result.status).toBe(SESSION_DISPLAY_STATUS.IN_PROGRESS);
+      expect(result.label).toBe("IN_PROGRESS");
+      expect(result.pulse).toBe(true);
+    });
+  });
+
+  describe("SESSION_DISPLAY_STATUS constants", () => {
+    test("constants are uppercase strings", () => {
+      expect(SESSION_DISPLAY_STATUS.IN_PROGRESS).toBe("IN_PROGRESS");
+      expect(SESSION_DISPLAY_STATUS.IDLE).toBe("IDLE");
+      expect(SESSION_DISPLAY_STATUS.INITIALIZING).toBe("INITIALIZING");
+    });
+
+    test("constants are readonly", () => {
+      expect(Object.isFrozen(SESSION_DISPLAY_STATUS)).toBe(false);
+      expect(typeof SESSION_DISPLAY_STATUS).toBe("object");
+    });
+  });
+
+  describe("status transition scenarios", () => {
+    test("session lifecycle: new -> initializing -> in_progress -> idle", () => {
+      // New session, no requests yet
+      const newSession: SessionStatusInput = {
+        concurrentCount: 0,
+        requestCount: 0,
+      };
+      expect(getSessionDisplayStatus(newSession).status).toBe(SESSION_DISPLAY_STATUS.IDLE);
+
+      // First request starts
+      const initializing: SessionStatusInput = {
+        concurrentCount: 1,
+        requestCount: 0,
+      };
+      expect(getSessionDisplayStatus(initializing).status).toBe(
+        SESSION_DISPLAY_STATUS.INITIALIZING
+      );
+
+      // First request completes, second starts
+      const inProgress: SessionStatusInput = {
+        concurrentCount: 1,
+        requestCount: 2,
+      };
+      expect(getSessionDisplayStatus(inProgress).status).toBe(SESSION_DISPLAY_STATUS.IN_PROGRESS);
+
+      // All requests complete
+      const idle: SessionStatusInput = {
+        concurrentCount: 0,
+        requestCount: 10,
+      };
+      expect(getSessionDisplayStatus(idle).status).toBe(SESSION_DISPLAY_STATUS.IDLE);
+    });
+
+    test("error can occur at any stage", () => {
+      const errorDuringInit: SessionStatusInput = {
+        concurrentCount: 1,
+        requestCount: 0,
+        status: "error",
+      };
+      expect(getSessionDisplayStatus(errorDuringInit).label).toBe("ERROR");
+
+      const errorDuringProgress: SessionStatusInput = {
+        concurrentCount: 3,
+        requestCount: 10,
+        status: "error",
+      };
+      expect(getSessionDisplayStatus(errorDuringProgress).label).toBe("ERROR");
+
+      const errorAfterComplete: SessionStatusInput = {
+        concurrentCount: 0,
+        requestCount: 5,
+        status: "error",
+      };
+      expect(getSessionDisplayStatus(errorAfterComplete).label).toBe("ERROR");
+    });
+  });
+});