hub.go 8.4 KB


  1. package splithttp
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "io"
  6. gonet "net"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/xtls/xray-core/common"
  13. "github.com/xtls/xray-core/common/errors"
  14. "github.com/xtls/xray-core/common/net"
  15. http_proto "github.com/xtls/xray-core/common/protocol/http"
  16. "github.com/xtls/xray-core/common/signal/done"
  17. "github.com/xtls/xray-core/transport/internet"
  18. "github.com/xtls/xray-core/transport/internet/stat"
  19. v2tls "github.com/xtls/xray-core/transport/internet/tls"
  20. "golang.org/x/net/http2"
  21. "golang.org/x/net/http2/h2c"
  22. )
  23. type requestHandler struct {
  24. host string
  25. path string
  26. ln *Listener
  27. sessionMu *sync.Mutex
  28. sessions sync.Map
  29. localAddr gonet.TCPAddr
  30. }
  31. type httpSession struct {
  32. uploadQueue *uploadQueue
  33. // for as long as the GET request is not opened by the client, this will be
  34. // open ("undone"), and the session may be expired within a certain TTL.
  35. // after the client connects, this becomes "done" and the session lives as
  36. // long as the GET request.
  37. isFullyConnected *done.Instance
  38. }
  39. func (h *requestHandler) maybeReapSession(isFullyConnected *done.Instance, sessionId string) {
  40. shouldReap := done.New()
  41. go func() {
  42. time.Sleep(30 * time.Second)
  43. shouldReap.Close()
  44. }()
  45. select {
  46. case <-isFullyConnected.Wait():
  47. return
  48. case <-shouldReap.Wait():
  49. h.sessions.Delete(sessionId)
  50. }
  51. }
  52. func (h *requestHandler) upsertSession(sessionId string) *httpSession {
  53. // fast path
  54. currentSessionAny, ok := h.sessions.Load(sessionId)
  55. if ok {
  56. return currentSessionAny.(*httpSession)
  57. }
  58. // slow path
  59. h.sessionMu.Lock()
  60. defer h.sessionMu.Unlock()
  61. currentSessionAny, ok = h.sessions.Load(sessionId)
  62. if ok {
  63. return currentSessionAny.(*httpSession)
  64. }
  65. s := &httpSession{
  66. uploadQueue: NewUploadQueue(int(2 * h.ln.config.GetNormalizedMaxConcurrentUploads())),
  67. isFullyConnected: done.New(),
  68. }
  69. h.sessions.Store(sessionId, s)
  70. go h.maybeReapSession(s.isFullyConnected, sessionId)
  71. return s
  72. }
  73. func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
  74. if len(h.host) > 0 && !internet.IsValidHTTPHost(request.Host, h.host) {
  75. errors.LogInfo(context.Background(), "failed to validate host, request:", request.Host, ", config:", h.host)
  76. writer.WriteHeader(http.StatusNotFound)
  77. return
  78. }
  79. if !strings.HasPrefix(request.URL.Path, h.path) {
  80. errors.LogInfo(context.Background(), "failed to validate path, request:", request.URL.Path, ", config:", h.path)
  81. writer.WriteHeader(http.StatusNotFound)
  82. return
  83. }
  84. sessionId := ""
  85. subpath := strings.Split(request.URL.Path[len(h.path):], "/")
  86. if len(subpath) > 0 {
  87. sessionId = subpath[0]
  88. }
  89. if sessionId == "" {
  90. errors.LogInfo(context.Background(), "no sessionid on request:", request.URL.Path)
  91. writer.WriteHeader(http.StatusBadRequest)
  92. return
  93. }
  94. forwardedAddrs := http_proto.ParseXForwardedFor(request.Header)
  95. remoteAddr, err := gonet.ResolveTCPAddr("tcp", request.RemoteAddr)
  96. if err != nil {
  97. remoteAddr = &gonet.TCPAddr{}
  98. }
  99. if len(forwardedAddrs) > 0 && forwardedAddrs[0].Family().IsIP() {
  100. remoteAddr = &net.TCPAddr{
  101. IP: forwardedAddrs[0].IP(),
  102. Port: int(0),
  103. }
  104. }
  105. currentSession := h.upsertSession(sessionId)
  106. if request.Method == "POST" {
  107. seq := ""
  108. if len(subpath) > 1 {
  109. seq = subpath[1]
  110. }
  111. if seq == "" {
  112. errors.LogInfo(context.Background(), "no seq on request:", request.URL.Path)
  113. writer.WriteHeader(http.StatusBadRequest)
  114. return
  115. }
  116. payload, err := io.ReadAll(request.Body)
  117. if err != nil {
  118. errors.LogInfoInner(context.Background(), err, "failed to upload")
  119. writer.WriteHeader(http.StatusInternalServerError)
  120. return
  121. }
  122. seqInt, err := strconv.ParseUint(seq, 10, 64)
  123. if err != nil {
  124. errors.LogInfoInner(context.Background(), err, "failed to upload")
  125. writer.WriteHeader(http.StatusInternalServerError)
  126. return
  127. }
  128. err = currentSession.uploadQueue.Push(Packet{
  129. Payload: payload,
  130. Seq: seqInt,
  131. })
  132. if err != nil {
  133. errors.LogInfoInner(context.Background(), err, "failed to upload")
  134. writer.WriteHeader(http.StatusInternalServerError)
  135. return
  136. }
  137. writer.WriteHeader(http.StatusOK)
  138. } else if request.Method == "GET" {
  139. responseFlusher, ok := writer.(http.Flusher)
  140. if !ok {
  141. panic("expected http.ResponseWriter to be an http.Flusher")
  142. }
  143. // after GET is done, the connection is finished. disable automatic
  144. // session reaping, and handle it in defer
  145. currentSession.isFullyConnected.Close()
  146. defer h.sessions.Delete(sessionId)
  147. // magic header instructs nginx + apache to not buffer response body
  148. writer.Header().Set("X-Accel-Buffering", "no")
  149. // magic header to make the HTTP middle box consider this as SSE to disable buffer
  150. writer.Header().Set("Content-Type", "text/event-stream")
  151. writer.WriteHeader(http.StatusOK)
  152. // send a chunk immediately to enable CDN streaming.
  153. // many CDN buffer the response headers until the origin starts sending
  154. // the body, with no way to turn it off.
  155. writer.Write([]byte("ok"))
  156. responseFlusher.Flush()
  157. downloadDone := done.New()
  158. conn := splitConn{
  159. writer: &httpResponseBodyWriter{
  160. responseWriter: writer,
  161. downloadDone: downloadDone,
  162. responseFlusher: responseFlusher,
  163. },
  164. reader: currentSession.uploadQueue,
  165. remoteAddr: remoteAddr,
  166. }
  167. h.ln.addConn(stat.Connection(&conn))
  168. // "A ResponseWriter may not be used after [Handler.ServeHTTP] has returned."
  169. <-downloadDone.Wait()
  170. } else {
  171. writer.WriteHeader(http.StatusMethodNotAllowed)
  172. }
  173. }
  174. type httpResponseBodyWriter struct {
  175. sync.Mutex
  176. responseWriter http.ResponseWriter
  177. responseFlusher http.Flusher
  178. downloadDone *done.Instance
  179. }
  180. func (c *httpResponseBodyWriter) Write(b []byte) (int, error) {
  181. c.Lock()
  182. defer c.Unlock()
  183. if c.downloadDone.Done() {
  184. return 0, io.ErrClosedPipe
  185. }
  186. n, err := c.responseWriter.Write(b)
  187. if err == nil {
  188. c.responseFlusher.Flush()
  189. }
  190. return n, err
  191. }
  192. func (c *httpResponseBodyWriter) Close() error {
  193. c.Lock()
  194. defer c.Unlock()
  195. c.downloadDone.Close()
  196. return nil
  197. }
  198. type Listener struct {
  199. sync.Mutex
  200. server http.Server
  201. listener net.Listener
  202. config *Config
  203. addConn internet.ConnHandler
  204. }
  205. func ListenSH(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) {
  206. l := &Listener{
  207. addConn: addConn,
  208. }
  209. shSettings := streamSettings.ProtocolSettings.(*Config)
  210. l.config = shSettings
  211. if l.config != nil {
  212. if streamSettings.SocketSettings == nil {
  213. streamSettings.SocketSettings = &internet.SocketConfig{}
  214. }
  215. }
  216. var listener net.Listener
  217. var err error
  218. var localAddr = gonet.TCPAddr{}
  219. if port == net.Port(0) { // unix
  220. listener, err = internet.ListenSystem(ctx, &net.UnixAddr{
  221. Name: address.Domain(),
  222. Net: "unix",
  223. }, streamSettings.SocketSettings)
  224. if err != nil {
  225. return nil, errors.New("failed to listen unix domain socket(for SH) on ", address).Base(err)
  226. }
  227. errors.LogInfo(ctx, "listening unix domain socket(for SH) on ", address)
  228. } else { // tcp
  229. localAddr = gonet.TCPAddr{
  230. IP: address.IP(),
  231. Port: int(port),
  232. }
  233. listener, err = internet.ListenSystem(ctx, &net.TCPAddr{
  234. IP: address.IP(),
  235. Port: int(port),
  236. }, streamSettings.SocketSettings)
  237. if err != nil {
  238. return nil, errors.New("failed to listen TCP(for SH) on ", address, ":", port).Base(err)
  239. }
  240. errors.LogInfo(ctx, "listening TCP(for SH) on ", address, ":", port)
  241. }
  242. if config := v2tls.ConfigFromStreamSettings(streamSettings); config != nil {
  243. if tlsConfig := config.GetTLSConfig(); tlsConfig != nil {
  244. listener = tls.NewListener(listener, tlsConfig)
  245. }
  246. }
  247. handler := &requestHandler{
  248. host: shSettings.Host,
  249. path: shSettings.GetNormalizedPath(),
  250. ln: l,
  251. sessionMu: &sync.Mutex{},
  252. sessions: sync.Map{},
  253. localAddr: localAddr,
  254. }
  255. // h2cHandler can handle both plaintext HTTP/1.1 and h2c
  256. h2cHandler := h2c.NewHandler(handler, &http2.Server{})
  257. l.listener = listener
  258. l.server = http.Server{
  259. Handler: h2cHandler,
  260. ReadHeaderTimeout: time.Second * 4,
  261. MaxHeaderBytes: 8192,
  262. }
  263. go func() {
  264. if err := l.server.Serve(l.listener); err != nil {
  265. errors.LogWarningInner(ctx, err, "failed to serve http for splithttp")
  266. }
  267. }()
  268. return l, err
  269. }
  270. // Addr implements net.Listener.Addr().
  271. func (ln *Listener) Addr() net.Addr {
  272. return ln.listener.Addr()
  273. }
  274. // Close implements net.Listener.Close().
  275. func (ln *Listener) Close() error {
  276. return ln.listener.Close()
  277. }
  278. func init() {
  279. common.Must(internet.RegisterTransportListener(protocolName, ListenSH))
  280. }