| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480 | 
							- // Copyright (C) 2016 The Syncthing Authors.
 
- //
 
- // This Source Code Form is subject to the terms of the Mozilla Public
 
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
 
- // You can obtain one at http://mozilla.org/MPL/2.0/.
 
- package watchaggregator
 
- import (
 
- 	"context"
 
- 	"fmt"
 
- 	"path/filepath"
 
- 	"strings"
 
- 	"time"
 
- 	"github.com/syncthing/syncthing/lib/config"
 
- 	"github.com/syncthing/syncthing/lib/events"
 
- 	"github.com/syncthing/syncthing/lib/fs"
 
- )
 
// Not meant to be changed, but must be changeable for tests
var (
	// maxFiles caps the total number of aggregated events tracked for the
	// folder; once reached, everything collapses into a full-folder scan.
	maxFiles = 512
	// maxFilesPerDir caps the events tracked directly below a single
	// non-root directory before the directory itself is tracked instead.
	maxFilesPerDir = 128
)
 
// aggregatedEvent represents potentially multiple events at and/or recursively
// below one path until it times out and a scan is scheduled.
// If it represents multiple events and there are events of both Remove and
// NonRemove types, the evType attribute is Mixed (as returned by fs.Event.Merge).
type aggregatedEvent struct {
	firstModTime time.Time    // when the first merged event was registered
	lastModTime  time.Time    // when the most recent merged event was registered
	evType       fs.EventType // merged type of all events at/below this path
}
 
// Stores pointers to both aggregated events directly within this directory and
// child directories recursively containing aggregated events themselves.
type eventDir struct {
	events map[string]*aggregatedEvent // events keyed by name directly in this dir
	dirs   map[string]*eventDir        // child dirs (by name) that contain events below them
}
 
- func newEventDir() *eventDir {
 
- 	return &eventDir{
 
- 		events: make(map[string]*aggregatedEvent),
 
- 		dirs:   make(map[string]*eventDir),
 
- 	}
 
- }
 
- func (dir *eventDir) childCount() int {
 
- 	return len(dir.events) + len(dir.dirs)
 
- }
 
- func (dir *eventDir) firstModTime() time.Time {
 
- 	if dir.childCount() == 0 {
 
- 		panic("bug: firstModTime must not be used on empty eventDir")
 
- 	}
 
- 	firstModTime := time.Now()
 
- 	for _, childDir := range dir.dirs {
 
- 		dirTime := childDir.firstModTime()
 
- 		if dirTime.Before(firstModTime) {
 
- 			firstModTime = dirTime
 
- 		}
 
- 	}
 
- 	for _, event := range dir.events {
 
- 		if event.firstModTime.Before(firstModTime) {
 
- 			firstModTime = event.firstModTime
 
- 		}
 
- 	}
 
- 	return firstModTime
 
- }
 
- func (dir *eventDir) eventType() fs.EventType {
 
- 	if dir.childCount() == 0 {
 
- 		panic("bug: eventType must not be used on empty eventDir")
 
- 	}
 
- 	var evType fs.EventType
 
- 	for _, childDir := range dir.dirs {
 
- 		evType |= childDir.eventType()
 
- 		if evType == fs.Mixed {
 
- 			return fs.Mixed
 
- 		}
 
- 	}
 
- 	for _, event := range dir.events {
 
- 		evType |= event.evType
 
- 		if evType == fs.Mixed {
 
- 			return fs.Mixed
 
- 		}
 
- 	}
 
- 	return evType
 
- }
 
// aggregator buffers and merges filesystem events for one folder until they
// are due for scanning. All mutable state is owned by the mainLoop goroutine.
type aggregator struct {
	// folderID never changes and is accessed in CommitConfiguration, which
	// asynchronously updates folderCfg -> can't use folderCfg.ID (racy)
	folderID        string
	folderCfg       config.FolderConfiguration
	folderCfgUpdate chan config.FolderConfiguration
	// Time after which an event is scheduled for scanning when no modifications occur.
	notifyDelay time.Duration
	// Time after which an event is scheduled for scanning even though modifications occur.
	notifyTimeout time.Duration
	notifyTimer   *time.Timer
	// notifyTimerNeedsReset is set when the timer fired with nothing to do;
	// the next incoming event restarts it (see resetNotifyTimerIfNeeded).
	notifyTimerNeedsReset bool
	// notifyTimerResetChan lets the notify goroutine request a timer reset
	// from the main loop, which owns the timer.
	notifyTimerResetChan chan time.Duration
	counts               eventCounter
	root                 *eventDir
	ctx                  context.Context
}
 
