| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964 | 
							- package kcp
 
- import (
 
- 	"crypto/rand"
 
- 	"encoding/binary"
 
- 	"hash/crc32"
 
- 	"net"
 
- 	"sync"
 
- 	"sync/atomic"
 
- 	"time"
 
- 	"github.com/pkg/errors"
 
- 	"golang.org/x/net/ipv4"
 
- )
 
- type errTimeout struct {
 
- 	error
 
- }
 
- func (errTimeout) Timeout() bool   { return true }
 
- func (errTimeout) Temporary() bool { return true }
 
- func (errTimeout) Error() string   { return "i/o timeout" }
 
- const (
 
- 	// 16-bytes magic number for each packet
 
- 	nonceSize = 16
 
- 	// 4-bytes packet checksum
 
- 	crcSize = 4
 
- 	// overall crypto header size
 
- 	cryptHeaderSize = nonceSize + crcSize
 
- 	// maximum packet size
 
- 	mtuLimit = 1500
 
- 	// FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory
 
- 	rxFECMulti = 3
 
- 	// accept backlog
 
- 	acceptBacklog = 128
 
- 	// prerouting(to session) queue
 
- 	qlen = 128
 
- )
 
- const (
 
- 	errBrokenPipe       = "broken pipe"
 
- 	errInvalidOperation = "invalid operation"
 
- )
 
- var (
 
- 	// global packet buffer
 
- 	// shared among sending/receiving/FEC
 
- 	xmitBuf sync.Pool
 
- )
 
- func init() {
 
- 	xmitBuf.New = func() interface{} {
 
- 		return make([]byte, mtuLimit)
 
- 	}
 
- }
 
- type (
 
- 	// UDPSession defines a KCP session implemented by UDP
 
- 	UDPSession struct {
 
- 		updaterIdx int            // record slice index in updater
 
- 		conn       net.PacketConn // the underlying packet connection
 
- 		kcp        *KCP           // KCP ARQ protocol
 
- 		l          *Listener      // point to the Listener if it's accepted by Listener
 
- 		block      BlockCrypt     // block encryption
 
- 		// kcp receiving is based on packets
 
- 		// recvbuf turns packets into stream
 
- 		recvbuf []byte
 
- 		bufptr  []byte
 
- 		// extended output buffer(with header)
 
- 		ext []byte
 
- 		// FEC
 
- 		fecDecoder *fecDecoder
 
- 		fecEncoder *fecEncoder
 
- 		// settings
 
- 		remote     net.Addr  // remote peer address
 
- 		rd         time.Time // read deadline
 
- 		wd         time.Time // write deadline
 
- 		headerSize int       // the overall header size added before KCP frame
 
- 		ackNoDelay bool      // send ack immediately for each incoming packet
 
- 		writeDelay bool      // delay kcp.flush() for Write() for bulk transfer
 
- 		dup        int       // duplicate udp packets
 
- 		// notifications
 
- 		die          chan struct{} // notify session has Closed
 
- 		chReadEvent  chan struct{} // notify Read() can be called without blocking
 
- 		chWriteEvent chan struct{} // notify Write() can be called without blocking
 
- 		chErrorEvent chan error    // notify Read() have an error
 
- 		// nonce generator
 
- 		nonce nonceMD5
 
- 		isClosed bool // flag the session has Closed
 
- 		mu       sync.Mutex
 
- 	}
 
- 	setReadBuffer interface {
 
- 		SetReadBuffer(bytes int) error
 
- 	}
 
- 	setWriteBuffer interface {
 
- 		SetWriteBuffer(bytes int) error
 
- 	}
 
- )
 
- // newUDPSession create a new udp session for client or server
 
- func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn net.PacketConn, remote net.Addr, block BlockCrypt) *UDPSession {
 
- 	sess := new(UDPSession)
 
- 	sess.die = make(chan struct{})
 
- 	sess.chReadEvent = make(chan struct{}, 1)
 
- 	sess.chWriteEvent = make(chan struct{}, 1)
 
- 	sess.chErrorEvent = make(chan error, 1)
 
- 	sess.remote = remote
 
- 	sess.conn = conn
 
- 	sess.l = l
 
- 	sess.block = block
 
- 	sess.recvbuf = make([]byte, mtuLimit)
 
- 	// FEC initialization
 
- 	sess.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
 
- 	if sess.block != nil {
 
- 		sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize)
 
- 	} else {
 
- 		sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0)
 
