buffer.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. // Copyright (c) Tailscale Inc & contributors
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. //go:build !ts_omit_logtail
  4. package logtail
  5. import (
  6. "bytes"
  7. "errors"
  8. "expvar"
  9. "fmt"
  10. "tailscale.com/metrics"
  11. "tailscale.com/syncs"
  12. )
  13. type Buffer interface {
  14. // TryReadLine tries to read a log line from the ring buffer.
  15. // If no line is available it returns a nil slice.
  16. // If the ring buffer is closed it returns io.EOF.
  17. //
  18. // The returned slice may point to data that will be overwritten
  19. // by a subsequent call to TryReadLine.
  20. TryReadLine() ([]byte, error)
  21. // Write writes a log line into the ring buffer.
  22. // Implementations must not retain the provided buffer.
  23. Write([]byte) (int, error)
  24. }
  25. func NewMemoryBuffer(numEntries int) Buffer {
  26. return &memBuffer{
  27. pending: make(chan qentry, numEntries),
  28. }
  29. }
  30. type memBuffer struct {
  31. next []byte
  32. pending chan qentry
  33. dropMu syncs.Mutex
  34. dropCount int
  35. // Metrics (see [memBuffer.ExpVar] for details).
  36. writeCalls expvar.Int
  37. readCalls expvar.Int
  38. writeBytes expvar.Int
  39. readBytes expvar.Int
  40. droppedBytes expvar.Int
  41. storedBytes expvar.Int
  42. }
  43. // ExpVar returns a [metrics.Set] with metrics about the buffer.
  44. //
  45. // - counter_write_calls: Total number of write calls.
  46. // - counter_read_calls: Total number of read calls.
  47. // - counter_write_bytes: Total number of bytes written.
  48. // - counter_read_bytes: Total number of bytes read.
  49. // - counter_dropped_bytes: Total number of bytes dropped.
  50. // - gauge_stored_bytes: Current number of bytes stored in memory.
  51. func (b *memBuffer) ExpVar() expvar.Var {
  52. m := new(metrics.Set)
  53. m.Set("counter_write_calls", &b.writeCalls)
  54. m.Set("counter_read_calls", &b.readCalls)
  55. m.Set("counter_write_bytes", &b.writeBytes)
  56. m.Set("counter_read_bytes", &b.readBytes)
  57. m.Set("counter_dropped_bytes", &b.droppedBytes)
  58. m.Set("gauge_stored_bytes", &b.storedBytes)
  59. return m
  60. }
  61. func (m *memBuffer) TryReadLine() ([]byte, error) {
  62. m.readCalls.Add(1)
  63. if m.next != nil {
  64. msg := m.next
  65. m.next = nil
  66. m.readBytes.Add(int64(len(msg)))
  67. m.storedBytes.Add(-int64(len(msg)))
  68. return msg, nil
  69. }
  70. select {
  71. case ent := <-m.pending:
  72. if ent.dropCount > 0 {
  73. m.next = ent.msg
  74. b := fmt.Appendf(nil, "----------- %d logs dropped ----------", ent.dropCount)
  75. m.writeBytes.Add(int64(len(b))) // indicate pseudo-injected log message
  76. m.readBytes.Add(int64(len(b)))
  77. return b, nil
  78. }
  79. m.readBytes.Add(int64(len(ent.msg)))
  80. m.storedBytes.Add(-int64(len(ent.msg)))
  81. return ent.msg, nil
  82. default:
  83. return nil, nil
  84. }
  85. }
  86. func (m *memBuffer) Write(b []byte) (int, error) {
  87. m.writeCalls.Add(1)
  88. m.dropMu.Lock()
  89. defer m.dropMu.Unlock()
  90. ent := qentry{
  91. msg: bytes.Clone(b),
  92. dropCount: m.dropCount,
  93. }
  94. select {
  95. case m.pending <- ent:
  96. m.writeBytes.Add(int64(len(b)))
  97. m.storedBytes.Add(+int64(len(b)))
  98. m.dropCount = 0
  99. return len(b), nil
  100. default:
  101. m.dropCount++
  102. m.droppedBytes.Add(int64(len(b)))
  103. return 0, errBufferFull
  104. }
  105. }
  106. type qentry struct {
  107. msg []byte
  108. dropCount int
  109. }
  110. var errBufferFull = errors.New("logtail: buffer full")