sealos.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. package balance
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"math/rand/v2"
	"net/http"
	"net/url"
	"time"

	"github.com/bytedance/sonic"
	"github.com/golang-jwt/jwt/v5"
	"github.com/labring/aiproxy/core/common"
	"github.com/labring/aiproxy/core/common/conv"
	"github.com/labring/aiproxy/core/common/env"
	"github.com/labring/aiproxy/core/model"
	"github.com/redis/go-redis/v9"
	"github.com/shopspring/decimal"
	log "github.com/sirupsen/logrus"
)
  20. const (
  21. defaultAccountURL = "http://account-service.account-system.svc.cluster.local:2333"
  22. balancePrecision = 1000000
  23. appType = "LLM-TOKEN"
  24. sealosRequester = "sealos-admin"
  25. sealosGroupBalanceKey = "sealos:balance:%s"
  26. sealosUserRealNameKey = "sealos:realName:%s"
  27. getBalanceRetry = 3
  28. )
  29. var (
  30. _ GroupBalance = (*Sealos)(nil)
  31. sealosHTTPClient = &http.Client{}
  32. decimalBalancePrecision = decimal.NewFromInt(balancePrecision)
  33. minConsumeAmount = decimal.NewFromInt(1)
  34. jwtToken string
  35. sealosRedisCacheEnable = env.Bool("BALANCE_SEALOS_REDIS_CACHE_ENABLE", true)
  36. sealosCacheExpire = 3 * time.Minute
  37. )
  38. var (
  39. sealosCheckRealNameEnable = env.Bool("BALANCE_SEALOS_CHECK_REAL_NAME_ENABLE", false)
  40. sealosNoRealNameUsedAmountLimit = env.Float64(
  41. "BALANCE_SEALOS_NO_REAL_NAME_USED_AMOUNT_LIMIT",
  42. 1,
  43. )
  44. )
// Sealos is the GroupBalance implementation that reads balances from, and
// charges consumption to, the Sealos account service.
type Sealos struct {
	// accountURL is the base URL of the account service (no trailing path).
	accountURL string
}
  48. // FIXME: 如果获取余额能成功,但是消费永远失败,需要加一个失败次数限制,如果失败次数超过一定阈值,暂停服务
  49. func InitSealos(jwtKey, accountURL string) error {
  50. token, err := newSealosToken(jwtKey)
  51. if err != nil {
  52. return fmt.Errorf("failed to generate sealos jwt token: %w", err)
  53. }
  54. jwtToken = token
  55. Default = NewSealos(accountURL)
  56. return nil
  57. }
  58. func NewSealos(accountURL string) *Sealos {
  59. if accountURL == "" {
  60. accountURL = defaultAccountURL
  61. }
  62. return &Sealos{accountURL: accountURL}
  63. }
// sealosClaims is the JWT claim set signed by newSealosToken: the standard
// registered claims plus a "requester" claim.
type sealosClaims struct {
	// Requester names the caller; newSealosToken sets it to sealosRequester.
	Requester string `json:"requester"`
	jwt.RegisteredClaims
}
  68. func newSealosToken(key string) (string, error) {
  69. claims := &sealosClaims{
  70. Requester: sealosRequester,
  71. RegisteredClaims: jwt.RegisteredClaims{
  72. NotBefore: jwt.NewNumericDate(time.Now()),
  73. },
  74. }
  75. return jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString(conv.StringToBytes(key))
  76. }
