Browse Source

修复 task_control 锁的问题

Signed-off-by: allan716 <[email protected]>
allan716 3 years ago
parent
commit
66317e77ed

+ 14 - 14
internal/pkg/task_control/task_control.go

@@ -20,15 +20,15 @@ type TaskControl struct {
 	ctxFunc func(ctx context.Context, inData interface{}) error
 	// 输入结构锁
 	inputDataMap     map[int64]*TaskData
-	inputDataMapLock sync.RWMutex
+	inputDataMapLock sync.Mutex
 	// 结束锁
 	cancelMap     map[int64]context.CancelFunc
-	cancelMapLock sync.RWMutex
+	cancelMapLock sync.Mutex
 	// 执行情况, 0 是成功,1 是未执行,2 是错误或者超时
 	executeInfoMap     map[int64]TaskState
-	executeInfoMapLock sync.RWMutex
+	executeInfoMapLock sync.Mutex
 
-	commonLock sync.RWMutex
+	commonLock sync.Mutex
 }
 
 func NewTaskControl(pollName string, size int, oneCtxTimeOutSecond int, log *logrus.Logger) (*TaskControl, error) {
@@ -103,11 +103,11 @@ func (tc *TaskControl) baseFuncHandler(inData interface{}) {
 	}()
 
 	// 如果已经执行 Release 则返回
-	tc.commonLock.RLock()
+	tc.commonLock.Lock()
 	if tc.released == true {
 		return
 	}
-	tc.commonLock.RUnlock()
+	tc.commonLock.Unlock()
 
 	// 记录 cancel
 	tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock Lock()")
@@ -188,9 +188,9 @@ func (tc *TaskControl) Release() {
 	tc.log.Debugln("Release cancel() End")
 
 	var bHold bool
-	tc.commonLock.RLock()
+	tc.commonLock.Lock()
 	bHold = tc.bHold
-	tc.commonLock.RUnlock()
+	tc.commonLock.Unlock()
 	if bHold == true {
 		tc.log.Debugln("Release Hold wg.Done()")
 		tc.wgBase.Done()
@@ -202,9 +202,9 @@ func (tc *TaskControl) Release() {
 func (tc *TaskControl) Reboot() {
 
 	var release bool
-	tc.commonLock.RLock()
+	tc.commonLock.Lock()
 	release = tc.released
-	tc.commonLock.RUnlock()
+	tc.commonLock.Unlock()
 
 	if release == true {
 		// 如果被释放了,那么第一次 Invoke 的时候需要重启这个 pool
@@ -236,7 +236,7 @@ func (tc *TaskControl) GetExecuteInfo() ([]int64, []int64, []int64) {
 	noExecuteList := make([]int64, 0)
 	errorList := make([]int64, 0)
 
-	tc.executeInfoMapLock.RLock()
+	tc.executeInfoMapLock.Lock()
 
 	for i, state := range tc.executeInfoMap {
 		if state == Success {
@@ -248,16 +248,16 @@ func (tc *TaskControl) GetExecuteInfo() ([]int64, []int64, []int64) {
 		}
 	}
 
-	tc.executeInfoMapLock.RUnlock()
+	tc.executeInfoMapLock.Unlock()
 
 	return successList, noExecuteList, errorList
 }
 
 // GetResult 获取 TaskData 的反馈值,需要在 下一次 Invoke 拿走,否则会清空
 func (tc *TaskControl) GetResult(index int64) (bool, *TaskData) {
-	tc.inputDataMapLock.RLock()
+	tc.inputDataMapLock.Lock()
 	value, found := tc.inputDataMap[index]
-	tc.inputDataMapLock.RUnlock()
+	tc.inputDataMapLock.Unlock()
 	return found, value
 }
 

+ 36 - 0
internal/pkg/task_control/task_control_test.go

@@ -222,6 +222,42 @@ func TestTaskControl_Invoke(t *testing.T) {
 					NeedRelease:      true}},
 			successProcessCount: 1,
 		},
+		{
+			name: "16", args: args{
+				TimeTester{PoolName: "16",
+					ConcurrentCount:  3,
+					JobCount:         5,
+					TimeAfterRelease: 4,
+					OneJobWaitTime:   3,
+					OneJobTimeOut:    4,
+					SelfHold:         true,
+					NeedRelease:      true}},
+			successProcessCount: 3,
+		},
+		{
+			name: "17", args: args{
+				TimeTester{PoolName: "17",
+					ConcurrentCount:  4,
+					JobCount:         5,
+					TimeAfterRelease: 4,
+					OneJobWaitTime:   3,
+					OneJobTimeOut:    4,
+					SelfHold:         true,
+					NeedRelease:      true}},
+			successProcessCount: 4,
+		},
+		{
+			name: "18", args: args{
+				TimeTester{PoolName: "18",
+					ConcurrentCount:  5,
+					JobCount:         5,
+					TimeAfterRelease: 4,
+					OneJobWaitTime:   3,
+					OneJobTimeOut:    4,
+					SelfHold:         true,
+					NeedRelease:      true}},
+			successProcessCount: 5,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {