1
0

progressemitter.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "context"
  9. "fmt"
  10. "log/slog"
  11. "sync"
  12. "time"
  13. "github.com/syncthing/syncthing/lib/config"
  14. "github.com/syncthing/syncthing/lib/events"
  15. "github.com/syncthing/syncthing/lib/protocol"
  16. )
  17. type ProgressEmitter struct {
  18. cfg config.Wrapper
  19. registry map[string]map[string]*sharedPullerState // folder: name: puller
  20. interval time.Duration
  21. minBlocks int
  22. sentDownloadStates map[protocol.DeviceID]*sentDownloadState // States representing what we've sent to the other peer via DownloadProgress messages.
  23. connections map[protocol.DeviceID]protocol.Connection
  24. foldersByConns map[protocol.DeviceID][]string
  25. disabled bool
  26. evLogger events.Logger
  27. mut sync.Mutex
  28. timer *time.Timer
  29. }
  30. type progressUpdate struct {
  31. conn protocol.Connection
  32. folder string
  33. updates []protocol.FileDownloadProgressUpdate
  34. }
  35. func (p progressUpdate) send(ctx context.Context) {
  36. p.conn.DownloadProgress(ctx, &protocol.DownloadProgress{Folder: p.folder, Updates: p.updates})
  37. }
  38. // NewProgressEmitter creates a new progress emitter which emits
  39. // DownloadProgress events every interval.
  40. func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmitter {
  41. t := &ProgressEmitter{
  42. cfg: cfg,
  43. registry: make(map[string]map[string]*sharedPullerState),
  44. timer: time.NewTimer(time.Millisecond),
  45. sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState),
  46. connections: make(map[protocol.DeviceID]protocol.Connection),
  47. foldersByConns: make(map[protocol.DeviceID][]string),
  48. evLogger: evLogger,
  49. }
  50. t.CommitConfiguration(config.Configuration{}, cfg.RawCopy())
  51. return t
  52. }
  53. // serve starts the progress emitter which starts emitting DownloadProgress
  54. // events as the progress happens.
  55. func (t *ProgressEmitter) Serve(ctx context.Context) error {
  56. t.cfg.Subscribe(t)
  57. defer t.cfg.Unsubscribe(t)
  58. var lastUpdate time.Time
  59. var lastCount, newCount int
  60. for {
  61. select {
  62. case <-ctx.Done():
  63. slog.Debug("Progress emitter: stopping")
  64. return nil
  65. case <-t.timer.C:
  66. t.mut.Lock()
  67. newLastUpdated := lastUpdate
  68. newCount = t.lenRegistryLocked()
  69. var progressUpdates []progressUpdate
  70. for _, pullers := range t.registry {
  71. for _, puller := range pullers {
  72. if updated := puller.Updated(); updated.After(newLastUpdated) {
  73. newLastUpdated = updated
  74. }
  75. }
  76. }
  77. if !newLastUpdated.Equal(lastUpdate) || newCount != lastCount {
  78. lastUpdate = newLastUpdated
  79. lastCount = newCount
  80. t.sendDownloadProgressEventLocked()
  81. progressUpdates = t.computeProgressUpdates()
  82. }
  83. if newCount != 0 {
  84. t.timer.Reset(t.interval)
  85. }
  86. t.mut.Unlock()
  87. // Do the sending outside of the lock.
  88. // If these send block, the whole process of reporting progress to others stops, but that's probably fine.
  89. // It's better to stop this component from working under back-pressure than causing other components that
  90. // rely on this component to be waiting for locks.
  91. //
  92. // This might leave remote peers in some funky state where we are unable the fact that we no longer have
  93. // something, but there is not much we can do here.
  94. for _, update := range progressUpdates {
  95. update.send(ctx)
  96. }
  97. }
  98. }
  99. }
  100. func (t *ProgressEmitter) sendDownloadProgressEventLocked() {
  101. output := make(map[string]map[string]*PullerProgress)
  102. for folder, pullers := range t.registry {
  103. if len(pullers) == 0 {
  104. continue
  105. }
  106. output[folder] = make(map[string]*PullerProgress)
  107. for name, puller := range pullers {
  108. output[folder][name] = puller.Progress()
  109. }
  110. }
  111. t.evLogger.Log(events.DownloadProgress, output)
  112. l.Debugf("progress emitter: emitting %#v", output)
  113. }
  114. func (t *ProgressEmitter) computeProgressUpdates() []progressUpdate {
  115. var progressUpdates []progressUpdate
  116. for id, conn := range t.connections {
  117. for _, folder := range t.foldersByConns[id] {
  118. pullers, ok := t.registry[folder]
  119. if !ok {
  120. // There's never been any puller registered for this folder yet
  121. continue
  122. }
  123. state, ok := t.sentDownloadStates[id]
  124. if !ok {
  125. state = &sentDownloadState{
  126. folderStates: make(map[string]*sentFolderDownloadState),
  127. }
  128. t.sentDownloadStates[id] = state
  129. }
  130. activePullers := make([]*sharedPullerState, 0, len(pullers))
  131. for _, puller := range pullers {
  132. if puller.folder != folder || puller.file.IsSymlink() || puller.file.IsDirectory() || len(puller.file.Blocks) <= t.minBlocks {
  133. continue
  134. }
  135. activePullers = append(activePullers, puller)
  136. }
  137. // For every new puller that hasn't yet been seen, it will send all the blocks the puller has available
  138. // For every existing puller, it will check for new blocks, and send update for the new blocks only
  139. // For every puller that we've seen before but is no longer there, we will send a forget message
  140. updates := state.update(folder, activePullers)
  141. if len(updates) > 0 {
  142. progressUpdates = append(progressUpdates, progressUpdate{
  143. conn: conn,
  144. folder: folder,
  145. updates: updates,
  146. })
  147. }
  148. }
  149. }
  150. // Clean up sentDownloadStates for devices which we are no longer connected to.
  151. for id := range t.sentDownloadStates {
  152. _, ok := t.connections[id]
  153. if !ok {
  154. // Null out outstanding entries for device
  155. delete(t.sentDownloadStates, id)
  156. }
  157. }
  158. // If a folder was unshared from some device, tell it that all temp files
  159. // are now gone.
  160. for id, state := range t.sentDownloadStates {
  161. // For each of the folders that the state is aware of,
  162. // try to match it with a shared folder we've discovered above,
  163. nextFolder:
  164. for _, folder := range state.folders() {
  165. for _, existingFolder := range t.foldersByConns[id] {
  166. if existingFolder == folder {
  167. continue nextFolder
  168. }
  169. }
  170. // If we fail to find that folder, we tell the state to forget about it
  171. // and return us a list of updates which would clean up the state
  172. // on the remote end.
  173. state.cleanup(folder)
  174. // updates := state.cleanup(folder)
  175. // if len(updates) > 0 {
  176. // XXX: Don't send this now, as the only way we've unshared a folder
  177. // is by breaking the connection and reconnecting, hence sending
  178. // forget messages for some random folder currently makes no sense.
  179. // deviceConns[id].DownloadProgress(folder, updates, 0, nil)
  180. // }
  181. }
  182. }
  183. return progressUpdates
  184. }
  185. // CommitConfiguration implements the config.Committer interface
  186. func (t *ProgressEmitter) CommitConfiguration(_, to config.Configuration) bool {
  187. t.mut.Lock()
  188. defer t.mut.Unlock()
  189. newInterval := time.Duration(to.Options.ProgressUpdateIntervalS) * time.Second
  190. if newInterval > 0 {
  191. if t.disabled {
  192. t.disabled = false
  193. slog.Debug("Progress emitter: enabled")
  194. }
  195. if t.interval != newInterval {
  196. t.interval = newInterval
  197. l.Debugln("Progress emitter: updated interval", t.interval)
  198. }
  199. } else if !t.disabled {
  200. t.clearLocked()
  201. t.disabled = true
  202. slog.Debug("Progress emitter: disabled")
  203. }
  204. t.minBlocks = to.Options.TempIndexMinBlocks
  205. if t.interval < time.Second {
  206. // can't happen when we're not disabled, but better safe than sorry.
  207. t.interval = time.Second
  208. }
  209. return true
  210. }
  211. // Register a puller with the emitter which will start broadcasting pullers
  212. // progress.
  213. func (t *ProgressEmitter) Register(s *sharedPullerState) {
  214. t.mut.Lock()
  215. defer t.mut.Unlock()
  216. if t.disabled {
  217. return
  218. }
  219. l.Debugln("progress emitter: registering", s.folder, s.file.Name)
  220. if t.emptyLocked() {
  221. t.timer.Reset(t.interval)
  222. }
  223. if _, ok := t.registry[s.folder]; !ok {
  224. t.registry[s.folder] = make(map[string]*sharedPullerState)
  225. }
  226. t.registry[s.folder][s.file.Name] = s
  227. }
  228. // Deregister a puller which will stop broadcasting pullers state.
  229. func (t *ProgressEmitter) Deregister(s *sharedPullerState) {
  230. t.mut.Lock()
  231. defer t.mut.Unlock()
  232. if t.disabled {
  233. return
  234. }
  235. l.Debugln("progress emitter: deregistering", s.folder, s.file.Name)
  236. delete(t.registry[s.folder], s.file.Name)
  237. }
  238. // BytesCompleted returns the number of bytes completed in the given folder.
  239. func (t *ProgressEmitter) BytesCompleted(folder string) (bytes int64) {
  240. t.mut.Lock()
  241. defer t.mut.Unlock()
  242. for _, s := range t.registry[folder] {
  243. bytes += s.Progress().BytesDone
  244. }
  245. l.Debugf("progress emitter: bytes completed for %s: %d", folder, bytes)
  246. return
  247. }
  248. func (t *ProgressEmitter) String() string {
  249. return fmt.Sprintf("ProgressEmitter@%p", t)
  250. }
  251. func (t *ProgressEmitter) lenRegistry() int {
  252. t.mut.Lock()
  253. defer t.mut.Unlock()
  254. return t.lenRegistryLocked()
  255. }
  256. func (t *ProgressEmitter) lenRegistryLocked() (out int) {
  257. for _, pullers := range t.registry {
  258. out += len(pullers)
  259. }
  260. return out
  261. }
  262. func (t *ProgressEmitter) emptyLocked() bool {
  263. for _, pullers := range t.registry {
  264. if len(pullers) != 0 {
  265. return false
  266. }
  267. }
  268. return true
  269. }
  270. func (t *ProgressEmitter) temporaryIndexSubscribe(conn protocol.Connection, folders []string) {
  271. t.mut.Lock()
  272. defer t.mut.Unlock()
  273. t.connections[conn.DeviceID()] = conn
  274. t.foldersByConns[conn.DeviceID()] = folders
  275. }
  276. func (t *ProgressEmitter) temporaryIndexUnsubscribe(conn protocol.Connection) {
  277. t.mut.Lock()
  278. defer t.mut.Unlock()
  279. delete(t.connections, conn.DeviceID())
  280. delete(t.foldersByConns, conn.DeviceID())
  281. }
  282. func (t *ProgressEmitter) clearLocked() {
  283. for id, state := range t.sentDownloadStates {
  284. conn, ok := t.connections[id]
  285. if !ok {
  286. continue
  287. }
  288. for _, folder := range state.folders() {
  289. if updates := state.cleanup(folder); len(updates) > 0 {
  290. conn.DownloadProgress(context.Background(), &protocol.DownloadProgress{Folder: folder, Updates: updates})
  291. }
  292. }
  293. }
  294. t.registry = make(map[string]map[string]*sharedPullerState)
  295. t.sentDownloadStates = make(map[protocol.DeviceID]*sentDownloadState)
  296. t.connections = make(map[protocol.DeviceID]protocol.Connection)
  297. t.foldersByConns = make(map[protocol.DeviceID][]string)
  298. }