cache.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. stdsync "sync"
  9. "time"
  10. "github.com/syncthing/syncthing/lib/protocol"
  11. "github.com/syncthing/syncthing/lib/sync"
  12. "github.com/syncthing/syncthing/lib/util"
  13. "github.com/thejerf/suture"
  14. )
  15. // The CachingMux aggregates results from multiple Finders. Each Finder has
  16. // an associated cache time and negative cache time. The cache time sets how
  17. // long we cache and return successful lookup results, the negative cache
  18. // time sets how long we refrain from asking about the same device ID after
  19. // receiving a negative answer. The value of zero disables caching (positive
  20. // or negative).
  21. type CachingMux interface {
  22. FinderService
  23. Add(finder Finder, cacheTime, negCacheTime time.Duration)
  24. ChildErrors() map[string]error
  25. }
  26. type cachingMux struct {
  27. *suture.Supervisor
  28. finders []cachedFinder
  29. caches []*cache
  30. mut sync.RWMutex
  31. }
  32. // A cachedFinder is a Finder with associated cache timeouts.
  33. type cachedFinder struct {
  34. Finder
  35. cacheTime time.Duration
  36. negCacheTime time.Duration
  37. }
  38. // An error may implement cachedError, in which case it will be interrogated
  39. // to see how long we should cache the error. This overrides the default
  40. // negative cache time.
  41. type cachedError interface {
  42. CacheFor() time.Duration
  43. }
  44. func NewCachingMux() CachingMux {
  45. return &cachingMux{
  46. Supervisor: suture.NewSimple("discover.cachingMux"),
  47. mut: sync.NewRWMutex(),
  48. }
  49. }
  50. // Add registers a new Finder, with associated cache timeouts.
  51. func (m *cachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration) {
  52. m.mut.Lock()
  53. m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime})
  54. m.caches = append(m.caches, newCache())
  55. m.mut.Unlock()
  56. if service, ok := finder.(suture.Service); ok {
  57. m.Supervisor.Add(service)
  58. }
  59. }
  60. // Lookup attempts to resolve the device ID using any of the added Finders,
  61. // while obeying the cache settings.
  62. func (m *cachingMux) Lookup(deviceID protocol.DeviceID) (addresses []string, err error) {
  63. m.mut.RLock()
  64. for i, finder := range m.finders {
  65. if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
  66. // We have a cache entry. Lets see what it says.
  67. if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime {
  68. // It's a positive, valid entry. Use it.
  69. l.Debugln("cached discovery entry for", deviceID, "at", finder)
  70. l.Debugln(" cache:", cacheEntry)
  71. addresses = append(addresses, cacheEntry.Addresses...)
  72. continue
  73. }
  74. valid := time.Now().Before(cacheEntry.validUntil) || time.Since(cacheEntry.when) < finder.negCacheTime
  75. if !cacheEntry.found && valid {
  76. // It's a negative, valid entry. We should not make another
  77. // attempt right now.
  78. l.Debugln("negative cache entry for", deviceID, "at", finder, "valid until", cacheEntry.when.Add(finder.negCacheTime), "or", cacheEntry.validUntil)
  79. continue
  80. }
  81. // It's expired. Ignore and continue.
  82. }
  83. // Perform the actual lookup and cache the result.
  84. if addrs, err := finder.Lookup(deviceID); err == nil {
  85. l.Debugln("lookup for", deviceID, "at", finder)
  86. l.Debugln(" addresses:", addrs)
  87. addresses = append(addresses, addrs...)
  88. m.caches[i].Set(deviceID, CacheEntry{
  89. Addresses: addrs,
  90. when: time.Now(),
  91. found: len(addrs) > 0,
  92. })
  93. } else {
  94. // Lookup returned error, add a negative cache entry.
  95. entry := CacheEntry{
  96. when: time.Now(),
  97. found: false,
  98. }
  99. if err, ok := err.(cachedError); ok {
  100. entry.validUntil = time.Now().Add(err.CacheFor())
  101. }
  102. m.caches[i].Set(deviceID, entry)
  103. }
  104. }
  105. m.mut.RUnlock()
  106. l.Debugln("lookup results for", deviceID)
  107. l.Debugln(" addresses: ", addresses)
  108. addresses = util.UniqueStrings(addresses)
  109. return addresses, nil
  110. }
  111. func (m *cachingMux) String() string {
  112. return "discovery cache"
  113. }
  114. func (m *cachingMux) Error() error {
  115. return nil
  116. }
  117. func (m *cachingMux) ChildErrors() map[string]error {
  118. children := make(map[string]error, len(m.finders))
  119. m.mut.RLock()
  120. for _, f := range m.finders {
  121. children[f.String()] = f.Error()
  122. }
  123. m.mut.RUnlock()
  124. return children
  125. }
  126. func (m *cachingMux) Cache() map[protocol.DeviceID]CacheEntry {
  127. // Res will be the "total" cache, i.e. the union of our cache and all our
  128. // children's caches.
  129. res := make(map[protocol.DeviceID]CacheEntry)
  130. m.mut.RLock()
  131. for i := range m.finders {
  132. // Each finder[i] has a corresponding cache at cache[i]. Go through
  133. // it and populate the total, appending any addresses and keeping
  134. // the newest "when" time. We skip any negative cache entries.
  135. for k, v := range m.caches[i].Cache() {
  136. if v.found {
  137. cur := res[k]
  138. if v.when.After(cur.when) {
  139. cur.when = v.when
  140. }
  141. cur.Addresses = append(cur.Addresses, v.Addresses...)
  142. res[k] = cur
  143. }
  144. }
  145. // Then ask the finder itself for its cache and do the same. If this
  146. // finder is a global discovery client, it will have no cache. If it's
  147. // a local discovery client, this will be its current state.
  148. for k, v := range m.finders[i].Cache() {
  149. if v.found {
  150. cur := res[k]
  151. if v.when.After(cur.when) {
  152. cur.when = v.when
  153. }
  154. cur.Addresses = append(cur.Addresses, v.Addresses...)
  155. res[k] = cur
  156. }
  157. }
  158. }
  159. m.mut.RUnlock()
  160. for k, v := range res {
  161. v.Addresses = util.UniqueStrings(v.Addresses)
  162. res[k] = v
  163. }
  164. return res
  165. }
  166. // A cache can be embedded wherever useful
  167. type cache struct {
  168. entries map[protocol.DeviceID]CacheEntry
  169. mut stdsync.Mutex
  170. }
  171. func newCache() *cache {
  172. return &cache{
  173. entries: make(map[protocol.DeviceID]CacheEntry),
  174. }
  175. }
  176. func (c *cache) Set(id protocol.DeviceID, ce CacheEntry) {
  177. c.mut.Lock()
  178. c.entries[id] = ce
  179. c.mut.Unlock()
  180. }
  181. func (c *cache) Get(id protocol.DeviceID) (CacheEntry, bool) {
  182. c.mut.Lock()
  183. ce, ok := c.entries[id]
  184. c.mut.Unlock()
  185. return ce, ok
  186. }
  187. func (c *cache) Cache() map[protocol.DeviceID]CacheEntry {
  188. c.mut.Lock()
  189. m := make(map[protocol.DeviceID]CacheEntry, len(c.entries))
  190. for k, v := range c.entries {
  191. m[k] = v
  192. }
  193. c.mut.Unlock()
  194. return m
  195. }