Browse Source

Trigger pull check on remote index updates (fixes #1765)

Without this, when an index update comes in we only do a new pull if the
remote `localVersion` was increased. But it may not be, because the
index is sent alphabetically and the file with the highest local version
may come first. In that case we'll never do a new pull when the rest of
the index comes in, and we'll be stuck in idle but with lots of out of
sync data.
Jakob Borg 10 years ago
parent
commit
245bd1eb17
3 changed files with 48 additions and 19 deletions
  1. 12 1
      internal/model/model.go
  2. 3 0
      internal/model/rofolder.go
  3. 33 18
      internal/model/rwfolder.go

+ 12 - 1
internal/model/model.go

@@ -50,6 +50,7 @@ type service interface {
 	Jobs() ([]string, []string) // In progress, Queued
 	BringToFront(string)
 	DelayScan(d time.Duration)
+	IndexUpdated() // Remote index was updated notification
 
 	setState(state folderState)
 	setError(err error)
@@ -469,8 +470,15 @@ func (m *Model) Index(deviceID protocol.DeviceID, folder string, fs []protocol.F
 
 	m.fmut.RLock()
 	files, ok := m.folderFiles[folder]
+	runner := m.folderRunners[folder]
 	m.fmut.RUnlock()
 
+	if runner != nil {
+		// Runner may legitimately not be set if this is the "cleanup" Index
+		// message at startup.
+		defer runner.IndexUpdated()
+	}
+
 	if !ok {
 		l.Fatalf("Index for nonexistant folder %q", folder)
 	}
@@ -521,7 +529,8 @@ func (m *Model) IndexUpdate(deviceID protocol.DeviceID, folder string, fs []prot
 	}
 
 	m.fmut.RLock()
-	files, ok := m.folderFiles[folder]
+	files := m.folderFiles[folder]
+	runner, ok := m.folderRunners[folder]
 	m.fmut.RUnlock()
 
 	if !ok {
@@ -554,6 +563,8 @@ func (m *Model) IndexUpdate(deviceID protocol.DeviceID, folder string, fs []prot
 		"items":   len(fs),
 		"version": files.LocalVersion(deviceID),
 	})
+
+	runner.IndexUpdated()
 }
 
 func (m *Model) folderSharedWith(folder string, deviceID protocol.DeviceID) bool {

+ 3 - 0
internal/model/rofolder.go

@@ -104,6 +104,9 @@ func (s *roFolder) Stop() {
 	close(s.stop)
 }
 
+func (s *roFolder) IndexUpdated() {
+}
+
 func (s *roFolder) String() string {
 	return fmt.Sprintf("roFolder/%s@%p", s.folder, s)
 }

+ 33 - 18
internal/model/rwfolder.go

@@ -32,7 +32,7 @@ import (
 const (
 	pauseIntv     = 60 * time.Second
 	nextPullIntv  = 10 * time.Second
-	checkPullIntv = 1 * time.Second
+	shortPullIntv = 5 * time.Second
 )
 
 // A pullBlockState is passed to the puller routine for each block that needs
@@ -71,12 +71,13 @@ type rwFolder struct {
 	shortID       uint64
 	order         config.PullOrder
 
-	stop      chan struct{}
-	queue     *jobQueue
-	dbUpdates chan protocol.FileInfo
-	scanTimer *time.Timer
-	pullTimer *time.Timer
-	delayScan chan time.Duration
+	stop        chan struct{}
+	queue       *jobQueue
+	dbUpdates   chan protocol.FileInfo
+	scanTimer   *time.Timer
+	pullTimer   *time.Timer
+	delayScan   chan time.Duration
+	remoteIndex chan struct{} // An index update was received, we should re-evaluate needs
 }
 
 func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder {
@@ -99,11 +100,12 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo
 		shortID:       shortID,
 		order:         cfg.Order,
 
-		stop:      make(chan struct{}),
-		queue:     newJobQueue(),
-		pullTimer: time.NewTimer(checkPullIntv),
-		scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
-		delayScan: make(chan time.Duration),
+		stop:        make(chan struct{}),
+		queue:       newJobQueue(),
+		pullTimer:   time.NewTimer(shortPullIntv),
+		scanTimer:   time.NewTimer(time.Millisecond), // The first scan should be done immediately.
+		delayScan:   make(chan time.Duration),
+		remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes.
 	}
 }
 
@@ -149,11 +151,13 @@ func (p *rwFolder) Serve() {
 		case <-p.stop:
 			return
 
-		// TODO: We could easily add a channel here for notifications from
-		// Index(), so that we immediately start a pull when new index
-		// information is available. Before that though, I'd like to build a
-		// repeatable benchmark of how long it takes to sync a change from
-		// device A to device B, so we have something to work against.
+		case <-p.remoteIndex:
+			prevVer = 0
+			p.pullTimer.Reset(shortPullIntv)
+			if debug {
+				l.Debugln(p, "remote index updated, rescheduling pull")
+			}
+
 		case <-p.pullTimer.C:
 			if !initialScanCompleted {
 				if debug {
@@ -183,7 +187,7 @@ func (p *rwFolder) Serve() {
 				if debug {
 					l.Debugln(p, "skip (curVer == prevVer)", prevVer)
 				}
-				p.pullTimer.Reset(checkPullIntv)
+				p.pullTimer.Reset(nextPullIntv)
 				continue
 			}
 
@@ -282,6 +286,17 @@ func (p *rwFolder) Stop() {
 	close(p.stop)
 }
 
+func (p *rwFolder) IndexUpdated() {
+	select {
+	case p.remoteIndex <- struct{}{}:
+	default:
+		// We might be busy doing a pull and thus not reading from this
+		// channel. The channel is 1-buffered, so one notification will be
+		// queued to ensure we recheck after the pull, but beyond that we must
+		// make sure to not block index receiving.
+	}
+}
+
 func (p *rwFolder) String() string {
 	return fmt.Sprintf("rwFolder/%s@%p", p.folder, p)
 }