Browse Source

util/eventbus: remove redundant code from eventbus.Publish

eventbus.Publish() calls newPublisher(), which in turn invokes (*Client).addPublisher().
That method adds the new publisher to c.pub, so we don’t need to add it again in eventbus.Publish.

Updates #cleanup

Signed-off-by: Nick Khyl <[email protected]>
Nick Khyl 8 months ago
parent
commit
866614202c
3 changed files with 13 additions and 20 deletions
  1. 7 6
      util/eventbus/client.go
  2. 1 5
      util/eventbus/publish.go
  3. 5 9
      util/eventbus/subscribe.go

+ 7 - 6
util/eventbus/client.go

@@ -113,15 +113,16 @@ func (c *Client) shouldPublish(t reflect.Type) bool {
 // Subscribe requests delivery of events of type T through the given
 // Queue. Panics if the queue already has a subscriber for T.
 func Subscribe[T any](c *Client) *Subscriber[T] {
-	return newSubscriber[T](c.subscribeState())
+	r := c.subscribeState()
+	s := newSubscriber[T](r)
+	r.addSubscriber(s)
+	return s
 }
 
 // Publisher returns a publisher for event type T using the given
 // client.
 func Publish[T any](c *Client) *Publisher[T] {
-	ret := newPublisher[T](c)
-	c.mu.Lock()
-	defer c.mu.Unlock()
-	c.pub.Add(ret)
-	return ret
+	p := newPublisher[T](c)
+	c.addPublisher(p)
+	return p
 }

+ 1 - 5
util/eventbus/publish.go

@@ -21,11 +21,7 @@ type Publisher[T any] struct {
 }
 
 func newPublisher[T any](c *Client) *Publisher[T] {
-	ret := &Publisher[T]{
-		client: c,
-	}
-	c.addPublisher(ret)
-	return ret
+	return &Publisher[T]{client: c}
 }
 
 // Close closes the publisher.

+ 5 - 9
util/eventbus/subscribe.go

@@ -91,7 +91,7 @@ func (q *subscribeState) pump(ctx context.Context) {
 			}
 		} else {
 			// Keep the cases in this select in sync with
-			// Subscriber.dispatch below. The only different should be
+			// Subscriber.dispatch below. The only difference should be
 			// that this select doesn't deliver queued values to
 			// anyone, and unconditionally accepts new values.
 			select {
@@ -134,9 +134,10 @@ func (s *subscribeState) subscribeTypes() []reflect.Type {
 	return ret
 }
 
-func (s *subscribeState) addSubscriber(t reflect.Type, sub subscriber) {
+func (s *subscribeState) addSubscriber(sub subscriber) {
 	s.outputsMu.Lock()
 	defer s.outputsMu.Unlock()
+	t := sub.subscribeType()
 	if s.outputs[t] != nil {
 		panic(fmt.Errorf("double subscription for event %s", t))
 	}
@@ -183,15 +184,10 @@ type Subscriber[T any] struct {
 }
 
 func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
-	t := reflect.TypeFor[T]()
-
-	ret := &Subscriber[T]{
+	return &Subscriber[T]{
 		read:       make(chan T),
-		unregister: func() { r.deleteSubscriber(t) },
+		unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
 	}
-	r.addSubscriber(t, ret)
-
-	return ret
 }
 
 func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {