| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- package pubsub
- import (
- "context"
- "sync"
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- )
- func TestBrokerSubscribe(t *testing.T) {
- t.Parallel()
- t.Run("with cancellable context", func(t *testing.T) {
- t.Parallel()
- broker := NewBroker[string]()
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- ch := broker.Subscribe(ctx)
- assert.NotNil(t, ch)
- assert.Equal(t, 1, broker.GetSubscriberCount())
- // Cancel the context should remove the subscription
- cancel()
- time.Sleep(10 * time.Millisecond) // Give time for goroutine to process
- assert.Equal(t, 0, broker.GetSubscriberCount())
- })
- t.Run("with background context", func(t *testing.T) {
- t.Parallel()
- broker := NewBroker[string]()
- // Using context.Background() should not leak goroutines
- ch := broker.Subscribe(context.Background())
- assert.NotNil(t, ch)
- assert.Equal(t, 1, broker.GetSubscriberCount())
- // Shutdown should clean up all subscriptions
- broker.Shutdown()
- assert.Equal(t, 0, broker.GetSubscriberCount())
- })
- }
- func TestBrokerPublish(t *testing.T) {
- t.Parallel()
- broker := NewBroker[string]()
- ctx := t.Context()
- ch := broker.Subscribe(ctx)
- // Publish a message
- broker.Publish(EventTypeCreated, "test message")
- // Verify message is received
- select {
- case event := <-ch:
- assert.Equal(t, EventTypeCreated, event.Type)
- assert.Equal(t, "test message", event.Payload)
- case <-time.After(100 * time.Millisecond):
- t.Fatal("timeout waiting for message")
- }
- }
- func TestBrokerShutdown(t *testing.T) {
- t.Parallel()
- broker := NewBroker[string]()
- // Create multiple subscribers
- ch1 := broker.Subscribe(context.Background())
- ch2 := broker.Subscribe(context.Background())
- assert.Equal(t, 2, broker.GetSubscriberCount())
- // Shutdown should close all channels and clean up
- broker.Shutdown()
- // Verify channels are closed
- _, ok1 := <-ch1
- _, ok2 := <-ch2
- assert.False(t, ok1, "channel 1 should be closed")
- assert.False(t, ok2, "channel 2 should be closed")
- // Verify subscriber count is reset
- assert.Equal(t, 0, broker.GetSubscriberCount())
- }
- func TestBrokerConcurrency(t *testing.T) {
- t.Parallel()
- broker := NewBroker[int]()
- // Create a large number of subscribers
- const numSubscribers = 100
- var wg sync.WaitGroup
- wg.Add(numSubscribers)
- // Create a channel to collect received events
- receivedEvents := make(chan int, numSubscribers)
- for i := range numSubscribers {
- go func(id int) {
- defer wg.Done()
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- ch := broker.Subscribe(ctx)
- // Receive one message then cancel
- select {
- case event := <-ch:
- receivedEvents <- event.Payload
- case <-time.After(1 * time.Second):
- t.Errorf("timeout waiting for message %d", id)
- }
- cancel()
- }(i)
- }
- // Give subscribers time to set up
- time.Sleep(10 * time.Millisecond)
- // Publish messages to all subscribers
- for i := range numSubscribers {
- broker.Publish(EventTypeCreated, i)
- }
- // Wait for all subscribers to finish
- wg.Wait()
- close(receivedEvents)
- // Give time for cleanup goroutines to run
- time.Sleep(10 * time.Millisecond)
- // Verify all subscribers are cleaned up
- assert.Equal(t, 0, broker.GetSubscriberCount())
- // Verify we received the expected number of events
- count := 0
- for range receivedEvents {
- count++
- }
- assert.Equal(t, numSubscribers, count)
- }
|