local.go 6.9 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. //go:generate go run ../../proto/scripts/protofmt.go local.proto
  7. //go:generate protoc -I ../../ -I . --gogofast_out=. local.proto
  8. package discover
  9. import (
  10. "context"
  11. "encoding/binary"
  12. "encoding/hex"
  13. "fmt"
  14. "io"
  15. "net"
  16. "net/url"
  17. "strconv"
  18. "time"
  19. "github.com/syncthing/syncthing/lib/beacon"
  20. "github.com/syncthing/syncthing/lib/events"
  21. "github.com/syncthing/syncthing/lib/protocol"
  22. "github.com/syncthing/syncthing/lib/rand"
  23. "github.com/syncthing/syncthing/lib/util"
  24. "github.com/thejerf/suture"
  25. )
  26. type localClient struct {
  27. *suture.Supervisor
  28. myID protocol.DeviceID
  29. addrList AddressLister
  30. name string
  31. evLogger events.Logger
  32. beacon beacon.Interface
  33. localBcastStart time.Time
  34. localBcastTick <-chan time.Time
  35. forcedBcastTick chan time.Time
  36. *cache
  37. }
  38. const (
  39. BroadcastInterval = 30 * time.Second
  40. CacheLifeTime = 3 * BroadcastInterval
  41. Magic = uint32(0x2EA7D90B) // same as in BEP
  42. v13Magic = uint32(0x7D79BC40) // previous version
  43. )
  44. func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, evLogger events.Logger) (FinderService, error) {
  45. c := &localClient{
  46. Supervisor: suture.New("local", suture.Spec{
  47. PassThroughPanics: true,
  48. }),
  49. myID: id,
  50. addrList: addrList,
  51. evLogger: evLogger,
  52. localBcastTick: time.NewTicker(BroadcastInterval).C,
  53. forcedBcastTick: make(chan time.Time),
  54. localBcastStart: time.Now(),
  55. cache: newCache(),
  56. }
  57. host, port, err := net.SplitHostPort(addr)
  58. if err != nil {
  59. return nil, err
  60. }
  61. if len(host) == 0 {
  62. // A broadcast client
  63. c.name = "IPv4 local"
  64. bcPort, err := strconv.Atoi(port)
  65. if err != nil {
  66. return nil, err
  67. }
  68. c.beacon = beacon.NewBroadcast(bcPort)
  69. } else {
  70. // A multicast client
  71. c.name = "IPv6 local"
  72. c.beacon = beacon.NewMulticast(addr)
  73. }
  74. c.Add(c.beacon)
  75. c.Add(util.AsService(c.recvAnnouncements, fmt.Sprintf("%s/recv", c)))
  76. c.Add(util.AsService(c.sendLocalAnnouncements, fmt.Sprintf("%s/sendLocal", c)))
  77. return c, nil
  78. }
  79. // Lookup returns a list of addresses the device is available at.
  80. func (c *localClient) Lookup(_ context.Context, device protocol.DeviceID) (addresses []string, err error) {
  81. if cache, ok := c.Get(device); ok {
  82. if time.Since(cache.when) < CacheLifeTime {
  83. addresses = cache.Addresses
  84. }
  85. }
  86. return
  87. }
  88. func (c *localClient) String() string {
  89. return c.name
  90. }
  91. func (c *localClient) Error() error {
  92. return c.beacon.Error()
  93. }
  94. // announcementPkt appends the local discovery packet to send to msg. Returns
  95. // true if the packet should be sent, false if there is nothing useful to
  96. // send.
  97. func (c *localClient) announcementPkt(instanceID int64, msg []byte) ([]byte, bool) {
  98. addrs := c.addrList.AllAddresses()
  99. if len(addrs) == 0 {
  100. // Nothing to announce
  101. return msg, false
  102. }
  103. if cap(msg) >= 4 {
  104. msg = msg[:4]
  105. } else {
  106. msg = make([]byte, 4)
  107. }
  108. binary.BigEndian.PutUint32(msg, Magic)
  109. pkt := Announce{
  110. ID: c.myID,
  111. Addresses: addrs,
  112. InstanceID: instanceID,
  113. }
  114. bs, _ := pkt.Marshal()
  115. msg = append(msg, bs...)
  116. return msg, true
  117. }
  118. func (c *localClient) sendLocalAnnouncements(ctx context.Context) {
  119. var msg []byte
  120. var ok bool
  121. instanceID := rand.Int63()
  122. for {
  123. if msg, ok = c.announcementPkt(instanceID, msg[:0]); ok {
  124. c.beacon.Send(msg)
  125. }
  126. select {
  127. case <-c.localBcastTick:
  128. case <-c.forcedBcastTick:
  129. case <-ctx.Done():
  130. return
  131. }
  132. }
  133. }
  134. func (c *localClient) recvAnnouncements(ctx context.Context) {
  135. b := c.beacon
  136. warnedAbout := make(map[string]bool)
  137. for {
  138. select {
  139. case <-ctx.Done():
  140. return
  141. default:
  142. }
  143. buf, addr := b.Recv()
  144. if addr == nil {
  145. continue
  146. }
  147. if len(buf) < 4 {
  148. l.Debugf("discover: short packet from %s", addr.String())
  149. continue
  150. }
  151. magic := binary.BigEndian.Uint32(buf)
  152. switch magic {
  153. case Magic:
  154. // All good
  155. case v13Magic:
  156. // Old version
  157. if !warnedAbout[addr.String()] {
  158. l.Warnf("Incompatible (v0.13) local discovery packet from %v - upgrade that device to connect", addr)
  159. warnedAbout[addr.String()] = true
  160. }
  161. continue
  162. default:
  163. l.Debugf("discover: Incorrect magic %x from %s", magic, addr)
  164. continue
  165. }
  166. var pkt Announce
  167. err := pkt.Unmarshal(buf[4:])
  168. if err != nil && err != io.EOF {
  169. l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
  170. continue
  171. }
  172. l.Debugf("discover: Received local announcement from %s for %s", addr, pkt.ID)
  173. var newDevice bool
  174. if pkt.ID != c.myID {
  175. newDevice = c.registerDevice(addr, pkt)
  176. }
  177. if newDevice {
  178. // Force a transmit to announce ourselves, if we are ready to do
  179. // so right away.
  180. select {
  181. case c.forcedBcastTick <- time.Now():
  182. default:
  183. }
  184. }
  185. }
  186. }
  187. func (c *localClient) registerDevice(src net.Addr, device Announce) bool {
  188. // Remember whether we already had a valid cache entry for this device.
  189. // If the instance ID has changed the remote device has restarted since
  190. // we last heard from it, so we should treat it as a new device.
  191. ce, existsAlready := c.Get(device.ID)
  192. isNewDevice := !existsAlready || time.Since(ce.when) > CacheLifeTime || ce.instanceID != device.InstanceID
  193. // Any empty or unspecified addresses should be set to the source address
  194. // of the announcement. We also skip any addresses we can't parse.
  195. l.Debugln("discover: Registering addresses for", device.ID)
  196. var validAddresses []string
  197. for _, addr := range device.Addresses {
  198. u, err := url.Parse(addr)
  199. if err != nil {
  200. continue
  201. }
  202. tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
  203. if err != nil {
  204. continue
  205. }
  206. if len(tcpAddr.IP) == 0 || tcpAddr.IP.IsUnspecified() {
  207. srcAddr, err := net.ResolveTCPAddr("tcp", src.String())
  208. if err != nil {
  209. continue
  210. }
  211. // Do not use IPv6 source address if requested scheme is tcp4
  212. if u.Scheme == "tcp4" && srcAddr.IP.To4() == nil {
  213. continue
  214. }
  215. // Do not use IPv4 source address if requested scheme is tcp6
  216. if u.Scheme == "tcp6" && srcAddr.IP.To4() != nil {
  217. continue
  218. }
  219. host, _, err := net.SplitHostPort(src.String())
  220. if err != nil {
  221. continue
  222. }
  223. u.Host = net.JoinHostPort(host, strconv.Itoa(tcpAddr.Port))
  224. l.Debugf("discover: Reconstructed URL is %#v", u)
  225. validAddresses = append(validAddresses, u.String())
  226. l.Debugf("discover: Replaced address %v in %s to get %s", tcpAddr.IP, addr, u.String())
  227. } else {
  228. validAddresses = append(validAddresses, addr)
  229. l.Debugf("discover: Accepted address %s verbatim", addr)
  230. }
  231. }
  232. c.Set(device.ID, CacheEntry{
  233. Addresses: validAddresses,
  234. when: time.Now(),
  235. found: true,
  236. instanceID: device.InstanceID,
  237. })
  238. if isNewDevice {
  239. c.evLogger.Log(events.DeviceDiscovered, map[string]interface{}{
  240. "device": device.ID.String(),
  241. "addrs": validAddresses,
  242. })
  243. }
  244. return isNewDevice
  245. }