
Recover from corrupt block maps

Audrius Butkevicius 11 years ago
commit d4199c2d08

+ 12 - 0
internal/files/blockmap.go

@@ -179,6 +179,18 @@ func (f *BlockFinder) Iterate(hash []byte, iterFn func(string, string, uint32) b
 	return false
 }
 
+// Fix repairs an incorrect blockmap entry: it removes the old entry and
+// replaces it with a new entry for the given block.
+func (f *BlockFinder) Fix(folder, file string, index uint32, oldHash, newHash []byte) error {
+	buf := make([]byte, 4)
+	binary.BigEndian.PutUint32(buf, uint32(index))
+
+	batch := new(leveldb.Batch)
+	batch.Delete(toBlockKey(oldHash, folder, file))
+	batch.Put(toBlockKey(newHash, folder, file), buf)
+	return f.db.Write(batch, nil)
+}
+
 // m.blockKey returns a byte slice encoding the following information:
 //	   keyTypeBlock (1 byte)
 //	   folder (64 bytes)
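
The new Fix method is exercised by the copier routine in internal/model/puller.go below: when a block read from disk no longer hashes to the value the block map advertised, the stale entry is replaced with one keyed by the hash actually found. A minimal sketch of how Iterate and Fix pair up, assuming finder, block, buf and the logger l from the surrounding copier code (they are not defined here, and the real code uses a streaming hasher rather than Sum256):

	found := finder.Iterate(block.Hash, func(folder, file string, index uint32) bool {
		// buf has just been filled by reading the advertised block of file;
		// verify it before trusting the map.
		actual := sha256.Sum256(buf)
		if !bytes.Equal(actual[:], block.Hash) {
			// Stale entry: repoint the map at what is really on disk, then
			// report failure so the block gets pulled from the network instead.
			if err := finder.Fix(folder, file, index, block.Hash, actual[:]); err != nil {
				l.Warnln("finder fix:", err)
			}
			return false
		}
		return true // verified, safe to copy locally
	})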

+ 31 - 0
internal/files/blockmap_test.go

@@ -235,3 +235,34 @@ func TestBlockFinderLookup(t *testing.T) {
 
 	f1.Flags = 0
 }
+
+func TestBlockFinderFix(t *testing.T) {
+	db, f := setup()
+
+	iterFn := func(folder, file string, index uint32) bool {
+		return true
+	}
+
+	m := NewBlockMap(db, "folder1")
+	err := m.Add([]protocol.FileInfo{f1})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !f.Iterate(f1.Blocks[0].Hash, iterFn) {
+		t.Fatal("Block not found")
+	}
+
+	err = f.Fix("folder1", f1.Name, 0, f1.Blocks[0].Hash, f2.Blocks[0].Hash)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if f.Iterate(f1.Blocks[0].Hash, iterFn) {
+		t.Fatal("Unexpected block")
+	}
+
+	if !f.Iterate(f2.Blocks[0].Hash, iterFn) {
+		t.Fatal("Block not found")
+	}
+}

+ 32 - 7
internal/model/puller.go

@@ -17,6 +17,7 @@ package model
 
 import (
 	"bytes"
+	"crypto/sha256"
 	"errors"
 	"fmt"
 	"os"
@@ -141,9 +142,16 @@ loop:
 			}
 			p.model.setState(p.folder, FolderSyncing)
 			tries := 0
+			checksum := false
 			for {
 				tries++
-				changed := p.pullerIteration(copiersPerFolder, pullersPerFolder, finishersPerFolder)
+				// Last resort mode, to get around corrupt/invalid block maps.
+				if tries == 10 {
+					l.Infoln("Desperation mode ON")
+					checksum = true
+				}
+
+				changed := p.pullerIteration(copiersPerFolder, pullersPerFolder, finishersPerFolder, checksum)
 				if debug {
 					l.Debugln(p, "changed", changed)
 				}
@@ -234,7 +242,7 @@ func (p *Puller) String() string {
 // finisher routines are used. It's seldom efficient to use more than one
 // copier routine, while multiple pullers are essential and multiple finishers
 // may be useful (they are primarily CPU bound due to hashing).
-func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
+func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int, checksum bool) int {
 	pullChan := make(chan pullBlockState)
 	copyChan := make(chan copyBlocksState)
 	finisherChan := make(chan *sharedPullerState)
@@ -247,7 +255,7 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
 		copyWg.Add(1)
 		go func() {
 			// copierRoutine finishes when copyChan is closed
-			p.copierRoutine(copyChan, pullChan, finisherChan)
+			p.copierRoutine(copyChan, pullChan, finisherChan, checksum)
 			copyWg.Done()
 		}()
 	}
@@ -549,7 +557,7 @@ func (p *Puller) shortcutFile(file protocol.FileInfo) {
 
 // copierRoutine reads copierStates until the in channel closes and performs
 // the relevant copies when possible, or passes it to the puller routine.
-func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
+func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState, checksum bool) {
 	buf := make([]byte, protocol.BlockSize)
 
 nextFile:
@@ -574,10 +582,10 @@ nextFile:
 			}
 		}()
 
+		hasher := sha256.New()
 		for _, block := range state.blocks {
 			buf = buf[:int(block.Size)]
-
-			success := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool {
+			found := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool {
 				path := filepath.Join(p.model.folderCfgs[folder].Path, file)
 
 				var fd *os.File
@@ -598,6 +606,23 @@ nextFile:
 					return false
 				}
 
+				// Only done in last-resort (checksum) mode, after repeated failed puller iterations
+				if checksum {
+					hasher.Write(buf)
+					hash := hasher.Sum(nil)
+					hasher.Reset()
+					if !bytes.Equal(hash, block.Hash) {
+						if debug {
+							l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash)
+						}
+						err = p.model.finder.Fix(folder, file, index, block.Hash, hash)
+						if err != nil {
+							l.Warnln("finder fix:", err)
+						}
+						return false
+					}
+				}
+
 				_, err = dstFd.WriteAt(buf, block.Offset)
 				if err != nil {
 					state.earlyClose("dst write", err)
@@ -612,7 +637,7 @@ nextFile:
 				break
 			}
 
-			if !success {
+			if !found {
 				state.pullStarted()
 				ps := pullBlockState{
 					sharedPullerState: state.sharedPullerState,

+ 68 - 1
internal/model/puller_test.go

@@ -210,7 +210,7 @@ func TestCopierFinder(t *testing.T) {
 	finisherChan := make(chan *sharedPullerState, 1)
 
 	// Run a single fetcher routine
-	go p.copierRoutine(copyChan, pullChan, finisherChan)
+	go p.copierRoutine(copyChan, pullChan, finisherChan, false)
 
 	p.handleFile(requiredFile, copyChan, finisherChan)
 
@@ -305,3 +305,70 @@ func TestCopierCleanup(t *testing.T) {
 		t.Error("Expected block not found")
 	}
 }
+
+// On the 10th puller iteration we start hashing the content we receive by
+// following the block finder's instructions. Make sure that the copier
+// routine hashes the content when asked, and pulls the block when the
+// verification fails.
+func TestLastResortPulling(t *testing.T) {
+	fcfg := config.FolderConfiguration{ID: "default", Path: "testdata"}
+	cfg := config.Configuration{Folders: []config.FolderConfiguration{fcfg}}
+
+	db, _ := leveldb.Open(storage.NewMemStorage(), nil)
+	m := NewModel(config.Wrap("/tmp/test", cfg), "device", "syncthing", "dev", db)
+	m.AddFolder(fcfg)
+
+	// Add a file to index (with the incorrect block representation, as content
+	// doesn't actually match the block list)
+	file := protocol.FileInfo{
+		Name:     "empty",
+		Flags:    0,
+		Modified: 0,
+		Blocks:   []protocol.BlockInfo{blocks[0]},
+	}
+	m.updateLocal("default", file)
+
+	// Pretend that we are handling a new file with the same content but a
+	// different name (so the copier will try to reuse that particular block)
+	file.Name = "newfile"
+
+	iterFn := func(folder, file string, index uint32) bool {
+		return true
+	}
+
+	// Check that that particular block is there
+	if !m.finder.Iterate(blocks[0].Hash, iterFn) {
+		t.Error("Expected block not found")
+	}
+
+	p := Puller{
+		folder: "default",
+		dir:    "testdata",
+		model:  m,
+	}
+
+	copyChan := make(chan copyBlocksState)
+	pullChan := make(chan pullBlockState, 1)
+	finisherChan := make(chan *sharedPullerState, 1)
+
+	// Run a single copier routine with checksumming enabled
+	go p.copierRoutine(copyChan, pullChan, finisherChan, true)
+
+	p.handleFile(file, copyChan, finisherChan)
+
+	// The copier should hash the empty file, realise that the data it read
+	// doesn't match the hash advertised by the block map, fix the entry, and
+	// ask to pull the block.
+	<-pullChan
+
+	// Verify that it did fix the incorrect hash.
+	if m.finder.Iterate(blocks[0].Hash, iterFn) {
+		t.Error("Found unexpected block")
+	}
+
+	if !m.finder.Iterate(scanner.SHA256OfNothing, iterFn) {
+		t.Error("Expected block not found")
+	}
+
+	(<-finisherChan).fd.Close()
+	os.Remove(filepath.Join("testdata", defTempNamer.TempName("newfile")))
+}

+ 2 - 2
internal/scanner/blocks.go

@@ -24,7 +24,7 @@ import (
 	"github.com/syncthing/syncthing/internal/protocol"
 )
 
-var sha256OfNothing = []uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55}
+var SHA256OfNothing = []uint8{0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55}
 
 // Blocks returns the blockwise hash of the reader.
 func Blocks(r io.Reader, blocksize int, sizehint int64) ([]protocol.BlockInfo, error) {
@@ -61,7 +61,7 @@ func Blocks(r io.Reader, blocksize int, sizehint int64) ([]protocol.BlockInfo, e
 		blocks = append(blocks, protocol.BlockInfo{
 			Offset: 0,
 			Size:   0,
-			Hash:   sha256OfNothing,
+			Hash:   SHA256OfNothing,
 		})
 	}
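
SHA256OfNothing is exported so the puller test above can reference it; it is the SHA-256 digest of zero bytes of input, which is why the empty file's single block in TestLastResortPulling ends up mapped to it after the fix. A quick standalone check (illustrative, not part of this commit):

	package main

	import (
		"bytes"
		"crypto/sha256"
		"fmt"

		"github.com/syncthing/syncthing/internal/scanner"
	)

	func main() {
		empty := sha256.Sum256(nil) // digest of empty input: e3b0c442...52b855
		fmt.Println(bytes.Equal(empty[:], scanner.SHA256OfNothing)) // true
	}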