// sealosGetGroupBalanceResp is the JSON body decoded from the
// account-with-workspace endpoint.
type sealosGetGroupBalanceResp struct {
	// UserUID is the user UID reported for the queried namespace.
	UserUID string `json:"userUID"`
	// Error is non-empty when the service reports a failure.
	Error string `json:"error"`
	// Balance is the account balance in service units (see balancePrecision).
	Balance int64 `json:"balance"`
	// WorkspaceSubscription selects which fields are authoritative: when true
	// the AI quota fields are used instead of Balance.
	WorkspaceSubscription bool  `json:"workspaceSubscription"`
	TotalAIQuota          int64 `json:"totalAIQuota"`
	RemainAIQuota         int64 `json:"remainAIQuota"`
}
// sealosPostGroupConsumeReq is the JSON body posted to the charge-billing
// endpoint.
type sealosPostGroupConsumeReq struct {
	// Namespace is the group ID being charged.
	Namespace string `json:"namespace"`
	// AppType is always the package constant appType.
	AppType string `json:"appType"`
	// AppName carries the token name the consumption is attributed to.
	AppName string `json:"appName"`
	UserUID string `json:"userUID"`
	// Amount is the charge in service units (see balancePrecision).
	Amount int64 `json:"amount"`
}
// sealosPostGroupConsumeResp is the JSON body decoded from the charge-billing
// endpoint.
type sealosPostGroupConsumeResp struct {
	// Error is non-empty when the service reports a failure.
	Error string `json:"error"`
}
// sealosCache is the Redis hash representation of a cached group balance.
// The short field names ("u", "b") are the hash field keys; the Lua script
// in decreaseGroupBalanceScript addresses "b" directly.
type sealosCache struct {
	UserUID string `redis:"u"`
	// Balance is in service units (see balancePrecision).
	Balance int64 `redis:"b"`
}
  99. func cacheSetGroupBalance(ctx context.Context, group string, balance int64, userUID string) error {
  100. if !common.RedisEnabled || !sealosRedisCacheEnable {
  101. return nil
  102. }
  103. ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
  104. defer cancel()
  105. pipe := common.RDB.Pipeline()
  106. pipe.HSet(ctx, common.RedisKeyf(sealosGroupBalanceKey, group), sealosCache{
  107. Balance: balance,
  108. UserUID: userUID,
  109. })
  110. expireTime := sealosCacheExpire + time.Duration(rand.Int64N(10)-5)*time.Second
  111. pipe.Expire(ctx, common.RedisKeyf(sealosGroupBalanceKey, group), expireTime)
  112. _, err := pipe.Exec(ctx)
  113. return err
  114. }
  115. func cacheGetGroupBalance(ctx context.Context, group string) (*sealosCache, error) {
  116. if !common.RedisEnabled || !sealosRedisCacheEnable {
  117. return nil, redis.Nil
  118. }
  119. ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
  120. defer cancel()
  121. var cache sealosCache
  122. if err := common.RDB.HGetAll(ctx, common.RedisKeyf(sealosGroupBalanceKey, group)).Scan(&cache); err != nil {
  123. return nil, err
  124. }
  125. return &cache, nil
  126. }
// decreaseGroupBalanceScript is a Lua script that subtracts ARGV[1] from the
// "b" (balance) field of the hash at KEYS[1]. The write happens only when the
// field already exists, so a decrement never creates a cache entry. The
// script always replies "ok".
var decreaseGroupBalanceScript = redis.NewScript(`
local balance = redis.call("HGet", KEYS[1], "b")
if balance then
redis.call("HSet", KEYS[1], "b", balance - ARGV[1])
end
return redis.status_reply("ok")
`)
  134. func cacheDecreaseGroupBalance(ctx context.Context, group string, amount int64) error {
  135. if !common.RedisEnabled || !sealosRedisCacheEnable {
  136. return nil
  137. }
  138. return decreaseGroupBalanceScript.Run(ctx, common.RDB, []string{common.RedisKeyf(sealosGroupBalanceKey, group)}, amount).
  139. Err()
  140. }
// ErrNoRealNameUsedAmountLimit is returned by GetGroupRemainBalance when the
// real-name check is enabled, the group's used amount exceeds
// sealosNoRealNameUsedAmountLimit, and the user is not real-name verified.
// The message asks the user to complete real-name verification.
var ErrNoRealNameUsedAmountLimit = errors.New("达到未实名用户使用额度限制,请实名认证")
  142. func (s *Sealos) GetGroupRemainBalance(
  143. ctx context.Context,
  144. group model.GroupCache,
  145. ) (float64, PostGroupConsumer, error) {
  146. for i := 0; ; i++ {
  147. balance, userUID, err := s.getGroupRemainBalance(ctx, group.ID)
  148. if err == nil {
  149. if sealosCheckRealNameEnable &&
  150. group.UsedAmount > sealosNoRealNameUsedAmountLimit &&
  151. !s.checkRealName(ctx, userUID) {
  152. return 0, nil, ErrNoRealNameUsedAmountLimit
  153. }
  154. return decimal.NewFromInt(balance).Div(decimalBalancePrecision).InexactFloat64(),
  155. newSealosPostGroupConsumer(s.accountURL, group.ID, userUID), nil
  156. }
  157. if i == getBalanceRetry-1 {
  158. return 0, nil, err
  159. }
  160. time.Sleep(time.Second)
  161. }
  162. }
  163. func cacheGetUserRealName(ctx context.Context, userUID string) (bool, error) {
  164. if !common.RedisEnabled || !sealosRedisCacheEnable {
  165. return true, redis.Nil
  166. }
  167. ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
  168. defer cancel()
  169. realName, err := common.RDB.Get(ctx, common.RedisKeyf(sealosUserRealNameKey, userUID)).Bool()
  170. if err != nil {
  171. return false, err
  172. }
  173. return realName, nil
  174. }
  175. func cacheSetUserRealName(ctx context.Context, userUID string, realName bool) error {
  176. if !common.RedisEnabled || !sealosRedisCacheEnable {
  177. return nil
  178. }
  179. ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
  180. defer cancel()
  181. var expireTime time.Duration
  182. if realName {
  183. expireTime = time.Hour * 12
  184. } else {
  185. expireTime = time.Minute * 1
  186. }
  187. return common.RDB.Set(ctx, common.RedisKeyf(sealosUserRealNameKey, userUID), realName, expireTime).
  188. Err()
  189. }
  190. func (s *Sealos) checkRealName(ctx context.Context, userUID string) bool {
  191. if cache, err := cacheGetUserRealName(ctx, userUID); err == nil {
  192. return cache
  193. } else if !errors.Is(err, redis.Nil) {
  194. log.Errorf("get user (%s) real name cache failed: %s", userUID, err)
  195. }
  196. realName, err := s.fetchRealNameFromAPI(ctx, userUID)
  197. if err != nil {
  198. log.Errorf("fetch user (%s) real name failed: %s", userUID, err)
  199. return true
  200. }
  201. if err := cacheSetUserRealName(ctx, userUID, realName); err != nil {
  202. log.Errorf("set user (%s) real name cache failed: %s", userUID, err)
  203. }
  204. return realName
  205. }
