log.go 33 KB


  1. package model
  2. import (
  3. "cmp"
  4. "errors"
  5. "fmt"
  6. "slices"
  7. "strings"
  8. "time"
  9. "github.com/bytedance/sonic"
  10. "github.com/labring/sealos/service/aiproxy/common"
  11. "github.com/labring/sealos/service/aiproxy/common/config"
  12. "github.com/shopspring/decimal"
  13. "golang.org/x/sync/errgroup"
  14. "gorm.io/gorm"
  15. )
  16. type RequestDetail struct {
  17. CreatedAt time.Time `gorm:"autoCreateTime;index" json:"-"`
  18. RequestBody string `gorm:"type:text" json:"request_body,omitempty"`
  19. ResponseBody string `gorm:"type:text" json:"response_body,omitempty"`
  20. RequestBodyTruncated bool `json:"request_body_truncated,omitempty"`
  21. ResponseBodyTruncated bool `json:"response_body_truncated,omitempty"`
  22. ID int `gorm:"primaryKey" json:"id"`
  23. LogID int `gorm:"index" json:"log_id"`
  24. }
  25. func (d *RequestDetail) BeforeSave(_ *gorm.DB) (err error) {
  26. if reqMax := config.GetLogDetailRequestBodyMaxSize(); reqMax > 0 && int64(len(d.RequestBody)) > reqMax {
  27. d.RequestBody = common.TruncateByRune(d.RequestBody, int(reqMax)) + "..."
  28. d.RequestBodyTruncated = true
  29. }
  30. if respMax := config.GetLogDetailResponseBodyMaxSize(); respMax > 0 && int64(len(d.ResponseBody)) > respMax {
  31. d.ResponseBody = common.TruncateByRune(d.ResponseBody, int(respMax)) + "..."
  32. d.ResponseBodyTruncated = true
  33. }
  34. return
  35. }
  36. type Log struct {
  37. RequestDetail *RequestDetail `gorm:"foreignKey:LogID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE;" json:"request_detail,omitempty"`
  38. RequestAt time.Time `gorm:"index" json:"request_at"`
  39. TimestampTruncByDay int64 `json:"timestamp_trunc_by_day"`
  40. TimestampTruncByHour int64 `json:"timestamp_trunc_by_hour"`
  41. CreatedAt time.Time `gorm:"autoCreateTime;index" json:"created_at"`
  42. TokenName string `json:"token_name,omitempty"`
  43. Endpoint string `json:"endpoint"`
  44. Content string `gorm:"type:text" json:"content,omitempty"`
  45. GroupID string `gorm:"index" json:"group,omitempty"`
  46. Model string `gorm:"index" json:"model"`
  47. RequestID string `gorm:"index" json:"request_id"`
  48. Price float64 `json:"price,omitempty"`
  49. ID int `gorm:"primaryKey" json:"id"`
  50. CompletionPrice float64 `json:"completion_price,omitempty"`
  51. TokenID int `gorm:"index" json:"token_id,omitempty"`
  52. UsedAmount float64 `json:"used_amount,omitempty"`
  53. PromptTokens int `json:"prompt_tokens,omitempty"`
  54. CompletionTokens int `json:"completion_tokens,omitempty"`
  55. TotalTokens int `json:"total_tokens,omitempty"`
  56. ChannelID int `gorm:"index" json:"channel,omitempty"`
  57. Code int `gorm:"index" json:"code,omitempty"`
  58. Mode int `json:"mode,omitempty"`
  59. IP string `gorm:"index" json:"ip,omitempty"`
  60. RetryTimes int `json:"retry_times,omitempty"`
  61. }
  62. func CreateLogIndexes(db *gorm.DB) error {
  63. var indexes []string
  64. if common.UsingSQLite {
  65. // not support INCLUDE
  66. indexes = []string{
  67. // used by global search logs
  68. "CREATE INDEX IF NOT EXISTS idx_model_reqat ON logs (model, request_at)",
  69. // used by global search logs
  70. "CREATE INDEX IF NOT EXISTS idx_channel_reqat ON logs (channel_id, request_at)",
  71. // used by global search logs
  72. "CREATE INDEX IF NOT EXISTS idx_channel_model_reqat ON logs (channel_id, model, request_at)",
  73. // global day indexes, used by global dashboard
  74. "CREATE INDEX IF NOT EXISTS idx_model_reqat_truncday ON logs (model, request_at, timestamp_trunc_by_day)",
  75. // global hour indexes, used by global dashboard
  76. "CREATE INDEX IF NOT EXISTS idx_model_reqat_trunchour ON logs (model, request_at, timestamp_trunc_by_hour)",
  77. // used by search group logs
  78. "CREATE INDEX IF NOT EXISTS idx_group_token_reqat ON logs (group_id, token_name, request_at)",
  79. // used by search group logs
  80. "CREATE INDEX IF NOT EXISTS idx_group_model_reqat ON logs (group_id, model, request_at)",
  81. // used by search group logs
  82. "CREATE INDEX IF NOT EXISTS idx_group_token_model_reqat ON logs (group_id, token_name, model, request_at)",
  83. // day indexes, used by dashboard
  84. "CREATE INDEX IF NOT EXISTS idx_group_reqat_truncday ON logs (group_id, request_at, timestamp_trunc_by_day)",
  85. "CREATE INDEX IF NOT EXISTS idx_group_model_reqat_truncday ON logs (group_id, model, request_at, timestamp_trunc_by_day)",
  86. "CREATE INDEX IF NOT EXISTS idx_group_token_reqat_truncday ON logs (group_id, token_name, request_at, timestamp_trunc_by_day)",
  87. "CREATE INDEX IF NOT EXISTS idx_group_model_token_reqat_truncday ON logs (group_id, model, token_name, request_at, timestamp_trunc_by_day)",
  88. // hour indexes, used by dashboard
  89. "CREATE INDEX IF NOT EXISTS idx_group_reqat_trunchour ON logs (group_id, request_at, timestamp_trunc_by_hour)",
  90. "CREATE INDEX IF NOT EXISTS idx_group_model_reqat_trunchour ON logs (group_id, model, request_at, timestamp_trunc_by_hour)",
  91. "CREATE INDEX IF NOT EXISTS idx_group_token_reqat_trunchour ON logs (group_id, token_name, request_at, timestamp_trunc_by_hour)",
  92. "CREATE INDEX IF NOT EXISTS idx_group_model_token_reqat_trunchour ON logs (group_id, model, token_name, request_at, timestamp_trunc_by_hour)",
  93. }
  94. } else {
  95. indexes = []string{
  96. // used by global search logs
  97. "CREATE INDEX IF NOT EXISTS idx_model_reqat ON logs (model, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
  98. // used by global search logs
  99. "CREATE INDEX IF NOT EXISTS idx_channel_reqat ON logs (channel_id, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
  100. // used by global search logs
  101. "CREATE INDEX IF NOT EXISTS idx_channel_model_reqat ON logs (channel_id, model, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
  102. // global day indexes, used by global dashboard
  103. "CREATE INDEX IF NOT EXISTS idx_model_reqat_truncday ON logs (model, request_at, timestamp_trunc_by_day) INCLUDE (code, used_amount, total_tokens)",
  104. // global hour indexes, used by global dashboard
  105. "CREATE INDEX IF NOT EXISTS idx_model_reqat_trunchour ON logs (model, request_at, timestamp_trunc_by_hour) INCLUDE (code, used_amount, total_tokens)",
  106. // used by search group logs
  107. "CREATE INDEX IF NOT EXISTS idx_group_token_reqat ON logs (group_id, token_name, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
  108. // used by search group logs
  109. "CREATE INDEX IF NOT EXISTS idx_group_token_reqat ON logs (group_id, token_name, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
  110. // used by search group logs
  111. "CREATE INDEX IF NOT EXISTS idx_group_model_reqat ON logs (group_id, model, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
  112. // used by search group logs
  113. "CREATE INDEX IF NOT EXISTS idx_group_token_model_reqat ON logs (group_id, token_name, model, request_at) INCLUDE (code, used_amount, total_tokens, request_id)",
  114. // day indexes, used by dashboard
  115. "CREATE INDEX IF NOT EXISTS idx_group_reqat_truncday ON logs (group_id, request_at, timestamp_trunc_by_day) INCLUDE (code, used_amount, total_tokens)",
  116. "CREATE INDEX IF NOT EXISTS idx_group_model_reqat_truncday ON logs (group_id, model, request_at, timestamp_trunc_by_day) INCLUDE (code, used_amount, total_tokens)",
  117. "CREATE INDEX IF NOT EXISTS idx_group_token_reqat_truncday ON logs (group_id, token_name, request_at, timestamp_trunc_by_day) INCLUDE (code, used_amount, total_tokens)",
  118. "CREATE INDEX IF NOT EXISTS idx_group_model_token_reqat_truncday ON logs (group_id, model, token_name, request_at, timestamp_trunc_by_day) INCLUDE (code, used_amount, total_tokens)",
  119. // hour indexes, used by dashboard
  120. "CREATE INDEX IF NOT EXISTS idx_group_reqat_trunchour ON logs (group_id, request_at, timestamp_trunc_by_hour) INCLUDE (code, used_amount, total_tokens)",
  121. "CREATE INDEX IF NOT EXISTS idx_group_model_reqat_trunchour ON logs (group_id, model, request_at, timestamp_trunc_by_hour) INCLUDE (code, used_amount, total_tokens)",
  122. "CREATE INDEX IF NOT EXISTS idx_group_token_reqat_trunchour ON logs (group_id, token_name, request_at, timestamp_trunc_by_hour) INCLUDE (code, used_amount, total_tokens)",
  123. "CREATE INDEX IF NOT EXISTS idx_group_model_token_reqat_trunchour ON logs (group_id, model, token_name, request_at, timestamp_trunc_by_hour) INCLUDE (code, used_amount, total_tokens)",
  124. }
  125. }
  126. for _, index := range indexes {
  127. if err := db.Exec(index).Error; err != nil {
  128. return err
  129. }
  130. }
  131. return nil
  132. }
  133. const (
  134. contentMaxSize = 2 * 1024 // 2KB
  135. )
  136. func (l *Log) BeforeSave(_ *gorm.DB) (err error) {
  137. if len(l.Content) > contentMaxSize {
  138. l.Content = common.TruncateByRune(l.Content, contentMaxSize) + "..."
  139. }
  140. if l.TimestampTruncByDay == 0 {
  141. l.TimestampTruncByDay = l.RequestAt.Truncate(24 * time.Hour).Unix()
  142. }
  143. if l.TimestampTruncByHour == 0 {
  144. l.TimestampTruncByHour = l.RequestAt.Truncate(time.Hour).Unix()
  145. }
  146. return
  147. }
  148. func (l *Log) MarshalJSON() ([]byte, error) {
  149. type Alias Log
  150. return sonic.Marshal(&struct {
  151. *Alias
  152. CreatedAt int64 `json:"created_at"`
  153. RequestAt int64 `json:"request_at"`
  154. }{
  155. Alias: (*Alias)(l),
  156. CreatedAt: l.CreatedAt.UnixMilli(),
  157. RequestAt: l.RequestAt.UnixMilli(),
  158. })
  159. }
  160. func GetLogDetail(logID int) (*RequestDetail, error) {
  161. var detail RequestDetail
  162. err := LogDB.
  163. Model(&RequestDetail{}).
  164. Where("log_id = ?", logID).
  165. First(&detail).Error
  166. if err != nil {
  167. return nil, err
  168. }
  169. return &detail, nil
  170. }
  171. func GetGroupLogDetail(logID int, group string) (*RequestDetail, error) {
  172. if group == "" {
  173. return nil, errors.New("group is required")
  174. }
  175. var detail RequestDetail
  176. err := LogDB.
  177. Model(&RequestDetail{}).
  178. Joins("JOIN logs ON logs.id = request_details.log_id").
  179. Where("logs.group_id = ?", group).
  180. Where("log_id = ?", logID).
  181. First(&detail).Error
  182. if err != nil {
  183. return nil, err
  184. }
  185. return &detail, nil
  186. }
  187. const defaultCleanLogBatchSize = 1000
  188. func CleanLog(batchSize int) error {
  189. err := cleanLog(batchSize)
  190. if err != nil {
  191. return err
  192. }
  193. return cleanLogDetail(batchSize)
  194. }
  195. func cleanLog(batchSize int) error {
  196. logStorageHours := config.GetLogStorageHours()
  197. if logStorageHours <= 0 {
  198. return nil
  199. }
  200. if batchSize <= 0 {
  201. batchSize = defaultCleanLogBatchSize
  202. }
  203. return LogDB.
  204. Session(&gorm.Session{SkipDefaultTransaction: true}).
  205. Where(
  206. "created_at < ?",
  207. time.Now().Add(-time.Duration(logStorageHours)*time.Hour),
  208. ).
  209. Limit(batchSize).
  210. Delete(&Log{}).Error
  211. }
  212. func cleanLogDetail(batchSize int) error {
  213. detailStorageHours := config.GetLogDetailStorageHours()
  214. if detailStorageHours <= 0 {
  215. return nil
  216. }
  217. if batchSize <= 0 {
  218. batchSize = defaultCleanLogBatchSize
  219. }
  220. return LogDB.
  221. Session(&gorm.Session{SkipDefaultTransaction: true}).
  222. Where(
  223. "created_at < ?",
  224. time.Now().Add(-time.Duration(detailStorageHours)*time.Hour),
  225. ).
  226. Limit(batchSize).
  227. Delete(&RequestDetail{}).Error
  228. }
  229. func RecordConsumeLog(
  230. requestID string,
  231. requestAt time.Time,
  232. group string,
  233. code int,
  234. channelID int,
  235. promptTokens int,
  236. completionTokens int,
  237. modelName string,
  238. tokenID int,
  239. tokenName string,
  240. amount float64,
  241. price float64,
  242. completionPrice float64,
  243. endpoint string,
  244. content string,
  245. mode int,
  246. ip string,
  247. retryTimes int,
  248. requestDetail *RequestDetail,
  249. ) error {
  250. log := &Log{
  251. RequestID: requestID,
  252. RequestAt: requestAt,
  253. GroupID: group,
  254. CreatedAt: time.Now(),
  255. Code: code,
  256. PromptTokens: promptTokens,
  257. CompletionTokens: completionTokens,
  258. TotalTokens: promptTokens + completionTokens,
  259. TokenID: tokenID,
  260. TokenName: tokenName,
  261. Model: modelName,
  262. Mode: mode,
  263. IP: ip,
  264. UsedAmount: amount,
  265. Price: price,
  266. CompletionPrice: completionPrice,
  267. ChannelID: channelID,
  268. Endpoint: endpoint,
  269. Content: content,
  270. RetryTimes: retryTimes,
  271. RequestDetail: requestDetail,
  272. }
  273. return LogDB.Create(log).Error
  274. }
  275. func getLogOrder(order string) string {
  276. prefix, suffix, _ := strings.Cut(order, "-")
  277. switch prefix {
  278. case "request_at", "id", "created_at":
  279. switch suffix {
  280. case "asc":
  281. return prefix + " asc"
  282. default:
  283. return prefix + " desc"
  284. }
  285. default:
  286. return "request_at desc"
  287. }
  288. }
  289. type CodeType string
  290. const (
  291. CodeTypeAll CodeType = "all"
  292. CodeTypeSuccess CodeType = "success"
  293. CodeTypeError CodeType = "error"
  294. )
  295. type GetLogsResult struct {
  296. Logs []*Log `json:"logs"`
  297. Total int64 `json:"total"`
  298. }
  299. type GetGroupLogsResult struct {
  300. GetLogsResult
  301. Models []string `json:"models"`
  302. TokenNames []string `json:"token_names"`
  303. }
  304. func buildGetLogsQuery(
  305. group string,
  306. startTimestamp time.Time,
  307. endTimestamp time.Time,
  308. modelName string,
  309. requestID string,
  310. tokenID int,
  311. tokenName string,
  312. channelID int,
  313. endpoint string,
  314. mode int,
  315. codeType CodeType,
  316. ip string,
  317. ) *gorm.DB {
  318. tx := LogDB.Model(&Log{})
  319. if group != "" {
  320. tx = tx.Where("group_id = ?", group)
  321. }
  322. if !startTimestamp.IsZero() && !endTimestamp.IsZero() {
  323. tx = tx.Where("request_at BETWEEN ? AND ?", startTimestamp, endTimestamp)
  324. } else if !startTimestamp.IsZero() {
  325. tx = tx.Where("request_at >= ?", startTimestamp)
  326. } else if !endTimestamp.IsZero() {
  327. tx = tx.Where("request_at <= ?", endTimestamp)
  328. }
  329. if tokenName != "" {
  330. tx = tx.Where("token_name = ?", tokenName)
  331. }
  332. if modelName != "" {
  333. tx = tx.Where("model = ?", modelName)
  334. }
  335. if mode != 0 {
  336. tx = tx.Where("mode = ?", mode)
  337. }
  338. if requestID != "" {
  339. tx = tx.Where("request_id = ?", requestID)
  340. }
  341. if tokenID != 0 {
  342. tx = tx.Where("token_id = ?", tokenID)
  343. }
  344. if channelID != 0 {
  345. tx = tx.Where("channel_id = ?", channelID)
  346. }
  347. if endpoint != "" {
  348. tx = tx.Where("endpoint = ?", endpoint)
  349. }
  350. if ip != "" {
  351. tx = tx.Where("ip = ?", ip)
  352. }
  353. switch codeType {
  354. case CodeTypeSuccess:
  355. tx = tx.Where("code = 200")
  356. case CodeTypeError:
  357. tx = tx.Where("code != 200")
  358. }
  359. return tx
  360. }
  361. func getLogs(
  362. group string,
  363. startTimestamp time.Time,
  364. endTimestamp time.Time,
  365. modelName string,
  366. requestID string,
  367. tokenID int,
  368. tokenName string,
  369. channelID int,
  370. endpoint string,
  371. order string,
  372. mode int,
  373. codeType CodeType,
  374. withBody bool,
  375. ip string,
  376. page int,
  377. perPage int,
  378. ) (int64, []*Log, error) {
  379. var total int64
  380. var logs []*Log
  381. g := new(errgroup.Group)
  382. g.Go(func() error {
  383. return buildGetLogsQuery(
  384. group,
  385. startTimestamp,
  386. endTimestamp,
  387. modelName,
  388. requestID,
  389. tokenID,
  390. tokenName,
  391. channelID,
  392. endpoint,
  393. mode,
  394. codeType,
  395. ip,
  396. ).Count(&total).Error
  397. })
  398. g.Go(func() error {
  399. query := buildGetLogsQuery(
  400. group,
  401. startTimestamp,
  402. endTimestamp,
  403. modelName,
  404. requestID,
  405. tokenID,
  406. tokenName,
  407. channelID,
  408. endpoint,
  409. mode,
  410. codeType,
  411. ip,
  412. )
  413. if withBody {
  414. query = query.Preload("RequestDetail")
  415. } else {
  416. query = query.Preload("RequestDetail", func(db *gorm.DB) *gorm.DB {
  417. return db.Select("id", "log_id")
  418. })
  419. }
  420. limit, offset := toLimitOffset(page, perPage)
  421. return query.
  422. Order(getLogOrder(order)).
  423. Limit(limit).
  424. Offset(offset).
  425. Find(&logs).Error
  426. })
  427. if err := g.Wait(); err != nil {
  428. return 0, nil, err
  429. }
  430. return total, logs, nil
  431. }
  432. func GetLogs(
  433. group string,
  434. startTimestamp time.Time,
  435. endTimestamp time.Time,
  436. modelName string,
  437. requestID string,
  438. tokenID int,
  439. tokenName string,
  440. channelID int,
  441. endpoint string,
  442. order string,
  443. mode int,
  444. codeType CodeType,
  445. withBody bool,
  446. ip string,
  447. page int,
  448. perPage int,
  449. ) (*GetLogsResult, error) {
  450. var (
  451. total int64
  452. logs []*Log
  453. )
  454. g := new(errgroup.Group)
  455. g.Go(func() error {
  456. var err error
  457. total, logs, err = getLogs(group, startTimestamp, endTimestamp, modelName, requestID, tokenID, tokenName, channelID, endpoint, order, mode, codeType, withBody, ip, page, perPage)
  458. return err
  459. })
  460. if err := g.Wait(); err != nil {
  461. return nil, err
  462. }
  463. result := &GetLogsResult{
  464. Logs: logs,
  465. Total: total,
  466. }
  467. return result, nil
  468. }
  469. func GetGroupLogs(
  470. group string,
  471. startTimestamp time.Time,
  472. endTimestamp time.Time,
  473. modelName string,
  474. requestID string,
  475. tokenID int,
  476. tokenName string,
  477. channelID int,
  478. endpoint string,
  479. order string,
  480. mode int,
  481. codeType CodeType,
  482. withBody bool,
  483. ip string,
  484. page int,
  485. perPage int,
  486. ) (*GetGroupLogsResult, error) {
  487. if group == "" {
  488. return nil, errors.New("group is required")
  489. }
  490. var (
  491. total int64
  492. logs []*Log
  493. tokenNames []string
  494. models []string
  495. )
  496. g := new(errgroup.Group)
  497. g.Go(func() error {
  498. var err error
  499. total, logs, err = getLogs(group, startTimestamp, endTimestamp, modelName, requestID, tokenID, tokenName, channelID, endpoint, order, mode, codeType, withBody, ip, page, perPage)
  500. return err
  501. })
  502. g.Go(func() error {
  503. var err error
  504. tokenNames, err = GetUsedTokenNames(group, startTimestamp, endTimestamp)
  505. return err
  506. })
  507. g.Go(func() error {
  508. var err error
  509. models, err = GetUsedModels(group, startTimestamp, endTimestamp)
  510. return err
  511. })
  512. if err := g.Wait(); err != nil {
  513. return nil, err
  514. }
  515. return &GetGroupLogsResult{
  516. GetLogsResult: GetLogsResult{
  517. Logs: logs,
  518. Total: total,
  519. },
  520. Models: models,
  521. TokenNames: tokenNames,
  522. }, nil
  523. }
  524. func buildSearchLogsQuery(
  525. group string,
  526. keyword string,
  527. endpoint string,
  528. requestID string,
  529. tokenID int,
  530. tokenName string,
  531. modelName string,
  532. startTimestamp time.Time,
  533. endTimestamp time.Time,
  534. channelID int,
  535. mode int,
  536. codeType CodeType,
  537. ip string,
  538. ) *gorm.DB {
  539. tx := LogDB.Model(&Log{})
  540. if group != "" {
  541. tx = tx.Where("group_id = ?", group)
  542. }
  543. if tokenName != "" {
  544. tx = tx.Where("token_name = ?", tokenName)
  545. }
  546. if modelName != "" {
  547. tx = tx.Where("model = ?", modelName)
  548. }
  549. if !startTimestamp.IsZero() && !endTimestamp.IsZero() {
  550. tx = tx.Where("request_at BETWEEN ? AND ?", startTimestamp, endTimestamp)
  551. } else if !startTimestamp.IsZero() {
  552. tx = tx.Where("request_at >= ?", startTimestamp)
  553. } else if !endTimestamp.IsZero() {
  554. tx = tx.Where("request_at <= ?", endTimestamp)
  555. }
  556. if requestID != "" {
  557. tx = tx.Where("request_id = ?", requestID)
  558. }
  559. if tokenID != 0 {
  560. tx = tx.Where("token_id = ?", tokenID)
  561. }
  562. if channelID != 0 {
  563. tx = tx.Where("channel_id = ?", channelID)
  564. }
  565. switch codeType {
  566. case CodeTypeSuccess:
  567. tx = tx.Where("code = 200")
  568. case CodeTypeError:
  569. tx = tx.Where("code != 200")
  570. }
  571. if ip != "" {
  572. tx = tx.Where("ip = ?", ip)
  573. }
  574. if mode != 0 {
  575. tx = tx.Where("mode = ?", mode)
  576. }
  577. if endpoint != "" {
  578. tx = tx.Where("endpoint = ?", endpoint)
  579. }
  580. // Handle keyword search for zero value fields
  581. if keyword != "" {
  582. var conditions []string
  583. var values []interface{}
  584. if group == "" {
  585. conditions = append(conditions, "group_id = ?")
  586. values = append(values, keyword)
  587. }
  588. if tokenName == "" {
  589. conditions = append(conditions, "token_name = ?")
  590. values = append(values, keyword)
  591. }
  592. if modelName == "" {
  593. conditions = append(conditions, "model = ?")
  594. values = append(values, keyword)
  595. }
  596. if requestID == "" {
  597. conditions = append(conditions, "request_id = ?")
  598. values = append(values, keyword)
  599. }
  600. // if num := String2Int(keyword); num != 0 {
  601. // if channelID == 0 {
  602. // conditions = append(conditions, "channel_id = ?")
  603. // values = append(values, num)
  604. // }
  605. // if mode != 0 {
  606. // conditions = append(conditions, "mode = ?")
  607. // values = append(values, num)
  608. // }
  609. // }
  610. // if ip != "" {
  611. // conditions = append(conditions, "ip = ?")
  612. // values = append(values, ip)
  613. // }
  614. // if endpoint == "" {
  615. // if common.UsingPostgreSQL {
  616. // conditions = append(conditions, "endpoint ILIKE ?")
  617. // } else {
  618. // conditions = append(conditions, "endpoint LIKE ?")
  619. // }
  620. // values = append(values, "%"+keyword+"%")
  621. // }
  622. // slow query
  623. // if common.UsingPostgreSQL {
  624. // conditions = append(conditions, "content ILIKE ?")
  625. // } else {
  626. // conditions = append(conditions, "content LIKE ?")
  627. // }
  628. // values = append(values, "%"+keyword+"%")
  629. if len(conditions) > 0 {
  630. tx = tx.Where(fmt.Sprintf("(%s)", strings.Join(conditions, " OR ")), values...)
  631. }
  632. }
  633. return tx
  634. }
  635. func searchLogs(
  636. group string,
  637. keyword string,
  638. endpoint string,
  639. requestID string,
  640. tokenID int,
  641. tokenName string,
  642. modelName string,
  643. startTimestamp time.Time,
  644. endTimestamp time.Time,
  645. channelID int,
  646. order string,
  647. mode int,
  648. codeType CodeType,
  649. withBody bool,
  650. ip string,
  651. page int,
  652. perPage int,
  653. ) (int64, []*Log, error) {
  654. var total int64
  655. var logs []*Log
  656. g := new(errgroup.Group)
  657. g.Go(func() error {
  658. return buildSearchLogsQuery(
  659. group,
  660. keyword,
  661. endpoint,
  662. requestID,
  663. tokenID,
  664. tokenName,
  665. modelName,
  666. startTimestamp,
  667. endTimestamp,
  668. channelID,
  669. mode,
  670. codeType,
  671. ip,
  672. ).Count(&total).Error
  673. })
  674. g.Go(func() error {
  675. query := buildSearchLogsQuery(
  676. group,
  677. keyword,
  678. endpoint,
  679. requestID,
  680. tokenID,
  681. tokenName,
  682. modelName,
  683. startTimestamp,
  684. endTimestamp,
  685. channelID,
  686. mode,
  687. codeType,
  688. ip,
  689. )
  690. if withBody {
  691. query = query.Preload("RequestDetail")
  692. } else {
  693. query = query.Preload("RequestDetail", func(db *gorm.DB) *gorm.DB {
  694. return db.Select("id", "log_id")
  695. })
  696. }
  697. limit, offset := toLimitOffset(page, perPage)
  698. return query.
  699. Order(getLogOrder(order)).
  700. Limit(limit).
  701. Offset(offset).
  702. Find(&logs).Error
  703. })
  704. if err := g.Wait(); err != nil {
  705. return 0, nil, err
  706. }
  707. return total, logs, nil
  708. }
  709. func SearchLogs(
  710. group string,
  711. keyword string,
  712. endpoint string,
  713. requestID string,
  714. tokenID int,
  715. tokenName string,
  716. modelName string,
  717. startTimestamp time.Time,
  718. endTimestamp time.Time,
  719. channelID int,
  720. order string,
  721. mode int,
  722. codeType CodeType,
  723. withBody bool,
  724. ip string,
  725. page int,
  726. perPage int,
  727. ) (*GetLogsResult, error) {
  728. var (
  729. total int64
  730. logs []*Log
  731. )
  732. g := new(errgroup.Group)
  733. g.Go(func() error {
  734. var err error
  735. total, logs, err = searchLogs(group, keyword, endpoint, requestID, tokenID, tokenName, modelName, startTimestamp, endTimestamp, channelID, order, mode, codeType, withBody, ip, page, perPage)
  736. return err
  737. })
  738. if err := g.Wait(); err != nil {
  739. return nil, err
  740. }
  741. result := &GetLogsResult{
  742. Logs: logs,
  743. Total: total,
  744. }
  745. return result, nil
  746. }
  747. func SearchGroupLogs(
  748. group string,
  749. keyword string,
  750. endpoint string,
  751. requestID string,
  752. tokenID int,
  753. tokenName string,
  754. modelName string,
  755. startTimestamp time.Time,
  756. endTimestamp time.Time,
  757. channelID int,
  758. order string,
  759. mode int,
  760. codeType CodeType,
  761. withBody bool,
  762. ip string,
  763. page int,
  764. perPage int,
  765. ) (*GetGroupLogsResult, error) {
  766. if group == "" {
  767. return nil, errors.New("group is required")
  768. }
  769. var (
  770. total int64
  771. logs []*Log
  772. tokenNames []string
  773. models []string
  774. )
  775. g := new(errgroup.Group)
  776. g.Go(func() error {
  777. var err error
  778. total, logs, err = searchLogs(group, keyword, endpoint, requestID, tokenID, tokenName, modelName, startTimestamp, endTimestamp, channelID, order, mode, codeType, withBody, ip, page, perPage)
  779. return err
  780. })
  781. g.Go(func() error {
  782. var err error
  783. tokenNames, err = GetUsedTokenNames(group, startTimestamp, endTimestamp)
  784. return err
  785. })
  786. g.Go(func() error {
  787. var err error
  788. models, err = GetUsedModels(group, startTimestamp, endTimestamp)
  789. return err
  790. })
  791. if err := g.Wait(); err != nil {
  792. return nil, err
  793. }
  794. result := &GetGroupLogsResult{
  795. GetLogsResult: GetLogsResult{
  796. Logs: logs,
  797. Total: total,
  798. },
  799. Models: models,
  800. TokenNames: tokenNames,
  801. }
  802. return result, nil
  803. }
  804. func DeleteOldLog(timestamp time.Time) (int64, error) {
  805. result := LogDB.Where("request_at < ?", timestamp).Delete(&Log{})
  806. return result.RowsAffected, result.Error
  807. }
  808. func DeleteGroupLogs(groupID string) (int64, error) {
  809. if groupID == "" {
  810. return 0, errors.New("group is required")
  811. }
  812. result := LogDB.Where("group_id = ?", groupID).Delete(&Log{})
  813. return result.RowsAffected, result.Error
  814. }
  815. type ChartData struct {
  816. Timestamp int64 `json:"timestamp"`
  817. RequestCount int64 `json:"request_count"`
  818. UsedAmount float64 `json:"used_amount"`
  819. ExceptionCount int64 `json:"exception_count"`
  820. }
  821. type DashboardResponse struct {
  822. ChartData []*ChartData `json:"chart_data"`
  823. TotalCount int64 `json:"total_count"`
  824. ExceptionCount int64 `json:"exception_count"`
  825. UsedAmount float64 `json:"used_amount"`
  826. RPM int64 `json:"rpm"`
  827. TPM int64 `json:"tpm"`
  828. }
  829. type GroupDashboardResponse struct {
  830. DashboardResponse
  831. Models []string `json:"models"`
  832. TokenNames []string `json:"token_names"`
  833. }
  834. type TimeSpanType string
  835. const (
  836. TimeSpanDay TimeSpanType = "day"
  837. TimeSpanHour TimeSpanType = "hour"
  838. )
  839. func getTimeSpanFormat(t TimeSpanType) string {
  840. switch t {
  841. case TimeSpanDay:
  842. return "timestamp_trunc_by_day"
  843. case TimeSpanHour:
  844. return "timestamp_trunc_by_hour"
  845. default:
  846. return ""
  847. }
  848. }
  849. func getChartData(group string, start, end time.Time, tokenName, modelName string, timeSpan TimeSpanType) ([]*ChartData, error) {
  850. var chartData []*ChartData
  851. timeSpanFormat := getTimeSpanFormat(timeSpan)
  852. if timeSpanFormat == "" {
  853. return nil, errors.New("unsupported time format")
  854. }
  855. query := LogDB.Table("logs").
  856. Select(timeSpanFormat + " as timestamp, count(*) as request_count, sum(used_amount) as used_amount, sum(case when code != 200 then 1 else 0 end) as exception_count").
  857. Group("timestamp").
  858. Order("timestamp ASC")
  859. if group != "" {
  860. query = query.Where("group_id = ?", group)
  861. }
  862. if !start.IsZero() && !end.IsZero() {
  863. query = query.Where("request_at BETWEEN ? AND ?", start, end)
  864. } else if !start.IsZero() {
  865. query = query.Where("request_at >= ?", start)
  866. } else if !end.IsZero() {
  867. query = query.Where("request_at <= ?", end)
  868. }
  869. if tokenName != "" {
  870. query = query.Where("token_name = ?", tokenName)
  871. }
  872. if modelName != "" {
  873. query = query.Where("model = ?", modelName)
  874. }
  875. err := query.Scan(&chartData).Error
  876. return chartData, err
  877. }
  878. func GetUsedModels(group string, start, end time.Time) ([]string, error) {
  879. return getLogGroupByValues[string]("model", group, start, end)
  880. }
  881. func GetUsedTokenNames(group string, start, end time.Time) ([]string, error) {
  882. if group == "" {
  883. return nil, errors.New("group is required")
  884. }
  885. return getLogGroupByValues[string]("token_name", group, start, end)
  886. }
  887. //nolint:unused
  888. func getLogDistinctValues[T cmp.Ordered](field string, group string, start, end time.Time) ([]T, error) {
  889. var values []T
  890. query := LogDB.
  891. Model(&Log{})
  892. if group != "" {
  893. query = query.Where("group_id = ?", group)
  894. }
  895. if !start.IsZero() && !end.IsZero() {
  896. query = query.Where("request_at BETWEEN ? AND ?", start, end)
  897. } else if !start.IsZero() {
  898. query = query.Where("request_at >= ?", start)
  899. } else if !end.IsZero() {
  900. query = query.Where("request_at <= ?", end)
  901. }
  902. err := query.
  903. Distinct(field).
  904. Pluck(field, &values).Error
  905. if err != nil {
  906. return nil, err
  907. }
  908. slices.Sort(values)
  909. return values, nil
  910. }
  911. func getLogGroupByValues[T cmp.Ordered](field string, group string, start, end time.Time) ([]T, error) {
  912. var values []T
  913. query := LogDB.
  914. Model(&Log{})
  915. if group != "" {
  916. query = query.Where("group_id = ?", group)
  917. }
  918. if !start.IsZero() && !end.IsZero() {
  919. query = query.Where("request_at BETWEEN ? AND ?", start, end)
  920. } else if !start.IsZero() {
  921. query = query.Where("request_at >= ?", start)
  922. } else if !end.IsZero() {
  923. query = query.Where("request_at <= ?", end)
  924. }
  925. err := query.
  926. Select(field).
  927. Group(field).
  928. Pluck(field, &values).Error
  929. if err != nil {
  930. return nil, err
  931. }
  932. slices.Sort(values)
  933. return values, nil
  934. }
  935. func sumTotalCount(chartData []*ChartData) int64 {
  936. var count int64
  937. for _, data := range chartData {
  938. count += data.RequestCount
  939. }
  940. return count
  941. }
  942. func sumExceptionCount(chartData []*ChartData) int64 {
  943. var count int64
  944. for _, data := range chartData {
  945. count += data.ExceptionCount
  946. }
  947. return count
  948. }
  949. func sumUsedAmount(chartData []*ChartData) float64 {
  950. var amount decimal.Decimal
  951. for _, data := range chartData {
  952. amount = amount.Add(decimal.NewFromFloat(data.UsedAmount))
  953. }
  954. return amount.InexactFloat64()
  955. }
  956. func getRPM(group string, end time.Time, tokenName, modelName string) (int64, error) {
  957. query := LogDB.Model(&Log{})
  958. if group != "" {
  959. query = query.Where("group_id = ?", group)
  960. }
  961. if tokenName != "" {
  962. query = query.Where("token_name = ?", tokenName)
  963. }
  964. if modelName != "" {
  965. query = query.Where("model = ?", modelName)
  966. }
  967. var count int64
  968. err := query.
  969. Where("request_at BETWEEN ? AND ?", end.Add(-time.Minute), end).
  970. Count(&count).Error
  971. return count, err
  972. }
  973. func getTPM(group string, end time.Time, tokenName, modelName string) (int64, error) {
  974. query := LogDB.Model(&Log{}).
  975. Select("COALESCE(SUM(total_tokens), 0)").
  976. Where("request_at >= ? AND request_at <= ?", end.Add(-time.Minute), end)
  977. if group != "" {
  978. query = query.Where("group_id = ?", group)
  979. }
  980. if tokenName != "" {
  981. query = query.Where("token_name = ?", tokenName)
  982. }
  983. if modelName != "" {
  984. query = query.Where("model = ?", modelName)
  985. }
  986. var tpm int64
  987. err := query.Scan(&tpm).Error
  988. return tpm, err
  989. }
  990. func GetDashboardData(start, end time.Time, modelName string, timeSpan TimeSpanType) (*DashboardResponse, error) {
  991. if end.IsZero() {
  992. end = time.Now()
  993. } else if end.Before(start) {
  994. return nil, errors.New("end time is before start time")
  995. }
  996. var (
  997. chartData []*ChartData
  998. rpm int64
  999. tpm int64
  1000. )
  1001. g := new(errgroup.Group)
  1002. g.Go(func() error {
  1003. var err error
  1004. chartData, err = getChartData("", start, end, "", modelName, timeSpan)
  1005. return err
  1006. })
  1007. g.Go(func() error {
  1008. var err error
  1009. rpm, err = getRPM("", end, "", modelName)
  1010. return err
  1011. })
  1012. g.Go(func() error {
  1013. var err error
  1014. tpm, err = getTPM("", end, "", modelName)
  1015. return err
  1016. })
  1017. if err := g.Wait(); err != nil {
  1018. return nil, err
  1019. }
  1020. totalCount := sumTotalCount(chartData)
  1021. exceptionCount := sumExceptionCount(chartData)
  1022. usedAmount := sumUsedAmount(chartData)
  1023. return &DashboardResponse{
  1024. ChartData: chartData,
  1025. TotalCount: totalCount,
  1026. ExceptionCount: exceptionCount,
  1027. UsedAmount: usedAmount,
  1028. RPM: rpm,
  1029. TPM: tpm,
  1030. }, nil
  1031. }
  1032. func GetGroupDashboardData(group string, start, end time.Time, tokenName string, modelName string, timeSpan TimeSpanType) (*GroupDashboardResponse, error) {
  1033. if group == "" {
  1034. return nil, errors.New("group is required")
  1035. }
  1036. if end.IsZero() {
  1037. end = time.Now()
  1038. } else if end.Before(start) {
  1039. return nil, errors.New("end time is before start time")
  1040. }
  1041. var (
  1042. chartData []*ChartData
  1043. tokenNames []string
  1044. models []string
  1045. rpm int64
  1046. tpm int64
  1047. )
  1048. g := new(errgroup.Group)
  1049. g.Go(func() error {
  1050. var err error
  1051. chartData, err = getChartData(group, start, end, tokenName, modelName, timeSpan)
  1052. return err
  1053. })
  1054. g.Go(func() error {
  1055. var err error
  1056. tokenNames, err = GetUsedTokenNames(group, start, end)
  1057. return err
  1058. })
  1059. g.Go(func() error {
  1060. var err error
  1061. models, err = GetUsedModels(group, start, end)
  1062. return err
  1063. })
  1064. g.Go(func() error {
  1065. var err error
  1066. rpm, err = getRPM(group, end, tokenName, modelName)
  1067. return err
  1068. })
  1069. g.Go(func() error {
  1070. var err error
  1071. tpm, err = getTPM(group, end, tokenName, modelName)
  1072. return err
  1073. })
  1074. if err := g.Wait(); err != nil {
  1075. return nil, err
  1076. }
  1077. totalCount := sumTotalCount(chartData)
  1078. exceptionCount := sumExceptionCount(chartData)
  1079. usedAmount := sumUsedAmount(chartData)
  1080. return &GroupDashboardResponse{
  1081. DashboardResponse: DashboardResponse{
  1082. ChartData: chartData,
  1083. TotalCount: totalCount,
  1084. ExceptionCount: exceptionCount,
  1085. UsedAmount: usedAmount,
  1086. RPM: rpm,
  1087. TPM: tpm,
  1088. },
  1089. Models: models,
  1090. TokenNames: tokenNames,
  1091. }, nil
  1092. }
  1093. func GetGroupLastRequestTime(group string) (time.Time, error) {
  1094. if group == "" {
  1095. return time.Time{}, errors.New("group is required")
  1096. }
  1097. var log Log
  1098. err := LogDB.Model(&Log{}).Where("group_id = ?", group).Order("request_at desc").First(&log).Error
  1099. return log.RequestAt, err
  1100. }
  1101. func GetTokenLastRequestTime(id int) (time.Time, error) {
  1102. var log Log
  1103. tx := LogDB.Model(&Log{})
  1104. err := tx.Where("token_id = ?", id).Order("request_at desc").First(&log).Error
  1105. return log.RequestAt, err
  1106. }
  1107. func GetGroupModelTPM(group string, model string) (int64, error) {
  1108. end := time.Now()
  1109. start := end.Add(-time.Minute)
  1110. var tpm int64
  1111. err := LogDB.
  1112. Model(&Log{}).
  1113. Where("group_id = ? AND request_at >= ? AND request_at <= ? AND model = ?", group, start, end, model).
  1114. Select("COALESCE(SUM(total_tokens), 0)").
  1115. Scan(&tpm).Error
  1116. return tpm, err
  1117. }
  1118. type ModelCostRank struct {
  1119. Model string `json:"model"`
  1120. UsedAmount float64 `json:"used_amount"`
  1121. Total int64 `json:"total"`
  1122. }
  1123. func GetModelCostRank(group string, start, end time.Time) ([]*ModelCostRank, error) {
  1124. var ranks []*ModelCostRank
  1125. query := LogDB.Model(&Log{}).
  1126. Select("model, SUM(used_amount) as used_amount, COUNT(*) as total").
  1127. Group("model").
  1128. Order("used_amount DESC")
  1129. if group != "" {
  1130. query = query.Where("group_id = ?", group)
  1131. }
  1132. if !start.IsZero() && !end.IsZero() {
  1133. query = query.Where("request_at BETWEEN ? AND ?", start, end)
  1134. } else if !start.IsZero() {
  1135. query = query.Where("request_at >= ?", start)
  1136. } else if !end.IsZero() {
  1137. query = query.Where("request_at <= ?", end)
  1138. }
  1139. err := query.Scan(&ranks).Error
  1140. if err != nil {
  1141. return nil, err
  1142. }
  1143. return ranks, nil
  1144. }