// broker.go (1.7 KB)
  1. package pubsub
  2. import (
  3. "context"
  4. "sync"
  5. )
  6. const bufferSize = 64
  7. type Broker[T any] struct {
  8. subs map[chan Event[T]]struct{}
  9. mu sync.RWMutex
  10. done chan struct{}
  11. subCount int
  12. maxEvents int
  13. }
  14. func NewBroker[T any]() *Broker[T] {
  15. return NewBrokerWithOptions[T](bufferSize, 1000)
  16. }
  17. func NewBrokerWithOptions[T any](channelBufferSize, maxEvents int) *Broker[T] {
  18. b := &Broker[T]{
  19. subs: make(map[chan Event[T]]struct{}),
  20. done: make(chan struct{}),
  21. subCount: 0,
  22. maxEvents: maxEvents,
  23. }
  24. return b
  25. }
  26. func (b *Broker[T]) Shutdown() {
  27. select {
  28. case <-b.done: // Already closed
  29. return
  30. default:
  31. close(b.done)
  32. }
  33. b.mu.Lock()
  34. defer b.mu.Unlock()
  35. for ch := range b.subs {
  36. delete(b.subs, ch)
  37. close(ch)
  38. }
  39. b.subCount = 0
  40. }
  41. func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
  42. b.mu.Lock()
  43. defer b.mu.Unlock()
  44. select {
  45. case <-b.done:
  46. ch := make(chan Event[T])
  47. close(ch)
  48. return ch
  49. default:
  50. }
  51. sub := make(chan Event[T], bufferSize)
  52. b.subs[sub] = struct{}{}
  53. b.subCount++
  54. go func() {
  55. <-ctx.Done()
  56. b.mu.Lock()
  57. defer b.mu.Unlock()
  58. select {
  59. case <-b.done:
  60. return
  61. default:
  62. }
  63. delete(b.subs, sub)
  64. close(sub)
  65. b.subCount--
  66. }()
  67. return sub
  68. }
  69. func (b *Broker[T]) GetSubscriberCount() int {
  70. b.mu.RLock()
  71. defer b.mu.RUnlock()
  72. return b.subCount
  73. }
  74. func (b *Broker[T]) Publish(t EventType, payload T) {
  75. b.mu.RLock()
  76. select {
  77. case <-b.done:
  78. b.mu.RUnlock()
  79. return
  80. default:
  81. }
  82. subscribers := make([]chan Event[T], 0, len(b.subs))
  83. for sub := range b.subs {
  84. subscribers = append(subscribers, sub)
  85. }
  86. b.mu.RUnlock()
  87. event := Event[T]{Type: t, Payload: payload}
  88. for _, sub := range subscribers {
  89. select {
  90. case sub <- event:
  91. default:
  92. }
  93. }
  94. }