Browse Source

Merge pull request #959 from Praying/main

fix(relay): 优化数据流处理
Calcium-Ion 9 months ago
parent
commit
909c5eb276
2 changed files with 7 additions and 10 deletions
  1. 1 9
      relay/channel/dify/relay-dify.go
  2. 6 1
      relay/helper/stream_scanner.go

+ 1 - 9
relay/channel/dify/relay-dify.go

@@ -1,7 +1,6 @@
 package dify
 
 import (
-	"bufio"
 	"bytes"
 	"encoding/base64"
 	"encoding/json"
@@ -213,12 +212,8 @@ func streamResponseDify2OpenAI(difyResponse DifyChunkChatCompletionResponse) *dt
 func difyStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo) (*dto.OpenAIErrorWithStatusCode, *dto.Usage) {
 	var responseText string
 	usage := &dto.Usage{}
-	scanner := bufio.NewScanner(resp.Body)
-	scanner.Split(bufio.ScanLines)
 	var nodeToken int
-
 	helper.SetEventStreamHeaders(c)
-
 	helper.StreamScannerHandler(c, resp, info, func(data string) bool {
 		var difyResponse DifyChunkChatCompletionResponse
 		err := json.Unmarshal([]byte(data), &difyResponse)
@@ -247,13 +242,10 @@ func difyStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.Re
 		}
 		return true
 	})
-	if err := scanner.Err(); err != nil {
-		common.SysError("error reading stream: " + err.Error())
-	}
 	helper.Done(c)
 	err := resp.Body.Close()
 	if err != nil {
-		//return service.OpenAIErrorWrapper(err, "close_response_body_failed", http.StatusInternalServerError), nil
+		// return service.OpenAIErrorWrapper(err, "close_response_body_failed", http.StatusInternalServerError), nil
 		common.SysError("close_response_body_failed: " + err.Error())
 	}
 	if usage.TotalTokens == 0 {

+ 6 - 1
relay/helper/stream_scanner.go

@@ -14,6 +14,11 @@ import (
 	"github.com/gin-gonic/gin"
 )
 
+const (
+	InitialScannerBufferSize = 1 << 20  // 1MB (1*1024*1024)
+	MaxScannerBufferSize     = 10 << 20 // 10MB (10*1024*1024)
+)
+
 func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo, dataHandler func(data string) bool) {
 
 	if resp == nil {
@@ -38,7 +43,7 @@ func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon
 		ticker.Stop()
 		close(stopChan)
 	}()
-
+	scanner.Buffer(make([]byte, InitialScannerBufferSize), MaxScannerBufferSize)
 	scanner.Split(bufio.ScanLines)
 	SetEventStreamHeaders(c)