kcp.go

// Package kcp - A Fast and Reliable ARQ Protocol
package kcp

import (
	"encoding/binary"
	"sync/atomic"
)

const (
	IKCP_RTO_NDL     = 30  // no delay min rto
	IKCP_RTO_MIN     = 100 // normal min rto
	IKCP_RTO_DEF     = 200
	IKCP_RTO_MAX     = 60000
	IKCP_CMD_PUSH    = 81 // cmd: push data
	IKCP_CMD_ACK     = 82 // cmd: ack
	IKCP_CMD_WASK    = 83 // cmd: window probe (ask)
	IKCP_CMD_WINS    = 84 // cmd: window size (tell)
	IKCP_ASK_SEND    = 1  // need to send IKCP_CMD_WASK
	IKCP_ASK_TELL    = 2  // need to send IKCP_CMD_WINS
	IKCP_WND_SND     = 32
	IKCP_WND_RCV     = 32
	IKCP_MTU_DEF     = 1400
	IKCP_ACK_FAST    = 3
	IKCP_INTERVAL    = 100
	IKCP_OVERHEAD    = 24
	IKCP_DEADLINK    = 20
	IKCP_THRESH_INIT = 2
	IKCP_THRESH_MIN  = 2
	IKCP_PROBE_INIT  = 7000   // 7 secs to probe window size
	IKCP_PROBE_LIMIT = 120000 // up to 120 secs to probe window
)

// output_callback is a prototype which ought to capture conn and call conn.Write
type output_callback func(buf []byte, size int)

/* encode 8 bits unsigned int */
func ikcp_encode8u(p []byte, c byte) []byte {
	p[0] = c
	return p[1:]
}

/* decode 8 bits unsigned int */
func ikcp_decode8u(p []byte, c *byte) []byte {
	*c = p[0]
	return p[1:]
}

/* encode 16 bits unsigned int (lsb) */
func ikcp_encode16u(p []byte, w uint16) []byte {
	binary.LittleEndian.PutUint16(p, w)
	return p[2:]
}

/* decode 16 bits unsigned int (lsb) */
func ikcp_decode16u(p []byte, w *uint16) []byte {
	*w = binary.LittleEndian.Uint16(p)
	return p[2:]
}

/* encode 32 bits unsigned int (lsb) */
func ikcp_encode32u(p []byte, l uint32) []byte {
	binary.LittleEndian.PutUint32(p, l)
	return p[4:]
}

/* decode 32 bits unsigned int (lsb) */
func ikcp_decode32u(p []byte, l *uint32) []byte {
	*l = binary.LittleEndian.Uint32(p)
	return p[4:]
}

func _imin_(a, b uint32) uint32 {
	if a <= b {
		return a
	}
	return b
}

func _imax_(a, b uint32) uint32 {
	if a >= b {
		return a
	}
	return b
}

func _ibound_(lower, middle, upper uint32) uint32 {
	return _imin_(_imax_(lower, middle), upper)
}

func _itimediff(later, earlier uint32) int32 {
	return (int32)(later - earlier)
}

// segment defines a KCP segment
type segment struct {
	conv     uint32
	cmd      uint8
	frg      uint8
	wnd      uint16
	ts       uint32
	sn       uint32
	una      uint32
	rto      uint32
	xmit     uint32
	resendts uint32
	fastack  uint32
	data     []byte
}

// encode a segment into buffer
func (seg *segment) encode(ptr []byte) []byte {
	ptr = ikcp_encode32u(ptr, seg.conv)
	ptr = ikcp_encode8u(ptr, seg.cmd)
	ptr = ikcp_encode8u(ptr, seg.frg)
	ptr = ikcp_encode16u(ptr, seg.wnd)
	ptr = ikcp_encode32u(ptr, seg.ts)
	ptr = ikcp_encode32u(ptr, seg.sn)
	ptr = ikcp_encode32u(ptr, seg.una)
	ptr = ikcp_encode32u(ptr, uint32(len(seg.data)))
	atomic.AddUint64(&DefaultSnmp.OutSegs, 1)
	return ptr
}
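
// Wire-format note (added for clarity; derived from encode above and
// IKCP_OVERHEAD): every segment starts with a 24-byte little-endian header,
// followed by len bytes of payload.
//
//	0       4   5   6       8       12      16      20    24
//	+-------+---+---+-------+-------+-------+-------+-----+
//	| conv  |cmd|frg|  wnd  |  ts   |  sn   |  una  | len |
//	+-------+---+---+-------+-------+-------+-------+-----+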
// KCP defines a single KCP connection
type KCP struct {
	conv, mtu, mss, state                  uint32
	snd_una, snd_nxt, rcv_nxt              uint32
	ssthresh                               uint32
	rx_rttvar, rx_srtt                     int32
	rx_rto, rx_minrto                      uint32
	snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe uint32
	interval, ts_flush                     uint32
	nodelay, updated                       uint32
	ts_probe, probe_wait                   uint32
	dead_link, incr                        uint32

	fastresend     int32
	nocwnd, stream int32

	snd_queue []segment
	rcv_queue []segment
	snd_buf   []segment
	rcv_buf   []segment

	acklist []ackItem

	buffer []byte
	output output_callback
}

type ackItem struct {
	sn uint32
	ts uint32
}

// NewKCP creates a new KCP control object; 'conv' must be equal in both
// endpoints of the same connection.
func NewKCP(conv uint32, output output_callback) *KCP {
	kcp := new(KCP)
	kcp.conv = conv
	kcp.snd_wnd = IKCP_WND_SND
	kcp.rcv_wnd = IKCP_WND_RCV
	kcp.rmt_wnd = IKCP_WND_RCV
	kcp.mtu = IKCP_MTU_DEF
	kcp.mss = kcp.mtu - IKCP_OVERHEAD
	kcp.buffer = make([]byte, (kcp.mtu+IKCP_OVERHEAD)*3)
	kcp.rx_rto = IKCP_RTO_DEF
	kcp.rx_minrto = IKCP_RTO_MIN
	kcp.interval = IKCP_INTERVAL
	kcp.ts_flush = IKCP_INTERVAL
	kcp.ssthresh = IKCP_THRESH_INIT
	kcp.dead_link = IKCP_DEADLINK
	kcp.output = output
	return kcp
}
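
// The function below is an illustrative sketch added by the editor, not part
// of the original file: it shows how a caller might wire NewKCP to a transport
// by capturing the "connection" (here just an in-memory packet list) in the
// output callback, then tune windows and timing. The name, conv value and
// settings are assumptions, not prescriptions.
func exampleNewKCPSketch() *KCP {
	var wire [][]byte // stands in for a real conn.Write target
	kcp := NewKCP(0x11223344, func(buf []byte, size int) {
		// copy before queueing: flush reuses buf for subsequent packets
		pkt := make([]byte, size)
		copy(pkt, buf[:size])
		wire = append(wire, pkt)
	})
	kcp.WndSize(128, 128)    // larger than the 32/32 defaults
	kcp.NoDelay(1, 20, 2, 1) // the "fastest" preset named in the NoDelay comment below
	return kcp
}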
// newSegment creates a KCP segment
func (kcp *KCP) newSegment(size int) (seg segment) {
	seg.data = xmitBuf.Get().([]byte)[:size]
	return
}

// delSegment recycles a KCP segment
func (kcp *KCP) delSegment(seg segment) {
	xmitBuf.Put(seg.data)
}

// PeekSize checks the size of the next message in the recv queue
func (kcp *KCP) PeekSize() (length int) {
	if len(kcp.rcv_queue) == 0 {
		return -1
	}

	seg := &kcp.rcv_queue[0]
	if seg.frg == 0 {
		return len(seg.data)
	}

	if len(kcp.rcv_queue) < int(seg.frg+1) {
		return -1
	}

	for k := range kcp.rcv_queue {
		seg := &kcp.rcv_queue[k]
		length += len(seg.data)
		if seg.frg == 0 {
			break
		}
	}
	return
}

// Recv is user/upper level recv: returns the size of the next message, or a
// value below zero for EAGAIN
func (kcp *KCP) Recv(buffer []byte) (n int) {
	if len(kcp.rcv_queue) == 0 {
		return -1
	}

	peeksize := kcp.PeekSize()
	if peeksize < 0 {
		return -2
	}

	if peeksize > len(buffer) {
		return -3
	}

	var fast_recover bool
	if len(kcp.rcv_queue) >= int(kcp.rcv_wnd) {
		fast_recover = true
	}

	// merge fragment
	count := 0
	for k := range kcp.rcv_queue {
		seg := &kcp.rcv_queue[k]
		copy(buffer, seg.data)
		buffer = buffer[len(seg.data):]
		n += len(seg.data)
		count++
		kcp.delSegment(*seg)
		if seg.frg == 0 {
			break
		}
	}
	if count > 0 {
		kcp.rcv_queue = kcp.remove_front(kcp.rcv_queue, count)
	}

	// move available data from rcv_buf -> rcv_queue
	count = 0
	for k := range kcp.rcv_buf {
		seg := &kcp.rcv_buf[k]
		if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
			kcp.rcv_nxt++
			count++
		} else {
			break
		}
	}
	if count > 0 {
		kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
		kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
	}

	// fast recover
	if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover {
		// ready to send back IKCP_CMD_WINS in ikcp_flush
		// tell remote my window size
		kcp.probe |= IKCP_ASK_TELL
	}
	return
}

// Send is user/upper level send, returns below zero for error
func (kcp *KCP) Send(buffer []byte) int {
	var count int
	if len(buffer) == 0 {
		return -1
	}

	// append to previous segment in streaming mode (if possible)
	if kcp.stream != 0 {
		n := len(kcp.snd_queue)
		if n > 0 {
			seg := &kcp.snd_queue[n-1]
			if len(seg.data) < int(kcp.mss) {
				capacity := int(kcp.mss) - len(seg.data)
				extend := capacity
				if len(buffer) < capacity {
					extend = len(buffer)
				}

				// grow slice, the underlying cap is guaranteed to
				// be larger than kcp.mss
				oldlen := len(seg.data)
				seg.data = seg.data[:oldlen+extend]
				copy(seg.data[oldlen:], buffer)
				buffer = buffer[extend:]
			}
		}

		if len(buffer) == 0 {
			return 0
		}
	}

	if len(buffer) <= int(kcp.mss) {
		count = 1
	} else {
		count = (len(buffer) + int(kcp.mss) - 1) / int(kcp.mss)
	}

	if count > 255 {
		return -2
	}

	if count == 0 {
		count = 1
	}

	for i := 0; i < count; i++ {
		var size int
		if len(buffer) > int(kcp.mss) {
			size = int(kcp.mss)
		} else {
			size = len(buffer)
		}
		seg := kcp.newSegment(size)
		copy(seg.data, buffer[:size])
		if kcp.stream == 0 { // message mode
			seg.frg = uint8(count - i - 1)
		} else { // stream mode
			seg.frg = 0
		}
		kcp.snd_queue = append(kcp.snd_queue, seg)
		buffer = buffer[size:]
	}
	return 0
}
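
// The function below is an illustrative sketch added by the editor, not part
// of the original file: two KCPs wired back-to-back through their output
// callbacks, showing the Send -> flush -> Input -> Recv data path without a
// real network. The conv value and message are arbitrary.
func exampleLoopbackSketch() []byte {
	var k1, k2 *KCP
	k1 = NewKCP(1, func(buf []byte, size int) {
		k2.Input(buf[:size], true, false)
	})
	k2 = NewKCP(1, func(buf []byte, size int) {
		k1.Input(buf[:size], true, false)
	})

	k1.Send([]byte("hello kcp"))

	// drive both state machines; a real caller would do this on a
	// 10ms-100ms timer (see Update/Check below)
	for i := 0; i < 5; i++ {
		k1.Update()
		k2.Update()
	}

	buf := make([]byte, 64)
	if n := k2.Recv(buf); n > 0 {
		return buf[:n]
	}
	return nil
}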
func (kcp *KCP) update_ack(rtt int32) {
	// https://tools.ietf.org/html/rfc6298
	var rto uint32
	if kcp.rx_srtt == 0 {
		kcp.rx_srtt = rtt
		kcp.rx_rttvar = rtt >> 1
	} else {
		delta := rtt - kcp.rx_srtt
		kcp.rx_srtt += delta >> 3
		if delta < 0 {
			delta = -delta
		}
		if rtt < kcp.rx_srtt-kcp.rx_rttvar {
			// if the new RTT sample is below the bottom of the range of
			// what an RTT measurement is expected to be,
			// give it an 8x reduced weight versus its normal weighting
			kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 5
		} else {
			kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 2
		}
	}
	rto = uint32(kcp.rx_srtt) + _imax_(kcp.interval, uint32(kcp.rx_rttvar)<<2)
	kcp.rx_rto = _ibound_(kcp.rx_minrto, rto, IKCP_RTO_MAX)
}
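
// For reference (added note restating the arithmetic above in RFC 6298 terms):
//
//	SRTT   <- SRTT + (RTT - SRTT)/8
//	RTTVAR <- RTTVAR + (|RTT - SRTT| - RTTVAR)/4   (weight 1/32 for unusually low samples)
//	RTO    <- clamp(SRTT + max(interval, 4*RTTVAR), rx_minrto, IKCP_RTO_MAX)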
func (kcp *KCP) shrink_buf() {
	if len(kcp.snd_buf) > 0 {
		seg := &kcp.snd_buf[0]
		kcp.snd_una = seg.sn
	} else {
		kcp.snd_una = kcp.snd_nxt
	}
}

func (kcp *KCP) parse_ack(sn uint32) {
	if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
		return
	}

	for k := range kcp.snd_buf {
		seg := &kcp.snd_buf[k]
		if sn == seg.sn {
			kcp.delSegment(*seg)
			copy(kcp.snd_buf[k:], kcp.snd_buf[k+1:])
			kcp.snd_buf[len(kcp.snd_buf)-1] = segment{}
			kcp.snd_buf = kcp.snd_buf[:len(kcp.snd_buf)-1]
			break
		}
		if _itimediff(sn, seg.sn) < 0 {
			break
		}
	}
}

func (kcp *KCP) parse_fastack(sn uint32) {
	if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
		return
	}

	for k := range kcp.snd_buf {
		seg := &kcp.snd_buf[k]
		if _itimediff(sn, seg.sn) < 0 {
			break
		} else if sn != seg.sn {
			seg.fastack++
		}
	}
}

func (kcp *KCP) parse_una(una uint32) {
	count := 0
	for k := range kcp.snd_buf {
		seg := &kcp.snd_buf[k]
		if _itimediff(una, seg.sn) > 0 {
			kcp.delSegment(*seg)
			count++
		} else {
			break
		}
	}
	if count > 0 {
		kcp.snd_buf = kcp.remove_front(kcp.snd_buf, count)
	}
}

// ack append
func (kcp *KCP) ack_push(sn, ts uint32) {
	kcp.acklist = append(kcp.acklist, ackItem{sn, ts})
}

func (kcp *KCP) parse_data(newseg segment) {
	sn := newseg.sn
	if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
		_itimediff(sn, kcp.rcv_nxt) < 0 {
		kcp.delSegment(newseg)
		return
	}

	n := len(kcp.rcv_buf) - 1
	insert_idx := 0
	repeat := false
	for i := n; i >= 0; i-- {
		seg := &kcp.rcv_buf[i]
		if seg.sn == sn {
			repeat = true
			atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
			break
		}
		if _itimediff(sn, seg.sn) > 0 {
			insert_idx = i + 1
			break
		}
	}

	if !repeat {
		if insert_idx == n+1 {
			kcp.rcv_buf = append(kcp.rcv_buf, newseg)
		} else {
			kcp.rcv_buf = append(kcp.rcv_buf, segment{})
			copy(kcp.rcv_buf[insert_idx+1:], kcp.rcv_buf[insert_idx:])
			kcp.rcv_buf[insert_idx] = newseg
		}
	} else {
		kcp.delSegment(newseg)
	}

	// move available data from rcv_buf -> rcv_queue
	count := 0
	for k := range kcp.rcv_buf {
		seg := &kcp.rcv_buf[k]
		if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
			kcp.rcv_nxt++
			count++
		} else {
			break
		}
	}
	if count > 0 {
		kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
		kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
	}
}

// Input is called when a low-level packet (e.g. a UDP packet) is received;
// regular indicates that a regular packet has been received (not one
// reconstructed from FEC)
func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int {
	una := kcp.snd_una
	if len(data) < IKCP_OVERHEAD {
		return -1
	}

	var maxack uint32
	var lastackts uint32
	var flag int
	var inSegs uint64

	for {
		var ts, sn, length, una, conv uint32
		var wnd uint16
		var cmd, frg uint8

		if len(data) < int(IKCP_OVERHEAD) {
			break
		}

		data = ikcp_decode32u(data, &conv)
		if conv != kcp.conv {
			return -1
		}

		data = ikcp_decode8u(data, &cmd)
		data = ikcp_decode8u(data, &frg)
		data = ikcp_decode16u(data, &wnd)
		data = ikcp_decode32u(data, &ts)
		data = ikcp_decode32u(data, &sn)
		data = ikcp_decode32u(data, &una)
		data = ikcp_decode32u(data, &length)
		if len(data) < int(length) {
			return -2
		}

		if cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
			cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS {
			return -3
		}

		// only trust window updates from regular packets. i.e: latest update
		if regular {
			kcp.rmt_wnd = uint32(wnd)
		}
		kcp.parse_una(una)
		kcp.shrink_buf()

		if cmd == IKCP_CMD_ACK {
			kcp.parse_ack(sn)
			kcp.shrink_buf()
			if flag == 0 {
				flag = 1
				maxack = sn
				lastackts = ts
			} else if _itimediff(sn, maxack) > 0 {
				maxack = sn
				lastackts = ts
			}
		} else if cmd == IKCP_CMD_PUSH {
			if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 {
				kcp.ack_push(sn, ts)
				if _itimediff(sn, kcp.rcv_nxt) >= 0 {
					seg := kcp.newSegment(int(length))
					seg.conv = conv
					seg.cmd = cmd
					seg.frg = frg
					seg.wnd = wnd
					seg.ts = ts
					seg.sn = sn
					seg.una = una
					copy(seg.data, data[:length])
					kcp.parse_data(seg)
				} else {
					atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
				}
			} else {
				atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
			}
		} else if cmd == IKCP_CMD_WASK {
			// ready to send back IKCP_CMD_WINS in Ikcp_flush
			// tell remote my window size
			kcp.probe |= IKCP_ASK_TELL
		} else if cmd == IKCP_CMD_WINS {
			// do nothing
		} else {
			return -3
		}

		inSegs++
		data = data[length:]
	}
	atomic.AddUint64(&DefaultSnmp.InSegs, inSegs)

	if flag != 0 && regular {
		kcp.parse_fastack(maxack)
		current := currentMs()
		if _itimediff(current, lastackts) >= 0 {
			kcp.update_ack(_itimediff(current, lastackts))
		}
	}

	if _itimediff(kcp.snd_una, una) > 0 {
		if kcp.cwnd < kcp.rmt_wnd {
			mss := kcp.mss
			if kcp.cwnd < kcp.ssthresh {
				kcp.cwnd++
				kcp.incr += mss
			} else {
				if kcp.incr < mss {
					kcp.incr = mss
				}
				kcp.incr += (mss*mss)/kcp.incr + (mss / 16)
				if (kcp.cwnd+1)*mss <= kcp.incr {
					kcp.cwnd++
				}
			}
			if kcp.cwnd > kcp.rmt_wnd {
				kcp.cwnd = kcp.rmt_wnd
				kcp.incr = kcp.rmt_wnd * mss
			}
		}
	}

	if ackNoDelay && len(kcp.acklist) > 0 { // ack immediately
		kcp.flush(true)
	}
	return 0
}
func (kcp *KCP) wnd_unused() uint16 {
	if len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
		return uint16(int(kcp.rcv_wnd) - len(kcp.rcv_queue))
	}
	return 0
}

// flush pending data
func (kcp *KCP) flush(ackOnly bool) {
	var seg segment
	seg.conv = kcp.conv
	seg.cmd = IKCP_CMD_ACK
	seg.wnd = kcp.wnd_unused()
	seg.una = kcp.rcv_nxt

	buffer := kcp.buffer

	// flush acknowledges
	ptr := buffer
	for i, ack := range kcp.acklist {
		size := len(buffer) - len(ptr)
		if size+IKCP_OVERHEAD > int(kcp.mtu) {
			kcp.output(buffer, size)
			ptr = buffer
		}
		// filter jitters caused by bufferbloat
		if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
			seg.sn, seg.ts = ack.sn, ack.ts
			ptr = seg.encode(ptr)
		}
	}
	kcp.acklist = kcp.acklist[0:0]

	if ackOnly { // flush remaining ack segments
		size := len(buffer) - len(ptr)
		if size > 0 {
			kcp.output(buffer, size)
		}
		return
	}

	// probe window size (if remote window size equals zero)
	if kcp.rmt_wnd == 0 {
		current := currentMs()
		if kcp.probe_wait == 0 {
			kcp.probe_wait = IKCP_PROBE_INIT
			kcp.ts_probe = current + kcp.probe_wait
		} else {
			if _itimediff(current, kcp.ts_probe) >= 0 {
				if kcp.probe_wait < IKCP_PROBE_INIT {
					kcp.probe_wait = IKCP_PROBE_INIT
				}
				kcp.probe_wait += kcp.probe_wait / 2
				if kcp.probe_wait > IKCP_PROBE_LIMIT {
					kcp.probe_wait = IKCP_PROBE_LIMIT
				}
				kcp.ts_probe = current + kcp.probe_wait
				kcp.probe |= IKCP_ASK_SEND
			}
		}
	} else {
		kcp.ts_probe = 0
		kcp.probe_wait = 0
	}

	// flush window probing commands
	if (kcp.probe & IKCP_ASK_SEND) != 0 {
		seg.cmd = IKCP_CMD_WASK
		size := len(buffer) - len(ptr)
		if size+IKCP_OVERHEAD > int(kcp.mtu) {
			kcp.output(buffer, size)
			ptr = buffer
		}
		ptr = seg.encode(ptr)
	}

	// flush window probing commands
	if (kcp.probe & IKCP_ASK_TELL) != 0 {
		seg.cmd = IKCP_CMD_WINS
		size := len(buffer) - len(ptr)
		if size+IKCP_OVERHEAD > int(kcp.mtu) {
			kcp.output(buffer, size)
			ptr = buffer
		}
		ptr = seg.encode(ptr)
	}
	kcp.probe = 0

	// calculate window size
	cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
	if kcp.nocwnd == 0 {
		cwnd = _imin_(kcp.cwnd, cwnd)
	}

	// sliding window, controlled by snd_nxt && snd_una+cwnd
	newSegsCount := 0
	for k := range kcp.snd_queue {
		if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
			break
		}
		newseg := kcp.snd_queue[k]
		newseg.conv = kcp.conv
		newseg.cmd = IKCP_CMD_PUSH
		newseg.sn = kcp.snd_nxt
		kcp.snd_buf = append(kcp.snd_buf, newseg)
		kcp.snd_nxt++
		newSegsCount++
		kcp.snd_queue[k].data = nil
	}
	if newSegsCount > 0 {
		kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
	}

	// calculate resent
	resent := uint32(kcp.fastresend)
	if kcp.fastresend <= 0 {
		resent = 0xffffffff
	}

	// check for retransmissions
	current := currentMs()
	var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
	for k := range kcp.snd_buf {
		segment := &kcp.snd_buf[k]
		needsend := false
		if segment.xmit == 0 { // initial transmit
			needsend = true
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
		} else if _itimediff(current, segment.resendts) >= 0 { // RTO
			needsend = true
			if kcp.nodelay == 0 {
				segment.rto += kcp.rx_rto
			} else {
				segment.rto += kcp.rx_rto / 2
			}
			segment.resendts = current + segment.rto
			lost++
			lostSegs++
		} else if segment.fastack >= resent { // fast retransmit
			needsend = true
			segment.fastack = 0
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
			change++
			fastRetransSegs++
		} else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
			needsend = true
			segment.fastack = 0
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
			change++
			earlyRetransSegs++
		}

		if needsend {
			segment.xmit++
			segment.ts = current
			segment.wnd = seg.wnd
			segment.una = seg.una

			size := len(buffer) - len(ptr)
			need := IKCP_OVERHEAD + len(segment.data)
			if size+need > int(kcp.mtu) {
				kcp.output(buffer, size)
				current = currentMs() // time update for a blocking call
				ptr = buffer
			}

			ptr = segment.encode(ptr)
			copy(ptr, segment.data)
			ptr = ptr[len(segment.data):]

			if segment.xmit >= kcp.dead_link {
				kcp.state = 0xFFFFFFFF
			}
		}
	}

	// flush remaining segments
	size := len(buffer) - len(ptr)
	if size > 0 {
		kcp.output(buffer, size)
	}

	// counter updates
	sum := lostSegs
	if lostSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
	}
	if fastRetransSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
		sum += fastRetransSegs
	}
	if earlyRetransSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
		sum += earlyRetransSegs
	}
	if sum > 0 {
		atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
	}

	// update ssthresh
	// rate halving, https://tools.ietf.org/html/rfc6937
	if change > 0 {
		inflight := kcp.snd_nxt - kcp.snd_una
		kcp.ssthresh = inflight / 2
		if kcp.ssthresh < IKCP_THRESH_MIN {
			kcp.ssthresh = IKCP_THRESH_MIN
		}
		kcp.cwnd = kcp.ssthresh + resent
		kcp.incr = kcp.cwnd * kcp.mss
	}

	// congestion control, https://tools.ietf.org/html/rfc5681
	if lost > 0 {
		kcp.ssthresh = cwnd / 2
		if kcp.ssthresh < IKCP_THRESH_MIN {
			kcp.ssthresh = IKCP_THRESH_MIN
		}
		kcp.cwnd = 1
		kcp.incr = kcp.mss
	}

	if kcp.cwnd < 1 {
		kcp.cwnd = 1
		kcp.incr = kcp.mss
	}
}
// Update updates state (call it repeatedly, every 10ms-100ms), or ask Check
// when to call it again (absent any Input/Send calls in between). The current
// timestamp in milliseconds is obtained internally via currentMs().
func (kcp *KCP) Update() {
	var slap int32

	current := currentMs()
	if kcp.updated == 0 {
		kcp.updated = 1
		kcp.ts_flush = current
	}

	slap = _itimediff(current, kcp.ts_flush)
	if slap >= 10000 || slap < -10000 {
		kcp.ts_flush = current
		slap = 0
	}

	if slap >= 0 {
		kcp.ts_flush += kcp.interval
		if _itimediff(current, kcp.ts_flush) >= 0 {
			kcp.ts_flush = current + kcp.interval
		}
		kcp.flush(false)
	}
}

// Check determines when you should invoke Update next: it returns the
// timestamp (in milliseconds) at which Update should be called again, assuming
// no Input/Send call happens in the meantime. You can call Update at that time
// instead of calling it repeatedly; this is important to avoid unnecessary
// Update invocations. Use it to schedule Update (e.g. when implementing an
// epoll-like mechanism, or when optimizing an Update loop that handles a large
// number of kcp connections).
func (kcp *KCP) Check() uint32 {
	current := currentMs()
	ts_flush := kcp.ts_flush
	tm_flush := int32(0x7fffffff)
	tm_packet := int32(0x7fffffff)
	minimal := uint32(0)
	if kcp.updated == 0 {
		return current
	}

	if _itimediff(current, ts_flush) >= 10000 ||
		_itimediff(current, ts_flush) < -10000 {
		ts_flush = current
	}

	if _itimediff(current, ts_flush) >= 0 {
		return current
	}

	tm_flush = _itimediff(ts_flush, current)

	for k := range kcp.snd_buf {
		seg := &kcp.snd_buf[k]
		diff := _itimediff(seg.resendts, current)
		if diff <= 0 {
			return current
		}
		if diff < tm_packet {
			tm_packet = diff
		}
	}

	minimal = uint32(tm_packet)
	if tm_packet >= tm_flush {
		minimal = uint32(tm_flush)
	}
	if minimal >= kcp.interval {
		minimal = kcp.interval
	}

	return current + minimal
}
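
// The function below is an illustrative sketch added by the editor, not part
// of the original file: it drives Update from Check so that an idle connection
// is not flushed more often than needed. A real scheduler would sleep (or arm
// a timer) until the returned timestamp instead of polling currentMs(); the
// closed() callback is a stand-in for the caller's shutdown signal.
func exampleCheckDrivenUpdateSketch(kcp *KCP, closed func() bool) {
	next := kcp.Check()
	for !closed() {
		if _itimediff(currentMs(), next) >= 0 {
			kcp.Update()
			next = kcp.Check()
		}
	}
}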
// SetMtu changes MTU size, default is 1400
func (kcp *KCP) SetMtu(mtu int) int {
	if mtu < 50 || mtu < IKCP_OVERHEAD {
		return -1
	}
	buffer := make([]byte, (mtu+IKCP_OVERHEAD)*3)
	if buffer == nil {
		return -2
	}
	kcp.mtu = uint32(mtu)
	kcp.mss = kcp.mtu - IKCP_OVERHEAD
	kcp.buffer = buffer
	return 0
}

// NoDelay options
// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay: 0:disable(default), 1:enable
// interval: internal update timer interval in millisec, default is 100ms
// resend: 0:disable fast resend(default), 1:enable fast resend
// nc: 0:normal congestion control(default), 1:disable congestion control
func (kcp *KCP) NoDelay(nodelay, interval, resend, nc int) int {
	if nodelay >= 0 {
		kcp.nodelay = uint32(nodelay)
		if nodelay != 0 {
			kcp.rx_minrto = IKCP_RTO_NDL
		} else {
			kcp.rx_minrto = IKCP_RTO_MIN
		}
	}
	if interval >= 0 {
		if interval > 5000 {
			interval = 5000
		} else if interval < 10 {
			interval = 10
		}
		kcp.interval = uint32(interval)
	}
	if resend >= 0 {
		kcp.fastresend = int32(resend)
	}
	if nc >= 0 {
		kcp.nocwnd = int32(nc)
	}
	return 0
}
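
// The function below is an illustrative sketch added by the editor, not part
// of the original file: it applies the "fastest" preset named in the comment
// above when turbo is true, and otherwise only lowers the flush interval,
// relying on the behavior above that negative arguments leave the
// corresponding fields untouched.
func exampleNoDelaySketch(kcp *KCP, turbo bool) {
	if turbo {
		kcp.NoDelay(1, 20, 2, 1) // nodelay on, 20ms interval, fastresend=2, congestion control off
	} else {
		kcp.NoDelay(-1, 40, -1, -1) // only lower the interval from the 100ms default
	}
}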
// WndSize sets maximum window size: sndwnd=32, rcvwnd=32 by default
func (kcp *KCP) WndSize(sndwnd, rcvwnd int) int {
	if sndwnd > 0 {
		kcp.snd_wnd = uint32(sndwnd)
	}
	if rcvwnd > 0 {
		kcp.rcv_wnd = uint32(rcvwnd)
	}
	return 0
}

// WaitSnd gets how many packets are waiting to be sent
func (kcp *KCP) WaitSnd() int {
	return len(kcp.snd_buf) + len(kcp.snd_queue)
}

// remove front n elements from queue
func (kcp *KCP) remove_front(q []segment, n int) []segment {
	newn := copy(q, q[n:])
	for i := newn; i < len(q); i++ {
		q[i] = segment{} // manually zero the tail so the GC can reclaim data
	}
	return q[:newn]
}