
lib/config, lib/model: Auto adjust pullers based on desired amount of pending data (#4748)

This makes the number of concurrent pullers vary with the desired amount of outstanding request data, instead of being a fixed number of routines.
Jakob Borg, 7 years ago
parent commit 42cc64e2ed
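
The gist of the change, as a minimal standalone sketch (an illustration only, not the code in the diff below — the commit implements its own byteSemaphore rather than using golang.org/x/sync): instead of a fixed pool of N puller goroutines, a single dispatcher spawns one goroutine per block and gates admission on a byte-weighted semaphore sized to the desired amount of pending request data. The names pullAll and fetch are hypothetical.

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// pullAll launches one goroutine per block, but only admits a new
// request while the total size of in-flight requests fits the budget.
// Concurrency thus adapts to block sizes instead of being a fixed
// number of puller routines.
func pullAll(blockSizes []int64, maxPendingBytes int64, fetch func(size int64)) {
	budget := semaphore.NewWeighted(maxPendingBytes)
	var wg sync.WaitGroup
	for _, size := range blockSizes {
		// Blocks until `size` bytes of budget are free.
		_ = budget.Acquire(context.Background(), size)
		wg.Add(1)
		go func(size int64) {
			defer wg.Done()
			defer budget.Release(size)
			fetch(size)
		}(size)
	}
	wg.Wait()
}

func main() {
	// Four 128 KiB blocks against a 256 KiB budget: at most two in flight.
	blocks := []int64{128 << 10, 128 << 10, 128 << 10, 128 << 10}
	pullAll(blocks, 256<<10, func(size int64) {
		fmt.Println("fetching block of", size, "bytes")
	})
}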

+ 15 - 1
lib/config/config.go

@@ -32,7 +32,7 @@ import (
 
 const (
 	OldestHandledVersion = 10
-	CurrentVersion       = 26
+	CurrentVersion       = 27
 	MaxRescanIntervalS   = 365 * 24 * 60 * 60
 )
 
@@ -312,6 +312,9 @@ func (cfg *Configuration) clean() error {
 	if cfg.Version == 25 {
 		convertV25V26(cfg)
 	}
+	if cfg.Version == 26 {
+		convertV26V27(cfg)
+	}
 
 	// Build a list of available devices
 	existingDevices := make(map[protocol.DeviceID]bool)
@@ -371,6 +374,17 @@ func (cfg *Configuration) clean() error {
 	return nil
 }
 
+func convertV26V27(cfg *Configuration) {
+	for i := range cfg.Folders {
+		f := &cfg.Folders[i]
+		if f.DeprecatedPullers != 0 {
+			f.PullerMaxPendingKiB = 128 * f.DeprecatedPullers
+			f.DeprecatedPullers = 0
+		}
+	}
+	cfg.Version = 27
+}
+
 func convertV25V26(cfg *Configuration) {
 	// triggers database update
 	cfg.Version = 26
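
As a worked example of the migration: the factor 128 matches the protocol block size in KiB at the time (protocol.BlockSize / 1024, i.e. 128 KiB blocks), so a folder previously configured with <pullers>16</pullers> comes out with pullerMaxPendingKiB = 128 * 16 = 2048 — a 2 MiB budget, equivalent to the same 16 blocks in flight. The deprecated field is then zeroed so it no longer serializes.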

+ 0 - 1
lib/config/config_test.go

@@ -107,7 +107,6 @@ func TestDeviceConfig(t *testing.T) {
 				FSWatcherEnabled: false,
 				FSWatcherDelayS:  10,
 				Copiers:          0,
-				Pullers:          0,
 				Hashers:          0,
 				AutoNormalize:    true,
 				MinDiskFree:      Size{1, "%"},

+ 2 - 1
lib/config/folderconfiguration.go

@@ -40,7 +40,7 @@ type FolderConfiguration struct {
 	MinDiskFree           Size                        `xml:"minDiskFree" json:"minDiskFree"`
 	Versioning            VersioningConfiguration     `xml:"versioning" json:"versioning"`
 	Copiers               int                         `xml:"copiers" json:"copiers"` // This defines how many files are handled concurrently.
-	Pullers               int                         `xml:"pullers" json:"pullers"` // Defines how many blocks are fetched at the same time, possibly between separate copier routines.
+	PullerMaxPendingKiB   int                         `xml:"pullerMaxPendingKiB" json:"pullerMaxPendingKiB"`
 	Hashers               int                         `xml:"hashers" json:"hashers"` // Less than one sets the value to the number of cores. These are CPU bound due to hashing.
 	Order                 PullOrder                   `xml:"order" json:"order"`
 	IgnoreDelete          bool                        `xml:"ignoreDelete" json:"ignoreDelete"`
@@ -57,6 +57,7 @@ type FolderConfiguration struct {
 
 	DeprecatedReadOnly       bool    `xml:"ro,attr,omitempty" json:"-"`
 	DeprecatedMinDiskFreePct float64 `xml:"minDiskFreePct,omitempty" json:"-"`
+	DeprecatedPullers        int     `xml:"pullers,omitempty" json:"-"`
 }
 
 type FolderDeviceConfiguration struct {

+ 16 - 0
lib/config/testdata/v27.xml

@@ -0,0 +1,16 @@
+<configuration version="27">
+    <folder id="test" path="testdata" type="readonly" ignorePerms="false" rescanIntervalS="600" fsNotifications="false" notifyDelayS="10" autoNormalize="true">
+        <filesystemType>basic</filesystemType>
+        <device id="AIR6LPZ-7K4PTTV-UXQSMUU-CPQ5YWH-OEDFIIQ-JUG777G-2YQXXR5-YD6AWQR"></device>
+        <device id="P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2"></device>
+        <minDiskFree unit="%">1</minDiskFree>
+        <maxConflicts>-1</maxConflicts>
+        <fsync>true</fsync>
+    </folder>
+    <device id="AIR6LPZ-7K4PTTV-UXQSMUU-CPQ5YWH-OEDFIIQ-JUG777G-2YQXXR5-YD6AWQR" name="node one" compression="metadata">
+        <address>tcp://a</address>
+    </device>
+    <device id="P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2" name="node two" compression="metadata">
+        <address>tcp://b</address>
+    </device>
+</configuration>

+ 137 - 73
lib/model/rwfolder.go

@@ -15,6 +15,7 @@ import (
 	"runtime"
 	"sort"
 	"strings"
+	stdsync "sync"
 	"time"
 
 	"github.com/syncthing/syncthing/lib/config"
@@ -78,9 +79,10 @@ const (
 )
 
 const (
-	defaultCopiers      = 2
-	defaultPullers      = 64
-	defaultPullerPause  = 60 * time.Second
+	defaultCopiers          = 2
+	defaultPullerPause      = 60 * time.Second
+	defaultPullerPendingKiB = 8192 // must be larger than block size
+
 	maxPullerIterations = 3
 )
 
@@ -114,20 +116,23 @@ func newSendReceiveFolder(model *Model, cfg config.FolderConfiguration, ver vers
 		errorsMut: sync.NewMutex(),
 	}
 
-	f.configureCopiersAndPullers()
-
-	return f
-}
-
-func (f *sendReceiveFolder) configureCopiersAndPullers() {
 	if f.Copiers == 0 {
 		f.Copiers = defaultCopiers
 	}
-	if f.Pullers == 0 {
-		f.Pullers = defaultPullers
+
+	// If the configured max amount of pending data is zero, we use the
+	// default. If it's configured to something non-zero but less than the
+	// protocol block size we adjust it upwards accordingly.
+	if f.PullerMaxPendingKiB == 0 {
+		f.PullerMaxPendingKiB = defaultPullerPendingKiB
+	}
+	if blockSizeKiB := protocol.BlockSize / 1024; f.PullerMaxPendingKiB < blockSizeKiB {
+		f.PullerMaxPendingKiB = blockSizeKiB
 	}
 
 	f.pause = f.basePause()
+
+	return f
 }
 
 // Serve will run scans and pulls. It will return when Stop()ed or on a
@@ -317,7 +322,7 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
 	doneWg := sync.NewWaitGroup()
 	updateWg := sync.NewWaitGroup()
 
-	l.Debugln(f, "c", f.Copiers, "p", f.Pullers)
+	l.Debugln(f, "copiers:", f.Copiers, "pullerPendingKiB:", f.PullerMaxPendingKiB)
 
 	updateWg.Add(1)
 	go func() {
@@ -335,14 +340,12 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
 		}()
 	}
 
-	for i := 0; i < f.Pullers; i++ {
-		pullWg.Add(1)
-		go func() {
-			// pullerRoutine finishes when pullChan is closed
-			f.pullerRoutine(pullChan, finisherChan)
-			pullWg.Done()
-		}()
-	}
+	pullWg.Add(1)
+	go func() {
+		// pullerRoutine finishes when pullChan is closed
+		f.pullerRoutine(pullChan, finisherChan)
+		pullWg.Done()
+	}()
 
 	doneWg.Add(1)
 	// finisherRoutine finishes when finisherChan is closed
@@ -1340,76 +1343,99 @@ func verifyBuffer(buf []byte, block protocol.BlockInfo) ([]byte, error) {
 }
 
 func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
+	requestLimiter := newByteSemaphore(f.PullerMaxPendingKiB * 1024)
+	wg := sync.NewWaitGroup()
+
 	for state := range in {
 		if state.failed() != nil {
 			out <- state.sharedPullerState
 			continue
 		}
 
-		// Get an fd to the temporary file. Technically we don't need it until
-		// after fetching the block, but if we run into an error here there is
-		// no point in issuing the request to the network.
-		fd, err := state.tempFile()
-		if err != nil {
-			out <- state.sharedPullerState
-			continue
-		}
+		// The requestLimiter limits how many pending block requests we have
+		// ongoing at any given time, based on the size of the blocks
+		// themselves.
 
-		if !f.DisableSparseFiles && state.reused == 0 && state.block.IsEmpty() {
-			// There is no need to request a block of all zeroes. Pretend we
-			// requested it and handled it correctly.
-			state.pullDone(state.block)
-			out <- state.sharedPullerState
-			continue
-		}
+		state := state
+		bytes := int(state.block.Size)
 
-		var lastError error
-		candidates := f.model.Availability(f.folderID, state.file.Name, state.file.Version, state.block)
-		for {
-			// Select the least busy device to pull the block from. If we found no
-			// feasible device at all, fail the block (and in the long run, the
-			// file).
-			selected, found := activity.leastBusy(candidates)
-			if !found {
-				if lastError != nil {
-					state.fail("pull", lastError)
-				} else {
-					state.fail("pull", errNoDevice)
-				}
-				break
-			}
+		requestLimiter.take(bytes)
+		wg.Add(1)
 
-			candidates = removeAvailability(candidates, selected)
+		go func() {
+			defer wg.Done()
+			defer requestLimiter.give(bytes)
 
-			// Fetch the block, while marking the selected device as in use so that
-			// leastBusy can select another device when someone else asks.
-			activity.using(selected)
-			buf, lastError := f.model.requestGlobal(selected.ID, f.folderID, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, selected.FromTemporary)
-			activity.done(selected)
-			if lastError != nil {
-				l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError)
-				continue
-			}
+			f.pullBlock(state, out)
+		}()
+	}
+	wg.Wait()
+}
 
-			// Verify that the received block matches the desired hash, if not
-			// try pulling it from another device.
-			_, lastError = verifyBuffer(buf, state.block)
-			if lastError != nil {
-				l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "hash mismatch")
-				continue
-			}
+func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPullerState) {
+	// Get an fd to the temporary file. Technically we don't need it until
+	// after fetching the block, but if we run into an error here there is
+	// no point in issuing the request to the network.
+	fd, err := state.tempFile()
+	if err != nil {
+		out <- state.sharedPullerState
+		return
+	}
 
-			// Save the block data we got from the cluster
-			_, err = fd.WriteAt(buf, state.block.Offset)
-			if err != nil {
-				state.fail("save", err)
+	if !f.DisableSparseFiles && state.reused == 0 && state.block.IsEmpty() {
+		// There is no need to request a block of all zeroes. Pretend we
+		// requested it and handled it correctly.
+		state.pullDone(state.block)
+		out <- state.sharedPullerState
+		return
+	}
+
+	var lastError error
+	candidates := f.model.Availability(f.folderID, state.file.Name, state.file.Version, state.block)
+	for {
+		// Select the least busy device to pull the block from. If we found no
+		// feasible device at all, fail the block (and in the long run, the
+		// file).
+		selected, found := activity.leastBusy(candidates)
+		if !found {
+			if lastError != nil {
+				state.fail("pull", lastError)
 			} else {
-				state.pullDone(state.block)
+				state.fail("pull", errNoDevice)
 			}
 			break
 		}
-		out <- state.sharedPullerState
+
+		candidates = removeAvailability(candidates, selected)
+
+		// Fetch the block, while marking the selected device as in use so that
+		// leastBusy can select another device when someone else asks.
+		activity.using(selected)
+		buf, lastError := f.model.requestGlobal(selected.ID, f.folderID, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, selected.FromTemporary)
+		activity.done(selected)
+		if lastError != nil {
+			l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError)
+			continue
+		}
+
+		// Verify that the received block matches the desired hash, if not
+		// try pulling it from another device.
+		_, lastError = verifyBuffer(buf, state.block)
+		if lastError != nil {
+			l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "hash mismatch")
+			continue
+		}
+
+		// Save the block data we got from the cluster
+		_, err = fd.WriteAt(buf, state.block.Offset)
+		if err != nil {
+			state.fail("save", err)
+		} else {
+			state.pullDone(state.block)
+		}
+		break
 	}
+	out <- state.sharedPullerState
 }
 
 func (f *sendReceiveFolder) performFinish(ignores *ignore.Matcher, state *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
@@ -1899,3 +1925,41 @@ func componentCount(name string) int {
 	}
 	return count
 }
+
+type byteSemaphore struct {
+	max       int
+	available int
+	mut       stdsync.Mutex
+	cond      *stdsync.Cond
+}
+
+func newByteSemaphore(max int) *byteSemaphore {
+	s := byteSemaphore{
+		max:       max,
+		available: max,
+	}
+	s.cond = stdsync.NewCond(&s.mut)
+	return &s
+}
+
+func (s *byteSemaphore) take(bytes int) {
+	if bytes > s.max {
+		panic("bug: more than max bytes will never be available")
+	}
+	s.mut.Lock()
+	for bytes > s.available {
+		s.cond.Wait()
+	}
+	s.available -= bytes
+	s.mut.Unlock()
+}
+
+func (s *byteSemaphore) give(bytes int) {
+	s.mut.Lock()
+	if s.available+bytes > s.max {
+		panic("bug: can never give more than max")
+	}
+	s.available += bytes
+	s.cond.Broadcast()
+	s.mut.Unlock()
+}
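
A hypothetical sanity test for the new semaphore (not part of this commit; it assumes placement next to rwfolder.go in package model): with a 256 KiB budget, two 128 KiB takes succeed immediately and a third blocks until budget is given back.

package model

import (
	"testing"
	"time"
)

func TestByteSemaphoreBudget(t *testing.T) {
	const kib = 1024
	sem := newByteSemaphore(256 * kib)

	sem.take(128 * kib)
	sem.take(128 * kib) // budget now exhausted

	done := make(chan struct{})
	go func() {
		sem.take(128 * kib) // must wait until budget is returned
		close(done)
	}()

	time.Sleep(10 * time.Millisecond) // let the goroutine reach the blocking take
	select {
	case <-done:
		t.Fatal("take should have blocked on an exhausted budget")
	default:
	}

	sem.give(128 * kib) // return 128 KiB; the blocked take can now proceed
	<-done
}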

+ 4 - 0
lib/model/rwfolder_test.go

@@ -17,6 +17,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/syncthing/syncthing/lib/config"
 	"github.com/syncthing/syncthing/lib/db"
 	"github.com/syncthing/syncthing/lib/fs"
 	"github.com/syncthing/syncthing/lib/ignore"
@@ -107,6 +108,9 @@ func setUpSendReceiveFolder(model *Model) *sendReceiveFolder {
 			model:               model,
 			initialScanFinished: make(chan struct{}),
 			ctx:                 context.TODO(),
+			FolderConfiguration: config.FolderConfiguration{
+				PullerMaxPendingKiB: defaultPullerPendingKiB,
+			},
 		},
 
 		fs:        fs.NewMtimeFS(fs.NewFilesystem(fs.FilesystemTypeBasic, "testdata"), db.NewNamespacedKV(model.db, "mtime")),

+ 0 - 2
test/h1/config.xml

@@ -8,7 +8,6 @@
         <minDiskFree unit="%">1</minDiskFree>
         <versioning></versioning>
         <copiers>1</copiers>
-        <pullers>16</pullers>
         <hashers>0</hashers>
         <order>random</order>
         <ignoreDelete>false</ignoreDelete>
@@ -28,7 +27,6 @@
         <minDiskFree unit="%">1</minDiskFree>
         <versioning></versioning>
         <copiers>1</copiers>
-        <pullers>16</pullers>
         <hashers>0</hashers>
         <order>random</order>
         <ignoreDelete>false</ignoreDelete>

+ 0 - 3
test/h2/config.xml

@@ -7,7 +7,6 @@
         <minDiskFree unit="%">1</minDiskFree>
         <versioning></versioning>
         <copiers>1</copiers>
-        <pullers>16</pullers>
         <hashers>0</hashers>
         <order>random</order>
         <ignoreDelete>false</ignoreDelete>
@@ -27,7 +26,6 @@
         <minDiskFree unit="%">1</minDiskFree>
         <versioning></versioning>
         <copiers>1</copiers>
-        <pullers>16</pullers>
         <hashers>0</hashers>
         <order>random</order>
         <ignoreDelete>false</ignoreDelete>
@@ -47,7 +45,6 @@
         <minDiskFree unit="%">1</minDiskFree>
         <versioning></versioning>
         <copiers>1</copiers>
-        <pullers>16</pullers>
         <hashers>0</hashers>
         <order>random</order>
         <ignoreDelete>false</ignoreDelete>

+ 0 - 2
test/h3/config.xml

@@ -9,7 +9,6 @@
             <param key="keep" val="5"></param>
         </versioning>
         <copiers>1</copiers>
-        <pullers>16</pullers>
         <hashers>0</hashers>
         <order>random</order>
         <ignoreDelete>false</ignoreDelete>
@@ -29,7 +28,6 @@
         <minDiskFree unit="%">1</minDiskFree>
         <versioning></versioning>
         <copiers>1</copiers>
-        <pullers>16</pullers>
         <hashers>0</hashers>
         <order>random</order>
         <ignoreDelete>false</ignoreDelete>