
lib/model, lib/protocol: Send ClusterConfig on config change (fixes #7020) (#7018)

Simon Frei 5 years ago
parent
commit
a20c6ca868
4 changed files with 155 additions and 116 deletions
  1. lib/model/fakeconns_test.go (+8 -1)
  2. lib/model/model.go (+79 -86)
  3. lib/model/model_test.go (+60 -24)
  4. lib/protocol/protocol.go (+8 -5)
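
In short, this change stops tearing down connections when folder configuration changes and instead resends an updated ClusterConfig to the affected, still-connected devices. The following is an illustration only, not part of the commit: a minimal, self-contained Go sketch of that pattern. The names peer, clusterConfig, model and onConfigChange are hypothetical stand-ins; the real pieces are m.conn, m.generateClusterConfig and the clusterConfigDevices set built in CommitConfiguration in the model.go diff below.

// Sketch (assumed names): resend ClusterConfig to affected peers on config change.
package main

import (
	"fmt"
	"sync"
)

type clusterConfig struct{ folders []string }

type peer interface {
	ClusterConfig(clusterConfig)
}

type fakePeer struct{ id string }

func (p *fakePeer) ClusterConfig(cc clusterConfig) {
	fmt.Printf("%s received ClusterConfig for folders %v\n", p.id, cc.folders)
}

type model struct {
	mut   sync.RWMutex
	conns map[string]peer
}

// onConfigChange resends a fresh ClusterConfig to every affected peer that is
// still connected, instead of closing its connection. The real
// CommitConfiguration dispatches the send in a goroutine; it is synchronous
// here to keep the example short.
func (m *model) onConfigChange(affected map[string]struct{}, folders []string) {
	m.mut.RLock()
	defer m.mut.RUnlock()
	for id := range affected {
		if conn, ok := m.conns[id]; ok {
			cc := clusterConfig{folders: folders} // stands in for generateClusterConfig
			conn.ClusterConfig(cc)
		}
	}
}

func main() {
	m := &model{conns: map[string]peer{"device1": &fakePeer{id: "device1"}}}
	m.onConfigChange(map[string]struct{}{"device1": {}}, []string{"default", "second"})
}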

+ 8 - 1
lib/model/fakeconns_test.go

@@ -35,6 +35,7 @@ type fakeConnection struct {
 	indexFn                  func(context.Context, string, []protocol.FileInfo)
 	requestFn                func(ctx context.Context, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error)
 	closeFn                  func(error)
+	clusterConfigFn          func(protocol.ClusterConfig)
 	mut                      sync.Mutex
 }
 
@@ -91,7 +92,13 @@ func (f *fakeConnection) Request(ctx context.Context, folder, name string, offse
 	return f.fileData[name], nil
 }
 
-func (f *fakeConnection) ClusterConfig(protocol.ClusterConfig) {}
+func (f *fakeConnection) ClusterConfig(cc protocol.ClusterConfig) {
+	f.mut.Lock()
+	defer f.mut.Unlock()
+	if f.clusterConfigFn != nil {
+		f.clusterConfigFn(cc)
+	}
+}
 
 func (f *fakeConnection) Ping() bool {
 	f.mut.Lock()

+ 79 - 86
lib/model/model.go

@@ -150,6 +150,7 @@ type model struct {
 	helloMessages       map[protocol.DeviceID]protocol.Hello
 	deviceDownloads     map[protocol.DeviceID]*deviceDownloadState
 	remotePausedFolders map[protocol.DeviceID][]string // deviceID -> folders
+	indexSenderTokens   map[protocol.DeviceID][]suture.ServiceToken
 
 	foldersRunning int32 // for testing only
 }
@@ -222,6 +223,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
 		helloMessages:       make(map[protocol.DeviceID]protocol.Hello),
 		deviceDownloads:     make(map[protocol.DeviceID]*deviceDownloadState),
 		remotePausedFolders: make(map[protocol.DeviceID][]string),
+		indexSenderTokens:   make(map[protocol.DeviceID][]suture.ServiceToken),
 	}
 	for devID := range cfg.Devices() {
 		m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID.String())
@@ -257,13 +259,16 @@ func (m *model) onServe() {
 func (m *model) Stop() {
 	m.cfg.Unsubscribe(m)
 	m.Supervisor.Stop()
-	devs := m.cfg.Devices()
-	ids := make([]protocol.DeviceID, 0, len(devs))
-	for id := range devs {
-		ids = append(ids, id)
+	m.pmut.RLock()
+	closed := make([]chan struct{}, 0, len(m.conn))
+	for id, conn := range m.conn {
+		closed = append(closed, m.closed[id])
+		go conn.Close(errStopped)
+	}
+	m.pmut.RUnlock()
+	for _, c := range closed {
+		<-c
 	}
-	w := m.closeConns(ids, errStopped)
-	w.Wait()
 }
 
 // StartDeadlockDetector starts a deadlock detector on the models locks which
@@ -393,7 +398,12 @@ func (m *model) warnAboutOverwritingProtectedFiles(cfg config.FolderConfiguratio
 }
 
 func (m *model) removeFolder(cfg config.FolderConfiguration) {
-	m.stopFolder(cfg, fmt.Errorf("removing folder %v", cfg.Description()))
+	m.fmut.RLock()
+	token, ok := m.folderRunnerToken[cfg.ID]
+	m.fmut.RUnlock()
+	if ok {
+		m.RemoveAndWait(token, 0)
+	}
 
 	m.fmut.Lock()
 
@@ -417,22 +427,6 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) {
 	db.DropFolder(m.db, cfg.ID)
 }
 
-func (m *model) stopFolder(cfg config.FolderConfiguration, err error) {
-	// Stop the services running for this folder and wait for them to finish
-	// stopping to prevent races on restart.
-	m.fmut.RLock()
-	token, ok := m.folderRunnerToken[cfg.ID]
-	m.fmut.RUnlock()
-
-	if ok {
-		m.RemoveAndWait(token, 0)
-	}
-
-	// Wait for connections to stop to ensure that no more calls to methods
-	// expecting this folder to exist happen (e.g. .IndexUpdate).
-	m.closeConns(cfg.DeviceIDs(), err).Wait()
-}
-
 // Need to hold lock on m.fmut when calling this.
 func (m *model) cleanupFolderLocked(cfg config.FolderConfiguration) {
 	// clear up our config maps
@@ -464,25 +458,13 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
 	restartMut.Lock()
 	defer restartMut.Unlock()
 
-	var infoMsg string
-	var errMsg string
-	switch {
-	case to.Paused:
-		infoMsg = "Paused"
-		errMsg = "pausing"
-	case from.Paused:
-		infoMsg = "Unpaused"
-		errMsg = "unpausing"
-	default:
-		infoMsg = "Restarted"
-		errMsg = "restarting"
+	m.fmut.RLock()
+	token, ok := m.folderRunnerToken[from.ID]
+	m.fmut.RUnlock()
+	if ok {
+		m.RemoveAndWait(token, 0)
 	}
 
-	err := fmt.Errorf("%v folder %v", errMsg, to.Description())
-	m.stopFolder(from, err)
-	// Need to send CC change to both from and to devices.
-	m.closeConns(to.DeviceIDs(), err)
-
 	m.fmut.Lock()
 	defer m.fmut.Unlock()
 
@@ -499,6 +481,16 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
 		}
 		m.addAndStartFolderLocked(to, fset, cacheIgnoredFiles)
 	}
+
+	var infoMsg string
+	switch {
+	case to.Paused:
+		infoMsg = "Paused"
+	case from.Paused:
+		infoMsg = "Unpaused"
+	default:
+		infoMsg = "Restarted"
+	}
 	l.Infof("%v folder %v (%v)", infoMsg, to.Description(), to.Type)
 }
 
@@ -507,9 +499,6 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool
 	// we do it outside of the lock.
 	fset := db.NewFileSet(cfg.ID, cfg.Filesystem(), m.db)
 
-	// Close connections to affected devices
-	m.closeConns(cfg.DeviceIDs(), fmt.Errorf("started folder %v", cfg.Description()))
-
 	m.fmut.Lock()
 	defer m.fmut.Unlock()
 	m.addAndStartFolderLocked(cfg, fset, cacheIgnoredFiles)
@@ -992,6 +981,9 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
 	m.pmut.RLock()
 	conn, ok := m.conn[deviceID]
 	closed := m.closed[deviceID]
+	for _, token := range m.indexSenderTokens[deviceID] {
+		m.RemoveAndWait(token, 0)
+	}
 	m.pmut.RUnlock()
 	if !ok {
 		panic("bug: ClusterConfig called on closed or nonexistent connection")
@@ -1024,6 +1016,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
 	}
 
 	var paused []string
+	indexSenderTokens := make([]suture.ServiceToken, 0, len(cm.Folders))
 	for _, folder := range cm.Folders {
 		cfg, ok := m.cfg.Folder(folder.ID)
 		if !ok || !cfg.SharedWith(deviceID) {
@@ -1142,14 +1135,12 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
 			evLogger:     m.evLogger,
 		}
 		is.Service = util.AsService(is.serve, is.String())
-		// The token isn't tracked as the service stops when the connection
-		// terminates and is automatically removed from supervisor (by
-		// implementing suture.IsCompletable).
-		m.Add(is)
+		indexSenderTokens = append(indexSenderTokens, m.Add(is))
 	}
 
 	m.pmut.Lock()
 	m.remotePausedFolders[deviceID] = paused
+	m.indexSenderTokens[deviceID] = indexSenderTokens
 	m.pmut.Unlock()
 
 	// This breaks if we send multiple CM messages during the same connection.
@@ -1397,41 +1388,6 @@ func (m *model) Closed(conn protocol.Connection, err error) {
 	close(closed)
 }
 
-// closeConns will close the underlying connection for given devices and return
-// a waiter that will return once all the connections are finished closing.
-func (m *model) closeConns(devs []protocol.DeviceID, err error) config.Waiter {
-	conns := make([]connections.Connection, 0, len(devs))
-	closed := make([]chan struct{}, 0, len(devs))
-	m.pmut.RLock()
-	for _, dev := range devs {
-		if conn, ok := m.conn[dev]; ok {
-			conns = append(conns, conn)
-			closed = append(closed, m.closed[dev])
-		}
-	}
-	m.pmut.RUnlock()
-	for _, conn := range conns {
-		conn.Close(err)
-	}
-	return &channelWaiter{chans: closed}
-}
-
-// closeConn closes the underlying connection for the given device and returns
-// a waiter that will return once the connection is finished closing.
-func (m *model) closeConn(dev protocol.DeviceID, err error) config.Waiter {
-	return m.closeConns([]protocol.DeviceID{dev}, err)
-}
-
-type channelWaiter struct {
-	chans []chan struct{}
-}
-
-func (w *channelWaiter) Wait() {
-	for _, c := range w.chans {
-		<-c
-	}
-}
-
 // Implements protocol.RequestResponse
 type requestResponse struct {
 	data   []byte
@@ -2467,6 +2423,9 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
 
 	// Go through the folder configs and figure out if we need to restart or not.
 
+	// Tracks devices affected by any configuration change to resend ClusterConfig.
+	clusterConfigDevices := make(map[protocol.DeviceID]struct{}, len(from.Devices)+len(to.Devices))
+
 	fromFolders := mapFolders(from.Folders)
 	toFolders := mapFolders(to.Folders)
 	for folderID, cfg := range toFolders {
@@ -2478,6 +2437,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
 				l.Infoln("Adding folder", cfg.Description())
 				m.newFolder(cfg, to.Options.CacheIgnoredFiles)
 			}
+			clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, cfg.DeviceIDs())
 		}
 	}
 
@@ -2486,6 +2446,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
 		if !ok {
 			// The folder was removed.
 			m.removeFolder(fromCfg)
+			clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, fromCfg.DeviceIDs())
 			continue
 		}
 
@@ -2497,6 +2458,8 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
 		// Check if anything differs that requires a restart.
 		if !reflect.DeepEqual(fromCfg.RequiresRestartOnly(), toCfg.RequiresRestartOnly()) || from.Options.CacheIgnoredFiles != to.Options.CacheIgnoredFiles {
 			m.restartFolder(fromCfg, toCfg, to.Options.CacheIgnoredFiles)
+			clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, fromCfg.DeviceIDs())
+			clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, toCfg.DeviceIDs())
 		}
 
 		// Emit the folder pause/resume event
@@ -2519,6 +2482,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
 	// Pausing a device, unpausing is handled by the connection service.
 	fromDevices := from.DeviceMap()
 	toDevices := to.DeviceMap()
+	closeDevices := make([]protocol.DeviceID, 0, len(to.Devices))
 	for deviceID, toCfg := range toDevices {
 		fromCfg, ok := fromDevices[deviceID]
 		if !ok {
@@ -2534,13 +2498,14 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
 		}
 
 		// Ignored folder was removed, reconnect to retrigger the prompt.
-		if len(fromCfg.IgnoredFolders) > len(toCfg.IgnoredFolders) {
-			m.closeConn(deviceID, errIgnoredFolderRemoved)
+		if !toCfg.Paused && len(fromCfg.IgnoredFolders) > len(toCfg.IgnoredFolders) {
+			closeDevices = append(closeDevices, deviceID)
 		}
 
 		if toCfg.Paused {
 			l.Infoln("Pausing", deviceID)
-			m.closeConn(deviceID, errDevicePaused)
+			closeDevices = append(closeDevices, deviceID)
+			delete(clusterConfigDevices, deviceID)
 			m.evLogger.Log(events.DevicePaused, map[string]string{"device": deviceID.String()})
 		} else {
 			m.evLogger.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()})
@@ -2551,9 +2516,28 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
 	for deviceID := range fromDevices {
 		delete(m.deviceStatRefs, deviceID)
 		removedDevices = append(removedDevices, deviceID)
+		delete(clusterConfigDevices, deviceID)
 	}
 	m.fmut.Unlock()
-	m.closeConns(removedDevices, errDeviceRemoved)
+
+	m.pmut.RLock()
+	for _, id := range closeDevices {
+		if conn, ok := m.conn[id]; ok {
+			go conn.Close(errDevicePaused)
+		}
+	}
+	for _, id := range removedDevices {
+		if conn, ok := m.conn[id]; ok {
+			go conn.Close(errDeviceRemoved)
+		}
+	}
+	for id := range clusterConfigDevices {
+		if conn, ok := m.conn[id]; ok {
+			cm := m.generateClusterConfig(conn.ID())
+			go conn.ClusterConfig(cm)
+		}
+	}
+	m.pmut.RUnlock()
 
 	m.globalRequestLimiter.setCapacity(1024 * to.Options.MaxConcurrentIncomingRequestKiB())
 	m.folderIOLimiter.setCapacity(to.Options.MaxFolderConcurrency())
@@ -2758,3 +2742,12 @@ func sanitizePath(path string) string {
 
 	return strings.TrimSpace(b.String())
 }
+
+func addDeviceIDsToMap(m map[protocol.DeviceID]struct{}, s []protocol.DeviceID) map[protocol.DeviceID]struct{} {
+	for _, id := range s {
+		if _, ok := m[id]; !ok {
+			m[id] = struct{}{}
+		}
+	}
+	return m
+}
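
Illustration only, not part of the commit: the reason index-sender service tokens are now tracked per device in indexSenderTokens. Because a ClusterConfig can now arrive more than once on the same connection, the index senders started for the previous ClusterConfig must be stopped before new ones are added. The supervisor type below is a hypothetical stand-in for the suture supervisor embedded in the model; only the bookkeeping pattern is shown.

// Sketch (assumed names): stop old index senders before starting new ones.
package main

import "fmt"

type token int

type supervisor struct {
	next    token
	running map[token]string
}

func (s *supervisor) add(name string) token {
	t := s.next
	s.next++
	s.running[t] = name
	return t
}

func (s *supervisor) removeAndWait(t token) { delete(s.running, t) }

func main() {
	sup := &supervisor{running: make(map[token]string)}
	indexSenderTokens := make(map[string][]token) // deviceID -> tokens, as in model.indexSenderTokens

	handleClusterConfig := func(device string, folders []string) {
		// Stop the index senders started for this device's previous ClusterConfig.
		for _, t := range indexSenderTokens[device] {
			sup.removeAndWait(t)
		}
		// Start one index sender per shared folder in the new ClusterConfig and
		// remember the tokens for the next round.
		tokens := make([]token, 0, len(folders))
		for _, f := range folders {
			tokens = append(tokens, sup.add("indexSender/"+f+"@"+device))
		}
		indexSenderTokens[device] = tokens
	}

	handleClusterConfig("device1", []string{"default"})
	handleClusterConfig("device1", []string{"default", "second"}) // second CC on the same connection
	fmt.Println(len(sup.running), "index senders running")        // 2, not 3
}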

+ 60 - 24
lib/model/model_test.go

@@ -3339,17 +3339,19 @@ func TestConnCloseOnRestart(t *testing.T) {
 	closed := m.closed[device1]
 	m.pmut.RUnlock()
 
-	newFcfg := fcfg.Copy()
-	newFcfg.Paused = true
+	waiter, err := w.RemoveDevice(device1)
+	if err != nil {
+		t.Fatal(err)
+	}
 	done := make(chan struct{})
 	go func() {
-		m.restartFolder(fcfg, newFcfg, false)
+		waiter.Wait()
 		close(done)
 	}()
 	select {
 	case <-done:
 	case <-time.After(5 * time.Second):
-		t.Fatal("Timed out before folder restart returned")
+		t.Fatal("Timed out before config took effect")
 	}
 	select {
 	case <-closed:
@@ -3845,8 +3847,8 @@ func TestScanRenameCaseOnly(t *testing.T) {
 	})
 }
 
-func TestConnectionTerminationOnFolderAdd(t *testing.T) {
-	testConfigChangeClosesConnections(t, false, true, nil, func(cfg config.Wrapper) {
+func TestClusterConfigOnFolderAdd(t *testing.T) {
+	testConfigChangeTriggersClusterConfigs(t, false, true, nil, func(cfg config.Wrapper) {
 		fcfg := testFolderConfigTmp()
 		fcfg.ID = "second"
 		fcfg.Label = "second"
@@ -3859,8 +3861,8 @@ func TestConnectionTerminationOnFolderAdd(t *testing.T) {
 	})
 }
 
-func TestConnectionTerminationOnFolderShare(t *testing.T) {
-	testConfigChangeClosesConnections(t, true, true, nil, func(cfg config.Wrapper) {
+func TestClusterConfigOnFolderShare(t *testing.T) {
+	testConfigChangeTriggersClusterConfigs(t, true, true, nil, func(cfg config.Wrapper) {
 		fcfg := cfg.FolderList()[0]
 		fcfg.Devices = []config.FolderDeviceConfiguration{{device2, protocol.EmptyDeviceID}}
 		if w, err := cfg.SetFolder(fcfg); err != nil {
@@ -3871,8 +3873,8 @@ func TestConnectionTerminationOnFolderShare(t *testing.T) {
 	})
 }
 
-func TestConnectionTerminationOnFolderUnshare(t *testing.T) {
-	testConfigChangeClosesConnections(t, true, false, nil, func(cfg config.Wrapper) {
+func TestClusterConfigOnFolderUnshare(t *testing.T) {
+	testConfigChangeTriggersClusterConfigs(t, true, false, nil, func(cfg config.Wrapper) {
 		fcfg := cfg.FolderList()[0]
 		fcfg.Devices = nil
 		if w, err := cfg.SetFolder(fcfg); err != nil {
@@ -3883,8 +3885,8 @@ func TestConnectionTerminationOnFolderUnshare(t *testing.T) {
 	})
 }
 
-func TestConnectionTerminationOnFolderRemove(t *testing.T) {
-	testConfigChangeClosesConnections(t, true, false, nil, func(cfg config.Wrapper) {
+func TestClusterConfigOnFolderRemove(t *testing.T) {
+	testConfigChangeTriggersClusterConfigs(t, true, false, nil, func(cfg config.Wrapper) {
 		rcfg := cfg.RawCopy()
 		rcfg.Folders = nil
 		if w, err := cfg.Replace(rcfg); err != nil {
@@ -3895,8 +3897,8 @@ func TestConnectionTerminationOnFolderRemove(t *testing.T) {
 	})
 }
 
-func TestConnectionTerminationOnFolderPause(t *testing.T) {
-	testConfigChangeClosesConnections(t, true, false, nil, func(cfg config.Wrapper) {
+func TestClusterConfigOnFolderPause(t *testing.T) {
+	testConfigChangeTriggersClusterConfigs(t, true, false, nil, func(cfg config.Wrapper) {
 		fcfg := cfg.FolderList()[0]
 		fcfg.Paused = true
 		if w, err := cfg.SetFolder(fcfg); err != nil {
@@ -3907,8 +3909,8 @@ func TestConnectionTerminationOnFolderPause(t *testing.T) {
 	})
 }
 
-func TestConnectionTerminationOnFolderUnpause(t *testing.T) {
-	testConfigChangeClosesConnections(t, true, false, func(cfg config.Wrapper) {
+func TestClusterConfigOnFolderUnpause(t *testing.T) {
+	testConfigChangeTriggersClusterConfigs(t, true, false, func(cfg config.Wrapper) {
 		fcfg := cfg.FolderList()[0]
 		fcfg.Paused = true
 		if w, err := cfg.SetFolder(fcfg); err != nil {
@@ -3984,7 +3986,7 @@ func TestScanDeletedROChangedOnSR(t *testing.T) {
 	}
 }
 
-func testConfigChangeClosesConnections(t *testing.T, expectFirstClosed, expectSecondClosed bool, pre func(config.Wrapper), fn func(config.Wrapper)) {
+func testConfigChangeTriggersClusterConfigs(t *testing.T, expectFirst, expectSecond bool, pre func(config.Wrapper), fn func(config.Wrapper)) {
 	t.Helper()
 	wcfg, _ := tmpDefaultWrapper()
 	m := setupModel(wcfg)
@@ -3999,21 +4001,55 @@ func testConfigChangeClosesConnections(t *testing.T, expectFirstClosed, expectSe
 		pre(wcfg)
 	}
 
-	fc1 := &fakeConnection{id: device1, model: m}
-	fc2 := &fakeConnection{id: device2, model: m}
+	cc1 := make(chan struct{}, 1)
+	cc2 := make(chan struct{}, 1)
+	fc1 := &fakeConnection{
+		id:    device1,
+		model: m,
+		clusterConfigFn: func(_ protocol.ClusterConfig) {
+			cc1 <- struct{}{}
+		},
+	}
+	fc2 := &fakeConnection{
+		id:    device2,
+		model: m,
+		clusterConfigFn: func(_ protocol.ClusterConfig) {
+			cc2 <- struct{}{}
+		},
+	}
 	m.AddConnection(fc1, protocol.Hello{})
 	m.AddConnection(fc2, protocol.Hello{})
 
+	// Initial CCs
+	select {
+	case <-cc1:
+	default:
+		t.Fatal("missing initial CC from device1")
+	}
+	select {
+	case <-cc2:
+	default:
+		t.Fatal("missing initial CC from device2")
+	}
+
 	t.Log("Applying config change")
 
 	fn(wcfg)
 
-	if expectFirstClosed != fc1.closed {
-		t.Errorf("first connection state mismatch: %t (expected) != %t", expectFirstClosed, fc1.closed)
+	timeout := time.NewTimer(time.Second)
+	if expectFirst {
+		select {
+		case <-cc1:
+		case <-timeout.C:
+			t.Errorf("timed out before receiving cluster config for first device")
+		}
 	}
-
-	if expectSecondClosed != fc2.closed {
-		t.Errorf("second connection state mismatch: %t (expected) != %t", expectSecondClosed, fc2.closed)
+	if expectSecond {
+		select {
+		case <-cc2:
+		case <-timeout.C:
+			t.Errorf("timed out before receiving cluster config for second device")
+		}
 	}
 }
 

+ 8 - 5
lib/protocol/protocol.go

@@ -322,11 +322,9 @@ func (c *rawConnection) Request(ctx context.Context, folder string, name string,
 }
 
 // ClusterConfig sends the cluster configuration message to the peer.
-// It must be called just once (as per BEP), otherwise it will panic.
 func (c *rawConnection) ClusterConfig(config ClusterConfig) {
 	select {
 	case c.clusterConfigBox <- &config:
-		close(c.clusterConfigBox)
 	case <-c.closed:
 	}
 }
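
The protocol change relaxes the one-ClusterConfig-per-connection rule: rawConnection.ClusterConfig no longer closes clusterConfigBox, and the writer loop (hunk further below) reads from that box alongside the regular outbox, so follow-up ClusterConfigs triggered by later config changes can be written at any time. A rough, self-contained sketch of that writer-loop shape follows; ccBox, outbox and the message type are hypothetical stand-ins for clusterConfigBox, outbox and the real message types, and select here offers no strict priority, same as in the real loop.

// Sketch (assumed names): a writer loop that accepts repeated ClusterConfigs.
package main

import "fmt"

type message string

func writerLoop(ccBox, outbox <-chan message, closed <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case cc := <-ccBox:
			fmt.Println("wrote ClusterConfig:", cc)
		case m := <-outbox:
			fmt.Println("wrote message:", m)
		case <-closed:
			return
		}
	}
}

func main() {
	ccBox := make(chan message)
	outbox := make(chan message)
	closed := make(chan struct{})
	done := make(chan struct{})
	go writerLoop(ccBox, outbox, closed, done)

	ccBox <- "initial ClusterConfig"
	outbox <- "Index"
	ccBox <- "updated ClusterConfig after config change" // a second send no longer panics
	close(closed)
	<-done
}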
@@ -386,13 +384,12 @@ func (c *rawConnection) dispatcherLoop() (err error) {
 		switch msg := msg.(type) {
 		case *ClusterConfig:
 			l.Debugln("read ClusterConfig message")
-			if state != stateInitial {
-				return fmt.Errorf("protocol error: cluster config message in state %d", state)
+			if state == stateInitial {
+				state = stateReady
 			}
 			if err := c.receiver.ClusterConfig(c.id, *msg); err != nil {
 				return errors.Wrap(err, "receiver error")
 			}
-			state = stateReady
 
 		case *Index:
 			l.Debugln("read Index message")
@@ -683,6 +680,12 @@ func (c *rawConnection) writerLoop() {
 	}
 	for {
 		select {
+		case cc := <-c.clusterConfigBox:
+			err := c.writeMessage(cc)
+			if err != nil {
+				c.internalClose(err)
+				return
+			}
 		case hm := <-c.outbox:
 			err := c.writeMessage(hm.msg)
 			if hm.done != nil {