main.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // netlogfmt parses a stream of JSON log messages from stdin and
  4. // formats the network traffic logs produced by "tailscale.com/wgengine/netlog"
  5. // according to the schema in "tailscale.com/types/netlogtype.Message"
// in a more human-readable format.
  7. //
  8. // Example usage:
  9. //
  10. // $ cat netlog.json | go run tailscale.com/cmd/netlogfmt
  11. // =========================================================================================
  12. // NodeID: n123456CNTRL
// Logged: 2022-10-13 20:23:10.165
// Window: 2022-10-13 20:23:09.644 (5.000s)
  15. // --------------------------------------------------- Tx[P/s] Tx[B/s] Rx[P/s] Rx[B/s]
  16. // VirtualTraffic: 16.80 1.64Ki 11.20 1.03Ki
  17. // TCP: 100.109.51.95:22 -> 100.85.80.41:42912 16.00 1.59Ki 10.40 1008.84
  18. // TCP: 100.109.51.95:21291 -> 100.107.177.2:53133 0.40 27.60 0.40 24.20
  19. // TCP: 100.109.51.95:21291 -> 100.107.177.2:53134 0.40 23.40 0.40 24.20
  20. // PhysicalTraffic: 16.80 2.32Ki 11.20 1.48Ki
  21. // 100.85.80.41 -> 192.168.0.101:41641 16.00 2.23Ki 10.40 1.40Ki
  22. // 100.107.177.2 -> 192.168.0.100:41641 0.80 83.20 0.80 83.20
  23. // =========================================================================================
  24. package main
  25. import (
  26. "cmp"
  27. "encoding/base64"
  28. "encoding/json"
  29. "flag"
  30. "fmt"
  31. "io"
  32. "log"
  33. "math"
  34. "net/http"
  35. "net/netip"
  36. "os"
  37. "slices"
  38. "strconv"
  39. "strings"
  40. "time"
  41. "github.com/dsnet/try"
  42. jsonv2 "github.com/go-json-experiment/json"
  43. "github.com/go-json-experiment/json/jsontext"
  44. "tailscale.com/tailcfg"
  45. "tailscale.com/types/bools"
  46. "tailscale.com/types/logid"
  47. "tailscale.com/types/netlogtype"
  48. "tailscale.com/util/must"
  49. )
  50. var (
  51. resolveNames = flag.Bool("resolve-names", false, "This is equivalent to specifying \"--resolve-addrs=name\".")
  52. resolveAddrs = flag.String("resolve-addrs", "", "Resolve each tailscale IP address as a node ID, name, or user.\n"+
  53. "If network flow logs do not support embedded node information,\n"+
  54. "then --api-key and --tailnet-name must also be provided.\n"+
  55. "Valid values include \"nodeId\", \"name\", or \"user\".")
  56. apiKey = flag.String("api-key", "", "The API key to query the Tailscale API with.\nSee https://login.tailscale.com/admin/settings/keys")
  57. tailnetName = flag.String("tailnet-name", "", "The Tailnet name to lookup nodes within.\nSee https://login.tailscale.com/admin/settings/general")
  58. )
  59. var (
  60. tailnetNodesByAddr map[netip.Addr]netlogtype.Node
  61. tailnetNodesByID map[tailcfg.StableNodeID]netlogtype.Node
  62. )
  63. func main() {
  64. flag.Parse()
  65. if *resolveNames {
  66. *resolveAddrs = "name"
  67. }
  68. *resolveAddrs = strings.ToLower(*resolveAddrs) // make case-insensitive
  69. *resolveAddrs = strings.TrimSuffix(*resolveAddrs, "s") // allow plural form
  70. *resolveAddrs = strings.ReplaceAll(*resolveAddrs, " ", "") // ignore spaces
  71. *resolveAddrs = strings.ReplaceAll(*resolveAddrs, "-", "") // ignore dashes
  72. *resolveAddrs = strings.ReplaceAll(*resolveAddrs, "_", "") // ignore underscores
  73. switch *resolveAddrs {
  74. case "id", "nodeid":
  75. *resolveAddrs = "nodeid"
  76. case "name", "hostname":
  77. *resolveAddrs = "name"
  78. case "user", "tag", "usertag", "taguser":
  79. *resolveAddrs = "user" // tag resolution is implied
  80. default:
  81. log.Fatalf("--resolve-addrs must be \"nodeId\", \"name\", or \"user\"")
  82. }
  83. mustLoadTailnetNodes()
  84. // The logic handles a stream of arbitrary JSON.
  85. // So long as a JSON object seems like a network log message,
  86. // then this will unmarshal and print it.
  87. if err := processStream(os.Stdin); err != nil {
  88. if err == io.EOF {
  89. return
  90. }
  91. log.Fatalf("processStream: %v", err)
  92. }
  93. }
  94. func processStream(r io.Reader) (err error) {
  95. defer try.Handle(&err)
  96. dec := jsontext.NewDecoder(os.Stdin)
  97. for {
  98. processValue(dec)
  99. }
  100. }
  101. func processValue(dec *jsontext.Decoder) {
  102. switch dec.PeekKind() {
  103. case '[':
  104. processArray(dec)
  105. case '{':
  106. processObject(dec)
  107. default:
  108. try.E(dec.SkipValue())
  109. }
  110. }
  111. func processArray(dec *jsontext.Decoder) {
  112. try.E1(dec.ReadToken()) // parse '['
  113. for dec.PeekKind() != ']' {
  114. processValue(dec)
  115. }
  116. try.E1(dec.ReadToken()) // parse ']'
  117. }
  118. func processObject(dec *jsontext.Decoder) {
  119. var hasTraffic bool
  120. var rawMsg jsontext.Value
  121. try.E1(dec.ReadToken()) // parse '{'
  122. for dec.PeekKind() != '}' {
  123. // Capture any members that could belong to a network log message.
  124. switch name := try.E1(dec.ReadToken()); name.String() {
  125. case "virtualTraffic", "subnetTraffic", "exitTraffic", "physicalTraffic":
  126. hasTraffic = true
  127. fallthrough
  128. case "logtail", "nodeId", "logged", "srcNode", "dstNodes", "start", "end":
  129. if len(rawMsg) == 0 {
  130. rawMsg = append(rawMsg, '{')
  131. } else {
  132. rawMsg = append(rawMsg[:len(rawMsg)-1], ',')
  133. }
  134. rawMsg, _ = jsontext.AppendQuote(rawMsg, name.String())
  135. rawMsg = append(rawMsg, ':')
  136. rawMsg = append(rawMsg, try.E1(dec.ReadValue())...)
  137. rawMsg = append(rawMsg, '}')
  138. default:
  139. processValue(dec)
  140. }
  141. }
  142. try.E1(dec.ReadToken()) // parse '}'
  143. // If this appears to be a network log message, then unmarshal and print it.
  144. if hasTraffic {
  145. var msg message
  146. try.E(jsonv2.Unmarshal(rawMsg, &msg))
  147. printMessage(msg)
  148. }
  149. }
