瀏覽代碼

lib: Consistently unsubscribe from config-wrapper (fixes #6133) (#6205)

Simon Frei 5 年之前
父節點
當前提交
6fd5e78740
共有 5 個文件被更改,包括 27 次插入5 次删除
  1. 11 2
      lib/config/wrapper.go
  2. 6 0
      lib/connections/service.go
  3. 2 1
      lib/model/model.go
  4. 5 1
      lib/model/progressemitter.go
  5. 3 1
      lib/ur/usage_report.go

+ 11 - 2
lib/config/wrapper.go

@@ -94,6 +94,7 @@ type wrapper struct {
 	path     string
 	evLogger events.Logger
 
+	waiter    Waiter // Latest ongoing config change
 	deviceMap map[protocol.DeviceID]DeviceConfiguration
 	folderMap map[string]FolderConfiguration
 	subs      []Committer
@@ -109,6 +110,7 @@ func Wrap(path string, cfg Configuration, evLogger events.Logger) Wrapper {
 		cfg:      cfg,
 		path:     path,
 		evLogger: evLogger,
+		waiter:   noopWaiter{}, // Noop until first config change
 		mut:      sync.NewMutex(),
 	}
 	return w
@@ -144,7 +146,8 @@ func (w *wrapper) Subscribe(c Committer) {
 }
 
 // Unsubscribe de-registers the given handler from any future calls to
-// configuration changes
+// configuration changes and only returns after a potential ongoing config
+// change is done.
 func (w *wrapper) Unsubscribe(c Committer) {
 	w.mut.Lock()
 	for i := range w.subs {
@@ -155,7 +158,11 @@ func (w *wrapper) Unsubscribe(c Committer) {
 			break
 		}
 	}
+	waiter := w.waiter
 	w.mut.Unlock()
+	// Waiting mustn't be done under lock, as the goroutines in notifyListener
+	// may dead-lock when trying to access lock on config read operations.
+	waiter.Wait()
 }
 
 // RawCopy returns a copy of the currently wrapped Configuration object.
@@ -191,7 +198,9 @@ func (w *wrapper) replaceLocked(to Configuration) (Waiter, error) {
 	w.deviceMap = nil
 	w.folderMap = nil
 
-	return w.notifyListeners(from.Copy(), to.Copy()), nil
+	w.waiter = w.notifyListeners(from.Copy(), to.Copy())
+
+	return w.waiter, nil
 }
 
 func (w *wrapper) notifyListeners(from, to Configuration) Waiter {

+ 6 - 0
lib/connections/service.go

@@ -193,6 +193,12 @@ func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *t
 	return service
 }
 
+func (s *service) Stop() {
+	s.cfg.Unsubscribe(s.limiter)
+	s.cfg.Unsubscribe(s)
+	s.Supervisor.Stop()
+}
+
 func (s *service) handle(ctx context.Context) {
 	var c internalConn
 	for {

+ 2 - 1
lib/model/model.go

@@ -217,7 +217,6 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
 	}
 	m.Add(m.progressEmitter)
 	scanLimiter.setCapacity(cfg.Options().MaxConcurrentScans)
-	cfg.Subscribe(m)
 
 	return m
 }
@@ -241,9 +240,11 @@ func (m *model) onServe() {
 		}
 		m.newFolder(folderCfg)
 	}
+	m.cfg.Subscribe(m)
 }
 
 func (m *model) Stop() {
+	m.cfg.Unsubscribe(m)
 	m.Supervisor.Stop()
 	devs := m.cfg.Devices()
 	ids := make([]protocol.DeviceID, 0, len(devs))

+ 5 - 1
lib/model/progressemitter.go

@@ -23,6 +23,7 @@ import (
 type ProgressEmitter struct {
 	suture.Service
 
+	cfg                config.Wrapper
 	registry           map[string]map[string]*sharedPullerState // folder: name: puller
 	interval           time.Duration
 	minBlocks          int
@@ -40,6 +41,7 @@ type ProgressEmitter struct {
 // DownloadProgress events every interval.
 func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmitter {
 	t := &ProgressEmitter{
+		cfg:                cfg,
 		registry:           make(map[string]map[string]*sharedPullerState),
 		timer:              time.NewTimer(time.Millisecond),
 		sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState),
@@ -51,7 +53,6 @@ func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmi
 	t.Service = util.AsService(t.serve, t.String())
 
 	t.CommitConfiguration(config.Configuration{}, cfg.RawCopy())
-	cfg.Subscribe(t)
 
 	return t
 }
@@ -59,6 +60,9 @@ func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmi
 // serve starts the progress emitter which starts emitting DownloadProgress
 // events as the progress happens.
 func (t *ProgressEmitter) serve(ctx context.Context) {
+	t.cfg.Subscribe(t)
+	defer t.cfg.Unsubscribe(t)
+
 	var lastUpdate time.Time
 	var lastCount, newCount int
 	for {

+ 3 - 1
lib/ur/usage_report.go

@@ -58,7 +58,6 @@ func New(cfg config.Wrapper, m model.Model, connectionsService connections.Servi
 		forceRun:           make(chan struct{}, 1), // Buffered to prevent locking
 	}
 	svc.Service = util.AsService(svc.serve, svc.String())
-	cfg.Subscribe(svc)
 	return svc
 }
 
@@ -385,6 +384,9 @@ func (s *Service) sendUsageReport() error {
 }
 
 func (s *Service) serve(ctx context.Context) {
+	s.cfg.Subscribe(s)
+	defer s.cfg.Unsubscribe(s)
+
 	t := time.NewTimer(time.Duration(s.cfg.Options().URInitialDelayS) * time.Second)
 	for {
 		select {