| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300 |
- package model
- import (
- "cmp"
- "errors"
- "fmt"
- "slices"
- "strings"
- "time"
- "github.com/bytedance/sonic"
- "github.com/labring/sealos/service/aiproxy/common"
- "github.com/labring/sealos/service/aiproxy/common/config"
- "github.com/shopspring/decimal"
- "golang.org/x/sync/errgroup"
- "gorm.io/gorm"
- )
// RequestDetail stores the request and response bodies captured for one Log
// row; LogID is the foreign key that ties it back to that row.
type RequestDetail struct {
	// CreatedAt is set by gorm on insert, indexed, and hidden from JSON.
	CreatedAt time.Time `gorm:"autoCreateTime;index" json:"-"`

	// Captured payloads, stored as text columns.
	RequestBody  string `gorm:"type:text" json:"request_body,omitempty"`
	ResponseBody string `gorm:"type:text" json:"response_body,omitempty"`

	// Flags raised by BeforeSave when the matching body had to be shortened.
	RequestBodyTruncated  bool `json:"request_body_truncated,omitempty"`
	ResponseBodyTruncated bool `json:"response_body_truncated,omitempty"`

	// ID is the primary key; LogID is indexed and references Log.ID.
	ID    int `gorm:"primaryKey" json:"id"`
	LogID int `gorm:"index" json:"log_id"`
}
- func (d *RequestDetail) BeforeSave(_ *gorm.DB) (err error) {
- if reqMax := config.GetLogDetailRequestBodyMaxSize(); reqMax > 0 && int64(len(d.RequestBody)) > reqMax {
- d.RequestBody = common.TruncateByRune(d.RequestBody, int(reqMax)) + "..."
- d.RequestBodyTruncated = true
- }
- if respMax := config.GetLogDetailResponseBodyMaxSize(); respMax > 0 && int64(len(d.ResponseBody)) > respMax {
- d.ResponseBody = common.TruncateByRune(d.ResponseBody, int(respMax)) + "..."
- d.ResponseBodyTruncated = true
- }
- return
- }
- type Log struct {
- RequestDetail *RequestDetail `gorm:"foreignKey:LogID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE;" json:"request_detail,omitempty"`
- RequestAt time.Time `gorm:"index" json:"request_at"`
- TimestampTruncByDay int64 `json:"timestamp_trunc_by_day"`
- TimestampTruncByHour int64 `json:"timestamp_trunc_by_hour"`
- CreatedAt time.Time `gorm:"autoCreateTime;index" json:"created_at"`
- TokenName string `json:"token_name,omitempty"`
- Endpoint string `json:"endpoint"`
- Content string `gorm:"type:text" json:"content,omitempty"`
- GroupID string `gorm:"index" json:"group,omitempty"`
- Model string `gorm:"index" json:"model"`
- RequestID string `gorm:"index" json:"request_id"`
- Price float64 `json:"price,omitempty"`
- ID int `gorm:"primaryKey" json:"id"`
- CompletionPrice float64 `json:"completion_price,omitempty"`
- TokenID int `gorm:"index" json:"token_id,omitempty"`
- UsedAmount float64 `json:"used_amount,omitempty"`
- PromptTokens int `json:"prompt_tokens,omitempty"`
- CompletionTokens int `json:"completion_tokens,omitempty"`
- TotalTokens int `json:"total_tokens,omitempty"`
- ChannelID int `gorm:"index" json:"channel,omitempty"`
- Code int `gorm:"index" json:"code,omitempty"`
- Mode int `json:"mode,omitempty"`
- IP string `gorm:"index" json:"ip,omitempty"`
- RetryTimes int `json:"retry_times,omitempty"`
- }
- func CreateLogIndexes(db *gorm.DB) error {
- var indexes []string
- if common.UsingSQLite {
- // not support INCLUDE
- indexes = []string{
- // used by global search logs
- "CREATE INDEX IF NOT EXISTS idx_model_reqat ON logs (model, request_at)",
- // used by global search logs
- "CREATE INDEX IF NOT EXISTS idx_channel_reqat ON logs (channel_id, request_at)",
- // used by global search logs
- "CREATE INDEX IF NOT EXISTS idx_channel_model_reqat ON logs (channel_id, model, request_at)",
- // global day indexes, used by global dashboard
- "CREATE INDEX IF NOT EXISTS idx_model_reqat_truncday ON logs (model, request_at, timestamp_trunc_by_day)",
- // global hour indexes, used by global dashboard
- "CREATE INDEX IF NOT EXISTS idx_model_reqat_trunchour ON logs (model, request_at, timestamp_trunc_by_hour)",
- // used by search group logs
- "CREATE INDEX IF NOT EXISTS idx_group_token_reqat ON logs (group_id, token_name, request_at)",
- // used by search group logs
- "CREATE INDEX IF NOT EXISTS idx_group_model_reqat ON logs (group_id, model, request_at)",
- // used by search group logs
- "CREATE INDEX IF NOT EXISTS idx_group_token_model_reqat ON logs (group_id, token_name, model, request_at)",
- // day indexes, used by dashboard
- "CREATE INDEX IF NOT EXISTS idx_group_reqat_truncday ON logs (group_id, request_at, timestamp_trunc_by_day)",
- "CREATE INDEX IF NOT EXISTS idx_group_model_reqat_truncday ON logs (group_id, model, request_at, timestamp_trunc_by_day)",
- "CREATE INDEX IF NOT EXISTS idx_group_token_reqat_truncday ON logs (group_id, token_name, request_at, timestamp_trunc_by_day)",
- "CREATE INDEX IF NOT EXISTS idx_group_model_token_reqat_truncday ON logs (group_id, model, token_name, request_at, timestamp_trunc_by_day)",
- // hour indexes, used by dashboard
- "CREATE INDEX IF NOT EXISTS idx_group_reqat_trunchour ON logs (group_id, request_at, timestamp_trunc_by_hour)",
- "CREATE INDEX IF NOT EXISTS idx_group_model_reqat_trunchour ON logs (group_id, model, request_at, timestamp_trunc_by_hour)",
- "CREATE INDEX IF NOT EXISTS idx_group_token_reqat_trunchour ON logs (group_id, token_name, request_at, timestamp_trunc_by_hour)",
- "CREATE INDEX IF NOT EXISTS idx_group_model_token_reqat_trunchour ON logs (group_id, model, token_name, request_at, timestamp_trunc_by_hour)",
- }
- } else {
- indexes = []string{
- // used by global search logs
- "CREATE INDEX IF NOT EXISTS idx_model_reqat ON logs (model, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
- // used by global search logs
- "CREATE INDEX IF NOT EXISTS idx_channel_reqat ON logs (channel_id, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
- // used by global search logs
- "CREATE INDEX IF NOT EXISTS idx_channel_model_reqat ON logs (channel_id, model, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
- // global day indexes, used by global dashboard
- "CREATE INDEX IF NOT EXISTS idx_model_reqat_truncday ON logs (model, request_at, timestamp_trunc_by_day) INCLUDE (code, used_amount, total_tokens)",
- // global hour indexes, used by global dashboard
- "CREATE INDEX IF NOT EXISTS idx_model_reqat_trunchour ON logs (model, request_at, timestamp_trunc_by_hour) INCLUDE (code, used_amount, total_tokens)",
- // used by search group logs
- "CREATE INDEX IF NOT EXISTS idx_group_token_reqat ON logs (group_id, token_name, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
- // used by search group logs
- "CREATE INDEX IF NOT EXISTS idx_group_token_reqat ON logs (group_id, token_name, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
- // used by search group logs
- "CREATE INDEX IF NOT EXISTS idx_group_model_reqat ON logs (group_id, model, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
- // used by search group logs
- "CREATE INDEX IF NOT EXISTS idx_group_token_model_reqat ON logs (group_id, token_name, model, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
- // day indexes, used by dashboard
- "CREATE INDEX IF NOT EXISTS idx_group_reqat_truncday ON logs (group_id, request_at, timestamp_trunc_by_day) INCLUDE (code, used_amount, total_tokens)",
- "CREATE INDEX IF NOT EXISTS idx_group_model_reqat_truncday ON logs (group_id, model, request_at, timestamp_trunc_by_day) INCLUDE (code, used_amount, total_tokens)",
- "CREATE INDEX IF NOT EXISTS idx_group_token_reqat_truncday ON logs (group_id, token_name, request_at, timestamp_trunc_by_day) INCLUDE (code, used_amount, total_tokens)",
- "CREATE INDEX IF NOT EXISTS idx_group_model_token_reqat_truncday ON logs (group_id, model, token_name, request_at, timestamp_trunc_by_day) INCLUDE (code, used_amount, total_tokens)",
- // hour indexes, used by dashboard
- "CREATE INDEX IF NOT EXISTS idx_group_reqat_trunchour ON logs (group_id, request_at, timestamp_trunc_by_hour) INCLUDE (code, used_amount, total_tokens)",
- "CREATE INDEX IF NOT EXISTS idx_group_model_reqat_trunchour ON logs (group_id, model, request_at, timestamp_trunc_by_hour) INCLUDE (code, used_amount, total_tokens)",
- "CREATE INDEX IF NOT EXISTS idx_group_token_reqat_trunchour ON logs (group_id, token_name, request_at, timestamp_trunc_by_hour) INCLUDE (code, used_amount, total_tokens)",
- "CREATE INDEX IF NOT EXISTS idx_group_model_token_reqat_trunchour ON logs (group_id, model, token_name, request_at, timestamp_trunc_by_hour) INCLUDE (code, used_amount, total_tokens)",
- }
- }
- for _, index := range indexes {
- if err := db.Exec(index).Error; err != nil {
- return err
- }
- }
- return nil
- }
// contentMaxSize is the byte limit applied to Log.Content before saving (2KB).
const contentMaxSize = 2 << 10
- func (l *Log) BeforeSave(_ *gorm.DB) (err error) {
- if len(l.Content) > contentMaxSize {
- l.Content = common.TruncateByRune(l.Content, contentMaxSize) + "..."
- }
- if l.TimestampTruncByDay == 0 {
- l.TimestampTruncByDay = l.RequestAt.Truncate(24 * time.Hour).Unix()
- }
- if l.TimestampTruncByHour == 0 {
- l.TimestampTruncByHour = l.RequestAt.Truncate(time.Hour).Unix()
- }
- return
- }
- func (l *Log) MarshalJSON() ([]byte, error) {
- type Alias Log
- return sonic.Marshal(&struct {
- *Alias
- CreatedAt int64 `json:"created_at"`
- RequestAt int64 `json:"request_at"`
- }{
- Alias: (*Alias)(l),
- CreatedAt: l.CreatedAt.UnixMilli(),
- RequestAt: l.RequestAt.UnixMilli(),
- })
- }
- func GetLogDetail(logID int) (*RequestDetail, error) {
- var detail RequestDetail
- err := LogDB.
- Model(&RequestDetail{}).
- Where("log_id = ?", logID).
- First(&detail).Error
- if err != nil {
- return nil, err
- }
- return &detail, nil
- }
- func GetGroupLogDetail(logID int, group string) (*RequestDetail, error) {
- if group == "" {
- return nil, errors.New("group is required")
- }
- var detail RequestDetail
- err := LogDB.
- Model(&RequestDetail{}).
- Joins("JOIN logs ON logs.id = request_details.log_id").
- Where("logs.group_id = ?", group).
- Where("log_id = ?", logID).
- First(&detail).Error
- if err != nil {
- return nil, err
- }
- return &detail, nil
- }
// defaultCleanLogBatchSize is the batch size the clean-up helpers fall back
// to when the caller passes a value of zero or less.
const defaultCleanLogBatchSize = 1_000
- func CleanLog(batchSize int) error {
- err := cleanLog(batchSize)
- if err != nil {
- return err
- }
- return cleanLogDetail(batchSize)
- }
- func cleanLog(batchSize int) error {
- logStorageHours := config.GetLogStorageHours()
- if logStorageHours <= 0 {
- return nil
- }
- if batchSize <= 0 {
- batchSize = defaultCleanLogBatchSize
- }
- return LogDB.
- Session(&gorm.Session{SkipDefaultTransaction: true}).
- Where(
- "created_at < ?",
- time.Now().Add(-time.Duration(logStorageHours)*time.Hour),
- ).
- Limit(batchSize).
- Delete(&Log{}).Error
- }
- func cleanLogDetail(batchSize int) error {
- detailStorageHours := config.GetLogDetailStorageHours()
- if detailStorageHours <= 0 {
- return nil
- }
- if batchSize <= 0 {
- batchSize = defaultCleanLogBatchSize
- }
- return LogDB.
- Session(&gorm.Session{SkipDefaultTransaction: true}).
- Where(
- "created_at < ?",
- time.Now().Add(-time.Duration(detailStorageHours)*time.Hour),
- ).
- Limit(batchSize).
- Delete(&RequestDetail{}).Error
- }
- func RecordConsumeLog(
- requestID string,
- requestAt time.Time,
- group string,
- code int,
- channelID int,
- promptTokens int,
- completionTokens int,
- modelName string,
- tokenID int,
- tokenName string,
- amount float64,
- price float64,
- completionPrice float64,
- endpoint string,
- content string,
- mode int,
- ip string,
- retryTimes int,
- requestDetail *RequestDetail,
- ) error {
- log := &Log{
- RequestID: requestID,
- RequestAt: requestAt,
- GroupID: group,
- CreatedAt: time.Now(),
- Code: code,
- PromptTokens: promptTokens,
- CompletionTokens: completionTokens,
- TotalTokens: promptTokens + completionTokens,
- TokenID: tokenID,
- TokenName: tokenName,
- Model: modelName,
- Mode: mode,
- IP: ip,
- UsedAmount: amount,
- Price: price,
- CompletionPrice: completionPrice,
- ChannelID: channelID,
- Endpoint: endpoint,
- Content: content,
- RetryTimes: retryTimes,
- RequestDetail: requestDetail,
- }
- return LogDB.Create(log).Error
- }
// getLogOrder converts an order parameter of the form "<column>-<direction>"
// into an ORDER BY expression. Only request_at, id and created_at are accepted
// as columns; anything else yields "request_at desc". The direction is
// ascending only when it is exactly "asc" and descending otherwise.
func getLogOrder(order string) string {
	column, direction, _ := strings.Cut(order, "-")

	// Only whitelisted columns ever reach the returned expression.
	if column != "request_at" && column != "id" && column != "created_at" {
		return "request_at desc"
	}

	if direction == "asc" {
		return column + " asc"
	}
	return column + " desc"
}
// CodeType selects which logs a query returns based on their status code.
type CodeType string

