// client_udp.go (5.2 KB)
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "encoding/hex"
  9. "io"
  10. "net"
  11. "net/url"
  12. "strconv"
  13. "sync"
  14. "time"
  15. "github.com/syncthing/protocol"
  16. )
  17. func init() {
  18. for _, proto := range []string{"udp", "udp4", "udp6"} {
  19. Register(proto, func(uri *url.URL, pkt *Announce) (Client, error) {
  20. c := &UDPClient{}
  21. err := c.Start(uri, pkt)
  22. if err != nil {
  23. return nil, err
  24. }
  25. return c, nil
  26. })
  27. }
  28. }
  29. type UDPClient struct {
  30. url *url.URL
  31. id protocol.DeviceID
  32. stop chan struct{}
  33. wg sync.WaitGroup
  34. listenAddress *net.UDPAddr
  35. globalBroadcastInterval time.Duration
  36. errorRetryInterval time.Duration
  37. status bool
  38. mut sync.RWMutex
  39. }
  40. func (d *UDPClient) Start(uri *url.URL, pkt *Announce) error {
  41. d.url = uri
  42. d.id = protocol.DeviceIDFromBytes(pkt.This.ID)
  43. d.stop = make(chan struct{})
  44. params := uri.Query()
  45. // The address must not have a port, as otherwise both announce and lookup
  46. // sockets would try to bind to the same port.
  47. addr, err := net.ResolveUDPAddr(d.url.Scheme, params.Get("listenaddress")+":0")
  48. if err != nil {
  49. return err
  50. }
  51. d.listenAddress = addr
  52. broadcastSeconds, err := strconv.ParseUint(params.Get("broadcast"), 0, 0)
  53. if err != nil {
  54. d.globalBroadcastInterval = DefaultGlobalBroadcastInterval
  55. } else {
  56. d.globalBroadcastInterval = time.Duration(broadcastSeconds) * time.Second
  57. }
  58. retrySeconds, err := strconv.ParseUint(params.Get("retry"), 0, 0)
  59. if err != nil {
  60. d.errorRetryInterval = DefaultErrorRetryInternval
  61. } else {
  62. d.errorRetryInterval = time.Duration(retrySeconds) * time.Second
  63. }
  64. d.wg.Add(1)
  65. go d.broadcast(pkt.MustMarshalXDR())
  66. return nil
  67. }
  68. func (d *UDPClient) broadcast(pkt []byte) {
  69. defer d.wg.Done()
  70. conn, err := net.ListenUDP(d.url.Scheme, d.listenAddress)
  71. for err != nil {
  72. if debug {
  73. l.Debugf("discover %s: broadcast listen: %v; trying again in %v", d.url, err, d.errorRetryInterval)
  74. }
  75. select {
  76. case <-d.stop:
  77. return
  78. case <-time.After(d.errorRetryInterval):
  79. }
  80. conn, err = net.ListenUDP(d.url.Scheme, d.listenAddress)
  81. }
  82. defer conn.Close()
  83. remote, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
  84. for err != nil {
  85. if debug {
  86. l.Debugf("discover %s: broadcast resolve: %v; trying again in %v", d.url, err, d.errorRetryInterval)
  87. }
  88. select {
  89. case <-d.stop:
  90. return
  91. case <-time.After(d.errorRetryInterval):
  92. }
  93. remote, err = net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
  94. }
  95. timer := time.NewTimer(0)
  96. for {
  97. select {
  98. case <-d.stop:
  99. return
  100. case <-timer.C:
  101. var ok bool
  102. if debug {
  103. l.Debugf("discover %s: broadcast: Sending self announcement to %v", d.url, remote)
  104. }
  105. _, err := conn.WriteTo(pkt, remote)
  106. if err != nil {
  107. if debug {
  108. l.Debugf("discover %s: broadcast: Failed to send self announcement: %s", d.url, err)
  109. }
  110. ok = false
  111. } else {
  112. // Verify that the announce server responds positively for our device ID
  113. time.Sleep(1 * time.Second)
  114. res := d.Lookup(d.id)
  115. if debug {
  116. l.Debugf("discover %s: broadcast: Self-lookup returned: %v", d.url, res)
  117. }
  118. ok = len(res) > 0
  119. }
  120. d.mut.Lock()
  121. d.status = ok
  122. d.mut.Unlock()
  123. if ok {
  124. timer.Reset(d.globalBroadcastInterval)
  125. } else {
  126. timer.Reset(d.errorRetryInterval)
  127. }
  128. }
  129. }
  130. }
  131. func (d *UDPClient) Lookup(device protocol.DeviceID) []string {
  132. extIP, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
  133. if err != nil {
  134. if debug {
  135. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  136. }
  137. return nil
  138. }
  139. conn, err := net.DialUDP(d.url.Scheme, d.listenAddress, extIP)
  140. if err != nil {
  141. if debug {
  142. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  143. }
  144. return nil
  145. }
  146. defer conn.Close()
  147. err = conn.SetDeadline(time.Now().Add(5 * time.Second))
  148. if err != nil {
  149. if debug {
  150. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  151. }
  152. return nil
  153. }
  154. buf := Query{QueryMagic, device[:]}.MustMarshalXDR()
  155. _, err = conn.Write(buf)
  156. if err != nil {
  157. if debug {
  158. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  159. }
  160. return nil
  161. }
  162. buf = make([]byte, 2048)
  163. n, err := conn.Read(buf)
  164. if err != nil {
  165. if err, ok := err.(net.Error); ok && err.Timeout() {
  166. // Expected if the server doesn't know about requested device ID
  167. return nil
  168. }
  169. if debug {
  170. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  171. }
  172. return nil
  173. }
  174. var pkt Announce
  175. err = pkt.UnmarshalXDR(buf[:n])
  176. if err != nil && err != io.EOF {
  177. if debug {
  178. l.Debugf("discover %s: Lookup(%s): %s\n%s", d.url, device, err, hex.Dump(buf[:n]))
  179. }
  180. return nil
  181. }
  182. var addrs []string
  183. for _, a := range pkt.This.Addresses {
  184. deviceAddr := net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port)))
  185. addrs = append(addrs, deviceAddr)
  186. }
  187. if debug {
  188. l.Debugf("discover %s: Lookup(%s) result: %v", d.url, device, addrs)
  189. }
  190. return addrs
  191. }
  192. func (d *UDPClient) Stop() {
  193. if d.stop != nil {
  194. close(d.stop)
  195. d.wg.Wait()
  196. }
  197. }
  198. func (d *UDPClient) StatusOK() bool {
  199. d.mut.RLock()
  200. defer d.mut.RUnlock()
  201. return d.status
  202. }
  203. func (d *UDPClient) Address() string {
  204. return d.url.String()
  205. }