broker.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package pubsub
  2. import (
  3. "context"
  4. "fmt"
  5. "log/slog"
  6. "sync"
  7. "time"
  8. )
  9. const defaultChannelBufferSize = 100
  10. type Broker[T any] struct {
  11. subs map[chan Event[T]]context.CancelFunc
  12. mu sync.RWMutex
  13. isClosed bool
  14. }
  15. func NewBroker[T any]() *Broker[T] {
  16. return &Broker[T]{
  17. subs: make(map[chan Event[T]]context.CancelFunc),
  18. }
  19. }
  20. func (b *Broker[T]) Shutdown() {
  21. b.mu.Lock()
  22. if b.isClosed {
  23. b.mu.Unlock()
  24. return
  25. }
  26. b.isClosed = true
  27. for ch, cancel := range b.subs {
  28. cancel()
  29. close(ch)
  30. delete(b.subs, ch)
  31. }
  32. b.mu.Unlock()
  33. slog.Debug("PubSub broker shut down", "type", fmt.Sprintf("%T", *new(T)))
  34. }
  35. func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
  36. b.mu.Lock()
  37. defer b.mu.Unlock()
  38. if b.isClosed {
  39. closedCh := make(chan Event[T])
  40. close(closedCh)
  41. return closedCh
  42. }
  43. subCtx, subCancel := context.WithCancel(ctx)
  44. subscriberChannel := make(chan Event[T], defaultChannelBufferSize)
  45. b.subs[subscriberChannel] = subCancel
  46. go func() {
  47. <-subCtx.Done()
  48. b.mu.Lock()
  49. defer b.mu.Unlock()
  50. if _, ok := b.subs[subscriberChannel]; ok {
  51. close(subscriberChannel)
  52. delete(b.subs, subscriberChannel)
  53. }
  54. }()
  55. return subscriberChannel
  56. }
  57. func (b *Broker[T]) Publish(eventType EventType, payload T) {
  58. b.mu.RLock()
  59. defer b.mu.RUnlock()
  60. if b.isClosed {
  61. slog.Warn("Attempted to publish on a closed pubsub broker", "type", eventType, "payload_type", fmt.Sprintf("%T", payload))
  62. return
  63. }
  64. event := Event[T]{Type: eventType, Payload: payload}
  65. for ch := range b.subs {
  66. // Non-blocking send with a fallback to a goroutine to prevent slow subscribers
  67. // from blocking the publisher.
  68. select {
  69. case ch <- event:
  70. // Successfully sent
  71. default:
  72. // Subscriber channel is full or receiver is slow.
  73. // Send in a new goroutine to avoid blocking the publisher.
  74. // This might lead to out-of-order delivery for this specific slow subscriber.
  75. go func(sChan chan Event[T], ev Event[T]) {
  76. // Re-check if broker is closed before attempting send in goroutine
  77. b.mu.RLock()
  78. isBrokerClosed := b.isClosed
  79. b.mu.RUnlock()
  80. if isBrokerClosed {
  81. return
  82. }
  83. select {
  84. case sChan <- ev:
  85. case <-time.After(2 * time.Second): // Timeout for slow subscriber
  86. slog.Warn("PubSub: Dropped event for slow subscriber after timeout", "type", ev.Type)
  87. }
  88. }(ch, event)
  89. }
  90. }
  91. }
  92. func (b *Broker[T]) GetSubscriberCount() int {
  93. b.mu.RLock()
  94. defer b.mu.RUnlock()
  95. return len(b.subs)
  96. }