folder_summary.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. //go:generate -command counterfeiter go run github.com/maxbrunsfeld/counterfeiter/v6
  7. //go:generate counterfeiter -o mocks/folderSummaryService.go --fake-name FolderSummaryService . FolderSummaryService
  8. package model
  9. import (
  10. "context"
  11. "fmt"
  12. "strings"
  13. "time"
  14. "github.com/thejerf/suture/v4"
  15. "github.com/syncthing/syncthing/lib/config"
  16. "github.com/syncthing/syncthing/lib/db"
  17. "github.com/syncthing/syncthing/lib/events"
  18. "github.com/syncthing/syncthing/lib/protocol"
  19. "github.com/syncthing/syncthing/lib/svcutil"
  20. "github.com/syncthing/syncthing/lib/sync"
  21. )
  22. const maxDurationSinceLastEventReq = time.Minute
  23. type FolderSummaryService interface {
  24. suture.Service
  25. Summary(folder string) (*FolderSummary, error)
  26. OnEventRequest()
  27. }
  28. // The folderSummaryService adds summary information events (FolderSummary and
  29. // FolderCompletion) into the event stream at certain intervals.
  30. type folderSummaryService struct {
  31. *suture.Supervisor
  32. cfg config.Wrapper
  33. model Model
  34. id protocol.DeviceID
  35. evLogger events.Logger
  36. immediate chan string
  37. // For keeping track of folders to recalculate for
  38. foldersMut sync.Mutex
  39. folders map[string]struct{}
  40. // For keeping track of when the last event request on the API was
  41. lastEventReq time.Time
  42. lastEventReqMut sync.Mutex
  43. }
  44. func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID, evLogger events.Logger) FolderSummaryService {
  45. service := &folderSummaryService{
  46. Supervisor: suture.New("folderSummaryService", svcutil.SpecWithDebugLogger(l)),
  47. cfg: cfg,
  48. model: m,
  49. id: id,
  50. evLogger: evLogger,
  51. immediate: make(chan string),
  52. folders: make(map[string]struct{}),
  53. foldersMut: sync.NewMutex(),
  54. lastEventReqMut: sync.NewMutex(),
  55. }
  56. service.Add(svcutil.AsService(service.listenForUpdates, fmt.Sprintf("%s/listenForUpdates", service)))
  57. service.Add(svcutil.AsService(service.calculateSummaries, fmt.Sprintf("%s/calculateSummaries", service)))
  58. return service
  59. }
  60. func (c *folderSummaryService) String() string {
  61. return fmt.Sprintf("FolderSummaryService@%p", c)
  62. }
  63. // FolderSummary replaces the previously used map[string]interface{}, and needs
  64. // to keep the structure/naming for api backwards compatibility
  65. type FolderSummary struct {
  66. Errors int `json:"errors"`
  67. PullErrors int `json:"pullErrors"` // deprecated
  68. Invalid string `json:"invalid"` // deprecated
  69. GlobalFiles int `json:"globalFiles"`
  70. GlobalDirectories int `json:"globalDirectories"`
  71. GlobalSymlinks int `json:"globalSymlinks"`
  72. GlobalDeleted int `json:"globalDeleted"`
  73. GlobalBytes int64 `json:"globalBytes"`
  74. GlobalTotalItems int `json:"globalTotalItems"`
  75. LocalFiles int `json:"localFiles"`
  76. LocalDirectories int `json:"localDirectories"`
  77. LocalSymlinks int `json:"localSymlinks"`
  78. LocalDeleted int `json:"localDeleted"`
  79. LocalBytes int64 `json:"localBytes"`
  80. LocalTotalItems int `json:"localTotalItems"`
  81. NeedFiles int `json:"needFiles"`
  82. NeedDirectories int `json:"needDirectories"`
  83. NeedSymlinks int `json:"needSymlinks"`
  84. NeedDeletes int `json:"needDeletes"`
  85. NeedBytes int64 `json:"needBytes"`
  86. NeedTotalItems int `json:"needTotalItems"`
  87. ReceiveOnlyChangedFiles int `json:"receiveOnlyChangedFiles"`
  88. ReceiveOnlyChangedDirectories int `json:"receiveOnlyChangedDirectories"`
  89. ReceiveOnlyChangedSymlinks int `json:"receiveOnlyChangedSymlinks"`
  90. ReceiveOnlyChangedDeletes int `json:"receiveOnlyChangedDeletes"`
  91. ReceiveOnlyChangedBytes int64 `json:"receiveOnlyChangedBytes"`
  92. ReceiveOnlyTotalItems int `json:"receiveOnlyTotalItems"`
  93. InSyncFiles int `json:"inSyncFiles"`
  94. InSyncBytes int64 `json:"inSyncBytes"`
  95. State string `json:"state"`
  96. StateChanged time.Time `json:"stateChanged"`
  97. Error string `json:"error"`
  98. Version int64 `json:"version"` // deprecated
  99. Sequence int64 `json:"sequence"`
  100. IgnorePatterns bool `json:"ignorePatterns"`
  101. WatchError string `json:"watchError"`
  102. }
  103. func (c *folderSummaryService) Summary(folder string) (*FolderSummary, error) {
  104. res := new(FolderSummary)
  105. var local, global, need, ro db.Counts
  106. var ourSeq, remoteSeq int64
  107. errors, err := c.model.FolderErrors(folder)
  108. if err == nil {
  109. var snap *db.Snapshot
  110. if snap, err = c.model.DBSnapshot(folder); err == nil {
  111. global = snap.GlobalSize()
  112. local = snap.LocalSize()
  113. need = snap.NeedSize(protocol.LocalDeviceID)
  114. ro = snap.ReceiveOnlyChangedSize()
  115. ourSeq = snap.Sequence(protocol.LocalDeviceID)
  116. remoteSeq = snap.Sequence(protocol.GlobalDeviceID)
  117. snap.Release()
  118. }
  119. }
  120. // For API backwards compatibility (SyncTrayzor needs it) an empty folder
  121. // summary is returned for not running folders, an error might actually be
  122. // more appropriate
  123. if err != nil && err != ErrFolderPaused && err != ErrFolderNotRunning {
  124. return nil, err
  125. }
  126. res.Errors = len(errors)
  127. res.PullErrors = len(errors) // deprecated
  128. res.Invalid = "" // Deprecated, retains external API for now
  129. res.GlobalFiles, res.GlobalDirectories, res.GlobalSymlinks, res.GlobalDeleted, res.GlobalBytes, res.GlobalTotalItems = global.Files, global.Directories, global.Symlinks, global.Deleted, global.Bytes, global.TotalItems()
  130. res.LocalFiles, res.LocalDirectories, res.LocalSymlinks, res.LocalDeleted, res.LocalBytes, res.LocalTotalItems = local.Files, local.Directories, local.Symlinks, local.Deleted, local.Bytes, local.TotalItems()
  131. fcfg, haveFcfg := c.cfg.Folder(folder)
  132. if haveFcfg && fcfg.IgnoreDelete {
  133. need.Deleted = 0
  134. }
  135. need.Bytes -= c.model.FolderProgressBytesCompleted(folder)
  136. // This may happen if we are in progress of pulling files that were
  137. // deleted globally after the pull started.
  138. if need.Bytes < 0 {
  139. need.Bytes = 0
  140. }
  141. res.NeedFiles, res.NeedDirectories, res.NeedSymlinks, res.NeedDeletes, res.NeedBytes, res.NeedTotalItems = need.Files, need.Directories, need.Symlinks, need.Deleted, need.Bytes, need.TotalItems()
  142. if haveFcfg && (fcfg.Type == config.FolderTypeReceiveOnly || fcfg.Type == config.FolderTypeReceiveEncrypted) {
  143. // Add statistics for things that have changed locally in a receive
  144. // only or receive encrypted folder.
  145. res.ReceiveOnlyChangedFiles = ro.Files
  146. res.ReceiveOnlyChangedDirectories = ro.Directories
  147. res.ReceiveOnlyChangedSymlinks = ro.Symlinks
  148. res.ReceiveOnlyChangedDeletes = ro.Deleted
  149. res.ReceiveOnlyChangedBytes = ro.Bytes
  150. res.ReceiveOnlyTotalItems = ro.TotalItems()
  151. }
  152. res.InSyncFiles, res.InSyncBytes = global.Files-need.Files, global.Bytes-need.Bytes
  153. res.State, res.StateChanged, err = c.model.State(folder)
  154. if err != nil {
  155. res.Error = err.Error()
  156. }
  157. res.Version = ourSeq + remoteSeq // legacy
  158. res.Sequence = ourSeq + remoteSeq // new name
  159. ignorePatterns, _, _ := c.model.CurrentIgnores(folder)
  160. res.IgnorePatterns = false
  161. for _, line := range ignorePatterns {
  162. if len(line) > 0 && !strings.HasPrefix(line, "//") {
  163. res.IgnorePatterns = true
  164. break
  165. }
  166. }
  167. err = c.model.WatchError(folder)
  168. if err != nil {
  169. res.WatchError = err.Error()
  170. }
  171. return res, nil
  172. }
  173. func (c *folderSummaryService) OnEventRequest() {
  174. c.lastEventReqMut.Lock()
  175. c.lastEventReq = time.Now()
  176. c.lastEventReqMut.Unlock()
  177. }
  178. // listenForUpdates subscribes to the event bus and makes note of folders that
  179. // need their data recalculated.
  180. func (c *folderSummaryService) listenForUpdates(ctx context.Context) error {
  181. sub := c.evLogger.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged | events.DownloadProgress)
  182. defer sub.Unsubscribe()
  183. for {
  184. // This loop needs to be fast so we don't miss too many events.
  185. select {
  186. case ev, ok := <-sub.C():
  187. if !ok {
  188. <-ctx.Done()
  189. return ctx.Err()
  190. }
  191. c.processUpdate(ev)
  192. case <-ctx.Done():
  193. return ctx.Err()
  194. }
  195. }
  196. }
  197. func (c *folderSummaryService) processUpdate(ev events.Event) {
  198. var folder string
  199. switch ev.Type {
  200. case events.DeviceConnected:
  201. // When a device connects we schedule a refresh of all
  202. // folders shared with that device.
  203. data := ev.Data.(map[string]string)
  204. deviceID, _ := protocol.DeviceIDFromString(data["id"])
  205. c.foldersMut.Lock()
  206. nextFolder:
  207. for _, folder := range c.cfg.Folders() {
  208. for _, dev := range folder.Devices {
  209. if dev.DeviceID == deviceID {
  210. c.folders[folder.ID] = struct{}{}
  211. continue nextFolder
  212. }
  213. }
  214. }
  215. c.foldersMut.Unlock()
  216. return
  217. case events.DownloadProgress:
  218. data := ev.Data.(map[string]map[string]*pullerProgress)
  219. c.foldersMut.Lock()
  220. for folder := range data {
  221. c.folders[folder] = struct{}{}
  222. }
  223. c.foldersMut.Unlock()
  224. return
  225. case events.StateChanged:
  226. data := ev.Data.(map[string]interface{})
  227. if data["to"].(string) != "idle" {
  228. return
  229. }
  230. if from := data["from"].(string); from != "syncing" && from != "sync-preparing" {
  231. return
  232. }
  233. // The folder changed to idle from syncing. We should do an
  234. // immediate refresh to update the GUI. The send to
  235. // c.immediate must be nonblocking so that we can continue
  236. // handling events.
  237. folder = data["folder"].(string)
  238. select {
  239. case c.immediate <- folder:
  240. c.foldersMut.Lock()
  241. delete(c.folders, folder)
  242. c.foldersMut.Unlock()
  243. return
  244. default:
  245. // Refresh whenever we do the next summary.
  246. }
  247. default:
  248. // The other events all have a "folder" attribute that they
  249. // affect. Whenever the local or remote index is updated for a
  250. // given folder we make a note of it.
  251. // This folder needs to be refreshed whenever we do the next
  252. // refresh.
  253. folder = ev.Data.(map[string]interface{})["folder"].(string)
  254. }
  255. c.foldersMut.Lock()
  256. c.folders[folder] = struct{}{}
  257. c.foldersMut.Unlock()
  258. }
  259. // calculateSummaries periodically recalculates folder summaries and
  260. // completion percentage, and sends the results on the event bus.
  261. func (c *folderSummaryService) calculateSummaries(ctx context.Context) error {
  262. const pumpInterval = 2 * time.Second
  263. pump := time.NewTimer(pumpInterval)
  264. for {
  265. select {
  266. case <-pump.C:
  267. t0 := time.Now()
  268. for _, folder := range c.foldersToHandle() {
  269. select {
  270. case <-ctx.Done():
  271. return ctx.Err()
  272. default:
  273. }
  274. c.sendSummary(ctx, folder)
  275. }
  276. // We don't want to spend all our time calculating summaries. Lets
  277. // set an arbitrary limit at not spending more than about 30% of
  278. // our time here...
  279. wait := 2*time.Since(t0) + pumpInterval
  280. pump.Reset(wait)
  281. case folder := <-c.immediate:
  282. c.sendSummary(ctx, folder)
  283. case <-ctx.Done():
  284. return ctx.Err()
  285. }
  286. }
  287. }
  288. // foldersToHandle returns the list of folders needing a summary update, and
  289. // clears the list.
  290. func (c *folderSummaryService) foldersToHandle() []string {
  291. // We only recalculate summaries if someone is listening to events
  292. // (a request to /rest/events has been made within the last
  293. // pingEventInterval).
  294. c.lastEventReqMut.Lock()
  295. last := c.lastEventReq
  296. c.lastEventReqMut.Unlock()
  297. if time.Since(last) > maxDurationSinceLastEventReq {
  298. return nil
  299. }
  300. c.foldersMut.Lock()
  301. res := make([]string, 0, len(c.folders))
  302. for folder := range c.folders {
  303. res = append(res, folder)
  304. delete(c.folders, folder)
  305. }
  306. c.foldersMut.Unlock()
  307. return res
  308. }
  309. type FolderSummaryEventData struct {
  310. Folder string `json:"folder"`
  311. Summary *FolderSummary `json:"summary"`
  312. }
  313. // sendSummary send the summary events for a single folder
  314. func (c *folderSummaryService) sendSummary(ctx context.Context, folder string) {
  315. // The folder summary contains how many bytes, files etc
  316. // are in the folder and how in sync we are.
  317. data, err := c.Summary(folder)
  318. if err != nil {
  319. return
  320. }
  321. c.evLogger.Log(events.FolderSummary, FolderSummaryEventData{
  322. Folder: folder,
  323. Summary: data,
  324. })
  325. for _, devCfg := range c.cfg.Folders()[folder].Devices {
  326. select {
  327. case <-ctx.Done():
  328. return
  329. default:
  330. }
  331. if devCfg.DeviceID.Equals(c.id) {
  332. // We already know about ourselves.
  333. continue
  334. }
  335. if _, ok := c.model.Connection(devCfg.DeviceID); !ok {
  336. // We're not interested in disconnected devices.
  337. continue
  338. }
  339. // Get completion percentage of this folder for the
  340. // remote device.
  341. comp, err := c.model.Completion(devCfg.DeviceID, folder)
  342. if err != nil {
  343. l.Debugf("Error getting completion for folder %v, device %v: %v", folder, devCfg.DeviceID, err)
  344. continue
  345. }
  346. ev := comp.Map()
  347. ev["folder"] = folder
  348. ev["device"] = devCfg.DeviceID.String()
  349. c.evLogger.Log(events.FolderCompletion, ev)
  350. }
  351. }