register.go 5.0 KB


  1. package mcpservers
  2. import (
  3. "context"
  4. "fmt"
  5. "sort"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/labring/aiproxy/core/model"
  11. "github.com/mark3labs/mcp-go/mcp"
  12. )
  13. type mcpServerCacheItem struct {
  14. MCPServer Server
  15. LastUsedTimestamp atomic.Int64
  16. }
  17. var (
  18. servers = make(map[string]McpServer)
  19. mcpServerCache = make(map[string]*mcpServerCacheItem)
  20. mcpServerCacheLock = sync.RWMutex{}
  21. cacheExpirationTime = 3 * time.Minute
  22. )
  23. func startCacheCleaner(interval time.Duration) {
  24. go func() {
  25. ticker := time.NewTicker(interval)
  26. defer ticker.Stop()
  27. for range ticker.C {
  28. cleanupExpiredCache()
  29. }
  30. }()
  31. }
  32. func cleanupExpiredCache() {
  33. now := time.Now().Unix()
  34. expiredTime := now - int64(cacheExpirationTime.Seconds())
  35. mcpServerCacheLock.Lock()
  36. defer mcpServerCacheLock.Unlock()
  37. for key, item := range mcpServerCache {
  38. if item.LastUsedTimestamp.Load() < expiredTime {
  39. delete(mcpServerCache, key)
  40. }
  41. }
  42. }
  43. func init() {
  44. startCacheCleaner(time.Minute)
  45. }
  46. func Register(mcp McpServer) {
  47. if mcp.ID == "" {
  48. panic("mcp id is required")
  49. }
  50. if mcp.Name == "" {
  51. panic("mcp name is required")
  52. }
  53. if mcp.Description == "" &&
  54. mcp.DescriptionCN == "" &&
  55. mcp.Readme == "" &&
  56. mcp.ReadmeURL == "" &&
  57. mcp.ReadmeCN == "" &&
  58. mcp.ReadmeCNURL == "" {
  59. panic(
  60. fmt.Sprintf(
  61. "mcp %s description or description_cn readme or readme_url or readme_cn or readme_cn_url is required",
  62. mcp.ID,
  63. ),
  64. )
  65. }
  66. switch mcp.Type {
  67. case model.PublicMCPTypeEmbed:
  68. if mcp.newServer == nil {
  69. panic(fmt.Sprintf("mcp %s new server is required", mcp.ID))
  70. }
  71. case model.PublicMCPTypeProxySSE,
  72. model.PublicMCPTypeProxyStreamable:
  73. if len(mcp.ProxyConfigTemplates) == 0 {
  74. panic(fmt.Sprintf("mcp %s proxy config templates is required", mcp.ID))
  75. }
  76. default:
  77. }
  78. if len(mcp.ConfigTemplates) != 0 {
  79. if err := CheckConfigTemplatesValidate(mcp.ConfigTemplates); err != nil {
  80. panic(fmt.Sprintf("mcp %s config templates example is invalid: %v", mcp.ID, err))
  81. }
  82. }
  83. if len(mcp.ProxyConfigTemplates) != 0 {
  84. if err := CheckProxyConfigTemplatesValidate(mcp.ProxyConfigTemplates); err != nil {
  85. panic(fmt.Sprintf("mcp %s config templates example is invalid: %v", mcp.ID, err))
  86. }
  87. }
  88. if _, ok := servers[mcp.ID]; ok {
  89. panic(fmt.Sprintf("mcp %s already registered", mcp.ID))
  90. }
  91. servers[mcp.ID] = mcp
  92. }
  93. func ListTools(ctx context.Context, id string) ([]mcp.Tool, error) {
  94. embedServer, ok := servers[id]
  95. if !ok {
  96. return nil, fmt.Errorf("mcp %s not found", id)
  97. }
  98. tools, err := embedServer.ListTools(ctx)
  99. if err != nil {
  100. return nil, fmt.Errorf("mcp %s list tools error: %w", id, err)
  101. }
  102. return tools, nil
  103. }
  104. func GetMCPServer(id string, config, reusingConfig map[string]string) (Server, error) {
  105. embedServer, ok := servers[id]
  106. if !ok {
  107. return nil, fmt.Errorf("mcp %s not found", id)
  108. }
  109. if len(embedServer.ConfigTemplates) == 0 {
  110. if embedServer.disableCache {
  111. return embedServer.NewServer(config, reusingConfig)
  112. }
  113. return loadCacheServer(embedServer, nil)
  114. }
  115. if err := ValidateConfigTemplatesConfig(embedServer.ConfigTemplates, config, reusingConfig); err != nil {
  116. return nil, fmt.Errorf("mcp %s config is invalid: %w", id, err)
  117. }
  118. if embedServer.disableCache {
  119. return embedServer.NewServer(config, reusingConfig)
  120. }
  121. if len(reusingConfig) == 0 {
  122. return loadCacheServer(embedServer, config)
  123. }
  124. for _, template := range embedServer.ConfigTemplates {
  125. switch template.Required {
  126. case ConfigRequiredTypeReusingOptional,
  127. ConfigRequiredTypeReusingOnly,
  128. ConfigRequiredTypeInitOrReusingOnly:
  129. return embedServer.NewServer(config, reusingConfig)
  130. }
  131. }
  132. return loadCacheServer(embedServer, config)
  133. }
  134. func buildNoReusingConfigCacheKey(config map[string]string) string {
  135. keys := make([]string, 0, len(config))
  136. for key, value := range config {
  137. keys = append(keys, fmt.Sprintf("%s:%s", key, value))
  138. }
  139. sort.Strings(keys)
  140. return strings.Join(keys, ":")
  141. }
  142. func loadCacheServer(embedServer McpServer, config map[string]string) (Server, error) {
  143. cacheKey := embedServer.ID
  144. if len(config) > 0 {
  145. cacheKey = fmt.Sprintf("%s:%s", embedServer.ID, buildNoReusingConfigCacheKey(config))
  146. }
  147. mcpServerCacheLock.RLock()
  148. server, ok := mcpServerCache[cacheKey]
  149. mcpServerCacheLock.RUnlock()
  150. if ok {
  151. server.LastUsedTimestamp.Store(time.Now().Unix())
  152. return server.MCPServer, nil
  153. }
  154. mcpServerCacheLock.Lock()
  155. defer mcpServerCacheLock.Unlock()
  156. server, ok = mcpServerCache[cacheKey]
  157. if ok {
  158. server.LastUsedTimestamp.Store(time.Now().Unix())
  159. return server.MCPServer, nil
  160. }
  161. mcpServer, err := embedServer.NewServer(config, nil)
  162. if err != nil {
  163. return nil, fmt.Errorf("mcp %s new server is invalid: %w", embedServer.ID, err)
  164. }
  165. mcpServerCacheItem := &mcpServerCacheItem{
  166. MCPServer: mcpServer,
  167. LastUsedTimestamp: atomic.Int64{},
  168. }
  169. mcpServerCacheItem.LastUsedTimestamp.Store(time.Now().Unix())
  170. mcpServerCache[cacheKey] = mcpServerCacheItem
  171. return mcpServer, nil
  172. }
  173. func Servers() map[string]McpServer {
  174. return servers
  175. }
  176. func GetEmbedMCP(id string) (McpServer, bool) {
  177. mcp, ok := servers[id]
  178. return mcp, ok
  179. }