logreprocess.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // The logreprocess program tails a log and reprocesses it.
  4. package main
  5. import (
  6. "bufio"
  7. "encoding/json"
  8. "flag"
  9. "io"
  10. "log"
  11. "net/http"
  12. "os"
  13. "strings"
  14. "time"
  15. "tailscale.com/types/logid"
  16. )
  17. func main() {
  18. collection := flag.String("c", "", "logtail collection name to read")
  19. apiKey := flag.String("p", "", "logtail API key")
  20. timeout := flag.Duration("t", 0, "timeout after which logreprocess quits")
  21. flag.Parse()
  22. if len(flag.Args()) != 0 {
  23. flag.Usage()
  24. os.Exit(1)
  25. }
  26. log.SetFlags(0)
  27. if *timeout != 0 {
  28. go func() {
  29. <-time.After(*timeout)
  30. log.Printf("logreprocess: timeout reached, quitting")
  31. os.Exit(1)
  32. }()
  33. }
  34. req, err := http.NewRequest("GET", "https://log.tailscale.com/c/"+*collection+"?stream=true", nil)
  35. if err != nil {
  36. log.Fatal(err)
  37. }
  38. req.SetBasicAuth(*apiKey, "")
  39. resp, err := http.DefaultClient.Do(req)
  40. if err != nil {
  41. log.Fatal(err)
  42. }
  43. defer resp.Body.Close()
  44. if resp.StatusCode != 200 {
  45. b, err := io.ReadAll(resp.Body)
  46. if err != nil {
  47. log.Fatalf("logreprocess: read error %d: %v", resp.StatusCode, err)
  48. }
  49. log.Fatalf("logreprocess: read error %d: %s", resp.StatusCode, string(b))
  50. }
  51. tracebackCache := make(map[logid.PublicID]*ProcessedMsg)
  52. scanner := bufio.NewScanner(resp.Body)
  53. for scanner.Scan() {
  54. var msg Msg
  55. if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil {
  56. log.Fatalf("logreprocess of %q: %v", string(scanner.Bytes()), err)
  57. }
  58. var pMsg *ProcessedMsg
  59. if pMsg = tracebackCache[msg.Logtail.Instance]; pMsg != nil {
  60. pMsg.Text += "\n" + msg.Text
  61. if strings.HasPrefix(msg.Text, "Exception: ") {
  62. delete(tracebackCache, msg.Logtail.Instance)
  63. } else {
  64. continue // write later
  65. }
  66. } else {
  67. pMsg = &ProcessedMsg{
  68. OrigInstance: msg.Logtail.Instance,
  69. Text: msg.Text,
  70. }
  71. pMsg.Logtail.ClientTime = msg.Logtail.ClientTime
  72. }
  73. if strings.HasPrefix(msg.Text, "Traceback (most recent call last):") {
  74. tracebackCache[msg.Logtail.Instance] = pMsg
  75. continue // write later
  76. }
  77. b, err := json.Marshal(pMsg)
  78. if err != nil {
  79. log.Fatal(err)
  80. }
  81. log.Printf("%s", b)
  82. }
  83. if err := scanner.Err(); err != nil {
  84. log.Fatal(err)
  85. }
  86. }
  87. type Msg struct {
  88. Logtail struct {
  89. Instance logid.PublicID `json:"instance"`
  90. ClientTime time.Time `json:"client_time"`
  91. } `json:"logtail"`
  92. Text string `json:"text"`
  93. }
  94. type ProcessedMsg struct {
  95. Logtail struct {
  96. ClientTime time.Time `json:"client_time"`
  97. } `json:"logtail"`
  98. OrigInstance logid.PublicID `json:"orig_instance"`
  99. Text string `json:"text"`
  100. }