summary-minute.go 30 KB


  1. package model
  2. import (
  3. "cmp"
  4. "errors"
  5. "slices"
  6. "time"
  7. "github.com/shopspring/decimal"
  8. "golang.org/x/sync/errgroup"
  9. "gorm.io/gorm"
  10. "gorm.io/gorm/clause"
  11. )
  12. type SummaryMinute struct {
  13. ID int `gorm:"primaryKey"`
  14. Unique SummaryMinuteUnique `gorm:"embedded"`
  15. Data SummaryData `gorm:"embedded"`
  16. }
  17. type SummaryMinuteUnique struct {
  18. ChannelID int `gorm:"not null;uniqueIndex:idx_summary_minute_unique,priority:1"`
  19. Model string `gorm:"size:64;not null;uniqueIndex:idx_summary_minute_unique,priority:2"`
  20. MinuteTimestamp int64 `gorm:"not null;uniqueIndex:idx_summary_minute_unique,priority:3,sort:desc"`
  21. }
  22. func (l *SummaryMinute) BeforeCreate(_ *gorm.DB) (err error) {
  23. if l.Unique.ChannelID == 0 {
  24. return errors.New("channel id is required")
  25. }
  26. if l.Unique.Model == "" {
  27. return errors.New("model is required")
  28. }
  29. if l.Unique.MinuteTimestamp == 0 {
  30. return errors.New("minute timestamp is required")
  31. }
  32. if err := validateMinuteTimestamp(l.Unique.MinuteTimestamp); err != nil {
  33. return err
  34. }
  35. return
  36. }
  37. var minuteTimestampDivisor = int64(time.Minute.Seconds())
  38. func validateMinuteTimestamp(minuteTimestamp int64) error {
  39. if minuteTimestamp%minuteTimestampDivisor != 0 {
  40. return errors.New("minute timestamp must be an exact minute")
  41. }
  42. return nil
  43. }
  44. func CreateSummaryMinuteIndexs(db *gorm.DB) error {
  45. indexes := []string{
  46. "CREATE INDEX IF NOT EXISTS idx_summary_minute_channel_minute ON summary_minutes (channel_id, minute_timestamp DESC)",
  47. "CREATE INDEX IF NOT EXISTS idx_summary_minute_model_minute ON summary_minutes (model, minute_timestamp DESC)",
  48. }
  49. for _, index := range indexes {
  50. if err := db.Exec(index).Error; err != nil {
  51. return err
  52. }
  53. }
  54. return nil
  55. }
  56. func UpsertSummaryMinute(unique SummaryMinuteUnique, data SummaryData) error {
  57. err := validateMinuteTimestamp(unique.MinuteTimestamp)
  58. if err != nil {
  59. return err
  60. }
  61. for range 3 {
  62. result := LogDB.
  63. Model(&SummaryMinute{}).
  64. Where(
  65. "channel_id = ? AND model = ? AND minute_timestamp = ?",
  66. unique.ChannelID,
  67. unique.Model,
  68. unique.MinuteTimestamp,
  69. ).
  70. Updates(data.buildUpdateData("summary_minutes"))
  71. err = result.Error
  72. if err != nil {
  73. return err
  74. }
  75. if result.RowsAffected > 0 {
  76. return nil
  77. }
  78. err = createSummaryMinute(unique, data)
  79. if err == nil {
  80. return nil
  81. }
  82. if !errors.Is(err, gorm.ErrDuplicatedKey) {
  83. return err
  84. }
  85. }
  86. return err
  87. }
  88. func createSummaryMinute(unique SummaryMinuteUnique, data SummaryData) error {
  89. return LogDB.
  90. Clauses(clause.OnConflict{
  91. Columns: []clause.Column{
  92. {Name: "channel_id"},
  93. {Name: "model"},
  94. {Name: "minute_timestamp"},
  95. },
  96. DoUpdates: clause.Assignments(data.buildUpdateData("summary_minutes")),
  97. }).
  98. Create(&SummaryMinute{
  99. Unique: unique,
  100. Data: data,
  101. }).Error
  102. }
  103. func getChartDataMinute(
  104. start, end time.Time,
  105. channelID int,
  106. modelName string,
  107. timeSpan TimeSpanType,
  108. timezone *time.Location,
  109. ) ([]ChartData, error) {
  110. query := LogDB.Model(&SummaryMinute{})
  111. if channelID != 0 {
  112. query = query.Where("channel_id = ?", channelID)
  113. }
  114. if modelName != "" {
  115. query = query.Where("model = ?", modelName)
  116. }
  117. switch {
  118. case !start.IsZero() && !end.IsZero():
  119. query = query.Where("minute_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  120. case !start.IsZero():
  121. query = query.Where("minute_timestamp >= ?", start.Unix())
  122. case !end.IsZero():
  123. query = query.Where("minute_timestamp <= ?", end.Unix())
  124. }
  125. // Only include max metrics when we have specific channel and model
  126. const selectFields = "minute_timestamp as timestamp, sum(used_amount) as used_amount, " +
  127. "sum(request_count) as request_count, sum(retry_count) as retry_count, sum(exception_count) as exception_count, sum(status4xx_count) as status4xx_count, sum(status5xx_count) as status5xx_count, sum(status400_count) as status400_count, sum(status429_count) as status429_count, sum(status500_count) as status500_count, " +
  128. "sum(total_time_milliseconds) as total_time_milliseconds, sum(total_ttfb_milliseconds) as total_ttfb_milliseconds, " +
  129. "sum(input_tokens) as input_tokens, sum(image_input_tokens) as image_input_tokens, sum(audio_input_tokens) as audio_input_tokens, sum(output_tokens) as output_tokens, " +
  130. "sum(cached_tokens) as cached_tokens, sum(cache_creation_tokens) as cache_creation_tokens, " +
  131. "sum(total_tokens) as total_tokens, sum(web_search_count) as web_search_count"
  132. query = query.
  133. Select(selectFields).
  134. Group("timestamp")
  135. var chartData []ChartData
  136. err := query.Find(&chartData).Error
  137. if err != nil {
  138. return nil, err
  139. }
  140. if len(chartData) > 0 && timeSpan != TimeSpanMinute {
  141. chartData = aggregateDataToSpan(chartData, timeSpan, timezone)
  142. }
  143. slices.SortFunc(chartData, func(a, b ChartData) int {
  144. return cmp.Compare(a.Timestamp, b.Timestamp)
  145. })
  146. return chartData, nil
  147. }
  148. func getGroupChartDataMinute(
  149. group string,
  150. start, end time.Time,
  151. tokenName, modelName string,
  152. timeSpan TimeSpanType,
  153. timezone *time.Location,
  154. ) ([]ChartData, error) {
  155. query := LogDB.Model(&GroupSummaryMinute{})
  156. if group != "" {
  157. query = query.Where("group_id = ?", group)
  158. }
  159. if tokenName != "" {
  160. query = query.Where("token_name = ?", tokenName)
  161. }
  162. if modelName != "" {
  163. query = query.Where("model = ?", modelName)
  164. }
  165. switch {
  166. case !start.IsZero() && !end.IsZero():
  167. query = query.Where("minute_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  168. case !start.IsZero():
  169. query = query.Where("minute_timestamp >= ?", start.Unix())
  170. case !end.IsZero():
  171. query = query.Where("minute_timestamp <= ?", end.Unix())
  172. }
  173. // Only include max metrics when we have specific channel and model
  174. const selectFields = "minute_timestamp as timestamp, sum(used_amount) as used_amount, " +
  175. "sum(request_count) as request_count, sum(exception_count) as exception_count, sum(status4xx_count) as status4xx_count, sum(status5xx_count) as status5xx_count, sum(status400_count) as status400_count, sum(status429_count) as status429_count, sum(status500_count) as status500_count, " +
  176. "sum(total_time_milliseconds) as total_time_milliseconds, sum(total_ttfb_milliseconds) as total_ttfb_milliseconds, " +
  177. "sum(input_tokens) as input_tokens, sum(image_input_tokens) as image_input_tokens, sum(audio_input_tokens) as audio_input_tokens, sum(output_tokens) as output_tokens, " +
  178. "sum(cached_tokens) as cached_tokens, sum(cache_creation_tokens) as cache_creation_tokens, " +
  179. "sum(total_tokens) as total_tokens, sum(web_search_count) as web_search_count"
  180. query = query.
  181. Select(selectFields).
  182. Group("timestamp")
  183. var chartData []ChartData
  184. err := query.Find(&chartData).Error
  185. if err != nil {
  186. return nil, err
  187. }
  188. if len(chartData) > 0 && timeSpan != TimeSpanMinute {
  189. chartData = aggregateDataToSpan(chartData, timeSpan, timezone)
  190. }
  191. slices.SortFunc(chartData, func(a, b ChartData) int {
  192. return cmp.Compare(a.Timestamp, b.Timestamp)
  193. })
  194. return chartData, nil
  195. }
  196. func GetUsedChannelsMinute(start, end time.Time) ([]int, error) {
  197. return getLogGroupByValuesMinute[int]("channel_id", start, end)
  198. }
  199. func GetUsedModelsMinute(start, end time.Time) ([]string, error) {
  200. return getLogGroupByValuesMinute[string]("model", start, end)
  201. }
  202. func GetGroupUsedModelsMinute(group, tokenName string, start, end time.Time) ([]string, error) {
  203. return getGroupLogGroupByValuesMinute[string]("model", group, tokenName, start, end)
  204. }
  205. func GetGroupUsedTokenNamesMinute(group string, start, end time.Time) ([]string, error) {
  206. return getGroupLogGroupByValuesMinute[string]("token_name", group, "", start, end)
  207. }
  208. func getLogGroupByValuesMinute[T cmp.Ordered](
  209. field string,
  210. start, end time.Time,
  211. ) ([]T, error) {
  212. type Result struct {
  213. Value T
  214. UsedAmount float64
  215. RequestCount int64
  216. }
  217. var results []Result
  218. var query *gorm.DB
  219. query = LogDB.
  220. Model(&SummaryMinute{})
  221. switch {
  222. case !start.IsZero() && !end.IsZero():
  223. query = query.Where("minute_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  224. case !start.IsZero():
  225. query = query.Where("minute_timestamp >= ?", start.Unix())
  226. case !end.IsZero():
  227. query = query.Where("minute_timestamp <= ?", end.Unix())
  228. }
  229. err := query.
  230. Select(
  231. field + " as value, SUM(request_count) as request_count, SUM(used_amount) as used_amount",
  232. ).
  233. Group(field).
  234. Find(&results).Error
  235. if err != nil {
  236. return nil, err
  237. }
  238. slices.SortFunc(results, func(a, b Result) int {
  239. if a.UsedAmount != b.UsedAmount {
  240. return cmp.Compare(b.UsedAmount, a.UsedAmount)
  241. }
  242. if a.RequestCount != b.RequestCount {
  243. return cmp.Compare(b.RequestCount, a.RequestCount)
  244. }
  245. return cmp.Compare(a.Value, b.Value)
  246. })
  247. values := make([]T, len(results))
  248. for i, result := range results {
  249. values[i] = result.Value
  250. }
  251. return values, nil
  252. }
  253. func getGroupLogGroupByValuesMinute[T cmp.Ordered](
  254. field, group, tokenName string,
  255. start, end time.Time,
  256. ) ([]T, error) {
  257. type Result struct {
  258. Value T
  259. UsedAmount float64
  260. RequestCount int64
  261. }
  262. var results []Result
  263. query := LogDB.
  264. Model(&GroupSummaryMinute{})
  265. if group != "" {
  266. query = query.Where("group_id = ?", group)
  267. }
  268. if tokenName != "" {
  269. query = query.Where("token_name = ?", tokenName)
  270. }
  271. switch {
  272. case !start.IsZero() && !end.IsZero():
  273. query = query.Where("minute_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  274. case !start.IsZero():
  275. query = query.Where("minute_timestamp >= ?", start.Unix())
  276. case !end.IsZero():
  277. query = query.Where("minute_timestamp <= ?", end.Unix())
  278. }
  279. err := query.
  280. Select(
  281. field + " as value, SUM(request_count) as request_count, SUM(used_amount) as used_amount",
  282. ).
  283. Group(field).
  284. Find(&results).Error
  285. if err != nil {
  286. return nil, err
  287. }
  288. slices.SortFunc(results, func(a, b Result) int {
  289. if a.UsedAmount != b.UsedAmount {
  290. return cmp.Compare(b.UsedAmount, a.UsedAmount)
  291. }
  292. if a.RequestCount != b.RequestCount {
  293. return cmp.Compare(b.RequestCount, a.RequestCount)
  294. }
  295. return cmp.Compare(a.Value, b.Value)
  296. })
  297. values := make([]T, len(results))
  298. for i, result := range results {
  299. values[i] = result.Value
  300. }
  301. return values, nil
  302. }
  303. func getDashboardDataMinute(
  304. start,
  305. end time.Time,
  306. modelName string,
  307. channelID int,
  308. timeSpan TimeSpanType,
  309. timezone *time.Location,
  310. ) (*DashboardResponse, error) {
  311. if end.IsZero() {
  312. end = time.Now()
  313. } else if end.Before(start) {
  314. return nil, errors.New("end time is before start time")
  315. }
  316. var (
  317. chartData []ChartData
  318. channels []int
  319. models []string
  320. )
  321. g := new(errgroup.Group)
  322. g.Go(func() error {
  323. var err error
  324. chartData, err = getChartDataMinute(start, end, channelID, modelName, timeSpan, timezone)
  325. return err
  326. })
  327. g.Go(func() error {
  328. var err error
  329. channels, err = GetUsedChannelsMinute(start, end)
  330. return err
  331. })
  332. g.Go(func() error {
  333. var err error
  334. models, err = GetUsedModelsMinute(start, end)
  335. return err
  336. })
  337. if err := g.Wait(); err != nil {
  338. return nil, err
  339. }
  340. dashboardResponse := sumDashboardResponse(chartData)
  341. dashboardResponse.Channels = channels
  342. dashboardResponse.Models = models
  343. return &dashboardResponse, nil
  344. }
  345. func getGroupDashboardDataMinute(
  346. group string,
  347. start, end time.Time,
  348. tokenName string,
  349. modelName string,
  350. timeSpan TimeSpanType,
  351. timezone *time.Location,
  352. ) (*GroupDashboardResponse, error) {
  353. if group == "" {
  354. return nil, errors.New("group is required")
  355. }
  356. if end.IsZero() {
  357. end = time.Now()
  358. } else if end.Before(start) {
  359. return nil, errors.New("end time is before start time")
  360. }
  361. var (
  362. chartData []ChartData
  363. tokenNames []string
  364. models []string
  365. )
  366. g := new(errgroup.Group)
  367. g.Go(func() error {
  368. var err error
  369. chartData, err = getGroupChartDataMinute(
  370. group,
  371. start,
  372. end,
  373. tokenName,
  374. modelName,
  375. timeSpan,
  376. timezone,
  377. )
  378. return err
  379. })
  380. g.Go(func() error {
  381. var err error
  382. tokenNames, err = GetGroupUsedTokenNamesMinute(group, start, end)
  383. return err
  384. })
  385. g.Go(func() error {
  386. var err error
  387. models, err = GetGroupUsedModelsMinute(group, tokenName, start, end)
  388. return err
  389. })
  390. if err := g.Wait(); err != nil {
  391. return nil, err
  392. }
  393. dashboardResponse := sumDashboardResponse(chartData)
  394. dashboardResponse.Models = models
  395. return &GroupDashboardResponse{
  396. DashboardResponse: dashboardResponse,
  397. TokenNames: tokenNames,
  398. }, nil
  399. }
  400. type SummaryDataV2 struct {
  401. Timestamp int64 `json:"timestamp,omitempty"`
  402. ChannelID int `json:"channel_id,omitempty"`
  403. GroupID string `json:"group_id,omitempty"`
  404. TokenName string `json:"token_name,omitempty"`
  405. Model string `json:"model"`
  406. UsedAmount float64 `json:"used_amount"`
  407. TotalTimeMilliseconds int64 `json:"total_time_milliseconds"`
  408. TotalTTFBMilliseconds int64 `json:"total_ttfb_milliseconds"`
  409. Count
  410. Usage
  411. MaxRPM int64 `json:"max_rpm"`
  412. MaxTPM int64 `json:"max_tpm"`
  413. }
  414. type TimeSummaryDataV2 struct {
  415. Timestamp int64 `json:"timestamp"`
  416. Summary []SummaryDataV2 `json:"summary"`
  417. }
  418. func GetTimeSeriesModelData(
  419. channelID int,
  420. modelName string,
  421. start, end time.Time,
  422. timeSpan TimeSpanType,
  423. timezone *time.Location,
  424. ) ([]TimeSummaryDataV2, error) {
  425. if timeSpan == TimeSpanMinute {
  426. return getTimeSeriesModelDataMinute(channelID, modelName, start, end, timeSpan, timezone)
  427. }
  428. if end.IsZero() {
  429. end = time.Now()
  430. } else if end.Before(start) {
  431. return nil, errors.New("end time is before start time")
  432. }
  433. query := LogDB.Model(&Summary{})
  434. if channelID != 0 {
  435. query = query.Where("channel_id = ?", channelID)
  436. }
  437. if modelName != "" {
  438. query = query.Where("model = ?", modelName)
  439. }
  440. switch {
  441. case !start.IsZero() && !end.IsZero():
  442. query = query.Where("hour_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  443. case !start.IsZero():
  444. query = query.Where("hour_timestamp >= ?", start.Unix())
  445. case !end.IsZero():
  446. query = query.Where("hour_timestamp <= ?", end.Unix())
  447. }
  448. const selectFields = "hour_timestamp as timestamp, channel_id, model, " +
  449. "sum(used_amount) as used_amount, " +
  450. "sum(request_count) as request_count, sum(retry_count) as retry_count, sum(exception_count) as exception_count, sum(status4xx_count) as status4xx_count, sum(status5xx_count) as status5xx_count, sum(status400_count) as status400_count, sum(status429_count) as status429_count, sum(status500_count) as status500_count, " +
  451. "sum(total_time_milliseconds) as total_time_milliseconds, sum(total_ttfb_milliseconds) as total_ttfb_milliseconds, " +
  452. "sum(input_tokens) as input_tokens, sum(image_input_tokens) as image_input_tokens, sum(audio_input_tokens) as audio_input_tokens, " +
  453. "sum(output_tokens) as output_tokens, sum(cached_tokens) as cached_tokens, " +
  454. "sum(cache_creation_tokens) as cache_creation_tokens, sum(total_tokens) as total_tokens, " +
  455. "sum(web_search_count) as web_search_count"
  456. var rawData []SummaryDataV2
  457. err := query.
  458. Select(selectFields).
  459. Group("timestamp, channel_id, model").
  460. Find(&rawData).Error
  461. if err != nil {
  462. return nil, err
  463. }
  464. if len(rawData) > 0 {
  465. err = batchFillMaxValues(rawData, channelID, modelName, start, end)
  466. if err != nil {
  467. return nil, err
  468. }
  469. if timeSpan != TimeSpanHour {
  470. rawData = aggregatToSpan(rawData, timeSpan, timezone)
  471. }
  472. }
  473. result := convertToTimeModelData(rawData)
  474. slices.SortFunc(result, func(a, b TimeSummaryDataV2) int {
  475. return cmp.Compare(a.Timestamp, b.Timestamp)
  476. })
  477. return result, nil
  478. }
  479. func GetGroupTimeSeriesModelData(
  480. group string,
  481. tokenName string,
  482. modelName string,
  483. start, end time.Time,
  484. timeSpan TimeSpanType,
  485. timezone *time.Location,
  486. ) ([]TimeSummaryDataV2, error) {
  487. if timeSpan == TimeSpanMinute {
  488. return getGroupTimeSeriesModelDataMinute(
  489. group,
  490. tokenName,
  491. modelName,
  492. start,
  493. end,
  494. timeSpan,
  495. timezone,
  496. )
  497. }
  498. if end.IsZero() {
  499. end = time.Now()
  500. } else if end.Before(start) {
  501. return nil, errors.New("end time is before start time")
  502. }
  503. query := LogDB.Model(&GroupSummary{}).
  504. Where("group_id = ?", group)
  505. if tokenName != "" {
  506. query = query.Where("token_name = ?", tokenName)
  507. }
  508. if modelName != "" {
  509. query = query.Where("model = ?", modelName)
  510. }
  511. switch {
  512. case !start.IsZero() && !end.IsZero():
  513. query = query.Where("hour_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  514. case !start.IsZero():
  515. query = query.Where("hour_timestamp >= ?", start.Unix())
  516. case !end.IsZero():
  517. query = query.Where("hour_timestamp <= ?", end.Unix())
  518. }
  519. const selectFields = "hour_timestamp as timestamp, group_id, token_name, model, " +
  520. "sum(used_amount) as used_amount, " +
  521. "sum(request_count) as request_count, sum(exception_count) as exception_count, sum(status4xx_count) as status4xx_count, sum(status5xx_count) as status5xx_count, sum(status400_count) as status400_count, sum(status429_count) as status429_count, sum(status500_count) as status500_count, " +
  522. "sum(total_time_milliseconds) as total_time_milliseconds, sum(total_ttfb_milliseconds) as total_ttfb_milliseconds, " +
  523. "sum(input_tokens) as input_tokens, sum(image_input_tokens) as image_input_tokens, sum(audio_input_tokens) as audio_input_tokens, " +
  524. "sum(output_tokens) as output_tokens, sum(cached_tokens) as cached_tokens, " +
  525. "sum(cache_creation_tokens) as cache_creation_tokens, sum(total_tokens) as total_tokens, " +
  526. "sum(web_search_count) as web_search_count"
  527. var rawData []SummaryDataV2
  528. err := query.
  529. Select(selectFields).
  530. Group("timestamp, group_id, token_name, model").
  531. Find(&rawData).Error
  532. if err != nil {
  533. return nil, err
  534. }
  535. if len(rawData) > 0 {
  536. err = batchFillGroupMaxValues(rawData, group, tokenName, modelName, start, end)
  537. if err != nil {
  538. return nil, err
  539. }
  540. if timeSpan != TimeSpanHour {
  541. rawData = aggregatToSpan(rawData, timeSpan, timezone)
  542. }
  543. }
  544. result := convertToTimeModelData(rawData)
  545. slices.SortFunc(result, func(a, b TimeSummaryDataV2) int {
  546. return cmp.Compare(a.Timestamp, b.Timestamp)
  547. })
  548. return result, nil
  549. }
  550. func batchFillMaxValues(
  551. rawData []SummaryDataV2,
  552. channelID int,
  553. modelName string,
  554. start, end time.Time,
  555. ) error {
  556. minuteQuery := LogDB.Model(&SummaryMinute{})
  557. if channelID != 0 {
  558. minuteQuery = minuteQuery.Where("channel_id = ?", channelID)
  559. }
  560. if modelName != "" {
  561. minuteQuery = minuteQuery.Where("model = ?", modelName)
  562. }
  563. minuteStart := start.Unix()
  564. minuteEnd := end.Unix()
  565. if end.IsZero() {
  566. minuteEnd = time.Now().Unix()
  567. }
  568. minuteQuery = minuteQuery.Where(
  569. "minute_timestamp >= ? AND minute_timestamp <= ?",
  570. minuteStart,
  571. minuteEnd,
  572. )
  573. type MaxResult struct {
  574. HourTimestamp int64 `json:"hour_timestamp"`
  575. ChannelID int `json:"channel_id"`
  576. Model string `json:"model"`
  577. MaxRPM int64 `json:"max_rpm"`
  578. MaxTPM int64 `json:"max_tpm"`
  579. }
  580. var maxResults []MaxResult
  581. err := minuteQuery.
  582. Select(`
  583. (minute_timestamp - minute_timestamp % 3600) as hour_timestamp,
  584. channel_id,
  585. model,
  586. MAX(request_count) as max_rpm,
  587. MAX(total_tokens) as max_tpm
  588. `).
  589. Group("hour_timestamp, channel_id, model").
  590. Find(&maxResults).Error
  591. if err != nil {
  592. return err
  593. }
  594. type Key struct {
  595. HourTimestamp int64
  596. ChannelID int
  597. Model string
  598. }
  599. maxMap := make(map[Key]MaxResult)
  600. for _, result := range maxResults {
  601. key := Key{
  602. HourTimestamp: result.HourTimestamp,
  603. ChannelID: result.ChannelID,
  604. Model: result.Model,
  605. }
  606. maxMap[key] = result
  607. }
  608. for i := range rawData {
  609. data := &rawData[i]
  610. key := Key{
  611. HourTimestamp: data.Timestamp,
  612. ChannelID: data.ChannelID,
  613. Model: data.Model,
  614. }
  615. if maxResult, exists := maxMap[key]; exists {
  616. data.MaxRPM = maxResult.MaxRPM
  617. data.MaxTPM = maxResult.MaxTPM
  618. }
  619. }
  620. return nil
  621. }
  622. func batchFillGroupMaxValues(
  623. rawData []SummaryDataV2,
  624. group, tokenName, modelName string,
  625. start, end time.Time,
  626. ) error {
  627. minuteQuery := LogDB.Model(&GroupSummaryMinute{}).
  628. Where("group_id = ?", group)
  629. if tokenName != "" {
  630. minuteQuery = minuteQuery.Where("token_name = ?", tokenName)
  631. }
  632. if modelName != "" {
  633. minuteQuery = minuteQuery.Where("model = ?", modelName)
  634. }
  635. minuteStart := start.Unix()
  636. minuteEnd := end.Unix()
  637. if end.IsZero() {
  638. minuteEnd = time.Now().Unix()
  639. }
  640. minuteQuery = minuteQuery.Where(
  641. "minute_timestamp >= ? AND minute_timestamp <= ?",
  642. minuteStart,
  643. minuteEnd,
  644. )
  645. type MaxResult struct {
  646. HourTimestamp int64 `json:"hour_timestamp"`
  647. GroupID string `json:"group_id"`
  648. TokenName string `json:"token_name"`
  649. Model string `json:"model"`
  650. MaxRPM int64 `json:"max_rpm"`
  651. MaxTPM int64 `json:"max_tpm"`
  652. }
  653. var maxResults []MaxResult
  654. err := minuteQuery.
  655. Select(`
  656. (minute_timestamp - minute_timestamp % 3600) as hour_timestamp,
  657. group_id,
  658. token_name,
  659. model,
  660. MAX(request_count) as max_rpm,
  661. MAX(total_tokens) as max_tpm
  662. `).
  663. Group("hour_timestamp, group_id, token_name, model").
  664. Find(&maxResults).Error
  665. if err != nil {
  666. return err
  667. }
  668. type Key struct {
  669. HourTimestamp int64
  670. GroupID string
  671. TokenName string
  672. Model string
  673. }
  674. maxMap := make(map[Key]MaxResult)
  675. for _, result := range maxResults {
  676. key := Key{
  677. HourTimestamp: result.HourTimestamp,
  678. GroupID: result.GroupID,
  679. TokenName: result.TokenName,
  680. Model: result.Model,
  681. }
  682. maxMap[key] = result
  683. }
  684. for i := range rawData {
  685. data := &rawData[i]
  686. key := Key{
  687. HourTimestamp: data.Timestamp,
  688. GroupID: data.GroupID,
  689. TokenName: data.TokenName,
  690. Model: data.Model,
  691. }
  692. if maxResult, exists := maxMap[key]; exists {
  693. data.MaxRPM = maxResult.MaxRPM
  694. data.MaxTPM = maxResult.MaxTPM
  695. }
  696. }
  697. return nil
  698. }
  699. func getTimeSeriesModelDataMinute(
  700. channelID int,
  701. modelName string,
  702. start, end time.Time,
  703. timeSpan TimeSpanType,
  704. timezone *time.Location,
  705. ) ([]TimeSummaryDataV2, error) {
  706. if end.IsZero() {
  707. end = time.Now()
  708. } else if end.Before(start) {
  709. return nil, errors.New("end time is before start time")
  710. }
  711. query := LogDB.Model(&SummaryMinute{})
  712. if channelID != 0 {
  713. query = query.Where("channel_id = ?", channelID)
  714. }
  715. if modelName != "" {
  716. query = query.Where("model = ?", modelName)
  717. }
  718. switch {
  719. case !start.IsZero() && !end.IsZero():
  720. query = query.Where("minute_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  721. case !start.IsZero():
  722. query = query.Where("minute_timestamp >= ?", start.Unix())
  723. case !end.IsZero():
  724. query = query.Where("minute_timestamp <= ?", end.Unix())
  725. }
  726. const selectFields = "minute_timestamp as timestamp, channel_id, model, " +
  727. "sum(used_amount) as used_amount, " +
  728. "sum(request_count) as request_count, sum(retry_count) as retry_count, sum(exception_count) as exception_count, sum(status4xx_count) as status4xx_count, sum(status5xx_count) as status5xx_count, sum(status400_count) as status400_count, sum(status429_count) as status429_count, sum(status500_count) as status500_count, " +
  729. "sum(total_time_milliseconds) as total_time_milliseconds, sum(total_ttfb_milliseconds) as total_ttfb_milliseconds, " +
  730. "sum(input_tokens) as input_tokens, sum(image_input_tokens) as image_input_tokens, sum(audio_input_tokens) as audio_input_tokens, " +
  731. "sum(output_tokens) as output_tokens, sum(cached_tokens) as cached_tokens, " +
  732. "sum(cache_creation_tokens) as cache_creation_tokens, sum(total_tokens) as total_tokens, " +
  733. "sum(web_search_count) as web_search_count"
  734. var rawData []SummaryDataV2
  735. err := query.
  736. Select(selectFields).
  737. Group("timestamp, channel_id, model").
  738. Find(&rawData).Error
  739. if err != nil {
  740. return nil, err
  741. }
  742. for i, data := range rawData {
  743. rawData[i].MaxRPM = data.RequestCount
  744. rawData[i].MaxTPM = int64(data.TotalTokens)
  745. }
  746. if len(rawData) > 0 && timeSpan != TimeSpanMinute {
  747. rawData = aggregatToSpan(rawData, timeSpan, timezone)
  748. }
  749. result := convertToTimeModelData(rawData)
  750. slices.SortFunc(result, func(a, b TimeSummaryDataV2) int {
  751. return cmp.Compare(a.Timestamp, b.Timestamp)
  752. })
  753. return result, nil
  754. }
  755. func getGroupTimeSeriesModelDataMinute(
  756. group string,
  757. tokenName string,
  758. modelName string,
  759. start, end time.Time,
  760. timeSpan TimeSpanType,
  761. timezone *time.Location,
  762. ) ([]TimeSummaryDataV2, error) {
  763. if end.IsZero() {
  764. end = time.Now()
  765. } else if end.Before(start) {
  766. return nil, errors.New("end time is before start time")
  767. }
  768. query := LogDB.Model(&GroupSummaryMinute{}).
  769. Where("group_id = ?", group)
  770. if tokenName != "" {
  771. query = query.Where("token_name = ?", tokenName)
  772. }
  773. if modelName != "" {
  774. query = query.Where("model = ?", modelName)
  775. }
  776. switch {
  777. case !start.IsZero() && !end.IsZero():
  778. query = query.Where("minute_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  779. case !start.IsZero():
  780. query = query.Where("minute_timestamp >= ?", start.Unix())
  781. case !end.IsZero():
  782. query = query.Where("minute_timestamp <= ?", end.Unix())
  783. }
  784. const selectFields = "minute_timestamp as timestamp, group_id, token_name, model, " +
  785. "sum(used_amount) as used_amount, " +
  786. "sum(request_count) as request_count, sum(exception_count) as exception_count, sum(status4xx_count) as status4xx_count, sum(status5xx_count) as status5xx_count, sum(status400_count) as status400_count, sum(status429_count) as status429_count, sum(status500_count) as status500_count, " +
  787. "sum(total_time_milliseconds) as total_time_milliseconds, sum(total_ttfb_milliseconds) as total_ttfb_milliseconds, " +
  788. "sum(input_tokens) as input_tokens, sum(image_input_tokens) as image_input_tokens, sum(audio_input_tokens) as audio_input_tokens, " +
  789. "sum(output_tokens) as output_tokens, sum(cached_tokens) as cached_tokens, " +
  790. "sum(cache_creation_tokens) as cache_creation_tokens, sum(total_tokens) as total_tokens, " +
  791. "sum(web_search_count) as web_search_count"
  792. var rawData []SummaryDataV2
  793. err := query.
  794. Select(selectFields).
  795. Group("timestamp, group_id, token_name, model").
  796. Find(&rawData).Error
  797. if err != nil {
  798. return nil, err
  799. }
  800. for i, data := range rawData {
  801. rawData[i].MaxRPM = data.RequestCount
  802. rawData[i].MaxTPM = int64(data.TotalTokens)
  803. }
  804. if len(rawData) > 0 && timeSpan != TimeSpanMinute {
  805. rawData = aggregatToSpanGroup(rawData, timeSpan, timezone)
  806. }
  807. result := convertToTimeModelData(rawData)
  808. slices.SortFunc(result, func(a, b TimeSummaryDataV2) int {
  809. return cmp.Compare(a.Timestamp, b.Timestamp)
  810. })
  811. return result, nil
  812. }
  813. func aggregatToSpan(
  814. minuteData []SummaryDataV2,
  815. timeSpan TimeSpanType,
  816. timezone *time.Location,
  817. ) []SummaryDataV2 {
  818. if timezone == nil {
  819. timezone = time.Local
  820. }
  821. type AggKey struct {
  822. Timestamp int64
  823. ChannelID int
  824. Model string
  825. }
  826. dataMap := make(map[AggKey]SummaryDataV2)
  827. for _, data := range minuteData {
  828. t := time.Unix(data.Timestamp, 0).In(timezone)
  829. key := AggKey{
  830. ChannelID: data.ChannelID,
  831. Model: data.Model,
  832. }
  833. switch timeSpan {
  834. case TimeSpanMonth:
  835. startOfMonth := time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, timezone)
  836. key.Timestamp = startOfMonth.Unix()
  837. case TimeSpanDay:
  838. startOfDay := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, timezone)
  839. key.Timestamp = startOfDay.Unix()
  840. case TimeSpanHour:
  841. startOfHour := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, timezone)
  842. key.Timestamp = startOfHour.Unix()
  843. case TimeSpanMinute:
  844. fallthrough
  845. default:
  846. startOfMinute := time.Date(
  847. t.Year(),
  848. t.Month(),
  849. t.Day(),
  850. t.Hour(),
  851. t.Minute(),
  852. 0,
  853. 0,
  854. timezone,
  855. )
  856. key.Timestamp = startOfMinute.Unix()
  857. }
  858. currentData, exists := dataMap[key]
  859. if !exists {
  860. currentData = SummaryDataV2{
  861. Timestamp: key.Timestamp,
  862. ChannelID: data.ChannelID,
  863. Model: data.Model,
  864. }
  865. }
  866. currentData.Count.Add(data.Count)
  867. currentData.Usage.Add(data.Usage)
  868. currentData.UsedAmount = decimal.
  869. NewFromFloat(currentData.UsedAmount).
  870. Add(decimal.NewFromFloat(data.UsedAmount)).
  871. InexactFloat64()
  872. currentData.TotalTimeMilliseconds += data.TotalTimeMilliseconds
  873. currentData.TotalTTFBMilliseconds += data.TotalTTFBMilliseconds
  874. if data.MaxRPM > currentData.MaxRPM {
  875. currentData.MaxRPM = data.MaxRPM
  876. }
  877. if data.MaxTPM > currentData.MaxTPM {
  878. currentData.MaxTPM = data.MaxTPM
  879. }
  880. dataMap[key] = currentData
  881. }
  882. result := make([]SummaryDataV2, 0, len(dataMap))
  883. for _, data := range dataMap {
  884. result = append(result, data)
  885. }
  886. return result
  887. }
  888. func aggregatToSpanGroup(
  889. minuteData []SummaryDataV2,
  890. timeSpan TimeSpanType,
  891. timezone *time.Location,
  892. ) []SummaryDataV2 {
  893. if timezone == nil {
  894. timezone = time.Local
  895. }
  896. type AggKey struct {
  897. Timestamp int64
  898. GroupID string
  899. TokenName string
  900. Model string
  901. }
  902. dataMap := make(map[AggKey]SummaryDataV2)
  903. for _, data := range minuteData {
  904. t := time.Unix(data.Timestamp, 0).In(timezone)
  905. key := AggKey{
  906. GroupID: data.GroupID,
  907. TokenName: data.TokenName,
  908. Model: data.Model,
  909. }
  910. switch timeSpan {
  911. case TimeSpanMonth:
  912. startOfMonth := time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, timezone)
  913. key.Timestamp = startOfMonth.Unix()
  914. case TimeSpanDay:
  915. startOfDay := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, timezone)
  916. key.Timestamp = startOfDay.Unix()
  917. case TimeSpanHour:
  918. startOfHour := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, timezone)
  919. key.Timestamp = startOfHour.Unix()
  920. case TimeSpanMinute:
  921. fallthrough
  922. default:
  923. startOfMinute := time.Date(
  924. t.Year(),
  925. t.Month(),
  926. t.Day(),
  927. t.Hour(),
  928. t.Minute(),
  929. 0,
  930. 0,
  931. timezone,
  932. )
  933. key.Timestamp = startOfMinute.Unix()
  934. }
  935. currentData, exists := dataMap[key]
  936. if !exists {
  937. currentData = SummaryDataV2{
  938. Timestamp: key.Timestamp,
  939. GroupID: data.GroupID,
  940. TokenName: data.TokenName,
  941. Model: data.Model,
  942. }
  943. }
  944. currentData.Count.Add(data.Count)
  945. currentData.Usage.Add(data.Usage)
  946. currentData.UsedAmount = decimal.
  947. NewFromFloat(currentData.UsedAmount).
  948. Add(decimal.NewFromFloat(data.UsedAmount)).
  949. InexactFloat64()
  950. currentData.TotalTimeMilliseconds += data.TotalTimeMilliseconds
  951. currentData.TotalTTFBMilliseconds += data.TotalTTFBMilliseconds
  952. if data.MaxRPM > currentData.MaxRPM {
  953. currentData.MaxRPM = data.MaxRPM
  954. }
  955. if data.MaxTPM > currentData.MaxTPM {
  956. currentData.MaxTPM = data.MaxTPM
  957. }
  958. dataMap[key] = currentData
  959. }
  960. result := make([]SummaryDataV2, 0, len(dataMap))
  961. for _, data := range dataMap {
  962. result = append(result, data)
  963. }
  964. return result
  965. }
  966. func convertToTimeModelData(rawData []SummaryDataV2) []TimeSummaryDataV2 {
  967. timeMap := make(map[int64][]SummaryDataV2)
  968. for _, data := range rawData {
  969. timeMap[data.Timestamp] = append(timeMap[data.Timestamp], data)
  970. }
  971. result := make([]TimeSummaryDataV2, 0, len(timeMap))
  972. for timestamp, models := range timeMap {
  973. slices.SortFunc(models, func(a, b SummaryDataV2) int {
  974. if a.UsedAmount != b.UsedAmount {
  975. return cmp.Compare(b.UsedAmount, a.UsedAmount)
  976. }
  977. if a.TotalTokens != b.TotalTokens {
  978. return cmp.Compare(b.TotalTokens, a.TotalTokens)
  979. }
  980. if a.RequestCount != b.RequestCount {
  981. return cmp.Compare(b.RequestCount, a.RequestCount)
  982. }
  983. return cmp.Compare(a.Model, b.Model)
  984. })
  985. result = append(result, TimeSummaryDataV2{
  986. Timestamp: timestamp,
  987. Summary: models,
  988. })
  989. }
  990. return result
  991. }