local.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "encoding/hex"
  10. "errors"
  11. "io"
  12. "net"
  13. "net/url"
  14. "strconv"
  15. "time"
  16. "github.com/syncthing/syncthing/lib/beacon"
  17. "github.com/syncthing/syncthing/lib/events"
  18. "github.com/syncthing/syncthing/lib/protocol"
  19. "github.com/thejerf/suture"
  20. )
  21. type localClient struct {
  22. *suture.Supervisor
  23. myID protocol.DeviceID
  24. addrList AddressLister
  25. relayStat RelayStatusProvider
  26. name string
  27. beacon beacon.Interface
  28. localBcastStart time.Time
  29. localBcastTick <-chan time.Time
  30. forcedBcastTick chan time.Time
  31. *cache
  32. }
  33. const (
  34. BroadcastInterval = 30 * time.Second
  35. CacheLifeTime = 3 * BroadcastInterval
  36. )
  37. var (
  38. ErrIncorrectMagic = errors.New("incorrect magic number")
  39. )
  40. func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, relayStat RelayStatusProvider) (FinderService, error) {
  41. c := &localClient{
  42. Supervisor: suture.NewSimple("local"),
  43. myID: id,
  44. addrList: addrList,
  45. relayStat: relayStat,
  46. localBcastTick: time.Tick(BroadcastInterval),
  47. forcedBcastTick: make(chan time.Time),
  48. localBcastStart: time.Now(),
  49. cache: newCache(),
  50. }
  51. host, port, err := net.SplitHostPort(addr)
  52. if err != nil {
  53. return nil, err
  54. }
  55. if len(host) == 0 {
  56. // A broadcast client
  57. c.name = "IPv4 local"
  58. bcPort, err := strconv.Atoi(port)
  59. if err != nil {
  60. return nil, err
  61. }
  62. c.startLocalIPv4Broadcasts(bcPort)
  63. } else {
  64. // A multicast client
  65. c.name = "IPv6 local"
  66. c.startLocalIPv6Multicasts(addr)
  67. }
  68. go c.sendLocalAnnouncements()
  69. return c, nil
  70. }
  71. func (c *localClient) startLocalIPv4Broadcasts(localPort int) {
  72. c.beacon = beacon.NewBroadcast(localPort)
  73. c.Add(c.beacon)
  74. go c.recvAnnouncements(c.beacon)
  75. }
  76. func (c *localClient) startLocalIPv6Multicasts(localMCAddr string) {
  77. c.beacon = beacon.NewMulticast(localMCAddr)
  78. c.Add(c.beacon)
  79. go c.recvAnnouncements(c.beacon)
  80. }
  81. // Lookup returns a list of addresses the device is available at. Local
  82. // discovery never returns relays.
  83. func (c *localClient) Lookup(device protocol.DeviceID) (direct []string, relays []Relay, err error) {
  84. if cache, ok := c.Get(device); ok {
  85. if time.Since(cache.when) < CacheLifeTime {
  86. direct = cache.Direct
  87. relays = cache.Relays
  88. }
  89. }
  90. return
  91. }
  92. func (c *localClient) String() string {
  93. return c.name
  94. }
  95. func (c *localClient) Error() error {
  96. return c.beacon.Error()
  97. }
  98. func (c *localClient) announcementPkt() Announce {
  99. var addrs []Address
  100. for _, addr := range c.addrList.AllAddresses() {
  101. addrs = append(addrs, Address{
  102. URL: addr,
  103. })
  104. }
  105. var relays []Relay
  106. for _, relay := range c.relayStat.Relays() {
  107. latency, ok := c.relayStat.RelayStatus(relay)
  108. if ok {
  109. relays = append(relays, Relay{
  110. URL: relay,
  111. Latency: int32(latency / time.Millisecond),
  112. })
  113. }
  114. }
  115. return Announce{
  116. Magic: AnnouncementMagic,
  117. This: Device{
  118. ID: c.myID[:],
  119. Addresses: addrs,
  120. Relays: relays,
  121. },
  122. }
  123. }
  124. func (c *localClient) sendLocalAnnouncements() {
  125. var pkt = c.announcementPkt()
  126. msg := pkt.MustMarshalXDR()
  127. for {
  128. c.beacon.Send(msg)
  129. select {
  130. case <-c.localBcastTick:
  131. case <-c.forcedBcastTick:
  132. }
  133. }
  134. }
  135. func (c *localClient) recvAnnouncements(b beacon.Interface) {
  136. for {
  137. buf, addr := b.Recv()
  138. var pkt Announce
  139. err := pkt.UnmarshalXDR(buf)
  140. if err != nil && err != io.EOF {
  141. l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
  142. continue
  143. }
  144. l.Debugf("discover: Received local announcement from %s for %s", addr, protocol.DeviceIDFromBytes(pkt.This.ID))
  145. var newDevice bool
  146. if bytes.Compare(pkt.This.ID, c.myID[:]) != 0 {
  147. newDevice = c.registerDevice(addr, pkt.This)
  148. }
  149. if newDevice {
  150. select {
  151. case c.forcedBcastTick <- time.Now():
  152. }
  153. }
  154. }
  155. }
  156. func (c *localClient) registerDevice(src net.Addr, device Device) bool {
  157. var id protocol.DeviceID
  158. copy(id[:], device.ID)
  159. // Remember whether we already had a valid cache entry for this device.
  160. ce, existsAlready := c.Get(id)
  161. isNewDevice := !existsAlready || time.Since(ce.when) > CacheLifeTime
  162. // Any empty or unspecified addresses should be set to the source address
  163. // of the announcement. We also skip any addresses we can't parse.
  164. var validAddresses []string
  165. for _, addr := range device.Addresses {
  166. u, err := url.Parse(addr.URL)
  167. if err != nil {
  168. continue
  169. }
  170. tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
  171. if err != nil {
  172. continue
  173. }
  174. if len(tcpAddr.IP) == 0 || tcpAddr.IP.IsUnspecified() {
  175. host, _, err := net.SplitHostPort(src.String())
  176. if err != nil {
  177. continue
  178. }
  179. u.Host = net.JoinHostPort(host, strconv.Itoa(tcpAddr.Port))
  180. validAddresses = append(validAddresses, u.String())
  181. } else {
  182. validAddresses = append(validAddresses, addr.URL)
  183. }
  184. }
  185. c.Set(id, CacheEntry{
  186. Direct: validAddresses,
  187. Relays: device.Relays,
  188. when: time.Now(),
  189. found: true,
  190. })
  191. if isNewDevice {
  192. events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
  193. "device": id.String(),
  194. "addrs": device.Addresses,
  195. "relays": device.Relays,
  196. })
  197. }
  198. return isNewDevice
  199. }