session-manager.ts 67 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046
  1. import "server-only";
  2. import crypto from "node:crypto";
  3. import { extractCodexSessionId } from "@/app/v1/_lib/codex/session-extractor";
  4. import { sanitizeHeaders, sanitizeUrl } from "@/app/v1/_lib/proxy/errors";
  5. import { getEnvConfig } from "@/lib/config/env.schema";
  6. import { logger } from "@/lib/logger";
  7. import {
  8. redactMessages,
  9. redactRequestBody,
  10. redactResponseBody,
  11. } from "@/lib/utils/message-redaction";
  12. import { normalizeRequestSequence } from "@/lib/utils/request-sequence";
  13. import type {
  14. ActiveSessionInfo,
  15. SessionProviderInfo,
  16. SessionStoreInfo,
  17. SessionUsageUpdate,
  18. } from "@/types/session";
  19. import type { SpecialSetting } from "@/types/special-settings";
  20. import { getRedisClient } from "./redis";
  21. import { SessionTracker } from "./session-tracker";
  22. function headersToSanitizedObject(headers: Headers): Record<string, string> {
  23. const sanitizedText = sanitizeHeaders(headers);
  24. if (!sanitizedText || sanitizedText === "(empty)") {
  25. return {};
  26. }
  27. const obj: Record<string, string> = {};
  28. const lines = sanitizedText.split(/\r?\n/).filter(Boolean);
  29. for (const line of lines) {
  30. const colonIndex = line.indexOf(":");
  31. if (colonIndex === -1) continue;
  32. const name = line.slice(0, colonIndex).trim();
  33. const value = line.slice(colonIndex + 1).trim();
  34. if (!name) continue;
  35. if (obj[name]) {
  36. obj[name] = `${obj[name]}\n${value}`;
  37. } else {
  38. obj[name] = value;
  39. }
  40. }
  41. return obj;
  42. }
  43. function parseHeaderRecord(value: string): Record<string, string> | null {
  44. try {
  45. const parsed: unknown = JSON.parse(value);
  46. if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) return null;
  47. const record: Record<string, string> = {};
  48. for (const [key, raw] of Object.entries(parsed as Record<string, unknown>)) {
  49. if (typeof raw === "string") {
  50. record[key] = raw;
  51. }
  52. }
  53. return record;
  54. } catch (error) {
  55. logger.warn("SessionManager: Failed to parse header record JSON", { error });
  56. return null;
  57. }
  58. }
  59. type SessionRequestMeta = {
  60. url: string;
  61. method: string;
  62. };
  63. type SessionResponseMeta = {
  64. url: string;
  65. statusCode: number;
  66. };
  67. /**
  68. * Session 管理器
  69. *
  70. * 核心功能:
  71. * 1. 基于 messages 内容哈希识别 session
  72. * 2. 管理 session 与 provider 的绑定关系
  73. * 3. 支持客户端主动传递 session_id
  74. * 4. 存储和查询活跃 session 详细信息(用于实时监控)
  75. */
  76. export class SessionManager {
  77. private static readonly SESSION_TTL = parseInt(process.env.SESSION_TTL || "300", 10); // 5 分钟
  78. private static readonly SHORT_CONTEXT_THRESHOLD = parseInt(
  79. process.env.SHORT_CONTEXT_THRESHOLD || "2",
  80. 10
  81. ); // 短上下文阈值
  82. private static readonly ENABLE_SHORT_CONTEXT_DETECTION =
  83. process.env.ENABLE_SHORT_CONTEXT_DETECTION !== "false"; // 默认启用
  84. /**
  85. * 获取 STORE_SESSION_MESSAGES 配置
  86. * - true:原样存储 message 内容
  87. * - false(默认):存储但对 message 内容脱敏 [REDACTED]
  88. */
  89. private static get STORE_MESSAGES(): boolean {
  90. return getEnvConfig().STORE_SESSION_MESSAGES;
  91. }
  92. /**
  93. * 从客户端请求中提取 session_id(支持 metadata 或 header)
  94. *
  95. * 优先级:
  96. * 1. metadata.user_id (Claude Code 主要方式,典型格式: "user_{hash}_account__session_{sessionId}")
  97. * 2. metadata.session_id (备选方式)
  98. */
  99. static extractClientSessionId(
  100. requestMessage: Record<string, unknown>,
  101. headers?: Headers | null,
  102. _userAgent?: string | null
  103. ): string | null {
  104. // Codex 请求:优先尝试从 headers/body 提取稳定的 session_id
  105. if (headers && Array.isArray(requestMessage.input)) {
  106. const result = extractCodexSessionId(headers, requestMessage);
  107. if (result.sessionId) {
  108. logger.trace("SessionManager: Extracted session from Codex request", {
  109. sessionId: result.sessionId,
  110. source: result.source,
  111. });
  112. return result.sessionId;
  113. }
  114. return null;
  115. }
  116. const metadata = requestMessage.metadata;
  117. if (!metadata || typeof metadata !== "object") {
  118. return null;
  119. }
  120. const metadataObj = metadata as Record<string, unknown>;
  121. // 方案 A: 从 metadata.user_id 中提取 (Claude Code 主要方式)
  122. // 典型格式: "user_{hash}_account__session_{sessionId}"
  123. if (typeof metadataObj.user_id === "string" && metadataObj.user_id.length > 0) {
  124. const userId = metadataObj.user_id;
  125. const sessionMarker = "_session_";
  126. const markerIndex = userId.indexOf(sessionMarker);
  127. if (markerIndex !== -1) {
  128. const extractedSessionId = userId.substring(markerIndex + sessionMarker.length);
  129. if (extractedSessionId.length > 0) {
  130. logger.trace("SessionManager: Extracted session from metadata.user_id", {
  131. sessionId: extractedSessionId,
  132. });
  133. return extractedSessionId;
  134. }
  135. }
  136. }
  137. // 方案 B: 直接从 metadata.session_id 读取 (备选方案)
  138. if (typeof metadataObj.session_id === "string" && metadataObj.session_id.length > 0) {
  139. logger.trace("SessionManager: Extracted session from metadata.session_id", {
  140. sessionId: metadataObj.session_id,
  141. });
  142. return metadataObj.session_id;
  143. }
  144. return null;
  145. }
  146. /**
  147. * 生成新的 session_id
  148. * 格式:sess_{timestamp}_{random}
  149. */
  150. static generateSessionId(): string {
  151. const timestamp = Date.now().toString(36);
  152. const random = crypto.randomBytes(6).toString("hex");
  153. return `sess_${timestamp}_${random}`;
  154. }
  155. /**
  156. * 获取 Session 内下一个请求序号(原子操作)
  157. *
  158. * 使用 Redis INCR 保证并发安全,序号从 1 开始递增
  159. * 每个请求在同一 Session 内获得唯一序号,用于独立存储 messages
  160. *
  161. * @param sessionId - Session ID
  162. * @returns 请求序号(从 1 开始),Redis 不可用时返回基于时间戳的唯一序号
  163. */
  164. static async getNextRequestSequence(sessionId: string): Promise<number> {
  165. const redis = getRedisClient();
  166. if (!redis || redis.status !== "ready") {
  167. // 改进的 fallback:使用时间戳 + 随机数生成伪唯一序号
  168. // 避免 Redis 不可用时所有请求都返回 1 导致的冲突
  169. const fallbackSeq = (Date.now() % 1000000) + Math.floor(Math.random() * 1000);
  170. logger.warn("SessionManager: Redis not ready, using fallback sequence", {
  171. sessionId,
  172. fallbackSeq,
  173. });
  174. return fallbackSeq;
  175. }
  176. try {
  177. const key = `session:${sessionId}:seq`;
  178. const sequence = await redis.incr(key);
  179. // 首次创建时设置过期时间
  180. if (sequence === 1) {
  181. await redis.expire(key, SessionManager.SESSION_TTL);
  182. }
  183. logger.trace("SessionManager: Got next request sequence", {
  184. sessionId,
  185. sequence,
  186. });
  187. return sequence;
  188. } catch (error) {
  189. // 改进的 fallback:使用时间戳 + 随机数生成伪唯一序号
  190. const fallbackSeq = (Date.now() % 1000000) + Math.floor(Math.random() * 1000);
  191. logger.error("SessionManager: Failed to get request sequence, using fallback", {
  192. error,
  193. sessionId,
  194. fallbackSeq,
  195. });
  196. return fallbackSeq;
  197. }
  198. }
  199. /**
  200. * 获取 Session 当前的请求计数
  201. *
  202. * @param sessionId - Session ID
  203. * @returns 当前请求数量,不存在返回 0
  204. */
  205. static async getSessionRequestCount(sessionId: string): Promise<number> {
  206. const redis = getRedisClient();
  207. if (!redis || redis.status !== "ready") return 0;
  208. try {
  209. const count = await redis.get(`session:${sessionId}:seq`);
  210. return count ? parseInt(count, 10) : 0;
  211. } catch (error) {
  212. logger.error("SessionManager: Failed to get request count", {
  213. error,
  214. sessionId,
  215. });
  216. return 0;
  217. }
  218. }
  219. /**
  220. * 计算 messages 内容哈希(用于 session 匹配)
  221. *
  222. * ⚠️ 注意: 这是一个降级方案,仅在无法从 metadata 提取 session ID 时使用
  223. * 不同会话如果开头相似可能产生相同哈希,因此优先使用 metadata.user_id
  224. *
  225. * @param messages - 消息数组
  226. * @returns 哈希值(16 字符)或 null
  227. */
  228. static calculateMessagesHash(messages: unknown): string | null {
  229. if (!Array.isArray(messages) || messages.length === 0) {
  230. logger.trace("SessionManager: calculateMessagesHash - messages is empty or not array");
  231. return null;
  232. }
  233. // 计算范围:前 N 条(N = min(length, 3))
  234. const count = Math.min(messages.length, 3);
  235. const contents: string[] = [];
  236. for (let i = 0; i < count; i++) {
  237. const message = messages[i];
  238. if (message && typeof message === "object") {
  239. const messageObj = message as Record<string, unknown>;
  240. const content = messageObj.content;
  241. if (typeof content === "string") {
  242. contents.push(content);
  243. logger.trace("SessionManager: Message content (string)", {
  244. index: i,
  245. preview: content.substring(0, 100),
  246. });
  247. } else if (Array.isArray(content)) {
  248. // 支持多模态 content(数组格式)
  249. const textParts = content
  250. .filter(
  251. (item) =>
  252. item &&
  253. typeof item === "object" &&
  254. (item as Record<string, unknown>).type === "text"
  255. )
  256. .map((item) => (item as Record<string, unknown>).text);
  257. const joined = textParts.join("");
  258. contents.push(joined);
  259. logger.trace("SessionManager: Message content (array)", {
  260. index: i,
  261. preview: joined.substring(0, 100),
  262. });
  263. } else {
  264. logger.trace("SessionManager: Message content type (skipped)", {
  265. index: i,
  266. type: typeof content,
  267. });
  268. }
  269. }
  270. }
  271. if (contents.length === 0) {
  272. logger.trace("SessionManager: calculateMessagesHash - no valid contents extracted");
  273. return null;
  274. }
  275. // 拼接并计算 SHA-256 哈希
  276. const combined = contents.join("|");
  277. const hash = crypto.createHash("sha256").update(combined, "utf8").digest("hex");
  278. // 截取前 16 字符(足够区分,节省存储)
  279. const shortHash = hash.substring(0, 16);
  280. logger.trace("SessionManager: Calculated hash", {
  281. hash: shortHash,
  282. messageCount: contents.length,
  283. totalChars: combined.length,
  284. });
  285. return shortHash;
  286. }
  287. /**
  288. * 获取或创建 session_id(核心方法)
  289. *
  290. * @param keyId - API Key ID
  291. * @param messages - 消息数组
  292. * @param clientSessionId - 客户端传递的 session_id(可选)
  293. * @returns session_id
  294. */
  295. static async getOrCreateSessionId(
  296. keyId: number,
  297. messages: unknown,
  298. clientSessionId?: string | null
  299. ): Promise<string> {
  300. const redis = getRedisClient();
  301. const messagesLength = Array.isArray(messages) ? messages.length : 0;
  302. logger.trace("SessionManager: getOrCreateSessionId called", {
  303. keyId,
  304. hasClientSession: !!clientSessionId,
  305. messagesLength,
  306. });
  307. // 1. 优先使用客户端传递的 session_id (来自 metadata.user_id 或 metadata.session_id)
  308. if (clientSessionId) {
  309. // 2. 短上下文并发检测(方案E)
  310. if (
  311. SessionManager.ENABLE_SHORT_CONTEXT_DETECTION &&
  312. messagesLength <= SessionManager.SHORT_CONTEXT_THRESHOLD
  313. ) {
  314. // 检查该 session 是否有其他请求正在运行
  315. const concurrentCount = await SessionTracker.getConcurrentCount(clientSessionId);
  316. if (concurrentCount > 0) {
  317. // 场景B:有并发请求 → 这是并发短任务 → 强制新建 session
  318. const newId = SessionManager.generateSessionId();
  319. logger.info("SessionManager: 检测到并发短任务,强制新建 session", {
  320. originalSessionId: clientSessionId,
  321. newSessionId: newId,
  322. messagesLength,
  323. existingConcurrentCount: concurrentCount,
  324. });
  325. return newId;
  326. }
  327. // 场景A:无并发 → 这可能是长对话的开始 → 允许复用
  328. logger.debug("SessionManager: 短上下文但 session 空闲,允许复用(长对话开始)", {
  329. sessionId: clientSessionId,
  330. messagesLength,
  331. });
  332. }
  333. // 3. 长上下文 or 无并发 → 正常复用
  334. logger.debug("SessionManager: Using client-provided session", {
  335. sessionId: clientSessionId,
  336. });
  337. // 刷新 TTL(滑动窗口)
  338. if (redis && redis.status === "ready") {
  339. await SessionManager.refreshSessionTTL(clientSessionId).catch((err) => {
  340. logger.error("SessionManager: Failed to refresh TTL", { error: err });
  341. });
  342. }
  343. return clientSessionId;
  344. }
  345. // 2. 降级方案:计算 messages 内容哈希(TC-047 警告:不可靠)
  346. logger.warn(
  347. "SessionManager: No client session ID, falling back to content hash (unreliable for compressed dialogs)",
  348. {
  349. keyId,
  350. messagesLength: Array.isArray(messages) ? messages.length : 0,
  351. }
  352. );
  353. const contentHash = SessionManager.calculateMessagesHash(messages);
  354. if (!contentHash) {
  355. // 降级:无法计算哈希,生成新 session
  356. const newId = SessionManager.generateSessionId();
  357. logger.warn("SessionManager: Cannot calculate hash, generating new session", {
  358. sessionId: newId,
  359. });
  360. return newId;
  361. }
  362. // 3. 尝试从 Redis 查找已有 session
  363. if (redis && redis.status === "ready") {
  364. try {
  365. const hashKey = `hash:${contentHash}:session`;
  366. const existingSessionId = await redis.get(hashKey);
  367. if (existingSessionId) {
  368. // 找到已有 session,刷新 TTL
  369. await SessionManager.refreshSessionTTL(existingSessionId);
  370. logger.trace("SessionManager: Reusing session via hash", {
  371. sessionId: existingSessionId,
  372. hash: contentHash,
  373. });
  374. return existingSessionId;
  375. }
  376. // 未找到:创建新 session
  377. const newSessionId = SessionManager.generateSessionId();
  378. // 存储映射关系(异步,不阻塞)
  379. void SessionManager.storeSessionMapping(contentHash, newSessionId, keyId);
  380. logger.trace("SessionManager: Created new session with hash", {
  381. sessionId: newSessionId,
  382. hash: contentHash,
  383. });
  384. return newSessionId;
  385. } catch (error) {
  386. logger.error("SessionManager: Redis error", { error });
  387. // 降级:Redis 错误,生成新 session
  388. return SessionManager.generateSessionId();
  389. }
  390. }
  391. // 4. Redis 不可用,降级生成新 session
  392. return SessionManager.generateSessionId();
  393. }
  394. /**
  395. * 存储 hash → session 映射关系
  396. */
  397. private static async storeSessionMapping(
  398. contentHash: string,
  399. sessionId: string,
  400. keyId: number
  401. ): Promise<void> {
  402. const redis = getRedisClient();
  403. if (!redis || redis.status !== "ready") return;
  404. try {
  405. const pipeline = redis.pipeline();
  406. const hashKey = `hash:${contentHash}:session`;
  407. // 存储映射关系
  408. pipeline.setex(hashKey, SessionManager.SESSION_TTL, sessionId);
  409. // 初始化 session 元数据
  410. pipeline.setex(`session:${sessionId}:key`, SessionManager.SESSION_TTL, keyId.toString());
  411. pipeline.setex(
  412. `session:${sessionId}:last_seen`,
  413. SessionManager.SESSION_TTL,
  414. Date.now().toString()
  415. );
  416. await pipeline.exec();
  417. } catch (error) {
  418. logger.error("SessionManager: Failed to store session mapping", {
  419. error,
  420. });
  421. }
  422. }
  423. /**
  424. * 刷新 session TTL(滑动窗口)
  425. */
  426. private static async refreshSessionTTL(sessionId: string): Promise<void> {
  427. const redis = getRedisClient();
  428. if (!redis || redis.status !== "ready") return;
  429. try {
  430. const pipeline = redis.pipeline();
  431. // 刷新所有 session 相关 key 的 TTL
  432. pipeline.expire(`session:${sessionId}:key`, SessionManager.SESSION_TTL);
  433. pipeline.expire(`session:${sessionId}:provider`, SessionManager.SESSION_TTL);
  434. pipeline.setex(
  435. `session:${sessionId}:last_seen`,
  436. SessionManager.SESSION_TTL,
  437. Date.now().toString()
  438. );
  439. await pipeline.exec();
  440. } catch (error) {
  441. logger.error("SessionManager: Failed to refresh TTL", { error });
  442. }
  443. }
  444. /**
  445. * 绑定 session 到 provider(TC-009 修复:使用 SET NX 避免竞态条件)
  446. */
  447. static async bindSessionToProvider(sessionId: string, providerId: number): Promise<void> {
  448. const redis = getRedisClient();
  449. if (!redis || redis.status !== "ready") return;
  450. try {
  451. const key = `session:${sessionId}:provider`;
  452. // 使用 SET ... NX 保证只有第一次绑定成功(原子操作)
  453. const result = await redis.set(
  454. key,
  455. providerId.toString(),
  456. "EX",
  457. SessionManager.SESSION_TTL,
  458. "NX" // Only set if not exists
  459. );
  460. if (result === "OK") {
  461. logger.trace("SessionManager: Bound session to provider", {
  462. sessionId,
  463. providerId,
  464. });
  465. } else {
  466. // 已绑定过,不覆盖(避免并发请求选择不同供应商)
  467. logger.debug("SessionManager: Session already bound, skipping", {
  468. sessionId,
  469. attemptedProviderId: providerId,
  470. });
  471. }
  472. } catch (error) {
  473. logger.error("SessionManager: Failed to bind provider", { error });
  474. }
  475. }
  476. /**
  477. * 获取 session 绑定的 provider
  478. */
  479. static async getSessionProvider(sessionId: string): Promise<number | null> {
  480. const redis = getRedisClient();
  481. if (!redis || redis.status !== "ready") return null;
  482. try {
  483. const value = await redis.get(`session:${sessionId}:provider`);
  484. if (value) {
  485. const providerId = parseInt(value, 10);
  486. if (!Number.isNaN(providerId)) {
  487. return providerId;
  488. }
  489. }
  490. } catch (error) {
  491. logger.error("SessionManager: Failed to get session provider", { error });
  492. }
  493. return null;
  494. }
  495. /**
  496. * 获取当前绑定供应商的优先级
  497. *
  498. * ⚠️ 修复:从 session:provider 读取(真实绑定),而不是 session:info
  499. * 原因:info.providerId 是并发检查通过的供应商,可能请求失败了
  500. *
  501. * @param sessionId - Session ID
  502. * @returns 优先级数字(数字越小优先级越高),如果未绑定或无法查询则返回 null
  503. */
  504. static async getSessionProviderPriority(sessionId: string): Promise<number | null> {
  505. const redis = getRedisClient();
  506. if (!redis || redis.status !== "ready") return null;
  507. try {
  508. // 修复:从真实绑定关系读取(session:provider)
  509. const providerIdStr = await redis.get(`session:${sessionId}:provider`);
  510. if (!providerIdStr) {
  511. return null;
  512. }
  513. const providerId = parseInt(providerIdStr, 10);
  514. if (Number.isNaN(providerId)) {
  515. return null;
  516. }
  517. // 查询供应商详情获取优先级
  518. const { findProviderById } = await import("@/repository/provider");
  519. const provider = await findProviderById(providerId);
  520. if (!provider) {
  521. logger.warn("SessionManager: Bound provider not found", { providerId });
  522. return null;
  523. }
  524. return provider.priority;
  525. } catch (error) {
  526. logger.error("SessionManager: Failed to get session provider priority", {
  527. error,
  528. });
  529. return null;
  530. }
  531. }
  532. /**
  533. * 智能更新 Session 绑定
  534. *
  535. * 策略:首次绑定用 SET NX;故障转移后无条件更新;其他情况按优先级和熔断状态决策
  536. */
  537. static async updateSessionBindingSmart(
  538. sessionId: string,
  539. newProviderId: number,
  540. newProviderPriority: number,
  541. isFirstAttempt: boolean = false,
  542. isFailoverSuccess: boolean = false
  543. ): Promise<{ updated: boolean; reason: string; details?: string }> {
  544. const redis = getRedisClient();
  545. if (!redis || redis.status !== "ready") {
  546. return { updated: false, reason: "redis_not_ready" };
  547. }
  548. try {
  549. // ========== 情况 1:首次尝试成功 ==========
  550. if (isFirstAttempt) {
  551. const key = `session:${sessionId}:provider`;
  552. // 使用 SET NX 绑定(避免覆盖并发请求)
  553. const result = await redis.set(
  554. key,
  555. newProviderId.toString(),
  556. "EX",
  557. SessionManager.SESSION_TTL,
  558. "NX"
  559. );
  560. if (result === "OK") {
  561. logger.info("SessionManager: Bound session to provider (first success)", {
  562. sessionId,
  563. providerId: newProviderId,
  564. priority: newProviderPriority,
  565. });
  566. return {
  567. updated: true,
  568. reason: "first_success",
  569. details: `首次成功,绑定到供应商 ${newProviderId} (priority=${newProviderPriority})`,
  570. };
  571. } else {
  572. // 并发请求已经绑定了,放弃更新
  573. return {
  574. updated: false,
  575. reason: "concurrent_binding_exists",
  576. details: "并发请求已绑定,跳过",
  577. };
  578. }
  579. }
  580. // ========== 情况 2:重试成功(需要智能决策)==========
  581. // 2.0 故障转移成功:无条件更新绑定(减少缓存切换)
  582. if (isFailoverSuccess) {
  583. const key = `session:${sessionId}:provider`;
  584. await redis.setex(key, SessionManager.SESSION_TTL, newProviderId.toString());
  585. logger.info("SessionManager: Updated binding after failover", {
  586. sessionId,
  587. newProviderId,
  588. newPriority: newProviderPriority,
  589. });
  590. return {
  591. updated: true,
  592. reason: "failover_success",
  593. details: `故障转移成功,绑定到供应商 ${newProviderId}`,
  594. };
  595. }
  596. // 2.1 获取当前绑定的供应商 ID
  597. const currentProviderIdStr = await redis.get(`session:${sessionId}:provider`);
  598. if (!currentProviderIdStr) {
  599. // 没有绑定,使用 SET NX 绑定
  600. const key = `session:${sessionId}:provider`;
  601. const result = await redis.set(
  602. key,
  603. newProviderId.toString(),
  604. "EX",
  605. SessionManager.SESSION_TTL,
  606. "NX"
  607. );
  608. if (result === "OK") {
  609. logger.info("SessionManager: Bound session (no previous binding)", {
  610. sessionId,
  611. providerId: newProviderId,
  612. priority: newProviderPriority,
  613. });
  614. return {
  615. updated: true,
  616. reason: "no_previous_binding",
  617. details: `无绑定,绑定到供应商 ${newProviderId} (priority=${newProviderPriority})`,
  618. };
  619. } else {
  620. return {
  621. updated: false,
  622. reason: "concurrent_binding_exists",
  623. details: "并发请求已绑定",
  624. };
  625. }
  626. }
  627. const currentProviderId = parseInt(currentProviderIdStr, 10);
  628. if (Number.isNaN(currentProviderId)) {
  629. logger.warn("SessionManager: Invalid provider ID in Redis", {
  630. currentProviderIdStr,
  631. });
  632. return { updated: false, reason: "invalid_provider_id" };
  633. }
  634. // 2.2 查询当前供应商的详情(优先级 + 健康状态)
  635. const { findProviderById } = await import("@/repository/provider");
  636. const currentProvider = await findProviderById(currentProviderId);
  637. if (!currentProvider) {
  638. // 当前供应商不存在(可能被删除),直接更新
  639. const key = `session:${sessionId}:provider`;
  640. await redis.setex(key, SessionManager.SESSION_TTL, newProviderId.toString());
  641. logger.info("SessionManager: Updated binding (current provider not found)", {
  642. sessionId,
  643. oldProviderId: currentProviderId,
  644. newProviderId,
  645. newPriority: newProviderPriority,
  646. });
  647. return {
  648. updated: true,
  649. reason: "current_provider_not_found",
  650. details: `原供应商 ${currentProviderId} 不存在,更新到 ${newProviderId}`,
  651. };
  652. }
  653. const currentPriority = currentProvider.priority || 0;
  654. // 2.3 智能决策:优先级比较 + 健康检查
  655. // ========== 规则 A:新供应商优先级更高(数字更小)→ 直接迁移 ==========
  656. if (newProviderPriority < currentPriority) {
  657. const key = `session:${sessionId}:provider`;
  658. await redis.setex(key, SessionManager.SESSION_TTL, newProviderId.toString());
  659. logger.info("SessionManager: Migrated to higher priority provider", {
  660. sessionId,
  661. oldProviderId: currentProviderId,
  662. oldProviderName: currentProvider.name,
  663. oldPriority: currentPriority,
  664. newProviderId,
  665. newPriority: newProviderPriority,
  666. });
  667. return {
  668. updated: true,
  669. reason: "priority_upgrade",
  670. details: `优先级升级:从供应商 ${currentProvider.name} (priority=${currentPriority}) 迁移到 ${newProviderId} (priority=${newProviderPriority})`,
  671. };
  672. }
  673. // ========== 规则 B:新供应商优先级相同或更低 → 检查原供应商健康状态 ==========
  674. const { isCircuitOpen } = await import("@/lib/circuit-breaker");
  675. const isCurrentCircuitOpen = await isCircuitOpen(currentProviderId);
  676. if (isCurrentCircuitOpen) {
  677. // 原供应商已熔断 → 更新到新供应商(备用供应商接管)
  678. const key = `session:${sessionId}:provider`;
  679. await redis.setex(key, SessionManager.SESSION_TTL, newProviderId.toString());
  680. logger.info("SessionManager: Migrated to backup provider (circuit open)", {
  681. sessionId,
  682. oldProviderId: currentProviderId,
  683. oldProviderName: currentProvider.name,
  684. oldPriority: currentPriority,
  685. newProviderId,
  686. newPriority: newProviderPriority,
  687. });
  688. return {
  689. updated: true,
  690. reason: "circuit_open_fallback",
  691. details: `原供应商 ${currentProvider.name} (priority=${currentPriority}) 已熔断,切换到供应商 ${newProviderId} (priority=${newProviderPriority})`,
  692. };
  693. }
  694. // 原供应商健康 + 优先级更高/相同 → 保持原绑定(尽量使用主供应商)
  695. logger.debug("SessionManager: Keeping current provider (healthy and higher/equal priority)", {
  696. sessionId,
  697. currentProviderId,
  698. currentProviderName: currentProvider.name,
  699. currentPriority,
  700. attemptedProviderId: newProviderId,
  701. attemptedPriority: newProviderPriority,
  702. });
  703. return {
  704. updated: false,
  705. reason: "keep_healthy_higher_priority",
  706. details: `保持原供应商 ${currentProvider.name} (priority=${currentPriority}, 健康),拒绝供应商 ${newProviderId} (priority=${newProviderPriority})`,
  707. };
  708. } catch (error) {
  709. logger.error("SessionManager: Failed to update session binding", {
  710. error,
  711. });
  712. return { updated: false, reason: "error", details: String(error) };
  713. }
  714. }
  715. /**
  716. * 存储 session 基础信息(请求开始时调用)
  717. */
  718. static async storeSessionInfo(sessionId: string, info: SessionStoreInfo): Promise<void> {
  719. const redis = getRedisClient();
  720. if (!redis || redis.status !== "ready") return;
  721. try {
  722. const pipeline = redis.pipeline();
  723. // 存储详细信息到 Hash
  724. pipeline.hset(`session:${sessionId}:info`, {
  725. userName: info.userName,
  726. userId: info.userId.toString(),
  727. keyId: info.keyId.toString(),
  728. keyName: info.keyName,
  729. model: info.model || "",
  730. apiType: info.apiType,
  731. startTime: Date.now().toString(),
  732. status: "in_progress", // 初始状态
  733. });
  734. // 设置 TTL
  735. pipeline.expire(`session:${sessionId}:info`, SessionManager.SESSION_TTL);
  736. await pipeline.exec();
  737. logger.trace("SessionManager: Stored session info", { sessionId });
  738. } catch (error) {
  739. logger.error("SessionManager: Failed to store session info", { error });
  740. }
  741. }
  742. /**
  743. * 更新 session 供应商信息(选择供应商后调用)
  744. */
  745. static async updateSessionProvider(
  746. sessionId: string,
  747. providerInfo: SessionProviderInfo
  748. ): Promise<void> {
  749. const redis = getRedisClient();
  750. if (!redis || redis.status !== "ready") return;
  751. try {
  752. const pipeline = redis.pipeline();
  753. // 更新 info Hash 中的 provider 字段
  754. pipeline.hset(`session:${sessionId}:info`, {
  755. providerId: providerInfo.providerId.toString(),
  756. providerName: providerInfo.providerName,
  757. });
  758. // 刷新 TTL
  759. pipeline.expire(`session:${sessionId}:info`, SessionManager.SESSION_TTL);
  760. await pipeline.exec();
  761. logger.trace("SessionManager: Updated session provider", {
  762. sessionId,
  763. providerName: providerInfo.providerName,
  764. });
  765. } catch (error) {
  766. logger.error("SessionManager: Failed to update session provider", {
  767. error,
  768. });
  769. }
  770. }
  771. /**
  772. * 更新 session 使用量和状态(响应完成时调用)
  773. */
  774. static async updateSessionUsage(sessionId: string, usage: SessionUsageUpdate): Promise<void> {
  775. const redis = getRedisClient();
  776. if (!redis || redis.status !== "ready") return;
  777. try {
  778. const pipeline = redis.pipeline();
  779. // 存储使用量到单独的 Hash
  780. const usageData: Record<string, string> = {
  781. status: usage.status,
  782. };
  783. if (usage.inputTokens !== undefined) {
  784. usageData.inputTokens = usage.inputTokens.toString();
  785. }
  786. if (usage.outputTokens !== undefined) {
  787. usageData.outputTokens = usage.outputTokens.toString();
  788. }
  789. if (usage.cacheCreationInputTokens !== undefined) {
  790. usageData.cacheCreationInputTokens = usage.cacheCreationInputTokens.toString();
  791. }
  792. if (usage.cacheReadInputTokens !== undefined) {
  793. usageData.cacheReadInputTokens = usage.cacheReadInputTokens.toString();
  794. }
  795. if (usage.costUsd !== undefined) {
  796. usageData.costUsd = usage.costUsd;
  797. }
  798. if (usage.statusCode !== undefined) {
  799. usageData.statusCode = usage.statusCode.toString();
  800. }
  801. if (usage.errorMessage !== undefined) {
  802. usageData.errorMessage = usage.errorMessage;
  803. }
  804. pipeline.hset(`session:${sessionId}:usage`, usageData);
  805. // 同时更新 info Hash 中的 status
  806. pipeline.hset(`session:${sessionId}:info`, "status", usage.status);
  807. // 刷新 TTL
  808. pipeline.expire(`session:${sessionId}:usage`, SessionManager.SESSION_TTL);
  809. pipeline.expire(`session:${sessionId}:info`, SessionManager.SESSION_TTL);
  810. await pipeline.exec();
  811. logger.trace("SessionManager: Updated session usage", {
  812. sessionId,
  813. status: usage.status,
  814. });
  815. } catch (error) {
  816. logger.error("SessionManager: Failed to update session usage", { error });
  817. }
  818. }
  819. /**
  820. * 存储 session 请求 messages
  821. *
  822. * 存储策略受 STORE_SESSION_MESSAGES 控制:
  823. * - true:原样存储 message 内容
  824. * - false(默认):存储但对 message 内容脱敏 [REDACTED]
  825. *
  826. * @param sessionId - Session ID
  827. * @param messages - 消息内容
  828. * @param requestSequence - 可选,请求序号。提供时使用新的 key 格式存储独立消息
  829. */
  830. static async storeSessionMessages(
  831. sessionId: string,
  832. messages: unknown,
  833. requestSequence?: number
  834. ): Promise<void> {
  835. const redis = getRedisClient();
  836. if (!redis || redis.status !== "ready") return;
  837. try {
  838. // 根据配置决定是否脱敏
  839. const messagesToStore = SessionManager.STORE_MESSAGES ? messages : redactMessages(messages);
  840. const messagesJson = JSON.stringify(messagesToStore);
  841. // 新格式:session:{sessionId}:req:{sequence}:messages(独立存储每个请求)
  842. // 旧格式:session:{sessionId}:messages(向后兼容)
  843. const key = requestSequence
  844. ? `session:${sessionId}:req:${requestSequence}:messages`
  845. : `session:${sessionId}:messages`;
  846. await redis.setex(key, SessionManager.SESSION_TTL, messagesJson);
  847. logger.trace("SessionManager: Stored session messages", {
  848. sessionId,
  849. requestSequence,
  850. key,
  851. redacted: !SessionManager.STORE_MESSAGES,
  852. });
  853. } catch (error) {
  854. logger.error("SessionManager: Failed to store session messages", {
  855. error,
  856. });
  857. }
  858. }
  859. /**
  860. * 辅助方法:从 Redis Hash 数据构建 ActiveSessionInfo 对象
  861. *
  862. * @private
  863. */
  864. private static buildSessionInfo(
  865. sessionId: string,
  866. info: Record<string, string>,
  867. usage: Record<string, string>
  868. ): ActiveSessionInfo {
  869. const startTime = parseInt(info.startTime || "0", 10);
  870. const now = Date.now();
  871. const session: ActiveSessionInfo = {
  872. sessionId,
  873. userName: info.userName || "unknown",
  874. userId: parseInt(info.userId || "0", 10),
  875. keyId: parseInt(info.keyId || "0", 10),
  876. keyName: info.keyName || "unknown",
  877. providerId: info.providerId ? parseInt(info.providerId, 10) : null,
  878. providerName: info.providerName || null,
  879. model: info.model || null,
  880. apiType: (info.apiType as "chat" | "codex") || "chat",
  881. startTime,
  882. status: (usage.status || info.status || "in_progress") as
  883. | "in_progress"
  884. | "completed"
  885. | "error",
  886. durationMs: startTime > 0 ? now - startTime : undefined,
  887. };
  888. // 添加 usage 数据(如果存在)
  889. if (usage && Object.keys(usage).length > 0) {
  890. if (usage.inputTokens) session.inputTokens = parseInt(usage.inputTokens, 10);
  891. if (usage.outputTokens) session.outputTokens = parseInt(usage.outputTokens, 10);
  892. if (usage.cacheCreationInputTokens)
  893. session.cacheCreationInputTokens = parseInt(usage.cacheCreationInputTokens, 10);
  894. if (usage.cacheReadInputTokens)
  895. session.cacheReadInputTokens = parseInt(usage.cacheReadInputTokens, 10);
  896. if (usage.costUsd) session.costUsd = usage.costUsd;
  897. if (usage.statusCode) session.statusCode = parseInt(usage.statusCode, 10);
  898. if (usage.errorMessage) session.errorMessage = usage.errorMessage;
  899. // 计算总 token
  900. const input = session.inputTokens || 0;
  901. const output = session.outputTokens || 0;
  902. const cacheCreate = session.cacheCreationInputTokens || 0;
  903. const cacheRead = session.cacheReadInputTokens || 0;
  904. session.totalTokens = input + output + cacheCreate + cacheRead;
  905. }
  906. return session;
  907. }
  908. /**
  909. * 获取活跃 session 列表(用于实时监控页面)
  910. */
  911. static async getActiveSessions(): Promise<ActiveSessionInfo[]> {
  912. const redis = getRedisClient();
  913. if (!redis || redis.status !== "ready") {
  914. logger.warn("SessionManager: Redis not ready, returning empty list");
  915. return [];
  916. }
  917. try {
  918. // 1. 使用 SessionTracker 获取活跃 session ID(自动兼容 ZSET/Set)
  919. const sessionIds = await SessionTracker.getActiveSessions();
  920. if (sessionIds.length === 0) {
  921. return [];
  922. }
  923. logger.trace("SessionManager: Found active sessions", {
  924. count: sessionIds.length,
  925. });
  926. // 2. 批量获取 session 详细信息
  927. const sessions: ActiveSessionInfo[] = [];
  928. const pipeline = redis.pipeline();
  929. for (const sessionId of sessionIds) {
  930. pipeline.hgetall(`session:${sessionId}:info`);
  931. pipeline.hgetall(`session:${sessionId}:usage`);
  932. }
  933. const results = await pipeline.exec();
  934. if (!results) {
  935. return [];
  936. }
  937. // 3. 解析结果
  938. for (let i = 0; i < sessionIds.length; i++) {
  939. const infoIndex = i * 2;
  940. const usageIndex = i * 2 + 1;
  941. const infoResult = results[infoIndex];
  942. const usageResult = results[usageIndex];
  943. // 检查结果有效性
  944. if (!infoResult || infoResult[0] !== null) continue;
  945. if (!usageResult || usageResult[0] !== null) continue;
  946. const info = infoResult[1] as Record<string, string>;
  947. const usage = usageResult[1] as Record<string, string>;
  948. // 跳过空的 info(session 可能已过期)
  949. if (!info || Object.keys(info).length === 0) continue;
  950. // 使用辅助方法构建 session 对象
  951. const session = SessionManager.buildSessionInfo(sessionIds[i], info, usage);
  952. sessions.push(session);
  953. }
  954. logger.trace("SessionManager: Retrieved active sessions with details", {
  955. count: sessions.length,
  956. });
  957. return sessions;
  958. } catch (error) {
  959. logger.error("SessionManager: Failed to get active sessions", { error });
  960. return [];
  961. }
  962. }
  963. /**
  964. * 获取所有 session(包括非活跃的)
  965. *
  966. * 使用 SCAN 扫描 Redis 中所有 session:*:info key,
  967. * 按最后活跃时间分为活跃(5 分钟内)和非活跃两组。
  968. *
  969. * @returns { active: 活跃 session 列表, inactive: 非活跃 session 列表 }
  970. */
  971. static async getAllSessionsWithExpiry(): Promise<{
  972. active: ActiveSessionInfo[];
  973. inactive: ActiveSessionInfo[];
  974. }> {
  975. const redis = getRedisClient();
  976. if (!redis || redis.status !== "ready") {
  977. logger.warn("SessionManager: Redis not ready, returning empty lists");
  978. return { active: [], inactive: [] };
  979. }
  980. try {
  981. const now = Date.now();
  982. const fiveMinutesAgo = now - SessionManager.SESSION_TTL * 1000; // SESSION_TTL 是秒,转为毫秒
  983. // 1. 使用 SCAN 扫描所有 session:*:info key
  984. const allSessions: ActiveSessionInfo[] = [];
  985. let cursor = "0";
  986. do {
  987. const [nextCursor, keys] = (await redis.scan(
  988. cursor,
  989. "MATCH",
  990. "session:*:info",
  991. "COUNT",
  992. 100
  993. )) as [string, string[]];
  994. cursor = nextCursor;
  995. if (keys.length > 0) {
  996. // 2. 批量获取 session info 和 usage
  997. const pipeline = redis.pipeline();
  998. for (const key of keys) {
  999. pipeline.hgetall(key);
  1000. // 提取 sessionId
  1001. const sessionId = key.replace("session:", "").replace(":info", "");
  1002. pipeline.hgetall(`session:${sessionId}:usage`);
  1003. }
  1004. const results = await pipeline.exec();
  1005. if (!results) continue;
  1006. // 3. 解析结果
  1007. for (let i = 0; i < keys.length; i++) {
  1008. const infoIndex = i * 2;
  1009. const usageIndex = i * 2 + 1;
  1010. const infoResult = results[infoIndex];
  1011. const usageResult = results[usageIndex];
  1012. // 检查结果有效性
  1013. if (!infoResult || infoResult[0] !== null) continue;
  1014. if (!usageResult || usageResult[0] !== null) continue;
  1015. const info = infoResult[1] as Record<string, string>;
  1016. const usage = usageResult[1] as Record<string, string>;
  1017. // 跳过空的 info
  1018. if (!info || Object.keys(info).length === 0) continue;
  1019. // 提取 sessionId
  1020. const sessionId = keys[i].replace("session:", "").replace(":info", "");
  1021. // 使用辅助方法构建 session 对象
  1022. const session = SessionManager.buildSessionInfo(sessionId, info, usage);
  1023. allSessions.push(session);
  1024. }
  1025. }
  1026. } while (cursor !== "0");
  1027. // 4. 按最后活跃时间分组
  1028. const active: ActiveSessionInfo[] = [];
  1029. const inactive: ActiveSessionInfo[] = [];
  1030. for (const session of allSessions) {
  1031. if (session.startTime >= fiveMinutesAgo) {
  1032. active.push(session);
  1033. } else {
  1034. inactive.push(session);
  1035. }
  1036. }
  1037. logger.trace("SessionManager: Found sessions", {
  1038. active: active.length,
  1039. inactive: inactive.length,
  1040. total: allSessions.length,
  1041. });
  1042. return { active, inactive };
  1043. } catch (error) {
  1044. logger.error("SessionManager: Failed to get all sessions", { error });
  1045. return { active: [], inactive: [] };
  1046. }
  1047. }
  1048. /**
  1049. * 获取所有 session ID 列表(轻量级版本)
  1050. * 仅返回 session ID,不返回详细信息
  1051. *
  1052. * @returns session ID 数组
  1053. */
  1054. static async getAllSessionIds(): Promise<string[]> {
  1055. const redis = getRedisClient();
  1056. if (!redis || redis.status !== "ready") {
  1057. logger.warn("SessionManager: Redis not ready, returning empty list");
  1058. return [];
  1059. }
  1060. try {
  1061. const sessionIds: string[] = [];
  1062. let cursor = "0";
  1063. do {
  1064. const [nextCursor, keys] = (await redis.scan(
  1065. cursor,
  1066. "MATCH",
  1067. "session:*:info",
  1068. "COUNT",
  1069. 100
  1070. )) as [string, string[]];
  1071. cursor = nextCursor;
  1072. if (keys.length > 0) {
  1073. // 提取 sessionId
  1074. for (const key of keys) {
  1075. const sessionId = key.replace("session:", "").replace(":info", "");
  1076. sessionIds.push(sessionId);
  1077. }
  1078. }
  1079. } while (cursor !== "0");
  1080. logger.trace(`SessionManager: Found ${sessionIds.length} session IDs`);
  1081. return sessionIds;
  1082. } catch (error) {
  1083. logger.error("SessionManager: Failed to get session IDs", { error });
  1084. return [];
  1085. }
  1086. }
  1087. /**
  1088. * 获取 session 的 messages 内容
  1089. *
  1090. * @param sessionId - Session ID
  1091. * @param requestSequence - 可选,请求序号。提供时读取特定请求的消息
  1092. * @returns 消息内容(解析后的 JSON 对象,可能已脱敏)
  1093. */
  1094. static async getSessionMessages(
  1095. sessionId: string,
  1096. requestSequence?: number
  1097. ): Promise<unknown | null> {
  1098. const redis = getRedisClient();
  1099. if (!redis || redis.status !== "ready") return null;
  1100. try {
  1101. // 优先尝试新格式
  1102. if (requestSequence) {
  1103. const newKey = `session:${sessionId}:req:${requestSequence}:messages`;
  1104. const messagesJson = await redis.get(newKey);
  1105. if (messagesJson) {
  1106. return JSON.parse(messagesJson);
  1107. }
  1108. }
  1109. // 向后兼容:尝试旧格式
  1110. const legacyKey = `session:${sessionId}:messages`;
  1111. const messagesJson = await redis.get(legacyKey);
  1112. if (!messagesJson) {
  1113. return null;
  1114. }
  1115. return JSON.parse(messagesJson);
  1116. } catch (error) {
  1117. logger.error("SessionManager: Failed to get session messages", { error });
  1118. return null;
  1119. }
  1120. }
  1121. /**
  1122. * 检查 Session 是否有任意请求的 messages
  1123. *
  1124. * 使用 Redis SCAN 检查是否存在任意格式的 messages key:
  1125. * - 新格式:session:{sessionId}:req:*:messages
  1126. * - 旧格式:session:{sessionId}:messages
  1127. *
  1128. * @param sessionId - Session ID
  1129. * @returns 是否存在任意 messages
  1130. */
  1131. static async hasAnySessionMessages(sessionId: string): Promise<boolean> {
  1132. const redis = getRedisClient();
  1133. if (!redis || redis.status !== "ready") return false;
  1134. try {
  1135. // 1. 先检查旧格式(直接 EXISTS 更高效)
  1136. const legacyKey = `session:${sessionId}:messages`;
  1137. const legacyExists = await redis.exists(legacyKey);
  1138. if (legacyExists) {
  1139. return true;
  1140. }
  1141. // 2. 检查新格式:使用 SCAN 搜索 session:{sessionId}:req:*:messages
  1142. let cursor = "0";
  1143. do {
  1144. const [nextCursor, keys] = (await redis.scan(
  1145. cursor,
  1146. "MATCH",
  1147. `session:${sessionId}:req:*:messages`,
  1148. "COUNT",
  1149. 100
  1150. )) as [string, string[]];
  1151. cursor = nextCursor;
  1152. // 找到任意一个就返回 true
  1153. if (keys.length > 0) {
  1154. return true;
  1155. }
  1156. } while (cursor !== "0");
  1157. return false;
  1158. } catch (error) {
  1159. logger.error("SessionManager: Failed to check session messages existence", { error });
  1160. return false;
  1161. }
  1162. }
  1163. /**
  1164. * 存储 session 响应体(临时存储,5分钟过期)
  1165. *
  1166. * 存储行为受 STORE_SESSION_RESPONSE_BODY 控制:
  1167. * - true (默认):存储响应体到 Redis 临时缓存
  1168. * - false:不存储(注意:不影响本次请求处理与统计,仅影响后续查看 response body)
  1169. *
  1170. * 存储策略(脱敏/原样)受 STORE_SESSION_MESSAGES 控制:
  1171. * - true:原样存储响应内容
  1172. * - false(默认):对 JSON 响应体中的 message 内容脱敏 [REDACTED]
  1173. *
  1174. * @param sessionId - Session ID
  1175. * @param response - 响应体内容(字符串或对象)
  1176. * @param requestSequence - 可选,请求序号。提供时使用新的 key 格式存储独立响应
  1177. */
  1178. static async storeSessionResponse(
  1179. sessionId: string,
  1180. response: string | object,
  1181. requestSequence?: number
  1182. ): Promise<void> {
  1183. // 允许通过环境变量显式关闭响应体存储(例如隐私/节省 Redis 内存)。
  1184. // 注意:这里仅关闭“写入 Redis”这一步;调用方仍然可能在内存中读取响应体用于统计或错误检测。
  1185. if (!getEnvConfig().STORE_SESSION_RESPONSE_BODY) return;
  1186. const redis = getRedisClient();
  1187. if (!redis || redis.status !== "ready") return;
  1188. try {
  1189. let responseString: string;
  1190. if (SessionManager.STORE_MESSAGES) {
  1191. // 原样存储
  1192. responseString = typeof response === "string" ? response : JSON.stringify(response);
  1193. } else {
  1194. // 尝试解析 JSON 并脱敏
  1195. if (typeof response === "object") {
  1196. responseString = JSON.stringify(redactResponseBody(response));
  1197. } else {
  1198. // 字符串响应 - 尝试解析为 JSON
  1199. try {
  1200. const parsed = JSON.parse(response);
  1201. responseString = JSON.stringify(redactResponseBody(parsed));
  1202. } catch {
  1203. // 非 JSON(如 SSE 流),原样存储
  1204. responseString = response;
  1205. }
  1206. }
  1207. }
  1208. // 新格式:session:{sessionId}:req:{sequence}:response(独立存储每个请求)
  1209. // 旧格式:session:{sessionId}:response(向后兼容)
  1210. const key = requestSequence
  1211. ? `session:${sessionId}:req:${requestSequence}:response`
  1212. : `session:${sessionId}:response`;
  1213. await redis.setex(key, SessionManager.SESSION_TTL, responseString);
  1214. logger.trace("SessionManager: Stored session response", {
  1215. sessionId,
  1216. requestSequence,
  1217. size: responseString.length,
  1218. redacted: !SessionManager.STORE_MESSAGES,
  1219. });
  1220. } catch (error) {
  1221. logger.error("SessionManager: Failed to store session response", {
  1222. error,
  1223. });
  1224. }
  1225. }
  1226. /**
  1227. * 存储 session 完整请求体(客户端原始请求体,临时存储,5分钟过期)
  1228. *
  1229. * 存储策略受 STORE_SESSION_MESSAGES 控制:
  1230. * - true:原样存储请求体内容
  1231. * - false(默认):存储但对 message 内容脱敏 [REDACTED]
  1232. *
  1233. * @param sessionId - Session ID
  1234. * @param requestBody - 请求体(完整 JSON)
  1235. * @param requestSequence - 可选,请求序号
  1236. */
  1237. static async storeSessionRequestBody(
  1238. sessionId: string,
  1239. requestBody: unknown,
  1240. requestSequence?: number
  1241. ): Promise<void> {
  1242. const redis = getRedisClient();
  1243. if (!redis || redis.status !== "ready") return;
  1244. try {
  1245. const sequence = normalizeRequestSequence(requestSequence) ?? 1;
  1246. const key = `session:${sessionId}:req:${sequence}:requestBody`;
  1247. // 根据配置决定是否脱敏
  1248. const bodyToStore = SessionManager.STORE_MESSAGES
  1249. ? requestBody
  1250. : redactRequestBody(requestBody);
  1251. const payload = JSON.stringify(bodyToStore);
  1252. await redis.setex(key, SessionManager.SESSION_TTL, payload);
  1253. logger.trace("SessionManager: Stored session request body", {
  1254. sessionId,
  1255. requestSequence: sequence,
  1256. key,
  1257. size: payload.length,
  1258. redacted: !SessionManager.STORE_MESSAGES,
  1259. });
  1260. } catch (error) {
  1261. logger.error("SessionManager: Failed to store session request body", { error, sessionId });
  1262. }
  1263. }
  1264. /**
  1265. * 获取 session 完整请求体(客户端原始请求体,可能已脱敏)
  1266. *
  1267. * @param sessionId - Session ID
  1268. * @param requestSequence - 请求序号
  1269. * @returns 解析后的 JSON 对象(可能已脱敏)
  1270. */
  1271. static async getSessionRequestBody(
  1272. sessionId: string,
  1273. requestSequence?: number
  1274. ): Promise<unknown | null> {
  1275. const redis = getRedisClient();
  1276. if (!redis || redis.status !== "ready") return null;
  1277. try {
  1278. const sequence = normalizeRequestSequence(requestSequence);
  1279. if (!sequence) return null;
  1280. const key = `session:${sessionId}:req:${sequence}:requestBody`;
  1281. const value = await redis.get(key);
  1282. if (!value) return null;
  1283. return JSON.parse(value) as unknown;
  1284. } catch (error) {
  1285. logger.error("SessionManager: Failed to get session request body", { error, sessionId });
  1286. return null;
  1287. }
  1288. }
  1289. /**
  1290. * 存储特殊设置(审计字段,临时存储,5分钟过期)
  1291. *
  1292. * @param sessionId - Session ID
  1293. * @param specialSettings - 特殊设置(可为空)
  1294. * @param requestSequence - 请求序号
  1295. */
  1296. static async storeSessionSpecialSettings(
  1297. sessionId: string,
  1298. specialSettings: SpecialSetting[] | null,
  1299. requestSequence?: number
  1300. ): Promise<void> {
  1301. if (!specialSettings || specialSettings.length === 0) {
  1302. return;
  1303. }
  1304. const redis = getRedisClient();
  1305. if (!redis || redis.status !== "ready") return;
  1306. try {
  1307. const sequence = normalizeRequestSequence(requestSequence) ?? 1;
  1308. const key = `session:${sessionId}:req:${sequence}:specialSettings`;
  1309. const payload = JSON.stringify(specialSettings);
  1310. await redis.setex(key, SessionManager.SESSION_TTL, payload);
  1311. } catch (error) {
  1312. logger.error("SessionManager: Failed to store special settings", { error, sessionId });
  1313. }
  1314. }
  1315. static async getSessionSpecialSettings(
  1316. sessionId: string,
  1317. requestSequence?: number
  1318. ): Promise<SpecialSetting[] | null> {
  1319. const redis = getRedisClient();
  1320. if (!redis || redis.status !== "ready") return null;
  1321. try {
  1322. const sequence = normalizeRequestSequence(requestSequence);
  1323. if (!sequence) return null;
  1324. const key = `session:${sessionId}:req:${sequence}:specialSettings`;
  1325. const value = await redis.get(key);
  1326. if (!value) return null;
  1327. const parsed: unknown = JSON.parse(value);
  1328. if (!Array.isArray(parsed)) return null;
  1329. return parsed as SpecialSetting[];
  1330. } catch (error) {
  1331. logger.error("SessionManager: Failed to get special settings", { error, sessionId });
  1332. return null;
  1333. }
  1334. }
  1335. /**
  1336. * 存储客户端请求元信息(端点/方法,临时存储,5分钟过期)
  1337. *
  1338. * @param sessionId - Session ID
  1339. * @param meta - 元信息
  1340. * @param requestSequence - 请求序号
  1341. */
  1342. static async storeSessionClientRequestMeta(
  1343. sessionId: string,
  1344. meta: { url: string | URL; method: string },
  1345. requestSequence?: number
  1346. ): Promise<void> {
  1347. const redis = getRedisClient();
  1348. if (!redis || redis.status !== "ready") return;
  1349. try {
  1350. const sequence = normalizeRequestSequence(requestSequence) ?? 1;
  1351. const key = `session:${sessionId}:req:${sequence}:clientReqMeta`;
  1352. const payload: SessionRequestMeta = {
  1353. url: sanitizeUrl(meta.url),
  1354. method: meta.method,
  1355. };
  1356. await redis.setex(key, SessionManager.SESSION_TTL, JSON.stringify(payload));
  1357. } catch (error) {
  1358. logger.error("SessionManager: Failed to store client request meta", { error, sessionId });
  1359. }
  1360. }
  1361. static async getSessionClientRequestMeta(
  1362. sessionId: string,
  1363. requestSequence?: number
  1364. ): Promise<SessionRequestMeta | null> {
  1365. const redis = getRedisClient();
  1366. if (!redis || redis.status !== "ready") return null;
  1367. try {
  1368. const sequence = normalizeRequestSequence(requestSequence);
  1369. if (!sequence) return null;
  1370. const key = `session:${sessionId}:req:${sequence}:clientReqMeta`;
  1371. const value = await redis.get(key);
  1372. if (!value) return null;
  1373. const parsed: unknown = JSON.parse(value);
  1374. if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) return null;
  1375. const obj = parsed as Record<string, unknown>;
  1376. if (typeof obj.url !== "string" || typeof obj.method !== "string") return null;
  1377. return { url: obj.url, method: obj.method };
  1378. } catch (error) {
  1379. logger.error("SessionManager: Failed to get client request meta", { error, sessionId });
  1380. return null;
  1381. }
  1382. }
  1383. /**
  1384. * 存储上游请求元信息(端点/方法,临时存储,5分钟过期)
  1385. *
  1386. * @param sessionId - Session ID
  1387. * @param meta - 元信息
  1388. * @param requestSequence - 请求序号
  1389. */
  1390. static async storeSessionUpstreamRequestMeta(
  1391. sessionId: string,
  1392. meta: { url: string | URL; method: string },
  1393. requestSequence?: number
  1394. ): Promise<void> {
  1395. const redis = getRedisClient();
  1396. if (!redis || redis.status !== "ready") return;
  1397. try {
  1398. const sequence = normalizeRequestSequence(requestSequence) ?? 1;
  1399. const key = `session:${sessionId}:req:${sequence}:upstreamReqMeta`;
  1400. const payload: SessionRequestMeta = {
  1401. url: sanitizeUrl(meta.url),
  1402. method: meta.method,
  1403. };
  1404. await redis.setex(key, SessionManager.SESSION_TTL, JSON.stringify(payload));
  1405. } catch (error) {
  1406. logger.error("SessionManager: Failed to store upstream request meta", { error, sessionId });
  1407. }
  1408. }
  1409. static async getSessionUpstreamRequestMeta(
  1410. sessionId: string,
  1411. requestSequence?: number
  1412. ): Promise<SessionRequestMeta | null> {
  1413. const redis = getRedisClient();
  1414. if (!redis || redis.status !== "ready") return null;
  1415. try {
  1416. const sequence = normalizeRequestSequence(requestSequence);
  1417. if (!sequence) return null;
  1418. const key = `session:${sessionId}:req:${sequence}:upstreamReqMeta`;
  1419. const value = await redis.get(key);
  1420. if (!value) return null;
  1421. const parsed: unknown = JSON.parse(value);
  1422. if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) return null;
  1423. const obj = parsed as Record<string, unknown>;
  1424. if (typeof obj.url !== "string" || typeof obj.method !== "string") return null;
  1425. return { url: obj.url, method: obj.method };
  1426. } catch (error) {
  1427. logger.error("SessionManager: Failed to get upstream request meta", { error, sessionId });
  1428. return null;
  1429. }
  1430. }
  1431. /**
  1432. * 存储上游响应元信息(端点/状态码,临时存储,5分钟过期)
  1433. *
  1434. * @param sessionId - Session ID
  1435. * @param meta - 元信息
  1436. * @param requestSequence - 请求序号
  1437. */
  1438. static async storeSessionUpstreamResponseMeta(
  1439. sessionId: string,
  1440. meta: { url: string | URL; statusCode: number },
  1441. requestSequence?: number
  1442. ): Promise<void> {
  1443. const redis = getRedisClient();
  1444. if (!redis || redis.status !== "ready") return;
  1445. try {
  1446. const sequence = normalizeRequestSequence(requestSequence) ?? 1;
  1447. const key = `session:${sessionId}:req:${sequence}:upstreamResMeta`;
  1448. const payload: SessionResponseMeta = {
  1449. url: sanitizeUrl(meta.url),
  1450. statusCode: meta.statusCode,
  1451. };
  1452. await redis.setex(key, SessionManager.SESSION_TTL, JSON.stringify(payload));
  1453. } catch (error) {
  1454. logger.error("SessionManager: Failed to store upstream response meta", { error, sessionId });
  1455. }
  1456. }
  1457. static async getSessionUpstreamResponseMeta(
  1458. sessionId: string,
  1459. requestSequence?: number
  1460. ): Promise<SessionResponseMeta | null> {
  1461. const redis = getRedisClient();
  1462. if (!redis || redis.status !== "ready") return null;
  1463. try {
  1464. const sequence = normalizeRequestSequence(requestSequence);
  1465. if (!sequence) return null;
  1466. const key = `session:${sessionId}:req:${sequence}:upstreamResMeta`;
  1467. const value = await redis.get(key);
  1468. if (!value) return null;
  1469. const parsed: unknown = JSON.parse(value);
  1470. if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) return null;
  1471. const obj = parsed as Record<string, unknown>;
  1472. if (typeof obj.url !== "string" || typeof obj.statusCode !== "number") return null;
  1473. return { url: obj.url, statusCode: obj.statusCode };
  1474. } catch (error) {
  1475. logger.error("SessionManager: Failed to get upstream response meta", { error, sessionId });
  1476. return null;
  1477. }
  1478. }
  1479. static async storeSessionRequestHeaders(
  1480. sessionId: string,
  1481. headers: Headers,
  1482. requestSequence?: number
  1483. ): Promise<void> {
  1484. const redis = getRedisClient();
  1485. if (!redis || redis.status !== "ready") return;
  1486. try {
  1487. const sequence = normalizeRequestSequence(requestSequence) ?? 1;
  1488. const key = `session:${sessionId}:req:${sequence}:reqHeaders`;
  1489. const headersJson = JSON.stringify(headersToSanitizedObject(headers));
  1490. await redis.setex(key, SessionManager.SESSION_TTL, headersJson);
  1491. logger.trace("SessionManager: Stored session request headers", {
  1492. sessionId,
  1493. requestSequence: sequence,
  1494. key,
  1495. });
  1496. } catch (error) {
  1497. logger.error("SessionManager: Failed to store session request headers", { error, sessionId });
  1498. }
  1499. }
  1500. static async storeSessionResponseHeaders(
  1501. sessionId: string,
  1502. headers: Headers,
  1503. requestSequence?: number
  1504. ): Promise<void> {
  1505. const redis = getRedisClient();
  1506. if (!redis || redis.status !== "ready") return;
  1507. try {
  1508. const sequence = normalizeRequestSequence(requestSequence) ?? 1;
  1509. const key = `session:${sessionId}:req:${sequence}:resHeaders`;
  1510. const headersJson = JSON.stringify(headersToSanitizedObject(headers));
  1511. await redis.setex(key, SessionManager.SESSION_TTL, headersJson);
  1512. logger.trace("SessionManager: Stored session response headers", {
  1513. sessionId,
  1514. requestSequence: sequence,
  1515. key,
  1516. });
  1517. } catch (error) {
  1518. logger.error("SessionManager: Failed to store session response headers", {
  1519. error,
  1520. sessionId,
  1521. });
  1522. }
  1523. }
  1524. static async getSessionRequestHeaders(
  1525. sessionId: string,
  1526. requestSequence?: number
  1527. ): Promise<Record<string, string> | null> {
  1528. const redis = getRedisClient();
  1529. if (!redis || redis.status !== "ready") return null;
  1530. try {
  1531. const sequence = normalizeRequestSequence(requestSequence);
  1532. if (!sequence) return null;
  1533. const key = `session:${sessionId}:req:${sequence}:reqHeaders`;
  1534. const value = await redis.get(key);
  1535. if (!value) return null;
  1536. return parseHeaderRecord(value);
  1537. } catch (error) {
  1538. logger.error("SessionManager: Failed to get session request headers", { error, sessionId });
  1539. return null;
  1540. }
  1541. }
  1542. static async getSessionResponseHeaders(
  1543. sessionId: string,
  1544. requestSequence?: number
  1545. ): Promise<Record<string, string> | null> {
  1546. const redis = getRedisClient();
  1547. if (!redis || redis.status !== "ready") return null;
  1548. try {
  1549. const sequence = normalizeRequestSequence(requestSequence);
  1550. if (!sequence) return null;
  1551. const key = `session:${sessionId}:req:${sequence}:resHeaders`;
  1552. const value = await redis.get(key);
  1553. if (!value) return null;
  1554. return parseHeaderRecord(value);
  1555. } catch (error) {
  1556. logger.error("SessionManager: Failed to get session response headers", { error, sessionId });
  1557. return null;
  1558. }
  1559. }
  1560. /**
  1561. * 获取 session 响应体
  1562. *
  1563. * @param sessionId - Session ID
  1564. * @param requestSequence - 可选,请求序号。提供时读取特定请求的响应
  1565. * @returns 响应体内容(字符串)
  1566. */
  1567. static async getSessionResponse(
  1568. sessionId: string,
  1569. requestSequence?: number
  1570. ): Promise<string | null> {
  1571. const redis = getRedisClient();
  1572. if (!redis || redis.status !== "ready") return null;
  1573. try {
  1574. // 优先尝试新格式
  1575. if (requestSequence) {
  1576. const newKey = `session:${sessionId}:req:${requestSequence}:response`;
  1577. const response = await redis.get(newKey);
  1578. if (response) return response;
  1579. }
  1580. // 向后兼容:尝试旧格式
  1581. const legacyKey = `session:${sessionId}:response`;
  1582. const response = await redis.get(legacyKey);
  1583. return response;
  1584. } catch (error) {
  1585. logger.error("SessionManager: Failed to get session response", { error });
  1586. return null;
  1587. }
  1588. }
  1589. /**
  1590. * 从 Codex 响应中提取 prompt_cache_key 作为 Session ID
  1591. *
  1592. * Codex 响应中包含 prompt_cache_key 字段(UUID 格式),用于标识缓存上下文。
  1593. * 这个字段出现在 response.created、response.in_progress、response.completed 等事件中。
  1594. *
  1595. * @param responseData - Codex 响应数据(流式事件的 data 部分或完整响应)
  1596. * @returns prompt_cache_key 或 null
  1597. */
  1598. static extractCodexPromptCacheKey(responseData: Record<string, unknown>): string | null {
  1599. // 检查 response 对象中的 prompt_cache_key(SSE 事件格式)
  1600. const response = responseData.response as Record<string, unknown> | undefined;
  1601. if (
  1602. response &&
  1603. typeof response.prompt_cache_key === "string" &&
  1604. response.prompt_cache_key.length > 0
  1605. ) {
  1606. logger.trace("SessionManager: Extracted prompt_cache_key from response object", {
  1607. promptCacheKey: response.prompt_cache_key,
  1608. });
  1609. return response.prompt_cache_key;
  1610. }
  1611. // 备选:直接在顶层检查(非流式响应格式)
  1612. if (
  1613. typeof responseData.prompt_cache_key === "string" &&
  1614. responseData.prompt_cache_key.length > 0
  1615. ) {
  1616. logger.trace("SessionManager: Extracted prompt_cache_key from top level", {
  1617. promptCacheKey: responseData.prompt_cache_key,
  1618. });
  1619. return responseData.prompt_cache_key;
  1620. }
  1621. return null;
  1622. }
  1623. /**
  1624. * 使用 Codex 的 prompt_cache_key 更新 Session 绑定
  1625. *
  1626. * 策略:如果响应中包含 prompt_cache_key,使用它作为 Session ID 的来源。
  1627. * 这类似于 Claude 从请求 metadata 中提取 session_id 的机制。
  1628. *
  1629. * Session ID 格式:codex_{prompt_cache_key}(添加前缀以区分来源)
  1630. *
  1631. * @param currentSessionId - 当前的 Session ID(可能是生成的或从请求提取的)
  1632. * @param promptCacheKey - Codex 响应中的 prompt_cache_key
  1633. * @param providerId - 供应商 ID
  1634. * @returns 更新后的 Session ID 和是否创建了新绑定
  1635. */
  1636. static async updateSessionWithCodexCacheKey(
  1637. currentSessionId: string,
  1638. promptCacheKey: string,
  1639. providerId: number
  1640. ): Promise<{ sessionId: string; updated: boolean }> {
  1641. const redis = getRedisClient();
  1642. if (!redis || redis.status !== "ready") {
  1643. logger.debug("SessionManager: Redis not ready, skipping Codex session update");
  1644. return { sessionId: currentSessionId, updated: false };
  1645. }
  1646. try {
  1647. // 使用 prompt_cache_key 作为新的 Session ID(添加前缀以区分)
  1648. const codexSessionId = `codex_${promptCacheKey}`;
  1649. // 检查是否已经存在绑定
  1650. const existingProvider = await redis.get(`session:${codexSessionId}:provider`);
  1651. if (existingProvider) {
  1652. // 已存在绑定,刷新 TTL
  1653. await redis.expire(`session:${codexSessionId}:provider`, SessionManager.SESSION_TTL);
  1654. logger.debug("SessionManager: Refreshed Codex session TTL", {
  1655. sessionId: codexSessionId,
  1656. providerId: parseInt(existingProvider, 10),
  1657. });
  1658. return { sessionId: codexSessionId, updated: false };
  1659. }
  1660. // 新建绑定
  1661. await redis.set(
  1662. `session:${codexSessionId}:provider`,
  1663. providerId.toString(),
  1664. "EX",
  1665. SessionManager.SESSION_TTL
  1666. );
  1667. logger.info("SessionManager: Created Codex session from prompt_cache_key", {
  1668. sessionId: codexSessionId,
  1669. promptCacheKey,
  1670. providerId,
  1671. ttl: SessionManager.SESSION_TTL,
  1672. });
  1673. return { sessionId: codexSessionId, updated: true };
  1674. } catch (error) {
  1675. logger.error("SessionManager: Failed to update Codex session", { error });
  1676. return { sessionId: currentSessionId, updated: false };
  1677. }
  1678. }
  1679. /**
  1680. * 终止 Session(主动打断)
  1681. *
  1682. * 功能:删除 Session 在 Redis 中的所有绑定关系,强制下次请求重新选择供应商
  1683. * 用途:管理员主动打断长时间占用同一供应商的 Session
  1684. *
  1685. * @param sessionId - Session ID
  1686. * @returns 是否成功删除
  1687. */
  1688. static async terminateSession(sessionId: string): Promise<boolean> {
  1689. const redis = getRedisClient();
  1690. if (!redis || redis.status !== "ready") {
  1691. logger.warn("SessionManager: Redis not ready, cannot terminate session");
  1692. return false;
  1693. }
  1694. try {
  1695. // 1. 先查询绑定信息(用于从 ZSET 中移除)
  1696. let providerId: number | null = null;
  1697. let keyId: number | null = null;
  1698. try {
  1699. const [providerIdStr, keyIdStr] = await Promise.all([
  1700. redis.get(`session:${sessionId}:provider`),
  1701. redis.get(`session:${sessionId}:key`),
  1702. ]);
  1703. providerId = providerIdStr ? parseInt(providerIdStr, 10) : null;
  1704. keyId = keyIdStr ? parseInt(keyIdStr, 10) : null;
  1705. } catch (lookupError) {
  1706. // Redis 查询失败不应阻止清理操作,继续执行删除
  1707. logger.warn(
  1708. "SessionManager: Failed to lookup session binding info, continuing with cleanup",
  1709. {
  1710. sessionId,
  1711. error: lookupError,
  1712. }
  1713. );
  1714. }
  1715. // 2. 删除所有 Session 相关的 key
  1716. const pipeline = redis.pipeline();
  1717. // 基础绑定信息
  1718. pipeline.del(`session:${sessionId}:provider`);
  1719. pipeline.del(`session:${sessionId}:key`);
  1720. pipeline.del(`session:${sessionId}:info`);
  1721. pipeline.del(`session:${sessionId}:last_seen`);
  1722. pipeline.del(`session:${sessionId}:concurrent_count`);
  1723. // 可选:messages 和 response(如果启用了存储)
  1724. pipeline.del(`session:${sessionId}:messages`);
  1725. pipeline.del(`session:${sessionId}:response`);
  1726. // 3. 从 ZSET 中移除(始终尝试,即使查询失败)
  1727. pipeline.zrem("global:active_sessions", sessionId);
  1728. if (providerId) {
  1729. pipeline.zrem(`provider:${providerId}:active_sessions`, sessionId);
  1730. }
  1731. if (keyId) {
  1732. pipeline.zrem(`key:${keyId}:active_sessions`, sessionId);
  1733. }
  1734. // 4. 删除 hash 映射(如果存在)
  1735. // 注意:无法直接反查 hash,只能清理已知的 session key
  1736. // hash 会在 TTL 后自动过期,不影响功能
  1737. const results = await pipeline.exec();
  1738. // 5. 检查结果
  1739. let deletedKeys = 0;
  1740. if (results) {
  1741. for (const [err, result] of results) {
  1742. if (!err && typeof result === "number" && result > 0) {
  1743. deletedKeys += result;
  1744. }
  1745. }
  1746. }
  1747. logger.info("SessionManager: Terminated session", {
  1748. sessionId,
  1749. providerId,
  1750. keyId,
  1751. deletedKeys,
  1752. });
  1753. return deletedKeys > 0;
  1754. } catch (error) {
  1755. logger.error("SessionManager: Failed to terminate session", {
  1756. error,
  1757. sessionId,
  1758. });
  1759. return false;
  1760. }
  1761. }
  1762. /**
  1763. * 批量终止 Session
  1764. *
  1765. * 采用分块处理策略,避免大批量操作时对 Redis 造成过大压力
  1766. *
  1767. * @param sessionIds - Session ID 列表
  1768. * @returns 成功终止的数量
  1769. */
  1770. static async terminateSessionsBatch(sessionIds: string[]): Promise<number> {
  1771. if (sessionIds.length === 0) {
  1772. return 0;
  1773. }
  1774. const redis = getRedisClient();
  1775. if (!redis || redis.status !== "ready") {
  1776. logger.warn("SessionManager: Redis not ready, cannot terminate sessions");
  1777. return 0;
  1778. }
  1779. try {
  1780. // 分块处理,每批 20 个,避免并发过高
  1781. const CHUNK_SIZE = 20;
  1782. let successCount = 0;
  1783. for (let i = 0; i < sessionIds.length; i += CHUNK_SIZE) {
  1784. const chunk = sessionIds.slice(i, i + CHUNK_SIZE);
  1785. const results = await Promise.all(
  1786. chunk.map(async (sessionId) => {
  1787. const success = await SessionManager.terminateSession(sessionId);
  1788. return success ? 1 : 0;
  1789. })
  1790. );
  1791. successCount += results.reduce<number>((sum, value) => sum + value, 0);
  1792. }
  1793. logger.info("SessionManager: Terminated sessions batch", {
  1794. total: sessionIds.length,
  1795. successCount,
  1796. });
  1797. return successCount;
  1798. } catch (error) {
  1799. logger.error("SessionManager: Failed to terminate sessions batch", {
  1800. error,
  1801. });
  1802. return 0;
  1803. }
  1804. }
  1805. }
  1806. export { headersToSanitizedObject, parseHeaderRecord };