Prechádzať zdrojové kódy

XHTTP client: Merge Open* into OpenStream(), and more

https://github.com/XTLS/Xray-core/issues/4148#issuecomment-2557066988
RPRX 10 mesiacov pred
rodič
commit
db934f0832

+ 1 - 1
go.mod

@@ -19,7 +19,7 @@ require (
 	github.com/stretchr/testify v1.10.0
 	github.com/v2fly/ss-bloomring v0.0.0-20210312155135-28617310f63e
 	github.com/vishvananda/netlink v1.3.0
-	github.com/xtls/quic-go v0.46.0
+	github.com/xtls/quic-go v0.0.0-20241220091641-6f5777d1c087
 	github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d
 	go4.org/netipx v0.0.0-20231129151722-fdeea329fbba
 	golang.org/x/crypto v0.31.0

+ 2 - 2
go.sum

@@ -68,8 +68,8 @@ github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQ
 github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs=
 github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
 github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
-github.com/xtls/quic-go v0.46.0 h1:yfv6h+/+iOeFhFnmJiwlZgnJjr4fPb4N4rQelffbs1U=
-github.com/xtls/quic-go v0.46.0/go.mod h1:mN9lAuc8Vt7eHvnQkDIH5+uHh+DcLmTBma9rLqk/rPY=
+github.com/xtls/quic-go v0.0.0-20241220091641-6f5777d1c087 h1:kKPg/cJPSKnE50VXVBskDYYSBkl4X3sMCIbTy+XKNGk=
+github.com/xtls/quic-go v0.0.0-20241220091641-6f5777d1c087/go.mod h1:mN9lAuc8Vt7eHvnQkDIH5+uHh+DcLmTBma9rLqk/rPY=
 github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d h1:+B97uD9uHLgAAulhigmys4BVwZZypzK7gPN3WtpgRJg=
 github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d/go.mod h1:dm4y/1QwzjGaK17ofi0Vs6NpKAHegZky8qk6J2JJZAE=
 github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=

+ 7 - 11
transport/internet/splithttp/browser_client.go

@@ -17,16 +17,12 @@ func (c *BrowserDialerClient) IsClosed() bool {
 	panic("not implemented yet")
 }
 
-func (c *BrowserDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) {
-	panic("not implemented yet")
-}
-
-func (c *BrowserDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
-	panic("not implemented yet")
-}
+func (c *BrowserDialerClient) OpenStream(ctx context.Context, url string, body io.Reader, uploadOnly bool) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
+	if body != nil {
+		panic("not implemented yet")
+	}
 
-func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
-	conn, err := browser_dialer.DialGet(baseURL)
+	conn, err := browser_dialer.DialGet(url)
 	dummyAddr := &gonet.IPAddr{}
 	if err != nil {
 		return nil, dummyAddr, dummyAddr, err
@@ -35,8 +31,8 @@ func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string)
 	return websocket.NewConnection(conn, dummyAddr, nil, 0), conn.RemoteAddr(), conn.LocalAddr(), nil
 }
 
