progressemitter.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This program is free software: you can redistribute it and/or modify it
  4. // under the terms of the GNU General Public License as published by the Free
  5. // Software Foundation, either version 3 of the License, or (at your option)
  6. // any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful, but WITHOUT
  9. // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  10. // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
  11. // more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program. If not, see <http://www.gnu.org/licenses/>.
  15. package model
  16. import (
  17. "path/filepath"
  18. "reflect"
  19. "sync"
  20. "time"
  21. "github.com/syncthing/syncthing/internal/config"
  22. "github.com/syncthing/syncthing/internal/events"
  23. )
  24. type ProgressEmitter struct {
  25. registry map[string]*sharedPullerState
  26. interval time.Duration
  27. last map[string]map[string]*pullerProgress
  28. mut sync.Mutex
  29. timer *time.Timer
  30. stop chan struct{}
  31. }
  32. // Creates a new progress emitter which emits DownloadProgress events every
  33. // interval.
  34. func NewProgressEmitter(cfg *config.Wrapper) *ProgressEmitter {
  35. t := &ProgressEmitter{
  36. stop: make(chan struct{}),
  37. registry: make(map[string]*sharedPullerState),
  38. last: make(map[string]map[string]*pullerProgress),
  39. timer: time.NewTimer(time.Millisecond),
  40. }
  41. t.Changed(cfg.Raw())
  42. cfg.Subscribe(t)
  43. return t
  44. }
  45. // Starts progress emitter which starts emitting DownloadProgress events as
  46. // the progress happens.
  47. func (t *ProgressEmitter) Serve() {
  48. for {
  49. select {
  50. case <-t.stop:
  51. if debug {
  52. l.Debugln("progress emitter: stopping")
  53. }
  54. return
  55. case <-t.timer.C:
  56. t.mut.Lock()
  57. if debug {
  58. l.Debugln("progress emitter: timer - looking after", len(t.registry))
  59. }
  60. output := make(map[string]map[string]*pullerProgress)
  61. for _, puller := range t.registry {
  62. if output[puller.folder] == nil {
  63. output[puller.folder] = make(map[string]*pullerProgress)
  64. }
  65. output[puller.folder][puller.file.Name] = puller.Progress()
  66. }
  67. if !reflect.DeepEqual(t.last, output) {
  68. events.Default.Log(events.DownloadProgress, output)
  69. t.last = output
  70. if debug {
  71. l.Debugf("progress emitter: emitting %#v", output)
  72. }
  73. } else if debug {
  74. l.Debugln("progress emitter: nothing new")
  75. }
  76. if len(t.registry) != 0 {
  77. t.timer.Reset(t.interval)
  78. }
  79. t.mut.Unlock()
  80. }
  81. }
  82. }
  83. // Interface method to handle configuration changes
  84. func (t *ProgressEmitter) Changed(cfg config.Configuration) error {
  85. t.mut.Lock()
  86. defer t.mut.Unlock()
  87. t.interval = time.Duration(cfg.Options.ProgressUpdateIntervalS) * time.Second
  88. if debug {
  89. l.Debugln("progress emitter: updated interval", t.interval)
  90. }
  91. return nil
  92. }
  93. // Stops the emitter.
  94. func (t *ProgressEmitter) Stop() {
  95. t.stop <- struct{}{}
  96. }
  97. // Register a puller with the emitter which will start broadcasting pullers
  98. // progress.
  99. func (t *ProgressEmitter) Register(s *sharedPullerState) {
  100. t.mut.Lock()
  101. defer t.mut.Unlock()
  102. if debug {
  103. l.Debugln("progress emitter: registering", s.folder, s.file.Name)
  104. }
  105. if len(t.registry) == 0 {
  106. t.timer.Reset(t.interval)
  107. }
  108. t.registry[filepath.Join(s.folder, s.file.Name)] = s
  109. }
  110. // Deregister a puller which will stop boardcasting pullers state.
  111. func (t *ProgressEmitter) Deregister(s *sharedPullerState) {
  112. t.mut.Lock()
  113. defer t.mut.Unlock()
  114. if debug {
  115. l.Debugln("progress emitter: deregistering", s.folder, s.file.Name)
  116. }
  117. delete(t.registry, filepath.Join(s.folder, s.file.Name))
  118. }
  119. // Returns number of bytes completed in the given folder.
  120. func (t *ProgressEmitter) BytesCompleted(folder string) (bytes int64) {
  121. t.mut.Lock()
  122. defer t.mut.Unlock()
  123. for _, s := range t.registry {
  124. if s.folder == folder {
  125. bytes += s.Progress().BytesDone
  126. }
  127. }
  128. if debug {
  129. l.Debugf("progress emitter: bytes completed for %s: %d", folder, bytes)
  130. }
  131. return
  132. }