Просмотр исходного кода

derp: add debug traffic handler

This adds a handler on the DERP server for logging bytes send and received by clients of the
server, by holding open a connection and recording if there is a difference between the number
of bytes sent and received. It sends a JSON marshalled object if there is an increase in the
number of bytes.

Signed-off-by: julianknodt <[email protected]>
julianknodt 4 лет назад
Родитель
Сommit
3728634af9
4 измененных файлов с 147 добавлено и 25 удалено
  1. 1 0
      cmd/derper/derper.go
  2. 127 25
      derp/derp_server.go
  3. 11 0
      derp/derp_test.go
  4. 8 0
      derp/testdata/example_ss.txt

+ 1 - 0
cmd/derper/derper.go

@@ -171,6 +171,7 @@ func main() {
 			io.WriteString(w, "derp.Server ConsistencyCheck okay")
 		}
 	}))
+	debug.Handle("traffic", "Traffic check", http.HandlerFunc(s.ServeDebugTraffic))
 
 	if *runSTUN {
 		go serveSTUN()

+ 127 - 25
derp/derp_server.go

@@ -23,7 +23,9 @@ import (
 	"math"
 	"math/big"
 	"math/rand"
+	"net/http"
 	"os"
+	"os/exec"
 	"runtime"
 	"strconv"
 	"strings"
@@ -34,6 +36,7 @@ import (
 	"go4.org/mem"
 	"golang.org/x/crypto/nacl/box"
 	"golang.org/x/sync/errgroup"
+	"inet.af/netaddr"
 	"tailscale.com/disco"
 	"tailscale.com/metrics"
 	"tailscale.com/types/key"
@@ -141,6 +144,9 @@ type Server struct {
 	// because it includes intra-region forwarded packets as the
 	// src.
 	sentTo map[key.Public]map[key.Public]int64 // src => dst => dst's latest sclient.connNum
+
+	// maps from netaddr.IPPort to a client's public key
+	keyOfAddr map[netaddr.IPPort]key.Public
 }
 
 // PacketForwarder is something that can forward packets.
@@ -186,6 +192,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
 		watchers:             map[*sclient]bool{},
 		sentTo:               map[key.Public]map[key.Public]int64{},
 		avgQueueDuration:     new(uint64),
+		keyOfAddr:            map[netaddr.IPPort]key.Public{},
 	}
 	s.initMetacert()
 	s.packetsRecvDisco = s.packetsRecvByKind.Get("disco")
@@ -343,6 +350,7 @@ func (s *Server) registerClient(c *sclient) {
 	if _, ok := s.clientsMesh[c.key]; !ok {
 		s.clientsMesh[c.key] = nil // just for varz of total users in cluster
 	}
+	s.keyOfAddr[c.remoteIPPort] = c.key
 	s.curClients.Add(1)
 	s.broadcastPeerStateChangeLocked(c.key, true)
 }
@@ -377,6 +385,8 @@ func (s *Server) unregisterClient(c *sclient) {
 		delete(s.watchers, c)
 	}
 
+	delete(s.keyOfAddr, c.remoteIPPort)
+
 	s.curClients.Add(-1)
 	if c.preferred {
 		s.curHomeClients.Add(-1)
@@ -450,20 +460,23 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
+	remoteIPPort, _ := netaddr.ParseIPPort(remoteAddr)
+
 	c := &sclient{
-		connNum:     connNum,
-		s:           s,
-		key:         clientKey,
-		nc:          nc,
-		br:          br,
-		bw:          bw,
-		logf:        logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", remoteAddr, clientKey)),
-		done:        ctx.Done(),
-		remoteAddr:  remoteAddr,
-		connectedAt: time.Now(),
-		sendQueue:   make(chan pkt, perClientSendQueueDepth),
-		peerGone:    make(chan key.Public),
-		canMesh:     clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
+		connNum:      connNum,
+		s:            s,
+		key:          clientKey,
+		nc:           nc,
+		br:           br,
+		bw:           bw,
+		logf:         logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", remoteAddr, clientKey)),
+		done:         ctx.Done(),
+		remoteAddr:   remoteAddr,
+		remoteIPPort: remoteIPPort,
+		connectedAt:  time.Now(),
+		sendQueue:    make(chan pkt, perClientSendQueueDepth),
+		peerGone:     make(chan key.Public),
+		canMesh:      clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
 	}
 	if c.canMesh {
 		c.meshUpdate = make(chan struct{})
@@ -892,18 +905,19 @@ func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, d
 // (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
 type sclient struct {
 	// Static after construction.
-	connNum    int64 // process-wide unique counter, incremented each Accept
-	s          *Server
-	nc         Conn
-	key        key.Public
-	info       clientInfo
-	logf       logger.Logf
-	done       <-chan struct{} // closed when connection closes
-	remoteAddr string          // usually ip:port from net.Conn.RemoteAddr().String()
-	sendQueue  chan pkt        // packets queued to this client; never closed
-	peerGone   chan key.Public // write request that a previous sender has disconnected (not used by mesh peers)
-	meshUpdate chan struct{}   // write request to write peerStateChange
-	canMesh    bool            // clientInfo had correct mesh token for inter-region routing
+	connNum      int64 // process-wide unique counter, incremented each Accept
+	s            *Server
+	nc           Conn
+	key          key.Public
+	info         clientInfo
+	logf         logger.Logf
+	done         <-chan struct{} // closed when connection closes
+	remoteAddr   string          // usually ip:port from net.Conn.RemoteAddr().String()
+	remoteIPPort netaddr.IPPort  // zero if remoteAddr is not ip:port.
+	sendQueue    chan pkt        // packets queued to this client; never closed
+	peerGone     chan key.Public // write request that a previous sender has disconnected (not used by mesh peers)
+	meshUpdate   chan struct{}   // write request to write peerStateChange
+	canMesh      bool            // clientInfo had correct mesh token for inter-region routing
 
 	// Owned by run, not thread-safe.
 	br          *bufio.Reader
@@ -1398,3 +1412,91 @@ func writePublicKey(bw *bufio.Writer, key *key.Public) error {
 	}
 	return nil
 }
+
+const minTimeBetweenLogs = 2 * time.Second
+
+// BytesSentRecv records the number of bytes that have been sent since the last traffic check
+// for a given process, as well as the public key of the process sending those bytes.
+type BytesSentRecv struct {
+	Sent uint64
+	Recv uint64
+	// Key is the public key of the client which sent/received these bytes.
+	Key key.Public
+}
+
+// parseSSOutput parses the output from the specific call to ss in ServeDebugTraffic.
+// Separated out for ease of testing.
+func parseSSOutput(raw string) map[netaddr.IPPort]BytesSentRecv {
+	newState := map[netaddr.IPPort]BytesSentRecv{}
+	// parse every 2 lines and get src and dst ips, and kv pairs
+	lines := strings.Split(raw, "\n")
+	for i := 0; i < len(lines); i += 2 {
+		ipInfo := strings.Fields(strings.TrimSpace(lines[i]))
+		if len(ipInfo) < 5 {
+			continue
+		}
+		src, err := netaddr.ParseIPPort(ipInfo[3])
+		if err != nil {
+			continue
+		}
+		/*
+		   TODO(jknodt) do we care about the full route or just the src?
+		   dst, err := netaddr.ParseIPPort(string(ipInfo[4]))
+		   if err != nil {
+		     continue
+		   }
+		*/
+		stats := strings.Fields(strings.TrimSpace(lines[i+1]))
+		stat := BytesSentRecv{}
+		for _, s := range stats {
+			if strings.Contains(s, "bytes_sent") {
+				sent, err := strconv.Atoi(s[strings.Index(s, ":")+1:])
+				if err == nil {
+					stat.Sent = uint64(sent)
+				}
+			} else if strings.Contains(s, "bytes_received") {
+				recv, err := strconv.Atoi(s[strings.Index(s, ":")+1:])
+				if err == nil {
+					stat.Recv = uint64(recv)
+				}
+			}
+		}
+		newState[src] = stat
+	}
+	return newState
+}
+
+func (s *Server) ServeDebugTraffic(w http.ResponseWriter, r *http.Request) {
+	prevState := map[netaddr.IPPort]BytesSentRecv{}
+	enc := json.NewEncoder(w)
+	for r.Context().Err() == nil {
+		output, err := exec.Command("ss", "-i", "-H", "-t").Output()
+		if err != nil {
+			fmt.Fprintf(w, "ss failed: %v", err)
+			return
+		}
+		newState := parseSSOutput(string(output))
+		s.mu.Lock()
+		for k, next := range newState {
+			prev := prevState[k]
+			if prev.Sent < next.Sent || prev.Recv < next.Recv {
+				if pkey, ok := s.keyOfAddr[k]; ok {
+					next.Key = pkey
+					if err := enc.Encode(next); err != nil {
+						s.mu.Unlock()
+						return
+					}
+				}
+			}
+		}
+		s.mu.Unlock()
+		prevState = newState
+		if _, err := fmt.Fprintln(w); err != nil {
+			return
+		}
+		if f, ok := w.(http.Flusher); ok {
+			f.Flush()
+		}
+		time.Sleep(minTimeBetweenLogs)
+	}
+}

+ 11 - 0
derp/derp_test.go

@@ -948,3 +948,14 @@ func waitConnect(t testing.TB, c *Client) {
 		t.Fatalf("client first Recv was unexpected type %T", v)
 	}
 }
+
+func TestParseSSOutput(t *testing.T) {
+	contents, err := ioutil.ReadFile("testdata/example_ss.txt")
+	if err != nil {
+		t.Errorf("ioutil.Readfile(example_ss.txt) failed: %v", err)
+	}
+	seen := parseSSOutput(string(contents))
+	if len(seen) == 0 {
+		t.Errorf("parseSSOutput expected non-empty map")
+	}
+}

+ 8 - 0
derp/testdata/example_ss.txt

@@ -0,0 +1,8 @@
+ESTAB 0      0       10.255.1.11:35238  34.210.105.16:https
+         cubic wscale:7,7 rto:236 rtt:34.14/3.432 ato:40 mss:1448 pmtu:1500 rcvmss:1448 advmss:1448 cwnd:8 ssthresh:6 bytes_sent:38056577 bytes_retrans:2918 bytes_acked:38053660 bytes_received:6973211 segs_out:165090 segs_in:124227 data_segs_out:78018 data_segs_in:71645 send 2.71Mbps lastsnd:1156 lastrcv:1120 lastack:1120 pacing_rate 3.26Mbps delivery_rate 2.35Mbps delivered:78017 app_limited busy:2586132ms retrans:0/6 dsack_dups:4 reordering:5 reord_seen:15 rcv_rtt:126355 rcv_space:65780 rcv_ssthresh:541928 minrtt:26.632
+ESTAB 0      80     100.79.58.14:ssh    100.95.73.104:58145
+         cubic wscale:6,7 rto:224 rtt:23.051/2.03 ato:172 mss:1228 pmtu:1280 rcvmss:1228 advmss:1228 cwnd:10 ssthresh:94 bytes_sent:1591815 bytes_retrans:944 bytes_acked:1590791 bytes_received:158925 segs_out:8070 segs_in:8858 data_segs_out:7452 data_segs_in:3789 send 4.26Mbps lastsnd:4 lastrcv:4 lastack:4 pacing_rate 8.52Mbps delivery_rate 10.9Mbps delivered:7451 app_limited busy:61656ms unacked:2 retrans:0/10 dsack_dups:10 rcv_rtt:174712 rcv_space:65025 rcv_ssthresh:64296 minrtt:16.186                                   
+ESTAB 0      374     10.255.1.11:43254 167.172.206.31:https
+         cubic wscale:7,7 rto:224 rtt:22.55/1.941 ato:40 mss:1448 pmtu:1500 rcvmss:1448 advmss:1448 cwnd:6 ssthresh:4 bytes_sent:14594668 bytes_retrans:173314 bytes_acked:14420981 bytes_received:4207111 segs_out:80566 segs_in:70310 data_segs_out:24317 data_segs_in:20365 send 3.08Mbps lastsnd:4 lastrcv:4 lastack:4 pacing_rate 3.7Mbps delivery_rate 3.05Mbps delivered:24111 app_limited busy:184820ms unacked:2 retrans:0/185 dsack_dups:1 reord_seen:3 rcv_rtt:651.262 rcv_space:226657 rcv_ssthresh:1557136 minrtt:10.18           
+ESTAB 0      0       10.255.1.11:33036    3.121.18.47:https
+         cubic wscale:7,7 rto:372 rtt:168.408/2.044 ato:40 mss:1448 pmtu:1500 rcvmss:1448 advmss:1448 cwnd:10 bytes_sent:27500 bytes_acked:27501 bytes_received:1386524 segs_out:10990 segs_in:11037 data_segs_out:303 data_segs_in:3414 send 688kbps lastsnd:125776 lastrcv:9640 lastack:22760 pacing_rate 1.38Mbps delivery_rate 482kbps delivered:304 app_limited busy:43024ms rcv_rtt:3345.12 rcv_space:62431 rcv_ssthresh:760472 minrtt:168.867