broker.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package pubsub
  2. import (
  3. "context"
  4. "sync"
  5. )
  6. const bufferSize = 1024 * 1024
  7. type Logger interface {
  8. Debug(msg string, args ...any)
  9. Info(msg string, args ...any)
  10. Warn(msg string, args ...any)
  11. Error(msg string, args ...any)
  12. }
  13. // Broker allows clients to publish events and subscribe to events
  14. type Broker[T any] struct {
  15. subs map[chan Event[T]]struct{} // subscriptions
  16. mu sync.Mutex // sync access to map
  17. done chan struct{} // close when broker is shutting down
  18. }
  19. // NewBroker constructs a pub/sub broker.
  20. func NewBroker[T any]() *Broker[T] {
  21. b := &Broker[T]{
  22. subs: make(map[chan Event[T]]struct{}),
  23. done: make(chan struct{}),
  24. }
  25. return b
  26. }
  27. // Shutdown the broker, terminating any subscriptions.
  28. func (b *Broker[T]) Shutdown() {
  29. close(b.done)
  30. b.mu.Lock()
  31. defer b.mu.Unlock()
  32. // Remove each subscriber entry, so Publish() cannot send any further
  33. // messages, and close each subscriber's channel, so the subscriber cannot
  34. // consume any more messages.
  35. for ch := range b.subs {
  36. delete(b.subs, ch)
  37. close(ch)
  38. }
  39. }
  40. // Subscribe subscribes the caller to a stream of events. The returned channel
  41. // is closed when the broker is shutdown.
  42. func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
  43. b.mu.Lock()
  44. defer b.mu.Unlock()
  45. // Check if broker has shutdown and if so return closed channel
  46. select {
  47. case <-b.done:
  48. ch := make(chan Event[T])
  49. close(ch)
  50. return ch
  51. default:
  52. }
  53. // Subscribe
  54. sub := make(chan Event[T], bufferSize)
  55. b.subs[sub] = struct{}{}
  56. // Unsubscribe when context is done.
  57. go func() {
  58. <-ctx.Done()
  59. b.mu.Lock()
  60. defer b.mu.Unlock()
  61. // Check if broker has shutdown and if so do nothing
  62. select {
  63. case <-b.done:
  64. return
  65. default:
  66. }
  67. delete(b.subs, sub)
  68. close(sub)
  69. }()
  70. return sub
  71. }
  72. // Publish an event to subscribers.
  73. func (b *Broker[T]) Publish(t EventType, payload T) {
  74. b.mu.Lock()
  75. defer b.mu.Unlock()
  76. for sub := range b.subs {
  77. select {
  78. case sub <- Event[T]{Type: t, Payload: payload}:
  79. case <-b.done:
  80. return
  81. }
  82. }
  83. }