
Serialize scans and pulls (fixes #1391)

Jakob Borg · 10 years ago · commit f73d5a9ab2

4 changed files with 216 additions and 4 deletions:

  1. internal/model/model.go (+16 -0)
  2. internal/model/rofolder.go (+41 -2)
  3. internal/model/rwfolder.go (+36 -2)
  4. test/sync_test.go (+123 -0)

internal/model/model.go (+16 -0)

@@ -55,6 +55,7 @@ type service interface {
 	BringToFront(string)
 	DelayScan(d time.Duration)
 	IndexUpdated() // Remote index was updated notification
+	Scan(subs []string) error
 
 	setState(state folderState)
 	setError(err error)
@@ -1226,6 +1227,21 @@ func (m *Model) ScanFolder(folder string) error {
 }
 
 func (m *Model) ScanFolderSubs(folder string, subs []string) error {
+	m.fmut.Lock()
+	runner, ok := m.folderRunners[folder]
+	m.fmut.Unlock()
+
+	// Folders are added to folderRunners only when they are started. We can't
+	// scan them before they have started, so that's what we need to check for
+	// here.
+	if !ok {
+		return errors.New("no such folder")
+	}
+
+	return runner.Scan(subs)
+}
+
+func (m *Model) internalScanFolderSubs(folder string, subs []string) error {
 	for i, sub := range subs {
 		sub = osutil.NativeFilename(sub)
 		if p := filepath.Clean(filepath.Join(folder, sub)); !strings.HasPrefix(p, folder) {

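The model-level change is pure dispatch: ScanFolderSubs now looks up the folder's registered runner under fmut and forwards the request, while the actual directory walk moves into internalScanFolderSubs, which is only ever called from the folder's own service goroutine. A minimal sketch of that lookup-and-forward shape, assuming a trimmed-down service interface; dummyRunner and the map contents are illustrative stand-ins, not part of the commit:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// service is cut down to the single method the commit adds to the interface.
type service interface {
	Scan(subs []string) error
}

// Model keeps one runner per *started* folder; folders that have not been
// started have no entry in folderRunners and therefore cannot be scanned.
type Model struct {
	fmut          sync.Mutex
	folderRunners map[string]service
}

// ScanFolderSubs no longer scans anything itself; it forwards to the folder
// runner so the scan is performed by, and serialized with, the folder's own
// service loop.
func (m *Model) ScanFolderSubs(folder string, subs []string) error {
	m.fmut.Lock()
	runner, ok := m.folderRunners[folder]
	m.fmut.Unlock()

	if !ok {
		return errors.New("no such folder")
	}
	return runner.Scan(subs)
}

// dummyRunner exists only to make the sketch self-contained.
type dummyRunner struct{}

func (dummyRunner) Scan(subs []string) error {
	fmt.Println("scanning", subs)
	return nil
}

func main() {
	m := &Model{folderRunners: map[string]service{"default": dummyRunner{}}}
	fmt.Println(m.ScanFolderSubs("default", []string{"sub"})) // <nil>
	fmt.Println(m.ScanFolderSubs("missing", nil))             // no such folder
}
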
internal/model/rofolder.go (+41 -2)

@@ -22,9 +22,15 @@ type roFolder struct {
 	timer     *time.Timer
 	model     *Model
 	stop      chan struct{}
+	scanNow   chan rescanRequest
 	delayScan chan time.Duration
 }
 
+type rescanRequest struct {
+	subs []string
+	err  chan error
+}
+
 func newROFolder(model *Model, folder string, interval time.Duration) *roFolder {
 	return &roFolder{
 		stateTracker: stateTracker{
@@ -36,6 +42,7 @@ func newROFolder(model *Model, folder string, interval time.Duration) *roFolder
 		timer:     time.NewTimer(time.Millisecond),
 		model:     model,
 		stop:      make(chan struct{}),
+		scanNow:   make(chan rescanRequest),
 		delayScan: make(chan time.Duration),
 	}
 }
@@ -76,7 +83,7 @@ func (s *roFolder) Serve() {
 				l.Debugln(s, "rescan")
 			}
 
-			if err := s.model.ScanFolder(s.folder); err != nil {
+			if err := s.model.internalScanFolderSubs(s.folder, nil); err != nil {
 				// Potentially sets the error twice, once in the scanner just
 				// by doing a check, and once here, if the error returned is
 				// the same one as returned by CheckFolderHealth, though
@@ -92,11 +99,34 @@ func (s *roFolder) Serve() {
 			}
 
 			if s.intv == 0 {
-				return
+				continue
 			}
 
 			reschedule()
 
+		case req := <-s.scanNow:
+			if err := s.model.CheckFolderHealth(s.folder); err != nil {
+				l.Infoln("Skipping folder", s.folder, "scan due to folder error:", err)
+				req.err <- err
+				continue
+			}
+
+			if debug {
+				l.Debugln(s, "forced rescan")
+			}
+
+			if err := s.model.internalScanFolderSubs(s.folder, req.subs); err != nil {
+				// Potentially sets the error twice, once in the scanner just
+				// by doing a check, and once here, if the error returned is
+				// the same one as returned by CheckFolderHealth, though
+				// duplicate set is handled by setError.
+				s.setError(err)
+				req.err <- err
+				continue
+			}
+
+			req.err <- nil
+
 		case next := <-s.delayScan:
 			s.timer.Reset(next)
 		}
@@ -110,6 +140,15 @@ func (s *roFolder) Stop() {
 func (s *roFolder) IndexUpdated() {
 }
 
+func (s *roFolder) Scan(subs []string) error {
+	req := rescanRequest{
+		subs: subs,
+		err:  make(chan error),
+	}
+	s.scanNow <- req
+	return <-req.err
+}
+
 func (s *roFolder) String() string {
 	return fmt.Sprintf("roFolder/%s@%p", s.folder, s)
 }

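The serialization mechanism itself is the rescanRequest/scanNow pair: Scan builds a request carrying an unbuffered reply channel, hands it to the Serve loop, and blocks until the loop has actually run the scan. Because scheduled scans and forced scans are handled by the same select in the same goroutine, they can never overlap, and the caller learns the real outcome. A self-contained sketch of the pattern, with the health check and the scan replaced by prints; folder, its fields, and the timings are simplified stand-ins for roFolder, not Syncthing's actual code:

package main

import (
	"fmt"
	"time"
)

// rescanRequest matches the struct added in the commit: the subdirectories
// to scan plus a channel on which the result is reported.
type rescanRequest struct {
	subs []string
	err  chan error
}

// folder is a stripped-down stand-in for roFolder: one goroutine owns all
// scanning and everything else talks to it over channels.
type folder struct {
	scanNow chan rescanRequest
	stop    chan struct{}
}

func (f *folder) Serve() {
	timer := time.NewTimer(time.Millisecond)
	for {
		select {
		case <-f.stop:
			return

		case <-timer.C:
			// The regularly scheduled scan would run here.
			fmt.Println("scheduled scan")
			timer.Reset(time.Second)

		case req := <-f.scanNow:
			// A forced scan runs in the same goroutine, so it can never
			// overlap a scheduled scan.
			fmt.Println("forced scan of", req.subs)
			req.err <- nil
		}
	}
}

// Scan hands the request to the service loop and waits for the result, just
// as roFolder.Scan does in the diff.
func (f *folder) Scan(subs []string) error {
	req := rescanRequest{subs: subs, err: make(chan error)}
	f.scanNow <- req
	return <-req.err
}

func main() {
	f := &folder{scanNow: make(chan rescanRequest), stop: make(chan struct{})}
	go f.Serve()
	if err := f.Scan([]string{"subdir"}); err != nil {
		fmt.Println("scan failed:", err)
	}
	close(f.stop)
}
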
internal/model/rwfolder.go (+36 -2)

@@ -32,7 +32,7 @@ import (
 const (
 	pauseIntv     = 60 * time.Second
 	nextPullIntv  = 10 * time.Second
-	shortPullIntv = 5 * time.Second
+	shortPullIntv = time.Second
 )
 
 // A pullBlockState is passed to the puller routine for each block that needs
@@ -90,6 +90,7 @@ type rwFolder struct {
 	scanTimer   *time.Timer
 	pullTimer   *time.Timer
 	delayScan   chan time.Duration
+	scanNow     chan rescanRequest
 	remoteIndex chan struct{} // An index update was received, we should re-evaluate needs
 }
 
@@ -118,6 +119,7 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo
 		pullTimer:   time.NewTimer(shortPullIntv),
 		scanTimer:   time.NewTimer(time.Millisecond), // The first scan should be done immediately.
 		delayScan:   make(chan time.Duration),
+		scanNow:     make(chan rescanRequest),
 		remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes.
 	}
 }
@@ -278,7 +280,7 @@ func (p *rwFolder) Serve() {
 				l.Debugln(p, "rescan")
 			}
 
-			if err := p.model.ScanFolder(p.folder); err != nil {
+			if err := p.model.internalScanFolderSubs(p.folder, nil); err != nil {
 				// Potentially sets the error twice, once in the scanner just
 				// by doing a check, and once here, if the error returned is
 				// the same one as returned by CheckFolderHealth, though
@@ -296,6 +298,29 @@ func (p *rwFolder) Serve() {
 				initialScanCompleted = true
 			}
 
+		case req := <-p.scanNow:
+			if err := p.model.CheckFolderHealth(p.folder); err != nil {
+				l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err)
+				req.err <- err
+				continue
+			}
+
+			if debug {
+				l.Debugln(p, "forced rescan")
+			}
+
+			if err := p.model.internalScanFolderSubs(p.folder, req.subs); err != nil {
+				// Potentially sets the error twice, once in the scanner just
+				// by doing a check, and once here, if the error returned is
+				// the same one as returned by CheckFolderHealth, though
+				// duplicate set is handled by setError.
+				p.setError(err)
+				req.err <- err
+				continue
+			}
+
+			req.err <- nil
+
 		case next := <-p.delayScan:
 			p.scanTimer.Reset(next)
 		}
@@ -317,6 +342,15 @@ func (p *rwFolder) IndexUpdated() {
 	}
 }
 
+func (p *rwFolder) Scan(subs []string) error {
+	req := rescanRequest{
+		subs: subs,
+		err:  make(chan error),
+	}
+	p.scanNow <- req
+	return <-req.err
+}
+
 func (p *rwFolder) String() string {
 	return fmt.Sprintf("rwFolder/%s@%p", p.folder, p)
 }

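rwFolder adopts the same pattern, with the property that gives the commit its title: pulls and forced scans are now cases of one select loop, so a Scan issued while a pull is in progress simply waits until the pull case finishes. (The shortPullIntv reduction from five seconds to one second shortens the wait before the next pull.) A toy demonstration of that ordering, with sleeps and prints standing in for real work; rwLoop, rwRequest and the durations are illustrative assumptions, not Syncthing code:

package main

import (
	"fmt"
	"time"
)

// rwRequest mirrors rescanRequest from the commit.
type rwRequest struct {
	subs []string
	err  chan error
}

// rwLoop is a toy version of rwFolder.Serve: pulls and forced scans are
// cases of a single select running in a single goroutine, which is what
// "serialize scans and pulls" means in practice.
func rwLoop(pullTimer *time.Timer, scanNow chan rwRequest, stop chan struct{}) {
	for {
		select {
		case <-stop:
			return

		case <-pullTimer.C:
			fmt.Println("pull start")
			time.Sleep(200 * time.Millisecond) // pretend to pull
			fmt.Println("pull done")
			pullTimer.Reset(time.Second)

		case req := <-scanNow:
			fmt.Println("forced scan of", req.subs)
			req.err <- nil
		}
	}
}

func main() {
	scanNow := make(chan rwRequest)
	stop := make(chan struct{})
	go rwLoop(time.NewTimer(0), scanNow, stop)

	time.Sleep(50 * time.Millisecond) // let the pull get going first

	// This scan is requested mid-pull; it is only handled once the pull case
	// returns to the select, so the two never run concurrently.
	req := rwRequest{subs: []string{"subdir"}, err: make(chan error)}
	scanNow <- req
	fmt.Println("scan result:", <-req.err)

	close(stop)
}
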
test/sync_test.go (+123 -0)

@@ -10,6 +10,7 @@ package integration
 
 import (
 	"fmt"
+	"io/ioutil"
 	"log"
 	"math/rand"
 	"os"
@@ -82,6 +83,18 @@ func TestSyncClusterStaggeredVersioning(t *testing.T) {
 	testSyncCluster(t)
 }
 
+func TestSyncClusterForcedRescan(t *testing.T) {
+	// Use no versioning
+	id, _ := protocol.DeviceIDFromString(id2)
+	cfg, _ := config.Load("h2/config.xml", id)
+	fld := cfg.Folders()["default"]
+	fld.Versioning = config.VersioningConfiguration{}
+	cfg.SetFolder(fld)
+	cfg.Save()
+
+	testSyncClusterForcedRescan(t)
+}
+
 func testSyncCluster(t *testing.T) {
 	// This tests syncing files back and forth between three cluster members.
 	// Their configs are in h1, h2 and h3. The folder "default" is shared
@@ -287,6 +300,116 @@ func testSyncCluster(t *testing.T) {
 	}
 }
 
+func testSyncClusterForcedRescan(t *testing.T) {
+	// During this test we create 1000 files, remove them, and then create
+	// them again. While doing so we repeatedly force rescans, so that the
+	// other devices pull and re-pull the files while the data is still
+	// changing underneath them.
+
+	// When -short is passed, keep it more reasonable.
+	timeLimit := longTimeLimit
+	if testing.Short() {
+		timeLimit = shortTimeLimit
+	}
+
+	log.Println("Cleaning...")
+	err := removeAll("s1", "s12-1",
+		"s2", "s12-2", "s23-2",
+		"s3", "s23-3",
+		"h1/index*", "h2/index*", "h3/index*")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Create the initial folder contents: a thousand small files in
+	// s1/test-stable-files that will be removed and recreated while the
+	// cluster is syncing.
+
+	log.Println("Generating files...")
+	if err := os.MkdirAll("s1/test-stable-files", 0755); err != nil {
+		t.Fatal(err)
+	}
+	for i := 0; i < 1000; i++ {
+		name := fmt.Sprintf("s1/test-stable-files/%d", i)
+		if err := ioutil.WriteFile(name, []byte(time.Now().Format(time.RFC3339Nano)), 0644); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Prepare the expected state of folders after the sync
+	expected, err := directoryContents("s1")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Start the syncers
+	p0 := startInstance(t, 1)
+	defer checkedStop(t, p0)
+	p1 := startInstance(t, 2)
+	defer checkedStop(t, p1)
+	p2 := startInstance(t, 3)
+	defer checkedStop(t, p2)
+
+	p := []*rc.Process{p0, p1, p2}
+
+	start := time.Now()
+	for time.Since(start) < timeLimit {
+		rescan := func() {
+			for i := range p {
+				if err := p[i].Rescan("default"); err != nil {
+					t.Fatal(err)
+				}
+			}
+		}
+
+		log.Println("Forcing rescan...")
+		rescan()
+
+		// Sync stuff and verify it looks right
+		err = scSyncAndCompare(p, [][]fileInfo{expected})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		log.Println("Altering...")
+
+		// Delete and recreate stable files while scanners and pullers are active
+		for i := 0; i < 1000; i++ {
+			name := fmt.Sprintf("s1/test-stable-files/%d", i)
+			if err := os.Remove(name); err != nil {
+				t.Fatal(err)
+			}
+			if rand.Intn(10) == 0 {
+				rescan()
+			}
+		}
+
+		rescan()
+
+		time.Sleep(50 * time.Millisecond)
+		for i := 0; i < 1000; i++ {
+			name := fmt.Sprintf("s1/test-stable-files/%d", i)
+			if err := ioutil.WriteFile(name, []byte(time.Now().Format(time.RFC3339Nano)), 0644); err != nil {
+				t.Fatal(err)
+			}
+			if rand.Intn(10) == 0 {
+				rescan()
+			}
+		}
+
+		rescan()
+
+		// Prepare the expected state of folders after the sync
+		expected, err = directoryContents("s1")
+		if err != nil {
+			t.Fatal(err)
+		}
+		if len(expected) != 1001 {
+			t.Fatal("s1 does not have 1001 files;", len(expected))
+		}
+	}
+}
+
 func scSyncAndCompare(p []*rc.Process, expected [][]fileInfo) error {
 	log.Println("Syncing...")