| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326 | 
							- // Copyright (C) 2016 The Syncthing Authors.
 
- //
 
- // This Source Code Form is subject to the terms of the Mozilla Public
 
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
 
- // You can obtain one at http://mozilla.org/MPL/2.0/.
 
- package connections
 
- import (
 
- 	"crypto/tls"
 
- 	"net"
 
- 	"net/url"
 
- 	"strings"
 
- 	"sync"
 
- 	"sync/atomic"
 
- 	"time"
 
- 	"github.com/AudriusButkevicius/pfilter"
 
- 	"github.com/ccding/go-stun/stun"
 
- 	"github.com/xtaci/kcp-go"
 
- 	"github.com/xtaci/smux"
 
- 	"github.com/syncthing/syncthing/lib/config"
 
- 	"github.com/syncthing/syncthing/lib/nat"
 
- )
 
- const stunRetryInterval = 5 * time.Minute
 
- func init() {
 
- 	factory := &kcpListenerFactory{}
 
- 	for _, scheme := range []string{"kcp", "kcp4", "kcp6"} {
 
- 		listeners[scheme] = factory
 
- 	}
 
- }
 
- type kcpListener struct {
 
- 	onAddressesChangedNotifier
 
- 	uri     *url.URL
 
- 	cfg     *config.Wrapper
 
- 	tlsCfg  *tls.Config
 
- 	stop    chan struct{}
 
- 	conns   chan internalConn
 
- 	factory listenerFactory
 
- 	nat     atomic.Value
 
- 	address *url.URL
 
- 	err     error
 
- 	mut     sync.RWMutex
 
- }
 
- func (t *kcpListener) Serve() {
 
- 	t.mut.Lock()
 
- 	t.err = nil
 
- 	t.mut.Unlock()
 
- 	network := strings.Replace(t.uri.Scheme, "kcp", "udp", -1)
 
- 	packetConn, err := net.ListenPacket(network, t.uri.Host)
 
- 	if err != nil {
 
- 		t.mut.Lock()
 
- 		t.err = err
 
- 		t.mut.Unlock()
 
- 		l.Infoln("Listen (BEP/kcp):", err)
 
- 		return
 
- 	}
 
- 	filterConn := pfilter.NewPacketFilter(packetConn)
 
- 	kcpConn := filterConn.NewConn(kcpNoFilterPriority, nil)
 
- 	stunConn := filterConn.NewConn(kcpStunFilterPriority, &stunFilter{
 
- 		ids: make(map[string]time.Time),
 
- 	})
 
- 	filterConn.Start()
 
- 	registerFilter(filterConn)
 
- 	listener, err := kcp.ServeConn(nil, 0, 0, kcpConn)
 
- 	if err != nil {
 
- 		t.mut.Lock()
 
- 		t.err = err
 
- 		t.mut.Unlock()
 
- 		l.Infoln("Listen (BEP/kcp):", err)
 
- 		return
 
- 	}
 
- 	defer listener.Close()
 
- 	defer stunConn.Close()
 
- 	defer kcpConn.Close()
 
- 	defer deregisterFilter(filterConn)
 
- 	defer packetConn.Close()
 
- 	l.Infof("KCP listener (%v) starting", kcpConn.LocalAddr())
 
- 	defer l.Infof("KCP listener (%v) shutting down", kcpConn.LocalAddr())
 
- 	go t.stunRenewal(stunConn)
 
- 	for {
 
- 		listener.SetDeadline(time.Now().Add(time.Second))
 
- 		conn, err := listener.AcceptKCP()
 
- 		select {
 
- 		case <-t.stop:
 
- 			if err == nil {
 
- 				conn.Close()
 
- 			}
 
- 			return
 
- 		default:
 
- 		}
 
- 		if err != nil {
 
- 			if err, ok := err.(net.Error); !ok || !err.Timeout() {
 
- 				l.Warnln("Listen (BEP/kcp): Accepting connection:", err)
 
- 			}
 
- 			continue
 
- 		}
 
- 		opts := t.cfg.Options()
 
- 		conn.SetStreamMode(true)
 
- 		conn.SetACKNoDelay(false)
 
- 		conn.SetWindowSize(opts.KCPSendWindowSize, opts.KCPReceiveWindowSize)
 
- 		conn.SetNoDelay(boolInt(opts.KCPNoDelay), opts.KCPUpdateIntervalMs, boolInt(opts.KCPFastResend), boolInt(!opts.KCPCongestionControl))
 
- 		l.Debugln("connect from", conn.RemoteAddr())
 
- 		ses, err := smux.Server(conn, smuxConfig)
 
- 		if err != nil {
 
- 			l.Debugln("smux server:", err)
 
- 			conn.Close()
 
- 			continue
 
- 		}
 
- 		ses.SetDeadline(time.Now().Add(10 * time.Second))
 
- 		stream, err := ses.AcceptStream()
 
- 		if err != nil {
 
- 			l.Debugln("smux accept:", err)
 
- 			ses.Close()
 
- 			continue
 
- 		}
 
- 		ses.SetDeadline(time.Time{})
 
- 		tc := tls.Server(&sessionClosingStream{stream, ses}, t.tlsCfg)
 
- 		tc.SetDeadline(time.Now().Add(time.Second * 10))
 
- 		err = tc.Handshake()
 
- 		if err != nil {
 
- 			l.Debugln("TLS handshake (BEP/kcp):", err)
 
- 			tc.Close()
 
- 			continue
 
- 		}
 
- 		tc.SetDeadline(time.Time{})
 
- 		t.conns <- internalConn{tc, connTypeKCPServer, kcpPriority}
 
- 	}
 
- }
 
