Browse Source

feat: sse mcp backend support proxy to streamable http (#227)

zijiren 7 months ago
parent
commit
9e44a5cc92

+ 114 - 0
core/common/mcpproxy/server.go

@@ -0,0 +1,114 @@
+package mcpproxy
+
+import (
+	"context"
+	"encoding/json"
+
+	"github.com/bytedance/sonic"
+	"github.com/mark3labs/mcp-go/client/transport"
+	"github.com/mark3labs/mcp-go/mcp"
+)
+
+type MCPServer interface {
+	HandleMessage(ctx context.Context, message json.RawMessage) mcp.JSONRPCMessage
+}
+
+type mcpClient2Server struct {
+	client transport.Interface
+}
+
+func (s *mcpClient2Server) HandleMessage(
+	ctx context.Context,
+	message json.RawMessage,
+) mcp.JSONRPCMessage {
+	methodNode, err := sonic.Get(message, "method")
+	if err != nil {
+		return CreateMCPErrorResponse(nil, mcp.PARSE_ERROR, err.Error())
+	}
+	method, err := methodNode.String()
+	if err != nil {
+		return CreateMCPErrorResponse(nil, mcp.PARSE_ERROR, err.Error())
+	}
+
+	switch method {
+	case "notifications/initialized":
+		req := mcp.JSONRPCNotification{}
+		err := sonic.Unmarshal(message, &req)
+		if err != nil {
+			return CreateMCPErrorResponse(nil, mcp.PARSE_ERROR, err.Error())
+		}
+		err = s.client.SendNotification(ctx, req)
+		if err != nil {
+			return CreateMCPErrorResponse(nil, mcp.PARSE_ERROR, err.Error())
+		}
+		return nil
+	default:
+		req := transport.JSONRPCRequest{}
+		err := sonic.Unmarshal(message, &req)
+		if err != nil {
+			return CreateMCPErrorResponse(nil, mcp.PARSE_ERROR, err.Error())
+		}
+		resp, err := s.client.SendRequest(ctx, req)
+		if err != nil {
+			return CreateMCPErrorResponse(nil, mcp.INTERNAL_ERROR, err.Error())
+		}
+		if resp.Error != nil {
+			return CreateMCPErrorResponse(
+				resp.ID,
+				resp.Error.Code,
+				resp.Error.Message,
+				resp.Error.Data,
+			)
+		}
+		return CreateMCPResultResponse(
+			resp.ID,
+			resp.Result,
+		)
+	}
+}
+
+func WrapMCPClient2Server(client transport.Interface) MCPServer {
+	return &mcpClient2Server{client: client}
+}
+
+type JSONRPCNoErrorResponse struct {
+	JSONRPC string          `json:"jsonrpc"`
+	ID      mcp.RequestId   `json:"id"`
+	Result  json.RawMessage `json:"result"`
+}
+
+func CreateMCPResultResponse(
+	id any,
+	result json.RawMessage,
+) mcp.JSONRPCMessage {
+	return &JSONRPCNoErrorResponse{
+		JSONRPC: mcp.JSONRPC_VERSION,
+		ID:      mcp.NewRequestId(id),
+		Result:  result,
+	}
+}
+
+func CreateMCPErrorResponse(
+	id any,
+	code int,
+	message string,
+	data ...any,
+) mcp.JSONRPCMessage {
+	var d any
+	if len(data) > 0 {
+		d = data[0]
+	}
+	return mcp.JSONRPCError{
+		JSONRPC: mcp.JSONRPC_VERSION,
+		ID:      mcp.NewRequestId(id),
+		Error: struct {
+			Code    int    `json:"code"`
+			Message string `json:"message"`
+			Data    any    `json:"data,omitempty"`
+		}{
+			Code:    code,
+			Message: message,
+			Data:    d,
+		},
+	}
+}

+ 1 - 12
core/common/mcpproxy/sse.go

@@ -2,7 +2,6 @@ package mcpproxy
 
 import (
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"net/http"
@@ -12,10 +11,6 @@ import (
 	"github.com/mark3labs/mcp-go/mcp"
 )
 
-type MCPServer interface {
-	HandleMessage(ctx context.Context, message json.RawMessage) mcp.JSONRPCMessage
-}
-
 // SSEServer implements a Server-Sent Events (SSE) based MCP server.
 // It provides real-time communication capabilities over HTTP using the SSE protocol.
 type SSEServer struct {
@@ -138,14 +133,8 @@ func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 // handleMessage processes incoming JSON-RPC messages from clients and sends responses
 // back through both the SSE connection and HTTP response.
 func (s *SSEServer) HandleMessage(ctx context.Context, req []byte) error {
-	// Parse message as raw JSON
-	var rawMessage json.RawMessage
-	if err := sonic.Unmarshal(req, &rawMessage); err != nil {
-		return err
-	}
-
 	// Process message through MCPServer
-	response := s.server.HandleMessage(ctx, rawMessage)
+	response := s.server.HandleMessage(ctx, req)
 
 	// Only send response if there is one (not for notifications)
 	if response != nil {

+ 108 - 0
core/common/mcpproxy/stateless-streamable.go

@@ -0,0 +1,108 @@
+package mcpproxy
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+
+	"github.com/bytedance/sonic"
+	"github.com/mark3labs/mcp-go/mcp"
+)
+
+type StreamableHTTPOption func(*StreamableHTTPServer)
+
+type StreamableHTTPServer struct {
+	server MCPServer
+}
+
+// NewStatelessStreamableHTTPServer creates a new streamable-http server instance
+func NewStatelessStreamableHTTPServer(
+	server MCPServer,
+	opts ...StreamableHTTPOption,
+) *StreamableHTTPServer {
+	s := &StreamableHTTPServer{
+		server: server,
+	}
+
+	for _, opt := range opts {
+		opt(s)
+	}
+	return s
+}
+
+// ServeHTTP implements the http.Handler interface.
+func (s *StreamableHTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	switch r.Method {
+	case http.MethodPost:
+		s.handlePost(w, r)
+	case http.MethodGet:
+		s.handleGet(w, r)
+	case http.MethodDelete:
+		s.handleDelete(w, r)
+	default:
+		http.NotFound(w, r)
+	}
+}
+
+func (s *StreamableHTTPServer) handlePost(w http.ResponseWriter, r *http.Request) {
+	// post request carry request/notification message
+
+	// Check content type
+	contentType := r.Header.Get("Content-Type")
+	if contentType != "application/json" {
+		http.Error(w, "Invalid content type: must be 'application/json'", http.StatusBadRequest)
+		return
+	}
+
+	// Check the request body is valid json, meanwhile, get the request Method
+	rawData, err := io.ReadAll(r.Body)
+	if err != nil {
+		s.writeJSONRPCError(
+			w,
+			nil,
+			mcp.PARSE_ERROR,
+			fmt.Sprintf("read request body error: %v", err),
+		)
+		return
+	}
+	var baseMessage struct {
+		Method mcp.MCPMethod `json:"method"`
+	}
+	if err := json.Unmarshal(rawData, &baseMessage); err != nil {
+		s.writeJSONRPCError(w, nil, mcp.PARSE_ERROR, "request body is not valid json")
+		return
+	}
+
+	// Process message through MCPServer
+	response := s.server.HandleMessage(r.Context(), rawData)
+	if response == nil {
+		// For notifications, just send 202 Accepted with no body
+		w.WriteHeader(http.StatusAccepted)
+		return
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(http.StatusOK)
+	_ = sonic.ConfigDefault.NewEncoder(w).Encode(response)
+}
+
+func (s *StreamableHTTPServer) handleGet(w http.ResponseWriter, _ *http.Request) {
+	http.Error(w, "get request is not supported", http.StatusMethodNotAllowed)
+}
+
+func (s *StreamableHTTPServer) handleDelete(w http.ResponseWriter, _ *http.Request) {
+	http.Error(w, "delete request is not supported", http.StatusMethodNotAllowed)
+}
+
+func (s *StreamableHTTPServer) writeJSONRPCError(
+	w http.ResponseWriter,
+	id any,
+	code int,
+	message string,
+) {
+	response := CreateMCPErrorResponse(id, code, message)
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(http.StatusBadRequest)
+	_ = sonic.ConfigDefault.NewEncoder(w).Encode(response)
+}

+ 14 - 68
core/common/mcpproxy/streamable.go

@@ -10,6 +10,10 @@ import (
 	"time"
 )
 
+const (
+	headerKeySessionID = "Mcp-Session-Id"
+)
+
 // StreamableProxy represents a proxy for the MCP Streamable HTTP transport
 type StreamableProxy struct {
 	store   SessionManager
@@ -66,7 +70,7 @@ func (p *StreamableProxy) handleGetRequest(w http.ResponseWriter, r *http.Reques
 	}
 
 	// Get proxy session ID from header
-	proxySessionID := r.Header.Get("Mcp-Session-Id")
+	proxySessionID := r.Header.Get(headerKeySessionID)
 	if proxySessionID == "" {
 		// This might be an initialization request
 		p.proxyInitialOrNoSessionRequest(w, r)
@@ -87,20 +91,10 @@ func (p *StreamableProxy) handleGetRequest(w http.ResponseWriter, r *http.Reques
 		return
 	}
 
-	// Copy headers from original request, but replace the session ID
-	for name, values := range r.Header {
-		if name == "Mcp-Session-Id" {
-			continue // Skip the proxy session ID
-		}
-		for _, value := range values {
-			req.Header.Add(name, value)
-		}
-	}
-
 	// Extract the real backend session ID from the stored URL
 	parts := strings.Split(backendInfo, "?sessionId=")
 	if len(parts) > 1 {
-		req.Header.Set("Mcp-Session-Id", parts[1])
+		req.Header.Set(headerKeySessionID, parts[1])
 	}
 
 	// Add any additional headers
@@ -119,17 +113,8 @@ func (p *StreamableProxy) handleGetRequest(w http.ResponseWriter, r *http.Reques
 	// Check if we got an SSE response
 	if resp.StatusCode != http.StatusOK ||
 		!strings.Contains(resp.Header.Get("Content-Type"), "text/event-stream") {
-		// Copy response headers, but not the backend session ID
-		for name, values := range resp.Header {
-			if name == "Mcp-Session-Id" {
-				continue
-			}
-			for _, value := range values {
-				w.Header().Add(name, value)
-			}
-		}
 		// Add our proxy session ID
-		w.Header().Set("Mcp-Session-Id", proxySessionID)
+		w.Header().Set(headerKeySessionID, proxySessionID)
 
 		w.WriteHeader(resp.StatusCode)
 		_, _ = io.Copy(w, resp.Body)
@@ -177,7 +162,7 @@ func (p *StreamableProxy) handleGetRequest(w http.ResponseWriter, r *http.Reques
 // handlePostRequest handles POST requests for JSON-RPC messages
 func (p *StreamableProxy) handlePostRequest(w http.ResponseWriter, r *http.Request) {
 	// Check if this is an initialization request
-	proxySessionID := r.Header.Get("Mcp-Session-Id")
+	proxySessionID := r.Header.Get(headerKeySessionID)
 	if proxySessionID == "" {
 		p.proxyInitialOrNoSessionRequest(w, r)
 		return
@@ -197,20 +182,10 @@ func (p *StreamableProxy) handlePostRequest(w http.ResponseWriter, r *http.Reque
 		return
 	}
 
-	// Copy headers from original request, but replace the session ID
-	for name, values := range r.Header {
-		if name == "Mcp-Session-Id" {
-			continue // Skip the proxy session ID
-		}
-		for _, value := range values {
-			req.Header.Add(name, value)
-		}
-	}
-
 	// Extract the real backend session ID from the stored URL
 	parts := strings.Split(backendInfo, "?sessionId=")
 	if len(parts) > 1 {
-		req.Header.Set("Mcp-Session-Id", parts[1])
+		req.Header.Set(headerKeySessionID, parts[1])
 	}
 
 	// Add any additional headers
@@ -226,17 +201,8 @@ func (p *StreamableProxy) handlePostRequest(w http.ResponseWriter, r *http.Reque
 	}
 	defer resp.Body.Close()
 
-	// Copy response headers, but not the backend session ID
-	for name, values := range resp.Header {
-		if name == "Mcp-Session-Id" {
-			continue
-		}
-		for _, value := range values {
-			w.Header().Add(name, value)
-		}
-	}
 	// Add our proxy session ID
-	w.Header().Set("Mcp-Session-Id", proxySessionID)
+	w.Header().Set(headerKeySessionID, proxySessionID)
 
 	contentType := resp.Header.Get("Content-Type")
 
@@ -287,7 +253,7 @@ func (p *StreamableProxy) handlePostRequest(w http.ResponseWriter, r *http.Reque
 // handleDeleteRequest handles DELETE requests for session termination
 func (p *StreamableProxy) handleDeleteRequest(w http.ResponseWriter, r *http.Request) {
 	// Get proxy session ID from header
-	proxySessionID := r.Header.Get("Mcp-Session-Id")
+	proxySessionID := r.Header.Get(headerKeySessionID)
 	if proxySessionID == "" {
 		http.Error(w, "Missing session ID", http.StatusBadRequest)
 		return
@@ -307,20 +273,10 @@ func (p *StreamableProxy) handleDeleteRequest(w http.ResponseWriter, r *http.Req
 		return
 	}
 
-	// Copy headers from original request, but replace the session ID
-	for name, values := range r.Header {
-		if name == "Mcp-Session-Id" {
-			continue // Skip the proxy session ID
-		}
-		for _, value := range values {
-			req.Header.Add(name, value)
-		}
-	}
-
 	// Extract the real backend session ID from the stored URL
 	parts := strings.Split(backendInfo, "?sessionId=")
 	if len(parts) > 1 {
-		req.Header.Set("Mcp-Session-Id", parts[1])
+		req.Header.Set(headerKeySessionID, parts[1])
 	}
 
 	// Add any additional headers
@@ -343,16 +299,6 @@ func (p *StreamableProxy) handleDeleteRequest(w http.ResponseWriter, r *http.Req
 	// Remove the session from our store
 	p.store.Delete(proxySessionID)
 
-	// Copy response headers, but not the backend session ID
-	for name, values := range resp.Header {
-		if name == "Mcp-Session-Id" {
-			continue
-		}
-		for _, value := range values {
-			w.Header().Add(name, value)
-		}
-	}
-
 	contentType := resp.Header.Get("Content-Type")
 	w.Header().Set("Content-Type", contentType)
 
@@ -386,7 +332,7 @@ func (p *StreamableProxy) proxyInitialOrNoSessionRequest(w http.ResponseWriter,
 	defer resp.Body.Close()
 
 	// Check if we received a session ID from the backend
-	backendSessionID := resp.Header.Get("Mcp-Session-Id")
+	backendSessionID := resp.Header.Get(headerKeySessionID)
 	if backendSessionID != "" {
 		// Generate a new proxy session ID
 		proxySessionID := p.store.New()
@@ -402,7 +348,7 @@ func (p *StreamableProxy) proxyInitialOrNoSessionRequest(w http.ResponseWriter,
 		p.store.Set(proxySessionID, backendURL)
 
 		// Replace the backend session ID with our proxy session ID in the response
-		w.Header().Set("Mcp-Session-Id", proxySessionID)
+		w.Header().Set(headerKeySessionID, proxySessionID)
 	}
 
 	contentType := resp.Header.Get("Content-Type")

+ 2 - 2
core/controller/embedmcp.go

@@ -361,7 +361,7 @@ func TestEmbedMCPMessage(c *gin.Context) {
 func TestEmbedMCPStreamable(c *gin.Context) {
 	id := c.Param("id")
 	if id == "" {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			"mcp id is required",
@@ -372,7 +372,7 @@ func TestEmbedMCPStreamable(c *gin.Context) {
 	initConfig, reusingConfig := getConfigFromQuery(c)
 	server, err := mcpservers.GetMCPServer(id, initConfig, reusingConfig)
 	if err != nil {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			err.Error(),

+ 15 - 8
core/controller/groupmcp-server.go

@@ -79,7 +79,10 @@ func GroupMCPSseServer(c *gin.Context) {
 			return
 		}
 		defer client.Close()
-		handleGroupMCPServer(c, wrapMCPClient2Server(client), model.GroupMCPTypeProxySSE)
+		handleGroupMCPServer(c,
+			mcpproxy.WrapMCPClient2Server(client),
+			model.GroupMCPTypeProxySSE,
+		)
 	case model.GroupMCPTypeProxyStreamable:
 		client, err := transport.NewStreamableHTTP(
 			groupMcp.ProxyConfig.URL,
@@ -95,7 +98,11 @@ func GroupMCPSseServer(c *gin.Context) {
 			return
 		}
 		defer client.Close()
-		handleGroupMCPServer(c, wrapMCPClient2Server(client), model.GroupMCPTypeProxyStreamable)
+		handleGroupMCPServer(
+			c,
+			mcpproxy.WrapMCPClient2Server(client),
+			model.GroupMCPTypeProxyStreamable,
+		)
 	case model.GroupMCPTypeOpenAPI:
 		server, err := newOpenAPIMCPServer(groupMcp.OpenAPIConfig)
 		if err != nil {
@@ -178,7 +185,7 @@ func GroupMCPMessage(c *gin.Context) {
 func GroupMCPStreamable(c *gin.Context) {
 	id := c.Param("id")
 	if id == "" {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			"mcp id is required",
@@ -190,7 +197,7 @@ func GroupMCPStreamable(c *gin.Context) {
 
 	groupMcp, err := model.CacheGetGroupMCP(group.ID, id)
 	if err != nil {
-		c.JSON(http.StatusNotFound, CreateMCPErrorResponse(
+		c.JSON(http.StatusNotFound, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			err.Error(),
@@ -198,7 +205,7 @@ func GroupMCPStreamable(c *gin.Context) {
 		return
 	}
 	if groupMcp.Status != model.GroupMCPStatusEnabled {
-		c.JSON(http.StatusNotFound, CreateMCPErrorResponse(
+		c.JSON(http.StatusNotFound, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			"mcp is not enabled",
@@ -212,7 +219,7 @@ func GroupMCPStreamable(c *gin.Context) {
 	case model.GroupMCPTypeOpenAPI:
 		server, err := newOpenAPIMCPServer(groupMcp.OpenAPIConfig)
 		if err != nil {
-			c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 				mcp.NewRequestId(nil),
 				mcp.INVALID_REQUEST,
 				err.Error(),
@@ -221,7 +228,7 @@ func GroupMCPStreamable(c *gin.Context) {
 		}
 		handleStreamableMCPServer(c, server)
 	default:
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			"unsupported mcp type",
@@ -237,7 +244,7 @@ func handleGroupProxyStreamable(c *gin.Context, config *model.GroupMCPProxyConfi
 
 	backendURL, err := url.Parse(config.URL)
 	if err != nil {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			err.Error(),

+ 51 - 124
core/controller/publicmcp-server.go

@@ -2,7 +2,6 @@ package controller
 
 import (
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -12,7 +11,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/bytedance/sonic"
 	"github.com/gin-gonic/gin"
 	"github.com/labring/aiproxy/core/common"
 	"github.com/labring/aiproxy/core/common/mcpproxy"
@@ -122,92 +120,6 @@ func (r *redisStoreManager) Delete(session string) {
 	r.rdb.Del(ctx, "mcp:session:"+session)
 }
 
-type mcpClient2Server struct {
-	client transport.Interface
-}
-
-type JSONRPCNoErrorResponse struct {
-	JSONRPC string          `json:"jsonrpc"`
-	ID      mcp.RequestId   `json:"id"`
-	Result  json.RawMessage `json:"result"`
-}
-
-func handleError(err error) mcp.JSONRPCMessage {
-	return mcp.JSONRPCError{
-		JSONRPC: mcp.JSONRPC_VERSION,
-		ID:      mcp.NewRequestId(nil),
-		Error: struct {
-			Code    int    `json:"code"`
-			Message string `json:"message"`
-			Data    any    `json:"data,omitempty"`
-		}{
-			Code:    mcp.INTERNAL_ERROR,
-			Message: err.Error(),
-		},
-	}
-}
-
-func (s *mcpClient2Server) HandleMessage(
-	ctx context.Context,
-	message json.RawMessage,
-) mcp.JSONRPCMessage {
-	methodNode, err := sonic.Get(message, "method")
-	if err != nil {
-		return handleError(err)
-	}
-	method, err := methodNode.String()
-	if err != nil {
-		return handleError(err)
-	}
-
-	switch method {
-	case "notifications/initialized":
-		req := mcp.JSONRPCNotification{}
-		err := sonic.Unmarshal(message, &req)
-		if err != nil {
-			return handleError(err)
-		}
-		err = s.client.SendNotification(ctx, req)
-		if err != nil {
-			return handleError(err)
-		}
-		return nil
-	default:
-		req := transport.JSONRPCRequest{}
-		err := sonic.Unmarshal(message, &req)
-		if err != nil {
-			return handleError(err)
-		}
-		resp, err := s.client.SendRequest(ctx, req)
-		if err != nil {
-			return mcp.JSONRPCError{
-				JSONRPC: mcp.JSONRPC_VERSION,
-				ID:      mcp.NewRequestId(nil),
-				Error: struct {
-					Code    int    `json:"code"`
-					Message string `json:"message"`
-					Data    any    `json:"data,omitempty"`
-				}{
-					Code:    mcp.INTERNAL_ERROR,
-					Message: err.Error(),
-				},
-			}
-		}
-		if resp.Error != nil {
-			return resp
-		}
-		return &JSONRPCNoErrorResponse{
-			JSONRPC: resp.JSONRPC,
-			ID:      resp.ID,
-			Result:  resp.Result,
-		}
-	}
-}
-
-func wrapMCPClient2Server(client transport.Interface) mcpproxy.MCPServer {
-	return &mcpClient2Server{client: client}
-}
-
 // PublicMCPSseServer godoc
 //
 //	@Summary	Public MCP SSE Server
@@ -246,7 +158,11 @@ func PublicMCPSseServer(c *gin.Context) {
 			return
 		}
 		defer client.Close()
-		handleSSEMCPServer(c, wrapMCPClient2Server(client), model.PublicMCPTypeProxySSE)
+		handleSSEMCPServer(
+			c,
+			mcpproxy.WrapMCPClient2Server(client),
+			model.PublicMCPTypeProxySSE,
+		)
 	case model.PublicMCPTypeProxyStreamable:
 		client, err := transport.NewStreamableHTTP(
 			publicMcp.ProxyConfig.URL,
@@ -262,11 +178,15 @@ func PublicMCPSseServer(c *gin.Context) {
 			return
 		}
 		defer client.Close()
-		handleSSEMCPServer(c, wrapMCPClient2Server(client), model.PublicMCPTypeProxyStreamable)
+		handleSSEMCPServer(
+			c,
+			mcpproxy.WrapMCPClient2Server(client),
+			model.PublicMCPTypeProxyStreamable,
+		)
 	case model.PublicMCPTypeOpenAPI:
 		server, err := newOpenAPIMCPServer(publicMcp.OpenAPIConfig)
 		if err != nil {
-			c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 				mcp.NewRequestId(nil),
 				mcp.INVALID_REQUEST,
 				err.Error(),
@@ -277,7 +197,7 @@ func PublicMCPSseServer(c *gin.Context) {
 	case model.PublicMCPTypeEmbed:
 		handlePublicEmbedMCP(c, publicMcp.ID, publicMcp.EmbedConfig)
 	default:
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			"unknown mcp type",
@@ -292,7 +212,7 @@ func handlePublicEmbedMCP(c *gin.Context, mcpID string, config *model.MCPEmbeddi
 		group := middleware.GetGroup(c)
 		param, err := model.CacheGetPublicMCPReusingParam(mcpID, group.ID)
 		if err != nil {
-			c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 				mcp.NewRequestId(nil),
 				mcp.INVALID_REQUEST,
 				err.Error(),
@@ -303,7 +223,7 @@ func handlePublicEmbedMCP(c *gin.Context, mcpID string, config *model.MCPEmbeddi
 	}
 	server, err := mcpservers.GetMCPServer(mcpID, config.Init, reusingConfig)
 	if err != nil {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			err.Error(),
@@ -528,7 +448,7 @@ func PublicMCPStreamable(c *gin.Context) {
 	mcpID := c.Param("id")
 	publicMcp, err := model.CacheGetPublicMCP(mcpID)
 	if err != nil {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			err.Error(),
@@ -536,7 +456,7 @@ func PublicMCPStreamable(c *gin.Context) {
 		return
 	}
 	if publicMcp.Status != model.PublicMCPStatusEnabled {
-		c.JSON(http.StatusNotFound, CreateMCPErrorResponse(
+		c.JSON(http.StatusNotFound, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			"mcp is not enabled",
@@ -545,12 +465,38 @@ func PublicMCPStreamable(c *gin.Context) {
 	}
 
 	switch publicMcp.Type {
+	case model.PublicMCPTypeProxySSE:
+		client, err := transport.NewSSE(
+			publicMcp.ProxyConfig.URL,
+			transport.WithHeaders(publicMcp.ProxyConfig.Headers),
+		)
+		if err != nil {
+			c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
+				mcp.NewRequestId(nil),
+				mcp.INVALID_REQUEST,
+				err.Error(),
+			))
+			return
+		}
+		err = client.Start(c.Request.Context())
+		if err != nil {
+			c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
+				mcp.NewRequestId(nil),
+				mcp.INVALID_REQUEST,
+				err.Error(),
+			))
+			return
+		}
+		defer client.Close()
+		mcpproxy.NewStatelessStreamableHTTPServer(
+			mcpproxy.WrapMCPClient2Server(client),
+		).ServeHTTP(c.Writer, c.Request)
 	case model.PublicMCPTypeProxyStreamable:
 		handlePublicProxyStreamable(c, mcpID, publicMcp.ProxyConfig)
 	case model.PublicMCPTypeOpenAPI:
 		server, err := newOpenAPIMCPServer(publicMcp.OpenAPIConfig)
 		if err != nil {
-			c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 				mcp.NewRequestId(nil),
 				mcp.INVALID_REQUEST,
 				err.Error(),
@@ -561,7 +507,7 @@ func PublicMCPStreamable(c *gin.Context) {
 	case model.PublicMCPTypeEmbed:
 		handlePublicEmbedStreamable(c, mcpID, publicMcp.EmbedConfig)
 	default:
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			"unknown mcp type",
@@ -575,7 +521,7 @@ func handlePublicEmbedStreamable(c *gin.Context, mcpID string, config *model.MCP
 		group := middleware.GetGroup(c)
 		param, err := model.CacheGetPublicMCPReusingParam(mcpID, group.ID)
 		if err != nil {
-			c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 				mcp.NewRequestId(nil),
 				mcp.INVALID_REQUEST,
 				err.Error(),
@@ -586,7 +532,7 @@ func handlePublicEmbedStreamable(c *gin.Context, mcpID string, config *model.MCP
 	}
 	server, err := mcpservers.GetMCPServer(mcpID, config.Init, reusingConfig)
 	if err != nil {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			err.Error(),
@@ -599,7 +545,7 @@ func handlePublicEmbedStreamable(c *gin.Context, mcpID string, config *model.MCP
 // handlePublicProxyStreamable processes Streamable proxy requests
 func handlePublicProxyStreamable(c *gin.Context, mcpID string, config *model.PublicMCPProxyConfig) {
 	if config == nil || config.URL == "" {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			"invalid proxy configuration",
@@ -609,7 +555,7 @@ func handlePublicProxyStreamable(c *gin.Context, mcpID string, config *model.Pub
 
 	backendURL, err := url.Parse(config.URL)
 	if err != nil {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			err.Error(),
@@ -623,7 +569,7 @@ func handlePublicProxyStreamable(c *gin.Context, mcpID string, config *model.Pub
 
 	// Process reusing parameters if any
 	if err := processReusingParams(config.ReusingParams, mcpID, group.ID, headers, &backendQuery); err != nil {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
 			err.Error(),
@@ -646,7 +592,7 @@ func handlePublicProxyStreamable(c *gin.Context, mcpID string, config *model.Pub
 // handleStreamableMCPServer handles the streamable connection for an MCP server
 func handleStreamableMCPServer(c *gin.Context, s *server.MCPServer) {
 	if c.Request.Method != http.MethodPost {
-		c.JSON(http.StatusMethodNotAllowed, CreateMCPErrorResponse(
+		c.JSON(http.StatusMethodNotAllowed, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.METHOD_NOT_FOUND,
 			"method not allowed",
@@ -655,7 +601,7 @@ func handleStreamableMCPServer(c *gin.Context, s *server.MCPServer) {
 	}
 	reqBody, err := io.ReadAll(c.Request.Body)
 	if err != nil {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+		c.JSON(http.StatusBadRequest, mcpproxy.CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.PARSE_ERROR,
 			err.Error(),
@@ -831,22 +777,3 @@ func (r *redisMCPMPSC) recv(ctx context.Context, id string) ([]byte, error) {
 		}
 	}
 }
-
-func CreateMCPErrorResponse(
-	id mcp.RequestId,
-	code int,
-	message string,
-) mcp.JSONRPCMessage {
-	return mcp.JSONRPCError{
-		JSONRPC: mcp.JSONRPC_VERSION,
-		ID:      id,
-		Error: struct {
-			Code    int    `json:"code"`
-			Message string `json:"message"`
-			Data    any    `json:"data,omitempty"`
-		}{
-			Code:    code,
-			Message: message,
-		},
-	}
-}

+ 2 - 2
core/router/static.go

@@ -74,7 +74,7 @@ func checkNoRouteNotFound(path string) bool {
 func newIndexNoRouteHandler(fs http.FileSystem) func(ctx *gin.Context) {
 	return func(ctx *gin.Context) {
 		if checkNoRouteNotFound(ctx.Request.URL.Path) {
-			ctx.String(http.StatusNotFound, "404 page not found")
+			http.NotFound(ctx.Writer, ctx.Request)
 			return
 		}
 		ctx.FileFromFS("", fs)
@@ -85,7 +85,7 @@ func newDynamicNoRouteHandler(fs http.FileSystem) func(ctx *gin.Context) {
 	fileServer := http.StripPrefix("/", http.FileServer(fs))
 	return func(c *gin.Context) {
 		if checkNoRouteNotFound(c.Request.URL.Path) {
-			c.String(http.StatusNotFound, "404 page not found")
+			http.NotFound(c.Writer, c.Request)
 			return
 		}