Browse Source

net/netmon: remove usage of direct callbacks from netmon (#17292)

The callback itself is not removed as it is used in other repos, making
it simpler for those to slowly transition to the eventbus.

Updates #15160

Signed-off-by: Claus Lensbøl <[email protected]>
Claus Lensbøl 5 months ago
parent
commit
ce752b8a88

+ 25 - 8
cmd/tailscaled/debug.go

@@ -104,14 +104,10 @@ func runMonitor(ctx context.Context, loop bool) error {
 	}
 	defer mon.Close()
 
-	mon.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
-		if !delta.Major {
-			log.Printf("Network monitor fired; not a major change")
-			return
-		}
-		log.Printf("Network monitor fired. New state:")
-		dump(delta.New)
-	})
+	eventClient := b.Client("debug.runMonitor")
+	m := eventClient.Monitor(changeDeltaWatcher(eventClient, ctx, dump))
+	defer m.Close()
+
 	if loop {
 		log.Printf("Starting link change monitor; initial state:")
 	}
@@ -124,6 +120,27 @@ func runMonitor(ctx context.Context, loop bool) error {
 	select {}
 }
 
+func changeDeltaWatcher(ec *eventbus.Client, ctx context.Context, dump func(st *netmon.State)) func(*eventbus.Client) {
+	changeSub := eventbus.Subscribe[netmon.ChangeDelta](ec)
+	return func(ec *eventbus.Client) {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ec.Done():
+				return
+			case delta := <-changeSub.Events():
+				if !delta.Major {
+					log.Printf("Network monitor fired; not a major change")
+					return
+				}
+				log.Printf("Network monitor fired. New state:")
+				dump(delta.New)
+			}
+		}
+	}
+}
+
 func getURL(ctx context.Context, urlStr string) error {
 	if urlStr == "login" {
 		urlStr = "https://login.tailscale.com"

+ 9 - 2
cmd/tailscaled/tailscaled.go

@@ -433,7 +433,13 @@ func run() (err error) {
 
 	var publicLogID logid.PublicID
 	if buildfeatures.HasLogTail {
-		pol := logpolicy.New(logtail.CollectionNode, netMon, sys.HealthTracker.Get(), nil /* use log.Printf */)
+
+		pol := logpolicy.Options{
+			Collection: logtail.CollectionNode,
+			NetMon:     netMon,
+			Health:     sys.HealthTracker.Get(),
+			Bus:        sys.Bus.Get(),
+		}.New()
 		pol.SetVerbosityLevel(args.verbose)
 		publicLogID = pol.PublicID
 		logPol = pol
@@ -470,7 +476,7 @@ func run() (err error) {
 	// Always clean up, even if we're going to run the server. This covers cases
 	// such as when a system was rebooted without shutting down, or tailscaled
 	// crashed, and would for example restore system DNS configuration.
-	dns.CleanUp(logf, netMon, sys.HealthTracker.Get(), args.tunname)
+	dns.CleanUp(logf, netMon, sys.Bus.Get(), sys.HealthTracker.Get(), args.tunname)
 	router.CleanUp(logf, netMon, args.tunname)
 	// If the cleanUp flag was passed, then exit.
 	if args.cleanUp {
@@ -616,6 +622,7 @@ func getLocalBackend(ctx context.Context, logf logger.Logf, logID logid.PublicID
 	}
 
 	dialer := &tsdial.Dialer{Logf: logf} // mutated below (before used)
+	dialer.SetBus(sys.Bus.Get())
 	sys.Set(dialer)
 
 	onlyNetstack, err := createEngine(logf, sys)

+ 1 - 1
cmd/tsconnect/wasm/wasm_js.go

@@ -104,6 +104,7 @@ func newIPN(jsConfig js.Value) map[string]any {
 	sys := tsd.NewSystem()
 	sys.Set(store)
 	dialer := &tsdial.Dialer{Logf: logf}
+	dialer.SetBus(sys.Bus.Get())
 	eng, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{
 		Dialer:        dialer,
 		SetSubsystem:  sys.Set,
@@ -463,7 +464,6 @@ func (s *jsSSHSession) Run() {
 		cols = s.pendingResizeCols
 	}
 	err = session.RequestPty("xterm", rows, cols, ssh.TerminalModes{})
-
 	if err != nil {
 		writeError("Pseudo Terminal", err)
 		return

+ 2 - 0
control/controlclient/controlclient_test.go

@@ -223,6 +223,7 @@ func TestDirectProxyManual(t *testing.T) {
 
 	dialer := &tsdial.Dialer{}
 	dialer.SetNetMon(netmon.NewStatic())
+	dialer.SetBus(bus)
 
 	opts := Options{
 		Persist: persist.Persist{},
@@ -300,6 +301,7 @@ func testHTTPS(t *testing.T, withProxy bool) {
 
 	dialer := &tsdial.Dialer{}
 	dialer.SetNetMon(netmon.NewStatic())
+	dialer.SetBus(bus)
 	dialer.SetSystemDialerForTest(func(ctx context.Context, network, addr string) (net.Conn, error) {
 		host, _, err := net.SplitHostPort(addr)
 		if err != nil {

+ 6 - 2
control/controlclient/direct_test.go

@@ -27,13 +27,15 @@ func TestNewDirect(t *testing.T) {
 	bus := eventbustest.NewBus(t)
 
 	k := key.NewMachine()
+	dialer := tsdial.NewDialer(netmon.NewStatic())
+	dialer.SetBus(bus)
 	opts := Options{
 		ServerURL: "https://example.com",
 		Hostinfo:  hi,
 		GetMachinePrivateKey: func() (key.MachinePrivate, error) {
 			return k, nil
 		},
-		Dialer: tsdial.NewDialer(netmon.NewStatic()),
+		Dialer: dialer,
 		Bus:    bus,
 	}
 	c, err := NewDirect(opts)
@@ -105,13 +107,15 @@ func TestTsmpPing(t *testing.T) {
 	bus := eventbustest.NewBus(t)
 
 	k := key.NewMachine()
+	dialer := tsdial.NewDialer(netmon.NewStatic())
+	dialer.SetBus(bus)
 	opts := Options{
 		ServerURL: "https://example.com",
 		Hostinfo:  hi,
 		GetMachinePrivateKey: func() (key.MachinePrivate, error) {
 			return k, nil
 		},
-		Dialer: tsdial.NewDialer(netmon.NewStatic()),
+		Dialer: dialer,
 		Bus:    bus,
 	}
 

+ 3 - 0
control/controlclient/noise_test.go

@@ -22,6 +22,7 @@ import (
 	"tailscale.com/tstest/nettest"
 	"tailscale.com/types/key"
 	"tailscale.com/types/logger"
+	"tailscale.com/util/eventbus/eventbustest"
 )
 
 // maxAllowedNoiseVersion is the highest we expect the Tailscale
@@ -175,6 +176,7 @@ func (tt noiseClientTest) run(t *testing.T) {
 	serverPrivate := key.NewMachine()
 	clientPrivate := key.NewMachine()
 	chalPrivate := key.NewChallenge()
+	bus := eventbustest.NewBus(t)
 
 	const msg = "Hello, client"
 	h2 := &http2.Server{}
@@ -194,6 +196,7 @@ func (tt noiseClientTest) run(t *testing.T) {
 	defer hs.Close()
 
 	dialer := tsdial.NewDialer(netmon.NewStatic())
+	dialer.SetBus(bus)
 	if nettest.PreferMemNetwork() {
 		dialer.SetSystemDialerForTest(nw.Dial)
 	}

+ 5 - 2
control/controlhttp/http_test.go

@@ -149,6 +149,8 @@ func testControlHTTP(t *testing.T, param httpTestParam) {
 	proxy := param.proxy
 	client, server := key.NewMachine(), key.NewMachine()
 
+	bus := eventbustest.NewBus(t)
+
 	const testProtocolVersion = 1
 	const earlyWriteMsg = "Hello, world!"
 	sch := make(chan serverResult, 1)
@@ -218,6 +220,7 @@ func testControlHTTP(t *testing.T, param httpTestParam) {
 
 	netMon := netmon.NewStatic()
 	dialer := tsdial.NewDialer(netMon)
+	dialer.SetBus(bus)
 	a := &Dialer{
 		Hostname:             "localhost",
 		HTTPPort:             strconv.Itoa(httpLn.Addr().(*net.TCPAddr).Port),
@@ -775,7 +778,7 @@ func runDialPlanTest(t *testing.T, plan *tailcfg.ControlDialPlan, want []netip.A
 	if allowFallback {
 		host = fallbackAddr.String()
 	}
-
+	bus := eventbustest.NewBus(t)
 	a := &Dialer{
 		Hostname:             host,
 		HTTPPort:             httpPort,
@@ -790,7 +793,7 @@ func runDialPlanTest(t *testing.T, plan *tailcfg.ControlDialPlan, want []netip.A
 		omitCertErrorLogging: true,
 		testFallbackDelay:    50 * time.Millisecond,
 		Clock:                clock,
-		HealthTracker:        health.NewTracker(eventbustest.NewBus(t)),
+		HealthTracker:        health.NewTracker(bus),
 	}
 
 	start := time.Now()

+ 1 - 1
ipn/ipnlocal/local.go

@@ -526,7 +526,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
 	}()
 
 	netMon := sys.NetMon.Get()
-	b.sockstatLogger, err = sockstatlog.NewLogger(logpolicy.LogsDir(logf), logf, logID, netMon, sys.HealthTracker.Get())
+	b.sockstatLogger, err = sockstatlog.NewLogger(logpolicy.LogsDir(logf), logf, logID, netMon, sys.HealthTracker.Get(), sys.Bus.Get())
 	if err != nil {
 		log.Printf("error setting up sockstat logger: %v", err)
 	}

+ 6 - 2
ipn/ipnlocal/local_test.go

@@ -480,7 +480,9 @@ func newTestLocalBackendWithSys(t testing.TB, sys *tsd.System) *LocalBackend {
 		t.Log("Added fake userspace engine for testing")
 	}
 	if _, ok := sys.Dialer.GetOK(); !ok {
-		sys.Set(tsdial.NewDialer(netmon.NewStatic()))
+		dialer := tsdial.NewDialer(netmon.NewStatic())
+		dialer.SetBus(sys.Bus.Get())
+		sys.Set(dialer)
 		t.Log("Added static dialer for testing")
 	}
 	lb, err := NewLocalBackend(logf, logid.PublicID{}, sys, 0)
@@ -3108,12 +3110,14 @@ func TestAutoExitNodeSetNetInfoCallback(t *testing.T) {
 	b.hostinfo = hi
 	k := key.NewMachine()
 	var cc *mockControl
+	dialer := tsdial.NewDialer(netmon.NewStatic())
+	dialer.SetBus(sys.Bus.Get())
 	opts := controlclient.Options{
 		ServerURL: "https://example.com",
 		GetMachinePrivateKey: func() (key.MachinePrivate, error) {
 			return k, nil
 		},
-		Dialer:       tsdial.NewDialer(netmon.NewStatic()),
+		Dialer:       dialer,
 		Logf:         b.logf,
 		PolicyClient: polc,
 	}

+ 3 - 1
ipn/ipnlocal/network-lock_test.go

@@ -54,6 +54,8 @@ func fakeControlClient(t *testing.T, c *http.Client) (*controlclient.Auto, *even
 	bus := eventbustest.NewBus(t)
 
 	k := key.NewMachine()
+	dialer := tsdial.NewDialer(netmon.NewStatic())
+	dialer.SetBus(bus)
 	opts := controlclient.Options{
 		ServerURL: "https://example.com",
 		Hostinfo:  hi,
@@ -63,7 +65,7 @@ func fakeControlClient(t *testing.T, c *http.Client) (*controlclient.Auto, *even
 		HTTPTestClient:  c,
 		NoiseTestClient: c,
 		Observer:        observerFunc(func(controlclient.Status) {}),
-		Dialer:          tsdial.NewDialer(netmon.NewStatic()),
+		Dialer:          dialer,
 		Bus:             bus,
 	}
 

+ 1 - 0
ipn/ipnlocal/state_test.go

@@ -1668,6 +1668,7 @@ func newLocalBackendWithMockEngineAndControl(t *testing.T, enableLogging bool) (
 	sys := tsd.NewSystemWithBus(bus)
 	sys.Set(dialer)
 	sys.Set(dialer.NetMon())
+	dialer.SetBus(bus)
 
 	magicConn, err := magicsock.NewConn(magicsock.Options{
 		Logf:              logf,

+ 3 - 1
log/sockstatlog/logger.go

@@ -26,6 +26,7 @@ import (
 	"tailscale.com/net/sockstats"
 	"tailscale.com/types/logger"
 	"tailscale.com/types/logid"
+	"tailscale.com/util/eventbus"
 	"tailscale.com/util/mak"
 )
 
@@ -97,7 +98,7 @@ func SockstatLogID(logID logid.PublicID) logid.PrivateID {
 //
 // The netMon parameter is optional. It should be specified in environments where
 // Tailscaled is manipulating the routing table.
-func NewLogger(logdir string, logf logger.Logf, logID logid.PublicID, netMon *netmon.Monitor, health *health.Tracker) (*Logger, error) {
+func NewLogger(logdir string, logf logger.Logf, logID logid.PublicID, netMon *netmon.Monitor, health *health.Tracker, bus *eventbus.Bus) (*Logger, error) {
 	if !sockstats.IsAvailable || !buildfeatures.HasLogTail {
 		return nil, nil
 	}
@@ -127,6 +128,7 @@ func NewLogger(logdir string, logf logger.Logf, logID logid.PublicID, netMon *ne
 		PrivateID:    SockstatLogID(logID),
 		Collection:   "sockstats.log.tailscale.io",
 		Buffer:       filch,
+		Bus:          bus,
 		CompressLogs: true,
 		FlushDelayFn: func() time.Duration {
 			// set flush delay to 100 years so it never flushes automatically

+ 1 - 1
log/sockstatlog/logger_test.go

@@ -24,7 +24,7 @@ func TestResourceCleanup(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	lg, err := NewLogger(td, logger.Discard, id.Public(), nil, nil)
+	lg, err := NewLogger(td, logger.Discard, id.Public(), nil, nil, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 7 - 0
logpolicy/logpolicy.go

@@ -50,6 +50,7 @@ import (
 	"tailscale.com/types/logger"
 	"tailscale.com/types/logid"
 	"tailscale.com/util/clientmetric"
+	"tailscale.com/util/eventbus"
 	"tailscale.com/util/must"
 	"tailscale.com/util/racebuild"
 	"tailscale.com/util/syspolicy/pkey"
@@ -489,6 +490,11 @@ type Options struct {
 	// If non-nil, it's used to construct the default HTTP client.
 	Health *health.Tracker
 
+	// Bus is an optional parameter for communication on the eventbus.
+	// If non-nil, it's passed to logtail for use in interface monitoring.
+	// TODO(cmol): Make this non-optional when it's plumbed in by the clients.
+	Bus *eventbus.Bus
+
 	// Logf is an optional logger to use.
 	// If nil, [log.Printf] will be used instead.
 	Logf logger.Logf
@@ -615,6 +621,7 @@ func (opts Options) init(disableLogging bool) (*logtail.Config, *Policy) {
 		Stderr:        logWriter{console},
 		CompressLogs:  true,
 		MaxUploadSize: opts.MaxUploadSize,
+		Bus:           opts.Bus,
 	}
 	if opts.Collection == logtail.CollectionNode {
 		conf.MetricsDelta = clientmetric.EncodeLogTailMetricsDelta

+ 2 - 0
logtail/config.go

@@ -10,6 +10,7 @@ import (
 
 	"tailscale.com/tstime"
 	"tailscale.com/types/logid"
+	"tailscale.com/util/eventbus"
 )
 
 // DefaultHost is the default host name to upload logs to when
@@ -34,6 +35,7 @@ type Config struct {
 	LowMemory      bool            // if true, logtail minimizes memory use
 	Clock          tstime.Clock    // if set, Clock.Now substitutes uses of time.Now
 	Stderr         io.Writer       // if set, logs are sent here instead of os.Stderr
+	Bus            *eventbus.Bus   // if set, uses the eventbus for awaitInternetUp instead of callback
 	StderrLevel    int             // max verbosity level to write to stderr; 0 means the non-verbose messages only
 	Buffer         Buffer          // temp storage, if nil a MemoryBuffer
 	CompressLogs   bool            // whether to compress the log uploads

+ 31 - 0
logtail/logtail.go

@@ -32,6 +32,7 @@ import (
 	"tailscale.com/tstime"
 	tslogger "tailscale.com/types/logger"
 	"tailscale.com/types/logid"
+	"tailscale.com/util/eventbus"
 	"tailscale.com/util/set"
 	"tailscale.com/util/truncate"
 	"tailscale.com/util/zstdframe"
@@ -120,6 +121,10 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
 		shutdownStart: make(chan struct{}),
 		shutdownDone:  make(chan struct{}),
 	}
+
+	if cfg.Bus != nil {
+		l.eventClient = cfg.Bus.Client("logtail.Logger")
+	}
 	l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
 	l.compressLogs = cfg.CompressLogs
 
@@ -156,6 +161,7 @@ type Logger struct {
 	privateID      logid.PrivateID
 	httpDoCalls    atomic.Int32
 	sockstatsLabel atomicSocktatsLabel
+	eventClient    *eventbus.Client
 
 	procID              uint32
 	includeProcSequence bool
@@ -221,6 +227,9 @@ func (l *Logger) Shutdown(ctx context.Context) error {
 		l.httpc.CloseIdleConnections()
 	}()
 
+	if l.eventClient != nil {
+		l.eventClient.Close()
+	}
 	l.shutdownStartMu.Lock()
 	select {
 	case <-l.shutdownStart:
@@ -417,6 +426,10 @@ func (l *Logger) internetUp() bool {
 }
 
 func (l *Logger) awaitInternetUp(ctx context.Context) {
+	if l.eventClient != nil {
+		l.awaitInternetUpBus(ctx)
+		return
+	}
 	upc := make(chan bool, 1)
 	defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
 		if delta.New.AnyInterfaceUp() {
@@ -436,6 +449,24 @@ func (l *Logger) awaitInternetUp(ctx context.Context) {
 	}
 }
 
+func (l *Logger) awaitInternetUpBus(ctx context.Context) {
+	if l.internetUp() {
+		return
+	}
+	sub := eventbus.Subscribe[netmon.ChangeDelta](l.eventClient)
+	defer sub.Close()
+	select {
+	case delta := <-sub.Events():
+		if delta.New.AnyInterfaceUp() {
+			fmt.Fprintf(l.stderr, "logtail: internet back up\n")
+			return
+		}
+		fmt.Fprintf(l.stderr, "logtail: network changed, but is not up")
+	case <-ctx.Done():
+		return
+	}
+}
+
 // upload uploads body to the log server.
 // origlen indicates the pre-compression body length.
 // origlen of -1 indicates that the body is not compressed.

+ 6 - 1
logtail/logtail_test.go

@@ -17,6 +17,7 @@ import (
 	"github.com/go-json-experiment/json/jsontext"
 	"tailscale.com/tstest"
 	"tailscale.com/tstime"
+	"tailscale.com/util/eventbus/eventbustest"
 	"tailscale.com/util/must"
 )
 
@@ -30,6 +31,7 @@ func TestFastShutdown(t *testing.T) {
 
 	l := NewLogger(Config{
 		BaseURL: testServ.URL,
+		Bus:     eventbustest.NewBus(t),
 	}, t.Logf)
 	err := l.Shutdown(ctx)
 	if err != nil {
@@ -62,7 +64,10 @@ func NewLogtailTestHarness(t *testing.T) (*LogtailTestServer, *Logger) {
 
 	t.Cleanup(ts.srv.Close)
 
-	l := NewLogger(Config{BaseURL: ts.srv.URL}, t.Logf)
+	l := NewLogger(Config{
+		BaseURL: ts.srv.URL,
+		Bus:     eventbustest.NewBus(t),
+	}, t.Logf)
 
 	// There is always an initial "logtail started" message
 	body := <-ts.uploaded

+ 3 - 1
net/dns/manager.go

@@ -30,6 +30,7 @@ import (
 	"tailscale.com/types/logger"
 	"tailscale.com/util/clientmetric"
 	"tailscale.com/util/dnsname"
+	"tailscale.com/util/eventbus"
 	"tailscale.com/util/slicesx"
 	"tailscale.com/util/syspolicy/policyclient"
 )
@@ -600,7 +601,7 @@ func (m *Manager) FlushCaches() error {
 // No other state needs to be instantiated before this runs.
 //
 // health must not be nil
-func CleanUp(logf logger.Logf, netMon *netmon.Monitor, health *health.Tracker, interfaceName string) {
+func CleanUp(logf logger.Logf, netMon *netmon.Monitor, bus *eventbus.Bus, health *health.Tracker, interfaceName string) {
 	if !buildfeatures.HasDNS {
 		return
 	}
@@ -611,6 +612,7 @@ func CleanUp(logf logger.Logf, netMon *netmon.Monitor, health *health.Tracker, i
 	}
 	d := &tsdial.Dialer{Logf: logf}
 	d.SetNetMon(netMon)
+	d.SetBus(bus)
 	dns := NewManager(logf, oscfg, health, d, nil, nil, runtime.GOOS)
 	if err := dns.Down(); err != nil {
 		logf("dns down: %v", err)

+ 8 - 2
net/dns/manager_tcp_test.go

@@ -90,7 +90,10 @@ func TestDNSOverTCP(t *testing.T) {
 			SearchDomains: fqdns("coffee.shop"),
 		},
 	}
-	m := NewManager(t.Logf, &f, health.NewTracker(eventbustest.NewBus(t)), tsdial.NewDialer(netmon.NewStatic()), nil, nil, "")
+	bus := eventbustest.NewBus(t)
+	dialer := tsdial.NewDialer(netmon.NewStatic())
+	dialer.SetBus(bus)
+	m := NewManager(t.Logf, &f, health.NewTracker(bus), dialer, nil, nil, "")
 	m.resolver.TestOnlySetHook(f.SetResolver)
 	m.Set(Config{
 		Hosts: hosts(
@@ -175,7 +178,10 @@ func TestDNSOverTCP_TooLarge(t *testing.T) {
 			SearchDomains: fqdns("coffee.shop"),
 		},
 	}
-	m := NewManager(log, &f, health.NewTracker(eventbustest.NewBus(t)), tsdial.NewDialer(netmon.NewStatic()), nil, nil, "")
+	bus := eventbustest.NewBus(t)
+	dialer := tsdial.NewDialer(netmon.NewStatic())
+	dialer.SetBus(bus)
+	m := NewManager(log, &f, health.NewTracker(bus), dialer, nil, nil, "")
 	m.resolver.TestOnlySetHook(f.SetResolver)
 	m.Set(Config{
 		Hosts:         hosts("andrew.ts.com.", "1.2.3.4"),

+ 8 - 2
net/dns/manager_test.go

@@ -933,7 +933,10 @@ func TestManager(t *testing.T) {
 				goos = "linux"
 			}
 			knobs := &controlknobs.Knobs{}
-			m := NewManager(t.Logf, &f, health.NewTracker(eventbustest.NewBus(t)), tsdial.NewDialer(netmon.NewStatic()), nil, knobs, goos)
+			bus := eventbustest.NewBus(t)
+			dialer := tsdial.NewDialer(netmon.NewStatic())
+			dialer.SetBus(bus)
+			m := NewManager(t.Logf, &f, health.NewTracker(bus), dialer, nil, knobs, goos)
 			m.resolver.TestOnlySetHook(f.SetResolver)
 
 			if err := m.Set(test.in); err != nil {
@@ -1039,7 +1042,10 @@ func TestConfigRecompilation(t *testing.T) {
 		SearchDomains: fqdns("foo.ts.net"),
 	}
 
-	m := NewManager(t.Logf, f, health.NewTracker(eventbustest.NewBus(t)), tsdial.NewDialer(netmon.NewStatic()), nil, nil, "darwin")
+	bus := eventbustest.NewBus(t)
+	dialer := tsdial.NewDialer(netmon.NewStatic())
+	dialer.SetBus(bus)
+	m := NewManager(t.Logf, f, health.NewTracker(bus), dialer, nil, nil, "darwin")
 
 	var managerConfig *resolver.Config
 	m.resolver.TestOnlySetHook(func(cfg resolver.Config) {

+ 1 - 1
net/dns/resolver/forwarder_test.go

@@ -122,7 +122,6 @@ func TestResolversWithDelays(t *testing.T) {
 			}
 		})
 	}
-
 }
 
 func TestGetRCode(t *testing.T) {
@@ -454,6 +453,7 @@ func runTestQuery(tb testing.TB, request []byte, modify func(*forwarder), ports
 
 	var dialer tsdial.Dialer
 	dialer.SetNetMon(netMon)
+	dialer.SetBus(bus)
 
 	fwd := newForwarder(logf, netMon, nil, &dialer, health.NewTracker(bus), nil)
 	if modify != nil {

+ 5 - 2
net/dns/resolver/tsdns_test.go

@@ -353,10 +353,13 @@ func TestRDNSNameToIPv6(t *testing.T) {
 }
 
 func newResolver(t testing.TB) *Resolver {
+	bus := eventbustest.NewBus(t)
+	dialer := tsdial.NewDialer(netmon.NewStatic())
+	dialer.SetBus(bus)
 	return New(t.Logf,
 		nil, // no link selector
-		tsdial.NewDialer(netmon.NewStatic()),
-		health.NewTracker(eventbustest.NewBus(t)),
+		dialer,
+		health.NewTracker(bus),
 		nil, // no control knobs
 	)
 }

+ 19 - 3
net/netmon/loghelper.go

@@ -8,6 +8,7 @@ import (
 	"sync"
 
 	"tailscale.com/types/logger"
+	"tailscale.com/util/eventbus"
 )
 
 // LinkChangeLogLimiter returns a new [logger.Logf] that logs each unique
@@ -17,13 +18,12 @@ import (
 // done.
 func LinkChangeLogLimiter(ctx context.Context, logf logger.Logf, nm *Monitor) logger.Logf {
 	var formatSeen sync.Map // map[string]bool
-	unregister := nm.RegisterChangeCallback(func(cd *ChangeDelta) {
+	nm.b.Monitor(nm.changeDeltaWatcher(nm.b, ctx, func(cd ChangeDelta) {
 		// If we're in a major change or a time jump, clear the seen map.
 		if cd.Major || cd.TimeJumped {
 			formatSeen.Clear()
 		}
-	})
-	context.AfterFunc(ctx, unregister)
+	}))
 
 	return func(format string, args ...any) {
 		// We only store 'true' in the map, so if it's present then it
@@ -42,3 +42,19 @@ func LinkChangeLogLimiter(ctx context.Context, logf logger.Logf, nm *Monitor) lo
 		logf(format, args...)
 	}
 }
+
+func (nm *Monitor) changeDeltaWatcher(ec *eventbus.Client, ctx context.Context, fn func(ChangeDelta)) func(*eventbus.Client) {
+	sub := eventbus.Subscribe[ChangeDelta](ec)
+	return func(ec *eventbus.Client) {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-sub.Done():
+				return
+			case change := <-sub.Events():
+				fn(change)
+			}
+		}
+	}
+}

+ 8 - 13
net/netmon/loghelper_test.go

@@ -11,6 +11,7 @@ import (
 	"testing/synctest"
 
 	"tailscale.com/util/eventbus"
+	"tailscale.com/util/eventbus/eventbustest"
 )
 
 func TestLinkChangeLogLimiter(t *testing.T) { synctest.Test(t, syncTestLinkChangeLogLimiter) }
@@ -61,21 +62,15 @@ func syncTestLinkChangeLogLimiter(t *testing.T) {
 	// string cache and allow the next log to write to our log buffer.
 	//
 	// InjectEvent doesn't work because it's not a major event, so we
-	// instead reach into the netmon and grab the callback, and then call
-	// it ourselves.
-	mon.mu.Lock()
-	var cb func(*ChangeDelta)
-	for _, c := range mon.cbs {
-		cb = c
-		break
-	}
-	mon.mu.Unlock()
-
-	cb(&ChangeDelta{Major: true})
+	// instead inject the event ourselves.
+	injector := eventbustest.NewInjector(t, bus)
+	eventbustest.Inject(injector, ChangeDelta{Major: true})
+	synctest.Wait()
 
 	logf("hello %s", "world")
-	if got := logBuffer.String(); got != "hello world\nother message\nhello world\n" {
-		t.Errorf("unexpected log buffer contents: %q", got)
+	want := "hello world\nother message\nhello world\n"
+	if got := logBuffer.String(); got != want {
+		t.Errorf("unexpected log buffer contents, got: %q, want, %q", got, want)
 	}
 
 	// Canceling the context we passed to LinkChangeLogLimiter should

+ 43 - 0
net/tsdial/tsdial.go

@@ -28,6 +28,7 @@ import (
 	"tailscale.com/types/logger"
 	"tailscale.com/types/netmap"
 	"tailscale.com/util/clientmetric"
+	"tailscale.com/util/eventbus"
 	"tailscale.com/util/mak"
 	"tailscale.com/util/testenv"
 	"tailscale.com/version"
@@ -86,6 +87,8 @@ type Dialer struct {
 	dnsCache         *dnscache.MessageCache // nil until first non-empty SetExitDNSDoH
 	nextSysConnID    int
 	activeSysConns   map[int]net.Conn // active connections not yet closed
+	eventClient      *eventbus.Client
+	eventBusSubs     eventbus.Monitor
 }
 
 // sysConn wraps a net.Conn that was created using d.SystemDial.
@@ -158,6 +161,9 @@ func (d *Dialer) SetRoutes(routes, localRoutes []netip.Prefix) {
 }
 
 func (d *Dialer) Close() error {
+	if d.eventClient != nil {
+		d.eventBusSubs.Close()
+	}
 	d.mu.Lock()
 	defer d.mu.Unlock()
 	d.closed = true
@@ -186,6 +192,14 @@ func (d *Dialer) SetNetMon(netMon *netmon.Monitor) {
 		d.netMonUnregister = nil
 	}
 	d.netMon = netMon
+	// Having multiple watchers could lead to problems,
+	// so remove the eventClient if it exists.
+	// This should really not happen, but better checking for it than not.
+	// TODO(cmol): Should this just be a panic?
+	if d.eventClient != nil {
+		d.eventBusSubs.Close()
+		d.eventClient = nil
+	}
 	d.netMonUnregister = d.netMon.RegisterChangeCallback(d.linkChanged)
 }
 
@@ -197,6 +211,35 @@ func (d *Dialer) NetMon() *netmon.Monitor {
 	return d.netMon
 }
 
+func (d *Dialer) SetBus(bus *eventbus.Bus) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	if d.eventClient != nil {
+		panic("eventbus has already been set")
+	}
+	// Having multiple watchers could lead to problems,
+	// so unregister the callback if it exists.
+	if d.netMonUnregister != nil {
+		d.netMonUnregister()
+	}
+	d.eventClient = bus.Client("tsdial.Dialer")
+	d.eventBusSubs = d.eventClient.Monitor(d.linkChangeWatcher(d.eventClient))
+}
+
+func (d *Dialer) linkChangeWatcher(ec *eventbus.Client) func(*eventbus.Client) {
+	linkChangeSub := eventbus.Subscribe[netmon.ChangeDelta](ec)
+	return func(ec *eventbus.Client) {
+		for {
+			select {
+			case <-ec.Done():
+				return
+			case cd := <-linkChangeSub.Events():
+				d.linkChanged(&cd)
+			}
+		}
+	}
+}
+
 var (
 	metricLinkChangeConnClosed      = clientmetric.NewCounter("tsdial_linkchange_closes")
 	metricChangeDeltaNoDefaultRoute = clientmetric.NewCounter("tsdial_changedelta_no_default_route")

+ 2 - 0
tsnet/tsnet.go

@@ -592,6 +592,7 @@ func (s *Server) start() (reterr error) {
 	closePool.add(s.netMon)
 
 	s.dialer = &tsdial.Dialer{Logf: tsLogf} // mutated below (before used)
+	s.dialer.SetBus(sys.Bus.Get())
 	eng, err := wgengine.NewUserspaceEngine(tsLogf, wgengine.Config{
 		EventBus:      sys.Bus.Get(),
 		ListenPort:    s.Port,
@@ -767,6 +768,7 @@ func (s *Server) startLogger(closePool *closeOnErrorPool, health *health.Tracker
 		Stderr:       io.Discard, // log everything to Buffer
 		Buffer:       s.logbuffer,
 		CompressLogs: true,
+		Bus:          s.sys.Bus.Get(),
 		HTTPC:        &http.Client{Transport: logpolicy.NewLogtailTransport(logtail.DefaultHost, s.netMon, health, tsLogf)},
 		MetricsDelta: clientmetric.EncodeLogTailMetricsDelta,
 	}

+ 3 - 1
wgengine/netlog/netlog.go

@@ -29,6 +29,7 @@ import (
 	"tailscale.com/tailcfg"
 	"tailscale.com/types/logid"
 	"tailscale.com/types/netlogtype"
+	"tailscale.com/util/eventbus"
 	"tailscale.com/wgengine/router"
 )
 
@@ -95,7 +96,7 @@ var testClient *http.Client
 // The IP protocol and source port are always zero.
 // The sock is used to populated the PhysicalTraffic field in Message.
 // The netMon parameter is optional; if non-nil it's used to do faster interface lookups.
-func (nl *Logger) Startup(nodeID tailcfg.StableNodeID, nodeLogID, domainLogID logid.PrivateID, tun, sock Device, netMon *netmon.Monitor, health *health.Tracker, logExitFlowEnabledEnabled bool) error {
+func (nl *Logger) Startup(nodeID tailcfg.StableNodeID, nodeLogID, domainLogID logid.PrivateID, tun, sock Device, netMon *netmon.Monitor, health *health.Tracker, bus *eventbus.Bus, logExitFlowEnabledEnabled bool) error {
 	nl.mu.Lock()
 	defer nl.mu.Unlock()
 	if nl.logger != nil {
@@ -112,6 +113,7 @@ func (nl *Logger) Startup(nodeID tailcfg.StableNodeID, nodeLogID, domainLogID lo
 		Collection:    "tailtraffic.log.tailscale.io",
 		PrivateID:     nodeLogID,
 		CopyPrivateID: domainLogID,
+		Bus:           bus,
 		Stderr:        io.Discard,
 		CompressLogs:  true,
 		HTTPC:         httpc,

+ 5 - 1
wgengine/userspace.go

@@ -312,6 +312,9 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
 	}
 	if conf.Dialer == nil {
 		conf.Dialer = &tsdial.Dialer{Logf: logf}
+		if conf.EventBus != nil {
+			conf.Dialer.SetBus(conf.EventBus)
+		}
 	}
 
 	var tsTUNDev *tstun.Wrapper
@@ -379,6 +382,7 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
 	tunName, _ := conf.Tun.Name()
 	conf.Dialer.SetTUNName(tunName)
 	conf.Dialer.SetNetMon(e.netMon)
+	conf.Dialer.SetBus(e.eventBus)
 	e.dns = dns.NewManager(logf, conf.DNS, e.health, conf.Dialer, fwdDNSLinkSelector{e, tunName}, conf.ControlKnobs, runtime.GOOS)
 
 	// TODO: there's probably a better place for this
@@ -1035,7 +1039,7 @@ func (e *userspaceEngine) Reconfig(cfg *wgcfg.Config, routerCfg *router.Config,
 		tid := cfg.NetworkLogging.DomainID
 		logExitFlowEnabled := cfg.NetworkLogging.LogExitFlowEnabled
 		e.logf("wgengine: Reconfig: starting up network logger (node:%s tailnet:%s)", nid.Public(), tid.Public())
-		if err := e.networkLogger.Startup(cfg.NodeID, nid, tid, e.tundev, e.magicConn, e.netMon, e.health, logExitFlowEnabled); err != nil {
+		if err := e.networkLogger.Startup(cfg.NodeID, nid, tid, e.tundev, e.magicConn, e.netMon, e.health, e.eventBus, logExitFlowEnabled); err != nil {
 			e.logf("wgengine: Reconfig: error starting up network logger: %v", err)
 		}
 		e.networkLogger.ReconfigRoutes(routerCfg)