batch.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. package model
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/labring/aiproxy/core/common/config"
  9. "github.com/labring/aiproxy/core/common/notify"
  10. "github.com/shopspring/decimal"
  11. )
  12. type batchUpdateData struct {
  13. Groups map[string]*GroupUpdate
  14. Tokens map[int]*TokenUpdate
  15. Channels map[int]*ChannelUpdate
  16. Summaries map[string]*SummaryUpdate
  17. GroupSummaries map[string]*GroupSummaryUpdate
  18. sync.Mutex
  19. }
  20. func (b *batchUpdateData) IsClean() bool {
  21. b.Lock()
  22. defer b.Unlock()
  23. return b.isCleanLocked()
  24. }
  25. func (b *batchUpdateData) isCleanLocked() bool {
  26. return len(b.Groups) == 0 &&
  27. len(b.Tokens) == 0 &&
  28. len(b.Channels) == 0 &&
  29. len(b.Summaries) == 0 &&
  30. len(b.GroupSummaries) == 0
  31. }
  32. type GroupUpdate struct {
  33. Amount decimal.Decimal
  34. Count int
  35. }
  36. type TokenUpdate struct {
  37. Amount decimal.Decimal
  38. Count int
  39. }
  40. type ChannelUpdate struct {
  41. Amount decimal.Decimal
  42. Count int
  43. }
  44. type SummaryUpdate struct {
  45. SummaryUnique
  46. SummaryData
  47. }
  48. func summaryUniqueKey(unique SummaryUnique) string {
  49. return fmt.Sprintf("%d:%s:%d", unique.ChannelID, unique.Model, unique.HourTimestamp)
  50. }
  51. type GroupSummaryUpdate struct {
  52. GroupSummaryUnique
  53. SummaryData
  54. }
  55. func groupSummaryUniqueKey(unique GroupSummaryUnique) string {
  56. return fmt.Sprintf(
  57. "%s:%s:%s:%d",
  58. unique.GroupID,
  59. unique.TokenName,
  60. unique.Model,
  61. unique.HourTimestamp,
  62. )
  63. }
  64. var batchData batchUpdateData
  65. func init() {
  66. batchData = batchUpdateData{
  67. Groups: make(map[string]*GroupUpdate),
  68. Tokens: make(map[int]*TokenUpdate),
  69. Channels: make(map[int]*ChannelUpdate),
  70. Summaries: make(map[string]*SummaryUpdate),
  71. GroupSummaries: make(map[string]*GroupSummaryUpdate),
  72. }
  73. }
  74. func StartBatchProcessorSummary(ctx context.Context, wg *sync.WaitGroup) {
  75. defer wg.Done()
  76. ticker := time.NewTicker(5 * time.Second)
  77. defer ticker.Stop()
  78. for {
  79. select {
  80. case <-ctx.Done():
  81. ProcessBatchUpdatesSummary()
  82. return
  83. case <-ticker.C:
  84. ProcessBatchUpdatesSummary()
  85. }
  86. }
  87. }
  88. func CleanBatchUpdatesSummary(ctx context.Context) {
  89. for {
  90. select {
  91. case <-ctx.Done():
  92. ProcessBatchUpdatesSummary()
  93. return
  94. default:
  95. if batchData.IsClean() {
  96. return
  97. }
  98. }
  99. ProcessBatchUpdatesSummary()
  100. time.Sleep(time.Second * 1)
  101. }
  102. }
  103. func ProcessBatchUpdatesSummary() {
  104. batchData.Lock()
  105. defer batchData.Unlock()
  106. var wg sync.WaitGroup
  107. wg.Add(1)
  108. go processGroupUpdates(&wg)
  109. wg.Add(1)
  110. go processTokenUpdates(&wg)
  111. wg.Add(1)
  112. go processChannelUpdates(&wg)
  113. wg.Add(1)
  114. go processGroupSummaryUpdates(&wg)
  115. wg.Add(1)
  116. go processSummaryUpdates(&wg)
  117. wg.Wait()
  118. }
  119. func processGroupUpdates(wg *sync.WaitGroup) {
  120. defer wg.Done()
  121. for groupID, data := range batchData.Groups {
  122. err := UpdateGroupUsedAmountAndRequestCount(
  123. groupID,
  124. data.Amount.InexactFloat64(),
  125. data.Count,
  126. )
  127. if IgnoreNotFound(err) != nil {
  128. notify.ErrorThrottle(
  129. "batchUpdateGroupUsedAmountAndRequestCount",
  130. time.Minute,
  131. "failed to batch update group",
  132. err.Error(),
  133. )
  134. } else {
  135. delete(batchData.Groups, groupID)
  136. }
  137. }
  138. }
  139. func processTokenUpdates(wg *sync.WaitGroup) {
  140. defer wg.Done()
  141. for tokenID, data := range batchData.Tokens {
  142. err := UpdateTokenUsedAmount(tokenID, data.Amount.InexactFloat64(), data.Count)
  143. if IgnoreNotFound(err) != nil {
  144. notify.ErrorThrottle(
  145. "batchUpdateTokenUsedAmount",
  146. time.Minute,
  147. "failed to batch update token",
  148. err.Error(),
  149. )
  150. } else {
  151. delete(batchData.Tokens, tokenID)
  152. }
  153. }
  154. }
  155. func processChannelUpdates(wg *sync.WaitGroup) {
  156. defer wg.Done()
  157. for channelID, data := range batchData.Channels {
  158. err := UpdateChannelUsedAmount(channelID, data.Amount.InexactFloat64(), data.Count)
  159. if IgnoreNotFound(err) != nil {
  160. notify.ErrorThrottle(
  161. "batchUpdateChannelUsedAmount",
  162. time.Minute,
  163. "failed to batch update channel",
  164. err.Error(),
  165. )
  166. } else {
  167. delete(batchData.Channels, channelID)
  168. }
  169. }
  170. }
  171. func processGroupSummaryUpdates(wg *sync.WaitGroup) {
  172. defer wg.Done()
  173. for key, data := range batchData.GroupSummaries {
  174. err := UpsertGroupSummary(data.GroupSummaryUnique, data.SummaryData)
  175. if err != nil {
  176. notify.ErrorThrottle(
  177. "batchUpdateGroupSummary",
  178. time.Minute,
  179. "failed to batch update group summary",
  180. err.Error(),
  181. )
  182. } else {
  183. delete(batchData.GroupSummaries, key)
  184. }
  185. }
  186. }
  187. func processSummaryUpdates(wg *sync.WaitGroup) {
  188. defer wg.Done()
  189. for key, data := range batchData.Summaries {
  190. err := UpsertSummary(data.SummaryUnique, data.SummaryData)
  191. if err != nil {
  192. notify.ErrorThrottle(
  193. "batchUpdateSummary",
  194. time.Minute,
  195. "failed to batch update summary",
  196. err.Error(),
  197. )
  198. } else {
  199. delete(batchData.Summaries, key)
  200. }
  201. }
  202. }
  203. type RequestRate struct {
  204. RPM int64
  205. RPS int64
  206. TPM int64
  207. TPS int64
  208. }
  209. func BatchRecordLogs(
  210. requestID string,
  211. requestAt time.Time,
  212. retryAt time.Time,
  213. firstByteAt time.Time,
  214. group string,
  215. code int,
  216. channelID int,
  217. modelName string,
  218. tokenID int,
  219. tokenName string,
  220. endpoint string,
  221. content string,
  222. mode int,
  223. ip string,
  224. retryTimes int,
  225. requestDetail *RequestDetail,
  226. downstreamResult bool,
  227. usage Usage,
  228. modelPrice Price,
  229. amount float64,
  230. user string,
  231. metadata map[string]string,
  232. channelModelRate RequestRate,
  233. groupModelTokenRate RequestRate,
  234. ) (err error) {
  235. now := time.Now()
  236. if downstreamResult {
  237. if config.GetLogStorageHours() >= 0 {
  238. err = RecordConsumeLog(
  239. requestID,
  240. now,
  241. requestAt,
  242. retryAt,
  243. firstByteAt,
  244. group,
  245. code,
  246. channelID,
  247. modelName,
  248. tokenID,
  249. tokenName,
  250. endpoint,
  251. content,
  252. mode,
  253. ip,
  254. retryTimes,
  255. requestDetail,
  256. usage,
  257. modelPrice,
  258. amount,
  259. user,
  260. metadata,
  261. )
  262. }
  263. } else {
  264. if config.GetRetryLogStorageHours() >= 0 {
  265. err = RecordRetryLog(
  266. requestID,
  267. now,
  268. requestAt,
  269. retryAt,
  270. firstByteAt,
  271. code,
  272. channelID,
  273. modelName,
  274. mode,
  275. retryTimes,
  276. requestDetail,
  277. )
  278. }
  279. }
  280. amountDecimal := decimal.NewFromFloat(amount)
  281. batchData.Lock()
  282. defer batchData.Unlock()
  283. updateChannelData(channelID, amount, amountDecimal)
  284. if !downstreamResult {
  285. return err
  286. }
  287. updateGroupData(group, amount, amountDecimal)
  288. updateTokenData(tokenID, amount, amountDecimal)
  289. if channelID != 0 {
  290. updateSummaryData(channelID, modelName, now, code, amountDecimal, usage, channelModelRate)
  291. }
  292. updateGroupSummaryData(
  293. group,
  294. tokenName,
  295. modelName,
  296. now,
  297. code,
  298. amountDecimal,
  299. usage,
  300. groupModelTokenRate,
  301. )
  302. return err
  303. }
  304. func updateChannelData(channelID int, amount float64, amountDecimal decimal.Decimal) {
  305. if channelID > 0 {
  306. if _, ok := batchData.Channels[channelID]; !ok {
  307. batchData.Channels[channelID] = &ChannelUpdate{}
  308. }
  309. if amount > 0 {
  310. batchData.Channels[channelID].Amount = amountDecimal.
  311. Add(batchData.Channels[channelID].Amount)
  312. }
  313. batchData.Channels[channelID].Count++
  314. }
  315. }
  316. func updateGroupData(group string, amount float64, amountDecimal decimal.Decimal) {
  317. if group != "" {
  318. if _, ok := batchData.Groups[group]; !ok {
  319. batchData.Groups[group] = &GroupUpdate{}
  320. }
  321. if amount > 0 {
  322. batchData.Groups[group].Amount = amountDecimal.
  323. Add(batchData.Groups[group].Amount)
  324. }
  325. batchData.Groups[group].Count++
  326. }
  327. }
  328. func updateTokenData(tokenID int, amount float64, amountDecimal decimal.Decimal) {
  329. if tokenID > 0 {
  330. if _, ok := batchData.Tokens[tokenID]; !ok {
  331. batchData.Tokens[tokenID] = &TokenUpdate{}
  332. }
  333. if amount > 0 {
  334. batchData.Tokens[tokenID].Amount = amountDecimal.
  335. Add(batchData.Tokens[tokenID].Amount)
  336. }
  337. batchData.Tokens[tokenID].Count++
  338. }
  339. }
  340. func updateGroupSummaryData(
  341. group, tokenName, modelName string,
  342. createAt time.Time,
  343. code int,
  344. amountDecimal decimal.Decimal,
  345. usage Usage,
  346. groupModelTokenRate RequestRate,
  347. ) {
  348. groupUnique := GroupSummaryUnique{
  349. GroupID: group,
  350. TokenName: tokenName,
  351. Model: modelName,
  352. HourTimestamp: createAt.Truncate(time.Hour).Unix(),
  353. }
  354. groupSummaryKey := groupSummaryUniqueKey(groupUnique)
  355. groupSummary, ok := batchData.GroupSummaries[groupSummaryKey]
  356. if !ok {
  357. groupSummary = &GroupSummaryUpdate{
  358. GroupSummaryUnique: groupUnique,
  359. }
  360. batchData.GroupSummaries[groupSummaryKey] = groupSummary
  361. }
  362. groupSummary.RequestCount++
  363. groupSummary.UsedAmount = amountDecimal.
  364. Add(decimal.NewFromFloat(groupSummary.UsedAmount)).
  365. InexactFloat64()
  366. if groupModelTokenRate.RPM > groupSummary.MaxRPM {
  367. groupSummary.MaxRPM = groupModelTokenRate.RPM
  368. }
  369. if groupModelTokenRate.RPS > groupSummary.MaxRPS {
  370. groupSummary.MaxRPS = groupModelTokenRate.RPS
  371. }
  372. if groupModelTokenRate.TPM > groupSummary.MaxTPM {
  373. groupSummary.MaxTPM = groupModelTokenRate.TPM
  374. }
  375. if groupModelTokenRate.TPS > groupSummary.MaxTPS {
  376. groupSummary.MaxTPS = groupModelTokenRate.TPS
  377. }
  378. groupSummary.Usage.Add(usage)
  379. if code != http.StatusOK {
  380. groupSummary.ExceptionCount++
  381. }
  382. }
  383. func updateSummaryData(
  384. channelID int,
  385. modelName string,
  386. createAt time.Time,
  387. code int,
  388. amountDecimal decimal.Decimal,
  389. usage Usage,
  390. channelModelRate RequestRate,
  391. ) {
  392. summaryUnique := SummaryUnique{
  393. ChannelID: channelID,
  394. Model: modelName,
  395. HourTimestamp: createAt.Truncate(time.Hour).Unix(),
  396. }
  397. summaryKey := summaryUniqueKey(summaryUnique)
  398. summary, ok := batchData.Summaries[summaryKey]
  399. if !ok {
  400. summary = &SummaryUpdate{
  401. SummaryUnique: summaryUnique,
  402. }
  403. batchData.Summaries[summaryKey] = summary
  404. }
  405. summary.RequestCount++
  406. summary.UsedAmount = amountDecimal.
  407. Add(decimal.NewFromFloat(summary.UsedAmount)).
  408. InexactFloat64()
  409. if channelModelRate.RPM > summary.MaxRPM {
  410. summary.MaxRPM = channelModelRate.RPM
  411. }
  412. if channelModelRate.RPS > summary.MaxRPS {
  413. summary.MaxRPS = channelModelRate.RPS
  414. }
  415. if channelModelRate.TPM > summary.MaxTPM {
  416. summary.MaxTPM = channelModelRate.TPM
  417. }
  418. if channelModelRate.TPS > summary.MaxTPS {
  419. summary.MaxTPS = channelModelRate.TPS
  420. }
  421. summary.Usage.Add(usage)
  422. if code != http.StatusOK {
  423. summary.ExceptionCount++
  424. }
  425. }