aggregator.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. // Copyright (C) 2016 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package watchaggregator
  7. import (
  8. "context"
  9. "fmt"
  10. "path/filepath"
  11. "strings"
  12. "time"
  13. "github.com/syncthing/syncthing/lib/config"
  14. "github.com/syncthing/syncthing/lib/events"
  15. "github.com/syncthing/syncthing/lib/fs"
  16. )
// Limits that control when tracked events are collapsed into a parent path.
// maxFiles is the total number of tracked events at which the entire folder
// is scheduled for scanning instead; it is also the number of direct children
// allowed in the root directory. maxFilesPerDir is the number of direct
// children allowed in any non-root directory before that directory is tracked
// as a whole.
//
// Not meant to be changed, but must be changeable for tests
var (
	maxFiles       = 512
	maxFilesPerDir = 128
)
// aggregatedEvent represents potentially multiple events at and/or recursively
// below one path until it times out and a scan is scheduled.
// If it represents multiple events and there are events of both Remove and
// NonRemove types, the evType attribute is Mixed (as returned by fs.Event.Merge).
type aggregatedEvent struct {
	// firstModTime is the time of the oldest event merged into this one.
	firstModTime time.Time
	// lastModTime is the time of the most recent event merged into this one.
	lastModTime time.Time
	// evType is the merged type of all events represented by this one.
	evType fs.EventType
}
// eventDir is one node of the tree of tracked events. It stores pointers to
// both aggregated events directly within this directory and child directories
// recursively containing aggregated events themselves.
type eventDir struct {
	// events maps a child name to the event tracked at that path.
	events map[string]*aggregatedEvent
	// dirs maps a child name to the subtree of events tracked below that path.
	dirs map[string]*eventDir
}
  37. func newEventDir() *eventDir {
  38. return &eventDir{
  39. events: make(map[string]*aggregatedEvent),
  40. dirs: make(map[string]*eventDir),
  41. }
  42. }
  43. func (dir *eventDir) childCount() int {
  44. return len(dir.events) + len(dir.dirs)
  45. }
  46. func (dir *eventDir) firstModTime() time.Time {
  47. if dir.childCount() == 0 {
  48. panic("bug: firstModTime must not be used on empty eventDir")
  49. }
  50. firstModTime := time.Now()
  51. for _, childDir := range dir.dirs {
  52. dirTime := childDir.firstModTime()
  53. if dirTime.Before(firstModTime) {
  54. firstModTime = dirTime
  55. }
  56. }
  57. for _, event := range dir.events {
  58. if event.firstModTime.Before(firstModTime) {
  59. firstModTime = event.firstModTime
  60. }
  61. }
  62. return firstModTime
  63. }
  64. func (dir *eventDir) eventType() fs.EventType {
  65. if dir.childCount() == 0 {
  66. panic("bug: eventType must not be used on empty eventDir")
  67. }
  68. var evType fs.EventType
  69. for _, childDir := range dir.dirs {
  70. evType |= childDir.eventType()
  71. if evType == fs.Mixed {
  72. return fs.Mixed
  73. }
  74. }
  75. for _, event := range dir.events {
  76. evType |= event.evType
  77. if evType == fs.Mixed {
  78. return fs.Mixed
  79. }
  80. }
  81. return evType
  82. }
