task_queue.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. package task_queue
  2. import (
  3. "fmt"
  4. "github.com/allanpk716/ChineseSubFinder/internal/pkg/settings"
  5. "github.com/allanpk716/ChineseSubFinder/internal/types/task_queue"
  6. "github.com/dgraph-io/badger/v3"
  7. "github.com/emirpasic/gods/maps/treemap"
  8. "github.com/sirupsen/logrus"
  9. "sync"
  10. "time"
  11. )
  12. type TaskQueue struct {
  13. queueName string
  14. settings *settings.Settings
  15. log *logrus.Logger
  16. taskPriorityMapList []*treemap.Map
  17. taskKeyMap *treemap.Map
  18. queueLock sync.Mutex // 公用这个锁
  19. }
  20. func NewTaskQueue(queueName string, settings *settings.Settings, log *logrus.Logger) *TaskQueue {
  21. tq := &TaskQueue{queueName: queueName, settings: settings, log: log,
  22. taskPriorityMapList: make([]*treemap.Map, 0),
  23. taskKeyMap: treemap.NewWithStringComparator(),
  24. }
  25. for i := 0; i <= taskPriorityCount; i++ {
  26. tq.taskPriorityMapList = append(tq.taskPriorityMapList, treemap.NewWithStringComparator())
  27. }
  28. tq.read()
  29. return tq
  30. }
  31. func (t *TaskQueue) QueueName() string {
  32. return t.queueName
  33. }
  34. func (t *TaskQueue) Clear() error {
  35. defer t.queueLock.Unlock()
  36. t.queueLock.Lock()
  37. err := GetDb().Update(
  38. func(tx *badger.Txn) error {
  39. var err error
  40. for i := 0; i <= taskPriorityCount; i++ {
  41. key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue,
  42. fmt.Sprintf("%s_%d", t.queueName, i)))
  43. // 因为已经查询了一次,确保一定存在,所以直接更新+1,TTL 多加 5s 确保今天过去,暂时去除 TTL uint32(restOfDaySecond.Seconds())+5
  44. if err = tx.Delete(key); err != nil {
  45. return err
  46. }
  47. }
  48. return nil
  49. })
  50. if err != nil {
  51. return err
  52. }
  53. for i := 0; i <= taskPriorityCount; i++ {
  54. t.taskPriorityMapList[i].Clear()
  55. }
  56. t.taskKeyMap.Clear()
  57. return nil
  58. }
  59. // Size 队列的长度,对外暴露,有锁
  60. func (t *TaskQueue) Size() int {
  61. defer t.queueLock.Unlock()
  62. t.queueLock.Lock()
  63. return t.taskKeyMap.Size()
  64. }
  65. // Add 放入元素,放入的时候会根据 TaskPriority 进行归类,存在的不会新增和更新
  66. func (t *TaskQueue) Add(oneJob task_queue.OneJob) (bool, error) {
  67. defer t.queueLock.Unlock()
  68. t.queueLock.Lock()
  69. if t.isExist(oneJob.Id) == true {
  70. return false, nil
  71. }
  72. // 插入到统一的 KeyMap
  73. t.taskKeyMap.Put(oneJob.Id, oneJob.TaskPriority)
  74. // 分配到具体的优先级 map 中
  75. t.taskPriorityMapList[oneJob.TaskPriority].Put(oneJob.Id, oneJob)
  76. err := t.save(oneJob.TaskPriority)
  77. if err != nil {
  78. return false, err
  79. }
  80. return true, nil
  81. }
  82. // Update 更新素,不存在则会失败
  83. func (t *TaskQueue) Update(oneJob task_queue.OneJob) (bool, error) {
  84. defer t.queueLock.Unlock()
  85. t.queueLock.Lock()
  86. if t.isExist(oneJob.Id) == false {
  87. return false, nil
  88. }
  89. // 自动更新时间
  90. oneJob.UpdateTime = time.Now()
  91. // 这里需要判断是否有优先级的 Update,如果有就需要把之前缓存的表给更新
  92. // 然后再插入到新的表中
  93. taskPriorityIndex, _ := t.taskKeyMap.Get(oneJob.Id)
  94. if oneJob.TaskPriority != taskPriorityIndex {
  95. // 优先级修改
  96. // 先删除原有的优先级
  97. t.taskPriorityMapList[taskPriorityIndex.(int)].Remove(oneJob.Id)
  98. err := t.save(taskPriorityIndex.(int))
  99. if err != nil {
  100. return false, err
  101. }
  102. }
  103. // 插入到统一的 KeyMap
  104. t.taskKeyMap.Put(oneJob.Id, oneJob.TaskPriority)
  105. // 分配到具体的优先级 map 中
  106. t.taskPriorityMapList[oneJob.TaskPriority].Put(oneJob.Id, oneJob)
  107. err := t.save(oneJob.TaskPriority)
  108. if err != nil {
  109. return false, err
  110. }
  111. return true, nil
  112. }
  113. // GetOneWaiting 获取一个元素,按优先级,0 - taskPriorityCount 的级别去拿去任务,不会移除任务
  114. func (t *TaskQueue) GetOneWaiting() (bool, task_queue.OneJob, error) {
  115. defer t.queueLock.Unlock()
  116. t.queueLock.Lock()
  117. // 如果队列里面没有东西,则返回 false
  118. if t.isEmpty() == true {
  119. return false, task_queue.OneJob{}, nil
  120. }
  121. found := false
  122. tOneJob := task_queue.OneJob{}
  123. for TaskPriority := 0; TaskPriority <= taskPriorityCount; TaskPriority++ {
  124. t.taskPriorityMapList[TaskPriority].Each(func(key interface{}, value interface{}) {
  125. tOneJob = value.(task_queue.OneJob)
  126. if tOneJob.JobStatus == task_queue.Waiting {
  127. // 找到就返回
  128. found = true
  129. return
  130. }
  131. })
  132. if found == true {
  133. break
  134. }
  135. }
  136. return true, tOneJob, nil
  137. }
  138. func (t *TaskQueue) Get(status task_queue.JobStatus) (bool, []task_queue.OneJob, error) {
  139. defer t.queueLock.Unlock()
  140. t.queueLock.Lock()
  141. outOneJobs := make([]task_queue.OneJob, 0)
  142. // 如果队列里面没有东西,则返回 false
  143. if t.isEmpty() == true {
  144. return false, nil, nil
  145. }
  146. for TaskPriority := 0; TaskPriority <= taskPriorityCount; TaskPriority++ {
  147. t.taskPriorityMapList[TaskPriority].Each(func(key interface{}, value interface{}) {
  148. tOneJob := task_queue.OneJob{}
  149. tOneJob = value.(task_queue.OneJob)
  150. if tOneJob.JobStatus == status {
  151. // 找到加入列表
  152. outOneJobs = append(outOneJobs, tOneJob)
  153. }
  154. })
  155. }
  156. return true, outOneJobs, nil
  157. }
  158. func (t *TaskQueue) GetTaskPriority(taskPriority int, status task_queue.JobStatus) (bool, []task_queue.OneJob, error) {
  159. defer t.queueLock.Unlock()
  160. t.queueLock.Lock()
  161. outOneJobs := make([]task_queue.OneJob, 0)
  162. // 如果队列里面没有东西,则返回 false
  163. if t.isEmpty() == true {
  164. return false, nil, nil
  165. }
  166. t.taskPriorityMapList[taskPriority].Each(func(key interface{}, value interface{}) {
  167. tOneJob := task_queue.OneJob{}
  168. tOneJob = value.(task_queue.OneJob)
  169. if tOneJob.JobStatus == status {
  170. // 找到加入列表
  171. outOneJobs = append(outOneJobs, tOneJob)
  172. }
  173. })
  174. return true, outOneJobs, nil
  175. }
  176. // Del 删除一个元素
  177. func (t *TaskQueue) Del(jobId string) (bool, error) {
  178. defer t.queueLock.Unlock()
  179. t.queueLock.Lock()
  180. if t.isExist(jobId) == false {
  181. return false, nil
  182. }
  183. taskPriority, bok := t.taskKeyMap.Get(jobId)
  184. if bok == false {
  185. return false, nil
  186. }
  187. t.taskKeyMap.Remove(jobId)
  188. t.taskPriorityMapList[taskPriority.(int)].Remove(jobId)
  189. err := t.save(taskPriority.(int))
  190. if err != nil {
  191. return false, err
  192. }
  193. return true, nil
  194. }
  195. func (t *TaskQueue) read() {
  196. err := GetDb().View(
  197. func(tx *badger.Txn) error {
  198. var err error
  199. for i := 0; i <= taskPriorityCount; i++ {
  200. key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue,
  201. fmt.Sprintf("%s_%d", t.queueName, i)))
  202. var item *badger.Item
  203. item, err = tx.Get(key)
  204. if err != nil {
  205. if IsErrOk(err) == true {
  206. return nil
  207. }
  208. return err
  209. }
  210. valCopy, err := item.ValueCopy(nil)
  211. if err != nil {
  212. return err
  213. }
  214. err = t.taskPriorityMapList[i].FromJSON(valCopy)
  215. if err != nil {
  216. return err
  217. }
  218. }
  219. return nil
  220. })
  221. if err != nil {
  222. t.log.Panicln(err)
  223. }
  224. // 需要把几个优先级的map中的key汇总
  225. for i := 0; i < taskPriorityCount; i++ {
  226. t.taskPriorityMapList[i].Each(func(key interface{}, value interface{}) {
  227. t.taskKeyMap.Put(key, i)
  228. })
  229. }
  230. }
  231. // save 需要把改变的数据保持到 K/V 数据库中,这个没有锁,所以需要在 Sync 中使用,不对外开放
  232. func (t *TaskQueue) save(taskPriority int) error {
  233. err := GetDb().Update(
  234. func(tx *badger.Txn) error {
  235. var err error
  236. key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue,
  237. fmt.Sprintf("%s_%d", t.queueName, taskPriority)))
  238. if err != nil {
  239. return err
  240. }
  241. b, err := t.taskPriorityMapList[taskPriority].ToJSON()
  242. if err != nil {
  243. return err
  244. }
  245. e := badger.NewEntry(key, b)
  246. err = tx.SetEntry(e)
  247. if err != nil {
  248. return err
  249. }
  250. return nil
  251. })
  252. if err != nil {
  253. return err
  254. }
  255. return nil
  256. }
  257. // isExist 是否已经存在
  258. func (t *TaskQueue) isExist(jobID string) bool {
  259. _, bok := t.taskKeyMap.Get(jobID)
  260. return bok
  261. }
  262. func (t *TaskQueue) isEmpty() bool {
  263. return t.taskKeyMap.Empty()
  264. }
  265. const taskPriorityCount = 10