// eventCounter tallies currently tracked aggregated events, split by whether
// they involve a removal.
type eventCounter struct {
	removes    int // Includes mixed events.
	nonRemoves int
}
 
- func (c *eventCounter) add(typ fs.EventType, n int) {
 
- 	if typ&fs.Remove != 0 {
 
- 		c.removes += n
 
- 	} else {
 
- 		c.nonRemoves += n
 
- 	}
 
- }
 
- func (c *eventCounter) total() int { return c.removes + c.nonRemoves }
 
- func newAggregator(ctx context.Context, folderCfg config.FolderConfiguration) *aggregator {
 
- 	a := &aggregator{
 
- 		folderID:              folderCfg.ID,
 
- 		folderCfgUpdate:       make(chan config.FolderConfiguration),
 
- 		notifyTimerNeedsReset: false,
 
- 		notifyTimerResetChan:  make(chan time.Duration),
 
- 		root:                  newEventDir(),
 
- 		ctx:                   ctx,
 
- 	}
 
- 	a.updateConfig(folderCfg)
 
- 	return a
 
- }
 
// Aggregate starts aggregating the filesystem events received on in, sending
// batches of paths to be scanned on out. It returns immediately; the work
// happens in a goroutine that runs until ctx is cancelled.
func Aggregate(ctx context.Context, in <-chan fs.Event, out chan<- []string, folderCfg config.FolderConfiguration, cfg config.Wrapper, evLogger events.Logger) {
	a := newAggregator(ctx, folderCfg)

	// Necessary for unit tests where the backend is mocked
	go a.mainLoop(in, out, cfg, evLogger)
}
 
// mainLoop is the aggregator's single goroutine: it receives raw filesystem
// events, dispatches due scans when the timer fires, and reacts to config
// updates, running until the context is cancelled. It owns notifyTimer and
// all other mutable aggregator state.
func (a *aggregator) mainLoop(in <-chan fs.Event, out chan<- []string, cfg config.Wrapper, evLogger events.Logger) {
	a.notifyTimer = time.NewTimer(a.notifyDelay)
	defer a.notifyTimer.Stop()

	// Track items currently being synced by us, so that our own
	// modifications are not fed back into scanning.
	inProgressItemSubscription := evLogger.Subscribe(events.ItemStarted | events.ItemFinished)
	defer inProgressItemSubscription.Unsubscribe()

	cfg.Subscribe(a)
	defer cfg.Unsubscribe(a)

	inProgress := make(map[string]struct{})

	for {
		select {
		case event := <-in:
			a.newEvent(event, inProgress)
		case event, ok := <-inProgressItemSubscription.C():
			if ok {
				updateInProgressSet(event, inProgress)
			}
		case <-a.notifyTimer.C:
			a.actOnTimer(out)
		case interval := <-a.notifyTimerResetChan:
			// Requested by the notify goroutine after a (possibly slow)
			// send of scheduled paths; only this loop may touch the timer.
			a.resetNotifyTimer(interval)
		case folderCfg := <-a.folderCfgUpdate:
			a.updateConfig(folderCfg)
		case <-a.ctx.Done():
			l.Debugln(a, "Stopped")
			return
		}
	}
}
 
- func (a *aggregator) newEvent(event fs.Event, inProgress map[string]struct{}) {
 
- 	if _, ok := a.root.events["."]; ok {
 
- 		l.Debugln(a, "Will scan entire folder anyway; dropping:", event.Name)
 
- 		return
 
- 	}
 
- 	if _, ok := inProgress[event.Name]; ok {
 
- 		l.Debugln(a, "Skipping path we modified:", event.Name)
 
- 		return
 
- 	}
 
- 	a.aggregateEvent(event, time.Now())
 
- }
 
