channel-billing.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package controller
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "strconv"
  7. "sync"
  8. "time"
  9. "github.com/gin-gonic/gin"
  10. "github.com/labring/aiproxy/core/common/notify"
  11. "github.com/labring/aiproxy/core/middleware"
  12. "github.com/labring/aiproxy/core/model"
  13. "github.com/labring/aiproxy/core/relay/adaptor"
  14. "github.com/labring/aiproxy/core/relay/adaptors"
  15. )
  16. func updateChannelBalance(channel *model.Channel) (float64, error) {
  17. adaptorI, ok := adaptors.GetAdaptor(channel.Type)
  18. if !ok {
  19. return 0, fmt.Errorf(
  20. "invalid channel type: %d, channel: %s (id: %d)",
  21. channel.Type,
  22. channel.Name,
  23. channel.ID,
  24. )
  25. }
  26. if getBalance, ok := adaptorI.(adaptor.Balancer); ok {
  27. balance, err := getBalance.GetBalance(channel)
  28. if err != nil && !errors.Is(err, adaptor.ErrGetBalanceNotImplemented) {
  29. return 0, fmt.Errorf(
  30. "failed to get channel %s (type: %d, id: %d) balance: %s",
  31. channel.Name,
  32. channel.Type,
  33. channel.ID,
  34. err.Error(),
  35. )
  36. }
  37. if err := channel.UpdateBalance(balance); err != nil {
  38. return 0, fmt.Errorf(
  39. "failed to update channel %s (type: %d, id: %d) balance: %s",
  40. channel.Name,
  41. channel.Type,
  42. channel.ID,
  43. err.Error(),
  44. )
  45. }
  46. if !errors.Is(err, adaptor.ErrGetBalanceNotImplemented) &&
  47. balance < channel.GetBalanceThreshold() {
  48. return 0, fmt.Errorf(
  49. "channel %s (type: %d, id: %d) balance is less than threshold: %f",
  50. channel.Name,
  51. channel.Type,
  52. channel.ID,
  53. balance,
  54. )
  55. }
  56. return balance, nil
  57. }
  58. return 0, nil
  59. }
  60. // UpdateChannelBalance godoc
  61. //
  62. // @Summary Update channel balance
  63. // @Description Updates the balance for a single channel
  64. // @Tags channel
  65. // @Produce json
  66. // @Security ApiKeyAuth
  67. // @Param id path int true "Channel ID"
  68. // @Success 200 {object} middleware.APIResponse{data=float64}
  69. // @Router /api/channel/{id}/balance [get]
  70. func UpdateChannelBalance(c *gin.Context) {
  71. id, err := strconv.Atoi(c.Param("id"))
  72. if err != nil {
  73. c.JSON(http.StatusOK, middleware.APIResponse{
  74. Success: false,
  75. Message: err.Error(),
  76. })
  77. return
  78. }
  79. channel, err := model.GetChannelByID(id)
  80. if err != nil {
  81. c.JSON(http.StatusOK, middleware.APIResponse{
  82. Success: false,
  83. Message: err.Error(),
  84. })
  85. return
  86. }
  87. balance, err := updateChannelBalance(channel)
  88. if err != nil {
  89. notify.Error(
  90. fmt.Sprintf(
  91. "check channel %s (type: %d, id: %d) balance error",
  92. channel.Name,
  93. channel.Type,
  94. channel.ID,
  95. ),
  96. err.Error(),
  97. )
  98. c.JSON(http.StatusOK, middleware.APIResponse{
  99. Success: false,
  100. Message: err.Error(),
  101. })
  102. return
  103. }
  104. middleware.SuccessResponse(c, balance)
  105. }
  106. func updateAllChannelsBalance() error {
  107. channels, err := model.GetAllChannels()
  108. if err != nil {
  109. return err
  110. }
  111. var wg sync.WaitGroup
  112. semaphore := make(chan struct{}, 10)
  113. for _, channel := range channels {
  114. if !channel.EnabledAutoBalanceCheck {
  115. continue
  116. }
  117. wg.Add(1)
  118. semaphore <- struct{}{}
  119. go func(ch *model.Channel) {
  120. defer wg.Done()
  121. defer func() { <-semaphore }()
  122. _, err := updateChannelBalance(ch)
  123. if err != nil {
  124. notify.Error(
  125. fmt.Sprintf(
  126. "check channel %s (type: %d, id: %d) balance error",
  127. ch.Name,
  128. ch.Type,
  129. ch.ID,
  130. ),
  131. err.Error(),
  132. )
  133. }
  134. }(channel)
  135. }
  136. wg.Wait()
  137. return nil
  138. }
  139. // UpdateAllChannelsBalance godoc
  140. //
  141. // @Summary Update all channels balance
  142. // @Description Updates the balance for all channels
  143. // @Tags channel
  144. // @Produce json
  145. // @Security ApiKeyAuth
  146. // @Success 200 {object} middleware.APIResponse
  147. // @Router /api/channels/balance [get]
  148. func UpdateAllChannelsBalance(c *gin.Context) {
  149. err := updateAllChannelsBalance()
  150. if err != nil {
  151. middleware.ErrorResponse(c, http.StatusInternalServerError, err.Error())
  152. return
  153. }
  154. middleware.SuccessResponse(c, nil)
  155. }
  156. func UpdateChannelsBalance(frequency time.Duration) {
  157. for {
  158. time.Sleep(frequency)
  159. _ = updateAllChannelsBalance()
  160. }
  161. }