tcp_listen.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. // Copyright (C) 2016 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package connections
  7. import (
  8. "context"
  9. "crypto/tls"
  10. "net"
  11. "net/url"
  12. "sync"
  13. "time"
  14. "github.com/syncthing/syncthing/lib/config"
  15. "github.com/syncthing/syncthing/lib/connections/registry"
  16. "github.com/syncthing/syncthing/lib/dialer"
  17. "github.com/syncthing/syncthing/lib/nat"
  18. "github.com/syncthing/syncthing/lib/svcutil"
  19. )
  20. func init() {
  21. factory := &tcpListenerFactory{}
  22. for _, scheme := range []string{"tcp", "tcp4", "tcp6"} {
  23. listeners[scheme] = factory
  24. }
  25. }
  26. type tcpListener struct {
  27. svcutil.ServiceWithError
  28. onAddressesChangedNotifier
  29. uri *url.URL
  30. cfg config.Wrapper
  31. tlsCfg *tls.Config
  32. conns chan internalConn
  33. factory listenerFactory
  34. registry *registry.Registry
  35. lanChecker *lanChecker
  36. natService *nat.Service
  37. mapping *nat.Mapping
  38. laddr net.Addr
  39. mut sync.RWMutex
  40. }
  41. func (t *tcpListener) serve(ctx context.Context) error {
  42. tcaddr, err := net.ResolveTCPAddr(t.uri.Scheme, t.uri.Host)
  43. if err != nil {
  44. l.Infoln("Listen (BEP/tcp):", err)
  45. return err
  46. }
  47. lc := net.ListenConfig{
  48. Control: dialer.ReusePortControl,
  49. }
  50. listener, err := lc.Listen(context.TODO(), t.uri.Scheme, tcaddr.String())
  51. if err != nil {
  52. l.Infoln("Listen (BEP/tcp):", err)
  53. return err
  54. }
  55. defer listener.Close()
  56. // We might bind to :0, so use the port we've been given.
  57. tcaddr = listener.Addr().(*net.TCPAddr)
  58. t.notifyAddressesChanged(t)
  59. defer t.clearAddresses(t)
  60. t.registry.Register(t.uri.Scheme, tcaddr)
  61. defer t.registry.Unregister(t.uri.Scheme, tcaddr)
  62. l.Infof("TCP listener (%v) starting", tcaddr)
  63. defer l.Infof("TCP listener (%v) shutting down", tcaddr)
  64. var ipVersion nat.IPVersion
  65. if t.uri.Scheme == "tcp4" {
  66. ipVersion = nat.IPv4Only
  67. } else if t.uri.Scheme == "tcp6" {
  68. ipVersion = nat.IPv6Only
  69. } else {
  70. ipVersion = nat.IPvAny
  71. }
  72. mapping := t.natService.NewMapping(nat.TCP, ipVersion, tcaddr.IP, tcaddr.Port)
  73. mapping.OnChanged(func() {
  74. t.notifyAddressesChanged(t)
  75. })
  76. // Should be called after t.mapping is nil'ed out.
  77. defer t.natService.RemoveMapping(mapping)
  78. t.mut.Lock()
  79. t.mapping = mapping
  80. t.laddr = tcaddr
  81. t.mut.Unlock()
  82. defer func() {
  83. t.mut.Lock()
  84. t.mapping = nil
  85. t.laddr = nil
  86. t.mut.Unlock()
  87. }()
  88. acceptFailures := 0
  89. const maxAcceptFailures = 10
  90. // :(, but what can you do.
  91. tcpListener := listener.(*net.TCPListener)
  92. for {
  93. _ = tcpListener.SetDeadline(time.Now().Add(time.Second))
  94. conn, err := tcpListener.Accept()
  95. select {
  96. case <-ctx.Done():
  97. if err == nil {
  98. conn.Close()
  99. }
  100. return nil
  101. default:
  102. }
  103. if err != nil {
  104. if err, ok := err.(*net.OpError); !ok || !err.Timeout() {
  105. l.Warnln("Listen (BEP/tcp): Accepting connection:", err)
  106. acceptFailures++
  107. if acceptFailures > maxAcceptFailures {
  108. // Return to restart the listener, because something
  109. // seems permanently damaged.
  110. return err
  111. }
  112. // Slightly increased delay for each failure.
  113. time.Sleep(time.Duration(acceptFailures) * time.Second)
  114. }
  115. continue
  116. }
  117. acceptFailures = 0
  118. l.Debugln("Listen (BEP/tcp): connect from", conn.RemoteAddr())
  119. if err := dialer.SetTCPOptions(conn); err != nil {
  120. l.Debugln("Listen (BEP/tcp): setting tcp options:", err)
  121. }
  122. if tc := t.cfg.Options().TrafficClass; tc != 0 {
  123. if err := dialer.SetTrafficClass(conn, tc); err != nil {
  124. l.Debugln("Listen (BEP/tcp): setting traffic class:", err)
  125. }
  126. }
  127. tc := tls.Server(conn, t.tlsCfg)
  128. if err := tlsTimedHandshake(tc); err != nil {
  129. l.Infoln("Listen (BEP/tcp): TLS handshake:", err)
  130. tc.Close()
  131. continue
  132. }
  133. priority := t.cfg.Options().ConnectionPriorityTCPWAN
  134. isLocal := t.lanChecker.isLAN(conn.RemoteAddr())
  135. if isLocal {
  136. priority = t.cfg.Options().ConnectionPriorityTCPLAN
  137. }
  138. t.conns <- newInternalConn(tc, connTypeTCPServer, isLocal, priority)
  139. }
  140. }
  141. func (t *tcpListener) URI() *url.URL {
  142. return t.uri
  143. }
  144. func (t *tcpListener) WANAddresses() []*url.URL {
  145. t.mut.RLock()
  146. uris := []*url.URL{
  147. maybeReplacePort(t.uri, t.laddr),
  148. }
  149. if t.mapping != nil {
  150. addrs := t.mapping.ExternalAddresses()
  151. for _, addr := range addrs {
  152. uri := *t.uri
  153. // Does net.JoinHostPort internally
  154. uri.Host = addr.String()
  155. uris = append(uris, &uri)
  156. // For every address with a specified IP, add one without an IP,
  157. // just in case the specified IP is still internal (router behind DMZ).
  158. if len(addr.IP) != 0 && !addr.IP.IsUnspecified() {
  159. zeroUri := *t.uri
  160. addr.IP = nil
  161. zeroUri.Host = addr.String()
  162. uris = append(uris, &zeroUri)
  163. }
  164. }
  165. }
  166. t.mut.RUnlock()
  167. // If we support ReusePort, add an unspecified zero port address, which will be resolved by the discovery server
  168. // in hopes that TCP punch through works.
  169. if dialer.SupportsReusePort {
  170. uri := *t.uri
  171. uri.Host = "0.0.0.0:0"
  172. uris = append([]*url.URL{&uri}, uris...)
  173. }
  174. return uris
  175. }
  176. func (t *tcpListener) LANAddresses() []*url.URL {
  177. t.mut.RLock()
  178. uri := maybeReplacePort(t.uri, t.laddr)
  179. t.mut.RUnlock()
  180. addrs := []*url.URL{uri}
  181. addrs = append(addrs, getURLsForAllAdaptersIfUnspecified(uri.Scheme, uri)...)
  182. return addrs
  183. }
  184. func (t *tcpListener) String() string {
  185. return t.uri.String()
  186. }
  187. func (t *tcpListener) Factory() listenerFactory {
  188. return t.factory
  189. }
  190. func (*tcpListener) NATType() string {
  191. return "unknown"
  192. }
  193. type tcpListenerFactory struct{}
  194. func (f *tcpListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service, registry *registry.Registry, lanChecker *lanChecker) genericListener {
  195. l := &tcpListener{
  196. uri: fixupPort(uri, config.DefaultTCPPort),
  197. cfg: cfg,
  198. tlsCfg: tlsCfg,
  199. conns: conns,
  200. natService: natService,
  201. factory: f,
  202. registry: registry,
  203. lanChecker: lanChecker,
  204. }
  205. l.ServiceWithError = svcutil.AsService(l.serve, l.String())
  206. return l
  207. }
  208. func (tcpListenerFactory) Valid(_ config.Configuration) error {
  209. // Always valid
  210. return nil
  211. }