folder_summary.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "fmt"
  9. "strings"
  10. "time"
  11. "github.com/thejerf/suture"
  12. "github.com/syncthing/syncthing/lib/config"
  13. "github.com/syncthing/syncthing/lib/events"
  14. "github.com/syncthing/syncthing/lib/protocol"
  15. "github.com/syncthing/syncthing/lib/sync"
  16. "github.com/syncthing/syncthing/lib/util"
  17. )
  18. const minSummaryInterval = time.Minute
  19. type FolderSummaryService interface {
  20. suture.Service
  21. Summary(folder string) (map[string]interface{}, error)
  22. OnEventRequest()
  23. }
  24. // The folderSummaryService adds summary information events (FolderSummary and
  25. // FolderCompletion) into the event stream at certain intervals.
  26. type folderSummaryService struct {
  27. *suture.Supervisor
  28. cfg config.Wrapper
  29. model Model
  30. id protocol.DeviceID
  31. evLogger events.Logger
  32. immediate chan string
  33. // For keeping track of folders to recalculate for
  34. foldersMut sync.Mutex
  35. folders map[string]struct{}
  36. // For keeping track of when the last event request on the API was
  37. lastEventReq time.Time
  38. lastEventReqMut sync.Mutex
  39. }
  40. func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID, evLogger events.Logger) FolderSummaryService {
  41. service := &folderSummaryService{
  42. Supervisor: suture.New("folderSummaryService", suture.Spec{
  43. PassThroughPanics: true,
  44. }),
  45. cfg: cfg,
  46. model: m,
  47. id: id,
  48. evLogger: evLogger,
  49. immediate: make(chan string),
  50. folders: make(map[string]struct{}),
  51. foldersMut: sync.NewMutex(),
  52. lastEventReqMut: sync.NewMutex(),
  53. }
  54. service.Add(util.AsService(service.listenForUpdates))
  55. service.Add(util.AsService(service.calculateSummaries))
  56. return service
  57. }
  58. func (c *folderSummaryService) String() string {
  59. return fmt.Sprintf("FolderSummaryService@%p", c)
  60. }
  61. func (c *folderSummaryService) Summary(folder string) (map[string]interface{}, error) {
  62. var res = make(map[string]interface{})
  63. errors, err := c.model.FolderErrors(folder)
  64. if err != nil && err != ErrFolderPaused {
  65. // Stats from the db can still be obtained if the folder is just paused
  66. return nil, err
  67. }
  68. res["errors"] = len(errors)
  69. res["pullErrors"] = len(errors) // deprecated
  70. res["invalid"] = "" // Deprecated, retains external API for now
  71. global := c.model.GlobalSize(folder)
  72. res["globalFiles"], res["globalDirectories"], res["globalSymlinks"], res["globalDeleted"], res["globalBytes"], res["globalTotalItems"] = global.Files, global.Directories, global.Symlinks, global.Deleted, global.Bytes, global.TotalItems()
  73. local := c.model.LocalSize(folder)
  74. res["localFiles"], res["localDirectories"], res["localSymlinks"], res["localDeleted"], res["localBytes"], res["localTotalItems"] = local.Files, local.Directories, local.Symlinks, local.Deleted, local.Bytes, local.TotalItems()
  75. need := c.model.NeedSize(folder)
  76. res["needFiles"], res["needDirectories"], res["needSymlinks"], res["needDeletes"], res["needBytes"], res["needTotalItems"] = need.Files, need.Directories, need.Symlinks, need.Deleted, need.Bytes, need.TotalItems()
  77. if c.cfg.Folders()[folder].Type == config.FolderTypeReceiveOnly {
  78. // Add statistics for things that have changed locally in a receive
  79. // only folder.
  80. ro := c.model.ReceiveOnlyChangedSize(folder)
  81. res["receiveOnlyChangedFiles"] = ro.Files
  82. res["receiveOnlyChangedDirectories"] = ro.Directories
  83. res["receiveOnlyChangedSymlinks"] = ro.Symlinks
  84. res["receiveOnlyChangedDeletes"] = ro.Deleted
  85. res["receiveOnlyChangedBytes"] = ro.Bytes
  86. res["receiveOnlyTotalItems"] = ro.TotalItems()
  87. }
  88. res["inSyncFiles"], res["inSyncBytes"] = global.Files-need.Files, global.Bytes-need.Bytes
  89. res["state"], res["stateChanged"], err = c.model.State(folder)
  90. if err != nil {
  91. res["error"] = err.Error()
  92. }
  93. ourSeq, _ := c.model.CurrentSequence(folder)
  94. remoteSeq, _ := c.model.RemoteSequence(folder)
  95. res["version"] = ourSeq + remoteSeq // legacy
  96. res["sequence"] = ourSeq + remoteSeq // new name
  97. ignorePatterns, _, _ := c.model.GetIgnores(folder)
  98. res["ignorePatterns"] = false
  99. for _, line := range ignorePatterns {
  100. if len(line) > 0 && !strings.HasPrefix(line, "//") {
  101. res["ignorePatterns"] = true
  102. break
  103. }
  104. }
  105. err = c.model.WatchError(folder)
  106. if err != nil {
  107. res["watchError"] = err.Error()
  108. }
  109. return res, nil
  110. }
  111. func (c *folderSummaryService) OnEventRequest() {
  112. c.lastEventReqMut.Lock()
  113. c.lastEventReq = time.Now()
  114. c.lastEventReqMut.Unlock()
  115. }
  116. // listenForUpdates subscribes to the event bus and makes note of folders that
  117. // need their data recalculated.
  118. func (c *folderSummaryService) listenForUpdates(stop chan struct{}) {
  119. sub := c.evLogger.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged | events.DownloadProgress)
  120. defer sub.Unsubscribe()
  121. for {
  122. // This loop needs to be fast so we don't miss too many events.
  123. select {
  124. case ev := <-sub.C():
  125. c.processUpdate(ev)
  126. case <-stop:
  127. return
  128. }
  129. }
  130. }
  131. func (c *folderSummaryService) processUpdate(ev events.Event) {
  132. var folder string
  133. switch ev.Type {
  134. case events.DeviceConnected:
  135. // When a device connects we schedule a refresh of all
  136. // folders shared with that device.
  137. data := ev.Data.(map[string]string)
  138. deviceID, _ := protocol.DeviceIDFromString(data["id"])
  139. c.foldersMut.Lock()
  140. nextFolder:
  141. for _, folder := range c.cfg.Folders() {
  142. for _, dev := range folder.Devices {
  143. if dev.DeviceID == deviceID {
  144. c.folders[folder.ID] = struct{}{}
  145. continue nextFolder
  146. }
  147. }
  148. }
  149. c.foldersMut.Unlock()
  150. return
  151. case events.DownloadProgress:
  152. data := ev.Data.(map[string]map[string]*pullerProgress)
  153. c.foldersMut.Lock()
  154. for folder := range data {
  155. c.folders[folder] = struct{}{}
  156. }
  157. c.foldersMut.Unlock()
  158. return
  159. case events.StateChanged:
  160. data := ev.Data.(map[string]interface{})
  161. if !(data["to"].(string) == "idle" && data["from"].(string) == "syncing") {
  162. return
  163. }
  164. // The folder changed to idle from syncing. We should do an
  165. // immediate refresh to update the GUI. The send to
  166. // c.immediate must be nonblocking so that we can continue
  167. // handling events.
  168. folder = data["folder"].(string)
  169. select {
  170. case c.immediate <- folder:
  171. c.foldersMut.Lock()
  172. delete(c.folders, folder)
  173. c.foldersMut.Unlock()
  174. return
  175. default:
  176. // Refresh whenever we do the next summary.
  177. }
  178. default:
  179. // The other events all have a "folder" attribute that they
  180. // affect. Whenever the local or remote index is updated for a
  181. // given folder we make a note of it.
  182. // This folder needs to be refreshed whenever we do the next
  183. // refresh.
  184. folder = ev.Data.(map[string]interface{})["folder"].(string)
  185. }
  186. c.foldersMut.Lock()
  187. c.folders[folder] = struct{}{}
  188. c.foldersMut.Unlock()
  189. }
  190. // calculateSummaries periodically recalculates folder summaries and
  191. // completion percentage, and sends the results on the event bus.
  192. func (c *folderSummaryService) calculateSummaries(stop chan struct{}) {
  193. const pumpInterval = 2 * time.Second
  194. pump := time.NewTimer(pumpInterval)
  195. for {
  196. select {
  197. case <-pump.C:
  198. t0 := time.Now()
  199. for _, folder := range c.foldersToHandle() {
  200. c.sendSummary(folder)
  201. }
  202. // We don't want to spend all our time calculating summaries. Lets
  203. // set an arbitrary limit at not spending more than about 30% of
  204. // our time here...
  205. wait := 2*time.Since(t0) + pumpInterval
  206. pump.Reset(wait)
  207. case folder := <-c.immediate:
  208. c.sendSummary(folder)
  209. case <-stop:
  210. return
  211. }
  212. }
  213. }
  214. // foldersToHandle returns the list of folders needing a summary update, and
  215. // clears the list.
  216. func (c *folderSummaryService) foldersToHandle() []string {
  217. // We only recalculate summaries if someone is listening to events
  218. // (a request to /rest/events has been made within the last
  219. // pingEventInterval).
  220. c.lastEventReqMut.Lock()
  221. last := c.lastEventReq
  222. c.lastEventReqMut.Unlock()
  223. if time.Since(last) > minSummaryInterval {
  224. return nil
  225. }
  226. c.foldersMut.Lock()
  227. res := make([]string, 0, len(c.folders))
  228. for folder := range c.folders {
  229. res = append(res, folder)
  230. delete(c.folders, folder)
  231. }
  232. c.foldersMut.Unlock()
  233. return res
  234. }
  235. // sendSummary send the summary events for a single folder
  236. func (c *folderSummaryService) sendSummary(folder string) {
  237. // The folder summary contains how many bytes, files etc
  238. // are in the folder and how in sync we are.
  239. data, err := c.Summary(folder)
  240. if err != nil {
  241. return
  242. }
  243. c.evLogger.Log(events.FolderSummary, map[string]interface{}{
  244. "folder": folder,
  245. "summary": data,
  246. })
  247. for _, devCfg := range c.cfg.Folders()[folder].Devices {
  248. if devCfg.DeviceID.Equals(c.id) {
  249. // We already know about ourselves.
  250. continue
  251. }
  252. if _, ok := c.model.Connection(devCfg.DeviceID); !ok {
  253. // We're not interested in disconnected devices.
  254. continue
  255. }
  256. // Get completion percentage of this folder for the
  257. // remote device.
  258. comp := c.model.Completion(devCfg.DeviceID, folder).Map()
  259. comp["folder"] = folder
  260. comp["device"] = devCfg.DeviceID.String()
  261. c.evLogger.Log(events.FolderCompletion, comp)
  262. }
  263. }