| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- package handler
- import (
- "bufio"
- "encoding/binary"
- "net"
- "time"
- "github.com/bjdgyc/anylink/base"
- "github.com/bjdgyc/anylink/pkg/utils"
- "github.com/bjdgyc/anylink/sessdata"
- )
- func LinkCstp(conn net.Conn, bufRW *bufio.ReadWriter, cSess *sessdata.ConnSession) {
- base.Debug("LinkCstp connect ip:", cSess.IpAddr, "user:", cSess.Username, "rip:", conn.RemoteAddr())
- defer func() {
- base.Debug("LinkCstp return", cSess.IpAddr)
- _ = conn.Close()
- cSess.Close()
- }()
- var (
- err error
- n int
- dataLen uint16
- dead = time.Duration(cSess.CstpDpd+5) * time.Second
- )
- go cstpWrite(conn, bufRW, cSess)
- for {
- // 设置超时限制
- err = conn.SetReadDeadline(utils.NowSec().Add(dead))
- if err != nil {
- base.Error("SetDeadline: ", err)
- return
- }
- // hdata := make([]byte, BufferSize)
- pl := getPayload()
- n, err = bufRW.Read(pl.Data)
- if err != nil {
- base.Error("read hdata: ", err)
- return
- }
- // 限流设置
- err = cSess.RateLimit(n, true)
- if err != nil {
- base.Error(err)
- }
- switch pl.Data[6] {
- case 0x07: // KEEPALIVE
- // do nothing
- // base.Debug("recv keepalive", cSess.IpAddr)
- case 0x05: // DISCONNECT
- base.Debug("DISCONNECT", cSess.IpAddr)
- return
- case 0x03: // DPD-REQ
- // base.Debug("recv DPD-REQ", cSess.IpAddr)
- pl.PType = 0x04
- if payloadOutCstp(cSess, pl) {
- return
- }
- case 0x04:
- // log.Println("recv DPD-RESP")
- case 0x00: // DATA
- // 获取数据长度
- dataLen = binary.BigEndian.Uint16(pl.Data[4:6]) // 4,5
- // 修复 cstp 数据长度溢出报错
- if 8+dataLen > BufferSize {
- base.Error("recv error dataLen", dataLen)
- continue
- }
- // 去除数据头
- copy(pl.Data, pl.Data[8:8+dataLen])
- // 更新切片长度
- pl.Data = pl.Data[:dataLen]
- // pl.Data = append(pl.Data[:0], pl.Data[8:8+dataLen]...)
- if payloadIn(cSess, pl) {
- return
- }
- }
- }
- }
- func cstpWrite(conn net.Conn, bufRW *bufio.ReadWriter, cSess *sessdata.ConnSession) {
- defer func() {
- base.Debug("cstpWrite return", cSess.IpAddr)
- _ = conn.Close()
- cSess.Close()
- }()
- var (
- err error
- n int
- pl *sessdata.Payload
- )
- for {
- select {
- case pl = <-cSess.PayloadOutCstp:
- case <-cSess.CloseChan:
- return
- }
- if pl.LType != sessdata.LTypeIPData {
- continue
- }
- if pl.PType == 0x00 {
- // 获取数据长度
- l := len(pl.Data)
- // 先扩容 +8
- pl.Data = pl.Data[:l+8]
- // 数据后移
- copy(pl.Data[8:], pl.Data)
- // 添加头信息
- copy(pl.Data[:8], plHeader)
- // 更新头长度
- binary.BigEndian.PutUint16(pl.Data[4:6], uint16(l))
- } else {
- pl.Data = append(pl.Data[:0], plHeader...)
- // 设置头类型
- pl.Data[6] = pl.PType
- }
- n, err = conn.Write(pl.Data)
- if err != nil {
- base.Error("write err", err)
- return
- }
- putPayload(pl)
- // 限流设置
- err = cSess.RateLimit(n, false)
- if err != nil {
- base.Error(err)
- }
- }
- }
|