traffic.go 7.2 KB


  1. package ssmapi
  2. import (
  3. "net"
  4. "sync"
  5. "sync/atomic"
  6. "github.com/sagernet/sing-box/adapter"
  7. "github.com/sagernet/sing/common/bufio"
  8. N "github.com/sagernet/sing/common/network"
  9. )
  10. var _ adapter.SSMTracker = (*TrafficManager)(nil)
  11. type TrafficManager struct {
  12. globalUplink atomic.Int64
  13. globalDownlink atomic.Int64
  14. globalUplinkPackets atomic.Int64
  15. globalDownlinkPackets atomic.Int64
  16. globalTCPSessions atomic.Int64
  17. globalUDPSessions atomic.Int64
  18. userAccess sync.Mutex
  19. userUplink map[string]*atomic.Int64
  20. userDownlink map[string]*atomic.Int64
  21. userUplinkPackets map[string]*atomic.Int64
  22. userDownlinkPackets map[string]*atomic.Int64
  23. userTCPSessions map[string]*atomic.Int64
  24. userUDPSessions map[string]*atomic.Int64
  25. }
  26. func NewTrafficManager() *TrafficManager {
  27. manager := &TrafficManager{
  28. userUplink: make(map[string]*atomic.Int64),
  29. userDownlink: make(map[string]*atomic.Int64),
  30. userUplinkPackets: make(map[string]*atomic.Int64),
  31. userDownlinkPackets: make(map[string]*atomic.Int64),
  32. userTCPSessions: make(map[string]*atomic.Int64),
  33. userUDPSessions: make(map[string]*atomic.Int64),
  34. }
  35. return manager
  36. }
  37. func (s *TrafficManager) UpdateUsers(users []string) {
  38. s.userAccess.Lock()
  39. defer s.userAccess.Unlock()
  40. newUserUplink := make(map[string]*atomic.Int64)
  41. newUserDownlink := make(map[string]*atomic.Int64)
  42. newUserUplinkPackets := make(map[string]*atomic.Int64)
  43. newUserDownlinkPackets := make(map[string]*atomic.Int64)
  44. newUserTCPSessions := make(map[string]*atomic.Int64)
  45. newUserUDPSessions := make(map[string]*atomic.Int64)
  46. for _, user := range users {
  47. if counter, loaded := s.userUplink[user]; loaded {
  48. newUserUplink[user] = counter
  49. }
  50. if counter, loaded := s.userDownlink[user]; loaded {
  51. newUserDownlink[user] = counter
  52. }
  53. if counter, loaded := s.userUplinkPackets[user]; loaded {
  54. newUserUplinkPackets[user] = counter
  55. }
  56. if counter, loaded := s.userDownlinkPackets[user]; loaded {
  57. newUserDownlinkPackets[user] = counter
  58. }
  59. if counter, loaded := s.userTCPSessions[user]; loaded {
  60. newUserTCPSessions[user] = counter
  61. }
  62. if counter, loaded := s.userUDPSessions[user]; loaded {
  63. newUserUDPSessions[user] = counter
  64. }
  65. }
  66. s.userUplink = newUserUplink
  67. s.userDownlink = newUserDownlink
  68. s.userUplinkPackets = newUserUplinkPackets
  69. s.userDownlinkPackets = newUserDownlinkPackets
  70. s.userTCPSessions = newUserTCPSessions
  71. s.userUDPSessions = newUserUDPSessions
  72. }
  73. func (s *TrafficManager) userCounter(user string) (*atomic.Int64, *atomic.Int64, *atomic.Int64, *atomic.Int64, *atomic.Int64, *atomic.Int64) {
  74. s.userAccess.Lock()
  75. defer s.userAccess.Unlock()
  76. upCounter, loaded := s.userUplink[user]
  77. if !loaded {
  78. upCounter = new(atomic.Int64)
  79. s.userUplink[user] = upCounter
  80. }
  81. downCounter, loaded := s.userDownlink[user]
  82. if !loaded {
  83. downCounter = new(atomic.Int64)
  84. s.userDownlink[user] = downCounter
  85. }
  86. upPacketsCounter, loaded := s.userUplinkPackets[user]
  87. if !loaded {
  88. upPacketsCounter = new(atomic.Int64)
  89. s.userUplinkPackets[user] = upPacketsCounter
  90. }
  91. downPacketsCounter, loaded := s.userDownlinkPackets[user]
  92. if !loaded {
  93. downPacketsCounter = new(atomic.Int64)
  94. s.userDownlinkPackets[user] = downPacketsCounter
  95. }
  96. tcpSessionsCounter, loaded := s.userTCPSessions[user]
  97. if !loaded {
  98. tcpSessionsCounter = new(atomic.Int64)
  99. s.userTCPSessions[user] = tcpSessionsCounter
  100. }
  101. udpSessionsCounter, loaded := s.userUDPSessions[user]
  102. if !loaded {
  103. udpSessionsCounter = new(atomic.Int64)
  104. s.userUDPSessions[user] = udpSessionsCounter
  105. }
  106. return upCounter, downCounter, upPacketsCounter, downPacketsCounter, tcpSessionsCounter, udpSessionsCounter
  107. }
  108. func (s *TrafficManager) TrackConnection(conn net.Conn, metadata adapter.InboundContext) net.Conn {
  109. s.globalTCPSessions.Add(1)
  110. var readCounter []*atomic.Int64
  111. var writeCounter []*atomic.Int64
  112. readCounter = append(readCounter, &s.globalUplink)
  113. writeCounter = append(writeCounter, &s.globalDownlink)
  114. upCounter, downCounter, _, _, tcpSessionCounter, _ := s.userCounter(metadata.User)
  115. readCounter = append(readCounter, upCounter)
  116. writeCounter = append(writeCounter, downCounter)
  117. tcpSessionCounter.Add(1)
  118. return bufio.NewInt64CounterConn(conn, readCounter, writeCounter)
  119. }
  120. func (s *TrafficManager) TrackPacketConnection(conn N.PacketConn, metadata adapter.InboundContext) N.PacketConn {
  121. s.globalUDPSessions.Add(1)
  122. var readCounter []*atomic.Int64
  123. var readPacketCounter []*atomic.Int64
  124. var writeCounter []*atomic.Int64
  125. var writePacketCounter []*atomic.Int64
  126. readCounter = append(readCounter, &s.globalUplink)
  127. writeCounter = append(writeCounter, &s.globalDownlink)
  128. readPacketCounter = append(readPacketCounter, &s.globalUplinkPackets)
  129. writePacketCounter = append(writePacketCounter, &s.globalDownlinkPackets)
  130. upCounter, downCounter, upPacketsCounter, downPacketsCounter, _, udpSessionCounter := s.userCounter(metadata.User)
  131. readCounter = append(readCounter, upCounter)
  132. writeCounter = append(writeCounter, downCounter)
  133. readPacketCounter = append(readPacketCounter, upPacketsCounter)
  134. writePacketCounter = append(writePacketCounter, downPacketsCounter)
  135. udpSessionCounter.Add(1)
  136. return bufio.NewInt64CounterPacketConn(conn, readCounter, readPacketCounter, writeCounter, writePacketCounter)
  137. }
  138. func (s *TrafficManager) ReadUser(user *UserObject) {
  139. s.userAccess.Lock()
  140. defer s.userAccess.Unlock()
  141. s.readUser(user, false)
  142. }
  143. func (s *TrafficManager) readUser(user *UserObject, swap bool) {
  144. if counter, loaded := s.userUplink[user.UserName]; loaded {
  145. if swap {
  146. user.UplinkBytes = counter.Swap(0)
  147. } else {
  148. user.UplinkBytes = counter.Load()
  149. }
  150. }
  151. if counter, loaded := s.userDownlink[user.UserName]; loaded {
  152. if swap {
  153. user.DownlinkBytes = counter.Swap(0)
  154. } else {
  155. user.DownlinkBytes = counter.Load()
  156. }
  157. }
  158. if counter, loaded := s.userUplinkPackets[user.UserName]; loaded {
  159. if swap {
  160. user.UplinkPackets = counter.Swap(0)
  161. } else {
  162. user.UplinkPackets = counter.Load()
  163. }
  164. }
  165. if counter, loaded := s.userDownlinkPackets[user.UserName]; loaded {
  166. if swap {
  167. user.DownlinkPackets = counter.Swap(0)
  168. } else {
  169. user.DownlinkPackets = counter.Load()
  170. }
  171. }
  172. if counter, loaded := s.userTCPSessions[user.UserName]; loaded {
  173. if swap {
  174. user.TCPSessions = counter.Swap(0)
  175. } else {
  176. user.TCPSessions = counter.Load()
  177. }
  178. }
  179. if counter, loaded := s.userUDPSessions[user.UserName]; loaded {
  180. if swap {
  181. user.UDPSessions = counter.Swap(0)
  182. } else {
  183. user.UDPSessions = counter.Load()
  184. }
  185. }
  186. }
  187. func (s *TrafficManager) ReadUsers(users []*UserObject, swap bool) {
  188. s.userAccess.Lock()
  189. defer s.userAccess.Unlock()
  190. for _, user := range users {
  191. s.readUser(user, swap)
  192. }
  193. }
  194. func (s *TrafficManager) ReadGlobal(swap bool) (uplinkBytes int64, downlinkBytes int64, uplinkPackets int64, downlinkPackets int64, tcpSessions int64, udpSessions int64) {
  195. if swap {
  196. return s.globalUplink.Swap(0),
  197. s.globalDownlink.Swap(0),
  198. s.globalUplinkPackets.Swap(0),
  199. s.globalDownlinkPackets.Swap(0),
  200. s.globalTCPSessions.Swap(0),
  201. s.globalUDPSessions.Swap(0)
  202. } else {
  203. return s.globalUplink.Load(),
  204. s.globalDownlink.Load(),
  205. s.globalUplinkPackets.Load(),
  206. s.globalDownlinkPackets.Load(),
  207. s.globalTCPSessions.Load(),
  208. s.globalUDPSessions.Load()
  209. }
  210. }