- 	}
 
- 	// calculate header size
 
- 	if sess.block != nil {
 
- 		sess.headerSize += cryptHeaderSize
 
- 	}
 
- 	if sess.fecEncoder != nil {
 
- 		sess.headerSize += fecHeaderSizePlus2
 
- 	}
 
- 	// only allocate extended packet buffer
 
- 	// when the extra header is required
 
- 	if sess.headerSize > 0 {
 
- 		sess.ext = make([]byte, mtuLimit)
 
- 	}
 
- 	sess.kcp = NewKCP(conv, func(buf []byte, size int) {
 
- 		if size >= IKCP_OVERHEAD {
 
- 			sess.output(buf[:size])
 
- 		}
 
- 	})
 
- 	sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize)
 
- 	blacklist.add(remote.String(), conv)
 
- 	// add current session to the global updater,
 
- 	// which periodically calls sess.update()
 
- 	updater.addSession(sess)
 
- 	if sess.l == nil { // it's a client connection
 
- 		go sess.readLoop()
 
- 		atomic.AddUint64(&DefaultSnmp.ActiveOpens, 1)
 
- 	} else {
 
- 		atomic.AddUint64(&DefaultSnmp.PassiveOpens, 1)
 
- 	}
 
- 	currestab := atomic.AddUint64(&DefaultSnmp.CurrEstab, 1)
 
- 	maxconn := atomic.LoadUint64(&DefaultSnmp.MaxConn)
 
- 	if currestab > maxconn {
 
- 		atomic.CompareAndSwapUint64(&DefaultSnmp.MaxConn, maxconn, currestab)
 
- 	}
 
- 	return sess
 
- }
 
- // Read implements net.Conn
 
- func (s *UDPSession) Read(b []byte) (n int, err error) {
 
- 	for {
 
- 		s.mu.Lock()
 
- 		if len(s.bufptr) > 0 { // copy from buffer into b
 
- 			n = copy(b, s.bufptr)
 
- 			s.bufptr = s.bufptr[n:]
 
- 			s.mu.Unlock()
 
- 			return n, nil
 
- 		}
 
- 		if s.isClosed {
 
- 			s.mu.Unlock()
 
- 			return 0, errors.New(errBrokenPipe)
 
- 		}
 
- 		if size := s.kcp.PeekSize(); size > 0 { // peek data size from kcp
 
- 			atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(size))
 
- 			if len(b) >= size { // direct write to b
 
- 				s.kcp.Recv(b)
 
- 				s.mu.Unlock()
 
- 				return size, nil
 
- 			}
 
- 			// resize kcp receive buffer
 
- 			// to make sure recvbuf has enough capacity
 
- 			if cap(s.recvbuf) < size {
 
- 				s.recvbuf = make([]byte, size)
 
- 			}
 
- 			// resize recvbuf slice length
 
- 			s.recvbuf = s.recvbuf[:size]
 
- 			s.kcp.Recv(s.recvbuf)
 
- 			n = copy(b, s.recvbuf)   // copy to b
 
- 			s.bufptr = s.recvbuf[n:] // update pointer
 
- 			s.mu.Unlock()
 
- 			return n, nil
 
- 		}
 
- 		// read deadline
 
- 		var timeout *time.Timer
 
- 		var c <-chan time.Time
 
- 		if !s.rd.IsZero() {
 
- 			if time.Now().After(s.rd) {
 
- 				s.mu.Unlock()
 
- 				return 0, errTimeout{}
 
- 			}
 
- 			delay := s.rd.Sub(time.Now())
 
- 			timeout = time.NewTimer(delay)
 
- 			c = timeout.C
 
- 		}
 
- 		s.mu.Unlock()
 
- 		// wait for read event or timeout
 
- 		select {
 
- 		case <-s.chReadEvent:
 
- 		case <-c:
 
- 		case <-s.die:
 
- 		case err = <-s.chErrorEvent:
 
- 			if timeout != nil {
 
- 				timeout.Stop()
 
- 			}
 
- 			return n, err
 
- 		}
 
- 		if timeout != nil {
 
- 			timeout.Stop()
 
- 		}
 
- 	}
 
