| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176 |
- package model
- import (
- "errors"
- "fmt"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/QuantumNous/new-api/common"
- "github.com/QuantumNous/new-api/pkg/cachex"
- "github.com/samber/hot"
- "gorm.io/gorm"
- )
- // Subscription duration units
- const (
- SubscriptionDurationYear = "year"
- SubscriptionDurationMonth = "month"
- SubscriptionDurationDay = "day"
- SubscriptionDurationHour = "hour"
- SubscriptionDurationCustom = "custom"
- )
- // Subscription quota reset period
- const (
- SubscriptionResetNever = "never"
- SubscriptionResetDaily = "daily"
- SubscriptionResetWeekly = "weekly"
- SubscriptionResetMonthly = "monthly"
- SubscriptionResetCustom = "custom"
- )
- var (
- ErrSubscriptionOrderNotFound = errors.New("subscription order not found")
- ErrSubscriptionOrderStatusInvalid = errors.New("subscription order status invalid")
- )
- const (
- subscriptionPlanCacheNamespace = "new-api:subscription_plan:v1"
- subscriptionPlanInfoCacheNamespace = "new-api:subscription_plan_info:v1"
- )
- var (
- subscriptionPlanCacheOnce sync.Once
- subscriptionPlanInfoCacheOnce sync.Once
- subscriptionPlanCache *cachex.HybridCache[SubscriptionPlan]
- subscriptionPlanInfoCache *cachex.HybridCache[SubscriptionPlanInfo]
- )
- func subscriptionPlanCacheTTL() time.Duration {
- ttlSeconds := common.GetEnvOrDefault("SUBSCRIPTION_PLAN_CACHE_TTL", 300)
- if ttlSeconds <= 0 {
- ttlSeconds = 300
- }
- return time.Duration(ttlSeconds) * time.Second
- }
- func subscriptionPlanInfoCacheTTL() time.Duration {
- ttlSeconds := common.GetEnvOrDefault("SUBSCRIPTION_PLAN_INFO_CACHE_TTL", 120)
- if ttlSeconds <= 0 {
- ttlSeconds = 120
- }
- return time.Duration(ttlSeconds) * time.Second
- }
- func subscriptionPlanCacheCapacity() int {
- capacity := common.GetEnvOrDefault("SUBSCRIPTION_PLAN_CACHE_CAP", 5000)
- if capacity <= 0 {
- capacity = 5000
- }
- return capacity
- }
- func subscriptionPlanInfoCacheCapacity() int {
- capacity := common.GetEnvOrDefault("SUBSCRIPTION_PLAN_INFO_CACHE_CAP", 10000)
- if capacity <= 0 {
- capacity = 10000
- }
- return capacity
- }
- func getSubscriptionPlanCache() *cachex.HybridCache[SubscriptionPlan] {
- subscriptionPlanCacheOnce.Do(func() {
- ttl := subscriptionPlanCacheTTL()
- subscriptionPlanCache = cachex.NewHybridCache[SubscriptionPlan](cachex.HybridCacheConfig[SubscriptionPlan]{
- Namespace: cachex.Namespace(subscriptionPlanCacheNamespace),
- Redis: common.RDB,
- RedisEnabled: func() bool {
- return common.RedisEnabled && common.RDB != nil
- },
- RedisCodec: cachex.JSONCodec[SubscriptionPlan]{},
- Memory: func() *hot.HotCache[string, SubscriptionPlan] {
- return hot.NewHotCache[string, SubscriptionPlan](hot.LRU, subscriptionPlanCacheCapacity()).
- WithTTL(ttl).
- WithJanitor().
- Build()
- },
- })
- })
- return subscriptionPlanCache
- }
- func getSubscriptionPlanInfoCache() *cachex.HybridCache[SubscriptionPlanInfo] {
- subscriptionPlanInfoCacheOnce.Do(func() {
- ttl := subscriptionPlanInfoCacheTTL()
- subscriptionPlanInfoCache = cachex.NewHybridCache[SubscriptionPlanInfo](cachex.HybridCacheConfig[SubscriptionPlanInfo]{
- Namespace: cachex.Namespace(subscriptionPlanInfoCacheNamespace),
- Redis: common.RDB,
- RedisEnabled: func() bool {
- return common.RedisEnabled && common.RDB != nil
- },
- RedisCodec: cachex.JSONCodec[SubscriptionPlanInfo]{},
- Memory: func() *hot.HotCache[string, SubscriptionPlanInfo] {
- return hot.NewHotCache[string, SubscriptionPlanInfo](hot.LRU, subscriptionPlanInfoCacheCapacity()).
- WithTTL(ttl).
- WithJanitor().
- Build()
- },
- })
- })
- return subscriptionPlanInfoCache
- }
- func subscriptionPlanCacheKey(id int) string {
- if id <= 0 {
- return ""
- }
- return strconv.Itoa(id)
- }
- func InvalidateSubscriptionPlanCache(planId int) {
- if planId <= 0 {
- return
- }
- cache := getSubscriptionPlanCache()
- _, _ = cache.DeleteMany([]string{subscriptionPlanCacheKey(planId)})
- infoCache := getSubscriptionPlanInfoCache()
- _ = infoCache.Purge()
- }
- // Subscription plan
- type SubscriptionPlan struct {
- Id int `json:"id"`
- Title string `json:"title" gorm:"type:varchar(128);not null"`
- Subtitle string `json:"subtitle" gorm:"type:varchar(255);default:''"`
- // Display money amount (follow existing code style: float64 for money)
- PriceAmount float64 `json:"price_amount" gorm:"type:decimal(10,6);not null;default:0"`
- Currency string `json:"currency" gorm:"type:varchar(8);not null;default:'USD'"`
- DurationUnit string `json:"duration_unit" gorm:"type:varchar(16);not null;default:'month'"`
- DurationValue int `json:"duration_value" gorm:"type:int;not null;default:1"`
- CustomSeconds int64 `json:"custom_seconds" gorm:"type:bigint;not null;default:0"`
- Enabled bool `json:"enabled" gorm:"default:true"`
- SortOrder int `json:"sort_order" gorm:"type:int;default:0"`
- StripePriceId string `json:"stripe_price_id" gorm:"type:varchar(128);default:''"`
- CreemProductId string `json:"creem_product_id" gorm:"type:varchar(128);default:''"`
- // Max purchases per user (0 = unlimited)
- MaxPurchasePerUser int `json:"max_purchase_per_user" gorm:"type:int;default:0"`
- // Upgrade user group after purchase (empty = no change)
- UpgradeGroup string `json:"upgrade_group" gorm:"type:varchar(64);default:''"`
- // Total quota (amount in quota units, 0 = unlimited)
- TotalAmount int64 `json:"total_amount" gorm:"type:bigint;not null;default:0"`
- // Quota reset period for plan
- QuotaResetPeriod string `json:"quota_reset_period" gorm:"type:varchar(16);default:'never'"`
- QuotaResetCustomSeconds int64 `json:"quota_reset_custom_seconds" gorm:"type:bigint;default:0"`
- CreatedAt int64 `json:"created_at" gorm:"bigint"`
- UpdatedAt int64 `json:"updated_at" gorm:"bigint"`
- }
- func (p *SubscriptionPlan) BeforeCreate(tx *gorm.DB) error {
- now := common.GetTimestamp()
- p.CreatedAt = now
- p.UpdatedAt = now
- return nil
- }
- func (p *SubscriptionPlan) BeforeUpdate(tx *gorm.DB) error {
- p.UpdatedAt = common.GetTimestamp()
- return nil
- }
- // Subscription order (payment -> webhook -> create UserSubscription)
- type SubscriptionOrder struct {
- Id int `json:"id"`
- UserId int `json:"user_id" gorm:"index"`
- PlanId int `json:"plan_id" gorm:"index"`
- Money float64 `json:"money"`
- TradeNo string `json:"trade_no" gorm:"unique;type:varchar(255);index"`
- PaymentMethod string `json:"payment_method" gorm:"type:varchar(50)"`
- Status string `json:"status"`
- CreateTime int64 `json:"create_time"`
- CompleteTime int64 `json:"complete_time"`
- ProviderPayload string `json:"provider_payload" gorm:"type:text"`
- }
- func (o *SubscriptionOrder) Insert() error {
- if o.CreateTime == 0 {
- o.CreateTime = common.GetTimestamp()
- }
- return DB.Create(o).Error
- }
- func (o *SubscriptionOrder) Update() error {
- return DB.Save(o).Error
- }
- func GetSubscriptionOrderByTradeNo(tradeNo string) *SubscriptionOrder {
- if tradeNo == "" {
- return nil
- }
- var order SubscriptionOrder
- if err := DB.Where("trade_no = ?", tradeNo).First(&order).Error; err != nil {
- return nil
- }
- return &order
- }
- // User subscription instance
- type UserSubscription struct {
- Id int `json:"id"`
- UserId int `json:"user_id" gorm:"index;index:idx_user_sub_active,priority:1"`
- PlanId int `json:"plan_id" gorm:"index"`
- AmountTotal int64 `json:"amount_total" gorm:"type:bigint;not null;default:0"`
- AmountUsed int64 `json:"amount_used" gorm:"type:bigint;not null;default:0"`
- StartTime int64 `json:"start_time" gorm:"bigint"`
- EndTime int64 `json:"end_time" gorm:"bigint;index;index:idx_user_sub_active,priority:3"`
- Status string `json:"status" gorm:"type:varchar(32);index;index:idx_user_sub_active,priority:2"` // active/expired/cancelled
- Source string `json:"source" gorm:"type:varchar(32);default:'order'"` // order/admin
- LastResetTime int64 `json:"last_reset_time" gorm:"type:bigint;default:0"`
- NextResetTime int64 `json:"next_reset_time" gorm:"type:bigint;default:0;index"`
- UpgradeGroup string `json:"upgrade_group" gorm:"type:varchar(64);default:''"`
- PrevUserGroup string `json:"prev_user_group" gorm:"type:varchar(64);default:''"`
- CreatedAt int64 `json:"created_at" gorm:"bigint"`
- UpdatedAt int64 `json:"updated_at" gorm:"bigint"`
- }
- func (s *UserSubscription) BeforeCreate(tx *gorm.DB) error {
- now := common.GetTimestamp()
- s.CreatedAt = now
- s.UpdatedAt = now
- return nil
- }
- func (s *UserSubscription) BeforeUpdate(tx *gorm.DB) error {
- s.UpdatedAt = common.GetTimestamp()
- return nil
- }
- type SubscriptionSummary struct {
- Subscription *UserSubscription `json:"subscription"`
- }
- func calcPlanEndTime(start time.Time, plan *SubscriptionPlan) (int64, error) {
- if plan == nil {
- return 0, errors.New("plan is nil")
- }
- if plan.DurationValue <= 0 && plan.DurationUnit != SubscriptionDurationCustom {
- return 0, errors.New("duration_value must be > 0")
- }
- switch plan.DurationUnit {
- case SubscriptionDurationYear:
- return start.AddDate(plan.DurationValue, 0, 0).Unix(), nil
- case SubscriptionDurationMonth:
- return start.AddDate(0, plan.DurationValue, 0).Unix(), nil
- case SubscriptionDurationDay:
- return start.Add(time.Duration(plan.DurationValue) * 24 * time.Hour).Unix(), nil
- case SubscriptionDurationHour:
- return start.Add(time.Duration(plan.DurationValue) * time.Hour).Unix(), nil
- case SubscriptionDurationCustom:
- if plan.CustomSeconds <= 0 {
- return 0, errors.New("custom_seconds must be > 0")
- }
- return start.Add(time.Duration(plan.CustomSeconds) * time.Second).Unix(), nil
- default:
- return 0, fmt.Errorf("invalid duration_unit: %s", plan.DurationUnit)
- }
- }
- func NormalizeResetPeriod(period string) string {
- switch strings.TrimSpace(period) {
- case SubscriptionResetDaily, SubscriptionResetWeekly, SubscriptionResetMonthly, SubscriptionResetCustom:
- return strings.TrimSpace(period)
- default:
- return SubscriptionResetNever
- }
- }
- func calcNextResetTime(base time.Time, plan *SubscriptionPlan, endUnix int64) int64 {
- if plan == nil {
- return 0
- }
- period := NormalizeResetPeriod(plan.QuotaResetPeriod)
- if period == SubscriptionResetNever {
- return 0
- }
- var next time.Time
- switch period {
- case SubscriptionResetDaily:
- next = time.Date(base.Year(), base.Month(), base.Day(), 0, 0, 0, 0, base.Location()).
- AddDate(0, 0, 1)
- case SubscriptionResetWeekly:
- // Align to next Monday 00:00
- weekday := int(base.Weekday()) // Sunday=0
- // Convert to Monday=1..Sunday=7
- if weekday == 0 {
- weekday = 7
- }
- daysUntil := 8 - weekday
- next = time.Date(base.Year(), base.Month(), base.Day(), 0, 0, 0, 0, base.Location()).
- AddDate(0, 0, daysUntil)
- case SubscriptionResetMonthly:
- // Align to first day of next month 00:00
- next = time.Date(base.Year(), base.Month(), 1, 0, 0, 0, 0, base.Location()).
- AddDate(0, 1, 0)
- case SubscriptionResetCustom:
- if plan.QuotaResetCustomSeconds <= 0 {
- return 0
- }
- next = base.Add(time.Duration(plan.QuotaResetCustomSeconds) * time.Second)
- default:
- return 0
- }
- if endUnix > 0 && next.Unix() > endUnix {
- return 0
- }
- return next.Unix()
- }
- func GetSubscriptionPlanById(id int) (*SubscriptionPlan, error) {
- return getSubscriptionPlanByIdTx(nil, id)
- }
- func getSubscriptionPlanByIdTx(tx *gorm.DB, id int) (*SubscriptionPlan, error) {
- if id <= 0 {
- return nil, errors.New("invalid plan id")
- }
- key := subscriptionPlanCacheKey(id)
- if key != "" {
- if cached, found, err := getSubscriptionPlanCache().Get(key); err == nil && found {
- return &cached, nil
- }
- }
- var plan SubscriptionPlan
- query := DB
- if tx != nil {
- query = tx
- }
- if err := query.Where("id = ?", id).First(&plan).Error; err != nil {
- return nil, err
- }
- _ = getSubscriptionPlanCache().SetWithTTL(key, plan, subscriptionPlanCacheTTL())
- return &plan, nil
- }
- func CountUserSubscriptionsByPlan(userId int, planId int) (int64, error) {
- if userId <= 0 || planId <= 0 {
- return 0, errors.New("invalid userId or planId")
- }
- var count int64
- if err := DB.Model(&UserSubscription{}).
- Where("user_id = ? AND plan_id = ?", userId, planId).
- Count(&count).Error; err != nil {
- return 0, err
- }
- return count, nil
- }
- func getUserGroupByIdTx(tx *gorm.DB, userId int) (string, error) {
- if userId <= 0 {
- return "", errors.New("invalid userId")
- }
- if tx == nil {
- tx = DB
- }
- var group string
- if err := tx.Model(&User{}).Where("id = ?", userId).Select(commonGroupCol).Find(&group).Error; err != nil {
- return "", err
- }
- return group, nil
- }
- func downgradeUserGroupForSubscriptionTx(tx *gorm.DB, sub *UserSubscription, now int64) (string, error) {
- if tx == nil || sub == nil {
- return "", errors.New("invalid downgrade args")
- }
- upgradeGroup := strings.TrimSpace(sub.UpgradeGroup)
- if upgradeGroup == "" {
- return "", nil
- }
- currentGroup, err := getUserGroupByIdTx(tx, sub.UserId)
- if err != nil {
- return "", err
- }
- if currentGroup != upgradeGroup {
- return "", nil
- }
- var activeSub UserSubscription
- activeQuery := tx.Where("user_id = ? AND status = ? AND end_time > ? AND id <> ? AND upgrade_group <> ''",
- sub.UserId, "active", now, sub.Id).
- Order("end_time desc, id desc").
- Limit(1).
- Find(&activeSub)
- if activeQuery.Error == nil && activeQuery.RowsAffected > 0 {
- return "", nil
- }
- prevGroup := strings.TrimSpace(sub.PrevUserGroup)
- if prevGroup == "" || prevGroup == currentGroup {
- return "", nil
- }
- if err := tx.Model(&User{}).Where("id = ?", sub.UserId).
- Update("group", prevGroup).Error; err != nil {
- return "", err
- }
- return prevGroup, nil
- }
- func CreateUserSubscriptionFromPlanTx(tx *gorm.DB, userId int, plan *SubscriptionPlan, source string) (*UserSubscription, error) {
- if tx == nil {
- return nil, errors.New("tx is nil")
- }
- if plan == nil || plan.Id == 0 {
- return nil, errors.New("invalid plan")
- }
- if userId <= 0 {
- return nil, errors.New("invalid user id")
- }
- if plan.MaxPurchasePerUser > 0 {
- var count int64
- if err := tx.Model(&UserSubscription{}).
- Where("user_id = ? AND plan_id = ?", userId, plan.Id).
- Count(&count).Error; err != nil {
- return nil, err
- }
- if count >= int64(plan.MaxPurchasePerUser) {
- return nil, errors.New("已达到该套餐购买上限")
- }
- }
- nowUnix := GetDBTimestamp()
- now := time.Unix(nowUnix, 0)
- endUnix, err := calcPlanEndTime(now, plan)
- if err != nil {
- return nil, err
- }
- resetBase := now
- nextReset := calcNextResetTime(resetBase, plan, endUnix)
- lastReset := int64(0)
- if nextReset > 0 {
- lastReset = now.Unix()
- }
- upgradeGroup := strings.TrimSpace(plan.UpgradeGroup)
- prevGroup := ""
- if upgradeGroup != "" {
- currentGroup, err := getUserGroupByIdTx(tx, userId)
- if err != nil {
- return nil, err
- }
- if currentGroup != upgradeGroup {
- prevGroup = currentGroup
- if err := tx.Model(&User{}).Where("id = ?", userId).
- Update("group", upgradeGroup).Error; err != nil {
- return nil, err
- }
- }
- }
- sub := &UserSubscription{
- UserId: userId,
- PlanId: plan.Id,
- AmountTotal: plan.TotalAmount,
- AmountUsed: 0,
- StartTime: now.Unix(),
- EndTime: endUnix,
- Status: "active",
- Source: source,
- LastResetTime: lastReset,
- NextResetTime: nextReset,
- UpgradeGroup: upgradeGroup,
- PrevUserGroup: prevGroup,
- CreatedAt: common.GetTimestamp(),
- UpdatedAt: common.GetTimestamp(),
- }
- if err := tx.Create(sub).Error; err != nil {
- return nil, err
- }
- return sub, nil
- }
- // Complete a subscription order (idempotent). Creates a UserSubscription snapshot from the plan.
- func CompleteSubscriptionOrder(tradeNo string, providerPayload string) error {
- if tradeNo == "" {
- return errors.New("tradeNo is empty")
- }
- refCol := "`trade_no`"
- if common.UsingPostgreSQL {
- refCol = `"trade_no"`
- }
- var logUserId int
- var logPlanTitle string
- var logMoney float64
- var logPaymentMethod string
- var upgradeGroup string
- err := DB.Transaction(func(tx *gorm.DB) error {
- var order SubscriptionOrder
- if err := tx.Set("gorm:query_option", "FOR UPDATE").Where(refCol+" = ?", tradeNo).First(&order).Error; err != nil {
- return ErrSubscriptionOrderNotFound
- }
- if order.Status == common.TopUpStatusSuccess {
- return nil
- }
- if order.Status != common.TopUpStatusPending {
- return ErrSubscriptionOrderStatusInvalid
- }
- plan, err := GetSubscriptionPlanById(order.PlanId)
- if err != nil {
- return err
- }
- if !plan.Enabled {
- // still allow completion for already purchased orders
- }
- upgradeGroup = strings.TrimSpace(plan.UpgradeGroup)
- _, err = CreateUserSubscriptionFromPlanTx(tx, order.UserId, plan, "order")
- if err != nil {
- return err
- }
- if err := upsertSubscriptionTopUpTx(tx, &order); err != nil {
- return err
- }
- order.Status = common.TopUpStatusSuccess
- order.CompleteTime = common.GetTimestamp()
- if providerPayload != "" {
- order.ProviderPayload = providerPayload
- }
- if err := tx.Save(&order).Error; err != nil {
- return err
- }
- logUserId = order.UserId
- logPlanTitle = plan.Title
- logMoney = order.Money
- logPaymentMethod = order.PaymentMethod
- return nil
- })
- if err != nil {
- return err
- }
- if upgradeGroup != "" && logUserId > 0 {
- _ = UpdateUserGroupCache(logUserId, upgradeGroup)
- }
- if logUserId > 0 {
- msg := fmt.Sprintf("订阅购买成功,套餐: %s,支付金额: %.2f,支付方式: %s", logPlanTitle, logMoney, logPaymentMethod)
- RecordLog(logUserId, LogTypeTopup, msg)
- }
- return nil
- }
- func upsertSubscriptionTopUpTx(tx *gorm.DB, order *SubscriptionOrder) error {
- if tx == nil || order == nil {
- return errors.New("invalid subscription order")
- }
- now := common.GetTimestamp()
- var topup TopUp
- if err := tx.Where("trade_no = ?", order.TradeNo).First(&topup).Error; err != nil {
- if errors.Is(err, gorm.ErrRecordNotFound) {
- topup = TopUp{
- UserId: order.UserId,
- Amount: 0,
- Money: order.Money,
- TradeNo: order.TradeNo,
- PaymentMethod: order.PaymentMethod,
- CreateTime: order.CreateTime,
- CompleteTime: now,
- Status: common.TopUpStatusSuccess,
- }
- return tx.Create(&topup).Error
- }
- return err
- }
- topup.Money = order.Money
- if topup.PaymentMethod == "" {
- topup.PaymentMethod = order.PaymentMethod
- }
- if topup.CreateTime == 0 {
- topup.CreateTime = order.CreateTime
- }
- topup.CompleteTime = now
- topup.Status = common.TopUpStatusSuccess
- return tx.Save(&topup).Error
- }
- func ExpireSubscriptionOrder(tradeNo string) error {
- if tradeNo == "" {
- return errors.New("tradeNo is empty")
- }
- refCol := "`trade_no`"
- if common.UsingPostgreSQL {
- refCol = `"trade_no"`
- }
- return DB.Transaction(func(tx *gorm.DB) error {
- var order SubscriptionOrder
- if err := tx.Set("gorm:query_option", "FOR UPDATE").Where(refCol+" = ?", tradeNo).First(&order).Error; err != nil {
- return ErrSubscriptionOrderNotFound
- }
- if order.Status != common.TopUpStatusPending {
- return nil
- }
- order.Status = common.TopUpStatusExpired
- order.CompleteTime = common.GetTimestamp()
- return tx.Save(&order).Error
- })
- }
- // Admin bind (no payment). Creates a UserSubscription from a plan.
- func AdminBindSubscription(userId int, planId int, sourceNote string) (string, error) {
- if userId <= 0 || planId <= 0 {
- return "", errors.New("invalid userId or planId")
- }
- plan, err := GetSubscriptionPlanById(planId)
- if err != nil {
- return "", err
- }
- err = DB.Transaction(func(tx *gorm.DB) error {
- _, err := CreateUserSubscriptionFromPlanTx(tx, userId, plan, "admin")
- return err
- })
- if err != nil {
- return "", err
- }
- if strings.TrimSpace(plan.UpgradeGroup) != "" {
- _ = UpdateUserGroupCache(userId, plan.UpgradeGroup)
- return fmt.Sprintf("用户分组将升级到 %s", plan.UpgradeGroup), nil
- }
- return "", nil
- }
- // GetAllActiveUserSubscriptions returns all active subscriptions for a user.
- func GetAllActiveUserSubscriptions(userId int) ([]SubscriptionSummary, error) {
- if userId <= 0 {
- return nil, errors.New("invalid userId")
- }
- now := common.GetTimestamp()
- var subs []UserSubscription
- err := DB.Where("user_id = ? AND status = ? AND end_time > ?", userId, "active", now).
- Order("end_time desc, id desc").
- Find(&subs).Error
- if err != nil {
- return nil, err
- }
- return buildSubscriptionSummaries(subs), nil
- }
- // GetAllUserSubscriptions returns all subscriptions (active and expired) for a user.
- func GetAllUserSubscriptions(userId int) ([]SubscriptionSummary, error) {
- if userId <= 0 {
- return nil, errors.New("invalid userId")
- }
- var subs []UserSubscription
- err := DB.Where("user_id = ?", userId).
- Order("end_time desc, id desc").
- Find(&subs).Error
- if err != nil {
- return nil, err
- }
- return buildSubscriptionSummaries(subs), nil
- }
- func buildSubscriptionSummaries(subs []UserSubscription) []SubscriptionSummary {
- if len(subs) == 0 {
- return []SubscriptionSummary{}
- }
- result := make([]SubscriptionSummary, 0, len(subs))
- for _, sub := range subs {
- subCopy := sub
- result = append(result, SubscriptionSummary{
- Subscription: &subCopy,
- })
- }
- return result
- }
- // AdminInvalidateUserSubscription marks a user subscription as cancelled and ends it immediately.
- func AdminInvalidateUserSubscription(userSubscriptionId int) (string, error) {
- if userSubscriptionId <= 0 {
- return "", errors.New("invalid userSubscriptionId")
- }
- now := common.GetTimestamp()
- cacheGroup := ""
- downgradeGroup := ""
- var userId int
- err := DB.Transaction(func(tx *gorm.DB) error {
- var sub UserSubscription
- if err := tx.Set("gorm:query_option", "FOR UPDATE").
- Where("id = ?", userSubscriptionId).First(&sub).Error; err != nil {
- return err
- }
- userId = sub.UserId
- if err := tx.Model(&sub).Updates(map[string]interface{}{
- "status": "cancelled",
- "end_time": now,
- "updated_at": now,
- }).Error; err != nil {
- return err
- }
- target, err := downgradeUserGroupForSubscriptionTx(tx, &sub, now)
- if err != nil {
- return err
- }
- if target != "" {
- cacheGroup = target
- downgradeGroup = target
- }
- return nil
- })
- if err != nil {
- return "", err
- }
- if cacheGroup != "" && userId > 0 {
- _ = UpdateUserGroupCache(userId, cacheGroup)
- }
- if downgradeGroup != "" {
- return fmt.Sprintf("用户分组将回退到 %s", downgradeGroup), nil
- }
- return "", nil
- }
- // AdminDeleteUserSubscription hard-deletes a user subscription.
- func AdminDeleteUserSubscription(userSubscriptionId int) (string, error) {
- if userSubscriptionId <= 0 {
- return "", errors.New("invalid userSubscriptionId")
- }
- now := common.GetTimestamp()
- cacheGroup := ""
- downgradeGroup := ""
- var userId int
- err := DB.Transaction(func(tx *gorm.DB) error {
- var sub UserSubscription
- if err := tx.Set("gorm:query_option", "FOR UPDATE").
- Where("id = ?", userSubscriptionId).First(&sub).Error; err != nil {
- return err
- }
- userId = sub.UserId
- target, err := downgradeUserGroupForSubscriptionTx(tx, &sub, now)
- if err != nil {
- return err
- }
- if target != "" {
- cacheGroup = target
- downgradeGroup = target
- }
- if err := tx.Where("id = ?", userSubscriptionId).Delete(&UserSubscription{}).Error; err != nil {
- return err
- }
- return nil
- })
- if err != nil {
- return "", err
- }
- if cacheGroup != "" && userId > 0 {
- _ = UpdateUserGroupCache(userId, cacheGroup)
- }
- if downgradeGroup != "" {
- return fmt.Sprintf("用户分组将回退到 %s", downgradeGroup), nil
- }
- return "", nil
- }
- type SubscriptionPreConsumeResult struct {
- UserSubscriptionId int
- PreConsumed int64
- AmountTotal int64
- AmountUsedBefore int64
- AmountUsedAfter int64
- }
- // ExpireDueSubscriptions marks expired subscriptions and handles group downgrade.
- func ExpireDueSubscriptions(limit int) (int, error) {
- if limit <= 0 {
- limit = 200
- }
- now := GetDBTimestamp()
- var subs []UserSubscription
- if err := DB.Where("status = ? AND end_time > 0 AND end_time <= ?", "active", now).
- Order("end_time asc, id asc").
- Limit(limit).
- Find(&subs).Error; err != nil {
- return 0, err
- }
- if len(subs) == 0 {
- return 0, nil
- }
- expiredCount := 0
- userIds := make(map[int]struct{}, len(subs))
- for _, sub := range subs {
- if sub.UserId > 0 {
- userIds[sub.UserId] = struct{}{}
- }
- }
- for userId := range userIds {
- cacheGroup := ""
- err := DB.Transaction(func(tx *gorm.DB) error {
- res := tx.Model(&UserSubscription{}).
- Where("user_id = ? AND status = ? AND end_time > 0 AND end_time <= ?", userId, "active", now).
- Updates(map[string]interface{}{
- "status": "expired",
- "updated_at": common.GetTimestamp(),
- })
- if res.Error != nil {
- return res.Error
- }
- expiredCount += int(res.RowsAffected)
- // If there's an active upgraded subscription, keep current group.
- var activeSub UserSubscription
- activeQuery := tx.Where("user_id = ? AND status = ? AND end_time > ? AND upgrade_group <> ''",
- userId, "active", now).
- Order("end_time desc, id desc").
- Limit(1).
- Find(&activeSub)
- if activeQuery.Error == nil && activeQuery.RowsAffected > 0 {
- return nil
- }
- // No active upgraded subscription, downgrade to previous group if needed.
- var lastExpired UserSubscription
- expiredQuery := tx.Where("user_id = ? AND status = ? AND upgrade_group <> ''",
- userId, "expired").
- Order("end_time desc, id desc").
- Limit(1).
- Find(&lastExpired)
- if expiredQuery.Error != nil || expiredQuery.RowsAffected == 0 {
- return nil
- }
- upgradeGroup := strings.TrimSpace(lastExpired.UpgradeGroup)
- prevGroup := strings.TrimSpace(lastExpired.PrevUserGroup)
- if upgradeGroup == "" || prevGroup == "" {
- return nil
- }
- currentGroup, err := getUserGroupByIdTx(tx, userId)
- if err != nil {
- return err
- }
- if currentGroup != upgradeGroup || currentGroup == prevGroup {
- return nil
- }
- if err := tx.Model(&User{}).Where("id = ?", userId).
- Update("group", prevGroup).Error; err != nil {
- return err
- }
- cacheGroup = prevGroup
- return nil
- })
- if err != nil {
- return expiredCount, err
- }
- if cacheGroup != "" {
- _ = UpdateUserGroupCache(userId, cacheGroup)
- }
- }
- return expiredCount, nil
- }
- // SubscriptionPreConsumeRecord stores idempotent pre-consume operations per request.
- type SubscriptionPreConsumeRecord struct {
- Id int `json:"id"`
- RequestId string `json:"request_id" gorm:"type:varchar(64);uniqueIndex"`
- UserId int `json:"user_id" gorm:"index"`
- UserSubscriptionId int `json:"user_subscription_id" gorm:"index"`
- PreConsumed int64 `json:"pre_consumed" gorm:"type:bigint;not null;default:0"`
- Status string `json:"status" gorm:"type:varchar(32);index"` // consumed/refunded
- CreatedAt int64 `json:"created_at" gorm:"bigint"`
- UpdatedAt int64 `json:"updated_at" gorm:"bigint;index"`
- }
- func (r *SubscriptionPreConsumeRecord) BeforeCreate(tx *gorm.DB) error {
- now := common.GetTimestamp()
- r.CreatedAt = now
- r.UpdatedAt = now
- return nil
- }
- func (r *SubscriptionPreConsumeRecord) BeforeUpdate(tx *gorm.DB) error {
- r.UpdatedAt = common.GetTimestamp()
- return nil
- }
- func maybeResetUserSubscriptionWithPlanTx(tx *gorm.DB, sub *UserSubscription, plan *SubscriptionPlan, now int64) error {
- if tx == nil || sub == nil || plan == nil {
- return errors.New("invalid reset args")
- }
- if sub.NextResetTime > 0 && sub.NextResetTime > now {
- return nil
- }
- if NormalizeResetPeriod(plan.QuotaResetPeriod) == SubscriptionResetNever {
- return nil
- }
- baseUnix := sub.LastResetTime
- if baseUnix <= 0 {
- baseUnix = sub.StartTime
- }
- base := time.Unix(baseUnix, 0)
- next := calcNextResetTime(base, plan, sub.EndTime)
- advanced := false
- for next > 0 && next <= now {
- advanced = true
- base = time.Unix(next, 0)
- next = calcNextResetTime(base, plan, sub.EndTime)
- }
- if !advanced {
- if sub.NextResetTime == 0 && next > 0 {
- sub.NextResetTime = next
- sub.LastResetTime = base.Unix()
- return tx.Save(sub).Error
- }
- return nil
- }
- sub.AmountUsed = 0
- sub.LastResetTime = base.Unix()
- sub.NextResetTime = next
- return tx.Save(sub).Error
- }
- // PreConsumeUserSubscription pre-consumes from any active subscription total quota.
- func PreConsumeUserSubscription(requestId string, userId int, modelName string, quotaType int, amount int64) (*SubscriptionPreConsumeResult, error) {
- if userId <= 0 {
- return nil, errors.New("invalid userId")
- }
- if strings.TrimSpace(requestId) == "" {
- return nil, errors.New("requestId is empty")
- }
- if amount <= 0 {
- return nil, errors.New("amount must be > 0")
- }
- now := GetDBTimestamp()
- returnValue := &SubscriptionPreConsumeResult{}
- err := DB.Transaction(func(tx *gorm.DB) error {
- var existing SubscriptionPreConsumeRecord
- query := tx.Where("request_id = ?", requestId).Limit(1).Find(&existing)
- if query.Error != nil {
- return query.Error
- }
- if query.RowsAffected > 0 {
- if existing.Status == "refunded" {
- return errors.New("subscription pre-consume already refunded")
- }
- var sub UserSubscription
- if err := tx.Where("id = ?", existing.UserSubscriptionId).First(&sub).Error; err != nil {
- return err
- }
- returnValue.UserSubscriptionId = sub.Id
- returnValue.PreConsumed = existing.PreConsumed
- returnValue.AmountTotal = sub.AmountTotal
- returnValue.AmountUsedBefore = sub.AmountUsed
- returnValue.AmountUsedAfter = sub.AmountUsed
- return nil
- }
- var subs []UserSubscription
- if err := tx.Set("gorm:query_option", "FOR UPDATE").
- Where("user_id = ? AND status = ? AND end_time > ?", userId, "active", now).
- Order("end_time asc, id asc").
- Find(&subs).Error; err != nil {
- return errors.New("no active subscription")
- }
- if len(subs) == 0 {
- return errors.New("no active subscription")
- }
- for _, candidate := range subs {
- sub := candidate
- plan, err := getSubscriptionPlanByIdTx(tx, sub.PlanId)
- if err != nil {
- return err
- }
- if err := maybeResetUserSubscriptionWithPlanTx(tx, &sub, plan, now); err != nil {
- return err
- }
- usedBefore := sub.AmountUsed
- if sub.AmountTotal > 0 {
- remain := sub.AmountTotal - usedBefore
- if remain < amount {
- continue
- }
- }
- record := &SubscriptionPreConsumeRecord{
- RequestId: requestId,
- UserId: userId,
- UserSubscriptionId: sub.Id,
- PreConsumed: amount,
- Status: "consumed",
- }
- if err := tx.Create(record).Error; err != nil {
- var dup SubscriptionPreConsumeRecord
- if err2 := tx.Where("request_id = ?", requestId).First(&dup).Error; err2 == nil {
- if dup.Status == "refunded" {
- return errors.New("subscription pre-consume already refunded")
- }
- returnValue.UserSubscriptionId = sub.Id
- returnValue.PreConsumed = dup.PreConsumed
- returnValue.AmountTotal = sub.AmountTotal
- returnValue.AmountUsedBefore = sub.AmountUsed
- returnValue.AmountUsedAfter = sub.AmountUsed
- return nil
- }
- return err
- }
- sub.AmountUsed += amount
- if err := tx.Save(&sub).Error; err != nil {
- return err
- }
- returnValue.UserSubscriptionId = sub.Id
- returnValue.PreConsumed = amount
- returnValue.AmountTotal = sub.AmountTotal
- returnValue.AmountUsedBefore = usedBefore
- returnValue.AmountUsedAfter = sub.AmountUsed
- return nil
- }
- return fmt.Errorf("subscription quota insufficient, need=%d", amount)
- })
- if err != nil {
- return nil, err
- }
- return returnValue, nil
- }
- // RefundSubscriptionPreConsume is idempotent and refunds pre-consumed subscription quota by requestId.
- func RefundSubscriptionPreConsume(requestId string) error {
- if strings.TrimSpace(requestId) == "" {
- return errors.New("requestId is empty")
- }
- return DB.Transaction(func(tx *gorm.DB) error {
- var record SubscriptionPreConsumeRecord
- if err := tx.Set("gorm:query_option", "FOR UPDATE").
- Where("request_id = ?", requestId).First(&record).Error; err != nil {
- return err
- }
- if record.Status == "refunded" {
- return nil
- }
- if record.PreConsumed <= 0 {
- record.Status = "refunded"
- return tx.Save(&record).Error
- }
- if err := PostConsumeUserSubscriptionDelta(record.UserSubscriptionId, -record.PreConsumed); err != nil {
- return err
- }
- record.Status = "refunded"
- return tx.Save(&record).Error
- })
- }
- // ResetDueSubscriptions resets subscriptions whose next_reset_time has passed.
- func ResetDueSubscriptions(limit int) (int, error) {
- if limit <= 0 {
- limit = 200
- }
- now := GetDBTimestamp()
- var subs []UserSubscription
- if err := DB.Where("next_reset_time > 0 AND next_reset_time <= ? AND status = ?", now, "active").
- Order("next_reset_time asc").
- Limit(limit).
- Find(&subs).Error; err != nil {
- return 0, err
- }
- if len(subs) == 0 {
- return 0, nil
- }
- resetCount := 0
- for _, sub := range subs {
- subCopy := sub
- plan, err := getSubscriptionPlanByIdTx(nil, sub.PlanId)
- if err != nil || plan == nil {
- continue
- }
- err = DB.Transaction(func(tx *gorm.DB) error {
- var locked UserSubscription
- if err := tx.Set("gorm:query_option", "FOR UPDATE").
- Where("id = ? AND next_reset_time > 0 AND next_reset_time <= ?", subCopy.Id, now).
- First(&locked).Error; err != nil {
- return nil
- }
- if err := maybeResetUserSubscriptionWithPlanTx(tx, &locked, plan, now); err != nil {
- return err
- }
- resetCount++
- return nil
- })
- if err != nil {
- return resetCount, err
- }
- }
- return resetCount, nil
- }
- // CleanupSubscriptionPreConsumeRecords removes old idempotency records to keep table small.
- func CleanupSubscriptionPreConsumeRecords(olderThanSeconds int64) (int64, error) {
- if olderThanSeconds <= 0 {
- olderThanSeconds = 7 * 24 * 3600
- }
- cutoff := GetDBTimestamp() - olderThanSeconds
- res := DB.Where("updated_at < ?", cutoff).Delete(&SubscriptionPreConsumeRecord{})
- return res.RowsAffected, res.Error
- }
- type SubscriptionPlanInfo struct {
- PlanId int
- PlanTitle string
- }
- func GetSubscriptionPlanInfoByUserSubscriptionId(userSubscriptionId int) (*SubscriptionPlanInfo, error) {
- if userSubscriptionId <= 0 {
- return nil, errors.New("invalid userSubscriptionId")
- }
- cacheKey := fmt.Sprintf("sub:%d", userSubscriptionId)
- if cached, found, err := getSubscriptionPlanInfoCache().Get(cacheKey); err == nil && found {
- return &cached, nil
- }
- var sub UserSubscription
- if err := DB.Where("id = ?", userSubscriptionId).First(&sub).Error; err != nil {
- return nil, err
- }
- plan, err := getSubscriptionPlanByIdTx(nil, sub.PlanId)
- if err != nil {
- return nil, err
- }
- info := &SubscriptionPlanInfo{
- PlanId: sub.PlanId,
- PlanTitle: plan.Title,
- }
- _ = getSubscriptionPlanInfoCache().SetWithTTL(cacheKey, *info, subscriptionPlanInfoCacheTTL())
- return info, nil
- }
- // Update subscription used amount by delta (positive consume more, negative refund).
- func PostConsumeUserSubscriptionDelta(userSubscriptionId int, delta int64) error {
- if userSubscriptionId <= 0 {
- return errors.New("invalid userSubscriptionId")
- }
- if delta == 0 {
- return nil
- }
- return DB.Transaction(func(tx *gorm.DB) error {
- var sub UserSubscription
- if err := tx.Set("gorm:query_option", "FOR UPDATE").
- Where("id = ?", userSubscriptionId).
- First(&sub).Error; err != nil {
- return err
- }
- newUsed := sub.AmountUsed + delta
- if newUsed < 0 {
- newUsed = 0
- }
- if sub.AmountTotal > 0 && newUsed > sub.AmountTotal {
- return fmt.Errorf("subscription used exceeds total, used=%d total=%d", newUsed, sub.AmountTotal)
- }
- sub.AmountUsed = newUsed
- return tx.Save(&sub).Error
- })
- }
|