queue.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package preview_queue
  2. import (
  3. "errors"
  4. "os"
  5. "path/filepath"
  6. "strings"
  7. "sync"
  8. "github.com/allanpk716/ChineseSubFinder/pkg/types/common"
  9. "github.com/allanpk716/ChineseSubFinder/pkg"
  10. "github.com/allanpk716/ChineseSubFinder/pkg/ffmpeg_helper"
  11. llq "github.com/emirpasic/gods/queues/linkedlistqueue"
  12. "github.com/emirpasic/gods/sets/hashset"
  13. "github.com/sirupsen/logrus"
  14. )
// PreviewQueue is a FIFO queue of preview-export jobs that are handled one at
// a time by a background goroutine started in NewPreviewQueue.
type PreviewQueue struct {
	// log receives job errors (dealers) and the exported file paths (processSub).
	log *logrus.Logger
	// ffmpegHelper performs the actual HLS and subtitle export in processSub.
	ffmpegHelper *ffmpeg_helper.FFMPEGHelper
	// processQueue holds the pending *Job values in arrival order.
	processQueue *llq.Queue
	// jobSet holds the VideoFPath of every job still waiting in processQueue;
	// it is used to reject duplicates and to answer IsJobInQueue.
	jobSet *hashset.Set
	// addOneSignal wakes the background goroutine; Add sends one value per
	// newly enqueued job.
	addOneSignal chan interface{}
	// addLocker guards processQueue, jobSet and workingJob.
	addLocker sync.Mutex
	// workingJob is the job currently being processed, or nil when idle.
	workingJob *Job // the job that is being worked on right now
}
  24. func NewPreviewQueue(log *logrus.Logger) *PreviewQueue {
  25. p := &PreviewQueue{
  26. log: log,
  27. ffmpegHelper: ffmpeg_helper.NewFFMPEGHelper(log),
  28. processQueue: llq.New(),
  29. jobSet: hashset.New(),
  30. addOneSignal: make(chan interface{}, 1),
  31. workingJob: nil,
  32. }
  33. go func(pu *PreviewQueue) {
  34. for {
  35. select {
  36. case <-pu.addOneSignal:
  37. // 有新任务了
  38. pu.dealers()
  39. }
  40. }
  41. }(p)
  42. return p
  43. }
  44. // GetVideoHLSAndSubByTimeRangeExportPathInfo 获取视频的HLS和字幕的导出路径信息
  45. func (p *PreviewQueue) GetVideoHLSAndSubByTimeRangeExportPathInfo(videoFullPath, subFullPath, startTimeString, timeLength string) (string, string, error) {
  46. // 导出视频
  47. if pkg.IsFile(videoFullPath) == false {
  48. return "", "", errors.New("video file not exist, maybe is bluray file, so not support yet")
  49. }
  50. if pkg.IsFile(subFullPath) == false {
  51. return "", "", errors.New("sub file not exist")
  52. }
  53. outDirPath, err := pkg.GetVideoAndSubPreviewCacheFolder()
  54. if err != nil {
  55. return "", "", err
  56. }
  57. fileName := filepath.Base(videoFullPath)
  58. frontName := strings.ReplaceAll(fileName, filepath.Ext(fileName), "")
  59. outDirSubPath := filepath.Join(outDirPath, frontName, startTimeString+"-"+timeLength)
  60. if pkg.IsDir(outDirSubPath) == true {
  61. err := os.RemoveAll(outDirSubPath)
  62. if err != nil {
  63. return "", "", err
  64. }
  65. }
  66. err = os.MkdirAll(outDirSubPath, os.ModePerm)
  67. if err != nil {
  68. return "", "", err
  69. }
  70. outSubFileFPath := filepath.Join(outDirSubPath, frontName+common.SubExtSRT)
  71. // 字幕的相对位置
  72. subRelPath, err := filepath.Rel(outDirPath, outSubFileFPath)
  73. if err != nil {
  74. return "", "", err
  75. }
  76. // outputlist.m3u8 的相对位置
  77. outputListRelPath, err := filepath.Rel(outDirPath, filepath.Join(outDirSubPath, "outputlist.m3u8"))
  78. if err != nil {
  79. return "", "", err
  80. }
  81. return outputListRelPath, subRelPath, nil
  82. }
  83. // IsJobInQueue 是否正在队列中排队,或者正在被处理
  84. func (p *PreviewQueue) IsJobInQueue(job *Job) bool {
  85. p.addLocker.Lock()
  86. defer func() {
  87. p.addLocker.Unlock()
  88. }()
  89. if job == nil || job.VideoFPath == "" {
  90. return false
  91. }
  92. if p.jobSet.Contains(job.VideoFPath) == true {
  93. // 已经在队列中了
  94. return true
  95. } else {
  96. if p.workingJob == nil {
  97. return false
  98. }
  99. // 还有一种可能,任务从队列拿出来了,正在处理,那么在外部开来也还是在队列中的
  100. if p.workingJob.VideoFPath == job.VideoFPath {
  101. return true
  102. }
  103. }
  104. return false
  105. }
  106. // Add 添加任务
  107. func (p *PreviewQueue) Add(job *Job) {
  108. p.addLocker.Lock()
  109. defer func() {
  110. p.addLocker.Unlock()
  111. }()
  112. if p.jobSet.Contains(job.VideoFPath) == true {
  113. // 已经在队列中了
  114. return
  115. }
  116. p.processQueue.Enqueue(job)
  117. p.jobSet.Add(job.VideoFPath)
  118. // 通知有新任务了
  119. p.addOneSignal <- struct{}{}
  120. return
  121. }
  122. // ListJob 任务列表
  123. func (p *PreviewQueue) ListJob() []*Job {
  124. p.addLocker.Lock()
  125. defer func() {
  126. p.addLocker.Unlock()
  127. }()
  128. ret := make([]*Job, 0)
  129. for _, v := range p.processQueue.Values() {
  130. ret = append(ret, v.(*Job))
  131. }
  132. if p.workingJob != nil {
  133. ret = append(ret, p.workingJob)
  134. }
  135. return ret
  136. }
  137. func (p *PreviewQueue) dealers() {
  138. p.addLocker.Lock()
  139. if p.processQueue.Empty() == true {
  140. // 没有任务了
  141. p.addLocker.Unlock()
  142. return
  143. }
  144. job, ok := p.processQueue.Dequeue()
  145. if ok == false {
  146. // 没有任务了
  147. p.addLocker.Unlock()
  148. return
  149. }
  150. // 移除这个任务
  151. p.jobSet.Remove(job.(*Job).VideoFPath)
  152. // 标记这个正在处理
  153. p.workingJob = job.(*Job)
  154. p.addLocker.Unlock()
  155. // 具体处理这个任务
  156. err := p.processSub(job.(*Job))
  157. if err != nil {
  158. p.log.Error(err)
  159. }
  160. }
  161. func (p *PreviewQueue) processSub(job *Job) error {
  162. defer func() {
  163. // 任务处理完了
  164. p.addLocker.Lock()
  165. p.workingJob = nil
  166. p.addLocker.Unlock()
  167. }()
  168. const segmentTime = "5.000"
  169. nowOutRootDirPath, err := pkg.GetVideoAndSubPreviewCacheFolder()
  170. if err != nil {
  171. return err
  172. }
  173. // 具体处理这个任务,这个任务在加入队列之前就可以预测将要存放在哪,以及名称是什么
  174. m3u8FPath, subFPath, err := p.ffmpegHelper.ExportVideoHLSAndSubByTimeRange(job.VideoFPath, job.SubFPath, job.StartTime, job.EndTime, segmentTime, nowOutRootDirPath)
  175. if err != nil {
  176. return err
  177. }
  178. p.log.Infoln("preview m3u8FPath:", m3u8FPath)
  179. p.log.Infoln("preview subFPath:", subFPath)
  180. return nil
  181. }
// Job describes one preview export request handled by PreviewQueue.
type Job struct {
	// VideoFPath is the path of the video file; it is also the key used to
	// detect duplicate jobs.
	VideoFPath string `json:"video_f_path"`
	// SubFPath is the path of the subtitle file exported alongside the video.
	SubFPath string `json:"sub_f_path"`
	// StartTime is passed unchanged to the ffmpeg helper as the start of the
	// range. NOTE(review): expected string format is not visible here.
	StartTime string `json:"start_time"`
	// EndTime is passed unchanged to the ffmpeg helper right after StartTime.
	// NOTE(review): confirm whether the helper expects an end time or a length.
	EndTime string `json:"end_time"`
}
// Reply wraps a list of jobs under the JSON key "jobs".
// NOTE(review): not used in this file; presumably a response payload.
type Reply struct {
	// Jobs is the list of jobs carried by the reply.
	Jobs []*Job `json:"jobs"`
}