command_log.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package libbox
  2. import (
  3. "bufio"
  4. "context"
  5. "io"
  6. "net"
  7. "time"
  8. "github.com/sagernet/sing/common/binary"
  9. E "github.com/sagernet/sing/common/exceptions"
  10. )
  11. func (s *CommandServer) WriteMessage(message string) {
  12. s.subscriber.Emit(message)
  13. s.access.Lock()
  14. s.savedLines.PushBack(message)
  15. if s.savedLines.Len() > s.maxLines {
  16. s.savedLines.Remove(s.savedLines.Front())
  17. }
  18. s.access.Unlock()
  19. }
  20. func writeLog(writer *bufio.Writer, messages []string) error {
  21. err := binary.Write(writer, binary.BigEndian, uint8(0))
  22. if err != nil {
  23. return err
  24. }
  25. err = binary.WriteData(writer, binary.BigEndian, messages)
  26. if err != nil {
  27. return err
  28. }
  29. return writer.Flush()
  30. }
  31. func writeClearLog(writer *bufio.Writer) error {
  32. err := binary.Write(writer, binary.BigEndian, uint8(1))
  33. if err != nil {
  34. return err
  35. }
  36. return writer.Flush()
  37. }
  38. func (s *CommandServer) handleLogConn(conn net.Conn) error {
  39. var (
  40. interval int64
  41. timer *time.Timer
  42. )
  43. err := binary.Read(conn, binary.BigEndian, &interval)
  44. if err != nil {
  45. return E.Cause(err, "read interval")
  46. }
  47. timer = time.NewTimer(time.Duration(interval))
  48. if !timer.Stop() {
  49. <-timer.C
  50. }
  51. var savedLines []string
  52. s.access.Lock()
  53. savedLines = make([]string, 0, s.savedLines.Len())
  54. for element := s.savedLines.Front(); element != nil; element = element.Next() {
  55. savedLines = append(savedLines, element.Value)
  56. }
  57. s.access.Unlock()
  58. subscription, done, err := s.observer.Subscribe()
  59. if err != nil {
  60. return err
  61. }
  62. defer s.observer.UnSubscribe(subscription)
  63. writer := bufio.NewWriter(conn)
  64. if len(savedLines) > 0 {
  65. err = writeLog(writer, savedLines)
  66. if err != nil {
  67. return err
  68. }
  69. }
  70. ctx := connKeepAlive(conn)
  71. var logLines []string
  72. for {
  73. select {
  74. case <-ctx.Done():
  75. return ctx.Err()
  76. case <-s.logReset:
  77. err = writeClearLog(writer)
  78. if err != nil {
  79. return err
  80. }
  81. case <-done:
  82. return nil
  83. case logLine := <-subscription:
  84. logLines = logLines[:0]
  85. logLines = append(logLines, logLine)
  86. timer.Reset(time.Duration(interval))
  87. loopLogs:
  88. for {
  89. select {
  90. case logLine = <-subscription:
  91. logLines = append(logLines, logLine)
  92. case <-timer.C:
  93. break loopLogs
  94. }
  95. }
  96. err = writeLog(writer, logLines)
  97. if err != nil {
  98. return err
  99. }
  100. }
  101. }
  102. }
  103. func (c *CommandClient) handleLogConn(conn net.Conn) {
  104. reader := bufio.NewReader(conn)
  105. for {
  106. var messageType uint8
  107. err := binary.Read(reader, binary.BigEndian, &messageType)
  108. if err != nil {
  109. c.handler.Disconnected(err.Error())
  110. return
  111. }
  112. var messages []string
  113. switch messageType {
  114. case 0:
  115. err = binary.ReadData(reader, binary.BigEndian, &messages)
  116. if err != nil {
  117. c.handler.Disconnected(err.Error())
  118. return
  119. }
  120. c.handler.WriteLogs(newIterator(messages))
  121. case 1:
  122. c.handler.ClearLogs()
  123. }
  124. }
  125. }
  126. func connKeepAlive(reader io.Reader) context.Context {
  127. ctx, cancel := context.WithCancelCause(context.Background())
  128. go func() {
  129. for {
  130. _, err := reader.Read(make([]byte, 1))
  131. if err != nil {
  132. cancel(err)
  133. return
  134. }
  135. }
  136. }()
  137. return ctx
  138. }