task_queue_test.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. package task_queue
  2. import (
  3. "fmt"
  4. "testing"
  5. "github.com/ChineseSubFinder/ChineseSubFinder/pkg"
  6. "github.com/ChineseSubFinder/ChineseSubFinder/pkg/types/common"
  7. task_queue2 "github.com/ChineseSubFinder/ChineseSubFinder/pkg/types/task_queue"
  8. "github.com/ChineseSubFinder/ChineseSubFinder/pkg/cache_center"
  9. "github.com/ChineseSubFinder/ChineseSubFinder/pkg/log_helper"
  10. )
  // taskQueueName is the cache database name that every test in this file passes
  // to cache_center.NewCacheCenter and cache_center.DelDb.
  const taskQueueName = "testQueue"
  12. func TestTaskQueue_AddAndGetAndDel(t *testing.T) {
  13. defer func() {
  14. cache_center.DelDb(taskQueueName)
  15. }()
  16. cache_center.DelDb(taskQueueName)
  17. taskQueue := NewTaskQueue(cache_center.NewCacheCenter(taskQueueName, log_helper.GetLogger4Tester()))
  18. defer func() {
  19. taskQueue.Close()
  20. }()
  21. for i := taskPriorityCount; i >= 0; i-- {
  22. bok, err := taskQueue.Add(*task_queue2.NewOneJob(common.Movie, pkg.RandStringBytesMaskImprSrcSB(10), i))
  23. if err != nil {
  24. t.Fatal("TestTaskQueue.Add", err)
  25. }
  26. if bok == false {
  27. t.Fatal("TestTaskQueue.Add == false")
  28. }
  29. }
  30. bok, waitingJobs, err := taskQueue.GetJobsByStatus(task_queue2.Waiting)
  31. if err != nil {
  32. t.Fatal("TestTaskQueue.Get", err)
  33. }
  34. if bok == false {
  35. t.Fatal("TestTaskQueue.Get == false")
  36. }
  37. if len(waitingJobs) != taskPriorityCount+1 {
  38. t.Fatal("len(waitingJobs) != taskPriorityCount")
  39. }
  40. for i := 0; i <= taskPriorityCount; i++ {
  41. if waitingJobs[i].TaskPriority != i {
  42. t.Fatalf("TestTaskQueue.TaskPriority pop error, want = %d, got = %d", i, waitingJobs[i].TaskPriority)
  43. }
  44. }
  45. for _, waitingJob := range waitingJobs {
  46. bok, err = taskQueue.Del(waitingJob.Id)
  47. if err != nil {
  48. t.Fatal("TestTaskQueue.Del", err)
  49. }
  50. if bok == false {
  51. t.Fatal("TestTaskQueue.Del == false")
  52. }
  53. }
  54. if taskQueue.Size() != 0 {
  55. t.Fatal("taskQueue.Size() != 0")
  56. }
  57. }
  58. func TestTaskQueue_AddAndClear(t *testing.T) {
  59. defer func() {
  60. cache_center.DelDb(taskQueueName)
  61. }()
  62. cache_center.DelDb(taskQueueName)
  63. taskQueue := NewTaskQueue(cache_center.NewCacheCenter(taskQueueName, log_helper.GetLogger4Tester()))
  64. for i := taskPriorityCount; i >= 0; i-- {
  65. bok, err := taskQueue.Add(*task_queue2.NewOneJob(common.Movie, pkg.RandStringBytesMaskImprSrcSB(10), i))
  66. if err != nil {
  67. t.Fatal("TestTaskQueue.Add", err)
  68. }
  69. if bok == false {
  70. t.Fatal("TestTaskQueue.Add == false")
  71. }
  72. }
  73. err := taskQueue.Clear()
  74. if err != nil {
  75. t.Fatal("TestTaskQueue.Clear", err)
  76. }
  77. if taskQueue.Size() != 0 {
  78. t.Fatal("taskQueue.Size() != 0")
  79. }
  80. }
  81. func TestTaskQueue_Update(t *testing.T) {
  82. defer func() {
  83. cache_center.DelDb(taskQueueName)
  84. }()
  85. cache_center.DelDb(taskQueueName)
  86. taskQueue := NewTaskQueue(cache_center.NewCacheCenter(taskQueueName, log_helper.GetLogger4Tester()))
  87. for i := taskPriorityCount; i >= 0; i-- {
  88. bok, err := taskQueue.Add(*task_queue2.NewOneJob(common.Movie, pkg.RandStringBytesMaskImprSrcSB(10), i))
  89. if err != nil {
  90. t.Fatal("TestTaskQueue.Add", err)
  91. }
  92. if bok == false {
  93. t.Fatal("TestTaskQueue.Add == false")
  94. }
  95. }
  96. bok, waitingJobs, err := taskQueue.GetJobsByStatus(task_queue2.Waiting)
  97. if err != nil {
  98. t.Fatal("TestTaskQueue.Get", err)
  99. }
  100. if bok == false {
  101. t.Fatal("TestTaskQueue.Get == false")
  102. }
  103. if len(waitingJobs) != taskPriorityCount+1 {
  104. t.Fatal("len(waitingJobs) != taskPriorityCount")
  105. }
  106. for i := 0; i <= taskPriorityCount; i++ {
  107. if waitingJobs[i].TaskPriority != i {
  108. t.Fatalf("TestTaskQueue.TaskPriority pop error, want = %d, got = %d", i, waitingJobs[i].TaskPriority)
  109. }
  110. }
  111. for _, waitingJob := range waitingJobs {
  112. waitingJob.JobStatus = task_queue2.Committed
  113. bok, err = taskQueue.Update(waitingJob)
  114. if err != nil {
  115. t.Fatal("TestTaskQueue.Update", err)
  116. }
  117. if bok == false {
  118. t.Fatal("TestTaskQueue.Update == false")
  119. }
  120. }
  121. bok, committedJobs, err := taskQueue.GetJobsByStatus(task_queue2.Committed)
  122. if err != nil {
  123. t.Fatal("TestTaskQueue.Get", err)
  124. }
  125. if bok == false {
  126. t.Fatal("TestTaskQueue.Get == false")
  127. }
  128. if len(committedJobs) != taskPriorityCount+1 {
  129. t.Fatal("len(committedJobs) != taskPriorityCount")
  130. }
  131. }
  132. func TestTaskQueue_UpdateAdGetOneWaiting(t *testing.T) {
  133. defer func() {
  134. cache_center.DelDb(taskQueueName)
  135. }()
  136. cache_center.DelDb(taskQueueName)
  137. taskQueue := NewTaskQueue(cache_center.NewCacheCenter(taskQueueName, log_helper.GetLogger4Tester()))
  138. for i := taskPriorityCount; i >= 0; i-- {
  139. bok, err := taskQueue.Add(*task_queue2.NewOneJob(common.Movie, fmt.Sprintf("%d", i), i))
  140. if err != nil {
  141. t.Fatal("TestTaskQueue.Add", err)
  142. }
  143. if bok == false {
  144. t.Fatal("TestTaskQueue.Add == false")
  145. }
  146. }
  147. bok, waitingJob, err := taskQueue.GetOneWaitingJob()
  148. if err != nil {
  149. t.Fatal("TestTaskQueue.GetOneWaitingJob", err)
  150. }
  151. if bok == false {
  152. t.Fatal("TestTaskQueue.GetOneWaitingJob == false")
  153. }
  154. if waitingJob.TaskPriority != 0 {
  155. t.Fatal("waitingJob.TaskPriority != 0")
  156. }
  157. waitingJob.JobStatus = task_queue2.Committed
  158. bok, err = taskQueue.Update(waitingJob)
  159. if err != nil {
  160. t.Fatal("TestTaskQueue.Update", err)
  161. }
  162. if bok == false {
  163. t.Fatal("TestTaskQueue.Update == false")
  164. }
  165. bok, waitingJob, err = taskQueue.GetOneWaitingJob()
  166. if err != nil {
  167. t.Fatal("TestTaskQueue.GetOneWaitingJob", err)
  168. }
  169. if bok == false {
  170. t.Fatal("TestTaskQueue.GetOneWaitingJob == false")
  171. }
  172. if waitingJob.TaskPriority != 1 {
  173. t.Fatal("waitingJob.TaskPriority != 0")
  174. }
  175. }
  176. func TestTaskQueue_UpdatePriority(t *testing.T) {
  177. defer func() {
  178. cache_center.DelDb(taskQueueName)
  179. }()
  180. cache_center.DelDb(taskQueueName)
  181. taskQueue := NewTaskQueue(cache_center.NewCacheCenter(taskQueueName, log_helper.GetLogger4Tester()))
  182. for i := taskPriorityCount; i >= 0; i-- {
  183. bok, err := taskQueue.Add(*task_queue2.NewOneJob(common.Movie, fmt.Sprintf("%d", i), i))
  184. if err != nil {
  185. t.Fatal("TestTaskQueue.Add", err)
  186. }
  187. if bok == false {
  188. t.Fatal("TestTaskQueue.Add == false")
  189. }
  190. }
  191. bok, waitingJob, err := taskQueue.GetOneWaitingJob()
  192. if err != nil {
  193. t.Fatal("TestTaskQueue.GetOneWaitingJob", err)
  194. }
  195. if bok == false {
  196. t.Fatal("TestTaskQueue.GetOneWaitingJob == false")
  197. }
  198. if waitingJob.TaskPriority != 0 {
  199. t.Fatal("waitingJob.TaskPriority != 0")
  200. }
  201. waitingJob.TaskPriority = 1
  202. bok, err = taskQueue.Update(waitingJob)
  203. if err != nil {
  204. t.Fatal("TestTaskQueue.Update", err)
  205. }
  206. if bok == false {
  207. t.Fatal("TestTaskQueue.Update == false")
  208. }
  209. bok, waitingJobs, err := taskQueue.GetJobsByPriorityAndStatus(0, task_queue2.Waiting)
  210. if err != nil {
  211. t.Fatal("TestTaskQueue.GetJobsByPriorityAndStatus", err)
  212. }
  213. if bok == false {
  214. t.Fatal("TestTaskQueue.GetJobsByPriorityAndStatus == false")
  215. }
  216. if len(waitingJobs) != 0 {
  217. t.Fatal("len(waitingJobs) != 0")
  218. }
  219. bok, waitingJobs, err = taskQueue.GetJobsByPriorityAndStatus(1, task_queue2.Waiting)
  220. if err != nil {
  221. t.Fatal("TestTaskQueue.GetJobsByPriorityAndStatus", err)
  222. }
  223. if bok == false {
  224. t.Fatal("TestTaskQueue.GetJobsByPriorityAndStatus == false")
  225. }
  226. if len(waitingJobs) != 2 {
  227. t.Fatal("len(waitingJobs) != 2")
  228. }
  229. }
  230. func TestTaskQueue_AddAndGetOneJob(t *testing.T) {
  231. defer func() {
  232. cache_center.DelDb(taskQueueName)
  233. }()
  234. cache_center.DelDb(taskQueueName)
  235. taskQueue := NewTaskQueue(cache_center.NewCacheCenter(taskQueueName, log_helper.GetLogger4Tester()))
  236. for i := taskPriorityCount; i >= 0; i-- {
  237. bok, err := taskQueue.Add(*task_queue2.NewOneJob(common.Movie, fmt.Sprintf("%d", i), DefaultTaskPriorityLevel))
  238. if err != nil {
  239. t.Fatal("TestTaskQueue.Add", err)
  240. }
  241. if bok == false {
  242. t.Fatal("TestTaskQueue.Add == false")
  243. }
  244. }
  245. bok, oneJob, err := taskQueue.GetOneJob()
  246. if err != nil {
  247. t.Fatal("TestTaskQueue.Add", err)
  248. }
  249. if bok == false {
  250. t.Fatal("TestTaskQueue.Add == false")
  251. }
  252. println("VideoFPath", oneJob.VideoFPath)
  253. println("TaskPriority", oneJob.TaskPriority)
  254. taskQueue.AutoDetectUpdateJobStatus(oneJob, nil)
  255. bok, oneJob, err = taskQueue.GetOneJob()
  256. if err != nil {
  257. t.Fatal("TestTaskQueue.Add", err)
  258. }
  259. if bok == false {
  260. t.Fatal("TestTaskQueue.Add == false")
  261. }
  262. println("VideoFPath", oneJob.VideoFPath)
  263. println("TaskPriority", oneJob.TaskPriority)
  264. found, waitingJobs, err := taskQueue.GetJobsByStatus(task_queue2.Waiting)
  265. if err != nil {
  266. return
  267. }
  268. println(found)
  269. for i, job := range waitingJobs {
  270. println("QueueDownloader Waiting:", i, job.VideoName)
  271. }
  272. found, waitingJobs, err = taskQueue.GetJobsByStatus(task_queue2.Done)
  273. if err != nil {
  274. return
  275. }
  276. println(found)
  277. for i, job := range waitingJobs {
  278. println("QueueDownloader Done:", i, job.VideoName)
  279. }
  280. found, waitingJobs, err = taskQueue.GetJobsByStatus(task_queue2.Failed)
  281. if err != nil {
  282. return
  283. }
  284. println(found)
  285. for i, job := range waitingJobs {
  286. println("QueueDownloader Failed:", i, job.VideoName)
  287. }
  288. }