|
|
@@ -12,10 +12,13 @@ import (
|
|
|
"strconv"
|
|
|
"time"
|
|
|
|
|
|
+ "github.com/bytedance/sonic"
|
|
|
+ "github.com/bytedance/sonic/ast"
|
|
|
"github.com/gin-gonic/gin"
|
|
|
"github.com/labring/aiproxy/core/common"
|
|
|
"github.com/labring/aiproxy/core/common/config"
|
|
|
"github.com/labring/aiproxy/core/common/consume"
|
|
|
+ "github.com/labring/aiproxy/core/common/conv"
|
|
|
"github.com/labring/aiproxy/core/common/notify"
|
|
|
"github.com/labring/aiproxy/core/common/reqlimit"
|
|
|
"github.com/labring/aiproxy/core/common/trylock"
|
|
|
@@ -23,7 +26,6 @@ import (
|
|
|
"github.com/labring/aiproxy/core/model"
|
|
|
"github.com/labring/aiproxy/core/monitor"
|
|
|
"github.com/labring/aiproxy/core/relay/adaptor"
|
|
|
- "github.com/labring/aiproxy/core/relay/adaptor/openai"
|
|
|
"github.com/labring/aiproxy/core/relay/adaptors"
|
|
|
"github.com/labring/aiproxy/core/relay/controller"
|
|
|
"github.com/labring/aiproxy/core/relay/meta"
|
|
|
@@ -46,9 +48,7 @@ type RelayController struct {
|
|
|
Handler RelayHandler
|
|
|
}
|
|
|
|
|
|
-var ErrInvalidChannelTypeCode = "invalid_channel_type"
|
|
|
-
|
|
|
-type warpAdaptor struct {
|
|
|
+type wrapAdaptor struct {
|
|
|
adaptor.Adaptor
|
|
|
}
|
|
|
|
|
|
@@ -101,7 +101,7 @@ func updateChannelModelTokensRequestRate(c *gin.Context, meta *meta.Meta, tpm, t
|
|
|
log.Data["ch_tps"] = tps
|
|
|
}
|
|
|
|
|
|
-func (w *warpAdaptor) DoRequest(meta *meta.Meta, c *gin.Context, req *http.Request) (*http.Response, error) {
|
|
|
+func (w *wrapAdaptor) DoRequest(meta *meta.Meta, c *gin.Context, req *http.Request) (*http.Response, error) {
|
|
|
count, overLimitCount, secondCount := reqlimit.PushChannelModelRequest(
|
|
|
context.Background(),
|
|
|
strconv.Itoa(meta.Channel.ID),
|
|
|
@@ -111,7 +111,7 @@ func (w *warpAdaptor) DoRequest(meta *meta.Meta, c *gin.Context, req *http.Reque
|
|
|
return w.Adaptor.DoRequest(meta, c, req)
|
|
|
}
|
|
|
|
|
|
-func (w *warpAdaptor) DoResponse(meta *meta.Meta, c *gin.Context, resp *http.Response) (*model.Usage, *relaymodel.ErrorWithStatusCode) {
|
|
|
+func (w *wrapAdaptor) DoResponse(meta *meta.Meta, c *gin.Context, resp *http.Response) (*model.Usage, adaptor.Error) {
|
|
|
usage, relayErr := w.Adaptor.DoResponse(meta, c, resp)
|
|
|
if usage == nil {
|
|
|
return nil, relayErr
|
|
|
@@ -155,15 +155,15 @@ func relayHandler(c *gin.Context, meta *meta.Meta) *controller.HandleResult {
|
|
|
adaptor, ok := adaptors.GetAdaptor(meta.Channel.Type)
|
|
|
if !ok {
|
|
|
return &controller.HandleResult{
|
|
|
- Error: openai.ErrorWrapperWithMessage(
|
|
|
+ Error: relaymodel.WrapperOpenAIErrorWithMessage(
|
|
|
fmt.Sprintf("invalid channel type: %d", meta.Channel.Type),
|
|
|
- ErrInvalidChannelTypeCode,
|
|
|
+ "invalid_channel_type",
|
|
|
http.StatusInternalServerError,
|
|
|
),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- return controller.Handle(&warpAdaptor{adaptor}, c, meta)
|
|
|
+ return controller.Handle(&wrapAdaptor{adaptor}, c, meta)
|
|
|
}
|
|
|
|
|
|
func relayController(m mode.Mode) RelayController {
|
|
|
@@ -220,9 +220,9 @@ func RelayHelper(c *gin.Context, meta *meta.Meta, handel RelayHandler) (*control
|
|
|
}
|
|
|
return result, false
|
|
|
}
|
|
|
- shouldRetry := shouldRetry(c, *result.Error)
|
|
|
+ shouldRetry := shouldRetry(c, result.Error)
|
|
|
if shouldRetry {
|
|
|
- hasPermission := channelHasPermission(*result.Error)
|
|
|
+ hasPermission := channelHasPermission(result.Error)
|
|
|
beyondThreshold, banExecution, err := monitor.AddRequest(
|
|
|
context.Background(),
|
|
|
meta.OriginModel,
|
|
|
@@ -236,17 +236,17 @@ func RelayHelper(c *gin.Context, meta *meta.Meta, handel RelayHandler) (*control
|
|
|
}
|
|
|
switch {
|
|
|
case banExecution:
|
|
|
- notifyChannelIssue(c, meta, "autoBanned", "Auto Banned", *result.Error)
|
|
|
+ notifyChannelIssue(c, meta, "autoBanned", "Auto Banned", result.Error)
|
|
|
case beyondThreshold:
|
|
|
- notifyChannelIssue(c, meta, "beyondThreshold", "Error Rate Beyond Threshold", *result.Error)
|
|
|
+ notifyChannelIssue(c, meta, "beyondThreshold", "Error Rate Beyond Threshold", result.Error)
|
|
|
case !hasPermission:
|
|
|
- notifyChannelIssue(c, meta, "channelHasPermission", "No Permission", *result.Error)
|
|
|
+ notifyChannelIssue(c, meta, "channelHasPermission", "No Permission", result.Error)
|
|
|
}
|
|
|
}
|
|
|
return result, shouldRetry
|
|
|
}
|
|
|
|
|
|
-func notifyChannelIssue(c *gin.Context, meta *meta.Meta, issueType string, titleSuffix string, err relaymodel.ErrorWithStatusCode) {
|
|
|
+func notifyChannelIssue(c *gin.Context, meta *meta.Meta, issueType string, titleSuffix string, err adaptor.Error) {
|
|
|
var notifyFunc func(title string, message string)
|
|
|
|
|
|
lockKey := fmt.Sprintf("%s:%d:%s", issueType, meta.Channel.ID, meta.OriginModel)
|
|
|
@@ -261,6 +261,8 @@ func notifyChannelIssue(c *gin.Context, meta *meta.Meta, issueType string, title
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ respBody, _ := err.MarshalJSON()
|
|
|
+
|
|
|
message := fmt.Sprintf(
|
|
|
"channel: %s (type: %d, type name: %s, id: %d)\nmodel: %s\nmode: %s\nstatus code: %d\ndetail: %s\nrequest id: %s",
|
|
|
meta.Channel.Name,
|
|
|
@@ -269,12 +271,12 @@ func notifyChannelIssue(c *gin.Context, meta *meta.Meta, issueType string, title
|
|
|
meta.Channel.ID,
|
|
|
meta.OriginModel,
|
|
|
meta.Mode,
|
|
|
- err.StatusCode,
|
|
|
- err.JSONOrEmpty(),
|
|
|
+ err.StatusCode(),
|
|
|
+ conv.BytesToString(respBody),
|
|
|
meta.RequestID,
|
|
|
)
|
|
|
|
|
|
- if err.StatusCode == http.StatusTooManyRequests {
|
|
|
+ if err.StatusCode() == http.StatusTooManyRequests {
|
|
|
if !trylock.Lock(lockKey, time.Minute) {
|
|
|
return
|
|
|
}
|
|
|
@@ -408,7 +410,7 @@ func relay(c *gin.Context, mode mode.Mode, relayController RelayController) {
|
|
|
// Get initial channel
|
|
|
initialChannel, err := getInitialChannel(c, requestModel, log)
|
|
|
if err != nil || initialChannel == nil || initialChannel.channel == nil {
|
|
|
- middleware.AbortLogWithMessage(c,
|
|
|
+ middleware.AbortLogWithMessageWithMode(mode, c,
|
|
|
http.StatusServiceUnavailable,
|
|
|
"the upstream load is saturated, please try again later",
|
|
|
)
|
|
|
@@ -421,7 +423,7 @@ func relay(c *gin.Context, mode mode.Mode, relayController RelayController) {
|
|
|
if billingEnabled && relayController.GetRequestPrice != nil {
|
|
|
price, err = relayController.GetRequestPrice(c, mc)
|
|
|
if err != nil {
|
|
|
- middleware.AbortLogWithMessage(c,
|
|
|
+ middleware.AbortLogWithMessageWithMode(mode, c,
|
|
|
http.StatusInternalServerError,
|
|
|
"get request price failed: "+err.Error(),
|
|
|
)
|
|
|
@@ -434,7 +436,7 @@ func relay(c *gin.Context, mode mode.Mode, relayController RelayController) {
|
|
|
if billingEnabled && relayController.GetRequestUsage != nil {
|
|
|
requestUsage, err := relayController.GetRequestUsage(c, mc)
|
|
|
if err != nil {
|
|
|
- middleware.AbortLogWithMessage(c,
|
|
|
+ middleware.AbortLogWithMessageWithMode(mode, c,
|
|
|
http.StatusInternalServerError,
|
|
|
"get request usage failed: "+err.Error(),
|
|
|
)
|
|
|
@@ -442,12 +444,10 @@ func relay(c *gin.Context, mode mode.Mode, relayController RelayController) {
|
|
|
}
|
|
|
gbc := middleware.GetGroupBalanceConsumerFromContext(c)
|
|
|
if !gbc.CheckBalance(consume.CalculateAmount(http.StatusOK, requestUsage, price)) {
|
|
|
- middleware.AbortLogWithMessage(c,
|
|
|
+ middleware.AbortLogWithMessageWithMode(mode, c,
|
|
|
http.StatusForbidden,
|
|
|
fmt.Sprintf("group (%s) balance not enough", gbc.Group),
|
|
|
- &middleware.ErrorField{
|
|
|
- Code: middleware.GroupBalanceNotEnough,
|
|
|
- },
|
|
|
+ middleware.GroupBalanceNotEnough,
|
|
|
)
|
|
|
return
|
|
|
}
|
|
|
@@ -503,8 +503,9 @@ func recordResult(
|
|
|
code := http.StatusOK
|
|
|
content := ""
|
|
|
if result.Error != nil {
|
|
|
- code = result.Error.StatusCode
|
|
|
- content = result.Error.JSONOrEmpty()
|
|
|
+ code = result.Error.StatusCode()
|
|
|
+ respBody, _ := result.Error.MarshalJSON()
|
|
|
+ content = conv.BytesToString(respBody)
|
|
|
}
|
|
|
|
|
|
var detail *model.RequestDetail
|
|
|
@@ -606,15 +607,14 @@ func getInitialChannel(c *gin.Context, modelName string, log *log.Entry) (*initi
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
-func handleRelayResult(c *gin.Context, bizErr *relaymodel.ErrorWithStatusCode, retry bool, retryTimes int) (done bool) {
|
|
|
+func handleRelayResult(c *gin.Context, bizErr adaptor.Error, retry bool, retryTimes int) (done bool) {
|
|
|
if bizErr == nil {
|
|
|
return true
|
|
|
}
|
|
|
if !retry ||
|
|
|
retryTimes == 0 ||
|
|
|
c.Request.Context().Err() != nil {
|
|
|
- bizErr.Error.Message = middleware.MessageWithRequestID(c, bizErr.Error.Message)
|
|
|
- c.JSON(bizErr.StatusCode, bizErr)
|
|
|
+ ErrorWithRequestID(c, bizErr)
|
|
|
return true
|
|
|
}
|
|
|
return false
|
|
|
@@ -636,7 +636,7 @@ func initRetryState(retryTimes int, channel *initialChannel, meta *meta.Meta, re
|
|
|
state.exhausted = true
|
|
|
}
|
|
|
|
|
|
- if !channelHasPermission(*result.Error) {
|
|
|
+ if !channelHasPermission(result.Error) {
|
|
|
state.ignoreChannelIDs = append(state.ignoreChannelIDs, int64(channel.channel.ID))
|
|
|
} else {
|
|
|
state.lastHasPermissionChannel = channel.channel
|
|
|
@@ -650,7 +650,7 @@ func retryLoop(c *gin.Context, mode mode.Mode, state *retryState, relayControlle
|
|
|
i := 0
|
|
|
|
|
|
for {
|
|
|
- lastStatusCode := state.result.Error.StatusCode
|
|
|
+ lastStatusCode := state.result.Error.StatusCode()
|
|
|
lastChannelID := state.meta.Channel.ID
|
|
|
newChannel, err := getRetryChannel(state)
|
|
|
if err == nil {
|
|
|
@@ -734,8 +734,7 @@ func retryLoop(c *gin.Context, mode mode.Mode, state *retryState, relayControlle
|
|
|
}
|
|
|
|
|
|
if state.result.Error != nil {
|
|
|
- state.result.Error.Error.Message = middleware.MessageWithRequestID(c, state.result.Error.Error.Message)
|
|
|
- c.JSON(state.result.Error.StatusCode, state.result.Error)
|
|
|
+ ErrorWithRequestID(c, state.result.Error)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -776,7 +775,7 @@ func handleRetryResult(ctx *gin.Context, retry bool, newChannel *model.Channel,
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
- hasPermission := channelHasPermission(*state.result.Error)
|
|
|
+ hasPermission := channelHasPermission(state.result.Error)
|
|
|
|
|
|
if state.exhausted {
|
|
|
if !hasPermission {
|
|
|
@@ -802,11 +801,8 @@ var channelNoRetryStatusCodesMap = map[int]struct{}{
|
|
|
}
|
|
|
|
|
|
// 仅当是channel错误时,才需要记录,用户请求参数错误时,不需要记录
|
|
|
-func shouldRetry(_ *gin.Context, relayErr relaymodel.ErrorWithStatusCode) bool {
|
|
|
- if relayErr.Error.Code == ErrInvalidChannelTypeCode {
|
|
|
- return false
|
|
|
- }
|
|
|
- _, ok := channelNoRetryStatusCodesMap[relayErr.StatusCode]
|
|
|
+func shouldRetry(_ *gin.Context, relayErr adaptor.Error) bool {
|
|
|
+ _, ok := channelNoRetryStatusCodesMap[relayErr.StatusCode()]
|
|
|
return !ok
|
|
|
}
|
|
|
|
|
|
@@ -817,11 +813,8 @@ var channelNoPermissionStatusCodesMap = map[int]struct{}{
|
|
|
http.StatusNotFound: {},
|
|
|
}
|
|
|
|
|
|
-func channelHasPermission(relayErr relaymodel.ErrorWithStatusCode) bool {
|
|
|
- if relayErr.Error.Code == ErrInvalidChannelTypeCode {
|
|
|
- return false
|
|
|
- }
|
|
|
- _, ok := channelNoPermissionStatusCodesMap[relayErr.StatusCode]
|
|
|
+func channelHasPermission(relayErr adaptor.Error) bool {
|
|
|
+ _, ok := channelNoPermissionStatusCodesMap[relayErr.StatusCode()]
|
|
|
return !ok
|
|
|
}
|
|
|
|
|
|
@@ -843,11 +836,39 @@ func relayDelay() {
|
|
|
}
|
|
|
|
|
|
func RelayNotImplemented(c *gin.Context) {
|
|
|
- c.JSON(http.StatusNotImplemented, gin.H{
|
|
|
- "error": &relaymodel.Error{
|
|
|
+ ErrorWithRequestID(c,
|
|
|
+ relaymodel.NewOpenAIError(http.StatusNotImplemented, relaymodel.OpenAIError{
|
|
|
Message: "API not implemented",
|
|
|
- Type: middleware.ErrorTypeAIPROXY,
|
|
|
+ Type: relaymodel.ErrorTypeAIPROXY,
|
|
|
Code: "api_not_implemented",
|
|
|
- },
|
|
|
- })
|
|
|
+ }),
|
|
|
+ )
|
|
|
+}
|
|
|
+
|
|
|
+func ErrorWithRequestID(c *gin.Context, relayErr adaptor.Error) {
|
|
|
+ requestID := middleware.GetRequestID(c)
|
|
|
+ if requestID == "" {
|
|
|
+ c.JSON(relayErr.StatusCode(), relayErr)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ log := middleware.GetLogger(c)
|
|
|
+ data, err := relayErr.MarshalJSON()
|
|
|
+ if err != nil {
|
|
|
+ log.Errorf("marshal error failed: %+v", err)
|
|
|
+ c.JSON(relayErr.StatusCode(), relayErr)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ node, err := sonic.Get(data)
|
|
|
+ if err != nil {
|
|
|
+ log.Errorf("get node failed: %+v", err)
|
|
|
+ c.JSON(relayErr.StatusCode(), relayErr)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ _, err = node.Set("aiproxy", ast.NewString(requestID))
|
|
|
+ if err != nil {
|
|
|
+ log.Errorf("set request id failed: %+v", err)
|
|
|
+ c.JSON(relayErr.StatusCode(), relayErr)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ c.JSON(relayErr.StatusCode(), &node)
|
|
|
}
|