logtail.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // Package logtail sends logs to log.tailscale.io.
  4. package logtail
  5. import (
  6. "bytes"
  7. "context"
  8. "crypto/rand"
  9. "encoding/binary"
  10. "fmt"
  11. "io"
  12. "log"
  13. mrand "math/rand/v2"
  14. "net/http"
  15. "net/netip"
  16. "os"
  17. "regexp"
  18. "runtime"
  19. "slices"
  20. "strconv"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/go-json-experiment/json/jsontext"
  25. "tailscale.com/envknob"
  26. "tailscale.com/net/netmon"
  27. "tailscale.com/net/sockstats"
  28. "tailscale.com/net/tsaddr"
  29. "tailscale.com/tstime"
  30. tslogger "tailscale.com/types/logger"
  31. "tailscale.com/types/logid"
  32. "tailscale.com/util/set"
  33. "tailscale.com/util/truncate"
  34. "tailscale.com/util/zstdframe"
  35. )
  36. // maxSize is the maximum size that a single log entry can be.
  37. // It is also the maximum body size that may be uploaded at a time.
  38. const maxSize = 256 << 10
  39. // maxTextSize is the maximum size for a text log message.
  40. // Note that JSON log messages can be as large as maxSize.
  41. const maxTextSize = 16 << 10
  42. // lowMemRatio reduces maxSize and maxTextSize by this ratio in lowMem mode.
  43. const lowMemRatio = 4
  44. // bufferSize is the typical buffer size to retain.
  45. // It is large enough to handle most log messages,
  46. // but not too large to be a notable waste of memory if retained forever.
  47. const bufferSize = 4 << 10
  48. // DefaultHost is the default host name to upload logs to when
  49. // Config.BaseURL isn't provided.
  50. const DefaultHost = "log.tailscale.io"
  51. const defaultFlushDelay = 2 * time.Second
  52. const (
  53. // CollectionNode is the name of a logtail Config.Collection
  54. // for tailscaled (or equivalent: IPNExtension, Android app).
  55. CollectionNode = "tailnode.log.tailscale.io"
  56. )
  57. type Config struct {
  58. Collection string // collection name, a domain name
  59. PrivateID logid.PrivateID // private ID for the primary log stream
  60. CopyPrivateID logid.PrivateID // private ID for a log stream that is a superset of this log stream
  61. BaseURL string // if empty defaults to "https://log.tailscale.io"
  62. HTTPC *http.Client // if empty defaults to http.DefaultClient
  63. SkipClientTime bool // if true, client_time is not written to logs
  64. LowMemory bool // if true, logtail minimizes memory use
  65. Clock tstime.Clock // if set, Clock.Now substitutes uses of time.Now
  66. Stderr io.Writer // if set, logs are sent here instead of os.Stderr
  67. StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only
  68. Buffer Buffer // temp storage, if nil a MemoryBuffer
  69. CompressLogs bool // whether to compress the log uploads
  70. // MetricsDelta, if non-nil, is a func that returns an encoding
  71. // delta in clientmetrics to upload alongside existing logs.
  72. // It can return either an empty string (for nothing) or a string
  73. // that's safe to embed in a JSON string literal without further escaping.
  74. MetricsDelta func() string
  75. // FlushDelayFn, if non-nil is a func that returns how long to wait to
  76. // accumulate logs before uploading them. 0 or negative means to upload
  77. // immediately.
  78. //
  79. // If nil, a default value is used. (currently 2 seconds)
  80. FlushDelayFn func() time.Duration
  81. // IncludeProcID, if true, results in an ephemeral process identifier being
  82. // included in logs. The ID is random and not guaranteed to be globally
  83. // unique, but it can be used to distinguish between different instances
  84. // running with same PrivateID.
  85. IncludeProcID bool
  86. // IncludeProcSequence, if true, results in an ephemeral sequence number
  87. // being included in the logs. The sequence number is incremented for each
  88. // log message sent, but is not persisted across process restarts.
  89. IncludeProcSequence bool
  90. }
  91. func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
  92. if cfg.BaseURL == "" {
  93. cfg.BaseURL = "https://" + DefaultHost
  94. }
  95. if cfg.HTTPC == nil {
  96. cfg.HTTPC = http.DefaultClient
  97. }
  98. if cfg.Clock == nil {
  99. cfg.Clock = tstime.StdClock{}
  100. }
  101. if cfg.Stderr == nil {
  102. cfg.Stderr = os.Stderr
  103. }
  104. if cfg.Buffer == nil {
  105. pendingSize := 256
  106. if cfg.LowMemory {
  107. pendingSize = 64
  108. }
  109. cfg.Buffer = NewMemoryBuffer(pendingSize)
  110. }
  111. var procID uint32
  112. if cfg.IncludeProcID {
  113. keyBytes := make([]byte, 4)
  114. rand.Read(keyBytes)
  115. procID = binary.LittleEndian.Uint32(keyBytes)
  116. if procID == 0 {
  117. // 0 is the empty/off value, assign a different (non-zero) value to
  118. // make sure we still include an ID (actual value does not matter).
  119. procID = 7
  120. }
  121. }
  122. if s := envknob.String("TS_DEBUG_LOGTAIL_FLUSHDELAY"); s != "" {
  123. if delay, err := time.ParseDuration(s); err == nil {
  124. cfg.FlushDelayFn = func() time.Duration { return delay }
  125. } else {
  126. log.Fatalf("invalid TS_DEBUG_LOGTAIL_FLUSHDELAY: %v", err)
  127. }
  128. } else if cfg.FlushDelayFn == nil && envknob.Bool("IN_TS_TEST") {
  129. cfg.FlushDelayFn = func() time.Duration { return 0 }
  130. }
  131. var urlSuffix string
  132. if !cfg.CopyPrivateID.IsZero() {
  133. urlSuffix = "?copyId=" + cfg.CopyPrivateID.String()
  134. }
  135. l := &Logger{
  136. privateID: cfg.PrivateID,
  137. stderr: cfg.Stderr,
  138. stderrLevel: int64(cfg.StderrLevel),
  139. httpc: cfg.HTTPC,
  140. url: cfg.BaseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String() + urlSuffix,
  141. lowMem: cfg.LowMemory,
  142. buffer: cfg.Buffer,
  143. skipClientTime: cfg.SkipClientTime,
  144. drainWake: make(chan struct{}, 1),
  145. sentinel: make(chan int32, 16),
  146. flushDelayFn: cfg.FlushDelayFn,
  147. clock: cfg.Clock,
  148. metricsDelta: cfg.MetricsDelta,
  149. procID: procID,
  150. includeProcSequence: cfg.IncludeProcSequence,
  151. shutdownStart: make(chan struct{}),
  152. shutdownDone: make(chan struct{}),
  153. }
  154. l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
  155. l.compressLogs = cfg.CompressLogs
  156. ctx, cancel := context.WithCancel(context.Background())
  157. l.uploadCancel = cancel
  158. go l.uploading(ctx)
  159. l.Write([]byte("logtail started"))
  160. return l
  161. }
  162. // Logger writes logs, splitting them as configured between local
  163. // logging facilities and uploading to a log server.
  164. type Logger struct {
  165. stderr io.Writer
  166. stderrLevel int64 // accessed atomically
  167. httpc *http.Client
  168. url string
  169. lowMem bool
  170. skipClientTime bool
  171. netMonitor *netmon.Monitor
  172. buffer Buffer
  173. drainWake chan struct{} // signal to speed up drain
  174. drainBuf []byte // owned by drainPending for reuse
  175. flushDelayFn func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay
  176. flushPending atomic.Bool
  177. sentinel chan int32
  178. clock tstime.Clock
  179. compressLogs bool
  180. uploadCancel func()
  181. explainedRaw bool
  182. metricsDelta func() string // or nil
  183. privateID logid.PrivateID
  184. httpDoCalls atomic.Int32
  185. sockstatsLabel atomicSocktatsLabel
  186. procID uint32
  187. includeProcSequence bool
  188. writeLock sync.Mutex // guards procSequence, flushTimer, buffer.Write calls
  189. procSequence uint64
  190. flushTimer tstime.TimerController // used when flushDelay is >0
  191. writeBuf [bufferSize]byte // owned by Write for reuse
  192. jsonDec jsontext.Decoder // owned by appendTextOrJSONLocked for reuse
  193. shutdownStartMu sync.Mutex // guards the closing of shutdownStart
  194. shutdownStart chan struct{} // closed when shutdown begins
  195. shutdownDone chan struct{} // closed when shutdown complete
  196. }
  197. type atomicSocktatsLabel struct{ p atomic.Uint32 }
  198. func (p *atomicSocktatsLabel) Load() sockstats.Label { return sockstats.Label(p.p.Load()) }
  199. func (p *atomicSocktatsLabel) Store(label sockstats.Label) { p.p.Store(uint32(label)) }
  200. // SetVerbosityLevel controls the verbosity level that should be
  201. // written to stderr. 0 is the default (not verbose). Levels 1 or higher
  202. // are increasingly verbose.
  203. func (l *Logger) SetVerbosityLevel(level int) {
  204. atomic.StoreInt64(&l.stderrLevel, int64(level))
  205. }
  206. // SetNetMon sets the network monitor.
  207. //
  208. // It should not be changed concurrently with log writes and should
  209. // only be set once.
  210. func (l *Logger) SetNetMon(lm *netmon.Monitor) {
  211. l.netMonitor = lm
  212. }
  213. // SetSockstatsLabel sets the label used in sockstat logs to identify network traffic from this logger.
  214. func (l *Logger) SetSockstatsLabel(label sockstats.Label) {
  215. l.sockstatsLabel.Store(label)
  216. }
  217. // PrivateID returns the logger's private log ID.
  218. //
  219. // It exists for internal use only.
  220. func (l *Logger) PrivateID() logid.PrivateID { return l.privateID }
  221. // Shutdown gracefully shuts down the logger while completing any
  222. // remaining uploads.
  223. //
  224. // It will block, continuing to try and upload unless the passed
  225. // context object interrupts it by being done.
  226. // If the shutdown is interrupted, an error is returned.
  227. func (l *Logger) Shutdown(ctx context.Context) error {
  228. done := make(chan struct{})
  229. go func() {
  230. select {
  231. case <-ctx.Done():
  232. l.uploadCancel()
  233. <-l.shutdownDone
  234. case <-l.shutdownDone:
  235. }
  236. close(done)
  237. l.httpc.CloseIdleConnections()
  238. }()
  239. l.shutdownStartMu.Lock()
  240. select {
  241. case <-l.shutdownStart:
  242. l.shutdownStartMu.Unlock()
  243. return nil
  244. default:
  245. }
  246. close(l.shutdownStart)
  247. l.shutdownStartMu.Unlock()
  248. io.WriteString(l, "logger closing down\n")
  249. <-done
  250. return nil
  251. }
  252. // Close shuts down this logger object, the background log uploader
  253. // process, and any associated goroutines.
  254. //
  255. // Deprecated: use Shutdown
  256. func (l *Logger) Close() {
  257. l.Shutdown(context.Background())
  258. }
  259. // drainBlock is called by drainPending when there are no logs to drain.
  260. //
  261. // In typical operation, every call to the Write method unblocks and triggers a
  262. // buffer.TryReadline, so logs are written with very low latency.
  263. //
  264. // If the caller specified FlushInterface, drainWake is only sent to
  265. // periodically.
  266. func (l *Logger) drainBlock() (shuttingDown bool) {
  267. select {
  268. case <-l.shutdownStart:
  269. return true
  270. case <-l.drainWake:
  271. }
  272. return false
  273. }
  274. // drainPending drains and encodes a batch of logs from the buffer for upload.
  275. // If no logs are available, drainPending blocks until logs are available.
  276. // The returned buffer is only valid until the next call to drainPending.
  277. func (l *Logger) drainPending() (b []byte) {
  278. b = l.drainBuf[:0]
  279. b = append(b, '[')
  280. defer func() {
  281. b = bytes.TrimRight(b, ",")
  282. b = append(b, ']')
  283. l.drainBuf = b
  284. if len(b) <= len("[]") {
  285. b = nil
  286. }
  287. }()
  288. maxLen := maxSize
  289. if l.lowMem {
  290. // When operating in a low memory environment, it is better to upload
  291. // in multiple operations than it is to allocate a large body and OOM.
  292. // Even if maxLen is less than maxSize, we can still upload an entry
  293. // that is up to maxSize if we happen to encounter one.
  294. maxLen /= lowMemRatio
  295. }
  296. for len(b) < maxLen {
  297. line, err := l.buffer.TryReadLine()
  298. switch {
  299. case err == io.EOF:
  300. return b
  301. case err != nil:
  302. b = append(b, '{')
  303. b = l.appendMetadata(b, false, true, 0, 0, "reading ringbuffer: "+err.Error(), nil, 0)
  304. b = bytes.TrimRight(b, ",")
  305. b = append(b, '}')
  306. return b
  307. case line == nil:
  308. // If we read at least some log entries, return immediately.
  309. if len(b) > len("[") {
  310. return b
  311. }
  312. // We're about to block. If we're holding on to too much memory
  313. // in our buffer from a previous large write, let it go.
  314. if cap(b) > bufferSize {
  315. b = bytes.Clone(b)
  316. l.drainBuf = b
  317. }
  318. if shuttingDown := l.drainBlock(); shuttingDown {
  319. return b
  320. }
  321. continue
  322. }
  323. switch {
  324. case len(line) == 0:
  325. continue
  326. case line[0] == '{' && jsontext.Value(line).IsValid():
  327. // This is already a valid JSON object, so just append it.
  328. // This may exceed maxLen, but should be no larger than maxSize
  329. // so long as logic writing into the buffer enforces the limit.
  330. b = append(b, line...)
  331. default:
  332. // This is probably a log added to stderr by filch
  333. // outside of the logtail logger. Encode it.
  334. if !l.explainedRaw {
  335. fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
  336. fmt.Fprintf(l.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n")
  337. fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
  338. fmt.Fprintf(l.stderr, "RAW-STDERR:\n")
  339. l.explainedRaw = true
  340. }
  341. fmt.Fprintf(l.stderr, "RAW-STDERR: %s", b)
  342. // Do not add a client time, as it could be really old.
  343. // Do not include instance key or ID either,
  344. // since this came from a different instance.
  345. b = l.appendText(b, line, true, 0, 0, 0)
  346. }
  347. b = append(b, ',')
  348. }
  349. return b
  350. }
  351. // This is the goroutine that repeatedly uploads logs in the background.
  352. func (l *Logger) uploading(ctx context.Context) {
  353. defer close(l.shutdownDone)
  354. for {
  355. body := l.drainPending()
  356. origlen := -1 // sentinel value: uncompressed
  357. // Don't attempt to compress tiny bodies; not worth the CPU cycles.
  358. if l.compressLogs && len(body) > 256 {
  359. zbody := zstdframe.AppendEncode(nil, body,
  360. zstdframe.FastestCompression, zstdframe.LowMemory(true))
  361. // Only send it compressed if the bandwidth savings are sufficient.
  362. // Just the extra headers associated with enabling compression
  363. // are 50 bytes by themselves.
  364. if len(body)-len(zbody) > 64 {
  365. origlen = len(body)
  366. body = zbody
  367. }
  368. }
  369. var lastError string
  370. var numFailures int
  371. var firstFailure time.Time
  372. for len(body) > 0 && ctx.Err() == nil {
  373. retryAfter, err := l.upload(ctx, body, origlen)
  374. if err != nil {
  375. numFailures++
  376. firstFailure = l.clock.Now()
  377. if !l.internetUp() {
  378. fmt.Fprintf(l.stderr, "logtail: internet down; waiting\n")
  379. l.awaitInternetUp(ctx)
  380. continue
  381. }
  382. // Only print the same message once.
  383. if currError := err.Error(); lastError != currError {
  384. fmt.Fprintf(l.stderr, "logtail: upload: %v\n", err)
  385. lastError = currError
  386. }
  387. // Sleep for the specified retryAfter period,
  388. // otherwise default to some random value.
  389. if retryAfter <= 0 {
  390. retryAfter = mrand.N(30*time.Second) + 30*time.Second
  391. }
  392. tstime.Sleep(ctx, retryAfter)
  393. } else {
  394. // Only print a success message after recovery.
  395. if numFailures > 0 {
  396. fmt.Fprintf(l.stderr, "logtail: upload succeeded after %d failures and %s\n", numFailures, l.clock.Since(firstFailure).Round(time.Second))
  397. }
  398. break
  399. }
  400. }
  401. select {
  402. case <-l.shutdownStart:
  403. return
  404. default:
  405. }
  406. }
  407. }
  408. func (l *Logger) internetUp() bool {
  409. if l.netMonitor == nil {
  410. // No way to tell, so assume it is.
  411. return true
  412. }
  413. return l.netMonitor.InterfaceState().AnyInterfaceUp()
  414. }
  415. func (l *Logger) awaitInternetUp(ctx context.Context) {
  416. upc := make(chan bool, 1)
  417. defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
  418. if delta.New.AnyInterfaceUp() {
  419. select {
  420. case upc <- true:
  421. default:
  422. }
  423. }
  424. })()
  425. if l.internetUp() {
  426. return
  427. }
  428. select {
  429. case <-upc:
  430. fmt.Fprintf(l.stderr, "logtail: internet back up\n")
  431. case <-ctx.Done():
  432. }
  433. }
  434. // upload uploads body to the log server.
  435. // origlen indicates the pre-compression body length.
  436. // origlen of -1 indicates that the body is not compressed.
  437. func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAfter time.Duration, err error) {
  438. const maxUploadTime = 45 * time.Second
  439. ctx = sockstats.WithSockStats(ctx, l.sockstatsLabel.Load(), l.Logf)
  440. ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
  441. defer cancel()
  442. req, err := http.NewRequestWithContext(ctx, "POST", l.url, bytes.NewReader(body))
  443. if err != nil {
  444. // I know of no conditions under which this could fail.
  445. // Report it very loudly.
  446. // TODO record logs to disk
  447. panic("logtail: cannot build http request: " + err.Error())
  448. }
  449. if origlen != -1 {
  450. req.Header.Add("Content-Encoding", "zstd")
  451. req.Header.Add("Orig-Content-Length", strconv.Itoa(origlen))
  452. }
  453. if runtime.GOOS == "js" {
  454. // We once advertised we'd accept optional client certs (for internal use)
  455. // on log.tailscale.io but then Tailscale SSH js/wasm clients prompted
  456. // users (on some browsers?) to pick a client cert. We'll fix the server's
  457. // TLS ServerHello, but we can also fix it client side for good measure.
  458. //
  459. // Corp details: https://github.com/tailscale/corp/issues/18177#issuecomment-2026598715
  460. // and https://github.com/tailscale/corp/pull/18775#issuecomment-2027505036
  461. //
  462. // See https://github.com/golang/go/wiki/WebAssembly#configuring-fetch-options-while-using-nethttp
  463. // and https://developer.mozilla.org/en-US/docs/Web/API/fetch#credentials
  464. req.Header.Set("js.fetch:credentials", "omit")
  465. }
  466. req.Header["User-Agent"] = nil // not worth writing one; save some bytes
  467. compressedNote := "not-compressed"
  468. if origlen != -1 {
  469. compressedNote = "compressed"
  470. }
  471. l.httpDoCalls.Add(1)
  472. resp, err := l.httpc.Do(req)
  473. if err != nil {
  474. return 0, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err)
  475. }
  476. defer resp.Body.Close()
  477. if resp.StatusCode != http.StatusOK {
  478. n, _ := strconv.Atoi(resp.Header.Get("Retry-After"))
  479. b, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10))
  480. return time.Duration(n) * time.Second, fmt.Errorf("log upload of %d bytes %s failed %d: %s", len(body), compressedNote, resp.StatusCode, bytes.TrimSpace(b))
  481. }
  482. return 0, nil
  483. }
  484. // Flush uploads all logs to the server. It blocks until complete or there is an
  485. // unrecoverable error.
  486. //
  487. // TODO(bradfitz): this apparently just returns nil, as of tailscale/corp@9c2ec35.
  488. // Finish cleaning this up.
  489. func (l *Logger) Flush() error {
  490. return nil
  491. }
  492. // StartFlush starts a log upload, if anything is pending.
  493. //
  494. // If l is nil, StartFlush is a no-op.
  495. func (l *Logger) StartFlush() {
  496. if l != nil {
  497. l.tryDrainWake()
  498. }
  499. }
  500. // logtailDisabled is whether logtail uploads to logcatcher are disabled.
  501. var logtailDisabled atomic.Bool
  502. // Disable disables logtail uploads for the lifetime of the process.
  503. func Disable() {
  504. logtailDisabled.Store(true)
  505. }
  506. var debugWakesAndUploads = envknob.RegisterBool("TS_DEBUG_LOGTAIL_WAKES")
  507. // tryDrainWake tries to send to lg.drainWake, to cause an uploading wakeup.
  508. // It does not block.
  509. func (l *Logger) tryDrainWake() {
  510. l.flushPending.Store(false)
  511. if debugWakesAndUploads() {
  512. // Using println instead of log.Printf here to avoid recursing back into
  513. // ourselves.
  514. println("logtail: try drain wake, numHTTP:", l.httpDoCalls.Load())
  515. }
  516. select {
  517. case l.drainWake <- struct{}{}:
  518. default:
  519. }
  520. }
  521. func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
  522. tapSend(jsonBlob)
  523. if logtailDisabled.Load() {
  524. return len(jsonBlob), nil
  525. }
  526. n, err := l.buffer.Write(jsonBlob)
  527. flushDelay := defaultFlushDelay
  528. if l.flushDelayFn != nil {
  529. flushDelay = l.flushDelayFn()
  530. }
  531. if flushDelay > 0 {
  532. if l.flushPending.CompareAndSwap(false, true) {
  533. if l.flushTimer == nil {
  534. l.flushTimer = l.clock.AfterFunc(flushDelay, l.tryDrainWake)
  535. } else {
  536. l.flushTimer.Reset(flushDelay)
  537. }
  538. }
  539. } else {
  540. l.tryDrainWake()
  541. }
  542. return n, err
  543. }
  544. // appendMetadata appends optional "logtail", "metrics", and "v" JSON members.
  545. // This assumes dst is already within a JSON object.
  546. // Each member is comma-terminated.
  547. func (l *Logger) appendMetadata(dst []byte, skipClientTime, skipMetrics bool, procID uint32, procSequence uint64, errDetail string, errData jsontext.Value, level int) []byte {
  548. // Append optional logtail metadata.
  549. if !skipClientTime || procID != 0 || procSequence != 0 || errDetail != "" || errData != nil {
  550. dst = append(dst, `"logtail":{`...)
  551. if !skipClientTime {
  552. dst = append(dst, `"client_time":"`...)
  553. dst = l.clock.Now().UTC().AppendFormat(dst, time.RFC3339Nano)
  554. dst = append(dst, '"', ',')
  555. }
  556. if procID != 0 {
  557. dst = append(dst, `"proc_id":`...)
  558. dst = strconv.AppendUint(dst, uint64(procID), 10)
  559. dst = append(dst, ',')
  560. }
  561. if procSequence != 0 {
  562. dst = append(dst, `"proc_seq":`...)
  563. dst = strconv.AppendUint(dst, procSequence, 10)
  564. dst = append(dst, ',')
  565. }
  566. if errDetail != "" || errData != nil {
  567. dst = append(dst, `"error":{`...)
  568. if errDetail != "" {
  569. dst = append(dst, `"detail":`...)
  570. dst, _ = jsontext.AppendQuote(dst, errDetail)
  571. dst = append(dst, ',')
  572. }
  573. if errData != nil {
  574. dst = append(dst, `"bad_data":`...)
  575. dst = append(dst, errData...)
  576. dst = append(dst, ',')
  577. }
  578. dst = bytes.TrimRight(dst, ",")
  579. dst = append(dst, '}', ',')
  580. }
  581. dst = bytes.TrimRight(dst, ",")
  582. dst = append(dst, '}', ',')
  583. }
  584. // Append optional metrics metadata.
  585. if !skipMetrics && l.metricsDelta != nil {
  586. if d := l.metricsDelta(); d != "" {
  587. dst = append(dst, `"metrics":"`...)
  588. dst = append(dst, d...)
  589. dst = append(dst, '"', ',')
  590. }
  591. }
  592. // Add the optional log level, if non-zero.
  593. // Note that we only use log levels 1 and 2 currently.
  594. // It's unlikely we'll ever make it past 9.
  595. if level > 0 && level < 10 {
  596. dst = append(dst, `"v":`...)
  597. dst = append(dst, '0'+byte(level))
  598. dst = append(dst, ',')
  599. }
  600. return dst
  601. }
  602. // appendText appends a raw text message in the Tailscale JSON log entry format.
  603. func (l *Logger) appendText(dst, src []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
  604. dst = slices.Grow(dst, len(src))
  605. dst = append(dst, '{')
  606. dst = l.appendMetadata(dst, skipClientTime, false, procID, procSequence, "", nil, level)
  607. if len(src) == 0 {
  608. dst = bytes.TrimRight(dst, ",")
  609. return append(dst, "}\n"...)
  610. }
  611. // Append the text string, which may be truncated.
  612. // Invalid UTF-8 will be mangled with the Unicode replacement character.
  613. max := maxTextSize
  614. if l.lowMem {
  615. max /= lowMemRatio
  616. }
  617. dst = append(dst, `"text":`...)
  618. dst = appendTruncatedString(dst, src, max)
  619. return append(dst, "}\n"...)
  620. }
  621. // appendTruncatedString appends a JSON string for src,
  622. // truncating the src to be no larger than n.
  623. func appendTruncatedString(dst, src []byte, n int) []byte {
  624. srcLen := len(src)
  625. src = truncate.String(src, n)
  626. dst, _ = jsontext.AppendQuote(dst, src) // ignore error; only occurs for invalid UTF-8
  627. if srcLen > len(src) {
  628. dst = dst[:len(dst)-len(`"`)] // trim off preceding double-quote
  629. dst = append(dst, "…+"...)
  630. dst = strconv.AppendInt(dst, int64(srcLen-len(src)), 10)
  631. dst = append(dst, '"') // re-append succeeding double-quote
  632. }
  633. return dst
  634. }
  635. func (l *Logger) AppendTextOrJSONLocked(dst, src []byte) []byte {
  636. l.clock = tstime.StdClock{}
  637. return l.appendTextOrJSONLocked(dst, src, 0)
  638. }
  639. // appendTextOrJSONLocked appends a raw text message or a raw JSON object
  640. // in the Tailscale JSON log format.
  641. func (l *Logger) appendTextOrJSONLocked(dst, src []byte, level int) []byte {
  642. if l.includeProcSequence {
  643. l.procSequence++
  644. }
  645. if len(src) == 0 || src[0] != '{' {
  646. return l.appendText(dst, src, l.skipClientTime, l.procID, l.procSequence, level)
  647. }
  648. // Check whether the input is a valid JSON object and
  649. // whether it contains the reserved "logtail" name at the top-level.
  650. var logtailKeyOffset, logtailValOffset, logtailValLength int
  651. validJSON := func() bool {
  652. // TODO(dsnet): Avoid allocation of bytes.Buffer struct.
  653. dec := &l.jsonDec
  654. dec.Reset(bytes.NewBuffer(src))
  655. if tok, err := dec.ReadToken(); tok.Kind() != '{' || err != nil {
  656. return false
  657. }
  658. for dec.PeekKind() != '}' {
  659. keyOffset := dec.InputOffset()
  660. tok, err := dec.ReadToken()
  661. if err != nil {
  662. return false
  663. }
  664. isLogtail := tok.String() == "logtail"
  665. valOffset := dec.InputOffset()
  666. if dec.SkipValue() != nil {
  667. return false
  668. }
  669. if isLogtail {
  670. logtailKeyOffset = int(keyOffset)
  671. logtailValOffset = int(valOffset)
  672. logtailValLength = int(dec.InputOffset()) - logtailValOffset
  673. }
  674. }
  675. if tok, err := dec.ReadToken(); tok.Kind() != '}' || err != nil {
  676. return false
  677. }
  678. if _, err := dec.ReadToken(); err != io.EOF {
  679. return false // trailing junk after JSON object
  680. }
  681. return true
  682. }()
  683. // Treat invalid JSON as a raw text message.
  684. if !validJSON {
  685. return l.appendText(dst, src, l.skipClientTime, l.procID, l.procSequence, level)
  686. }
  687. // Check whether the JSON payload is too large.
  688. // Due to logtail metadata, the formatted log entry could exceed maxSize.
  689. // That's okay as the Tailscale log service limit is actually 2*maxSize.
  690. // However, so long as logging applications aim to target the maxSize limit,
  691. // there should be no trouble eventually uploading logs.
  692. if len(src) > maxSize {
  693. errDetail := fmt.Sprintf("entry too large: %d bytes", len(src))
  694. errData := appendTruncatedString(nil, src, maxSize/len(`\uffff`)) // escaping could increase size
  695. dst = append(dst, '{')
  696. dst = l.appendMetadata(dst, l.skipClientTime, true, l.procID, l.procSequence, errDetail, errData, level)
  697. dst = bytes.TrimRight(dst, ",")
  698. return append(dst, "}\n"...)
  699. }
  700. // Check whether the reserved logtail member occurs in the log data.
  701. // If so, it is moved to the the logtail/error member.
  702. const jsonSeperators = ",:" // per RFC 8259, section 2
  703. const jsonWhitespace = " \n\r\t" // per RFC 8259, section 2
  704. var errDetail string
  705. var errData jsontext.Value
  706. if logtailValLength > 0 {
  707. errDetail = "duplicate logtail member"
  708. errData = bytes.Trim(src[logtailValOffset:][:logtailValLength], jsonSeperators+jsonWhitespace)
  709. }
  710. dst = slices.Grow(dst, len(src))
  711. dst = append(dst, '{')
  712. dst = l.appendMetadata(dst, l.skipClientTime, true, l.procID, l.procSequence, errDetail, errData, level)
  713. if logtailValLength > 0 {
  714. // Exclude original logtail member from the message.
  715. dst = appendWithoutNewline(dst, src[len("{"):logtailKeyOffset])
  716. dst = bytes.TrimRight(dst, jsonSeperators+jsonWhitespace)
  717. dst = appendWithoutNewline(dst, src[logtailValOffset+logtailValLength:])
  718. } else {
  719. dst = appendWithoutNewline(dst, src[len("{"):])
  720. }
  721. dst = bytes.TrimRight(dst, jsonWhitespace)
  722. dst = dst[:len(dst)-len("}")]
  723. dst = bytes.TrimRight(dst, jsonSeperators+jsonWhitespace)
  724. return append(dst, "}\n"...)
  725. }
  726. // appendWithoutNewline appends src to dst except that it ignores newlines
  727. // since newlines are used to frame individual log entries.
  728. func appendWithoutNewline(dst, src []byte) []byte {
  729. for _, c := range src {
  730. if c != '\n' {
  731. dst = append(dst, c)
  732. }
  733. }
  734. return dst
  735. }
  736. // Logf logs to l using the provided fmt-style format and optional arguments.
  737. func (l *Logger) Logf(format string, args ...any) {
  738. fmt.Fprintf(l, format, args...)
  739. }
  740. var obscureIPs = envknob.RegisterBool("TS_OBSCURE_LOGGED_IPS")
  741. // Write logs an encoded JSON blob.
  742. //
  743. // If the []byte passed to Write is not an encoded JSON blob,
  744. // then contents is fit into a JSON blob and written.
  745. //
  746. // This is intended as an interface for the stdlib "log" package.
  747. func (l *Logger) Write(buf []byte) (int, error) {
  748. if len(buf) == 0 {
  749. return 0, nil
  750. }
  751. inLen := len(buf) // length as provided to us, before modifications to downstream writers
  752. level, buf := parseAndRemoveLogLevel(buf)
  753. if l.stderr != nil && l.stderr != io.Discard && int64(level) <= atomic.LoadInt64(&l.stderrLevel) {
  754. if buf[len(buf)-1] == '\n' {
  755. l.stderr.Write(buf)
  756. } else {
  757. // The log package always line-terminates logs,
  758. // so this is an uncommon path.
  759. withNL := append(buf[:len(buf):len(buf)], '\n')
  760. l.stderr.Write(withNL)
  761. }
  762. }
  763. if obscureIPs() {
  764. buf = redactIPs(buf)
  765. }
  766. l.writeLock.Lock()
  767. defer l.writeLock.Unlock()
  768. b := l.appendTextOrJSONLocked(l.writeBuf[:0], buf, level)
  769. _, err := l.sendLocked(b)
  770. return inLen, err
  771. }
  772. var (
  773. regexMatchesIPv6 = regexp.MustCompile(`([0-9a-fA-F]{1,4}):([0-9a-fA-F]{1,4}):([0-9a-fA-F:]{1,4})*`)
  774. regexMatchesIPv4 = regexp.MustCompile(`(\d{1,3})\.(\d{1,3})\.\d{1,3}\.\d{1,3}`)
  775. )
  776. // redactIPs is a helper function used in Write() to redact IPs (other than tailscale IPs).
  777. // This function takes a log line as a byte slice and
  778. // uses regex matching to parse and find IP addresses. Based on if the IP address is IPv4 or
  779. // IPv6, it parses and replaces the end of the addresses with an "x". This function returns the
  780. // log line with the IPs redacted.
  781. func redactIPs(buf []byte) []byte {
  782. out := regexMatchesIPv6.ReplaceAllFunc(buf, func(b []byte) []byte {
  783. ip, err := netip.ParseAddr(string(b))
  784. if err != nil || tsaddr.IsTailscaleIP(ip) {
  785. return b // don't change this one
  786. }
  787. prefix := bytes.Split(b, []byte(":"))
  788. return bytes.Join(append(prefix[:2], []byte("x")), []byte(":"))
  789. })
  790. out = regexMatchesIPv4.ReplaceAllFunc(out, func(b []byte) []byte {
  791. ip, err := netip.ParseAddr(string(b))
  792. if err != nil || tsaddr.IsTailscaleIP(ip) {
  793. return b // don't change this one
  794. }
  795. prefix := bytes.Split(b, []byte("."))
  796. return bytes.Join(append(prefix[:2], []byte("x.x")), []byte("."))
  797. })
  798. return []byte(out)
  799. }
  800. var (
  801. openBracketV = []byte("[v")
  802. v1 = []byte("[v1] ")
  803. v2 = []byte("[v2] ")
  804. vJSON = []byte("[v\x00JSON]") // precedes log level '0'-'9' byte, then JSON value
  805. )
  806. // level 0 is normal (or unknown) level; 1+ are increasingly verbose
  807. func parseAndRemoveLogLevel(buf []byte) (level int, cleanBuf []byte) {
  808. if len(buf) == 0 || buf[0] == '{' || !bytes.Contains(buf, openBracketV) {
  809. return 0, buf
  810. }
  811. if bytes.Contains(buf, v1) {
  812. return 1, bytes.ReplaceAll(buf, v1, nil)
  813. }
  814. if bytes.Contains(buf, v2) {
  815. return 2, bytes.ReplaceAll(buf, v2, nil)
  816. }
  817. if i := bytes.Index(buf, vJSON); i != -1 {
  818. rest := buf[i+len(vJSON):]
  819. if len(rest) >= 2 {
  820. v := rest[0]
  821. if v >= '0' && v <= '9' {
  822. return int(v - '0'), rest[1:]
  823. }
  824. }
  825. }
  826. return 0, buf
  827. }
  828. var (
  829. tapSetSize atomic.Int32
  830. tapMu sync.Mutex
  831. tapSet set.HandleSet[chan<- string]
  832. )
  833. // RegisterLogTap registers dst to get a copy of every log write. The caller
  834. // must call unregister when done watching.
  835. //
  836. // This would ideally be a method on Logger, but Logger isn't really available
  837. // in most places; many writes go via stderr which filch redirects to the
  838. // singleton Logger set up early. For better or worse, there's basically only
  839. // one Logger within the program. This mechanism at least works well for
  840. // tailscaled. It works less well for a binary with multiple tsnet.Servers. Oh
  841. // well. This then subscribes to all of them.
  842. func RegisterLogTap(dst chan<- string) (unregister func()) {
  843. tapMu.Lock()
  844. defer tapMu.Unlock()
  845. h := tapSet.Add(dst)
  846. tapSetSize.Store(int32(len(tapSet)))
  847. return func() {
  848. tapMu.Lock()
  849. defer tapMu.Unlock()
  850. delete(tapSet, h)
  851. tapSetSize.Store(int32(len(tapSet)))
  852. }
  853. }
  854. // tapSend relays the JSON blob to any/all registered local debug log watchers
  855. // (somebody running "tailscale debug daemon-logs").
  856. func tapSend(jsonBlob []byte) {
  857. if tapSetSize.Load() == 0 {
  858. return
  859. }
  860. s := string(jsonBlob)
  861. tapMu.Lock()
  862. defer tapMu.Unlock()
  863. for _, dst := range tapSet {
  864. select {
  865. case dst <- s:
  866. default:
  867. }
  868. }
  869. }