utils.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package model
  2. import (
  3. "errors"
  4. "one-api/common"
  5. "sync"
  6. "time"
  7. "github.com/bytedance/gopkg/util/gopool"
  8. "gorm.io/gorm"
  9. )
  10. const (
  11. BatchUpdateTypeUserQuota = iota
  12. BatchUpdateTypeTokenQuota
  13. BatchUpdateTypeUsedQuota
  14. BatchUpdateTypeChannelUsedQuota
  15. BatchUpdateTypeRequestCount
  16. BatchUpdateTypeCount // if you add a new type, you need to add a new map and a new lock
  17. )
  18. var batchUpdateStores []map[int]int
  19. var batchUpdateLocks []sync.Mutex
  20. func init() {
  21. for i := 0; i < BatchUpdateTypeCount; i++ {
  22. batchUpdateStores = append(batchUpdateStores, make(map[int]int))
  23. batchUpdateLocks = append(batchUpdateLocks, sync.Mutex{})
  24. }
  25. }
  26. func InitBatchUpdater() {
  27. gopool.Go(func() {
  28. for {
  29. time.Sleep(time.Duration(common.BatchUpdateInterval) * time.Second)
  30. batchUpdate()
  31. }
  32. })
  33. }
  34. func addNewRecord(type_ int, id int, value int) {
  35. batchUpdateLocks[type_].Lock()
  36. defer batchUpdateLocks[type_].Unlock()
  37. if _, ok := batchUpdateStores[type_][id]; !ok {
  38. batchUpdateStores[type_][id] = value
  39. } else {
  40. batchUpdateStores[type_][id] += value
  41. }
  42. }
  43. func batchUpdate() {
  44. // check if there's any data to update
  45. hasData := false
  46. for i := 0; i < BatchUpdateTypeCount; i++ {
  47. batchUpdateLocks[i].Lock()
  48. if len(batchUpdateStores[i]) > 0 {
  49. hasData = true
  50. batchUpdateLocks[i].Unlock()
  51. break
  52. }
  53. batchUpdateLocks[i].Unlock()
  54. }
  55. if !hasData {
  56. return
  57. }
  58. common.SysLog("batch update started")
  59. for i := 0; i < BatchUpdateTypeCount; i++ {
  60. batchUpdateLocks[i].Lock()
  61. store := batchUpdateStores[i]
  62. batchUpdateStores[i] = make(map[int]int)
  63. batchUpdateLocks[i].Unlock()
  64. // TODO: maybe we can combine updates with same key?
  65. for key, value := range store {
  66. switch i {
  67. case BatchUpdateTypeUserQuota:
  68. err := increaseUserQuota(key, value)
  69. if err != nil {
  70. common.SysError("failed to batch update user quota: " + err.Error())
  71. }
  72. case BatchUpdateTypeTokenQuota:
  73. err := increaseTokenQuota(key, value)
  74. if err != nil {
  75. common.SysError("failed to batch update token quota: " + err.Error())
  76. }
  77. case BatchUpdateTypeUsedQuota:
  78. updateUserUsedQuota(key, value)
  79. case BatchUpdateTypeRequestCount:
  80. updateUserRequestCount(key, value)
  81. case BatchUpdateTypeChannelUsedQuota:
  82. updateChannelUsedQuota(key, value)
  83. }
  84. }
  85. }
  86. common.SysLog("batch update finished")
  87. }
  88. func RecordExist(err error) (bool, error) {
  89. if err == nil {
  90. return true, nil
  91. }
  92. if errors.Is(err, gorm.ErrRecordNotFound) {
  93. return false, nil
  94. }
  95. return false, err
  96. }
  97. func shouldUpdateRedis(fromDB bool, err error) bool {
  98. return common.RedisEnabled && fromDB && err == nil
  99. }