summaryservice.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package main
  7. import (
  8. "time"
  9. "github.com/syncthing/syncthing/lib/events"
  10. "github.com/syncthing/syncthing/lib/protocol"
  11. "github.com/syncthing/syncthing/lib/sync"
  12. "github.com/thejerf/suture"
  13. )
  14. // The folderSummaryService adds summary information events (FolderSummary and
  15. // FolderCompletion) into the event stream at certain intervals.
  16. type folderSummaryService struct {
  17. *suture.Supervisor
  18. cfg configIntf
  19. model modelIntf
  20. stop chan struct{}
  21. immediate chan string
  22. // For keeping track of folders to recalculate for
  23. foldersMut sync.Mutex
  24. folders map[string]struct{}
  25. // For keeping track of when the last event request on the API was
  26. lastEventReq time.Time
  27. lastEventReqMut sync.Mutex
  28. }
  29. func newFolderSummaryService(cfg configIntf, m modelIntf) *folderSummaryService {
  30. service := &folderSummaryService{
  31. Supervisor: suture.NewSimple("folderSummaryService"),
  32. cfg: cfg,
  33. model: m,
  34. stop: make(chan struct{}),
  35. immediate: make(chan string),
  36. folders: make(map[string]struct{}),
  37. foldersMut: sync.NewMutex(),
  38. lastEventReqMut: sync.NewMutex(),
  39. }
  40. service.Add(serviceFunc(service.listenForUpdates))
  41. service.Add(serviceFunc(service.calculateSummaries))
  42. return service
  43. }
  44. func (c *folderSummaryService) Stop() {
  45. c.Supervisor.Stop()
  46. close(c.stop)
  47. }
  48. // listenForUpdates subscribes to the event bus and makes note of folders that
  49. // need their data recalculated.
  50. func (c *folderSummaryService) listenForUpdates() {
  51. sub := events.Default.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected)
  52. defer events.Default.Unsubscribe(sub)
  53. for {
  54. // This loop needs to be fast so we don't miss too many events.
  55. select {
  56. case ev := <-sub.C():
  57. if ev.Type == events.DeviceConnected {
  58. // When a device connects we schedule a refresh of all
  59. // folders shared with that device.
  60. data := ev.Data.(map[string]string)
  61. deviceID, _ := protocol.DeviceIDFromString(data["id"])
  62. c.foldersMut.Lock()
  63. nextFolder:
  64. for _, folder := range c.cfg.Folders() {
  65. for _, dev := range folder.Devices {
  66. if dev.DeviceID == deviceID {
  67. c.folders[folder.ID] = struct{}{}
  68. continue nextFolder
  69. }
  70. }
  71. }
  72. c.foldersMut.Unlock()
  73. continue
  74. }
  75. // The other events all have a "folder" attribute that they
  76. // affect. Whenever the local or remote index is updated for a
  77. // given folder we make a note of it.
  78. data := ev.Data.(map[string]interface{})
  79. folder := data["folder"].(string)
  80. switch ev.Type {
  81. case events.StateChanged:
  82. if data["to"].(string) == "idle" && data["from"].(string) == "syncing" {
  83. // The folder changed to idle from syncing. We should do an
  84. // immediate refresh to update the GUI. The send to
  85. // c.immediate must be nonblocking so that we can continue
  86. // handling events.
  87. select {
  88. case c.immediate <- folder:
  89. c.foldersMut.Lock()
  90. delete(c.folders, folder)
  91. c.foldersMut.Unlock()
  92. default:
  93. }
  94. }
  95. default:
  96. // This folder needs to be refreshed whenever we do the next
  97. // refresh.
  98. c.foldersMut.Lock()
  99. c.folders[folder] = struct{}{}
  100. c.foldersMut.Unlock()
  101. }
  102. case <-c.stop:
  103. return
  104. }
  105. }
  106. }
  107. // calculateSummaries periodically recalculates folder summaries and
  108. // completion percentage, and sends the results on the event bus.
  109. func (c *folderSummaryService) calculateSummaries() {
  110. const pumpInterval = 2 * time.Second
  111. pump := time.NewTimer(pumpInterval)
  112. for {
  113. select {
  114. case <-pump.C:
  115. t0 := time.Now()
  116. for _, folder := range c.foldersToHandle() {
  117. c.sendSummary(folder)
  118. }
  119. // We don't want to spend all our time calculating summaries. Lets
  120. // set an arbitrary limit at not spending more than about 30% of
  121. // our time here...
  122. wait := 2*time.Since(t0) + pumpInterval
  123. pump.Reset(wait)
  124. case folder := <-c.immediate:
  125. c.sendSummary(folder)
  126. case <-c.stop:
  127. return
  128. }
  129. }
  130. }
  131. // foldersToHandle returns the list of folders needing a summary update, and
  132. // clears the list.
  133. func (c *folderSummaryService) foldersToHandle() []string {
  134. // We only recalculate summaries if someone is listening to events
  135. // (a request to /rest/events has been made within the last
  136. // pingEventInterval).
  137. c.lastEventReqMut.Lock()
  138. last := c.lastEventReq
  139. c.lastEventReqMut.Unlock()
  140. if time.Since(last) > pingEventInterval {
  141. return nil
  142. }
  143. c.foldersMut.Lock()
  144. res := make([]string, 0, len(c.folders))
  145. for folder := range c.folders {
  146. res = append(res, folder)
  147. delete(c.folders, folder)
  148. }
  149. c.foldersMut.Unlock()
  150. return res
  151. }
  152. // sendSummary send the summary events for a single folder
  153. func (c *folderSummaryService) sendSummary(folder string) {
  154. // The folder summary contains how many bytes, files etc
  155. // are in the folder and how in sync we are.
  156. data := folderSummary(c.cfg, c.model, folder)
  157. events.Default.Log(events.FolderSummary, map[string]interface{}{
  158. "folder": folder,
  159. "summary": data,
  160. })
  161. for _, devCfg := range c.cfg.Folders()[folder].Devices {
  162. if devCfg.DeviceID.Equals(myID) {
  163. // We already know about ourselves.
  164. continue
  165. }
  166. if !c.model.ConnectedTo(devCfg.DeviceID) {
  167. // We're not interested in disconnected devices.
  168. continue
  169. }
  170. // Get completion percentage of this folder for the
  171. // remote device.
  172. comp := c.model.Completion(devCfg.DeviceID, folder)
  173. events.Default.Log(events.FolderCompletion, map[string]interface{}{
  174. "folder": folder,
  175. "device": devCfg.DeviceID.String(),
  176. "completion": comp,
  177. })
  178. }
  179. }
  180. func (c *folderSummaryService) gotEventRequest() {
  181. c.lastEventReqMut.Lock()
  182. c.lastEventReq = time.Now()
  183. c.lastEventReqMut.Unlock()
  184. }
  185. // serviceFunc wraps a function to create a suture.Service without stop
  186. // functionality.
  187. type serviceFunc func()
  188. func (f serviceFunc) Serve() { f() }
  189. func (f serviceFunc) Stop() {}