cache.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. stdsync "sync"
  9. "time"
  10. "github.com/calmh/suture"
  11. "github.com/syncthing/syncthing/lib/protocol"
  12. "github.com/syncthing/syncthing/lib/sync"
  13. "github.com/syncthing/syncthing/lib/util"
  14. )
// The CachingMux aggregates results from multiple Finders. Each Finder has
// an associated cache time and negative cache time. The cache time sets how
// long we cache and return successful lookup results, the negative cache
// time sets how long we refrain from asking about the same device ID after
// receiving a negative answer. The value of zero disables caching (positive
// or negative).
type CachingMux interface {
	FinderService
	// Add registers a Finder together with the durations for which its
	// positive and negative lookup results are cached.
	Add(finder Finder, cacheTime, negCacheTime time.Duration)
	// ChildErrors reports the current Error() value of every added Finder,
	// keyed by that Finder's String() value.
	ChildErrors() map[string]error
}
// cachingMux is the CachingMux implementation returned by NewCachingMux.
type cachingMux struct {
	*suture.Supervisor
	// finders and caches are parallel slices: caches[i] holds the cached
	// lookup results for finders[i]. Add appends to both while holding mut.
	finders []cachedFinder
	caches  []*cache
	mut     sync.RWMutex
}
// A cachedFinder is a Finder with associated cache timeouts.
type cachedFinder struct {
	Finder
	// cacheTime is how long a positive (found) cache entry is reused.
	cacheTime time.Duration
	// negCacheTime is how long a negative cache entry suppresses new
	// lookups against this Finder.
	negCacheTime time.Duration
}
// An error may implement cachedError, in which case it will be interrogated
// to see how long we should cache the error. Lookup keeps the resulting
// negative entry valid while either this duration or the Finder's default
// negative cache time has not yet elapsed, so it can extend the default
// but not shorten it.
type cachedError interface {
	// CacheFor returns how long, counted from the moment the failed lookup
	// is recorded, the error should be cached.
	CacheFor() time.Duration
}
  44. func NewCachingMux() CachingMux {
  45. return &cachingMux{
  46. Supervisor: suture.New("discover.cachingMux", suture.Spec{
  47. PanicPanics: true,
  48. }),
  49. mut: sync.NewRWMutex(),
  50. }
  51. }
  52. // Add registers a new Finder, with associated cache timeouts.
  53. func (m *cachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration) {
  54. m.mut.Lock()
  55. m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime})
  56. m.caches = append(m.caches, newCache())
  57. m.mut.Unlock()
  58. if service, ok := finder.(suture.Service); ok {
  59. m.Supervisor.Add(service)
  60. }
  61. }
  62. // Lookup attempts to resolve the device ID using any of the added Finders,
  63. // while obeying the cache settings.
  64. func (m *cachingMux) Lookup(deviceID protocol.DeviceID) (addresses []string, err error) {
  65. m.mut.RLock()
  66. for i, finder := range m.finders {
  67. if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
  68. // We have a cache entry. Lets see what it says.
  69. if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime {
  70. // It's a positive, valid entry. Use it.
  71. l.Debugln("cached discovery entry for", deviceID, "at", finder)
  72. l.Debugln(" cache:", cacheEntry)
  73. addresses = append(addresses, cacheEntry.Addresses...)
  74. continue
  75. }
  76. valid := time.Now().Before(cacheEntry.validUntil) || time.Since(cacheEntry.when) < finder.negCacheTime
  77. if !cacheEntry.found && valid {
  78. // It's a negative, valid entry. We should not make another
  79. // attempt right now.
  80. l.Debugln("negative cache entry for", deviceID, "at", finder, "valid until", cacheEntry.when.Add(finder.negCacheTime), "or", cacheEntry.validUntil)
  81. continue
  82. }
  83. // It's expired. Ignore and continue.
  84. }
  85. // Perform the actual lookup and cache the result.
  86. if addrs, err := finder.Lookup(deviceID); err == nil {
  87. l.Debugln("lookup for", deviceID, "at", finder)
  88. l.Debugln(" addresses:", addrs)
  89. addresses = append(addresses, addrs...)
  90. m.caches[i].Set(deviceID, CacheEntry{
  91. Addresses: addrs,
  92. when: time.Now(),
  93. found: len(addrs) > 0,
  94. })
  95. } else {
  96. // Lookup returned error, add a negative cache entry.
  97. entry := CacheEntry{
  98. when: time.Now(),
  99. found: false,
  100. }
  101. if err, ok := err.(cachedError); ok {
  102. entry.validUntil = time.Now().Add(err.CacheFor())
  103. }
  104. m.caches[i].Set(deviceID, entry)
  105. }
  106. }
  107. m.mut.RUnlock()
  108. l.Debugln("lookup results for", deviceID)
  109. l.Debugln(" addresses: ", addresses)
  110. addresses = util.UniqueStrings(addresses)
  111. return addresses, nil
  112. }
  113. func (m *cachingMux) String() string {
  114. return "discovery cache"
  115. }
