|
|
@@ -63,7 +63,8 @@ var errNoNode = errors.New("no available source node")
|
|
|
type puller struct {
|
|
|
cfg *config.Configuration
|
|
|
repoCfg config.RepositoryConfiguration
|
|
|
- bq *blockQueue
|
|
|
+ bq blockQueue
|
|
|
+ slots int
|
|
|
model *Model
|
|
|
oustandingPerNode activityMap
|
|
|
openFiles map[string]openFile
|
|
|
@@ -75,9 +76,9 @@ type puller struct {
|
|
|
|
|
|
func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, cfg *config.Configuration) *puller {
|
|
|
p := &puller{
|
|
|
- repoCfg: repoCfg,
|
|
|
cfg: cfg,
|
|
|
- bq: newBlockQueue(),
|
|
|
+ repoCfg: repoCfg,
|
|
|
+ slots: slots,
|
|
|
model: model,
|
|
|
oustandingPerNode: make(activityMap),
|
|
|
openFiles: make(map[string]openFile),
|
|
|
@@ -96,9 +97,6 @@ func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int,
|
|
|
|
|
|
if slots > 0 {
|
|
|
// Read/write
|
|
|
- for i := 0; i < slots; i++ {
|
|
|
- p.requestSlots <- true
|
|
|
- }
|
|
|
if debug {
|
|
|
l.Debugf("starting puller; repo %q dir %q slots %d", repoCfg.ID, repoCfg.Directory, slots)
|
|
|
}
|
|
|
@@ -114,57 +112,70 @@ func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int,
|
|
|
}
|
|
|
|
|
|
func (p *puller) run() {
|
|
|
- go func() {
|
|
|
- // fill blocks queue when there are free slots
|
|
|
- for {
|
|
|
- <-p.requestSlots
|
|
|
- b := p.bq.get()
|
|
|
- if debug {
|
|
|
- l.Debugf("filler: queueing %q / %q offset %d copy %d", p.repoCfg.ID, b.file.Name, b.block.Offset, len(b.copy))
|
|
|
- }
|
|
|
- p.blocks <- b
|
|
|
- }
|
|
|
- }()
|
|
|
-
|
|
|
- timeout := time.Tick(5 * time.Second)
|
|
|
changed := true
|
|
|
scanintv := time.Duration(p.cfg.Options.RescanIntervalS) * time.Second
|
|
|
lastscan := time.Now()
|
|
|
var prevVer uint64
|
|
|
+ var queued int
|
|
|
+
|
|
|
+ // Load up the request slots
|
|
|
+ for i := 0; i < cap(p.requestSlots); i++ {
|
|
|
+ p.requestSlots <- true
|
|
|
+ }
|
|
|
|
|
|
for {
|
|
|
// Run the pulling loop as long as there are blocks to fetch
|
|
|
- pull:
|
|
|
- for {
|
|
|
- select {
|
|
|
- case res := <-p.requestResults:
|
|
|
- p.model.setState(p.repoCfg.ID, RepoSyncing)
|
|
|
- changed = true
|
|
|
- p.requestSlots <- true
|
|
|
- p.handleRequestResult(res)
|
|
|
-
|
|
|
- case b := <-p.blocks:
|
|
|
- p.model.setState(p.repoCfg.ID, RepoSyncing)
|
|
|
- changed = true
|
|
|
- if p.handleBlock(b) {
|
|
|
- // Block was fully handled, free up the slot
|
|
|
+
|
|
|
+ prevVer, queued = p.queueNeededBlocks(prevVer)
|
|
|
+ if queued > 0 {
|
|
|
+
|
|
|
+ pull:
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case res := <-p.requestResults:
|
|
|
+ p.model.setState(p.repoCfg.ID, RepoSyncing)
|
|
|
+ changed = true
|
|
|
p.requestSlots <- true
|
|
|
- }
|
|
|
+ p.handleRequestResult(res)
|
|
|
|
|
|
- case <-timeout:
|
|
|
- if len(p.openFiles) == 0 && p.bq.empty() {
|
|
|
- // Nothing more to do for the moment
|
|
|
- break pull
|
|
|
- }
|
|
|
- if debug {
|
|
|
- l.Debugf("%q: idle but have %d open files", p.repoCfg.ID, len(p.openFiles))
|
|
|
- i := 5
|
|
|
- for _, f := range p.openFiles {
|
|
|
- l.Debugf(" %v", f)
|
|
|
- i--
|
|
|
- if i == 0 {
|
|
|
- break
|
|
|
+ case <-p.requestSlots:
|
|
|
+ b, ok := p.bq.get()
|
|
|
+
|
|
|
+ if !ok {
|
|
|
+ if debug {
|
|
|
+ l.Debugf("%q: pulling loop needs more blocks", p.repoCfg.ID)
|
|
|
}
|
|
|
+ prevVer, _ = p.queueNeededBlocks(prevVer)
|
|
|
+ b, ok = p.bq.get()
|
|
|
+ }
|
|
|
+
|
|
|
+ if !ok && len(p.openFiles) == 0 {
|
|
|
+ // Nothing queued, nothing outstanding
|
|
|
+ if debug {
|
|
|
+ l.Debugf("%q: pulling loop done", p.repoCfg.ID)
|
|
|
+ }
|
|
|
+ break pull
|
|
|
+ }
|
|
|
+
|
|
|
+ if !ok {
|
|
|
+ // Nothing queued, but there are still open files.
|
|
|
+ // Give the situation a moment to change.
|
|
|
+ if debug {
|
|
|
+ l.Debugf("%q: pulling loop paused", p.repoCfg.ID)
|
|
|
+ }
|
|
|
+ p.requestSlots <- true
|
|
|
+ time.Sleep(100 * time.Millisecond)
|
|
|
+ continue pull
|
|
|
+ }
|
|
|
+
|
|
|
+ if debug {
|
|
|
+ l.Debugf("queueing %q / %q offset %d copy %d", p.repoCfg.ID, b.file.Name, b.block.Offset, len(b.copy))
|
|
|
+ }
|
|
|
+ p.model.setState(p.repoCfg.ID, RepoSyncing)
|
|
|
+ changed = true
|
|
|
+ if p.handleBlock(b) {
|
|
|
+ // Block was fully handled, free up the slot
|
|
|
+ p.requestSlots <- true
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -192,19 +203,7 @@ func (p *puller) run() {
|
|
|
lastscan = time.Now()
|
|
|
}
|
|
|
|
|
|
- if v := p.model.LocalVersion(p.repoCfg.ID); v != prevVer {
|
|
|
- if debug {
|
|
|
- l.Debugf("%q: checking for more needed blocks", p.repoCfg.ID)
|
|
|
- }
|
|
|
- // Queue more blocks to fetch, if any
|
|
|
- if p.queueNeededBlocks() == 0 {
|
|
|
- if debug {
|
|
|
- l.Debugf("%q: no more needed blocks", p.repoCfg.ID)
|
|
|
- }
|
|
|
- // We've fetched all blocks we need
|
|
|
- prevVer = v
|
|
|
- }
|
|
|
- }
|
|
|
+ time.Sleep(5 * time.Second)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -620,9 +619,21 @@ func (p *puller) handleEmptyBlock(b bqBlock) {
|
|
|
delete(p.openFiles, f.Name)
|
|
|
}
|
|
|
|
|
|
-func (p *puller) queueNeededBlocks() int {
|
|
|
+func (p *puller) queueNeededBlocks(prevVer uint64) (uint64, int) {
|
|
|
+ curVer := p.model.LocalVersion(p.repoCfg.ID)
|
|
|
+ if curVer == prevVer {
|
|
|
+ return curVer, 0
|
|
|
+ }
|
|
|
+
|
|
|
+ if debug {
|
|
|
+ l.Debugf("%q: checking for more needed blocks", p.repoCfg.ID)
|
|
|
+ }
|
|
|
+
|
|
|
queued := 0
|
|
|
for _, f := range p.model.NeedFilesRepo(p.repoCfg.ID) {
|
|
|
+ if _, ok := p.openFiles[f.Name]; ok {
|
|
|
+ continue
|
|
|
+ }
|
|
|
lf := p.model.CurrentRepoFile(p.repoCfg.ID, f.Name)
|
|
|
have, need := scanner.BlockDiff(lf.Blocks, f.Blocks)
|
|
|
if debug {
|
|
|
@@ -638,7 +649,12 @@ func (p *puller) queueNeededBlocks() int {
|
|
|
if debug && queued > 0 {
|
|
|
l.Debugf("%q: queued %d items", p.repoCfg.ID, queued)
|
|
|
}
|
|
|
- return queued
|
|
|
+
|
|
|
+ if queued > 0 {
|
|
|
+ return prevVer, queued
|
|
|
+ } else {
|
|
|
+ return curVer, 0
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (p *puller) closeFile(f protocol.FileInfo) {
|