sess.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964
  1. package kcp
  2. import (
  3. "crypto/rand"
  4. "encoding/binary"
  5. "hash/crc32"
  6. "net"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/pkg/errors"
  11. "golang.org/x/net/ipv4"
  12. )
  13. type errTimeout struct {
  14. error
  15. }
  16. func (errTimeout) Timeout() bool { return true }
  17. func (errTimeout) Temporary() bool { return true }
  18. func (errTimeout) Error() string { return "i/o timeout" }
const (
	// nonceSize is the number of bytes at the start of every encrypted
	// packet that are filled by the nonce generator (16-byte magic number).
	nonceSize = 16
	// crcSize is the size of the CRC32 packet checksum stored right after
	// the nonce.
	crcSize = 4
	// cryptHeaderSize is the overall crypto header size (nonce + checksum).
	cryptHeaderSize = nonceSize + crcSize
	// mtuLimit is the maximum packet size; it is also the size of every
	// pooled packet buffer.
	mtuLimit = 1500
	// rxFECMulti: FEC keeps rxFECMulti*(dataShard+parityShard) ordered
	// packets in memory.
	rxFECMulti = 3
	// acceptBacklog is the capacity of the Listener's accept queue.
	acceptBacklog = 128
	// qlen is the capacity of the prerouting (receiver -> session) packet
	// queue.
	qlen = 128
)
// Error message texts. They are turned into error values with errors.New at
// each return site.
const (
	// errBrokenPipe is reported by operations on a closed session or listener.
	errBrokenPipe = "broken pipe"
	// errInvalidOperation is reported when a socket option cannot be applied
	// to the underlying connection.
	errInvalidOperation = "invalid operation"
)
var (
	// xmitBuf is the global packet buffer pool,
	// shared among sending/receiving/FEC.
	// Its New function is installed in init.
	xmitBuf sync.Pool
)
  44. func init() {
  45. xmitBuf.New = func() interface{} {
  46. return make([]byte, mtuLimit)
  47. }
  48. }
type (
	// UDPSession defines a KCP session implemented by UDP.
	// The methods in this file take mu before touching kcp or the settings.
	UDPSession struct {
		updaterIdx int            // record slice index in updater
		conn       net.PacketConn // the underlying packet connection
		kcp        *KCP           // KCP ARQ protocol
		l          *Listener      // point to the Listener if it's accepted by Listener; nil for client sessions
		block      BlockCrypt     // block encryption; nil disables encryption and the CRC header

		// kcp receiving is based on packets
		// recvbuf turns packets into stream
		recvbuf []byte
		bufptr  []byte // unread remainder of recvbuf, served first by Read

		// extended output buffer(with header); only allocated when headerSize > 0
		ext []byte

		// FEC
		fecDecoder *fecDecoder
		fecEncoder *fecEncoder

		// settings
		remote     net.Addr  // remote peer address
		rd         time.Time // read deadline
		wd         time.Time // write deadline
		headerSize int       // the overall header size added before KCP frame
		ackNoDelay bool      // send ack immediately for each incoming packet
		writeDelay bool      // delay kcp.flush() for Write() for bulk transfer
		dup        int       // duplicate udp packets

		// notifications
		die          chan struct{} // notify session has Closed
		chReadEvent  chan struct{} // notify Read() can be called without blocking
		chWriteEvent chan struct{} // notify Write() can be called without blocking
		chErrorEvent chan error    // notify Read() have an error

		// nonce generator
		nonce nonceMD5

		isClosed bool       // flag the session has Closed
		mu       sync.Mutex // held by the session methods while they use kcp and the fields above
	}

	// setReadBuffer is satisfied by connections whose socket read buffer can
	// be resized.
	setReadBuffer interface {
		SetReadBuffer(bytes int) error
	}

	// setWriteBuffer is satisfied by connections whose socket write buffer
	// can be resized.
	setWriteBuffer interface {
		SetWriteBuffer(bytes int) error
	}
)
  91. // newUDPSession create a new udp session for client or server
  92. func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn net.PacketConn, remote net.Addr, block BlockCrypt) *UDPSession {
  93. sess := new(UDPSession)
  94. sess.die = make(chan struct{})
  95. sess.chReadEvent = make(chan struct{}, 1)
  96. sess.chWriteEvent = make(chan struct{}, 1)
  97. sess.chErrorEvent = make(chan error, 1)
  98. sess.remote = remote
  99. sess.conn = conn
  100. sess.l = l
  101. sess.block = block
  102. sess.recvbuf = make([]byte, mtuLimit)
  103. // FEC initialization
  104. sess.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
  105. if sess.block != nil {
  106. sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize)
  107. } else {
  108. sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0)
  109. }
  110. // calculate header size
  111. if sess.block != nil {
  112. sess.headerSize += cryptHeaderSize
  113. }
  114. if sess.fecEncoder != nil {
  115. sess.headerSize += fecHeaderSizePlus2
  116. }
  117. // only allocate extended packet buffer
  118. // when the extra header is required
  119. if sess.headerSize > 0 {
  120. sess.ext = make([]byte, mtuLimit)
  121. }
  122. sess.kcp = NewKCP(conv, func(buf []byte, size int) {
  123. if size >= IKCP_OVERHEAD {
  124. sess.output(buf[:size])
  125. }
  126. })
  127. sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize)
  128. blacklist.add(remote.String(), conv)
  129. // add current session to the global updater,
  130. // which periodically calls sess.update()
  131. updater.addSession(sess)
  132. if sess.l == nil { // it's a client connection
  133. go sess.readLoop()
  134. atomic.AddUint64(&DefaultSnmp.ActiveOpens, 1)
  135. } else {
  136. atomic.AddUint64(&DefaultSnmp.PassiveOpens, 1)
  137. }
  138. currestab := atomic.AddUint64(&DefaultSnmp.CurrEstab, 1)
  139. maxconn := atomic.LoadUint64(&DefaultSnmp.MaxConn)
  140. if currestab > maxconn {
  141. atomic.CompareAndSwapUint64(&DefaultSnmp.MaxConn, maxconn, currestab)
  142. }
  143. return sess
  144. }
