2
0

derp.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package prober
  4. import (
  5. "bytes"
  6. "cmp"
  7. "context"
  8. crand "crypto/rand"
  9. "crypto/tls"
  10. "encoding/binary"
  11. "encoding/json"
  12. "errors"
  13. "expvar"
  14. "fmt"
  15. "io"
  16. "log"
  17. "maps"
  18. "net"
  19. "net/http"
  20. "net/netip"
  21. "slices"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "time"
  26. "github.com/prometheus/client_golang/prometheus"
  27. wgconn "github.com/tailscale/wireguard-go/conn"
  28. "github.com/tailscale/wireguard-go/device"
  29. "github.com/tailscale/wireguard-go/tun"
  30. "go4.org/netipx"
  31. "tailscale.com/client/local"
  32. "tailscale.com/derp"
  33. "tailscale.com/derp/derphttp"
  34. "tailscale.com/net/netmon"
  35. "tailscale.com/net/stun"
  36. "tailscale.com/net/tstun"
  37. "tailscale.com/syncs"
  38. "tailscale.com/tailcfg"
  39. "tailscale.com/types/key"
  40. "tailscale.com/types/logger"
  41. )
// derpProber dynamically manages several probes for each DERP server
// based on the current DERPMap.
type derpProber struct {
	p          *Prober // prober on which the per-server probes are run
	derpMapURL string  // or "local"
	meshKey    key.DERPMesh

	// Per-probe-type intervals. probeMapFn only creates probes of a given
	// type when its interval is greater than zero.
	udpInterval  time.Duration
	meshInterval time.Duration
	tlsInterval  time.Duration

	// Optional bandwidth probing.
	bwInterval      time.Duration
	bwProbeSize     int64
	bwTUNIPv4Prefix *netip.Prefix // or nil to not use TUN

	// Optional queuing delay probing.
	qdPacketsPerSecond int // in packets per second
	qdPacketTimeout    time.Duration

	// Optionally restrict probes to a single regionCodeOrID.
	regionCodeOrID string

	// Probe class for fetching & updating the DERP map.
	ProbeMap ProbeClass

	// Probe classes for probing individual derpers.
	tlsProbeFn  func(string, *tls.Config) ProbeClass
	udpProbeFn  func(string, int) ProbeClass
	meshProbeFn func(string, string) ProbeClass
	bwProbeFn   func(string, string, int64) ProbeClass
	qdProbeFn   func(string, string, int, time.Duration, key.DERPMesh) ProbeClass

	// The embedded mutex is held by updateMap, probeMapFn and getNodePair
	// while they access the fields below.
	sync.Mutex
	lastDERPMap   *tailcfg.DERPMap             // most recently fetched DERP map
	lastDERPMapAt time.Time                    // when lastDERPMap was stored
	nodes         map[string]*tailcfg.DERPNode // nodes of non-skipped regions, keyed by DERPNode.Name
	probes        map[string]*Probe            // currently running probes, keyed by probe name
}
// DERPOpt is a functional option that configures a derpProber. DERP applies
// the options it is given in order.
type DERPOpt func(*derpProber)
  75. // WithBandwidthProbing enables bandwidth probing. When enabled, a payload of
  76. // `size` bytes will be regularly transferred through each DERP server, and each
  77. // pair of DERP servers in every region. If tunAddress is specified, probes will
  78. // use a TCP connection over a TUN device at this address in order to exercise
  79. // TCP-in-TCP in similar fashion to TCP over Tailscale via DERP.
  80. func WithBandwidthProbing(interval time.Duration, size int64, tunAddress string) DERPOpt {
  81. return func(d *derpProber) {
  82. d.bwInterval = interval
  83. d.bwProbeSize = size
  84. if tunAddress != "" {
  85. prefix, err := netip.ParsePrefix(fmt.Sprintf("%s/30", tunAddress))
  86. if err != nil {
  87. log.Fatalf("failed to parse IP prefix from bw-tun-ipv4-addr: %v", err)
  88. }
  89. d.bwTUNIPv4Prefix = &prefix
  90. }
  91. }
  92. }
  93. // WithQueuingDelayProbing enables/disables queuing delay probing. qdSendRate
  94. // is the number of packets sent per second. qdTimeout is the amount of time
  95. // after which a sent packet is considered to have timed out.
  96. func WithQueuingDelayProbing(qdPacketsPerSecond int, qdPacketTimeout time.Duration) DERPOpt {
  97. return func(d *derpProber) {
  98. d.qdPacketsPerSecond = qdPacketsPerSecond
  99. d.qdPacketTimeout = qdPacketTimeout
  100. }
  101. }
  102. // WithMeshProbing enables mesh probing. When enabled, a small message will be
  103. // transferred through each DERP server and each pair of DERP servers.
  104. func WithMeshProbing(interval time.Duration) DERPOpt {
  105. return func(d *derpProber) {
  106. d.meshInterval = interval
  107. }
  108. }
  109. // WithSTUNProbing enables STUN/UDP probing, with a STUN request being sent
  110. // to each DERP server every `interval`.
  111. func WithSTUNProbing(interval time.Duration) DERPOpt {
  112. return func(d *derpProber) {
  113. d.udpInterval = interval
  114. }
  115. }
  116. // WithTLSProbing enables TLS probing that will check TLS certificate on port
  117. // 443 of each DERP server every `interval`.
  118. func WithTLSProbing(interval time.Duration) DERPOpt {
  119. return func(d *derpProber) {
  120. d.tlsInterval = interval
  121. }
  122. }
  123. // WithRegionCodeOrID restricts probing to the specified region identified by its code
  124. // (e.g. "lax") or its id (e.g. "17"). This is case sensitive.
  125. func WithRegionCodeOrID(regionCode string) DERPOpt {
  126. return func(d *derpProber) {
  127. d.regionCodeOrID = regionCode
  128. }
  129. }
  130. func WithMeshKey(meshKey key.DERPMesh) DERPOpt {
  131. return func(d *derpProber) {
  132. d.meshKey = meshKey
  133. }
  134. }
  135. // DERP creates a new derpProber.
  136. //
  137. // If derpMapURL is "local", the DERPMap is fetched via
  138. // the local machine's tailscaled.
  139. func DERP(p *Prober, derpMapURL string, opts ...DERPOpt) (*derpProber, error) {
  140. d := &derpProber{
  141. p: p,
  142. derpMapURL: derpMapURL,
  143. tlsProbeFn: TLS,
  144. nodes: make(map[string]*tailcfg.DERPNode),
  145. probes: make(map[string]*Probe),
  146. }
  147. d.ProbeMap = ProbeClass{
  148. Probe: d.probeMapFn,
  149. Class: "derp_map",
  150. }
  151. for _, o := range opts {
  152. o(d)
  153. }
  154. d.udpProbeFn = d.ProbeUDP
  155. d.meshProbeFn = d.probeMesh
  156. d.bwProbeFn = d.probeBandwidth
  157. d.qdProbeFn = d.probeQueuingDelay
  158. return d, nil
  159. }
