Просмотр исходного кода

正在调试,修复bug

Signed-off-by: 716 <[email protected]>
716 3 лет назад
Родитель
Сommit
67b2dee477

+ 7 - 8
internal/logic/cron_helper/cron_helper.go

@@ -1,7 +1,6 @@
 package cron_helper
 
 import (
-	"github.com/allanpk716/ChineseSubFinder/internal/logic/pre_job"
 	"github.com/allanpk716/ChineseSubFinder/internal/logic/task_queue"
 	"github.com/allanpk716/ChineseSubFinder/internal/pkg/common"
 	"github.com/allanpk716/ChineseSubFinder/internal/pkg/downloader"
@@ -63,15 +62,15 @@ func (ch *CronHelper) Start(runImmediately bool) {
 		ch.sets, ch.log, ch.downloadQueue)
 	// ----------------------------------------------
 	// 前置的任务,热修复、字幕修改文件名格式、提前下载好浏览器
-	pj := pre_job.NewPreJob(ch.sets, ch.log)
-	err := pj.HotFix().ChangeSubNameFormat().ReloadBrowser().Wait()
-	if err != nil {
-		ch.log.Panicln("pre_job", err)
-		return
-	}
+	//pj := pre_job.NewPreJob(ch.sets, ch.log)
+	//err := pj.HotFix().ChangeSubNameFormat().ReloadBrowser().Wait()
+	//if err != nil {
+	//	ch.log.Panicln("pre_job", err)
+	//	return
+	//}
 	// ----------------------------------------------
 	// 判断扫描任务的时间间隔是否符合要求,不符合则重写默认值
-	_, err = cron.ParseStandard(ch.sets.CommonSettings.ScanInterval)
+	_, err := cron.ParseStandard(ch.sets.CommonSettings.ScanInterval)
 	if err != nil {
 		ch.log.Warningln("CommonSettings.ScanInterval format error, after v0.25.x , need reset this at WebUI")
 		// 如果解析错误了,就需要重新赋值默认值过来,然后保存

+ 23 - 44
internal/logic/series_helper/seriesHelper.go

@@ -167,52 +167,31 @@ func DownloadSubtitleInAllSiteByOneSeries(Suppliers []ifaces.ISupplier, seriesIn
 	for key := range seriesInfo.NeedDlEpsKeyList {
 		log_helper.GetLogger().Infoln(common.QueueName, i, seriesInfo.Name, "-", key)
 	}
-	// TODO 资源占用较高,把这里的并发给取消
-	// 同时进行查询
-	//subInfosChannel := make(chan []supplier.SubInfo)
-	//for _, oneSupplier := range Suppliers {
-	//	nowSupplier := oneSupplier
-	//	go func() {
-	//		var subInfos []supplier.SubInfo
-	//		defer func() {
-	//			subInfosChannel <- subInfos
-	//			log_helper.GetLogger().Infoln(common.QueueName, i, nowSupplier.GetSupplierName(), "End...")
-	//		}()
-	//
-	//		log_helper.GetLogger().Infoln(common.QueueName, i, nowSupplier.GetSupplierName(), "Start...")
-	//		// 一次性把这一部连续剧的所有字幕下载完
-	//		subInfos, err := nowSupplier.GetSubListFromFile4Series(seriesInfo)
-	//		if err != nil {
-	//			log_helper.GetLogger().Errorln(common.QueueName, i, nowSupplier.GetSupplierName(), "GetSubListFromFile4Series", err)
-	//		}
-	//		// 把后缀名给改好
-	//		sub_helper.ChangeVideoExt2SubExt(subInfos)
-	//	}()
-	//}
-	//for i := 0; i < len(Suppliers); i++ {
-	//	v, ok := <-subInfosChannel
-	//	if ok == true && v != nil {
-	//		outSUbInfos = append(outSUbInfos, v...)
-	//	}
-	//}
-
-	for _, oneSupplier := range Suppliers {
-
-		var subInfos []supplier.SubInfo
-		log_helper.GetLogger().Infoln(common.QueueName, i, oneSupplier.GetSupplierName(), "Start...")
-		// 一次性把这一部连续剧的所有字幕下载完
-		subInfos, err := oneSupplier.GetSubListFromFile4Series(seriesInfo)
-		if err != nil {
-			log_helper.GetLogger().Errorln(common.QueueName, i, oneSupplier.GetSupplierName(), "GetSubListFromFile4Series", err)
-			log_helper.GetLogger().Infoln(common.QueueName, i, oneSupplier.GetSupplierName(), "End")
-			continue
-		}
-		// 把后缀名给改好
-		sub_helper.ChangeVideoExt2SubExt(subInfos)
 
-		outSUbInfos = append(outSUbInfos, subInfos...)
+	for index, oneSupplier := range Suppliers {
+
+		oneSupplierFunc := func() {
+			defer func() {
+				log_helper.GetLogger().Infoln(common.QueueName, index, i, oneSupplier.GetSupplierName(), "End")
+				log_helper.GetLogger().Infoln("------------------------------------------")
+			}()
+
+			var subInfos []supplier.SubInfo
+			log_helper.GetLogger().Infoln("------------------------------------------")
+			log_helper.GetLogger().Infoln(common.QueueName, index, i, oneSupplier.GetSupplierName(), "Start...")
+			// 一次性把这一部连续剧的所有字幕下载完
+			subInfos, err := oneSupplier.GetSubListFromFile4Series(seriesInfo)
+			if err != nil {
+				log_helper.GetLogger().Errorln(common.QueueName, index, i, oneSupplier.GetSupplierName(), "GetSubListFromFile4Series", err)
+				return
+			}
+			// 把后缀名给改好
+			sub_helper.ChangeVideoExt2SubExt(subInfos)
+
+			outSUbInfos = append(outSUbInfos, subInfos...)
+		}
 
-		log_helper.GetLogger().Infoln(common.QueueName, i, oneSupplier.GetSupplierName(), "End")
+		oneSupplierFunc()
 	}
 
 	return outSUbInfos

+ 21 - 8
internal/logic/task_queue/task_queue.go

@@ -122,11 +122,8 @@ func (t *TaskQueue) Add(oneJob task_queue.OneJob) (bool, error) {
 	return true, nil
 }
 
-// Update 更新素,不存在则会失败
-func (t *TaskQueue) Update(oneJob task_queue.OneJob) (bool, error) {
-
-	defer t.queueLock.Unlock()
-	t.queueLock.Lock()
+// update 更新素,不存在则会失败,内部用,没有锁
+func (t *TaskQueue) update(oneJob task_queue.OneJob) (bool, error) {
 
 	if t.isExist(oneJob.Id) == false {
 		return false, nil
@@ -160,6 +157,15 @@ func (t *TaskQueue) Update(oneJob task_queue.OneJob) (bool, error) {
 	return true, nil
 }
 
+// Update 更新素,不存在则会失败
+func (t *TaskQueue) Update(oneJob task_queue.OneJob) (bool, error) {
+
+	defer t.queueLock.Unlock()
+	t.queueLock.Lock()
+
+	return t.update(oneJob)
+}
+
 // AutoDetectUpdateJobStatus 根据任务的生命周期图,进行自动判断更新,见《任务的生命周期》流程图
 func (t *TaskQueue) AutoDetectUpdateJobStatus(oneJob task_queue.OneJob, inErr error) {
 
@@ -172,8 +178,9 @@ func (t *TaskQueue) AutoDetectUpdateJobStatus(oneJob task_queue.OneJob, inErr er
 	if inErr == nil {
 		// 没有错误就是完成
 		oneJob.JobStatus = taskQueue2.Done
+		oneJob.DownloadTimes += 1
 	} else {
-		// 超过了时间限制,默认是 90 天, A.Before(B) : A < B
+		// 超过了时间限制,默认是 90 天, A.Before(B) : A < B == true
 		if oneJob.AddedTime.AddDate(0, 0, t.settings.AdvancedSettings.TaskQueue.ExpirationTime).Before(time.Now()) == true {
 			// 超过 90 天了
 			oneJob.JobStatus = taskQueue2.Failed
@@ -197,9 +204,11 @@ func (t *TaskQueue) AutoDetectUpdateJobStatus(oneJob task_queue.OneJob, inErr er
 		}
 		// 传入的错误需要放进来
 		oneJob.ErrorInfo = inErr.Error()
+		oneJob.DownloadTimes += 1
 	}
 
-	bok, err := t.Update(oneJob)
+	// 这里不要用错了,要用无锁的,不然会阻塞
+	bok, err := t.update(oneJob)
 	if err != nil {
 		t.log.Errorln("AutoDetectUpdateJobStatus", oneJob.VideoFPath, err)
 		return
@@ -228,7 +237,11 @@ func (t *TaskQueue) GetOneWaitingJob() (bool, task_queue.OneJob, error) {
 		t.taskPriorityMapList[TaskPriority].Each(func(key interface{}, value interface{}) {
 
 			tOneJob = value.(task_queue.OneJob)
-			if tOneJob.JobStatus == task_queue.Waiting {
+			// 任务的 UpdateTime 与现在的时间大于单个字幕下载的间隔
+			// 默认是 12h, A.After(B) : A > B == true
+			// 见《任务队列设计》--以优先级顺序取出描述
+			if tOneJob.JobStatus == task_queue.Waiting && (tOneJob.DownloadTimes == 0 ||
+				tOneJob.UpdateTime.AddDate(0, 0, t.settings.AdvancedSettings.TaskQueue.OneSubDownloadInterval).After(time.Now()) == true && tOneJob.DownloadTimes > 0) {
 				// 找到就返回
 				found = true
 				return

+ 34 - 56
internal/pkg/downloader/downloader.go

@@ -5,9 +5,9 @@ import (
 	"fmt"
 	"github.com/allanpk716/ChineseSubFinder/internal/ifaces"
 	markSystem "github.com/allanpk716/ChineseSubFinder/internal/logic/mark_system"
-	"github.com/allanpk716/ChineseSubFinder/internal/logic/pre_download_process"
 	"github.com/allanpk716/ChineseSubFinder/internal/logic/series_helper"
 	subSupplier "github.com/allanpk716/ChineseSubFinder/internal/logic/sub_supplier"
+	"github.com/allanpk716/ChineseSubFinder/internal/logic/sub_supplier/xunlei"
 	"github.com/allanpk716/ChineseSubFinder/internal/logic/sub_timeline_fixer"
 	"github.com/allanpk716/ChineseSubFinder/internal/logic/task_queue"
 	"github.com/allanpk716/ChineseSubFinder/internal/pkg/my_folder"
@@ -37,7 +37,8 @@ type Downloader struct {
 	subTimelineFixerHelperEx *sub_timeline_fixer.SubTimelineFixerHelperEx // 字幕时间轴校正
 	downloaderLock           sync.Mutex                                   // 取消执行 task control 的 Lock
 	downloadQueue            *task_queue.TaskQueue                        // 需要下载的视频的队列
-	supplierChecking         bool                                         // 正在检测字幕源有效性
+	//supplierChecking         bool                                         // 正在检测字幕源有效性
+	//queueDownloading         bool                                         // 正在下载个视频的字幕
 }
 
 func NewDownloader(inSubFormatter ifaces.ISubFormatter, _settings *settings.Settings, log *logrus.Logger, downloadQueue *task_queue.TaskQueue) *Downloader {
@@ -80,28 +81,13 @@ func (d *Downloader) SupplierCheck() {
 		if p := recover(); p != nil {
 			d.log.Errorln("Downloader.SupplierCheck() panic")
 		}
-
-		d.downloaderLock.Lock()
-		d.supplierChecking = false
 		d.downloaderLock.Unlock()
 
 		d.log.Infoln("Download.SupplierCheck() End")
 	}()
 
-	d.log.Infoln("Download.SupplierCheck() Start ...")
-
-	supplierChecking := false
-	d.downloaderLock.Lock()
-	supplierChecking = d.supplierChecking
-	d.downloaderLock.Unlock()
-	if supplierChecking == true {
-		d.log.Warningln("Download.SupplierCheck() only run once, skip")
-		return
-	}
-
 	d.downloaderLock.Lock()
-	d.supplierChecking = true
-	d.downloaderLock.Unlock()
+	d.log.Infoln("Download.SupplierCheck() Start ...")
 
 	// 创建一个 chan 用于任务的中断和超时
 	done := make(chan interface{}, 1)
@@ -112,19 +98,27 @@ func (d *Downloader) SupplierCheck() {
 			panicChan <- p
 		}
 		// 下载前的初始化
-		d.log.Infoln("PreDownloadProcess.Init().Check().Wait()...")
-		preDownloadProcess := pre_download_process.NewPreDownloadProcess(d.log, d.settings)
-		err := preDownloadProcess.Init().Check().Wait()
-		if err != nil {
-			done <- errors.New(fmt.Sprintf("NewPreDownloadProcess Error: %v", err))
-		} else {
-			// 更新 SubSupplierHub 实例
-			d.downloaderLock.Lock()
-			d.subSupplierHub = preDownloadProcess.SubSupplierHub
-			d.downloaderLock.Unlock()
-
-			done <- nil
-		}
+		//d.log.Infoln("PreDownloadProcess.Init().Check().Wait()...")
+		//preDownloadProcess := pre_download_process.NewPreDownloadProcess(d.log, d.settings)
+		//err := preDownloadProcess.Init().Check().Wait()
+		//if err != nil {
+		//	done <- errors.New(fmt.Sprintf("NewPreDownloadProcess Error: %v", err))
+		//} else {
+		//	// 更新 SubSupplierHub 实例
+		//	d.downloaderLock.Lock()
+		//	d.subSupplierHub = preDownloadProcess.SubSupplierHub
+		//	d.downloaderLock.Unlock()
+		//
+		//	done <- nil
+		//}
+
+		subSupplierHub := subSupplier.NewSubSupplierHub(
+			d.settings,
+			d.log,
+			xunlei.NewSupplier(d.settings, d.log),
+		)
+		d.subSupplierHub = subSupplierHub
+		done <- nil
 	}()
 
 	select {
@@ -150,24 +144,15 @@ func (d *Downloader) QueueDownloader() {
 		if p := recover(); p != nil {
 			d.log.Errorln("Downloader.QueueDownloader() panic")
 		}
-
+		d.downloaderLock.Unlock()
 		d.log.Infoln("Download.QueueDownloader() End")
 	}()
 
-	var downloadCounter int64
-	downloadCounter = 0
-
+	d.downloaderLock.Lock()
 	d.log.Infoln("Download.QueueDownloader() Start ...")
 
-	// 如果正在 check supplier 的状态,那么就跳过本次
-	supplierChecking := false
-	d.downloaderLock.Lock()
-	supplierChecking = d.supplierChecking
-	d.downloaderLock.Unlock()
-	if supplierChecking == true {
-		d.log.Infoln("SupplierCheck is running, Skip QueueDownloader this time")
-		return
-	}
+	var downloadCounter int64
+	downloadCounter = 0
 	// 从队列取数据出来
 	bok, oneJob, err := d.downloadQueue.GetOneWaitingJob()
 	if err != nil {
@@ -239,11 +224,8 @@ func (d *Downloader) Cancel() {
 
 func (d *Downloader) movieDlFunc(ctx context.Context, job taskQueue2.OneJob, downloadIndex int64) error {
 
-	var nowSubSupplierHub *subSupplier.SubSupplierHub
-	d.downloaderLock.Lock()
-	nowSubSupplierHub = d.subSupplierHub
-	d.downloaderLock.Unlock()
-	if nowSubSupplierHub == nil {
+	nowSubSupplierHub := d.subSupplierHub
+	if nowSubSupplierHub.Suppliers == nil || len(nowSubSupplierHub.Suppliers) < 1 {
 		d.log.Infoln("Wait SupplierCheck Update *subSupplierHub, movieDlFunc Skip this time")
 		return nil
 	}
@@ -274,15 +256,11 @@ func (d *Downloader) movieDlFunc(ctx context.Context, job taskQueue2.OneJob, dow
 
 func (d *Downloader) seriesDlFunc(ctx context.Context, job taskQueue2.OneJob, downloadIndex int64) error {
 
-	var nowSubSupplierHub *subSupplier.SubSupplierHub
-	d.downloaderLock.Lock()
-	nowSubSupplierHub = d.subSupplierHub
-	d.downloaderLock.Unlock()
-	if nowSubSupplierHub == nil {
-		d.log.Infoln("Wait SupplierCheck Update *subSupplierHub, seriesDlFunc Skip this time")
+	nowSubSupplierHub := d.subSupplierHub
+	if nowSubSupplierHub.Suppliers == nil || len(nowSubSupplierHub.Suppliers) < 1 {
+		d.log.Infoln("Wait SupplierCheck Update *subSupplierHub, movieDlFunc Skip this time")
 		return nil
 	}
-
 	var err error
 	// 这里拿到了这一部连续剧的所有的剧集信息,以及所有下载到的字幕信息
 	seriesInfo, err := series_helper.ReadSeriesInfoFromDir(job.SeriesRootDirPath, false, d.settings.AdvancedSettings.ProxySettings)

+ 6 - 5
internal/pkg/settings/task_queue.go

@@ -1,12 +1,13 @@
 package settings
 
 type TaskQueue struct {
-	MaxRetryTimes  int `json:"max_retry_times" default:"3"`    // 单个任务失败后,最大重试次数,超过后会降一级
-	OneJobTimeOut  int `json:"one_job_time_out" default:"300"` // 单个任务的超时时间 5 * 60 s
-	Interval       int `json:"interval" default:"10"`          // 任务的间隔,单位 s,这里会有一个限制,不允许太快,然后会做一定的随机时间范围,当前值 x ~ 2*x 之内随机
-	ExpirationTime int `json:"expiration_time"  default:"90"`  // 添加任务后,过期的时间(单位 day),超过后,任务会降级到 Low
+	MaxRetryTimes          int `json:"max_retry_times" default:"3"`            // 单个任务失败后,最大重试次数,超过后会降一级
+	OneJobTimeOut          int `json:"one_job_time_out" default:"300"`         // 单个任务的超时时间 5 * 60 s
+	Interval               int `json:"interval" default:"10"`                  // 任务的间隔,单位 s,这里会有一个限制,不允许太快,然后会做一定的随机时间范围,当前值 x ~ 2*x 之内随机
+	ExpirationTime         int `json:"expiration_time"  default:"90"`          // 添加任务后,过期的时间(单位 day),超过后,任务会降级到 Low
+	OneSubDownloadInterval int `json:"one_sub_download_interval" default:"12"` // 一个字幕下载的间隔(单位 h),不然老是一个循环。对比的基准是 OneJob 的 UpdateTime
 }
 
 func NewTaskQueue() *TaskQueue {
-	return &TaskQueue{MaxRetryTimes: 3, Interval: 10, ExpirationTime: 90, OneJobTimeOut: 300}
+	return &TaskQueue{MaxRetryTimes: 3, Interval: 10, ExpirationTime: 90, OneJobTimeOut: 300, OneSubDownloadInterval: 12}
 }

+ 1 - 0
internal/types/task_queue/task_queue.go

@@ -26,6 +26,7 @@ type OneJob struct {
 	UpdateTime               time.Time        `json:"update_time"`                  // 任务更新的时间
 	MediaServerInsideVideoID string           `json:"media_server_inside_video_id"` // 媒体服务器中,这个视频的 ID,如果是 Emby 就对应它内部这个视频的 ID,后续用于指定刷新视频信息
 	ErrorInfo                string           `json:"error_info"`                   // 这个任务的错误信息
+	DownloadTimes            int              `json:"download_times"`               // 下载的次数,用于统计下载过几次
 }
 
 func NewOneJob(videoType common.VideoType, videoFPath string, taskPriority int, MediaServerInsideVideoID ...string) *OneJob {