task_control.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. package task_control
  2. import (
  3. "github.com/panjf2000/ants/v2"
  4. "github.com/sirupsen/logrus"
  5. "golang.org/x/net/context"
  6. "sync"
  7. "time"
  8. )
  9. type TaskControl struct {
  10. pollName string
  11. antPoolBase *ants.PoolWithFunc
  12. wgBase sync.WaitGroup
  13. log *logrus.Logger
  14. oneCtxTimeOutSecond int
  15. released bool
  16. // 传入的 func
  17. ctxFunc func(ctx context.Context, inData interface{}) error
  18. // 输入结构锁
  19. inputDataMap map[int]*TaskData
  20. inputDataMapLock sync.Mutex
  21. // 结束锁
  22. cancelMap map[int]context.CancelFunc
  23. cancelMapLock sync.Mutex
  24. // 执行情况, 0 是成功,1 是未执行,2 是错误或者超时
  25. executeInfoMap map[int]TaskState
  26. executeInfoMapLock sync.Mutex
  27. commonLock sync.Mutex
  28. }
  29. func NewTaskControl(size int, log *logrus.Logger) (*TaskControl, error) {
  30. var err error
  31. tc := TaskControl{}
  32. tc.log = log
  33. tc.inputDataMap = make(map[int]*TaskData, 0)
  34. tc.cancelMap = make(map[int]context.CancelFunc, 0)
  35. tc.executeInfoMap = make(map[int]TaskState, 0)
  36. tc.antPoolBase, err = ants.NewPoolWithFunc(size, func(inData interface{}) {
  37. tc.baseFuncHandler(inData)
  38. })
  39. if err != nil {
  40. return nil, err
  41. }
  42. tc.wgBase = sync.WaitGroup{}
  43. return &tc, nil
  44. }
  45. // SetCtxProcessFunc 设置后续需要用到的单个任务的 Func,注意,如果之前的任务没有完成,不应该再次调用函数。建议进行 Release 后,再次调用
  46. func (tc *TaskControl) SetCtxProcessFunc(pollName string, pf func(ctx context.Context, inData interface{}) error, oneCtxTimeOutSecond int) {
  47. tc.pollName = pollName
  48. tc.ctxFunc = pf
  49. tc.oneCtxTimeOutSecond = oneCtxTimeOutSecond
  50. }
  51. // Invoke 向 SetCtxProcessFunc 设置的 Func 中提交数据处理
  52. func (tc *TaskControl) Invoke(inData *TaskData) error {
  53. // 实际执行的时候
  54. tc.wgBase.Add(1)
  55. tc.log.Debugln("Index:", inData.Index, "baseFuncHandler wg.Add()")
  56. // 需要先记录有那些 ID 进来,然后再记录那些是完整执行的,以及出错执行的
  57. tc.setExecuteStatus(inData.Index, NoExecute)
  58. err := tc.antPoolBase.Invoke(inData)
  59. if err != nil {
  60. tc.setTaskDataStatus(inData, NoExecute)
  61. tc.setExecuteStatus(inData.Index, NoExecute)
  62. tc.log.Debugln("Index:", inData.Index, "baseFuncHandler wg.Done()")
  63. tc.wgBase.Done()
  64. return err
  65. }
  66. tc.log.Debugln("Index:", inData.Index, "Invoke inputDataMap Lock()")
  67. tc.inputDataMapLock.Lock()
  68. tc.inputDataMap[inData.Index] = inData
  69. tc.inputDataMapLock.Unlock()
  70. tc.log.Debugln("Index:", inData.Index, "Invoke inputDataMap UnLock()")
  71. return nil
  72. }
  73. func (tc *TaskControl) baseFuncHandler(inData interface{}) {
  74. data := inData.(*TaskData)
  75. defer func() {
  76. tc.log.Debugln("Index:", data.Index, "baseFuncHandler wg.Done()")
  77. tc.wgBase.Done()
  78. }()
  79. // 如果已经执行 Release 则返回
  80. nowRelease := false
  81. tc.commonLock.Lock()
  82. nowRelease = tc.released
  83. tc.commonLock.Unlock()
  84. if nowRelease == true {
  85. tc.log.Debugln("Index:", data.Index, "released == true")
  86. return
  87. }
  88. var ctx context.Context
  89. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(tc.oneCtxTimeOutSecond)*time.Second)
  90. defer func() {
  91. // 那么对应的需要取消掉 map 中的记录
  92. tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock Lock() defer")
  93. tc.cancelMapLock.Lock()
  94. delete(tc.cancelMap, data.Index)
  95. tc.cancelMapLock.Unlock()
  96. tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock UnLock() defer")
  97. cancel()
  98. }()
  99. // 记录 cancel
  100. tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock Lock()")
  101. tc.cancelMapLock.Lock()
  102. tc.cancelMap[data.Index] = cancel
  103. tc.cancelMapLock.Unlock()
  104. tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock UnLock()")
  105. done := make(chan error, 1)
  106. panicChan := make(chan interface{}, 1)
  107. go func(ctx context.Context) {
  108. defer func() {
  109. if p := recover(); p != nil {
  110. panicChan <- p
  111. }
  112. }()
  113. done <- tc.ctxFunc(ctx, inData)
  114. }(ctx)
  115. select {
  116. case err := <-done:
  117. if err != nil {
  118. tc.setTaskDataStatus(data, Error)
  119. tc.setExecuteStatus(data.Index, Error)
  120. tc.log.Errorln("PollName:", tc.pollName, "Index:", data.Index, "NewPoolWithFunc done with Error", err.Error())
  121. } else {
  122. tc.setTaskDataStatus(data, Success)
  123. tc.setExecuteStatus(data.Index, Success)
  124. }
  125. return
  126. case p := <-panicChan:
  127. tc.setTaskDataStatus(data, Error)
  128. tc.setExecuteStatus(data.Index, Error)
  129. tc.log.Errorln("PollName:", tc.pollName, "Index:", data.Index, "NewPoolWithFunc got panic", p)
  130. return
  131. case <-ctx.Done():
  132. tc.setTaskDataStatus(data, Error)
  133. tc.setExecuteStatus(data.Index, Error)
  134. tc.log.Errorln("PollName:", tc.pollName, "Index:", data.Index, "NewPoolWithFunc got time out", ctx.Err())
  135. return
  136. }
  137. }
  138. // Hold 自身进行阻塞,如果你是使用 Web 服务器,那么应该无需使用该方法
  139. func (tc *TaskControl) Hold() {
  140. tc.log.Debugln("Hold()")
  141. tc.wgBase.Wait()
  142. }
  143. func (tc *TaskControl) Release() {
  144. tc.log.Debugln("-------------------------------")
  145. tc.log.Debugln("Release Start")
  146. tc.commonLock.Lock()
  147. tc.released = true
  148. tc.commonLock.Unlock()
  149. tc.log.Debugln("Release.Release")
  150. tc.antPoolBase.Release()
  151. tc.log.Debugln("Release cancel() Start")
  152. // 统一 cancel cancel
  153. tc.cancelMapLock.Lock()
  154. for i, cancelFunc := range tc.cancelMap {
  155. tc.log.Debugln("Release cancel() Index:", i)
  156. cancelFunc()
  157. }
  158. tc.cancelMapLock.Unlock()
  159. tc.log.Debugln("Release cancel() End")
  160. tc.log.Debugln("Release End")
  161. tc.log.Debugln("-------------------------------")
  162. }
  163. func (tc *TaskControl) Reboot() {
  164. var release bool
  165. tc.commonLock.Lock()
  166. release = tc.released
  167. tc.commonLock.Unlock()
  168. if release == true {
  169. // 如果被释放了,那么第一次 Invoke 的时候需要重启这个 pool
  170. tc.antPoolBase.Reboot()
  171. // 需要把缓存的 map 清理掉
  172. tc.inputDataMapLock.Lock()
  173. tc.inputDataMap = make(map[int]*TaskData, 0)
  174. tc.inputDataMapLock.Unlock()
  175. tc.cancelMapLock.Lock()
  176. tc.cancelMap = make(map[int]context.CancelFunc, 0)
  177. tc.cancelMapLock.Unlock()
  178. tc.executeInfoMapLock.Lock()
  179. tc.executeInfoMap = make(map[int]TaskState, 0)
  180. tc.executeInfoMapLock.Unlock()
  181. tc.commonLock.Lock()
  182. tc.released = false
  183. tc.commonLock.Unlock()
  184. }
  185. }
  186. // GetExecuteInfo 获取 所有 Invoke 的执行情况,需要在 下一次 Invoke 拿走,否则会清空
  187. // 成功执行的、未执行的、执行错误(超时)的
  188. func (tc *TaskControl) GetExecuteInfo() ([]int, []int, []int) {
  189. successList := make([]int, 0)
  190. noExecuteList := make([]int, 0)
  191. errorList := make([]int, 0)
  192. tc.executeInfoMapLock.Lock()
  193. for i, state := range tc.executeInfoMap {
  194. if state == Success {
  195. successList = append(successList, i)
  196. } else if state == NoExecute {
  197. noExecuteList = append(noExecuteList, i)
  198. } else if state == Error {
  199. errorList = append(errorList, i)
  200. }
  201. }
  202. tc.executeInfoMapLock.Unlock()
  203. return successList, noExecuteList, errorList
  204. }
  205. // GetResult 获取 TaskData 的反馈值,需要在 下一次 Invoke 拿走,否则会清空
  206. func (tc *TaskControl) GetResult(index int) (bool, *TaskData) {
  207. tc.inputDataMapLock.Lock()
  208. value, found := tc.inputDataMap[index]
  209. tc.inputDataMapLock.Unlock()
  210. return found, value
  211. }
  212. func (tc *TaskControl) setExecuteStatus(index int, status TaskState) {
  213. tc.executeInfoMapLock.Lock()
  214. tc.executeInfoMap[index] = status
  215. tc.executeInfoMapLock.Unlock()
  216. }
  217. func (tc *TaskControl) setTaskDataStatus(taskData *TaskData, status TaskState) {
  218. tc.inputDataMapLock.Lock()
  219. taskData.Status = status
  220. tc.inputDataMapLock.Unlock()
  221. }
  222. type TaskData struct {
  223. Index int
  224. Status TaskState // 执行情况, 0 是成功,1 是未执行,2 是错误或者超时
  225. DataEx interface{}
  226. }
  // TaskState is the execution state of a task.
  type TaskState int

  // The values are spelled out because they are part of the documented
  // contract: 0 is success, 1 is not executed, 2 is error or timeout.
  const (
  	// Success means the task function returned without an error.
  	Success TaskState = 0
  	// NoExecute means the task has not been run.
  	NoExecute TaskState = 1
  	// Error means the task returned an error, panicked, or its context ended
  	// before it finished.
  	Error TaskState = 2
  )