// sealosGetRealNameInfoResp is the JSON body decoded from the real-name-info
// endpoint.
type sealosGetRealNameInfoResp struct {
	// IsRealName is the verification flag reported by the service.
	IsRealName bool `json:"isRealName"`
	// Error is non-empty when the service reports a failure.
	Error string `json:"error"`
}
  210. func (s *Sealos) fetchRealNameFromAPI(ctx context.Context, userUID string) (bool, error) {
  211. ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
  212. defer cancel()
  213. req, err := http.NewRequestWithContext(ctx, http.MethodGet,
  214. fmt.Sprintf("%s/admin/v1alpha1/real-name-info?userUID=%s", s.accountURL, userUID), nil)
  215. if err != nil {
  216. return false, err
  217. }
  218. req.Header.Set("Authorization", "Bearer "+jwtToken)
  219. resp, err := sealosHTTPClient.Do(req)
  220. if err != nil {
  221. return false, err
  222. }
  223. defer resp.Body.Close()
  224. var sealosResp sealosGetRealNameInfoResp
  225. if err := sonic.ConfigDefault.NewDecoder(resp.Body).Decode(&sealosResp); err != nil {
  226. return false, err
  227. }
  228. if resp.StatusCode != http.StatusOK || sealosResp.Error != "" {
  229. return false, fmt.Errorf(
  230. "get user (%s) real name failed with status code %d, error: %s",
  231. userUID,
  232. resp.StatusCode,
  233. sealosResp.Error,
  234. )
  235. }
  236. return sealosResp.IsRealName, nil
  237. }
  238. // GroupBalance interface implementation
  239. func (s *Sealos) getGroupRemainBalance(ctx context.Context, group string) (int64, string, error) {
  240. if cache, err := cacheGetGroupBalance(ctx, group); err == nil && cache.UserUID != "" {
  241. return cache.Balance, cache.UserUID, nil
  242. } else if err != nil && !errors.Is(err, redis.Nil) {
  243. log.Errorf("get group (%s) balance cache failed: %s", group, err)
  244. }
  245. balance, err := s.fetchBalanceFromAPI(ctx, group)
  246. if err != nil {
  247. return 0, "", err
  248. }
  249. if err := cacheSetGroupBalance(ctx, group, balance.balance, balance.userUID); err != nil {
  250. log.Errorf("set group (%s) balance cache failed: %s", group, err)
  251. }
  252. return balance.balance, balance.userUID, nil
  253. }
  254. type sealosBalanceResoult struct {
  255. quota int64
  256. balance int64
  257. userUID string
  258. }
  259. func (s *Sealos) fetchBalanceFromAPI(
  260. ctx context.Context,
  261. group string,
  262. ) (*sealosBalanceResoult, error) {
  263. ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
  264. defer cancel()
  265. req, err := http.NewRequestWithContext(
  266. ctx,
  267. http.MethodGet,
  268. fmt.Sprintf(
  269. "%s/admin/v1alpha1/account-with-workspace?namespace=%s",
  270. s.accountURL,
  271. group,
  272. ),
  273. nil,
  274. )
  275. if err != nil {
  276. return nil, err
  277. }
  278. req.Header.Set("Authorization", "Bearer "+jwtToken)
  279. resp, err := sealosHTTPClient.Do(req)
  280. if err != nil {
  281. return nil, err
  282. }
  283. defer resp.Body.Close()
  284. var sealosResp sealosGetGroupBalanceResp
  285. if err := sonic.ConfigDefault.NewDecoder(resp.Body).Decode(&sealosResp); err != nil {
  286. return nil, err
  287. }
  288. if sealosResp.Error != "" {
  289. return nil, errors.New(sealosResp.Error)
  290. }
  291. if resp.StatusCode != http.StatusOK {
  292. return nil, fmt.Errorf(
  293. "get group (%s) balance failed with status code %d",
  294. group,
  295. resp.StatusCode,
  296. )
  297. }
  298. if sealosResp.WorkspaceSubscription {
  299. return &sealosBalanceResoult{
  300. quota: sealosResp.TotalAIQuota,
  301. balance: sealosResp.RemainAIQuota,
  302. userUID: sealosResp.UserUID,
  303. }, nil
  304. }
  305. return &sealosBalanceResoult{
  306. quota: sealosResp.Balance,
  307. balance: sealosResp.Balance,
  308. userUID: sealosResp.UserUID,
  309. }, nil
  310. }
  311. // GetGroupQuota implements GroupBalance.
  312. func (s *Sealos) GetGroupQuota(ctx context.Context, group model.GroupCache) (*GroupQuota, error) {
  313. balance, err := s.fetchBalanceFromAPI(ctx, group.ID)
  314. if err != nil {
  315. return nil, err
  316. }
  317. return &GroupQuota{
  318. Total: decimal.NewFromInt(balance.quota).Div(decimalBalancePrecision).InexactFloat64(),
  319. Remain: decimal.NewFromInt(balance.balance).Div(decimalBalancePrecision).InexactFloat64(),
  320. }, nil
  321. }
  322. type SealosPostGroupConsumer struct {
  323. accountURL string
  324. group string
  325. uid string
  326. }
  327. func newSealosPostGroupConsumer(accountURL, group, uid string) *SealosPostGroupConsumer {
  328. return &SealosPostGroupConsumer{
  329. accountURL: accountURL,
  330. group: group,
  331. uid: uid,
  332. }
  333. }
  334. func (s *SealosPostGroupConsumer) PostGroupConsume(
  335. ctx context.Context,
  336. tokenName string,
  337. usage float64,
  338. ) (float64, error) {
  339. amount := s.calculateAmount(usage)
  340. if err := cacheDecreaseGroupBalance(ctx, s.group, amount.IntPart()); err != nil {
  341. log.Errorf("decrease group (%s) balance cache failed: %s", s.group, err)
  342. }
  343. if err := s.postConsume(ctx, amount.IntPart(), tokenName); err != nil {
  344. return 0, err
  345. }
  346. return amount.Div(decimalBalancePrecision).InexactFloat64(), nil
  347. }
  348. func (s *SealosPostGroupConsumer) calculateAmount(usage float64) decimal.Decimal {
  349. amount := decimal.NewFromFloat(usage).Mul(decimalBalancePrecision).Ceil()
  350. if amount.LessThan(minConsumeAmount) {
  351. amount = minConsumeAmount
  352. }
  353. return amount
  354. }
  355. func (s *SealosPostGroupConsumer) postConsume(
  356. ctx context.Context,
  357. amount int64,
  358. tokenName string,
  359. ) error {
  360. reqBody, err := sonic.Marshal(sealosPostGroupConsumeReq{
  361. Namespace: s.group,
  362. Amount: amount,
  363. AppType: appType,
  364. AppName: tokenName,
  365. UserUID: s.uid,
  366. })
  367. if err != nil {
  368. return err
  369. }
  370. req, err := http.NewRequestWithContext(ctx,
  371. http.MethodPost,
  372. s.accountURL+"/admin/v1alpha1/charge-billing",
  373. bytes.NewBuffer(reqBody))
  374. if err != nil {
  375. return err
  376. }
  377. req.Header.Set("Authorization", "Bearer "+jwtToken)
  378. resp, err := sealosHTTPClient.Do(req)
  379. if err != nil {
  380. return err
  381. }
  382. defer resp.Body.Close()
  383. var sealosResp sealosPostGroupConsumeResp
  384. if err := sonic.ConfigDefault.NewDecoder(resp.Body).Decode(&sealosResp); err != nil {
  385. return err
  386. }
  387. if resp.StatusCode != http.StatusOK || sealosResp.Error != "" {
  388. return fmt.Errorf("status code: %d, error: %s", resp.StatusCode, sealosResp.Error)
  389. }
  390. return nil
  391. }