Преглед изворни кода

feat: streamable http mcp support (#142)

* feat: streamable http mcp support

* feat: group streamable http mcp support

* chore: swag

* fix: cilint

* feat: mcp error message
zijiren пре 8 месеци
родитељ
комит
6ab2833177

+ 10 - 1
core/common/mcpproxy/session.go

@@ -1,9 +1,14 @@
 package mcpproxy
 
-import "sync"
+import (
+	"sync"
+
+	"github.com/labring/aiproxy/core/common"
+)
 
 // SessionManager defines the interface for managing session information
 type SessionManager interface {
+	New() (sessionID string)
 	// Set stores a sessionID and its corresponding backend endpoint
 	Set(sessionID, endpoint string)
 	// Get retrieves the backend endpoint for a sessionID
@@ -25,6 +30,10 @@ func NewMemStore() *MemStore {
 	}
 }
 
+func (s *MemStore) New() string {
+	return common.ShortUUID()
+}
+
 // Set stores a sessionID and its corresponding backend endpoint
 func (s *MemStore) Set(sessionID, endpoint string) {
 	s.mu.Lock()

+ 13 - 12
core/common/mcpproxy/sse.go

@@ -12,21 +12,21 @@ import (
 )
 
 type EndpointProvider interface {
-	NewEndpoint() (newSession string, newEndpoint string)
+	NewEndpoint(newSession string) (newEndpoint string)
 	LoadEndpoint(endpoint string) (session string)
 }
 
-// Proxy represents the proxy object that handles SSE and HTTP requests
-type Proxy struct {
+// SSEAProxy represents the proxy object that handles SSE and HTTP requests
+type SSEAProxy struct {
 	store    SessionManager
 	endpoint EndpointProvider
 	backend  string
 	headers  map[string]string
 }
 
-// NewProxy creates a new proxy with the given backend and endpoint handler
-func NewProxy(backend string, headers map[string]string, store SessionManager, endpoint EndpointProvider) *Proxy {
-	return &Proxy{
+// NewSSEProxy creates a new proxy with the given backend and endpoint handler
+func NewSSEProxy(backend string, headers map[string]string, store SessionManager, endpoint EndpointProvider) *SSEAProxy {
+	return &SSEAProxy{
 		store:    store,
 		endpoint: endpoint,
 		backend:  backend,
@@ -34,7 +34,7 @@ func NewProxy(backend string, headers map[string]string, store SessionManager, e
 	}
 }
 
-func (p *Proxy) SSEHandler(w http.ResponseWriter, r *http.Request) {
+func (p *SSEAProxy) SSEHandler(w http.ResponseWriter, r *http.Request) {
 	SSEHandler(w, r, p.store, p.endpoint, p.backend, p.headers)
 }
 
@@ -117,7 +117,8 @@ func SSEHandler(
 				return
 			}
 
-			newSession, newEndpoint := endpoint.NewEndpoint()
+			newSession := store.New()
+			newEndpoint := endpoint.NewEndpoint(newSession)
 			defer func() {
 				store.Delete(newSession)
 			}()
@@ -136,17 +137,17 @@ func SSEHandler(
 			}
 
 			// Write the data line to the client
-			fmt.Fprintf(w, "data: %s\n", newEndpoint)
+			_, _ = fmt.Fprintf(w, "data: %s\n", newEndpoint)
 			flusher.Flush()
 		}
 	}
 }
 
-func (p *Proxy) ProxyHandler(w http.ResponseWriter, r *http.Request) {
-	ProxyHandler(w, r, p.store, p.endpoint)
+func (p *SSEAProxy) ProxyHandler(w http.ResponseWriter, r *http.Request) {
+	SSEProxyHandler(w, r, p.store, p.endpoint)
 }
 
-func ProxyHandler(
+func SSEProxyHandler(
 	w http.ResponseWriter,
 	r *http.Request,
 	store SessionManager,

+ 3 - 3
core/common/mcpproxy/sse_test.go

@@ -14,8 +14,8 @@ import (
 
 type TestEndpointHandler struct{}
 
-func (h *TestEndpointHandler) NewEndpoint() (string, string) {
-	return "test-session-id", "/message?sessionId=test-session-id"
+func (h *TestEndpointHandler) NewEndpoint(_ string) string {
+	return "/message?sessionId=test-session-id"
 }
 
 func (h *TestEndpointHandler) LoadEndpoint(endpoint string) string {
@@ -50,7 +50,7 @@ func TestProxySSEEndpoint(t *testing.T) {
 	// Create the proxy
 	store := mcpproxy.NewMemStore()
 	handler := &TestEndpointHandler{}
-	proxy := mcpproxy.NewProxy(backendServer.URL+"/sse", nil, store, handler)
+	proxy := mcpproxy.NewSSEProxy(backendServer.URL+"/sse", nil, store, handler)
 
 	// Setup the proxy server
 	proxyServer := httptest.NewServer(http.HandlerFunc(proxy.SSEHandler))

+ 451 - 0
core/common/mcpproxy/streamable.go

@@ -0,0 +1,451 @@
+package mcpproxy
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"time"
+)
+
+// StreamableProxy represents a proxy for the MCP Streamable HTTP transport
+type StreamableProxy struct {
+	store   SessionManager
+	backend string
+	headers map[string]string
+}
+
+// NewStreamableProxy creates a new proxy for the Streamable HTTP transport
+func NewStreamableProxy(backend string, headers map[string]string, store SessionManager) *StreamableProxy {
+	return &StreamableProxy{
+		store:   store,
+		backend: backend,
+		headers: headers,
+	}
+}
+
+// ServeHTTP handles both GET and POST requests for the Streamable HTTP transport
+func (p *StreamableProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	// Add CORS headers
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	w.Header().Set("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS")
+	w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Accept, Mcp-Session-Id")
+	w.Header().Set("Access-Control-Expose-Headers", "Mcp-Session-Id")
+
+	// Handle preflight requests
+	if r.Method == http.MethodOptions {
+		w.WriteHeader(http.StatusOK)
+		return
+	}
+
+	switch r.Method {
+	case http.MethodGet:
+		p.handleGetRequest(w, r)
+	case http.MethodPost:
+		p.handlePostRequest(w, r)
+	case http.MethodDelete:
+		p.handleDeleteRequest(w, r)
+	default:
+		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+	}
+}
+
+// handleGetRequest handles GET requests for SSE streaming
+func (p *StreamableProxy) handleGetRequest(w http.ResponseWriter, r *http.Request) {
+	// Check if Accept header includes text/event-stream
+	acceptHeader := r.Header.Get("Accept")
+	if !strings.Contains(acceptHeader, "text/event-stream") {
+		http.Error(w, "Accept header must include text/event-stream", http.StatusBadRequest)
+		return
+	}
+
+	// Get proxy session ID from header
+	proxySessionID := r.Header.Get("Mcp-Session-Id")
+	if proxySessionID == "" {
+		// This might be an initialization request
+		p.proxyInitialOrNoSessionRequest(w, r)
+		return
+	}
+
+	// Look up the backend endpoint and session ID
+	backendInfo, ok := p.store.Get(proxySessionID)
+	if !ok {
+		http.Error(w, "Invalid or expired session ID", http.StatusNotFound)
+		return
+	}
+
+	// Create a request to the backend
+	req, err := http.NewRequestWithContext(r.Context(), http.MethodGet, backendInfo, nil)
+	if err != nil {
+		http.Error(w, "Failed to create backend request", http.StatusInternalServerError)
+		return
+	}
+
+	// Copy headers from original request, but replace the session ID
+	for name, values := range r.Header {
+		if name == "Mcp-Session-Id" {
+			continue // Skip the proxy session ID
+		}
+		for _, value := range values {
+			req.Header.Add(name, value)
+		}
+	}
+
+	// Extract the real backend session ID from the stored URL
+	parts := strings.Split(backendInfo, "?sessionId=")
+	if len(parts) > 1 {
+		req.Header.Set("Mcp-Session-Id", parts[1])
+	}
+
+	// Add any additional headers
+	for name, value := range p.headers {
+		req.Header.Set(name, value)
+	}
+
+	//nolint:bodyclose
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		http.Error(w, "Failed to connect to backend", http.StatusInternalServerError)
+		return
+	}
+	defer resp.Body.Close()
+
+	// Check if we got an SSE response
+	if resp.StatusCode != http.StatusOK || !strings.Contains(resp.Header.Get("Content-Type"), "text/event-stream") {
+		// Copy response headers, but not the backend session ID
+		for name, values := range resp.Header {
+			if name == "Mcp-Session-Id" {
+				continue
+			}
+			for _, value := range values {
+				w.Header().Add(name, value)
+			}
+		}
+		// Add our proxy session ID
+		w.Header().Set("Mcp-Session-Id", proxySessionID)
+
+		w.WriteHeader(resp.StatusCode)
+		_, _ = io.Copy(w, resp.Body)
+		return
+	}
+
+	// Set SSE headers for the client response
+	w.Header().Set("Content-Type", "text/event-stream")
+	w.Header().Set("Cache-Control", "no-cache")
+	w.Header().Set("Connection", "keep-alive")
+
+	// Create a context that cancels when the client disconnects
+	ctx, cancel := context.WithCancel(r.Context())
+	defer cancel()
+
+	// Monitor client disconnection
+	go func() {
+		<-ctx.Done()
+		resp.Body.Close()
+	}()
+
+	// Stream the SSE events to the client
+	reader := bufio.NewReader(resp.Body)
+	flusher, ok := w.(http.Flusher)
+	if !ok {
+		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
+		return
+	}
+
+	for {
+		line, err := reader.ReadString('\n')
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return
+		}
+
+		// Write the line to the client
+		fmt.Fprint(w, line)
+		flusher.Flush()
+	}
+}
+
+// handlePostRequest handles POST requests for JSON-RPC messages
+func (p *StreamableProxy) handlePostRequest(w http.ResponseWriter, r *http.Request) {
+	// Check if this is an initialization request
+	proxySessionID := r.Header.Get("Mcp-Session-Id")
+	if proxySessionID == "" {
+		p.proxyInitialOrNoSessionRequest(w, r)
+		return
+	}
+
+	// Look up the backend endpoint and session ID
+	backendInfo, ok := p.store.Get(proxySessionID)
+	if !ok {
+		http.Error(w, "Invalid or expired session ID", http.StatusNotFound)
+		return
+	}
+
+	// Create a request to the backend
+	req, err := http.NewRequestWithContext(r.Context(), http.MethodPost, backendInfo, r.Body)
+	if err != nil {
+		http.Error(w, "Failed to create backend request", http.StatusInternalServerError)
+		return
+	}
+
+	// Copy headers from original request, but replace the session ID
+	for name, values := range r.Header {
+		if name == "Mcp-Session-Id" {
+			continue // Skip the proxy session ID
+		}
+		for _, value := range values {
+			req.Header.Add(name, value)
+		}
+	}
+
+	// Extract the real backend session ID from the stored URL
+	parts := strings.Split(backendInfo, "?sessionId=")
+	if len(parts) > 1 {
+		req.Header.Set("Mcp-Session-Id", parts[1])
+	}
+
+	// Add any additional headers
+	for name, value := range p.headers {
+		req.Header.Set(name, value)
+	}
+
+	//nolint:bodyclose
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		http.Error(w, "Failed to connect to backend", http.StatusInternalServerError)
+		return
+	}
+	defer resp.Body.Close()
+
+	// Copy response headers, but not the backend session ID
+	for name, values := range resp.Header {
+		if name == "Mcp-Session-Id" {
+			continue
+		}
+		for _, value := range values {
+			w.Header().Add(name, value)
+		}
+	}
+	// Add our proxy session ID
+	w.Header().Set("Mcp-Session-Id", proxySessionID)
+
+	// Set response status code
+	w.WriteHeader(resp.StatusCode)
+
+	// Check if the response is an SSE stream
+	if strings.Contains(resp.Header.Get("Content-Type"), "text/event-stream") {
+		// Handle SSE response
+		reader := bufio.NewReader(resp.Body)
+		flusher, ok := w.(http.Flusher)
+		if !ok {
+			http.Error(w, "Streaming not supported", http.StatusInternalServerError)
+			return
+		}
+
+		// Create a context that cancels when the client disconnects
+		ctx, cancel := context.WithCancel(r.Context())
+		defer cancel()
+
+		// Monitor client disconnection
+		go func() {
+			<-ctx.Done()
+			resp.Body.Close()
+		}()
+
+		for {
+			line, err := reader.ReadString('\n')
+			if err != nil {
+				if err == io.EOF {
+					break
+				}
+				return
+			}
+
+			// Write the line to the client
+			_, _ = fmt.Fprint(w, line)
+			flusher.Flush()
+		}
+	} else {
+		// Copy regular response body
+		_, _ = io.Copy(w, resp.Body)
+	}
+}
+
+// handleDeleteRequest handles DELETE requests for session termination
+func (p *StreamableProxy) handleDeleteRequest(w http.ResponseWriter, r *http.Request) {
+	// Get proxy session ID from header
+	proxySessionID := r.Header.Get("Mcp-Session-Id")
+	if proxySessionID == "" {
+		http.Error(w, "Missing session ID", http.StatusBadRequest)
+		return
+	}
+
+	// Look up the backend endpoint and session ID
+	backendInfo, ok := p.store.Get(proxySessionID)
+	if !ok {
+		http.Error(w, "Invalid or expired session ID", http.StatusNotFound)
+		return
+	}
+
+	// Create a request to the backend
+	req, err := http.NewRequestWithContext(r.Context(), http.MethodDelete, backendInfo, nil)
+	if err != nil {
+		http.Error(w, "Failed to create backend request", http.StatusInternalServerError)
+		return
+	}
+
+	// Copy headers from original request, but replace the session ID
+	for name, values := range r.Header {
+		if name == "Mcp-Session-Id" {
+			continue // Skip the proxy session ID
+		}
+		for _, value := range values {
+			req.Header.Add(name, value)
+		}
+	}
+
+	// Extract the real backend session ID from the stored URL
+	parts := strings.Split(backendInfo, "?sessionId=")
+	if len(parts) > 1 {
+		req.Header.Set("Mcp-Session-Id", parts[1])
+	}
+
+	// Add any additional headers
+	for name, value := range p.headers {
+		req.Header.Set(name, value)
+	}
+
+	// Make the request to the backend
+	client := &http.Client{
+		Timeout: time.Second * 10,
+	}
+
+	resp, err := client.Do(req)
+	if err != nil {
+		http.Error(w, "Failed to connect to backend", http.StatusInternalServerError)
+		return
+	}
+	defer resp.Body.Close()
+
+	// Remove the session from our store
+	p.store.Delete(proxySessionID)
+
+	// Copy response headers, but not the backend session ID
+	for name, values := range resp.Header {
+		if name == "Mcp-Session-Id" {
+			continue
+		}
+		for _, value := range values {
+			w.Header().Add(name, value)
+		}
+	}
+
+	// Set response status code
+	w.WriteHeader(resp.StatusCode)
+
+	// Copy response body
+	_, _ = io.Copy(w, resp.Body)
+}
+
+// proxyInitialOrNoSessionRequest handles the initial request that doesn't have a session ID yet
+func (p *StreamableProxy) proxyInitialOrNoSessionRequest(w http.ResponseWriter, r *http.Request) {
+	// Create a request to the backend
+	req, err := http.NewRequestWithContext(r.Context(), r.Method, p.backend, r.Body)
+	if err != nil {
+		http.Error(w, "Failed to create backend request", http.StatusInternalServerError)
+		return
+	}
+
+	// Copy headers from original request
+	for name, values := range r.Header {
+		for _, value := range values {
+			req.Header.Add(name, value)
+		}
+	}
+
+	// Add any additional headers
+	for name, value := range p.headers {
+		req.Header.Set(name, value)
+	}
+
+	//nolint:bodyclose
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		http.Error(w, "Failed to connect to backend", http.StatusInternalServerError)
+		return
+	}
+	defer resp.Body.Close()
+
+	// Check if we received a session ID from the backend
+	backendSessionID := resp.Header.Get("Mcp-Session-Id")
+	if backendSessionID != "" {
+		// Generate a new proxy session ID
+		proxySessionID := p.store.New()
+
+		// Store the mapping between our proxy session ID and the backend endpoint with its session ID
+		backendURL := p.backend
+		if strings.Contains(backendURL, "?") {
+			backendURL += "&sessionId=" + backendSessionID
+		} else {
+			backendURL += "?sessionId=" + backendSessionID
+		}
+		p.store.Set(proxySessionID, backendURL)
+
+		// Replace the backend session ID with our proxy session ID in the response
+		w.Header().Set("Mcp-Session-Id", proxySessionID)
+	}
+
+	// Copy other response headers
+	for name, values := range resp.Header {
+		if name != "Mcp-Session-Id" { // Skip the original session ID
+			for _, value := range values {
+				w.Header().Add(name, value)
+			}
+		}
+	}
+
+	// Set response status code
+	w.WriteHeader(resp.StatusCode)
+
+	// Check if the response is an SSE stream
+	if strings.Contains(resp.Header.Get("Content-Type"), "text/event-stream") {
+		// Handle SSE response
+		reader := bufio.NewReader(resp.Body)
+		flusher, ok := w.(http.Flusher)
+		if !ok {
+			http.Error(w, "Streaming not supported", http.StatusInternalServerError)
+			return
+		}
+
+		// Create a context that cancels when the client disconnects
+		ctx, cancel := context.WithCancel(r.Context())
+		defer cancel()
+
+		// Monitor client disconnection
+		go func() {
+			<-ctx.Done()
+			resp.Body.Close()
+		}()
+
+		for {
+			line, err := reader.ReadString('\n')
+			if err != nil {
+				if err == io.EOF {
+					break
+				}
+				return
+			}
+
+			// Write the line to the client
+			fmt.Fprint(w, line)
+			flusher.Flush()
+		}
+	} else {
+		// Copy regular response body
+		_, _ = io.Copy(w, resp.Body)
+	}
+}

+ 157 - 18
core/controller/groupmcp-server.go

@@ -2,15 +2,17 @@ package controller
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"net/http"
 	"net/url"
 
+	"github.com/bytedance/sonic"
 	"github.com/gin-gonic/gin"
-	"github.com/labring/aiproxy/core/common"
 	"github.com/labring/aiproxy/core/common/mcpproxy"
 	"github.com/labring/aiproxy/core/middleware"
 	"github.com/labring/aiproxy/core/model"
+	"github.com/mark3labs/mcp-go/mcp"
 	"github.com/mark3labs/mcp-go/server"
 )
 
@@ -26,10 +28,9 @@ func newGroupMcpEndpoint(key string, t model.GroupMCPType) mcpproxy.EndpointProv
 	}
 }
 
-func (m *groupMcpEndpointProvider) NewEndpoint() (newSession string, newEndpoint string) {
-	session := common.ShortUUID()
+func (m *groupMcpEndpointProvider) NewEndpoint(session string) (newEndpoint string) {
 	endpoint := fmt.Sprintf("/mcp/group/message?sessionId=%s&key=%s&type=%s", session, m.key, m.t)
-	return session, endpoint
+	return endpoint
 }
 
 func (m *groupMcpEndpointProvider) LoadEndpoint(endpoint string) (session string) {
@@ -47,42 +48,62 @@ func (m *groupMcpEndpointProvider) LoadEndpoint(endpoint string) (session string
 func GroupMCPSseServer(c *gin.Context) {
 	id := c.Param("id")
 	if id == "" {
-		middleware.ErrorResponse(c, http.StatusBadRequest, "MCP ID, Group ID, and Session are required")
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			"mcp id is required",
+		))
 		return
 	}
 
 	group := middleware.GetGroup(c)
 
-	mcp, err := model.GetGroupMCPByID(id, group.ID)
+	groupMcp, err := model.GetGroupMCPByID(id, group.ID)
 	if err != nil {
-		middleware.ErrorResponse(c, http.StatusNotFound, err.Error())
+		c.JSON(http.StatusNotFound, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			err.Error(),
+		))
 		return
 	}
 
-	switch mcp.Type {
+	switch groupMcp.Type {
 	case model.GroupMCPTypeProxySSE:
-		handleGroupProxySSE(c, mcp.ProxySSEConfig)
+		handleGroupProxySSE(c, groupMcp.ProxyConfig)
 	case model.GroupMCPTypeOpenAPI:
-		server, err := newOpenAPIMCPServer(mcp.OpenAPIConfig)
+		server, err := newOpenAPIMCPServer(groupMcp.OpenAPIConfig)
 		if err != nil {
-			middleware.AbortLogWithMessage(c, http.StatusBadRequest, err.Error())
+			c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+				nil,
+				mcp.INVALID_REQUEST,
+				err.Error(),
+			))
 			return
 		}
 		handleGroupMCPServer(c, server, model.GroupMCPTypeOpenAPI)
 	default:
-		middleware.ErrorResponse(c, http.StatusBadRequest, "Unsupported MCP type")
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			"unsupported mcp type",
+		))
 	}
 }
 
 // handlePublicProxySSE processes SSE proxy requests
