123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- package task_control
- import (
- "github.com/panjf2000/ants/v2"
- "github.com/sirupsen/logrus"
- "golang.org/x/net/context"
- "sync"
- "time"
- )
- type TaskControl struct {
- pollName string
- antPoolBase *ants.PoolWithFunc
- wgBase sync.WaitGroup
- log *logrus.Logger
- oneCtxTimeOutSecond int
- released bool
- // 传入的 func
- ctxFunc func(ctx context.Context, inData interface{}) error
- // 输入结构锁
- inputDataMap map[int]*TaskData
- inputDataMapLock sync.Mutex
- // 结束锁
- cancelMap map[int]context.CancelFunc
- cancelMapLock sync.Mutex
- // 执行情况, 0 是成功,1 是未执行,2 是错误或者超时
- executeInfoMap map[int]TaskState
- executeInfoMapLock sync.Mutex
- commonLock sync.Mutex
- }
- func NewTaskControl(size int, log *logrus.Logger) (*TaskControl, error) {
- var err error
- tc := TaskControl{}
- tc.log = log
- tc.inputDataMap = make(map[int]*TaskData, 0)
- tc.cancelMap = make(map[int]context.CancelFunc, 0)
- tc.executeInfoMap = make(map[int]TaskState, 0)
- tc.antPoolBase, err = ants.NewPoolWithFunc(size, func(inData interface{}) {
- tc.baseFuncHandler(inData)
- })
- if err != nil {
- return nil, err
- }
- tc.wgBase = sync.WaitGroup{}
- return &tc, nil
- }
- // SetCtxProcessFunc 设置后续需要用到的单个任务的 Func,注意,如果之前的任务没有完成,不应该再次调用函数。建议进行 Release 后,再次调用
- func (tc *TaskControl) SetCtxProcessFunc(pollName string, pf func(ctx context.Context, inData interface{}) error, oneCtxTimeOutSecond int) {
- tc.pollName = pollName
- tc.ctxFunc = pf
- tc.oneCtxTimeOutSecond = oneCtxTimeOutSecond
- }
- // Invoke 向 SetCtxProcessFunc 设置的 Func 中提交数据处理
- func (tc *TaskControl) Invoke(inData *TaskData) error {
- // 实际执行的时候
- tc.wgBase.Add(1)
- tc.log.Debugln("Index:", inData.Index, "baseFuncHandler wg.Add()")
- // 需要先记录有那些 ID 进来,然后再记录那些是完整执行的,以及出错执行的
- tc.setExecuteStatus(inData.Index, NoExecute)
- err := tc.antPoolBase.Invoke(inData)
- if err != nil {
- tc.setTaskDataStatus(inData, NoExecute)
- tc.setExecuteStatus(inData.Index, NoExecute)
- tc.log.Debugln("Index:", inData.Index, "baseFuncHandler wg.Done()")
- tc.wgBase.Done()
- return err
- }
- tc.log.Debugln("Index:", inData.Index, "Invoke inputDataMap Lock()")
- tc.inputDataMapLock.Lock()
- tc.inputDataMap[inData.Index] = inData
- tc.inputDataMapLock.Unlock()
- tc.log.Debugln("Index:", inData.Index, "Invoke inputDataMap UnLock()")
- return nil
- }
- func (tc *TaskControl) baseFuncHandler(inData interface{}) {
- data := inData.(*TaskData)
- defer func() {
- tc.log.Debugln("Index:", data.Index, "baseFuncHandler wg.Done()")
- tc.wgBase.Done()
- }()
- // 如果已经执行 Release 则返回
- nowRelease := false
- tc.commonLock.Lock()
- nowRelease = tc.released
- tc.commonLock.Unlock()
- if nowRelease == true {
- tc.log.Debugln("Index:", data.Index, "released == true")
- return
- }
- var ctx context.Context
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(tc.oneCtxTimeOutSecond)*time.Second)
- defer func() {
- // 那么对应的需要取消掉 map 中的记录
- tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock Lock() defer")
- tc.cancelMapLock.Lock()
- delete(tc.cancelMap, data.Index)
- tc.cancelMapLock.Unlock()
- tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock UnLock() defer")
- cancel()
- }()
- // 记录 cancel
- tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock Lock()")
- tc.cancelMapLock.Lock()
- tc.cancelMap[data.Index] = cancel
- tc.cancelMapLock.Unlock()
- tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock UnLock()")
- done := make(chan error, 1)
- panicChan := make(chan interface{}, 1)
- go func(ctx context.Context) {
- defer func() {
- if p := recover(); p != nil {
- panicChan <- p
- }
- }()
- done <- tc.ctxFunc(ctx, inData)
- }(ctx)
- select {
- case err := <-done:
- if err != nil {
- tc.setTaskDataStatus(data, Error)
- tc.setExecuteStatus(data.Index, Error)
- tc.log.Errorln("PollName:", tc.pollName, "Index:", data.Index, "NewPoolWithFunc done with Error", err.Error())
- } else {
- tc.setTaskDataStatus(data, Success)
- tc.setExecuteStatus(data.Index, Success)
- }
- return
- case p := <-panicChan:
- tc.setTaskDataStatus(data, Error)
- tc.setExecuteStatus(data.Index, Error)
- tc.log.Errorln("PollName:", tc.pollName, "Index:", data.Index, "NewPoolWithFunc got panic", p)
- return
- case <-ctx.Done():
- tc.setTaskDataStatus(data, Error)
- tc.setExecuteStatus(data.Index, Error)
- tc.log.Errorln("PollName:", tc.pollName, "Index:", data.Index, "NewPoolWithFunc got time out", ctx.Err())
- return
- }
- }
- // Hold 自身进行阻塞,如果你是使用 Web 服务器,那么应该无需使用该方法
- func (tc *TaskControl) Hold() {
- tc.log.Debugln("Hold()")
- tc.wgBase.Wait()
- }
- func (tc *TaskControl) Release() {
- tc.log.Debugln("-------------------------------")
- tc.log.Debugln("Release Start")
- tc.commonLock.Lock()
- tc.released = true
- tc.commonLock.Unlock()
- tc.log.Debugln("Release.Release")
- tc.antPoolBase.Release()
- tc.log.Debugln("Release cancel() Start")
- // 统一 cancel cancel
- tc.cancelMapLock.Lock()
- for i, cancelFunc := range tc.cancelMap {
- tc.log.Debugln("Release cancel() Index:", i)
- cancelFunc()
- }
- tc.cancelMapLock.Unlock()
- tc.log.Debugln("Release cancel() End")
- tc.log.Debugln("Release End")
- tc.log.Debugln("-------------------------------")
- }
- func (tc *TaskControl) Reboot() {
- var release bool
- tc.commonLock.Lock()
- release = tc.released
- tc.commonLock.Unlock()
- if release == true {
- // 如果被释放了,那么第一次 Invoke 的时候需要重启这个 pool
- tc.antPoolBase.Reboot()
- // 需要把缓存的 map 清理掉
- tc.inputDataMapLock.Lock()
- tc.inputDataMap = make(map[int]*TaskData, 0)
- tc.inputDataMapLock.Unlock()
- tc.cancelMapLock.Lock()
- tc.cancelMap = make(map[int]context.CancelFunc, 0)
- tc.cancelMapLock.Unlock()
- tc.executeInfoMapLock.Lock()
- tc.executeInfoMap = make(map[int]TaskState, 0)
- tc.executeInfoMapLock.Unlock()
- tc.commonLock.Lock()
- tc.released = false
- tc.commonLock.Unlock()
- }
- }
- // GetExecuteInfo 获取 所有 Invoke 的执行情况,需要在 下一次 Invoke 拿走,否则会清空
- // 成功执行的、未执行的、执行错误(超时)的
- func (tc *TaskControl) GetExecuteInfo() ([]int, []int, []int) {
- successList := make([]int, 0)
- noExecuteList := make([]int, 0)
- errorList := make([]int, 0)
- tc.executeInfoMapLock.Lock()
- for i, state := range tc.executeInfoMap {
- if state == Success {
- successList = append(successList, i)
- } else if state == NoExecute {
- noExecuteList = append(noExecuteList, i)
- } else if state == Error {
- errorList = append(errorList, i)
- }
- }
- tc.executeInfoMapLock.Unlock()
- return successList, noExecuteList, errorList
- }
- // GetResult 获取 TaskData 的反馈值,需要在 下一次 Invoke 拿走,否则会清空
- func (tc *TaskControl) GetResult(index int) (bool, *TaskData) {
- tc.inputDataMapLock.Lock()
- value, found := tc.inputDataMap[index]
- tc.inputDataMapLock.Unlock()
- return found, value
- }
- func (tc *TaskControl) setExecuteStatus(index int, status TaskState) {
- tc.executeInfoMapLock.Lock()
- tc.executeInfoMap[index] = status
- tc.executeInfoMapLock.Unlock()
- }
- func (tc *TaskControl) setTaskDataStatus(taskData *TaskData, status TaskState) {
- tc.inputDataMapLock.Lock()
- taskData.Status = status
- tc.inputDataMapLock.Unlock()
- }
- type TaskData struct {
- Index int
- Status TaskState // 执行情况, 0 是成功,1 是未执行,2 是错误或者超时
- DataEx interface{}
- }
- type TaskState int
- const (
- Success TaskState = iota
- NoExecute
- Error
- )
|