monitoringService.js 11 KB


  /**
   * Monitoring service module - handles container status monitoring and notifications.
   */
  4. const axios = require('axios');
  5. const logger = require('../logger');
  6. const configService = require('./configService');
  7. const dockerService = require('./dockerService');
  // Module-level monitoring state shared by the functions in this file.
  // The three collections are only ever mutated in place, so they are const;
  // only the timer handle is reassigned.

  // Container ID -> last observed `State.Status` string.
  const containerStates = new Map();
  // Container name (as reported by inspect) -> Date.now() of the latest stop alert.
  const lastStopAlertTime = new Map();
  // Names of containers that already received the follow-up "still stopped" alert.
  const secondAlertSent = new Set();
  // Handle returned by setInterval for the polling loop, or null when not polling.
  let monitoringInterval = null;
  /**
   * Overlay the supplied settings on the stored monitoring configuration,
   * persist the merged result, then restart monitoring.
   *
   * @param {object} config - Monitoring settings; keys present here override
   *   the stored ones.
   * @returns {Promise<{success: boolean}>} `{ success: true }` after the
   *   configuration was saved and startMonitoring() was awaited.
   * @throws Rethrows (after logging) any error raised while loading or saving.
   */
  async function updateMonitoringConfig(config) {
    try {
      const stored = await configService.getConfig();
      const mergedMonitoring = { ...stored.monitoringConfig, ...config };
      stored.monitoringConfig = mergedMonitoring;

      await configService.saveConfig(stored);

      // Restart so the new settings take effect.
      await startMonitoring();

      return { success: true };
    } catch (error) {
      logger.error('更新监控配置失败:', error);
      throw error;
    }
  }
  // Docker event stream opened by the most recent successful startMonitoring()
  // call, or null when none is open. Tracked so a restart or stop can close it
  // instead of leaving an extra listener that would send duplicate alerts.
  let activeEventStream = null;

  // Container event actions that can change a container's running state.
  const WATCHED_CONTAINER_ACTIONS = new Set(['start', 'die', 'stop', 'kill']);

  /**
   * Close the tracked Docker event stream, if one is open.
   *
   * @returns {boolean} true when a stream was open and has been released.
   */
  function closeEventStream() {
    if (!activeEventStream) {
      return false;
    }
    const stream = activeEventStream;
    activeEventStream = null;
    if (typeof stream.destroy === 'function') {
      stream.destroy();
    }
    return true;
  }

  /**
   * Stop the polling timer, if one is running.
   *
   * @returns {boolean} true when a timer was running and has been cleared.
   */
  function clearPollingTimer() {
    if (!monitoringInterval) {
      return false;
    }
    clearInterval(monitoringInterval);
    monitoringInterval = null;
    return true;
  }

  /**
   * Split buffered event-stream text into parsed events and an unparsed tail.
   * Complete lines that are not valid JSON are logged and dropped.
   *
   * @param {string} buffered - Text accumulated from the stream so far.
   * @returns {{events: object[], rest: string}} Parsed events in arrival order
   *   and the trailing text that is not yet a complete JSON document.
   */
  function drainEventBuffer(buffered) {
    const lines = buffered.split('\n');
    let rest = lines.pop();
    const events = [];

    for (const line of lines) {
      if (line.trim() === '') {
        continue;
      }
      try {
        events.push(JSON.parse(line));
      } catch (parseError) {
        logger.error('处理Docker事件出错:', parseError);
      }
    }

    if (rest.trim() === '') {
      rest = '';
    } else {
      try {
        // A chunk may carry a whole event without a trailing newline.
        events.push(JSON.parse(rest));
        rest = '';
      } catch (incomplete) {
        // Not a full JSON document yet: keep it until the next chunk arrives.
      }
    }

    return { events, rest };
  }

  /**
   * Start (or restart) container monitoring according to the stored config.
   *
   * When monitoring is enabled and a Docker connection is available, this
   * snapshots current container states, replaces any previous polling timer
   * and event stream, and subscribes to Docker container events. When
   * monitoring is disabled, any running timer and open event stream are shut
   * down.
   *
   * @returns {Promise<boolean>} true when monitoring is running after the
   *   call; false when it is disabled, no Docker connection was obtained, or
   *   an error occurred (errors are logged, not thrown).
   */
  async function startMonitoring() {
    try {
      const config = await configService.getConfig();
      const { isEnabled, monitorInterval } = config.monitoringConfig || {};

      if (!isEnabled) {
        // Disabled: make sure nothing keeps polling or listening.
        clearPollingTimer();
        closeEventStream();
        return false;
      }

      const docker = await dockerService.getDockerConnection();
      if (!docker) {
        return false;
      }

      await initializeContainerStates(docker);

      // Replace whatever a previous start left behind.
      clearPollingTimer();
      closeEventStream();

      // monitorInterval is in seconds; fall back to 60 when unset.
      monitoringInterval = setInterval(async () => {
        await checkContainerStates(docker, config.monitoringConfig);
      }, (monitorInterval || 60) * 1000);

      try {
        const stream = await docker.getEvents();
        // A concurrent start may have opened a stream while we were awaiting.
        closeEventStream();
        activeEventStream = stream;

        // Events arrive as newline-delimited JSON; a chunk can hold several
        // events or only part of one, so buffer text across chunks.
        let buffered = '';
        stream.on('data', async (chunk) => {
          try {
            buffered += chunk.toString();
            const drained = drainEventBuffer(buffered);
            buffered = drained.rest;

            for (const event of drained.events) {
              if (event.Type === 'container' && WATCHED_CONTAINER_ACTIONS.has(event.Action)) {
                await handleContainerEvent(docker, event, config.monitoringConfig);
              }
            }
          } catch (eventError) {
            logger.error('处理Docker事件出错:', eventError);
          }
        });

        stream.on('error', (err) => {
          logger.error('Docker事件流错误:', err);
        });
      } catch (streamError) {
        // Polling still works without the event stream.
        logger.error('无法获取Docker事件流:', streamError);
      }

      return true;
    } catch (error) {
      logger.error('启动监控失败:', error);
      return false;
    }
  }

  /**
   * Stop container monitoring: clear the polling timer and close the Docker
   * event stream. Logs an info message when something was actually stopped.
   *
   * @returns {boolean} Always true.
   */
  function stopMonitoring() {
    const timerStopped = clearPollingTimer();
    const streamClosed = closeEventStream();
    if (timerStopped || streamClosed) {
      logger.info('容器监控已停止');
    }
    return true;
  }
  /**
   * Record the current status of every container (running or not) in
   * `containerStates`, keyed by container ID.
   *
   * Each container is inspected independently: a failure for one container
   * (for example, it was removed between listing and inspecting) is logged
   * and does not prevent the remaining containers from being recorded.
   *
   * @param {object} docker - Docker client exposing `listContainers` and
   *   `getContainer(id).inspect()`.
   * @returns {Promise<void>} Resolves when all containers were processed;
   *   errors are logged, never thrown.
   */
  async function initializeContainerStates(docker) {
    let containers;
    try {
      containers = await docker.listContainers({ all: true });
    } catch (error) {
      logger.error('初始化容器状态失败:', error);
      return;
    }

    for (const container of containers) {
      try {
        const containerInfo = await docker.getContainer(container.Id).inspect();
        containerStates.set(container.Id, containerInfo.State.Status);
      } catch (error) {
        logger.error('初始化容器状态失败:', error);
      }
    }
  }
  /**
   * React to a Docker container event: inspect the container, compare its
   * status with the last recorded one, and send a recovery or stop alert when
   * it crossed the running / not-running boundary.
   *
   * The new status and alert bookkeeping are written BEFORE the alert is
   * awaited. A single stop produces several events (kill, die, stop) and the
   * poller may run at the same time; recording first ensures only the first
   * handler to observe the transition sends the alert.
   *
   * A container with no recorded status (first seen after initialization)
   * is recorded without alerting so that later transitions are detected.
   *
   * @param {object} docker - Docker client exposing `getContainer(id).inspect()`.
   * @param {object} event - Docker event; `event.Actor.ID` is the container ID.
   * @param {object} monitoringConfig - Notification settings passed through to
   *   sendAlertWithRetry.
   * @returns {Promise<void>} Errors are logged, never thrown.
   */
  async function handleContainerEvent(docker, event, monitoringConfig) {
    try {
      const containerId = event.Actor.ID;
      const containerInfo = await docker.getContainer(containerId).inspect();
      const newStatus = containerInfo.State.Status;
      const oldStatus = containerStates.get(containerId);

      if (!oldStatus) {
        // First sighting: start tracking, nothing to compare against yet.
        containerStates.set(containerId, newStatus);
        return;
      }
      if (oldStatus === newStatus) {
        return;
      }

      // Claim the transition before any await so concurrent handlers see it.
      containerStates.set(containerId, newStatus);
      const containerName = containerInfo.Name;

      if (newStatus === 'running') {
        // Recovered: reset stop-alert tracking, then notify.
        lastStopAlertTime.delete(containerName);
        secondAlertSent.delete(containerName);
        await sendAlertWithRetry(
          containerName,
          `恢复运行 (之前状态: ${oldStatus}, 当前状态: ${newStatus})`,
          monitoringConfig
        );
      } else if (oldStatus === 'running') {
        // Stopped: remember when, so the follow-up alert can be timed.
        lastStopAlertTime.set(containerName, Date.now());
        secondAlertSent.delete(containerName);
        await sendAlertWithRetry(
          containerName,
          `停止运行 (之前状态: ${oldStatus}, 当前状态: ${newStatus})`,
          monitoringConfig
        );
      }
    } catch (error) {
      logger.error('处理容器事件失败:', error);
    }
  }
  /**
   * Polling pass: inspect every container, alert on running / not-running
   * transitions, and ask checkSecondStopAlert whether a follow-up alert is due
   * for containers that are not running and did not just change status.
   *
   * Differences from a naive loop, each deliberate:
   * - The new status and alert bookkeeping are recorded BEFORE the alert is
   *   awaited, so an overlapping poll or event handler does not send the same
   *   alert again.
   * - Containers with no recorded status are recorded, so containers created
   *   after initialization become monitored.
   * - Each container is handled in its own try/catch, so one failing inspect
   *   does not abort the pass for the remaining containers.
   *
   * @param {object} docker - Docker client exposing `listContainers` and
   *   `getContainer(id).inspect()`.
   * @param {object} monitoringConfig - Notification settings passed through to
   *   the alert functions.
   * @returns {Promise<void>} Errors are logged, never thrown.
   */
  async function checkContainerStates(docker, monitoringConfig) {
    let containers;
    try {
      containers = await docker.listContainers({ all: true });
    } catch (error) {
      logger.error('检查容器状态失败:', error);
      return;
    }

    for (const container of containers) {
      try {
        const containerInfo = await docker.getContainer(container.Id).inspect();
        const newStatus = containerInfo.State.Status;
        const oldStatus = containerStates.get(container.Id);
        const containerName = containerInfo.Name;

        if (oldStatus && oldStatus !== newStatus) {
          // Claim the transition before any await so concurrent checks see it.
          containerStates.set(container.Id, newStatus);

          if (newStatus === 'running') {
            lastStopAlertTime.delete(containerName);
            secondAlertSent.delete(containerName);
            await sendAlertWithRetry(
              containerName,
              `恢复运行 (之前状态: ${oldStatus}, 当前状态: ${newStatus})`,
              monitoringConfig
            );
          } else if (oldStatus === 'running') {
            lastStopAlertTime.set(containerName, Date.now());
            secondAlertSent.delete(containerName);
            await sendAlertWithRetry(
              containerName,
              `停止运行 (之前状态: ${oldStatus}, 当前状态: ${newStatus})`,
              monitoringConfig
            );
          }
          continue;
        }

        if (!oldStatus) {
          // First sighting: start tracking this container.
          containerStates.set(container.Id, newStatus);
        }

        // Status unchanged (or first seen) and not running: follow-up check.
        if (newStatus !== 'running') {
          await checkSecondStopAlert(containerName, newStatus, monitoringConfig);
        }
      } catch (error) {
        logger.error('检查容器状态失败:', error);
      }
    }
  }
  /**
   * Send a one-time follow-up alert for a container that is still not running
   * at least one hour after its last recorded stop alert.
   *
   * A container with no recorded stop alert is treated as stopped since time
   * 0, so the one-hour threshold counts as already exceeded for it.
   *
   * The container is marked as alerted BEFORE the send is awaited: sending can
   * take a long time when retries are needed, and marking first keeps an
   * overlapping poll from sending the same follow-up again.
   *
   * @param {string} containerName - Container name used as the tracking key.
   * @param {string} currentStatus - Current container status, shown in the alert.
   * @param {object} monitoringConfig - Notification settings passed through to
   *   sendAlertWithRetry.
   * @returns {Promise<void>}
   */
  async function checkSecondStopAlert(containerName, currentStatus, monitoringConfig) {
    const ONE_HOUR_MS = 60 * 60 * 1000;
    const lastStopAlert = lastStopAlertTime.get(containerName) || 0;
    const thresholdReached = Date.now() - lastStopAlert >= ONE_HOUR_MS;

    if (!thresholdReached || secondAlertSent.has(containerName)) {
      return;
    }

    secondAlertSent.add(containerName);
    await sendAlertWithRetry(containerName, `仍未恢复 (当前状态: ${currentStatus})`, monitoringConfig);
  }
  /**
   * Send a container alert through the configured channel, retrying on
   * failure. Never throws for delivery failures: after the last attempt the
   * error is logged and the function returns.
   *
   * When the notification type is missing or unsupported, nothing can be
   * sent; a warning is logged and the function returns without reporting
   * success.
   *
   * @param {string} containerName - Container name; one leading "/" is removed.
   * @param {string} status - Human-readable status text appended to the alert.
   * @param {object} monitoringConfig - Reads `notificationType` ('wechat' or
   *   'telegram'), `webhookUrl`, `telegramToken`, `telegramChatId`.
   * @param {number} [maxRetries=6] - Maximum number of send attempts.
   * @param {number} [retryDelayMs=10000] - Pause between attempts, in ms.
   * @returns {Promise<void>}
   */
  async function sendAlertWithRetry(containerName, status, monitoringConfig, maxRetries = 6, retryDelayMs = 10000) {
    const { notificationType, webhookUrl, telegramToken, telegramChatId } = monitoringConfig || {};
    const cleanContainerName = containerName.replace(/^\//, '');

    if (notificationType !== 'wechat' && notificationType !== 'telegram') {
      // No channel to deliver through: do not pretend the alert went out.
      logger.warn(`不支持的通知类型,告警未发送: ${cleanContainerName} ${status}`);
      return;
    }

    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        if (notificationType === 'wechat') {
          await sendWechatAlert(webhookUrl, cleanContainerName, status);
        } else {
          await sendTelegramAlert(telegramToken, telegramChatId, cleanContainerName, status);
        }
        logger.success(`告警发送成功: ${cleanContainerName} ${status}`);
        return;
      } catch (error) {
        if (attempt === maxRetries) {
          logger.error(`达到最大重试次数,放弃发送告警: ${cleanContainerName} ${status}`);
          logger.error('最后一次错误:', error);
          return;
        }
        logger.warn(`告警发送失败,尝试重试 (${attempt}/${maxRetries}): ${error.message}`);
        await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
      }
    }
  }
  /**
   * Post a text alert to a WeCom (enterprise WeChat) webhook.
   *
   * @param {string} webhookUrl - Webhook URL to post to.
   * @param {string} containerName - Container name shown in the message.
   * @param {string} status - Status text shown in the message.
   * @returns {Promise<void>}
   * @throws {Error} When the URL is missing, or when the response status is
   *   not 200 or its `errcode` is not 0. Request errors raised by axios
   *   propagate unchanged.
   */
  async function sendWechatAlert(webhookUrl, containerName, status) {
    if (!webhookUrl) {
      throw new Error('企业微信 Webhook URL 未设置');
    }

    const payload = {
      msgtype: 'text',
      text: {
        content: `Docker 容器告警: 容器 ${containerName} ${status}`
      }
    };
    const requestOptions = { timeout: 5000 };

    const { status: httpStatus, data } = await axios.post(webhookUrl, payload, requestOptions);

    const accepted = httpStatus === 200 && data.errcode === 0;
    if (!accepted) {
      throw new Error(`请求成功但返回错误:${data.errmsg || JSON.stringify(data)}`);
    }
  }
  /**
   * Send a text alert through the Telegram Bot API `sendMessage` endpoint.
   *
   * @param {string} token - Telegram bot token.
   * @param {string|number} chatId - Target chat ID.
   * @param {string} containerName - Container name shown in the message.
   * @param {string} status - Status text shown in the message.
   * @returns {Promise<void>}
   * @throws {Error} When token or chat ID is missing, or when the response
   *   status is not 200 or its `ok` field is falsy. Request errors raised by
   *   axios propagate unchanged.
   */
  async function sendTelegramAlert(token, chatId, containerName, status) {
    const credentialsMissing = !token || !chatId;
    if (credentialsMissing) {
      throw new Error('Telegram Bot Token 或 Chat ID 未设置');
    }

    const endpoint = `https://api.telegram.org/bot${token}/sendMessage`;
    const message = {
      chat_id: chatId,
      text: `Docker 容器告警: 容器 ${containerName} ${status}`
    };

    const { status: httpStatus, data } = await axios.post(endpoint, message, { timeout: 5000 });

    const delivered = httpStatus === 200 && data.ok;
    if (!delivered) {
      throw new Error(`发送Telegram消息失败:${JSON.stringify(data)}`);
    }
  }
  /**
   * Send a fixed test message through the channel named in `config`, without
   * retries, so delivery problems surface to the caller.
   *
   * @param {object} config - Reads `notificationType` ('wechat' or
   *   'telegram'), `webhookUrl`, `telegramToken`, `telegramChatId`.
   * @returns {Promise<{success: boolean}>} `{ success: true }` once sent.
   * @throws {Error} For an unsupported notification type, or whatever the
   *   channel-specific sender throws.
   */
  async function testNotification(config) {
    const { notificationType, webhookUrl, telegramToken, telegramChatId } = config;
    const testName = 'Test Container';
    const testStatus = 'This is a test notification';

    switch (notificationType) {
      case 'wechat':
        await sendWechatAlert(webhookUrl, testName, testStatus);
        break;
      case 'telegram':
        await sendTelegramAlert(telegramToken, telegramChatId, testName, testStatus);
        break;
      default:
        throw new Error('不支持的通知类型');
    }

    return { success: true };
  }
  /**
   * Persist the enabled/disabled flag and apply it by (re)starting monitoring.
   *
   * A stored configuration without a `monitoringConfig` section gets one
   * created here, rather than failing with a TypeError.
   *
   * @param {boolean} isEnabled - Desired monitoring state.
   * @returns {Promise<boolean>} The result of startMonitoring().
   */
  async function toggleMonitoring(isEnabled) {
    const config = await configService.getConfig();
    if (!config.monitoringConfig) {
      config.monitoringConfig = {};
    }
    config.monitoringConfig.isEnabled = isEnabled;
    await configService.saveConfig(config);
    return startMonitoring();
  }
  /**
   * Return the stopped containers as reported by dockerService.
   *
   * @param {boolean} [forceRefresh=false] - Accepted for callers' sake but not
   *   used by this function; it is not forwarded to dockerService.
   * @returns {Promise<*>} Whatever dockerService.getStoppedContainers() resolves to.
   */
  async function getStoppedContainers(forceRefresh = false) {
    const stoppedContainers = await dockerService.getStoppedContainers();
    return stoppedContainers;
  }
  273. module.exports = {
  274. updateMonitoringConfig,
  275. startMonitoring,
  276. stopMonitoring,
  277. testNotification,
  278. toggleMonitoring,
  279. getStoppedContainers,
  280. sendAlertWithRetry
  281. };