- func (t *kcpListener) Stop() {
 
- 	close(t.stop)
 
- }
 
- func (t *kcpListener) URI() *url.URL {
 
- 	return t.uri
 
- }
 
- func (t *kcpListener) WANAddresses() []*url.URL {
 
- 	uris := t.LANAddresses()
 
- 	t.mut.RLock()
 
- 	if t.address != nil {
 
- 		uris = append(uris, t.address)
 
- 	}
 
- 	t.mut.RUnlock()
 
- 	return uris
 
- }
 
- func (t *kcpListener) LANAddresses() []*url.URL {
 
- 	return []*url.URL{t.uri}
 
- }
 
- func (t *kcpListener) Error() error {
 
- 	t.mut.RLock()
 
- 	err := t.err
 
- 	t.mut.RUnlock()
 
- 	return err
 
- }
 
- func (t *kcpListener) String() string {
 
- 	return t.uri.String()
 
- }
 
- func (t *kcpListener) Factory() listenerFactory {
 
- 	return t.factory
 
- }
 
- func (t *kcpListener) NATType() string {
 
- 	v := t.nat.Load().(stun.NATType)
 
- 	if v == stun.NATUnknown || v == stun.NATError {
 
- 		return "unknown"
 
- 	}
 
- 	return v.String()
 
- }
 