// aggregateEvent inserts event into the tracking tree rooted at a.root,
// merging it into an already tracked ancestor where possible and collapsing
// into a directory- or folder-level event when the per-directory or total
// limits are exceeded. evTime is the registration time (passed in so tests
// can control it).
func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time) {
	if event.Name == "." || a.counts.total() == maxFiles {
		// Either the event concerns the folder root itself or the global
		// event limit is reached: replace everything with a single event
		// covering the entire folder.
		l.Debugln(a, "Scan entire folder")
		firstModTime := evTime
		if a.root.childCount() != 0 {
			event.Type = event.Type.Merge(a.root.eventType())
			firstModTime = a.root.firstModTime()
		}
		a.root.dirs = make(map[string]*eventDir)
		a.root.events = make(map[string]*aggregatedEvent)
		a.root.events["."] = &aggregatedEvent{
			firstModTime: firstModTime,
			lastModTime:  evTime,
			evType:       event.Type,
		}
		a.counts = eventCounter{}
		a.counts.add(event.Type, 1)
		a.resetNotifyTimerIfNeeded()
		return
	}

	parentDir := a.root

	// Check if any parent directory is already tracked or will exceed
	// events per directory limit bottom up
	pathSegments := strings.Split(filepath.ToSlash(event.Name), "/")

	// As root dir cannot be further aggregated, allow up to maxFiles
	// children.
	localMaxFilesPerDir := maxFiles
	var currPath string
	for i, name := range pathSegments[:len(pathSegments)-1] {
		currPath = filepath.Join(currPath, name)

		// An ancestor directory is already tracked as a whole: just merge
		// the type and refresh its timestamp, no new entry needed.
		if ev, ok := parentDir.events[name]; ok {
			ev.lastModTime = evTime
			if merged := event.Type.Merge(ev.evType); ev.evType != merged {
				a.counts.add(ev.evType, -1)
				a.counts.add(merged, 1)
				ev.evType = merged
			}
			l.Debugf("%v Parent %s (type %s) already tracked: %s", a, currPath, ev.evType, event.Name)
			return
		}

		// The ancestor is full: retry with the ancestor's parent directory
		// as the event path, aggregating everything below it.
		if parentDir.childCount() == localMaxFilesPerDir {
			l.Debugf("%v Parent dir %s already has %d children, tracking it instead: %s", a, currPath, localMaxFilesPerDir, event.Name)
			event.Name = filepath.Dir(currPath)
			a.aggregateEvent(event, evTime)
			return
		}

		// If there are no events below path, but we need to recurse
		// into that path, create eventDir at path.
		if newParent, ok := parentDir.dirs[name]; ok {
			parentDir = newParent
		} else {
			l.Debugln(a, "Creating eventDir at:", currPath)
			newParent = newEventDir()
			parentDir.dirs[name] = newParent
			parentDir = newParent
		}

		// Reset allowed children count to maxFilesPerDir for non-root
		if i == 0 {
			localMaxFilesPerDir = maxFilesPerDir
		}
	}

	name := pathSegments[len(pathSegments)-1]

	// The path itself is already tracked: merge type and refresh timestamp.
	if ev, ok := parentDir.events[name]; ok {
		ev.lastModTime = evTime
		if merged := event.Type.Merge(ev.evType); ev.evType != merged {
			a.counts.add(ev.evType, -1)
			a.counts.add(merged, 1)
			ev.evType = merged
		}
		l.Debugf("%v Already tracked (type %v): %s", a, ev.evType, event.Name)
		return
	}

	childDir, ok := parentDir.dirs[name]

	// If a dir existed at path, it would be removed from dirs, thus
	// childCount would not increase.
	if !ok && parentDir.childCount() == localMaxFilesPerDir {
		l.Debugf("%v Parent dir already has %d children, tracking it instead: %s", a, localMaxFilesPerDir, event.Name)
		event.Name = filepath.Dir(event.Name)
		a.aggregateEvent(event, evTime)
		return
	}

	firstModTime := evTime
	if ok {
		// Events are already tracked below this path: the new event covers
		// them all, so absorb their first-mod time and merged type and drop
		// the subtree.
		firstModTime = childDir.firstModTime()
		if merged := event.Type.Merge(childDir.eventType()); event.Type != merged {
			a.counts.add(event.Type, -1)
			event.Type = merged
		}
		delete(parentDir.dirs, name)
	}

	l.Debugf("%v Tracking (type %v): %s", a, event.Type, event.Name)
	parentDir.events[name] = &aggregatedEvent{
		firstModTime: firstModTime,
		lastModTime:  evTime,
		evType:       event.Type,
	}
	a.counts.add(event.Type, 1)
	a.resetNotifyTimerIfNeeded()
}
 
