aggregator.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. // Copyright (C) 2016 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package watchaggregator
  7. import (
  8. "context"
  9. "fmt"
  10. "path/filepath"
  11. "strings"
  12. "time"
  13. "github.com/syncthing/syncthing/lib/config"
  14. "github.com/syncthing/syncthing/lib/events"
  15. "github.com/syncthing/syncthing/lib/fs"
  16. )
  // Aggregation limits. They are variables rather than constants only so that
  // tests can override them; production code must not modify them.

  // maxFiles is the number of tracked events at which the whole folder is
  // scanned instead, and also the child limit applied to the root directory.
  var maxFiles = 512

  // maxFilesPerDir is the child limit applied to every non-root directory
  // before that directory is tracked as a single event.
  var maxFilesPerDir = 128
  22. // aggregatedEvent represents potentially multiple events at and/or recursively
  23. // below one path until it times out and a scan is scheduled.
  24. type aggregatedEvent struct {
  25. firstModTime time.Time
  26. lastModTime time.Time
  27. evType fs.EventType
  28. }
  29. // Stores pointers to both aggregated events directly within this directory and
  30. // child directories recursively containing aggregated events themselves.
  31. type eventDir struct {
  32. events map[string]*aggregatedEvent
  33. dirs map[string]*eventDir
  34. }
  35. func newEventDir() *eventDir {
  36. return &eventDir{
  37. events: make(map[string]*aggregatedEvent),
  38. dirs: make(map[string]*eventDir),
  39. }
  40. }
  41. func (dir *eventDir) eventCount() int {
  42. count := len(dir.events)
  43. for _, dir := range dir.dirs {
  44. count += dir.eventCount()
  45. }
  46. return count
  47. }
  48. func (dir *eventDir) childCount() int {
  49. return len(dir.events) + len(dir.dirs)
  50. }
  51. func (dir *eventDir) firstModTime() time.Time {
  52. if dir.childCount() == 0 {
  53. panic("bug: firstModTime must not be used on empty eventDir")
  54. }
  55. firstModTime := time.Now()
  56. for _, childDir := range dir.dirs {
  57. dirTime := childDir.firstModTime()
  58. if dirTime.Before(firstModTime) {
  59. firstModTime = dirTime
  60. }
  61. }
  62. for _, event := range dir.events {
  63. if event.firstModTime.Before(firstModTime) {
  64. firstModTime = event.firstModTime
  65. }
  66. }
  67. return firstModTime
  68. }
  69. func (dir *eventDir) eventType() fs.EventType {
  70. if dir.childCount() == 0 {
  71. panic("bug: eventType must not be used on empty eventDir")
  72. }
  73. var evType fs.EventType
  74. for _, childDir := range dir.dirs {
  75. evType |= childDir.eventType()
  76. if evType == fs.Mixed {
  77. return fs.Mixed
  78. }
  79. }
  80. for _, event := range dir.events {
  81. evType |= event.evType
  82. if evType == fs.Mixed {
  83. return fs.Mixed
  84. }
  85. }
  86. return evType
  87. }
  88. type aggregator struct {
  89. // folderID never changes and is accessed in CommitConfiguration, which
  90. // asynchronously updates folderCfg -> can't use folderCfg.ID (racy)
  91. folderID string
  92. folderCfg config.FolderConfiguration
  93. folderCfgUpdate chan config.FolderConfiguration
  94. // Time after which an event is scheduled for scanning when no modifications occur.
  95. notifyDelay time.Duration
  96. // Time after which an event is scheduled for scanning even though modifications occur.
  97. notifyTimeout time.Duration
  98. notifyTimer *time.Timer
  99. notifyTimerNeedsReset bool
  100. notifyTimerResetChan chan time.Duration
  101. ctx context.Context
  102. }
  103. func newAggregator(folderCfg config.FolderConfiguration, ctx context.Context) *aggregator {
  104. a := &aggregator{
  105. folderID: folderCfg.ID,
  106. folderCfgUpdate: make(chan config.FolderConfiguration),
  107. notifyTimerNeedsReset: false,
  108. notifyTimerResetChan: make(chan time.Duration),
  109. ctx: ctx,
  110. }
  111. a.updateConfig(folderCfg)
  112. return a
  113. }
  114. func Aggregate(in <-chan fs.Event, out chan<- []string, folderCfg config.FolderConfiguration, cfg *config.Wrapper, ctx context.Context) {
  115. a := newAggregator(folderCfg, ctx)
  116. // Necessary for unit tests where the backend is mocked
  117. go a.mainLoop(in, out, cfg)
  118. }
  119. func (a *aggregator) mainLoop(in <-chan fs.Event, out chan<- []string, cfg *config.Wrapper) {
  120. a.notifyTimer = time.NewTimer(a.notifyDelay)
  121. defer a.notifyTimer.Stop()
  122. inProgress := make(map[string]struct{})
  123. inProgressItemSubscription := events.Default.Subscribe(events.ItemStarted | events.ItemFinished)
  124. cfg.Subscribe(a)
  125. rootEventDir := newEventDir()
  126. for {
  127. select {
  128. case event := <-in:
  129. a.newEvent(event, rootEventDir, inProgress)
  130. case event := <-inProgressItemSubscription.C():
  131. updateInProgressSet(event, inProgress)
  132. case <-a.notifyTimer.C:
  133. a.actOnTimer(rootEventDir, out)
  134. case interval := <-a.notifyTimerResetChan:
  135. a.resetNotifyTimer(interval)
  136. case folderCfg := <-a.folderCfgUpdate:
  137. a.updateConfig(folderCfg)
  138. case <-a.ctx.Done():
  139. cfg.Unsubscribe(a)
  140. l.Debugln(a, "Stopped")
  141. return
  142. }
  143. }
  144. }
  145. func (a *aggregator) newEvent(event fs.Event, rootEventDir *eventDir, inProgress map[string]struct{}) {
  146. if _, ok := rootEventDir.events["."]; ok {
  147. l.Debugln(a, "Will scan entire folder anyway; dropping:", event.Name)
  148. return
  149. }
  150. if _, ok := inProgress[event.Name]; ok {
  151. l.Debugln(a, "Skipping path we modified:", event.Name)
  152. return
  153. }
  154. a.aggregateEvent(event, time.Now(), rootEventDir)
  155. }
  156. func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventDir *eventDir) {
  157. if event.Name == "." || rootEventDir.eventCount() == maxFiles {
  158. l.Debugln(a, "Scan entire folder")
  159. firstModTime := evTime
  160. if rootEventDir.childCount() != 0 {
  161. event.Type |= rootEventDir.eventType()
  162. firstModTime = rootEventDir.firstModTime()
  163. }
  164. rootEventDir.dirs = make(map[string]*eventDir)
  165. rootEventDir.events = make(map[string]*aggregatedEvent)
  166. rootEventDir.events["."] = &aggregatedEvent{
  167. firstModTime: firstModTime,
  168. lastModTime: evTime,
  169. evType: event.Type,
  170. }
  171. a.resetNotifyTimerIfNeeded()
  172. return
  173. }
  174. parentDir := rootEventDir
  175. // Check if any parent directory is already tracked or will exceed
  176. // events per directory limit bottom up
  177. pathSegments := strings.Split(filepath.ToSlash(event.Name), "/")
  178. // As root dir cannot be further aggregated, allow up to maxFiles
  179. // children.
  180. localMaxFilesPerDir := maxFiles
  181. var currPath string
  182. for i, name := range pathSegments[:len(pathSegments)-1] {
  183. currPath = filepath.Join(currPath, name)
  184. if ev, ok := parentDir.events[name]; ok {
  185. ev.lastModTime = evTime
  186. ev.evType |= event.Type
  187. l.Debugf("%v Parent %s (type %s) already tracked: %s", a, currPath, ev.evType, event.Name)
  188. return
  189. }
  190. if parentDir.childCount() == localMaxFilesPerDir {
  191. l.Debugf("%v Parent dir %s already has %d children, tracking it instead: %s", a, currPath, localMaxFilesPerDir, event.Name)
  192. event.Name = filepath.Dir(currPath)
  193. a.aggregateEvent(event, evTime, rootEventDir)
  194. return
  195. }
  196. // If there are no events below path, but we need to recurse
  197. // into that path, create eventDir at path.
  198. if newParent, ok := parentDir.dirs[name]; ok {
  199. parentDir = newParent
  200. } else {
  201. l.Debugln(a, "Creating eventDir at:", currPath)
  202. newParent = newEventDir()
  203. parentDir.dirs[name] = newParent
  204. parentDir = newParent
  205. }
  206. // Reset allowed children count to maxFilesPerDir for non-root
  207. if i == 0 {
  208. localMaxFilesPerDir = maxFilesPerDir
  209. }
  210. }
  211. name := pathSegments[len(pathSegments)-1]
  212. if ev, ok := parentDir.events[name]; ok {
  213. ev.lastModTime = evTime
  214. ev.evType |= event.Type
  215. l.Debugf("%v Already tracked (type %v): %s", a, ev.evType, event.Name)
  216. return
  217. }
  218. childDir, ok := parentDir.dirs[name]
  219. // If a dir existed at path, it would be removed from dirs, thus
  220. // childCount would not increase.
  221. if !ok && parentDir.childCount() == localMaxFilesPerDir {
  222. l.Debugf("%v Parent dir already has %d children, tracking it instead: %s", a, localMaxFilesPerDir, event.Name)
  223. event.Name = filepath.Dir(event.Name)
  224. a.aggregateEvent(event, evTime, rootEventDir)
  225. return
  226. }
  227. firstModTime := evTime
  228. if ok {
  229. firstModTime = childDir.firstModTime()
  230. event.Type |= childDir.eventType()
  231. delete(parentDir.dirs, name)
  232. }
  233. l.Debugf("%v Tracking (type %v): %s", a, event.Type, event.Name)
  234. parentDir.events[name] = &aggregatedEvent{
  235. firstModTime: firstModTime,
  236. lastModTime: evTime,
  237. evType: event.Type,
  238. }
  239. a.resetNotifyTimerIfNeeded()
  240. }
  241. func (a *aggregator) resetNotifyTimerIfNeeded() {
  242. if a.notifyTimerNeedsReset {
  243. a.resetNotifyTimer(a.notifyDelay)
  244. }
  245. }
  246. // resetNotifyTimer should only ever be called when notifyTimer has stopped
  247. // and notifyTimer.C been read from. Otherwise, call resetNotifyTimerIfNeeded.
  248. func (a *aggregator) resetNotifyTimer(duration time.Duration) {
  249. l.Debugln(a, "Resetting notifyTimer to", duration.String())
  250. a.notifyTimerNeedsReset = false
  251. a.notifyTimer.Reset(duration)
  252. }
  253. func (a *aggregator) actOnTimer(rootEventDir *eventDir, out chan<- []string) {
  254. eventCount := rootEventDir.eventCount()
  255. if eventCount == 0 {
  256. l.Debugln(a, "No tracked events, waiting for new event.")
  257. a.notifyTimerNeedsReset = true
  258. return
  259. }
  260. oldevents := a.popOldEvents(rootEventDir, ".", time.Now())
  261. if len(oldevents) == 0 {
  262. l.Debugln(a, "No old fs events")
  263. a.resetNotifyTimer(a.notifyDelay)
  264. return
  265. }
  266. // Sending to channel might block for a long time, but we need to keep
  267. // reading from notify backend channel to avoid overflow
  268. go a.notify(oldevents, out)
  269. }
  270. // Schedule scan for given events dispatching deletes last and reset notification
  271. // afterwards to set up for the next scan scheduling.
  272. func (a *aggregator) notify(oldEvents map[string]*aggregatedEvent, out chan<- []string) {
  273. timeBeforeSending := time.Now()
  274. l.Debugf("%v Notifying about %d fs events", a, len(oldEvents))
  275. separatedBatches := make(map[fs.EventType][]string)
  276. for path, event := range oldEvents {
  277. separatedBatches[event.evType] = append(separatedBatches[event.evType], path)
  278. }
  279. for _, evType := range [3]fs.EventType{fs.NonRemove, fs.Mixed, fs.Remove} {
  280. currBatch := separatedBatches[evType]
  281. if len(currBatch) != 0 {
  282. select {
  283. case out <- currBatch:
  284. case <-a.ctx.Done():
  285. return
  286. }
  287. }
  288. }
  289. // If sending to channel blocked for a long time,
  290. // shorten next notifyDelay accordingly.
  291. duration := time.Since(timeBeforeSending)
  292. buffer := time.Millisecond
  293. var nextDelay time.Duration
  294. switch {
  295. case duration < a.notifyDelay/10:
  296. nextDelay = a.notifyDelay
  297. case duration+buffer > a.notifyDelay:
  298. nextDelay = buffer
  299. default:
  300. nextDelay = a.notifyDelay - duration
  301. }
  302. select {
  303. case a.notifyTimerResetChan <- nextDelay:
  304. case <-a.ctx.Done():
  305. }
  306. }
  307. // popOldEvents finds events that should be scheduled for scanning recursively in dirs,
  308. // removes those events and empty eventDirs and returns a map with all the removed
  309. // events referenced by their filesystem path
  310. func (a *aggregator) popOldEvents(dir *eventDir, dirPath string, currTime time.Time) map[string]*aggregatedEvent {
  311. oldEvents := make(map[string]*aggregatedEvent)
  312. for childName, childDir := range dir.dirs {
  313. for evPath, event := range a.popOldEvents(childDir, filepath.Join(dirPath, childName), currTime) {
  314. oldEvents[evPath] = event
  315. }
  316. if childDir.childCount() == 0 {
  317. delete(dir.dirs, childName)
  318. }
  319. }
  320. for name, event := range dir.events {
  321. if a.isOld(event, currTime) {
  322. oldEvents[filepath.Join(dirPath, name)] = event
  323. delete(dir.events, name)
  324. }
  325. }
  326. return oldEvents
  327. }
  328. func (a *aggregator) isOld(ev *aggregatedEvent, currTime time.Time) bool {
  329. // Deletes should always be scanned last, therefore they are always
  330. // delayed by letting them time out (see below).
  331. // An event that has not registered any new modifications recently is scanned.
  332. // a.notifyDelay is the user facing value signifying the normal delay between
  333. // a picking up a modification and scanning it. As scheduling scans happens at
  334. // regular intervals of a.notifyDelay the delay of a single event is not exactly
  335. // a.notifyDelay, but lies in in the range of 0.5 to 1.5 times a.notifyDelay.
  336. if ev.evType == fs.NonRemove && 2*currTime.Sub(ev.lastModTime) > a.notifyDelay {
  337. return true
  338. }
  339. // When an event registers repeat modifications or involves removals it
  340. // is delayed to reduce resource usage, but after a certain time (notifyTimeout)
  341. // passed it is scanned anyway.
  342. return currTime.Sub(ev.firstModTime) > a.notifyTimeout
  343. }
  344. func (a *aggregator) String() string {
  345. return fmt.Sprintf("aggregator/%s:", a.folderCfg.Description())
  346. }
  347. func (a *aggregator) VerifyConfiguration(from, to config.Configuration) error {
  348. return nil
  349. }
  350. func (a *aggregator) CommitConfiguration(from, to config.Configuration) bool {
  351. for _, folderCfg := range to.Folders {
  352. if folderCfg.ID == a.folderID {
  353. select {
  354. case a.folderCfgUpdate <- folderCfg:
  355. case <-a.ctx.Done():
  356. }
  357. return true
  358. }
  359. }
  360. // Nothing to do, model will soon stop this
  361. return true
  362. }
  363. func (a *aggregator) updateConfig(folderCfg config.FolderConfiguration) {
  364. a.notifyDelay = time.Duration(folderCfg.FSWatcherDelayS) * time.Second
  365. a.notifyTimeout = notifyTimeout(folderCfg.FSWatcherDelayS)
  366. a.folderCfg = folderCfg
  367. }
  368. func updateInProgressSet(event events.Event, inProgress map[string]struct{}) {
  369. if event.Type == events.ItemStarted {
  370. path := event.Data.(map[string]string)["item"]
  371. inProgress[path] = struct{}{}
  372. } else if event.Type == events.ItemFinished {
  373. path := event.Data.(map[string]interface{})["item"].(string)
  374. delete(inProgress, path)
  375. }
  376. }
  // notifyTimeout returns the timeout belonging to an event delay given in
  // seconds. Events that involve removals or continuously receive new
  // modifications are delayed but must time out at some point. The numbers
  // below come out of thin air; they were considered a sensible compromise
  // between fast updates and saving resources:
  //
  //   - delays below 10 seconds give a timeout of 6 times the delay,
  //   - delays from 10 up to, but not including, 60 seconds give 1 minute,
  //   - delays of 60 seconds and more give a timeout equal to the delay.
  func notifyTimeout(eventDelayS int) time.Duration {
  	const (
  		shortDelayS             = 10
  		shortDelayMultiplicator = 6
  		longDelayS              = 60
  	)

  	switch {
  	case eventDelayS < shortDelayS:
  		return time.Duration(eventDelayS*shortDelayMultiplicator) * time.Second
  	case eventDelayS < longDelayS:
  		return time.Minute
  	default:
  		return time.Duration(eventDelayS) * time.Second
  	}
  }