manager.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. // Copyright (C) 2020 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. //go:generate go tool counterfeiter -o mocks/manager.go --fake-name Manager . Manager
  7. package discover
import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"log/slog"
	"slices"
	"sync"
	"time"

	"github.com/thejerf/suture/v4"

	"github.com/syncthing/syncthing/internal/slogutil"
	"github.com/syncthing/syncthing/lib/config"
	"github.com/syncthing/syncthing/lib/connections/registry"
	"github.com/syncthing/syncthing/lib/events"
	"github.com/syncthing/syncthing/lib/protocol"
	"github.com/syncthing/syncthing/lib/stringutil"
	"github.com/syncthing/syncthing/lib/svcutil"
)
// The Manager aggregates results from multiple Finders. Each Finder has
// an associated cache time and negative cache time. The cache time sets how
// long we cache and return successful lookup results, the negative cache
// time sets how long we refrain from asking about the same device ID after
// receiving a negative answer. The value of zero disables caching (positive
// or negative).
type Manager interface {
	FinderService
	// ChildErrors returns the current error of every managed Finder, keyed
	// by the Finder's string representation.
	ChildErrors() map[string]error
}
// manager is the Manager implementation in this file. It embeds a suture
// supervisor, to which it adds its own serve loop as well as every Finder
// that also implements suture.Service.
type manager struct {
	*suture.Supervisor
	// myID is handed to NewLocal when local discovery finders are created.
	myID protocol.DeviceID
	// cfg is the configuration wrapper the manager subscribes to in serve.
	cfg config.Wrapper
	// cert is handed to NewGlobal when global discovery finders are created.
	cert tls.Certificate
	// evLogger is handed to both NewGlobal and NewLocal.
	evLogger events.Logger
	// addressLister is handed to both NewGlobal and NewLocal.
	addressLister AddressLister
	// registry is handed to NewGlobal.
	registry *registry.Registry
	// finders holds the currently active finders, keyed by identity string.
	finders map[string]cachedFinder
	// mut guards finders: CommitConfiguration takes the write lock, while
	// Lookup, Cache and ChildErrors take the read lock.
	mut sync.RWMutex
}
  46. func NewManager(myID protocol.DeviceID, cfg config.Wrapper, cert tls.Certificate, evLogger events.Logger, lister AddressLister, registry *registry.Registry) Manager {
  47. m := &manager{
  48. Supervisor: suture.New("discover.Manager", svcutil.SpecWithDebugLogger()),
  49. myID: myID,
  50. cfg: cfg,
  51. cert: cert,
  52. evLogger: evLogger,
  53. addressLister: lister,
  54. registry: registry,
  55. finders: make(map[string]cachedFinder),
  56. }
  57. m.Add(svcutil.AsService(m.serve, m.String()))
  58. return m
  59. }
  60. func (m *manager) serve(ctx context.Context) error {
  61. m.cfg.Subscribe(m)
  62. m.CommitConfiguration(config.Configuration{}, m.cfg.RawCopy())
  63. <-ctx.Done()
  64. m.cfg.Unsubscribe(m)
  65. return nil
  66. }
  67. func (m *manager) addLocked(identity string, finder Finder, cacheTime, negCacheTime time.Duration) {
  68. entry := cachedFinder{
  69. Finder: finder,
  70. cacheTime: cacheTime,
  71. negCacheTime: negCacheTime,
  72. cache: newCache(),
  73. token: nil,
  74. }
  75. if service, ok := finder.(suture.Service); ok {
  76. token := m.Supervisor.Add(service)
  77. entry.token = &token
  78. }
  79. m.finders[identity] = entry
  80. slog.Info("Using discovery mechanism", "identity", identity)
  81. }
  82. func (m *manager) removeLocked(identity string) {
  83. entry, ok := m.finders[identity]
  84. if !ok {
  85. return
  86. }
  87. if entry.token != nil {
  88. err := m.Supervisor.Remove(*entry.token)
  89. if err != nil {
  90. slog.Warn("Failed to remove discovery mechanism", slog.String("identity", identity), slogutil.Error(err))
  91. }
  92. }
  93. delete(m.finders, identity)
  94. slog.Info("Stopped using discovery mechanism", "identity", identity)
  95. }
  96. // Lookup attempts to resolve the device ID using any of the added Finders,
  97. // while obeying the cache settings.
  98. func (m *manager) Lookup(ctx context.Context, deviceID protocol.DeviceID) (addresses []string, err error) {
  99. m.mut.RLock()
  100. for _, finder := range m.finders {
  101. if cacheEntry, ok := finder.cache.Get(deviceID); ok {
  102. // We have a cache entry. Lets see what it says.
  103. if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime {
  104. // It's a positive, valid entry. Use it.
  105. slog.DebugContext(ctx, "Found cached discovery entry", "device", deviceID, "finder", finder, "entry", cacheEntry)
  106. addresses = append(addresses, cacheEntry.Addresses...)
  107. continue
  108. }
  109. valid := time.Now().Before(cacheEntry.validUntil) || time.Since(cacheEntry.when) < finder.negCacheTime
  110. if !cacheEntry.found && valid {
  111. // It's a negative, valid entry. We should not make another
  112. // attempt right now.
  113. slog.DebugContext(ctx, "Negative cache entry", "device", deviceID, "finder", finder, "until1", cacheEntry.when.Add(finder.negCacheTime), "until2", cacheEntry.validUntil)
  114. continue
  115. }
  116. // It's expired. Ignore and continue.
  117. }
  118. // Perform the actual lookup and cache the result.
  119. if addrs, err := finder.Lookup(ctx, deviceID); err == nil {
  120. slog.DebugContext(ctx, "Got finder result", "device", deviceID, "finder", finder, "address", addrs)
  121. addresses = append(addresses, addrs...)
  122. finder.cache.Set(deviceID, CacheEntry{
  123. Addresses: addrs,
  124. when: time.Now(),
  125. found: len(addrs) > 0,
  126. })
  127. } else {
  128. // Lookup returned error, add a negative cache entry.
  129. entry := CacheEntry{
  130. when: time.Now(),
  131. found: false,
  132. }
  133. if err, ok := err.(cachedError); ok {
  134. entry.validUntil = time.Now().Add(err.CacheFor())
  135. }
  136. finder.cache.Set(deviceID, entry)
  137. }
  138. }
  139. m.mut.RUnlock()
  140. addresses = stringutil.UniqueTrimmedStrings(addresses)
  141. slices.Sort(addresses)
  142. slog.DebugContext(ctx, "Final lookup results", "device", deviceID, "addresses", addresses)
  143. return addresses, nil
  144. }
