progressemitter.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "context"
  9. "fmt"
  10. "time"
  11. "github.com/thejerf/suture"
  12. "github.com/syncthing/syncthing/lib/config"
  13. "github.com/syncthing/syncthing/lib/events"
  14. "github.com/syncthing/syncthing/lib/protocol"
  15. "github.com/syncthing/syncthing/lib/sync"
  16. "github.com/syncthing/syncthing/lib/util"
  17. )
  18. type ProgressEmitter struct {
  19. suture.Service
  20. registry map[string]map[string]*sharedPullerState // folder: name: puller
  21. interval time.Duration
  22. minBlocks int
  23. sentDownloadStates map[protocol.DeviceID]*sentDownloadState // States representing what we've sent to the other peer via DownloadProgress messages.
  24. connections map[protocol.DeviceID]protocol.Connection
  25. foldersByConns map[protocol.DeviceID][]string
  26. disabled bool
  27. evLogger events.Logger
  28. mut sync.Mutex
  29. timer *time.Timer
  30. }
  31. // NewProgressEmitter creates a new progress emitter which emits
  32. // DownloadProgress events every interval.
  33. func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmitter {
  34. t := &ProgressEmitter{
  35. registry: make(map[string]map[string]*sharedPullerState),
  36. timer: time.NewTimer(time.Millisecond),
  37. sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState),
  38. connections: make(map[protocol.DeviceID]protocol.Connection),
  39. foldersByConns: make(map[protocol.DeviceID][]string),
  40. evLogger: evLogger,
  41. mut: sync.NewMutex(),
  42. }
  43. t.Service = util.AsService(t.serve, t.String())
  44. t.CommitConfiguration(config.Configuration{}, cfg.RawCopy())
  45. cfg.Subscribe(t)
  46. return t
  47. }
  48. // serve starts the progress emitter which starts emitting DownloadProgress
  49. // events as the progress happens.
  50. func (t *ProgressEmitter) serve(ctx context.Context) {
  51. var lastUpdate time.Time
  52. var lastCount, newCount int
  53. for {
  54. select {
  55. case <-ctx.Done():
  56. l.Debugln("progress emitter: stopping")
  57. return
  58. case <-t.timer.C:
  59. t.mut.Lock()
  60. l.Debugln("progress emitter: timer - looking after", len(t.registry))
  61. newLastUpdated := lastUpdate
  62. newCount = t.lenRegistryLocked()
  63. for _, pullers := range t.registry {
  64. for _, puller := range pullers {
  65. if updated := puller.Updated(); updated.After(newLastUpdated) {
  66. newLastUpdated = updated
  67. }
  68. }
  69. }
  70. if !newLastUpdated.Equal(lastUpdate) || newCount != lastCount {
  71. lastUpdate = newLastUpdated
  72. lastCount = newCount
  73. t.sendDownloadProgressEventLocked()
  74. if len(t.connections) > 0 {
  75. t.sendDownloadProgressMessagesLocked(ctx)
  76. }
  77. } else {
  78. l.Debugln("progress emitter: nothing new")
  79. }
  80. if newCount != 0 {
  81. t.timer.Reset(t.interval)
  82. }
  83. t.mut.Unlock()
  84. }
  85. }
  86. }
  87. func (t *ProgressEmitter) sendDownloadProgressEventLocked() {
  88. output := make(map[string]map[string]*pullerProgress)
  89. for folder, pullers := range t.registry {
  90. if len(pullers) == 0 {
  91. continue
  92. }
  93. output[folder] = make(map[string]*pullerProgress)
  94. for name, puller := range pullers {
  95. output[folder][name] = puller.Progress()
  96. }
  97. }
  98. t.evLogger.Log(events.DownloadProgress, output)
  99. l.Debugf("progress emitter: emitting %#v", output)
  100. }
  101. func (t *ProgressEmitter) sendDownloadProgressMessagesLocked(ctx context.Context) {
  102. for id, conn := range t.connections {
  103. for _, folder := range t.foldersByConns[id] {
  104. pullers, ok := t.registry[folder]
  105. if !ok {
  106. // There's never been any puller registered for this folder yet
  107. continue
  108. }
  109. state, ok := t.sentDownloadStates[id]
  110. if !ok {
  111. state = &sentDownloadState{
  112. folderStates: make(map[string]*sentFolderDownloadState),
  113. }
  114. t.sentDownloadStates[id] = state
  115. }
  116. activePullers := make([]*sharedPullerState, 0, len(pullers))
  117. for _, puller := range pullers {
  118. if puller.folder != folder || puller.file.IsSymlink() || puller.file.IsDirectory() || len(puller.file.Blocks) <= t.minBlocks {
  119. continue
  120. }
  121. activePullers = append(activePullers, puller)
  122. }
  123. // For every new puller that hasn't yet been seen, it will send all the blocks the puller has available
  124. // For every existing puller, it will check for new blocks, and send update for the new blocks only
  125. // For every puller that we've seen before but is no longer there, we will send a forget message
  126. updates := state.update(folder, activePullers)
  127. if len(updates) > 0 {
  128. conn.DownloadProgress(ctx, folder, updates)
  129. }
  130. }
  131. }
  132. // Clean up sentDownloadStates for devices which we are no longer connected to.
  133. for id := range t.sentDownloadStates {
  134. _, ok := t.connections[id]
  135. if !ok {
  136. // Null out outstanding entries for device
  137. delete(t.sentDownloadStates, id)
  138. }
  139. }
  140. // If a folder was unshared from some device, tell it that all temp files
  141. // are now gone.
  142. for id, state := range t.sentDownloadStates {
  143. // For each of the folders that the state is aware of,
  144. // try to match it with a shared folder we've discovered above,
  145. nextFolder:
  146. for _, folder := range state.folders() {
  147. for _, existingFolder := range t.foldersByConns[id] {
  148. if existingFolder == folder {
  149. continue nextFolder
  150. }
  151. }
  152. // If we fail to find that folder, we tell the state to forget about it
  153. // and return us a list of updates which would clean up the state
  154. // on the remote end.
  155. state.cleanup(folder)
  156. // updates := state.cleanup(folder)
  157. // if len(updates) > 0 {
  158. // XXX: Don't send this now, as the only way we've unshared a folder
  159. // is by breaking the connection and reconnecting, hence sending
  160. // forget messages for some random folder currently makes no sense.
  161. // deviceConns[id].DownloadProgress(folder, updates, 0, nil)
  162. // }
  163. }
  164. }
  165. }
  166. // VerifyConfiguration implements the config.Committer interface
  167. func (t *ProgressEmitter) VerifyConfiguration(from, to config.Configuration) error {
  168. return nil
  169. }
  170. // CommitConfiguration implements the config.Committer interface
  171. func (t *ProgressEmitter) CommitConfiguration(from, to config.Configuration) bool {
  172. t.mut.Lock()
  173. defer t.mut.Unlock()
  174. switch {
  175. case t.disabled && to.Options.ProgressUpdateIntervalS >= 0:
  176. t.disabled = false
  177. l.Debugln("progress emitter: enabled")
  178. fallthrough
  179. case !t.disabled && from.Options.ProgressUpdateIntervalS != to.Options.ProgressUpdateIntervalS:
  180. t.interval = time.Duration(to.Options.ProgressUpdateIntervalS) * time.Second
  181. if t.interval < time.Second {
  182. t.interval = time.Second
  183. }
  184. l.Debugln("progress emitter: updated interval", t.interval)
  185. case !t.disabled && to.Options.ProgressUpdateIntervalS < 0:
  186. t.clearLocked()
  187. t.disabled = true
  188. l.Debugln("progress emitter: disabled")
  189. }
  190. t.minBlocks = to.Options.TempIndexMinBlocks
  191. return true
  192. }
  193. // Register a puller with the emitter which will start broadcasting pullers
  194. // progress.
  195. func (t *ProgressEmitter) Register(s *sharedPullerState) {
  196. t.mut.Lock()
  197. defer t.mut.Unlock()
  198. if t.disabled {
  199. l.Debugln("progress emitter: disabled, skip registering")
  200. return
  201. }
  202. l.Debugln("progress emitter: registering", s.folder, s.file.Name)
  203. if t.emptyLocked() {
  204. t.timer.Reset(t.interval)
  205. }
  206. if _, ok := t.registry[s.folder]; !ok {
  207. t.registry[s.folder] = make(map[string]*sharedPullerState)
  208. }
  209. t.registry[s.folder][s.file.Name] = s
  210. }
  211. // Deregister a puller which will stop broadcasting pullers state.
  212. func (t *ProgressEmitter) Deregister(s *sharedPullerState) {
  213. t.mut.Lock()
  214. defer t.mut.Unlock()
  215. if t.disabled {
  216. l.Debugln("progress emitter: disabled, skip deregistering")
  217. return
  218. }
  219. l.Debugln("progress emitter: deregistering", s.folder, s.file.Name)
  220. delete(t.registry[s.folder], s.file.Name)
  221. }
  222. // BytesCompleted returns the number of bytes completed in the given folder.
  223. func (t *ProgressEmitter) BytesCompleted(folder string) (bytes int64) {
  224. t.mut.Lock()
  225. defer t.mut.Unlock()
  226. for _, s := range t.registry[folder] {
  227. bytes += s.Progress().BytesDone
  228. }
  229. l.Debugf("progress emitter: bytes completed for %s: %d", folder, bytes)
  230. return
  231. }
  232. func (t *ProgressEmitter) String() string {
  233. return fmt.Sprintf("ProgressEmitter@%p", t)
  234. }
  235. func (t *ProgressEmitter) lenRegistry() int {
  236. t.mut.Lock()
  237. defer t.mut.Unlock()
  238. return t.lenRegistryLocked()
  239. }
  240. func (t *ProgressEmitter) lenRegistryLocked() (out int) {
  241. for _, pullers := range t.registry {
  242. out += len(pullers)
  243. }
  244. return out
  245. }
  246. func (t *ProgressEmitter) emptyLocked() bool {
  247. for _, pullers := range t.registry {
  248. if len(pullers) != 0 {
  249. return false
  250. }
  251. }
  252. return true
  253. }
  254. func (t *ProgressEmitter) temporaryIndexSubscribe(conn protocol.Connection, folders []string) {
  255. t.mut.Lock()
  256. defer t.mut.Unlock()
  257. t.connections[conn.ID()] = conn
  258. t.foldersByConns[conn.ID()] = folders
  259. }
  260. func (t *ProgressEmitter) temporaryIndexUnsubscribe(conn protocol.Connection) {
  261. t.mut.Lock()
  262. defer t.mut.Unlock()
  263. delete(t.connections, conn.ID())
  264. delete(t.foldersByConns, conn.ID())
  265. }
  266. func (t *ProgressEmitter) clearLocked() {
  267. for id, state := range t.sentDownloadStates {
  268. conn, ok := t.connections[id]
  269. if !ok {
  270. continue
  271. }
  272. for _, folder := range state.folders() {
  273. if updates := state.cleanup(folder); len(updates) > 0 {
  274. conn.DownloadProgress(context.Background(), folder, updates)
  275. }
  276. }
  277. }
  278. t.registry = make(map[string]map[string]*sharedPullerState)
  279. t.sentDownloadStates = make(map[protocol.DeviceID]*sentDownloadState)
  280. t.connections = make(map[protocol.DeviceID]protocol.Connection)
  281. t.foldersByConns = make(map[protocol.DeviceID][]string)
  282. }