Browse Source

Merge pull request #2343 from calmh/discoprio2

Add discovery source priorities (fixes #2339)
Audrius Butkevicius 10 years ago
parent
commit
7476c583e7
3 changed files with 87 additions and 34 deletions
  1. 10 3
      cmd/syncthing/main.go
  2. 54 22
      lib/discover/cache.go
  3. 23 9
      lib/discover/cache_test.go

+ 10 - 3
cmd/syncthing/main.go

@@ -73,6 +73,13 @@ const (
 	pingEventInterval    = time.Minute
 )
 
+// The discovery results are sorted by their source priority.
+const (
+	ipv6LocalDiscoveryPriority = iota
+	ipv4LocalDiscoveryPriority
+	globalDiscoveryPriority
+)
+
 var l = logger.DefaultLogger
 
 func init() {
@@ -703,7 +710,7 @@ func syncthingMain() {
 			// Each global discovery server gets its results cached for five
 			// minutes, and is not asked again for a minute when it's returned
 			// unsuccessfully.
-			cachedDiscovery.Add(gd, 5*time.Minute, time.Minute)
+			cachedDiscovery.Add(gd, 5*time.Minute, time.Minute, globalDiscoveryPriority)
 		}
 	}
 
@@ -713,14 +720,14 @@ func syncthingMain() {
 		if err != nil {
 			l.Warnln("IPv4 local discovery:", err)
 		} else {
-			cachedDiscovery.Add(bcd, 0, 0)
+			cachedDiscovery.Add(bcd, 0, 0, ipv4LocalDiscoveryPriority)
 		}
 		// v6 multicasts
 		mcd, err := discover.NewLocal(myID, cfg.Options().LocalAnnMCAddr, addrList, relaySvc)
 		if err != nil {
 			l.Warnln("IPv6 local discovery:", err)
 		} else {
-			cachedDiscovery.Add(mcd, 0, 0)
+			cachedDiscovery.Add(mcd, 0, 0, ipv6LocalDiscoveryPriority)
 		}
 	}
 

+ 54 - 22
lib/discover/cache.go

@@ -34,6 +34,14 @@ type cachedFinder struct {
 	Finder
 	cacheTime    time.Duration
 	negCacheTime time.Duration
+	priority     int
+}
+
+// A prioritizedAddress is what we use to sort addresses returned from
+// different sources with different priorities.
+type prioritizedAddress struct {
+	priority int
+	addr     string
 }
 
 func NewCachingMux() *CachingMux {
@@ -44,9 +52,9 @@ func NewCachingMux() *CachingMux {
 }
 
 // Add registers a new Finder, with associated cache timeouts.
-func (m *CachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration) {
+func (m *CachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration, priority int) {
 	m.mut.Lock()
-	m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime})
+	m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime, priority})
 	m.caches = append(m.caches, newCache())
 	m.mut.Unlock()
 
