Pārlūkot izejas kodu

lib/model: Use semaphore to limit concurrent folder writes (fixes #6541) (#6573)

Jakob Borg 5 gadi atpakaļ
vecāks
revīzija
6c73617974

+ 1 - 0
lib/config/folderconfiguration.go

@@ -57,6 +57,7 @@ type FolderConfiguration struct {
 	MarkerName              string                      `xml:"markerName" json:"markerName"`
 	CopyOwnershipFromParent bool                        `xml:"copyOwnershipFromParent" json:"copyOwnershipFromParent"`
 	RawModTimeWindowS       int                         `xml:"modTimeWindowS" json:"modTimeWindowS"`
+	MaxConcurrentWrites     int                         `xml:"maxConcurrentWrites" json:"maxConcurrentWrites" default:"2"`
 
 	cachedFilesystem    fs.Filesystem
 	cachedModTimeWindow time.Duration

+ 15 - 5
lib/model/folder_sendrecv.go

@@ -9,6 +9,7 @@ package model
 import (
 	"bytes"
 	"fmt"
+	"io"
 	"path/filepath"
 	"runtime"
 	"sort"
@@ -102,7 +103,8 @@ type sendReceiveFolder struct {
 	fs        fs.Filesystem
 	versioner versioner.Versioner
 
-	queue *jobQueue
+	queue        *jobQueue
+	writeLimiter *byteSemaphore
 
 	pullErrors    map[string]string // errors for most recent/current iteration
 	oldPullErrors map[string]string // errors from previous iterations for log filtering only
@@ -115,6 +117,7 @@ func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matche
 		fs:            fs,
 		versioner:     ver,
 		queue:         newJobQueue(),
+		writeLimiter:  newByteSemaphore(cfg.MaxConcurrentWrites),
 		pullErrorsMut: sync.NewMutex(),
 	}
 	f.folder.puller = f
@@ -1261,10 +1264,9 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
 					return true
 				}
 
-				_, err = dstFd.WriteAt(buf, block.Offset)
+				_, err = f.limitedWriteAt(dstFd, buf, block.Offset)
 				if err != nil {
 					state.fail(errors.Wrap(err, "dst write"))
-
 				}
 				if offset == block.Offset {
 					state.copiedFromOrigin()
@@ -1297,7 +1299,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
 						return false
 					}
 
-					_, err = dstFd.WriteAt(buf, block.Offset)
+					_, err = f.limitedWriteAt(dstFd, buf, block.Offset)
 					if err != nil {
 						state.fail(errors.Wrap(err, "dst write"))
 					}
@@ -1446,7 +1448,7 @@ func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPu
 		}
 
 		// Save the block data we got from the cluster
-		_, err = fd.WriteAt(buf, state.block.Offset)
+		_, err = f.limitedWriteAt(fd, buf, state.block.Offset)
 		if err != nil {
 			state.fail(errors.Wrap(err, "save"))
 		} else {
@@ -1936,6 +1938,14 @@ func (f *sendReceiveFolder) inWritableDir(fn func(string) error, path string) er
 	return inWritableDir(fn, f.fs, path, f.IgnorePerms)
 }
 
+func (f *sendReceiveFolder) limitedWriteAt(fd io.WriterAt, data []byte, offset int64) (int, error) {
+	if err := f.writeLimiter.takeWithContext(f.ctx, 1); err != nil {
+		return 0, err
+	}
+	defer f.writeLimiter.give(1)
+	return fd.WriteAt(data, offset)
+}
+
 // A []FileError is sent as part of an event and will be JSON serialized.
 type FileError struct {
 	Path string `json:"path"`

+ 1 - 0
lib/model/folder_sendrecv_test.go

@@ -107,6 +107,7 @@ func setupSendReceiveFolder(files ...protocol.FileInfo) (*model, *sendReceiveFol
 		},
 
 		queue:         newJobQueue(),
+		writeLimiter:  newByteSemaphore(2),
 		pullErrors:    make(map[string]string),
 		pullErrorsMut: sync.NewMutex(),
 	}