logtail.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // Package logtail sends logs to log.tailscale.io.
  4. package logtail
  5. import (
  6. "bytes"
  7. "context"
  8. "crypto/rand"
  9. "encoding/binary"
  10. "encoding/json"
  11. "fmt"
  12. "io"
  13. "log"
  14. mrand "math/rand"
  15. "net/http"
  16. "os"
  17. "strconv"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. "tailscale.com/envknob"
  22. "tailscale.com/net/netmon"
  23. "tailscale.com/net/sockstats"
  24. "tailscale.com/tstime"
  25. tslogger "tailscale.com/types/logger"
  26. "tailscale.com/types/logid"
  27. "tailscale.com/util/set"
  28. )
  const (
  	// DefaultHost is the host that logs are uploaded to when
  	// Config.BaseURL is left empty.
  	DefaultHost = "log.tailscale.io"

  	// CollectionNode is the logtail Config.Collection name used by
  	// tailscaled (or equivalent: IPNExtension, Android app).
  	CollectionNode = "tailnode.log.tailscale.io"

  	// defaultFlushDelay is how long logs accumulate before an upload is
  	// triggered when no Config.FlushDelayFn is provided.
  	defaultFlushDelay = 2 * time.Second
  )
  // Encoder compresses log batches before upload. Config.NewZstdEncoder
  // supplies the implementation.
  38. type Encoder interface {
  // EncodeAll is called by Logger.uploading as EncodeAll(body, nil); the
  // returned bytes are sent as the zstd-compressed request body.
  // NOTE(review): dst is presumably an optional buffer to append into —
  // confirm against the implementation.
  39. EncodeAll(src, dst []byte) []byte
  // Close is called by Logger.Shutdown once the uploader goroutine has exited.
  40. Close() error
  41. }
  // Config holds the settings NewLogger uses to construct a Logger.
  // NewLogger substitutes defaults for BaseURL, HTTPC, Clock, Stderr and
  // Buffer when they are left unset.
  42. type Config struct {
  43. Collection string // collection name, a domain name
  44. PrivateID logid.PrivateID // private ID for the primary log stream
  45. CopyPrivateID logid.PrivateID // private ID for a log stream that is a superset of this log stream
  46. BaseURL string // if empty defaults to "https://log.tailscale.io"
  47. HTTPC *http.Client // if nil defaults to http.DefaultClient
  48. SkipClientTime bool // if true, client_time is not written to logs
  49. LowMemory bool // if true, logtail minimizes memory use
  50. Clock tstime.Clock // if set, Clock.Now substitutes uses of time.Now
  51. Stderr io.Writer // if set, logs are sent here instead of os.Stderr
  52. StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only
  53. Buffer Buffer // temp storage, if nil a MemoryBuffer
  54. NewZstdEncoder func() Encoder // if set, used to compress logs for transmission
  55. // MetricsDelta, if non-nil, is a func that returns an encoding
  56. // delta in clientmetrics to upload alongside existing logs.
  57. // It can return either an empty string (for nothing) or a string
  58. // that's safe to embed in a JSON string literal without further escaping.
  59. MetricsDelta func() string
  60. // FlushDelayFn, if non-nil is a func that returns how long to wait to
  61. // accumulate logs before uploading them. 0 or negative means to upload
  62. // immediately.
  63. //
  64. // If nil, a default value is used. (currently 2 seconds)
  65. FlushDelayFn func() time.Duration
  66. // IncludeProcID, if true, results in an ephemeral process identifier being
  67. // included in logs. The ID is random and not guaranteed to be globally
  68. // unique, but it can be used to distinguish between different instances
  69. // running with same PrivateID.
  70. IncludeProcID bool
  71. // IncludeProcSequence, if true, results in an ephemeral sequence number
  72. // being included in the logs. The sequence number is incremented for each
  73. // log message sent, but is not persisted across process restarts.
  74. IncludeProcSequence bool
  75. }
  76. func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
  77. if cfg.BaseURL == "" {
  78. cfg.BaseURL = "https://" + DefaultHost
  79. }
  80. if cfg.HTTPC == nil {
  81. cfg.HTTPC = http.DefaultClient
  82. }
  83. if cfg.Clock == nil {
  84. cfg.Clock = tstime.StdClock{}
  85. }
  86. if cfg.Stderr == nil {
  87. cfg.Stderr = os.Stderr
  88. }
  89. if cfg.Buffer == nil {
  90. pendingSize := 256
  91. if cfg.LowMemory {
  92. pendingSize = 64
  93. }
  94. cfg.Buffer = NewMemoryBuffer(pendingSize)
  95. }
  96. var procID uint32
  97. if cfg.IncludeProcID {
  98. keyBytes := make([]byte, 4)
  99. rand.Read(keyBytes)
  100. procID = binary.LittleEndian.Uint32(keyBytes)
  101. if procID == 0 {
  102. // 0 is the empty/off value, assign a different (non-zero) value to
  103. // make sure we still include an ID (actual value does not matter).
  104. procID = 7
  105. }
  106. }
  107. if s := envknob.String("TS_DEBUG_LOGTAIL_FLUSHDELAY"); s != "" {
  108. if delay, err := time.ParseDuration(s); err == nil {
  109. cfg.FlushDelayFn = func() time.Duration { return delay }
  110. } else {
  111. log.Fatalf("invalid TS_DEBUG_LOGTAIL_FLUSHDELAY: %v", err)
  112. }
  113. } else if cfg.FlushDelayFn == nil && envknob.Bool("IN_TS_TEST") {
  114. cfg.FlushDelayFn = func() time.Duration { return 0 }
  115. }
  116. var urlSuffix string
  117. if !cfg.CopyPrivateID.IsZero() {
  118. urlSuffix = "?copyId=" + cfg.CopyPrivateID.String()
  119. }
  120. l := &Logger{
  121. privateID: cfg.PrivateID,
  122. stderr: cfg.Stderr,
  123. stderrLevel: int64(cfg.StderrLevel),
  124. httpc: cfg.HTTPC,
  125. url: cfg.BaseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String() + urlSuffix,
  126. lowMem: cfg.LowMemory,
  127. buffer: cfg.Buffer,
  128. skipClientTime: cfg.SkipClientTime,
  129. drainWake: make(chan struct{}, 1),
  130. sentinel: make(chan int32, 16),
  131. flushDelayFn: cfg.FlushDelayFn,
  132. clock: cfg.Clock,
  133. metricsDelta: cfg.MetricsDelta,
  134. procID: procID,
  135. includeProcSequence: cfg.IncludeProcSequence,
  136. shutdownStart: make(chan struct{}),
  137. shutdownDone: make(chan struct{}),
  138. }
  139. l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
  140. if cfg.NewZstdEncoder != nil {
  141. l.zstdEncoder = cfg.NewZstdEncoder()
  142. }
  143. ctx, cancel := context.WithCancel(context.Background())
  144. l.uploadCancel = cancel
  145. go l.uploading(ctx)
  146. l.Write([]byte("logtail started"))
  147. return l
  148. }
  149. // Logger writes logs, splitting them as configured between local
  150. // logging facilities and uploading to a log server.
  151. type Logger struct {
  152. stderr io.Writer
  153. stderrLevel int64 // accessed atomically
  154. httpc *http.Client
  // url is the upload endpoint assembled by NewLogger from the configured
  // BaseURL, Collection and PrivateID (plus an optional copyId query).
  155. url string
  156. lowMem bool
  157. skipClientTime bool
  // netMonitor is optional (see SetNetMon); when nil, internetUp assumes
  // the network is up.
  158. netMonitor *netmon.Monitor
  159. buffer Buffer
  160. drainWake chan struct{} // signal to speed up drain
  161. flushDelayFn func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay
  // flushPending is set by sendLocked when a delayed flush has been
  // scheduled and cleared again by tryDrainWake.
  162. flushPending atomic.Bool
  // NOTE(review): sentinel is allocated in NewLogger; its readers and
  // writers are not visible in this part of the file.
  163. sentinel chan int32
  164. clock tstime.Clock
  165. zstdEncoder Encoder
  // uploadCancel cancels the context handed to the uploading goroutine.
  166. uploadCancel func()
  // explainedRaw records that drainPending has already printed its
  // RAW-STDERR explanation banner.
  167. explainedRaw bool
  168. metricsDelta func() string // or nil
  169. privateID logid.PrivateID
  // httpDoCalls counts the httpc.Do calls made by upload.
  170. httpDoCalls atomic.Int32
  171. sockstatsLabel atomicSocktatsLabel
  // procID is the random per-process ID chosen by NewLogger; 0 means no ID
  // is included in log entries.
  172. procID uint32
  173. includeProcSequence bool
  174. writeLock sync.Mutex // guards procSequence, flushTimer, buffer.Write calls
  175. procSequence uint64
  176. flushTimer tstime.TimerController // used when flushDelay is >0
  177. shutdownStartMu sync.Mutex // guards the closing of shutdownStart
  178. shutdownStart chan struct{} // closed when shutdown begins
  179. shutdownDone chan struct{} // closed when shutdown complete
  180. }
  181. type atomicSocktatsLabel struct{ p atomic.Uint32 }
  182. func (p *atomicSocktatsLabel) Load() sockstats.Label { return sockstats.Label(p.p.Load()) }
  183. func (p *atomicSocktatsLabel) Store(label sockstats.Label) { p.p.Store(uint32(label)) }
  184. // SetVerbosityLevel controls the verbosity level that should be
  185. // written to stderr. 0 is the default (not verbose). Levels 1 or higher
  186. // are increasingly verbose.
  187. func (l *Logger) SetVerbosityLevel(level int) {
  188. atomic.StoreInt64(&l.stderrLevel, int64(level))
  189. }
  190. // SetNetMon sets the optional network monitor.
  191. //
  192. // It should not be changed concurrently with log writes and should
  193. // only be set once.
  // The field is assigned without any locking; internetUp and
  // awaitInternetUp read it from the upload goroutine.
  194. func (l *Logger) SetNetMon(lm *netmon.Monitor) {
  195. l.netMonitor = lm
  196. }
  197. // SetSockstatsLabel sets the label used in sockstat logs to identify network traffic from this logger.
  // The label is stored atomically and read by upload for each request.
  198. func (l *Logger) SetSockstatsLabel(label sockstats.Label) {
  199. l.sockstatsLabel.Store(label)
  200. }
  201. // PrivateID returns the logger's private log ID.
  202. //
  203. // It exists for internal use only.
  204. func (l *Logger) PrivateID() logid.PrivateID { return l.privateID }
  205. // Shutdown gracefully shuts down the logger while completing any
  206. // remaining uploads.
  207. //
  208. // It will block, continuing to try and upload unless the passed
  209. // context object interrupts it by being done.
  210. // If the shutdown is interrupted, an error is returned.
  //
  // NOTE(review): in the code below the only non-nil error returned is the
  // one from zstdEncoder.Close; a done ctx cancels the upload but does not
  // by itself produce an error. A call made after shutdown has already begun
  // returns nil immediately without waiting. Confirm this is intended.
  211. func (l *Logger) Shutdown(ctx context.Context) error {
  212. done := make(chan struct{})
  // Wait for the uploader goroutine to exit. If ctx finishes first, cancel
  // the upload context and then keep waiting for the uploader.
  213. go func() {
  214. select {
  215. case <-ctx.Done():
  216. l.uploadCancel()
  217. <-l.shutdownDone
  218. case <-l.shutdownDone:
  219. }
  220. close(done)
  221. }()
  // shutdownStartMu makes the check-then-close of shutdownStart atomic so
  // that concurrent Shutdown calls cannot close the channel twice.
  222. l.shutdownStartMu.Lock()
  223. select {
  224. case <-l.shutdownStart:
  225. l.shutdownStartMu.Unlock()
  226. return nil
  227. default:
  228. }
  229. close(l.shutdownStart)
  230. l.shutdownStartMu.Unlock()
  // Queue one last log line through the normal Write path.
  231. io.WriteString(l, "logger closing down\n")
  232. <-done
  233. if l.zstdEncoder != nil {
  234. return l.zstdEncoder.Close()
  235. }
  236. return nil
  237. }
  238. // Close shuts down this logger object, the background log uploader
  239. // process, and any associated goroutines.
  240. //
  241. // Deprecated: use Shutdown
  242. func (l *Logger) Close() {
  243. l.Shutdown(context.Background())
  244. }
  245. // drainBlock is called by drainPending when there are no logs to drain.
  246. //
  247. // In typical operation, every call to the Write method unblocks and triggers a
  248. // buffer.TryReadline, so logs are written with very low latency.
  249. //
  250. // If the caller specified FlushInterface, drainWake is only sent to
  251. // periodically.
  252. func (l *Logger) drainBlock() (shuttingDown bool) {
  253. select {
  254. case <-l.shutdownStart:
  255. return true
  256. case <-l.drainWake:
  257. }
  258. return false
  259. }
  260. // drainPending drains and encodes a batch of logs from the buffer for upload.
  261. // It uses scratch as its initial buffer.
  262. // If no logs are available, drainPending blocks until logs are available.
  //
  // The result is a JSON array of the drained entries, or nil when no entry
  // was collected (for example when shutdown begins while it is waiting).
  // The returned slice may share memory with scratch.
  263. func (l *Logger) drainPending(scratch []byte) (res []byte) {
  264. buf := bytes.NewBuffer(scratch[:0])
  265. buf.WriteByte('[')
  266. entries := 0
  267. var batchDone bool
  // Stop adding entries once the batch reaches this many bytes.
  268. const maxLen = 256 << 10
  269. for buf.Len() < maxLen && !batchDone {
  270. b, err := l.buffer.TryReadLine()
  271. if err == io.EOF {
  272. break
  273. } else if err != nil {
  // Report the read failure as a log entry of its own and end the batch.
  274. b = fmt.Appendf(nil, "reading ringbuffer: %v", err)
  275. batchDone = true
  276. } else if b == nil {
  // Nothing buffered right now: ship what we already have, or block
  // until woken (or until shutdown starts) if the batch is still empty.
  277. if entries > 0 {
  278. break
  279. }
  280. batchDone = l.drainBlock()
  281. continue
  282. }
  283. if len(b) == 0 {
  284. continue
  285. }
  286. if b[0] != '{' || !json.Valid(b) {
  287. // This is probably a log added to stderr by filch
  288. // outside of the logtail logger. Encode it.
  289. if !l.explainedRaw {
  290. fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
  291. fmt.Fprintf(l.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n")
  292. fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
  293. fmt.Fprintf(l.stderr, "RAW-STDERR:\n")
  294. l.explainedRaw = true
  295. }
  296. fmt.Fprintf(l.stderr, "RAW-STDERR: %s", b)
  297. // Do not add a client time, as it could have been
  298. // written a long time ago. Don't include instance key or ID
  299. // either, since this came from a different instance.
  300. b = l.encodeText(b, true, 0, 0, 0)
  301. }
  302. if entries > 0 {
  303. buf.WriteByte(',')
  304. }
  305. buf.Write(b)
  306. entries++
  307. }
  308. buf.WriteByte(']')
  // An array holding no entries is just "[]"; report that as nil.
  309. if buf.Len() <= len("[]") {
  310. return nil
  311. }
  312. return buf.Bytes()
  313. }
  314. // This is the goroutine that repeatedly uploads logs in the background.
  315. func (l *Logger) uploading(ctx context.Context) {
  316. defer close(l.shutdownDone)
  317. scratch := make([]byte, 4096) // reusable buffer to write into
  318. for {
  319. body := l.drainPending(scratch)
  320. origlen := -1 // sentinel value: uncompressed
  321. // Don't attempt to compress tiny bodies; not worth the CPU cycles.
  322. if l.zstdEncoder != nil && len(body) > 256 {
  323. zbody := l.zstdEncoder.EncodeAll(body, nil)
  324. // Only send it compressed if the bandwidth savings are sufficient.
  325. // Just the extra headers associated with enabling compression
  326. // are 50 bytes by themselves.
  327. if len(body)-len(zbody) > 64 {
  328. origlen = len(body)
  329. body = zbody
  330. }
  331. }
  332. var lastError string
  333. var numFailures int
  334. var firstFailure time.Time
  335. for len(body) > 0 && ctx.Err() == nil {
  336. retryAfter, err := l.upload(ctx, body, origlen)
  337. if err != nil {
  338. numFailures++
  339. firstFailure = l.clock.Now()
  340. if !l.internetUp() {
  341. fmt.Fprintf(l.stderr, "logtail: internet down; waiting\n")
  342. l.awaitInternetUp(ctx)
  343. continue
  344. }
  345. // Only print the same message once.
  346. if currError := err.Error(); lastError != currError {
  347. fmt.Fprintf(l.stderr, "logtail: upload: %v\n", err)
  348. lastError = currError
  349. }
  350. // Sleep for the specified retryAfter period,
  351. // otherwise default to some random value.
  352. if retryAfter <= 0 {
  353. retryAfter = time.Duration(30+mrand.Intn(30)) * time.Second
  354. }
  355. tstime.Sleep(ctx, retryAfter)
  356. } else {
  357. // Only print a success message after recovery.
  358. if numFailures > 0 {
  359. fmt.Fprintf(l.stderr, "logtail: upload succeeded after %d failures and %s\n", numFailures, l.clock.Since(firstFailure).Round(time.Second))
  360. }
  361. break
  362. }
  363. }
  364. select {
  365. case <-l.shutdownStart:
  366. return
  367. default:
  368. }
  369. }
  370. }
  371. func (l *Logger) internetUp() bool {
  372. if l.netMonitor == nil {
  373. // No way to tell, so assume it is.
  374. return true
  375. }
  376. return l.netMonitor.InterfaceState().AnyInterfaceUp()
  377. }
  378. func (l *Logger) awaitInternetUp(ctx context.Context) {
  379. upc := make(chan bool, 1)
  380. defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
  381. if delta.New.AnyInterfaceUp() {
  382. select {
  383. case upc <- true:
  384. default:
  385. }
  386. }
  387. })()
  388. if l.internetUp() {
  389. return
  390. }
  391. select {
  392. case <-upc:
  393. fmt.Fprintf(l.stderr, "logtail: internet back up\n")
  394. case <-ctx.Done():
  395. }
  396. }
  397. // upload uploads body to the log server.
  398. // origlen indicates the pre-compression body length.
  399. // origlen of -1 indicates that the body is not compressed.
  400. func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAfter time.Duration, err error) {
  401. const maxUploadTime = 45 * time.Second
  402. ctx = sockstats.WithSockStats(ctx, l.sockstatsLabel.Load(), l.Logf)
  403. ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
  404. defer cancel()
  405. req, err := http.NewRequestWithContext(ctx, "POST", l.url, bytes.NewReader(body))
  406. if err != nil {
  407. // I know of no conditions under which this could fail.
  408. // Report it very loudly.
  409. // TODO record logs to disk
  410. panic("logtail: cannot build http request: " + err.Error())
  411. }
  412. if origlen != -1 {
  413. req.Header.Add("Content-Encoding", "zstd")
  414. req.Header.Add("Orig-Content-Length", strconv.Itoa(origlen))
  415. }
  416. req.Header["User-Agent"] = nil // not worth writing one; save some bytes
  417. compressedNote := "not-compressed"
  418. if origlen != -1 {
  419. compressedNote = "compressed"
  420. }
  421. l.httpDoCalls.Add(1)
  422. resp, err := l.httpc.Do(req)
  423. if err != nil {
  424. return 0, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err)
  425. }
  426. defer resp.Body.Close()
  427. if resp.StatusCode != http.StatusOK {
  428. n, _ := strconv.Atoi(resp.Header.Get("Retry-After"))
  429. b, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10))
  430. return time.Duration(n) * time.Second, fmt.Errorf("log upload of %d bytes %s failed %d: %s", len(body), compressedNote, resp.StatusCode, bytes.TrimSpace(b))
  431. }
  432. return 0, nil
  433. }
  434. // Flush is currently a no-op that always returns nil; it neither uploads
  435. // logs nor blocks. StartFlush requests an upload of pending logs.
  436. //
  437. // TODO(bradfitz): this apparently just returns nil, as of tailscale/corp@9c2ec35.
  438. // Finish cleaning this up.
  439. func (l *Logger) Flush() error {
  440. return nil
  441. }
  442. // StartFlush starts a log upload, if anything is pending.
  443. //
  444. // If l is nil, StartFlush is a no-op.
  445. func (l *Logger) StartFlush() {
  446. if l != nil {
  447. l.tryDrainWake()
  448. }
  449. }
  450. // logtailDisabled is whether logtail uploads to logcatcher are disabled.
  // When set, sendLocked still relays entries to registered log taps but
  // does not write them to the upload buffer.
  451. var logtailDisabled atomic.Bool
  452. // Disable disables logtail uploads for the lifetime of the process.
  // It applies to every Logger in the process; nothing in this file clears
  // the flag again.
  453. func Disable() {
  454. logtailDisabled.Store(true)
  455. }
  // debugWakesAndUploads reports whether the TS_DEBUG_LOGTAIL_WAKES env knob
  // is set; when it is, tryDrainWake prints a line for each wake attempt.
  456. var debugWakesAndUploads = envknob.RegisterBool("TS_DEBUG_LOGTAIL_WAKES")
  457. // tryDrainWake tries to send to l.drainWake, to cause an uploading wakeup.
  458. // It does not block.
  //
  // It also clears flushPending, so the next sendLocked call with a positive
  // flush delay schedules a new delayed flush.
  459. func (l *Logger) tryDrainWake() {
  460. l.flushPending.Store(false)
  461. if debugWakesAndUploads() {
  462. // Using println instead of log.Printf here to avoid recursing back into
  463. // ourselves.
  464. println("logtail: try drain wake, numHTTP:", l.httpDoCalls.Load())
  465. }
  // Non-blocking send: if a wake-up is already queued, drop this one.
  466. select {
  467. case l.drainWake <- struct{}{}:
  468. default:
  469. }
  470. }
  471. func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
  472. tapSend(jsonBlob)
  473. if logtailDisabled.Load() {
  474. return len(jsonBlob), nil
  475. }
  476. n, err := l.buffer.Write(jsonBlob)
  477. flushDelay := defaultFlushDelay
  478. if l.flushDelayFn != nil {
  479. flushDelay = l.flushDelayFn()
  480. }
  481. if flushDelay > 0 {
  482. if l.flushPending.CompareAndSwap(false, true) {
  483. if l.flushTimer == nil {
  484. l.flushTimer = l.clock.AfterFunc(flushDelay, l.tryDrainWake)
  485. } else {
  486. l.flushTimer.Reset(flushDelay)
  487. }
  488. }
  489. } else {
  490. l.tryDrainWake()
  491. }
  492. return n, err
  493. }
  494. // TODO: instead of allocating, this should probably just append
  495. // directly into the output log buffer.
  496. func (l *Logger) encodeText(buf []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
  497. now := l.clock.Now()
  498. // Factor in JSON encoding overhead to try to only do one alloc
  499. // in the make below (so appends don't resize the buffer).
  500. overhead := len(`{"text": ""}\n`)
  501. includeLogtail := !skipClientTime || procID != 0 || procSequence != 0
  502. if includeLogtail {
  503. overhead += len(`"logtail": {},`)
  504. }
  505. if !skipClientTime {
  506. overhead += len(`"client_time": "2006-01-02T15:04:05.999999999Z07:00",`)
  507. }
  508. if procID != 0 {
  509. overhead += len(`"proc_id": 4294967296,`)
  510. }
  511. if procSequence != 0 {
  512. overhead += len(`"proc_seq": 9007199254740992,`)
  513. }
  514. // TODO: do a pass over buf and count how many backslashes will be needed?
  515. // For now just factor in a dozen.
  516. overhead += 12
  517. // Put a sanity cap on buf's size.
  518. max := 16 << 10
  519. if l.lowMem {
  520. max = 4 << 10
  521. }
  522. var nTruncated int
  523. if len(buf) > max {
  524. nTruncated = len(buf) - max
  525. // TODO: this can break a UTF-8 character
  526. // mid-encoding. We don't tend to log
  527. // non-ASCII stuff ourselves, but e.g. client
  528. // names might be.
  529. buf = buf[:max]
  530. }
  531. b := make([]byte, 0, len(buf)+overhead)
  532. b = append(b, '{')
  533. if includeLogtail {
  534. b = append(b, `"logtail": {`...)
  535. if !skipClientTime {
  536. b = append(b, `"client_time": "`...)
  537. b = now.UTC().AppendFormat(b, time.RFC3339Nano)
  538. b = append(b, `",`...)
  539. }
  540. if procID != 0 {
  541. b = append(b, `"proc_id": `...)
  542. b = strconv.AppendUint(b, uint64(procID), 10)
  543. b = append(b, ',')
  544. }
  545. if procSequence != 0 {
  546. b = append(b, `"proc_seq": `...)
  547. b = strconv.AppendUint(b, procSequence, 10)
  548. b = append(b, ',')
  549. }
  550. b = bytes.TrimRight(b, ",")
  551. b = append(b, "}, "...)
  552. }
  553. if l.metricsDelta != nil {
  554. if d := l.metricsDelta(); d != "" {
  555. b = append(b, `"metrics": "`...)
  556. b = append(b, d...)
  557. b = append(b, `",`...)
  558. }
  559. }
  560. // Add the log level, if non-zero. Note that we only use log
  561. // levels 1 and 2 currently. It's unlikely we'll ever make it
  562. // past 9.
  563. if level > 0 && level < 10 {
  564. b = append(b, `"v":`...)
  565. b = append(b, '0'+byte(level))
  566. b = append(b, ',')
  567. }
  568. b = append(b, "\"text\": \""...)
  569. for _, c := range buf {
  570. switch c {
  571. case '\b':
  572. b = append(b, '\\', 'b')
  573. case '\f':
  574. b = append(b, '\\', 'f')
  575. case '\n':
  576. b = append(b, '\\', 'n')
  577. case '\r':
  578. b = append(b, '\\', 'r')
  579. case '\t':
  580. b = append(b, '\\', 't')
  581. case '"':
  582. b = append(b, '\\', '"')
  583. case '\\':
  584. b = append(b, '\\', '\\')
  585. default:
  586. // TODO: what about binary gibberish or non UTF-8?
  587. b = append(b, c)
  588. }
  589. }
  590. if nTruncated > 0 {
  591. b = append(b, "…+"...)
  592. b = strconv.AppendInt(b, int64(nTruncated), 10)
  593. }
  594. b = append(b, "\"}\n"...)
  595. return b
  596. }
  597. func (l *Logger) encodeLocked(buf []byte, level int) []byte {
  598. if l.includeProcSequence {
  599. l.procSequence++
  600. }
  601. if buf[0] != '{' {
  602. return l.encodeText(buf, l.skipClientTime, l.procID, l.procSequence, level) // text fast-path
  603. }
  604. now := l.clock.Now()
  605. obj := make(map[string]any)
  606. if err := json.Unmarshal(buf, &obj); err != nil {
  607. for k := range obj {
  608. delete(obj, k)
  609. }
  610. obj["text"] = string(buf)
  611. }
  612. if txt, isStr := obj["text"].(string); l.lowMem && isStr && len(txt) > 254 {
  613. // TODO(crawshaw): trim to unicode code point
  614. obj["text"] = txt[:254] + "…"
  615. }
  616. hasLogtail := obj["logtail"] != nil
  617. if hasLogtail {
  618. obj["error_has_logtail"] = obj["logtail"]
  619. obj["logtail"] = nil
  620. }
  621. if !l.skipClientTime || l.procID != 0 || l.procSequence != 0 {
  622. logtail := map[string]any{}
  623. if !l.skipClientTime {
  624. logtail["client_time"] = now.UTC().Format(time.RFC3339Nano)
  625. }
  626. if l.procID != 0 {
  627. logtail["proc_id"] = l.procID
  628. }
  629. if l.procSequence != 0 {
  630. logtail["proc_seq"] = l.procSequence
  631. }
  632. obj["logtail"] = logtail
  633. }
  634. if level > 0 {
  635. obj["v"] = level
  636. }
  637. b, err := json.Marshal(obj)
  638. if err != nil {
  639. fmt.Fprintf(l.stderr, "logtail: re-encoding JSON failed: %v\n", err)
  640. // I know of no conditions under which this could fail.
  641. // Report it very loudly.
  642. panic("logtail: re-encoding JSON failed: " + err.Error())
  643. }
  644. b = append(b, '\n')
  645. return b
  646. }
  647. // Logf logs to l using the provided fmt-style format and optional arguments.
  // The formatted message is delivered through l.Write; the result of the
  // write is discarded.
  648. func (l *Logger) Logf(format string, args ...any) {
  649. fmt.Fprintf(l, format, args...)
  650. }
  651. // Write logs an encoded JSON blob.
  652. //
  653. // If the []byte passed to Write is not an encoded JSON blob,
  654. // then contents is fit into a JSON blob and written.
  655. //
  656. // This is intended as an interface for the stdlib "log" package.
  657. func (l *Logger) Write(buf []byte) (int, error) {
  658. if len(buf) == 0 {
  659. return 0, nil
  660. }
  661. inLen := len(buf) // length as provided to us, before modifications to downstream writers
  662. level, buf := parseAndRemoveLogLevel(buf)
  663. if l.stderr != nil && l.stderr != io.Discard && int64(level) <= atomic.LoadInt64(&l.stderrLevel) {
  664. if buf[len(buf)-1] == '\n' {
  665. l.stderr.Write(buf)
  666. } else {
  667. // The log package always line-terminates logs,
  668. // so this is an uncommon path.
  669. withNL := append(buf[:len(buf):len(buf)], '\n')
  670. l.stderr.Write(withNL)
  671. }
  672. }
  673. l.writeLock.Lock()
  674. defer l.writeLock.Unlock()
  675. b := l.encodeLocked(buf, level)
  676. _, err := l.sendLocked(b)
  677. return inLen, err
  678. }
  // Verbosity markers recognised by parseAndRemoveLogLevel.
  var (
  	openBracketV = []byte("[v")
  	v1           = []byte("[v1] ")
  	v2           = []byte("[v2] ")
  	vJSON        = []byte("[v\x00JSON]") // precedes log level '0'-'9' byte, then JSON value
  )

  // parseAndRemoveLogLevel extracts the verbosity level encoded in buf and
  // returns it together with buf stripped of the marker. Level 0 is the
  // normal (or unknown) level; 1 and above are increasingly verbose.
  //
  // Every "[v1] " or "[v2] " occurrence is removed from the text. For the
  // JSON marker, the level is the digit following the marker and the result
  // is whatever follows that digit. Input that is empty, begins with '{' or
  // carries no usable marker is returned unchanged at level 0.
  func parseAndRemoveLogLevel(buf []byte) (level int, cleanBuf []byte) {
  	switch {
  	case len(buf) == 0 || buf[0] == '{' || !bytes.Contains(buf, openBracketV):
  		return 0, buf
  	case bytes.Contains(buf, v1):
  		return 1, bytes.ReplaceAll(buf, v1, nil)
  	case bytes.Contains(buf, v2):
  		return 2, bytes.ReplaceAll(buf, v2, nil)
  	}

  	at := bytes.Index(buf, vJSON)
  	if at == -1 {
  		return 0, buf
  	}
  	// A level digit must be followed by at least one byte of payload.
  	tail := buf[at+len(vJSON):]
  	if len(tail) < 2 {
  		return 0, buf
  	}
  	if digit := tail[0]; digit >= '0' && digit <= '9' {
  		return int(digit - '0'), tail[1:]
  	}
  	return 0, buf
  }
  // State shared by RegisterLogTap and tapSend.
  707. var (
  // tapSetSize mirrors len(tapSet) so that tapSend can skip taking tapMu
  // when no taps are registered.
  708. tapSetSize atomic.Int32
  // tapMu guards tapSet.
  709. tapMu sync.Mutex
  // tapSet holds the channels registered through RegisterLogTap.
  710. tapSet set.HandleSet[chan<- string]
  711. )
  712. // RegisterLogTap registers dst to get a copy of every log write. The caller
  713. // must call unregister when done watching.
  714. //
  715. // This would ideally be a method on Logger, but Logger isn't really available
  716. // in most places; many writes go via stderr which filch redirects to the
  717. // singleton Logger set up early. For better or worse, there's basically only
  718. // one Logger within the program. This mechanism at least works well for
  719. // tailscaled. It works less well for a binary with multiple tsnet.Servers. Oh
  720. // well. This then subscribes to all of them.
  721. func RegisterLogTap(dst chan<- string) (unregister func()) {
  722. tapMu.Lock()
  723. defer tapMu.Unlock()
  724. h := tapSet.Add(dst)
  725. tapSetSize.Store(int32(len(tapSet)))
  726. return func() {
  727. tapMu.Lock()
  728. defer tapMu.Unlock()
  729. delete(tapSet, h)
  730. tapSetSize.Store(int32(len(tapSet)))
  731. }
  732. }
  733. // tapSend relays the JSON blob to any/all registered local debug log watchers
  734. // (somebody running "tailscale debug daemon-logs").
  735. func tapSend(jsonBlob []byte) {
  736. if tapSetSize.Load() == 0 {
  737. return
  738. }
  739. s := string(jsonBlob)
  740. tapMu.Lock()
  741. defer tapMu.Unlock()
  742. for _, dst := range tapSet {
  743. select {
  744. case dst <- s:
  745. default:
  746. }
  747. }
  748. }