Browse Source

lib/model: Don't info log repeat pull errors (#6149)

Simon Frei 6 năm trước cách đây
mục cha
commit
0d14ee4142

+ 3 - 3
lib/model/fakeconns_test.go

@@ -33,7 +33,7 @@ type fakeConnection struct {
 	folder                   string
 	model                    *model
 	indexFn                  func(string, []protocol.FileInfo)
-	requestFn                func(folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error)
+	requestFn                func(ctx context.Context, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error)
 	closeFn                  func(error)
 	mut                      sync.Mutex
 }
@@ -82,11 +82,11 @@ func (f *fakeConnection) IndexUpdate(folder string, fs []protocol.FileInfo) erro
 	return nil
 }
 
-func (f *fakeConnection) Request(folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
+func (f *fakeConnection) Request(ctx context.Context, folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
 	f.mut.Lock()
 	defer f.mut.Unlock()
 	if f.requestFn != nil {
-		return f.requestFn(folder, name, offset, size, hash, fromTemporary)
+		return f.requestFn(ctx, folder, name, offset, size, hash, fromTemporary)
 	}
 	return f.fileData[name], nil
 }

+ 4 - 2
lib/model/folder.go

@@ -15,6 +15,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/pkg/errors"
+
 	"github.com/syncthing/syncthing/lib/config"
 	"github.com/syncthing/syncthing/lib/db"
 	"github.com/syncthing/syncthing/lib/events"
@@ -278,7 +280,7 @@ func (f *folder) getHealthError() error {
 	dbPath := locations.Get(locations.Database)
 	if usage, err := fs.NewFilesystem(fs.FilesystemTypeBasic, dbPath).Usage("."); err == nil {
 		if err = config.CheckFreeSpace(f.model.cfg.Options().MinHomeDiskFree, usage); err != nil {
-			return fmt.Errorf("insufficient space on disk for database (%v): %v", dbPath, err)
+			return errors.Wrapf(err, "insufficient space on disk for database (%v)", dbPath)
 		}
 	}
 
@@ -297,7 +299,7 @@ func (f *folder) scanSubdirs(subDirs []string) error {
 
 	oldHash := f.ignores.Hash()
 	if err := f.ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) {
-		err = fmt.Errorf("loading ignores: %v", err)
+		err = errors.Wrap(err, "loading ignores")
 		f.setError(err)
 		return err
 	}

+ 31 - 17
lib/model/folder_sendrecv.go

@@ -104,7 +104,8 @@ type sendReceiveFolder struct {
 
 	queue *jobQueue
 
-	pullErrors    map[string]string // path -> error string
+	pullErrors    map[string]string // errors for most recent/current iteration
+	oldPullErrors map[string]string // errors from previous iterations for log filtering only
 	pullErrorsMut sync.Mutex
 }
 
@@ -169,7 +170,7 @@ func (f *sendReceiveFolder) pull() bool {
 		}
 	}()
 	if err := f.ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) {
-		err = fmt.Errorf("loading ignores: %v", err)
+		err = errors.Wrap(err, "loading ignores")
 		f.setError(err)
 		return false
 	}
@@ -210,9 +211,10 @@ func (f *sendReceiveFolder) pull() bool {
 	}
 
 	f.pullErrorsMut.Lock()
-	hasPullErrs := len(f.pullErrors) > 0
+	pullErrNum := len(f.pullErrors)
 	f.pullErrorsMut.Unlock()
