Browse Source

logtail: avoid racing eventbus subscriptions with shutdown (#17695)

In #17639 we moved the subscription into NewLogger to ensure we would not race
subscribing with shutdown of the eventbus client. Doing so fixed that problem,
but exposed another: because we only serviced events occasionally, while waiting
for the network to come up, the eventbus could stall when a number of network
deltas arrived later and were not processed.

To address that, let's separate the concerns: As before, we'll Subscribe early
to avoid conflicts with shutdown; but instead of using the subscriber directly
to determine readiness, we'll keep track of the last-known network state in a
selectable condition that the subscriber updates for us.  When we want to wait,
we'll wait on that condition (or until our context ends), ensuring all the
events get processed in a timely manner.

Updates #17638
Updates #15160

Change-Id: I28339a372be4ab24be46e2834a218874c33a0d2d
Signed-off-by: M. J. Fromberger <[email protected]>
M. J. Fromberger 4 months ago
parent
commit
95426b79a9
10 changed files with 43 additions and 30 deletions
  1. 1 0
      cmd/k8s-operator/depaware.txt
  2. 1 0
      cmd/tailscaled/depaware.txt
  3. 1 0
      cmd/tsidp/depaware.txt
  4. 1 1
      flake.nix
  5. 2 1
      go.mod
  6. 1 1
      go.mod.sri
  7. 6 4
      go.sum
  8. 28 22
      logtail/logtail.go
  9. 1 1
      shell.nix
  10. 1 0
      tsnet/depaware.txt

+ 1 - 0
cmd/k8s-operator/depaware.txt

@@ -12,6 +12,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/
         github.com/coder/websocket/internal/errd                     from github.com/coder/websocket
         github.com/coder/websocket/internal/util                     from github.com/coder/websocket
         github.com/coder/websocket/internal/xsync                    from github.com/coder/websocket
+        github.com/creachadair/msync/trigger                         from tailscale.com/logtail
      💣 github.com/davecgh/go-spew/spew                              from k8s.io/apimachinery/pkg/util/dump
    W 💣 github.com/dblohm7/wingoes                                   from tailscale.com/net/tshttpproxy+
    W 💣 github.com/dblohm7/wingoes/com                               from tailscale.com/util/osdiag+

+ 1 - 0
cmd/tailscaled/depaware.txt

@@ -86,6 +86,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
         github.com/coder/websocket/internal/util                     from github.com/coder/websocket
         github.com/coder/websocket/internal/xsync                    from github.com/coder/websocket
    L    github.com/coreos/go-iptables/iptables                       from tailscale.com/util/linuxfw
+        github.com/creachadair/msync/trigger                         from tailscale.com/logtail
   LD 💣 github.com/creack/pty                                        from tailscale.com/ssh/tailssh
    W 💣 github.com/dblohm7/wingoes                                   from github.com/dblohm7/wingoes/com+
    W 💣 github.com/dblohm7/wingoes/com                               from tailscale.com/cmd/tailscaled+

+ 1 - 0
cmd/tsidp/depaware.txt

@@ -9,6 +9,7 @@ tailscale.com/cmd/tsidp dependencies: (generated by github.com/tailscale/depawar
         github.com/coder/websocket/internal/errd                     from github.com/coder/websocket
         github.com/coder/websocket/internal/util                     from github.com/coder/websocket
         github.com/coder/websocket/internal/xsync                    from github.com/coder/websocket
+        github.com/creachadair/msync/trigger                         from tailscale.com/logtail
    W 💣 github.com/dblohm7/wingoes                                   from tailscale.com/net/tshttpproxy+
    W 💣 github.com/dblohm7/wingoes/com                               from tailscale.com/util/osdiag+
    W 💣 github.com/dblohm7/wingoes/com/automation                    from tailscale.com/util/osdiag/internal/wsc

+ 1 - 1
flake.nix

@@ -151,5 +151,5 @@
     });
   };
 }
-# nix-direnv cache busting line: sha256-rV3C2Vi48FCifGt58OdEO4+Av0HRIs8sUJVvp/gEBLw=
+# nix-direnv cache busting line: sha256-pZCy1KHUe7f7cjm816OwA+bjGrSRnSTxkvCmB4cmWqw=
 

+ 2 - 1
go.mod