-func handleGroupProxySSE(c *gin.Context, config *model.GroupMCPProxySSEConfig) {
+func handleGroupProxySSE(c *gin.Context, config *model.GroupMCPProxyConfig) {
 	if config == nil || config.URL == "" {
 		return
 	}
 
 	backendURL, err := url.Parse(config.URL)
 	if err != nil {
-		middleware.AbortLogWithMessage(c, http.StatusBadRequest, err.Error())
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			err.Error(),
+		))
 		return
 	}
 
@@ -112,14 +133,16 @@ func handleGroupProxySSE(c *gin.Context, config *model.GroupMCPProxySSEConfig) {
 func handleGroupMCPServer(c *gin.Context, s *server.MCPServer, mcpType model.GroupMCPType) {
 	token := middleware.GetToken(c)
 
-	newSession, newEndpoint := newGroupMcpEndpoint(token.Key, mcpType).NewEndpoint()
+	// Store the session
+	store := getStore()
+	newSession := store.New()
+
+	newEndpoint := newGroupMcpEndpoint(token.Key, mcpType).NewEndpoint(newSession)
 	server := NewSSEServer(
 		s,
 		WithMessageEndpoint(newEndpoint),
 	)
 
-	// Store the session
-	store := getStore()
 	store.Set(newSession, string(mcpType))
 	defer func() {
 		store.Delete(newSession)
@@ -143,17 +166,27 @@ func GroupMCPMessage(c *gin.Context) {
 	token := middleware.GetToken(c)
 	mcpTypeStr, _ := c.GetQuery("type")
 	if mcpTypeStr == "" {
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			"missing mcp type",
+		))
 		return
 	}
 	mcpType := model.GroupMCPType(mcpTypeStr)
 	sessionID, _ := c.GetQuery("sessionId")
 	if sessionID == "" {
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			"missing sessionId",
+		))
 		return
 	}
 
 	switch mcpType {
 	case model.GroupMCPTypeProxySSE:
-		mcpproxy.ProxyHandler(
+		mcpproxy.SSEProxyHandler(
 			c.Writer,
 			c.Request,
 			getStore(),
@@ -163,3 +196,109 @@ func GroupMCPMessage(c *gin.Context) {
 		sendMCPSSEMessage(c, mcpTypeStr, sessionID)
 	}
 }
+
+// GroupMCPStreamable godoc
+//
+//	@Summary	Group MCP Streamable Server
+//	@Router		/mcp/group/{id}/streamable [get]
+//	@Router		/mcp/group/{id}/streamable [post]
+//	@Router		/mcp/group/{id}/streamable [delete]
+func GroupMCPStreamable(c *gin.Context) {
+	id := c.Param("id")
+	if id == "" {
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			"mcp id is required",
+		))
+		return
+	}
+
+	group := middleware.GetGroup(c)
+
+	groupMcp, err := model.GetGroupMCPByID(id, group.ID)
+	if err != nil {
+		c.JSON(http.StatusNotFound, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			err.Error(),
+		))
+		return
+	}
+
+	switch groupMcp.Type {
+	case model.GroupMCPTypeProxyStreamable:
+		handleGroupProxyStreamable(c, groupMcp.ProxyConfig)
+	case model.GroupMCPTypeOpenAPI:
+		server, err := newOpenAPIMCPServer(groupMcp.OpenAPIConfig)
+		if err != nil {
+			c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+				nil,
+				mcp.INVALID_REQUEST,
+				err.Error(),
+			))
+			return
+		}
+		handleGroupStreamableMCPServer(c, server)
+	default:
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			"unsupported mcp type",
+		))
+	}
+}
+
+// handleGroupProxyStreamable processes Streamable proxy requests for group
+func handleGroupProxyStreamable(c *gin.Context, config *model.GroupMCPProxyConfig) {
+	if config == nil || config.URL == "" {
+		return
+	}
+
+	backendURL, err := url.Parse(config.URL)
+	if err != nil {
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			err.Error(),
+		))
+		return
+	}
+
+	headers := make(map[string]string)
+	backendQuery := &url.Values{}
+
+	for k, v := range config.Headers {
+		headers[k] = v
+	}
+	for k, v := range config.Querys {
+		backendQuery.Set(k, v)
+	}
+
+	backendURL.RawQuery = backendQuery.Encode()
+	mcpproxy.NewStreamableProxy(backendURL.String(), headers, getStore()).
+		ServeHTTP(c.Writer, c.Request)
+}
+
+// handleGroupStreamableMCPServer handles the streamable connection for a group MCP server
+func handleGroupStreamableMCPServer(c *gin.Context, s *server.MCPServer) {
+	if c.Request.Method != http.MethodPost {
+		c.JSON(http.StatusMethodNotAllowed, CreateMCPErrorResponse(
+			nil,
+			mcp.METHOD_NOT_FOUND,
+			"method not allowed",
+		))
+		return
+	}
+	var rawMessage json.RawMessage
+	if err := sonic.ConfigDefault.NewDecoder(c.Request.Body).Decode(&rawMessage); err != nil {
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.PARSE_ERROR,
+			err.Error(),
+		))
+		return
+	}
+	respMessage := s.HandleMessage(c.Request.Context(), rawMessage)
+	c.JSON(http.StatusOK, respMessage)
+}