// Read implements net.Conn.
//
// Bytes left over from a previous, partially consumed KCP message are
// returned first. Otherwise the next complete message is taken from kcp: it
// is received directly into b when b is large enough, else into recvbuf with
// the unread remainder kept in bufptr for later calls.
//
// When no data is available Read blocks until a read event, the read
// deadline, session close, or an error from the receiver goroutine. It
// returns an errTimeout once the read deadline has passed, a "broken pipe"
// error when the session is closed and no buffered bytes remain, and the
// receiver's error as-is.
func (s *UDPSession) Read(b []byte) (n int, err error) {
	for {
		s.mu.Lock()
		if len(s.bufptr) > 0 { // copy from buffer into b
			n = copy(b, s.bufptr)
			s.bufptr = s.bufptr[n:]
			s.mu.Unlock()
			return n, nil
		}

		if s.isClosed {
			s.mu.Unlock()
			return 0, errors.New(errBrokenPipe)
		}

		if size := s.kcp.PeekSize(); size > 0 { // peek data size from kcp
			atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(size))
			if len(b) >= size { // direct write to b
				s.kcp.Recv(b)
				s.mu.Unlock()
				return size, nil
			}

			// resize kcp receive buffer
			// to make sure recvbuf has enough capacity
			if cap(s.recvbuf) < size {
				s.recvbuf = make([]byte, size)
			}

			// resize recvbuf slice length
			s.recvbuf = s.recvbuf[:size]
			s.kcp.Recv(s.recvbuf)
			n = copy(b, s.recvbuf)   // copy to b
			s.bufptr = s.recvbuf[n:] // update pointer
			s.mu.Unlock()
			return n, nil
		}

		// read deadline: c stays nil (blocks forever in select) when no
		// deadline is set
		var timeout *time.Timer
		var c <-chan time.Time
		if !s.rd.IsZero() {
			if time.Now().After(s.rd) {
				s.mu.Unlock()
				return 0, errTimeout{}
			}

			delay := s.rd.Sub(time.Now())
			timeout = time.NewTimer(delay)
			c = timeout.C
		}
		s.mu.Unlock()

		// wait for read event or timeout; the lock is released while
		// waiting. Every wake-up except an error re-runs the checks above.
		select {
		case <-s.chReadEvent:
		case <-c:
		case <-s.die:
		case err = <-s.chErrorEvent:
			if timeout != nil {
				timeout.Stop()
			}
			return n, err
		}

		if timeout != nil {
			timeout.Stop()
		}
	}
}
// Write implements net.Conn.
//
// While the number of segments waiting to be sent is below the send window,
// b is queued into kcp in chunks of at most kcp.mss bytes and, unless
// writeDelay is set, flushed immediately; Write then reports len(b) bytes
// written. Otherwise it blocks until a write event, the write deadline, or
// session close, and retries.
//
// It returns an errTimeout once the write deadline has passed and a
// "broken pipe" error when the session is closed.
func (s *UDPSession) Write(b []byte) (n int, err error) {
	for {
		s.mu.Lock()
		if s.isClosed {
			s.mu.Unlock()
			return 0, errors.New(errBrokenPipe)
		}

		// api flow control
		if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
			n = len(b)
			// split b into mss-sized pieces; the last piece may be shorter
			for {
				if len(b) <= int(s.kcp.mss) {
					s.kcp.Send(b)
					break
				} else {
					s.kcp.Send(b[:s.kcp.mss])
					b = b[s.kcp.mss:]
				}
			}

			if !s.writeDelay {
				s.kcp.flush(false)
			}
			s.mu.Unlock()
			atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
			return n, nil
		}

		// write deadline: c stays nil (blocks forever in select) when no
		// deadline is set
		var timeout *time.Timer
		var c <-chan time.Time
		if !s.wd.IsZero() {
			if time.Now().After(s.wd) {
				s.mu.Unlock()
				return 0, errTimeout{}
			}
			delay := s.wd.Sub(time.Now())
			timeout = time.NewTimer(delay)
			c = timeout.C
		}
		s.mu.Unlock()

		// wait for write event or timeout; the lock is released while
		// waiting and every wake-up re-runs the checks above
		select {
		case <-s.chWriteEvent:
		case <-c:
		case <-s.die:
		}

		if timeout != nil {
			timeout.Stop()
		}
	}
}
  259. // Close closes the connection.
  260. func (s *UDPSession) Close() error {
  261. // remove this session from updater & listener(if necessary)
  262. updater.removeSession(s)
  263. if s.l != nil { // notify listener
  264. s.l.closeSession(sessionKey{
  265. addr: s.remote.String(),
  266. convID: s.kcp.conv,
  267. })
  268. }
  269. s.mu.Lock()
  270. defer s.mu.Unlock()
  271. if s.isClosed {
  272. return errors.New(errBrokenPipe)
  273. }
  274. close(s.die)
  275. s.isClosed = true
  276. atomic.AddUint64(&DefaultSnmp.CurrEstab, ^uint64(0))
  277. if s.l == nil { // client socket close
  278. return s.conn.Close()
  279. }
  280. return nil
  281. }
  282. // LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
  283. func (s *UDPSession) LocalAddr() net.Addr { return s.conn.LocalAddr() }
  284. // RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
  285. func (s *UDPSession) RemoteAddr() net.Addr { return s.remote }
  286. // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
  287. func (s *UDPSession) SetDeadline(t time.Time) error {
  288. s.mu.Lock()
  289. defer s.mu.Unlock()
  290. s.rd = t
  291. s.wd = t
  292. return nil
  293. }
  294. // SetReadDeadline implements the Conn SetReadDeadline method.
  295. func (s *UDPSession) SetReadDeadline(t time.Time) error {
  296. s.mu.Lock()
  297. defer s.mu.Unlock()
  298. s.rd = t
  299. return nil
  300. }
  301. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  302. func (s *UDPSession) SetWriteDeadline(t time.Time) error {
  303. s.mu.Lock()
  304. defer s.mu.Unlock()
  305. s.wd = t
  306. return nil
  307. }
  308. // SetWriteDelay delays write for bulk transfer until the next update interval
  309. func (s *UDPSession) SetWriteDelay(delay bool) {
  310. s.mu.Lock()
  311. defer s.mu.Unlock()
  312. s.writeDelay = delay
  313. }
  314. // SetWindowSize set maximum window size
  315. func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) {
  316. s.mu.Lock()
  317. defer s.mu.Unlock()
  318. s.kcp.WndSize(sndwnd, rcvwnd)
  319. }
  320. // SetMtu sets the maximum transmission unit(not including UDP header)
  321. func (s *UDPSession) SetMtu(mtu int) bool {
  322. if mtu > mtuLimit {
  323. return false
  324. }
  325. s.mu.Lock()
  326. defer s.mu.Unlock()
  327. s.kcp.SetMtu(mtu - s.headerSize)
  328. return true
  329. }
  330. // SetStreamMode toggles the stream mode on/off
  331. func (s *UDPSession) SetStreamMode(enable bool) {
  332. s.mu.Lock()
  333. defer s.mu.Unlock()
  334. if enable {
  335. s.kcp.stream = 1
  336. } else {
  337. s.kcp.stream = 0
  338. }
  339. }
  340. // SetACKNoDelay changes ack flush option, set true to flush ack immediately,
  341. func (s *UDPSession) SetACKNoDelay(nodelay bool) {
  342. s.mu.Lock()
  343. defer s.mu.Unlock()
  344. s.ackNoDelay = nodelay
  345. }
  346. // SetDUP duplicates udp packets for kcp output, for testing purpose only
  347. func (s *UDPSession) SetDUP(dup int) {
  348. s.mu.Lock()
  349. defer s.mu.Unlock()
  350. s.dup = dup
  351. }
  352. // SetNoDelay calls nodelay() of kcp
  353. // https://github.com/skywind3000/kcp/blob/master/README.en.md#protocol-configuration
  354. func (s *UDPSession) SetNoDelay(nodelay, interval, resend, nc int) {
  355. s.mu.Lock()
  356. defer s.mu.Unlock()
  357. s.kcp.NoDelay(nodelay, interval, resend, nc)
  358. }
  359. // SetDSCP sets the 6bit DSCP field of IP header, no effect if it's accepted from Listener
  360. func (s *UDPSession) SetDSCP(dscp int) error {
  361. s.mu.Lock()
  362. defer s.mu.Unlock()
  363. if s.l == nil {
  364. if nc, ok := s.conn.(*connectedUDPConn); ok {
  365. return ipv4.NewConn(nc.UDPConn).SetTOS(dscp << 2)
  366. } else if nc, ok := s.conn.(net.Conn); ok {
  367. return ipv4.NewConn(nc).SetTOS(dscp << 2)
  368. }
  369. }
  370. return errors.New(errInvalidOperation)
  371. }
  372. // SetReadBuffer sets the socket read buffer, no effect if it's accepted from Listener
  373. func (s *UDPSession) SetReadBuffer(bytes int) error {
  374. s.mu.Lock()
  375. defer s.mu.Unlock()
  376. if s.l == nil {
  377. if nc, ok := s.conn.(setReadBuffer); ok {
  378. return nc.SetReadBuffer(bytes)
  379. }
  380. }
  381. return errors.New(errInvalidOperation)
  382. }
  383. // SetWriteBuffer sets the socket write buffer, no effect if it's accepted from Listener
  384. func (s *UDPSession) SetWriteBuffer(bytes int) error {
  385. s.mu.Lock()
  386. defer s.mu.Unlock()
  387. if s.l == nil {
  388. if nc, ok := s.conn.(setWriteBuffer); ok {
  389. return nc.SetWriteBuffer(bytes)
  390. }
  391. }
  392. return errors.New(errInvalidOperation)
  393. }
  394. // output pipeline entry
  395. // steps for output data processing:
  396. // 0. Header extends
  397. // 1. FEC
  398. // 2. CRC32
  399. // 3. Encryption
  400. // 4. WriteTo kernel
  401. func (s *UDPSession) output(buf []byte) {
  402. var ecc [][]byte
  403. // 0. extend buf's header space(if necessary)
  404. ext := buf
  405. if s.headerSize > 0 {
  406. ext = s.ext[:s.headerSize+len(buf)]
  407. copy(ext[s.headerSize:], buf)
  408. }
  409. // 1. FEC encoding
  410. if s.fecEncoder != nil {
  411. ecc = s.fecEncoder.encode(ext)
  412. }
  413. // 2&3. crc32 & encryption
  414. if s.block != nil {
  415. s.nonce.Fill(ext[:nonceSize])
  416. checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
  417. binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
  418. s.block.Encrypt(ext, ext)
  419. for k := range ecc {
  420. s.nonce.Fill(ecc[k][:nonceSize])
  421. checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
  422. binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
  423. s.block.Encrypt(ecc[k], ecc[k])
  424. }
  425. }
  426. // 4. WriteTo kernel
  427. nbytes := 0
  428. npkts := 0
  429. for i := 0; i < s.dup+1; i++ {
  430. if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
  431. nbytes += n
  432. npkts++
  433. }
  434. }
  435. for k := range ecc {
  436. if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
  437. nbytes += n
  438. npkts++
  439. }
  440. }
  441. atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
  442. atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
  443. }
  444. // kcp update, returns interval for next calling
  445. func (s *UDPSession) update() (interval time.Duration) {
  446. s.mu.Lock()
  447. s.kcp.flush(false)
  448. if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
  449. s.notifyWriteEvent()
  450. }
  451. interval = time.Duration(s.kcp.interval) * time.Millisecond
  452. s.mu.Unlock()
  453. return
  454. }
  455. // GetConv gets conversation id of a session
  456. func (s *UDPSession) GetConv() uint32 { return s.kcp.conv }
  457. func (s *UDPSession) notifyReadEvent() {
  458. select {
  459. case s.chReadEvent <- struct{}{}:
  460. default:
  461. }
  462. }
  463. func (s *UDPSession) notifyWriteEvent() {
  464. select {
  465. case s.chWriteEvent <- struct{}{}:
  466. default:
  467. }
  468. }
  469. func (s *UDPSession) kcpInput(data []byte) {
  470. var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64
  471. if s.fecDecoder != nil {
  472. f := s.fecDecoder.decodeBytes(data)
  473. s.mu.Lock()
  474. if f.flag == typeData {
  475. if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
  476. kcpInErrors++
  477. }
  478. }
  479. if f.flag == typeData || f.flag == typeFEC {
  480. if f.flag == typeFEC {
  481. fecParityShards++
  482. }
  483. recovers := s.fecDecoder.decode(f)
  484. for _, r := range recovers {
  485. if len(r) >= 2 { // must be larger than 2bytes
  486. sz := binary.LittleEndian.Uint16(r)
  487. if int(sz) <= len(r) && sz >= 2 {
  488. if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
  489. fecRecovered++
  490. } else {
  491. kcpInErrors++
  492. }
  493. } else {
  494. fecErrs++
  495. }
  496. } else {
  497. fecErrs++
  498. }
  499. }
  500. }
  501. // notify reader
  502. if n := s.kcp.PeekSize(); n > 0 {
  503. s.notifyReadEvent()
  504. }
  505. s.mu.Unlock()
  506. } else {
  507. s.mu.Lock()
  508. if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
  509. kcpInErrors++
  510. }
  511. // notify reader
  512. if n := s.kcp.PeekSize(); n > 0 {
  513. s.notifyReadEvent()
  514. }
  515. s.mu.Unlock()
  516. }
  517. atomic.AddUint64(&DefaultSnmp.InPkts, 1)
  518. atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data)))
  519. if fecParityShards > 0 {
  520. atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards)
  521. }
  522. if kcpInErrors > 0 {
  523. atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
  524. }
  525. if fecErrs > 0 {
  526. atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
  527. }
  528. if fecRecovered > 0 {
  529. atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
  530. }
  531. }
  532. func (s *UDPSession) receiver(ch chan<- []byte) {
  533. for {
  534. data := xmitBuf.Get().([]byte)[:mtuLimit]
  535. if n, _, err := s.conn.ReadFrom(data); err == nil && n >= s.headerSize+IKCP_OVERHEAD {
  536. select {
  537. case ch <- data[:n]:
  538. case <-s.die:
  539. return
  540. }
  541. } else if err != nil {
  542. s.chErrorEvent <- err
  543. return
  544. } else {
  545. atomic.AddUint64(&DefaultSnmp.InErrs, 1)
  546. }
  547. }
  548. }
  549. // read loop for client session
  550. func (s *UDPSession) readLoop() {
  551. chPacket := make(chan []byte, qlen)
  552. go s.receiver(chPacket)
  553. for {
  554. select {
  555. case data := <-chPacket:
  556. raw := data
  557. dataValid := false
  558. if s.block != nil {
  559. s.block.Decrypt(data, data)
  560. data = data[nonceSize:]
  561. checksum := crc32.ChecksumIEEE(data[crcSize:])
  562. if checksum == binary.LittleEndian.Uint32(data) {
  563. data = data[crcSize:]
  564. dataValid = true
  565. } else {
  566. atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
  567. }
  568. } else if s.block == nil {
  569. dataValid = true
  570. }
  571. if dataValid {
  572. s.kcpInput(data)
  573. }
  574. xmitBuf.Put(raw)
  575. case <-s.die:
  576. return
  577. }
  578. }
  579. }