- }
 
- // Write implements net.Conn
 
- func (s *UDPSession) Write(b []byte) (n int, err error) {
 
- 	for {
 
- 		s.mu.Lock()
 
- 		if s.isClosed {
 
- 			s.mu.Unlock()
 
- 			return 0, errors.New(errBrokenPipe)
 
- 		}
 
- 		// api flow control
 
- 		if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
 
- 			n = len(b)
 
- 			for {
 
- 				if len(b) <= int(s.kcp.mss) {
 
- 					s.kcp.Send(b)
 
- 					break
 
- 				} else {
 
- 					s.kcp.Send(b[:s.kcp.mss])
 
- 					b = b[s.kcp.mss:]
 
- 				}
 
- 			}
 
- 			if !s.writeDelay {
 
- 				s.kcp.flush(false)
 
- 			}
 
- 			s.mu.Unlock()
 
- 			atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
 
- 			return n, nil
 
- 		}
 
- 		// write deadline
 
- 		var timeout *time.Timer
 
- 		var c <-chan time.Time
 
- 		if !s.wd.IsZero() {
 
- 			if time.Now().After(s.wd) {
 
- 				s.mu.Unlock()
 
- 				return 0, errTimeout{}
 
- 			}
 
- 			delay := s.wd.Sub(time.Now())
 
- 			timeout = time.NewTimer(delay)
 
- 			c = timeout.C
 
- 		}
 
- 		s.mu.Unlock()
 
- 		// wait for write event or timeout
 
- 		select {
 
- 		case <-s.chWriteEvent:
 
- 		case <-c:
 
- 		case <-s.die:
 
- 		}
 
- 		if timeout != nil {
 
- 			timeout.Stop()
 
- 		}
 
- 	}
 
- }
 
- // Close closes the connection.
 
- func (s *UDPSession) Close() error {
 
- 	// remove this session from updater & listener(if necessary)
 
- 	updater.removeSession(s)
 
- 	if s.l != nil { // notify listener
 
- 		s.l.closeSession(sessionKey{
 
- 			addr:   s.remote.String(),
 
- 			convID: s.kcp.conv,
 
- 		})
 
- 	}
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	if s.isClosed {
 
- 		return errors.New(errBrokenPipe)
 
- 	}
 
- 	close(s.die)
 
- 	s.isClosed = true
 
- 	atomic.AddUint64(&DefaultSnmp.CurrEstab, ^uint64(0))
 
- 	if s.l == nil { // client socket close
 
- 		return s.conn.Close()
 
- 	}
 
- 	return nil
 
- }
 
- // LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
 
- func (s *UDPSession) LocalAddr() net.Addr { return s.conn.LocalAddr() }
 
- // RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
 
- func (s *UDPSession) RemoteAddr() net.Addr { return s.remote }
 
- // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
 
- func (s *UDPSession) SetDeadline(t time.Time) error {
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	s.rd = t
 
- 	s.wd = t
 
- 	return nil
 
- }
 
- // SetReadDeadline implements the Conn SetReadDeadline method.
 
- func (s *UDPSession) SetReadDeadline(t time.Time) error {
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	s.rd = t
 
- 	return nil
 
- }
 
- // SetWriteDeadline implements the Conn SetWriteDeadline method.
 
- func (s *UDPSession) SetWriteDeadline(t time.Time) error {
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	s.wd = t
 
- 	return nil
 
- }
 
- // SetWriteDelay delays write for bulk transfer until the next update interval
 
- func (s *UDPSession) SetWriteDelay(delay bool) {
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	s.writeDelay = delay
 
- }
 
- // SetWindowSize set maximum window size
 
- func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) {
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	s.kcp.WndSize(sndwnd, rcvwnd)
 
- }
 
- // SetMtu sets the maximum transmission unit(not including UDP header)
 
- func (s *UDPSession) SetMtu(mtu int) bool {
 
- 	if mtu > mtuLimit {
 
- 		return false
 
- 	}
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	s.kcp.SetMtu(mtu - s.headerSize)
 
- 	return true
 
- }
 
