1
0

progressemitter.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package model
  2. import (
  3. "path/filepath"
  4. "reflect"
  5. "sync"
  6. "time"
  7. "github.com/syncthing/syncthing/internal/config"
  8. "github.com/syncthing/syncthing/internal/events"
  9. )
  10. type ProgressEmitter struct {
  11. registry map[string]*sharedPullerState
  12. interval time.Duration
  13. last map[string]map[string]*pullerProgress
  14. mut sync.Mutex
  15. timer *time.Timer
  16. stop chan struct{}
  17. }
  18. // Creates a new progress emitter which emits DownloadProgress events every
  19. // interval.
  20. func NewProgressEmitter(cfg *config.ConfigWrapper) *ProgressEmitter {
  21. t := &ProgressEmitter{
  22. stop: make(chan struct{}),
  23. registry: make(map[string]*sharedPullerState),
  24. last: make(map[string]map[string]*pullerProgress),
  25. timer: time.NewTimer(time.Millisecond),
  26. }
  27. t.Changed(cfg.Raw())
  28. cfg.Subscribe(t)
  29. return t
  30. }
  31. // Starts progress emitter which starts emitting DownloadProgress events as
  32. // the progress happens.
  33. func (t *ProgressEmitter) Serve() {
  34. for {
  35. select {
  36. case <-t.stop:
  37. if debug {
  38. l.Debugln("progress emitter: stopping")
  39. }
  40. return
  41. case <-t.timer.C:
  42. t.mut.Lock()
  43. if debug {
  44. l.Debugln("progress emitter: timer - looking after", len(t.registry))
  45. }
  46. output := make(map[string]map[string]*pullerProgress)
  47. for _, puller := range t.registry {
  48. if output[puller.folder] == nil {
  49. output[puller.folder] = make(map[string]*pullerProgress)
  50. }
  51. output[puller.folder][puller.file.Name] = puller.Progress()
  52. }
  53. if !reflect.DeepEqual(t.last, output) {
  54. events.Default.Log(events.DownloadProgress, output)
  55. t.last = output
  56. if debug {
  57. l.Debugf("progress emitter: emitting %#v", output)
  58. }
  59. } else if debug {
  60. l.Debugln("progress emitter: nothing new")
  61. }
  62. if len(t.registry) != 0 {
  63. t.timer.Reset(t.interval)
  64. }
  65. t.mut.Unlock()
  66. }
  67. }
  68. }
  69. // Interface method to handle configuration changes
  70. func (t *ProgressEmitter) Changed(cfg config.Configuration) error {
  71. t.mut.Lock()
  72. defer t.mut.Unlock()
  73. t.interval = time.Duration(cfg.Options.ProgressUpdateIntervalS) * time.Second
  74. if debug {
  75. l.Debugln("progress emitter: updated interval", t.interval)
  76. }
  77. return nil
  78. }
  79. // Stops the emitter.
  80. func (t *ProgressEmitter) Stop() {
  81. t.stop <- struct{}{}
  82. }
  83. // Register a puller with the emitter which will start broadcasting pullers
  84. // progress.
  85. func (t *ProgressEmitter) Register(s *sharedPullerState) {
  86. t.mut.Lock()
  87. defer t.mut.Unlock()
  88. if debug {
  89. l.Debugln("progress emitter: registering", s.folder, s.file.Name)
  90. }
  91. if len(t.registry) == 0 {
  92. t.timer.Reset(t.interval)
  93. }
  94. t.registry[filepath.Join(s.folder, s.file.Name)] = s
  95. }
  96. // Deregister a puller which will stop boardcasting pullers state.
  97. func (t *ProgressEmitter) Deregister(s *sharedPullerState) {
  98. t.mut.Lock()
  99. defer t.mut.Unlock()
  100. if debug {
  101. l.Debugln("progress emitter: deregistering", s.folder, s.file.Name)
  102. }
  103. delete(t.registry, filepath.Join(s.folder, s.file.Name))
  104. }
  105. // Returns number of bytes completed in the given folder.
  106. func (t *ProgressEmitter) BytesCompleted(folder string) (bytes int64) {
  107. t.mut.Lock()
  108. defer t.mut.Unlock()
  109. for _, s := range t.registry {
  110. if s.folder == folder {
  111. bytes += s.Progress().BytesDone
  112. }
  113. }
  114. if debug {
  115. l.Debugf("progress emitter: bytes completed for %s: %d", folder, bytes)
  116. }
  117. return
  118. }