buffer.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package logtail
  4. import (
  5. "bytes"
  6. "errors"
  7. "fmt"
  8. "sync"
  9. )
  10. type Buffer interface {
  11. // TryReadLine tries to read a log line from the ring buffer.
  12. // If no line is available it returns a nil slice.
  13. // If the ring buffer is closed it returns io.EOF.
  14. //
  15. // The returned slice may point to data that will be overwritten
  16. // by a subsequent call to TryReadLine.
  17. TryReadLine() ([]byte, error)
  18. // Write writes a log line into the ring buffer.
  19. // Implementations must not retain the provided buffer.
  20. Write([]byte) (int, error)
  21. }
  22. func NewMemoryBuffer(numEntries int) Buffer {
  23. return &memBuffer{
  24. pending: make(chan qentry, numEntries),
  25. }
  26. }
  27. type memBuffer struct {
  28. next []byte
  29. pending chan qentry
  30. dropMu sync.Mutex
  31. dropCount int
  32. }
  33. func (m *memBuffer) TryReadLine() ([]byte, error) {
  34. if m.next != nil {
  35. msg := m.next
  36. m.next = nil
  37. return msg, nil
  38. }
  39. select {
  40. case ent := <-m.pending:
  41. if ent.dropCount > 0 {
  42. m.next = ent.msg
  43. return fmt.Appendf(nil, "----------- %d logs dropped ----------", ent.dropCount), nil
  44. }
  45. return ent.msg, nil
  46. default:
  47. return nil, nil
  48. }
  49. }
  50. func (m *memBuffer) Write(b []byte) (int, error) {
  51. m.dropMu.Lock()
  52. defer m.dropMu.Unlock()
  53. ent := qentry{
  54. msg: bytes.Clone(b),
  55. dropCount: m.dropCount,
  56. }
  57. select {
  58. case m.pending <- ent:
  59. m.dropCount = 0
  60. return len(b), nil
  61. default:
  62. m.dropCount++
  63. return 0, errBufferFull
  64. }
  65. }
  66. type qentry struct {
  67. msg []byte
  68. dropCount int
  69. }
  70. var errBufferFull = errors.New("logtail: buffer full")