| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412 |
- // Copyright (c) Tailscale Inc & AUTHORS
- // SPDX-License-Identifier: BSD-3-Clause
- // netlogfmt parses a stream of JSON log messages from stdin and
- // formats the network traffic logs produced by "tailscale.com/wgengine/netlog"
- // according to the schema in "tailscale.com/types/netlogtype.Message"
- // in a more humanly readable format.
- //
- // Example usage:
- //
- // $ cat netlog.json | go run tailscale.com/cmd/netlogfmt
- // =========================================================================================
- // NodeID: n123456CNTRL
- // Logged: 2022-10-13T20:23:10.165Z
- // Window: 2022-10-13T20:23:09.644Z (5s)
- // --------------------------------------------------- Tx[P/s] Tx[B/s] Rx[P/s] Rx[B/s]
- // VirtualTraffic: 16.80 1.64Ki 11.20 1.03Ki
- // TCP: 100.109.51.95:22 -> 100.85.80.41:42912 16.00 1.59Ki 10.40 1008.84
- // TCP: 100.109.51.95:21291 -> 100.107.177.2:53133 0.40 27.60 0.40 24.20
- // TCP: 100.109.51.95:21291 -> 100.107.177.2:53134 0.40 23.40 0.40 24.20
- // PhysicalTraffic: 16.80 2.32Ki 11.20 1.48Ki
- // 100.85.80.41 -> 192.168.0.101:41641 16.00 2.23Ki 10.40 1.40Ki
- // 100.107.177.2 -> 192.168.0.100:41641 0.80 83.20 0.80 83.20
- // =========================================================================================
- package main
- import (
- "cmp"
- "encoding/base64"
- "encoding/json"
- "flag"
- "fmt"
- "io"
- "log"
- "math"
- "net/http"
- "net/netip"
- "os"
- "slices"
- "strconv"
- "strings"
- "time"
- "github.com/dsnet/try"
- jsonv2 "github.com/go-json-experiment/json"
- "github.com/go-json-experiment/json/jsontext"
- "tailscale.com/tailcfg"
- "tailscale.com/types/bools"
- "tailscale.com/types/logid"
- "tailscale.com/types/netlogtype"
- "tailscale.com/util/must"
- )
- var (
- resolveNames = flag.Bool("resolve-names", false, "This is equivalent to specifying \"--resolve-addrs=name\".")
- resolveAddrs = flag.String("resolve-addrs", "", "Resolve each tailscale IP address as a node ID, name, or user.\n"+
- "If network flow logs do not support embedded node information,\n"+
- "then --api-key and --tailnet-name must also be provided.\n"+
- "Valid values include \"nodeId\", \"name\", or \"user\".")
- apiKey = flag.String("api-key", "", "The API key to query the Tailscale API with.\nSee https://login.tailscale.com/admin/settings/keys")
- tailnetName = flag.String("tailnet-name", "", "The Tailnet name to lookup nodes within.\nSee https://login.tailscale.com/admin/settings/general")
- )
- var (
- tailnetNodesByAddr map[netip.Addr]netlogtype.Node
- tailnetNodesByID map[tailcfg.StableNodeID]netlogtype.Node
- )
- func main() {
- flag.Parse()
- if *resolveNames {
- *resolveAddrs = "name"
- }
- *resolveAddrs = strings.ToLower(*resolveAddrs) // make case-insensitive
- *resolveAddrs = strings.TrimSuffix(*resolveAddrs, "s") // allow plural form
- *resolveAddrs = strings.ReplaceAll(*resolveAddrs, " ", "") // ignore spaces
- *resolveAddrs = strings.ReplaceAll(*resolveAddrs, "-", "") // ignore dashes
- *resolveAddrs = strings.ReplaceAll(*resolveAddrs, "_", "") // ignore underscores
- switch *resolveAddrs {
- case "id", "nodeid":
- *resolveAddrs = "nodeid"
- case "name", "hostname":
- *resolveAddrs = "name"
- case "user", "tag", "usertag", "taguser":
- *resolveAddrs = "user" // tag resolution is implied
- default:
- log.Fatalf("--resolve-addrs must be \"nodeId\", \"name\", or \"user\"")
- }
- mustLoadTailnetNodes()
- // The logic handles a stream of arbitrary JSON.
- // So long as a JSON object seems like a network log message,
- // then this will unmarshal and print it.
- if err := processStream(os.Stdin); err != nil {
- if err == io.EOF {
- return
- }
- log.Fatalf("processStream: %v", err)
- }
- }
- func processStream(r io.Reader) (err error) {
- defer try.Handle(&err)
- dec := jsontext.NewDecoder(os.Stdin)
- for {
- processValue(dec)
- }
- }
- func processValue(dec *jsontext.Decoder) {
- switch dec.PeekKind() {
- case '[':
- processArray(dec)
- case '{':
- processObject(dec)
- default:
- try.E(dec.SkipValue())
- }
- }
- func processArray(dec *jsontext.Decoder) {
- try.E1(dec.ReadToken()) // parse '['
- for dec.PeekKind() != ']' {
- processValue(dec)
- }
- try.E1(dec.ReadToken()) // parse ']'
- }
- func processObject(dec *jsontext.Decoder) {
- var hasTraffic bool
- var rawMsg jsontext.Value
- try.E1(dec.ReadToken()) // parse '{'
- for dec.PeekKind() != '}' {
- // Capture any members that could belong to a network log message.
- switch name := try.E1(dec.ReadToken()); name.String() {
- case "virtualTraffic", "subnetTraffic", "exitTraffic", "physicalTraffic":
- hasTraffic = true
- fallthrough
- case "logtail", "nodeId", "logged", "srcNode", "dstNodes", "start", "end":
- if len(rawMsg) == 0 {
- rawMsg = append(rawMsg, '{')
- } else {
- rawMsg = append(rawMsg[:len(rawMsg)-1], ',')
- }
- rawMsg, _ = jsontext.AppendQuote(rawMsg, name.String())
- rawMsg = append(rawMsg, ':')
- rawMsg = append(rawMsg, try.E1(dec.ReadValue())...)
- rawMsg = append(rawMsg, '}')
- default:
- processValue(dec)
- }
- }
- try.E1(dec.ReadToken()) // parse '}'
- // If this appears to be a network log message, then unmarshal and print it.
- if hasTraffic {
- var msg message
- try.E(jsonv2.Unmarshal(rawMsg, &msg))
- printMessage(msg)
- }
- }
- type message struct {
- Logtail struct {
- ID logid.PublicID `json:"id"`
- Logged time.Time `json:"server_time"`
- } `json:"logtail"`
- Logged time.Time `json:"logged"`
- netlogtype.Message
- }
- func printMessage(msg message) {
- var nodesByAddr map[netip.Addr]netlogtype.Node
- var tailnetDNS string // e.g., ".acme-corp.ts.net"
- if *resolveAddrs != "" {
- nodesByAddr = make(map[netip.Addr]netlogtype.Node)
- insertNode := func(node netlogtype.Node) {
- for _, addr := range node.Addresses {
- nodesByAddr[addr] = node
- }
- }
- for _, node := range msg.DstNodes {
- insertNode(node)
- }
- insertNode(msg.SrcNode)
- // Derive the Tailnet DNS of the self node.
- detectTailnetDNS := func(nodeName string) {
- if prefix, ok := strings.CutSuffix(nodeName, ".ts.net"); ok {
- if i := strings.LastIndexByte(prefix, '.'); i > 0 {
- tailnetDNS = nodeName[i:]
- }
- }
- }
- detectTailnetDNS(msg.SrcNode.Name)
- detectTailnetDNS(tailnetNodesByID[msg.NodeID].Name)
- }
- // Construct a table of network traffic per connection.
- rows := [][7]string{{3: "Tx[P/s]", 4: "Tx[B/s]", 5: "Rx[P/s]", 6: "Rx[B/s]"}}
- duration := msg.End.Sub(msg.Start)
- addRows := func(heading string, traffic []netlogtype.ConnectionCounts) {
- if len(traffic) == 0 {
- return
- }
- slices.SortFunc(traffic, func(x, y netlogtype.ConnectionCounts) int {
- nx := x.TxPackets + x.TxBytes + x.RxPackets + x.RxBytes
- ny := y.TxPackets + y.TxBytes + y.RxPackets + y.RxBytes
- return cmp.Compare(ny, nx)
- })
- var sum netlogtype.Counts
- for _, cc := range traffic {
- sum = sum.Add(cc.Counts)
- }
- rows = append(rows, [7]string{
- 0: heading + ":",
- 3: formatSI(float64(sum.TxPackets) / duration.Seconds()),
- 4: formatIEC(float64(sum.TxBytes) / duration.Seconds()),
- 5: formatSI(float64(sum.RxPackets) / duration.Seconds()),
- 6: formatIEC(float64(sum.RxBytes) / duration.Seconds()),
- })
- if len(traffic) == 1 && traffic[0].Connection.IsZero() {
- return // this is already a summary counts
- }
- formatAddrPort := func(a netip.AddrPort) string {
- if !a.IsValid() {
- return ""
- }
- name := a.Addr().String()
- node, ok := tailnetNodesByAddr[a.Addr()]
- if !ok {
- node, ok = nodesByAddr[a.Addr()]
- }
- if ok {
- switch *resolveAddrs {
- case "nodeid":
- name = cmp.Or(string(node.NodeID), name)
- case "name":
- name = cmp.Or(strings.TrimSuffix(string(node.Name), tailnetDNS), name)
- case "user":
- name = cmp.Or(bools.IfElse(len(node.Tags) > 0, fmt.Sprint(node.Tags), node.User), name)
- }
- }
- if a.Port() != 0 {
- return name + ":" + strconv.Itoa(int(a.Port()))
- }
- return name
- }
- for _, cc := range traffic {
- row := [7]string{
- 0: " ",
- 1: formatAddrPort(cc.Src),
- 2: formatAddrPort(cc.Dst),
- 3: formatSI(float64(cc.TxPackets) / duration.Seconds()),
- 4: formatIEC(float64(cc.TxBytes) / duration.Seconds()),
- 5: formatSI(float64(cc.RxPackets) / duration.Seconds()),
- 6: formatIEC(float64(cc.RxBytes) / duration.Seconds()),
- }
- if cc.Proto > 0 {
- row[0] += cc.Proto.String() + ":"
- }
- rows = append(rows, row)
- }
- }
- addRows("VirtualTraffic", msg.VirtualTraffic)
- addRows("SubnetTraffic", msg.SubnetTraffic)
- addRows("ExitTraffic", msg.ExitTraffic)
- addRows("PhysicalTraffic", msg.PhysicalTraffic)
- // Compute the maximum width of each field.
- var maxWidths [7]int
- for _, row := range rows {
- for i, col := range row {
- if maxWidths[i] < len(col) && !(i == 0 && !strings.HasPrefix(col, " ")) {
- maxWidths[i] = len(col)
- }
- }
- }
- var maxSum int
- for _, n := range maxWidths {
- maxSum += n
- }
- // Output a table of network traffic per connection.
- line := make([]byte, 0, maxSum+len(" ")+len(" -> ")+4*len(" "))
- line = appendRepeatByte(line, '=', cap(line))
- fmt.Println(string(line))
- if !msg.Logtail.ID.IsZero() {
- fmt.Printf("LogID: %s\n", msg.Logtail.ID)
- }
- if msg.NodeID != "" {
- fmt.Printf("NodeID: %s\n", msg.NodeID)
- }
- formatTime := func(t time.Time) string {
- return t.In(time.Local).Format("2006-01-02 15:04:05.000")
- }
- switch {
- case !msg.Logged.IsZero():
- fmt.Printf("Logged: %s\n", formatTime(msg.Logged))
- case !msg.Logtail.Logged.IsZero():
- fmt.Printf("Logged: %s\n", formatTime(msg.Logtail.Logged))
- }
- fmt.Printf("Window: %s (%0.3fs)\n", formatTime(msg.Start), duration.Seconds())
- for i, row := range rows {
- line = line[:0]
- isHeading := !strings.HasPrefix(row[0], " ")
- for j, col := range row {
- if isHeading && j == 0 {
- col = "" // headings will be printed later
- }
- switch j {
- case 0, 2: // left justified
- line = append(line, col...)
- line = appendRepeatByte(line, ' ', maxWidths[j]-len(col))
- case 1, 3, 4, 5, 6: // right justified
- line = appendRepeatByte(line, ' ', maxWidths[j]-len(col))
- line = append(line, col...)
- }
- switch j {
- case 0:
- line = append(line, " "...)
- case 1:
- if row[1] == "" && row[2] == "" {
- line = append(line, " "...)
- } else {
- line = append(line, " -> "...)
- }
- case 2, 3, 4, 5:
- line = append(line, " "...)
- }
- }
- switch {
- case i == 0: // print dashed-line table heading
- line = appendRepeatByte(line[:0], '-', maxWidths[0]+len(" ")+maxWidths[1]+len(" -> ")+maxWidths[2])[:cap(line)]
- case isHeading:
- copy(line[:], row[0])
- }
- fmt.Println(string(line))
- }
- }
- func mustLoadTailnetNodes() {
- switch {
- case *apiKey == "" && *tailnetName == "":
- return // rely on embedded node information in the logs themselves
- case *apiKey == "":
- log.Fatalf("--api-key must be specified with --resolve-names")
- case *tailnetName == "":
- log.Fatalf("--tailnet must be specified with --resolve-names")
- }
- // Query the Tailscale API for a list of devices in the tailnet.
- const apiURL = "https://api.tailscale.com/api/v2"
- req := must.Get(http.NewRequest("GET", apiURL+"/tailnet/"+*tailnetName+"/devices", nil))
- req.Header.Add("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(*apiKey+":")))
- resp := must.Get(http.DefaultClient.Do(req))
- defer resp.Body.Close()
- b := must.Get(io.ReadAll(resp.Body))
- if resp.StatusCode != 200 {
- log.Fatalf("http: %v: %s", http.StatusText(resp.StatusCode), b)
- }
- // Unmarshal the API response.
- var m struct {
- Devices []netlogtype.Node `json:"devices"`
- }
- must.Do(json.Unmarshal(b, &m))
- // Construct a mapping of Tailscale IP addresses to node information.
- tailnetNodesByAddr = make(map[netip.Addr]netlogtype.Node)
- tailnetNodesByID = make(map[tailcfg.StableNodeID]netlogtype.Node)
- for _, node := range m.Devices {
- for _, addr := range node.Addresses {
- tailnetNodesByAddr[addr] = node
- }
- tailnetNodesByID[node.NodeID] = node
- }
- }
- func appendRepeatByte(b []byte, c byte, n int) []byte {
- for range n {
- b = append(b, c)
- }
- return b
- }
- func formatSI(n float64) string {
- switch n := math.Abs(n); {
- case n < 1e3:
- return fmt.Sprintf("%0.2f ", n/(1e0))
- case n < 1e6:
- return fmt.Sprintf("%0.2fk", n/(1e3))
- case n < 1e9:
- return fmt.Sprintf("%0.2fM", n/(1e6))
- default:
- return fmt.Sprintf("%0.2fG", n/(1e9))
- }
- }
- func formatIEC(n float64) string {
- switch n := math.Abs(n); {
- case n < 1<<10:
- return fmt.Sprintf("%0.2f ", n/(1<<0))
- case n < 1<<20:
- return fmt.Sprintf("%0.2fKi", n/(1<<10))
- case n < 1<<30:
- return fmt.Sprintf("%0.2fMi", n/(1<<20))
- default:
- return fmt.Sprintf("%0.2fGi", n/(1<<30))
- }
- }
|