Browse Source

util/eventbus: add tests for a subscriber trying to acquire the same mutex as a publisher

As of 2025-11-20, publishing more events than the eventbus's
internal queues can hold may deadlock if a subscriber tries
to acquire a mutex that can also be held by a publisher.

This commit adds a test that demonstrates this deadlock,
and skips it until the bug is fixed.

Updates #17973

Signed-off-by: Nick Khyl <[email protected]>
Nick Khyl 3 months ago
parent
commit
016ccae2da
1 changed files with 70 additions and 0 deletions
  1. 70 0
      util/eventbus/bus_test.go

+ 70 - 0
util/eventbus/bus_test.go

@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"log"
 	"regexp"
+	"sync"
 	"testing"
 	"testing/synctest"
 	"time"
@@ -593,6 +594,75 @@ func TestRegression(t *testing.T) {
 	})
 }
 
+const (
+	maxQueuedItems      = 16                 // same as in queue.go
+	totalMaxQueuedItems = maxQueuedItems * 2 // both publisher and subscriber sides
+)
+
+func TestPublishWithMutex(t *testing.T) {
+	t.Run("FewEvents", func(t *testing.T) {
+		// As of 2025-11-20, publishing up to [totalMaxQueuedItems] is fine.
+		testPublishWithMutex(t, totalMaxQueuedItems)
+	})
+	t.Run("ManyEvents", func(t *testing.T) {
+		// As of 2025-11-20, publishing more than [totalMaxQueuedItems] may deadlock.
+		t.Skip("TODO: fix deadlock in https://github.com/tailscale/tailscale/issues/17973")
+
+		const N = 3 // N larger than one increases the chance of deadlock.
+		testPublishWithMutex(t, totalMaxQueuedItems+N)
+	})
+}
+
+// testPublishWithMutex publishes the specified number of events,
+// acquiring and releasing a mutex around each publish and each
+// subscriber event receive.
+//
+// The test fails if it loses any events or times out due to a deadlock.
+// Unfortunately, a goroutine waiting on a mutex held by a durably blocked
+// goroutine is not itself considered durably blocked, so [synctest] cannot
+// detect this deadlock on its own.
+func testPublishWithMutex(t *testing.T, n int) {
+	synctest.Test(t, func(t *testing.T) {
+		b := eventbus.New()
+		defer b.Close()
+
+		c := b.Client("TestClient")
+
+		evts := make([]any, n)
+		for i := range evts {
+			evts[i] = EventA{Counter: i}
+		}
+		exp := expectEvents(t, evts...)
+
+		var mu sync.Mutex
+		eventbus.SubscribeFunc[EventA](c, func(e EventA) {
+			// As of 2025-11-20, this can deadlock if n is large enough
+			// and event queues fill up.
+			mu.Lock()
+			mu.Unlock()
+
+			// Mark event as received, so we can check for lost events.
+			// Not required for the deadlock to occur.
+			exp.Got(e)
+		})
+
+		p := eventbus.Publish[EventA](c)
+		go func() {
+			for i := range n {
+				mu.Lock()
+				p.Publish(EventA{Counter: i})
+				mu.Unlock()
+			}
+		}()
+
+		synctest.Wait()
+
+		if !exp.Empty() {
+			t.Errorf("unexpected extra events: %+v", exp.want)
+		}
+	})
+}
+
 type queueChecker struct {
 	t    *testing.T
 	want []any