billing_session.go 12 KB


  1. package service
  2. import (
  3. "fmt"
  4. "net/http"
  5. "strings"
  6. "sync"
  7. "github.com/QuantumNous/new-api/common"
  8. "github.com/QuantumNous/new-api/logger"
  9. "github.com/QuantumNous/new-api/model"
  10. relaycommon "github.com/QuantumNous/new-api/relay/common"
  11. "github.com/QuantumNous/new-api/types"
  12. "github.com/bytedance/gopkg/util/gopool"
  13. "github.com/gin-gonic/gin"
  14. )
  15. // ---------------------------------------------------------------------------
  16. // BillingSession — 统一计费会话
  17. // ---------------------------------------------------------------------------
  18. // BillingSession 封装单次请求的预扣费/结算/退款生命周期。
  19. // 实现 relaycommon.BillingSettler 接口。
  20. type BillingSession struct {
  21. relayInfo *relaycommon.RelayInfo
  22. funding FundingSource
  23. preConsumedQuota int // 实际预扣额度(信任用户可能为 0)
  24. tokenConsumed int // 令牌额度实际扣减量
  25. fundingSettled bool // funding.Settle 已成功,资金来源已提交
  26. settled bool // Settle 全部完成(资金 + 令牌)
  27. refunded bool // Refund 已调用
  28. mu sync.Mutex
  29. }
  30. // Settle 根据实际消耗额度进行结算。
  31. // 资金来源和令牌额度分两步提交:若资金来源已提交但令牌调整失败,
  32. // 会标记 fundingSettled 防止 Refund 对已提交的资金来源执行退款。
  33. func (s *BillingSession) Settle(actualQuota int) error {
  34. s.mu.Lock()
  35. defer s.mu.Unlock()
  36. if s.settled {
  37. return nil
  38. }
  39. delta := actualQuota - s.preConsumedQuota
  40. if delta == 0 {
  41. s.settled = true
  42. return nil
  43. }
  44. // 1) 调整资金来源(仅在尚未提交时执行,防止重复调用)
  45. if !s.fundingSettled {
  46. if err := s.funding.Settle(delta); err != nil {
  47. return err
  48. }
  49. s.fundingSettled = true
  50. }
  51. // 2) 调整令牌额度
  52. var tokenErr error
  53. if !s.relayInfo.IsPlayground {
  54. if delta > 0 {
  55. tokenErr = model.DecreaseTokenQuota(s.relayInfo.TokenId, s.relayInfo.TokenKey, delta)
  56. } else {
  57. tokenErr = model.IncreaseTokenQuota(s.relayInfo.TokenId, s.relayInfo.TokenKey, -delta)
  58. }
  59. if tokenErr != nil {
  60. // 资金来源已提交,令牌调整失败只能记录日志;标记 settled 防止 Refund 误退资金
  61. common.SysLog(fmt.Sprintf("error adjusting token quota after funding settled (userId=%d, tokenId=%d, delta=%d): %s",
  62. s.relayInfo.UserId, s.relayInfo.TokenId, delta, tokenErr.Error()))
  63. }
  64. }
  65. // 3) 更新 relayInfo 上的订阅 PostDelta(用于日志)
  66. if s.funding.Source() == BillingSourceSubscription {
  67. s.relayInfo.SubscriptionPostDelta += int64(delta)
  68. }
  69. s.settled = true
  70. return tokenErr
  71. }
  72. // Refund 退还所有预扣费,幂等安全,异步执行。
  73. func (s *BillingSession) Refund(c *gin.Context) {
  74. s.mu.Lock()
  75. if s.settled || s.refunded || !s.needsRefundLocked() {
  76. s.mu.Unlock()
  77. return
  78. }
  79. s.refunded = true
  80. s.mu.Unlock()
  81. logger.LogInfo(c, fmt.Sprintf("用户 %d 请求失败, 返还预扣费(token_quota=%s, funding=%s)",
  82. s.relayInfo.UserId,
  83. logger.FormatQuota(s.tokenConsumed),
  84. s.funding.Source(),
  85. ))
  86. // 复制需要的值到闭包中
  87. tokenId := s.relayInfo.TokenId
  88. tokenKey := s.relayInfo.TokenKey
  89. isPlayground := s.relayInfo.IsPlayground
  90. tokenConsumed := s.tokenConsumed
  91. funding := s.funding
  92. gopool.Go(func() {
  93. // 1) 退还资金来源
  94. if err := funding.Refund(); err != nil {
  95. common.SysLog("error refunding billing source: " + err.Error())
  96. }
  97. // 2) 退还令牌额度
  98. if tokenConsumed > 0 && !isPlayground {
  99. if err := model.IncreaseTokenQuota(tokenId, tokenKey, tokenConsumed); err != nil {
  100. common.SysLog("error refunding token quota: " + err.Error())
  101. }
  102. }
  103. })
  104. }
  105. // NeedsRefund 返回是否存在需要退还的预扣状态。
  106. func (s *BillingSession) NeedsRefund() bool {
  107. s.mu.Lock()
  108. defer s.mu.Unlock()
  109. return s.needsRefundLocked()
  110. }
  111. func (s *BillingSession) needsRefundLocked() bool {
  112. if s.settled || s.refunded || s.fundingSettled {
  113. // fundingSettled 时资金来源已提交结算,不能再退预扣费
  114. return false
  115. }
  116. if s.tokenConsumed > 0 {
  117. return true
  118. }
  119. // 订阅可能在 tokenConsumed=0 时仍预扣了额度
  120. if sub, ok := s.funding.(*SubscriptionFunding); ok && sub.preConsumed > 0 {
  121. return true
  122. }
  123. return false
  124. }
  125. // GetPreConsumedQuota 返回实际预扣的额度。
  126. func (s *BillingSession) GetPreConsumedQuota() int {
  127. return s.preConsumedQuota
  128. }
  129. // ---------------------------------------------------------------------------
  130. // PreConsume — 统一预扣费入口(含信任额度旁路)
  131. // ---------------------------------------------------------------------------
  132. // preConsume 执行预扣费:信任检查 -> 令牌预扣 -> 资金来源预扣。
  133. // 任一步骤失败时原子回滚已完成的步骤。
  134. func (s *BillingSession) preConsume(c *gin.Context, quota int) *types.NewAPIError {
  135. effectiveQuota := quota
  136. // ---- 信任额度旁路 ----
  137. if s.shouldTrust(c) {
  138. effectiveQuota = 0
  139. logger.LogInfo(c, fmt.Sprintf("用户 %d 额度充足, 信任且不需要预扣费 (funding=%s)", s.relayInfo.UserId, s.funding.Source()))
  140. } else if effectiveQuota > 0 {
  141. logger.LogInfo(c, fmt.Sprintf("用户 %d 需要预扣费 %s (funding=%s)", s.relayInfo.UserId, logger.FormatQuota(effectiveQuota), s.funding.Source()))
  142. }
  143. // ---- 1) 预扣令牌额度 ----
  144. if effectiveQuota > 0 {
  145. if err := PreConsumeTokenQuota(s.relayInfo, effectiveQuota); err != nil {
  146. return types.NewErrorWithStatusCode(err, types.ErrorCodePreConsumeTokenQuotaFailed, http.StatusForbidden, types.ErrOptionWithSkipRetry(), types.ErrOptionWithNoRecordErrorLog())
  147. }
  148. s.tokenConsumed = effectiveQuota
  149. }
  150. // ---- 2) 预扣资金来源 ----
  151. if err := s.funding.PreConsume(effectiveQuota); err != nil {
  152. // 预扣费失败,回滚令牌额度
  153. if s.tokenConsumed > 0 && !s.relayInfo.IsPlayground {
  154. if rollbackErr := model.IncreaseTokenQuota(s.relayInfo.TokenId, s.relayInfo.TokenKey, s.tokenConsumed); rollbackErr != nil {
  155. common.SysLog(fmt.Sprintf("error rolling back token quota (userId=%d, tokenId=%d, amount=%d, fundingErr=%s): %s",
  156. s.relayInfo.UserId, s.relayInfo.TokenId, s.tokenConsumed, err.Error(), rollbackErr.Error()))
  157. }
  158. s.tokenConsumed = 0
  159. }
  160. // TODO: model 层应定义哨兵错误(如 ErrNoActiveSubscription),用 errors.Is 替代字符串匹配
  161. errMsg := err.Error()
  162. if strings.Contains(errMsg, "no active subscription") || strings.Contains(errMsg, "subscription quota insufficient") {
  163. return types.NewErrorWithStatusCode(fmt.Errorf("订阅额度不足或未配置订阅: %s", errMsg), types.ErrorCodeInsufficientUserQuota, http.StatusForbidden, types.ErrOptionWithSkipRetry(), types.ErrOptionWithNoRecordErrorLog())
  164. }
  165. return types.NewError(err, types.ErrorCodeUpdateDataError, types.ErrOptionWithSkipRetry())
  166. }
  167. s.preConsumedQuota = effectiveQuota
  168. // ---- 同步 RelayInfo 兼容字段 ----
  169. s.syncRelayInfo()
  170. return nil
  171. }
  172. // shouldTrust 统一信任额度检查,适用于钱包和订阅。
  173. func (s *BillingSession) shouldTrust(c *gin.Context) bool {
  174. trustQuota := common.GetTrustQuota()
  175. if trustQuota <= 0 {
  176. return false
  177. }
  178. // 检查令牌是否充足
  179. tokenTrusted := s.relayInfo.TokenUnlimited
  180. if !tokenTrusted {
  181. tokenQuota := c.GetInt("token_quota")
  182. tokenTrusted = tokenQuota > trustQuota
  183. }
  184. if !tokenTrusted {
  185. return false
  186. }
  187. switch s.funding.Source() {
  188. case BillingSourceWallet:
  189. return s.relayInfo.UserQuota > trustQuota
  190. case BillingSourceSubscription:
  191. // 订阅不能启用信任旁路。原因:
  192. // 1. PreConsumeUserSubscription 要求 amount>0 来创建预扣记录并锁定订阅
  193. // 2. SubscriptionFunding.PreConsume 忽略参数,始终用 s.amount 预扣
  194. // 3. 若信任旁路将 effectiveQuota 设为 0,会导致 preConsumedQuota 与实际订阅预扣不一致
  195. return false
  196. default:
  197. return false
  198. }
  199. }
  200. // syncRelayInfo 将 BillingSession 的状态同步到 RelayInfo 的兼容字段上。
  201. func (s *BillingSession) syncRelayInfo() {
  202. info := s.relayInfo
  203. info.FinalPreConsumedQuota = s.preConsumedQuota
  204. info.BillingSource = s.funding.Source()
  205. if sub, ok := s.funding.(*SubscriptionFunding); ok {
  206. info.SubscriptionId = sub.subscriptionId
  207. info.SubscriptionPreConsumed = sub.preConsumed
  208. info.SubscriptionPostDelta = 0
  209. info.SubscriptionAmountTotal = sub.AmountTotal
  210. info.SubscriptionAmountUsedAfterPreConsume = sub.AmountUsedAfter
  211. info.SubscriptionPlanId = sub.PlanId
  212. info.SubscriptionPlanTitle = sub.PlanTitle
  213. } else {
  214. info.SubscriptionId = 0
  215. info.SubscriptionPreConsumed = 0
  216. }
  217. }
  218. // ---------------------------------------------------------------------------
  219. // NewBillingSession 工厂 — 根据计费偏好创建会话并处理回退
  220. // ---------------------------------------------------------------------------
  221. // NewBillingSession 根据用户计费偏好创建 BillingSession,处理 subscription_first / wallet_first 的回退。
  222. func NewBillingSession(c *gin.Context, relayInfo *relaycommon.RelayInfo, preConsumedQuota int) (*BillingSession, *types.NewAPIError) {
  223. if relayInfo == nil {
  224. return nil, types.NewError(fmt.Errorf("relayInfo is nil"), types.ErrorCodeInvalidRequest, types.ErrOptionWithSkipRetry())
  225. }
  226. pref := common.NormalizeBillingPreference(relayInfo.UserSetting.BillingPreference)
  227. // 钱包路径需要先检查用户额度
  228. tryWallet := func() (*BillingSession, *types.NewAPIError) {
  229. userQuota, err := model.GetUserQuota(relayInfo.UserId, false)
  230. if err != nil {
  231. return nil, types.NewError(err, types.ErrorCodeQueryDataError, types.ErrOptionWithSkipRetry())
  232. }
  233. if userQuota <= 0 {
  234. return nil, types.NewErrorWithStatusCode(
  235. fmt.Errorf("用户额度不足, 剩余额度: %s", logger.FormatQuota(userQuota)),
  236. types.ErrorCodeInsufficientUserQuota, http.StatusForbidden,
  237. types.ErrOptionWithSkipRetry(), types.ErrOptionWithNoRecordErrorLog())
  238. }
  239. if userQuota-preConsumedQuota < 0 {
  240. return nil, types.NewErrorWithStatusCode(
  241. fmt.Errorf("预扣费额度失败, 用户剩余额度: %s, 需要预扣费额度: %s", logger.FormatQuota(userQuota), logger.FormatQuota(preConsumedQuota)),
  242. types.ErrorCodeInsufficientUserQuota, http.StatusForbidden,
  243. types.ErrOptionWithSkipRetry(), types.ErrOptionWithNoRecordErrorLog())
  244. }
  245. relayInfo.UserQuota = userQuota
  246. session := &BillingSession{
  247. relayInfo: relayInfo,
  248. funding: &WalletFunding{userId: relayInfo.UserId},
  249. }
  250. if apiErr := session.preConsume(c, preConsumedQuota); apiErr != nil {
  251. return nil, apiErr
  252. }
  253. return session, nil
  254. }
  255. trySubscription := func() (*BillingSession, *types.NewAPIError) {
  256. subConsume := int64(preConsumedQuota)
  257. if subConsume <= 0 {
  258. subConsume = 1
  259. }
  260. session := &BillingSession{
  261. relayInfo: relayInfo,
  262. funding: &SubscriptionFunding{
  263. requestId: relayInfo.RequestId,
  264. userId: relayInfo.UserId,
  265. modelName: relayInfo.OriginModelName,
  266. amount: subConsume,
  267. },
  268. }
  269. // 必须传 subConsume 而非 preConsumedQuota,保证 SubscriptionFunding.amount、
  270. // preConsume 参数和 FinalPreConsumedQuota 三者一致,避免订阅多扣费。
  271. if apiErr := session.preConsume(c, int(subConsume)); apiErr != nil {
  272. return nil, apiErr
  273. }
  274. return session, nil
  275. }
  276. switch pref {
  277. case "subscription_only":
  278. return trySubscription()
  279. case "wallet_only":
  280. return tryWallet()
  281. case "wallet_first":
  282. session, err := tryWallet()
  283. if err != nil {
  284. if err.GetErrorCode() == types.ErrorCodeInsufficientUserQuota {
  285. return trySubscription()
  286. }
  287. return nil, err
  288. }
  289. return session, nil
  290. case "subscription_first":
  291. fallthrough
  292. default:
  293. hasSub, subCheckErr := model.HasActiveUserSubscription(relayInfo.UserId)
  294. if subCheckErr != nil {
  295. return nil, types.NewError(subCheckErr, types.ErrorCodeQueryDataError, types.ErrOptionWithSkipRetry())
  296. }
  297. if !hasSub {
  298. return tryWallet()
  299. }
  300. session, apiErr := trySubscription()
  301. if apiErr != nil {
  302. if apiErr.GetErrorCode() == types.ErrorCodeInsufficientUserQuota {
  303. return tryWallet()
  304. }
  305. return nil, apiErr
  306. }
  307. return session, nil
  308. }
  309. }