Explorar el Código

feat: log and summary from log create at (#177)

* feat: log and summary from log create at

* fix: skip inline image test
zijiren hace 7 meses
padre
commit
c2d63405b9
Se han modificado 4 ficheros con 63 adiciones y 56 borrados
  1. 8 6
      core/model/batch.go
  2. 41 38
      core/model/log.go
  3. 8 5
      core/model/summary.go
  4. 6 7
      core/relay/adaptor/doc2x/html2md_test.go

+ 8 - 6
core/model/batch.go

@@ -213,8 +213,10 @@ func BatchRecordConsume(
 	modelPrice Price,
 	amount float64,
 ) error {
+	now := time.Now()
 	err := RecordConsumeLog(
 		requestID,
+		now,
 		requestAt,
 		retryAt,
 		firstByteAt,
@@ -252,10 +254,10 @@ func BatchRecordConsume(
 	updateTokenData(tokenID, amount, amountDecimal)
 
 	if channelID != 0 {
-		updateSummaryData(channelID, modelName, requestAt, code, amountDecimal, usage)
+		updateSummaryData(channelID, modelName, now, code, amountDecimal, usage)
 	}
 
-	updateGroupSummaryData(group, tokenName, modelName, requestAt, code, amountDecimal, usage)
+	updateGroupSummaryData(group, tokenName, modelName, now, code, amountDecimal, usage)
 
 	return err
 }
@@ -302,12 +304,12 @@ func updateTokenData(tokenID int, amount float64, amountDecimal decimal.Decimal)
 	}
 }
 
-func updateGroupSummaryData(group string, tokenName string, modelName string, requestAt time.Time, code int, amountDecimal decimal.Decimal, usage Usage) {
+func updateGroupSummaryData(group string, tokenName string, modelName string, createAt time.Time, code int, amountDecimal decimal.Decimal, usage Usage) {
 	groupUnique := GroupSummaryUnique{
 		GroupID:       group,
 		TokenName:     tokenName,
 		Model:         modelName,
-		HourTimestamp: requestAt.Truncate(time.Hour).Unix(),
+		HourTimestamp: createAt.Truncate(time.Hour).Unix(),
 	}
 
 	groupSummaryKey := groupSummaryUniqueKey(groupUnique)
@@ -329,11 +331,11 @@ func updateGroupSummaryData(group string, tokenName string, modelName string, re
 	}
 }
 
-func updateSummaryData(channelID int, modelName string, requestAt time.Time, code int, amountDecimal decimal.Decimal, usage Usage) {
+func updateSummaryData(channelID int, modelName string, createAt time.Time, code int, amountDecimal decimal.Decimal, usage Usage) {
 	summaryUnique := SummaryUnique{
 		ChannelID:     channelID,
 		Model:         modelName,
-		HourTimestamp: requestAt.Truncate(time.Hour).Unix(),
+		HourTimestamp: createAt.Truncate(time.Hour).Unix(),
 	}
 
 	summaryKey := summaryUniqueKey(summaryUnique)

+ 41 - 38
core/model/log.go

@@ -161,38 +161,38 @@ func CreateLogIndexes(db *gorm.DB) error {
 		// not support INCLUDE
 		indexes = []string{
 			// used by global search logs
-			"CREATE INDEX IF NOT EXISTS idx_model_reqat ON logs (model, request_at DESC)",
+			"CREATE INDEX IF NOT EXISTS idx_model_creat ON logs (model, created_at DESC)",
 			// used by global search logs
-			"CREATE INDEX IF NOT EXISTS idx_channel_reqat ON logs (channel_id, request_at DESC)",
+			"CREATE INDEX IF NOT EXISTS idx_channel_creat ON logs (channel_id, created_at DESC)",
 			// used by global search logs
-			"CREATE INDEX IF NOT EXISTS idx_channel_model_reqat ON logs (channel_id, model, request_at DESC)",
+			"CREATE INDEX IF NOT EXISTS idx_channel_model_creat ON logs (channel_id, model, created_at DESC)",
 
 			// used by search group logs
-			"CREATE INDEX IF NOT EXISTS idx_group_reqat ON logs (group_id, request_at DESC)",
+			"CREATE INDEX IF NOT EXISTS idx_group_creat ON logs (group_id, created_at DESC)",
 			// used by search group logs
-			"CREATE INDEX IF NOT EXISTS idx_group_token_reqat ON logs (group_id, token_name, request_at DESC)",
+			"CREATE INDEX IF NOT EXISTS idx_group_token_creat ON logs (group_id, token_name, created_at DESC)",
 			// used by search group logs
-			"CREATE INDEX IF NOT EXISTS idx_group_model_reqat ON logs (group_id, model, request_at DESC)",
+			"CREATE INDEX IF NOT EXISTS idx_group_model_creat ON logs (group_id, model, created_at DESC)",
 			// used by search group logs
-			"CREATE INDEX IF NOT EXISTS idx_group_token_model_reqat ON logs (group_id, token_name, model, request_at DESC)",
+			"CREATE INDEX IF NOT EXISTS idx_group_token_model_creat ON logs (group_id, token_name, model, created_at DESC)",
 		}
 	} else {
 		indexes = []string{
 			// used by global search logs
-			"CREATE INDEX IF NOT EXISTS idx_model_reqat ON logs (model, request_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_model_creat ON logs (model, created_at DESC) INCLUDE (code, downstream_result)",
 			// used by global search logs
-			"CREATE INDEX IF NOT EXISTS idx_channel_reqat ON logs (channel_id, request_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_channel_creat ON logs (channel_id, created_at DESC) INCLUDE (code, downstream_result)",
 			// used by global search logs
-			"CREATE INDEX IF NOT EXISTS idx_channel_model_reqat ON logs (channel_id, model, request_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_channel_model_creat ON logs (channel_id, model, created_at DESC) INCLUDE (code, downstream_result)",
 
 			// used by search group logs
-			"CREATE INDEX IF NOT EXISTS idx_group_reqat ON logs (group_id, request_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_group_creat ON logs (group_id, created_at DESC) INCLUDE (code, downstream_result)",
 			// used by search group logs
-			"CREATE INDEX IF NOT EXISTS idx_group_token_reqat ON logs (group_id, token_name, request_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_group_token_creat ON logs (group_id, token_name, created_at DESC) INCLUDE (code, downstream_result)",
 			// used by search group logs
-			"CREATE INDEX IF NOT EXISTS idx_group_model_reqat ON logs (group_id, model, request_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_group_model_creat ON logs (group_id, model, created_at DESC) INCLUDE (code, downstream_result)",
 			// used by search group logs
-			"CREATE INDEX IF NOT EXISTS idx_group_token_model_reqat ON logs (group_id, token_name, model, request_at DESC) INCLUDE (code, downstream_result)",
+			"CREATE INDEX IF NOT EXISTS idx_group_token_model_creat ON logs (group_id, token_name, model, created_at DESC) INCLUDE (code, downstream_result)",
 		}
 	}
 
@@ -414,6 +414,7 @@ func optimizeLogDetail() error {
 
 func RecordConsumeLog(
 	requestID string,
+	createAt time.Time,
 	requestAt time.Time,
 	retryAt time.Time,
 	firstByteAt time.Time,
@@ -434,9 +435,11 @@ func RecordConsumeLog(
 	modelPrice Price,
 	amount float64,
 ) error {
-	now := time.Now()
+	if createAt.IsZero() {
+		createAt = time.Now()
+	}
 	if requestAt.IsZero() {
-		requestAt = now
+		requestAt = createAt
 	}
 	if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
 		firstByteAt = requestAt
@@ -444,7 +447,7 @@ func RecordConsumeLog(
 	log := &Log{
 		RequestID:        EmptyNullString(requestID),
 		RequestAt:        requestAt,
-		CreatedAt:        now,
+		CreatedAt:        createAt,
 		RetryAt:          retryAt,
 		TTFBMilliseconds: ZeroNullInt64(firstByteAt.Sub(requestAt).Milliseconds()),
 		GroupID:          group,
@@ -470,7 +473,7 @@ func RecordConsumeLog(
 func getLogOrder(order string) string {
 	prefix, suffix, _ := strings.Cut(order, "-")
 	switch prefix {
-	case "request_at", "id", "created_at":
+	case "request_at", "id":
 		switch suffix {
 		case "asc":
 			return prefix + " asc"
@@ -478,7 +481,7 @@ func getLogOrder(order string) string {
 			return prefix + " desc"
 		}
 	default:
-		return "request_at desc"
+		return "created_at desc"
 	}
 }
 
@@ -542,11 +545,11 @@ func buildGetLogsQuery(
 
 	switch {
 	case !startTimestamp.IsZero() && !endTimestamp.IsZero():
-		tx = tx.Where("request_at BETWEEN ? AND ?", startTimestamp, endTimestamp)
+		tx = tx.Where("created_at BETWEEN ? AND ?", startTimestamp, endTimestamp)
 	case !startTimestamp.IsZero():
-		tx = tx.Where("request_at >= ?", startTimestamp)
+		tx = tx.Where("created_at >= ?", startTimestamp)
 	case !endTimestamp.IsZero():
-		tx = tx.Where("request_at <= ?", endTimestamp)
+		tx = tx.Where("created_at <= ?", endTimestamp)
 	}
 
 	if resultOnly {
@@ -775,7 +778,7 @@ func GetGroupLogs(
 
 	g.Go(func() error {
 		var err error
-		models, err = GetUsedModels(group, startTimestamp, endTimestamp)
+		models, err = GetUsedModels(group, tokenName, startTimestamp, endTimestamp)
 		return err
 	})
 
@@ -834,11 +837,11 @@ func buildSearchLogsQuery(
 
 	switch {
 	case !startTimestamp.IsZero() && !endTimestamp.IsZero():
-		tx = tx.Where("request_at BETWEEN ? AND ?", startTimestamp, endTimestamp)
+		tx = tx.Where("created_at BETWEEN ? AND ?", startTimestamp, endTimestamp)
 	case !startTimestamp.IsZero():
-		tx = tx.Where("request_at >= ?", startTimestamp)
+		tx = tx.Where("created_at >= ?", startTimestamp)
 	case !endTimestamp.IsZero():
-		tx = tx.Where("request_at <= ?", endTimestamp)
+		tx = tx.Where("created_at <= ?", endTimestamp)
 	}
 
 	if resultOnly {
@@ -1123,7 +1126,7 @@ func SearchGroupLogs(
 
 	g.Go(func() error {
 		var err error
-		models, err = GetUsedModels(group, startTimestamp, endTimestamp)
+		models, err = GetUsedModels(group, tokenName, startTimestamp, endTimestamp)
 		return err
 	})
 
@@ -1144,7 +1147,7 @@ func SearchGroupLogs(
 }
 
 func DeleteOldLog(timestamp time.Time) (int64, error) {
-	result := LogDB.Where("request_at < ?", timestamp).Delete(&Log{})
+	result := LogDB.Where("created_at < ?", timestamp).Delete(&Log{})
 	return result.RowsAffected, result.Error
 }
 
@@ -1273,11 +1276,11 @@ func getLogGroupByValuesFromLog[T cmp.Ordered](field string, group string, start
 
 	switch {
 	case !start.IsZero() && !end.IsZero():
-		query = query.Where("request_at BETWEEN ? AND ?", start, end)
+		query = query.Where("created_at BETWEEN ? AND ?", start, end)
 	case !start.IsZero():
-		query = query.Where("request_at >= ?", start)
+		query = query.Where("created_at >= ?", start)
 	case !end.IsZero():
-		query = query.Where("request_at <= ?", end)
+		query = query.Where("created_at <= ?", end)
 	}
 
 	err := query.
@@ -1332,7 +1335,7 @@ func GetRPM(group string, end time.Time, tokenName, modelName string, channelID
 
 	var count int64
 	err := query.
-		Where("request_at BETWEEN ? AND ?", end.Add(-time.Minute), end).
+		Where("created_at BETWEEN ? AND ?", end.Add(-time.Minute), end).
 		Count(&count).Error
 	return count, err
 }
@@ -1358,7 +1361,7 @@ func GetTPM(group string, end time.Time, tokenName, modelName string, channelID
 
 	var tpm int64
 	err := query.
-		Where("request_at BETWEEN ? AND ?", end.Add(-time.Minute), end).
+		Where("created_at BETWEEN ? AND ?", end.Add(-time.Minute), end).
 		Scan(&tpm).Error
 	return tpm, err
 }
@@ -1469,7 +1472,7 @@ func GetGroupDashboardData(
 
 	g.Go(func() error {
 		var err error
-		models, err = GetUsedModels(group, start, end)
+		models, err = GetUsedModels(group, tokenName, start, end)
 		return err
 	})
 
@@ -1508,7 +1511,7 @@ func GetGroupModelTPM(group string, model string) (int64, error) {
 	var tpm int64
 	err := LogDB.
 		Model(&Log{}).
-		Where("group_id = ? AND request_at >= ? AND request_at <= ? AND model = ?", group, start, end, model).
+		Where("group_id = ? AND created_at >= ? AND created_at <= ? AND model = ?", group, start, end, model).
 		Select("COALESCE(SUM(total_tokens), 0)").
 		Scan(&tpm).Error
 	return tpm, err
@@ -1546,11 +1549,11 @@ func GetIPGroups(threshold int, start, end time.Time) (map[string][]string, erro
 
 	switch {
 	case !start.IsZero() && !end.IsZero():
-		db = db.Where("request_at BETWEEN ? AND ?", start, end)
+		db = db.Where("created_at BETWEEN ? AND ?", start, end)
 	case !start.IsZero():
-		db = db.Where("request_at >= ?", start)
+		db = db.Where("created_at >= ?", start)
 	case !end.IsZero():
-		db = db.Where("request_at <= ?", end)
+		db = db.Where("created_at <= ?", end)
 	}
 	db.Where("ip IS NOT NULL AND ip != '' AND group_id != ''")
 

+ 8 - 5
core/model/summary.go

@@ -213,18 +213,18 @@ func GetUsedChannels(group string, start, end time.Time) ([]int, error) {
 	if group != "*" {
 		return []int{}, nil
 	}
-	return getLogGroupByValues[int]("channel_id", group, start, end)
+	return getLogGroupByValues[int]("channel_id", group, "", start, end)
 }
 
-func GetUsedModels(group string, start, end time.Time) ([]string, error) {
-	return getLogGroupByValues[string]("model", group, start, end)
+func GetUsedModels(group string, tokenName string, start, end time.Time) ([]string, error) {
+	return getLogGroupByValues[string]("model", group, tokenName, start, end)
 }
 
 func GetUsedTokenNames(group string, start, end time.Time) ([]string, error) {
-	return getLogGroupByValues[string]("token_name", group, start, end)
+	return getLogGroupByValues[string]("token_name", group, "", start, end)
 }
 
-func getLogGroupByValues[T cmp.Ordered](field string, group string, start, end time.Time) ([]T, error) {
+func getLogGroupByValues[T cmp.Ordered](field string, group string, tokenName string, start, end time.Time) ([]T, error) {
 	type Result struct {
 		Value        T
 		UsedAmount   float64
@@ -241,6 +241,9 @@ func getLogGroupByValues[T cmp.Ordered](field string, group string, start, end t
 		query = LogDB.
 			Model(&GroupSummary{}).
 			Where("group_id = ?", group)
+		if tokenName != "" {
+			query = query.Where("token_name = ?", tokenName)
+		}
 	}
 
 	switch {

+ 6 - 7
core/relay/adaptor/doc2x/html2md_test.go

@@ -1,7 +1,6 @@
 package doc2x_test
 
 import (
-	"context"
 	"testing"
 
 	"github.com/labring/aiproxy/core/relay/adaptor/doc2x"
@@ -52,10 +51,10 @@ func TestHTMLTable2Md(t *testing.T) {
 	}
 }
 
-var htmlImage = `<img src="https://cdn.noedgeai.com/01956426-b164-730d-a1fe-8be8972145d6_0.jpg?x=258&y=694&w=1132&h=826"/>`
+// var htmlImage = `<img src="https://cdn.noedgeai.com/01956426-b164-730d-a1fe-8be8972145d6_0.jpg?x=258&y=694&w=1132&h=826"/>`
 
-func TestInlineMdImage(t *testing.T) {
-	t.Parallel()
-	result := doc2x.InlineMdImage(context.Background(), htmlImage)
-	t.Log(result)
-}
+// func TestInlineMdImage(t *testing.T) {
+// 	t.Parallel()
+// 	result := doc2x.InlineMdImage(context.Background(), htmlImage)
+// 	t.Log(result)
+// }