// probeMapFn fetches the DERPMap and creates/destroys probes for each
// DERP server as necessary. It should get regularly executed as a
// probe function itself.
//
// Probes are identified by name ("derp/<region>/<server>/..."). A probe is
// started only if no probe with that name exists yet, and any existing probe
// whose name is no longer wanted by the current map is closed and removed.
// It returns the error from updateMap, if any, and nil otherwise.
func (d *derpProber) probeMapFn(ctx context.Context) error {
	if err := d.updateMap(ctx); err != nil {
		return err
	}

	// wantProbes collects the names of all probes that should exist for the
	// current map and configuration.
	wantProbes := map[string]bool{}

	d.Lock()
	defer d.Unlock()

	for _, region := range d.lastDERPMap.Regions {
		if d.skipRegion(region) {
			continue
		}

		for _, server := range region.Nodes {
			// The same label set is passed to every probe created for this server.
			labels := Labels{
				"region":    region.RegionCode,
				"region_id": strconv.Itoa(region.RegionID),
				"hostname":  server.HostName,
			}

			if d.tlsInterval > 0 {
				n := fmt.Sprintf("derp/%s/%s/tls", region.RegionCode, server.Name)
				wantProbes[n] = true
				if d.probes[n] == nil {
					log.Printf("adding DERP TLS probe for %s (%s) every %v", server.Name, region.RegionName, d.tlsInterval)
					// A zero DERPPort falls back to 443.
					derpPort := cmp.Or(server.DERPPort, 443)
					d.probes[n] = d.p.Run(n, d.tlsInterval, labels, d.tlsProbeFn(fmt.Sprintf("%s:%d", server.HostName, derpPort), nil))
				}
			}

			if d.udpInterval > 0 {
				// Index 0 is the IPv6 address; its probe name gets a "6" suffix ("udp6").
				for idx, ipStr := range []string{server.IPv6, server.IPv4} {
					n := fmt.Sprintf("derp/%s/%s/udp", region.RegionCode, server.Name)
					if idx == 0 {
						n += "6"
					}

					// Skip address families the node has no address for, and
					// nodes whose STUNPort is -1.
					if ipStr == "" || server.STUNPort == -1 {
						continue
					}

					wantProbes[n] = true
					if d.probes[n] == nil {
						log.Printf("adding DERP UDP probe for %s (%s) every %v", server.Name, n, d.udpInterval)
						d.probes[n] = d.p.Run(n, d.udpInterval, labels, d.udpProbeFn(ipStr, server.STUNPort))
					}
				}
			}

			// Pairwise probes: every (server, to) combination within the
			// region, including the pair of a server with itself.
			for _, to := range region.Nodes {
				if d.meshInterval > 0 {
					n := fmt.Sprintf("derp/%s/%s/%s/mesh", region.RegionCode, server.Name, to.Name)
					wantProbes[n] = true
					if d.probes[n] == nil {
						log.Printf("adding DERP mesh probe for %s->%s (%s) every %v", server.Name, to.Name, region.RegionName, d.meshInterval)
						d.probes[n] = d.p.Run(n, d.meshInterval, labels, d.meshProbeFn(server.Name, to.Name))
					}
				}

				if d.bwInterval != 0 && d.bwProbeSize > 0 {
					n := fmt.Sprintf("derp/%s/%s/%s/bw", region.RegionCode, server.Name, to.Name)
					wantProbes[n] = true
					if d.probes[n] == nil {
						tunString := ""
						if d.bwTUNIPv4Prefix != nil {
							tunString = " (TUN)"
						}
						log.Printf("adding%s DERP bandwidth probe for %s->%s (%s) %v bytes every %v", tunString, server.Name, to.Name, region.RegionName, d.bwProbeSize, d.bwInterval)
						d.probes[n] = d.p.Run(n, d.bwInterval, labels, d.bwProbeFn(server.Name, to.Name, d.bwProbeSize))
					}
				}

				if d.qdPacketsPerSecond > 0 {
					n := fmt.Sprintf("derp/%s/%s/%s/qd", region.RegionCode, server.Name, to.Name)
					wantProbes[n] = true
					if d.probes[n] == nil {
						log.Printf("adding DERP queuing delay probe for %s->%s (%s)", server.Name, to.Name, region.RegionName)
						// NOTE(review): the negative interval presumably tells
						// Prober.Run to treat this as a continuously running
						// probe — confirm against Prober.Run.
						d.probes[n] = d.p.Run(n, -10*time.Second, labels, d.qdProbeFn(server.Name, to.Name, d.qdPacketsPerSecond, d.qdPacketTimeout, d.meshKey))
					}
				}
			}
		}
	}

	// Tear down probes that the current map/configuration no longer calls for.
	for n, probe := range d.probes {
		if !wantProbes[n] {
			log.Printf("removing DERP probe %s", n)
			probe.Close()
			delete(d.probes, n)
		}
	}

	return nil
}
  246. // probeMesh returns a probe class that sends a test packet through a pair of DERP
  247. // servers (or just one server, if 'from' and 'to' are the same). 'from' and 'to'
  248. // are expected to be names (DERPNode.Name) of two DERP servers in the same region.
  249. func (d *derpProber) probeMesh(from, to string) ProbeClass {
  250. derpPath := "mesh"
  251. if from == to {
  252. derpPath = "single"
  253. }
  254. return ProbeClass{
  255. Probe: func(ctx context.Context) error {
  256. fromN, toN, err := d.getNodePair(from, to)
  257. if err != nil {
  258. return err
  259. }
  260. dm := d.lastDERPMap
  261. return derpProbeNodePair(ctx, dm, fromN, toN, d.meshKey)
  262. },
  263. Class: "derp_mesh",
  264. Labels: Labels{"derp_path": derpPath},
  265. }
  266. }
  267. // probeBandwidth returns a probe class that sends a payload of a given size
  268. // through a pair of DERP servers (or just one server, if 'from' and 'to' are
  269. // the same). 'from' and 'to' are expected to be names (DERPNode.Name) of two
  270. // DERP servers in the same region.
  271. func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass {
  272. derpPath := "mesh"
  273. if from == to {
  274. derpPath = "single"
  275. }
  276. var transferTimeSeconds expvar.Float
  277. var totalBytesTransferred expvar.Float
  278. return ProbeClass{
  279. Probe: func(ctx context.Context) error {
  280. fromN, toN, err := d.getNodePair(from, to)
  281. if err != nil {
  282. return err
  283. }
  284. return derpProbeBandwidth(ctx, d.lastDERPMap, fromN, toN, size, &transferTimeSeconds, &totalBytesTransferred, d.bwTUNIPv4Prefix, d.meshKey)
  285. },
  286. Class: "derp_bw",
  287. Labels: Labels{
  288. "derp_path": derpPath,
  289. "tcp_in_tcp": strconv.FormatBool(d.bwTUNIPv4Prefix != nil),
  290. },
  291. Metrics: func(lb prometheus.Labels) []prometheus.Metric {
  292. metrics := []prometheus.Metric{
  293. prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_probe_size_bytes", "Payload size of the bandwidth prober", nil, lb), prometheus.GaugeValue, float64(size)),
  294. prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_transfer_time_seconds_total", "Time it took to transfer data", nil, lb), prometheus.CounterValue, transferTimeSeconds.Value()),
  295. }
  296. if d.bwTUNIPv4Prefix != nil {
  297. // For TCP-in-TCP probes, also record cumulative bytes transferred.
  298. metrics = append(metrics, prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_bytes_total", "Amount of data transferred", nil, lb), prometheus.CounterValue, totalBytesTransferred.Value()))
  299. }
  300. return metrics
  301. },
  302. }
  303. }
  304. // probeQueuingDelay returns a probe class that continuously sends packets
  305. // through a pair of DERP servers (or just one server, if 'from' and 'to' are
  306. // the same) at a rate of `packetsPerSecond` packets per second in order to
  307. // measure queuing delays. Packets arriving after `packetTimeout` don't contribute
  308. // to the queuing delay measurement and are recorded as dropped. 'from' and 'to' are
  309. // expected to be names (DERPNode.Name) of two DERP servers in the same region,
  310. // and may refer to the same server.
  311. func (d *derpProber) probeQueuingDelay(from, to string, packetsPerSecond int, packetTimeout time.Duration, meshKey key.DERPMesh) ProbeClass {
  312. derpPath := "mesh"
  313. if from == to {
  314. derpPath = "single"
  315. }
  316. var packetsDropped expvar.Float
  317. qdh := newHistogram([]float64{.005, .01, .025, .05, .1, .25, .5, 1})
  318. return ProbeClass{
  319. Probe: func(ctx context.Context) error {
  320. fromN, toN, err := d.getNodePair(from, to)
  321. if err != nil {
  322. return err
  323. }
  324. return derpProbeQueuingDelay(ctx, d.lastDERPMap, fromN, toN, packetsPerSecond, packetTimeout, &packetsDropped, qdh, meshKey)
  325. },
  326. Class: "derp_qd",
  327. Labels: Labels{"derp_path": derpPath},
  328. Metrics: func(lb prometheus.Labels) []prometheus.Metric {
  329. qdh.mx.Lock()
  330. result := []prometheus.Metric{
  331. prometheus.MustNewConstMetric(prometheus.NewDesc("derp_qd_probe_dropped_packets", "Total packets dropped", nil, lb), prometheus.CounterValue, float64(packetsDropped.Value())),
  332. prometheus.MustNewConstHistogram(prometheus.NewDesc("derp_qd_probe_delays_seconds", "Distribution of queuing delays", nil, lb), qdh.count, qdh.sum, maps.Clone(qdh.bucketedCounts)),
  333. }
  334. qdh.mx.Unlock()
  335. return result
  336. },
  337. }
  338. }
  339. // derpProbeQueuingDelay continuously sends data between two local DERP clients
  340. // connected to two DERP servers in order to measure queuing delays. From and to
  341. // can be the same server.
  342. func derpProbeQueuingDelay(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, packetsPerSecond int, packetTimeout time.Duration, packetsDropped *expvar.Float, qdh *histogram, meshKey key.DERPMesh) (err error) {
  343. // This probe uses clients with isProber=false to avoid spamming the derper
  344. // logs with every packet sent by the queuing delay probe.
  345. fromc, err := newConn(ctx, dm, from, false, meshKey)
  346. if err != nil {
  347. return err
  348. }
  349. defer fromc.Close()
  350. toc, err := newConn(ctx, dm, to, false, meshKey)
  351. if err != nil {
  352. return err
  353. }
  354. defer toc.Close()
  355. // Wait a bit for from's node to hear about to existing on the
  356. // other node in the region, in the case where the two nodes
  357. // are different.
  358. if from.Name != to.Name {
  359. time.Sleep(100 * time.Millisecond) // pretty arbitrary
  360. }
  361. if err := runDerpProbeQueuingDelayContinously(ctx, from, to, fromc, toc, packetsPerSecond, packetTimeout, packetsDropped, qdh); err != nil {
  362. // Record pubkeys on failed probes to aid investigation.
  363. return fmt.Errorf("%s -> %s: %w",
  364. fromc.SelfPublicKey().ShortString(),
  365. toc.SelfPublicKey().ShortString(), err)
  366. }
  367. return nil
  368. }
  369. func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, packetsPerSecond int, packetTimeout time.Duration, packetsDropped *expvar.Float, qdh *histogram) error {
  370. // Make sure all goroutines have finished.
  371. var wg sync.WaitGroup
  372. defer wg.Wait()
  373. // Close the clients to make sure goroutines that are reading/writing from them terminate.
  374. defer fromc.Close()
  375. defer toc.Close()
  376. type txRecord struct {
  377. at time.Time
  378. seq uint64
  379. }
  380. // txRecords is sized to hold enough transmission records to keep timings
  381. // for packets up to their timeout. As records age out of the front of this
  382. // list, if the associated packet arrives, we won't have a txRecord for it
  383. // and will consider it to have timed out.
  384. txRecords := make([]txRecord, 0, packetsPerSecond*int(packetTimeout.Seconds()))
  385. var txRecordsMu sync.Mutex
  386. // applyTimeouts walks over txRecords and expires any records that are older
  387. // than packetTimeout, recording in metrics that they were removed.
  388. applyTimeouts := func() {
  389. txRecordsMu.Lock()
  390. defer txRecordsMu.Unlock()
  391. now := time.Now()
  392. recs := txRecords[:0]
  393. for _, r := range txRecords {
  394. if now.Sub(r.at) > packetTimeout {
  395. packetsDropped.Add(1)
  396. } else {
  397. recs = append(recs, r)
  398. }
  399. }
  400. txRecords = recs
  401. }
  402. // Send the packets.
  403. sendErrC := make(chan error, 1)
  404. // TODO: construct a disco CallMeMaybe in the same fashion as magicsock, e.g. magic bytes, src pub, seal payload.
  405. // DERP server handling of disco may vary from non-disco, and we may want to measure queue delay of both.
  406. pkt := make([]byte, 260) // the same size as a CallMeMaybe packet observed on a Tailscale client.
  407. crand.Read(pkt)
  408. wg.Add(1)
  409. go func() {
  410. defer wg.Done()
  411. t := time.NewTicker(time.Second / time.Duration(packetsPerSecond))
  412. defer t.Stop()
  413. toDERPPubKey := toc.SelfPublicKey()
  414. seq := uint64(0)
  415. for {
  416. select {
  417. case <-ctx.Done():
  418. return
  419. case <-t.C:
  420. applyTimeouts()
  421. txRecordsMu.Lock()
  422. if len(txRecords) == cap(txRecords) {
  423. txRecords = slices.Delete(txRecords, 0, 1)
  424. packetsDropped.Add(1)
  425. log.Printf("unexpected: overflow in txRecords")
  426. }
  427. txRecords = append(txRecords, txRecord{time.Now(), seq})
  428. txRecordsMu.Unlock()
  429. binary.BigEndian.PutUint64(pkt, seq)
  430. seq++
  431. if err := fromc.Send(toDERPPubKey, pkt); err != nil {
  432. sendErrC <- fmt.Errorf("sending packet %w", err)
  433. return
  434. }
  435. }
  436. }
  437. }()
  438. // Receive the packets.
  439. recvFinishedC := make(chan error, 1)
  440. wg.Add(1)
  441. go func() {
  442. defer wg.Done()
  443. defer close(recvFinishedC) // to break out of 'select' below.
  444. fromDERPPubKey := fromc.SelfPublicKey()
  445. for {
  446. m, err := toc.Recv()
  447. if err != nil {
  448. recvFinishedC <- err
  449. return
  450. }
  451. switch v := m.(type) {
  452. case derp.ReceivedPacket:
  453. now := time.Now()
  454. if v.Source != fromDERPPubKey {
  455. recvFinishedC <- fmt.Errorf("got data packet from unexpected source, %v", v.Source)
  456. return
  457. }
  458. seq := binary.BigEndian.Uint64(v.Data)
  459. txRecordsMu.Lock()
  460. findTxRecord:
  461. for i, record := range txRecords {
  462. switch {
  463. case record.seq == seq:
  464. rtt := now.Sub(record.at)
  465. qdh.add(rtt.Seconds())
  466. txRecords = slices.Delete(txRecords, i, i+1)
  467. break findTxRecord
  468. case record.seq > seq:
  469. // No sent time found, probably a late arrival already
  470. // recorded as drop by sender when deleted.
  471. break findTxRecord
  472. case record.seq < seq:
  473. continue
  474. }
  475. }
  476. txRecordsMu.Unlock()
  477. case derp.KeepAliveMessage:
  478. // Silently ignore.
  479. default:
  480. log.Printf("%v: ignoring Recv frame type %T", to.Name, v)
  481. // Loop.
  482. }
  483. }
  484. }()
  485. select {
  486. case <-ctx.Done():
  487. return fmt.Errorf("timeout: %w", ctx.Err())
  488. case err := <-sendErrC:
  489. return fmt.Errorf("error sending via %q: %w", from.Name, err)
  490. case err := <-recvFinishedC:
  491. if err != nil {
  492. return fmt.Errorf("error receiving from %q: %w", to.Name, err)
  493. }
  494. }
  495. return nil
  496. }
  497. // getNodePair returns DERPNode objects for two DERP servers based on their
  498. // short names.
  499. func (d *derpProber) getNodePair(n1, n2 string) (ret1, ret2 *tailcfg.DERPNode, _ error) {
  500. d.Lock()
  501. defer d.Unlock()
  502. ret1, ok := d.nodes[n1]
  503. if !ok {
  504. return nil, nil, fmt.Errorf("could not find derp node %s", n1)
  505. }
  506. ret2, ok = d.nodes[n2]
  507. if !ok {
  508. return nil, nil, fmt.Errorf("could not find derp node %s", n2)
  509. }
  510. return ret1, ret2, nil
  511. }
