task_queue.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package task_queue
  2. import (
  3. "fmt"
  4. "github.com/allanpk716/ChineseSubFinder/internal/pkg/settings"
  5. "github.com/allanpk716/ChineseSubFinder/internal/types/task_queue"
  6. "github.com/dgraph-io/badger/v3"
  7. "github.com/emirpasic/gods/maps/treemap"
  8. "github.com/sirupsen/logrus"
  9. "sync"
  10. "time"
  11. )
  12. type TaskQueue struct {
  13. queueName string
  14. settings *settings.Settings
  15. log *logrus.Logger
  16. taskPriorityMapList []*treemap.Map
  17. taskKeyMap *treemap.Map
  18. queueLock sync.Mutex // 公用这个锁
  19. }
  20. func NewTaskQueue(queueName string, settings *settings.Settings, log *logrus.Logger) *TaskQueue {
  21. tq := &TaskQueue{queueName: queueName, settings: settings, log: log,
  22. taskPriorityMapList: make([]*treemap.Map, 0),
  23. taskKeyMap: treemap.NewWithStringComparator(),
  24. }
  25. for i := 0; i <= taskPriorityCount; i++ {
  26. tq.taskPriorityMapList = append(tq.taskPriorityMapList, treemap.NewWithStringComparator())
  27. }
  28. tq.read()
  29. return tq
  30. }
  31. func (t *TaskQueue) QueueName() string {
  32. return t.queueName
  33. }
  34. func (t *TaskQueue) Clear() error {
  35. defer t.queueLock.Unlock()
  36. t.queueLock.Lock()
  37. err := GetDb().Update(
  38. func(tx *badger.Txn) error {
  39. var err error
  40. for i := 0; i <= taskPriorityCount; i++ {
  41. key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue,
  42. fmt.Sprintf("%s_%d", t.queueName, i)))
  43. // 因为已经查询了一次,确保一定存在,所以直接更新+1,TTL 多加 5s 确保今天过去,暂时去除 TTL uint32(restOfDaySecond.Seconds())+5
  44. if err = tx.Delete(key); err != nil {
  45. return err
  46. }
  47. }
  48. return nil
  49. })
  50. if err != nil {
  51. return err
  52. }
  53. for i := 0; i <= taskPriorityCount; i++ {
  54. t.taskPriorityMapList[i].Clear()
  55. }
  56. t.taskKeyMap.Clear()
  57. return nil
  58. }
  59. // Size 队列的长度,对外暴露,有锁
  60. func (t *TaskQueue) Size() int {
  61. defer t.queueLock.Unlock()
  62. t.queueLock.Lock()
  63. return t.taskKeyMap.Size()
  64. }
  65. // Add 放入元素,放入的时候会根据 TaskPriority 进行归类,存在的不会新增和更新
  66. func (t *TaskQueue) Add(oneJob task_queue.OneJob) (bool, error) {
  67. defer t.queueLock.Unlock()
  68. t.queueLock.Lock()
  69. if t.isExist(oneJob.Id) == true {
  70. return false, nil
  71. }
  72. // 插入到统一的 KeyMap
  73. t.taskKeyMap.Put(oneJob.Id, oneJob.TaskPriority)
  74. // 分配到具体的优先级 map 中
  75. t.taskPriorityMapList[oneJob.TaskPriority].Put(oneJob.Id, oneJob)
  76. err := t.save(oneJob.TaskPriority)
  77. if err != nil {
  78. return false, err
  79. }
  80. return true, nil
  81. }
  82. // Update 更新素,不存在则会失败
  83. func (t *TaskQueue) Update(oneJob task_queue.OneJob) (bool, error) {
  84. defer t.queueLock.Unlock()
  85. t.queueLock.Lock()
  86. if t.isExist(oneJob.Id) == false {
  87. return false, nil
  88. }
  89. // 自动更新时间
  90. oneJob.UpdateTime = time.Now()
  91. // 这里需要判断是否有优先级的 Update,如果有就需要把之前缓存的表给更新
  92. // 然后再插入到新的表中
  93. taskPriorityIndex, _ := t.taskKeyMap.Get(oneJob.Id)
  94. if oneJob.TaskPriority != taskPriorityIndex {
  95. // 优先级修改
  96. // 先删除原有的优先级
  97. t.taskPriorityMapList[taskPriorityIndex.(int)].Remove(oneJob.Id)
  98. err := t.save(taskPriorityIndex.(int))
  99. if err != nil {
  100. return false, err
  101. }
  102. }
  103. // 插入到统一的 KeyMap
  104. t.taskKeyMap.Put(oneJob.Id, oneJob.TaskPriority)
  105. // 分配到具体的优先级 map 中
  106. t.taskPriorityMapList[oneJob.TaskPriority].Put(oneJob.Id, oneJob)
  107. err := t.save(oneJob.TaskPriority)
  108. if err != nil {
  109. return false, err
  110. }
  111. return true, nil
  112. }
  113. // GetOneWaiting 获取一个元素,按优先级,0 - taskPriorityCount 的级别去拿去任务,不会移除任务
  114. func (t *TaskQueue) GetOneWaiting() (bool, task_queue.OneJob, error) {
  115. defer t.queueLock.Unlock()
  116. t.queueLock.Lock()
  117. // 如果队列里面没有东西,则返回 false
  118. if t.isEmpty() == true {
  119. return false, task_queue.OneJob{}, nil
  120. }
  121. found := false
  122. tOneJob := task_queue.OneJob{}
  123. for TaskPriority := 0; TaskPriority <= taskPriorityCount; TaskPriority++ {
  124. t.taskPriorityMapList[TaskPriority].Each(func(key interface{}, value interface{}) {
  125. tOneJob = value.(task_queue.OneJob)
  126. if tOneJob.JobStatus == task_queue.Waiting {
  127. // 找到就返回
  128. found = true
  129. return
  130. }
  131. })
  132. if found == true {
  133. break
  134. }
  135. }
  136. return true, tOneJob, nil
  137. }
  138. func (t *TaskQueue) Get(status task_queue.JobStatus) (bool, []task_queue.OneJob, error) {
  139. outOneJobs := make([]task_queue.OneJob, 0)
  140. // 如果队列里面没有东西,则返回 false
  141. if t.isEmpty() == true {
  142. return false, nil, nil
  143. }
  144. for TaskPriority := 0; TaskPriority <= taskPriorityCount; TaskPriority++ {
  145. t.taskPriorityMapList[TaskPriority].Each(func(key interface{}, value interface{}) {
  146. tOneJob := task_queue.OneJob{}
  147. tOneJob = value.(task_queue.OneJob)
  148. if tOneJob.JobStatus == status {
  149. // 找到加入列表
  150. outOneJobs = append(outOneJobs, tOneJob)
  151. }
  152. })
  153. }
  154. return true, outOneJobs, nil
  155. }
  156. func (t *TaskQueue) GetTaskPriority(taskPriority int, status task_queue.JobStatus) (bool, []task_queue.OneJob, error) {
  157. outOneJobs := make([]task_queue.OneJob, 0)
  158. // 如果队列里面没有东西,则返回 false
  159. if t.isEmpty() == true {
  160. return false, nil, nil
  161. }
  162. t.taskPriorityMapList[taskPriority].Each(func(key interface{}, value interface{}) {
  163. tOneJob := task_queue.OneJob{}
  164. tOneJob = value.(task_queue.OneJob)
  165. if tOneJob.JobStatus == status {
  166. // 找到加入列表
  167. outOneJobs = append(outOneJobs, tOneJob)
  168. }
  169. })
  170. return true, outOneJobs, nil
  171. }
  172. // Del 删除一个元素
  173. func (t *TaskQueue) Del(jobId string) (bool, error) {
  174. defer t.queueLock.Unlock()
  175. t.queueLock.Lock()
  176. if t.isExist(jobId) == false {
  177. return false, nil
  178. }
  179. taskPriority, bok := t.taskKeyMap.Get(jobId)
  180. if bok == false {
  181. return false, nil
  182. }
  183. t.taskKeyMap.Remove(jobId)
  184. t.taskPriorityMapList[taskPriority.(int)].Remove(jobId)
  185. err := t.save(taskPriority.(int))
  186. if err != nil {
  187. return false, err
  188. }
  189. return true, nil
  190. }
  191. func (t *TaskQueue) read() {
  192. err := GetDb().View(
  193. func(tx *badger.Txn) error {
  194. var err error
  195. for i := 0; i <= taskPriorityCount; i++ {
  196. key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue,
  197. fmt.Sprintf("%s_%d", t.queueName, i)))
  198. var item *badger.Item
  199. item, err = tx.Get(key)
  200. if err != nil {
  201. if IsErrOk(err) == true {
  202. return nil
  203. }
  204. return err
  205. }
  206. valCopy, err := item.ValueCopy(nil)
  207. if err != nil {
  208. return err
  209. }
  210. err = t.taskPriorityMapList[i].FromJSON(valCopy)
  211. if err != nil {
  212. return err
  213. }
  214. }
  215. return nil
  216. })
  217. if err != nil {
  218. t.log.Panicln(err)
  219. }
  220. // 需要把几个优先级的map中的key汇总
  221. for i := 0; i < taskPriorityCount; i++ {
  222. t.taskPriorityMapList[i].Each(func(key interface{}, value interface{}) {
  223. t.taskKeyMap.Put(key, i)
  224. })
  225. }
  226. }
  227. // save 需要把改变的数据保持到 K/V 数据库中,这个没有锁,所以需要在 Sync 中使用,不对外开放
  228. func (t *TaskQueue) save(taskPriority int) error {
  229. err := GetDb().Update(
  230. func(tx *badger.Txn) error {
  231. var err error
  232. key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue,
  233. fmt.Sprintf("%s_%d", t.queueName, taskPriority)))
  234. if err != nil {
  235. return err
  236. }
  237. b, err := t.taskPriorityMapList[taskPriority].ToJSON()
  238. if err != nil {
  239. return err
  240. }
  241. e := badger.NewEntry(key, b)
  242. err = tx.SetEntry(e)
  243. if err != nil {
  244. return err
  245. }
  246. return nil
  247. })
  248. if err != nil {
  249. return err
  250. }
  251. return nil
  252. }
  253. // isExist 是否已经存在
  254. func (t *TaskQueue) isExist(jobID string) bool {
  255. _, bok := t.taskKeyMap.Get(jobID)
  256. return bok
  257. }
  258. func (t *TaskQueue) isEmpty() bool {
  259. return t.taskKeyMap.Empty()
  260. }
  261. const taskPriorityCount = 10