Ver código fonte

feat: streamable http mcp server proxy support use sse endpoint to request (#226)

zijiren 7 meses atrás
pai
commit
3c884d7b52

+ 3 - 2
core/.golangci.yml

@@ -104,6 +104,9 @@ linters:
       constant-kind: true
     usetesting:
       os-temp-dir: true
+    gosec:
+      excludes:
+        - G404
 
 formatters:
   enable:
@@ -126,5 +129,3 @@ formatters:
           replacement: "a[b:]"
     gofumpt:
       extra-rules: true
-    golines:
-      shorten-comments: true

+ 0 - 1
core/common/balance/sealos.go

@@ -108,7 +108,6 @@ type sealosCache struct {
 	Balance int64  `redis:"b"`
 }
 
-//nolint:gosec
 func cacheSetGroupBalance(ctx context.Context, group string, balance int64, userUID string) error {
 	if !common.RedisEnabled || !sealosRedisCacheEnable {
 		return nil

+ 118 - 165
core/common/mcpproxy/sse.go

@@ -1,218 +1,171 @@
 package mcpproxy
 
 import (
-	"bufio"
 	"context"
+	"encoding/json"
+	"errors"
 	"fmt"
-	"io"
 	"net/http"
-	"net/url"
-	"strings"
 	"time"
+
+	"github.com/bytedance/sonic"
+	"github.com/mark3labs/mcp-go/mcp"
 )
 
-type EndpointProvider interface {
-	NewEndpoint(newSession string) (newEndpoint string)
-	LoadEndpoint(endpoint string) (session string)
+type MCPServer interface {
+	HandleMessage(ctx context.Context, message json.RawMessage) mcp.JSONRPCMessage
 }
 
-// SSEAProxy represents the proxy object that handles SSE and HTTP requests
-type SSEAProxy struct {
-	store    SessionManager
-	endpoint EndpointProvider
-	backend  string
-	headers  map[string]string
+// SSEServer implements a Server-Sent Events (SSE) based MCP server.
+// It provides real-time communication capabilities over HTTP using the SSE protocol.
+type SSEServer struct {
+	server          MCPServer
+	messageEndpoint string
+	eventQueue      chan string
+
+	keepAlive         bool
+	keepAliveInterval time.Duration
 }
 
-// NewSSEProxy creates a new proxy with the given backend and endpoint handler
-func NewSSEProxy(
-	backend string,
-	headers map[string]string,
-	store SessionManager,
-	endpoint EndpointProvider,
-) *SSEAProxy {
-	return &SSEAProxy{
-		store:    store,
-		endpoint: endpoint,
-		backend:  backend,
-		headers:  headers,
+// SSEOption defines a function type for configuring SSEServer
+type SSEOption func(*SSEServer)
+
+// WithMessageEndpoint sets the message endpoint path
+func WithMessageEndpoint(endpoint string) SSEOption {
+	return func(s *SSEServer) {
+		s.messageEndpoint = endpoint
 	}
 }
 
-func (p *SSEAProxy) SSEHandler(w http.ResponseWriter, r *http.Request) {
-	SSEHandler(w, r, p.store, p.endpoint, p.backend, p.headers)
+func WithKeepAliveInterval(keepAliveInterval time.Duration) SSEOption {
+	return func(s *SSEServer) {
+		s.keepAlive = true
+		s.keepAliveInterval = keepAliveInterval
+	}
 }
 
-func SSEHandler(
-	w http.ResponseWriter,
-	r *http.Request,
-	store SessionManager,
-	endpoint EndpointProvider,
-	backend string,
-	headers map[string]string,
-) {
-	// Create a request to the backend SSE endpoint
-	req, err := http.NewRequestWithContext(r.Context(), http.MethodGet, backend, nil)
-	if err != nil {
-		http.Error(w, "Failed to create backend request", http.StatusInternalServerError)
-		return
+func WithKeepAlive(keepAlive bool) SSEOption {
+	return func(s *SSEServer) {
+		s.keepAlive = keepAlive
 	}
+}
 
-	// Copy headers from original request
-	for name, value := range headers {
-		req.Header.Set(name, value)
+// NewSSEServer creates a new SSE server instance with the given MCP server and options.
+func NewSSEServer(server MCPServer, opts ...SSEOption) *SSEServer {
+	s := &SSEServer{
+		server:            server,
+		messageEndpoint:   "/message",
+		keepAlive:         false,
+		keepAliveInterval: 30 * time.Second,
+		eventQueue:        make(chan string, 100),
 	}
 
-	// Set necessary headers for SSE
-	req.Header.Set("Accept", "text/event-stream")
-	req.Header.Set("Cache-Control", "no-cache")
-	req.Header.Set("Connection", "keep-alive")
+	// Apply all options
+	for _, opt := range opts {
+		opt(s)
+	}
+
+	return s
+}
 
-	// Make the request to the backend
-	//nolint:bodyclose
-	resp, err := http.DefaultClient.Do(req)
-	if err != nil {
-		http.Error(w, "Failed to connect to backend", http.StatusInternalServerError)
+// handleSSE handles incoming SSE connection requests.
+// It sets up appropriate headers and creates a new session for the client.
+func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != http.MethodGet {
+		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 		return
 	}
-	defer resp.Body.Close()
 
-	// Set SSE headers for the client response
 	w.Header().Set("Content-Type", "text/event-stream")
 	w.Header().Set("Cache-Control", "no-cache")
 	w.Header().Set("Connection", "keep-alive")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	// Create a context that cancels when the client disconnects
-	ctx, cancel := context.WithCancel(r.Context())
-	defer cancel()
-
-	// Monitor client disconnection
-	go func() {
-		<-ctx.Done()
-		resp.Body.Close()
-	}()
-
-	// Parse the SSE stream and extract sessionID
-	reader := bufio.NewReader(resp.Body)
 	flusher, ok := w.(http.Flusher)
 	if !ok {
-		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
+		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
 		return
 	}
 
-	for {
-		line, err := reader.ReadString('\n')
-		if err != nil {
-			if err == io.EOF {
-				break
-			}
-			return
-		}
-
-		// Write the line to the client
-		fmt.Fprint(w, line)
-		flusher.Flush()
-
-		// Check if this is an endpoint event with sessionID
-		if strings.HasPrefix(line, "event: endpoint") {
-			// Next line should contain the data
-			dataLine, err := reader.ReadString('\n')
-			if err != nil {
-				return
+	// Start keep alive : ping
+	if s.keepAlive {
+		go func() {
+			ticker := time.NewTicker(s.keepAliveInterval)
+			defer ticker.Stop()
+			id := 0
+			for {
+				id++
+				select {
+				case <-ticker.C:
+					message := mcp.JSONRPCRequest{
+						JSONRPC: "2.0",
+						ID:      mcp.NewRequestId(id),
+						Request: mcp.Request{
+							Method: "ping",
+						},
+					}
+					messageBytes, _ := sonic.Marshal(message)
+					pingMsg := fmt.Sprintf("event: message\ndata:%s\n\n", messageBytes)
+					select {
+					case s.eventQueue <- pingMsg:
+					case <-r.Context().Done():
+						return
+					}
+				case <-r.Context().Done():
+					return
+				}
 			}
+		}()
+	}
 
-			newSession := store.New()
-			newEndpoint := endpoint.NewEndpoint(newSession)
-			defer func() {
-				store.Delete(newSession)
-			}()
-
-			// Extract sessionID from data line
-			// Example: data: /message?sessionId=3088a771-7961-44e8-9bdf-21953889f694
-			if strings.HasPrefix(dataLine, "data: ") {
-				endpoint := strings.TrimSpace(strings.TrimPrefix(dataLine, "data: "))
-				copyURL := *req.URL
-				backendHostURL := &copyURL
-				backendHostURL.Path = ""
-				backendHostURL.RawQuery = ""
-				store.Set(newSession, backendHostURL.String()+endpoint)
-			} else {
-				break
-			}
+	// Send the initial endpoint event
+	fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", s.messageEndpoint)
+	flusher.Flush()
 
-			// Write the data line to the client
-			_, _ = fmt.Fprintf(w, "data: %s\n", newEndpoint)
+	// Main event loop - this runs in the HTTP handler goroutine
+	for {
+		select {
+		case event := <-s.eventQueue:
+			// Write the event to the response
+			fmt.Fprint(w, event)
 			flusher.Flush()
+		case <-r.Context().Done():
+			return
 		}
 	}
 }
 
-func (p *SSEAProxy) ProxyHandler(w http.ResponseWriter, r *http.Request) {
-	SSEProxyHandler(w, r, p.store, p.endpoint)
-}
-
-func SSEProxyHandler(
-	w http.ResponseWriter,
-	r *http.Request,
-	store SessionManager,
-	endpoint EndpointProvider,
-) {
-	// Extract sessionID from the request
-	sessionID := endpoint.LoadEndpoint(r.URL.String())
-	if sessionID == "" {
-		http.Error(w, "Missing sessionId", http.StatusBadRequest)
-		return
-	}
-
-	// Look up the backend endpoint
-	backendEndpoint, ok := store.Get(sessionID)
-	if !ok {
-		http.Error(w, "Invalid or expired sessionId", http.StatusNotFound)
-		return
-	}
-
-	u, err := url.Parse(backendEndpoint)
-	if err != nil || (u.Scheme != "http" && u.Scheme != "https") {
-		http.Error(w, "Invalid backend", http.StatusBadRequest)
-		return
+// handleMessage processes incoming JSON-RPC messages from clients and sends responses
+// back through both the SSE connection and HTTP response.
+func (s *SSEServer) HandleMessage(ctx context.Context, req []byte) error {
+	// Parse message as raw JSON
+	var rawMessage json.RawMessage
+	if err := sonic.Unmarshal(req, &rawMessage); err != nil {
+		return err
 	}
 
-	// Create a request to the backend
-	req, err := http.NewRequestWithContext(r.Context(), r.Method, backendEndpoint, r.Body)
-	if err != nil {
-		http.Error(w, "Failed to create backend request", http.StatusInternalServerError)
-		return
-	}
+	// Process message through MCPServer
+	response := s.server.HandleMessage(ctx, rawMessage)
 
-	// Copy headers from original request
-	for name, values := range r.Header {
-		for _, value := range values {
-			req.Header.Add(name, value)
+	// Only send response if there is one (not for notifications)
+	if response != nil {
+		var message string
+		eventData, err := sonic.Marshal(response)
+		if err != nil {
+			message = "event: message\ndata: {\"error\": \"internal error\",\"jsonrpc\": \"2.0\", \"id\": null}\n\n"
+		} else {
+			message = fmt.Sprintf("event: message\ndata: %s\n\n", eventData)
 		}
-	}
 
-	// Make the request to the backend
-	client := &http.Client{
-		Timeout: time.Second * 30,
-	}
-	resp, err := client.Do(req)
-	if err != nil {
-		http.Error(w, "Failed to connect to backend", http.StatusInternalServerError)
-		return
-	}
-	defer resp.Body.Close()
-
-	// Copy response headers
-	for name, values := range resp.Header {
-		for _, value := range values {
-			w.Header().Add(name, value)
+		// Queue the event for sending via SSE
+		select {
+		case s.eventQueue <- message:
+			// Event queued successfully
+		default:
+			// Queue is full
+			return errors.New("event queue is full")
 		}
 	}
 
-	// Set response status code
-	w.WriteHeader(resp.StatusCode)
-
-	// Copy response body
-	_, _ = io.Copy(w, resp.Body)
+	return nil
 }

+ 0 - 114
core/common/mcpproxy/sse_test.go

@@ -1,114 +0,0 @@
-package mcpproxy_test
-
-import (
-	"fmt"
-	"net/http"
-	"net/http/httptest"
-	"strings"
-	"testing"
-	"time"
-
-	"github.com/labring/aiproxy/core/common/mcpproxy"
-)
-
-type TestSessionManager struct {
-	m map[string]string
-}
-
-func (t *TestSessionManager) New() string {
-	return "test-session-id"
-}
-
-// Set stores a sessionID and its corresponding backend endpoint
-func (t *TestSessionManager) Set(sessionID, endpoint string) {
-	t.m[sessionID] = endpoint
-}
-
-// Get retrieves the backend endpoint for a sessionID
-func (t *TestSessionManager) Get(sessionID string) (string, bool) {
-	v, ok := t.m[sessionID]
-	return v, ok
-}
-
-// Delete removes a sessionID from the store
-func (t *TestSessionManager) Delete(string) {
-}
-
-type TestEndpointHandler struct{}
-
-func (h *TestEndpointHandler) NewEndpoint(_ string) string {
-	return "/message?sessionId=test-session-id"
-}
-
-func (h *TestEndpointHandler) LoadEndpoint(endpoint string) string {
-	if strings.Contains(endpoint, "test-session-id") {
-		return "test-session-id"
-	}
-	return ""
-}
-
-func TestProxySSEEndpoint(t *testing.T) {
-	reqDone := make(chan struct{})
-	// Setup a mock backend server
-	backendServer := httptest.NewServer(
-		http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
-			w.Header().Set("Content-Type", "text/event-stream")
-			w.Header().Set("Cache-Control", "no-cache")
-			w.Header().Set("Connection", "keep-alive")
-
-			flusher, ok := w.(http.Flusher)
-			if !ok {
-				t.Fatal("Expected ResponseWriter to be a Flusher")
-			}
-
-			// Send an endpoint event
-			fmt.Fprintf(w, "event: endpoint\n")
-			fmt.Fprintf(w, "data: /message?sessionId=original-session-id\n\n")
-			flusher.Flush()
-
-			close(reqDone)
-		}),
-	)
-	defer backendServer.Close()
-
-	// Create the proxy
-	store := &TestSessionManager{
-		m: map[string]string{},
-	}
-	handler := &TestEndpointHandler{}
-	proxy := mcpproxy.NewSSEProxy(backendServer.URL+"/sse", nil, store, handler)
-
-	// Setup the proxy server
-	proxyServer := httptest.NewServer(http.HandlerFunc(proxy.SSEHandler))
-	defer proxyServer.Close()
-
-	// Make a request to the proxy
-	req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, proxyServer.URL, nil)
-	if err != nil {
-		t.Fatalf("Error making request to proxy: %v", err)
-	}
-	resp, err := http.DefaultClient.Do(req)
-	if err != nil {
-		t.Fatalf("Error making request to proxy: %v", err)
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode != http.StatusOK {
-		t.Errorf("Expected status code %d, got %d", http.StatusOK, resp.StatusCode)
-	}
-
-	select {
-	case <-time.NewTimer(time.Second).C:
-		t.Error("timeout")
-		return
-	case <-reqDone:
-	}
-
-	// Verify the session was stored
-	endpoint, ok := store.Get("test-session-id")
-	if !ok {
-		t.Error("Session was not stored")
-	}
-	if !strings.Contains(endpoint, "/message?sessionId=original-session-id") {
-		t.Errorf("Endpoint does not contain expected path, got: %s", endpoint)
-	}
-}

+ 12 - 17
core/common/mcpproxy/streamable.go

@@ -238,11 +238,15 @@ func (p *StreamableProxy) handlePostRequest(w http.ResponseWriter, r *http.Reque
 	// Add our proxy session ID
 	w.Header().Set("Mcp-Session-Id", proxySessionID)
 
+	contentType := resp.Header.Get("Content-Type")
+
+	w.Header().Set("Content-Type", contentType)
+
 	// Set response status code
 	w.WriteHeader(resp.StatusCode)
 
 	// Check if the response is an SSE stream
-	if strings.Contains(resp.Header.Get("Content-Type"), "text/event-stream") {
+	if strings.Contains(contentType, "text/event-stream") {
 		// Handle SSE response
 		reader := bufio.NewReader(resp.Body)
 		flusher, ok := w.(http.Flusher)
@@ -349,6 +353,9 @@ func (p *StreamableProxy) handleDeleteRequest(w http.ResponseWriter, r *http.Req
 		}
 	}
 
+	contentType := resp.Header.Get("Content-Type")
+	w.Header().Set("Content-Type", contentType)
+
 	// Set response status code
 	w.WriteHeader(resp.StatusCode)
 
@@ -365,13 +372,6 @@ func (p *StreamableProxy) proxyInitialOrNoSessionRequest(w http.ResponseWriter,
 		return
 	}
 
-	// Copy headers from original request
-	for name, values := range r.Header {
-		for _, value := range values {
-			req.Header.Add(name, value)
-		}
-	}
-
 	// Add any additional headers
 	for name, value := range p.headers {
 		req.Header.Set(name, value)
@@ -405,20 +405,15 @@ func (p *StreamableProxy) proxyInitialOrNoSessionRequest(w http.ResponseWriter,
 		w.Header().Set("Mcp-Session-Id", proxySessionID)
 	}
 
-	// Copy other response headers
-	for name, values := range resp.Header {
-		if name != "Mcp-Session-Id" { // Skip the original session ID
-			for _, value := range values {
-				w.Header().Add(name, value)
-			}
-		}
-	}
+	contentType := resp.Header.Get("Content-Type")
+
+	w.Header().Set("Content-Type", contentType)
 
 	// Set response status code
 	w.WriteHeader(resp.StatusCode)
 
 	// Check if the response is an SSE stream
-	if strings.Contains(resp.Header.Get("Content-Type"), "text/event-stream") {
+	if strings.Contains(contentType, "text/event-stream") {
 		// Handle SSE response
 		reader := bufio.NewReader(resp.Body)
 		flusher, ok := w.(http.Flusher)

+ 0 - 168
core/common/stateless-mcp/sse.go

@@ -1,168 +0,0 @@
-package statelessmcp
-
-import (
-	"context"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"net/http"
-	"time"
-
-	"github.com/bytedance/sonic"
-	"github.com/mark3labs/mcp-go/mcp"
-	"github.com/mark3labs/mcp-go/server"
-)
-
-// SSEServer implements a Server-Sent Events (SSE) based MCP server.
-// It provides real-time communication capabilities over HTTP using the SSE protocol.
-type SSEServer struct {
-	server          *server.MCPServer
-	messageEndpoint string
-	eventQueue      chan string
-
-	keepAlive         bool
-	keepAliveInterval time.Duration
-}
-
-// SSEOption defines a function type for configuring SSEServer
-type SSEOption func(*SSEServer)
-
-// WithMessageEndpoint sets the message endpoint path
-func WithMessageEndpoint(endpoint string) SSEOption {
-	return func(s *SSEServer) {
-		s.messageEndpoint = endpoint
-	}
-}
-
-func WithKeepAliveInterval(keepAliveInterval time.Duration) SSEOption {
-	return func(s *SSEServer) {
-		s.keepAlive = true
-		s.keepAliveInterval = keepAliveInterval
-	}
-}
-
-func WithKeepAlive(keepAlive bool) SSEOption {
-	return func(s *SSEServer) {
-		s.keepAlive = keepAlive
-	}
-}
-
-// NewSSEServer creates a new SSE server instance with the given MCP server and options.
-func NewSSEServer(server *server.MCPServer, opts ...SSEOption) *SSEServer {
-	s := &SSEServer{
-		server:            server,
-		messageEndpoint:   "/message",
-		keepAlive:         false,
-		keepAliveInterval: 30 * time.Second,
-		eventQueue:        make(chan string, 100),
-	}
-
-	// Apply all options
-	for _, opt := range opts {
-		opt(s)
-	}
-
-	return s
-}
-
-// handleSSE handles incoming SSE connection requests.
-// It sets up appropriate headers and creates a new session for the client.
-func (s *SSEServer) HandleSSE(w http.ResponseWriter, r *http.Request) {
-	if r.Method != http.MethodGet {
-		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
-		return
-	}
-
-	w.Header().Set("Content-Type", "text/event-stream")
-	w.Header().Set("Cache-Control", "no-cache")
-	w.Header().Set("Connection", "keep-alive")
-	w.Header().Set("Access-Control-Allow-Origin", "*")
-
-	flusher, ok := w.(http.Flusher)
-	if !ok {
-		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
-		return
-	}
-
-	// Start keep alive : ping
-	if s.keepAlive {
-		go func() {
-			ticker := time.NewTicker(s.keepAliveInterval)
-			defer ticker.Stop()
-			id := 0
-			for {
-				id++
-				select {
-				case <-ticker.C:
-					message := mcp.JSONRPCRequest{
-						JSONRPC: "2.0",
-						ID:      mcp.NewRequestId(id),
-						Request: mcp.Request{
-							Method: "ping",
-						},
-					}
-					messageBytes, _ := sonic.Marshal(message)
-					pingMsg := fmt.Sprintf("event: message\ndata:%s\n\n", messageBytes)
-					select {
-					case s.eventQueue <- pingMsg:
-					case <-r.Context().Done():
-						return
-					}
-				case <-r.Context().Done():
-					return
-				}
-			}
-		}()
-	}
-
-	// Send the initial endpoint event
-	fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", s.messageEndpoint)
-	flusher.Flush()
-
-	// Main event loop - this runs in the HTTP handler goroutine
-	for {
-		select {
-		case event := <-s.eventQueue:
-			// Write the event to the response
-			fmt.Fprint(w, event)
-			flusher.Flush()
-		case <-r.Context().Done():
-			return
-		}
-	}
-}
-
-// handleMessage processes incoming JSON-RPC messages from clients and sends responses
-// back through both the SSE connection and HTTP response.
-func (s *SSEServer) HandleMessage(ctx context.Context, req []byte) error {
-	// Parse message as raw JSON
-	var rawMessage json.RawMessage
-	if err := sonic.Unmarshal(req, &rawMessage); err != nil {
-		return err
-	}
-
-	// Process message through MCPServer
-	response := s.server.HandleMessage(ctx, rawMessage)
-
-	// Only send response if there is one (not for notifications)
-	if response != nil {
-		var message string
-		eventData, err := sonic.Marshal(response)
-		if err != nil {
-			message = "event: message\ndata: {\"error\": \"internal error\",\"jsonrpc\": \"2.0\", \"id\": null}\n\n"
-		} else {
-			message = fmt.Sprintf("event: message\ndata: %s\n\n", eventData)
-		}
-
-		// Queue the event for sending via SSE
-		select {
-		case s.eventQueue <- message:
-			// Event queued successfully
-		default:
-			// Queue is full
-			return errors.New("event queue is full")
-		}
-	}
-
-	return nil
-}

+ 12 - 27
core/controller/embedmcp.go

@@ -11,7 +11,6 @@ import (
 
 	"github.com/gin-gonic/gin"
 	"github.com/labring/aiproxy/core/common/mcpproxy"
-	statelessmcp "github.com/labring/aiproxy/core/common/stateless-mcp"
 	"github.com/labring/aiproxy/core/middleware"
 	"github.com/labring/aiproxy/core/model"
 	mcpservers "github.com/labring/aiproxy/mcp-servers"
@@ -208,7 +207,7 @@ type testEmbedMcpEndpointProvider struct {
 	key string
 }
 
-func newTestEmbedMcpEndpoint(key string) mcpproxy.EndpointProvider {
+func newTestEmbedMcpEndpoint(key string) EndpointProvider {
 	return &testEmbedMcpEndpointProvider{
 		key: key,
 	}
@@ -267,29 +266,22 @@ func getConfigFromQuery(c *gin.Context) (map[string]string, map[string]string) {
 //	@Tags			embedmcp
 //	@Security		ApiKeyAuth
 //	@Param			id				path		string	true	"MCP ID"
-//	@Param			config[key]		query		string	false	"Initial configuration parameters (e.g.,
-//
-// config[host]=http://localhost:3000)" 	@Param			reusing[key]	query		string	false	"Reusing
-// configuration parameters (e.g., reusing[authorization]=apikey)"
-//
+//	@Param			config[key]		query		string	false	"Initial configuration parameters (e.g. config[host]=http://localhost:3000)"
+//	@Param			reusing[key]	query		string	false	"Reusing configuration parameters (e.g. reusing[authorization]=apikey)"
 //	@Success		200				{object}	nil
 //	@Failure		400				{object}	nil
 //	@Router			/api/test-embedmcp/{id}/sse [get]
 func TestEmbedMCPSseServer(c *gin.Context) {
 	id := c.Param("id")
 	if id == "" {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			"mcp id is required",
-		))
+		http.Error(c.Writer, "mcp id is required", http.StatusBadRequest)
 		return
 	}
 
 	initConfig, reusingConfig := getConfigFromQuery(c)
 	emcp, err := mcpservers.GetMCPServer(id, initConfig, reusingConfig)
 	if err != nil {
-		middleware.ErrorResponse(c, http.StatusBadRequest, err.Error())
+		http.Error(c.Writer, err.Error(), http.StatusBadRequest)
 		return
 	}
 
@@ -308,9 +300,9 @@ func handleTestEmbedMCPServer(c *gin.Context, s *server.MCPServer) {
 	newSession := store.New()
 
 	newEndpoint := newTestEmbedMcpEndpoint(token.Key).NewEndpoint(newSession)
-	server := statelessmcp.NewSSEServer(
+	server := mcpproxy.NewSSEServer(
 		s,
-		statelessmcp.WithMessageEndpoint(newEndpoint),
+		mcpproxy.WithMessageEndpoint(newEndpoint),
 	)
 
 	store.Set(newSession, testEmbedMcpType)
@@ -325,7 +317,7 @@ func handleTestEmbedMCPServer(c *gin.Context, s *server.MCPServer) {
 	go processMCPSseMpscMessages(ctx, newSession, server)
 
 	// Handle SSE connection
-	server.HandleSSE(c.Writer, c.Request)
+	server.ServeHTTP(c.Writer, c.Request)
 }
 
 // TestEmbedMCPMessage godoc
@@ -343,11 +335,7 @@ func handleTestEmbedMCPServer(c *gin.Context, s *server.MCPServer) {
 func TestEmbedMCPMessage(c *gin.Context) {
 	sessionID, _ := c.GetQuery("sessionId")
 	if sessionID == "" {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			"missing sessionId",
-		))
+		http.Error(c.Writer, "missing sessionId", http.StatusBadRequest)
 		return
 	}
 
@@ -361,11 +349,8 @@ func TestEmbedMCPMessage(c *gin.Context) {
 //	@Tags			embedmcp
 //	@Security		ApiKeyAuth
 //	@Param			id				path	string	true	"MCP ID"
-//	@Param			config[key]		query	string	false	"Initial configuration parameters (e.g.,
-//
-// config[host]=http://localhost:3000)" 	@Param			reusing[key]	query	string	false	"Reusing
-// configuration parameters (e.g., reusing[authorization]=apikey)"
-//
+//	@Param			config[key]		query	string	false	"Initial configuration parameters (e.g. config[host]=http://localhost:3000)"
+//	@Param			reusing[key]	query	string	false	"Reusing configuration parameters (e.g., reusing[authorization]=apikey)"
 //	@Accept			json
 //	@Produce		json
 //	@Success		200	{object}	nil
@@ -394,5 +379,5 @@ func TestEmbedMCPStreamable(c *gin.Context) {
 		))
 		return
 	}
-	handleGroupStreamableMCPServer(c, server)
+	handleStreamableMCPServer(c, server)
 }

+ 52 - 117
core/controller/groupmcp-server.go

@@ -2,19 +2,16 @@ package controller
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"net/http"
 	"net/url"
 
-	"github.com/bytedance/sonic"
 	"github.com/gin-gonic/gin"
 	"github.com/labring/aiproxy/core/common/mcpproxy"
-	statelessmcp "github.com/labring/aiproxy/core/common/stateless-mcp"
 	"github.com/labring/aiproxy/core/middleware"
 	"github.com/labring/aiproxy/core/model"
+	"github.com/mark3labs/mcp-go/client/transport"
 	"github.com/mark3labs/mcp-go/mcp"
-	"github.com/mark3labs/mcp-go/server"
 )
 
 type groupMcpEndpointProvider struct {
@@ -22,7 +19,7 @@ type groupMcpEndpointProvider struct {
 	t   model.GroupMCPType
 }
 
-func newGroupMcpEndpoint(key string, t model.GroupMCPType) mcpproxy.EndpointProvider {
+func newGroupMcpEndpoint(key string, t model.GroupMCPType) EndpointProvider {
 	return &groupMcpEndpointProvider{
 		key: key,
 		t:   t,
@@ -50,11 +47,7 @@ func (m *groupMcpEndpointProvider) LoadEndpoint(endpoint string) (session string
 func GroupMCPSseServer(c *gin.Context) {
 	id := c.Param("id")
 	if id == "" {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			"mcp id is required",
-		))
+		http.Error(c.Writer, "mcp id is required", http.StatusBadRequest)
 		return
 	}
 
@@ -62,85 +55,61 @@ func GroupMCPSseServer(c *gin.Context) {
 
 	groupMcp, err := model.CacheGetGroupMCP(group.ID, id)
 	if err != nil {
-		c.JSON(http.StatusNotFound, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			err.Error(),
-		))
+		http.Error(c.Writer, err.Error(), http.StatusNotFound)
 		return
 	}
 	if groupMcp.Status != model.GroupMCPStatusEnabled {
-		c.JSON(http.StatusNotFound, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			"mcp is not enabled",
-		))
+		http.Error(c.Writer, "mcp is not enabled", http.StatusNotFound)
 		return
 	}
 
 	switch groupMcp.Type {
 	case model.GroupMCPTypeProxySSE:
-		handleGroupProxySSE(c, groupMcp.ProxyConfig)
+		client, err := transport.NewSSE(
+			groupMcp.ProxyConfig.URL,
+			transport.WithHeaders(groupMcp.ProxyConfig.Headers),
+		)
+		if err != nil {
+			http.Error(c.Writer, err.Error(), http.StatusBadRequest)
+			return
+		}
+		err = client.Start(c.Request.Context())
+		if err != nil {
+			http.Error(c.Writer, err.Error(), http.StatusBadRequest)
+			return
+		}
+		defer client.Close()
+		handleGroupMCPServer(c, wrapMCPClient2Server(client), model.GroupMCPTypeProxySSE)
+	case model.GroupMCPTypeProxyStreamable:
+		client, err := transport.NewStreamableHTTP(
+			groupMcp.ProxyConfig.URL,
+			transport.WithHTTPHeaders(groupMcp.ProxyConfig.Headers),
+		)
+		if err != nil {
+			http.Error(c.Writer, err.Error(), http.StatusBadRequest)
+			return
+		}
+		err = client.Start(c.Request.Context())
+		if err != nil {
+			http.Error(c.Writer, err.Error(), http.StatusBadRequest)
+			return
+		}
+		defer client.Close()
+		handleGroupMCPServer(c, wrapMCPClient2Server(client), model.GroupMCPTypeProxyStreamable)
 	case model.GroupMCPTypeOpenAPI:
 		server, err := newOpenAPIMCPServer(groupMcp.OpenAPIConfig)
 		if err != nil {
-			c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-				mcp.NewRequestId(nil),
-				mcp.INVALID_REQUEST,
-				err.Error(),
-			))
+			http.Error(c.Writer, err.Error(), http.StatusBadRequest)
 			return
 		}
 		handleGroupMCPServer(c, server, model.GroupMCPTypeOpenAPI)
 	default:
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			"unsupported mcp type",
-		))
+		http.Error(c.Writer, "unsupported mcp type", http.StatusBadRequest)
 	}
 }
 
-// handlePublicProxySSE processes SSE proxy requests
-func handleGroupProxySSE(c *gin.Context, config *model.GroupMCPProxyConfig) {
-	if config == nil || config.URL == "" {
-		return
-	}
-
-	backendURL, err := url.Parse(config.URL)
-	if err != nil {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			err.Error(),
-		))
-		return
-	}
-
-	headers := make(map[string]string)
-	backendQuery := &url.Values{}
-	token := middleware.GetToken(c)
-
-	for k, v := range config.Headers {
-		headers[k] = v
-	}
-	for k, v := range config.Querys {
-		backendQuery.Set(k, v)
-	}
-
-	backendURL.RawQuery = backendQuery.Encode()
-	mcpproxy.SSEHandler(
-		c.Writer,
-		c.Request,
-		getStore(),
-		newGroupMcpEndpoint(token.Key, model.GroupMCPTypeProxySSE),
-		backendURL.String(),
-		headers,
-	)
-}
-
 // handleMCPServer handles the SSE connection for an MCP server
-func handleGroupMCPServer(c *gin.Context, s *server.MCPServer, mcpType model.GroupMCPType) {
+func handleGroupMCPServer(c *gin.Context, s mcpproxy.MCPServer, mcpType model.GroupMCPType) {
 	token := middleware.GetToken(c)
 
 	// Store the session
@@ -148,9 +117,9 @@ func handleGroupMCPServer(c *gin.Context, s *server.MCPServer, mcpType model.Gro
 	newSession := store.New()
 
 	newEndpoint := newGroupMcpEndpoint(token.Key, mcpType).NewEndpoint(newSession)
-	server := statelessmcp.NewSSEServer(
+	server := mcpproxy.NewSSEServer(
 		s,
-		statelessmcp.WithMessageEndpoint(newEndpoint),
+		mcpproxy.WithMessageEndpoint(newEndpoint),
 	)
 
 	store.Set(newSession, string(mcpType))
@@ -165,7 +134,7 @@ func handleGroupMCPServer(c *gin.Context, s *server.MCPServer, mcpType model.Gro
 	go processMCPSseMpscMessages(ctx, newSession, server)
 
 	// Handle SSE connection
-	server.HandleSSE(c.Writer, c.Request)
+	server.ServeHTTP(c.Writer, c.Request)
 }
 
 // GroupMCPMessage godoc
@@ -174,39 +143,28 @@ func handleGroupMCPServer(c *gin.Context, s *server.MCPServer, mcpType model.Gro
 //	@Security	ApiKeyAuth
 //	@Router		/mcp/group/message [post]
 func GroupMCPMessage(c *gin.Context) {
-	token := middleware.GetToken(c)
-
 	mcpTypeStr, _ := c.GetQuery("type")
 	if mcpTypeStr == "" {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			"missing mcp type",
-		))
+		http.Error(c.Writer, "missing mcp type", http.StatusBadRequest)
 		return
 	}
 	mcpType := model.GroupMCPType(mcpTypeStr)
 
 	sessionID, _ := c.GetQuery("sessionId")
 	if sessionID == "" {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			"missing sessionId",
-		))
+		http.Error(c.Writer, "missing sessionId", http.StatusBadRequest)
 		return
 	}
 
 	switch mcpType {
 	case model.GroupMCPTypeProxySSE:
-		mcpproxy.SSEProxyHandler(
-			c.Writer,
-			c.Request,
-			getStore(),
-			newGroupMcpEndpoint(token.Key, mcpType),
-		)
-	default:
 		sendMCPSSEMessage(c, mcpTypeStr, sessionID)
+	case model.GroupMCPTypeProxyStreamable:
+		sendMCPSSEMessage(c, mcpTypeStr, sessionID)
+	case model.GroupMCPTypeOpenAPI:
+		sendMCPSSEMessage(c, mcpTypeStr, sessionID)
+	default:
+		http.Error(c.Writer, "unknown mcp type", http.StatusBadRequest)
 	}
 }
 
@@ -261,7 +219,7 @@ func GroupMCPStreamable(c *gin.Context) {
 			))
 			return
 		}
-		handleGroupStreamableMCPServer(c, server)
+		handleStreamableMCPServer(c, server)
 	default:
 		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
@@ -288,7 +246,7 @@ func handleGroupProxyStreamable(c *gin.Context, config *model.GroupMCPProxyConfi
 	}
 
 	headers := make(map[string]string)
-	backendQuery := &url.Values{}
+	backendQuery := backendURL.Query()
 
 	for k, v := range config.Headers {
 		headers[k] = v
@@ -301,26 +259,3 @@ func handleGroupProxyStreamable(c *gin.Context, config *model.GroupMCPProxyConfi
 	mcpproxy.NewStreamableProxy(backendURL.String(), headers, getStore()).
 		ServeHTTP(c.Writer, c.Request)
 }
-
-// handleGroupStreamableMCPServer handles the streamable connection for a group MCP server
-func handleGroupStreamableMCPServer(c *gin.Context, s *server.MCPServer) {
-	if c.Request.Method != http.MethodPost {
-		c.JSON(http.StatusMethodNotAllowed, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.METHOD_NOT_FOUND,
-			"method not allowed",
-		))
-		return
-	}
-	var rawMessage json.RawMessage
-	if err := sonic.ConfigDefault.NewDecoder(c.Request.Body).Decode(&rawMessage); err != nil {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.PARSE_ERROR,
-			err.Error(),
-		))
-		return
-	}
-	respMessage := s.HandleMessage(c.Request.Context(), rawMessage)
-	c.JSON(http.StatusOK, respMessage)
-}

+ 159 - 124
core/controller/publicmcp-server.go

@@ -16,23 +16,28 @@ import (
 	"github.com/gin-gonic/gin"
 	"github.com/labring/aiproxy/core/common"
 	"github.com/labring/aiproxy/core/common/mcpproxy"
-	statelessmcp "github.com/labring/aiproxy/core/common/stateless-mcp"
 	"github.com/labring/aiproxy/core/middleware"
 	"github.com/labring/aiproxy/core/model"
 	mcpservers "github.com/labring/aiproxy/mcp-servers"
 	"github.com/labring/aiproxy/openapi-mcp/convert"
+	"github.com/mark3labs/mcp-go/client/transport"
 	"github.com/mark3labs/mcp-go/mcp"
 	"github.com/mark3labs/mcp-go/server"
 	"github.com/redis/go-redis/v9"
 )
 
+type EndpointProvider interface {
+	NewEndpoint(newSession string) (newEndpoint string)
+	LoadEndpoint(endpoint string) (session string)
+}
+
 // publicMcpEndpointProvider implements the EndpointProvider interface for MCP
 type publicMcpEndpointProvider struct {
 	key string
 	t   model.PublicMCPType
 }
 
-func newPublicMcpEndpoint(key string, t model.PublicMCPType) mcpproxy.EndpointProvider {
+func newPublicMcpEndpoint(key string, t model.PublicMCPType) EndpointProvider {
 	return &publicMcpEndpointProvider{
 		key: key,
 		t:   t,
@@ -117,6 +122,92 @@ func (r *redisStoreManager) Delete(session string) {
 	r.rdb.Del(ctx, "mcp:session:"+session)
 }
 
+type mcpClient2Server struct {
+	client transport.Interface
+}
+
+type JSONRPCNoErrorResponse struct {
+	JSONRPC string          `json:"jsonrpc"`
+	ID      mcp.RequestId   `json:"id"`
+	Result  json.RawMessage `json:"result"`
+}
+
+func handleError(err error) mcp.JSONRPCMessage {
+	return mcp.JSONRPCError{
+		JSONRPC: mcp.JSONRPC_VERSION,
+		ID:      mcp.NewRequestId(nil),
+		Error: struct {
+			Code    int    `json:"code"`
+			Message string `json:"message"`
+			Data    any    `json:"data,omitempty"`
+		}{
+			Code:    mcp.INTERNAL_ERROR,
+			Message: err.Error(),
+		},
+	}
+}
+
+func (s *mcpClient2Server) HandleMessage(
+	ctx context.Context,
+	message json.RawMessage,
+) mcp.JSONRPCMessage {
+	methodNode, err := sonic.Get(message, "method")
+	if err != nil {
+		return handleError(err)
+	}
+	method, err := methodNode.String()
+	if err != nil {
+		return handleError(err)
+	}
+
+	switch method {
+	case "notifications/initialized":
+		req := mcp.JSONRPCNotification{}
+		err := sonic.Unmarshal(message, &req)
+		if err != nil {
+			return handleError(err)
+		}
+		err = s.client.SendNotification(ctx, req)
+		if err != nil {
+			return handleError(err)
+		}
+		return nil
+	default:
+		req := transport.JSONRPCRequest{}
+		err := sonic.Unmarshal(message, &req)
+		if err != nil {
+			return handleError(err)
+		}
+		resp, err := s.client.SendRequest(ctx, req)
+		if err != nil {
+			return mcp.JSONRPCError{
+				JSONRPC: mcp.JSONRPC_VERSION,
+				ID:      mcp.NewRequestId(nil),
+				Error: struct {
+					Code    int    `json:"code"`
+					Message string `json:"message"`
+					Data    any    `json:"data,omitempty"`
+				}{
+					Code:    mcp.INTERNAL_ERROR,
+					Message: err.Error(),
+				},
+			}
+		}
+		if resp.Error != nil {
+			return resp
+		}
+		return &JSONRPCNoErrorResponse{
+			JSONRPC: resp.JSONRPC,
+			ID:      resp.ID,
+			Result:  resp.Result,
+		}
+	}
+}
+
+func wrapMCPClient2Server(client transport.Interface) mcpproxy.MCPServer {
+	return &mcpClient2Server{client: client}
+}
+
 // PublicMCPSseServer godoc
 //
 //	@Summary	Public MCP SSE Server
@@ -124,28 +215,54 @@ func (r *redisStoreManager) Delete(session string) {
 //	@Router		/mcp/public/{id}/sse [get]
 func PublicMCPSseServer(c *gin.Context) {
 	mcpID := c.Param("id")
+	if mcpID == "" {
+		http.Error(c.Writer, "mcp id is required", http.StatusBadRequest)
+		return
+	}
 
 	publicMcp, err := model.CacheGetPublicMCP(mcpID)
 	if err != nil {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			err.Error(),
-		))
+		http.Error(c.Writer, err.Error(), http.StatusBadRequest)
 		return
 	}
 	if publicMcp.Status != model.PublicMCPStatusEnabled {
-		c.JSON(http.StatusNotFound, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			"mcp is not enabled",
-		))
+		http.Error(c.Writer, "mcp is not enabled", http.StatusBadRequest)
 		return
 	}
 
 	switch publicMcp.Type {
 	case model.PublicMCPTypeProxySSE:
-		handlePublicProxySSE(c, publicMcp.ID, publicMcp.ProxyConfig)
+		client, err := transport.NewSSE(
+			publicMcp.ProxyConfig.URL,
+			transport.WithHeaders(publicMcp.ProxyConfig.Headers),
+		)
+		if err != nil {
+			http.Error(c.Writer, err.Error(), http.StatusBadRequest)
+			return
+		}
+		err = client.Start(c.Request.Context())
+		if err != nil {
+			http.Error(c.Writer, err.Error(), http.StatusBadRequest)
+			return
+		}
+		defer client.Close()
+		handleSSEMCPServer(c, wrapMCPClient2Server(client), model.PublicMCPTypeProxySSE)
+	case model.PublicMCPTypeProxyStreamable:
+		client, err := transport.NewStreamableHTTP(
+			publicMcp.ProxyConfig.URL,
+			transport.WithHTTPHeaders(publicMcp.ProxyConfig.Headers),
+		)
+		if err != nil {
+			http.Error(c.Writer, err.Error(), http.StatusBadRequest)
+			return
+		}
+		err = client.Start(c.Request.Context())
+		if err != nil {
+			http.Error(c.Writer, err.Error(), http.StatusBadRequest)
+			return
+		}
+		defer client.Close()
+		handleSSEMCPServer(c, wrapMCPClient2Server(client), model.PublicMCPTypeProxyStreamable)
 	case model.PublicMCPTypeOpenAPI:
 		server, err := newOpenAPIMCPServer(publicMcp.OpenAPIConfig)
 		if err != nil {
@@ -196,55 +313,6 @@ func handlePublicEmbedMCP(c *gin.Context, mcpID string, config *model.MCPEmbeddi
 	handleSSEMCPServer(c, server, model.PublicMCPTypeEmbed)
 }
 
-// handlePublicProxySSE processes SSE proxy requests
-func handlePublicProxySSE(c *gin.Context, mcpID string, config *model.PublicMCPProxyConfig) {
-	if config == nil || config.URL == "" {
-		return
-	}
-
-	backendURL, err := url.Parse(config.URL)
-	if err != nil {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			err.Error(),
-		))
-		return
-	}
-
-	headers := make(map[string]string)
-	backendQuery := &url.Values{}
-	group := middleware.GetGroup(c)
-	token := middleware.GetToken(c)
-
-	// Process reusing parameters if any
-	if err := processReusingParams(config.ReusingParams, mcpID, group.ID, headers, backendQuery); err != nil {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			err.Error(),
-		))
-		return
-	}
-
-	for k, v := range config.Headers {
-		headers[k] = v
-	}
-	for k, v := range config.Querys {
-		backendQuery.Set(k, v)
-	}
-
-	backendURL.RawQuery = backendQuery.Encode()
-	mcpproxy.SSEHandler(
-		c.Writer,
-		c.Request,
-		getStore(),
-		newPublicMcpEndpoint(token.Key, model.PublicMCPTypeProxySSE),
-		backendURL.String(),
-		headers,
-	)
-}
-
 // newOpenAPIMCPServer creates a new MCP server from OpenAPI configuration
 func newOpenAPIMCPServer(config *model.MCPOpenAPIConfig) (*server.MCPServer, error) {
 	if config == nil || (config.OpenAPISpec == "" && config.OpenAPIContent == "") {
@@ -281,7 +349,7 @@ func newOpenAPIMCPServer(config *model.MCPOpenAPIConfig) (*server.MCPServer, err
 }
 
 // handleSSEMCPServer handles the SSE connection for an MCP server
-func handleSSEMCPServer(c *gin.Context, s *server.MCPServer, mcpType model.PublicMCPType) {
+func handleSSEMCPServer(c *gin.Context, s mcpproxy.MCPServer, mcpType model.PublicMCPType) {
 	token := middleware.GetToken(c)
 
 	// Store the session
@@ -289,9 +357,9 @@ func handleSSEMCPServer(c *gin.Context, s *server.MCPServer, mcpType model.Publi
 	newSession := store.New()
 
 	newEndpoint := newPublicMcpEndpoint(token.Key, mcpType).NewEndpoint(newSession)
-	server := statelessmcp.NewSSEServer(
+	server := mcpproxy.NewSSEServer(
 		s,
-		statelessmcp.WithMessageEndpoint(newEndpoint),
+		mcpproxy.WithMessageEndpoint(newEndpoint),
 	)
 
 	store.Set(newSession, string(mcpType))
@@ -306,7 +374,7 @@ func handleSSEMCPServer(c *gin.Context, s *server.MCPServer, mcpType model.Publi
 	go processMCPSseMpscMessages(ctx, newSession, server)
 
 	// Handle SSE connection
-	server.HandleSSE(c.Writer, c.Request)
+	server.ServeHTTP(c.Writer, c.Request)
 }
 
 // parseOpenAPIFromURL parses OpenAPI spec from a URL
@@ -338,7 +406,7 @@ func parseOpenAPIFromContent(config *model.MCPOpenAPIConfig, parser *convert.Par
 func processMCPSseMpscMessages(
 	ctx context.Context,
 	sessionID string,
-	server *statelessmcp.SSEServer,
+	server *mcpproxy.SSEServer,
 ) {
 	mpscInstance := getMCPMpsc()
 	for {
@@ -401,75 +469,47 @@ func processReusingParams(
 //	@Security	ApiKeyAuth
 //	@Router		/mcp/public/message [post]
 func PublicMCPMessage(c *gin.Context) {
-	token := middleware.GetToken(c)
 	mcpTypeStr, _ := c.GetQuery("type")
 	if mcpTypeStr == "" {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			"missing mcp type",
-		))
+		http.Error(c.Writer, "missing mcp type", http.StatusBadRequest)
 		return
 	}
 	mcpType := model.PublicMCPType(mcpTypeStr)
 	sessionID, _ := c.GetQuery("sessionId")
 	if sessionID == "" {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			"missing sessionId",
-		))
+		http.Error(c.Writer, "missing sessionId", http.StatusBadRequest)
 		return
 	}
 
 	switch mcpType {
 	case model.PublicMCPTypeProxySSE:
-		mcpproxy.SSEProxyHandler(
-			c.Writer,
-			c.Request,
-			getStore(),
-			newPublicMcpEndpoint(token.Key, mcpType),
-		)
+		sendMCPSSEMessage(c, mcpTypeStr, sessionID)
+	case model.PublicMCPTypeProxyStreamable:
+		sendMCPSSEMessage(c, mcpTypeStr, sessionID)
 	case model.PublicMCPTypeOpenAPI:
 		sendMCPSSEMessage(c, mcpTypeStr, sessionID)
 	case model.PublicMCPTypeEmbed:
 		sendMCPSSEMessage(c, mcpTypeStr, sessionID)
 	default:
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			"unknown mcp type",
-		))
+		http.Error(c.Writer, "unknown mcp type", http.StatusBadRequest)
 	}
 }
 
 func sendMCPSSEMessage(c *gin.Context, mcpType, sessionID string) {
 	backend, ok := getStore().Get(sessionID)
 	if !ok || backend != mcpType {
-		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INVALID_REQUEST,
-			"invalid session",
-		))
+		http.Error(c.Writer, "invalid session", http.StatusBadRequest)
 		return
 	}
 	mpscInstance := getMCPMpsc()
 	body, err := io.ReadAll(c.Request.Body)
 	if err != nil {
-		c.JSON(http.StatusInternalServerError, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INTERNAL_ERROR,
-			err.Error(),
-		))
+		http.Error(c.Writer, err.Error(), http.StatusInternalServerError)
 		return
 	}
 	err = mpscInstance.send(c.Request.Context(), sessionID, body)
 	if err != nil {
-		c.JSON(http.StatusInternalServerError, CreateMCPErrorResponse(
-			mcp.NewRequestId(nil),
-			mcp.INTERNAL_ERROR,
-			err.Error(),
-		))
+		http.Error(c.Writer, err.Error(), http.StatusInternalServerError)
 		return
 	}
 	c.Writer.WriteHeader(http.StatusAccepted)
@@ -578,12 +618,11 @@ func handlePublicProxyStreamable(c *gin.Context, mcpID string, config *model.Pub
 	}
 
 	headers := make(map[string]string)
-	backendQuery := &url.Values{}
+	backendQuery := backendURL.Query()
 	group := middleware.GetGroup(c)
-	token := middleware.GetToken(c)
 
 	// Process reusing parameters if any
-	if err := processReusingParams(config.ReusingParams, mcpID, group.ID, headers, backendQuery); err != nil {
+	if err := processReusingParams(config.ReusingParams, mcpID, group.ID, headers, &backendQuery); err != nil {
 		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.INVALID_REQUEST,
@@ -600,15 +639,6 @@ func handlePublicProxyStreamable(c *gin.Context, mcpID string, config *model.Pub
 	}
 
 	backendURL.RawQuery = backendQuery.Encode()
-	mcpproxy.SSEHandler(
-		c.Writer,
-		c.Request,
-		getStore(),
-		newPublicMcpEndpoint(token.Key, model.PublicMCPTypeProxySSE),
-		backendURL.String(),
-		headers,
-	)
-
 	mcpproxy.NewStreamableProxy(backendURL.String(), headers, getStore()).
 		ServeHTTP(c.Writer, c.Request)
 }
@@ -623,8 +653,8 @@ func handleStreamableMCPServer(c *gin.Context, s *server.MCPServer) {
 		))
 		return
 	}
-	var rawMessage json.RawMessage
-	if err := sonic.ConfigDefault.NewDecoder(c.Request.Body).Decode(&rawMessage); err != nil {
+	reqBody, err := io.ReadAll(c.Request.Body)
+	if err != nil {
 		c.JSON(http.StatusBadRequest, CreateMCPErrorResponse(
 			mcp.NewRequestId(nil),
 			mcp.PARSE_ERROR,
@@ -632,7 +662,12 @@ func handleStreamableMCPServer(c *gin.Context, s *server.MCPServer) {
 		))
 		return
 	}
-	respMessage := s.HandleMessage(c.Request.Context(), rawMessage)
+	respMessage := s.HandleMessage(c.Request.Context(), reqBody)
+	if respMessage == nil {
+		// For notifications, just send 202 Accepted with no body
+		c.Status(http.StatusAccepted)
+		return
+	}
 	c.JSON(http.StatusOK, respMessage)
 }
 
@@ -680,16 +715,16 @@ func newChannelMCPMpsc() *channelMCPMpsc {
 }
 
 // cleanupExpiredChannels periodically checks for and removes channels that haven't been accessed in
-// 5 minutes
+// 15 seconds
 func (c *channelMCPMpsc) cleanupExpiredChannels() {
-	ticker := time.NewTicker(1 * time.Minute)
+	ticker := time.NewTicker(15 * time.Second)
 	defer ticker.Stop()
 
 	for range ticker.C {
 		c.channelMutex.Lock()
 		now := time.Now()
 		for id, lastAccess := range c.lastAccess {
-			if now.Sub(lastAccess) > 5*time.Minute {
+			if now.Sub(lastAccess) > 15*time.Second {
 				// Close and delete the channel
 				if ch, exists := c.channels[id]; exists {
 					close(ch)
@@ -765,11 +800,11 @@ func newRedisMCPMPSC(rdb *redis.Client) *redisMCPMPSC {
 }
 
 func (r *redisMCPMPSC) send(ctx context.Context, id string, data []byte) error {
-	// Set expiration to 5 minutes when sending data
+	// Set expiration to 15 seconds when sending data
 	id = "mcp:mpsc:" + id
 	pipe := r.rdb.Pipeline()
 	pipe.LPush(ctx, id, data)
-	pipe.Expire(ctx, id, 5*time.Minute)
+	pipe.Expire(ctx, id, 15*time.Second)
 	_, err := pipe.Exec(ctx)
 	return err
 }

+ 1 - 2
core/controller/relay-controller.go

@@ -398,7 +398,7 @@ func getPriority(channel *model.Channel, errorRate float64) int32 {
 }
 
 //
-//nolint:gosec
+
 func getRandomChannel(
 	channels []*model.Channel,
 	errorRates map[int64]float64,
@@ -957,7 +957,6 @@ func shouldDelay(statusCode, lastChannelID, newChannelID int) bool {
 }
 
 func relayDelay() {
-	//nolint:gosec
 	time.Sleep(time.Duration(rand.Float64()*float64(time.Second)) + time.Second)
 }
 

+ 6 - 6
core/docs/docs.go

@@ -5601,13 +5601,13 @@ const docTemplate = `{
                     },
                     {
                         "type": "string",
-                        "description": "Initial configuration parameters (e.g., config[host]=http://localhost:3000)",
+                        "description": "Initial configuration parameters (e.g. config[host]=http://localhost:3000)",
                         "name": "config[key]",
                         "in": "query"
                     },
                     {
                         "type": "string",
-                        "description": "Reusing configuration parameters (e.g., reusing[authorization]=apikey)",
+                        "description": "Reusing configuration parameters (e.g. reusing[authorization]=apikey)",
                         "name": "reusing[key]",
                         "in": "query"
                     }
@@ -5650,7 +5650,7 @@ const docTemplate = `{
                     },
                     {
                         "type": "string",
-                        "description": "Initial configuration parameters (e.g., config[host]=http://localhost:3000)",
+                        "description": "Initial configuration parameters (e.g. config[host]=http://localhost:3000)",
                         "name": "config[key]",
                         "in": "query"
                     },
@@ -5697,7 +5697,7 @@ const docTemplate = `{
                     },
                     {
                         "type": "string",
-                        "description": "Initial configuration parameters (e.g., config[host]=http://localhost:3000)",
+                        "description": "Initial configuration parameters (e.g. config[host]=http://localhost:3000)",
                         "name": "config[key]",
                         "in": "query"
                     },
@@ -5744,7 +5744,7 @@ const docTemplate = `{
                     },
                     {
                         "type": "string",
-                        "description": "Initial configuration parameters (e.g., config[host]=http://localhost:3000)",
+                        "description": "Initial configuration parameters (e.g. config[host]=http://localhost:3000)",
                         "name": "config[key]",
                         "in": "query"
                     },
@@ -9979,7 +9979,7 @@ const docTemplate = `{
                     "type": "number"
                 },
                 "thinking_mode_output_price": {
-                    "description": "when ThinkingModeOutputPrice and ReasoningTokens are not 0, OutputPrice and OutputPriceUnit will be overwritten",
+                    "description": "when ThinkingModeOutputPrice and ReasoningTokens are not 0, OutputPrice and OutputPriceUnit\nwill be overwritten",
                     "type": "number"
                 },
                 "thinking_mode_output_price_unit": {

+ 6 - 6
core/docs/swagger.json

@@ -5592,13 +5592,13 @@
                     },
                     {
                         "type": "string",
-                        "description": "Initial configuration parameters (e.g., config[host]=http://localhost:3000)",
+                        "description": "Initial configuration parameters (e.g. config[host]=http://localhost:3000)",
                         "name": "config[key]",
                         "in": "query"
                     },
                     {
                         "type": "string",
-                        "description": "Reusing configuration parameters (e.g., reusing[authorization]=apikey)",
+                        "description": "Reusing configuration parameters (e.g. reusing[authorization]=apikey)",
                         "name": "reusing[key]",
                         "in": "query"
                     }
@@ -5641,7 +5641,7 @@
                     },
                     {
                         "type": "string",
-                        "description": "Initial configuration parameters (e.g., config[host]=http://localhost:3000)",
+                        "description": "Initial configuration parameters (e.g. config[host]=http://localhost:3000)",
                         "name": "config[key]",
                         "in": "query"
                     },
@@ -5688,7 +5688,7 @@
                     },
                     {
                         "type": "string",
-                        "description": "Initial configuration parameters (e.g., config[host]=http://localhost:3000)",
+                        "description": "Initial configuration parameters (e.g. config[host]=http://localhost:3000)",
                         "name": "config[key]",
                         "in": "query"
                     },
@@ -5735,7 +5735,7 @@
                     },
                     {
                         "type": "string",
-                        "description": "Initial configuration parameters (e.g., config[host]=http://localhost:3000)",
+                        "description": "Initial configuration parameters (e.g. config[host]=http://localhost:3000)",
                         "name": "config[key]",
                         "in": "query"
                     },
@@ -9970,7 +9970,7 @@
                     "type": "number"
                 },
                 "thinking_mode_output_price": {
-                    "description": "when ThinkingModeOutputPrice and ReasoningTokens are not 0, OutputPrice and OutputPriceUnit will be overwritten",
+                    "description": "when ThinkingModeOutputPrice and ReasoningTokens are not 0, OutputPrice and OutputPriceUnit\nwill be overwritten",
                     "type": "number"
                 },
                 "thinking_mode_output_price_unit": {

+ 8 - 7
core/docs/swagger.yaml

@@ -1506,8 +1506,9 @@ definitions:
       per_request_price:
         type: number
       thinking_mode_output_price:
-        description: when ThinkingModeOutputPrice and ReasoningTokens are not 0, OutputPrice
-          and OutputPriceUnit will be overwritten
+        description: |-
+          when ThinkingModeOutputPrice and ReasoningTokens are not 0, OutputPrice and OutputPriceUnit
+          will be overwritten
         type: number
       thinking_mode_output_price_unit:
         type: integer
@@ -5131,11 +5132,11 @@ paths:
         name: id
         required: true
         type: string
-      - description: Initial configuration parameters (e.g., config[host]=http://localhost:3000)
+      - description: Initial configuration parameters (e.g. config[host]=http://localhost:3000)
         in: query
         name: config[key]
         type: string
-      - description: Reusing configuration parameters (e.g., reusing[authorization]=apikey)
+      - description: Reusing configuration parameters (e.g. reusing[authorization]=apikey)
         in: query
         name: reusing[key]
         type: string
@@ -5160,7 +5161,7 @@ paths:
         name: id
         required: true
         type: string
-      - description: Initial configuration parameters (e.g., config[host]=http://localhost:3000)
+      - description: Initial configuration parameters (e.g. config[host]=http://localhost:3000)
         in: query
         name: config[key]
         type: string
@@ -5190,7 +5191,7 @@ paths:
         name: id
         required: true
         type: string
-      - description: Initial configuration parameters (e.g., config[host]=http://localhost:3000)
+      - description: Initial configuration parameters (e.g. config[host]=http://localhost:3000)
         in: query
         name: config[key]
         type: string
@@ -5220,7 +5221,7 @@ paths:
         name: id
         required: true
         type: string
-      - description: Initial configuration parameters (e.g., config[host]=http://localhost:3000)
+      - description: Initial configuration parameters (e.g. config[host]=http://localhost:3000)
         in: query
         name: config[key]
         type: string

+ 4 - 0
core/middleware/auth.go

@@ -56,6 +56,10 @@ func AdminAuth(c *gin.Context) {
 		return
 	}
 
+	c.Set(Token, &model.TokenCache{
+		Key: config.AdminKey,
+	})
+
 	group := c.Param("group")
 	if group != "" {
 		log := GetLogger(c)

+ 0 - 5
core/model/cache.go

@@ -148,7 +148,6 @@ func CacheDeleteToken(key string) error {
 	return common.RedisDel(fmt.Sprintf(TokenCacheKey, key))
 }
 
-//nolint:gosec
 func CacheSetToken(token *TokenCache) error {
 	if !common.RedisEnabled {
 		return nil
@@ -355,7 +354,6 @@ func CacheUpdateGroupStatus(id string, status int) error {
 		Err()
 }
 
-//nolint:gosec
 func CacheSetGroup(group *GroupCache) error {
 	if !common.RedisEnabled {
 		return nil
@@ -453,7 +451,6 @@ func CacheDeleteGroupMCP(groupID, mcpID string) error {
 	return common.RedisDel(fmt.Sprintf(GroupMCPCacheKey, groupID, mcpID))
 }
 
-//nolint:gosec
 func CacheSetGroupMCP(groupMCP *GroupMCPCache) error {
 	if !common.RedisEnabled {
 		return nil
@@ -547,7 +544,6 @@ func CacheDeletePublicMCP(mcpID string) error {
 	return common.RedisDel(fmt.Sprintf(PublicMCPCacheKey, mcpID))
 }
 
-//nolint:gosec
 func CacheSetPublicMCP(publicMCP *PublicMCPCache) error {
 	if !common.RedisEnabled {
 		return nil
@@ -633,7 +629,6 @@ func CacheDeletePublicMCPReusingParam(mcpID, groupID string) error {
 	return common.RedisDel(fmt.Sprintf(PublicMCPReusingParamCacheKey, mcpID, groupID))
 }
 
-//nolint:gosec
 func CacheSetPublicMCPReusingParam(param *PublicMCPReusingParamCache) error {
 	if !common.RedisEnabled {
 		return nil

+ 0 - 1
core/model/token.go

@@ -51,7 +51,6 @@ const (
 	keyChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
 )
 
-//nolint:gosec
 func generateKey() string {
 	key := make([]byte, 48)
 	for i := range key {

+ 3 - 2
mcp-servers/.golangci.yml

@@ -104,6 +104,9 @@ linters:
       constant-kind: true
     usetesting:
       os-temp-dir: true
+    gosec:
+      excludes:
+        - G404
 
 formatters:
   enable:
@@ -126,5 +129,3 @@ formatters:
           replacement: "a[b:]"
     gofumpt:
       extra-rules: true
-    golines:
-      shorten-comments: true

+ 3 - 2
openapi-mcp/.golangci.yml

@@ -104,6 +104,9 @@ linters:
       constant-kind: true
     usetesting:
       os-temp-dir: true
+    gosec:
+      excludes:
+        - G404
 
 formatters:
   enable:
@@ -126,5 +129,3 @@ formatters:
           replacement: "a[b:]"
     gofumpt:
       extra-rules: true
-    golines:
-      shorten-comments: true