- func (t *kcpListener) stunRenewal(listener net.PacketConn) {
 
- 	client := stun.NewClientWithConnection(listener)
 
- 	client.SetSoftwareName("syncthing")
 
- 	var natType stun.NATType
 
- 	var extAddr *stun.Host
 
- 	var udpAddr *net.UDPAddr
 
- 	var err error
 
- 	oldType := stun.NATUnknown
 
- 	for {
 
- 	disabled:
 
- 		if t.cfg.Options().StunKeepaliveS < 1 {
 
- 			time.Sleep(time.Second)
 
- 			oldType = stun.NATUnknown
 
- 			t.nat.Store(stun.NATUnknown)
 
- 			t.mut.Lock()
 
- 			t.address = nil
 
- 			t.mut.Unlock()
 
- 			continue
 
- 		}
 
- 		for _, addr := range t.cfg.StunServers() {
 
- 			// Resolve the address, so that in case the server advertises two
 
- 			// IPs, we always hit the same one, as otherwise, the mapping might
 
- 			// expire as we hit the other address, and cause us to flip flop
 
- 			// between servers/external addresses, as a result flooding discovery
 
- 			// servers.
 
- 			udpAddr, err = net.ResolveUDPAddr("udp", addr)
 
- 			if err != nil {
 
- 				l.Debugf("%s stun addr resolution on %s: %s", t.uri, addr, err)
 
- 				continue
 
- 			}
 
- 			client.SetServerAddr(udpAddr.String())
 
- 			natType, extAddr, err = client.Discover()
 
- 			if err != nil || extAddr == nil {
 
- 				l.Debugf("%s stun discovery on %s: %s", t.uri, addr, err)
 
- 				continue
 
- 			}
 
- 			// The stun server is most likely borked, try another one.
 
- 			if natType == stun.NATError || natType == stun.NATUnknown || natType == stun.NATBlocked {
 
- 				l.Debugf("%s stun discovery on %s resolved to %s", t.uri, addr, natType)
 
- 				continue
 
- 			}
 
- 			if oldType != natType {
 
- 				l.Infof("%s detected NAT type: %s", t.uri, natType)
 
- 				t.nat.Store(natType)
 
- 				oldType = natType
 
- 			}
 
- 			// We can't punch through this one, so no point doing keepalives
 
- 			// and such, just try again in a minute and hope that the NAT type changes.
 
- 			if !isPunchable(natType) {
 
- 				break
 
- 			}
 
- 			for {
 
- 				changed := false
 
- 				uri := *t.uri
 
- 				uri.Host = extAddr.TransportAddr()
 
- 				t.mut.Lock()
 
- 				if t.address == nil || t.address.String() != uri.String() {
 
- 					l.Infof("%s resolved external address %s (via %s)", t.uri, uri.String(), addr)
 
- 					t.address = &uri
 
- 					changed = true
 
- 				}
 
- 				t.mut.Unlock()
 
- 				// This will most likely result in a call to WANAddresses() which tries to
 
- 				// get t.mut, so notify while unlocked.
 
- 				if changed {
 
- 					t.notifyAddressesChanged(t)
 
- 				}
 
- 				select {
 
- 				case <-time.After(time.Duration(t.cfg.Options().StunKeepaliveS) * time.Second):
 
- 				case <-t.stop:
 
- 					return
 
- 				}
 
- 				if t.cfg.Options().StunKeepaliveS < 1 {
 
- 					goto disabled
 
- 				}
 
- 				extAddr, err = client.Keepalive()
 
- 				if err != nil {
 
- 					l.Debugf("%s stun keepalive on %s: %s (%v)", t.uri, addr, err, extAddr)
 
- 					break
 
- 				}
 
- 			}
 
- 		}
 
- 		// We failed to contact all provided stun servers or the nat is not punchable.
 
- 		// Chillout for a while.
 
- 		time.Sleep(stunRetryInterval)
 
- 	}
 
- }
 
- type kcpListenerFactory struct{}
 
- func (f *kcpListenerFactory) New(uri *url.URL, cfg *config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service) genericListener {
 
- 	l := &kcpListener{
 
- 		uri:     fixupPort(uri, config.DefaultKCPPort),
 
- 		cfg:     cfg,
 
- 		tlsCfg:  tlsCfg,
 
- 		conns:   conns,
 
- 		stop:    make(chan struct{}),
 
- 		factory: f,
 
- 	}
 
- 	l.nat.Store(stun.NATUnknown)
 
- 	return l
 
- }
 
- func (kcpListenerFactory) Enabled(cfg config.Configuration) bool {
 
- 	return true
 
- }
 
- func isPunchable(natType stun.NATType) bool {
 
- 	return natType == stun.NATNone || natType == stun.NATPortRestricted || natType == stun.NATRestricted || natType == stun.NATFull
 
- }
 
 
  |