embedmcp.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. package controller
  2. import (
  3. "context"
  4. "fmt"
  5. "maps"
  6. "net/http"
  7. "net/url"
  8. "slices"
  9. "strings"
  10. "github.com/gin-gonic/gin"
  11. "github.com/labring/aiproxy/core/common/mcpproxy"
  12. statelessmcp "github.com/labring/aiproxy/core/common/stateless-mcp"
  13. "github.com/labring/aiproxy/core/middleware"
  14. "github.com/labring/aiproxy/core/model"
  15. mcpservers "github.com/labring/aiproxy/mcp-servers"
  16. // init embed mcp
  17. _ "github.com/labring/aiproxy/mcp-servers/mcpregister"
  18. "github.com/mark3labs/mcp-go/mcp"
  19. "github.com/mark3labs/mcp-go/server"
  20. )
  21. type EmbedMCPConfigTemplate struct {
  22. Name string `json:"name"`
  23. Required bool `json:"required"`
  24. Example string `json:"example,omitempty"`
  25. Description string `json:"description,omitempty"`
  26. }
  27. func newEmbedMCPConfigTemplate(template mcpservers.ConfigTemplate) EmbedMCPConfigTemplate {
  28. return EmbedMCPConfigTemplate{
  29. Name: template.Name,
  30. Required: template.Required == mcpservers.ConfigRequiredTypeInitOnly,
  31. Example: template.Example,
  32. Description: template.Description,
  33. }
  34. }
  35. type EmbedMCPConfigTemplates = map[string]EmbedMCPConfigTemplate
  36. func newEmbedMCPConfigTemplates(templates mcpservers.ConfigTemplates) EmbedMCPConfigTemplates {
  37. emcpTemplates := make(EmbedMCPConfigTemplates, len(templates))
  38. for key, template := range templates {
  39. emcpTemplates[key] = newEmbedMCPConfigTemplate(template)
  40. }
  41. return emcpTemplates
  42. }
  43. type EmbedMCP struct {
  44. ID string `json:"id"`
  45. Enabled bool `json:"enabled"`
  46. Name string `json:"name"`
  47. Readme string `json:"readme"`
  48. Tags []string `json:"tags"`
  49. ConfigTemplates EmbedMCPConfigTemplates `json:"config_templates"`
  50. }
  51. func newEmbedMCP(mcp *mcpservers.EmbedMcp, enabled bool) *EmbedMCP {
  52. emcp := &EmbedMCP{
  53. ID: mcp.ID,
  54. Enabled: enabled,
  55. Name: mcp.Name,
  56. Readme: mcp.Readme,
  57. Tags: mcp.Tags,
  58. ConfigTemplates: newEmbedMCPConfigTemplates(mcp.ConfigTemplates),
  59. }
  60. return emcp
  61. }
  62. // GetEmbedMCPs godoc
  63. //
  64. // @Summary Get embed mcp
  65. // @Description Get embed mcp
  66. // @Tags embedmcp
  67. // @Accept json
  68. // @Produce json
  69. // @Security ApiKeyAuth
  70. // @Success 200 {array} EmbedMCP
  71. // @Router /api/embedmcp/ [get]
  72. func GetEmbedMCPs(c *gin.Context) {
  73. embeds := mcpservers.Servers()
  74. enabledMCPs, err := model.GetPublicMCPsEnabled(slices.Collect(maps.Keys(embeds)))
  75. if err != nil {
  76. middleware.ErrorResponse(c, http.StatusInternalServerError, err.Error())
  77. return
  78. }
  79. emcps := make([]*EmbedMCP, 0, len(embeds))
  80. for _, mcp := range embeds {
  81. emcps = append(emcps, newEmbedMCP(&mcp, slices.Contains(enabledMCPs, mcp.ID)))
  82. }
  83. middleware.SuccessResponse(c, emcps)
  84. }
  85. type SaveEmbedMCPRequest struct {
  86. ID string `json:"id"`
  87. Enabled bool `json:"enabled"`
  88. InitConfig map[string]string `json:"init_config"`
  89. }
  90. func GetEmbedConfig(
  91. ct mcpservers.ConfigTemplates,
  92. initConfig map[string]string,
  93. ) (*model.MCPEmbeddingConfig, error) {
  94. reusingConfig := make(map[string]model.MCPEmbeddingReusingConfig)
  95. embedConfig := &model.MCPEmbeddingConfig{
  96. Init: initConfig,
  97. }
  98. for key, value := range ct {
  99. switch value.Required {
  100. case mcpservers.ConfigRequiredTypeInitOnly:
  101. if v, ok := initConfig[key]; !ok || v == "" {
  102. return nil, fmt.Errorf("config %s is required", key)
  103. }
  104. case mcpservers.ConfigRequiredTypeReusingOnly:
  105. if _, ok := initConfig[key]; ok {
  106. return nil, fmt.Errorf("config %s is provided, but it is not allowed", key)
  107. }
  108. reusingConfig[key] = model.MCPEmbeddingReusingConfig{
  109. Name: value.Name,
  110. Description: value.Description,
  111. Required: true,
  112. }
  113. case mcpservers.ConfigRequiredTypeInitOrReusingOnly:
  114. if v, ok := initConfig[key]; ok {
  115. if v == "" {
  116. return nil, fmt.Errorf("config %s is required", key)
  117. }
  118. continue
  119. }
  120. reusingConfig[key] = model.MCPEmbeddingReusingConfig{
  121. Name: value.Name,
  122. Description: value.Description,
  123. Required: true,
  124. }
  125. }
  126. }
  127. embedConfig.Reusing = reusingConfig
  128. return embedConfig, nil
  129. }
  130. func ToPublicMCP(
  131. e mcpservers.EmbedMcp,
  132. initConfig map[string]string,
  133. enabled bool,
  134. ) (*model.PublicMCP, error) {
  135. embedConfig, err := GetEmbedConfig(e.ConfigTemplates, initConfig)
  136. if err != nil {
  137. return nil, err
  138. }
  139. pmcp := &model.PublicMCP{
  140. ID: e.ID,
  141. Type: model.PublicMCPTypeEmbed,
  142. Name: e.Name,
  143. Readme: e.Readme,
  144. Tags: e.Tags,
  145. EmbedConfig: embedConfig,
  146. }
  147. if enabled {
  148. pmcp.Status = model.PublicMCPStatusEnabled
  149. } else {
  150. pmcp.Status = model.PublicMCPStatusDisabled
  151. }
  152. return pmcp, nil
  153. }
  154. // SaveEmbedMCP godoc
  155. //
  156. // @Summary Save embed mcp
  157. // @Description Save embed mcp
  158. // @Tags embedmcp
  159. // @Accept json
  160. // @Produce json
  161. // @Security ApiKeyAuth
  162. // @Param body body SaveEmbedMCPRequest true "Save embed mcp request"
  163. // @Success 200 {object} nil
  164. // @Router /api/embedmcp/ [post]
  165. func SaveEmbedMCP(c *gin.Context) {
  166. var req SaveEmbedMCPRequest
  167. if err := c.ShouldBindJSON(&req); err != nil {
  168. middleware.ErrorResponse(c, http.StatusBadRequest, err.Error())
  169. return
  170. }
  171. emcp, ok := mcpservers.GetEmbedMCP(req.ID)
  172. if !ok {
  173. middleware.ErrorResponse(c, http.StatusNotFound, "embed mcp not found")
  174. return
  175. }
  176. pmcp, err := ToPublicMCP(emcp, req.InitConfig, req.Enabled)
  177. if err != nil {
  178. middleware.ErrorResponse(c, http.StatusInternalServerError, err.Error())
  179. return
  180. }
  181. if err := model.SavePublicMCP(pmcp); err != nil {
  182. middleware.ErrorResponse(c, http.StatusInternalServerError, err.Error())
  183. return
  184. }
  185. middleware.SuccessResponse(c, nil)
  186. }
  187. type testEmbedMcpEndpointProvider struct {
  188. key string
  189. }
  190. func newTestEmbedMcpEndpoint(key string) mcpproxy.EndpointProvider {
  191. return &testEmbedMcpEndpointProvider{
  192. key: key,
  193. }
  194. }
  195. func (m *testEmbedMcpEndpointProvider) NewEndpoint(session string) (newEndpoint string) {
  196. endpoint := fmt.Sprintf("/api/test-embedmcp/message?sessionId=%s&key=%s", session, m.key)
  197. return endpoint
  198. }
  199. func (m *testEmbedMcpEndpointProvider) LoadEndpoint(endpoint string) (session string) {
  200. parsedURL, err := url.Parse(endpoint)
  201. if err != nil {
  202. return ""
  203. }
  204. return parsedURL.Query().Get("sessionId")
  205. }
  206. // query like:
  207. // /api/test-embedmcp/aiproxy-openapi/sse?key=adminkey&config[key1]=value1&config[key2]=value2&reusing[key3]=value3
  208. func getConfigFromQuery(c *gin.Context) (map[string]string, map[string]string) {
  209. initConfig := make(map[string]string)
  210. reusingConfig := make(map[string]string)
  211. queryParams := c.Request.URL.Query()
  212. for paramName, paramValues := range queryParams {
  213. if len(paramValues) == 0 {
  214. continue
  215. }
  216. paramValue := paramValues[0]
  217. if strings.HasPrefix(paramName, "config[") && strings.HasSuffix(paramName, "]") {
  218. key := paramName[7 : len(paramName)-1]
  219. if key != "" {
  220. initConfig[key] = paramValue
  221. }
  222. }
  223. if strings.HasPrefix(paramName, "reusing[") && strings.HasSuffix(paramName, "]") {
  224. key := paramName[8 : len(paramName)-1]
  225. if key != "" {
  226. reusingConfig[key] = paramValue
  227. }
  228. }
  229. }
  230. return initConfig, reusingConfig
  231. }
  232. // TestEmbedMCPSseServer godoc
  233. //
  234. // @Summary Test Embed MCP SSE Server
  235. // @Description Test Embed MCP SSE Server
  236. // @Tags embedmcp
  237. // @Security ApiKeyAuth
  238. // @Param id path string true "MCP ID"
  239. // @Param config[key] query string false "Initial configuration parameters (e.g.,
  240. //
  241. // config[host]=http://localhost:3000)" @Param reusing[key] query string false "Reusing
  242. // configuration parameters (e.g., reusing[authorization]=apikey)"
  243. //
  244. // @Success 200 {object} nil
  245. // @Failure 400 {object} nil
  246. // @Router /api/test-embedmcp/{id}/sse [get]
  247. func TestEmbedMCPSseServer(c *gin.Context) {
  248. id := c.Param("id")
  249. if id == "" {
  250. c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
  251. mcp.NewRequestId(nil),
  252. mcp.INVALID_REQUEST,
  253. "mcp id is required",
  254. ))
  255. return
  256. }
  257. initConfig, reusingConfig := getConfigFromQuery(c)
  258. emcp, err := mcpservers.GetMCPServer(id, initConfig, reusingConfig)
  259. if err != nil {
  260. middleware.ErrorResponse(c, http.StatusBadRequest, err.Error())
  261. return
  262. }
  263. handleTestEmbedMCPServer(c, emcp)
  264. }
  265. const (
  266. testEmbedMcpType = "test-embedmcp"
  267. )
  268. func handleTestEmbedMCPServer(c *gin.Context, s *server.MCPServer) {
  269. token := middleware.GetToken(c)
  270. // Store the session
  271. store := getStore()
  272. newSession := store.New()
  273. newEndpoint := newTestEmbedMcpEndpoint(token.Key).NewEndpoint(newSession)
  274. server := statelessmcp.NewSSEServer(
  275. s,
  276. statelessmcp.WithMessageEndpoint(newEndpoint),
  277. )
  278. store.Set(newSession, testEmbedMcpType)
  279. defer func() {
  280. store.Delete(newSession)
  281. }()
  282. ctx, cancel := context.WithCancel(c.Request.Context())
  283. defer cancel()
  284. // Start message processing goroutine
  285. go processMCPSseMpscMessages(ctx, newSession, server)
  286. // Handle SSE connection
  287. server.HandleSSE(c.Writer, c.Request)
  288. }
  289. // TestEmbedMCPMessage godoc
  290. //
  291. // @Summary Test Embed MCP Message
  292. // @Description Send a message to the test embed MCP server
  293. // @Tags embedmcp
  294. // @Security ApiKeyAuth
  295. // @Param sessionId query string true "Session ID"
  296. // @Accept json
  297. // @Produce json
  298. // @Success 200 {object} nil
  299. // @Failure 400 {object} nil
  300. // @Router /api/test-embedmcp/message [post]
  301. func TestEmbedMCPMessage(c *gin.Context) {
  302. sessionID, _ := c.GetQuery("sessionId")
  303. if sessionID == "" {
  304. c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
  305. mcp.NewRequestId(nil),
  306. mcp.INVALID_REQUEST,
  307. "missing sessionId",
  308. ))
  309. return
  310. }
  311. sendMCPSSEMessage(c, testEmbedMcpType, sessionID)
  312. }
  313. // TestEmbedMCPStreamable godoc
  314. //
  315. // @Summary Test Embed MCP Streamable Server
  316. // @Description Test Embed MCP Streamable Server with various HTTP methods
  317. // @Tags embedmcp
  318. // @Security ApiKeyAuth
  319. // @Param id path string true "MCP ID"
  320. // @Param config[key] query string false "Initial configuration parameters (e.g.,
  321. //
  322. // config[host]=http://localhost:3000)" @Param reusing[key] query string false "Reusing
  323. // configuration parameters (e.g., reusing[authorization]=apikey)"
  324. //
  325. // @Accept json
  326. // @Produce json
  327. // @Success 200 {object} nil
  328. // @Failure 400 {object} nil
  329. // @Router /api/test-embedmcp/{id}/streamable [get]
  330. // @Router /api/test-embedmcp/{id}/streamable [post]
  331. // @Router /api/test-embedmcp/{id}/streamable [delete]
  332. func TestEmbedMCPStreamable(c *gin.Context) {
  333. id := c.Param("id")
  334. if id == "" {
  335. c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
  336. mcp.NewRequestId(nil),
  337. mcp.INVALID_REQUEST,
  338. "mcp id is required",
  339. ))
  340. return
  341. }
  342. initConfig, reusingConfig := getConfigFromQuery(c)
  343. server, err := mcpservers.GetMCPServer(id, initConfig, reusingConfig)
  344. if err != nil {
  345. c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
  346. mcp.NewRequestId(nil),
  347. mcp.INVALID_REQUEST,
  348. err.Error(),
  349. ))
  350. return
  351. }
  352. handleGroupStreamableMCPServer(c, server)
  353. }