1
0
Эх сурвалжийг харах

修复 task_control 单元测试问题

Signed-off-by: allan716 <[email protected]>
allan716 3 жил өмнө
parent
commit
1ba8af1fb5

+ 14 - 11
internal/pkg/task_control/task_control.go

@@ -82,32 +82,34 @@ func (tc *TaskControl) baseFuncHandler(inData interface{}) {
 
 	data := inData.(*TaskData)
 
-	defer func() {
-		tc.log.Debugln("Index:", data.Index, "baseFuncHandler wg.Done()")
-		tc.wgBase.Done()
-	}()
+	// 如果已经执行 Release 则返回
+	tc.commonLock.Lock()
+	if tc.released == true {
+		tc.log.Debugln("Index:", data.Index, "released == true")
+		return
+	}
+	tc.commonLock.Unlock()
 
 	// 实际执行的时候
 	tc.wgBase.Add(1)
 	tc.log.Debugln("Index:", data.Index, "baseFuncHandler wg.Add()")
 
+	tc.log.Debugln("Index:", data.Index, "baseFuncHandler wg.Done()")
+	tc.wgBase.Done()
+
 	var ctx context.Context
 	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(tc.oneCtxTimeOutSecond)*time.Second)
 	defer func() {
 
 		// 那么对应的需要取消掉 map 中的记录
+		tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock Lock() defer")
 		tc.cancelMapLock.Lock()
 		delete(tc.cancelMap, data.Index)
 		tc.cancelMapLock.Unlock()
+		tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock UnLock() defer")
 		cancel()
-	}()
 
-	// 如果已经执行 Release 则返回
-	tc.commonLock.Lock()
-	if tc.released == true {
-		return
-	}
-	tc.commonLock.Unlock()
+	}()
 
 	// 记录 cancel
 	tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock Lock()")
@@ -277,6 +279,7 @@ type TaskData struct {
 	Index            int64
 	Status           TaskState // 执行情况, 0 是成功,1 是未执行,2 是错误或者超时
 	OneVideoFullPath string
+	DataEx           interface{}
 }
 
 type TaskState int

+ 22 - 16
internal/pkg/task_control/task_control_test.go

@@ -215,8 +215,8 @@ func TestTaskControl_Invoke(t *testing.T) {
 				TimeTester{PoolName: "15",
 					ConcurrentCount:  1,
 					JobCount:         5,
-					TimeAfterRelease: 4,
-					OneJobWaitTime:   3,
+					TimeAfterRelease: 3,
+					OneJobWaitTime:   2,
 					OneJobTimeOut:    4,
 					SelfHold:         true,
 					NeedRelease:      true}},
@@ -275,10 +275,6 @@ func TestTaskControl_Invoke(t *testing.T) {
 
 func process(timeTester TimeTester) ([]int64, []int64, []int64, error) {
 
-	OneJobWaitTime = timeTester.OneJobWaitTime
-	WantPanic = timeTester.WantPanic
-	IndexOverThanAddMoreTime = int64(timeTester.IndexOverThanAddMoreTime)
-
 	tc, err := NewTaskControl(timeTester.PoolName, timeTester.ConcurrentCount, timeTester.OneJobTimeOut, log_helper.GetLogger())
 	if err != nil {
 		return nil, nil, nil, err
@@ -287,7 +283,12 @@ func process(timeTester TimeTester) ([]int64, []int64, []int64, error) {
 
 	for i := 0; i < timeTester.JobCount; i++ {
 		go func(index int64) {
-			err := tc.Invoke(&TaskData{Index: index})
+			err := tc.Invoke(&TaskData{Index: index,
+				DataEx: DataEx{
+					OneJobWaitTime:           timeTester.OneJobWaitTime,
+					WantPanic:                timeTester.WantPanic,
+					IndexOverThanAddMoreTime: int64(timeTester.IndexOverThanAddMoreTime),
+				}})
 			if err != nil {
 				tc.log.Errorln("Index:", index, "Error", err)
 			}
@@ -328,19 +329,22 @@ func waitTimes(ctx context.Context, inData interface{}) error {
 
 	phase0 := make(chan interface{}, 1)
 	index := inData.(*TaskData)
-	if WantPanic == true {
+
+	dataEx := index.DataEx.(DataEx)
+
+	if dataEx.WantPanic == true {
 		panic("want panic")
 	}
 
 	go func() {
 		fmt.Println("Index:", index.Index, "Start 0")
-		if IndexOverThanAddMoreTime == 0 {
-			time.Sleep(time.Duration(OneJobWaitTime) * time.Second)
+		if dataEx.IndexOverThanAddMoreTime == 0 {
+			time.Sleep(time.Duration(dataEx.OneJobWaitTime) * time.Second)
 		} else {
-			if index.Index > IndexOverThanAddMoreTime {
-				time.Sleep(time.Duration(OneJobWaitTime+10) * time.Second)
+			if index.Index > dataEx.IndexOverThanAddMoreTime {
+				time.Sleep(time.Duration(dataEx.OneJobWaitTime+10) * time.Second)
 			} else {
-				time.Sleep(time.Duration(OneJobWaitTime) * time.Second)
+				time.Sleep(time.Duration(dataEx.OneJobWaitTime) * time.Second)
 			}
 		}
 		phase0 <- 1
@@ -376,6 +380,8 @@ type TimeTester struct {
 	IndexOverThanAddMoreTime int    // waitTimes函数中某个 Index 之后都会在等待处理上多加延时以便触发超时逻辑
 }
 
-var OneJobWaitTime int
-var WantPanic bool
-var IndexOverThanAddMoreTime int64
+type DataEx struct {
+	OneJobWaitTime           int
+	WantPanic                bool
+	IndexOverThanAddMoreTime int64
+}