type (
	// sessionKey identifies a session of a Listener by the peer address
	// string and the KCP conversation id.
	sessionKey struct {
		addr   string
		convID uint32
	}

	// Listener defines a server listening for connections
	Listener struct {
		block           BlockCrypt                 // block encryption
		dataShards      int                        // FEC data shard
		parityShards    int                        // FEC parity shard
		fecDecoder      *fecDecoder                // FEC mock initialization; only checked for nil in this file
		conn            net.PacketConn             // the underlying packet connection
		sessions        map[sessionKey]*UDPSession // all sessions accepted by this Listener
		chAccepts       chan *UDPSession           // Listen() backlog
		chSessionClosed chan sessionKey            // session close queue
		headerSize      int                        // the overall header size added before KCP frame
		die             chan struct{}              // notify the listener has closed
		rd              atomic.Value               // read deadline for Accept()
		wd              atomic.Value               // write deadline; stored by SetWriteDeadline, not read in this file
	}

	// incoming packet
	inPacket struct {
		from net.Addr // sender address
		data []byte   // pooled buffer holding the received bytes
	}
)
// monitor is the Listener's dispatch loop for incoming data of all server
// connections. It starts l.receiver in its own goroutine and then, until
// l.die is closed, handles packets from that receiver and keys of closed
// sessions from l.chSessionClosed.
//
// Each packet is decrypted and CRC-checked when encryption is enabled, its
// conversation id is extracted, and the payload is handed to the matching
// session's kcpInput. An unknown (address, conversation) pair creates a new
// session unless it is blacklisted, the accept queue is full, or 4096
// sessions already exist. With FEC enabled only packets flagged typeData
// carry a readable conversation id; other packets are dropped here.
func (l *Listener) monitor() {
	// cache last session
	var lastKey sessionKey
	var lastSession *UDPSession

	chPacket := make(chan inPacket, qlen)
	go l.receiver(chPacket)
	for {
		select {
		case p := <-chPacket:
			raw := p.data // kept to return the whole buffer to the pool
			data := p.data
			from := p.from
			dataValid := false
			if l.block != nil {
				l.block.Decrypt(data, data)
				data = data[nonceSize:]
				checksum := crc32.ChecksumIEEE(data[crcSize:])
				if checksum == binary.LittleEndian.Uint32(data) {
					data = data[crcSize:]
					dataValid = true
				} else {
					atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
				}
			} else if l.block == nil {
				dataValid = true
			}

			if dataValid {
				var conv uint32
				convValid := false
				if l.fecDecoder != nil {
					// the 16-bit packet type sits at offset 4 of the FEC header
					isfec := binary.LittleEndian.Uint16(data[4:])
					if isfec == typeData {
						conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
						convValid = true
					}
				} else {
					conv = binary.LittleEndian.Uint32(data)
					convValid = true
				}

				if convValid {
					key := sessionKey{
						addr:   from.String(),
						convID: conv,
					}
					var s *UDPSession
					var ok bool

					// packets received from an address always come in batch.
					// cache the session for next packet, without querying map.
					if key == lastKey {
						s, ok = lastSession, true
					} else if s, ok = l.sessions[key]; ok {
						lastSession = s
						lastKey = key
					}

					if !ok { // new session
						if !blacklist.has(from.String(), conv) && len(l.chAccepts) < cap(l.chAccepts) && len(l.sessions) < 4096 { // do not let new session overwhelm accept queue and connection count
							s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, from, l.block)
							s.kcpInput(data)
							l.sessions[key] = s
							l.chAccepts <- s
						}
					} else {
						s.kcpInput(data)
					}
				}
			}

			xmitBuf.Put(raw)
		case key := <-l.chSessionClosed:
			// invalidate the cache so a closed session is not matched again
			if key == lastKey {
				lastKey = sessionKey{}
			}
			delete(l.sessions, key)
		case <-l.die:
			return
		}
	}
}
  684. func (l *Listener) receiver(ch chan<- inPacket) {
  685. for {
  686. data := xmitBuf.Get().([]byte)[:mtuLimit]
  687. if n, from, err := l.conn.ReadFrom(data); err == nil && n >= l.headerSize+IKCP_OVERHEAD {
  688. select {
  689. case ch <- inPacket{from, data[:n]}:
  690. case <-l.die:
  691. return
  692. }
  693. } else if err != nil {
  694. return
  695. } else {
  696. atomic.AddUint64(&DefaultSnmp.InErrs, 1)
  697. }
  698. }
  699. }
  700. // SetReadBuffer sets the socket read buffer for the Listener
  701. func (l *Listener) SetReadBuffer(bytes int) error {
  702. if nc, ok := l.conn.(setReadBuffer); ok {
  703. return nc.SetReadBuffer(bytes)
  704. }
  705. return errors.New(errInvalidOperation)
  706. }
  707. // SetWriteBuffer sets the socket write buffer for the Listener
  708. func (l *Listener) SetWriteBuffer(bytes int) error {
  709. if nc, ok := l.conn.(setWriteBuffer); ok {
  710. return nc.SetWriteBuffer(bytes)
  711. }
  712. return errors.New(errInvalidOperation)
  713. }
  714. // SetDSCP sets the 6bit DSCP field of IP header
  715. func (l *Listener) SetDSCP(dscp int) error {
  716. if nc, ok := l.conn.(net.Conn); ok {
  717. return ipv4.NewConn(nc).SetTOS(dscp << 2)
  718. }
  719. return errors.New(errInvalidOperation)
  720. }
  721. // Accept implements the Accept method in the Listener interface; it waits for the next call and returns a generic Conn.
  722. func (l *Listener) Accept() (net.Conn, error) {
  723. return l.AcceptKCP()
  724. }
  725. // AcceptKCP accepts a KCP connection
  726. func (l *Listener) AcceptKCP() (*UDPSession, error) {
  727. var timeout <-chan time.Time
  728. if tdeadline, ok := l.rd.Load().(time.Time); ok && !tdeadline.IsZero() {
  729. timeout = time.After(tdeadline.Sub(time.Now()))
  730. }
  731. select {
  732. case <-timeout:
  733. return nil, &errTimeout{}
  734. case c := <-l.chAccepts:
  735. return c, nil
  736. case <-l.die:
  737. return nil, errors.New(errBrokenPipe)
  738. }
  739. }
  740. // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
  741. func (l *Listener) SetDeadline(t time.Time) error {
  742. l.SetReadDeadline(t)
  743. l.SetWriteDeadline(t)
  744. return nil
  745. }
