channel_affinity.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787
  1. package service
  2. import (
  3. "fmt"
  4. "hash/fnv"
  5. "regexp"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/QuantumNous/new-api/common"
  11. "github.com/QuantumNous/new-api/dto"
  12. "github.com/QuantumNous/new-api/pkg/cachex"
  13. "github.com/QuantumNous/new-api/setting/operation_setting"
  14. "github.com/gin-gonic/gin"
  15. "github.com/samber/hot"
  16. "github.com/tidwall/gjson"
  17. )
// Gin context keys and cache namespaces used by the channel affinity feature.
const (
	// ginKeyChannelAffinityCacheKey is the gin context key holding the full
	// affinity cache key computed for the current request.
	ginKeyChannelAffinityCacheKey = "channel_affinity_cache_key"
	// ginKeyChannelAffinityTTLSeconds is the gin context key holding the TTL
	// (in seconds) selected for the current request's affinity entry.
	ginKeyChannelAffinityTTLSeconds = "channel_affinity_ttl_seconds"
	// ginKeyChannelAffinityMeta is the gin context key holding the
	// channelAffinityMeta value of the matched rule.
	ginKeyChannelAffinityMeta = "channel_affinity_meta"
	// ginKeyChannelAffinityLogInfo is the gin context key holding the log info
	// map written by MarkChannelAffinityUsed.
	ginKeyChannelAffinityLogInfo = "channel_affinity_log_info"
	// ginKeyChannelAffinitySkipRetry is the gin context key holding the
	// skip-retry flag written by MarkChannelAffinityUsed.
	ginKeyChannelAffinitySkipRetry = "channel_affinity_skip_retry_on_failure"

	// channelAffinityCacheNamespace is the namespace of the affinity cache and
	// the prefix of every full affinity cache key built in this file.
	channelAffinityCacheNamespace = "new-api:channel_affinity:v1"
	// channelAffinityUsageCacheStatsNamespace is the namespace of the cache that
	// stores per-key usage counters.
	channelAffinityUsageCacheStatsNamespace = "new-api:channel_affinity_usage_cache_stats:v1"
)
// Lazily initialised, package-wide state of the channel affinity feature.
var (
	// channelAffinityCacheOnce guards the one-time construction of channelAffinityCache.
	channelAffinityCacheOnce sync.Once
	// channelAffinityCache maps an affinity cache key to a channel ID.
	channelAffinityCache *cachex.HybridCache[int]

	// channelAffinityUsageCacheStatsOnce guards the one-time construction of
	// channelAffinityUsageCacheStatsCache.
	channelAffinityUsageCacheStatsOnce sync.Once
	// channelAffinityUsageCacheStatsCache stores usage counters per entry key
	// (see channelAffinityUsageCacheEntryKey).
	channelAffinityUsageCacheStatsCache *cachex.HybridCache[ChannelAffinityUsageCacheCounters]

	// channelAffinityRegexCache memoises compiled patterns, keyed by pattern text.
	channelAffinityRegexCache sync.Map // map[string]*regexp.Regexp
)
// channelAffinityMeta captures everything GetPreferredChannelByAffinity learned
// about the matched rule for one request. It is stored in the gin context so
// that later stages (recording, logging, usage statistics) can read it back.
type channelAffinityMeta struct {
	// CacheKey is the full (namespace-prefixed) affinity cache key.
	CacheKey string
	// TTLSeconds is the rule TTL, or the setting's default TTL when the rule has none.
	TTLSeconds int
	// RuleName is the name of the matched rule.
	RuleName string
	// SkipRetry mirrors the rule's SkipRetryOnFailure flag.
	SkipRetry bool
	// KeySourceType, KeySourceKey and KeySourcePath describe the key source that
	// produced the affinity value (whitespace-trimmed).
	KeySourceType string
	KeySourceKey  string
	KeySourcePath string
	// KeyHint is a shortened, single-line preview of the affinity value.
	KeyHint string
	// KeyFingerprint is a short hash prefix of the affinity value.
	KeyFingerprint string
	// UsingGroup is the group passed to GetPreferredChannelByAffinity.
	UsingGroup string
	// ModelName is the model name passed to GetPreferredChannelByAffinity.
	ModelName string
	// RequestPath is the request URL path, or empty when unavailable.
	RequestPath string
}
// ChannelAffinityStatsContext identifies the usage-statistics entry a request
// belongs to. It is produced by GetChannelAffinityStatsContext.
type ChannelAffinityStatsContext struct {
	// RuleName is the trimmed name of the matched affinity rule.
	RuleName string
	// UsingGroup is the trimmed group recorded for the request; may be empty.
	UsingGroup string
	// KeyFingerprint is the trimmed fingerprint of the affinity value.
	KeyFingerprint string
	// TTLSeconds is the affinity TTL; it is used as the statistics window length.
	TTLSeconds int64
}
// ChannelAffinityCacheStats is the summary returned by
// GetChannelAffinityCacheStats.
type ChannelAffinityCacheStats struct {
	// Enabled mirrors the Enabled flag of the channel affinity setting.
	Enabled bool `json:"enabled"`
	// Total is the number of keys listed from the affinity cache.
	Total int `json:"total"`
	// Unknown is the number of listed keys that could not be attributed to a rule.
	Unknown int `json:"unknown"`
	// ByRuleName counts attributed keys per rule name; every rule that includes
	// its name in the key is present, even with a zero count.
	ByRuleName map[string]int `json:"by_rule_name"`
	// CacheCapacity is the capacity reported by the affinity cache.
	CacheCapacity int `json:"cache_capacity"`
	// CacheAlgo is the algorithm name reported by the affinity cache.
	CacheAlgo string `json:"cache_algo"`
}
  62. func getChannelAffinityCache() *cachex.HybridCache[int] {
  63. channelAffinityCacheOnce.Do(func() {
  64. setting := operation_setting.GetChannelAffinitySetting()
  65. capacity := setting.MaxEntries
  66. if capacity <= 0 {
  67. capacity = 100_000
  68. }
  69. defaultTTLSeconds := setting.DefaultTTLSeconds
  70. if defaultTTLSeconds <= 0 {
  71. defaultTTLSeconds = 3600
  72. }
  73. channelAffinityCache = cachex.NewHybridCache[int](cachex.HybridCacheConfig[int]{
  74. Namespace: cachex.Namespace(channelAffinityCacheNamespace),
  75. Redis: common.RDB,
  76. RedisEnabled: func() bool {
  77. return common.RedisEnabled && common.RDB != nil
  78. },
  79. RedisCodec: cachex.IntCodec{},
  80. Memory: func() *hot.HotCache[string, int] {
  81. return hot.NewHotCache[string, int](hot.LRU, capacity).
  82. WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
  83. WithJanitor().
  84. Build()
  85. },
  86. })
  87. })
  88. return channelAffinityCache
  89. }
  90. func GetChannelAffinityCacheStats() ChannelAffinityCacheStats {
  91. setting := operation_setting.GetChannelAffinitySetting()
  92. if setting == nil {
  93. return ChannelAffinityCacheStats{
  94. Enabled: false,
  95. Total: 0,
  96. Unknown: 0,
  97. ByRuleName: map[string]int{},
  98. }
  99. }
  100. cache := getChannelAffinityCache()
  101. mainCap, _ := cache.Capacity()
  102. mainAlgo, _ := cache.Algorithm()
  103. rules := setting.Rules
  104. ruleByName := make(map[string]operation_setting.ChannelAffinityRule, len(rules))
  105. for _, r := range rules {
  106. name := strings.TrimSpace(r.Name)
  107. if name == "" {
  108. continue
  109. }
  110. if !r.IncludeRuleName {
  111. continue
  112. }
  113. ruleByName[name] = r
  114. }
  115. byRuleName := make(map[string]int, len(ruleByName))
  116. for name := range ruleByName {
  117. byRuleName[name] = 0
  118. }
  119. keys, err := cache.Keys()
  120. if err != nil {
  121. common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
  122. keys = nil
  123. }
  124. total := len(keys)
  125. unknown := 0
  126. for _, k := range keys {
  127. prefix := channelAffinityCacheNamespace + ":"
  128. if !strings.HasPrefix(k, prefix) {
  129. unknown++
  130. continue
  131. }
  132. rest := strings.TrimPrefix(k, prefix)
  133. parts := strings.Split(rest, ":")
  134. if len(parts) < 2 {
  135. unknown++
  136. continue
  137. }
  138. ruleName := parts[0]
  139. rule, ok := ruleByName[ruleName]
  140. if !ok {
  141. unknown++
  142. continue
  143. }
  144. if rule.IncludeUsingGroup {
  145. if len(parts) < 3 {
  146. unknown++
  147. continue
  148. }
  149. }
  150. byRuleName[ruleName]++
  151. }
  152. return ChannelAffinityCacheStats{
  153. Enabled: setting.Enabled,
  154. Total: total,
  155. Unknown: unknown,
  156. ByRuleName: byRuleName,
  157. CacheCapacity: mainCap,
  158. CacheAlgo: mainAlgo,
  159. }
  160. }
  161. func ClearChannelAffinityCacheAll() int {
  162. cache := getChannelAffinityCache()
  163. keys, err := cache.Keys()
  164. if err != nil {
  165. common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
  166. keys = nil
  167. }
  168. if len(keys) > 0 {
  169. if _, err := cache.DeleteMany(keys); err != nil {
  170. common.SysError(fmt.Sprintf("channel affinity cache delete many failed: err=%v", err))
  171. }
  172. }
  173. return len(keys)
  174. }
  175. func ClearChannelAffinityCacheByRuleName(ruleName string) (int, error) {
  176. ruleName = strings.TrimSpace(ruleName)
  177. if ruleName == "" {
  178. return 0, fmt.Errorf("rule_name 不能为空")
  179. }
  180. setting := operation_setting.GetChannelAffinitySetting()
  181. if setting == nil {
  182. return 0, fmt.Errorf("channel_affinity_setting 未初始化")
  183. }
  184. var matchedRule *operation_setting.ChannelAffinityRule
  185. for i := range setting.Rules {
  186. r := &setting.Rules[i]
  187. if strings.TrimSpace(r.Name) != ruleName {
  188. continue
  189. }
  190. matchedRule = r
  191. break
  192. }
  193. if matchedRule == nil {
  194. return 0, fmt.Errorf("未知规则名称")
  195. }
  196. if !matchedRule.IncludeRuleName {
  197. return 0, fmt.Errorf("该规则未启用 include_rule_name,无法按规则清空缓存")
  198. }
  199. cache := getChannelAffinityCache()
  200. deleted, err := cache.DeleteByPrefix(ruleName)
  201. if err != nil {
  202. return 0, err
  203. }
  204. return deleted, nil
  205. }
  206. func matchAnyRegexCached(patterns []string, s string) bool {
  207. if len(patterns) == 0 || s == "" {
  208. return false
  209. }
  210. for _, pattern := range patterns {
  211. if pattern == "" {
  212. continue
  213. }
  214. re, ok := channelAffinityRegexCache.Load(pattern)
  215. if !ok {
  216. compiled, err := regexp.Compile(pattern)
  217. if err != nil {
  218. continue
  219. }
  220. re = compiled
  221. channelAffinityRegexCache.Store(pattern, re)
  222. }
  223. if re.(*regexp.Regexp).MatchString(s) {
  224. return true
  225. }
  226. }
  227. return false
  228. }
  229. func matchAnyIncludeFold(patterns []string, s string) bool {
  230. if len(patterns) == 0 || s == "" {
  231. return false
  232. }
  233. sLower := strings.ToLower(s)
  234. for _, p := range patterns {
  235. p = strings.TrimSpace(p)
  236. if p == "" {
  237. continue
  238. }
  239. if strings.Contains(sLower, strings.ToLower(p)) {
  240. return true
  241. }
  242. }
  243. return false
  244. }
  245. func extractChannelAffinityValue(c *gin.Context, src operation_setting.ChannelAffinityKeySource) string {
  246. switch src.Type {
  247. case "context_int":
  248. if src.Key == "" {
  249. return ""
  250. }
  251. v := c.GetInt(src.Key)
  252. if v <= 0 {
  253. return ""
  254. }
  255. return strconv.Itoa(v)
  256. case "context_string":
  257. if src.Key == "" {
  258. return ""
  259. }
  260. return strings.TrimSpace(c.GetString(src.Key))
  261. case "gjson":
  262. if src.Path == "" {
  263. return ""
  264. }
  265. storage, err := common.GetBodyStorage(c)
  266. if err != nil {
  267. return ""
  268. }
  269. body, err := storage.Bytes()
  270. if err != nil || len(body) == 0 {
  271. return ""
  272. }
  273. res := gjson.GetBytes(body, src.Path)
  274. if !res.Exists() {
  275. return ""
  276. }
  277. switch res.Type {
  278. case gjson.String, gjson.Number, gjson.True, gjson.False:
  279. return strings.TrimSpace(res.String())
  280. default:
  281. return strings.TrimSpace(res.Raw)
  282. }
  283. default:
  284. return ""
  285. }
  286. }
  287. func buildChannelAffinityCacheKeySuffix(rule operation_setting.ChannelAffinityRule, usingGroup string, affinityValue string) string {
  288. parts := make([]string, 0, 3)
  289. if rule.IncludeRuleName && rule.Name != "" {
  290. parts = append(parts, rule.Name)
  291. }
  292. if rule.IncludeUsingGroup && usingGroup != "" {
  293. parts = append(parts, usingGroup)
  294. }
  295. parts = append(parts, affinityValue)
  296. return strings.Join(parts, ":")
  297. }
  298. func setChannelAffinityContext(c *gin.Context, meta channelAffinityMeta) {
  299. c.Set(ginKeyChannelAffinityCacheKey, meta.CacheKey)
  300. c.Set(ginKeyChannelAffinityTTLSeconds, meta.TTLSeconds)
  301. c.Set(ginKeyChannelAffinityMeta, meta)
  302. }
  303. func getChannelAffinityContext(c *gin.Context) (string, int, bool) {
  304. keyAny, ok := c.Get(ginKeyChannelAffinityCacheKey)
  305. if !ok {
  306. return "", 0, false
  307. }
  308. key, ok := keyAny.(string)
  309. if !ok || key == "" {
  310. return "", 0, false
  311. }
  312. ttlAny, ok := c.Get(ginKeyChannelAffinityTTLSeconds)
  313. if !ok {
  314. return key, 0, true
  315. }
  316. ttlSeconds, _ := ttlAny.(int)
  317. return key, ttlSeconds, true
  318. }
  319. func getChannelAffinityMeta(c *gin.Context) (channelAffinityMeta, bool) {
  320. anyMeta, ok := c.Get(ginKeyChannelAffinityMeta)
  321. if !ok {
  322. return channelAffinityMeta{}, false
  323. }
  324. meta, ok := anyMeta.(channelAffinityMeta)
  325. if !ok {
  326. return channelAffinityMeta{}, false
  327. }
  328. return meta, true
  329. }
  330. func GetChannelAffinityStatsContext(c *gin.Context) (ChannelAffinityStatsContext, bool) {
  331. if c == nil {
  332. return ChannelAffinityStatsContext{}, false
  333. }
  334. meta, ok := getChannelAffinityMeta(c)
  335. if !ok {
  336. return ChannelAffinityStatsContext{}, false
  337. }
  338. ruleName := strings.TrimSpace(meta.RuleName)
  339. keyFp := strings.TrimSpace(meta.KeyFingerprint)
  340. usingGroup := strings.TrimSpace(meta.UsingGroup)
  341. if ruleName == "" || keyFp == "" {
  342. return ChannelAffinityStatsContext{}, false
  343. }
  344. ttlSeconds := int64(meta.TTLSeconds)
  345. if ttlSeconds <= 0 {
  346. return ChannelAffinityStatsContext{}, false
  347. }
  348. return ChannelAffinityStatsContext{
  349. RuleName: ruleName,
  350. UsingGroup: usingGroup,
  351. KeyFingerprint: keyFp,
  352. TTLSeconds: ttlSeconds,
  353. }, true
  354. }
  355. func affinityFingerprint(s string) string {
  356. if s == "" {
  357. return ""
  358. }
  359. hex := common.Sha1([]byte(s))
  360. if len(hex) >= 8 {
  361. return hex[:8]
  362. }
  363. return hex
  364. }
  365. func buildChannelAffinityKeyHint(s string) string {
  366. s = strings.TrimSpace(s)
  367. if s == "" {
  368. return ""
  369. }
  370. s = strings.ReplaceAll(s, "\n", " ")
  371. s = strings.ReplaceAll(s, "\r", " ")
  372. if len(s) <= 12 {
  373. return s
  374. }
  375. return s[:4] + "..." + s[len(s)-4:]
  376. }
  377. func GetPreferredChannelByAffinity(c *gin.Context, modelName string, usingGroup string) (int, bool) {
  378. setting := operation_setting.GetChannelAffinitySetting()
  379. if setting == nil || !setting.Enabled {
  380. return 0, false
  381. }
  382. path := ""
  383. if c != nil && c.Request != nil && c.Request.URL != nil {
  384. path = c.Request.URL.Path
  385. }
  386. userAgent := ""
  387. if c != nil && c.Request != nil {
  388. userAgent = c.Request.UserAgent()
  389. }
  390. for _, rule := range setting.Rules {
  391. if !matchAnyRegexCached(rule.ModelRegex, modelName) {
  392. continue
  393. }
  394. if len(rule.PathRegex) > 0 && !matchAnyRegexCached(rule.PathRegex, path) {
  395. continue
  396. }
  397. if len(rule.UserAgentInclude) > 0 && !matchAnyIncludeFold(rule.UserAgentInclude, userAgent) {
  398. continue
  399. }
  400. var affinityValue string
  401. var usedSource operation_setting.ChannelAffinityKeySource
  402. for _, src := range rule.KeySources {
  403. affinityValue = extractChannelAffinityValue(c, src)
  404. if affinityValue != "" {
  405. usedSource = src
  406. break
  407. }
  408. }
  409. if affinityValue == "" {
  410. continue
  411. }
  412. if rule.ValueRegex != "" && !matchAnyRegexCached([]string{rule.ValueRegex}, affinityValue) {
  413. continue
  414. }
  415. ttlSeconds := rule.TTLSeconds
  416. if ttlSeconds <= 0 {
  417. ttlSeconds = setting.DefaultTTLSeconds
  418. }
  419. cacheKeySuffix := buildChannelAffinityCacheKeySuffix(rule, usingGroup, affinityValue)
  420. cacheKeyFull := channelAffinityCacheNamespace + ":" + cacheKeySuffix
  421. setChannelAffinityContext(c, channelAffinityMeta{
  422. CacheKey: cacheKeyFull,
  423. TTLSeconds: ttlSeconds,
  424. RuleName: rule.Name,
  425. SkipRetry: rule.SkipRetryOnFailure,
  426. KeySourceType: strings.TrimSpace(usedSource.Type),
  427. KeySourceKey: strings.TrimSpace(usedSource.Key),
  428. KeySourcePath: strings.TrimSpace(usedSource.Path),
  429. KeyHint: buildChannelAffinityKeyHint(affinityValue),
  430. KeyFingerprint: affinityFingerprint(affinityValue),
  431. UsingGroup: usingGroup,
  432. ModelName: modelName,
  433. RequestPath: path,
  434. })
  435. cache := getChannelAffinityCache()
  436. channelID, found, err := cache.Get(cacheKeySuffix)
  437. if err != nil {
  438. common.SysError(fmt.Sprintf("channel affinity cache get failed: key=%s, err=%v", cacheKeyFull, err))
  439. return 0, false
  440. }
  441. if found {
  442. return channelID, true
  443. }
  444. return 0, false
  445. }
  446. return 0, false
  447. }
  448. func ShouldSkipRetryAfterChannelAffinityFailure(c *gin.Context) bool {
  449. if c == nil {
  450. return false
  451. }
  452. v, ok := c.Get(ginKeyChannelAffinitySkipRetry)
  453. if !ok {
  454. return false
  455. }
  456. b, ok := v.(bool)
  457. if !ok {
  458. return false
  459. }
  460. return b
  461. }
  462. func MarkChannelAffinityUsed(c *gin.Context, selectedGroup string, channelID int) {
  463. if c == nil || channelID <= 0 {
  464. return
  465. }
  466. meta, ok := getChannelAffinityMeta(c)
  467. if !ok {
  468. return
  469. }
  470. c.Set(ginKeyChannelAffinitySkipRetry, meta.SkipRetry)
  471. info := map[string]interface{}{
  472. "reason": meta.RuleName,
  473. "rule_name": meta.RuleName,
  474. "using_group": meta.UsingGroup,
  475. "selected_group": selectedGroup,
  476. "model": meta.ModelName,
  477. "request_path": meta.RequestPath,
  478. "channel_id": channelID,
  479. "key_source": meta.KeySourceType,
  480. "key_key": meta.KeySourceKey,
  481. "key_path": meta.KeySourcePath,
  482. "key_hint": meta.KeyHint,
  483. "key_fp": meta.KeyFingerprint,
  484. }
  485. c.Set(ginKeyChannelAffinityLogInfo, info)
  486. }
  487. func AppendChannelAffinityAdminInfo(c *gin.Context, adminInfo map[string]interface{}) {
  488. if c == nil || adminInfo == nil {
  489. return
  490. }
  491. anyInfo, ok := c.Get(ginKeyChannelAffinityLogInfo)
  492. if !ok || anyInfo == nil {
  493. return
  494. }
  495. adminInfo["channel_affinity"] = anyInfo
  496. }
  497. func RecordChannelAffinity(c *gin.Context, channelID int) {
  498. if channelID <= 0 {
  499. return
  500. }
  501. setting := operation_setting.GetChannelAffinitySetting()
  502. if setting == nil || !setting.Enabled {
  503. return
  504. }
  505. if setting.SwitchOnSuccess && c != nil {
  506. if successChannelID := c.GetInt("channel_id"); successChannelID > 0 {
  507. channelID = successChannelID
  508. }
  509. }
  510. cacheKey, ttlSeconds, ok := getChannelAffinityContext(c)
  511. if !ok {
  512. return
  513. }
  514. if ttlSeconds <= 0 {
  515. ttlSeconds = setting.DefaultTTLSeconds
  516. }
  517. if ttlSeconds <= 0 {
  518. ttlSeconds = 3600
  519. }
  520. cache := getChannelAffinityCache()
  521. if err := cache.SetWithTTL(cacheKey, channelID, time.Duration(ttlSeconds)*time.Second); err != nil {
  522. common.SysError(fmt.Sprintf("channel affinity cache set failed: key=%s, err=%v", cacheKey, err))
  523. }
  524. }
