kcp_listen.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. // Copyright (C) 2016 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package connections
  7. import (
  8. "crypto/tls"
  9. "net"
  10. "net/url"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. "github.com/AudriusButkevicius/pfilter"
  16. "github.com/ccding/go-stun/stun"
  17. "github.com/xtaci/kcp-go"
  18. "github.com/xtaci/smux"
  19. "github.com/syncthing/syncthing/lib/config"
  20. "github.com/syncthing/syncthing/lib/nat"
  21. )
  22. const stunRetryInterval = 5 * time.Minute
  23. func init() {
  24. factory := &kcpListenerFactory{}
  25. for _, scheme := range []string{"kcp", "kcp4", "kcp6"} {
  26. listeners[scheme] = factory
  27. }
  28. }
  29. type kcpListener struct {
  30. onAddressesChangedNotifier
  31. uri *url.URL
  32. cfg *config.Wrapper
  33. tlsCfg *tls.Config
  34. stop chan struct{}
  35. conns chan internalConn
  36. factory listenerFactory
  37. nat atomic.Value
  38. address *url.URL
  39. err error
  40. mut sync.RWMutex
  41. }
  42. func (t *kcpListener) Serve() {
  43. t.mut.Lock()
  44. t.err = nil
  45. t.mut.Unlock()
  46. network := strings.Replace(t.uri.Scheme, "kcp", "udp", -1)
  47. packetConn, err := net.ListenPacket(network, t.uri.Host)
  48. if err != nil {
  49. t.mut.Lock()
  50. t.err = err
  51. t.mut.Unlock()
  52. l.Infoln("Listen (BEP/kcp):", err)
  53. return
  54. }
  55. filterConn := pfilter.NewPacketFilter(packetConn)
  56. kcpConn := filterConn.NewConn(kcpNoFilterPriority, nil)
  57. stunConn := filterConn.NewConn(kcpStunFilterPriority, &stunFilter{
  58. ids: make(map[string]time.Time),
  59. })
  60. filterConn.Start()
  61. registerFilter(filterConn)
  62. listener, err := kcp.ServeConn(nil, 0, 0, kcpConn)
  63. if err != nil {
  64. t.mut.Lock()
  65. t.err = err
  66. t.mut.Unlock()
  67. l.Infoln("Listen (BEP/kcp):", err)
  68. return
  69. }
  70. defer listener.Close()
  71. defer stunConn.Close()
  72. defer kcpConn.Close()
  73. defer deregisterFilter(filterConn)
  74. defer packetConn.Close()
  75. l.Infof("KCP listener (%v) starting", kcpConn.LocalAddr())
  76. defer l.Infof("KCP listener (%v) shutting down", kcpConn.LocalAddr())
  77. go t.stunRenewal(stunConn)
  78. for {
  79. listener.SetDeadline(time.Now().Add(time.Second))
  80. conn, err := listener.AcceptKCP()
  81. select {
  82. case <-t.stop:
  83. if err == nil {
  84. conn.Close()
  85. }
  86. return
  87. default:
  88. }
  89. if err != nil {
  90. if err, ok := err.(net.Error); !ok || !err.Timeout() {
  91. l.Warnln("Listen (BEP/kcp): Accepting connection:", err)
  92. }
  93. continue
  94. }
  95. opts := t.cfg.Options()
  96. conn.SetStreamMode(true)
  97. conn.SetACKNoDelay(false)
  98. conn.SetWindowSize(opts.KCPSendWindowSize, opts.KCPReceiveWindowSize)
  99. conn.SetNoDelay(boolInt(opts.KCPNoDelay), opts.KCPUpdateIntervalMs, boolInt(opts.KCPFastResend), boolInt(!opts.KCPCongestionControl))
  100. l.Debugln("connect from", conn.RemoteAddr())
  101. ses, err := smux.Server(conn, smuxConfig)
  102. if err != nil {
  103. l.Debugln("smux server:", err)
  104. conn.Close()
  105. continue
  106. }
  107. ses.SetDeadline(time.Now().Add(10 * time.Second))
  108. stream, err := ses.AcceptStream()
  109. if err != nil {
  110. l.Debugln("smux accept:", err)
  111. ses.Close()
  112. continue
  113. }
  114. ses.SetDeadline(time.Time{})
  115. tc := tls.Server(&sessionClosingStream{stream, ses}, t.tlsCfg)
  116. tc.SetDeadline(time.Now().Add(time.Second * 10))
  117. err = tc.Handshake()
  118. if err != nil {
  119. l.Debugln("TLS handshake (BEP/kcp):", err)
  120. tc.Close()
  121. continue
  122. }
  123. tc.SetDeadline(time.Time{})
  124. t.conns <- internalConn{tc, connTypeKCPServer, kcpPriority}
  125. }
  126. }
  127. func (t *kcpListener) Stop() {
  128. close(t.stop)
  129. }
  130. func (t *kcpListener) URI() *url.URL {
  131. return t.uri
  132. }
  133. func (t *kcpListener) WANAddresses() []*url.URL {
  134. uris := t.LANAddresses()
  135. t.mut.RLock()
  136. if t.address != nil {
  137. uris = append(uris, t.address)
  138. }
  139. t.mut.RUnlock()
  140. return uris
  141. }
  142. func (t *kcpListener) LANAddresses() []*url.URL {
  143. return []*url.URL{t.uri}
  144. }
  145. func (t *kcpListener) Error() error {
  146. t.mut.RLock()
  147. err := t.err
  148. t.mut.RUnlock()
  149. return err
  150. }
  151. func (t *kcpListener) String() string {
  152. return t.uri.String()
  153. }
  154. func (t *kcpListener) Factory() listenerFactory {
  155. return t.factory
  156. }
  157. func (t *kcpListener) NATType() string {
  158. v := t.nat.Load().(stun.NATType)
  159. if v == stun.NATUnknown || v == stun.NATError {
  160. return "unknown"
  161. }
  162. return v.String()
  163. }
  164. func (t *kcpListener) stunRenewal(listener net.PacketConn) {
  165. client := stun.NewClientWithConnection(listener)
  166. client.SetSoftwareName("syncthing")
  167. var natType stun.NATType
  168. var extAddr *stun.Host
  169. var udpAddr *net.UDPAddr
  170. var err error
  171. oldType := stun.NATUnknown
  172. for {
  173. disabled:
  174. if t.cfg.Options().StunKeepaliveS < 1 {
  175. time.Sleep(time.Second)
  176. oldType = stun.NATUnknown
  177. t.nat.Store(stun.NATUnknown)
  178. t.mut.Lock()
  179. t.address = nil
  180. t.mut.Unlock()
  181. continue
  182. }
  183. for _, addr := range t.cfg.StunServers() {
  184. // Resolve the address, so that in case the server advertises two
  185. // IPs, we always hit the same one, as otherwise, the mapping might
  186. // expire as we hit the other address, and cause us to flip flop
  187. // between servers/external addresses, as a result flooding discovery
  188. // servers.
  189. udpAddr, err = net.ResolveUDPAddr("udp", addr)
  190. if err != nil {
  191. l.Debugf("%s stun addr resolution on %s: %s", t.uri, addr, err)
  192. continue
  193. }
  194. client.SetServerAddr(udpAddr.String())
  195. natType, extAddr, err = client.Discover()
  196. if err != nil || extAddr == nil {
  197. l.Debugf("%s stun discovery on %s: %s", t.uri, addr, err)
  198. continue
  199. }
  200. // The stun server is most likely borked, try another one.
  201. if natType == stun.NATError || natType == stun.NATUnknown || natType == stun.NATBlocked {
  202. l.Debugf("%s stun discovery on %s resolved to %s", t.uri, addr, natType)
  203. continue
  204. }
  205. if oldType != natType {
  206. l.Infof("%s detected NAT type: %s", t.uri, natType)
  207. t.nat.Store(natType)
  208. oldType = natType
  209. }
  210. // We can't punch through this one, so no point doing keepalives
  211. // and such, just try again in a minute and hope that the NAT type changes.
  212. if !isPunchable(natType) {
  213. break
  214. }
  215. for {
  216. changed := false
  217. uri := *t.uri
  218. uri.Host = extAddr.TransportAddr()
  219. t.mut.Lock()
  220. if t.address == nil || t.address.String() != uri.String() {
  221. l.Infof("%s resolved external address %s (via %s)", t.uri, uri.String(), addr)
  222. t.address = &uri
  223. changed = true
  224. }
  225. t.mut.Unlock()
  226. // This will most likely result in a call to WANAddresses() which tries to
  227. // get t.mut, so notify while unlocked.
  228. if changed {
  229. t.notifyAddressesChanged(t)
  230. }
  231. select {
  232. case <-time.After(time.Duration(t.cfg.Options().StunKeepaliveS) * time.Second):
  233. case <-t.stop:
  234. return
  235. }
  236. if t.cfg.Options().StunKeepaliveS < 1 {
  237. goto disabled
  238. }
  239. extAddr, err = client.Keepalive()
  240. if err != nil {
  241. l.Debugf("%s stun keepalive on %s: %s (%v)", t.uri, addr, err, extAddr)
  242. break
  243. }
  244. }
  245. }
  246. // We failed to contact all provided stun servers or the nat is not punchable.
  247. // Chillout for a while.
  248. time.Sleep(stunRetryInterval)
  249. }
  250. }
  251. type kcpListenerFactory struct{}
  252. func (f *kcpListenerFactory) New(uri *url.URL, cfg *config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service) genericListener {
  253. l := &kcpListener{
  254. uri: fixupPort(uri, config.DefaultKCPPort),
  255. cfg: cfg,
  256. tlsCfg: tlsCfg,
  257. conns: conns,
  258. stop: make(chan struct{}),
  259. factory: f,
  260. }
  261. l.nat.Store(stun.NATUnknown)
  262. return l
  263. }
  264. func (kcpListenerFactory) Enabled(cfg config.Configuration) bool {
  265. return true
  266. }
  267. func isPunchable(natType stun.NATType) bool {
  268. return natType == stun.NATNone || natType == stun.NATPortRestricted || natType == stun.NATRestricted || natType == stun.NATFull
  269. }