cache.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "sort"
  9. stdsync "sync"
  10. "time"
  11. "github.com/syncthing/syncthing/lib/protocol"
  12. "github.com/syncthing/syncthing/lib/sync"
  13. "github.com/syncthing/syncthing/lib/util"
  14. "github.com/thejerf/suture"
  15. )
// The CachingMux aggregates results from multiple Finders. Each Finder has
// an associated cache time and negative cache time. The cache time sets how
// long we cache and return successful lookup results; the negative cache
// time sets how long we refrain from asking about the same device ID after
// receiving a negative answer. A value of zero disables caching (positive
// or negative).
type CachingMux interface {
	FinderService
	// Add registers finder together with its positive and negative cache
	// durations.
	Add(finder Finder, cacheTime, negCacheTime time.Duration)
	// ChildErrors returns the current error of each added Finder, keyed by
	// the Finder's String() value.
	ChildErrors() map[string]error
}
// cachingMux is the CachingMux implementation returned by NewCachingMux.
type cachingMux struct {
	*suture.Supervisor
	finders []cachedFinder // added finders, in the order they were added
	caches  []*cache       // caches[i] stores the lookup results of finders[i]
	mut     sync.RWMutex   // guards finders and caches
}
// A cachedFinder is a Finder with associated cache timeouts.
type cachedFinder struct {
	Finder
	cacheTime    time.Duration // how long a positive result is served from the cache
	negCacheTime time.Duration // how long a negative result suppresses new lookups
}
// An error may implement cachedError, in which case it will be interrogated
// to see how long we should cache the error. The resulting negative cache
// entry stays valid for as long as either this duration or the Finder's
// default negative cache time has not yet elapsed.
type cachedError interface {
	// CacheFor returns how long the negative result should be cached,
	// counted from the moment the error is received.
	CacheFor() time.Duration
}
  45. func NewCachingMux() CachingMux {
  46. return &cachingMux{
  47. Supervisor: suture.New("discover.cachingMux", suture.Spec{
  48. PassThroughPanics: true,
  49. }),
  50. mut: sync.NewRWMutex(),
  51. }
  52. }
  53. // Add registers a new Finder, with associated cache timeouts.
  54. func (m *cachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration) {
  55. m.mut.Lock()
  56. m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime})
  57. m.caches = append(m.caches, newCache())
  58. m.mut.Unlock()
  59. if service, ok := finder.(suture.Service); ok {
  60. m.Supervisor.Add(service)
  61. }
  62. }
  63. // Lookup attempts to resolve the device ID using any of the added Finders,
  64. // while obeying the cache settings.
  65. func (m *cachingMux) Lookup(deviceID protocol.DeviceID) (addresses []string, err error) {
  66. m.mut.RLock()
  67. for i, finder := range m.finders {
  68. if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
  69. // We have a cache entry. Lets see what it says.
  70. if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime {
  71. // It's a positive, valid entry. Use it.
  72. l.Debugln("cached discovery entry for", deviceID, "at", finder)
  73. l.Debugln(" cache:", cacheEntry)
  74. addresses = append(addresses, cacheEntry.Addresses...)
  75. continue
  76. }
  77. valid := time.Now().Before(cacheEntry.validUntil) || time.Since(cacheEntry.when) < finder.negCacheTime
  78. if !cacheEntry.found && valid {
  79. // It's a negative, valid entry. We should not make another
  80. // attempt right now.
  81. l.Debugln("negative cache entry for", deviceID, "at", finder, "valid until", cacheEntry.when.Add(finder.negCacheTime), "or", cacheEntry.validUntil)
  82. continue
  83. }
  84. // It's expired. Ignore and continue.
  85. }
  86. // Perform the actual lookup and cache the result.
  87. if addrs, err := finder.Lookup(deviceID); err == nil {
  88. l.Debugln("lookup for", deviceID, "at", finder)
  89. l.Debugln(" addresses:", addrs)
  90. addresses = append(addresses, addrs...)
  91. m.caches[i].Set(deviceID, CacheEntry{
  92. Addresses: addrs,
  93. when: time.Now(),
  94. found: len(addrs) > 0,
  95. })
  96. } else {
  97. // Lookup returned error, add a negative cache entry.
  98. entry := CacheEntry{
  99. when: time.Now(),
  100. found: false,
  101. }
  102. if err, ok := err.(cachedError); ok {
  103. entry.validUntil = time.Now().Add(err.CacheFor())
  104. }
  105. m.caches[i].Set(deviceID, entry)
  106. }
  107. }
  108. m.mut.RUnlock()
  109. addresses = util.UniqueTrimmedStrings(addresses)
  110. sort.Strings(addresses)
  111. l.Debugln("lookup results for", deviceID)
  112. l.Debugln(" addresses: ", addresses)
  113. return addresses, nil
  114. }
