pubsub.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package pubsub
  2. import (
  3. "errors"
  4. "sync"
  5. "time"
  6. "github.com/xtls/xray-core/common"
  7. "github.com/xtls/xray-core/common/signal/done"
  8. "github.com/xtls/xray-core/common/task"
  9. )
  10. type Subscriber struct {
  11. buffer chan interface{}
  12. done *done.Instance
  13. }
  14. func (s *Subscriber) push(msg interface{}) {
  15. select {
  16. case s.buffer <- msg:
  17. default:
  18. }
  19. }
  20. func (s *Subscriber) Wait() <-chan interface{} {
  21. return s.buffer
  22. }
  23. func (s *Subscriber) Close() error {
  24. return s.done.Close()
  25. }
  26. func (s *Subscriber) IsClosed() bool {
  27. return s.done.Done()
  28. }
  29. type Service struct {
  30. sync.RWMutex
  31. subs map[string][]*Subscriber
  32. ctask *task.Periodic
  33. }
  34. func NewService() *Service {
  35. s := &Service{
  36. subs: make(map[string][]*Subscriber),
  37. }
  38. s.ctask = &task.Periodic{
  39. Execute: s.Cleanup,
  40. Interval: time.Second * 30,
  41. }
  42. return s
  43. }
  44. // Cleanup cleans up internal caches of subscribers.
  45. // Visible for testing only.
  46. func (s *Service) Cleanup() error {
  47. s.Lock()
  48. defer s.Unlock()
  49. if len(s.subs) == 0 {
  50. return errors.New("nothing to do")
  51. }
  52. for name, subs := range s.subs {
  53. newSub := make([]*Subscriber, 0, len(s.subs))
  54. for _, sub := range subs {
  55. if !sub.IsClosed() {
  56. newSub = append(newSub, sub)
  57. }
  58. }
  59. if len(newSub) == 0 {
  60. delete(s.subs, name)
  61. } else {
  62. s.subs[name] = newSub
  63. }
  64. }
  65. if len(s.subs) == 0 {
  66. s.subs = make(map[string][]*Subscriber)
  67. }
  68. return nil
  69. }
  70. func (s *Service) Subscribe(name string) *Subscriber {
  71. sub := &Subscriber{
  72. buffer: make(chan interface{}, 16),
  73. done: done.New(),
  74. }
  75. s.Lock()
  76. s.subs[name] = append(s.subs[name], sub)
  77. s.Unlock()
  78. common.Must(s.ctask.Start())
  79. return sub
  80. }
  81. func (s *Service) Publish(name string, message interface{}) {
  82. s.RLock()
  83. defer s.RUnlock()
  84. for _, sub := range s.subs[name] {
  85. if !sub.IsClosed() {
  86. sub.push(message)
  87. }
  88. }
  89. }