- // SetStreamMode toggles the stream mode on/off
 
- func (s *UDPSession) SetStreamMode(enable bool) {
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	if enable {
 
- 		s.kcp.stream = 1
 
- 	} else {
 
- 		s.kcp.stream = 0
 
- 	}
 
- }
 
- // SetACKNoDelay changes ack flush option, set true to flush ack immediately,
 
- func (s *UDPSession) SetACKNoDelay(nodelay bool) {
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	s.ackNoDelay = nodelay
 
- }
 
- // SetDUP duplicates udp packets for kcp output, for testing purpose only
 
- func (s *UDPSession) SetDUP(dup int) {
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	s.dup = dup
 
- }
 
- // SetNoDelay calls nodelay() of kcp
 
- // https://github.com/skywind3000/kcp/blob/master/README.en.md#protocol-configuration
 
- func (s *UDPSession) SetNoDelay(nodelay, interval, resend, nc int) {
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	s.kcp.NoDelay(nodelay, interval, resend, nc)
 
- }
 
- // SetDSCP sets the 6bit DSCP field of IP header, no effect if it's accepted from Listener
 
- func (s *UDPSession) SetDSCP(dscp int) error {
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	if s.l == nil {
 
- 		if nc, ok := s.conn.(*connectedUDPConn); ok {
 
- 			return ipv4.NewConn(nc.UDPConn).SetTOS(dscp << 2)
 
- 		} else if nc, ok := s.conn.(net.Conn); ok {
 
- 			return ipv4.NewConn(nc).SetTOS(dscp << 2)
 
- 		}
 
- 	}
 
- 	return errors.New(errInvalidOperation)
 
- }
 
- // SetReadBuffer sets the socket read buffer, no effect if it's accepted from Listener
 
- func (s *UDPSession) SetReadBuffer(bytes int) error {
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	if s.l == nil {
 
- 		if nc, ok := s.conn.(setReadBuffer); ok {
 
- 			return nc.SetReadBuffer(bytes)
 
- 		}
 
- 	}
 
- 	return errors.New(errInvalidOperation)
 
- }
 
- // SetWriteBuffer sets the socket write buffer, no effect if it's accepted from Listener
 
- func (s *UDPSession) SetWriteBuffer(bytes int) error {
 
- 	s.mu.Lock()
 
- 	defer s.mu.Unlock()
 
- 	if s.l == nil {
 
- 		if nc, ok := s.conn.(setWriteBuffer); ok {
 
- 			return nc.SetWriteBuffer(bytes)
 
- 		}
 
- 	}
 
- 	return errors.New(errInvalidOperation)
 
- }
 
- // output pipeline entry
 
- // steps for output data processing:
 
- // 0. Header extends
 
- // 1. FEC
 
- // 2. CRC32
 
- // 3. Encryption
 
- // 4. WriteTo kernel
 
- func (s *UDPSession) output(buf []byte) {
 
- 	var ecc [][]byte
 
- 	// 0. extend buf's header space(if necessary)
 
- 	ext := buf
 
- 	if s.headerSize > 0 {
 
- 		ext = s.ext[:s.headerSize+len(buf)]
 
- 		copy(ext[s.headerSize:], buf)
 
- 	}
 
- 	// 1. FEC encoding
 
- 	if s.fecEncoder != nil {
 
- 		ecc = s.fecEncoder.encode(ext)
 
- 	}
 
- 	// 2&3. crc32 & encryption
 
- 	if s.block != nil {
 
- 		s.nonce.Fill(ext[:nonceSize])
 
- 		checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
 
- 		binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
 
- 		s.block.Encrypt(ext, ext)
 
- 		for k := range ecc {
 
- 			s.nonce.Fill(ecc[k][:nonceSize])
 
- 			checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
 
- 			binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
 
- 			s.block.Encrypt(ecc[k], ecc[k])
 
- 		}
 
- 	}
 
- 	// 4. WriteTo kernel
 
- 	nbytes := 0
 
- 	npkts := 0
 
- 	for i := 0; i < s.dup+1; i++ {
 
- 		if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
 
- 			nbytes += n
 
- 			npkts++
 
- 		}
 
- 	}
 
