main.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // netlogfmt parses a stream of JSON log messages from stdin and
  4. // formats the network traffic logs produced by "tailscale.com/wgengine/netlog"
  5. // according to the schema in "tailscale.com/types/netlogtype.Message"
// in a more human-readable format.
  7. //
  8. // Example usage:
  9. //
  10. // $ cat netlog.json | go run tailscale.com/cmd/netlogfmt
  11. // =========================================================================================
  12. // NodeID: n123456CNTRL
  13. // Logged: 2022-10-13T20:23:10.165Z
  14. // Window: 2022-10-13T20:23:09.644Z (5s)
  15. // --------------------------------------------------- Tx[P/s] Tx[B/s] Rx[P/s] Rx[B/s]
  16. // VirtualTraffic: 16.80 1.64Ki 11.20 1.03Ki
  17. // TCP: 100.109.51.95:22 -> 100.85.80.41:42912 16.00 1.59Ki 10.40 1008.84
  18. // TCP: 100.109.51.95:21291 -> 100.107.177.2:53133 0.40 27.60 0.40 24.20
  19. // TCP: 100.109.51.95:21291 -> 100.107.177.2:53134 0.40 23.40 0.40 24.20
  20. // PhysicalTraffic: 16.80 2.32Ki 11.20 1.48Ki
  21. // 100.85.80.41 -> 192.168.0.101:41641 16.00 2.23Ki 10.40 1.40Ki
  22. // 100.107.177.2 -> 192.168.0.100:41641 0.80 83.20 0.80 83.20
  23. // =========================================================================================
  24. package main
  25. import (
  26. "cmp"
  27. "encoding/base64"
  28. "encoding/json"
  29. "flag"
  30. "fmt"
  31. "io"
  32. "log"
  33. "math"
  34. "net/http"
  35. "net/netip"
  36. "os"
  37. "slices"
  38. "strconv"
  39. "strings"
  40. "time"
  41. "github.com/dsnet/try"
  42. jsonv2 "github.com/go-json-experiment/json"
  43. "github.com/go-json-experiment/json/jsontext"
  44. "tailscale.com/tailcfg"
  45. "tailscale.com/types/bools"
  46. "tailscale.com/types/logid"
  47. "tailscale.com/types/netlogtype"
  48. "tailscale.com/util/must"
  49. )
