Jelajahi Sumber

lib/model, lib/protocol: Wait for reader/writer loops on close (fixes #4170) (#5657)

* lib/protocol: Wait for reader/writer loops on close (fixes #4170)

* waitgroup

* lib/model: Don't hold lock while closing connection

* fix comments

* review (lock once, func argument) and naming
Simon Frei 6 tahun lalu
induk
melakukan
5da41f75fa
2 mengubah file dengan 50 tambahan dan 44 penghapusan
  1. 32 36
      lib/model/model.go
  2. 18 8
      lib/protocol/protocol.go

+ 32 - 36
lib/model/model.go

@@ -238,15 +238,14 @@ func (m *model) StartDeadlockDetector(timeout time.Duration) {
 // StartFolder constructs the folder service and starts it.
 func (m *model) StartFolder(folder string) {
 	m.fmut.Lock()
-	m.pmut.Lock()
+	defer m.fmut.Unlock()
 	folderCfg := m.folderCfgs[folder]
 	m.startFolderLocked(folderCfg)
-	m.pmut.Unlock()
-	m.fmut.Unlock()
 
 	l.Infof("Ready to synchronize %s (%s)", folderCfg.Description(), folderCfg.Type)
 }
 
+// Need to hold lock on m.fmut when calling this.
 func (m *model) startFolderLocked(cfg config.FolderConfiguration) {
 	if err := m.checkFolderRunningLocked(cfg.ID); err == errFolderMissing {
 		panic("cannot start nonexistent folder " + cfg.Description())
@@ -274,9 +273,9 @@ func (m *model) startFolderLocked(cfg config.FolderConfiguration) {
 	}
 
 	// Close connections to affected devices
-	for _, id := range cfg.DeviceIDs() {
-		m.closeLocked(id, fmt.Errorf("started folder %v", cfg.Description()))
-	}
+	m.fmut.Unlock()
+	m.closeConns(cfg.DeviceIDs(), fmt.Errorf("started folder %v", cfg.Description()))
+	m.fmut.Lock()
 
 	v, ok := fset.Sequence(protocol.LocalDeviceID), true
 	indexHasFiles := ok && v > 0
@@ -382,9 +381,7 @@ func (m *model) addFolderLocked(cfg config.FolderConfiguration) {
 
 func (m *model) RemoveFolder(cfg config.FolderConfiguration) {
 	m.fmut.Lock()
-	m.pmut.Lock()
 	defer m.fmut.Unlock()
-	defer m.pmut.Unlock()
 
 	// Delete syncthing specific files
 	cfg.Filesystem().RemoveAll(config.DefaultMarkerName)
@@ -394,24 +391,24 @@ func (m *model) RemoveFolder(cfg config.FolderConfiguration) {
 	db.DropFolder(m.db, cfg.ID)
 }
 
+// Need to hold lock on m.fmut when calling this.
 func (m *model) tearDownFolderLocked(cfg config.FolderConfiguration, err error) {
-	// Close connections to affected devices
-	// Must happen before stopping the folder service to abort ongoing
-	// transmissions and thus allow timely service termination.
-	for _, dev := range cfg.Devices {
-		m.closeLocked(dev.DeviceID, err)
-	}
-
 	// Stop the services running for this folder and wait for them to finish
 	// stopping to prevent races on restart.
 	tokens := m.folderRunnerTokens[cfg.ID]
-	m.pmut.Unlock()
+
 	m.fmut.Unlock()
+
+	// Close connections to affected devices
+	// Must happen before stopping the folder service to abort ongoing
+	// transmissions and thus allow timely service termination.
+	m.closeConns(cfg.DeviceIDs(), err)
+
 	for _, id := range tokens {
 		m.RemoveAndWait(id, 0)
 	}
+
 	m.fmut.Lock()
-	m.pmut.Lock()
 
 	// Clean up our config maps
 	delete(m.folderCfgs, cfg.ID)
@@ -439,11 +436,6 @@ func (m *model) RestartFolder(from, to config.FolderConfiguration) {
 	restartMut.Lock()
 	defer restartMut.Unlock()
 
-	m.fmut.Lock()
-	m.pmut.Lock()
-	defer m.fmut.Unlock()
-	defer m.pmut.Unlock()
-
 	var infoMsg string
 	var errMsg string
 	switch {
@@ -458,6 +450,9 @@ func (m *model) RestartFolder(from, to config.FolderConfiguration) {
 		errMsg = "restarting"
 	}
 
+	m.fmut.Lock()
+	defer m.fmut.Unlock()
+
 	m.tearDownFolderLocked(from, fmt.Errorf("%v folder %v", errMsg, to.Description()))
 	if !to.Paused {
 		m.addFolderLocked(to)
@@ -1409,22 +1404,23 @@ func (m *model) Closed(conn protocol.Connection, err error) {
 	close(closed)
 }
 
-// close will close the underlying connection for a given device
-func (m *model) close(device protocol.DeviceID, err error) {
+// closeConns will close the underlying connection for given devices
+func (m *model) closeConns(devs []protocol.DeviceID, err error) {
+	conns := make([]connections.Connection, 0, len(devs))
 	m.pmut.Lock()
-	m.closeLocked(device, err)
+	for _, dev := range devs {
+		if conn, ok := m.conn[dev]; ok {
+			conns = append(conns, conn)
+		}
+	}
 	m.pmut.Unlock()
-}
-
-// closeLocked will close the underlying connection for a given device
-func (m *model) closeLocked(device protocol.DeviceID, err error) {
-	conn, ok := m.conn[device]
-	if !ok {
-		// There is no connection to close
-		return
+	for _, conn := range conns {
+		conn.Close(err)
 	}
+}
 
-	conn.Close(err)
+func (m *model) closeConn(dev protocol.DeviceID, err error) {
+	m.closeConns([]protocol.DeviceID{dev}, err)
 }
 
 // Implements protocol.RequestResponse
@@ -2569,12 +2565,12 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
 
 		// Ignored folder was removed, reconnect to retrigger the prompt.
 		if len(fromCfg.IgnoredFolders) > len(toCfg.IgnoredFolders) {
-			m.close(deviceID, errIgnoredFolderRemoved)
+			m.closeConn(deviceID, errIgnoredFolderRemoved)
 		}
 
 		if toCfg.Paused {
 			l.Infoln("Pausing", deviceID)
-			m.close(deviceID, errDevicePaused)
+			m.closeConn(deviceID, errDevicePaused)
 			events.Default.Log(events.DevicePaused, map[string]string{"device": deviceID.String()})
 		} else {
 			events.Default.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()})

+ 18 - 8
lib/protocol/protocol.go

@@ -187,6 +187,7 @@ type rawConnection struct {
 	closed            chan struct{}
 	closeOnce         sync.Once
 	sendCloseOnce     sync.Once
+	wg                sync.WaitGroup
 	compression       Compression
 }
 
@@ -239,6 +240,7 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
 // Start creates the goroutines for sending and receiving of messages. It must
 // be called exactly once after creating a connection.
 func (c *rawConnection) Start() {
+	c.wg.Add(4)
 	go func() {
 		err := c.readerLoop()
 		c.internalClose(err)
@@ -362,13 +364,12 @@ func (c *rawConnection) ping() bool {
 }
 
 func (c *rawConnection) readerLoop() (err error) {
+	defer c.wg.Done()
 	fourByteBuf := make([]byte, 4)
 	state := stateInitial
 	for {
-		select {
-		case <-c.closed:
+		if c.Closed() {
 			return ErrClosed
-		default:
 		}
 
 		msg, err := c.readMessage(fourByteBuf)
@@ -660,6 +661,7 @@ func (c *rawConnection) send(msg message, done chan struct{}) bool {
 }
 
 func (c *rawConnection) writerLoop() {
+	defer c.wg.Done()
 	for {
 		select {
 		case hm := <-c.outbox:
@@ -846,10 +848,7 @@ func (c *rawConnection) Close(err error) {
 		}
 	})
 
-	// No more sends are necessary, therefore further steps to close the
-	// connection outside of this package can proceed immediately.
-	// And this prevents a potential deadlock due to calling c.receiver.Closed
-	go c.internalClose(err)
+	c.internalClose(err)
 }
 
 // internalClose is called if there is an unexpected error during normal operation.
@@ -867,7 +866,14 @@ func (c *rawConnection) internalClose(err error) {
 		}
 		c.awaitingMut.Unlock()
 
-		c.receiver.Closed(c, err)
+		// Wait for all our operations to terminate before signaling
+		// to the receiver that the connection was closed.
+		c.wg.Wait()
+
+		// No more sends are necessary, therefore further steps to close the
+		// connection outside of this package can proceed immediately.
+		// And this prevents a potential deadlock.
+		go c.receiver.Closed(c, err)
 	})
 }
 
@@ -877,6 +883,8 @@ func (c *rawConnection) internalClose(err error) {
 // results in an effecting ping interval of somewhere between
 // PingSendInterval/2 and PingSendInterval.
 func (c *rawConnection) pingSender() {
+	defer c.wg.Done()
+
 	ticker := time.NewTicker(PingSendInterval / 2)
 	defer ticker.Stop()
 
@@ -902,6 +910,8 @@ func (c *rawConnection) pingSender() {
 // but we expect pings in the absence of other messages) within the last
 // ReceiveTimeout. If not, we close the connection with an ErrTimeout.
 func (c *rawConnection) pingReceiver() {
+	defer c.wg.Done()
+
 	ticker := time.NewTicker(ReceiveTimeout / 2)
 	defer ticker.Stop()