- func (a *aggregator) resetNotifyTimerIfNeeded() {
 
- 	if a.notifyTimerNeedsReset {
 
- 		a.resetNotifyTimer(a.notifyDelay)
 
- 	}
 
- }
 
- // resetNotifyTimer should only ever be called when notifyTimer has stopped
 
- // and notifyTimer.C been read from. Otherwise, call resetNotifyTimerIfNeeded.
 
- func (a *aggregator) resetNotifyTimer(duration time.Duration) {
 
- 	l.Debugln(a, "Resetting notifyTimer to", duration.String())
 
- 	a.notifyTimerNeedsReset = false
 
- 	a.notifyTimer.Reset(duration)
 
- }
 
// actOnTimer runs when notifyTimer fires: it pops all events that are due
// for scanning and dispatches them asynchronously on out.
func (a *aggregator) actOnTimer(out chan<- []string) {
	c := a.counts.total()
	if c == 0 {
		l.Debugln(a, "No tracked events, waiting for new event.")
		// Don't restart the timer now; the next incoming event will do it
		// via resetNotifyTimerIfNeeded.
		a.notifyTimerNeedsReset = true
		return
	}
	oldEvents := make(map[string]*aggregatedEvent, c)
	// First pass: pop due events, still delaying removals (delayRem=true).
	a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), true)
	if a.notifyDelay != a.notifyTimeout && a.counts.nonRemoves == 0 && a.counts.removes != 0 {
		// Only delayed events remaining, no need to delay them additionally
		a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), false)
	}
	if len(oldEvents) == 0 {
		l.Debugln(a, "No old fs events")
		a.resetNotifyTimer(a.notifyDelay)
		return
	}
	// Sending to channel might block for a long time, but we need to keep
	// reading from notify backend channel to avoid overflow
	go a.notify(oldEvents, out)
}
 
// Schedule scan for given events dispatching deletes last and reset notification
// afterwards to set up for the next scan scheduling. Runs in its own goroutine
// because sending on out may block; the timer reset is therefore funneled back
// to the main loop via notifyTimerResetChan.
func (a *aggregator) notify(oldEvents map[string]*aggregatedEvent, out chan<- []string) {
	timeBeforeSending := time.Now()
	l.Debugf("%v Notifying about %d fs events", a, len(oldEvents))
	// Group paths by merged event type so removals can be sent last.
	separatedBatches := make(map[fs.EventType][]string)
	for path, event := range oldEvents {
		separatedBatches[event.evType] = append(separatedBatches[event.evType], path)
	}
	// Dispatch order: pure non-removes, then mixed, then pure removes.
	for _, evType := range [3]fs.EventType{fs.NonRemove, fs.Mixed, fs.Remove} {
		currBatch := separatedBatches[evType]
		if len(currBatch) != 0 {
			select {
			case out <- currBatch:
			case <-a.ctx.Done():
				return
			}
		}
	}
	// If sending to channel blocked for a long time,
	// shorten next notifyDelay accordingly.
	duration := time.Since(timeBeforeSending)
	buffer := time.Millisecond
	var nextDelay time.Duration
	switch {
	case duration < a.notifyDelay/10:
		// Sending was quick enough; keep the full delay.
		nextDelay = a.notifyDelay
	case duration+buffer > a.notifyDelay:
		// Sending ate (nearly) the whole delay; fire again almost at once.
		nextDelay = buffer
	default:
		nextDelay = a.notifyDelay - duration
	}
	select {
	case a.notifyTimerResetChan <- nextDelay:
	case <-a.ctx.Done():
	}
}
 
// popOldEventsTo finds events that should be scheduled for scanning recursively
// in dir (rooted at dirPath), removes those events and empty eventDirs, and
// collects the removed events in to, keyed by their filesystem path. delayRem
// is passed on to isOld and controls whether removal events are delayed further.
func (a *aggregator) popOldEventsTo(to map[string]*aggregatedEvent, dir *eventDir, dirPath string, currTime time.Time, delayRem bool) {
	for childName, childDir := range dir.dirs {
		a.popOldEventsTo(to, childDir, filepath.Join(dirPath, childName), currTime, delayRem)
		if childDir.childCount() == 0 {
			// Prune directories that no longer hold any events.
			delete(dir.dirs, childName)
		}
	}
	for name, event := range dir.events {
		if a.isOld(event, currTime, delayRem) {
			to[filepath.Join(dirPath, name)] = event
			delete(dir.events, name)
			a.counts.add(event.evType, -1)
		}
	}
}
 