// Error always returns nil for the mux itself. The errors of the individual
// Finders are available through ChildErrors.
func (m *cachingMux) Error() error {
	return nil
}
  119. func (m *cachingMux) ChildErrors() map[string]error {
  120. children := make(map[string]error, len(m.finders))
  121. m.mut.RLock()
  122. for _, f := range m.finders {
  123. children[f.String()] = f.Error()
  124. }
  125. m.mut.RUnlock()
  126. return children
  127. }
  128. func (m *cachingMux) Cache() map[protocol.DeviceID]CacheEntry {
  129. // Res will be the "total" cache, i.e. the union of our cache and all our
  130. // children's caches.
  131. res := make(map[protocol.DeviceID]CacheEntry)
  132. m.mut.RLock()
  133. for i := range m.finders {
  134. // Each finder[i] has a corresponding cache at cache[i]. Go through
  135. // it and populate the total, appending any addresses and keeping
  136. // the newest "when" time. We skip any negative cache entries.
  137. for k, v := range m.caches[i].Cache() {
  138. if v.found {
  139. cur := res[k]
  140. if v.when.After(cur.when) {
  141. cur.when = v.when
  142. }
  143. cur.Addresses = append(cur.Addresses, v.Addresses...)
  144. res[k] = cur
  145. }
  146. }
  147. // Then ask the finder itself for its cache and do the same. If this
  148. // finder is a global discovery client, it will have no cache. If it's
  149. // a local discovery client, this will be its current state.
  150. for k, v := range m.finders[i].Cache() {
  151. if v.found {
  152. cur := res[k]
  153. if v.when.After(cur.when) {
  154. cur.when = v.when
  155. }
  156. cur.Addresses = append(cur.Addresses, v.Addresses...)
  157. res[k] = cur
  158. }
  159. }
  160. }
  161. m.mut.RUnlock()
  162. for k, v := range res {
  163. v.Addresses = util.UniqueStrings(v.Addresses)
  164. res[k] = v
  165. }
  166. return res
  167. }
// A cache can be embedded wherever useful. It maps device IDs to cache
// entries; its Set, Get and Cache methods all take mut while touching
// entries.
type cache struct {
	entries map[protocol.DeviceID]CacheEntry
	mut     stdsync.Mutex // held by Set, Get and Cache around entries access
}
  173. func newCache() *cache {
  174. return &cache{
  175. entries: make(map[protocol.DeviceID]CacheEntry),
  176. }
  177. }
  178. func (c *cache) Set(id protocol.DeviceID, ce CacheEntry) {
  179. c.mut.Lock()
  180. c.entries[id] = ce
  181. c.mut.Unlock()
  182. }
  183. func (c *cache) Get(id protocol.DeviceID) (CacheEntry, bool) {
  184. c.mut.Lock()
  185. ce, ok := c.entries[id]
  186. c.mut.Unlock()
  187. return ce, ok
  188. }
  189. func (c *cache) Cache() map[protocol.DeviceID]CacheEntry {
  190. c.mut.Lock()
  191. m := make(map[protocol.DeviceID]CacheEntry, len(c.entries))
  192. for k, v := range c.entries {
  193. m[k] = v
  194. }
  195. c.mut.Unlock()
  196. return m
  197. }