浏览代码

Hash blocks after receipt, try multiple peers (fixes #1166)

Audrius Butkevicius 10 年之前
父节点
当前提交
5ac01a3af4
共有 5 个文件被更改,包括 90 次插入和 71 次删除
  1. 1 1
      internal/model/model.go
  2. 65 63
      internal/model/puller.go
  3. 5 6
      internal/model/puller_test.go
  4. 18 0
      internal/scanner/blocks.go
  5. 1 1
      test/sync_test.go

+ 1 - 1
internal/model/model.go

@@ -1369,7 +1369,7 @@ func (m *Model) RemoteLocalVersion(folder string) uint64 {
 
 
 func (m *Model) availability(folder, file string) []protocol.DeviceID {
 func (m *Model) availability(folder, file string) []protocol.DeviceID {
 	// Acquire this lock first, as the value returned from foldersFiles can
 	// Acquire this lock first, as the value returned from foldersFiles can
-	// gen heavily modified on Close()
+	// get heavily modified on Close()
 	m.pmut.RLock()
 	m.pmut.RLock()
 	defer m.pmut.RUnlock()
 	defer m.pmut.RUnlock()
 
 

+ 65 - 63
internal/model/puller.go

@@ -16,8 +16,6 @@
 package model
 package model
 
 
 import (
 import (
-	"bytes"
-	"crypto/sha256"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
 	"io/ioutil"
 	"io/ioutil"
@@ -158,16 +156,10 @@ loop:
 			}
 			}
 			p.model.setState(p.folder, FolderSyncing)
 			p.model.setState(p.folder, FolderSyncing)
 			tries := 0
 			tries := 0
-			checksum := false
 			for {
 			for {
 				tries++
 				tries++
-				// Last resort mode, to get around corrupt/invalid block maps.
-				if tries == 10 {
-					l.Infoln("Desperation mode ON")
-					checksum = true
-				}
 
 
-				changed := p.pullerIteration(checksum, curIgnores)
+				changed := p.pullerIteration(curIgnores)
 				if debug {
 				if debug {
 					l.Debugln(p, "changed", changed)
 					l.Debugln(p, "changed", changed)
 				}
 				}
@@ -255,7 +247,7 @@ func (p *Puller) String() string {
 // returns the number items that should have been synced (even those that
 // returns the number items that should have been synced (even those that
 // might have failed). One puller iteration handles all files currently
 // might have failed). One puller iteration handles all files currently
 // flagged as needed in the folder.
 // flagged as needed in the folder.
-func (p *Puller) pullerIteration(checksum bool, ignores *ignore.Matcher) int {
+func (p *Puller) pullerIteration(ignores *ignore.Matcher) int {
 	pullChan := make(chan pullBlockState)
 	pullChan := make(chan pullBlockState)
 	copyChan := make(chan copyBlocksState)
 	copyChan := make(chan copyBlocksState)
 	finisherChan := make(chan *sharedPullerState)
 	finisherChan := make(chan *sharedPullerState)
@@ -272,7 +264,7 @@ func (p *Puller) pullerIteration(checksum bool, ignores *ignore.Matcher) int {
 		copyWg.Add(1)
 		copyWg.Add(1)
 		go func() {
 		go func() {
 			// copierRoutine finishes when copyChan is closed
 			// copierRoutine finishes when copyChan is closed
-			p.copierRoutine(copyChan, pullChan, finisherChan, checksum)
+			p.copierRoutine(copyChan, pullChan, finisherChan)
 			copyWg.Done()
 			copyWg.Done()
 		}()
 		}()
 	}
 	}
@@ -613,7 +605,7 @@ func (p *Puller) shortcutSymlink(curFile, file protocol.FileInfo) {
 
 
 // copierRoutine reads copierStates until the in channel closes and performs
 // copierRoutine reads copierStates until the in channel closes and performs
 // the relevant copies when possible, or passes it to the puller routine.
 // the relevant copies when possible, or passes it to the puller routine.
-func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState, checksum bool) {
+func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
 	buf := make([]byte, protocol.BlockSize)
 	buf := make([]byte, protocol.BlockSize)
 
 
 nextFile:
 nextFile:
@@ -649,7 +641,6 @@ nextFile:
 		}
 		}
 		p.model.fmut.RUnlock()
 		p.model.fmut.RUnlock()
 
 
-		hasher := sha256.New()
 		for _, block := range state.blocks {
 		for _, block := range state.blocks {
 			buf = buf[:int(block.Size)]
 			buf = buf[:int(block.Size)]
 			found := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool {
 			found := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool {
@@ -673,12 +664,9 @@ nextFile:
 					return false
 					return false
 				}
 				}
 
 
-				// Only done on second to last puller attempt
-				if checksum {
-					hasher.Write(buf)
-					hash := hasher.Sum(nil)
-					hasher.Reset()
-					if !bytes.Equal(hash, block.Hash) {
+				hash, err := scanner.VerifyBuffer(buf, block)
+				if err != nil {
+					if hash != nil {
 						if debug {
 						if debug {
 							l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash)
 							l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash)
 						}
 						}
@@ -686,8 +674,10 @@ nextFile:
 						if err != nil {
 						if err != nil {
 							l.Warnln("finder fix:", err)
 							l.Warnln("finder fix:", err)
 						}
 						}
-						return false
+					} else if debug {
+						l.Debugln("Finder failed to verify buffer", err)
 					}
 					}
+					return false
 				}
 				}
 
 
 				_, err = dstFd.WriteAt(buf, block.Offset)
 				_, err = dstFd.WriteAt(buf, block.Offset)
@@ -722,20 +712,9 @@ nextFile:
 }
 }
 
 
 func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
 func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
-nextBlock:
 	for state := range in {
 	for state := range in {
 		if state.failed() != nil {
 		if state.failed() != nil {
-			continue nextBlock
-		}
-
-		// Select the least busy device to pull the block from. If we found no
-		// feasible device at all, fail the block (and in the long run, the
-		// file).
-		potentialDevices := p.model.availability(p.folder, state.file.Name)
-		selected := activity.leastBusy(potentialDevices)
-		if selected == (protocol.DeviceID{}) {
-			state.earlyClose("pull", errNoDevice)
-			continue nextBlock
+			continue
 		}
 		}
 
 
 		// Get an fd to the temporary file. Technically we don't need it until
 		// Get an fd to the temporary file. Technically we don't need it until
@@ -743,45 +722,58 @@ nextBlock:
 		// no point in issuing the request to the network.
 		// no point in issuing the request to the network.
 		fd, err := state.tempFile()
 		fd, err := state.tempFile()
 		if err != nil {
 		if err != nil {
-			continue nextBlock
+			continue
 		}
 		}
 
 
-		// Fetch the block, while marking the selected device as in use so that
-		// leastBusy can select another device when someone else asks.
-		activity.using(selected)
-		buf, err := p.model.requestGlobal(selected, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash)
-		activity.done(selected)
-		if err != nil {
-			state.earlyClose("pull", err)
-			continue nextBlock
-		}
+		var lastError error
+		potentialDevices := p.model.availability(p.folder, state.file.Name)
+		for {
+			// Select the least busy device to pull the block from. If we found no
+			// feasible device at all, fail the block (and in the long run, the
+			// file).
+			selected := activity.leastBusy(potentialDevices)
+			if selected == (protocol.DeviceID{}) {
+				if lastError != nil {
+					state.earlyClose("pull", lastError)
+				} else {
+					state.earlyClose("pull", errNoDevice)
+				}
+				break
+			}
 
 
-		// Save the block data we got from the cluster
-		_, err = fd.WriteAt(buf, state.block.Offset)
-		if err != nil {
-			state.earlyClose("save", err)
-			continue nextBlock
-		}
+			potentialDevices = removeDevice(potentialDevices, selected)
 
 
-		state.pullDone()
-		out <- state.sharedPullerState
+			// Fetch the block, while marking the selected device as in use so that
+			// leastBusy can select another device when someone else asks.
+			activity.using(selected)
+			buf, lastError := p.model.requestGlobal(selected, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash)
+			activity.done(selected)
+			if lastError != nil {
+				continue
+			}
+
+			// Verify that the received block matches the desired hash, if not
+			// try pulling it from another device.
+			_, lastError = scanner.VerifyBuffer(buf, state.block)
+			if lastError != nil {
+				continue
+			}
+
+			// Save the block data we got from the cluster
+			_, err = fd.WriteAt(buf, state.block.Offset)
+			if err != nil {
+				state.earlyClose("save", err)
+			} else {
+				state.pullDone()
+				out <- state.sharedPullerState
+			}
+			break
+		}
 	}
 	}
 }
 }
 
 
 func (p *Puller) performFinish(state *sharedPullerState) {
 func (p *Puller) performFinish(state *sharedPullerState) {
-	// Verify the file against expected hashes
-	fd, err := os.Open(state.tempName)
-	if err != nil {
-		l.Warnln("puller: final:", err)
-		return
-	}
-	err = scanner.Verify(fd, protocol.BlockSize, state.file.Blocks)
-	fd.Close()
-	if err != nil {
-		l.Infoln("puller:", state.file.Name, err, "(file changed during pull?)")
-		return
-	}
-
+	var err error
 	// Set the correct permission bits on the new file
 	// Set the correct permission bits on the new file
 	if !p.ignorePerms {
 	if !p.ignorePerms {
 		err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777))
 		err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777))
@@ -893,3 +885,13 @@ func invalidateFolder(cfg *config.Configuration, folderID string, err error) {
 		}
 		}
 	}
 	}
 }
 }
