cache.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "context"
  9. "sort"
  10. stdsync "sync"
  11. "time"
  12. "github.com/syncthing/syncthing/lib/protocol"
  13. "github.com/syncthing/syncthing/lib/sync"
  14. "github.com/syncthing/syncthing/lib/util"
  15. "github.com/thejerf/suture"
  16. )
  17. // The CachingMux aggregates results from multiple Finders. Each Finder has
  18. // an associated cache time and negative cache time. The cache time sets how
  19. // long we cache and return successful lookup results, the negative cache
  20. // time sets how long we refrain from asking about the same device ID after
  21. // receiving a negative answer. The value of zero disables caching (positive
  22. // or negative).
  23. type CachingMux interface {
  24. FinderService
  25. Add(finder Finder, cacheTime, negCacheTime time.Duration)
  26. ChildErrors() map[string]error
  27. }
  28. type cachingMux struct {
  29. *suture.Supervisor
  30. finders []cachedFinder
  31. caches []*cache
  32. mut sync.RWMutex
  33. }
  34. // A cachedFinder is a Finder with associated cache timeouts.
  35. type cachedFinder struct {
  36. Finder
  37. cacheTime time.Duration
  38. negCacheTime time.Duration
  39. }
  40. // An error may implement cachedError, in which case it will be interrogated
  41. // to see how long we should cache the error. This overrides the default
  42. // negative cache time.
  43. type cachedError interface {
  44. CacheFor() time.Duration
  45. }
  46. func NewCachingMux() CachingMux {
  47. return &cachingMux{
  48. Supervisor: suture.New("discover.cachingMux", suture.Spec{
  49. PassThroughPanics: true,
  50. }),
  51. mut: sync.NewRWMutex(),
  52. }
  53. }
  54. // Add registers a new Finder, with associated cache timeouts.
  55. func (m *cachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration) {
  56. m.mut.Lock()
  57. m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime})
  58. m.caches = append(m.caches, newCache())
  59. m.mut.Unlock()
  60. if service, ok := finder.(suture.Service); ok {
  61. m.Supervisor.Add(service)
  62. }
  63. }
  64. // Lookup attempts to resolve the device ID using any of the added Finders,
  65. // while obeying the cache settings.
  66. func (m *cachingMux) Lookup(ctx context.Context, deviceID protocol.DeviceID) (addresses []string, err error) {
  67. m.mut.RLock()
  68. for i, finder := range m.finders {
  69. if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
  70. // We have a cache entry. Lets see what it says.
  71. if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime {
  72. // It's a positive, valid entry. Use it.
  73. l.Debugln("cached discovery entry for", deviceID, "at", finder)
  74. l.Debugln(" cache:", cacheEntry)
  75. addresses = append(addresses, cacheEntry.Addresses...)
  76. continue
  77. }
  78. valid := time.Now().Before(cacheEntry.validUntil) || time.Since(cacheEntry.when) < finder.negCacheTime
  79. if !cacheEntry.found && valid {
  80. // It's a negative, valid entry. We should not make another
  81. // attempt right now.
  82. l.Debugln("negative cache entry for", deviceID, "at", finder, "valid until", cacheEntry.when.Add(finder.negCacheTime), "or", cacheEntry.validUntil)
  83. continue
  84. }
  85. // It's expired. Ignore and continue.
  86. }
  87. // Perform the actual lookup and cache the result.
  88. if addrs, err := finder.Lookup(ctx, deviceID); err == nil {
  89. l.Debugln("lookup for", deviceID, "at", finder)
  90. l.Debugln(" addresses:", addrs)
  91. addresses = append(addresses, addrs...)
  92. m.caches[i].Set(deviceID, CacheEntry{
  93. Addresses: addrs,
  94. when: time.Now(),
  95. found: len(addrs) > 0,
  96. })
  97. } else {
  98. // Lookup returned error, add a negative cache entry.
  99. entry := CacheEntry{
  100. when: time.Now(),
  101. found: false,
  102. }
  103. if err, ok := err.(cachedError); ok {
  104. entry.validUntil = time.Now().Add(err.CacheFor())
  105. }
  106. m.caches[i].Set(deviceID, entry)
  107. }
  108. }
  109. m.mut.RUnlock()
  110. addresses = util.UniqueTrimmedStrings(addresses)
  111. sort.Strings(addresses)
  112. l.Debugln("lookup results for", deviceID)
  113. l.Debugln(" addresses: ", addresses)
  114. return addresses, nil
  115. }
  116. func (m *cachingMux) String() string {
  117. return "discovery cache"
  118. }
  119. func (m *cachingMux) Error() error {
  120. return nil
  121. }
  122. func (m *cachingMux) ChildErrors() map[string]error {
  123. children := make(map[string]error, len(m.finders))
  124. m.mut.RLock()
  125. for _, f := range m.finders {
  126. children[f.String()] = f.Error()
  127. }
  128. m.mut.RUnlock()
  129. return children
  130. }
  131. func (m *cachingMux) Cache() map[protocol.DeviceID]CacheEntry {
  132. // Res will be the "total" cache, i.e. the union of our cache and all our
  133. // children's caches.
  134. res := make(map[protocol.DeviceID]CacheEntry)
  135. m.mut.RLock()
  136. for i := range m.finders {
  137. // Each finder[i] has a corresponding cache at cache[i]. Go through
  138. // it and populate the total, appending any addresses and keeping
  139. // the newest "when" time. We skip any negative cache entries.
  140. for k, v := range m.caches[i].Cache() {
  141. if v.found {
  142. cur := res[k]
  143. if v.when.After(cur.when) {
  144. cur.when = v.when
  145. }
  146. cur.Addresses = append(cur.Addresses, v.Addresses...)
  147. res[k] = cur
  148. }
  149. }
  150. // Then ask the finder itself for its cache and do the same. If this
  151. // finder is a global discovery client, it will have no cache. If it's
  152. // a local discovery client, this will be its current state.
  153. for k, v := range m.finders[i].Cache() {
  154. if v.found {
  155. cur := res[k]
  156. if v.when.After(cur.when) {
  157. cur.when = v.when
  158. }
  159. cur.Addresses = append(cur.Addresses, v.Addresses...)
  160. res[k] = cur
  161. }
  162. }
  163. }
  164. m.mut.RUnlock()
  165. for k, v := range res {
  166. v.Addresses = util.UniqueTrimmedStrings(v.Addresses)
  167. res[k] = v
  168. }
  169. return res
  170. }
  171. // A cache can be embedded wherever useful
  172. type cache struct {
  173. entries map[protocol.DeviceID]CacheEntry
  174. mut stdsync.Mutex
  175. }
  176. func newCache() *cache {
  177. return &cache{
  178. entries: make(map[protocol.DeviceID]CacheEntry),
  179. }
  180. }
  181. func (c *cache) Set(id protocol.DeviceID, ce CacheEntry) {
  182. c.mut.Lock()
  183. c.entries[id] = ce
  184. c.mut.Unlock()
  185. }
  186. func (c *cache) Get(id protocol.DeviceID) (CacheEntry, bool) {
  187. c.mut.Lock()
  188. ce, ok := c.entries[id]
  189. c.mut.Unlock()
  190. return ce, ok
  191. }
  192. func (c *cache) Cache() map[protocol.DeviceID]CacheEntry {
  193. c.mut.Lock()
  194. m := make(map[protocol.DeviceID]CacheEntry, len(c.entries))
  195. for k, v := range c.entries {
  196. m[k] = v
  197. }
  198. c.mut.Unlock()
  199. return m
  200. }