Browse Source

Fast parallel file hasher (fixes #293)

Jakob Borg 11 years ago
parent
commit
2be1218aa3
3 changed files with 91 additions and 32 deletions
  1. 65 0
      scanner/blockqueue.go
  2. 10 32
      scanner/walk.go
  3. 16 0
      scanner/walk_test.go

+ 65 - 0
scanner/blockqueue.go

@@ -0,0 +1,65 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+package scanner
+
+import (
+	"os"
+	"path/filepath"
+	"sync"
+
+	"github.com/calmh/syncthing/protocol"
+)
+
+// The parallel hasher reads FileInfo structures from the inbox, hashes each
+// file to populate the Blocks element and sends it to the outbox. A number of
+// workers are used in parallel. The outbox is closed once the inbox has been
+// closed and all items have been handled.
+
+func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo) {
+	var wg sync.WaitGroup
+	wg.Add(workers)
+
+	for i := 0; i < workers; i++ {
+		go func() {
+			hashFile(dir, blockSize, outbox, inbox)
+			wg.Done()
+		}()
+	}
+
+	go func() {
+		wg.Wait()
+		close(outbox)
+	}()
+}
+
+func hashFile(dir string, blockSize int, outbox, inbox chan protocol.FileInfo) {
+	for f := range inbox {
+		if protocol.IsDirectory(f.Flags) || protocol.IsDeleted(f.Flags) {
+			outbox <- f
+			continue
+		}
+
+		fd, err := os.Open(filepath.Join(dir, f.Name))
+		if err != nil {
+			if debug {
+				l.Debugln("open:", err)
+			}
+			continue
+		}
+
+		blocks, err := Blocks(fd, blockSize)
+		fd.Close()
+
+		if err != nil {
+			if debug {
+				l.Debugln("hash error:", f.Name, err)
+			}
+			continue
+		}
+
+		f.Blocks = blocks
+		outbox <- f
+	}
+}

+ 10 - 32
scanner/walk.go

@@ -13,7 +13,6 @@ import (
 	"path/filepath"
 	"runtime"
 	"strings"
-	"time"
 	"code.google.com/p/go.text/unicode/norm"
 
 	"github.com/calmh/syncthing/lamport"
@@ -60,18 +59,20 @@ type CurrentFiler interface {
 
 // Walk returns the list of files found in the local repository by scanning the
 // file system. Files are blockwise hashed.
-func (w *Walker) Walk() (files chan protocol.FileInfo, ignore map[string][]string, err error) {
+func (w *Walker) Walk() (chan protocol.FileInfo, map[string][]string, error) {
 	if debug {
 		l.Debugln("Walk", w.Dir, w.BlockSize, w.IgnoreFile)
 	}
 
-	err = checkDir(w.Dir)
+	err := checkDir(w.Dir)
 	if err != nil {
-		return
+		return nil, nil, err
 	}
 
-	ignore = make(map[string][]string)
-	files = make(chan protocol.FileInfo)
+	ignore := make(map[string][]string)
+	files := make(chan protocol.FileInfo)
+	hashedFiles := make(chan protocol.FileInfo)
+	newParallelHasher(w.Dir, w.BlockSize, runtime.NumCPU(), hashedFiles, files)
 	hashFiles := w.walkAndHashFiles(files, ignore)
 
 	go func() {
@@ -80,7 +81,7 @@ func (w *Walker) Walk() (files chan protocol.FileInfo, ignore map[string][]strin
 		close(files)
 	}()
 
-	return
+	return hashedFiles, ignore, nil
 }
 
 // CleanTempFiles removes all files that match the temporary filename pattern.
@@ -219,40 +220,17 @@ func (w *Walker) walkAndHashFiles(fchan chan protocol.FileInfo, ign map[string][
 				}
 			}
 
-			fd, err := os.Open(p)
-			if err != nil {
-				if debug {
-					l.Debugln("open:", p, err)
-				}
-				return nil
-			}
-			defer fd.Close()
-
-			t0 := time.Now()
-			blocks, err := Blocks(fd, w.BlockSize)
-			if err != nil {
-				if debug {
-					l.Debugln("hash error:", rn, err)
-				}
-				return nil
-			}
-			if debug {
-				t1 := time.Now()
-				l.Debugln("hashed:", rn, ";", len(blocks), "blocks;", info.Size(), "bytes;", int(float64(info.Size())/1024/t1.Sub(t0).Seconds()), "KB/s")
-			}
-
 			var flags = uint32(info.Mode() & os.ModePerm)
 			if w.IgnorePerms {
 				flags = protocol.FlagNoPermBits | 0666
 			}
-			f := protocol.FileInfo{
+
+			fchan <- protocol.FileInfo{
 				Name:     rn,
 				Version:  lamport.Default.Tick(0),
 				Flags:    flags,
 				Modified: info.ModTime().Unix(),
-				Blocks:   blocks,
 			}
-			fchan <- f
 		}
 
 		return nil

+ 16 - 0
scanner/walk_test.go

@@ -7,6 +7,7 @@ package scanner
 import (
 	"fmt"
 	"reflect"
+	"sort"
 	"testing"
 	"time"
 
@@ -39,6 +40,7 @@ func TestWalk(t *testing.T) {
 	for f := range fchan {
 		files = append(files, f)
 	}
+	sort.Sort(fileList(files))
 
 	if err != nil {
 		t.Fatal(err)
@@ -133,3 +135,17 @@ func TestIgnore(t *testing.T) {
 		}
 	}
 }
+
+type fileList []protocol.FileInfo
+
+func (f fileList) Len() int {
+	return len(f)
+}
+
+func (f fileList) Less(a, b int) bool {
+	return f[a].Name < f[b].Name
+}
+
+func (f fileList) Swap(a, b int) {
+	f[a], f[b] = f[b], f[a]
+}