group.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package monitor
  2. import (
  3. "context"
  4. "net/http"
  5. "strconv"
  6. "github.com/gin-gonic/gin"
  7. "github.com/labring/aiproxy/core/common"
  8. "github.com/labring/aiproxy/core/common/reqlimit"
  9. "github.com/labring/aiproxy/core/model"
  10. "github.com/labring/aiproxy/core/relay/adaptor"
  11. "github.com/labring/aiproxy/core/relay/meta"
  12. "github.com/labring/aiproxy/core/relay/plugin"
  13. "github.com/labring/aiproxy/core/relay/plugin/noop"
  14. )
  // Context keys under which per-token-name rate values (scoped to a group,
// model and token name) are stored on the *gin.Context. They are written by
// UpdateGroupModelTokennameRequest / UpdateGroupModelTokennameTokensRequest
// and read back by the GetGroupModelToken* accessors.
const (
	// GroupModelTokenRPM keys the rpm value passed to UpdateGroupModelTokennameRequest.
	GroupModelTokenRPM = "group_model_token_rpm"
	// GroupModelTokenRPS keys the rps value passed to UpdateGroupModelTokennameRequest.
	GroupModelTokenRPS = "group_model_token_rps"
	// GroupModelTokenTPM keys the tpm value passed to UpdateGroupModelTokennameTokensRequest.
	GroupModelTokenTPM = "group_model_token_tpm"
	// GroupModelTokenTPS keys the tps value passed to UpdateGroupModelTokennameTokensRequest.
	GroupModelTokenTPS = "group_model_token_tps"
)
  21. var _ plugin.Plugin = (*GroupMonitor)(nil)
  22. type GroupMonitor struct {
  23. noop.Noop
  24. }
  25. func NewGroupMonitorPlugin() plugin.Plugin {
  26. return &GroupMonitor{}
  27. }
  28. func (m *GroupMonitor) DoResponse(
  29. meta *meta.Meta,
  30. store adaptor.Store,
  31. c *gin.Context,
  32. resp *http.Response,
  33. do adaptor.DoResponse,
  34. ) (model.Usage, adaptor.Error) {
  35. usage, relayErr := do.DoResponse(meta, store, c, resp)
  36. if usage.TotalTokens > 0 {
  37. count, overLimitCount, secondCount := reqlimit.PushGroupModelTokensRequest(
  38. context.Background(),
  39. meta.Group.ID,
  40. meta.OriginModel,
  41. meta.ModelConfig.TPM,
  42. int64(usage.TotalTokens),
  43. )
  44. UpdateGroupModelTokensRequest(c, meta.Group, count+overLimitCount, secondCount)
  45. count, overLimitCount, secondCount = reqlimit.PushGroupModelTokennameTokensRequest(
  46. context.Background(),
  47. meta.Group.ID,
  48. meta.OriginModel,
  49. meta.Token.Name,
  50. int64(usage.TotalTokens),
  51. )
  52. UpdateGroupModelTokennameTokensRequest(c, count+overLimitCount, secondCount)
  53. }
  54. return usage, relayErr
  55. }
  56. func UpdateGroupModelRequest(c *gin.Context, group model.GroupCache, rpm, rps int64) {
  57. if group.Status == model.GroupStatusInternal {
  58. return
  59. }
  60. log := common.GetLogger(c)
  61. log.Data["group_rpm"] = strconv.FormatInt(rpm, 10)
  62. log.Data["group_rps"] = strconv.FormatInt(rps, 10)
  63. }
  64. func UpdateGroupModelTokensRequest(c *gin.Context, group model.GroupCache, tpm, tps int64) {
  65. if group.Status == model.GroupStatusInternal {
  66. return
  67. }
  68. log := common.GetLogger(c)
  69. log.Data["group_tpm"] = strconv.FormatInt(tpm, 10)
  70. log.Data["group_tps"] = strconv.FormatInt(tps, 10)
  71. }
  72. func UpdateGroupModelTokennameRequest(c *gin.Context, rpm, rps int64) {
  73. c.Set(GroupModelTokenRPM, rpm)
  74. c.Set(GroupModelTokenRPS, rps)
  75. // log := common.GetLogger(c)
  76. // log.Data["rpm"] = strconv.FormatInt(rpm, 10)
  77. // log.Data["rps"] = strconv.FormatInt(rps, 10)
  78. }
  79. func UpdateGroupModelTokennameTokensRequest(c *gin.Context, tpm, tps int64) {
  80. c.Set(GroupModelTokenTPM, tpm)
  81. c.Set(GroupModelTokenTPS, tps)
  82. // log := common.GetLogger(c)
  83. // log.Data["tpm"] = strconv.FormatInt(tpm, 10)
  84. // log.Data["tps"] = strconv.FormatInt(tps, 10)
  85. }
  86. func GetGroupModelTokenRPM(c *gin.Context) int64 {
  87. return c.GetInt64(GroupModelTokenRPM)
  88. }
  89. func GetGroupModelTokenRPS(c *gin.Context) int64 {
  90. return c.GetInt64(GroupModelTokenRPS)
  91. }
  92. func GetGroupModelTokenTPM(c *gin.Context) int64 {
  93. return c.GetInt64(GroupModelTokenTPM)
  94. }
  95. func GetGroupModelTokenTPS(c *gin.Context) int64 {
  96. return c.GetInt64(GroupModelTokenTPS)
  97. }