Просмотр исходного кода

feat: 新增活跃 Session 实时监控页面

核心功能:
- 扩展 SessionManager,新增 5 个方法用于存储和查询 session 详细信息
- 在代理流程中集成 session 信息写入(session-guard, provider-selector, response-handler)
- 新增 /dashboard/sessions 实时监控页面,每 3 秒自动刷新
- 支持查看 session 的用户、密钥、供应商、模型、持续时长、Token 使用量、成本等详细信息
- 支持展开查看请求 messages(受 STORE_SESSION_MESSAGES 环境变量控制)
- 并发数卡片支持点击跳转到详情页面

技术实现:
- 使用 Redis Hash 存储 session:info 和 session:usage
- 复用现有 global:active_sessions Set 和 TTL 机制(5 分钟)
- 所有 Redis 写入异步执行,失败不影响主流程(Fail Open)
- 新增环境变量 STORE_SESSION_MESSAGES(默认 false,避免存储敏感信息)

数据结构:
- session:{id}:info - 用户、密钥、供应商、模型等元数据
- session:{id}:usage - Token 使用量、成本、状态等
- session:{id}:messages - 请求 messages(可选)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
ding113 3 месяцев назад
Родитель
Сommit
a05cd7b4cf

+ 2 - 0
.env.example

@@ -23,3 +23,5 @@ REDIS_URL=redis://localhost:6379        # Redis 连接地址(Docker 部署使
 
 # Session 配置
 SESSION_TTL=300                         # Session 过期时间(秒,默认 300 = 5 分钟)
+STORE_SESSION_MESSAGES=false            # 是否存储请求 messages 到 Redis(用于实时监控页面查看详情,默认:false)
+                                        # 警告:启用后会增加 Redis 内存使用,且可能包含敏感信息

+ 51 - 0
src/actions/active-sessions.ts

@@ -0,0 +1,51 @@
+"use server";
+
+import { SessionManager } from "@/lib/session-manager";
+import type { ActionResult } from "./types";
+import type { ActiveSessionInfo } from "@/types/session";
+
+/**
+ * 获取所有活跃 session 的详细信息
+ * 用于实时监控页面
+ */
+export async function getActiveSessions(): Promise<ActionResult<ActiveSessionInfo[]>> {
+  try {
+    const sessions = await SessionManager.getActiveSessions();
+    return {
+      ok: true,
+      data: sessions,
+    };
+  } catch (error) {
+    console.error('Failed to get active sessions:', error);
+    return {
+      ok: false,
+      error: '获取活跃 session 失败',
+    };
+  }
+}
+
+/**
+ * 获取指定 session 的 messages 内容
+ * 仅当 STORE_SESSION_MESSAGES=true 时可用
+ */
+export async function getSessionMessages(sessionId: string): Promise<ActionResult<unknown>> {
+  try {
+    const messages = await SessionManager.getSessionMessages(sessionId);
+    if (messages === null) {
+      return {
+        ok: false,
+        error: 'Messages 未存储或已过期',
+      };
+    }
+    return {
+      ok: true,
+      data: messages,
+    };
+  } catch (error) {
+    console.error('Failed to get session messages:', error);
+    return {
+      ok: false,
+      error: '获取 session messages 失败',
+    };
+  }
+}

+ 130 - 0
src/app/dashboard/sessions/_components/active-sessions-table.tsx

