summary.go 26 KB


  1. package model
  2. import (
  3. "cmp"
  4. "errors"
  5. "fmt"
  6. "slices"
  7. "time"
  8. "github.com/shopspring/decimal"
  9. "golang.org/x/sync/errgroup"
  10. "gorm.io/gorm"
  11. "gorm.io/gorm/clause"
  12. )
// Summary is one row of the hourly usage aggregate stored in the summaries
// table, keyed by channel, model and hour (Unique). UpsertSummary adds new
// data into an existing row or creates it.
//
// NOTE(review): the original comment read "only summary result only
// requests"; presumably only requests that produced a final result are
// summarized here — confirm against the callers of UpsertSummary.
type Summary struct {
	ID     int           `gorm:"primaryKey"`
	Unique SummaryUnique `gorm:"embedded"`
	Data   SummaryData   `gorm:"embedded"`
}
// SummaryUnique is the natural key of a Summary row. Its three columns form
// the composite unique index idx_summary_unique (channel_id, model,
// hour_timestamp DESC), which is also the ON CONFLICT target used by
// createSummary.
type SummaryUnique struct {
	ChannelID int    `gorm:"not null;uniqueIndex:idx_summary_unique,priority:1"`
	Model     string `gorm:"not null;uniqueIndex:idx_summary_unique,priority:2"`
	// HourTimestamp is Unix seconds and must be an exact multiple of one
	// hour (enforced by validateHourTimestamp).
	HourTimestamp int64 `gorm:"not null;uniqueIndex:idx_summary_unique,priority:3,sort:desc"`
}
// SummaryData holds the accumulated values of a Summary row. On upsert the
// counters and usage fields are added to the stored values, while the Max*
// fields only replace the stored value when larger (see buildUpdateData).
type SummaryData struct {
	RequestCount   int64   `json:"request_count"`
	UsedAmount     float64 `json:"used_amount"`
	ExceptionCount int64   `json:"exception_count"`
	// Peak rates (presumably requests/tokens per minute/second — confirm
	// with the producer); merged with keep-the-larger semantics.
	MaxRPM int64 `json:"max_rpm,omitempty"`
	MaxRPS int64 `json:"max_rps,omitempty"`
	MaxTPM int64 `json:"max_tpm,omitempty"`
	MaxTPS int64 `json:"max_tps,omitempty"`
	// NOTE(review): encoding/json ignores omitempty on struct-typed fields,
	// so "usage" is always emitted.
	Usage Usage `json:"usage,omitempty" gorm:"embedded"`
}
  34. func (d *SummaryData) buildUpdateData(tableName string) map[string]any {
  35. data := map[string]any{}
  36. if d.RequestCount > 0 {
  37. data["request_count"] = gorm.Expr(tableName+".request_count + ?", d.RequestCount)
  38. }
  39. if d.UsedAmount > 0 {
  40. data["used_amount"] = gorm.Expr(tableName+".used_amount + ?", d.UsedAmount)
  41. }
  42. if d.ExceptionCount > 0 {
  43. data["exception_count"] = gorm.Expr(tableName+".exception_count + ?", d.ExceptionCount)
  44. }
  45. // max rpm tpm update
  46. if d.MaxRPM > 0 {
  47. data["max_rpm"] = gorm.Expr(
  48. fmt.Sprintf(
  49. "CASE WHEN %s.max_rpm < ? THEN ? ELSE %s.max_rpm END",
  50. tableName,
  51. tableName,
  52. ),
  53. d.MaxRPM,
  54. d.MaxRPM,
  55. )
  56. }
  57. if d.MaxRPS > 0 {
  58. data["max_rps"] = gorm.Expr(
  59. fmt.Sprintf(
  60. "CASE WHEN %s.max_rps < ? THEN ? ELSE %s.max_rps END",
  61. tableName,
  62. tableName,
  63. ),
  64. d.MaxRPS,
  65. d.MaxRPS,
  66. )
  67. }
  68. if d.MaxTPM > 0 {
  69. data["max_tpm"] = gorm.Expr(
  70. fmt.Sprintf(
  71. "CASE WHEN %s.max_tpm < ? THEN ? ELSE %s.max_tpm END",
  72. tableName,
  73. tableName,
  74. ),
  75. d.MaxTPM,
  76. d.MaxTPM,
  77. )
  78. }
  79. if d.MaxTPS > 0 {
  80. data["max_tps"] = gorm.Expr(
  81. fmt.Sprintf(
  82. "CASE WHEN %s.max_tps < ? THEN ? ELSE %s.max_tps END",
  83. tableName,
  84. tableName,
  85. ),
  86. d.MaxTPS,
  87. d.MaxTPS,
  88. )
  89. }
  90. // usage update
  91. if d.Usage.InputTokens > 0 {
  92. data["input_tokens"] = gorm.Expr(
  93. fmt.Sprintf("COALESCE(%s.input_tokens, 0) + ?", tableName),
  94. d.Usage.InputTokens,
  95. )
  96. }
  97. if d.Usage.ImageInputTokens > 0 {
  98. data["image_input_tokens"] = gorm.Expr(
  99. fmt.Sprintf("COALESCE(%s.image_input_tokens, 0) + ?", tableName),
  100. d.Usage.ImageInputTokens,
  101. )
  102. }
  103. if d.Usage.OutputTokens > 0 {
  104. data["output_tokens"] = gorm.Expr(
  105. fmt.Sprintf("COALESCE(%s.output_tokens, 0) + ?", tableName),
  106. d.Usage.OutputTokens,
  107. )
  108. }
  109. if d.Usage.TotalTokens > 0 {
  110. data["total_tokens"] = gorm.Expr(
  111. fmt.Sprintf("COALESCE(%s.total_tokens, 0) + ?", tableName),
  112. d.Usage.TotalTokens,
  113. )
  114. }
  115. if d.Usage.CachedTokens > 0 {
  116. data["cached_tokens"] = gorm.Expr(
  117. fmt.Sprintf("COALESCE(%s.cached_tokens, 0) + ?", tableName),
  118. d.Usage.CachedTokens,
  119. )
  120. }
  121. if d.Usage.CacheCreationTokens > 0 {
  122. data["cache_creation_tokens"] = gorm.Expr(
  123. fmt.Sprintf("COALESCE(%s.cache_creation_tokens, 0) + ?", tableName),
  124. d.Usage.CacheCreationTokens,
  125. )
  126. }
  127. if d.Usage.WebSearchCount > 0 {
  128. data["web_search_count"] = gorm.Expr(
  129. fmt.Sprintf("COALESCE(%s.web_search_count, 0) + ?", tableName),
  130. d.Usage.WebSearchCount,
  131. )
  132. }
  133. return data
  134. }
  135. func (l *Summary) BeforeCreate(_ *gorm.DB) (err error) {
  136. if l.Unique.ChannelID == 0 {
  137. return errors.New("channel id is required")
  138. }
  139. if l.Unique.Model == "" {
  140. return errors.New("model is required")
  141. }
  142. if l.Unique.HourTimestamp == 0 {
  143. return errors.New("hour timestamp is required")
  144. }
  145. if err := validateHourTimestamp(l.Unique.HourTimestamp); err != nil {
  146. return err
  147. }
  148. return
  149. }
  150. var hourTimestampDivisor = int64(time.Hour.Seconds())
  151. func validateHourTimestamp(hourTimestamp int64) error {
  152. if hourTimestamp%hourTimestampDivisor != 0 {
  153. return errors.New("hour timestamp must be an exact hour")
  154. }
  155. return nil
  156. }
  157. func CreateSummaryIndexs(db *gorm.DB) error {
  158. indexes := []string{
  159. "CREATE INDEX IF NOT EXISTS idx_summary_channel_hour ON summaries (channel_id, hour_timestamp DESC)",
  160. "CREATE INDEX IF NOT EXISTS idx_summary_model_hour ON summaries (model, hour_timestamp DESC)",
  161. }
  162. for _, index := range indexes {
  163. if err := db.Exec(index).Error; err != nil {
  164. return err
  165. }
  166. }
  167. return nil
  168. }
  169. func UpsertSummary(unique SummaryUnique, data SummaryData) error {
  170. err := validateHourTimestamp(unique.HourTimestamp)
  171. if err != nil {
  172. return err
  173. }
  174. for range 3 {
  175. result := LogDB.
  176. Model(&Summary{}).
  177. Where(
  178. "channel_id = ? AND model = ? AND hour_timestamp = ?",
  179. unique.ChannelID,
  180. unique.Model,
  181. unique.HourTimestamp,
  182. ).
  183. Updates(data.buildUpdateData("summaries"))
  184. err = result.Error
  185. if err != nil {
  186. return err
  187. }
  188. if result.RowsAffected > 0 {
  189. return nil
  190. }
  191. err = createSummary(unique, data)
  192. if err == nil {
  193. return nil
  194. }
  195. if !errors.Is(err, gorm.ErrDuplicatedKey) {
  196. return err
  197. }
  198. }
  199. return err
  200. }
  201. func createSummary(unique SummaryUnique, data SummaryData) error {
  202. return LogDB.
  203. Clauses(clause.OnConflict{
  204. Columns: []clause.Column{
  205. {Name: "channel_id"},
  206. {Name: "model"},
  207. {Name: "hour_timestamp"},
  208. },
  209. DoUpdates: clause.Assignments(data.buildUpdateData("summaries")),
  210. }).
  211. Create(&Summary{
  212. Unique: unique,
  213. Data: data,
  214. }).Error
  215. }
  216. func getChartData(
  217. start, end time.Time,
  218. channelID int,
  219. modelName string,
  220. timeSpan TimeSpanType,
  221. timezone *time.Location,
  222. ) ([]*ChartData, error) {
  223. query := LogDB.Model(&Summary{})
  224. if channelID != 0 {
  225. query = query.Where("channel_id = ?", channelID)
  226. }
  227. if modelName != "" {
  228. query = query.Where("model = ?", modelName)
  229. }
  230. switch {
  231. case !start.IsZero() && !end.IsZero():
  232. query = query.Where("hour_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  233. case !start.IsZero():
  234. query = query.Where("hour_timestamp >= ?", start.Unix())
  235. case !end.IsZero():
  236. query = query.Where("hour_timestamp <= ?", end.Unix())
  237. }
  238. // Only include max metrics when we have specific channel and model
  239. selectFields := "hour_timestamp as timestamp, sum(request_count) as request_count, sum(used_amount) as used_amount, " +
  240. "sum(exception_count) as exception_count, sum(input_tokens) as input_tokens, sum(output_tokens) as output_tokens, " +
  241. "sum(cached_tokens) as cached_tokens, sum(cache_creation_tokens) as cache_creation_tokens, " +
  242. "sum(total_tokens) as total_tokens, sum(web_search_count) as web_search_count"
  243. // Only include max metrics when querying for a specific channel and model
  244. if channelID != 0 && modelName != "" {
  245. selectFields += ", max(max_rpm) as max_rpm, max(max_rps) as max_rps, max(max_tpm) as max_tpm, max(max_tps) as max_tps"
  246. } else {
  247. // Set max metrics to 0 when not querying for specific channel and model
  248. selectFields += ", 0 as max_rpm, 0 as max_rps, 0 as max_tpm, 0 as max_tps"
  249. }
  250. query = query.
  251. Select(selectFields).
  252. Group("timestamp").
  253. Order("timestamp ASC")
  254. var chartData []*ChartData
  255. err := query.Scan(&chartData).Error
  256. if err != nil {
  257. return nil, err
  258. }
  259. // If timeSpan is day, aggregate hour data into day data
  260. if timeSpan == TimeSpanDay && len(chartData) > 0 {
  261. return aggregateHourDataToDay(chartData, timezone), nil
  262. }
  263. return chartData, nil
  264. }
  265. func getGroupChartData(
  266. group string,
  267. start, end time.Time,
  268. tokenName, modelName string,
  269. timeSpan TimeSpanType,
  270. timezone *time.Location,
  271. ) ([]*ChartData, error) {
  272. query := LogDB.Model(&GroupSummary{})
  273. if group != "" {
  274. query = query.Where("group_id = ?", group)
  275. }
  276. if tokenName != "" {
  277. query = query.Where("token_name = ?", tokenName)
  278. }
  279. if modelName != "" {
  280. query = query.Where("model = ?", modelName)
  281. }
  282. switch {
  283. case !start.IsZero() && !end.IsZero():
  284. query = query.Where("hour_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  285. case !start.IsZero():
  286. query = query.Where("hour_timestamp >= ?", start.Unix())
  287. case !end.IsZero():
  288. query = query.Where("hour_timestamp <= ?", end.Unix())
  289. }
  290. // Only include max metrics when we have specific channel and model
  291. selectFields := "hour_timestamp as timestamp, sum(request_count) as request_count, sum(used_amount) as used_amount, " +
  292. "sum(exception_count) as exception_count, sum(input_tokens) as input_tokens, sum(output_tokens) as output_tokens, " +
  293. "sum(cached_tokens) as cached_tokens, sum(cache_creation_tokens) as cache_creation_tokens, " +
  294. "sum(total_tokens) as total_tokens, sum(web_search_count) as web_search_count"
  295. // Only include max metrics when querying for a specific channel and model
  296. if group != "" && tokenName != "" && modelName != "" {
  297. selectFields += ", max(max_rpm) as max_rpm, max(max_rps) as max_rps, max(max_tpm) as max_tpm, max(max_tps) as max_tps"
  298. } else {
  299. // Set max metrics to 0 when not querying for specific channel and model
  300. selectFields += ", 0 as max_rpm, 0 as max_rps, 0 as max_tpm, 0 as max_tps"
  301. }
  302. query = query.
  303. Select(selectFields).
  304. Group("timestamp").
  305. Order("timestamp ASC")
  306. var chartData []*ChartData
  307. err := query.Scan(&chartData).Error
  308. if err != nil {
  309. return nil, err
  310. }
  311. // If timeSpan is day, aggregate hour data into day data
  312. if timeSpan == TimeSpanDay && len(chartData) > 0 {
  313. return aggregateHourDataToDay(chartData, timezone), nil
  314. }
  315. return chartData, nil
  316. }
  317. func GetUsedChannels(start, end time.Time) ([]int, error) {
  318. return getLogGroupByValues[int]("channel_id", start, end)
  319. }
  320. func GetUsedModels(start, end time.Time) ([]string, error) {
  321. return getLogGroupByValues[string]("model", start, end)
  322. }
  323. func GetGroupUsedModels(group, tokenName string, start, end time.Time) ([]string, error) {
  324. return getGroupLogGroupByValues[string]("model", group, tokenName, start, end)
  325. }
  326. func GetGroupUsedTokenNames(group string, start, end time.Time) ([]string, error) {
  327. return getGroupLogGroupByValues[string]("token_name", group, "", start, end)
  328. }
  329. func getLogGroupByValues[T cmp.Ordered](
  330. field string,
  331. start, end time.Time,
  332. ) ([]T, error) {
  333. type Result struct {
  334. Value T
  335. UsedAmount float64
  336. RequestCount int64
  337. }
  338. var results []Result
  339. var query *gorm.DB
  340. query = LogDB.
  341. Model(&Summary{})
  342. switch {
  343. case !start.IsZero() && !end.IsZero():
  344. query = query.Where("hour_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  345. case !start.IsZero():
  346. query = query.Where("hour_timestamp >= ?", start.Unix())
  347. case !end.IsZero():
  348. query = query.Where("hour_timestamp <= ?", end.Unix())
  349. }
  350. err := query.
  351. Select(
  352. field + " as value, SUM(request_count) as request_count, SUM(used_amount) as used_amount",
  353. ).
  354. Group(field).
  355. Scan(&results).Error
  356. if err != nil {
  357. return nil, err
  358. }
  359. slices.SortFunc(results, func(a, b Result) int {
  360. if a.UsedAmount != b.UsedAmount {
  361. return cmp.Compare(b.UsedAmount, a.UsedAmount)
  362. }
  363. if a.RequestCount != b.RequestCount {
  364. return cmp.Compare(b.RequestCount, a.RequestCount)
  365. }
  366. return cmp.Compare(a.Value, b.Value)
  367. })
  368. values := make([]T, len(results))
  369. for i, result := range results {
  370. values[i] = result.Value
  371. }
  372. return values, nil
  373. }
  374. func getGroupLogGroupByValues[T cmp.Ordered](
  375. field, group, tokenName string,
  376. start, end time.Time,
  377. ) ([]T, error) {
  378. type Result struct {
  379. Value T
  380. UsedAmount float64
  381. RequestCount int64
  382. }
  383. var results []Result
  384. query := LogDB.
  385. Model(&GroupSummary{})
  386. if group != "" {
  387. query = query.Where("group_id = ?", group)
  388. }
  389. if tokenName != "" {
  390. query = query.Where("token_name = ?", tokenName)
  391. }
  392. switch {
  393. case !start.IsZero() && !end.IsZero():
  394. query = query.Where("hour_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  395. case !start.IsZero():
  396. query = query.Where("hour_timestamp >= ?", start.Unix())
  397. case !end.IsZero():
  398. query = query.Where("hour_timestamp <= ?", end.Unix())
  399. }
  400. err := query.
  401. Select(
  402. field + " as value, SUM(request_count) as request_count, SUM(used_amount) as used_amount",
  403. ).
  404. Group(field).
  405. Scan(&results).Error
  406. if err != nil {
  407. return nil, err
  408. }
  409. slices.SortFunc(results, func(a, b Result) int {
  410. if a.UsedAmount != b.UsedAmount {
  411. return cmp.Compare(b.UsedAmount, a.UsedAmount)
  412. }
  413. if a.RequestCount != b.RequestCount {
  414. return cmp.Compare(b.RequestCount, a.RequestCount)
  415. }
  416. return cmp.Compare(a.Value, b.Value)
  417. })
  418. values := make([]T, len(results))
  419. for i, result := range results {
  420. values[i] = result.Value
  421. }
  422. return values, nil
  423. }
// ChartData is one point of a dashboard time series. Timestamp is the Unix
// time (seconds) at which the bucket starts. For hourly data that is the
// summary hour; after aggregateHourDataToDay it is local midnight.
// Counters are sums over the bucket. The Max* fields are peak values and
// are reported as 0 unless the query selected them (see getChartData).
type ChartData struct {
	Timestamp           int64   `json:"timestamp"`
	RequestCount        int64   `json:"request_count"`
	UsedAmount          float64 `json:"used_amount"`
	InputTokens         int64   `json:"input_tokens,omitempty"`
	OutputTokens        int64   `json:"output_tokens,omitempty"`
	CachedTokens        int64   `json:"cached_tokens,omitempty"`
	CacheCreationTokens int64   `json:"cache_creation_tokens,omitempty"`
	TotalTokens         int64   `json:"total_tokens,omitempty"`
	ExceptionCount      int64   `json:"exception_count"`
	WebSearchCount      int64   `json:"web_search_count,omitempty"`
	MaxRPM              int64   `json:"max_rpm,omitempty"`
	MaxTPM              int64   `json:"max_tpm,omitempty"`
	MaxRPS              int64   `json:"max_rps,omitempty"`
	MaxTPS              int64   `json:"max_tps,omitempty"`
}
// DashboardResponse is the payload for the global dashboard. ChartData
// holds the time series. sumDashboardResponse derives the totals from it:
// TotalCount is the sum of request counts, plus the exception, amount,
// token and web-search sums, and the Max* peaks. Channels and Models list
// the values used in the queried range.
//
// NOTE(review): RPM and TPM are not set anywhere in this file; presumably
// the caller fills them in — confirm.
type DashboardResponse struct {
	ChartData           []*ChartData `json:"chart_data"`
	TotalCount          int64        `json:"total_count"`
	ExceptionCount      int64        `json:"exception_count"`
	RPM                 int64        `json:"rpm"`
	TPM                 int64        `json:"tpm"`
	MaxRPM              int64        `json:"max_rpm,omitempty"`
	MaxTPM              int64        `json:"max_tpm,omitempty"`
	MaxRPS              int64        `json:"max_rps,omitempty"`
	MaxTPS              int64        `json:"max_tps,omitempty"`
	UsedAmount          float64      `json:"used_amount"`
	InputTokens         int64        `json:"input_tokens,omitempty"`
	OutputTokens        int64        `json:"output_tokens,omitempty"`
	TotalTokens         int64        `json:"total_tokens,omitempty"`
	CachedTokens        int64        `json:"cached_tokens,omitempty"`
	CacheCreationTokens int64        `json:"cache_creation_tokens,omitempty"`
	WebSearchCount      int64        `json:"web_search_count,omitempty"`
	Channels            []int        `json:"channels,omitempty"`
	Models              []string     `json:"models,omitempty"`
}
// GroupDashboardResponse is the dashboard payload for a single group. It
// extends DashboardResponse with the token names the group used in the
// queried range. GetGroupDashboardData never sets the embedded Channels
// list.
type GroupDashboardResponse struct {
	DashboardResponse
	TokenNames []string `json:"token_names"`
}
  464. type TimeSpanType string
  465. const (
  466. TimeSpanDay TimeSpanType = "day"
  467. TimeSpanHour TimeSpanType = "hour"
  468. )
  469. // aggregateHourDataToDay converts hourly chart data into daily aggregated data
  470. func aggregateHourDataToDay(hourlyData []*ChartData, timezone *time.Location) []*ChartData {
  471. dayData := make(map[int64]*ChartData)
  472. if timezone == nil {
  473. timezone = time.Local
  474. }
  475. for _, data := range hourlyData {
  476. // Convert timestamp to time in the specified timezone
  477. t := time.Unix(data.Timestamp, 0).In(timezone)
  478. // Get the start of the day in the specified timezone
  479. startOfDay := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, timezone)
  480. dayTimestamp := startOfDay.Unix()
  481. if _, exists := dayData[dayTimestamp]; !exists {
  482. dayData[dayTimestamp] = &ChartData{
  483. Timestamp: dayTimestamp,
  484. }
  485. }
  486. day := dayData[dayTimestamp]
  487. day.RequestCount += data.RequestCount
  488. day.UsedAmount = decimal.
  489. NewFromFloat(data.UsedAmount).
  490. Add(decimal.NewFromFloat(day.UsedAmount)).
  491. InexactFloat64()
  492. day.ExceptionCount += data.ExceptionCount
  493. day.InputTokens += data.InputTokens
  494. day.OutputTokens += data.OutputTokens
  495. day.CachedTokens += data.CachedTokens
  496. day.CacheCreationTokens += data.CacheCreationTokens
  497. day.TotalTokens += data.TotalTokens
  498. day.WebSearchCount += data.WebSearchCount
  499. if data.MaxRPM > day.MaxRPM {
  500. day.MaxRPM = data.MaxRPM
  501. }
  502. if data.MaxTPM > day.MaxTPM {
  503. day.MaxTPM = data.MaxTPM
  504. }
  505. if data.MaxRPS > day.MaxRPS {
  506. day.MaxRPS = data.MaxRPS
  507. }
  508. if data.MaxTPS > day.MaxTPS {
  509. day.MaxTPS = data.MaxTPS
  510. }
  511. }
  512. result := make([]*ChartData, 0, len(dayData))
  513. for _, data := range dayData {
  514. result = append(result, data)
  515. }
  516. slices.SortFunc(result, func(a, b *ChartData) int {
  517. return cmp.Compare(a.Timestamp, b.Timestamp)
  518. })
  519. return result
  520. }
  521. func sumDashboardResponse(chartData []*ChartData) DashboardResponse {
  522. dashboardResponse := DashboardResponse{
  523. ChartData: chartData,
  524. }
  525. usedAmount := decimal.NewFromFloat(0)
  526. for _, data := range chartData {
  527. dashboardResponse.TotalCount += data.RequestCount
  528. dashboardResponse.ExceptionCount += data.ExceptionCount
  529. usedAmount = usedAmount.Add(decimal.NewFromFloat(data.UsedAmount))
  530. dashboardResponse.InputTokens += data.InputTokens
  531. dashboardResponse.OutputTokens += data.OutputTokens
  532. dashboardResponse.TotalTokens += data.TotalTokens
  533. dashboardResponse.CachedTokens += data.CachedTokens
  534. dashboardResponse.CacheCreationTokens += data.CacheCreationTokens
  535. dashboardResponse.WebSearchCount += data.WebSearchCount
  536. if data.MaxRPM > dashboardResponse.MaxRPM {
  537. dashboardResponse.MaxRPM = data.MaxRPM
  538. }
  539. if data.MaxTPM > dashboardResponse.MaxTPM {
  540. dashboardResponse.MaxTPM = data.MaxTPM
  541. }
  542. if data.MaxRPS > dashboardResponse.MaxRPS {
  543. dashboardResponse.MaxRPS = data.MaxRPS
  544. }
  545. if data.MaxTPS > dashboardResponse.MaxTPS {
  546. dashboardResponse.MaxTPS = data.MaxTPS
  547. }
  548. }
  549. dashboardResponse.UsedAmount = usedAmount.InexactFloat64()
  550. return dashboardResponse
  551. }
  552. func GetDashboardData(
  553. start,
  554. end time.Time,
  555. modelName string,
  556. channelID int,
  557. timeSpan TimeSpanType,
  558. timezone *time.Location,
  559. ) (*DashboardResponse, error) {
  560. if end.IsZero() {
  561. end = time.Now()
  562. } else if end.Before(start) {
  563. return nil, errors.New("end time is before start time")
  564. }
  565. var (
  566. chartData []*ChartData
  567. channels []int
  568. models []string
  569. )
  570. g := new(errgroup.Group)
  571. g.Go(func() error {
  572. var err error
  573. chartData, err = getChartData(start, end, channelID, modelName, timeSpan, timezone)
  574. return err
  575. })
  576. g.Go(func() error {
  577. var err error
  578. channels, err = GetUsedChannels(start, end)
  579. return err
  580. })
  581. g.Go(func() error {
  582. var err error
  583. models, err = GetUsedModels(start, end)
  584. return err
  585. })
  586. if err := g.Wait(); err != nil {
  587. return nil, err
  588. }
  589. dashboardResponse := sumDashboardResponse(chartData)
  590. dashboardResponse.Channels = channels
  591. dashboardResponse.Models = models
  592. return &dashboardResponse, nil
  593. }
  594. func GetGroupDashboardData(
  595. group string,
  596. start, end time.Time,
  597. tokenName string,
  598. modelName string,
  599. timeSpan TimeSpanType,
  600. timezone *time.Location,
  601. ) (*GroupDashboardResponse, error) {
  602. if group == "" {
  603. return nil, errors.New("group is required")
  604. }
  605. if end.IsZero() {
  606. end = time.Now()
  607. } else if end.Before(start) {
  608. return nil, errors.New("end time is before start time")
  609. }
  610. var (
  611. chartData []*ChartData
  612. tokenNames []string
  613. models []string
  614. )
  615. g := new(errgroup.Group)
  616. g.Go(func() error {
  617. var err error
  618. chartData, err = getGroupChartData(
  619. group,
  620. start,
  621. end,
  622. tokenName,
  623. modelName,
  624. timeSpan,
  625. timezone,
  626. )
  627. return err
  628. })
  629. g.Go(func() error {
  630. var err error
  631. tokenNames, err = GetGroupUsedTokenNames(group, start, end)
  632. return err
  633. })
  634. g.Go(func() error {
  635. var err error
  636. models, err = GetGroupUsedModels(group, tokenName, start, end)
  637. return err
  638. })
  639. if err := g.Wait(); err != nil {
  640. return nil, err
  641. }
  642. dashboardResponse := sumDashboardResponse(chartData)
  643. dashboardResponse.Models = models
  644. return &GroupDashboardResponse{
  645. DashboardResponse: dashboardResponse,
  646. TokenNames: tokenNames,
  647. }, nil
  648. }
// ModelData holds the totals of one model within one time bucket.
// Timestamp is set on the flat rows scanned from the database. It is left
// zero (and so omitted from JSON) on the nested copies that
// convertToTimeModelData places inside TimeModelData. The Max* peaks are
// only selected by the queries under certain filters and are 0 otherwise.
//
// NOTE(review): the revive suppression presumably silences the name-stutter
// warning for model.ModelData — confirm.
//
//nolint:revive
type ModelData struct {
	Timestamp           int64   `json:"timestamp,omitempty"`
	Model               string  `json:"model"`
	RequestCount        int64   `json:"request_count"`
	UsedAmount          float64 `json:"used_amount"`
	ExceptionCount      int64   `json:"exception_count"`
	InputTokens         int64   `json:"input_tokens,omitempty"`
	OutputTokens        int64   `json:"output_tokens,omitempty"`
	CachedTokens        int64   `json:"cached_tokens,omitempty"`
	CacheCreationTokens int64   `json:"cache_creation_tokens,omitempty"`
	TotalTokens         int64   `json:"total_tokens,omitempty"`
	WebSearchCount      int64   `json:"web_search_count,omitempty"`
	MaxRPM              int64   `json:"max_rpm,omitempty"`
	MaxRPS              int64   `json:"max_rps,omitempty"`
	MaxTPM              int64   `json:"max_tpm,omitempty"`
	MaxTPS              int64   `json:"max_tps,omitempty"`
}
// TimeModelData is one time bucket of a per-model series. Timestamp is the
// bucket start in Unix seconds. Models holds that bucket's per-model
// totals, ordered by convertToTimeModelData.
type TimeModelData struct {
	Timestamp int64        `json:"timestamp"`
	Models    []*ModelData `json:"models"`
}
  671. func GetTimeSeriesModelData(
  672. channelID int,
  673. start, end time.Time,
  674. timeSpan TimeSpanType,
  675. timezone *time.Location,
  676. ) ([]*TimeModelData, error) {
  677. if end.IsZero() {
  678. end = time.Now()
  679. } else if end.Before(start) {
  680. return nil, errors.New("end time is before start time")
  681. }
  682. query := LogDB.Model(&Summary{})
  683. if channelID != 0 {
  684. query = query.Where("channel_id = ?", channelID)
  685. }
  686. switch {
  687. case !start.IsZero() && !end.IsZero():
  688. query = query.Where("hour_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  689. case !start.IsZero():
  690. query = query.Where("hour_timestamp >= ?", start.Unix())
  691. case !end.IsZero():
  692. query = query.Where("hour_timestamp <= ?", end.Unix())
  693. }
  694. selectFields := "hour_timestamp as timestamp, model, " +
  695. "sum(request_count) as request_count, sum(used_amount) as used_amount, " +
  696. "sum(exception_count) as exception_count, sum(input_tokens) as input_tokens, " +
  697. "sum(output_tokens) as output_tokens, sum(cached_tokens) as cached_tokens, " +
  698. "sum(cache_creation_tokens) as cache_creation_tokens, sum(total_tokens) as total_tokens, " +
  699. "sum(web_search_count) as web_search_count"
  700. if channelID != 0 {
  701. selectFields += ", max(max_rpm) as max_rpm, max(max_rps) as max_rps, max(max_tpm) as max_tpm, max(max_tps) as max_tps"
  702. } else {
  703. selectFields += ", 0 as max_rpm, 0 as max_rps, 0 as max_tpm, 0 as max_tps"
  704. }
  705. var rawData []ModelData
  706. err := query.
  707. Select(selectFields).
  708. Group("timestamp, model").
  709. Order("timestamp ASC").
  710. Scan(&rawData).Error
  711. if err != nil {
  712. return nil, err
  713. }
  714. if timeSpan == TimeSpanDay && len(rawData) > 0 {
  715. rawData = aggregateHourlyToDaily(rawData, timezone)
  716. }
  717. return convertToTimeModelData(rawData), nil
  718. }
  719. func GetGroupTimeSeriesModelData(
  720. group string,
  721. tokenName string,
  722. start, end time.Time,
  723. timeSpan TimeSpanType,
  724. timezone *time.Location,
  725. ) ([]*TimeModelData, error) {
  726. if end.IsZero() {
  727. end = time.Now()
  728. } else if end.Before(start) {
  729. return nil, errors.New("end time is before start time")
  730. }
  731. query := LogDB.Model(&GroupSummary{}).
  732. Where("group_id = ?", group)
  733. if tokenName != "" {
  734. query = query.Where("token_name = ?", tokenName)
  735. }
  736. switch {
  737. case !start.IsZero() && !end.IsZero():
  738. query = query.Where("hour_timestamp BETWEEN ? AND ?", start.Unix(), end.Unix())
  739. case !start.IsZero():
  740. query = query.Where("hour_timestamp >= ?", start.Unix())
  741. case !end.IsZero():
  742. query = query.Where("hour_timestamp <= ?", end.Unix())
  743. }
  744. selectFields := "hour_timestamp as timestamp, model, " +
  745. "sum(request_count) as request_count, sum(used_amount) as used_amount, " +
  746. "sum(exception_count) as exception_count, sum(input_tokens) as input_tokens, " +
  747. "sum(output_tokens) as output_tokens, sum(cached_tokens) as cached_tokens, " +
  748. "sum(cache_creation_tokens) as cache_creation_tokens, sum(total_tokens) as total_tokens, " +
  749. "sum(web_search_count) as web_search_count"
  750. if tokenName != "" {
  751. selectFields += ", max(max_rpm) as max_rpm, max(max_rps) as max_rps, max(max_tpm) as max_tpm, max(max_tps) as max_tps"
  752. } else {
  753. selectFields += ", 0 as max_rpm, 0 as max_rps, 0 as max_tpm, 0 as max_tps"
  754. }
  755. var rawData []ModelData
  756. err := query.
  757. Select(selectFields).
  758. Group("timestamp, model").
  759. Order("timestamp ASC").
  760. Scan(&rawData).Error
  761. if err != nil {
  762. return nil, err
  763. }
  764. if timeSpan == TimeSpanDay && len(rawData) > 0 {
  765. rawData = aggregateHourlyToDaily(rawData, timezone)
  766. }
  767. return convertToTimeModelData(rawData), nil
  768. }
  769. func aggregateHourlyToDaily(hourlyData []ModelData, timezone *time.Location) []ModelData {
  770. if timezone == nil {
  771. timezone = time.Local
  772. }
  773. type AggKey struct {
  774. DayTimestamp int64
  775. Model string
  776. }
  777. dayData := make(map[AggKey]*ModelData)
  778. for _, data := range hourlyData {
  779. t := time.Unix(data.Timestamp, 0).In(timezone)
  780. startOfDay := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, timezone)
  781. dayTimestamp := startOfDay.Unix()
  782. key := AggKey{
  783. DayTimestamp: dayTimestamp,
  784. Model: data.Model,
  785. }
  786. if _, exists := dayData[key]; !exists {
  787. dayData[key] = &ModelData{
  788. Timestamp: dayTimestamp,
  789. Model: data.Model,
  790. }
  791. }
  792. day := dayData[key]
  793. day.RequestCount += data.RequestCount
  794. day.UsedAmount = decimal.
  795. NewFromFloat(data.UsedAmount).
  796. Add(decimal.NewFromFloat(day.UsedAmount)).
  797. InexactFloat64()
  798. day.ExceptionCount += data.ExceptionCount
  799. day.InputTokens += data.InputTokens
  800. day.OutputTokens += data.OutputTokens
  801. day.CachedTokens += data.CachedTokens
  802. day.CacheCreationTokens += data.CacheCreationTokens
  803. day.TotalTokens += data.TotalTokens
  804. day.WebSearchCount += data.WebSearchCount
  805. if data.MaxRPM > day.MaxRPM {
  806. day.MaxRPM = data.MaxRPM
  807. }
  808. if data.MaxTPM > day.MaxTPM {
  809. day.MaxTPM = data.MaxTPM
  810. }
  811. if data.MaxRPS > day.MaxRPS {
  812. day.MaxRPS = data.MaxRPS
  813. }
  814. if data.MaxTPS > day.MaxTPS {
  815. day.MaxTPS = data.MaxTPS
  816. }
  817. }
  818. result := make([]ModelData, 0, len(dayData))
  819. for _, data := range dayData {
  820. result = append(result, *data)
  821. }
  822. return result
  823. }
  824. func convertToTimeModelData(rawData []ModelData) []*TimeModelData {
  825. timeMap := make(map[int64][]*ModelData)
  826. for _, data := range rawData {
  827. modelData := &ModelData{
  828. Model: data.Model,
  829. RequestCount: data.RequestCount,
  830. UsedAmount: data.UsedAmount,
  831. ExceptionCount: data.ExceptionCount,
  832. InputTokens: data.InputTokens,
  833. OutputTokens: data.OutputTokens,
  834. CachedTokens: data.CachedTokens,
  835. CacheCreationTokens: data.CacheCreationTokens,
  836. TotalTokens: data.TotalTokens,
  837. WebSearchCount: data.WebSearchCount,
  838. MaxRPM: data.MaxRPM,
  839. MaxRPS: data.MaxRPS,
  840. MaxTPM: data.MaxTPM,
  841. MaxTPS: data.MaxTPS,
  842. }
  843. timeMap[data.Timestamp] = append(timeMap[data.Timestamp], modelData)
  844. }
  845. result := make([]*TimeModelData, 0, len(timeMap))
  846. for timestamp, models := range timeMap {
  847. slices.SortFunc(models, func(a, b *ModelData) int {
  848. if a.UsedAmount != b.UsedAmount {
  849. return cmp.Compare(b.UsedAmount, a.UsedAmount)
  850. }
  851. if a.TotalTokens != b.TotalTokens {
  852. return cmp.Compare(b.TotalTokens, a.TotalTokens)
  853. }
  854. if a.RequestCount != b.RequestCount {
  855. return cmp.Compare(b.RequestCount, a.RequestCount)
  856. }
  857. return cmp.Compare(a.Model, b.Model)
  858. })
  859. result = append(result, &TimeModelData{
  860. Timestamp: timestamp,
  861. Models: models,
  862. })
  863. }
  864. slices.SortFunc(result, func(a, b *TimeModelData) int {
  865. return cmp.Compare(a.Timestamp, b.Timestamp)
  866. })
  867. return result
  868. }