| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732 |
- package model
- import (
- "context"
- "net/http"
- "sync"
- "time"
- "github.com/labring/aiproxy/core/common/config"
- "github.com/labring/aiproxy/core/common/notify"
- "github.com/shopspring/decimal"
- )
// batchUpdateData is the in-memory accumulator for pending usage updates.
// Each map holds the not-yet-flushed deltas for one kind of record; the
// process*Updates functions in this file write them out and delete the
// entries that were stored successfully. The embedded mutex is taken by
// BatchUpdateSummary and ProcessBatchUpdatesSummary before touching the maps.
type batchUpdateData struct {
	Groups               map[string]*GroupUpdate
	Tokens               map[int]*TokenUpdate
	Channels             map[int]*ChannelUpdate
	Summaries            map[SummaryUnique]*SummaryUpdate
	GroupSummaries       map[GroupSummaryUnique]*GroupSummaryUpdate
	SummariesMinute      map[SummaryMinuteUnique]*SummaryMinuteUpdate
	GroupSummariesMinute map[GroupSummaryMinuteUnique]*GroupSummaryMinuteUpdate
	sync.Mutex
}
- func (b *batchUpdateData) IsClean() bool {
- b.Lock()
- defer b.Unlock()
- return b.isCleanLocked()
- }
- func (b *batchUpdateData) isCleanLocked() bool {
- return len(b.Groups) == 0 &&
- len(b.Tokens) == 0 &&
- len(b.Channels) == 0 &&
- len(b.Summaries) == 0 &&
- len(b.GroupSummaries) == 0 &&
- len(b.SummariesMinute) == 0 &&
- len(b.GroupSummariesMinute) == 0
- }
// GroupUpdate is the pending usage delta accumulated for one group.
// updateGroupData adds each positive request amount to Amount and
// increments Count once per recorded request.
type GroupUpdate struct {
	Amount decimal.Decimal
	Count  int
}
// TokenUpdate is the pending usage delta accumulated for one token.
// updateTokenData adds each positive request amount to Amount and
// increments Count once per recorded request.
type TokenUpdate struct {
	Amount decimal.Decimal
	Count  int
}
// ChannelUpdate is the pending usage delta accumulated for one channel.
// updateChannelData adds each positive request amount to Amount, increments
// Count once per recorded request, and increments RetryCount for requests
// flagged as retries.
type ChannelUpdate struct {
	Amount     decimal.Decimal
	Count      int
	RetryCount int
}
// SummaryUpdate pairs an hourly summary key (SummaryUnique) with the
// SummaryData accumulated for it since the last successful flush.
type SummaryUpdate struct {
	SummaryUnique
	SummaryData
}
// SummaryMinuteUpdate pairs a per-minute summary key (SummaryMinuteUnique)
// with the SummaryData accumulated for it since the last successful flush.
type SummaryMinuteUpdate struct {
	SummaryMinuteUnique
	SummaryData
}
// GroupSummaryUpdate pairs an hourly group summary key (GroupSummaryUnique)
// with the SummaryData accumulated for it since the last successful flush.
type GroupSummaryUpdate struct {
	GroupSummaryUnique
	SummaryData
}
// GroupSummaryMinuteUpdate pairs a per-minute group summary key
// (GroupSummaryMinuteUnique) with the SummaryData accumulated for it since
// the last successful flush.
type GroupSummaryMinuteUpdate struct {
	GroupSummaryMinuteUnique
	SummaryData
}
- var batchData batchUpdateData
- func init() {
- batchData = batchUpdateData{
- Groups: make(map[string]*GroupUpdate),
- Tokens: make(map[int]*TokenUpdate),
- Channels: make(map[int]*ChannelUpdate),
- Summaries: make(map[SummaryUnique]*SummaryUpdate),
- GroupSummaries: make(map[GroupSummaryUnique]*GroupSummaryUpdate),
- SummariesMinute: make(map[SummaryMinuteUnique]*SummaryMinuteUpdate),
- GroupSummariesMinute: make(map[GroupSummaryMinuteUnique]*GroupSummaryMinuteUpdate),
- }
- }
- func StartBatchProcessorSummary(ctx context.Context, wg *sync.WaitGroup) {
- defer wg.Done()
- ticker := time.NewTicker(5 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-ctx.Done():
- ProcessBatchUpdatesSummary()
- return
- case <-ticker.C:
- ProcessBatchUpdatesSummary()
- }
- }
- }
- func CleanBatchUpdatesSummary(ctx context.Context) {
- for {
- select {
- case <-ctx.Done():
- ProcessBatchUpdatesSummary()
- return
- default:
- if batchData.IsClean() {
- return
- }
- }
- ProcessBatchUpdatesSummary()
- time.Sleep(time.Second * 1)
- }
- }
- func ProcessBatchUpdatesSummary() {
- batchData.Lock()
- defer batchData.Unlock()
- var wg sync.WaitGroup
- wg.Add(1)
- go processGroupUpdates(&wg)
- wg.Add(1)
- go processTokenUpdates(&wg)
- wg.Add(1)
- go processChannelUpdates(&wg)
- wg.Add(1)
- go processGroupSummaryUpdates(&wg)
- wg.Add(1)
- go processSummaryUpdates(&wg)
- wg.Add(1)
- go processSummaryMinuteUpdates(&wg)
- wg.Add(1)
- go processGroupSummaryMinuteUpdates(&wg)
- wg.Wait()
- }
- func processGroupUpdates(wg *sync.WaitGroup) {
- defer wg.Done()
- for groupID, data := range batchData.Groups {
- err := UpdateGroupUsedAmountAndRequestCount(
- groupID,
- data.Amount.InexactFloat64(),
- data.Count,
- )
- if IgnoreNotFound(err) != nil {
- notify.ErrorThrottle(
- "batchUpdateGroupUsedAmountAndRequestCount",
- time.Minute*10,
- "failed to batch update group",
- err.Error(),
- )
- } else {
- delete(batchData.Groups, groupID)
- }
- }
- }
- func processTokenUpdates(wg *sync.WaitGroup) {
- defer wg.Done()
- for tokenID, data := range batchData.Tokens {
- err := UpdateTokenUsedAmount(tokenID, data.Amount.InexactFloat64(), data.Count)
- if IgnoreNotFound(err) != nil {
- notify.ErrorThrottle(
- "batchUpdateTokenUsedAmount",
- time.Minute*10,
- "failed to batch update token",
- err.Error(),
- )
- } else {
- delete(batchData.Tokens, tokenID)
- }
- }
- }
- func processChannelUpdates(wg *sync.WaitGroup) {
- defer wg.Done()
- for channelID, data := range batchData.Channels {
- err := UpdateChannelUsedAmount(
- channelID,
- data.Amount.InexactFloat64(),
- data.Count,
- data.RetryCount,
- )
- if IgnoreNotFound(err) != nil {
- notify.ErrorThrottle(
- "batchUpdateChannelUsedAmount",
- time.Minute*10,
- "failed to batch update channel",
- err.Error(),
- )
- } else {
- delete(batchData.Channels, channelID)
- }
- }
- }
- func processGroupSummaryUpdates(wg *sync.WaitGroup) {
- defer wg.Done()
- for key, data := range batchData.GroupSummaries {
- err := UpsertGroupSummary(data.GroupSummaryUnique, data.SummaryData)
- if err != nil {
- notify.ErrorThrottle(
- "batchUpdateGroupSummary",
- time.Minute*10,
- "failed to batch update group summary",
- err.Error(),
- )
- } else {
- delete(batchData.GroupSummaries, key)
- }
- }
- }
- func processGroupSummaryMinuteUpdates(wg *sync.WaitGroup) {
- defer wg.Done()
- for key, data := range batchData.GroupSummariesMinute {
- err := UpsertGroupSummaryMinute(data.GroupSummaryMinuteUnique, data.SummaryData)
- if err != nil {
- notify.ErrorThrottle(
- "batchUpdateGroupSummary",
- time.Minute*10,
- "failed to batch update group summary",
- err.Error(),
- )
- } else {
- delete(batchData.GroupSummariesMinute, key)
- }
- }
- }
- func processSummaryUpdates(wg *sync.WaitGroup) {
- defer wg.Done()
- for key, data := range batchData.Summaries {
- err := UpsertSummary(data.SummaryUnique, data.SummaryData)
- if err != nil {
- notify.ErrorThrottle(
- "batchUpdateSummary",
- time.Minute*10,
- "failed to batch update summary",
- err.Error(),
- )
- } else {
- delete(batchData.Summaries, key)
- }
- }
- }
- func processSummaryMinuteUpdates(wg *sync.WaitGroup) {
- defer wg.Done()
- for key, data := range batchData.SummariesMinute {
- err := UpsertSummaryMinute(data.SummaryMinuteUnique, data.SummaryData)
- if err != nil {
- notify.ErrorThrottle(
- "batchUpdateSummaryMinute",
- time.Minute*10,
- "failed to batch update summary minute",
- err.Error(),
- )
- } else {
- delete(batchData.SummariesMinute, key)
- }
- }
- }
- func BatchRecordLogs(
- now time.Time,
- requestID string,
- requestAt time.Time,
- retryAt time.Time,
- firstByteAt time.Time,
- group string,
- code int,
- channelID int,
- modelName string,
- tokenID int,
- tokenName string,
- endpoint string,
- content string,
- mode int,
- ip string,
- retryTimes int,
- requestDetail *RequestDetail,
- downstreamResult bool,
- usage Usage,
- modelPrice Price,
- amount float64,
- user string,
- metadata map[string]string,
- ) (err error) {
- if now.IsZero() {
- now = time.Now()
- }
- if code == http.StatusTooManyRequests ||
- config.GetLogDetailStorageHours() < 0 ||
- config.GetLogStorageHours() < 0 {
- requestDetail = nil
- }
- if downstreamResult {
- if config.GetLogStorageHours() >= 0 {
- err = RecordConsumeLog(
- requestID,
- now,
- requestAt,
- retryAt,
- firstByteAt,
- group,
- code,
- channelID,
- modelName,
- tokenID,
- tokenName,
- endpoint,
- content,
- mode,
- ip,
- retryTimes,
- requestDetail,
- usage,
- modelPrice,
- amount,
- user,
- metadata,
- )
- }
- } else {
- if code != http.StatusTooManyRequests &&
- config.GetLogStorageHours() >= 0 &&
- config.GetRetryLogStorageHours() > 0 {
- err = RecordRetryLog(
- requestID,
- now,
- requestAt,
- retryAt,
- firstByteAt,
- code,
- channelID,
- modelName,
- mode,
- retryTimes,
- requestDetail,
- )
- }
- }
- BatchUpdateSummary(
- now,
- requestAt,
- firstByteAt,
- group,
- code,
- channelID,
- modelName,
- tokenID,
- tokenName,
- downstreamResult,
- usage,
- amount,
- )
- return err
- }
- func BatchUpdateSummary(
- now time.Time,
- requestAt time.Time,
- firstByteAt time.Time,
- group string,
- code int,
- channelID int,
- modelName string,
- tokenID int,
- tokenName string,
- downstreamResult bool,
- usage Usage,
- amount float64,
- ) {
- if now.IsZero() {
- now = time.Now()
- }
- amountDecimal := decimal.NewFromFloat(amount)
- batchData.Lock()
- defer batchData.Unlock()
- updateChannelData(channelID, amount, amountDecimal, !downstreamResult)
- if channelID != 0 {
- updateSummaryData(
- channelID,
- modelName,
- now,
- requestAt,
- firstByteAt,
- code,
- amountDecimal,
- usage,
- !downstreamResult,
- )
- updateSummaryDataMinute(
- channelID,
- modelName,
- now,
- requestAt,
- firstByteAt,
- code,
- amountDecimal,
- usage,
- !downstreamResult,
- )
- }
- // group related data only records downstream result
- if !downstreamResult {
- return
- }
- updateGroupData(group, amount, amountDecimal)
- updateTokenData(tokenID, amount, amountDecimal)
- if group != "" {
- updateGroupSummaryData(
- group,
- tokenName,
- modelName,
- now,
- requestAt,
- firstByteAt,
- code,
- amountDecimal,
- usage,
- )
- updateGroupSummaryDataMinute(
- group,
- tokenName,
- modelName,
- now,
- requestAt,
- firstByteAt,
- code,
- amountDecimal,
- usage,
- )
- }
- }
- func updateChannelData(
- channelID int,
- amount float64,
- amountDecimal decimal.Decimal,
- isRetry bool,
- ) {
- if channelID <= 0 {
- return
- }
- if _, ok := batchData.Channels[channelID]; !ok {
- batchData.Channels[channelID] = &ChannelUpdate{}
- }
- if amount > 0 {
- batchData.Channels[channelID].Amount = amountDecimal.
- Add(batchData.Channels[channelID].Amount)
- }
- batchData.Channels[channelID].Count++
- if isRetry {
- batchData.Channels[channelID].RetryCount++
- }
- }
- func updateGroupData(group string, amount float64, amountDecimal decimal.Decimal) {
- if group == "" {
- return
- }
- if _, ok := batchData.Groups[group]; !ok {
- batchData.Groups[group] = &GroupUpdate{}
- }
- if amount > 0 {
- batchData.Groups[group].Amount = amountDecimal.
- Add(batchData.Groups[group].Amount)
- }
- batchData.Groups[group].Count++
- }
- func updateTokenData(tokenID int, amount float64, amountDecimal decimal.Decimal) {
- if tokenID <= 0 {
- return
- }
- if _, ok := batchData.Tokens[tokenID]; !ok {
- batchData.Tokens[tokenID] = &TokenUpdate{}
- }
- if amount > 0 {
- batchData.Tokens[tokenID].Amount = amountDecimal.
- Add(batchData.Tokens[tokenID].Amount)
- }
- batchData.Tokens[tokenID].Count++
- }
- func updateGroupSummaryData(
- group, tokenName, modelName string,
- createAt time.Time,
- requestAt time.Time,
- firstByteAt time.Time,
- code int,
- amountDecimal decimal.Decimal,
- usage Usage,
- ) {
- if createAt.IsZero() {
- createAt = time.Now()
- }
- if requestAt.IsZero() {
- requestAt = createAt
- }
- if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
- firstByteAt = requestAt
- }
- groupUnique := GroupSummaryUnique{
- GroupID: group,
- TokenName: tokenName,
- Model: modelName,
- HourTimestamp: createAt.Truncate(time.Hour).Unix(),
- }
- groupSummary, ok := batchData.GroupSummaries[groupUnique]
- if !ok {
- groupSummary = &GroupSummaryUpdate{
- GroupSummaryUnique: groupUnique,
- }
- batchData.GroupSummaries[groupUnique] = groupSummary
- }
- groupSummary.UsedAmount = amountDecimal.
- Add(decimal.NewFromFloat(groupSummary.UsedAmount)).
- InexactFloat64()
- groupSummary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
- groupSummary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
- groupSummary.Usage.Add(usage)
- groupSummary.AddRequest(code, false)
- }
- func updateGroupSummaryDataMinute(
- group, tokenName, modelName string,
- createAt time.Time,
- requestAt time.Time,
- firstByteAt time.Time,
- code int,
- amountDecimal decimal.Decimal,
- usage Usage,
- ) {
- if createAt.IsZero() {
- createAt = time.Now()
- }
- if requestAt.IsZero() {
- requestAt = createAt
- }
- if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
- firstByteAt = requestAt
- }
- groupUnique := GroupSummaryMinuteUnique{
- GroupID: group,
- TokenName: tokenName,
- Model: modelName,
- MinuteTimestamp: createAt.Truncate(time.Minute).Unix(),
- }
- groupSummary, ok := batchData.GroupSummariesMinute[groupUnique]
- if !ok {
- groupSummary = &GroupSummaryMinuteUpdate{
- GroupSummaryMinuteUnique: groupUnique,
- }
- batchData.GroupSummariesMinute[groupUnique] = groupSummary
- }
- groupSummary.UsedAmount = amountDecimal.
- Add(decimal.NewFromFloat(groupSummary.UsedAmount)).
- InexactFloat64()
- groupSummary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
- groupSummary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
- groupSummary.Usage.Add(usage)
- groupSummary.AddRequest(code, false)
- }
- func updateSummaryData(
- channelID int,
- modelName string,
- createAt time.Time,
- requestAt time.Time,
- firstByteAt time.Time,
- code int,
- amountDecimal decimal.Decimal,
- usage Usage,
- isRetry bool,
- ) {
- if createAt.IsZero() {
- createAt = time.Now()
- }
- if requestAt.IsZero() {
- requestAt = createAt
- }
- if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
- firstByteAt = requestAt
- }
- summaryUnique := SummaryUnique{
- ChannelID: channelID,
- Model: modelName,
- HourTimestamp: createAt.Truncate(time.Hour).Unix(),
- }
- summary, ok := batchData.Summaries[summaryUnique]
- if !ok {
- summary = &SummaryUpdate{
- SummaryUnique: summaryUnique,
- }
- batchData.Summaries[summaryUnique] = summary
- }
- summary.UsedAmount = amountDecimal.
- Add(decimal.NewFromFloat(summary.UsedAmount)).
- InexactFloat64()
- summary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
- summary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
- summary.Usage.Add(usage)
- summary.AddRequest(code, isRetry)
- }
- func updateSummaryDataMinute(
- channelID int,
- modelName string,
- createAt time.Time,
- requestAt time.Time,
- firstByteAt time.Time,
- code int,
- amountDecimal decimal.Decimal,
- usage Usage,
- isRetry bool,
- ) {
- if createAt.IsZero() {
- createAt = time.Now()
- }
- if requestAt.IsZero() {
- requestAt = createAt
- }
- if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
- firstByteAt = requestAt
- }
- summaryUnique := SummaryMinuteUnique{
- ChannelID: channelID,
- Model: modelName,
- MinuteTimestamp: createAt.Truncate(time.Minute).Unix(),
- }
- summary, ok := batchData.SummariesMinute[summaryUnique]
- if !ok {
- summary = &SummaryMinuteUpdate{
- SummaryMinuteUnique: summaryUnique,
- }
- batchData.SummariesMinute[summaryUnique] = summary
- }
- summary.UsedAmount = amountDecimal.
- Add(decimal.NewFromFloat(summary.UsedAmount)).
- InexactFloat64()
- summary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
- summary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
- summary.Usage.Add(usage)
- summary.AddRequest(code, isRetry)
- }
|