local.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "encoding/hex"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "net"
  16. "net/url"
  17. "strconv"
  18. "time"
  19. "github.com/thejerf/suture/v4"
  20. "google.golang.org/protobuf/proto"
  21. "github.com/syncthing/syncthing/internal/gen/discoproto"
  22. "github.com/syncthing/syncthing/lib/beacon"
  23. "github.com/syncthing/syncthing/lib/events"
  24. "github.com/syncthing/syncthing/lib/protocol"
  25. "github.com/syncthing/syncthing/lib/rand"
  26. "github.com/syncthing/syncthing/lib/svcutil"
  27. )
  28. type localClient struct {
  29. *suture.Supervisor
  30. myID protocol.DeviceID
  31. addrList AddressLister
  32. name string
  33. evLogger events.Logger
  34. beacon beacon.Interface
  35. localBcastStart time.Time
  36. localBcastTick <-chan time.Time
  37. forcedBcastTick chan time.Time
  38. *cache
  39. }
  40. const (
  41. BroadcastInterval = 30 * time.Second
  42. CacheLifeTime = 3 * BroadcastInterval
  43. Magic = uint32(0x2EA7D90B) // same as in BEP
  44. v13Magic = uint32(0x7D79BC40) // previous version
  45. )
  46. func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, evLogger events.Logger) (FinderService, error) {
  47. c := &localClient{
  48. Supervisor: suture.New("local", svcutil.SpecWithDebugLogger(l)),
  49. myID: id,
  50. addrList: addrList,
  51. evLogger: evLogger,
  52. localBcastTick: time.NewTicker(BroadcastInterval).C,
  53. forcedBcastTick: make(chan time.Time),
  54. localBcastStart: time.Now(),
  55. cache: newCache(),
  56. }
  57. host, port, err := net.SplitHostPort(addr)
  58. if err != nil {
  59. return nil, err
  60. }
  61. if host == "" {
  62. // A broadcast client
  63. c.name = "IPv4 local"
  64. bcPort, err := strconv.Atoi(port)
  65. if err != nil {
  66. return nil, err
  67. }
  68. c.beacon = beacon.NewBroadcast(bcPort)
  69. } else {
  70. // A multicast client
  71. c.name = "IPv6 local"
  72. c.beacon = beacon.NewMulticast(addr)
  73. }
  74. c.Add(c.beacon)
  75. c.Add(svcutil.AsService(c.recvAnnouncements, fmt.Sprintf("%s/recv", c)))
  76. c.Add(svcutil.AsService(c.sendLocalAnnouncements, fmt.Sprintf("%s/sendLocal", c)))
  77. return c, nil
  78. }
  79. // Lookup returns a list of addresses the device is available at.
  80. func (c *localClient) Lookup(_ context.Context, device protocol.DeviceID) (addresses []string, err error) {
  81. if cache, ok := c.Get(device); ok {
  82. if time.Since(cache.when) < CacheLifeTime {
  83. addresses = cache.Addresses
  84. }
  85. }
  86. return
  87. }
  88. func (c *localClient) String() string {
  89. return c.name
  90. }
  91. func (c *localClient) Error() error {
  92. return c.beacon.Error()
  93. }
  94. // announcementPkt appends the local discovery packet to send to msg. Returns
  95. // true if the packet should be sent, false if there is nothing useful to
  96. // send.
  97. func (c *localClient) announcementPkt(instanceID int64, msg []byte) ([]byte, bool) {
  98. addrs := c.addrList.AllAddresses()
  99. // remove all addresses which are not dialable
  100. addrs = filterUndialableLocal(addrs)
  101. // do not leak relay tokens to discovery
  102. addrs = sanitizeRelayAddresses(addrs)
  103. if len(addrs) == 0 {
  104. // Nothing to announce
  105. return msg, false
  106. }
  107. pkt := &discoproto.Announce{
  108. Id: c.myID[:],
  109. Addresses: addrs,
  110. InstanceId: instanceID,
  111. }
  112. bs, _ := proto.Marshal(pkt)
  113. if pktLen := 4 + len(bs); cap(msg) < pktLen {
  114. msg = make([]byte, 0, pktLen)
  115. }
  116. msg = msg[:4]
  117. binary.BigEndian.PutUint32(msg, Magic)
  118. msg = append(msg, bs...)
  119. return msg, true
  120. }
  121. func (c *localClient) sendLocalAnnouncements(ctx context.Context) error {
  122. var msg []byte
  123. var ok bool
  124. instanceID := rand.Int63()
  125. for {
  126. if msg, ok = c.announcementPkt(instanceID, msg[:0]); ok {
  127. c.beacon.Send(msg)
  128. }
  129. select {
  130. case <-c.localBcastTick:
  131. case <-c.forcedBcastTick:
  132. case <-ctx.Done():
  133. return ctx.Err()
  134. }
  135. }
  136. }
  137. func (c *localClient) recvAnnouncements(ctx context.Context) error {
  138. b := c.beacon
  139. warnedAbout := make(map[string]bool)
  140. for {
  141. select {
  142. case <-ctx.Done():
  143. return ctx.Err()
  144. default:
  145. }
  146. buf, addr := b.Recv()
  147. if addr == nil {
  148. continue
  149. }
  150. if len(buf) < 4 {
  151. l.Debugf("discover: short packet from %s", addr.String())
  152. continue
  153. }
  154. magic := binary.BigEndian.Uint32(buf)
  155. switch magic {
  156. case Magic:
  157. // All good
  158. case v13Magic:
  159. // Old version
  160. if !warnedAbout[addr.String()] {
  161. l.Warnf("Incompatible (v0.13) local discovery packet from %v - upgrade that device to connect", addr)
  162. warnedAbout[addr.String()] = true
  163. }
  164. continue
  165. default:
  166. l.Debugf("discover: Incorrect magic %x from %s", magic, addr)
  167. continue
  168. }
  169. var pkt discoproto.Announce
  170. err := proto.Unmarshal(buf[4:], &pkt)
  171. if err != nil && !errors.Is(err, io.EOF) {
  172. l.Debugf("discover: Failed to unmarshal local announcement from %s (%s):\n%s", addr, err, hex.Dump(buf[4:]))
  173. continue
  174. }
  175. id, _ := protocol.DeviceIDFromBytes(pkt.Id)
  176. l.Debugf("discover: Received local announcement from %s for %s", addr, id)
  177. var newDevice bool
  178. if !bytes.Equal(pkt.Id, c.myID[:]) {
  179. newDevice = c.registerDevice(addr, &pkt)
  180. }
  181. if newDevice {
  182. // Force a transmit to announce ourselves, if we are ready to do
  183. // so right away.
  184. select {
  185. case c.forcedBcastTick <- time.Now():
  186. default:
  187. }
  188. }
  189. }
  190. }
  191. func (c *localClient) registerDevice(src net.Addr, device *discoproto.Announce) bool {
  192. // Remember whether we already had a valid cache entry for this device.
  193. // If the instance ID has changed the remote device has restarted since
  194. // we last heard from it, so we should treat it as a new device.
  195. id, err := protocol.DeviceIDFromBytes(device.Id)
  196. if err != nil {
  197. l.Debugf("discover: Failed to parse device ID %x: %v", device.Id, err)
  198. return false
  199. }
  200. ce, existsAlready := c.Get(id)
  201. isNewDevice := !existsAlready || time.Since(ce.when) > CacheLifeTime || ce.instanceID != device.InstanceId
  202. // Any empty or unspecified addresses should be set to the source address
  203. // of the announcement. We also skip any addresses we can't parse.
  204. l.Debugln("discover: Registering addresses for", id)
  205. var validAddresses []string
  206. for _, addr := range device.Addresses {
  207. u, err := url.Parse(addr)
  208. if err != nil {
  209. continue
  210. }
  211. tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
  212. if err != nil {
  213. continue
  214. }
  215. if len(tcpAddr.IP) == 0 || tcpAddr.IP.IsUnspecified() {
  216. srcAddr, err := net.ResolveTCPAddr("tcp", src.String())
  217. if err != nil {
  218. continue
  219. }
  220. // Do not use IPv6 source address if requested scheme is tcp4
  221. if u.Scheme == "tcp4" && srcAddr.IP.To4() == nil {
  222. continue
  223. }
  224. // Do not use IPv4 source address if requested scheme is tcp6
  225. if u.Scheme == "tcp6" && srcAddr.IP.To4() != nil {
  226. continue
  227. }
  228. host, _, err := net.SplitHostPort(src.String())
  229. if err != nil {
  230. continue
  231. }
  232. u.Host = net.JoinHostPort(host, strconv.Itoa(tcpAddr.Port))
  233. l.Debugf("discover: Reconstructed URL is %v", u)
  234. validAddresses = append(validAddresses, u.String())
  235. l.Debugf("discover: Replaced address %v in %s to get %s", tcpAddr.IP, addr, u.String())
  236. } else {
  237. validAddresses = append(validAddresses, addr)
  238. l.Debugf("discover: Accepted address %s verbatim", addr)
  239. }
  240. }
  241. c.Set(id, CacheEntry{
  242. Addresses: validAddresses,
  243. when: time.Now(),
  244. found: true,
  245. instanceID: device.InstanceId,
  246. })
  247. if isNewDevice {
  248. c.evLogger.Log(events.DeviceDiscovered, map[string]interface{}{
  249. "device": id.String(),
  250. "addrs": validAddresses,
  251. })
  252. }
  253. return isNewDevice
  254. }
  255. // filterUndialableLocal returns the list of addresses after removing any
  256. // localhost, multicast, broadcast or port-zero addresses.
  257. func filterUndialableLocal(addrs []string) []string {
  258. filtered := addrs[:0]
  259. for _, addr := range addrs {
  260. u, err := url.Parse(addr)
  261. if err != nil {
  262. continue
  263. }
  264. tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
  265. if err != nil {
  266. continue
  267. }
  268. switch {
  269. case len(tcpAddr.IP) == 0:
  270. case tcpAddr.Port == 0:
  271. case tcpAddr.IP.IsGlobalUnicast(), tcpAddr.IP.IsLinkLocalUnicast(), tcpAddr.IP.IsUnspecified():
  272. filtered = append(filtered, addr)
  273. }
  274. }
  275. return filtered
  276. }
  277. func sanitizeRelayAddresses(addrs []string) []string {
  278. filtered := addrs[:0]
  279. allowlist := []string{"id"}
  280. for _, addr := range addrs {
  281. u, err := url.Parse(addr)
  282. if err != nil {
  283. continue
  284. }
  285. if u.Scheme == "relay" {
  286. s := url.Values{}
  287. q := u.Query()
  288. for _, w := range allowlist {
  289. if q.Has(w) {
  290. s.Add(w, q.Get(w))
  291. }
  292. }
  293. u.RawQuery = s.Encode()
  294. addr = u.String()
  295. }
  296. filtered = append(filtered, addr)
  297. }
  298. return filtered
  299. }