Преглед изворни кода

lib/model: Refactor progressEmitter to de-/activate by config (fixes #4613) (#5623)

Simon Frei пре 6 година
родитељ
комит
3bea59b0d9
4 измењених фајлова са 152 додато и 116 уклоњено
  1. 2 6
      lib/model/folder_sendrecv.go
  2. 1 3
      lib/model/model.go
  3. 106 73
      lib/model/progressemitter.go
  4. 43 34
      lib/model/progressemitter_test.go

+ 2 - 6
lib/model/folder_sendrecv.go

@@ -1201,9 +1201,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
 			continue
 		}
 
-		if f.model.progressEmitter != nil {
-			f.model.progressEmitter.Register(state.sharedPullerState)
-		}
+		f.model.progressEmitter.Register(state.sharedPullerState)
 
 		folderFilesystems := make(map[string]fs.Filesystem)
 		var folders []string
@@ -1550,9 +1548,7 @@ func (f *sendReceiveFolder) finisherRoutine(in <-chan *sharedPullerState, dbUpda
 				blockStatsMut.Unlock()
 			}
 
-			if f.model.progressEmitter != nil {
-				f.model.progressEmitter.Deregister(state)
-			}
+			f.model.progressEmitter.Deregister(state)
 
 			events.Default.Log(events.ItemFinished, map[string]interface{}{
 				"folder": f.folderID,

+ 1 - 3
lib/model/model.go

@@ -218,9 +218,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
 		fmut:                sync.NewRWMutex(),
 		pmut:                sync.NewRWMutex(),
 	}
-	if cfg.Options().ProgressUpdateIntervalS > -1 {
-		m.Add(m.progressEmitter)
-	}
+	m.Add(m.progressEmitter)
 	scanLimiter.setCapacity(cfg.Options().MaxConcurrentScans)
 	cfg.Subscribe(m)
 

+ 106 - 73
lib/model/progressemitter.go

@@ -17,11 +17,13 @@ import (
 )
 
 type ProgressEmitter struct {
-	registry           map[string]*sharedPullerState
+	registry           map[string]map[string]*sharedPullerState // folder: name: puller
 	interval           time.Duration
 	minBlocks          int
 	sentDownloadStates map[protocol.DeviceID]*sentDownloadState // States representing what we've sent to the other peer via DownloadProgress messages.
-	connections        map[string][]protocol.Connection
+	connections        map[protocol.DeviceID]protocol.Connection
+	foldersByConns     map[protocol.DeviceID][]string
+	disabled           bool
 	mut                sync.Mutex
 
 	timer *time.Timer
@@ -34,10 +36,11 @@ type ProgressEmitter struct {
 func NewProgressEmitter(cfg config.Wrapper) *ProgressEmitter {
 	t := &ProgressEmitter{
 		stop:               make(chan struct{}),
-		registry:           make(map[string]*sharedPullerState),
+		registry:           make(map[string]map[string]*sharedPullerState),
 		timer:              time.NewTimer(time.Millisecond),
 		sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState),
-		connections:        make(map[string][]protocol.Connection),
+		connections:        make(map[protocol.DeviceID]protocol.Connection),
+		foldersByConns:     make(map[protocol.DeviceID][]string),
 		mut:                sync.NewMutex(),
 	}
 
@@ -62,20 +65,21 @@ func (t *ProgressEmitter) Serve() {
 			l.Debugln("progress emitter: timer - looking after", len(t.registry))
 
 			newLastUpdated := lastUpdate
-			newCount = len(t.registry)
-			for _, puller := range t.registry {
-				updated := puller.Updated()
-				if updated.After(newLastUpdated) {
-					newLastUpdated = updated
+			newCount = t.lenRegistryLocked()
+			for _, pullers := range t.registry {
+				for _, puller := range pullers {
+					if updated := puller.Updated(); updated.After(newLastUpdated) {
+						newLastUpdated = updated
+					}
 				}
 			}
 
 			if !newLastUpdated.Equal(lastUpdate) || newCount != lastCount {
 				lastUpdate = newLastUpdated
 				lastCount = newCount
-				t.sendDownloadProgressEvent()
+				t.sendDownloadProgressEventLocked()
 				if len(t.connections) > 0 {
-					t.sendDownloadProgressMessages()
+					t.sendDownloadProgressMessagesLocked()
 				}
 			} else {
 				l.Debugln("progress emitter: nothing new")
@@ -89,30 +93,29 @@ func (t *ProgressEmitter) Serve() {
 	}
 }
 
-func (t *ProgressEmitter) sendDownloadProgressEvent() {
-	// registry lock already held
+func (t *ProgressEmitter) sendDownloadProgressEventLocked() {
 	output := make(map[string]map[string]*pullerProgress)
-	for _, puller := range t.registry {
-		if output[puller.folder] == nil {
-			output[puller.folder] = make(map[string]*pullerProgress)
+	for folder, pullers := range t.registry {
+		if len(pullers) == 0 {
+			continue
+		}
+		output[folder] = make(map[string]*pullerProgress)
+		for name, puller := range pullers {
+			output[folder][name] = puller.Progress()
 		}
-		output[puller.folder][puller.file.Name] = puller.Progress()
 	}
 	events.Default.Log(events.DownloadProgress, output)
 	l.Debugf("progress emitter: emitting %#v", output)
 }
 
-func (t *ProgressEmitter) sendDownloadProgressMessages() {
-	// registry lock already held
-	sharedFolders := make(map[protocol.DeviceID][]string)
-	deviceConns := make(map[protocol.DeviceID]protocol.Connection)
-	subscribers := t.connections
-	for folder, conns := range subscribers {
-		for _, conn := range conns {
-			id := conn.ID()
-
-			deviceConns[id] = conn
-			sharedFolders[id] = append(sharedFolders[id], folder)
+func (t *ProgressEmitter) sendDownloadProgressMessagesLocked() {
+	for id, conn := range t.connections {
+		for _, folder := range t.foldersByConns[id] {
+			pullers, ok := t.registry[folder]
+			if !ok {
+				// There's never been any puller registered for this folder yet
+				continue
+			}
 
 			state, ok := t.sentDownloadStates[id]
 			if !ok {
@@ -122,8 +125,8 @@ func (t *ProgressEmitter) sendDownloadProgressMessages() {
 				t.sentDownloadStates[id] = state
 			}
 
-			var activePullers []*sharedPullerState
-			for _, puller := range t.registry {
+			activePullers := make([]*sharedPullerState, 0, len(pullers))
+			for _, puller := range pullers {
 				if puller.folder != folder || puller.file.IsSymlink() || puller.file.IsDirectory() || len(puller.file.Blocks) <= t.minBlocks {
 					continue
 				}
@@ -143,7 +146,7 @@ func (t *ProgressEmitter) sendDownloadProgressMessages() {
 
 	// Clean up sentDownloadStates for devices which we are no longer connected to.
 	for id := range t.sentDownloadStates {
-		_, ok := deviceConns[id]
+		_, ok := t.connections[id]
 		if !ok {
 			// Null out outstanding entries for device
 			delete(t.sentDownloadStates, id)
@@ -152,13 +155,12 @@ func (t *ProgressEmitter) sendDownloadProgressMessages() {
 
 	// If a folder was unshared from some device, tell it that all temp files
 	// are now gone.
-	for id, sharedDeviceFolders := range sharedFolders {
-		state := t.sentDownloadStates[id]
-	nextFolder:
+	for id, state := range t.sentDownloadStates {
 		// For each of the folders that the state is aware of,
 		// try to match it with a shared folder we've discovered above,
+	nextFolder:
 		for _, folder := range state.folders() {
-			for _, existingFolder := range sharedDeviceFolders {
+			for _, existingFolder := range t.foldersByConns[id] {
 				if existingFolder == folder {
 					continue nextFolder
 				}
@@ -189,12 +191,23 @@ func (t *ProgressEmitter) CommitConfiguration(from, to config.Configuration) boo
 	t.mut.Lock()
 	defer t.mut.Unlock()
 
-	t.interval = time.Duration(to.Options.ProgressUpdateIntervalS) * time.Second
-	if t.interval < time.Second {
-		t.interval = time.Second
+	switch {
+	case t.disabled && to.Options.ProgressUpdateIntervalS >= 0:
+		t.disabled = false
+		l.Debugln("progress emitter: enabled")
+		fallthrough
+	case !t.disabled && from.Options.ProgressUpdateIntervalS != to.Options.ProgressUpdateIntervalS:
+		t.interval = time.Duration(to.Options.ProgressUpdateIntervalS) * time.Second
+		if t.interval < time.Second {
+			t.interval = time.Second
+		}
+		l.Debugln("progress emitter: updated interval", t.interval)
+	case !t.disabled && to.Options.ProgressUpdateIntervalS < 0:
+		t.clearLocked()
+		t.disabled = true
+		l.Debugln("progress emitter: disabled")
 	}
 	t.minBlocks = to.Options.TempIndexMinBlocks
-	l.Debugln("progress emitter: updated interval", t.interval)
 
 	return true
 }
@@ -209,13 +222,18 @@ func (t *ProgressEmitter) Stop() {
 func (t *ProgressEmitter) Register(s *sharedPullerState) {
 	t.mut.Lock()
 	defer t.mut.Unlock()
+	if t.disabled {
+		l.Debugln("progress emitter: disabled, skip registering")
+		return
+	}
 	l.Debugln("progress emitter: registering", s.folder, s.file.Name)
-	if len(t.registry) == 0 {
+	if t.emptyLocked() {
 		t.timer.Reset(t.interval)
 	}
-	// Separate the folder ID (arbitrary string) and the file name by "//"
-	// because it never appears in a valid file name.
-	t.registry[s.folder+"//"+s.file.Name] = s
+	if _, ok := t.registry[s.folder]; !ok {
+		t.registry[s.folder] = make(map[string]*sharedPullerState)
+	}
+	t.registry[s.folder][s.file.Name] = s
 }
 
 // Deregister a puller which will stop broadcasting pullers state.
@@ -223,9 +241,13 @@ func (t *ProgressEmitter) Deregister(s *sharedPullerState) {
 	t.mut.Lock()
 	defer t.mut.Unlock()
 
-	l.Debugln("progress emitter: deregistering", s.folder, s.file.Name)
+	if t.disabled {
+		l.Debugln("progress emitter: disabled, skip deregistering")
+		return
+	}
 
-	delete(t.registry, s.folder+"//"+s.file.Name)
+	l.Debugln("progress emitter: deregistering", s.folder, s.file.Name)
+	delete(t.registry[s.folder], s.file.Name)
 }
 
 // BytesCompleted returns the number of bytes completed in the given folder.
@@ -233,10 +255,8 @@ func (t *ProgressEmitter) BytesCompleted(folder string) (bytes int64) {
 	t.mut.Lock()
 	defer t.mut.Unlock()
 
-	for _, s := range t.registry {
-		if s.folder == folder {
-			bytes += s.Progress().BytesDone
-		}
+	for _, s := range t.registry[folder] {
+		bytes += s.Progress().BytesDone
 	}
 	l.Debugf("progress emitter: bytes completed for %s: %d", folder, bytes)
 	return
@@ -249,40 +269,53 @@ func (t *ProgressEmitter) String() string {
 func (t *ProgressEmitter) lenRegistry() int {
 	t.mut.Lock()
 	defer t.mut.Unlock()
-	return len(t.registry)
+	return t.lenRegistryLocked()
 }
 
-func (t *ProgressEmitter) temporaryIndexSubscribe(conn protocol.Connection, folders []string) {
-	t.mut.Lock()
-	for _, folder := range folders {
-		t.connections[folder] = append(t.connections[folder], conn)
+func (t *ProgressEmitter) lenRegistryLocked() (out int) {
+	for _, pullers := range t.registry {
+		out += len(pullers)
 	}
-	t.mut.Unlock()
+	return out
 }
 
-func (t *ProgressEmitter) temporaryIndexUnsubscribe(conn protocol.Connection) {
-	t.mut.Lock()
-	left := make(map[string][]protocol.Connection, len(t.connections))
-	for folder, conns := range t.connections {
-		connsLeft := connsWithout(conns, conn)
-		if len(connsLeft) > 0 {
-			left[folder] = connsLeft
+func (t *ProgressEmitter) emptyLocked() bool {
+	for _, pullers := range t.registry {
+		if len(pullers) != 0 {
+			return false
 		}
 	}
-	t.connections = left
-	t.mut.Unlock()
+	return true
 }
 
-func connsWithout(conns []protocol.Connection, conn protocol.Connection) []protocol.Connection {
-	if len(conns) == 0 {
-		return nil
-	}
+func (t *ProgressEmitter) temporaryIndexSubscribe(conn protocol.Connection, folders []string) {
+	t.mut.Lock()
+	defer t.mut.Unlock()
+	t.connections[conn.ID()] = conn
+	t.foldersByConns[conn.ID()] = folders
+}
+
+func (t *ProgressEmitter) temporaryIndexUnsubscribe(conn protocol.Connection) {
+	t.mut.Lock()
+	defer t.mut.Unlock()
+	delete(t.connections, conn.ID())
+	delete(t.foldersByConns, conn.ID())
+}
 
-	newConns := make([]protocol.Connection, 0, len(conns)-1)
-	for _, existingConn := range conns {
-		if existingConn != conn {
-			newConns = append(newConns, existingConn)
+func (t *ProgressEmitter) clearLocked() {
+	for id, state := range t.sentDownloadStates {
+		conn, ok := t.connections[id]
+		if !ok {
+			continue
+		}
+		for _, folder := range state.folders() {
+			if updates := state.cleanup(folder); len(updates) > 0 {
+				conn.DownloadProgress(folder, updates)
+			}
 		}
 	}
-	return newConns
+	t.registry = make(map[string]map[string]*sharedPullerState)
+	t.sentDownloadStates = make(map[protocol.DeviceID]*sentDownloadState)
+	t.connections = make(map[protocol.DeviceID]protocol.Connection)
+	t.foldersByConns = make(map[protocol.DeviceID][]string)
 }

+ 43 - 34
lib/model/progressemitter_test.go

@@ -114,6 +114,9 @@ func TestSendDownloadProgressMessages(t *testing.T) {
 
 	p := NewProgressEmitter(c)
 	p.temporaryIndexSubscribe(fc, []string{"folder", "folder2"})
+	p.registry["folder"] = make(map[string]*sharedPullerState)
+	p.registry["folder2"] = make(map[string]*sharedPullerState)
+	p.registry["folderXXX"] = make(map[string]*sharedPullerState)
 
 	expect := func(updateIdx int, state *sharedPullerState, updateType protocol.FileDownloadProgressUpdateType, version protocol.Vector, blocks []int32, remove bool) {
 		messageIdx := -1
@@ -202,39 +205,39 @@ func TestSendDownloadProgressMessages(t *testing.T) {
 		mut:              sync.NewRWMutex(),
 		availableUpdated: time.Now(),
 	}
-	p.registry["1"] = state1
+	p.registry["folder"]["1"] = state1
 
 	// Has no blocks, hence no message is sent
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 	expectEmpty()
 
 	// Returns update for puller with new extra blocks
 	state1.available = []int32{1}
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 
 	expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{1}, true)
 	expectEmpty()
 
 	// Does nothing if nothing changes
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 	expectEmpty()
 
 	// Does nothing if timestamp updated, but no new blocks (should never happen)
 	state1.availableUpdated = tick()
 
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 	expectEmpty()
 
 	// Does not return an update if date blocks change but date does not (should never happen)
 	state1.available = []int32{1, 2}
 
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 	expectEmpty()
 
 	// If the date and blocks changes, returns only the diff
 	state1.availableUpdated = tick()
 
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 
 	expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{2}, true)
 	expectEmpty()
@@ -242,7 +245,7 @@ func TestSendDownloadProgressMessages(t *testing.T) {
 	// Returns forget and update if puller version has changed
 	state1.file.Version = v2
 
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 
 	expect(0, state1, protocol.UpdateTypeForget, v1, nil, false)
 	expect(1, state1, protocol.UpdateTypeAppend, v2, []int32{1, 2}, true)
@@ -254,7 +257,7 @@ func TestSendDownloadProgressMessages(t *testing.T) {
 	state1.availableUpdated = tick()
 	state1.created = tick()
 
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 
 	expect(0, state1, protocol.UpdateTypeForget, v2, nil, false)
 	expect(1, state1, protocol.UpdateTypeAppend, v2, []int32{1}, true)
@@ -265,7 +268,7 @@ func TestSendDownloadProgressMessages(t *testing.T) {
 	state1.available = nil
 	state1.availableUpdated = tick()
 
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 
 	expect(0, state1, protocol.UpdateTypeForget, v2, nil, false)
 	expect(1, state1, protocol.UpdateTypeAppend, v1, nil, true)
@@ -308,11 +311,11 @@ func TestSendDownloadProgressMessages(t *testing.T) {
 		available:        []int32{1, 2, 3},
 		availableUpdated: time.Now(),
 	}
-	p.registry["2"] = state2
-	p.registry["3"] = state3
-	p.registry["4"] = state4
+	p.registry["folder2"]["2"] = state2
+	p.registry["folder"]["3"] = state3
+	p.registry["folder2"]["4"] = state4
 
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 
 	expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false)
 	expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
@@ -326,10 +329,10 @@ func TestSendDownloadProgressMessages(t *testing.T) {
 	state2.available = []int32{1, 2, 3, 4, 5}
 	state2.availableUpdated = tick()
 
-	delete(p.registry, "3")
-	delete(p.registry, "4")
+	delete(p.registry["folder"], "3")
+	delete(p.registry["folder2"], "4")
 
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 
 	expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false)
 	expect(-1, state3, protocol.UpdateTypeForget, v1, nil, true)
@@ -338,8 +341,8 @@ func TestSendDownloadProgressMessages(t *testing.T) {
 	expectEmpty()
 
 	// Deletions are sent only once (actual bug I found writing the tests)
-	p.sendDownloadProgressMessages()
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
+	sendMsgs(p)
 	expectEmpty()
 
 	// Not sent for "inactive" (symlinks, dirs, or wrong folder) pullers
@@ -392,31 +395,31 @@ func TestSendDownloadProgressMessages(t *testing.T) {
 		available:        []int32{1, 2, 3},
 		availableUpdated: time.Now(),
 	}
-	p.registry["5"] = state5
-	p.registry["6"] = state6
-	p.registry["7"] = state7
-	p.registry["8"] = state8
+	p.registry["folder"]["5"] = state5
+	p.registry["folder"]["6"] = state6
+	p.registry["folderXXX"]["7"] = state7
+	p.registry["folder"]["8"] = state8
 
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 
 	expectEmpty()
 
 	// Device is no longer subscribed to a particular folder
-	delete(p.registry, "1") // Clean up first
-	delete(p.registry, "2") // Clean up first
+	delete(p.registry["folder"], "1")  // Clean up first
+	delete(p.registry["folder2"], "2") // Clean up first
 
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 	expect(-1, state1, protocol.UpdateTypeForget, v1, nil, true)
 	expect(-1, state2, protocol.UpdateTypeForget, v1, nil, true)
 
 	expectEmpty()
 
-	p.registry["1"] = state1
-	p.registry["2"] = state2
-	p.registry["3"] = state3
-	p.registry["4"] = state4
+	p.registry["folder"]["1"] = state1
+	p.registry["folder2"]["2"] = state2
+	p.registry["folder"]["3"] = state3
+	p.registry["folder2"]["4"] = state4
 
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 
 	expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false)
 	expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
@@ -427,7 +430,7 @@ func TestSendDownloadProgressMessages(t *testing.T) {
 	p.temporaryIndexUnsubscribe(fc)
 	p.temporaryIndexSubscribe(fc, []string{"folder"})
 
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 
 	// See progressemitter.go for explanation why this is commented out.
 	// Search for state.cleanup
@@ -439,9 +442,15 @@ func TestSendDownloadProgressMessages(t *testing.T) {
 	// Cleanup when device no longer exists
 	p.temporaryIndexUnsubscribe(fc)
 
-	p.sendDownloadProgressMessages()
+	sendMsgs(p)
 	_, ok := p.sentDownloadStates[fc.ID()]
 	if ok {
 		t.Error("Should not be there")
 	}
 }
+
+func sendMsgs(p *ProgressEmitter) {
+	p.mut.Lock()
+	defer p.mut.Unlock()
+	p.sendDownloadProgressMessagesLocked()
+}