local.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "encoding/hex"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "log/slog"
  16. "net"
  17. "net/url"
  18. "strconv"
  19. "time"
  20. "github.com/thejerf/suture/v4"
  21. "google.golang.org/protobuf/proto"
  22. "github.com/syncthing/syncthing/internal/gen/discoproto"
  23. "github.com/syncthing/syncthing/internal/slogutil"
  24. "github.com/syncthing/syncthing/lib/beacon"
  25. "github.com/syncthing/syncthing/lib/events"
  26. "github.com/syncthing/syncthing/lib/protocol"
  27. "github.com/syncthing/syncthing/lib/rand"
  28. "github.com/syncthing/syncthing/lib/svcutil"
  29. )
  30. type localClient struct {
  31. *suture.Supervisor
  32. *cache
  33. myID protocol.DeviceID
  34. addrList AddressLister
  35. name string
  36. evLogger events.Logger
  37. beacon beacon.Interface
  38. localBcastStart time.Time
  39. localBcastTick <-chan time.Time
  40. forcedBcastTick chan time.Time
  41. }
  42. const (
  43. BroadcastInterval = 30 * time.Second
  44. CacheLifeTime = 3 * BroadcastInterval
  45. Magic = uint32(0x2EA7D90B) // same as in BEP
  46. v13Magic = uint32(0x7D79BC40) // previous version
  47. )
  48. func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, evLogger events.Logger) (FinderService, error) {
  49. c := &localClient{
  50. Supervisor: suture.New("local", svcutil.SpecWithDebugLogger()),
  51. myID: id,
  52. addrList: addrList,
  53. evLogger: evLogger,
  54. localBcastTick: time.NewTicker(BroadcastInterval).C,
  55. forcedBcastTick: make(chan time.Time),
  56. localBcastStart: time.Now(),
  57. cache: newCache(),
  58. }
  59. host, port, err := net.SplitHostPort(addr)
  60. if err != nil {
  61. return nil, err
  62. }
  63. if host == "" {
  64. // A broadcast client
  65. c.name = "IPv4 local"
  66. bcPort, err := strconv.Atoi(port)
  67. if err != nil {
  68. return nil, err
  69. }
  70. c.beacon = beacon.NewBroadcast(bcPort)
  71. } else {
  72. // A multicast client
  73. c.name = "IPv6 local"
  74. c.beacon = beacon.NewMulticast(addr)
  75. }
  76. c.Add(c.beacon)
  77. c.Add(svcutil.AsService(c.recvAnnouncements, fmt.Sprintf("%s/recv", c)))
  78. c.Add(svcutil.AsService(c.sendLocalAnnouncements, fmt.Sprintf("%s/sendLocal", c)))
  79. return c, nil
  80. }
  81. // Lookup returns a list of addresses the device is available at.
  82. func (c *localClient) Lookup(_ context.Context, device protocol.DeviceID) (addresses []string, err error) {
  83. if cache, ok := c.Get(device); ok {
  84. if time.Since(cache.when) < CacheLifeTime {
  85. addresses = cache.Addresses
  86. }
  87. }
  88. return
  89. }
  90. func (c *localClient) String() string {
  91. return c.name
  92. }
  93. func (c *localClient) Error() error {
  94. return c.beacon.Error()
  95. }
  96. // announcementPkt appends the local discovery packet to send to msg. Returns
  97. // true if the packet should be sent, false if there is nothing useful to
  98. // send.
  99. func (c *localClient) announcementPkt(instanceID int64, msg []byte) ([]byte, bool) {
  100. addrs := c.addrList.AllAddresses()
  101. // remove all addresses which are not dialable
  102. addrs = filterUndialableLocal(addrs)
  103. // do not leak relay tokens to discovery
  104. addrs = sanitizeRelayAddresses(addrs)
  105. if len(addrs) == 0 {
  106. // Nothing to announce
  107. return msg, false
  108. }
  109. pkt := &discoproto.Announce{
  110. Id: c.myID[:],
  111. Addresses: addrs,
  112. InstanceId: instanceID,
  113. }
  114. bs, _ := proto.Marshal(pkt)
  115. if pktLen := 4 + len(bs); cap(msg) < pktLen {
  116. msg = make([]byte, 0, pktLen)
  117. }
  118. msg = msg[:4]
  119. binary.BigEndian.PutUint32(msg, Magic)
  120. msg = append(msg, bs...)
  121. return msg, true
  122. }
  123. func (c *localClient) sendLocalAnnouncements(ctx context.Context) error {
  124. var msg []byte
  125. var ok bool
  126. instanceID := rand.Int63()
  127. for {
  128. if msg, ok = c.announcementPkt(instanceID, msg[:0]); ok {
  129. c.beacon.Send(msg)
  130. }
  131. select {
  132. case <-c.localBcastTick:
  133. case <-c.forcedBcastTick:
  134. case <-ctx.Done():
  135. return ctx.Err()
  136. }
  137. }
  138. }
  139. func (c *localClient) recvAnnouncements(ctx context.Context) error {
  140. b := c.beacon
  141. warnedAbout := make(map[string]bool)
  142. for {
  143. select {
  144. case <-ctx.Done():
  145. return ctx.Err()
  146. default:
  147. }
  148. buf, addr := b.Recv()
  149. if addr == nil {
  150. continue
  151. }
  152. if len(buf) < 4 {
  153. slog.DebugContext(ctx, "received short packet", "address", addr.String())
  154. continue
  155. }
  156. magic := binary.BigEndian.Uint32(buf)
  157. switch magic {
  158. case Magic:
  159. // All good
  160. case v13Magic:
  161. // Old version
  162. if !warnedAbout[addr.String()] {
  163. slog.ErrorContext(ctx, "Incompatible (v0.13) local discovery packet - upgrade that device to connect", slogutil.Address(addr))
  164. warnedAbout[addr.String()] = true
  165. }
  166. continue
  167. default:
  168. slog.DebugContext(ctx, "Incorrect magic", "magic", magic, "address", addr)
  169. continue
  170. }
  171. var pkt discoproto.Announce
  172. err := proto.Unmarshal(buf[4:], &pkt)
  173. if err != nil && !errors.Is(err, io.EOF) {
  174. slog.DebugContext(ctx, "Failed to unmarshal local announcement", "address", addr, slogutil.Error(err), "packet", hex.Dump(buf[4:]))
  175. continue
  176. }
  177. id, _ := protocol.DeviceIDFromBytes(pkt.Id)
  178. slog.DebugContext(ctx, "Received local announcement", "address", addr, "device", id)
  179. var newDevice bool
  180. if !bytes.Equal(pkt.Id, c.myID[:]) {
  181. newDevice = c.registerDevice(addr, &pkt)
  182. }
  183. if newDevice {
  184. // Force a transmit to announce ourselves, if we are ready to do
  185. // so right away.
  186. select {
  187. case c.forcedBcastTick <- time.Now():
  188. default:
  189. }
  190. }
  191. }
  192. }
  193. func (c *localClient) registerDevice(src net.Addr, device *discoproto.Announce) bool {
  194. // Remember whether we already had a valid cache entry for this device.
  195. // If the instance ID has changed the remote device has restarted since
  196. // we last heard from it, so we should treat it as a new device.
  197. id, err := protocol.DeviceIDFromBytes(device.Id)
  198. if err != nil {
  199. l.Debugf("discover: Failed to parse device ID %x: %v", device.Id, err)
  200. return false
  201. }
  202. ce, existsAlready := c.Get(id)
  203. isNewDevice := !existsAlready || time.Since(ce.when) > CacheLifeTime || ce.instanceID != device.InstanceId
  204. // Any empty or unspecified addresses should be set to the source address
  205. // of the announcement. We also skip any addresses we can't parse.
  206. l.Debugln("discover: Registering addresses for", id)
  207. var validAddresses []string
  208. for _, addr := range device.Addresses {
  209. u, err := url.Parse(addr)
  210. if err != nil {
  211. continue
  212. }
  213. tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
  214. if err != nil {
  215. continue
  216. }
  217. if len(tcpAddr.IP) == 0 || tcpAddr.IP.IsUnspecified() {
  218. srcAddr, err := net.ResolveTCPAddr("tcp", src.String())
  219. if err != nil {
  220. continue
  221. }
  222. // Do not use IPv6 source address if requested scheme is tcp4
  223. if u.Scheme == "tcp4" && srcAddr.IP.To4() == nil {
  224. continue
  225. }
  226. // Do not use IPv4 source address if requested scheme is tcp6
  227. if u.Scheme == "tcp6" && srcAddr.IP.To4() != nil {
  228. continue
  229. }
  230. host, _, err := net.SplitHostPort(src.String())
  231. if err != nil {
  232. continue
  233. }
  234. u.Host = net.JoinHostPort(host, strconv.Itoa(tcpAddr.Port))
  235. l.Debugf("discover: Reconstructed URL is %v", u)
  236. validAddresses = append(validAddresses, u.String())
  237. l.Debugf("discover: Replaced address %v in %s to get %s", tcpAddr.IP, addr, u.String())
  238. } else {
  239. validAddresses = append(validAddresses, addr)
  240. l.Debugf("discover: Accepted address %s verbatim", addr)
  241. }
  242. }
  243. c.Set(id, CacheEntry{
  244. Addresses: validAddresses,
  245. when: time.Now(),
  246. found: true,
  247. instanceID: device.InstanceId,
  248. })
  249. if isNewDevice {
  250. c.evLogger.Log(events.DeviceDiscovered, map[string]interface{}{
  251. "device": id.String(),
  252. "addrs": validAddresses,
  253. })
  254. }
  255. return isNewDevice
  256. }
  257. // filterUndialableLocal returns the list of addresses after removing any
  258. // localhost, multicast, broadcast or port-zero addresses.
  259. func filterUndialableLocal(addrs []string) []string {
  260. filtered := addrs[:0]
  261. for _, addr := range addrs {
  262. u, err := url.Parse(addr)
  263. if err != nil {
  264. continue
  265. }
  266. tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
  267. if err != nil {
  268. continue
  269. }
  270. switch {
  271. case len(tcpAddr.IP) == 0:
  272. case tcpAddr.Port == 0:
  273. case tcpAddr.IP.IsGlobalUnicast(), tcpAddr.IP.IsLinkLocalUnicast(), tcpAddr.IP.IsUnspecified():
  274. filtered = append(filtered, addr)
  275. }
  276. }
  277. return filtered
  278. }
  279. func sanitizeRelayAddresses(addrs []string) []string {
  280. filtered := addrs[:0]
  281. allowlist := []string{"id"}
  282. for _, addr := range addrs {
  283. u, err := url.Parse(addr)
  284. if err != nil {
  285. continue
  286. }
  287. if u.Scheme == "relay" {
  288. s := url.Values{}
  289. q := u.Query()
  290. for _, w := range allowlist {
  291. if q.Has(w) {
  292. s.Add(w, q.Get(w))
  293. }
  294. }
  295. u.RawQuery = s.Encode()
  296. addr = u.String()
  297. }
  298. filtered = append(filtered, addr)
  299. }
  300. return filtered
  301. }