// ChannelAffinityUsageCacheStats is the usage-statistics view returned by
// GetChannelAffinityUsageCacheStats: the identity of the entry (rule, group,
// key fingerprint) plus the counters stored for it. All counters are zero when
// no entry is found.
type ChannelAffinityUsageCacheStats struct {
	// RuleName, UsingGroup and KeyFingerprint echo the (trimmed) lookup arguments.
	RuleName       string `json:"rule_name"`
	UsingGroup     string `json:"using_group"`
	KeyFingerprint string `json:"key_fp"`
	// Hit is the number of observations that showed a cache-hit signal.
	Hit int64 `json:"hit"`
	// Total is the number of observations.
	Total int64 `json:"total"`
	// WindowSeconds is the window length recorded with the latest observation.
	WindowSeconds int64 `json:"window_seconds"`
	// PromptTokens, CompletionTokens and TotalTokens are accumulated token counts.
	PromptTokens     int64 `json:"prompt_tokens"`
	CompletionTokens int64 `json:"completion_tokens"`
	TotalTokens      int64 `json:"total_tokens"`
	// CachedTokens is the accumulated number of cached tokens reported by usage details.
	CachedTokens int64 `json:"cached_tokens"`
	// PromptCacheHitTokens is the accumulated PromptCacheHitTokens reported by usage.
	PromptCacheHitTokens int64 `json:"prompt_cache_hit_tokens"`
	// LastSeenAt is the Unix time (seconds) of the latest observation.
	LastSeenAt int64 `json:"last_seen_at"`
}
// ChannelAffinityUsageCacheCounters is the value stored in the usage
// statistics cache for one entry key. It is updated by
// observeChannelAffinityUsageCache on every observation.
type ChannelAffinityUsageCacheCounters struct {
	// Hit counts observations for which usageCacheSignals reported a hit.
	Hit int64 `json:"hit"`
	// Total counts all observations.
	Total int64 `json:"total"`
	// WindowSeconds is the TTL (in seconds) used for the latest write.
	WindowSeconds int64 `json:"window_seconds"`
	// PromptTokens, CompletionTokens and TotalTokens accumulate token counts.
	PromptTokens     int64 `json:"prompt_tokens"`
	CompletionTokens int64 `json:"completion_tokens"`
	TotalTokens      int64 `json:"total_tokens"`
	// CachedTokens accumulates the cached-token count from usage details.
	CachedTokens int64 `json:"cached_tokens"`
	// PromptCacheHitTokens accumulates usage.PromptCacheHitTokens.
	PromptCacheHitTokens int64 `json:"prompt_cache_hit_tokens"`
	// LastSeenAt is the Unix time (seconds) of the latest observation.
	LastSeenAt int64 `json:"last_seen_at"`
}
// channelAffinityUsageCacheStatsLocks is a fixed pool of 64 mutexes.
// channelAffinityUsageCacheStatsLock picks one by hashing the entry key, and
// observeChannelAffinityUsageCache holds it around its read-modify-write of
// that entry.
var channelAffinityUsageCacheStatsLocks [64]sync.Mutex
  551. func ObserveChannelAffinityUsageCacheFromContext(c *gin.Context, usage *dto.Usage) {
  552. statsCtx, ok := GetChannelAffinityStatsContext(c)
  553. if !ok {
  554. return
  555. }
  556. observeChannelAffinityUsageCache(statsCtx, usage)
  557. }
  558. func GetChannelAffinityUsageCacheStats(ruleName, usingGroup, keyFp string) ChannelAffinityUsageCacheStats {
  559. ruleName = strings.TrimSpace(ruleName)
  560. usingGroup = strings.TrimSpace(usingGroup)
  561. keyFp = strings.TrimSpace(keyFp)
  562. entryKey := channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp)
  563. if entryKey == "" {
  564. return ChannelAffinityUsageCacheStats{
  565. RuleName: ruleName,
  566. UsingGroup: usingGroup,
  567. KeyFingerprint: keyFp,
  568. }
  569. }
  570. cache := getChannelAffinityUsageCacheStatsCache()
  571. v, found, err := cache.Get(entryKey)
  572. if err != nil || !found {
  573. return ChannelAffinityUsageCacheStats{
  574. RuleName: ruleName,
  575. UsingGroup: usingGroup,
  576. KeyFingerprint: keyFp,
  577. }
  578. }
  579. return ChannelAffinityUsageCacheStats{
  580. RuleName: ruleName,
  581. UsingGroup: usingGroup,
  582. KeyFingerprint: keyFp,
  583. Hit: v.Hit,
  584. Total: v.Total,
  585. WindowSeconds: v.WindowSeconds,
  586. PromptTokens: v.PromptTokens,
  587. CompletionTokens: v.CompletionTokens,
  588. TotalTokens: v.TotalTokens,
  589. CachedTokens: v.CachedTokens,
  590. PromptCacheHitTokens: v.PromptCacheHitTokens,
  591. LastSeenAt: v.LastSeenAt,
  592. }
  593. }
  594. func observeChannelAffinityUsageCache(statsCtx ChannelAffinityStatsContext, usage *dto.Usage) {
  595. entryKey := channelAffinityUsageCacheEntryKey(statsCtx.RuleName, statsCtx.UsingGroup, statsCtx.KeyFingerprint)
  596. if entryKey == "" {
  597. return
  598. }
  599. windowSeconds := statsCtx.TTLSeconds
  600. if windowSeconds <= 0 {
  601. return
  602. }
  603. cache := getChannelAffinityUsageCacheStatsCache()
  604. ttl := time.Duration(windowSeconds) * time.Second
  605. lock := channelAffinityUsageCacheStatsLock(entryKey)
  606. lock.Lock()
  607. defer lock.Unlock()
  608. prev, found, err := cache.Get(entryKey)
  609. if err != nil {
  610. return
  611. }
  612. next := prev
  613. if !found {
  614. next = ChannelAffinityUsageCacheCounters{}
  615. }
  616. next.Total++
  617. hit, cachedTokens, promptCacheHitTokens := usageCacheSignals(usage)
  618. if hit {
  619. next.Hit++
  620. }
  621. next.WindowSeconds = windowSeconds
  622. next.LastSeenAt = time.Now().Unix()
  623. next.CachedTokens += cachedTokens
  624. next.PromptCacheHitTokens += promptCacheHitTokens
  625. next.PromptTokens += int64(usagePromptTokens(usage))
  626. next.CompletionTokens += int64(usageCompletionTokens(usage))
  627. next.TotalTokens += int64(usageTotalTokens(usage))
  628. _ = cache.SetWithTTL(entryKey, next, ttl)
  629. }
  630. func channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp string) string {
  631. ruleName = strings.TrimSpace(ruleName)
  632. usingGroup = strings.TrimSpace(usingGroup)
  633. keyFp = strings.TrimSpace(keyFp)
  634. if ruleName == "" || keyFp == "" {
  635. return ""
  636. }
  637. return ruleName + "\n" + usingGroup + "\n" + keyFp
  638. }
  639. func usageCacheSignals(usage *dto.Usage) (hit bool, cachedTokens int64, promptCacheHitTokens int64) {
  640. if usage == nil {
  641. return false, 0, 0
  642. }
  643. cached := int64(0)
  644. if usage.PromptTokensDetails.CachedTokens > 0 {
  645. cached = int64(usage.PromptTokensDetails.CachedTokens)
  646. } else if usage.InputTokensDetails != nil && usage.InputTokensDetails.CachedTokens > 0 {
  647. cached = int64(usage.InputTokensDetails.CachedTokens)
  648. }
  649. pcht := int64(0)
  650. if usage.PromptCacheHitTokens > 0 {
  651. pcht = int64(usage.PromptCacheHitTokens)
  652. }
  653. return cached > 0 || pcht > 0, cached, pcht
  654. }
  655. func usagePromptTokens(usage *dto.Usage) int {
  656. if usage == nil {
  657. return 0
  658. }
  659. if usage.PromptTokens > 0 {
  660. return usage.PromptTokens
  661. }
  662. return usage.InputTokens
  663. }
  664. func usageCompletionTokens(usage *dto.Usage) int {
  665. if usage == nil {
  666. return 0
  667. }
  668. if usage.CompletionTokens > 0 {
  669. return usage.CompletionTokens
  670. }
  671. return usage.OutputTokens
  672. }
  673. func usageTotalTokens(usage *dto.Usage) int {
  674. if usage == nil {
  675. return 0
  676. }
  677. if usage.TotalTokens > 0 {
  678. return usage.TotalTokens
  679. }
  680. pt := usagePromptTokens(usage)
  681. ct := usageCompletionTokens(usage)
  682. if pt > 0 || ct > 0 {
  683. return pt + ct
  684. }
  685. return 0
  686. }
  687. func getChannelAffinityUsageCacheStatsCache() *cachex.HybridCache[ChannelAffinityUsageCacheCounters] {
  688. channelAffinityUsageCacheStatsOnce.Do(func() {
  689. setting := operation_setting.GetChannelAffinitySetting()
  690. capacity := 100_000
  691. defaultTTLSeconds := 3600
  692. if setting != nil {
  693. if setting.MaxEntries > 0 {
  694. capacity = setting.MaxEntries
  695. }
  696. if setting.DefaultTTLSeconds > 0 {
  697. defaultTTLSeconds = setting.DefaultTTLSeconds
  698. }
  699. }
  700. channelAffinityUsageCacheStatsCache = cachex.NewHybridCache[ChannelAffinityUsageCacheCounters](cachex.HybridCacheConfig[ChannelAffinityUsageCacheCounters]{
  701. Namespace: cachex.Namespace(channelAffinityUsageCacheStatsNamespace),
  702. Redis: common.RDB,
  703. RedisEnabled: func() bool {
  704. return common.RedisEnabled && common.RDB != nil
  705. },
  706. RedisCodec: cachex.JSONCodec[ChannelAffinityUsageCacheCounters]{},
  707. Memory: func() *hot.HotCache[string, ChannelAffinityUsageCacheCounters] {
  708. return hot.NewHotCache[string, ChannelAffinityUsageCacheCounters](hot.LRU, capacity).
  709. WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
  710. WithJanitor().
  711. Build()
  712. },
  713. })
  714. })
  715. return channelAffinityUsageCacheStatsCache
  716. }
  717. func channelAffinityUsageCacheStatsLock(key string) *sync.Mutex {
  718. h := fnv.New32a()
  719. _, _ = h.Write([]byte(key))
  720. idx := h.Sum32() % uint32(len(channelAffinityUsageCacheStatsLocks))
  721. return &channelAffinityUsageCacheStatsLocks[idx]
  722. }