worker.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package worker
  2. import (
  3. "bytes"
  4. "context"
  5. "github.com/ChineseSubFinder/ChineseSubFinder/pkg/hls_center/cache"
  6. "io"
  7. "path/filepath"
  8. log "github.com/sirupsen/logrus"
  9. )
  // WorkHandler is the pluggable unit of work a WorkerServer delegates to.
  //
  // The server uses Key to derive the cache key for a request and Handle to
  // produce the output for a request that could not be served from the cache.
  type WorkHandler interface {
  	// Key returns the string that identifies request in the cache.
  	Key(request interface{}) string

  	// Handle processes request and writes the resulting output to w.
  	// A non-nil error signals that the request could not be handled.
  	Handle(request interface{}, w io.Writer) error
  }
  14. type WorkerServerConf struct {
  15. NumWorkers int
  16. CacheDir string
  17. Worker WorkHandler
  18. }
  // token is the zero-size value held in WorkerServer.tokens; each one in the
  // channel represents a free processing slot.
  type token struct{}
  20. type WorkerServer struct {
  21. conf WorkerServerConf
  22. cache cache.Cache
  23. tokens chan token
  24. }
  25. func NewWorkerServer(conf WorkerServerConf) *WorkerServer {
  26. tokens := make(chan token, conf.NumWorkers)
  27. for i := conf.NumWorkers; i > 0; i-- {
  28. tokens <- token{}
  29. }
  30. return &WorkerServer{conf, cache.NewDirCache(conf.CacheDir), tokens}
  31. }
  32. func (s *WorkerServer) handler() WorkHandler {
  33. return s.conf.Worker
  34. }
  35. func (s *WorkerServer) getCachePath(r interface{}) string {
  36. return filepath.Join(s.conf.CacheDir, s.handler().Key(r))
  37. }
  38. func (s *WorkerServer) tryServeFromCache(r interface{}, w io.Writer) (bool, error) {
  39. data, err := s.cache.Get(context.Background(), s.handler().Key(r))
  40. // If error getting item, return not served with error
  41. if err != nil {
  42. return false, err
  43. }
  44. // If no item found, return not served with no error
  45. if data == nil {
  46. return false, nil
  47. }
  48. // If copying fails, return served with error
  49. if _, err = io.Copy(w, bytes.NewReader(data)); err != nil {
  50. return true, err
  51. }
  52. // Everything worked, return served with no error
  53. return true, nil
  54. }
  55. // TODO timeout & context
  56. func (s *WorkerServer) Serve(request interface{}, w io.Writer) error {
  57. if served, err := s.tryServeFromCache(request, w); served || err != nil {
  58. if served {
  59. log.Debugf("Served request %v from cache", request)
  60. }
  61. if err != nil {
  62. log.Errorf("Error serving request from cache: %v", err)
  63. }
  64. return err
  65. }
  66. // Wait for token
  67. token := <-s.tokens
  68. defer func() {
  69. s.tokens <- token
  70. }()
  71. log.Debugf("Processing request %v", request)
  72. cw := new(bytes.Buffer)
  73. mw := io.MultiWriter(cw, w)
  74. if err := s.handler().Handle(request, mw); err != nil {
  75. log.Errorf("Error handling request: %v", err)
  76. return err
  77. }
  78. if err := s.cache.Set(context.Background(), s.handler().Key(request), cw.Bytes()); err != nil {
  79. log.Errorf("Error caching request: %v", err)
  80. }
  81. return nil
  82. }