discover.go 10 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "encoding/hex"
  10. "errors"
  11. "io"
  12. "net"
  13. "strconv"
  14. "sync"
  15. "time"
  16. "github.com/syncthing/protocol"
  17. "github.com/syncthing/syncthing/internal/beacon"
  18. "github.com/syncthing/syncthing/internal/events"
  19. )
  20. type Discoverer struct {
  21. myID protocol.DeviceID
  22. listenAddrs []string
  23. localBcastIntv time.Duration
  24. localBcastStart time.Time
  25. cacheLifetime time.Duration
  26. negCacheCutoff time.Duration
  27. beacons []beacon.Interface
  28. extPort uint16
  29. localBcastTick <-chan time.Time
  30. forcedBcastTick chan time.Time
  31. registryLock sync.RWMutex
  32. registry map[protocol.DeviceID][]CacheEntry
  33. lastLookup map[protocol.DeviceID]time.Time
  34. clients []Client
  35. mut sync.RWMutex
  36. }
  37. type CacheEntry struct {
  38. Address string
  39. Seen time.Time
  40. }
  41. var (
  42. ErrIncorrectMagic = errors.New("incorrect magic number")
  43. )
  44. func NewDiscoverer(id protocol.DeviceID, addresses []string) *Discoverer {
  45. return &Discoverer{
  46. myID: id,
  47. listenAddrs: addresses,
  48. localBcastIntv: 30 * time.Second,
  49. cacheLifetime: 5 * time.Minute,
  50. negCacheCutoff: 3 * time.Minute,
  51. registry: make(map[protocol.DeviceID][]CacheEntry),
  52. lastLookup: make(map[protocol.DeviceID]time.Time),
  53. }
  54. }
  55. func (d *Discoverer) StartLocal(localPort int, localMCAddr string) {
  56. if localPort > 0 {
  57. d.startLocalIPv4Broadcasts(localPort)
  58. }
  59. if len(localMCAddr) > 0 {
  60. d.startLocalIPv6Multicasts(localMCAddr)
  61. }
  62. if len(d.beacons) == 0 {
  63. l.Warnln("Local discovery unavailable")
  64. return
  65. }
  66. d.localBcastTick = time.Tick(d.localBcastIntv)
  67. d.forcedBcastTick = make(chan time.Time)
  68. d.localBcastStart = time.Now()
  69. go d.sendLocalAnnouncements()
  70. }
  71. func (d *Discoverer) startLocalIPv4Broadcasts(localPort int) {
  72. bb, err := beacon.NewBroadcast(localPort)
  73. if err != nil {
  74. if debug {
  75. l.Debugln("discover: Start local v4:", err)
  76. }
  77. l.Infoln("Local discovery over IPv4 unavailable")
  78. return
  79. }
  80. d.beacons = append(d.beacons, bb)
  81. go d.recvAnnouncements(bb)
  82. }
  83. func (d *Discoverer) startLocalIPv6Multicasts(localMCAddr string) {
  84. intfs, err := net.Interfaces()
  85. if err != nil {
  86. if debug {
  87. l.Debugln("discover: interfaces:", err)
  88. }
  89. l.Infoln("Local discovery over IPv6 unavailable")
  90. return
  91. }
  92. v6Intfs := 0
  93. for _, intf := range intfs {
  94. if intf.Flags&net.FlagUp == 0 || intf.Flags&net.FlagMulticast == 0 {
  95. continue
  96. }
  97. mb, err := beacon.NewMulticast(localMCAddr, intf.Name)
  98. if err != nil {
  99. if debug {
  100. l.Debugln("discover: Start local v6:", err)
  101. }
  102. continue
  103. }
  104. d.beacons = append(d.beacons, mb)
  105. go d.recvAnnouncements(mb)
  106. v6Intfs++
  107. }
  108. if v6Intfs == 0 {
  109. l.Infoln("Local discovery over IPv6 unavailable")
  110. }
  111. }
  112. func (d *Discoverer) StartGlobal(servers []string, extPort uint16) {
  113. d.mut.Lock()
  114. defer d.mut.Unlock()
  115. if len(d.clients) > 0 {
  116. d.stopGlobal()
  117. }
  118. d.extPort = extPort
  119. pkt := d.announcementPkt()
  120. wg := sync.WaitGroup{}
  121. clients := make(chan Client, len(servers))
  122. for _, address := range servers {
  123. wg.Add(1)
  124. go func(addr string) {
  125. defer wg.Done()
  126. client, err := New(addr, pkt)
  127. if err != nil {
  128. l.Infoln("Error creating discovery client", addr, err)
  129. return
  130. }
  131. clients <- client
  132. }(address)
  133. }
  134. wg.Wait()
  135. close(clients)
  136. for client := range clients {
  137. d.clients = append(d.clients, client)
  138. }
  139. }
  140. func (d *Discoverer) StopGlobal() {
  141. d.mut.Lock()
  142. defer d.mut.Unlock()
  143. d.stopGlobal()
  144. }
  145. func (d *Discoverer) stopGlobal() {
  146. for _, client := range d.clients {
  147. client.Stop()
  148. }
  149. d.clients = []Client{}
  150. }
  151. func (d *Discoverer) ExtAnnounceOK() map[string]bool {
  152. d.mut.RLock()
  153. defer d.mut.RUnlock()
  154. ret := make(map[string]bool)
  155. for _, client := range d.clients {
  156. ret[client.Address()] = client.StatusOK()
  157. }
  158. return ret
  159. }
  160. func (d *Discoverer) Lookup(device protocol.DeviceID) []string {
  161. d.registryLock.RLock()
  162. cached := d.filterCached(d.registry[device])
  163. lastLookup := d.lastLookup[device]
  164. d.registryLock.RUnlock()
  165. d.mut.RLock()
  166. defer d.mut.RUnlock()
  167. if len(cached) > 0 {
  168. // There are cached address entries.
  169. addrs := make([]string, len(cached))
  170. for i := range cached {
  171. addrs[i] = cached[i].Address
  172. }
  173. return addrs
  174. }
  175. if time.Since(lastLookup) < d.negCacheCutoff {
  176. // We have recently tried to lookup this address and failed. Lets
  177. // chill for a while.
  178. return nil
  179. }
  180. if len(d.clients) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv {
  181. // Only perform external lookups if we have at least one external
  182. // server client and one local announcement interval has passed. This is
  183. // to avoid finding local peers on their remote address at startup.
  184. results := make(chan []string, len(d.clients))
  185. wg := sync.WaitGroup{}
  186. for _, client := range d.clients {
  187. wg.Add(1)
  188. go func(c Client) {
  189. defer wg.Done()
  190. results <- c.Lookup(device)
  191. }(client)
  192. }
  193. wg.Wait()
  194. close(results)
  195. cached := []CacheEntry{}
  196. seen := make(map[string]struct{})
  197. now := time.Now()
  198. var addrs []string
  199. for result := range results {
  200. for _, addr := range result {
  201. _, ok := seen[addr]
  202. if !ok {
  203. cached = append(cached, CacheEntry{
  204. Address: addr,
  205. Seen: now,
  206. })
  207. seen[addr] = struct{}{}
  208. addrs = append(addrs, addr)
  209. }
  210. }
  211. }
  212. d.registryLock.Lock()
  213. d.registry[device] = cached
  214. d.lastLookup[device] = time.Now()
  215. d.registryLock.Unlock()
  216. return addrs
  217. }
  218. return nil
  219. }
  220. func (d *Discoverer) Hint(device string, addrs []string) {
  221. resAddrs := resolveAddrs(addrs)
  222. var id protocol.DeviceID
  223. id.UnmarshalText([]byte(device))
  224. d.registerDevice(nil, Device{
  225. Addresses: resAddrs,
  226. ID: id[:],
  227. })
  228. }
  229. func (d *Discoverer) All() map[protocol.DeviceID][]CacheEntry {
  230. d.registryLock.RLock()
  231. devices := make(map[protocol.DeviceID][]CacheEntry, len(d.registry))
  232. for device, addrs := range d.registry {
  233. addrsCopy := make([]CacheEntry, len(addrs))
  234. copy(addrsCopy, addrs)
  235. devices[device] = addrsCopy
  236. }
  237. d.registryLock.RUnlock()
  238. return devices
  239. }
  240. func (d *Discoverer) announcementPkt() *Announce {
  241. var addrs []Address
  242. if d.extPort != 0 {
  243. addrs = []Address{{Port: d.extPort}}
  244. } else {
  245. for _, astr := range d.listenAddrs {
  246. addr, err := net.ResolveTCPAddr("tcp", astr)
  247. if err != nil {
  248. l.Warnln("discover: %v: not announcing %s", err, astr)
  249. continue
  250. } else if debug {
  251. l.Debugf("discover: resolved %s as %#v", astr, addr)
  252. }
  253. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  254. addrs = append(addrs, Address{Port: uint16(addr.Port)})
  255. } else if bs := addr.IP.To4(); bs != nil {
  256. addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
  257. } else if bs := addr.IP.To16(); bs != nil {
  258. addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
  259. }
  260. }
  261. }
  262. return &Announce{
  263. Magic: AnnouncementMagic,
  264. This: Device{d.myID[:], addrs},
  265. }
  266. }
  267. func (d *Discoverer) sendLocalAnnouncements() {
  268. var addrs = resolveAddrs(d.listenAddrs)
  269. var pkt = Announce{
  270. Magic: AnnouncementMagic,
  271. This: Device{d.myID[:], addrs},
  272. }
  273. msg := pkt.MustMarshalXDR()
  274. for {
  275. for _, b := range d.beacons {
  276. b.Send(msg)
  277. }
  278. select {
  279. case <-d.localBcastTick:
  280. case <-d.forcedBcastTick:
  281. }
  282. }
  283. }
  284. func (d *Discoverer) recvAnnouncements(b beacon.Interface) {
  285. for {
  286. buf, addr := b.Recv()
  287. var pkt Announce
  288. err := pkt.UnmarshalXDR(buf)
  289. if err != nil && err != io.EOF {
  290. if debug {
  291. l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
  292. }
  293. continue
  294. }
  295. if debug {
  296. l.Debugf("discover: Received local announcement from %s for %s", addr, protocol.DeviceIDFromBytes(pkt.This.ID))
  297. }
  298. var newDevice bool
  299. if bytes.Compare(pkt.This.ID, d.myID[:]) != 0 {
  300. newDevice = d.registerDevice(addr, pkt.This)
  301. }
  302. if newDevice {
  303. select {
  304. case d.forcedBcastTick <- time.Now():
  305. }
  306. }
  307. }
  308. }
  309. func (d *Discoverer) registerDevice(addr net.Addr, device Device) bool {
  310. var id protocol.DeviceID
  311. copy(id[:], device.ID)
  312. d.registryLock.Lock()
  313. defer d.registryLock.Unlock()
  314. current := d.filterCached(d.registry[id])
  315. orig := current
  316. for _, a := range device.Addresses {
  317. var deviceAddr string
  318. if len(a.IP) > 0 {
  319. deviceAddr = net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port)))
  320. } else if addr != nil {
  321. ua := addr.(*net.UDPAddr)
  322. ua.Port = int(a.Port)
  323. deviceAddr = ua.String()
  324. }
  325. for i := range current {
  326. if current[i].Address == deviceAddr {
  327. current[i].Seen = time.Now()
  328. goto done
  329. }
  330. }
  331. current = append(current, CacheEntry{
  332. Address: deviceAddr,
  333. Seen: time.Now(),
  334. })
  335. done:
  336. }
  337. if debug {
  338. l.Debugf("discover: Caching %s addresses: %v", id, current)
  339. }
  340. d.registry[id] = current
  341. if len(current) > len(orig) {
  342. addrs := make([]string, len(current))
  343. for i := range current {
  344. addrs[i] = current[i].Address
  345. }
  346. events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
  347. "device": id.String(),
  348. "addrs": addrs,
  349. })
  350. }
  351. return len(current) > len(orig)
  352. }
  353. func (d *Discoverer) filterCached(c []CacheEntry) []CacheEntry {
  354. for i := 0; i < len(c); {
  355. if ago := time.Since(c[i].Seen); ago > d.cacheLifetime {
  356. if debug {
  357. l.Debugf("discover: Removing cached address %s - seen %v ago", c[i].Address, ago)
  358. }
  359. c[i] = c[len(c)-1]
  360. c = c[:len(c)-1]
  361. } else {
  362. i++
  363. }
  364. }
  365. return c
  366. }
  367. func addrToAddr(addr *net.TCPAddr) Address {
  368. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  369. return Address{Port: uint16(addr.Port)}
  370. } else if bs := addr.IP.To4(); bs != nil {
  371. return Address{IP: bs, Port: uint16(addr.Port)}
  372. } else if bs := addr.IP.To16(); bs != nil {
  373. return Address{IP: bs, Port: uint16(addr.Port)}
  374. }
  375. return Address{}
  376. }
  377. func resolveAddrs(addrs []string) []Address {
  378. var raddrs []Address
  379. for _, addrStr := range addrs {
  380. addrRes, err := net.ResolveTCPAddr("tcp", addrStr)
  381. if err != nil {
  382. continue
  383. }
  384. addr := addrToAddr(addrRes)
  385. if len(addr.IP) > 0 {
  386. raddrs = append(raddrs, addr)
  387. } else {
  388. raddrs = append(raddrs, Address{Port: addr.Port})
  389. }
  390. }
  391. return raddrs
  392. }