local.go 5.9 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "encoding/hex"
  10. "errors"
  11. "fmt"
  12. "io"
  13. "net"
  14. "net/url"
  15. "strconv"
  16. "time"
  17. "github.com/syncthing/protocol"
  18. "github.com/syncthing/syncthing/lib/beacon"
  19. "github.com/syncthing/syncthing/lib/events"
  20. "github.com/thejerf/suture"
  21. )
  22. type localClient struct {
  23. *suture.Supervisor
  24. myID protocol.DeviceID
  25. addrList AddressLister
  26. relayStat RelayStatusProvider
  27. name string
  28. beacon beacon.Interface
  29. localBcastStart time.Time
  30. localBcastTick <-chan time.Time
  31. forcedBcastTick chan time.Time
  32. *cache
  33. }
  34. const (
  35. BroadcastInterval = 30 * time.Second
  36. CacheLifeTime = 3 * BroadcastInterval
  37. )
  38. var (
  39. ErrIncorrectMagic = errors.New("incorrect magic number")
  40. )
  41. func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, relayStat RelayStatusProvider) (FinderService, error) {
  42. c := &localClient{
  43. Supervisor: suture.NewSimple("local"),
  44. myID: id,
  45. addrList: addrList,
  46. relayStat: relayStat,
  47. localBcastTick: time.Tick(BroadcastInterval),
  48. forcedBcastTick: make(chan time.Time),
  49. localBcastStart: time.Now(),
  50. cache: newCache(),
  51. }
  52. host, port, err := net.SplitHostPort(addr)
  53. if err != nil {
  54. return nil, err
  55. }
  56. if len(host) == 0 {
  57. // A broadcast client
  58. c.name = "IPv4 local"
  59. bcPort, err := strconv.Atoi(port)
  60. if err != nil {
  61. return nil, err
  62. }
  63. c.startLocalIPv4Broadcasts(bcPort)
  64. } else {
  65. // A multicast client
  66. c.name = "IPv6 local"
  67. c.startLocalIPv6Multicasts(addr)
  68. }
  69. go c.sendLocalAnnouncements()
  70. return c, nil
  71. }
  72. func (c *localClient) startLocalIPv4Broadcasts(localPort int) {
  73. c.beacon = beacon.NewBroadcast(localPort)
  74. c.Add(c.beacon)
  75. go c.recvAnnouncements(c.beacon)
  76. }
  77. func (c *localClient) startLocalIPv6Multicasts(localMCAddr string) {
  78. c.beacon = beacon.NewMulticast(localMCAddr)
  79. c.Add(c.beacon)
  80. go c.recvAnnouncements(c.beacon)
  81. }
  82. // Lookup returns a list of addresses the device is available at. Local
  83. // discovery never returns relays.
  84. func (c *localClient) Lookup(device protocol.DeviceID) (direct []string, relays []Relay, err error) {
  85. if cache, ok := c.Get(device); ok {
  86. if time.Since(cache.when) < CacheLifeTime {
  87. direct = cache.Direct
  88. relays = cache.Relays
  89. }
  90. }
  91. return
  92. }
  93. func (c *localClient) String() string {
  94. return c.name
  95. }
  96. func (c *localClient) Error() error {
  97. return c.beacon.Error()
  98. }
  99. func (c *localClient) announcementPkt() Announce {
  100. addrs := c.addrList.AllAddresses()
  101. var relays []Relay
  102. for _, relay := range c.relayStat.Relays() {
  103. latency, ok := c.relayStat.RelayStatus(relay)
  104. if ok {
  105. relays = append(relays, Relay{
  106. URL: relay,
  107. Latency: int32(latency / time.Millisecond),
  108. })
  109. }
  110. }
  111. return Announce{
  112. Magic: AnnouncementMagic,
  113. This: Device{
  114. ID: c.myID[:],
  115. Addresses: addrs,
  116. Relays: relays,
  117. },
  118. }
  119. }
  120. func (c *localClient) sendLocalAnnouncements() {
  121. var pkt = c.announcementPkt()
  122. msg := pkt.MustMarshalXDR()
  123. for {
  124. c.beacon.Send(msg)
  125. select {
  126. case <-c.localBcastTick:
  127. case <-c.forcedBcastTick:
  128. }
  129. }
  130. }
  131. func (c *localClient) recvAnnouncements(b beacon.Interface) {
  132. for {
  133. buf, addr := b.Recv()
  134. var pkt Announce
  135. err := pkt.UnmarshalXDR(buf)
  136. if err != nil && err != io.EOF {
  137. if debug {
  138. l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
  139. }
  140. continue
  141. }
  142. if debug {
  143. l.Debugf("discover: Received local announcement from %s for %s", addr, protocol.DeviceIDFromBytes(pkt.This.ID))
  144. }
  145. var newDevice bool
  146. if bytes.Compare(pkt.This.ID, c.myID[:]) != 0 {
  147. newDevice = c.registerDevice(addr, pkt.This)
  148. }
  149. if newDevice {
  150. select {
  151. case c.forcedBcastTick <- time.Now():
  152. }
  153. }
  154. }
  155. }
  156. func (c *localClient) registerDevice(src net.Addr, device Device) bool {
  157. var id protocol.DeviceID
  158. copy(id[:], device.ID)
  159. // Remember whether we already had a valid cache entry for this device.
  160. ce, existsAlready := c.Get(id)
  161. isNewDevice := !existsAlready || time.Since(ce.when) > CacheLifeTime
  162. // Any empty or unspecified addresses should be set to the source address
  163. // of the announcement. We also skip any addresses we can't parse.
  164. var validAddresses []string
  165. for _, addr := range device.Addresses {
  166. u, err := url.Parse(addr)
  167. if err != nil {
  168. continue
  169. }
  170. tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
  171. if err != nil {
  172. continue
  173. }
  174. if len(tcpAddr.IP) == 0 || tcpAddr.IP.IsUnspecified() {
  175. host, _, err := net.SplitHostPort(src.String())
  176. if err != nil {
  177. continue
  178. }
  179. u.Host = fmt.Sprintf("%s:%d", host, tcpAddr.Port)
  180. validAddresses = append(validAddresses, u.String())
  181. } else {
  182. validAddresses = append(validAddresses, addr)
  183. }
  184. }
  185. c.Set(id, CacheEntry{
  186. Direct: validAddresses,
  187. Relays: device.Relays,
  188. when: time.Now(),
  189. found: true,
  190. })
  191. if isNewDevice {
  192. events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
  193. "device": id.String(),
  194. "addrs": device.Addresses,
  195. "relays": device.Relays,
  196. })
  197. }
  198. return isNewDevice
  199. }
  200. func addrToAddr(addr *net.TCPAddr) string {
  201. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  202. return fmt.Sprintf(":%c", addr.Port)
  203. } else if bs := addr.IP.To4(); bs != nil {
  204. return fmt.Sprintf("%s:%c", bs.String(), addr.Port)
  205. } else if bs := addr.IP.To16(); bs != nil {
  206. return fmt.Sprintf("[%s]:%c", bs.String(), addr.Port)
  207. }
  208. return ""
  209. }
  210. func resolveAddrs(addrs []string) []string {
  211. var raddrs []string
  212. for _, addrStr := range addrs {
  213. uri, err := url.Parse(addrStr)
  214. if err != nil {
  215. continue
  216. }
  217. addrRes, err := net.ResolveTCPAddr("tcp", uri.Host)
  218. if err != nil {
  219. continue
  220. }
  221. addr := addrToAddr(addrRes)
  222. if len(addr) > 0 {
  223. uri.Host = addr
  224. raddrs = append(raddrs, uri.String())
  225. }
  226. }
  227. return raddrs
  228. }