// String returns the fixed name of the manager. NewManager uses it as the
// name of the manager's own serve service.
func (*manager) String() string {
	return "discovery cache"
}
// Error always returns nil; the manager reports no error of its own. The
// errors of the individual finders are available through ChildErrors.
func (*manager) Error() error {
	return nil
}
  151. func (m *manager) ChildErrors() map[string]error {
  152. children := make(map[string]error, len(m.finders))
  153. m.mut.RLock()
  154. for _, f := range m.finders {
  155. children[f.String()] = f.Error()
  156. }
  157. m.mut.RUnlock()
  158. return children
  159. }
  160. func (m *manager) Cache() map[protocol.DeviceID]CacheEntry {
  161. // Res will be the "total" cache, i.e. the union of our cache and all our
  162. // children's caches.
  163. res := make(map[protocol.DeviceID]CacheEntry)
  164. m.mut.RLock()
  165. for _, finder := range m.finders {
  166. // Each finder[i] has a corresponding cache. Go through
  167. // it and populate the total, appending any addresses and keeping
  168. // the newest "when" time. We skip any negative cache finders.
  169. for k, v := range finder.cache.Cache() {
  170. if v.found {
  171. cur := res[k]
  172. if v.when.After(cur.when) {
  173. cur.when = v.when
  174. }
  175. cur.Addresses = append(cur.Addresses, v.Addresses...)
  176. res[k] = cur
  177. }
  178. }
  179. // Then ask the finder itself for its cache and do the same. If this
  180. // finder is a global discovery client, it will have no cache. If it's
  181. // a local discovery client, this will be its current state.
  182. for k, v := range finder.Cache() {
  183. if v.found {
  184. cur := res[k]
  185. if v.when.After(cur.when) {
  186. cur.when = v.when
  187. }
  188. cur.Addresses = append(cur.Addresses, v.Addresses...)
  189. res[k] = cur
  190. }
  191. }
  192. }
  193. m.mut.RUnlock()
  194. for k, v := range res {
  195. v.Addresses = stringutil.UniqueTrimmedStrings(v.Addresses)
  196. res[k] = v
  197. }
  198. return res
  199. }
  200. func (m *manager) CommitConfiguration(_, to config.Configuration) (handled bool) {
  201. m.mut.Lock()
  202. defer m.mut.Unlock()
  203. toIdentities := make(map[string]struct{})
  204. if to.Options.GlobalAnnEnabled {
  205. for _, srv := range to.Options.GlobalDiscoveryServers() {
  206. toIdentities[globalDiscoveryIdentity(srv)] = struct{}{}
  207. }
  208. }
  209. if to.Options.LocalAnnEnabled {
  210. toIdentities[ipv4Identity(to.Options.LocalAnnPort)] = struct{}{}
  211. toIdentities[ipv6Identity(to.Options.LocalAnnMCAddr)] = struct{}{}
  212. }
  213. // Remove things that we're not expected to have.
  214. for identity := range m.finders {
  215. if _, ok := toIdentities[identity]; !ok {
  216. m.removeLocked(identity)
  217. }
  218. }
  219. // Add things we don't have.
  220. if to.Options.GlobalAnnEnabled {
  221. for _, srv := range to.Options.GlobalDiscoveryServers() {
  222. identity := globalDiscoveryIdentity(srv)
  223. // Skip, if it's already running.
  224. if _, ok := m.finders[identity]; ok {
  225. continue
  226. }
  227. gd, err := NewGlobal(srv, m.cert, m.addressLister, m.evLogger, m.registry)
  228. if err != nil {
  229. slog.Warn("Failed to initialize global discovery", slogutil.Error(err))
  230. continue
  231. }
  232. // Each global discovery server gets its results cached for five
  233. // minutes, and is not asked again for a minute when it's returned
  234. // unsuccessfully.
  235. m.addLocked(identity, gd, 5*time.Minute, time.Minute)
  236. }
  237. }
  238. if to.Options.LocalAnnEnabled {
  239. // v4 broadcasts
  240. v4Identity := ipv4Identity(to.Options.LocalAnnPort)
  241. if _, ok := m.finders[v4Identity]; !ok {
  242. bcd, err := NewLocal(m.myID, fmt.Sprintf(":%d", to.Options.LocalAnnPort), m.addressLister, m.evLogger)
  243. if err != nil {
  244. slog.Warn("Failed to initialize IPv4 local discovery", slogutil.Error(err))
  245. } else {
  246. m.addLocked(v4Identity, bcd, 0, 0)
  247. }
  248. }
  249. // v6 multicasts
  250. v6Identity := ipv6Identity(to.Options.LocalAnnMCAddr)
  251. if _, ok := m.finders[v6Identity]; !ok {
  252. mcd, err := NewLocal(m.myID, to.Options.LocalAnnMCAddr, m.addressLister, m.evLogger)
  253. if err != nil {
  254. slog.Warn("Failed to initialize IPv6 local discovery", slogutil.Error(err))
  255. } else {
  256. m.addLocked(v6Identity, mcd, 0, 0)
  257. }
  258. }
  259. }
  260. return true
  261. }