| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- package pubsub
- import (
- "context"
- "fmt"
- "log/slog"
- "sync"
- "time"
- )
- const defaultChannelBufferSize = 100
- type Broker[T any] struct {
- subs map[chan Event[T]]context.CancelFunc
- mu sync.RWMutex
- isClosed bool
- }
- func NewBroker[T any]() *Broker[T] {
- return &Broker[T]{
- subs: make(map[chan Event[T]]context.CancelFunc),
- }
- }
- func (b *Broker[T]) Shutdown() {
- b.mu.Lock()
- if b.isClosed {
- b.mu.Unlock()
- return
- }
- b.isClosed = true
- for ch, cancel := range b.subs {
- cancel()
- close(ch)
- delete(b.subs, ch)
- }
- b.mu.Unlock()
- slog.Debug("PubSub broker shut down", "type", fmt.Sprintf("%T", *new(T)))
- }
- func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
- b.mu.Lock()
- defer b.mu.Unlock()
- if b.isClosed {
- closedCh := make(chan Event[T])
- close(closedCh)
- return closedCh
- }
- subCtx, subCancel := context.WithCancel(ctx)
- subscriberChannel := make(chan Event[T], defaultChannelBufferSize)
- b.subs[subscriberChannel] = subCancel
- go func() {
- <-subCtx.Done()
- b.mu.Lock()
- defer b.mu.Unlock()
- if _, ok := b.subs[subscriberChannel]; ok {
- close(subscriberChannel)
- delete(b.subs, subscriberChannel)
- }
- }()
- return subscriberChannel
- }
- func (b *Broker[T]) Publish(eventType EventType, payload T) {
- b.mu.RLock()
- defer b.mu.RUnlock()
- if b.isClosed {
- slog.Warn("Attempted to publish on a closed pubsub broker", "type", eventType, "payload_type", fmt.Sprintf("%T", payload))
- return
- }
- event := Event[T]{Type: eventType, Payload: payload}
- for ch := range b.subs {
- // Non-blocking send with a fallback to a goroutine to prevent slow subscribers
- // from blocking the publisher.
- select {
- case ch <- event:
- // Successfully sent
- default:
- // Subscriber channel is full or receiver is slow.
- // Send in a new goroutine to avoid blocking the publisher.
- // This might lead to out-of-order delivery for this specific slow subscriber.
- go func(sChan chan Event[T], ev Event[T]) {
- // Re-check if broker is closed before attempting send in goroutine
- b.mu.RLock()
- isBrokerClosed := b.isClosed
- b.mu.RUnlock()
- if isBrokerClosed {
- return
- }
- select {
- case sChan <- ev:
- case <-time.After(2 * time.Second): // Timeout for slow subscriber
- slog.Warn("PubSub: Dropped event for slow subscriber after timeout", "type", ev.Type)
- }
- }(ch, event)
- }
- }
- }
- func (b *Broker[T]) GetSubscriberCount() int {
- b.mu.RLock()
- defer b.mu.RUnlock()
- return len(b.subs)
- }
|