|
|
@@ -242,21 +242,25 @@ func CacheUpdateTokenStatus(key string, status int) error {
|
|
|
return updateTokenStatusScript.Run(context.Background(), common.RDB, []string{fmt.Sprintf(TokenCacheKey, key)}, status).Err()
|
|
|
}
|
|
|
|
|
|
-type redisGroupModelConfigMap map[string]GroupModelConfig
|
|
|
+type redisMap[K comparable, V any] map[K]V
|
|
|
|
|
|
var (
|
|
|
- _ redis.Scanner = (*redisGroupModelConfigMap)(nil)
|
|
|
- _ encoding.BinaryMarshaler = (*redisGroupModelConfigMap)(nil)
|
|
|
+ _ redis.Scanner = (*redisMap[string, any])(nil)
|
|
|
+ _ encoding.BinaryMarshaler = (*redisMap[string, any])(nil)
|
|
|
)
|
|
|
|
|
|
-func (r *redisGroupModelConfigMap) ScanRedis(value string) error {
|
|
|
+func (r *redisMap[K, V]) ScanRedis(value string) error {
|
|
|
return sonic.UnmarshalString(value, r)
|
|
|
}
|
|
|
|
|
|
-func (r redisGroupModelConfigMap) MarshalBinary() ([]byte, error) {
|
|
|
+func (r redisMap[K, V]) MarshalBinary() ([]byte, error) {
|
|
|
return sonic.Marshal(r)
|
|
|
}
|
|
|
|
|
|
+type (
|
|
|
+ redisGroupModelConfigMap redisMap[string, GroupModelConfig]
|
|
|
+)
|
|
|
+
|
|
|
type GroupCache struct {
|
|
|
ID string `json:"-" redis:"-"`
|
|
|
Status int `json:"status" redis:"st"`
|
|
|
@@ -278,8 +282,8 @@ func (g *GroupCache) GetAvailableSets() []string {
|
|
|
}
|
|
|
|
|
|
func (g *Group) ToGroupCache() *GroupCache {
|
|
|
- modelConfigs := make(redisGroupModelConfigMap, len(g.GroupModelConfigs))
|
|
|
- for _, modelConfig := range g.GroupModelConfigs {
|
|
|
+ modelConfigs := make(redisGroupModelConfigMap, len(g.ModelConfigs))
|
|
|
+ for _, modelConfig := range g.ModelConfigs {
|
|
|
modelConfigs[modelConfig.Model] = modelConfig
|
|
|
}
|
|
|
return &GroupCache{
|
|
|
@@ -411,6 +415,261 @@ func CacheUpdateGroupUsedAmountOnlyIncrease(id string, amount float64) error {
|
|
|
return updateGroupUsedAmountOnlyIncreaseScript.Run(context.Background(), common.RDB, []string{fmt.Sprintf(GroupCacheKey, id)}, amount).Err()
|
|
|
}
|
|
|
|
|
|
+type GroupMCPCache struct {
|
|
|
+ ID string `json:"id" redis:"i"`
|
|
|
+ GroupID string `json:"group_id" redis:"g"`
|
|
|
+ Status GroupMCPStatus `json:"status" redis:"s"`
|
|
|
+ Type GroupMCPType `json:"type" redis:"t"`
|
|
|
+ ProxyConfig *GroupMCPProxyConfig `json:"proxy_config" redis:"pc"`
|
|
|
+ OpenAPIConfig *MCPOpenAPIConfig `json:"openapi_config" redis:"oc"`
|
|
|
+}
|
|
|
+
|
|
|
+func (g *GroupMCP) ToGroupMCPCache() *GroupMCPCache {
|
|
|
+ return &GroupMCPCache{
|
|
|
+ ID: g.ID,
|
|
|
+ GroupID: g.GroupID,
|
|
|
+ Status: g.Status,
|
|
|
+ Type: g.Type,
|
|
|
+ ProxyConfig: g.ProxyConfig,
|
|
|
+ OpenAPIConfig: g.OpenAPIConfig,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+const (
|
|
|
+ GroupMCPCacheKey = "group_mcp:%s:%s" // group_id:mcp_id
|
|
|
+)
|
|
|
+
|
|
|
+func CacheDeleteGroupMCP(groupID, mcpID string) error {
|
|
|
+ if !common.RedisEnabled {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return common.RedisDel(fmt.Sprintf(GroupMCPCacheKey, groupID, mcpID))
|
|
|
+}
|
|
|
+
|
|
|
+//nolint:gosec
|
|
|
+func CacheSetGroupMCP(groupMCP *GroupMCPCache) error {
|
|
|
+ if !common.RedisEnabled {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ key := fmt.Sprintf(GroupMCPCacheKey, groupMCP.GroupID, groupMCP.ID)
|
|
|
+ pipe := common.RDB.Pipeline()
|
|
|
+ pipe.HSet(context.Background(), key, groupMCP)
|
|
|
+ expireTime := SyncFrequency + time.Duration(rand.Int64N(60)-30)*time.Second
|
|
|
+ pipe.Expire(context.Background(), key, expireTime)
|
|
|
+ _, err := pipe.Exec(context.Background())
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+func CacheGetGroupMCP(groupID, mcpID string) (*GroupMCPCache, error) {
|
|
|
+ if !common.RedisEnabled {
|
|
|
+ groupMCP, err := GetGroupMCPByID(mcpID, groupID)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ return groupMCP.ToGroupMCPCache(), nil
|
|
|
+ }
|
|
|
+
|
|
|
+ cacheKey := fmt.Sprintf(GroupMCPCacheKey, groupID, mcpID)
|
|
|
+ groupMCPCache := &GroupMCPCache{}
|
|
|
+ err := common.RDB.HGetAll(context.Background(), cacheKey).Scan(groupMCPCache)
|
|
|
+ if err == nil && groupMCPCache.ID != "" {
|
|
|
+ return groupMCPCache, nil
|
|
|
+ } else if err != nil && !errors.Is(err, redis.Nil) {
|
|
|
+ log.Errorf("get group mcp (%s:%s) from redis error: %s", groupID, mcpID, err.Error())
|
|
|
+ }
|
|
|
+
|
|
|
+ groupMCP, err := GetGroupMCPByID(mcpID, groupID)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ gmc := groupMCP.ToGroupMCPCache()
|
|
|
+
|
|
|
+ if err := CacheSetGroupMCP(gmc); err != nil {
|
|
|
+ log.Error("redis set group mcp error: " + err.Error())
|
|
|
+ }
|
|
|
+
|
|
|
+ return gmc, nil
|
|
|
+}
|
|
|
+
|
|
|
+var updateGroupMCPStatusScript = redis.NewScript(`
|
|
|
+ if redis.call("HExists", KEYS[1], "s") then
|
|
|
+ redis.call("HSet", KEYS[1], "s", ARGV[1])
|
|
|
+ end
|
|
|
+ return redis.status_reply("ok")
|
|
|
+`)
|
|
|
+
|
|
|
+func CacheUpdateGroupMCPStatus(groupID, mcpID string, status GroupMCPStatus) error {
|
|
|
+ if !common.RedisEnabled {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return updateGroupMCPStatusScript.Run(context.Background(), common.RDB, []string{fmt.Sprintf(GroupMCPCacheKey, groupID, mcpID)}, status).Err()
|
|
|
+}
|
|
|
+
|
|
|
+type PublicMCPCache struct {
|
|
|
+ ID string `json:"id" redis:"i"`
|
|
|
+ Status PublicMCPStatus `json:"status" redis:"s"`
|
|
|
+ Type PublicMCPType `json:"type" redis:"t"`
|
|
|
+ Price MCPPrice `json:"price" redis:"p"`
|
|
|
+ ProxyConfig *PublicMCPProxyConfig `json:"proxy_config" redis:"pc"`
|
|
|
+ OpenAPIConfig *MCPOpenAPIConfig `json:"openapi_config" redis:"oc"`
|
|
|
+ EmbedConfig *MCPEmbeddingConfig `json:"embed_config" redis:"ec"`
|
|
|
+}
|
|
|
+
|
|
|
+func (p *PublicMCP) ToPublicMCPCache() *PublicMCPCache {
|
|
|
+ return &PublicMCPCache{
|
|
|
+ ID: p.ID,
|
|
|
+ Status: p.Status,
|
|
|
+ Type: p.Type,
|
|
|
+ Price: p.Price,
|
|
|
+ ProxyConfig: p.ProxyConfig,
|
|
|
+ OpenAPIConfig: p.OpenAPIConfig,
|
|
|
+ EmbedConfig: p.EmbedConfig,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+const (
|
|
|
+ PublicMCPCacheKey = "public_mcp:%s" // mcp_id
|
|
|
+)
|
|
|
+
|
|
|
+func CacheDeletePublicMCP(mcpID string) error {
|
|
|
+ if !common.RedisEnabled {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return common.RedisDel(fmt.Sprintf(PublicMCPCacheKey, mcpID))
|
|
|
+}
|
|
|
+
|
|
|
+//nolint:gosec
|
|
|
+func CacheSetPublicMCP(publicMCP *PublicMCPCache) error {
|
|
|
+ if !common.RedisEnabled {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ key := fmt.Sprintf(PublicMCPCacheKey, publicMCP.ID)
|
|
|
+ pipe := common.RDB.Pipeline()
|
|
|
+ pipe.HSet(context.Background(), key, publicMCP)
|
|
|
+ expireTime := SyncFrequency + time.Duration(rand.Int64N(60)-30)*time.Second
|
|
|
+ pipe.Expire(context.Background(), key, expireTime)
|
|
|
+ _, err := pipe.Exec(context.Background())
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+func CacheGetPublicMCP(mcpID string) (*PublicMCPCache, error) {
|
|
|
+ if !common.RedisEnabled {
|
|
|
+ publicMCP, err := GetPublicMCPByID(mcpID)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ return publicMCP.ToPublicMCPCache(), nil
|
|
|
+ }
|
|
|
+
|
|
|
+ cacheKey := fmt.Sprintf(PublicMCPCacheKey, mcpID)
|
|
|
+ publicMCPCache := &PublicMCPCache{}
|
|
|
+ err := common.RDB.HGetAll(context.Background(), cacheKey).Scan(publicMCPCache)
|
|
|
+ if err == nil && publicMCPCache.ID != "" {
|
|
|
+ return publicMCPCache, nil
|
|
|
+ } else if err != nil && !errors.Is(err, redis.Nil) {
|
|
|
+ log.Errorf("get public mcp (%s) from redis error: %s", mcpID, err.Error())
|
|
|
+ }
|
|
|
+
|
|
|
+ publicMCP, err := GetPublicMCPByID(mcpID)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ pmc := publicMCP.ToPublicMCPCache()
|
|
|
+
|
|
|
+ if err := CacheSetPublicMCP(pmc); err != nil {
|
|
|
+ log.Error("redis set public mcp error: " + err.Error())
|
|
|
+ }
|
|
|
+
|
|
|
+ return pmc, nil
|
|
|
+}
|
|
|
+
|
|
|
+var updatePublicMCPStatusScript = redis.NewScript(`
|
|
|
+ if redis.call("HExists", KEYS[1], "s") then
|
|
|
+ redis.call("HSet", KEYS[1], "s", ARGV[1])
|
|
|
+ end
|
|
|
+ return redis.status_reply("ok")
|
|
|
+`)
|
|
|
+
|
|
|
+func CacheUpdatePublicMCPStatus(mcpID string, status PublicMCPStatus) error {
|
|
|
+ if !common.RedisEnabled {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return updatePublicMCPStatusScript.Run(context.Background(), common.RDB, []string{fmt.Sprintf(PublicMCPCacheKey, mcpID)}, status).Err()
|
|
|
+}
|
|
|
+
|
|
|
+const (
|
|
|
+ PublicMCPReusingParamCacheKey = "public_mcp_reusing_param:%s:%s" // mcp_id:group_id
|
|
|
+)
|
|
|
+
|
|
|
+type PublicMCPReusingParamCache struct {
|
|
|
+ MCPID string `json:"mcp_id" redis:"m"`
|
|
|
+ GroupID string `json:"group_id" redis:"g"`
|
|
|
+ ReusingParams map[string]string `json:"reusing_params" redis:"rp"`
|
|
|
+}
|
|
|
+
|
|
|
+func (p *PublicMCPReusingParam) ToPublicMCPReusingParamCache() *PublicMCPReusingParamCache {
|
|
|
+ return &PublicMCPReusingParamCache{
|
|
|
+ MCPID: p.MCPID,
|
|
|
+ GroupID: p.GroupID,
|
|
|
+ ReusingParams: p.ReusingParams,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func CacheDeletePublicMCPReusingParam(mcpID, groupID string) error {
|
|
|
+ if !common.RedisEnabled {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return common.RedisDel(fmt.Sprintf(PublicMCPReusingParamCacheKey, mcpID, groupID))
|
|
|
+}
|
|
|
+
|
|
|
+//nolint:gosec
|
|
|
+func CacheSetPublicMCPReusingParam(param *PublicMCPReusingParamCache) error {
|
|
|
+ if !common.RedisEnabled {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ key := fmt.Sprintf(PublicMCPReusingParamCacheKey, param.MCPID, param.GroupID)
|
|
|
+ pipe := common.RDB.Pipeline()
|
|
|
+ pipe.HSet(context.Background(), key, param)
|
|
|
+ expireTime := SyncFrequency + time.Duration(rand.Int64N(60)-30)*time.Second
|
|
|
+ pipe.Expire(context.Background(), key, expireTime)
|
|
|
+ _, err := pipe.Exec(context.Background())
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+func CacheGetPublicMCPReusingParam(mcpID, groupID string) (*PublicMCPReusingParamCache, error) {
|
|
|
+ if !common.RedisEnabled {
|
|
|
+ param, err := GetPublicMCPReusingParam(mcpID, groupID)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ return param.ToPublicMCPReusingParamCache(), nil
|
|
|
+ }
|
|
|
+
|
|
|
+ cacheKey := fmt.Sprintf(PublicMCPReusingParamCacheKey, mcpID, groupID)
|
|
|
+ paramCache := &PublicMCPReusingParamCache{}
|
|
|
+ err := common.RDB.HGetAll(context.Background(), cacheKey).Scan(paramCache)
|
|
|
+ if err == nil && paramCache.MCPID != "" {
|
|
|
+ return paramCache, nil
|
|
|
+ } else if err != nil && !errors.Is(err, redis.Nil) {
|
|
|
+ log.Errorf("get public mcp reusing param (%s:%s) from redis error: %s", mcpID, groupID, err.Error())
|
|
|
+ }
|
|
|
+
|
|
|
+ param, err := GetPublicMCPReusingParam(mcpID, groupID)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ prc := param.ToPublicMCPReusingParamCache()
|
|
|
+
|
|
|
+ if err := CacheSetPublicMCPReusingParam(prc); err != nil {
|
|
|
+ log.Error("redis set public mcp reusing param error: " + err.Error())
|
|
|
+ }
|
|
|
+
|
|
|
+ return prc, nil
|
|
|
+}
|
|
|
+
|
|
|
//nolint:revive
|
|
|
type ModelConfigCache interface {
|
|
|
GetModelConfig(model string) (*ModelConfig, bool)
|