-	if hasPullErrs {
+	if pullErrNum > 0 {
+		l.Infof("%v: Failed to sync %v items", f.Description(), pullErrNum)
 		f.evLogger.Log(events.FolderErrors, map[string]interface{}{
 			"folder": f.folderID,
 			"errors": f.Errors(),
@@ -227,6 +229,11 @@ func (f *sendReceiveFolder) pull() bool {
 // might have failed). One puller iteration handles all files currently
 // flagged as needed in the folder.
 func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) int {
+	f.pullErrorsMut.Lock()
+	f.oldPullErrors = f.pullErrors
+	f.pullErrors = make(map[string]string)
+	f.pullErrorsMut.Unlock()
+
 	pullChan := make(chan pullBlockState)
 	copyChan := make(chan copyBlocksState)
 	finisherChan := make(chan *sharedPullerState)
@@ -269,9 +276,6 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) int {
 		doneWg.Done()
 	}()
 
-	// Clear out all previous errors
-	f.clearPullErrors()
-
 	changed, fileDeletions, dirDeletions, err := f.processNeeded(dbUpdateChan, copyChan, scanChan)
 
 	// Signal copy and puller routines that we are done with the in data for
@@ -294,6 +298,10 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) int {
 	close(dbUpdateChan)
 	updateWg.Wait()
 
+	f.pullErrorsMut.Lock()
+	f.oldPullErrors = nil
+	f.pullErrorsMut.Unlock()
+
 	return changed
 }
 
@@ -1435,7 +1443,7 @@ func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPu
 		select {
 		case <-f.ctx.Done():
 			state.fail(errors.Wrap(f.ctx.Err(), "folder stopped"))
-			return
+			break
 		default:
 		}
 
@@ -1458,7 +1466,7 @@ func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPu
 		// leastBusy can select another device when someone else asks.
 		activity.using(selected)
 		var buf []byte
-		buf, lastError = f.model.requestGlobal(selected.ID, f.folderID, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, state.block.WeakHash, selected.FromTemporary)
+		buf, lastError = f.model.requestGlobal(f.ctx, selected.ID, f.folderID, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, state.block.WeakHash, selected.FromTemporary)
 		activity.done(selected)
 		if lastError != nil {
 			l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError)
@@ -1757,6 +1765,11 @@ func (f *sendReceiveFolder) moveForConflict(name, lastModBy string, scanChan cha
 }
 
 func (f *sendReceiveFolder) newPullError(path string, err error) {
+	if errors.Cause(err) == f.ctx.Err() {
+		// Error because the folder stopped - no point logging/tracking
+		return
+	}
+
 	f.pullErrorsMut.Lock()
 	defer f.pullErrorsMut.Unlock()
 
@@ -1767,18 +1780,19 @@ func (f *sendReceiveFolder) newPullError(path string, err error) {
 		return
 	}
 
-	l.Infof("Puller (folder %s, item %q): %v", f.Description(), path, err)
-
 	// Establish context to differentiate from errors while scanning.
 	// Use "syncing" as opposed to "pulling" as the latter might be used
 	// for errors occurring specificly in the puller routine.
-	f.pullErrors[path] = fmt.Sprintln("syncing:", err)
-}
+	errStr := fmt.Sprintln("syncing:", err)
+	f.pullErrors[path] = errStr
 
-func (f *sendReceiveFolder) clearPullErrors() {
-	f.pullErrorsMut.Lock()
-	f.pullErrors = make(map[string]string)
-	f.pullErrorsMut.Unlock()
+	if oldErr, ok := f.oldPullErrors[path]; ok && oldErr == errStr {
+		l.Debugf("Repeat error on puller (folder %s, item %q): %v", f.Description(), path, err)
+		delete(f.oldPullErrors, path) // Potential repeats are now caught by f.pullErrors itself
+		return
+	}
+
+	l.Infof("Puller (folder %s, item %q): %v", f.Description(), path, err)
 }
 
 func (f *sendReceiveFolder) Errors() []FileError {

+ 4 - 8
lib/model/model.go

@@ -8,6 +8,7 @@ package model
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -431,18 +432,13 @@ func (m *model) stopFolder(cfg config.FolderConfiguration, err error) {
 	tokens := m.folderRunnerTokens[cfg.ID]
 	m.fmut.RUnlock()
 
-	// Close connections to affected devices
-	// Must happen before stopping the folder service to abort ongoing
-	// transmissions and thus allow timely service termination.
-	w := m.closeConns(cfg.DeviceIDs(), err)
-
 	for _, id := range tokens {
 		m.RemoveAndWait(id, 0)
 	}
 
 	// Wait for connections to stop to ensure that no more calls to methods
 	// expecting this folder to exist happen (e.g. .IndexUpdate).
-	w.Wait()
+	m.closeConns(cfg.DeviceIDs(), err).Wait()
 }
 
 // Need to hold lock on m.fmut when calling this.
@@ -2103,7 +2099,7 @@ func (s *indexSender) sendIndexTo() error {
 	return err
 }
 
-func (m *model) requestGlobal(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
+func (m *model) requestGlobal(ctx context.Context, deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
 	m.pmut.RLock()
 	nc, ok := m.conn[deviceID]
 	m.pmut.RUnlock()
@@ -2114,7 +2110,7 @@ func (m *model) requestGlobal(deviceID protocol.DeviceID, folder, name string, o
 
 	l.Debugf("%v REQ(out): %s: %q / %q o=%d s=%d h=%x wh=%x ft=%t", m, deviceID, folder, name, offset, size, hash, weakHash, fromTemporary)
 
-	return nc.Request(folder, name, offset, size, hash, weakHash, fromTemporary)
+	return nc.Request(ctx, folder, name, offset, size, hash, weakHash, fromTemporary)
 }
 
 func (m *model) ScanFolders() map[string]error {

+ 2 - 1
lib/model/model_test.go

@@ -8,6 +8,7 @@ package model
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -254,7 +255,7 @@ func BenchmarkRequestOut(b *testing.B) {
 
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		data, err := m.requestGlobal(device1, "default", files[i%n].Name, 0, 32, nil, 0, false)
+		data, err := m.requestGlobal(context.Background(), device1, "default", files[i%n].Name, 0, 32, nil, 0, false)
 		if err != nil {
 			b.Error(err)
 		}

+ 4 - 3
lib/model/requests_test.go

@@ -8,6 +8,7 @@ package model
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"io/ioutil"
 	"os"
@@ -136,7 +137,7 @@ func TestSymlinkTraversalWrite(t *testing.T) {
 			}
 		}
 	}
-	fc.requestFn = func(folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
+	fc.requestFn = func(_ context.Context, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
 		if name != "symlink" && strings.HasPrefix(name, "symlink") {
 			badReq <- name
 		}
@@ -411,7 +412,7 @@ func pullInvalidIgnored(t *testing.T, ft config.FolderType) {
 	}
 	// Make sure pulling doesn't interfere, as index updates are racy and
 	// thus we cannot distinguish between scan and pull results.
-	fc.requestFn = func(folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
+	fc.requestFn = func(_ context.Context, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
 		return nil, nil
 	}
 	fc.mut.Unlock()
@@ -996,7 +997,7 @@ func TestNeedFolderFiles(t *testing.T) {
 
 	errPreventSync := errors.New("you aren't getting any of this")
 	fc.mut.Lock()
-	fc.requestFn = func(string, string, int64, int, []byte, bool) ([]byte, error) {
+	fc.requestFn = func(context.Context, string, string, int64, int, []byte, bool) ([]byte, error) {
 		return nil, errPreventSync
 	}
 	fc.mut.Unlock()

+ 3 - 2
lib/protocol/benchmark_test.go

@@ -3,6 +3,7 @@
 package protocol
 
 import (
+	"context"
 	"crypto/tls"
 	"encoding/binary"
 	"net"
@@ -80,9 +81,9 @@ func benchmarkRequestsConnPair(b *testing.B, conn0, conn1 net.Conn) {
 		// Use c0 and c1 for each alternating request, so we get as much
 		// data flowing in both directions.
 		if i%2 == 0 {
-			buf, err = c0.Request("folder", "file", int64(i), 128<<10, nil, 0, false)
+			buf, err = c0.Request(context.Background(), "folder", "file", int64(i), 128<<10, nil, 0, false)
 		} else {
-			buf, err = c1.Request("folder", "file", int64(i), 128<<10, nil, 0, false)
+			buf, err = c1.Request(context.Background(), "folder", "file", int64(i), 128<<10, nil, 0, false)
 		}
 
 		if err != nil {

+ 20 - 14
lib/protocol/protocol.go

@@ -3,6 +3,7 @@
 package protocol
 
 import (
+	"context"
 	"crypto/sha256"
 	"encoding/binary"
 	"errors"
@@ -136,7 +137,7 @@ type Connection interface {
 	Name() string
 	Index(folder string, files []FileInfo) error
 	IndexUpdate(folder string, files []FileInfo) error
-	Request(folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error)
+	Request(ctx context.Context, folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error)
 	ClusterConfig(config ClusterConfig)
 	DownloadProgress(folder string, updates []FileDownloadProgressUpdate)
 	Statistics() Statistics
@@ -255,7 +256,7 @@ func (c *rawConnection) Index(folder string, idx []FileInfo) error {
 	default:
 	}
 	c.idxMut.Lock()
-	c.send(&Index{
+	c.send(context.TODO(), &Index{
 		Folder: folder,
 		Files:  idx,
 	}, nil)
@@ -271,7 +272,7 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo) error {
 	default:
 	}
 	c.idxMut.Lock()
-	c.send(&IndexUpdate{
+	c.send(context.TODO(), &IndexUpdate{
 		Folder: folder,
 		Files:  idx,
 	}, nil)
@@ -280,7 +281,7 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo) error {
 }
 
 // Request returns the bytes for the specified block after fetching them from the connected peer.
-func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
+func (c *rawConnection) Request(ctx context.Context, folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
 	c.nextIDMut.Lock()
 	id := c.nextID
 	c.nextID++
@@ -294,7 +295,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
 	c.awaiting[id] = rc
 	c.awaitingMut.Unlock()
 
-	ok := c.send(&Request{
+	ok := c.send(ctx, &Request{
 		ID:            id,
 		Folder:        folder,
 		Name:          name,
@@ -308,11 +309,15 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
 		return nil, ErrClosed
 	}
 
-	res, ok := <-rc
-	if !ok {
-		return nil, ErrClosed
+	select {
+	case res, ok := <-rc:
+		if !ok {
+			return nil, ErrClosed
+		}
+		return res.val, res.err
+	case <-ctx.Done():
+		return nil, ctx.Err()
 	}
-	return res.val, res.err
 }
 
 // ClusterConfig sends the cluster configuration message to the peer.
@@ -336,14 +341,14 @@ func (c *rawConnection) Closed() bool {
 
 // DownloadProgress sends the progress updates for the files that are currently being downloaded.
 func (c *rawConnection) DownloadProgress(folder string, updates []FileDownloadProgressUpdate) {
-	c.send(&DownloadProgress{
+	c.send(context.TODO(), &DownloadProgress{
 		Folder:  folder,
 		Updates: updates,
 	}, nil)
 }
 
 func (c *rawConnection) ping() bool {
-	return c.send(&Ping{}, nil)
+	return c.send(context.Background(), &Ping{}, nil)
 }
 
 func (c *rawConnection) readerLoop() {
@@ -613,14 +618,14 @@ func checkFilename(name string) error {
 func (c *rawConnection) handleRequest(req Request) {
 	res, err := c.receiver.Request(c.id, req.Folder, req.Name, req.Size, req.Offset, req.Hash, req.WeakHash, req.FromTemporary)
 	if err != nil {
-		c.send(&Response{
+		c.send(context.Background(), &Response{
 			ID:   req.ID,
 			Code: errorToCode(err),
 		}, nil)
 		return
 	}
 	done := make(chan struct{})
-	c.send(&Response{
+	c.send(context.Background(), &Response{
 		ID:   req.ID,
 		Data: res.Data(),
 		Code: errorToCode(nil),
@@ -639,12 +644,13 @@ func (c *rawConnection) handleResponse(resp Response) {
 	c.awaitingMut.Unlock()
 }
 
-func (c *rawConnection) send(msg message, done chan struct{}) bool {
+func (c *rawConnection) send(ctx context.Context, msg message, done chan struct{}) bool {
 	select {
 	case c.outbox <- asyncMessage{msg, done}:
 		return true
 	case <-c.preventSends:
 	case <-c.closed:
+	case <-ctx.Done():
 	}
 	if done != nil {
 		close(done)

+ 3 - 2
lib/protocol/protocol_test.go

@@ -4,6 +4,7 @@ package protocol
 
 import (
 	"bytes"
+	"context"
 	"crypto/sha256"
 	"encoding/hex"
 	"encoding/json"
@@ -77,7 +78,7 @@ func TestClose(t *testing.T) {
 	c0.Index("default", nil)
 	c0.Index("default", nil)
 
-	if _, err := c0.Request("default", "foo", 0, 0, nil, 0, false); err == nil {
+	if _, err := c0.Request(context.Background(), "default", "foo", 0, 0, nil, 0, false); err == nil {
 		t.Error("Request should return an error")
 	}
 }
@@ -194,7 +195,7 @@ func TestClusterConfigFirst(t *testing.T) {
 	c.ClusterConfig(ClusterConfig{})
 
 	done := make(chan struct{})
-	if ok := c.send(&Ping{}, done); !ok {
+	if ok := c.send(context.Background(), &Ping{}, done); !ok {
 		t.Fatal("send ping after cluster config returned false")
 	}
 	select {

+ 3 - 2
lib/protocol/wireformat.go

@@ -3,6 +3,7 @@
 package protocol
 
 import (
+	"context"
 	"path/filepath"
 
 	"golang.org/x/text/unicode/norm"
@@ -34,7 +35,7 @@ func (c wireFormatConnection) IndexUpdate(folder string, fs []FileInfo) error {
 	return c.Connection.IndexUpdate(folder, myFs)
 }
 
-func (c wireFormatConnection) Request(folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
+func (c wireFormatConnection) Request(ctx context.Context, folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
 	name = norm.NFC.String(filepath.ToSlash(name))
-	return c.Connection.Request(folder, name, offset, size, hash, weakHash, fromTemporary)
+	return c.Connection.Request(ctx, folder, name, offset, size, hash, weakHash, fromTemporary)
 }