Browse Source

lib/connections: Trust the model to tell us if we are connected

This should address issue as described in https://forum.syncthing.net/t/stun-nig-party-with-paused-devices/10942/13
Essentially the model and the connection service goes out of sync in terms of thinking if we are connected or not.
Resort to model as being the ultimate source of truth.

I can't immediately pin down how this happens, yet some ideas.

ConfigSaved happens in separate routine, so it's possbile that we have some sort of device removed yet connection comes in parallel kind of thing.
However, in this case the connection exists in the model, and does not exist in the connection service and the only way for the connection to be removed
in the connection service is device removal from the config.

Given the subject, this might also be related to the device being paused.

Also, adds more info to the logs

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4533
Audrius Butkevicius 8 years ago
parent
commit
44a542391e

+ 3 - 2
cmd/syncthing/gui.go

@@ -26,6 +26,7 @@ import (
 
 
 	"github.com/rcrowley/go-metrics"
 	"github.com/rcrowley/go-metrics"
 	"github.com/syncthing/syncthing/lib/config"
 	"github.com/syncthing/syncthing/lib/config"
+	"github.com/syncthing/syncthing/lib/connections"
 	"github.com/syncthing/syncthing/lib/db"
 	"github.com/syncthing/syncthing/lib/db"
 	"github.com/syncthing/syncthing/lib/discover"
 	"github.com/syncthing/syncthing/lib/discover"
 	"github.com/syncthing/syncthing/lib/events"
 	"github.com/syncthing/syncthing/lib/events"
@@ -98,7 +99,7 @@ type modelIntf interface {
 	ScanFolders() map[string]error
 	ScanFolders() map[string]error
 	ScanFolderSubdirs(folder string, subs []string) error
 	ScanFolderSubdirs(folder string, subs []string) error
 	BringToFront(folder, file string)
 	BringToFront(folder, file string)
-	ConnectedTo(deviceID protocol.DeviceID) bool
+	Connection(deviceID protocol.DeviceID) (connections.Connection, bool)
 	GlobalSize(folder string) db.Counts
 	GlobalSize(folder string) db.Counts
 	LocalSize(folder string) db.Counts
 	LocalSize(folder string) db.Counts
 	CurrentSequence(folder string) (int64, bool)
 	CurrentSequence(folder string) (int64, bool)
@@ -1259,7 +1260,7 @@ func (s *apiService) getPeerCompletion(w http.ResponseWriter, r *http.Request) {
 	for _, folder := range s.cfg.Folders() {
 	for _, folder := range s.cfg.Folders() {
 		for _, device := range folder.DeviceIDs() {
 		for _, device := range folder.DeviceIDs() {
 			deviceStr := device.String()
 			deviceStr := device.String()
-			if s.model.ConnectedTo(device) {
+			if _, ok := s.model.Connection(device); ok {
 				tot[deviceStr] += s.model.Completion(device, folder.ID).CompletionPct
 				tot[deviceStr] += s.model.Completion(device, folder.ID).CompletionPct
 			} else {
 			} else {
 				tot[deviceStr] = 0
 				tot[deviceStr] = 0

+ 3 - 2
cmd/syncthing/mocked_model_test.go

@@ -9,6 +9,7 @@ package main
 import (
 import (
 	"time"
 	"time"
 
 
+	"github.com/syncthing/syncthing/lib/connections"
 	"github.com/syncthing/syncthing/lib/db"
 	"github.com/syncthing/syncthing/lib/db"
 	"github.com/syncthing/syncthing/lib/model"
 	"github.com/syncthing/syncthing/lib/model"
 	"github.com/syncthing/syncthing/lib/protocol"
 	"github.com/syncthing/syncthing/lib/protocol"
@@ -91,8 +92,8 @@ func (m *mockedModel) ScanFolderSubdirs(folder string, subs []string) error {
 
 
 func (m *mockedModel) BringToFront(folder, file string) {}
 func (m *mockedModel) BringToFront(folder, file string) {}
 
 
-func (m *mockedModel) ConnectedTo(deviceID protocol.DeviceID) bool {
-	return false
+func (m *mockedModel) Connection(deviceID protocol.DeviceID) (connections.Connection, bool) {
+	return nil, false
 }
 }
 
 
 func (m *mockedModel) GlobalSize(folder string) db.Counts {
 func (m *mockedModel) GlobalSize(folder string) db.Counts {

+ 1 - 1
cmd/syncthing/summaryservice.go

@@ -198,7 +198,7 @@ func (c *folderSummaryService) sendSummary(folder string) {
 			// We already know about ourselves.
 			// We already know about ourselves.
 			continue
 			continue
 		}
 		}
-		if !c.model.ConnectedTo(devCfg.DeviceID) {
+		if _, ok := c.model.Connection(devCfg.DeviceID); !ok {
 			// We're not interested in disconnected devices.
 			// We're not interested in disconnected devices.
 			continue
 			continue
 		}
 		}

+ 10 - 30
lib/connections/service.go

@@ -89,9 +89,6 @@ type Service struct {
 	listeners          map[string]genericListener
 	listeners          map[string]genericListener
 	listenerTokens     map[string]suture.ServiceToken
 	listenerTokens     map[string]suture.ServiceToken
 	listenerSupervisor *suture.Supervisor
 	listenerSupervisor *suture.Supervisor
-
-	curConMut         sync.Mutex
-	currentConnection map[protocol.DeviceID]completeConn
 }
 }
 
 
 func NewService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *tls.Config, discoverer discover.Finder,
 func NewService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *tls.Config, discoverer discover.Finder,
@@ -129,9 +126,6 @@ func NewService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *
 			FailureThreshold: 2,
 			FailureThreshold: 2,
 			FailureBackoff:   600 * time.Second,
 			FailureBackoff:   600 * time.Second,
 		}),
 		}),
-
-		curConMut:         sync.NewMutex(),
-		currentConnection: make(map[protocol.DeviceID]completeConn),
 	}
 	}
 	cfg.Subscribe(service)
 	cfg.Subscribe(service)
 
 
@@ -228,15 +222,11 @@ next:
 
 
 		// If we have a relay connection, and the new incoming connection is
 		// If we have a relay connection, and the new incoming connection is
 		// not a relay connection, we should drop that, and prefer this one.
 		// not a relay connection, we should drop that, and prefer this one.
-		connected := s.model.ConnectedTo(remoteID)
-		s.curConMut.Lock()
-		ct, ok := s.currentConnection[remoteID]
-		s.curConMut.Unlock()
-		priorityKnown := ok && connected
+		ct, connected := s.model.Connection(remoteID)
 
 
 		// Lower priority is better, just like nice etc.
 		// Lower priority is better, just like nice etc.
-		if priorityKnown && ct.internalConn.priority > c.priority {
-			l.Debugln("Switching connections", remoteID)
+		if connected && ct.Priority() > c.priority {
+			l.Debugf("Switching connections %s (existing: %s new: %s)", remoteID, ct, c)
 		} else if connected {
 		} else if connected {
 			// We should not already be connected to the other party. TODO: This
 			// We should not already be connected to the other party. TODO: This
 			// could use some better handling. If the old connection is dead but
 			// could use some better handling. If the old connection is dead but
@@ -244,7 +234,7 @@ next:
 			// this one. But in case we are two devices connecting to each other
 			// this one. But in case we are two devices connecting to each other
 			// in parallel we don't want to do that or we end up with no
 			// in parallel we don't want to do that or we end up with no
 			// connections still established...
 			// connections still established...
-			l.Infof("Connected to already connected device (%s)", remoteID)
+			l.Infof("Connected to already connected device %s (existing: %s new: %s)", remoteID, ct, c)
 			c.Close()
 			c.Close()
 			continue
 			continue
 		}
 		}
@@ -284,9 +274,6 @@ next:
 		l.Infof("Established secure connection to %s at %s (%s)", remoteID, name, tlsCipherSuiteNames[c.ConnectionState().CipherSuite])
 		l.Infof("Established secure connection to %s at %s (%s)", remoteID, name, tlsCipherSuiteNames[c.ConnectionState().CipherSuite])
 
 
 		s.model.AddConnection(modelConn, hello)
 		s.model.AddConnection(modelConn, hello)
-		s.curConMut.Lock()
-		s.currentConnection[remoteID] = modelConn
-		s.curConMut.Unlock()
 		continue next
 		continue next
 	}
 	}
 }
 }
@@ -329,19 +316,13 @@ func (s *Service) connect() {
 				continue
 				continue
 			}
 			}
 
 
-			connected := s.model.ConnectedTo(deviceID)
-			s.curConMut.Lock()
-			ct, ok := s.currentConnection[deviceID]
-			s.curConMut.Unlock()
-			priorityKnown := ok && connected
+			ct, connected := s.model.Connection(deviceID)
 
 
-			if priorityKnown && ct.internalConn.priority == bestDialerPrio {
+			if connected && ct.Priority() == bestDialerPrio {
 				// Things are already as good as they can get.
 				// Things are already as good as they can get.
 				continue
 				continue
 			}
 			}
 
 
-			l.Debugln("Reconnect loop for", deviceID)
-
 			var addrs []string
 			var addrs []string
 			for _, addr := range deviceCfg.Addresses {
 			for _, addr := range deviceCfg.Addresses {
 				if addr == "dynamic" {
 				if addr == "dynamic" {
@@ -355,6 +336,8 @@ func (s *Service) connect() {
 				}
 				}
 			}
 			}
 
 
+			l.Debugln("Reconnect loop for", deviceID, addrs)
+
 			seen = append(seen, addrs...)
 			seen = append(seen, addrs...)
 
 
 			dialTargets := make([]dialTarget, 0)
 			dialTargets := make([]dialTarget, 0)
@@ -394,8 +377,8 @@ func (s *Service) connect() {
 
 
 				prio := dialerFactory.Priority()
 				prio := dialerFactory.Priority()
 
 
-				if priorityKnown && prio >= ct.internalConn.priority {
-					l.Debugf("Not dialing using %s as priority is less than current connection (%d >= %d)", dialerFactory, dialerFactory.Priority(), ct.internalConn.priority)
+				if connected && prio >= ct.Priority() {
+					l.Debugf("Not dialing using %s as priority is less than current connection (%d >= %d)", dialerFactory, dialerFactory.Priority(), ct.Priority())
 					continue
 					continue
 				}
 				}
 
 
@@ -492,9 +475,6 @@ func (s *Service) CommitConfiguration(from, to config.Configuration) bool {
 
 
 	for _, dev := range from.Devices {
 	for _, dev := range from.Devices {
 		if !newDevices[dev.DeviceID] {
 		if !newDevices[dev.DeviceID] {
-			s.curConMut.Lock()
-			delete(s.currentConnection, dev.DeviceID)
-			s.curConMut.Unlock()
 			warningLimitersMut.Lock()
 			warningLimitersMut.Lock()
 			delete(warningLimiters, dev.DeviceID)
 			delete(warningLimiters, dev.DeviceID)
 			warningLimitersMut.Unlock()
 			warningLimitersMut.Unlock()

+ 6 - 1
lib/connections/structs.go

@@ -27,6 +27,7 @@ type Connection interface {
 	Type() string
 	Type() string
 	Transport() string
 	Transport() string
 	RemoteAddr() net.Addr
 	RemoteAddr() net.Addr
+	Priority() int
 }
 }
 
 
 // completeConn is the aggregation of an internalConn and the
 // completeConn is the aggregation of an internalConn and the
@@ -92,6 +93,10 @@ func (c internalConn) Type() string {
 	return c.connType.String()
 	return c.connType.String()
 }
 }
 
 
+func (c internalConn) Priority() int {
+	return c.priority
+}
+
 func (c internalConn) Transport() string {
 func (c internalConn) Transport() string {
 	transport := c.connType.Transport()
 	transport := c.connType.Transport()
 	host, _, err := net.SplitHostPort(c.LocalAddr().String())
 	host, _, err := net.SplitHostPort(c.LocalAddr().String())
@@ -152,7 +157,7 @@ type genericListener interface {
 type Model interface {
 type Model interface {
 	protocol.Model
 	protocol.Model
 	AddConnection(conn Connection, hello protocol.HelloResult)
 	AddConnection(conn Connection, hello protocol.HelloResult)
-	ConnectedTo(remoteID protocol.DeviceID) bool
+	Connection(remoteID protocol.DeviceID) (Connection, bool)
 	OnHello(protocol.DeviceID, net.Addr, protocol.HelloResult) error
 	OnHello(protocol.DeviceID, net.Addr, protocol.HelloResult) error
 	GetHello(protocol.DeviceID) protocol.HelloIntf
 	GetHello(protocol.DeviceID) protocol.HelloIntf
 }
 }

+ 4 - 4
lib/model/model.go

@@ -1330,15 +1330,15 @@ func (cf cFiler) CurrentFile(file string) (protocol.FileInfo, bool) {
 	return cf.m.CurrentFolderFile(cf.r, file)
 	return cf.m.CurrentFolderFile(cf.r, file)
 }
 }
 
 
-// ConnectedTo returns true if we are connected to the named device.
-func (m *Model) ConnectedTo(deviceID protocol.DeviceID) bool {
+// Connection returns the current connection for device, and a boolean wether a connection was found.
+func (m *Model) Connection(deviceID protocol.DeviceID) (connections.Connection, bool) {
 	m.pmut.RLock()
 	m.pmut.RLock()
-	_, ok := m.conn[deviceID]
+	cn, ok := m.conn[deviceID]
 	m.pmut.RUnlock()
 	m.pmut.RUnlock()
 	if ok {
 	if ok {
 		m.deviceWasSeen(deviceID)
 		m.deviceWasSeen(deviceID)
 	}
 	}
-	return ok
+	return cn, ok
 }
 }
 
 
 func (m *Model) GetIgnores(folder string) ([]string, []string, error) {
 func (m *Model) GetIgnores(folder string) ([]string, []string, error) {

+ 3 - 0
lib/model/model_test.go

@@ -311,6 +311,9 @@ func (f *fakeConnection) Type() string {
 func (f *fakeConnection) Transport() string {
 func (f *fakeConnection) Transport() string {
 	return "fake"
 	return "fake"
 }
 }
+func (f *fakeConnection) Priority() int {
+	return 9000
+}
 
 
 func (f *fakeConnection) DownloadProgress(folder string, updates []protocol.FileDownloadProgressUpdate) {
 func (f *fakeConnection) DownloadProgress(folder string, updates []protocol.FileDownloadProgressUpdate) {
 	f.downloadProgressMessages = append(f.downloadProgressMessages, downloadProgressMessage{
 	f.downloadProgressMessages = append(f.downloadProgressMessages, downloadProgressMessage{

+ 1 - 1
lib/model/rwfolder.go

@@ -399,7 +399,7 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
 
 
 			devices := folderFiles.Availability(file.Name)
 			devices := folderFiles.Availability(file.Name)
 			for _, dev := range devices {
 			for _, dev := range devices {
-				if f.model.ConnectedTo(dev) {
+				if _, ok := f.model.Connection(dev); ok {
 					f.queue.Push(file.Name, file.Size, file.ModTime())
 					f.queue.Push(file.Name, file.Size, file.ModTime())
 					changed++
 					changed++
 					return true
 					return true