logtail.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. //go:build !ts_omit_logtail
  4. // Package logtail sends logs to log.tailscale.com.
  5. package logtail
  6. import (
  7. "bytes"
  8. "cmp"
  9. "context"
  10. "crypto/rand"
  11. "encoding/binary"
  12. "expvar"
  13. "fmt"
  14. "io"
  15. "log"
  16. mrand "math/rand/v2"
  17. "net/http"
  18. "os"
  19. "runtime"
  20. "slices"
  21. "strconv"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "github.com/creachadair/msync/trigger"
  26. "github.com/go-json-experiment/json/jsontext"
  27. "tailscale.com/envknob"
  28. "tailscale.com/metrics"
  29. "tailscale.com/net/netmon"
  30. "tailscale.com/net/sockstats"
  31. "tailscale.com/tstime"
  32. tslogger "tailscale.com/types/logger"
  33. "tailscale.com/types/logid"
  34. "tailscale.com/util/eventbus"
  35. "tailscale.com/util/set"
  36. "tailscale.com/util/truncate"
  37. "tailscale.com/util/zstdframe"
  38. )
  // Size limits applied when encoding and uploading log entries, plus the
  // steady-state buffer size that is worth retaining between uses.
  const (
  	// maxSize is the maximum size that a single log entry can be.
  	// It is also the maximum body size that may be uploaded at a time.
  	maxSize = 256 << 10 // 256 KiB

  	// maxTextSize is the maximum size for a text log message.
  	// Note that JSON log messages can be as large as maxSize.
  	maxTextSize = 16 << 10 // 16 KiB

  	// lowMemRatio reduces maxSize and maxTextSize by this ratio in lowMem mode.
  	lowMemRatio = 4

  	// bufferSize is the typical buffer size to retain.
  	// It is large enough to handle most log messages,
  	// but not too large to be a notable waste of memory if retained forever.
  	bufferSize = 4 << 10 // 4 KiB
  )
  51. func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
  52. if cfg.BaseURL == "" {
  53. cfg.BaseURL = "https://" + DefaultHost
  54. }
  55. if cfg.HTTPC == nil {
  56. cfg.HTTPC = http.DefaultClient
  57. }
  58. if cfg.Clock == nil {
  59. cfg.Clock = tstime.StdClock{}
  60. }
  61. if cfg.Stderr == nil {
  62. cfg.Stderr = os.Stderr
  63. }
  64. if cfg.Buffer == nil {
  65. pendingSize := 256
  66. if cfg.LowMemory {
  67. pendingSize = 64
  68. }
  69. cfg.Buffer = NewMemoryBuffer(pendingSize)
  70. }
  71. var procID uint32
  72. if cfg.IncludeProcID {
  73. keyBytes := make([]byte, 4)
  74. rand.Read(keyBytes)
  75. procID = binary.LittleEndian.Uint32(keyBytes)
  76. if procID == 0 {
  77. // 0 is the empty/off value, assign a different (non-zero) value to
  78. // make sure we still include an ID (actual value does not matter).
  79. procID = 7
  80. }
  81. }
  82. if s := envknob.String("TS_DEBUG_LOGTAIL_FLUSHDELAY"); s != "" {
  83. if delay, err := time.ParseDuration(s); err == nil {
  84. cfg.FlushDelayFn = func() time.Duration { return delay }
  85. } else {
  86. log.Fatalf("invalid TS_DEBUG_LOGTAIL_FLUSHDELAY: %v", err)
  87. }
  88. } else if cfg.FlushDelayFn == nil && envknob.Bool("IN_TS_TEST") {
  89. cfg.FlushDelayFn = func() time.Duration { return 0 }
  90. }
  91. var urlSuffix string
  92. if !cfg.CopyPrivateID.IsZero() {
  93. urlSuffix = "?copyId=" + cfg.CopyPrivateID.String()
  94. }
  95. logger := &Logger{
  96. privateID: cfg.PrivateID,
  97. stderr: cfg.Stderr,
  98. stderrLevel: int64(cfg.StderrLevel),
  99. httpc: cfg.HTTPC,
  100. url: cfg.BaseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String() + urlSuffix,
  101. lowMem: cfg.LowMemory,
  102. buffer: cfg.Buffer,
  103. maxUploadSize: cfg.MaxUploadSize,
  104. skipClientTime: cfg.SkipClientTime,
  105. drainWake: make(chan struct{}, 1),
  106. sentinel: make(chan int32, 16),
  107. flushDelayFn: cfg.FlushDelayFn,
  108. clock: cfg.Clock,
  109. metricsDelta: cfg.MetricsDelta,
  110. procID: procID,
  111. includeProcSequence: cfg.IncludeProcSequence,
  112. shutdownStart: make(chan struct{}),
  113. shutdownDone: make(chan struct{}),
  114. }
  115. if cfg.Bus != nil {
  116. logger.eventClient = cfg.Bus.Client("logtail.Logger")
  117. // Subscribe to change deltas from NetMon to detect when the network comes up.
  118. eventbus.SubscribeFunc(logger.eventClient, logger.onChangeDelta)
  119. }
  120. logger.SetSockstatsLabel(sockstats.LabelLogtailLogger)
  121. logger.compressLogs = cfg.CompressLogs
  122. ctx, cancel := context.WithCancel(context.Background())
  123. logger.uploadCancel = cancel
  124. go logger.uploading(ctx)
  125. logger.Write([]byte("logtail started"))
  126. return logger
  127. }
  128. // Logger writes logs, splitting them as configured between local
  129. // logging facilities and uploading to a log server.
  130. type Logger struct {
  131. stderr io.Writer
  132. stderrLevel int64 // accessed atomically
  133. httpc *http.Client
  134. url string
  135. lowMem bool
  136. skipClientTime bool
  137. netMonitor *netmon.Monitor
  138. buffer Buffer
  139. maxUploadSize int
  140. drainWake chan struct{} // signal to speed up drain
  141. drainBuf []byte // owned by drainPending for reuse
  142. flushDelayFn func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay
  143. flushPending atomic.Bool
  144. sentinel chan int32
  145. clock tstime.Clock
  146. compressLogs bool
  147. uploadCancel func()
  148. explainedRaw bool
  149. metricsDelta func() string // or nil
  150. privateID logid.PrivateID
  151. httpDoCalls atomic.Int32
  152. sockstatsLabel atomicSocktatsLabel
  153. eventClient *eventbus.Client
  154. networkIsUp trigger.Cond // set/reset by netmon.ChangeDelta events
  155. procID uint32
  156. includeProcSequence bool
  157. writeLock sync.Mutex // guards procSequence, flushTimer, buffer.Write calls
  158. procSequence uint64
  159. flushTimer tstime.TimerController // used when flushDelay is >0
  160. writeBuf [bufferSize]byte // owned by Write for reuse
  161. bytesBuf bytes.Buffer // owned by appendTextOrJSONLocked for reuse
  162. jsonDec jsontext.Decoder // owned by appendTextOrJSONLocked for reuse
  163. shutdownStartMu sync.Mutex // guards the closing of shutdownStart
  164. shutdownStart chan struct{} // closed when shutdown begins
  165. shutdownDone chan struct{} // closed when shutdown complete
  166. // Metrics (see [Logger.ExpVar] for details).
  167. uploadCalls expvar.Int
  168. failedCalls expvar.Int
  169. uploadedBytes expvar.Int
  170. uploadingTime expvar.Int
  171. }
  172. type atomicSocktatsLabel struct{ p atomic.Uint32 }
  173. func (p *atomicSocktatsLabel) Load() sockstats.Label { return sockstats.Label(p.p.Load()) }
  174. func (p *atomicSocktatsLabel) Store(label sockstats.Label) { p.p.Store(uint32(label)) }
  175. // SetVerbosityLevel controls the verbosity level that should be
  176. // written to stderr. 0 is the default (not verbose). Levels 1 or higher
  177. // are increasingly verbose.
  178. func (lg *Logger) SetVerbosityLevel(level int) {
  179. atomic.StoreInt64(&lg.stderrLevel, int64(level))
  180. }
  181. // SetNetMon sets the network monitor.
  182. //
  183. // It should not be changed concurrently with log writes and should
  184. // only be set once.
  185. func (lg *Logger) SetNetMon(lm *netmon.Monitor) {
  186. lg.netMonitor = lm
  187. }
  188. // SetSockstatsLabel sets the label used in sockstat logs to identify network traffic from this logger.
  189. func (lg *Logger) SetSockstatsLabel(label sockstats.Label) {
  190. lg.sockstatsLabel.Store(label)
  191. }
  192. // PrivateID returns the logger's private log ID.
  193. //
  194. // It exists for internal use only.
  195. func (lg *Logger) PrivateID() logid.PrivateID { return lg.privateID }
  196. // Shutdown gracefully shuts down the logger while completing any
  197. // remaining uploads.
  198. //
  199. // It will block, continuing to try and upload unless the passed
  200. // context object interrupts it by being done.
  201. // If the shutdown is interrupted, an error is returned.
  202. func (lg *Logger) Shutdown(ctx context.Context) error {
  203. done := make(chan struct{})
  204. go func() {
  205. select {
  206. case <-ctx.Done():
  207. lg.uploadCancel()
  208. <-lg.shutdownDone
  209. case <-lg.shutdownDone:
  210. }
  211. close(done)
  212. lg.httpc.CloseIdleConnections()
  213. }()
  214. if lg.eventClient != nil {
  215. lg.eventClient.Close()
  216. }
  217. lg.shutdownStartMu.Lock()
  218. select {
  219. case <-lg.shutdownStart:
  220. lg.shutdownStartMu.Unlock()
  221. return nil
  222. default:
  223. }
  224. close(lg.shutdownStart)
  225. lg.shutdownStartMu.Unlock()
  226. io.WriteString(lg, "logger closing down\n")
  227. <-done
  228. return nil
  229. }
  230. // Close shuts down this logger object, the background log uploader
  231. // process, and any associated goroutines.
  232. //
  233. // Deprecated: use Shutdown
  234. func (lg *Logger) Close() {
  235. lg.Shutdown(context.Background())
  236. }
  237. // drainBlock is called by drainPending when there are no logs to drain.
  238. //
  239. // In typical operation, every call to the Write method unblocks and triggers a
  240. // buffer.TryReadline, so logs are written with very low latency.
  241. //
  242. // If the caller specified FlushInterface, drainWake is only sent to
  243. // periodically.
  244. func (lg *Logger) drainBlock() (shuttingDown bool) {
  245. select {
  246. case <-lg.shutdownStart:
  247. return true
  248. case <-lg.drainWake:
  249. }
  250. return false
  251. }
  252. // drainPending drains and encodes a batch of logs from the buffer for upload.
  253. // If no logs are available, drainPending blocks until logs are available.
  254. // The returned buffer is only valid until the next call to drainPending.
  255. func (lg *Logger) drainPending() (b []byte) {
  256. b = lg.drainBuf[:0]
  257. b = append(b, '[')
  258. defer func() {
  259. b = bytes.TrimRight(b, ",")
  260. b = append(b, ']')
  261. lg.drainBuf = b
  262. if len(b) <= len("[]") {
  263. b = nil
  264. }
  265. }()
  266. maxLen := cmp.Or(lg.maxUploadSize, maxSize)
  267. if lg.lowMem {
  268. // When operating in a low memory environment, it is better to upload
  269. // in multiple operations than it is to allocate a large body and OOM.
  270. // Even if maxLen is less than maxSize, we can still upload an entry
  271. // that is up to maxSize if we happen to encounter one.
  272. maxLen /= lowMemRatio
  273. }
  274. for len(b) < maxLen {
  275. line, err := lg.buffer.TryReadLine()
  276. switch {
  277. case err == io.EOF:
  278. return b
  279. case err != nil:
  280. b = append(b, '{')
  281. b = lg.appendMetadata(b, false, true, 0, 0, "reading ringbuffer: "+err.Error(), nil, 0)
  282. b = bytes.TrimRight(b, ",")
  283. b = append(b, '}')
  284. return b
  285. case line == nil:
  286. // If we read at least some log entries, return immediately.
  287. if len(b) > len("[") {
  288. return b
  289. }
  290. // We're about to block. If we're holding on to too much memory
  291. // in our buffer from a previous large write, let it go.
  292. if cap(b) > bufferSize {
  293. b = bytes.Clone(b)
  294. lg.drainBuf = b
  295. }
  296. if shuttingDown := lg.drainBlock(); shuttingDown {
  297. return b
  298. }
  299. continue
  300. }
  301. switch {
  302. case len(line) == 0:
  303. continue
  304. case line[0] == '{' && jsontext.Value(line).IsValid():
  305. // This is already a valid JSON object, so just append it.
  306. // This may exceed maxLen, but should be no larger than maxSize
  307. // so long as logic writing into the buffer enforces the limit.
  308. b = append(b, line...)
  309. default:
  310. // This is probably a log added to stderr by filch
  311. // outside of the logtail logger. Encode it.
  312. if !lg.explainedRaw {
  313. fmt.Fprintf(lg.stderr, "RAW-STDERR: ***\n")
  314. fmt.Fprintf(lg.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n")
  315. fmt.Fprintf(lg.stderr, "RAW-STDERR: ***\n")
  316. fmt.Fprintf(lg.stderr, "RAW-STDERR:\n")
  317. lg.explainedRaw = true
  318. }
  319. fmt.Fprintf(lg.stderr, "RAW-STDERR: %s", b)
  320. // Do not add a client time, as it could be really old.
  321. // Do not include instance key or ID either,
  322. // since this came from a different instance.
  323. b = lg.appendText(b, line, true, 0, 0, 0)
  324. }
  325. b = append(b, ',')
  326. }
  327. return b
  328. }
  329. // This is the goroutine that repeatedly uploads logs in the background.
  330. func (lg *Logger) uploading(ctx context.Context) {
  331. defer close(lg.shutdownDone)
  332. for {
  333. body := lg.drainPending()
  334. origlen := -1 // sentinel value: uncompressed
  335. // Don't attempt to compress tiny bodies; not worth the CPU cycles.
  336. if lg.compressLogs && len(body) > 256 {
  337. zbody := zstdframe.AppendEncode(nil, body,
  338. zstdframe.FastestCompression, zstdframe.LowMemory(true))
  339. // Only send it compressed if the bandwidth savings are sufficient.
  340. // Just the extra headers associated with enabling compression
  341. // are 50 bytes by themselves.
  342. if len(body)-len(zbody) > 64 {
  343. origlen = len(body)
  344. body = zbody
  345. }
  346. }
  347. var lastError string
  348. var numFailures int
  349. var firstFailure time.Time
  350. for len(body) > 0 && ctx.Err() == nil {
  351. retryAfter, err := lg.upload(ctx, body, origlen)
  352. if err != nil {
  353. numFailures++
  354. firstFailure = lg.clock.Now()
  355. if !lg.internetUp() {
  356. fmt.Fprintf(lg.stderr, "logtail: internet down; waiting\n")
  357. lg.awaitInternetUp(ctx)
  358. continue
  359. }
  360. // Only print the same message once.
  361. if currError := err.Error(); lastError != currError {
  362. fmt.Fprintf(lg.stderr, "logtail: upload: %v\n", err)
  363. lastError = currError
  364. }
  365. // Sleep for the specified retryAfter period,
  366. // otherwise default to some random value.
  367. if retryAfter <= 0 {
  368. retryAfter = mrand.N(30*time.Second) + 30*time.Second
  369. }
  370. tstime.Sleep(ctx, retryAfter)
  371. } else {
  372. // Only print a success message after recovery.
  373. if numFailures > 0 {
  374. fmt.Fprintf(lg.stderr, "logtail: upload succeeded after %d failures and %s\n", numFailures, lg.clock.Since(firstFailure).Round(time.Second))
  375. }
  376. break
  377. }
  378. }
  379. select {
  380. case <-lg.shutdownStart:
  381. return
  382. default:
  383. }
  384. }
  385. }
  386. func (lg *Logger) internetUp() bool {
  387. select {
  388. case <-lg.networkIsUp.Ready():
  389. return true
  390. default:
  391. if lg.netMonitor == nil {
  392. return true // No way to tell, so assume it is.
  393. }
  394. return lg.netMonitor.InterfaceState().AnyInterfaceUp()
  395. }
  396. }
  397. // onChangeDelta is an eventbus subscriber function that handles
  398. // [netmon.ChangeDelta] events to detect whether the Internet is expected to be
  399. // reachable.
  400. func (lg *Logger) onChangeDelta(delta *netmon.ChangeDelta) {
  401. if delta.AnyInterfaceUp() {
  402. fmt.Fprintf(lg.stderr, "logtail: internet back up\n")
  403. lg.networkIsUp.Set()
  404. } else {
  405. fmt.Fprintf(lg.stderr, "logtail: network changed, but is not up\n")
  406. lg.networkIsUp.Reset()
  407. }
  408. }
  409. func (lg *Logger) awaitInternetUp(ctx context.Context) {
  410. if lg.eventClient != nil {
  411. select {
  412. case <-lg.networkIsUp.Ready():
  413. case <-ctx.Done():
  414. }
  415. return
  416. }
  417. upc := make(chan bool, 1)
  418. defer lg.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
  419. if delta.AnyInterfaceUp() {
  420. select {
  421. case upc <- true:
  422. default:
  423. }
  424. }
  425. })()
  426. if lg.internetUp() {
  427. return
  428. }
  429. select {
  430. case <-upc:
  431. fmt.Fprintf(lg.stderr, "logtail: internet back up\n")
  432. case <-ctx.Done():
  433. }
  434. }
  435. // upload uploads body to the log server.
  436. // origlen indicates the pre-compression body length.
  437. // origlen of -1 indicates that the body is not compressed.
  438. func (lg *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAfter time.Duration, err error) {
  439. lg.uploadCalls.Add(1)
  440. startUpload := time.Now()
  441. const maxUploadTime = 45 * time.Second
  442. ctx = sockstats.WithSockStats(ctx, lg.sockstatsLabel.Load(), lg.Logf)
  443. ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
  444. defer cancel()
  445. req, err := http.NewRequestWithContext(ctx, "POST", lg.url, bytes.NewReader(body))
  446. if err != nil {
  447. // I know of no conditions under which this could fail.
  448. // Report it very loudly.
  449. // TODO record logs to disk
  450. panic("logtail: cannot build http request: " + err.Error())
  451. }
  452. if origlen != -1 {
  453. req.Header.Add("Content-Encoding", "zstd")
  454. req.Header.Add("Orig-Content-Length", strconv.Itoa(origlen))
  455. }
  456. if runtime.GOOS == "js" {
  457. // We once advertised we'd accept optional client certs (for internal use)
  458. // on log.tailscale.com but then Tailscale SSH js/wasm clients prompted
  459. // users (on some browsers?) to pick a client cert. We'll fix the server's
  460. // TLS ServerHello, but we can also fix it client side for good measure.
  461. //
  462. // Corp details: https://github.com/tailscale/corp/issues/18177#issuecomment-2026598715
  463. // and https://github.com/tailscale/corp/pull/18775#issuecomment-2027505036
  464. //
  465. // See https://github.com/golang/go/wiki/WebAssembly#configuring-fetch-options-while-using-nethttp
  466. // and https://developer.mozilla.org/en-US/docs/Web/API/fetch#credentials
  467. req.Header.Set("js.fetch:credentials", "omit")
  468. }
  469. req.Header["User-Agent"] = nil // not worth writing one; save some bytes
  470. compressedNote := "not-compressed"
  471. if origlen != -1 {
  472. compressedNote = "compressed"
  473. }
  474. lg.httpDoCalls.Add(1)
  475. resp, err := lg.httpc.Do(req)
  476. if err != nil {
  477. lg.failedCalls.Add(1)
  478. return 0, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err)
  479. }
  480. defer resp.Body.Close()
  481. if resp.StatusCode != http.StatusOK {
  482. lg.failedCalls.Add(1)
  483. n, _ := strconv.Atoi(resp.Header.Get("Retry-After"))
  484. b, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10))
  485. return time.Duration(n) * time.Second, fmt.Errorf("log upload of %d bytes %s failed %d: %s", len(body), compressedNote, resp.StatusCode, bytes.TrimSpace(b))
  486. }
  487. lg.uploadedBytes.Add(int64(len(body)))
  488. lg.uploadingTime.Add(int64(time.Since(startUpload)))
  489. return 0, nil
  490. }
  491. // Flush uploads all logs to the server. It blocks until complete or there is an
  492. // unrecoverable error.
  493. //
  494. // TODO(bradfitz): this apparently just returns nil, as of tailscale/corp@9c2ec35.
  495. // Finish cleaning this up.
  496. func (lg *Logger) Flush() error {
  497. return nil
  498. }
  499. // StartFlush starts a log upload, if anything is pending.
  500. //
  501. // If l is nil, StartFlush is a no-op.
  502. func (lg *Logger) StartFlush() {
  503. if lg != nil {
  504. lg.tryDrainWake()
  505. }
  506. }
  507. // ExpVar report metrics about the logger.
  508. //
  509. // - counter_upload_calls: Total number of upload attempts.
  510. //
  511. // - counter_upload_errors: Total number of upload attempts that failed.
  512. //
  513. // - counter_uploaded_bytes: Total number of bytes successfully uploaded
  514. // (which is calculated after compression is applied).
  515. //
  516. // - counter_uploading_nsecs: Total number of nanoseconds spent uploading.
  517. //
  518. // - buffer: An optional [metrics.Set] with metrics for the [Buffer].
  519. func (lg *Logger) ExpVar() expvar.Var {
  520. m := new(metrics.Set)
  521. m.Set("counter_upload_calls", &lg.uploadCalls)
  522. m.Set("counter_upload_errors", &lg.failedCalls)
  523. m.Set("counter_uploaded_bytes", &lg.uploadedBytes)
  524. m.Set("counter_uploading_nsecs", &lg.uploadingTime)
  525. if v, ok := lg.buffer.(interface{ ExpVar() expvar.Var }); ok {
  526. m.Set("buffer", v.ExpVar())
  527. }
  528. return m
  529. }
  530. // logtailDisabled is whether logtail uploads to logcatcher are disabled.
  531. var logtailDisabled atomic.Bool
  532. // Disable disables logtail uploads for the lifetime of the process.
  533. func Disable() {
  534. logtailDisabled.Store(true)
  535. }
  536. var debugWakesAndUploads = envknob.RegisterBool("TS_DEBUG_LOGTAIL_WAKES")
  537. // tryDrainWake tries to send to lg.drainWake, to cause an uploading wakeup.
  538. // It does not block.
  539. func (lg *Logger) tryDrainWake() {
  540. lg.flushPending.Store(false)
  541. if debugWakesAndUploads() {
  542. // Using println instead of log.Printf here to avoid recursing back into
  543. // ourselves.
  544. println("logtail: try drain wake, numHTTP:", lg.httpDoCalls.Load())
  545. }
  546. select {
  547. case lg.drainWake <- struct{}{}:
  548. default:
  549. }
  550. }
  551. func (lg *Logger) sendLocked(jsonBlob []byte) (int, error) {
  552. tapSend(jsonBlob)
  553. if logtailDisabled.Load() {
  554. return len(jsonBlob), nil
  555. }
  556. n, err := lg.buffer.Write(jsonBlob)
  557. flushDelay := defaultFlushDelay
  558. if lg.flushDelayFn != nil {
  559. flushDelay = lg.flushDelayFn()
  560. }
  561. if flushDelay > 0 {
  562. if lg.flushPending.CompareAndSwap(false, true) {
  563. if lg.flushTimer == nil {
  564. lg.flushTimer = lg.clock.AfterFunc(flushDelay, lg.tryDrainWake)
  565. } else {
  566. lg.flushTimer.Reset(flushDelay)
  567. }
  568. }
  569. } else {
  570. lg.tryDrainWake()
  571. }
  572. return n, err
  573. }
  574. // appendMetadata appends optional "logtail", "metrics", and "v" JSON members.
  575. // This assumes dst is already within a JSON object.
  576. // Each member is comma-terminated.
  577. func (lg *Logger) appendMetadata(dst []byte, skipClientTime, skipMetrics bool, procID uint32, procSequence uint64, errDetail string, errData jsontext.Value, level int) []byte {
  578. // Append optional logtail metadata.
  579. if !skipClientTime || procID != 0 || procSequence != 0 || errDetail != "" || errData != nil {
  580. dst = append(dst, `"logtail":{`...)
  581. if !skipClientTime {
  582. dst = append(dst, `"client_time":"`...)
  583. dst = lg.clock.Now().UTC().AppendFormat(dst, time.RFC3339Nano)
  584. dst = append(dst, '"', ',')
  585. }
  586. if procID != 0 {
  587. dst = append(dst, `"proc_id":`...)
  588. dst = strconv.AppendUint(dst, uint64(procID), 10)
  589. dst = append(dst, ',')
  590. }
  591. if procSequence != 0 {
  592. dst = append(dst, `"proc_seq":`...)
  593. dst = strconv.AppendUint(dst, procSequence, 10)
  594. dst = append(dst, ',')
  595. }
  596. if errDetail != "" || errData != nil {
  597. dst = append(dst, `"error":{`...)
  598. if errDetail != "" {
  599. dst = append(dst, `"detail":`...)
  600. dst, _ = jsontext.AppendQuote(dst, errDetail)
  601. dst = append(dst, ',')
  602. }
  603. if errData != nil {
  604. dst = append(dst, `"bad_data":`...)
  605. dst = append(dst, errData...)
  606. dst = append(dst, ',')
  607. }
  608. dst = bytes.TrimRight(dst, ",")
  609. dst = append(dst, '}', ',')
  610. }
  611. dst = bytes.TrimRight(dst, ",")
  612. dst = append(dst, '}', ',')
  613. }
  614. // Append optional metrics metadata.
  615. if !skipMetrics && lg.metricsDelta != nil {
  616. if d := lg.metricsDelta(); d != "" {
  617. dst = append(dst, `"metrics":"`...)
  618. dst = append(dst, d...)
  619. dst = append(dst, '"', ',')
  620. }
  621. }
  622. // Add the optional log level, if non-zero.
  623. // Note that we only use log levels 1 and 2 currently.
  624. // It's unlikely we'll ever make it past 9.
  625. if level > 0 && level < 10 {
  626. dst = append(dst, `"v":`...)
  627. dst = append(dst, '0'+byte(level))
  628. dst = append(dst, ',')
  629. }
  630. return dst
  631. }
  632. // appendText appends a raw text message in the Tailscale JSON log entry format.
  633. func (lg *Logger) appendText(dst, src []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
  634. dst = slices.Grow(dst, len(src))
  635. dst = append(dst, '{')
  636. dst = lg.appendMetadata(dst, skipClientTime, false, procID, procSequence, "", nil, level)
  637. if len(src) == 0 {
  638. dst = bytes.TrimRight(dst, ",")
  639. return append(dst, "}\n"...)
  640. }
  641. // Append the text string, which may be truncated.
  642. // Invalid UTF-8 will be mangled with the Unicode replacement character.
  643. max := maxTextSize
  644. if lg.lowMem {
  645. max /= lowMemRatio
  646. }
  647. dst = append(dst, `"text":`...)
  648. dst = appendTruncatedString(dst, src, max)
  649. return append(dst, "}\n"...)
  650. }
  651. // appendTruncatedString appends a JSON string for src,
  652. // truncating the src to be no larger than n.
  653. func appendTruncatedString(dst, src []byte, n int) []byte {
  654. srcLen := len(src)
  655. src = truncate.String(src, n)
  656. dst, _ = jsontext.AppendQuote(dst, src) // ignore error; only occurs for invalid UTF-8
  657. if srcLen > len(src) {
  658. dst = dst[:len(dst)-len(`"`)] // trim off preceding double-quote
  659. dst = append(dst, "…+"...)
  660. dst = strconv.AppendInt(dst, int64(srcLen-len(src)), 10)
  661. dst = append(dst, '"') // re-append succeeding double-quote
  662. }
  663. return dst
  664. }
  665. // appendTextOrJSONLocked appends a raw text message or a raw JSON object
  666. // in the Tailscale JSON log format.
  667. func (lg *Logger) appendTextOrJSONLocked(dst, src []byte, level int) []byte {
  668. if lg.includeProcSequence {
  669. lg.procSequence++
  670. }
  671. if len(src) == 0 || src[0] != '{' {
  672. return lg.appendText(dst, src, lg.skipClientTime, lg.procID, lg.procSequence, level)
  673. }
  674. // Check whether the input is a valid JSON object and
  675. // whether it contains the reserved "logtail" name at the top-level.
  676. var logtailKeyOffset, logtailValOffset, logtailValLength int
  677. validJSON := func() bool {
  678. // The jsontext.NewDecoder API operates on an io.Reader, for which
  679. // bytes.Buffer provides a means to convert a []byte into an io.Reader.
  680. // However, bytes.NewBuffer normally allocates unless
  681. // we immediately shallow copy it into a pre-allocated Buffer struct.
  682. // See https://go.dev/issue/67004.
  683. lg.bytesBuf = *bytes.NewBuffer(src)
  684. defer func() { lg.bytesBuf = bytes.Buffer{} }() // avoid pinning src
  685. dec := &lg.jsonDec
  686. dec.Reset(&lg.bytesBuf)
  687. if tok, err := dec.ReadToken(); tok.Kind() != '{' || err != nil {
  688. return false
  689. }
  690. for dec.PeekKind() != '}' {
  691. keyOffset := dec.InputOffset()
  692. tok, err := dec.ReadToken()
  693. if err != nil {
  694. return false
  695. }
  696. isLogtail := tok.String() == "logtail"
  697. valOffset := dec.InputOffset()
  698. if dec.SkipValue() != nil {
  699. return false
  700. }
  701. if isLogtail {
  702. logtailKeyOffset = int(keyOffset)
  703. logtailValOffset = int(valOffset)
  704. logtailValLength = int(dec.InputOffset()) - logtailValOffset
  705. }
  706. }
  707. if tok, err := dec.ReadToken(); tok.Kind() != '}' || err != nil {
  708. return false
  709. }
  710. if _, err := dec.ReadToken(); err != io.EOF {
  711. return false // trailing junk after JSON object
  712. }
  713. return true
  714. }()
  715. // Treat invalid JSON as a raw text message.
  716. if !validJSON {
  717. return lg.appendText(dst, src, lg.skipClientTime, lg.procID, lg.procSequence, level)
  718. }
  719. // Check whether the JSON payload is too large.
  720. // Due to logtail metadata, the formatted log entry could exceed maxSize.
  721. // That's okay as the Tailscale log service limit is actually 2*maxSize.
  722. // However, so long as logging applications aim to target the maxSize limit,
  723. // there should be no trouble eventually uploading logs.
  724. maxLen := cmp.Or(lg.maxUploadSize, maxSize)
  725. if len(src) > maxLen {
  726. errDetail := fmt.Sprintf("entry too large: %d bytes", len(src))
  727. errData := appendTruncatedString(nil, src, maxLen/len(`\uffff`)) // escaping could increase size
  728. dst = append(dst, '{')
  729. dst = lg.appendMetadata(dst, lg.skipClientTime, true, lg.procID, lg.procSequence, errDetail, errData, level)
  730. dst = bytes.TrimRight(dst, ",")
  731. return append(dst, "}\n"...)
  732. }
  733. // Check whether the reserved logtail member occurs in the log data.
  734. // If so, it is moved to the the logtail/error member.
  735. const jsonSeperators = ",:" // per RFC 8259, section 2
  736. const jsonWhitespace = " \n\r\t" // per RFC 8259, section 2
  737. var errDetail string
  738. var errData jsontext.Value
  739. if logtailValLength > 0 {
  740. errDetail = "duplicate logtail member"
  741. errData = bytes.Trim(src[logtailValOffset:][:logtailValLength], jsonSeperators+jsonWhitespace)
  742. }
  743. dst = slices.Grow(dst, len(src))
  744. dst = append(dst, '{')
  745. dst = lg.appendMetadata(dst, lg.skipClientTime, true, lg.procID, lg.procSequence, errDetail, errData, level)
  746. if logtailValLength > 0 {
  747. // Exclude original logtail member from the message.
  748. dst = appendWithoutNewline(dst, src[len("{"):logtailKeyOffset])
  749. dst = bytes.TrimRight(dst, jsonSeperators+jsonWhitespace)
  750. dst = appendWithoutNewline(dst, src[logtailValOffset+logtailValLength:])
  751. } else {
  752. dst = appendWithoutNewline(dst, src[len("{"):])
  753. }
  754. dst = bytes.TrimRight(dst, jsonWhitespace)
  755. dst = dst[:len(dst)-len("}")]
  756. dst = bytes.TrimRight(dst, jsonSeperators+jsonWhitespace)
  757. return append(dst, "}\n"...)
  758. }
  // appendWithoutNewline appends src to dst, dropping every '\n' byte, because
  // newlines are what frame individual log entries and so must not appear
  // inside one. Each newline-free run of src is copied with a single append.
  func appendWithoutNewline(dst, src []byte) []byte {
  	runStart := 0
  	for i, c := range src {
  		if c == '\n' {
  			dst = append(dst, src[runStart:i]...)
  			runStart = i + 1
  		}
  	}
  	return append(dst, src[runStart:]...)
  }
  769. // Logf logs to l using the provided fmt-style format and optional arguments.
  770. func (lg *Logger) Logf(format string, args ...any) {
  771. fmt.Fprintf(lg, format, args...)
  772. }
  773. // Write logs an encoded JSON blob.
  774. //
  775. // If the []byte passed to Write is not an encoded JSON blob,
  776. // then contents is fit into a JSON blob and written.
  777. //
  778. // This is intended as an interface for the stdlib "log" package.
  779. func (lg *Logger) Write(buf []byte) (int, error) {
  780. if len(buf) == 0 {
  781. return 0, nil
  782. }
  783. inLen := len(buf) // length as provided to us, before modifications to downstream writers
  784. level, buf := parseAndRemoveLogLevel(buf)
  785. if lg.stderr != nil && lg.stderr != io.Discard && int64(level) <= atomic.LoadInt64(&lg.stderrLevel) {
  786. if buf[len(buf)-1] == '\n' {
  787. lg.stderr.Write(buf)
  788. } else {
  789. // The log package always line-terminates logs,
  790. // so this is an uncommon path.
  791. withNL := append(buf[:len(buf):len(buf)], '\n')
  792. lg.stderr.Write(withNL)
  793. }
  794. }
  795. lg.writeLock.Lock()
  796. defer lg.writeLock.Unlock()
  797. b := lg.appendTextOrJSONLocked(lg.writeBuf[:0], buf, level)
  798. _, err := lg.sendLocked(b)
  799. return inLen, err
  800. }
  // Verbosity markers recognized by parseAndRemoveLogLevel.
  var (
  	// openBracketV is the prefix shared by every marker below. A buffer
  	// that lacks it cannot carry a level, which makes it a cheap pre-check.
  	openBracketV = []byte("[v")
  
  	// v1 and v2 tag a text line as verbosity level 1 or 2, respectively.
  	v1, v2 = []byte("[v1] "), []byte("[v2] ")
  
  	// vJSON is followed by one '0'-'9' level byte and then a JSON value.
  	vJSON = []byte("[v\x00JSON]")
  )
  807. // level 0 is normal (or unknown) level; 1+ are increasingly verbose
  808. func parseAndRemoveLogLevel(buf []byte) (level int, cleanBuf []byte) {
  809. if len(buf) == 0 || buf[0] == '{' || !bytes.Contains(buf, openBracketV) {
  810. return 0, buf
  811. }
  812. if bytes.Contains(buf, v1) {
  813. return 1, bytes.ReplaceAll(buf, v1, nil)
  814. }
  815. if bytes.Contains(buf, v2) {
  816. return 2, bytes.ReplaceAll(buf, v2, nil)
  817. }
  818. if i := bytes.Index(buf, vJSON); i != -1 {
  819. rest := buf[i+len(vJSON):]
  820. if len(rest) >= 2 {
  821. v := rest[0]
  822. if v >= '0' && v <= '9' {
  823. return int(v - '0'), rest[1:]
  824. }
  825. }
  826. }
  827. return 0, buf
  828. }
  829. var (
  830. tapSetSize atomic.Int32
  831. tapMu sync.Mutex
  832. tapSet set.HandleSet[chan<- string]
  833. )
  834. // RegisterLogTap registers dst to get a copy of every log write. The caller
  835. // must call unregister when done watching.
  836. //
  837. // This would ideally be a method on Logger, but Logger isn't really available
  838. // in most places; many writes go via stderr which filch redirects to the
  839. // singleton Logger set up early. For better or worse, there's basically only
  840. // one Logger within the program. This mechanism at least works well for
  841. // tailscaled. It works less well for a binary with multiple tsnet.Servers. Oh
  842. // well. This then subscribes to all of them.
  843. func RegisterLogTap(dst chan<- string) (unregister func()) {
  844. tapMu.Lock()
  845. defer tapMu.Unlock()
  846. h := tapSet.Add(dst)
  847. tapSetSize.Store(int32(len(tapSet)))
  848. return func() {
  849. tapMu.Lock()
  850. defer tapMu.Unlock()
  851. delete(tapSet, h)
  852. tapSetSize.Store(int32(len(tapSet)))
  853. }
  854. }
  855. // tapSend relays the JSON blob to any/all registered local debug log watchers
  856. // (somebody running "tailscale debug daemon-logs").
  857. func tapSend(jsonBlob []byte) {
  858. if tapSetSize.Load() == 0 {
  859. return
  860. }
  861. s := string(jsonBlob)
  862. tapMu.Lock()
  863. defer tapMu.Unlock()
  864. for _, dst := range tapSet {
  865. select {
  866. case dst <- s:
  867. default:
  868. }
  869. }
  870. }