|
|
@@ -15,6 +15,7 @@ import (
|
|
|
"net/netip"
|
|
|
"reflect"
|
|
|
"runtime"
|
|
|
+ "slices"
|
|
|
"sync"
|
|
|
"sync/atomic"
|
|
|
"time"
|
|
|
@@ -55,7 +56,8 @@ func init() {
|
|
|
// to the peer.
|
|
|
type endpoint struct {
|
|
|
// atomically accessed; declared first for alignment reasons
|
|
|
- lastRecv mono.Time
|
|
|
+ lastRecvWG mono.Time // last time there were incoming packets from this peer destined for wireguard-go (e.g. not disco)
|
|
|
+ lastRecvUDPAny mono.Time // last time there were incoming UDP packets from this peer of any kind
|
|
|
numStopAndResetAtomic int64
|
|
|
debugUpdates *ringbuffer.RingBuffer[EndpointChange]
|
|
|
|
|
|
@@ -73,11 +75,12 @@ type endpoint struct {
|
|
|
mu sync.Mutex // Lock ordering: Conn.mu, then endpoint.mu
|
|
|
|
|
|
heartBeatTimer *time.Timer // nil when idle
|
|
|
- lastSend mono.Time // last time there was outgoing packets sent to this peer (from wireguard-go)
|
|
|
+ lastSendExt mono.Time // last time there were outgoing packets sent to this peer from an external trigger (e.g. wireguard-go or disco pingCLI)
|
|
|
+ lastSendAny mono.Time // last time there were outgoing packets sent this peer from any trigger, internal or external to magicsock
|
|
|
lastFullPing mono.Time // last time we pinged all disco or wireguard only endpoints
|
|
|
derpAddr netip.AddrPort // fallback/bootstrap path, if non-zero (non-zero for well-behaved clients)
|
|
|
|
|
|
- bestAddr addrQuality // best non-DERP path; zero if none
|
|
|
+ bestAddr addrQuality // best non-DERP path; zero if none; mutate via setBestAddrLocked()
|
|
|
bestAddrAt mono.Time // time best address re-confirmed
|
|
|
trustBestAddrUntil mono.Time // time when bestAddr expires
|
|
|
sentPing map[stun.TxID]sentPing
|
|
|
@@ -88,11 +91,240 @@ type endpoint struct {
|
|
|
// implementation that's a WIP as of 2022-10-20.
|
|
|
// See #540 for background.
|
|
|
heartbeatDisabled bool
|
|
|
+ probeUDPLifetime *probeUDPLifetime // UDP path lifetime probing; nil if disabled
|
|
|
|
|
|
expired bool // whether the node has expired
|
|
|
isWireguardOnly bool // whether the endpoint is WireGuard only
|
|
|
}
|
|
|
|
|
|
+func (de *endpoint) setBestAddrLocked(v addrQuality) {
|
|
|
+ if v.AddrPort != de.bestAddr.AddrPort {
|
|
|
+ de.probeUDPLifetime.resetCycleEndpointLocked()
|
|
|
+ }
|
|
|
+ de.bestAddr = v
|
|
|
+}
|
|
|
+
|
|
|
+const (
|
|
|
+ // udpLifetimeProbeCliffSlack is how much slack to use relative to a
|
|
|
+ // ProbeUDPLifetimeConfig.Cliffs duration in order to account for RTT,
|
|
|
+ // scheduling jitter, buffers, etc. If the cliff is 10s, we attempt to probe
|
|
|
+ // after 10s - 2s (8s) amount of inactivity.
|
|
|
+ udpLifetimeProbeCliffSlack = time.Second * 2
|
|
|
+ // udpLifetimeProbeSchedulingTolerance is how much of a difference can be
|
|
|
+ // tolerated between a UDP lifetime probe scheduling target and when it
|
|
|
+ // actually fired. This must be some fraction of udpLifetimeProbeCliffSlack.
|
|
|
+ udpLifetimeProbeSchedulingTolerance = udpLifetimeProbeCliffSlack / 8
|
|
|
+)
|
|
|
+
|
|
|
+// probeUDPLifetime represents the configuration and state tied to probing UDP
|
|
|
+// path lifetime. A probe "cycle" involves pinging the UDP path at various
|
|
|
+// timeout cliffs, which are pre-defined durations of interest commonly used by
|
|
|
+// NATs/firewalls as default stateful session timeout values. Cliffs are probed
|
|
|
+// in ascending order. A "cycle" completes when all cliffs have received a pong,
|
|
|
+// or when a ping times out. Cycles may extend across endpoint session lifetimes
|
|
|
+// if they are disrupted by user traffic.
|
|
|
+type probeUDPLifetime struct {
|
|
|
+ // All fields are guarded by endpoint.mu. probeUDPLifetime methods are for
|
|
|
+ // convenience.
|
|
|
+
|
|
|
+ // config holds the probing configuration.
|
|
|
+ config ProbeUDPLifetimeConfig
|
|
|
+
|
|
|
+ // timer is nil when idle. A non-nil timer indicates we intend to probe a
|
|
|
+ // timeout cliff in the future.
|
|
|
+ timer *time.Timer
|
|
|
+
|
|
|
+ // bestAddr contains the endpoint.bestAddr.AddrPort at the time a cycle was
|
|
|
+ // scheduled to start. A probing cycle is 1:1 with the current
|
|
|
+ // endpoint.bestAddr.AddrPort in the interest of simplicity. When
|
|
|
+ // endpoint.bestAddr.AddrPort changes, any active probing cycle will reset.
|
|
|
+ bestAddr netip.AddrPort
|
|
|
+ // cycleStartedAt contains the time at which the first cliff
|
|
|
+ // (ProbeUDPLifetimeConfig.Cliffs[0]) was pinged for the current/last cycle.
|
|
|
+ cycleStartedAt time.Time
|
|
|
+ // cycleActive is true if a probing cycle is active, otherwise false.
|
|
|
+ cycleActive bool
|
|
|
+ // currentCliff represents the index into ProbeUDPLifetimeConfig.Cliffs for
|
|
|
+ // the cliff that we are waiting to ping, or waiting on a pong/timeout.
|
|
|
+ currentCliff int
|
|
|
+ // lastTxID is the ID for the last ping that was sent.
|
|
|
+ lastTxID stun.TxID
|
|
|
+}
|
|
|
+
|
|
|
+func (p *probeUDPLifetime) currentCliffDurationEndpointLocked() time.Duration {
|
|
|
+ if p == nil {
|
|
|
+ return 0
|
|
|
+ }
|
|
|
+ return p.config.Cliffs[p.currentCliff]
|
|
|
+}
|
|
|
+
|
|
|
+// cycleCompleteMaxCliffEndpointLocked records the max cliff (as an index of
|
|
|
+// ProbeUDPLifetimeConfig.Cliffs) a probing cycle reached, i.e. received a pong
|
|
|
+// for. A value < 0 indicates no cliff was reached. It is a no-op if the active
|
|
|
+// configuration does not equal defaultProbeUDPLifetimeConfig.
|
|
|
+func (p *probeUDPLifetime) cycleCompleteMaxCliffEndpointLocked(cliff int) {
|
|
|
+ if !p.config.Equals(defaultProbeUDPLifetimeConfig) {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ switch {
|
|
|
+ case cliff < 0:
|
|
|
+ metricUDPLifetimeCycleCompleteNoCliffReached.Add(1)
|
|
|
+ case cliff == 0:
|
|
|
+ metricUDPLifetimeCycleCompleteAt10sCliff.Add(1)
|
|
|
+ case cliff == 1:
|
|
|
+ metricUDPLifetimeCycleCompleteAt30sCliff.Add(1)
|
|
|
+ case cliff == 2:
|
|
|
+ metricUDPLifetimeCycleCompleteAt60sCliff.Add(1)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// resetCycleEndpointLocked resets the state contained in p to reflect an
|
|
|
+// inactive cycle.
|
|
|
+func (p *probeUDPLifetime) resetCycleEndpointLocked() {
|
|
|
+ if p == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if p.timer != nil {
|
|
|
+ p.timer.Stop()
|
|
|
+ p.timer = nil
|
|
|
+ }
|
|
|
+ p.cycleActive = false
|
|
|
+ p.currentCliff = 0
|
|
|
+ p.bestAddr = netip.AddrPort{}
|
|
|
+}
|
|
|
+
|
|
|
+// ProbeUDPLifetimeConfig represents the configuration for probing UDP path
|
|
|
+// lifetime.
|
|
|
+type ProbeUDPLifetimeConfig struct {
|
|
|
+ // The timeout cliffs to probe. Values are in ascending order. Ascending
|
|
|
+ // order is chosen over descending because we have limited opportunities to
|
|
|
+ // probe. With a descending order we are stuck waiting for a new UDP
|
|
|
+ // path/session if the first value times out. When that new path is
|
|
|
+ // established is anyone's guess.
|
|
|
+ Cliffs []time.Duration
|
|
|
+ // CycleCanStartEvery represents the min duration between cycles starting
|
|
|
+ // up.
|
|
|
+ CycleCanStartEvery time.Duration
|
|
|
+}
|
|
|
+
|
|
|
+var (
|
|
|
+ // defaultProbeUDPLifetimeConfig is the configuration that must be used
|
|
|
+ // for UDP path lifetime probing until it can be wholly disseminated (not
|
|
|
+ // just on/off) from upstream control components, and associated metrics
|
|
|
+ // (metricUDPLifetime*) have lifetime management.
|
|
|
+ //
|
|
|
+ // TODO(#10928): support dynamic config via tailcfg.PeerCapMap.
|
|
|
+ defaultProbeUDPLifetimeConfig = &ProbeUDPLifetimeConfig{
|
|
|
+ Cliffs: []time.Duration{
|
|
|
+ time.Second * 10,
|
|
|
+ time.Second * 30,
|
|
|
+ time.Second * 60,
|
|
|
+ },
|
|
|
+ CycleCanStartEvery: time.Hour * 24,
|
|
|
+ }
|
|
|
+)
|
|
|
+
|
|
|
+// Equals returns true if b equals p, otherwise false. If both sides are nil,
|
|
|
+// Equals returns true. If only one side is nil, Equals returns false.
|
|
|
+func (p *ProbeUDPLifetimeConfig) Equals(b *ProbeUDPLifetimeConfig) bool {
|
|
|
+ if p == b {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ if (p == nil && b != nil) || (b == nil && p != nil) {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ if !slices.Equal(p.Cliffs, b.Cliffs) {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ if p.CycleCanStartEvery != b.CycleCanStartEvery {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+// Valid returns true if p is valid, otherwise false. p must be non-nil.
|
|
|
+func (p *ProbeUDPLifetimeConfig) Valid() bool {
|
|
|
+ if len(p.Cliffs) < 1 {
|
|
|
+ // We need at least one cliff, otherwise there is nothing to probe.
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ if p.CycleCanStartEvery < 1 {
|
|
|
+ // Probing must be constrained by a positive CycleCanStartEvery.
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ for i, c := range p.Cliffs {
|
|
|
+ if c <= max(udpLifetimeProbeCliffSlack*2, heartbeatInterval) {
|
|
|
+ // A timeout cliff less than or equal to twice
|
|
|
+ // udpLifetimeProbeCliffSlack is invalid due to being effectively
|
|
|
+ // zero when the cliff slack is subtracted from the cliff value at
|
|
|
+ // scheduling time.
|
|
|
+ //
|
|
|
+ // A timeout cliff less or equal to the heartbeatInterval is also
|
|
|
+ // invalid, as we may attempt to schedule on the tail end of the
|
|
|
+ // last heartbeat tied to an active session.
|
|
|
+ //
|
|
|
+ // These values are constants, but max()'d in case they change in
|
|
|
+ // the future.
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ if i == 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if c <= p.Cliffs[i-1] {
|
|
|
+ // Cliffs must be in ascending order.
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+// setProbeUDPLifetimeOn enables or disables probing of UDP path lifetime based
|
|
|
+// on v. In the case of enablement defaultProbeUDPLifetimeConfig is used as the
|
|
|
+// desired configuration.
|
|
|
+func (de *endpoint) setProbeUDPLifetimeOn(v bool) {
|
|
|
+ de.mu.Lock()
|
|
|
+ if v {
|
|
|
+ de.setProbeUDPLifetimeConfigLocked(defaultProbeUDPLifetimeConfig)
|
|
|
+ } else {
|
|
|
+ de.setProbeUDPLifetimeConfigLocked(nil)
|
|
|
+ }
|
|
|
+ de.mu.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+// setProbeUDPLifetimeConfigLocked sets the desired configuration for probing
|
|
|
+// UDP path lifetime. Ownership of desired is passed to endpoint, it must not be
|
|
|
+// mutated once this call is made. A nil value disables the feature. If desired
|
|
|
+// is non-nil but desired.Valid() returns false this is a no-op.
|
|
|
+func (de *endpoint) setProbeUDPLifetimeConfigLocked(desired *ProbeUDPLifetimeConfig) {
|
|
|
+ if de.isWireguardOnly {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if desired == nil {
|
|
|
+ if de.probeUDPLifetime == nil {
|
|
|
+ // noop, not currently configured or desired
|
|
|
+ return
|
|
|
+ }
|
|
|
+ de.probeUDPLifetime.resetCycleEndpointLocked()
|
|
|
+ de.probeUDPLifetime = nil
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if !desired.Valid() {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if de.probeUDPLifetime != nil {
|
|
|
+ if de.probeUDPLifetime.config.Equals(desired) {
|
|
|
+ // noop, current config equals desired
|
|
|
+ return
|
|
|
+ }
|
|
|
+ de.probeUDPLifetime.resetCycleEndpointLocked()
|
|
|
+ } else {
|
|
|
+ de.probeUDPLifetime = &probeUDPLifetime{}
|
|
|
+ }
|
|
|
+ p := de.probeUDPLifetime
|
|
|
+ p.config = *desired
|
|
|
+ p.resetCycleEndpointLocked()
|
|
|
+}
|
|
|
+
|
|
|
// endpointDisco is the current disco key and short string for an endpoint. This
|
|
|
// structure is immutable.
|
|
|
type endpointDisco struct {
|
|
|
@@ -219,7 +451,7 @@ func (de *endpoint) deleteEndpointLocked(why string, ep netip.AddrPort) {
|
|
|
What: "deleteEndpointLocked-bestAddr-" + why,
|
|
|
From: de.bestAddr,
|
|
|
})
|
|
|
- de.bestAddr = addrQuality{}
|
|
|
+ de.setBestAddrLocked(addrQuality{})
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -236,9 +468,7 @@ func (de *endpoint) initFakeUDPAddr() {
|
|
|
|
|
|
// noteRecvActivity records receive activity on de, and invokes
|
|
|
// Conn.noteRecvActivity no more than once every 10s.
|
|
|
-func (de *endpoint) noteRecvActivity(ipp netip.AddrPort) {
|
|
|
- now := mono.Now()
|
|
|
-
|
|
|
+func (de *endpoint) noteRecvActivity(ipp netip.AddrPort, now mono.Time) {
|
|
|
if de.isWireguardOnly {
|
|
|
de.mu.Lock()
|
|
|
de.bestAddr.AddrPort = ipp
|
|
|
@@ -257,9 +487,9 @@ func (de *endpoint) noteRecvActivity(ipp netip.AddrPort) {
|
|
|
de.mu.Unlock()
|
|
|
}
|
|
|
|
|
|
- elapsed := now.Sub(de.lastRecv.LoadAtomic())
|
|
|
+ elapsed := now.Sub(de.lastRecvWG.LoadAtomic())
|
|
|
if elapsed > 10*time.Second {
|
|
|
- de.lastRecv.StoreAtomic(now)
|
|
|
+ de.lastRecvWG.StoreAtomic(now)
|
|
|
|
|
|
if de.c.noteRecvActivity == nil {
|
|
|
return
|
|
|
@@ -410,12 +640,140 @@ func (de *endpoint) addrForPingSizeLocked(now mono.Time, size int) (udpAddr, der
|
|
|
return netip.AddrPort{}, de.derpAddr
|
|
|
}
|
|
|
|
|
|
+// maybeProbeUDPLifetimeLocked returns an afterInactivityFor duration and true
|
|
|
+// if de is a candidate for UDP path lifetime probing in the future, otherwise
|
|
|
+// false.
|
|
|
+func (de *endpoint) maybeProbeUDPLifetimeLocked() (afterInactivityFor time.Duration, maybe bool) {
|
|
|
+ p := de.probeUDPLifetime
|
|
|
+ if p == nil {
|
|
|
+ return afterInactivityFor, false
|
|
|
+ }
|
|
|
+ if !de.bestAddr.IsValid() {
|
|
|
+ return afterInactivityFor, false
|
|
|
+ }
|
|
|
+ epDisco := de.disco.Load()
|
|
|
+ if epDisco == nil {
|
|
|
+ // peer does not support disco
|
|
|
+ return afterInactivityFor, false
|
|
|
+ }
|
|
|
+ // We compare disco keys, which may have a shorter lifetime than node keys
|
|
|
+ // since disco keys reset on startup. This has the desired side effect of
|
|
|
+ // shuffling probing probability where the local node ends up with a large
|
|
|
+ // key value lexicographically relative to the other nodes it tends to
|
|
|
+ // communicate with. If de's disco key changes, the cycle will reset.
|
|
|
+ if de.c.discoPublic.Compare(epDisco.key) >= 0 {
|
|
|
+ // lower disco pub key node probes higher
|
|
|
+ return afterInactivityFor, false
|
|
|
+ }
|
|
|
+ if !p.cycleActive && time.Since(p.cycleStartedAt) < p.config.CycleCanStartEvery {
|
|
|
+ // This is conservative as it doesn't account for afterInactivityFor use
|
|
|
+ // by the caller, potentially delaying the start of the next cycle. We
|
|
|
+ // assume the cycle could start immediately following
|
|
|
+ // maybeProbeUDPLifetimeLocked(), regardless of the value of
|
|
|
+ // afterInactivityFor relative to latest packets in/out time.
|
|
|
+ return afterInactivityFor, false
|
|
|
+ }
|
|
|
+ afterInactivityFor = p.currentCliffDurationEndpointLocked() - udpLifetimeProbeCliffSlack
|
|
|
+ if afterInactivityFor < 0 {
|
|
|
+ // shouldn't happen
|
|
|
+ return afterInactivityFor, false
|
|
|
+ }
|
|
|
+ return afterInactivityFor, true
|
|
|
+}
|
|
|
+
|
|
|
+// heartbeatForLifetimeVia represents the scheduling source of
|
|
|
+// endpoint.heartbeatForLifetime().
|
|
|
+type heartbeatForLifetimeVia string
|
|
|
+
|
|
|
+const (
|
|
|
+ heartbeatForLifetimeViaSessionInactive heartbeatForLifetimeVia = "session-inactive"
|
|
|
+ heartbeatForLifetimeViaPongRx heartbeatForLifetimeVia = "pong-rx"
|
|
|
+ heartbeatForLifetimeViaSelf heartbeatForLifetimeVia = "self"
|
|
|
+)
|
|
|
+
|
|
|
+// scheduleHeartbeatForLifetimeLocked schedules de.heartbeatForLifetime to fire
|
|
|
+// in the future (after). The caller must describe themselves in the via arg.
|
|
|
+func (de *endpoint) scheduleHeartbeatForLifetimeLocked(after time.Duration, via heartbeatForLifetimeVia) {
|
|
|
+ p := de.probeUDPLifetime
|
|
|
+ if p == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ de.c.dlogf("[v1] magicsock: disco: scheduling UDP lifetime probe for cliff=%v via=%v to %v (%v)",
|
|
|
+ p.currentCliffDurationEndpointLocked(), via, de.publicKey.ShortString(), de.discoShort())
|
|
|
+ p.bestAddr = de.bestAddr.AddrPort
|
|
|
+ p.timer = time.AfterFunc(after, de.heartbeatForLifetime)
|
|
|
+ if via == heartbeatForLifetimeViaSelf {
|
|
|
+ metricUDPLifetimeCliffsRescheduled.Add(1)
|
|
|
+ } else {
|
|
|
+ metricUDPLifetimeCliffsScheduled.Add(1)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// heartbeatForLifetime sends a disco ping recorded locally with a purpose of
|
|
|
+// pingHeartbeatForUDPLifetime to de if de.bestAddr has remained stable, and it
|
|
|
+// has been inactive for a duration that is within the error bounds for current
|
|
|
+// lifetime probing cliff. Alternatively it may reschedule itself into the
|
|
|
+// future, which is one of three scheduling sources. The other scheduling
|
|
|
+// sources are de.heartbeat() and de.probeUDPLifetimeCliffDoneLocked().
|
|
|
+func (de *endpoint) heartbeatForLifetime() {
|
|
|
+ de.mu.Lock()
|
|
|
+ defer de.mu.Unlock()
|
|
|
+ p := de.probeUDPLifetime
|
|
|
+ if p == nil || p.timer == nil {
|
|
|
+ // We raced with a code path trying to p.timer.Stop() us. Give up early
|
|
|
+ // in the interest of simplicity. If p.timer.Stop() happened in
|
|
|
+ // de.heartbeat() presumably because of recent packets in/out we *could*
|
|
|
+ // still probe here, and it would be meaningful, but the time logic
|
|
|
+ // below would reschedule as-is.
|
|
|
+ return
|
|
|
+ }
|
|
|
+ p.timer = nil
|
|
|
+ if !p.bestAddr.IsValid() || de.bestAddr.AddrPort != p.bestAddr {
|
|
|
+ // best path changed
|
|
|
+ p.resetCycleEndpointLocked()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ afterInactivityFor, ok := de.maybeProbeUDPLifetimeLocked()
|
|
|
+ if !ok {
|
|
|
+ p.resetCycleEndpointLocked()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ inactiveFor := mono.Now().Sub(max(de.lastRecvUDPAny.LoadAtomic(), de.lastSendAny))
|
|
|
+ delta := afterInactivityFor - inactiveFor
|
|
|
+ if delta.Abs() > udpLifetimeProbeSchedulingTolerance {
|
|
|
+ if delta < 0 {
|
|
|
+ // We missed our opportunity. We can resume this cliff at the tail
|
|
|
+ // end of another session.
|
|
|
+ metricUDPLifetimeCliffsMissed.Add(1)
|
|
|
+ return
|
|
|
+ } else {
|
|
|
+ // We need to wait longer before sending a ping. This can happen for
|
|
|
+ // a number of reasons, which are described in more detail in
|
|
|
+ // de.heartbeat().
|
|
|
+ de.scheduleHeartbeatForLifetimeLocked(delta, heartbeatForLifetimeViaSelf)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if p.currentCliff == 0 {
|
|
|
+ p.cycleStartedAt = time.Now()
|
|
|
+ p.cycleActive = true
|
|
|
+ }
|
|
|
+ de.c.dlogf("[v1] magicsock: disco: sending disco ping for UDP lifetime probe cliff=%v to %v (%v)",
|
|
|
+ p.currentCliffDurationEndpointLocked(), de.publicKey.ShortString(), de.discoShort())
|
|
|
+ de.startDiscoPingLocked(de.bestAddr.AddrPort, mono.Now(), pingHeartbeatForUDPLifetime, 0, nil)
|
|
|
+}
|
|
|
+
|
|
|
// heartbeat is called every heartbeatInterval to keep the best UDP path alive,
|
|
|
-// or kick off discovery of other paths.
|
|
|
+// kick off discovery of other paths, or schedule the probing of UDP path
|
|
|
+// lifetime on the tail end of an active session.
|
|
|
func (de *endpoint) heartbeat() {
|
|
|
de.mu.Lock()
|
|
|
defer de.mu.Unlock()
|
|
|
|
|
|
+ if de.probeUDPLifetime != nil && de.probeUDPLifetime.timer != nil {
|
|
|
+ de.probeUDPLifetime.timer.Stop()
|
|
|
+ de.probeUDPLifetime.timer = nil
|
|
|
+ }
|
|
|
de.heartBeatTimer = nil
|
|
|
|
|
|
if de.heartbeatDisabled {
|
|
|
@@ -423,18 +781,42 @@ func (de *endpoint) heartbeat() {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- if de.lastSend.IsZero() {
|
|
|
+ if de.lastSendExt.IsZero() {
|
|
|
// Shouldn't happen.
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- if mono.Since(de.lastSend) > sessionActiveTimeout {
|
|
|
+ now := mono.Now()
|
|
|
+ if now.Sub(de.lastSendExt) > sessionActiveTimeout {
|
|
|
// Session's idle. Stop heartbeating.
|
|
|
de.c.dlogf("[v1] magicsock: disco: ending heartbeats for idle session to %v (%v)", de.publicKey.ShortString(), de.discoShort())
|
|
|
+ if afterInactivityFor, ok := de.maybeProbeUDPLifetimeLocked(); ok {
|
|
|
+ // This is the best place to best effort schedule a probe of UDP
|
|
|
+ // path lifetime in the future as it loosely translates to "UDP path
|
|
|
+ // is inactive".
|
|
|
+ //
|
|
|
+ // Note: wireguard-go schedules a WireGuard keepalive packet (by
|
|
|
+ // default, not tied to persistent keepalive feature) 10 seconds in
|
|
|
+ // the future after receiving an authenticated data packet. It's
|
|
|
+ // typically only sent by one side based on how the WireGuard state
|
|
|
+ // machine controls the timer. So, if we are on the receiving end of
|
|
|
+ // that keepalive, de.lastSendExt won't move, assuming there is no
|
|
|
+ // other user-generated traffic. This is one reason why we perform
|
|
|
+ // a more granular check of the last packets in/out time, below, as
|
|
|
+ // a WireGuard keepalive may have fallen somewhere within the
|
|
|
+ // sessionActiveTimeout window. heartbeatForLifetime will also
|
|
|
+ // perform a similar check, and reschedule as necessary.
|
|
|
+ inactiveFor := now.Sub(max(de.lastSendAny, de.lastRecvUDPAny.LoadAtomic()))
|
|
|
+ after := afterInactivityFor - inactiveFor
|
|
|
+ if after < 0 {
|
|
|
+ // shouldn't happen
|
|
|
+ return
|
|
|
+ }
|
|
|
+ de.scheduleHeartbeatForLifetimeLocked(after, heartbeatForLifetimeViaSessionInactive)
|
|
|
+ }
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- now := mono.Now()
|
|
|
udpAddr, _, _ := de.addrForSendLocked(now)
|
|
|
if udpAddr.IsValid() {
|
|
|
// We have a preferred path. Ping that every 2 seconds.
|
|
|
@@ -478,8 +860,8 @@ func (de *endpoint) wantFullPingLocked(now mono.Time) bool {
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
-func (de *endpoint) noteActiveLocked() {
|
|
|
- de.lastSend = mono.Now()
|
|
|
+func (de *endpoint) noteTxActivityExtTriggerLocked(now mono.Time) {
|
|
|
+ de.lastSendExt = now
|
|
|
if de.heartBeatTimer == nil && !de.heartbeatDisabled {
|
|
|
de.heartBeatTimer = time.AfterFunc(heartbeatInterval, de.heartbeat)
|
|
|
}
|
|
|
@@ -536,7 +918,6 @@ func (de *endpoint) discoPing(res *ipnstate.PingResult, size int, cb func(*ipnst
|
|
|
de.startDiscoPingLocked(ep, now, pingCLI, size, resCB)
|
|
|
}
|
|
|
}
|
|
|
- de.noteActiveLocked()
|
|
|
}
|
|
|
|
|
|
var (
|
|
|
@@ -562,7 +943,8 @@ func (de *endpoint) send(buffs [][]byte) error {
|
|
|
} else if !udpAddr.IsValid() || now.After(de.trustBestAddrUntil) {
|
|
|
de.sendDiscoPingsLocked(now, true)
|
|
|
}
|
|
|
- de.noteActiveLocked()
|
|
|
+ de.noteTxActivityExtTriggerLocked(now)
|
|
|
+ de.lastSendAny = now
|
|
|
de.mu.Unlock()
|
|
|
|
|
|
if !udpAddr.IsValid() && !derpAddr.IsValid() {
|
|
|
@@ -606,6 +988,42 @@ func (de *endpoint) send(buffs [][]byte) error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+// probeUDPLifetimeCliffDoneLocked is called when a disco
|
|
|
+// pingHeartbeatForUDPLifetime is being cleaned up. result contains the reason
|
|
|
+// for the cleanup, txid contains the ping's txid.
|
|
|
+// probeUDPLifetimeCliffDoneLocked may schedule another
|
|
|
+// pingHeartbeatForUDPLifetime in the future if there is another cliff remaining
|
|
|
+// for the current probing cycle.
|
|
|
+func (de *endpoint) probeUDPLifetimeCliffDoneLocked(result discoPingResult, txid stun.TxID) {
|
|
|
+ p := de.probeUDPLifetime
|
|
|
+ if p == nil || !p.cycleActive || de.probeUDPLifetime.timer != nil || txid != p.lastTxID {
|
|
|
+ // Probing may have been disabled while heartbeats were in flight. This
|
|
|
+ // can also be a duplicate or late arriving result.
|
|
|
+ return
|
|
|
+ }
|
|
|
+ metricUDPLifetimeCliffsCompleted.Add(1)
|
|
|
+ if result != discoPongReceived || p.currentCliff >= len(p.config.Cliffs)-1 {
|
|
|
+ maxCliffIndex := p.currentCliff
|
|
|
+ if result != discoPongReceived {
|
|
|
+ maxCliffIndex = p.currentCliff - 1
|
|
|
+ }
|
|
|
+ var maxCliffDuration time.Duration
|
|
|
+ if maxCliffIndex >= 0 {
|
|
|
+ maxCliffDuration = p.config.Cliffs[maxCliffIndex]
|
|
|
+ }
|
|
|
+ p.cycleCompleteMaxCliffEndpointLocked(maxCliffIndex)
|
|
|
+ de.c.dlogf("[v1] magicsock: disco: UDP lifetime probe cycle completed max cliff=%v for %v (%v)",
|
|
|
+ maxCliffDuration, de.publicKey.ShortString(), de.discoShort())
|
|
|
+ metricUDPLifetimeCyclesCompleted.Add(1)
|
|
|
+ p.resetCycleEndpointLocked()
|
|
|
+ } else {
|
|
|
+ p.currentCliff++
|
|
|
+ if after, ok := de.maybeProbeUDPLifetimeLocked(); ok {
|
|
|
+ de.scheduleHeartbeatForLifetimeLocked(after, heartbeatForLifetimeViaPongRx)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (de *endpoint) discoPingTimeout(txid stun.TxID) {
|
|
|
de.mu.Lock()
|
|
|
defer de.mu.Unlock()
|
|
|
@@ -616,23 +1034,36 @@ func (de *endpoint) discoPingTimeout(txid stun.TxID) {
|
|
|
if debugDisco() || !de.bestAddr.IsValid() || mono.Now().After(de.trustBestAddrUntil) {
|
|
|
de.c.dlogf("[v1] magicsock: disco: timeout waiting for pong %x from %v (%v, %v)", txid[:6], sp.to, de.publicKey.ShortString(), de.discoShort())
|
|
|
}
|
|
|
- de.removeSentDiscoPingLocked(txid, sp)
|
|
|
+ de.removeSentDiscoPingLocked(txid, sp, discoPingTimedOut)
|
|
|
}
|
|
|
|
|
|
-// forgetDiscoPing is called by a timer when a ping either fails to send or
|
|
|
-// has taken too long to get a pong reply.
|
|
|
+// forgetDiscoPing is called when a ping fails to send.
|
|
|
func (de *endpoint) forgetDiscoPing(txid stun.TxID) {
|
|
|
de.mu.Lock()
|
|
|
defer de.mu.Unlock()
|
|
|
if sp, ok := de.sentPing[txid]; ok {
|
|
|
- de.removeSentDiscoPingLocked(txid, sp)
|
|
|
+ de.removeSentDiscoPingLocked(txid, sp, discoPingFailed)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (de *endpoint) removeSentDiscoPingLocked(txid stun.TxID, sp sentPing) {
|
|
|
+// discoPingResult represents the result of an attempted disco ping send
|
|
|
+// operation.
|
|
|
+type discoPingResult int
|
|
|
+
|
|
|
+const (
|
|
|
+ discoPingResultUnknown discoPingResult = iota
|
|
|
+ discoPingFailed
|
|
|
+ discoPingTimedOut
|
|
|
+ discoPongReceived
|
|
|
+)
|
|
|
+
|
|
|
+func (de *endpoint) removeSentDiscoPingLocked(txid stun.TxID, sp sentPing, result discoPingResult) {
|
|
|
// Stop the timer for the case where sendPing failed to write to UDP.
|
|
|
// In the case of a timer already having fired, this is a no-op:
|
|
|
sp.timer.Stop()
|
|
|
+ if sp.purpose == pingHeartbeatForUDPLifetime {
|
|
|
+ de.probeUDPLifetimeCliffDoneLocked(result, txid)
|
|
|
+ }
|
|
|
delete(de.sentPing, txid)
|
|
|
}
|
|
|
|
|
|
@@ -685,6 +1116,11 @@ const (
|
|
|
// pingCLI means that the user is running "tailscale ping"
|
|
|
// from the CLI. These types of pings can go over DERP.
|
|
|
pingCLI
|
|
|
+
|
|
|
+ // pingHeartbeatForUDPLifetime means that the purpose of a ping was to
|
|
|
+ // discover whether the UDP path was still active through any and all
|
|
|
+ // stateful middleboxes involved.
|
|
|
+ pingHeartbeatForUDPLifetime
|
|
|
)
|
|
|
|
|
|
// startDiscoPingLocked sends a disco ping to ep in a separate goroutine. resCB,
|
|
|
@@ -731,6 +1167,10 @@ func (de *endpoint) startDiscoPingLocked(ep netip.AddrPort, now mono.Time, purpo
|
|
|
if purpose == pingHeartbeat {
|
|
|
logLevel = discoVerboseLog
|
|
|
}
|
|
|
+ if purpose == pingCLI {
|
|
|
+ de.noteTxActivityExtTriggerLocked(now)
|
|
|
+ }
|
|
|
+ de.lastSendAny = now
|
|
|
for _, s := range sizes {
|
|
|
txid := stun.NewTxID()
|
|
|
de.sentPing[txid] = sentPing{
|
|
|
@@ -741,6 +1181,9 @@ func (de *endpoint) startDiscoPingLocked(ep netip.AddrPort, now mono.Time, purpo
|
|
|
resCB: resCB,
|
|
|
size: s,
|
|
|
}
|
|
|
+ if purpose == pingHeartbeatForUDPLifetime && de.probeUDPLifetime != nil {
|
|
|
+ de.probeUDPLifetime.lastTxID = txid
|
|
|
+ }
|
|
|
go de.sendDiscoPing(ep, epDisco.key, txid, s, logLevel)
|
|
|
}
|
|
|
|
|
|
@@ -864,7 +1307,7 @@ func (de *endpoint) setLastPing(ipp netip.AddrPort, now mono.Time) {
|
|
|
|
|
|
// updateFromNode updates the endpoint based on a tailcfg.Node from a NetMap
|
|
|
// update.
|
|
|
-func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool) {
|
|
|
+func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool, probeUDPLifetimeEnabled bool) {
|
|
|
if !n.Valid() {
|
|
|
panic("nil node when updating endpoint")
|
|
|
}
|
|
|
@@ -872,6 +1315,11 @@ func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool) {
|
|
|
defer de.mu.Unlock()
|
|
|
|
|
|
de.heartbeatDisabled = heartbeatDisabled
|
|
|
+ if probeUDPLifetimeEnabled {
|
|
|
+ de.setProbeUDPLifetimeConfigLocked(defaultProbeUDPLifetimeConfig)
|
|
|
+ } else {
|
|
|
+ de.setProbeUDPLifetimeConfigLocked(nil)
|
|
|
+ }
|
|
|
de.expired = n.Expired()
|
|
|
|
|
|
epDisco := de.disco.Load()
|
|
|
@@ -1009,7 +1457,7 @@ func (de *endpoint) addCandidateEndpoint(ep netip.AddrPort, forRxPingTxID stun.T
|
|
|
//
|
|
|
// de.mu must be held.
|
|
|
func (de *endpoint) clearBestAddrLocked() {
|
|
|
- de.bestAddr = addrQuality{}
|
|
|
+ de.setBestAddrLocked(addrQuality{})
|
|
|
de.bestAddrAt = 0
|
|
|
de.trustBestAddrUntil = 0
|
|
|
}
|
|
|
@@ -1093,7 +1541,7 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src netip
|
|
|
return false
|
|
|
}
|
|
|
knownTxID = true // for naked returns below
|
|
|
- de.removeSentDiscoPingLocked(m.TxID, sp)
|
|
|
+ de.removeSentDiscoPingLocked(m.TxID, sp, discoPongReceived)
|
|
|
|
|
|
pktLen := int(pingSizeToPktLen(sp.size, sp.to.Addr().Is6()))
|
|
|
if sp.size != 0 {
|
|
|
@@ -1124,7 +1572,7 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src netip
|
|
|
})
|
|
|
}
|
|
|
|
|
|
- if sp.purpose != pingHeartbeat {
|
|
|
+ if sp.purpose != pingHeartbeat && sp.purpose != pingHeartbeatForUDPLifetime {
|
|
|
de.c.dlogf("[v1] magicsock: disco: %v<-%v (%v, %v) got pong tx=%x latency=%v pktlen=%v pong.src=%v%v", de.c.discoShort, de.discoShort(), de.publicKey.ShortString(), src, m.TxID[:6], latency.Round(time.Millisecond), pktLen, m.Src, logger.ArgWriter(func(bw *bufio.Writer) {
|
|
|
if sp.to != src {
|
|
|
fmt.Fprintf(bw, " ping.to=%v", sp.to)
|
|
|
@@ -1152,7 +1600,7 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src netip
|
|
|
From: de.bestAddr,
|
|
|
To: thisPong,
|
|
|
})
|
|
|
- de.bestAddr = thisPong
|
|
|
+ de.setBestAddrLocked(thisPong)
|
|
|
}
|
|
|
if de.bestAddr.AddrPort == thisPong.AddrPort {
|
|
|
de.debugUpdates.Add(EndpointChange{
|
|
|
@@ -1327,13 +1775,13 @@ func (de *endpoint) populatePeerStatus(ps *ipnstate.PeerStatus) {
|
|
|
|
|
|
ps.Relay = de.c.derpRegionCodeOfIDLocked(int(de.derpAddr.Port()))
|
|
|
|
|
|
- if de.lastSend.IsZero() {
|
|
|
+ if de.lastSendExt.IsZero() {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
now := mono.Now()
|
|
|
- ps.LastWrite = de.lastSend.WallTime()
|
|
|
- ps.Active = now.Sub(de.lastSend) < sessionActiveTimeout
|
|
|
+ ps.LastWrite = de.lastSendExt.WallTime()
|
|
|
+ ps.Active = now.Sub(de.lastSendExt) < sessionActiveTimeout
|
|
|
|
|
|
if udpAddr, derpAddr, _ := de.addrForSendLocked(now); udpAddr.IsValid() && !derpAddr.IsValid() {
|
|
|
ps.CurAddr = udpAddr.String()
|
|
|
@@ -1372,7 +1820,7 @@ func (de *endpoint) stopAndReset() {
|
|
|
// DERP-only endpoint. It does not stop the endpoint's heartbeat
|
|
|
// timer, if one is running.
|
|
|
func (de *endpoint) resetLocked() {
|
|
|
- de.lastSend = 0
|
|
|
+ de.lastSendExt = 0
|
|
|
de.lastFullPing = 0
|
|
|
de.clearBestAddrLocked()
|
|
|
for _, es := range de.endpointState {
|
|
|
@@ -1380,9 +1828,10 @@ func (de *endpoint) resetLocked() {
|
|
|
}
|
|
|
if !de.isWireguardOnly {
|
|
|
for txid, sp := range de.sentPing {
|
|
|
- de.removeSentDiscoPingLocked(txid, sp)
|
|
|
+ de.removeSentDiscoPingLocked(txid, sp, discoPingResultUnknown)
|
|
|
}
|
|
|
}
|
|
|
+ de.probeUDPLifetime.resetCycleEndpointLocked()
|
|
|
}
|
|
|
|
|
|
func (de *endpoint) numStopAndReset() int64 {
|