|
@@ -1,28 +1,34 @@
|
|
|
package task_queue
|
|
|
|
|
|
import (
|
|
|
- "encoding/json"
|
|
|
+ "fmt"
|
|
|
"github.com/allanpk716/ChineseSubFinder/internal/pkg/settings"
|
|
|
"github.com/allanpk716/ChineseSubFinder/internal/types/task_queue"
|
|
|
"github.com/dgraph-io/badger/v3"
|
|
|
- dll "github.com/emirpasic/gods/lists/doublylinkedlist"
|
|
|
+ "github.com/emirpasic/gods/maps/treemap"
|
|
|
"github.com/sirupsen/logrus"
|
|
|
"sync"
|
|
|
+ "time"
|
|
|
)
|
|
|
|
|
|
type TaskQueue struct {
|
|
|
- queueName string
|
|
|
- settings *settings.Settings
|
|
|
- log *logrus.Logger
|
|
|
- doubleList *dll.List
|
|
|
- doubleListLock sync.Mutex
|
|
|
- lockList bool
|
|
|
- lockListLock sync.Mutex
|
|
|
+ queueName string
|
|
|
+ settings *settings.Settings
|
|
|
+ log *logrus.Logger
|
|
|
+ taskPriorityMapList []*treemap.Map
|
|
|
+ taskKeyMap *treemap.Map
|
|
|
+ queueLock sync.Mutex // 公用这个锁
|
|
|
}
|
|
|
|
|
|
func NewTaskQueue(queueName string, settings *settings.Settings, log *logrus.Logger) *TaskQueue {
|
|
|
|
|
|
- tq := &TaskQueue{queueName: queueName, settings: settings, log: log, doubleList: dll.New()}
|
|
|
+ tq := &TaskQueue{queueName: queueName, settings: settings, log: log,
|
|
|
+ taskPriorityMapList: make([]*treemap.Map, 0),
|
|
|
+ taskKeyMap: treemap.NewWithStringComparator(),
|
|
|
+ }
|
|
|
+ for i := 0; i <= taskPriorityCount; i++ {
|
|
|
+ tq.taskPriorityMapList = append(tq.taskPriorityMapList, treemap.NewWithStringComparator())
|
|
|
+ }
|
|
|
tq.read()
|
|
|
return tq
|
|
|
}
|
|
@@ -33,18 +39,20 @@ func (t *TaskQueue) QueueName() string {
|
|
|
|
|
|
func (t *TaskQueue) Clear() error {
|
|
|
|
|
|
- defer t.doubleListLock.Unlock()
|
|
|
- t.doubleListLock.Lock()
|
|
|
-
|
|
|
- t.doubleList.Clear()
|
|
|
+ defer t.queueLock.Unlock()
|
|
|
+ t.queueLock.Lock()
|
|
|
|
|
|
err := GetDb().Update(
|
|
|
func(tx *badger.Txn) error {
|
|
|
var err error
|
|
|
- key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue, t.queueName))
|
|
|
- // 因为已经查询了一次,确保一定存在,所以直接更新+1,TTL 多加 5s 确保今天过去,暂时去除 TTL uint32(restOfDaySecond.Seconds())+5
|
|
|
- if err = tx.Delete(key); err != nil {
|
|
|
- return err
|
|
|
+
|
|
|
+ for i := 0; i <= taskPriorityCount; i++ {
|
|
|
+ key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue,
|
|
|
+ fmt.Sprintf("%s_%d", t.queueName, i)))
|
|
|
+ // 因为已经查询了一次,确保一定存在,所以直接更新+1,TTL 多加 5s 确保今天过去,暂时去除 TTL uint32(restOfDaySecond.Seconds())+5
|
|
|
+ if err = tx.Delete(key); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
}
|
|
|
return nil
|
|
|
})
|
|
@@ -52,107 +60,73 @@ func (t *TaskQueue) Clear() error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-// isEmpty 这个没有锁,所以需要在 Sync 中使用,不对外开放
|
|
|
-func (t *TaskQueue) isEmpty() bool {
|
|
|
-
|
|
|
- if t.doubleList.Size() > 0 {
|
|
|
- return false
|
|
|
+ for i := 0; i <= taskPriorityCount; i++ {
|
|
|
+ t.taskPriorityMapList[i].Clear()
|
|
|
}
|
|
|
|
|
|
- return true
|
|
|
-}
|
|
|
-
|
|
|
-// IsEmpty 是否队列为空,对外暴露,有锁
|
|
|
-func (t *TaskQueue) IsEmpty() bool {
|
|
|
- defer t.doubleListLock.Unlock()
|
|
|
- t.doubleListLock.Lock()
|
|
|
+ t.taskKeyMap.Clear()
|
|
|
|
|
|
- return t.isEmpty()
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
+// Size 队列的长度,对外暴露,有锁
|
|
|
func (t *TaskQueue) Size() int {
|
|
|
- defer t.doubleListLock.Unlock()
|
|
|
- t.doubleListLock.Lock()
|
|
|
+ defer t.queueLock.Unlock()
|
|
|
+ t.queueLock.Lock()
|
|
|
|
|
|
- return t.doubleList.Size()
|
|
|
+ return t.taskKeyMap.Size()
|
|
|
}
|
|
|
|
|
|
-func (t *TaskQueue) read() {
|
|
|
+// Add 放入元素,放入的时候会根据 TaskPriority 进行归类,存在的不会新增和更新
|
|
|
+func (t *TaskQueue) Add(oneJob task_queue.OneJob) (bool, error) {
|
|
|
|
|
|
- err := GetDb().View(
|
|
|
- func(tx *badger.Txn) error {
|
|
|
- var err error
|
|
|
+ defer t.queueLock.Unlock()
|
|
|
+ t.queueLock.Lock()
|
|
|
|
|
|
- key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue, t.queueName))
|
|
|
- e, err := tx.Get(key)
|
|
|
- if err != nil {
|
|
|
-
|
|
|
- if IsErrOk(err) == true {
|
|
|
- return nil
|
|
|
- }
|
|
|
-
|
|
|
- return err
|
|
|
- }
|
|
|
- valCopy, err := e.ValueCopy(nil)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- err = json.Unmarshal(valCopy, t.doubleList)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
- return nil
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- t.log.Panicln(err)
|
|
|
+ if t.isExist(oneJob.Id) == true {
|
|
|
+ return false, nil
|
|
|
}
|
|
|
-}
|
|
|
-
|
|
|
-// save 需要把改变的数据保持到 K/V 数据库中,这个没有锁,所以需要在 Sync 中使用,不对外开放
|
|
|
-func (t *TaskQueue) save() error {
|
|
|
-
|
|
|
- err := GetDb().Update(
|
|
|
- func(tx *badger.Txn) error {
|
|
|
- var err error
|
|
|
- key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue, t.queueName))
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- b, err := json.Marshal(t.doubleList)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- e := badger.NewEntry(key, b)
|
|
|
- err = tx.SetEntry(e)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- return nil
|
|
|
- })
|
|
|
+ // 插入到统一的 KeyMap
|
|
|
+ t.taskKeyMap.Put(oneJob.Id, oneJob.TaskPriority)
|
|
|
+ // 分配到具体的优先级 map 中
|
|
|
+ t.taskPriorityMapList[oneJob.TaskPriority].Put(oneJob.Id, oneJob)
|
|
|
+ err := t.save(oneJob.TaskPriority)
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return false, err
|
|
|
}
|
|
|
|
|
|
- return nil
|
|
|
+ return true, nil
|
|
|
}
|
|
|
|
|
|
-// RPush 向右边放入元素
|
|
|
-func (t *TaskQueue) RPush(oneJob task_queue.OneJob) (bool, error) {
|
|
|
+// Update 更新素,不存在则会失败
|
|
|
+func (t *TaskQueue) Update(oneJob task_queue.OneJob) (bool, error) {
|
|
|
+
|
|
|
+ defer t.queueLock.Unlock()
|
|
|
+ t.queueLock.Lock()
|
|
|
|
|
|
- if t.isLockList() == true {
|
|
|
+ if t.isExist(oneJob.Id) == false {
|
|
|
return false, nil
|
|
|
}
|
|
|
-
|
|
|
- defer t.doubleListLock.Unlock()
|
|
|
- t.doubleListLock.Lock()
|
|
|
-
|
|
|
- t.doubleList.Add(oneJob)
|
|
|
-
|
|
|
- err := t.save()
|
|
|
+ // 自动更新时间
|
|
|
+ oneJob.UpdateTime = time.Now()
|
|
|
+
|
|
|
+ // 这里需要判断是否有优先级的 Update,如果有就需要把之前缓存的表给更新
|
|
|
+ // 然后再插入到新的表中
|
|
|
+ taskPriorityIndex, _ := t.taskKeyMap.Get(oneJob.Id)
|
|
|
+ if oneJob.TaskPriority != taskPriorityIndex {
|
|
|
+ // 优先级修改
|
|
|
+ // 先删除原有的优先级
|
|
|
+ t.taskPriorityMapList[taskPriorityIndex.(int)].Remove(oneJob.Id)
|
|
|
+ err := t.save(taskPriorityIndex.(int))
|
|
|
+ if err != nil {
|
|
|
+ return false, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 插入到统一的 KeyMap
|
|
|
+ t.taskKeyMap.Put(oneJob.Id, oneJob.TaskPriority)
|
|
|
+ // 分配到具体的优先级 map 中
|
|
|
+ t.taskPriorityMapList[oneJob.TaskPriority].Put(oneJob.Id, oneJob)
|
|
|
+ err := t.save(oneJob.TaskPriority)
|
|
|
if err != nil {
|
|
|
return false, err
|
|
|
}
|
|
@@ -160,155 +134,189 @@ func (t *TaskQueue) RPush(oneJob task_queue.OneJob) (bool, error) {
|
|
|
return true, nil
|
|
|
}
|
|
|
|
|
|
-// LPush 向左边放入元素
|
|
|
-func (t *TaskQueue) LPush(oneJob task_queue.OneJob) (bool, error) {
|
|
|
+// GetOneWaiting 获取一个元素,按优先级,0 - taskPriorityCount 的级别去拿去任务,不会移除任务
|
|
|
+func (t *TaskQueue) GetOneWaiting() (bool, task_queue.OneJob, error) {
|
|
|
|
|
|
- if t.isLockList() == true {
|
|
|
- return false, nil
|
|
|
+ defer t.queueLock.Unlock()
|
|
|
+ t.queueLock.Lock()
|
|
|
+
|
|
|
+ // 如果队列里面没有东西,则返回 false
|
|
|
+ if t.isEmpty() == true {
|
|
|
+ return false, task_queue.OneJob{}, nil
|
|
|
}
|
|
|
|
|
|
- defer t.doubleListLock.Unlock()
|
|
|
- t.doubleListLock.Lock()
|
|
|
+ found := false
|
|
|
+ tOneJob := task_queue.OneJob{}
|
|
|
+ for TaskPriority := 0; TaskPriority <= taskPriorityCount; TaskPriority++ {
|
|
|
|
|
|
- t.doubleList.Add(0, oneJob)
|
|
|
+ t.taskPriorityMapList[TaskPriority].Each(func(key interface{}, value interface{}) {
|
|
|
|
|
|
- err := t.save()
|
|
|
- if err != nil {
|
|
|
- return false, err
|
|
|
+ tOneJob = value.(task_queue.OneJob)
|
|
|
+ if tOneJob.JobStatus == task_queue.Waiting {
|
|
|
+ // 找到就返回
|
|
|
+ found = true
|
|
|
+ return
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ if found == true {
|
|
|
+ break
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- return true, nil
|
|
|
+ return true, tOneJob, nil
|
|
|
}
|
|
|
|
|
|
-// RPop 从右边取出第一个元素,并移除
|
|
|
-func (t *TaskQueue) RPop() (bool, task_queue.OneJob, error) {
|
|
|
-
|
|
|
- if t.isLockList() == true {
|
|
|
- return false, task_queue.OneJob{}, nil
|
|
|
- }
|
|
|
-
|
|
|
- defer t.doubleListLock.Unlock()
|
|
|
- t.doubleListLock.Lock()
|
|
|
+func (t *TaskQueue) Get(status task_queue.JobStatus) (bool, []task_queue.OneJob, error) {
|
|
|
|
|
|
+ outOneJobs := make([]task_queue.OneJob, 0)
|
|
|
// 如果队列里面没有东西,则返回 false
|
|
|
if t.isEmpty() == true {
|
|
|
- return false, task_queue.OneJob{}, nil
|
|
|
+ return false, nil, nil
|
|
|
}
|
|
|
|
|
|
- rightLastOneIndex := t.doubleList.Size() - 1
|
|
|
- value, bok := t.doubleList.Get(rightLastOneIndex)
|
|
|
- if bok == false {
|
|
|
- return false, task_queue.OneJob{}, nil
|
|
|
- }
|
|
|
- // 移除最后一个元素
|
|
|
- t.doubleList.Remove(rightLastOneIndex)
|
|
|
+ for TaskPriority := 0; TaskPriority <= taskPriorityCount; TaskPriority++ {
|
|
|
|
|
|
- err := t.save()
|
|
|
- if err != nil {
|
|
|
- return false, task_queue.OneJob{}, err
|
|
|
+ t.taskPriorityMapList[TaskPriority].Each(func(key interface{}, value interface{}) {
|
|
|
+
|
|
|
+ tOneJob := task_queue.OneJob{}
|
|
|
+ tOneJob = value.(task_queue.OneJob)
|
|
|
+ if tOneJob.JobStatus == status {
|
|
|
+ // 找到加入列表
|
|
|
+ outOneJobs = append(outOneJobs, tOneJob)
|
|
|
+ }
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
- return true, value.(task_queue.OneJob), nil
|
|
|
+ return true, outOneJobs, nil
|
|
|
}
|
|
|
|
|
|
-// RPeek 获取右边取出第一个元素,不移除
|
|
|
-func (t *TaskQueue) RPeek() (bool, task_queue.OneJob) {
|
|
|
-
|
|
|
- defer t.doubleListLock.Unlock()
|
|
|
- t.doubleListLock.Lock()
|
|
|
+func (t *TaskQueue) GetTaskPriority(taskPriority int, status task_queue.JobStatus) (bool, []task_queue.OneJob, error) {
|
|
|
|
|
|
+ outOneJobs := make([]task_queue.OneJob, 0)
|
|
|
// 如果队列里面没有东西,则返回 false
|
|
|
if t.isEmpty() == true {
|
|
|
- return false, task_queue.OneJob{}
|
|
|
+ return false, nil, nil
|
|
|
}
|
|
|
|
|
|
- rightLastOneIndex := t.doubleList.Size() - 1
|
|
|
- value, bok := t.doubleList.Get(rightLastOneIndex)
|
|
|
- if bok == false {
|
|
|
- return false, task_queue.OneJob{}
|
|
|
- }
|
|
|
+ t.taskPriorityMapList[taskPriority].Each(func(key interface{}, value interface{}) {
|
|
|
|
|
|
- return true, value.(task_queue.OneJob)
|
|
|
+ tOneJob := task_queue.OneJob{}
|
|
|
+ tOneJob = value.(task_queue.OneJob)
|
|
|
+ if tOneJob.JobStatus == status {
|
|
|
+ // 找到加入列表
|
|
|
+ outOneJobs = append(outOneJobs, tOneJob)
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ return true, outOneJobs, nil
|
|
|
}
|
|
|
|
|
|
-// LPop 向左边取出第一个元素,并移除
|
|
|
-func (t *TaskQueue) LPop() (bool, task_queue.OneJob, error) {
|
|
|
+// Del 删除一个元素
|
|
|
+func (t *TaskQueue) Del(jobId string) (bool, error) {
|
|
|
|
|
|
- if t.isLockList() == true {
|
|
|
- return false, task_queue.OneJob{}, nil
|
|
|
- }
|
|
|
-
|
|
|
- defer t.doubleListLock.Unlock()
|
|
|
- t.doubleListLock.Lock()
|
|
|
+ defer t.queueLock.Unlock()
|
|
|
+ t.queueLock.Lock()
|
|
|
|
|
|
- // 如果队列里面没有东西,则返回 false
|
|
|
- if t.isEmpty() == true {
|
|
|
- return false, task_queue.OneJob{}, nil
|
|
|
+ if t.isExist(jobId) == false {
|
|
|
+ return false, nil
|
|
|
}
|
|
|
|
|
|
- leftFistOneIndex := 0
|
|
|
- value, bok := t.doubleList.Get(leftFistOneIndex)
|
|
|
+ taskPriority, bok := t.taskKeyMap.Get(jobId)
|
|
|
if bok == false {
|
|
|
- return false, task_queue.OneJob{}, nil
|
|
|
+ return false, nil
|
|
|
}
|
|
|
- // 移除左边第一个元素
|
|
|
- t.doubleList.Remove(leftFistOneIndex)
|
|
|
+ t.taskKeyMap.Remove(jobId)
|
|
|
+ t.taskPriorityMapList[taskPriority.(int)].Remove(jobId)
|
|
|
|
|
|
- err := t.save()
|
|
|
+ err := t.save(taskPriority.(int))
|
|
|
if err != nil {
|
|
|
- return false, task_queue.OneJob{}, err
|
|
|
+ return false, err
|
|
|
}
|
|
|
|
|
|
- return true, value.(task_queue.OneJob), nil
|
|
|
+ return true, nil
|
|
|
}
|
|
|
|
|
|
-// LPeek 向左边获取第一个元素,不移除
|
|
|
-func (t *TaskQueue) LPeek() (bool, task_queue.OneJob) {
|
|
|
+func (t *TaskQueue) read() {
|
|
|
|
|
|
- defer t.doubleListLock.Unlock()
|
|
|
- t.doubleListLock.Lock()
|
|
|
+ err := GetDb().View(
|
|
|
+ func(tx *badger.Txn) error {
|
|
|
+ var err error
|
|
|
+ for i := 0; i <= taskPriorityCount; i++ {
|
|
|
+
|
|
|
+ key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue,
|
|
|
+ fmt.Sprintf("%s_%d", t.queueName, i)))
|
|
|
+ var item *badger.Item
|
|
|
+ item, err = tx.Get(key)
|
|
|
+ if err != nil {
|
|
|
+ if IsErrOk(err) == true {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ valCopy, err := item.ValueCopy(nil)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ err = t.taskPriorityMapList[i].FromJSON(valCopy)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // 如果队列里面没有东西,则返回 false
|
|
|
- if t.isEmpty() == true {
|
|
|
- return false, task_queue.OneJob{}
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ t.log.Panicln(err)
|
|
|
}
|
|
|
-
|
|
|
- leftFistOneIndex := 0
|
|
|
- value, bok := t.doubleList.Get(leftFistOneIndex)
|
|
|
- if bok == false {
|
|
|
- return false, task_queue.OneJob{}
|
|
|
+ // 需要把几个优先级的map中的key汇总
|
|
|
+ for i := 0; i < taskPriorityCount; i++ {
|
|
|
+ t.taskPriorityMapList[i].Each(func(key interface{}, value interface{}) {
|
|
|
+ t.taskKeyMap.Put(key, i)
|
|
|
+ })
|
|
|
}
|
|
|
-
|
|
|
- return true, value.(task_queue.OneJob)
|
|
|
}
|
|
|
|
|
|
-// LockList 锁住 List,这样才能够正确的进行遍历
|
|
|
-func (t *TaskQueue) LockList() {
|
|
|
- defer t.lockListLock.Unlock()
|
|
|
- t.lockListLock.Lock()
|
|
|
+// save 需要把改变的数据保持到 K/V 数据库中,这个没有锁,所以需要在 Sync 中使用,不对外开放
|
|
|
+func (t *TaskQueue) save(taskPriority int) error {
|
|
|
|
|
|
- t.lockList = true
|
|
|
-}
|
|
|
+ err := GetDb().Update(
|
|
|
+ func(tx *badger.Txn) error {
|
|
|
+ var err error
|
|
|
|
|
|
-// UnLockList 解锁 List,就可以正常的 Push 和 Pop
|
|
|
-func (t *TaskQueue) UnLockList() {
|
|
|
- defer t.lockListLock.Unlock()
|
|
|
- t.lockListLock.Lock()
|
|
|
+ key := []byte(MergeBucketAndKeyName(BucketNamePrefixVideoSubDownloadQueue,
|
|
|
+ fmt.Sprintf("%s_%d", t.queueName, taskPriority)))
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
|
|
|
- t.lockList = false
|
|
|
-}
|
|
|
+ b, err := t.taskPriorityMapList[taskPriority].ToJSON()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ e := badger.NewEntry(key, b)
|
|
|
+ err = tx.SetEntry(e)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
|
|
|
-func (t *TaskQueue) isLockList() bool {
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
|
|
|
- bLock := false
|
|
|
- t.lockListLock.Lock()
|
|
|
- bLock = t.lockList
|
|
|
- t.lockListLock.Unlock()
|
|
|
+ return nil
|
|
|
+}
|
|
|
|
|
|
- return bLock
|
|
|
+// isExist 是否已经存在
|
|
|
+func (t *TaskQueue) isExist(jobID string) bool {
|
|
|
+ _, bok := t.taskKeyMap.Get(jobID)
|
|
|
+ return bok
|
|
|
}
|
|
|
|
|
|
-// GetList 使用的时候不要插入数据,否则会有问题
|
|
|
-func (t *TaskQueue) GetList() dll.Iterator {
|
|
|
- return t.doubleList.Iterator()
|
|
|
+func (t *TaskQueue) isEmpty() bool {
|
|
|
+ return t.taskKeyMap.Empty()
|
|
|
}
|
|
|
+
|
|
|
+const taskPriorityCount = 10
|