// Supported CodeType values. The query builders add a "code = 200" filter for
// CodeTypeSuccess, a "code != 200" filter for CodeTypeError, and no code
// filter for any other value.
const (
	CodeTypeAll     = CodeType("all")
	CodeTypeSuccess = CodeType("success")
	CodeTypeError   = CodeType("error")
)
- type GetLogsResult struct {
- Logs []*Log `json:"logs"`
- Total int64 `json:"total"`
- }
- type GetGroupLogsResult struct {
- GetLogsResult
- Models []string `json:"models"`
- TokenNames []string `json:"token_names"`
- }
- func buildGetLogsQuery(
- group string,
- startTimestamp time.Time,
- endTimestamp time.Time,
- modelName string,
- requestID string,
- tokenID int,
- tokenName string,
- channelID int,
- endpoint string,
- mode int,
- codeType CodeType,
- ip string,
- ) *gorm.DB {
- tx := LogDB.Model(&Log{})
- if group != "" {
- tx = tx.Where("group_id = ?", group)
- }
- if !startTimestamp.IsZero() && !endTimestamp.IsZero() {
- tx = tx.Where("request_at BETWEEN ? AND ?", startTimestamp, endTimestamp)
- } else if !startTimestamp.IsZero() {
- tx = tx.Where("request_at >= ?", startTimestamp)
- } else if !endTimestamp.IsZero() {
- tx = tx.Where("request_at <= ?", endTimestamp)
- }
- if tokenName != "" {
- tx = tx.Where("token_name = ?", tokenName)
- }
- if modelName != "" {
- tx = tx.Where("model = ?", modelName)
- }
- if mode != 0 {
- tx = tx.Where("mode = ?", mode)
- }
- if requestID != "" {
- tx = tx.Where("request_id = ?", requestID)
- }
- if tokenID != 0 {
- tx = tx.Where("token_id = ?", tokenID)
- }
- if channelID != 0 {
- tx = tx.Where("channel_id = ?", channelID)
- }
- if endpoint != "" {
- tx = tx.Where("endpoint = ?", endpoint)
- }
- if ip != "" {
- tx = tx.Where("ip = ?", ip)
- }
- switch codeType {
- case CodeTypeSuccess:
- tx = tx.Where("code = 200")
- case CodeTypeError:
- tx = tx.Where("code != 200")
- }
- return tx
- }
- func getLogs(
- group string,
- startTimestamp time.Time,
- endTimestamp time.Time,
- modelName string,
- requestID string,
- tokenID int,
- tokenName string,
- channelID int,
- endpoint string,
- order string,
- mode int,
- codeType CodeType,
- withBody bool,
- ip string,
- page int,
- perPage int,
- ) (int64, []*Log, error) {
- var total int64
- var logs []*Log
- g := new(errgroup.Group)
- g.Go(func() error {
- return buildGetLogsQuery(
- group,
- startTimestamp,
- endTimestamp,
- modelName,
- requestID,
- tokenID,
- tokenName,
- channelID,
- endpoint,
- mode,
- codeType,
- ip,
- ).Count(&total).Error
- })
- g.Go(func() error {
- query := buildGetLogsQuery(
- group,
- startTimestamp,
- endTimestamp,
- modelName,
- requestID,
- tokenID,
- tokenName,
- channelID,
- endpoint,
- mode,
- codeType,
- ip,
- )
- if withBody {
- query = query.Preload("RequestDetail")
- } else {
- query = query.Preload("RequestDetail", func(db *gorm.DB) *gorm.DB {
- return db.Select("id", "log_id")
- })
- }
- limit, offset := toLimitOffset(page, perPage)
- return query.
- Order(getLogOrder(order)).
- Limit(limit).
- Offset(offset).
- Find(&logs).Error
- })
- if err := g.Wait(); err != nil {
- return 0, nil, err
- }
- return total, logs, nil
- }
- func GetLogs(
- group string,
- startTimestamp time.Time,
- endTimestamp time.Time,
- modelName string,
- requestID string,
- tokenID int,
- tokenName string,
- channelID int,
- endpoint string,
- order string,
- mode int,
- codeType CodeType,
- withBody bool,
- ip string,
- page int,
- perPage int,
- ) (*GetLogsResult, error) {
- var (
- total int64
- logs []*Log
- )
- g := new(errgroup.Group)
- g.Go(func() error {
- var err error
- total, logs, err = getLogs(group, startTimestamp, endTimestamp, modelName, requestID, tokenID, tokenName, channelID, endpoint, order, mode, codeType, withBody, ip, page, perPage)
- return err
- })
- if err := g.Wait(); err != nil {
- return nil, err
- }
- result := &GetLogsResult{
- Logs: logs,
- Total: total,
- }
- return result, nil
- }
- func GetGroupLogs(
- group string,
- startTimestamp time.Time,
- endTimestamp time.Time,
- modelName string,
- requestID string,
- tokenID int,
- tokenName string,
- channelID int,
- endpoint string,
- order string,
- mode int,
- codeType CodeType,
- withBody bool,
- ip string,
- page int,
- perPage int,
- ) (*GetGroupLogsResult, error) {
- if group == "" {
- return nil, errors.New("group is required")
- }
- var (
- total int64
- logs []*Log
- tokenNames []string
- models []string
- )
- g := new(errgroup.Group)
- g.Go(func() error {
- var err error
- total, logs, err = getLogs(group, startTimestamp, endTimestamp, modelName, requestID, tokenID, tokenName, channelID, endpoint, order, mode, codeType, withBody, ip, page, perPage)
- return err
- })
- g.Go(func() error {
- var err error
- tokenNames, err = GetUsedTokenNames(group, startTimestamp, endTimestamp)
- return err
- })
- g.Go(func() error {
- var err error
- models, err = GetUsedModels(group, startTimestamp, endTimestamp)
- return err
- })
- if err := g.Wait(); err != nil {
- return nil, err
- }
- return &GetGroupLogsResult{
- GetLogsResult: GetLogsResult{
- Logs: logs,
- Total: total,
- },
- Models: models,
- TokenNames: tokenNames,
- }, nil
- }
- func buildSearchLogsQuery(
- group string,
- keyword string,
- endpoint string,
- requestID string,
- tokenID int,
- tokenName string,
- modelName string,
- startTimestamp time.Time,
- endTimestamp time.Time,
- channelID int,
- mode int,
- codeType CodeType,
- ip string,
- ) *gorm.DB {
- tx := LogDB.Model(&Log{})
- if group != "" {
- tx = tx.Where("group_id = ?", group)
- }
- if tokenName != "" {
- tx = tx.Where("token_name = ?", tokenName)
- }
- if modelName != "" {
- tx = tx.Where("model = ?", modelName)
- }
- if !startTimestamp.IsZero() && !endTimestamp.IsZero() {
- tx = tx.Where("request_at BETWEEN ? AND ?", startTimestamp, endTimestamp)
- } else if !startTimestamp.IsZero() {
- tx = tx.Where("request_at >= ?", startTimestamp)
- } else if !endTimestamp.IsZero() {
- tx = tx.Where("request_at <= ?", endTimestamp)
- }
- if requestID != "" {
- tx = tx.Where("request_id = ?", requestID)
- }
- if tokenID != 0 {
- tx = tx.Where("token_id = ?", tokenID)
- }
- if channelID != 0 {
- tx = tx.Where("channel_id = ?", channelID)
- }
- switch codeType {
- case CodeTypeSuccess:
- tx = tx.Where("code = 200")
- case CodeTypeError:
- tx = tx.Where("code != 200")
- }
- if ip != "" {
- tx = tx.Where("ip = ?", ip)
- }
- if mode != 0 {
- tx = tx.Where("mode = ?", mode)
- }
- if endpoint != "" {
- tx = tx.Where("endpoint = ?", endpoint)
- }
- // Handle keyword search for zero value fields
- if keyword != "" {
- var conditions []string
- var values []interface{}
- if group == "" {
- conditions = append(conditions, "group_id = ?")
- values = append(values, keyword)
- }
- if tokenName == "" {
- conditions = append(conditions, "token_name = ?")
- values = append(values, keyword)
- }
- if modelName == "" {
- conditions = append(conditions, "model = ?")
- values = append(values, keyword)
- }
- if requestID == "" {
- conditions = append(conditions, "request_id = ?")
- values = append(values, keyword)
- }
- // if num := String2Int(keyword); num != 0 {
- // if channelID == 0 {
- // conditions = append(conditions, "channel_id = ?")
- // values = append(values, num)
- // }
- // if mode != 0 {
- // conditions = append(conditions, "mode = ?")
- // values = append(values, num)
- // }
- // }
- // if ip != "" {
- // conditions = append(conditions, "ip = ?")
- // values = append(values, ip)
- // }
- // if endpoint == "" {
- // if common.UsingPostgreSQL {
- // conditions = append(conditions, "endpoint ILIKE ?")
- // } else {
- // conditions = append(conditions, "endpoint LIKE ?")
- // }
- // values = append(values, "%"+keyword+"%")
- // }
- // slow query
- // if common.UsingPostgreSQL {
- // conditions = append(conditions, "content ILIKE ?")
- // } else {
- // conditions = append(conditions, "content LIKE ?")
- // }
- // values = append(values, "%"+keyword+"%")
- if len(conditions) > 0 {
- tx = tx.Where(fmt.Sprintf("(%s)", strings.Join(conditions, " OR ")), values...)
- }
- }
- return tx
- }
- func searchLogs(
- group string,
- keyword string,
- endpoint string,
- requestID string,
- tokenID int,
- tokenName string,
- modelName string,
- startTimestamp time.Time,
- endTimestamp time.Time,
- channelID int,
- order string,
- mode int,
- codeType CodeType,
- withBody bool,
- ip string,
- page int,
- perPage int,
- ) (int64, []*Log, error) {
- var total int64
- var logs []*Log
- g := new(errgroup.Group)
- g.Go(func() error {
- return buildSearchLogsQuery(
- group,
- keyword,
- endpoint,
- requestID,
- tokenID,
- tokenName,
- modelName,
- startTimestamp,
- endTimestamp,
- channelID,
- mode,
- codeType,
- ip,
- ).Count(&total).Error
- })
- g.Go(func() error {
- query := buildSearchLogsQuery(
- group,
- keyword,
- endpoint,
- requestID,
- tokenID,
- tokenName,
- modelName,
- startTimestamp,
- endTimestamp,
- channelID,
- mode,
- codeType,
- ip,
- )
- if withBody {
- query = query.Preload("RequestDetail")
- } else {
- query = query.Preload("RequestDetail", func(db *gorm.DB) *gorm.DB {
- return db.Select("id", "log_id")
- })
- }
- limit, offset := toLimitOffset(page, perPage)
- return query.
- Order(getLogOrder(order)).
- Limit(limit).
- Offset(offset).
- Find(&logs).Error
- })
- if err := g.Wait(); err != nil {
- return 0, nil, err
- }
- return total, logs, nil
- }
- func SearchLogs(
- group string,
- keyword string,
- endpoint string,
- requestID string,
- tokenID int,
- tokenName string,
- modelName string,
- startTimestamp time.Time,
- endTimestamp time.Time,
- channelID int,
- order string,
- mode int,
- codeType CodeType,
- withBody bool,
- ip string,
- page int,
- perPage int,
- ) (*GetLogsResult, error) {
- var (
- total int64
- logs []*Log
- )
- g := new(errgroup.Group)
- g.Go(func() error {
- var err error
- total, logs, err = searchLogs(group, keyword, endpoint, requestID, tokenID, tokenName, modelName, startTimestamp, endTimestamp, channelID, order, mode, codeType, withBody, ip, page, perPage)
- return err
- })
- if err := g.Wait(); err != nil {
- return nil, err
- }
- result := &GetLogsResult{
- Logs: logs,
- Total: total,
- }
- return result, nil
- }
- func SearchGroupLogs(
- group string,
- keyword string,
- endpoint string,
- requestID string,
- tokenID int,
- tokenName string,
- modelName string,
- startTimestamp time.Time,
- endTimestamp time.Time,
- channelID int,
- order string,
- mode int,
- codeType CodeType,
- withBody bool,
- ip string,
- page int,
- perPage int,
- ) (*GetGroupLogsResult, error) {
- if group == "" {
- return nil, errors.New("group is required")
- }
- var (
- total int64
- logs []*Log
- tokenNames []string
- models []string
- )
- g := new(errgroup.Group)
- g.Go(func() error {
- var err error
- total, logs, err = searchLogs(group, keyword, endpoint, requestID, tokenID, tokenName, modelName, startTimestamp, endTimestamp, channelID, order, mode, codeType, withBody, ip, page, perPage)
- return err
- })
- g.Go(func() error {
- var err error
- tokenNames, err = GetUsedTokenNames(group, startTimestamp, endTimestamp)
- return err
- })
- g.Go(func() error {
- var err error
- models, err = GetUsedModels(group, startTimestamp, endTimestamp)
- return err
- })
- if err := g.Wait(); err != nil {
- return nil, err
- }
- result := &GetGroupLogsResult{
- GetLogsResult: GetLogsResult{
- Logs: logs,
- Total: total,
- },
- Models: models,
- TokenNames: tokenNames,
- }
- return result, nil
- }
- func DeleteOldLog(timestamp time.Time) (int64, error) {
- result := LogDB.Where("request_at < ?", timestamp).Delete(&Log{})
- return result.RowsAffected, result.Error
- }
- func DeleteGroupLogs(groupID string) (int64, error) {
- if groupID == "" {
- return 0, errors.New("group is required")
- }
- result := LogDB.Where("group_id = ?", groupID).Delete(&Log{})
- return result.RowsAffected, result.Error
- }
// ChartData is one time bucket of the dashboard chart, filled by the
// aggregate query in getChartData.
type ChartData struct {
	// Timestamp is the bucket value (the day or hour truncation column).
	Timestamp int64 `json:"timestamp"`
	// RequestCount is the number of log rows in the bucket.
	RequestCount int64 `json:"request_count"`
	// UsedAmount is the sum of used_amount over the bucket.
	UsedAmount float64 `json:"used_amount"`
	// ExceptionCount is the number of rows in the bucket whose code is not 200.
	ExceptionCount int64 `json:"exception_count"`
}
- type DashboardResponse struct {
- ChartData []*ChartData `json:"chart_data"`
- TotalCount int64 `json:"total_count"`
- ExceptionCount int64 `json:"exception_count"`
- UsedAmount float64 `json:"used_amount"`
- RPM int64 `json:"rpm"`
- TPM int64 `json:"tpm"`
- }
- type GroupDashboardResponse struct {
- DashboardResponse
- Models []string `json:"models"`
- TokenNames []string `json:"token_names"`
- }
// TimeSpanType names the bucket size used when aggregating chart data.
type TimeSpanType string

