buffer.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. // Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package logtail
  5. import (
  6. "bytes"
  7. "errors"
  8. "fmt"
  9. "sync"
  10. )
  11. type Buffer interface {
  12. // TryReadLine tries to read a log line from the ring buffer.
  13. // If no line is available it returns a nil slice.
  14. // If the ring buffer is closed it returns io.EOF.
  15. //
  16. // The returned slice may point to data that will be overwritten
  17. // by a subsequent call to TryReadLine.
  18. TryReadLine() ([]byte, error)
  19. // Write writes a log line into the ring buffer.
  20. Write([]byte) (int, error)
  21. }
  22. func NewMemoryBuffer(numEntries int) Buffer {
  23. return &memBuffer{
  24. pending: make(chan qentry, numEntries),
  25. }
  26. }
  27. type memBuffer struct {
  28. next []byte
  29. pending chan qentry
  30. dropMu sync.Mutex
  31. dropCount int
  32. }
  33. func (m *memBuffer) TryReadLine() ([]byte, error) {
  34. if m.next != nil {
  35. msg := m.next
  36. m.next = nil
  37. return msg, nil
  38. }
  39. select {
  40. case ent := <-m.pending:
  41. if ent.dropCount > 0 {
  42. m.next = ent.msg
  43. buf := new(bytes.Buffer)
  44. fmt.Fprintf(buf, "----------- %d logs dropped ----------", ent.dropCount)
  45. return buf.Bytes(), nil
  46. }
  47. return ent.msg, nil
  48. default:
  49. return nil, nil
  50. }
  51. }
  52. func (m *memBuffer) Write(b []byte) (int, error) {
  53. m.dropMu.Lock()
  54. defer m.dropMu.Unlock()
  55. ent := qentry{
  56. msg: b,
  57. dropCount: m.dropCount,
  58. }
  59. select {
  60. case m.pending <- ent:
  61. m.dropCount = 0
  62. return len(b), nil
  63. default:
  64. m.dropCount++
  65. return 0, errBufferFull
  66. }
  67. }
  68. type qentry struct {
  69. msg []byte
  70. dropCount int
  71. }
  72. var errBufferFull = errors.New("logtail: buffer full")