Ver código fonte

Revert "logtail: avoid racing eventbus subscriptions with Shutdown (#17639)" (#17684)

This reverts commit 4346615d77a6de16854c6e78f9d49375d6424e6e.
We averted the shutdown race, but will need to service the subscriber even when
we are not waiting for a change so that we do not delay the bus as a whole.

Updates #17638

Change-Id: I5488466ed83f5ad1141c95267f5ae54878a24657
Signed-off-by: M. J. Fromberger <[email protected]>
M. J. Fromberger 4 meses atrás
pai
commit
db5815fb97
1 arquivos alterados com 20 adições e 19 exclusões
  1. 20 19
      logtail/logtail.go

+ 20 - 19
logtail/logtail.go

@@ -124,7 +124,6 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
 
 	if cfg.Bus != nil {
 		l.eventClient = cfg.Bus.Client("logtail.Logger")
-		l.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](l.eventClient)
 	}
 	l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
 	l.compressLogs = cfg.CompressLogs
@@ -163,7 +162,6 @@ type Logger struct {
 	httpDoCalls    atomic.Int32
 	sockstatsLabel atomicSocktatsLabel
 	eventClient    *eventbus.Client
-	changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta]
 
 	procID              uint32
 	includeProcSequence bool
@@ -429,23 +427,8 @@ func (l *Logger) internetUp() bool {
 
 func (l *Logger) awaitInternetUp(ctx context.Context) {
 	if l.eventClient != nil {
-		for {
-			if l.internetUp() {
-				return
-			}
-			select {
-			case <-ctx.Done():
-				return // give up
-			case <-l.changeDeltaSub.Done():
-				return // give up (closing down)
-			case delta := <-l.changeDeltaSub.Events():
-				if delta.New.AnyInterfaceUp() || l.internetUp() {
-					fmt.Fprintf(l.stderr, "logtail: internet back up\n")
-					return
-				}
-				fmt.Fprintf(l.stderr, "logtail: network changed, but is not up")
-			}
-		}
+		l.awaitInternetUpBus(ctx)
+		return
 	}
 	upc := make(chan bool, 1)
 	defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
@@ -466,6 +449,24 @@ func (l *Logger) awaitInternetUp(ctx context.Context) {
 	}
 }
 
+func (l *Logger) awaitInternetUpBus(ctx context.Context) {
+	if l.internetUp() {
+		return
+	}
+	sub := eventbus.Subscribe[netmon.ChangeDelta](l.eventClient)
+	defer sub.Close()
+	select {
+	case delta := <-sub.Events():
+		if delta.New.AnyInterfaceUp() {
+			fmt.Fprintf(l.stderr, "logtail: internet back up\n")
+			return
+		}
+		fmt.Fprintf(l.stderr, "logtail: network changed, but is not up")
+	case <-ctx.Done():
+		return
+	}
+}
+
 // upload uploads body to the log server.
 // origlen indicates the pre-compression body length.
 // origlen of -1 indicates that the body is not compressed.