batch.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. package model
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/labring/aiproxy/core/common/config"
  9. "github.com/labring/aiproxy/core/common/notify"
  10. "github.com/shopspring/decimal"
  11. )
  12. type batchUpdateData struct {
  13. Groups map[string]*GroupUpdate
  14. Tokens map[int]*TokenUpdate
  15. Channels map[int]*ChannelUpdate
  16. Summaries map[string]*SummaryUpdate
  17. GroupSummaries map[string]*GroupSummaryUpdate
  18. sync.Mutex
  19. }
  20. func (b *batchUpdateData) IsClean() bool {
  21. b.Lock()
  22. defer b.Unlock()
  23. return b.isCleanLocked()
  24. }
  25. func (b *batchUpdateData) isCleanLocked() bool {
  26. return len(b.Groups) == 0 &&
  27. len(b.Tokens) == 0 &&
  28. len(b.Channels) == 0 &&
  29. len(b.Summaries) == 0 &&
  30. len(b.GroupSummaries) == 0
  31. }
  32. type GroupUpdate struct {
  33. Amount decimal.Decimal
  34. Count int
  35. }
  36. type TokenUpdate struct {
  37. Amount decimal.Decimal
  38. Count int
  39. }
  40. type ChannelUpdate struct {
  41. Amount decimal.Decimal
  42. Count int
  43. }
  44. type SummaryUpdate struct {
  45. SummaryUnique
  46. SummaryData
  47. }
  48. func summaryUniqueKey(unique SummaryUnique) string {
  49. return fmt.Sprintf("%d:%s:%d", unique.ChannelID, unique.Model, unique.HourTimestamp)
  50. }
  51. type GroupSummaryUpdate struct {
  52. GroupSummaryUnique
  53. SummaryData
  54. }
  55. func groupSummaryUniqueKey(unique GroupSummaryUnique) string {
  56. return fmt.Sprintf("%s:%s:%s:%d", unique.GroupID, unique.TokenName, unique.Model, unique.HourTimestamp)
  57. }
  58. var batchData batchUpdateData
  59. func init() {
  60. batchData = batchUpdateData{
  61. Groups: make(map[string]*GroupUpdate),
  62. Tokens: make(map[int]*TokenUpdate),
  63. Channels: make(map[int]*ChannelUpdate),
  64. Summaries: make(map[string]*SummaryUpdate),
  65. GroupSummaries: make(map[string]*GroupSummaryUpdate),
  66. }
  67. }
  68. func StartBatchProcessorSummary(ctx context.Context, wg *sync.WaitGroup) {
  69. defer wg.Done()
  70. ticker := time.NewTicker(5 * time.Second)
  71. defer ticker.Stop()
  72. for {
  73. select {
  74. case <-ctx.Done():
  75. ProcessBatchUpdatesSummary()
  76. return
  77. case <-ticker.C:
  78. ProcessBatchUpdatesSummary()
  79. }
  80. }
  81. }
  82. func CleanBatchUpdatesSummary(ctx context.Context) {
  83. for {
  84. select {
  85. case <-ctx.Done():
  86. ProcessBatchUpdatesSummary()
  87. return
  88. default:
  89. if batchData.IsClean() {
  90. return
  91. }
  92. }
  93. ProcessBatchUpdatesSummary()
  94. time.Sleep(time.Second * 1)
  95. }
  96. }
  97. func ProcessBatchUpdatesSummary() {
  98. batchData.Lock()
  99. defer batchData.Unlock()
  100. var wg sync.WaitGroup
  101. wg.Add(1)
  102. go processGroupUpdates(&wg)
  103. wg.Add(1)
  104. go processTokenUpdates(&wg)
  105. wg.Add(1)
  106. go processChannelUpdates(&wg)
  107. wg.Add(1)
  108. go processGroupSummaryUpdates(&wg)
  109. wg.Add(1)
  110. go processSummaryUpdates(&wg)
  111. wg.Wait()
  112. }
  113. func processGroupUpdates(wg *sync.WaitGroup) {
  114. defer wg.Done()
  115. for groupID, data := range batchData.Groups {
  116. err := UpdateGroupUsedAmountAndRequestCount(groupID, data.Amount.InexactFloat64(), data.Count)
  117. if IgnoreNotFound(err) != nil {
  118. notify.ErrorThrottle(
  119. "batchUpdateGroupUsedAmountAndRequestCount",
  120. time.Minute,
  121. "failed to batch update group",
  122. err.Error(),
  123. )
  124. } else {
  125. delete(batchData.Groups, groupID)
  126. }
  127. }
  128. }
  129. func processTokenUpdates(wg *sync.WaitGroup) {
  130. defer wg.Done()
  131. for tokenID, data := range batchData.Tokens {
  132. err := UpdateTokenUsedAmount(tokenID, data.Amount.InexactFloat64(), data.Count)
  133. if IgnoreNotFound(err) != nil {
  134. notify.ErrorThrottle(
  135. "batchUpdateTokenUsedAmount",
  136. time.Minute,
  137. "failed to batch update token",
  138. err.Error(),
  139. )
  140. } else {
  141. delete(batchData.Tokens, tokenID)
  142. }
  143. }
  144. }
  145. func processChannelUpdates(wg *sync.WaitGroup) {
  146. defer wg.Done()
  147. for channelID, data := range batchData.Channels {
  148. err := UpdateChannelUsedAmount(channelID, data.Amount.InexactFloat64(), data.Count)
  149. if IgnoreNotFound(err) != nil {
  150. notify.ErrorThrottle(
  151. "batchUpdateChannelUsedAmount",
  152. time.Minute,
  153. "failed to batch update channel",
  154. err.Error(),
  155. )
  156. } else {
  157. delete(batchData.Channels, channelID)
  158. }
  159. }
  160. }
  161. func processGroupSummaryUpdates(wg *sync.WaitGroup) {
  162. defer wg.Done()
  163. for key, data := range batchData.GroupSummaries {
  164. err := UpsertGroupSummary(data.GroupSummaryUnique, data.SummaryData)
  165. if err != nil {
  166. notify.ErrorThrottle(
  167. "batchUpdateGroupSummary",
  168. time.Minute,
  169. "failed to batch update group summary",
  170. err.Error(),
  171. )
  172. } else {
  173. delete(batchData.GroupSummaries, key)
  174. }
  175. }
  176. }
  177. func processSummaryUpdates(wg *sync.WaitGroup) {
  178. defer wg.Done()
  179. for key, data := range batchData.Summaries {
  180. err := UpsertSummary(data.SummaryUnique, data.SummaryData)
  181. if err != nil {
  182. notify.ErrorThrottle(
  183. "batchUpdateSummary",
  184. time.Minute,
  185. "failed to batch update summary",
  186. err.Error(),
  187. )
  188. } else {
  189. delete(batchData.Summaries, key)
  190. }
  191. }
  192. }
  193. func BatchRecordLogs(
  194. requestID string,
  195. requestAt time.Time,
  196. retryAt time.Time,
  197. firstByteAt time.Time,
  198. group string,
  199. code int,
  200. channelID int,
  201. modelName string,
  202. tokenID int,
  203. tokenName string,
  204. endpoint string,
  205. content string,
  206. mode int,
  207. ip string,
  208. retryTimes int,
  209. requestDetail *RequestDetail,
  210. downstreamResult bool,
  211. usage Usage,
  212. modelPrice Price,
  213. amount float64,
  214. user string,
  215. metadata map[string]string,
  216. ) (err error) {
  217. now := time.Now()
  218. if downstreamResult {
  219. if config.GetLogStorageHours() >= 0 {
  220. err = RecordConsumeLog(
  221. requestID,
  222. now,
  223. requestAt,
  224. retryAt,
  225. firstByteAt,
  226. group,
  227. code,
  228. channelID,
  229. modelName,
  230. tokenID,
  231. tokenName,
  232. endpoint,
  233. content,
  234. mode,
  235. ip,
  236. retryTimes,
  237. requestDetail,
  238. usage,
  239. modelPrice,
  240. amount,
  241. user,
  242. metadata,
  243. )
  244. }
  245. } else {
  246. if config.GetRetryLogStorageHours() >= 0 {
  247. err = RecordRetryLog(
  248. requestID,
  249. now,
  250. requestAt,
  251. retryAt,
  252. firstByteAt,
  253. code,
  254. channelID,
  255. modelName,
  256. mode,
  257. retryTimes,
  258. requestDetail,
  259. )
  260. }
  261. }
  262. amountDecimal := decimal.NewFromFloat(amount)
  263. batchData.Lock()
  264. defer batchData.Unlock()
  265. updateChannelData(channelID, amount, amountDecimal)
  266. if !downstreamResult {
  267. return err
  268. }
  269. updateGroupData(group, amount, amountDecimal)
  270. updateTokenData(tokenID, amount, amountDecimal)
  271. if channelID != 0 {
  272. updateSummaryData(channelID, modelName, now, code, amountDecimal, usage)
  273. }
  274. updateGroupSummaryData(group, tokenName, modelName, now, code, amountDecimal, usage)
  275. return err
  276. }
  277. func updateChannelData(channelID int, amount float64, amountDecimal decimal.Decimal) {
  278. if channelID > 0 {
  279. if _, ok := batchData.Channels[channelID]; !ok {
  280. batchData.Channels[channelID] = &ChannelUpdate{}
  281. }
  282. if amount > 0 {
  283. batchData.Channels[channelID].Amount = amountDecimal.
  284. Add(batchData.Channels[channelID].Amount)
  285. }
  286. batchData.Channels[channelID].Count++
  287. }
  288. }
  289. func updateGroupData(group string, amount float64, amountDecimal decimal.Decimal) {
  290. if group != "" {
  291. if _, ok := batchData.Groups[group]; !ok {
  292. batchData.Groups[group] = &GroupUpdate{}
  293. }
  294. if amount > 0 {
  295. batchData.Groups[group].Amount = amountDecimal.
  296. Add(batchData.Groups[group].Amount)
  297. }
  298. batchData.Groups[group].Count++
  299. }
  300. }
  301. func updateTokenData(tokenID int, amount float64, amountDecimal decimal.Decimal) {
  302. if tokenID > 0 {
  303. if _, ok := batchData.Tokens[tokenID]; !ok {
  304. batchData.Tokens[tokenID] = &TokenUpdate{}
  305. }
  306. if amount > 0 {
  307. batchData.Tokens[tokenID].Amount = amountDecimal.
  308. Add(batchData.Tokens[tokenID].Amount)
  309. }
  310. batchData.Tokens[tokenID].Count++
  311. }
  312. }
  313. func updateGroupSummaryData(group string, tokenName string, modelName string, createAt time.Time, code int, amountDecimal decimal.Decimal, usage Usage) {
  314. groupUnique := GroupSummaryUnique{
  315. GroupID: group,
  316. TokenName: tokenName,
  317. Model: modelName,
  318. HourTimestamp: createAt.Truncate(time.Hour).Unix(),
  319. }
  320. groupSummaryKey := groupSummaryUniqueKey(groupUnique)
  321. groupSummary, ok := batchData.GroupSummaries[groupSummaryKey]
  322. if !ok {
  323. groupSummary = &GroupSummaryUpdate{
  324. GroupSummaryUnique: groupUnique,
  325. }
  326. batchData.GroupSummaries[groupSummaryKey] = groupSummary
  327. }
  328. groupSummary.RequestCount++
  329. groupSummary.UsedAmount = amountDecimal.
  330. Add(decimal.NewFromFloat(groupSummary.UsedAmount)).
  331. InexactFloat64()
  332. groupSummary.Usage.Add(&usage)
  333. if code != http.StatusOK {
  334. groupSummary.ExceptionCount++
  335. }
  336. }
  337. func updateSummaryData(channelID int, modelName string, createAt time.Time, code int, amountDecimal decimal.Decimal, usage Usage) {
  338. summaryUnique := SummaryUnique{
  339. ChannelID: channelID,
  340. Model: modelName,
  341. HourTimestamp: createAt.Truncate(time.Hour).Unix(),
  342. }
  343. summaryKey := summaryUniqueKey(summaryUnique)
  344. summary, ok := batchData.Summaries[summaryKey]
  345. if !ok {
  346. summary = &SummaryUpdate{
  347. SummaryUnique: summaryUnique,
  348. }
  349. batchData.Summaries[summaryKey] = summary
  350. }
  351. summary.RequestCount++
  352. summary.UsedAmount = amountDecimal.
  353. Add(decimal.NewFromFloat(summary.UsedAmount)).
  354. InexactFloat64()
  355. summary.Usage.Add(&usage)
  356. if code != http.StatusOK {
  357. summary.ExceptionCount++
  358. }
  359. }