kcp_listen.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. // Copyright (C) 2016 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package connections
  7. import (
  8. "crypto/tls"
  9. "net"
  10. "net/url"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. "github.com/AudriusButkevicius/kcp-go"
  16. "github.com/AudriusButkevicius/pfilter"
  17. "github.com/ccding/go-stun/stun"
  18. "github.com/xtaci/smux"
  19. "github.com/syncthing/syncthing/lib/config"
  20. "github.com/syncthing/syncthing/lib/nat"
  21. )
  22. func init() {
  23. factory := &kcpListenerFactory{}
  24. for _, scheme := range []string{"kcp", "kcp4", "kcp6"} {
  25. listeners[scheme] = factory
  26. }
  27. }
  28. type kcpListener struct {
  29. onAddressesChangedNotifier
  30. uri *url.URL
  31. cfg *config.Wrapper
  32. tlsCfg *tls.Config
  33. stop chan struct{}
  34. conns chan internalConn
  35. factory listenerFactory
  36. nat atomic.Value
  37. address *url.URL
  38. err error
  39. mut sync.RWMutex
  40. }
  41. func (t *kcpListener) Serve() {
  42. t.mut.Lock()
  43. t.err = nil
  44. t.mut.Unlock()
  45. network := strings.Replace(t.uri.Scheme, "kcp", "udp", -1)
  46. packetConn, err := net.ListenPacket(network, t.uri.Host)
  47. if err != nil {
  48. t.mut.Lock()
  49. t.err = err
  50. t.mut.Unlock()
  51. l.Infoln("listen (BEP/kcp):", err)
  52. return
  53. }
  54. filterConn := pfilter.NewPacketFilter(packetConn)
  55. kcpConn := filterConn.NewConn(kcpNoFilterPriority, nil)
  56. stunConn := filterConn.NewConn(kcpStunFilterPriority, &stunFilter{
  57. ids: make(map[string]time.Time),
  58. })
  59. filterConn.Start()
  60. registerFilter(filterConn)
  61. listener, err := kcp.ServeConn(nil, 0, 0, kcpConn)
  62. if err != nil {
  63. t.mut.Lock()
  64. t.err = err
  65. t.mut.Unlock()
  66. l.Infoln("listen (BEP/kcp):", err)
  67. return
  68. }
  69. defer listener.Close()
  70. defer stunConn.Close()
  71. defer kcpConn.Close()
  72. defer deregisterFilter(filterConn)
  73. defer packetConn.Close()
  74. l.Infof("KCP listener (%v) starting", kcpConn.LocalAddr())
  75. defer l.Infof("KCP listener (%v) shutting down", kcpConn.LocalAddr())
  76. go t.stunRenewal(stunConn)
  77. for {
  78. listener.SetDeadline(time.Now().Add(time.Second))
  79. conn, err := listener.AcceptKCP()
  80. select {
  81. case <-t.stop:
  82. if err == nil {
  83. conn.Close()
  84. }
  85. return
  86. default:
  87. }
  88. if err != nil {
  89. if err, ok := err.(net.Error); !ok || !err.Timeout() {
  90. l.Warnln("Accepting connection (BEP/kcp):", err)
  91. }
  92. continue
  93. }
  94. opts := t.cfg.Options()
  95. conn.SetStreamMode(true)
  96. conn.SetACKNoDelay(false)
  97. conn.SetWindowSize(opts.KCPSendWindowSize, opts.KCPReceiveWindowSize)
  98. conn.SetNoDelay(boolInt(opts.KCPNoDelay), opts.KCPUpdateIntervalMs, boolInt(opts.KCPFastResend), boolInt(!opts.KCPCongestionControl))
  99. l.Debugln("connect from", conn.RemoteAddr())
  100. ses, err := smux.Server(conn, smuxConfig)
  101. if err != nil {
  102. l.Debugln("smux server:", err)
  103. conn.Close()
  104. continue
  105. }
  106. ses.SetDeadline(time.Now().Add(10 * time.Second))
  107. stream, err := ses.AcceptStream()
  108. if err != nil {
  109. l.Debugln("smux accept:", err)
  110. ses.Close()
  111. continue
  112. }
  113. ses.SetDeadline(time.Time{})
  114. tc := tls.Server(&sessionClosingStream{stream, ses}, t.tlsCfg)
  115. tc.SetDeadline(time.Now().Add(time.Second * 10))
  116. err = tc.Handshake()
  117. if err != nil {
  118. l.Debugln("TLS handshake (BEP/kcp):", err)
  119. tc.Close()
  120. continue
  121. }
  122. tc.SetDeadline(time.Time{})
  123. t.conns <- internalConn{tc, connTypeKCPServer, kcpPriority}
  124. }
  125. }
  126. func (t *kcpListener) Stop() {
  127. close(t.stop)
  128. }
  129. func (t *kcpListener) URI() *url.URL {
  130. return t.uri
  131. }
  132. func (t *kcpListener) WANAddresses() []*url.URL {
  133. uris := t.LANAddresses()
  134. t.mut.RLock()
  135. if t.address != nil {
  136. uris = append(uris, t.address)
  137. }
  138. t.mut.RUnlock()
  139. return uris
  140. }
  141. func (t *kcpListener) LANAddresses() []*url.URL {
  142. return []*url.URL{t.uri}
  143. }
  144. func (t *kcpListener) Error() error {
  145. t.mut.RLock()
  146. err := t.err
  147. t.mut.RUnlock()
  148. return err
  149. }
  150. func (t *kcpListener) String() string {
  151. return t.uri.String()
  152. }
  153. func (t *kcpListener) Factory() listenerFactory {
  154. return t.factory
  155. }
  156. func (t *kcpListener) NATType() string {
  157. v := t.nat.Load().(stun.NATType)
  158. if v == stun.NATUnknown || v == stun.NATError {
  159. return "unknown"
  160. }
  161. return v.String()
  162. }
  163. func (t *kcpListener) stunRenewal(listener net.PacketConn) {
  164. client := stun.NewClientWithConnection(listener)
  165. client.SetSoftwareName("syncthing")
  166. var natType stun.NATType
  167. var extAddr *stun.Host
  168. var udpAddr *net.UDPAddr
  169. var err error
  170. oldType := stun.NATUnknown
  171. for {
  172. disabled:
  173. if t.cfg.Options().StunKeepaliveS < 1 {
  174. time.Sleep(time.Second)
  175. oldType = stun.NATUnknown
  176. t.nat.Store(stun.NATUnknown)
  177. t.mut.Lock()
  178. t.address = nil
  179. t.mut.Unlock()
  180. continue
  181. }
  182. for _, addr := range t.cfg.StunServers() {
  183. // Resolve the address, so that in case the server advertises two
  184. // IPs, we always hit the same one, as otherwise, the mapping might
  185. // expire as we hit the other address, and cause us to flip flop
  186. // between servers/external addresses, as a result flooding discovery
  187. // servers.
  188. udpAddr, err = net.ResolveUDPAddr("udp", addr)
  189. if err != nil {
  190. l.Debugf("%s stun addr resolution on %s: %s", t.uri, addr, err)
  191. continue
  192. }
  193. client.SetServerAddr(udpAddr.String())
  194. natType, extAddr, err = client.Discover()
  195. if err != nil || extAddr == nil {
  196. l.Debugf("%s stun discovery on %s: %s", t.uri, addr, err)
  197. continue
  198. }
  199. // The stun server is most likely borked, try another one.
  200. if natType == stun.NATError || natType == stun.NATUnknown || natType == stun.NATBlocked {
  201. l.Debugf("%s stun discovery on %s resolved to %s", t.uri, addr, natType)
  202. continue
  203. }
  204. if oldType != natType {
  205. l.Infof("%s detected NAT type: %s", t.uri, natType)
  206. t.nat.Store(natType)
  207. }
  208. for {
  209. changed := false
  210. uri := *t.uri
  211. uri.Host = extAddr.TransportAddr()
  212. t.mut.Lock()
  213. if t.address == nil || t.address.String() != uri.String() {
  214. l.Infof("%s resolved external address %s (via %s)", t.uri, uri.String(), addr)
  215. t.address = &uri
  216. changed = true
  217. }
  218. t.mut.Unlock()
  219. // This will most likely result in a call to WANAddresses() which tries to
  220. // get t.mut, so notify while unlocked.
  221. if changed {
  222. t.notifyAddressesChanged(t)
  223. }
  224. select {
  225. case <-time.After(time.Duration(t.cfg.Options().StunKeepaliveS) * time.Second):
  226. case <-t.stop:
  227. return
  228. }
  229. if t.cfg.Options().StunKeepaliveS < 1 {
  230. goto disabled
  231. }
  232. extAddr, err = client.Keepalive()
  233. if err != nil {
  234. l.Debugf("%s stun keepalive on %s: %s (%v)", t.uri, addr, err, extAddr)
  235. break
  236. }
  237. }
  238. oldType = natType
  239. }
  240. // We failed to contact all provided stun servers, chillout for a while.
  241. time.Sleep(time.Minute)
  242. }
  243. }
  244. type kcpListenerFactory struct{}
  245. func (f *kcpListenerFactory) New(uri *url.URL, cfg *config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service) genericListener {
  246. l := &kcpListener{
  247. uri: fixupPort(uri, config.DefaultKCPPort),
  248. cfg: cfg,
  249. tlsCfg: tlsCfg,
  250. conns: conns,
  251. stop: make(chan struct{}),
  252. factory: f,
  253. }
  254. l.nat.Store(stun.NATUnknown)
  255. return l
  256. }
  257. func (kcpListenerFactory) Enabled(cfg config.Configuration) bool {
  258. return true
  259. }