stats.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package stats
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/xtls/xray-core/common"
  6. "github.com/xtls/xray-core/common/errors"
  7. "github.com/xtls/xray-core/features/stats"
  8. )
  9. // Manager is an implementation of stats.Manager.
  10. type Manager struct {
  11. access sync.RWMutex
  12. counters map[string]*Counter
  13. onlineMap map[string]*OnlineMap
  14. channels map[string]*Channel
  15. running bool
  16. }
  17. // NewManager creates an instance of Statistics Manager.
  18. func NewManager(ctx context.Context, config *Config) (*Manager, error) {
  19. m := &Manager{
  20. counters: make(map[string]*Counter),
  21. onlineMap: make(map[string]*OnlineMap),
  22. channels: make(map[string]*Channel),
  23. }
  24. return m, nil
  25. }
  26. // Type implements common.HasType.
  27. func (*Manager) Type() interface{} {
  28. return stats.ManagerType()
  29. }
  30. // RegisterCounter implements stats.Manager.
  31. func (m *Manager) RegisterCounter(name string) (stats.Counter, error) {
  32. m.access.Lock()
  33. defer m.access.Unlock()
  34. if _, found := m.counters[name]; found {
  35. return nil, errors.New("Counter ", name, " already registered.")
  36. }
  37. errors.LogDebug(context.Background(), "create new counter ", name)
  38. c := new(Counter)
  39. m.counters[name] = c
  40. return c, nil
  41. }
  42. // UnregisterCounter implements stats.Manager.
  43. func (m *Manager) UnregisterCounter(name string) error {
  44. m.access.Lock()
  45. defer m.access.Unlock()
  46. if _, found := m.counters[name]; found {
  47. errors.LogDebug(context.Background(), "remove counter ", name)
  48. delete(m.counters, name)
  49. }
  50. return nil
  51. }
  52. // GetCounter implements stats.Manager.
  53. func (m *Manager) GetCounter(name string) stats.Counter {
  54. m.access.RLock()
  55. defer m.access.RUnlock()
  56. if c, found := m.counters[name]; found {
  57. return c
  58. }
  59. return nil
  60. }
  61. // VisitCounters calls visitor function on all managed counters.
  62. func (m *Manager) VisitCounters(visitor func(string, stats.Counter) bool) {
  63. m.access.RLock()
  64. defer m.access.RUnlock()
  65. for name, c := range m.counters {
  66. if !visitor(name, c) {
  67. break
  68. }
  69. }
  70. }
  71. // RegisterOnlineMap implements stats.Manager.
  72. func (m *Manager) RegisterOnlineMap(name string) (stats.OnlineMap, error) {
  73. m.access.Lock()
  74. defer m.access.Unlock()
  75. if _, found := m.onlineMap[name]; found {
  76. return nil, errors.New("onlineMap ", name, " already registered.")
  77. }
  78. errors.LogDebug(context.Background(), "create new onlineMap ", name)
  79. om := NewOnlineMap()
  80. m.onlineMap[name] = om
  81. return om, nil
  82. }
  83. // UnregisterOnlineMap implements stats.Manager.
  84. func (m *Manager) UnregisterOnlineMap(name string) error {
  85. m.access.Lock()
  86. defer m.access.Unlock()
  87. if _, found := m.onlineMap[name]; found {
  88. errors.LogDebug(context.Background(), "remove onlineMap ", name)
  89. delete(m.onlineMap, name)
  90. }
  91. return nil
  92. }
  93. // GetOnlineMap implements stats.Manager.
  94. func (m *Manager) GetOnlineMap(name string) stats.OnlineMap {
  95. m.access.RLock()
  96. defer m.access.RUnlock()
  97. if om, found := m.onlineMap[name]; found {
  98. return om
  99. }
  100. return nil
  101. }
  102. // RegisterChannel implements stats.Manager.
  103. func (m *Manager) RegisterChannel(name string) (stats.Channel, error) {
  104. m.access.Lock()
  105. defer m.access.Unlock()
  106. if _, found := m.channels[name]; found {
  107. return nil, errors.New("Channel ", name, " already registered.")
  108. }
  109. errors.LogDebug(context.Background(), "create new channel ", name)
  110. c := NewChannel(&ChannelConfig{BufferSize: 64, Blocking: false})
  111. m.channels[name] = c
  112. if m.running {
  113. return c, c.Start()
  114. }
  115. return c, nil
  116. }
  117. // UnregisterChannel implements stats.Manager.
  118. func (m *Manager) UnregisterChannel(name string) error {
  119. m.access.Lock()
  120. defer m.access.Unlock()
  121. if c, found := m.channels[name]; found {
  122. errors.LogDebug(context.Background(), "remove channel ", name)
  123. delete(m.channels, name)
  124. return c.Close()
  125. }
  126. return nil
  127. }
  128. // GetChannel implements stats.Manager.
  129. func (m *Manager) GetChannel(name string) stats.Channel {
  130. m.access.RLock()
  131. defer m.access.RUnlock()
  132. if c, found := m.channels[name]; found {
  133. return c
  134. }
  135. return nil
  136. }
  137. // Start implements common.Runnable.
  138. func (m *Manager) Start() error {
  139. m.access.Lock()
  140. defer m.access.Unlock()
  141. m.running = true
  142. errs := []error{}
  143. for _, channel := range m.channels {
  144. if err := channel.Start(); err != nil {
  145. errs = append(errs, err)
  146. }
  147. }
  148. if len(errs) != 0 {
  149. return errors.Combine(errs...)
  150. }
  151. return nil
  152. }
  153. // Close implement common.Closable.
  154. func (m *Manager) Close() error {
  155. m.access.Lock()
  156. defer m.access.Unlock()
  157. m.running = false
  158. errs := []error{}
  159. for name, channel := range m.channels {
  160. errors.LogDebug(context.Background(), "remove channel ", name)
  161. delete(m.channels, name)
  162. if err := channel.Close(); err != nil {
  163. errs = append(errs, err)
  164. }
  165. }
  166. if len(errs) != 0 {
  167. return errors.Combine(errs...)
  168. }
  169. return nil
  170. }
  171. func init() {
  172. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  173. return NewManager(ctx, config.(*Config))
  174. }))
  175. }