logtail.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. //go:build !ts_omit_logtail
  4. // Package logtail sends logs to log.tailscale.com.
  5. package logtail
  6. import (
  7. "bytes"
  8. "cmp"
  9. "context"
  10. "crypto/rand"
  11. "encoding/binary"
  12. "fmt"
  13. "io"
  14. "log"
  15. mrand "math/rand/v2"
  16. "net/http"
  17. "os"
  18. "runtime"
  19. "slices"
  20. "strconv"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/go-json-experiment/json/jsontext"
  25. "tailscale.com/envknob"
  26. "tailscale.com/net/netmon"
  27. "tailscale.com/net/sockstats"
  28. "tailscale.com/tstime"
  29. tslogger "tailscale.com/types/logger"
  30. "tailscale.com/types/logid"
  31. "tailscale.com/util/set"
  32. "tailscale.com/util/truncate"
  33. "tailscale.com/util/zstdframe"
  34. )
  35. // maxSize is the maximum size that a single log entry can be.
  36. // It is also the maximum body size that may be uploaded at a time.
  37. const maxSize = 256 << 10
  38. // maxTextSize is the maximum size for a text log message.
  39. // Note that JSON log messages can be as large as maxSize.
  40. const maxTextSize = 16 << 10
  41. // lowMemRatio reduces maxSize and maxTextSize by this ratio in lowMem mode.
  42. const lowMemRatio = 4
  43. // bufferSize is the typical buffer size to retain.
  44. // It is large enough to handle most log messages,
  45. // but not too large to be a notable waste of memory if retained forever.
  46. const bufferSize = 4 << 10
  47. func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
  48. if cfg.BaseURL == "" {
  49. cfg.BaseURL = "https://" + DefaultHost
  50. }
  51. if cfg.HTTPC == nil {
  52. cfg.HTTPC = http.DefaultClient
  53. }
  54. if cfg.Clock == nil {
  55. cfg.Clock = tstime.StdClock{}
  56. }
  57. if cfg.Stderr == nil {
  58. cfg.Stderr = os.Stderr
  59. }
  60. if cfg.Buffer == nil {
  61. pendingSize := 256
  62. if cfg.LowMemory {
  63. pendingSize = 64
  64. }
  65. cfg.Buffer = NewMemoryBuffer(pendingSize)
  66. }
  67. var procID uint32
  68. if cfg.IncludeProcID {
  69. keyBytes := make([]byte, 4)
  70. rand.Read(keyBytes)
  71. procID = binary.LittleEndian.Uint32(keyBytes)
  72. if procID == 0 {
  73. // 0 is the empty/off value, assign a different (non-zero) value to
  74. // make sure we still include an ID (actual value does not matter).
  75. procID = 7
  76. }
  77. }
  78. if s := envknob.String("TS_DEBUG_LOGTAIL_FLUSHDELAY"); s != "" {
  79. if delay, err := time.ParseDuration(s); err == nil {
  80. cfg.FlushDelayFn = func() time.Duration { return delay }
  81. } else {
  82. log.Fatalf("invalid TS_DEBUG_LOGTAIL_FLUSHDELAY: %v", err)
  83. }
  84. } else if cfg.FlushDelayFn == nil && envknob.Bool("IN_TS_TEST") {
  85. cfg.FlushDelayFn = func() time.Duration { return 0 }
  86. }
  87. var urlSuffix string
  88. if !cfg.CopyPrivateID.IsZero() {
  89. urlSuffix = "?copyId=" + cfg.CopyPrivateID.String()
  90. }
  91. l := &Logger{
  92. privateID: cfg.PrivateID,
  93. stderr: cfg.Stderr,
  94. stderrLevel: int64(cfg.StderrLevel),
  95. httpc: cfg.HTTPC,
  96. url: cfg.BaseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String() + urlSuffix,
  97. lowMem: cfg.LowMemory,
  98. buffer: cfg.Buffer,
  99. maxUploadSize: cfg.MaxUploadSize,
  100. skipClientTime: cfg.SkipClientTime,
  101. drainWake: make(chan struct{}, 1),
  102. sentinel: make(chan int32, 16),
  103. flushDelayFn: cfg.FlushDelayFn,
  104. clock: cfg.Clock,
  105. metricsDelta: cfg.MetricsDelta,
  106. procID: procID,
  107. includeProcSequence: cfg.IncludeProcSequence,
  108. shutdownStart: make(chan struct{}),
  109. shutdownDone: make(chan struct{}),
  110. }
  111. l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
  112. l.compressLogs = cfg.CompressLogs
  113. ctx, cancel := context.WithCancel(context.Background())
  114. l.uploadCancel = cancel
  115. go l.uploading(ctx)
  116. l.Write([]byte("logtail started"))
  117. return l
  118. }
