Browse Source

Implement BlockMap

Audrius Butkevicius 11 năm trước cách đây
mục cha
commit
435f9113e8

+ 201 - 0
internal/files/blockmap.go

@@ -0,0 +1,201 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+//
+// This program is free software: you can redistribute it and/or modify it
+// under the terms of the GNU General Public License as published by the Free
+// Software Foundation, either version 3 of the License, or (at your option)
+// any later version.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+// more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program. If not, see <http://www.gnu.org/licenses/>.
+
+// Package files provides a set type to track local/remote files with newness
+// checks. We must do a certain amount of normalization in here. We will get
+// fed paths with either native or wire-format separators and encodings
+// depending on who calls us. We transform paths to wire-format (NFC and
+// slashes) on the way to the database, and transform to native format
+// (varying separator and encoding) on the way back out.
+
+package files
+
+import (
+	"bytes"
+	"encoding/binary"
+	"sort"
+	"sync"
+
+	"github.com/syncthing/syncthing/internal/config"
+	"github.com/syncthing/syncthing/internal/protocol"
+
+	"github.com/syndtr/goleveldb/leveldb"
+	"github.com/syndtr/goleveldb/leveldb/util"
+)
+
+var blockFinder *BlockFinder
+
+type BlockMap struct {
+	db     *leveldb.DB
+	folder string
+}
+
+func NewBlockMap(db *leveldb.DB, folder string) *BlockMap {
+	return &BlockMap{
+		db:     db,
+		folder: folder,
+	}
+}
+
+// Add files to the block map, ignoring any deleted or invalid files.
+func (m *BlockMap) Add(files []protocol.FileInfo) error {
+	batch := new(leveldb.Batch)
+	buf := make([]byte, 4)
+	for _, file := range files {
+		if file.IsDirectory() || file.IsDeleted() || file.IsInvalid() {
+			continue
+		}
+
+		for i, block := range file.Blocks {
+			binary.BigEndian.PutUint32(buf, uint32(i))
+			batch.Put(m.blockKey(block.Hash, file.Name), buf)
+		}
+	}
+	return m.db.Write(batch, nil)
+}
+
+// Update block map state, removing any deleted or invalid files.
+func (m *BlockMap) Update(files []protocol.FileInfo) error {
+	batch := new(leveldb.Batch)
+	buf := make([]byte, 4)
+	for _, file := range files {
+		if file.IsDirectory() {
+			continue
+		}
+
+		if file.IsDeleted() || file.IsInvalid() {
+			for _, block := range file.Blocks {
+				batch.Delete(m.blockKey(block.Hash, file.Name))
+			}
+			continue
+		}
+
+		for i, block := range file.Blocks {
+			binary.BigEndian.PutUint32(buf, uint32(i))
+			batch.Put(m.blockKey(block.Hash, file.Name), buf)
+		}
+	}
+	return m.db.Write(batch, nil)
+}
+
+// Drop block map, removing all entries related to this block map from the db.
+func (m *BlockMap) Drop() error {
+	batch := new(leveldb.Batch)
+	iter := m.db.NewIterator(util.BytesPrefix(m.blockKey(nil, "")[:1+64]), nil)
+	defer iter.Release()
+	for iter.Next() {
+		batch.Delete(iter.Key())
+	}
+	if iter.Error() != nil {
+		return iter.Error()
+	}
+	return m.db.Write(batch, nil)
+}
+
+func (m *BlockMap) blockKey(hash []byte, file string) []byte {
+	return toBlockKey(hash, m.folder, file)
+}
+
+type BlockFinder struct {
+	db      *leveldb.DB
+	folders []string
+	mut     sync.RWMutex
+}
+
+func NewBlockFinder(db *leveldb.DB, cfg *config.ConfigWrapper) *BlockFinder {
+	if blockFinder != nil {
+		return blockFinder
+	}
+
+	f := &BlockFinder{
+		db: db,
+	}
+	f.Changed(cfg.Raw())
+	cfg.Subscribe(f)
+	return f
+}
+
+// Implements config.Handler interface
+func (f *BlockFinder) Changed(cfg config.Configuration) error {
+	folders := make([]string, len(cfg.Folders))
+	for i, folder := range cfg.Folders {
+		folders[i] = folder.ID
+	}
+
+	sort.Strings(folders)
+
+	f.mut.Lock()
+	f.folders = folders
+	f.mut.Unlock()
+
+	return nil
+}
+
+// An iterator function which iterates over all matching blocks for the given
+// hash. The iterator function has to return either true (if they are happy with
+// the block) or false to continue iterating for whatever reason.
+// The iterator finally returns the result, whether or not a satisfying block
+// was eventually found.
+func (f *BlockFinder) Iterate(hash []byte, iterFn func(string, string, uint32) bool) bool {
+	f.mut.RLock()
+	folders := f.folders
+	f.mut.RUnlock()
+	for _, folder := range folders {
+		key := toBlockKey(hash, folder, "")
+		iter := f.db.NewIterator(util.BytesPrefix(key), nil)
+		defer iter.Release()
+
+		for iter.Next() && iter.Error() == nil {
+			folder, file := fromBlockKey(iter.Key())
+			index := binary.BigEndian.Uint32(iter.Value())
+			if iterFn(folder, nativeFilename(file), index) {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// m.blockKey returns a byte slice encoding the following information:
+//	   keyTypeBlock (1 byte)
+//	   folder (64 bytes)
+//	   block hash (64 bytes)
+//	   file name (variable size)
+func toBlockKey(hash []byte, folder, file string) []byte {
+	o := make([]byte, 1+64+64+len(file))
+	o[0] = keyTypeBlock
+	copy(o[1:], []byte(folder))
+	copy(o[1+64:], []byte(hash))
+	copy(o[1+64+64:], []byte(file))
+	return o
+}
+
+func fromBlockKey(data []byte) (string, string) {
+	if len(data) < 1+64+64+1 {
+		panic("Incorrect key length")
+	}
+	if data[0] != keyTypeBlock {
+		panic("Incorrect key type")
+	}
+
+	file := string(data[1+64+64:])
+
+	slice := data[1 : 1+64]
+	izero := bytes.IndexByte(slice, 0)
+	if izero > -1 {
+		return string(slice[:izero]), file
+	}
+	return string(slice), file
+}

+ 235 - 0
internal/files/blockmap_test.go

@@ -0,0 +1,235 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+//
+// This program is free software: you can redistribute it and/or modify it
+// under the terms of the GNU General Public License as published by the Free
+// Software Foundation, either version 3 of the License, or (at your option)
+// any later version.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+// more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package files
+
+import (
+	"testing"
+
+	"github.com/syncthing/syncthing/internal/config"
+	"github.com/syncthing/syncthing/internal/protocol"
+
+	"github.com/syndtr/goleveldb/leveldb"
+	"github.com/syndtr/goleveldb/leveldb/storage"
+)
+
+func genBlocks(n int) []protocol.BlockInfo {
+	b := make([]protocol.BlockInfo, n)
+	for i := range b {
+		h := make([]byte, 32)
+		for j := range h {
+			h[j] = byte(i + j)
+		}
+		b[i].Size = uint32(i)
+		b[i].Hash = h
+	}
+	return b
+}
+
+var f1, f2, f3 protocol.FileInfo
+
+func init() {
+	blocks := genBlocks(30)
+
+	f1 = protocol.FileInfo{
+		Name:   "f1",
+		Blocks: blocks[:10],
+	}
+
+	f2 = protocol.FileInfo{
+		Name:   "f2",
+		Blocks: blocks[10:20],
+	}
+
+	f3 = protocol.FileInfo{
+		Name:   "f3",
+		Blocks: blocks[20:],
+	}
+}
+
+func setup() (*leveldb.DB, *BlockFinder) {
+	// Setup
+
+	db, err := leveldb.Open(storage.NewMemStorage(), nil)
+	if err != nil {
+		panic(err)
+	}
+
+	wrapper := config.Wrap("", config.Configuration{})
+	wrapper.SetFolder(config.FolderConfiguration{
+		ID: "folder1",
+	})
+	wrapper.SetFolder(config.FolderConfiguration{
+		ID: "folder2",
+	})
+
+	return db, NewBlockFinder(db, wrapper)
+}
+
+func dbEmpty(db *leveldb.DB) bool {
+	iter := db.NewIterator(nil, nil)
+	defer iter.Release()
+	if iter.Next() {
+		return false
+	}
+	return true
+}
+
+func TestBlockMapAddUpdateWipe(t *testing.T) {
+	db, f := setup()
+
+	if !dbEmpty(db) {
+		t.Fatal("db not empty")
+	}
+
+	m := NewBlockMap(db, "folder1")
+
+	f3.Flags |= protocol.FlagDirectory
+
+	err := m.Add([]protocol.FileInfo{f1, f2, f3})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		if folder != "folder1" || file != "f1" || index != 0 {
+			t.Fatal("Mismatch")
+		}
+		return true
+	})
+
+	f.Iterate(f2.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		if folder != "folder1" || file != "f2" || index != 0 {
+			t.Fatal("Mismatch")
+		}
+		return true
+	})
+
+	f.Iterate(f3.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		t.Fatal("Unexpected block")
+		return true
+	})
+
+	f3.Flags = f1.Flags
+	f1.Flags |= protocol.FlagDeleted
+	f2.Flags |= protocol.FlagInvalid
+
+	// Should remove
+	err = m.Update([]protocol.FileInfo{f1, f2, f3})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		t.Fatal("Unexpected block")
+		return false
+	})
+
+	f.Iterate(f2.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		t.Fatal("Unexpected block")
+		return false
+	})
+
+	f.Iterate(f3.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		if folder != "folder1" || file != "f3" || index != 0 {
+			t.Fatal("Mismatch")
+		}
+		return true
+	})
+
+	err = m.Drop()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !dbEmpty(db) {
+		t.Fatal("db not empty")
+	}
+
+	// Should not add
+	err = m.Add([]protocol.FileInfo{f1, f2})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !dbEmpty(db) {
+		t.Fatal("db not empty")
+	}
+
+	f1.Flags = 0
+	f2.Flags = 0
+	f3.Flags = 0
+}
+
+func TestBlockMapFinderLookup(t *testing.T) {
+	db, f := setup()
+
+	m1 := NewBlockMap(db, "folder1")
+	m2 := NewBlockMap(db, "folder2")
+
+	err := m1.Add([]protocol.FileInfo{f1})
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = m2.Add([]protocol.FileInfo{f1})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	counter := 0
+	f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		counter++
+		switch counter {
+		case 1:
+			if folder != "folder1" || file != "f1" || index != 0 {
+				t.Fatal("Mismatch")
+			}
+		case 2:
+			if folder != "folder2" || file != "f1" || index != 0 {
+				t.Fatal("Mismatch")
+			}
+		default:
+			t.Fatal("Unexpected block")
+		}
+		return false
+	})
+	if counter != 2 {
+		t.Fatal("Incorrect count", counter)
+	}
+
+	f1.Flags |= protocol.FlagDeleted
+
+	err = m1.Update([]protocol.FileInfo{f1})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	counter = 0
+	f.Iterate(f1.Blocks[0].Hash, func(folder, file string, index uint32) bool {
+		counter++
+		switch counter {
+		case 1:
+			if folder != "folder2" || file != "f1" || index != 0 {
+				t.Fatal("Mismatch")
+			}
+		default:
+			t.Fatal("Unexpected block")
+		}
+		return false
+	})
+	if counter != 1 {
+		t.Fatal("Incorrect count")
+	}
+}

+ 1 - 0
internal/files/leveldb.go

@@ -49,6 +49,7 @@ func clock(v uint64) uint64 {
 const (
 	keyTypeDevice = iota
 	keyTypeGlobal
+	keyTypeBlock
 )
 
 type fileVersion struct {

+ 4 - 0
internal/protocol/message.go

@@ -54,6 +54,10 @@ func (f FileInfo) IsInvalid() bool {
 	return IsInvalid(f.Flags)
 }
 
+func (f FileInfo) IsDirectory() bool {
+	return IsDirectory(f.Flags)
+}
+
 // Used for unmarshalling a FileInfo structure but skipping the actual block list
 type FileInfoTruncated struct {
 	Name         string // max:8192