Browse Source

ipn/ipnlocal: add IPN Bus NotifyRateLimit watch bit NotifyRateLimit

Limit spamming GUIs with boring updates to once in 3 seconds, unless
the notification is relatively interesting and the GUI should update
immediately.

This is basically @barnstar's #14119 but with the logic moved to be
per-watch-session (since the bit is per session), rather than
globally. And this distinguishes notable Notify messages (such as
state changes) and makes them send immediately.

Updates tailscale/corp#24553

Change-Id: I79cac52cce85280ce351e65e76ea11e107b00b49
Signed-off-by: Brad Fitzpatrick <[email protected]>
Brad Fitzpatrick 1 year ago
parent
commit
93db503565
5 changed files with 395 additions and 10 deletions
  1. 5 0
      cmd/tailscale/cli/debug.go
  2. 2 0
      ipn/backend.go
  3. 161 0
      ipn/ipnlocal/bus.go
  4. 220 0
      ipn/ipnlocal/bus_test.go
  5. 7 10
      ipn/ipnlocal/local.go

+ 5 - 0
cmd/tailscale/cli/debug.go

@@ -213,6 +213,7 @@ var debugCmd = &ffcli.Command{
 				fs := newFlagSet("watch-ipn")
 				fs.BoolVar(&watchIPNArgs.netmap, "netmap", true, "include netmap in messages")
 				fs.BoolVar(&watchIPNArgs.initial, "initial", false, "include initial status")
+				fs.BoolVar(&watchIPNArgs.rateLimit, "rate-limit", true, "rate limit messags")
 				fs.BoolVar(&watchIPNArgs.showPrivateKey, "show-private-key", false, "include node private key in printed netmap")
 				fs.IntVar(&watchIPNArgs.count, "count", 0, "exit after printing this many statuses, or 0 to keep going forever")
 				return fs
@@ -500,6 +501,7 @@ var watchIPNArgs struct {
 	netmap         bool
 	initial        bool
 	showPrivateKey bool
+	rateLimit      bool
 	count          int
 }
 
@@ -511,6 +513,9 @@ func runWatchIPN(ctx context.Context, args []string) error {
 	if !watchIPNArgs.showPrivateKey {
 		mask |= ipn.NotifyNoPrivateKeys
 	}
+	if watchIPNArgs.rateLimit {
+		mask |= ipn.NotifyRateLimit
+	}
 	watcher, err := localClient.WatchIPNBus(ctx, mask)
 	if err != nil {
 		return err

+ 2 - 0
ipn/backend.go

@@ -73,6 +73,8 @@ const (
 	NotifyInitialOutgoingFiles // if set, the first Notify message (sent immediately) will contain the current Taildrop OutgoingFiles
 
 	NotifyInitialHealthState // if set, the first Notify message (sent immediately) will contain the current health.State of the client
+
+	NotifyRateLimit // if set, rate limit spammy netmap updates to every few seconds
 )
 
 // Notify is a communication from a backend (e.g. tailscaled) to a frontend

+ 161 - 0
ipn/ipnlocal/bus.go

@@ -0,0 +1,161 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package ipnlocal
+
+import (
+	"context"
+	"time"
+
+	"tailscale.com/ipn"
+	"tailscale.com/tstime"
+)
+
+type rateLimitingBusSender struct {
+	fn              func(*ipn.Notify) (keepGoing bool)
+	lastFlush       time.Time           // last call to fn, or zero value if none
+	interval        time.Duration       // 0 to flush immediately; non-zero to rate limit sends
+	clock           tstime.DefaultClock // non-nil for testing
+	didSendTestHook func()              // non-nil for testing
+
+	// pending, if non-nil, is the pending notification that we
+	// haven't sent yet. We own this memory to mutate.
+	pending *ipn.Notify
+
+	// flushTimer is non-nil if the timer is armed.
+	flushTimer  tstime.TimerController // effectively a *time.Timer
+	flushTimerC <-chan time.Time       // ... said ~Timer's C chan
+}
+
+func (s *rateLimitingBusSender) close() {
+	if s.flushTimer != nil {
+		s.flushTimer.Stop()
+	}
+}
+
+func (s *rateLimitingBusSender) flushChan() <-chan time.Time {
+	return s.flushTimerC
+}
+
+func (s *rateLimitingBusSender) flush() (keepGoing bool) {
+	if n := s.pending; n != nil {
+		s.pending = nil
+		return s.flushNotify(n)
+	}
+	return true
+}
+
+func (s *rateLimitingBusSender) flushNotify(n *ipn.Notify) (keepGoing bool) {
+	s.lastFlush = s.clock.Now()
+	return s.fn(n)
+}
+
+// send conditionally sends n to the underlying fn, possibly rate
+// limiting it, depending on whether s.interval is set, and whether
+// n is a notable notification that the client (typically a GUI) would
+// want to act on (render) immediately.
+//
+// It returns whether the caller should keep looping.
+//
+// The passed-in memory 'n' is owned by the caller and should
+// not be mutated.
+func (s *rateLimitingBusSender) send(n *ipn.Notify) (keepGoing bool) {
+	if s.interval <= 0 {
+		// No rate limiting case.
+		return s.fn(n)
+	}
+	if isNotableNotify(n) {
+		// Notable notifications are always sent immediately.
+		// But first send any boring one that was pending.
+		// TODO(bradfitz): there might be a boring one pending
+		// with a NetMap or Engine field that is redundant
+		// with the new one (n) with NetMap or Engine populated.
+		// We should clear the pending one's NetMap/Engine in
+		// that case. Or really, merge the two, but mergeBoringNotifies
+		// only handles the case of both sides being boring.
+		// So for now, flush both.
+		if !s.flush() {
+			return false
+		}
+		return s.flushNotify(n)
+	}
+	s.pending = mergeBoringNotifies(s.pending, n)
+	d := s.clock.Now().Sub(s.lastFlush)
+	if d > s.interval {
+		return s.flush()
+	}
+	nextFlushIn := s.interval - d
+	if s.flushTimer == nil {
+		s.flushTimer, s.flushTimerC = s.clock.NewTimer(nextFlushIn)
+	} else {
+		s.flushTimer.Reset(nextFlushIn)
+	}
+	return true
+}
+
+func (s *rateLimitingBusSender) Run(ctx context.Context, ch <-chan *ipn.Notify) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case n, ok := <-ch:
+			if !ok {
+				return
+			}
+			if !s.send(n) {
+				return
+			}
+			if f := s.didSendTestHook; f != nil {
+				f()
+			}
+		case <-s.flushChan():
+			if !s.flush() {
+				return
+			}
+		}
+	}
+}
+
+// mergeBoringNotify merges new notify 'src' into possibly-nil 'dst',
+// either mutating 'dst' or allocating a new one if 'dst' is nil,
+// returning the merged result.
+//
+// dst and src must both be "boring" (i.e. not notable per isNotifiableNotify).
+func mergeBoringNotifies(dst, src *ipn.Notify) *ipn.Notify {
+	if dst == nil {
+		dst = &ipn.Notify{Version: src.Version}
+	}
+	if src.NetMap != nil {
+		dst.NetMap = src.NetMap
+	}
+	if src.Engine != nil {
+		dst.Engine = src.Engine
+	}
+	return dst
+}
+
+// isNotableNotify reports whether n is a "notable" notification that
+// should be sent on the IPN bus immediately (e.g. to GUIs) without
+// rate limiting it for a few seconds.
+//
+// It effectively reports whether n contains any field set that's
+// not NetMap or Engine.
+func isNotableNotify(n *ipn.Notify) bool {
+	if n == nil {
+		return false
+	}
+	return n.State != nil ||
+		n.SessionID != "" ||
+		n.BackendLogID != nil ||
+		n.BrowseToURL != nil ||
+		n.LocalTCPPort != nil ||
+		n.ClientVersion != nil ||
+		n.Prefs != nil ||
+		n.ErrMessage != nil ||
+		n.LoginFinished != nil ||
+		!n.DriveShares.IsNil() ||
+		n.Health != nil ||
+		len(n.IncomingFiles) > 0 ||
+		len(n.OutgoingFiles) > 0 ||
+		n.FilesWaiting != nil
+}

