local.go 8.3 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "context"
  9. "encoding/binary"
  10. "encoding/hex"
  11. "fmt"
  12. "io"
  13. "net"
  14. "net/url"
  15. "strconv"
  16. "time"
  17. "github.com/syncthing/syncthing/lib/beacon"
  18. "github.com/syncthing/syncthing/lib/events"
  19. "github.com/syncthing/syncthing/lib/protocol"
  20. "github.com/syncthing/syncthing/lib/rand"
  21. "github.com/syncthing/syncthing/lib/svcutil"
  22. "github.com/thejerf/suture/v4"
  23. )
  24. type localClient struct {
  25. *suture.Supervisor
  26. myID protocol.DeviceID
  27. addrList AddressLister
  28. name string
  29. evLogger events.Logger
  30. beacon beacon.Interface
  31. localBcastStart time.Time
  32. localBcastTick <-chan time.Time
  33. forcedBcastTick chan time.Time
  34. *cache
  35. }
  36. const (
  37. BroadcastInterval = 30 * time.Second
  38. CacheLifeTime = 3 * BroadcastInterval
  39. Magic = uint32(0x2EA7D90B) // same as in BEP
  40. v13Magic = uint32(0x7D79BC40) // previous version
  41. )
  42. func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, evLogger events.Logger) (FinderService, error) {
  43. c := &localClient{
  44. Supervisor: suture.New("local", svcutil.SpecWithDebugLogger(l)),
  45. myID: id,
  46. addrList: addrList,
  47. evLogger: evLogger,
  48. localBcastTick: time.NewTicker(BroadcastInterval).C,
  49. forcedBcastTick: make(chan time.Time),
  50. localBcastStart: time.Now(),
  51. cache: newCache(),
  52. }
  53. host, port, err := net.SplitHostPort(addr)
  54. if err != nil {
  55. return nil, err
  56. }
  57. if host == "" {
  58. // A broadcast client
  59. c.name = "IPv4 local"
  60. bcPort, err := strconv.Atoi(port)
  61. if err != nil {
  62. return nil, err
  63. }
  64. c.beacon = beacon.NewBroadcast(bcPort)
  65. } else {
  66. // A multicast client
  67. c.name = "IPv6 local"
  68. c.beacon = beacon.NewMulticast(addr)
  69. }
  70. c.Add(c.beacon)
  71. c.Add(svcutil.AsService(c.recvAnnouncements, fmt.Sprintf("%s/recv", c)))
  72. c.Add(svcutil.AsService(c.sendLocalAnnouncements, fmt.Sprintf("%s/sendLocal", c)))
  73. return c, nil
  74. }
  75. // Lookup returns a list of addresses the device is available at.
  76. func (c *localClient) Lookup(_ context.Context, device protocol.DeviceID) (addresses []string, err error) {
  77. if cache, ok := c.Get(device); ok {
  78. if time.Since(cache.when) < CacheLifeTime {
  79. addresses = cache.Addresses
  80. }
  81. }
  82. return
  83. }
  84. func (c *localClient) String() string {
  85. return c.name
  86. }
  87. func (c *localClient) Error() error {
  88. return c.beacon.Error()
  89. }
  90. // announcementPkt appends the local discovery packet to send to msg. Returns
  91. // true if the packet should be sent, false if there is nothing useful to
  92. // send.
  93. func (c *localClient) announcementPkt(instanceID int64, msg []byte) ([]byte, bool) {
  94. addrs := c.addrList.AllAddresses()
  95. // The list of all addresses can include unspecified addresses intended
  96. // for a discovery server to complete, based on the packet source. We
  97. // don't do that for local discovery, so filter out addresses that are
  98. // usable as-is.
  99. addrs = filterUnspecifiedLocal(addrs)
  100. // do not leak relay tokens to discovery
  101. addrs = sanitizeRelayAddresses(addrs)
  102. if len(addrs) == 0 {
  103. // Nothing to announce
  104. return msg, false
  105. }
  106. pkt := Announce{
  107. ID: c.myID,
  108. Addresses: addrs,
  109. InstanceID: instanceID,
  110. }
  111. bs, _ := pkt.Marshal()
  112. if pktLen := 4 + len(bs); cap(msg) < pktLen {
  113. msg = make([]byte, 0, pktLen)
  114. }
  115. msg = msg[:4]
  116. binary.BigEndian.PutUint32(msg, Magic)
  117. msg = append(msg, bs...)
  118. return msg, true
  119. }
  120. func (c *localClient) sendLocalAnnouncements(ctx context.Context) error {
  121. var msg []byte
  122. var ok bool
  123. instanceID := rand.Int63()
  124. for {
  125. if msg, ok = c.announcementPkt(instanceID, msg[:0]); ok {
  126. c.beacon.Send(msg)
  127. }
  128. select {
  129. case <-c.localBcastTick:
  130. case <-c.forcedBcastTick:
  131. case <-ctx.Done():
  132. return ctx.Err()
  133. }
  134. }
  135. }
  136. func (c *localClient) recvAnnouncements(ctx context.Context) error {
  137. b := c.beacon
  138. warnedAbout := make(map[string]bool)
  139. for {
  140. select {
  141. case <-ctx.Done():
  142. return ctx.Err()
  143. default:
  144. }
  145. buf, addr := b.Recv()
  146. if addr == nil {
  147. continue
  148. }
  149. if len(buf) < 4 {
  150. l.Debugf("discover: short packet from %s", addr.String())
  151. continue
  152. }
  153. magic := binary.BigEndian.Uint32(buf)
  154. switch magic {
  155. case Magic:
  156. // All good
  157. case v13Magic:
  158. // Old version
  159. if !warnedAbout[addr.String()] {
  160. l.Warnf("Incompatible (v0.13) local discovery packet from %v - upgrade that device to connect", addr)
  161. warnedAbout[addr.String()] = true
  162. }
  163. continue
  164. default:
  165. l.Debugf("discover: Incorrect magic %x from %s", magic, addr)
  166. continue
  167. }
  168. var pkt Announce
  169. err := pkt.Unmarshal(buf[4:])
  170. if err != nil && err != io.EOF {
  171. l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
  172. continue
  173. }
  174. l.Debugf("discover: Received local announcement from %s for %s", addr, pkt.ID)
  175. var newDevice bool
  176. if pkt.ID != c.myID {
  177. newDevice = c.registerDevice(addr, pkt)
  178. }
  179. if newDevice {
  180. // Force a transmit to announce ourselves, if we are ready to do
  181. // so right away.
  182. select {
  183. case c.forcedBcastTick <- time.Now():
  184. default:
  185. }
  186. }
  187. }
  188. }
  189. func (c *localClient) registerDevice(src net.Addr, device Announce) bool {
  190. // Remember whether we already had a valid cache entry for this device.
  191. // If the instance ID has changed the remote device has restarted since
  192. // we last heard from it, so we should treat it as a new device.
  193. ce, existsAlready := c.Get(device.ID)
  194. isNewDevice := !existsAlready || time.Since(ce.when) > CacheLifeTime || ce.instanceID != device.InstanceID
  195. // Any empty or unspecified addresses should be set to the source address
  196. // of the announcement. We also skip any addresses we can't parse.
  197. l.Debugln("discover: Registering addresses for", device.ID)
  198. var validAddresses []string
  199. for _, addr := range device.Addresses {
  200. u, err := url.Parse(addr)
  201. if err != nil {
  202. continue
  203. }
  204. tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
  205. if err != nil {
  206. continue
  207. }
  208. if len(tcpAddr.IP) == 0 || tcpAddr.IP.IsUnspecified() {
  209. srcAddr, err := net.ResolveTCPAddr("tcp", src.String())
  210. if err != nil {
  211. continue
  212. }
  213. // Do not use IPv6 source address if requested scheme is tcp4
  214. if u.Scheme == "tcp4" && srcAddr.IP.To4() == nil {
  215. continue
  216. }
  217. // Do not use IPv4 source address if requested scheme is tcp6
  218. if u.Scheme == "tcp6" && srcAddr.IP.To4() != nil {
  219. continue
  220. }
  221. host, _, err := net.SplitHostPort(src.String())
  222. if err != nil {
  223. continue
  224. }
  225. u.Host = net.JoinHostPort(host, strconv.Itoa(tcpAddr.Port))
  226. l.Debugf("discover: Reconstructed URL is %#v", u)
  227. validAddresses = append(validAddresses, u.String())
  228. l.Debugf("discover: Replaced address %v in %s to get %s", tcpAddr.IP, addr, u.String())
  229. } else {
  230. validAddresses = append(validAddresses, addr)
  231. l.Debugf("discover: Accepted address %s verbatim", addr)
  232. }
  233. }
  234. c.Set(device.ID, CacheEntry{
  235. Addresses: validAddresses,
  236. when: time.Now(),
  237. found: true,
  238. instanceID: device.InstanceID,
  239. })
  240. if isNewDevice {
  241. c.evLogger.Log(events.DeviceDiscovered, map[string]interface{}{
  242. "device": device.ID.String(),
  243. "addrs": validAddresses,
  244. })
  245. }
  246. return isNewDevice
  247. }
  248. // filterUnspecifiedLocal returns the list of addresses after removing any
  249. // unspecified, localhost, multicast, broadcast or port-zero addresses.
  250. func filterUnspecifiedLocal(addrs []string) []string {
  251. filtered := addrs[:0]
  252. for _, addr := range addrs {
  253. u, err := url.Parse(addr)
  254. if err != nil {
  255. continue
  256. }
  257. tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
  258. if err != nil {
  259. continue
  260. }
  261. switch {
  262. case len(tcpAddr.IP) == 0:
  263. case tcpAddr.Port == 0:
  264. case tcpAddr.IP.IsUnspecified():
  265. case !tcpAddr.IP.IsGlobalUnicast() && !tcpAddr.IP.IsLinkLocalUnicast():
  266. default:
  267. filtered = append(filtered, addr)
  268. }
  269. }
  270. return filtered
  271. }
  272. func sanitizeRelayAddresses(addrs []string) []string {
  273. filtered := addrs[:0]
  274. allowlist := []string{"id"}
  275. for _, addr := range addrs {
  276. u, err := url.Parse(addr)
  277. if err != nil {
  278. continue
  279. }
  280. if u.Scheme == "relay" {
  281. s := url.Values{}
  282. q := u.Query()
  283. for _, w := range allowlist {
  284. if q.Has(w) {
  285. s.Add(w, q.Get(w))
  286. }
  287. }
  288. u.RawQuery = s.Encode()
  289. addr = u.String()
  290. }
  291. filtered = append(filtered, addr)
  292. }
  293. return filtered
  294. }