local.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "encoding/hex"
  10. "errors"
  11. "io"
  12. "net"
  13. "net/url"
  14. "strconv"
  15. "time"
  16. "github.com/syncthing/syncthing/lib/beacon"
  17. "github.com/syncthing/syncthing/lib/events"
  18. "github.com/syncthing/syncthing/lib/protocol"
  19. "github.com/thejerf/suture"
  20. )
  21. type localClient struct {
  22. *suture.Supervisor
  23. myID protocol.DeviceID
  24. addrList AddressLister
  25. relayStat RelayStatusProvider
  26. name string
  27. beacon beacon.Interface
  28. localBcastStart time.Time
  29. localBcastTick <-chan time.Time
  30. forcedBcastTick chan time.Time
  31. *cache
  32. }
  33. const (
  34. BroadcastInterval = 30 * time.Second
  35. CacheLifeTime = 3 * BroadcastInterval
  36. )
  37. var (
  38. ErrIncorrectMagic = errors.New("incorrect magic number")
  39. )
  40. func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, relayStat RelayStatusProvider) (FinderService, error) {
  41. c := &localClient{
  42. Supervisor: suture.NewSimple("local"),
  43. myID: id,
  44. addrList: addrList,
  45. relayStat: relayStat,
  46. localBcastTick: time.Tick(BroadcastInterval),
  47. forcedBcastTick: make(chan time.Time),
  48. localBcastStart: time.Now(),
  49. cache: newCache(),
  50. }
  51. host, port, err := net.SplitHostPort(addr)
  52. if err != nil {
  53. return nil, err
  54. }
  55. if len(host) == 0 {
  56. // A broadcast client
  57. c.name = "IPv4 local"
  58. bcPort, err := strconv.Atoi(port)
  59. if err != nil {
  60. return nil, err
  61. }
  62. c.startLocalIPv4Broadcasts(bcPort)
  63. } else {
  64. // A multicast client
  65. c.name = "IPv6 local"
  66. c.startLocalIPv6Multicasts(addr)
  67. }
  68. go c.sendLocalAnnouncements()
  69. return c, nil
  70. }
  71. func (c *localClient) startLocalIPv4Broadcasts(localPort int) {
  72. c.beacon = beacon.NewBroadcast(localPort)
  73. c.Add(c.beacon)
  74. go c.recvAnnouncements(c.beacon)
  75. }
  76. func (c *localClient) startLocalIPv6Multicasts(localMCAddr string) {
  77. c.beacon = beacon.NewMulticast(localMCAddr)
  78. c.Add(c.beacon)
  79. go c.recvAnnouncements(c.beacon)
  80. }
  81. // Lookup returns a list of addresses the device is available at. Local
  82. // discovery never returns relays.
  83. func (c *localClient) Lookup(device protocol.DeviceID) (direct []string, relays []Relay, err error) {
  84. if cache, ok := c.Get(device); ok {
  85. if time.Since(cache.when) < CacheLifeTime {
  86. direct = cache.Direct
  87. relays = cache.Relays
  88. }
  89. }
  90. return
  91. }
  92. func (c *localClient) String() string {
  93. return c.name
  94. }
  95. func (c *localClient) Error() error {
  96. return c.beacon.Error()
  97. }
  98. func (c *localClient) announcementPkt() Announce {
  99. var addrs []Address
  100. for _, addr := range c.addrList.AllAddresses() {
  101. addrs = append(addrs, Address{
  102. URL: addr,
  103. })
  104. }
  105. var relays []Relay
  106. if c.relayStat != nil {
  107. for _, relay := range c.relayStat.Relays() {
  108. latency, ok := c.relayStat.RelayStatus(relay)
  109. if ok {
  110. relays = append(relays, Relay{
  111. URL: relay,
  112. Latency: int32(latency / time.Millisecond),
  113. })
  114. }
  115. }
  116. }
  117. return Announce{
  118. Magic: AnnouncementMagic,
  119. This: Device{
  120. ID: c.myID[:],
  121. Addresses: addrs,
  122. Relays: relays,
  123. },
  124. }
  125. }
  126. func (c *localClient) sendLocalAnnouncements() {
  127. var pkt = c.announcementPkt()
  128. msg := pkt.MustMarshalXDR()
  129. for {
  130. c.beacon.Send(msg)
  131. select {
  132. case <-c.localBcastTick:
  133. case <-c.forcedBcastTick:
  134. }
  135. }
  136. }
  137. func (c *localClient) recvAnnouncements(b beacon.Interface) {
  138. for {
  139. buf, addr := b.Recv()
  140. var pkt Announce
  141. err := pkt.UnmarshalXDR(buf)
  142. if err != nil && err != io.EOF {
  143. l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
  144. continue
  145. }
  146. l.Debugf("discover: Received local announcement from %s for %s", addr, protocol.DeviceIDFromBytes(pkt.This.ID))
  147. var newDevice bool
  148. if !bytes.Equal(pkt.This.ID, c.myID[:]) {
  149. newDevice = c.registerDevice(addr, pkt.This)
  150. }
  151. if newDevice {
  152. select {
  153. case c.forcedBcastTick <- time.Now():
  154. }
  155. }
  156. }
  157. }
  158. func (c *localClient) registerDevice(src net.Addr, device Device) bool {
  159. var id protocol.DeviceID
  160. copy(id[:], device.ID)
  161. // Remember whether we already had a valid cache entry for this device.
  162. ce, existsAlready := c.Get(id)
  163. isNewDevice := !existsAlready || time.Since(ce.when) > CacheLifeTime
  164. // Any empty or unspecified addresses should be set to the source address
  165. // of the announcement. We also skip any addresses we can't parse.
  166. l.Debugln("discover: Registering addresses for", id)
  167. var validAddresses []string
  168. for _, addr := range device.Addresses {
  169. u, err := url.Parse(addr.URL)
  170. if err != nil {
  171. continue
  172. }
  173. tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
  174. if err != nil {
  175. continue
  176. }
  177. if len(tcpAddr.IP) == 0 || tcpAddr.IP.IsUnspecified() {
  178. host, _, err := net.SplitHostPort(src.String())
  179. if err != nil {
  180. continue
  181. }
  182. u.Host = net.JoinHostPort(host, strconv.Itoa(tcpAddr.Port))
  183. l.Debugf("discover: Reconstructed URL is %#v", u)
  184. validAddresses = append(validAddresses, u.String())
  185. l.Debugf("discover: Replaced address %v in %s to get %s", tcpAddr.IP, addr.URL, u.String())
  186. } else {
  187. validAddresses = append(validAddresses, addr.URL)
  188. l.Debugf("discover: Accepted address %s verbatim", addr.URL)
  189. }
  190. }
  191. c.Set(id, CacheEntry{
  192. Direct: validAddresses,
  193. Relays: device.Relays,
  194. when: time.Now(),
  195. found: true,
  196. })
  197. if isNewDevice {
  198. events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
  199. "device": id.String(),
  200. "addrs": validAddresses,
  201. "relays": device.Relays,
  202. })
  203. }
  204. return isNewDevice
  205. }