- 	for k := range ecc {
 
- 		if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
 
- 			nbytes += n
 
- 			npkts++
 
- 		}
 
- 	}
 
- 	atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
 
- 	atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
 
- }
 
- // kcp update, returns interval for next calling
 
- func (s *UDPSession) update() (interval time.Duration) {
 
- 	s.mu.Lock()
 
- 	s.kcp.flush(false)
 
- 	if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
 
- 		s.notifyWriteEvent()
 
- 	}
 
- 	interval = time.Duration(s.kcp.interval) * time.Millisecond
 
- 	s.mu.Unlock()
 
- 	return
 
- }
 
- // GetConv gets conversation id of a session
 
- func (s *UDPSession) GetConv() uint32 { return s.kcp.conv }
 
- func (s *UDPSession) notifyReadEvent() {
 
- 	select {
 
- 	case s.chReadEvent <- struct{}{}:
 
- 	default:
 
- 	}
 
- }
 
- func (s *UDPSession) notifyWriteEvent() {
 
- 	select {
 
- 	case s.chWriteEvent <- struct{}{}:
 
- 	default:
 
- 	}
 
- }
 
- func (s *UDPSession) kcpInput(data []byte) {
 
- 	var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64
 
- 	if s.fecDecoder != nil {
 
- 		f := s.fecDecoder.decodeBytes(data)
 
- 		s.mu.Lock()
 
- 		if f.flag == typeData {
 
- 			if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
 
- 				kcpInErrors++
 
- 			}
 
- 		}
 
- 		if f.flag == typeData || f.flag == typeFEC {
 
- 			if f.flag == typeFEC {
 
- 				fecParityShards++
 
- 			}
 
- 			recovers := s.fecDecoder.decode(f)
 
- 			for _, r := range recovers {
 
- 				if len(r) >= 2 { // must be larger than 2bytes
 
- 					sz := binary.LittleEndian.Uint16(r)
 
- 					if int(sz) <= len(r) && sz >= 2 {
 
- 						if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
 
- 							fecRecovered++
 
- 						} else {
 
- 							kcpInErrors++
 
- 						}
 
- 					} else {
 
- 						fecErrs++
 
- 					}
 
- 				} else {
 
- 					fecErrs++
 
- 				}
 
- 			}
 
- 		}
 
- 		// notify reader
 
- 		if n := s.kcp.PeekSize(); n > 0 {
 
- 			s.notifyReadEvent()
 
- 		}
 
- 		s.mu.Unlock()
 
- 	} else {
 
- 		s.mu.Lock()
 
- 		if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
 
- 			kcpInErrors++
 
- 		}
 
- 		// notify reader
 
- 		if n := s.kcp.PeekSize(); n > 0 {
 
- 			s.notifyReadEvent()
 
- 		}
 
- 		s.mu.Unlock()
 
- 	}
 
- 	atomic.AddUint64(&DefaultSnmp.InPkts, 1)
 
- 	atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data)))
 
- 	if fecParityShards > 0 {
 
- 		atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards)
 
- 	}
 
- 	if kcpInErrors > 0 {
 
- 		atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
 
- 	}
 
- 	if fecErrs > 0 {
 
- 		atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
 
- 	}
 
- 	if fecRecovered > 0 {
 
- 		atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
 
- 	}
 
- }
 
- func (s *UDPSession) receiver(ch chan<- []byte) {
 
- 	for {
 
- 		data := xmitBuf.Get().([]byte)[:mtuLimit]
 
- 		if n, _, err := s.conn.ReadFrom(data); err == nil && n >= s.headerSize+IKCP_OVERHEAD {
 
- 			select {
 
- 			case ch <- data[:n]:
 
- 			case <-s.die:
 
- 				return
 
- 			}
 
- 		} else if err != nil {
 
- 			s.chErrorEvent <- err
 
- 			return
 
- 		} else {
 
- 			atomic.AddUint64(&DefaultSnmp.InErrs, 1)
 
- 		}
 
- 	}
 
- }
 
- // read loop for client session
 
- func (s *UDPSession) readLoop() {
 
- 	chPacket := make(chan []byte, qlen)
 
- 	go s.receiver(chPacket)
 
- 	for {
 
- 		select {
 
- 		case data := <-chPacket:
 
- 			raw := data
 
- 			dataValid := false
 
- 			if s.block != nil {
 
- 				s.block.Decrypt(data, data)
 
- 				data = data[nonceSize:]
 
- 				checksum := crc32.ChecksumIEEE(data[crcSize:])
 
- 				if checksum == binary.LittleEndian.Uint32(data) {
 
- 					data = data[crcSize:]
 
- 					dataValid = true
 
- 				} else {
 
- 					atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
 
- 				}
 
- 			} else if s.block == nil {
 
- 				dataValid = true
 
- 			}
 
- 			if dataValid {
 
- 				s.kcpInput(data)
 
- 			}
 
- 			xmitBuf.Put(raw)
 
- 		case <-s.die:
 
- 			return
 
- 		}
 
- 	}
 
- }
 
- type (
 
- 	sessionKey struct {
 
- 		addr   string
 
- 		convID uint32
 
- 	}
 
- 	// Listener defines a server listening for connections
 
- 	Listener struct {
 
- 		block        BlockCrypt     // block encryption
 
- 		dataShards   int            // FEC data shard
 
- 		parityShards int            // FEC parity shard
 
- 		fecDecoder   *fecDecoder    // FEC mock initialization
 
- 		conn         net.PacketConn // the underlying packet connection
 
- 		sessions        map[sessionKey]*UDPSession // all sessions accepted by this Listener
 
- 		chAccepts       chan *UDPSession           // Listen() backlog
 
- 		chSessionClosed chan sessionKey            // session close queue
 
- 		headerSize      int                        // the overall header size added before KCP frame
 
- 		die             chan struct{}              // notify the listener has closed
 
- 		rd              atomic.Value               // read deadline for Accept()
 
- 		wd              atomic.Value
 
- 	}
 
- 	// incoming packet
 
- 	inPacket struct {
 
- 		from net.Addr
 
- 		data []byte
 
- 	}
 
- )
 
- // monitor incoming data for all connections of server
 
- func (l *Listener) monitor() {
 
- 	// cache last session
 
- 	var lastKey sessionKey
 
- 	var lastSession *UDPSession
 
- 	chPacket := make(chan inPacket, qlen)
 
- 	go l.receiver(chPacket)
 
- 	for {
 
- 		select {
 
- 		case p := <-chPacket:
 
- 			raw := p.data
 
- 			data := p.data
 
- 			from := p.from
 
- 			dataValid := false
 
- 			if l.block != nil {
 
- 				l.block.Decrypt(data, data)
 
- 				data = data[nonceSize:]
 
- 				checksum := crc32.ChecksumIEEE(data[crcSize:])
 
- 				if checksum == binary.LittleEndian.Uint32(data) {
 
- 					data = data[crcSize:]
 
- 					dataValid = true
 
- 				} else {
 
- 					atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
 
- 				}
 
- 			} else if l.block == nil {
 
- 				dataValid = true
 
- 			}
 
- 			if dataValid {
 
- 				var conv uint32
 
- 				convValid := false
 
- 				if l.fecDecoder != nil {
 
- 					isfec := binary.LittleEndian.Uint16(data[4:])
 
- 					if isfec == typeData {
 
- 						conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
 
- 						convValid = true
 
- 					}
 
- 				} else {
 
- 					conv = binary.LittleEndian.Uint32(data)
 
- 					convValid = true
 
- 				}
 
- 				if convValid {
 
- 					key := sessionKey{
 
- 						addr:   from.String(),
 
- 						convID: conv,
 
- 					}
 
- 					var s *UDPSession
 
- 					var ok bool
 
- 					// packets received from an address always come in batch.
 
- 					// cache the session for next packet, without querying map.
 
- 					if key == lastKey {
 
- 						s, ok = lastSession, true
 
- 					} else if s, ok = l.sessions[key]; ok {
 
- 						lastSession = s
 
- 						lastKey = key
 
- 					}
 
- 					if !ok { // new session
 
- 						if !blacklist.has(from.String(), conv) && len(l.chAccepts) < cap(l.chAccepts) && len(l.sessions) < 4096 { // do not let new session overwhelm accept queue and connection count
 
- 							s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, from, l.block)
 
- 							s.kcpInput(data)
 
- 							l.sessions[key] = s
 
- 							l.chAccepts <- s
 
- 						}
 
- 					} else {
 
- 						s.kcpInput(data)
 
- 					}
 
- 				}
 
- 			}
 
- 			xmitBuf.Put(raw)
 
- 		case key := <-l.chSessionClosed:
 
- 			if key == lastKey {
 
- 				lastKey = sessionKey{}
 
- 			}
 
- 			delete(l.sessions, key)
 
- 		case <-l.die:
 
- 			return
 
- 		}
 
- 	}
 
