folder_summary.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "fmt"
  9. "strings"
  10. "time"
  11. "github.com/syncthing/syncthing/lib/config"
  12. "github.com/syncthing/syncthing/lib/events"
  13. "github.com/syncthing/syncthing/lib/protocol"
  14. "github.com/syncthing/syncthing/lib/sync"
  15. "github.com/thejerf/suture"
  16. )
  17. const minSummaryInterval = time.Minute
  18. type FolderSummaryService interface {
  19. suture.Service
  20. Summary(folder string) (map[string]interface{}, error)
  21. OnEventRequest()
  22. }
  23. // The folderSummaryService adds summary information events (FolderSummary and
  24. // FolderCompletion) into the event stream at certain intervals.
  25. type folderSummaryService struct {
  26. *suture.Supervisor
  27. cfg config.Wrapper
  28. model Model
  29. id protocol.DeviceID
  30. stop chan struct{}
  31. immediate chan string
  32. // For keeping track of folders to recalculate for
  33. foldersMut sync.Mutex
  34. folders map[string]struct{}
  35. // For keeping track of when the last event request on the API was
  36. lastEventReq time.Time
  37. lastEventReqMut sync.Mutex
  38. }
  39. func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID) FolderSummaryService {
  40. service := &folderSummaryService{
  41. Supervisor: suture.New("folderSummaryService", suture.Spec{
  42. PassThroughPanics: true,
  43. }),
  44. cfg: cfg,
  45. model: m,
  46. id: id,
  47. stop: make(chan struct{}),
  48. immediate: make(chan string),
  49. folders: make(map[string]struct{}),
  50. foldersMut: sync.NewMutex(),
  51. lastEventReqMut: sync.NewMutex(),
  52. }
  53. service.Add(serviceFunc(service.listenForUpdates))
  54. service.Add(serviceFunc(service.calculateSummaries))
  55. return service
  56. }
  57. func (c *folderSummaryService) Stop() {
  58. c.Supervisor.Stop()
  59. close(c.stop)
  60. }
  61. func (c *folderSummaryService) String() string {
  62. return fmt.Sprintf("FolderSummaryService@%p", c)
  63. }
  64. func (c *folderSummaryService) Summary(folder string) (map[string]interface{}, error) {
  65. var res = make(map[string]interface{})
  66. errors, err := c.model.FolderErrors(folder)
  67. if err != nil && err != ErrFolderPaused {
  68. // Stats from the db can still be obtained if the folder is just paused
  69. return nil, err
  70. }
  71. res["errors"] = len(errors)
  72. res["pullErrors"] = len(errors) // deprecated
  73. res["invalid"] = "" // Deprecated, retains external API for now
  74. global := c.model.GlobalSize(folder)
  75. res["globalFiles"], res["globalDirectories"], res["globalSymlinks"], res["globalDeleted"], res["globalBytes"], res["globalTotalItems"] = global.Files, global.Directories, global.Symlinks, global.Deleted, global.Bytes, global.TotalItems()
  76. local := c.model.LocalSize(folder)
  77. res["localFiles"], res["localDirectories"], res["localSymlinks"], res["localDeleted"], res["localBytes"], res["localTotalItems"] = local.Files, local.Directories, local.Symlinks, local.Deleted, local.Bytes, local.TotalItems()
  78. need := c.model.NeedSize(folder)
  79. res["needFiles"], res["needDirectories"], res["needSymlinks"], res["needDeletes"], res["needBytes"], res["needTotalItems"] = need.Files, need.Directories, need.Symlinks, need.Deleted, need.Bytes, need.TotalItems()
  80. if c.cfg.Folders()[folder].Type == config.FolderTypeReceiveOnly {
  81. // Add statistics for things that have changed locally in a receive
  82. // only folder.
  83. ro := c.model.ReceiveOnlyChangedSize(folder)
  84. res["receiveOnlyChangedFiles"] = ro.Files
  85. res["receiveOnlyChangedDirectories"] = ro.Directories
  86. res["receiveOnlyChangedSymlinks"] = ro.Symlinks
  87. res["receiveOnlyChangedDeletes"] = ro.Deleted
  88. res["receiveOnlyChangedBytes"] = ro.Bytes
  89. res["receiveOnlyTotalItems"] = ro.TotalItems()
  90. }
  91. res["inSyncFiles"], res["inSyncBytes"] = global.Files-need.Files, global.Bytes-need.Bytes
  92. res["state"], res["stateChanged"], err = c.model.State(folder)
  93. if err != nil {
  94. res["error"] = err.Error()
  95. }
  96. ourSeq, _ := c.model.CurrentSequence(folder)
  97. remoteSeq, _ := c.model.RemoteSequence(folder)
  98. res["version"] = ourSeq + remoteSeq // legacy
  99. res["sequence"] = ourSeq + remoteSeq // new name
  100. ignorePatterns, _, _ := c.model.GetIgnores(folder)
  101. res["ignorePatterns"] = false
  102. for _, line := range ignorePatterns {
  103. if len(line) > 0 && !strings.HasPrefix(line, "//") {
  104. res["ignorePatterns"] = true
  105. break
  106. }
  107. }
  108. err = c.model.WatchError(folder)
  109. if err != nil {
  110. res["watchError"] = err.Error()
  111. }
  112. return res, nil
  113. }
  114. func (c *folderSummaryService) OnEventRequest() {
  115. c.lastEventReqMut.Lock()
  116. c.lastEventReq = time.Now()
  117. c.lastEventReqMut.Unlock()
  118. }
  119. // listenForUpdates subscribes to the event bus and makes note of folders that
  120. // need their data recalculated.
  121. func (c *folderSummaryService) listenForUpdates() {
  122. sub := events.Default.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged)
  123. defer events.Default.Unsubscribe(sub)
  124. for {
  125. // This loop needs to be fast so we don't miss too many events.
  126. select {
  127. case ev := <-sub.C():
  128. if ev.Type == events.DeviceConnected {
  129. // When a device connects we schedule a refresh of all
  130. // folders shared with that device.
  131. data := ev.Data.(map[string]string)
  132. deviceID, _ := protocol.DeviceIDFromString(data["id"])
  133. c.foldersMut.Lock()
  134. nextFolder:
  135. for _, folder := range c.cfg.Folders() {
  136. for _, dev := range folder.Devices {
  137. if dev.DeviceID == deviceID {
  138. c.folders[folder.ID] = struct{}{}
  139. continue nextFolder
  140. }
  141. }
  142. }
  143. c.foldersMut.Unlock()
  144. continue
  145. }
  146. // The other events all have a "folder" attribute that they
  147. // affect. Whenever the local or remote index is updated for a
  148. // given folder we make a note of it.
  149. data := ev.Data.(map[string]interface{})
  150. folder := data["folder"].(string)
  151. switch ev.Type {
  152. case events.StateChanged:
  153. if data["to"].(string) == "idle" && data["from"].(string) == "syncing" {
  154. // The folder changed to idle from syncing. We should do an
  155. // immediate refresh to update the GUI. The send to
  156. // c.immediate must be nonblocking so that we can continue
  157. // handling events.
  158. c.foldersMut.Lock()
  159. select {
  160. case c.immediate <- folder:
  161. delete(c.folders, folder)
  162. default:
  163. c.folders[folder] = struct{}{}
  164. }
  165. c.foldersMut.Unlock()
  166. }
  167. default:
  168. // This folder needs to be refreshed whenever we do the next
  169. // refresh.
  170. c.foldersMut.Lock()
  171. c.folders[folder] = struct{}{}
  172. c.foldersMut.Unlock()
  173. }
  174. case <-c.stop:
  175. return
  176. }
  177. }
  178. }
  179. // calculateSummaries periodically recalculates folder summaries and
  180. // completion percentage, and sends the results on the event bus.
  181. func (c *folderSummaryService) calculateSummaries() {
  182. const pumpInterval = 2 * time.Second
  183. pump := time.NewTimer(pumpInterval)
  184. for {
  185. select {
  186. case <-pump.C:
  187. t0 := time.Now()
  188. for _, folder := range c.foldersToHandle() {
  189. c.sendSummary(folder)
  190. }
  191. // We don't want to spend all our time calculating summaries. Lets
  192. // set an arbitrary limit at not spending more than about 30% of
  193. // our time here...
  194. wait := 2*time.Since(t0) + pumpInterval
  195. pump.Reset(wait)
  196. case folder := <-c.immediate:
  197. c.sendSummary(folder)
  198. case <-c.stop:
  199. return
  200. }
  201. }
  202. }
  203. // foldersToHandle returns the list of folders needing a summary update, and
  204. // clears the list.
  205. func (c *folderSummaryService) foldersToHandle() []string {
  206. // We only recalculate summaries if someone is listening to events
  207. // (a request to /rest/events has been made within the last
  208. // pingEventInterval).
  209. c.lastEventReqMut.Lock()
  210. last := c.lastEventReq
  211. c.lastEventReqMut.Unlock()
  212. if time.Since(last) > minSummaryInterval {
  213. return nil
  214. }
  215. c.foldersMut.Lock()
  216. res := make([]string, 0, len(c.folders))
  217. for folder := range c.folders {
  218. res = append(res, folder)
  219. delete(c.folders, folder)
  220. }
  221. c.foldersMut.Unlock()
  222. return res
  223. }
  224. // sendSummary send the summary events for a single folder
  225. func (c *folderSummaryService) sendSummary(folder string) {
  226. // The folder summary contains how many bytes, files etc
  227. // are in the folder and how in sync we are.
  228. data, err := c.Summary(folder)
  229. if err != nil {
  230. return
  231. }
  232. events.Default.Log(events.FolderSummary, map[string]interface{}{
  233. "folder": folder,
  234. "summary": data,
  235. })
  236. for _, devCfg := range c.cfg.Folders()[folder].Devices {
  237. if devCfg.DeviceID.Equals(c.id) {
  238. // We already know about ourselves.
  239. continue
  240. }
  241. if _, ok := c.model.Connection(devCfg.DeviceID); !ok {
  242. // We're not interested in disconnected devices.
  243. continue
  244. }
  245. // Get completion percentage of this folder for the
  246. // remote device.
  247. comp := c.model.Completion(devCfg.DeviceID, folder).Map()
  248. comp["folder"] = folder
  249. comp["device"] = devCfg.DeviceID.String()
  250. events.Default.Log(events.FolderCompletion, comp)
  251. }
  252. }
  253. // serviceFunc wraps a function to create a suture.Service without stop
  254. // functionality.
  255. type serviceFunc func()
  256. func (f serviceFunc) Serve() { f() }
  257. func (f serviceFunc) Stop() {}