hub.go 8.2 KB


  1. package splithttp
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "io"
  6. gonet "net"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/xtls/xray-core/common"
  13. "github.com/xtls/xray-core/common/errors"
  14. "github.com/xtls/xray-core/common/net"
  15. http_proto "github.com/xtls/xray-core/common/protocol/http"
  16. "github.com/xtls/xray-core/common/signal/done"
  17. "github.com/xtls/xray-core/transport/internet"
  18. "github.com/xtls/xray-core/transport/internet/stat"
  19. v2tls "github.com/xtls/xray-core/transport/internet/tls"
  20. "golang.org/x/net/http2"
  21. "golang.org/x/net/http2/h2c"
  22. )
  23. type requestHandler struct {
  24. host string
  25. path string
  26. ln *Listener
  27. sessions sync.Map
  28. localAddr gonet.TCPAddr
  29. }
  30. type httpSession struct {
  31. uploadQueue *UploadQueue
  32. // for as long as the GET request is not opened by the client, this will be
  33. // open ("undone"), and the session may be expired within a certain TTL.
  34. // after the client connects, this becomes "done" and the session lives as
  35. // long as the GET request.
  36. isFullyConnected *done.Instance
  37. }
  38. func (h *requestHandler) maybeReapSession(isFullyConnected *done.Instance, sessionId string) {
  39. shouldReap := done.New()
  40. go func() {
  41. time.Sleep(30 * time.Second)
  42. shouldReap.Close()
  43. }()
  44. select {
  45. case <-isFullyConnected.Wait():
  46. return
  47. case <-shouldReap.Wait():
  48. h.sessions.Delete(sessionId)
  49. }
  50. }
  51. func (h *requestHandler) upsertSession(sessionId string) *httpSession {
  52. currentSessionAny, ok := h.sessions.Load(sessionId)
  53. if ok {
  54. return currentSessionAny.(*httpSession)
  55. }
  56. s := &httpSession{
  57. uploadQueue: NewUploadQueue(int(2 * h.ln.config.GetNormalizedMaxConcurrentUploads())),
  58. isFullyConnected: done.New(),
  59. }
  60. h.sessions.Store(sessionId, s)
  61. go h.maybeReapSession(s.isFullyConnected, sessionId)
  62. return s
  63. }
  64. func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
  65. if len(h.host) > 0 && request.Host != h.host {
  66. errors.LogInfo(context.Background(), "failed to validate host, request:", request.Host, ", config:", h.host)
  67. writer.WriteHeader(http.StatusNotFound)
  68. return
  69. }
  70. if !strings.HasPrefix(request.URL.Path, h.path) {
  71. errors.LogInfo(context.Background(), "failed to validate path, request:", request.URL.Path, ", config:", h.path)
  72. writer.WriteHeader(http.StatusNotFound)
  73. return
  74. }
  75. sessionId := ""
  76. subpath := strings.Split(request.URL.Path[len(h.path):], "/")
  77. if len(subpath) > 0 {
  78. sessionId = subpath[0]
  79. }
  80. if sessionId == "" {
  81. errors.LogInfo(context.Background(), "no sessionid on request:", request.URL.Path)
  82. writer.WriteHeader(http.StatusBadRequest)
  83. return
  84. }
  85. forwardedAddrs := http_proto.ParseXForwardedFor(request.Header)
  86. remoteAddr, err := gonet.ResolveTCPAddr("tcp", request.RemoteAddr)
  87. if err != nil {
  88. remoteAddr = &gonet.TCPAddr{}
  89. }
  90. if len(forwardedAddrs) > 0 && forwardedAddrs[0].Family().IsIP() {
  91. remoteAddr = &net.TCPAddr{
  92. IP: forwardedAddrs[0].IP(),
  93. Port: int(0),
  94. }
  95. }
  96. currentSession := h.upsertSession(sessionId)
  97. if request.Method == "POST" {
  98. seq := ""
  99. if len(subpath) > 1 {
  100. seq = subpath[1]
  101. }
  102. if seq == "" {
  103. errors.LogInfo(context.Background(), "no seq on request:", request.URL.Path)
  104. writer.WriteHeader(http.StatusBadRequest)
  105. return
  106. }
  107. payload, err := io.ReadAll(request.Body)
  108. if err != nil {
  109. errors.LogInfoInner(context.Background(), err, "failed to upload")
  110. writer.WriteHeader(http.StatusInternalServerError)
  111. return
  112. }
  113. seqInt, err := strconv.ParseUint(seq, 10, 64)
  114. if err != nil {
  115. errors.LogInfoInner(context.Background(), err, "failed to upload")
  116. writer.WriteHeader(http.StatusInternalServerError)
  117. return
  118. }
  119. err = currentSession.uploadQueue.Push(Packet{
  120. Payload: payload,
  121. Seq: seqInt,
  122. })
  123. if err != nil {
  124. errors.LogInfoInner(context.Background(), err, "failed to upload")
  125. writer.WriteHeader(http.StatusInternalServerError)
  126. return
  127. }
  128. writer.WriteHeader(http.StatusOK)
  129. } else if request.Method == "GET" {
  130. responseFlusher, ok := writer.(http.Flusher)
  131. if !ok {
  132. panic("expected http.ResponseWriter to be an http.Flusher")
  133. }
  134. // after GET is done, the connection is finished. disable automatic
  135. // session reaping, and handle it in defer
  136. currentSession.isFullyConnected.Close()
  137. defer h.sessions.Delete(sessionId)
  138. // magic header instructs nginx + apache to not buffer response body
  139. writer.Header().Set("X-Accel-Buffering", "no")
  140. // magic header to make the HTTP middle box consider this as SSE to disable buffer
  141. writer.Header().Set("Content-Type", "text/event-stream")
  142. writer.WriteHeader(http.StatusOK)
  143. // send a chunk immediately to enable CDN streaming.
  144. // many CDN buffer the response headers until the origin starts sending
  145. // the body, with no way to turn it off.
  146. writer.Write([]byte("ok"))
  147. responseFlusher.Flush()
  148. downloadDone := done.New()
  149. conn := splitConn{
  150. writer: &httpResponseBodyWriter{
  151. responseWriter: writer,
  152. downloadDone: downloadDone,
  153. responseFlusher: responseFlusher,
  154. },
  155. reader: currentSession.uploadQueue,
  156. remoteAddr: remoteAddr,
  157. }
  158. h.ln.addConn(stat.Connection(&conn))
  159. // "A ResponseWriter may not be used after [Handler.ServeHTTP] has returned."
  160. <-downloadDone.Wait()
  161. } else {
  162. writer.WriteHeader(http.StatusMethodNotAllowed)
  163. }
  164. }
  165. type httpResponseBodyWriter struct {
  166. sync.Mutex
  167. responseWriter http.ResponseWriter
  168. responseFlusher http.Flusher
  169. downloadDone *done.Instance
  170. }
  171. func (c *httpResponseBodyWriter) Write(b []byte) (int, error) {
  172. c.Lock()
  173. defer c.Unlock()
  174. if c.downloadDone.Done() {
  175. return 0, io.ErrClosedPipe
  176. }
  177. n, err := c.responseWriter.Write(b)
  178. if err == nil {
  179. c.responseFlusher.Flush()
  180. }
  181. return n, err
  182. }
  183. func (c *httpResponseBodyWriter) Close() error {
  184. c.Lock()
  185. defer c.Unlock()
  186. c.downloadDone.Close()
  187. return nil
  188. }
  189. type Listener struct {
  190. sync.Mutex
  191. server http.Server
  192. listener net.Listener
  193. config *Config
  194. addConn internet.ConnHandler
  195. }
  196. func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) {
  197. l := &Listener{
  198. addConn: addConn,
  199. }
  200. shSettings := streamSettings.ProtocolSettings.(*Config)
  201. l.config = shSettings
  202. if l.config != nil {
  203. if streamSettings.SocketSettings == nil {
  204. streamSettings.SocketSettings = &internet.SocketConfig{}
  205. }
  206. }
  207. var listener net.Listener
  208. var err error
  209. var localAddr = gonet.TCPAddr{}
  210. if port == net.Port(0) { // unix
  211. listener, err = internet.ListenSystem(ctx, &net.UnixAddr{
  212. Name: address.Domain(),
  213. Net: "unix",
  214. }, streamSettings.SocketSettings)
  215. if err != nil {
  216. return nil, errors.New("failed to listen unix domain socket(for SH) on ", address).Base(err)
  217. }
  218. errors.LogInfo(ctx, "listening unix domain socket(for SH) on ", address)
  219. } else { // tcp
  220. localAddr = gonet.TCPAddr{
  221. IP: address.IP(),
  222. Port: int(port),
  223. }
  224. listener, err = internet.ListenSystem(ctx, &net.TCPAddr{
  225. IP: address.IP(),
  226. Port: int(port),
  227. }, streamSettings.SocketSettings)
  228. if err != nil {
  229. return nil, errors.New("failed to listen TCP(for SH) on ", address, ":", port).Base(err)
  230. }
  231. errors.LogInfo(ctx, "listening TCP(for SH) on ", address, ":", port)
  232. }
  233. if config := v2tls.ConfigFromStreamSettings(streamSettings); config != nil {
  234. if tlsConfig := config.GetTLSConfig(); tlsConfig != nil {
  235. listener = tls.NewListener(listener, tlsConfig)
  236. }
  237. }
  238. handler := &requestHandler{
  239. host: shSettings.Host,
  240. path: shSettings.GetNormalizedPath(),
  241. ln: l,
  242. sessions: sync.Map{},
  243. localAddr: localAddr,
  244. }
  245. // h2cHandler can handle both plaintext HTTP/1.1 and h2c
  246. h2cHandler := h2c.NewHandler(handler, &http2.Server{})
  247. l.listener = listener
  248. l.server = http.Server{
  249. Handler: h2cHandler,
  250. ReadHeaderTimeout: time.Second * 4,
  251. MaxHeaderBytes: 8192,
  252. }
  253. go func() {
  254. if err := l.server.Serve(l.listener); err != nil {
  255. errors.LogWarningInner(ctx, err, "failed to serve http for splithttp")
  256. }
  257. }()
  258. return l, err
  259. }
  260. // Addr implements net.Listener.Addr().
  261. func (ln *Listener) Addr() net.Addr {
  262. return ln.listener.Addr()
  263. }
  264. // Close implements net.Listener.Close().
  265. func (ln *Listener) Close() error {
  266. return ln.listener.Close()
  267. }
  268. func init() {
  269. common.Must(internet.RegisterTransportListener(protocolName, ListenSH))
  270. }