payload_access_audit.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package handler
  2. import (
  3. "crypto/md5"
  4. "encoding/binary"
  5. "time"
  6. "github.com/bjdgyc/anylink/base"
  7. "github.com/bjdgyc/anylink/dbdata"
  8. "github.com/bjdgyc/anylink/pkg/utils"
  9. "github.com/bjdgyc/anylink/sessdata"
  10. "github.com/ivpusic/grpool"
  11. "github.com/songgao/water/waterutil"
  12. )
  // Access-protocol codes recorded in AccessAudit.AccessProto.
  // The numeric values are spelled out so that reordering the list can never
  // silently change what a code means.
  const (
  	acc_proto_udp   = 1 // plain UDP traffic
  	acc_proto_tcp   = 2 // plain TCP traffic
  	acc_proto_https = 3 // TCP traffic classified as HTTPS
  	acc_proto_http  = 4 // TCP traffic classified as HTTP
  )
  19. var (
  20. auditPayload *AuditPayload
  21. logBatch *LogBatch
  22. )
  23. // 分析审计日志
  24. type AuditPayload struct {
  25. Pool *grpool.Pool
  26. IpAuditMap utils.IMaps
  27. }
  28. // 保存审计日志
  29. type LogBatch struct {
  30. Logs []dbdata.AccessAudit
  31. LogChan chan dbdata.AccessAudit
  32. }
  33. // 异步写入pool
  34. func (p *AuditPayload) Add(userName string, pl *sessdata.Payload) {
  35. select {
  36. case p.Pool.JobQueue <- func() {
  37. logAudit(userName, pl)
  38. }:
  39. default:
  40. putPayload(pl)
  41. base.Error("AccessAudit: AuditPayload channel is full")
  42. }
  43. }
  44. // 数据落盘
  45. func (l *LogBatch) Write() {
  46. if len(l.Logs) == 0 {
  47. return
  48. }
  49. _ = dbdata.AddBatch(l.Logs)
  50. l.Reset()
  51. }
  52. // 清空数据
  53. func (l *LogBatch) Reset() {
  54. l.Logs = []dbdata.AccessAudit{}
  55. }
  56. // 开启批量写入数据功能
  57. func logAuditBatch() {
  58. if base.Cfg.AuditInterval < 0 {
  59. return
  60. }
  61. auditPayload = &AuditPayload{
  62. Pool: grpool.NewPool(10, 10240),
  63. IpAuditMap: utils.NewMap("cmap", 0),
  64. }
  65. logBatch = &LogBatch{
  66. LogChan: make(chan dbdata.AccessAudit, 10240),
  67. }
  68. var (
  69. limit = 100 // 超过上限批量写入数据表
  70. outTime = time.NewTimer(time.Second)
  71. accessAudit = dbdata.AccessAudit{}
  72. )
  73. for {
  74. // 重置超时 时间
  75. outTime.Reset(time.Second * 1)
  76. select {
  77. case accessAudit = <-logBatch.LogChan:
  78. logBatch.Logs = append(logBatch.Logs, accessAudit)
  79. if len(logBatch.Logs) >= limit {
  80. if !outTime.Stop() {
  81. <-outTime.C
  82. }
  83. logBatch.Write()
  84. }
  85. case <-outTime.C:
  86. logBatch.Write()
  87. }
  88. }
  89. }
  90. // 解析IP包的数据
  91. func logAudit(userName string, pl *sessdata.Payload) {
  92. defer putPayload(pl)
  93. if !(pl.LType == sessdata.LTypeIPData && pl.PType == 0x00) {
  94. return
  95. }
  96. ipProto := waterutil.IPv4Protocol(pl.Data)
  97. // 访问协议
  98. var accessProto uint8
  99. // 只统计 tcp和udp 的访问
  100. switch ipProto {
  101. case waterutil.TCP:
  102. accessProto = acc_proto_tcp
  103. case waterutil.UDP:
  104. accessProto = acc_proto_udp
  105. default:
  106. return
  107. }
  108. ipSrc := waterutil.IPv4Source(pl.Data)
  109. ipDst := waterutil.IPv4Destination(pl.Data)
  110. ipPort := waterutil.IPv4DestinationPort(pl.Data)
  111. b := getByte51()
  112. key := *b
  113. copy(key[:16], ipSrc)
  114. copy(key[16:32], ipDst)
  115. binary.BigEndian.PutUint16(key[32:34], ipPort)
  116. key[34] = byte(accessProto)
  117. copy(key[35:51], []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})
  118. info := ""
  119. nu := utils.NowSec().Unix()
  120. if ipProto == waterutil.TCP {
  121. tcpPlData := waterutil.IPv4Payload(pl.Data)
  122. // 24 (ACK PSH)
  123. if len(tcpPlData) < 14 || tcpPlData[13] != 24 {
  124. return
  125. }
  126. accessProto, info = onTCP(tcpPlData)
  127. // HTTPS or HTTP
  128. if accessProto != acc_proto_tcp {
  129. // 提前存储只含ip数据的key, 避免即记录域名又记录一笔IP数据的记录
  130. ipKey := make([]byte, 51)
  131. copy(ipKey, key)
  132. ipS := utils.BytesToString(ipKey)
  133. auditPayload.IpAuditMap.Set(ipS, nu)
  134. key[34] = byte(accessProto)
  135. // 存储含域名的key
  136. if info != "" {
  137. md5Sum := md5.Sum([]byte(info))
  138. copy(key[35:51], md5Sum[:])
  139. }
  140. }
  141. }
  142. s := utils.BytesToString(key)
  143. // 判断已经存在,并且没有过期
  144. v, ok := auditPayload.IpAuditMap.Get(s)
  145. if ok && nu-v.(int64) < int64(base.Cfg.AuditInterval) {
  146. // 回收byte对象
  147. putByte51(b)
  148. return
  149. }
  150. auditPayload.IpAuditMap.Set(s, nu)
  151. audit := dbdata.AccessAudit{
  152. Username: userName,
  153. Protocol: uint8(ipProto),
  154. Src: ipSrc.String(),
  155. Dst: ipDst.String(),
  156. DstPort: ipPort,
  157. CreatedAt: utils.NowSec(),
  158. AccessProto: accessProto,
  159. Info: info,
  160. }
  161. select {
  162. case logBatch.LogChan <- audit:
  163. default:
  164. base.Error("AccessAudit: LogChan channel is full")
  165. return
  166. }
  167. }