
feat: auto cleanup when migrate log database (#367)

* feat: auto cleanup when migrate log database

* fix: ci lint

* chore: set default clean log batch size to 5000
zijiren, 3 months ago
Parent
Commit 5b95779266
5 changed files with 171 additions and 25 deletions
  1. README.md (+1 -1)
  2. README.zh.md (+1 -1)
  3. core/common/config/config.go (+1 -1)
  4. core/main.go (+13 -8)
  5. core/model/main.go (+155 -14)

+ 1 - 1
README.md

@@ -173,7 +173,7 @@ GROUP_MAX_TOKEN_NUM=100        # Max tokens per group
 ```bash
 LOG_STORAGE_HOURS=168          # Log retention (0 = unlimited)
 LOG_DETAIL_STORAGE_HOURS=72    # Detail log retention
-CLEAN_LOG_BATCH_SIZE=2000      # Log cleanup batch size
+CLEAN_LOG_BATCH_SIZE=5000      # Log cleanup batch size
 ```
 
 #### **Security & Access Control**
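
For context (not part of the diff), here is a minimal Go sketch of how a retention window expressed in hours, like LOG_STORAGE_HOURS above, translates into the deletion cutoff used by the batched cleanup; the literal values and variable names are illustrative only.

```go
// Illustrative only: a retention window in hours becomes a cutoff timestamp,
// and rows older than the cutoff are removed at most batchSize at a time.
package main

import (
	"fmt"
	"time"
)

func main() {
	logStorageHours := int64(168) // LOG_STORAGE_HOURS (0 would mean unlimited)
	batchSize := 5000             // CLEAN_LOG_BATCH_SIZE

	cutoff := time.Now().Add(-time.Duration(logStorageHours) * time.Hour)
	fmt.Printf("delete rows created before %s in batches of %d\n",
		cutoff.Format(time.RFC3339), batchSize)
}
```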

+ 1 - 1
README.zh.md

@@ -173,7 +173,7 @@ GROUP_MAX_TOKEN_NUM=100        # 每组最大令牌数
 ```bash
 LOG_STORAGE_HOURS=168          # 日志保留时间(0 = 无限制)
 LOG_DETAIL_STORAGE_HOURS=72    # 详细日志保留时间
-CLEAN_LOG_BATCH_SIZE=2000      # 日志清理批次大小
+CLEAN_LOG_BATCH_SIZE=5000      # 日志清理批次大小
 ```
 
 #### **安全与访问控制**

+ 1 - 1
core/common/config/config.go

@@ -17,7 +17,7 @@ var (
 	logDetailRequestBodyMaxSize  int64 = 128 * 1024 // 128KB
 	logDetailResponseBodyMaxSize int64 = 128 * 1024 // 128KB
 	logDetailStorageHours        int64 = 3 * 24     // 3 days
-	cleanLogBatchSize            int64 = 2000
+	cleanLogBatchSize            int64 = 5000
 	notifyNote                   atomic.Value
 	ipGroupsThreshold            int64
 	ipGroupsBanThreshold         int64
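
The getter used later in core/main.go, config.GetCleanLogBatchSize(), is not part of this hunk; the sketch below is an assumption of how the default above could be overridden from the CLEAN_LOG_BATCH_SIZE environment variable documented in the README (envInt64 is a hypothetical helper, not the repository's actual loader).

```go
// Hypothetical sketch: parse CLEAN_LOG_BATCH_SIZE, falling back to the new
// default of 5000 when the variable is unset or invalid.
package main

import (
	"fmt"
	"os"
	"strconv"
)

func envInt64(key string, def int64) int64 {
	v := os.Getenv(key)
	if v == "" {
		return def
	}
	n, err := strconv.ParseInt(v, 10, 64)
	if err != nil || n <= 0 {
		return def
	}
	return n
}

func main() {
	cleanLogBatchSize := envInt64("CLEAN_LOG_BATCH_SIZE", 5000)
	fmt.Println("clean log batch size:", cleanLogBatchSize)
}
```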

+ 13 - 8
core/main.go

@@ -49,15 +49,23 @@ func initializeServices() error {
 	initializePprof()
 	initializeNotifier()
 
+	if err := common.InitRedisClient(); err != nil {
+		return err
+	}
+
 	if err := initializeBalance(); err != nil {
 		return err
 	}
 
-	if err := initializeDatabases(); err != nil {
+	if err := model.InitDB(); err != nil {
+		return err
+	}
+
+	if err := initializeOptionAndCaches(); err != nil {
 		return err
 	}
 
-	return initializeCaches()
+	return model.InitLogDB(int(config.GetCleanLogBatchSize()))
 }
 
 func initializePprof() {
@@ -89,16 +97,13 @@ func initializeNotifier() {
 	}
 }
 
-func initializeDatabases() error {
-	model.InitDB()
-	model.InitLogDB()
-	return common.InitRedisClient()
-}
+func initializeOptionAndCaches() error {
+	log.Info("starting init config and channel")
 
-func initializeCaches() error {
 	if err := model.InitOption2DB(); err != nil {
 		return err
 	}
+
 	return model.InitModelConfigAndChannelCache()
 }
 
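With this refactor, initializeServices propagates errors instead of letting model.InitDB and model.InitLogDB call log.Fatal internally. The caller is not shown in this diff; a minimal sketch, assuming a conventional main() that exits once at the top level:

```go
package main

import "log" // stands in for the project's logger in this standalone sketch

// Stub with the same signature as the initializeServices shown above,
// repeated here only so the sketch compiles on its own.
func initializeServices() error { return nil }

func main() {
	if err := initializeServices(); err != nil {
		log.Fatal("failed to initialize services: ", err)
	}
	// ... continue with router setup and server startup
}
```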

+ 155 - 14
core/model/main.go

@@ -112,29 +112,30 @@ func OpenSQLite(sqlitePath string) (*gorm.DB, error) {
 	})
 }
 
-func InitDB() {
+func InitDB() error {
 	var err error
 
 	DB, err = chooseDB("SQL_DSN")
 	if err != nil {
-		log.Fatal("failed to initialize database: " + err.Error())
-		return
+		return fmt.Errorf("failed to initialize database: %w", err)
 	}
 
 	setDBConns(DB)
 
 	if config.DisableAutoMigrateDB {
-		return
+		return nil
 	}
 
 	log.Info("database migration started")
 
 	if err = migrateDB(); err != nil {
 		log.Fatal("failed to migrate database: " + err.Error())
-		return
+		return fmt.Errorf("failed to migrate database: %w", err)
 	}
 
 	log.Info("database migrated")
+
+	return nil
 }
 
 func migrateDB() error {
@@ -157,7 +158,7 @@ func migrateDB() error {
 	return nil
 }
 
-func InitLogDB() {
+func InitLogDB(batchSize int) error {
 	if os.Getenv("LOG_SQL_DSN") == "" {
 		LogDB = DB
 	} else {
@@ -167,30 +168,36 @@ func InitLogDB() {
 
 		LogDB, err = chooseDB("LOG_SQL_DSN")
 		if err != nil {
-			log.Fatal("failed to initialize log database: " + err.Error())
-			return
+			return fmt.Errorf("failed to initialize log database: %w", err)
 		}
 
 		setDBConns(LogDB)
 	}
 
 	if config.DisableAutoMigrateDB {
-		return
+		return nil
 	}
 
 	log.Info("log database migration started")
 
-	err := migrateLOGDB()
+	err := migrateLOGDB(batchSize)
 	if err != nil {
-		log.Fatal("failed to migrate log database: " + err.Error())
-		return
+		return fmt.Errorf("failed to migrate log database: %w", err)
 	}
 
 	log.Info("log database migrated")
+
+	return nil
 }
 
-func migrateLOGDB() error {
-	err := LogDB.AutoMigrate(
+func migrateLOGDB(batchSize int) error {
+	// Pre-migration cleanup to remove expired data
+	err := preMigrationCleanup(batchSize)
+	if err != nil {
+		log.Warn("failed to perform pre-migration cleanup: ", err.Error())
+	}
+
+	err = LogDB.AutoMigrate(
 		&Log{},
 		&RequestDetail{},
 		&RetryLog{},
@@ -297,3 +304,137 @@ func CloseDB() error {
 
 	return closeDB(DB)
 }
+
+func ignoreNoSuchTable(err error) bool {
+	message := err.Error()
+	return strings.Contains(message, "no such table") ||
+		strings.Contains(message, "does not exist")
+}
+
+// preMigrationCleanup cleans up expired logs and request details before migration
+// to reduce database size and improve migration performance
+func preMigrationCleanup(batchSize int) error {
+	log.Info("starting pre-migration cleanup of expired data")
+
+	// Clean up logs
+	err := preMigrationCleanupLogs(batchSize)
+	if err != nil {
+		if ignoreNoSuchTable(err) {
+			return nil
+		}
+		return fmt.Errorf("failed to cleanup logs: %w", err)
+	}
+
+	// Clean up request details
+	err = preMigrationCleanupRequestDetails(batchSize)
+	if err != nil {
+		if ignoreNoSuchTable(err) {
+			return nil
+		}
+		return fmt.Errorf("failed to cleanup request details: %w", err)
+	}
+
+	log.Info("pre-migration cleanup completed")
+
+	return nil
+}
+
+// preMigrationCleanupLogs cleans up expired logs using ID-based batch deletion
+func preMigrationCleanupLogs(batchSize int) error {
+	logStorageHours := config.GetLogStorageHours()
+	if logStorageHours <= 0 {
+		return nil
+	}
+
+	if batchSize <= 0 {
+		batchSize = defaultCleanLogBatchSize
+	}
+
+	cutoffTime := time.Now().Add(-time.Duration(logStorageHours) * time.Hour)
+
+	for {
+		// First, get the IDs to delete
+		ids := make([]int, 0, batchSize)
+
+		err := LogDB.Model(&Log{}).
+			Select("id").
+			Where("created_at < ?", cutoffTime).
+			Limit(batchSize).
+			Find(&ids).Error
+		if err != nil {
+			return err
+		}
+
+		// If no IDs found, we're done
+		if len(ids) == 0 {
+			break
+		}
+
+		// Delete by IDs
+		err = LogDB.Where("id IN (?)", ids).
+			Session(&gorm.Session{SkipDefaultTransaction: true}).
+			Delete(&Log{}).Error
+		if err != nil {
+			return err
+		}
+
+		log.Infof("deleted %d expired log records", len(ids))
+
+		// If we got less than batchSize, we're done
+		if len(ids) < batchSize {
+			break
+		}
+	}
+
+	return nil
+}
+
+// preMigrationCleanupRequestDetails cleans up expired request details using ID-based batch deletion
+func preMigrationCleanupRequestDetails(batchSize int) error {
+	detailStorageHours := config.GetLogDetailStorageHours()
+	if detailStorageHours <= 0 {
+		return nil
+	}
+
+	if batchSize <= 0 {
+		batchSize = defaultCleanLogBatchSize
+	}
+
+	cutoffTime := time.Now().Add(-time.Duration(detailStorageHours) * time.Hour)
+
+	for {
+		// First, get the IDs to delete
+		ids := make([]int, 0, batchSize)
+
+		err := LogDB.Model(&RequestDetail{}).
+			Select("id").
+			Where("created_at < ?", cutoffTime).
+			Limit(batchSize).
+			Find(&ids).Error
+		if err != nil {
+			return err
+		}
+
+		// If no IDs found, we're done
+		if len(ids) == 0 {
+			break
+		}
+
+		// Delete by IDs
+		err = LogDB.Where("id IN (?)", ids).
+			Session(&gorm.Session{SkipDefaultTransaction: true}).
+			Delete(&RequestDetail{}).Error
+		if err != nil {
+			return err
+		}
+
+		log.Infof("deleted %d expired request detail records", len(ids))
+
+		// If we got less than batchSize, we're done
+		if len(ids) < batchSize {
+			break
+		}
+	}
+
+	return nil
+}
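
The cleanup deletes by primary key in bounded batches and runs each batch in a Session with SkipDefaultTransaction, which avoids wrapping every DELETE in GORM's implicit transaction and keeps lock times short on large log tables. A hypothetical smoke check (not part of this commit) for verifying the result, assuming the Log model and a *gorm.DB handle from this package:

```go
// countExpiredLogs is an illustrative helper: after InitLogDB has run the
// pre-migration cleanup, it should report zero rows older than the window.
func countExpiredLogs(db *gorm.DB, storageHours int64) (int64, error) {
	cutoff := time.Now().Add(-time.Duration(storageHours) * time.Hour)

	var remaining int64
	err := db.Model(&Log{}).
		Where("created_at < ?", cutoff).
		Count(&remaining).Error

	return remaining, err
}
```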