cache.go 28 KB


  1. package model
  2. import (
  3. "context"
  4. "encoding"
  5. "errors"
  6. "fmt"
  7. "math/rand/v2"
  8. "slices"
  9. "sort"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/bytedance/sonic"
  14. "github.com/labring/aiproxy/core/common"
  15. "github.com/labring/aiproxy/core/common/config"
  16. "github.com/labring/aiproxy/core/common/conv"
  17. "github.com/labring/aiproxy/core/common/notify"
  18. "github.com/maruel/natural"
  19. "github.com/redis/go-redis/v9"
  20. log "github.com/sirupsen/logrus"
  21. )
  22. const (
  23. SyncFrequency = time.Minute * 3
  24. TokenCacheKey = "token:%s"
  25. GroupCacheKey = "group:%s"
  26. GroupModelTPMKey = "group:%s:model_tpm"
  27. )
  28. var (
  29. _ encoding.BinaryMarshaler = (*redisStringSlice)(nil)
  30. _ redis.Scanner = (*redisStringSlice)(nil)
  31. )
  32. type redisStringSlice []string
  33. func (r *redisStringSlice) ScanRedis(value string) error {
  34. return sonic.Unmarshal(conv.StringToBytes(value), r)
  35. }
  36. func (r redisStringSlice) MarshalBinary() ([]byte, error) {
  37. return sonic.Marshal(r)
  38. }
  39. type redisTime time.Time
  40. var (
  41. _ redis.Scanner = (*redisTime)(nil)
  42. _ encoding.BinaryMarshaler = (*redisTime)(nil)
  43. )
  44. func (t *redisTime) ScanRedis(value string) error {
  45. return (*time.Time)(t).UnmarshalBinary(conv.StringToBytes(value))
  46. }
  47. func (t redisTime) MarshalBinary() ([]byte, error) {
  48. return time.Time(t).MarshalBinary()
  49. }
  50. type TokenCache struct {
  51. ExpiredAt redisTime `json:"expired_at" redis:"e"`
  52. Group string `json:"group" redis:"g"`
  53. Key string `json:"-" redis:"-"`
  54. Name string `json:"name" redis:"n"`
  55. Subnets redisStringSlice `json:"subnets" redis:"s"`
  56. Models redisStringSlice `json:"models" redis:"m"`
  57. ID int `json:"id" redis:"i"`
  58. Status int `json:"status" redis:"st"`
  59. Quota float64 `json:"quota" redis:"q"`
  60. UsedAmount float64 `json:"used_amount" redis:"u"`
  61. availableSets []string
  62. modelsBySet map[string][]string
  63. }
  64. func (t *TokenCache) SetAvailableSets(availableSets []string) {
  65. t.availableSets = availableSets
  66. }
  67. func (t *TokenCache) SetModelsBySet(modelsBySet map[string][]string) {
  68. t.modelsBySet = modelsBySet
  69. }
  70. func (t *TokenCache) ContainsModel(model string) bool {
  71. if len(t.Models) != 0 {
  72. if !slices.Contains(t.Models, model) {
  73. return false
  74. }
  75. }
  76. return containsModel(model, t.availableSets, t.modelsBySet)
  77. }
  78. func containsModel(model string, sets []string, modelsBySet map[string][]string) bool {
  79. for _, set := range sets {
  80. if slices.Contains(modelsBySet[set], model) {
  81. return true
  82. }
  83. }
  84. return false
  85. }
  86. func (t *TokenCache) Range(fn func(model string) bool) {
  87. ranged := make(map[string]struct{})
  88. if len(t.Models) != 0 {
  89. for _, model := range t.Models {
  90. if _, ok := ranged[model]; !ok && containsModel(model, t.availableSets, t.modelsBySet) {
  91. if !fn(model) {
  92. return
  93. }
  94. }
  95. ranged[model] = struct{}{}
  96. }
  97. return
  98. }
  99. for _, set := range t.availableSets {
  100. for _, model := range t.modelsBySet[set] {
  101. if _, ok := ranged[model]; !ok {
  102. if !fn(model) {
  103. return
  104. }
  105. }
  106. ranged[model] = struct{}{}
  107. }
  108. }
  109. }
  110. func (t *Token) ToTokenCache() *TokenCache {
  111. return &TokenCache{
  112. ID: t.ID,
  113. Group: t.GroupID,
  114. Key: t.Key,
  115. Name: t.Name.String(),
  116. Models: t.Models,
  117. Subnets: t.Subnets,
  118. Status: t.Status,
  119. ExpiredAt: redisTime(t.ExpiredAt),
  120. Quota: t.Quota,
  121. UsedAmount: t.UsedAmount,
  122. }
  123. }
  124. func CacheDeleteToken(key string) error {
  125. if !common.RedisEnabled {
  126. return nil
  127. }
  128. return common.RedisDel(fmt.Sprintf(TokenCacheKey, key))
  129. }
  130. func CacheSetToken(token *TokenCache) error {
  131. if !common.RedisEnabled {
  132. return nil
  133. }
  134. key := fmt.Sprintf(TokenCacheKey, token.Key)
  135. pipe := common.RDB.Pipeline()
  136. pipe.HSet(context.Background(), key, token)
  137. expireTime := SyncFrequency + time.Duration(rand.Int64N(60)-30)*time.Second
  138. pipe.Expire(context.Background(), key, expireTime)
  139. _, err := pipe.Exec(context.Background())
  140. return err
  141. }
  142. func CacheGetTokenByKey(key string) (*TokenCache, error) {
  143. if !common.RedisEnabled {
  144. token, err := GetTokenByKey(key)
  145. if err != nil {
  146. return nil, err
  147. }
  148. return token.ToTokenCache(), nil
  149. }
  150. cacheKey := fmt.Sprintf(TokenCacheKey, key)
  151. tokenCache := &TokenCache{}
  152. err := common.RDB.HGetAll(context.Background(), cacheKey).Scan(tokenCache)
  153. if err == nil && tokenCache.ID != 0 {
  154. tokenCache.Key = key
  155. return tokenCache, nil
  156. } else if err != nil && !errors.Is(err, redis.Nil) {
  157. log.Errorf("get token (%s) from redis error: %s", key, err.Error())
  158. }
  159. token, err := GetTokenByKey(key)
  160. if err != nil {
  161. return nil, err
  162. }
  163. tc := token.ToTokenCache()
  164. if err := CacheSetToken(tc); err != nil {
  165. log.Error("redis set token error: " + err.Error())
  166. }
  167. return tc, nil
  168. }
  169. var updateTokenUsedAmountOnlyIncreaseScript = redis.NewScript(`
  170. local used_amount = redis.call("HGet", KEYS[1], "ua")
  171. if used_amount == false then
  172. return redis.status_reply("ok")
  173. end
  174. if ARGV[1] < used_amount then
  175. return redis.status_reply("ok")
  176. end
  177. redis.call("HSet", KEYS[1], "ua", ARGV[1])
  178. return redis.status_reply("ok")
  179. `)
  180. func CacheUpdateTokenUsedAmountOnlyIncrease(key string, amount float64) error {
  181. if !common.RedisEnabled {
  182. return nil
  183. }
  184. return updateTokenUsedAmountOnlyIncreaseScript.Run(context.Background(), common.RDB, []string{fmt.Sprintf(TokenCacheKey, key)}, amount).
  185. Err()
  186. }
  187. var updateTokenNameScript = redis.NewScript(`
  188. if redis.call("HExists", KEYS[1], "n") then
  189. redis.call("HSet", KEYS[1], "n", ARGV[1])
  190. end
  191. return redis.status_reply("ok")
  192. `)
  193. func CacheUpdateTokenName(key, name string) error {
  194. if !common.RedisEnabled {
  195. return nil
  196. }
  197. return updateTokenNameScript.Run(context.Background(), common.RDB, []string{fmt.Sprintf(TokenCacheKey, key)}, name).
  198. Err()
  199. }
  200. var updateTokenStatusScript = redis.NewScript(`
  201. if redis.call("HExists", KEYS[1], "st") then
  202. redis.call("HSet", KEYS[1], "st", ARGV[1])
  203. end
  204. return redis.status_reply("ok")
  205. `)
  206. func CacheUpdateTokenStatus(key string, status int) error {
  207. if !common.RedisEnabled {
  208. return nil
  209. }
  210. return updateTokenStatusScript.Run(context.Background(), common.RDB, []string{fmt.Sprintf(TokenCacheKey, key)}, status).
  211. Err()
  212. }
  213. type redisMap[K comparable, V any] map[K]V
  214. var (
  215. _ redis.Scanner = (*redisMap[string, any])(nil)
  216. _ encoding.BinaryMarshaler = (*redisMap[string, any])(nil)
  217. )
  218. func (r *redisMap[K, V]) ScanRedis(value string) error {
  219. return sonic.UnmarshalString(value, r)
  220. }
  221. func (r redisMap[K, V]) MarshalBinary() ([]byte, error) {
  222. return sonic.Marshal(r)
  223. }
  224. type (
  225. redisGroupModelConfigMap = redisMap[string, GroupModelConfig]
  226. )
  227. type GroupCache struct {
  228. ID string `json:"-" redis:"-"`
  229. Status int `json:"status" redis:"st"`
  230. UsedAmount float64 `json:"used_amount" redis:"ua"`
  231. RPMRatio float64 `json:"rpm_ratio" redis:"rpm_r"`
  232. TPMRatio float64 `json:"tpm_ratio" redis:"tpm_r"`
  233. AvailableSets redisStringSlice `json:"available_sets" redis:"ass"`
  234. ModelConfigs redisGroupModelConfigMap `json:"model_configs" redis:"mc"`
  235. BalanceAlertEnabled bool `json:"balance_alert_enabled" redis:"bae"`
  236. BalanceAlertThreshold float64 `json:"balance_alert_threshold" redis:"bat"`
  237. }
  238. func (g *GroupCache) GetAvailableSets() []string {
  239. if len(g.AvailableSets) == 0 {
  240. return []string{ChannelDefaultSet}
  241. }
  242. return g.AvailableSets
  243. }
  244. func (g *Group) ToGroupCache() *GroupCache {
  245. modelConfigs := make(redisGroupModelConfigMap, len(g.GroupModelConfigs))
  246. for _, modelConfig := range g.GroupModelConfigs {
  247. modelConfigs[modelConfig.Model] = modelConfig
  248. }
  249. return &GroupCache{
  250. ID: g.ID,
  251. Status: g.Status,
  252. UsedAmount: g.UsedAmount,
  253. RPMRatio: g.RPMRatio,
  254. TPMRatio: g.TPMRatio,
  255. AvailableSets: g.AvailableSets,
  256. ModelConfigs: modelConfigs,
  257. BalanceAlertEnabled: g.BalanceAlertEnabled,
  258. BalanceAlertThreshold: g.BalanceAlertThreshold,
  259. }
  260. }
  261. func CacheDeleteGroup(id string) error {
  262. if !common.RedisEnabled {
  263. return nil
  264. }
  265. return common.RedisDel(fmt.Sprintf(GroupCacheKey, id))
  266. }
  267. var updateGroupRPMRatioScript = redis.NewScript(`
  268. if redis.call("HExists", KEYS[1], "rpm_r") then
  269. redis.call("HSet", KEYS[1], "rpm_r", ARGV[1])
  270. end
  271. return redis.status_reply("ok")
  272. `)
  273. func CacheUpdateGroupRPMRatio(id string, rpmRatio float64) error {
  274. if !common.RedisEnabled {
  275. return nil
  276. }
  277. return updateGroupRPMRatioScript.Run(context.Background(), common.RDB, []string{fmt.Sprintf(GroupCacheKey, id)}, rpmRatio).
  278. Err()
  279. }
  280. var updateGroupTPMRatioScript = redis.NewScript(`
  281. if redis.call("HExists", KEYS[1], "tpm_r") then
  282. redis.call("HSet", KEYS[1], "tpm_r", ARGV[1])
  283. end
  284. return redis.status_reply("ok")
  285. `)
  286. func CacheUpdateGroupTPMRatio(id string, tpmRatio float64) error {
  287. if !common.RedisEnabled {
  288. return nil
  289. }
  290. return updateGroupTPMRatioScript.Run(context.Background(), common.RDB, []string{fmt.Sprintf(GroupCacheKey, id)}, tpmRatio).
  291. Err()
  292. }
  293. var updateGroupStatusScript = redis.NewScript(`
  294. if redis.call("HExists", KEYS[1], "st") then
  295. redis.call("HSet", KEYS[1], "st", ARGV[1])
  296. end
  297. return redis.status_reply("ok")
  298. `)
  299. func CacheUpdateGroupStatus(id string, status int) error {
  300. if !common.RedisEnabled {
  301. return nil
  302. }
  303. return updateGroupStatusScript.Run(context.Background(), common.RDB, []string{fmt.Sprintf(GroupCacheKey, id)}, status).
  304. Err()
  305. }
  306. func CacheSetGroup(group *GroupCache) error {
  307. if !common.RedisEnabled {
  308. return nil
  309. }
  310. key := fmt.Sprintf(GroupCacheKey, group.ID)
  311. pipe := common.RDB.Pipeline()
  312. pipe.HSet(context.Background(), key, group)
  313. expireTime := SyncFrequency + time.Duration(rand.Int64N(60)-30)*time.Second
  314. pipe.Expire(context.Background(), key, expireTime)
  315. _, err := pipe.Exec(context.Background())
  316. return err
  317. }
  318. func CacheGetGroup(id string) (*GroupCache, error) {
  319. if !common.RedisEnabled {
  320. group, err := GetGroupByID(id, true)
  321. if err != nil {
  322. return nil, err
  323. }
  324. return group.ToGroupCache(), nil
  325. }
  326. cacheKey := fmt.Sprintf(GroupCacheKey, id)
  327. groupCache := &GroupCache{}
  328. err := common.RDB.HGetAll(context.Background(), cacheKey).Scan(groupCache)
  329. if err == nil && groupCache.Status != 0 {
  330. groupCache.ID = id
  331. return groupCache, nil
  332. } else if err != nil && !errors.Is(err, redis.Nil) {
  333. log.Errorf("get group (%s) from redis error: %s", id, err.Error())
  334. }
  335. group, err := GetGroupByID(id, true)
  336. if err != nil {
  337. return nil, err
  338. }
  339. gc := group.ToGroupCache()
  340. if err := CacheSetGroup(gc); err != nil {
  341. log.Error("redis set group error: " + err.Error())
  342. }
  343. return gc, nil
  344. }
  345. var updateGroupUsedAmountOnlyIncreaseScript = redis.NewScript(`
  346. local used_amount = redis.call("HGet", KEYS[1], "ua")
  347. if used_amount == false then
  348. return redis.status_reply("ok")
  349. end
  350. if ARGV[1] < used_amount then
  351. return redis.status_reply("ok")
  352. end
  353. redis.call("HSet", KEYS[1], "ua", ARGV[1])
  354. return redis.status_reply("ok")
  355. `)
  356. func CacheUpdateGroupUsedAmountOnlyIncrease(id string, amount float64) error {
  357. if !common.RedisEnabled {
  358. return nil
  359. }
  360. return updateGroupUsedAmountOnlyIncreaseScript.Run(context.Background(), common.RDB, []string{fmt.Sprintf(GroupCacheKey, id)}, amount).
  361. Err()
  362. }
  363. type GroupMCPCache struct {
  364. ID string `json:"id" redis:"i"`
  365. GroupID string `json:"group_id" redis:"g"`
  366. Status GroupMCPStatus `json:"status" redis:"s"`
  367. Type GroupMCPType `json:"type" redis:"t"`
  368. ProxyConfig *GroupMCPProxyConfig `json:"proxy_config" redis:"pc"`
  369. OpenAPIConfig *MCPOpenAPIConfig `json:"openapi_config" redis:"oc"`
  370. }
  371. func (g *GroupMCP) ToGroupMCPCache() *GroupMCPCache {
  372. return &GroupMCPCache{
  373. ID: g.ID,
  374. GroupID: g.GroupID,
  375. Status: g.Status,
  376. Type: g.Type,
  377. ProxyConfig: g.ProxyConfig,
  378. OpenAPIConfig: g.OpenAPIConfig,
  379. }
  380. }
  381. const (
  382. GroupMCPCacheKey = "group_mcp:%s:%s" // group_id:mcp_id
  383. )
  384. func CacheDeleteGroupMCP(groupID, mcpID string) error {
  385. if !common.RedisEnabled {
  386. return nil
  387. }
  388. return common.RedisDel(fmt.Sprintf(GroupMCPCacheKey, groupID, mcpID))
  389. }
  390. func CacheSetGroupMCP(groupMCP *GroupMCPCache) error {
  391. if !common.RedisEnabled {
  392. return nil
  393. }
  394. key := fmt.Sprintf(GroupMCPCacheKey, groupMCP.GroupID, groupMCP.ID)
  395. pipe := common.RDB.Pipeline()
  396. pipe.HSet(context.Background(), key, groupMCP)
  397. expireTime := SyncFrequency + time.Duration(rand.Int64N(60)-30)*time.Second
  398. pipe.Expire(context.Background(), key, expireTime)
  399. _, err := pipe.Exec(context.Background())
  400. return err
  401. }
  402. func CacheGetGroupMCP(groupID, mcpID string) (*GroupMCPCache, error) {
  403. if !common.RedisEnabled {
  404. groupMCP, err := GetGroupMCPByID(mcpID, groupID)
  405. if err != nil {
  406. return nil, err
  407. }
  408. return groupMCP.ToGroupMCPCache(), nil
  409. }
  410. cacheKey := fmt.Sprintf(GroupMCPCacheKey, groupID, mcpID)
  411. groupMCPCache := &GroupMCPCache{}
  412. err := common.RDB.HGetAll(context.Background(), cacheKey).Scan(groupMCPCache)
  413. if err == nil && groupMCPCache.ID != "" {
  414. return groupMCPCache, nil
  415. } else if err != nil && !errors.Is(err, redis.Nil) {
  416. log.Errorf("get group mcp (%s:%s) from redis error: %s", groupID, mcpID, err.Error())
  417. }
  418. groupMCP, err := GetGroupMCPByID(mcpID, groupID)
  419. if err != nil {
  420. return nil, err
  421. }
  422. gmc := groupMCP.ToGroupMCPCache()
  423. if err := CacheSetGroupMCP(gmc); err != nil {
  424. log.Error("redis set group mcp error: " + err.Error())
  425. }
  426. return gmc, nil
  427. }
  428. var updateGroupMCPStatusScript = redis.NewScript(`
  429. if redis.call("HExists", KEYS[1], "s") then
  430. redis.call("HSet", KEYS[1], "s", ARGV[1])
  431. end
  432. return redis.status_reply("ok")
  433. `)
  434. func CacheUpdateGroupMCPStatus(groupID, mcpID string, status GroupMCPStatus) error {
  435. if !common.RedisEnabled {
  436. return nil
  437. }
  438. return updateGroupMCPStatusScript.Run(context.Background(), common.RDB, []string{fmt.Sprintf(GroupMCPCacheKey, groupID, mcpID)}, status).
  439. Err()
  440. }
  441. type PublicMCPCache struct {
  442. ID string `json:"id" redis:"i"`
  443. Status PublicMCPStatus `json:"status" redis:"s"`
  444. Type PublicMCPType `json:"type" redis:"t"`
  445. Price MCPPrice `json:"price" redis:"p"`
  446. ProxyConfig *PublicMCPProxyConfig `json:"proxy_config" redis:"pc"`
  447. OpenAPIConfig *MCPOpenAPIConfig `json:"openapi_config" redis:"oc"`
  448. EmbedConfig *MCPEmbeddingConfig `json:"embed_config" redis:"ec"`
  449. }
  450. func (p *PublicMCP) ToPublicMCPCache() *PublicMCPCache {
  451. return &PublicMCPCache{
  452. ID: p.ID,
  453. Status: p.Status,
  454. Type: p.Type,
  455. Price: p.Price,
  456. ProxyConfig: p.ProxyConfig,
  457. OpenAPIConfig: p.OpenAPIConfig,
  458. EmbedConfig: p.EmbedConfig,
  459. }
  460. }
  461. const (
  462. PublicMCPCacheKey = "public_mcp:%s" // mcp_id
  463. )
  464. func CacheDeletePublicMCP(mcpID string) error {
  465. if !common.RedisEnabled {
  466. return nil
  467. }
  468. return common.RedisDel(fmt.Sprintf(PublicMCPCacheKey, mcpID))
  469. }
  470. func CacheSetPublicMCP(publicMCP *PublicMCPCache) error {
  471. if !common.RedisEnabled {
  472. return nil
  473. }
  474. key := fmt.Sprintf(PublicMCPCacheKey, publicMCP.ID)
  475. pipe := common.RDB.Pipeline()
  476. pipe.HSet(context.Background(), key, publicMCP)
  477. expireTime := SyncFrequency + time.Duration(rand.Int64N(60)-30)*time.Second
  478. pipe.Expire(context.Background(), key, expireTime)
  479. _, err := pipe.Exec(context.Background())
  480. return err
  481. }
  482. func CacheGetPublicMCP(mcpID string) (*PublicMCPCache, error) {
  483. if !common.RedisEnabled {
  484. publicMCP, err := GetPublicMCPByID(mcpID)
  485. if err != nil {
  486. return nil, err
  487. }
  488. return publicMCP.ToPublicMCPCache(), nil
  489. }
  490. cacheKey := fmt.Sprintf(PublicMCPCacheKey, mcpID)
  491. publicMCPCache := &PublicMCPCache{}
  492. err := common.RDB.HGetAll(context.Background(), cacheKey).Scan(publicMCPCache)
  493. if err == nil && publicMCPCache.ID != "" {
  494. return publicMCPCache, nil
  495. } else if err != nil && !errors.Is(err, redis.Nil) {
  496. log.Errorf("get public mcp (%s) from redis error: %s", mcpID, err.Error())
  497. }
  498. publicMCP, err := GetPublicMCPByID(mcpID)
  499. if err != nil {
  500. return nil, err
  501. }
  502. pmc := publicMCP.ToPublicMCPCache()
  503. if err := CacheSetPublicMCP(pmc); err != nil {
  504. log.Error("redis set public mcp error: " + err.Error())
  505. }
  506. return pmc, nil
  507. }
  508. var updatePublicMCPStatusScript = redis.NewScript(`
  509. if redis.call("HExists", KEYS[1], "s") then
  510. redis.call("HSet", KEYS[1], "s", ARGV[1])
  511. end
  512. return redis.status_reply("ok")
  513. `)
  514. func CacheUpdatePublicMCPStatus(mcpID string, status PublicMCPStatus) error {
  515. if !common.RedisEnabled {
  516. return nil
  517. }
  518. return updatePublicMCPStatusScript.Run(context.Background(), common.RDB, []string{fmt.Sprintf(PublicMCPCacheKey, mcpID)}, status).
  519. Err()
  520. }
  521. const (
  522. PublicMCPReusingParamCacheKey = "public_mcp_reusing_param:%s:%s" // mcp_id:group_id
  523. )
  524. type PublicMCPReusingParamCache struct {
  525. MCPID string `json:"mcp_id" redis:"m"`
  526. GroupID string `json:"group_id" redis:"g"`
  527. ReusingParams map[string]string `json:"reusing_params" redis:"rp"`
  528. }
  529. func (p *PublicMCPReusingParam) ToPublicMCPReusingParamCache() *PublicMCPReusingParamCache {
  530. return &PublicMCPReusingParamCache{
  531. MCPID: p.MCPID,
  532. GroupID: p.GroupID,
  533. ReusingParams: p.ReusingParams,
  534. }
  535. }
  536. func CacheDeletePublicMCPReusingParam(mcpID, groupID string) error {
  537. if !common.RedisEnabled {
  538. return nil
  539. }
  540. return common.RedisDel(fmt.Sprintf(PublicMCPReusingParamCacheKey, mcpID, groupID))
  541. }
  542. func CacheSetPublicMCPReusingParam(param *PublicMCPReusingParamCache) error {
  543. if !common.RedisEnabled {
  544. return nil
  545. }
  546. key := fmt.Sprintf(PublicMCPReusingParamCacheKey, param.MCPID, param.GroupID)
  547. pipe := common.RDB.Pipeline()
  548. pipe.HSet(context.Background(), key, param)
  549. expireTime := SyncFrequency + time.Duration(rand.Int64N(60)-30)*time.Second
  550. pipe.Expire(context.Background(), key, expireTime)
  551. _, err := pipe.Exec(context.Background())
  552. return err
  553. }
  554. func CacheGetPublicMCPReusingParam(mcpID, groupID string) (*PublicMCPReusingParamCache, error) {
  555. if !common.RedisEnabled {
  556. param, err := GetPublicMCPReusingParam(mcpID, groupID)
  557. if err != nil {
  558. return nil, err
  559. }
  560. return param.ToPublicMCPReusingParamCache(), nil
  561. }
  562. cacheKey := fmt.Sprintf(PublicMCPReusingParamCacheKey, mcpID, groupID)
  563. paramCache := &PublicMCPReusingParamCache{}
  564. err := common.RDB.HGetAll(context.Background(), cacheKey).Scan(paramCache)
  565. if err == nil && paramCache.MCPID != "" {
  566. return paramCache, nil
  567. } else if err != nil && !errors.Is(err, redis.Nil) {
  568. log.Errorf("get public mcp reusing param (%s:%s) from redis error: %s", mcpID, groupID, err.Error())
  569. }
  570. param, err := GetPublicMCPReusingParam(mcpID, groupID)
  571. if err != nil {
  572. return nil, err
  573. }
  574. prc := param.ToPublicMCPReusingParamCache()
  575. if err := CacheSetPublicMCPReusingParam(prc); err != nil {
  576. log.Error("redis set public mcp reusing param error: " + err.Error())
  577. }
  578. return prc, nil
  579. }
  580. const (
  581. StoreCacheKey = "store:%s" // store_id
  582. )
  583. type StoreCache struct {
  584. ID string `json:"id" redis:"i"`
  585. GroupID string `json:"group_id" redis:"g"`
  586. TokenID int `json:"token_id" redis:"t"`
  587. ChannelID int `json:"channel_id" redis:"c"`
  588. Model string `json:"model" redis:"m"`
  589. ExpiresAt time.Time `json:"expires_at" redis:"e"`
  590. }
  591. func (s *Store) ToStoreCache() *StoreCache {
  592. return &StoreCache{
  593. ID: s.ID,
  594. GroupID: s.GroupID,
  595. TokenID: s.TokenID,
  596. ChannelID: s.ChannelID,
  597. Model: s.Model,
  598. ExpiresAt: s.ExpiresAt,
  599. }
  600. }
  601. func CacheSetStore(store *StoreCache) error {
  602. if !common.RedisEnabled {
  603. return nil
  604. }
  605. key := fmt.Sprintf(StoreCacheKey, store.ID)
  606. pipe := common.RDB.Pipeline()
  607. pipe.HSet(context.Background(), key, store)
  608. expireTime := SyncFrequency + time.Duration(rand.Int64N(60)-30)*time.Second
  609. pipe.Expire(context.Background(), key, expireTime)
  610. _, err := pipe.Exec(context.Background())
  611. return err
  612. }
  613. func CacheGetStore(id string) (*StoreCache, error) {
  614. if !common.RedisEnabled {
  615. store, err := GetStore(id)
  616. if err != nil {
  617. return nil, err
  618. }
  619. return store.ToStoreCache(), nil
  620. }
  621. cacheKey := fmt.Sprintf(StoreCacheKey, id)
  622. storeCache := &StoreCache{}
  623. err := common.RDB.HGetAll(context.Background(), cacheKey).Scan(storeCache)
  624. if err == nil && storeCache.ID != "" {
  625. return storeCache, nil
  626. }
  627. store, err := GetStore(id)
  628. if err != nil {
  629. return nil, err
  630. }
  631. sc := store.ToStoreCache()
  632. if err := CacheSetStore(sc); err != nil {
  633. log.Error("redis set store error: " + err.Error())
  634. }
  635. return sc, nil
  636. }
  637. //nolint:revive
  638. type ModelConfigCache interface {
  639. GetModelConfig(model string) (ModelConfig, bool)
  640. }
  641. // read-only cache
  642. //
  643. //nolint:revive
  644. type ModelCaches struct {
  645. ModelConfig ModelConfigCache
  646. // map[set][]model
  647. EnabledModelsBySet map[string][]string
  648. // map[set][]modelconfig
  649. EnabledModelConfigsBySet map[string][]ModelConfig
  650. // map[model]modelconfig
  651. EnabledModelConfigsMap map[string]ModelConfig
  652. // map[set]map[model][]channel
  653. EnabledModel2ChannelsBySet map[string]map[string][]*Channel
  654. // map[set]map[model][]channel
  655. DisabledModel2ChannelsBySet map[string]map[string][]*Channel
  656. }
  657. var modelCaches atomic.Pointer[ModelCaches]
  658. func init() {
  659. modelCaches.Store(new(ModelCaches))
  660. }
  661. func LoadModelCaches() *ModelCaches {
  662. return modelCaches.Load()
  663. }
  664. // InitModelConfigAndChannelCache initializes the channel cache from database
  665. func InitModelConfigAndChannelCache() error {
  666. modelConfig, err := initializeModelConfigCache()
  667. if err != nil {
  668. return err
  669. }
  670. // Load enabled channels from database
  671. enabledChannels, err := LoadEnabledChannels()
  672. if err != nil {
  673. return err
  674. }
  675. // Build model to channels map by set
  676. enabledModel2ChannelsBySet := buildModelToChannelsBySetMap(enabledChannels)
  677. // Sort channels by priority within each set
  678. sortChannelsByPriorityBySet(enabledModel2ChannelsBySet)
  679. // Build enabled models and configs by set
  680. enabledModelsBySet, enabledModelConfigsBySet, enabledModelConfigsMap := buildEnabledModelsBySet(
  681. enabledModel2ChannelsBySet,
  682. modelConfig,
  683. )
  684. // Load disabled channels
  685. disabledChannels, err := LoadDisabledChannels()
  686. if err != nil {
  687. return err
  688. }
  689. // Build disabled model to channels map by set
  690. disabledModel2ChannelsBySet := buildModelToChannelsBySetMap(disabledChannels)
  691. // Update global cache atomically
  692. modelCaches.Store(&ModelCaches{
  693. ModelConfig: modelConfig,
  694. EnabledModelsBySet: enabledModelsBySet,
  695. EnabledModelConfigsBySet: enabledModelConfigsBySet,
  696. EnabledModelConfigsMap: enabledModelConfigsMap,
  697. EnabledModel2ChannelsBySet: enabledModel2ChannelsBySet,
  698. DisabledModel2ChannelsBySet: disabledModel2ChannelsBySet,
  699. })
  700. return nil
  701. }
  702. func LoadEnabledChannels() ([]*Channel, error) {
  703. var channels []*Channel
  704. err := DB.Where("status = ?", ChannelStatusEnabled).Find(&channels).Error
  705. if err != nil {
  706. return nil, err
  707. }
  708. for _, channel := range channels {
  709. initializeChannelModels(channel)
  710. initializeChannelModelMapping(channel)
  711. }
  712. return channels, nil
  713. }
  714. func LoadDisabledChannels() ([]*Channel, error) {
  715. var channels []*Channel
  716. err := DB.Where("status = ?", ChannelStatusDisabled).Find(&channels).Error
  717. if err != nil {
  718. return nil, err
  719. }
  720. for _, channel := range channels {
  721. initializeChannelModels(channel)
  722. initializeChannelModelMapping(channel)
  723. }
  724. return channels, nil
  725. }
  726. func LoadChannels() ([]*Channel, error) {
  727. var channels []*Channel
  728. err := DB.Find(&channels).Error
  729. if err != nil {
  730. return nil, err
  731. }
  732. for _, channel := range channels {
  733. initializeChannelModels(channel)
  734. initializeChannelModelMapping(channel)
  735. }
  736. return channels, nil
  737. }
  738. func LoadChannelByID(id int) (*Channel, error) {
  739. var channel Channel
  740. err := DB.First(&channel, id).Error
  741. if err != nil {
  742. return nil, err
  743. }
  744. initializeChannelModels(&channel)
  745. initializeChannelModelMapping(&channel)
  746. return &channel, nil
  747. }
  748. var _ ModelConfigCache = (*modelConfigMapCache)(nil)
  749. type modelConfigMapCache struct {
  750. modelConfigMap map[string]ModelConfig
  751. }
  752. func (m *modelConfigMapCache) GetModelConfig(model string) (ModelConfig, bool) {
  753. config, ok := m.modelConfigMap[model]
  754. return config, ok
  755. }
  756. var _ ModelConfigCache = (*disabledModelConfigCache)(nil)
  757. type disabledModelConfigCache struct {
  758. modelConfigs ModelConfigCache
  759. }
  760. func (d *disabledModelConfigCache) GetModelConfig(model string) (ModelConfig, bool) {
  761. if config, ok := d.modelConfigs.GetModelConfig(model); ok {
  762. return config, true
  763. }
  764. return NewDefaultModelConfig(model), true
  765. }
  766. func initializeModelConfigCache() (ModelConfigCache, error) {
  767. modelConfigs, err := GetAllModelConfigs()
  768. if err != nil {
  769. return nil, err
  770. }
  771. newModelConfigMap := make(map[string]ModelConfig)
  772. for _, modelConfig := range modelConfigs {
  773. newModelConfigMap[modelConfig.Model] = modelConfig
  774. }
  775. configs := &modelConfigMapCache{modelConfigMap: newModelConfigMap}
  776. if config.DisableModelConfig {
  777. return &disabledModelConfigCache{modelConfigs: configs}, nil
  778. }
  779. return configs, nil
  780. }
  781. func initializeChannelModels(channel *Channel) {
  782. if len(channel.Models) == 0 {
  783. channel.Models = config.GetDefaultChannelModels()[int(channel.Type)]
  784. return
  785. }
  786. findedModels, missingModels, err := GetModelConfigWithModels(channel.Models)
  787. if err != nil {
  788. return
  789. }
  790. if len(missingModels) > 0 {
  791. slices.Sort(missingModels)
  792. log.Errorf("model config not found: %v", missingModels)
  793. }
  794. slices.Sort(findedModels)
  795. channel.Models = findedModels
  796. }
  797. func initializeChannelModelMapping(channel *Channel) {
  798. if len(channel.ModelMapping) == 0 {
  799. channel.ModelMapping = config.GetDefaultChannelModelMapping()[int(channel.Type)]
  800. }
  801. }
  802. func buildModelToChannelsBySetMap(channels []*Channel) map[string]map[string][]*Channel {
  803. modelMapBySet := make(map[string]map[string][]*Channel)
  804. for _, channel := range channels {
  805. sets := channel.GetSets()
  806. for _, set := range sets {
  807. if _, ok := modelMapBySet[set]; !ok {
  808. modelMapBySet[set] = make(map[string][]*Channel)
  809. }
  810. for _, model := range channel.Models {
  811. modelMapBySet[set][model] = append(modelMapBySet[set][model], channel)
  812. }
  813. }
  814. }
  815. return modelMapBySet
  816. }
  817. func sortChannelsByPriorityBySet(modelMapBySet map[string]map[string][]*Channel) {
  818. for _, modelMap := range modelMapBySet {
  819. for _, channels := range modelMap {
  820. sort.Slice(channels, func(i, j int) bool {
  821. return channels[i].GetPriority() > channels[j].GetPriority()
  822. })
  823. }
  824. }
  825. }
  826. func buildEnabledModelsBySet(
  827. modelMapBySet map[string]map[string][]*Channel,
  828. modelConfigCache ModelConfigCache,
  829. ) (
  830. map[string][]string,
  831. map[string][]ModelConfig,
  832. map[string]ModelConfig,
  833. ) {
  834. modelsBySet := make(map[string][]string)
  835. modelConfigsBySet := make(map[string][]ModelConfig)
  836. modelConfigsMap := make(map[string]ModelConfig)
  837. for set, modelMap := range modelMapBySet {
  838. models := make([]string, 0)
  839. configs := make([]ModelConfig, 0)
  840. appended := make(map[string]struct{})
  841. for model := range modelMap {
  842. if _, ok := appended[model]; ok {
  843. continue
  844. }
  845. if config, ok := modelConfigCache.GetModelConfig(model); ok {
  846. models = append(models, model)
  847. configs = append(configs, config)
  848. appended[model] = struct{}{}
  849. modelConfigsMap[model] = config
  850. }
  851. }
  852. slices.Sort(models)
  853. slices.SortStableFunc(configs, SortModelConfigsFunc)
  854. modelsBySet[set] = models
  855. modelConfigsBySet[set] = configs
  856. }
  857. return modelsBySet, modelConfigsBySet, modelConfigsMap
  858. }
  859. func SortModelConfigsFunc(i, j ModelConfig) int {
  860. if i.Owner != j.Owner {
  861. if natural.Less(string(i.Owner), string(j.Owner)) {
  862. return -1
  863. }
  864. return 1
  865. }
  866. if i.Type != j.Type {
  867. if i.Type < j.Type {
  868. return -1
  869. }
  870. return 1
  871. }
  872. if i.Model == j.Model {
  873. return 0
  874. }
  875. if natural.Less(i.Model, j.Model) {
  876. return -1
  877. }
  878. return 1
  879. }
  880. func SyncModelConfigAndChannelCache(
  881. ctx context.Context,
  882. wg *sync.WaitGroup,
  883. frequency time.Duration,
  884. ) {
  885. defer wg.Done()
  886. ticker := time.NewTicker(frequency)
  887. defer ticker.Stop()
  888. for {
  889. select {
  890. case <-ctx.Done():
  891. return
  892. case <-ticker.C:
  893. err := InitModelConfigAndChannelCache()
  894. if err != nil {
  895. notify.ErrorThrottle(
  896. "syncModelChannel",
  897. time.Minute,
  898. "failed to sync channels",
  899. err.Error(),
  900. )
  901. }
  902. }
  903. }
  904. }