| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208 |
- package model
- import (
- "cmp"
- "errors"
- "slices"
- "time"
- "github.com/shopspring/decimal"
- "golang.org/x/sync/errgroup"
- "gorm.io/gorm"
- "gorm.io/gorm/clause"
- )
- type SummaryMinute struct {
- ID int `gorm:"primaryKey"`
- Unique SummaryMinuteUnique `gorm:"embedded"`
- Data SummaryData `gorm:"embedded"`
- }
- type SummaryMinuteUnique struct {
- ChannelID int `gorm:"not null;uniqueIndex:idx_summary_minute_unique,priority:1"`
- Model string `gorm:"size:64;not null;uniqueIndex:idx_summary_minute_unique,priority:2"`
- MinuteTimestamp int64 `gorm:"not null;uniqueIndex:idx_summary_minute_unique,priority:3,sort:desc"`
- }
- func (l *SummaryMinute) BeforeCreate(_ *gorm.DB) (err error) {
- if l.Unique.ChannelID == 0 {
- return errors.New("channel id is required")
- }
- if l.Unique.Model == "" {
- return errors.New("model is required")
- }
- if l.Unique.MinuteTimestamp == 0 {
- return errors.New("minute timestamp is required")
- }
- if err := validateMinuteTimestamp(l.Unique.MinuteTimestamp); err != nil {
- return err
- }
- return err
- }
- var minuteTimestampDivisor = int64(time.Minute.Seconds())
- func validateMinuteTimestamp(minuteTimestamp int64) error {
- if minuteTimestamp%minuteTimestampDivisor != 0 {
- return errors.New("minute timestamp must be an exact minute")
- }
- return nil
- }
- func CreateSummaryMinuteIndexs(db *gorm.DB) error {
- indexes := []string{
- "CREATE INDEX IF NOT EXISTS idx_summary_minute_channel_minute ON summary_minutes (channel_id, minute_timestamp DESC)",
- "CREATE INDEX IF NOT EXISTS idx_summary_minute_model_minute ON summary_minutes (model, minute_timestamp DESC)",
- }
- for _, index := range indexes {
- if err := db.Exec(index).Error; err != nil {
- return err
- }
- }
- return nil
- }
- func UpsertSummaryMinute(unique SummaryMinuteUnique, data SummaryData) error {
- err := validateMinuteTimestamp(unique.MinuteTimestamp)
- if err != nil {
- return err
- }
- for range 3 {
- result := LogDB.
- Model(&SummaryMinute{}).
- Where(
- "channel_id = ? AND model = ? AND minute_timestamp = ?",
- unique.ChannelID,
- unique.Model,
- unique.MinuteTimestamp,
- ).
- Updates(data.buildUpdateData("summary_minutes"))
- err = result.Error
- if err != nil {
- return err
- }
- if result.RowsAffected > 0 {
- return nil
- }
- err = createSummaryMinute(unique, data)
- if err == nil {
- return nil
- }
- if !errors.Is(err, gorm.ErrDuplicatedKey) {
- return err
- }
- }
- return err
- }
- func createSummaryMinute(unique SummaryMinuteUnique, data SummaryData) error {
- return LogDB.
- Clauses(clause.OnConflict{
- Columns: []clause.Column{
- {Name: "channel_id"},
- {Name: "model"},
- {Name: "minute_timestamp"},
- },
- DoUpdates: clause.Assignments(data.buildUpdateData("summary_minutes")),
- }).
- Create(&SummaryMinute{
- Unique: unique,
- Data: data,
- }).Error
- }
- func getChartDataMinute(
- start, end time.Time,
- channelID int,
- modelName string,
- timeSpan TimeSpanType,
- timezone *time.Location,
- ) ([]ChartData, error) {
- query := LogDB.Model(&SummaryMinute{})
- if channelID != 0 {
- query = query.Where("channel_id = ?", channelID)
- }
- if modelName != "" {
- query = query.Where("model = ?", modelName)
- }
- switch {
- case !start.IsZero() && !end.IsZero():
- query = query.Where("minute_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
- case !start.IsZero():
- query = query.Where("minute_timestamp >= ?", start.Unix())
- case !end.IsZero():
- query = query.Where("minute_timestamp <= ?", end.Unix())
- }
- // Only include max metrics when we have specific channel and model
- const selectFields = "minute_timestamp as timestamp, sum(used_amount) as used_amount, " +
- "sum(request_count) as request_count, sum(retry_count) as retry_count, sum(exception_count) as exception_count, sum(status4xx_count) as status4xx_count, sum(status5xx_count) as status5xx_count, sum(status400_count) as status400_count, sum(status429_count) as status429_count, sum(status500_count) as status500_count, " +
- "sum(total_time_milliseconds) as total_time_milliseconds, sum(total_ttfb_milliseconds) as total_ttfb_milliseconds, " +
- "sum(input_tokens) as input_tokens, sum(image_input_tokens) as image_input_tokens, sum(audio_input_tokens) as audio_input_tokens, sum(output_tokens) as output_tokens, " +
- "sum(cached_tokens) as cached_tokens, sum(cache_creation_tokens) as cache_creation_tokens, " +
- "sum(total_tokens) as total_tokens, sum(web_search_count) as web_search_count"
- query = query.
- Select(selectFields).
- Group("timestamp")
- var chartData []ChartData
- err := query.Find(&chartData).Error
- if err != nil {
- return nil, err
- }
- if len(chartData) > 0 && timeSpan != TimeSpanMinute {
- chartData = aggregateDataToSpan(chartData, timeSpan, timezone)
- }
- slices.SortFunc(chartData, func(a, b ChartData) int {
- return cmp.Compare(a.Timestamp, b.Timestamp)
- })
- return chartData, nil
- }
- func getGroupChartDataMinute(
- group string,
- start, end time.Time,
- tokenName, modelName string,
- timeSpan TimeSpanType,
- timezone *time.Location,
- ) ([]ChartData, error) {
- query := LogDB.Model(&GroupSummaryMinute{})
- if group != "" {
- query = query.Where("group_id = ?", group)
- }
- if tokenName != "" {
- query = query.Where("token_name = ?", tokenName)
- }
- if modelName != "" {
- query = query.Where("model = ?", modelName)
- }
- switch {
- case !start.IsZero() && !end.IsZero():
- query = query.Where("minute_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
- case !start.IsZero():
- query = query.Where("minute_timestamp >= ?", start.Unix())
- case !end.IsZero():
- query = query.Where("minute_timestamp <= ?", end.Unix())
- }
- // Only include max metrics when we have specific channel and model
- const selectFields = "minute_timestamp as timestamp, sum(used_amount) as used_amount, " +
- "sum(request_count) as request_count, sum(exception_count) as exception_count, sum(status4xx_count) as status4xx_count, sum(status5xx_count) as status5xx_count, sum(status400_count) as status400_count, sum(status429_count) as status429_count, sum(status500_count) as status500_count, " +
- "sum(total_time_milliseconds) as total_time_milliseconds, sum(total_ttfb_milliseconds) as total_ttfb_milliseconds, " +
- "sum(input_tokens) as input_tokens, sum(image_input_tokens) as image_input_tokens, sum(audio_input_tokens) as audio_input_tokens, sum(output_tokens) as output_tokens, " +
- "sum(cached_tokens) as cached_tokens, sum(cache_creation_tokens) as cache_creation_tokens, " +
- "sum(total_tokens) as total_tokens, sum(web_search_count) as web_search_count"
- query = query.
- Select(selectFields).
- Group("timestamp")
- var chartData []ChartData
- err := query.Find(&chartData).Error
- if err != nil {
- return nil, err
- }
- if len(chartData) > 0 && timeSpan != TimeSpanMinute {
- chartData = aggregateDataToSpan(chartData, timeSpan, timezone)
- }
- slices.SortFunc(chartData, func(a, b ChartData) int {
- return cmp.Compare(a.Timestamp, b.Timestamp)
- })
- return chartData, nil
- }
- func GetUsedChannelsMinute(start, end time.Time) ([]int, error) {
- return getLogGroupByValuesMinute[int]("channel_id", start, end)
- }
- func GetUsedModelsMinute(start, end time.Time) ([]string, error) {
- return getLogGroupByValuesMinute[string]("model", start, end)
- }
- func GetGroupUsedModelsMinute(group, tokenName string, start, end time.Time) ([]string, error) {
- return getGroupLogGroupByValuesMinute[string]("model", group, tokenName, start, end)
- }
- func GetGroupUsedTokenNamesMinute(group string, start, end time.Time) ([]string, error) {
- return getGroupLogGroupByValuesMinute[string]("token_name", group, "", start, end)
- }
- func getLogGroupByValuesMinute[T cmp.Ordered](
- field string,
- start, end time.Time,
- ) ([]T, error) {
- type Result struct {
- Value T
- UsedAmount float64
- RequestCount int64
- }
- var results []Result
- var query *gorm.DB
- query = LogDB.
- Model(&SummaryMinute{})
- switch {
- case !start.IsZero() && !end.IsZero():
- query = query.Where("minute_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
- case !start.IsZero():
- query = query.Where("minute_timestamp >= ?", start.Unix())
- case !end.IsZero():
- query = query.Where("minute_timestamp <= ?", end.Unix())
- }
- err := query.
- Select(
- field + " as value, SUM(request_count) as request_count, SUM(used_amount) as used_amount",
- ).
- Group(field).
- Find(&results).Error
- if err != nil {
- return nil, err
- }
- slices.SortFunc(results, func(a, b Result) int {
- if a.UsedAmount != b.UsedAmount {
- return cmp.Compare(b.UsedAmount, a.UsedAmount)
- }
- if a.RequestCount != b.RequestCount {
- return cmp.Compare(b.RequestCount, a.RequestCount)
- }
- return cmp.Compare(a.Value, b.Value)
- })
- values := make([]T, len(results))
- for i, result := range results {
- values[i] = result.Value
- }
- return values, nil
- }
- func getGroupLogGroupByValuesMinute[T cmp.Ordered](
- field, group, tokenName string,
- start, end time.Time,
- ) ([]T, error) {
- type Result struct {
- Value T
- UsedAmount float64
- RequestCount int64
- }
- var results []Result
- query := LogDB.
- Model(&GroupSummaryMinute{})
- if group != "" {
- query = query.Where("group_id = ?", group)
- }
- if tokenName != "" {
- query = query.Where("token_name = ?", tokenName)
- }
- switch {
- case !start.IsZero() && !end.IsZero():
- query = query.Where("minute_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
- case !start.IsZero():
- query = query.Where("minute_timestamp >= ?", start.Unix())
- case !end.IsZero():
- query = query.Where("minute_timestamp <= ?", end.Unix())
- }
- err := query.
- Select(
- field + " as value, SUM(request_count) as request_count, SUM(used_amount) as used_amount",
- ).
- Group(field).
- Find(&results).Error
- if err != nil {
- return nil, err
- }
- slices.SortFunc(results, func(a, b Result) int {
- if a.UsedAmount != b.UsedAmount {
- return cmp.Compare(b.UsedAmount, a.UsedAmount)
- }
- if a.RequestCount != b.RequestCount {
- return cmp.Compare(b.RequestCount, a.RequestCount)
- }
- return cmp.Compare(a.Value, b.Value)
- })
- values := make([]T, len(results))
- for i, result := range results {
- values[i] = result.Value
- }
- return values, nil
- }
- func getDashboardDataMinute(
- start,
- end time.Time,
- modelName string,
- channelID int,
- timeSpan TimeSpanType,
- timezone *time.Location,
- ) (*DashboardResponse, error) {
- if end.IsZero() {
- end = time.Now()
- } else if end.Before(start) {
- return nil, errors.New("end time is before start time")
- }
- var (
- chartData []ChartData
- channels []int
- models []string
- )
- g := new(errgroup.Group)
- g.Go(func() error {
- var err error
- chartData, err = getChartDataMinute(start, end, channelID, modelName, timeSpan, timezone)
- return err
- })
- g.Go(func() error {
- var err error
- channels, err = GetUsedChannelsMinute(start, end)
- return err
- })
- g.Go(func() error {
- var err error
- models, err = GetUsedModelsMinute(start, end)
- return err
- })
- if err := g.Wait(); err != nil {
- return nil, err
- }
- dashboardResponse := sumDashboardResponse(chartData)
- dashboardResponse.Channels = channels
- dashboardResponse.Models = models
- return &dashboardResponse, nil
- }
- func getGroupDashboardDataMinute(
- group string,
- start, end time.Time,
- tokenName string,
- modelName string,
- timeSpan TimeSpanType,
- timezone *time.Location,
- ) (*GroupDashboardResponse, error) {
- if group == "" {
- return nil, errors.New("group is required")
- }
- if end.IsZero() {
- end = time.Now()
- } else if end.Before(start) {
- return nil, errors.New("end time is before start time")
- }
- var (
- chartData []ChartData
- tokenNames []string
- models []string
- )
- g := new(errgroup.Group)
- g.Go(func() error {
- var err error
- chartData, err = getGroupChartDataMinute(
- group,
- start,
- end,
- tokenName,
- modelName,
- timeSpan,
- timezone,
- )
- return err
- })
- g.Go(func() error {
- var err error
- tokenNames, err = GetGroupUsedTokenNamesMinute(group, start, end)
- return err
- })
- g.Go(func() error {
- var err error
- models, err = GetGroupUsedModelsMinute(group, tokenName, start, end)
- return err
- })
- if err := g.Wait(); err != nil {
- return nil, err
- }
- dashboardResponse := sumDashboardResponse(chartData)
- dashboardResponse.Models = models
- return &GroupDashboardResponse{
- DashboardResponse: dashboardResponse,
- TokenNames: tokenNames,
- }, nil
- }
- type SummaryDataV2 struct {
- Timestamp int64 `json:"timestamp,omitempty"`
- ChannelID int `json:"channel_id,omitempty"`
- GroupID string `json:"group_id,omitempty"`
- TokenName string `json:"token_name,omitempty"`
- Model string `json:"model"`
- UsedAmount float64 `json:"used_amount"`
- TotalTimeMilliseconds int64 `json:"total_time_milliseconds"`
- TotalTTFBMilliseconds int64 `json:"total_ttfb_milliseconds"`
- Count
- Usage
- MaxRPM int64 `json:"max_rpm"`
- MaxTPM int64 `json:"max_tpm"`
- }
- type TimeSummaryDataV2 struct {
- Timestamp int64 `json:"timestamp"`
- Summary []SummaryDataV2 `json:"summary"`
- }
- func GetTimeSeriesModelData(
- channelID int,
- modelName string,
- start, end time.Time,
- timeSpan TimeSpanType,
- timezone *time.Location,
- ) ([]TimeSummaryDataV2, error) {
- if timeSpan == TimeSpanMinute {
- return getTimeSeriesModelDataMinute(channelID, modelName, start, end, timeSpan, timezone)
- }
- if end.IsZero() {
- end = time.Now()
- } else if end.Before(start) {
- return nil, errors.New("end time is before start time")
- }
- query := LogDB.Model(&Summary{})
- if channelID != 0 {
- query = query.Where("channel_id = ?", channelID)
- }
- if modelName != "" {
- query = query.Where("model = ?", modelName)
- }
- switch {
- case !start.IsZero() && !end.IsZero():
- query = query.Where("hour_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
- case !start.IsZero():
- query = query.Where("hour_timestamp >= ?", start.Unix())
- case !end.IsZero():
- query = query.Where("hour_timestamp <= ?", end.Unix())
- }
- const selectFields = "hour_timestamp as timestamp, channel_id, model, " +
- "sum(used_amount) as used_amount, " +
- "sum(request_count) as request_count, sum(retry_count) as retry_count, sum(exception_count) as exception_count, sum(status4xx_count) as status4xx_count, sum(status5xx_count) as status5xx_count, sum(status400_count) as status400_count, sum(status429_count) as status429_count, sum(status500_count) as status500_count, " +
- "sum(total_time_milliseconds) as total_time_milliseconds, sum(total_ttfb_milliseconds) as total_ttfb_milliseconds, " +
- "sum(input_tokens) as input_tokens, sum(image_input_tokens) as image_input_tokens, sum(audio_input_tokens) as audio_input_tokens, " +
- "sum(output_tokens) as output_tokens, sum(cached_tokens) as cached_tokens, " +
- "sum(cache_creation_tokens) as cache_creation_tokens, sum(total_tokens) as total_tokens, " +
- "sum(web_search_count) as web_search_count"
- var rawData []SummaryDataV2
- err := query.
- Select(selectFields).
- Group("timestamp, channel_id, model").
- Find(&rawData).Error
- if err != nil {
- return nil, err
- }
- if len(rawData) > 0 {
- err = batchFillMaxValues(rawData, channelID, modelName, start, end)
- if err != nil {
- return nil, err
- }
- if timeSpan != TimeSpanHour {
- rawData = aggregatToSpan(rawData, timeSpan, timezone)
- }
- }
- result := convertToTimeModelData(rawData)
- slices.SortFunc(result, func(a, b TimeSummaryDataV2) int {
- return cmp.Compare(a.Timestamp, b.Timestamp)
- })
- return result, nil
- }
- func GetGroupTimeSeriesModelData(
- group string,
- tokenName string,
- modelName string,
- start, end time.Time,
- timeSpan TimeSpanType,
- timezone *time.Location,
- ) ([]TimeSummaryDataV2, error) {
- if timeSpan == TimeSpanMinute {
- return getGroupTimeSeriesModelDataMinute(
- group,
- tokenName,
- modelName,
- start,
- end,
- timeSpan,
- timezone,
- )
- }
- if end.IsZero() {
- end = time.Now()
- } else if end.Before(start) {
- return nil, errors.New("end time is before start time")
- }
- query := LogDB.Model(&GroupSummary{}).
- Where("group_id = ?", group)
- if tokenName != "" {
- query = query.Where("token_name = ?", tokenName)
- }
- if modelName != "" {
- query = query.Where("model = ?", modelName)
- }
- switch {
- case !start.IsZero() && !end.IsZero():
- query = query.Where("hour_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
- case !start.IsZero():
- query = query.Where("hour_timestamp >= ?", start.Unix())
- case !end.IsZero():
- query = query.Where("hour_timestamp <= ?", end.Unix())
- }
- const selectFields = "hour_timestamp as timestamp, group_id, token_name, model, " +
- "sum(used_amount) as used_amount, " +
- "sum(request_count) as request_count, sum(exception_count) as exception_count, sum(status4xx_count) as status4xx_count, sum(status5xx_count) as status5xx_count, sum(status400_count) as status400_count, sum(status429_count) as status429_count, sum(status500_count) as status500_count, " +
- "sum(total_time_milliseconds) as total_time_milliseconds, sum(total_ttfb_milliseconds) as total_ttfb_milliseconds, " +
- "sum(input_tokens) as input_tokens, sum(image_input_tokens) as image_input_tokens, sum(audio_input_tokens) as audio_input_tokens, " +
- "sum(output_tokens) as output_tokens, sum(cached_tokens) as cached_tokens, " +
- "sum(cache_creation_tokens) as cache_creation_tokens, sum(total_tokens) as total_tokens, " +
- "sum(web_search_count) as web_search_count"
- var rawData []SummaryDataV2
- err := query.
- Select(selectFields).
- Group("timestamp, group_id, token_name, model").
- Find(&rawData).Error
- if err != nil {
- return nil, err
- }
- if len(rawData) > 0 {
- err = batchFillGroupMaxValues(rawData, group, tokenName, modelName, start, end)
- if err != nil {
- return nil, err
- }
- if timeSpan != TimeSpanHour {
- rawData = aggregatToSpan(rawData, timeSpan, timezone)
- }
- }
- result := convertToTimeModelData(rawData)
- slices.SortFunc(result, func(a, b TimeSummaryDataV2) int {
- return cmp.Compare(a.Timestamp, b.Timestamp)
- })
- return result, nil
- }
- func batchFillMaxValues(
- rawData []SummaryDataV2,
- channelID int,
- modelName string,
- start, end time.Time,
- ) error {
- minuteQuery := LogDB.Model(&SummaryMinute{})
- if channelID != 0 {
- minuteQuery = minuteQuery.Where("channel_id = ?", channelID)
- }
- if modelName != "" {
- minuteQuery = minuteQuery.Where("model = ?", modelName)
- }
- minuteStart := start.Unix()
- minuteEnd := end.Unix()
- if end.IsZero() {
- minuteEnd = time.Now().Unix()
- }
- minuteQuery = minuteQuery.Where(
- "minute_timestamp >= ? AND minute_timestamp <= ?",
- minuteStart,
- minuteEnd,
- )
- type MaxResult struct {
- HourTimestamp int64 `json:"hour_timestamp"`
- ChannelID int `json:"channel_id"`
- Model string `json:"model"`
- MaxRPM int64 `json:"max_rpm"`
- MaxTPM int64 `json:"max_tpm"`
- }
- var maxResults []MaxResult
- err := minuteQuery.
- Select(`
- (minute_timestamp - minute_timestamp % 3600) as hour_timestamp,
- channel_id,
- model,
- MAX(request_count) as max_rpm,
- MAX(total_tokens) as max_tpm
- `).
- Group("hour_timestamp, channel_id, model").
- Find(&maxResults).Error
- if err != nil {
- return err
- }
- type Key struct {
- HourTimestamp int64
- ChannelID int
- Model string
- }
- maxMap := make(map[Key]MaxResult)
- for _, result := range maxResults {
- key := Key{
- HourTimestamp: result.HourTimestamp,
- ChannelID: result.ChannelID,
- Model: result.Model,
- }
- maxMap[key] = result
- }
- for i := range rawData {
- data := &rawData[i]
- key := Key{
- HourTimestamp: data.Timestamp,
- ChannelID: data.ChannelID,
- Model: data.Model,
- }
- if maxResult, exists := maxMap[key]; exists {
- data.MaxRPM = maxResult.MaxRPM
- data.MaxTPM = maxResult.MaxTPM
- }
- }
- return nil
- }
- func batchFillGroupMaxValues(
- rawData []SummaryDataV2,
- group, tokenName, modelName string,
- start, end time.Time,
- ) error {
- minuteQuery := LogDB.Model(&GroupSummaryMinute{}).
- Where("group_id = ?", group)
- if tokenName != "" {
- minuteQuery = minuteQuery.Where("token_name = ?", tokenName)
- }
- if modelName != "" {
- minuteQuery = minuteQuery.Where("model = ?", modelName)
- }
- minuteStart := start.Unix()
- minuteEnd := end.Unix()
- if end.IsZero() {
- minuteEnd = time.Now().Unix()
- }
- minuteQuery = minuteQuery.Where(
- "minute_timestamp >= ? AND minute_timestamp <= ?",
- minuteStart,
- minuteEnd,
- )
- type MaxResult struct {
- HourTimestamp int64 `json:"hour_timestamp"`
- GroupID string `json:"group_id"`
- TokenName string `json:"token_name"`
- Model string `json:"model"`
- MaxRPM int64 `json:"max_rpm"`
- MaxTPM int64 `json:"max_tpm"`
- }
- var maxResults []MaxResult
- err := minuteQuery.
- Select(`
- (minute_timestamp - minute_timestamp % 3600) as hour_timestamp,
- group_id,
- token_name,
- model,
- MAX(request_count) as max_rpm,
- MAX(total_tokens) as max_tpm
- `).
- Group("hour_timestamp, group_id, token_name, model").
- Find(&maxResults).Error
- if err != nil {
- return err
- }
- type Key struct {
- HourTimestamp int64
- GroupID string
- TokenName string
- Model string
- }
- maxMap := make(map[Key]MaxResult)
- for _, result := range maxResults {
- key := Key{
- HourTimestamp: result.HourTimestamp,
- GroupID: result.GroupID,
- TokenName: result.TokenName,
- Model: result.Model,
- }
- maxMap[key] = result
- }
- for i := range rawData {
- data := &rawData[i]
- key := Key{
- HourTimestamp: data.Timestamp,
- GroupID: data.GroupID,
- TokenName: data.TokenName,
- Model: data.Model,
- }
- if maxResult, exists := maxMap[key]; exists {
- data.MaxRPM = maxResult.MaxRPM
- data.MaxTPM = maxResult.MaxTPM
- }
- }
- return nil
- }
- func getTimeSeriesModelDataMinute(
- channelID int,
- modelName string,
- start, end time.Time,
- timeSpan TimeSpanType,
- timezone *time.Location,
- ) ([]TimeSummaryDataV2, error) {
- if end.IsZero() {
- end = time.Now()
- } else if end.Before(start) {
- return nil, errors.New("end time is before start time")
- }
- query := LogDB.Model(&SummaryMinute{})
- if channelID != 0 {
- query = query.Where("channel_id = ?", channelID)
- }
- if modelName != "" {
- query = query.Where("model = ?", modelName)
- }
- switch {
- case !start.IsZero() && !end.IsZero():
- query = query.Where("minute_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
- case !start.IsZero():
- query = query.Where("minute_timestamp >= ?", start.Unix())
- case !end.IsZero():
- query = query.Where("minute_timestamp <= ?", end.Unix())
- }
- const selectFields = "minute_timestamp as timestamp, channel_id, model, " +
- "sum(used_amount) as used_amount, " +
- "sum(request_count) as request_count, sum(retry_count) as retry_count, sum(exception_count) as exception_count, sum(status4xx_count) as status4xx_count, sum(status5xx_count) as status5xx_count, sum(status400_count) as status400_count, sum(status429_count) as status429_count, sum(status500_count) as status500_count, " +
- "sum(total_time_milliseconds) as total_time_milliseconds, sum(total_ttfb_milliseconds) as total_ttfb_milliseconds, " +
- "sum(input_tokens) as input_tokens, sum(image_input_tokens) as image_input_tokens, sum(audio_input_tokens) as audio_input_tokens, " +
- "sum(output_tokens) as output_tokens, sum(cached_tokens) as cached_tokens, " +
- "sum(cache_creation_tokens) as cache_creation_tokens, sum(total_tokens) as total_tokens, " +
- "sum(web_search_count) as web_search_count"
- var rawData []SummaryDataV2
- err := query.
- Select(selectFields).
- Group("timestamp, channel_id, model").
- Find(&rawData).Error
- if err != nil {
- return nil, err
- }
- for i, data := range rawData {
- rawData[i].MaxRPM = data.RequestCount
- rawData[i].MaxTPM = int64(data.TotalTokens)
- }
- if len(rawData) > 0 && timeSpan != TimeSpanMinute {
- rawData = aggregatToSpan(rawData, timeSpan, timezone)
- }
- result := convertToTimeModelData(rawData)
- slices.SortFunc(result, func(a, b TimeSummaryDataV2) int {
- return cmp.Compare(a.Timestamp, b.Timestamp)
- })
- return result, nil
- }
- func getGroupTimeSeriesModelDataMinute(
- group string,
- tokenName string,
- modelName string,
- start, end time.Time,
- timeSpan TimeSpanType,
- timezone *time.Location,
- ) ([]TimeSummaryDataV2, error) {
- if end.IsZero() {
- end = time.Now()
- } else if end.Before(start) {
- return nil, errors.New("end time is before start time")
- }
- query := LogDB.Model(&GroupSummaryMinute{}).
- Where("group_id = ?", group)
- if tokenName != "" {
- query = query.Where("token_name = ?", tokenName)
- }
- if modelName != "" {
- query = query.Where("model = ?", modelName)
- }
- switch {
- case !start.IsZero() && !end.IsZero():
- query = query.Where("minute_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
- case !start.IsZero():
- query = query.Where("minute_timestamp >= ?", start.Unix())
- case !end.IsZero():
- query = query.Where("minute_timestamp <= ?", end.Unix())
- }
- const selectFields = "minute_timestamp as timestamp, group_id, token_name, model, " +
- "sum(used_amount) as used_amount, " +
- "sum(request_count) as request_count, sum(exception_count) as exception_count, sum(status4xx_count) as status4xx_count, sum(status5xx_count) as status5xx_count, sum(status400_count) as status400_count, sum(status429_count) as status429_count, sum(status500_count) as status500_count, " +
- "sum(total_time_milliseconds) as total_time_milliseconds, sum(total_ttfb_milliseconds) as total_ttfb_milliseconds, " +
- "sum(input_tokens) as input_tokens, sum(image_input_tokens) as image_input_tokens, sum(audio_input_tokens) as audio_input_tokens, " +
- "sum(output_tokens) as output_tokens, sum(cached_tokens) as cached_tokens, " +
- "sum(cache_creation_tokens) as cache_creation_tokens, sum(total_tokens) as total_tokens, " +
- "sum(web_search_count) as web_search_count"
- var rawData []SummaryDataV2
- err := query.
- Select(selectFields).
- Group("timestamp, group_id, token_name, model").
- Find(&rawData).Error
- if err != nil {
- return nil, err
- }
- for i, data := range rawData {
- rawData[i].MaxRPM = data.RequestCount
- rawData[i].MaxTPM = int64(data.TotalTokens)
- }
- if len(rawData) > 0 && timeSpan != TimeSpanMinute {
- rawData = aggregatToSpanGroup(rawData, timeSpan, timezone)
- }
- result := convertToTimeModelData(rawData)
- slices.SortFunc(result, func(a, b TimeSummaryDataV2) int {
- return cmp.Compare(a.Timestamp, b.Timestamp)
- })
- return result, nil
- }
- func aggregatToSpan(
- minuteData []SummaryDataV2,
- timeSpan TimeSpanType,
- timezone *time.Location,
- ) []SummaryDataV2 {
- if timezone == nil {
- timezone = time.Local
- }
- type AggKey struct {
- Timestamp int64
- ChannelID int
- Model string
- }
- dataMap := make(map[AggKey]SummaryDataV2)
- for _, data := range minuteData {
- t := time.Unix(data.Timestamp, 0).In(timezone)
- key := AggKey{
- ChannelID: data.ChannelID,
- Model: data.Model,
- }
- switch timeSpan {
- case TimeSpanMonth:
- startOfMonth := time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, timezone)
- key.Timestamp = startOfMonth.Unix()
- case TimeSpanDay:
- startOfDay := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, timezone)
- key.Timestamp = startOfDay.Unix()
- case TimeSpanHour:
- startOfHour := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, timezone)
- key.Timestamp = startOfHour.Unix()
- case TimeSpanMinute:
- fallthrough
- default:
- startOfMinute := time.Date(
- t.Year(),
- t.Month(),
- t.Day(),
- t.Hour(),
- t.Minute(),
- 0,
- 0,
- timezone,
- )
- key.Timestamp = startOfMinute.Unix()
- }
- currentData, exists := dataMap[key]
- if !exists {
- currentData = SummaryDataV2{
- Timestamp: key.Timestamp,
- ChannelID: data.ChannelID,
- Model: data.Model,
- }
- }
- currentData.Count.Add(data.Count)
- currentData.Usage.Add(data.Usage)
- currentData.UsedAmount = decimal.
- NewFromFloat(currentData.UsedAmount).
- Add(decimal.NewFromFloat(data.UsedAmount)).
- InexactFloat64()
- currentData.TotalTimeMilliseconds += data.TotalTimeMilliseconds
- currentData.TotalTTFBMilliseconds += data.TotalTTFBMilliseconds
- if data.MaxRPM > currentData.MaxRPM {
- currentData.MaxRPM = data.MaxRPM
- }
- if data.MaxTPM > currentData.MaxTPM {
- currentData.MaxTPM = data.MaxTPM
- }
- dataMap[key] = currentData
- }
- result := make([]SummaryDataV2, 0, len(dataMap))
- for _, data := range dataMap {
- result = append(result, data)
- }
- return result
- }
- func aggregatToSpanGroup(
- minuteData []SummaryDataV2,
- timeSpan TimeSpanType,
- timezone *time.Location,
- ) []SummaryDataV2 {
- if timezone == nil {
- timezone = time.Local
- }
- type AggKey struct {
- Timestamp int64
- GroupID string
- TokenName string
- Model string
- }
- dataMap := make(map[AggKey]SummaryDataV2)
- for _, data := range minuteData {
- t := time.Unix(data.Timestamp, 0).In(timezone)
- key := AggKey{
- GroupID: data.GroupID,
- TokenName: data.TokenName,
- Model: data.Model,
- }
- switch timeSpan {
- case TimeSpanMonth:
- startOfMonth := time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, timezone)
- key.Timestamp = startOfMonth.Unix()
- case TimeSpanDay:
- startOfDay := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, timezone)
- key.Timestamp = startOfDay.Unix()
- case TimeSpanHour:
- startOfHour := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, timezone)
- key.Timestamp = startOfHour.Unix()
- case TimeSpanMinute:
- fallthrough
- default:
- startOfMinute := time.Date(
- t.Year(),
- t.Month(),
- t.Day(),
- t.Hour(),
- t.Minute(),
- 0,
- 0,
- timezone,
- )
- key.Timestamp = startOfMinute.Unix()
- }
- currentData, exists := dataMap[key]
- if !exists {
- currentData = SummaryDataV2{
- Timestamp: key.Timestamp,
- GroupID: data.GroupID,
- TokenName: data.TokenName,
- Model: data.Model,
- }
- }
- currentData.Count.Add(data.Count)
- currentData.Usage.Add(data.Usage)
- currentData.UsedAmount = decimal.
- NewFromFloat(currentData.UsedAmount).
- Add(decimal.NewFromFloat(data.UsedAmount)).
- InexactFloat64()
- currentData.TotalTimeMilliseconds += data.TotalTimeMilliseconds
- currentData.TotalTTFBMilliseconds += data.TotalTTFBMilliseconds
- if data.MaxRPM > currentData.MaxRPM {
- currentData.MaxRPM = data.MaxRPM
- }
- if data.MaxTPM > currentData.MaxTPM {
- currentData.MaxTPM = data.MaxTPM
- }
- dataMap[key] = currentData
- }
- result := make([]SummaryDataV2, 0, len(dataMap))
- for _, data := range dataMap {
- result = append(result, data)
- }
- return result
- }
- func convertToTimeModelData(rawData []SummaryDataV2) []TimeSummaryDataV2 {
- timeMap := make(map[int64][]SummaryDataV2)
- for _, data := range rawData {
- timeMap[data.Timestamp] = append(timeMap[data.Timestamp], data)
- }
- result := make([]TimeSummaryDataV2, 0, len(timeMap))
- for timestamp, models := range timeMap {
- slices.SortFunc(models, func(a, b SummaryDataV2) int {
- if a.UsedAmount != b.UsedAmount {
- return cmp.Compare(b.UsedAmount, a.UsedAmount)
- }
- if a.TotalTokens != b.TotalTokens {
- return cmp.Compare(b.TotalTokens, a.TotalTokens)
- }
- if a.RequestCount != b.RequestCount {
- return cmp.Compare(b.RequestCount, a.RequestCount)
- }
- return cmp.Compare(a.Model, b.Model)
- })
- result = append(result, TimeSummaryDataV2{
- Timestamp: timestamp,
- Summary: models,
- })
- }
- return result
- }
|