- }
 
- func (l *Listener) receiver(ch chan<- inPacket) {
 
- 	for {
 
- 		data := xmitBuf.Get().([]byte)[:mtuLimit]
 
- 		if n, from, err := l.conn.ReadFrom(data); err == nil && n >= l.headerSize+IKCP_OVERHEAD {
 
- 			select {
 
- 			case ch <- inPacket{from, data[:n]}:
 
- 			case <-l.die:
 
- 				return
 
- 			}
 
- 		} else if err != nil {
 
- 			return
 
- 		} else {
 
- 			atomic.AddUint64(&DefaultSnmp.InErrs, 1)
 
- 		}
 
- 	}
 
- }
 
- // SetReadBuffer sets the socket read buffer for the Listener
 
- func (l *Listener) SetReadBuffer(bytes int) error {
 
- 	if nc, ok := l.conn.(setReadBuffer); ok {
 
- 		return nc.SetReadBuffer(bytes)
 
- 	}
 
- 	return errors.New(errInvalidOperation)
 
- }
 
- // SetWriteBuffer sets the socket write buffer for the Listener
 
- func (l *Listener) SetWriteBuffer(bytes int) error {
 
- 	if nc, ok := l.conn.(setWriteBuffer); ok {
 
- 		return nc.SetWriteBuffer(bytes)
 
- 	}
 
- 	return errors.New(errInvalidOperation)
 
- }
 
- // SetDSCP sets the 6bit DSCP field of IP header
 
- func (l *Listener) SetDSCP(dscp int) error {
 
- 	if nc, ok := l.conn.(net.Conn); ok {
 
- 		return ipv4.NewConn(nc).SetTOS(dscp << 2)
 
- 	}
 
- 	return errors.New(errInvalidOperation)
 
- }
 
- // Accept implements the Accept method in the Listener interface; it waits for the next call and returns a generic Conn.
 
- func (l *Listener) Accept() (net.Conn, error) {
 
- 	return l.AcceptKCP()
 
- }
 
- // AcceptKCP accepts a KCP connection
 
- func (l *Listener) AcceptKCP() (*UDPSession, error) {
 
- 	var timeout <-chan time.Time
 
- 	if tdeadline, ok := l.rd.Load().(time.Time); ok && !tdeadline.IsZero() {
 
- 		timeout = time.After(tdeadline.Sub(time.Now()))
 
- 	}
 
- 	select {
 
- 	case <-timeout:
 
- 		return nil, &errTimeout{}
 
- 	case c := <-l.chAccepts:
 
- 		return c, nil
 
- 	case <-l.die:
 
- 		return nil, errors.New(errBrokenPipe)
 
- 	}
 
- }
 
- // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
 
- func (l *Listener) SetDeadline(t time.Time) error {
 
- 	l.SetReadDeadline(t)
 
- 	l.SetWriteDeadline(t)
 
- 	return nil
 
- }
 
- // SetReadDeadline implements the Conn SetReadDeadline method.
 
- func (l *Listener) SetReadDeadline(t time.Time) error {
 
- 	l.rd.Store(t)
 
- 	return nil
 
- }
 
- // SetWriteDeadline implements the Conn SetWriteDeadline method.
 
- func (l *Listener) SetWriteDeadline(t time.Time) error {
 
- 	l.wd.Store(t)
 
- 	return nil
 
- }
 
- // Close stops listening on the UDP address. Already Accepted connections are not closed.
 
- func (l *Listener) Close() error {
 
- 	close(l.die)
 
- 	return l.conn.Close()
 
- }
 
- // closeSession notify the listener that a session has closed
 