// String returns the fixed, human-readable name of the mux.
func (m *cachingMux) String() string {
	return "discovery cache"
}
// Error always returns nil; the mux reports no error of its own. The
// errors of the individual Finders are available through ChildErrors.
func (m *cachingMux) Error() error {
	return nil
}
  121. func (m *cachingMux) ChildErrors() map[string]error {
  122. children := make(map[string]error, len(m.finders))
  123. m.mut.RLock()
  124. for _, f := range m.finders {
  125. children[f.String()] = f.Error()
  126. }
  127. m.mut.RUnlock()
  128. return children
  129. }
  130. func (m *cachingMux) Cache() map[protocol.DeviceID]CacheEntry {
  131. // Res will be the "total" cache, i.e. the union of our cache and all our
  132. // children's caches.
  133. res := make(map[protocol.DeviceID]CacheEntry)
  134. m.mut.RLock()
  135. for i := range m.finders {
  136. // Each finder[i] has a corresponding cache at cache[i]. Go through
  137. // it and populate the total, appending any addresses and keeping
  138. // the newest "when" time. We skip any negative cache entries.
  139. for k, v := range m.caches[i].Cache() {
  140. if v.found {
  141. cur := res[k]
  142. if v.when.After(cur.when) {
  143. cur.when = v.when
  144. }
  145. cur.Addresses = append(cur.Addresses, v.Addresses...)
  146. res[k] = cur
  147. }
  148. }
  149. // Then ask the finder itself for its cache and do the same. If this
  150. // finder is a global discovery client, it will have no cache. If it's
  151. // a local discovery client, this will be its current state.
  152. for k, v := range m.finders[i].Cache() {
  153. if v.found {
  154. cur := res[k]
  155. if v.when.After(cur.when) {
  156. cur.when = v.when
  157. }
  158. cur.Addresses = append(cur.Addresses, v.Addresses...)
  159. res[k] = cur
  160. }
  161. }
  162. }
  163. m.mut.RUnlock()
  164. for k, v := range res {
  165. v.Addresses = util.UniqueTrimmedStrings(v.Addresses)
  166. res[k] = v
  167. }
  168. return res
  169. }
// A cache can be embedded wherever useful. It maps device IDs to cache
// entries and serializes access to that map with its own mutex.
type cache struct {
	entries map[protocol.DeviceID]CacheEntry
	mut     stdsync.Mutex // guards entries
}
  175. func newCache() *cache {
  176. return &cache{
  177. entries: make(map[protocol.DeviceID]CacheEntry),
  178. }
  179. }
  180. func (c *cache) Set(id protocol.DeviceID, ce CacheEntry) {
  181. c.mut.Lock()
  182. c.entries[id] = ce
  183. c.mut.Unlock()
  184. }
  185. func (c *cache) Get(id protocol.DeviceID) (CacheEntry, bool) {
  186. c.mut.Lock()
  187. ce, ok := c.entries[id]
  188. c.mut.Unlock()
  189. return ce, ok
  190. }
  191. func (c *cache) Cache() map[protocol.DeviceID]CacheEntry {
  192. c.mut.Lock()
  193. m := make(map[protocol.DeviceID]CacheEntry, len(c.entries))
  194. for k, v := range c.entries {
  195. m[k] = v
  196. }
  197. c.mut.Unlock()
  198. return m
  199. }