Browse Source

修复 channel 关闭的细节问题

Signed-off-by: 716 <[email protected]>
716 3 years ago
parent
commit
613d70fa7b

+ 3 - 1
internal/backend/backend.go

@@ -40,7 +40,9 @@ func StartBackEnd(fileDownloader *file_downloader.FileDownloader, httpPort int,
 
 	hub := ws_helper.NewHub()
 	go hub.Run()
-
+	defer func() {
+		hub.Clear()
+	}()
 	engine.GET("/ws", func(context *gin.Context) {
 		ws_helper.ServeWs(fileDownloader.Log, hub, context.Writer, context.Request)
 	})

+ 6 - 0
internal/backend/ws_helper/hub.go

@@ -50,3 +50,9 @@ func (h *Hub) Run() {
 		}
 	}
 }
+
+func (h *Hub) Clear() {
+	// close channel
+	close(h.broadcast)
+	close(h.register)
+}

+ 3 - 4
internal/logic/emby_helper/embyhelper.go

@@ -622,15 +622,14 @@ func (em *EmbyHelper) getMoreVideoInfoList(videoIdList []string, isMovieOrSeries
 
 		done := make(chan OutData, 1)
 		panicChan := make(chan interface{}, 1)
-		defer func() {
-			close(done)
-			close(panicChan)
-		}()
+
 		go func() {
 			defer func() {
 				if p := recover(); p != nil {
 					panicChan <- p
 				}
+				close(done)
+				close(panicChan)
 			}()
 
 			info, err := queryFuncByMatchPath(data.Id)

+ 4 - 2
internal/logic/scan_played_video_subinfo/scan_played_video_subinfo.go

@@ -92,6 +92,7 @@ func (s *ScanPlayedVideoSubInfo) Cancel() {
 	s.canceledLock.Unlock()
 
 	s.taskControl.Release()
+	s.taskControl.Reboot()
 }
 
 func (s *ScanPlayedVideoSubInfo) GetPlayedItemsSubtitle() (bool, error) {
@@ -245,6 +246,9 @@ func (s *ScanPlayedVideoSubInfo) scan(ctx context.Context, inData interface{}) e
 		index++
 		stage := make(chan interface{}, 1)
 		go func() {
+			defer func() {
+				close(stage)
+			}()
 			s.dealOneVideo(index, videoFPath, orgSubFPath, videoTypes, shareRootDir, scanInputData.IsMovie, imdbInfoCache)
 			stage <- 1
 		}()
@@ -252,11 +256,9 @@ func (s *ScanPlayedVideoSubInfo) scan(ctx context.Context, inData interface{}) e
 		select {
 		case <-ctx.Done():
 			{
-				close(stage)
 				return errors.New(fmt.Sprintf("cancel at scan: %s", videoFPath))
 			}
 		case <-stage:
-			close(stage)
 			break
 		}
 	}

+ 19 - 20
internal/pkg/downloader/downloader.go

@@ -104,16 +104,14 @@ func (d *Downloader) SupplierCheck() {
 	// 接收内部任务的 panic
 	panicChan := make(chan interface{}, 1)
 
-	defer func() {
-		close(done)
-		close(panicChan)
-	}()
-
 	go func() {
 		defer func() {
 			if p := recover(); p != nil {
 				panicChan <- p
 			}
+
+			close(done)
+			close(panicChan)
 		}()
 		// 下载前的初始化
 		d.log.Infoln("PreDownloadProcess.Init().Check().Wait()...")
@@ -212,16 +210,13 @@ func (d *Downloader) QueueDownloader() {
 	// 接收内部任务的 panic
 	panicChan := make(chan interface{}, 1)
 
-	defer func() {
-		close(done)
-		close(panicChan)
-	}()
-
 	go func() {
 		defer func() {
 			if p := recover(); p != nil {
 				panicChan <- p
 			}
+			close(done)
+			close(panicChan)
 			// 没下载完毕一次,进行一次缓存和 Chrome 的清理
 			err = my_folder.ClearRootTmpFolder()
 			if err != nil {
@@ -315,16 +310,6 @@ func (d *Downloader) movieDlFunc(ctx context.Context, job taskQueue2.OneJob, dow
 
 func (d *Downloader) seriesDlFunc(ctx context.Context, job taskQueue2.OneJob, downloadIndex int64) error {
 
-	// 创建一个 chan 用于任务的中断和超时
-	done := make(chan interface{}, 1)
-	// 接收内部任务的 panic
-	panicChan := make(chan interface{}, 1)
-
-	defer func() {
-		close(done)
-		close(panicChan)
-	}()
-
 	nowSubSupplierHub := d.subSupplierHub
 	if nowSubSupplierHub.Suppliers == nil || len(nowSubSupplierHub.Suppliers) < 1 {
 		d.log.Infoln("Wait SupplierCheck Update *subSupplierHub, movieDlFunc Skip this time")
@@ -370,11 +355,19 @@ func (d *Downloader) seriesDlFunc(ctx context.Context, job taskQueue2.OneJob, do
 	subVideoCount := 0
 	for epsKey, episodeInfo := range seriesInfo.NeedDlEpsKeyList {
 
+		// 创建一个 chan 用于任务的中断和超时
+		done := make(chan interface{}, 1)
+		// 接收内部任务的 panic
+		panicChan := make(chan interface{}, 1)
+
 		go func() {
 			defer func() {
 				if p := recover(); p != nil {
 					panicChan <- p
 				}
+
+				close(done)
+				close(panicChan)
 			}()
 			// 匹配对应的 Eps 去处理
 			done <- d.oneVideoSelectBestSub(episodeInfo.FileFullPath, organizeSubFiles[epsKey])
@@ -414,11 +407,17 @@ func (d *Downloader) seriesDlFunc(ctx context.Context, job taskQueue2.OneJob, do
 			continue
 		}
 
+		// 创建一个 chan 用于任务的中断和超时
+		done := make(chan interface{}, 1)
+		// 接收内部任务的 panic
+		panicChan := make(chan interface{}, 1)
 		go func() {
 			defer func() {
 				if p := recover(); p != nil {
 					panicChan <- p
 				}
+				close(done)
+				close(panicChan)
 			}()
 			// 匹配对应的 Eps 去处理
 			seasonEpsKey := my_util.GetEpisodeKeyName(episodeInfo.Season, episodeInfo.Episode)

+ 3 - 5
internal/pkg/emby_api/emby_api.go

@@ -60,16 +60,14 @@ func (em EmbyApi) RefreshRecentlyVideoInfo() error {
 		done := make(chan error, 1)
 		panicChan := make(chan interface{}, 1)
 
-		defer func() {
-			close(done)
-			close(panicChan)
-		}()
-
 		go func() {
 			defer func() {
 				if p := recover(); p != nil {
 					panicChan <- p
 				}
+
+				close(done)
+				close(panicChan)
 			}()
 
 			done <- updateFunc(data.Id)

+ 2 - 0
internal/pkg/sub_timeline_fixer/fixer.go

@@ -775,6 +775,8 @@ package sub_timeline_fixer
 //				if p := recover(); p != nil {
 //					panicChan <- p
 //				}
+//close(done)
+//close(panicChan)
 //			}()
 //
 //			done <- fixFunc(inData)

+ 15 - 6
internal/pkg/task_control/task_control.go

@@ -45,6 +45,7 @@ func NewTaskControl(size int, log *logrus.Logger) (*TaskControl, error) {
 		return nil, err
 	}
 	tc.wgBase = sync.WaitGroup{}
+
 	return &tc, nil
 }
 
@@ -127,17 +128,13 @@ func (tc *TaskControl) baseFuncHandler(inData interface{}) {
 
 	done := make(chan error, 1)
 	panicChan := make(chan interface{}, 1)
-
-	defer func() {
-		close(done)
-		close(panicChan)
-	}()
-
 	go func(ctx context.Context) {
 		defer func() {
 			if p := recover(); p != nil {
 				panicChan <- p
 			}
+			close(done)
+			close(panicChan)
 		}()
 
 		done <- tc.ctxFunc(ctx, inData)
@@ -205,6 +202,8 @@ func (tc *TaskControl) Release() {
 
 func (tc *TaskControl) Reboot() {
 
+	tc.log.Debugln("-------------------------------")
+	tc.log.Debugln("Reboot Start")
 	var release bool
 	tc.commonLock.Lock()
 	release = tc.released
@@ -230,6 +229,16 @@ func (tc *TaskControl) Reboot() {
 		tc.released = false
 		tc.commonLock.Unlock()
 	}
+
+	tc.log.Debugln("Reboot End")
+	tc.log.Debugln("-------------------------------")
+}
+
+func (tc *TaskControl) Close() {
+	tc.log.Debugln("-------------------------------")
+	tc.log.Debugln("Close Start")
+	tc.log.Debugln("Close End")
+	tc.log.Debugln("-------------------------------")
 }
 
 // GetExecuteInfo 获取 所有 Invoke 的执行情况,需要在 下一次 Invoke 拿走,否则会清空

+ 1 - 0
internal/pkg/video_scan_and_refresh_helper/video_scan_and_refresh_helper.go

@@ -84,6 +84,7 @@ func (v *VideoScanAndRefreshHelper) Start() error {
 
 func (v VideoScanAndRefreshHelper) Cancel() {
 	v.taskControl.Release()
+	v.taskControl.Reboot()
 }
 
 // ReadSpeFile 优先级最高。读取特殊文件,启用一些特殊的功能,比如 forced_scan_and_down_sub