@@ -0,0 +1,130 @@
+"use client";
+
+import {
+  Table,
+  TableBody,
+  TableCell,
+  TableHead,
+  TableHeader,
+  TableRow,
+} from "@/components/ui/table";
+import { Badge } from "@/components/ui/badge";
+import type { ActiveSessionInfo } from "@/types/session";
+import { SessionMessagesDialog } from "./session-messages-dialog";
+
+interface ActiveSessionsTableProps {
+  sessions: ActiveSessionInfo[];
+  isLoading: boolean;
+}
+
+function formatDuration(durationMs: number | undefined): string {
+  if (!durationMs) return "-";
+
+  if (durationMs < 1000) {
+    return `${durationMs}ms`;
+  } else if (durationMs < 60000) {
+    return `${(durationMs / 1000).toFixed(1)}s`;
+  } else {
+    const minutes = Math.floor(durationMs / 60000);
+    const seconds = Math.floor((durationMs % 60000) / 1000);
+    return `${minutes}m ${seconds}s`;
+  }
+}
+
+function getStatusBadge(status: 'in_progress' | 'completed' | 'error', statusCode?: number) {
+  if (status === 'in_progress') {
+    return <Badge variant="default" className="bg-blue-500">进行中</Badge>;
+  } else if (status === 'error' || (statusCode && statusCode >= 400)) {
+    return <Badge variant="destructive">错误</Badge>;
+  } else {
+    return <Badge variant="outline" className="text-green-600 border-green-600">完成</Badge>;
+  }
+}
+
+export function ActiveSessionsTable({
+  sessions,
+  isLoading,
+}: ActiveSessionsTableProps) {
+  // 按开始时间降序排序(最新的在前)
+  const sortedSessions = [...sessions].sort((a, b) => b.startTime - a.startTime);
+
+  return (
+    <div className="space-y-4">
+      <div className="flex items-center justify-between">
+        <div className="text-sm text-muted-foreground">
+          共 {sessions.length} 个活跃 Session
+        </div>
+        {isLoading && (
+          <div className="text-sm text-muted-foreground animate-pulse">
+            刷新中...
+          </div>
+        )}
+      </div>
+
+      <div className="rounded-md border">
+        <Table>
+          <TableHeader>
+            <TableRow>
+              <TableHead>Session ID</TableHead>
+              <TableHead>用户</TableHead>
+              <TableHead>密钥</TableHead>
+              <TableHead>供应商</TableHead>
+              <TableHead>模型</TableHead>
+              <TableHead>类型</TableHead>
+              <TableHead className="text-right">持续时长</TableHead>
+              <TableHead className="text-right">输入</TableHead>
+              <TableHead className="text-right">输出</TableHead>
+              <TableHead className="text-right">成本</TableHead>
+              <TableHead>状态</TableHead>
+              <TableHead className="text-center">操作</TableHead>
+            </TableRow>
+          </TableHeader>
+          <TableBody>
+            {sortedSessions.length === 0 ? (
+              <TableRow>
+                <TableCell colSpan={12} className="text-center text-muted-foreground">
+                  暂无活跃 Session
+                </TableCell>
+              </TableRow>
+            ) : (
+              sortedSessions.map((session) => (
+                <TableRow key={session.sessionId}>
+                  <TableCell className="font-mono text-xs">
+                    {session.sessionId.substring(0, 16)}...
+                  </TableCell>
+                  <TableCell>{session.userName}</TableCell>
+                  <TableCell className="font-mono text-xs">{session.keyName}</TableCell>
+                  <TableCell>{session.providerName || "-"}</TableCell>
+                  <TableCell className="font-mono text-xs">{session.model || "-"}</TableCell>
+                  <TableCell>
+                    <Badge variant="outline" className="text-xs">
+                      {session.apiType === 'codex' ? 'Codex' : 'Chat'}
+                    </Badge>
+                  </TableCell>
+                  <TableCell className="text-right font-mono text-xs">
+                    {formatDuration(session.durationMs)}
+                  </TableCell>
+                  <TableCell className="text-right font-mono text-xs">
+                    {session.inputTokens?.toLocaleString() || "-"}
+                  </TableCell>
+                  <TableCell className="text-right font-mono text-xs">
+                    {session.outputTokens?.toLocaleString() || "-"}
+                  </TableCell>
+                  <TableCell className="text-right font-mono text-xs">
+                    {session.costUsd ? `$${parseFloat(session.costUsd).toFixed(6)}` : "-"}
+                  </TableCell>
+                  <TableCell>
+                    {getStatusBadge(session.status, session.statusCode)}
+                  </TableCell>
+                  <TableCell className="text-center">
+                    <SessionMessagesDialog sessionId={session.sessionId} />
+                  </TableCell>
+                </TableRow>
+              ))
+            )}
+          </TableBody>
+        </Table>
+      </div>
+    </div>
+  );
+}

+ 103 - 0
src/app/dashboard/sessions/_components/session-messages-dialog.tsx

