2
0

batch.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732
  1. package model
  2. import (
  3. "context"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/labring/aiproxy/core/common/config"
  8. "github.com/labring/aiproxy/core/common/notify"
  9. "github.com/shopspring/decimal"
  10. )
  11. type batchUpdateData struct {
  12. Groups map[string]*GroupUpdate
  13. Tokens map[int]*TokenUpdate
  14. Channels map[int]*ChannelUpdate
  15. Summaries map[SummaryUnique]*SummaryUpdate
  16. GroupSummaries map[GroupSummaryUnique]*GroupSummaryUpdate
  17. SummariesMinute map[SummaryMinuteUnique]*SummaryMinuteUpdate
  18. GroupSummariesMinute map[GroupSummaryMinuteUnique]*GroupSummaryMinuteUpdate
  19. sync.Mutex
  20. }
  21. func (b *batchUpdateData) IsClean() bool {
  22. b.Lock()
  23. defer b.Unlock()
  24. return b.isCleanLocked()
  25. }
  26. func (b *batchUpdateData) isCleanLocked() bool {
  27. return len(b.Groups) == 0 &&
  28. len(b.Tokens) == 0 &&
  29. len(b.Channels) == 0 &&
  30. len(b.Summaries) == 0 &&
  31. len(b.GroupSummaries) == 0 &&
  32. len(b.SummariesMinute) == 0 &&
  33. len(b.GroupSummariesMinute) == 0
  34. }
  35. type GroupUpdate struct {
  36. Amount decimal.Decimal
  37. Count int
  38. }
  39. type TokenUpdate struct {
  40. Amount decimal.Decimal
  41. Count int
  42. }
  43. type ChannelUpdate struct {
  44. Amount decimal.Decimal
  45. Count int
  46. RetryCount int
  47. }
  48. type SummaryUpdate struct {
  49. SummaryUnique
  50. SummaryData
  51. }
  52. type SummaryMinuteUpdate struct {
  53. SummaryMinuteUnique
  54. SummaryData
  55. }
  56. type GroupSummaryUpdate struct {
  57. GroupSummaryUnique
  58. SummaryData
  59. }
  60. type GroupSummaryMinuteUpdate struct {
  61. GroupSummaryMinuteUnique
  62. SummaryData
  63. }
  64. var batchData batchUpdateData
  65. func init() {
  66. batchData = batchUpdateData{
  67. Groups: make(map[string]*GroupUpdate),
  68. Tokens: make(map[int]*TokenUpdate),
  69. Channels: make(map[int]*ChannelUpdate),
  70. Summaries: make(map[SummaryUnique]*SummaryUpdate),
  71. GroupSummaries: make(map[GroupSummaryUnique]*GroupSummaryUpdate),
  72. SummariesMinute: make(map[SummaryMinuteUnique]*SummaryMinuteUpdate),
  73. GroupSummariesMinute: make(map[GroupSummaryMinuteUnique]*GroupSummaryMinuteUpdate),
  74. }
  75. }
  76. func StartBatchProcessorSummary(ctx context.Context, wg *sync.WaitGroup) {
  77. defer wg.Done()
  78. ticker := time.NewTicker(5 * time.Second)
  79. defer ticker.Stop()
  80. for {
  81. select {
  82. case <-ctx.Done():
  83. ProcessBatchUpdatesSummary()
  84. return
  85. case <-ticker.C:
  86. ProcessBatchUpdatesSummary()
  87. }
  88. }
  89. }
  90. func CleanBatchUpdatesSummary(ctx context.Context) {
  91. for {
  92. select {
  93. case <-ctx.Done():
  94. ProcessBatchUpdatesSummary()
  95. return
  96. default:
  97. if batchData.IsClean() {
  98. return
  99. }
  100. }
  101. ProcessBatchUpdatesSummary()
  102. time.Sleep(time.Second * 1)
  103. }
  104. }
  105. func ProcessBatchUpdatesSummary() {
  106. batchData.Lock()
  107. defer batchData.Unlock()
  108. var wg sync.WaitGroup
  109. wg.Add(1)
  110. go processGroupUpdates(&wg)
  111. wg.Add(1)
  112. go processTokenUpdates(&wg)
  113. wg.Add(1)
  114. go processChannelUpdates(&wg)
  115. wg.Add(1)
  116. go processGroupSummaryUpdates(&wg)
  117. wg.Add(1)
  118. go processSummaryUpdates(&wg)
  119. wg.Add(1)
  120. go processSummaryMinuteUpdates(&wg)
  121. wg.Add(1)
  122. go processGroupSummaryMinuteUpdates(&wg)
  123. wg.Wait()
  124. }
  125. func processGroupUpdates(wg *sync.WaitGroup) {
  126. defer wg.Done()
  127. for groupID, data := range batchData.Groups {
  128. err := UpdateGroupUsedAmountAndRequestCount(
  129. groupID,
  130. data.Amount.InexactFloat64(),
  131. data.Count,
  132. )
  133. if IgnoreNotFound(err) != nil {
  134. notify.ErrorThrottle(
  135. "batchUpdateGroupUsedAmountAndRequestCount",
  136. time.Minute*10,
  137. "failed to batch update group",
  138. err.Error(),
  139. )
  140. } else {
  141. delete(batchData.Groups, groupID)
  142. }
  143. }
  144. }
  145. func processTokenUpdates(wg *sync.WaitGroup) {
  146. defer wg.Done()
  147. for tokenID, data := range batchData.Tokens {
  148. err := UpdateTokenUsedAmount(tokenID, data.Amount.InexactFloat64(), data.Count)
  149. if IgnoreNotFound(err) != nil {
  150. notify.ErrorThrottle(
  151. "batchUpdateTokenUsedAmount",
  152. time.Minute*10,
  153. "failed to batch update token",
  154. err.Error(),
  155. )
  156. } else {
  157. delete(batchData.Tokens, tokenID)
  158. }
  159. }
  160. }
  161. func processChannelUpdates(wg *sync.WaitGroup) {
  162. defer wg.Done()
  163. for channelID, data := range batchData.Channels {
  164. err := UpdateChannelUsedAmount(
  165. channelID,
  166. data.Amount.InexactFloat64(),
  167. data.Count,
  168. data.RetryCount,
  169. )
  170. if IgnoreNotFound(err) != nil {
  171. notify.ErrorThrottle(
  172. "batchUpdateChannelUsedAmount",
  173. time.Minute*10,
  174. "failed to batch update channel",
  175. err.Error(),
  176. )
  177. } else {
  178. delete(batchData.Channels, channelID)
  179. }
  180. }
  181. }
  182. func processGroupSummaryUpdates(wg *sync.WaitGroup) {
  183. defer wg.Done()
  184. for key, data := range batchData.GroupSummaries {
  185. err := UpsertGroupSummary(data.GroupSummaryUnique, data.SummaryData)
  186. if err != nil {
  187. notify.ErrorThrottle(
  188. "batchUpdateGroupSummary",
  189. time.Minute*10,
  190. "failed to batch update group summary",
  191. err.Error(),
  192. )
  193. } else {
  194. delete(batchData.GroupSummaries, key)
  195. }
  196. }
  197. }
  198. func processGroupSummaryMinuteUpdates(wg *sync.WaitGroup) {
  199. defer wg.Done()
  200. for key, data := range batchData.GroupSummariesMinute {
  201. err := UpsertGroupSummaryMinute(data.GroupSummaryMinuteUnique, data.SummaryData)
  202. if err != nil {
  203. notify.ErrorThrottle(
  204. "batchUpdateGroupSummary",
  205. time.Minute*10,
  206. "failed to batch update group summary",
  207. err.Error(),
  208. )
  209. } else {
  210. delete(batchData.GroupSummariesMinute, key)
  211. }
  212. }
  213. }
  214. func processSummaryUpdates(wg *sync.WaitGroup) {
  215. defer wg.Done()
  216. for key, data := range batchData.Summaries {
  217. err := UpsertSummary(data.SummaryUnique, data.SummaryData)
  218. if err != nil {
  219. notify.ErrorThrottle(
  220. "batchUpdateSummary",
  221. time.Minute*10,
  222. "failed to batch update summary",
  223. err.Error(),
  224. )
  225. } else {
  226. delete(batchData.Summaries, key)
  227. }
  228. }
  229. }
  230. func processSummaryMinuteUpdates(wg *sync.WaitGroup) {
  231. defer wg.Done()
  232. for key, data := range batchData.SummariesMinute {
  233. err := UpsertSummaryMinute(data.SummaryMinuteUnique, data.SummaryData)
  234. if err != nil {
  235. notify.ErrorThrottle(
  236. "batchUpdateSummaryMinute",
  237. time.Minute*10,
  238. "failed to batch update summary minute",
  239. err.Error(),
  240. )
  241. } else {
  242. delete(batchData.SummariesMinute, key)
  243. }
  244. }
  245. }
  246. func BatchRecordLogs(
  247. now time.Time,
  248. requestID string,
  249. requestAt time.Time,
  250. retryAt time.Time,
  251. firstByteAt time.Time,
  252. group string,
  253. code int,
  254. channelID int,
  255. modelName string,
  256. tokenID int,
  257. tokenName string,
  258. endpoint string,
  259. content string,
  260. mode int,
  261. ip string,
  262. retryTimes int,
  263. requestDetail *RequestDetail,
  264. downstreamResult bool,
  265. usage Usage,
  266. modelPrice Price,
  267. amount float64,
  268. user string,
  269. metadata map[string]string,
  270. ) (err error) {
  271. if now.IsZero() {
  272. now = time.Now()
  273. }
  274. if code == http.StatusTooManyRequests ||
  275. config.GetLogDetailStorageHours() < 0 ||
  276. config.GetLogStorageHours() < 0 {
  277. requestDetail = nil
  278. }
  279. if downstreamResult {
  280. if config.GetLogStorageHours() >= 0 {
  281. err = RecordConsumeLog(
  282. requestID,
  283. now,
  284. requestAt,
  285. retryAt,
  286. firstByteAt,
  287. group,
  288. code,
  289. channelID,
  290. modelName,
  291. tokenID,
  292. tokenName,
  293. endpoint,
  294. content,
  295. mode,
  296. ip,
  297. retryTimes,
  298. requestDetail,
  299. usage,
  300. modelPrice,
  301. amount,
  302. user,
  303. metadata,
  304. )
  305. }
  306. } else {
  307. if code != http.StatusTooManyRequests &&
  308. config.GetLogStorageHours() >= 0 &&
  309. config.GetRetryLogStorageHours() > 0 {
  310. err = RecordRetryLog(
  311. requestID,
  312. now,
  313. requestAt,
  314. retryAt,
  315. firstByteAt,
  316. code,
  317. channelID,
  318. modelName,
  319. mode,
  320. retryTimes,
  321. requestDetail,
  322. )
  323. }
  324. }
  325. BatchUpdateSummary(
  326. now,
  327. requestAt,
  328. firstByteAt,
  329. group,
  330. code,
  331. channelID,
  332. modelName,
  333. tokenID,
  334. tokenName,
  335. downstreamResult,
  336. usage,
  337. amount,
  338. )
  339. return err
  340. }
  341. func BatchUpdateSummary(
  342. now time.Time,
  343. requestAt time.Time,
  344. firstByteAt time.Time,
  345. group string,
  346. code int,
  347. channelID int,
  348. modelName string,
  349. tokenID int,
  350. tokenName string,
  351. downstreamResult bool,
  352. usage Usage,
  353. amount float64,
  354. ) {
  355. if now.IsZero() {
  356. now = time.Now()
  357. }
  358. amountDecimal := decimal.NewFromFloat(amount)
  359. batchData.Lock()
  360. defer batchData.Unlock()
  361. updateChannelData(channelID, amount, amountDecimal, !downstreamResult)
  362. if channelID != 0 {
  363. updateSummaryData(
  364. channelID,
  365. modelName,
  366. now,
  367. requestAt,
  368. firstByteAt,
  369. code,
  370. amountDecimal,
  371. usage,
  372. !downstreamResult,
  373. )
  374. updateSummaryDataMinute(
  375. channelID,
  376. modelName,
  377. now,
  378. requestAt,
  379. firstByteAt,
  380. code,
  381. amountDecimal,
  382. usage,
  383. !downstreamResult,
  384. )
  385. }
  386. // group related data only records downstream result
  387. if !downstreamResult {
  388. return
  389. }
  390. updateGroupData(group, amount, amountDecimal)
  391. updateTokenData(tokenID, amount, amountDecimal)
  392. if group != "" {
  393. updateGroupSummaryData(
  394. group,
  395. tokenName,
  396. modelName,
  397. now,
  398. requestAt,
  399. firstByteAt,
  400. code,
  401. amountDecimal,
  402. usage,
  403. )
  404. updateGroupSummaryDataMinute(
  405. group,
  406. tokenName,
  407. modelName,
  408. now,
  409. requestAt,
  410. firstByteAt,
  411. code,
  412. amountDecimal,
  413. usage,
  414. )
  415. }
  416. }
  417. func updateChannelData(
  418. channelID int,
  419. amount float64,
  420. amountDecimal decimal.Decimal,
  421. isRetry bool,
  422. ) {
  423. if channelID <= 0 {
  424. return
  425. }
  426. if _, ok := batchData.Channels[channelID]; !ok {
  427. batchData.Channels[channelID] = &ChannelUpdate{}
  428. }
  429. if amount > 0 {
  430. batchData.Channels[channelID].Amount = amountDecimal.
  431. Add(batchData.Channels[channelID].Amount)
  432. }
  433. batchData.Channels[channelID].Count++
  434. if isRetry {
  435. batchData.Channels[channelID].RetryCount++
  436. }
  437. }
  438. func updateGroupData(group string, amount float64, amountDecimal decimal.Decimal) {
  439. if group == "" {
  440. return
  441. }
  442. if _, ok := batchData.Groups[group]; !ok {
  443. batchData.Groups[group] = &GroupUpdate{}
  444. }
  445. if amount > 0 {
  446. batchData.Groups[group].Amount = amountDecimal.
  447. Add(batchData.Groups[group].Amount)
  448. }
  449. batchData.Groups[group].Count++
  450. }
  451. func updateTokenData(tokenID int, amount float64, amountDecimal decimal.Decimal) {
  452. if tokenID <= 0 {
  453. return
  454. }
  455. if _, ok := batchData.Tokens[tokenID]; !ok {
  456. batchData.Tokens[tokenID] = &TokenUpdate{}
  457. }
  458. if amount > 0 {
  459. batchData.Tokens[tokenID].Amount = amountDecimal.
  460. Add(batchData.Tokens[tokenID].Amount)
  461. }
  462. batchData.Tokens[tokenID].Count++
  463. }
  464. func updateGroupSummaryData(
  465. group, tokenName, modelName string,
  466. createAt time.Time,
  467. requestAt time.Time,
  468. firstByteAt time.Time,
  469. code int,
  470. amountDecimal decimal.Decimal,
  471. usage Usage,
  472. ) {
  473. if createAt.IsZero() {
  474. createAt = time.Now()
  475. }
  476. if requestAt.IsZero() {
  477. requestAt = createAt
  478. }
  479. if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
  480. firstByteAt = requestAt
  481. }
  482. groupUnique := GroupSummaryUnique{
  483. GroupID: group,
  484. TokenName: tokenName,
  485. Model: modelName,
  486. HourTimestamp: createAt.Truncate(time.Hour).Unix(),
  487. }
  488. groupSummary, ok := batchData.GroupSummaries[groupUnique]
  489. if !ok {
  490. groupSummary = &GroupSummaryUpdate{
  491. GroupSummaryUnique: groupUnique,
  492. }
  493. batchData.GroupSummaries[groupUnique] = groupSummary
  494. }
  495. groupSummary.UsedAmount = amountDecimal.
  496. Add(decimal.NewFromFloat(groupSummary.UsedAmount)).
  497. InexactFloat64()
  498. groupSummary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
  499. groupSummary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
  500. groupSummary.Usage.Add(usage)
  501. groupSummary.AddRequest(code, false)
  502. }
  503. func updateGroupSummaryDataMinute(
  504. group, tokenName, modelName string,
  505. createAt time.Time,
  506. requestAt time.Time,
  507. firstByteAt time.Time,
  508. code int,
  509. amountDecimal decimal.Decimal,
  510. usage Usage,
  511. ) {
  512. if createAt.IsZero() {
  513. createAt = time.Now()
  514. }
  515. if requestAt.IsZero() {
  516. requestAt = createAt
  517. }
  518. if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
  519. firstByteAt = requestAt
  520. }
  521. groupUnique := GroupSummaryMinuteUnique{
  522. GroupID: group,
  523. TokenName: tokenName,
  524. Model: modelName,
  525. MinuteTimestamp: createAt.Truncate(time.Minute).Unix(),
  526. }
  527. groupSummary, ok := batchData.GroupSummariesMinute[groupUnique]
  528. if !ok {
  529. groupSummary = &GroupSummaryMinuteUpdate{
  530. GroupSummaryMinuteUnique: groupUnique,
  531. }
  532. batchData.GroupSummariesMinute[groupUnique] = groupSummary
  533. }
  534. groupSummary.UsedAmount = amountDecimal.
  535. Add(decimal.NewFromFloat(groupSummary.UsedAmount)).
  536. InexactFloat64()
  537. groupSummary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
  538. groupSummary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
  539. groupSummary.Usage.Add(usage)
  540. groupSummary.AddRequest(code, false)
  541. }
  542. func updateSummaryData(
  543. channelID int,
  544. modelName string,
  545. createAt time.Time,
  546. requestAt time.Time,
  547. firstByteAt time.Time,
  548. code int,
  549. amountDecimal decimal.Decimal,
  550. usage Usage,
  551. isRetry bool,
  552. ) {
  553. if createAt.IsZero() {
  554. createAt = time.Now()
  555. }
  556. if requestAt.IsZero() {
  557. requestAt = createAt
  558. }
  559. if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
  560. firstByteAt = requestAt
  561. }
  562. summaryUnique := SummaryUnique{
  563. ChannelID: channelID,
  564. Model: modelName,
  565. HourTimestamp: createAt.Truncate(time.Hour).Unix(),
  566. }
  567. summary, ok := batchData.Summaries[summaryUnique]
  568. if !ok {
  569. summary = &SummaryUpdate{
  570. SummaryUnique: summaryUnique,
  571. }
  572. batchData.Summaries[summaryUnique] = summary
  573. }
  574. summary.UsedAmount = amountDecimal.
  575. Add(decimal.NewFromFloat(summary.UsedAmount)).
  576. InexactFloat64()
  577. summary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
  578. summary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
  579. summary.Usage.Add(usage)
  580. summary.AddRequest(code, isRetry)
  581. }
  582. func updateSummaryDataMinute(
  583. channelID int,
  584. modelName string,
  585. createAt time.Time,
  586. requestAt time.Time,
  587. firstByteAt time.Time,
  588. code int,
  589. amountDecimal decimal.Decimal,
  590. usage Usage,
  591. isRetry bool,
  592. ) {
  593. if createAt.IsZero() {
  594. createAt = time.Now()
  595. }
  596. if requestAt.IsZero() {
  597. requestAt = createAt
  598. }
  599. if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
  600. firstByteAt = requestAt
  601. }
  602. summaryUnique := SummaryMinuteUnique{
  603. ChannelID: channelID,
  604. Model: modelName,
  605. MinuteTimestamp: createAt.Truncate(time.Minute).Unix(),
  606. }
  607. summary, ok := batchData.SummariesMinute[summaryUnique]
  608. if !ok {
  609. summary = &SummaryMinuteUpdate{
  610. SummaryMinuteUnique: summaryUnique,
  611. }
  612. batchData.SummariesMinute[summaryUnique] = summary
  613. }
  614. summary.UsedAmount = amountDecimal.
  615. Add(decimal.NewFromFloat(summary.UsedAmount)).
  616. InexactFloat64()
  617. summary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
  618. summary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
  619. summary.Usage.Add(usage)
  620. summary.AddRequest(code, isRetry)
  621. }