Browse Source

完成任务管理基础功能实现

Signed-off-by: allan716 <[email protected]>
allan716 3 years ago
parent
commit
be651e4a7c
3 changed files with 466 additions and 87 deletions
  1. 1 0
      .gitignore
  2. 195 22
      internal/pkg/task_control/task_control.go
  3. 270 65
      internal/pkg/task_control/task_control_test.go

+ 1 - 0
.gitignore

@@ -62,3 +62,4 @@ TestData/
 /internal/backend/Logs
 /internal/backend
 /internal/pkg/task_control/Logs/*.log
+/internal/pkg/task_control/opendebuglog

+ 195 - 22
internal/pkg/task_control/task_control.go

@@ -15,7 +15,20 @@ type TaskControl struct {
 	log                 *logrus.Logger
 	oneCtxTimeOutSecond int
 	bHold               bool
-	ctxFunc             func(ctx context.Context, inData interface{}) error
+	released            bool
+	// 传入的 func
+	ctxFunc func(ctx context.Context, inData interface{}) error
+	// 输入结构锁
+	inputDataMap     map[int64]*TaskData
+	inputDataMapLock sync.RWMutex
+	// 结束锁
+	cancelMap     map[int64]context.CancelFunc
+	cancelMapLock sync.RWMutex
+	// 执行情况, 0 是成功,1 是未执行,2 是错误或者超时
+	executeInfoMap     map[int64]TaskState
+	executeInfoMapLock sync.RWMutex
+
+	commonLock sync.RWMutex
 }
 
 func NewTaskControl(pollName string, size int, oneCtxTimeOutSecond int, log *logrus.Logger) (*TaskControl, error) {
@@ -25,6 +38,9 @@ func NewTaskControl(pollName string, size int, oneCtxTimeOutSecond int, log *log
 	tc.pollName = pollName
 	tc.oneCtxTimeOutSecond = oneCtxTimeOutSecond
 	tc.log = log
+	tc.inputDataMap = make(map[int64]*TaskData, 0)
+	tc.cancelMap = make(map[int64]context.CancelFunc, 0)
+	tc.executeInfoMap = make(map[int64]TaskState, 0)
 	tc.antPoolBase, err = ants.NewPoolWithFunc(size, func(inData interface{}) {
 		tc.baseFuncHandler(inData)
 	})
@@ -41,28 +57,64 @@ func (tc *TaskControl) SetCtxProcessFunc(pf func(ctx context.Context, inData int
 }
 
 // Invoke 向 SetCtxProcessFunc 设置的 Func 中提交数据处理
-func (tc *TaskControl) Invoke(inData InputData) error {
-	tc.wgBase.Add(1)
-	inData.Wg = &tc.wgBase
-	tc.log.Debugln("Index:", inData.Index, "Invoke wg.Add()")
+func (tc *TaskControl) Invoke(inData *TaskData) error {
+
+	// 需要先记录有那些 ID 进来,然后再记录那些是完整执行的,以及出错执行的
+	tc.setExecuteStatus(inData.Index, NoExecute)
+
 	err := tc.antPoolBase.Invoke(inData)
 	if err != nil {
-		// 如果这个执行有问题,那么就把 wg 的计数器减一
-		tc.log.Debugln("Index:", inData.Index, "Invoke Error wg.Done()")
-		tc.wgBase.Done()
+		tc.setTaskDataStatus(inData, Error)
+		tc.setExecuteStatus(inData.Index, Error)
+		return err
 	}
 
-	return err
+	tc.log.Debugln("Index:", inData.Index, "Invoke inputDataMap Lock()")
+	tc.inputDataMapLock.Lock()
+	tc.inputDataMap[inData.Index] = inData
+	tc.inputDataMapLock.Unlock()
+	tc.log.Debugln("Index:", inData.Index, "Invoke inputDataMap UnLock()")
+
+	return nil
 }
 
 func (tc *TaskControl) baseFuncHandler(inData interface{}) {
-	data := inData.(InputData)
+
+	data := inData.(*TaskData)
+
 	defer func() {
 		tc.log.Debugln("Index:", data.Index, "baseFuncHandler wg.Done()")
-		data.Wg.Done()
+		tc.wgBase.Done()
 	}()
+
+	// 实际执行的时候
+	tc.wgBase.Add(1)
+	tc.log.Debugln("Index:", data.Index, "baseFuncHandler wg.Add()")
+
+	var ctx context.Context
 	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(tc.oneCtxTimeOutSecond)*time.Second)
-	defer cancel()
+	defer func() {
+
+		// 那么对应的需要取消掉 map 中的记录
+		tc.cancelMapLock.Lock()
+		delete(tc.cancelMap, data.Index)
+		tc.cancelMapLock.Unlock()
+		cancel()
+	}()
+
+	// 如果已经执行 Release 则返回
+	tc.commonLock.RLock()
+	if tc.released == true {
+		return
+	}
+	tc.commonLock.RUnlock()
+
+	// 记录 cancel
+	tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock Lock()")
+	tc.cancelMapLock.Lock()
+	tc.cancelMap[data.Index] = cancel
+	tc.cancelMapLock.Unlock()
+	tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock UnLock()")
 
 	done := make(chan error, 1)
 	panicChan := make(chan interface{}, 1)
@@ -79,37 +131,158 @@ func (tc *TaskControl) baseFuncHandler(inData interface{}) {
 	select {
 	case err := <-done:
 		if err != nil {
-			tc.log.Errorln(tc.pollName, "Index:", data.Index, "NewPoolWithFunc done with Error", err.Error())
+
+			tc.setTaskDataStatus(data, Error)
+			tc.setExecuteStatus(data.Index, Error)
+			tc.log.Errorln("PollName:", tc.pollName, "Index:", data.Index, "NewPoolWithFunc done with Error", err.Error())
+		} else {
+			tc.setTaskDataStatus(data, Success)
+			tc.setExecuteStatus(data.Index, Success)
 		}
 		return
 	case p := <-panicChan:
-		tc.log.Errorln(tc.pollName, "Index:", data.Index, "NewPoolWithFunc got panic", p)
+
+		tc.setTaskDataStatus(data, Error)
+		tc.setExecuteStatus(data.Index, Error)
+		tc.log.Errorln("PollName:", tc.pollName, "Index:", data.Index, "NewPoolWithFunc got panic", p)
 		return
 	case <-ctx.Done():
-		tc.log.Errorln(tc.pollName, "Index:", data.Index, "NewPoolWithFunc got time out", ctx.Err())
+
+		tc.setTaskDataStatus(data, Error)
+		tc.setExecuteStatus(data.Index, Error)
+		tc.log.Errorln("PollName:", tc.pollName, "Index:", data.Index, "NewPoolWithFunc got time out", ctx.Err())
 		return
 	}
 }
 
 // Hold 自身进行阻塞,如果你是使用 Web 服务器,那么应该无需使用该方法
 func (tc *TaskControl) Hold() {
+	tc.commonLock.Lock()
 	tc.bHold = true
+	tc.commonLock.Unlock()
 	tc.wgBase.Add(1)
 	tc.log.Debugln("Hold wg.Add()")
 	tc.wgBase.Wait()
 }
 
 func (tc *TaskControl) Release() {
-	if tc.bHold == true {
-		tc.log.Debugln("Release wg.Done()")
-		tc.wgBase.Done()
-	}
+
+	tc.log.Debugln("Release Start")
+
+	tc.commonLock.Lock()
+	tc.released = true
+	tc.commonLock.Unlock()
+
 	tc.log.Debugln("Release.Release")
 	tc.antPoolBase.Release()
+
+	tc.log.Debugln("Release cancel() Start")
+	// 统一 cancel cancel
+	tc.cancelMapLock.Lock()
+	for i, cancelFunc := range tc.cancelMap {
+		tc.log.Debugln("Release cancel() Index:", i)
+		cancelFunc()
+	}
+	tc.cancelMapLock.Unlock()
+
+	tc.log.Debugln("Release cancel() End")
+
+	var bHold bool
+	tc.commonLock.RLock()
+	bHold = tc.bHold
+	tc.commonLock.RUnlock()
+	if bHold == true {
+		tc.log.Debugln("Release Hold wg.Done()")
+		tc.wgBase.Done()
+	}
+
+	tc.log.Debugln("Release End")
+}
+
+func (tc *TaskControl) Reboot() {
+
+	var release bool
+	tc.commonLock.RLock()
+	release = tc.released
+	tc.commonLock.RUnlock()
+
+	if release == true {
+		// 如果被释放了,那么第一次 Invoke 的时候需要重启这个 pool
+		tc.antPoolBase.Reboot()
+		// 需要把缓存的 map 清理掉
+		tc.inputDataMapLock.Lock()
+		tc.inputDataMap = make(map[int64]*TaskData, 0)
+		tc.inputDataMapLock.Unlock()
+
+		tc.cancelMapLock.Lock()
+		tc.cancelMap = make(map[int64]context.CancelFunc, 0)
+		tc.cancelMapLock.Unlock()
+
+		tc.executeInfoMapLock.Lock()
+		tc.executeInfoMap = make(map[int64]TaskState, 0)
+		tc.executeInfoMapLock.Unlock()
+
+		tc.commonLock.Lock()
+		tc.released = false
+		tc.commonLock.Unlock()
+	}
+}
+
+// GetExecuteInfo 获取 所有 Invoke 的执行情况,需要在 下一次 Invoke 拿走,否则会清空
+// 成功执行的、未执行的、执行错误(超时)的
+func (tc *TaskControl) GetExecuteInfo() ([]int64, []int64, []int64) {
+
+	successList := make([]int64, 0)
+	noExecuteList := make([]int64, 0)
+	errorList := make([]int64, 0)
+
+	tc.executeInfoMapLock.RLock()
+
+	for i, state := range tc.executeInfoMap {
+		if state == Success {
+			successList = append(successList, i)
+		} else if state == NoExecute {
+			noExecuteList = append(noExecuteList, i)
+		} else if state == Error {
+			errorList = append(errorList, i)
+		}
+	}
+
+	tc.executeInfoMapLock.RUnlock()
+
+	return successList, noExecuteList, errorList
+}
+
+// GetResult 获取 TaskData 的反馈值,需要在 下一次 Invoke 拿走,否则会清空
+func (tc *TaskControl) GetResult(index int64) (bool, *TaskData) {
+	tc.inputDataMapLock.RLock()
+	value, found := tc.inputDataMap[index]
+	tc.inputDataMapLock.RUnlock()
+	return found, value
 }
 
-type InputData struct {
+func (tc *TaskControl) setExecuteStatus(index int64, status TaskState) {
+	tc.executeInfoMapLock.Lock()
+	tc.executeInfoMap[index] = status
+	tc.executeInfoMapLock.Unlock()
+}
+
+func (tc *TaskControl) setTaskDataStatus(taskData *TaskData, status TaskState) {
+	tc.inputDataMapLock.Lock()
+	taskData.Status = status
+	tc.inputDataMapLock.Unlock()
+}
+
+type TaskData struct {
+	Index            int64
+	Status           TaskState // 执行情况, 0 是成功,1 是未执行,2 是错误或者超时
 	OneVideoFullPath string
-	Index            int
-	Wg               *sync.WaitGroup
 }
+
+type TaskState int
+
+const (
+	Success TaskState = iota
+	NoExecute
+	Error
+)

+ 270 - 65
internal/pkg/task_control/task_control_test.go

@@ -1,6 +1,7 @@
 package task_control
 
 import (
+	"errors"
 	"fmt"
 	"github.com/allanpk716/ChineseSubFinder/internal/pkg/log_helper"
 	"golang.org/x/net/context"
@@ -8,76 +9,261 @@ import (
 	"time"
 )
 
-func TestNewTaskControl(t *testing.T) {
-
-	// 不超时的情况
-	err := process(TimeTester{
-		ConcurrentCount:  2,
-		JobCount:         5,
-		TimeAfterRelease: 5,
-		OneJobWaitTime:   2,
-		OneJobTimeOut:    5,
-		SelfHold:         true,
-		DontRelease:      false,
-	})
-	if err != nil {
-		t.Fatal(err)
-	}
-	err = process(TimeTester{
-		ConcurrentCount:  2,
-		JobCount:         5,
-		TimeAfterRelease: 5,
-		OneJobWaitTime:   2,
-		OneJobTimeOut:    5,
-		SelfHold:         false,
-		DontRelease:      true,
-	})
-	if err != nil {
-		t.Fatal(err)
+func TestTaskControl_Invoke(t *testing.T) {
+	type args struct {
+		timeTester TimeTester
 	}
-	err = process(TimeTester{
-		ConcurrentCount:  2,
-		JobCount:         5,
-		TimeAfterRelease: 5,
-		OneJobWaitTime:   2,
-		OneJobTimeOut:    5,
-		SelfHold:         false,
-		DontRelease:      false,
-	})
-	if err != nil {
-		t.Fatal(err)
+	tests := []struct {
+		name                string
+		args                args
+		successProcessCount int
+		wantErr             bool
+	}{
+		// 不超时的情况
+		{
+			name: "00", args: args{
+				TimeTester{PoolName: "00",
+					ConcurrentCount:  2,
+					JobCount:         5,
+					TimeAfterRelease: 5,
+					OneJobWaitTime:   1,
+					OneJobTimeOut:    2,
+					SelfHold:         true,
+					NeedRelease:      true}},
+			successProcessCount: 5,
+		},
+		{
+			name: "01", args: args{
+				TimeTester{PoolName: "01",
+					ConcurrentCount:  2,
+					JobCount:         5,
+					TimeAfterRelease: 5,
+					OneJobWaitTime:   1,
+					OneJobTimeOut:    2,
+					SelfHold:         false,
+					NeedRelease:      false}},
+			successProcessCount: 5,
+		},
+		{
+			name: "02", args: args{
+				TimeTester{PoolName: "02",
+					ConcurrentCount:  2,
+					JobCount:         5,
+					TimeAfterRelease: 5,
+					OneJobWaitTime:   1,
+					OneJobTimeOut:    2,
+					SelfHold:         false,
+					NeedRelease:      true}},
+			successProcessCount: 5,
+		},
+		// 超时的情况
+		{
+			name: "03", args: args{
+				TimeTester{PoolName: "03",
+					ConcurrentCount:  2,
+					JobCount:         5,
+					TimeAfterRelease: 5,
+					OneJobWaitTime:   2,
+					OneJobTimeOut:    1,
+					SelfHold:         true,
+					NeedRelease:      true}},
+			successProcessCount: 0,
+		},
+		{
+			name: "04", args: args{
+				TimeTester{PoolName: "04",
+					ConcurrentCount:  2,
+					JobCount:         5,
+					TimeAfterRelease: 5,
+					OneJobWaitTime:   2,
+					OneJobTimeOut:    1,
+					SelfHold:         false,
+					NeedRelease:      false}},
+			successProcessCount: 0,
+		},
+		{
+			name: "05", args: args{
+				TimeTester{PoolName: "05",
+					ConcurrentCount:  2,
+					JobCount:         5,
+					TimeAfterRelease: 5,
+					OneJobWaitTime:   2,
+					OneJobTimeOut:    1,
+					SelfHold:         false,
+					NeedRelease:      true}},
+			successProcessCount: 0,
+		},
+		// 主动触发 painic
+		{
+			name: "06", args: args{
+				TimeTester{PoolName: "06",
+					ConcurrentCount:  2,
+					JobCount:         5,
+					TimeAfterRelease: 5,
+					OneJobWaitTime:   2,
+					OneJobTimeOut:    1,
+					SelfHold:         true,
+					NeedRelease:      true,
+					WantPanic:        true}},
+			successProcessCount: 0,
+		},
+		{
+			name: "07", args: args{
+				TimeTester{PoolName: "07",
+					ConcurrentCount:  2,
+					JobCount:         5,
+					TimeAfterRelease: 5,
+					OneJobWaitTime:   2,
+					OneJobTimeOut:    1,
+					SelfHold:         false,
+					NeedRelease:      false,
+					WantPanic:        true}},
+			successProcessCount: 0,
+		},
+		{
+			name: "08", args: args{
+				TimeTester{PoolName: "08",
+					ConcurrentCount:  2,
+					JobCount:         5,
+					TimeAfterRelease: 5,
+					OneJobWaitTime:   2,
+					OneJobTimeOut:    1,
+					SelfHold:         false,
+					NeedRelease:      true,
+					WantPanic:        true}},
+			successProcessCount: 0,
+		},
+		// 部分超时
+		{
+			name: "09", args: args{
+				TimeTester{PoolName: "09",
+					ConcurrentCount:          2,
+					JobCount:                 5,
+					TimeAfterRelease:         5,
+					OneJobWaitTime:           2,
+					OneJobTimeOut:            3,
+					SelfHold:                 true,
+					NeedRelease:              true,
+					IndexOverThanAddMoreTime: 2}},
+			successProcessCount: 3,
+		},
+		{
+			name: "10", args: args{
+				TimeTester{PoolName: "10",
+					ConcurrentCount:          2,
+					JobCount:                 5,
+					TimeAfterRelease:         5,
+					OneJobWaitTime:           2,
+					OneJobTimeOut:            3,
+					SelfHold:                 false,
+					NeedRelease:              false,
+					IndexOverThanAddMoreTime: 2}},
+			successProcessCount: 3,
+		},
+		{
+			name: "11", args: args{
+				TimeTester{PoolName: "11",
+					ConcurrentCount:          2,
+					JobCount:                 5,
+					TimeAfterRelease:         5,
+					OneJobWaitTime:           2,
+					OneJobTimeOut:            3,
+					SelfHold:                 false,
+					NeedRelease:              true,
+					IndexOverThanAddMoreTime: 3}},
+			successProcessCount: 4,
+		},
+		// 使用 Release 取消
+		{
+			name: "12", args: args{
+				TimeTester{PoolName: "12",
+					ConcurrentCount:  1,
+					JobCount:         5,
+					TimeAfterRelease: 2,
+					OneJobWaitTime:   3,
+					OneJobTimeOut:    4,
+					SelfHold:         true,
+					NeedRelease:      true}},
+			successProcessCount: 0,
+		},
+		{
+			name: "13", args: args{
+				TimeTester{PoolName: "13",
+					ConcurrentCount:  2,
+					JobCount:         5,
+					TimeAfterRelease: 2,
+					OneJobWaitTime:   3,
+					OneJobTimeOut:    4,
+					SelfHold:         true,
+					NeedRelease:      true}},
+			successProcessCount: 0,
+		},
+		{
+			name: "14", args: args{
+				TimeTester{PoolName: "14",
+					ConcurrentCount:  2,
+					JobCount:         5,
+					TimeAfterRelease: 4,
+					OneJobWaitTime:   3,
+					OneJobTimeOut:    4,
+					SelfHold:         true,
+					NeedRelease:      true}},
+			successProcessCount: 2,
+		},
+		{
+			name: "15", args: args{
+				TimeTester{PoolName: "15",
+					ConcurrentCount:  1,
+					JobCount:         5,
+					TimeAfterRelease: 4,
+					OneJobWaitTime:   3,
+					OneJobTimeOut:    4,
+					SelfHold:         true,
+					NeedRelease:      true}},
+			successProcessCount: 1,
+		},
 	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			successList, _, _, err := process(tt.args.timeTester)
+			if err != nil {
+				t.Fatal(err)
+			}
 
+			if tt.successProcessCount != len(successList) {
+				t.Fatal("want successProcessCount =", tt.successProcessCount, "now =", len(successList))
+			}
+		})
+	}
 }
 
-func process(timeTester TimeTester) error {
+func process(timeTester TimeTester) ([]int64, []int64, []int64, error) {
 
 	OneJobWaitTime = timeTester.OneJobWaitTime
+	WantPanic = timeTester.WantPanic
+	IndexOverThanAddMoreTime = int64(timeTester.IndexOverThanAddMoreTime)
 
-	tc, err := NewTaskControl("TestPool", timeTester.ConcurrentCount, timeTester.OneJobTimeOut, log_helper.GetLogger())
+	tc, err := NewTaskControl(timeTester.PoolName, timeTester.ConcurrentCount, timeTester.OneJobTimeOut, log_helper.GetLogger())
 	if err != nil {
-		return err
+		return nil, nil, nil, err
 	}
 	tc.SetCtxProcessFunc(waitTimes)
 
 	for i := 0; i < timeTester.JobCount; i++ {
-		go func(index int) {
-			if index > 1 {
-				time.Sleep(10 * time.Second)
-			}
-			err := tc.Invoke(InputData{Index: index})
+		go func(index int64) {
+			err := tc.Invoke(&TaskData{Index: index})
 			if err != nil {
-				fmt.Println("Index:", index, "Error", err)
+				tc.log.Errorln("Index:", index, "Error", err)
 			}
-		}(i)
+		}(int64(i))
 	}
 
 	go func() {
-		if timeTester.DontRelease == true {
-			fmt.Println("dont Release")
+		if timeTester.NeedRelease == false {
+			tc.log.Infoln("Do not need Release")
 			return
 		}
-		fmt.Println("Release After 2 Second")
+		tc.log.Infoln("Release After", timeTester.TimeAfterRelease, "Second")
 		time.Sleep(time.Duration(timeTester.TimeAfterRelease) * time.Second)
 		tc.Release()
 	}()
@@ -97,25 +283,39 @@ func process(timeTester TimeTester) error {
 	}
 	fmt.Println("-------------------------------")
 
-	return nil
+	// 获取提前终止的计数器以及完成的计数器
+	successList, noExecuteList, errorList := tc.GetExecuteInfo()
+	return successList, noExecuteList, errorList, nil
 }
 
 func waitTimes(ctx context.Context, inData interface{}) error {
 
 	phase0 := make(chan interface{}, 1)
+	index := inData.(*TaskData)
+	if WantPanic == true {
+		panic("want panic")
+	}
 
-	index := inData.(InputData)
-
-	fmt.Println("Index:", index.Index, "Start 0")
-	time.Sleep(time.Duration(OneJobWaitTime) * time.Second)
-	fmt.Println("Index:", index.Index, "End 0")
+	go func() {
+		fmt.Println("Index:", index.Index, "Start 0")
+		if IndexOverThanAddMoreTime == 0 {
+			time.Sleep(time.Duration(OneJobWaitTime) * time.Second)
+		} else {
+			if index.Index > IndexOverThanAddMoreTime {
+				time.Sleep(time.Duration(OneJobWaitTime+10) * time.Second)
+			} else {
+				time.Sleep(time.Duration(OneJobWaitTime) * time.Second)
+			}
+		}
+		phase0 <- 1
+		fmt.Println("Index:", index.Index, "End 0")
+	}()
 
-	phase0 <- 1
 	select {
 	case <-ctx.Done():
 		{
 			fmt.Println("Index:", index.Index, "timeout 0")
-			return nil
+			return errors.New("timeout jump")
 		}
 	case <-phase0:
 		break
@@ -128,13 +328,18 @@ func waitTimes(ctx context.Context, inData interface{}) error {
 }
 
 type TimeTester struct {
-	ConcurrentCount  int  // 并发数
-	JobCount         int  // 总任务数
-	TimeAfterRelease int  // 开始后等待多久执行 Release 操作
-	OneJobWaitTime   int  // 单个任务得耗时
-	OneJobTimeOut    int  // 单个任务的超时时间
-	SelfHold         bool // 是否需要自身的等待,如果使用了,那么一定需要 Release
-	DontRelease      bool // 是否需要主动执行 Release
+	PoolName                 string // 名称
+	ConcurrentCount          int    // 并发数
+	JobCount                 int    // 总任务数
+	TimeAfterRelease         int    // 开始后等待多久执行 Release 操作
+	OneJobWaitTime           int    // 单个任务得耗时
+	OneJobTimeOut            int    // 单个任务的超时时间
+	SelfHold                 bool   // 是否需要自身的等待,如果使用了,那么一定需要 Release
+	NeedRelease              bool   // 是否需要主动执行 Release
+	WantPanic                bool   // 触发 panic
+	IndexOverThanAddMoreTime int    // waitTimes函数中某个 Index 之后都会在等待处理上多加延时以便触发超时逻辑
 }
 
 var OneJobWaitTime int
+var WantPanic bool
+var IndexOverThanAddMoreTime int64