@@ -0,0 +1,103 @@
+"use client";
+
+import * as React from "react";
+import { Button } from "@/components/ui/button";
+import {
+  Dialog,
+  DialogContent,
+  DialogDescription,
+  DialogHeader,
+  DialogTitle,
+  DialogTrigger,
+} from "@/components/ui/dialog";
+import { Eye } from "lucide-react";
+import { getSessionMessages } from "@/actions/active-sessions";
+import { useState } from "react";
+
+interface SessionMessagesDialogProps {
+  sessionId: string;
+}
+
+export function SessionMessagesDialog({ sessionId }: SessionMessagesDialogProps) {
+  const [isOpen, setIsOpen] = useState(false);
+  const [messages, setMessages] = useState<unknown | null>(null);
+  const [isLoading, setIsLoading] = useState(false);
+  const [error, setError] = useState<string | null>(null);
+
+  const handleOpen = async () => {
+    setIsOpen(true);
+    setIsLoading(true);
+    setError(null);
+
+    try {
+      const result = await getSessionMessages(sessionId);
+      if (result.ok) {
+        setMessages(result.data);
+      } else {
+        setError(result.error || '获取失败');
+      }
+    } catch (err) {
+      setError(err instanceof Error ? err.message : '未知错误');
+    } finally {
+      setIsLoading(false);
+    }
+  };
+
+  const handleClose = () => {
+    setIsOpen(false);
+    setMessages(null);
+    setError(null);
+  };
+
+  return (
+    <Dialog open={isOpen} onOpenChange={(open) => {
+      if (open) {
+        void handleOpen();
+      } else {
+        handleClose();
+      }
+    }}>
+      <DialogTrigger asChild>
+        <Button variant="ghost" size="sm">
+          <Eye className="h-4 w-4 mr-1" />
+          查看
+        </Button>
+      </DialogTrigger>
+      <DialogContent className="max-w-3xl max-h-[80vh] overflow-y-auto">
+        <DialogHeader>
+          <DialogTitle>Session Messages</DialogTitle>
+          <DialogDescription className="font-mono text-xs">
+            {sessionId}
+          </DialogDescription>
+        </DialogHeader>
+
+        <div className="space-y-4">
+          {isLoading ? (
+            <div className="text-center py-8 text-muted-foreground">
+              加载中...
+            </div>
+          ) : error ? (
+            <div className="text-center py-8 text-destructive">
+              {error}
+              {error.includes('未存储') && (
+                <p className="text-sm text-muted-foreground mt-2">
+                  提示:请设置环境变量 STORE_SESSION_MESSAGES=true 以启用 messages 存储
+                </p>
+              )}
+            </div>
+          ) : messages ? (
+            <div className="rounded-md border bg-muted p-4">
+              <pre className="text-xs overflow-x-auto">
+                {JSON.stringify(messages, null, 2)}
+              </pre>
+            </div>
+          ) : (
+            <div className="text-center py-8 text-muted-foreground">
+              暂无数据
+            </div>
+          )}
+        </div>
+      </DialogContent>
+    </Dialog>
+  );
+}

+ 68 - 0
src/app/dashboard/sessions/page.tsx

@@ -0,0 +1,68 @@
+"use client";
+
+import * as React from "react";
+import { useQuery } from "@tanstack/react-query";
+import { Section } from "@/components/section";
+import { Button } from "@/components/ui/button";
+import { ArrowLeft } from "lucide-react";
+import { useRouter } from "next/navigation";
+import { getActiveSessions } from "@/actions/active-sessions";
+import { ActiveSessionsTable } from "./_components/active-sessions-table";
+import type { ActiveSessionInfo } from "@/types/session";
+
+const REFRESH_INTERVAL = 3000; // 3秒刷新一次
+
+async function fetchActiveSessions(): Promise<ActiveSessionInfo[]> {
+  const result = await getActiveSessions();
+  if (!result.ok) {
+    throw new Error(result.error || '获取活跃 session 失败');
+  }
+  return result.data;
+}
+
+/**
+ * 活跃 Session 实时监控页面
+ */
+export default function ActiveSessionsPage() {
+  const router = useRouter();
+
+  const { data: sessions = [], isLoading, error } = useQuery<ActiveSessionInfo[], Error>({
+    queryKey: ["active-sessions"],
+    queryFn: fetchActiveSessions,
+    refetchInterval: REFRESH_INTERVAL,
+  });
+
+  return (
+    <div className="space-y-6">
+      <div className="flex items-center gap-4">
+        <Button
+          variant="outline"
+          size="sm"
+          onClick={() => router.back()}
+        >
+          <ArrowLeft className="h-4 w-4 mr-2" />
+          返回
+        </Button>
+        <div>
+          <h1 className="text-2xl font-bold">活跃 Session 监控</h1>
+          <p className="text-sm text-muted-foreground">
+            实时显示最近 5 分钟内的活跃请求(每 3 秒自动刷新)
+          </p>
+        </div>
+      </div>
+
+      <Section title="活跃 Session 列表">
+        {error ? (
+          <div className="text-center text-destructive py-8">
+            加载失败: {error.message}
+          </div>
+        ) : (
+          <ActiveSessionsTable
+            sessions={sessions}
+            isLoading={isLoading}
+          />
+        )}
+      </Section>
+    </div>
+  );
+}

