|
|
@@ -5,9 +5,6 @@ import (
|
|
|
"bytes"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
- "github.com/bytedance/gopkg/util/gopool"
|
|
|
- "github.com/gin-gonic/gin"
|
|
|
- "github.com/gorilla/websocket"
|
|
|
"io"
|
|
|
"net/http"
|
|
|
"one-api/common"
|
|
|
@@ -19,9 +16,33 @@ import (
|
|
|
"strings"
|
|
|
"sync"
|
|
|
"time"
|
|
|
+
|
|
|
+ "github.com/bytedance/gopkg/util/gopool"
|
|
|
+ "github.com/gin-gonic/gin"
|
|
|
+ "github.com/gorilla/websocket"
|
|
|
)
|
|
|
|
|
|
+func sendStreamData(c *gin.Context, data string, forceFormat bool) error {
|
|
|
+ if data == "" {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if forceFormat {
|
|
|
+ var lastStreamResponse dto.ChatCompletionsStreamResponse
|
|
|
+ if err := json.Unmarshal(common.StringToByteSlice(data), &lastStreamResponse); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return service.ObjectData(c, lastStreamResponse)
|
|
|
+ }
|
|
|
+ return service.StringData(c, data)
|
|
|
+}
|
|
|
+
|
|
|
func OaiStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo) (*dto.OpenAIErrorWithStatusCode, *dto.Usage) {
|
|
|
+ if resp == nil || resp.Body == nil {
|
|
|
+ common.LogError(c, "invalid response or response body")
|
|
|
+ return service.OpenAIErrorWrapper(fmt.Errorf("invalid response"), "invalid_response", http.StatusInternalServerError), nil
|
|
|
+ }
|
|
|
+
|
|
|
containStreamUsage := false
|
|
|
var responseId string
|
|
|
var createAt int64 = 0
|
|
|
@@ -31,6 +52,13 @@ func OaiStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.Rel
|
|
|
var responseTextBuilder strings.Builder
|
|
|
var usage = &dto.Usage{}
|
|
|
var streamItems []string // store stream items
|
|
|
+ var forceFormat bool
|
|
|
+
|
|
|
+ if info.ChannelType == common.ChannelTypeCustom {
|
|
|
+ if forceFmt, ok := info.ChannelSetting["force_format"].(bool); ok {
|
|
|
+ forceFormat = forceFmt
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
toolCount := 0
|
|
|
scanner := bufio.NewScanner(resp.Body)
|
|
|
@@ -62,7 +90,7 @@ func OaiStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.Rel
|
|
|
data = data[6:]
|
|
|
if !strings.HasPrefix(data, "[DONE]") {
|
|
|
if lastStreamData != "" {
|
|
|
- err := service.StringData(c, lastStreamData)
|
|
|
+ err := sendStreamData(c, lastStreamData, forceFormat)
|
|
|
if err != nil {
|
|
|
common.LogError(c, "streaming error: "+err.Error())
|
|
|
}
|
|
|
@@ -105,7 +133,7 @@ func OaiStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.Rel
|
|
|
}
|
|
|
}
|
|
|
if shouldSendLastResp {
|
|
|
- service.StringData(c, lastStreamData)
|
|
|
+ sendStreamData(c, lastStreamData, forceFormat)
|
|
|
}
|
|
|
|
|
|
// 计算token
|
|
|
@@ -375,6 +403,10 @@ func getTextFromJSON(body []byte) (string, error) {
|
|
|
}
|
|
|
|
|
|
func OpenaiRealtimeHandler(c *gin.Context, info *relaycommon.RelayInfo) (*dto.OpenAIErrorWithStatusCode, *dto.RealtimeUsage) {
|
|
|
+ if info == nil || info.ClientWs == nil || info.TargetWs == nil {
|
|
|
+ return service.OpenAIErrorWrapper(fmt.Errorf("invalid websocket connection"), "invalid_connection", http.StatusBadRequest), nil
|
|
|
+ }
|
|
|
+
|
|
|
info.IsStream = true
|
|
|
clientConn := info.ClientWs
|
|
|
targetConn := info.TargetWs
|
|
|
@@ -390,6 +422,11 @@ func OpenaiRealtimeHandler(c *gin.Context, info *relaycommon.RelayInfo) (*dto.Op
|
|
|
sumUsage := &dto.RealtimeUsage{}
|
|
|
|
|
|
gopool.Go(func() {
|
|
|
+ defer func() {
|
|
|
+ if r := recover(); r != nil {
|
|
|
+ errChan <- fmt.Errorf("panic in client reader: %v", r)
|
|
|
+ }
|
|
|
+ }()
|
|
|
for {
|
|
|
select {
|
|
|
case <-c.Done():
|
|
|
@@ -445,6 +482,11 @@ func OpenaiRealtimeHandler(c *gin.Context, info *relaycommon.RelayInfo) (*dto.Op
|
|
|
})
|
|
|
|
|
|
gopool.Go(func() {
|
|
|
+ defer func() {
|
|
|
+ if r := recover(); r != nil {
|
|
|
+ errChan <- fmt.Errorf("panic in target reader: %v", r)
|
|
|
+ }
|
|
|
+ }()
|
|
|
for {
|
|
|
select {
|
|
|
case <-c.Done():
|
|
|
@@ -568,6 +610,10 @@ func OpenaiRealtimeHandler(c *gin.Context, info *relaycommon.RelayInfo) (*dto.Op
|
|
|
}
|
|
|
|
|
|
func preConsumeUsage(ctx *gin.Context, info *relaycommon.RelayInfo, usage *dto.RealtimeUsage, totalUsage *dto.RealtimeUsage) error {
|
|
|
+ if usage == nil || totalUsage == nil {
|
|
|
+ return fmt.Errorf("invalid usage pointer")
|
|
|
+ }
|
|
|
+
|
|
|
totalUsage.TotalTokens += usage.TotalTokens
|
|
|
totalUsage.InputTokens += usage.InputTokens
|
|
|
totalUsage.OutputTokens += usage.OutputTokens
|