+ 220 - 0
ipn/ipnlocal/bus_test.go

@@ -0,0 +1,220 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package ipnlocal
+
+import (
+	"context"
+	"reflect"
+	"slices"
+	"testing"
+	"time"
+
+	"tailscale.com/drive"
+	"tailscale.com/ipn"
+	"tailscale.com/tstest"
+	"tailscale.com/tstime"
+	"tailscale.com/types/logger"
+	"tailscale.com/types/netmap"
+	"tailscale.com/types/views"
+)
+
+func TestIsNotableNotify(t *testing.T) {
+	tests := []struct {
+		name   string
+		notify *ipn.Notify
+		want   bool
+	}{
+		{"nil", nil, false},
+		{"empty", &ipn.Notify{}, false},
+		{"version", &ipn.Notify{Version: "foo"}, false},
+		{"netmap", &ipn.Notify{NetMap: new(netmap.NetworkMap)}, false},
+		{"engine", &ipn.Notify{Engine: new(ipn.EngineStatus)}, false},
+	}
+
+	// Then for all other fields, assume they're notable.
+	// We use reflect to catch fields that might be added in the future without
+	// remembering to update the [isNotableNotify] function.
+	rt := reflect.TypeFor[ipn.Notify]()
+	for i := range rt.NumField() {
+		n := &ipn.Notify{}
+		sf := rt.Field(i)
+		switch sf.Name {
+		case "_", "NetMap", "Engine", "Version":
+			// Already covered above or not applicable.
+			continue
+		case "DriveShares":
+			n.DriveShares = views.SliceOfViews[*drive.Share, drive.ShareView](make([]*drive.Share, 1))
+		default:
+			rf := reflect.ValueOf(n).Elem().Field(i)
+			switch rf.Kind() {
+			case reflect.Pointer:
+				rf.Set(reflect.New(rf.Type().Elem()))
+			case reflect.String:
+				rf.SetString("foo")
+			case reflect.Slice:
+				rf.Set(reflect.MakeSlice(rf.Type(), 1, 1))
+			default:
+				t.Errorf("unhandled field kind %v for %q", rf.Kind(), sf.Name)
+			}
+		}
+
+		tests = append(tests, struct {
+			name   string
+			notify *ipn.Notify
+			want   bool
+		}{
+			name:   "field-" + rt.Field(i).Name,
+			notify: n,
+			want:   true,
+		})
+	}
+
+	for _, tt := range tests {
+		if got := isNotableNotify(tt.notify); got != tt.want {
+			t.Errorf("%v: got %v; want %v", tt.name, got, tt.want)
+		}
+	}
+}
+
+type rateLimitingBusSenderTester struct {
+	tb    testing.TB
+	got   []*ipn.Notify
+	clock *tstest.Clock
+	s     *rateLimitingBusSender
+}
+
+func (st *rateLimitingBusSenderTester) init() {
+	if st.s != nil {
+		return
+	}
+	st.clock = tstest.NewClock(tstest.ClockOpts{
+		Start: time.Unix(1731777537, 0), // time I wrote this test :)
+	})
+	st.s = &rateLimitingBusSender{
+		clock: tstime.DefaultClock{Clock: st.clock},
+		fn: func(n *ipn.Notify) bool {
+			st.got = append(st.got, n)
+			return true
+		},
+	}
+}
+
+func (st *rateLimitingBusSenderTester) send(n *ipn.Notify) {
+	st.tb.Helper()
+	st.init()
+	if !st.s.send(n) {
+		st.tb.Fatal("unexpected send failed")
+	}
+}
+
+func (st *rateLimitingBusSenderTester) advance(d time.Duration) {
+	st.tb.Helper()
+	st.clock.Advance(d)
+	select {
+	case <-st.s.flushChan():
+		if !st.s.flush() {
+			st.tb.Fatal("unexpected flush failed")
+		}
+	default:
+	}
+}
+
+func TestRateLimitingBusSender(t *testing.T) {
+	nm1 := &ipn.Notify{NetMap: new(netmap.NetworkMap)}
+	nm2 := &ipn.Notify{NetMap: new(netmap.NetworkMap)}
+	eng1 := &ipn.Notify{Engine: new(ipn.EngineStatus)}
+	eng2 := &ipn.Notify{Engine: new(ipn.EngineStatus)}
+
+	t.Run("unbuffered", func(t *testing.T) {
+		st := &rateLimitingBusSenderTester{tb: t}
+		st.send(nm1)
+		st.send(nm2)
+		st.send(eng1)
+		st.send(eng2)
+		if !slices.Equal(st.got, []*ipn.Notify{nm1, nm2, eng1, eng2}) {
+			t.Errorf("got %d items; want 4 specific ones, unmodified", len(st.got))
+		}
+	})
+
+	t.Run("buffered", func(t *testing.T) {
+		st := &rateLimitingBusSenderTester{tb: t}
+		st.init()
+		st.s.interval = 1 * time.Second
+		st.send(&ipn.Notify{Version: "initial"})
+		if len(st.got) != 1 {
+			t.Fatalf("got %d items; expected 1 (first to flush immediately)", len(st.got))
+		}
+		st.send(nm1)
+		st.send(nm2)
+		st.send(eng1)
+		st.send(eng2)
+		if len(st.got) != 1 {
+			if len(st.got) != 1 {
+				t.Fatalf("got %d items; expected still just that first 1", len(st.got))
+			}
+		}
+
+		// But moving the clock should flush the rest, collasced into one new one.
+		st.advance(5 * time.Second)
+		if len(st.got) != 2 {
+			t.Fatalf("got %d items; want 2", len(st.got))
+		}
+		gotn := st.got[1]
+		if gotn.NetMap != nm2.NetMap {
+			t.Errorf("got wrong NetMap; got %p", gotn.NetMap)
+		}
+		if gotn.Engine != eng2.Engine {
+			t.Errorf("got wrong Engine; got %p", gotn.Engine)
+		}
+		if t.Failed() {
+			t.Logf("failed Notify was: %v", logger.AsJSON(gotn))
+		}
+	})
+
+	// Test the Run method
+	t.Run("run", func(t *testing.T) {
+		st := &rateLimitingBusSenderTester{tb: t}
+		st.init()
+		st.s.interval = 1 * time.Second
+		st.s.lastFlush = st.clock.Now() // pretend we just flushed
+
+		flushc := make(chan *ipn.Notify, 1)
+		st.s.fn = func(n *ipn.Notify) bool {
+			flushc <- n
+			return true
+		}
+		didSend := make(chan bool, 2)
+		st.s.didSendTestHook = func() { didSend <- true }
+		waitSend := func() {
+			select {
+			case <-didSend:
+			case <-time.After(5 * time.Second):
+				t.Error("timeout waiting for call to send")
+			}
+		}
+
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		incoming := make(chan *ipn.Notify, 2)
+		go func() {
+			incoming <- nm1
+			waitSend()
+			incoming <- nm2
+			waitSend()
+			st.advance(5 * time.Second)
+			select {
+			case n := <-flushc:
+				if n.NetMap != nm2.NetMap {
+					t.Errorf("got wrong NetMap; got %p", n.NetMap)
+				}
+			case <-time.After(10 * time.Second):
+				t.Error("timeout")
+			}
+			cancel()
+		}()
+
+		st.s.Run(ctx, incoming)
+	})
+}

+ 7 - 10
ipn/ipnlocal/local.go

@@ -2780,20 +2780,17 @@ func (b *LocalBackend) WatchNotificationsAs(ctx context.Context, actor ipnauth.A
 		go b.pollRequestEngineStatus(ctx)
 	}
 
-	// TODO(marwan-at-work): check err
 	// TODO(marwan-at-work): streaming background logs?
 	defer b.DeleteForegroundSession(sessionID)
 
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case n := <-ch:
-			if !fn(n) {
-				return
-			}
-		}
+	sender := &rateLimitingBusSender{fn: fn}
+	defer sender.close()
+
+	if mask&ipn.NotifyRateLimit != 0 {
+		sender.interval = 3 * time.Second
 	}
+
+	sender.Run(ctx, ch)
 }
 
 // pollRequestEngineStatus calls b.e.RequestStatus every 2 seconds until ctx