// tsLocalClient is the local.Client that updateMap uses to fetch the current
// DERP map when derpMapURL is "local".
var tsLocalClient local.Client
  513. // updateMap refreshes the locally-cached DERP map.
  514. func (d *derpProber) updateMap(ctx context.Context) error {
  515. var dm *tailcfg.DERPMap
  516. if d.derpMapURL == "local" {
  517. var err error
  518. dm, err = tsLocalClient.CurrentDERPMap(ctx)
  519. if err != nil {
  520. return err
  521. }
  522. } else {
  523. req, err := http.NewRequestWithContext(ctx, "GET", d.derpMapURL, nil)
  524. if err != nil {
  525. return err
  526. }
  527. res, err := httpOrFileClient.Do(req)
  528. if err != nil {
  529. d.Lock()
  530. defer d.Unlock()
  531. if d.lastDERPMap != nil && time.Since(d.lastDERPMapAt) < 10*time.Minute {
  532. log.Printf("Error while fetching DERP map, using cached one: %s", err)
  533. // Assume that control is restarting and use
  534. // the same one for a bit.
  535. return nil
  536. }
  537. return err
  538. }
  539. defer res.Body.Close()
  540. if res.StatusCode != 200 {
  541. return fmt.Errorf("fetching %s: %s", d.derpMapURL, res.Status)
  542. }
  543. dm = new(tailcfg.DERPMap)
  544. if err := json.NewDecoder(res.Body).Decode(dm); err != nil {
  545. return fmt.Errorf("decoding %s JSON: %v", d.derpMapURL, err)
  546. }
  547. }
  548. d.Lock()
  549. defer d.Unlock()
  550. d.lastDERPMap = dm
  551. d.lastDERPMapAt = time.Now()
  552. d.nodes = make(map[string]*tailcfg.DERPNode)
  553. for _, reg := range d.lastDERPMap.Regions {
  554. if d.skipRegion(reg) {
  555. continue
  556. }
  557. for _, n := range reg.Nodes {
  558. if existing, ok := d.nodes[n.Name]; ok {
  559. return fmt.Errorf("derpmap has duplicate nodes: %+v and %+v", existing, n)
  560. }
  561. // Allow the prober to monitor nodes marked as
  562. // STUN only in the default map
  563. n.STUNOnly = false
  564. d.nodes[n.Name] = n
  565. }
  566. }
  567. return nil
  568. }
  569. func (d *derpProber) ProbeUDP(ipaddr string, port int) ProbeClass {
  570. initLabels := make(Labels)
  571. ip := net.ParseIP(ipaddr)
  572. if ip.To4() != nil {
  573. initLabels["address_family"] = "ipv4"
  574. } else if ip.To16() != nil { // Will return an IPv4 as 16 byte, so ensure the check for IPv4 precedes this
  575. initLabels["address_family"] = "ipv6"
  576. } else {
  577. initLabels["address_family"] = "unknown"
  578. }
  579. return ProbeClass{
  580. Probe: func(ctx context.Context) error {
  581. return derpProbeUDP(ctx, ipaddr, port)
  582. },
  583. Class: "derp_udp",
  584. Labels: initLabels,
  585. }
  586. }
  587. func (d *derpProber) skipRegion(region *tailcfg.DERPRegion) bool {
  588. return d.regionCodeOrID != "" && region.RegionCode != d.regionCodeOrID && strconv.Itoa(region.RegionID) != d.regionCodeOrID
  589. }
  590. func derpProbeUDP(ctx context.Context, ipStr string, port int) error {
  591. pc, err := net.ListenPacket("udp", ":0")
  592. if err != nil {
  593. return err
  594. }
  595. defer pc.Close()
  596. uc := pc.(*net.UDPConn)
  597. tx := stun.NewTxID()
  598. req := stun.Request(tx)
  599. if port == 0 {
  600. port = 3478
  601. }
  602. for {
  603. ip := net.ParseIP(ipStr)
  604. _, err := uc.WriteToUDP(req, &net.UDPAddr{IP: ip, Port: port})
  605. if err != nil {
  606. return err
  607. }
  608. // Binding requests and responses are fairly small (~40 bytes),
  609. // but in practice a STUN response can be up to the size of the
  610. // path MTU, so we use a jumbo frame size buffer here.
  611. buf := make([]byte, 9000)
  612. uc.SetReadDeadline(time.Now().Add(2 * time.Second))
  613. t0 := time.Now()
  614. n, _, err := uc.ReadFromUDP(buf)
  615. d := time.Since(t0)
  616. if err != nil {
  617. if ctx.Err() != nil {
  618. return fmt.Errorf("timeout reading from %v: %v", ip, err)
  619. }
  620. if d < time.Second {
  621. return fmt.Errorf("error reading from %v: %v", ip, err)
  622. }
  623. time.Sleep(100 * time.Millisecond)
  624. continue
  625. }
  626. txBack, _, err := stun.ParseResponse(buf[:n])
  627. if err != nil {
  628. return fmt.Errorf("parsing STUN response from %v: %v", ip, err)
  629. }
  630. if txBack != tx {
  631. return fmt.Errorf("read wrong tx back from %v", ip)
  632. }
  633. break
  634. }
  635. return nil
  636. }
  637. // derpProbeBandwidth sends a payload of a given size between two local
  638. // DERP clients connected to two DERP servers.If tunIPv4Address is specified,
  639. // probes will use a TCP connection over a TUN device at this address in order
  640. // to exercise TCP-in-TCP in similar fashion to TCP over Tailscale via DERP.
  641. func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64, transferTimeSeconds, totalBytesTransferred *expvar.Float, tunIPv4Prefix *netip.Prefix, meshKey key.DERPMesh) (err error) {
  642. // This probe uses clients with isProber=false to avoid spamming the derper logs with every packet
  643. // sent by the bandwidth probe.
  644. fromc, err := newConn(ctx, dm, from, false, meshKey)
  645. if err != nil {
  646. return err
  647. }
  648. defer fromc.Close()
  649. toc, err := newConn(ctx, dm, to, false, meshKey)
  650. if err != nil {
  651. return err
  652. }
  653. defer toc.Close()
  654. // Wait a bit for from's node to hear about to existing on the
  655. // other node in the region, in the case where the two nodes
  656. // are different.
  657. if from.Name != to.Name {
  658. time.Sleep(100 * time.Millisecond) // pretty arbitrary
  659. }
  660. if tunIPv4Prefix != nil {
  661. err = derpProbeBandwidthTUN(ctx, transferTimeSeconds, totalBytesTransferred, from, to, fromc, toc, size, tunIPv4Prefix)
  662. } else {
  663. err = derpProbeBandwidthDirect(ctx, transferTimeSeconds, from, to, fromc, toc, size)
  664. }
  665. if err != nil {
  666. // Record pubkeys on failed probes to aid investigation.
  667. return fmt.Errorf("%s -> %s: %w",
  668. fromc.SelfPublicKey().ShortString(),
  669. toc.SelfPublicKey().ShortString(), err)
  670. }
  671. return nil
  672. }
  673. // derpProbeNodePair sends a small packet between two local DERP clients
  674. // connected to two DERP servers.
  675. func derpProbeNodePair(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, meshKey key.DERPMesh) (err error) {
  676. fromc, err := newConn(ctx, dm, from, true, meshKey)
  677. if err != nil {
  678. return err
  679. }
  680. defer fromc.Close()
  681. toc, err := newConn(ctx, dm, to, true, meshKey)
  682. if err != nil {
  683. return err
  684. }
  685. defer toc.Close()
  686. // Wait a bit for from's node to hear about to existing on the
  687. // other node in the region, in the case where the two nodes
  688. // are different.
  689. if from.Name != to.Name {
  690. time.Sleep(100 * time.Millisecond) // pretty arbitrary
  691. }
  692. const meshProbePacketSize = 8
  693. if err := runDerpProbeNodePair(ctx, from, to, fromc, toc, meshProbePacketSize); err != nil {
  694. // Record pubkeys on failed probes to aid investigation.
  695. return fmt.Errorf("%s -> %s: %w",
  696. fromc.SelfPublicKey().ShortString(),
  697. toc.SelfPublicKey().ShortString(), err)
  698. }
  699. return nil
  700. }
