Browse Source

实现 web 控制任务队列功能的接口

Signed-off-by: allan716 <[email protected]>
allan716 3 years ago
parent
commit
24cf914866

+ 138 - 0
internal/backend/controllers/v1/jobs_things.go

@@ -0,0 +1,138 @@
+package v1
+
+import (
+	"bufio"
+	"errors"
+	"fmt"
+	"github.com/allanpk716/ChineseSubFinder/internal/pkg/global_value"
+	"github.com/allanpk716/ChineseSubFinder/internal/pkg/my_util"
+	task_queue2 "github.com/allanpk716/ChineseSubFinder/internal/pkg/task_queue"
+	"github.com/allanpk716/ChineseSubFinder/internal/types/backend"
+	"github.com/allanpk716/ChineseSubFinder/internal/types/common"
+	"github.com/allanpk716/ChineseSubFinder/internal/types/task_queue"
+	"github.com/gin-gonic/gin"
+	"io"
+	"net/http"
+	"os"
+	"path/filepath"
+)
+
+func (cb ControllerBase) JobsListHandler(c *gin.Context) {
+	var err error
+	defer func() {
+		// 统一的异常处理
+		cb.ErrorProcess(c, "JobsListHandler", err)
+	}()
+
+	bok, allJobs, err := cb.cronHelper.DownloadQueue.GetAllJobs()
+	if err != nil {
+		return
+	}
+
+	if bok == false {
+		c.JSON(http.StatusOK, backend.ReplyAllJobs{
+			AllJobs: make([]task_queue.OneJob, 0),
+		})
+		return
+	}
+
+	c.JSON(http.StatusOK, backend.ReplyAllJobs{
+		AllJobs: allJobs,
+	})
+}
+
+func (cb ControllerBase) ChangeJobStatusHandler(c *gin.Context) {
+	var err error
+	defer func() {
+		// 统一的异常处理
+		cb.ErrorProcess(c, "JobsListHandler", err)
+	}()
+
+	desJobStatus := backend.ReqChangeJobStatus{}
+	err = c.ShouldBindJSON(&desJobStatus)
+	if err != nil {
+		return
+	}
+
+	bok, nowOneJob := cb.cronHelper.DownloadQueue.GetOneJobByID(desJobStatus.Id)
+	if bok == false {
+		err = errors.New("")
+		return
+	}
+
+	if bok == false {
+		c.JSON(http.StatusOK, backend.ReplyCommon{Message: "job not found"})
+		return
+	}
+
+	if desJobStatus.TaskPriority == "high" {
+		// high
+		nowOneJob.TaskPriority = task_queue2.HighTaskPriorityLevel
+	} else if desJobStatus.TaskPriority == "mddile" {
+		// middle
+		nowOneJob.TaskPriority = task_queue2.DefaultTaskPriorityLevel
+	} else {
+		// low
+		nowOneJob.TaskPriority = task_queue2.LowTaskPriorityLevel
+	}
+	nowOneJob.JobStatus = task_queue.Waiting
+
+	bok, err = cb.cronHelper.DownloadQueue.Update(nowOneJob)
+	if err != nil {
+		return
+	}
+
+	if bok == false {
+		c.JSON(http.StatusOK, backend.ReplyCommon{Message: "update job status failed"})
+		return
+	}
+
+	c.JSON(http.StatusOK, backend.ReplyCommon{Message: "ok"})
+}
+
+func (cb ControllerBase) JobLogHandler(c *gin.Context) {
+	var err error
+	defer func() {
+		// 统一的异常处理
+		cb.ErrorProcess(c, "JobLogHandler", err)
+	}()
+
+	reqJobLog := backend.ReqJobLog{}
+	err = c.ShouldBindJSON(&reqJobLog)
+	if err != nil {
+		return
+	}
+
+	pathRoot := filepath.Join(global_value.ConfigRootDirFPath(), "Logs")
+	fileFPath := filepath.Join(pathRoot, common.OnceLogPrefix+reqJobLog.Id+".log")
+	if my_util.IsFile(fileFPath) == true {
+		// 存在
+		// 一行一行的读取文件
+		var fi *os.File
+		fi, err = os.Open(fileFPath)
+		if err != nil {
+			fmt.Printf("Error: %s\n", err)
+			return
+		}
+		defer fi.Close()
+
+		ReplyJobLog := backend.ReplyJobLog{}
+		ReplyJobLog.OneLine = make([]string, 0)
+		br := bufio.NewReader(fi)
+		for {
+			a, _, c := br.ReadLine()
+			if c == io.EOF {
+				break
+			}
+			ReplyJobLog.OneLine = append(ReplyJobLog.OneLine, string(a))
+		}
+
+		c.JSON(http.StatusOK, ReplyJobLog)
+	} else {
+		// 不存在
+		c.JSON(http.StatusOK, backend.ReplyCommon{Message: "job log not found"})
+		return
+	}
+
+	c.JSON(http.StatusOK, backend.ReplyCommon{Message: "ok"})
+}

