| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- // Copyright (C) 2015 The Syncthing Authors.
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
- // You can obtain one at http://mozilla.org/MPL/2.0/.
- package discover
- import (
- "sort"
- stdsync "sync"
- "time"
- "github.com/syncthing/syncthing/lib/protocol"
- "github.com/syncthing/syncthing/lib/sync"
- "github.com/thejerf/suture"
- )
- // The CachingMux aggregates results from multiple Finders. Each Finder has
- // an associated cache time and negative cache time. The cache time sets how
- // long we cache and return successfull lookup results, the negative cache
- // time sets how long we refrain from asking about the same device ID after
- // receiving a negative answer. The value of zero disables caching (positive
- // or negative).
- type CachingMux struct {
- *suture.Supervisor
- finders []cachedFinder
- caches []*cache
- mut sync.Mutex
- }
- // A cachedFinder is a Finder with associated cache timeouts.
- type cachedFinder struct {
- Finder
- cacheTime time.Duration
- negCacheTime time.Duration
- priority int
- }
- // A prioritizedAddress is what we use to sort addresses returned from
- // different sources with different priorities.
- type prioritizedAddress struct {
- priority int
- addr string
- }
- // An error may implement cachedError, in which case it will be interrogated
- // to see how long we should cache the error. This overrides the default
- // negative cache time.
- type cachedError interface {
- CacheFor() time.Duration
- }
- func NewCachingMux() *CachingMux {
- return &CachingMux{
- Supervisor: suture.NewSimple("discover.cachingMux"),
- mut: sync.NewMutex(),
- }
- }
- // Add registers a new Finder, with associated cache timeouts.
- func (m *CachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration, priority int) {
- m.mut.Lock()
- m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime, priority})
- m.caches = append(m.caches, newCache())
- m.mut.Unlock()
- if service, ok := finder.(suture.Service); ok {
- m.Supervisor.Add(service)
- }
- }
- // Lookup attempts to resolve the device ID using any of the added Finders,
- // while obeying the cache settings.
- func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error) {
- var pdirect []prioritizedAddress
- m.mut.Lock()
- for i, finder := range m.finders {
- if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
- // We have a cache entry. Lets see what it says.
- if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime {
- // It's a positive, valid entry. Use it.
- l.Debugln("cached discovery entry for", deviceID, "at", finder)
- l.Debugln(" cache:", cacheEntry)
- for _, addr := range cacheEntry.Direct {
- pdirect = append(pdirect, prioritizedAddress{finder.priority, addr})
- }
- relays = append(relays, cacheEntry.Relays...)
- continue
- }
- valid := time.Now().Before(cacheEntry.validUntil) || time.Since(cacheEntry.when) < finder.negCacheTime
- if !cacheEntry.found && valid {
- // It's a negative, valid entry. We should not make another
- // attempt right now.
- l.Debugln("negative cache entry for", deviceID, "at", finder, "valid until", cacheEntry.when.Add(finder.negCacheTime), "or", cacheEntry.validUntil)
- continue
- }
- // It's expired. Ignore and continue.
- }
- // Perform the actual lookup and cache the result.
- if td, tr, err := finder.Lookup(deviceID); err == nil {
- l.Debugln("lookup for", deviceID, "at", finder)
- l.Debugln(" direct:", td)
- l.Debugln(" relays:", tr)
- for _, addr := range td {
- pdirect = append(pdirect, prioritizedAddress{finder.priority, addr})
- }
- relays = append(relays, tr...)
- m.caches[i].Set(deviceID, CacheEntry{
- Direct: td,
- Relays: tr,
- when: time.Now(),
- found: len(td)+len(tr) > 0,
- })
- } else {
- // Lookup returned error, add a negative cache entry.
- entry := CacheEntry{
- when: time.Now(),
- found: false,
- }
- if err, ok := err.(cachedError); ok {
- entry.validUntil = time.Now().Add(err.CacheFor())
- }
- m.caches[i].Set(deviceID, entry)
- }
- }
- m.mut.Unlock()
- direct = uniqueSortedAddrs(pdirect)
- relays = uniqueSortedRelays(relays)
- l.Debugln("lookup results for", deviceID)
- l.Debugln(" direct: ", direct)
- l.Debugln(" relays: ", relays)
- return direct, relays, nil
- }
- func (m *CachingMux) String() string {
- return "discovery cache"
- }
- func (m *CachingMux) Error() error {
- return nil
- }
- func (m *CachingMux) ChildErrors() map[string]error {
- m.mut.Lock()
- children := make(map[string]error, len(m.finders))
- for _, f := range m.finders {
- children[f.String()] = f.Error()
- }
- m.mut.Unlock()
- return children
- }
- func (m *CachingMux) Cache() map[protocol.DeviceID]CacheEntry {
- // Res will be the "total" cache, i.e. the union of our cache and all our
- // children's caches.
- res := make(map[protocol.DeviceID]CacheEntry)
- m.mut.Lock()
- for i := range m.finders {
- // Each finder[i] has a corresponding cache at cache[i]. Go through it
- // and populate the total, if it's newer than what's already in there.
- // We skip any negative cache entries.
- for k, v := range m.caches[i].Cache() {
- if v.found && v.when.After(res[k].when) {
- res[k] = v
- }
- }
- // Then ask the finder itself for it's cache and do the same. If this
- // finder is a global discovery client, it will have no cache. If it's
- // a local discovery client, this will be it's current state.
- for k, v := range m.finders[i].Cache() {
- if v.found && v.when.After(res[k].when) {
- res[k] = v
- }
- }
- }
- m.mut.Unlock()
- return res
- }
- // A cache can be embedded wherever useful
- type cache struct {
- entries map[protocol.DeviceID]CacheEntry
- mut stdsync.Mutex
- }
- func newCache() *cache {
- return &cache{
- entries: make(map[protocol.DeviceID]CacheEntry),
- }
- }
- func (c *cache) Set(id protocol.DeviceID, ce CacheEntry) {
- c.mut.Lock()
- c.entries[id] = ce
- c.mut.Unlock()
- }
- func (c *cache) Get(id protocol.DeviceID) (CacheEntry, bool) {
- c.mut.Lock()
- ce, ok := c.entries[id]
- c.mut.Unlock()
- return ce, ok
- }
- func (c *cache) Cache() map[protocol.DeviceID]CacheEntry {
- c.mut.Lock()
- m := make(map[protocol.DeviceID]CacheEntry, len(c.entries))
- for k, v := range c.entries {
- m[k] = v
- }
- c.mut.Unlock()
- return m
- }
- func uniqueSortedAddrs(ss []prioritizedAddress) []string {
- // We sort the addresses by priority, then filter them based on seen
- // (first time seen is the on kept, so we retain priority).
- sort.Sort(prioritizedAddressList(ss))
- filtered := make([]string, 0, len(ss))
- seen := make(map[string]struct{}, len(ss))
- for _, s := range ss {
- if _, ok := seen[s.addr]; !ok {
- filtered = append(filtered, s.addr)
- seen[s.addr] = struct{}{}
- }
- }
- return filtered
- }
- func uniqueSortedRelays(rs []Relay) []Relay {
- m := make(map[string]Relay, len(rs))
- for _, r := range rs {
- m[r.URL] = r
- }
- var ur = make([]Relay, 0, len(m))
- for _, r := range m {
- ur = append(ur, r)
- }
- sort.Sort(relayList(ur))
- return ur
- }
- type relayList []Relay
- func (l relayList) Len() int {
- return len(l)
- }
- func (l relayList) Swap(a, b int) {
- l[a], l[b] = l[b], l[a]
- }
- func (l relayList) Less(a, b int) bool {
- return l[a].URL < l[b].URL
- }
- type prioritizedAddressList []prioritizedAddress
- func (l prioritizedAddressList) Len() int {
- return len(l)
- }
- func (l prioritizedAddressList) Swap(a, b int) {
- l[a], l[b] = l[b], l[a]
- }
- func (l prioritizedAddressList) Less(a, b int) bool {
- if l[a].priority != l[b].priority {
- return l[a].priority < l[b].priority
- }
- return l[a].addr < l[b].addr
- }
|