folder_summary.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. //go:generate counterfeiter -o mocks/folderSummaryService.go --fake-name FolderSummaryService . FolderSummaryService
  7. package model
  8. import (
  9. "context"
  10. "fmt"
  11. "strings"
  12. "time"
  13. "github.com/thejerf/suture/v4"
  14. "github.com/syncthing/syncthing/lib/config"
  15. "github.com/syncthing/syncthing/lib/db"
  16. "github.com/syncthing/syncthing/lib/events"
  17. "github.com/syncthing/syncthing/lib/protocol"
  18. "github.com/syncthing/syncthing/lib/svcutil"
  19. "github.com/syncthing/syncthing/lib/sync"
  20. )
  21. const maxDurationSinceLastEventReq = time.Minute
  22. type FolderSummaryService interface {
  23. suture.Service
  24. Summary(folder string) (map[string]interface{}, error)
  25. OnEventRequest()
  26. }
  27. // The folderSummaryService adds summary information events (FolderSummary and
  28. // FolderCompletion) into the event stream at certain intervals.
  29. type folderSummaryService struct {
  30. *suture.Supervisor
  31. cfg config.Wrapper
  32. model Model
  33. id protocol.DeviceID
  34. evLogger events.Logger
  35. immediate chan string
  36. // For keeping track of folders to recalculate for
  37. foldersMut sync.Mutex
  38. folders map[string]struct{}
  39. // For keeping track of when the last event request on the API was
  40. lastEventReq time.Time
  41. lastEventReqMut sync.Mutex
  42. }
  43. func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID, evLogger events.Logger) FolderSummaryService {
  44. service := &folderSummaryService{
  45. Supervisor: suture.New("folderSummaryService", svcutil.SpecWithDebugLogger(l)),
  46. cfg: cfg,
  47. model: m,
  48. id: id,
  49. evLogger: evLogger,
  50. immediate: make(chan string),
  51. folders: make(map[string]struct{}),
  52. foldersMut: sync.NewMutex(),
  53. lastEventReqMut: sync.NewMutex(),
  54. }
  55. service.Add(svcutil.AsService(service.listenForUpdates, fmt.Sprintf("%s/listenForUpdates", service)))
  56. service.Add(svcutil.AsService(service.calculateSummaries, fmt.Sprintf("%s/calculateSummaries", service)))
  57. return service
  58. }
  59. func (c *folderSummaryService) String() string {
  60. return fmt.Sprintf("FolderSummaryService@%p", c)
  61. }
  62. func (c *folderSummaryService) Summary(folder string) (map[string]interface{}, error) {
  63. var res = make(map[string]interface{})
  64. var local, global, need, ro db.Counts
  65. var ourSeq, remoteSeq int64
  66. errors, err := c.model.FolderErrors(folder)
  67. if err == nil {
  68. var snap *db.Snapshot
  69. if snap, err = c.model.DBSnapshot(folder); err == nil {
  70. global = snap.GlobalSize()
  71. local = snap.LocalSize()
  72. need = snap.NeedSize(protocol.LocalDeviceID)
  73. ro = snap.ReceiveOnlyChangedSize()
  74. ourSeq = snap.Sequence(protocol.LocalDeviceID)
  75. remoteSeq = snap.Sequence(protocol.GlobalDeviceID)
  76. snap.Release()
  77. }
  78. }
  79. // For API backwards compatibility (SyncTrayzor needs it) an empty folder
  80. // summary is returned for not running folders, an error might actually be
  81. // more appropriate
  82. if err != nil && err != ErrFolderPaused && err != errFolderNotRunning {
  83. return nil, err
  84. }
  85. res["errors"] = len(errors)
  86. res["pullErrors"] = len(errors) // deprecated
  87. res["invalid"] = "" // Deprecated, retains external API for now
  88. res["globalFiles"], res["globalDirectories"], res["globalSymlinks"], res["globalDeleted"], res["globalBytes"], res["globalTotalItems"] = global.Files, global.Directories, global.Symlinks, global.Deleted, global.Bytes, global.TotalItems()
  89. res["localFiles"], res["localDirectories"], res["localSymlinks"], res["localDeleted"], res["localBytes"], res["localTotalItems"] = local.Files, local.Directories, local.Symlinks, local.Deleted, local.Bytes, local.TotalItems()
  90. fcfg, haveFcfg := c.cfg.Folder(folder)
  91. if haveFcfg && fcfg.IgnoreDelete {
  92. need.Deleted = 0
  93. }
  94. need.Bytes -= c.model.FolderProgressBytesCompleted(folder)
  95. // This may happen if we are in progress of pulling files that were
  96. // deleted globally after the pull started.
  97. if need.Bytes < 0 {
  98. need.Bytes = 0
  99. }
  100. res["needFiles"], res["needDirectories"], res["needSymlinks"], res["needDeletes"], res["needBytes"], res["needTotalItems"] = need.Files, need.Directories, need.Symlinks, need.Deleted, need.Bytes, need.TotalItems()
  101. if haveFcfg && (fcfg.Type == config.FolderTypeReceiveOnly || fcfg.Type == config.FolderTypeReceiveEncrypted) {
  102. // Add statistics for things that have changed locally in a receive
  103. // only or receive encrypted folder.
  104. res["receiveOnlyChangedFiles"] = ro.Files
  105. res["receiveOnlyChangedDirectories"] = ro.Directories
  106. res["receiveOnlyChangedSymlinks"] = ro.Symlinks
  107. res["receiveOnlyChangedDeletes"] = ro.Deleted
  108. res["receiveOnlyChangedBytes"] = ro.Bytes
  109. res["receiveOnlyTotalItems"] = ro.TotalItems()
  110. }
  111. res["inSyncFiles"], res["inSyncBytes"] = global.Files-need.Files, global.Bytes-need.Bytes
  112. res["state"], res["stateChanged"], err = c.model.State(folder)
  113. if err != nil {
  114. res["error"] = err.Error()
  115. }
  116. res["version"] = ourSeq + remoteSeq // legacy
  117. res["sequence"] = ourSeq + remoteSeq // new name
  118. ignorePatterns, _, _ := c.model.CurrentIgnores(folder)
  119. res["ignorePatterns"] = false
  120. for _, line := range ignorePatterns {
  121. if len(line) > 0 && !strings.HasPrefix(line, "//") {
  122. res["ignorePatterns"] = true
  123. break
  124. }
  125. }
  126. err = c.model.WatchError(folder)
  127. if err != nil {
  128. res["watchError"] = err.Error()
  129. }
  130. return res, nil
  131. }
  132. func (c *folderSummaryService) OnEventRequest() {
  133. c.lastEventReqMut.Lock()
  134. c.lastEventReq = time.Now()
  135. c.lastEventReqMut.Unlock()
  136. }
  137. // listenForUpdates subscribes to the event bus and makes note of folders that
  138. // need their data recalculated.
  139. func (c *folderSummaryService) listenForUpdates(ctx context.Context) error {
  140. sub := c.evLogger.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged | events.DownloadProgress)
  141. defer sub.Unsubscribe()
  142. for {
  143. // This loop needs to be fast so we don't miss too many events.
  144. select {
  145. case ev := <-sub.C():
  146. c.processUpdate(ev)
  147. case <-ctx.Done():
  148. return ctx.Err()
  149. }
  150. }
  151. }
  152. func (c *folderSummaryService) processUpdate(ev events.Event) {
  153. var folder string
  154. switch ev.Type {
  155. case events.DeviceConnected:
  156. // When a device connects we schedule a refresh of all
  157. // folders shared with that device.
  158. data := ev.Data.(map[string]string)
  159. deviceID, _ := protocol.DeviceIDFromString(data["id"])
  160. c.foldersMut.Lock()
  161. nextFolder:
  162. for _, folder := range c.cfg.Folders() {
  163. for _, dev := range folder.Devices {
  164. if dev.DeviceID == deviceID {
  165. c.folders[folder.ID] = struct{}{}
  166. continue nextFolder
  167. }
  168. }
  169. }
  170. c.foldersMut.Unlock()
  171. return
  172. case events.DownloadProgress:
  173. data := ev.Data.(map[string]map[string]*pullerProgress)
  174. c.foldersMut.Lock()
  175. for folder := range data {
  176. c.folders[folder] = struct{}{}
  177. }
  178. c.foldersMut.Unlock()
  179. return
  180. case events.StateChanged:
  181. data := ev.Data.(map[string]interface{})
  182. if data["to"].(string) != "idle" {
  183. return
  184. }
  185. if from := data["from"].(string); from != "syncing" && from != "sync-preparing" {
  186. return
  187. }
  188. // The folder changed to idle from syncing. We should do an
  189. // immediate refresh to update the GUI. The send to
  190. // c.immediate must be nonblocking so that we can continue
  191. // handling events.
  192. folder = data["folder"].(string)
  193. select {
  194. case c.immediate <- folder:
  195. c.foldersMut.Lock()
  196. delete(c.folders, folder)
  197. c.foldersMut.Unlock()
  198. return
  199. default:
  200. // Refresh whenever we do the next summary.
  201. }
  202. default:
  203. // The other events all have a "folder" attribute that they
  204. // affect. Whenever the local or remote index is updated for a
  205. // given folder we make a note of it.
  206. // This folder needs to be refreshed whenever we do the next
  207. // refresh.
  208. folder = ev.Data.(map[string]interface{})["folder"].(string)
  209. }
  210. c.foldersMut.Lock()
  211. c.folders[folder] = struct{}{}
  212. c.foldersMut.Unlock()
  213. }
  214. // calculateSummaries periodically recalculates folder summaries and
  215. // completion percentage, and sends the results on the event bus.
  216. func (c *folderSummaryService) calculateSummaries(ctx context.Context) error {
  217. const pumpInterval = 2 * time.Second
  218. pump := time.NewTimer(pumpInterval)
  219. for {
  220. select {
  221. case <-pump.C:
  222. t0 := time.Now()
  223. for _, folder := range c.foldersToHandle() {
  224. select {
  225. case <-ctx.Done():
  226. return ctx.Err()
  227. default:
  228. }
  229. c.sendSummary(ctx, folder)
  230. }
  231. // We don't want to spend all our time calculating summaries. Lets
  232. // set an arbitrary limit at not spending more than about 30% of
  233. // our time here...
  234. wait := 2*time.Since(t0) + pumpInterval
  235. pump.Reset(wait)
  236. case folder := <-c.immediate:
  237. c.sendSummary(ctx, folder)
  238. case <-ctx.Done():
  239. return ctx.Err()
  240. }
  241. }
  242. }
  243. // foldersToHandle returns the list of folders needing a summary update, and
  244. // clears the list.
  245. func (c *folderSummaryService) foldersToHandle() []string {
  246. // We only recalculate summaries if someone is listening to events
  247. // (a request to /rest/events has been made within the last
  248. // pingEventInterval).
  249. c.lastEventReqMut.Lock()
  250. last := c.lastEventReq
  251. c.lastEventReqMut.Unlock()
  252. if time.Since(last) > maxDurationSinceLastEventReq {
  253. return nil
  254. }
  255. c.foldersMut.Lock()
  256. res := make([]string, 0, len(c.folders))
  257. for folder := range c.folders {
  258. res = append(res, folder)
  259. delete(c.folders, folder)
  260. }
  261. c.foldersMut.Unlock()
  262. return res
  263. }
  264. // sendSummary send the summary events for a single folder
  265. func (c *folderSummaryService) sendSummary(ctx context.Context, folder string) {
  266. // The folder summary contains how many bytes, files etc
  267. // are in the folder and how in sync we are.
  268. data, err := c.Summary(folder)
  269. if err != nil {
  270. return
  271. }
  272. c.evLogger.Log(events.FolderSummary, map[string]interface{}{
  273. "folder": folder,
  274. "summary": data,
  275. })
  276. for _, devCfg := range c.cfg.Folders()[folder].Devices {
  277. select {
  278. case <-ctx.Done():
  279. return
  280. default:
  281. }
  282. if devCfg.DeviceID.Equals(c.id) {
  283. // We already know about ourselves.
  284. continue
  285. }
  286. if _, ok := c.model.Connection(devCfg.DeviceID); !ok {
  287. // We're not interested in disconnected devices.
  288. continue
  289. }
  290. // Get completion percentage of this folder for the
  291. // remote device.
  292. comp := c.model.Completion(devCfg.DeviceID, folder).Map()
  293. comp["folder"] = folder
  294. comp["device"] = devCfg.DeviceID.String()
  295. c.evLogger.Log(events.FolderCompletion, comp)
  296. }
  297. }