// probePackets caches pregenerated slices of probe packets, keyed by the
// total payload size the packets add up to. packetsForSize fills it lazily
// for payloads larger than a single DERP packet.
var probePackets syncs.Map[int64, [][]byte]
  703. // packetsForSize returns a slice of packet payloads with a given total size.
  704. func packetsForSize(size int64) [][]byte {
  705. // For a small payload, create a unique random packet.
  706. if size <= derp.MaxPacketSize {
  707. pkt := make([]byte, size)
  708. crand.Read(pkt)
  709. return [][]byte{pkt}
  710. }
  711. // For a large payload, create a bunch of packets once and re-use them
  712. // across probes.
  713. pkts, _ := probePackets.LoadOrInit(size, func() [][]byte {
  714. const packetSize = derp.MaxPacketSize
  715. var pkts [][]byte
  716. for remaining := size; remaining > 0; remaining -= packetSize {
  717. pkt := make([]byte, min(remaining, packetSize))
  718. crand.Read(pkt)
  719. pkts = append(pkts, pkt)
  720. }
  721. return pkts
  722. })
  723. return pkts
  724. }
  725. // runDerpProbeNodePair takes two DERP clients (fromc and toc) connected to two
  726. // DERP servers (from and to) and sends a test payload of a given size from one
  727. // to another.
  728. func runDerpProbeNodePair(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, size int64) error {
  729. // To avoid derper dropping enqueued packets, limit the number of packets in flight.
  730. // The value here is slightly smaller than perClientSendQueueDepth in derp_server.go
  731. inFlight := syncs.NewSemaphore(30)
  732. pkts := packetsForSize(size)
  733. // Send the packets.
  734. sendc := make(chan error, 1)
  735. go func() {
  736. toDERPPubKey := toc.SelfPublicKey()
  737. for idx, pkt := range pkts {
  738. inFlight.AcquireContext(ctx)
  739. if err := fromc.Send(toDERPPubKey, pkt); err != nil {
  740. sendc <- fmt.Errorf("sending packet %d: %w", idx, err)
  741. return
  742. }
  743. }
  744. }()
  745. // Receive the packets.
  746. recvc := make(chan error, 1)
  747. go func() {
  748. defer close(recvc) // to break out of 'select' below.
  749. idx := 0
  750. fromDERPPubKey := fromc.SelfPublicKey()
  751. for {
  752. m, err := toc.Recv()
  753. if err != nil {
  754. recvc <- fmt.Errorf("after %d data packets: %w", idx, err)
  755. return
  756. }
  757. switch v := m.(type) {
  758. case derp.ReceivedPacket:
  759. inFlight.Release()
  760. if v.Source != fromDERPPubKey {
  761. recvc <- fmt.Errorf("got data packet %d from unexpected source, %v", idx, v.Source)
  762. return
  763. }
  764. // This assumes that the packets are received reliably and in order.
  765. // The DERP protocol does not guarantee this, but this probe assumes it.
  766. if got, want := v.Data, pkts[idx]; !bytes.Equal(got, want) {
  767. recvc <- fmt.Errorf("unexpected data packet %d (out of %d)", idx, len(pkts))
  768. return
  769. }
  770. idx += 1
  771. if idx == len(pkts) {
  772. return
  773. }
  774. case derp.KeepAliveMessage:
  775. // Silently ignore.
  776. default:
  777. log.Printf("%v: ignoring Recv frame type %T", to.Name, v)
  778. // Loop.
  779. }
  780. }
  781. }()
  782. select {
  783. case <-ctx.Done():
  784. return fmt.Errorf("timeout: %w", ctx.Err())
  785. case err := <-sendc:
  786. if err != nil {
  787. return fmt.Errorf("error sending via %q: %w", from.Name, err)
  788. }
  789. case err := <-recvc:
  790. if err != nil {
  791. return fmt.Errorf("error receiving from %q: %w", to.Name, err)
  792. }
  793. }
  794. return nil
  795. }
  796. // derpProbeBandwidthDirect takes two DERP clients (fromc and toc) connected to two
  797. // DERP servers (from and to) and sends a test payload of a given size from one
  798. // to another using runDerpProbeNodePair. The time taken to finish the transfer is
  799. // recorded in `transferTimeSeconds`.
  800. func derpProbeBandwidthDirect(ctx context.Context, transferTimeSeconds *expvar.Float, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, size int64) error {
  801. start := time.Now()
  802. defer func() { transferTimeSeconds.Add(time.Since(start).Seconds()) }()
  803. return runDerpProbeNodePair(ctx, from, to, fromc, toc, size)
  804. }
  805. // derpProbeBandwidthTUNMu ensures that TUN bandwidth probes don't run concurrently.
  806. // This is necessary to avoid conflicts trying to create the TUN device, and
  807. // it also has the nice benefit of preventing concurrent bandwidth probes from
  808. // influencing each other's results.
  809. //
  810. // This guards derpProbeBandwidthTUN.
  811. var derpProbeBandwidthTUNMu sync.Mutex
  812. // derpProbeBandwidthTUN takes two DERP clients (fromc and toc) connected to two
  813. // DERP servers (from and to) and sends a test payload of a given size from one
  814. // to another over a TUN device at an address at the start of the usable host IP
  815. // range that the given tunAddress lives in. The time taken to finish the transfer
  816. // is recorded in `transferTimeSeconds`.
  817. func derpProbeBandwidthTUN(ctx context.Context, transferTimeSeconds, totalBytesTransferred *expvar.Float, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, size int64, prefix *netip.Prefix) error {
  818. // Make sure all goroutines have finished.
  819. var wg sync.WaitGroup
  820. defer wg.Wait()
  821. // Close the clients to make sure goroutines that are reading/writing from them terminate.
  822. defer fromc.Close()
  823. defer toc.Close()
  824. ipRange := netipx.RangeOfPrefix(*prefix)
  825. // Start of the usable host IP range from the address we have been passed in.
  826. ifAddr := ipRange.From().Next()
  827. // Destination address to dial. This is the next address in the range from
  828. // our ifAddr to ensure that the underlying networking stack is actually being
  829. // utilized instead of being optimized away and treated as a loopback. Packets
  830. // sent to this address will be routed over the TUN.
  831. destinationAddr := ifAddr.Next()
  832. derpProbeBandwidthTUNMu.Lock()
  833. defer derpProbeBandwidthTUNMu.Unlock()
  834. // Temporarily set up a TUN device with which to simulate a real client TCP connection
  835. // tunneling over DERP. Use `tstun.DefaultTUNMTU()` (e.g., 1280) as our MTU as this is
  836. // the minimum safe MTU used by Tailscale.
  837. dev, err := tun.CreateTUN(tunName, int(tstun.DefaultTUNMTU()))
  838. if err != nil {
  839. return fmt.Errorf("failed to create TUN device: %w", err)
  840. }
  841. defer func() {
  842. if err := dev.Close(); err != nil {
  843. log.Printf("failed to close TUN device: %s", err)
  844. }
  845. }()
  846. mtu, err := dev.MTU()
  847. if err != nil {
  848. return fmt.Errorf("failed to get TUN MTU: %w", err)
  849. }
  850. name, err := dev.Name()
  851. if err != nil {
  852. return fmt.Errorf("failed to get device name: %w", err)
  853. }
  854. // Perform platform specific configuration of the TUN device.
  855. err = configureTUN(*prefix, name)
  856. if err != nil {
  857. return fmt.Errorf("failed to configure tun: %w", err)
  858. }
  859. // Depending on platform, we need some space for headers at the front
  860. // of TUN I/O op buffers. The below constant is more than enough space
  861. // for any platform that this might run on.
  862. tunStartOffset := device.MessageTransportHeaderSize
  863. // This goroutine reads packets from the TUN device and evaluates if they
  864. // are IPv4 packets destined for loopback via DERP. If so, it performs L3 NAT
  865. // (swap src/dst) and writes them towards DERP in order to loopback via the
  866. // `toc` DERP client. It only reports errors to `tunReadErrC`.
  867. wg.Add(1)
  868. tunReadErrC := make(chan error, 1)
  869. go func() {
  870. defer wg.Done()
  871. numBufs := wgconn.IdealBatchSize
  872. bufs := make([][]byte, 0, numBufs)
  873. sizes := make([]int, numBufs)
  874. for range numBufs {
  875. bufs = append(bufs, make([]byte, mtu+tunStartOffset))
  876. }
  877. destinationAddrBytes := destinationAddr.AsSlice()
  878. scratch := make([]byte, 4)
  879. toDERPPubKey := toc.SelfPublicKey()
  880. for {
  881. n, err := dev.Read(bufs, sizes, tunStartOffset)
  882. if err != nil {
  883. tunReadErrC <- err
  884. return
  885. }
  886. for i := range n {
  887. pkt := bufs[i][tunStartOffset : sizes[i]+tunStartOffset]
  888. // Skip everything except valid IPv4 packets
  889. if len(pkt) < 20 {
  890. // Doesn't even have a full IPv4 header
  891. continue
  892. }
  893. if pkt[0]>>4 != 4 {
  894. // Not IPv4
  895. continue
  896. }
  897. if !bytes.Equal(pkt[16:20], destinationAddrBytes) {
  898. // Unexpected dst address
  899. continue
  900. }
  901. copy(scratch, pkt[12:16])
  902. copy(pkt[12:16], pkt[16:20])
  903. copy(pkt[16:20], scratch)
  904. if err := fromc.Send(toDERPPubKey, pkt); err != nil {
  905. tunReadErrC <- err
  906. return
  907. }
  908. }
  909. }
  910. }()
  911. // This goroutine reads packets from the `toc` DERP client and writes them towards the TUN.
  912. // It only reports errors to `recvErrC` channel.
  913. wg.Add(1)
  914. recvErrC := make(chan error, 1)
  915. go func() {
  916. defer wg.Done()
  917. buf := make([]byte, mtu+tunStartOffset)
  918. bufs := make([][]byte, 1)
  919. fromDERPPubKey := fromc.SelfPublicKey()
  920. for {
  921. m, err := toc.Recv()
  922. if err != nil {
  923. recvErrC <- fmt.Errorf("failed to receive: %w", err)
  924. return
  925. }
  926. switch v := m.(type) {
  927. case derp.ReceivedPacket:
  928. if v.Source != fromDERPPubKey {
  929. recvErrC <- fmt.Errorf("got data packet from unexpected source, %v", v.Source)
  930. return
  931. }
  932. pkt := v.Data
  933. copy(buf[tunStartOffset:], pkt)
  934. bufs[0] = buf[:len(pkt)+tunStartOffset]
  935. if _, err := dev.Write(bufs, tunStartOffset); err != nil {
  936. recvErrC <- fmt.Errorf("failed to write to TUN device: %w", err)
  937. return
  938. }
  939. case derp.KeepAliveMessage:
  940. // Silently ignore.
  941. default:
  942. log.Printf("%v: ignoring Recv frame type %T", to.Name, v)
  943. // Loop.
  944. }
  945. }
  946. }()
  947. // Start a listener to receive the data
  948. ln, err := net.Listen("tcp", net.JoinHostPort(ifAddr.String(), "0"))
  949. if err != nil {
  950. return fmt.Errorf("failed to listen: %s", err)
  951. }
  952. defer ln.Close()
  953. // 128KB by default
  954. const writeChunkSize = 128 << 10
  955. randData := make([]byte, writeChunkSize)
  956. _, err = crand.Read(randData)
  957. if err != nil {
  958. return fmt.Errorf("failed to initialize random data: %w", err)
  959. }
  960. // Dial ourselves
  961. _, port, err := net.SplitHostPort(ln.Addr().String())
  962. if err != nil {
  963. return fmt.Errorf("failed to split address %q: %w", ln.Addr().String(), err)
  964. }
  965. connAddr := net.JoinHostPort(destinationAddr.String(), port)
  966. conn, err := net.Dial("tcp", connAddr)
  967. if err != nil {
  968. return fmt.Errorf("failed to dial address %q: %w", connAddr, err)
  969. }
  970. defer conn.Close()
  971. // Timing only includes the actual sending and receiving of data.
  972. start := time.Now()
  973. // This goroutine reads data from the TCP stream being looped back via DERP.
  974. // It reports to `readFinishedC` when `size` bytes have been read, or if an
  975. // error occurs.
  976. wg.Add(1)
  977. readFinishedC := make(chan error, 1)
  978. go func() {
  979. defer wg.Done()
  980. readConn, err := ln.Accept()
  981. if err != nil {
  982. readFinishedC <- err
  983. return
  984. }
  985. defer readConn.Close()
  986. deadline, ok := ctx.Deadline()
  987. if ok {
  988. // Don't try reading past our context's deadline.
  989. if err := readConn.SetReadDeadline(deadline); err != nil {
  990. readFinishedC <- fmt.Errorf("unable to set read deadline: %w", err)
  991. return
  992. }
  993. }
  994. n, err := io.CopyN(io.Discard, readConn, size)
  995. // Measure transfer time and bytes transferred irrespective of whether it succeeded or failed.
  996. transferTimeSeconds.Add(time.Since(start).Seconds())
  997. totalBytesTransferred.Add(float64(n))
  998. readFinishedC <- err
  999. }()
  1000. // This goroutine sends data to the TCP stream being looped back via DERP.
  1001. // It only reports errors to `sendErrC`.
  1002. wg.Add(1)
  1003. sendErrC := make(chan error, 1)
  1004. go func() {
  1005. defer wg.Done()
  1006. for wrote := 0; wrote < int(size); wrote += len(randData) {
  1007. b := randData
  1008. if wrote+len(randData) > int(size) {
  1009. // This is the last chunk and we don't need the whole thing
  1010. b = b[0 : int(size)-wrote]
  1011. }
  1012. if _, err := conn.Write(b); err != nil {
  1013. sendErrC <- fmt.Errorf("failed to write to conn: %w", err)
  1014. return
  1015. }
  1016. }
  1017. }()
  1018. select {
  1019. case <-ctx.Done():
  1020. return fmt.Errorf("timeout: %w", ctx.Err())
  1021. case err := <-tunReadErrC:
  1022. return fmt.Errorf("error reading from TUN via %q: %w", from.Name, err)
  1023. case err := <-sendErrC:
  1024. return fmt.Errorf("error sending via %q: %w", from.Name, err)
  1025. case err := <-recvErrC:
  1026. return fmt.Errorf("error receiving from %q: %w", to.Name, err)
  1027. case err := <-readFinishedC:
  1028. if err != nil {
  1029. return fmt.Errorf("error reading from %q to TUN: %w", to.Name, err)
  1030. }
  1031. }
  1032. return nil
  1033. }
  1034. func newConn(ctx context.Context, dm *tailcfg.DERPMap, n *tailcfg.DERPNode, isProber bool, meshKey key.DERPMesh) (*derphttp.Client, error) {
  1035. // To avoid spamming the log with regular connection messages.
  1036. logf := logger.Filtered(log.Printf, func(s string) bool {
  1037. return !strings.Contains(s, "derphttp.Client.Connect: connecting to")
  1038. })
  1039. priv := key.NewNode()
  1040. dc := derphttp.NewRegionClient(priv, logf, netmon.NewStatic(), func() *tailcfg.DERPRegion {
  1041. rid := n.RegionID
  1042. return &tailcfg.DERPRegion{
  1043. RegionID: rid,
  1044. RegionCode: fmt.Sprintf("%s-%s", dm.Regions[rid].RegionCode, n.Name),
  1045. RegionName: dm.Regions[rid].RegionName,
  1046. Nodes: []*tailcfg.DERPNode{n},
  1047. }
  1048. })
  1049. dc.IsProber = isProber
  1050. dc.MeshKey = meshKey
  1051. err := dc.Connect(ctx)
  1052. if err != nil {
  1053. return nil, err
  1054. }
  1055. // Only verify TLS state if this is a prober.
  1056. if isProber {
  1057. cs, ok := dc.TLSConnectionState()
  1058. if !ok {
  1059. dc.Close()
  1060. return nil, errors.New("no TLS state")
  1061. }
  1062. if len(cs.PeerCertificates) == 0 {
  1063. dc.Close()
  1064. return nil, errors.New("no peer certificates")
  1065. }
  1066. if cs.ServerName != n.HostName {
  1067. dc.Close()
  1068. return nil, fmt.Errorf("TLS server name %q != derp hostname %q", cs.ServerName, n.HostName)
  1069. }
  1070. }
  1071. errc := make(chan error, 1)
  1072. go func() {
  1073. m, err := dc.Recv()
  1074. if err != nil {
  1075. errc <- err
  1076. return
  1077. }
  1078. switch m.(type) {
  1079. case derp.ServerInfoMessage:
  1080. errc <- nil
  1081. default:
  1082. errc <- fmt.Errorf("unexpected first message type %T", m)
  1083. }
  1084. }()
  1085. select {
  1086. case err := <-errc:
  1087. if err != nil {
  1088. go dc.Close()
  1089. return nil, err
  1090. }
  1091. case <-ctx.Done():
  1092. go dc.Close()
  1093. return nil, fmt.Errorf("timeout waiting for ServerInfoMessage: %w", ctx.Err())
  1094. }
  1095. return dc, nil
  1096. }
  1097. var httpOrFileClient = &http.Client{Transport: httpOrFileTransport()}
  1098. func httpOrFileTransport() http.RoundTripper {
  1099. tr := http.DefaultTransport.(*http.Transport).Clone()
  1100. tr.RegisterProtocol("file", http.NewFileTransport(http.Dir("/")))
  1101. return tr
  1102. }