Browse Source

lib/model: Trigger pulls instead of pulling periodically

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4340
Simon Frei 8 years ago
parent
commit
f6ea2a7f8e
5 changed files with 139 additions and 126 deletions
  1. 0 1
      lib/config/folderconfiguration.go
  2. 3 2
      lib/model/folder.go
  3. 8 6
      lib/model/model.go
  4. 0 2
      lib/model/requests_test.go
  5. 128 115
      lib/model/rwfolder.go

+ 0 - 1
lib/config/folderconfiguration.go

@@ -42,7 +42,6 @@ type FolderConfiguration struct {
 	Order                 PullOrder                   `xml:"order" json:"order"`
 	IgnoreDelete          bool                        `xml:"ignoreDelete" json:"ignoreDelete"`
 	ScanProgressIntervalS int                         `xml:"scanProgressIntervalS" json:"scanProgressIntervalS"` // Set to a negative value to disable. Value of 0 will get replaced with value of 2 (default value)
-	PullerSleepS          int                         `xml:"pullerSleepS" json:"pullerSleepS"`
 	PullerPauseS          int                         `xml:"pullerPauseS" json:"pullerPauseS"`
 	MaxConflicts          int                         `xml:"maxConflicts" json:"maxConflicts"`
 	DisableSparseFiles    bool                        `xml:"disableSparseFiles" json:"disableSparseFiles"`

+ 3 - 2
lib/model/folder.go

@@ -50,8 +50,7 @@ func (f *folder) DelayScan(next time.Duration) {
 	f.scan.Delay(next)
 }
 
-func (f *folder) IndexUpdated() {
-}
+func (f *folder) IndexUpdated() {}
 
 func (f *folder) IgnoresUpdated() {
 	if f.FSWatcherEnabled {
@@ -59,6 +58,8 @@ func (f *folder) IgnoresUpdated() {
 	}
 }
 
+// SchedulePull is a no-op on the base folder type; sendReceiveFolder
+// provides the real implementation that triggers a pull.
+func (f *folder) SchedulePull() {}
+
 func (f *folder) Jobs() ([]string, []string) {
 	return nil, nil
 }

+ 8 - 6
lib/model/model.go

@@ -47,8 +47,9 @@ const (
 type service interface {
 	BringToFront(string)
 	DelayScan(d time.Duration)
-	IndexUpdated()              // Remote index was updated notification
-	IgnoresUpdated()            // ignore matcher was updated notification
+	IndexUpdated()   // Remote index was updated notification
+	IgnoresUpdated() // ignore matcher was updated notification
+	SchedulePull()
 	Jobs() ([]string, []string) // In progress, Queued
 	Scan(subs []string) error
 	Serve()
@@ -813,7 +814,7 @@ func (m *Model) Index(deviceID protocol.DeviceID, folder string, fs []protocol.F
 	if runner != nil {
 		// Runner may legitimately not be set if this is the "cleanup" Index
 		// message at startup.
-		defer runner.IndexUpdated()
+		defer runner.SchedulePull()
 	}
 
 	m.pmut.RLock()
@@ -862,7 +863,7 @@ func (m *Model) IndexUpdate(deviceID protocol.DeviceID, folder string, fs []prot
 		"version": files.Sequence(deviceID),
 	})
 
-	runner.IndexUpdated()
+	runner.SchedulePull()
 }
 
 func (m *Model) folderSharedWith(folder string, deviceID protocol.DeviceID) bool {
@@ -1002,7 +1003,7 @@ func (m *Model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
 					// that we need to pull so let the folder runner know
 					// that it should recheck the index data.
 					if runner := m.folderRunners[folder.ID]; runner != nil {
-						defer runner.IndexUpdated()
+						defer runner.SchedulePull()
 					}
 				}
 			}
@@ -1853,7 +1854,6 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
 	// Check if the ignore patterns changed as part of scanning this folder.
 	// If they did we should schedule a pull of the folder so that we
 	// request things we might have suddenly become unignored and so on.
-
 	oldHash := ignores.Hash()
 	defer func() {
 		if ignores.Hash() != oldHash {
@@ -1879,6 +1879,8 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
 		return err
 	}
 
+	defer runner.SchedulePull()
+
 	// Clean the list of subitems to ensure that we start at a known
 	// directory, and don't scan subdirectories of things we've already
 	// scanned.

+ 0 - 2
lib/model/requests_test.go

@@ -216,7 +216,6 @@ func TestRequestVersioningSymlinkAttack(t *testing.T) {
 
 	cfg := defaultConfig.RawCopy()
 	cfg.Folders[0] = config.NewFolderConfiguration("default", fs.FilesystemTypeBasic, "_tmpfolder")
-	cfg.Folders[0].PullerSleepS = 1
 	cfg.Folders[0].Devices = []config.FolderDeviceConfiguration{
 		{DeviceID: device1},
 		{DeviceID: device2},
@@ -289,7 +288,6 @@ func TestRequestVersioningSymlinkAttack(t *testing.T) {
 func setupModelWithConnection() (*Model, *fakeConnection) {
 	cfg := defaultConfig.RawCopy()
 	cfg.Folders[0] = config.NewFolderConfiguration("default", fs.FilesystemTypeBasic, "_tmpfolder")
-	cfg.Folders[0].PullerSleepS = 1
 	cfg.Folders[0].Devices = []config.FolderDeviceConfiguration{
 		{DeviceID: device1},
 		{DeviceID: device2},

+ 128 - 115
lib/model/rwfolder.go

@@ -67,10 +67,10 @@ const (
 )
 
 const (
-	defaultCopiers     = 2
-	defaultPullers     = 64
-	defaultPullerSleep = 10 * time.Second
-	defaultPullerPause = 60 * time.Second
+	defaultCopiers      = 2
+	defaultPullers      = 64
+	defaultPullerPause  = 60 * time.Second
+	maxPullerIterations = 3
 )
 
 type dbUpdateJob struct {
@@ -83,13 +83,11 @@ type sendReceiveFolder struct {
 
 	fs        fs.Filesystem
 	versioner versioner.Versioner
-	sleep     time.Duration
 	pause     time.Duration
 
-	queue       *jobQueue
-	dbUpdates   chan dbUpdateJob
-	pullTimer   *time.Timer
-	remoteIndex chan struct{} // An index update was received, we should re-evaluate needs
+	queue         *jobQueue
+	dbUpdates     chan dbUpdateJob
+	pullScheduled chan struct{}
 
 	errors    map[string]string // path -> error string
 	errorsMut sync.Mutex
@@ -105,9 +103,8 @@ func newSendReceiveFolder(model *Model, cfg config.FolderConfiguration, ver vers
 		fs:        fs,
 		versioner: ver,
 
-		queue:       newJobQueue(),
-		pullTimer:   time.NewTimer(time.Second),
-		remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes.
+		queue:         newJobQueue(),
+		pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
 
 		errorsMut: sync.NewMutex(),
 
@@ -128,17 +125,7 @@ func (f *sendReceiveFolder) configureCopiersAndPullers() {
 		f.Pullers = defaultPullers
 	}
 
-	if f.PullerPauseS == 0 {
-		f.pause = defaultPullerPause
-	} else {
-		f.pause = time.Duration(f.PullerPauseS) * time.Second
-	}
-
-	if f.PullerSleepS == 0 {
-		f.sleep = defaultPullerSleep
-	} else {
-		f.sleep = time.Duration(f.PullerSleepS) * time.Second
-	}
+	f.pause = f.basePause()
 }
 
 // Helper function to check whether either the ignorePerm flag has been
@@ -155,14 +142,16 @@ func (f *sendReceiveFolder) Serve() {
 	defer l.Debugln(f, "exiting")
 
 	defer func() {
-		f.pullTimer.Stop()
 		f.scan.timer.Stop()
 		// TODO: Should there be an actual FolderStopped state?
 		f.setState(FolderIdle)
 	}()
 
-	var prevSec int64
+	var prevSeq int64
 	var prevIgnoreHash string
+	var success bool
+	pullFailTimer := time.NewTimer(time.Duration(0))
+	<-pullFailTimer.C
 
 	if f.FSWatcherEnabled && f.CheckHealth() == nil {
 		f.startWatch()
@@ -173,102 +162,27 @@ func (f *sendReceiveFolder) Serve() {
 		case <-f.ctx.Done():
 			return
 
-		case <-f.remoteIndex:
-			prevSec = 0
-			f.pullTimer.Reset(0)
-			l.Debugln(f, "remote index updated, rescheduling pull")
-
-		case <-f.pullTimer.C:
+		case <-f.pullScheduled:
+			pullFailTimer.Stop()
 			select {
-			case <-f.initialScanFinished:
+			case <-pullFailTimer.C:
 			default:
-				// We don't start pulling files until a scan has been completed.
-				l.Debugln(f, "skip (initial)")
-				f.pullTimer.Reset(f.sleep)
-				continue
 			}
 
-			f.model.fmut.RLock()
-			curIgnores := f.model.folderIgnores[f.folderID]
-			f.model.fmut.RUnlock()
-
-			if newHash := curIgnores.Hash(); newHash != prevIgnoreHash {
-				// The ignore patterns have changed. We need to re-evaluate if
-				// there are files we need now that were ignored before.
-				l.Debugln(f, "ignore patterns have changed, resetting prevVer")
-				prevSec = 0
-				prevIgnoreHash = newHash
-			}
-
-			// RemoteSequence() is a fast call, doesn't touch the database.
-			curSeq, ok := f.model.RemoteSequence(f.folderID)
-			if !ok || curSeq == prevSec {
-				l.Debugln(f, "skip (curSeq == prevSeq)", prevSec, ok)
-				f.pullTimer.Reset(f.sleep)
-				continue
-			}
-
-			if err := f.CheckHealth(); err != nil {
-				l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
-				f.pullTimer.Reset(f.sleep)
-				continue
+			if prevSeq, prevIgnoreHash, success = f.pull(prevSeq, prevIgnoreHash); !success {
+				// Pulling failed, try again later.
+				pullFailTimer.Reset(f.pause)
 			}
 
-			l.Debugln(f, "pulling", prevSec, curSeq)
-
-			f.setState(FolderSyncing)
-			f.clearErrors()
-			tries := 0
-
-			for {
-				tries++
-
-				changed := f.pullerIteration(curIgnores)
-				l.Debugln(f, "changed", changed)
-
-				if changed == 0 {
-					// No files were changed by the puller, so we are in
-					// sync. Remember the local version number and
-					// schedule a resync a little bit into the future.
-
-					if lv, ok := f.model.RemoteSequence(f.folderID); ok && lv < curSeq {
-						// There's a corner case where the device we needed
-						// files from disconnected during the puller
-						// iteration. The files will have been removed from
-						// the index, so we've concluded that we don't need
-						// them, but at the same time we have the local
-						// version that includes those files in curVer. So we
-						// catch the case that sequence might have
-						// decreased here.
-						l.Debugln(f, "adjusting curVer", lv)
-						curSeq = lv
-					}
-					prevSec = curSeq
-					l.Debugln(f, "next pull in", f.sleep)
-					f.pullTimer.Reset(f.sleep)
-					break
-				}
-
-				if tries > 2 {
-					// We've tried a bunch of times to get in sync, but
-					// we're not making it. Probably there are write
-					// errors preventing us. Flag this with a warning and
-					// wait a bit longer before retrying.
-					if folderErrors := f.currentErrors(); len(folderErrors) > 0 {
-						events.Default.Log(events.FolderErrors, map[string]interface{}{
-							"folder": f.folderID,
-							"errors": folderErrors,
-						})
-					}
-
-					l.Infof("Folder %v isn't making progress. Pausing puller for %v.", f.Description(), f.pause)
-					l.Debugln(f, "next pull in", f.pause)
-
-					f.pullTimer.Reset(f.pause)
-					break
+		case <-pullFailTimer.C:
+			if prevSeq, prevIgnoreHash, success = f.pull(prevSeq, prevIgnoreHash); !success {
+				// Pulling failed, try again later.
+				pullFailTimer.Reset(f.pause)
+				// Back off from retrying to pull with an upper limit.
+				if f.pause < 60*f.basePause() {
+					f.pause *= 2
 				}
 			}
-			f.setState(FolderIdle)
 
 		// The reason for running the scanner from within the puller is that
 		// this is the easiest way to make sure we are not doing both at the
@@ -293,9 +207,9 @@ func (f *sendReceiveFolder) Serve() {
 	}
 }
 
-func (f *sendReceiveFolder) IndexUpdated() {
+func (f *sendReceiveFolder) SchedulePull() {
 	select {
-	case f.remoteIndex <- struct{}{}:
+	case f.pullScheduled <- struct{}{}:
 	default:
 		// We might be busy doing a pull and thus not reading from this
 		// channel. The channel is 1-buffered, so one notification will be
@@ -308,6 +222,98 @@ func (f *sendReceiveFolder) String() string {
 	return fmt.Sprintf("sendReceiveFolder/%s@%p", f.folderID, f)
 }
 
+// pull compares the remote sequence against prevSeq (and the current
+// ignore hash against prevIgnoreHash) and, when there is something to do,
+// runs up to maxPullerIterations puller iterations. It returns the
+// sequence and ignore hash to compare against on the next call, and
+// whether the folder ended up in sync (success).
+func (f *sendReceiveFolder) pull(prevSeq int64, prevIgnoreHash string) (curSeq int64, curIgnoreHash string, success bool) {
+	select {
+	case <-f.initialScanFinished:
+	default:
+		// We don't pull before the initial scan has finished; a pull
+		// will be scheduled once it completes.
+		return prevSeq, prevIgnoreHash, true
+	}
+
+	f.model.fmut.RLock()
+	curIgnores := f.model.folderIgnores[f.folderID]
+	f.model.fmut.RUnlock()
+
+	curSeq = prevSeq
+	if curIgnoreHash = curIgnores.Hash(); curIgnoreHash != prevIgnoreHash {
+		// The ignore patterns have changed. We need to re-evaluate if
+		// there are files we need now that were ignored before.
+		l.Debugln(f, "ignore patterns have changed, resetting curSeq")
+		curSeq = 0
+	}
+
+	// RemoteSequence() is a fast call, doesn't touch the database.
+	remoteSeq, ok := f.model.RemoteSequence(f.folderID)
+	if !ok || remoteSeq == curSeq {
+		l.Debugln(f, "skip (remoteSeq == curSeq)", curSeq, ok)
+		return curSeq, curIgnoreHash, true
+	}
+
+	if err := f.CheckHealth(); err != nil {
+		l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
+		return curSeq, curIgnoreHash, true
+	}
+
+	l.Debugln(f, "pulling", curSeq, remoteSeq)
+
+	f.setState(FolderSyncing)
+	f.clearErrors()
+	var changed int
+	tries := 0
+
+	for {
+		tries++
+
+		changed = f.pullerIteration(curIgnores)
+		l.Debugln(f, "changed", changed)
+
+		if changed == 0 {
+			// No files were changed by the puller, so we are in
+			// sync. Record the remote sequence to compare against
+			// on the next call.
+
+			if lv, ok := f.model.RemoteSequence(f.folderID); ok && lv < remoteSeq {
+				// There's a corner case where the device we needed
+				// files from disconnected during the puller
+				// iteration. The files will have been removed from
+				// the index, so we've concluded that we don't need
+				// them, but at the same time we have the old remote sequence
+				// that includes those files in remoteSeq. So we
+				// catch the case that this sequence might have
+				// decreased here.
+				l.Debugf("%v adjusting remoteSeq from %d to %d", f, remoteSeq, lv)
+				remoteSeq = lv
+			}
+			curSeq = remoteSeq
+
+			// A successful pull resets the retry backoff.
+			f.pause = f.basePause()
+
+			break
+		}
+
+		if tries == maxPullerIterations {
+			// We've tried a bunch of times to get in sync, but
+			// we're not making it. Probably there are write
+			// errors preventing us. Flag this with a warning and
+			// wait a bit longer before retrying.
+			if folderErrors := f.currentErrors(); len(folderErrors) > 0 {
+				events.Default.Log(events.FolderErrors, map[string]interface{}{
+					"folder": f.folderID,
+					"errors": folderErrors,
+				})
+			}
+
+			l.Infof("Folder %v isn't making progress. Pausing puller for %v.", f.Description(), f.pause)
+			l.Debugln(f, "next pull in", f.pause)
+
+			break
+		}
+	}
+
+	f.setState(FolderIdle)
+
+	return curSeq, curIgnoreHash, changed == 0
+}
+
 // pullerIteration runs a single puller iteration for the given folder and
 // returns the number items that should have been synced (even those that
 // might have failed). One puller iteration handles all files currently
@@ -1693,6 +1699,13 @@ func (f *sendReceiveFolder) currentErrors() []fileError {
 	return errors
 }
 
+// basePause returns the configured pause between failed pull retries,
+// falling back to defaultPullerPause when PullerPauseS is unset (zero).
+func (f *sendReceiveFolder) basePause() time.Duration {
+	if f.PullerPauseS != 0 {
+		return time.Duration(f.PullerPauseS) * time.Second
+	}
+	return defaultPullerPause
+}
+
 func (f *sendReceiveFolder) IgnoresUpdated() {
 	f.folder.IgnoresUpdated()
 	f.IndexUpdated()