| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428 |
- // Copyright (C) 2014 The Syncthing Authors.
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
- // You can obtain one at https://mozilla.org/MPL/2.0/.
- package model
- import (
- "context"
- "errors"
- "fmt"
- "math/rand"
- "time"
- "github.com/syncthing/syncthing/lib/config"
- "github.com/syncthing/syncthing/lib/db"
- "github.com/syncthing/syncthing/lib/events"
- "github.com/syncthing/syncthing/lib/ignore"
- "github.com/syncthing/syncthing/lib/protocol"
- "github.com/syncthing/syncthing/lib/sync"
- "github.com/syncthing/syncthing/lib/watchaggregator"
- )
// errWatchNotStarted is the watch error recorded by stopWatch while the
// filesystem watcher is not running.
var errWatchNotStarted = errors.New("not started")
- type folder struct {
- stateTracker
- config.FolderConfiguration
- localFlags uint32
- model *Model
- shortID protocol.ShortID
- ctx context.Context
- cancel context.CancelFunc
- scanInterval time.Duration
- scanTimer *time.Timer
- scanNow chan rescanRequest
- scanDelay chan time.Duration
- initialScanFinished chan struct{}
- stopped chan struct{}
- pullScheduled chan struct{}
- watchCancel context.CancelFunc
- watchChan chan []string
- restartWatchChan chan struct{}
- watchErr error
- watchErrMut sync.Mutex
- puller puller
- }
// rescanRequest is sent on folder.scanNow to have the Serve loop scan the
// given subdirectories; the outcome of the scan is delivered on err.
type rescanRequest struct {
	subdirs []string
	err     chan error
}
// puller is the pull operation driven by the folder's Serve loop.
type puller interface {
	// pull returns true when it was successful and should not be retried.
	pull() bool
}
- func newFolder(model *Model, cfg config.FolderConfiguration) folder {
- ctx, cancel := context.WithCancel(context.Background())
- return folder{
- stateTracker: newStateTracker(cfg.ID),
- FolderConfiguration: cfg,
- model: model,
- shortID: model.shortID,
- ctx: ctx,
- cancel: cancel,
- scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second,
- scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
- scanNow: make(chan rescanRequest),
- scanDelay: make(chan time.Duration),
- initialScanFinished: make(chan struct{}),
- stopped: make(chan struct{}),
- pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
- watchCancel: func() {},
- restartWatchChan: make(chan struct{}, 1),
- watchErrMut: sync.NewMutex(),
- }
- }
- func (f *folder) Serve() {
- l.Debugln(f, "starting")
- defer l.Debugln(f, "exiting")
- defer func() {
- f.scanTimer.Stop()
- f.setState(FolderIdle)
- close(f.stopped)
- }()
- pause := f.basePause()
- pullFailTimer := time.NewTimer(0)
- <-pullFailTimer.C
- if f.FSWatcherEnabled && f.CheckHealth() == nil {
- f.startWatch()
- }
- initialCompleted := f.initialScanFinished
- for {
- select {
- case <-f.ctx.Done():
- return
- case <-f.pullScheduled:
- pullFailTimer.Stop()
- select {
- case <-pullFailTimer.C:
- default:
- }
- if !f.puller.pull() {
- // Pulling failed, try again later.
- pullFailTimer.Reset(pause)
- }
- case <-pullFailTimer.C:
- if f.puller.pull() {
- // We're good. Don't schedule another fail pull and reset
- // the pause interval.
- pause = f.basePause()
- continue
- }
- // Pulling failed, try again later.
- l.Infof("Folder %v isn't making sync progress - retrying in %v.", f.Description(), pause)
- pullFailTimer.Reset(pause)
- // Back off from retrying to pull with an upper limit.
- if pause < 60*f.basePause() {
- pause *= 2
- }
- case <-initialCompleted:
- // Initial scan has completed, we should do a pull
- initialCompleted = nil // never hit this case again
- if !f.puller.pull() {
- // Pulling failed, try again later.
- pullFailTimer.Reset(pause)
- }
- // The reason for running the scanner from within the puller is that
- // this is the easiest way to make sure we are not doing both at the
- // same time.
- case <-f.scanTimer.C:
- l.Debugln(f, "Scanning subdirectories")
- f.scanTimerFired()
- case req := <-f.scanNow:
- req.err <- f.scanSubdirs(req.subdirs)
- case next := <-f.scanDelay:
- f.scanTimer.Reset(next)
- case fsEvents := <-f.watchChan:
- l.Debugln(f, "filesystem notification rescan")
- f.scanSubdirs(fsEvents)
- case <-f.restartWatchChan:
- f.restartWatch()
- }
- }
- }
- func (f *folder) BringToFront(string) {}
- func (f *folder) Override(fs *db.FileSet, updateFn func([]protocol.FileInfo)) {}
- func (f *folder) Revert(fs *db.FileSet, updateFn func([]protocol.FileInfo)) {}
- func (f *folder) DelayScan(next time.Duration) {
- f.Delay(next)
- }
- func (f *folder) IgnoresUpdated() {
- if f.FSWatcherEnabled {
- f.scheduleWatchRestart()
- }
- }
- func (f *folder) SchedulePull() {
- select {
- case f.pullScheduled <- struct{}{}:
- default:
- // We might be busy doing a pull and thus not reading from this
- // channel. The channel is 1-buffered, so one notification will be
- // queued to ensure we recheck after the pull, but beyond that we must
- // make sure to not block index receiving.
- }
- }
- func (f *folder) Jobs() ([]string, []string) {
- return nil, nil
- }
- func (f *folder) Scan(subdirs []string) error {
- <-f.initialScanFinished
- req := rescanRequest{
- subdirs: subdirs,
- err: make(chan error),
- }
- select {
- case f.scanNow <- req:
- return <-req.err
- case <-f.ctx.Done():
- return f.ctx.Err()
- }
- }
- func (f *folder) Reschedule() {
- if f.scanInterval == 0 {
- return
- }
- // Sleep a random time between 3/4 and 5/4 of the configured interval.
- sleepNanos := (f.scanInterval.Nanoseconds()*3 + rand.Int63n(2*f.scanInterval.Nanoseconds())) / 4
- interval := time.Duration(sleepNanos) * time.Nanosecond
- l.Debugln(f, "next rescan in", interval)
- f.scanTimer.Reset(interval)
- }
- func (f *folder) Delay(next time.Duration) {
- f.scanDelay <- next
- }
- func (f *folder) Stop() {
- f.cancel()
- <-f.stopped
- }
- // CheckHealth checks the folder for common errors, updates the folder state
- // and returns the current folder error, or nil if the folder is healthy.
- func (f *folder) CheckHealth() error {
- err := f.getHealthError()
- f.setError(err)
- return err
- }
- func (f *folder) getHealthError() error {
- // Check for folder errors, with the most serious and specific first and
- // generic ones like out of space on the home disk later.
- if err := f.CheckPath(); err != nil {
- return err
- }
- if err := f.model.cfg.CheckHomeFreeSpace(); err != nil {
- return err
- }
- return nil
- }
- func (f *folder) scanSubdirs(subDirs []string) error {
- if err := f.model.internalScanFolderSubdirs(f.ctx, f.folderID, subDirs, f.localFlags); err != nil {
- // Potentially sets the error twice, once in the scanner just
- // by doing a check, and once here, if the error returned is
- // the same one as returned by CheckHealth, though
- // duplicate set is handled by setError.
- f.setError(err)
- return err
- }
- return nil
- }
- func (f *folder) scanTimerFired() {
- err := f.scanSubdirs(nil)
- select {
- case <-f.initialScanFinished:
- default:
- status := "Completed"
- if err != nil {
- status = "Failed"
- }
- l.Infoln(status, "initial scan of", f.Type.String(), "folder", f.Description())
- close(f.initialScanFinished)
- }
- f.Reschedule()
- }
- func (f *folder) WatchError() error {
- f.watchErrMut.Lock()
- defer f.watchErrMut.Unlock()
- return f.watchErr
- }
- // stopWatch immediately aborts watching and may be called asynchronously
- func (f *folder) stopWatch() {
- f.watchCancel()
- f.watchErrMut.Lock()
- prevErr := f.watchErr
- f.watchErr = errWatchNotStarted
- f.watchErrMut.Unlock()
- if prevErr != errWatchNotStarted {
- data := map[string]interface{}{
- "folder": f.ID,
- "to": errWatchNotStarted.Error(),
- }
- if prevErr != nil {
- data["from"] = prevErr.Error()
- }
- events.Default.Log(events.FolderWatchStateChanged, data)
- }
- }
- // scheduleWatchRestart makes sure watching is restarted from the main for loop
- // in a folder's Serve and thus may be called asynchronously (e.g. when ignores change).
- func (f *folder) scheduleWatchRestart() {
- select {
- case f.restartWatchChan <- struct{}{}:
- default:
- // We might be busy doing a pull and thus not reading from this
- // channel. The channel is 1-buffered, so one notification will be
- // queued to ensure we recheck after the pull.
- }
- }
- // restartWatch should only ever be called synchronously. If you want to use
- // this asynchronously, you should probably use scheduleWatchRestart instead.
- func (f *folder) restartWatch() {
- f.stopWatch()
- f.startWatch()
- f.scanSubdirs(nil)
- }
- // startWatch should only ever be called synchronously. If you want to use
- // this asynchronously, you should probably use scheduleWatchRestart instead.
- func (f *folder) startWatch() {
- ctx, cancel := context.WithCancel(f.ctx)
- f.model.fmut.RLock()
- ignores := f.model.folderIgnores[f.folderID]
- f.model.fmut.RUnlock()
- f.watchChan = make(chan []string)
- f.watchCancel = cancel
- go f.startWatchAsync(ctx, ignores)
- }
- // startWatchAsync tries to start the filesystem watching and retries every minute on failure.
- // It is a convenience function that should not be used except in startWatch.
- func (f *folder) startWatchAsync(ctx context.Context, ignores *ignore.Matcher) {
- timer := time.NewTimer(0)
- for {
- select {
- case <-timer.C:
- eventChan, err := f.Filesystem().Watch(".", ignores, ctx, f.IgnorePerms)
- f.watchErrMut.Lock()
- prevErr := f.watchErr
- f.watchErr = err
- f.watchErrMut.Unlock()
- if err != prevErr {
- data := map[string]interface{}{
- "folder": f.ID,
- }
- if prevErr != nil {
- data["from"] = prevErr.Error()
- }
- if err != nil {
- data["to"] = err.Error()
- }
- events.Default.Log(events.FolderWatchStateChanged, data)
- }
- if err != nil {
- if prevErr == errWatchNotStarted {
- l.Infof("Error while trying to start filesystem watcher for folder %s, trying again in 1min: %v", f.Description(), err)
- } else {
- l.Debugf("Repeat error while trying to start filesystem watcher for folder %s, trying again in 1min: %v", f.Description(), err)
- }
- timer.Reset(time.Minute)
- continue
- }
- watchaggregator.Aggregate(eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, ctx)
- l.Debugln("Started filesystem watcher for folder", f.Description())
- return
- case <-ctx.Done():
- return
- }
- }
- }
- func (f *folder) setError(err error) {
- _, _, oldErr := f.getState()
- if (err != nil && oldErr != nil && oldErr.Error() == err.Error()) || (err == nil && oldErr == nil) {
- return
- }
- if err != nil {
- if oldErr == nil {
- l.Warnf("Error on folder %s: %v", f.Description(), err)
- } else {
- l.Infof("Error on folder %s changed: %q -> %q", f.Description(), oldErr, err)
- }
- } else {
- l.Infoln("Cleared error on folder", f.Description())
- }
- if f.FSWatcherEnabled {
- if err != nil {
- f.stopWatch()
- } else {
- f.scheduleWatchRestart()
- }
- }
- f.stateTracker.setError(err)
- }
- func (f *folder) basePause() time.Duration {
- if f.PullerPauseS == 0 {
- return defaultPullerPause
- }
- return time.Duration(f.PullerPauseS) * time.Second
- }
- func (f *folder) String() string {
- return fmt.Sprintf("%s/%s@%p", f.Type, f.folderID, f)
- }
|