// Logger writes logs, splitting them as configured between local
// logging facilities and uploading to a log server.
//
// Construct it with NewLogger, which creates the channels and starts the
// upload goroutine that the methods below rely on (for example, Shutdown
// closes shutdownStart, which would panic on a nil channel).
type Logger struct {
	stderr              io.Writer       // local output for echoed log lines and logtail's own diagnostics
	stderrLevel         int64           // accessed atomically; max verbosity level echoed to stderr
	httpc               *http.Client    // client used for uploads
	url                 string          // upload endpoint, built by NewLogger
	lowMem              bool            // divide batch and text size limits by lowMemRatio
	skipClientTime      bool            // omit logtail.client_time from entries built by Write
	netMonitor          *netmon.Monitor // optional (see SetNetMon); nil means assume the internet is up
	buffer              Buffer          // entries written by sendLocked, read back by drainPending
	maxUploadSize       int             // batch and JSON entry size limit; zero means maxSize
	drainWake           chan struct{}   // signal to speed up drain
	drainBuf            []byte          // owned by drainPending for reuse
	flushDelayFn        func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay
	flushPending        atomic.Bool          // set by sendLocked when arming the flush timer; cleared by tryDrainWake
	sentinel            chan int32           // NOTE(review): created in NewLogger but not referenced in this file; confirm users elsewhere in the package
	clock               tstime.Clock         // time source for timestamps, timers, and failure durations
	compressLogs        bool                 // zstd-compress upload bodies when that saves enough bytes
	uploadCancel        func()               // cancels the context of the uploading goroutine
	explainedRaw        bool                 // owned by drainPending; whether the RAW-STDERR banner was printed
	metricsDelta        func() string        // or nil
	privateID           logid.PrivateID      // private log ID, also embedded in url
	httpDoCalls         atomic.Int32         // number of httpc.Do calls, printed when TS_DEBUG_LOGTAIL_WAKES is set
	sockstatsLabel      atomicSocktatsLabel  // label attached to upload traffic for sockstats
	procID              uint32               // per-process ID added to entries; 0 means omitted
	includeProcSequence bool                 // whether to number entries via procSequence
	writeLock           sync.Mutex           // guards procSequence, flushTimer, buffer.Write calls
	procSequence        uint64               // last sequence number assigned to an entry
	flushTimer          tstime.TimerController // used when flushDelay is >0
	writeBuf            [bufferSize]byte       // owned by Write for reuse
	bytesBuf            bytes.Buffer           // owned by appendTextOrJSONLocked for reuse
	jsonDec             jsontext.Decoder       // owned by appendTextOrJSONLocked for reuse
	shutdownStartMu     sync.Mutex             // guards the closing of shutdownStart
	shutdownStart       chan struct{}          // closed when shutdown begins
	shutdownDone        chan struct{}          // closed when shutdown complete
}
  156. type atomicSocktatsLabel struct{ p atomic.Uint32 }
  157. func (p *atomicSocktatsLabel) Load() sockstats.Label { return sockstats.Label(p.p.Load()) }
  158. func (p *atomicSocktatsLabel) Store(label sockstats.Label) { p.p.Store(uint32(label)) }
  159. // SetVerbosityLevel controls the verbosity level that should be
  160. // written to stderr. 0 is the default (not verbose). Levels 1 or higher
  161. // are increasingly verbose.
  162. func (l *Logger) SetVerbosityLevel(level int) {
  163. atomic.StoreInt64(&l.stderrLevel, int64(level))
  164. }
  165. // SetNetMon sets the network monitor.
  166. //
  167. // It should not be changed concurrently with log writes and should
  168. // only be set once.
  169. func (l *Logger) SetNetMon(lm *netmon.Monitor) {
  170. l.netMonitor = lm
  171. }
  172. // SetSockstatsLabel sets the label used in sockstat logs to identify network traffic from this logger.
  173. func (l *Logger) SetSockstatsLabel(label sockstats.Label) {
  174. l.sockstatsLabel.Store(label)
  175. }
  176. // PrivateID returns the logger's private log ID.
  177. //
  178. // It exists for internal use only.
  179. func (l *Logger) PrivateID() logid.PrivateID { return l.privateID }
  180. // Shutdown gracefully shuts down the logger while completing any
  181. // remaining uploads.
  182. //
  183. // It will block, continuing to try and upload unless the passed
  184. // context object interrupts it by being done.
  185. // If the shutdown is interrupted, an error is returned.
  186. func (l *Logger) Shutdown(ctx context.Context) error {
  187. done := make(chan struct{})
  188. go func() {
  189. select {
  190. case <-ctx.Done():
  191. l.uploadCancel()
  192. <-l.shutdownDone
  193. case <-l.shutdownDone:
  194. }
  195. close(done)
  196. l.httpc.CloseIdleConnections()
  197. }()
  198. l.shutdownStartMu.Lock()
  199. select {
  200. case <-l.shutdownStart:
  201. l.shutdownStartMu.Unlock()
  202. return nil
  203. default:
  204. }
  205. close(l.shutdownStart)
  206. l.shutdownStartMu.Unlock()
  207. io.WriteString(l, "logger closing down\n")
  208. <-done
  209. return nil
  210. }
  211. // Close shuts down this logger object, the background log uploader
  212. // process, and any associated goroutines.
  213. //
  214. // Deprecated: use Shutdown
  215. func (l *Logger) Close() {
  216. l.Shutdown(context.Background())
  217. }
  218. // drainBlock is called by drainPending when there are no logs to drain.
  219. //
  220. // In typical operation, every call to the Write method unblocks and triggers a
  221. // buffer.TryReadline, so logs are written with very low latency.
  222. //
  223. // If the caller specified FlushInterface, drainWake is only sent to
  224. // periodically.
  225. func (l *Logger) drainBlock() (shuttingDown bool) {
  226. select {
  227. case <-l.shutdownStart:
  228. return true
  229. case <-l.drainWake:
  230. }
  231. return false
  232. }
  233. // drainPending drains and encodes a batch of logs from the buffer for upload.
  234. // If no logs are available, drainPending blocks until logs are available.
  235. // The returned buffer is only valid until the next call to drainPending.
  236. func (l *Logger) drainPending() (b []byte) {
  237. b = l.drainBuf[:0]
  238. b = append(b, '[')
  239. defer func() {
  240. b = bytes.TrimRight(b, ",")
  241. b = append(b, ']')
  242. l.drainBuf = b
  243. if len(b) <= len("[]") {
  244. b = nil
  245. }
  246. }()
  247. maxLen := cmp.Or(l.maxUploadSize, maxSize)
  248. if l.lowMem {
  249. // When operating in a low memory environment, it is better to upload
  250. // in multiple operations than it is to allocate a large body and OOM.
  251. // Even if maxLen is less than maxSize, we can still upload an entry
  252. // that is up to maxSize if we happen to encounter one.
  253. maxLen /= lowMemRatio
  254. }
  255. for len(b) < maxLen {
  256. line, err := l.buffer.TryReadLine()
  257. switch {
  258. case err == io.EOF:
  259. return b
  260. case err != nil:
  261. b = append(b, '{')
  262. b = l.appendMetadata(b, false, true, 0, 0, "reading ringbuffer: "+err.Error(), nil, 0)
  263. b = bytes.TrimRight(b, ",")
  264. b = append(b, '}')
  265. return b
  266. case line == nil:
  267. // If we read at least some log entries, return immediately.
  268. if len(b) > len("[") {
  269. return b
  270. }
  271. // We're about to block. If we're holding on to too much memory
  272. // in our buffer from a previous large write, let it go.
  273. if cap(b) > bufferSize {
  274. b = bytes.Clone(b)
  275. l.drainBuf = b
  276. }
  277. if shuttingDown := l.drainBlock(); shuttingDown {
  278. return b
  279. }
  280. continue
  281. }
  282. switch {
  283. case len(line) == 0:
  284. continue
  285. case line[0] == '{' && jsontext.Value(line).IsValid():
  286. // This is already a valid JSON object, so just append it.
  287. // This may exceed maxLen, but should be no larger than maxSize
  288. // so long as logic writing into the buffer enforces the limit.
  289. b = append(b, line...)
  290. default:
  291. // This is probably a log added to stderr by filch
  292. // outside of the logtail logger. Encode it.
  293. if !l.explainedRaw {
  294. fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
  295. fmt.Fprintf(l.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n")
  296. fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
  297. fmt.Fprintf(l.stderr, "RAW-STDERR:\n")
  298. l.explainedRaw = true
  299. }
  300. fmt.Fprintf(l.stderr, "RAW-STDERR: %s", b)
  301. // Do not add a client time, as it could be really old.
  302. // Do not include instance key or ID either,
  303. // since this came from a different instance.
  304. b = l.appendText(b, line, true, 0, 0, 0)
  305. }
  306. b = append(b, ',')
  307. }
  308. return b
  309. }
  310. // This is the goroutine that repeatedly uploads logs in the background.
  311. func (l *Logger) uploading(ctx context.Context) {
  312. defer close(l.shutdownDone)
  313. for {
  314. body := l.drainPending()
  315. origlen := -1 // sentinel value: uncompressed
  316. // Don't attempt to compress tiny bodies; not worth the CPU cycles.
  317. if l.compressLogs && len(body) > 256 {
  318. zbody := zstdframe.AppendEncode(nil, body,
  319. zstdframe.FastestCompression, zstdframe.LowMemory(true))
  320. // Only send it compressed if the bandwidth savings are sufficient.
  321. // Just the extra headers associated with enabling compression
  322. // are 50 bytes by themselves.
  323. if len(body)-len(zbody) > 64 {
  324. origlen = len(body)
  325. body = zbody
  326. }
  327. }
  328. var lastError string
  329. var numFailures int
  330. var firstFailure time.Time
  331. for len(body) > 0 && ctx.Err() == nil {
  332. retryAfter, err := l.upload(ctx, body, origlen)
  333. if err != nil {
  334. numFailures++
  335. firstFailure = l.clock.Now()
  336. if !l.internetUp() {
  337. fmt.Fprintf(l.stderr, "logtail: internet down; waiting\n")
  338. l.awaitInternetUp(ctx)
  339. continue
  340. }
  341. // Only print the same message once.
  342. if currError := err.Error(); lastError != currError {
  343. fmt.Fprintf(l.stderr, "logtail: upload: %v\n", err)
  344. lastError = currError
  345. }
  346. // Sleep for the specified retryAfter period,
  347. // otherwise default to some random value.
  348. if retryAfter <= 0 {
  349. retryAfter = mrand.N(30*time.Second) + 30*time.Second
  350. }
  351. tstime.Sleep(ctx, retryAfter)
  352. } else {
  353. // Only print a success message after recovery.
  354. if numFailures > 0 {
  355. fmt.Fprintf(l.stderr, "logtail: upload succeeded after %d failures and %s\n", numFailures, l.clock.Since(firstFailure).Round(time.Second))
  356. }
  357. break
  358. }
  359. }
  360. select {
  361. case <-l.shutdownStart:
  362. return
  363. default:
  364. }
  365. }
  366. }
  367. func (l *Logger) internetUp() bool {
  368. if l.netMonitor == nil {
  369. // No way to tell, so assume it is.
  370. return true
  371. }
  372. return l.netMonitor.InterfaceState().AnyInterfaceUp()
  373. }
  374. func (l *Logger) awaitInternetUp(ctx context.Context) {
  375. upc := make(chan bool, 1)
  376. defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
  377. if delta.New.AnyInterfaceUp() {
  378. select {
  379. case upc <- true:
  380. default:
  381. }
  382. }
  383. })()
  384. if l.internetUp() {
  385. return
  386. }
  387. select {
  388. case <-upc:
  389. fmt.Fprintf(l.stderr, "logtail: internet back up\n")
  390. case <-ctx.Done():
  391. }
  392. }
  393. // upload uploads body to the log server.
  394. // origlen indicates the pre-compression body length.
  395. // origlen of -1 indicates that the body is not compressed.
  396. func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAfter time.Duration, err error) {
  397. const maxUploadTime = 45 * time.Second
  398. ctx = sockstats.WithSockStats(ctx, l.sockstatsLabel.Load(), l.Logf)
  399. ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
  400. defer cancel()
  401. req, err := http.NewRequestWithContext(ctx, "POST", l.url, bytes.NewReader(body))
  402. if err != nil {
  403. // I know of no conditions under which this could fail.
  404. // Report it very loudly.
  405. // TODO record logs to disk
  406. panic("logtail: cannot build http request: " + err.Error())
  407. }
  408. if origlen != -1 {
  409. req.Header.Add("Content-Encoding", "zstd")
  410. req.Header.Add("Orig-Content-Length", strconv.Itoa(origlen))
  411. }
  412. if runtime.GOOS == "js" {
  413. // We once advertised we'd accept optional client certs (for internal use)
  414. // on log.tailscale.com but then Tailscale SSH js/wasm clients prompted
  415. // users (on some browsers?) to pick a client cert. We'll fix the server's
  416. // TLS ServerHello, but we can also fix it client side for good measure.
  417. //
  418. // Corp details: https://github.com/tailscale/corp/issues/18177#issuecomment-2026598715
  419. // and https://github.com/tailscale/corp/pull/18775#issuecomment-2027505036
  420. //
  421. // See https://github.com/golang/go/wiki/WebAssembly#configuring-fetch-options-while-using-nethttp
  422. // and https://developer.mozilla.org/en-US/docs/Web/API/fetch#credentials
  423. req.Header.Set("js.fetch:credentials", "omit")
  424. }
  425. req.Header["User-Agent"] = nil // not worth writing one; save some bytes
  426. compressedNote := "not-compressed"
  427. if origlen != -1 {
  428. compressedNote = "compressed"
  429. }
  430. l.httpDoCalls.Add(1)
  431. resp, err := l.httpc.Do(req)
  432. if err != nil {
  433. return 0, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err)
  434. }
  435. defer resp.Body.Close()
  436. if resp.StatusCode != http.StatusOK {
  437. n, _ := strconv.Atoi(resp.Header.Get("Retry-After"))
  438. b, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10))
  439. return time.Duration(n) * time.Second, fmt.Errorf("log upload of %d bytes %s failed %d: %s", len(body), compressedNote, resp.StatusCode, bytes.TrimSpace(b))
  440. }
  441. return 0, nil
  442. }
  443. // Flush uploads all logs to the server. It blocks until complete or there is an
  444. // unrecoverable error.
  445. //
  446. // TODO(bradfitz): this apparently just returns nil, as of tailscale/corp@9c2ec35.
  447. // Finish cleaning this up.
  448. func (l *Logger) Flush() error {
  449. return nil
  450. }
  451. // StartFlush starts a log upload, if anything is pending.
  452. //
  453. // If l is nil, StartFlush is a no-op.
  454. func (l *Logger) StartFlush() {
  455. if l != nil {
  456. l.tryDrainWake()
  457. }
  458. }
  459. // logtailDisabled is whether logtail uploads to logcatcher are disabled.
  460. var logtailDisabled atomic.Bool
  461. // Disable disables logtail uploads for the lifetime of the process.
  462. func Disable() {
  463. logtailDisabled.Store(true)
  464. }
  465. var debugWakesAndUploads = envknob.RegisterBool("TS_DEBUG_LOGTAIL_WAKES")
  466. // tryDrainWake tries to send to lg.drainWake, to cause an uploading wakeup.
  467. // It does not block.
  468. func (l *Logger) tryDrainWake() {
  469. l.flushPending.Store(false)
  470. if debugWakesAndUploads() {
  471. // Using println instead of log.Printf here to avoid recursing back into
  472. // ourselves.
  473. println("logtail: try drain wake, numHTTP:", l.httpDoCalls.Load())
  474. }
  475. select {
  476. case l.drainWake <- struct{}{}:
  477. default:
  478. }
  479. }
  480. func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
  481. tapSend(jsonBlob)
  482. if logtailDisabled.Load() {
  483. return len(jsonBlob), nil
  484. }
  485. n, err := l.buffer.Write(jsonBlob)
  486. flushDelay := defaultFlushDelay
  487. if l.flushDelayFn != nil {
  488. flushDelay = l.flushDelayFn()
  489. }
  490. if flushDelay > 0 {
  491. if l.flushPending.CompareAndSwap(false, true) {
  492. if l.flushTimer == nil {
  493. l.flushTimer = l.clock.AfterFunc(flushDelay, l.tryDrainWake)
  494. } else {
  495. l.flushTimer.Reset(flushDelay)
  496. }
  497. }
  498. } else {
  499. l.tryDrainWake()
  500. }
  501. return n, err
  502. }
