Browse Source

prober: optionally spread probes over time

By default all probes with the same probe interval that have been added
together will run on a synchronized schedule, which results in spiky
resource usage and potential throttling by third-party systems (for
example, OCSP servers used by the TLS probes).

To address this, prober can now run in "spread" mode, which delays the
first run of each probe by a pseudorandom amount (derived from the probe
name) that is shorter than the probe interval.

Signed-off-by: Anton Tolchanov <[email protected]>
Anton Tolchanov 3 years ago
parent
commit
bd47e28638
2 changed files with 114 additions and 21 deletions
  1. 50 14
      prober/prober.go
  2. 64 7
      prober/prober_test.go

+ 50 - 14
prober/prober.go

@@ -13,8 +13,10 @@ import (
 	"errors"
 	"expvar"
 	"fmt"
+	"hash/fnv"
 	"io"
 	"log"
+	"math/rand"
 	"sort"
 	"strings"
 	"sync"
@@ -28,6 +30,10 @@ type ProbeFunc func(context.Context) error
 
 // a Prober manages a set of probes and keeps track of their results.
 type Prober struct {
+	// Whether to spread probe execution over time by introducing a
+	// random delay before the first probe run.
+	spread bool
+
 	// Time-related functions that get faked out during tests.
 	now       func() time.Time
 	newTicker func(time.Duration) ticker
@@ -65,18 +71,17 @@ func (p *Prober) Run(name string, interval time.Duration, labels map[string]stri
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
-	ticker := p.newTicker(interval)
 	probe := &Probe{
 		prober:  p,
 		ctx:     ctx,
 		cancel:  cancel,
 		stopped: make(chan struct{}),
 
-		name:     name,
-		doProbe:  fun,
-		interval: interval,
-		tick:     ticker,
-		labels:   labels,
+		name:         name,
+		doProbe:      fun,
+		interval:     interval,
+		initialDelay: initialDelay(name, interval),
+		labels:       labels,
 	}
 	p.probes[name] = probe
 	go probe.loop()
@@ -90,6 +95,13 @@ func (p *Prober) unregister(probe *Probe) {
 	delete(p.probes, name)
 }
 
+// WithSpread sets whether each probe's first run is delayed by its
+// pseudorandom initial delay, and returns p so calls can be chained.
+func (p *Prober) WithSpread(s bool) *Prober {
+	p.spread = s // written without holding p.mu; read by Probe.loop when a probe starts
+	return p
+}
+
 // Reports the number of registered probes. For tests only.
 func (p *Prober) activeProbes() int {
 	p.mu.Lock()
@@ -105,11 +117,12 @@ type Probe struct {
 	cancel  context.CancelFunc // run to initiate shutdown
 	stopped chan struct{}      // closed when shutdown is complete
 
-	name     string
-	doProbe  ProbeFunc
-	interval time.Duration
-	tick     ticker
-	labels   map[string]string
+	name         string
+	doProbe      ProbeFunc
+	interval     time.Duration
+	initialDelay time.Duration
+	tick         ticker
+	labels       map[string]string
 
 	mu     sync.Mutex
 	start  time.Time // last time doProbe started
@@ -127,12 +140,26 @@ func (p *Probe) Close() error {
 }
 
 // loop invokes run every interval. The first probe
-// is run after interval.
+// is run after a random delay (if spreading is enabled) or immediately.
 func (p *Probe) loop() {
 	defer close(p.stopped)
 
-	// Do a first probe right away, so that the prober immediately exports results for everything.
-	p.run()
+	if p.prober.spread && p.initialDelay > 0 {
+		t := p.prober.newTicker(p.initialDelay)
+		select {
+		case <-t.Chan():
+			p.run()
+		case <-p.ctx.Done():
+			t.Stop()
+			return
+		}
+		t.Stop()
+	} else {
+		p.run()
+	}
+
+	p.tick = p.prober.newTicker(p.interval)
+	defer p.tick.Stop()
 	for {
 		select {
 		case <-p.tick.Chan():
@@ -310,3 +337,12 @@ func (t *realTicker) Chan() <-chan time.Time {
 func newRealTicker(d time.Duration) ticker {
 	return &realTicker{time.NewTicker(d)} // wraps a time.Ticker created with period d
 }
+
+// initialDelay returns a pseudorandom duration in [0, interval) that
+// is fully determined by the provided seed string and interval.
+func initialDelay(seed string, interval time.Duration) time.Duration {
+	h := fnv.New64()
+	fmt.Fprint(h, seed)
+	r := rand.New(rand.NewSource(int64(h.Sum64()))).Float64() // generator is seeded with the hash of seed; r is in [0.0, 1.0)
+	return time.Duration(float64(interval) * r)
+}

+ 64 - 7
prober/prober_test.go

@@ -60,7 +60,7 @@ func TestProberTiming(t *testing.T) {
 		return nil
 	})
 
-	waitActiveProbes(t, p, 1)
+	waitActiveProbes(t, p, clk, 1)
 
 	called()
 	notCalled()
@@ -74,6 +74,49 @@ func TestProberTiming(t *testing.T) {
 	notCalled()
 }
 
+func TestProberTimingSpread(t *testing.T) {
+	clk := newFakeTime()
+	p := newForTest(clk.Now, clk.NewTicker).WithSpread(true)
+
+	invoked := make(chan struct{}, 1) // signalled by the probe func on every run
+
+	notCalled := func() {
+		t.Helper()
+		select {
+		case <-invoked:
+			t.Fatal("probe was invoked earlier than expected")
+		default:
+		}
+	}
+	called := func() {
+		t.Helper()
+		select {
+		case <-invoked:
+		case <-time.After(2 * time.Second): // real-time timeout so a missing run fails the test instead of hanging
+			t.Fatal("probe wasn't invoked as expected")
+		}
+	}
+
+	p.Run("test-spread-probe", probeInterval, nil, func(context.Context) error {
+		invoked <- struct{}{}
+		return nil
+	})
+
+	waitActiveProbes(t, p, clk, 1)
+
+	notCalled() // with spread enabled the probe must not run right away
+	// Name of the probe (test-spread-probe) has been chosen to ensure that
+	// the initial delay is smaller than half of the probe interval.
+	clk.Advance(halfProbeInterval)
+	called()
+	notCalled()
+	clk.Advance(quarterProbeInterval) // a second run is not expected yet
+	notCalled()
+	clk.Advance(probeInterval) // a second run is expected after this advance
+	called()
+	notCalled()
+}
+
 func TestProberRun(t *testing.T) {
 	clk := newFakeTime()
 	p := newForTest(clk.Now, clk.NewTicker)
@@ -111,7 +154,7 @@ func TestProberRun(t *testing.T) {
 		}
 	}
 
-	waitActiveProbes(t, p, startingProbes)
+	waitActiveProbes(t, p, clk, startingProbes)
 	checkCnt(startingProbes)
 	clk.Advance(probeInterval + halfProbeInterval)
 	checkCnt(startingProbes)
@@ -121,7 +164,7 @@ func TestProberRun(t *testing.T) {
 	for i := keep; i < startingProbes; i++ {
 		probes[i].Close()
 	}
-	waitActiveProbes(t, p, keep)
+	waitActiveProbes(t, p, clk, keep)
 
 	clk.Advance(probeInterval)
 	checkCnt(keep)
@@ -140,7 +183,7 @@ func TestExpvar(t *testing.T) {
 		return errors.New("failing, as instructed by test")
 	})
 
-	waitActiveProbes(t, p, 1)
+	waitActiveProbes(t, p, clk, 1)
 
 	check := func(name string, want probeInfo) {
 		t.Helper()
@@ -198,7 +241,7 @@ func TestPrometheus(t *testing.T) {
 		return errors.New("failing, as instructed by test")
 	})
 
-	waitActiveProbes(t, p, 1)
+	waitActiveProbes(t, p, clk, 1)
 
 	err := tstest.WaitFor(convergenceTimeout, func() error {
 		var b bytes.Buffer
@@ -326,6 +369,17 @@ func (t *fakeTime) Advance(d time.Duration) {
 	}
 }
 
+func (t *fakeTime) activeTickers() (count int) { // reports how many of t.tickers are not stopped
+	t.Lock()
+	defer t.Unlock()
+	for _, tick := range t.tickers {
+		if !tick.stopped {
+			count += 1
+		}
+	}
+	return // returns the named result count
+}
+
 func probeExpvar(t *testing.T, p *Prober) map[string]*probeInfo {
 	t.Helper()
 	s := p.Expvar().String()
@@ -336,11 +390,14 @@ func probeExpvar(t *testing.T, p *Prober) map[string]*probeInfo {
 	return ret
 }
 
-func waitActiveProbes(t *testing.T, p *Prober, want int) {
+func waitActiveProbes(t *testing.T, p *Prober, clk *fakeTime, want int) {
 	t.Helper()
 	err := tstest.WaitFor(convergenceTimeout, func() error {
 		if got := p.activeProbes(); got != want {
-			return fmt.Errorf("active probe count is %d, want %d", got, want)
+			return fmt.Errorf("installed probe count is %d, want %d", got, want)
+		}
+		if got := clk.activeTickers(); got != want {
+			return fmt.Errorf("active ticker count is %d, want %d", got, want)
 		}
 		return nil
 	})