+ 4 - 0
internal/backend/routers/base_router.go

@@ -44,5 +44,9 @@ func InitRouter(fileDownloader *file_downloader.FileDownloader, router *gin.Engi
 		GroupV1.POST("/daemon/start", cbV1.DaemonStartHandler)
 		GroupV1.POST("/daemon/stop", cbV1.DaemonStopHandler)
 		GroupV1.GET("/daemon/status", cbV1.DaemonStatusHandler)
+
+		GroupV1.GET("/jobs/list", cbV1.JobsListHandler)
+		GroupV1.POST("/jobs/change-job-status", cbV1.ChangeJobStatusHandler)
+		GroupV1.POST("/jobs/log", cbV1.JobLogHandler)
 	}
 }

+ 4 - 4
internal/logic/cron_helper/cron_helper.go

@@ -20,7 +20,7 @@ type CronHelper struct {
 	cronHelperRunning             bool                                                     // 这个是定时器启动的状态,它为true,不代表核心函数在执行
 	scanPlayedVideoSubInfo        *scan_played_video_subinfo.ScanPlayedVideoSubInfo        // 扫描已经播放过的视频的字幕信息
 	fileDownloader                *file_downloader.FileDownloader                          // 文件下载器
-	downloadQueue                 *task_queue.TaskQueue                                    // 需要下载的视频的队列
+	DownloadQueue                 *task_queue.TaskQueue                                    // 需要下载的视频的队列
 	downloader                    *downloader.Downloader                                   // 下载者线程
 	videoScanAndRefreshHelper     *video_scan_and_refresh_helper.VideoScanAndRefreshHelper // 视频扫描和刷新的帮助类
 	cronLock                      sync.Mutex                                               // 锁
@@ -40,7 +40,7 @@ func NewCronHelper(fileDownloader *file_downloader.FileDownloader) *CronHelper {
 		log:            fileDownloader.Log,
 		settings:       fileDownloader.Settings,
 		// 实例化下载队列
-		downloadQueue: task_queue.NewTaskQueue(fileDownloader.CacheCenter),
+		DownloadQueue: task_queue.NewTaskQueue(fileDownloader.CacheCenter),
 	}
 
 	var err error
@@ -52,7 +52,7 @@ func NewCronHelper(fileDownloader *file_downloader.FileDownloader) *CronHelper {
 	// 字幕扫描器
 	ch.videoScanAndRefreshHelper = video_scan_and_refresh_helper.NewVideoScanAndRefreshHelper(
 		ch.fileDownloader,
-		ch.downloadQueue)
+		ch.DownloadQueue)
 
 	return &ch
 }
@@ -76,7 +76,7 @@ func (ch *CronHelper) Start(runImmediately bool) {
 	// 初始化下载者,里面的两个 func 需要使用定时器启动 SupplierCheck QueueDownloader
 	ch.downloader = downloader.NewDownloader(
 		sub_formatter.GetSubFormatter(ch.log, ch.settings.AdvancedSettings.SubNameFormatter),
-		ch.fileDownloader, ch.downloadQueue)
+		ch.fileDownloader, ch.DownloadQueue)
 	// ----------------------------------------------
 	// 判断扫描任务的时间间隔是否符合要求,不符合则重写默认值
 	_, err := cron.ParseStandard(ch.settings.CommonSettings.ScanInterval)

+ 98 - 0
internal/pkg/task_queue/get.go

