task_control_test.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. package task_control
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/allanpk716/ChineseSubFinder/internal/pkg/log_helper"
  6. "golang.org/x/net/context"
  7. "testing"
  8. "time"
  9. )
  10. func TestTaskControl_Invoke(t *testing.T) {
  11. type args struct {
  12. timeTester TimeTester
  13. }
  14. tests := []struct {
  15. name string
  16. args args
  17. successProcessCount int
  18. wantErr bool
  19. }{
  20. // 不超时的情况
  21. {
  22. name: "00", args: args{
  23. TimeTester{PoolName: "00",
  24. ConcurrentCount: 2,
  25. JobCount: 5,
  26. TimeAfterRelease: 5,
  27. OneJobWaitTime: 1,
  28. OneJobTimeOut: 2,
  29. SelfHold: true,
  30. NeedRelease: true}},
  31. successProcessCount: 5,
  32. },
  33. {
  34. name: "01", args: args{
  35. TimeTester{PoolName: "01",
  36. ConcurrentCount: 2,
  37. JobCount: 5,
  38. TimeAfterRelease: 5,
  39. OneJobWaitTime: 1,
  40. OneJobTimeOut: 2,
  41. SelfHold: false,
  42. NeedRelease: false}},
  43. successProcessCount: 5,
  44. },
  45. {
  46. name: "02", args: args{
  47. TimeTester{PoolName: "02",
  48. ConcurrentCount: 2,
  49. JobCount: 5,
  50. TimeAfterRelease: 5,
  51. OneJobWaitTime: 1,
  52. OneJobTimeOut: 2,
  53. SelfHold: false,
  54. NeedRelease: true}},
  55. successProcessCount: 5,
  56. },
  57. // 超时的情况
  58. {
  59. name: "03", args: args{
  60. TimeTester{PoolName: "03",
  61. ConcurrentCount: 2,
  62. JobCount: 5,
  63. TimeAfterRelease: 5,
  64. OneJobWaitTime: 2,
  65. OneJobTimeOut: 1,
  66. SelfHold: true,
  67. NeedRelease: true}},
  68. successProcessCount: 0,
  69. },
  70. {
  71. name: "04", args: args{
  72. TimeTester{PoolName: "04",
  73. ConcurrentCount: 2,
  74. JobCount: 5,
  75. TimeAfterRelease: 5,
  76. OneJobWaitTime: 2,
  77. OneJobTimeOut: 1,
  78. SelfHold: false,
  79. NeedRelease: false}},
  80. successProcessCount: 0,
  81. },
  82. {
  83. name: "05", args: args{
  84. TimeTester{PoolName: "05",
  85. ConcurrentCount: 2,
  86. JobCount: 5,
  87. TimeAfterRelease: 5,
  88. OneJobWaitTime: 2,
  89. OneJobTimeOut: 1,
  90. SelfHold: false,
  91. NeedRelease: true}},
  92. successProcessCount: 0,
  93. },
  94. // 主动触发 painic
  95. {
  96. name: "06", args: args{
  97. TimeTester{PoolName: "06",
  98. ConcurrentCount: 2,
  99. JobCount: 5,
  100. TimeAfterRelease: 5,
  101. OneJobWaitTime: 2,
  102. OneJobTimeOut: 1,
  103. SelfHold: true,
  104. NeedRelease: true,
  105. WantPanic: true}},
  106. successProcessCount: 0,
  107. },
  108. {
  109. name: "07", args: args{
  110. TimeTester{PoolName: "07",
  111. ConcurrentCount: 2,
  112. JobCount: 5,
  113. TimeAfterRelease: 5,
  114. OneJobWaitTime: 2,
  115. OneJobTimeOut: 1,
  116. SelfHold: false,
  117. NeedRelease: false,
  118. WantPanic: true}},
  119. successProcessCount: 0,
  120. },
  121. {
  122. name: "08", args: args{
  123. TimeTester{PoolName: "08",
  124. ConcurrentCount: 2,
  125. JobCount: 5,
  126. TimeAfterRelease: 5,
  127. OneJobWaitTime: 2,
  128. OneJobTimeOut: 1,
  129. SelfHold: false,
  130. NeedRelease: true,
  131. WantPanic: true}},
  132. successProcessCount: 0,
  133. },
  134. // 部分超时
  135. {
  136. name: "09", args: args{
  137. TimeTester{PoolName: "09",
  138. ConcurrentCount: 2,
  139. JobCount: 5,
  140. TimeAfterRelease: 5,
  141. OneJobWaitTime: 2,
  142. OneJobTimeOut: 3,
  143. SelfHold: true,
  144. NeedRelease: true,
  145. IndexOverThanAddMoreTime: 2}},
  146. successProcessCount: 3,
  147. },
  148. {
  149. name: "10", args: args{
  150. TimeTester{PoolName: "10",
  151. ConcurrentCount: 2,
  152. JobCount: 5,
  153. TimeAfterRelease: 5,
  154. OneJobWaitTime: 2,
  155. OneJobTimeOut: 3,
  156. SelfHold: false,
  157. NeedRelease: false,
  158. IndexOverThanAddMoreTime: 2}},
  159. successProcessCount: 3,
  160. },
  161. {
  162. name: "11", args: args{
  163. TimeTester{PoolName: "11",
  164. ConcurrentCount: 2,
  165. JobCount: 5,
  166. TimeAfterRelease: 5,
  167. OneJobWaitTime: 2,
  168. OneJobTimeOut: 3,
  169. SelfHold: false,
  170. NeedRelease: true,
  171. IndexOverThanAddMoreTime: 3}},
  172. successProcessCount: 4,
  173. },
  174. // 使用 Release 取消
  175. {
  176. name: "12", args: args{
  177. TimeTester{PoolName: "12",
  178. ConcurrentCount: 1,
  179. JobCount: 5,
  180. TimeAfterRelease: 2,
  181. OneJobWaitTime: 3,
  182. OneJobTimeOut: 4,
  183. SelfHold: true,
  184. NeedRelease: true}},
  185. successProcessCount: 0,
  186. },
  187. {
  188. name: "13", args: args{
  189. TimeTester{PoolName: "13",
  190. ConcurrentCount: 2,
  191. JobCount: 5,
  192. TimeAfterRelease: 2,
  193. OneJobWaitTime: 3,
  194. OneJobTimeOut: 4,
  195. SelfHold: true,
  196. NeedRelease: true}},
  197. successProcessCount: 0,
  198. },
  199. {
  200. name: "14", args: args{
  201. TimeTester{PoolName: "14",
  202. ConcurrentCount: 2,
  203. JobCount: 5,
  204. TimeAfterRelease: 4,
  205. OneJobWaitTime: 3,
  206. OneJobTimeOut: 4,
  207. SelfHold: true,
  208. NeedRelease: true}},
  209. successProcessCount: 2,
  210. },
  211. {
  212. name: "15", args: args{
  213. TimeTester{PoolName: "15",
  214. ConcurrentCount: 1,
  215. JobCount: 5,
  216. TimeAfterRelease: 4,
  217. OneJobWaitTime: 3,
  218. OneJobTimeOut: 4,
  219. SelfHold: true,
  220. NeedRelease: true}},
  221. successProcessCount: 1,
  222. },
  223. {
  224. name: "16", args: args{
  225. TimeTester{PoolName: "16",
  226. ConcurrentCount: 3,
  227. JobCount: 5,
  228. TimeAfterRelease: 4,
  229. OneJobWaitTime: 3,
  230. OneJobTimeOut: 4,
  231. SelfHold: true,
  232. NeedRelease: true}},
  233. successProcessCount: 3,
  234. },
  235. {
  236. name: "17", args: args{
  237. TimeTester{PoolName: "17",
  238. ConcurrentCount: 4,
  239. JobCount: 5,
  240. TimeAfterRelease: 4,
  241. OneJobWaitTime: 3,
  242. OneJobTimeOut: 4,
  243. SelfHold: true,
  244. NeedRelease: true}},
  245. successProcessCount: 4,
  246. },
  247. {
  248. name: "18", args: args{
  249. TimeTester{PoolName: "18",
  250. ConcurrentCount: 5,
  251. JobCount: 5,
  252. TimeAfterRelease: 4,
  253. OneJobWaitTime: 3,
  254. OneJobTimeOut: 4,
  255. SelfHold: true,
  256. NeedRelease: true}},
  257. successProcessCount: 5,
  258. },
  259. }
  260. for _, tt := range tests {
  261. t.Run(tt.name, func(t *testing.T) {
  262. successList, _, _, err := process(tt.args.timeTester)
  263. if err != nil {
  264. t.Fatal(err)
  265. }
  266. if tt.successProcessCount != len(successList) {
  267. t.Fatal("want successProcessCount =", tt.successProcessCount, "now =", len(successList))
  268. }
  269. })
  270. }
  271. }
  272. func process(timeTester TimeTester) ([]int64, []int64, []int64, error) {
  273. OneJobWaitTime = timeTester.OneJobWaitTime
  274. WantPanic = timeTester.WantPanic
  275. IndexOverThanAddMoreTime = int64(timeTester.IndexOverThanAddMoreTime)
  276. tc, err := NewTaskControl(timeTester.PoolName, timeTester.ConcurrentCount, timeTester.OneJobTimeOut, log_helper.GetLogger())
  277. if err != nil {
  278. return nil, nil, nil, err
  279. }
  280. tc.SetCtxProcessFunc(waitTimes)
  281. for i := 0; i < timeTester.JobCount; i++ {
  282. go func(index int64) {
  283. err := tc.Invoke(&TaskData{Index: index})
  284. if err != nil {
  285. tc.log.Errorln("Index:", index, "Error", err)
  286. }
  287. }(int64(i))
  288. }
  289. go func() {
  290. if timeTester.NeedRelease == false {
  291. tc.log.Infoln("Do not need Release")
  292. return
  293. }
  294. tc.log.Infoln("Release After", timeTester.TimeAfterRelease, "Second")
  295. time.Sleep(time.Duration(timeTester.TimeAfterRelease) * time.Second)
  296. tc.Release()
  297. }()
  298. fmt.Println("-------------------------------")
  299. if timeTester.SelfHold == true {
  300. fmt.Println("Start Hold")
  301. tc.Hold()
  302. fmt.Println("End Hold")
  303. } else {
  304. waitTime := timeTester.JobCount * timeTester.OneJobWaitTime
  305. fmt.Printf("wait %ds start\n", waitTime)
  306. time.Sleep(time.Duration(waitTime) * time.Second)
  307. fmt.Printf("wait %ds end\n", waitTime)
  308. }
  309. fmt.Println("-------------------------------")
  310. // 获取提前终止的计数器以及完成的计数器
  311. successList, noExecuteList, errorList := tc.GetExecuteInfo()
  312. return successList, noExecuteList, errorList, nil
  313. }
  314. func waitTimes(ctx context.Context, inData interface{}) error {
  315. phase0 := make(chan interface{}, 1)
  316. index := inData.(*TaskData)
  317. if WantPanic == true {
  318. panic("want panic")
  319. }
  320. go func() {
  321. fmt.Println("Index:", index.Index, "Start 0")
  322. if IndexOverThanAddMoreTime == 0 {
  323. time.Sleep(time.Duration(OneJobWaitTime) * time.Second)
  324. } else {
  325. if index.Index > IndexOverThanAddMoreTime {
  326. time.Sleep(time.Duration(OneJobWaitTime+10) * time.Second)
  327. } else {
  328. time.Sleep(time.Duration(OneJobWaitTime) * time.Second)
  329. }
  330. }
  331. phase0 <- 1
  332. fmt.Println("Index:", index.Index, "End 0")
  333. }()
  334. select {
  335. case <-ctx.Done():
  336. {
  337. fmt.Println("Index:", index.Index, "timeout 0")
  338. return errors.New("timeout jump")
  339. }
  340. case <-phase0:
  341. break
  342. }
  343. fmt.Println("Index:", index.Index, "Start 1")
  344. fmt.Println("Index:", index.Index, "End 1")
  345. return nil
  346. }
  347. type TimeTester struct {
  348. PoolName string // 名称
  349. ConcurrentCount int // 并发数
  350. JobCount int // 总任务数
  351. TimeAfterRelease int // 开始后等待多久执行 Release 操作
  352. OneJobWaitTime int // 单个任务得耗时
  353. OneJobTimeOut int // 单个任务的超时时间
  354. SelfHold bool // 是否需要自身的等待,如果使用了,那么一定需要 Release
  355. NeedRelease bool // 是否需要主动执行 Release
  356. WantPanic bool // 触发 panic
  357. IndexOverThanAddMoreTime int // waitTimes函数中某个 Index 之后都会在等待处理上多加延时以便触发超时逻辑
  358. }
  359. var OneJobWaitTime int
  360. var WantPanic bool
  361. var IndexOverThanAddMoreTime int64