Browse Source

cmd/netlogfmt: handle any stream of network logs (#6108)

Make netlogfmt useful regardless of the exact schema of the input.
If a JSON object looks like a network log message,
then unmarshal it as one and then print it.
This allows netlogfmt to support both a stream of JSON objects
directly serialized from netlogtype.Message, or the schema
returned by the /api/v2/tailnet/{{tailnet}}/network-logs API endpoint.

Signed-off-by: Joe Tsai <[email protected]>
Joe Tsai 3 years ago
parent
commit
b2035a1dca
3 changed files with 215 additions and 127 deletions
  1. 209 127
      cmd/netlogfmt/main.go
  2. 2 0
      go.mod
  3. 4 0
      go.sum

+ 209 - 127
cmd/netlogfmt/main.go

@@ -4,13 +4,16 @@
 
 // netlogfmt parses a stream of JSON log messages from stdin and
 // formats the network traffic logs produced by "tailscale.com/wgengine/netlog"
+// according to the schema in "tailscale.com/types/netlogtype.Message"
 // in a more humanly readable format.
 //
 // Example usage:
 //
-//	$ cat netlog.json | netlogfmt
+//	$ cat netlog.json | go run tailscale.com/cmd/netlogfmt
 //	=========================================================================================
-//	Time: 2022-10-13T20:23:09.644Z (5s)
+//	NodeID: n123456CNTRL
+//	Logged: 2022-10-13T20:23:10.165Z
+//	Window: 2022-10-13T20:23:09.644Z (5s)
 //	---------------------------------------------------  Tx[P/s]  Tx[B/s]  Rx[P/s]    Rx[B/s]
 //	VirtualTraffic:                                       16.80    1.64Ki   11.20      1.03Ki
 //	    TCP:    100.109.51.95:22 -> 100.85.80.41:42912    16.00    1.59Ki   10.40   1008.84
@@ -37,8 +40,11 @@ import (
 	"strings"
 	"time"
 
+	"github.com/dsnet/try"
+	jsonv2 "github.com/go-json-experiment/json"
 	"golang.org/x/exp/maps"
 	"golang.org/x/exp/slices"
+	"tailscale.com/logtail"
 	"tailscale.com/types/netlogtype"
 	"tailscale.com/util/must"
 )
@@ -49,156 +55,232 @@ var (
 	tailnetName  = flag.String("tailnet-name", "", "tailnet domain name to lookup devices in; see https://login.tailscale.com/admin/settings/general")
 )
 
+var namesByAddr map[netip.Addr]string
+
 func main() {
 	flag.Parse()
+	if *resolveNames {
+		namesByAddr = mustMakeNamesByAddr()
+	}
 
-	namesByAddr := mustMakeNamesByAddr()
-	dec := json.NewDecoder(os.Stdin)
-	for {
-		// Unmarshal the log message containing network traffics.
-		var msg struct {
-			Logtail struct {
-				ID string `json:"id"`
-			} `json:"logtail"`
-			netlogtype.Message
+	// The logic handles a stream of arbitrary JSON.
+	// So long as a JSON object seems like a network log message,
+	// then this will unmarshal and print it.
+	if err := processStream(os.Stdin); err != nil {
+		if err == io.EOF {
+			return
 		}
-		if err := dec.Decode(&msg); err != nil {
-			if err == io.EOF {
-				break
+		log.Fatalf("processStream: %v", err)
+	}
+}
+
+func processStream(r io.Reader) (err error) {
+	defer try.Handle(&err)
+	dec := jsonv2.NewDecoder(os.Stdin)
+	for {
+		processValue(dec)
+	}
+}
+
+func processValue(dec *jsonv2.Decoder) {
+	switch dec.PeekKind() {
+	case '[':
+		processArray(dec)
+	case '{':
+		processObject(dec)
+	default:
+		try.E(dec.SkipValue())
+	}
+}
+
+func processArray(dec *jsonv2.Decoder) {
+	try.E1(dec.ReadToken()) // parse '['
+	for dec.PeekKind() != ']' {
+		processValue(dec)
+	}
+	try.E1(dec.ReadToken()) // parse ']'
+}
+
+func processObject(dec *jsonv2.Decoder) {
+	var hasTraffic bool
+	var rawMsg []byte
+	try.E1(dec.ReadToken()) // parse '{'
+	for dec.PeekKind() != '}' {
+		// Capture any members that could belong to a network log message.
+		switch name := try.E1(dec.ReadToken()); name.String() {
+		case "virtualTraffic", "subnetTraffic", "exitTraffic", "physicalTraffic":
+			hasTraffic = true
+			fallthrough
+		case "logtail", "nodeId", "logged", "start", "end":
+			if len(rawMsg) == 0 {
+				rawMsg = append(rawMsg, '{')
+			} else {
+				rawMsg = append(rawMsg[:len(rawMsg)-1], ',')
 			}
-			log.Fatalf("UnmarshalNext: %v", err)
-		}
-		if len(msg.VirtualTraffic)+len(msg.SubnetTraffic)+len(msg.ExitTraffic)+len(msg.PhysicalTraffic) == 0 {
-			continue // nothing to print
+			rawMsg = append(append(append(rawMsg, '"'), name.String()...), '"')
+			rawMsg = append(rawMsg, ':')
+			rawMsg = append(rawMsg, try.E1(dec.ReadValue())...)
+			rawMsg = append(rawMsg, '}')
+		default:
+			processValue(dec)
 		}
+	}
+	try.E1(dec.ReadToken()) // parse '}'
 
-		// Construct a table of network traffic per connection.
-		rows := [][7]string{{3: "Tx[P/s]", 4: "Tx[B/s]", 5: "Rx[P/s]", 6: "Rx[B/s]"}}
-		duration := msg.End.Sub(msg.Start)
-		addRows := func(heading string, traffic []netlogtype.ConnectionCounts) {
-			if len(traffic) == 0 {
-				return
-			}
-			slices.SortFunc(traffic, func(x, y netlogtype.ConnectionCounts) bool {
-				nx := x.TxPackets + x.TxBytes + x.RxPackets + x.RxBytes
-				ny := y.TxPackets + y.TxBytes + y.RxPackets + y.RxBytes
-				return nx > ny
-			})
-			var sum netlogtype.Counts
-			for _, cc := range traffic {
-				sum = sum.Add(cc.Counts)
-			}
-			rows = append(rows, [7]string{
-				0: heading + ":",
-				3: formatSI(float64(sum.TxPackets) / duration.Seconds()),
-				4: formatIEC(float64(sum.TxBytes) / duration.Seconds()),
-				5: formatSI(float64(sum.RxPackets) / duration.Seconds()),
-				6: formatIEC(float64(sum.RxBytes) / duration.Seconds()),
-			})
-			if len(traffic) == 1 && traffic[0].Connection.IsZero() {
-				return // this is already a summary counts
+	// If this appears to be a network log message, then unmarshal and print it.
+	if hasTraffic {
+		var msg message
+		try.E(jsonv2.Unmarshal(rawMsg, &msg))
+		printMessage(msg)
+	}
+}
+
+type message struct {
+	Logtail struct {
+		ID     logtail.PublicID `json:"id"`
+		Logged time.Time        `json:"server_time"`
+	} `json:"logtail"`
+	Logged time.Time `json:"logged"`
+	netlogtype.Message
+}
+
+func printMessage(msg message) {
+	// Construct a table of network traffic per connection.
+	rows := [][7]string{{3: "Tx[P/s]", 4: "Tx[B/s]", 5: "Rx[P/s]", 6: "Rx[B/s]"}}
+	duration := msg.End.Sub(msg.Start)
+	addRows := func(heading string, traffic []netlogtype.ConnectionCounts) {
+		if len(traffic) == 0 {
+			return
+		}
+		slices.SortFunc(traffic, func(x, y netlogtype.ConnectionCounts) bool {
+			nx := x.TxPackets + x.TxBytes + x.RxPackets + x.RxBytes
+			ny := y.TxPackets + y.TxBytes + y.RxPackets + y.RxBytes
+			return nx > ny
+		})
+		var sum netlogtype.Counts
+		for _, cc := range traffic {
+			sum = sum.Add(cc.Counts)
+		}
+		rows = append(rows, [7]string{
+			0: heading + ":",
+			3: formatSI(float64(sum.TxPackets) / duration.Seconds()),
+			4: formatIEC(float64(sum.TxBytes) / duration.Seconds()),
+			5: formatSI(float64(sum.RxPackets) / duration.Seconds()),
+			6: formatIEC(float64(sum.RxBytes) / duration.Seconds()),
+		})
+		if len(traffic) == 1 && traffic[0].Connection.IsZero() {
+			return // this is already a summary counts
+		}
+		formatAddrPort := func(a netip.AddrPort) string {
+			if !a.IsValid() {
+				return ""
 			}
-			formatAddrPort := func(a netip.AddrPort) string {
-				if !a.IsValid() {
-					return ""
-				}
-				if name, ok := namesByAddr[a.Addr()]; ok {
-					if a.Port() == 0 {
-						return name
-					}
-					return name + ":" + strconv.Itoa(int(a.Port()))
-				}
+			if name, ok := namesByAddr[a.Addr()]; ok {
 				if a.Port() == 0 {
-					return a.Addr().String()
+					return name
 				}
-				return a.String()
+				return name + ":" + strconv.Itoa(int(a.Port()))
 			}
-			for _, cc := range traffic {
-				row := [7]string{
-					0: "    ",
-					1: formatAddrPort(cc.Src),
-					2: formatAddrPort(cc.Dst),
-					3: formatSI(float64(cc.TxPackets) / duration.Seconds()),
-					4: formatIEC(float64(cc.TxBytes) / duration.Seconds()),
-					5: formatSI(float64(cc.RxPackets) / duration.Seconds()),
-					6: formatIEC(float64(cc.RxBytes) / duration.Seconds()),
-				}
-				if cc.Proto > 0 {
-					row[0] += cc.Proto.String() + ":"
-				}
-				rows = append(rows, row)
+			if a.Port() == 0 {
+				return a.Addr().String()
 			}
+			return a.String()
 		}
-		addRows("VirtualTraffic", msg.VirtualTraffic)
-		addRows("SubnetTraffic", msg.SubnetTraffic)
-		addRows("ExitTraffic", msg.ExitTraffic)
-		addRows("PhysicalTraffic", msg.PhysicalTraffic)
-
-		// Compute the maximum width of each field.
-		var maxWidths [7]int
-		for _, row := range rows {
-			for i, col := range row {
-				if maxWidths[i] < len(col) && !(i == 0 && !strings.HasPrefix(col, " ")) {
-					maxWidths[i] = len(col)
-				}
+		for _, cc := range traffic {
+			row := [7]string{
+				0: "    ",
+				1: formatAddrPort(cc.Src),
+				2: formatAddrPort(cc.Dst),
+				3: formatSI(float64(cc.TxPackets) / duration.Seconds()),
+				4: formatIEC(float64(cc.TxBytes) / duration.Seconds()),
+				5: formatSI(float64(cc.RxPackets) / duration.Seconds()),
+				6: formatIEC(float64(cc.RxBytes) / duration.Seconds()),
 			}
+			if cc.Proto > 0 {
+				row[0] += cc.Proto.String() + ":"
+			}
+			rows = append(rows, row)
 		}
-		var maxSum int
-		for _, n := range maxWidths {
-			maxSum += n
-		}
+	}
+	addRows("VirtualTraffic", msg.VirtualTraffic)
+	addRows("SubnetTraffic", msg.SubnetTraffic)
+	addRows("ExitTraffic", msg.ExitTraffic)
+	addRows("PhysicalTraffic", msg.PhysicalTraffic)
 
-		// Output a table of network traffic per connection.
-		line := make([]byte, 0, maxSum+len(" ")+len(" -> ")+4*len("  "))
-		line = appendRepeatByte(line, '=', cap(line))
-		fmt.Println(string(line))
-		if msg.Logtail.ID != "" {
-			fmt.Printf("ID:   %s\n", msg.Logtail.ID)
+	// Compute the maximum width of each field.
+	var maxWidths [7]int
+	for _, row := range rows {
+		for i, col := range row {
+			if maxWidths[i] < len(col) && !(i == 0 && !strings.HasPrefix(col, " ")) {
+				maxWidths[i] = len(col)
+			}
 		}
-		fmt.Printf("Time: %s (%s)\n", msg.Start.Round(time.Millisecond).Format(time.RFC3339Nano), duration.Round(time.Millisecond))
-		for i, row := range rows {
-			line = line[:0]
-			isHeading := !strings.HasPrefix(row[0], " ")
-			for j, col := range row {
-				if isHeading && j == 0 {
-					col = "" // headings will be printed later
-				}
-				switch j {
-				case 0, 2: // left justified
-					line = append(line, col...)
-					line = appendRepeatByte(line, ' ', maxWidths[j]-len(col))
-				case 1, 3, 4, 5, 6: // right justified
-					line = appendRepeatByte(line, ' ', maxWidths[j]-len(col))
-					line = append(line, col...)
-				}
-				switch j {
-				case 0:
-					line = append(line, " "...)
-				case 1:
-					if row[1] == "" && row[2] == "" {
-						line = append(line, "    "...)
-					} else {
-						line = append(line, " -> "...)
-					}
-				case 2, 3, 4, 5:
-					line = append(line, "  "...)
-				}
+	}
+	var maxSum int
+	for _, n := range maxWidths {
+		maxSum += n
+	}
+
+	// Output a table of network traffic per connection.
+	line := make([]byte, 0, maxSum+len(" ")+len(" -> ")+4*len("  "))
+	line = appendRepeatByte(line, '=', cap(line))
+	fmt.Println(string(line))
+	if !msg.Logtail.ID.IsZero() {
+		fmt.Printf("LogID:  %s\n", msg.Logtail.ID)
+	}
+	if msg.NodeID != "" {
+		fmt.Printf("NodeID: %s\n", msg.NodeID)
+	}
+	formatTime := func(t time.Time) string {
+		return t.In(time.Local).Format("2006-01-02 15:04:05.000")
+	}
+	switch {
+	case !msg.Logged.IsZero():
+		fmt.Printf("Logged: %s\n", formatTime(msg.Logged))
+	case !msg.Logtail.Logged.IsZero():
+		fmt.Printf("Logged: %s\n", formatTime(msg.Logtail.Logged))
+	}
+	fmt.Printf("Window: %s (%0.3fs)\n", formatTime(msg.Start), duration.Seconds())
+	for i, row := range rows {
+		line = line[:0]
+		isHeading := !strings.HasPrefix(row[0], " ")
+		for j, col := range row {
+			if isHeading && j == 0 {
+				col = "" // headings will be printed later
+			}
+			switch j {
+			case 0, 2: // left justified
+				line = append(line, col...)
+				line = appendRepeatByte(line, ' ', maxWidths[j]-len(col))
+			case 1, 3, 4, 5, 6: // right justified
+				line = appendRepeatByte(line, ' ', maxWidths[j]-len(col))
+				line = append(line, col...)
 			}
-			switch {
-			case i == 0: // print dashed-line table heading
-				line = appendRepeatByte(line[:0], '-', maxWidths[0]+len(" ")+maxWidths[1]+len(" -> ")+maxWidths[2])[:cap(line)]
-			case isHeading:
-				copy(line[:], row[0])
+			switch j {
+			case 0:
+				line = append(line, " "...)
+			case 1:
+				if row[1] == "" && row[2] == "" {
+					line = append(line, "    "...)
+				} else {
+					line = append(line, " -> "...)
+				}
+			case 2, 3, 4, 5:
+				line = append(line, "  "...)
 			}
-			fmt.Println(string(line))
 		}
+		switch {
+		case i == 0: // print dashed-line table heading
+			line = appendRepeatByte(line[:0], '-', maxWidths[0]+len(" ")+maxWidths[1]+len(" -> ")+maxWidths[2])[:cap(line)]
+		case isHeading:
+			copy(line[:], row[0])
+		}
+		fmt.Println(string(line))
 	}
 }
 
 func mustMakeNamesByAddr() map[netip.Addr]string {
 	switch {
-	case !*resolveNames:
-		return nil
 	case *apiKey == "":
 		log.Fatalf("--api-key must be specified with --resolve-names")
 	case *tailnetName == "":

+ 2 - 0
go.mod

@@ -17,9 +17,11 @@ require (
 	github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
 	github.com/creack/pty v1.1.17
 	github.com/dave/jennifer v1.4.1
+	github.com/dsnet/try v0.0.3
 	github.com/evanw/esbuild v0.14.53
 	github.com/frankban/quicktest v1.14.0
 	github.com/fxamacker/cbor/v2 v2.4.0
+	github.com/go-json-experiment/json v0.0.0-20221017203807-c5ed296b8c92
 	github.com/go-ole/go-ole v1.2.6
 	github.com/godbus/dbus/v5 v5.0.6
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da

+ 4 - 0
go.sum

@@ -261,6 +261,8 @@ github.com/docker/docker v20.10.16+incompatible h1:2Db6ZR/+FUR3hqPMwnogOPHFn405c
 github.com/docker/docker v20.10.16+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
 github.com/docker/docker-credential-helpers v0.6.4 h1:axCks+yV+2MR3/kZhAmy07yC56WZ2Pwu/fKWtKuZB0o=
 github.com/docker/docker-credential-helpers v0.6.4/go.mod h1:ofX3UI0Gz1TteYBjtgs07O36Pyasyp66D2uKT7H8W1c=
+github.com/dsnet/try v0.0.3 h1:ptR59SsrcFUYbT/FhAbKTV6iLkeD6O18qfIWRml2fqI=
+github.com/dsnet/try v0.0.3/go.mod h1:WBM8tRpUmnXXhY1U6/S8dt6UWdHTQ7y8A5YSkRCkq40=
 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
@@ -328,6 +330,8 @@ github.com/go-git/go-git/v5 v5.4.2/go.mod h1:gQ1kArt6d+n+BGd+/B/I74HwRTLhth2+zti
 github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
+github.com/go-json-experiment/json v0.0.0-20221017203807-c5ed296b8c92 h1:eoE7yxLELqDQVlHGoYYxXLFZqF8NcdOnrukTm4ObJaY=
+github.com/go-json-experiment/json v0.0.0-20221017203807-c5ed296b8c92/go.mod h1:I+I5/LT2lLP0eZsBNaVDrOrYASx9h7o7mRHmy+535/A=
 github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
 github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=