task_control.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. package task_control
  2. import (
  3. "github.com/panjf2000/ants/v2"
  4. "github.com/sirupsen/logrus"
  5. "golang.org/x/net/context"
  6. "sync"
  7. "time"
  8. )
  9. type TaskControl struct {
  10. pollName string
  11. antPoolBase *ants.PoolWithFunc
  12. wgBase sync.WaitGroup
  13. log *logrus.Logger
  14. oneCtxTimeOutSecond int
  15. bHold bool
  16. released bool
  17. // 传入的 func
  18. ctxFunc func(ctx context.Context, inData interface{}) error
  19. // 输入结构锁
  20. inputDataMap map[int64]*TaskData
  21. inputDataMapLock sync.Mutex
  22. // 结束锁
  23. cancelMap map[int64]context.CancelFunc
  24. cancelMapLock sync.Mutex
  25. // 执行情况, 0 是成功,1 是未执行,2 是错误或者超时
  26. executeInfoMap map[int64]TaskState
  27. executeInfoMapLock sync.Mutex
  28. commonLock sync.Mutex
  29. }
  30. func NewTaskControl(pollName string, size int, oneCtxTimeOutSecond int, log *logrus.Logger) (*TaskControl, error) {
  31. var err error
  32. tc := TaskControl{}
  33. tc.pollName = pollName
  34. tc.oneCtxTimeOutSecond = oneCtxTimeOutSecond
  35. tc.log = log
  36. tc.inputDataMap = make(map[int64]*TaskData, 0)
  37. tc.cancelMap = make(map[int64]context.CancelFunc, 0)
  38. tc.executeInfoMap = make(map[int64]TaskState, 0)
  39. tc.antPoolBase, err = ants.NewPoolWithFunc(size, func(inData interface{}) {
  40. tc.baseFuncHandler(inData)
  41. })
  42. if err != nil {
  43. return nil, err
  44. }
  45. tc.wgBase = sync.WaitGroup{}
  46. return &tc, nil
  47. }
  48. // SetCtxProcessFunc 设置后续需要用到的单个任务的 Func,注意,如果之前的任务没有完成,不应该再次调用函数。建议进行 Release 后,再次调用
  49. func (tc *TaskControl) SetCtxProcessFunc(pf func(ctx context.Context, inData interface{}) error) {
  50. tc.ctxFunc = pf
  51. }
  52. // Invoke 向 SetCtxProcessFunc 设置的 Func 中提交数据处理
  53. func (tc *TaskControl) Invoke(inData *TaskData) error {
  54. // 需要先记录有那些 ID 进来,然后再记录那些是完整执行的,以及出错执行的
  55. tc.setExecuteStatus(inData.Index, NoExecute)
  56. err := tc.antPoolBase.Invoke(inData)
  57. if err != nil {
  58. tc.setTaskDataStatus(inData, Error)
  59. tc.setExecuteStatus(inData.Index, Error)
  60. return err
  61. }
  62. tc.log.Debugln("Index:", inData.Index, "Invoke inputDataMap Lock()")
  63. tc.inputDataMapLock.Lock()
  64. tc.inputDataMap[inData.Index] = inData
  65. tc.inputDataMapLock.Unlock()
  66. tc.log.Debugln("Index:", inData.Index, "Invoke inputDataMap UnLock()")
  67. return nil
  68. }
  69. func (tc *TaskControl) baseFuncHandler(inData interface{}) {
  70. data := inData.(*TaskData)
  71. // 如果已经执行 Release 则返回
  72. tc.commonLock.Lock()
  73. if tc.released == true {
  74. tc.log.Debugln("Index:", data.Index, "released == true")
  75. return
  76. }
  77. tc.commonLock.Unlock()
  78. // 实际执行的时候
  79. tc.wgBase.Add(1)
  80. tc.log.Debugln("Index:", data.Index, "baseFuncHandler wg.Add()")
  81. tc.log.Debugln("Index:", data.Index, "baseFuncHandler wg.Done()")
  82. tc.wgBase.Done()
  83. var ctx context.Context
  84. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(tc.oneCtxTimeOutSecond)*time.Second)
  85. defer func() {
  86. // 那么对应的需要取消掉 map 中的记录
  87. tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock Lock() defer")
  88. tc.cancelMapLock.Lock()
  89. delete(tc.cancelMap, data.Index)
  90. tc.cancelMapLock.Unlock()
  91. tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock UnLock() defer")
  92. cancel()
  93. }()
  94. // 记录 cancel
  95. tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock Lock()")
  96. tc.cancelMapLock.Lock()
  97. tc.cancelMap[data.Index] = cancel
  98. tc.cancelMapLock.Unlock()
  99. tc.log.Debugln("Index:", data.Index, "baseFuncHandler cancelMapLock UnLock()")
  100. done := make(chan error, 1)
  101. panicChan := make(chan interface{}, 1)
  102. go func(ctx context.Context) {
  103. defer func() {
  104. if p := recover(); p != nil {
  105. panicChan <- p
  106. }
  107. }()
  108. done <- tc.ctxFunc(ctx, inData)
  109. }(ctx)
  110. select {
  111. case err := <-done:
  112. if err != nil {
  113. tc.setTaskDataStatus(data, Error)
  114. tc.setExecuteStatus(data.Index, Error)
  115. tc.log.Errorln("PollName:", tc.pollName, "Index:", data.Index, "NewPoolWithFunc done with Error", err.Error())
  116. } else {
  117. tc.setTaskDataStatus(data, Success)
  118. tc.setExecuteStatus(data.Index, Success)
  119. }
  120. return
  121. case p := <-panicChan:
  122. tc.setTaskDataStatus(data, Error)
  123. tc.setExecuteStatus(data.Index, Error)
  124. tc.log.Errorln("PollName:", tc.pollName, "Index:", data.Index, "NewPoolWithFunc got panic", p)
  125. return
  126. case <-ctx.Done():
  127. tc.setTaskDataStatus(data, Error)
  128. tc.setExecuteStatus(data.Index, Error)
  129. tc.log.Errorln("PollName:", tc.pollName, "Index:", data.Index, "NewPoolWithFunc got time out", ctx.Err())
  130. return
  131. }
  132. }
  133. // Hold 自身进行阻塞,如果你是使用 Web 服务器,那么应该无需使用该方法
  134. func (tc *TaskControl) Hold() {
  135. tc.commonLock.Lock()
  136. tc.bHold = true
  137. tc.commonLock.Unlock()
  138. tc.wgBase.Add(1)
  139. tc.log.Debugln("Hold wg.Add()")
  140. tc.wgBase.Wait()
  141. }
  142. func (tc *TaskControl) Release() {
  143. tc.log.Debugln("Release Start")
  144. tc.commonLock.Lock()
  145. tc.released = true
  146. tc.commonLock.Unlock()
  147. tc.log.Debugln("Release.Release")
  148. tc.antPoolBase.Release()
  149. tc.log.Debugln("Release cancel() Start")
  150. // 统一 cancel cancel
  151. tc.cancelMapLock.Lock()
  152. for i, cancelFunc := range tc.cancelMap {
  153. tc.log.Debugln("Release cancel() Index:", i)
  154. cancelFunc()
  155. }
  156. tc.cancelMapLock.Unlock()
  157. tc.log.Debugln("Release cancel() End")
  158. var bHold bool
  159. tc.commonLock.Lock()
  160. bHold = tc.bHold
  161. tc.commonLock.Unlock()
  162. if bHold == true {
  163. tc.log.Debugln("Release Hold wg.Done()")
  164. tc.wgBase.Done()
  165. }
  166. tc.log.Debugln("Release End")
  167. }
  168. func (tc *TaskControl) Reboot() {
  169. var release bool
  170. tc.commonLock.Lock()
  171. release = tc.released
  172. tc.commonLock.Unlock()
  173. if release == true {
  174. // 如果被释放了,那么第一次 Invoke 的时候需要重启这个 pool
  175. tc.antPoolBase.Reboot()
  176. // 需要把缓存的 map 清理掉
  177. tc.inputDataMapLock.Lock()
  178. tc.inputDataMap = make(map[int64]*TaskData, 0)
  179. tc.inputDataMapLock.Unlock()
  180. tc.cancelMapLock.Lock()
  181. tc.cancelMap = make(map[int64]context.CancelFunc, 0)
  182. tc.cancelMapLock.Unlock()
  183. tc.executeInfoMapLock.Lock()
  184. tc.executeInfoMap = make(map[int64]TaskState, 0)
  185. tc.executeInfoMapLock.Unlock()
  186. tc.commonLock.Lock()
  187. tc.released = false
  188. tc.commonLock.Unlock()
  189. }
  190. }
  191. // GetExecuteInfo 获取 所有 Invoke 的执行情况,需要在 下一次 Invoke 拿走,否则会清空
  192. // 成功执行的、未执行的、执行错误(超时)的
  193. func (tc *TaskControl) GetExecuteInfo() ([]int64, []int64, []int64) {
  194. successList := make([]int64, 0)
  195. noExecuteList := make([]int64, 0)
  196. errorList := make([]int64, 0)
  197. tc.executeInfoMapLock.Lock()
  198. for i, state := range tc.executeInfoMap {
  199. if state == Success {
  200. successList = append(successList, i)
  201. } else if state == NoExecute {
  202. noExecuteList = append(noExecuteList, i)
  203. } else if state == Error {
  204. errorList = append(errorList, i)
  205. }
  206. }
  207. tc.executeInfoMapLock.Unlock()
  208. return successList, noExecuteList, errorList
  209. }
  210. // GetResult 获取 TaskData 的反馈值,需要在 下一次 Invoke 拿走,否则会清空
  211. func (tc *TaskControl) GetResult(index int64) (bool, *TaskData) {
  212. tc.inputDataMapLock.Lock()
  213. value, found := tc.inputDataMap[index]
  214. tc.inputDataMapLock.Unlock()
  215. return found, value
  216. }
  217. func (tc *TaskControl) setExecuteStatus(index int64, status TaskState) {
  218. tc.executeInfoMapLock.Lock()
  219. tc.executeInfoMap[index] = status
  220. tc.executeInfoMapLock.Unlock()
  221. }
  222. func (tc *TaskControl) setTaskDataStatus(taskData *TaskData, status TaskState) {
  223. tc.inputDataMapLock.Lock()
  224. taskData.Status = status
  225. tc.inputDataMapLock.Unlock()
  226. }
  227. type TaskData struct {
  228. Index int64
  229. Status TaskState // 执行情况, 0 是成功,1 是未执行,2 是错误或者超时
  230. OneVideoFullPath string
  231. DataEx interface{}
  232. }
  233. type TaskState int
  234. const (
  235. Success TaskState = iota
  236. NoExecute
  237. Error
  238. )