log.go 14 KB


  1. package model
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "one-api/common"
  7. "os"
  8. "strings"
  9. "time"
  10. "github.com/gin-gonic/gin"
  11. "github.com/bytedance/gopkg/util/gopool"
  12. "gorm.io/gorm"
  13. )
  14. type Log struct {
  15. Id int `json:"id" gorm:"index:idx_created_at_id,priority:1"`
  16. UserId int `json:"user_id" gorm:"index"`
  17. CreatedAt int64 `json:"created_at" gorm:"bigint;index:idx_created_at_id,priority:2;index:idx_created_at_type"`
  18. Type int `json:"type" gorm:"index:idx_created_at_type"`
  19. Content string `json:"content"`
  20. UserInput string `json:"user_input" gorm:"type:text;comment:用户输入内容"`
  21. Username string `json:"username" gorm:"index;index:index_username_model_name,priority:2;default:''"`
  22. TokenName string `json:"token_name" gorm:"index;default:''"`
  23. ModelName string `json:"model_name" gorm:"index;index:index_username_model_name,priority:1;default:''"`
  24. Quota int `json:"quota" gorm:"default:0"`
  25. PromptTokens int `json:"prompt_tokens" gorm:"default:0"`
  26. CompletionTokens int `json:"completion_tokens" gorm:"default:0"`
  27. UseTime int `json:"use_time" gorm:"default:0"`
  28. IsStream bool `json:"is_stream"`
  29. ChannelId int `json:"channel" gorm:"index"`
  30. ChannelName string `json:"channel_name" gorm:"->"`
  31. TokenId int `json:"token_id" gorm:"default:0;index"`
  32. Group string `json:"group" gorm:"index"`
  33. Ip string `json:"ip" gorm:"index;default:''"`
  34. Other string `json:"other"`
  35. }
  36. const (
  37. LogTypeUnknown = iota
  38. LogTypeTopup
  39. LogTypeConsume
  40. LogTypeManage
  41. LogTypeSystem
  42. LogTypeError
  43. )
  44. func formatUserLogs(logs []*Log) {
  45. for i := range logs {
  46. logs[i].ChannelName = ""
  47. var otherMap map[string]interface{}
  48. otherMap, _ = common.StrToMap(logs[i].Other)
  49. if otherMap != nil {
  50. // delete admin
  51. delete(otherMap, "admin_info")
  52. }
  53. logs[i].Other = common.MapToJsonStr(otherMap)
  54. logs[i].Id = logs[i].Id % 1024
  55. }
  56. }
  57. func GetLogByKey(key string) (logs []*Log, err error) {
  58. if os.Getenv("LOG_SQL_DSN") != "" {
  59. var tk Token
  60. if err = DB.Model(&Token{}).Where(logKeyCol+"=?", strings.TrimPrefix(key, "sk-")).First(&tk).Error; err != nil {
  61. return nil, err
  62. }
  63. err = LOG_DB.Model(&Log{}).Where("token_id=?", tk.Id).Find(&logs).Error
  64. } else {
  65. err = LOG_DB.Joins("left join tokens on tokens.id = logs.token_id").Where("tokens.key = ?", strings.TrimPrefix(key, "sk-")).Find(&logs).Error
  66. }
  67. formatUserLogs(logs)
  68. return logs, err
  69. }
  70. func RecordLog(userId int, logType int, content string) {
  71. if logType == LogTypeConsume && !common.LogConsumeEnabled {
  72. return
  73. }
  74. username, _ := GetUsernameById(userId, false)
  75. log := &Log{
  76. UserId: userId,
  77. Username: username,
  78. CreatedAt: common.GetTimestamp(),
  79. Type: logType,
  80. Content: content,
  81. }
  82. err := LOG_DB.Create(log).Error
  83. if err != nil {
  84. common.SysError("failed to record log: " + err.Error())
  85. }
  86. }
  87. func RecordErrorLog(c *gin.Context, userId int, channelId int, modelName string, tokenName string, content string, tokenId int, useTimeSeconds int,
  88. isStream bool, group string, other map[string]interface{}) {
  89. common.LogInfo(c, fmt.Sprintf("record error log: userId=%d, channelId=%d, modelName=%s, tokenName=%s, content=%s", userId, channelId, modelName, tokenName, content))
  90. username := c.GetString("username")
  91. otherStr := common.MapToJsonStr(other)
  92. // 判断是否需要记录 IP
  93. needRecordIp := false
  94. if settingMap, err := GetUserSetting(userId, false); err == nil {
  95. if settingMap.RecordIpLog {
  96. needRecordIp = true
  97. }
  98. }
  99. log := &Log{
  100. UserId: userId,
  101. Username: username,
  102. CreatedAt: common.GetTimestamp(),
  103. Type: LogTypeError,
  104. Content: content,
  105. PromptTokens: 0,
  106. CompletionTokens: 0,
  107. TokenName: tokenName,
  108. ModelName: modelName,
  109. Quota: 0,
  110. ChannelId: channelId,
  111. TokenId: tokenId,
  112. UseTime: useTimeSeconds,
  113. IsStream: isStream,
  114. Group: group,
  115. Ip: func() string {
  116. if needRecordIp {
  117. return c.ClientIP()
  118. }
  119. return ""
  120. }(),
  121. Other: otherStr,
  122. }
  123. err := LOG_DB.Create(log).Error
  124. if err != nil {
  125. common.LogError(c, "failed to record log: "+err.Error())
  126. }
  127. // 异步记录用量统计(失败请求)
  128. gopool.Go(func() {
  129. if recordErr := RecordUsageStatistics(tokenId, tokenName, modelName,
  130. 0, 0, 0, false); recordErr != nil {
  131. common.SysError("failed to record usage statistics: " + recordErr.Error())
  132. }
  133. })
  134. }
  135. type RecordConsumeLogParams struct {
  136. ChannelId int `json:"channel_id"`
  137. PromptTokens int `json:"prompt_tokens"`
  138. CompletionTokens int `json:"completion_tokens"`
  139. ModelName string `json:"model_name"`
  140. TokenName string `json:"token_name"`
  141. Quota int `json:"quota"`
  142. Content string `json:"content"`
  143. UserInput string `json:"user_input"`
  144. TokenId int `json:"token_id"`
  145. UserQuota int `json:"user_quota"`
  146. UseTimeSeconds int `json:"use_time_seconds"`
  147. IsStream bool `json:"is_stream"`
  148. Group string `json:"group"`
  149. Other map[string]interface{} `json:"other"`
  150. }
  151. func RecordConsumeLog(c *gin.Context, userId int, params RecordConsumeLogParams) {
  152. log.Println("=======,", c)
  153. common.LogInfo(c, fmt.Sprintf("record consume log: userId=%d, params=%s", userId, common.GetJsonString(params)))
  154. if !common.LogConsumeEnabled {
  155. return
  156. }
  157. username := c.GetString("username")
  158. otherStr := common.MapToJsonStr(params.Other)
  159. // 判断是否需要记录 IP
  160. needRecordIp := false
  161. if settingMap, err := GetUserSetting(userId, false); err == nil {
  162. if settingMap.RecordIpLog {
  163. needRecordIp = true
  164. }
  165. }
  166. log := &Log{
  167. UserId: userId,
  168. Username: username,
  169. CreatedAt: common.GetTimestamp(),
  170. Type: LogTypeConsume,
  171. Content: params.Content,
  172. UserInput: params.UserInput,
  173. PromptTokens: params.PromptTokens,
  174. CompletionTokens: params.CompletionTokens,
  175. TokenName: params.TokenName,
  176. ModelName: params.ModelName,
  177. Quota: params.Quota,
  178. ChannelId: params.ChannelId,
  179. TokenId: params.TokenId,
  180. UseTime: params.UseTimeSeconds,
  181. IsStream: params.IsStream,
  182. Group: params.Group,
  183. Ip: func() string {
  184. if needRecordIp {
  185. return c.ClientIP()
  186. }
  187. return ""
  188. }(),
  189. Other: otherStr,
  190. }
  191. err := LOG_DB.Create(log).Error
  192. if err != nil {
  193. common.LogError(c, "failed to record log: "+err.Error())
  194. }
  195. // 异步记录用量统计
  196. gopool.Go(func() {
  197. if recordErr := RecordUsageStatistics(params.TokenId, params.TokenName, params.ModelName,
  198. params.PromptTokens, params.CompletionTokens, params.Quota, true); recordErr != nil {
  199. common.SysError("failed to record usage statistics: " + recordErr.Error())
  200. }
  201. })
  202. if common.DataExportEnabled {
  203. gopool.Go(func() {
  204. LogQuotaData(userId, username, params.ModelName, params.Quota, common.GetTimestamp(), params.PromptTokens+params.CompletionTokens)
  205. })
  206. }
  207. }
  208. func GetAllLogs(logType int, startTimestamp int64, endTimestamp int64, modelName string, username string, tokenName string, startIdx int, num int, channel int, group string) (logs []*Log, total int64, err error) {
  209. var tx *gorm.DB
  210. if logType == LogTypeUnknown {
  211. tx = LOG_DB
  212. } else {
  213. tx = LOG_DB.Where("logs.type = ?", logType)
  214. }
  215. if modelName != "" {
  216. tx = tx.Where("logs.model_name like ?", modelName)
  217. }
  218. if username != "" {
  219. tx = tx.Where("logs.username = ?", username)
  220. }
  221. if tokenName != "" {
  222. tx = tx.Where("logs.token_name = ?", tokenName)
  223. }
  224. if startTimestamp != 0 {
  225. tx = tx.Where("logs.created_at >= ?", startTimestamp)
  226. }
  227. if endTimestamp != 0 {
  228. tx = tx.Where("logs.created_at <= ?", endTimestamp)
  229. }
  230. if channel != 0 {
  231. tx = tx.Where("logs.channel_id = ?", channel)
  232. }
  233. if group != "" {
  234. tx = tx.Where("logs."+logGroupCol+" = ?", group)
  235. }
  236. err = tx.Model(&Log{}).Count(&total).Error
  237. if err != nil {
  238. return nil, 0, err
  239. }
  240. err = tx.Order("logs.id desc").Limit(num).Offset(startIdx).Find(&logs).Error
  241. if err != nil {
  242. return nil, 0, err
  243. }
  244. channelIdsMap := make(map[int]struct{})
  245. channelMap := make(map[int]string)
  246. for _, log := range logs {
  247. if log.ChannelId != 0 {
  248. channelIdsMap[log.ChannelId] = struct{}{}
  249. }
  250. }
  251. channelIds := make([]int, 0, len(channelIdsMap))
  252. for channelId := range channelIdsMap {
  253. channelIds = append(channelIds, channelId)
  254. }
  255. if len(channelIds) > 0 {
  256. var channels []struct {
  257. Id int `gorm:"column:id"`
  258. Name string `gorm:"column:name"`
  259. }
  260. if err = DB.Table("channels").Select("id, name").Where("id IN ?", channelIds).Find(&channels).Error; err != nil {
  261. return logs, total, err
  262. }
  263. for _, channel := range channels {
  264. channelMap[channel.Id] = channel.Name
  265. }
  266. for i := range logs {
  267. logs[i].ChannelName = channelMap[logs[i].ChannelId]
  268. }
  269. }
  270. return logs, total, err
  271. }
  272. func GetUserLogs(userId int, logType int, startTimestamp int64, endTimestamp int64, modelName string, tokenName string, startIdx int, num int, group string) (logs []*Log, total int64, err error) {
  273. var tx *gorm.DB
  274. if logType == LogTypeUnknown {
  275. tx = LOG_DB.Where("logs.user_id = ?", userId)
  276. } else {
  277. tx = LOG_DB.Where("logs.user_id = ? and logs.type = ?", userId, logType)
  278. }
  279. if modelName != "" {
  280. tx = tx.Where("logs.model_name like ?", modelName)
  281. }
  282. if tokenName != "" {
  283. tx = tx.Where("logs.token_name = ?", tokenName)
  284. }
  285. if startTimestamp != 0 {
  286. tx = tx.Where("logs.created_at >= ?", startTimestamp)
  287. }
  288. if endTimestamp != 0 {
  289. tx = tx.Where("logs.created_at <= ?", endTimestamp)
  290. }
  291. if group != "" {
  292. tx = tx.Where("logs."+logGroupCol+" = ?", group)
  293. }
  294. err = tx.Model(&Log{}).Count(&total).Error
  295. if err != nil {
  296. return nil, 0, err
  297. }
  298. err = tx.Order("logs.id desc").Limit(num).Offset(startIdx).Find(&logs).Error
  299. if err != nil {
  300. return nil, 0, err
  301. }
  302. formatUserLogs(logs)
  303. return logs, total, err
  304. }
  305. func SearchAllLogs(keyword string) (logs []*Log, err error) {
  306. err = LOG_DB.Where("type = ? or content LIKE ?", keyword, keyword+"%").Order("id desc").Limit(common.MaxRecentItems).Find(&logs).Error
  307. return logs, err
  308. }
  309. func SearchUserLogs(userId int, keyword string) (logs []*Log, err error) {
  310. err = LOG_DB.Where("user_id = ? and type = ?", userId, keyword).Order("id desc").Limit(common.MaxRecentItems).Find(&logs).Error
  311. formatUserLogs(logs)
  312. return logs, err
  313. }
  314. type Stat struct {
  315. Quota int `json:"quota"`
  316. Rpm int `json:"rpm"`
  317. Tpm int `json:"tpm"`
  318. }
  319. func SumUsedQuota(logType int, startTimestamp int64, endTimestamp int64, modelName string, username string, tokenName string, channel int, group string) (stat Stat) {
  320. tx := LOG_DB.Table("logs").Select("sum(quota) quota")
  321. // 为rpm和tpm创建单独的查询
  322. rpmTpmQuery := LOG_DB.Table("logs").Select("count(*) rpm, sum(prompt_tokens) + sum(completion_tokens) tpm")
  323. if username != "" {
  324. tx = tx.Where("username = ?", username)
  325. rpmTpmQuery = rpmTpmQuery.Where("username = ?", username)
  326. }
  327. if tokenName != "" {
  328. tx = tx.Where("token_name = ?", tokenName)
  329. rpmTpmQuery = rpmTpmQuery.Where("token_name = ?", tokenName)
  330. }
  331. if startTimestamp != 0 {
  332. tx = tx.Where("created_at >= ?", startTimestamp)
  333. }
  334. if endTimestamp != 0 {
  335. tx = tx.Where("created_at <= ?", endTimestamp)
  336. }
  337. if modelName != "" {
  338. tx = tx.Where("model_name like ?", modelName)
  339. rpmTpmQuery = rpmTpmQuery.Where("model_name like ?", modelName)
  340. }
  341. if channel != 0 {
  342. tx = tx.Where("channel_id = ?", channel)
  343. rpmTpmQuery = rpmTpmQuery.Where("channel_id = ?", channel)
  344. }
  345. if group != "" {
  346. tx = tx.Where(logGroupCol+" = ?", group)
  347. rpmTpmQuery = rpmTpmQuery.Where(logGroupCol+" = ?", group)
  348. }
  349. tx = tx.Where("type = ?", LogTypeConsume)
  350. rpmTpmQuery = rpmTpmQuery.Where("type = ?", LogTypeConsume)
  351. // 只统计最近60秒的rpm和tpm
  352. rpmTpmQuery = rpmTpmQuery.Where("created_at >= ?", time.Now().Add(-60*time.Second).Unix())
  353. // 执行查询
  354. tx.Scan(&stat)
  355. rpmTpmQuery.Scan(&stat)
  356. return stat
  357. }
  358. func SumUsedToken(logType int, startTimestamp int64, endTimestamp int64, modelName string, username string, tokenName string) (token int) {
  359. tx := LOG_DB.Table("logs").Select("ifnull(sum(prompt_tokens),0) + ifnull(sum(completion_tokens),0)")
  360. if username != "" {
  361. tx = tx.Where("username = ?", username)
  362. }
  363. if tokenName != "" {
  364. tx = tx.Where("token_name = ?", tokenName)
  365. }
  366. if startTimestamp != 0 {
  367. tx = tx.Where("created_at >= ?", startTimestamp)
  368. }
  369. if endTimestamp != 0 {
  370. tx = tx.Where("created_at <= ?", endTimestamp)
  371. }
  372. if modelName != "" {
  373. tx = tx.Where("model_name = ?", modelName)
  374. }
  375. tx.Where("type = ?", LogTypeConsume).Scan(&token)
  376. return token
  377. }
  378. func DeleteOldLog(ctx context.Context, targetTimestamp int64, limit int) (int64, error) {
  379. var total int64 = 0
  380. for {
  381. if nil != ctx.Err() {
  382. return total, ctx.Err()
  383. }
  384. result := LOG_DB.Where("created_at < ?", targetTimestamp).Limit(limit).Delete(&Log{})
  385. if nil != result.Error {
  386. return total, result.Error
  387. }
  388. total += result.RowsAffected
  389. if result.RowsAffected < int64(limit) {
  390. break
  391. }
  392. }
  393. return total, nil
  394. }
  395. // DeleteLogsByTimeRange 根据时间范围删除日志
  396. func DeleteLogsByTimeRange(ctx context.Context, startTimestamp int64, endTimestamp int64, limit int) (int64, error) {
  397. var total int64 = 0
  398. var result *gorm.DB
  399. for {
  400. if nil != ctx.Err() {
  401. return total, ctx.Err()
  402. }
  403. // 构建查询条件
  404. query := LOG_DB
  405. if startTimestamp > 0 {
  406. query = query.Where("created_at >= ?", startTimestamp)
  407. }
  408. if endTimestamp > 0 {
  409. query = query.Where("created_at <= ?", endTimestamp)
  410. }
  411. // 执行删除操作
  412. result = query.Limit(limit).Delete(&Log{})
  413. if nil != result.Error {
  414. return total, result.Error
  415. }
  416. total += result.RowsAffected
  417. if result.RowsAffected < int64(limit) {
  418. break
  419. }
  420. }
  421. return total, nil
  422. }