// SetReadDeadline stores the deadline that AcceptKCP honours. A zero time
// value disables the deadline. It always returns nil.
func (l *Listener) SetReadDeadline(t time.Time) error {
	l.rd.Store(t)
	return nil
}
// SetWriteDeadline stores the listener's write deadline. The stored value is
// not read anywhere in this file. It always returns nil.
func (l *Listener) SetWriteDeadline(t time.Time) error {
	l.wd.Store(t)
	return nil
}
  756. // Close stops listening on the UDP address. Already Accepted connections are not closed.
  757. func (l *Listener) Close() error {
  758. close(l.die)
  759. return l.conn.Close()
  760. }
  761. // closeSession notify the listener that a session has closed
  762. func (l *Listener) closeSession(key sessionKey) bool {
  763. select {
  764. case l.chSessionClosed <- key:
  765. return true
  766. case <-l.die:
  767. return false
  768. }
  769. }
  770. // Addr returns the listener's network address, The Addr returned is shared by all invocations of Addr, so do not modify it.
  771. func (l *Listener) Addr() net.Addr { return l.conn.LocalAddr() }
  772. // Listen listens for incoming KCP packets addressed to the local address laddr on the network "udp",
  773. func Listen(laddr string) (net.Listener, error) { return ListenWithOptions(laddr, nil, 0, 0) }
  774. // ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption,
  775. // dataShards, parityShards defines Reed-Solomon Erasure Coding parameters
  776. func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards int) (*Listener, error) {
  777. udpaddr, err := net.ResolveUDPAddr("udp", laddr)
  778. if err != nil {
  779. return nil, errors.Wrap(err, "net.ResolveUDPAddr")
  780. }
  781. conn, err := net.ListenUDP("udp", udpaddr)
  782. if err != nil {
  783. return nil, errors.Wrap(err, "net.ListenUDP")
  784. }
  785. return ServeConn(block, dataShards, parityShards, conn)
  786. }
  787. // ServeConn serves KCP protocol for a single packet connection.
  788. func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*Listener, error) {
  789. l := new(Listener)
  790. l.conn = conn
  791. l.sessions = make(map[sessionKey]*UDPSession)
  792. l.chAccepts = make(chan *UDPSession, acceptBacklog)
  793. l.chSessionClosed = make(chan sessionKey)
  794. l.die = make(chan struct{})
  795. l.dataShards = dataShards
  796. l.parityShards = parityShards
  797. l.block = block
  798. l.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
  799. // calculate header size
  800. if l.block != nil {
  801. l.headerSize += cryptHeaderSize
  802. }
  803. if l.fecDecoder != nil {
  804. l.headerSize += fecHeaderSizePlus2
  805. }
  806. go l.monitor()
  807. return l, nil
  808. }
  809. // Dial connects to the remote address "raddr" on the network "udp"
  810. func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr, nil, 0, 0) }
  811. // DialWithOptions connects to the remote address "raddr" on the network "udp" with packet encryption
  812. func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) (*UDPSession, error) {
  813. udpaddr, err := net.ResolveUDPAddr("udp", raddr)
  814. if err != nil {
  815. return nil, errors.Wrap(err, "net.ResolveUDPAddr")
  816. }
  817. udpconn, err := net.DialUDP("udp", nil, udpaddr)
  818. if err != nil {
  819. return nil, errors.Wrap(err, "net.DialUDP")
  820. }
  821. return NewConn(raddr, block, dataShards, parityShards, &connectedUDPConn{udpconn})
  822. }
  823. // NewConn establishes a session and talks KCP protocol over a packet connection.
  824. func NewConn(raddr string, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
  825. udpaddr, err := net.ResolveUDPAddr("udp", raddr)
  826. if err != nil {
  827. return nil, errors.Wrap(err, "net.ResolveUDPAddr")
  828. }
  829. var convid uint32
  830. binary.Read(rand.Reader, binary.LittleEndian, &convid)
  831. return newUDPSession(convid, dataShards, parityShards, nil, conn, udpaddr, block), nil
  832. }
  833. // returns current time in milliseconds
  834. func currentMs() uint32 { return uint32(time.Now().UnixNano() / int64(time.Millisecond)) }
  835. // connectedUDPConn is a wrapper for net.UDPConn which converts WriteTo syscalls
  836. // to Write syscalls that are 4 times faster on some OS'es. This should only be
  837. // used for connections that were produced by a net.Dial* call.
  838. type connectedUDPConn struct{ *net.UDPConn }
  839. // WriteTo redirects all writes to the Write syscall, which is 4 times faster.
  840. func (c *connectedUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) { return c.Write(b) }