monitor.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. package monitor
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "strconv"
  7. "time"
  8. "github.com/gin-gonic/gin"
  9. "github.com/labring/aiproxy/core/common"
  10. "github.com/labring/aiproxy/core/common/conv"
  11. "github.com/labring/aiproxy/core/common/notify"
  12. "github.com/labring/aiproxy/core/common/reqlimit"
  13. "github.com/labring/aiproxy/core/model"
  14. "github.com/labring/aiproxy/core/monitor"
  15. "github.com/labring/aiproxy/core/relay/adaptor"
  16. "github.com/labring/aiproxy/core/relay/meta"
  17. "github.com/labring/aiproxy/core/relay/plugin"
  18. "github.com/labring/aiproxy/core/relay/plugin/noop"
  19. )
  20. var _ plugin.Plugin = (*ChannelMonitor)(nil)
  21. type ChannelMonitor struct {
  22. noop.Noop
  23. }
  24. func NewChannelMonitorPlugin() plugin.Plugin {
  25. return &ChannelMonitor{}
  26. }
  27. var channelNoRetryStatusCodesMap = map[int]struct{}{
  28. http.StatusBadRequest: {},
  29. http.StatusRequestEntityTooLarge: {},
  30. http.StatusUnprocessableEntity: {},
  31. http.StatusUnavailableForLegalReasons: {},
  32. }
  33. func ShouldRetry(relayErr adaptor.Error) bool {
  34. _, ok := channelNoRetryStatusCodesMap[relayErr.StatusCode()]
  35. return !ok
  36. }
  37. var channelNoPermissionStatusCodesMap = map[int]struct{}{
  38. http.StatusUnauthorized: {},
  39. http.StatusPaymentRequired: {},
  40. http.StatusForbidden: {},
  41. http.StatusNotFound: {},
  42. }
  43. func ChannelHasPermission(relayErr adaptor.Error) bool {
  44. _, ok := channelNoPermissionStatusCodesMap[relayErr.StatusCode()]
  45. return !ok
  46. }
  47. func getRequestDuration(meta *meta.Meta) time.Duration {
  48. requestAt, ok := meta.Get("requestAt")
  49. if !ok {
  50. return 0
  51. }
  52. requestAtTime, ok := requestAt.(time.Time)
  53. if !ok {
  54. return 0
  55. }
  56. return common.TruncateDuration(time.Since(requestAtTime))
  57. }
  58. func (m *ChannelMonitor) DoRequest(
  59. meta *meta.Meta,
  60. store adaptor.Store,
  61. c *gin.Context,
  62. req *http.Request,
  63. do adaptor.DoRequest,
  64. ) (*http.Response, error) {
  65. count, overLimitCount, secondCount := reqlimit.PushChannelModelRequest(
  66. context.Background(),
  67. strconv.Itoa(meta.Channel.ID),
  68. meta.OriginModel,
  69. )
  70. updateChannelModelRequestRate(c, meta, count+overLimitCount, secondCount)
  71. requestAt := time.Now()
  72. meta.Set("requestAt", requestAt)
  73. resp, err := do.DoRequest(meta, store, c, req)
  74. requestCost := common.TruncateDuration(time.Since(requestAt))
  75. log := common.GetLogger(c)
  76. log.Data["req_cost"] = requestCost.String()
  77. if err == nil {
  78. return resp, nil
  79. }
  80. beyondThreshold, banExecution, _err := monitor.AddRequest(
  81. context.Background(),
  82. meta.OriginModel,
  83. int64(meta.Channel.ID),
  84. true,
  85. false,
  86. meta.ModelConfig.WarnErrorRate,
  87. meta.ModelConfig.MaxErrorRate,
  88. )
  89. if _err != nil {
  90. log.Errorf("add request failed: %+v", _err)
  91. }
  92. switch {
  93. case banExecution:
  94. notifyChannelRequestIssue(
  95. meta,
  96. "autoBanned",
  97. "Auto Banned",
  98. err,
  99. requestCost,
  100. )
  101. case beyondThreshold:
  102. notifyChannelRequestIssue(
  103. meta,
  104. "beyondThreshold",
  105. "Error Rate Beyond Threshold",
  106. err,
  107. requestCost,
  108. )
  109. }
  110. return resp, err
  111. }
  112. func notifyChannelRequestIssue(
  113. meta *meta.Meta,
  114. issueType, titleSuffix string,
  115. err error,
  116. requestCost time.Duration,
  117. ) {
  118. var notifyFunc func(title, message string)
  119. lockKey := fmt.Sprintf(
  120. "%s:%d:%s:%s",
  121. issueType,
  122. meta.Channel.ID,
  123. meta.OriginModel,
  124. issueType,
  125. )
  126. switch issueType {
  127. case "beyondThreshold":
  128. notifyFunc = func(title, message string) {
  129. notify.WarnThrottle(lockKey, time.Minute, title, message)
  130. }
  131. default:
  132. notifyFunc = func(title, message string) {
  133. notify.ErrorThrottle(lockKey, time.Minute, title, message)
  134. }
  135. }
  136. message := fmt.Sprintf(
  137. "channel: %s (type: %d, type name: %s, id: %d)\nmodel: %s\nmode: %s\nerror: %s\nrequest id: %s\ntime cost: %s",
  138. meta.Channel.Name,
  139. meta.Channel.Type,
  140. meta.Channel.Type.String(),
  141. meta.Channel.ID,
  142. meta.OriginModel,
  143. meta.Mode,
  144. err.Error(),
  145. meta.RequestID,
  146. requestCost.String(),
  147. )
  148. notifyFunc(
  149. fmt.Sprintf("%s `%s` %s", meta.Channel.Name, meta.OriginModel, titleSuffix),
  150. message,
  151. )
  152. }
  153. func (m *ChannelMonitor) DoResponse(
  154. meta *meta.Meta,
  155. store adaptor.Store,
  156. c *gin.Context,
  157. resp *http.Response,
  158. do adaptor.DoResponse,
  159. ) (model.Usage, adaptor.Error) {
  160. log := common.GetLogger(c)
  161. usage, relayErr := do.DoResponse(meta, store, c, resp)
  162. if usage.TotalTokens > 0 {
  163. count, overLimitCount, secondCount := reqlimit.PushChannelModelTokensRequest(
  164. context.Background(),
  165. strconv.Itoa(meta.Channel.ID),
  166. meta.OriginModel,
  167. int64(usage.TotalTokens),
  168. )
  169. updateChannelModelTokensRequestRate(c, meta, count+overLimitCount, secondCount)
  170. }
  171. if relayErr == nil {
  172. if _, _, err := monitor.AddRequest(
  173. context.Background(),
  174. meta.OriginModel,
  175. int64(meta.Channel.ID),
  176. false,
  177. false,
  178. meta.ModelConfig.WarnErrorRate,
  179. meta.ModelConfig.MaxErrorRate,
  180. ); err != nil {
  181. log.Errorf("add request failed: %+v", err)
  182. }
  183. return usage, nil
  184. }
  185. if !ShouldRetry(relayErr) {
  186. return usage, relayErr
  187. }
  188. hasPermission := ChannelHasPermission(relayErr)
  189. beyondThreshold, banExecution, err := monitor.AddRequest(
  190. context.Background(),
  191. meta.OriginModel,
  192. int64(meta.Channel.ID),
  193. true,
  194. !hasPermission,
  195. meta.ModelConfig.WarnErrorRate,
  196. meta.ModelConfig.MaxErrorRate,
  197. )
  198. if err != nil {
  199. log.Errorf("add request failed: %+v", err)
  200. }
  201. switch {
  202. case banExecution:
  203. notifyChannelResponseIssue(c, meta, "autoBanned", "Auto Banned", relayErr)
  204. case beyondThreshold:
  205. notifyChannelResponseIssue(
  206. c,
  207. meta,
  208. "beyondThreshold",
  209. "Error Rate Beyond Threshold",
  210. relayErr,
  211. )
  212. case !hasPermission:
  213. notifyChannelResponseIssue(c, meta, "channelHasPermission", "No Permission", relayErr)
  214. }
  215. return usage, relayErr
  216. }
  217. func notifyChannelResponseIssue(
  218. c *gin.Context,
  219. meta *meta.Meta,
  220. issueType, titleSuffix string,
  221. err adaptor.Error,
  222. ) {
  223. var notifyFunc func(title, message string)
  224. lockKey := fmt.Sprintf(
  225. "%s:%d:%s:%s:%d",
  226. issueType,
  227. meta.Channel.ID,
  228. meta.OriginModel,
  229. issueType,
  230. err.StatusCode(),
  231. )
  232. switch issueType {
  233. case "beyondThreshold", "requestRateLimitExceeded":
  234. notifyFunc = func(title, message string) {
  235. notify.WarnThrottle(lockKey, time.Minute, title, message)
  236. }
  237. default:
  238. notifyFunc = func(title, message string) {
  239. notify.ErrorThrottle(lockKey, time.Minute, title, message)
  240. }
  241. }
  242. respBody, _ := err.MarshalJSON()
  243. message := fmt.Sprintf(
  244. "channel: %s (type: %d, type name: %s, id: %d)\nmodel: %s\nmode: %s\nstatus code: %d\ndetail: %s\nrequest id: %s\ntime cost: %s",
  245. meta.Channel.Name,
  246. meta.Channel.Type,
  247. meta.Channel.Type.String(),
  248. meta.Channel.ID,
  249. meta.OriginModel,
  250. meta.Mode,
  251. err.StatusCode(),
  252. conv.BytesToString(respBody),
  253. meta.RequestID,
  254. getRequestDuration(meta).String(),
  255. )
  256. if err.StatusCode() == http.StatusTooManyRequests {
  257. rate := GetChannelModelRequestRate(c, meta)
  258. message += fmt.Sprintf(
  259. "\nrpm: %d\nrps: %d\ntpm: %d\ntps: %d",
  260. rate.RPM,
  261. rate.RPS,
  262. rate.TPM,
  263. rate.TPS,
  264. )
  265. }
  266. notifyFunc(
  267. fmt.Sprintf("%s `%s` %s", meta.Channel.Name, meta.OriginModel, titleSuffix),
  268. message,
  269. )
  270. }
  // Meta keys under which the per-channel, per-model request counts are cached.