// aggregator collects filesystem events for one folder in a tree of
// eventDirs and periodically hands the paths that are due for scanning to
// the output channel given to mainLoop.
type aggregator struct {
	// folderID never changes and is accessed in CommitConfiguration, which
	// asynchronously updates folderCfg -> can't use folderCfg.ID (racy)
	folderID string
	// folderCfg is the folder configuration most recently applied by updateConfig.
	folderCfg config.FolderConfiguration
	// folderCfgUpdate carries new folder configurations from
	// CommitConfiguration to mainLoop, which applies them.
	folderCfgUpdate chan config.FolderConfiguration
	// Time after which an event is scheduled for scanning when no modifications occur.
	notifyDelay time.Duration
	// Time after which an event is scheduled for scanning even though modifications occur.
	notifyTimeout time.Duration
	// notifyTimer is created in mainLoop; when it fires, actOnTimer is run.
	notifyTimer *time.Timer
	// notifyTimerNeedsReset is set when the timer fired while nothing was
	// tracked, so that the next tracked event restarts the timer.
	notifyTimerNeedsReset bool
	// notifyTimerResetChan carries the next timer interval from the notify
	// goroutine to mainLoop.
	notifyTimerResetChan chan time.Duration
	// counts is the number of currently tracked events, split by type.
	counts eventCounter
	// root is the top of the tree of tracked events. An event named "." in
	// root.events means that the entire folder will be scanned.
	root *eventDir
	// ctx stops mainLoop and unblocks pending channel sends when cancelled.
	ctx context.Context
}
// eventCounter holds the number of tracked events, split into events that
// involve a removal and events that do not.
type eventCounter struct {
	removes    int // Includes mixed events.
	nonRemoves int
}
  104. func (c *eventCounter) add(typ fs.EventType, n int) {
  105. if typ&fs.Remove != 0 {
  106. c.removes += n
  107. } else {
  108. c.nonRemoves += n
  109. }
  110. }
  111. func (c *eventCounter) total() int { return c.removes + c.nonRemoves }
  112. func newAggregator(ctx context.Context, folderCfg config.FolderConfiguration) *aggregator {
  113. a := &aggregator{
  114. folderID: folderCfg.ID,
  115. folderCfgUpdate: make(chan config.FolderConfiguration),
  116. notifyTimerNeedsReset: false,
  117. notifyTimerResetChan: make(chan time.Duration),
  118. root: newEventDir(),
  119. ctx: ctx,
  120. }
  121. a.updateConfig(folderCfg)
  122. return a
  123. }
  124. func Aggregate(ctx context.Context, in <-chan fs.Event, out chan<- []string, folderCfg config.FolderConfiguration, cfg config.Wrapper, evLogger events.Logger) {
  125. a := newAggregator(ctx, folderCfg)
  126. // Necessary for unit tests where the backend is mocked
  127. go a.mainLoop(in, out, cfg, evLogger)
  128. }
  129. func (a *aggregator) mainLoop(in <-chan fs.Event, out chan<- []string, cfg config.Wrapper, evLogger events.Logger) {
  130. a.notifyTimer = time.NewTimer(a.notifyDelay)
  131. defer a.notifyTimer.Stop()
  132. inProgressItemSubscription := evLogger.Subscribe(events.ItemStarted | events.ItemFinished)
  133. defer inProgressItemSubscription.Unsubscribe()
  134. cfg.Subscribe(a)
  135. defer cfg.Unsubscribe(a)
  136. inProgress := make(map[string]struct{})
  137. for {
  138. select {
  139. case event := <-in:
  140. a.newEvent(event, inProgress)
  141. case event := <-inProgressItemSubscription.C():
  142. updateInProgressSet(event, inProgress)
  143. case <-a.notifyTimer.C:
  144. a.actOnTimer(out)
  145. case interval := <-a.notifyTimerResetChan:
  146. a.resetNotifyTimer(interval)
  147. case folderCfg := <-a.folderCfgUpdate:
  148. a.updateConfig(folderCfg)
  149. case <-a.ctx.Done():
  150. l.Debugln(a, "Stopped")
  151. return
  152. }
  153. }
  154. }
  155. func (a *aggregator) newEvent(event fs.Event, inProgress map[string]struct{}) {
  156. if _, ok := a.root.events["."]; ok {
  157. l.Debugln(a, "Will scan entire folder anyway; dropping:", event.Name)
  158. return
  159. }
  160. if _, ok := inProgress[event.Name]; ok {
  161. l.Debugln(a, "Skipping path we modified:", event.Name)
  162. return
  163. }
  164. a.aggregateEvent(event, time.Now())
  165. }
  166. func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time) {
  167. if event.Name == "." || a.counts.total() == maxFiles {
  168. l.Debugln(a, "Scan entire folder")
  169. firstModTime := evTime
  170. if a.root.childCount() != 0 {
  171. event.Type = event.Type.Merge(a.root.eventType())
  172. firstModTime = a.root.firstModTime()
  173. }
  174. a.root.dirs = make(map[string]*eventDir)
  175. a.root.events = make(map[string]*aggregatedEvent)
  176. a.root.events["."] = &aggregatedEvent{
  177. firstModTime: firstModTime,
  178. lastModTime: evTime,
  179. evType: event.Type,
  180. }
  181. a.counts = eventCounter{}
  182. a.counts.add(event.Type, 1)
  183. a.resetNotifyTimerIfNeeded()
  184. return
  185. }
  186. parentDir := a.root
  187. // Check if any parent directory is already tracked or will exceed
  188. // events per directory limit bottom up
  189. pathSegments := strings.Split(filepath.ToSlash(event.Name), "/")
  190. // As root dir cannot be further aggregated, allow up to maxFiles
  191. // children.
  192. localMaxFilesPerDir := maxFiles
  193. var currPath string
  194. for i, name := range pathSegments[:len(pathSegments)-1] {
  195. currPath = filepath.Join(currPath, name)
  196. if ev, ok := parentDir.events[name]; ok {
  197. ev.lastModTime = evTime
  198. if merged := event.Type.Merge(ev.evType); ev.evType != merged {
  199. a.counts.add(ev.evType, -1)
  200. a.counts.add(merged, 1)
  201. ev.evType = merged
  202. }
  203. l.Debugf("%v Parent %s (type %s) already tracked: %s", a, currPath, ev.evType, event.Name)
  204. return
  205. }
  206. if parentDir.childCount() == localMaxFilesPerDir {
  207. l.Debugf("%v Parent dir %s already has %d children, tracking it instead: %s", a, currPath, localMaxFilesPerDir, event.Name)
  208. event.Name = filepath.Dir(currPath)
  209. a.aggregateEvent(event, evTime)
  210. return
  211. }
  212. // If there are no events below path, but we need to recurse
  213. // into that path, create eventDir at path.
  214. if newParent, ok := parentDir.dirs[name]; ok {
  215. parentDir = newParent
  216. } else {
  217. l.Debugln(a, "Creating eventDir at:", currPath)
  218. newParent = newEventDir()
  219. parentDir.dirs[name] = newParent
  220. parentDir = newParent
  221. }
  222. // Reset allowed children count to maxFilesPerDir for non-root
  223. if i == 0 {
  224. localMaxFilesPerDir = maxFilesPerDir
  225. }
  226. }
  227. name := pathSegments[len(pathSegments)-1]
  228. if ev, ok := parentDir.events[name]; ok {
  229. ev.lastModTime = evTime
  230. if merged := event.Type.Merge(ev.evType); ev.evType != merged {
  231. a.counts.add(ev.evType, -1)
  232. a.counts.add(merged, 1)
  233. ev.evType = merged
  234. }
  235. l.Debugf("%v Already tracked (type %v): %s", a, ev.evType, event.Name)
  236. return
  237. }
  238. childDir, ok := parentDir.dirs[name]
  239. // If a dir existed at path, it would be removed from dirs, thus
  240. // childCount would not increase.
  241. if !ok && parentDir.childCount() == localMaxFilesPerDir {
  242. l.Debugf("%v Parent dir already has %d children, tracking it instead: %s", a, localMaxFilesPerDir, event.Name)
  243. event.Name = filepath.Dir(event.Name)
  244. a.aggregateEvent(event, evTime)
  245. return
  246. }
  247. firstModTime := evTime
  248. if ok {
  249. firstModTime = childDir.firstModTime()
  250. if merged := event.Type.Merge(childDir.eventType()); event.Type != merged {
  251. a.counts.add(event.Type, -1)
  252. event.Type = merged
  253. }
  254. delete(parentDir.dirs, name)
  255. }
  256. l.Debugf("%v Tracking (type %v): %s", a, event.Type, event.Name)
  257. parentDir.events[name] = &aggregatedEvent{
  258. firstModTime: firstModTime,
  259. lastModTime: evTime,
  260. evType: event.Type,
  261. }
  262. a.counts.add(event.Type, 1)
  263. a.resetNotifyTimerIfNeeded()
  264. }
  265. func (a *aggregator) resetNotifyTimerIfNeeded() {
  266. if a.notifyTimerNeedsReset {
  267. a.resetNotifyTimer(a.notifyDelay)
  268. }
  269. }
  270. // resetNotifyTimer should only ever be called when notifyTimer has stopped
  271. // and notifyTimer.C been read from. Otherwise, call resetNotifyTimerIfNeeded.
  272. func (a *aggregator) resetNotifyTimer(duration time.Duration) {
  273. l.Debugln(a, "Resetting notifyTimer to", duration.String())
  274. a.notifyTimerNeedsReset = false
  275. a.notifyTimer.Reset(duration)
  276. }
  277. func (a *aggregator) actOnTimer(out chan<- []string) {
  278. c := a.counts.total()
  279. if c == 0 {
  280. l.Debugln(a, "No tracked events, waiting for new event.")
  281. a.notifyTimerNeedsReset = true
  282. return
  283. }
  284. oldEvents := make(map[string]*aggregatedEvent, c)
  285. a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), true)
  286. if a.notifyDelay != a.notifyTimeout && a.counts.nonRemoves == 0 && a.counts.removes != 0 {
  287. // Only delayed events remaining, no need to delay them additionally
  288. a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), false)
  289. }
  290. if len(oldEvents) == 0 {
  291. l.Debugln(a, "No old fs events")
  292. a.resetNotifyTimer(a.notifyDelay)
  293. return
  294. }
  295. // Sending to channel might block for a long time, but we need to keep
  296. // reading from notify backend channel to avoid overflow
  297. go a.notify(oldEvents, out)
  298. }
  299. // Schedule scan for given events dispatching deletes last and reset notification
  300. // afterwards to set up for the next scan scheduling.
  301. func (a *aggregator) notify(oldEvents map[string]*aggregatedEvent, out chan<- []string) {
  302. timeBeforeSending := time.Now()
  303. l.Debugf("%v Notifying about %d fs events", a, len(oldEvents))
  304. separatedBatches := make(map[fs.EventType][]string)
  305. for path, event := range oldEvents {
  306. separatedBatches[event.evType] = append(separatedBatches[event.evType], path)
  307. }
  308. for _, evType := range [3]fs.EventType{fs.NonRemove, fs.Mixed, fs.Remove} {
  309. currBatch := separatedBatches[evType]
  310. if len(currBatch) != 0 {
  311. select {
  312. case out <- currBatch:
  313. case <-a.ctx.Done():
  314. return
  315. }
  316. }
  317. }
  318. // If sending to channel blocked for a long time,
  319. // shorten next notifyDelay accordingly.
  320. duration := time.Since(timeBeforeSending)
  321. buffer := time.Millisecond
  322. var nextDelay time.Duration
  323. switch {
  324. case duration < a.notifyDelay/10:
  325. nextDelay = a.notifyDelay
  326. case duration+buffer > a.notifyDelay:
  327. nextDelay = buffer
  328. default:
  329. nextDelay = a.notifyDelay - duration
  330. }
  331. select {
  332. case a.notifyTimerResetChan <- nextDelay:
  333. case <-a.ctx.Done():
  334. }
  335. }
  336. // popOldEvents finds events that should be scheduled for scanning recursively in dirs,
  337. // removes those events and empty eventDirs and returns a map with all the removed
  338. // events referenced by their filesystem path
  339. func (a *aggregator) popOldEventsTo(to map[string]*aggregatedEvent, dir *eventDir, dirPath string, currTime time.Time, delayRem bool) {
  340. for childName, childDir := range dir.dirs {
  341. a.popOldEventsTo(to, childDir, filepath.Join(dirPath, childName), currTime, delayRem)
  342. if childDir.childCount() == 0 {
  343. delete(dir.dirs, childName)
  344. }
  345. }
  346. for name, event := range dir.events {
  347. if a.isOld(event, currTime, delayRem) {
  348. to[filepath.Join(dirPath, name)] = event
  349. delete(dir.events, name)
  350. a.counts.add(event.evType, -1)
  351. }
  352. }
  353. }
  354. func (a *aggregator) isOld(ev *aggregatedEvent, currTime time.Time, delayRem bool) bool {
  355. // Deletes should in general be scanned last, therefore they are delayed by
  356. // letting them time out. This behaviour is overriden by delayRem == false.
  357. // Refer to following comments as to why.
  358. // An event that has not registered any new modifications recently is scanned.
  359. // a.notifyDelay is the user facing value signifying the normal delay between
  360. // picking up a modification and scanning it. As scheduling scans happens at
  361. // regular intervals of a.notifyDelay the delay of a single event is not exactly
  362. // a.notifyDelay, but lies in the range of 0.5 to 1.5 times a.notifyDelay.
  363. if (!delayRem || ev.evType == fs.NonRemove) && 2*currTime.Sub(ev.lastModTime) > a.notifyDelay {
  364. return true
  365. }
  366. // When an event registers repeat modifications or involves removals it
  367. // is delayed to reduce resource usage, but after a certain time (notifyTimeout)
  368. // passed it is scanned anyway.
  369. // If only removals are remaining to be scanned, there is no point to delay
  370. // removals further, so this behaviour is overriden by delayRem == false.
  371. return currTime.Sub(ev.firstModTime) > a.notifyTimeout
  372. }
  373. func (a *aggregator) String() string {
  374. return fmt.Sprintf("aggregator/%s:", a.folderCfg.Description())
  375. }
