subscription_reset_task.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/QuantumNous/new-api/common"
  9. "github.com/QuantumNous/new-api/logger"
  10. "github.com/QuantumNous/new-api/model"
  11. "github.com/bytedance/gopkg/util/gopool"
  12. )
  // Tuning parameters for the background subscription maintenance task.
  const (
  	// subscriptionResetTickInterval is the ticker period between maintenance passes.
  	subscriptionResetTickInterval = time.Minute

  	// subscriptionResetBatchSize is the limit handed to each expire/reset batch
  	// call; a batch that returns fewer rows than this ends the draining loop.
  	subscriptionResetBatchSize = 300

  	// subscriptionCleanupInterval is the minimum time that must elapse after a
  	// successful pre-consume record cleanup before the next one is attempted.
  	subscriptionCleanupInterval = 30 * time.Minute
  )
  // subscriptionResetOnce ensures the start-up body of
  // StartSubscriptionQuotaResetTask executes at most once per process.
  var subscriptionResetOnce sync.Once

  // subscriptionResetRunning is set while a maintenance pass is executing so that
  // a new pass is skipped if the previous one has not finished yet.
  var subscriptionResetRunning atomic.Bool

  // subscriptionCleanupLast stores the Unix time, in seconds, of the most recent
  // successful pre-consume record cleanup. It stays zero until the first success.
  var subscriptionCleanupLast atomic.Int64
  23. func StartSubscriptionQuotaResetTask() {
  24. subscriptionResetOnce.Do(func() {
  25. if !common.IsMasterNode {
  26. return
  27. }
  28. gopool.Go(func() {
  29. logger.LogInfo(context.Background(), fmt.Sprintf("subscription quota reset task started: tick=%s", subscriptionResetTickInterval))
  30. ticker := time.NewTicker(subscriptionResetTickInterval)
  31. defer ticker.Stop()
  32. runSubscriptionQuotaResetOnce()
  33. for range ticker.C {
  34. runSubscriptionQuotaResetOnce()
  35. }
  36. })
  37. })
  38. }
  39. func runSubscriptionQuotaResetOnce() {
  40. if !subscriptionResetRunning.CompareAndSwap(false, true) {
  41. return
  42. }
  43. defer subscriptionResetRunning.Store(false)
  44. ctx := context.Background()
  45. totalReset := 0
  46. totalExpired := 0
  47. for {
  48. n, err := model.ExpireDueSubscriptions(subscriptionResetBatchSize)
  49. if err != nil {
  50. logger.LogWarn(ctx, fmt.Sprintf("subscription expire task failed: %v", err))
  51. return
  52. }
  53. if n == 0 {
  54. break
  55. }
  56. totalExpired += n
  57. if n < subscriptionResetBatchSize {
  58. break
  59. }
  60. }
  61. for {
  62. n, err := model.ResetDueSubscriptions(subscriptionResetBatchSize)
  63. if err != nil {
  64. logger.LogWarn(ctx, fmt.Sprintf("subscription quota reset task failed: %v", err))
  65. return
  66. }
  67. if n == 0 {
  68. break
  69. }
  70. totalReset += n
  71. if n < subscriptionResetBatchSize {
  72. break
  73. }
  74. }
  75. lastCleanup := time.Unix(subscriptionCleanupLast.Load(), 0)
  76. if time.Since(lastCleanup) >= subscriptionCleanupInterval {
  77. if _, err := model.CleanupSubscriptionPreConsumeRecords(7 * 24 * 3600); err == nil {
  78. subscriptionCleanupLast.Store(time.Now().Unix())
  79. }
  80. }
  81. if common.DebugEnabled && (totalReset > 0 || totalExpired > 0) {
  82. logger.LogDebug(ctx, "subscription maintenance: reset_count=%d, expired_count=%d", totalReset, totalExpired)
  83. }
  84. }