const (
	MetaChannelModelKeyRPM = "channel_model_rpm"
	MetaChannelModelKeyRPS = "channel_model_rps"
)

// Meta keys under which the per-channel, per-model token counts are cached.
const (
	MetaChannelModelKeyTPM = "channel_model_tpm"
	MetaChannelModelKeyTPS = "channel_model_tps"
)

// RequestRate is a snapshot of a channel/model's traffic counters.
// The values come from reqlimit; their exact window semantics are defined there.
type RequestRate struct {
	RPM int64 // requests per minute
	RPS int64 // requests per second
	TPM int64 // tokens per minute
	TPS int64 // tokens per second
}
  283. func GetChannelModelRequestRate(c *gin.Context, meta *meta.Meta) RequestRate {
  284. rate := RequestRate{}
  285. if rpm, ok := meta.Get(MetaChannelModelKeyRPM); ok {
  286. rate.RPM, _ = rpm.(int64)
  287. rate.RPS = meta.GetInt64(MetaChannelModelKeyRPS)
  288. } else {
  289. rpm, rps := reqlimit.GetChannelModelRequest(context.Background(), strconv.Itoa(meta.Channel.ID), meta.OriginModel)
  290. rate.RPM = rpm
  291. rate.RPS = rps
  292. updateChannelModelRequestRate(c, meta, rpm, rps)
  293. }
  294. if tpm, ok := meta.Get(MetaChannelModelKeyTPM); ok {
  295. rate.TPM, _ = tpm.(int64)
  296. rate.TPS = meta.GetInt64(MetaChannelModelKeyTPS)
  297. } else {
  298. tpm, tps := reqlimit.GetChannelModelTokensRequest(context.Background(), strconv.Itoa(meta.Channel.ID), meta.OriginModel)
  299. rate.TPM = tpm
  300. rate.TPS = tps
  301. updateChannelModelTokensRequestRate(c, meta, tpm, tps)
  302. }
  303. return rate
  304. }
  305. func updateChannelModelRequestRate(c *gin.Context, meta *meta.Meta, rpm, rps int64) {
  306. meta.Set(MetaChannelModelKeyRPM, rpm)
  307. meta.Set(MetaChannelModelKeyRPS, rps)
  308. log := common.GetLogger(c)
  309. log.Data["ch_rpm"] = rpm
  310. log.Data["ch_rps"] = rps
  311. }
  312. func updateChannelModelTokensRequestRate(c *gin.Context, meta *meta.Meta, tpm, tps int64) {
  313. meta.Set(MetaChannelModelKeyTPM, tpm)
  314. meta.Set(MetaChannelModelKeyTPS, tps)
  315. log := common.GetLogger(c)
  316. log.Data["ch_tpm"] = tpm
  317. log.Data["ch_tps"] = tps
  318. }