cache.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "sort"
  9. stdsync "sync"
  10. "time"
  11. "github.com/syncthing/syncthing/lib/protocol"
  12. "github.com/syncthing/syncthing/lib/sync"
  13. "github.com/syncthing/syncthing/lib/util"
  14. "github.com/thejerf/suture"
  15. )
  16. // The CachingMux aggregates results from multiple Finders. Each Finder has
  17. // an associated cache time and negative cache time. The cache time sets how
  18. // long we cache and return successful lookup results, the negative cache
  19. // time sets how long we refrain from asking about the same device ID after
  20. // receiving a negative answer. The value of zero disables caching (positive
  21. // or negative).
  22. type CachingMux interface {
  23. FinderService
  24. Add(finder Finder, cacheTime, negCacheTime time.Duration, priority int)
  25. ChildErrors() map[string]error
  26. }
  27. type cachingMux struct {
  28. *suture.Supervisor
  29. finders []cachedFinder
  30. caches []*cache
  31. mut sync.RWMutex
  32. }
  33. // A cachedFinder is a Finder with associated cache timeouts.
  34. type cachedFinder struct {
  35. Finder
  36. cacheTime time.Duration
  37. negCacheTime time.Duration
  38. priority int
  39. }
  40. // A prioritizedAddress is what we use to sort addresses returned from
  41. // different sources with different priorities.
  42. type prioritizedAddress struct {
  43. priority int
  44. addr string
  45. }
  46. // An error may implement cachedError, in which case it will be interrogated
  47. // to see how long we should cache the error. This overrides the default
  48. // negative cache time.
  49. type cachedError interface {
  50. CacheFor() time.Duration
  51. }
  52. func NewCachingMux() CachingMux {
  53. return &cachingMux{
  54. Supervisor: suture.NewSimple("discover.cachingMux"),
  55. mut: sync.NewRWMutex(),
  56. }
  57. }
  58. // Add registers a new Finder, with associated cache timeouts.
  59. func (m *cachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration, priority int) {
  60. m.mut.Lock()
  61. m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime, priority})
  62. m.caches = append(m.caches, newCache())
  63. m.mut.Unlock()
  64. if service, ok := finder.(suture.Service); ok {
  65. m.Supervisor.Add(service)
  66. }
  67. }
  68. // Lookup attempts to resolve the device ID using any of the added Finders,
  69. // while obeying the cache settings.
  70. func (m *cachingMux) Lookup(deviceID protocol.DeviceID) (addresses []string, err error) {
  71. var paddresses []prioritizedAddress
  72. m.mut.RLock()
  73. for i, finder := range m.finders {
  74. if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
  75. // We have a cache entry. Lets see what it says.
  76. if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime {
  77. // It's a positive, valid entry. Use it.
  78. l.Debugln("cached discovery entry for", deviceID, "at", finder)
  79. l.Debugln(" cache:", cacheEntry)
  80. for _, addr := range cacheEntry.Addresses {
  81. paddresses = append(paddresses, prioritizedAddress{finder.priority, addr})
  82. }
  83. continue
  84. }
  85. valid := time.Now().Before(cacheEntry.validUntil) || time.Since(cacheEntry.when) < finder.negCacheTime
  86. if !cacheEntry.found && valid {
  87. // It's a negative, valid entry. We should not make another
  88. // attempt right now.
  89. l.Debugln("negative cache entry for", deviceID, "at", finder, "valid until", cacheEntry.when.Add(finder.negCacheTime), "or", cacheEntry.validUntil)
  90. continue
  91. }
  92. // It's expired. Ignore and continue.
  93. }
  94. // Perform the actual lookup and cache the result.
  95. if addrs, err := finder.Lookup(deviceID); err == nil {
  96. l.Debugln("lookup for", deviceID, "at", finder)
  97. l.Debugln(" addresses:", addrs)
  98. for _, addr := range addrs {
  99. paddresses = append(paddresses, prioritizedAddress{finder.priority, addr})
  100. }
  101. m.caches[i].Set(deviceID, CacheEntry{
  102. Addresses: addrs,
  103. when: time.Now(),
  104. found: len(addrs) > 0,
  105. })
  106. } else {
  107. // Lookup returned error, add a negative cache entry.
  108. entry := CacheEntry{
  109. when: time.Now(),
  110. found: false,
  111. }
  112. if err, ok := err.(cachedError); ok {
  113. entry.validUntil = time.Now().Add(err.CacheFor())
  114. }
  115. m.caches[i].Set(deviceID, entry)
  116. }
  117. }
  118. m.mut.RUnlock()
  119. addresses = uniqueSortedAddrs(paddresses)
  120. l.Debugln("lookup results for", deviceID)
  121. l.Debugln(" addresses: ", addresses)
  122. return addresses, nil
  123. }
  124. func (m *cachingMux) String() string {
  125. return "discovery cache"
  126. }
  127. func (m *cachingMux) Error() error {
  128. return nil
  129. }
  130. func (m *cachingMux) ChildErrors() map[string]error {
  131. children := make(map[string]error, len(m.finders))
  132. m.mut.RLock()
  133. for _, f := range m.finders {
  134. children[f.String()] = f.Error()
  135. }
  136. m.mut.RUnlock()
  137. return children
  138. }
  139. func (m *cachingMux) Cache() map[protocol.DeviceID]CacheEntry {
  140. // Res will be the "total" cache, i.e. the union of our cache and all our
  141. // children's caches.
  142. res := make(map[protocol.DeviceID]CacheEntry)
  143. m.mut.RLock()
  144. for i := range m.finders {
  145. // Each finder[i] has a corresponding cache at cache[i]. Go through
  146. // it and populate the total, appending any addresses and keeping
  147. // the newest "when" time. We skip any negative cache entries.
  148. for k, v := range m.caches[i].Cache() {
  149. if v.found {
  150. cur := res[k]
  151. if v.when.After(cur.when) {
  152. cur.when = v.when
  153. }
  154. cur.Addresses = append(cur.Addresses, v.Addresses...)
  155. res[k] = cur
  156. }
  157. }
  158. // Then ask the finder itself for it's cache and do the same. If this
  159. // finder is a global discovery client, it will have no cache. If it's
  160. // a local discovery client, this will be it's current state.
  161. for k, v := range m.finders[i].Cache() {
  162. if v.found {
  163. cur := res[k]
  164. if v.when.After(cur.when) {
  165. cur.when = v.when
  166. }
  167. cur.Addresses = append(cur.Addresses, v.Addresses...)
  168. res[k] = cur
  169. }
  170. }
  171. }
  172. m.mut.RUnlock()
  173. for k, v := range res {
  174. v.Addresses = util.UniqueStrings(v.Addresses)
  175. res[k] = v
  176. }
  177. return res
  178. }
  179. // A cache can be embedded wherever useful
  180. type cache struct {
  181. entries map[protocol.DeviceID]CacheEntry
  182. mut stdsync.Mutex
  183. }
  184. func newCache() *cache {
  185. return &cache{
  186. entries: make(map[protocol.DeviceID]CacheEntry),
  187. }
  188. }
  189. func (c *cache) Set(id protocol.DeviceID, ce CacheEntry) {
  190. c.mut.Lock()
  191. c.entries[id] = ce
  192. c.mut.Unlock()
  193. }
  194. func (c *cache) Get(id protocol.DeviceID) (CacheEntry, bool) {
  195. c.mut.Lock()
  196. ce, ok := c.entries[id]
  197. c.mut.Unlock()
  198. return ce, ok
  199. }
  200. func (c *cache) Cache() map[protocol.DeviceID]CacheEntry {
  201. c.mut.Lock()
  202. m := make(map[protocol.DeviceID]CacheEntry, len(c.entries))
  203. for k, v := range c.entries {
  204. m[k] = v
  205. }
  206. c.mut.Unlock()
  207. return m
  208. }
  209. func uniqueSortedAddrs(ss []prioritizedAddress) []string {
  210. // We sort the addresses by priority, then filter them based on seen
  211. // (first time seen is the on kept, so we retain priority).
  212. sort.Sort(prioritizedAddressList(ss))
  213. filtered := make([]string, 0, len(ss))
  214. seen := make(map[string]struct{}, len(ss))
  215. for _, s := range ss {
  216. if _, ok := seen[s.addr]; !ok {
  217. filtered = append(filtered, s.addr)
  218. seen[s.addr] = struct{}{}
  219. }
  220. }
  221. return filtered
  222. }
  223. type prioritizedAddressList []prioritizedAddress
  224. func (l prioritizedAddressList) Len() int {
  225. return len(l)
  226. }
  227. func (l prioritizedAddressList) Swap(a, b int) {
  228. l[a], l[b] = l[b], l[a]
  229. }
  230. func (l prioritizedAddressList) Less(a, b int) bool {
  231. if l[a].priority != l[b].priority {
  232. return l[a].priority < l[b].priority
  233. }
  234. return l[a].addr < l[b].addr
  235. }