@@ -130,3 +130,101 @@ func (t *TaskQueue) GetOneDoneJob() (bool, task_queue.OneJob, error) {
 
 	return false, tOneJob, nil
 }
+
+func (t *TaskQueue) GetJobsByStatus(status task_queue.JobStatus) (bool, []task_queue.OneJob, error) {
+
+	defer t.queueLock.Unlock()
+	t.queueLock.Lock()
+
+	outOneJobs := make([]task_queue.OneJob, 0)
+	// 如果队列里面没有东西,则返回 false
+	if t.isEmpty() == true {
+		return false, nil, nil
+	}
+
+	for TaskPriority := 0; TaskPriority <= taskPriorityCount; TaskPriority++ {
+
+		t.taskPriorityMapList[TaskPriority].Each(func(key interface{}, value interface{}) {
+
+			tOneJob := task_queue.OneJob{}
+			tOneJob = value.(task_queue.OneJob)
+			if tOneJob.JobStatus == status {
+				// 找到加入列表
+				outOneJobs = append(outOneJobs, tOneJob)
+			}
+		})
+	}
+
+	return true, outOneJobs, nil
+}
+
+// GetJobsByPriorityAndStatus 根据任务优先级和状态获取任务列表
+func (t *TaskQueue) GetJobsByPriorityAndStatus(taskPriority int, status task_queue.JobStatus) (bool, []task_queue.OneJob, error) {
+
+	defer t.queueLock.Unlock()
+	t.queueLock.Lock()
+
+	outOneJobs := make([]task_queue.OneJob, 0)
+	// 如果队列里面没有东西,则返回 false
+	if t.isEmpty() == true {
+		return false, nil, nil
+	}
+
+	t.taskPriorityMapList[taskPriority].Each(func(key interface{}, value interface{}) {
+
+		tOneJob := task_queue.OneJob{}
+		tOneJob = value.(task_queue.OneJob)
+		if tOneJob.JobStatus == status {
+			// 找到加入列表
+			outOneJobs = append(outOneJobs, tOneJob)
+		}
+	})
+
+	return true, outOneJobs, nil
+}
+
+func (t *TaskQueue) GetAllJobs() (bool, []task_queue.OneJob, error) {
+
+	defer t.queueLock.Unlock()
+	t.queueLock.Lock()
+
+	outOneJobs := make([]task_queue.OneJob, 0)
+	// 如果队列里面没有东西,则返回 false
+	if t.isEmpty() == true {
+		return false, nil, nil
+	}
+
+	for TaskPriority := 0; TaskPriority <= taskPriorityCount; TaskPriority++ {
+
+		t.taskPriorityMapList[TaskPriority].Each(func(key interface{}, value interface{}) {
+
+			tOneJob := task_queue.OneJob{}
+			tOneJob = value.(task_queue.OneJob)
+			// 找到加入列表
+			outOneJobs = append(outOneJobs, tOneJob)
+		})
+	}
+
+	return true, outOneJobs, nil
+}
+
+func (t *TaskQueue) GetOneJobByID(jobId string) (bool, task_queue.OneJob) {
+
+	defer t.queueLock.Unlock()
+	t.queueLock.Lock()
+
+	outOneJob := task_queue.OneJob{}
+
+	taskPriority, bok := t.taskKeyMap.Get(jobId)
+	if bok == false {
+		return false, outOneJob
+	}
+	// 删除连续剧的 tree.Map 里面的 tree.Set 的元素
+	needDelJobObj, bok := t.taskPriorityMapList[taskPriority.(int)].Get(jobId)
+	if bok == false {
+		return false, outOneJob
+	}
+	outOneJob = needDelJobObj.(task_queue.OneJob)
+
+	return true, outOneJob
+}

+ 2 - 53
internal/pkg/task_queue/task_queue.go

