task_control_test.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. package task_control
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/ChineseSubFinder/ChineseSubFinder/pkg"
  9. "github.com/ChineseSubFinder/ChineseSubFinder/pkg/log_helper"
  10. "github.com/sirupsen/logrus"
  11. "golang.org/x/net/context"
  12. )
  13. func TestTaskControl_Invoke(t *testing.T) {
  14. type args struct {
  15. timeTester TimeTester
  16. }
  17. tests := []struct {
  18. name string
  19. args args
  20. successProcessCount int
  21. wantErr bool
  22. }{
  23. // 不超时的情况
  24. {
  25. name: "00", args: args{
  26. TimeTester{PoolName: "00",
  27. ConcurrentCount: 1,
  28. JobCount: 5,
  29. OneJobWaitTime: 1,
  30. OneJobTimeOut: 2,
  31. }},
  32. successProcessCount: 5,
  33. },
  34. {
  35. name: "01", args: args{
  36. TimeTester{PoolName: "01",
  37. ConcurrentCount: 2,
  38. JobCount: 5,
  39. OneJobWaitTime: 1,
  40. OneJobTimeOut: 2,
  41. }},
  42. successProcessCount: 5,
  43. },
  44. {
  45. name: "02", args: args{
  46. TimeTester{PoolName: "02",
  47. ConcurrentCount: 3,
  48. JobCount: 5,
  49. OneJobWaitTime: 1,
  50. OneJobTimeOut: 2,
  51. }},
  52. successProcessCount: 5,
  53. },
  54. // 超时的情况
  55. {
  56. name: "03", args: args{
  57. TimeTester{PoolName: "03",
  58. ConcurrentCount: 2,
  59. JobCount: 5,
  60. TimeAfterRelease: 5,
  61. OneJobWaitTime: 2,
  62. OneJobTimeOut: 1,
  63. NeedRelease: true}},
  64. successProcessCount: 0,
  65. },
  66. {
  67. name: "04", args: args{
  68. TimeTester{PoolName: "04",
  69. ConcurrentCount: 2,
  70. JobCount: 5,
  71. TimeAfterRelease: 5,
  72. OneJobWaitTime: 2,
  73. OneJobTimeOut: 1,
  74. NeedRelease: false}},
  75. successProcessCount: 0,
  76. },
  77. {
  78. name: "05", args: args{
  79. TimeTester{PoolName: "05",
  80. ConcurrentCount: 2,
  81. JobCount: 5,
  82. TimeAfterRelease: 5,
  83. OneJobWaitTime: 2,
  84. OneJobTimeOut: 1,
  85. NeedRelease: true}},
  86. successProcessCount: 0,
  87. },
  88. // 主动触发 painic
  89. {
  90. name: "06", args: args{
  91. TimeTester{PoolName: "06",
  92. ConcurrentCount: 2,
  93. JobCount: 5,
  94. TimeAfterRelease: 5,
  95. OneJobWaitTime: 2,
  96. OneJobTimeOut: 1,
  97. NeedRelease: true,
  98. WantPanic: true}},
  99. successProcessCount: 0,
  100. },
  101. {
  102. name: "07", args: args{
  103. TimeTester{PoolName: "07",
  104. ConcurrentCount: 2,
  105. JobCount: 5,
  106. TimeAfterRelease: 5,
  107. OneJobWaitTime: 2,
  108. OneJobTimeOut: 1,
  109. NeedRelease: false,
  110. WantPanic: true}},
  111. successProcessCount: 0,
  112. },
  113. {
  114. name: "08", args: args{
  115. TimeTester{PoolName: "08",
  116. ConcurrentCount: 2,
  117. JobCount: 5,
  118. TimeAfterRelease: 5,
  119. OneJobWaitTime: 2,
  120. OneJobTimeOut: 1,
  121. NeedRelease: true,
  122. WantPanic: true}},
  123. successProcessCount: 0,
  124. },
  125. // 部分超时
  126. {
  127. name: "09", args: args{
  128. TimeTester{PoolName: "09",
  129. ConcurrentCount: 2,
  130. JobCount: 5,
  131. TimeAfterRelease: 5,
  132. OneJobWaitTime: 2,
  133. OneJobTimeOut: 3,
  134. NeedRelease: true,
  135. IndexOverThanAddMoreTime: 2}},
  136. successProcessCount: 3,
  137. },
  138. {
  139. name: "10", args: args{
  140. TimeTester{PoolName: "10",
  141. ConcurrentCount: 2,
  142. JobCount: 5,
  143. TimeAfterRelease: 5,
  144. OneJobWaitTime: 2,
  145. OneJobTimeOut: 3,
  146. NeedRelease: false,
  147. IndexOverThanAddMoreTime: 2}},
  148. successProcessCount: 3,
  149. },
  150. {
  151. name: "11", args: args{
  152. TimeTester{PoolName: "11",
  153. ConcurrentCount: 2,
  154. JobCount: 5,
  155. TimeAfterRelease: 5,
  156. OneJobWaitTime: 2,
  157. OneJobTimeOut: 3,
  158. NeedRelease: true,
  159. IndexOverThanAddMoreTime: 3}},
  160. successProcessCount: 4,
  161. },
  162. // 使用 Release 取消
  163. {
  164. name: "12", args: args{
  165. TimeTester{PoolName: "12",
  166. ConcurrentCount: 1,
  167. JobCount: 5,
  168. TimeAfterRelease: 2,
  169. OneJobWaitTime: 3,
  170. OneJobTimeOut: 4,
  171. NeedRelease: true}},
  172. successProcessCount: 0,
  173. },
  174. {
  175. name: "13", args: args{
  176. TimeTester{PoolName: "13",
  177. ConcurrentCount: 2,
  178. JobCount: 5,
  179. TimeAfterRelease: 2,
  180. OneJobWaitTime: 3,
  181. OneJobTimeOut: 4,
  182. NeedRelease: true}},
  183. successProcessCount: 0,
  184. },
  185. {
  186. name: "14", args: args{
  187. TimeTester{PoolName: "14",
  188. ConcurrentCount: 2,
  189. JobCount: 5,
  190. TimeAfterRelease: 4,
  191. OneJobWaitTime: 3,
  192. OneJobTimeOut: 4,
  193. NeedRelease: true}},
  194. successProcessCount: 2,
  195. },
  196. {
  197. name: "15", args: args{
  198. TimeTester{PoolName: "15",
  199. ConcurrentCount: 1,
  200. JobCount: 5,
  201. TimeAfterRelease: 3,
  202. OneJobWaitTime: 2,
  203. OneJobTimeOut: 4,
  204. NeedRelease: true}},
  205. successProcessCount: 1,
  206. },
  207. {
  208. name: "16", args: args{
  209. TimeTester{PoolName: "16",
  210. ConcurrentCount: 3,
  211. JobCount: 5,
  212. TimeAfterRelease: 4,
  213. OneJobWaitTime: 3,
  214. OneJobTimeOut: 4,
  215. NeedRelease: true}},
  216. successProcessCount: 3,
  217. },
  218. {
  219. name: "17", args: args{
  220. TimeTester{PoolName: "17",
  221. ConcurrentCount: 4,
  222. JobCount: 5,
  223. TimeAfterRelease: 4,
  224. OneJobWaitTime: 3,
  225. OneJobTimeOut: 4,
  226. NeedRelease: true}},
  227. successProcessCount: 4,
  228. },
  229. {
  230. name: "18", args: args{
  231. TimeTester{PoolName: "18",
  232. ConcurrentCount: 5,
  233. JobCount: 5,
  234. TimeAfterRelease: 4,
  235. OneJobWaitTime: 3,
  236. OneJobTimeOut: 4,
  237. NeedRelease: true}},
  238. successProcessCount: 5,
  239. },
  240. }
  241. for _, tt := range tests {
  242. t.Run(tt.name, func(t *testing.T) {
  243. successList, _, _, err := process(tt.name, tt.args.timeTester)
  244. if err != nil {
  245. t.Fatal(err)
  246. }
  247. if tt.successProcessCount != len(successList) {
  248. t.Fatal("want successProcessCount =", tt.successProcessCount, "now =", len(successList))
  249. }
  250. })
  251. }
  252. }
  253. func process(name string, timeTester TimeTester) ([]int, []int, []int, error) {
  254. once := sync.Once{}
  255. tc, err := NewTaskControl(timeTester.ConcurrentCount, log_helper.NewLogHelper(name, pkg.ConfigRootDirFPath(), logrus.DebugLevel, time.Duration(7*24)*time.Hour, time.Duration(24)*time.Hour))
  256. if err != nil {
  257. return nil, nil, nil, err
  258. }
  259. tc.SetCtxProcessFunc(timeTester.PoolName, waitTimes, timeTester.OneJobTimeOut)
  260. for i := 0; i < timeTester.JobCount; i++ {
  261. once.Do(func() {
  262. go func() {
  263. if timeTester.NeedRelease == false {
  264. tc.log.Infoln("Do not need Release")
  265. return
  266. }
  267. tc.log.Infoln("Release After", timeTester.TimeAfterRelease, "Second")
  268. time.Sleep(time.Duration(timeTester.TimeAfterRelease) * time.Second)
  269. tc.Release()
  270. }()
  271. })
  272. err := tc.Invoke(&TaskData{Index: i,
  273. DataEx: DataEx{
  274. OneJobWaitTime: timeTester.OneJobWaitTime,
  275. WantPanic: timeTester.WantPanic,
  276. IndexOverThanAddMoreTime: timeTester.IndexOverThanAddMoreTime,
  277. }})
  278. if err != nil {
  279. tc.log.Errorln("Index:", i, "Error", err)
  280. }
  281. }
  282. tc.Hold()
  283. fmt.Println("-------------------------------")
  284. // 获取提前终止的计数器以及完成的计数器
  285. successList, noExecuteList, errorList := tc.GetExecuteInfo()
  286. return successList, noExecuteList, errorList, nil
  287. }
  288. func waitTimes(ctx context.Context, inData interface{}) error {
  289. phase0 := make(chan interface{}, 1)
  290. index := inData.(*TaskData)
  291. dataEx := index.DataEx.(DataEx)
  292. if dataEx.WantPanic == true {
  293. panic("want panic")
  294. }
  295. go func() {
  296. defer func() {
  297. close(phase0)
  298. }()
  299. fmt.Println("Index:", index.Index, "Start 0")
  300. if dataEx.IndexOverThanAddMoreTime == 0 {
  301. time.Sleep(time.Duration(dataEx.OneJobWaitTime) * time.Second)
  302. } else {
  303. if index.Index > dataEx.IndexOverThanAddMoreTime {
  304. time.Sleep(time.Duration(dataEx.OneJobWaitTime+10) * time.Second)
  305. } else {
  306. time.Sleep(time.Duration(dataEx.OneJobWaitTime) * time.Second)
  307. }
  308. }
  309. phase0 <- 1
  310. fmt.Println("Index:", index.Index, "End 0")
  311. }()
  312. select {
  313. case <-ctx.Done():
  314. {
  315. fmt.Println("Index:", index.Index, "timeout 0")
  316. return errors.New("timeout jump")
  317. }
  318. case <-phase0:
  319. break
  320. }
  321. fmt.Println("Index:", index.Index, "Start 1")
  322. fmt.Println("Index:", index.Index, "End 1")
  323. return nil
  324. }
  // TimeTester describes one scenario that process runs against a task control.
  type TimeTester struct {
  	// PoolName is the name passed to SetCtxProcessFunc.
  	PoolName string
  	// ConcurrentCount is the concurrency handed to NewTaskControl.
  	ConcurrentCount int
  	// JobCount is the total number of jobs submitted through Invoke.
  	JobCount int
  	// TimeAfterRelease is how many seconds to wait after starting before
  	// calling Release (only used when NeedRelease is true).
  	TimeAfterRelease int
  	// OneJobWaitTime is how many seconds a single job spends working.
  	OneJobWaitTime int
  	// OneJobTimeOut is the timeout for a single job, passed to
  	// SetCtxProcessFunc (presumably in seconds - confirm against TaskControl).
  	OneJobTimeOut int
  	// NeedRelease reports whether the scenario should actively call Release.
  	NeedRelease bool
  	// WantPanic makes every job panic.
  	WantPanic bool
  	// IndexOverThanAddMoreTime, when non-zero, makes waitTimes add extra delay
  	// to every job whose index is greater than this value, so that those jobs
  	// hit the timeout path.
  	IndexOverThanAddMoreTime int
  }
  // DataEx is the per-job payload that process stores in TaskData.DataEx and
  // that waitTimes reads back to decide how the job behaves.
  type DataEx struct {
  	// OneJobWaitTime is the number of seconds the job sleeps.
  	OneJobWaitTime int
  	// WantPanic makes waitTimes panic before doing any work.
  	WantPanic bool
  	// IndexOverThanAddMoreTime, when non-zero, adds 10 seconds of sleep to
  	// jobs whose index is greater than this value.
  	IndexOverThanAddMoreTime int
  }