瀏覽代碼

lib/model: Progress emitter network operations dont need to be blocking (#6589)

* lib/model: Progress emitter network operations dont need to be blocking

* Do sends outside of the lock
Audrius Butkevicius 5 年之前
父節點
當前提交
f86deedd9c
共有 2 個文件被更改,包括 38 次插入7 次删除
  1. 32 5
      lib/model/progressemitter.go
  2. 6 2
      lib/model/progressemitter_test.go

+ 32 - 5
lib/model/progressemitter.go

@@ -37,6 +37,16 @@ type ProgressEmitter struct {
 	timer *time.Timer
 	timer *time.Timer
 }
 }
 
 
+type progressUpdate struct {
+	conn    protocol.Connection
+	folder  string
+	updates []protocol.FileDownloadProgressUpdate
+}
+
+func (p progressUpdate) send(ctx context.Context) {
+	p.conn.DownloadProgress(ctx, p.folder, p.updates)
+}
+
 // NewProgressEmitter creates a new progress emitter which emits
 // NewProgressEmitter creates a new progress emitter which emits
 // DownloadProgress events every interval.
 // DownloadProgress events every interval.
 func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmitter {
 func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmitter {
@@ -76,6 +86,7 @@ func (t *ProgressEmitter) serve(ctx context.Context) {
 
 
 			newLastUpdated := lastUpdate
 			newLastUpdated := lastUpdate
 			newCount = t.lenRegistryLocked()
 			newCount = t.lenRegistryLocked()
+			var progressUpdates []progressUpdate
 			for _, pullers := range t.registry {
 			for _, pullers := range t.registry {
 				for _, puller := range pullers {
 				for _, puller := range pullers {
 					if updated := puller.Updated(); updated.After(newLastUpdated) {
 					if updated := puller.Updated(); updated.After(newLastUpdated) {
@@ -88,9 +99,7 @@ func (t *ProgressEmitter) serve(ctx context.Context) {
 				lastUpdate = newLastUpdated
 				lastUpdate = newLastUpdated
 				lastCount = newCount
 				lastCount = newCount
 				t.sendDownloadProgressEventLocked()
 				t.sendDownloadProgressEventLocked()
-				if len(t.connections) > 0 {
-					t.sendDownloadProgressMessagesLocked(ctx)
-				}
+				progressUpdates = t.computeProgressUpdates()
 			} else {
 			} else {
 				l.Debugln("progress emitter: nothing new")
 				l.Debugln("progress emitter: nothing new")
 			}
 			}
@@ -99,6 +108,17 @@ func (t *ProgressEmitter) serve(ctx context.Context) {
 				t.timer.Reset(t.interval)
 				t.timer.Reset(t.interval)
 			}
 			}
 			t.mut.Unlock()
 			t.mut.Unlock()
+
+			// Do the sending outside of the lock.
+			// If these send block, the whole process of reporting progress to others stops, but that's probably fine.
+			// It's better to stop this component from working under back-pressure than causing other components that
+			// rely on this component to be waiting for locks.
+			//
+			// This might leave remote peers in some funky state where we are unable the fact that we no longer have
+			// something, but there is not much we can do here.
+			for _, update := range progressUpdates {
+				update.send(ctx)
+			}
 		}
 		}
 	}
 	}
 }
 }
@@ -118,7 +138,8 @@ func (t *ProgressEmitter) sendDownloadProgressEventLocked() {
 	l.Debugf("progress emitter: emitting %#v", output)
 	l.Debugf("progress emitter: emitting %#v", output)
 }
 }
 
 
-func (t *ProgressEmitter) sendDownloadProgressMessagesLocked(ctx context.Context) {
+func (t *ProgressEmitter) computeProgressUpdates() []progressUpdate {
+	var progressUpdates []progressUpdate
 	for id, conn := range t.connections {
 	for id, conn := range t.connections {
 		for _, folder := range t.foldersByConns[id] {
 		for _, folder := range t.foldersByConns[id] {
 			pullers, ok := t.registry[folder]
 			pullers, ok := t.registry[folder]
@@ -149,7 +170,11 @@ func (t *ProgressEmitter) sendDownloadProgressMessagesLocked(ctx context.Context
 			updates := state.update(folder, activePullers)
 			updates := state.update(folder, activePullers)
 
 
 			if len(updates) > 0 {
 			if len(updates) > 0 {
-				conn.DownloadProgress(ctx, folder, updates)
+				progressUpdates = append(progressUpdates, progressUpdate{
+					conn:    conn,
+					folder:  folder,
+					updates: updates,
+				})
 			}
 			}
 		}
 		}
 	}
 	}
@@ -189,6 +214,8 @@ func (t *ProgressEmitter) sendDownloadProgressMessagesLocked(ctx context.Context
 			// }
 			// }
 		}
 		}
 	}
 	}
+
+	return progressUpdates
 }
 }
 
 
 // VerifyConfiguration implements the config.Committer interface
 // VerifyConfiguration implements the config.Committer interface

+ 6 - 2
lib/model/progressemitter_test.go

@@ -461,6 +461,10 @@ func TestSendDownloadProgressMessages(t *testing.T) {
 
 
 func sendMsgs(p *ProgressEmitter) {
 func sendMsgs(p *ProgressEmitter) {
 	p.mut.Lock()
 	p.mut.Lock()
-	defer p.mut.Unlock()
-	p.sendDownloadProgressMessagesLocked(context.Background())
+	updates := p.computeProgressUpdates()
+	p.mut.Unlock()
+	ctx := context.Background()
+	for _, update := range updates {
+		update.send(ctx)
+	}
 }
 }