Browse Source

Merge pull request #1226 from syncthing/deregister-fix

All roads lead to Finisher (fixes #1201)
Jakob Borg 11 years ago
parent
commit
219ef996f5

+ 0 - 2
internal/model/progressemitter_test.go

@@ -47,8 +47,6 @@ func expectTimeout(w *events.Subscription, t *testing.T) {
 }
 
 func TestProgressEmitter(t *testing.T) {
-	l.Debugln("test progress emitter")
-
 	w := events.Default.Subscribe(events.DownloadProgress)
 
 	c := config.Wrap("/tmp/test", config.Configuration{})

+ 14 - 12
internal/model/puller.go

@@ -610,17 +610,17 @@ func (p *Puller) shortcutSymlink(curFile, file protocol.FileInfo) {
 func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
 	buf := make([]byte, protocol.BlockSize)
 
-nextFile:
 	for state := range in {
+		if p.progressEmitter != nil {
+			p.progressEmitter.Register(state.sharedPullerState)
+		}
+
 		dstFd, err := state.tempFile()
 		if err != nil {
 			// Nothing more to do for this failed file (the error was logged
 			// when it happened)
-			continue nextFile
-		}
-
-		if p.progressEmitter != nil {
-			p.progressEmitter.Register(state.sharedPullerState)
+			out <- state.sharedPullerState
+			continue
 		}
 
 		evictionChan := make(chan lfu.Eviction)
@@ -684,7 +684,7 @@ nextFile:
 
 				_, err = dstFd.WriteAt(buf, block.Offset)
 				if err != nil {
-					state.earlyClose("dst write", err)
+					state.fail("dst write", err)
 				}
 				if file == state.file.Name {
 					state.copiedFromOrigin()
@@ -736,9 +736,9 @@ func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPulle
 			selected := activity.leastBusy(potentialDevices)
 			if selected == (protocol.DeviceID{}) {
 				if lastError != nil {
-					state.earlyClose("pull", lastError)
+					state.fail("pull", lastError)
 				} else {
-					state.earlyClose("pull", errNoDevice)
+					state.fail("pull", errNoDevice)
 				}
 				break
 			}
@@ -764,13 +764,13 @@ func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPulle
 			// Save the block data we got from the cluster
 			_, err = fd.WriteAt(buf, state.block.Offset)
 			if err != nil {
-				state.earlyClose("save", err)
+				state.fail("save", err)
 			} else {
 				state.pullDone()
-				out <- state.sharedPullerState
 			}
 			break
 		}
+		out <- state.sharedPullerState
 	}
 }
 
@@ -860,7 +860,9 @@ func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
 			}
 
 			p.queue.Done(state.file.Name)
-			p.performFinish(state)
+			if state.failed() == nil {
+				p.performFinish(state)
+			}
 			p.model.receivedFile(p.folder, state.file.Name)
 			if p.progressEmitter != nil {
 				p.progressEmitter.Deregister(state)

+ 169 - 0
internal/model/puller_test.go

@@ -382,3 +382,172 @@ func TestLastResortPulling(t *testing.T) {
 	(<-finisherChan).fd.Close()
 	os.Remove(filepath.Join("testdata", defTempNamer.TempName("newfile")))
 }
+
+func TestDeregisterOnFailInCopy(t *testing.T) {
+	file := protocol.FileInfo{
+		Name:     "filex",
+		Flags:    0,
+		Modified: 0,
+		Blocks: []protocol.BlockInfo{
+			blocks[0], blocks[2], blocks[0], blocks[0],
+			blocks[5], blocks[0], blocks[0], blocks[8],
+		},
+	}
+	defer os.Remove(defTempNamer.TempName("filex"))
+
+	db, _ := leveldb.Open(storage.NewMemStorage(), nil)
+	cw := config.Wrap("/tmp/test", config.Configuration{})
+	m := NewModel(cw, "device", "syncthing", "dev", db)
+	m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"})
+
+	emitter := NewProgressEmitter(cw)
+	go emitter.Serve()
+
+	p := Puller{
+		folder:          "default",
+		dir:             "testdata",
+		model:           m,
+		queue:           newJobQueue(),
+		progressEmitter: emitter,
+	}
+
+	// queue.Done should be called by the finisher routine
+	p.queue.Push("filex")
+	p.queue.Pop()
+
+	if len(p.queue.progress) != 1 {
+		t.Fatal("Expected file in progress")
+	}
+
+	copyChan := make(chan copyBlocksState)
+	pullChan := make(chan pullBlockState)
+	finisherBufferChan := make(chan *sharedPullerState)
+	finisherChan := make(chan *sharedPullerState)
+
+	go p.copierRoutine(copyChan, pullChan, finisherBufferChan)
+	go p.finisherRoutine(finisherChan)
+
+	p.handleFile(file, copyChan, finisherChan)
+
+	// Receive a block at puller, to indicate that atleast a single copier
+	// loop has been performed.
+	toPull := <-pullChan
+	// Wait until copier is trying to pass something down to the puller again
+	time.Sleep(100 * time.Millisecond)
+	// Close the file
+	toPull.sharedPullerState.fail("test", os.ErrNotExist)
+	// Unblock copier
+	<-pullChan
+
+	select {
+	case state := <-finisherBufferChan:
+		// At this point the file should still be registered with both the job
+		// queue, and the progress emitter. Verify this.
+		if len(p.progressEmitter.registry) != 1 || len(p.queue.progress) != 1 || len(p.queue.queued) != 0 {
+			t.Fatal("Could not find file")
+		}
+
+		// Pass the file down the real finisher, and give it time to consume
+		finisherChan <- state
+		time.Sleep(100 * time.Millisecond)
+
+		if state.fd != nil {
+			t.Fatal("File not closed?")
+		}
+
+		if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 {
+			t.Fatal("Still registered", len(p.progressEmitter.registry), len(p.queue.progress), len(p.queue.queued))
+		}
+
+		// Doing it again should have no effect
+		finisherChan <- state
+		time.Sleep(100 * time.Millisecond)
+
+		if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 {
+			t.Fatal("Still registered")
+		}
+	case <-time.After(time.Second):
+		t.Fatal("Didn't get anything to the finisher")
+	}
+}
+
+func TestDeregisterOnFailInPull(t *testing.T) {
+	file := protocol.FileInfo{
+		Name:     "filex",
+		Flags:    0,
+		Modified: 0,
+		Blocks: []protocol.BlockInfo{
+			blocks[0], blocks[2], blocks[0], blocks[0],
+			blocks[5], blocks[0], blocks[0], blocks[8],
+		},
+	}
+	defer os.Remove(defTempNamer.TempName("filex"))
+
+	db, _ := leveldb.Open(storage.NewMemStorage(), nil)
+	cw := config.Wrap("/tmp/test", config.Configuration{})
+	m := NewModel(cw, "device", "syncthing", "dev", db)
+	m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"})
+
+	emitter := NewProgressEmitter(cw)
+	go emitter.Serve()
+
+	p := Puller{
+		folder:          "default",
+		dir:             "testdata",
+		model:           m,
+		queue:           newJobQueue(),
+		progressEmitter: emitter,
+	}
+
+	// queue.Done should be called by the finisher routine
+	p.queue.Push("filex")
+	p.queue.Pop()
+
+	if len(p.queue.progress) != 1 {
+		t.Fatal("Expected file in progress")
+	}
+
+	copyChan := make(chan copyBlocksState)
+	pullChan := make(chan pullBlockState)
+	finisherBufferChan := make(chan *sharedPullerState)
+	finisherChan := make(chan *sharedPullerState)
+
+	go p.copierRoutine(copyChan, pullChan, finisherBufferChan)
+	go p.pullerRoutine(pullChan, finisherBufferChan)
+	go p.finisherRoutine(finisherChan)
+
+	p.handleFile(file, copyChan, finisherChan)
+
+	// Receove at finisher, we shoud error out as puller has nowhere to pull
+	// from.
+	select {
+	case state := <-finisherBufferChan:
+		// At this point the file should still be registered with both the job
+		// queue, and the progress emitter. Verify this.
+		if len(p.progressEmitter.registry) != 1 || len(p.queue.progress) != 1 || len(p.queue.queued) != 0 {
+			t.Fatal("Could not find file")
+		}
+
+		// Pass the file down the real finisher, and give it time to consume
+		finisherChan <- state
+		time.Sleep(100 * time.Millisecond)
+
+		if state.fd != nil {
+			t.Fatal("File not closed?")
+		}
+
+		if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 {
+			t.Fatal("Still registered", len(p.progressEmitter.registry), len(p.queue.progress), len(p.queue.queued))
+		}
+
+		// Doing it again should have no effect
+		finisherChan <- state
+		time.Sleep(100 * time.Millisecond)
+
+		if len(p.progressEmitter.registry) != 0 || len(p.queue.progress) != 0 || len(p.queue.queued) != 0 {
+			t.Fatal("Still registered")
+		}
+	case <-time.After(time.Second):
+		t.Fatal("Didn't get anything to the finisher")
+	}
+}

+ 9 - 19
internal/model/sharedpullerstate.go

@@ -43,7 +43,6 @@ type sharedPullerState struct {
 	copyOrigin uint32     // Number of blocks copied from the original file
 	copyNeeded uint32     // Number of copy actions still pending
 	pullNeeded uint32     // Number of block pulls still pending
-	closed     bool       // Set when the file has been closed
 	mut        sync.Mutex // Protects the above
 }
 
@@ -93,7 +92,7 @@ func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
 	// here.
 	dir := filepath.Dir(s.tempName)
 	if info, err := os.Stat(dir); err != nil {
-		s.earlyCloseLocked("dst stat dir", err)
+		s.failLocked("dst stat dir", err)
 		return nil, err
 	} else if info.Mode()&0200 == 0 {
 		err := os.Chmod(dir, 0755)
@@ -119,13 +118,13 @@ func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
 		// make sure we have write permissions on the file before opening it.
 		err := os.Chmod(s.tempName, 0644)
 		if err != nil {
-			s.earlyCloseLocked("dst create chmod", err)
+			s.failLocked("dst create chmod", err)
 			return nil, err
 		}
 	}
 	fd, err := os.OpenFile(s.tempName, flags, 0644)
 	if err != nil {
-		s.earlyCloseLocked("dst create", err)
+		s.failLocked("dst create", err)
 		return nil, err
 	}
 
@@ -148,7 +147,7 @@ func (s *sharedPullerState) sourceFile() (*os.File, error) {
 	// Attempt to open the existing file
 	fd, err := os.Open(s.realName)
 	if err != nil {
-		s.earlyCloseLocked("src open", err)
+		s.failLocked("src open", err)
 		return nil, err
 	}
 
@@ -158,24 +157,20 @@ func (s *sharedPullerState) sourceFile() (*os.File, error) {
 // earlyClose prints a warning message composed of the context and
 // error, and marks the sharedPullerState as failed. Is a no-op when called on
 // an already failed state.
-func (s *sharedPullerState) earlyClose(context string, err error) {
+func (s *sharedPullerState) fail(context string, err error) {
 	s.mut.Lock()
 	defer s.mut.Unlock()
 
-	s.earlyCloseLocked(context, err)
+	s.failLocked(context, err)
 }
 
-func (s *sharedPullerState) earlyCloseLocked(context string, err error) {
+func (s *sharedPullerState) failLocked(context string, err error) {
 	if s.err != nil {
 		return
 	}
 
 	l.Infof("Puller (folder %q, file %q): %s: %v", s.folder, s.file.Name, context, err)
 	s.err = err
-	if s.fd != nil {
-		s.fd.Close()
-	}
-	s.closed = true
 }
 
 func (s *sharedPullerState) failed() error {
@@ -230,21 +225,16 @@ func (s *sharedPullerState) finalClose() (bool, error) {
 	s.mut.Lock()
 	defer s.mut.Unlock()
 
-	if s.pullNeeded+s.copyNeeded != 0 {
+	if s.pullNeeded+s.copyNeeded != 0 && s.err == nil {
 		// Not done yet.
 		return false, nil
 	}
-	if s.closed {
-		// Already handled.
-		return false, nil
-	}
 
-	s.closed = true
 	if fd := s.fd; fd != nil {
 		s.fd = nil
 		return true, fd.Close()
 	}
-	return true, nil
+	return false, nil
 }
 
 // Returns the momentarily progress for the puller

+ 1 - 1
internal/model/sharedpullerstate_test.go

@@ -86,5 +86,5 @@ func TestReadOnlyDir(t *testing.T) {
 		t.Fatal("Unexpected nil fd")
 	}
 
-	s.earlyClose("Test done", nil)
+	s.fail("Test done", nil)
 }