// Command-line flags. They are registered at package initialization and
// populated by flag.Parse in main.
var (
	// resolveNames is shorthand for --resolve-addrs=name; main rewrites
	// resolveAddrs to "name" when it is set.
	resolveNames = flag.Bool("resolve-names", false, "This is equivalent to specifying \"--resolve-addrs=name\".")
	// resolveAddrs selects how Tailscale IP addresses are rendered.
	// main normalizes it to one of "", "nodeid", "name", or "user".
	resolveAddrs = flag.String("resolve-addrs", "", "Resolve each tailscale IP address as a node ID, name, or user.\n"+
		"If network flow logs do not support embedded node information,\n"+
		"then --api-key and --tailnet-name must also be provided.\n"+
		"Valid values include \"nodeId\", \"name\", or \"user\".")
	// apiKey and tailnetName are used by mustLoadTailnetNodes to query the
	// Tailscale API for the tailnet's device list.
	apiKey = flag.String("api-key", "", "The API key to query the Tailscale API with.\nSee https://login.tailscale.com/admin/settings/keys")
	tailnetName = flag.String("tailnet-name", "", "The Tailnet name to lookup nodes within.\nSee https://login.tailscale.com/admin/settings/general")
)
// Node lookup tables built by mustLoadTailnetNodes from the Tailscale API
// device list. Both stay nil when neither --api-key nor --tailnet-name is
// given; printMessage only reads them, which is safe on a nil map.
var (
	// tailnetNodesByAddr maps each address of a device to that device.
	tailnetNodesByAddr map[netip.Addr]netlogtype.Node
	// tailnetNodesByID maps a device's stable node ID to that device.
	tailnetNodesByID map[tailcfg.StableNodeID]netlogtype.Node
)
  63. func main() {
  64. flag.Parse()
  65. if *resolveNames {
  66. *resolveAddrs = "name"
  67. }
  68. *resolveAddrs = strings.ToLower(*resolveAddrs) // make case-insensitive
  69. *resolveAddrs = strings.TrimSuffix(*resolveAddrs, "s") // allow plural form
  70. *resolveAddrs = strings.ReplaceAll(*resolveAddrs, " ", "") // ignore spaces
  71. *resolveAddrs = strings.ReplaceAll(*resolveAddrs, "-", "") // ignore dashes
  72. *resolveAddrs = strings.ReplaceAll(*resolveAddrs, "_", "") // ignore underscores
  73. switch *resolveAddrs {
  74. case "":
  75. case "id", "nodeid":
  76. *resolveAddrs = "nodeid"
  77. case "name", "hostname":
  78. *resolveAddrs = "name"
  79. case "user", "tag", "usertag", "taguser":
  80. *resolveAddrs = "user" // tag resolution is implied
  81. default:
  82. log.Fatalf("--resolve-addrs must be \"nodeId\", \"name\", or \"user\"")
  83. }
  84. mustLoadTailnetNodes()
  85. // The logic handles a stream of arbitrary JSON.
  86. // So long as a JSON object seems like a network log message,
  87. // then this will unmarshal and print it.
  88. if err := processStream(os.Stdin); err != nil {
  89. if err == io.EOF {
  90. return
  91. }
  92. log.Fatalf("processStream: %v", err)
  93. }
  94. }
  95. func processStream(r io.Reader) (err error) {
  96. defer try.Handle(&err)
  97. dec := jsontext.NewDecoder(os.Stdin)
  98. for {
  99. processValue(dec)
  100. }
  101. }
  102. func processValue(dec *jsontext.Decoder) {
  103. switch dec.PeekKind() {
  104. case '[':
  105. processArray(dec)
  106. case '{':
  107. processObject(dec)
  108. default:
  109. try.E(dec.SkipValue())
  110. }
  111. }
  112. func processArray(dec *jsontext.Decoder) {
  113. try.E1(dec.ReadToken()) // parse '['
  114. for dec.PeekKind() != ']' {
  115. processValue(dec)
  116. }
  117. try.E1(dec.ReadToken()) // parse ']'
  118. }
  119. func processObject(dec *jsontext.Decoder) {
  120. var hasTraffic bool
  121. var rawMsg jsontext.Value
  122. try.E1(dec.ReadToken()) // parse '{'
  123. for dec.PeekKind() != '}' {
  124. // Capture any members that could belong to a network log message.
  125. switch name := try.E1(dec.ReadToken()); name.String() {
  126. case "virtualTraffic", "subnetTraffic", "exitTraffic", "physicalTraffic":
  127. hasTraffic = true
  128. fallthrough
  129. case "logtail", "nodeId", "logged", "srcNode", "dstNodes", "start", "end":
  130. if len(rawMsg) == 0 {
  131. rawMsg = append(rawMsg, '{')
  132. } else {
  133. rawMsg = append(rawMsg[:len(rawMsg)-1], ',')
  134. }
  135. rawMsg, _ = jsontext.AppendQuote(rawMsg, name.String())
  136. rawMsg = append(rawMsg, ':')
  137. rawMsg = append(rawMsg, try.E1(dec.ReadValue())...)
  138. rawMsg = append(rawMsg, '}')
  139. default:
  140. processValue(dec)
  141. }
  142. }
  143. try.E1(dec.ReadToken()) // parse '}'
  144. // If this appears to be a network log message, then unmarshal and print it.
  145. if hasTraffic {
  146. var msg message
  147. try.E(jsonv2.Unmarshal(rawMsg, &msg))
  148. printMessage(msg)
  149. }
  150. }
