discover.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package discover
  5. import (
  6. "bytes"
  7. "encoding/hex"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net"
  12. "sync"
  13. "time"
  14. "github.com/calmh/syncthing/beacon"
  15. "github.com/calmh/syncthing/events"
  16. "github.com/calmh/syncthing/protocol"
  17. )
  18. type Discoverer struct {
  19. myID protocol.NodeID
  20. listenAddrs []string
  21. localBcastIntv time.Duration
  22. globalBcastIntv time.Duration
  23. beacon *beacon.Beacon
  24. registry map[protocol.NodeID][]string
  25. registryLock sync.RWMutex
  26. extServer string
  27. extPort uint16
  28. localBcastTick <-chan time.Time
  29. forcedBcastTick chan time.Time
  30. extAnnounceOK bool
  31. extAnnounceOKmut sync.Mutex
  32. }
  33. var (
  34. ErrIncorrectMagic = errors.New("incorrect magic number")
  35. )
  36. // We tolerate a certain amount of errors because we might be running on
  37. // laptops that sleep and wake, have intermittent network connectivity, etc.
  38. // When we hit this many errors in succession, we stop.
  39. const maxErrors = 30
  40. func NewDiscoverer(id protocol.NodeID, addresses []string, localPort int) (*Discoverer, error) {
  41. b, err := beacon.New(localPort)
  42. if err != nil {
  43. return nil, err
  44. }
  45. disc := &Discoverer{
  46. myID: id,
  47. listenAddrs: addresses,
  48. localBcastIntv: 30 * time.Second,
  49. globalBcastIntv: 1800 * time.Second,
  50. beacon: b,
  51. registry: make(map[protocol.NodeID][]string),
  52. }
  53. go disc.recvAnnouncements()
  54. return disc, nil
  55. }
  56. func (d *Discoverer) StartLocal() {
  57. d.localBcastTick = time.Tick(d.localBcastIntv)
  58. d.forcedBcastTick = make(chan time.Time)
  59. go d.sendLocalAnnouncements()
  60. }
  61. func (d *Discoverer) StartGlobal(server string, extPort uint16) {
  62. d.extServer = server
  63. d.extPort = extPort
  64. go d.sendExternalAnnouncements()
  65. }
  66. func (d *Discoverer) ExtAnnounceOK() bool {
  67. d.extAnnounceOKmut.Lock()
  68. defer d.extAnnounceOKmut.Unlock()
  69. return d.extAnnounceOK
  70. }
  71. func (d *Discoverer) Lookup(node protocol.NodeID) []string {
  72. d.registryLock.Lock()
  73. addr, ok := d.registry[node]
  74. d.registryLock.Unlock()
  75. if ok {
  76. return addr
  77. } else if len(d.extServer) != 0 {
  78. // We might want to cache this, but not permanently so it needs some intelligence
  79. return d.externalLookup(node)
  80. }
  81. return nil
  82. }
  83. func (d *Discoverer) Hint(node string, addrs []string) {
  84. resAddrs := resolveAddrs(addrs)
  85. var id protocol.NodeID
  86. id.UnmarshalText([]byte(node))
  87. d.registerNode(nil, Node{
  88. Addresses: resAddrs,
  89. ID: id[:],
  90. })
  91. }
  92. func (d *Discoverer) All() map[protocol.NodeID][]string {
  93. d.registryLock.RLock()
  94. nodes := make(map[protocol.NodeID][]string, len(d.registry))
  95. for node, addrs := range d.registry {
  96. addrsCopy := make([]string, len(addrs))
  97. copy(addrsCopy, addrs)
  98. nodes[node] = addrsCopy
  99. }
  100. d.registryLock.RUnlock()
  101. return nodes
  102. }
  103. func (d *Discoverer) announcementPkt() []byte {
  104. var addrs []Address
  105. for _, astr := range d.listenAddrs {
  106. addr, err := net.ResolveTCPAddr("tcp", astr)
  107. if err != nil {
  108. l.Warnln("%v: not announcing %s", err, astr)
  109. continue
  110. } else if debug {
  111. l.Debugf("discover: announcing %s: %#v", astr, addr)
  112. }
  113. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  114. addrs = append(addrs, Address{Port: uint16(addr.Port)})
  115. } else if bs := addr.IP.To4(); bs != nil {
  116. addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
  117. } else if bs := addr.IP.To16(); bs != nil {
  118. addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
  119. }
  120. }
  121. var pkt = Announce{
  122. Magic: AnnouncementMagic,
  123. This: Node{d.myID[:], addrs},
  124. }
  125. return pkt.MarshalXDR()
  126. }
  127. func (d *Discoverer) sendLocalAnnouncements() {
  128. var addrs = resolveAddrs(d.listenAddrs)
  129. var pkt = Announce{
  130. Magic: AnnouncementMagic,
  131. This: Node{d.myID[:], addrs},
  132. }
  133. for {
  134. pkt.Extra = nil
  135. d.registryLock.RLock()
  136. for node, addrs := range d.registry {
  137. if len(pkt.Extra) == 16 {
  138. break
  139. }
  140. anode := Node{node[:], resolveAddrs(addrs)}
  141. pkt.Extra = append(pkt.Extra, anode)
  142. }
  143. d.registryLock.RUnlock()
  144. d.beacon.Send(pkt.MarshalXDR())
  145. select {
  146. case <-d.localBcastTick:
  147. case <-d.forcedBcastTick:
  148. }
  149. }
  150. }
  151. func (d *Discoverer) sendExternalAnnouncements() {
  152. // this should go in the Discoverer struct
  153. errorRetryIntv := 60 * time.Second
  154. remote, err := net.ResolveUDPAddr("udp", d.extServer)
  155. for err != nil {
  156. l.Warnf("Global discovery: %v; trying again in %v", err, errorRetryIntv)
  157. time.Sleep(errorRetryIntv)
  158. remote, err = net.ResolveUDPAddr("udp", d.extServer)
  159. }
  160. conn, err := net.ListenUDP("udp", nil)
  161. for err != nil {
  162. l.Warnf("Global discovery: %v; trying again in %v", err, errorRetryIntv)
  163. time.Sleep(errorRetryIntv)
  164. conn, err = net.ListenUDP("udp", nil)
  165. }
  166. var buf []byte
  167. if d.extPort != 0 {
  168. var pkt = Announce{
  169. Magic: AnnouncementMagic,
  170. This: Node{d.myID[:], []Address{{Port: d.extPort}}},
  171. }
  172. buf = pkt.MarshalXDR()
  173. } else {
  174. buf = d.announcementPkt()
  175. }
  176. for {
  177. var ok bool
  178. if debug {
  179. l.Debugf("discover: send announcement -> %v\n%s", remote, hex.Dump(buf))
  180. }
  181. _, err := conn.WriteTo(buf, remote)
  182. if err != nil {
  183. if debug {
  184. l.Debugln("discover: warning:", err)
  185. }
  186. ok = false
  187. } else {
  188. // Verify that the announce server responds positively for our node ID
  189. time.Sleep(1 * time.Second)
  190. res := d.externalLookup(d.myID)
  191. if debug {
  192. l.Debugln("discover: external lookup check:", res)
  193. }
  194. ok = len(res) > 0
  195. }
  196. d.extAnnounceOKmut.Lock()
  197. d.extAnnounceOK = ok
  198. d.extAnnounceOKmut.Unlock()
  199. if ok {
  200. time.Sleep(d.globalBcastIntv)
  201. } else {
  202. time.Sleep(errorRetryIntv)
  203. }
  204. }
  205. }
  206. func (d *Discoverer) recvAnnouncements() {
  207. for {
  208. buf, addr := d.beacon.Recv()
  209. if debug {
  210. l.Debugf("discover: read announcement:\n%s", hex.Dump(buf))
  211. }
  212. var pkt Announce
  213. err := pkt.UnmarshalXDR(buf)
  214. if err != nil && err != io.EOF {
  215. continue
  216. }
  217. if debug {
  218. l.Debugf("discover: parsed announcement: %#v", pkt)
  219. }
  220. var newNode bool
  221. if bytes.Compare(pkt.This.ID, d.myID[:]) != 0 {
  222. newNode = d.registerNode(addr, pkt.This)
  223. for _, node := range pkt.Extra {
  224. if bytes.Compare(node.ID, d.myID[:]) != 0 {
  225. if d.registerNode(nil, node) {
  226. newNode = true
  227. }
  228. }
  229. }
  230. }
  231. if newNode {
  232. select {
  233. case d.forcedBcastTick <- time.Now():
  234. }
  235. }
  236. }
  237. }
  238. func (d *Discoverer) registerNode(addr net.Addr, node Node) bool {
  239. var addrs []string
  240. for _, a := range node.Addresses {
  241. var nodeAddr string
  242. if len(a.IP) > 0 {
  243. nodeAddr = fmt.Sprintf("%s:%d", net.IP(a.IP), a.Port)
  244. addrs = append(addrs, nodeAddr)
  245. } else if addr != nil {
  246. ua := addr.(*net.UDPAddr)
  247. ua.Port = int(a.Port)
  248. nodeAddr = ua.String()
  249. addrs = append(addrs, nodeAddr)
  250. }
  251. }
  252. if len(addrs) == 0 {
  253. if debug {
  254. l.Debugln("discover: no valid address for", node.ID)
  255. }
  256. }
  257. if debug {
  258. l.Debugf("discover: register: %s -> %#v", node.ID, addrs)
  259. }
  260. var id protocol.NodeID
  261. copy(id[:], node.ID)
  262. d.registryLock.Lock()
  263. _, seen := d.registry[id]
  264. d.registry[id] = addrs
  265. d.registryLock.Unlock()
  266. if !seen {
  267. events.Default.Log(events.NodeDiscovered, map[string]interface{}{
  268. "node": id.String(),
  269. "addrs": addrs,
  270. })
  271. }
  272. return !seen
  273. }
  274. func (d *Discoverer) externalLookup(node protocol.NodeID) []string {
  275. extIP, err := net.ResolveUDPAddr("udp", d.extServer)
  276. if err != nil {
  277. if debug {
  278. l.Debugf("discover: %v; no external lookup", err)
  279. }
  280. return nil
  281. }
  282. conn, err := net.DialUDP("udp", nil, extIP)
  283. if err != nil {
  284. if debug {
  285. l.Debugf("discover: %v; no external lookup", err)
  286. }
  287. return nil
  288. }
  289. defer conn.Close()
  290. err = conn.SetDeadline(time.Now().Add(5 * time.Second))
  291. if err != nil {
  292. if debug {
  293. l.Debugf("discover: %v; no external lookup", err)
  294. }
  295. return nil
  296. }
  297. buf := Query{QueryMagic, node[:]}.MarshalXDR()
  298. _, err = conn.Write(buf)
  299. if err != nil {
  300. if debug {
  301. l.Debugf("discover: %v; no external lookup", err)
  302. }
  303. return nil
  304. }
  305. buf = make([]byte, 2048)
  306. n, err := conn.Read(buf)
  307. if err != nil {
  308. if err, ok := err.(net.Error); ok && err.Timeout() {
  309. // Expected if the server doesn't know about requested node ID
  310. return nil
  311. }
  312. if debug {
  313. l.Debugf("discover: %v; no external lookup", err)
  314. }
  315. return nil
  316. }
  317. if debug {
  318. l.Debugf("discover: read external:\n%s", hex.Dump(buf[:n]))
  319. }
  320. var pkt Announce
  321. err = pkt.UnmarshalXDR(buf[:n])
  322. if err != nil && err != io.EOF {
  323. if debug {
  324. l.Debugln("discover:", err)
  325. }
  326. return nil
  327. }
  328. if debug {
  329. l.Debugf("discover: parsed external: %#v", pkt)
  330. }
  331. var addrs []string
  332. for _, a := range pkt.This.Addresses {
  333. nodeAddr := fmt.Sprintf("%s:%d", net.IP(a.IP), a.Port)
  334. addrs = append(addrs, nodeAddr)
  335. }
  336. return addrs
  337. }
  338. func addrToAddr(addr *net.TCPAddr) Address {
  339. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  340. return Address{Port: uint16(addr.Port)}
  341. } else if bs := addr.IP.To4(); bs != nil {
  342. return Address{IP: bs, Port: uint16(addr.Port)}
  343. } else if bs := addr.IP.To16(); bs != nil {
  344. return Address{IP: bs, Port: uint16(addr.Port)}
  345. }
  346. return Address{}
  347. }
  348. func resolveAddrs(addrs []string) []Address {
  349. var raddrs []Address
  350. for _, addrStr := range addrs {
  351. addrRes, err := net.ResolveTCPAddr("tcp", addrStr)
  352. if err != nil {
  353. continue
  354. }
  355. addr := addrToAddr(addrRes)
  356. if len(addr.IP) > 0 {
  357. raddrs = append(raddrs, addr)
  358. } else {
  359. raddrs = append(raddrs, Address{Port: addr.Port})
  360. }
  361. }
  362. return raddrs
  363. }