// Supported bucket sizes; getTimeSpanFormat maps them to the matching
// truncated-timestamp column.
const (
	TimeSpanDay  = TimeSpanType("day")
	TimeSpanHour = TimeSpanType("hour")
)
- func getTimeSpanFormat(t TimeSpanType) string {
- switch t {
- case TimeSpanDay:
- return "timestamp_trunc_by_day"
- case TimeSpanHour:
- return "timestamp_trunc_by_hour"
- default:
- return ""
- }
- }
- func getChartData(group string, start, end time.Time, tokenName, modelName string, timeSpan TimeSpanType) ([]*ChartData, error) {
- var chartData []*ChartData
- timeSpanFormat := getTimeSpanFormat(timeSpan)
- if timeSpanFormat == "" {
- return nil, errors.New("unsupported time format")
- }
- query := LogDB.Table("logs").
- Select(timeSpanFormat + " as timestamp, count(*) as request_count, sum(used_amount) as used_amount, sum(case when code != 200 then 1 else 0 end) as exception_count").
- Group("timestamp").
- Order("timestamp ASC")
- if group != "" {
- query = query.Where("group_id = ?", group)
- }
- if !start.IsZero() && !end.IsZero() {
- query = query.Where("request_at BETWEEN ? AND ?", start, end)
- } else if !start.IsZero() {
- query = query.Where("request_at >= ?", start)
- } else if !end.IsZero() {
- query = query.Where("request_at <= ?", end)
- }
- if tokenName != "" {
- query = query.Where("token_name = ?", tokenName)
- }
- if modelName != "" {
- query = query.Where("model = ?", modelName)
- }
- err := query.Scan(&chartData).Error
- return chartData, err
- }
- func GetUsedModels(group string, start, end time.Time) ([]string, error) {
- return getLogGroupByValues[string]("model", group, start, end)
- }
- func GetUsedTokenNames(group string, start, end time.Time) ([]string, error) {
- if group == "" {
- return nil, errors.New("group is required")
- }
- return getLogGroupByValues[string]("token_name", group, start, end)
- }
- //nolint:unused
- func getLogDistinctValues[T cmp.Ordered](field string, group string, start, end time.Time) ([]T, error) {
- var values []T
- query := LogDB.
- Model(&Log{})
- if group != "" {
- query = query.Where("group_id = ?", group)
- }
- if !start.IsZero() && !end.IsZero() {
- query = query.Where("request_at BETWEEN ? AND ?", start, end)
- } else if !start.IsZero() {
- query = query.Where("request_at >= ?", start)
- } else if !end.IsZero() {
- query = query.Where("request_at <= ?", end)
- }
- err := query.
- Distinct(field).
- Pluck(field, &values).Error
- if err != nil {
- return nil, err
- }
- slices.Sort(values)
- return values, nil
- }
- func getLogGroupByValues[T cmp.Ordered](field string, group string, start, end time.Time) ([]T, error) {
- var values []T
- query := LogDB.
- Model(&Log{})
- if group != "" {
- query = query.Where("group_id = ?", group)
- }
- if !start.IsZero() && !end.IsZero() {
- query = query.Where("request_at BETWEEN ? AND ?", start, end)
- } else if !start.IsZero() {
- query = query.Where("request_at >= ?", start)
- } else if !end.IsZero() {
- query = query.Where("request_at <= ?", end)
- }
- err := query.
- Select(field).
- Group(field).
- Pluck(field, &values).Error
- if err != nil {
- return nil, err
- }
- slices.Sort(values)
- return values, nil
- }
- func sumTotalCount(chartData []*ChartData) int64 {
- var count int64
- for _, data := range chartData {
- count += data.RequestCount
- }
- return count
- }
- func sumExceptionCount(chartData []*ChartData) int64 {
- var count int64
- for _, data := range chartData {
- count += data.ExceptionCount
- }
- return count
- }
- func sumUsedAmount(chartData []*ChartData) float64 {
- var amount decimal.Decimal
- for _, data := range chartData {
- amount = amount.Add(decimal.NewFromFloat(data.UsedAmount))
- }
- return amount.InexactFloat64()
- }
- func getRPM(group string, end time.Time, tokenName, modelName string) (int64, error) {
- query := LogDB.Model(&Log{})
- if group != "" {
- query = query.Where("group_id = ?", group)
- }
- if tokenName != "" {
- query = query.Where("token_name = ?", tokenName)
- }
- if modelName != "" {
- query = query.Where("model = ?", modelName)
- }
- var count int64
- err := query.
- Where("request_at BETWEEN ? AND ?", end.Add(-time.Minute), end).
- Count(&count).Error
- return count, err
- }
- func getTPM(group string, end time.Time, tokenName, modelName string) (int64, error) {
- query := LogDB.Model(&Log{}).
- Select("COALESCE(SUM(total_tokens), 0)").
- Where("request_at >= ? AND request_at <= ?", end.Add(-time.Minute), end)
- if group != "" {
- query = query.Where("group_id = ?", group)
- }
- if tokenName != "" {
- query = query.Where("token_name = ?", tokenName)
- }
- if modelName != "" {
- query = query.Where("model = ?", modelName)
- }
- var tpm int64
- err := query.Scan(&tpm).Error
- return tpm, err
- }
- func GetDashboardData(start, end time.Time, modelName string, timeSpan TimeSpanType) (*DashboardResponse, error) {
- if end.IsZero() {
- end = time.Now()
- } else if end.Before(start) {
- return nil, errors.New("end time is before start time")
- }
- var (
- chartData []*ChartData
- rpm int64
- tpm int64
- )
- g := new(errgroup.Group)
- g.Go(func() error {
- var err error
- chartData, err = getChartData("", start, end, "", modelName, timeSpan)
- return err
- })
- g.Go(func() error {
- var err error
- rpm, err = getRPM("", end, "", modelName)
- return err
- })
- g.Go(func() error {
- var err error
- tpm, err = getTPM("", end, "", modelName)
- return err
- })
- if err := g.Wait(); err != nil {
- return nil, err
- }
- totalCount := sumTotalCount(chartData)
- exceptionCount := sumExceptionCount(chartData)
- usedAmount := sumUsedAmount(chartData)
- return &DashboardResponse{
- ChartData: chartData,
- TotalCount: totalCount,
- ExceptionCount: exceptionCount,
- UsedAmount: usedAmount,
- RPM: rpm,
- TPM: tpm,
- }, nil
- }
- func GetGroupDashboardData(group string, start, end time.Time, tokenName string, modelName string, timeSpan TimeSpanType) (*GroupDashboardResponse, error) {
- if group == "" {
- return nil, errors.New("group is required")
- }
- if end.IsZero() {
- end = time.Now()
- } else if end.Before(start) {
- return nil, errors.New("end time is before start time")
- }
- var (
- chartData []*ChartData
- tokenNames []string
- models []string
- rpm int64
- tpm int64
- )
- g := new(errgroup.Group)
- g.Go(func() error {
- var err error
- chartData, err = getChartData(group, start, end, tokenName, modelName, timeSpan)
- return err
- })
- g.Go(func() error {
- var err error
- tokenNames, err = GetUsedTokenNames(group, start, end)
- return err
- })
- g.Go(func() error {
- var err error
- models, err = GetUsedModels(group, start, end)
- return err
- })
- g.Go(func() error {
- var err error
- rpm, err = getRPM(group, end, tokenName, modelName)
- return err
- })
- g.Go(func() error {
- var err error
- tpm, err = getTPM(group, end, tokenName, modelName)
- return err
- })
- if err := g.Wait(); err != nil {
- return nil, err
- }
- totalCount := sumTotalCount(chartData)
- exceptionCount := sumExceptionCount(chartData)
- usedAmount := sumUsedAmount(chartData)
- return &GroupDashboardResponse{
- DashboardResponse: DashboardResponse{
- ChartData: chartData,
- TotalCount: totalCount,
- ExceptionCount: exceptionCount,
- UsedAmount: usedAmount,
- RPM: rpm,
- TPM: tpm,
- },
- Models: models,
- TokenNames: tokenNames,
- }, nil
- }
- func GetGroupLastRequestTime(group string) (time.Time, error) {
- if group == "" {
- return time.Time{}, errors.New("group is required")
- }
- var log Log
- err := LogDB.Model(&Log{}).Where("group_id = ?", group).Order("request_at desc").First(&log).Error
- return log.RequestAt, err
- }
- func GetTokenLastRequestTime(id int) (time.Time, error) {
- var log Log
- tx := LogDB.Model(&Log{})
- err := tx.Where("token_id = ?", id).Order("request_at desc").First(&log).Error
- return log.RequestAt, err
- }
- func GetGroupModelTPM(group string, model string) (int64, error) {
- end := time.Now()
- start := end.Add(-time.Minute)
- var tpm int64
- err := LogDB.
- Model(&Log{}).
- Where("group_id = ? AND request_at >= ? AND request_at <= ? AND model = ?", group, start, end, model).
- Select("COALESCE(SUM(total_tokens), 0)").
- Scan(&tpm).Error
- return tpm, err
- }
// ModelCostRank is one row of a per-model cost ranking.
type ModelCostRank struct {
	// Model is the model name the row aggregates.
	Model string `json:"model"`
	// UsedAmount is the summed used amount for the model.
	UsedAmount float64 `json:"used_amount"`
	// Total is the number of rows counted for the model.
	Total int64 `json:"total"`
}
- func GetModelCostRank(group string, start, end time.Time) ([]*ModelCostRank, error) {
- var ranks []*ModelCostRank
- query := LogDB.Model(&Log{}).
- Select("model, SUM(used_amount) as used_amount, COUNT(*) as total").
- Group("model").
- Order("used_amount DESC")
- if group != "" {
- query = query.Where("group_id = ?", group)
- }
- if !start.IsZero() && !end.IsZero() {
- query = query.Where("request_at BETWEEN ? AND ?", start, end)
- } else if !start.IsZero() {
- query = query.Where("request_at >= ?", start)
- } else if !end.IsZero() {
- query = query.Where("request_at <= ?", end)
- }
- err := query.Scan(&ranks).Error
- if err != nil {
- return nil, err
- }
- return ranks, nil
- }
|