Browse Source

Handle needed files in batches

Jakob Borg 11 years ago
parent
commit
7943902d73
3 changed files with 54 additions and 30 deletions
  1. 1 1
      files/leveldb.go
  2. 46 25
      model/model.go
  3. 7 4
      model/puller.go

+ 1 - 1
files/leveldb.go

@@ -207,8 +207,8 @@ func ldbReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint6
 		if debug {
 			l.Debugf("delete; repo=%q node=%x name=%q", repo, node, name)
 		}
-		batch.Delete(dbi.Key())
 		ldbRemoveFromGlobal(db, batch, repo, node, name)
+		batch.Delete(dbi.Key())
 		return 0
 	})
 }

+ 46 - 25
model/model.go

@@ -278,8 +278,17 @@ func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
 
 // NeedSize returns the number and total size of currently needed files.
 func (m *Model) NeedSize(repo string) (files int, bytes int64) {
-	f, d, b := sizeOf(m.NeedFilesRepo(repo))
-	return f + d, b
+	m.rmut.RLock()
+	defer m.rmut.RUnlock()
+	if rf, ok := m.repoFiles[repo]; ok {
+		rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
+			fs, de, by := sizeOfFile(f)
+			files += fs + de
+			bytes += by
+			return true
+		})
+	}
+	return
 }
 
 // NeedFiles returns the list of currently needed files
@@ -287,10 +296,10 @@ func (m *Model) NeedFilesRepo(repo string) []protocol.FileInfo {
 	m.rmut.RLock()
 	defer m.rmut.RUnlock()
 	if rf, ok := m.repoFiles[repo]; ok {
-		var fs []protocol.FileInfo
+		fs := make([]protocol.FileInfo, 0, indexBatchSize)
 		rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
 			fs = append(fs, f)
-			return true
+			return len(fs) < indexBatchSize
 		})
 		if r := m.repoCfgs[repo].FileRanker(); r != nil {
 			files.SortBy(r).Sort(fs)
@@ -721,9 +730,10 @@ func (m *Model) ScanRepo(repo string) error {
 	if err != nil {
 		return err
 	}
-	batch := make([]protocol.FileInfo, 0, indexBatchSize)
+	batchSize := 100
+	batch := make([]protocol.FileInfo, 0, 00)
 	for f := range fchan {
-		if len(batch) == indexBatchSize {
+		if len(batch) == batchSize {
 			fs.Update(protocol.LocalNodeID, batch)
 			batch = batch[:0]
 		}
@@ -736,7 +746,7 @@ func (m *Model) ScanRepo(repo string) error {
 	batch = batch[:0]
 	fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
 		if !protocol.IsDeleted(f.Flags) {
-			if len(batch) == indexBatchSize {
+			if len(batch) == batchSize {
 				fs.Update(protocol.LocalNodeID, batch)
 				batch = batch[:0]
 			}
@@ -810,41 +820,52 @@ func (m *Model) State(repo string) string {
 }
 
 func (m *Model) Override(repo string) {
-	fs := m.NeedFilesRepo(repo)
-
 	m.rmut.RLock()
-	r := m.repoFiles[repo]
+	fs := m.repoFiles[repo]
 	m.rmut.RUnlock()
 
-	for i := range fs {
-		f := &fs[i]
-		h := r.Get(protocol.LocalNodeID, f.Name)
-		if h.Name != f.Name {
+	batch := make([]protocol.FileInfo, 0, indexBatchSize)
+	fs.WithNeed(protocol.LocalNodeID, func(need protocol.FileInfo) bool {
+		if len(batch) == indexBatchSize {
+			fs.Update(protocol.LocalNodeID, batch)
+			batch = batch[:0]
+		}
+
+		have := fs.Get(protocol.LocalNodeID, need.Name)
+		if have.Name != need.Name {
 			// We are missing the file
-			f.Flags |= protocol.FlagDeleted
-			f.Blocks = nil
+			need.Flags |= protocol.FlagDeleted
+			need.Blocks = nil
 		} else {
 			// We have the file, replace with our version
-			*f = h
+			need = have
 		}
-		f.Version = lamport.Default.Tick(f.Version)
-		f.LocalVersion = 0
+		need.Version = lamport.Default.Tick(need.Version)
+		need.LocalVersion = 0
+		batch = append(batch, need)
+		return true
+	})
+	if len(batch) > 0 {
+		fs.Update(protocol.LocalNodeID, batch)
 	}
-
-	r.Update(protocol.LocalNodeID, fs)
 }
 
 // Version returns the change version for the given repository. This is
 // guaranteed to increment if the contents of the local or global repository
 // has changed.
 func (m *Model) LocalVersion(repo string) uint64 {
-	var ver uint64
-
 	m.rmut.Lock()
+	defer m.rmut.Unlock()
+
+	fs, ok := m.repoFiles[repo]
+	if !ok {
+		return 0
+	}
+
+	ver := fs.LocalVersion(protocol.LocalNodeID)
 	for _, n := range m.repoNodes[repo] {
-		ver += m.repoFiles[repo].LocalVersion(n)
+		ver += fs.LocalVersion(n)
 	}
-	m.rmut.Unlock()
 
 	return ver
 }

+ 7 - 4
model/puller.go

@@ -195,8 +195,10 @@ func (p *puller) run() {
 
 		if v := p.model.LocalVersion(p.repoCfg.ID); v > prevVer {
 			// Queue more blocks to fetch, if any
-			p.queueNeededBlocks()
-			prevVer = v
+			if p.queueNeededBlocks() == 0 {
+				// We've fetched all blocks we need
+				prevVer = v
+			}
 		}
 	}
 }
@@ -618,7 +620,7 @@ func (p *puller) handleEmptyBlock(b bqBlock) {
 	delete(p.openFiles, f.Name)
 }
 
-func (p *puller) queueNeededBlocks() {
+func (p *puller) queueNeededBlocks() int {
 	queued := 0
 	for _, f := range p.model.NeedFilesRepo(p.repoCfg.ID) {
 		lf := p.model.CurrentRepoFile(p.repoCfg.ID, f.Name)
@@ -634,8 +636,9 @@ func (p *puller) queueNeededBlocks() {
 		})
 	}
 	if debug && queued > 0 {
-		l.Debugf("%q: queued %d blocks", p.repoCfg.ID, queued)
+		l.Debugf("%q: queued %d items", p.repoCfg.ID, queued)
 	}
+	return queued
 }
 
 func (p *puller) closeFile(f protocol.FileInfo) {