// message is the subset of a log entry that netlogfmt decodes. It embeds
// netlogtype.Message (so its fields, such as NodeID and the traffic slices,
// are promoted) and adds the log ID and timestamps that may accompany it.
// NOTE(review): processObject unmarshals top-level members such as "nodeId"
// into this type, which presumes the embedded struct is decoded inline.
type message struct {
	// Logtail holds the "logtail" member of the log entry, if present.
	Logtail struct {
		// ID is printed as the "LogID:" line when it is non-zero.
		ID logid.PublicID `json:"id"`

		// Logged is read from "server_time"; printMessage uses it as the
		// logged time only when the top-level Logged field is zero.
		Logged time.Time `json:"server_time"`
	} `json:"logtail"`

	// Logged is the top-level "logged" timestamp, preferred when printing.
	Logged time.Time `json:"logged"`

	netlogtype.Message
}
  158. func printMessage(msg message) {
  159. var nodesByAddr map[netip.Addr]netlogtype.Node
  160. var tailnetDNS string // e.g., ".acme-corp.ts.net"
  161. if *resolveAddrs != "" {
  162. nodesByAddr = make(map[netip.Addr]netlogtype.Node)
  163. insertNode := func(node netlogtype.Node) {
  164. for _, addr := range node.Addresses {
  165. nodesByAddr[addr] = node
  166. }
  167. }
  168. for _, node := range msg.DstNodes {
  169. insertNode(node)
  170. }
  171. insertNode(msg.SrcNode)
  172. // Derive the Tailnet DNS of the self node.
  173. detectTailnetDNS := func(nodeName string) {
  174. if prefix, ok := strings.CutSuffix(nodeName, ".ts.net"); ok {
  175. if i := strings.LastIndexByte(prefix, '.'); i > 0 {
  176. tailnetDNS = nodeName[i:]
  177. }
  178. }
  179. }
  180. detectTailnetDNS(msg.SrcNode.Name)
  181. detectTailnetDNS(tailnetNodesByID[msg.NodeID].Name)
  182. }
  183. // Construct a table of network traffic per connection.
  184. rows := [][7]string{{3: "Tx[P/s]", 4: "Tx[B/s]", 5: "Rx[P/s]", 6: "Rx[B/s]"}}
  185. duration := msg.End.Sub(msg.Start)
  186. addRows := func(heading string, traffic []netlogtype.ConnectionCounts) {
  187. if len(traffic) == 0 {
  188. return
  189. }
  190. slices.SortFunc(traffic, func(x, y netlogtype.ConnectionCounts) int {
  191. nx := x.TxPackets + x.TxBytes + x.RxPackets + x.RxBytes
  192. ny := y.TxPackets + y.TxBytes + y.RxPackets + y.RxBytes
  193. return cmp.Compare(ny, nx)
  194. })
  195. var sum netlogtype.Counts
  196. for _, cc := range traffic {
  197. sum = sum.Add(cc.Counts)
  198. }
  199. rows = append(rows, [7]string{
  200. 0: heading + ":",
  201. 3: formatSI(float64(sum.TxPackets) / duration.Seconds()),
  202. 4: formatIEC(float64(sum.TxBytes) / duration.Seconds()),
  203. 5: formatSI(float64(sum.RxPackets) / duration.Seconds()),
  204. 6: formatIEC(float64(sum.RxBytes) / duration.Seconds()),
  205. })
  206. if len(traffic) == 1 && traffic[0].Connection.IsZero() {
  207. return // this is already a summary counts
  208. }
  209. formatAddrPort := func(a netip.AddrPort) string {
  210. if !a.IsValid() {
  211. return ""
  212. }
  213. name := a.Addr().String()
  214. node, ok := tailnetNodesByAddr[a.Addr()]
  215. if !ok {
  216. node, ok = nodesByAddr[a.Addr()]
  217. }
  218. if ok {
  219. switch *resolveAddrs {
  220. case "nodeid":
  221. name = cmp.Or(string(node.NodeID), name)
  222. case "name":
  223. name = cmp.Or(strings.TrimSuffix(string(node.Name), tailnetDNS), name)
  224. case "user":
  225. name = cmp.Or(bools.IfElse(len(node.Tags) > 0, fmt.Sprint(node.Tags), node.User), name)
  226. }
  227. }
  228. if a.Port() != 0 {
  229. return name + ":" + strconv.Itoa(int(a.Port()))
  230. }
  231. return name
  232. }
  233. for _, cc := range traffic {
  234. row := [7]string{
  235. 0: " ",
  236. 1: formatAddrPort(cc.Src),
  237. 2: formatAddrPort(cc.Dst),
  238. 3: formatSI(float64(cc.TxPackets) / duration.Seconds()),
  239. 4: formatIEC(float64(cc.TxBytes) / duration.Seconds()),
  240. 5: formatSI(float64(cc.RxPackets) / duration.Seconds()),
  241. 6: formatIEC(float64(cc.RxBytes) / duration.Seconds()),
  242. }
  243. if cc.Proto > 0 {
  244. row[0] += cc.Proto.String() + ":"
  245. }
  246. rows = append(rows, row)
  247. }
  248. }
  249. addRows("VirtualTraffic", msg.VirtualTraffic)
  250. addRows("SubnetTraffic", msg.SubnetTraffic)
  251. addRows("ExitTraffic", msg.ExitTraffic)
  252. addRows("PhysicalTraffic", msg.PhysicalTraffic)
  253. // Compute the maximum width of each field.
  254. var maxWidths [7]int
  255. for _, row := range rows {
  256. for i, col := range row {
  257. if maxWidths[i] < len(col) && !(i == 0 && !strings.HasPrefix(col, " ")) {
  258. maxWidths[i] = len(col)
  259. }
  260. }
  261. }
  262. var maxSum int
  263. for _, n := range maxWidths {
  264. maxSum += n
  265. }
  266. // Output a table of network traffic per connection.
  267. line := make([]byte, 0, maxSum+len(" ")+len(" -> ")+4*len(" "))
  268. line = appendRepeatByte(line, '=', cap(line))
  269. fmt.Println(string(line))
  270. if !msg.Logtail.ID.IsZero() {
  271. fmt.Printf("LogID: %s\n", msg.Logtail.ID)
  272. }
  273. if msg.NodeID != "" {
  274. fmt.Printf("NodeID: %s\n", msg.NodeID)
  275. }
  276. formatTime := func(t time.Time) string {
  277. return t.In(time.Local).Format("2006-01-02 15:04:05.000")
  278. }
  279. switch {
  280. case !msg.Logged.IsZero():
  281. fmt.Printf("Logged: %s\n", formatTime(msg.Logged))
  282. case !msg.Logtail.Logged.IsZero():
  283. fmt.Printf("Logged: %s\n", formatTime(msg.Logtail.Logged))
  284. }
  285. fmt.Printf("Window: %s (%0.3fs)\n", formatTime(msg.Start), duration.Seconds())
  286. for i, row := range rows {
  287. line = line[:0]
  288. isHeading := !strings.HasPrefix(row[0], " ")
  289. for j, col := range row {
  290. if isHeading && j == 0 {
  291. col = "" // headings will be printed later
  292. }
  293. switch j {
  294. case 0, 2: // left justified
  295. line = append(line, col...)
  296. line = appendRepeatByte(line, ' ', maxWidths[j]-len(col))
  297. case 1, 3, 4, 5, 6: // right justified
  298. line = appendRepeatByte(line, ' ', maxWidths[j]-len(col))
  299. line = append(line, col...)
  300. }
  301. switch j {
  302. case 0:
  303. line = append(line, " "...)
  304. case 1:
  305. if row[1] == "" && row[2] == "" {
  306. line = append(line, " "...)
  307. } else {
  308. line = append(line, " -> "...)
  309. }
  310. case 2, 3, 4, 5:
  311. line = append(line, " "...)
  312. }
  313. }
  314. switch {
  315. case i == 0: // print dashed-line table heading
  316. line = appendRepeatByte(line[:0], '-', maxWidths[0]+len(" ")+maxWidths[1]+len(" -> ")+maxWidths[2])[:cap(line)]
  317. case isHeading:
  318. copy(line[:], row[0])
  319. }
  320. fmt.Println(string(line))
  321. }
  322. }
  323. func mustLoadTailnetNodes() {
  324. switch {
  325. case *apiKey == "" && *tailnetName == "":
  326. return // rely on embedded node information in the logs themselves
  327. case *apiKey == "":
  328. log.Fatalf("--api-key must be specified with --resolve-names")
  329. case *tailnetName == "":
  330. log.Fatalf("--tailnet must be specified with --resolve-names")
  331. }
  332. // Query the Tailscale API for a list of devices in the tailnet.
  333. const apiURL = "https://api.tailscale.com/api/v2"
  334. req := must.Get(http.NewRequest("GET", apiURL+"/tailnet/"+*tailnetName+"/devices", nil))
  335. req.Header.Add("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(*apiKey+":")))
  336. resp := must.Get(http.DefaultClient.Do(req))
  337. defer resp.Body.Close()
  338. b := must.Get(io.ReadAll(resp.Body))
  339. if resp.StatusCode != 200 {
  340. log.Fatalf("http: %v: %s", http.StatusText(resp.StatusCode), b)
  341. }
  342. // Unmarshal the API response.
  343. var m struct {
  344. Devices []netlogtype.Node `json:"devices"`
  345. }
  346. must.Do(json.Unmarshal(b, &m))
  347. // Construct a mapping of Tailscale IP addresses to node information.
  348. tailnetNodesByAddr = make(map[netip.Addr]netlogtype.Node)
  349. tailnetNodesByID = make(map[tailcfg.StableNodeID]netlogtype.Node)
  350. for _, node := range m.Devices {
  351. for _, addr := range node.Addresses {
  352. tailnetNodesByAddr[addr] = node
  353. }
  354. tailnetNodesByID[node.NodeID] = node
  355. }
  356. }
  357. func appendRepeatByte(b []byte, c byte, n int) []byte {
  358. for range n {
  359. b = append(b, c)
  360. }
  361. return b
  362. }
  363. func formatSI(n float64) string {
  364. switch n := math.Abs(n); {
  365. case n < 1e3:
  366. return fmt.Sprintf("%0.2f ", n/(1e0))
  367. case n < 1e6:
  368. return fmt.Sprintf("%0.2fk", n/(1e3))
  369. case n < 1e9:
  370. return fmt.Sprintf("%0.2fM", n/(1e6))
  371. default:
  372. return fmt.Sprintf("%0.2fG", n/(1e9))
  373. }
  374. }
  375. func formatIEC(n float64) string {
  376. switch n := math.Abs(n); {
  377. case n < 1<<10:
  378. return fmt.Sprintf("%0.2f ", n/(1<<0))
  379. case n < 1<<20:
  380. return fmt.Sprintf("%0.2fKi", n/(1<<10))
  381. case n < 1<<30:
  382. return fmt.Sprintf("%0.2fMi", n/(1<<20))
  383. default:
  384. return fmt.Sprintf("%0.2fGi", n/(1<<30))
  385. }
  386. }