2
0
Эх сурвалжийг харах

prober: support multiple probes running concurrently

Some probes might need to run for longer than their scheduling interval,
so this change relaxes the 1-at-a-time restriction, allowing us to
configure probe concurrency and timeout separately. The default values
remain the same (concurrency of 1; timeout of 80% of interval).

Updates tailscale/corp#25479

Signed-off-by: Anton Tolchanov <[email protected]>
Anton Tolchanov 1 жил өмнө
parent
commit
c2af1cd9e3
2 өөрчлөгдсөн 105 нэмэгдсэн , 15 устгасан
  1. 34 14
      prober/prober.go
  2. 71 1
      prober/prober_test.go

+ 34 - 14
prober/prober.go

@@ -7,6 +7,7 @@
 package prober
 
 import (
+	"cmp"
 	"container/ring"
 	"context"
 	"encoding/json"
@@ -20,6 +21,7 @@ import (
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
+	"tailscale.com/syncs"
 	"tailscale.com/tsweb"
 )
 
@@ -44,6 +46,14 @@ type ProbeClass struct {
 	// exposed by this probe class.
 	Labels Labels
 
+	// Timeout is the maximum time the probe function is allowed to run before
+	// its context is cancelled. Defaults to 80% of the scheduling interval.
+	Timeout time.Duration
+
+	// Concurrency is the maximum number of concurrent probe executions
+	// allowed for this probe class. Defaults to 1.
+	Concurrency int
+
 	// Metrics allows a probe class to export custom Metrics. Can be nil.
 	Metrics func(prometheus.Labels) []prometheus.Metric
 }
@@ -131,9 +141,12 @@ func newProbe(p *Prober, name string, interval time.Duration, l prometheus.Label
 		cancel:  cancel,
 		stopped: make(chan struct{}),
 
+		runSema: syncs.NewSemaphore(cmp.Or(pc.Concurrency, 1)),
+
 		name:         name,
 		probeClass:   pc,
 		interval:     interval,
+		timeout:      cmp.Or(pc.Timeout, time.Duration(float64(interval)*0.8)),
 		initialDelay: initialDelay(name, interval),
 		successHist:  ring.New(recentHistSize),
 		latencyHist:  ring.New(recentHistSize),
@@ -226,11 +239,12 @@ type Probe struct {
 	ctx     context.Context
 	cancel  context.CancelFunc // run to initiate shutdown
 	stopped chan struct{}      // closed when shutdown is complete
-	runMu   sync.Mutex         // ensures only one probe runs at a time
+	runSema syncs.Semaphore    // restricts concurrency per probe
 
 	name         string
 	probeClass   ProbeClass
 	interval     time.Duration
+	timeout      time.Duration
 	initialDelay time.Duration
 	tick         ticker
 
@@ -282,17 +296,15 @@ func (p *Probe) loop() {
 		t := p.prober.newTicker(p.initialDelay)
 		select {
 		case <-t.Chan():
-			p.run()
 		case <-p.ctx.Done():
 			t.Stop()
 			return
 		}
 		t.Stop()
-	} else {
-		p.run()
 	}
 
 	if p.prober.once {
+		p.run()
 		return
 	}
 
@@ -315,9 +327,12 @@ func (p *Probe) loop() {
 	p.tick = p.prober.newTicker(p.interval)
 	defer p.tick.Stop()
 	for {
+		// Run the probe in a new goroutine every tick. Default concurrency & timeout
+		// settings will ensure that only one probe is running at a time.
+		go p.run()
+
 		select {
 		case <-p.tick.Chan():
-			p.run()
 		case <-p.ctx.Done():
 			return
 		}
@@ -331,8 +346,13 @@ func (p *Probe) loop() {
 // that the probe either succeeds or fails before the next cycle is scheduled to
 // start.
 func (p *Probe) run() (pi ProbeInfo, err error) {
-	p.runMu.Lock()
-	defer p.runMu.Unlock()
+	// Probes are scheduled each p.interval, so we don't wait longer than that.
+	semaCtx, cancel := context.WithTimeout(p.ctx, p.interval)
+	defer cancel()
+	if !p.runSema.AcquireContext(semaCtx) {
+		return pi, fmt.Errorf("probe %s: context cancelled", p.name)
+	}
+	defer p.runSema.Release()
 
 	p.recordStart()
 	defer func() {
@@ -344,19 +364,21 @@ func (p *Probe) run() (pi ProbeInfo, err error) {
 		if r := recover(); r != nil {
 			log.Printf("probe %s panicked: %v", p.name, r)
 			err = fmt.Errorf("panic: %v", r)
-			p.recordEnd(err)
+			p.recordEndLocked(err)
 		}
 	}()
 	ctx := p.ctx
 	if !p.IsContinuous() {
-		timeout := time.Duration(float64(p.interval) * 0.8)
 		var cancel func()
-		ctx, cancel = context.WithTimeout(ctx, timeout)
+		ctx, cancel = context.WithTimeout(ctx, p.timeout)
 		defer cancel()
 	}
 
 	err = p.probeClass.Probe(ctx)
-	p.recordEnd(err)
+
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.recordEndLocked(err)
 	if err != nil {
 		log.Printf("probe %s: %v", p.name, err)
 	}
@@ -370,10 +392,8 @@ func (p *Probe) recordStart() {
 	p.mu.Unlock()
 }
 
-func (p *Probe) recordEnd(err error) {
+func (p *Probe) recordEndLocked(err error) {
 	end := p.prober.now()
-	p.mu.Lock()
-	defer p.mu.Unlock()
 	p.end = end
 	p.succeeded = err == nil
 	p.lastErr = err

+ 71 - 1
prober/prober_test.go

@@ -149,6 +149,74 @@ func TestProberTimingSpread(t *testing.T) {
 	notCalled()
 }
 
+func TestProberTimeout(t *testing.T) {
+	clk := newFakeTime()
+	p := newForTest(clk.Now, clk.NewTicker)
+
+	var done sync.WaitGroup
+	done.Add(1)
+	pfunc := FuncProbe(func(ctx context.Context) error {
+		defer done.Done()
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	})
+	pfunc.Timeout = time.Microsecond
+	probe := p.Run("foo", 30*time.Second, nil, pfunc)
+	waitActiveProbes(t, p, clk, 1)
+	done.Wait()
+	probe.mu.Lock()
+	info := probe.probeInfoLocked()
+	probe.mu.Unlock()
+	wantInfo := ProbeInfo{
+		Name:            "foo",
+		Interval:        30 * time.Second,
+		Labels:          map[string]string{"class": "", "name": "foo"},
+		Status:          ProbeStatusFailed,
+		Error:           "context deadline exceeded",
+		RecentResults:   []bool{false},
+		RecentLatencies: nil,
+	}
+	if diff := cmp.Diff(wantInfo, info, cmpopts.IgnoreFields(ProbeInfo{}, "Start", "End", "Latency")); diff != "" {
+		t.Fatalf("unexpected ProbeInfo (-want +got):\n%s", diff)
+	}
+	if got := info.Latency; got > time.Second {
+		t.Errorf("info.Latency = %v, want at most 1s", got)
+	}
+}
+
+func TestProberConcurrency(t *testing.T) {
+	clk := newFakeTime()
+	p := newForTest(clk.Now, clk.NewTicker)
+
+	var ran atomic.Int64
+	stopProbe := make(chan struct{})
+	pfunc := FuncProbe(func(ctx context.Context) error {
+		ran.Add(1)
+		<-stopProbe
+		return nil
+	})
+	pfunc.Timeout = time.Hour
+	pfunc.Concurrency = 3
+	p.Run("foo", time.Second, nil, pfunc)
+	waitActiveProbes(t, p, clk, 1)
+
+	for range 50 {
+		clk.Advance(time.Second)
+	}
+
+	if err := tstest.WaitFor(convergenceTimeout, func() error {
+		if got, want := ran.Load(), int64(3); got != want {
+			return fmt.Errorf("expected %d probes to run concurrently, got %d", want, got)
+		}
+		return nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+	close(stopProbe)
+}
+
 func TestProberRun(t *testing.T) {
 	clk := newFakeTime()
 	p := newForTest(clk.Now, clk.NewTicker)
@@ -450,9 +518,11 @@ func TestProbeInfoRecent(t *testing.T) {
 			for _, r := range tt.results {
 				probe.recordStart()
 				clk.Advance(r.latency)
-				probe.recordEnd(r.err)
+				probe.recordEndLocked(r.err)
 			}
+			probe.mu.Lock()
 			info := probe.probeInfoLocked()
+			probe.mu.Unlock()
 			if diff := cmp.Diff(tt.wantProbeInfo, info, cmpopts.IgnoreFields(ProbeInfo{}, "Start", "End", "Interval")); diff != "" {
 				t.Fatalf("unexpected ProbeInfo (-want +got):\n%s", diff)
 			}