2
0

cleanup-queue.ts 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. import Queue from 'bull';
  2. import { createBullBoard } from '@bull-board/api';
  3. import { BullAdapter } from '@bull-board/api/bullAdapter';
  4. import { ExpressAdapter } from '@bull-board/express';
  5. import { logger } from '@/lib/logger';
  6. import { cleanupLogs } from './service';
  7. import { getSystemSettings } from '@/repository/system-config';
  /**
   * Bull queue that carries log-cleanup jobs.
   *
   * The Redis connection is taken from the REDIS_HOST / REDIS_PORT environment
   * variables and falls back to localhost:6379 when they are unset or empty.
   */
  export const cleanupQueue = new Queue('log-cleanup', {
    redis: {
      host: process.env.REDIS_HOST || 'localhost',
      // Explicit radix: the port is always read as a decimal number, so a value
      // such as "0x18EB" is not silently interpreted as hexadecimal.
      port: parseInt(process.env.REDIS_PORT || '6379', 10),
    },
    defaultJobOptions: {
      attempts: 3, // Bull counts total attempts: a job is tried at most 3 times
      backoff: {
        type: 'exponential',
        delay: 60000, // base delay of 1 minute before the first retry
      },
      removeOnComplete: 100, // keep only the 100 most recent completed jobs
      removeOnFail: 50, // keep only the 50 most recent failed jobs
    },
  });
  /**
   * Returns the date `retentionDays` calendar days before `now`.
   *
   * @param retentionDays number of days of logs to keep
   * @param now reference instant; defaults to the current time
   * @returns a new Date; `now` itself is not modified
   */
  function computeRetentionCutoff(retentionDays: number, now: Date = new Date()): Date {
    const cutoff = new Date(now.getTime());
    cutoff.setDate(cutoff.getDate() - retentionDays);
    return cutoff;
  }

  /**
   * Removes every repeatable job currently registered on the cleanup queue.
   */
  async function removeRepeatableCleanupJobs(): Promise<void> {
    const repeatableJobs = await cleanupQueue.getRepeatableJobs();
    for (const repeatable of repeatableJobs) {
      await cleanupQueue.removeRepeatableByKey(repeatable.key);
    }
  }

  /**
   * Processor for cleanup jobs.
   *
   * It is registered under the wildcard name '*' so that it receives both
   * unnamed jobs and named jobs such as 'auto-cleanup'. A processor registered
   * without a name only handles default-named jobs; Bull fails any other named
   * job with "Missing process handler for job type ...".
   *
   * Fields read from `job.data`:
   * - `retentionDays` (optional): when it is a finite number, the cutoff date
   *   is recomputed from the current time on every run.
   * - `conditions`: passed through unchanged when `retentionDays` is absent.
   * - `batchSize`: forwarded to `cleanupLogs`.
   *
   * Throws when `cleanupLogs` reports an error, so the job is marked as failed.
   * Returns the `cleanupLogs` result on success.
   */
  cleanupQueue.process('*', async (job) => {
    const { conditions, batchSize, retentionDays } = job.data;

    // A repeatable job reuses the payload stored when it was scheduled, so a
    // cutoff date stored there never advances. Derive it at run time instead.
    // NOTE(review): payloads round-trip through JSON, so a stored
    // `conditions.beforeDate` arrives here as a string, not a Date — confirm
    // that `cleanupLogs` accepts that for payloads without `retentionDays`.
    const effectiveConditions =
      typeof retentionDays === 'number' && Number.isFinite(retentionDays)
        ? { ...conditions, beforeDate: computeRetentionCutoff(retentionDays) }
        : conditions;

    logger.info({
      action: 'cleanup_job_start',
      jobId: job.id,
      conditions: effectiveConditions,
    });

    const result = await cleanupLogs(
      effectiveConditions,
      { batchSize },
      { type: 'scheduled' }
    );

    if (result.error) {
      throw new Error(result.error);
    }

    logger.info({
      action: 'cleanup_job_complete',
      jobId: job.id,
      totalDeleted: result.totalDeleted,
      durationMs: result.durationMs,
    });

    return result;
  });

  /**
   * Logs every failed attempt of a cleanup job.
   */
  cleanupQueue.on('failed', (job, err) => {
    logger.error({
      action: 'cleanup_job_failed',
      jobId: job.id,
      error: err.message,
      attempts: job.attemptsMade,
    });
  });

  /**
   * Creates or replaces the repeatable automatic-cleanup job.
   *
   * Reads the system settings, removes any repeatable job already registered
   * and, when automatic cleanup is enabled, registers a new 'auto-cleanup' job
   * with the configured cron schedule. Defaults: 30 days retention, batch size
   * 10000, schedule '0 2 * * *' (every day at 02:00).
   *
   * Never throws: any failure is logged and swallowed so that a scheduling
   * problem does not prevent the application from starting.
   */
  export async function scheduleAutoCleanup(): Promise<void> {
    try {
      const settings = await getSystemSettings();

      if (!settings.enableAutoCleanup) {
        logger.info({ action: 'auto_cleanup_disabled' });
        // Drop any schedule left over from when the feature was enabled.
        await removeRepeatableCleanupJobs();
        return;
      }

      // Replace rather than accumulate: remove the previous schedule first.
      await removeRepeatableCleanupJobs();

      const retentionDays = settings.cleanupRetentionDays ?? 30;
      const batchSize = settings.cleanupBatchSize ?? 10000;
      const schedule = settings.cleanupSchedule ?? '0 2 * * *';

      await cleanupQueue.add(
        'auto-cleanup',
        {
          // The processor recomputes the cutoff from this value on every run.
          retentionDays,
          // Snapshot taken at scheduling time; only used by a processor that
          // does not understand `retentionDays`.
          conditions: { beforeDate: computeRetentionCutoff(retentionDays) },
          batchSize,
        },
        {
          repeat: {
            cron: schedule,
          },
        }
      );

      logger.info({
        action: 'auto_cleanup_scheduled',
        schedule,
        retentionDays,
        batchSize,
      });
    } catch (error) {
      logger.error({
        action: 'schedule_auto_cleanup_error',
        error: error instanceof Error ? error.message : String(error),
      });
      // Fail open: a scheduling failure must not block application startup.
    }
  }
  /**
   * Builds the Bull Board monitoring dashboard for the cleanup queue.
   *
   * The dashboard is configured with the base path '/admin/queues'.
   *
   * @returns the router produced by the Express adapter
   */
  export function createCleanupMonitor() {
    const boardAdapter = new ExpressAdapter();
    boardAdapter.setBasePath('/admin/queues');

    const monitoredQueues = [new BullAdapter(cleanupQueue)];
    createBullBoard({ queues: monitoredQueues, serverAdapter: boardAdapter });

    return boardAdapter.getRouter();
  }
  /**
   * Shuts the cleanup queue down gracefully.
   *
   * Waits for the queue to close and then logs that it has been closed.
   */
  export async function stopCleanupQueue() {
    const closing = cleanupQueue.close();
    await closing;

    const closedEvent = { action: 'cleanup_queue_closed' };
    logger.info(closedEvent);
  }