link_cstp.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package handler
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "net"
  6. "time"
  7. "github.com/bjdgyc/anylink/base"
  8. "github.com/bjdgyc/anylink/dbdata"
  9. "github.com/bjdgyc/anylink/pkg/utils"
  10. "github.com/bjdgyc/anylink/sessdata"
  11. )
  12. func LinkCstp(conn net.Conn, bufRW *bufio.ReadWriter, cSess *sessdata.ConnSession) {
  13. base.Debug("LinkCstp connect ip:", cSess.IpAddr, "user:", cSess.Username, "rip:", conn.RemoteAddr())
  14. defer func() {
  15. base.Debug("LinkCstp return", cSess.Username, cSess.IpAddr)
  16. _ = conn.Close()
  17. cSess.Close()
  18. }()
  19. var (
  20. err error
  21. n int
  22. dataLen uint16
  23. dead = time.Duration(cSess.CstpDpd+5) * time.Second
  24. )
  25. go cstpWrite(conn, bufRW, cSess)
  26. for {
  27. // 设置超时限制
  28. err = conn.SetReadDeadline(utils.NowSec().Add(dead))
  29. if err != nil {
  30. base.Error("SetDeadline: ", cSess.Username, err)
  31. return
  32. }
  33. // hdata := make([]byte, BufferSize)
  34. pl := getPayload()
  35. n, err = bufRW.Read(pl.Data)
  36. if err != nil {
  37. base.Error("read hdata: ", cSess.Username, err)
  38. return
  39. }
  40. // 限流设置
  41. err = cSess.RateLimit(n, true)
  42. if err != nil {
  43. base.Error(err)
  44. }
  45. switch pl.Data[6] {
  46. case 0x07: // KEEPALIVE
  47. // do nothing
  48. // base.Debug("recv keepalive", cSess.IpAddr)
  49. case 0x05: // DISCONNECT
  50. cSess.UserLogoutCode = dbdata.UserLogoutClient
  51. base.Debug("DISCONNECT", cSess.Username, cSess.IpAddr)
  52. return
  53. case 0x03: // DPD-REQ
  54. // base.Debug("recv DPD-REQ", cSess.IpAddr)
  55. pl.PType = 0x04
  56. if payloadOutCstp(cSess, pl) {
  57. return
  58. }
  59. case 0x04:
  60. // log.Println("recv DPD-RESP")
  61. case 0x08: // decompress
  62. if cSess.CstpPickCmp == nil {
  63. continue
  64. }
  65. dst := getByteFull()
  66. nn, err := cSess.CstpPickCmp.Uncompress(pl.Data[8:], *dst)
  67. if err != nil {
  68. putByte(dst)
  69. base.Error("cstp decompress error", err, nn)
  70. continue
  71. }
  72. binary.BigEndian.PutUint16(pl.Data[4:6], uint16(nn))
  73. pl.Data = append(pl.Data[:8], (*dst)[:nn]...)
  74. putByte(dst)
  75. fallthrough
  76. case 0x00: // DATA
  77. // 获取数据长度
  78. dataLen = binary.BigEndian.Uint16(pl.Data[4:6]) // 4,5
  79. // 修复 cstp 数据长度溢出报错
  80. if 8+dataLen > BufferSize {
  81. base.Error("recv error dataLen", cSess.Username, dataLen)
  82. continue
  83. }
  84. // 去除数据头
  85. copy(pl.Data, pl.Data[8:8+dataLen])
  86. // 更新切片长度
  87. pl.Data = pl.Data[:dataLen]
  88. // pl.Data = append(pl.Data[:0], pl.Data[8:8+dataLen]...)
  89. if payloadIn(cSess, pl) {
  90. return
  91. }
  92. }
  93. }
  94. }
  95. func cstpWrite(conn net.Conn, bufRW *bufio.ReadWriter, cSess *sessdata.ConnSession) {
  96. defer func() {
  97. base.Debug("cstpWrite return", cSess.Username, cSess.IpAddr)
  98. _ = conn.Close()
  99. cSess.Close()
  100. }()
  101. var (
  102. err error
  103. n int
  104. pl *sessdata.Payload
  105. )
  106. for {
  107. select {
  108. case pl = <-cSess.PayloadOutCstp:
  109. case <-cSess.CloseChan:
  110. return
  111. }
  112. if pl.LType != sessdata.LTypeIPData {
  113. continue
  114. }
  115. if pl.PType == 0x00 {
  116. isCompress := false
  117. if cSess.CstpPickCmp != nil && len(pl.Data) > base.Cfg.NoCompressLimit {
  118. dst := getByteFull()
  119. size, err := cSess.CstpPickCmp.Compress(pl.Data, (*dst)[8:])
  120. if err == nil && size < len(pl.Data) {
  121. copy((*dst)[:8], plHeader)
  122. binary.BigEndian.PutUint16((*dst)[4:6], uint16(size))
  123. (*dst)[6] = 0x08
  124. pl.Data = append(pl.Data[:0], (*dst)[:size+8]...)
  125. isCompress = true
  126. }
  127. putByte(dst)
  128. }
  129. if !isCompress {
  130. // 获取数据长度
  131. l := len(pl.Data)
  132. // 先扩容 +8
  133. pl.Data = pl.Data[:l+8]
  134. // 数据后移
  135. copy(pl.Data[8:], pl.Data)
  136. // 添加头信息
  137. copy(pl.Data[:8], plHeader)
  138. // 更新头长度
  139. binary.BigEndian.PutUint16(pl.Data[4:6], uint16(l))
  140. }
  141. } else {
  142. pl.Data = append(pl.Data[:0], plHeader...)
  143. // 设置头类型
  144. pl.Data[6] = pl.PType
  145. }
  146. n, err = conn.Write(pl.Data)
  147. if err != nil {
  148. base.Error("write err", cSess.Username, err)
  149. return
  150. }
  151. putPayload(pl)
  152. // 限流设置
  153. err = cSess.RateLimit(n, false)
  154. if err != nil {
  155. base.Error(err)
  156. }
  157. }
  158. }