Kaynağa Gözat

wgengine/router: use eventbus.Monitor in linuxRouter (#17232)

This commit does not change the order or meaning of any eventbus activity, it
only updates the way the plumbing is set up.

Updates #15160

Change-Id: I61b863f9c05459d530a4c34063a8bad9046c0e27
Signed-off-by: M. J. Fromberger <[email protected]>
M. J. Fromberger 5 ay önce
ebeveyn
işleme
daad5c2b5c
1 değiştirilmiş dosya ile 15 ekleme ve 14 silme
  1. 15 14
      wgengine/router/router_linux.go

+ 15 - 14
wgengine/router/router_linux.go

@@ -49,8 +49,7 @@ type linuxRouter struct {
 	tunname           string
 	tunname           string
 	netMon            *netmon.Monitor
 	netMon            *netmon.Monitor
 	health            *health.Tracker
 	health            *health.Tracker
-	eventClient       *eventbus.Client
-	ruleDeletedSub    *eventbus.Subscriber[netmon.RuleDeleted]
+	eventSubs         eventbus.Monitor
 	rulesAddedPub     *eventbus.Publisher[AddIPRules]
 	rulesAddedPub     *eventbus.Publisher[AddIPRules]
 	unregNetMon       func()
 	unregNetMon       func()
 	addrs             map[netip.Prefix]bool
 	addrs             map[netip.Prefix]bool
@@ -100,7 +99,6 @@ func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon
 		tunname:       tunname,
 		tunname:       tunname,
 		netfilterMode: netfilterOff,
 		netfilterMode: netfilterOff,
 		netMon:        netMon,
 		netMon:        netMon,
-		eventClient:   bus.Client("router-linux"),
 		health:        health,
 		health:        health,
 
 
 		cmd: cmd,
 		cmd: cmd,
@@ -108,9 +106,9 @@ func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon
 		ipRuleFixLimiter: rate.NewLimiter(rate.Every(5*time.Second), 10),
 		ipRuleFixLimiter: rate.NewLimiter(rate.Every(5*time.Second), 10),
 		ipPolicyPrefBase: 5200,
 		ipPolicyPrefBase: 5200,
 	}
 	}
-	r.ruleDeletedSub = eventbus.Subscribe[netmon.RuleDeleted](r.eventClient)
-	r.rulesAddedPub = eventbus.Publish[AddIPRules](r.eventClient)
-	go r.consumeEventbusTopics()
+	ec := bus.Client("router-linux")
+	r.rulesAddedPub = eventbus.Publish[AddIPRules](ec)
+	r.eventSubs = ec.Monitor(r.consumeEventbusTopics(ec))
 
 
 	if r.useIPCommand() {
 	if r.useIPCommand() {
 		r.ipRuleAvailable = (cmd.run("ip", "rule") == nil)
 		r.ipRuleAvailable = (cmd.run("ip", "rule") == nil)
@@ -159,13 +157,16 @@ func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon
 // always handled in the order they are received, i.e. the next event is not
 // always handled in the order they are received, i.e. the next event is not
 // read until the previous event's handler has returned. It returns when the
 // read until the previous event's handler has returned. It returns when the
 // [eventbus.Client] is closed.
 // [eventbus.Client] is closed.
-func (r *linuxRouter) consumeEventbusTopics() {
-	for {
-		select {
-		case <-r.eventClient.Done():
-			return
-		case rulesDeleted := <-r.ruleDeletedSub.Events():
-			r.onIPRuleDeleted(rulesDeleted.Table, rulesDeleted.Priority)
+func (r *linuxRouter) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus.Client) {
+	ruleDeletedSub := eventbus.Subscribe[netmon.RuleDeleted](ec)
+	return func(ec *eventbus.Client) {
+		for {
+			select {
+			case <-ec.Done():
+				return
+			case rs := <-ruleDeletedSub.Events():
+				r.onIPRuleDeleted(rs.Table, rs.Priority)
+			}
 		}
 		}
 	}
 	}
 }
 }
@@ -362,7 +363,7 @@ func (r *linuxRouter) Close() error {
 	if r.unregNetMon != nil {
 	if r.unregNetMon != nil {
 		r.unregNetMon()
 		r.unregNetMon()
 	}
 	}
-	r.eventClient.Close()
+	r.eventSubs.Close()
 	if err := r.downInterface(); err != nil {
 	if err := r.downInterface(); err != nil {
 		return err
 		return err
 	}
 	}