main.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. package model
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "strings"
  7. "time"
  8. "github.com/glebarez/sqlite"
  9. "github.com/labring/aiproxy/core/common"
  10. "github.com/labring/aiproxy/core/common/config"
  11. "github.com/labring/aiproxy/core/common/env"
  12. // import fastjson serializer
  13. _ "github.com/labring/aiproxy/core/common/fastJSONSerializer"
  14. "github.com/labring/aiproxy/core/common/notify"
  15. log "github.com/sirupsen/logrus"
  16. "gorm.io/driver/mysql"
  17. "gorm.io/driver/postgres"
  18. "gorm.io/gorm"
  19. gormLogger "gorm.io/gorm/logger"
  20. )
  21. var (
  22. DB *gorm.DB
  23. LogDB *gorm.DB
  24. )
  25. func chooseDB(envName string) (*gorm.DB, error) {
  26. dsn := os.Getenv(envName)
  27. switch {
  28. case strings.HasPrefix(dsn, "postgres"):
  29. // Use PostgreSQL
  30. log.Info("using PostgreSQL as database")
  31. return OpenPostgreSQL(dsn)
  32. default:
  33. // Use SQLite
  34. absPath, err := filepath.Abs(common.SQLitePath)
  35. if err != nil {
  36. return nil, fmt.Errorf("failed to get absolute path of SQLite database: %w", err)
  37. }
  38. log.Info("SQL_DSN not set, using SQLite as database: ", absPath)
  39. common.UsingSQLite = true
  40. return OpenSQLite(absPath)
  41. }
  42. }
  43. func newDBLogger() gormLogger.Interface {
  44. var logLevel gormLogger.LogLevel
  45. if config.DebugSQLEnabled {
  46. logLevel = gormLogger.Info
  47. } else {
  48. logLevel = gormLogger.Warn
  49. }
  50. return gormLogger.New(
  51. log.StandardLogger(),
  52. gormLogger.Config{
  53. SlowThreshold: time.Second,
  54. LogLevel: logLevel,
  55. IgnoreRecordNotFoundError: true,
  56. ParameterizedQueries: !config.DebugSQLEnabled,
  57. Colorful: common.NeedColor(),
  58. },
  59. )
  60. }
  61. func OpenPostgreSQL(dsn string) (*gorm.DB, error) {
  62. return gorm.Open(postgres.New(postgres.Config{
  63. DSN: dsn,
  64. PreferSimpleProtocol: true, // disables implicit prepared statement usage
  65. }), &gorm.Config{
  66. PrepareStmt: true, // precompile SQL
  67. TranslateError: true,
  68. Logger: newDBLogger(),
  69. DisableForeignKeyConstraintWhenMigrating: false,
  70. IgnoreRelationshipsWhenMigrating: false,
  71. })
  72. }
  73. func OpenMySQL(dsn string) (*gorm.DB, error) {
  74. return gorm.Open(mysql.New(mysql.Config{
  75. DSN: strings.TrimPrefix(dsn, "mysql://"),
  76. }), &gorm.Config{
  77. PrepareStmt: true, // precompile SQL
  78. TranslateError: true,
  79. Logger: newDBLogger(),
  80. DisableForeignKeyConstraintWhenMigrating: false,
  81. IgnoreRelationshipsWhenMigrating: false,
  82. })
  83. }
  84. func OpenSQLite(sqlitePath string) (*gorm.DB, error) {
  85. baseDir := filepath.Dir(sqlitePath)
  86. if err := os.MkdirAll(baseDir, 0o755); err != nil {
  87. return nil, fmt.Errorf("failed to create base directory: %w", err)
  88. }
  89. dsn := fmt.Sprintf("%s?_busy_timeout=%d", sqlitePath, common.SQLiteBusyTimeout)
  90. return gorm.Open(sqlite.Open(dsn), &gorm.Config{
  91. PrepareStmt: true, // precompile SQL
  92. TranslateError: true,
  93. Logger: newDBLogger(),
  94. DisableForeignKeyConstraintWhenMigrating: false,
  95. IgnoreRelationshipsWhenMigrating: false,
  96. })
  97. }
  98. func InitDB() error {
  99. var err error
  100. DB, err = chooseDB("SQL_DSN")
  101. if err != nil {
  102. return fmt.Errorf("failed to initialize database: %w", err)
  103. }
  104. setDBConns(DB)
  105. if config.DisableAutoMigrateDB {
  106. return nil
  107. }
  108. log.Info("database migration started")
  109. if err = migrateDB(); err != nil {
  110. log.Fatal("failed to migrate database: " + err.Error())
  111. return fmt.Errorf("failed to migrate database: %w", err)
  112. }
  113. log.Info("database migrated")
  114. return nil
  115. }
  116. func migrateDB() error {
  117. err := DB.AutoMigrate(
  118. &Channel{},
  119. &ChannelTest{},
  120. &Token{},
  121. &PublicMCP{},
  122. &GroupModelConfig{},
  123. &PublicMCPReusingParam{},
  124. &GroupMCP{},
  125. &Group{},
  126. &Option{},
  127. &ModelConfig{},
  128. )
  129. if err != nil {
  130. return err
  131. }
  132. return nil
  133. }
  134. func InitLogDB(batchSize int) error {
  135. if os.Getenv("LOG_SQL_DSN") == "" {
  136. LogDB = DB
  137. } else {
  138. log.Info("using log database for table logs")
  139. var err error
  140. LogDB, err = chooseDB("LOG_SQL_DSN")
  141. if err != nil {
  142. return fmt.Errorf("failed to initialize log database: %w", err)
  143. }
  144. setDBConns(LogDB)
  145. }
  146. if config.DisableAutoMigrateDB {
  147. return nil
  148. }
  149. log.Info("log database migration started")
  150. err := migrateLOGDB(batchSize)
  151. if err != nil {
  152. return fmt.Errorf("failed to migrate log database: %w", err)
  153. }
  154. log.Info("log database migrated")
  155. return nil
  156. }
  157. func migrateLOGDB(batchSize int) error {
  158. // Pre-migration cleanup to remove expired data
  159. err := preMigrationCleanup(batchSize)
  160. if err != nil {
  161. log.Warn("failed to perform pre-migration cleanup: ", err.Error())
  162. }
  163. err = LogDB.AutoMigrate(
  164. &Log{},
  165. &RequestDetail{},
  166. &RetryLog{},
  167. &GroupSummary{},
  168. &Summary{},
  169. &ConsumeError{},
  170. &StoreV2{},
  171. &SummaryMinute{},
  172. &GroupSummaryMinute{},
  173. )
  174. if err != nil {
  175. return err
  176. }
  177. go func() {
  178. err := CreateLogIndexes(LogDB)
  179. if err != nil {
  180. notify.ErrorThrottle(
  181. "createLogIndexes",
  182. time.Minute,
  183. "failed to create log indexes",
  184. err.Error(),
  185. )
  186. }
  187. err = CreateSummaryIndexs(LogDB)
  188. if err != nil {
  189. notify.ErrorThrottle(
  190. "createSummaryIndexs",
  191. time.Minute,
  192. "failed to create summary indexs",
  193. err.Error(),
  194. )
  195. }
  196. err = CreateGroupSummaryIndexs(LogDB)
  197. if err != nil {
  198. notify.ErrorThrottle(
  199. "createGroupSummaryIndexs",
  200. time.Minute,
  201. "failed to create group summary indexs",
  202. err.Error(),
  203. )
  204. }
  205. err = CreateSummaryMinuteIndexs(LogDB)
  206. if err != nil {
  207. notify.ErrorThrottle(
  208. "createSummaryMinuteIndexs",
  209. time.Minute,
  210. "failed to create summary minute indexs",
  211. err.Error(),
  212. )
  213. }
  214. err = CreateGroupSummaryMinuteIndexs(LogDB)
  215. if err != nil {
  216. notify.ErrorThrottle(
  217. "createSummaryMinuteIndexs",
  218. time.Minute,
  219. "failed to create group summary minute indexs",
  220. err.Error(),
  221. )
  222. }
  223. }()
  224. return nil
  225. }
  226. func setDBConns(db *gorm.DB) {
  227. if config.DebugSQLEnabled {
  228. db = db.Debug()
  229. }
  230. sqlDB, err := db.DB()
  231. if err != nil {
  232. log.Fatal("failed to connect database: " + err.Error())
  233. return
  234. }
  235. sqlDB.SetMaxIdleConns(int(env.Int64("SQL_MAX_IDLE_CONNS", 100)))
  236. sqlDB.SetMaxOpenConns(int(env.Int64("SQL_MAX_OPEN_CONNS", 1000)))
  237. sqlDB.SetConnMaxLifetime(time.Second * time.Duration(env.Int64("SQL_MAX_LIFETIME", 60)))
  238. }
  239. func closeDB(db *gorm.DB) error {
  240. sqlDB, err := db.DB()
  241. if err != nil {
  242. return err
  243. }
  244. err = sqlDB.Close()
  245. return err
  246. }
  247. func CloseDB() error {
  248. if LogDB != DB {
  249. err := closeDB(LogDB)
  250. if err != nil {
  251. return err
  252. }
  253. }
  254. return closeDB(DB)
  255. }
  256. func ignoreNoSuchTable(err error) bool {
  257. message := err.Error()
  258. return strings.Contains(message, "no such table") ||
  259. strings.Contains(message, "does not exist")
  260. }
  261. // preMigrationCleanup cleans up expired logs and request details before migration
  262. // to reduce database size and improve migration performance
  263. func preMigrationCleanup(batchSize int) error {
  264. log.Info("starting pre-migration cleanup of expired data")
  265. // Clean up logs
  266. err := preMigrationCleanupLogs(batchSize)
  267. if err != nil {
  268. if ignoreNoSuchTable(err) {
  269. return nil
  270. }
  271. return fmt.Errorf("failed to cleanup logs: %w", err)
  272. }
  273. // Clean up request details
  274. err = preMigrationCleanupRequestDetails(batchSize)
  275. if err != nil {
  276. if ignoreNoSuchTable(err) {
  277. return nil
  278. }
  279. return fmt.Errorf("failed to cleanup request details: %w", err)
  280. }
  281. log.Info("pre-migration cleanup completed")
  282. return nil
  283. }
  284. // preMigrationCleanupLogs cleans up expired logs using ID-based batch deletion
  285. func preMigrationCleanupLogs(batchSize int) error {
  286. logStorageHours := config.GetLogStorageHours()
  287. if logStorageHours <= 0 {
  288. return nil
  289. }
  290. if batchSize <= 0 {
  291. batchSize = defaultCleanLogBatchSize
  292. }
  293. cutoffTime := time.Now().Add(-time.Duration(logStorageHours) * time.Hour)
  294. // First, get the IDs to delete
  295. ids := make([]int, 0, batchSize)
  296. for {
  297. ids = ids[:0]
  298. err := LogDB.Model(&Log{}).
  299. Select("id").
  300. Where("created_at < ?", cutoffTime).
  301. Limit(batchSize).
  302. Find(&ids).Error
  303. if err != nil {
  304. return err
  305. }
  306. // If no IDs found, we're done
  307. if len(ids) == 0 {
  308. break
  309. }
  310. // Delete by IDs
  311. err = LogDB.Where("id IN (?)", ids).
  312. Session(&gorm.Session{SkipDefaultTransaction: true}).
  313. Delete(&Log{}).Error
  314. if err != nil {
  315. return err
  316. }
  317. log.Infof("deleted %d expired log records", len(ids))
  318. // If we got less than batchSize, we're done
  319. if len(ids) < batchSize {
  320. break
  321. }
  322. }
  323. return nil
  324. }
  325. // preMigrationCleanupRequestDetails cleans up expired request details using ID-based batch deletion
  326. func preMigrationCleanupRequestDetails(batchSize int) error {
  327. detailStorageHours := config.GetLogDetailStorageHours()
  328. if detailStorageHours <= 0 {
  329. return nil
  330. }
  331. if batchSize <= 0 {
  332. batchSize = defaultCleanLogBatchSize
  333. }
  334. cutoffTime := time.Now().Add(-time.Duration(detailStorageHours) * time.Hour)
  335. // First, get the IDs to delete
  336. ids := make([]int, 0, batchSize)
  337. for {
  338. ids = ids[:0]
  339. err := LogDB.Model(&RequestDetail{}).
  340. Select("id").
  341. Where("created_at < ?", cutoffTime).
  342. Limit(batchSize).
  343. Find(&ids).Error
  344. if err != nil {
  345. return err
  346. }
  347. // If no IDs found, we're done
  348. if len(ids) == 0 {
  349. break
  350. }
  351. // Delete by IDs
  352. err = LogDB.Where("id IN (?)", ids).
  353. Session(&gorm.Session{SkipDefaultTransaction: true}).
  354. Delete(&RequestDetail{}).Error
  355. if err != nil {
  356. return err
  357. }
  358. log.Infof("deleted %d expired request detail records", len(ids))
  359. // If we got less than batchSize, we're done
  360. if len(ids) < batchSize {
  361. break
  362. }
  363. }
  364. return nil
  365. }