client.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. package splithttp
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. gonet "net"
  8. "net/http"
  9. "net/http/httptrace"
  10. "sync"
  11. "github.com/xtls/xray-core/common"
  12. "github.com/xtls/xray-core/common/errors"
  13. "github.com/xtls/xray-core/common/net"
  14. "github.com/xtls/xray-core/common/signal/done"
  15. )
  // DialerClient is the interface that abstracts between use of the browser
  // dialer and use of net/http.
  type DialerClient interface {
  	// IsClosed reports whether the client considers itself closed.
  	IsClosed() bool

  	// (ctx, baseURL, payload, contentLength) -> err
  	// baseURL already contains sessionId and seq.
  	SendUploadRequest(ctx context.Context, baseURL string, payload io.ReadWriteCloser, contentLength int64) error

  	// (ctx, baseURL) -> (downloadReader, remoteAddr, localAddr, err)
  	// baseURL already contains sessionId.
  	OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, net.Addr, net.Addr, error)

  	// (ctx, baseURL) -> uploadWriter
  	// baseURL already contains sessionId.
  	OpenUpload(ctx context.Context, baseURL string) io.WriteCloser

  	// (ctx, pureURL) -> (uploadWriter, downloadReader)
  	// pureURL cannot contain a sessionId.
  	Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser)
  }
  32. // implements splithttp.DialerClient in terms of direct network connections
  33. type DefaultDialerClient struct {
  34. transportConfig *Config
  35. client *http.Client
  36. closed bool
  37. httpVersion string
  38. // pool of net.Conn, created using dialUploadConn
  39. uploadRawPool *sync.Pool
  40. dialUploadConn func(ctxInner context.Context) (net.Conn, error)
  41. }
  42. func (c *DefaultDialerClient) IsClosed() bool {
  43. return c.closed
  44. }
  45. func (c *DefaultDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) {
  46. reader, writer := io.Pipe()
  47. req, _ := http.NewRequestWithContext(ctx, "POST", pureURL, reader)
  48. req.Header = c.transportConfig.GetRequestHeader()
  49. if !c.transportConfig.NoGRPCHeader {
  50. req.Header.Set("Content-Type", "application/grpc")
  51. }
  52. wrc := &WaitReadCloser{Wait: make(chan struct{})}
  53. go func() {
  54. response, err := c.client.Do(req)
  55. if err != nil || response.StatusCode != 200 {
  56. if err != nil {
  57. errors.LogInfoInner(ctx, err, "failed to open ", pureURL)
  58. } else {
  59. // c.closed = true
  60. response.Body.Close()
  61. errors.LogInfo(ctx, "unexpected status ", response.StatusCode)
  62. }
  63. wrc.Close()
  64. return
  65. }
  66. wrc.Set(response.Body)
  67. }()
  68. return writer, wrc
  69. }
  70. func (c *DefaultDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
  71. reader, writer := io.Pipe()
  72. req, _ := http.NewRequestWithContext(ctx, "POST", baseURL, reader)
  73. req.Header = c.transportConfig.GetRequestHeader()
  74. if !c.transportConfig.NoGRPCHeader {
  75. req.Header.Set("Content-Type", "application/grpc")
  76. }
  77. go func() {
  78. if resp, err := c.client.Do(req); err == nil {
  79. if resp.StatusCode != 200 {
  80. // c.closed = true
  81. }
  82. resp.Body.Close()
  83. }
  84. }()
  85. return writer
  86. }
  87. func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
  88. var remoteAddr gonet.Addr
  89. var localAddr gonet.Addr
  90. // this is done when the TCP/UDP connection to the server was established,
  91. // and we can unblock the Dial function and print correct net addresses in
  92. // logs
  93. gotConn := done.New()
  94. var downResponse io.ReadCloser
  95. gotDownResponse := done.New()
  96. ctx, ctxCancel := context.WithCancel(ctx)
  97. go func() {
  98. trace := &httptrace.ClientTrace{
  99. GotConn: func(connInfo httptrace.GotConnInfo) {
  100. remoteAddr = connInfo.Conn.RemoteAddr()
  101. localAddr = connInfo.Conn.LocalAddr()
  102. gotConn.Close()
  103. },
  104. }
  105. // in case we hit an error, we want to unblock this part
  106. defer gotConn.Close()
  107. ctx = httptrace.WithClientTrace(ctx, trace)
  108. req, err := http.NewRequestWithContext(
  109. ctx,
  110. "GET",
  111. baseURL,
  112. nil,
  113. )
  114. if err != nil {
  115. errors.LogInfoInner(ctx, err, "failed to construct download http request")
  116. gotDownResponse.Close()
  117. return
  118. }
  119. req.Header = c.transportConfig.GetRequestHeader()
  120. response, err := c.client.Do(req)
  121. gotConn.Close()
  122. if err != nil {
  123. errors.LogInfoInner(ctx, err, "failed to send download http request")
  124. gotDownResponse.Close()
  125. return
  126. }
  127. if response.StatusCode != 200 {
  128. // c.closed = true
  129. response.Body.Close()
  130. errors.LogInfo(ctx, "invalid status code on download:", response.Status)
  131. gotDownResponse.Close()
  132. return
  133. }
  134. downResponse = response.Body
  135. gotDownResponse.Close()
  136. }()
  137. <-gotConn.Wait()
  138. lazyDownload := &LazyReader{
  139. CreateReader: func() (io.Reader, error) {
  140. <-gotDownResponse.Wait()
  141. if downResponse == nil {
  142. return nil, errors.New("downResponse failed")
  143. }
  144. return downResponse, nil
  145. },
  146. }
  147. // workaround for https://github.com/quic-go/quic-go/issues/2143 --
  148. // always cancel request context so that Close cancels any Read.
  149. // Should then match the behavior of http2 and http1.
  150. reader := downloadBody{
  151. lazyDownload,
  152. ctxCancel,
  153. }
  154. return reader, remoteAddr, localAddr, nil
  155. }
  156. func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error {
  157. req, err := http.NewRequestWithContext(ctx, "POST", url, payload)
  158. if err != nil {
  159. return err
  160. }
  161. req.ContentLength = contentLength
  162. req.Header = c.transportConfig.GetRequestHeader()
  163. if c.httpVersion != "1.1" {
  164. resp, err := c.client.Do(req)
  165. if err != nil {
  166. return err
  167. }
  168. defer resp.Body.Close()
  169. if resp.StatusCode != 200 {
  170. // c.closed = true
  171. return errors.New("bad status code:", resp.Status)
  172. }
  173. } else {
  174. // stringify the entire HTTP/1.1 request so it can be
  175. // safely retried. if instead req.Write is called multiple
  176. // times, the body is already drained after the first
  177. // request
  178. requestBuff := new(bytes.Buffer)
  179. common.Must(req.Write(requestBuff))
  180. var uploadConn any
  181. var h1UploadConn *H1Conn
  182. for {
  183. uploadConn = c.uploadRawPool.Get()
  184. newConnection := uploadConn == nil
  185. if newConnection {
  186. newConn, err := c.dialUploadConn(context.WithoutCancel(ctx))
  187. if err != nil {
  188. return err
  189. }
  190. h1UploadConn = NewH1Conn(newConn)
  191. uploadConn = h1UploadConn
  192. } else {
  193. h1UploadConn = uploadConn.(*H1Conn)
  194. // TODO: Replace 0 here with a config value later
  195. // Or add some other condition for optimization purposes
  196. if h1UploadConn.UnreadedResponsesCount > 0 {
  197. resp, err := http.ReadResponse(h1UploadConn.RespBufReader, req)
  198. if err != nil {
  199. return fmt.Errorf("error while reading response: %s", err.Error())
  200. }
  201. if resp.StatusCode != 200 {
  202. // c.closed = true
  203. // resp.Body.Close() // I'm not sure
  204. return fmt.Errorf("got non-200 error response code: %d", resp.StatusCode)
  205. }
  206. }
  207. }
  208. _, err := h1UploadConn.Write(requestBuff.Bytes())
  209. // if the write failed, we try another connection from
  210. // the pool, until the write on a new connection fails.
  211. // failed writes to a pooled connection are normal when
  212. // the connection has been closed in the meantime.
  213. if err == nil {
  214. break
  215. } else if newConnection {
  216. return err
  217. }
  218. }
  219. c.uploadRawPool.Put(uploadConn)
  220. }
  221. return nil
  222. }
  // downloadBody pairs a download stream with the cancel function that is
  // invoked when the stream is closed.
  type downloadBody struct {
  	io.Reader
  	cancel context.CancelFunc
  }

  // Close invokes the stored cancel function. It always returns nil.
  func (b downloadBody) Close() error {
  	b.cancel()
  	return nil
  }
  // WaitReadCloser is an io.ReadCloser whose underlying stream is supplied
  // later, typically from another goroutine, via Set.
  //
  // Read blocks until either Set provides a stream or Close is called. Set,
  // Read and Close may be called concurrently: the stream and the closing of
  // Wait are guarded by an internal mutex, so a late Set after Close closes the
  // late stream instead of leaking it.
  //
  // Construct it with a non-nil Wait channel, e.g.
  // &WaitReadCloser{Wait: make(chan struct{})}. A value must not be copied
  // after first use.
  type WaitReadCloser struct {
  	// Wait is closed when a stream has been set or when Close is called
  	// before any stream was set.
  	Wait chan struct{}
  	io.ReadCloser

  	mu     sync.Mutex // guards ReadCloser, closed and the closing of Wait
  	closed bool       // Close has been called
  }

  // Set installs rc as the underlying stream and wakes any blocked Read. If the
  // WaitReadCloser was already closed, or already holds a stream, rc is closed
  // and discarded instead.
  func (w *WaitReadCloser) Set(rc io.ReadCloser) {
  	w.mu.Lock()
  	if w.closed || w.ReadCloser != nil {
  		w.mu.Unlock()
  		rc.Close()
  		return
  	}
  	w.ReadCloser = rc
  	w.signalLocked()
  	w.mu.Unlock()
  }

  // Read reads from the underlying stream, first waiting for it to be set if
  // necessary. It returns io.ErrClosedPipe if Close was called before a stream
  // became available.
  func (w *WaitReadCloser) Read(b []byte) (int, error) {
  	rc := w.stream()
  	if rc == nil {
  		<-w.Wait
  		if rc = w.stream(); rc == nil {
  			return 0, io.ErrClosedPipe
  		}
  	}
  	return rc.Read(b)
  }

  // Close closes the underlying stream if one is set and returns its error.
  // Otherwise it wakes any blocked Read and returns nil. Calling Close more
  // than once is safe.
  func (w *WaitReadCloser) Close() error {
  	w.mu.Lock()
  	w.closed = true
  	rc := w.ReadCloser
  	if rc == nil {
  		w.signalLocked()
  	}
  	w.mu.Unlock()

  	if rc != nil {
  		return rc.Close()
  	}
  	return nil
  }

  // stream returns the currently installed stream, or nil if none is set.
  func (w *WaitReadCloser) stream() io.ReadCloser {
  	w.mu.Lock()
  	defer w.mu.Unlock()
  	return w.ReadCloser
  }

  // signalLocked closes Wait unless it is nil or already closed. The caller
  // must hold w.mu, which is what makes the check-then-close sequence safe.
  func (w *WaitReadCloser) signalLocked() {
  	if w.Wait == nil {
  		return
  	}
  	select {
  	case <-w.Wait:
  		// already closed
  	default:
  		close(w.Wait)
  	}
  }