Selaa lähdekoodia

✨ feat: introduce IOCopyBytesGracefully function for streamlined response body handling

This update adds the IOCopyBytesGracefully function to the common package, which simplifies the process of copying response bodies in the OpenAI handlers. It enhances error handling and ensures proper resource management by encapsulating the logic for setting headers and writing response data. The OpenAI handlers have been refactored to utilize this new function, improving code clarity and maintainability.
CaIon 6 kuukautta sitten
vanhempi
sitoutus
9c08d8cf20
2 muutettua tiedostoa jossa 58 lisäystä ja 55 poistoa
  1. 44 1
      common/http.go
  2. 14 54
      relay/channel/openai/relay-openai.go

+ 44 - 1
common/http.go

@@ -1,6 +1,12 @@
 package common
 package common
 
 
-import "net/http"
+import (
+	"bytes"
+	"fmt"
+	"github.com/gin-gonic/gin"
+	"io"
+	"net/http"
+)
 
 
 func CloseResponseBodyGracefully(httpResponse *http.Response) {
 func CloseResponseBodyGracefully(httpResponse *http.Response) {
 	if httpResponse == nil || httpResponse.Body == nil {
 	if httpResponse == nil || httpResponse.Body == nil {
@@ -11,3 +17,40 @@ func CloseResponseBodyGracefully(httpResponse *http.Response) {
 		SysError("failed to close response body: " + err.Error())
 		SysError("failed to close response body: " + err.Error())
 	}
 	}
 }
 }
+
+func IOCopyBytesGracefully(c *gin.Context, src *http.Response, data []byte) {
+	if src == nil || src.Body == nil {
+		return
+	}
+
+	defer CloseResponseBodyGracefully(src)
+
+	if c.Writer == nil {
+		return
+	}
+
+	src.Body = io.NopCloser(bytes.NewBuffer(data))
+
+	// We shouldn't set the header before we parse the response body, because the parse part may fail.
+	// And then we will have to send an error response, but in this case, the header has already been set.
+	// So the httpClient will be confused by the response.
+	// For example, Postman will report error, and we cannot check the response at all.
+	for k, v := range src.Header {
+		// avoid setting Content-Length
+		if k == "Content-Length" {
+			continue
+		}
+		c.Writer.Header().Set(k, v[0])
+	}
+
+	// set Content-Length header manually
+	c.Writer.Header().Set("Content-Length", fmt.Sprintf("%d", len(data)))
+
+	c.Writer.WriteHeader(src.StatusCode)
+	c.Writer.WriteHeaderNow()
+
+	_, err := io.Copy(c.Writer, src.Body)
+	if err != nil {
+		LogError(c, fmt.Sprintf("failed to copy response body: %s", err.Error()))
+	}
+}

+ 14 - 54
relay/channel/openai/relay-openai.go

@@ -251,22 +251,8 @@ func OpenaiHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayI
 		responseBody = claudeRespStr
 		responseBody = claudeRespStr
 	}
 	}
 
 
-	// Reset response body
-	resp.Body = io.NopCloser(bytes.NewBuffer(responseBody))
-	// We shouldn't set the header before we parse the response body, because the parse part may fail.
-	// And then we will have to send an error response, but in this case, the header has already been set.
-	// So the httpClient will be confused by the response.
-	// For example, Postman will report error, and we cannot check the response at all.
-	for k, v := range resp.Header {
-		c.Writer.Header().Set(k, v[0])
-	}
-	c.Writer.WriteHeader(resp.StatusCode)
-	_, err = io.Copy(c.Writer, resp.Body)
-	if err != nil {
-		//return service.OpenAIErrorWrapper(err, "copy_response_body_failed", http.StatusInternalServerError), nil
-		common.SysError("error copying response body: " + err.Error())
-	}
-	common.CloseResponseBodyGracefully(resp)
+	common.IOCopyBytesGracefully(c, resp, responseBody)
+
 	return nil, &simpleResponse.Usage
 	return nil, &simpleResponse.Usage
 }
 }
 
 