-func (c *BrowserDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error {
-	bytes, err := io.ReadAll(payload)
+func (c *BrowserDialerClient) PostPacket(ctx context.Context, url string, body io.Reader, contentLength int64) error {
+	bytes, err := io.ReadAll(body)
 	if err != nil {
 		return err
 	}

+ 36 - 136
transport/internet/splithttp/client.go

@@ -20,21 +20,11 @@ import (
 type DialerClient interface {
 	IsClosed() bool
 
-	// (ctx, baseURL, payload) -> err
-	// baseURL already contains sessionId and seq
-	SendUploadRequest(context.Context, string, io.ReadWriteCloser, int64) error
+	// ctx, url, body, uploadOnly
+	OpenStream(context.Context, string, io.Reader, bool) (io.ReadCloser, net.Addr, net.Addr, error)
 
-	// (ctx, baseURL) -> (downloadReader, remoteAddr, localAddr)
-	// baseURL already contains sessionId
-	OpenDownload(context.Context, string) (io.ReadCloser, net.Addr, net.Addr, error)
-
-	// (ctx, baseURL) -> uploadWriter
-	// baseURL already contains sessionId
-	OpenUpload(context.Context, string) io.WriteCloser
-
-	// (ctx, pureURL) -> (uploadWriter, downloadReader)
-	// pureURL can not contain sessionId
-	Open(context.Context, string) (io.WriteCloser, io.ReadCloser)
+	// ctx, url, body, contentLength
+	PostPacket(context.Context, string, io.Reader, int64) error
 }
 
 // implements splithttp.DialerClient in terms of direct network connections
@@ -52,136 +42,56 @@ func (c *DefaultDialerClient) IsClosed() bool {
 	return c.closed
 }
 
-func (c *DefaultDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) {
-	reader, writer := io.Pipe()
-	req, _ := http.NewRequestWithContext(ctx, "POST", pureURL, reader)
-	req.Header = c.transportConfig.GetRequestHeader()
-	if !c.transportConfig.NoGRPCHeader {
-		req.Header.Set("Content-Type", "application/grpc")
-	}
-	wrc := &WaitReadCloser{Wait: make(chan struct{})}
-	go func() {
-		response, err := c.client.Do(req)
-		if err != nil || response.StatusCode != 200 {
-			if err != nil {
-				errors.LogInfoInner(ctx, err, "failed to open ", pureURL)
-			} else {
-				// c.closed = true
-				response.Body.Close()
-				errors.LogInfo(ctx, "unexpected status ", response.StatusCode)
-			}
-			wrc.Close()
-			return
-		}
-		wrc.Set(response.Body)
-	}()
-	return writer, wrc
-}
-
-func (c *DefaultDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
-	reader, writer := io.Pipe()
-	req, _ := http.NewRequestWithContext(ctx, "POST", baseURL, reader)
-	req.Header = c.transportConfig.GetRequestHeader()
-	if !c.transportConfig.NoGRPCHeader {
-		req.Header.Set("Content-Type", "application/grpc")
-	}
-	go func() {
-		if resp, err := c.client.Do(req); err == nil {
-			if resp.StatusCode != 200 {
-				// c.closed = true
-			}
-			resp.Body.Close()
-		}
-	}()
-	return writer
-}
-
-func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
-	var remoteAddr gonet.Addr
-	var localAddr gonet.Addr
+func (c *DefaultDialerClient) OpenStream(ctx context.Context, url string, body io.Reader, uploadOnly bool) (wrc io.ReadCloser, remoteAddr, localAddr gonet.Addr, err error) {
 	// this is done when the TCP/UDP connection to the server was established,
 	// and we can unblock the Dial function and print correct net addresses in
 	// logs
 	gotConn := done.New()
+	ctx = httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
+		GotConn: func(connInfo httptrace.GotConnInfo) {
+			remoteAddr = connInfo.Conn.RemoteAddr()
+			localAddr = connInfo.Conn.LocalAddr()
+			gotConn.Close()
+		},
+	})
 
-	var downResponse io.ReadCloser
-	gotDownResponse := done.New()
-
-	ctx, ctxCancel := context.WithCancel(ctx)
+	method := "GET"
+	if body != nil {
+		method = "POST"
+	}
+	req, _ := http.NewRequestWithContext(ctx, method, url, body)
+	req.Header = c.transportConfig.GetRequestHeader()
+	if method == "POST" && !c.transportConfig.NoGRPCHeader {
+		req.Header.Set("Content-Type", "application/grpc")
+	}
 
+	wrc = &WaitReadCloser{Wait: make(chan struct{})}
 	go func() {
-		trace := &httptrace.ClientTrace{
-			GotConn: func(connInfo httptrace.GotConnInfo) {
-				remoteAddr = connInfo.Conn.RemoteAddr()
-				localAddr = connInfo.Conn.LocalAddr()
-				gotConn.Close()
-			},
-		}
-
-		// in case we hit an error, we want to unblock this part
-		defer gotConn.Close()
-
-		ctx = httptrace.WithClientTrace(ctx, trace)
-
-		req, err := http.NewRequestWithContext(
-			ctx,
-			"GET",
-			baseURL,
-			nil,
-		)
-		if err != nil {
-			errors.LogInfoInner(ctx, err, "failed to construct download http request")
-			gotDownResponse.Close()
-			return
-		}
-
-		req.Header = c.transportConfig.GetRequestHeader()
-
-		response, err := c.client.Do(req)
-		gotConn.Close()
+		resp, err := c.client.Do(req)
 		if err != nil {
-			errors.LogInfoInner(ctx, err, "failed to send download http request")
-			gotDownResponse.Close()
+			errors.LogInfoInner(ctx, err, "failed to "+method+" "+url)
+			gotConn.Close()
+			wrc.Close()
 			return
 		}
-
-		if response.StatusCode != 200 {
+		if resp.StatusCode != 200 && !uploadOnly {
 			// c.closed = true
-			response.Body.Close()
-			errors.LogInfo(ctx, "invalid status code on download:", response.Status)
-			gotDownResponse.Close()
+			errors.LogInfo(ctx, "unexpected status ", resp.StatusCode)
+		}
+		if resp.StatusCode != 200 || uploadOnly {
+			resp.Body.Close()
+			wrc.Close()
 			return
 		}
-
-		downResponse = response.Body
-		gotDownResponse.Close()
+		wrc.(*WaitReadCloser).Set(resp.Body)
 	}()
 
 	<-gotConn.Wait()
-
-	lazyDownload := &LazyReader{
-		CreateReader: func() (io.Reader, error) {
-			<-gotDownResponse.Wait()
-			if downResponse == nil {
-				return nil, errors.New("downResponse failed")
-			}
-			return downResponse, nil
-		},
-	}
-
-	// workaround for https://github.com/quic-go/quic-go/issues/2143 --
-	// always cancel request context so that Close cancels any Read.
-	// Should then match the behavior of http2 and http1.
-	reader := downloadBody{
-		lazyDownload,
-		ctxCancel,
-	}
-
-	return reader, remoteAddr, localAddr, nil
+	return
 }
 
-func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error {
-	req, err := http.NewRequestWithContext(ctx, "POST", url, payload)
+func (c *DefaultDialerClient) PostPacket(ctx context.Context, url string, body io.Reader, contentLength int64) error {
+	req, err := http.NewRequestWithContext(ctx, "POST", url, body)
 	if err != nil {
 		return err
 	}
@@ -257,16 +167,6 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string,
 	return nil
 }
 
-type downloadBody struct {
-	io.Reader
-	cancel context.CancelFunc
-}
-
-func (c downloadBody) Close() error {
-	c.cancel()
-	return nil
-}
-
 type WaitReadCloser struct {
 	Wait chan struct{}
 	io.ReadCloser

+ 15 - 29
transport/internet/splithttp/dialer.go

@@ -343,29 +343,6 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
 		errors.LogInfo(ctx, fmt.Sprintf("XHTTP is downloading from %s, mode %s, HTTP version %s, host %s", dest2, "stream-down", httpVersion2, requestURL2.Host))
 	}
 
-	var writer io.WriteCloser
-	var reader io.ReadCloser
-	var remoteAddr, localAddr net.Addr
-	var err error
-
-	if mode == "stream-one" {
-		requestURL.Path = transportConfiguration.GetNormalizedPath()
-		if xmuxClient != nil {
-			xmuxClient.LeftRequests.Add(-1)
-		}
-		writer, reader = httpClient.Open(context.WithoutCancel(ctx), requestURL.String())
-		remoteAddr = &net.TCPAddr{}
-		localAddr = &net.TCPAddr{}
-	} else {
-		if xmuxClient2 != nil {
-			xmuxClient2.LeftRequests.Add(-1)
-		}
-		reader, remoteAddr, localAddr, err = httpClient2.OpenDownload(context.WithoutCancel(ctx), requestURL2.String())
-		if err != nil {
-			return nil, err
-		}
-	}
-
 	if xmuxClient != nil {
 		xmuxClient.OpenUsage.Add(1)
 	}
@@ -374,11 +351,9 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
 	}
 	var closed atomic.Int32
 
+	reader, writer := io.Pipe()
 	conn := splitConn{
-		writer:     writer,
-		reader:     reader,
-		remoteAddr: remoteAddr,
-		localAddr:  localAddr,
+		writer: writer,
 		onClose: func() {
 			if closed.Add(1) > 1 {
 				return
@@ -393,16 +368,27 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
 	}
 
 	if mode == "stream-one" {
+		requestURL.Path = transportConfiguration.GetNormalizedPath()
 		if xmuxClient != nil {
 			xmuxClient.LeftRequests.Add(-1)
 		}
+		conn.reader, conn.remoteAddr, conn.localAddr, _ = httpClient.OpenStream(context.WithoutCancel(ctx), requestURL.String(), reader, false)
 		return stat.Connection(&conn), nil
+	} else { // stream-down
+		var err error
+		if xmuxClient2 != nil {
+			xmuxClient2.LeftRequests.Add(-1)
+		}
+		conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient2.OpenStream(context.WithoutCancel(ctx), requestURL2.String(), nil, false)
+		if err != nil { // browser dialer only
+			return nil, err
+		}
 	}
 	if mode == "stream-up" {
 		if xmuxClient != nil {
 			xmuxClient.LeftRequests.Add(-1)
 		}
-		conn.writer = httpClient.OpenUpload(ctx, requestURL.String())
+		httpClient.OpenStream(ctx, requestURL.String(), reader, true)
 		return stat.Connection(&conn), nil
 	}
 
@@ -466,7 +452,7 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
 			}
 
 			go func() {
-				err := httpClient.SendUploadRequest(
+				err := httpClient.PostPacket(
 					context.WithoutCancel(ctx),
 					url.String(),
 					&buf.MultiBufferContainer{MultiBuffer: chunk},

+ 0 - 47
transport/internet/splithttp/lazy_reader.go

@@ -1,47 +0,0 @@
-package splithttp
-
-import (
-	"io"
-	"sync"
-)
-
-// Close is intentionally not supported by LazyReader because it's not clear
-// how CreateReader should be aborted in case of Close. It's best to wrap
-// LazyReader in another struct that handles Close correctly, or better, stop
-// using LazyReader entirely.
-type LazyReader struct {
-	readerSync   sync.Mutex
-	CreateReader func() (io.Reader, error)
-	reader       io.Reader
-	readerError  error
-}
-
-func (r *LazyReader) getReader() (io.Reader, error) {
-	r.readerSync.Lock()
-	defer r.readerSync.Unlock()
-	if r.reader != nil {
-		return r.reader, nil
-	}
-
-	if r.readerError != nil {
-		return nil, r.readerError
-	}
-
-	reader, err := r.CreateReader()
-	if err != nil {
-		r.readerError = err
-		return nil, err
-	}
-
-	r.reader = reader
-	return reader, nil
-}
-
-func (r *LazyReader) Read(b []byte) (int, error) {
-	reader, err := r.getReader()
-	if err != nil {
-		return 0, err
-	}
-	n, err := reader.Read(b)
-	return n, err
-}