// appendMetadata appends optional "logtail", "metrics", and "v" JSON members.
// This assumes dst is already within a JSON object.
// Each member is comma-terminated.
//
// The "logtail" member is only written when at least one of its fields
// applies:
//   - client_time: the current l.clock time (UTC, RFC 3339), unless skipClientTime;
//   - proc_id: procID, if non-zero;
//   - proc_seq: procSequence, if non-zero;
//   - error: an object with "detail" (errDetail as a JSON string, if non-empty)
//     and/or "bad_data" (errData appended verbatim as raw JSON, if non-nil).
//
// The "metrics" member holds the result of l.metricsDelta, unless
// skipMetrics is set, metricsDelta is nil, or it returns "".
// The "v" member holds level, and is only written for levels 1 through 9.
func (l *Logger) appendMetadata(dst []byte, skipClientTime, skipMetrics bool, procID uint32, procSequence uint64, errDetail string, errData jsontext.Value, level int) []byte {
	// Append optional logtail metadata.
	if !skipClientTime || procID != 0 || procSequence != 0 || errDetail != "" || errData != nil {
		dst = append(dst, `"logtail":{`...)
		if !skipClientTime {
			dst = append(dst, `"client_time":"`...)
			dst = l.clock.Now().UTC().AppendFormat(dst, time.RFC3339Nano)
			dst = append(dst, '"', ',')
		}
		if procID != 0 {
			dst = append(dst, `"proc_id":`...)
			dst = strconv.AppendUint(dst, uint64(procID), 10)
			dst = append(dst, ',')
		}
		if procSequence != 0 {
			dst = append(dst, `"proc_seq":`...)
			dst = strconv.AppendUint(dst, procSequence, 10)
			dst = append(dst, ',')
		}
		if errDetail != "" || errData != nil {
			dst = append(dst, `"error":{`...)
			if errDetail != "" {
				dst = append(dst, `"detail":`...)
				dst, _ = jsontext.AppendQuote(dst, errDetail)
				dst = append(dst, ',')
			}
			if errData != nil {
				// errData is trusted to already be a valid JSON value.
				dst = append(dst, `"bad_data":`...)
				dst = append(dst, errData...)
				dst = append(dst, ',')
			}
			// Replace the inner trailing comma with the object's closing brace.
			dst = bytes.TrimRight(dst, ",")
			dst = append(dst, '}', ',')
		}
		// Close the "logtail" object the same way.
		dst = bytes.TrimRight(dst, ",")
		dst = append(dst, '}', ',')
	}
	// Append optional metrics metadata.
	// NOTE(review): d is placed inside a JSON string without escaping, so this
	// assumes metricsDelta never returns quotes, backslashes, or control bytes — confirm.
	if !skipMetrics && l.metricsDelta != nil {
		if d := l.metricsDelta(); d != "" {
			dst = append(dst, `"metrics":"`...)
			dst = append(dst, d...)
			dst = append(dst, '"', ',')
		}
	}
	// Add the optional log level, if non-zero.
	// Note that we only use log levels 1 and 2 currently.
	// It's unlikely we'll ever make it past 9.
	if level > 0 && level < 10 {
		dst = append(dst, `"v":`...)
		dst = append(dst, '0'+byte(level)) // single ASCII digit
		dst = append(dst, ',')
	}
	return dst
}
  561. // appendText appends a raw text message in the Tailscale JSON log entry format.
  562. func (l *Logger) appendText(dst, src []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
  563. dst = slices.Grow(dst, len(src))
  564. dst = append(dst, '{')
  565. dst = l.appendMetadata(dst, skipClientTime, false, procID, procSequence, "", nil, level)
  566. if len(src) == 0 {
  567. dst = bytes.TrimRight(dst, ",")
  568. return append(dst, "}\n"...)
  569. }
  570. // Append the text string, which may be truncated.
  571. // Invalid UTF-8 will be mangled with the Unicode replacement character.
  572. max := maxTextSize
  573. if l.lowMem {
  574. max /= lowMemRatio
  575. }
  576. dst = append(dst, `"text":`...)
  577. dst = appendTruncatedString(dst, src, max)
  578. return append(dst, "}\n"...)
  579. }
  580. // appendTruncatedString appends a JSON string for src,
  581. // truncating the src to be no larger than n.
  582. func appendTruncatedString(dst, src []byte, n int) []byte {
  583. srcLen := len(src)
  584. src = truncate.String(src, n)
  585. dst, _ = jsontext.AppendQuote(dst, src) // ignore error; only occurs for invalid UTF-8
  586. if srcLen > len(src) {
  587. dst = dst[:len(dst)-len(`"`)] // trim off preceding double-quote
  588. dst = append(dst, "…+"...)
  589. dst = strconv.AppendInt(dst, int64(srcLen-len(src)), 10)
  590. dst = append(dst, '"') // re-append succeeding double-quote
  591. }
  592. return dst
  593. }
