batch.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689
  1. package model
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/labring/aiproxy/core/common/config"
  7. "github.com/labring/aiproxy/core/common/notify"
  8. "github.com/shopspring/decimal"
  9. )
  10. type batchUpdateData struct {
  11. Groups map[string]*GroupUpdate
  12. Tokens map[int]*TokenUpdate
  13. Channels map[int]*ChannelUpdate
  14. Summaries map[SummaryUnique]*SummaryUpdate
  15. GroupSummaries map[GroupSummaryUnique]*GroupSummaryUpdate
  16. SummariesMinute map[SummaryMinuteUnique]*SummaryMinuteUpdate
  17. GroupSummariesMinute map[GroupSummaryMinuteUnique]*GroupSummaryMinuteUpdate
  18. sync.Mutex
  19. }
  20. func (b *batchUpdateData) IsClean() bool {
  21. b.Lock()
  22. defer b.Unlock()
  23. return b.isCleanLocked()
  24. }
  25. func (b *batchUpdateData) isCleanLocked() bool {
  26. return len(b.Groups) == 0 &&
  27. len(b.Tokens) == 0 &&
  28. len(b.Channels) == 0 &&
  29. len(b.Summaries) == 0 &&
  30. len(b.GroupSummaries) == 0 &&
  31. len(b.SummariesMinute) == 0 &&
  32. len(b.GroupSummariesMinute) == 0
  33. }
  34. type GroupUpdate struct {
  35. Amount decimal.Decimal
  36. Count int
  37. }
  38. type TokenUpdate struct {
  39. Amount decimal.Decimal
  40. Count int
  41. }
  42. type ChannelUpdate struct {
  43. Amount decimal.Decimal
  44. Count int
  45. RetryCount int
  46. }
  47. type SummaryUpdate struct {
  48. SummaryUnique
  49. SummaryData
  50. }
  51. type SummaryMinuteUpdate struct {
  52. SummaryMinuteUnique
  53. SummaryData
  54. }
  55. type GroupSummaryUpdate struct {
  56. GroupSummaryUnique
  57. SummaryData
  58. }
  59. type GroupSummaryMinuteUpdate struct {
  60. GroupSummaryMinuteUnique
  61. SummaryData
  62. }
  63. var batchData batchUpdateData
  64. func init() {
  65. batchData = batchUpdateData{
  66. Groups: make(map[string]*GroupUpdate),
  67. Tokens: make(map[int]*TokenUpdate),
  68. Channels: make(map[int]*ChannelUpdate),
  69. Summaries: make(map[SummaryUnique]*SummaryUpdate),
  70. GroupSummaries: make(map[GroupSummaryUnique]*GroupSummaryUpdate),
  71. SummariesMinute: make(map[SummaryMinuteUnique]*SummaryMinuteUpdate),
  72. GroupSummariesMinute: make(map[GroupSummaryMinuteUnique]*GroupSummaryMinuteUpdate),
  73. }
  74. }
  75. func StartBatchProcessorSummary(ctx context.Context, wg *sync.WaitGroup) {
  76. defer wg.Done()
  77. ticker := time.NewTicker(5 * time.Second)
  78. defer ticker.Stop()
  79. for {
  80. select {
  81. case <-ctx.Done():
  82. ProcessBatchUpdatesSummary()
  83. return
  84. case <-ticker.C:
  85. ProcessBatchUpdatesSummary()
  86. }
  87. }
  88. }
  89. func CleanBatchUpdatesSummary(ctx context.Context) {
  90. for {
  91. select {
  92. case <-ctx.Done():
  93. ProcessBatchUpdatesSummary()
  94. return
  95. default:
  96. if batchData.IsClean() {
  97. return
  98. }
  99. }
  100. ProcessBatchUpdatesSummary()
  101. time.Sleep(time.Second * 1)
  102. }
  103. }
  104. func ProcessBatchUpdatesSummary() {
  105. batchData.Lock()
  106. defer batchData.Unlock()
  107. var wg sync.WaitGroup
  108. wg.Add(1)
  109. go processGroupUpdates(&wg)
  110. wg.Add(1)
  111. go processTokenUpdates(&wg)
  112. wg.Add(1)
  113. go processChannelUpdates(&wg)
  114. wg.Add(1)
  115. go processGroupSummaryUpdates(&wg)
  116. wg.Add(1)
  117. go processSummaryUpdates(&wg)
  118. wg.Add(1)
  119. go processSummaryMinuteUpdates(&wg)
  120. wg.Add(1)
  121. go processGroupSummaryMinuteUpdates(&wg)
  122. wg.Wait()
  123. }
  124. func processGroupUpdates(wg *sync.WaitGroup) {
  125. defer wg.Done()
  126. for groupID, data := range batchData.Groups {
  127. err := UpdateGroupUsedAmountAndRequestCount(
  128. groupID,
  129. data.Amount.InexactFloat64(),
  130. data.Count,
  131. )
  132. if IgnoreNotFound(err) != nil {
  133. notify.ErrorThrottle(
  134. "batchUpdateGroupUsedAmountAndRequestCount",
  135. time.Minute,
  136. "failed to batch update group",
  137. err.Error(),
  138. )
  139. } else {
  140. delete(batchData.Groups, groupID)
  141. }
  142. }
  143. }
  144. func processTokenUpdates(wg *sync.WaitGroup) {
  145. defer wg.Done()
  146. for tokenID, data := range batchData.Tokens {
  147. err := UpdateTokenUsedAmount(tokenID, data.Amount.InexactFloat64(), data.Count)
  148. if IgnoreNotFound(err) != nil {
  149. notify.ErrorThrottle(
  150. "batchUpdateTokenUsedAmount",
  151. time.Minute,
  152. "failed to batch update token",
  153. err.Error(),
  154. )
  155. } else {
  156. delete(batchData.Tokens, tokenID)
  157. }
  158. }
  159. }
  160. func processChannelUpdates(wg *sync.WaitGroup) {
  161. defer wg.Done()
  162. for channelID, data := range batchData.Channels {
  163. err := UpdateChannelUsedAmount(
  164. channelID,
  165. data.Amount.InexactFloat64(),
  166. data.Count,
  167. data.RetryCount,
  168. )
  169. if IgnoreNotFound(err) != nil {
  170. notify.ErrorThrottle(
  171. "batchUpdateChannelUsedAmount",
  172. time.Minute,
  173. "failed to batch update channel",
  174. err.Error(),
  175. )
  176. } else {
  177. delete(batchData.Channels, channelID)
  178. }
  179. }
  180. }
  181. func processGroupSummaryUpdates(wg *sync.WaitGroup) {
  182. defer wg.Done()
  183. for key, data := range batchData.GroupSummaries {
  184. err := UpsertGroupSummary(data.GroupSummaryUnique, data.SummaryData)
  185. if err != nil {
  186. notify.ErrorThrottle(
  187. "batchUpdateGroupSummary",
  188. time.Minute,
  189. "failed to batch update group summary",
  190. err.Error(),
  191. )
  192. } else {
  193. delete(batchData.GroupSummaries, key)
  194. }
  195. }
  196. }
  197. func processGroupSummaryMinuteUpdates(wg *sync.WaitGroup) {
  198. defer wg.Done()
  199. for key, data := range batchData.GroupSummariesMinute {
  200. err := UpsertGroupSummaryMinute(data.GroupSummaryMinuteUnique, data.SummaryData)
  201. if err != nil {
  202. notify.ErrorThrottle(
  203. "batchUpdateGroupSummary",
  204. time.Minute,
  205. "failed to batch update group summary",
  206. err.Error(),
  207. )
  208. } else {
  209. delete(batchData.GroupSummariesMinute, key)
  210. }
  211. }
  212. }
  213. func processSummaryUpdates(wg *sync.WaitGroup) {
  214. defer wg.Done()
  215. for key, data := range batchData.Summaries {
  216. err := UpsertSummary(data.SummaryUnique, data.SummaryData)
  217. if err != nil {
  218. notify.ErrorThrottle(
  219. "batchUpdateSummary",
  220. time.Minute,
  221. "failed to batch update summary",
  222. err.Error(),
  223. )
  224. } else {
  225. delete(batchData.Summaries, key)
  226. }
  227. }
  228. }
  229. func processSummaryMinuteUpdates(wg *sync.WaitGroup) {
  230. defer wg.Done()
  231. for key, data := range batchData.SummariesMinute {
  232. err := UpsertSummaryMinute(data.SummaryMinuteUnique, data.SummaryData)
  233. if err != nil {
  234. notify.ErrorThrottle(
  235. "batchUpdateSummaryMinute",
  236. time.Minute,
  237. "failed to batch update summary minute",
  238. err.Error(),
  239. )
  240. } else {
  241. delete(batchData.SummariesMinute, key)
  242. }
  243. }
  244. }
  245. func BatchRecordLogs(
  246. now time.Time,
  247. requestID string,
  248. requestAt time.Time,
  249. retryAt time.Time,
  250. firstByteAt time.Time,
  251. group string,
  252. code int,
  253. channelID int,
  254. modelName string,
  255. tokenID int,
  256. tokenName string,
  257. endpoint string,
  258. content string,
  259. mode int,
  260. ip string,
  261. retryTimes int,
  262. requestDetail *RequestDetail,
  263. downstreamResult bool,
  264. usage Usage,
  265. modelPrice Price,
  266. amount float64,
  267. user string,
  268. metadata map[string]string,
  269. ) (err error) {
  270. if now.IsZero() {
  271. now = time.Now()
  272. }
  273. if downstreamResult {
  274. if config.GetLogStorageHours() >= 0 {
  275. err = RecordConsumeLog(
  276. requestID,
  277. now,
  278. requestAt,
  279. retryAt,
  280. firstByteAt,
  281. group,
  282. code,
  283. channelID,
  284. modelName,
  285. tokenID,
  286. tokenName,
  287. endpoint,
  288. content,
  289. mode,
  290. ip,
  291. retryTimes,
  292. requestDetail,
  293. usage,
  294. modelPrice,
  295. amount,
  296. user,
  297. metadata,
  298. )
  299. }
  300. } else {
  301. if config.GetRetryLogStorageHours() >= 0 {
  302. err = RecordRetryLog(
  303. requestID,
  304. now,
  305. requestAt,
  306. retryAt,
  307. firstByteAt,
  308. code,
  309. channelID,
  310. modelName,
  311. mode,
  312. retryTimes,
  313. requestDetail,
  314. )
  315. }
  316. }
  317. amountDecimal := decimal.NewFromFloat(amount)
  318. batchData.Lock()
  319. defer batchData.Unlock()
  320. updateChannelData(channelID, amount, amountDecimal, !downstreamResult)
  321. if channelID != 0 {
  322. updateSummaryData(
  323. channelID,
  324. modelName,
  325. now,
  326. requestAt,
  327. firstByteAt,
  328. code,
  329. amountDecimal,
  330. usage,
  331. !downstreamResult,
  332. )
  333. updateSummaryDataMinute(
  334. channelID,
  335. modelName,
  336. now,
  337. requestAt,
  338. firstByteAt,
  339. code,
  340. amountDecimal,
  341. usage,
  342. !downstreamResult,
  343. )
  344. }
  345. // group related data only records downstream result
  346. if !downstreamResult {
  347. return err
  348. }
  349. updateGroupData(group, amount, amountDecimal)
  350. updateTokenData(tokenID, amount, amountDecimal)
  351. if group != "" {
  352. updateGroupSummaryData(
  353. group,
  354. tokenName,
  355. modelName,
  356. now,
  357. requestAt,
  358. firstByteAt,
  359. code,
  360. amountDecimal,
  361. usage,
  362. )
  363. updateGroupSummaryDataMinute(
  364. group,
  365. tokenName,
  366. modelName,
  367. now,
  368. requestAt,
  369. firstByteAt,
  370. code,
  371. amountDecimal,
  372. usage,
  373. )
  374. }
  375. return err
  376. }
  377. func updateChannelData(
  378. channelID int,
  379. amount float64,
  380. amountDecimal decimal.Decimal,
  381. isRetry bool,
  382. ) {
  383. if channelID <= 0 {
  384. return
  385. }
  386. if _, ok := batchData.Channels[channelID]; !ok {
  387. batchData.Channels[channelID] = &ChannelUpdate{}
  388. }
  389. if amount > 0 {
  390. batchData.Channels[channelID].Amount = amountDecimal.
  391. Add(batchData.Channels[channelID].Amount)
  392. }
  393. batchData.Channels[channelID].Count++
  394. if isRetry {
  395. batchData.Channels[channelID].RetryCount++
  396. }
  397. }
  398. func updateGroupData(group string, amount float64, amountDecimal decimal.Decimal) {
  399. if group == "" {
  400. return
  401. }
  402. if _, ok := batchData.Groups[group]; !ok {
  403. batchData.Groups[group] = &GroupUpdate{}
  404. }
  405. if amount > 0 {
  406. batchData.Groups[group].Amount = amountDecimal.
  407. Add(batchData.Groups[group].Amount)
  408. }
  409. batchData.Groups[group].Count++
  410. }
  411. func updateTokenData(tokenID int, amount float64, amountDecimal decimal.Decimal) {
  412. if tokenID <= 0 {
  413. return
  414. }
  415. if _, ok := batchData.Tokens[tokenID]; !ok {
  416. batchData.Tokens[tokenID] = &TokenUpdate{}
  417. }
  418. if amount > 0 {
  419. batchData.Tokens[tokenID].Amount = amountDecimal.
  420. Add(batchData.Tokens[tokenID].Amount)
  421. }
  422. batchData.Tokens[tokenID].Count++
  423. }
  424. func updateGroupSummaryData(
  425. group, tokenName, modelName string,
  426. createAt time.Time,
  427. requestAt time.Time,
  428. firstByteAt time.Time,
  429. code int,
  430. amountDecimal decimal.Decimal,
  431. usage Usage,
  432. ) {
  433. if createAt.IsZero() {
  434. createAt = time.Now()
  435. }
  436. if requestAt.IsZero() {
  437. requestAt = createAt
  438. }
  439. if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
  440. firstByteAt = requestAt
  441. }
  442. groupUnique := GroupSummaryUnique{
  443. GroupID: group,
  444. TokenName: tokenName,
  445. Model: modelName,
  446. HourTimestamp: createAt.Truncate(time.Hour).Unix(),
  447. }
  448. groupSummary, ok := batchData.GroupSummaries[groupUnique]
  449. if !ok {
  450. groupSummary = &GroupSummaryUpdate{
  451. GroupSummaryUnique: groupUnique,
  452. }
  453. batchData.GroupSummaries[groupUnique] = groupSummary
  454. }
  455. groupSummary.UsedAmount = amountDecimal.
  456. Add(decimal.NewFromFloat(groupSummary.UsedAmount)).
  457. InexactFloat64()
  458. groupSummary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
  459. groupSummary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
  460. groupSummary.Usage.Add(usage)
  461. groupSummary.AddRequest(code, false)
  462. }
  463. func updateGroupSummaryDataMinute(
  464. group, tokenName, modelName string,
  465. createAt time.Time,
  466. requestAt time.Time,
  467. firstByteAt time.Time,
  468. code int,
  469. amountDecimal decimal.Decimal,
  470. usage Usage,
  471. ) {
  472. if createAt.IsZero() {
  473. createAt = time.Now()
  474. }
  475. if requestAt.IsZero() {
  476. requestAt = createAt
  477. }
  478. if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
  479. firstByteAt = requestAt
  480. }
  481. groupUnique := GroupSummaryMinuteUnique{
  482. GroupID: group,
  483. TokenName: tokenName,
  484. Model: modelName,
  485. MinuteTimestamp: createAt.Truncate(time.Minute).Unix(),
  486. }
  487. groupSummary, ok := batchData.GroupSummariesMinute[groupUnique]
  488. if !ok {
  489. groupSummary = &GroupSummaryMinuteUpdate{
  490. GroupSummaryMinuteUnique: groupUnique,
  491. }
  492. batchData.GroupSummariesMinute[groupUnique] = groupSummary
  493. }
  494. groupSummary.UsedAmount = amountDecimal.
  495. Add(decimal.NewFromFloat(groupSummary.UsedAmount)).
  496. InexactFloat64()
  497. groupSummary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
  498. groupSummary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
  499. groupSummary.Usage.Add(usage)
  500. groupSummary.AddRequest(code, false)
  501. }
  502. func updateSummaryData(
  503. channelID int,
  504. modelName string,
  505. createAt time.Time,
  506. requestAt time.Time,
  507. firstByteAt time.Time,
  508. code int,
  509. amountDecimal decimal.Decimal,
  510. usage Usage,
  511. isRetry bool,
  512. ) {
  513. if createAt.IsZero() {
  514. createAt = time.Now()
  515. }
  516. if requestAt.IsZero() {
  517. requestAt = createAt
  518. }
  519. if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
  520. firstByteAt = requestAt
  521. }
  522. summaryUnique := SummaryUnique{
  523. ChannelID: channelID,
  524. Model: modelName,
  525. HourTimestamp: createAt.Truncate(time.Hour).Unix(),
  526. }
  527. summary, ok := batchData.Summaries[summaryUnique]
  528. if !ok {
  529. summary = &SummaryUpdate{
  530. SummaryUnique: summaryUnique,
  531. }
  532. batchData.Summaries[summaryUnique] = summary
  533. }
  534. summary.UsedAmount = amountDecimal.
  535. Add(decimal.NewFromFloat(summary.UsedAmount)).
  536. InexactFloat64()
  537. summary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
  538. summary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
  539. summary.Usage.Add(usage)
  540. summary.AddRequest(code, isRetry)
  541. }
  542. func updateSummaryDataMinute(
  543. channelID int,
  544. modelName string,
  545. createAt time.Time,
  546. requestAt time.Time,
  547. firstByteAt time.Time,
  548. code int,
  549. amountDecimal decimal.Decimal,
  550. usage Usage,
  551. isRetry bool,
  552. ) {
  553. if createAt.IsZero() {
  554. createAt = time.Now()
  555. }
  556. if requestAt.IsZero() {
  557. requestAt = createAt
  558. }
  559. if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
  560. firstByteAt = requestAt
  561. }
  562. summaryUnique := SummaryMinuteUnique{
  563. ChannelID: channelID,
  564. Model: modelName,
  565. MinuteTimestamp: createAt.Truncate(time.Minute).Unix(),
  566. }
  567. summary, ok := batchData.SummariesMinute[summaryUnique]
  568. if !ok {
  569. summary = &SummaryMinuteUpdate{
  570. SummaryMinuteUnique: summaryUnique,
  571. }
  572. batchData.SummariesMinute[summaryUnique] = summary
  573. }
  574. summary.UsedAmount = amountDecimal.
  575. Add(decimal.NewFromFloat(summary.UsedAmount)).
  576. InexactFloat64()
  577. summary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
  578. summary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
  579. summary.Usage.Add(usage)
  580. summary.AddRequest(code, isRetry)
  581. }