Browse Source

Add validation cache

Jakob Borg 10 years ago
parent
commit
3f59d6daff
3 changed files with 190 additions and 82 deletions
  1. 90 50
      internal/model/model.go
  2. 68 10
      test/transfer-bench_test.go
  3. 32 22
      test/util.go

+ 90 - 50
internal/model/model.go

@@ -38,10 +38,12 @@ import (
 
 // How many files to send in each Index/IndexUpdate message.
 const (
-	indexTargetSize   = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
-	indexPerFileSize  = 250        // Each FileInfo is approximately this big, in bytes, excluding BlockInfos
-	IndexPerBlockSize = 40         // Each BlockInfo is approximately this big
-	indexBatchSize    = 1000       // Either way, don't include more files than this
+	indexTargetSize        = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
+	indexPerFileSize       = 250        // Each FileInfo is approximately this big, in bytes, excluding BlockInfos
+	indexPerBlockSize      = 40         // Each BlockInfo is approximately this big
+	indexBatchSize         = 1000       // Either way, don't include more files than this
+	reqValidationTime      = time.Hour  // How long to cache validation entries for Request messages
+	reqValidationCacheSize = 1000       // How many entries to aim for in the validation cache
 )
 
 type service interface {
@@ -86,6 +88,9 @@ type Model struct {
 
 	addedFolder bool
 	started     bool
+
+	reqValidationCache map[string]time.Time // folder / file name => time when confirmed to exist
+	rvmut              sync.RWMutex         // protects reqValidationCache
 }
 
 var (
@@ -97,29 +102,31 @@ var (
 // for file data without altering the local folder in any way.
 func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName, clientVersion string, ldb *leveldb.DB) *Model {
 	m := &Model{
-		cfg:             cfg,
-		db:              ldb,
-		finder:          db.NewBlockFinder(ldb, cfg),
-		progressEmitter: NewProgressEmitter(cfg),
-		id:              id,
-		shortID:         id.Short(),
-		deviceName:      deviceName,
-		clientName:      clientName,
-		clientVersion:   clientVersion,
-		folderCfgs:      make(map[string]config.FolderConfiguration),
-		folderFiles:     make(map[string]*db.FileSet),
-		folderDevices:   make(map[string][]protocol.DeviceID),
-		deviceFolders:   make(map[protocol.DeviceID][]string),
-		deviceStatRefs:  make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
-		folderIgnores:   make(map[string]*ignore.Matcher),
-		folderRunners:   make(map[string]service),
-		folderStatRefs:  make(map[string]*stats.FolderStatisticsReference),
-		protoConn:       make(map[protocol.DeviceID]protocol.Connection),
-		rawConn:         make(map[protocol.DeviceID]io.Closer),
-		deviceVer:       make(map[protocol.DeviceID]string),
-
-		fmut: sync.NewRWMutex(),
-		pmut: sync.NewRWMutex(),
+		cfg:                cfg,
+		db:                 ldb,
+		finder:             db.NewBlockFinder(ldb, cfg),
+		progressEmitter:    NewProgressEmitter(cfg),
+		id:                 id,
+		shortID:            id.Short(),
+		deviceName:         deviceName,
+		clientName:         clientName,
+		clientVersion:      clientVersion,
+		folderCfgs:         make(map[string]config.FolderConfiguration),
+		folderFiles:        make(map[string]*db.FileSet),
+		folderDevices:      make(map[string][]protocol.DeviceID),
+		deviceFolders:      make(map[protocol.DeviceID][]string),
+		deviceStatRefs:     make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
+		folderIgnores:      make(map[string]*ignore.Matcher),
+		folderRunners:      make(map[string]service),
+		folderStatRefs:     make(map[string]*stats.FolderStatisticsReference),
+		protoConn:          make(map[protocol.DeviceID]protocol.Connection),
+		rawConn:            make(map[protocol.DeviceID]io.Closer),
+		deviceVer:          make(map[protocol.DeviceID]string),
+		reqValidationCache: make(map[string]time.Time),
+
+		fmut:  sync.NewRWMutex(),
+		pmut:  sync.NewRWMutex(),
+		rvmut: sync.NewRWMutex(),
 	}
 	if cfg.Options().ProgressUpdateIntervalS > -1 {
 		go m.progressEmitter.Serve()
@@ -729,33 +736,61 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
 		return nil, fmt.Errorf("protocol error: unknown flags 0x%x in Request message", flags)
 	}
 
-	// Verify that the requested file exists in the local model.
-	m.fmut.RLock()
-	folderFiles, ok := m.folderFiles[folder]
-	m.fmut.RUnlock()
+	// Verify that the requested file exists in the local model. We only need
+	// to validate this file if we haven't done so recently, so we keep a
+	// cache of successful results. "Recently" can be quite a long time, as
+	// we remove validation cache entries when we detect local changes. If
+	// we're out of sync here and the file actually doesn't exist any more, or
+	// has shrunk or something, then we'll get a read error anyway that we
+	// pass on to the other side.
 
-	if !ok {
-		l.Warnf("Request from %s for file %s in nonexistent folder %q", deviceID, name, folder)
-		return nil, protocol.ErrNoSuchFile
-	}
+	m.rvmut.RLock()
+	validated := m.reqValidationCache[folder+"/"+name]
+	m.rvmut.RUnlock()
 
-	lf, ok := folderFiles.Get(protocol.LocalDeviceID, name)
-	if !ok {
-		return nil, protocol.ErrNoSuchFile
-	}
+	if time.Since(validated) > reqValidationTime {
+		m.fmut.RLock()
+		folderFiles, ok := m.folderFiles[folder]
+		m.fmut.RUnlock()
 
-	if lf.IsInvalid() || lf.IsDeleted() {
-		if debug {
-			l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", m, deviceID, folder, name, offset, size, lf)
+		if !ok {
+			l.Warnf("Request from %s for file %s in nonexistent folder %q", deviceID, name, folder)
+			return nil, protocol.ErrNoSuchFile
 		}
-		return nil, protocol.ErrInvalid
-	}
 
-	if offset > lf.Size() {
-		if debug {
-			l.Debugf("%v REQ(in; nonexistent): %s: %q o=%d s=%d", m, deviceID, name, offset, size)
+		// This call is really expensive for large files, as we load the full
+		// block list which may be megabytes and megabytes of data to allocate
+		// space for, read, and deserialize.
+		lf, ok := folderFiles.Get(protocol.LocalDeviceID, name)
+		if !ok {
+			return nil, protocol.ErrNoSuchFile
 		}
-		return nil, protocol.ErrNoSuchFile
+
+		if lf.IsInvalid() || lf.IsDeleted() {
+			if debug {
+				l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", m, deviceID, folder, name, offset, size, lf)
+			}
+			return nil, protocol.ErrInvalid
+		}
+
+		if offset > lf.Size() {
+			if debug {
+				l.Debugf("%v REQ(in; nonexistent): %s: %q o=%d s=%d", m, deviceID, name, offset, size)
+			}
+			return nil, protocol.ErrNoSuchFile
+		}
+
+		m.rvmut.Lock()
+		m.reqValidationCache[folder+"/"+name] = time.Now()
+		if len(m.reqValidationCache) > reqValidationCacheSize {
+			// Don't let the cache grow infinitely
+			for name, validated := range m.reqValidationCache {
+				if time.Since(validated) > time.Minute {
+					delete(m.reqValidationCache, name)
+				}
+			}
+		}
+		m.rvmut.Unlock()
 	}
 
 	if debug && deviceID != protocol.LocalDeviceID {
@@ -767,7 +802,7 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
 
 	var reader io.ReaderAt
 	var err error
-	if lf.IsSymlink() {
+	if info, err := os.Lstat(fn); err == nil && info.Mode()&os.ModeSymlink != 0 {
 		target, _, err := symlinks.Read(fn)
 		if err != nil {
 			return nil, err
@@ -1048,7 +1083,7 @@ func sendIndexTo(initial bool, minLocalVer int64, conn protocol.Connection, fold
 		}
 
 		batch = append(batch, f)
-		currentBatchSize += indexPerFileSize + len(f.Blocks)*IndexPerBlockSize
+		currentBatchSize += indexPerFileSize + len(f.Blocks)*indexPerBlockSize
 		return true
 	})
 
@@ -1071,6 +1106,11 @@ func (m *Model) updateLocals(folder string, fs []protocol.FileInfo) {
 	m.fmut.RLock()
 	m.folderFiles[folder].Update(protocol.LocalDeviceID, fs)
 	m.fmut.RUnlock()
+	m.rvmut.Lock()
+	for _, f := range fs {
+		delete(m.reqValidationCache, folder+"/"+f.Name)
+	}
+	m.rvmut.Unlock()
 
 	events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
 		"folder": folder,

+ 68 - 10
test/transfer-bench_test.go

@@ -10,6 +10,8 @@ package integration
 
 import (
 	"log"
+	"os"
+	"runtime"
 	"syscall"
 	"testing"
 	"time"
@@ -19,8 +21,23 @@ func TestBenchmarkTransferManyFiles(t *testing.T) {
 	benchmarkTransfer(t, 50000, 15)
 }
 
-func TestBenchmarkTransferLargeFiles(t *testing.T) {
-	benchmarkTransfer(t, 200, 28)
+func TestBenchmarkTransferLargeFile1G(t *testing.T) {
+	benchmarkTransfer(t, 1, 30)
+}
+func TestBenchmarkTransferLargeFile2G(t *testing.T) {
+	benchmarkTransfer(t, 1, 31)
+}
+func TestBenchmarkTransferLargeFile4G(t *testing.T) {
+	benchmarkTransfer(t, 1, 32)
+}
+func TestBenchmarkTransferLargeFile8G(t *testing.T) {
+	benchmarkTransfer(t, 1, 33)
+}
+func TestBenchmarkTransferLargeFile16G(t *testing.T) {
+	benchmarkTransfer(t, 1, 34)
+}
+func TestBenchmarkTransferLargeFile32G(t *testing.T) {
+	benchmarkTransfer(t, 1, 35)
 }
 
 func benchmarkTransfer(t *testing.T, files, sizeExp int) {
@@ -31,7 +48,20 @@ func benchmarkTransfer(t *testing.T, files, sizeExp int) {
 	}
 
 	log.Println("Generating files...")
-	err = generateFiles("s1", files, sizeExp, "../LICENSE")
+	if files == 1 {
+		// Special case. Generate one file with the specified size exactly.
+		fd, err := os.Open("../LICENSE")
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = os.MkdirAll("s1", 0755)
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = generateOneFile(fd, "s1/onefile", 1<<uint(sizeExp))
+	} else {
+		err = generateFiles("s1", files, sizeExp, "../LICENSE")
+	}
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -39,6 +69,15 @@ func benchmarkTransfer(t *testing.T, files, sizeExp int) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	var total int64
+	var nfiles int
+	for _, f := range expected {
+		total += f.size
+		if f.mode.IsRegular() {
+			nfiles++
+		}
+	}
+	log.Printf("Total %.01f MiB in %d files", float64(total)/1024/1024, nfiles)
 
 	log.Println("Starting sender...")
 	sender := syncthingProcess{ // id1
@@ -116,8 +155,11 @@ loop:
 		time.Sleep(250 * time.Millisecond)
 	}
 
-	sender.stop()
-	proc, err := receiver.stop()
+	sendProc, err := sender.stop()
+	if err != nil {
+		t.Fatal(err)
+	}
+	recvProc, err := receiver.stop()
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -134,10 +176,26 @@ loop:
 	}
 
 	log.Println("Result: Wall time:", t1.Sub(t0))
-
-	if rusage, ok := proc.SysUsage().(*syscall.Rusage); ok {
-		log.Println("Result: Utime:", time.Duration(rusage.Utime.Nano()))
-		log.Println("Result: Stime:", time.Duration(rusage.Stime.Nano()))
-		log.Println("Result: MaxRSS:", rusage.Maxrss/1024, "KiB")
+	log.Printf("Result: %.1f MiB/s synced", float64(total)/1024/1024/t1.Sub(t0).Seconds())
+
+	if rusage, ok := recvProc.SysUsage().(*syscall.Rusage); ok {
+		log.Println("Receiver: Utime:", time.Duration(rusage.Utime.Nano()))
+		log.Println("Receiver: Stime:", time.Duration(rusage.Stime.Nano()))
+		if runtime.GOOS == "darwin" {
+			// Darwin reports in bytes, Linux seems to report in KiB even
+			// though the manpage says otherwise.
+			rusage.Maxrss /= 1024
+		}
+		log.Println("Receiver: MaxRSS:", rusage.Maxrss, "KiB")
+	}
+	if rusage, ok := sendProc.SysUsage().(*syscall.Rusage); ok {
+		log.Println("Sender: Utime:", time.Duration(rusage.Utime.Nano()))
+		log.Println("Sender: Stime:", time.Duration(rusage.Stime.Nano()))
+		if runtime.GOOS == "darwin" {
+			// Darwin reports in bytes, Linux seems to report in KiB even
+			// though the manpage says otherwise.
+			rusage.Maxrss /= 1024
+		}
+		log.Println("Sender: MaxRSS:", rusage.Maxrss, "KiB")
 	}
 }

+ 32 - 22
test/util.go

@@ -60,38 +60,46 @@ func generateFiles(dir string, files, maxexp int, srcname string) error {
 			log.Fatal(err)
 		}
 
-		s := 1 << uint(rand.Intn(maxexp))
-		a := 128 * 1024
+		p1 := filepath.Join(p0, n)
+
+		s := int64(1 << uint(rand.Intn(maxexp)))
+		a := int64(128 * 1024)
 		if a > s {
 			a = s
 		}
-		s += rand.Intn(a)
+		s += rand.Int63n(a)
 
-		src := io.LimitReader(&inifiteReader{fd}, int64(s))
-
-		p1 := filepath.Join(p0, n)
-		dst, err := os.Create(p1)
-		if err != nil {
+		if err := generateOneFile(fd, p1, s); err != nil {
 			return err
 		}
+	}
 
-		_, err = io.Copy(dst, src)
-		if err != nil {
-			return err
-		}
+	return nil
+}
 
-		err = dst.Close()
-		if err != nil {
-			return err
-		}
+func generateOneFile(fd io.ReadSeeker, p1 string, s int64) error {
+	src := io.LimitReader(&inifiteReader{fd}, int64(s))
+	dst, err := os.Create(p1)
+	if err != nil {
+		return err
+	}
 
-		_ = os.Chmod(p1, os.FileMode(rand.Intn(0777)|0400))
+	_, err = io.Copy(dst, src)
+	if err != nil {
+		return err
+	}
 
-		t := time.Now().Add(-time.Duration(rand.Intn(30*86400)) * time.Second)
-		err = os.Chtimes(p1, t, t)
-		if err != nil {
-			return err
-		}
+	err = dst.Close()
+	if err != nil {
+		return err
+	}
+
+	_ = os.Chmod(p1, os.FileMode(rand.Intn(0777)|0400))
+
+	t := time.Now().Add(-time.Duration(rand.Intn(30*86400)) * time.Second)
+	err = os.Chtimes(p1, t, t)
+	if err != nil {
+		return err
 	}
 
 	return nil
@@ -367,6 +375,7 @@ type fileInfo struct {
 	mode os.FileMode
 	mod  int64
 	hash [16]byte
+	size int64
 }
 
 func (f fileInfo) String() string {
@@ -428,6 +437,7 @@ func startWalker(dir string, res chan<- fileInfo, abort <-chan struct{}) chan er
 				name: rn,
 				mode: info.Mode(),
 				mod:  info.ModTime().Unix(),
+				size: info.Size(),
 			}
 			sum, err := md5file(path)
 			if err != nil {