stateless-streamable.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package mcpproxy
import (
	"encoding/json"
	"fmt"
	"io"
	"mime"
	"net/http"
	"strconv"

	"github.com/bytedance/sonic"
	mcpservers "github.com/labring/aiproxy/mcp-servers"
	"github.com/mark3labs/mcp-go/mcp"
)
  12. type StreamableHTTPOption func(*StreamableHTTPServer)
  13. type StreamableHTTPServer struct {
  14. server mcpservers.Server
  15. }
  16. // NewStatelessStreamableHTTPServer creates a new streamable-http server instance
  17. func NewStatelessStreamableHTTPServer(
  18. server mcpservers.Server,
  19. opts ...StreamableHTTPOption,
  20. ) *StreamableHTTPServer {
  21. s := &StreamableHTTPServer{
  22. server: server,
  23. }
  24. for _, opt := range opts {
  25. opt(s)
  26. }
  27. return s
  28. }
  29. // ServeHTTP implements the http.Handler interface.
  30. func (s *StreamableHTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  31. switch r.Method {
  32. case http.MethodPost:
  33. s.handlePost(w, r)
  34. case http.MethodGet:
  35. s.handleGet(w, r)
  36. case http.MethodDelete:
  37. s.handleDelete(w, r)
  38. default:
  39. http.NotFound(w, r)
  40. }
  41. }
  42. func (s *StreamableHTTPServer) handlePost(w http.ResponseWriter, r *http.Request) {
  43. // post request carry request/notification message
  44. // Check content type
  45. contentType := r.Header.Get("Content-Type")
  46. if contentType != "application/json" {
  47. http.Error(w, "Invalid content type: must be 'application/json'", http.StatusBadRequest)
  48. return
  49. }
  50. // Check the request body is valid json, meanwhile, get the request Method
  51. rawData, err := io.ReadAll(r.Body)
  52. if err != nil {
  53. s.writeJSONRPCError(
  54. w,
  55. nil,
  56. mcp.PARSE_ERROR,
  57. fmt.Sprintf("read request body error: %v", err),
  58. )
  59. return
  60. }
  61. var baseMessage struct {
  62. Method mcp.MCPMethod `json:"method"`
  63. }
  64. if err := json.Unmarshal(rawData, &baseMessage); err != nil {
  65. s.writeJSONRPCError(w, nil, mcp.PARSE_ERROR, "request body is not valid json")
  66. return
  67. }
  68. // Process message through MCPServer
  69. response := s.server.HandleMessage(r.Context(), rawData)
  70. if response == nil {
  71. // For notifications, just send 202 Accepted with no body
  72. w.WriteHeader(http.StatusAccepted)
  73. return
  74. }
  75. w.Header().Set("Content-Type", "application/json")
  76. w.WriteHeader(http.StatusOK)
  77. _ = sonic.ConfigDefault.NewEncoder(w).Encode(response)
  78. }
  79. func (s *StreamableHTTPServer) handleGet(w http.ResponseWriter, _ *http.Request) {
  80. http.Error(w, "get request is not supported", http.StatusMethodNotAllowed)
  81. }
  82. func (s *StreamableHTTPServer) handleDelete(w http.ResponseWriter, _ *http.Request) {
  83. http.Error(w, "delete request is not supported", http.StatusMethodNotAllowed)
  84. }
  85. func (s *StreamableHTTPServer) writeJSONRPCError(
  86. w http.ResponseWriter,
  87. id any,
  88. code int,
  89. message string,
  90. ) {
  91. response := mcpservers.CreateMCPErrorResponse(id, code, message)
  92. jsonBody, err := sonic.Marshal(response)
  93. if err != nil {
  94. http.Error(w, err.Error(), http.StatusInternalServerError)
  95. return
  96. }
  97. w.Header().Set("Content-Type", "application/json")
  98. w.Header().Set("Content-Length", strconv.Itoa(len(jsonBody)))
  99. w.WriteHeader(http.StatusBadRequest)
  100. _, _ = w.Write(jsonBody)
  101. }