Răsfoiți Sursa

util/clientmetrics: add new package to add metrics to the client

And annotate magicsock as a start.

And add localapi and debug handlers with the Prometheus-format
exporter.

Updates #3307

Change-Id: I47c5d535fe54424741df143d052760387248f8d3
Signed-off-by: Brad Fitzpatrick <[email protected]>
Brad Fitzpatrick 5 ani în urmă
părinte
comite
57b039c51d

+ 1 - 0
cmd/tailscaled/depaware.txt

@@ -221,6 +221,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
         tailscale.com/types/persist                                  from tailscale.com/control/controlclient+
         tailscale.com/types/preftype                                 from tailscale.com/ipn+
         tailscale.com/types/structs                                  from tailscale.com/control/controlclient+
+        tailscale.com/util/clientmetric                              from tailscale.com/ipn/localapi+
    L    tailscale.com/util/cmpver                                    from tailscale.com/net/dns
      💣 tailscale.com/util/deephash                                  from tailscale.com/ipn/ipnlocal+
         tailscale.com/util/dnsname                                   from tailscale.com/hostinfo+

+ 7 - 0
cmd/tailscaled/tailscaled.go

@@ -39,6 +39,7 @@ import (
 	"tailscale.com/safesocket"
 	"tailscale.com/types/flagtype"
 	"tailscale.com/types/logger"
+	"tailscale.com/util/clientmetric"
 	"tailscale.com/util/multierr"
 	"tailscale.com/util/osshare"
 	"tailscale.com/version"
@@ -478,6 +479,7 @@ func tryEngine(logf logger.Logf, linkMon *monitor.Mon, name string) (e wgengine.
 
 func newDebugMux() *http.ServeMux {
 	mux := http.NewServeMux()
+	mux.HandleFunc("/debug/metrics", servePrometheusMetrics)
 	mux.HandleFunc("/debug/pprof/", pprof.Index)
 	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
 	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
@@ -486,6 +488,11 @@ func newDebugMux() *http.ServeMux {
 	return mux
 }
 
+func servePrometheusMetrics(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Content-Type", "text/plain")
+	clientmetric.WritePrometheusExpositionFormat(w)
+}
+
 func runDebugServer(mux *http.ServeMux, addr string) {
 	srv := &http.Server{
 		Addr:    addr,

+ 14 - 0
ipn/localapi/localapi.go

@@ -31,6 +31,7 @@ import (
 	"tailscale.com/net/netknob"
 	"tailscale.com/tailcfg"
 	"tailscale.com/types/logger"
+	"tailscale.com/util/clientmetric"
 	"tailscale.com/version"
 )
 
@@ -113,6 +114,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		h.serveSetDNS(w, r)
 	case "/localapi/v0/derpmap":
 		h.serveDERPMap(w, r)
+	case "/localapi/v0/metrics":
+		h.serveMetrics(w, r)
 	case "/":
 		io.WriteString(w, "tailscaled\n")
 	default:
@@ -184,6 +187,17 @@ func (h *Handler) serveGoroutines(w http.ResponseWriter, r *http.Request) {
 	w.Write(buf)
 }
 
+func (h *Handler) serveMetrics(w http.ResponseWriter, r *http.Request) {
+	// Require write access out of paranoia that the metrics
+	// might contain something sensitive.
+	if !h.PermitWrite {
+		http.Error(w, "metric access denied", http.StatusForbidden)
+		return
+	}
+	w.Header().Set("Content-Type", "text/plain")
+	clientmetric.WritePrometheusExpositionFormat(w)
+}
+
 // serveProfileFunc is the implementation of Handler.serveProfile, after auth,
 // for platforms where we want to link it in.
 var serveProfileFunc func(http.ResponseWriter, *http.Request)

+ 1 - 0
tstest/integration/tailscaled_deps_test_darwin.go

@@ -30,6 +30,7 @@ import (
 	_ "tailscale.com/types/flagtype"
 	_ "tailscale.com/types/key"
 	_ "tailscale.com/types/logger"
+	_ "tailscale.com/util/clientmetric"
 	_ "tailscale.com/util/multierr"
 	_ "tailscale.com/util/osshare"
 	_ "tailscale.com/version"

+ 1 - 0
tstest/integration/tailscaled_deps_test_freebsd.go

@@ -30,6 +30,7 @@ import (
 	_ "tailscale.com/types/flagtype"
 	_ "tailscale.com/types/key"
 	_ "tailscale.com/types/logger"
+	_ "tailscale.com/util/clientmetric"
 	_ "tailscale.com/util/multierr"
 	_ "tailscale.com/util/osshare"
 	_ "tailscale.com/version"

+ 1 - 0
tstest/integration/tailscaled_deps_test_linux.go

@@ -30,6 +30,7 @@ import (
 	_ "tailscale.com/types/flagtype"
 	_ "tailscale.com/types/key"
 	_ "tailscale.com/types/logger"
+	_ "tailscale.com/util/clientmetric"
 	_ "tailscale.com/util/multierr"
 	_ "tailscale.com/util/osshare"
 	_ "tailscale.com/version"

+ 1 - 0
tstest/integration/tailscaled_deps_test_openbsd.go

@@ -30,6 +30,7 @@ import (
 	_ "tailscale.com/types/flagtype"
 	_ "tailscale.com/types/key"
 	_ "tailscale.com/types/logger"
+	_ "tailscale.com/util/clientmetric"
 	_ "tailscale.com/util/multierr"
 	_ "tailscale.com/util/osshare"
 	_ "tailscale.com/version"

+ 1 - 0
tstest/integration/tailscaled_deps_test_windows.go

@@ -34,6 +34,7 @@ import (
 	_ "tailscale.com/types/flagtype"
 	_ "tailscale.com/types/key"
 	_ "tailscale.com/types/logger"
+	_ "tailscale.com/util/clientmetric"
 	_ "tailscale.com/util/multierr"
 	_ "tailscale.com/util/osshare"
 	_ "tailscale.com/util/winutil"

+ 135 - 0
util/clientmetric/clientmetric.go

@@ -0,0 +1,135 @@
+// Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package clientmetric provides client-side metrics whose values
+// get occasionally logged.
+package clientmetric
+
+import (
+	"fmt"
+	"io"
+	"sort"
+	"sync"
+	"sync/atomic"
+)
+
+var (
+	mu          sync.Mutex
+	metrics     = map[string]*Metric{}
+	sortedDirty bool
+	sorted      []*Metric
+)
+
+// Type is a metric type: counter or gauge.
+type Type uint8
+
+const (
+	TypeGauge Type = iota
+	TypeCounter
+)
+
+// Metric is an integer metric value that's tracked over time.
+//
+// It's safe for concurrent use.
+type Metric struct {
+	v    int64 // atomic; the metric value
+	name string
+
+	lastLogv int64 // v atomic, epoch seconds
+	lastLog  int64 // atomic, epoch seconds
+	logSec   int   // log every N seconds max
+	typ      Type
+}
+
+func (m *Metric) Name() string { return m.name }
+func (m *Metric) Value() int64 { return atomic.LoadInt64(&m.v) }
+func (m *Metric) Type() Type   { return m.typ }
+
+// Add increments m's value by n.
+//
+// If m is of type counter, n should not be negative.
+func (m *Metric) Add(n int64) {
+	atomic.AddInt64(&m.v, n)
+}
+
+// Set sets m's value to v.
+//
+// If m is of type counter, Set should not be used.
+func (m *Metric) Set(v int64) {
+	atomic.StoreInt64(&m.v, v)
+}
+
+// Publish registers a metric in the global map.
+// It panics if the name is a duplicate anywhere in the process.
+func (m *Metric) Publish() {
+	mu.Lock()
+	defer mu.Unlock()
+	if m.name == "" {
+		panic("unnamed Metric")
+	}
+	if _, dup := metrics[m.name]; dup {
+		panic("duplicate metric " + m.name)
+	}
+	metrics[m.name] = m
+	sortedDirty = true
+}
+
+// Metrics returns the sorted list of metrics.
+//
+// The returned slice should not be mutated.
+func Metrics() []*Metric {
+	mu.Lock()
+	defer mu.Unlock()
+	if sortedDirty {
+		sortedDirty = false
+		sorted = make([]*Metric, 0, len(metrics))
+		for _, m := range metrics {
+			sorted = append(sorted, m)
+		}
+		sort.Slice(sorted, func(i, j int) bool {
+			return sorted[i].name < sorted[j].name
+		})
+	}
+	return sorted
+}
+
+// NewUnpublished initializes a new Metric without calling Publish on
+// it.
+func NewUnpublished(name string, typ Type) *Metric {
+	return &Metric{
+		name:   name,
+		typ:    typ,
+		logSec: 10,
+	}
+}
+
+// NewCounter returns a new metric that can only increment.
+func NewCounter(name string) *Metric {
+	m := NewUnpublished(name, TypeCounter)
+	m.Publish()
+	return m
+}
+
+// NewGauge returns a new metric that can both increment and decrement.
+func NewGauge(name string) *Metric {
+	m := NewUnpublished(name, TypeGauge)
+	m.Publish()
+	return m
+}
+
+// WritePrometheusExpositionFormat writes all client metrics to w in
+// the Prometheus text-based exposition format.
+//
+// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md
+func WritePrometheusExpositionFormat(w io.Writer) {
+	for _, m := range Metrics() {
+		switch m.Type() {
+		case TypeGauge:
+			fmt.Fprintf(w, "# TYPE %s gauge\n", m.Name())
+		case TypeCounter:
+			fmt.Fprintf(w, "# TYPE %s counter\n", m.Name())
+		}
+		fmt.Fprintf(w, "%s %v\n", m.Name(), m.Value())
+	}
+}

+ 86 - 4
wgengine/magicsock/magicsock.go

@@ -51,6 +51,7 @@ import (
 	"tailscale.com/types/logger"
 	"tailscale.com/types/netmap"
 	"tailscale.com/types/nettype"
+	"tailscale.com/util/clientmetric"
 	"tailscale.com/util/uniq"
 	"tailscale.com/version"
 	"tailscale.com/wgengine/monitor"
@@ -1167,7 +1168,9 @@ var errNetworkDown = errors.New("magicsock: network down")
 func (c *Conn) networkDown() bool { return !c.networkUp.Get() }
 
 func (c *Conn) Send(b []byte, ep conn.Endpoint) error {
+	metricSendData.Add(1)
 	if c.networkDown() {
+		metricSendDataNetworkDown.Add(1)
 		return errNetworkDown
 	}
 	return ep.(*endpoint).send(b)
@@ -1191,9 +1194,14 @@ func (c *Conn) sendUDP(ipp netaddr.IPPort, b []byte) (sent bool, err error) {
 	}
 	ua := udpAddrPool.Get().(*net.UDPAddr)
 	sent, err = c.sendUDPStd(ipp.UDPAddrAt(ua), b)
-	if err == nil {
+	if err != nil {
+		metricSendUDPError.Add(1)
+	} else {
 		// Only return it to the pool on success; Issue 3122.
 		udpAddrPool.Put(ua)
+		if sent {
+			metricSendUDP.Add(1)
+		}
 	}
 	return
 }
@@ -1239,6 +1247,7 @@ func (c *Conn) sendAddr(addr netaddr.IPPort, pubKey key.NodePublic, b []byte) (s
 
 	ch := c.derpWriteChanOfAddr(addr, pubKey)
 	if ch == nil {
+		metricSendDERPErrorChan.Add(1)
 		return false, nil
 	}
 
@@ -1252,10 +1261,13 @@ func (c *Conn) sendAddr(addr netaddr.IPPort, pubKey key.NodePublic, b []byte) (s
 
 	select {
 	case <-c.donec:
+		metricSendDERPErrorClosed.Add(1)
 		return false, errConnClosed
 	case ch <- derpWriteRequest{addr, pubKey, pkt}:
+		metricSendDERPQueued.Add(1)
 		return true, nil
 	default:
+		metricSendDERPErrorQueue.Add(1)
 		// Too many writes queued. Drop packet.
 		return false, errDropDerpPacket
 	}
@@ -1367,6 +1379,7 @@ func (c *Conn) derpWriteChanOfAddr(addr netaddr.IPPort, peer key.NodePublic) cha
 	*ad.lastWrite = time.Now()
 	ad.createTime = time.Now()
 	c.activeDerp[regionID] = ad
+	metricNumDERPConns.Set(int64(len(c.activeDerp)))
 	c.logActiveDerpLocked()
 	c.setPeerLastDerpLocked(peer, regionID, regionID)
 	c.scheduleCleanStaleDerpLocked()
@@ -1618,6 +1631,7 @@ func (c *Conn) receiveIPv6(b []byte) (int, conn.Endpoint, error) {
 			return 0, nil, err
 		}
 		if ep, ok := c.receiveIP(b[:n], ipp, &c.ippEndpoint6); ok {
+			metricRecvDataIPv6.Add(1)
 			return n, ep, nil
 		}
 	}
@@ -1633,6 +1647,7 @@ func (c *Conn) receiveIPv4(b []byte) (n int, ep conn.Endpoint, err error) {
 			return 0, nil, err
 		}
 		if ep, ok := c.receiveIP(b[:n], ipp, &c.ippEndpoint4); ok {
+			metricRecvDataIPv4.Add(1)
 			return n, ep, nil
 		}
 	}
@@ -1691,6 +1706,7 @@ func (c *connBind) receiveDERP(b []byte) (n int, ep conn.Endpoint, err error) {
 			// No data read occurred. Wait for another packet.
 			continue
 		}
+		metricRecvDataDERP.Add(1)
 		return n, ep, nil
 	}
 	return 0, nil, net.ErrClosed
@@ -1762,6 +1778,13 @@ func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey key.NodePublic, dstDi
 	di := c.discoInfoLocked(dstDisco)
 	c.mu.Unlock()
 
+	isDERP := dst.IP() == derpMagicIPAddr
+	if isDERP {
+		metricSendDiscoDERP.Add(1)
+	} else {
+		metricSendDiscoUDP.Add(1)
+	}
+
 	box := di.sharedKey.Seal(m.AppendMarshal(nil))
 	pkt = append(pkt, box...)
 	sent, err = c.sendAddr(dst, dstKey, pkt)
@@ -1773,6 +1796,11 @@ func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey key.NodePublic, dstDi
 			}
 			c.logf("[v1] magicsock: disco: %v->%v (%v, %v) sent %v", c.discoShort, dstDisco.ShortString(), node, derpStr(dst.String()), disco.MessageSummary(m))
 		}
+		if isDERP {
+			metricSentDiscoDERP.Add(1)
+		} else {
+			metricSentDiscoUDP.Add(1)
+		}
 	} else if err == nil {
 		// Can't send. (e.g. no IPv6 locally)
 	} else {
@@ -1833,6 +1861,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort, derpNodeSrc ke
 	}
 
 	if !c.peerMap.anyEndpointForDiscoKey(sender) {
+		metricRecvDiscoBadPeer.Add(1)
 		if debugDisco {
 			c.logf("magicsock: disco: ignoring disco-looking frame, don't know endpoint for %v", sender.ShortString())
 		}
@@ -1860,7 +1889,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort, derpNodeSrc ke
 		if debugDisco {
 			c.logf("magicsock: disco: failed to open naclbox from %v (wrong rcpt?)", sender)
 		}
-		// TODO(bradfitz): add some counter for this that logs rarely
+		metricRecvDiscoBadKey.Add(1)
 		return
 	}
 
@@ -1874,14 +1903,23 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort, derpNodeSrc ke
 		// newer version of Tailscale that we don't
 		// understand. Not even worth logging about, lest it
 		// be too spammy for old clients.
-		// TODO(bradfitz): add some counter for this that logs rarely
+		metricRecvDiscoBadParse.Add(1)
 		return
 	}
 
+	isDERP := src.IP() == derpMagicIPAddr
+	if isDERP {
+		metricRecvDiscoDERP.Add(1)
+	} else {
+		metricRecvDiscoUDP.Add(1)
+	}
+
 	switch dm := dm.(type) {
 	case *disco.Ping:
+		metricRecvDiscoPing.Add(1)
 		c.handlePingLocked(dm, src, di, derpNodeSrc)
 	case *disco.Pong:
+		metricRecvDiscoPong.Add(1)
 		// There might be multiple nodes for the sender's DiscoKey.
 		// Ask each to handle it, stopping once one reports that
 		// the Pong's TxID was theirs.
@@ -1892,7 +1930,8 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort, derpNodeSrc ke
 			}
 		})
 	case *disco.CallMeMaybe:
-		if src.IP() != derpMagicIPAddr || derpNodeSrc.IsZero() {
+		metricRecvDiscoCallMeMaybe.Add(1)
+		if !isDERP || derpNodeSrc.IsZero() {
 			// CallMeMaybe messages should only come via DERP.
 			c.logf("[unexpected] CallMeMaybe packets should only come via DERP")
 			return
@@ -1900,6 +1939,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort, derpNodeSrc ke
 		nodeKey := derpNodeSrc
 		ep, ok := c.peerMap.endpointForNodeKey(nodeKey)
 		if !ok {
+			metricRecvDiscoCallMeMaybeBadNode.Add(1)
 			c.logf("magicsock: disco: ignoring CallMeMaybe from %v; %v is unknown", sender.ShortString(), derpNodeSrc.ShortString())
 			return
 		}
@@ -1907,6 +1947,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort, derpNodeSrc ke
 			return
 		}
 		if ep.discoKey != di.discoKey {
+			metricRecvDiscoCallMeMaybeBadDisco.Add(1)
 			c.logf("[unexpected] CallMeMaybe from peer via DERP whose netmap discokey != disco source")
 			return
 		}
@@ -2244,6 +2285,8 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) {
 		}
 	}
 
+	metricNumPeers.Set(int64(len(nm.Peers)))
+
 	c.logf("[v1] magicsock: got updated network map; %d peers", len(nm.Peers))
 	if numNoDisco != 0 {
 		c.logf("[v1] magicsock: %d DERP-only peers (no discokey)", numNoDisco)
@@ -2347,6 +2390,7 @@ func (c *Conn) closeDerpLocked(node int, why string) {
 		go ad.c.Close()
 		ad.cancel()
 		delete(c.activeDerp, node)
+		metricNumDERPConns.Set(int64(len(c.activeDerp)))
 	}
 }
 
@@ -3998,3 +4042,41 @@ func (di *discoInfo) setNodeKey(nk key.NodePublic) {
 	di.lastNodeKey = nk
 	di.lastNodeKeyTime = time.Now()
 }
+
+var (
+	metricNumPeers     = clientmetric.NewGauge("magicsock_netmap_num_peers")
+	metricNumDERPConns = clientmetric.NewGauge("magicsock_num_derp_conns")
+
+	// Sends (data or disco)
+	metricSendDERPQueued      = clientmetric.NewCounter("magicsock_send_derp_queued")
+	metricSendDERPErrorChan   = clientmetric.NewCounter("magicsock_send_derp_error_chan")
+	metricSendDERPErrorClosed = clientmetric.NewCounter("magicsock_send_derp_error_closed")
+	metricSendDERPErrorQueue  = clientmetric.NewCounter("magicsock_send_derp_error_queue")
+	metricSendUDP             = clientmetric.NewCounter("magicsock_send_udp")
+	metricSendUDPError        = clientmetric.NewCounter("magicsock_send_udp_error")
+
+	// Data packets (non-disco)
+	metricSendData            = clientmetric.NewCounter("magicsock_send_data")
+	metricSendDataNetworkDown = clientmetric.NewCounter("magicsock_send_data_network_down")
+	metricRecvData            = clientmetric.NewCounter("magicsock_recv_data")
+	metricRecvDataDERP        = clientmetric.NewCounter("magicsock_recv_data_derp")
+	metricRecvDataIPv4        = clientmetric.NewCounter("magicsock_recv_data_ipv4")
+	metricRecvDataIPv6        = clientmetric.NewCounter("magicsock_recv_data_ipv6")
+
+	// Disco packets
+	metricSendDiscoUDP      = clientmetric.NewCounter("magicsock_disco_send_udp")
+	metricSendDiscoDERP     = clientmetric.NewCounter("magicsock_disco_send_derp")
+	metricSentDiscoUDP      = clientmetric.NewCounter("magicsock_disco_sent_udp")
+	metricSentDiscoDERP     = clientmetric.NewCounter("magicsock_disco_sent_derp")
+	metricRecvDiscoBadPeer  = clientmetric.NewCounter("magicsock_disco_recv_bad_peer")
+	metricRecvDiscoBadKey   = clientmetric.NewCounter("magicsock_disco_recv_bad_key")
+	metricRecvDiscoBadParse = clientmetric.NewCounter("magicsock_disco_recv_bad_parse")
+
+	metricRecvDiscoUDP                 = clientmetric.NewCounter("magicsock_disco_recv_udp")
+	metricRecvDiscoDERP                = clientmetric.NewCounter("magicsock_disco_recv_derp")
+	metricRecvDiscoPing                = clientmetric.NewCounter("magicsock_disco_recv_ping")
+	metricRecvDiscoPong                = clientmetric.NewCounter("magicsock_disco_recv_pong")
+	metricRecvDiscoCallMeMaybe         = clientmetric.NewCounter("magicsock_disco_recv_callmemaybe")
+	metricRecvDiscoCallMeMaybeBadNode  = clientmetric.NewCounter("magicsock_disco_recv_callmemaybe_bad_node")
+	metricRecvDiscoCallMeMaybeBadDisco = clientmetric.NewCounter("magicsock_disco_recv_callmemaybe_bad_disco")
+)