
Merge branch 'pr/830'

* pr/830:
  Delete files and directories after pulling
  Add fetcher tests
  Track total block counts, count copier blocks
  Fix tests
  Implement block fetcher (fixes #781, fixes #3)
  Populate BlockMap
  Implement BlockMap
Jakob Borg, 11 years ago
commit 15b875b116
+ 201 - 0
internal/files/blockmap.go

@@ -0,0 +1,201 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+//
+// This program is free software: you can redistribute it and/or modify it
+// under the terms of the GNU General Public License as published by the Free
+// Software Foundation, either version 3 of the License, or (at your option)
+// any later version.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+// more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program. If not, see <http://www.gnu.org/licenses/>.
+
+// Package files provides a set type to track local/remote files with newness
+// checks. We must do a certain amount of normalization in here. We will get
+// fed paths with either native or wire-format separators and encodings
+// depending on who calls us. We transform paths to wire-format (NFC and
+// slashes) on the way to the database, and transform to native format
+// (varying separator and encoding) on the way back out.
+
+package files
+
+import (
+	"bytes"
+	"encoding/binary"
+	"sort"
+	"sync"
+
+	"github.com/syncthing/syncthing/internal/config"
+	"github.com/syncthing/syncthing/internal/protocol"
+
+	"github.com/syndtr/goleveldb/leveldb"
+	"github.com/syndtr/goleveldb/leveldb/util"
+)
+
+var blockFinder *BlockFinder
+
+type BlockMap struct {
+	db     *leveldb.DB
+	folder string
+}
+
+func NewBlockMap(db *leveldb.DB, folder string) *BlockMap {
+	return &BlockMap{
+		db:     db,
+		folder: folder,
+	}
+}
+
+// Add files to the block map, ignoring any deleted or invalid files.
+func (m *BlockMap) Add(files []protocol.FileInfo) error {
+	batch := new(leveldb.Batch)
+	buf := make([]byte, 4)
+	for _, file := range files {
+		if file.IsDirectory() || file.IsDeleted() || file.IsInvalid() {
+			continue
+		}
+
+		for i, block := range file.Blocks {
+			binary.BigEndian.PutUint32(buf, uint32(i))
+			batch.Put(m.blockKey(block.Hash, file.Name), buf)
+		}
+	}
+	return m.db.Write(batch, nil)
+}
+
+// Update block map state, removing any deleted or invalid files.
+func (m *BlockMap) Update(files []protocol.FileInfo) error {
+	batch := new(leveldb.Batch)
+	buf := make([]byte, 4)
+	for _, file := range files {
+		if file.IsDirectory() {
+			continue
+		}
+
+		if file.IsDeleted() || file.IsInvalid() {
+			for _, block := range file.Blocks {
+				batch.Delete(m.blockKey(block.Hash, file.Name))
+			}
+			continue
+		}
+
+		for i, block := range file.Blocks {
+			binary.BigEndian.PutUint32(buf, uint32(i))
+			batch.Put(m.blockKey(block.Hash, file.Name), buf)
+		}
+	}
+	return m.db.Write(batch, nil)
+}
+
+// Drop block map, removing all entries related to this block map from the db.
+func (m *BlockMap) Drop() error {
+	batch := new(leveldb.Batch)
+	iter := m.db.NewIterator(util.BytesPrefix(m.blockKey(nil, "")[:1+64]), nil)
+	defer iter.Release()
+	for iter.Next() {
+		batch.Delete(iter.Key())
+	}
+	if iter.Error() != nil {
+		return iter.Error()
+	}
+	return m.db.Write(batch, nil)
+}
+
+func (m *BlockMap) blockKey(hash []byte, file string) []byte {
+	return toBlockKey(hash, m.folder, file)
+}
+
+type BlockFinder struct {
+	db      *leveldb.DB
+	folders []string
+	mut     sync.RWMutex
+}
+
+func NewBlockFinder(db *leveldb.DB, cfg *config.ConfigWrapper) *BlockFinder {
+	if blockFinder != nil {
+		return blockFinder
+	}
+
+	f := &BlockFinder{
+		db: db,
+	}
+	f.Changed(cfg.Raw())
+	cfg.Subscribe(f)
+	return f
+}
+
+// Implements config.Handler interface
+func (f *BlockFinder) Changed(cfg config.Configuration) error {
+	folders := make([]string, len(cfg.Folders))
+	for i, folder := range cfg.Folders {
+		folders[i] = folder.ID
+	}
+
+	sort.Strings(folders)
+
+	f.mut.Lock()
+	f.folders = folders
+	f.mut.Unlock()
+
+	return nil
+}
+
+// Iterate calls the given iterator function for each block matching the
+// given hash. The iterator function returns true to stop iterating (it is
+// satisfied with the block) or false to continue. Iterate itself returns
+// whether a satisfying block was eventually found.
+func (f *BlockFinder) Iterate(hash []byte, iterFn func(string, string, uint32) bool) bool {
+	f.mut.RLock()
+	folders := f.folders
+	f.mut.RUnlock()
+	for _, folder := range folders {
+		key := toBlockKey(hash, folder, "")
+		iter := f.db.NewIterator(util.BytesPrefix(key), nil)
+		defer iter.Release()
+
+		for iter.Next() && iter.Error() == nil {
+			folder, file := fromBlockKey(iter.Key())
+			index := binary.BigEndian.Uint32(iter.Value())
+			if iterFn(folder, nativeFilename(file), index) {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// toBlockKey returns a byte slice encoding the following information:
+//	   keyTypeBlock (1 byte)
+//	   folder (64 bytes)
+//	   block hash (64 bytes)
+//	   file name (variable size)
+func toBlockKey(hash []byte, folder, file string) []byte {
+	o := make([]byte, 1+64+64+len(file))
+	o[0] = keyTypeBlock
+	copy(o[1:], []byte(folder))
+	copy(o[1+64:], []byte(hash))
+	copy(o[1+64+64:], []byte(file))
+	return o
+}
+
+func fromBlockKey(data []byte) (string, string) {
+	if len(data) < 1+64+64+1 {
+		panic("Incorrect key length")
+	}
+	if data[0] != keyTypeBlock {
+		panic("Incorrect key type")
+	}
+
+	file := string(data[1+64+64:])
+
+	slice := data[1 : 1+64]
+	izero := bytes.IndexByte(slice, 0)
+	if izero > -1 {
+		return string(slice[:izero]), file
+	}
+	return string(slice), file
+}

+ 235 - 0
internal/files/blockmap_test.go

@@ -0,0 +1,235 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+//
+// This program is free software: you can redistribute it and/or modify it
+// under the terms of the GNU General Public License as published by the Free
+// Software Foundation, either version 3 of the License, or (at your option)
+// any later version.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+// more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package files
+
+import (
+	"testing"
+
+	"github.com/syncthing/syncthing/internal/config"
+	"github.com/syncthing/syncthing/internal/protocol"
+
+	"github.com/syndtr/goleveldb/leveldb"
+	"github.com/syndtr/goleveldb/leveldb/storage"
+)
+
+func genBlocks(n int) []protocol.BlockInfo {
+	b := make([]protocol.BlockInfo, n)
+	for i := range b {
+		h := make([]byte, 32)
+		for j := range h {
+			h[j] = byte(i + j)
+		}
+		b[i].Size = uint32(i)
+		b[i].Hash = h
+	}
+	return b
+}
+
+var f1, f2, f3 protocol.FileInfo
+
+func init() {
+	blocks := genBlocks(30)
+
+	f1 = protocol.FileInfo{
+		Name:   "f1",
+		Blocks: blocks[:10],
+	}
+
+	f2 = protocol.FileInfo{
+		Name:   "f2",
+		Blocks: blocks[10:20],
+	}
+
+	f3 = protocol.FileInfo{
+		Name:   "f3",
+		Blocks: blocks[20:],
+	}
+}
+
+func setup() (*leveldb.DB, *BlockFinder) {
+	// Setup
+
+	db, err := leveldb.Open(storage.NewMemStorage(), nil)
+	if err != nil {
+		panic(err)
+	}
+
+	wrapper := config.Wrap("", config.Configuration{})
+	wrapper.SetFolder(config.FolderConfiguration{
+		ID: "folder1",
+	})
+	wrapper.SetFolder(config.FolderConfiguration{
+		ID: "folder2",
+	})
+
+	return db, NewBlockFinder(db, wrapper)
+}
+
+func dbEmpty(db *leveldb.DB) bool {
+	iter := db.NewIterator(nil, nil)
+	defer iter.Release()
+	if iter.Next() {
+		return false
+	}
+	return true
+}
+
+func TestBlockMapAddUpdateWipe(t *testing.T) {
+	db, f := setup()
+
+	if !dbEmpty(db) {
+		t.Fatal("db not empty")
+	}
+
+	m := NewBlockMap(db, "folder1")
+
+	f3.Flags |= protocol.FlagDirectory
+
+	err := m.Add([]protocol.FileInfo{f1, f2, f3})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		if folder != "folder1" || file != "f1" || index != 0 {
+			t.Fatal("Mismatch")
+		}
+		return true
+	})
+
+	f.Iterate(f2.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		if folder != "folder1" || file != "f2" || index != 0 {
+			t.Fatal("Mismatch")
+		}
+		return true
+	})
+
+	f.Iterate(f3.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		t.Fatal("Unexpected block")
+		return true
+	})
+
+	f3.Flags = f1.Flags
+	f1.Flags |= protocol.FlagDeleted
+	f2.Flags |= protocol.FlagInvalid
+
+	// Should remove
+	err = m.Update([]protocol.FileInfo{f1, f2, f3})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		t.Fatal("Unexpected block")
+		return false
+	})
+
+	f.Iterate(f2.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		t.Fatal("Unexpected block")
+		return false
+	})
+
+	f.Iterate(f3.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		if folder != "folder1" || file != "f3" || index != 0 {
+			t.Fatal("Mismatch")
+		}
+		return true
+	})
+
+	err = m.Drop()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !dbEmpty(db) {
+		t.Fatal("db not empty")
+	}
+
+	// Should not add
+	err = m.Add([]protocol.FileInfo{f1, f2})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !dbEmpty(db) {
+		t.Fatal("db not empty")
+	}
+
+	f1.Flags = 0
+	f2.Flags = 0
+	f3.Flags = 0
+}
+
+func TestBlockMapFinderLookup(t *testing.T) {
+	db, f := setup()
+
+	m1 := NewBlockMap(db, "folder1")
+	m2 := NewBlockMap(db, "folder2")
+
+	err := m1.Add([]protocol.FileInfo{f1})
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = m2.Add([]protocol.FileInfo{f1})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	counter := 0
+	f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		counter++
+		switch counter {
+		case 1:
+			if folder != "folder1" || file != "f1" || index != 0 {
+				t.Fatal("Mismatch")
+			}
+		case 2:
+			if folder != "folder2" || file != "f1" || index != 0 {
+				t.Fatal("Mismatch")
+			}
+		default:
+			t.Fatal("Unexpected block")
+		}
+		return false
+	})
+	if counter != 2 {
+		t.Fatal("Incorrect count", counter)
+	}
+
+	f1.Flags |= protocol.FlagDeleted
+
+	err = m1.Update([]protocol.FileInfo{f1})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	counter = 0
+	f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		counter++
+		switch counter {
+		case 1:
+			if folder != "folder2" || file != "f1" || index != 0 {
+				t.Fatal("Mismatch")
+			}
+		default:
+			t.Fatal("Unexpected block")
+		}
+		return false
+	})
+	if counter != 1 {
+		t.Fatal("Incorrect count")
+	}
+}

+ 1 - 0
internal/files/leveldb.go

@@ -49,6 +49,7 @@ func clock(v uint64) uint64 {
 const (
 	keyTypeDevice = iota
 	keyTypeGlobal
+	keyTypeBlock
 )
 
 type fileVersion struct {

+ 18 - 0
internal/files/set.go

@@ -42,6 +42,7 @@ type Set struct {
 	mutex        sync.Mutex
 	folder       string
 	db           *leveldb.DB
+	blockmap     *BlockMap
 }
 
 func NewSet(folder string, db *leveldb.DB) *Set {
@@ -49,6 +50,7 @@ func NewSet(folder string, db *leveldb.DB) *Set {
 		localVersion: make(map[protocol.DeviceID]uint64),
 		folder:       folder,
 		db:           db,
+		blockmap:     NewBlockMap(db, folder),
 	}
 
 	var deviceID protocol.DeviceID
@@ -80,6 +82,10 @@ func (s *Set) Replace(device protocol.DeviceID, fs []protocol.FileInfo) {
 		// Reset the local version if all files were removed.
 		s.localVersion[device] = 0
 	}
+	if device == protocol.LocalDeviceID {
+		s.blockmap.Drop()
+		s.blockmap.Add(fs)
+	}
 }
 
 func (s *Set) ReplaceWithDelete(device protocol.DeviceID, fs []protocol.FileInfo) {
@@ -92,6 +98,10 @@ func (s *Set) ReplaceWithDelete(device protocol.DeviceID, fs []protocol.FileInfo
 	if lv := ldbReplaceWithDelete(s.db, []byte(s.folder), device[:], fs); lv > s.localVersion[device] {
 		s.localVersion[device] = lv
 	}
+	if device == protocol.LocalDeviceID {
+		s.blockmap.Drop()
+		s.blockmap.Add(fs)
+	}
 }
 
 func (s *Set) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
@@ -104,6 +114,9 @@ func (s *Set) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
 	if lv := ldbUpdate(s.db, []byte(s.folder), device[:], fs); lv > s.localVersion[device] {
 		s.localVersion[device] = lv
 	}
+	if device == protocol.LocalDeviceID {
+		s.blockmap.Update(fs)
+	}
 }
 
 func (s *Set) WithNeed(device protocol.DeviceID, fn fileIterator) {
@@ -179,6 +192,11 @@ func ListFolders(db *leveldb.DB) []string {
 // database.
 func DropFolder(db *leveldb.DB, folder string) {
 	ldbDropFolder(db, []byte(folder))
+	bm := &BlockMap{
+		db:     db,
+		folder: folder,
+	}
+	bm.Drop()
 }
 
 func normalizeFilenames(fs []protocol.FileInfo) {

+ 4 - 2
internal/model/model.go

@@ -81,8 +81,9 @@ type service interface {
 }
 
 type Model struct {
-	cfg *config.ConfigWrapper
-	db  *leveldb.DB
+	cfg    *config.ConfigWrapper
+	db     *leveldb.DB
+	finder *files.BlockFinder
 
 	deviceName    string
 	clientName    string
@@ -137,6 +138,7 @@ func NewModel(cfg *config.ConfigWrapper, deviceName, clientName, clientVersion s
 		protoConn:          make(map[protocol.DeviceID]protocol.Connection),
 		rawConn:            make(map[protocol.DeviceID]io.Closer),
 		deviceVer:          make(map[protocol.DeviceID]string),
+		finder:             files.NewBlockFinder(db, cfg),
 	}
 
 	var timeout = 20 * 60 // seconds

+ 6 - 2
internal/model/model_test.go

@@ -390,8 +390,12 @@ func TestIgnores(t *testing.T) {
 	}
 
 	db, _ := leveldb.Open(storage.NewMemStorage(), nil)
-	m := NewModel(config.Wrap("", config.Configuration{}), "device", "syncthing", "dev", db)
-	m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"})
+	fcfg := config.FolderConfiguration{ID: "default", Path: "testdata"}
+	cfg := config.Wrap("/tmp", config.Configuration{
+		Folders: []config.FolderConfiguration{fcfg},
+	})
+	m := NewModel(cfg, "device", "syncthing", "dev", db)
+	m.AddFolder(fcfg)
 
 	expected := []string{
 		".*",

+ 118 - 96
internal/model/puller.go

@@ -16,6 +16,7 @@
 package model
 
 import (
+	"bytes"
 	"errors"
 	"fmt"
 	"os"
@@ -23,6 +24,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/AudriusButkevicius/lfu-go"
+
 	"github.com/syncthing/syncthing/internal/config"
 	"github.com/syncthing/syncthing/internal/events"
 	"github.com/syncthing/syncthing/internal/osutil"
@@ -50,7 +53,7 @@ type pullBlockState struct {
 }
 
 // A copyBlocksState is passed to copy routine if the file has blocks to be
-// copied from the original.
+// copied.
 type copyBlocksState struct {
 	*sharedPullerState
 	blocks []protocol.BlockInfo
@@ -236,24 +239,25 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
 	copyChan := make(chan copyBlocksState)
 	finisherChan := make(chan *sharedPullerState)
 
-	var wg sync.WaitGroup
+	var copyWg sync.WaitGroup
+	var pullWg sync.WaitGroup
 	var doneWg sync.WaitGroup
 
 	for i := 0; i < ncopiers; i++ {
-		wg.Add(1)
+		copyWg.Add(1)
 		go func() {
 			// copierRoutine finishes when copyChan is closed
-			p.copierRoutine(copyChan, finisherChan)
-			wg.Done()
+			p.copierRoutine(copyChan, pullChan, finisherChan)
+			copyWg.Done()
 		}()
 	}
 
 	for i := 0; i < npullers; i++ {
-		wg.Add(1)
+		pullWg.Add(1)
 		go func() {
 			// pullerRoutine finishes when pullChan is closed
 			p.pullerRoutine(pullChan, finisherChan)
-			wg.Done()
+			pullWg.Done()
 		}()
 	}
 
@@ -277,6 +281,9 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
 	// !!!
 
 	changed := 0
+
+	var deletions []protocol.FileInfo
+
 	files.WithNeed(protocol.LocalDeviceID, func(intf protocol.FileIntf) bool {
 
 		// Needed items are delivered sorted lexicographically. This isn't
@@ -298,19 +305,16 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
 		}
 
 		switch {
-		case protocol.IsDirectory(file.Flags) && protocol.IsDeleted(file.Flags):
-			// A deleted directory
-			p.deleteDir(file)
+		case protocol.IsDeleted(file.Flags):
+			// A deleted file or directory
+			deletions = append(deletions, file)
 		case protocol.IsDirectory(file.Flags):
 			// A new or changed directory
 			p.handleDir(file)
-		case protocol.IsDeleted(file.Flags):
-			// A deleted file
-			p.deleteFile(file)
 		default:
 			// A new or changed file. This is the only case where we do stuff
 			// in the background; the other three are done synchronously.
-			p.handleFile(file, copyChan, pullChan, finisherChan)
+			p.handleFile(file, copyChan, finisherChan)
 		}
 
 		changed++
@@ -318,18 +322,27 @@ func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
 	})
 
 	// Signal copy and puller routines that we are done with the in data for
-	// this iteration
+	// this iteration. Wait for them to finish.
 	close(copyChan)
+	copyWg.Wait()
 	close(pullChan)
+	pullWg.Wait()
 
-	// Wait for them to finish, then signal the finisher chan that there will
-	// be no more input.
-	wg.Wait()
+	// Signal the finisher chan that there will be no more input.
 	close(finisherChan)
 
 	// Wait for the finisherChan to finish.
 	doneWg.Wait()
 
+	for i := range deletions {
+		deletion := deletions[len(deletions)-i-1]
+		if deletion.IsDirectory() {
+			p.deleteDir(deletion)
+		} else {
+			p.deleteFile(deletion)
+		}
+	}
+
 	return changed
 }
 
@@ -419,11 +432,15 @@ func (p *Puller) deleteFile(file protocol.FileInfo) {
 
 // handleFile queues the copies and pulls as necessary for a single new or
 // changed file.
-func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, pullChan chan<- pullBlockState, finisherChan chan<- *sharedPullerState) {
+func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
 	curFile := p.model.CurrentFolderFile(p.folder, file.Name)
-	copyBlocks, pullBlocks := scanner.BlockDiff(curFile.Blocks, file.Blocks)
 
-	if len(copyBlocks) == len(curFile.Blocks) && len(pullBlocks) == 0 {
+	if len(curFile.Blocks) == len(file.Blocks) {
+		for i := range file.Blocks {
+			if !bytes.Equal(curFile.Blocks[i].Hash, file.Blocks[i].Hash) {
+				goto FilesAreDifferent
+			}
+		}
 		// We are supposed to copy the entire file, and then fetch nothing. We
 		// are only updating metadata, so we don't actually *need* to make the
 		// copy.
@@ -434,11 +451,14 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
 		return
 	}
 
+FilesAreDifferent:
+
 	// Figure out the absolute filenames we need once and for all
 	tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name))
 	realName := filepath.Join(p.dir, file.Name)
 
-	var reuse bool
+	reused := 0
+	var blocks []protocol.BlockInfo
 
 	// Check for an old temporary file which might have some blocks we could
 	// reuse.
@@ -453,38 +473,25 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
 			existingBlocks[block.String()] = true
 		}
 
-		// Since the blocks are already there, we don't need to copy them
-		// nor we need to pull them, hence discard blocks which are already
-		// there, if they are exactly the same...
-		var newCopyBlocks []protocol.BlockInfo
-		for _, block := range copyBlocks {
+		// Since the blocks are already there, we don't need to get them.
+		for _, block := range file.Blocks {
 			_, ok := existingBlocks[block.String()]
 			if !ok {
-				newCopyBlocks = append(newCopyBlocks, block)
+				blocks = append(blocks, block)
 			}
 		}
 
-		var newPullBlocks []protocol.BlockInfo
-		for _, block := range pullBlocks {
-			_, ok := existingBlocks[block.String()]
-			if !ok {
-				newPullBlocks = append(newPullBlocks, block)
-			}
-		}
-
-		// If any blocks could be reused, let the sharedpullerstate know
-		// which flags it is expected to set on the file.
-		// Also update the list of work for the routines.
-		if len(copyBlocks) != len(newCopyBlocks) || len(pullBlocks) != len(newPullBlocks) {
-			reuse = true
-			copyBlocks = newCopyBlocks
-			pullBlocks = newPullBlocks
-		} else {
+		// The sharedpullerstate will know which flags to use when opening the
+		// temp file depending on whether we are reusing any blocks or not.
+		reused = len(file.Blocks) - len(blocks)
+		if reused == 0 {
 			// Otherwise, discard the file ourselves in order for the
 			// sharedpuller not to panic when it fails to exlusively create a
 			// file which already exists
 			os.Remove(tempName)
 		}
+	} else {
+		blocks = file.Blocks
 	}
 
 	s := sharedPullerState{
@@ -492,43 +499,20 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
 		folder:     p.folder,
 		tempName:   tempName,
 		realName:   realName,
-		pullNeeded: len(pullBlocks),
-		reuse:      reuse,
-	}
-	if len(copyBlocks) > 0 {
-		s.copyNeeded = 1
+		copyTotal:  len(blocks),
+		copyNeeded: len(blocks),
+		reused:     reused,
 	}
 
 	if debug {
-		l.Debugf("%v need file %s; copy %d, pull %d, reuse %v", p, file.Name, len(copyBlocks), len(pullBlocks), reuse)
-	}
-
-	if len(copyBlocks) > 0 {
-		cs := copyBlocksState{
-			sharedPullerState: &s,
-			blocks:            copyBlocks,
-		}
-		copyChan <- cs
-	}
-
-	if len(pullBlocks) > 0 {
-		for _, block := range pullBlocks {
-			ps := pullBlockState{
-				sharedPullerState: &s,
-				block:             block,
-			}
-			pullChan <- ps
-		}
+		l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused)
 	}
 
-	if len(pullBlocks) == 0 && len(copyBlocks) == 0 {
-		if !reuse {
-			panic("bug: nothing to do with file?")
-		}
-		// We have a temp file that we can reuse totally. Jump directly to the
-		// finisher stage.
-		finisherChan <- &s
+	cs := copyBlocksState{
+		sharedPullerState: &s,
+		blocks:            blocks,
 	}
+	copyChan <- cs
 }
 
 // shortcutFile sets file mode and modification time, when that's the only
@@ -561,9 +545,9 @@ func (p *Puller) shortcutFile(file protocol.FileInfo) {
 	p.model.updateLocal(p.folder, file)
 }
 
-// copierRoutine reads pullerStates until the in channel closes and performs
-// the relevant copy.
-func (p *Puller) copierRoutine(in <-chan copyBlocksState, out chan<- *sharedPullerState) {
+// copierRoutine reads copyBlocksStates until the in channel closes, performs
+// the copies it can from local files, and hands the remaining blocks to the
+// puller routine.
+func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
 	buf := make([]byte, protocol.BlockSize)
 
 nextFile:
@@ -575,32 +559,70 @@ nextFile:
 			continue nextFile
 		}
 
-		srcFd, err := state.sourceFile()
-		if err != nil {
-			// As above
-			continue nextFile
-		}
+		evictionChan := make(chan lfu.Eviction)
+
+		fdCache := lfu.New()
+		fdCache.UpperBound = 50
+		fdCache.LowerBound = 20
+		fdCache.EvictionChannel = evictionChan
+
+		go func() {
+			for item := range evictionChan {
+				item.Value.(*os.File).Close()
+			}
+		}()
 
 		for _, block := range state.blocks {
 			buf = buf[:int(block.Size)]
 
-			_, err = srcFd.ReadAt(buf, block.Offset)
-			if err != nil {
-				state.earlyClose("src read", err)
-				srcFd.Close()
-				continue nextFile
+			success := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool {
+				path := filepath.Join(p.model.folderCfgs[folder].Path, file)
+
+				var fd *os.File
+
+				fdi := fdCache.Get(path)
+				if fdi != nil {
+					fd = fdi.(*os.File)
+				} else {
+					fd, err = os.Open(path)
+					if err != nil {
+						return false
+					}
+					fdCache.Set(path, fd)
+				}
+
+				_, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
+				if err != nil {
+					return false
+				}
+
+				_, err = dstFd.WriteAt(buf, block.Offset)
+				if err != nil {
+					state.earlyClose("dst write", err)
+				}
+				if file == state.file.Name {
+					state.copiedFromOrigin()
+				}
+				return true
+			})
+
+			if state.failed() != nil {
+				break
 			}
 
-			_, err = dstFd.WriteAt(buf, block.Offset)
-			if err != nil {
-				state.earlyClose("dst write", err)
-				srcFd.Close()
-				continue nextFile
+			if !success {
+				state.pullStarted()
+				ps := pullBlockState{
+					sharedPullerState: state.sharedPullerState,
+					block:             block,
+				}
+				pullChan <- ps
+			} else {
+				state.copyDone()
 			}
 		}
-
-		srcFd.Close()
-		state.copyDone()
+		fdCache.Evict(fdCache.Len())
+		close(evictionChan)
 		out <- state.sharedPullerState
 	}
 }
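
The reworked copier above bounds the number of open source files with an LFU cache and closes evicted descriptors on a side goroutine. Below is a minimal sketch of just that caching pattern, using only the lfu-go calls that appear in the diff (New, Get, Set, Evict, Len, the UpperBound/LowerBound fields and EvictionChannel); the bounds match copierRoutine, while the path read here is purely illustrative:

package main

import (
	"fmt"
	"os"

	"github.com/AudriusButkevicius/lfu-go"
)

func main() {
	evictions := make(chan lfu.Eviction)

	// Same bounds as copierRoutine: keep roughly 20-50 source fds open.
	fdCache := lfu.New()
	fdCache.UpperBound = 50
	fdCache.LowerBound = 20
	fdCache.EvictionChannel = evictions

	// Close descriptors as the cache evicts them.
	go func() {
		for e := range evictions {
			e.Value.(*os.File).Close()
		}
	}()

	// open returns a cached descriptor when one exists, otherwise opens the
	// file and caches it.
	open := func(path string) (*os.File, error) {
		if v := fdCache.Get(path); v != nil {
			return v.(*os.File), nil
		}
		fd, err := os.Open(path)
		if err != nil {
			return nil, err
		}
		fdCache.Set(path, fd)
		return fd, nil
	}

	if fd, err := open("/etc/hostname"); err == nil {
		buf := make([]byte, 16)
		n, _ := fd.ReadAt(buf, 0)
		fmt.Printf("read %d bytes via cached fd\n", n)
	}

	// Flush everything and let the closer goroutine exit.
	fdCache.Evict(fdCache.Len())
	close(evictions)
}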

+ 116 - 37
internal/model/puller_test.go

@@ -16,10 +16,13 @@
 package model
 
 import (
+	"os"
+	"path/filepath"
 	"testing"
 
 	"github.com/syncthing/syncthing/internal/config"
 	"github.com/syncthing/syncthing/internal/protocol"
+	"github.com/syncthing/syncthing/internal/scanner"
 
 	"github.com/syndtr/goleveldb/leveldb"
 	"github.com/syndtr/goleveldb/leveldb/storage"
@@ -47,7 +50,7 @@ func TestHandleFile(t *testing.T) {
 	// Copy: 2, 5, 8
 	// Pull: 1, 3, 4, 6, 7
 
-	// Create existing file, and update local index
+	// Create existing file
 	existingFile := protocol.FileInfo{
 		Name:     "filex",
 		Flags:    0,
@@ -65,6 +68,7 @@ func TestHandleFile(t *testing.T) {
 	db, _ := leveldb.Open(storage.NewMemStorage(), nil)
 	m := NewModel(config.Wrap("/tmp/test", config.Configuration{}), "device", "syncthing", "dev", db)
 	m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"})
+	// Update index
 	m.updateLocal("default", existingFile)
 
 	p := Puller{
@@ -73,34 +77,20 @@ func TestHandleFile(t *testing.T) {
 		model:  m,
 	}
 
-	copyChan := make(chan copyBlocksState, 1) // Copy chan gets all blocks needed to copy in a wrapper struct
-	pullChan := make(chan pullBlockState, 5)  // Pull chan gets blocks one by one
+	copyChan := make(chan copyBlocksState, 1)
 
-	p.handleFile(requiredFile, copyChan, pullChan, nil)
+	p.handleFile(requiredFile, copyChan, nil)
 
 	// Receive the results
 	toCopy := <-copyChan
-	toPull := []pullBlockState{<-pullChan, <-pullChan, <-pullChan, <-pullChan, <-pullChan}
 
-	select {
-	case <-pullChan:
-		t.Error("Channel not empty!")
-	default:
-	}
-
-	if len(toCopy.blocks) != 3 {
-		t.Errorf("Unexpected count of copy blocks: %d != 3", len(toCopy.blocks))
+	if len(toCopy.blocks) != 8 {
+		t.Errorf("Unexpected count of copy blocks: %d != 8", len(toCopy.blocks))
 	}
 
-	for i, eq := range []int{2, 5, 8} {
-		if string(toCopy.blocks[i].Hash) != string(blocks[eq].Hash) {
-			t.Errorf("Block mismatch: %s != %s", toCopy.blocks[i].String(), blocks[eq].String())
-		}
-	}
-
-	for i, eq := range []int{1, 3, 4, 6, 7} {
-		if string(toPull[i].block.Hash) != string(blocks[eq].Hash) {
-			t.Errorf("Block mismatch: %s != %s", toPull[i].block.String(), blocks[eq].String())
+	for i, block := range toCopy.blocks {
+		if string(block.Hash) != string(blocks[i+1].Hash) {
+			t.Errorf("Block mismatch: %s != %s", block.String(), blocks[i+1].String())
 		}
 	}
 }
@@ -114,7 +104,7 @@ func TestHandleFileWithTemp(t *testing.T) {
 	// Copy: 5, 8
 	// Pull: 1, 6
 
-	// Create existing file, and update local index
+	// Create existing file
 	existingFile := protocol.FileInfo{
 		Name:     "file",
 		Flags:    0,
@@ -132,6 +122,7 @@ func TestHandleFileWithTemp(t *testing.T) {
 	db, _ := leveldb.Open(storage.NewMemStorage(), nil)
 	m := NewModel(config.Wrap("/tmp/test", config.Configuration{}), "device", "syncthing", "dev", db)
 	m.AddFolder(config.FolderConfiguration{ID: "default", Path: "testdata"})
+	// Update index
 	m.updateLocal("default", existingFile)
 
 	p := Puller{
@@ -140,34 +131,122 @@ func TestHandleFileWithTemp(t *testing.T) {
 		model:  m,
 	}
 
-	copyChan := make(chan copyBlocksState, 1) // Copy chan gets all blocks needed to copy in a wrapper struct
-	pullChan := make(chan pullBlockState, 2)  // Pull chan gets blocks one by one
+	copyChan := make(chan copyBlocksState, 1)
 
-	p.handleFile(requiredFile, copyChan, pullChan, nil)
+	p.handleFile(requiredFile, copyChan, nil)
 
 	// Receive the results
 	toCopy := <-copyChan
-	toPull := []pullBlockState{<-pullChan, <-pullChan}
+
+	if len(toCopy.blocks) != 4 {
+		t.Errorf("Unexpected count of copy blocks: %d != 4", len(toCopy.blocks))
+	}
+
+	for i, eq := range []int{1, 5, 6, 8} {
+		if string(toCopy.blocks[i].Hash) != string(blocks[eq].Hash) {
+			t.Errorf("Block mismatch: %s != %s", toCopy.blocks[i].String(), blocks[eq].String())
+		}
+	}
+}
+
+func TestCopierFinder(t *testing.T) {
+	// After diff between required and existing we should:
+	// Copy: 1, 2, 3, 4, 6, 7, 8
+	// Since there is no existing file, nor a temp file
+
+	// After dropping out blocks found locally:
+	// Pull: 1, 5, 6, 8
+
+	tempFile := filepath.Join("testdata", defTempNamer.TempName("file2"))
+	err := os.Remove(tempFile)
+	if err != nil && !os.IsNotExist(err) {
+		t.Error(err)
+	}
+
+	// Create existing file
+	existingFile := protocol.FileInfo{
+		Name:     defTempNamer.TempName("file"),
+		Flags:    0,
+		Modified: 0,
+		Blocks: []protocol.BlockInfo{
+			blocks[0], blocks[2], blocks[3], blocks[4],
+			blocks[0], blocks[0], blocks[7], blocks[0],
+		},
+	}
+
+	// Create target file
+	requiredFile := existingFile
+	requiredFile.Blocks = blocks[1:]
+	requiredFile.Name = "file2"
+
+	fcfg := config.FolderConfiguration{ID: "default", Path: "testdata"}
+	cfg := config.Configuration{Folders: []config.FolderConfiguration{fcfg}}
+
+	db, _ := leveldb.Open(storage.NewMemStorage(), nil)
+	m := NewModel(config.Wrap("/tmp/test", cfg), "device", "syncthing", "dev", db)
+	m.AddFolder(fcfg)
+	// Update index
+	m.updateLocal("default", existingFile)
+
+	iterFn := func(folder, file string, index uint32) bool {
+		return true
+	}
+
+	// Verify that the blocks we say exist in the file really exist in the db.
+	for _, idx := range []int{2, 3, 4, 7} {
+		if m.finder.Iterate(blocks[idx].Hash, iterFn) == false {
+			t.Error("Didn't find block")
+		}
+	}
+
+	p := Puller{
+		folder: "default",
+		dir:    "testdata",
+		model:  m,
+	}
+
+	copyChan := make(chan copyBlocksState)
+	pullChan := make(chan pullBlockState, 4)
+	finisherChan := make(chan *sharedPullerState, 1)
+
+	// Run a single fetcher routine
+	go p.copierRoutine(copyChan, pullChan, finisherChan)
+
+	p.handleFile(requiredFile, copyChan, finisherChan)
+
+	pulls := []pullBlockState{<-pullChan, <-pullChan, <-pullChan, <-pullChan}
+	finish := <-finisherChan
 
 	select {
 	case <-pullChan:
-		t.Error("Channel not empty!")
+		t.Fatal("Pull channel has data to be read")
+	case <-finisherChan:
+		t.Fatal("Finisher channel has data to be read")
 	default:
 	}
 
-	if len(toCopy.blocks) != 2 {
-		t.Errorf("Unexpected count of copy blocks: %d != 2", len(toCopy.blocks))
+	// Verify that the right blocks went into the pull list
+	for i, eq := range []int{1, 5, 6, 8} {
+		if string(pulls[i].block.Hash) != string(blocks[eq].Hash) {
+			t.Errorf("Block %d mismatch: %s != %s", eq, pulls[i].block.String(), blocks[eq].String())
+		}
+		if string(finish.file.Blocks[eq-1].Hash) != string(blocks[eq].Hash) {
+			t.Errorf("Block %d mismatch: %s != %s", eq, finish.file.Blocks[eq-1].String(), blocks[eq].String())
+		}
 	}
 
-	for i, eq := range []int{5, 8} {
-		if string(toCopy.blocks[i].Hash) != string(blocks[eq].Hash) {
-			t.Errorf("Block mismatch: %s != %s", toCopy.blocks[i].String(), blocks[eq].String())
-		}
+	// Verify that the fetched blocks have actually been written to the temp file
+	blks, err := scanner.HashFile(tempFile, protocol.BlockSize)
+	if err != nil {
+		t.Log(err)
 	}
 
-	for i, eq := range []int{1, 6} {
-		if string(toPull[i].block.Hash) != string(blocks[eq].Hash) {
-			t.Errorf("Block mismatch: %s != %s", toPull[i].block.String(), blocks[eq].String())
+	for _, eq := range []int{2, 3, 4, 7} {
+		if string(blks[eq-1].Hash) != string(blocks[eq].Hash) {
+			t.Errorf("Block %d mismatch: %s != %s", eq, blks[eq-1].String(), blocks[eq].String())
 		}
 	}
+	finish.fd.Close()
+
+	os.Remove(tempFile)
 }

+ 27 - 6
internal/model/sharedpullerstate.go

@@ -31,13 +31,16 @@ type sharedPullerState struct {
 	folder   string
 	tempName string
 	realName string
-	reuse    bool
+	reused   int // Number of blocks reused from temporary file
 
 	// Mutable, must be locked for access
 	err        error      // The first error we hit
 	fd         *os.File   // The fd of the temp file
-	copyNeeded int        // Number of copy actions we expect to happen
-	pullNeeded int        // Number of block pulls we expect to happen
+	copyTotal  int        // Total number of copy actions for the whole job
+	pullTotal  int        // Total number of pull actions for the whole job
+	copyNeeded int        // Number of copy actions still pending
+	pullNeeded int        // Number of block pulls still pending
+	copyOrigin int        // Number of blocks copied from the original file
 	closed     bool       // Set when the file has been closed
 	mut        sync.Mutex // Protects the above
 }
@@ -79,7 +82,7 @@ func (s *sharedPullerState) tempFile() (*os.File, error) {
 
 	// Attempt to create the temp file
 	flags := os.O_WRONLY
-	if !s.reuse {
+	if s.reused == 0 {
 		flags |= os.O_CREATE | os.O_EXCL
 	}
 	fd, err := os.OpenFile(s.tempName, flags, 0644)
@@ -149,7 +152,25 @@ func (s *sharedPullerState) copyDone() {
 	s.mut.Lock()
 	s.copyNeeded--
 	if debug {
-		l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.pullNeeded)
+		l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
+	}
+	s.mut.Unlock()
+}
+
+func (s *sharedPullerState) copiedFromOrigin() {
+	s.mut.Lock()
+	s.copyOrigin++
+	s.mut.Unlock()
+}
+
+func (s *sharedPullerState) pullStarted() {
+	s.mut.Lock()
+	s.copyTotal--
+	s.copyNeeded--
+	s.pullTotal++
+	s.pullNeeded++
+	if debug {
+		l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
 	}
 	s.mut.Unlock()
 }
@@ -158,7 +179,7 @@ func (s *sharedPullerState) pullDone() {
 	s.mut.Lock()
 	s.pullNeeded--
 	if debug {
-		l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded ->", s.pullNeeded)
+		l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
 	}
 	s.mut.Unlock()
 }

+ 4 - 0
internal/protocol/message.go

@@ -54,6 +54,10 @@ func (f FileInfo) IsInvalid() bool {
 	return IsInvalid(f.Flags)
 }
 
+func (f FileInfo) IsDirectory() bool {
+	return IsDirectory(f.Flags)
+}
+
 // Used for unmarshalling a FileInfo structure but skipping the actual block list
 type FileInfoTruncated struct {
 	Name         string // max:8192