Browse Source

net/netmon: make ChangeDelta event not a pointer (#17112)

This makes things work slightly better over the eventbus.

Also switches ipnlocal to use the event over the eventbus instead of the
direct callback.

Updates #15160

Signed-off-by: Claus Lensbøl <[email protected]>
Claus Lensbøl 5 months ago
parent
commit
df362d0a08
4 changed files with 61 additions and 42 deletions
  1. 6 5
      ipn/ipnlocal/local.go
  2. 4 8
      net/netmon/netmon.go
  3. 1 1
      net/netmon/netmon_test.go
  4. 50 28
      wgengine/userspace.go

+ 6 - 5
ipn/ipnlocal/local.go

@@ -207,6 +207,7 @@ type LocalBackend struct {
 	clientVersionSub         *eventbus.Subscriber[tailcfg.ClientVersion]
 	autoUpdateSub            *eventbus.Subscriber[controlclient.AutoUpdate]
 	healthChangeSub          *eventbus.Subscriber[health.Change]
+	changeDeltaSub           *eventbus.Subscriber[netmon.ChangeDelta]
 	subsDoneCh               chan struct{}       // closed when consumeEventbusTopics returns
 	health                   *health.Tracker     // always non-nil
 	polc                     policyclient.Client // always non-nil
@@ -216,7 +217,6 @@ type LocalBackend struct {
 	dialer                   *tsdial.Dialer  // non-nil; TODO(bradfitz): remove; use sys
 	pushDeviceToken          syncs.AtomicValue[string]
 	backendLogID             logid.PublicID
-	unregisterNetMon         func()
 	unregisterSysPolicyWatch func()
 	portpoll                 *portlist.Poller // may be nil
 	portpollOnce             sync.Once        // guards starting readPoller
@@ -544,6 +544,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
 	b.clientVersionSub = eventbus.Subscribe[tailcfg.ClientVersion](b.eventClient)
 	b.autoUpdateSub = eventbus.Subscribe[controlclient.AutoUpdate](b.eventClient)
 	b.healthChangeSub = eventbus.Subscribe[health.Change](b.eventClient)
+	b.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](b.eventClient)
 	nb := newNodeBackend(ctx, b.sys.Bus.Get())
 	b.currentNodeAtomic.Store(nb)
 	nb.ready()
@@ -591,10 +592,9 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
 	b.e.SetStatusCallback(b.setWgengineStatus)
 
 	b.prevIfState = netMon.InterfaceState()
-	// Call our linkChange code once with the current state, and
-	// then also whenever it changes:
+	// Call our linkChange code once with the current state.
+	// Following changes are triggered via the eventbus.
 	b.linkChange(&netmon.ChangeDelta{New: netMon.InterfaceState()})
-	b.unregisterNetMon = netMon.RegisterChangeCallback(b.linkChange)
 
 	if tunWrap, ok := b.sys.Tun.GetOK(); ok {
 		tunWrap.PeerAPIPort = b.GetPeerAPIPort
@@ -633,6 +633,8 @@ func (b *LocalBackend) consumeEventbusTopics() {
 			b.onTailnetDefaultAutoUpdate(au.Value)
 		case change := <-b.healthChangeSub.Events():
 			b.onHealthChange(change)
+		case changeDelta := <-b.changeDeltaSub.Events():
+			b.linkChange(&changeDelta)
 		}
 	}
 }
@@ -1160,7 +1162,6 @@ func (b *LocalBackend) Shutdown() {
 	}
 	b.stopOfflineAutoUpdate()
 
-	b.unregisterNetMon()
 	b.unregisterSysPolicyWatch()
 	if cc != nil {
 		cc.Shutdown()

+ 4 - 8
net/netmon/netmon.go

@@ -53,7 +53,7 @@ type osMon interface {
 type Monitor struct {
 	logf    logger.Logf
 	b       *eventbus.Client
-	changed *eventbus.Publisher[*ChangeDelta]
+	changed *eventbus.Publisher[ChangeDelta]
 
 	om     osMon         // nil means not supported on this platform
 	change chan bool     // send false to wake poller, true to also force ChangeDeltas be sent
@@ -84,9 +84,6 @@ type ChangeFunc func(*ChangeDelta)
 
 // ChangeDelta describes the difference between two network states.
 type ChangeDelta struct {
-	// Monitor is the network monitor that sent this delta.
-	Monitor *Monitor
-
 	// Old is the old interface state, if known.
 	// It's nil if the old state is unknown.
 	// Do not mutate it.
@@ -126,7 +123,7 @@ func New(bus *eventbus.Bus, logf logger.Logf) (*Monitor, error) {
 		stop:     make(chan struct{}),
 		lastWall: wallTime(),
 	}
-	m.changed = eventbus.Publish[*ChangeDelta](m.b)
+	m.changed = eventbus.Publish[ChangeDelta](m.b)
 	st, err := m.interfaceStateUncached()
 	if err != nil {
 		return nil, err
@@ -401,8 +398,7 @@ func (m *Monitor) handlePotentialChange(newState *State, forceCallbacks bool) {
 		return
 	}
 
-	delta := &ChangeDelta{
-		Monitor:    m,
+	delta := ChangeDelta{
 		Old:        oldState,
 		New:        newState,
 		TimeJumped: timeJumped,
@@ -437,7 +433,7 @@ func (m *Monitor) handlePotentialChange(newState *State, forceCallbacks bool) {
 	}
 	m.changed.Publish(delta)
 	for _, cb := range m.cbs {
-		go cb(delta)
+		go cb(&delta)
 	}
 }
 

+ 1 - 1
net/netmon/netmon_test.go

@@ -81,7 +81,7 @@ func TestMonitorInjectEventOnBus(t *testing.T) {
 
 	mon.Start()
 	mon.InjectEvent()
-	if err := eventbustest.Expect(tw, eventbustest.Type[*ChangeDelta]()); err != nil {
+	if err := eventbustest.Expect(tw, eventbustest.Type[ChangeDelta]()); err != nil {
 		t.Error(err)
 	}
 }

+ 50 - 28
wgengine/userspace.go

@@ -93,26 +93,28 @@ const networkLoggerUploadTimeout = 5 * time.Second
 type userspaceEngine struct {
 	// eventBus will eventually become required, but for now may be nil.
 	// TODO(creachadair): Enforce that this is non-nil at construction.
-	eventBus *eventbus.Bus
-
-	logf             logger.Logf
-	wgLogger         *wglog.Logger // a wireguard-go logging wrapper
-	reqCh            chan struct{}
-	waitCh           chan struct{} // chan is closed when first Close call completes; contrast with closing bool
-	timeNow          func() mono.Time
-	tundev           *tstun.Wrapper
-	wgdev            *device.Device
-	router           router.Router
-	dialer           *tsdial.Dialer
-	confListenPort   uint16 // original conf.ListenPort
-	dns              *dns.Manager
-	magicConn        *magicsock.Conn
-	netMon           *netmon.Monitor
-	health           *health.Tracker
-	netMonOwned      bool                // whether we created netMon (and thus need to close it)
-	netMonUnregister func()              // unsubscribes from changes; used regardless of netMonOwned
-	birdClient       BIRDClient          // or nil
-	controlKnobs     *controlknobs.Knobs // or nil
+	eventBus       *eventbus.Bus
+	eventClient    *eventbus.Client
+	changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta]
+	subsDoneCh     chan struct{} // closed when consumeEventbusTopics returns
+
+	logf           logger.Logf
+	wgLogger       *wglog.Logger // a wireguard-go logging wrapper
+	reqCh          chan struct{}
+	waitCh         chan struct{} // chan is closed when first Close call completes; contrast with closing bool
+	timeNow        func() mono.Time
+	tundev         *tstun.Wrapper
+	wgdev          *device.Device
+	router         router.Router
+	dialer         *tsdial.Dialer
+	confListenPort uint16 // original conf.ListenPort
+	dns            *dns.Manager
+	magicConn      *magicsock.Conn
+	netMon         *netmon.Monitor
+	health         *health.Tracker
+	netMonOwned    bool                // whether we created netMon (and thus need to close it)
+	birdClient     BIRDClient          // or nil
+	controlKnobs   *controlknobs.Knobs // or nil
 
 	testMaybeReconfigHook func() // for tests; if non-nil, fires if maybeReconfigWireguardLocked called
 
@@ -352,7 +354,11 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
 		controlKnobs:   conf.ControlKnobs,
 		reconfigureVPN: conf.ReconfigureVPN,
 		health:         conf.HealthTracker,
+		subsDoneCh:     make(chan struct{}),
 	}
+	e.eventClient = e.eventBus.Client("userspaceEngine")
+	e.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](e.eventClient)
+	closePool.addFunc(e.eventClient.Close)
 
 	if e.birdClient != nil {
 		// Disable the protocol at start time.
@@ -385,13 +391,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
 
 	logf("link state: %+v", e.netMon.InterfaceState())
 
-	unregisterMonWatch := e.netMon.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
-		tshttpproxy.InvalidateCache()
-		e.linkChange(delta)
-	})
-	closePool.addFunc(unregisterMonWatch)
-	e.netMonUnregister = unregisterMonWatch
-
 	endpointsFn := func(endpoints []tailcfg.Endpoint) {
 		e.mu.Lock()
 		e.endpoints = append(e.endpoints[:0], endpoints...)
@@ -546,10 +545,31 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
 		}
 	}
 
+	go e.consumeEventbusTopics()
+
 	e.logf("Engine created.")
 	return e, nil
 }
 
+// consumeEventbusTopics consumes events from all relevant
+// [eventbus.Subscriber]'s and passes them to their related handler. Events are
+// always handled in the order they are received, i.e. the next event is not
+// read until the previous event's handler has returned. It returns when the
+// [eventbus.Client] is closed.
+func (e *userspaceEngine) consumeEventbusTopics() {
+	defer close(e.subsDoneCh)
+
+	for {
+		select {
+		case <-e.eventClient.Done():
+			return
+		case changeDelta := <-e.changeDeltaSub.Events():
+			tshttpproxy.InvalidateCache()
+			e.linkChange(&changeDelta)
+		}
+	}
+}
+
 // echoRespondToAll is an inbound post-filter responding to all echo requests.
 func echoRespondToAll(p *packet.Parsed, t *tstun.Wrapper, gro *gro.GRO) (filter.Response, *gro.GRO) {
 	if p.IsEchoRequest() {
@@ -1208,6 +1228,9 @@ func (e *userspaceEngine) RequestStatus() {
 }
 
 func (e *userspaceEngine) Close() {
+	e.eventClient.Close()
+	<-e.subsDoneCh
+
 	e.mu.Lock()
 	if e.closing {
 		e.mu.Unlock()
@@ -1219,7 +1242,6 @@ func (e *userspaceEngine) Close() {
 	r := bufio.NewReader(strings.NewReader(""))
 	e.wgdev.IpcSetOperation(r)
 	e.magicConn.Close()
-	e.netMonUnregister()
 	if e.netMonOwned {
 		e.netMon.Close()
 	}