groupmcp-server.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. package controller
  2. import (
  3. "maps"
  4. "net/http"
  5. "net/url"
  6. "github.com/gin-gonic/gin"
  7. "github.com/labring/aiproxy/core/mcpproxy"
  8. "github.com/labring/aiproxy/core/middleware"
  9. "github.com/labring/aiproxy/core/model"
  10. mcpservers "github.com/labring/aiproxy/mcp-servers"
  11. "github.com/mark3labs/mcp-go/client/transport"
  12. "github.com/mark3labs/mcp-go/mcp"
  13. )
  14. // GroupMCPSSEServer godoc
  15. //
  16. // @Summary Group MCP SSE Server
  17. // @Security ApiKeyAuth
  18. // @Router /mcp/group/{id}/sse [get]
  19. func GroupMCPSSEServer(c *gin.Context) {
  20. mcpID := c.Param("id")
  21. if mcpID == "" {
  22. http.Error(c.Writer, "mcp id is required", http.StatusBadRequest)
  23. return
  24. }
  25. group := middleware.GetGroup(c)
  26. groupMcp, err := model.CacheGetGroupMCP(group.ID, mcpID)
  27. if err != nil {
  28. http.Error(c.Writer, err.Error(), http.StatusNotFound)
  29. return
  30. }
  31. if groupMcp.Status != model.GroupMCPStatusEnabled {
  32. http.Error(c.Writer, "mcp is not enabled", http.StatusNotFound)
  33. return
  34. }
  35. handleGroupSSEMCPServer(c, groupMcp, sseEndpoint)
  36. }
  37. func handleGroupSSEMCPServer(
  38. c *gin.Context,
  39. groupMcp *model.GroupMCPCache,
  40. endpoint EndpointProvider,
  41. ) {
  42. switch groupMcp.Type {
  43. case model.GroupMCPTypeProxySSE:
  44. client, err := transport.NewSSE(
  45. groupMcp.ProxyConfig.URL,
  46. transport.WithHeaders(groupMcp.ProxyConfig.Headers),
  47. )
  48. if err != nil {
  49. http.Error(c.Writer, err.Error(), http.StatusBadRequest)
  50. return
  51. }
  52. err = client.Start(c.Request.Context())
  53. if err != nil {
  54. http.Error(c.Writer, err.Error(), http.StatusBadRequest)
  55. return
  56. }
  57. defer client.Close()
  58. handleSSEMCPServer(c,
  59. mcpservers.WrapMCPClient2Server(client),
  60. string(model.GroupMCPTypeProxySSE),
  61. endpoint,
  62. )
  63. case model.GroupMCPTypeProxyStreamable:
  64. client, err := transport.NewStreamableHTTP(
  65. groupMcp.ProxyConfig.URL,
  66. transport.WithHTTPHeaders(groupMcp.ProxyConfig.Headers),
  67. )
  68. if err != nil {
  69. http.Error(c.Writer, err.Error(), http.StatusBadRequest)
  70. return
  71. }
  72. err = client.Start(c.Request.Context())
  73. if err != nil {
  74. http.Error(c.Writer, err.Error(), http.StatusBadRequest)
  75. return
  76. }
  77. defer client.Close()
  78. handleSSEMCPServer(
  79. c,
  80. mcpservers.WrapMCPClient2Server(client),
  81. string(model.GroupMCPTypeProxyStreamable),
  82. endpoint,
  83. )
  84. case model.GroupMCPTypeOpenAPI:
  85. server, err := newOpenAPIMCPServer(groupMcp.OpenAPIConfig)
  86. if err != nil {
  87. http.Error(c.Writer, err.Error(), http.StatusBadRequest)
  88. return
  89. }
  90. handleSSEMCPServer(c, server, string(model.GroupMCPTypeOpenAPI), endpoint)
  91. default:
  92. http.Error(c.Writer, "unsupported mcp type", http.StatusBadRequest)
  93. }
  94. }
  95. // GroupMCPStreamable godoc
  96. //
  97. // @Summary Group MCP Streamable Server
  98. // @Security ApiKeyAuth
  99. // @Router /mcp/group/{id} [get]
  100. // @Router /mcp/group/{id} [post]
  101. // @Router /mcp/group/{id} [delete]
  102. func GroupMCPStreamable(c *gin.Context) {
  103. mcpID := c.Param("id")
  104. if mcpID == "" {
  105. c.JSON(http.StatusBadRequest, mcpservers.CreateMCPErrorResponse(
  106. mcp.NewRequestId(nil),
  107. mcp.INVALID_REQUEST,
  108. "mcp id is required",
  109. ))
  110. return
  111. }
  112. group := middleware.GetGroup(c)
  113. groupMcp, err := model.CacheGetGroupMCP(group.ID, mcpID)
  114. if err != nil {
  115. c.JSON(http.StatusNotFound, mcpservers.CreateMCPErrorResponse(
  116. mcp.NewRequestId(nil),
  117. mcp.INVALID_REQUEST,
  118. err.Error(),
  119. ))
  120. return
  121. }
  122. if groupMcp.Status != model.GroupMCPStatusEnabled {
  123. c.JSON(http.StatusNotFound, mcpservers.CreateMCPErrorResponse(
  124. mcp.NewRequestId(nil),
  125. mcp.INVALID_REQUEST,
  126. "mcp is not enabled",
  127. ))
  128. return
  129. }
  130. handleGroupStreamable(c, groupMcp)
  131. }
  132. // handleGroupProxyStreamable processes Streamable proxy requests for group
  133. func handleGroupProxyStreamable(c *gin.Context, config *model.GroupMCPProxyConfig) {
  134. if config == nil || config.URL == "" {
  135. return
  136. }
  137. backendURL, err := url.Parse(config.URL)
  138. if err != nil {
  139. c.JSON(http.StatusBadRequest, mcpservers.CreateMCPErrorResponse(
  140. mcp.NewRequestId(nil),
  141. mcp.INVALID_REQUEST,
  142. err.Error(),
  143. ))
  144. return
  145. }
  146. headers := maps.Clone(config.Headers)
  147. backendQuery := backendURL.Query()
  148. for k, v := range config.Querys {
  149. backendQuery.Set(k, v)
  150. }
  151. backendURL.RawQuery = backendQuery.Encode()
  152. mcpproxy.NewStreamableProxy(backendURL.String(), headers, getStore()).
  153. ServeHTTP(c.Writer, c.Request)
  154. }