logtail.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745
  1. // Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package logtail sends logs to log.tailscale.io.
  5. package logtail
  6. import (
  7. "bytes"
  8. "context"
  9. "crypto/rand"
  10. "encoding/binary"
  11. "encoding/json"
  12. "fmt"
  13. "io"
  14. "log"
  15. "net/http"
  16. "os"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "tailscale.com/envknob"
  23. "tailscale.com/logtail/backoff"
  24. "tailscale.com/net/interfaces"
  25. tslogger "tailscale.com/types/logger"
  26. "tailscale.com/wgengine/monitor"
  27. )
  28. // DefaultHost is the default host name to upload logs to when
  29. // Config.BaseURL isn't provided.
  30. const DefaultHost = "log.tailscale.io"
  31. const defaultFlushDelay = 2 * time.Second
  32. const (
  33. // CollectionNode is the name of a logtail Config.Collection
  34. // for tailscaled (or equivalent: IPNExtension, Android app).
  35. CollectionNode = "tailnode.log.tailscale.io"
  36. )
  37. type Encoder interface {
  38. EncodeAll(src, dst []byte) []byte
  39. Close() error
  40. }
  41. type Config struct {
  42. Collection string // collection name, a domain name
  43. PrivateID PrivateID // private ID for the primary log stream
  44. CopyPrivateID PrivateID // private ID for a log stream that is a superset of this log stream
  45. BaseURL string // if empty defaults to "https://log.tailscale.io"
  46. HTTPC *http.Client // if empty defaults to http.DefaultClient
  47. SkipClientTime bool // if true, client_time is not written to logs
  48. LowMemory bool // if true, logtail minimizes memory use
  49. TimeNow func() time.Time // if set, substitutes uses of time.Now
  50. Stderr io.Writer // if set, logs are sent here instead of os.Stderr
  51. StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only
  52. Buffer Buffer // temp storage, if nil a MemoryBuffer
  53. NewZstdEncoder func() Encoder // if set, used to compress logs for transmission
  54. // MetricsDelta, if non-nil, is a func that returns an encoding
  55. // delta in clientmetrics to upload alongside existing logs.
  56. // It can return either an empty string (for nothing) or a string
  57. // that's safe to embed in a JSON string literal without further escaping.
  58. MetricsDelta func() string
  59. // FlushDelay is how long to wait to accumulate logs before
  60. // uploading them.
  61. //
  62. // If zero, a default value is used. (currently 2 seconds)
  63. //
  64. // Negative means to upload immediately.
  65. FlushDelay time.Duration
  66. // IncludeProcID, if true, results in an ephemeral process identifier being
  67. // included in logs. The ID is random and not guaranteed to be globally
  68. // unique, but it can be used to distinguish between different instances
  69. // running with same PrivateID.
  70. IncludeProcID bool
  71. // IncludeProcSequence, if true, results in an ephemeral sequence number
  72. // being included in the logs. The sequence number is incremented for each
  73. // log message sent, but is not persisted across process restarts.
  74. IncludeProcSequence bool
  75. }
  76. func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
  77. if cfg.BaseURL == "" {
  78. cfg.BaseURL = "https://" + DefaultHost
  79. }
  80. if cfg.HTTPC == nil {
  81. cfg.HTTPC = http.DefaultClient
  82. }
  83. if cfg.TimeNow == nil {
  84. cfg.TimeNow = time.Now
  85. }
  86. if cfg.Stderr == nil {
  87. cfg.Stderr = os.Stderr
  88. }
  89. if cfg.Buffer == nil {
  90. pendingSize := 256
  91. if cfg.LowMemory {
  92. pendingSize = 64
  93. }
  94. cfg.Buffer = NewMemoryBuffer(pendingSize)
  95. }
  96. var procID uint32
  97. if cfg.IncludeProcID {
  98. keyBytes := make([]byte, 4)
  99. rand.Read(keyBytes)
  100. procID = binary.LittleEndian.Uint32(keyBytes)
  101. if procID == 0 {
  102. // 0 is the empty/off value, assign a different (non-zero) value to
  103. // make sure we still include an ID (actual value does not matter).
  104. procID = 7
  105. }
  106. }
  107. if s := envknob.String("TS_DEBUG_LOGTAIL_FLUSHDELAY"); s != "" {
  108. var err error
  109. cfg.FlushDelay, err = time.ParseDuration(s)
  110. if err != nil {
  111. log.Fatalf("invalid TS_DEBUG_LOGTAIL_FLUSHDELAY: %v", err)
  112. }
  113. } else if cfg.FlushDelay == 0 && !envknob.Bool("IN_TS_TEST") {
  114. cfg.FlushDelay = defaultFlushDelay
  115. }
  116. stdLogf := func(f string, a ...any) {
  117. fmt.Fprintf(cfg.Stderr, strings.TrimSuffix(f, "\n")+"\n", a...)
  118. }
  119. var urlSuffix string
  120. if !cfg.CopyPrivateID.IsZero() {
  121. urlSuffix = "?copyId=" + cfg.CopyPrivateID.String()
  122. }
  123. l := &Logger{
  124. privateID: cfg.PrivateID,
  125. stderr: cfg.Stderr,
  126. stderrLevel: int64(cfg.StderrLevel),
  127. httpc: cfg.HTTPC,
  128. url: cfg.BaseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String() + urlSuffix,
  129. lowMem: cfg.LowMemory,
  130. buffer: cfg.Buffer,
  131. skipClientTime: cfg.SkipClientTime,
  132. drainWake: make(chan struct{}, 1),
  133. sentinel: make(chan int32, 16),
  134. flushDelay: cfg.FlushDelay,
  135. timeNow: cfg.TimeNow,
  136. bo: backoff.NewBackoff("logtail", stdLogf, 30*time.Second),
  137. metricsDelta: cfg.MetricsDelta,
  138. procID: procID,
  139. includeProcSequence: cfg.IncludeProcSequence,
  140. shutdownStart: make(chan struct{}),
  141. shutdownDone: make(chan struct{}),
  142. }
  143. if cfg.NewZstdEncoder != nil {
  144. l.zstdEncoder = cfg.NewZstdEncoder()
  145. }
  146. ctx, cancel := context.WithCancel(context.Background())
  147. l.uploadCancel = cancel
  148. go l.uploading(ctx)
  149. l.Write([]byte("logtail started"))
  150. return l
  151. }
  152. // Logger writes logs, splitting them as configured between local
  153. // logging facilities and uploading to a log server.
  154. type Logger struct {
  155. stderr io.Writer
  156. stderrLevel int64 // accessed atomically
  157. httpc *http.Client
  158. url string
  159. lowMem bool
  160. skipClientTime bool
  161. linkMonitor *monitor.Mon
  162. buffer Buffer
  163. drainWake chan struct{} // signal to speed up drain
  164. flushDelay time.Duration // negative or zero to upload agressively, or >0 to batch at this delay
  165. flushPending atomic.Bool
  166. sentinel chan int32
  167. timeNow func() time.Time
  168. bo *backoff.Backoff
  169. zstdEncoder Encoder
  170. uploadCancel func()
  171. explainedRaw bool
  172. metricsDelta func() string // or nil
  173. privateID PrivateID
  174. httpDoCalls atomic.Int32
  175. procID uint32
  176. includeProcSequence bool
  177. writeLock sync.Mutex // guards procSequence, flushTimer, buffer.Write calls
  178. procSequence uint64
  179. flushTimer *time.Timer // used when flushDelay is >0
  180. shutdownStart chan struct{} // closed when shutdown begins
  181. shutdownDone chan struct{} // closed when shutdown complete
  182. }
  183. // SetVerbosityLevel controls the verbosity level that should be
  184. // written to stderr. 0 is the default (not verbose). Levels 1 or higher
  185. // are increasingly verbose.
  186. func (l *Logger) SetVerbosityLevel(level int) {
  187. atomic.StoreInt64(&l.stderrLevel, int64(level))
  188. }
  189. // SetLinkMonitor sets the optional the link monitor.
  190. //
  191. // It should not be changed concurrently with log writes and should
  192. // only be set once.
  193. func (l *Logger) SetLinkMonitor(lm *monitor.Mon) {
  194. l.linkMonitor = lm
  195. }
  196. // PrivateID returns the logger's private log ID.
  197. //
  198. // It exists for internal use only.
  199. func (l *Logger) PrivateID() PrivateID { return l.privateID }
  200. // Shutdown gracefully shuts down the logger while completing any
  201. // remaining uploads.
  202. //
  203. // It will block, continuing to try and upload unless the passed
  204. // context object interrupts it by being done.
  205. // If the shutdown is interrupted, an error is returned.
  206. func (l *Logger) Shutdown(ctx context.Context) error {
  207. done := make(chan struct{})
  208. go func() {
  209. select {
  210. case <-ctx.Done():
  211. l.uploadCancel()
  212. <-l.shutdownDone
  213. case <-l.shutdownDone:
  214. }
  215. close(done)
  216. }()
  217. close(l.shutdownStart)
  218. io.WriteString(l, "logger closing down\n")
  219. <-done
  220. if l.zstdEncoder != nil {
  221. return l.zstdEncoder.Close()
  222. }
  223. return nil
  224. }
  225. // Close shuts down this logger object, the background log uploader
  226. // process, and any associated goroutines.
  227. //
  228. // Deprecated: use Shutdown
  229. func (l *Logger) Close() {
  230. l.Shutdown(context.Background())
  231. }
  232. // drainBlock is called by drainPending when there are no logs to drain.
  233. //
  234. // In typical operation, every call to the Write method unblocks and triggers a
  235. // buffer.TryReadline, so logs are written with very low latency.
  236. //
  237. // If the caller specified FlushInterface, drainWake is only sent to
  238. // periodically.
  239. func (l *Logger) drainBlock() (shuttingDown bool) {
  240. select {
  241. case <-l.shutdownStart:
  242. return true
  243. case <-l.drainWake:
  244. }
  245. return false
  246. }
  247. // drainPending drains and encodes a batch of logs from the buffer for upload.
  248. // It uses scratch as its initial buffer.
  249. // If no logs are available, drainPending blocks until logs are available.
  250. func (l *Logger) drainPending(scratch []byte) (res []byte) {
  251. buf := bytes.NewBuffer(scratch[:0])
  252. buf.WriteByte('[')
  253. entries := 0
  254. var batchDone bool
  255. const maxLen = 256 << 10
  256. for buf.Len() < maxLen && !batchDone {
  257. b, err := l.buffer.TryReadLine()
  258. if err == io.EOF {
  259. break
  260. } else if err != nil {
  261. b = fmt.Appendf(nil, "reading ringbuffer: %v", err)
  262. batchDone = true
  263. } else if b == nil {
  264. if entries > 0 {
  265. break
  266. }
  267. batchDone = l.drainBlock()
  268. continue
  269. }
  270. if len(b) == 0 {
  271. continue
  272. }
  273. if b[0] != '{' || !json.Valid(b) {
  274. // This is probably a log added to stderr by filch
  275. // outside of the logtail logger. Encode it.
  276. if !l.explainedRaw {
  277. fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
  278. fmt.Fprintf(l.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n")
  279. fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
  280. fmt.Fprintf(l.stderr, "RAW-STDERR:\n")
  281. l.explainedRaw = true
  282. }
  283. fmt.Fprintf(l.stderr, "RAW-STDERR: %s", b)
  284. // Do not add a client time, as it could have been
  285. // been written a long time ago. Don't include instance key or ID
  286. // either, since this came from a different instance.
  287. b = l.encodeText(b, true, 0, 0, 0)
  288. }
  289. if entries > 0 {
  290. buf.WriteByte(',')
  291. }
  292. buf.Write(b)
  293. entries++
  294. }
  295. buf.WriteByte(']')
  296. if buf.Len() <= len("[]") {
  297. return nil
  298. }
  299. return buf.Bytes()
  300. }
  301. // This is the goroutine that repeatedly uploads logs in the background.
  302. func (l *Logger) uploading(ctx context.Context) {
  303. defer close(l.shutdownDone)
  304. scratch := make([]byte, 4096) // reusable buffer to write into
  305. for {
  306. body := l.drainPending(scratch)
  307. origlen := -1 // sentinel value: uncompressed
  308. // Don't attempt to compress tiny bodies; not worth the CPU cycles.
  309. if l.zstdEncoder != nil && len(body) > 256 {
  310. zbody := l.zstdEncoder.EncodeAll(body, nil)
  311. // Only send it compressed if the bandwidth savings are sufficient.
  312. // Just the extra headers associated with enabling compression
  313. // are 50 bytes by themselves.
  314. if len(body)-len(zbody) > 64 {
  315. origlen = len(body)
  316. body = zbody
  317. }
  318. }
  319. for len(body) > 0 {
  320. select {
  321. case <-ctx.Done():
  322. return
  323. default:
  324. }
  325. uploaded, err := l.upload(ctx, body, origlen)
  326. if err != nil {
  327. if !l.internetUp() {
  328. fmt.Fprintf(l.stderr, "logtail: internet down; waiting\n")
  329. l.awaitInternetUp(ctx)
  330. continue
  331. }
  332. fmt.Fprintf(l.stderr, "logtail: upload: %v\n", err)
  333. }
  334. l.bo.BackOff(ctx, err)
  335. if uploaded {
  336. break
  337. }
  338. }
  339. select {
  340. case <-l.shutdownStart:
  341. return
  342. default:
  343. }
  344. }
  345. }
  346. func (l *Logger) internetUp() bool {
  347. if l.linkMonitor == nil {
  348. // No way to tell, so assume it is.
  349. return true
  350. }
  351. return l.linkMonitor.InterfaceState().AnyInterfaceUp()
  352. }
  353. func (l *Logger) awaitInternetUp(ctx context.Context) {
  354. upc := make(chan bool, 1)
  355. defer l.linkMonitor.RegisterChangeCallback(func(changed bool, st *interfaces.State) {
  356. if st.AnyInterfaceUp() {
  357. select {
  358. case upc <- true:
  359. default:
  360. }
  361. }
  362. })()
  363. if l.internetUp() {
  364. return
  365. }
  366. select {
  367. case <-upc:
  368. fmt.Fprintf(l.stderr, "logtail: internet back up\n")
  369. case <-ctx.Done():
  370. }
  371. }
  372. // upload uploads body to the log server.
  373. // origlen indicates the pre-compression body length.
  374. // origlen of -1 indicates that the body is not compressed.
  375. func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (uploaded bool, err error) {
  376. const maxUploadTime = 45 * time.Second
  377. ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
  378. defer cancel()
  379. req, err := http.NewRequestWithContext(ctx, "POST", l.url, bytes.NewReader(body))
  380. if err != nil {
  381. // I know of no conditions under which this could fail.
  382. // Report it very loudly.
  383. // TODO record logs to disk
  384. panic("logtail: cannot build http request: " + err.Error())
  385. }
  386. if origlen != -1 {
  387. req.Header.Add("Content-Encoding", "zstd")
  388. req.Header.Add("Orig-Content-Length", strconv.Itoa(origlen))
  389. }
  390. req.Header["User-Agent"] = nil // not worth writing one; save some bytes
  391. compressedNote := "not-compressed"
  392. if origlen != -1 {
  393. compressedNote = "compressed"
  394. }
  395. l.httpDoCalls.Add(1)
  396. resp, err := l.httpc.Do(req)
  397. if err != nil {
  398. return false, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err)
  399. }
  400. defer resp.Body.Close()
  401. if resp.StatusCode != 200 {
  402. uploaded = resp.StatusCode == 400 // the server saved the logs anyway
  403. b, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
  404. return uploaded, fmt.Errorf("log upload of %d bytes %s failed %d: %q", len(body), compressedNote, resp.StatusCode, b)
  405. }
  406. // Try to read to EOF, in case server's response is
  407. // chunked. We want to reuse the TCP connection if it's
  408. // HTTP/1. On success, we expect 0 bytes.
  409. // TODO(bradfitz): can remove a few days after 2020-04-04 once
  410. // server is fixed.
  411. if resp.ContentLength == -1 {
  412. resp.Body.Read(make([]byte, 1))
  413. }
  414. return true, nil
  415. }
  416. // Flush uploads all logs to the server.
  417. // It blocks until complete or there is an unrecoverable error.
  418. func (l *Logger) Flush() error {
  419. return nil
  420. }
  421. // logtailDisabled is whether logtail uploads to logcatcher are disabled.
  422. var logtailDisabled atomic.Bool
  423. // Disable disables logtail uploads for the lifetime of the process.
  424. func Disable() {
  425. logtailDisabled.Store(true)
  426. }
  427. var debugWakesAndUploads = envknob.RegisterBool("TS_DEBUG_LOGTAIL_WAKES")
  428. // tryDrainWake tries to send to lg.drainWake, to cause an uploading wakeup.
  429. // It does not block.
  430. func (l *Logger) tryDrainWake() {
  431. l.flushPending.Store(false)
  432. if debugWakesAndUploads() {
  433. // Using println instead of log.Printf here to avoid recursing back into
  434. // ourselves.
  435. println("logtail: try drain wake, numHTTP:", l.httpDoCalls.Load())
  436. }
  437. select {
  438. case l.drainWake <- struct{}{}:
  439. default:
  440. }
  441. }
  442. func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
  443. if logtailDisabled.Load() {
  444. return len(jsonBlob), nil
  445. }
  446. n, err := l.buffer.Write(jsonBlob)
  447. if l.flushDelay > 0 {
  448. if l.flushPending.CompareAndSwap(false, true) {
  449. if l.flushTimer == nil {
  450. l.flushTimer = time.AfterFunc(l.flushDelay, l.tryDrainWake)
  451. } else {
  452. l.flushTimer.Reset(l.flushDelay)
  453. }
  454. }
  455. } else {
  456. l.tryDrainWake()
  457. }
  458. return n, err
  459. }
  460. // TODO: instead of allocating, this should probably just append
  461. // directly into the output log buffer.
  462. func (l *Logger) encodeText(buf []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
  463. now := l.timeNow()
  464. // Factor in JSON encoding overhead to try to only do one alloc
  465. // in the make below (so appends don't resize the buffer).
  466. overhead := len(`{"text": ""}\n`)
  467. includeLogtail := !skipClientTime || procID != 0 || procSequence != 0
  468. if includeLogtail {
  469. overhead += len(`"logtail": {},`)
  470. }
  471. if !skipClientTime {
  472. overhead += len(`"client_time": "2006-01-02T15:04:05.999999999Z07:00",`)
  473. }
  474. if procID != 0 {
  475. overhead += len(`"proc_id": 4294967296,`)
  476. }
  477. if procSequence != 0 {
  478. overhead += len(`"proc_seq": 9007199254740992,`)
  479. }
  480. // TODO: do a pass over buf and count how many backslashes will be needed?
  481. // For now just factor in a dozen.
  482. overhead += 12
  483. // Put a sanity cap on buf's size.
  484. max := 16 << 10
  485. if l.lowMem {
  486. max = 255
  487. }
  488. var nTruncated int
  489. if len(buf) > max {
  490. nTruncated = len(buf) - max
  491. // TODO: this can break a UTF-8 character
  492. // mid-encoding. We don't tend to log
  493. // non-ASCII stuff ourselves, but e.g. client
  494. // names might be.
  495. buf = buf[:max]
  496. }
  497. b := make([]byte, 0, len(buf)+overhead)
  498. b = append(b, '{')
  499. if includeLogtail {
  500. b = append(b, `"logtail": {`...)
  501. if !skipClientTime {
  502. b = append(b, `"client_time": "`...)
  503. b = now.UTC().AppendFormat(b, time.RFC3339Nano)
  504. b = append(b, `",`...)
  505. }
  506. if procID != 0 {
  507. b = append(b, `"proc_id": `...)
  508. b = strconv.AppendUint(b, uint64(procID), 10)
  509. b = append(b, ',')
  510. }
  511. if procSequence != 0 {
  512. b = append(b, `"proc_seq": `...)
  513. b = strconv.AppendUint(b, procSequence, 10)
  514. b = append(b, ',')
  515. }
  516. b = bytes.TrimRight(b, ",")
  517. b = append(b, "}, "...)
  518. }
  519. if l.metricsDelta != nil {
  520. if d := l.metricsDelta(); d != "" {
  521. b = append(b, `"metrics": "`...)
  522. b = append(b, d...)
  523. b = append(b, `",`...)
  524. }
  525. }
  526. // Add the log level, if non-zero. Note that we only use log
  527. // levels 1 and 2 currently. It's unlikely we'll ever make it
  528. // past 9.
  529. if level > 0 && level < 10 {
  530. b = append(b, `"v":`...)
  531. b = append(b, '0'+byte(level))
  532. b = append(b, ',')
  533. }
  534. b = append(b, "\"text\": \""...)
  535. for _, c := range buf {
  536. switch c {
  537. case '\b':
  538. b = append(b, '\\', 'b')
  539. case '\f':
  540. b = append(b, '\\', 'f')
  541. case '\n':
  542. b = append(b, '\\', 'n')
  543. case '\r':
  544. b = append(b, '\\', 'r')
  545. case '\t':
  546. b = append(b, '\\', 't')
  547. case '"':
  548. b = append(b, '\\', '"')
  549. case '\\':
  550. b = append(b, '\\', '\\')
  551. default:
  552. // TODO: what about binary gibberish or non UTF-8?
  553. b = append(b, c)
  554. }
  555. }
  556. if nTruncated > 0 {
  557. b = append(b, "…+"...)
  558. b = strconv.AppendInt(b, int64(nTruncated), 10)
  559. }
  560. b = append(b, "\"}\n"...)
  561. return b
  562. }
  563. func (l *Logger) encodeLocked(buf []byte, level int) []byte {
  564. if l.includeProcSequence {
  565. l.procSequence++
  566. }
  567. if buf[0] != '{' {
  568. return l.encodeText(buf, l.skipClientTime, l.procID, l.procSequence, level) // text fast-path
  569. }
  570. now := l.timeNow()
  571. obj := make(map[string]any)
  572. if err := json.Unmarshal(buf, &obj); err != nil {
  573. for k := range obj {
  574. delete(obj, k)
  575. }
  576. obj["text"] = string(buf)
  577. }
  578. if txt, isStr := obj["text"].(string); l.lowMem && isStr && len(txt) > 254 {
  579. // TODO(crawshaw): trim to unicode code point
  580. obj["text"] = txt[:254] + "…"
  581. }
  582. hasLogtail := obj["logtail"] != nil
  583. if hasLogtail {
  584. obj["error_has_logtail"] = obj["logtail"]
  585. obj["logtail"] = nil
  586. }
  587. if !l.skipClientTime || l.procID != 0 || l.procSequence != 0 {
  588. logtail := map[string]any{}
  589. if !l.skipClientTime {
  590. logtail["client_time"] = now.UTC().Format(time.RFC3339Nano)
  591. }
  592. if l.procID != 0 {
  593. logtail["proc_id"] = l.procID
  594. }
  595. if l.procSequence != 0 {
  596. logtail["proc_seq"] = l.procSequence
  597. }
  598. obj["logtail"] = logtail
  599. }
  600. if level > 0 {
  601. obj["v"] = level
  602. }
  603. b, err := json.Marshal(obj)
  604. if err != nil {
  605. fmt.Fprintf(l.stderr, "logtail: re-encoding JSON failed: %v\n", err)
  606. // I know of no conditions under which this could fail.
  607. // Report it very loudly.
  608. panic("logtail: re-encoding JSON failed: " + err.Error())
  609. }
  610. b = append(b, '\n')
  611. return b
  612. }
  613. // Logf logs to l using the provided fmt-style format and optional arguments.
  614. func (l *Logger) Logf(format string, args ...any) {
  615. fmt.Fprintf(l, format, args...)
  616. }
  617. // Write logs an encoded JSON blob.
  618. //
  619. // If the []byte passed to Write is not an encoded JSON blob,
  620. // then contents is fit into a JSON blob and written.
  621. //
  622. // This is intended as an interface for the stdlib "log" package.
  623. func (l *Logger) Write(buf []byte) (int, error) {
  624. if len(buf) == 0 {
  625. return 0, nil
  626. }
  627. level, buf := parseAndRemoveLogLevel(buf)
  628. if l.stderr != nil && l.stderr != io.Discard && int64(level) <= atomic.LoadInt64(&l.stderrLevel) {
  629. if buf[len(buf)-1] == '\n' {
  630. l.stderr.Write(buf)
  631. } else {
  632. // The log package always line-terminates logs,
  633. // so this is an uncommon path.
  634. withNL := append(buf[:len(buf):len(buf)], '\n')
  635. l.stderr.Write(withNL)
  636. }
  637. }
  638. l.writeLock.Lock()
  639. defer l.writeLock.Unlock()
  640. b := l.encodeLocked(buf, level)
  641. _, err := l.sendLocked(b)
  642. return len(buf), err
  643. }
  644. var (
  645. openBracketV = []byte("[v")
  646. v1 = []byte("[v1] ")
  647. v2 = []byte("[v2] ")
  648. vJSON = []byte("[v\x00JSON]") // precedes log level '0'-'9' byte, then JSON value
  649. )
  650. // level 0 is normal (or unknown) level; 1+ are increasingly verbose
  651. func parseAndRemoveLogLevel(buf []byte) (level int, cleanBuf []byte) {
  652. if len(buf) == 0 || buf[0] == '{' || !bytes.Contains(buf, openBracketV) {
  653. return 0, buf
  654. }
  655. if bytes.Contains(buf, v1) {
  656. return 1, bytes.ReplaceAll(buf, v1, nil)
  657. }
  658. if bytes.Contains(buf, v2) {
  659. return 2, bytes.ReplaceAll(buf, v2, nil)
  660. }
  661. if i := bytes.Index(buf, vJSON); i != -1 {
  662. rest := buf[i+len(vJSON):]
  663. if len(rest) >= 2 {
  664. v := rest[0]
  665. if v >= '0' && v <= '9' {
  666. return int(v - '0'), rest[1:]
  667. }
  668. }
  669. }
  670. return 0, buf
  671. }