// message is the shape that processObject unmarshals a candidate log entry
// into: the embedded netlogtype.Message payload plus the optional "logtail"
// and "logged" members that may accompany it.
type message struct {
	// Logtail holds the metadata found under the "logtail" member.
	Logtail struct {
		ID     logid.PublicID `json:"id"`          // printed as "LogID" when non-zero
		Logged time.Time      `json:"server_time"` // used for "Logged" only when the top-level Logged is zero
	} `json:"logtail"`
	// Logged is the top-level "logged" timestamp; printMessage prefers it
	// over Logtail.Logged.
	Logged time.Time `json:"logged"`
	// The network log payload itself (node IDs, time window, traffic counts).
	netlogtype.Message
}
  159. func printMessage(msg message) {
  160. var nodesByAddr map[netip.Addr]netlogtype.Node
  161. var tailnetDNS string // e.g., ".acme-corp.ts.net"
  162. if *resolveAddrs != "" {
  163. nodesByAddr = make(map[netip.Addr]netlogtype.Node)
  164. insertNode := func(node netlogtype.Node) {
  165. for _, addr := range node.Addresses {
  166. nodesByAddr[addr] = node
  167. }
  168. }
  169. for _, node := range msg.DstNodes {
  170. insertNode(node)
  171. }
  172. insertNode(msg.SrcNode)
  173. // Derive the Tailnet DNS of the self node.
  174. detectTailnetDNS := func(nodeName string) {
  175. if prefix, ok := strings.CutSuffix(nodeName, ".ts.net"); ok {
  176. if i := strings.LastIndexByte(prefix, '.'); i > 0 {
  177. tailnetDNS = nodeName[i:]
  178. }
  179. }
  180. }
  181. detectTailnetDNS(msg.SrcNode.Name)
  182. detectTailnetDNS(tailnetNodesByID[msg.NodeID].Name)
  183. }
  184. // Construct a table of network traffic per connection.
  185. rows := [][7]string{{3: "Tx[P/s]", 4: "Tx[B/s]", 5: "Rx[P/s]", 6: "Rx[B/s]"}}
  186. duration := msg.End.Sub(msg.Start)
  187. addRows := func(heading string, traffic []netlogtype.ConnectionCounts) {
  188. if len(traffic) == 0 {
  189. return
  190. }
  191. slices.SortFunc(traffic, func(x, y netlogtype.ConnectionCounts) int {
  192. nx := x.TxPackets + x.TxBytes + x.RxPackets + x.RxBytes
  193. ny := y.TxPackets + y.TxBytes + y.RxPackets + y.RxBytes
  194. return cmp.Compare(ny, nx)
  195. })
  196. var sum netlogtype.Counts
  197. for _, cc := range traffic {
  198. sum = sum.Add(cc.Counts)
  199. }
  200. rows = append(rows, [7]string{
  201. 0: heading + ":",
  202. 3: formatSI(float64(sum.TxPackets) / duration.Seconds()),
  203. 4: formatIEC(float64(sum.TxBytes) / duration.Seconds()),
  204. 5: formatSI(float64(sum.RxPackets) / duration.Seconds()),
  205. 6: formatIEC(float64(sum.RxBytes) / duration.Seconds()),
  206. })
  207. if len(traffic) == 1 && traffic[0].Connection.IsZero() {
  208. return // this is already a summary counts
  209. }
  210. formatAddrPort := func(a netip.AddrPort) string {
  211. if !a.IsValid() {
  212. return ""
  213. }
  214. name := a.Addr().String()
  215. node, ok := tailnetNodesByAddr[a.Addr()]
  216. if !ok {
  217. node, ok = nodesByAddr[a.Addr()]
  218. }
  219. if ok {
  220. switch *resolveAddrs {
  221. case "nodeid":
  222. name = cmp.Or(string(node.NodeID), name)
  223. case "name":
  224. name = cmp.Or(strings.TrimSuffix(string(node.Name), tailnetDNS), name)
  225. case "user":
  226. name = cmp.Or(bools.IfElse(len(node.Tags) > 0, fmt.Sprint(node.Tags), node.User), name)
  227. }
  228. }
  229. if a.Port() != 0 {
  230. return name + ":" + strconv.Itoa(int(a.Port()))
  231. }
  232. return name
  233. }
  234. for _, cc := range traffic {
  235. row := [7]string{
  236. 0: " ",
  237. 1: formatAddrPort(cc.Src),
  238. 2: formatAddrPort(cc.Dst),
  239. 3: formatSI(float64(cc.TxPackets) / duration.Seconds()),
  240. 4: formatIEC(float64(cc.TxBytes) / duration.Seconds()),
  241. 5: formatSI(float64(cc.RxPackets) / duration.Seconds()),
  242. 6: formatIEC(float64(cc.RxBytes) / duration.Seconds()),
  243. }
  244. if cc.Proto > 0 {
  245. row[0] += cc.Proto.String() + ":"
  246. }
  247. rows = append(rows, row)
  248. }
  249. }
  250. addRows("VirtualTraffic", msg.VirtualTraffic)
  251. addRows("SubnetTraffic", msg.SubnetTraffic)
  252. addRows("ExitTraffic", msg.ExitTraffic)
  253. addRows("PhysicalTraffic", msg.PhysicalTraffic)
  254. // Compute the maximum width of each field.
  255. var maxWidths [7]int
  256. for _, row := range rows {
  257. for i, col := range row {
  258. if maxWidths[i] < len(col) && !(i == 0 && !strings.HasPrefix(col, " ")) {
  259. maxWidths[i] = len(col)
  260. }
  261. }
  262. }
  263. var maxSum int
  264. for _, n := range maxWidths {
  265. maxSum += n
  266. }
  267. // Output a table of network traffic per connection.
  268. line := make([]byte, 0, maxSum+len(" ")+len(" -> ")+4*len(" "))
  269. line = appendRepeatByte(line, '=', cap(line))
  270. fmt.Println(string(line))
  271. if !msg.Logtail.ID.IsZero() {
  272. fmt.Printf("LogID: %s\n", msg.Logtail.ID)
  273. }
  274. if msg.NodeID != "" {
  275. fmt.Printf("NodeID: %s\n", msg.NodeID)
  276. }
  277. formatTime := func(t time.Time) string {
  278. return t.In(time.Local).Format("2006-01-02 15:04:05.000")
  279. }
  280. switch {
  281. case !msg.Logged.IsZero():
  282. fmt.Printf("Logged: %s\n", formatTime(msg.Logged))
  283. case !msg.Logtail.Logged.IsZero():
  284. fmt.Printf("Logged: %s\n", formatTime(msg.Logtail.Logged))
  285. }
  286. fmt.Printf("Window: %s (%0.3fs)\n", formatTime(msg.Start), duration.Seconds())
  287. for i, row := range rows {
  288. line = line[:0]
  289. isHeading := !strings.HasPrefix(row[0], " ")
  290. for j, col := range row {
  291. if isHeading && j == 0 {
  292. col = "" // headings will be printed later
  293. }
  294. switch j {
  295. case 0, 2: // left justified
  296. line = append(line, col...)
  297. line = appendRepeatByte(line, ' ', maxWidths[j]-len(col))
  298. case 1, 3, 4, 5, 6: // right justified
  299. line = appendRepeatByte(line, ' ', maxWidths[j]-len(col))
  300. line = append(line, col...)
  301. }
  302. switch j {
  303. case 0:
  304. line = append(line, " "...)
  305. case 1:
  306. if row[1] == "" && row[2] == "" {
  307. line = append(line, " "...)
  308. } else {
  309. line = append(line, " -> "...)
  310. }
  311. case 2, 3, 4, 5:
  312. line = append(line, " "...)
  313. }
  314. }
  315. switch {
  316. case i == 0: // print dashed-line table heading
  317. line = appendRepeatByte(line[:0], '-', maxWidths[0]+len(" ")+maxWidths[1]+len(" -> ")+maxWidths[2])[:cap(line)]
  318. case isHeading:
  319. copy(line[:], row[0])
  320. }
  321. fmt.Println(string(line))
  322. }
  323. }
  324. func mustLoadTailnetNodes() {
  325. switch {
  326. case *apiKey == "" && *tailnetName == "":
  327. return // rely on embedded node information in the logs themselves
  328. case *apiKey == "":
  329. log.Fatalf("--api-key must be specified with --resolve-names")
  330. case *tailnetName == "":
  331. log.Fatalf("--tailnet must be specified with --resolve-names")
  332. }
  333. // Query the Tailscale API for a list of devices in the tailnet.
  334. const apiURL = "https://api.tailscale.com/api/v2"
  335. req := must.Get(http.NewRequest("GET", apiURL+"/tailnet/"+*tailnetName+"/devices", nil))
  336. req.Header.Add("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(*apiKey+":")))
  337. resp := must.Get(http.DefaultClient.Do(req))
  338. defer resp.Body.Close()
  339. b := must.Get(io.ReadAll(resp.Body))
  340. if resp.StatusCode != 200 {
  341. log.Fatalf("http: %v: %s", http.StatusText(resp.StatusCode), b)
  342. }
  343. // Unmarshal the API response.
  344. var m struct {
  345. Devices []netlogtype.Node `json:"devices"`
  346. }
  347. must.Do(json.Unmarshal(b, &m))
  348. // Construct a mapping of Tailscale IP addresses to node information.
  349. tailnetNodesByAddr = make(map[netip.Addr]netlogtype.Node)
  350. tailnetNodesByID = make(map[tailcfg.StableNodeID]netlogtype.Node)
  351. for _, node := range m.Devices {
  352. for _, addr := range node.Addresses {
  353. tailnetNodesByAddr[addr] = node
  354. }
  355. tailnetNodesByID[node.NodeID] = node
  356. }
  357. }
  358. func appendRepeatByte(b []byte, c byte, n int) []byte {
  359. for range n {
  360. b = append(b, c)
  361. }
  362. return b
  363. }
  364. func formatSI(n float64) string {
  365. switch n := math.Abs(n); {
  366. case n < 1e3:
  367. return fmt.Sprintf("%0.2f ", n/(1e0))
  368. case n < 1e6:
  369. return fmt.Sprintf("%0.2fk", n/(1e3))
  370. case n < 1e9:
  371. return fmt.Sprintf("%0.2fM", n/(1e6))
  372. default:
  373. return fmt.Sprintf("%0.2fG", n/(1e9))
  374. }
  375. }
  376. func formatIEC(n float64) string {
  377. switch n := math.Abs(n); {
  378. case n < 1<<10:
  379. return fmt.Sprintf("%0.2f ", n/(1<<0))
  380. case n < 1<<20:
  381. return fmt.Sprintf("%0.2fKi", n/(1<<10))
  382. case n < 1<<30:
  383. return fmt.Sprintf("%0.2fMi", n/(1<<20))
  384. default:
  385. return fmt.Sprintf("%0.2fGi", n/(1<<30))
  386. }
  387. }