Browse Source

lib/model: Add global request limiter (fixes #6302) (#6303)

This adds a new config with the simple and concise name
maxConcurrentIncomingRequestKiB. This limits how many bytes we have "in
the air" in the form of response data being read and processed.

After some testing I think that not having this limiter is seldom a
great idea and thus I propose a default value of 256 MiB for this new
setting.

I also refactored the folder IO limiter to be a model/folder attribute
instead of a package global.
Jakob Borg 5 years ago
parent
commit
55937b61ca

+ 25 - 0
lib/config/optionsconfiguration.go

@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"runtime"
 
+	"github.com/syncthing/syncthing/lib/protocol"
 	"github.com/syncthing/syncthing/lib/rand"
 	"github.com/syncthing/syncthing/lib/util"
 )
@@ -60,6 +61,7 @@ type OptionsConfiguration struct {
 	StunKeepaliveMinS       int      `xml:"stunKeepaliveMinS" json:"stunKeepaliveMinS" default:"20"`      // 0 for off
 	RawStunServers          []string `xml:"stunServer" json:"stunServers" default:"default"`
 	DatabaseTuning          Tuning   `xml:"databaseTuning" json:"databaseTuning" restart:"true"`
+	RawMaxCIRequestKiB      int      `xml:"maxConcurrentIncomingRequestKiB" json:"maxConcurrentIncomingRequestKiB"`
 
 	DeprecatedUPnPEnabled        bool     `xml:"upnpEnabled,omitempty" json:"-"`
 	DeprecatedUPnPLeaseM         int      `xml:"upnpLeaseMinutes,omitempty" json:"-"`
@@ -175,3 +177,26 @@ func (opts OptionsConfiguration) MaxFolderConcurrency() int {
 	// of writing is two, 95-percentile at 12 folders.)
 	return 4 // https://xkcd.com/221/
 }
+
+func (opts OptionsConfiguration) MaxConcurrentIncomingRequestKiB() int {
+	// Negative is disabled, which in limiter land is spelled zero
+	if opts.RawMaxCIRequestKiB < 0 {
+		return 0
+	}
+
+	if opts.RawMaxFolderConcurrency == 0 {
+		// The default is 256 MiB
+		return 256 * 1024 // KiB
+	}
+
+	// We can't really do less than a couple of concurrent blocks or we'll
+	// pretty much stall completely. Check that an explicit value is large
+	// enough.
+	const minAllowed = 2 * protocol.MaxBlockSize / 1024
+	if opts.RawMaxCIRequestKiB < minAllowed {
+		return minAllowed
+	}
+
+	// Roll with it.
+	return opts.RawMaxCIRequestKiB
+}

+ 6 - 0
lib/model/bytesemaphore.go

@@ -18,6 +18,9 @@ type byteSemaphore struct {
 }
 
 func newByteSemaphore(max int) *byteSemaphore {
+	if max < 0 {
+		max = 0
+	}
 	s := byteSemaphore{
 		max:       max,
 		available: max,
@@ -56,6 +59,9 @@ func (s *byteSemaphore) give(bytes int) {
 }
 
 func (s *byteSemaphore) setCapacity(cap int) {
+	if cap < 0 {
+		cap = 0
+	}
 	s.mut.Lock()
 	diff := cap - s.max
 	s.max = cap

+ 7 - 9
lib/model/folder.go

@@ -33,15 +33,12 @@ import (
 	"github.com/thejerf/suture"
 )
 
-// folderIOLimiter limits the number of concurrent I/O heavy operations,
-// such as scans and pulls. A limit of zero means no limit.
-var folderIOLimiter = newByteSemaphore(0)
-
 type folder struct {
 	suture.Service
 	stateTracker
 	config.FolderConfiguration
 	*stats.FolderStatisticsReference
+	ioLimiter *byteSemaphore
 
 	localFlags uint32
 
@@ -79,11 +76,12 @@ type puller interface {
 	pull() bool // true when successfull and should not be retried
 }
 
-func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger) folder {
+func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger, ioLimiter *byteSemaphore) folder {
 	return folder{
 		stateTracker:              newStateTracker(cfg.ID, evLogger),
 		FolderConfiguration:       cfg,
 		FolderStatisticsReference: stats.NewFolderStatisticsReference(model.db, cfg.ID),
+		ioLimiter:                 ioLimiter,
 
 		model:   model,
 		shortID: model.shortID,
@@ -303,8 +301,8 @@ func (f *folder) pull() bool {
 	f.setState(FolderSyncWaiting)
 	defer f.setState(FolderIdle)
 
-	folderIOLimiter.take(1)
-	defer folderIOLimiter.give(1)
+	f.ioLimiter.take(1)
+	defer f.ioLimiter.give(1)
 
 	return f.puller.pull()
 }
@@ -342,8 +340,8 @@ func (f *folder) scanSubdirs(subDirs []string) error {
 	f.setError(nil)
 	f.setState(FolderScanWaiting)
 
-	folderIOLimiter.take(1)
-	defer folderIOLimiter.give(1)
+	f.ioLimiter.take(1)
+	defer f.ioLimiter.give(1)
 
 	for i := range subDirs {
 		sub := osutil.NativeFilename(subDirs[i])

+ 2 - 2
lib/model/folder_recvonly.go

@@ -57,8 +57,8 @@ type receiveOnlyFolder struct {
 	*sendReceiveFolder
 }
 
-func newReceiveOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger) service {
-	sr := newSendReceiveFolder(model, fset, ignores, cfg, ver, fs, evLogger).(*sendReceiveFolder)
+func newReceiveOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service {
+	sr := newSendReceiveFolder(model, fset, ignores, cfg, ver, fs, evLogger, ioLimiter).(*sendReceiveFolder)
 	sr.localFlags = protocol.FlagLocalReceiveOnly // gets propagated to the scanner, and set on locally changed files
 	return &receiveOnlyFolder{sr}
 }

+ 2 - 2
lib/model/folder_sendonly.go

@@ -25,9 +25,9 @@ type sendOnlyFolder struct {
 	folder
 }
 
-func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, _ versioner.Versioner, _ fs.Filesystem, evLogger events.Logger) service {
+func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, _ versioner.Versioner, _ fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service {
 	f := &sendOnlyFolder{
-		folder: newFolder(model, fset, ignores, cfg, evLogger),
+		folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter),
 	}
 	f.folder.puller = f
 	f.folder.Service = util.AsService(f.serve, f.String())

+ 2 - 2
lib/model/folder_sendrecv.go

@@ -109,9 +109,9 @@ type sendReceiveFolder struct {
 	pullErrorsMut sync.Mutex
 }
 
-func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger) service {
+func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service {
 	f := &sendReceiveFolder{
-		folder:        newFolder(model, fset, ignores, cfg, evLogger),
+		folder:        newFolder(model, fset, ignores, cfg, evLogger, ioLimiter),
 		fs:            fs,
 		versioner:     ver,
 		queue:         newJobQueue(),

+ 67 - 42
lib/model/model.go

@@ -121,6 +121,12 @@ type model struct {
 	protectedFiles    []string
 	evLogger          events.Logger
 
+	// globalRequestLimiter limits the amount of data in concurrent incoming requests
+	globalRequestLimiter *byteSemaphore
+	// folderIOLimiter limits the number of concurrent I/O heavy operations,
+	// such as scans and pulls. A limit of zero means no limit.
+	folderIOLimiter *byteSemaphore
+
 	clientName    string
 	clientVersion string
 
@@ -145,7 +151,7 @@ type model struct {
 	foldersRunning int32 // for testing only
 }
 
-type folderFactory func(*model, *db.FileSet, *ignore.Matcher, config.FolderConfiguration, versioner.Versioner, fs.Filesystem, events.Logger) service
+type folderFactory func(*model, *db.FileSet, *ignore.Matcher, config.FolderConfiguration, versioner.Versioner, fs.Filesystem, events.Logger, *byteSemaphore) service
 
 var (
 	folderFactories = make(map[config.FolderType]folderFactory)
@@ -177,38 +183,39 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
 			},
 			PassThroughPanics: true,
 		}),
-		cfg:                 cfg,
-		db:                  ldb,
-		finder:              db.NewBlockFinder(ldb),
-		progressEmitter:     NewProgressEmitter(cfg, evLogger),
-		id:                  id,
-		shortID:             id.Short(),
-		cacheIgnoredFiles:   cfg.Options().CacheIgnoredFiles,
-		protectedFiles:      protectedFiles,
-		evLogger:            evLogger,
-		clientName:          clientName,
-		clientVersion:       clientVersion,
-		folderCfgs:          make(map[string]config.FolderConfiguration),
-		folderFiles:         make(map[string]*db.FileSet),
-		deviceStatRefs:      make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
-		folderIgnores:       make(map[string]*ignore.Matcher),
-		folderRunners:       make(map[string]service),
-		folderRunnerTokens:  make(map[string][]suture.ServiceToken),
-		folderVersioners:    make(map[string]versioner.Versioner),
-		conn:                make(map[protocol.DeviceID]connections.Connection),
-		connRequestLimiters: make(map[protocol.DeviceID]*byteSemaphore),
-		closed:              make(map[protocol.DeviceID]chan struct{}),
-		helloMessages:       make(map[protocol.DeviceID]protocol.HelloResult),
-		deviceDownloads:     make(map[protocol.DeviceID]*deviceDownloadState),
-		remotePausedFolders: make(map[protocol.DeviceID][]string),
-		fmut:                sync.NewRWMutex(),
-		pmut:                sync.NewRWMutex(),
+		cfg:                  cfg,
+		db:                   ldb,
+		finder:               db.NewBlockFinder(ldb),
+		progressEmitter:      NewProgressEmitter(cfg, evLogger),
+		id:                   id,
+		shortID:              id.Short(),
+		cacheIgnoredFiles:    cfg.Options().CacheIgnoredFiles,
+		protectedFiles:       protectedFiles,
+		evLogger:             evLogger,
+		globalRequestLimiter: newByteSemaphore(1024 * cfg.Options().MaxConcurrentIncomingRequestKiB()),
+		folderIOLimiter:      newByteSemaphore(cfg.Options().MaxFolderConcurrency()),
+		clientName:           clientName,
+		clientVersion:        clientVersion,
+		folderCfgs:           make(map[string]config.FolderConfiguration),
+		folderFiles:          make(map[string]*db.FileSet),
+		deviceStatRefs:       make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
+		folderIgnores:        make(map[string]*ignore.Matcher),
+		folderRunners:        make(map[string]service),
+		folderRunnerTokens:   make(map[string][]suture.ServiceToken),
+		folderVersioners:     make(map[string]versioner.Versioner),
+		conn:                 make(map[protocol.DeviceID]connections.Connection),
+		connRequestLimiters:  make(map[protocol.DeviceID]*byteSemaphore),
+		closed:               make(map[protocol.DeviceID]chan struct{}),
+		helloMessages:        make(map[protocol.DeviceID]protocol.HelloResult),
+		deviceDownloads:      make(map[protocol.DeviceID]*deviceDownloadState),
+		remotePausedFolders:  make(map[protocol.DeviceID][]string),
+		fmut:                 sync.NewRWMutex(),
+		pmut:                 sync.NewRWMutex(),
 	}
 	for devID := range cfg.Devices() {
 		m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID.String())
 	}
 	m.Add(m.progressEmitter)
-	folderIOLimiter.setCapacity(cfg.Options().MaxFolderConcurrency())
 
 	return m
 }
@@ -340,7 +347,7 @@ func (m *model) startFolderLocked(cfg config.FolderConfiguration) {
 
 	ignores := m.folderIgnores[folder]
 
-	p := folderFactory(m, fset, ignores, cfg, ver, ffs, m.evLogger)
+	p := folderFactory(m, fset, ignores, cfg, ver, ffs, m.evLogger, m.folderIOLimiter)
 
 	m.folderRunners[folder] = p
 
@@ -1500,12 +1507,10 @@ func (m *model) Request(deviceID protocol.DeviceID, folder, name string, size in
 	limiter := m.connRequestLimiters[deviceID]
 	m.pmut.RUnlock()
 
-	if limiter != nil {
-		limiter.take(int(size))
-	}
+	// The requestResponse releases the bytes to the buffer pool and the
+	// limiters when its Close method is called.
+	res := newLimitedRequestResponse(int(size), limiter, m.globalRequestLimiter)
 
-	// The requestResponse releases the bytes to the limiter when its Close method is called.
-	res := newRequestResponse(int(size))
 	defer func() {
 		// Close it ourselves if it isn't returned due to an error
 		if err != nil {
@@ -1513,13 +1518,6 @@ func (m *model) Request(deviceID protocol.DeviceID, folder, name string, size in
 		}
 	}()
 
-	if limiter != nil {
-		go func() {
-			res.Wait()
-			limiter.give(int(size))
-		}()
-	}
-
 	// Only check temp files if the flag is set, and if we are set to advertise
 	// the temp indexes.
 	if fromTemporary && !folderCfg.DisableTempIndexes {
@@ -1563,6 +1561,32 @@ func (m *model) Request(deviceID protocol.DeviceID, folder, name string, size in
 	return res, nil
 }
 
+// newLimitedRequestResponse takes size bytes from the limiters in order,
+// skipping nil limiters, then returns a requestResponse of the given size.
+// When the requestResponse is closed the limiters are given back the bytes,
+// in reverse order.
+func newLimitedRequestResponse(size int, limiters ...*byteSemaphore) *requestResponse {
+	for _, limiter := range limiters {
+		if limiter != nil {
+			limiter.take(size)
+		}
+	}
+
+	res := newRequestResponse(size)
+
+	go func() {
+		res.Wait()
+		for i := range limiters {
+			limiter := limiters[len(limiters)-1-i]
+			if limiter != nil {
+				limiter.give(size)
+			}
+		}
+	}()
+
+	return res
+}
+
 func (m *model) recheckFile(deviceID protocol.DeviceID, folderFs fs.Filesystem, folder, name string, size int32, offset int64, hash []byte) {
 	cf, ok := m.CurrentFolderFile(folder, name)
 	if !ok {
@@ -2483,7 +2507,8 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
 	}
 	m.fmut.Unlock()
 
-	folderIOLimiter.setCapacity(to.Options.MaxFolderConcurrency())
+	m.globalRequestLimiter.setCapacity(1024 * to.Options.MaxConcurrentIncomingRequestKiB())
+	m.folderIOLimiter.setCapacity(to.Options.MaxFolderConcurrency())
 
 	// Some options don't require restart as those components handle it fine
 	// by themselves. Compare the options structs containing only the

+ 28 - 0
lib/model/model_test.go

@@ -3451,3 +3451,31 @@ func TestDeviceWasSeen(t *testing.T) {
 		t.Error("device should have been seen now")
 	}
 }
+
+func TestNewLimitedRequestResponse(t *testing.T) {
+	l0 := newByteSemaphore(0)
+	l1 := newByteSemaphore(1024)
+	l2 := (*byteSemaphore)(nil)
+
+	// Should take 500 bytes from any non-unlimited non-nil limiters.
+	res := newLimitedRequestResponse(500, l0, l1, l2)
+
+	if l1.available != 1024-500 {
+		t.Error("should have taken bytes from limited limiter")
+	}
+
+	// Closing the result should return the bytes.
+	res.Close()
+
+	// Try to take 1024 bytes to make sure the bytes were returned.
+	done := make(chan struct{})
+	go func() {
+		l1.take(1024)
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		t.Error("Bytes weren't returned in a timely fashion")
+	}
+}