+
+// removeDevice returns the devices slice with the first occurrence of
+// device removed. If device is not present, the slice is returned
+// unchanged. Note: removal is done by swapping in the last element and
+// truncating, so the order of the remaining devices is not preserved
+// and the caller's backing array is mutated.
+func removeDevice(devices []protocol.DeviceID, device protocol.DeviceID) []protocol.DeviceID {
+	for i := range devices {
+		if devices[i] == device {
+			// Swap-remove: overwrite the match with the last element,
+			// then shrink the slice by one. O(1) instead of shifting.
+			devices[i] = devices[len(devices)-1]
+			return devices[:len(devices)-1]
+		}
+	}
+	return devices
+}

+ 5 - 6
internal/model/puller_test.go

@@ -221,7 +221,7 @@ func TestCopierFinder(t *testing.T) {
 	finisherChan := make(chan *sharedPullerState, 1)
 	finisherChan := make(chan *sharedPullerState, 1)
 
 
 	// Run a single fetcher routine
 	// Run a single fetcher routine
-	go p.copierRoutine(copyChan, pullChan, finisherChan, false)
+	go p.copierRoutine(copyChan, pullChan, finisherChan)
 
 
 	p.handleFile(requiredFile, copyChan, finisherChan)
 	p.handleFile(requiredFile, copyChan, finisherChan)
 
 
@@ -317,9 +317,8 @@ func TestCopierCleanup(t *testing.T) {
 	}
 	}
 }
 }
 
 
