discover.go 10 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "encoding/hex"
  10. "errors"
  11. "io"
  12. "net"
  13. "strconv"
  14. "time"
  15. "github.com/syncthing/protocol"
  16. "github.com/syncthing/syncthing/internal/beacon"
  17. "github.com/syncthing/syncthing/internal/events"
  18. "github.com/syncthing/syncthing/internal/sync"
  19. )
  20. type Discoverer struct {
  21. myID protocol.DeviceID
  22. listenAddrs []string
  23. localBcastIntv time.Duration
  24. localBcastStart time.Time
  25. cacheLifetime time.Duration
  26. negCacheCutoff time.Duration
  27. beacons []beacon.Interface
  28. extPort uint16
  29. localBcastTick <-chan time.Time
  30. forcedBcastTick chan time.Time
  31. registryLock sync.RWMutex
  32. registry map[protocol.DeviceID][]CacheEntry
  33. lastLookup map[protocol.DeviceID]time.Time
  34. clients []Client
  35. mut sync.RWMutex
  36. }
  37. type CacheEntry struct {
  38. Address string
  39. Seen time.Time
  40. }
  41. var (
  42. ErrIncorrectMagic = errors.New("incorrect magic number")
  43. )
  44. func NewDiscoverer(id protocol.DeviceID, addresses []string) *Discoverer {
  45. return &Discoverer{
  46. myID: id,
  47. listenAddrs: addresses,
  48. localBcastIntv: 30 * time.Second,
  49. cacheLifetime: 5 * time.Minute,
  50. negCacheCutoff: 3 * time.Minute,
  51. registry: make(map[protocol.DeviceID][]CacheEntry),
  52. lastLookup: make(map[protocol.DeviceID]time.Time),
  53. registryLock: sync.NewRWMutex(),
  54. mut: sync.NewRWMutex(),
  55. }
  56. }
  57. func (d *Discoverer) StartLocal(localPort int, localMCAddr string) {
  58. if localPort > 0 {
  59. d.startLocalIPv4Broadcasts(localPort)
  60. }
  61. if len(localMCAddr) > 0 {
  62. d.startLocalIPv6Multicasts(localMCAddr)
  63. }
  64. if len(d.beacons) == 0 {
  65. l.Warnln("Local discovery unavailable")
  66. return
  67. }
  68. d.localBcastTick = time.Tick(d.localBcastIntv)
  69. d.forcedBcastTick = make(chan time.Time)
  70. d.localBcastStart = time.Now()
  71. go d.sendLocalAnnouncements()
  72. }
  73. func (d *Discoverer) startLocalIPv4Broadcasts(localPort int) {
  74. bb, err := beacon.NewBroadcast(localPort)
  75. if err != nil {
  76. if debug {
  77. l.Debugln("discover: Start local v4:", err)
  78. }
  79. l.Infoln("Local discovery over IPv4 unavailable")
  80. return
  81. }
  82. d.beacons = append(d.beacons, bb)
  83. go d.recvAnnouncements(bb)
  84. }
  85. func (d *Discoverer) startLocalIPv6Multicasts(localMCAddr string) {
  86. intfs, err := net.Interfaces()
  87. if err != nil {
  88. if debug {
  89. l.Debugln("discover: interfaces:", err)
  90. }
  91. l.Infoln("Local discovery over IPv6 unavailable")
  92. return
  93. }
  94. v6Intfs := 0
  95. for _, intf := range intfs {
  96. if intf.Flags&net.FlagUp == 0 || intf.Flags&net.FlagMulticast == 0 {
  97. continue
  98. }
  99. mb, err := beacon.NewMulticast(localMCAddr, intf.Name)
  100. if err != nil {
  101. if debug {
  102. l.Debugln("discover: Start local v6:", err)
  103. }
  104. continue
  105. }
  106. d.beacons = append(d.beacons, mb)
  107. go d.recvAnnouncements(mb)
  108. v6Intfs++
  109. }
  110. if v6Intfs == 0 {
  111. l.Infoln("Local discovery over IPv6 unavailable")
  112. }
  113. }
  114. func (d *Discoverer) StartGlobal(servers []string, extPort uint16) {
  115. d.mut.Lock()
  116. defer d.mut.Unlock()
  117. if len(d.clients) > 0 {
  118. d.stopGlobal()
  119. }
  120. d.extPort = extPort
  121. pkt := d.announcementPkt()
  122. wg := sync.NewWaitGroup()
  123. clients := make(chan Client, len(servers))
  124. for _, address := range servers {
  125. wg.Add(1)
  126. go func(addr string) {
  127. defer wg.Done()
  128. client, err := New(addr, pkt)
  129. if err != nil {
  130. l.Infoln("Error creating discovery client", addr, err)
  131. return
  132. }
  133. clients <- client
  134. }(address)
  135. }
  136. wg.Wait()
  137. close(clients)
  138. for client := range clients {
  139. d.clients = append(d.clients, client)
  140. }
  141. }
  142. func (d *Discoverer) StopGlobal() {
  143. d.mut.Lock()
  144. defer d.mut.Unlock()
  145. d.stopGlobal()
  146. }
  147. func (d *Discoverer) stopGlobal() {
  148. for _, client := range d.clients {
  149. client.Stop()
  150. }
  151. d.clients = []Client{}
  152. }
  153. func (d *Discoverer) ExtAnnounceOK() map[string]bool {
  154. d.mut.RLock()
  155. defer d.mut.RUnlock()
  156. ret := make(map[string]bool)
  157. for _, client := range d.clients {
  158. ret[client.Address()] = client.StatusOK()
  159. }
  160. return ret
  161. }
  162. func (d *Discoverer) Lookup(device protocol.DeviceID) []string {
  163. d.registryLock.RLock()
  164. cached := d.filterCached(d.registry[device])
  165. lastLookup := d.lastLookup[device]
  166. d.registryLock.RUnlock()
  167. d.mut.RLock()
  168. defer d.mut.RUnlock()
  169. if len(cached) > 0 {
  170. // There are cached address entries.
  171. addrs := make([]string, len(cached))
  172. for i := range cached {
  173. addrs[i] = cached[i].Address
  174. }
  175. return addrs
  176. }
  177. if time.Since(lastLookup) < d.negCacheCutoff {
  178. // We have recently tried to lookup this address and failed. Lets
  179. // chill for a while.
  180. return nil
  181. }
  182. if len(d.clients) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv {
  183. // Only perform external lookups if we have at least one external
  184. // server client and one local announcement interval has passed. This is
  185. // to avoid finding local peers on their remote address at startup.
  186. results := make(chan []string, len(d.clients))
  187. wg := sync.NewWaitGroup()
  188. for _, client := range d.clients {
  189. wg.Add(1)
  190. go func(c Client) {
  191. defer wg.Done()
  192. results <- c.Lookup(device)
  193. }(client)
  194. }
  195. wg.Wait()
  196. close(results)
  197. cached := []CacheEntry{}
  198. seen := make(map[string]struct{})
  199. now := time.Now()
  200. var addrs []string
  201. for result := range results {
  202. for _, addr := range result {
  203. _, ok := seen[addr]
  204. if !ok {
  205. cached = append(cached, CacheEntry{
  206. Address: addr,
  207. Seen: now,
  208. })
  209. seen[addr] = struct{}{}
  210. addrs = append(addrs, addr)
  211. }
  212. }
  213. }
  214. d.registryLock.Lock()
  215. d.registry[device] = cached
  216. d.lastLookup[device] = time.Now()
  217. d.registryLock.Unlock()
  218. return addrs
  219. }
  220. return nil
  221. }
  222. func (d *Discoverer) Hint(device string, addrs []string) {
  223. resAddrs := resolveAddrs(addrs)
  224. var id protocol.DeviceID
  225. id.UnmarshalText([]byte(device))
  226. d.registerDevice(nil, Device{
  227. Addresses: resAddrs,
  228. ID: id[:],
  229. })
  230. }
  231. func (d *Discoverer) All() map[protocol.DeviceID][]CacheEntry {
  232. d.registryLock.RLock()
  233. devices := make(map[protocol.DeviceID][]CacheEntry, len(d.registry))
  234. for device, addrs := range d.registry {
  235. addrsCopy := make([]CacheEntry, len(addrs))
  236. copy(addrsCopy, addrs)
  237. devices[device] = addrsCopy
  238. }
  239. d.registryLock.RUnlock()
  240. return devices
  241. }
  242. func (d *Discoverer) announcementPkt() *Announce {
  243. var addrs []Address
  244. if d.extPort != 0 {
  245. addrs = []Address{{Port: d.extPort}}
  246. } else {
  247. for _, astr := range d.listenAddrs {
  248. addr, err := net.ResolveTCPAddr("tcp", astr)
  249. if err != nil {
  250. l.Warnln("discover: %v: not announcing %s", err, astr)
  251. continue
  252. } else if debug {
  253. l.Debugf("discover: resolved %s as %#v", astr, addr)
  254. }
  255. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  256. addrs = append(addrs, Address{Port: uint16(addr.Port)})
  257. } else if bs := addr.IP.To4(); bs != nil {
  258. addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
  259. } else if bs := addr.IP.To16(); bs != nil {
  260. addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
  261. }
  262. }
  263. }
  264. return &Announce{
  265. Magic: AnnouncementMagic,
  266. This: Device{d.myID[:], addrs},
  267. }
  268. }
  269. func (d *Discoverer) sendLocalAnnouncements() {
  270. var addrs = resolveAddrs(d.listenAddrs)
  271. var pkt = Announce{
  272. Magic: AnnouncementMagic,
  273. This: Device{d.myID[:], addrs},
  274. }
  275. msg := pkt.MustMarshalXDR()
  276. for {
  277. for _, b := range d.beacons {
  278. b.Send(msg)
  279. }
  280. select {
  281. case <-d.localBcastTick:
  282. case <-d.forcedBcastTick:
  283. }
  284. }
  285. }
  286. func (d *Discoverer) recvAnnouncements(b beacon.Interface) {
  287. for {
  288. buf, addr := b.Recv()
  289. var pkt Announce
  290. err := pkt.UnmarshalXDR(buf)
  291. if err != nil && err != io.EOF {
  292. if debug {
  293. l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
  294. }
  295. continue
  296. }
  297. if debug {
  298. l.Debugf("discover: Received local announcement from %s for %s", addr, protocol.DeviceIDFromBytes(pkt.This.ID))
  299. }
  300. var newDevice bool
  301. if bytes.Compare(pkt.This.ID, d.myID[:]) != 0 {
  302. newDevice = d.registerDevice(addr, pkt.This)
  303. }
  304. if newDevice {
  305. select {
  306. case d.forcedBcastTick <- time.Now():
  307. }
  308. }
  309. }
  310. }
  311. func (d *Discoverer) registerDevice(addr net.Addr, device Device) bool {
  312. var id protocol.DeviceID
  313. copy(id[:], device.ID)
  314. d.registryLock.Lock()
  315. defer d.registryLock.Unlock()
  316. current := d.filterCached(d.registry[id])
  317. orig := current
  318. for _, a := range device.Addresses {
  319. var deviceAddr string
  320. if len(a.IP) > 0 {
  321. deviceAddr = net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port)))
  322. } else if addr != nil {
  323. ua := addr.(*net.UDPAddr)
  324. ua.Port = int(a.Port)
  325. deviceAddr = ua.String()
  326. }
  327. for i := range current {
  328. if current[i].Address == deviceAddr {
  329. current[i].Seen = time.Now()
  330. goto done
  331. }
  332. }
  333. current = append(current, CacheEntry{
  334. Address: deviceAddr,
  335. Seen: time.Now(),
  336. })
  337. done:
  338. }
  339. if debug {
  340. l.Debugf("discover: Caching %s addresses: %v", id, current)
  341. }
  342. d.registry[id] = current
  343. if len(current) > len(orig) {
  344. addrs := make([]string, len(current))
  345. for i := range current {
  346. addrs[i] = current[i].Address
  347. }
  348. events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
  349. "device": id.String(),
  350. "addrs": addrs,
  351. })
  352. }
  353. return len(current) > len(orig)
  354. }
  355. func (d *Discoverer) filterCached(c []CacheEntry) []CacheEntry {
  356. for i := 0; i < len(c); {
  357. if ago := time.Since(c[i].Seen); ago > d.cacheLifetime {
  358. if debug {
  359. l.Debugf("discover: Removing cached address %s - seen %v ago", c[i].Address, ago)
  360. }
  361. c[i] = c[len(c)-1]
  362. c = c[:len(c)-1]
  363. } else {
  364. i++
  365. }
  366. }
  367. return c
  368. }
  369. func addrToAddr(addr *net.TCPAddr) Address {
  370. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  371. return Address{Port: uint16(addr.Port)}
  372. } else if bs := addr.IP.To4(); bs != nil {
  373. return Address{IP: bs, Port: uint16(addr.Port)}
  374. } else if bs := addr.IP.To16(); bs != nil {
  375. return Address{IP: bs, Port: uint16(addr.Port)}
  376. }
  377. return Address{}
  378. }
  379. func resolveAddrs(addrs []string) []Address {
  380. var raddrs []Address
  381. for _, addrStr := range addrs {
  382. addrRes, err := net.ResolveTCPAddr("tcp", addrStr)
  383. if err != nil {
  384. continue
  385. }
  386. addr := addrToAddr(addrRes)
  387. if len(addr.IP) > 0 {
  388. raddrs = append(raddrs, addr)
  389. } else {
  390. raddrs = append(raddrs, Address{Port: addr.Port})
  391. }
  392. }
  393. return raddrs
  394. }