discover.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "encoding/hex"
  10. "errors"
  11. "io"
  12. "net"
  13. "runtime"
  14. "strconv"
  15. "time"
  16. "github.com/syncthing/protocol"
  17. "github.com/syncthing/syncthing/internal/beacon"
  18. "github.com/syncthing/syncthing/internal/events"
  19. "github.com/syncthing/syncthing/internal/sync"
  20. )
  21. type Discoverer struct {
  22. myID protocol.DeviceID
  23. listenAddrs []string
  24. localBcastIntv time.Duration
  25. localBcastStart time.Time
  26. cacheLifetime time.Duration
  27. negCacheCutoff time.Duration
  28. beacons []beacon.Interface
  29. extPort uint16
  30. localBcastTick <-chan time.Time
  31. forcedBcastTick chan time.Time
  32. registryLock sync.RWMutex
  33. registry map[protocol.DeviceID][]CacheEntry
  34. lastLookup map[protocol.DeviceID]time.Time
  35. clients []Client
  36. mut sync.RWMutex
  37. }
  38. type CacheEntry struct {
  39. Address string
  40. Seen time.Time
  41. }
  42. var (
  43. ErrIncorrectMagic = errors.New("incorrect magic number")
  44. )
  45. func NewDiscoverer(id protocol.DeviceID, addresses []string) *Discoverer {
  46. return &Discoverer{
  47. myID: id,
  48. listenAddrs: addresses,
  49. localBcastIntv: 30 * time.Second,
  50. cacheLifetime: 5 * time.Minute,
  51. negCacheCutoff: 3 * time.Minute,
  52. registry: make(map[protocol.DeviceID][]CacheEntry),
  53. lastLookup: make(map[protocol.DeviceID]time.Time),
  54. registryLock: sync.NewRWMutex(),
  55. mut: sync.NewRWMutex(),
  56. }
  57. }
  58. func (d *Discoverer) StartLocal(localPort int, localMCAddr string) {
  59. if localPort > 0 {
  60. d.startLocalIPv4Broadcasts(localPort)
  61. }
  62. if len(localMCAddr) > 0 {
  63. d.startLocalIPv6Multicasts(localMCAddr)
  64. }
  65. if len(d.beacons) == 0 {
  66. l.Warnln("Local discovery unavailable")
  67. return
  68. }
  69. d.localBcastTick = time.Tick(d.localBcastIntv)
  70. d.forcedBcastTick = make(chan time.Time)
  71. d.localBcastStart = time.Now()
  72. go d.sendLocalAnnouncements()
  73. }
  74. func (d *Discoverer) startLocalIPv4Broadcasts(localPort int) {
  75. bb := beacon.NewBroadcast(localPort)
  76. d.beacons = append(d.beacons, bb)
  77. go d.recvAnnouncements(bb)
  78. bb.ServeBackground()
  79. }
  80. func (d *Discoverer) startLocalIPv6Multicasts(localMCAddr string) {
  81. intfs, err := net.Interfaces()
  82. if err != nil {
  83. if debug {
  84. l.Debugln("discover: interfaces:", err)
  85. }
  86. l.Infoln("Local discovery over IPv6 unavailable")
  87. return
  88. }
  89. v6Intfs := 0
  90. for _, intf := range intfs {
  91. // Interface flags seem to always be 0 on Windows
  92. if runtime.GOOS != "windows" && (intf.Flags&net.FlagUp == 0 || intf.Flags&net.FlagMulticast == 0) {
  93. continue
  94. }
  95. mb, err := beacon.NewMulticast(localMCAddr, intf.Name)
  96. if err != nil {
  97. if debug {
  98. l.Debugln("discover: Start local v6:", err)
  99. }
  100. continue
  101. }
  102. d.beacons = append(d.beacons, mb)
  103. go d.recvAnnouncements(mb)
  104. v6Intfs++
  105. }
  106. if v6Intfs == 0 {
  107. l.Infoln("Local discovery over IPv6 unavailable")
  108. }
  109. }
  110. func (d *Discoverer) StartGlobal(servers []string, extPort uint16) {
  111. d.mut.Lock()
  112. defer d.mut.Unlock()
  113. if len(d.clients) > 0 {
  114. d.stopGlobal()
  115. }
  116. d.extPort = extPort
  117. pkt := d.announcementPkt()
  118. wg := sync.NewWaitGroup()
  119. clients := make(chan Client, len(servers))
  120. for _, address := range servers {
  121. wg.Add(1)
  122. go func(addr string) {
  123. defer wg.Done()
  124. client, err := New(addr, pkt)
  125. if err != nil {
  126. l.Infoln("Error creating discovery client", addr, err)
  127. return
  128. }
  129. clients <- client
  130. }(address)
  131. }
  132. wg.Wait()
  133. close(clients)
  134. for client := range clients {
  135. d.clients = append(d.clients, client)
  136. }
  137. }
  138. func (d *Discoverer) StopGlobal() {
  139. d.mut.Lock()
  140. defer d.mut.Unlock()
  141. d.stopGlobal()
  142. }
  143. func (d *Discoverer) stopGlobal() {
  144. for _, client := range d.clients {
  145. client.Stop()
  146. }
  147. d.clients = []Client{}
  148. }
  149. func (d *Discoverer) ExtAnnounceOK() map[string]bool {
  150. d.mut.RLock()
  151. defer d.mut.RUnlock()
  152. ret := make(map[string]bool)
  153. for _, client := range d.clients {
  154. ret[client.Address()] = client.StatusOK()
  155. }
  156. return ret
  157. }
  158. func (d *Discoverer) Lookup(device protocol.DeviceID) []string {
  159. d.registryLock.RLock()
  160. cached := d.filterCached(d.registry[device])
  161. lastLookup := d.lastLookup[device]
  162. d.registryLock.RUnlock()
  163. d.mut.RLock()
  164. defer d.mut.RUnlock()
  165. if len(cached) > 0 {
  166. // There are cached address entries.
  167. addrs := make([]string, len(cached))
  168. for i := range cached {
  169. addrs[i] = cached[i].Address
  170. }
  171. return addrs
  172. }
  173. if time.Since(lastLookup) < d.negCacheCutoff {
  174. // We have recently tried to lookup this address and failed. Lets
  175. // chill for a while.
  176. return nil
  177. }
  178. if len(d.clients) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv {
  179. // Only perform external lookups if we have at least one external
  180. // server client and one local announcement interval has passed. This is
  181. // to avoid finding local peers on their remote address at startup.
  182. results := make(chan []string, len(d.clients))
  183. wg := sync.NewWaitGroup()
  184. for _, client := range d.clients {
  185. wg.Add(1)
  186. go func(c Client) {
  187. defer wg.Done()
  188. results <- c.Lookup(device)
  189. }(client)
  190. }
  191. wg.Wait()
  192. close(results)
  193. cached := []CacheEntry{}
  194. seen := make(map[string]struct{})
  195. now := time.Now()
  196. var addrs []string
  197. for result := range results {
  198. for _, addr := range result {
  199. _, ok := seen[addr]
  200. if !ok {
  201. cached = append(cached, CacheEntry{
  202. Address: addr,
  203. Seen: now,
  204. })
  205. seen[addr] = struct{}{}
  206. addrs = append(addrs, addr)
  207. }
  208. }
  209. }
  210. d.registryLock.Lock()
  211. d.registry[device] = cached
  212. d.lastLookup[device] = time.Now()
  213. d.registryLock.Unlock()
  214. return addrs
  215. }
  216. return nil
  217. }
  218. func (d *Discoverer) Hint(device string, addrs []string) {
  219. resAddrs := resolveAddrs(addrs)
  220. var id protocol.DeviceID
  221. id.UnmarshalText([]byte(device))
  222. d.registerDevice(nil, Device{
  223. Addresses: resAddrs,
  224. ID: id[:],
  225. })
  226. }
  227. func (d *Discoverer) All() map[protocol.DeviceID][]CacheEntry {
  228. d.registryLock.RLock()
  229. devices := make(map[protocol.DeviceID][]CacheEntry, len(d.registry))
  230. for device, addrs := range d.registry {
  231. addrsCopy := make([]CacheEntry, len(addrs))
  232. copy(addrsCopy, addrs)
  233. devices[device] = addrsCopy
  234. }
  235. d.registryLock.RUnlock()
  236. return devices
  237. }
  238. func (d *Discoverer) announcementPkt() *Announce {
  239. var addrs []Address
  240. if d.extPort != 0 {
  241. addrs = []Address{{Port: d.extPort}}
  242. } else {
  243. for _, astr := range d.listenAddrs {
  244. addr, err := net.ResolveTCPAddr("tcp", astr)
  245. if err != nil {
  246. l.Warnln("discover: %v: not announcing %s", err, astr)
  247. continue
  248. } else if debug {
  249. l.Debugf("discover: resolved %s as %#v", astr, addr)
  250. }
  251. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  252. addrs = append(addrs, Address{Port: uint16(addr.Port)})
  253. } else if bs := addr.IP.To4(); bs != nil {
  254. addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
  255. } else if bs := addr.IP.To16(); bs != nil {
  256. addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
  257. }
  258. }
  259. }
  260. return &Announce{
  261. Magic: AnnouncementMagic,
  262. This: Device{d.myID[:], addrs},
  263. }
  264. }
  265. func (d *Discoverer) sendLocalAnnouncements() {
  266. var addrs = resolveAddrs(d.listenAddrs)
  267. var pkt = Announce{
  268. Magic: AnnouncementMagic,
  269. This: Device{d.myID[:], addrs},
  270. }
  271. msg := pkt.MustMarshalXDR()
  272. for {
  273. for _, b := range d.beacons {
  274. b.Send(msg)
  275. }
  276. select {
  277. case <-d.localBcastTick:
  278. case <-d.forcedBcastTick:
  279. }
  280. }
  281. }
  282. func (d *Discoverer) recvAnnouncements(b beacon.Interface) {
  283. for {
  284. buf, addr := b.Recv()
  285. var pkt Announce
  286. err := pkt.UnmarshalXDR(buf)
  287. if err != nil && err != io.EOF {
  288. if debug {
  289. l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
  290. }
  291. continue
  292. }
  293. if debug {
  294. l.Debugf("discover: Received local announcement from %s for %s", addr, protocol.DeviceIDFromBytes(pkt.This.ID))
  295. }
  296. var newDevice bool
  297. if bytes.Compare(pkt.This.ID, d.myID[:]) != 0 {
  298. newDevice = d.registerDevice(addr, pkt.This)
  299. }
  300. if newDevice {
  301. select {
  302. case d.forcedBcastTick <- time.Now():
  303. }
  304. }
  305. }
  306. }
  307. func (d *Discoverer) registerDevice(addr net.Addr, device Device) bool {
  308. var id protocol.DeviceID
  309. copy(id[:], device.ID)
  310. d.registryLock.Lock()
  311. defer d.registryLock.Unlock()
  312. current := d.filterCached(d.registry[id])
  313. orig := current
  314. for _, a := range device.Addresses {
  315. var deviceAddr string
  316. if len(a.IP) > 0 {
  317. deviceAddr = net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port)))
  318. } else if addr != nil {
  319. ua := addr.(*net.UDPAddr)
  320. ua.Port = int(a.Port)
  321. deviceAddr = ua.String()
  322. }
  323. for i := range current {
  324. if current[i].Address == deviceAddr {
  325. current[i].Seen = time.Now()
  326. goto done
  327. }
  328. }
  329. current = append(current, CacheEntry{
  330. Address: deviceAddr,
  331. Seen: time.Now(),
  332. })
  333. done:
  334. }
  335. if debug {
  336. l.Debugf("discover: Caching %s addresses: %v", id, current)
  337. }
  338. d.registry[id] = current
  339. if len(current) > len(orig) {
  340. addrs := make([]string, len(current))
  341. for i := range current {
  342. addrs[i] = current[i].Address
  343. }
  344. events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
  345. "device": id.String(),
  346. "addrs": addrs,
  347. })
  348. }
  349. return len(current) > len(orig)
  350. }
  351. func (d *Discoverer) filterCached(c []CacheEntry) []CacheEntry {
  352. for i := 0; i < len(c); {
  353. if ago := time.Since(c[i].Seen); ago > d.cacheLifetime {
  354. if debug {
  355. l.Debugf("discover: Removing cached address %s - seen %v ago", c[i].Address, ago)
  356. }
  357. c[i] = c[len(c)-1]
  358. c = c[:len(c)-1]
  359. } else {
  360. i++
  361. }
  362. }
  363. return c
  364. }
  365. func addrToAddr(addr *net.TCPAddr) Address {
  366. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  367. return Address{Port: uint16(addr.Port)}
  368. } else if bs := addr.IP.To4(); bs != nil {
  369. return Address{IP: bs, Port: uint16(addr.Port)}
  370. } else if bs := addr.IP.To16(); bs != nil {
  371. return Address{IP: bs, Port: uint16(addr.Port)}
  372. }
  373. return Address{}
  374. }
  375. func resolveAddrs(addrs []string) []Address {
  376. var raddrs []Address
  377. for _, addrStr := range addrs {
  378. addrRes, err := net.ResolveTCPAddr("tcp", addrStr)
  379. if err != nil {
  380. continue
  381. }
  382. addr := addrToAddr(addrRes)
  383. if len(addr.IP) > 0 {
  384. raddrs = append(raddrs, addr)
  385. } else {
  386. raddrs = append(raddrs, Address{Port: addr.Port})
  387. }
  388. }
  389. return raddrs
  390. }