// task_queue_test.go (8.6 KB)
  1. package task_queue
  2. import (
  3. "fmt"
  4. "github.com/allanpk716/ChineseSubFinder/pkg/types/common"
  5. task_queue2 "github.com/allanpk716/ChineseSubFinder/pkg/types/task_queue"
  6. "testing"
  7. "github.com/allanpk716/ChineseSubFinder/pkg/cache_center"
  8. "github.com/allanpk716/ChineseSubFinder/pkg/log_helper"
  9. "github.com/allanpk716/ChineseSubFinder/pkg/my_util"
  10. "github.com/allanpk716/ChineseSubFinder/pkg/settings"
  11. )
  12. const taskQueueName = "testQueue"
  13. func TestTaskQueue_AddAndGetAndDel(t *testing.T) {
  14. defer func() {
  15. cache_center.DelDb(taskQueueName)
  16. }()
  17. cache_center.DelDb(taskQueueName)
  18. taskQueue := NewTaskQueue(cache_center.NewCacheCenter(taskQueueName, settings.GetSettings(), log_helper.GetLogger4Tester()))
  19. defer func() {
  20. taskQueue.Close()
  21. }()
  22. for i := taskPriorityCount; i >= 0; i-- {
  23. bok, err := taskQueue.Add(*task_queue2.NewOneJob(common.Movie, my_util.RandStringBytesMaskImprSrcSB(10), i))
  24. if err != nil {
  25. t.Fatal("TestTaskQueue.Add", err)
  26. }
  27. if bok == false {
  28. t.Fatal("TestTaskQueue.Add == false")
  29. }
  30. }
  31. bok, waitingJobs, err := taskQueue.GetJobsByStatus(task_queue2.Waiting)
  32. if err != nil {
  33. t.Fatal("TestTaskQueue.Get", err)
  34. }
  35. if bok == false {
  36. t.Fatal("TestTaskQueue.Get == false")
  37. }
  38. if len(waitingJobs) != taskPriorityCount+1 {
  39. t.Fatal("len(waitingJobs) != taskPriorityCount")
  40. }
  41. for i := 0; i <= taskPriorityCount; i++ {
  42. if waitingJobs[i].TaskPriority != i {
  43. t.Fatalf("TestTaskQueue.TaskPriority pop error, want = %d, got = %d", i, waitingJobs[i].TaskPriority)
  44. }
  45. }
  46. for _, waitingJob := range waitingJobs {
  47. bok, err = taskQueue.Del(waitingJob.Id)
  48. if err != nil {
  49. t.Fatal("TestTaskQueue.Del", err)
  50. }
  51. if bok == false {
  52. t.Fatal("TestTaskQueue.Del == false")
  53. }
  54. }
  55. if taskQueue.Size() != 0 {
  56. t.Fatal("taskQueue.Size() != 0")
  57. }
  58. }
  59. func TestTaskQueue_AddAndClear(t *testing.T) {
  60. defer func() {
  61. cache_center.DelDb(taskQueueName)
  62. }()
  63. cache_center.DelDb(taskQueueName)
  64. taskQueue := NewTaskQueue(cache_center.NewCacheCenter(taskQueueName, settings.GetSettings(), log_helper.GetLogger4Tester()))
  65. for i := taskPriorityCount; i >= 0; i-- {
  66. bok, err := taskQueue.Add(*task_queue2.NewOneJob(common.Movie, my_util.RandStringBytesMaskImprSrcSB(10), i))
  67. if err != nil {
  68. t.Fatal("TestTaskQueue.Add", err)
  69. }
  70. if bok == false {
  71. t.Fatal("TestTaskQueue.Add == false")
  72. }
  73. }
  74. err := taskQueue.Clear()
  75. if err != nil {
  76. t.Fatal("TestTaskQueue.Clear", err)
  77. }
  78. if taskQueue.Size() != 0 {
  79. t.Fatal("taskQueue.Size() != 0")
  80. }
  81. }
  82. func TestTaskQueue_Update(t *testing.T) {
  83. defer func() {
  84. cache_center.DelDb(taskQueueName)
  85. }()
  86. cache_center.DelDb(taskQueueName)
  87. taskQueue := NewTaskQueue(cache_center.NewCacheCenter(taskQueueName, settings.GetSettings(), log_helper.GetLogger4Tester()))
  88. for i := taskPriorityCount; i >= 0; i-- {
  89. bok, err := taskQueue.Add(*task_queue2.NewOneJob(common.Movie, my_util.RandStringBytesMaskImprSrcSB(10), i))
  90. if err != nil {
  91. t.Fatal("TestTaskQueue.Add", err)
  92. }
  93. if bok == false {
  94. t.Fatal("TestTaskQueue.Add == false")
  95. }
  96. }
  97. bok, waitingJobs, err := taskQueue.GetJobsByStatus(task_queue2.Waiting)
  98. if err != nil {
  99. t.Fatal("TestTaskQueue.Get", err)
  100. }
  101. if bok == false {
  102. t.Fatal("TestTaskQueue.Get == false")
  103. }
  104. if len(waitingJobs) != taskPriorityCount+1 {
  105. t.Fatal("len(waitingJobs) != taskPriorityCount")
  106. }
  107. for i := 0; i <= taskPriorityCount; i++ {
  108. if waitingJobs[i].TaskPriority != i {
  109. t.Fatalf("TestTaskQueue.TaskPriority pop error, want = %d, got = %d", i, waitingJobs[i].TaskPriority)
  110. }
  111. }
  112. for _, waitingJob := range waitingJobs {
  113. waitingJob.JobStatus = task_queue2.Committed
  114. bok, err = taskQueue.Update(waitingJob)
  115. if err != nil {
  116. t.Fatal("TestTaskQueue.Update", err)
  117. }
  118. if bok == false {
  119. t.Fatal("TestTaskQueue.Update == false")
  120. }
  121. }
  122. bok, committedJobs, err := taskQueue.GetJobsByStatus(task_queue2.Committed)
  123. if err != nil {
  124. t.Fatal("TestTaskQueue.Get", err)
  125. }
  126. if bok == false {
  127. t.Fatal("TestTaskQueue.Get == false")
  128. }
  129. if len(committedJobs) != taskPriorityCount+1 {
  130. t.Fatal("len(committedJobs) != taskPriorityCount")
  131. }
  132. }
  133. func TestTaskQueue_UpdateAdGetOneWaiting(t *testing.T) {
  134. defer func() {
  135. cache_center.DelDb(taskQueueName)
  136. }()
  137. cache_center.DelDb(taskQueueName)
  138. taskQueue := NewTaskQueue(cache_center.NewCacheCenter(taskQueueName, settings.GetSettings(), log_helper.GetLogger4Tester()))
  139. for i := taskPriorityCount; i >= 0; i-- {
  140. bok, err := taskQueue.Add(*task_queue2.NewOneJob(common.Movie, fmt.Sprintf("%d", i), i))
  141. if err != nil {
  142. t.Fatal("TestTaskQueue.Add", err)
  143. }
  144. if bok == false {
  145. t.Fatal("TestTaskQueue.Add == false")
  146. }
  147. }
  148. bok, waitingJob, err := taskQueue.GetOneWaitingJob()
  149. if err != nil {
  150. t.Fatal("TestTaskQueue.GetOneWaitingJob", err)
  151. }
  152. if bok == false {
  153. t.Fatal("TestTaskQueue.GetOneWaitingJob == false")
  154. }
  155. if waitingJob.TaskPriority != 0 {
  156. t.Fatal("waitingJob.TaskPriority != 0")
  157. }
  158. waitingJob.JobStatus = task_queue2.Committed
  159. bok, err = taskQueue.Update(waitingJob)
  160. if err != nil {
  161. t.Fatal("TestTaskQueue.Update", err)
  162. }
  163. if bok == false {
  164. t.Fatal("TestTaskQueue.Update == false")
  165. }
  166. bok, waitingJob, err = taskQueue.GetOneWaitingJob()
  167. if err != nil {
  168. t.Fatal("TestTaskQueue.GetOneWaitingJob", err)
  169. }
  170. if bok == false {
  171. t.Fatal("TestTaskQueue.GetOneWaitingJob == false")
  172. }
  173. if waitingJob.TaskPriority != 1 {
  174. t.Fatal("waitingJob.TaskPriority != 0")
  175. }
  176. }
  177. func TestTaskQueue_UpdatePriority(t *testing.T) {
  178. defer func() {
  179. cache_center.DelDb(taskQueueName)
  180. }()
  181. cache_center.DelDb(taskQueueName)
  182. taskQueue := NewTaskQueue(cache_center.NewCacheCenter(taskQueueName, settings.GetSettings(), log_helper.GetLogger4Tester()))
  183. for i := taskPriorityCount; i >= 0; i-- {
  184. bok, err := taskQueue.Add(*task_queue2.NewOneJob(common.Movie, fmt.Sprintf("%d", i), i))
  185. if err != nil {
  186. t.Fatal("TestTaskQueue.Add", err)
  187. }
  188. if bok == false {
  189. t.Fatal("TestTaskQueue.Add == false")
  190. }
  191. }
  192. bok, waitingJob, err := taskQueue.GetOneWaitingJob()
  193. if err != nil {
  194. t.Fatal("TestTaskQueue.GetOneWaitingJob", err)
  195. }
  196. if bok == false {
  197. t.Fatal("TestTaskQueue.GetOneWaitingJob == false")
  198. }
  199. if waitingJob.TaskPriority != 0 {
  200. t.Fatal("waitingJob.TaskPriority != 0")
  201. }
  202. waitingJob.TaskPriority = 1
  203. bok, err = taskQueue.Update(waitingJob)
  204. if err != nil {
  205. t.Fatal("TestTaskQueue.Update", err)
  206. }
  207. if bok == false {
  208. t.Fatal("TestTaskQueue.Update == false")
  209. }
  210. bok, waitingJobs, err := taskQueue.GetJobsByPriorityAndStatus(0, task_queue2.Waiting)
  211. if err != nil {
  212. t.Fatal("TestTaskQueue.GetJobsByPriorityAndStatus", err)
  213. }
  214. if bok == false {
  215. t.Fatal("TestTaskQueue.GetJobsByPriorityAndStatus == false")
  216. }
  217. if len(waitingJobs) != 0 {
  218. t.Fatal("len(waitingJobs) != 0")
  219. }
  220. bok, waitingJobs, err = taskQueue.GetJobsByPriorityAndStatus(1, task_queue2.Waiting)
  221. if err != nil {
  222. t.Fatal("TestTaskQueue.GetJobsByPriorityAndStatus", err)
  223. }
  224. if bok == false {
  225. t.Fatal("TestTaskQueue.GetJobsByPriorityAndStatus == false")
  226. }
  227. if len(waitingJobs) != 2 {
  228. t.Fatal("len(waitingJobs) != 2")
  229. }
  230. }
  231. func TestTaskQueue_AddAndGetOneJob(t *testing.T) {
  232. defer func() {
  233. cache_center.DelDb(taskQueueName)
  234. }()
  235. cache_center.DelDb(taskQueueName)
  236. taskQueue := NewTaskQueue(cache_center.NewCacheCenter(taskQueueName, settings.GetSettings(), log_helper.GetLogger4Tester()))
  237. for i := taskPriorityCount; i >= 0; i-- {
  238. bok, err := taskQueue.Add(*task_queue2.NewOneJob(common.Movie, fmt.Sprintf("%d", i), DefaultTaskPriorityLevel))
  239. if err != nil {
  240. t.Fatal("TestTaskQueue.Add", err)
  241. }
  242. if bok == false {
  243. t.Fatal("TestTaskQueue.Add == false")
  244. }
  245. }
  246. bok, oneJob, err := taskQueue.GetOneJob()
  247. if err != nil {
  248. t.Fatal("TestTaskQueue.Add", err)
  249. }
  250. if bok == false {
  251. t.Fatal("TestTaskQueue.Add == false")
  252. }
  253. println("VideoFPath", oneJob.VideoFPath)
  254. println("TaskPriority", oneJob.TaskPriority)
  255. taskQueue.AutoDetectUpdateJobStatus(oneJob, nil)
  256. bok, oneJob, err = taskQueue.GetOneJob()
  257. if err != nil {
  258. t.Fatal("TestTaskQueue.Add", err)
  259. }
  260. if bok == false {
  261. t.Fatal("TestTaskQueue.Add == false")
  262. }
  263. println("VideoFPath", oneJob.VideoFPath)
  264. println("TaskPriority", oneJob.TaskPriority)
  265. found, waitingJobs, err := taskQueue.GetJobsByStatus(task_queue2.Waiting)
  266. if err != nil {
  267. return
  268. }
  269. println(found)
  270. for i, job := range waitingJobs {
  271. println("QueueDownloader Waiting:", i, job.VideoName)
  272. }
  273. found, waitingJobs, err = taskQueue.GetJobsByStatus(task_queue2.Done)
  274. if err != nil {
  275. return
  276. }
  277. println(found)
  278. for i, job := range waitingJobs {
  279. println("QueueDownloader Done:", i, job.VideoName)
  280. }
  281. found, waitingJobs, err = taskQueue.GetJobsByStatus(task_queue2.Failed)
  282. if err != nil {
  283. return
  284. }
  285. println(found)
  286. for i, job := range waitingJobs {
  287. println("QueueDownloader Failed:", i, job.VideoName)
  288. }
  289. }