// Copyright (C) 2019 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.

//go:build go1.15 && !noquic
// +build go1.15,!noquic

- package connections
import (
	"context"
	"crypto/tls"
	"errors"
	"net"
	"net/url"
	"sync"
	"sync/atomic"
	"time"

	"github.com/quic-go/quic-go"

	"github.com/syncthing/syncthing/lib/config"
	"github.com/syncthing/syncthing/lib/connections/registry"
	"github.com/syncthing/syncthing/lib/nat"
	"github.com/syncthing/syncthing/lib/stun"
	"github.com/syncthing/syncthing/lib/svcutil"
)
- func init() {
- factory := &quicListenerFactory{}
- for _, scheme := range []string{"quic", "quic4", "quic6"} {
- listeners[scheme] = factory
- }
- }
- type quicListener struct {
- svcutil.ServiceWithError
- nat atomic.Uint64 // Holds a stun.NATType.
- onAddressesChangedNotifier
- uri *url.URL
- cfg config.Wrapper
- tlsCfg *tls.Config
- conns chan internalConn
- factory listenerFactory
- registry *registry.Registry
- lanChecker *lanChecker
- address *url.URL
- laddr net.Addr
- mut sync.Mutex
- }
- func (t *quicListener) OnNATTypeChanged(natType stun.NATType) {
- if natType != stun.NATUnknown {
- l.Infof("%s detected NAT type: %s", t.uri, natType)
- }
- t.nat.Store(uint64(natType))
- }
- func (t *quicListener) OnExternalAddressChanged(address *stun.Host, via string) {
- var uri *url.URL
- if address != nil {
- copy := *t.uri
- uri = ©
- uri.Host = address.TransportAddr()
- }
- t.mut.Lock()
- existingAddress := t.address
- t.address = uri
- t.mut.Unlock()
- if uri != nil && (existingAddress == nil || existingAddress.String() != uri.String()) {
- l.Infof("%s resolved external address %s (via %s)", t.uri, uri.String(), via)
- t.notifyAddressesChanged(t)
- } else if uri == nil && existingAddress != nil {
- t.notifyAddressesChanged(t)
- }
- }
- func (t *quicListener) serve(ctx context.Context) error {
- network := quicNetwork(t.uri)
- udpAddr, err := net.ResolveUDPAddr(network, t.uri.Host)
- if err != nil {
- l.Infoln("Listen (BEP/quic):", err)
- return err
- }
- udpConn, err := net.ListenUDP(network, udpAddr)
- if err != nil {
- l.Infoln("Listen (BEP/quic):", err)
- return err
- }
- defer udpConn.Close()
- tracer := &writeTrackingTracer{}
- quicTransport := &quic.Transport{
- Conn: udpConn,
- Tracer: tracer,
- }
- defer quicTransport.Close()
- svc := stun.New(t.cfg, t, &transportPacketConn{tran: quicTransport}, tracer)
- go svc.Serve(ctx)
- t.registry.Register(t.uri.Scheme, quicTransport)
- defer t.registry.Unregister(t.uri.Scheme, quicTransport)
- listener, err := quicTransport.Listen(t.tlsCfg, quicConfig)
- if err != nil {
- l.Infoln("Listen (BEP/quic):", err)
- return err
- }
- defer listener.Close()
- t.notifyAddressesChanged(t)
- defer t.clearAddresses(t)
- l.Infof("QUIC listener (%v) starting", udpConn.LocalAddr())
- defer l.Infof("QUIC listener (%v) shutting down", udpConn.LocalAddr())
- t.mut.Lock()
- t.laddr = udpConn.LocalAddr()
- t.mut.Unlock()
- defer func() {
- t.mut.Lock()
- t.laddr = nil
- t.mut.Unlock()
- }()
- acceptFailures := 0
- const maxAcceptFailures = 10
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
- session, err := listener.Accept(ctx)
- if err == context.Canceled {
- return nil
- } else if err != nil {
- l.Infoln("Listen (BEP/quic): Accepting connection:", err)
- acceptFailures++
- if acceptFailures > maxAcceptFailures {
- // Return to restart the listener, because something
- // seems permanently damaged.
- return err
- }
- // Slightly increased delay for each failure.
- time.Sleep(time.Duration(acceptFailures) * time.Second)
- continue
- }
- acceptFailures = 0
- l.Debugln("connect from", session.RemoteAddr())
- streamCtx, cancel := context.WithTimeout(ctx, quicOperationTimeout)
- stream, err := session.AcceptStream(streamCtx)
- cancel()
- if err != nil {
- l.Debugf("failed to accept stream from %s: %v", session.RemoteAddr(), err)
- _ = session.CloseWithError(1, err.Error())
- continue
- }
- priority := t.cfg.Options().ConnectionPriorityQUICWAN
- isLocal := t.lanChecker.isLAN(session.RemoteAddr())
- if isLocal {
- priority = t.cfg.Options().ConnectionPriorityQUICLAN
- }
- t.conns <- newInternalConn(&quicTlsConn{session, stream, nil}, connTypeQUICServer, isLocal, priority)
- }
- }
- func (t *quicListener) URI() *url.URL {
- return t.uri
- }
- func (t *quicListener) WANAddresses() []*url.URL {
- t.mut.Lock()
- uris := []*url.URL{maybeReplacePort(t.uri, t.laddr)}
- if t.address != nil {
- uris = append(uris, t.address)
- }
- t.mut.Unlock()
- return uris
- }
- func (t *quicListener) LANAddresses() []*url.URL {
- t.mut.Lock()
- uri := maybeReplacePort(t.uri, t.laddr)
- t.mut.Unlock()
- addrs := []*url.URL{uri}
- network := quicNetwork(uri)
- addrs = append(addrs, getURLsForAllAdaptersIfUnspecified(network, uri)...)
- return addrs
- }
- func (t *quicListener) String() string {
- return t.uri.String()
- }
- func (t *quicListener) Factory() listenerFactory {
- return t.factory
- }
- func (t *quicListener) NATType() string {
- v := stun.NATType(t.nat.Load())
- if v == stun.NATUnknown || v == stun.NATError {
- return "unknown"
- }
- return v.String()
- }
// quicListenerFactory creates quicListener values. It carries no state.
type quicListenerFactory struct{}
- func (*quicListenerFactory) Valid(config.Configuration) error {
- return nil
- }
- func (f *quicListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, _ *nat.Service, registry *registry.Registry, lanChecker *lanChecker) genericListener {
- l := &quicListener{
- uri: fixupPort(uri, config.DefaultQUICPort),
- cfg: cfg,
- tlsCfg: tlsCfg,
- conns: conns,
- factory: f,
- registry: registry,
- lanChecker: lanChecker,
- }
- l.ServiceWithError = svcutil.AsService(l.serve, l.String())
- l.nat.Store(uint64(stun.NATUnknown))
- return l
- }
- func (quicListenerFactory) Enabled(_ config.Configuration) bool {
- return true
- }
|