|
|
@@ -425,6 +425,24 @@ func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg.
|
|
|
txRecords := make([]txRecord, 0, packetsPerSecond*int(packetTimeout.Seconds()))
|
|
|
var txRecordsMu sync.Mutex
|
|
|
|
|
|
+ // applyTimeouts walks over txRecords and expires any records that are older
|
|
|
+ // than packetTimeout, recording in metrics that they were removed.
|
|
|
+ applyTimeouts := func() {
|
|
|
+ txRecordsMu.Lock()
|
|
|
+ defer txRecordsMu.Unlock()
|
|
|
+
|
|
|
+ now := time.Now()
|
|
|
+ recs := txRecords[:0]
|
|
|
+ for _, r := range txRecords {
|
|
|
+ if now.Sub(r.at) > packetTimeout {
|
|
|
+ packetsDropped.Add(1)
|
|
|
+ } else {
|
|
|
+ recs = append(recs, r)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ txRecords = recs
|
|
|
+ }
|
|
|
+
|
|
|
// Send the packets.
|
|
|
sendErrC := make(chan error, 1)
|
|
|
// TODO: construct a disco CallMeMaybe in the same fashion as magicsock, e.g. magic bytes, src pub, seal payload.
|
|
|
@@ -445,10 +463,12 @@ func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg.
|
|
|
case <-ctx.Done():
|
|
|
return
|
|
|
case <-t.C:
|
|
|
+ applyTimeouts()
|
|
|
txRecordsMu.Lock()
|
|
|
if len(txRecords) == cap(txRecords) {
|
|
|
txRecords = slices.Delete(txRecords, 0, 1)
|
|
|
packetsDropped.Add(1)
|
|
|
+ log.Printf("unexpected: overflow in txRecords")
|
|
|
}
|
|
|
txRecords = append(txRecords, txRecord{time.Now(), seq})
|
|
|
txRecordsMu.Unlock()
|