-// On the 10th iteration, we start hashing the content which we receive by
-// following blockfinder's instructions. Make sure that the copier routine
-// hashes the content when asked, and pulls if it fails to find the block.
+// Make sure that the copier routine hashes the content when asked, and pulls
+// if it fails to find the block.
 func TestLastResortPulling(t *testing.T) {
 func TestLastResortPulling(t *testing.T) {
 	fcfg := config.FolderConfiguration{ID: "default", Path: "testdata"}
 	fcfg := config.FolderConfiguration{ID: "default", Path: "testdata"}
 	cfg := config.Configuration{Folders: []config.FolderConfiguration{fcfg}}
 	cfg := config.Configuration{Folders: []config.FolderConfiguration{fcfg}}
@@ -361,8 +360,8 @@ func TestLastResortPulling(t *testing.T) {
 	pullChan := make(chan pullBlockState, 1)
 	pullChan := make(chan pullBlockState, 1)
 	finisherChan := make(chan *sharedPullerState, 1)
 	finisherChan := make(chan *sharedPullerState, 1)
 
 
-	// Run a single copier routine with checksumming enabled
-	go p.copierRoutine(copyChan, pullChan, finisherChan, true)
+	// Run a single copier routine
+	go p.copierRoutine(copyChan, pullChan, finisherChan)
 
 
 	p.handleFile(file, copyChan, finisherChan)
 	p.handleFile(file, copyChan, finisherChan)
 
 

+ 18 - 0
internal/scanner/blocks.go

@@ -130,6 +130,24 @@ func Verify(r io.Reader, blocksize int, blocks []protocol.BlockInfo) error {
 	return nil
 	return nil
 }
 }
 
 
+// VerifyBuffer checks that buf has exactly block.Size bytes and that its
+// SHA-256 digest matches block.Hash. On success it returns the computed
+// hash and a nil error. On a length mismatch (or a hasher write error)
+// it returns a nil hash and an error; on a hash mismatch it returns the
+// computed hash together with the error, which lets callers distinguish
+// the two failure modes and log the actual digest.
+func VerifyBuffer(buf []byte, block protocol.BlockInfo) ([]byte, error) {
+	if len(buf) != int(block.Size) {
+		return nil, fmt.Errorf("length mismatch %d != %d", len(buf), block.Size)
+	}
+	hf := sha256.New()
+	_, err := hf.Write(buf)
+	if err != nil {
+		return nil, err
+	}
+	hash := hf.Sum(nil)
+
+	if !bytes.Equal(hash, block.Hash) {
+		return hash, fmt.Errorf("hash mismatch %x != %x", hash, block.Hash)
+	}
+
+	return hash, nil
+}
+
 // BlocksEqual returns whether two slices of blocks are exactly the same hash
 // BlocksEqual returns whether two slices of blocks are exactly the same hash
 // and index pair wise.
 // and index pair wise.
 func BlocksEqual(src, tgt []protocol.BlockInfo) bool {
 func BlocksEqual(src, tgt []protocol.BlockInfo) bool {

+ 1 - 1
test/sync_test.go

@@ -28,7 +28,7 @@ import (
 	"github.com/syncthing/syncthing/internal/protocol"
 	"github.com/syncthing/syncthing/internal/protocol"
 )
 )
 
 
-func TestSyncCluster(t *testing.T) {
+func TestSyncClusterWithoutVersioning(t *testing.T) {
 	// Use no versioning
 	// Use no versioning
 	id, _ := protocol.DeviceIDFromString(id2)
 	id, _ := protocol.DeviceIDFromString(id2)
 	cfg, _ := config.Load("h2/config.xml", id)
 	cfg, _ := config.Load("h2/config.xml", id)