discover.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This program is free software: you can redistribute it and/or modify it
  4. // under the terms of the GNU General Public License as published by the Free
  5. // Software Foundation, either version 3 of the License, or (at your option)
  6. // any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful, but WITHOUT
  9. // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  10. // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
  11. // more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program. If not, see <http://www.gnu.org/licenses/>.
  15. package discover
  16. import (
  17. "bytes"
  18. "encoding/hex"
  19. "errors"
  20. "io"
  21. "net"
  22. "strconv"
  23. "sync"
  24. "time"
  25. "github.com/syncthing/syncthing/internal/beacon"
  26. "github.com/syncthing/syncthing/internal/events"
  27. "github.com/syncthing/syncthing/internal/protocol"
  28. )
  29. type Discoverer struct {
  30. myID protocol.DeviceID
  31. listenAddrs []string
  32. localBcastIntv time.Duration
  33. localBcastStart time.Time
  34. globalBcastIntv time.Duration
  35. errorRetryIntv time.Duration
  36. cacheLifetime time.Duration
  37. broadcastBeacon beacon.Interface
  38. multicastBeacon beacon.Interface
  39. registry map[protocol.DeviceID][]CacheEntry
  40. registryLock sync.RWMutex
  41. extServers []string
  42. extPort uint16
  43. localBcastTick <-chan time.Time
  44. stopGlobal chan struct{}
  45. globalWG sync.WaitGroup
  46. forcedBcastTick chan time.Time
  47. extAnnounceOK map[string]bool
  48. extAnnounceOKmut sync.Mutex
  49. }
  50. type CacheEntry struct {
  51. Address string
  52. Seen time.Time
  53. }
  54. var (
  55. ErrIncorrectMagic = errors.New("incorrect magic number")
  56. )
  57. func NewDiscoverer(id protocol.DeviceID, addresses []string) *Discoverer {
  58. return &Discoverer{
  59. myID: id,
  60. listenAddrs: addresses,
  61. localBcastIntv: 30 * time.Second,
  62. globalBcastIntv: 1800 * time.Second,
  63. errorRetryIntv: 60 * time.Second,
  64. cacheLifetime: 5 * time.Minute,
  65. registry: make(map[protocol.DeviceID][]CacheEntry),
  66. extAnnounceOK: make(map[string]bool),
  67. }
  68. }
  69. func (d *Discoverer) StartLocal(localPort int, localMCAddr string) {
  70. if localPort > 0 {
  71. bb, err := beacon.NewBroadcast(localPort)
  72. if err != nil {
  73. if debug {
  74. l.Debugln(err)
  75. }
  76. l.Infoln("Local discovery over IPv4 unavailable")
  77. } else {
  78. d.broadcastBeacon = bb
  79. go d.recvAnnouncements(bb)
  80. }
  81. }
  82. if len(localMCAddr) > 0 {
  83. mb, err := beacon.NewMulticast(localMCAddr)
  84. if err != nil {
  85. if debug {
  86. l.Debugln(err)
  87. }
  88. l.Infoln("Local discovery over IPv6 unavailable")
  89. } else {
  90. d.multicastBeacon = mb
  91. go d.recvAnnouncements(mb)
  92. }
  93. }
  94. if d.broadcastBeacon == nil && d.multicastBeacon == nil {
  95. l.Warnln("Local discovery unavailable")
  96. } else {
  97. d.localBcastTick = time.Tick(d.localBcastIntv)
  98. d.forcedBcastTick = make(chan time.Time)
  99. d.localBcastStart = time.Now()
  100. go d.sendLocalAnnouncements()
  101. }
  102. }
  103. func (d *Discoverer) StartGlobal(servers []string, extPort uint16) {
  104. // Wait for any previous announcer to stop before starting a new one.
  105. d.globalWG.Wait()
  106. d.extServers = servers
  107. d.extPort = extPort
  108. d.stopGlobal = make(chan struct{})
  109. d.globalWG.Add(1)
  110. go func() {
  111. defer d.globalWG.Done()
  112. buf := d.announcementPkt()
  113. for _, extServer := range d.extServers {
  114. d.globalWG.Add(1)
  115. go func(server string) {
  116. d.sendExternalAnnouncements(server, buf)
  117. d.globalWG.Done()
  118. }(extServer)
  119. }
  120. }()
  121. }
  122. func (d *Discoverer) StopGlobal() {
  123. if d.stopGlobal != nil {
  124. close(d.stopGlobal)
  125. d.globalWG.Wait()
  126. }
  127. }
  128. func (d *Discoverer) ExtAnnounceOK() map[string]bool {
  129. d.extAnnounceOKmut.Lock()
  130. defer d.extAnnounceOKmut.Unlock()
  131. return d.extAnnounceOK
  132. }
  133. func (d *Discoverer) Lookup(device protocol.DeviceID) []string {
  134. d.registryLock.RLock()
  135. cached := d.filterCached(d.registry[device])
  136. d.registryLock.RUnlock()
  137. if len(cached) > 0 {
  138. addrs := make([]string, len(cached))
  139. for i := range cached {
  140. addrs[i] = cached[i].Address
  141. }
  142. return addrs
  143. } else if len(d.extServers) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv {
  144. // Only perform external lookups if we have at least one external
  145. // server and one local announcement interval has passed. This is to
  146. // avoid finding local peers on their remote address at startup.
  147. addrs := d.externalLookup(device)
  148. cached = make([]CacheEntry, len(addrs))
  149. for i := range addrs {
  150. cached[i] = CacheEntry{
  151. Address: addrs[i],
  152. Seen: time.Now(),
  153. }
  154. }
  155. d.registryLock.Lock()
  156. d.registry[device] = cached
  157. d.registryLock.Unlock()
  158. }
  159. return nil
  160. }
  161. func (d *Discoverer) Hint(device string, addrs []string) {
  162. resAddrs := resolveAddrs(addrs)
  163. var id protocol.DeviceID
  164. id.UnmarshalText([]byte(device))
  165. d.registerDevice(nil, Device{
  166. Addresses: resAddrs,
  167. ID: id[:],
  168. })
  169. }
  170. func (d *Discoverer) All() map[protocol.DeviceID][]CacheEntry {
  171. d.registryLock.RLock()
  172. devices := make(map[protocol.DeviceID][]CacheEntry, len(d.registry))
  173. for device, addrs := range d.registry {
  174. addrsCopy := make([]CacheEntry, len(addrs))
  175. copy(addrsCopy, addrs)
  176. devices[device] = addrsCopy
  177. }
  178. d.registryLock.RUnlock()
  179. return devices
  180. }
  181. func (d *Discoverer) announcementPkt() []byte {
  182. var addrs []Address
  183. if d.extPort != 0 {
  184. addrs = []Address{{Port: d.extPort}}
  185. } else {
  186. for _, astr := range d.listenAddrs {
  187. addr, err := net.ResolveTCPAddr("tcp", astr)
  188. if err != nil {
  189. l.Warnln("%v: not announcing %s", err, astr)
  190. continue
  191. } else if debug {
  192. l.Debugf("discover: announcing %s: %#v", astr, addr)
  193. }
  194. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  195. addrs = append(addrs, Address{Port: uint16(addr.Port)})
  196. } else if bs := addr.IP.To4(); bs != nil {
  197. addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
  198. } else if bs := addr.IP.To16(); bs != nil {
  199. addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
  200. }
  201. }
  202. }
  203. var pkt = Announce{
  204. Magic: AnnouncementMagic,
  205. This: Device{d.myID[:], addrs},
  206. }
  207. return pkt.MustMarshalXDR()
  208. }
  209. func (d *Discoverer) sendLocalAnnouncements() {
  210. var addrs = resolveAddrs(d.listenAddrs)
  211. var pkt = Announce{
  212. Magic: AnnouncementMagic,
  213. This: Device{d.myID[:], addrs},
  214. }
  215. msg := pkt.MustMarshalXDR()
  216. for {
  217. if d.multicastBeacon != nil {
  218. d.multicastBeacon.Send(msg)
  219. }
  220. if d.broadcastBeacon != nil {
  221. d.broadcastBeacon.Send(msg)
  222. }
  223. select {
  224. case <-d.localBcastTick:
  225. case <-d.forcedBcastTick:
  226. }
  227. }
  228. }
  229. func (d *Discoverer) sendExternalAnnouncements(extServer string, buf []byte) {
  230. timer := time.NewTimer(0)
  231. conn, err := net.ListenUDP("udp", nil)
  232. for err != nil {
  233. timer.Reset(d.errorRetryIntv)
  234. l.Warnf("Global discovery: %v; trying again in %v", err, d.errorRetryIntv)
  235. select {
  236. case <-d.stopGlobal:
  237. return
  238. case <-timer.C:
  239. }
  240. conn, err = net.ListenUDP("udp", nil)
  241. }
  242. remote, err := net.ResolveUDPAddr("udp", extServer)
  243. for err != nil {
  244. timer.Reset(d.errorRetryIntv)
  245. l.Warnf("Global discovery: %s: %v; trying again in %v", extServer, err, d.errorRetryIntv)
  246. select {
  247. case <-d.stopGlobal:
  248. return
  249. case <-timer.C:
  250. }
  251. remote, err = net.ResolveUDPAddr("udp", extServer)
  252. }
  253. // Delay the first announcement until after a full local announcement
  254. // cycle, to increase the chance of other peers finding us locally first.
  255. timer.Reset(d.localBcastIntv)
  256. for {
  257. select {
  258. case <-d.stopGlobal:
  259. return
  260. case <-timer.C:
  261. var ok bool
  262. if debug {
  263. l.Debugf("discover: send announcement -> %v\n%s", remote, hex.Dump(buf))
  264. }
  265. _, err := conn.WriteTo(buf, remote)
  266. if err != nil {
  267. if debug {
  268. l.Debugln("discover: %s: warning:", extServer, err)
  269. }
  270. ok = false
  271. } else {
  272. // Verify that the announce server responds positively for our device ID
  273. time.Sleep(1 * time.Second)
  274. res := d.externalLookupOnServer(extServer, d.myID)
  275. if debug {
  276. l.Debugln("discover:", extServer, "external lookup check:", res)
  277. }
  278. ok = len(res) > 0
  279. }
  280. d.extAnnounceOKmut.Lock()
  281. d.extAnnounceOK[extServer] = ok
  282. d.extAnnounceOKmut.Unlock()
  283. if ok {
  284. timer.Reset(d.globalBcastIntv)
  285. } else {
  286. timer.Reset(d.errorRetryIntv)
  287. }
  288. }
  289. }
  290. }
  291. func (d *Discoverer) recvAnnouncements(b beacon.Interface) {
  292. for {
  293. buf, addr := b.Recv()
  294. if debug {
  295. l.Debugf("discover: read announcement from %s:\n%s", addr, hex.Dump(buf))
  296. }
  297. var pkt Announce
  298. err := pkt.UnmarshalXDR(buf)
  299. if err != nil && err != io.EOF {
  300. continue
  301. }
  302. var newDevice bool
  303. if bytes.Compare(pkt.This.ID, d.myID[:]) != 0 {
  304. newDevice = d.registerDevice(addr, pkt.This)
  305. }
  306. if newDevice {
  307. select {
  308. case d.forcedBcastTick <- time.Now():
  309. }
  310. }
  311. }
  312. }
  313. func (d *Discoverer) registerDevice(addr net.Addr, device Device) bool {
  314. var id protocol.DeviceID
  315. copy(id[:], device.ID)
  316. d.registryLock.Lock()
  317. defer d.registryLock.Unlock()
  318. current := d.filterCached(d.registry[id])
  319. orig := current
  320. for _, a := range device.Addresses {
  321. var deviceAddr string
  322. if len(a.IP) > 0 {
  323. deviceAddr = net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port)))
  324. } else if addr != nil {
  325. ua := addr.(*net.UDPAddr)
  326. ua.Port = int(a.Port)
  327. deviceAddr = ua.String()
  328. }
  329. for i := range current {
  330. if current[i].Address == deviceAddr {
  331. current[i].Seen = time.Now()
  332. goto done
  333. }
  334. }
  335. current = append(current, CacheEntry{
  336. Address: deviceAddr,
  337. Seen: time.Now(),
  338. })
  339. done:
  340. }
  341. if debug {
  342. l.Debugf("discover: register: %v -> %v", id, current)
  343. }
  344. d.registry[id] = current
  345. if len(current) > len(orig) {
  346. addrs := make([]string, len(current))
  347. for i := range current {
  348. addrs[i] = current[i].Address
  349. }
  350. events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
  351. "device": id.String(),
  352. "addrs": addrs,
  353. })
  354. }
  355. return len(current) > len(orig)
  356. }
  357. func (d *Discoverer) externalLookup(device protocol.DeviceID) []string {
  358. // Buffer up to as many answers as we have servers to query.
  359. results := make(chan []string, len(d.extServers))
  360. // Query all servers.
  361. wg := sync.WaitGroup{}
  362. for _, extServer := range d.extServers {
  363. wg.Add(1)
  364. go func(server string) {
  365. result := d.externalLookupOnServer(server, device)
  366. if debug {
  367. l.Debugln("discover:", result, "from", server, "for", device)
  368. }
  369. results <- result
  370. wg.Done()
  371. }(extServer)
  372. }
  373. wg.Wait()
  374. close(results)
  375. addrs := []string{}
  376. for result := range results {
  377. addrs = append(addrs, result...)
  378. }
  379. return addrs
  380. }
  381. func (d *Discoverer) externalLookupOnServer(extServer string, device protocol.DeviceID) []string {
  382. extIP, err := net.ResolveUDPAddr("udp", extServer)
  383. if err != nil {
  384. if debug {
  385. l.Debugf("discover: %s: %v; no external lookup", extServer, err)
  386. }
  387. return nil
  388. }
  389. conn, err := net.DialUDP("udp", nil, extIP)
  390. if err != nil {
  391. if debug {
  392. l.Debugf("discover: %s: %v; no external lookup", extServer, err)
  393. }
  394. return nil
  395. }
  396. defer conn.Close()
  397. err = conn.SetDeadline(time.Now().Add(5 * time.Second))
  398. if err != nil {
  399. if debug {
  400. l.Debugf("discover: %s: %v; no external lookup", extServer, err)
  401. }
  402. return nil
  403. }
  404. buf := Query{QueryMagic, device[:]}.MustMarshalXDR()
  405. _, err = conn.Write(buf)
  406. if err != nil {
  407. if debug {
  408. l.Debugf("discover: %s: %v; no external lookup", extServer, err)
  409. }
  410. return nil
  411. }
  412. buf = make([]byte, 2048)
  413. n, err := conn.Read(buf)
  414. if err != nil {
  415. if err, ok := err.(net.Error); ok && err.Timeout() {
  416. // Expected if the server doesn't know about requested device ID
  417. return nil
  418. }
  419. if debug {
  420. l.Debugf("discover: %s: %v; no external lookup", extServer, err)
  421. }
  422. return nil
  423. }
  424. if debug {
  425. l.Debugf("discover: %s: read external:\n%s", extServer, hex.Dump(buf[:n]))
  426. }
  427. var pkt Announce
  428. err = pkt.UnmarshalXDR(buf[:n])
  429. if err != nil && err != io.EOF {
  430. if debug {
  431. l.Debugln("discover:", extServer, err)
  432. }
  433. return nil
  434. }
  435. var addrs []string
  436. for _, a := range pkt.This.Addresses {
  437. deviceAddr := net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port)))
  438. addrs = append(addrs, deviceAddr)
  439. }
  440. return addrs
  441. }
  442. func (d *Discoverer) filterCached(c []CacheEntry) []CacheEntry {
  443. for i := 0; i < len(c); {
  444. if ago := time.Since(c[i].Seen); ago > d.cacheLifetime {
  445. if debug {
  446. l.Debugf("removing cached address %s: seen %v ago", c[i].Address, ago)
  447. }
  448. c[i] = c[len(c)-1]
  449. c = c[:len(c)-1]
  450. } else {
  451. i++
  452. }
  453. }
  454. return c
  455. }
  456. func addrToAddr(addr *net.TCPAddr) Address {
  457. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  458. return Address{Port: uint16(addr.Port)}
  459. } else if bs := addr.IP.To4(); bs != nil {
  460. return Address{IP: bs, Port: uint16(addr.Port)}
  461. } else if bs := addr.IP.To16(); bs != nil {
  462. return Address{IP: bs, Port: uint16(addr.Port)}
  463. }
  464. return Address{}
  465. }
  466. func resolveAddrs(addrs []string) []Address {
  467. var raddrs []Address
  468. for _, addrStr := range addrs {
  469. addrRes, err := net.ResolveTCPAddr("tcp", addrStr)
  470. if err != nil {
  471. continue
  472. }
  473. addr := addrToAddr(addrRes)
  474. if len(addr.IP) > 0 {
  475. raddrs = append(raddrs, addr)
  476. } else {
  477. raddrs = append(raddrs, Address{Port: addr.Port})
  478. }
  479. }
  480. return raddrs
  481. }