| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476 |
- package balance
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"math/rand/v2"
	"net/http"
	"net/url"
	"time"

	"github.com/bytedance/sonic"
	"github.com/golang-jwt/jwt/v5"
	"github.com/labring/aiproxy/core/common"
	"github.com/labring/aiproxy/core/common/conv"
	"github.com/labring/aiproxy/core/common/env"
	"github.com/labring/aiproxy/core/model"
	"github.com/redis/go-redis/v9"
	"github.com/shopspring/decimal"
	log "github.com/sirupsen/logrus"
)
// Constants for the Sealos account-service integration.
const (
	// defaultAccountURL is the account-service endpoint NewSealos falls back
	// to when it is given an empty URL.
	defaultAccountURL = "http://account-service.account-system.svc.cluster.local:2333"

	// sealosRequester is the value of the "requester" claim placed in the JWT
	// sent to the account service.
	sealosRequester = "sealos-admin"

	// appType is the AppType reported with every charge-billing request.
	appType = "LLM-TOKEN"

	// balancePrecision is the number of integer account units that make up 1.0
	// of the float64 amounts this package exposes.
	balancePrecision = 1000000

	// Redis key templates (expanded through common.RedisKeyf) for the cached
	// group balance and the cached per-user real-name flag.
	sealosGroupBalanceKey = "sealos:balance:%s"
	sealosUserRealNameKey = "sealos:realName:%s"

	// getBalanceRetry is the maximum number of attempts GetGroupRemainBalance
	// makes before giving up and returning the last error.
	getBalanceRetry = 3
)
- var (
- _ GroupBalance = (*Sealos)(nil)
- sealosHTTPClient = &http.Client{}
- decimalBalancePrecision = decimal.NewFromInt(balancePrecision)
- minConsumeAmount = decimal.NewFromInt(1)
- jwtToken string
- sealosRedisCacheEnable = env.Bool("BALANCE_SEALOS_REDIS_CACHE_ENABLE", true)
- sealosCacheExpire = 3 * time.Minute
- )
- var (
- sealosCheckRealNameEnable = env.Bool("BALANCE_SEALOS_CHECK_REAL_NAME_ENABLE", false)
- sealosNoRealNameUsedAmountLimit = env.Float64(
- "BALANCE_SEALOS_NO_REAL_NAME_USED_AMOUNT_LIMIT",
- 1,
- )
- )
// Sealos is a GroupBalance backend that reads balances from, and charges
// usage to, the Sealos account service.
type Sealos struct {
	// accountURL is the base URL of the account service, without a trailing
	// path; request paths are appended to it verbatim.
	accountURL string
}
- // FIXME: 如果获取余额能成功,但是消费永远失败,需要加一个失败次数限制,如果失败次数超过一定阈值,暂停服务
- func InitSealos(jwtKey, accountURL string) error {
- token, err := newSealosToken(jwtKey)
- if err != nil {
- return fmt.Errorf("failed to generate sealos jwt token: %w", err)
- }
- jwtToken = token
- Default = NewSealos(accountURL)
- return nil
- }
- func NewSealos(accountURL string) *Sealos {
- if accountURL == "" {
- accountURL = defaultAccountURL
- }
- return &Sealos{accountURL: accountURL}
- }
- type sealosClaims struct {
- Requester string `json:"requester"`
- jwt.RegisteredClaims
- }
- func newSealosToken(key string) (string, error) {
- claims := &sealosClaims{
- Requester: sealosRequester,
- RegisteredClaims: jwt.RegisteredClaims{
- NotBefore: jwt.NewNumericDate(time.Now()),
- },
- }
- return jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString(conv.StringToBytes(key))
- }
// sealosGetGroupBalanceResp is the JSON body returned by the account
// service's account-with-workspace endpoint.
//
// Error, when non-empty, marks the request as failed. Balance, TotalAIQuota
// and RemainAIQuota are integer account units. WorkspaceSubscription selects
// which of them the caller uses: the AI quota pair when true, Balance
// otherwise.
type sealosGetGroupBalanceResp struct {
	UserUID               string `json:"userUID"`
	Error                 string `json:"error"`
	Balance               int64  `json:"balance"`
	WorkspaceSubscription bool   `json:"workspaceSubscription"`
	TotalAIQuota          int64  `json:"totalAIQuota"`
	RemainAIQuota         int64  `json:"remainAIQuota"`
}
// sealosPostGroupConsumeReq is the JSON body posted to the account service's
// charge-billing endpoint. Amount is expressed in integer account units.
type sealosPostGroupConsumeReq struct {
	Namespace string `json:"namespace"`
	AppType   string `json:"appType"`
	AppName   string `json:"appName"`
	UserUID   string `json:"userUID"`
	Amount    int64  `json:"amount"`
}
// sealosPostGroupConsumeResp is the JSON body returned by the charge-billing
// endpoint. A non-empty Error marks the charge as failed.
type sealosPostGroupConsumeResp struct {
	Error string `json:"error"`
}
// sealosCache is the Redis hash layout of a cached group balance: field "u"
// holds the owning user UID and field "b" the balance in integer account
// units.
type sealosCache struct {
	UserUID string `redis:"u"`
	Balance int64  `redis:"b"`
}
- func cacheSetGroupBalance(ctx context.Context, group string, balance int64, userUID string) error {
- if !common.RedisEnabled || !sealosRedisCacheEnable {
- return nil
- }
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
- defer cancel()
- pipe := common.RDB.Pipeline()
- pipe.HSet(ctx, common.RedisKeyf(sealosGroupBalanceKey, group), sealosCache{
- Balance: balance,
- UserUID: userUID,
- })
- expireTime := sealosCacheExpire + time.Duration(rand.Int64N(10)-5)*time.Second
- pipe.Expire(ctx, common.RedisKeyf(sealosGroupBalanceKey, group), expireTime)
- _, err := pipe.Exec(ctx)
- return err
- }
- func cacheGetGroupBalance(ctx context.Context, group string) (*sealosCache, error) {
- if !common.RedisEnabled || !sealosRedisCacheEnable {
- return nil, redis.Nil
- }
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
- defer cancel()
- var cache sealosCache
- if err := common.RDB.HGetAll(ctx, common.RedisKeyf(sealosGroupBalanceKey, group)).Scan(&cache); err != nil {
- return nil, err
- }
- return &cache, nil
- }
- var decreaseGroupBalanceScript = redis.NewScript(`
- local balance = redis.call("HGet", KEYS[1], "b")
- if balance then
- redis.call("HSet", KEYS[1], "b", balance - ARGV[1])
- end
- return redis.status_reply("ok")
- `)
- func cacheDecreaseGroupBalance(ctx context.Context, group string, amount int64) error {
- if !common.RedisEnabled || !sealosRedisCacheEnable {
- return nil
- }
- return decreaseGroupBalanceScript.Run(ctx, common.RDB, []string{common.RedisKeyf(sealosGroupBalanceKey, group)}, amount).
- Err()
- }
- var ErrNoRealNameUsedAmountLimit = errors.New("达到未实名用户使用额度限制,请实名认证")
- func (s *Sealos) GetGroupRemainBalance(
- ctx context.Context,
- group model.GroupCache,
- ) (float64, PostGroupConsumer, error) {
- for i := 0; ; i++ {
- balance, userUID, err := s.getGroupRemainBalance(ctx, group.ID)
- if err == nil {
- if sealosCheckRealNameEnable &&
- group.UsedAmount > sealosNoRealNameUsedAmountLimit &&
- !s.checkRealName(ctx, userUID) {
- return 0, nil, ErrNoRealNameUsedAmountLimit
- }
- return decimal.NewFromInt(balance).Div(decimalBalancePrecision).InexactFloat64(),
- newSealosPostGroupConsumer(s.accountURL, group.ID, userUID), nil
- }
- if i == getBalanceRetry-1 {
- return 0, nil, err
- }
- time.Sleep(time.Second)
- }
- }
- func cacheGetUserRealName(ctx context.Context, userUID string) (bool, error) {
- if !common.RedisEnabled || !sealosRedisCacheEnable {
- return true, redis.Nil
- }
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
- defer cancel()
- realName, err := common.RDB.Get(ctx, common.RedisKeyf(sealosUserRealNameKey, userUID)).Bool()
- if err != nil {
- return false, err
- }
- return realName, nil
- }
- func cacheSetUserRealName(ctx context.Context, userUID string, realName bool) error {
- if !common.RedisEnabled || !sealosRedisCacheEnable {
- return nil
- }
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
- defer cancel()
- var expireTime time.Duration
- if realName {
- expireTime = time.Hour * 12
- } else {
- expireTime = time.Minute * 1
- }
- return common.RDB.Set(ctx, common.RedisKeyf(sealosUserRealNameKey, userUID), realName, expireTime).
- Err()
- }
- func (s *Sealos) checkRealName(ctx context.Context, userUID string) bool {
- if cache, err := cacheGetUserRealName(ctx, userUID); err == nil {
- return cache
- } else if !errors.Is(err, redis.Nil) {
- log.Errorf("get user (%s) real name cache failed: %s", userUID, err)
- }
- realName, err := s.fetchRealNameFromAPI(ctx, userUID)
- if err != nil {
- log.Errorf("fetch user (%s) real name failed: %s", userUID, err)
- return true
- }
- if err := cacheSetUserRealName(ctx, userUID, realName); err != nil {
- log.Errorf("set user (%s) real name cache failed: %s", userUID, err)
- }
- return realName
- }
// sealosGetRealNameInfoResp is the JSON body returned by the account
// service's real-name-info endpoint. A non-empty Error marks the request as
// failed.
type sealosGetRealNameInfoResp struct {
	IsRealName bool   `json:"isRealName"`
	Error      string `json:"error"`
}
- func (s *Sealos) fetchRealNameFromAPI(ctx context.Context, userUID string) (bool, error) {
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
- defer cancel()
- req, err := http.NewRequestWithContext(ctx, http.MethodGet,
- fmt.Sprintf("%s/admin/v1alpha1/real-name-info?userUID=%s", s.accountURL, userUID), nil)
- if err != nil {
- return false, err
- }
- req.Header.Set("Authorization", "Bearer "+jwtToken)
- resp, err := sealosHTTPClient.Do(req)
- if err != nil {
- return false, err
- }
- defer resp.Body.Close()
- var sealosResp sealosGetRealNameInfoResp
- if err := sonic.ConfigDefault.NewDecoder(resp.Body).Decode(&sealosResp); err != nil {
- return false, err
- }
- if resp.StatusCode != http.StatusOK || sealosResp.Error != "" {
- return false, fmt.Errorf(
- "get user (%s) real name failed with status code %d, error: %s",
- userUID,
- resp.StatusCode,
- sealosResp.Error,
- )
- }
- return sealosResp.IsRealName, nil
- }
- // GroupBalance interface implementation
- func (s *Sealos) getGroupRemainBalance(ctx context.Context, group string) (int64, string, error) {
- if cache, err := cacheGetGroupBalance(ctx, group); err == nil && cache.UserUID != "" {
- return cache.Balance, cache.UserUID, nil
- } else if err != nil && !errors.Is(err, redis.Nil) {
- log.Errorf("get group (%s) balance cache failed: %s", group, err)
- }
- balance, err := s.fetchBalanceFromAPI(ctx, group)
- if err != nil {
- return 0, "", err
- }
- if err := cacheSetGroupBalance(ctx, group, balance.balance, balance.userUID); err != nil {
- log.Errorf("set group (%s) balance cache failed: %s", group, err)
- }
- return balance.balance, balance.userUID, nil
- }
// sealosBalanceResoult is the normalized outcome of an account-service
// balance lookup. Both amounts are integer account units.
type sealosBalanceResoult struct {
	// quota is the total amount: the total AI quota for workspace
	// subscriptions, otherwise the account balance.
	quota int64
	// balance is the remaining amount: the remaining AI quota for workspace
	// subscriptions, otherwise the account balance.
	balance int64
	// userUID identifies the user the account belongs to.
	userUID string
}
- func (s *Sealos) fetchBalanceFromAPI(
- ctx context.Context,
- group string,
- ) (*sealosBalanceResoult, error) {
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
- defer cancel()
- req, err := http.NewRequestWithContext(
- ctx,
- http.MethodGet,
- fmt.Sprintf(
- "%s/admin/v1alpha1/account-with-workspace?namespace=%s",
- s.accountURL,
- group,
- ),
- nil,
- )
- if err != nil {
- return nil, err
- }
- req.Header.Set("Authorization", "Bearer "+jwtToken)
- resp, err := sealosHTTPClient.Do(req)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- var sealosResp sealosGetGroupBalanceResp
- if err := sonic.ConfigDefault.NewDecoder(resp.Body).Decode(&sealosResp); err != nil {
- return nil, err
- }
- if sealosResp.Error != "" {
- return nil, errors.New(sealosResp.Error)
- }
- if resp.StatusCode != http.StatusOK {
- return nil, fmt.Errorf(
- "get group (%s) balance failed with status code %d",
- group,
- resp.StatusCode,
- )
- }
- if sealosResp.WorkspaceSubscription {
- return &sealosBalanceResoult{
- quota: sealosResp.TotalAIQuota,
- balance: sealosResp.RemainAIQuota,
- userUID: sealosResp.UserUID,
- }, nil
- }
- return &sealosBalanceResoult{
- quota: sealosResp.Balance,
- balance: sealosResp.Balance,
- userUID: sealosResp.UserUID,
- }, nil
- }
- // GetGroupQuota implements GroupBalance.
- func (s *Sealos) GetGroupQuota(ctx context.Context, group model.GroupCache) (*GroupQuota, error) {
- balance, err := s.fetchBalanceFromAPI(ctx, group.ID)
- if err != nil {
- return nil, err
- }
- return &GroupQuota{
- Total: decimal.NewFromInt(balance.quota).Div(decimalBalancePrecision).InexactFloat64(),
- Remain: decimal.NewFromInt(balance.balance).Div(decimalBalancePrecision).InexactFloat64(),
- }, nil
- }
// SealosPostGroupConsumer charges usage for one group to the Sealos account
// service.
type SealosPostGroupConsumer struct {
	// accountURL is the base URL of the account service.
	accountURL string
	// group is the namespace the charge is billed to.
	group string
	// uid is the UID of the user owning the account.
	uid string
}
- func newSealosPostGroupConsumer(accountURL, group, uid string) *SealosPostGroupConsumer {
- return &SealosPostGroupConsumer{
- accountURL: accountURL,
- group: group,
- uid: uid,
- }
- }
- func (s *SealosPostGroupConsumer) PostGroupConsume(
- ctx context.Context,
- tokenName string,
- usage float64,
- ) (float64, error) {
- amount := s.calculateAmount(usage)
- if err := cacheDecreaseGroupBalance(ctx, s.group, amount.IntPart()); err != nil {
- log.Errorf("decrease group (%s) balance cache failed: %s", s.group, err)
- }
- if err := s.postConsume(ctx, amount.IntPart(), tokenName); err != nil {
- return 0, err
- }
- return amount.Div(decimalBalancePrecision).InexactFloat64(), nil
- }
- func (s *SealosPostGroupConsumer) calculateAmount(usage float64) decimal.Decimal {
- amount := decimal.NewFromFloat(usage).Mul(decimalBalancePrecision).Ceil()
- if amount.LessThan(minConsumeAmount) {
- amount = minConsumeAmount
- }
- return amount
- }
- func (s *SealosPostGroupConsumer) postConsume(
- ctx context.Context,
- amount int64,
- tokenName string,
- ) error {
- reqBody, err := sonic.Marshal(sealosPostGroupConsumeReq{
- Namespace: s.group,
- Amount: amount,
- AppType: appType,
- AppName: tokenName,
- UserUID: s.uid,
- })
- if err != nil {
- return err
- }
- req, err := http.NewRequestWithContext(ctx,
- http.MethodPost,
- s.accountURL+"/admin/v1alpha1/charge-billing",
- bytes.NewBuffer(reqBody))
- if err != nil {
- return err
- }
- req.Header.Set("Authorization", "Bearer "+jwtToken)
- resp, err := sealosHTTPClient.Do(req)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
- var sealosResp sealosPostGroupConsumeResp
- if err := sonic.ConfigDefault.NewDecoder(resp.Body).Decode(&sealosResp); err != nil {
- return err
- }
- if resp.StatusCode != http.StatusOK || sealosResp.Error != "" {
- return fmt.Errorf("status code: %d, error: %s", resp.StatusCode, sealosResp.Error)
- }
- return nil
- }
|