local.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. //go:generate go run ../../script/protofmt.go local.proto
  7. //go:generate protoc -I ../../ -I . --gogofast_out=. local.proto
  8. package discover
  9. import (
  10. "encoding/binary"
  11. "encoding/hex"
  12. "io"
  13. "net"
  14. "net/url"
  15. "strconv"
  16. "time"
  17. "github.com/syncthing/syncthing/lib/beacon"
  18. "github.com/syncthing/syncthing/lib/events"
  19. "github.com/syncthing/syncthing/lib/protocol"
  20. "github.com/syncthing/syncthing/lib/rand"
  21. "github.com/thejerf/suture"
  22. )
  23. type localClient struct {
  24. *suture.Supervisor
  25. myID protocol.DeviceID
  26. addrList AddressLister
  27. name string
  28. beacon beacon.Interface
  29. localBcastStart time.Time
  30. localBcastTick <-chan time.Time
  31. forcedBcastTick chan time.Time
  32. *cache
  33. }
  34. const (
  35. BroadcastInterval = 30 * time.Second
  36. CacheLifeTime = 3 * BroadcastInterval
  37. Magic = uint32(0x2EA7D90B) // same as in BEP
  38. v13Magic = uint32(0x7D79BC40) // previous version
  39. )
  40. func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister) (FinderService, error) {
  41. c := &localClient{
  42. Supervisor: suture.New("local", suture.Spec{
  43. PassThroughPanics: true,
  44. }),
  45. myID: id,
  46. addrList: addrList,
  47. localBcastTick: time.NewTicker(BroadcastInterval).C,
  48. forcedBcastTick: make(chan time.Time),
  49. localBcastStart: time.Now(),
  50. cache: newCache(),
  51. }
  52. host, port, err := net.SplitHostPort(addr)
  53. if err != nil {
  54. return nil, err
  55. }
  56. if len(host) == 0 {
  57. // A broadcast client
  58. c.name = "IPv4 local"
  59. bcPort, err := strconv.Atoi(port)
  60. if err != nil {
  61. return nil, err
  62. }
  63. c.startLocalIPv4Broadcasts(bcPort)
  64. } else {
  65. // A multicast client
  66. c.name = "IPv6 local"
  67. c.startLocalIPv6Multicasts(addr)
  68. }
  69. go c.sendLocalAnnouncements()
  70. return c, nil
  71. }
  72. func (c *localClient) startLocalIPv4Broadcasts(localPort int) {
  73. c.beacon = beacon.NewBroadcast(localPort)
  74. c.Add(c.beacon)
  75. go c.recvAnnouncements(c.beacon)
  76. }
  77. func (c *localClient) startLocalIPv6Multicasts(localMCAddr string) {
  78. c.beacon = beacon.NewMulticast(localMCAddr)
  79. c.Add(c.beacon)
  80. go c.recvAnnouncements(c.beacon)
  81. }
  82. // Lookup returns a list of addresses the device is available at.
  83. func (c *localClient) Lookup(device protocol.DeviceID) (addresses []string, err error) {
  84. if cache, ok := c.Get(device); ok {
  85. if time.Since(cache.when) < CacheLifeTime {
  86. addresses = cache.Addresses
  87. }
  88. }
  89. return
  90. }
  91. func (c *localClient) String() string {
  92. return c.name
  93. }
  94. func (c *localClient) Error() error {
  95. return c.beacon.Error()
  96. }
  97. // announcementPkt appends the local discovery packet to send to msg. Returns
  98. // true if the packet should be sent, false if there is nothing useful to
  99. // send.
  100. func (c *localClient) announcementPkt(instanceID int64, msg []byte) ([]byte, bool) {
  101. addrs := c.addrList.AllAddresses()
  102. if len(addrs) == 0 {
  103. // Nothing to announce
  104. return msg, false
  105. }
  106. if cap(msg) >= 4 {
  107. msg = msg[:4]
  108. } else {
  109. msg = make([]byte, 4)
  110. }
  111. binary.BigEndian.PutUint32(msg, Magic)
  112. pkt := Announce{
  113. ID: c.myID,
  114. Addresses: addrs,
  115. InstanceID: instanceID,
  116. }
  117. bs, _ := pkt.Marshal()
  118. msg = append(msg, bs...)
  119. return msg, true
  120. }
  121. func (c *localClient) sendLocalAnnouncements() {
  122. var msg []byte
  123. var ok bool
  124. instanceID := rand.Int63()
  125. for {
  126. if msg, ok = c.announcementPkt(instanceID, msg[:0]); ok {
  127. c.beacon.Send(msg)
  128. }
  129. select {
  130. case <-c.localBcastTick:
  131. case <-c.forcedBcastTick:
  132. }
  133. }
  134. }
  135. func (c *localClient) recvAnnouncements(b beacon.Interface) {
  136. warnedAbout := make(map[string]bool)
  137. for {
  138. buf, addr := b.Recv()
  139. if len(buf) < 4 {
  140. l.Debugf("discover: short packet from %s")
  141. continue
  142. }
  143. magic := binary.BigEndian.Uint32(buf)
  144. switch magic {
  145. case Magic:
  146. // All good
  147. case v13Magic:
  148. // Old version
  149. if !warnedAbout[addr.String()] {
  150. l.Warnf("Incompatible (v0.13) local discovery packet from %v - upgrade that device to connect", addr)
  151. warnedAbout[addr.String()] = true
  152. }
  153. continue
  154. default:
  155. l.Debugf("discover: Incorrect magic %x from %s", magic, addr)
  156. continue
  157. }
  158. var pkt Announce
  159. err := pkt.Unmarshal(buf[4:])
  160. if err != nil && err != io.EOF {
  161. l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
  162. continue
  163. }
  164. l.Debugf("discover: Received local announcement from %s for %s", addr, pkt.ID)
  165. var newDevice bool
  166. if pkt.ID != c.myID {
  167. newDevice = c.registerDevice(addr, pkt)
  168. }
  169. if newDevice {
  170. // Force a transmit to announce ourselves, if we are ready to do
  171. // so right away.
  172. select {
  173. case c.forcedBcastTick <- time.Now():
  174. default:
  175. }
  176. }
  177. }
  178. }
  179. func (c *localClient) registerDevice(src net.Addr, device Announce) bool {
  180. // Remember whether we already had a valid cache entry for this device.
  181. // If the instance ID has changed the remote device has restarted since
  182. // we last heard from it, so we should treat it as a new device.
  183. ce, existsAlready := c.Get(device.ID)
  184. isNewDevice := !existsAlready || time.Since(ce.when) > CacheLifeTime || ce.instanceID != device.InstanceID
  185. // Any empty or unspecified addresses should be set to the source address
  186. // of the announcement. We also skip any addresses we can't parse.
  187. l.Debugln("discover: Registering addresses for", device.ID)
  188. var validAddresses []string
  189. for _, addr := range device.Addresses {
  190. u, err := url.Parse(addr)
  191. if err != nil {
  192. continue
  193. }
  194. tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
  195. if err != nil {
  196. continue
  197. }
  198. if len(tcpAddr.IP) == 0 || tcpAddr.IP.IsUnspecified() {
  199. srcAddr, err := net.ResolveTCPAddr("tcp", src.String())
  200. if err != nil {
  201. continue
  202. }
  203. // Do not use IPv6 source address if requested scheme is tcp4
  204. if u.Scheme == "tcp4" && srcAddr.IP.To4() == nil {
  205. continue
  206. }
  207. // Do not use IPv4 source address if requested scheme is tcp6
  208. if u.Scheme == "tcp6" && srcAddr.IP.To4() != nil {
  209. continue
  210. }
  211. host, _, err := net.SplitHostPort(src.String())
  212. if err != nil {
  213. continue
  214. }
  215. u.Host = net.JoinHostPort(host, strconv.Itoa(tcpAddr.Port))
  216. l.Debugf("discover: Reconstructed URL is %#v", u)
  217. validAddresses = append(validAddresses, u.String())
  218. l.Debugf("discover: Replaced address %v in %s to get %s", tcpAddr.IP, addr, u.String())
  219. } else {
  220. validAddresses = append(validAddresses, addr)
  221. l.Debugf("discover: Accepted address %s verbatim", addr)
  222. }
  223. }
  224. c.Set(device.ID, CacheEntry{
  225. Addresses: validAddresses,
  226. when: time.Now(),
  227. found: true,
  228. instanceID: device.InstanceID,
  229. })
  230. if isNewDevice {
  231. events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
  232. "device": device.ID.String(),
  233. "addrs": validAddresses,
  234. })
  235. }
  236. return isNewDevice
  237. }