瀏覽代碼

正在大改

Signed-off-by: allan716 <[email protected]>
allan716 3 年之前
父節點
當前提交
26f8e27e25

+ 1 - 0
.gitignore

@@ -61,3 +61,4 @@ TestData/
 /.idea
 /internal/backend/Logs
 /internal/backend
+/internal/pkg/task_control/Logs/*.log

+ 43 - 158
TestCode/test_timeout.go

@@ -1,181 +1,66 @@
 package TestCode
 
 import (
-	"github.com/allanpk716/ChineseSubFinder/internal/pkg/random_useragent"
-	"github.com/allanpk716/ChineseSubFinder/internal/pkg/rod_helper"
-	"github.com/go-rod/rod/lib/proto"
-	"github.com/panjf2000/ants/v2"
+	"fmt"
 	"golang.org/x/net/context"
+	"runtime"
 	"sync"
 	"time"
 )
 
-func DownloadTest() error {
+func hardWork(job interface{}) error {
+	index := job.(int)
+	println("Index:", index, "Start")
+	time.Sleep(time.Second * 10)
+	println("Index:", index, "End")
+	return nil
+}
+
+func requestWork(ctx context.Context, job interface{}) error {
+	ctx, cancel := context.WithTimeout(ctx, time.Second*2)
+	defer cancel()
 
-	testFunc := func(i interface{}) error {
-		inData := i.(InputData)
+	done := make(chan error, 1)
+	panicChan := make(chan interface{}, 1)
+	go func() {
 		defer func() {
-			println(inData.Index, "testFunc done.")
+			if p := recover(); p != nil {
+				panicChan <- p
+			}
 		}()
-		println(inData.Index, "start...")
-
-		err2 := oneStep(inData)
-		if err2 != nil {
-			return err2
-		}
 
-		return nil
+		done <- hardWork(job)
+	}()
 
-		//return goStep(inData)
+	select {
+	case err := <-done:
+		return err
+	case p := <-panicChan:
+		panic(p)
+	case <-ctx.Done():
+		return ctx.Err()
 	}
+}
 
-	antPool, err := ants.NewPoolWithFunc(2, func(inData interface{}) {
-		data := inData.(InputData)
-		defer data.Wg.Done()
-		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
-		defer cancel()
-		data.Ctx = ctx
-		done := make(chan error, 1)
-		panicChan := make(chan interface{}, 1)
-		go func() {
+func Process() {
+	const total = 10
+	var wg sync.WaitGroup
+	wg.Add(total)
+	now := time.Now()
+	for i := 0; i < total; i++ {
+		go func(index int) {
 			defer func() {
 				if p := recover(); p != nil {
-					panicChan <- p
+					fmt.Println("oops, panic")
 				}
 			}()
 
-			done <- testFunc(data)
-		}()
-
-		select {
-		case err := <-done:
-			if err != nil {
-				println("done with Error", err.Error())
-			}
-			return
-		case p := <-panicChan:
-			println("got panic", p)
-		case <-ctx.Done():
-			println("got time out", ctx.Err())
-			return
-		}
-	})
-	if err != nil {
-		return err
-	}
-	defer antPool.Release()
-	wg := sync.WaitGroup{}
-
-	for i := 0; i < 10; i++ {
-		wg.Add(1)
-		err = antPool.Invoke(InputData{Index: i, Wg: &wg})
-		if err != nil {
-			println("antPool.Invoke", err)
-		}
+			defer wg.Done()
+			requestWork(context.Background(), index)
+		}(i)
 	}
 	wg.Wait()
-
-	println("All Done.")
-
-	return nil
-}
-
-func goStep(inData InputData) error {
-	outDataChan := make(chan int)
-	for i := 0; i < 2; i++ {
-		go func(cxt context.Context, in int) {
-
-			var outData int
-			outData = -1
-			defer func() {
-				println(inData.Index, in, "go func done")
-				outDataChan <- outData
-			}()
-
-			browser, err := rod_helper.NewBrowser("", true)
-			if err != nil {
-				println(inData.Index, in, "rod_helper.NewBrowser", err)
-				return
-			}
-			defer func() {
-				_ = browser.Close()
-				println(inData.Index, in, "browser closed")
-			}()
-			ontTime := false
-
-			for {
-				select {
-				case <-cxt.Done():
-					return
-				default:
-					if ontTime == true {
-						return
-					}
-					ontTime = true
-
-					page, err := rod_helper.NewPageNavigate(browser, "https://www.baidu.com", 5*time.Second, 5)
-					if err != nil {
-						println("NewPageNavigate time out", err)
-						return
-					}
-					page.MustSetUserAgent(&proto.NetworkSetUserAgentOverride{
-						UserAgent: random_useragent.RandomUserAgent(true),
-					})
-					err = page.WaitLoad()
-					time.Sleep(10 * time.Second)
-
-					outData = in
-				}
-			}
-
-		}(inData.Ctx, i)
-	}
-
-	countResult := 0
-	for {
-		select {
-		case <-inData.Ctx.Done():
-			// 超时退出
-			return nil
-		case v, ok := <-outDataChan:
-			if ok == true {
-				println(inData.Index, "outData ok", v)
-			} else {
-				println(inData.Index, "outData not ok", v)
-			}
-			countResult++
-			// 跳出,收到够反馈了
-			if countResult == 2 {
-				return nil
-			}
-		}
-	}
-}
-
-func oneStep(inData InputData) error {
-	browser, err := rod_helper.NewBrowser("", true)
-	if err != nil {
-		println(inData.Index, "rod_helper.NewBrowser", err)
-		return err
-	}
-	defer func() {
-		_ = browser.Close()
-		println(inData.Index, "browser closed")
-	}()
-	page, err := rod_helper.NewPageNavigate(browser, "https://www.baidu.com", 10*time.Second, 5)
-	if err != nil {
-		return err
-	}
-	page.MustSetUserAgent(&proto.NetworkSetUserAgentOverride{
-		UserAgent: random_useragent.RandomUserAgent(true),
-	})
-	err = page.WaitLoad()
-	time.Sleep(10 * time.Second)
-	return nil
-}
-
-type InputData struct {
-	Ctx   context.Context
-	Index int
-	Wg    *sync.WaitGroup
+	fmt.Println("elapsed:", time.Since(now))
+	time.Sleep(time.Second * 20)
+	fmt.Println("number of goroutines:", runtime.NumGoroutine())
 }

+ 2 - 8
TestCode/test_timeout_test.go

@@ -2,13 +2,7 @@ package TestCode
 
 import "testing"
 
-// 无需关注这个测试用例,这个方案暂时弃用
-func TestDownloadTest(t *testing.T) {
+func TestProcess(t *testing.T) {
 
-	//err := DownloadTest()
-	//if err != nil {
-	//	t.Fatal(err)
-	//}
-	//
-	//select {}
+	Process()
 }

+ 0 - 6
internal/downloader.go

@@ -661,9 +661,3 @@ func (d Downloader) writeSubFile2VideoPath(videoFileFullPath string, finalSubFil
 
 	return nil
 }
-
-type InputData struct {
-	OneVideoFullPath string
-	Index            int
-	Wg               *sync.WaitGroup
-}

+ 115 - 0
internal/pkg/task_control/task_control.go

@@ -0,0 +1,115 @@
+package task_control
+
+import (
+	"github.com/panjf2000/ants/v2"
+	"github.com/sirupsen/logrus"
+	"golang.org/x/net/context"
+	"sync"
+	"time"
+)
+
+type TaskControl struct {
+	pollName            string
+	antPoolBase         *ants.PoolWithFunc
+	wgBase              sync.WaitGroup
+	log                 *logrus.Logger
+	oneCtxTimeOutSecond int
+	bHold               bool
+	ctxFunc             func(ctx context.Context, inData interface{}) error
+}
+
+func NewTaskControl(pollName string, size int, oneCtxTimeOutSecond int, log *logrus.Logger) (*TaskControl, error) {
+
+	var err error
+	tc := TaskControl{}
+	tc.pollName = pollName
+	tc.oneCtxTimeOutSecond = oneCtxTimeOutSecond
+	tc.log = log
+	tc.antPoolBase, err = ants.NewPoolWithFunc(size, func(inData interface{}) {
+		tc.baseFuncHandler(inData)
+	})
+	if err != nil {
+		return nil, err
+	}
+	tc.wgBase = sync.WaitGroup{}
+	return &tc, nil
+}
+
+// SetCtxProcessFunc 设置后续需要用到的单个任务的 Func,注意,如果之前的任务没有完成,不应该再次调用函数。建议进行 Release 后,再次调用
+func (tc *TaskControl) SetCtxProcessFunc(pf func(ctx context.Context, inData interface{}) error) {
+	tc.ctxFunc = pf
+}
+
+// Invoke 向 SetCtxProcessFunc 设置的 Func 中提交数据处理
+func (tc *TaskControl) Invoke(inData InputData) error {
+	tc.wgBase.Add(1)
+	inData.Wg = &tc.wgBase
+	tc.log.Debugln("Index:", inData.Index, "Invoke wg.Add()")
+	err := tc.antPoolBase.Invoke(inData)
+	if err != nil {
+		// 如果这个执行有问题,那么就把 wg 的计数器减一
+		tc.log.Debugln("Index:", inData.Index, "Invoke Error wg.Done()")
+		tc.wgBase.Done()
+	}
+
+	return err
+}
+
+func (tc *TaskControl) baseFuncHandler(inData interface{}) {
+	data := inData.(InputData)
+	defer func() {
+		tc.log.Debugln("Index:", data.Index, "baseFuncHandler wg.Done()")
+		data.Wg.Done()
+	}()
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(tc.oneCtxTimeOutSecond)*time.Second)
+	defer cancel()
+
+	done := make(chan error, 1)
+	panicChan := make(chan interface{}, 1)
+	go func(ctx context.Context) {
+		defer func() {
+			if p := recover(); p != nil {
+				panicChan <- p
+			}
+		}()
+
+		done <- tc.ctxFunc(ctx, inData)
+	}(ctx)
+
+	select {
+	case err := <-done:
+		if err != nil {
+			tc.log.Errorln(tc.pollName, "Index:", data.Index, "NewPoolWithFunc done with Error", err.Error())
+		}
+		return
+	case p := <-panicChan:
+		tc.log.Errorln(tc.pollName, "Index:", data.Index, "NewPoolWithFunc got panic", p)
+		return
+	case <-ctx.Done():
+		tc.log.Errorln(tc.pollName, "Index:", data.Index, "NewPoolWithFunc got time out", ctx.Err())
+		return
+	}
+}
+
+// Hold 自身进行阻塞,如果你是使用 Web 服务器,那么应该无需使用该方法
+func (tc *TaskControl) Hold() {
+	tc.bHold = true
+	tc.wgBase.Add(1)
+	tc.log.Debugln("Hold wg.Add()")
+	tc.wgBase.Wait()
+}
+
+func (tc *TaskControl) Release() {
+	if tc.bHold == true {
+		tc.log.Debugln("Release wg.Done()")
+		tc.wgBase.Done()
+	}
+	tc.log.Debugln("Release.Release")
+	tc.antPoolBase.Release()
+}
+
+type InputData struct {
+	OneVideoFullPath string
+	Index            int
+	Wg               *sync.WaitGroup
+}

+ 140 - 0
internal/pkg/task_control/task_control_test.go

@@ -0,0 +1,140 @@
+package task_control
+
+import (
+	"fmt"
+	"github.com/allanpk716/ChineseSubFinder/internal/pkg/log_helper"
+	"golang.org/x/net/context"
+	"testing"
+	"time"
+)
+
+func TestNewTaskControl(t *testing.T) {
+
+	// 不超时的情况
+	err := process(TimeTester{
+		ConcurrentCount:  2,
+		JobCount:         5,
+		TimeAfterRelease: 5,
+		OneJobWaitTime:   2,
+		OneJobTimeOut:    5,
+		SelfHold:         true,
+		DontRelease:      false,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = process(TimeTester{
+		ConcurrentCount:  2,
+		JobCount:         5,
+		TimeAfterRelease: 5,
+		OneJobWaitTime:   2,
+		OneJobTimeOut:    5,
+		SelfHold:         false,
+		DontRelease:      true,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = process(TimeTester{
+		ConcurrentCount:  2,
+		JobCount:         5,
+		TimeAfterRelease: 5,
+		OneJobWaitTime:   2,
+		OneJobTimeOut:    5,
+		SelfHold:         false,
+		DontRelease:      false,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+}
+
+func process(timeTester TimeTester) error {
+
+	OneJobWaitTime = timeTester.OneJobWaitTime
+
+	tc, err := NewTaskControl("TestPool", timeTester.ConcurrentCount, timeTester.OneJobTimeOut, log_helper.GetLogger())
+	if err != nil {
+		return err
+	}
+	tc.SetCtxProcessFunc(waitTimes)
+
+	for i := 0; i < timeTester.JobCount; i++ {
+		go func(index int) {
+			if index > 1 {
+				time.Sleep(10 * time.Second)
+			}
+			err := tc.Invoke(InputData{Index: index})
+			if err != nil {
+				fmt.Println("Index:", index, "Error", err)
+			}
+		}(i)
+	}
+
+	go func() {
+		if timeTester.DontRelease == true {
+			fmt.Println("dont Release")
+			return
+		}
+		fmt.Println("Release After 2 Second")
+		time.Sleep(time.Duration(timeTester.TimeAfterRelease) * time.Second)
+		tc.Release()
+	}()
+
+	fmt.Println("-------------------------------")
+	if timeTester.SelfHold == true {
+
+		fmt.Println("Start Hold")
+		tc.Hold()
+		fmt.Println("End Hold")
+	} else {
+
+		waitTime := timeTester.JobCount * timeTester.OneJobWaitTime
+		fmt.Printf("wait %ds start\n", waitTime)
+		time.Sleep(time.Duration(waitTime) * time.Second)
+		fmt.Printf("wait %ds end\n", waitTime)
+	}
+	fmt.Println("-------------------------------")
+
+	return nil
+}
+
+func waitTimes(ctx context.Context, inData interface{}) error {
+
+	phase0 := make(chan interface{}, 1)
+
+	index := inData.(InputData)
+
+	fmt.Println("Index:", index.Index, "Start 0")
+	time.Sleep(time.Duration(OneJobWaitTime) * time.Second)
+	fmt.Println("Index:", index.Index, "End 0")
+
+	phase0 <- 1
+	select {
+	case <-ctx.Done():
+		{
+			fmt.Println("Index:", index.Index, "timeout 0")
+			return nil
+		}
+	case <-phase0:
+		break
+	}
+
+	fmt.Println("Index:", index.Index, "Start 1")
+	fmt.Println("Index:", index.Index, "End 1")
+
+	return nil
+}
+
+type TimeTester struct {
+	ConcurrentCount  int  // 并发数
+	JobCount         int  // 总任务数
+	TimeAfterRelease int  // 开始后等待多久执行 Release 操作
+	OneJobWaitTime   int  // 单个任务得耗时
+	OneJobTimeOut    int  // 单个任务的超时时间
+	SelfHold         bool // 是否需要自身的等待,如果使用了,那么一定需要 Release
+	DontRelease      bool // 是否需要主动执行 Release
+}
+
+var OneJobWaitTime int