123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- package task_control
- import (
- "errors"
- "fmt"
- "github.com/allanpk716/ChineseSubFinder/internal/pkg/global_value"
- "github.com/allanpk716/ChineseSubFinder/internal/pkg/log_helper"
- "github.com/sirupsen/logrus"
- "golang.org/x/net/context"
- "sync"
- "testing"
- "time"
- )
- func TestTaskControl_Invoke(t *testing.T) {
- type args struct {
- timeTester TimeTester
- }
- tests := []struct {
- name string
- args args
- successProcessCount int
- wantErr bool
- }{
- // 不超时的情况
- {
- name: "00", args: args{
- TimeTester{PoolName: "00",
- ConcurrentCount: 1,
- JobCount: 5,
- OneJobWaitTime: 1,
- OneJobTimeOut: 2,
- }},
- successProcessCount: 5,
- },
- {
- name: "01", args: args{
- TimeTester{PoolName: "01",
- ConcurrentCount: 2,
- JobCount: 5,
- OneJobWaitTime: 1,
- OneJobTimeOut: 2,
- }},
- successProcessCount: 5,
- },
- {
- name: "02", args: args{
- TimeTester{PoolName: "02",
- ConcurrentCount: 3,
- JobCount: 5,
- OneJobWaitTime: 1,
- OneJobTimeOut: 2,
- }},
- successProcessCount: 5,
- },
- // 超时的情况
- {
- name: "03", args: args{
- TimeTester{PoolName: "03",
- ConcurrentCount: 2,
- JobCount: 5,
- TimeAfterRelease: 5,
- OneJobWaitTime: 2,
- OneJobTimeOut: 1,
- NeedRelease: true}},
- successProcessCount: 0,
- },
- {
- name: "04", args: args{
- TimeTester{PoolName: "04",
- ConcurrentCount: 2,
- JobCount: 5,
- TimeAfterRelease: 5,
- OneJobWaitTime: 2,
- OneJobTimeOut: 1,
- NeedRelease: false}},
- successProcessCount: 0,
- },
- {
- name: "05", args: args{
- TimeTester{PoolName: "05",
- ConcurrentCount: 2,
- JobCount: 5,
- TimeAfterRelease: 5,
- OneJobWaitTime: 2,
- OneJobTimeOut: 1,
- NeedRelease: true}},
- successProcessCount: 0,
- },
- // 主动触发 painic
- {
- name: "06", args: args{
- TimeTester{PoolName: "06",
- ConcurrentCount: 2,
- JobCount: 5,
- TimeAfterRelease: 5,
- OneJobWaitTime: 2,
- OneJobTimeOut: 1,
- NeedRelease: true,
- WantPanic: true}},
- successProcessCount: 0,
- },
- {
- name: "07", args: args{
- TimeTester{PoolName: "07",
- ConcurrentCount: 2,
- JobCount: 5,
- TimeAfterRelease: 5,
- OneJobWaitTime: 2,
- OneJobTimeOut: 1,
- NeedRelease: false,
- WantPanic: true}},
- successProcessCount: 0,
- },
- {
- name: "08", args: args{
- TimeTester{PoolName: "08",
- ConcurrentCount: 2,
- JobCount: 5,
- TimeAfterRelease: 5,
- OneJobWaitTime: 2,
- OneJobTimeOut: 1,
- NeedRelease: true,
- WantPanic: true}},
- successProcessCount: 0,
- },
- // 部分超时
- {
- name: "09", args: args{
- TimeTester{PoolName: "09",
- ConcurrentCount: 2,
- JobCount: 5,
- TimeAfterRelease: 5,
- OneJobWaitTime: 2,
- OneJobTimeOut: 3,
- NeedRelease: true,
- IndexOverThanAddMoreTime: 2}},
- successProcessCount: 3,
- },
- {
- name: "10", args: args{
- TimeTester{PoolName: "10",
- ConcurrentCount: 2,
- JobCount: 5,
- TimeAfterRelease: 5,
- OneJobWaitTime: 2,
- OneJobTimeOut: 3,
- NeedRelease: false,
- IndexOverThanAddMoreTime: 2}},
- successProcessCount: 3,
- },
- {
- name: "11", args: args{
- TimeTester{PoolName: "11",
- ConcurrentCount: 2,
- JobCount: 5,
- TimeAfterRelease: 5,
- OneJobWaitTime: 2,
- OneJobTimeOut: 3,
- NeedRelease: true,
- IndexOverThanAddMoreTime: 3}},
- successProcessCount: 4,
- },
- // 使用 Release 取消
- {
- name: "12", args: args{
- TimeTester{PoolName: "12",
- ConcurrentCount: 1,
- JobCount: 5,
- TimeAfterRelease: 2,
- OneJobWaitTime: 3,
- OneJobTimeOut: 4,
- NeedRelease: true}},
- successProcessCount: 0,
- },
- {
- name: "13", args: args{
- TimeTester{PoolName: "13",
- ConcurrentCount: 2,
- JobCount: 5,
- TimeAfterRelease: 2,
- OneJobWaitTime: 3,
- OneJobTimeOut: 4,
- NeedRelease: true}},
- successProcessCount: 0,
- },
- {
- name: "14", args: args{
- TimeTester{PoolName: "14",
- ConcurrentCount: 2,
- JobCount: 5,
- TimeAfterRelease: 4,
- OneJobWaitTime: 3,
- OneJobTimeOut: 4,
- NeedRelease: true}},
- successProcessCount: 2,
- },
- {
- name: "15", args: args{
- TimeTester{PoolName: "15",
- ConcurrentCount: 1,
- JobCount: 5,
- TimeAfterRelease: 3,
- OneJobWaitTime: 2,
- OneJobTimeOut: 4,
- NeedRelease: true}},
- successProcessCount: 1,
- },
- {
- name: "16", args: args{
- TimeTester{PoolName: "16",
- ConcurrentCount: 3,
- JobCount: 5,
- TimeAfterRelease: 4,
- OneJobWaitTime: 3,
- OneJobTimeOut: 4,
- NeedRelease: true}},
- successProcessCount: 3,
- },
- {
- name: "17", args: args{
- TimeTester{PoolName: "17",
- ConcurrentCount: 4,
- JobCount: 5,
- TimeAfterRelease: 4,
- OneJobWaitTime: 3,
- OneJobTimeOut: 4,
- NeedRelease: true}},
- successProcessCount: 4,
- },
- {
- name: "18", args: args{
- TimeTester{PoolName: "18",
- ConcurrentCount: 5,
- JobCount: 5,
- TimeAfterRelease: 4,
- OneJobWaitTime: 3,
- OneJobTimeOut: 4,
- NeedRelease: true}},
- successProcessCount: 5,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- successList, _, _, err := process(tt.name, tt.args.timeTester)
- if err != nil {
- t.Fatal(err)
- }
- if tt.successProcessCount != len(successList) {
- t.Fatal("want successProcessCount =", tt.successProcessCount, "now =", len(successList))
- }
- })
- }
- }
- func process(name string, timeTester TimeTester) ([]int, []int, []int, error) {
- once := sync.Once{}
- tc, err := NewTaskControl(timeTester.ConcurrentCount, log_helper.NewLogHelper(name, global_value.ConfigRootDirFPath(), logrus.DebugLevel, time.Duration(7*24)*time.Hour, time.Duration(24)*time.Hour))
- if err != nil {
- return nil, nil, nil, err
- }
- tc.SetCtxProcessFunc(timeTester.PoolName, waitTimes, timeTester.OneJobTimeOut)
- for i := 0; i < timeTester.JobCount; i++ {
- once.Do(func() {
- go func() {
- if timeTester.NeedRelease == false {
- tc.log.Infoln("Do not need Release")
- return
- }
- tc.log.Infoln("Release After", timeTester.TimeAfterRelease, "Second")
- time.Sleep(time.Duration(timeTester.TimeAfterRelease) * time.Second)
- tc.Release()
- }()
- })
- err := tc.Invoke(&TaskData{Index: i,
- DataEx: DataEx{
- OneJobWaitTime: timeTester.OneJobWaitTime,
- WantPanic: timeTester.WantPanic,
- IndexOverThanAddMoreTime: timeTester.IndexOverThanAddMoreTime,
- }})
- if err != nil {
- tc.log.Errorln("Index:", i, "Error", err)
- }
- }
- tc.Hold()
- fmt.Println("-------------------------------")
- // 获取提前终止的计数器以及完成的计数器
- successList, noExecuteList, errorList := tc.GetExecuteInfo()
- return successList, noExecuteList, errorList, nil
- }
- func waitTimes(ctx context.Context, inData interface{}) error {
- phase0 := make(chan interface{}, 1)
- index := inData.(*TaskData)
- dataEx := index.DataEx.(DataEx)
- if dataEx.WantPanic == true {
- panic("want panic")
- }
- go func() {
- fmt.Println("Index:", index.Index, "Start 0")
- if dataEx.IndexOverThanAddMoreTime == 0 {
- time.Sleep(time.Duration(dataEx.OneJobWaitTime) * time.Second)
- } else {
- if index.Index > dataEx.IndexOverThanAddMoreTime {
- time.Sleep(time.Duration(dataEx.OneJobWaitTime+10) * time.Second)
- } else {
- time.Sleep(time.Duration(dataEx.OneJobWaitTime) * time.Second)
- }
- }
- phase0 <- 1
- fmt.Println("Index:", index.Index, "End 0")
- }()
- select {
- case <-ctx.Done():
- {
- fmt.Println("Index:", index.Index, "timeout 0")
- return errors.New("timeout jump")
- }
- case <-phase0:
- break
- }
- fmt.Println("Index:", index.Index, "Start 1")
- fmt.Println("Index:", index.Index, "End 1")
- return nil
- }
- type TimeTester struct {
- PoolName string // 名称
- ConcurrentCount int // 并发数
- JobCount int // 总任务数
- TimeAfterRelease int // 开始后等待多久执行 Release 操作
- OneJobWaitTime int // 单个任务得耗时
- OneJobTimeOut int // 单个任务的超时时间
- NeedRelease bool // 是否需要主动执行 Release
- WantPanic bool // 触发 panic
- IndexOverThanAddMoreTime int // waitTimes函数中某个 Index 之后都会在等待处理上多加延时以便触发超时逻辑
- }
- type DataEx struct {
- OneJobWaitTime int
- WantPanic bool
- IndexOverThanAddMoreTime int
- }
|