discover.go 13 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "encoding/hex"
  10. "errors"
  11. "fmt"
  12. "io"
  13. "net"
  14. "net/url"
  15. "sort"
  16. "time"
  17. "github.com/syncthing/protocol"
  18. "github.com/syncthing/syncthing/lib/beacon"
  19. "github.com/syncthing/syncthing/lib/events"
  20. "github.com/syncthing/syncthing/lib/osutil"
  21. "github.com/syncthing/syncthing/lib/sync"
  22. )
  23. type Discoverer struct {
  24. myID protocol.DeviceID
  25. listenAddrs []string
  26. relayStatusProvider relayStatusProvider
  27. localBcastIntv time.Duration
  28. localBcastStart time.Time
  29. cacheLifetime time.Duration
  30. negCacheCutoff time.Duration
  31. beacons []beacon.Interface
  32. extPort uint16
  33. localBcastTick <-chan time.Time
  34. forcedBcastTick chan time.Time
  35. registryLock sync.RWMutex
  36. addressRegistry map[protocol.DeviceID][]CacheEntry
  37. relayRegistry map[protocol.DeviceID][]CacheEntry
  38. lastLookup map[protocol.DeviceID]time.Time
  39. clients []Client
  40. mut sync.RWMutex
  41. }
  // relayStatusProvider reports the state of the relays this device uses.
  type relayStatusProvider interface {
  	// ClientStatus returns a map keyed by relay URI. Entries whose value is
  	// true are the ones included in outgoing announcements.
  	ClientStatus() map[string]bool
  }
  // CacheEntry is a single cached address (or relay address) for a device.
  type CacheEntry struct {
  	// Address is the address in URL form, as stored in the registry.
  	Address string
  	// Seen is when the address was last registered; entries older than the
  	// discoverer's cache lifetime are dropped when the cache is filtered.
  	Seen time.Time
  }
  // ErrIncorrectMagic is the error value for a packet that carries an
  // unexpected magic number.
  var ErrIncorrectMagic = errors.New("incorrect magic number")
  52. func NewDiscoverer(id protocol.DeviceID, addresses []string, relayStatusProvider relayStatusProvider) *Discoverer {
  53. return &Discoverer{
  54. myID: id,
  55. listenAddrs: addresses,
  56. relayStatusProvider: relayStatusProvider,
  57. localBcastIntv: 30 * time.Second,
  58. cacheLifetime: 5 * time.Minute,
  59. negCacheCutoff: 3 * time.Minute,
  60. addressRegistry: make(map[protocol.DeviceID][]CacheEntry),
  61. relayRegistry: make(map[protocol.DeviceID][]CacheEntry),
  62. lastLookup: make(map[protocol.DeviceID]time.Time),
  63. registryLock: sync.NewRWMutex(),
  64. mut: sync.NewRWMutex(),
  65. }
  66. }
  67. func (d *Discoverer) StartLocal(localPort int, localMCAddr string) {
  68. if localPort > 0 {
  69. d.startLocalIPv4Broadcasts(localPort)
  70. }
  71. if len(localMCAddr) > 0 {
  72. d.startLocalIPv6Multicasts(localMCAddr)
  73. }
  74. if len(d.beacons) == 0 {
  75. l.Warnln("Local discovery unavailable")
  76. return
  77. }
  78. d.localBcastTick = time.Tick(d.localBcastIntv)
  79. d.forcedBcastTick = make(chan time.Time)
  80. d.localBcastStart = time.Now()
  81. go d.sendLocalAnnouncements()
  82. }
  83. func (d *Discoverer) startLocalIPv4Broadcasts(localPort int) {
  84. bb := beacon.NewBroadcast(localPort)
  85. d.beacons = append(d.beacons, bb)
  86. go d.recvAnnouncements(bb)
  87. bb.ServeBackground()
  88. }
  89. func (d *Discoverer) startLocalIPv6Multicasts(localMCAddr string) {
  90. mb, err := beacon.NewMulticast(localMCAddr)
  91. if err != nil {
  92. if debug {
  93. l.Debugln("beacon.NewMulticast:", err)
  94. }
  95. l.Infoln("Local discovery over IPv6 unavailable")
  96. return
  97. }
  98. d.beacons = append(d.beacons, mb)
  99. go d.recvAnnouncements(mb)
  100. }
  101. func (d *Discoverer) StartGlobal(servers []string, extPort uint16) {
  102. d.mut.Lock()
  103. defer d.mut.Unlock()
  104. if len(d.clients) > 0 {
  105. d.stopGlobal()
  106. }
  107. d.extPort = extPort
  108. wg := sync.NewWaitGroup()
  109. clients := make(chan Client, len(servers))
  110. for _, address := range servers {
  111. wg.Add(1)
  112. go func(addr string) {
  113. defer wg.Done()
  114. client, err := New(addr, d)
  115. if err != nil {
  116. l.Infoln("Error creating discovery client", addr, err)
  117. return
  118. }
  119. clients <- client
  120. }(address)
  121. }
  122. wg.Wait()
  123. close(clients)
  124. for client := range clients {
  125. d.clients = append(d.clients, client)
  126. }
  127. }
  128. func (d *Discoverer) StopGlobal() {
  129. d.mut.Lock()
  130. defer d.mut.Unlock()
  131. d.stopGlobal()
  132. }
  133. func (d *Discoverer) stopGlobal() {
  134. for _, client := range d.clients {
  135. client.Stop()
  136. }
  137. d.clients = []Client{}
  138. }
  139. func (d *Discoverer) ExtAnnounceOK() map[string]bool {
  140. d.mut.RLock()
  141. defer d.mut.RUnlock()
  142. ret := make(map[string]bool)
  143. for _, client := range d.clients {
  144. ret[client.Address()] = client.StatusOK()
  145. }
  146. return ret
  147. }
  148. // Lookup returns a list of addresses the device is available at, as well as
  149. // a list of relays the device is supposed to be available on sorted by the
  150. // sum of latencies between this device, and the device in question.
  151. func (d *Discoverer) Lookup(device protocol.DeviceID) ([]string, []string) {
  152. d.registryLock.RLock()
  153. cachedAddresses := d.filterCached(d.addressRegistry[device])
  154. cachedRelays := d.filterCached(d.relayRegistry[device])
  155. lastLookup := d.lastLookup[device]
  156. d.registryLock.RUnlock()
  157. d.mut.RLock()
  158. defer d.mut.RUnlock()
  159. relays := make([]string, len(cachedRelays))
  160. for i := range cachedRelays {
  161. relays[i] = cachedRelays[i].Address
  162. }
  163. if len(cachedAddresses) > 0 {
  164. // There are cached address entries.
  165. addrs := make([]string, len(cachedAddresses))
  166. for i := range cachedAddresses {
  167. addrs[i] = cachedAddresses[i].Address
  168. }
  169. return addrs, relays
  170. }
  171. if time.Since(lastLookup) < d.negCacheCutoff {
  172. // We have recently tried to lookup this address and failed. Lets
  173. // chill for a while.
  174. return nil, relays
  175. }
  176. if len(d.clients) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv {
  177. // Only perform external lookups if we have at least one external
  178. // server client and one local announcement interval has passed. This is
  179. // to avoid finding local peers on their remote address at startup.
  180. results := make(chan Announce, len(d.clients))
  181. wg := sync.NewWaitGroup()
  182. for _, client := range d.clients {
  183. wg.Add(1)
  184. go func(c Client) {
  185. defer wg.Done()
  186. ann, err := c.Lookup(device)
  187. if err == nil {
  188. results <- ann
  189. }
  190. }(client)
  191. }
  192. wg.Wait()
  193. close(results)
  194. cachedAddresses := []CacheEntry{}
  195. availableRelays := []Relay{}
  196. seenAddresses := make(map[string]struct{})
  197. seenRelays := make(map[string]struct{})
  198. now := time.Now()
  199. var addrs []string
  200. for result := range results {
  201. for _, addr := range result.This.Addresses {
  202. _, ok := seenAddresses[addr]
  203. if !ok {
  204. cachedAddresses = append(cachedAddresses, CacheEntry{
  205. Address: addr,
  206. Seen: now,
  207. })
  208. seenAddresses[addr] = struct{}{}
  209. addrs = append(addrs, addr)
  210. }
  211. }
  212. for _, relay := range result.This.Relays {
  213. _, ok := seenRelays[relay.Address]
  214. if !ok {
  215. availableRelays = append(availableRelays, relay)
  216. seenRelays[relay.Address] = struct{}{}
  217. }
  218. }
  219. }
  220. relays = RelayAddressesSortedByLatency(availableRelays)
  221. cachedRelays := make([]CacheEntry, len(relays))
  222. for i := range relays {
  223. cachedRelays[i] = CacheEntry{
  224. Address: relays[i],
  225. Seen: now,
  226. }
  227. }
  228. d.registryLock.Lock()
  229. d.addressRegistry[device] = cachedAddresses
  230. d.relayRegistry[device] = cachedRelays
  231. d.lastLookup[device] = time.Now()
  232. d.registryLock.Unlock()
  233. return addrs, relays
  234. }
  235. return nil, relays
  236. }
  237. func (d *Discoverer) Hint(device string, addrs []string) {
  238. resAddrs := resolveAddrs(addrs)
  239. var id protocol.DeviceID
  240. id.UnmarshalText([]byte(device))
  241. d.registerDevice(nil, Device{
  242. Addresses: resAddrs,
  243. ID: id[:],
  244. })
  245. }
  246. func (d *Discoverer) All() map[protocol.DeviceID][]CacheEntry {
  247. d.registryLock.RLock()
  248. devices := make(map[protocol.DeviceID][]CacheEntry, len(d.addressRegistry))
  249. for device, addrs := range d.addressRegistry {
  250. addrsCopy := make([]CacheEntry, len(addrs))
  251. copy(addrsCopy, addrs)
  252. devices[device] = addrsCopy
  253. }
  254. d.registryLock.RUnlock()
  255. return devices
  256. }
  257. func (d *Discoverer) Announcement() Announce {
  258. return d.announcementPkt(true)
  259. }
  260. func (d *Discoverer) announcementPkt(allowExternal bool) Announce {
  261. var addrs []string
  262. if d.extPort != 0 && allowExternal {
  263. addrs = []string{fmt.Sprintf("tcp://:%d", d.extPort)}
  264. } else {
  265. addrs = resolveAddrs(d.listenAddrs)
  266. }
  267. var relayAddrs []string
  268. if d.relayStatusProvider != nil {
  269. status := d.relayStatusProvider.ClientStatus()
  270. for uri, ok := range status {
  271. if ok {
  272. relayAddrs = append(relayAddrs, uri)
  273. }
  274. }
  275. }
  276. return Announce{
  277. Magic: AnnouncementMagic,
  278. This: Device{d.myID[:], addrs, measureLatency(relayAddrs)},
  279. }
  280. }
  281. func (d *Discoverer) sendLocalAnnouncements() {
  282. var pkt = d.announcementPkt(false)
  283. msg := pkt.MustMarshalXDR()
  284. for {
  285. for _, b := range d.beacons {
  286. b.Send(msg)
  287. }
  288. select {
  289. case <-d.localBcastTick:
  290. case <-d.forcedBcastTick:
  291. }
  292. }
  293. }
  294. func (d *Discoverer) recvAnnouncements(b beacon.Interface) {
  295. for {
  296. buf, addr := b.Recv()
  297. var pkt Announce
  298. err := pkt.UnmarshalXDR(buf)
  299. if err != nil && err != io.EOF {
  300. if debug {
  301. l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
  302. }
  303. continue
  304. }
  305. if debug {
  306. l.Debugf("discover: Received local announcement from %s for %s", addr, protocol.DeviceIDFromBytes(pkt.This.ID))
  307. }
  308. var newDevice bool
  309. if bytes.Compare(pkt.This.ID, d.myID[:]) != 0 {
  310. newDevice = d.registerDevice(addr, pkt.This)
  311. }
  312. if newDevice {
  313. select {
  314. case d.forcedBcastTick <- time.Now():
  315. }
  316. }
  317. }
  318. }
  319. func (d *Discoverer) registerDevice(addr net.Addr, device Device) bool {
  320. var id protocol.DeviceID
  321. copy(id[:], device.ID)
  322. d.registryLock.Lock()
  323. defer d.registryLock.Unlock()
  324. current := d.filterCached(d.addressRegistry[id])
  325. orig := current
  326. for _, deviceAddr := range device.Addresses {
  327. uri, err := url.Parse(deviceAddr)
  328. if err != nil {
  329. if debug {
  330. l.Debugf("discover: Failed to parse address %s: %s", deviceAddr, err)
  331. }
  332. continue
  333. }
  334. host, port, err := net.SplitHostPort(uri.Host)
  335. if err != nil {
  336. if debug {
  337. l.Debugf("discover: Failed to split address host %s: %s", deviceAddr, err)
  338. }
  339. continue
  340. }
  341. if host == "" {
  342. uri.Host = net.JoinHostPort(addr.(*net.UDPAddr).IP.String(), port)
  343. deviceAddr = uri.String()
  344. }
  345. for i := range current {
  346. if current[i].Address == deviceAddr {
  347. current[i].Seen = time.Now()
  348. goto done
  349. }
  350. }
  351. current = append(current, CacheEntry{
  352. Address: deviceAddr,
  353. Seen: time.Now(),
  354. })
  355. done:
  356. }
  357. if debug {
  358. l.Debugf("discover: Caching %s addresses: %v", id, current)
  359. }
  360. d.addressRegistry[id] = current
  361. if len(current) > len(orig) {
  362. addrs := make([]string, len(current))
  363. for i := range current {
  364. addrs[i] = current[i].Address
  365. }
  366. events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
  367. "device": id.String(),
  368. "addrs": addrs,
  369. })
  370. }
  371. return len(current) > len(orig)
  372. }
  373. func (d *Discoverer) filterCached(c []CacheEntry) []CacheEntry {
  374. for i := 0; i < len(c); {
  375. if ago := time.Since(c[i].Seen); ago > d.cacheLifetime {
  376. if debug {
  377. l.Debugf("discover: Removing cached entry %s - seen %v ago", c[i].Address, ago)
  378. }
  379. c[i] = c[len(c)-1]
  380. c = c[:len(c)-1]
  381. } else {
  382. i++
  383. }
  384. }
  385. return c
  386. }
  // addrToAddr formats a TCP address as "host:port". A missing or
  // unspecified IP gives just ":port", an IPv6 address is wrapped in square
  // brackets, and an IP that is neither IPv4 nor IPv6 gives the empty
  // string.
  func addrToAddr(addr *net.TCPAddr) string {
  	ip := addr.IP

  	switch {
  	case len(ip) == 0, ip.IsUnspecified():
  		return fmt.Sprintf(":%d", addr.Port)
  	}

  	if v4 := ip.To4(); v4 != nil {
  		return fmt.Sprintf("%s:%d", v4.String(), addr.Port)
  	}
  	if v6 := ip.To16(); v6 != nil {
  		return fmt.Sprintf("[%s]:%d", v6.String(), addr.Port)
  	}
  	return ""
  }
  397. func resolveAddrs(addrs []string) []string {
  398. var raddrs []string
  399. for _, addrStr := range addrs {
  400. uri, err := url.Parse(addrStr)
  401. if err != nil {
  402. continue
  403. }
  404. addrRes, err := net.ResolveTCPAddr("tcp", uri.Host)
  405. if err != nil {
  406. continue
  407. }
  408. addr := addrToAddr(addrRes)
  409. if len(addr) > 0 {
  410. uri.Host = addr
  411. raddrs = append(raddrs, uri.String())
  412. }
  413. }
  414. return raddrs
  415. }
  416. func measureLatency(relayAdresses []string) []Relay {
  417. relays := make([]Relay, 0, len(relayAdresses))
  418. for i, addr := range relayAdresses {
  419. relay := Relay{
  420. Address: addr,
  421. Latency: int32(time.Hour / time.Millisecond),
  422. }
  423. relays = append(relays, relay)
  424. if latency, err := osutil.GetLatencyForURL(addr); err == nil {
  425. if debug {
  426. l.Debugf("Relay %s latency %s", addr, latency)
  427. }
  428. relays[i].Latency = int32(latency / time.Millisecond)
  429. } else {
  430. l.Debugf("Failed to get relay %s latency %s", addr, err)
  431. }
  432. }
  433. return relays
  434. }
  435. // RelayAddressesSortedByLatency adds local latency to the relay, and sorts them
  436. // by sum latency, and returns the addresses.
  437. func RelayAddressesSortedByLatency(input []Relay) []string {
  438. relays := make([]Relay, len(input))
  439. copy(relays, input)
  440. for i, relay := range relays {
  441. if latency, err := osutil.GetLatencyForURL(relay.Address); err == nil {
  442. relays[i].Latency += int32(latency / time.Millisecond)
  443. } else {
  444. relays[i].Latency += int32(time.Hour / time.Millisecond)
  445. }
  446. }
  447. sort.Sort(relayList(relays))
  448. addresses := make([]string, 0, len(relays))
  449. for _, relay := range relays {
  450. addresses = append(addresses, relay.Address)
  451. }
  452. return addresses
  453. }
  454. type relayList []Relay
  455. func (l relayList) Len() int {
  456. return len(l)
  457. }
  458. func (l relayList) Less(a, b int) bool {
  459. return l[a].Latency < l[b].Latency
  460. }
  461. func (l relayList) Swap(a, b int) {
  462. l[a], l[b] = l[b], l[a]
  463. }