folder_summary.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. //go:generate -command counterfeiter go run github.com/maxbrunsfeld/counterfeiter/v6
  7. //go:generate counterfeiter -o mocks/folderSummaryService.go --fake-name FolderSummaryService . FolderSummaryService
  8. package model
  9. import (
  10. "context"
  11. "errors"
  12. "fmt"
  13. "strings"
  14. "time"
  15. "github.com/thejerf/suture/v4"
  16. "github.com/syncthing/syncthing/internal/db"
  17. "github.com/syncthing/syncthing/lib/config"
  18. "github.com/syncthing/syncthing/lib/events"
  19. "github.com/syncthing/syncthing/lib/protocol"
  20. "github.com/syncthing/syncthing/lib/svcutil"
  21. "github.com/syncthing/syncthing/lib/sync"
  22. )
  23. type FolderSummaryService interface {
  24. suture.Service
  25. Summary(folder string) (*FolderSummary, error)
  26. }
  27. // The folderSummaryService adds summary information events (FolderSummary and
  28. // FolderCompletion) into the event stream at certain intervals.
  29. type folderSummaryService struct {
  30. *suture.Supervisor
  31. cfg config.Wrapper
  32. model Model
  33. id protocol.DeviceID
  34. evLogger events.Logger
  35. immediate chan string
  36. // For keeping track of folders to recalculate for
  37. foldersMut sync.Mutex
  38. folders map[string]struct{}
  39. }
  40. func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID, evLogger events.Logger) FolderSummaryService {
  41. service := &folderSummaryService{
  42. Supervisor: suture.New("folderSummaryService", svcutil.SpecWithDebugLogger(l)),
  43. cfg: cfg,
  44. model: m,
  45. id: id,
  46. evLogger: evLogger,
  47. immediate: make(chan string),
  48. folders: make(map[string]struct{}),
  49. foldersMut: sync.NewMutex(),
  50. }
  51. service.Add(svcutil.AsService(service.listenForUpdates, fmt.Sprintf("%s/listenForUpdates", service)))
  52. service.Add(svcutil.AsService(service.calculateSummaries, fmt.Sprintf("%s/calculateSummaries", service)))
  53. return service
  54. }
  55. func (c *folderSummaryService) String() string {
  56. return fmt.Sprintf("FolderSummaryService@%p", c)
  57. }
  58. // FolderSummary replaces the previously used map[string]interface{}, and needs
  59. // to keep the structure/naming for api backwards compatibility
  60. type FolderSummary struct {
  61. Errors int `json:"errors"`
  62. PullErrors int `json:"pullErrors"` // deprecated
  63. Invalid string `json:"invalid"` // deprecated
  64. GlobalFiles int `json:"globalFiles"`
  65. GlobalDirectories int `json:"globalDirectories"`
  66. GlobalSymlinks int `json:"globalSymlinks"`
  67. GlobalDeleted int `json:"globalDeleted"`
  68. GlobalBytes int64 `json:"globalBytes"`
  69. GlobalTotalItems int `json:"globalTotalItems"`
  70. LocalFiles int `json:"localFiles"`
  71. LocalDirectories int `json:"localDirectories"`
  72. LocalSymlinks int `json:"localSymlinks"`
  73. LocalDeleted int `json:"localDeleted"`
  74. LocalBytes int64 `json:"localBytes"`
  75. LocalTotalItems int `json:"localTotalItems"`
  76. NeedFiles int `json:"needFiles"`
  77. NeedDirectories int `json:"needDirectories"`
  78. NeedSymlinks int `json:"needSymlinks"`
  79. NeedDeletes int `json:"needDeletes"`
  80. NeedBytes int64 `json:"needBytes"`
  81. NeedTotalItems int `json:"needTotalItems"`
  82. ReceiveOnlyChangedFiles int `json:"receiveOnlyChangedFiles"`
  83. ReceiveOnlyChangedDirectories int `json:"receiveOnlyChangedDirectories"`
  84. ReceiveOnlyChangedSymlinks int `json:"receiveOnlyChangedSymlinks"`
  85. ReceiveOnlyChangedDeletes int `json:"receiveOnlyChangedDeletes"`
  86. ReceiveOnlyChangedBytes int64 `json:"receiveOnlyChangedBytes"`
  87. ReceiveOnlyTotalItems int `json:"receiveOnlyTotalItems"`
  88. InSyncFiles int `json:"inSyncFiles"`
  89. InSyncBytes int64 `json:"inSyncBytes"`
  90. State string `json:"state"`
  91. StateChanged time.Time `json:"stateChanged"`
  92. Error string `json:"error"`
  93. Version int64 `json:"version"` // deprecated
  94. Sequence int64 `json:"sequence"`
  95. RemoteSequence map[protocol.DeviceID]int64 `json:"remoteSequence"`
  96. IgnorePatterns bool `json:"ignorePatterns"`
  97. WatchError string `json:"watchError"`
  98. }
  99. func (c *folderSummaryService) Summary(folder string) (*FolderSummary, error) {
  100. res := new(FolderSummary)
  101. var local, global, need, ro db.Counts
  102. var ourSeq int64
  103. var remoteSeq map[protocol.DeviceID]int64
  104. errs, err := c.model.FolderErrors(folder)
  105. if err == nil {
  106. global, _ = c.model.GlobalSize(folder)
  107. local, _ = c.model.LocalSize(folder, protocol.LocalDeviceID)
  108. need, _ = c.model.NeedSize(folder, protocol.LocalDeviceID)
  109. ro, _ = c.model.ReceiveOnlySize(folder)
  110. ourSeq, _ = c.model.Sequence(folder, protocol.LocalDeviceID)
  111. remoteSeq, _ = c.model.RemoteSequences(folder)
  112. }
  113. // For API backwards compatibility (SyncTrayzor needs it) an empty folder
  114. // summary is returned for not running folders, an error might actually be
  115. // more appropriate
  116. if err != nil && !errors.Is(err, ErrFolderPaused) && !errors.Is(err, ErrFolderNotRunning) {
  117. return nil, err
  118. }
  119. res.Errors = len(errs)
  120. res.PullErrors = len(errs) // deprecated
  121. res.Invalid = "" // Deprecated, retains external API for now
  122. res.GlobalFiles, res.GlobalDirectories, res.GlobalSymlinks, res.GlobalDeleted, res.GlobalBytes, res.GlobalTotalItems = global.Files, global.Directories, global.Symlinks, global.Deleted, global.Bytes, global.TotalItems()
  123. res.LocalFiles, res.LocalDirectories, res.LocalSymlinks, res.LocalDeleted, res.LocalBytes, res.LocalTotalItems = local.Files, local.Directories, local.Symlinks, local.Deleted, local.Bytes, local.TotalItems()
  124. fcfg, haveFcfg := c.cfg.Folder(folder)
  125. if haveFcfg && fcfg.IgnoreDelete {
  126. need.Deleted = 0
  127. }
  128. need.Bytes -= c.model.FolderProgressBytesCompleted(folder)
  129. // This may happen if we are in progress of pulling files that were
  130. // deleted globally after the pull started.
  131. if need.Bytes < 0 {
  132. need.Bytes = 0
  133. }
  134. res.NeedFiles, res.NeedDirectories, res.NeedSymlinks, res.NeedDeletes, res.NeedBytes, res.NeedTotalItems = need.Files, need.Directories, need.Symlinks, need.Deleted, need.Bytes, need.TotalItems()
  135. if haveFcfg && (fcfg.Type == config.FolderTypeReceiveOnly || fcfg.Type == config.FolderTypeReceiveEncrypted) {
  136. // Add statistics for things that have changed locally in a receive
  137. // only or receive encrypted folder.
  138. res.ReceiveOnlyChangedFiles = ro.Files
  139. res.ReceiveOnlyChangedDirectories = ro.Directories
  140. res.ReceiveOnlyChangedSymlinks = ro.Symlinks
  141. res.ReceiveOnlyChangedDeletes = ro.Deleted
  142. res.ReceiveOnlyChangedBytes = ro.Bytes
  143. res.ReceiveOnlyTotalItems = ro.TotalItems()
  144. }
  145. res.InSyncFiles, res.InSyncBytes = global.Files-need.Files, global.Bytes-need.Bytes
  146. res.State, res.StateChanged, err = c.model.State(folder)
  147. if err != nil {
  148. res.Error = err.Error()
  149. }
  150. res.Version = ourSeq // legacy
  151. res.Sequence = ourSeq
  152. res.RemoteSequence = remoteSeq
  153. ignorePatterns, _, _ := c.model.CurrentIgnores(folder)
  154. res.IgnorePatterns = false
  155. for _, line := range ignorePatterns {
  156. if len(line) > 0 && !strings.HasPrefix(line, "//") {
  157. res.IgnorePatterns = true
  158. break
  159. }
  160. }
  161. err = c.model.WatchError(folder)
  162. if err != nil {
  163. res.WatchError = err.Error()
  164. }
  165. return res, nil
  166. }
  167. // listenForUpdates subscribes to the event bus and makes note of folders that
  168. // need their data recalculated.
  169. func (c *folderSummaryService) listenForUpdates(ctx context.Context) error {
  170. sub := c.evLogger.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.ClusterConfigReceived | events.FolderWatchStateChanged | events.DownloadProgress)
  171. defer sub.Unsubscribe()
  172. for {
  173. // This loop needs to be fast so we don't miss too many events.
  174. select {
  175. case ev, ok := <-sub.C():
  176. if !ok {
  177. <-ctx.Done()
  178. return ctx.Err()
  179. }
  180. c.processUpdate(ev)
  181. case <-ctx.Done():
  182. return ctx.Err()
  183. }
  184. }
  185. }
  186. func (c *folderSummaryService) processUpdate(ev events.Event) {
  187. var folder string
  188. switch ev.Type {
  189. case events.DeviceConnected, events.ClusterConfigReceived:
  190. // When a device connects we schedule a refresh of all
  191. // folders shared with that device.
  192. var deviceID protocol.DeviceID
  193. if ev.Type == events.DeviceConnected {
  194. data := ev.Data.(map[string]string)
  195. deviceID, _ = protocol.DeviceIDFromString(data["id"])
  196. } else {
  197. data := ev.Data.(ClusterConfigReceivedEventData)
  198. deviceID = data.Device
  199. }
  200. c.foldersMut.Lock()
  201. nextFolder:
  202. for _, folder := range c.cfg.Folders() {
  203. for _, dev := range folder.Devices {
  204. if dev.DeviceID == deviceID {
  205. c.folders[folder.ID] = struct{}{}
  206. continue nextFolder
  207. }
  208. }
  209. }
  210. c.foldersMut.Unlock()
  211. return
  212. case events.DownloadProgress:
  213. data := ev.Data.(map[string]map[string]*PullerProgress)
  214. c.foldersMut.Lock()
  215. for folder := range data {
  216. c.folders[folder] = struct{}{}
  217. }
  218. c.foldersMut.Unlock()
  219. return
  220. case events.StateChanged:
  221. data := ev.Data.(map[string]interface{})
  222. if data["to"].(string) != "idle" {
  223. return
  224. }
  225. if from := data["from"].(string); from != "syncing" && from != "sync-preparing" {
  226. return
  227. }
  228. // The folder changed to idle from syncing. We should do an
  229. // immediate refresh to update the GUI. The send to
  230. // c.immediate must be nonblocking so that we can continue
  231. // handling events.
  232. folder = data["folder"].(string)
  233. select {
  234. case c.immediate <- folder:
  235. c.foldersMut.Lock()
  236. delete(c.folders, folder)
  237. c.foldersMut.Unlock()
  238. return
  239. default:
  240. // Refresh whenever we do the next summary.
  241. }
  242. default:
  243. // The other events all have a "folder" attribute that they
  244. // affect. Whenever the local or remote index is updated for a
  245. // given folder we make a note of it.
  246. // This folder needs to be refreshed whenever we do the next
  247. // refresh.
  248. folder = ev.Data.(map[string]interface{})["folder"].(string)
  249. }
  250. c.foldersMut.Lock()
  251. c.folders[folder] = struct{}{}
  252. c.foldersMut.Unlock()
  253. }
  254. // calculateSummaries periodically recalculates folder summaries and
  255. // completion percentage, and sends the results on the event bus.
  256. func (c *folderSummaryService) calculateSummaries(ctx context.Context) error {
  257. const pumpInterval = 2 * time.Second
  258. pump := time.NewTimer(pumpInterval)
  259. for {
  260. select {
  261. case <-pump.C:
  262. t0 := time.Now()
  263. for _, folder := range c.foldersToHandle() {
  264. select {
  265. case <-ctx.Done():
  266. return ctx.Err()
  267. default:
  268. }
  269. c.sendSummary(ctx, folder)
  270. }
  271. // We don't want to spend all our time calculating summaries. Lets
  272. // set an arbitrary limit at not spending more than about 30% of
  273. // our time here...
  274. wait := 2*time.Since(t0) + pumpInterval
  275. pump.Reset(wait)
  276. case folder := <-c.immediate:
  277. c.sendSummary(ctx, folder)
  278. case <-ctx.Done():
  279. return ctx.Err()
  280. }
  281. }
  282. }
  283. // foldersToHandle returns the list of folders needing a summary update, and
  284. // clears the list.
  285. func (c *folderSummaryService) foldersToHandle() []string {
  286. c.foldersMut.Lock()
  287. res := make([]string, 0, len(c.folders))
  288. for folder := range c.folders {
  289. res = append(res, folder)
  290. delete(c.folders, folder)
  291. }
  292. c.foldersMut.Unlock()
  293. return res
  294. }
  295. type FolderSummaryEventData struct {
  296. Folder string `json:"folder"`
  297. Summary *FolderSummary `json:"summary"`
  298. }
  299. // sendSummary send the summary events for a single folder
  300. func (c *folderSummaryService) sendSummary(ctx context.Context, folder string) {
  301. // The folder summary contains how many bytes, files etc
  302. // are in the folder and how in sync we are.
  303. data, err := c.Summary(folder)
  304. if err != nil {
  305. return
  306. }
  307. c.evLogger.Log(events.FolderSummary, FolderSummaryEventData{
  308. Folder: folder,
  309. Summary: data,
  310. })
  311. metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeFiles).Set(float64(data.GlobalFiles))
  312. metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeDirectories).Set(float64(data.GlobalDirectories))
  313. metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeSymlinks).Set(float64(data.GlobalSymlinks))
  314. metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeDeleted).Set(float64(data.GlobalDeleted))
  315. metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeBytes).Set(float64(data.GlobalBytes))
  316. metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeFiles).Set(float64(data.LocalFiles))
  317. metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeDirectories).Set(float64(data.LocalDirectories))
  318. metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeSymlinks).Set(float64(data.LocalSymlinks))
  319. metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeDeleted).Set(float64(data.LocalDeleted))
  320. metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeBytes).Set(float64(data.LocalBytes))
  321. metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeFiles).Set(float64(data.NeedFiles))
  322. metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeDirectories).Set(float64(data.NeedDirectories))
  323. metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeSymlinks).Set(float64(data.NeedSymlinks))
  324. metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeDeleted).Set(float64(data.NeedDeletes))
  325. metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeBytes).Set(float64(data.NeedBytes))
  326. for _, devCfg := range c.cfg.Folders()[folder].Devices {
  327. select {
  328. case <-ctx.Done():
  329. return
  330. default:
  331. }
  332. if devCfg.DeviceID.Equals(c.id) {
  333. // We already know about ourselves.
  334. continue
  335. }
  336. // Get completion percentage of this folder for the
  337. // remote device.
  338. comp, err := c.model.Completion(devCfg.DeviceID, folder)
  339. if err != nil {
  340. l.Debugf("Error getting completion for folder %v, device %v: %v", folder, devCfg.DeviceID, err)
  341. continue
  342. }
  343. ev := comp.Map()
  344. ev["folder"] = folder
  345. ev["device"] = devCfg.DeviceID.String()
  346. c.evLogger.Log(events.FolderCompletion, ev)
  347. }
  348. }