- func (a *aggregator) isOld(ev *aggregatedEvent, currTime time.Time, delayRem bool) bool {
 
- 	// Deletes should in general be scanned last, therefore they are delayed by
 
- 	// letting them time out. This behaviour is overridden by delayRem == false.
 
- 	// Refer to following comments as to why.
 
- 	// An event that has not registered any new modifications recently is scanned.
 
- 	// a.notifyDelay is the user facing value signifying the normal delay between
 
- 	// picking up a modification and scanning it. As scheduling scans happens at
 
- 	// regular intervals of a.notifyDelay the delay of a single event is not exactly
 
- 	// a.notifyDelay, but lies in the range of 0.5 to 1.5 times a.notifyDelay.
 
- 	if (!delayRem || ev.evType == fs.NonRemove) && 2*currTime.Sub(ev.lastModTime) > a.notifyDelay {
 
- 		return true
 
- 	}
 
- 	// When an event registers repeat modifications or involves removals it
 
- 	// is delayed to reduce resource usage, but after a certain time (notifyTimeout)
 
- 	// passed it is scanned anyway.
 
- 	// If only removals are remaining to be scanned, there is no point to delay
 
- 	// removals further, so this behaviour is overridden by delayRem == false.
 
- 	return currTime.Sub(ev.firstModTime) > a.notifyTimeout
 
- }
 
// String implements fmt.Stringer; used as the prefix of this aggregator's
// debug log lines.
func (a *aggregator) String() string {
	return fmt.Sprintf("aggregator/%s:", a.folderCfg.Description())
}
 
- func (a *aggregator) CommitConfiguration(_, to config.Configuration) bool {
 
- 	for _, folderCfg := range to.Folders {
 
- 		if folderCfg.ID == a.folderID {
 
- 			select {
 
- 			case a.folderCfgUpdate <- folderCfg:
 
- 			case <-a.ctx.Done():
 
- 			}
 
- 			return true
 
- 		}
 
- 	}
 
- 	// Nothing to do, model will soon stop this
 
- 	return true
 
- }
 
- func (a *aggregator) updateConfig(folderCfg config.FolderConfiguration) {
 
- 	a.notifyDelay = time.Duration(folderCfg.FSWatcherDelayS) * time.Second
 
- 	if maxDelay := folderCfg.FSWatcherTimeoutS; maxDelay > 0 {
 
- 		// FSWatcherTimeoutS is set explicitly so use that, but it also
 
- 		// can't be lower than FSWatcherDelayS
 
- 		a.notifyTimeout = time.Duration(max(maxDelay, folderCfg.FSWatcherDelayS)) * time.Second
 
- 	} else {
 
- 		// Use the default FSWatcherTimeoutS calculation
 
- 		a.notifyTimeout = notifyTimeout(folderCfg.FSWatcherDelayS)
 
- 	}
 
- 	a.folderCfg = folderCfg
 
- }
 
- func updateInProgressSet(event events.Event, inProgress map[string]struct{}) {
 
- 	if event.Type == events.ItemStarted {
 
- 		path := event.Data.(map[string]string)["item"]
 
- 		inProgress[path] = struct{}{}
 
- 	} else if event.Type == events.ItemFinished {
 
- 		path := event.Data.(map[string]interface{})["item"].(string)
 
- 		delete(inProgress, path)
 
- 	}
 
- }
 
- // Events that involve removals or continuously receive new modifications are
 
- // delayed but must time out at some point. The following numbers come out of thin
 
- // air, they were just considered as a sensible compromise between fast updates and
 
- // saving resources. For short delays the timeout is 6 times the delay, capped at 1
 
- // minute. For delays longer than 1 minute, the delay and timeout are equal.
 
- func notifyTimeout(eventDelayS float64) time.Duration {
 
- 	const (
 
- 		shortDelayS             = 10
 
- 		shortDelayMultiplicator = 6
 
- 		longDelayS              = 60
 
- 		longDelayTimeout        = time.Minute
 
- 	)
 
- 	if eventDelayS < shortDelayS {
 
- 		return time.Duration(eventDelayS*shortDelayMultiplicator) * time.Second
 
- 	}
 
- 	if eventDelayS < longDelayS {
 
- 		return longDelayTimeout
 
- 	}
 
- 	return time.Duration(eventDelayS) * time.Second
 
- }
 
 
  |