1
0

queue.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package preview_queue
  2. import (
  3. "errors"
  4. "fmt"
  5. "path/filepath"
  6. "strings"
  7. "sync"
  8. "github.com/ChineseSubFinder/ChineseSubFinder/pkg/types/common"
  9. "github.com/ChineseSubFinder/ChineseSubFinder/pkg"
  10. "github.com/ChineseSubFinder/ChineseSubFinder/pkg/ffmpeg_helper"
  11. llq "github.com/emirpasic/gods/queues/linkedlistqueue"
  12. "github.com/emirpasic/gods/sets/hashset"
  13. "github.com/sirupsen/logrus"
  14. )
  15. type PreviewQueue struct {
  16. log *logrus.Logger
  17. ffmpegHelper *ffmpeg_helper.FFMPEGHelper
  18. processQueue *llq.Queue
  19. jobSet *hashset.Set
  20. jobResultMap sync.Map
  21. addOneSignal chan interface{}
  22. addLocker sync.Mutex
  23. workingJob *Job // 正在操作的任务的路径
  24. }
  25. func NewPreviewQueue(log *logrus.Logger) *PreviewQueue {
  26. p := &PreviewQueue{
  27. log: log,
  28. ffmpegHelper: ffmpeg_helper.NewFFMPEGHelper(log),
  29. processQueue: llq.New(),
  30. jobSet: hashset.New(),
  31. jobResultMap: sync.Map{},
  32. addOneSignal: make(chan interface{}, 1),
  33. workingJob: nil,
  34. }
  35. go func(pu *PreviewQueue) {
  36. for {
  37. select {
  38. case <-pu.addOneSignal:
  39. // 有新任务了
  40. pu.dealers()
  41. }
  42. }
  43. }(p)
  44. return p
  45. }
  46. // GetVideoHLSAndSubByTimeRangeExportPathInfo 获取视频的HLS和字幕的导出路径信息
  47. func (p *PreviewQueue) GetVideoHLSAndSubByTimeRangeExportPathInfo(videoFullPath string, subFullPaths []string, startTimeString, timeLength string) (string, []string, error) {
  48. // 导出视频
  49. if pkg.IsFile(videoFullPath) == false {
  50. return "", nil, errors.New("video file not exist, maybe is bluray file, so not support yet")
  51. }
  52. for _, subFullPath := range subFullPaths {
  53. if pkg.IsFile(subFullPath) == false {
  54. return "", nil, errors.New("sub file not exist:" + subFullPath)
  55. }
  56. }
  57. outDirPath, err := pkg.GetVideoAndSubPreviewCacheFolder()
  58. if err != nil {
  59. return "", nil, err
  60. }
  61. fileName := filepath.Base(videoFullPath)
  62. frontName := strings.ReplaceAll(fileName, filepath.Ext(fileName), "")
  63. outDirSubPath := filepath.Join(outDirPath, frontName, startTimeString+"-"+timeLength)
  64. // 字幕的相对位置
  65. outSubFPaths := make([]string, 0)
  66. for i := 0; i < len(subFullPaths); i++ {
  67. outSubFileFPath := filepath.Join(outDirSubPath, fmt.Sprintf(frontName+"_%d"+common.SubExtSRT, i))
  68. var subRelPath string
  69. subRelPath, err = filepath.Rel(outDirPath, outSubFileFPath)
  70. if err != nil {
  71. return "", nil, err
  72. }
  73. outSubFPaths = append(outSubFPaths, subRelPath)
  74. }
  75. // outputlist.m3u8 的相对位置
  76. outputListRelPath, err := filepath.Rel(outDirPath, filepath.Join(outDirSubPath, "outputlist.m3u8"))
  77. if err != nil {
  78. return "", nil, err
  79. }
  80. return outputListRelPath, outSubFPaths, nil
  81. }
  82. // IsJobInQueue 是否正在队列中排队,或者正在被处理
  83. func (p *PreviewQueue) IsJobInQueue(job *Job) bool {
  84. p.addLocker.Lock()
  85. defer func() {
  86. p.addLocker.Unlock()
  87. }()
  88. if job == nil || job.VideoFPath == "" {
  89. return false
  90. }
  91. if p.jobSet.Contains(job.VideoFPath) == true {
  92. // 已经在队列中了
  93. return true
  94. } else {
  95. if p.workingJob == nil {
  96. return false
  97. }
  98. // 还有一种可能,任务从队列拿出来了,正在处理,那么在外部开来也还是在队列中的
  99. if p.workingJob.VideoFPath == job.VideoFPath {
  100. return true
  101. }
  102. }
  103. return false
  104. }
  105. // Add 添加任务
  106. func (p *PreviewQueue) Add(job *Job) {
  107. p.addLocker.Lock()
  108. defer func() {
  109. p.addLocker.Unlock()
  110. }()
  111. if p.jobSet.Contains(job.VideoFPath) == true {
  112. // 已经在队列中了
  113. return
  114. }
  115. p.processQueue.Enqueue(job)
  116. p.jobSet.Add(job.VideoFPath)
  117. // 通知有新任务了
  118. p.addOneSignal <- struct{}{}
  119. return
  120. }
  121. // ListJob 任务列表
  122. func (p *PreviewQueue) ListJob() []*Job {
  123. p.addLocker.Lock()
  124. defer func() {
  125. p.addLocker.Unlock()
  126. }()
  127. ret := make([]*Job, 0)
  128. for _, v := range p.processQueue.Values() {
  129. ret = append(ret, v.(*Job))
  130. }
  131. if p.workingJob != nil {
  132. ret = append(ret, p.workingJob)
  133. }
  134. return ret
  135. }
  136. // JobResult 任务结果,如果成功 ok,如果没有就是空,其他就是错误信息
  137. func (p *PreviewQueue) JobResult(job *Job) string {
  138. value, found := p.jobResultMap.LoadAndDelete(job.VideoFPath)
  139. if found == false {
  140. return ""
  141. }
  142. return value.(string)
  143. }
  144. func (p *PreviewQueue) dealers() {
  145. p.addLocker.Lock()
  146. if p.processQueue.Empty() == true {
  147. // 没有任务了
  148. p.addLocker.Unlock()
  149. return
  150. }
  151. job, ok := p.processQueue.Dequeue()
  152. if ok == false {
  153. // 没有任务了
  154. p.addLocker.Unlock()
  155. return
  156. }
  157. // 移除这个任务
  158. p.jobSet.Remove(job.(*Job).VideoFPath)
  159. // 标记这个正在处理
  160. p.workingJob = job.(*Job)
  161. p.addLocker.Unlock()
  162. // 具体处理这个任务
  163. err := p.processSub(job.(*Job))
  164. if err != nil {
  165. p.log.Error(err)
  166. }
  167. }
  168. func (p *PreviewQueue) processSub(job *Job) error {
  169. var err error
  170. defer func() {
  171. // 任务处理完了
  172. p.addLocker.Lock()
  173. p.workingJob = nil
  174. p.addLocker.Unlock()
  175. if err != nil {
  176. p.jobResultMap.Store(job.VideoFPath, err.Error())
  177. } else {
  178. p.jobResultMap.Store(job.VideoFPath, "ok")
  179. }
  180. }()
  181. const segmentTime = "5.000"
  182. nowOutRootDirPath, err := pkg.GetVideoAndSubPreviewCacheFolder()
  183. if err != nil {
  184. return err
  185. }
  186. // 具体处理这个任务,这个任务在加入队列之前就可以预测将要存放在哪,以及名称是什么
  187. m3u8FPath, subFPath, err := p.ffmpegHelper.ExportVideoHLSAndSubByTimeRange(job.VideoFPath, job.SubFPaths, job.StartTime, job.EndTime, segmentTime, nowOutRootDirPath)
  188. if err != nil {
  189. return err
  190. }
  191. p.log.Infoln("preview m3u8FPath:", m3u8FPath)
  192. p.log.Infoln("preview subFPath:", subFPath)
  193. return nil
  194. }
  195. type Job struct {
  196. VideoFPath string `json:"video_f_path"`
  197. SubFPaths []string `json:"sub_f_paths"`
  198. StartTime string `json:"start_time"`
  199. EndTime string `json:"end_time"`
  200. }
  201. type Reply struct {
  202. Jobs []*Job `json:"jobs"`
  203. }