Browse Source

lib/events: Become a service (fixes #5372) (#5373)

Here the event Logger is rewritten as a service with a main loop instead
of mutexes. This loop has a select with essentially two legs: incoming
events, and subscription changes. When both are possible select will
chose one randomly, thus ensuring that in practice unsubscribes will
happen timely and not block the system.
Jakob Borg 7 years ago
parent
commit
abb3fb8a31
2 changed files with 211 additions and 50 deletions
  1. 94 50
      lib/events/events.go
  2. 117 0
      lib/events/events_test.go

+ 94 - 50
lib/events/events.go

@@ -204,7 +204,9 @@ type Logger struct {
 	nextSubscriptionIDs []int
 	nextGlobalID        int
 	timeout             *time.Timer
-	mutex               sync.Mutex
+	events              chan Event
+	funcs               chan func()
+	stop                chan struct{}
 }
 
 type Event struct {
@@ -225,6 +227,13 @@ type Subscription struct {
 
 var Default = NewLogger()
 
+func init() {
+	// The default logger never stops. To ensure this we nil out the stop
+	// channel so any attempt to stop it will panic.
+	Default.stop = nil
+	go Default.Serve()
+}
+
 var (
 	ErrTimeout = errors.New("timeout")
 	ErrClosed  = errors.New("closed")
@@ -232,8 +241,10 @@ var (
 
 func NewLogger() *Logger {
 	l := &Logger{
-		mutex:   sync.NewMutex(),
 		timeout: time.NewTimer(time.Second),
+		events:  make(chan Event, BufferSize),
+		funcs:   make(chan func()),
+		stop:    make(chan struct{}),
 	}
 	// Make sure the timer is in the stopped state and hasn't fired anything
 	// into the channel.
@@ -243,20 +254,52 @@ func NewLogger() *Logger {
 	return l
 }
 
+func (l *Logger) Serve() {
+loop:
+	for {
+		select {
+		case e := <-l.events:
+			// Incoming events get sent
+			l.sendEvent(e)
+
+		case fn := <-l.funcs:
+			// Subscriptions etc are handled here.
+			fn()
+
+		case <-l.stop:
+			break loop
+		}
+	}
+
+	// Closing the event channels corresponds to what happens when a
+	// subscription is unsubscribed; this stops any BufferedSubscription,
+	// makes Poll() return ErrClosed, etc.
+	for _, s := range l.subs {
+		close(s.events)
+	}
+}
+
+func (l *Logger) Stop() {
+	close(l.stop)
+}
+
 func (l *Logger) Log(t EventType, data interface{}) {
-	l.mutex.Lock()
+	l.events <- Event{
+		Time: time.Now(),
+		Type: t,
+		Data: data,
+		// SubscriptionID and GlobalID are set in sendEvent
+	}
+}
+
+func (l *Logger) sendEvent(e Event) {
 	l.nextGlobalID++
-	dl.Debugln("log", l.nextGlobalID, t, data)
+	dl.Debugln("log", l.nextGlobalID, e.Type, e.Data)
 
-	e := Event{
-		GlobalID: l.nextGlobalID,
-		Time:     time.Now(),
-		Type:     t,
-		Data:     data,
-	}
+	e.GlobalID = l.nextGlobalID
 
 	for i, s := range l.subs {
-		if s.mask&t != 0 {
+		if s.mask&e.Type != 0 {
 			e.SubscriptionID = l.nextSubscriptionIDs[i]
 			l.nextSubscriptionIDs[i]++
 
@@ -278,59 +321,60 @@ func (l *Logger) Log(t EventType, data interface{}) {
 			}
 		}
 	}
-	l.mutex.Unlock()
 }
 
 func (l *Logger) Subscribe(mask EventType) *Subscription {
-	l.mutex.Lock()
-	dl.Debugln("subscribe", mask)
+	res := make(chan *Subscription)
+	l.funcs <- func() {
+		dl.Debugln("subscribe", mask)
+
+		s := &Subscription{
+			mask:    mask,
+			events:  make(chan Event, BufferSize),
+			timeout: time.NewTimer(0),
+		}
 
-	s := &Subscription{
-		mask:    mask,
-		events:  make(chan Event, BufferSize),
-		timeout: time.NewTimer(0),
-	}
+		// We need to create the timeout timer in the stopped, non-fired state so
+		// that Subscription.Poll() can safely reset it and select on the timeout
+		// channel. This ensures the timer is stopped and the channel drained.
+		if runningTests {
+			// Make the behavior stable when running tests to avoid randomly
+			// varying test coverage. This ensures, in practice if not in
+			// theory, that the timer fires and we take the true branch of the
+			// next if.
+			runtime.Gosched()
+		}
+		if !s.timeout.Stop() {
+			<-s.timeout.C
+		}
 
-	// We need to create the timeout timer in the stopped, non-fired state so
-	// that Subscription.Poll() can safely reset it and select on the timeout
-	// channel. This ensures the timer is stopped and the channel drained.
-	if runningTests {
-		// Make the behavior stable when running tests to avoid randomly
-		// varying test coverage. This ensures, in practice if not in
-		// theory, that the timer fires and we take the true branch of the
-		// next if.
-		runtime.Gosched()
+		l.subs = append(l.subs, s)
+		l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
+		res <- s
 	}
-	if !s.timeout.Stop() {
-		<-s.timeout.C
-	}
-
-	l.subs = append(l.subs, s)
-	l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
-	l.mutex.Unlock()
-	return s
+	return <-res
 }
 
 func (l *Logger) Unsubscribe(s *Subscription) {
-	l.mutex.Lock()
-	dl.Debugln("unsubscribe")
-	for i, ss := range l.subs {
-		if s == ss {
-			last := len(l.subs) - 1
+	l.funcs <- func() {
+		dl.Debugln("unsubscribe")
+		for i, ss := range l.subs {
+			if s == ss {
+				last := len(l.subs) - 1
 
-			l.subs[i] = l.subs[last]
-			l.subs[last] = nil
-			l.subs = l.subs[:last]
+				l.subs[i] = l.subs[last]
+				l.subs[last] = nil
+				l.subs = l.subs[:last]
 
-			l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
-			l.nextSubscriptionIDs[last] = 0
-			l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
+				l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
+				l.nextSubscriptionIDs[last] = 0
+				l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
 
-			break
+				break
+			}
 		}
+		close(s.events)
 	}
-	close(s.events)
-	l.mutex.Unlock()
 }
 
 // Poll returns an event from the subscription or an error if the poll times

+ 117 - 0
lib/events/events_test.go

@@ -9,6 +9,7 @@ package events
 import (
 	"encoding/json"
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 )
@@ -28,6 +29,9 @@ func TestNewLogger(t *testing.T) {
 
 func TestSubscriber(t *testing.T) {
 	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
+
 	s := l.Subscribe(0)
 	defer l.Unsubscribe(s)
 	if s == nil {
@@ -37,6 +41,9 @@ func TestSubscriber(t *testing.T) {
 
 func TestTimeout(t *testing.T) {
 	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
+
 	s := l.Subscribe(0)
 	defer l.Unsubscribe(s)
 	_, err := s.Poll(timeout)
@@ -47,6 +54,8 @@ func TestTimeout(t *testing.T) {
 
 func TestEventBeforeSubscribe(t *testing.T) {
 	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
 
 	l.Log(DeviceConnected, "foo")
 	s := l.Subscribe(0)
@@ -60,6 +69,8 @@ func TestEventBeforeSubscribe(t *testing.T) {
 
 func TestEventAfterSubscribe(t *testing.T) {
 	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
 
 	s := l.Subscribe(AllEvents)
 	defer l.Unsubscribe(s)
@@ -85,6 +96,8 @@ func TestEventAfterSubscribe(t *testing.T) {
 
 func TestEventAfterSubscribeIgnoreMask(t *testing.T) {
 	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
 
 	s := l.Subscribe(DeviceDisconnected)
 	defer l.Unsubscribe(s)
@@ -98,6 +111,8 @@ func TestEventAfterSubscribeIgnoreMask(t *testing.T) {
 
 func TestBufferOverflow(t *testing.T) {
 	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
 
 	s := l.Subscribe(AllEvents)
 	defer l.Unsubscribe(s)
@@ -121,6 +136,8 @@ func TestBufferOverflow(t *testing.T) {
 
 func TestUnsubscribe(t *testing.T) {
 	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
 
 	s := l.Subscribe(AllEvents)
 	l.Log(DeviceConnected, "foo")
@@ -141,6 +158,8 @@ func TestUnsubscribe(t *testing.T) {
 
 func TestGlobalIDs(t *testing.T) {
 	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
 
 	s := l.Subscribe(AllEvents)
 	defer l.Unsubscribe(s)
@@ -171,6 +190,8 @@ func TestGlobalIDs(t *testing.T) {
 
 func TestSubscriptionIDs(t *testing.T) {
 	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
 
 	s := l.Subscribe(DeviceConnected)
 	defer l.Unsubscribe(s)
@@ -211,6 +232,8 @@ func TestSubscriptionIDs(t *testing.T) {
 
 func TestBufferedSub(t *testing.T) {
 	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
 
 	s := l.Subscribe(AllEvents)
 	defer l.Unsubscribe(s)
@@ -240,6 +263,8 @@ func TestBufferedSub(t *testing.T) {
 
 func BenchmarkBufferedSub(b *testing.B) {
 	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
 
 	s := l.Subscribe(AllEvents)
 	defer l.Unsubscribe(s)
@@ -294,6 +319,8 @@ func BenchmarkBufferedSub(b *testing.B) {
 
 func TestSinceUsesSubscriptionId(t *testing.T) {
 	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
 
 	s := l.Subscribe(DeviceConnected)
 	defer l.Unsubscribe(s)
@@ -339,3 +366,93 @@ func TestUnmarshalEvent(t *testing.T) {
 		t.Fatal("Failed to unmarshal event:", err)
 	}
 }
+
+func TestUnsubscribeContention(t *testing.T) {
+	// Check that we can unsubscribe without blocking the whole system.
+
+	const (
+		listeners = 50
+		senders   = 1000
+	)
+
+	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
+
+	// Start listeners. These will poll until the stop channel is closed,
+	// then exit and unsubscribe.
+
+	stopListeners := make(chan struct{})
+	var listenerWg sync.WaitGroup
+	listenerWg.Add(listeners)
+	for i := 0; i < listeners; i++ {
+		go func() {
+			defer listenerWg.Done()
+
+			s := l.Subscribe(AllEvents)
+			defer l.Unsubscribe(s)
+
+			for {
+				select {
+				case <-s.C():
+
+				case <-stopListeners:
+					return
+				}
+			}
+		}()
+	}
+
+	// Start senders. These send pointless events until the stop channel is
+	// closed.
+
+	stopSenders := make(chan struct{})
+	defer close(stopSenders)
+	var senderWg sync.WaitGroup
+	senderWg.Add(senders)
+	for i := 0; i < senders; i++ {
+		go func() {
+			defer senderWg.Done()
+
+			t := time.NewTicker(time.Millisecond)
+
+			for {
+				select {
+				case <-t.C:
+					l.Log(StateChanged, nil)
+
+				case <-stopSenders:
+					return
+				}
+			}
+		}()
+	}
+
+	// Give everything time to start up.
+
+	time.Sleep(time.Second)
+
+	// Stop the listeners and wait for them to exit. This should happen in a
+	// reasonable time frame.
+
+	t0 := time.Now()
+	close(stopListeners)
+	listenerWg.Wait()
+	if d := time.Since(t0); d > time.Minute {
+		t.Error("It should not take", d, "to unsubscribe from an event stream")
+	}
+}
+
+func BenchmarkLogEvent(b *testing.B) {
+	l := NewLogger()
+	defer l.Stop()
+	go l.Serve()
+
+	s := l.Subscribe(AllEvents)
+	defer l.Unsubscribe(s)
+	NewBufferedSubscription(s, 1) // runs in the background
+
+	for i := 0; i < b.N; i++ {
+		l.Log(StateChanged, nil)
+	}
+}