// VerifyConfiguration accepts every configuration change: it ignores both
// arguments and always returns nil.
// NOTE(review): presumably required by the interface that cfg.Subscribe
// expects in mainLoop; confirm against the config package.
func (a *aggregator) VerifyConfiguration(from, to config.Configuration) error {
	return nil
}
  379. func (a *aggregator) CommitConfiguration(from, to config.Configuration) bool {
  380. for _, folderCfg := range to.Folders {
  381. if folderCfg.ID == a.folderID {
  382. select {
  383. case a.folderCfgUpdate <- folderCfg:
  384. case <-a.ctx.Done():
  385. }
  386. return true
  387. }
  388. }
  389. // Nothing to do, model will soon stop this
  390. return true
  391. }
  392. func (a *aggregator) updateConfig(folderCfg config.FolderConfiguration) {
  393. a.notifyDelay = time.Duration(folderCfg.FSWatcherDelayS) * time.Second
  394. a.notifyTimeout = notifyTimeout(folderCfg.FSWatcherDelayS)
  395. a.folderCfg = folderCfg
  396. }
  397. func updateInProgressSet(event events.Event, inProgress map[string]struct{}) {
  398. if event.Type == events.ItemStarted {
  399. path := event.Data.(map[string]string)["item"]
  400. inProgress[path] = struct{}{}
  401. } else if event.Type == events.ItemFinished {
  402. path := event.Data.(map[string]interface{})["item"].(string)
  403. delete(inProgress, path)
  404. }
  405. }
  406. // Events that involve removals or continuously receive new modifications are
  407. // delayed but must time out at some point. The following numbers come out of thin
  408. // air, they were just considered as a sensible compromise between fast updates and
  409. // saving resources. For short delays the timeout is 6 times the delay, capped at 1
  410. // minute. For delays longer than 1 minute, the delay and timeout are equal.
  411. func notifyTimeout(eventDelayS int) time.Duration {
  412. shortDelayS := 10
  413. shortDelayMultiplicator := 6
  414. longDelayS := 60
  415. longDelayTimeout := time.Duration(1) * time.Minute
  416. if eventDelayS < shortDelayS {
  417. return time.Duration(eventDelayS*shortDelayMultiplicator) * time.Second
  418. }
  419. if eventDelayS < longDelayS {
  420. return longDelayTimeout
  421. }
  422. return time.Duration(eventDelayS) * time.Second
  423. }