writer.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package logging
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/go-logfmt/logfmt"
  10. "github.com/opencode-ai/opencode/internal/pubsub"
  11. )
  12. const (
  13. persistKeyArg = "$_persist"
  14. PersistTimeArg = "$_persist_time"
  15. )
  16. const (
  17. // Maximum number of log messages to keep in memory
  18. maxLogMessages = 1000
  19. )
  20. type LogData struct {
  21. messages []LogMessage
  22. *pubsub.Broker[LogMessage]
  23. lock sync.Mutex
  24. }
  25. func (l *LogData) Add(msg LogMessage) {
  26. l.lock.Lock()
  27. defer l.lock.Unlock()
  28. // Add new message
  29. l.messages = append(l.messages, msg)
  30. // Trim if exceeding max capacity
  31. if len(l.messages) > maxLogMessages {
  32. l.messages = l.messages[len(l.messages)-maxLogMessages:]
  33. }
  34. l.Publish(pubsub.CreatedEvent, msg)
  35. }
  36. func (l *LogData) List() []LogMessage {
  37. l.lock.Lock()
  38. defer l.lock.Unlock()
  39. return l.messages
  40. }
  41. var defaultLogData = &LogData{
  42. messages: make([]LogMessage, 0, maxLogMessages),
  43. Broker: pubsub.NewBroker[LogMessage](),
  44. }
  45. type writer struct{}
  46. func (w *writer) Write(p []byte) (int, error) {
  47. d := logfmt.NewDecoder(bytes.NewReader(p))
  48. for d.ScanRecord() {
  49. msg := LogMessage{
  50. ID: fmt.Sprintf("%d", time.Now().UnixNano()),
  51. Time: time.Now(),
  52. }
  53. for d.ScanKeyval() {
  54. switch string(d.Key()) {
  55. case "time":
  56. parsed, err := time.Parse(time.RFC3339, string(d.Value()))
  57. if err != nil {
  58. return 0, fmt.Errorf("parsing time: %w", err)
  59. }
  60. msg.Time = parsed
  61. case "level":
  62. msg.Level = strings.ToLower(string(d.Value()))
  63. case "msg":
  64. msg.Message = string(d.Value())
  65. default:
  66. if string(d.Key()) == persistKeyArg {
  67. msg.Persist = true
  68. } else if string(d.Key()) == PersistTimeArg {
  69. parsed, err := time.ParseDuration(string(d.Value()))
  70. if err != nil {
  71. continue
  72. }
  73. msg.PersistTime = parsed
  74. } else {
  75. msg.Attributes = append(msg.Attributes, Attr{
  76. Key: string(d.Key()),
  77. Value: string(d.Value()),
  78. })
  79. }
  80. }
  81. }
  82. defaultLogData.Add(msg)
  83. }
  84. if d.Err() != nil {
  85. return 0, d.Err()
  86. }
  87. return len(p), nil
  88. }
  89. func NewWriter() *writer {
  90. w := &writer{}
  91. return w
  92. }
  93. func Subscribe(ctx context.Context) <-chan pubsub.Event[LogMessage] {
  94. return defaultLogData.Subscribe(ctx)
  95. }
  96. func List() []LogMessage {
  97. return defaultLogData.List()
  98. }