+ 21 - 33
core/controller/mcpopenapi.go → core/controller/mcp-sse.go

@@ -8,6 +8,7 @@ import (
 	"net/http"
 	"time"
 
+	"github.com/bytedance/sonic"
 	"github.com/mark3labs/mcp-go/mcp"
 	"github.com/mark3labs/mcp-go/server"
 )
@@ -47,6 +48,7 @@ func WithKeepAlive(keepAlive bool) SSEOption {
 }
 
 // NewSSEServer creates a new SSE server instance with the given MCP server and options.
+// TODO: notify support
 func NewSSEServer(server *server.MCPServer, opts ...SSEOption) *SSEServer {
 	s := &SSEServer{
 		server:            server,
@@ -68,7 +70,13 @@ func NewSSEServer(server *server.MCPServer, opts ...SSEOption) *SSEServer {
 // It sets up appropriate headers and creates a new session for the client.
 func (s *SSEServer) HandleSSE(w http.ResponseWriter, r *http.Request) {
 	if r.Method != http.MethodGet {
-		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+		errorResponse := CreateMCPErrorResponse(
+			nil,
+			mcp.METHOD_NOT_FOUND,
+			"method not allowed",
+		)
+		w.WriteHeader(http.StatusMethodNotAllowed)
+		_ = sonic.ConfigDefault.NewEncoder(w).Encode(errorResponse)
 		return
 	}
 
@@ -79,7 +87,13 @@ func (s *SSEServer) HandleSSE(w http.ResponseWriter, r *http.Request) {
 
 	flusher, ok := w.(http.Flusher)
 	if !ok {
-		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
+		errorResponse := CreateMCPErrorResponse(
+			nil,
+			mcp.INTERNAL_ERROR,
+			"streaming unsupported",
+		)
+		w.WriteHeader(http.StatusInternalServerError)
+		_ = sonic.ConfigDefault.NewEncoder(w).Encode(errorResponse)
 		return
 	}
 
@@ -121,8 +135,8 @@ func (s *SSEServer) HandleSSE(w http.ResponseWriter, r *http.Request) {
 func (s *SSEServer) HandleMessage(req []byte) error {
 	// Parse message as raw JSON
 	var rawMessage json.RawMessage
-	if err := json.Unmarshal(req, &rawMessage); err != nil {
-		return errors.New("parse error")
+	if err := sonic.Unmarshal(req, &rawMessage); err != nil {
+		return err
 	}
 
 	// Process message through MCPServer
@@ -130,7 +144,7 @@ func (s *SSEServer) HandleMessage(req []byte) error {
 
 	// Only send response if there is one (not for notifications)
 	if response != nil {
-		eventData, err := json.Marshal(response)
+		eventData, err := sonic.Marshal(response)
 		if err != nil {
 			return err
 		}
@@ -140,36 +154,10 @@ func (s *SSEServer) HandleMessage(req []byte) error {
 		case s.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData):
 			// Event queued successfully
 		default:
-			// Queue is full, could log this
+			// Queue is full
+			return errors.New("event queue is full")
 		}
 	}
 
 	return nil
 }
-
-func JSONRPCError(
-	id interface{},
-	code int,
-	message string,
-) ([]byte, error) {
-	return json.Marshal(createErrorResponse(id, code, message))
-}
-
-func createErrorResponse(
-	id interface{},
-	code int,
-	message string,
-) mcp.JSONRPCMessage {
-	return mcp.JSONRPCError{
-		JSONRPC: mcp.JSONRPC_VERSION,
-		ID:      id,
-		Error: struct {
-			Code    int         `json:"code"`
-			Message string      `json:"message"`
-			Data    interface{} `json:"data,omitempty"`
-		}{
-			Code:    code,
-			Message: message,
-		},
-	}
-}

+ 221 - 22
core/controller/publicmcp-server.go

@@ -2,6 +2,7 @@ package controller
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -11,12 +12,14 @@ import (
 	"sync"
 	"time"
 
+	"github.com/bytedance/sonic"
 	"github.com/gin-gonic/gin"
 	"github.com/labring/aiproxy/core/common"
 	"github.com/labring/aiproxy/core/common/mcpproxy"
 	"github.com/labring/aiproxy/core/middleware"
 	"github.com/labring/aiproxy/core/model"
 	"github.com/labring/aiproxy/openapi-mcp/convert"
+	"github.com/mark3labs/mcp-go/mcp"
 	"github.com/mark3labs/mcp-go/server"
 	"github.com/redis/go-redis/v9"
 )
@@ -34,10 +37,9 @@ func newPublicMcpEndpoint(key string, t model.PublicMCPType) mcpproxy.EndpointPr
 	}
 }
 
-func (m *publicMcpEndpointProvider) NewEndpoint() (newSession string, newEndpoint string) {
-	session := common.ShortUUID()
+func (m *publicMcpEndpointProvider) NewEndpoint(session string) (newEndpoint string) {
 	endpoint := fmt.Sprintf("/mcp/public/message?sessionId=%s&key=%s&type=%s", session, m.key, m.t)
-	return session, endpoint
+	return endpoint
 }
 
 func (m *publicMcpEndpointProvider) LoadEndpoint(endpoint string) (session string) {
@@ -86,6 +88,10 @@ redis.call('EXPIRE', key, 300)
 return value
 `)
 
+func (r *redisStoreManager) New() string {
+	return common.ShortUUID()
+}
+
 func (r *redisStoreManager) Get(sessionID string) (string, bool) {
 	ctx := context.Background()
 
@@ -116,35 +122,51 @@ func PublicMCPSseServer(c *gin.Context) {
 
 	publicMcp, err := model.GetPublicMCPByID(mcpID)
 	if err != nil {
-		middleware.AbortLogWithMessage(c, http.StatusBadRequest, err.Error())
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			err.Error(),
+		))
 		return
 	}
 
 	switch publicMcp.Type {
 	case model.PublicMCPTypeProxySSE:
-		handlePublicProxySSE(c, publicMcp.ID, publicMcp.ProxySSEConfig)
+		handlePublicProxySSE(c, publicMcp.ID, publicMcp.ProxyConfig)
 	case model.PublicMCPTypeOpenAPI:
 		server, err := newOpenAPIMCPServer(publicMcp.OpenAPIConfig)
 		if err != nil {
-			middleware.AbortLogWithMessage(c, http.StatusBadRequest, err.Error())
+			c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+				nil,
+				mcp.INVALID_REQUEST,
+				err.Error(),
+			))
 			return
 		}
-		handleMCPServer(c, server, model.PublicMCPTypeOpenAPI)
+		handleSSEMCPServer(c, server, model.PublicMCPTypeOpenAPI)
 	default:
-		middleware.AbortLogWithMessage(c, http.StatusBadRequest, "unknow mcp type")
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			"unknown mcp type",
+		))
 		return
 	}
 }
 
 // handlePublicProxySSE processes SSE proxy requests
-func handlePublicProxySSE(c *gin.Context, mcpID string, config *model.PublicMCPProxySSEConfig) {
+func handlePublicProxySSE(c *gin.Context, mcpID string, config *model.PublicMCPProxyConfig) {
 	if config == nil || config.URL == "" {
 		return
 	}
 
 	backendURL, err := url.Parse(config.URL)
 	if err != nil {
-		middleware.AbortLogWithMessage(c, http.StatusBadRequest, err.Error())
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			err.Error(),
+		))
 		return
 	}
 
@@ -155,7 +177,11 @@ func handlePublicProxySSE(c *gin.Context, mcpID string, config *model.PublicMCPP
 
 	// Process reusing parameters if any
 	if err := processReusingParams(config.ReusingParams, mcpID, group.ID, headers, backendQuery); err != nil {
-		middleware.AbortLogWithMessage(c, http.StatusBadRequest, err.Error())
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			err.Error(),
+		))
 		return
 	}
 
@@ -212,18 +238,20 @@ func newOpenAPIMCPServer(config *model.MCPOpenAPIConfig) (*server.MCPServer, err
 	return s, nil
 }
 
-// handleMCPServer handles the SSE connection for an MCP server
-func handleMCPServer(c *gin.Context, s *server.MCPServer, mcpType model.PublicMCPType) {
+// handleSSEMCPServer handles the SSE connection for an MCP server
+func handleSSEMCPServer(c *gin.Context, s *server.MCPServer, mcpType model.PublicMCPType) {
 	token := middleware.GetToken(c)
 
-	newSession, newEndpoint := newPublicMcpEndpoint(token.Key, mcpType).NewEndpoint()
+	// Store the session
+	store := getStore()
+	newSession := store.New()
+
+	newEndpoint := newPublicMcpEndpoint(token.Key, mcpType).NewEndpoint(newSession)
 	server := NewSSEServer(
 		s,
 		WithMessageEndpoint(newEndpoint),
 	)
 
-	// Store the session
-	store := getStore()
 	store.Set(newSession, string(mcpType))
 	defer func() {
 		store.Delete(newSession)
@@ -277,7 +305,7 @@ func processMCPSseMpscMessages(ctx context.Context, sessionID string, server *SS
 				return
 			}
 			if err := server.HandleMessage(data); err != nil {
-				return
+				continue
 			}
 		}
 	}
@@ -318,52 +346,204 @@ func processReusingParams(reusingParams map[string]model.ReusingParam, mcpID str
 
 // PublicMCPMessage godoc
 //
-//	@Summary	MCP SSE Proxy
+//	@Summary	Public MCP SSE Server
 //	@Router		/mcp/public/message [post]
 func PublicMCPMessage(c *gin.Context) {
 	token := middleware.GetToken(c)
 	mcpTypeStr, _ := c.GetQuery("type")
 	if mcpTypeStr == "" {
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			"missing mcp type",
+		))
 		return
 	}
 	mcpType := model.PublicMCPType(mcpTypeStr)
 	sessionID, _ := c.GetQuery("sessionId")
 	if sessionID == "" {
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			"missing sessionId",
+		))
 		return
 	}
 
 	switch mcpType {
 	case model.PublicMCPTypeProxySSE:
-		mcpproxy.ProxyHandler(
+		mcpproxy.SSEProxyHandler(
 			c.Writer,
 			c.Request,
 			getStore(),
 			newPublicMcpEndpoint(token.Key, mcpType),
 		)
-	default:
+	case model.PublicMCPTypeOpenAPI:
 		sendMCPSSEMessage(c, mcpTypeStr, sessionID)
+	default:
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			"unknown mcp type",
+		))
 	}
 }
 
 func sendMCPSSEMessage(c *gin.Context, mcpType, sessionID string) {
 	backend, ok := getStore().Get(sessionID)
 	if !ok || backend != mcpType {
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			"invalid session",
+		))
 		return
 	}
 	mpscInstance := getMCPMpsc()
 	body, err := io.ReadAll(c.Request.Body)
 	if err != nil {
-		_ = c.AbortWithError(http.StatusInternalServerError, err)
+		c.JSON(http.StatusInternalServerError, CreateMCPErrorResponse(
+			nil,
+			mcp.INTERNAL_ERROR,
+			err.Error(),
+		))
 		return
 	}
 	err = mpscInstance.send(c.Request.Context(), sessionID, body)
 	if err != nil {
-		_ = c.AbortWithError(http.StatusInternalServerError, err)
+		c.JSON(http.StatusInternalServerError, CreateMCPErrorResponse(
+			nil,
+			mcp.INTERNAL_ERROR,
+			err.Error(),
+		))
 		return
 	}
 	c.Writer.WriteHeader(http.StatusAccepted)
 }
 
+// PublicMCPStreamable godoc
+//
+//	@Summary	Public MCP Streamable Server
+//	@Router		/mcp/public/{id}/streamable [get]
+//	@Router		/mcp/public/{id}/streamable [post]
+//	@Router		/mcp/public/{id}/streamable [delete]
+//
+// TODO: batch and sse support
+func PublicMCPStreamable(c *gin.Context) {
+	mcpID := c.Param("id")
+	publicMcp, err := model.GetPublicMCPByID(mcpID)
+	if err != nil {
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			err.Error(),
+		))
+		return
+	}
+
+	switch publicMcp.Type {
+	case model.PublicMCPTypeProxyStreamable:
+		handlePublicProxyStreamable(c, mcpID, publicMcp.ProxyConfig)
+	case model.PublicMCPTypeOpenAPI:
+		server, err := newOpenAPIMCPServer(publicMcp.OpenAPIConfig)
+		if err != nil {
+			c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+				nil,
+				mcp.INVALID_REQUEST,
+				err.Error(),
+			))
+			return
+		}
+		handleStreamableMCPServer(c, server)
+	default:
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			"unknown mcp type",
+		))
+	}
+}
+
+// handlePublicProxyStreamable processes Streamable proxy requests
+func handlePublicProxyStreamable(c *gin.Context, mcpID string, config *model.PublicMCPProxyConfig) {
+	if config == nil || config.URL == "" {
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			"invalid proxy configuration",
+		))
+		return
+	}
+
+	backendURL, err := url.Parse(config.URL)
+	if err != nil {
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			err.Error(),
+		))
+		return
+	}
+
+	headers := make(map[string]string)
+	backendQuery := &url.Values{}
+	group := middleware.GetGroup(c)
+	token := middleware.GetToken(c)
+
+	// Process reusing parameters if any
+	if err := processReusingParams(config.ReusingParams, mcpID, group.ID, headers, backendQuery); err != nil {
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.INVALID_REQUEST,
+			err.Error(),
+		))
+		return
+	}
+
+	for k, v := range config.Headers {
+		headers[k] = v
+	}
+	for k, v := range config.Querys {
+		backendQuery.Set(k, v)
+	}
+
+	backendURL.RawQuery = backendQuery.Encode()
+	mcpproxy.SSEHandler(
+		c.Writer,
+		c.Request,
+		getStore(),
+		newPublicMcpEndpoint(token.Key, model.PublicMCPTypeProxySSE),
+		backendURL.String(),
+		headers,
+	)
+
+	mcpproxy.NewStreamableProxy(backendURL.String(), headers, getStore()).
+		ServeHTTP(c.Writer, c.Request)
+}
+
+// handleStreamableMCPServer handles the streamable connection for an MCP server
+func handleStreamableMCPServer(c *gin.Context, s *server.MCPServer) {
+	if c.Request.Method != http.MethodPost {
+		c.JSON(http.StatusMethodNotAllowed, CreateMCPErrorResponse(
+			nil,
+			mcp.METHOD_NOT_FOUND,
+			"method not allowed",
+		))
+		return
+	}
+	var rawMessage json.RawMessage
+	if err := sonic.ConfigDefault.NewDecoder(c.Request.Body).Decode(&rawMessage); err != nil {
+		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
+			nil,
+			mcp.PARSE_ERROR,
+			err.Error(),
+		))
+		return
+	}
+	respMessage := s.HandleMessage(c.Request.Context(), rawMessage)
+	c.JSON(http.StatusOK, respMessage)
+}
+
 // Interface for multi-producer, single-consumer message passing
 type mpsc interface {
 	recv(ctx context.Context, id string) ([]byte, error)
@@ -523,3 +703,22 @@ func (r *redisMCPMPSC) recv(ctx context.Context, id string) ([]byte, error) {
 		}
 	}
 }
+
+func CreateMCPErrorResponse(
+	id interface{},
+	code int,
+	message string,
+) mcp.JSONRPCMessage {
+	return mcp.JSONRPCError{
+		JSONRPC: mcp.JSONRPC_VERSION,
+		ID:      id,
+		Error: struct {
+			Code    int         `json:"code"`
+			Message string      `json:"message"`
+			Data    interface{} `json:"data,omitempty"`
+		}{
+			Code:    code,
+			Message: message,
+		},
+	}
+}

+ 40 - 8
core/docs/docs.go

@@ -6299,9 +6299,23 @@ const docTemplate = `{
                 "responses": {}
             }
         },
+        "/mcp/group/{id}/streamable": {
+            "get": {
+                "summary": "Group MCP Streamable Server",
+                "responses": {}
+            },
+            "post": {
+                "summary": "Group MCP Streamable Server",
+                "responses": {}
+            },
+            "delete": {
+                "summary": "Group MCP Streamable Server",
+                "responses": {}
+            }
+        },
         "/mcp/public/message": {
             "post": {
-                "summary": "MCP SSE Proxy",
+                "summary": "Public MCP SSE Server",
                 "responses": {}
             }
         },
@@ -6311,6 +6325,20 @@ const docTemplate = `{
                 "responses": {}
             }
         },
+        "/mcp/public/{id}/streamable": {
+            "get": {
+                "summary": "Public MCP Streamable Server",
+                "responses": {}
+            },
+            "post": {
+                "summary": "Public MCP Streamable Server",
+                "responses": {}
+            },
+            "delete": {
+                "summary": "Public MCP Streamable Server",
+                "responses": {}
+            }
+        },
         "/v1/audio/speech": {
             "post": {
                 "security": [
@@ -8327,8 +8355,8 @@ const docTemplate = `{
                 "openapi_config": {
                     "$ref": "#/definitions/model.MCPOpenAPIConfig"
                 },
-                "proxy_sse_config": {
-                    "$ref": "#/definitions/model.GroupMCPProxySSEConfig"
+                "proxy_config": {
+                    "$ref": "#/definitions/model.GroupMCPProxyConfig"
                 },
                 "type": {
                     "$ref": "#/definitions/model.GroupMCPType"
@@ -8338,7 +8366,7 @@ const docTemplate = `{
                 }
             }
         },
-        "model.GroupMCPProxySSEConfig": {
+        "model.GroupMCPProxyConfig": {
             "type": "object",
             "properties": {
                 "headers": {
@@ -8362,10 +8390,12 @@ const docTemplate = `{
             "type": "string",
             "enum": [
                 "mcp_proxy_sse",
+                "mcp_proxy_streamable",
                 "mcp_openapi"
             ],
             "x-enum-varnames": [
                 "GroupMCPTypeProxySSE",
+                "GroupMCPTypeProxyStreamable",
                 "GroupMCPTypeOpenAPI"
             ]
         },
@@ -8568,7 +8598,7 @@ const docTemplate = `{
                 "openapi_spec": {
                     "type": "string"
                 },
-                "server": {
+                "server_addr": {
                     "type": "string"
                 },
                 "v2": {
@@ -8871,8 +8901,8 @@ const docTemplate = `{
                 "price": {
                     "$ref": "#/definitions/model.MCPPrice"
                 },
-                "proxy_sse_config": {
-                    "$ref": "#/definitions/model.PublicMCPProxySSEConfig"
+                "proxy_config": {
+                    "$ref": "#/definitions/model.PublicMCPProxyConfig"
                 },
                 "readme": {
                     "type": "string"
@@ -8897,7 +8927,7 @@ const docTemplate = `{
                 }
             }
         },
-        "model.PublicMCPProxySSEConfig": {
+        "model.PublicMCPProxyConfig": {
             "type": "object",
             "properties": {
                 "headers": {
@@ -8950,6 +8980,7 @@ const docTemplate = `{
             "type": "string",
             "enum": [
                 "mcp_proxy_sse",
+                "mcp_proxy_streamable",
                 "mcp_git_repo",
                 "mcp_openapi"
             ],
@@ -8958,6 +8989,7 @@ const docTemplate = `{
             },
             "x-enum-varnames": [
                 "PublicMCPTypeProxySSE",
+                "PublicMCPTypeProxyStreamable",
                 "PublicMCPTypeGitRepo",
                 "PublicMCPTypeOpenAPI"
             ]

+ 40 - 8
core/docs/swagger.json

@@ -6290,9 +6290,23 @@
                 "responses": {}
             }
         },
+        "/mcp/group/{id}/streamable": {
+            "get": {
+                "summary": "Group MCP Streamable Server",
+                "responses": {}
+            },
+            "post": {
+                "summary": "Group MCP Streamable Server",
+                "responses": {}
+            },
+            "delete": {
+                "summary": "Group MCP Streamable Server",
+                "responses": {}
+            }
+        },
         "/mcp/public/message": {
             "post": {
-                "summary": "MCP SSE Proxy",
+                "summary": "Public MCP SSE Server",
                 "responses": {}
             }
         },
@@ -6302,6 +6316,20 @@
                 "responses": {}
             }
         },
+        "/mcp/public/{id}/streamable": {
+            "get": {
+                "summary": "Public MCP Streamable Server",
+                "responses": {}
+            },
+            "post": {
+                "summary": "Public MCP Streamable Server",
+                "responses": {}
+            },
+            "delete": {
+                "summary": "Public MCP Streamable Server",
+                "responses": {}
+            }
+        },
         "/v1/audio/speech": {
             "post": {
                 "security": [
@@ -8318,8 +8346,8 @@
                 "openapi_config": {
                     "$ref": "#/definitions/model.MCPOpenAPIConfig"
                 },
-                "proxy_sse_config": {
-                    "$ref": "#/definitions/model.GroupMCPProxySSEConfig"
+                "proxy_config": {
+                    "$ref": "#/definitions/model.GroupMCPProxyConfig"
                 },
                 "type": {
                     "$ref": "#/definitions/model.GroupMCPType"
@@ -8329,7 +8357,7 @@
                 }
             }
         },
-        "model.GroupMCPProxySSEConfig": {
+        "model.GroupMCPProxyConfig": {
             "type": "object",
             "properties": {
                 "headers": {
@@ -8353,10 +8381,12 @@
             "type": "string",
             "enum": [
                 "mcp_proxy_sse",
+                "mcp_proxy_streamable",
                 "mcp_openapi"
             ],
             "x-enum-varnames": [
                 "GroupMCPTypeProxySSE",
+                "GroupMCPTypeProxyStreamable",
                 "GroupMCPTypeOpenAPI"
             ]
         },
@@ -8559,7 +8589,7 @@
                 "openapi_spec": {
                     "type": "string"
                 },
-                "server": {
+                "server_addr": {
                     "type": "string"
                 },
                 "v2": {
@@ -8862,8 +8892,8 @@
                 "price": {
                     "$ref": "#/definitions/model.MCPPrice"
                 },
-                "proxy_sse_config": {
-                    "$ref": "#/definitions/model.PublicMCPProxySSEConfig"
+                "proxy_config": {
+                    "$ref": "#/definitions/model.PublicMCPProxyConfig"
                 },
                 "readme": {
                     "type": "string"
@@ -8888,7 +8918,7 @@
                 }
             }
         },
-        "model.PublicMCPProxySSEConfig": {
+        "model.PublicMCPProxyConfig": {
             "type": "object",
             "properties": {
                 "headers": {
@@ -8941,6 +8971,7 @@
             "type": "string",
             "enum": [
                 "mcp_proxy_sse",
+                "mcp_proxy_streamable",
                 "mcp_git_repo",
                 "mcp_openapi"
             ],
@@ -8949,6 +8980,7 @@
             },
             "x-enum-varnames": [
                 "PublicMCPTypeProxySSE",
+                "PublicMCPTypeProxyStreamable",
                 "PublicMCPTypeGitRepo",
                 "PublicMCPTypeOpenAPI"
             ]

+ 32 - 8
core/docs/swagger.yaml

@@ -805,14 +805,14 @@ definitions:
         type: string
       openapi_config:
         $ref: '#/definitions/model.MCPOpenAPIConfig'
-      proxy_sse_config:
-        $ref: '#/definitions/model.GroupMCPProxySSEConfig'
+      proxy_config:
+        $ref: '#/definitions/model.GroupMCPProxyConfig'
       type:
         $ref: '#/definitions/model.GroupMCPType'
       update_at:
         type: string
     type: object
-  model.GroupMCPProxySSEConfig:
+  model.GroupMCPProxyConfig:
     properties:
       headers:
         additionalProperties:
@@ -828,10 +828,12 @@ definitions:
   model.GroupMCPType:
     enum:
     - mcp_proxy_sse
+    - mcp_proxy_streamable
     - mcp_openapi
     type: string
     x-enum-varnames:
     - GroupMCPTypeProxySSE
+    - GroupMCPTypeProxyStreamable
     - GroupMCPTypeOpenAPI
   model.GroupModelConfig:
     properties:
@@ -964,7 +966,7 @@ definitions:
         type: string
       openapi_spec:
         type: string
-      server:
+      server_addr:
         type: string
       v2:
         type: boolean
@@ -1185,8 +1187,8 @@ definitions:
         $ref: '#/definitions/model.MCPOpenAPIConfig'
       price:
         $ref: '#/definitions/model.MCPPrice'
-      proxy_sse_config:
-        $ref: '#/definitions/model.PublicMCPProxySSEConfig'
+      proxy_config:
+        $ref: '#/definitions/model.PublicMCPProxyConfig'
       readme:
         type: string
       readme_url:
@@ -1202,7 +1204,7 @@ definitions:
       update_at:
         type: string
     type: object
-  model.PublicMCPProxySSEConfig:
+  model.PublicMCPProxyConfig:
     properties:
       headers:
         additionalProperties:
@@ -1237,6 +1239,7 @@ definitions:
   model.PublicMCPType:
     enum:
     - mcp_proxy_sse
+    - mcp_proxy_streamable
     - mcp_git_repo
     - mcp_openapi
     type: string
@@ -1244,6 +1247,7 @@ definitions:
       PublicMCPTypeGitRepo: read only
     x-enum-varnames:
     - PublicMCPTypeProxySSE
+    - PublicMCPTypeProxyStreamable
     - PublicMCPTypeGitRepo
     - PublicMCPTypeOpenAPI
   model.RequestDetail:
@@ -5199,6 +5203,16 @@ paths:
     get:
       responses: {}
       summary: Group MCP SSE Server
+  /mcp/group/{id}/streamable:
+    delete:
+      responses: {}
+      summary: Group MCP Streamable Server
+    get:
+      responses: {}
+      summary: Group MCP Streamable Server
+    post:
+      responses: {}
+      summary: Group MCP Streamable Server
   /mcp/group/message:
     post:
       responses: {}
@@ -5207,10 +5221,20 @@ paths:
     get:
       responses: {}
       summary: Public MCP SSE Server
+  /mcp/public/{id}/streamable:
+    delete:
+      responses: {}
+      summary: Public MCP Streamable Server
+    get:
+      responses: {}
+      summary: Public MCP Streamable Server
+    post:
+      responses: {}
+      summary: Public MCP Streamable Server
   /mcp/public/message:
     post:
       responses: {}
-      summary: MCP SSE Proxy
+      summary: Public MCP SSE Server
   /v1/audio/speech:
     post:
       description: AudioSpeech

+ 15 - 14
core/model/groupmcp.go

@@ -16,26 +16,27 @@ const (
 type GroupMCPType string
 
 const (
-	GroupMCPTypeProxySSE GroupMCPType = "mcp_proxy_sse"
-	GroupMCPTypeOpenAPI  GroupMCPType = "mcp_openapi"
+	GroupMCPTypeProxySSE        GroupMCPType = "mcp_proxy_sse"
+	GroupMCPTypeProxyStreamable GroupMCPType = "mcp_proxy_streamable"
+	GroupMCPTypeOpenAPI         GroupMCPType = "mcp_openapi"
 )
 
-type GroupMCPProxySSEConfig struct {
+type GroupMCPProxyConfig struct {
 	URL     string            `json:"url"`
 	Querys  map[string]string `json:"querys"`
 	Headers map[string]string `json:"headers"`
 }
 
 type GroupMCP struct {
-	ID             string                  `gorm:"primaryKey"                    json:"id"`
-	GroupID        string                  `gorm:"primaryKey"                    json:"group_id"`
-	Group          *Group                  `gorm:"foreignKey:GroupID"            json:"-"`
-	CreatedAt      time.Time               `gorm:"index"                         json:"created_at"`
-	UpdateAt       time.Time               `gorm:"index"                         json:"update_at"`
-	Name           string                  `json:"name"`
-	Type           GroupMCPType            `gorm:"index"                         json:"type"`
-	ProxySSEConfig *GroupMCPProxySSEConfig `gorm:"serializer:fastjson;type:text" json:"proxy_sse_config,omitempty"`
-	OpenAPIConfig  *MCPOpenAPIConfig       `gorm:"serializer:fastjson;type:text" json:"openapi_config,omitempty"`
+	ID            string               `gorm:"primaryKey"                    json:"id"`
+	GroupID       string               `gorm:"primaryKey"                    json:"group_id"`
+	Group         *Group               `gorm:"foreignKey:GroupID"            json:"-"`
+	CreatedAt     time.Time            `gorm:"index"                         json:"created_at"`
+	UpdateAt      time.Time            `gorm:"index"                         json:"update_at"`
+	Name          string               `json:"name"`
+	Type          GroupMCPType         `gorm:"index"                         json:"type"`
+	ProxyConfig   *GroupMCPProxyConfig `gorm:"serializer:fastjson;type:text" json:"proxy_config,omitempty"`
+	OpenAPIConfig *MCPOpenAPIConfig    `gorm:"serializer:fastjson;type:text" json:"openapi_config,omitempty"`
 }
 
 func (g *GroupMCP) BeforeCreate(_ *gorm.DB) (err error) {
@@ -57,8 +58,8 @@ func (g *GroupMCP) BeforeCreate(_ *gorm.DB) (err error) {
 		return errors.New("openapi spec and content is empty")
 	}
 
-	if g.ProxySSEConfig != nil {
-		config := g.ProxySSEConfig
+	if g.ProxyConfig != nil {
+		config := g.ProxyConfig
 		return validateHTTPURL(config.URL)
 	}
 

+ 22 - 21
core/model/publicmcp.go

@@ -18,9 +18,10 @@ const (
 type PublicMCPType string
 
 const (
-	PublicMCPTypeProxySSE PublicMCPType = "mcp_proxy_sse"
-	PublicMCPTypeGitRepo  PublicMCPType = "mcp_git_repo" // read only
-	PublicMCPTypeOpenAPI  PublicMCPType = "mcp_openapi"
+	PublicMCPTypeProxySSE        PublicMCPType = "mcp_proxy_sse"
+	PublicMCPTypeProxyStreamable PublicMCPType = "mcp_proxy_streamable"
+	PublicMCPTypeGitRepo         PublicMCPType = "mcp_git_repo" // read only
+	PublicMCPTypeOpenAPI         PublicMCPType = "mcp_openapi"
 )
 
 type ParamType string
@@ -42,7 +43,7 @@ type MCPPrice struct {
 	ToolsCallPrices       map[string]float64 `gorm:"serializer:fastjson;type:text" json:"tools_call_prices"`
 }
 
-type PublicMCPProxySSEConfig struct {
+type PublicMCPProxyConfig struct {
 	URL           string                  `json:"url"`
 	Querys        map[string]string       `json:"querys"`
 	Headers       map[string]string       `json:"headers"`
@@ -91,21 +92,21 @@ type MCPOpenAPIConfig struct {
 }
 
 type PublicMCP struct {
-	ID                     string                   `gorm:"primaryKey"                    json:"id"`
-	CreatedAt              time.Time                `gorm:"index"                         json:"created_at"`
-	UpdateAt               time.Time                `gorm:"index"                         json:"update_at"`
-	PublicMCPReusingParams []PublicMCPReusingParam  `gorm:"foreignKey:MCPID"              json:"-"`
-	Name                   string                   `json:"name"`
-	Type                   PublicMCPType            `gorm:"index"                         json:"type"`
-	RepoURL                string                   `json:"repo_url"`
-	ReadmeURL              string                   `json:"readme_url"`
-	Readme                 string                   `gorm:"type:text"                     json:"readme"`
-	Tags                   []string                 `gorm:"serializer:fastjson;type:text" json:"tags,omitempty"`
-	Author                 string                   `json:"author"`
-	LogoURL                string                   `json:"logo_url"`
-	Price                  MCPPrice                 `gorm:"embedded"                      json:"price"`
-	ProxySSEConfig         *PublicMCPProxySSEConfig `gorm:"serializer:fastjson;type:text" json:"proxy_sse_config,omitempty"`
-	OpenAPIConfig          *MCPOpenAPIConfig        `gorm:"serializer:fastjson;type:text" json:"openapi_config,omitempty"`
+	ID                     string                  `gorm:"primaryKey"                    json:"id"`
+	CreatedAt              time.Time               `gorm:"index"                         json:"created_at"`
+	UpdateAt               time.Time               `gorm:"index"                         json:"update_at"`
+	PublicMCPReusingParams []PublicMCPReusingParam `gorm:"foreignKey:MCPID"              json:"-"`
+	Name                   string                  `json:"name"`
+	Type                   PublicMCPType           `gorm:"index"                         json:"type"`
+	RepoURL                string                  `json:"repo_url"`
+	ReadmeURL              string                  `json:"readme_url"`
+	Readme                 string                  `gorm:"type:text"                     json:"readme"`
+	Tags                   []string                `gorm:"serializer:fastjson;type:text" json:"tags,omitempty"`
+	Author                 string                  `json:"author"`
+	LogoURL                string                  `json:"logo_url"`
+	Price                  MCPPrice                `gorm:"embedded"                      json:"price"`
+	ProxyConfig            *PublicMCPProxyConfig   `gorm:"serializer:fastjson;type:text" json:"proxy_config,omitempty"`
+	OpenAPIConfig          *MCPOpenAPIConfig       `gorm:"serializer:fastjson;type:text" json:"openapi_config,omitempty"`
 }
 
 func (p *PublicMCP) BeforeCreate(_ *gorm.DB) error {
@@ -124,8 +125,8 @@ func (p *PublicMCP) BeforeCreate(_ *gorm.DB) error {
 		return errors.New("openapi spec and content is empty")
 	}
 
-	if p.ProxySSEConfig != nil {
-		config := p.ProxySSEConfig
+	if p.ProxyConfig != nil {
+		config := p.ProxyConfig
 		return validateHTTPURL(config.URL)
 	}
 	return nil

+ 6 - 0
core/router/mcp.go

@@ -11,7 +11,13 @@ func SetMCPRouter(router *gin.Engine) {
 
 	mcpRoute.GET("/public/:id/sse", controller.PublicMCPSseServer)
 	mcpRoute.POST("/public/message", controller.PublicMCPMessage)
+	mcpRoute.GET("/public/:id/streamable", controller.PublicMCPStreamable)
+	mcpRoute.POST("/public/:id/streamable", controller.PublicMCPStreamable)
+	mcpRoute.DELETE("/public/:id/streamable", controller.PublicMCPStreamable)
 
 	mcpRoute.GET("/group/:id/sse", controller.GroupMCPSseServer)
 	mcpRoute.POST("/group/message", controller.GroupMCPMessage)
+	mcpRoute.GET("/group/:id/streamable", controller.GroupMCPStreamable)
+	mcpRoute.POST("/group/:id/streamable", controller.GroupMCPStreamable)
+	mcpRoute.DELETE("/group/:id/streamable", controller.GroupMCPStreamable)
 }