batch.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
  1. package model
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/labring/aiproxy/core/common/config"
  9. "github.com/labring/aiproxy/core/common/notify"
  10. "github.com/shopspring/decimal"
  11. )
  12. type batchUpdateData struct {
  13. Groups map[string]*GroupUpdate
  14. Tokens map[int]*TokenUpdate
  15. Channels map[int]*ChannelUpdate
  16. Summaries map[string]*SummaryUpdate
  17. GroupSummaries map[string]*GroupSummaryUpdate
  18. SummariesMinute map[string]*SummaryMinuteUpdate
  19. GroupSummariesMinute map[string]*GroupSummaryMinuteUpdate
  20. sync.Mutex
  21. }
  22. func (b *batchUpdateData) IsClean() bool {
  23. b.Lock()
  24. defer b.Unlock()
  25. return b.isCleanLocked()
  26. }
  27. func (b *batchUpdateData) isCleanLocked() bool {
  28. return len(b.Groups) == 0 &&
  29. len(b.Tokens) == 0 &&
  30. len(b.Channels) == 0 &&
  31. len(b.Summaries) == 0 &&
  32. len(b.GroupSummaries) == 0 &&
  33. len(b.SummariesMinute) == 0 &&
  34. len(b.GroupSummariesMinute) == 0
  35. }
  36. type GroupUpdate struct {
  37. Amount decimal.Decimal
  38. Count int
  39. }
  40. type TokenUpdate struct {
  41. Amount decimal.Decimal
  42. Count int
  43. }
  44. type ChannelUpdate struct {
  45. Amount decimal.Decimal
  46. Count int
  47. }
  48. type SummaryUpdate struct {
  49. SummaryUnique
  50. SummaryData
  51. }
  52. type SummaryMinuteUpdate struct {
  53. SummaryMinuteUnique
  54. SummaryData
  55. }
  56. func summaryUniqueKey(unique SummaryUnique) string {
  57. return fmt.Sprintf("%d:%s:%d", unique.ChannelID, unique.Model, unique.HourTimestamp)
  58. }
  59. func summaryMinuteUniqueKey(unique SummaryMinuteUnique) string {
  60. return fmt.Sprintf("%d:%s:%d", unique.ChannelID, unique.Model, unique.MinuteTimestamp)
  61. }
  62. type GroupSummaryUpdate struct {
  63. GroupSummaryUnique
  64. SummaryData
  65. }
  66. type GroupSummaryMinuteUpdate struct {
  67. GroupSummaryMinuteUnique
  68. SummaryData
  69. }
  70. func groupSummaryUniqueKey(unique GroupSummaryUnique) string {
  71. return fmt.Sprintf(
  72. "%s:%s:%s:%d",
  73. unique.GroupID,
  74. unique.TokenName,
  75. unique.Model,
  76. unique.HourTimestamp,
  77. )
  78. }
  79. func groupSummaryMinuteUniqueKey(unique GroupSummaryMinuteUnique) string {
  80. return fmt.Sprintf(
  81. "%s:%s:%s:%d",
  82. unique.GroupID,
  83. unique.TokenName,
  84. unique.Model,
  85. unique.MinuteTimestamp,
  86. )
  87. }
  88. var batchData batchUpdateData
  89. func init() {
  90. batchData = batchUpdateData{
  91. Groups: make(map[string]*GroupUpdate),
  92. Tokens: make(map[int]*TokenUpdate),
  93. Channels: make(map[int]*ChannelUpdate),
  94. Summaries: make(map[string]*SummaryUpdate),
  95. GroupSummaries: make(map[string]*GroupSummaryUpdate),
  96. SummariesMinute: make(map[string]*SummaryMinuteUpdate),
  97. GroupSummariesMinute: make(map[string]*GroupSummaryMinuteUpdate),
  98. }
  99. }
  100. func StartBatchProcessorSummary(ctx context.Context, wg *sync.WaitGroup) {
  101. defer wg.Done()
  102. ticker := time.NewTicker(5 * time.Second)
  103. defer ticker.Stop()
  104. for {
  105. select {
  106. case <-ctx.Done():
  107. ProcessBatchUpdatesSummary()
  108. return
  109. case <-ticker.C:
  110. ProcessBatchUpdatesSummary()
  111. }
  112. }
  113. }
  114. func CleanBatchUpdatesSummary(ctx context.Context) {
  115. for {
  116. select {
  117. case <-ctx.Done():
  118. ProcessBatchUpdatesSummary()
  119. return
  120. default:
  121. if batchData.IsClean() {
  122. return
  123. }
  124. }
  125. ProcessBatchUpdatesSummary()
  126. time.Sleep(time.Second * 1)
  127. }
  128. }
  129. func ProcessBatchUpdatesSummary() {
  130. batchData.Lock()
  131. defer batchData.Unlock()
  132. var wg sync.WaitGroup
  133. wg.Add(1)
  134. go processGroupUpdates(&wg)
  135. wg.Add(1)
  136. go processTokenUpdates(&wg)
  137. wg.Add(1)
  138. go processChannelUpdates(&wg)
  139. wg.Add(1)
  140. go processGroupSummaryUpdates(&wg)
  141. wg.Add(1)
  142. go processSummaryUpdates(&wg)
  143. wg.Add(1)
  144. go processSummaryMinuteUpdates(&wg)
  145. wg.Add(1)
  146. go processGroupSummaryMinuteUpdates(&wg)
  147. wg.Wait()
  148. }
  149. func processGroupUpdates(wg *sync.WaitGroup) {
  150. defer wg.Done()
  151. for groupID, data := range batchData.Groups {
  152. err := UpdateGroupUsedAmountAndRequestCount(
  153. groupID,
  154. data.Amount.InexactFloat64(),
  155. data.Count,
  156. )
  157. if IgnoreNotFound(err) != nil {
  158. notify.ErrorThrottle(
  159. "batchUpdateGroupUsedAmountAndRequestCount",
  160. time.Minute,
  161. "failed to batch update group",
  162. err.Error(),
  163. )
  164. } else {
  165. delete(batchData.Groups, groupID)
  166. }
  167. }
  168. }
  169. func processTokenUpdates(wg *sync.WaitGroup) {
  170. defer wg.Done()
  171. for tokenID, data := range batchData.Tokens {
  172. err := UpdateTokenUsedAmount(tokenID, data.Amount.InexactFloat64(), data.Count)
  173. if IgnoreNotFound(err) != nil {
  174. notify.ErrorThrottle(
  175. "batchUpdateTokenUsedAmount",
  176. time.Minute,
  177. "failed to batch update token",
  178. err.Error(),
  179. )
  180. } else {
  181. delete(batchData.Tokens, tokenID)
  182. }
  183. }
  184. }
  185. func processChannelUpdates(wg *sync.WaitGroup) {
  186. defer wg.Done()
  187. for channelID, data := range batchData.Channels {
  188. err := UpdateChannelUsedAmount(channelID, data.Amount.InexactFloat64(), data.Count)
  189. if IgnoreNotFound(err) != nil {
  190. notify.ErrorThrottle(
  191. "batchUpdateChannelUsedAmount",
  192. time.Minute,
  193. "failed to batch update channel",
  194. err.Error(),
  195. )
  196. } else {
  197. delete(batchData.Channels, channelID)
  198. }
  199. }
  200. }
  201. func processGroupSummaryUpdates(wg *sync.WaitGroup) {
  202. defer wg.Done()
  203. for key, data := range batchData.GroupSummaries {
  204. err := UpsertGroupSummary(data.GroupSummaryUnique, data.SummaryData)
  205. if err != nil {
  206. notify.ErrorThrottle(
  207. "batchUpdateGroupSummary",
  208. time.Minute,
  209. "failed to batch update group summary",
  210. err.Error(),
  211. )
  212. } else {
  213. delete(batchData.GroupSummaries, key)
  214. }
  215. }
  216. }
  217. func processGroupSummaryMinuteUpdates(wg *sync.WaitGroup) {
  218. defer wg.Done()
  219. for key, data := range batchData.GroupSummariesMinute {
  220. err := UpsertGroupSummaryMinute(data.GroupSummaryMinuteUnique, data.SummaryData)
  221. if err != nil {
  222. notify.ErrorThrottle(
  223. "batchUpdateGroupSummary",
  224. time.Minute,
  225. "failed to batch update group summary",
  226. err.Error(),
  227. )
  228. } else {
  229. delete(batchData.GroupSummariesMinute, key)
  230. }
  231. }
  232. }
  233. func processSummaryUpdates(wg *sync.WaitGroup) {
  234. defer wg.Done()
  235. for key, data := range batchData.Summaries {
  236. err := UpsertSummary(data.SummaryUnique, data.SummaryData)
  237. if err != nil {
  238. notify.ErrorThrottle(
  239. "batchUpdateSummary",
  240. time.Minute,
  241. "failed to batch update summary",
  242. err.Error(),
  243. )
  244. } else {
  245. delete(batchData.Summaries, key)
  246. }
  247. }
  248. }
  249. func processSummaryMinuteUpdates(wg *sync.WaitGroup) {
  250. defer wg.Done()
  251. for key, data := range batchData.SummariesMinute {
  252. err := UpsertSummaryMinute(data.SummaryMinuteUnique, data.SummaryData)
  253. if err != nil {
  254. notify.ErrorThrottle(
  255. "batchUpdateSummaryMinute",
  256. time.Minute,
  257. "failed to batch update summary minute",
  258. err.Error(),
  259. )
  260. } else {
  261. delete(batchData.SummariesMinute, key)
  262. }
  263. }
  264. }
  265. func BatchRecordLogs(
  266. requestID string,
  267. requestAt time.Time,
  268. retryAt time.Time,
  269. firstByteAt time.Time,
  270. group string,
  271. code int,
  272. channelID int,
  273. modelName string,
  274. tokenID int,
  275. tokenName string,
  276. endpoint string,
  277. content string,
  278. mode int,
  279. ip string,
  280. retryTimes int,
  281. requestDetail *RequestDetail,
  282. downstreamResult bool,
  283. usage Usage,
  284. modelPrice Price,
  285. amount float64,
  286. user string,
  287. metadata map[string]string,
  288. ) (err error) {
  289. now := time.Now()
  290. if downstreamResult {
  291. if config.GetLogStorageHours() >= 0 {
  292. err = RecordConsumeLog(
  293. requestID,
  294. now,
  295. requestAt,
  296. retryAt,
  297. firstByteAt,
  298. group,
  299. code,
  300. channelID,
  301. modelName,
  302. tokenID,
  303. tokenName,
  304. endpoint,
  305. content,
  306. mode,
  307. ip,
  308. retryTimes,
  309. requestDetail,
  310. usage,
  311. modelPrice,
  312. amount,
  313. user,
  314. metadata,
  315. )
  316. }
  317. } else {
  318. if config.GetRetryLogStorageHours() >= 0 {
  319. err = RecordRetryLog(
  320. requestID,
  321. now,
  322. requestAt,
  323. retryAt,
  324. firstByteAt,
  325. code,
  326. channelID,
  327. modelName,
  328. mode,
  329. retryTimes,
  330. requestDetail,
  331. )
  332. }
  333. }
  334. amountDecimal := decimal.NewFromFloat(amount)
  335. batchData.Lock()
  336. defer batchData.Unlock()
  337. updateChannelData(channelID, amount, amountDecimal)
  338. if !downstreamResult {
  339. return err
  340. }
  341. updateGroupData(group, amount, amountDecimal)
  342. updateTokenData(tokenID, amount, amountDecimal)
  343. if channelID != 0 {
  344. updateSummaryData(
  345. channelID,
  346. modelName,
  347. now,
  348. requestAt,
  349. firstByteAt,
  350. code,
  351. amountDecimal,
  352. usage,
  353. )
  354. updateSummaryDataMinute(
  355. channelID,
  356. modelName,
  357. now,
  358. requestAt,
  359. firstByteAt,
  360. code,
  361. amountDecimal,
  362. usage,
  363. )
  364. }
  365. if group != "" {
  366. updateGroupSummaryData(
  367. group,
  368. tokenName,
  369. modelName,
  370. now,
  371. requestAt,
  372. firstByteAt,
  373. code,
  374. amountDecimal,
  375. usage,
  376. )
  377. updateGroupSummaryDataMinute(
  378. group,
  379. tokenName,
  380. modelName,
  381. now,
  382. requestAt,
  383. firstByteAt,
  384. code,
  385. amountDecimal,
  386. usage,
  387. )
  388. }
  389. return err
  390. }
  391. func updateChannelData(channelID int, amount float64, amountDecimal decimal.Decimal) {
  392. if channelID > 0 {
  393. if _, ok := batchData.Channels[channelID]; !ok {
  394. batchData.Channels[channelID] = &ChannelUpdate{}
  395. }
  396. if amount > 0 {
  397. batchData.Channels[channelID].Amount = amountDecimal.
  398. Add(batchData.Channels[channelID].Amount)
  399. }
  400. batchData.Channels[channelID].Count++
  401. }
  402. }
  403. func updateGroupData(group string, amount float64, amountDecimal decimal.Decimal) {
  404. if group != "" {
  405. if _, ok := batchData.Groups[group]; !ok {
  406. batchData.Groups[group] = &GroupUpdate{}
  407. }
  408. if amount > 0 {
  409. batchData.Groups[group].Amount = amountDecimal.
  410. Add(batchData.Groups[group].Amount)
  411. }
  412. batchData.Groups[group].Count++
  413. }
  414. }
  415. func updateTokenData(tokenID int, amount float64, amountDecimal decimal.Decimal) {
  416. if tokenID > 0 {
  417. if _, ok := batchData.Tokens[tokenID]; !ok {
  418. batchData.Tokens[tokenID] = &TokenUpdate{}
  419. }
  420. if amount > 0 {
  421. batchData.Tokens[tokenID].Amount = amountDecimal.
  422. Add(batchData.Tokens[tokenID].Amount)
  423. }
  424. batchData.Tokens[tokenID].Count++
  425. }
  426. }
  427. func updateGroupSummaryData(
  428. group, tokenName, modelName string,
  429. createAt time.Time,
  430. requestAt time.Time,
  431. firstByteAt time.Time,
  432. code int,
  433. amountDecimal decimal.Decimal,
  434. usage Usage,
  435. ) {
  436. if createAt.IsZero() {
  437. createAt = time.Now()
  438. }
  439. if requestAt.IsZero() {
  440. requestAt = createAt
  441. }
  442. if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
  443. firstByteAt = requestAt
  444. }
  445. groupUnique := GroupSummaryUnique{
  446. GroupID: group,
  447. TokenName: tokenName,
  448. Model: modelName,
  449. HourTimestamp: createAt.Truncate(time.Hour).Unix(),
  450. }
  451. groupSummaryKey := groupSummaryUniqueKey(groupUnique)
  452. groupSummary, ok := batchData.GroupSummaries[groupSummaryKey]
  453. if !ok {
  454. groupSummary = &GroupSummaryUpdate{
  455. GroupSummaryUnique: groupUnique,
  456. }
  457. batchData.GroupSummaries[groupSummaryKey] = groupSummary
  458. }
  459. groupSummary.RequestCount++
  460. groupSummary.UsedAmount = amountDecimal.
  461. Add(decimal.NewFromFloat(groupSummary.UsedAmount)).
  462. InexactFloat64()
  463. groupSummary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
  464. groupSummary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
  465. groupSummary.Usage.Add(usage)
  466. if code != http.StatusOK {
  467. groupSummary.ExceptionCount++
  468. }
  469. }
  470. func updateGroupSummaryDataMinute(
  471. group, tokenName, modelName string,
  472. createAt time.Time,
  473. requestAt time.Time,
  474. firstByteAt time.Time,
  475. code int,
  476. amountDecimal decimal.Decimal,
  477. usage Usage,
  478. ) {
  479. if createAt.IsZero() {
  480. createAt = time.Now()
  481. }
  482. if requestAt.IsZero() {
  483. requestAt = createAt
  484. }
  485. if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
  486. firstByteAt = requestAt
  487. }
  488. groupUnique := GroupSummaryMinuteUnique{
  489. GroupID: group,
  490. TokenName: tokenName,
  491. Model: modelName,
  492. MinuteTimestamp: createAt.Truncate(time.Minute).Unix(),
  493. }
  494. groupSummaryKey := groupSummaryMinuteUniqueKey(groupUnique)
  495. groupSummary, ok := batchData.GroupSummariesMinute[groupSummaryKey]
  496. if !ok {
  497. groupSummary = &GroupSummaryMinuteUpdate{
  498. GroupSummaryMinuteUnique: groupUnique,
  499. }
  500. batchData.GroupSummariesMinute[groupSummaryKey] = groupSummary
  501. }
  502. groupSummary.RequestCount++
  503. groupSummary.UsedAmount = amountDecimal.
  504. Add(decimal.NewFromFloat(groupSummary.UsedAmount)).
  505. InexactFloat64()
  506. groupSummary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
  507. groupSummary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
  508. groupSummary.Usage.Add(usage)
  509. if code != http.StatusOK {
  510. groupSummary.ExceptionCount++
  511. }
  512. }
  513. func updateSummaryData(
  514. channelID int,
  515. modelName string,
  516. createAt time.Time,
  517. requestAt time.Time,
  518. firstByteAt time.Time,
  519. code int,
  520. amountDecimal decimal.Decimal,
  521. usage Usage,
  522. ) {
  523. if createAt.IsZero() {
  524. createAt = time.Now()
  525. }
  526. if requestAt.IsZero() {
  527. requestAt = createAt
  528. }
  529. if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
  530. firstByteAt = requestAt
  531. }
  532. summaryUnique := SummaryUnique{
  533. ChannelID: channelID,
  534. Model: modelName,
  535. HourTimestamp: createAt.Truncate(time.Hour).Unix(),
  536. }
  537. summaryKey := summaryUniqueKey(summaryUnique)
  538. summary, ok := batchData.Summaries[summaryKey]
  539. if !ok {
  540. summary = &SummaryUpdate{
  541. SummaryUnique: summaryUnique,
  542. }
  543. batchData.Summaries[summaryKey] = summary
  544. }
  545. summary.RequestCount++
  546. summary.UsedAmount = amountDecimal.
  547. Add(decimal.NewFromFloat(summary.UsedAmount)).
  548. InexactFloat64()
  549. summary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
  550. summary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
  551. summary.Usage.Add(usage)
  552. if code != http.StatusOK {
  553. summary.ExceptionCount++
  554. }
  555. }
  556. func updateSummaryDataMinute(
  557. channelID int,
  558. modelName string,
  559. createAt time.Time,
  560. requestAt time.Time,
  561. firstByteAt time.Time,
  562. code int,
  563. amountDecimal decimal.Decimal,
  564. usage Usage,
  565. ) {
  566. if createAt.IsZero() {
  567. createAt = time.Now()
  568. }
  569. if requestAt.IsZero() {
  570. requestAt = createAt
  571. }
  572. if firstByteAt.IsZero() || firstByteAt.Before(requestAt) {
  573. firstByteAt = requestAt
  574. }
  575. summaryUnique := SummaryMinuteUnique{
  576. ChannelID: channelID,
  577. Model: modelName,
  578. MinuteTimestamp: createAt.Truncate(time.Minute).Unix(),
  579. }
  580. summaryKey := summaryMinuteUniqueKey(summaryUnique)
  581. summary, ok := batchData.SummariesMinute[summaryKey]
  582. if !ok {
  583. summary = &SummaryMinuteUpdate{
  584. SummaryMinuteUnique: summaryUnique,
  585. }
  586. batchData.SummariesMinute[summaryKey] = summary
  587. }
  588. summary.RequestCount++
  589. summary.UsedAmount = amountDecimal.
  590. Add(decimal.NewFromFloat(summary.UsedAmount)).
  591. InexactFloat64()
  592. summary.TotalTimeMilliseconds += createAt.Sub(requestAt).Milliseconds()
  593. summary.TotalTTFBMilliseconds += firstByteAt.Sub(requestAt).Milliseconds()
  594. summary.Usage.Add(usage)
  595. if code != http.StatusOK {
  596. summary.ExceptionCount++
  597. }
  598. }