client_udp.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "encoding/hex"
  9. "io"
  10. "net"
  11. "net/url"
  12. "strconv"
  13. "time"
  14. "github.com/syncthing/protocol"
  15. "github.com/syncthing/syncthing/internal/sync"
  16. )
  17. func init() {
  18. for _, proto := range []string{"udp", "udp4", "udp6"} {
  19. Register(proto, func(uri *url.URL, pkt *Announce) (Client, error) {
  20. c := &UDPClient{
  21. wg: sync.NewWaitGroup(),
  22. mut: sync.NewRWMutex(),
  23. }
  24. err := c.Start(uri, pkt)
  25. if err != nil {
  26. return nil, err
  27. }
  28. return c, nil
  29. })
  30. }
  31. }
  32. type UDPClient struct {
  33. url *url.URL
  34. id protocol.DeviceID
  35. stop chan struct{}
  36. wg sync.WaitGroup
  37. listenAddress *net.UDPAddr
  38. globalBroadcastInterval time.Duration
  39. errorRetryInterval time.Duration
  40. status bool
  41. mut sync.RWMutex
  42. }
  43. func (d *UDPClient) Start(uri *url.URL, pkt *Announce) error {
  44. d.url = uri
  45. d.id = protocol.DeviceIDFromBytes(pkt.This.ID)
  46. d.stop = make(chan struct{})
  47. params := uri.Query()
  48. // The address must not have a port, as otherwise both announce and lookup
  49. // sockets would try to bind to the same port.
  50. addr, err := net.ResolveUDPAddr(d.url.Scheme, params.Get("listenaddress")+":0")
  51. if err != nil {
  52. return err
  53. }
  54. d.listenAddress = addr
  55. broadcastSeconds, err := strconv.ParseUint(params.Get("broadcast"), 0, 0)
  56. if err != nil {
  57. d.globalBroadcastInterval = DefaultGlobalBroadcastInterval
  58. } else {
  59. d.globalBroadcastInterval = time.Duration(broadcastSeconds) * time.Second
  60. }
  61. retrySeconds, err := strconv.ParseUint(params.Get("retry"), 0, 0)
  62. if err != nil {
  63. d.errorRetryInterval = DefaultErrorRetryInternval
  64. } else {
  65. d.errorRetryInterval = time.Duration(retrySeconds) * time.Second
  66. }
  67. d.wg.Add(1)
  68. go d.broadcast(pkt.MustMarshalXDR())
  69. return nil
  70. }
  71. func (d *UDPClient) broadcast(pkt []byte) {
  72. defer d.wg.Done()
  73. conn, err := net.ListenUDP(d.url.Scheme, d.listenAddress)
  74. for err != nil {
  75. if debug {
  76. l.Debugf("discover %s: broadcast listen: %v; trying again in %v", d.url, err, d.errorRetryInterval)
  77. }
  78. select {
  79. case <-d.stop:
  80. return
  81. case <-time.After(d.errorRetryInterval):
  82. }
  83. conn, err = net.ListenUDP(d.url.Scheme, d.listenAddress)
  84. }
  85. defer conn.Close()
  86. remote, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
  87. for err != nil {
  88. if debug {
  89. l.Debugf("discover %s: broadcast resolve: %v; trying again in %v", d.url, err, d.errorRetryInterval)
  90. }
  91. select {
  92. case <-d.stop:
  93. return
  94. case <-time.After(d.errorRetryInterval):
  95. }
  96. remote, err = net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
  97. }
  98. timer := time.NewTimer(0)
  99. for {
  100. select {
  101. case <-d.stop:
  102. return
  103. case <-timer.C:
  104. var ok bool
  105. if debug {
  106. l.Debugf("discover %s: broadcast: Sending self announcement to %v", d.url, remote)
  107. }
  108. _, err := conn.WriteTo(pkt, remote)
  109. if err != nil {
  110. if debug {
  111. l.Debugf("discover %s: broadcast: Failed to send self announcement: %s", d.url, err)
  112. }
  113. ok = false
  114. } else {
  115. // Verify that the announce server responds positively for our device ID
  116. time.Sleep(1 * time.Second)
  117. res := d.Lookup(d.id)
  118. if debug {
  119. l.Debugf("discover %s: broadcast: Self-lookup returned: %v", d.url, res)
  120. }
  121. ok = len(res) > 0
  122. }
  123. d.mut.Lock()
  124. d.status = ok
  125. d.mut.Unlock()
  126. if ok {
  127. timer.Reset(d.globalBroadcastInterval)
  128. } else {
  129. timer.Reset(d.errorRetryInterval)
  130. }
  131. }
  132. }
  133. }
  134. func (d *UDPClient) Lookup(device protocol.DeviceID) []string {
  135. extIP, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
  136. if err != nil {
  137. if debug {
  138. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  139. }
  140. return nil
  141. }
  142. conn, err := net.DialUDP(d.url.Scheme, d.listenAddress, extIP)
  143. if err != nil {
  144. if debug {
  145. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  146. }
  147. return nil
  148. }
  149. defer conn.Close()
  150. err = conn.SetDeadline(time.Now().Add(5 * time.Second))
  151. if err != nil {
  152. if debug {
  153. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  154. }
  155. return nil
  156. }
  157. buf := Query{QueryMagic, device[:]}.MustMarshalXDR()
  158. _, err = conn.Write(buf)
  159. if err != nil {
  160. if debug {
  161. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  162. }
  163. return nil
  164. }
  165. buf = make([]byte, 2048)
  166. n, err := conn.Read(buf)
  167. if err != nil {
  168. if err, ok := err.(net.Error); ok && err.Timeout() {
  169. // Expected if the server doesn't know about requested device ID
  170. return nil
  171. }
  172. if debug {
  173. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  174. }
  175. return nil
  176. }
  177. var pkt Announce
  178. err = pkt.UnmarshalXDR(buf[:n])
  179. if err != nil && err != io.EOF {
  180. if debug {
  181. l.Debugf("discover %s: Lookup(%s): %s\n%s", d.url, device, err, hex.Dump(buf[:n]))
  182. }
  183. return nil
  184. }
  185. var addrs []string
  186. for _, a := range pkt.This.Addresses {
  187. deviceAddr := net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port)))
  188. addrs = append(addrs, deviceAddr)
  189. }
  190. if debug {
  191. l.Debugf("discover %s: Lookup(%s) result: %v", d.url, device, addrs)
  192. }
  193. return addrs
  194. }
  195. func (d *UDPClient) Stop() {
  196. if d.stop != nil {
  197. close(d.stop)
  198. d.wg.Wait()
  199. }
  200. }
  201. func (d *UDPClient) StatusOK() bool {
  202. d.mut.RLock()
  203. defer d.mut.RUnlock()
  204. return d.status
  205. }
  206. func (d *UDPClient) Address() string {
  207. return d.url.String()
  208. }