prober.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // Package prober implements a simple blackbox prober. Each probe runs
  4. // in its own goroutine, and run results are recorded as Prometheus
  5. // metrics.
  6. package prober
  7. import (
  8. "bytes"
  9. "cmp"
  10. "container/ring"
  11. "context"
  12. "encoding/json"
  13. "fmt"
  14. "hash/fnv"
  15. "log"
  16. "maps"
  17. "math/rand"
  18. "net/http"
  19. "slices"
  20. "sync"
  21. "time"
  22. "github.com/prometheus/client_golang/prometheus"
  23. "golang.org/x/sync/errgroup"
  24. "tailscale.com/syncs"
  25. "tailscale.com/tsweb"
  26. )
  // recentHistSize bounds the per-probe history: only this many of the most
  // recent probe results and latencies are retained in memory.
  const recentHistSize = 10
  30. // ProbeClass defines a probe of a specific type: a probing function that will
  31. // be regularly ran, and metric labels that will be added automatically to all
  32. // probes using this class.
  33. type ProbeClass struct {
  34. // Probe is a function that probes something and reports whether the Probe
  35. // succeeded. The provided context's deadline must be obeyed for correct
  36. // Probe scheduling.
  37. Probe func(context.Context) error
  38. // Class defines a user-facing name of the probe class that will be used
  39. // in the `class` metric label.
  40. Class string
  41. // Labels defines a set of metric labels that will be added to all metrics
  42. // exposed by this probe class.
  43. Labels Labels
  44. // Timeout is the maximum time the probe function is allowed to run before
  45. // its context is cancelled. Defaults to 80% of the scheduling interval.
  46. Timeout time.Duration
  47. // Concurrency is the maximum number of concurrent probe executions
  48. // allowed for this probe class. Defaults to 1.
  49. Concurrency int
  50. // Metrics allows a probe class to export custom Metrics. Can be nil.
  51. Metrics func(prometheus.Labels) []prometheus.Metric
  52. }
  53. // FuncProbe wraps a simple probe function in a ProbeClass.
  54. func FuncProbe(fn func(context.Context) error) ProbeClass {
  55. return ProbeClass{
  56. Probe: fn,
  57. }
  58. }
  59. // a Prober manages a set of probes and keeps track of their results.
  60. type Prober struct {
  61. // Whether to spread probe execution over time by introducing a
  62. // random delay before the first probe run.
  63. spread bool
  64. // Whether to run all probes once instead of running them in a loop.
  65. once bool
  66. // Time-related functions that get faked out during tests.
  67. now func() time.Time
  68. newTicker func(time.Duration) ticker
  69. mu sync.Mutex // protects all following fields
  70. probes map[string]*Probe
  71. namespace string
  72. metrics *prometheus.Registry
  73. }
  74. // New returns a new Prober.
  75. func New() *Prober {
  76. return newForTest(time.Now, newRealTicker)
  77. }
  78. func newForTest(now func() time.Time, newTicker func(time.Duration) ticker) *Prober {
  79. p := &Prober{
  80. now: now,
  81. newTicker: newTicker,
  82. probes: map[string]*Probe{},
  83. metrics: prometheus.NewRegistry(),
  84. namespace: "prober",
  85. }
  86. prometheus.DefaultRegisterer.MustRegister(p.metrics)
  87. return p
  88. }
  89. // Run executes probe class function every interval, and exports probe results under probeName.
  90. //
  91. // If interval is negative, the probe will run continuously. If it encounters a failure while
  92. // running continuously, it will pause for -1*interval and then retry.
  93. //
  94. // Registering a probe under an already-registered name panics.
  95. func (p *Prober) Run(name string, interval time.Duration, labels Labels, pc ProbeClass) *Probe {
  96. p.mu.Lock()
  97. defer p.mu.Unlock()
  98. if _, ok := p.probes[name]; ok {
  99. panic(fmt.Sprintf("probe named %q already registered", name))
  100. }
  101. lb := prometheus.Labels{
  102. "name": name,
  103. "class": pc.Class,
  104. }
  105. for k, v := range pc.Labels {
  106. lb[k] = v
  107. }
  108. for k, v := range labels {
  109. lb[k] = v
  110. }
  111. probe := newProbe(p, name, interval, lb, pc)
  112. p.probes[name] = probe
  113. go probe.loop()
  114. return probe
  115. }
  116. // newProbe creates a new Probe with the given parameters, but does not start it.
  117. func newProbe(p *Prober, name string, interval time.Duration, lg prometheus.Labels, pc ProbeClass) *Probe {
  118. ctx, cancel := context.WithCancel(context.Background())
  119. probe := &Probe{
  120. prober: p,
  121. ctx: ctx,
  122. cancel: cancel,
  123. stopped: make(chan struct{}),
  124. runSema: syncs.NewSemaphore(cmp.Or(pc.Concurrency, 1)),
  125. name: name,
  126. probeClass: pc,
  127. interval: interval,
  128. timeout: cmp.Or(pc.Timeout, time.Duration(float64(interval)*0.8)),
  129. initialDelay: initialDelay(name, interval),
  130. successHist: ring.New(recentHistSize),
  131. latencyHist: ring.New(recentHistSize),
  132. metrics: prometheus.NewRegistry(),
  133. metricLabels: lg,
  134. mInterval: prometheus.NewDesc("interval_secs", "Probe interval in seconds", nil, lg),
  135. mStartTime: prometheus.NewDesc("start_secs", "Latest probe start time (seconds since epoch)", nil, lg),
  136. mEndTime: prometheus.NewDesc("end_secs", "Latest probe end time (seconds since epoch)", nil, lg),
  137. mLatency: prometheus.NewDesc("latency_millis", "Latest probe latency (ms)", nil, lg),
  138. mResult: prometheus.NewDesc("result", "Latest probe result (1 = success, 0 = failure)", nil, lg),
  139. mAttempts: prometheus.NewCounterVec(prometheus.CounterOpts{
  140. Name: "attempts_total", Help: "Total number of probing attempts", ConstLabels: lg,
  141. }, []string{"status"}),
  142. mSeconds: prometheus.NewCounterVec(prometheus.CounterOpts{
  143. Name: "seconds_total", Help: "Total amount of time spent executing the probe", ConstLabels: lg,
  144. }, []string{"status"}),
  145. }
  146. if p.metrics != nil {
  147. prometheus.WrapRegistererWithPrefix(p.namespace+"_", p.metrics).MustRegister(probe.metrics)
  148. }
  149. probe.metrics.MustRegister(probe)
  150. return probe
  151. }
  152. // unregister removes a probe from the prober's internal state.
  153. func (p *Prober) unregister(probe *Probe) {
  154. p.mu.Lock()
  155. defer p.mu.Unlock()
  156. probe.metrics.Unregister(probe)
  157. p.metrics.Unregister(probe.metrics)
  158. name := probe.name
  159. delete(p.probes, name)
  160. }
  161. // WithSpread is used to enable random delay before the first run of
  162. // each added probe.
  163. func (p *Prober) WithSpread(s bool) *Prober {
  164. p.spread = s
  165. return p
  166. }
  167. // WithOnce mode can be used if you want to run all configured probes once
  168. // rather than on a schedule.
  169. func (p *Prober) WithOnce(s bool) *Prober {
  170. p.once = s
  171. return p
  172. }
  173. // WithMetricNamespace allows changing metric name prefix from the default `prober`.
  174. func (p *Prober) WithMetricNamespace(n string) *Prober {
  175. p.namespace = n
  176. return p
  177. }
  178. // Wait blocks until all probes have finished execution. It should typically
  179. // be used with the `once` mode to wait for probes to finish before collecting
  180. // their results.
  181. func (p *Prober) Wait() {
  182. for {
  183. chans := make([]chan struct{}, 0)
  184. p.mu.Lock()
  185. for _, p := range p.probes {
  186. chans = append(chans, p.stopped)
  187. }
  188. p.mu.Unlock()
  189. for _, c := range chans {
  190. <-c
  191. }
  192. // Since probes can add other probes, retry if the number of probes has changed.
  193. if p.activeProbes() != len(chans) {
  194. continue
  195. }
  196. return
  197. }
  198. }
  199. // Reports the number of registered probes.
  200. func (p *Prober) activeProbes() int {
  201. p.mu.Lock()
  202. defer p.mu.Unlock()
  203. return len(p.probes)
  204. }
  205. // Probe is a probe that healthchecks something and updates Prometheus
  206. // metrics with the results.
  207. type Probe struct {
  208. prober *Prober
  209. ctx context.Context
  210. cancel context.CancelFunc // run to initiate shutdown
  211. stopped chan struct{} // closed when shutdown is complete
  212. runSema syncs.Semaphore // restricts concurrency per probe
  213. name string
  214. probeClass ProbeClass
  215. interval time.Duration
  216. timeout time.Duration
  217. initialDelay time.Duration
  218. tick ticker
  219. // metrics is a Prometheus metrics registry for metrics exported by this probe.
  220. // Using a separate registry allows cleanly removing metrics exported by this
  221. // probe when it gets unregistered.
  222. metrics *prometheus.Registry
  223. metricLabels prometheus.Labels
  224. mInterval *prometheus.Desc
  225. mStartTime *prometheus.Desc
  226. mEndTime *prometheus.Desc
  227. mLatency *prometheus.Desc
  228. mResult *prometheus.Desc
  229. mAttempts *prometheus.CounterVec
  230. mSeconds *prometheus.CounterVec
  231. mu sync.Mutex
  232. start time.Time // last time doProbe started
  233. end time.Time // last time doProbe returned
  234. latency time.Duration // last successful probe latency
  235. succeeded bool // whether the last doProbe call succeeded
  236. lastErr error
  237. // History of recent probe results and latencies.
  238. successHist *ring.Ring
  239. latencyHist *ring.Ring
  240. }
  241. // IsContinuous indicates that this is a continuous probe.
  242. func (p *Probe) IsContinuous() bool {
  243. return p.interval < 0
  244. }
  245. // Close shuts down the Probe and unregisters it from its Prober.
  246. // It is safe to Run a new probe of the same name after Close returns.
  247. func (p *Probe) Close() error {
  248. p.cancel()
  249. <-p.stopped
  250. p.prober.unregister(p)
  251. return nil
  252. }
  253. // probeLoop invokes runProbe on fun every interval. The first probe
  254. // is run after a random delay (if spreading is enabled) or immediately.
  255. func (p *Probe) loop() {
  256. defer close(p.stopped)
  257. if p.prober.spread && p.initialDelay > 0 {
  258. t := p.prober.newTicker(p.initialDelay)
  259. select {
  260. case <-t.Chan():
  261. case <-p.ctx.Done():
  262. t.Stop()
  263. return
  264. }
  265. t.Stop()
  266. }
  267. if p.prober.once {
  268. p.run()
  269. return
  270. }
  271. if p.IsContinuous() {
  272. // Probe function is going to run continuously.
  273. for {
  274. p.run()
  275. // Wait and then retry if probe fails. We use the inverse of the
  276. // configured negative interval as our sleep period.
  277. // TODO(percy):implement exponential backoff, possibly using util/backoff.
  278. select {
  279. case <-time.After(-1 * p.interval):
  280. p.run()
  281. case <-p.ctx.Done():
  282. return
  283. }
  284. }
  285. }
  286. p.tick = p.prober.newTicker(p.interval)
  287. defer p.tick.Stop()
  288. for {
  289. // Run the probe in a new goroutine every tick. Default concurrency & timeout
  290. // settings will ensure that only one probe is running at a time.
  291. go p.run()
  292. select {
  293. case <-p.tick.Chan():
  294. case <-p.ctx.Done():
  295. return
  296. }
  297. }
  298. }
  299. // run invokes the probe function and records the result. It returns the probe
  300. // result and an error if the probe failed.
  301. //
  302. // The probe function is invoked with a timeout slightly less than interval, so
  303. // that the probe either succeeds or fails before the next cycle is scheduled to
  304. // start.
  305. func (p *Probe) run() (pi ProbeInfo, err error) {
  306. // Probes are scheduled each p.interval, so we don't wait longer than that.
  307. semaCtx, cancel := context.WithTimeout(p.ctx, p.interval)
  308. defer cancel()
  309. if !p.runSema.AcquireContext(semaCtx) {
  310. return pi, fmt.Errorf("probe %s: context cancelled", p.name)
  311. }
  312. defer p.runSema.Release()
  313. p.recordStart()
  314. defer func() {
  315. // Prevent a panic within one probe function from killing the
  316. // entire prober, so that a single buggy probe doesn't destroy
  317. // our entire ability to monitor anything. A panic is recorded
  318. // as a probe failure, so panicking probes will trigger an
  319. // alert for debugging.
  320. if r := recover(); r != nil {
  321. log.Printf("probe %s panicked: %v", p.name, r)
  322. err = fmt.Errorf("panic: %v", r)
  323. p.recordEndLocked(err)
  324. }
  325. }()
  326. ctx := p.ctx
  327. if !p.IsContinuous() {
  328. var cancel func()
  329. ctx, cancel = context.WithTimeout(ctx, p.timeout)
  330. defer cancel()
  331. }
  332. err = p.probeClass.Probe(ctx)
  333. p.mu.Lock()
  334. defer p.mu.Unlock()
  335. p.recordEndLocked(err)
  336. if err != nil {
  337. log.Printf("probe %s: %v", p.name, err)
  338. }
  339. pi = p.probeInfoLocked()
  340. return
  341. }
  342. func (p *Probe) recordStart() {
  343. p.mu.Lock()
  344. p.start = p.prober.now()
  345. p.mu.Unlock()
  346. }
  347. func (p *Probe) recordEndLocked(err error) {
  348. end := p.prober.now()
  349. p.end = end
  350. p.succeeded = err == nil
  351. p.lastErr = err
  352. latency := end.Sub(p.start)
  353. if p.succeeded {
  354. p.latency = latency
  355. p.mAttempts.WithLabelValues("ok").Inc()
  356. p.mSeconds.WithLabelValues("ok").Add(latency.Seconds())
  357. p.latencyHist.Value = latency
  358. p.latencyHist = p.latencyHist.Next()
  359. p.mAttempts.WithLabelValues("fail").Add(0)
  360. p.mSeconds.WithLabelValues("fail").Add(0)
  361. } else {
  362. p.latency = 0
  363. p.mAttempts.WithLabelValues("fail").Inc()
  364. p.mSeconds.WithLabelValues("fail").Add(latency.Seconds())
  365. p.mAttempts.WithLabelValues("ok").Add(0)
  366. p.mSeconds.WithLabelValues("ok").Add(0)
  367. }
  368. p.successHist.Value = p.succeeded
  369. p.successHist = p.successHist.Next()
  370. }
  // ProbeStatus indicates the status of a probe.
  type ProbeStatus string

  // Values used for ProbeInfo.Status. They are untyped string constants, so
  // they can be used both as a ProbeStatus and as a plain string.
  const (
  	ProbeStatusUnknown   = "unknown"
  	ProbeStatusRunning   = "running"
  	ProbeStatusFailed    = "failed"
  	ProbeStatusSucceeded = "succeeded"
  )

  // ProbeInfo is a snapshot of the configuration and state of a Probe.
  type ProbeInfo struct {
  	Name            string
  	Class           string
  	Interval        time.Duration
  	Labels          map[string]string
  	Start           time.Time
  	End             time.Time
  	Latency         time.Duration
  	Status          ProbeStatus
  	Error           string
  	RecentResults   []bool
  	RecentLatencies []time.Duration
  }

  // RecentSuccessRatio returns the success ratio of the probe in the recent
  // history: the number of true entries in RecentResults divided by its
  // length. It returns 0 when there is no history.
  func (pb ProbeInfo) RecentSuccessRatio() float64 {
  	if len(pb.RecentResults) == 0 {
  		return 0
  	}
  	var sum int
  	for _, r := range pb.RecentResults {
  		if r {
  			sum++
  		}
  	}
  	return float64(sum) / float64(len(pb.RecentResults))
  }

  // RecentMedianLatency returns the median latency of the probe in the recent
  // history. It returns 0 when there is no history; for an even number of
  // entries it returns the upper of the two middle values.
  //
  // RecentLatencies is not required to be sorted and is not modified.
  func (pb ProbeInfo) RecentMedianLatency() time.Duration {
  	if len(pb.RecentLatencies) == 0 {
  		return 0
  	}
  	// The history is not kept in sorted order, so the middle element is only
  	// the median after sorting. Sort a copy to leave the snapshot intact.
  	sorted := slices.Clone(pb.RecentLatencies)
  	slices.Sort(sorted)
  	return sorted[len(sorted)/2]
  }

  // Continuous reports whether the snapshot belongs to a continuous probe,
  // i.e. one with a negative Interval.
  func (pb ProbeInfo) Continuous() bool {
  	return pb.Interval < 0
  }
  416. // ProbeInfo returns the state of all probes.
  417. func (p *Prober) ProbeInfo() map[string]ProbeInfo {
  418. out := map[string]ProbeInfo{}
  419. p.mu.Lock()
  420. probes := make([]*Probe, 0, len(p.probes))
  421. for _, probe := range p.probes {
  422. probes = append(probes, probe)
  423. }
  424. p.mu.Unlock()
  425. for _, probe := range probes {
  426. probe.mu.Lock()
  427. out[probe.name] = probe.probeInfoLocked()
  428. probe.mu.Unlock()
  429. }
  430. return out
  431. }
  432. // probeInfoLocked returns the state of the probe.
  433. func (probe *Probe) probeInfoLocked() ProbeInfo {
  434. inf := ProbeInfo{
  435. Name: probe.name,
  436. Class: probe.probeClass.Class,
  437. Interval: probe.interval,
  438. Labels: probe.metricLabels,
  439. Start: probe.start,
  440. End: probe.end,
  441. }
  442. inf.Status = ProbeStatusUnknown
  443. if probe.end.Before(probe.start) {
  444. inf.Status = ProbeStatusRunning
  445. } else if probe.succeeded {
  446. inf.Status = ProbeStatusSucceeded
  447. } else if probe.lastErr != nil {
  448. inf.Status = ProbeStatusFailed
  449. inf.Error = probe.lastErr.Error()
  450. }
  451. if probe.latency > 0 {
  452. inf.Latency = probe.latency
  453. }
  454. probe.latencyHist.Do(func(v any) {
  455. if latency, ok := v.(time.Duration); ok {
  456. inf.RecentLatencies = append(inf.RecentLatencies, latency)
  457. }
  458. })
  459. probe.successHist.Do(func(v any) {
  460. if r, ok := v.(bool); ok {
  461. inf.RecentResults = append(inf.RecentResults, r)
  462. }
  463. })
  464. return inf
  465. }
  466. // RunHandlerResponse is the JSON response format for the RunHandler.
  467. type RunHandlerResponse struct {
  468. ProbeInfo ProbeInfo
  469. PreviousSuccessRatio float64
  470. PreviousMedianLatency time.Duration
  471. }
  472. // RunHandler runs a probe by name and returns the result as an HTTP response.
  473. func (p *Prober) RunHandler(w http.ResponseWriter, r *http.Request) error {
  474. // Look up prober by name.
  475. name := r.FormValue("name")
  476. if name == "" {
  477. return tsweb.Error(http.StatusBadRequest, "missing name parameter", nil)
  478. }
  479. p.mu.Lock()
  480. probe, ok := p.probes[name]
  481. p.mu.Unlock()
  482. if !ok || probe.IsContinuous() {
  483. return tsweb.Error(http.StatusNotFound, fmt.Sprintf("unknown probe %q", name), nil)
  484. }
  485. probe.mu.Lock()
  486. prevInfo := probe.probeInfoLocked()
  487. probe.mu.Unlock()
  488. info, err := probe.run()
  489. respStatus := http.StatusOK
  490. if err != nil {
  491. respStatus = http.StatusFailedDependency
  492. }
  493. // Return serialized JSON response if the client requested JSON
  494. if r.Header.Get("Accept") == "application/json" {
  495. resp := &RunHandlerResponse{
  496. ProbeInfo: info,
  497. PreviousSuccessRatio: prevInfo.RecentSuccessRatio(),
  498. PreviousMedianLatency: prevInfo.RecentMedianLatency(),
  499. }
  500. w.Header().Set("Content-Type", "application/json")
  501. w.WriteHeader(respStatus)
  502. if err := json.NewEncoder(w).Encode(resp); err != nil {
  503. return tsweb.Error(http.StatusInternalServerError, "error encoding JSON response", err)
  504. }
  505. return nil
  506. }
  507. stats := fmt.Sprintf("Last %d probes (including this one): success rate %d%%, median latency %v\n",
  508. len(info.RecentResults),
  509. int(info.RecentSuccessRatio()*100), info.RecentMedianLatency())
  510. if err != nil {
  511. return tsweb.Error(respStatus, fmt.Sprintf("Probe failed: %s\n%s", err.Error(), stats), err)
  512. }
  513. w.WriteHeader(respStatus)
  514. fmt.Fprintf(w, "Probe succeeded in %v\n%s", info.Latency, stats)
  515. return nil
  516. }
  517. type RunHandlerAllResponse struct {
  518. Results map[string]RunHandlerResponse
  519. }
  520. func (p *Prober) RunAllHandler(w http.ResponseWriter, r *http.Request) error {
  521. excluded := r.URL.Query()["exclude"]
  522. probes := make(map[string]*Probe)
  523. p.mu.Lock()
  524. for _, probe := range p.probes {
  525. if !probe.IsContinuous() && !slices.Contains(excluded, probe.name) {
  526. probes[probe.name] = probe
  527. }
  528. }
  529. p.mu.Unlock()
  530. // Do not abort running probes just because one of them has failed.
  531. g := new(errgroup.Group)
  532. var resultsMu sync.Mutex
  533. results := make(map[string]RunHandlerResponse)
  534. for name, probe := range probes {
  535. g.Go(func() error {
  536. probe.mu.Lock()
  537. prevInfo := probe.probeInfoLocked()
  538. probe.mu.Unlock()
  539. info, err := probe.run()
  540. resultsMu.Lock()
  541. results[name] = RunHandlerResponse{
  542. ProbeInfo: info,
  543. PreviousSuccessRatio: prevInfo.RecentSuccessRatio(),
  544. PreviousMedianLatency: prevInfo.RecentMedianLatency(),
  545. }
  546. resultsMu.Unlock()
  547. return err
  548. })
  549. }
  550. respStatus := http.StatusOK
  551. if err := g.Wait(); err != nil {
  552. respStatus = http.StatusFailedDependency
  553. }
  554. // Return serialized JSON response if the client requested JSON
  555. resp := &RunHandlerAllResponse{
  556. Results: results,
  557. }
  558. var b bytes.Buffer
  559. if err := json.NewEncoder(&b).Encode(resp); err != nil {
  560. return tsweb.Error(http.StatusInternalServerError, "error encoding JSON response", err)
  561. }
  562. w.Header().Set("Content-Type", "application/json")
  563. w.WriteHeader(respStatus)
  564. w.Write(b.Bytes())
  565. return nil
  566. }
  567. // Describe implements prometheus.Collector.
  568. func (p *Probe) Describe(ch chan<- *prometheus.Desc) {
  569. ch <- p.mInterval
  570. ch <- p.mStartTime
  571. ch <- p.mEndTime
  572. ch <- p.mResult
  573. ch <- p.mLatency
  574. p.mAttempts.Describe(ch)
  575. p.mSeconds.Describe(ch)
  576. if p.probeClass.Metrics != nil {
  577. for _, m := range p.probeClass.Metrics(p.metricLabels) {
  578. ch <- m.Desc()
  579. }
  580. }
  581. }
  582. // Collect implements prometheus.Collector.
  583. func (p *Probe) Collect(ch chan<- prometheus.Metric) {
  584. p.mu.Lock()
  585. defer p.mu.Unlock()
  586. ch <- prometheus.MustNewConstMetric(p.mInterval, prometheus.GaugeValue, p.interval.Seconds())
  587. if !p.start.IsZero() {
  588. ch <- prometheus.MustNewConstMetric(p.mStartTime, prometheus.GaugeValue, float64(p.start.Unix()))
  589. }
  590. // For periodic probes that haven't ended, don't collect probe metrics yet.
  591. if p.end.IsZero() && !p.IsContinuous() {
  592. return
  593. }
  594. ch <- prometheus.MustNewConstMetric(p.mEndTime, prometheus.GaugeValue, float64(p.end.Unix()))
  595. if p.succeeded {
  596. ch <- prometheus.MustNewConstMetric(p.mResult, prometheus.GaugeValue, 1)
  597. } else {
  598. ch <- prometheus.MustNewConstMetric(p.mResult, prometheus.GaugeValue, 0)
  599. }
  600. if p.latency > 0 {
  601. ch <- prometheus.MustNewConstMetric(p.mLatency, prometheus.GaugeValue, float64(p.latency.Milliseconds()))
  602. }
  603. p.mAttempts.Collect(ch)
  604. p.mSeconds.Collect(ch)
  605. if p.probeClass.Metrics != nil {
  606. for _, m := range p.probeClass.Metrics(p.metricLabels) {
  607. ch <- m
  608. }
  609. }
  610. }
  // ticker is the subset of time.Ticker behavior used by the prober, expressed
  // as an interface so that it can be faked in tests.
  type ticker interface {
  	// Chan returns the channel on which ticks are delivered.
  	Chan() <-chan time.Time
  	// Stop turns the ticker off.
  	Stop()
  }

  // realTicker adapts a *time.Ticker to the ticker interface. Stop is provided
  // by the embedded ticker.
  type realTicker struct {
  	*time.Ticker
  }

  // Chan returns the tick channel of the underlying time.Ticker.
  func (t *realTicker) Chan() <-chan time.Time {
  	return t.C
  }

  // newRealTicker returns a ticker backed by time.NewTicker(d).
  func newRealTicker(d time.Duration) ticker {
  	return &realTicker{Ticker: time.NewTicker(d)}
  }
  // initialDelay returns a pseudorandom duration in [0, interval) that is
  // derived from seed alone, so the same seed and interval always give the
  // same delay.
  func initialDelay(seed string, interval time.Duration) time.Duration {
  	// Hash the seed string into the 64-bit seed of a private generator.
  	hasher := fnv.New64()
  	hasher.Write([]byte(seed))
  	rng := rand.New(rand.NewSource(int64(hasher.Sum64())))

  	// Float64 is in [0.0, 1.0), which scales interval into [0, interval).
  	fraction := rng.Float64()
  	return time.Duration(float64(interval) * fraction)
  }
  // Labels is a set of metric labels used by a prober.
  type Labels map[string]string

  // With returns a copy of lb in which label k is set to v. The receiver is
  // never modified, and With may be called on a nil Labels, in which case the
  // result contains only k.
  func (lb Labels) With(k, v string) Labels {
  	// Always allocate the result: cloning a nil map yields a nil map, and
  	// assigning into that would panic.
  	out := make(Labels, len(lb)+1)
  	maps.Copy(out, lb)
  	out[k] = v
  	return out
  }