link_cstp.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package handler
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "net"
  6. "time"
  7. "github.com/bjdgyc/anylink/base"
  8. "github.com/bjdgyc/anylink/pkg/utils"
  9. "github.com/bjdgyc/anylink/sessdata"
  10. )
  11. func LinkCstp(conn net.Conn, bufRW *bufio.ReadWriter, cSess *sessdata.ConnSession) {
  12. base.Debug("LinkCstp connect ip:", cSess.IpAddr, "user:", cSess.Username, "rip:", conn.RemoteAddr())
  13. defer func() {
  14. base.Debug("LinkCstp return", cSess.IpAddr)
  15. _ = conn.Close()
  16. cSess.Close()
  17. }()
  18. var (
  19. err error
  20. n int
  21. dataLen uint16
  22. dead = time.Duration(cSess.CstpDpd+5) * time.Second
  23. )
  24. go cstpWrite(conn, bufRW, cSess)
  25. for {
  26. // 设置超时限制
  27. err = conn.SetReadDeadline(utils.NowSec().Add(dead))
  28. if err != nil {
  29. base.Error("SetDeadline: ", err)
  30. return
  31. }
  32. // hdata := make([]byte, BufferSize)
  33. pl := getPayload()
  34. n, err = bufRW.Read(pl.Data)
  35. if err != nil {
  36. base.Error("read hdata: ", err)
  37. return
  38. }
  39. // 限流设置
  40. err = cSess.RateLimit(n, true)
  41. if err != nil {
  42. base.Error(err)
  43. }
  44. switch pl.Data[6] {
  45. case 0x07: // KEEPALIVE
  46. // do nothing
  47. // base.Debug("recv keepalive", cSess.IpAddr)
  48. case 0x05: // DISCONNECT
  49. base.Debug("DISCONNECT", cSess.IpAddr)
  50. return
  51. case 0x03: // DPD-REQ
  52. // base.Debug("recv DPD-REQ", cSess.IpAddr)
  53. pl.PType = 0x04
  54. if payloadOutCstp(cSess, pl) {
  55. return
  56. }
  57. case 0x04:
  58. // log.Println("recv DPD-RESP")
  59. case 0x00: // DATA
  60. // 获取数据长度
  61. dataLen = binary.BigEndian.Uint16(pl.Data[4:6]) // 4,5
  62. // 修复 cstp 数据长度溢出报错
  63. if 8+dataLen > BufferSize {
  64. base.Error("recv error dataLen", dataLen)
  65. continue
  66. }
  67. // 去除数据头
  68. copy(pl.Data, pl.Data[8:8+dataLen])
  69. // 更新切片长度
  70. pl.Data = pl.Data[:dataLen]
  71. // pl.Data = append(pl.Data[:0], pl.Data[8:8+dataLen]...)
  72. if payloadIn(cSess, pl) {
  73. return
  74. }
  75. }
  76. }
  77. }
  78. func cstpWrite(conn net.Conn, bufRW *bufio.ReadWriter, cSess *sessdata.ConnSession) {
  79. defer func() {
  80. base.Debug("cstpWrite return", cSess.IpAddr)
  81. _ = conn.Close()
  82. cSess.Close()
  83. }()
  84. var (
  85. err error
  86. n int
  87. pl *sessdata.Payload
  88. )
  89. for {
  90. select {
  91. case pl = <-cSess.PayloadOutCstp:
  92. case <-cSess.CloseChan:
  93. return
  94. }
  95. if pl.LType != sessdata.LTypeIPData {
  96. continue
  97. }
  98. if pl.PType == 0x00 {
  99. // 获取数据长度
  100. l := len(pl.Data)
  101. // 先扩容 +8
  102. pl.Data = pl.Data[:l+8]
  103. // 数据后移
  104. copy(pl.Data[8:], pl.Data)
  105. // 添加头信息
  106. copy(pl.Data[:8], plHeader)
  107. // 更新头长度
  108. binary.BigEndian.PutUint16(pl.Data[4:6], uint16(l))
  109. } else {
  110. pl.Data = append(pl.Data[:0], plHeader...)
  111. // 设置头类型
  112. pl.Data[6] = pl.PType
  113. }
  114. n, err = conn.Write(pl.Data)
  115. if err != nil {
  116. base.Error("write err", err)
  117. return
  118. }
  119. putPayload(pl)
  120. // 限流设置
  121. err = cSess.RateLimit(n, false)
  122. if err != nil {
  123. base.Error(err)
  124. }
  125. }
  126. }