// appendTextOrJSONLocked appends a raw text message or a raw JSON object
// in the Tailscale JSON log format, terminated by a newline.
//
// If src does not start with '{', or is not exactly one valid JSON object,
// it is encoded as a text message via appendText. A valid object longer
// than the upload size limit (maxUploadSize, or maxSize if zero) is replaced
// by an entry whose logtail.error holds a truncated copy of it. Otherwise
// the object's members are copied after the logtail metadata, with raw
// newline bytes dropped. A top-level "logtail" member in src is removed
// and reported under logtail.error.bad_data.
//
// When l.includeProcSequence is set, l.procSequence is incremented on every
// call. The caller must hold l.writeLock.
func (l *Logger) appendTextOrJSONLocked(dst, src []byte, level int) []byte {
	if l.includeProcSequence {
		l.procSequence++
	}
	if len(src) == 0 || src[0] != '{' {
		return l.appendText(dst, src, l.skipClientTime, l.procID, l.procSequence, level)
	}

	// Check whether the input is a valid JSON object and
	// whether it contains the reserved "logtail" name at the top-level.
	// The offsets below index into src: logtailKeyOffset is where the member
	// begins (just after the preceding token), and the value span
	// [logtailValOffset, logtailValOffset+logtailValLength) also includes the
	// ':' separator and any whitespace before the value.
	var logtailKeyOffset, logtailValOffset, logtailValLength int
	validJSON := func() bool {
		// The jsontext.NewDecoder API operates on an io.Reader, for which
		// bytes.Buffer provides a means to convert a []byte into an io.Reader.
		// However, bytes.NewBuffer normally allocates unless
		// we immediately shallow copy it into a pre-allocated Buffer struct.
		// See https://go.dev/issue/67004.
		l.bytesBuf = *bytes.NewBuffer(src)
		defer func() { l.bytesBuf = bytes.Buffer{} }() // avoid pinning src
		dec := &l.jsonDec
		dec.Reset(&l.bytesBuf)
		if tok, err := dec.ReadToken(); tok.Kind() != '{' || err != nil {
			return false
		}
		for dec.PeekKind() != '}' {
			keyOffset := dec.InputOffset()
			tok, err := dec.ReadToken()
			if err != nil {
				return false
			}
			isLogtail := tok.String() == "logtail"
			valOffset := dec.InputOffset()
			if dec.SkipValue() != nil {
				return false
			}
			if isLogtail {
				logtailKeyOffset = int(keyOffset)
				logtailValOffset = int(valOffset)
				logtailValLength = int(dec.InputOffset()) - logtailValOffset
			}
		}
		if tok, err := dec.ReadToken(); tok.Kind() != '}' || err != nil {
			return false
		}
		if _, err := dec.ReadToken(); err != io.EOF {
			return false // trailing junk after JSON object
		}
		return true
	}()

	// Treat invalid JSON as a raw text message.
	if !validJSON {
		return l.appendText(dst, src, l.skipClientTime, l.procID, l.procSequence, level)
	}

	// Check whether the JSON payload is too large.
	// Due to logtail metadata, the formatted log entry could exceed maxSize.
	// That's okay as the Tailscale log service limit is actually 2*maxSize.
	// However, so long as logging applications aim to target the maxSize limit,
	// there should be no trouble eventually uploading logs.
	maxLen := cmp.Or(l.maxUploadSize, maxSize)
	if len(src) > maxLen {
		errDetail := fmt.Sprintf("entry too large: %d bytes", len(src))
		errData := appendTruncatedString(nil, src, maxLen/len(`\uffff`)) // escaping could increase size
		dst = append(dst, '{')
		dst = l.appendMetadata(dst, l.skipClientTime, true, l.procID, l.procSequence, errDetail, errData, level)
		dst = bytes.TrimRight(dst, ",")
		return append(dst, "}\n"...)
	}

	// Check whether the reserved logtail member occurs in the log data.
	// If so, it is moved to the logtail/error member.
	const jsonSeperators = ",:" // per RFC 8259, section 2
	const jsonWhitespace = " \n\r\t" // per RFC 8259, section 2
	var errDetail string
	var errData jsontext.Value
	if logtailValLength > 0 {
		errDetail = "duplicate logtail member"
		// Strip the ':' and surrounding whitespace captured with the value.
		errData = bytes.Trim(src[logtailValOffset:][:logtailValLength], jsonSeperators+jsonWhitespace)
	}
	dst = slices.Grow(dst, len(src))
	dst = append(dst, '{')
	dst = l.appendMetadata(dst, l.skipClientTime, true, l.procID, l.procSequence, errDetail, errData, level)
	if logtailValLength > 0 {
		// Exclude original logtail member from the message.
		// Copy the members before it, trim the dangling separator, then copy
		// everything after it (which starts with its own ',' or the closing '}').
		dst = appendWithoutNewline(dst, src[len("{"):logtailKeyOffset])
		dst = bytes.TrimRight(dst, jsonSeperators+jsonWhitespace)
		dst = appendWithoutNewline(dst, src[logtailValOffset+logtailValLength:])
	} else {
		dst = appendWithoutNewline(dst, src[len("{"):])
	}
	// Drop src's closing '}' and any trailing separator (such as the final
	// metadata comma when src is "{}"), then close the object ourselves.
	dst = bytes.TrimRight(dst, jsonWhitespace)
	dst = dst[:len(dst)-len("}")]
	dst = bytes.TrimRight(dst, jsonSeperators+jsonWhitespace)
	return append(dst, "}\n"...)
}
  688. // appendWithoutNewline appends src to dst except that it ignores newlines
  689. // since newlines are used to frame individual log entries.
  690. func appendWithoutNewline(dst, src []byte) []byte {
  691. for _, c := range src {
  692. if c != '\n' {
  693. dst = append(dst, c)
  694. }
  695. }
  696. return dst
  697. }
  698. // Logf logs to l using the provided fmt-style format and optional arguments.
  699. func (l *Logger) Logf(format string, args ...any) {
  700. fmt.Fprintf(l, format, args...)
  701. }
  702. // Write logs an encoded JSON blob.
  703. //
  704. // If the []byte passed to Write is not an encoded JSON blob,
  705. // then contents is fit into a JSON blob and written.
  706. //
  707. // This is intended as an interface for the stdlib "log" package.
  708. func (l *Logger) Write(buf []byte) (int, error) {
  709. if len(buf) == 0 {
  710. return 0, nil
  711. }
  712. inLen := len(buf) // length as provided to us, before modifications to downstream writers
  713. level, buf := parseAndRemoveLogLevel(buf)
  714. if l.stderr != nil && l.stderr != io.Discard && int64(level) <= atomic.LoadInt64(&l.stderrLevel) {
  715. if buf[len(buf)-1] == '\n' {
  716. l.stderr.Write(buf)
  717. } else {
  718. // The log package always line-terminates logs,
  719. // so this is an uncommon path.
  720. withNL := append(buf[:len(buf):len(buf)], '\n')
  721. l.stderr.Write(withNL)
  722. }
  723. }
  724. l.writeLock.Lock()
  725. defer l.writeLock.Unlock()
  726. b := l.appendTextOrJSONLocked(l.writeBuf[:0], buf, level)
  727. _, err := l.sendLocked(b)
  728. return inLen, err
  729. }
  730. var (
  731. openBracketV = []byte("[v")
  732. v1 = []byte("[v1] ")
  733. v2 = []byte("[v2] ")
  734. vJSON = []byte("[v\x00JSON]") // precedes log level '0'-'9' byte, then JSON value
  735. )
  736. // level 0 is normal (or unknown) level; 1+ are increasingly verbose
  737. func parseAndRemoveLogLevel(buf []byte) (level int, cleanBuf []byte) {
  738. if len(buf) == 0 || buf[0] == '{' || !bytes.Contains(buf, openBracketV) {
  739. return 0, buf
  740. }
  741. if bytes.Contains(buf, v1) {
  742. return 1, bytes.ReplaceAll(buf, v1, nil)
  743. }
  744. if bytes.Contains(buf, v2) {
  745. return 2, bytes.ReplaceAll(buf, v2, nil)
  746. }
  747. if i := bytes.Index(buf, vJSON); i != -1 {
  748. rest := buf[i+len(vJSON):]
  749. if len(rest) >= 2 {
  750. v := rest[0]
  751. if v >= '0' && v <= '9' {
  752. return int(v - '0'), rest[1:]
  753. }
  754. }
  755. }
  756. return 0, buf
  757. }
  758. var (
  759. tapSetSize atomic.Int32
  760. tapMu sync.Mutex
  761. tapSet set.HandleSet[chan<- string]
  762. )
  763. // RegisterLogTap registers dst to get a copy of every log write. The caller
  764. // must call unregister when done watching.
  765. //
  766. // This would ideally be a method on Logger, but Logger isn't really available
  767. // in most places; many writes go via stderr which filch redirects to the
  768. // singleton Logger set up early. For better or worse, there's basically only
  769. // one Logger within the program. This mechanism at least works well for
  770. // tailscaled. It works less well for a binary with multiple tsnet.Servers. Oh
  771. // well. This then subscribes to all of them.
  772. func RegisterLogTap(dst chan<- string) (unregister func()) {
  773. tapMu.Lock()
  774. defer tapMu.Unlock()
  775. h := tapSet.Add(dst)
  776. tapSetSize.Store(int32(len(tapSet)))
  777. return func() {
  778. tapMu.Lock()
  779. defer tapMu.Unlock()
  780. delete(tapSet, h)
  781. tapSetSize.Store(int32(len(tapSet)))
  782. }
  783. }
  784. // tapSend relays the JSON blob to any/all registered local debug log watchers
  785. // (somebody running "tailscale debug daemon-logs").
  786. func tapSend(jsonBlob []byte) {
  787. if tapSetSize.Load() == 0 {
  788. return
  789. }
  790. s := string(jsonBlob)
  791. tapMu.Lock()
  792. defer tapMu.Unlock()
  793. for _, dst := range tapSet {
  794. select {
  795. case dst <- s:
  796. default:
  797. }
  798. }
  799. }