+ 8 - 0
src/app/v1/_lib/proxy/provider-selector.ts

@@ -35,6 +35,14 @@ export class ProxyProviderResolver {
       // 绑定 session 到 provider(异步,不阻塞)
       if (session.sessionId) {
         void SessionManager.bindSessionToProvider(session.sessionId, session.provider.id);
+
+        // 更新 session 详细信息中的 provider 信息
+        void SessionManager.updateSessionProvider(session.sessionId, {
+          providerId: session.provider.id,
+          providerName: session.provider.name,
+        }).catch((error) => {
+          console.error('[ProviderSelector] Failed to update session provider info:', error);
+        });
       }
 
       return null;

+ 55 - 0
src/app/v1/_lib/proxy/response-handler.ts

@@ -3,6 +3,7 @@ import { findLatestPriceByModel } from "@/repository/model-price";
 import { parseSSEData } from "@/lib/utils/sse";
 import { calculateRequestCost } from "@/lib/utils/cost-calculation";
 import { RateLimitService } from "@/lib/rate-limit";
+import { SessionManager } from "@/lib/session-manager";
 import type { ProxySession } from "./session";
 import { ProxyLogger } from "./logger";
 import { ProxyStatusTracker } from "@/lib/proxy-status-tracker";
@@ -99,6 +100,33 @@ export class ProxyResponseHandler {
           await trackCostToRedis(session, usageMetrics);
         }
 
+        // 更新 session 使用量到 Redis(用于实时监控)
+        if (session.sessionId && usageMetrics) {
+          // 计算成本(复用相同逻辑)
+          let costUsdStr: string | undefined;
+          if (session.request.model) {
+            const priceData = await findLatestPriceByModel(session.request.model);
+            if (priceData?.priceData) {
+              const cost = calculateRequestCost(usageMetrics, priceData.priceData, provider.costMultiplier);
+              if (cost.gt(0)) {
+                costUsdStr = cost.toString();
+              }
+            }
+          }
+
+          void SessionManager.updateSessionUsage(session.sessionId, {
+            inputTokens: usageMetrics.input_tokens,
+            outputTokens: usageMetrics.output_tokens,
+            cacheCreationInputTokens: usageMetrics.cache_creation_input_tokens,
+            cacheReadInputTokens: usageMetrics.cache_read_input_tokens,
+            costUsd: costUsdStr,
+            status: statusCode >= 200 && statusCode < 300 ? 'completed' : 'error',
+            statusCode: statusCode,
+          }).catch((error: unknown) => {
+            console.error('[ResponseHandler] Failed to update session usage:', error);
+          });
+        }
+
         if (messageContext) {
           const duration = Date.now() - session.startTime;
           await updateMessageRequestDuration(messageContext.id, duration);
@@ -251,6 +279,33 @@ export class ProxyResponseHandler {
         // 追踪消费到 Redis(用于限流)
         await trackCostToRedis(session, usageForCost);
 
+        // 更新 session 使用量到 Redis(用于实时监控)
+        if (session.sessionId && usageForCost) {
+          // 计算成本(复用相同逻辑)
+          let costUsdStr: string | undefined;
+          if (session.request.model) {
+            const priceData = await findLatestPriceByModel(session.request.model);
+            if (priceData?.priceData) {
+              const cost = calculateRequestCost(usageForCost, priceData.priceData, provider.costMultiplier);
+              if (cost.gt(0)) {
+                costUsdStr = cost.toString();
+              }
+            }
+          }
+
+          void SessionManager.updateSessionUsage(session.sessionId, {
+            inputTokens: usageForCost.input_tokens,
+            outputTokens: usageForCost.output_tokens,
+            cacheCreationInputTokens: usageForCost.cache_creation_input_tokens,
+            cacheReadInputTokens: usageForCost.cache_read_input_tokens,
+            costUsd: costUsdStr,
+            status: statusCode >= 200 && statusCode < 300 ? 'completed' : 'error',
+            statusCode: statusCode,
+          }).catch((error: unknown) => {
+            console.error('[ResponseHandler] Failed to update session usage:', error);
+          });
+        }
+
         // 保存扩展信息(status code, tokens, provider chain)
         await updateMessageRequestDetails(messageContext.id, {
           statusCode: statusCode,

+ 24 - 0
src/app/v1/_lib/proxy/session-guard.ts

@@ -34,6 +34,30 @@ export class ProxySessionGuard {
       // 4. 设置到 session 对象
       session.setSessionId(sessionId);
 
+      // 5. 存储 session 详细信息到 Redis(用于实时监控)
+      void (async () => {
+        try {
+          if (session.authState?.user && session.authState?.key) {
+            await SessionManager.storeSessionInfo(sessionId, {
+              userName: session.authState.user.name,
+              userId: session.authState.user.id,
+              keyId: session.authState.key.id,
+              keyName: session.authState.key.name,
+              model: session.request.model,
+              apiType: session.originalFormat === 'openai' ? 'codex' : 'chat',
+            });
+
+            // 可选:存储 messages(受环境变量控制)
+            const messages = session.getMessages();
+            if (messages) {
+              await SessionManager.storeSessionMessages(sessionId, messages);
+            }
+          }
+        } catch (error) {
+          console.error('[ProxySessionGuard] Failed to store session info:', error);
+        }
+      })();
+
       console.debug(
         `[ProxySessionGuard] Session assigned: ${sessionId} (key=${keyId}, messagesLength=${session.getMessagesLength()}, clientProvided=${!!clientSessionId})`
       );

+ 12 - 2
src/components/customs/concurrent-sessions-card.tsx

@@ -1,6 +1,7 @@
 "use client";
 
 import * as React from "react";
+import { useRouter } from "next/navigation";
 import { useQuery } from "@tanstack/react-query";
 import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card";
 import { Activity } from "lucide-react";
@@ -19,16 +20,25 @@ async function fetchConcurrentSessions(): Promise<number> {
 /**
  * 并发 Session 数显示卡片
  * 显示最近 5 分钟内的活跃 session 数量
+ * 点击可跳转到详情页面
  */
 export function ConcurrentSessionsCard() {
+  const router = useRouter();
   const { data = 0 } = useQuery<number, Error>({
     queryKey: ["concurrent-sessions"],
     queryFn: fetchConcurrentSessions,
     refetchInterval: REFRESH_INTERVAL,
   });
 
+  const handleClick = () => {
+    router.push('/dashboard/sessions');
+  };
+
   return (
-    <Card>
+    <Card
+      className="cursor-pointer hover:border-primary transition-colors"
+      onClick={handleClick}
+    >
       <CardHeader className="flex flex-row items-center justify-between space-y-0 pb-2">
         <CardTitle className="text-sm font-medium">
           当前并发
@@ -38,7 +48,7 @@ export function ConcurrentSessionsCard() {
       <CardContent>
         <div className="text-2xl font-bold">{data}</div>
         <p className="text-xs text-muted-foreground">
-          最近 5 分钟活跃 Session
+          最近 5 分钟活跃 Session(点击查看详情)
         </p>
       </CardContent>
     </Card>

+ 263 - 0
src/lib/session-manager.ts

@@ -1,5 +1,11 @@
 import crypto from 'crypto';
 import { getRedisClient } from './redis';
+import type {
+  ActiveSessionInfo,
+  SessionStoreInfo,
+  SessionUsageUpdate,
+  SessionProviderInfo
+} from '@/types/session';
 
 /**
  * Session 管理器
@@ -8,9 +14,11 @@ import { getRedisClient } from './redis';
  * 1. 基于 messages 内容哈希识别 session
  * 2. 管理 session 与 provider 的绑定关系
  * 3. 支持客户端主动传递 session_id
+ * 4. 存储和查询活跃 session 详细信息(用于实时监控)
  */
 export class SessionManager {
   private static readonly SESSION_TTL = parseInt(process.env.SESSION_TTL || '300'); // 5 分钟
+  private static readonly STORE_MESSAGES = process.env.STORE_SESSION_MESSAGES === 'true';
 
   /**
    * 从客户端请求中提取 session_id(支持 metadata 或 header)
@@ -287,4 +295,259 @@ export class SessionManager {
 
     return null;
   }
+
+  /**
+   * 存储 session 基础信息(请求开始时调用)
+   */
+  static async storeSessionInfo(sessionId: string, info: SessionStoreInfo): Promise<void> {
+    const redis = getRedisClient();
+    if (!redis || redis.status !== 'ready') return;
+
+    try {
+      const pipeline = redis.pipeline();
+
+      // 存储详细信息到 Hash
+      pipeline.hset(`session:${sessionId}:info`, {
+        userName: info.userName,
+        userId: info.userId.toString(),
+        keyId: info.keyId.toString(),
+        keyName: info.keyName,
+        model: info.model || '',
+        apiType: info.apiType,
+        startTime: Date.now().toString(),
+        status: 'in_progress', // 初始状态
+      });
+
+      // 设置 TTL
+      pipeline.expire(`session:${sessionId}:info`, this.SESSION_TTL);
+
+      await pipeline.exec();
+      console.debug(`[SessionManager] Stored session info: ${sessionId}`);
+    } catch (error) {
+      console.error('[SessionManager] Failed to store session info:', error);
+    }
+  }
+
+  /**
+   * 更新 session 供应商信息(选择供应商后调用)
+   */
+  static async updateSessionProvider(sessionId: string, providerInfo: SessionProviderInfo): Promise<void> {
+    const redis = getRedisClient();
+    if (!redis || redis.status !== 'ready') return;
+
+    try {
+      const pipeline = redis.pipeline();
+
+      // 更新 info Hash 中的 provider 字段
+      pipeline.hset(`session:${sessionId}:info`, {
+        providerId: providerInfo.providerId.toString(),
+        providerName: providerInfo.providerName,
+      });
+
+      // 刷新 TTL
+      pipeline.expire(`session:${sessionId}:info`, this.SESSION_TTL);
+
+      await pipeline.exec();
+      console.debug(`[SessionManager] Updated session provider: ${sessionId} → ${providerInfo.providerName}`);
+    } catch (error) {
+      console.error('[SessionManager] Failed to update session provider:', error);
+    }
+  }
+
+  /**
+   * 更新 session 使用量和状态(响应完成时调用)
+   */
+  static async updateSessionUsage(sessionId: string, usage: SessionUsageUpdate): Promise<void> {
+    const redis = getRedisClient();
+    if (!redis || redis.status !== 'ready') return;
+
+    try {
+      const pipeline = redis.pipeline();
+
+      // 存储使用量到单独的 Hash
+      const usageData: Record<string, string> = {
+        status: usage.status,
+      };
+
+      if (usage.inputTokens !== undefined) {
+        usageData.inputTokens = usage.inputTokens.toString();
+      }
+      if (usage.outputTokens !== undefined) {
+        usageData.outputTokens = usage.outputTokens.toString();
+      }
+      if (usage.cacheCreationInputTokens !== undefined) {
+        usageData.cacheCreationInputTokens = usage.cacheCreationInputTokens.toString();
+      }
+      if (usage.cacheReadInputTokens !== undefined) {
+        usageData.cacheReadInputTokens = usage.cacheReadInputTokens.toString();
+      }
+      if (usage.costUsd !== undefined) {
+        usageData.costUsd = usage.costUsd;
+      }
+      if (usage.statusCode !== undefined) {
+        usageData.statusCode = usage.statusCode.toString();
+      }
+      if (usage.errorMessage !== undefined) {
+        usageData.errorMessage = usage.errorMessage;
+      }
+
+      pipeline.hset(`session:${sessionId}:usage`, usageData);
+
+      // 同时更新 info Hash 中的 status
+      pipeline.hset(`session:${sessionId}:info`, 'status', usage.status);
+
+      // 刷新 TTL
+      pipeline.expire(`session:${sessionId}:usage`, this.SESSION_TTL);
+      pipeline.expire(`session:${sessionId}:info`, this.SESSION_TTL);
+
+      await pipeline.exec();
+      console.debug(`[SessionManager] Updated session usage: ${sessionId} (${usage.status})`);
+    } catch (error) {
+      console.error('[SessionManager] Failed to update session usage:', error);
+    }
+  }
+
+  /**
+   * 存储 session 请求 messages(可选,受环境变量控制)
+   */
+  static async storeSessionMessages(sessionId: string, messages: unknown): Promise<void> {
+    if (!this.STORE_MESSAGES) {
+      console.debug('[SessionManager] STORE_SESSION_MESSAGES is disabled, skipping');
+      return;
+    }
+
+    const redis = getRedisClient();
+    if (!redis || redis.status !== 'ready') return;
+
+    try {
+      const messagesJson = JSON.stringify(messages);
+      await redis.setex(`session:${sessionId}:messages`, this.SESSION_TTL, messagesJson);
+      console.debug(`[SessionManager] Stored session messages: ${sessionId}`);
+    } catch (error) {
+      console.error('[SessionManager] Failed to store session messages:', error);
+    }
+  }
+
+  /**
+   * 获取活跃 session 列表(用于实时监控页面)
+   */
+  static async getActiveSessions(): Promise<ActiveSessionInfo[]> {
+    const redis = getRedisClient();
+    if (!redis || redis.status !== 'ready') {
+      console.warn('[SessionManager] Redis not ready, returning empty list');
+      return [];
+    }
+
+    try {
+      // 1. 获取所有活跃 session ID
+      const sessionIds = await redis.smembers('global:active_sessions');
+      if (sessionIds.length === 0) {
+        return [];
+      }
+
+      console.debug(`[SessionManager] Found ${sessionIds.length} active sessions`);
+
+      // 2. 批量获取 session 详细信息
+      const sessions: ActiveSessionInfo[] = [];
+      const pipeline = redis.pipeline();
+
+      for (const sessionId of sessionIds) {
+        pipeline.hgetall(`session:${sessionId}:info`);
+        pipeline.hgetall(`session:${sessionId}:usage`);
+      }
+
+      const results = await pipeline.exec();
+      if (!results) {
+        return [];
+      }
+
+      // 3. 解析结果
+      for (let i = 0; i < sessionIds.length; i++) {
+        const infoIndex = i * 2;
+        const usageIndex = i * 2 + 1;
+
+        const infoResult = results[infoIndex];
+        const usageResult = results[usageIndex];
+
+        // 检查结果有效性
+        if (!infoResult || infoResult[0] !== null) continue;
+        if (!usageResult || usageResult[0] !== null) continue;
+
+        const info = infoResult[1] as Record<string, string>;
+        const usage = usageResult[1] as Record<string, string>;
+
+        // 跳过空的 info(session 可能已过期)
+        if (!info || Object.keys(info).length === 0) continue;
+
+        // 解析并构建 ActiveSessionInfo
+        const startTime = parseInt(info.startTime || '0', 10);
+        const now = Date.now();
+
+        const session: ActiveSessionInfo = {
+          sessionId: sessionIds[i],
+          userName: info.userName || 'unknown',
+          userId: parseInt(info.userId || '0', 10),
+          keyId: parseInt(info.keyId || '0', 10),
+          keyName: info.keyName || 'unknown',
+          providerId: info.providerId ? parseInt(info.providerId, 10) : null,
+          providerName: info.providerName || null,
+          model: info.model || null,
+          apiType: (info.apiType as 'chat' | 'codex') || 'chat',
+          startTime,
+          status: (usage.status || info.status || 'in_progress') as 'in_progress' | 'completed' | 'error',
+          durationMs: startTime > 0 ? now - startTime : undefined,
+        };
+
+        // 添加 usage 数据(如果存在)
+        if (usage && Object.keys(usage).length > 0) {
+          if (usage.inputTokens) session.inputTokens = parseInt(usage.inputTokens, 10);
+          if (usage.outputTokens) session.outputTokens = parseInt(usage.outputTokens, 10);
+          if (usage.cacheCreationInputTokens) session.cacheCreationInputTokens = parseInt(usage.cacheCreationInputTokens, 10);
+          if (usage.cacheReadInputTokens) session.cacheReadInputTokens = parseInt(usage.cacheReadInputTokens, 10);
+          if (usage.costUsd) session.costUsd = usage.costUsd;
+          if (usage.statusCode) session.statusCode = parseInt(usage.statusCode, 10);
+          if (usage.errorMessage) session.errorMessage = usage.errorMessage;
+
+          // 计算总 token
+          const input = session.inputTokens || 0;
+          const output = session.outputTokens || 0;
+          const cacheCreate = session.cacheCreationInputTokens || 0;
+          const cacheRead = session.cacheReadInputTokens || 0;
+          session.totalTokens = input + output + cacheCreate + cacheRead;
+        }
+
+        sessions.push(session);
+      }
+
+      console.debug(`[SessionManager] Retrieved ${sessions.length} active sessions with details`);
+      return sessions;
+    } catch (error) {
+      console.error('[SessionManager] Failed to get active sessions:', error);
+      return [];
+    }
+  }
+
+  /**
+   * 获取 session 的 messages 内容
+   */
+  static async getSessionMessages(sessionId: string): Promise<unknown | null> {
+    if (!this.STORE_MESSAGES) {
+      console.warn('[SessionManager] STORE_SESSION_MESSAGES is disabled');
+      return null;
+    }
+
+    const redis = getRedisClient();
+    if (!redis || redis.status !== 'ready') return null;
+
+    try {
+      const messagesJson = await redis.get(`session:${sessionId}:messages`);
+      if (!messagesJson) {
+        return null;
+      }
+      return JSON.parse(messagesJson);
+    } catch (error) {
+      console.error('[SessionManager] Failed to get session messages:', error);
+      return null;
+    }
+  }
 }

+ 73 - 0
src/types/session.ts

@@ -0,0 +1,73 @@
+/**
+ * 活跃 Session 详细信息
+ * 用于实时监控当前正在运行的请求
+ */
+export interface ActiveSessionInfo {
+  // 基础标识
+  sessionId: string;
+
+  // 用户和密钥信息
+  userName: string;
+  userId: number;
+  keyId: number;
+  keyName: string;
+
+  // 供应商信息
+  providerId: number | null;
+  providerName: string | null;
+
+  // 请求元数据
+  model: string | null;
+  apiType: 'chat' | 'codex';
+  startTime: number; // Unix timestamp (ms)
+
+  // 使用量(可能为空,表示请求未完成)
+  inputTokens?: number;
+  outputTokens?: number;
+  cacheCreationInputTokens?: number;
+  cacheReadInputTokens?: number;
+  totalTokens?: number;
+  costUsd?: string;
+
+  // 状态
+  status: 'in_progress' | 'completed' | 'error';
+  statusCode?: number;
+  errorMessage?: string;
+
+  // 派生字段(前端计算)
+  durationMs?: number; // 持续时长
+}
+
+/**
+ * Session 基础信息(存储到 Redis)
+ */
+export interface SessionStoreInfo {
+  userName: string;
+  userId: number;
+  keyId: number;
+  keyName: string;
+  model: string | null;
+  apiType: 'chat' | 'codex';
+}
+
+/**
+ * Session 使用量信息(响应时更新)
+ */
+export interface SessionUsageUpdate {
+  inputTokens?: number;
+  outputTokens?: number;
+  cacheCreationInputTokens?: number;
+  cacheReadInputTokens?: number;
+  costUsd?: string;
+  status: 'completed' | 'error';
+  statusCode?: number;
+  errorMessage?: string;
+}
+
+/**
+ * Session 供应商信息(选择后更新)
+ */
+export interface SessionProviderInfo {
+  providerId: number;
+  providerName: string;
+}