Просмотр исходного кода

XHTTP server: Finish stream-up's HTTP POST when its request.Body is closed

https://github.com/XTLS/Xray-core/issues/4373#issuecomment-2647908310

Fixes https://github.com/XTLS/Xray-core/issues/4373
RPRX 10 месяцев назад
Родитель
Сommit
dcd7e92c45
1 измененных файлов с 27 добавлено и 5 удалено
  1. 27 5
      transport/internet/splithttp/hub.go

+ 27 - 5
transport/internet/splithttp/hub.go

@@ -161,7 +161,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
 	}
 	scMaxEachPostBytes := int(h.ln.config.GetNormalizedScMaxEachPostBytes().To)
 
-	if request.Method == "POST" && sessionId != "" {
+	if request.Method == "POST" && sessionId != "" { // stream-up, packet-up
 		seq := ""
 		if len(subpath) > 1 {
 			seq = subpath[1]
@@ -173,8 +173,12 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
 				writer.WriteHeader(http.StatusBadRequest)
 				return
 			}
+			uploadDone := done.New()
 			err = currentSession.uploadQueue.Push(Packet{
-				Reader: request.Body,
+				Reader: &httpRequestBodyReader{
+					requestReader: request.Body,
+					uploadDone:    uploadDone,
+				},
 			})
 			if err != nil {
 				errors.LogInfoInner(context.Background(), err, "failed to upload (PushReader)")
@@ -199,8 +203,12 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
 						}
 					}()
 				}
-				<-request.Context().Done()
+				select {
+				case <-request.Context().Done():
+				case <-uploadDone.Wait():
+				}
 			}
+			uploadDone.Close()
 			return
 		}
 
@@ -243,7 +251,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
 		}
 
 		writer.WriteHeader(http.StatusOK)
-	} else if request.Method == "GET" || sessionId == "" {
+	} else if request.Method == "GET" || sessionId == "" { // stream-down, stream-one
 		responseFlusher, ok := writer.(http.Flusher)
 		if !ok {
 			panic("expected http.ResponseWriter to be an http.Flusher")
@@ -283,7 +291,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
 			reader:     request.Body,
 			remoteAddr: remoteAddr,
 		}
-		if sessionId != "" {
+		if sessionId != "" { // if not stream-one
 			conn.reader = currentSession.uploadQueue
 		}
 
@@ -302,6 +310,20 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
 	}
 }
 
+type httpRequestBodyReader struct {
+	requestReader io.ReadCloser
+	uploadDone    *done.Instance
+}
+
+func (c *httpRequestBodyReader) Read(b []byte) (int, error) {
+	return c.requestReader.Read(b)
+}
+
+func (c *httpRequestBodyReader) Close() error {
+	defer c.uploadDone.Close()
+	return c.requestReader.Close()
+}
+
 type httpResponseBodyWriter struct {
 	sync.Mutex
 	responseWriter  http.ResponseWriter