groupmcp-server.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package controller
  2. import (
  3. "net/http"
  4. "net/url"
  5. "github.com/gin-gonic/gin"
  6. "github.com/labring/aiproxy/core/mcpproxy"
  7. "github.com/labring/aiproxy/core/middleware"
  8. "github.com/labring/aiproxy/core/model"
  9. mcpservers "github.com/labring/aiproxy/mcp-servers"
  10. "github.com/mark3labs/mcp-go/client/transport"
  11. "github.com/mark3labs/mcp-go/mcp"
  12. )
  13. // GroupMCPSSEServer godoc
  14. //
  15. // @Summary Group MCP SSE Server
  16. // @Security ApiKeyAuth
  17. // @Router /mcp/group/{id}/sse [get]
  18. func GroupMCPSSEServer(c *gin.Context) {
  19. mcpID := c.Param("id")
  20. if mcpID == "" {
  21. http.Error(c.Writer, "mcp id is required", http.StatusBadRequest)
  22. return
  23. }
  24. group := middleware.GetGroup(c)
  25. groupMcp, err := model.CacheGetGroupMCP(group.ID, mcpID)
  26. if err != nil {
  27. http.Error(c.Writer, err.Error(), http.StatusNotFound)
  28. return
  29. }
  30. if groupMcp.Status != model.GroupMCPStatusEnabled {
  31. http.Error(c.Writer, "mcp is not enabled", http.StatusNotFound)
  32. return
  33. }
  34. handleGroupSSEMCPServer(c, groupMcp, sseEndpoint)
  35. }
  36. func handleGroupSSEMCPServer(
  37. c *gin.Context,
  38. groupMcp *model.GroupMCPCache,
  39. endpoint EndpointProvider,
  40. ) {
  41. switch groupMcp.Type {
  42. case model.GroupMCPTypeProxySSE:
  43. client, err := transport.NewSSE(
  44. groupMcp.ProxyConfig.URL,
  45. transport.WithHeaders(groupMcp.ProxyConfig.Headers),
  46. )
  47. if err != nil {
  48. http.Error(c.Writer, err.Error(), http.StatusBadRequest)
  49. return
  50. }
  51. err = client.Start(c.Request.Context())
  52. if err != nil {
  53. http.Error(c.Writer, err.Error(), http.StatusBadRequest)
  54. return
  55. }
  56. defer client.Close()
  57. handleSSEMCPServer(c,
  58. mcpservers.WrapMCPClient2Server(client),
  59. string(model.GroupMCPTypeProxySSE),
  60. endpoint,
  61. )
  62. case model.GroupMCPTypeProxyStreamable:
  63. client, err := transport.NewStreamableHTTP(
  64. groupMcp.ProxyConfig.URL,
  65. transport.WithHTTPHeaders(groupMcp.ProxyConfig.Headers),
  66. )
  67. if err != nil {
  68. http.Error(c.Writer, err.Error(), http.StatusBadRequest)
  69. return
  70. }
  71. err = client.Start(c.Request.Context())
  72. if err != nil {
  73. http.Error(c.Writer, err.Error(), http.StatusBadRequest)
  74. return
  75. }
  76. defer client.Close()
  77. handleSSEMCPServer(
  78. c,
  79. mcpservers.WrapMCPClient2Server(client),
  80. string(model.GroupMCPTypeProxyStreamable),
  81. endpoint,
  82. )
  83. case model.GroupMCPTypeOpenAPI:
  84. server, err := newOpenAPIMCPServer(groupMcp.OpenAPIConfig)
  85. if err != nil {
  86. http.Error(c.Writer, err.Error(), http.StatusBadRequest)
  87. return
  88. }
  89. handleSSEMCPServer(c, server, string(model.GroupMCPTypeOpenAPI), endpoint)
  90. default:
  91. http.Error(c.Writer, "unsupported mcp type", http.StatusBadRequest)
  92. }
  93. }
  94. // GroupMCPStreamable godoc
  95. //
  96. // @Summary Group MCP Streamable Server
  97. // @Security ApiKeyAuth
  98. // @Router /mcp/group/{id} [get]
  99. // @Router /mcp/group/{id} [post]
  100. // @Router /mcp/group/{id} [delete]
  101. func GroupMCPStreamable(c *gin.Context) {
  102. mcpID := c.Param("id")
  103. if mcpID == "" {
  104. c.JSON(http.StatusBadRequest, mcpservers.CreateMCPErrorResponse(
  105. mcp.NewRequestId(nil),
  106. mcp.INVALID_REQUEST,
  107. "mcp id is required",
  108. ))
  109. return
  110. }
  111. group := middleware.GetGroup(c)
  112. groupMcp, err := model.CacheGetGroupMCP(group.ID, mcpID)
  113. if err != nil {
  114. c.JSON(http.StatusNotFound, mcpservers.CreateMCPErrorResponse(
  115. mcp.NewRequestId(nil),
  116. mcp.INVALID_REQUEST,
  117. err.Error(),
  118. ))
  119. return
  120. }
  121. if groupMcp.Status != model.GroupMCPStatusEnabled {
  122. c.JSON(http.StatusNotFound, mcpservers.CreateMCPErrorResponse(
  123. mcp.NewRequestId(nil),
  124. mcp.INVALID_REQUEST,
  125. "mcp is not enabled",
  126. ))
  127. return
  128. }
  129. handleGroupStreamable(c, groupMcp)
  130. }
  131. // handleGroupProxyStreamable processes Streamable proxy requests for group
  132. func handleGroupProxyStreamable(c *gin.Context, config *model.GroupMCPProxyConfig) {
  133. if config == nil || config.URL == "" {
  134. return
  135. }
  136. backendURL, err := url.Parse(config.URL)
  137. if err != nil {
  138. c.JSON(http.StatusBadRequest, mcpservers.CreateMCPErrorResponse(
  139. mcp.NewRequestId(nil),
  140. mcp.INVALID_REQUEST,
  141. err.Error(),
  142. ))
  143. return
  144. }
  145. headers := make(map[string]string)
  146. backendQuery := backendURL.Query()
  147. for k, v := range config.Headers {
  148. headers[k] = v
  149. }
  150. for k, v := range config.Querys {
  151. backendQuery.Set(k, v)
  152. }
  153. backendURL.RawQuery = backendQuery.Encode()
  154. mcpproxy.NewStreamableProxy(backendURL.String(), headers, getStore()).
  155. ServeHTTP(c.Writer, c.Request)
  156. }