summaryservice.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package main
  7. import (
  8. "time"
  9. "github.com/syncthing/syncthing/lib/events"
  10. "github.com/syncthing/syncthing/lib/protocol"
  11. "github.com/syncthing/syncthing/lib/sync"
  12. "github.com/thejerf/suture"
  13. )
  14. // The folderSummaryService adds summary information events (FolderSummary and
  15. // FolderCompletion) into the event stream at certain intervals.
  16. type folderSummaryService struct {
  17. *suture.Supervisor
  18. cfg configIntf
  19. model modelIntf
  20. stop chan struct{}
  21. immediate chan string
  22. // For keeping track of folders to recalculate for
  23. foldersMut sync.Mutex
  24. folders map[string]struct{}
  25. // For keeping track of when the last event request on the API was
  26. lastEventReq time.Time
  27. lastEventReqMut sync.Mutex
  28. }
  29. func newFolderSummaryService(cfg configIntf, m modelIntf) *folderSummaryService {
  30. service := &folderSummaryService{
  31. Supervisor: suture.New("folderSummaryService", suture.Spec{
  32. PassThroughPanics: true,
  33. }),
  34. cfg: cfg,
  35. model: m,
  36. stop: make(chan struct{}),
  37. immediate: make(chan string),
  38. folders: make(map[string]struct{}),
  39. foldersMut: sync.NewMutex(),
  40. lastEventReqMut: sync.NewMutex(),
  41. }
  42. service.Add(serviceFunc(service.listenForUpdates))
  43. service.Add(serviceFunc(service.calculateSummaries))
  44. return service
  45. }
  46. func (c *folderSummaryService) Stop() {
  47. c.Supervisor.Stop()
  48. close(c.stop)
  49. }
  50. // listenForUpdates subscribes to the event bus and makes note of folders that
  51. // need their data recalculated.
  52. func (c *folderSummaryService) listenForUpdates() {
  53. sub := events.Default.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged)
  54. defer events.Default.Unsubscribe(sub)
  55. for {
  56. // This loop needs to be fast so we don't miss too many events.
  57. select {
  58. case ev := <-sub.C():
  59. if ev.Type == events.DeviceConnected {
  60. // When a device connects we schedule a refresh of all
  61. // folders shared with that device.
  62. data := ev.Data.(map[string]string)
  63. deviceID, _ := protocol.DeviceIDFromString(data["id"])
  64. c.foldersMut.Lock()
  65. nextFolder:
  66. for _, folder := range c.cfg.Folders() {
  67. for _, dev := range folder.Devices {
  68. if dev.DeviceID == deviceID {
  69. c.folders[folder.ID] = struct{}{}
  70. continue nextFolder
  71. }
  72. }
  73. }
  74. c.foldersMut.Unlock()
  75. continue
  76. }
  77. // The other events all have a "folder" attribute that they
  78. // affect. Whenever the local or remote index is updated for a
  79. // given folder we make a note of it.
  80. data := ev.Data.(map[string]interface{})
  81. folder := data["folder"].(string)
  82. switch ev.Type {
  83. case events.StateChanged:
  84. if data["to"].(string) == "idle" && data["from"].(string) == "syncing" {
  85. // The folder changed to idle from syncing. We should do an
  86. // immediate refresh to update the GUI. The send to
  87. // c.immediate must be nonblocking so that we can continue
  88. // handling events.
  89. c.foldersMut.Lock()
  90. select {
  91. case c.immediate <- folder:
  92. delete(c.folders, folder)
  93. default:
  94. c.folders[folder] = struct{}{}
  95. }
  96. c.foldersMut.Unlock()
  97. }
  98. default:
  99. // This folder needs to be refreshed whenever we do the next
  100. // refresh.
  101. c.foldersMut.Lock()
  102. c.folders[folder] = struct{}{}
  103. c.foldersMut.Unlock()
  104. }
  105. case <-c.stop:
  106. return
  107. }
  108. }
  109. }
  110. // calculateSummaries periodically recalculates folder summaries and
  111. // completion percentage, and sends the results on the event bus.
  112. func (c *folderSummaryService) calculateSummaries() {
  113. const pumpInterval = 2 * time.Second
  114. pump := time.NewTimer(pumpInterval)
  115. for {
  116. select {
  117. case <-pump.C:
  118. t0 := time.Now()
  119. for _, folder := range c.foldersToHandle() {
  120. c.sendSummary(folder)
  121. }
  122. // We don't want to spend all our time calculating summaries. Lets
  123. // set an arbitrary limit at not spending more than about 30% of
  124. // our time here...
  125. wait := 2*time.Since(t0) + pumpInterval
  126. pump.Reset(wait)
  127. case folder := <-c.immediate:
  128. c.sendSummary(folder)
  129. case <-c.stop:
  130. return
  131. }
  132. }
  133. }
  134. // foldersToHandle returns the list of folders needing a summary update, and
  135. // clears the list.
  136. func (c *folderSummaryService) foldersToHandle() []string {
  137. // We only recalculate summaries if someone is listening to events
  138. // (a request to /rest/events has been made within the last
  139. // pingEventInterval).
  140. c.lastEventReqMut.Lock()
  141. last := c.lastEventReq
  142. c.lastEventReqMut.Unlock()
  143. if time.Since(last) > defaultEventTimeout {
  144. return nil
  145. }
  146. c.foldersMut.Lock()
  147. res := make([]string, 0, len(c.folders))
  148. for folder := range c.folders {
  149. res = append(res, folder)
  150. delete(c.folders, folder)
  151. }
  152. c.foldersMut.Unlock()
  153. return res
  154. }
  155. // sendSummary send the summary events for a single folder
  156. func (c *folderSummaryService) sendSummary(folder string) {
  157. // The folder summary contains how many bytes, files etc
  158. // are in the folder and how in sync we are.
  159. data, err := folderSummary(c.cfg, c.model, folder)
  160. if err != nil {
  161. return
  162. }
  163. events.Default.Log(events.FolderSummary, map[string]interface{}{
  164. "folder": folder,
  165. "summary": data,
  166. })
  167. for _, devCfg := range c.cfg.Folders()[folder].Devices {
  168. if devCfg.DeviceID.Equals(myID) {
  169. // We already know about ourselves.
  170. continue
  171. }
  172. if _, ok := c.model.Connection(devCfg.DeviceID); !ok {
  173. // We're not interested in disconnected devices.
  174. continue
  175. }
  176. // Get completion percentage of this folder for the
  177. // remote device.
  178. comp := jsonCompletion(c.model.Completion(devCfg.DeviceID, folder))
  179. comp["folder"] = folder
  180. comp["device"] = devCfg.DeviceID.String()
  181. events.Default.Log(events.FolderCompletion, comp)
  182. }
  183. }
  184. func (c *folderSummaryService) gotEventRequest() {
  185. c.lastEventReqMut.Lock()
  186. c.lastEventReq = time.Now()
  187. c.lastEventReqMut.Unlock()
  188. }
  189. // serviceFunc wraps a function to create a suture.Service without stop
  190. // functionality.
  191. type serviceFunc func()
  192. func (f serviceFunc) Serve() { f() }
  193. func (f serviceFunc) Stop() {}