cron_helper.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. package cron_helper
  2. import (
  3. "fmt"
  4. "github.com/allanpk716/ChineseSubFinder/internal/dao"
  5. "github.com/allanpk716/ChineseSubFinder/internal/models"
  6. "sync"
  7. "time"
  8. "github.com/allanpk716/ChineseSubFinder/internal/logic/file_downloader"
  9. "github.com/allanpk716/ChineseSubFinder/internal/logic/scan_played_video_subinfo"
  10. "github.com/allanpk716/ChineseSubFinder/internal/pkg/downloader"
  11. "github.com/allanpk716/ChineseSubFinder/internal/pkg/settings"
  12. "github.com/allanpk716/ChineseSubFinder/internal/pkg/sub_formatter"
  13. "github.com/allanpk716/ChineseSubFinder/internal/pkg/task_queue"
  14. "github.com/allanpk716/ChineseSubFinder/internal/pkg/video_scan_and_refresh_helper"
  15. "github.com/robfig/cron/v3"
  16. "github.com/sirupsen/logrus"
  17. )
  18. type CronHelper struct {
  19. stopping bool // 正在停止
  20. cronHelperRunning bool // 这个是定时器启动的状态,它为true,不代表核心函数在执行
  21. scanPlayedVideoSubInfo *scan_played_video_subinfo.ScanPlayedVideoSubInfo // 扫描已经播放过的视频的字幕信息
  22. FileDownloader *file_downloader.FileDownloader // 文件下载器
  23. DownloadQueue *task_queue.TaskQueue // 需要下载的视频的队列
  24. Downloader *downloader.Downloader // 下载者线程
  25. videoScanAndRefreshHelper *video_scan_and_refresh_helper.VideoScanAndRefreshHelper // 视频扫描和刷新的帮助类
  26. cronLock sync.Mutex // 锁
  27. c *cron.Cron // 定时器实例
  28. Settings *settings.Settings // 设置实例
  29. log *logrus.Logger // 日志实例
  30. entryIDScanVideoProcess cron.EntryID
  31. entryIDSupplierCheck cron.EntryID
  32. entryIDQueueDownloader cron.EntryID
  33. entryIDScanPlayedVideoSubInfo cron.EntryID
  34. entryIDUploadPlayedVideoSub cron.EntryID
  35. }
  36. func NewCronHelper(fileDownloader *file_downloader.FileDownloader) *CronHelper {
  37. ch := CronHelper{
  38. FileDownloader: fileDownloader,
  39. log: fileDownloader.Log,
  40. Settings: fileDownloader.Settings,
  41. // 实例化下载队列
  42. DownloadQueue: task_queue.NewTaskQueue(fileDownloader.CacheCenter),
  43. }
  44. var err error
  45. // ----------------------------------------------
  46. // 扫描已播放
  47. ch.scanPlayedVideoSubInfo, err = scan_played_video_subinfo.NewScanPlayedVideoSubInfo(ch.log, ch.Settings, fileDownloader)
  48. if err != nil {
  49. ch.log.Panicln(err)
  50. }
  51. // ----------------------------------------------
  52. // 字幕扫描器
  53. ch.videoScanAndRefreshHelper = video_scan_and_refresh_helper.NewVideoScanAndRefreshHelper(
  54. ch.FileDownloader,
  55. ch.DownloadQueue)
  56. // ----------------------------------------------
  57. // 初始化下载者,里面的两个 func 需要使用定时器启动 SupplierCheck QueueDownloader
  58. ch.Downloader = downloader.NewDownloader(
  59. sub_formatter.GetSubFormatter(ch.log, ch.Settings.AdvancedSettings.SubNameFormatter),
  60. ch.FileDownloader, ch.DownloadQueue)
  61. // 强制进行一次字幕源有效性检查
  62. ch.Downloader.SupplierCheck()
  63. return &ch
  64. }
  65. // Start 开启定时器任务,这个任务是非阻塞的,scanVideoProcessAdd2DownloadQueue 仅仅可能是这个函数执行耗时而已
  66. // runImmediately == false 那么 ch.c.Start() 是不会阻塞的
  67. func (ch *CronHelper) Start(runImmediately bool) {
  68. ch.cronLock.Lock()
  69. if ch.cronHelperRunning == true {
  70. ch.cronLock.Unlock()
  71. return
  72. }
  73. ch.cronLock.Unlock()
  74. ch.cronLock.Lock()
  75. ch.cronHelperRunning = true
  76. ch.stopping = false
  77. ch.cronLock.Unlock()
  78. // ----------------------------------------------
  79. // 判断扫描任务的时间间隔是否符合要求,不符合则重写默认值
  80. _, err := cron.ParseStandard(ch.Settings.CommonSettings.ScanInterval)
  81. if err != nil {
  82. ch.log.Warningln("CommonSettings.ScanInterval format error, after v0.25.x , need reset this at WebUI")
  83. // 如果解析错误了,就需要重新赋值默认值过来,然后保存
  84. nowSettings := ch.Settings
  85. nowSettings.CommonSettings.ScanInterval = settings.NewCommonSettings().ScanInterval
  86. err = settings.SetFullNewSettings(nowSettings)
  87. if err != nil {
  88. ch.log.Panicln("CronHelper.SetFullNewSettings:", err)
  89. return
  90. }
  91. }
  92. // ----------------------------------------------
  93. ch.c = cron.New(cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger)))
  94. {
  95. // 测试部分定时器代码,提前运行
  96. if ch.Settings.SpeedDevMode == true {
  97. ch.scanPlayedVideoSub()
  98. }
  99. }
  100. // 定时器
  101. // 这个暂时无法被取消执行
  102. ch.entryIDScanVideoProcess, err = ch.c.AddFunc(ch.Settings.CommonSettings.ScanInterval, ch.scanVideoProcessAdd2DownloadQueue)
  103. if err != nil {
  104. ch.log.Panicln("CronHelper scanVideoProcessAdd2DownloadQueue, scanVideoProcessAdd2DownloadQueue Cron entryID:", ch.entryIDScanVideoProcess, "Error:", err)
  105. }
  106. // 这个可以由 ch.Downloader.Cancel() 取消执行
  107. ch.entryIDSupplierCheck, err = ch.c.AddFunc("@every 1h", ch.Downloader.SupplierCheck)
  108. if err != nil {
  109. ch.log.Panicln("CronHelper SupplierCheck, SupplierCheck Cron entryID:", ch.entryIDSupplierCheck, "Error:", err)
  110. }
  111. // 这个可以由 ch.Downloader.Cancel() 取消执行
  112. ch.entryIDQueueDownloader, err = ch.c.AddFunc("@every 15s", ch.Downloader.QueueDownloader)
  113. if err != nil {
  114. ch.log.Panicln("CronHelper QueueDownloader, QueueDownloader Cron entryID:", ch.entryIDQueueDownloader, "Error:", err)
  115. }
  116. // 这个可以由 ch.scanPlayedVideoSubInfo.Cancel() 取消执行
  117. ch.entryIDScanPlayedVideoSubInfo, err = ch.c.AddFunc("@every 24h", ch.scanPlayedVideoSub)
  118. if err != nil {
  119. ch.log.Panicln("CronHelper QueueDownloader, scanPlayedVideoSub Cron entryID:", ch.entryIDScanPlayedVideoSubInfo, "Error:", err)
  120. }
  121. // 字幕的上传逻辑
  122. if ch.Settings.ExperimentalFunction.ShareSubSettings.ShareSubEnabled == true {
  123. ch.entryIDUploadPlayedVideoSub, err = ch.c.AddFunc("@every 5m", ch.uploadPlayedVideoSub)
  124. if err != nil {
  125. ch.log.Panicln("CronHelper QueueDownloader, uploadPlayedVideoSub Cron entryID:", ch.entryIDUploadPlayedVideoSub, "Error:", err)
  126. }
  127. }
  128. // ----------------------------------------------
  129. if runImmediately == true {
  130. // 是否在定时器开启前先执行一次视频扫描任务
  131. ch.cronLock.Lock()
  132. if ch.cronHelperRunning == true && ch.stopping == false {
  133. ch.cronLock.Unlock()
  134. //----------------------------------------------
  135. // 没有停止,那么继续扫描
  136. ch.log.Infoln("First Time scanVideoProcessAdd2DownloadQueue Start")
  137. if ch.Settings.SpeedDevMode == false {
  138. ch.scanVideoProcessAdd2DownloadQueue()
  139. }
  140. ch.log.Infoln("First Time scanVideoProcessAdd2DownloadQueue End")
  141. //----------------------------------------------
  142. } else {
  143. ch.cronLock.Unlock()
  144. ch.log.Infoln("CronHelper is stopping, not start scanVideoProcessAdd2DownloadQueue")
  145. return
  146. }
  147. } else {
  148. ch.log.Infoln("RunAtStartup: false, so will not Run At Startup")
  149. }
  150. // ----------------------------------------------
  151. // 如果不是立即执行,那么就等待定时器开启
  152. ch.cronLock.Lock()
  153. if ch.cronHelperRunning == true && ch.stopping == false {
  154. ch.cronLock.Unlock()
  155. //----------------------------------------------
  156. ch.log.Infoln("CronHelper Start...")
  157. ch.c.Start()
  158. //----------------------------------------------
  159. // 只有定时任务 start 之后才能拿到信息
  160. if len(ch.c.Entries()) > 0 {
  161. // 不会马上启动扫描,那么就需要设置当前的时间,且为 waiting
  162. tttt := ch.c.Entry(ch.entryIDScanVideoProcess).Next.Format("2006-01-02 15:04:05")
  163. ch.log.Infoln("Next Sub Scan Will Process At:", tttt)
  164. } else {
  165. ch.log.Errorln("Can't get cron jobs, will not send SubScanJobStatus")
  166. }
  167. //----------------------------------------------
  168. } else {
  169. ch.cronLock.Unlock()
  170. ch.log.Infoln("CronHelper is stopping, not start CronHelper")
  171. }
  172. //----------------------------------------------
  173. }
  174. // Stop 会阻塞等待任务完成
  175. func (ch *CronHelper) Stop() {
  176. cronHelperRunning := false
  177. ch.cronLock.Lock()
  178. cronHelperRunning = ch.cronHelperRunning
  179. ch.cronLock.Unlock()
  180. if cronHelperRunning == false {
  181. return
  182. }
  183. ch.cronLock.Lock()
  184. if ch.stopping == true {
  185. ch.cronLock.Unlock()
  186. return
  187. }
  188. ch.stopping = true
  189. ch.cronLock.Unlock()
  190. ch.videoScanAndRefreshHelper.Cancel()
  191. ch.Downloader.Cancel()
  192. ch.scanPlayedVideoSubInfo.Cancel()
  193. // Stop stops the cron scheduler if it is running; otherwise it does nothing.
  194. // A context is returned so the caller can wait for running jobs to complete.
  195. nowContext := ch.c.Stop()
  196. select {
  197. case <-time.After(5 * time.Minute):
  198. ch.log.Warningln("Wait over 5 min, CronHelper is timeout")
  199. case <-nowContext.Done():
  200. ch.log.Infoln("CronHelper.Stop() context<-Done.")
  201. }
  202. ch.cronLock.Lock()
  203. ch.cronHelperRunning = false
  204. ch.stopping = false
  205. ch.cronLock.Unlock()
  206. ch.log.Infoln("CronHelper.Stop() Done.")
  207. }
  208. func (ch *CronHelper) scanPlayedVideoSub() {
  209. bok, err := ch.scanPlayedVideoSubInfo.GetPlayedItemsSubtitle()
  210. if err != nil {
  211. ch.log.Errorln(err)
  212. }
  213. if bok == true {
  214. ch.scanPlayedVideoSubInfo.Clear()
  215. err = ch.scanPlayedVideoSubInfo.Scan()
  216. if err != nil {
  217. ch.log.Errorln(err)
  218. }
  219. }
  220. }
  221. // uploadPlayedVideoSub 上传字幕的定时器
  222. func (ch *CronHelper) uploadPlayedVideoSub() {
  223. // 找出没有上传过的字幕列表
  224. var notUploadedVideoSubInfos []models.VideoSubInfo
  225. dao.GetDb().Where("is_send = ?", false).Limit(1).Find(&notUploadedVideoSubInfos)
  226. if len(notUploadedVideoSubInfos) < 1 {
  227. ch.log.Debugln("No notUploadedVideoSubInfos")
  228. return
  229. }
  230. // 问询这个字幕是否上传过了,如果没有就需要进入上传的队列
  231. askForUploadReply, err := ch.FileDownloader.SubtitleBestApi.AskFroUpload(notUploadedVideoSubInfos[0].SHA256)
  232. if err != nil {
  233. ch.log.Errorln(fmt.Errorf("AskFroUpload err: %v", err))
  234. return
  235. }
  236. if askForUploadReply.Status == 3 {
  237. // 上传过了,直接标记本地的 is_send 字段为 true
  238. notUploadedVideoSubInfos[0].IsSend = true
  239. dao.GetDb().Save(&notUploadedVideoSubInfos[0])
  240. ch.log.Infoln("Subtitle has been uploaded, so will not upload again")
  241. return
  242. } else if askForUploadReply.Status == 4 {
  243. // 上传队列满了,等待下次定时器触发
  244. ch.log.Infoln("Subtitle upload queue is full, will try ask upload again")
  245. return
  246. } else if askForUploadReply.Status == 2 {
  247. // 这个上传任务已经在队列中了,也许有其他人也需要上传这个字幕,或者本机排队的时候故障了,重启也可能遇到这个故障
  248. ch.log.Infoln("Subtitle is int the queue")
  249. return
  250. } else if askForUploadReply.Status == 1 {
  251. // 正确放入了队列,然后需要按规划的时间进行上传操作
  252. // 这里可能需要执行耗时操作来等待到安排的时间点进行字幕的上传,不能直接长时间的 Sleep 操作
  253. // 每次 Sleep 1s 然后就判断一次定时器是否还允许允许,如果不运行了,那么也就需要退出循环
  254. // 得到目标时间与当前时间的差值,单位是s
  255. waitTime := askForUploadReply.ScheduledUnixTime - time.Now().Unix()
  256. if waitTime <= 0 {
  257. waitTime = 5
  258. }
  259. var sleepCounter int64
  260. sleepCounter = 0
  261. normalStatus := false
  262. for ch.cronHelperRunning == true {
  263. if sleepCounter > waitTime {
  264. normalStatus = true
  265. break
  266. }
  267. time.Sleep(1 * time.Second)
  268. sleepCounter++
  269. }
  270. if normalStatus == false || ch.cronHelperRunning == false {
  271. // 说明不是正常跳出来的,是结束定时器来执行的
  272. ch.log.Infoln("uploadPlayedVideoSub early termination")
  273. return
  274. }
  275. // 发送字幕
  276. } else {
  277. // 不是预期的返回值,需要报警
  278. ch.log.Errorln(fmt.Errorf("AskFroUpload Not the expected return value, Status: %d, Message: %v", askForUploadReply.Status, askForUploadReply.Message))
  279. return
  280. }
  281. }
  282. func (ch *CronHelper) CronHelperRunning() bool {
  283. defer func() {
  284. ch.cronLock.Unlock()
  285. }()
  286. ch.cronLock.Lock()
  287. return ch.cronHelperRunning
  288. }
  289. func (ch *CronHelper) CronHelperStopping() bool {
  290. defer func() {
  291. ch.cronLock.Unlock()
  292. }()
  293. ch.cronLock.Lock()
  294. return ch.stopping
  295. }
  296. func (ch *CronHelper) CronRunningStatusString() string {
  297. if ch.CronHelperRunning() == true {
  298. if ch.CronHelperStopping() == true {
  299. return Stopping
  300. }
  301. return Running
  302. } else {
  303. return Stopped
  304. }
  305. }
  306. // scanVideoProcessAdd2DownloadQueue 定时执行的视频扫描任务,提交给任务队列,然后由额外的下载者线程去取队列中的任务下载
  307. func (ch *CronHelper) scanVideoProcessAdd2DownloadQueue() {
  308. // 扫描字幕任务开始,先是扫描阶段,那么是拿不到有多少视频需要扫描的数量的
  309. ch.log.Infoln("scanVideoProcessAdd2DownloadQueue Start:", time.Now().Format("2006-01-02 15:04:05"))
  310. // ----------------------------------------------------------------------------------------
  311. // ----------------------------------------------------------------------------------------
  312. // 扫描有那些视频需要下载字幕,放入队列中,然后会有下载者去这个队列取出来进行下载
  313. err := ch.videoScanAndRefreshHelper.Start()
  314. if err != nil {
  315. ch.log.Errorln(err)
  316. return
  317. }
  318. }
  319. const (
  320. Stopped = "stopped"
  321. Running = "running"
  322. Stopping = "stopping"
  323. )