package service

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/QuantumNous/new-api/common"
	"github.com/QuantumNous/new-api/logger"
	"github.com/QuantumNous/new-api/model"
	"github.com/bytedance/gopkg/util/gopool"
)

const (
	// subscriptionResetTickInterval is how often the maintenance pass is triggered.
	subscriptionResetTickInterval = 1 * time.Minute
	// subscriptionResetBatchSize is the limit handed to each expire/reset call;
	// a call that reports fewer rows than this ends the corresponding loop.
	subscriptionResetBatchSize = 300
	// subscriptionCleanupInterval is the minimum time between two successful
	// pre-consume record cleanups.
	subscriptionCleanupInterval = 30 * time.Minute
)

var (
	// subscriptionResetOnce ensures the background task is set up at most once.
	subscriptionResetOnce sync.Once
	// subscriptionResetRunning is true while a maintenance pass is executing
	// and is used to skip overlapping passes.
	subscriptionResetRunning atomic.Bool
	// subscriptionCleanupLast holds the Unix time (seconds) of the last
	// successful cleanup; the zero value makes the first pass run a cleanup.
	subscriptionCleanupLast atomic.Int64
)

// StartSubscriptionQuotaResetTask launches the background subscription
// maintenance loop. It runs one pass immediately and then one pass per
// subscriptionResetTickInterval.
//
// Only the first call has any effect. The master-node check happens inside
// the sync.Once, so a first call made while common.IsMasterNode is false
// consumes the Once and later calls will not start the task either.
func StartSubscriptionQuotaResetTask() {
	subscriptionResetOnce.Do(func() {
		if !common.IsMasterNode {
			return
		}
		gopool.Go(func() {
			logger.LogInfo(context.Background(), fmt.Sprintf("subscription quota reset task started: tick=%s", subscriptionResetTickInterval))
			ticker := time.NewTicker(subscriptionResetTickInterval)
			defer ticker.Stop()

			// Run once right away instead of waiting for the first tick.
			runSubscriptionQuotaResetOnce()
			for range ticker.C {
				runSubscriptionQuotaResetOnce()
			}
		})
	})
}

// runSubscriptionQuotaResetOnce performs a single maintenance pass:
//
//  1. expire due subscriptions in batches,
//  2. reset due subscriptions in batches,
//  3. clean up pre-consume records if at least subscriptionCleanupInterval
//     has elapsed since the last successful cleanup.
//
// If a pass is already in progress the call returns immediately. A failure in
// step 1 or 2 is logged and aborts the remainder of the pass; a failure in
// step 3 is logged and retried on the next pass.
func runSubscriptionQuotaResetOnce() {
	// Guard against overlapping passes.
	if !subscriptionResetRunning.CompareAndSwap(false, true) {
		return
	}
	defer subscriptionResetRunning.Store(false)

	ctx := context.Background()
	totalReset := 0
	totalExpired := 0

	// Keep expiring until a batch comes back short (which includes zero),
	// meaning there is nothing left to process right now.
	for {
		n, err := model.ExpireDueSubscriptions(subscriptionResetBatchSize)
		if err != nil {
			logger.LogWarn(ctx, fmt.Sprintf("subscription expire task failed: %v", err))
			return
		}
		totalExpired += n
		if n < subscriptionResetBatchSize {
			break
		}
	}

	// Same batching scheme for quota resets.
	for {
		n, err := model.ResetDueSubscriptions(subscriptionResetBatchSize)
		if err != nil {
			logger.LogWarn(ctx, fmt.Sprintf("subscription quota reset task failed: %v", err))
			return
		}
		totalReset += n
		if n < subscriptionResetBatchSize {
			break
		}
	}

	lastCleanup := time.Unix(subscriptionCleanupLast.Load(), 0)
	if time.Since(lastCleanup) >= subscriptionCleanupInterval {
		// 7 * 24 * 3600 is seven days expressed in seconds.
		// NOTE(review): presumably the retention age for pre-consume records;
		// confirm against model.CleanupSubscriptionPreConsumeRecords.
		if _, err := model.CleanupSubscriptionPreConsumeRecords(7 * 24 * 3600); err != nil {
			// The timestamp is left untouched so the cleanup is retried on
			// the next pass; log the failure so it does not go unnoticed.
			logger.LogWarn(ctx, fmt.Sprintf("subscription pre-consume cleanup failed: %v", err))
		} else {
			subscriptionCleanupLast.Store(time.Now().Unix())
		}
	}

	if common.DebugEnabled && (totalReset > 0 || totalExpired > 0) {
		logger.LogDebug(ctx, "subscription maintenance: reset_count=%d, expired_count=%d", totalReset, totalExpired)
	}
}