task_queue.go 11 KB


  1. package task_queue
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/allanpk716/ChineseSubFinder/internal/pkg/settings"
  6. "github.com/allanpk716/ChineseSubFinder/internal/types/task_queue"
  7. taskQueue2 "github.com/allanpk716/ChineseSubFinder/internal/types/task_queue"
  8. "github.com/dgraph-io/badger/v3"
  9. "github.com/emirpasic/gods/maps/treemap"
  10. "github.com/sirupsen/logrus"
  11. "sync"
  12. "time"
  13. )
  14. type TaskQueue struct {
  15. queueName string
  16. settings *settings.Settings
  17. log *logrus.Logger
  18. taskPriorityMapList []*treemap.Map
  19. taskKeyMap *treemap.Map
  20. queueLock sync.Mutex // 公用这个锁
  21. }
  22. func NewTaskQueue(queueName string, settings *settings.Settings, log *logrus.Logger) *TaskQueue {
  23. tq := &TaskQueue{queueName: queueName, settings: settings, log: log,
  24. taskPriorityMapList: make([]*treemap.Map, 0),
  25. taskKeyMap: treemap.NewWithStringComparator(),
  26. }
  27. for i := 0; i <= taskPriorityCount; i++ {
  28. tq.taskPriorityMapList = append(tq.taskPriorityMapList, treemap.NewWithStringComparator())
  29. }
  30. tq.read()
  31. return tq
  32. }
  33. func (t *TaskQueue) QueueName() string {
  34. return t.queueName
  35. }
  36. func (t *TaskQueue) Clear() error {
  37. defer t.queueLock.Unlock()
  38. t.queueLock.Lock()
  39. err := GetDb().Update(
  40. func(tx *badger.Txn) error {
  41. var err error
  42. for i := 0; i <= taskPriorityCount; i++ {
  43. key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue,
  44. fmt.Sprintf("%s_%d", t.queueName, i)))
  45. // 因为已经查询了一次,确保一定存在,所以直接更新+1,TTL 多加 5s 确保今天过去,暂时去除 TTL uint32(restOfDaySecond.Seconds())+5
  46. if err = tx.Delete(key); err != nil {
  47. return err
  48. }
  49. }
  50. return nil
  51. })
  52. if err != nil {
  53. return err
  54. }
  55. for i := 0; i <= taskPriorityCount; i++ {
  56. t.taskPriorityMapList[i].Clear()
  57. }
  58. t.taskKeyMap.Clear()
  59. return nil
  60. }
  61. // Size 队列的长度,对外暴露,有锁
  62. func (t *TaskQueue) Size() int {
  63. defer t.queueLock.Unlock()
  64. t.queueLock.Lock()
  65. return t.taskKeyMap.Size()
  66. }
  67. func (t *TaskQueue) checkPriority(oneJob taskQueue2.OneJob) taskQueue2.OneJob {
  68. if oneJob.TaskPriority > taskPriorityCount {
  69. oneJob.TaskPriority = taskPriorityCount
  70. }
  71. if oneJob.TaskPriority < 0 {
  72. oneJob.TaskPriority = 0
  73. }
  74. return oneJob
  75. }
  76. func (t *TaskQueue) degrade(oneJob taskQueue2.OneJob) taskQueue2.OneJob {
  77. oneJob.TaskPriority -= 1
  78. return t.checkPriority(oneJob)
  79. }
  80. // Add 放入元素,放入的时候会根据 TaskPriority 进行归类,存在的不会新增和更新
  81. func (t *TaskQueue) Add(oneJob task_queue.OneJob) (bool, error) {
  82. defer t.queueLock.Unlock()
  83. t.queueLock.Lock()
  84. if t.isExist(oneJob.Id) == true {
  85. return false, nil
  86. }
  87. // 检查权限范围
  88. oneJob = t.checkPriority(oneJob)
  89. // 插入到统一的 KeyMap
  90. t.taskKeyMap.Put(oneJob.Id, oneJob.TaskPriority)
  91. // 分配到具体的优先级 map 中
  92. t.taskPriorityMapList[oneJob.TaskPriority].Put(oneJob.Id, oneJob)
  93. err := t.save(oneJob.TaskPriority)
  94. if err != nil {
  95. return false, err
  96. }
  97. return true, nil
  98. }
  99. // update 更新素,不存在则会失败,内部用,没有锁
  100. func (t *TaskQueue) update(oneJob task_queue.OneJob) (bool, error) {
  101. if t.isExist(oneJob.Id) == false {
  102. return false, nil
  103. }
  104. // 自动更新时间
  105. oneJob.UpdateTime = time.Now()
  106. // 这里需要判断是否有优先级的 Update,如果有就需要把之前缓存的表给更新
  107. // 然后再插入到新的表中
  108. taskPriorityIndex, _ := t.taskKeyMap.Get(oneJob.Id)
  109. // 检查权限范围
  110. oneJob = t.checkPriority(oneJob)
  111. if oneJob.TaskPriority != taskPriorityIndex {
  112. // 优先级修改
  113. // 先删除原有的优先级
  114. t.taskPriorityMapList[taskPriorityIndex.(int)].Remove(oneJob.Id)
  115. err := t.save(taskPriorityIndex.(int))
  116. if err != nil {
  117. return false, err
  118. }
  119. }
  120. // 插入到统一的 KeyMap
  121. t.taskKeyMap.Put(oneJob.Id, oneJob.TaskPriority)
  122. // 分配到具体的优先级 map 中
  123. t.taskPriorityMapList[oneJob.TaskPriority].Put(oneJob.Id, oneJob)
  124. err := t.save(oneJob.TaskPriority)
  125. if err != nil {
  126. return false, err
  127. }
  128. return true, nil
  129. }
  130. // Update 更新素,不存在则会失败
  131. func (t *TaskQueue) Update(oneJob task_queue.OneJob) (bool, error) {
  132. defer t.queueLock.Unlock()
  133. t.queueLock.Lock()
  134. return t.update(oneJob)
  135. }
  136. // AutoDetectUpdateJobStatus 根据任务的生命周期图,进行自动判断更新,见《任务的生命周期》流程图
  137. func (t *TaskQueue) AutoDetectUpdateJobStatus(oneJob task_queue.OneJob, inErr error) {
  138. defer t.queueLock.Unlock()
  139. t.queueLock.Lock()
  140. // 检查权限范围
  141. oneJob = t.checkPriority(oneJob)
  142. if inErr == nil {
  143. // 没有错误就是完成
  144. oneJob.JobStatus = taskQueue2.Done
  145. oneJob.DownloadTimes += 1
  146. } else {
  147. // 超过了时间限制,默认是 90 天, A.Before(B) : A < B == true
  148. if oneJob.AddedTime.AddDate(0, 0, t.settings.AdvancedSettings.TaskQueue.ExpirationTime).Before(time.Now()) == true {
  149. // 超过 90 天了
  150. oneJob.JobStatus = taskQueue2.Failed
  151. } else {
  152. // 还在 90 天内
  153. // 是否是首次,那么就看它的 Level 是否是在 5,然后 retry == 0
  154. if oneJob.TaskPriority == DefaultTaskPriorityLevel && oneJob.RetryTimes == 0 {
  155. // 需要重置到 L6
  156. oneJob.RetryTimes = 0
  157. oneJob.TaskPriority = FirstRetryTaskPriorityLevel
  158. } else {
  159. if oneJob.RetryTimes > t.settings.AdvancedSettings.TaskQueue.MaxRetryTimes {
  160. // 超过重试次数会进行一次降级,然后重置这个次数
  161. oneJob.RetryTimes = 0
  162. oneJob = t.degrade(oneJob)
  163. }
  164. }
  165. // 强制为 waiting
  166. oneJob.JobStatus = taskQueue2.Waiting
  167. }
  168. // 传入的错误需要放进来
  169. oneJob.ErrorInfo = inErr.Error()
  170. oneJob.DownloadTimes += 1
  171. }
  172. // 这里不要用错了,要用无锁的,不然会阻塞
  173. bok, err := t.update(oneJob)
  174. if err != nil {
  175. t.log.Errorln("AutoDetectUpdateJobStatus", oneJob.VideoFPath, err)
  176. return
  177. }
  178. if bok == false {
  179. t.log.Warningln("AutoDetectUpdateJobStatus ==", oneJob.VideoFPath, "Job.ID", oneJob.Id, "Not Found")
  180. return
  181. }
  182. }
  183. // GetOneWaitingJob 获取一个元素,按优先级,0 - taskPriorityCount 的级别去拿去任务,不会移除任务
  184. func (t *TaskQueue) GetOneWaitingJob() (bool, task_queue.OneJob, error) {
  185. defer t.queueLock.Unlock()
  186. t.queueLock.Lock()
  187. // 如果队列里面没有东西,则返回 false
  188. if t.isEmpty() == true {
  189. return false, task_queue.OneJob{}, nil
  190. }
  191. found := false
  192. tOneJob := task_queue.OneJob{}
  193. for TaskPriority := 0; TaskPriority <= taskPriorityCount; TaskPriority++ {
  194. t.taskPriorityMapList[TaskPriority].Each(func(key interface{}, value interface{}) {
  195. tOneJob = value.(task_queue.OneJob)
  196. // 任务的 UpdateTime 与现在的时间大于单个字幕下载的间隔
  197. // 默认是 12h, A.After(B) : A > B == true
  198. // 见《任务队列设计》--以优先级顺序取出描述
  199. if tOneJob.JobStatus == task_queue.Waiting && (tOneJob.DownloadTimes == 0 ||
  200. tOneJob.UpdateTime.AddDate(0, 0, t.settings.AdvancedSettings.TaskQueue.OneSubDownloadInterval).After(time.Now()) == true && tOneJob.DownloadTimes > 0) {
  201. // 找到就返回
  202. found = true
  203. return
  204. }
  205. })
  206. if found == true {
  207. return true, tOneJob, nil
  208. }
  209. }
  210. return false, tOneJob, nil
  211. }
  212. func (t *TaskQueue) GetJobsByStatus(status task_queue.JobStatus) (bool, []task_queue.OneJob, error) {
  213. defer t.queueLock.Unlock()
  214. t.queueLock.Lock()
  215. outOneJobs := make([]task_queue.OneJob, 0)
  216. // 如果队列里面没有东西,则返回 false
  217. if t.isEmpty() == true {
  218. return false, nil, nil
  219. }
  220. for TaskPriority := 0; TaskPriority <= taskPriorityCount; TaskPriority++ {
  221. t.taskPriorityMapList[TaskPriority].Each(func(key interface{}, value interface{}) {
  222. tOneJob := task_queue.OneJob{}
  223. tOneJob = value.(task_queue.OneJob)
  224. if tOneJob.JobStatus == status {
  225. // 找到加入列表
  226. outOneJobs = append(outOneJobs, tOneJob)
  227. }
  228. })
  229. }
  230. return true, outOneJobs, nil
  231. }
  232. // GetJobsByPriorityAndStatus 根据任务优先级和状态获取任务列表
  233. func (t *TaskQueue) GetJobsByPriorityAndStatus(taskPriority int, status task_queue.JobStatus) (bool, []task_queue.OneJob, error) {
  234. defer t.queueLock.Unlock()
  235. t.queueLock.Lock()
  236. outOneJobs := make([]task_queue.OneJob, 0)
  237. // 如果队列里面没有东西,则返回 false
  238. if t.isEmpty() == true {
  239. return false, nil, nil
  240. }
  241. t.taskPriorityMapList[taskPriority].Each(func(key interface{}, value interface{}) {
  242. tOneJob := task_queue.OneJob{}
  243. tOneJob = value.(task_queue.OneJob)
  244. if tOneJob.JobStatus == status {
  245. // 找到加入列表
  246. outOneJobs = append(outOneJobs, tOneJob)
  247. }
  248. })
  249. return true, outOneJobs, nil
  250. }
  251. // Del 删除一个元素
  252. func (t *TaskQueue) Del(jobId string) (bool, error) {
  253. defer t.queueLock.Unlock()
  254. t.queueLock.Lock()
  255. if t.isExist(jobId) == false {
  256. return false, nil
  257. }
  258. taskPriority, bok := t.taskKeyMap.Get(jobId)
  259. if bok == false {
  260. return false, nil
  261. }
  262. t.taskKeyMap.Remove(jobId)
  263. t.taskPriorityMapList[taskPriority.(int)].Remove(jobId)
  264. err := t.save(taskPriority.(int))
  265. if err != nil {
  266. return false, err
  267. }
  268. return true, nil
  269. }
  270. func (t *TaskQueue) read() {
  271. err := GetDb().View(
  272. func(tx *badger.Txn) error {
  273. var err error
  274. for i := 0; i <= taskPriorityCount; i++ {
  275. key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue,
  276. fmt.Sprintf("%s_%d", t.queueName, i)))
  277. var item *badger.Item
  278. item, err = tx.Get(key)
  279. if err != nil {
  280. if IsErrOk(err) == true {
  281. return nil
  282. }
  283. return err
  284. }
  285. valCopy, err := item.ValueCopy(nil)
  286. if err != nil {
  287. return err
  288. }
  289. err = t.taskPriorityMapList[i].FromJSON(valCopy)
  290. if err != nil {
  291. return err
  292. }
  293. }
  294. return nil
  295. })
  296. if err != nil {
  297. t.log.Panicln(err)
  298. }
  299. // 需要把几个优先级的map中的key汇总
  300. for i := 0; i < taskPriorityCount; i++ {
  301. t.taskPriorityMapList[i].Each(func(key interface{}, value interface{}) {
  302. t.taskKeyMap.Put(key, i)
  303. })
  304. }
  305. }
  306. // save 需要把改变的数据保持到 K/V 数据库中,这个没有锁,所以需要在 Sync 中使用,不对外开放
  307. func (t *TaskQueue) save(taskPriority int) error {
  308. err := GetDb().Update(
  309. func(tx *badger.Txn) error {
  310. var err error
  311. key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue,
  312. fmt.Sprintf("%s_%d", t.queueName, taskPriority)))
  313. if err != nil {
  314. return err
  315. }
  316. b, err := t.taskPriorityMapList[taskPriority].ToJSON()
  317. if err != nil {
  318. return err
  319. }
  320. e := badger.NewEntry(key, b)
  321. err = tx.SetEntry(e)
  322. if err != nil {
  323. return err
  324. }
  325. return nil
  326. })
  327. if err != nil {
  328. return err
  329. }
  330. return nil
  331. }
  332. // isExist 是否已经存在,对内,无锁
  333. func (t *TaskQueue) isExist(jobID string) bool {
  334. _, bok := t.taskKeyMap.Get(jobID)
  335. return bok
  336. }
  337. // IsExist 是否已经存在,对外,有锁
  338. func (t *TaskQueue) IsExist(jobID string) bool {
  339. defer t.queueLock.Unlock()
  340. t.queueLock.Lock()
  341. _, bok := t.taskKeyMap.Get(jobID)
  342. return bok
  343. }
  344. // isEmpty 对内,无锁
  345. func (t *TaskQueue) isEmpty() bool {
  346. return t.taskKeyMap.Empty()
  347. }
  348. const (
  349. taskPriorityCount = 10
  350. DefaultTaskPriorityLevel = 5
  351. FirstRetryTaskPriorityLevel = 6
  352. )
  353. var (
  354. ErrNotSubFound = errors.New("Not Sub Found")
  355. )