Просмотр исходного кода

feat: log user and metadata and retry log table (#178)

* feat: log user and metadata and retry log table

* fix: record log hour

* fix: ci lint
zijiren 7 месяцев назад
Родитель
Сommit
6bd1ce1478

+ 1 - 1
README.md

@@ -105,7 +105,7 @@ docker-compose up -d
 ### Logging Configuration
 
 - `LOG_STORAGE_HOURS`: Hours to store logs (0 means unlimited), default is `0`
-- `LOG_CONTENT_STORAGE_HOURS`: Hours to store log `content` `ip` `endpoint` `ttfb_milliseconds`, default is `0`
+- `RETRY_LOG_STORAGE_HOURS`: Hours to store retry log, default is `0`
 - `SAVE_ALL_LOG_DETAIL`: Save all log details, default is `false`
 - `LOG_DETAIL_REQUEST_BODY_MAX_SIZE`: Maximum size for request body in log details, default is `128KB`
 - `LOG_DETAIL_RESPONSE_BODY_MAX_SIZE`: Maximum size for response body in log details, default is `128KB`

+ 1 - 1
README.zh.md

@@ -106,7 +106,7 @@ docker-compose up -d
 ### 日志配置
 
 - `LOG_STORAGE_HOURS`: 日志存储时间(0 表示不限),默认 `0`
-- `LOG_CONTENT_STORAGE_HOURS`: 日志 `content` `ip` `endpoint` `ttfb_milliseconds` 存储时间(0 表示不限),默认 `0`
+- `RETRY_LOG_STORAGE_HOURS`: 重试日志存储时间(0 表示不限),默认 `0`
 - `SAVE_ALL_LOG_DETAIL`: 保存所有日志详情,默认 `false` 则只保存错误日志
 - `LOG_DETAIL_REQUEST_BODY_MAX_SIZE`: 日志详情请求体最大大小,默认 `128KB`
 - `LOG_DETAIL_RESPONSE_BODY_MAX_SIZE`: 日志详情响应体最大大小,默认 `128KB`

+ 6 - 6
core/common/config/config.go

@@ -24,7 +24,7 @@ var (
 var (
 	disableServe                 atomic.Bool
 	logStorageHours              int64 // default 0 means no limit
-	logContentStorageHours       int64 // default 0 means no limit
+	retryLogStorageHours         int64 // default 0 means no limit
 	saveAllLogDetail             atomic.Bool
 	logDetailRequestBodyMaxSize  int64 = 128 * 1024 // 128KB
 	logDetailResponseBodyMaxSize int64 = 128 * 1024 // 128KB
@@ -116,13 +116,13 @@ func SetLogStorageHours(hours int64) {
 	atomic.StoreInt64(&logStorageHours, hours)
 }
 
-func GetLogContentStorageHours() int64 {
-	return atomic.LoadInt64(&logContentStorageHours)
+func GetRetryLogStorageHours() int64 {
+	return atomic.LoadInt64(&retryLogStorageHours)
 }
 
-func SetLogContentStorageHours(hours int64) {
-	hours = env.Int64("LOG_CONTENT_STORAGE_HOURS", hours)
-	atomic.StoreInt64(&logContentStorageHours, hours)
+func SetRetryLogStorageHours(hours int64) {
+	hours = env.Int64("RETRY_LOG_STORAGE_HOURS", hours)
+	atomic.StoreInt64(&retryLogStorageHours, hours)
 }
 
 func GetLogDetailStorageHours() int64 {

+ 8 - 9
core/common/consume/consume.go

@@ -7,7 +7,6 @@ import (
 	"time"
 
 	"github.com/labring/aiproxy/core/common/balance"
-	"github.com/labring/aiproxy/core/common/config"
 	"github.com/labring/aiproxy/core/common/notify"
 	"github.com/labring/aiproxy/core/model"
 	"github.com/labring/aiproxy/core/relay/meta"
@@ -33,6 +32,8 @@ func AsyncConsume(
 	retryTimes int,
 	requestDetail *model.RequestDetail,
 	downstreamResult bool,
+	user string,
+	metadata map[string]string,
 ) {
 	consumeWaitGroup.Add(1)
 	defer func() {
@@ -55,6 +56,8 @@ func AsyncConsume(
 		retryTimes,
 		requestDetail,
 		downstreamResult,
+		user,
+		metadata,
 	)
 }
 
@@ -71,18 +74,12 @@ func Consume(
 	retryTimes int,
 	requestDetail *model.RequestDetail,
 	downstreamResult bool,
+	user string,
+	metadata map[string]string,
 ) {
 	amount := CalculateAmount(code, usage, modelPrice)
 	amount = consumeAmount(ctx, amount, postGroupConsumer, meta)
 
-	if requestDetail != nil && config.GetLogContentStorageHours() < 0 {
-		requestDetail = nil
-	}
-
-	if requestDetail == nil && config.GetLogStorageHours() < 0 {
-		return
-	}
-
 	err := recordConsume(
 		meta,
 		code,
@@ -95,6 +92,8 @@ func Consume(
 		amount,
 		retryTimes,
 		downstreamResult,
+		user,
+		metadata,
 	)
 	if err != nil {
 		log.Error("error batch record consume: " + err.Error())

+ 5 - 1
core/common/consume/record.go

@@ -19,8 +19,10 @@ func recordConsume(
 	amount float64,
 	retryTimes int,
 	downstreamResult bool,
+	user string,
+	metadata map[string]string,
 ) error {
-	return model.BatchRecordConsume(
+	return model.BatchRecordLogs(
 		meta.RequestID,
 		meta.RequestAt,
 		meta.RetryAt,
@@ -41,5 +43,7 @@ func recordConsume(
 		usage,
 		modelPrice,
 		amount,
+		user,
+		metadata,
 	)
 }

+ 20 - 20
core/controller/log.go

@@ -29,17 +29,17 @@ func parseTimeRange(c *gin.Context) (startTime, endTime time.Time) {
 }
 
 func parseCommonParams(c *gin.Context) (params struct {
-	tokenName  string
-	modelName  string
-	channelID  int
-	tokenID    int
-	order      string
-	requestID  string
-	codeType   string
-	code       int
-	withBody   bool
-	ip         string
-	resultOnly bool
+	tokenName string
+	modelName string
+	channelID int
+	tokenID   int
+	order     string
+	requestID string
+	codeType  string
+	code      int
+	withBody  bool
+	ip        string
+	user      string
 },
 ) {
 	params.tokenName = c.Query("token_name")
@@ -52,7 +52,7 @@ func parseCommonParams(c *gin.Context) (params struct {
 	params.code, _ = strconv.Atoi(c.Query("code"))
 	params.withBody, _ = strconv.ParseBool(c.Query("with_body"))
 	params.ip = c.Query("ip")
-	params.resultOnly, _ = strconv.ParseBool(c.Query("result_only"))
+	params.user = c.Query("user")
 	return
 }
 
@@ -78,7 +78,7 @@ func parseCommonParams(c *gin.Context) (params struct {
 //	@Param			code			query		int		false	"Status code"
 //	@Param			with_body		query		bool	false	"With body"
 //	@Param			ip				query		string	false	"IP"
-//	@Param			result_only		query		bool	false	"Result only"
+//	@Param			user			query		string	false	"User"
 //	@Success		200				{object}	middleware.APIResponse{data=model.GetLogsResult}
 //	@Router			/api/logs [get]
 func GetLogs(c *gin.Context) {
@@ -101,9 +101,9 @@ func GetLogs(c *gin.Context) {
 		params.code,
 		params.withBody,
 		params.ip,
+		params.user,
 		page,
 		perPage,
-		params.resultOnly,
 	)
 	if err != nil {
 		middleware.ErrorResponse(c, http.StatusOK, err.Error())
@@ -134,7 +134,7 @@ func GetLogs(c *gin.Context) {
 //	@Param			code			query		int		false	"Status code"
 //	@Param			with_body		query		bool	false	"With body"
 //	@Param			ip				query		string	false	"IP"
-//	@Param			result_only		query		bool	false	"Result only"
+//	@Param			user			query		string	false	"User"
 //	@Success		200				{object}	middleware.APIResponse{data=model.GetGroupLogsResult}
 //	@Router			/api/log/{group} [get]
 func GetGroupLogs(c *gin.Context) {
@@ -162,9 +162,9 @@ func GetGroupLogs(c *gin.Context) {
 		params.code,
 		params.withBody,
 		params.ip,
+		params.user,
 		page,
 		perPage,
-		params.resultOnly,
 	)
 	if err != nil {
 		middleware.ErrorResponse(c, http.StatusOK, err.Error())
@@ -196,7 +196,7 @@ func GetGroupLogs(c *gin.Context) {
 //	@Param			code			query		int		false	"Status code"
 //	@Param			with_body		query		bool	false	"With body"
 //	@Param			ip				query		string	false	"IP"
-//	@Param			result_only		query		bool	false	"Result only"
+//	@Param			user			query		string	false	"User"
 //	@Success		200				{object}	middleware.APIResponse{data=model.GetLogsResult}
 //	@Router			/api/logs/search [get]
 func SearchLogs(c *gin.Context) {
@@ -222,9 +222,9 @@ func SearchLogs(c *gin.Context) {
 		params.code,
 		params.withBody,
 		params.ip,
+		params.user,
 		page,
 		perPage,
-		params.resultOnly,
 	)
 	if err != nil {
 		middleware.ErrorResponse(c, http.StatusOK, err.Error())
@@ -256,7 +256,7 @@ func SearchLogs(c *gin.Context) {
 //	@Param			code			query		int		false	"Status code"
 //	@Param			with_body		query		bool	false	"With body"
 //	@Param			ip				query		string	false	"IP"
-//	@Param			result_only		query		bool	false	"Result only"
+//	@Param			user			query		string	false	"User"
 //	@Success		200				{object}	middleware.APIResponse{data=model.GetGroupLogsResult}
 //	@Router			/api/log/{group}/search [get]
 func SearchGroupLogs(c *gin.Context) {
@@ -286,9 +286,9 @@ func SearchGroupLogs(c *gin.Context) {
 		params.code,
 		params.withBody,
 		params.ip,
+		params.user,
 		page,
 		perPage,
-		params.resultOnly,
 	)
 	if err != nil {
 		middleware.ErrorResponse(c, http.StatusOK, err.Error())

+ 52 - 5
core/controller/relay-controller.go

@@ -355,7 +355,16 @@ func relay(c *gin.Context, mode mode.Mode, relayController RelayController) {
 		retryTimes = int(mc.RetryTimes)
 	}
 	if handleRelayResult(c, result.Error, retry, retryTimes) {
-		recordResult(c, meta, price, result, 0, true)
+		recordResult(
+			c,
+			meta,
+			price,
+			result,
+			0,
+			true,
+			middleware.GetRequestUser(c),
+			middleware.GetRequestMetadata(c),
+		)
 		return
 	}
 
@@ -373,7 +382,16 @@ func relay(c *gin.Context, mode mode.Mode, relayController RelayController) {
 }
 
 // recordResult records the consumption for the final result
-func recordResult(c *gin.Context, meta *meta.Meta, price model.Price, result *controller.HandleResult, retryTimes int, downstreamResult bool) {
+func recordResult(
+	c *gin.Context,
+	meta *meta.Meta,
+	price model.Price,
+	result *controller.HandleResult,
+	retryTimes int,
+	downstreamResult bool,
+	user string,
+	metadata map[string]string,
+) {
 	code := http.StatusOK
 	content := ""
 	if result.Error != nil {
@@ -416,6 +434,8 @@ func recordResult(c *gin.Context, meta *meta.Meta, price model.Price, result *co
 		retryTimes,
 		detail,
 		downstreamResult,
+		user,
+		metadata,
 	)
 }
 
@@ -532,13 +552,31 @@ func retryLoop(c *gin.Context, mode mode.Mode, state *retryState, relayControlle
 			}
 			// when the last request has not recorded the result, record the result
 			if state.meta != nil && state.result != nil {
-				recordResult(c, state.meta, state.price, state.result, i, true)
+				recordResult(
+					c,
+					state.meta,
+					state.price,
+					state.result,
+					i,
+					true,
+					middleware.GetRequestUser(c),
+					middleware.GetRequestMetadata(c),
+				)
 			}
 			break
 		}
 		// when the last request has not recorded the result, record the result
 		if state.meta != nil && state.result != nil {
-			recordResult(c, state.meta, state.price, state.result, i, false)
+			recordResult(
+				c,
+				state.meta,
+				state.price,
+				state.result,
+				i,
+				false,
+				middleware.GetRequestUser(c),
+				middleware.GetRequestMetadata(c),
+			)
 			state.meta = nil
 			state.result = nil
 		}
@@ -569,7 +607,16 @@ func retryLoop(c *gin.Context, mode mode.Mode, state *retryState, relayControlle
 
 		done := handleRetryResult(c, retry, newChannel, state)
 		if done || i == state.retryTimes-1 {
-			recordResult(c, state.meta, state.price, state.result, i+1, true)
+			recordResult(
+				c,
+				state.meta,
+				state.price,
+				state.result,
+				i+1,
+				true,
+				middleware.GetRequestUser(c),
+				middleware.GetRequestMetadata(c),
+			)
 			break
 		}
 

+ 22 - 15
core/docs/docs.go

@@ -2246,9 +2246,9 @@ const docTemplate = `{
                         "in": "query"
                     },
                     {
-                        "type": "boolean",
-                        "description": "Result only",
-                        "name": "result_only",
+                        "type": "string",
+                        "description": "User",
+                        "name": "user",
                         "in": "query"
                     }
                 ],
@@ -2442,9 +2442,9 @@ const docTemplate = `{
                         "in": "query"
                     },
                     {
-                        "type": "boolean",
-                        "description": "Result only",
-                        "name": "result_only",
+                        "type": "string",
+                        "description": "User",
+                        "name": "user",
                         "in": "query"
                     }
                 ],
@@ -2675,9 +2675,9 @@ const docTemplate = `{
                         "in": "query"
                     },
                     {
-                        "type": "boolean",
-                        "description": "Result only",
-                        "name": "result_only",
+                        "type": "string",
+                        "description": "User",
+                        "name": "user",
                         "in": "query"
                     }
                 ],
@@ -3037,9 +3037,9 @@ const docTemplate = `{
                         "in": "query"
                     },
                     {
-                        "type": "boolean",
-                        "description": "Result only",
-                        "name": "result_only",
+                        "type": "string",
+                        "description": "User",
+                        "name": "user",
                         "in": "query"
                     }
                 ],
@@ -8894,9 +8894,6 @@ const docTemplate = `{
                 "created_at": {
                     "type": "string"
                 },
-                "downstream_result": {
-                    "type": "boolean"
-                },
                 "endpoint": {
                     "type": "string"
                 },
@@ -8909,6 +8906,12 @@ const docTemplate = `{
                 "ip": {
                     "type": "string"
                 },
+                "metadata": {
+                    "type": "object",
+                    "additionalProperties": {
+                        "type": "string"
+                    }
+                },
                 "mode": {
                     "type": "integer"
                 },
@@ -8947,6 +8950,10 @@ const docTemplate = `{
                 },
                 "used_amount": {
                     "type": "number"
+                },
+                "user": {
+                    "description": "https://platform.openai.com/docs/guides/safety-best-practices#end-user-ids",
+                    "type": "string"
                 }
             }
         },

+ 22 - 15
core/docs/swagger.json

@@ -2237,9 +2237,9 @@
                         "in": "query"
                     },
                     {
-                        "type": "boolean",
-                        "description": "Result only",
-                        "name": "result_only",
+                        "type": "string",
+                        "description": "User",
+                        "name": "user",
                         "in": "query"
                     }
                 ],
@@ -2433,9 +2433,9 @@
                         "in": "query"
                     },
                     {
-                        "type": "boolean",
-                        "description": "Result only",
-                        "name": "result_only",
+                        "type": "string",
+                        "description": "User",
+                        "name": "user",
                         "in": "query"
                     }
                 ],
@@ -2666,9 +2666,9 @@
                         "in": "query"
                     },
                     {
-                        "type": "boolean",
-                        "description": "Result only",
-                        "name": "result_only",
+                        "type": "string",
+                        "description": "User",
+                        "name": "user",
                         "in": "query"
                     }
                 ],
@@ -3028,9 +3028,9 @@
                         "in": "query"
                     },
                     {
-                        "type": "boolean",
-                        "description": "Result only",
-                        "name": "result_only",
+                        "type": "string",
+                        "description": "User",
+                        "name": "user",
                         "in": "query"
                     }
                 ],
@@ -8885,9 +8885,6 @@
                 "created_at": {
                     "type": "string"
                 },
-                "downstream_result": {
-                    "type": "boolean"
-                },
                 "endpoint": {
                     "type": "string"
                 },
@@ -8900,6 +8897,12 @@
                 "ip": {
                     "type": "string"
                 },
+                "metadata": {
+                    "type": "object",
+                    "additionalProperties": {
+                        "type": "string"
+                    }
+                },
                 "mode": {
                     "type": "integer"
                 },
@@ -8938,6 +8941,10 @@
                 },
                 "used_amount": {
                     "type": "number"
+                },
+                "user": {
+                    "description": "https://platform.openai.com/docs/guides/safety-best-practices#end-user-ids",
+                    "type": "string"
                 }
             }
         },

+ 19 - 14
core/docs/swagger.yaml

@@ -1065,8 +1065,6 @@ definitions:
         type: string
       created_at:
         type: string
-      downstream_result:
-        type: boolean
       endpoint:
         type: string
       group:
@@ -1075,6 +1073,10 @@ definitions:
         type: integer
       ip:
         type: string
+      metadata:
+        additionalProperties:
+          type: string
+        type: object
       mode:
         type: integer
       model:
@@ -1101,6 +1103,9 @@ definitions:
         $ref: '#/definitions/github_com_labring_aiproxy_core_model.Usage'
       used_amount:
         type: number
+      user:
+        description: https://platform.openai.com/docs/guides/safety-best-practices#end-user-ids
+        type: string
     type: object
   model.MCPOpenAPIConfig:
     properties:
@@ -2934,10 +2939,10 @@ paths:
         in: query
         name: ip
         type: string
-      - description: Result only
+      - description: User
         in: query
-        name: result_only
-        type: boolean
+        name: user
+        type: string
       produces:
       - application/json
       responses:
@@ -3056,10 +3061,10 @@ paths:
         in: query
         name: ip
         type: string
-      - description: Result only
+      - description: User
         in: query
-        name: result_only
-        type: boolean
+        name: user
+        type: string
       produces:
       - application/json
       responses:
@@ -3224,10 +3229,10 @@ paths:
         in: query
         name: ip
         type: string
-      - description: Result only
+      - description: User
         in: query
-        name: result_only
-        type: boolean
+        name: user
+        type: string
       produces:
       - application/json
       responses:
@@ -3420,10 +3425,10 @@ paths:
         in: query
         name: ip
         type: string
-      - description: Result only
+      - description: User
         in: query
-        name: result_only
-        type: boolean
+        name: user
+        type: string
       produces:
       - application/json
       responses:

+ 11 - 9
core/middleware/ctxkey.go

@@ -1,13 +1,15 @@
 package middleware
 
 const (
-	Channel      = "channel"
-	Group        = "group"
-	Token        = "token"
-	GroupBalance = "group_balance"
-	RequestModel = "request_model"
-	RequestAt    = "request_at"
-	RequestID    = "request_id"
-	ModelCaches  = "model_caches"
-	ModelConfig  = "model_config"
+	Channel         = "channel"
+	Group           = "group"
+	Token           = "token"
+	GroupBalance    = "group_balance"
+	RequestModel    = "request_model"
+	RequestUser     = "request_user"
+	RequestMetadata = "request_metadata"
+	RequestAt       = "request_at"
+	RequestID       = "request_id"
+	ModelCaches     = "model_caches"
+	ModelConfig     = "model_config"
 )

+ 95 - 4
core/middleware/distributor.go

@@ -358,6 +358,26 @@ func distribute(c *gin.Context, mode mode.Mode) {
 		}
 	}
 
+	user, err := getRequestUser(c, mode)
+	if err != nil {
+		AbortLogWithMessage(c, http.StatusInternalServerError, err.Error(), &ErrorField{
+			Type: "invalid_request_error",
+			Code: "get_request_user_error",
+		})
+		return
+	}
+	c.Set(RequestUser, user)
+
+	metadata, err := getRequestMetadata(c, mode)
+	if err != nil {
+		AbortLogWithMessage(c, http.StatusInternalServerError, err.Error(), &ErrorField{
+			Type: "invalid_request_error",
+			Code: "get_request_metadata_error",
+		})
+		return
+	}
+	c.Set(RequestMetadata, metadata)
+
 	if err := checkGroupModelRPMAndTPM(c, group, mc); err != nil {
 		errMsg := err.Error()
 		consume.AsyncConsume(
@@ -372,6 +392,8 @@ func distribute(c *gin.Context, mode mode.Mode) {
 			0,
 			nil,
 			true,
+			user,
+			metadata,
 		)
 		AbortLogWithMessage(c, http.StatusTooManyRequests, errMsg, &ErrorField{
 			Type: "invalid_request_error",
@@ -387,6 +409,14 @@ func GetRequestModel(c *gin.Context) string {
 	return c.GetString(RequestModel)
 }
 
+func GetRequestUser(c *gin.Context) string {
+	return c.GetString(RequestUser)
+}
+
+func GetRequestMetadata(c *gin.Context) map[string]string {
+	return c.GetStringMapString(RequestMetadata)
+}
+
 func GetModelConfig(c *gin.Context) *model.ModelConfig {
 	return c.MustGet(ModelConfig).(*model.ModelConfig)
 }
@@ -421,10 +451,7 @@ func NewMetaByContext(c *gin.Context,
 	)
 }
 
-type ModelRequest struct {
-	Model string `form:"model" json:"model"`
-}
-
+// https://platform.openai.com/docs/api-reference/chat
 func getRequestModel(c *gin.Context, m mode.Mode) (string, error) {
 	path := c.Request.URL.Path
 	switch {
@@ -464,3 +491,67 @@ func GetModelFromJSON(body []byte) (string, error) {
 	}
 	return node.String()
 }
+
+// https://platform.openai.com/docs/api-reference/chat
+func getRequestUser(c *gin.Context, m mode.Mode) (string, error) {
+	switch m {
+	case mode.ChatCompletions,
+		mode.Completions,
+		mode.Embeddings,
+		mode.ImagesGenerations,
+		mode.ImagesEdits,
+		mode.AudioSpeech,
+		mode.Rerank,
+		mode.Anthropic:
+		body, err := common.GetRequestBody(c.Request)
+		if err != nil {
+			return "", fmt.Errorf("get request model failed: %w", err)
+		}
+		return GetRequestUserFromJSON(body)
+	default:
+		return "", nil
+	}
+}
+
+func GetRequestUserFromJSON(body []byte) (string, error) {
+	node, err := sonic.GetWithOptions(body, ast.SearchOptions{}, "user")
+	if err != nil {
+		return "", fmt.Errorf("get request user failed: %w", err)
+	}
+	if node.Exists() {
+		return node.String()
+	}
+	return "", nil
+}
+
+func getRequestMetadata(c *gin.Context, m mode.Mode) (map[string]string, error) {
+	switch m {
+	case mode.ChatCompletions,
+		mode.Completions,
+		mode.Embeddings,
+		mode.ImagesGenerations,
+		mode.ImagesEdits,
+		mode.AudioSpeech,
+		mode.Rerank,
+		mode.Anthropic:
+		body, err := common.GetRequestBody(c.Request)
+		if err != nil {
+			return nil, fmt.Errorf("get request metadata failed: %w", err)
+		}
+		return GetRequestMetadataFromJSON(body)
+	default:
+		return nil, nil
+	}
+}
+
+type RequestWithMetadata struct {
+	Metadata map[string]string `json:"metadata,omitempty"`
+}
+
+func GetRequestMetadataFromJSON(body []byte) (map[string]string, error) {
+	var requestWithMetadata RequestWithMetadata
+	if err := sonic.Unmarshal(body, &requestWithMetadata); err != nil {
+		return nil, fmt.Errorf("get request metadata failed: %w", err)
+	}
+	return requestWithMetadata.Metadata, nil
+}

+ 5 - 1
core/middleware/distributor_test.go

@@ -9,8 +9,12 @@ import (
 	"github.com/labring/aiproxy/core/middleware"
 )
 
+type ModelRequest struct {
+	Model string `form:"model" json:"model"`
+}
+
 func StdGetModelFromJSON(body []byte) (string, error) {
-	var modelRequest middleware.ModelRequest
+	var modelRequest ModelRequest
 	err := json.Unmarshal(body, &modelRequest)
 	if err != nil {
 		return "", err

+ 50 - 25
core/model/batch.go

@@ -7,6 +7,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/labring/aiproxy/core/common/config"
 	"github.com/labring/aiproxy/core/common/notify"
 	"github.com/shopspring/decimal"
 )
@@ -191,7 +192,7 @@ func processSummaryUpdates(wg *sync.WaitGroup) {
 	}
 }
 
-func BatchRecordConsume(
+func BatchRecordLogs(
 	requestID string,
 	requestAt time.Time,
 	retryAt time.Time,
@@ -212,31 +213,55 @@ func BatchRecordConsume(
 	usage Usage,
 	modelPrice Price,
 	amount float64,
-) error {
+	user string,
+	metadata map[string]string,
+) (err error) {
 	now := time.Now()
-	err := RecordConsumeLog(
-		requestID,
-		now,
-		requestAt,
-		retryAt,
-		firstByteAt,
-		group,
-		code,
-		channelID,
-		modelName,
-		tokenID,
-		tokenName,
-		endpoint,
-		content,
-		mode,
-		ip,
-		retryTimes,
-		requestDetail,
-		downstreamResult,
-		usage,
-		modelPrice,
-		amount,
-	)
+
+	if downstreamResult {
+		if config.GetLogStorageHours() >= 0 {
+			err = RecordConsumeLog(
+				requestID,
+				now,
+				requestAt,
+				retryAt,
+				firstByteAt,
+				group,
+				code,
+				channelID,
+				modelName,
+				tokenID,
+				tokenName,
+				endpoint,
+				content,
+				mode,
+				ip,
+				retryTimes,
+				requestDetail,
+				usage,
+				modelPrice,
+				amount,
+				user,
+				metadata,
+			)
+		}
+	} else {
+		if config.GetRetryLogStorageHours() >= 0 {
+			err = RecordRetryLog(
+				requestID,
+				now,
+				requestAt,
+				retryAt,
+				firstByteAt,
+				code,
+				channelID,
+				modelName,
+				mode,
+				retryTimes,
+				requestDetail,
+			)
+		}
+	}
 
 	amountDecimal := decimal.NewFromFloat(amount)
 

+ 71 - 102
core/model/log.go

@@ -132,7 +132,7 @@ func (u *Usage) Add(other *Usage) {
 
 type Log struct {
 	RequestDetail    *RequestDetail  `gorm:"foreignKey:LogID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE;" json:"request_detail,omitempty"`
-	RequestAt        time.Time       `gorm:"index"                                                          json:"request_at"`
+	RequestAt        time.Time       `json:"request_at"`
 	RetryAt          time.Time       `json:"retry_at,omitempty"`
 	TTFBMilliseconds ZeroNullInt64   `json:"ttfb_milliseconds,omitempty"`
 	CreatedAt        time.Time       `gorm:"autoCreateTime;index"                                           json:"created_at"`
@@ -149,10 +149,12 @@ type Log struct {
 	Mode             int             `json:"mode,omitempty"`
 	IP               EmptyNullString `gorm:"index:,where:ip is not null"                                    json:"ip,omitempty"`
 	RetryTimes       ZeroNullInt64   `json:"retry_times,omitempty"`
-	DownstreamResult bool            `json:"downstream_result,omitempty"`
 	Price            Price           `gorm:"embedded"                                                       json:"price,omitempty"`
 	Usage            Usage           `gorm:"embedded"                                                       json:"usage,omitempty"`
 	UsedAmount       float64         `json:"used_amount,omitempty"`
+	// https://platform.openai.com/docs/guides/safety-best-practices#end-user-ids
+	User     EmptyNullString   `json:"user,omitempty"`
+	Metadata map[string]string `gorm:"serializer:fastjson;type:text" json:"metadata,omitempty"`
 }
 
 func CreateLogIndexes(db *gorm.DB) error {
@@ -179,20 +181,20 @@ func CreateLogIndexes(db *gorm.DB) error {
 	} else {
 		indexes = []string{
 			// used by global search logs
-			"CREATE INDEX IF NOT EXISTS idx_model_creat ON logs (model, created_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_model_creat ON logs (model, created_at DESC) INCLUDE (code)",
 			// used by global search logs
-			"CREATE INDEX IF NOT EXISTS idx_channel_creat ON logs (channel_id, created_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_channel_creat ON logs (channel_id, created_at DESC) INCLUDE (code)",
 			// used by global search logs
-			"CREATE INDEX IF NOT EXISTS idx_channel_model_creat ON logs (channel_id, model, created_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_channel_model_creat ON logs (channel_id, model, created_at DESC) INCLUDE (code)",
 
 			// used by search group logs
-			"CREATE INDEX IF NOT EXISTS idx_group_creat ON logs (group_id, created_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_group_creat ON logs (group_id, created_at DESC) INCLUDE (code)",
 			// used by search group logs
-			"CREATE INDEX IF NOT EXISTS idx_group_token_creat ON logs (group_id, token_name, created_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_group_token_creat ON logs (group_id, token_name, created_at DESC) INCLUDE (code)",
 			// used by search group logs
-			"CREATE INDEX IF NOT EXISTS idx_group_model_creat ON logs (group_id, model, created_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_group_model_creat ON logs (group_id, model, created_at DESC) INCLUDE (code)",
 			// used by search group logs
-			"CREATE INDEX IF NOT EXISTS idx_group_token_model_creat ON logs (group_id, token_name, model, created_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_group_token_model_creat ON logs (group_id, token_name, model, created_at DESC) INCLUDE (code)",
 		}
 	}
 
@@ -271,18 +273,27 @@ func GetGroupLogDetail(logID int, group string) (*RequestDetail, error) {
 
 const defaultCleanLogBatchSize = 5000
 
-func CleanLog(batchSize int, optimize bool) error {
-	err := cleanLog(batchSize, optimize)
+func CleanLog(batchSize int, optimize bool) (err error) {
+	err = cleanLog(batchSize)
 	if err != nil {
 		return err
 	}
-	return cleanLogDetail(batchSize, optimize)
+	err = cleanLogDetail(batchSize)
+	if err != nil {
+		return err
+	}
+
+	if optimize {
+		return optimizeLog()
+	}
+	return nil
 }
 
-func cleanLog(batchSize int, optimize bool) error {
+func cleanLog(batchSize int) error {
 	if batchSize <= 0 {
 		batchSize = defaultCleanLogBatchSize
 	}
+
 	logStorageHours := config.GetLogStorageHours()
 	if logStorageHours != 0 {
 		subQuery := LogDB.
@@ -303,57 +314,27 @@ func cleanLog(batchSize int, optimize bool) error {
 		}
 	}
 
-	logContentStorageHours := config.GetLogContentStorageHours()
-	if logContentStorageHours == 0 {
-		if optimize {
-			return optimizeLog()
-		}
-		return nil
-	}
-
-	// Find the minimum ID that meets our criteria
-	var id int64
-	err := LogDB.
-		Model(&Log{}).
-		Where(
-			"created_at < ?",
-			time.Now().Truncate(time.Hour).Add(-time.Duration(logContentStorageHours)*time.Hour),
-		).
-		Where("content IS NOT NULL").
-		Order("created_at DESC").
-		Limit(1).
-		Select("id").
-		Scan(&id).Error
-
-	if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
-		return err
-	}
-
-	if id > 0 {
-		// Process in batches based on ID range
-		err = LogDB.
-			Model(&Log{}).
-			Session(&gorm.Session{SkipDefaultTransaction: true}).
+	retryLogStorageHours := config.GetRetryLogStorageHours()
+	if retryLogStorageHours != 0 {
+		subQuery := LogDB.
+			Model(&RetryLog{}).
 			Where(
-				"id BETWEEN ? AND ? AND content IS NOT NULL",
-				id-int64(batchSize),
-				id,
+				"created_at < ?",
+				time.Now().Add(-time.Duration(retryLogStorageHours)*time.Hour),
 			).
-			UpdateColumns(map[string]any{
-				"content":           gorm.Expr("NULL"),
-				"ip":                gorm.Expr("NULL"),
-				"endpoint":          gorm.Expr("NULL"),
-				"ttfb_milliseconds": gorm.Expr("NULL"),
-			}).Error
-	}
-	if err != nil {
-		return err
-	}
-	if !optimize {
-		return nil
+			Limit(batchSize).
+			Select("id")
+
+		err := LogDB.
+			Session(&gorm.Session{SkipDefaultTransaction: true}).
+			Where("id IN (?)", subQuery).
+			Delete(&RetryLog{}).Error
+		if err != nil {
+			return err
+		}
 	}
 
-	return optimizeLog()
+	return nil
 }
 
 func optimizeLog() error {
@@ -368,7 +349,7 @@ func optimizeLog() error {
 	return nil
 }
 
-func cleanLogDetail(batchSize int, optimize bool) error {
+func cleanLogDetail(batchSize int) error {
 	detailStorageHours := config.GetLogDetailStorageHours()
 	if detailStorageHours <= 0 {
 		return nil
@@ -393,22 +374,7 @@ func cleanLogDetail(batchSize int, optimize bool) error {
 	if err != nil {
 		return err
 	}
-	if !optimize {
-		return nil
-	}
 
-	return optimizeLogDetail()
-}
-
-func optimizeLogDetail() error {
-	switch {
-	case common.UsingPostgreSQL:
-		return LogDB.Exec("VACUUM ANALYZE request_details").Error
-	case common.UsingMySQL:
-		return LogDB.Exec("OPTIMIZE TABLE request_details").Error
-	case common.UsingSQLite:
-		return LogDB.Exec("VACUUM").Error
-	}
 	return nil
 }
 
@@ -430,10 +396,11 @@ func RecordConsumeLog(
 	ip string,
 	retryTimes int,
 	requestDetail *RequestDetail,
-	downstreamResult bool,
 	usage Usage,
 	modelPrice Price,
 	amount float64,
+	user string,
+	metadata map[string]string,
 ) error {
 	if createAt.IsZero() {
 		createAt = time.Now()
@@ -462,10 +429,11 @@ func RecordConsumeLog(
 		Content:          EmptyNullString(content),
 		RetryTimes:       ZeroNullInt64(retryTimes),
 		RequestDetail:    requestDetail,
-		DownstreamResult: downstreamResult,
 		Price:            modelPrice,
 		Usage:            usage,
 		UsedAmount:       amount,
+		User:             EmptyNullString(user),
+		Metadata:         metadata,
 	}
 	return LogDB.Create(log).Error
 }
@@ -517,7 +485,7 @@ func buildGetLogsQuery(
 	codeType CodeType,
 	code int,
 	ip string,
-	resultOnly bool,
+	user string,
 ) *gorm.DB {
 	tx := LogDB.Model(&Log{})
 
@@ -552,10 +520,6 @@ func buildGetLogsQuery(
 		tx = tx.Where("created_at <= ?", endTimestamp)
 	}
 
-	if resultOnly {
-		tx = tx.Where("downstream_result = true")
-	}
-
 	switch codeType {
 	case CodeTypeSuccess:
 		tx = tx.Where("code = 200")
@@ -570,6 +534,11 @@ func buildGetLogsQuery(
 	if tokenID != 0 {
 		tx = tx.Where("token_id = ?", tokenID)
 	}
+
+	if user != "" {
+		tx = tx.Where("user = ?", user)
+	}
+
 	return tx
 }
 
@@ -587,9 +556,9 @@ func getLogs(
 	code int,
 	withBody bool,
 	ip string,
+	user string,
 	page int,
 	perPage int,
-	resultOnly bool,
 ) (int64, []*Log, error) {
 	var total int64
 	var logs []*Log
@@ -609,7 +578,7 @@ func getLogs(
 			codeType,
 			code,
 			ip,
-			resultOnly,
+			user,
 		).Count(&total).Error
 	})
 
@@ -626,7 +595,7 @@ func getLogs(
 			codeType,
 			code,
 			ip,
-			resultOnly,
+			user,
 		)
 		if withBody {
 			query = query.Preload("RequestDetail")
@@ -665,9 +634,9 @@ func GetLogs(
 	code int,
 	withBody bool,
 	ip string,
+	user string,
 	page int,
 	perPage int,
-	resultOnly bool,
 ) (*GetLogsResult, error) {
 	var total int64
 	var logs []*Log
@@ -697,9 +666,9 @@ func GetLogs(
 			code,
 			withBody,
 			ip,
+			user,
 			page,
 			perPage,
-			resultOnly,
 		)
 		return err
 	})
@@ -731,9 +700,9 @@ func GetGroupLogs(
 	code int,
 	withBody bool,
 	ip string,
+	user string,
 	page int,
 	perPage int,
-	resultOnly bool,
 ) (*GetGroupLogsResult, error) {
 	if group == "" {
 		return nil, errors.New("group is required")
@@ -763,9 +732,9 @@ func GetGroupLogs(
 			code,
 			withBody,
 			ip,
+			user,
 			page,
 			perPage,
-			resultOnly,
 		)
 		return err
 	})
@@ -809,7 +778,7 @@ func buildSearchLogsQuery(
 	codeType CodeType,
 	code int,
 	ip string,
-	resultOnly bool,
+	user string,
 ) *gorm.DB {
 	tx := LogDB.Model(&Log{})
 
@@ -844,10 +813,6 @@ func buildSearchLogsQuery(
 		tx = tx.Where("created_at <= ?", endTimestamp)
 	}
 
-	if resultOnly {
-		tx = tx.Where("downstream_result = true")
-	}
-
 	switch codeType {
 	case CodeTypeSuccess:
 		tx = tx.Where("code = 200")
@@ -863,6 +828,10 @@ func buildSearchLogsQuery(
 		tx = tx.Where("token_id = ?", tokenID)
 	}
 
+	if user != "" {
+		tx = tx.Where("user = ?", user)
+	}
+
 	// Handle keyword search for zero value fields
 	if keyword != "" {
 		var conditions []string
@@ -928,9 +897,9 @@ func searchLogs(
 	code int,
 	withBody bool,
 	ip string,
+	user string,
 	page int,
 	perPage int,
-	resultOnly bool,
 ) (int64, []*Log, error) {
 	var total int64
 	var logs []*Log
@@ -951,7 +920,7 @@ func searchLogs(
 			codeType,
 			code,
 			ip,
-			resultOnly,
+			user,
 		).Count(&total).Error
 	})
 
@@ -969,7 +938,7 @@ func searchLogs(
 			codeType,
 			code,
 			ip,
-			resultOnly,
+			user,
 		)
 
 		if withBody {
@@ -1010,9 +979,9 @@ func SearchLogs(
 	code int,
 	withBody bool,
 	ip string,
+	user string,
 	page int,
 	perPage int,
-	resultOnly bool,
 ) (*GetLogsResult, error) {
 	var total int64
 	var logs []*Log
@@ -1037,9 +1006,9 @@ func SearchLogs(
 			code,
 			withBody,
 			ip,
+			user,
 			page,
 			perPage,
-			resultOnly,
 		)
 		return err
 	})
@@ -1078,9 +1047,9 @@ func SearchGroupLogs(
 	code int,
 	withBody bool,
 	ip string,
+	user string,
 	page int,
 	perPage int,
-	resultOnly bool,
 ) (*GetGroupLogsResult, error) {
 	if group == "" {
 		return nil, errors.New("group is required")
@@ -1111,9 +1080,9 @@ func SearchGroupLogs(
 			code,
 			withBody,
 			ip,
+			user,
 			page,
 			perPage,
-			resultOnly,
 		)
 		return err
 	})

+ 1 - 1
core/model/main.go

@@ -194,7 +194,7 @@ func migrateLOGDB() error {
 	err := LogDB.AutoMigrate(
 		&Log{},
 		&RequestDetail{},
-		&Group{},
+		&RetryLog{},
 		&GroupSummary{},
 		&Summary{},
 		&ConsumeError{},

+ 4 - 4
core/model/option.go

@@ -58,7 +58,7 @@ func InitOption2DB() error {
 
 func initOptionMap() error {
 	optionMap["LogStorageHours"] = strconv.FormatInt(config.GetLogStorageHours(), 10)
-	optionMap["LogContentStorageHours"] = strconv.FormatInt(config.GetLogContentStorageHours(), 10)
+	optionMap["RetryLogStorageHours"] = strconv.FormatInt(config.GetRetryLogStorageHours(), 10)
 	optionMap["LogDetailStorageHours"] = strconv.FormatInt(config.GetLogDetailStorageHours(), 10)
 	optionMap["CleanLogBatchSize"] = strconv.FormatInt(config.GetCleanLogBatchSize(), 10)
 	optionMap["IPGroupsThreshold"] = strconv.FormatInt(config.GetIPGroupsThreshold(), 10)
@@ -202,12 +202,12 @@ func updateOption(key string, value string, isInit bool) (err error) {
 			return err
 		}
 		config.SetLogStorageHours(logStorageHours)
-	case "LogContentStorageHours":
-		logContentStorageHours, err := strconv.ParseInt(value, 10, 64)
+	case "RetryLogStorageHours":
+		retryLogStorageHours, err := strconv.ParseInt(value, 10, 64)
 		if err != nil {
 			return err
 		}
-		config.SetLogContentStorageHours(logContentStorageHours)
+		config.SetRetryLogStorageHours(retryLogStorageHours)
 	case "LogDetailStorageHours":
 		logDetailStorageHours, err := strconv.ParseInt(value, 10, 64)
 		if err != nil {

+ 97 - 0
core/model/retrylog.go

@@ -0,0 +1,97 @@
+package model
+
+import (
+	"time"
+
+	"github.com/bytedance/sonic"
+	"github.com/labring/aiproxy/core/common"
+	"github.com/labring/aiproxy/core/common/config"
+	"gorm.io/gorm"
+)
+
+type RetryLog struct {
+	RequestBody           string          `gorm:"type:text"                           json:"request_body,omitempty"`
+	ResponseBody          string          `gorm:"type:text"                           json:"response_body,omitempty"`
+	RequestBodyTruncated  bool            `json:"request_body_truncated,omitempty"`
+	ResponseBodyTruncated bool            `json:"response_body_truncated,omitempty"`
+	RequestAt             time.Time       `json:"request_at"`
+	RetryAt               time.Time       `json:"retry_at,omitempty"`
+	TTFBMilliseconds      ZeroNullInt64   `json:"ttfb_milliseconds,omitempty"`
+	CreatedAt             time.Time       `gorm:"autoCreateTime;index"                json:"created_at"`
+	Model                 string          `json:"model"`
+	RequestID             EmptyNullString `gorm:"index:,where:request_id is not null" json:"request_id"`
+	ID                    int             `gorm:"primaryKey"                          json:"id"`
+	ChannelID             int             `json:"channel,omitempty"`
+	Code                  int             `gorm:"index"                               json:"code,omitempty"`
+	Mode                  int             `json:"mode,omitempty"`
+	RetryTimes            ZeroNullInt64   `json:"retry_times,omitempty"`
+}
+
+func (r *RetryLog) BeforeSave(_ *gorm.DB) (err error) {
+	if reqMax := config.GetLogDetailRequestBodyMaxSize(); reqMax > 0 && int64(len(r.RequestBody)) > reqMax {
+		r.RequestBody = common.TruncateByRune(r.RequestBody, int(reqMax)) + "..."
+		r.RequestBodyTruncated = true
+	}
+	if respMax := config.GetLogDetailResponseBodyMaxSize(); respMax > 0 && int64(len(r.ResponseBody)) > respMax {
+		r.ResponseBody = common.TruncateByRune(r.ResponseBody, int(respMax)) + "..."
+		r.ResponseBodyTruncated = true
+	}
+	return
+}
+
+func (r *RetryLog) MarshalJSON() ([]byte, error) {
+	type Alias RetryLog
+	a := &struct {
+		*Alias
+		CreatedAt int64 `json:"created_at"`
+		RequestAt int64 `json:"request_at"`
+		RetryAt   int64 `json:"retry_at,omitempty"`
+	}{
+		Alias:     (*Alias)(r),
+		CreatedAt: r.CreatedAt.UnixMilli(),
+		RequestAt: r.RequestAt.UnixMilli(),
+	}
+	if !r.RetryAt.IsZero() {
+		a.RetryAt = r.RetryAt.UnixMilli()
+	}
+	return sonic.Marshal(a)
+}
+
+func RecordRetryLog(
+	requestID string,
+	createAt time.Time,
+	requestAt time.Time,
+	retryAt time.Time,
+	firstByteAt time.Time,
+	code int,
+	channelID int,
+	modelName string,
+	mode int,
+	retryTimes int,
+	requestDetail *RequestDetail,
+) error {
+	if createAt.IsZero() {
+		createAt = time.Now()
+	}
+	if requestAt.IsZero() {
+		requestAt = createAt
+	}
+	if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
+		firstByteAt = requestAt
+	}
+	log := &RetryLog{
+		RequestID:        EmptyNullString(requestID),
+		RequestAt:        requestAt,
+		CreatedAt:        createAt,
+		RetryAt:          retryAt,
+		TTFBMilliseconds: ZeroNullInt64(firstByteAt.Sub(requestAt).Milliseconds()),
+		Code:             code,
+		Model:            modelName,
+		Mode:             mode,
+		ChannelID:        channelID,
+		RetryTimes:       ZeroNullInt64(retryTimes),
+		RequestBody:      requestDetail.RequestBody,
+		ResponseBody:     requestDetail.ResponseBody,
+	}
+	return LogDB.Create(log).Error
+}