| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046 |
- import "server-only";
- import crypto from "node:crypto";
- import { extractCodexSessionId } from "@/app/v1/_lib/codex/session-extractor";
- import { sanitizeHeaders, sanitizeUrl } from "@/app/v1/_lib/proxy/errors";
- import { getEnvConfig } from "@/lib/config/env.schema";
- import { logger } from "@/lib/logger";
- import {
- redactMessages,
- redactRequestBody,
- redactResponseBody,
- } from "@/lib/utils/message-redaction";
- import { normalizeRequestSequence } from "@/lib/utils/request-sequence";
- import type {
- ActiveSessionInfo,
- SessionProviderInfo,
- SessionStoreInfo,
- SessionUsageUpdate,
- } from "@/types/session";
- import type { SpecialSetting } from "@/types/special-settings";
- import { getRedisClient } from "./redis";
- import { SessionTracker } from "./session-tracker";
- function headersToSanitizedObject(headers: Headers): Record<string, string> {
- const sanitizedText = sanitizeHeaders(headers);
- if (!sanitizedText || sanitizedText === "(empty)") {
- return {};
- }
- const obj: Record<string, string> = {};
- const lines = sanitizedText.split(/\r?\n/).filter(Boolean);
- for (const line of lines) {
- const colonIndex = line.indexOf(":");
- if (colonIndex === -1) continue;
- const name = line.slice(0, colonIndex).trim();
- const value = line.slice(colonIndex + 1).trim();
- if (!name) continue;
- if (obj[name]) {
- obj[name] = `${obj[name]}\n${value}`;
- } else {
- obj[name] = value;
- }
- }
- return obj;
- }
- function parseHeaderRecord(value: string): Record<string, string> | null {
- try {
- const parsed: unknown = JSON.parse(value);
- if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) return null;
- const record: Record<string, string> = {};
- for (const [key, raw] of Object.entries(parsed as Record<string, unknown>)) {
- if (typeof raw === "string") {
- record[key] = raw;
- }
- }
- return record;
- } catch (error) {
- logger.warn("SessionManager: Failed to parse header record JSON", { error });
- return null;
- }
- }
- type SessionRequestMeta = {
- url: string;
- method: string;
- };
- type SessionResponseMeta = {
- url: string;
- statusCode: number;
- };
- /**
- * Session 管理器
- *
- * 核心功能:
- * 1. 基于 messages 内容哈希识别 session
- * 2. 管理 session 与 provider 的绑定关系
- * 3. 支持客户端主动传递 session_id
- * 4. 存储和查询活跃 session 详细信息(用于实时监控)
- */
- export class SessionManager {
- private static readonly SESSION_TTL = parseInt(process.env.SESSION_TTL || "300", 10); // 5 分钟
- private static readonly SHORT_CONTEXT_THRESHOLD = parseInt(
- process.env.SHORT_CONTEXT_THRESHOLD || "2",
- 10
- ); // 短上下文阈值
- private static readonly ENABLE_SHORT_CONTEXT_DETECTION =
- process.env.ENABLE_SHORT_CONTEXT_DETECTION !== "false"; // 默认启用
- /**
- * 获取 STORE_SESSION_MESSAGES 配置
- * - true:原样存储 message 内容
- * - false(默认):存储但对 message 内容脱敏 [REDACTED]
- */
- private static get STORE_MESSAGES(): boolean {
- return getEnvConfig().STORE_SESSION_MESSAGES;
- }
- /**
- * 从客户端请求中提取 session_id(支持 metadata 或 header)
- *
- * 优先级:
- * 1. metadata.user_id (Claude Code 主要方式,典型格式: "user_{hash}_account__session_{sessionId}")
- * 2. metadata.session_id (备选方式)
- */
- static extractClientSessionId(
- requestMessage: Record<string, unknown>,
- headers?: Headers | null,
- _userAgent?: string | null
- ): string | null {
- // Codex 请求:优先尝试从 headers/body 提取稳定的 session_id
- if (headers && Array.isArray(requestMessage.input)) {
- const result = extractCodexSessionId(headers, requestMessage);
- if (result.sessionId) {
- logger.trace("SessionManager: Extracted session from Codex request", {
- sessionId: result.sessionId,
- source: result.source,
- });
- return result.sessionId;
- }
- return null;
- }
- const metadata = requestMessage.metadata;
- if (!metadata || typeof metadata !== "object") {
- return null;
- }
- const metadataObj = metadata as Record<string, unknown>;
- // 方案 A: 从 metadata.user_id 中提取 (Claude Code 主要方式)
- // 典型格式: "user_{hash}_account__session_{sessionId}"
- if (typeof metadataObj.user_id === "string" && metadataObj.user_id.length > 0) {
- const userId = metadataObj.user_id;
- const sessionMarker = "_session_";
- const markerIndex = userId.indexOf(sessionMarker);
- if (markerIndex !== -1) {
- const extractedSessionId = userId.substring(markerIndex + sessionMarker.length);
- if (extractedSessionId.length > 0) {
- logger.trace("SessionManager: Extracted session from metadata.user_id", {
- sessionId: extractedSessionId,
- });
- return extractedSessionId;
- }
- }
- }
- // 方案 B: 直接从 metadata.session_id 读取 (备选方案)
- if (typeof metadataObj.session_id === "string" && metadataObj.session_id.length > 0) {
- logger.trace("SessionManager: Extracted session from metadata.session_id", {
- sessionId: metadataObj.session_id,
- });
- return metadataObj.session_id;
- }
- return null;
- }
- /**
- * 生成新的 session_id
- * 格式:sess_{timestamp}_{random}
- */
- static generateSessionId(): string {
- const timestamp = Date.now().toString(36);
- const random = crypto.randomBytes(6).toString("hex");
- return `sess_${timestamp}_${random}`;
- }
- /**
- * 获取 Session 内下一个请求序号(原子操作)
- *
- * 使用 Redis INCR 保证并发安全,序号从 1 开始递增
- * 每个请求在同一 Session 内获得唯一序号,用于独立存储 messages
- *
- * @param sessionId - Session ID
- * @returns 请求序号(从 1 开始),Redis 不可用时返回基于时间戳的唯一序号
- */
- static async getNextRequestSequence(sessionId: string): Promise<number> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") {
- // 改进的 fallback:使用时间戳 + 随机数生成伪唯一序号
- // 避免 Redis 不可用时所有请求都返回 1 导致的冲突
- const fallbackSeq = (Date.now() % 1000000) + Math.floor(Math.random() * 1000);
- logger.warn("SessionManager: Redis not ready, using fallback sequence", {
- sessionId,
- fallbackSeq,
- });
- return fallbackSeq;
- }
- try {
- const key = `session:${sessionId}:seq`;
- const sequence = await redis.incr(key);
- // 首次创建时设置过期时间
- if (sequence === 1) {
- await redis.expire(key, SessionManager.SESSION_TTL);
- }
- logger.trace("SessionManager: Got next request sequence", {
- sessionId,
- sequence,
- });
- return sequence;
- } catch (error) {
- // 改进的 fallback:使用时间戳 + 随机数生成伪唯一序号
- const fallbackSeq = (Date.now() % 1000000) + Math.floor(Math.random() * 1000);
- logger.error("SessionManager: Failed to get request sequence, using fallback", {
- error,
- sessionId,
- fallbackSeq,
- });
- return fallbackSeq;
- }
- }
- /**
- * 获取 Session 当前的请求计数
- *
- * @param sessionId - Session ID
- * @returns 当前请求数量,不存在返回 0
- */
- static async getSessionRequestCount(sessionId: string): Promise<number> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return 0;
- try {
- const count = await redis.get(`session:${sessionId}:seq`);
- return count ? parseInt(count, 10) : 0;
- } catch (error) {
- logger.error("SessionManager: Failed to get request count", {
- error,
- sessionId,
- });
- return 0;
- }
- }
- /**
- * 计算 messages 内容哈希(用于 session 匹配)
- *
- * ⚠️ 注意: 这是一个降级方案,仅在无法从 metadata 提取 session ID 时使用
- * 不同会话如果开头相似可能产生相同哈希,因此优先使用 metadata.user_id
- *
- * @param messages - 消息数组
- * @returns 哈希值(16 字符)或 null
- */
- static calculateMessagesHash(messages: unknown): string | null {
- if (!Array.isArray(messages) || messages.length === 0) {
- logger.trace("SessionManager: calculateMessagesHash - messages is empty or not array");
- return null;
- }
- // 计算范围:前 N 条(N = min(length, 3))
- const count = Math.min(messages.length, 3);
- const contents: string[] = [];
- for (let i = 0; i < count; i++) {
- const message = messages[i];
- if (message && typeof message === "object") {
- const messageObj = message as Record<string, unknown>;
- const content = messageObj.content;
- if (typeof content === "string") {
- contents.push(content);
- logger.trace("SessionManager: Message content (string)", {
- index: i,
- preview: content.substring(0, 100),
- });
- } else if (Array.isArray(content)) {
- // 支持多模态 content(数组格式)
- const textParts = content
- .filter(
- (item) =>
- item &&
- typeof item === "object" &&
- (item as Record<string, unknown>).type === "text"
- )
- .map((item) => (item as Record<string, unknown>).text);
- const joined = textParts.join("");
- contents.push(joined);
- logger.trace("SessionManager: Message content (array)", {
- index: i,
- preview: joined.substring(0, 100),
- });
- } else {
- logger.trace("SessionManager: Message content type (skipped)", {
- index: i,
- type: typeof content,
- });
- }
- }
- }
- if (contents.length === 0) {
- logger.trace("SessionManager: calculateMessagesHash - no valid contents extracted");
- return null;
- }
- // 拼接并计算 SHA-256 哈希
- const combined = contents.join("|");
- const hash = crypto.createHash("sha256").update(combined, "utf8").digest("hex");
- // 截取前 16 字符(足够区分,节省存储)
- const shortHash = hash.substring(0, 16);
- logger.trace("SessionManager: Calculated hash", {
- hash: shortHash,
- messageCount: contents.length,
- totalChars: combined.length,
- });
- return shortHash;
- }
- /**
- * 获取或创建 session_id(核心方法)
- *
- * @param keyId - API Key ID
- * @param messages - 消息数组
- * @param clientSessionId - 客户端传递的 session_id(可选)
- * @returns session_id
- */
- static async getOrCreateSessionId(
- keyId: number,
- messages: unknown,
- clientSessionId?: string | null
- ): Promise<string> {
- const redis = getRedisClient();
- const messagesLength = Array.isArray(messages) ? messages.length : 0;
- logger.trace("SessionManager: getOrCreateSessionId called", {
- keyId,
- hasClientSession: !!clientSessionId,
- messagesLength,
- });
- // 1. 优先使用客户端传递的 session_id (来自 metadata.user_id 或 metadata.session_id)
- if (clientSessionId) {
- // 2. 短上下文并发检测(方案E)
- if (
- SessionManager.ENABLE_SHORT_CONTEXT_DETECTION &&
- messagesLength <= SessionManager.SHORT_CONTEXT_THRESHOLD
- ) {
- // 检查该 session 是否有其他请求正在运行
- const concurrentCount = await SessionTracker.getConcurrentCount(clientSessionId);
- if (concurrentCount > 0) {
- // 场景B:有并发请求 → 这是并发短任务 → 强制新建 session
- const newId = SessionManager.generateSessionId();
- logger.info("SessionManager: 检测到并发短任务,强制新建 session", {
- originalSessionId: clientSessionId,
- newSessionId: newId,
- messagesLength,
- existingConcurrentCount: concurrentCount,
- });
- return newId;
- }
- // 场景A:无并发 → 这可能是长对话的开始 → 允许复用
- logger.debug("SessionManager: 短上下文但 session 空闲,允许复用(长对话开始)", {
- sessionId: clientSessionId,
- messagesLength,
- });
- }
- // 3. 长上下文 or 无并发 → 正常复用
- logger.debug("SessionManager: Using client-provided session", {
- sessionId: clientSessionId,
- });
- // 刷新 TTL(滑动窗口)
- if (redis && redis.status === "ready") {
- await SessionManager.refreshSessionTTL(clientSessionId).catch((err) => {
- logger.error("SessionManager: Failed to refresh TTL", { error: err });
- });
- }
- return clientSessionId;
- }
- // 2. 降级方案:计算 messages 内容哈希(TC-047 警告:不可靠)
- logger.warn(
- "SessionManager: No client session ID, falling back to content hash (unreliable for compressed dialogs)",
- {
- keyId,
- messagesLength: Array.isArray(messages) ? messages.length : 0,
- }
- );
- const contentHash = SessionManager.calculateMessagesHash(messages);
- if (!contentHash) {
- // 降级:无法计算哈希,生成新 session
- const newId = SessionManager.generateSessionId();
- logger.warn("SessionManager: Cannot calculate hash, generating new session", {
- sessionId: newId,
- });
- return newId;
- }
- // 3. 尝试从 Redis 查找已有 session
- if (redis && redis.status === "ready") {
- try {
- const hashKey = `hash:${contentHash}:session`;
- const existingSessionId = await redis.get(hashKey);
- if (existingSessionId) {
- // 找到已有 session,刷新 TTL
- await SessionManager.refreshSessionTTL(existingSessionId);
- logger.trace("SessionManager: Reusing session via hash", {
- sessionId: existingSessionId,
- hash: contentHash,
- });
- return existingSessionId;
- }
- // 未找到:创建新 session
- const newSessionId = SessionManager.generateSessionId();
- // 存储映射关系(异步,不阻塞)
- void SessionManager.storeSessionMapping(contentHash, newSessionId, keyId);
- logger.trace("SessionManager: Created new session with hash", {
- sessionId: newSessionId,
- hash: contentHash,
- });
- return newSessionId;
- } catch (error) {
- logger.error("SessionManager: Redis error", { error });
- // 降级:Redis 错误,生成新 session
- return SessionManager.generateSessionId();
- }
- }
- // 4. Redis 不可用,降级生成新 session
- return SessionManager.generateSessionId();
- }
- /**
- * 存储 hash → session 映射关系
- */
- private static async storeSessionMapping(
- contentHash: string,
- sessionId: string,
- keyId: number
- ): Promise<void> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- const pipeline = redis.pipeline();
- const hashKey = `hash:${contentHash}:session`;
- // 存储映射关系
- pipeline.setex(hashKey, SessionManager.SESSION_TTL, sessionId);
- // 初始化 session 元数据
- pipeline.setex(`session:${sessionId}:key`, SessionManager.SESSION_TTL, keyId.toString());
- pipeline.setex(
- `session:${sessionId}:last_seen`,
- SessionManager.SESSION_TTL,
- Date.now().toString()
- );
- await pipeline.exec();
- } catch (error) {
- logger.error("SessionManager: Failed to store session mapping", {
- error,
- });
- }
- }
- /**
- * 刷新 session TTL(滑动窗口)
- */
- private static async refreshSessionTTL(sessionId: string): Promise<void> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- const pipeline = redis.pipeline();
- // 刷新所有 session 相关 key 的 TTL
- pipeline.expire(`session:${sessionId}:key`, SessionManager.SESSION_TTL);
- pipeline.expire(`session:${sessionId}:provider`, SessionManager.SESSION_TTL);
- pipeline.setex(
- `session:${sessionId}:last_seen`,
- SessionManager.SESSION_TTL,
- Date.now().toString()
- );
- await pipeline.exec();
- } catch (error) {
- logger.error("SessionManager: Failed to refresh TTL", { error });
- }
- }
- /**
- * 绑定 session 到 provider(TC-009 修复:使用 SET NX 避免竞态条件)
- */
- static async bindSessionToProvider(sessionId: string, providerId: number): Promise<void> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- const key = `session:${sessionId}:provider`;
- // 使用 SET ... NX 保证只有第一次绑定成功(原子操作)
- const result = await redis.set(
- key,
- providerId.toString(),
- "EX",
- SessionManager.SESSION_TTL,
- "NX" // Only set if not exists
- );
- if (result === "OK") {
- logger.trace("SessionManager: Bound session to provider", {
- sessionId,
- providerId,
- });
- } else {
- // 已绑定过,不覆盖(避免并发请求选择不同供应商)
- logger.debug("SessionManager: Session already bound, skipping", {
- sessionId,
- attemptedProviderId: providerId,
- });
- }
- } catch (error) {
- logger.error("SessionManager: Failed to bind provider", { error });
- }
- }
- /**
- * 获取 session 绑定的 provider
- */
- static async getSessionProvider(sessionId: string): Promise<number | null> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return null;
- try {
- const value = await redis.get(`session:${sessionId}:provider`);
- if (value) {
- const providerId = parseInt(value, 10);
- if (!Number.isNaN(providerId)) {
- return providerId;
- }
- }
- } catch (error) {
- logger.error("SessionManager: Failed to get session provider", { error });
- }
- return null;
- }
- /**
- * 获取当前绑定供应商的优先级
- *
- * ⚠️ 修复:从 session:provider 读取(真实绑定),而不是 session:info
- * 原因:info.providerId 是并发检查通过的供应商,可能请求失败了
- *
- * @param sessionId - Session ID
- * @returns 优先级数字(数字越小优先级越高),如果未绑定或无法查询则返回 null
- */
- static async getSessionProviderPriority(sessionId: string): Promise<number | null> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return null;
- try {
- // 修复:从真实绑定关系读取(session:provider)
- const providerIdStr = await redis.get(`session:${sessionId}:provider`);
- if (!providerIdStr) {
- return null;
- }
- const providerId = parseInt(providerIdStr, 10);
- if (Number.isNaN(providerId)) {
- return null;
- }
- // 查询供应商详情获取优先级
- const { findProviderById } = await import("@/repository/provider");
- const provider = await findProviderById(providerId);
- if (!provider) {
- logger.warn("SessionManager: Bound provider not found", { providerId });
- return null;
- }
- return provider.priority;
- } catch (error) {
- logger.error("SessionManager: Failed to get session provider priority", {
- error,
- });
- return null;
- }
- }
- /**
- * 智能更新 Session 绑定
- *
- * 策略:首次绑定用 SET NX;故障转移后无条件更新;其他情况按优先级和熔断状态决策
- */
- static async updateSessionBindingSmart(
- sessionId: string,
- newProviderId: number,
- newProviderPriority: number,
- isFirstAttempt: boolean = false,
- isFailoverSuccess: boolean = false
- ): Promise<{ updated: boolean; reason: string; details?: string }> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") {
- return { updated: false, reason: "redis_not_ready" };
- }
- try {
- // ========== 情况 1:首次尝试成功 ==========
- if (isFirstAttempt) {
- const key = `session:${sessionId}:provider`;
- // 使用 SET NX 绑定(避免覆盖并发请求)
- const result = await redis.set(
- key,
- newProviderId.toString(),
- "EX",
- SessionManager.SESSION_TTL,
- "NX"
- );
- if (result === "OK") {
- logger.info("SessionManager: Bound session to provider (first success)", {
- sessionId,
- providerId: newProviderId,
- priority: newProviderPriority,
- });
- return {
- updated: true,
- reason: "first_success",
- details: `首次成功,绑定到供应商 ${newProviderId} (priority=${newProviderPriority})`,
- };
- } else {
- // 并发请求已经绑定了,放弃更新
- return {
- updated: false,
- reason: "concurrent_binding_exists",
- details: "并发请求已绑定,跳过",
- };
- }
- }
- // ========== 情况 2:重试成功(需要智能决策)==========
- // 2.0 故障转移成功:无条件更新绑定(减少缓存切换)
- if (isFailoverSuccess) {
- const key = `session:${sessionId}:provider`;
- await redis.setex(key, SessionManager.SESSION_TTL, newProviderId.toString());
- logger.info("SessionManager: Updated binding after failover", {
- sessionId,
- newProviderId,
- newPriority: newProviderPriority,
- });
- return {
- updated: true,
- reason: "failover_success",
- details: `故障转移成功,绑定到供应商 ${newProviderId}`,
- };
- }
- // 2.1 获取当前绑定的供应商 ID
- const currentProviderIdStr = await redis.get(`session:${sessionId}:provider`);
- if (!currentProviderIdStr) {
- // 没有绑定,使用 SET NX 绑定
- const key = `session:${sessionId}:provider`;
- const result = await redis.set(
- key,
- newProviderId.toString(),
- "EX",
- SessionManager.SESSION_TTL,
- "NX"
- );
- if (result === "OK") {
- logger.info("SessionManager: Bound session (no previous binding)", {
- sessionId,
- providerId: newProviderId,
- priority: newProviderPriority,
- });
- return {
- updated: true,
- reason: "no_previous_binding",
- details: `无绑定,绑定到供应商 ${newProviderId} (priority=${newProviderPriority})`,
- };
- } else {
- return {
- updated: false,
- reason: "concurrent_binding_exists",
- details: "并发请求已绑定",
- };
- }
- }
- const currentProviderId = parseInt(currentProviderIdStr, 10);
- if (Number.isNaN(currentProviderId)) {
- logger.warn("SessionManager: Invalid provider ID in Redis", {
- currentProviderIdStr,
- });
- return { updated: false, reason: "invalid_provider_id" };
- }
- // 2.2 查询当前供应商的详情(优先级 + 健康状态)
- const { findProviderById } = await import("@/repository/provider");
- const currentProvider = await findProviderById(currentProviderId);
- if (!currentProvider) {
- // 当前供应商不存在(可能被删除),直接更新
- const key = `session:${sessionId}:provider`;
- await redis.setex(key, SessionManager.SESSION_TTL, newProviderId.toString());
- logger.info("SessionManager: Updated binding (current provider not found)", {
- sessionId,
- oldProviderId: currentProviderId,
- newProviderId,
- newPriority: newProviderPriority,
- });
- return {
- updated: true,
- reason: "current_provider_not_found",
- details: `原供应商 ${currentProviderId} 不存在,更新到 ${newProviderId}`,
- };
- }
- const currentPriority = currentProvider.priority || 0;
- // 2.3 智能决策:优先级比较 + 健康检查
- // ========== 规则 A:新供应商优先级更高(数字更小)→ 直接迁移 ==========
- if (newProviderPriority < currentPriority) {
- const key = `session:${sessionId}:provider`;
- await redis.setex(key, SessionManager.SESSION_TTL, newProviderId.toString());
- logger.info("SessionManager: Migrated to higher priority provider", {
- sessionId,
- oldProviderId: currentProviderId,
- oldProviderName: currentProvider.name,
- oldPriority: currentPriority,
- newProviderId,
- newPriority: newProviderPriority,
- });
- return {
- updated: true,
- reason: "priority_upgrade",
- details: `优先级升级:从供应商 ${currentProvider.name} (priority=${currentPriority}) 迁移到 ${newProviderId} (priority=${newProviderPriority})`,
- };
- }
- // ========== 规则 B:新供应商优先级相同或更低 → 检查原供应商健康状态 ==========
- const { isCircuitOpen } = await import("@/lib/circuit-breaker");
- const isCurrentCircuitOpen = await isCircuitOpen(currentProviderId);
- if (isCurrentCircuitOpen) {
- // 原供应商已熔断 → 更新到新供应商(备用供应商接管)
- const key = `session:${sessionId}:provider`;
- await redis.setex(key, SessionManager.SESSION_TTL, newProviderId.toString());
- logger.info("SessionManager: Migrated to backup provider (circuit open)", {
- sessionId,
- oldProviderId: currentProviderId,
- oldProviderName: currentProvider.name,
- oldPriority: currentPriority,
- newProviderId,
- newPriority: newProviderPriority,
- });
- return {
- updated: true,
- reason: "circuit_open_fallback",
- details: `原供应商 ${currentProvider.name} (priority=${currentPriority}) 已熔断,切换到供应商 ${newProviderId} (priority=${newProviderPriority})`,
- };
- }
- // 原供应商健康 + 优先级更高/相同 → 保持原绑定(尽量使用主供应商)
- logger.debug("SessionManager: Keeping current provider (healthy and higher/equal priority)", {
- sessionId,
- currentProviderId,
- currentProviderName: currentProvider.name,
- currentPriority,
- attemptedProviderId: newProviderId,
- attemptedPriority: newProviderPriority,
- });
- return {
- updated: false,
- reason: "keep_healthy_higher_priority",
- details: `保持原供应商 ${currentProvider.name} (priority=${currentPriority}, 健康),拒绝供应商 ${newProviderId} (priority=${newProviderPriority})`,
- };
- } catch (error) {
- logger.error("SessionManager: Failed to update session binding", {
- error,
- });
- return { updated: false, reason: "error", details: String(error) };
- }
- }
- /**
- * 存储 session 基础信息(请求开始时调用)
- */
- static async storeSessionInfo(sessionId: string, info: SessionStoreInfo): Promise<void> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- const pipeline = redis.pipeline();
- // 存储详细信息到 Hash
- pipeline.hset(`session:${sessionId}:info`, {
- userName: info.userName,
- userId: info.userId.toString(),
- keyId: info.keyId.toString(),
- keyName: info.keyName,
- model: info.model || "",
- apiType: info.apiType,
- startTime: Date.now().toString(),
- status: "in_progress", // 初始状态
- });
- // 设置 TTL
- pipeline.expire(`session:${sessionId}:info`, SessionManager.SESSION_TTL);
- await pipeline.exec();
- logger.trace("SessionManager: Stored session info", { sessionId });
- } catch (error) {
- logger.error("SessionManager: Failed to store session info", { error });
- }
- }
- /**
- * 更新 session 供应商信息(选择供应商后调用)
- */
- static async updateSessionProvider(
- sessionId: string,
- providerInfo: SessionProviderInfo
- ): Promise<void> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- const pipeline = redis.pipeline();
- // 更新 info Hash 中的 provider 字段
- pipeline.hset(`session:${sessionId}:info`, {
- providerId: providerInfo.providerId.toString(),
- providerName: providerInfo.providerName,
- });
- // 刷新 TTL
- pipeline.expire(`session:${sessionId}:info`, SessionManager.SESSION_TTL);
- await pipeline.exec();
- logger.trace("SessionManager: Updated session provider", {
- sessionId,
- providerName: providerInfo.providerName,
- });
- } catch (error) {
- logger.error("SessionManager: Failed to update session provider", {
- error,
- });
- }
- }
- /**
- * 更新 session 使用量和状态(响应完成时调用)
- */
- static async updateSessionUsage(sessionId: string, usage: SessionUsageUpdate): Promise<void> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- const pipeline = redis.pipeline();
- // 存储使用量到单独的 Hash
- const usageData: Record<string, string> = {
- status: usage.status,
- };
- if (usage.inputTokens !== undefined) {
- usageData.inputTokens = usage.inputTokens.toString();
- }
- if (usage.outputTokens !== undefined) {
- usageData.outputTokens = usage.outputTokens.toString();
- }
- if (usage.cacheCreationInputTokens !== undefined) {
- usageData.cacheCreationInputTokens = usage.cacheCreationInputTokens.toString();
- }
- if (usage.cacheReadInputTokens !== undefined) {
- usageData.cacheReadInputTokens = usage.cacheReadInputTokens.toString();
- }
- if (usage.costUsd !== undefined) {
- usageData.costUsd = usage.costUsd;
- }
- if (usage.statusCode !== undefined) {
- usageData.statusCode = usage.statusCode.toString();
- }
- if (usage.errorMessage !== undefined) {
- usageData.errorMessage = usage.errorMessage;
- }
- pipeline.hset(`session:${sessionId}:usage`, usageData);
- // 同时更新 info Hash 中的 status
- pipeline.hset(`session:${sessionId}:info`, "status", usage.status);
- // 刷新 TTL
- pipeline.expire(`session:${sessionId}:usage`, SessionManager.SESSION_TTL);
- pipeline.expire(`session:${sessionId}:info`, SessionManager.SESSION_TTL);
- await pipeline.exec();
- logger.trace("SessionManager: Updated session usage", {
- sessionId,
- status: usage.status,
- });
- } catch (error) {
- logger.error("SessionManager: Failed to update session usage", { error });
- }
- }
- /**
- * 存储 session 请求 messages
- *
- * 存储策略受 STORE_SESSION_MESSAGES 控制:
- * - true:原样存储 message 内容
- * - false(默认):存储但对 message 内容脱敏 [REDACTED]
- *
- * @param sessionId - Session ID
- * @param messages - 消息内容
- * @param requestSequence - 可选,请求序号。提供时使用新的 key 格式存储独立消息
- */
- static async storeSessionMessages(
- sessionId: string,
- messages: unknown,
- requestSequence?: number
- ): Promise<void> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- // 根据配置决定是否脱敏
- const messagesToStore = SessionManager.STORE_MESSAGES ? messages : redactMessages(messages);
- const messagesJson = JSON.stringify(messagesToStore);
- // 新格式:session:{sessionId}:req:{sequence}:messages(独立存储每个请求)
- // 旧格式:session:{sessionId}:messages(向后兼容)
- const key = requestSequence
- ? `session:${sessionId}:req:${requestSequence}:messages`
- : `session:${sessionId}:messages`;
- await redis.setex(key, SessionManager.SESSION_TTL, messagesJson);
- logger.trace("SessionManager: Stored session messages", {
- sessionId,
- requestSequence,
- key,
- redacted: !SessionManager.STORE_MESSAGES,
- });
- } catch (error) {
- logger.error("SessionManager: Failed to store session messages", {
- error,
- });
- }
- }
- /**
- * 辅助方法:从 Redis Hash 数据构建 ActiveSessionInfo 对象
- *
- * @private
- */
- private static buildSessionInfo(
- sessionId: string,
- info: Record<string, string>,
- usage: Record<string, string>
- ): ActiveSessionInfo {
- const startTime = parseInt(info.startTime || "0", 10);
- const now = Date.now();
- const session: ActiveSessionInfo = {
- sessionId,
- userName: info.userName || "unknown",
- userId: parseInt(info.userId || "0", 10),
- keyId: parseInt(info.keyId || "0", 10),
- keyName: info.keyName || "unknown",
- providerId: info.providerId ? parseInt(info.providerId, 10) : null,
- providerName: info.providerName || null,
- model: info.model || null,
- apiType: (info.apiType as "chat" | "codex") || "chat",
- startTime,
- status: (usage.status || info.status || "in_progress") as
- | "in_progress"
- | "completed"
- | "error",
- durationMs: startTime > 0 ? now - startTime : undefined,
- };
- // 添加 usage 数据(如果存在)
- if (usage && Object.keys(usage).length > 0) {
- if (usage.inputTokens) session.inputTokens = parseInt(usage.inputTokens, 10);
- if (usage.outputTokens) session.outputTokens = parseInt(usage.outputTokens, 10);
- if (usage.cacheCreationInputTokens)
- session.cacheCreationInputTokens = parseInt(usage.cacheCreationInputTokens, 10);
- if (usage.cacheReadInputTokens)
- session.cacheReadInputTokens = parseInt(usage.cacheReadInputTokens, 10);
- if (usage.costUsd) session.costUsd = usage.costUsd;
- if (usage.statusCode) session.statusCode = parseInt(usage.statusCode, 10);
- if (usage.errorMessage) session.errorMessage = usage.errorMessage;
- // 计算总 token
- const input = session.inputTokens || 0;
- const output = session.outputTokens || 0;
- const cacheCreate = session.cacheCreationInputTokens || 0;
- const cacheRead = session.cacheReadInputTokens || 0;
- session.totalTokens = input + output + cacheCreate + cacheRead;
- }
- return session;
- }
- /**
- * 获取活跃 session 列表(用于实时监控页面)
- */
- static async getActiveSessions(): Promise<ActiveSessionInfo[]> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") {
- logger.warn("SessionManager: Redis not ready, returning empty list");
- return [];
- }
- try {
- // 1. 使用 SessionTracker 获取活跃 session ID(自动兼容 ZSET/Set)
- const sessionIds = await SessionTracker.getActiveSessions();
- if (sessionIds.length === 0) {
- return [];
- }
- logger.trace("SessionManager: Found active sessions", {
- count: sessionIds.length,
- });
- // 2. 批量获取 session 详细信息
- const sessions: ActiveSessionInfo[] = [];
- const pipeline = redis.pipeline();
- for (const sessionId of sessionIds) {
- pipeline.hgetall(`session:${sessionId}:info`);
- pipeline.hgetall(`session:${sessionId}:usage`);
- }
- const results = await pipeline.exec();
- if (!results) {
- return [];
- }
- // 3. 解析结果
- for (let i = 0; i < sessionIds.length; i++) {
- const infoIndex = i * 2;
- const usageIndex = i * 2 + 1;
- const infoResult = results[infoIndex];
- const usageResult = results[usageIndex];
- // 检查结果有效性
- if (!infoResult || infoResult[0] !== null) continue;
- if (!usageResult || usageResult[0] !== null) continue;
- const info = infoResult[1] as Record<string, string>;
- const usage = usageResult[1] as Record<string, string>;
- // 跳过空的 info(session 可能已过期)
- if (!info || Object.keys(info).length === 0) continue;
- // 使用辅助方法构建 session 对象
- const session = SessionManager.buildSessionInfo(sessionIds[i], info, usage);
- sessions.push(session);
- }
- logger.trace("SessionManager: Retrieved active sessions with details", {
- count: sessions.length,
- });
- return sessions;
- } catch (error) {
- logger.error("SessionManager: Failed to get active sessions", { error });
- return [];
- }
- }
- /**
- * 获取所有 session(包括非活跃的)
- *
- * 使用 SCAN 扫描 Redis 中所有 session:*:info key,
- * 按最后活跃时间分为活跃(5 分钟内)和非活跃两组。
- *
- * @returns { active: 活跃 session 列表, inactive: 非活跃 session 列表 }
- */
- static async getAllSessionsWithExpiry(): Promise<{
- active: ActiveSessionInfo[];
- inactive: ActiveSessionInfo[];
- }> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") {
- logger.warn("SessionManager: Redis not ready, returning empty lists");
- return { active: [], inactive: [] };
- }
- try {
- const now = Date.now();
- const fiveMinutesAgo = now - SessionManager.SESSION_TTL * 1000; // SESSION_TTL 是秒,转为毫秒
- // 1. 使用 SCAN 扫描所有 session:*:info key
- const allSessions: ActiveSessionInfo[] = [];
- let cursor = "0";
- do {
- const [nextCursor, keys] = (await redis.scan(
- cursor,
- "MATCH",
- "session:*:info",
- "COUNT",
- 100
- )) as [string, string[]];
- cursor = nextCursor;
- if (keys.length > 0) {
- // 2. 批量获取 session info 和 usage
- const pipeline = redis.pipeline();
- for (const key of keys) {
- pipeline.hgetall(key);
- // 提取 sessionId
- const sessionId = key.replace("session:", "").replace(":info", "");
- pipeline.hgetall(`session:${sessionId}:usage`);
- }
- const results = await pipeline.exec();
- if (!results) continue;
- // 3. 解析结果
- for (let i = 0; i < keys.length; i++) {
- const infoIndex = i * 2;
- const usageIndex = i * 2 + 1;
- const infoResult = results[infoIndex];
- const usageResult = results[usageIndex];
- // 检查结果有效性
- if (!infoResult || infoResult[0] !== null) continue;
- if (!usageResult || usageResult[0] !== null) continue;
- const info = infoResult[1] as Record<string, string>;
- const usage = usageResult[1] as Record<string, string>;
- // 跳过空的 info
- if (!info || Object.keys(info).length === 0) continue;
- // 提取 sessionId
- const sessionId = keys[i].replace("session:", "").replace(":info", "");
- // 使用辅助方法构建 session 对象
- const session = SessionManager.buildSessionInfo(sessionId, info, usage);
- allSessions.push(session);
- }
- }
- } while (cursor !== "0");
- // 4. 按最后活跃时间分组
- const active: ActiveSessionInfo[] = [];
- const inactive: ActiveSessionInfo[] = [];
- for (const session of allSessions) {
- if (session.startTime >= fiveMinutesAgo) {
- active.push(session);
- } else {
- inactive.push(session);
- }
- }
- logger.trace("SessionManager: Found sessions", {
- active: active.length,
- inactive: inactive.length,
- total: allSessions.length,
- });
- return { active, inactive };
- } catch (error) {
- logger.error("SessionManager: Failed to get all sessions", { error });
- return { active: [], inactive: [] };
- }
- }
- /**
- * 获取所有 session ID 列表(轻量级版本)
- * 仅返回 session ID,不返回详细信息
- *
- * @returns session ID 数组
- */
- static async getAllSessionIds(): Promise<string[]> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") {
- logger.warn("SessionManager: Redis not ready, returning empty list");
- return [];
- }
- try {
- const sessionIds: string[] = [];
- let cursor = "0";
- do {
- const [nextCursor, keys] = (await redis.scan(
- cursor,
- "MATCH",
- "session:*:info",
- "COUNT",
- 100
- )) as [string, string[]];
- cursor = nextCursor;
- if (keys.length > 0) {
- // 提取 sessionId
- for (const key of keys) {
- const sessionId = key.replace("session:", "").replace(":info", "");
- sessionIds.push(sessionId);
- }
- }
- } while (cursor !== "0");
- logger.trace(`SessionManager: Found ${sessionIds.length} session IDs`);
- return sessionIds;
- } catch (error) {
- logger.error("SessionManager: Failed to get session IDs", { error });
- return [];
- }
- }
- /**
- * 获取 session 的 messages 内容
- *
- * @param sessionId - Session ID
- * @param requestSequence - 可选,请求序号。提供时读取特定请求的消息
- * @returns 消息内容(解析后的 JSON 对象,可能已脱敏)
- */
- static async getSessionMessages(
- sessionId: string,
- requestSequence?: number
- ): Promise<unknown | null> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return null;
- try {
- // 优先尝试新格式
- if (requestSequence) {
- const newKey = `session:${sessionId}:req:${requestSequence}:messages`;
- const messagesJson = await redis.get(newKey);
- if (messagesJson) {
- return JSON.parse(messagesJson);
- }
- }
- // 向后兼容:尝试旧格式
- const legacyKey = `session:${sessionId}:messages`;
- const messagesJson = await redis.get(legacyKey);
- if (!messagesJson) {
- return null;
- }
- return JSON.parse(messagesJson);
- } catch (error) {
- logger.error("SessionManager: Failed to get session messages", { error });
- return null;
- }
- }
- /**
- * 检查 Session 是否有任意请求的 messages
- *
- * 使用 Redis SCAN 检查是否存在任意格式的 messages key:
- * - 新格式:session:{sessionId}:req:*:messages
- * - 旧格式:session:{sessionId}:messages
- *
- * @param sessionId - Session ID
- * @returns 是否存在任意 messages
- */
- static async hasAnySessionMessages(sessionId: string): Promise<boolean> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return false;
- try {
- // 1. 先检查旧格式(直接 EXISTS 更高效)
- const legacyKey = `session:${sessionId}:messages`;
- const legacyExists = await redis.exists(legacyKey);
- if (legacyExists) {
- return true;
- }
- // 2. 检查新格式:使用 SCAN 搜索 session:{sessionId}:req:*:messages
- let cursor = "0";
- do {
- const [nextCursor, keys] = (await redis.scan(
- cursor,
- "MATCH",
- `session:${sessionId}:req:*:messages`,
- "COUNT",
- 100
- )) as [string, string[]];
- cursor = nextCursor;
- // 找到任意一个就返回 true
- if (keys.length > 0) {
- return true;
- }
- } while (cursor !== "0");
- return false;
- } catch (error) {
- logger.error("SessionManager: Failed to check session messages existence", { error });
- return false;
- }
- }
- /**
- * 存储 session 响应体(临时存储,5分钟过期)
- *
- * 存储行为受 STORE_SESSION_RESPONSE_BODY 控制:
- * - true (默认):存储响应体到 Redis 临时缓存
- * - false:不存储(注意:不影响本次请求处理与统计,仅影响后续查看 response body)
- *
- * 存储策略(脱敏/原样)受 STORE_SESSION_MESSAGES 控制:
- * - true:原样存储响应内容
- * - false(默认):对 JSON 响应体中的 message 内容脱敏 [REDACTED]
- *
- * @param sessionId - Session ID
- * @param response - 响应体内容(字符串或对象)
- * @param requestSequence - 可选,请求序号。提供时使用新的 key 格式存储独立响应
- */
- static async storeSessionResponse(
- sessionId: string,
- response: string | object,
- requestSequence?: number
- ): Promise<void> {
- // 允许通过环境变量显式关闭响应体存储(例如隐私/节省 Redis 内存)。
- // 注意:这里仅关闭“写入 Redis”这一步;调用方仍然可能在内存中读取响应体用于统计或错误检测。
- if (!getEnvConfig().STORE_SESSION_RESPONSE_BODY) return;
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- let responseString: string;
- if (SessionManager.STORE_MESSAGES) {
- // 原样存储
- responseString = typeof response === "string" ? response : JSON.stringify(response);
- } else {
- // 尝试解析 JSON 并脱敏
- if (typeof response === "object") {
- responseString = JSON.stringify(redactResponseBody(response));
- } else {
- // 字符串响应 - 尝试解析为 JSON
- try {
- const parsed = JSON.parse(response);
- responseString = JSON.stringify(redactResponseBody(parsed));
- } catch {
- // 非 JSON(如 SSE 流),原样存储
- responseString = response;
- }
- }
- }
- // 新格式:session:{sessionId}:req:{sequence}:response(独立存储每个请求)
- // 旧格式:session:{sessionId}:response(向后兼容)
- const key = requestSequence
- ? `session:${sessionId}:req:${requestSequence}:response`
- : `session:${sessionId}:response`;
- await redis.setex(key, SessionManager.SESSION_TTL, responseString);
- logger.trace("SessionManager: Stored session response", {
- sessionId,
- requestSequence,
- size: responseString.length,
- redacted: !SessionManager.STORE_MESSAGES,
- });
- } catch (error) {
- logger.error("SessionManager: Failed to store session response", {
- error,
- });
- }
- }
- /**
- * 存储 session 完整请求体(客户端原始请求体,临时存储,5分钟过期)
- *
- * 存储策略受 STORE_SESSION_MESSAGES 控制:
- * - true:原样存储请求体内容
- * - false(默认):存储但对 message 内容脱敏 [REDACTED]
- *
- * @param sessionId - Session ID
- * @param requestBody - 请求体(完整 JSON)
- * @param requestSequence - 可选,请求序号
- */
- static async storeSessionRequestBody(
- sessionId: string,
- requestBody: unknown,
- requestSequence?: number
- ): Promise<void> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- const sequence = normalizeRequestSequence(requestSequence) ?? 1;
- const key = `session:${sessionId}:req:${sequence}:requestBody`;
- // 根据配置决定是否脱敏
- const bodyToStore = SessionManager.STORE_MESSAGES
- ? requestBody
- : redactRequestBody(requestBody);
- const payload = JSON.stringify(bodyToStore);
- await redis.setex(key, SessionManager.SESSION_TTL, payload);
- logger.trace("SessionManager: Stored session request body", {
- sessionId,
- requestSequence: sequence,
- key,
- size: payload.length,
- redacted: !SessionManager.STORE_MESSAGES,
- });
- } catch (error) {
- logger.error("SessionManager: Failed to store session request body", { error, sessionId });
- }
- }
- /**
- * 获取 session 完整请求体(客户端原始请求体,可能已脱敏)
- *
- * @param sessionId - Session ID
- * @param requestSequence - 请求序号
- * @returns 解析后的 JSON 对象(可能已脱敏)
- */
- static async getSessionRequestBody(
- sessionId: string,
- requestSequence?: number
- ): Promise<unknown | null> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return null;
- try {
- const sequence = normalizeRequestSequence(requestSequence);
- if (!sequence) return null;
- const key = `session:${sessionId}:req:${sequence}:requestBody`;
- const value = await redis.get(key);
- if (!value) return null;
- return JSON.parse(value) as unknown;
- } catch (error) {
- logger.error("SessionManager: Failed to get session request body", { error, sessionId });
- return null;
- }
- }
- /**
- * 存储特殊设置(审计字段,临时存储,5分钟过期)
- *
- * @param sessionId - Session ID
- * @param specialSettings - 特殊设置(可为空)
- * @param requestSequence - 请求序号
- */
- static async storeSessionSpecialSettings(
- sessionId: string,
- specialSettings: SpecialSetting[] | null,
- requestSequence?: number
- ): Promise<void> {
- if (!specialSettings || specialSettings.length === 0) {
- return;
- }
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- const sequence = normalizeRequestSequence(requestSequence) ?? 1;
- const key = `session:${sessionId}:req:${sequence}:specialSettings`;
- const payload = JSON.stringify(specialSettings);
- await redis.setex(key, SessionManager.SESSION_TTL, payload);
- } catch (error) {
- logger.error("SessionManager: Failed to store special settings", { error, sessionId });
- }
- }
- static async getSessionSpecialSettings(
- sessionId: string,
- requestSequence?: number
- ): Promise<SpecialSetting[] | null> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return null;
- try {
- const sequence = normalizeRequestSequence(requestSequence);
- if (!sequence) return null;
- const key = `session:${sessionId}:req:${sequence}:specialSettings`;
- const value = await redis.get(key);
- if (!value) return null;
- const parsed: unknown = JSON.parse(value);
- if (!Array.isArray(parsed)) return null;
- return parsed as SpecialSetting[];
- } catch (error) {
- logger.error("SessionManager: Failed to get special settings", { error, sessionId });
- return null;
- }
- }
- /**
- * 存储客户端请求元信息(端点/方法,临时存储,5分钟过期)
- *
- * @param sessionId - Session ID
- * @param meta - 元信息
- * @param requestSequence - 请求序号
- */
- static async storeSessionClientRequestMeta(
- sessionId: string,
- meta: { url: string | URL; method: string },
- requestSequence?: number
- ): Promise<void> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- const sequence = normalizeRequestSequence(requestSequence) ?? 1;
- const key = `session:${sessionId}:req:${sequence}:clientReqMeta`;
- const payload: SessionRequestMeta = {
- url: sanitizeUrl(meta.url),
- method: meta.method,
- };
- await redis.setex(key, SessionManager.SESSION_TTL, JSON.stringify(payload));
- } catch (error) {
- logger.error("SessionManager: Failed to store client request meta", { error, sessionId });
- }
- }
- static async getSessionClientRequestMeta(
- sessionId: string,
- requestSequence?: number
- ): Promise<SessionRequestMeta | null> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return null;
- try {
- const sequence = normalizeRequestSequence(requestSequence);
- if (!sequence) return null;
- const key = `session:${sessionId}:req:${sequence}:clientReqMeta`;
- const value = await redis.get(key);
- if (!value) return null;
- const parsed: unknown = JSON.parse(value);
- if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) return null;
- const obj = parsed as Record<string, unknown>;
- if (typeof obj.url !== "string" || typeof obj.method !== "string") return null;
- return { url: obj.url, method: obj.method };
- } catch (error) {
- logger.error("SessionManager: Failed to get client request meta", { error, sessionId });
- return null;
- }
- }
- /**
- * 存储上游请求元信息(端点/方法,临时存储,5分钟过期)
- *
- * @param sessionId - Session ID
- * @param meta - 元信息
- * @param requestSequence - 请求序号
- */
- static async storeSessionUpstreamRequestMeta(
- sessionId: string,
- meta: { url: string | URL; method: string },
- requestSequence?: number
- ): Promise<void> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- const sequence = normalizeRequestSequence(requestSequence) ?? 1;
- const key = `session:${sessionId}:req:${sequence}:upstreamReqMeta`;
- const payload: SessionRequestMeta = {
- url: sanitizeUrl(meta.url),
- method: meta.method,
- };
- await redis.setex(key, SessionManager.SESSION_TTL, JSON.stringify(payload));
- } catch (error) {
- logger.error("SessionManager: Failed to store upstream request meta", { error, sessionId });
- }
- }
- static async getSessionUpstreamRequestMeta(
- sessionId: string,
- requestSequence?: number
- ): Promise<SessionRequestMeta | null> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return null;
- try {
- const sequence = normalizeRequestSequence(requestSequence);
- if (!sequence) return null;
- const key = `session:${sessionId}:req:${sequence}:upstreamReqMeta`;
- const value = await redis.get(key);
- if (!value) return null;
- const parsed: unknown = JSON.parse(value);
- if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) return null;
- const obj = parsed as Record<string, unknown>;
- if (typeof obj.url !== "string" || typeof obj.method !== "string") return null;
- return { url: obj.url, method: obj.method };
- } catch (error) {
- logger.error("SessionManager: Failed to get upstream request meta", { error, sessionId });
- return null;
- }
- }
- /**
- * 存储上游响应元信息(端点/状态码,临时存储,5分钟过期)
- *
- * @param sessionId - Session ID
- * @param meta - 元信息
- * @param requestSequence - 请求序号
- */
- static async storeSessionUpstreamResponseMeta(
- sessionId: string,
- meta: { url: string | URL; statusCode: number },
- requestSequence?: number
- ): Promise<void> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- const sequence = normalizeRequestSequence(requestSequence) ?? 1;
- const key = `session:${sessionId}:req:${sequence}:upstreamResMeta`;
- const payload: SessionResponseMeta = {
- url: sanitizeUrl(meta.url),
- statusCode: meta.statusCode,
- };
- await redis.setex(key, SessionManager.SESSION_TTL, JSON.stringify(payload));
- } catch (error) {
- logger.error("SessionManager: Failed to store upstream response meta", { error, sessionId });
- }
- }
- static async getSessionUpstreamResponseMeta(
- sessionId: string,
- requestSequence?: number
- ): Promise<SessionResponseMeta | null> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return null;
- try {
- const sequence = normalizeRequestSequence(requestSequence);
- if (!sequence) return null;
- const key = `session:${sessionId}:req:${sequence}:upstreamResMeta`;
- const value = await redis.get(key);
- if (!value) return null;
- const parsed: unknown = JSON.parse(value);
- if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) return null;
- const obj = parsed as Record<string, unknown>;
- if (typeof obj.url !== "string" || typeof obj.statusCode !== "number") return null;
- return { url: obj.url, statusCode: obj.statusCode };
- } catch (error) {
- logger.error("SessionManager: Failed to get upstream response meta", { error, sessionId });
- return null;
- }
- }
- static async storeSessionRequestHeaders(
- sessionId: string,
- headers: Headers,
- requestSequence?: number
- ): Promise<void> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- const sequence = normalizeRequestSequence(requestSequence) ?? 1;
- const key = `session:${sessionId}:req:${sequence}:reqHeaders`;
- const headersJson = JSON.stringify(headersToSanitizedObject(headers));
- await redis.setex(key, SessionManager.SESSION_TTL, headersJson);
- logger.trace("SessionManager: Stored session request headers", {
- sessionId,
- requestSequence: sequence,
- key,
- });
- } catch (error) {
- logger.error("SessionManager: Failed to store session request headers", { error, sessionId });
- }
- }
- static async storeSessionResponseHeaders(
- sessionId: string,
- headers: Headers,
- requestSequence?: number
- ): Promise<void> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return;
- try {
- const sequence = normalizeRequestSequence(requestSequence) ?? 1;
- const key = `session:${sessionId}:req:${sequence}:resHeaders`;
- const headersJson = JSON.stringify(headersToSanitizedObject(headers));
- await redis.setex(key, SessionManager.SESSION_TTL, headersJson);
- logger.trace("SessionManager: Stored session response headers", {
- sessionId,
- requestSequence: sequence,
- key,
- });
- } catch (error) {
- logger.error("SessionManager: Failed to store session response headers", {
- error,
- sessionId,
- });
- }
- }
- static async getSessionRequestHeaders(
- sessionId: string,
- requestSequence?: number
- ): Promise<Record<string, string> | null> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return null;
- try {
- const sequence = normalizeRequestSequence(requestSequence);
- if (!sequence) return null;
- const key = `session:${sessionId}:req:${sequence}:reqHeaders`;
- const value = await redis.get(key);
- if (!value) return null;
- return parseHeaderRecord(value);
- } catch (error) {
- logger.error("SessionManager: Failed to get session request headers", { error, sessionId });
- return null;
- }
- }
- static async getSessionResponseHeaders(
- sessionId: string,
- requestSequence?: number
- ): Promise<Record<string, string> | null> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return null;
- try {
- const sequence = normalizeRequestSequence(requestSequence);
- if (!sequence) return null;
- const key = `session:${sessionId}:req:${sequence}:resHeaders`;
- const value = await redis.get(key);
- if (!value) return null;
- return parseHeaderRecord(value);
- } catch (error) {
- logger.error("SessionManager: Failed to get session response headers", { error, sessionId });
- return null;
- }
- }
- /**
- * 获取 session 响应体
- *
- * @param sessionId - Session ID
- * @param requestSequence - 可选,请求序号。提供时读取特定请求的响应
- * @returns 响应体内容(字符串)
- */
- static async getSessionResponse(
- sessionId: string,
- requestSequence?: number
- ): Promise<string | null> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") return null;
- try {
- // 优先尝试新格式
- if (requestSequence) {
- const newKey = `session:${sessionId}:req:${requestSequence}:response`;
- const response = await redis.get(newKey);
- if (response) return response;
- }
- // 向后兼容:尝试旧格式
- const legacyKey = `session:${sessionId}:response`;
- const response = await redis.get(legacyKey);
- return response;
- } catch (error) {
- logger.error("SessionManager: Failed to get session response", { error });
- return null;
- }
- }
- /**
- * 从 Codex 响应中提取 prompt_cache_key 作为 Session ID
- *
- * Codex 响应中包含 prompt_cache_key 字段(UUID 格式),用于标识缓存上下文。
- * 这个字段出现在 response.created、response.in_progress、response.completed 等事件中。
- *
- * @param responseData - Codex 响应数据(流式事件的 data 部分或完整响应)
- * @returns prompt_cache_key 或 null
- */
- static extractCodexPromptCacheKey(responseData: Record<string, unknown>): string | null {
- // 检查 response 对象中的 prompt_cache_key(SSE 事件格式)
- const response = responseData.response as Record<string, unknown> | undefined;
- if (
- response &&
- typeof response.prompt_cache_key === "string" &&
- response.prompt_cache_key.length > 0
- ) {
- logger.trace("SessionManager: Extracted prompt_cache_key from response object", {
- promptCacheKey: response.prompt_cache_key,
- });
- return response.prompt_cache_key;
- }
- // 备选:直接在顶层检查(非流式响应格式)
- if (
- typeof responseData.prompt_cache_key === "string" &&
- responseData.prompt_cache_key.length > 0
- ) {
- logger.trace("SessionManager: Extracted prompt_cache_key from top level", {
- promptCacheKey: responseData.prompt_cache_key,
- });
- return responseData.prompt_cache_key;
- }
- return null;
- }
- /**
- * 使用 Codex 的 prompt_cache_key 更新 Session 绑定
- *
- * 策略:如果响应中包含 prompt_cache_key,使用它作为 Session ID 的来源。
- * 这类似于 Claude 从请求 metadata 中提取 session_id 的机制。
- *
- * Session ID 格式:codex_{prompt_cache_key}(添加前缀以区分来源)
- *
- * @param currentSessionId - 当前的 Session ID(可能是生成的或从请求提取的)
- * @param promptCacheKey - Codex 响应中的 prompt_cache_key
- * @param providerId - 供应商 ID
- * @returns 更新后的 Session ID 和是否创建了新绑定
- */
- static async updateSessionWithCodexCacheKey(
- currentSessionId: string,
- promptCacheKey: string,
- providerId: number
- ): Promise<{ sessionId: string; updated: boolean }> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") {
- logger.debug("SessionManager: Redis not ready, skipping Codex session update");
- return { sessionId: currentSessionId, updated: false };
- }
- try {
- // 使用 prompt_cache_key 作为新的 Session ID(添加前缀以区分)
- const codexSessionId = `codex_${promptCacheKey}`;
- // 检查是否已经存在绑定
- const existingProvider = await redis.get(`session:${codexSessionId}:provider`);
- if (existingProvider) {
- // 已存在绑定,刷新 TTL
- await redis.expire(`session:${codexSessionId}:provider`, SessionManager.SESSION_TTL);
- logger.debug("SessionManager: Refreshed Codex session TTL", {
- sessionId: codexSessionId,
- providerId: parseInt(existingProvider, 10),
- });
- return { sessionId: codexSessionId, updated: false };
- }
- // 新建绑定
- await redis.set(
- `session:${codexSessionId}:provider`,
- providerId.toString(),
- "EX",
- SessionManager.SESSION_TTL
- );
- logger.info("SessionManager: Created Codex session from prompt_cache_key", {
- sessionId: codexSessionId,
- promptCacheKey,
- providerId,
- ttl: SessionManager.SESSION_TTL,
- });
- return { sessionId: codexSessionId, updated: true };
- } catch (error) {
- logger.error("SessionManager: Failed to update Codex session", { error });
- return { sessionId: currentSessionId, updated: false };
- }
- }
- /**
- * 终止 Session(主动打断)
- *
- * 功能:删除 Session 在 Redis 中的所有绑定关系,强制下次请求重新选择供应商
- * 用途:管理员主动打断长时间占用同一供应商的 Session
- *
- * @param sessionId - Session ID
- * @returns 是否成功删除
- */
- static async terminateSession(sessionId: string): Promise<boolean> {
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") {
- logger.warn("SessionManager: Redis not ready, cannot terminate session");
- return false;
- }
- try {
- // 1. 先查询绑定信息(用于从 ZSET 中移除)
- let providerId: number | null = null;
- let keyId: number | null = null;
- try {
- const [providerIdStr, keyIdStr] = await Promise.all([
- redis.get(`session:${sessionId}:provider`),
- redis.get(`session:${sessionId}:key`),
- ]);
- providerId = providerIdStr ? parseInt(providerIdStr, 10) : null;
- keyId = keyIdStr ? parseInt(keyIdStr, 10) : null;
- } catch (lookupError) {
- // Redis 查询失败不应阻止清理操作,继续执行删除
- logger.warn(
- "SessionManager: Failed to lookup session binding info, continuing with cleanup",
- {
- sessionId,
- error: lookupError,
- }
- );
- }
- // 2. 删除所有 Session 相关的 key
- const pipeline = redis.pipeline();
- // 基础绑定信息
- pipeline.del(`session:${sessionId}:provider`);
- pipeline.del(`session:${sessionId}:key`);
- pipeline.del(`session:${sessionId}:info`);
- pipeline.del(`session:${sessionId}:last_seen`);
- pipeline.del(`session:${sessionId}:concurrent_count`);
- // 可选:messages 和 response(如果启用了存储)
- pipeline.del(`session:${sessionId}:messages`);
- pipeline.del(`session:${sessionId}:response`);
- // 3. 从 ZSET 中移除(始终尝试,即使查询失败)
- pipeline.zrem("global:active_sessions", sessionId);
- if (providerId) {
- pipeline.zrem(`provider:${providerId}:active_sessions`, sessionId);
- }
- if (keyId) {
- pipeline.zrem(`key:${keyId}:active_sessions`, sessionId);
- }
- // 4. 删除 hash 映射(如果存在)
- // 注意:无法直接反查 hash,只能清理已知的 session key
- // hash 会在 TTL 后自动过期,不影响功能
- const results = await pipeline.exec();
- // 5. 检查结果
- let deletedKeys = 0;
- if (results) {
- for (const [err, result] of results) {
- if (!err && typeof result === "number" && result > 0) {
- deletedKeys += result;
- }
- }
- }
- logger.info("SessionManager: Terminated session", {
- sessionId,
- providerId,
- keyId,
- deletedKeys,
- });
- return deletedKeys > 0;
- } catch (error) {
- logger.error("SessionManager: Failed to terminate session", {
- error,
- sessionId,
- });
- return false;
- }
- }
- /**
- * 批量终止 Session
- *
- * 采用分块处理策略,避免大批量操作时对 Redis 造成过大压力
- *
- * @param sessionIds - Session ID 列表
- * @returns 成功终止的数量
- */
- static async terminateSessionsBatch(sessionIds: string[]): Promise<number> {
- if (sessionIds.length === 0) {
- return 0;
- }
- const redis = getRedisClient();
- if (!redis || redis.status !== "ready") {
- logger.warn("SessionManager: Redis not ready, cannot terminate sessions");
- return 0;
- }
- try {
- // 分块处理,每批 20 个,避免并发过高
- const CHUNK_SIZE = 20;
- let successCount = 0;
- for (let i = 0; i < sessionIds.length; i += CHUNK_SIZE) {
- const chunk = sessionIds.slice(i, i + CHUNK_SIZE);
- const results = await Promise.all(
- chunk.map(async (sessionId) => {
- const success = await SessionManager.terminateSession(sessionId);
- return success ? 1 : 0;
- })
- );
- successCount += results.reduce<number>((sum, value) => sum + value, 0);
- }
- logger.info("SessionManager: Terminated sessions batch", {
- total: sessionIds.length,
- successCount,
- });
- return successCount;
- } catch (error) {
- logger.error("SessionManager: Failed to terminate sessions batch", {
- error,
- });
- return 0;
- }
- }
- }
- export { headersToSanitizedObject, parseHeaderRecord };
|