|
|
@@ -185,5 +185,55 @@ func TestBufferedSub(t *testing.T) {
|
|
|
recv = ev.ID
|
|
|
}
|
|
|
}
|
|
|
+}
|
|
|
+
|
|
|
+func BenchmarkBufferedSub(b *testing.B) {
|
|
|
+ l := events.NewLogger()
|
|
|
+
|
|
|
+ s := l.Subscribe(events.AllEvents)
|
|
|
+ defer l.Unsubscribe(s)
|
|
|
+ bufferSize := events.BufferSize
|
|
|
+ bs := events.NewBufferedSubscription(s, bufferSize)
|
|
|
+
|
|
|
+ // The coord channel paces the sender according to the receiver,
|
|
|
+ // ensuring that no events are dropped. The benchmark measures sending +
|
|
|
+ // receiving + synchronization overhead.
|
|
|
+
|
|
|
+ coord := make(chan struct{}, bufferSize)
|
|
|
+ for i := 0; i < bufferSize-1; i++ {
|
|
|
+ coord <- struct{}{}
|
|
|
+ }
|
|
|
+
|
|
|
+ // Receive the events
|
|
|
+ done := make(chan struct{})
|
|
|
+ go func() {
|
|
|
+ defer close(done)
|
|
|
+ recv := 0
|
|
|
+ var evs []events.Event
|
|
|
+ for i := 0; i < b.N; {
|
|
|
+ evs = bs.Since(recv, evs[:0])
|
|
|
+ for _, ev := range evs {
|
|
|
+ if ev.ID != recv+1 {
|
|
|
+ b.Fatal("skipped event", ev.ID, recv)
|
|
|
+ }
|
|
|
+ recv = ev.ID
|
|
|
+ coord <- struct{}{}
|
|
|
+ }
|
|
|
+ i += len(evs)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ // Send the events
|
|
|
+ eventData := map[string]string{
|
|
|
+ "foo": "bar",
|
|
|
+ "other": "data",
|
|
|
+ "and": "something else",
|
|
|
+ }
|
|
|
+ for i := 0; i < b.N; i++ {
|
|
|
+ l.Log(events.DeviceConnected, eventData)
|
|
|
+ <-coord
|
|
|
+ }
|
|
|
|
|
|
+ <-done
|
|
|
+ b.ReportAllocs()
|
|
|
}
|