app_test.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package app
  2. import (
  3. "context"
  4. "sync"
  5. "testing"
  6. "testing/synctest"
  7. "time"
  8. tea "charm.land/bubbletea/v2"
  9. "github.com/charmbracelet/crush/internal/pubsub"
  10. "github.com/stretchr/testify/require"
  11. "go.uber.org/goleak"
  12. )
  13. func TestSetupSubscriber_NormalFlow(t *testing.T) {
  14. synctest.Test(t, func(t *testing.T) {
  15. f := newSubscriberFixture(t, 10)
  16. time.Sleep(10 * time.Millisecond)
  17. synctest.Wait()
  18. f.broker.Publish(pubsub.CreatedEvent, "event1")
  19. f.broker.Publish(pubsub.CreatedEvent, "event2")
  20. for range 2 {
  21. select {
  22. case <-f.outputCh:
  23. case <-time.After(5 * time.Second):
  24. t.Fatal("Timed out waiting for messages")
  25. }
  26. }
  27. f.cancel()
  28. f.wg.Wait()
  29. })
  30. }
  31. func TestSetupSubscriber_SlowConsumer(t *testing.T) {
  32. synctest.Test(t, func(t *testing.T) {
  33. f := newSubscriberFixture(t, 0)
  34. const numEvents = 5
  35. var pubWg sync.WaitGroup
  36. pubWg.Go(func() {
  37. for range numEvents {
  38. f.broker.Publish(pubsub.CreatedEvent, "event")
  39. time.Sleep(10 * time.Millisecond)
  40. synctest.Wait()
  41. }
  42. })
  43. time.Sleep(time.Duration(numEvents) * (subscriberSendTimeout + 20*time.Millisecond))
  44. synctest.Wait()
  45. received := 0
  46. for {
  47. select {
  48. case <-f.outputCh:
  49. received++
  50. default:
  51. pubWg.Wait()
  52. f.cancel()
  53. f.wg.Wait()
  54. require.Less(t, received, numEvents, "Slow consumer should have dropped some messages")
  55. return
  56. }
  57. }
  58. })
  59. }
  60. func TestSetupSubscriber_ContextCancellation(t *testing.T) {
  61. synctest.Test(t, func(t *testing.T) {
  62. f := newSubscriberFixture(t, 10)
  63. f.broker.Publish(pubsub.CreatedEvent, "event1")
  64. time.Sleep(100 * time.Millisecond)
  65. synctest.Wait()
  66. f.cancel()
  67. f.wg.Wait()
  68. })
  69. }
  70. func TestSetupSubscriber_DrainAfterDrop(t *testing.T) {
  71. synctest.Test(t, func(t *testing.T) {
  72. f := newSubscriberFixture(t, 0)
  73. time.Sleep(10 * time.Millisecond)
  74. synctest.Wait()
  75. // First event: nobody reads outputCh so the timer fires (message dropped).
  76. f.broker.Publish(pubsub.CreatedEvent, "event1")
  77. time.Sleep(subscriberSendTimeout + 25*time.Millisecond)
  78. synctest.Wait()
  79. // Second event: triggers Stop()==false path; without the fix this deadlocks.
  80. f.broker.Publish(pubsub.CreatedEvent, "event2")
  81. // If the timer drain deadlocks, wg.Wait never returns.
  82. done := make(chan struct{})
  83. go func() {
  84. f.cancel()
  85. f.wg.Wait()
  86. close(done)
  87. }()
  88. select {
  89. case <-done:
  90. case <-time.After(5 * time.Second):
  91. t.Fatal("setupSubscriber goroutine hung — likely timer drain deadlock")
  92. }
  93. })
  94. }
  95. func TestSetupSubscriber_NoTimerLeak(t *testing.T) {
  96. defer goleak.VerifyNone(t)
  97. synctest.Test(t, func(t *testing.T) {
  98. f := newSubscriberFixture(t, 100)
  99. for range 100 {
  100. f.broker.Publish(pubsub.CreatedEvent, "event")
  101. time.Sleep(5 * time.Millisecond)
  102. synctest.Wait()
  103. }
  104. f.cancel()
  105. f.wg.Wait()
  106. })
  107. }
  108. type subscriberFixture struct {
  109. broker *pubsub.Broker[string]
  110. wg sync.WaitGroup
  111. outputCh chan tea.Msg
  112. cancel context.CancelFunc
  113. }
  114. func newSubscriberFixture(t *testing.T, bufSize int) *subscriberFixture {
  115. t.Helper()
  116. ctx, cancel := context.WithCancel(t.Context())
  117. t.Cleanup(cancel)
  118. f := &subscriberFixture{
  119. broker: pubsub.NewBroker[string](),
  120. outputCh: make(chan tea.Msg, bufSize),
  121. cancel: cancel,
  122. }
  123. t.Cleanup(f.broker.Shutdown)
  124. setupSubscriber(ctx, &f.wg, "test", func(ctx context.Context) <-chan pubsub.Event[string] {
  125. return f.broker.Subscribe(ctx)
  126. }, f.outputCh)
  127. return f
  128. }