client_udp.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "encoding/hex"
  9. "io"
  10. "net"
  11. "net/url"
  12. "strconv"
  13. "time"
  14. "github.com/syncthing/protocol"
  15. "github.com/syncthing/syncthing/lib/events"
  16. "github.com/syncthing/syncthing/lib/sync"
  17. )
  18. func init() {
  19. for _, proto := range []string{"udp", "udp4", "udp6"} {
  20. Register(proto, func(uri *url.URL, announcer Announcer) (Client, error) {
  21. c := &UDPClient{
  22. announcer: announcer,
  23. wg: sync.NewWaitGroup(),
  24. mut: sync.NewRWMutex(),
  25. }
  26. err := c.Start(uri)
  27. if err != nil {
  28. return nil, err
  29. }
  30. return c, nil
  31. })
  32. }
  33. }
  34. type UDPClient struct {
  35. url *url.URL
  36. stop chan struct{}
  37. wg sync.WaitGroup
  38. listenAddress *net.UDPAddr
  39. globalBroadcastInterval time.Duration
  40. errorRetryInterval time.Duration
  41. announcer Announcer
  42. status bool
  43. mut sync.RWMutex
  44. }
  45. func (d *UDPClient) Start(uri *url.URL) error {
  46. d.url = uri
  47. d.stop = make(chan struct{})
  48. params := uri.Query()
  49. // The address must not have a port, as otherwise both announce and lookup
  50. // sockets would try to bind to the same port.
  51. addr, err := net.ResolveUDPAddr(d.url.Scheme, params.Get("listenaddress")+":0")
  52. if err != nil {
  53. return err
  54. }
  55. d.listenAddress = addr
  56. broadcastSeconds, err := strconv.ParseUint(params.Get("broadcast"), 0, 0)
  57. if err != nil {
  58. d.globalBroadcastInterval = DefaultGlobalBroadcastInterval
  59. } else {
  60. d.globalBroadcastInterval = time.Duration(broadcastSeconds) * time.Second
  61. }
  62. retrySeconds, err := strconv.ParseUint(params.Get("retry"), 0, 0)
  63. if err != nil {
  64. d.errorRetryInterval = DefaultErrorRetryInternval
  65. } else {
  66. d.errorRetryInterval = time.Duration(retrySeconds) * time.Second
  67. }
  68. d.wg.Add(1)
  69. go d.broadcast()
  70. return nil
  71. }
  72. func (d *UDPClient) broadcast() {
  73. defer d.wg.Done()
  74. conn, err := net.ListenUDP(d.url.Scheme, d.listenAddress)
  75. for err != nil {
  76. if debug {
  77. l.Debugf("discover %s: broadcast listen: %v; trying again in %v", d.url, err, d.errorRetryInterval)
  78. }
  79. select {
  80. case <-d.stop:
  81. return
  82. case <-time.After(d.errorRetryInterval):
  83. }
  84. conn, err = net.ListenUDP(d.url.Scheme, d.listenAddress)
  85. }
  86. defer conn.Close()
  87. remote, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
  88. for err != nil {
  89. if debug {
  90. l.Debugf("discover %s: broadcast resolve: %v; trying again in %v", d.url, err, d.errorRetryInterval)
  91. }
  92. select {
  93. case <-d.stop:
  94. return
  95. case <-time.After(d.errorRetryInterval):
  96. }
  97. remote, err = net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
  98. }
  99. timer := time.NewTimer(0)
  100. eventSub := events.Default.Subscribe(events.ExternalPortMappingChanged)
  101. defer events.Default.Unsubscribe(eventSub)
  102. for {
  103. select {
  104. case <-d.stop:
  105. return
  106. case <-eventSub.C():
  107. ok := d.sendAnnouncement(remote, conn)
  108. d.mut.Lock()
  109. d.status = ok
  110. d.mut.Unlock()
  111. case <-timer.C:
  112. ok := d.sendAnnouncement(remote, conn)
  113. d.mut.Lock()
  114. d.status = ok
  115. d.mut.Unlock()
  116. if ok {
  117. timer.Reset(d.globalBroadcastInterval)
  118. } else {
  119. timer.Reset(d.errorRetryInterval)
  120. }
  121. }
  122. }
  123. }
  124. func (d *UDPClient) sendAnnouncement(remote net.Addr, conn *net.UDPConn) bool {
  125. if debug {
  126. l.Debugf("discover %s: broadcast: Sending self announcement to %v", d.url, remote)
  127. }
  128. ann := d.announcer.Announcement()
  129. pkt, err := ann.MarshalXDR()
  130. if err != nil {
  131. return false
  132. }
  133. myID := protocol.DeviceIDFromBytes(ann.This.ID)
  134. _, err = conn.WriteTo(pkt, remote)
  135. if err != nil {
  136. if debug {
  137. l.Debugf("discover %s: broadcast: Failed to send self announcement: %s", d.url, err)
  138. }
  139. return false
  140. }
  141. // Verify that the announce server responds positively for our device ID
  142. time.Sleep(1 * time.Second)
  143. ann, err = d.Lookup(myID)
  144. if err != nil && debug {
  145. l.Debugf("discover %s: broadcast: Self-lookup failed: %v", d.url, err)
  146. } else if debug {
  147. l.Debugf("discover %s: broadcast: Self-lookup returned: %v", d.url, ann.This.Addresses)
  148. }
  149. return len(ann.This.Addresses) > 0
  150. }
  151. func (d *UDPClient) Lookup(device protocol.DeviceID) (Announce, error) {
  152. extIP, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
  153. if err != nil {
  154. if debug {
  155. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  156. }
  157. return Announce{}, err
  158. }
  159. conn, err := net.DialUDP(d.url.Scheme, d.listenAddress, extIP)
  160. if err != nil {
  161. if debug {
  162. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  163. }
  164. return Announce{}, err
  165. }
  166. defer conn.Close()
  167. err = conn.SetDeadline(time.Now().Add(5 * time.Second))
  168. if err != nil {
  169. if debug {
  170. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  171. }
  172. return Announce{}, err
  173. }
  174. buf := Query{QueryMagic, device[:]}.MustMarshalXDR()
  175. _, err = conn.Write(buf)
  176. if err != nil {
  177. if debug {
  178. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  179. }
  180. return Announce{}, err
  181. }
  182. buf = make([]byte, 2048)
  183. n, err := conn.Read(buf)
  184. if err != nil {
  185. if err, ok := err.(net.Error); ok && err.Timeout() {
  186. // Expected if the server doesn't know about requested device ID
  187. return Announce{}, err
  188. }
  189. if debug {
  190. l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
  191. }
  192. return Announce{}, err
  193. }
  194. var pkt Announce
  195. err = pkt.UnmarshalXDR(buf[:n])
  196. if err != nil && err != io.EOF {
  197. if debug {
  198. l.Debugf("discover %s: Lookup(%s): %s\n%s", d.url, device, err, hex.Dump(buf[:n]))
  199. }
  200. return Announce{}, err
  201. }
  202. if debug {
  203. l.Debugf("discover %s: Lookup(%s) result: %v relays: %v", d.url, device, pkt.This.Addresses, pkt.This.Relays)
  204. }
  205. return pkt, nil
  206. }
  207. func (d *UDPClient) Stop() {
  208. if d.stop != nil {
  209. close(d.stop)
  210. d.wg.Wait()
  211. }
  212. }
  213. func (d *UDPClient) StatusOK() bool {
  214. d.mut.RLock()
  215. defer d.mut.RUnlock()
  216. return d.status
  217. }
  218. func (d *UDPClient) Address() string {
  219. return d.url.String()
  220. }