local.go 6.0 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "encoding/hex"
  10. "errors"
  11. "fmt"
  12. "io"
  13. "net"
  14. "net/url"
  15. "strconv"
  16. "time"
  17. "github.com/syncthing/syncthing/lib/beacon"
  18. "github.com/syncthing/syncthing/lib/events"
  19. "github.com/syncthing/syncthing/lib/protocol"
  20. "github.com/thejerf/suture"
  21. )
  22. type localClient struct {
  23. *suture.Supervisor
  24. myID protocol.DeviceID
  25. addrList AddressLister
  26. relayStat RelayStatusProvider
  27. name string
  28. beacon beacon.Interface
  29. localBcastStart time.Time
  30. localBcastTick <-chan time.Time
  31. forcedBcastTick chan time.Time
  32. *cache
  33. }
  34. const (
  35. BroadcastInterval = 30 * time.Second
  36. CacheLifeTime = 3 * BroadcastInterval
  37. )
  38. var (
  39. ErrIncorrectMagic = errors.New("incorrect magic number")
  40. )
  41. func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, relayStat RelayStatusProvider) (FinderService, error) {
  42. c := &localClient{
  43. Supervisor: suture.NewSimple("local"),
  44. myID: id,
  45. addrList: addrList,
  46. relayStat: relayStat,
  47. localBcastTick: time.Tick(BroadcastInterval),
  48. forcedBcastTick: make(chan time.Time),
  49. localBcastStart: time.Now(),
  50. cache: newCache(),
  51. }
  52. host, port, err := net.SplitHostPort(addr)
  53. if err != nil {
  54. return nil, err
  55. }
  56. if len(host) == 0 {
  57. // A broadcast client
  58. c.name = "IPv4 local"
  59. bcPort, err := strconv.Atoi(port)
  60. if err != nil {
  61. return nil, err
  62. }
  63. c.startLocalIPv4Broadcasts(bcPort)
  64. } else {
  65. // A multicast client
  66. c.name = "IPv6 local"
  67. c.startLocalIPv6Multicasts(addr)
  68. }
  69. go c.sendLocalAnnouncements()
  70. return c, nil
  71. }
  72. func (c *localClient) startLocalIPv4Broadcasts(localPort int) {
  73. c.beacon = beacon.NewBroadcast(localPort)
  74. c.Add(c.beacon)
  75. go c.recvAnnouncements(c.beacon)
  76. }
  77. func (c *localClient) startLocalIPv6Multicasts(localMCAddr string) {
  78. c.beacon = beacon.NewMulticast(localMCAddr)
  79. c.Add(c.beacon)
  80. go c.recvAnnouncements(c.beacon)
  81. }
  82. // Lookup returns a list of addresses the device is available at. Local
  83. // discovery never returns relays.
  84. func (c *localClient) Lookup(device protocol.DeviceID) (direct []string, relays []Relay, err error) {
  85. if cache, ok := c.Get(device); ok {
  86. if time.Since(cache.when) < CacheLifeTime {
  87. direct = cache.Direct
  88. relays = cache.Relays
  89. }
  90. }
  91. return
  92. }
  93. func (c *localClient) String() string {
  94. return c.name
  95. }
  96. func (c *localClient) Error() error {
  97. return c.beacon.Error()
  98. }
  99. func (c *localClient) announcementPkt() Announce {
  100. var addrs []Address
  101. for _, addr := range c.addrList.AllAddresses() {
  102. addrs = append(addrs, Address{
  103. URL: addr,
  104. })
  105. }
  106. var relays []Relay
  107. for _, relay := range c.relayStat.Relays() {
  108. latency, ok := c.relayStat.RelayStatus(relay)
  109. if ok {
  110. relays = append(relays, Relay{
  111. URL: relay,
  112. Latency: int32(latency / time.Millisecond),
  113. })
  114. }
  115. }
  116. return Announce{
  117. Magic: AnnouncementMagic,
  118. This: Device{
  119. ID: c.myID[:],
  120. Addresses: addrs,
  121. Relays: relays,
  122. },
  123. }
  124. }
  125. func (c *localClient) sendLocalAnnouncements() {
  126. var pkt = c.announcementPkt()
  127. msg := pkt.MustMarshalXDR()
  128. for {
  129. c.beacon.Send(msg)
  130. select {
  131. case <-c.localBcastTick:
  132. case <-c.forcedBcastTick:
  133. }
  134. }
  135. }
  136. func (c *localClient) recvAnnouncements(b beacon.Interface) {
  137. for {
  138. buf, addr := b.Recv()
  139. var pkt Announce
  140. err := pkt.UnmarshalXDR(buf)
  141. if err != nil && err != io.EOF {
  142. if debug {
  143. l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
  144. }
  145. continue
  146. }
  147. if debug {
  148. l.Debugf("discover: Received local announcement from %s for %s", addr, protocol.DeviceIDFromBytes(pkt.This.ID))
  149. }
  150. var newDevice bool
  151. if bytes.Compare(pkt.This.ID, c.myID[:]) != 0 {
  152. newDevice = c.registerDevice(addr, pkt.This)
  153. }
  154. if newDevice {
  155. select {
  156. case c.forcedBcastTick <- time.Now():
  157. }
  158. }
  159. }
  160. }
  161. func (c *localClient) registerDevice(src net.Addr, device Device) bool {
  162. var id protocol.DeviceID
  163. copy(id[:], device.ID)
  164. // Remember whether we already had a valid cache entry for this device.
  165. ce, existsAlready := c.Get(id)
  166. isNewDevice := !existsAlready || time.Since(ce.when) > CacheLifeTime
  167. // Any empty or unspecified addresses should be set to the source address
  168. // of the announcement. We also skip any addresses we can't parse.
  169. var validAddresses []string
  170. for _, addr := range device.Addresses {
  171. u, err := url.Parse(addr.URL)
  172. if err != nil {
  173. continue
  174. }
  175. tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
  176. if err != nil {
  177. continue
  178. }
  179. if len(tcpAddr.IP) == 0 || tcpAddr.IP.IsUnspecified() {
  180. host, _, err := net.SplitHostPort(src.String())
  181. if err != nil {
  182. continue
  183. }
  184. u.Host = fmt.Sprintf("%s:%d", host, tcpAddr.Port)
  185. validAddresses = append(validAddresses, u.String())
  186. } else {
  187. validAddresses = append(validAddresses, addr.URL)
  188. }
  189. }
  190. c.Set(id, CacheEntry{
  191. Direct: validAddresses,
  192. Relays: device.Relays,
  193. when: time.Now(),
  194. found: true,
  195. })
  196. if isNewDevice {
  197. events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
  198. "device": id.String(),
  199. "addrs": device.Addresses,
  200. "relays": device.Relays,
  201. })
  202. }
  203. return isNewDevice
  204. }
  205. func addrToAddr(addr *net.TCPAddr) string {
  206. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  207. return fmt.Sprintf(":%c", addr.Port)
  208. } else if bs := addr.IP.To4(); bs != nil {
  209. return fmt.Sprintf("%s:%c", bs.String(), addr.Port)
  210. } else if bs := addr.IP.To16(); bs != nil {
  211. return fmt.Sprintf("[%s]:%c", bs.String(), addr.Port)
  212. }
  213. return ""
  214. }
  215. func resolveAddrs(addrs []string) []string {
  216. var raddrs []string
  217. for _, addrStr := range addrs {
  218. uri, err := url.Parse(addrStr)
  219. if err != nil {
  220. continue
  221. }
  222. addrRes, err := net.ResolveTCPAddr("tcp", uri.Host)
  223. if err != nil {
  224. continue
  225. }
  226. addr := addrToAddr(addrRes)
  227. if len(addr) > 0 {
  228. uri.Host = addr
  229. raddrs = append(raddrs, uri.String())
  230. }
  231. }
  232. return raddrs
  233. }