client.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package splithttp
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. gonet "net"
  8. "net/http"
  9. "net/http/httptrace"
  10. "sync"
  11. "github.com/xtls/xray-core/common"
  12. "github.com/xtls/xray-core/common/errors"
  13. "github.com/xtls/xray-core/common/net"
  14. "github.com/xtls/xray-core/common/signal/done"
  15. )
  16. // interface to abstract between use of browser dialer, vs net/http
  17. type DialerClient interface {
  18. // (ctx, baseURL, payload) -> err
  19. // baseURL already contains sessionId and seq
  20. SendUploadRequest(context.Context, string, io.ReadWriteCloser, int64) error
  21. // (ctx, baseURL) -> (downloadReader, remoteAddr, localAddr)
  22. // baseURL already contains sessionId
  23. OpenDownload(context.Context, string) (io.ReadCloser, net.Addr, net.Addr, error)
  24. // (ctx, baseURL) -> uploadWriter
  25. // baseURL already contains sessionId
  26. OpenUpload(context.Context, string) io.WriteCloser
  27. }
  28. // implements splithttp.DialerClient in terms of direct network connections
  29. type DefaultDialerClient struct {
  30. transportConfig *Config
  31. client *http.Client
  32. isH2 bool
  33. isH3 bool
  34. // pool of net.Conn, created using dialUploadConn
  35. uploadRawPool *sync.Pool
  36. dialUploadConn func(ctxInner context.Context) (net.Conn, error)
  37. }
  38. func (c *DefaultDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
  39. reader, writer := io.Pipe()
  40. req, _ := http.NewRequestWithContext(ctx, "POST", baseURL, reader)
  41. req.Header = c.transportConfig.GetRequestHeader()
  42. if !c.transportConfig.NoGRPCHeader {
  43. req.Header.Set("Content-Type", "application/grpc")
  44. }
  45. go c.client.Do(req)
  46. return writer
  47. }
  48. func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
  49. var remoteAddr gonet.Addr
  50. var localAddr gonet.Addr
  51. // this is done when the TCP/UDP connection to the server was established,
  52. // and we can unblock the Dial function and print correct net addresses in
  53. // logs
  54. gotConn := done.New()
  55. var downResponse io.ReadCloser
  56. gotDownResponse := done.New()
  57. ctx, ctxCancel := context.WithCancel(ctx)
  58. go func() {
  59. trace := &httptrace.ClientTrace{
  60. GotConn: func(connInfo httptrace.GotConnInfo) {
  61. remoteAddr = connInfo.Conn.RemoteAddr()
  62. localAddr = connInfo.Conn.LocalAddr()
  63. gotConn.Close()
  64. },
  65. }
  66. // in case we hit an error, we want to unblock this part
  67. defer gotConn.Close()
  68. ctx = httptrace.WithClientTrace(ctx, trace)
  69. req, err := http.NewRequestWithContext(
  70. ctx,
  71. "GET",
  72. baseURL,
  73. nil,
  74. )
  75. if err != nil {
  76. errors.LogInfoInner(ctx, err, "failed to construct download http request")
  77. gotDownResponse.Close()
  78. return
  79. }
  80. req.Header = c.transportConfig.GetRequestHeader()
  81. response, err := c.client.Do(req)
  82. gotConn.Close()
  83. if err != nil {
  84. errors.LogInfoInner(ctx, err, "failed to send download http request")
  85. gotDownResponse.Close()
  86. return
  87. }
  88. if response.StatusCode != 200 {
  89. response.Body.Close()
  90. errors.LogInfo(ctx, "invalid status code on download:", response.Status)
  91. gotDownResponse.Close()
  92. return
  93. }
  94. downResponse = response.Body
  95. gotDownResponse.Close()
  96. }()
  97. if !c.isH3 {
  98. // in quic-go, sometimes gotConn is never closed for the lifetime of
  99. // the entire connection, and the download locks up
  100. // https://github.com/quic-go/quic-go/issues/3342
  101. // for other HTTP versions, we want to block Dial until we know the
  102. // remote address of the server, for logging purposes
  103. <-gotConn.Wait()
  104. }
  105. lazyDownload := &LazyReader{
  106. CreateReader: func() (io.Reader, error) {
  107. <-gotDownResponse.Wait()
  108. if downResponse == nil {
  109. return nil, errors.New("downResponse failed")
  110. }
  111. return downResponse, nil
  112. },
  113. }
  114. // workaround for https://github.com/quic-go/quic-go/issues/2143 --
  115. // always cancel request context so that Close cancels any Read.
  116. // Should then match the behavior of http2 and http1.
  117. reader := downloadBody{
  118. lazyDownload,
  119. ctxCancel,
  120. }
  121. return reader, remoteAddr, localAddr, nil
  122. }
  123. func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error {
  124. req, err := http.NewRequest("POST", url, payload)
  125. if err != nil {
  126. return err
  127. }
  128. req.ContentLength = contentLength
  129. req.Header = c.transportConfig.GetRequestHeader()
  130. if c.isH2 || c.isH3 {
  131. resp, err := c.client.Do(req)
  132. if err != nil {
  133. return err
  134. }
  135. defer resp.Body.Close()
  136. if resp.StatusCode != 200 {
  137. return errors.New("bad status code:", resp.Status)
  138. }
  139. } else {
  140. // stringify the entire HTTP/1.1 request so it can be
  141. // safely retried. if instead req.Write is called multiple
  142. // times, the body is already drained after the first
  143. // request
  144. requestBuff := new(bytes.Buffer)
  145. common.Must(req.Write(requestBuff))
  146. var uploadConn any
  147. var h1UploadConn *H1Conn
  148. for {
  149. uploadConn = c.uploadRawPool.Get()
  150. newConnection := uploadConn == nil
  151. if newConnection {
  152. newConn, err := c.dialUploadConn(context.WithoutCancel(ctx))
  153. if err != nil {
  154. return err
  155. }
  156. h1UploadConn = NewH1Conn(newConn)
  157. uploadConn = h1UploadConn
  158. } else {
  159. h1UploadConn = uploadConn.(*H1Conn)
  160. // TODO: Replace 0 here with a config value later
  161. // Or add some other condition for optimization purposes
  162. if h1UploadConn.UnreadedResponsesCount > 0 {
  163. resp, err := http.ReadResponse(h1UploadConn.RespBufReader, req)
  164. if err != nil {
  165. return fmt.Errorf("error while reading response: %s", err.Error())
  166. }
  167. if resp.StatusCode != 200 {
  168. return fmt.Errorf("got non-200 error response code: %d", resp.StatusCode)
  169. }
  170. }
  171. }
  172. _, err := h1UploadConn.Write(requestBuff.Bytes())
  173. // if the write failed, we try another connection from
  174. // the pool, until the write on a new connection fails.
  175. // failed writes to a pooled connection are normal when
  176. // the connection has been closed in the meantime.
  177. if err == nil {
  178. break
  179. } else if newConnection {
  180. return err
  181. }
  182. }
  183. c.uploadRawPool.Put(uploadConn)
  184. }
  185. return nil
  186. }
  187. type downloadBody struct {
  188. io.Reader
  189. cancel context.CancelFunc
  190. }
  191. func (c downloadBody) Close() error {
  192. c.cancel()
  193. return nil
  194. }