manager.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. // Copyright (C) 2020 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. //go:generate counterfeiter -o mocks/manager.go --fake-name Manager . Manager
  7. package discover
  8. import (
  9. "context"
  10. "crypto/tls"
  11. "fmt"
  12. "sort"
  13. "time"
  14. "github.com/thejerf/suture/v4"
  15. "github.com/syncthing/syncthing/lib/config"
  16. "github.com/syncthing/syncthing/lib/events"
  17. "github.com/syncthing/syncthing/lib/protocol"
  18. "github.com/syncthing/syncthing/lib/svcutil"
  19. "github.com/syncthing/syncthing/lib/sync"
  20. "github.com/syncthing/syncthing/lib/util"
  21. )
  22. // The Manager aggregates results from multiple Finders. Each Finder has
  23. // an associated cache time and negative cache time. The cache time sets how
  24. // long we cache and return successful lookup results, the negative cache
  25. // time sets how long we refrain from asking about the same device ID after
  26. // receiving a negative answer. The value of zero disables caching (positive
  27. // or negative).
  28. type Manager interface {
  29. FinderService
  30. ChildErrors() map[string]error
  31. }
  32. type manager struct {
  33. *suture.Supervisor
  34. myID protocol.DeviceID
  35. cfg config.Wrapper
  36. cert tls.Certificate
  37. evLogger events.Logger
  38. addressLister AddressLister
  39. finders map[string]cachedFinder
  40. mut sync.RWMutex
  41. }
  42. func NewManager(myID protocol.DeviceID, cfg config.Wrapper, cert tls.Certificate, evLogger events.Logger, lister AddressLister) Manager {
  43. m := &manager{
  44. Supervisor: suture.New("discover.Manager", svcutil.SpecWithDebugLogger(l)),
  45. myID: myID,
  46. cfg: cfg,
  47. cert: cert,
  48. evLogger: evLogger,
  49. addressLister: lister,
  50. finders: make(map[string]cachedFinder),
  51. mut: sync.NewRWMutex(),
  52. }
  53. m.Add(svcutil.AsService(m.serve, m.String()))
  54. return m
  55. }
  56. func (m *manager) serve(ctx context.Context) error {
  57. m.cfg.Subscribe(m)
  58. m.CommitConfiguration(config.Configuration{}, m.cfg.RawCopy())
  59. <-ctx.Done()
  60. m.cfg.Unsubscribe(m)
  61. return nil
  62. }
  63. func (m *manager) addLocked(identity string, finder Finder, cacheTime, negCacheTime time.Duration) {
  64. entry := cachedFinder{
  65. Finder: finder,
  66. cacheTime: cacheTime,
  67. negCacheTime: negCacheTime,
  68. cache: newCache(),
  69. token: nil,
  70. }
  71. if service, ok := finder.(suture.Service); ok {
  72. token := m.Supervisor.Add(service)
  73. entry.token = &token
  74. }
  75. m.finders[identity] = entry
  76. l.Infoln("Using discovery mechanism:", identity)
  77. }
  78. func (m *manager) removeLocked(identity string) {
  79. entry, ok := m.finders[identity]
  80. if !ok {
  81. return
  82. }
  83. if entry.token != nil {
  84. err := m.Supervisor.Remove(*entry.token)
  85. if err != nil {
  86. l.Warnf("removing discovery %s: %s", identity, err)
  87. }
  88. }
  89. delete(m.finders, identity)
  90. l.Infoln("Stopped using discovery mechanism: ", identity)
  91. }
  92. // Lookup attempts to resolve the device ID using any of the added Finders,
  93. // while obeying the cache settings.
  94. func (m *manager) Lookup(ctx context.Context, deviceID protocol.DeviceID) (addresses []string, err error) {
  95. m.mut.RLock()
  96. for _, finder := range m.finders {
  97. if cacheEntry, ok := finder.cache.Get(deviceID); ok {
  98. // We have a cache entry. Lets see what it says.
  99. if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime {
  100. // It's a positive, valid entry. Use it.
  101. l.Debugln("cached discovery entry for", deviceID, "at", finder)
  102. l.Debugln(" cache:", cacheEntry)
  103. addresses = append(addresses, cacheEntry.Addresses...)
  104. continue
  105. }
  106. valid := time.Now().Before(cacheEntry.validUntil) || time.Since(cacheEntry.when) < finder.negCacheTime
  107. if !cacheEntry.found && valid {
  108. // It's a negative, valid entry. We should not make another
  109. // attempt right now.
  110. l.Debugln("negative cache entry for", deviceID, "at", finder, "valid until", cacheEntry.when.Add(finder.negCacheTime), "or", cacheEntry.validUntil)
  111. continue
  112. }
  113. // It's expired. Ignore and continue.
  114. }
  115. // Perform the actual lookup and cache the result.
  116. if addrs, err := finder.Lookup(ctx, deviceID); err == nil {
  117. l.Debugln("lookup for", deviceID, "at", finder)
  118. l.Debugln(" addresses:", addrs)
  119. addresses = append(addresses, addrs...)
  120. finder.cache.Set(deviceID, CacheEntry{
  121. Addresses: addrs,
  122. when: time.Now(),
  123. found: len(addrs) > 0,
  124. })
  125. } else {
  126. // Lookup returned error, add a negative cache entry.
  127. entry := CacheEntry{
  128. when: time.Now(),
  129. found: false,
  130. }
  131. if err, ok := err.(cachedError); ok {
  132. entry.validUntil = time.Now().Add(err.CacheFor())
  133. }
  134. finder.cache.Set(deviceID, entry)
  135. }
  136. }
  137. m.mut.RUnlock()
  138. addresses = util.UniqueTrimmedStrings(addresses)
  139. sort.Strings(addresses)
  140. l.Debugln("lookup results for", deviceID)
  141. l.Debugln(" addresses: ", addresses)
  142. return addresses, nil
  143. }
  144. func (m *manager) String() string {
  145. return "discovery cache"
  146. }
  147. func (m *manager) Error() error {
  148. return nil
  149. }
  150. func (m *manager) ChildErrors() map[string]error {
  151. children := make(map[string]error, len(m.finders))
  152. m.mut.RLock()
  153. for _, f := range m.finders {
  154. children[f.String()] = f.Error()
  155. }
  156. m.mut.RUnlock()
  157. return children
  158. }
  159. func (m *manager) Cache() map[protocol.DeviceID]CacheEntry {
  160. // Res will be the "total" cache, i.e. the union of our cache and all our
  161. // children's caches.
  162. res := make(map[protocol.DeviceID]CacheEntry)
  163. m.mut.RLock()
  164. for _, finder := range m.finders {
  165. // Each finder[i] has a corresponding cache. Go through
  166. // it and populate the total, appending any addresses and keeping
  167. // the newest "when" time. We skip any negative cache finders.
  168. for k, v := range finder.cache.Cache() {
  169. if v.found {
  170. cur := res[k]
  171. if v.when.After(cur.when) {
  172. cur.when = v.when
  173. }
  174. cur.Addresses = append(cur.Addresses, v.Addresses...)
  175. res[k] = cur
  176. }
  177. }
  178. // Then ask the finder itself for its cache and do the same. If this
  179. // finder is a global discovery client, it will have no cache. If it's
  180. // a local discovery client, this will be its current state.
  181. for k, v := range finder.Cache() {
  182. if v.found {
  183. cur := res[k]
  184. if v.when.After(cur.when) {
  185. cur.when = v.when
  186. }
  187. cur.Addresses = append(cur.Addresses, v.Addresses...)
  188. res[k] = cur
  189. }
  190. }
  191. }
  192. m.mut.RUnlock()
  193. for k, v := range res {
  194. v.Addresses = util.UniqueTrimmedStrings(v.Addresses)
  195. res[k] = v
  196. }
  197. return res
  198. }
  199. func (m *manager) VerifyConfiguration(_, _ config.Configuration) error {
  200. return nil
  201. }
  202. func (m *manager) CommitConfiguration(_, to config.Configuration) (handled bool) {
  203. m.mut.Lock()
  204. defer m.mut.Unlock()
  205. toIdentities := make(map[string]struct{})
  206. if to.Options.GlobalAnnEnabled {
  207. for _, srv := range to.Options.GlobalDiscoveryServers() {
  208. toIdentities[globalDiscoveryIdentity(srv)] = struct{}{}
  209. }
  210. }
  211. if to.Options.LocalAnnEnabled {
  212. toIdentities[ipv4Identity(to.Options.LocalAnnPort)] = struct{}{}
  213. toIdentities[ipv6Identity(to.Options.LocalAnnMCAddr)] = struct{}{}
  214. }
  215. // Remove things that we're not expected to have.
  216. for identity := range m.finders {
  217. if _, ok := toIdentities[identity]; !ok {
  218. m.removeLocked(identity)
  219. }
  220. }
  221. // Add things we don't have.
  222. if to.Options.GlobalAnnEnabled {
  223. for _, srv := range to.Options.GlobalDiscoveryServers() {
  224. identity := globalDiscoveryIdentity(srv)
  225. // Skip, if it's already running.
  226. if _, ok := m.finders[identity]; ok {
  227. continue
  228. }
  229. gd, err := NewGlobal(srv, m.cert, m.addressLister, m.evLogger)
  230. if err != nil {
  231. l.Warnln("Global discovery:", err)
  232. continue
  233. }
  234. // Each global discovery server gets its results cached for five
  235. // minutes, and is not asked again for a minute when it's returned
  236. // unsuccessfully.
  237. m.addLocked(identity, gd, 5*time.Minute, time.Minute)
  238. }
  239. }
  240. if to.Options.LocalAnnEnabled {
  241. // v4 broadcasts
  242. v4Identity := ipv4Identity(to.Options.LocalAnnPort)
  243. if _, ok := m.finders[v4Identity]; !ok {
  244. bcd, err := NewLocal(m.myID, fmt.Sprintf(":%d", to.Options.LocalAnnPort), m.addressLister, m.evLogger)
  245. if err != nil {
  246. l.Warnln("IPv4 local discovery:", err)
  247. } else {
  248. m.addLocked(v4Identity, bcd, 0, 0)
  249. }
  250. }
  251. // v6 multicasts
  252. v6Identity := ipv6Identity(to.Options.LocalAnnMCAddr)
  253. if _, ok := m.finders[v6Identity]; !ok {
  254. mcd, err := NewLocal(m.myID, to.Options.LocalAnnMCAddr, m.addressLister, m.evLogger)
  255. if err != nil {
  256. l.Warnln("IPv6 local discovery:", err)
  257. } else {
  258. m.addLocked(v6Identity, mcd, 0, 0)
  259. }
  260. }
  261. }
  262. return true
  263. }