@@ -304,24 +290,9 @@ func OpenaiSTTHandler(c *gin.Context, resp *http.Response, info *relaycommon.Rel
 		return service.OpenAIErrorWrapper(err, "read_response_body_failed", http.StatusInternalServerError), nil
 		return service.OpenAIErrorWrapper(err, "read_response_body_failed", http.StatusInternalServerError), nil
 	}
 	}
 	common.CloseResponseBodyGracefully(resp)
 	common.CloseResponseBodyGracefully(resp)
-	if err != nil {
-		return service.OpenAIErrorWrapper(err, "close_response_body_failed", http.StatusInternalServerError), nil
-	}
-	// Reset response body
-	resp.Body = io.NopCloser(bytes.NewBuffer(responseBody))
-	// We shouldn't set the header before we parse the response body, because the parse part may fail.
-	// And then we will have to send an error response, but in this case, the header has already been set.
-	// So the httpClient will be confused by the response.
-	// For example, Postman will report error, and we cannot check the response at all.
-	for k, v := range resp.Header {
-		c.Writer.Header().Set(k, v[0])
-	}
-	c.Writer.WriteHeader(resp.StatusCode)
-	_, err = io.Copy(c.Writer, resp.Body)
-	if err != nil {
-		return service.OpenAIErrorWrapper(err, "copy_response_body_failed", http.StatusInternalServerError), nil
-	}
-	common.CloseResponseBodyGracefully(resp)
+
+	// 写入新的 response body
+	common.IOCopyBytesGracefully(c, resp, responseBody)
 
 
 	usage := &dto.Usage{}
 	usage := &dto.Usage{}
 	usage.PromptTokens = audioTokens
 	usage.PromptTokens = audioTokens
@@ -602,33 +573,22 @@ func OpenaiHandlerWithUsage(c *gin.Context, resp *http.Response, info *relaycomm
 	if err != nil {
 	if err != nil {
 		return service.OpenAIErrorWrapper(err, "read_response_body_failed", http.StatusInternalServerError), nil
 		return service.OpenAIErrorWrapper(err, "read_response_body_failed", http.StatusInternalServerError), nil
 	}
 	}
-	common.CloseResponseBodyGracefully(resp)
-	// Reset response body
-	resp.Body = io.NopCloser(bytes.NewBuffer(responseBody))
-	// We shouldn't set the header before we parse the response body, because the parse part may fail.
-	// And then we will have to send an error response, but in this case, the header has already been set.
-	// So the httpClient will be confused by the response.
-	// For example, Postman will report error, and we cannot check the response at all.
-	for k, v := range resp.Header {
-		c.Writer.Header().Set(k, v[0])
-	}
-	// reset content length
-	c.Writer.Header().Set("Content-Length", fmt.Sprintf("%d", len(responseBody)))
-	c.Writer.WriteHeader(resp.StatusCode)
-	_, err = io.Copy(c.Writer, resp.Body)
+
+	var usageResp dto.SimpleResponse
+	err = json.Unmarshal(responseBody, &usageResp)
 	if err != nil {
 	if err != nil {
-		common.SysError("error copying response body: " + err.Error())
+		return service.OpenAIErrorWrapper(err, "parse_response_body_failed", http.StatusInternalServerError), nil
 	}
 	}
+
+	// 关闭旧的 response body(已被读取,再次读取会导致错误)
 	common.CloseResponseBodyGracefully(resp)
 	common.CloseResponseBodyGracefully(resp)
 
 
+	// 写入新的 response body
+	common.IOCopyBytesGracefully(c, resp, responseBody)
+
 	// Once we've written to the client, we should not return errors anymore
 	// Once we've written to the client, we should not return errors anymore
 	// because the upstream has already consumed resources and returned content
 	// because the upstream has already consumed resources and returned content
 	// We should still perform billing even if parsing fails
 	// We should still perform billing even if parsing fails
-	var usageResp dto.SimpleResponse
-	err = json.Unmarshal(responseBody, &usageResp)
-	if err != nil {
-		return service.OpenAIErrorWrapper(err, "parse_response_body_failed", http.StatusInternalServerError), nil
-	}
 	// format
 	// format
 	if usageResp.InputTokens > 0 {
 	if usageResp.InputTokens > 0 {
 		usageResp.PromptTokens += usageResp.InputTokens
 		usageResp.PromptTokens += usageResp.InputTokens