@@ -58,6 +66,8 @@ func (m *CachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration) {
 // Lookup attempts to resolve the device ID using any of the added Finders,
 // while obeying the cache settings.
 func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error) {
+	var pdirect []prioritizedAddress
+
 	m.mut.Lock()
 	for i, finder := range m.finders {
 		if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
@@ -67,9 +77,11 @@ func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays
 				// It's a positive, valid entry. Use it.
 				if debug {
 					l.Debugln("cached discovery entry for", deviceID, "at", finder.String())
-					l.Debugln("   ", cacheEntry)
+					l.Debugln("  cache:", cacheEntry)
+				}
+				for _, addr := range cacheEntry.Direct {
+					pdirect = append(pdirect, prioritizedAddress{finder.priority, addr})
 				}
-				direct = append(direct, cacheEntry.Direct...)
 				relays = append(relays, cacheEntry.Relays...)
 				continue
 			}
@@ -90,10 +102,12 @@ func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays
 		if td, tr, err := finder.Lookup(deviceID); err == nil {
 			if debug {
 				l.Debugln("lookup for", deviceID, "at", finder.String())
-				l.Debugln("   ", td)
-				l.Debugln("   ", tr)
+				l.Debugln("  direct:", td)
+				l.Debugln("  relays:", tr)
+			}
+			for _, addr := range td {
+				pdirect = append(pdirect, prioritizedAddress{finder.priority, addr})
 			}
-			direct = append(direct, td...)
 			relays = append(relays, tr...)
 			m.caches[i].Set(deviceID, CacheEntry{
 				Direct: td,
@@ -105,13 +119,15 @@ func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays
 	}
 	m.mut.Unlock()
 
+	direct = uniqueSortedAddrs(pdirect)
+	relays = uniqueSortedRelays(relays)
 	if debug {
 		l.Debugln("lookup results for", deviceID)
-		l.Debugln("   ", direct)
-		l.Debugln("   ", relays)
+		l.Debugln("  direct: ", direct)
+		l.Debugln("  relays: ", relays)
 	}
 
-	return uniqueSortedStrings(direct), uniqueSortedRelays(relays), nil
+	return direct, relays, nil
 }
 
 func (m *CachingMux) String() string {
@@ -198,20 +214,19 @@ func (c *cache) Cache() map[protocol.DeviceID]CacheEntry {
 	return m
 }
 
-func uniqueSortedStrings(ss []string) []string {
-	m := make(map[string]struct{}, len(ss))
+func uniqueSortedAddrs(ss []prioritizedAddress) []string {
+	// We sort the addresses by priority, then filter them based on seen
+	// (first time seen is the on kept, so we retain priority).
+	sort.Sort(prioritizedAddressList(ss))
+	filtered := make([]string, 0, len(ss))
+	seen := make(map[string]struct{}, len(ss))
 	for _, s := range ss {
-		m[s] = struct{}{}
-	}
-
-	var us = make([]string, 0, len(m))
-	for k := range m {
-		us = append(us, k)
+		if _, ok := seen[s.addr]; !ok {
+			filtered = append(filtered, s.addr)
+			seen[s.addr] = struct{}{}
+		}
 	}
-
-	sort.Strings(us)
-
-	return us
+	return filtered
 }
 
 func uniqueSortedRelays(rs []Relay) []Relay {
@@ -243,3 +258,20 @@ func (l relayList) Swap(a, b int) {
 func (l relayList) Less(a, b int) bool {
 	return l[a].URL < l[b].URL
 }
+
+type prioritizedAddressList []prioritizedAddress
+
+func (l prioritizedAddressList) Len() int {
+	return len(l)
+}
+
+func (l prioritizedAddressList) Swap(a, b int) {
+	l[a], l[b] = l[b], l[a]
+}
+
+func (l prioritizedAddressList) Less(a, b int) bool {
+	if l[a].priority != l[b].priority {
+		return l[a].priority < l[b].priority
+	}
+	return l[a].addr < l[b].addr
+}

+ 23 - 9
lib/discover/cache_test.go

@@ -15,7 +15,21 @@ import (
 )
 
 func TestCacheUnique(t *testing.T) {
-	direct := []string{"tcp://192.0.2.42:22000", "tcp://192.0.2.43:22000"}
+	direct0 := []string{"tcp://192.0.2.44:22000", "tcp://192.0.2.42:22000"} // prio 0
+	direct1 := []string{"tcp://192.0.2.43:22000", "tcp://192.0.2.42:22000"} // prio 1
+
+	// what we expect from just direct0
+	direct0Sorted := []string{"tcp://192.0.2.42:22000", "tcp://192.0.2.44:22000"}
+
+	// what we expect from direct0+direct1
+	totalSorted := []string{
+		// first prio 0, sorted
+		"tcp://192.0.2.42:22000", "tcp://192.0.2.44:22000",
+		// then prio 1
+		"tcp://192.0.2.43:22000",
+		// no duplicate .42
+	}
+
 	relays := []Relay{{URL: "relay://192.0.2.44:443"}, {URL: "tcp://192.0.2.45:443"}}
 
 	c := NewCachingMux()
@@ -25,15 +39,15 @@ func TestCacheUnique(t *testing.T) {
 	// Add a fake discovery service and verify we get it's answers through the
 	// cache.
 
-	f1 := &fakeDiscovery{direct, relays}
-	c.Add(f1, time.Minute, 0)
+	f1 := &fakeDiscovery{direct0, relays}
+	c.Add(f1, time.Minute, 0, 0)
 
 	dir, rel, err := c.Lookup(protocol.LocalDeviceID)
 	if err != nil {
 		t.Fatal(err)
 	}
-	if !reflect.DeepEqual(dir, direct) {
-		t.Errorf("Incorrect direct; %+v != %+v", dir, direct)
+	if !reflect.DeepEqual(dir, direct0Sorted) {
+		t.Errorf("Incorrect direct; %+v != %+v", dir, direct0Sorted)
 	}
 	if !reflect.DeepEqual(rel, relays) {
 		t.Errorf("Incorrect relays; %+v != %+v", rel, relays)
@@ -42,15 +56,15 @@ func TestCacheUnique(t *testing.T) {
 	// Add one more that answers in the same way and check that we don't
 	// duplicate or otherwise mess up the responses now.
 
-	f2 := &fakeDiscovery{direct, relays}
-	c.Add(f2, time.Minute, 0)
+	f2 := &fakeDiscovery{direct1, relays}
+	c.Add(f2, time.Minute, 0, 1)
 
 	dir, rel, err = c.Lookup(protocol.LocalDeviceID)
 	if err != nil {
 		t.Fatal(err)
 	}
-	if !reflect.DeepEqual(dir, direct) {
-		t.Errorf("Incorrect direct; %+v != %+v", dir, direct)
+	if !reflect.DeepEqual(dir, totalSorted) {
+		t.Errorf("Incorrect direct; %+v != %+v", dir, totalSorted)
 	}
 	if !reflect.DeepEqual(rel, relays) {
 		t.Errorf("Incorrect relays; %+v != %+v", rel, relays)