| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- package model
- import (
- "errors"
- "one-api/common"
- "sync"
- "time"
- "github.com/bytedance/gopkg/util/gopool"
- "gorm.io/gorm"
- )
// Batch update types. Each value is used as an index into batchUpdateStores
// and batchUpdateLocks, so the numeric order of these constants matters.
const (
	// BatchUpdateTypeUserQuota entries are flushed through increaseUserQuota.
	BatchUpdateTypeUserQuota = iota
	// BatchUpdateTypeTokenQuota entries are flushed through increaseTokenQuota.
	BatchUpdateTypeTokenQuota
	// BatchUpdateTypeUsedQuota entries are flushed through updateUserUsedQuota.
	BatchUpdateTypeUsedQuota
	// BatchUpdateTypeChannelUsedQuota entries are flushed through updateChannelUsedQuota.
	BatchUpdateTypeChannelUsedQuota
	// BatchUpdateTypeRequestCount entries are flushed through updateUserRequestCount.
	BatchUpdateTypeRequestCount

	// BatchUpdateTypeCount is the number of batch update types and must stay
	// last. init creates one store and one lock per type from this value; a
	// newly added type also needs its own case in batchUpdate.
	BatchUpdateTypeCount
)

var (
	// batchUpdateStores holds, for each batch update type, the pending
	// accumulated values keyed by record id.
	batchUpdateStores []map[int]int
	// batchUpdateLocks holds one mutex per batch update type; the mutex at
	// index i guards batchUpdateStores[i].
	batchUpdateLocks []sync.Mutex
)
- func init() {
- for i := 0; i < BatchUpdateTypeCount; i++ {
- batchUpdateStores = append(batchUpdateStores, make(map[int]int))
- batchUpdateLocks = append(batchUpdateLocks, sync.Mutex{})
- }
- }
- func InitBatchUpdater() {
- gopool.Go(func() {
- for {
- time.Sleep(time.Duration(common.BatchUpdateInterval) * time.Second)
- batchUpdate()
- }
- })
- }
- func addNewRecord(type_ int, id int, value int) {
- batchUpdateLocks[type_].Lock()
- defer batchUpdateLocks[type_].Unlock()
- if _, ok := batchUpdateStores[type_][id]; !ok {
- batchUpdateStores[type_][id] = value
- } else {
- batchUpdateStores[type_][id] += value
- }
- }
- func batchUpdate() {
- // check if there's any data to update
- hasData := false
- for i := 0; i < BatchUpdateTypeCount; i++ {
- batchUpdateLocks[i].Lock()
- if len(batchUpdateStores[i]) > 0 {
- hasData = true
- batchUpdateLocks[i].Unlock()
- break
- }
- batchUpdateLocks[i].Unlock()
- }
- if !hasData {
- return
- }
- common.SysLog("batch update started")
- for i := 0; i < BatchUpdateTypeCount; i++ {
- batchUpdateLocks[i].Lock()
- store := batchUpdateStores[i]
- batchUpdateStores[i] = make(map[int]int)
- batchUpdateLocks[i].Unlock()
- // TODO: maybe we can combine updates with same key?
- for key, value := range store {
- switch i {
- case BatchUpdateTypeUserQuota:
- err := increaseUserQuota(key, value)
- if err != nil {
- common.SysError("failed to batch update user quota: " + err.Error())
- }
- case BatchUpdateTypeTokenQuota:
- err := increaseTokenQuota(key, value)
- if err != nil {
- common.SysError("failed to batch update token quota: " + err.Error())
- }
- case BatchUpdateTypeUsedQuota:
- updateUserUsedQuota(key, value)
- case BatchUpdateTypeRequestCount:
- updateUserRequestCount(key, value)
- case BatchUpdateTypeChannelUsedQuota:
- updateChannelUsedQuota(key, value)
- }
- }
- }
- common.SysLog("batch update finished")
- }
- func RecordExist(err error) (bool, error) {
- if err == nil {
- return true, nil
- }
- if errors.Is(err, gorm.ErrRecordNotFound) {
- return false, nil
- }
- return false, err
- }
- func shouldUpdateRedis(fromDB bool, err error) bool {
- return common.RedisEnabled && fromDB && err == nil
- }
|