@@ -25,7 +25,7 @@ type TaskQueue struct {
 	log                 *logrus.Logger            // 日志
 	center              *cache_center.CacheCenter // 缓存中心
 	taskPriorityMapList []*treemap.Map            // 这里有 0-10 个优先级划分的存储 List,每Add一个数据的时候需要切换到这个 List 中去 save
-	taskKeyMap          *treemap.Map              // 以每个任务的唯一 JobID 来存储每个 Job,这样可以快速查询
+	taskKeyMap          *treemap.Map              // 以每个任务的唯一 JobID 来存储每个 Job 的 优先级在哪里,这样可以快速查询
 	taskGroupBySeries   *treemap.Map              // 以每个任务的 SeriesRootPath 来存储每个任务,然后内层是一个 treeset,后续可以遍历删除即可
 	queueLock           sync.Mutex                // 公用这个锁
 }
@@ -243,58 +243,6 @@ func (t *TaskQueue) AutoDetectUpdateJobStatus(oneJob task_queue.OneJob, inErr er
 	}
 }
 
-func (t *TaskQueue) GetJobsByStatus(status task_queue.JobStatus) (bool, []task_queue.OneJob, error) {
-
-	defer t.queueLock.Unlock()
-	t.queueLock.Lock()
-
-	outOneJobs := make([]task_queue.OneJob, 0)
-	// 如果队列里面没有东西,则返回 false
-	if t.isEmpty() == true {
-		return false, nil, nil
-	}
-
-	for TaskPriority := 0; TaskPriority <= taskPriorityCount; TaskPriority++ {
-
-		t.taskPriorityMapList[TaskPriority].Each(func(key interface{}, value interface{}) {
-
-			tOneJob := task_queue.OneJob{}
-			tOneJob = value.(task_queue.OneJob)
-			if tOneJob.JobStatus == status {
-				// 找到加入列表
-				outOneJobs = append(outOneJobs, tOneJob)
-			}
-		})
-	}
-
-	return true, outOneJobs, nil
-}
-
-// GetJobsByPriorityAndStatus 根据任务优先级和状态获取任务列表
-func (t *TaskQueue) GetJobsByPriorityAndStatus(taskPriority int, status task_queue.JobStatus) (bool, []task_queue.OneJob, error) {
-
-	defer t.queueLock.Unlock()
-	t.queueLock.Lock()
-
-	outOneJobs := make([]task_queue.OneJob, 0)
-	// 如果队列里面没有东西,则返回 false
-	if t.isEmpty() == true {
-		return false, nil, nil
-	}
-
-	t.taskPriorityMapList[taskPriority].Each(func(key interface{}, value interface{}) {
-
-		tOneJob := task_queue.OneJob{}
-		tOneJob = value.(task_queue.OneJob)
-		if tOneJob.JobStatus == status {
-			// 找到加入列表
-			outOneJobs = append(outOneJobs, tOneJob)
-		}
-	})
-
-	return true, outOneJobs, nil
-}
-
 func (t *TaskQueue) del(jobId string) (bool, error) {
 	if t.isExist(jobId) == false {
 		return false, nil
@@ -467,6 +415,7 @@ const (
 	HighTaskPriorityLevel       = 3
 	DefaultTaskPriorityLevel    = 5
 	FirstRetryTaskPriorityLevel = 6
+	LowTaskPriorityLevel        = 7
 )
 
 var (

+ 1 - 0
internal/pkg/task_queue/update.go

@@ -0,0 +1 @@
+package task_queue

+ 7 - 0
internal/types/backend/reply_all_jobs.go

@@ -0,0 +1,7 @@
+package backend
+
+import "github.com/allanpk716/ChineseSubFinder/internal/types/task_queue"
+
+type ReplyAllJobs struct {
+	AllJobs []task_queue.OneJob `json:"all_jobs"`
+}

+ 5 - 0
internal/types/backend/reply_job_log.go

@@ -0,0 +1,5 @@
+package backend
+
+type ReplyJobLog struct {
+	OneLine []string `json:"one_line"`
+}

+ 6 - 0
internal/types/backend/req_change_job_status.go

@@ -0,0 +1,6 @@
+package backend
+
+type ReqChangeJobStatus struct {
+	Id           string `json:"id"`                           // 任务的唯一 ID
+	TaskPriority string `json:"task_priority" default:"high"` // 任务的优先级,high or middle or low priority
+}

+ 5 - 0
internal/types/backend/req_job_log.go

@@ -0,0 +1,5 @@
+package backend
+
+type ReqJobLog struct {
+	Id string `json:"id"` // 任务的唯一 ID
+}