@@ -21,6 +21,7 @@ require (
 	github.com/coder/websocket v1.8.12
 	github.com/coreos/go-iptables v0.7.1-0.20240112124308-65c67c9f46e6
 	github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
+	github.com/creachadair/msync v0.7.1
 	github.com/creachadair/taskgroup v0.13.2
 	github.com/creack/pty v1.1.23
 	github.com/dblohm7/wingoes v0.0.0-20240119213807-a09d6be7affa
@@ -114,7 +115,7 @@ require (
 	golang.zx2c4.com/wireguard/windows v0.5.3
 	gopkg.in/square/go-jose.v2 v2.6.0
 	gvisor.dev/gvisor v0.0.0-20250205023644-9414b50a5633
-	honnef.co/go/tools v0.5.1
+	honnef.co/go/tools v0.6.1
 	k8s.io/api v0.32.0
 	k8s.io/apimachinery v0.32.0
 	k8s.io/apiserver v0.32.0

+ 1 - 1
go.mod.sri

@@ -1 +1 @@
-sha256-rV3C2Vi48FCifGt58OdEO4+Av0HRIs8sUJVvp/gEBLw=
+sha256-pZCy1KHUe7f7cjm816OwA+bjGrSRnSTxkvCmB4cmWqw=

+ 6 - 4
go.sum

@@ -244,8 +244,10 @@ github.com/coreos/go-iptables v0.7.1-0.20240112124308-65c67c9f46e6/go.mod h1:Qe8
 github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf h1:iW4rZ826su+pqaw19uhpSCzhj44qo35pNgKFGqzDKkU=
 github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
 github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
-github.com/creachadair/mds v0.17.1 h1:lXQbTGKmb3nE3aK6OEp29L1gCx6B5ynzlQ6c1KOBurc=
-github.com/creachadair/mds v0.17.1/go.mod h1:4b//mUiL8YldH6TImXjmW45myzTLNS1LLjOmrk888eg=
+github.com/creachadair/mds v0.25.9 h1:080Hr8laN2h+l3NeVCGMBpXtIPnl9mz8e4HLraGPqtA=
+github.com/creachadair/mds v0.25.9/go.mod h1:4hatI3hRM+qhzuAmqPRFvaBM8mONkS7nsLxkcuTYUIs=
+github.com/creachadair/msync v0.7.1 h1:SeZmuEBXQPe5GqV/C94ER7QIZPwtvFbeQiykzt/7uho=
+github.com/creachadair/msync v0.7.1/go.mod h1:8CcFlLsSujfHE5wWm19uUBLHIPDAUr6LXDwneVMO008=
 github.com/creachadair/taskgroup v0.13.2 h1:3KyqakBuFsm3KkXi/9XIb0QcA8tEzLHLgaoidf0MdVc=
 github.com/creachadair/taskgroup v0.13.2/go.mod h1:i3V1Zx7H8RjwljUEeUWYT30Lmb9poewSb2XI1yTwD0g=
 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
@@ -1534,8 +1536,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
 honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
 honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
-honnef.co/go/tools v0.5.1 h1:4bH5o3b5ZULQ4UrBmP+63W9r7qIkqJClEA9ko5YKx+I=
-honnef.co/go/tools v0.5.1/go.mod h1:e9irvo83WDG9/irijV44wr3tbhcFeRnfpVlRqVwpzMs=
+honnef.co/go/tools v0.6.1 h1:R094WgE8K4JirYjBaOpz/AvTyUu/3wbmAoskKN/pxTI=
+honnef.co/go/tools v0.6.1/go.mod h1:3puzxxljPCe8RGJX7BIy1plGbxEOZni5mR2aXe3/uk4=
 howett.net/plist v1.0.0 h1:7CrbWYbPPO/PyNy38b2EB/+gYbjCe2DXBxgtOOZbSQM=
 howett.net/plist v1.0.0/go.mod h1:lqaXoTrLY4hg8tnEzNru53gicrbv7rrk+2xJA/7hw9g=
 k8s.io/api v0.32.0 h1:OL9JpbvAU5ny9ga2fb24X8H6xQlVp+aJMFlgtQjR9CE=

+ 28 - 22
logtail/logtail.go

@@ -25,6 +25,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/creachadair/msync/trigger"
 	"github.com/go-json-experiment/json/jsontext"
 	"tailscale.com/envknob"
 	"tailscale.com/net/netmon"
@@ -124,6 +125,8 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
 
 	if cfg.Bus != nil {
 		l.eventClient = cfg.Bus.Client("logtail.Logger")
+		// Subscribe to change deltas from NetMon to detect when the network comes up.
+		eventbus.SubscribeFunc(l.eventClient, l.onChangeDelta)
 	}
 	l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
 	l.compressLogs = cfg.CompressLogs
@@ -162,6 +165,7 @@ type Logger struct {
 	httpDoCalls    atomic.Int32
 	sockstatsLabel atomicSocktatsLabel
 	eventClient    *eventbus.Client
+	networkIsUp    trigger.Cond // set/reset by netmon.ChangeDelta events
 
 	procID              uint32
 	includeProcSequence bool
@@ -418,16 +422,36 @@ func (l *Logger) uploading(ctx context.Context) {
 }
 
 func (l *Logger) internetUp() bool {
-	if l.netMonitor == nil {
-		// No way to tell, so assume it is.
+	select {
+	case <-l.networkIsUp.Ready():
 		return true
+	default:
+		if l.netMonitor == nil {
+			return true // No way to tell, so assume it is.
+		}
+		return l.netMonitor.InterfaceState().AnyInterfaceUp()
+	}
+}
+
+// onChangeDelta is an eventbus subscriber function that handles
+// [netmon.ChangeDelta] events to detect whether the Internet is expected to be
+// reachable.
+func (l *Logger) onChangeDelta(delta *netmon.ChangeDelta) {
+	if delta.New.AnyInterfaceUp() {
+		fmt.Fprintf(l.stderr, "logtail: internet back up\n")
+		l.networkIsUp.Set()
+	} else {
+		fmt.Fprintf(l.stderr, "logtail: network changed, but is not up\n")
+		l.networkIsUp.Reset()
 	}
-	return l.netMonitor.InterfaceState().AnyInterfaceUp()
 }
 
 func (l *Logger) awaitInternetUp(ctx context.Context) {
 	if l.eventClient != nil {
-		l.awaitInternetUpBus(ctx)
+		select {
+		case <-l.networkIsUp.Ready():
+		case <-ctx.Done():
+		}
 		return
 	}
 	upc := make(chan bool, 1)
@@ -449,24 +473,6 @@ func (l *Logger) awaitInternetUp(ctx context.Context) {
 	}
 }
 
-func (l *Logger) awaitInternetUpBus(ctx context.Context) {
-	if l.internetUp() {
-		return
-	}
-	sub := eventbus.Subscribe[netmon.ChangeDelta](l.eventClient)
-	defer sub.Close()
-	select {
-	case delta := <-sub.Events():
-		if delta.New.AnyInterfaceUp() {
-			fmt.Fprintf(l.stderr, "logtail: internet back up\n")
-			return
-		}
-		fmt.Fprintf(l.stderr, "logtail: network changed, but is not up")
-	case <-ctx.Done():
-		return
-	}
-}
-
 // upload uploads body to the log server.
 // origlen indicates the pre-compression body length.
 // origlen of -1 indicates that the body is not compressed.

+ 1 - 1
shell.nix

@@ -16,4 +16,4 @@
 ) {
   src =  ./.;
 }).shellNix
-# nix-direnv cache busting line: sha256-rV3C2Vi48FCifGt58OdEO4+Av0HRIs8sUJVvp/gEBLw=
+# nix-direnv cache busting line: sha256-pZCy1KHUe7f7cjm816OwA+bjGrSRnSTxkvCmB4cmWqw=

+ 1 - 0
tsnet/depaware.txt

@@ -9,6 +9,7 @@ tailscale.com/tsnet dependencies: (generated by github.com/tailscale/depaware)
  LDW    github.com/coder/websocket/internal/errd                     from github.com/coder/websocket
  LDW    github.com/coder/websocket/internal/util                     from github.com/coder/websocket
  LDW    github.com/coder/websocket/internal/xsync                    from github.com/coder/websocket
+        github.com/creachadair/msync/trigger                         from tailscale.com/logtail
    W 💣 github.com/dblohm7/wingoes                                   from tailscale.com/net/tshttpproxy+
    W 💣 github.com/dblohm7/wingoes/com                               from tailscale.com/util/osdiag+
    W 💣 github.com/dblohm7/wingoes/com/automation                    from tailscale.com/util/osdiag/internal/wsc