// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0.
If a copy of the MPL was not distributed with this file,// You can obtain one at http://mozilla.org/MPL/2.0/.package watchaggregatorimport (	"context"	"fmt"	"path/filepath"	"strings"	"time"	"github.com/syncthing/syncthing/lib/config"	"github.com/syncthing/syncthing/lib/events"	"github.com/syncthing/syncthing/lib/fs")// Not meant to be changed, but must be changeable for testsvar (	maxFiles       = 512	maxFilesPerDir = 128)// aggregatedEvent represents potentially multiple events at and/or recursively// below one path until it times out and a scan is scheduled.// If it represents multiple events and there are events of both Remove and// NonRemove types, the evType attribute is Mixed (as returned by fs.Event.Merge).type aggregatedEvent struct {	firstModTime time.Time	lastModTime  time.Time	evType       fs.EventType}// Stores pointers to both aggregated events directly within this directory and// child directories recursively containing aggregated events themselves.type eventDir struct {	events map[string]*aggregatedEvent	dirs   map[string]*eventDir}func newEventDir() *eventDir {	return &eventDir{		events: make(map[string]*aggregatedEvent),		dirs:   make(map[string]*eventDir),	}}func (dir *eventDir) childCount() int {	return len(dir.events) + len(dir.dirs)}func (dir *eventDir) firstModTime() time.Time {	if dir.childCount() == 0 {		panic("bug: firstModTime must not be used on empty eventDir")	}	firstModTime := time.Now()	for _, childDir := range dir.dirs {		dirTime := childDir.firstModTime()		if dirTime.Before(firstModTime) {			firstModTime = dirTime		}	}	for _, event := range dir.events {		if event.firstModTime.Before(firstModTime) {			firstModTime = event.firstModTime		}	}	return firstModTime}func (dir *eventDir) eventType() fs.EventType {	if dir.childCount() == 0 {		panic("bug: eventType must not be used on empty eventDir")	}	var evType fs.EventType	for _, childDir := range dir.dirs {		evType |= childDir.eventType()		if evType == fs.Mixed {			return fs.Mixed		}	}	
for _, event := range dir.events {		evType |= event.evType		if evType == fs.Mixed {			return fs.Mixed		}	}	return evType}type aggregator struct {	// folderID never changes and is accessed in CommitConfiguration, which	// asynchronously updates folderCfg -> can't use folderCfg.ID (racy)	folderID        string	folderCfg       config.FolderConfiguration	folderCfgUpdate chan config.FolderConfiguration	// Time after which an event is scheduled for scanning when no modifications occur.	notifyDelay time.Duration	// Time after which an event is scheduled for scanning even though modifications occur.	notifyTimeout         time.Duration	notifyTimer           *time.Timer	notifyTimerNeedsReset bool	notifyTimerResetChan  chan time.Duration	counts                eventCounter	root                  *eventDir	ctx                   context.Context}type eventCounter struct {	removes    int // Includes mixed events.	nonRemoves int}func (c *eventCounter) add(typ fs.EventType, n int) {	if typ&fs.Remove != 0 {		c.removes += n	} else {		c.nonRemoves += n	}}func (c *eventCounter) total() int { return c.removes + c.nonRemoves }func newAggregator(ctx context.Context, folderCfg config.FolderConfiguration) *aggregator {	a := &aggregator{		folderID:              folderCfg.ID,		folderCfgUpdate:       make(chan config.FolderConfiguration),		notifyTimerNeedsReset: false,		notifyTimerResetChan:  make(chan time.Duration),		root:                  newEventDir(),		ctx:                   ctx,	}	a.updateConfig(folderCfg)	return a}func Aggregate(ctx context.Context, in <-chan fs.Event, out chan<- []string, folderCfg config.FolderConfiguration, cfg config.Wrapper, evLogger events.Logger) {	a := newAggregator(ctx, folderCfg)	// Necessary for unit tests where the backend is mocked	go a.mainLoop(in, out, cfg, evLogger)}func (a *aggregator) mainLoop(in <-chan fs.Event, out chan<- []string, cfg config.Wrapper, evLogger events.Logger) {	a.notifyTimer = time.NewTimer(a.notifyDelay)	defer a.notifyTimer.Stop()	
inProgressItemSubscription := evLogger.Subscribe(events.ItemStarted | events.ItemFinished)	defer inProgressItemSubscription.Unsubscribe()	cfg.Subscribe(a)	defer cfg.Unsubscribe(a)	inProgress := make(map[string]struct{})	for {		select {		case event := <-in:			a.newEvent(event, inProgress)		case event, ok := <-inProgressItemSubscription.C():			if ok {				updateInProgressSet(event, inProgress)			}		case <-a.notifyTimer.C:			a.actOnTimer(out)		case interval := <-a.notifyTimerResetChan:			a.resetNotifyTimer(interval)		case folderCfg := <-a.folderCfgUpdate:			a.updateConfig(folderCfg)		case <-a.ctx.Done():			l.Debugln(a, "Stopped")			return		}	}}func (a *aggregator) newEvent(event fs.Event, inProgress map[string]struct{}) {	if _, ok := a.root.events["."]; ok {		l.Debugln(a, "Will scan entire folder anyway; dropping:", event.Name)		return	}	if _, ok := inProgress[event.Name]; ok {		l.Debugln(a, "Skipping path we modified:", event.Name)		return	}	a.aggregateEvent(event, time.Now())}func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time) {	if event.Name == "." || a.counts.total() == maxFiles {		l.Debugln(a, "Scan entire folder")		firstModTime := evTime		if a.root.childCount() != 0 {			event.Type = event.Type.Merge(a.root.eventType())			firstModTime = a.root.firstModTime()		}		a.root.dirs = make(map[string]*eventDir)		a.root.events = make(map[string]*aggregatedEvent)		a.root.events["."] = &aggregatedEvent{			firstModTime: firstModTime,			lastModTime:  evTime,			evType:       event.Type,		}		a.counts = eventCounter{}		a.counts.add(event.Type, 1)		a.resetNotifyTimerIfNeeded()		return	}	parentDir := a.root	// Check if any parent directory is already tracked or will exceed	// events per directory limit bottom up	pathSegments := strings.Split(filepath.ToSlash(event.Name), "/")	// As root dir cannot be further aggregated, allow up to maxFiles	// children.	
localMaxFilesPerDir := maxFiles	var currPath string	for i, name := range pathSegments[:len(pathSegments)-1] {		currPath = filepath.Join(currPath, name)		if ev, ok := parentDir.events[name]; ok {			ev.lastModTime = evTime			if merged := event.Type.Merge(ev.evType); ev.evType != merged {				a.counts.add(ev.evType, -1)				a.counts.add(merged, 1)				ev.evType = merged			}			l.Debugf("%v Parent %s (type %s) already tracked: %s", a, currPath, ev.evType, event.Name)			return		}		if parentDir.childCount() == localMaxFilesPerDir {			l.Debugf("%v Parent dir %s already has %d children, tracking it instead: %s", a, currPath, localMaxFilesPerDir, event.Name)			event.Name = filepath.Dir(currPath)			a.aggregateEvent(event, evTime)			return		}		// If there are no events below path, but we need to recurse		// into that path, create eventDir at path.		if newParent, ok := parentDir.dirs[name]; ok {			parentDir = newParent		} else {			l.Debugln(a, "Creating eventDir at:", currPath)			newParent = newEventDir()			parentDir.dirs[name] = newParent			parentDir = newParent		}		// Reset allowed children count to maxFilesPerDir for non-root		if i == 0 {			localMaxFilesPerDir = maxFilesPerDir		}	}	name := pathSegments[len(pathSegments)-1]	if ev, ok := parentDir.events[name]; ok {		ev.lastModTime = evTime		if merged := event.Type.Merge(ev.evType); ev.evType != merged {			a.counts.add(ev.evType, -1)			a.counts.add(merged, 1)			ev.evType = merged		}		l.Debugf("%v Already tracked (type %v): %s", a, ev.evType, event.Name)		return	}	childDir, ok := parentDir.dirs[name]	// If a dir existed at path, it would be removed from dirs, thus	// childCount would not increase.	
if !ok && parentDir.childCount() == localMaxFilesPerDir {		l.Debugf("%v Parent dir already has %d children, tracking it instead: %s", a, localMaxFilesPerDir, event.Name)		event.Name = filepath.Dir(event.Name)		a.aggregateEvent(event, evTime)		return	}	firstModTime := evTime	if ok {		firstModTime = childDir.firstModTime()		if merged := event.Type.Merge(childDir.eventType()); event.Type != merged {			a.counts.add(event.Type, -1)			event.Type = merged		}		delete(parentDir.dirs, name)	}	l.Debugf("%v Tracking (type %v): %s", a, event.Type, event.Name)	parentDir.events[name] = &aggregatedEvent{		firstModTime: firstModTime,		lastModTime:  evTime,		evType:       event.Type,	}	a.counts.add(event.Type, 1)	a.resetNotifyTimerIfNeeded()}func (a *aggregator) resetNotifyTimerIfNeeded() {	if a.notifyTimerNeedsReset {		a.resetNotifyTimer(a.notifyDelay)	}}// resetNotifyTimer should only ever be called when notifyTimer has stopped// and notifyTimer.C been read from. Otherwise, call resetNotifyTimerIfNeeded.func (a *aggregator) resetNotifyTimer(duration time.Duration) {	l.Debugln(a, "Resetting notifyTimer to", duration.String())	a.notifyTimerNeedsReset = false	a.notifyTimer.Reset(duration)}func (a *aggregator) actOnTimer(out chan<- []string) {	c := a.counts.total()	if c == 0 {		l.Debugln(a, "No tracked events, waiting for new event.")		a.notifyTimerNeedsReset = true		return	}	oldEvents := make(map[string]*aggregatedEvent, c)	a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), true)	if a.notifyDelay != a.notifyTimeout && a.counts.nonRemoves == 0 && a.counts.removes != 0 {		// Only delayed events remaining, no need to delay them additionally		a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), false)	}	if len(oldEvents) == 0 {		l.Debugln(a, "No old fs events")		a.resetNotifyTimer(a.notifyDelay)		return	}	// Sending to channel might block for a long time, but we need to keep	// reading from notify backend channel to avoid overflow	go a.notify(oldEvents, out)}// Schedule scan for given 
events dispatching deletes last and reset notification// afterwards to set up for the next scan scheduling.func (a *aggregator) notify(oldEvents map[string]*aggregatedEvent, out chan<- []string) {	timeBeforeSending := time.Now()	l.Debugf("%v Notifying about %d fs events", a, len(oldEvents))	separatedBatches := make(map[fs.EventType][]string)	for path, event := range oldEvents {		separatedBatches[event.evType] = append(separatedBatches[event.evType], path)	}	for _, evType := range [3]fs.EventType{fs.NonRemove, fs.Mixed, fs.Remove} {		currBatch := separatedBatches[evType]		if len(currBatch) != 0 {			select {			case out <- currBatch:			case <-a.ctx.Done():				return			}		}	}	// If sending to channel blocked for a long time,	// shorten next notifyDelay accordingly.	duration := time.Since(timeBeforeSending)	buffer := time.Millisecond	var nextDelay time.Duration	switch {	case duration < a.notifyDelay/10:		nextDelay = a.notifyDelay	case duration+buffer > a.notifyDelay:		nextDelay = buffer	default:		nextDelay = a.notifyDelay - duration	}	select {	case a.notifyTimerResetChan <- nextDelay:	case <-a.ctx.Done():	}}// popOldEvents finds events that should be scheduled for scanning recursively in dirs,// removes those events and empty eventDirs and returns a map with all the removed// events referenced by their filesystem pathfunc (a *aggregator) popOldEventsTo(to map[string]*aggregatedEvent, dir *eventDir, dirPath string, currTime time.Time, delayRem bool) {	for childName, childDir := range dir.dirs {		a.popOldEventsTo(to, childDir, filepath.Join(dirPath, childName), currTime, delayRem)		if childDir.childCount() == 0 {			delete(dir.dirs, childName)		}	}	for name, event := range dir.events {		if a.isOld(event, currTime, delayRem) {			to[filepath.Join(dirPath, name)] = event			delete(dir.events, name)			a.counts.add(event.evType, -1)		}	}}func (a *aggregator) isOld(ev *aggregatedEvent, currTime time.Time, delayRem bool) bool {	// Deletes should in general be scanned last, 
therefore they are delayed by	// letting them time out. This behaviour is overridden by delayRem == false.	// Refer to following comments as to why.	// An event that has not registered any new modifications recently is scanned.	// a.notifyDelay is the user facing value signifying the normal delay between	// picking up a modification and scanning it. As scheduling scans happens at	// regular intervals of a.notifyDelay the delay of a single event is not exactly	// a.notifyDelay, but lies in the range of 0.5 to 1.5 times a.notifyDelay.	if (!delayRem || ev.evType == fs.NonRemove) && 2*currTime.Sub(ev.lastModTime) > a.notifyDelay {		return true	}	// When an event registers repeat modifications or involves removals it	// is delayed to reduce resource usage, but after a certain time (notifyTimeout)	// passed it is scanned anyway.	// If only removals are remaining to be scanned, there is no point to delay	// removals further, so this behaviour is overridden by delayRem == false.	return currTime.Sub(ev.firstModTime) > a.notifyTimeout}func (a *aggregator) String() string {	return fmt.Sprintf("aggregator/%s:", a.folderCfg.Description())}func (a *aggregator) CommitConfiguration(_, to config.Configuration) bool {	for _, folderCfg := range to.Folders {		if folderCfg.ID == a.folderID {			select {			case a.folderCfgUpdate <- folderCfg:			case <-a.ctx.Done():			}			return true		}	}	// Nothing to do, model will soon stop this	return true}func (a *aggregator) updateConfig(folderCfg config.FolderConfiguration) {	a.notifyDelay = time.Duration(folderCfg.FSWatcherDelayS) * time.Second	if maxDelay := folderCfg.FSWatcherTimeoutS; maxDelay > 0 {		// FSWatcherTimeoutS is set explicitly so use that, but it also		// can't be lower than FSWatcherDelayS		a.notifyTimeout = time.Duration(max(maxDelay, folderCfg.FSWatcherDelayS) * float64(time.Second))	} else {		// Use the default FSWatcherTimeoutS calculation		a.notifyTimeout = notifyTimeout(folderCfg.FSWatcherDelayS)	}	a.folderCfg = 
folderCfg}func updateInProgressSet(event events.Event, inProgress map[string]struct{}) {	if event.Type == events.ItemStarted {		path := event.Data.(map[string]string)["item"]		inProgress[path] = struct{}{}	} else if event.Type == events.ItemFinished {		path := event.Data.(map[string]interface{})["item"].(string)		delete(inProgress, path)	}}// Events that involve removals or continuously receive new modifications are// delayed but must time out at some point. The following numbers come out of thin// air, they were just considered as a sensible compromise between fast updates and// saving resources. For short delays the timeout is 6 times the delay, capped at 1// minute. For delays longer than 1 minute, the delay and timeout are equal.func notifyTimeout(eventDelayS float64) time.Duration {	const (		shortDelayS             = 10		shortDelayMultiplicator = 6		longDelayS              = 60		longDelayTimeout        = time.Minute	)	if eventDelayS < shortDelayS {		return time.Duration(eventDelayS * shortDelayMultiplicator * float64(time.Second))	}	if eventDelayS < longDelayS {		return longDelayTimeout	}	return time.Duration(eventDelayS * float64(time.Second))}
 |