- func (l *Listener) closeSession(key sessionKey) bool {
 
- 	select {
 
- 	case l.chSessionClosed <- key:
 
- 		return true
 
- 	case <-l.die:
 
- 		return false
 
- 	}
 
- }
 
- // Addr returns the listener's network address, The Addr returned is shared by all invocations of Addr, so do not modify it.
 
- func (l *Listener) Addr() net.Addr { return l.conn.LocalAddr() }
 
- // Listen listens for incoming KCP packets addressed to the local address laddr on the network "udp",
 
- func Listen(laddr string) (net.Listener, error) { return ListenWithOptions(laddr, nil, 0, 0) }
 
- // ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption,
 
- // dataShards, parityShards defines Reed-Solomon Erasure Coding parameters
 
- func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards int) (*Listener, error) {
 
- 	udpaddr, err := net.ResolveUDPAddr("udp", laddr)
 
- 	if err != nil {
 
- 		return nil, errors.Wrap(err, "net.ResolveUDPAddr")
 
- 	}
 
- 	conn, err := net.ListenUDP("udp", udpaddr)
 
- 	if err != nil {
 
- 		return nil, errors.Wrap(err, "net.ListenUDP")
 
- 	}
 
- 	return ServeConn(block, dataShards, parityShards, conn)
 
- }
 
- // ServeConn serves KCP protocol for a single packet connection.
 
- func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*Listener, error) {
 
- 	l := new(Listener)
 
- 	l.conn = conn
 
- 	l.sessions = make(map[sessionKey]*UDPSession)
 
- 	l.chAccepts = make(chan *UDPSession, acceptBacklog)
 
- 	l.chSessionClosed = make(chan sessionKey)
 
- 	l.die = make(chan struct{})
 
- 	l.dataShards = dataShards
 
- 	l.parityShards = parityShards
 
- 	l.block = block
 
- 	l.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
 
- 	// calculate header size
 
- 	if l.block != nil {
 
- 		l.headerSize += cryptHeaderSize
 
- 	}
 
- 	if l.fecDecoder != nil {
 
- 		l.headerSize += fecHeaderSizePlus2
 
- 	}
 
- 	go l.monitor()
 
- 	return l, nil
 
- }
 
- // Dial connects to the remote address "raddr" on the network "udp"
 
- func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr, nil, 0, 0) }
 
- // DialWithOptions connects to the remote address "raddr" on the network "udp" with packet encryption
 
- func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) (*UDPSession, error) {
 
- 	udpaddr, err := net.ResolveUDPAddr("udp", raddr)
 
- 	if err != nil {
 
- 		return nil, errors.Wrap(err, "net.ResolveUDPAddr")
 
- 	}
 
- 	udpconn, err := net.DialUDP("udp", nil, udpaddr)
 
- 	if err != nil {
 
- 		return nil, errors.Wrap(err, "net.DialUDP")
 
- 	}
 
- 	return NewConn(raddr, block, dataShards, parityShards, &connectedUDPConn{udpconn})
 
- }
 
- // NewConn establishes a session and talks KCP protocol over a packet connection.
 
- func NewConn(raddr string, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
 
- 	udpaddr, err := net.ResolveUDPAddr("udp", raddr)
 
- 	if err != nil {
 
- 		return nil, errors.Wrap(err, "net.ResolveUDPAddr")
 
- 	}
 
- 	var convid uint32
 
- 	binary.Read(rand.Reader, binary.LittleEndian, &convid)
 
- 	return newUDPSession(convid, dataShards, parityShards, nil, conn, udpaddr, block), nil
 
- }
 
- // returns current time in milliseconds
 
- func currentMs() uint32 { return uint32(time.Now().UnixNano() / int64(time.Millisecond)) }
 
- // connectedUDPConn is a wrapper for net.UDPConn which converts WriteTo syscalls
 
- // to Write syscalls that are 4 times faster on some OS'es. This should only be
 
- // used for connections that were produced by a net.Dial* call.
 
- type connectedUDPConn struct{ *net.UDPConn }
 
- // WriteTo redirects all writes to the Write syscall, which is 4 times faster.
 
- func (c *connectedUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) { return c.Write(b) }
 
 
  |