|
|
@@ -30,12 +30,6 @@ import (
|
|
|
|
|
|
// TODO: Stop on errors
|
|
|
|
|
|
-const (
|
|
|
- pauseIntv = 60 * time.Second
|
|
|
- nextPullIntv = 10 * time.Second
|
|
|
- shortPullIntv = time.Second
|
|
|
-)
|
|
|
-
|
|
|
// A pullBlockState is passed to the puller routine for each block that needs
|
|
|
// to be fetched.
|
|
|
type pullBlockState struct {
|
|
|
@@ -67,8 +61,10 @@ const (
|
|
|
)
|
|
|
|
|
|
const (
|
|
|
- defaultCopiers = 1
|
|
|
- defaultPullers = 16
|
|
|
+ defaultCopiers = 1
|
|
|
+ defaultPullers = 16
|
|
|
+ defaultPullerSleep = 10 * time.Second
|
|
|
+ defaultPullerPause = 60 * time.Second
|
|
|
)
|
|
|
|
|
|
type dbUpdateJob struct {
|
|
|
@@ -92,6 +88,8 @@ type rwFolder struct {
|
|
|
pullers int
|
|
|
shortID uint64
|
|
|
order config.PullOrder
|
|
|
+ sleep time.Duration
|
|
|
+ pause time.Duration
|
|
|
|
|
|
stop chan struct{}
|
|
|
queue *jobQueue
|
|
|
@@ -128,7 +126,7 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo
|
|
|
|
|
|
stop: make(chan struct{}),
|
|
|
queue: newJobQueue(),
|
|
|
- pullTimer: time.NewTimer(shortPullIntv),
|
|
|
+ pullTimer: time.NewTimer(time.Second),
|
|
|
scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
|
|
|
delayScan: make(chan time.Duration),
|
|
|
scanNow: make(chan rescanRequest),
|
|
|
@@ -144,6 +142,18 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo
|
|
|
p.pullers = defaultPullers
|
|
|
}
|
|
|
|
|
|
+ if cfg.PullerPauseS == 0 {
|
|
|
+ p.pause = defaultPullerPause
|
|
|
+ } else {
|
|
|
+ p.pause = time.Duration(cfg.PullerPauseS) * time.Second
|
|
|
+ }
|
|
|
+
|
|
|
+ if cfg.PullerSleepS == 0 {
|
|
|
+ p.sleep = defaultPullerSleep
|
|
|
+ } else {
|
|
|
+ p.sleep = time.Duration(cfg.PullerSleepS) * time.Second
|
|
|
+ }
|
|
|
+
|
|
|
return p
|
|
|
}
|
|
|
|
|
|
@@ -194,19 +204,13 @@ func (p *rwFolder) Serve() {
|
|
|
|
|
|
case <-p.remoteIndex:
|
|
|
prevVer = 0
|
|
|
- p.pullTimer.Reset(shortPullIntv)
|
|
|
+ p.pullTimer.Reset(0)
|
|
|
l.Debugln(p, "remote index updated, rescheduling pull")
|
|
|
|
|
|
case <-p.pullTimer.C:
|
|
|
if !initialScanCompleted {
|
|
|
l.Debugln(p, "skip (initial)")
|
|
|
- p.pullTimer.Reset(nextPullIntv)
|
|
|
- continue
|
|
|
- }
|
|
|
-
|
|
|
- if err := p.model.CheckFolderHealth(p.folder); err != nil {
|
|
|
- l.Infoln("Skipping folder", p.folder, "pull due to folder error:", err)
|
|
|
- p.pullTimer.Reset(nextPullIntv)
|
|
|
+ p.pullTimer.Reset(p.sleep)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -226,7 +230,13 @@ func (p *rwFolder) Serve() {
|
|
|
curVer, ok := p.model.RemoteLocalVersion(p.folder)
|
|
|
if !ok || curVer == prevVer {
|
|
|
l.Debugln(p, "skip (curVer == prevVer)", prevVer, ok)
|
|
|
- p.pullTimer.Reset(nextPullIntv)
|
|
|
+ p.pullTimer.Reset(p.sleep)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := p.model.CheckFolderHealth(p.folder); err != nil {
|
|
|
+ l.Infoln("Skipping folder", p.folder, "pull due to folder error:", err)
|
|
|
+ p.pullTimer.Reset(p.sleep)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -260,8 +270,8 @@ func (p *rwFolder) Serve() {
|
|
|
curVer = lv
|
|
|
}
|
|
|
prevVer = curVer
|
|
|
- l.Debugln(p, "next pull in", nextPullIntv)
|
|
|
- p.pullTimer.Reset(nextPullIntv)
|
|
|
+ l.Debugln(p, "next pull in", p.sleep)
|
|
|
+ p.pullTimer.Reset(p.sleep)
|
|
|
break
|
|
|
}
|
|
|
|
|
|
@@ -270,8 +280,8 @@ func (p *rwFolder) Serve() {
|
|
|
// we're not making it. Probably there are write
|
|
|
// errors preventing us. Flag this with a warning and
|
|
|
// wait a bit longer before retrying.
|
|
|
- l.Infof("Folder %q isn't making progress. Pausing puller for %v.", p.folder, pauseIntv)
|
|
|
- l.Debugln(p, "next pull in", pauseIntv)
|
|
|
+ l.Infof("Folder %q isn't making progress. Pausing puller for %v.", p.folder, p.pause)
|
|
|
+ l.Debugln(p, "next pull in", p.pause)
|
|
|
|
|
|
if folderErrors := p.currentErrors(); len(folderErrors) > 0 {
|
|
|
events.Default.Log(events.FolderErrors, map[string]interface{}{
|
|
|
@@ -280,7 +290,7 @@ func (p *rwFolder) Serve() {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
- p.pullTimer.Reset(pauseIntv)
|
|
|
+ p.pullTimer.Reset(p.pause)
|
|
|
break
|
|
|
}
|
|
|
}
|