|
|
@@ -16,6 +16,7 @@ import (
|
|
|
"time"
|
|
|
"unicode/utf8"
|
|
|
|
|
|
+ "github.com/rcrowley/go-metrics"
|
|
|
"github.com/syncthing/syncthing/lib/db"
|
|
|
"github.com/syncthing/syncthing/lib/events"
|
|
|
"github.com/syncthing/syncthing/lib/osutil"
|
|
|
@@ -143,7 +144,11 @@ func (w *Walker) Walk() (chan protocol.FileInfo, error) {
|
|
|
// which it receives the files we ask it to hash.
|
|
|
go func() {
|
|
|
var filesToHash []protocol.FileInfo
|
|
|
- var total, progress int64 = 1, 0
|
|
|
+ var total int64 = 1
|
|
|
+
|
|
|
+ progress := newByteCounter()
|
|
|
+ defer progress.Close()
|
|
|
+
|
|
|
for file := range toHashChan {
|
|
|
filesToHash = append(filesToHash, file)
|
|
|
total += int64(file.CachedSize)
|
|
|
@@ -151,7 +156,7 @@ func (w *Walker) Walk() (chan protocol.FileInfo, error) {
|
|
|
|
|
|
realToHashChan := make(chan protocol.FileInfo)
|
|
|
done := make(chan struct{})
|
|
|
- newParallelHasher(w.Dir, w.BlockSize, w.Hashers, finishedChan, realToHashChan, &progress, done, w.Cancel)
|
|
|
+ newParallelHasher(w.Dir, w.BlockSize, w.Hashers, finishedChan, realToHashChan, progress, done, w.Cancel)
|
|
|
|
|
|
// A routine which actually emits the FolderScanProgress events
|
|
|
// every w.ProgressTicker ticks, until the hasher routines terminate.
|
|
|
@@ -163,12 +168,14 @@ func (w *Walker) Walk() (chan protocol.FileInfo, error) {
|
|
|
ticker.Stop()
|
|
|
return
|
|
|
case <-ticker.C:
|
|
|
- current := atomic.LoadInt64(&progress)
|
|
|
- l.Debugf("Walk %s %s current progress %d/%d (%d%%)", w.Dir, w.Subs, current, total, current*100/total)
|
|
|
+ current := progress.Total()
|
|
|
+ rate := progress.Rate()
|
|
|
+ l.Debugf("Walk %s %s current progress %d/%d at %.01f MB/s (%d%%)", w.Dir, w.Subs, current, total, rate/1024/1024, current*100/total)
|
|
|
events.Default.Log(events.FolderScanProgress, map[string]interface{}{
|
|
|
"folder": w.Folder,
|
|
|
"current": current,
|
|
|
"total": total,
|
|
|
+ "rate": rate, // bytes per second
|
|
|
})
|
|
|
case <-w.Cancel:
|
|
|
ticker.Stop()
|
|
|
@@ -492,3 +499,48 @@ func SymlinkFlags(t symlinks.TargetType) uint32 {
|
|
|
}
|
|
|
panic("unknown symlink TargetType")
|
|
|
}
|
|
|
+
|
|
|
+// A byteCounter gets bytes added to it via Update() and then provides the
|
|
|
+// Total() and one minute moving average Rate() in bytes per second.
|
|
|
+type byteCounter struct {
|
|
|
+ total int64
|
|
|
+ metrics.EWMA
|
|
|
+ stop chan struct{}
|
|
|
+}
|
|
|
+
|
|
|
+func newByteCounter() *byteCounter {
|
|
|
+ c := &byteCounter{
|
|
|
+ EWMA: metrics.NewEWMA1(), // a one minute exponentially weighted moving average
|
|
|
+ stop: make(chan struct{}),
|
|
|
+ }
|
|
|
+ go c.ticker()
|
|
|
+ return c
|
|
|
+}
|
|
|
+
|
|
|
+func (c *byteCounter) ticker() {
|
|
|
+ // The metrics.EWMA expects clock ticks every five seconds in order to
|
|
|
+ // decay the average properly.
|
|
|
+ t := time.NewTicker(5 * time.Second)
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-t.C:
|
|
|
+ c.Tick()
|
|
|
+ case <-c.stop:
|
|
|
+ t.Stop()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (c *byteCounter) Update(bytes int64) {
|
|
|
+ atomic.AddInt64(&c.total, bytes)
|
|
|
+ c.EWMA.Update(bytes)
|
|
|
+}
|
|
|
+
|
|
|
+func (c *byteCounter) Total() int64 {
|
|
|
+ return atomic.LoadInt64(&c.total)
|
|
|
+}
|
|
|
+
|
|
|
+func (c *byteCounter) Close() {
|
|
|
+ close(c.stop)
|
|
|
+}
|