cache.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "sort"
  9. stdsync "sync"
  10. "time"
  11. "github.com/syncthing/syncthing/lib/protocol"
  12. "github.com/syncthing/syncthing/lib/sync"
  13. "github.com/thejerf/suture"
  14. )
  15. // The CachingMux aggregates results from multiple Finders. Each Finder has
  16. // an associated cache time and negative cache time. The cache time sets how
  17. // long we cache and return successfull lookup results, the negative cache
  18. // time sets how long we refrain from asking about the same device ID after
  19. // receiving a negative answer. The value of zero disables caching (positive
  20. // or negative).
  21. type CachingMux struct {
  22. *suture.Supervisor
  23. finders []cachedFinder
  24. caches []*cache
  25. mut sync.Mutex
  26. }
  27. // A cachedFinder is a Finder with associated cache timeouts.
  28. type cachedFinder struct {
  29. Finder
  30. cacheTime time.Duration
  31. negCacheTime time.Duration
  32. }
  33. func NewCachingMux() *CachingMux {
  34. return &CachingMux{
  35. Supervisor: suture.NewSimple("discover.cachingMux"),
  36. mut: sync.NewMutex(),
  37. }
  38. }
  39. // Add registers a new Finder, with associated cache timeouts.
  40. func (m *CachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration) {
  41. m.mut.Lock()
  42. m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime})
  43. m.caches = append(m.caches, newCache())
  44. m.mut.Unlock()
  45. if svc, ok := finder.(suture.Service); ok {
  46. m.Supervisor.Add(svc)
  47. }
  48. }
  49. // Lookup attempts to resolve the device ID using any of the added Finders,
  50. // while obeying the cache settings.
  51. func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error) {
  52. m.mut.Lock()
  53. for i, finder := range m.finders {
  54. if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
  55. // We have a cache entry. Lets see what it says.
  56. if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime {
  57. // It's a positive, valid entry. Use it.
  58. if debug {
  59. l.Debugln("cached discovery entry for", deviceID, "at", finder.String())
  60. l.Debugln(" ", cacheEntry)
  61. }
  62. direct = append(direct, cacheEntry.Direct...)
  63. relays = append(relays, cacheEntry.Relays...)
  64. continue
  65. }
  66. if !cacheEntry.found && time.Since(cacheEntry.when) < finder.negCacheTime {
  67. // It's a negative, valid entry. We should not make another
  68. // attempt right now.
  69. if debug {
  70. l.Debugln("negative cache entry for", deviceID, "at", finder.String())
  71. }
  72. continue
  73. }
  74. // It's expired. Ignore and continue.
  75. }
  76. // Perform the actual lookup and cache the result.
  77. if td, tr, err := finder.Lookup(deviceID); err == nil {
  78. if debug {
  79. l.Debugln("lookup for", deviceID, "at", finder.String())
  80. l.Debugln(" ", td)
  81. l.Debugln(" ", tr)
  82. }
  83. direct = append(direct, td...)
  84. relays = append(relays, tr...)
  85. m.caches[i].Set(deviceID, CacheEntry{
  86. Direct: td,
  87. Relays: tr,
  88. when: time.Now(),
  89. found: len(td)+len(tr) > 0,
  90. })
  91. }
  92. }
  93. m.mut.Unlock()
  94. if debug {
  95. l.Debugln("lookup results for", deviceID)
  96. l.Debugln(" ", direct)
  97. l.Debugln(" ", relays)
  98. }
  99. return uniqueSortedStrings(direct), uniqueSortedRelays(relays), nil
  100. }
  101. func (m *CachingMux) String() string {
  102. return "discovery cache"
  103. }
  104. func (m *CachingMux) Error() error {
  105. return nil
  106. }
  107. func (m *CachingMux) ChildErrors() map[string]error {
  108. m.mut.Lock()
  109. children := make(map[string]error, len(m.finders))
  110. for _, f := range m.finders {
  111. children[f.String()] = f.Error()
  112. }
  113. m.mut.Unlock()
  114. return children
  115. }
  116. func (m *CachingMux) Cache() map[protocol.DeviceID]CacheEntry {
  117. // Res will be the "total" cache, i.e. the union of our cache and all our
  118. // children's caches.
  119. res := make(map[protocol.DeviceID]CacheEntry)
  120. m.mut.Lock()
  121. for i := range m.finders {
  122. // Each finder[i] has a corresponding cache at cache[i]. Go through it
  123. // and populate the total, if it's newer than what's already in there.
  124. // We skip any negative cache entries.
  125. for k, v := range m.caches[i].Cache() {
  126. if v.found && v.when.After(res[k].when) {
  127. res[k] = v
  128. }
  129. }
  130. // Then ask the finder itself for it's cache and do the same. If this
  131. // finder is a global discovery client, it will have no cache. If it's
  132. // a local discovery client, this will be it's current state.
  133. for k, v := range m.finders[i].Cache() {
  134. if v.found && v.when.After(res[k].when) {
  135. res[k] = v
  136. }
  137. }
  138. }
  139. m.mut.Unlock()
  140. return res
  141. }
  142. // A cache can be embedded wherever useful
  143. type cache struct {
  144. entries map[protocol.DeviceID]CacheEntry
  145. mut stdsync.Mutex
  146. }
  147. func newCache() *cache {
  148. return &cache{
  149. entries: make(map[protocol.DeviceID]CacheEntry),
  150. }
  151. }
  152. func (c *cache) Set(id protocol.DeviceID, ce CacheEntry) {
  153. c.mut.Lock()
  154. c.entries[id] = ce
  155. c.mut.Unlock()
  156. }
  157. func (c *cache) Get(id protocol.DeviceID) (CacheEntry, bool) {
  158. c.mut.Lock()
  159. ce, ok := c.entries[id]
  160. c.mut.Unlock()
  161. return ce, ok
  162. }
  163. func (c *cache) Cache() map[protocol.DeviceID]CacheEntry {
  164. c.mut.Lock()
  165. m := make(map[protocol.DeviceID]CacheEntry, len(c.entries))
  166. for k, v := range c.entries {
  167. m[k] = v
  168. }
  169. c.mut.Unlock()
  170. return m
  171. }
  172. func uniqueSortedStrings(ss []string) []string {
  173. m := make(map[string]struct{}, len(ss))
  174. for _, s := range ss {
  175. m[s] = struct{}{}
  176. }
  177. var us = make([]string, 0, len(m))
  178. for k := range m {
  179. us = append(us, k)
  180. }
  181. sort.Strings(us)
  182. return us
  183. }
  184. func uniqueSortedRelays(rs []Relay) []Relay {
  185. m := make(map[string]Relay, len(rs))
  186. for _, r := range rs {
  187. m[r.URL] = r
  188. }
  189. var ur = make([]Relay, 0, len(m))
  190. for _, r := range m {
  191. ur = append(ur, r)
  192. }
  193. sort.Sort(relayList(ur))
  194. return ur
  195. }
  196. type relayList []Relay
  197. func (l relayList) Len() int {
  198. return len(l)
  199. }
  200. func (l relayList) Swap(a, b int) {
  201. l[a], l[b] = l[b], l[a]
  202. }
  203. func (l relayList) Less(a, b int) bool {
  204. return l[a].URL < l[b].URL
  205. }