command_log.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package libbox
  2. import (
  3. "bufio"
  4. "context"
  5. "io"
  6. "net"
  7. "time"
  8. "github.com/sagernet/sing/common/binary"
  9. E "github.com/sagernet/sing/common/exceptions"
  10. "github.com/sagernet/sing/common/varbin"
  11. )
  12. func (s *CommandServer) ResetLog() {
  13. s.access.Lock()
  14. defer s.access.Unlock()
  15. s.savedLines.Init()
  16. select {
  17. case s.logReset <- struct{}{}:
  18. default:
  19. }
  20. }
  21. func (s *CommandServer) WriteMessage(message string) {
  22. s.subscriber.Emit(message)
  23. s.access.Lock()
  24. s.savedLines.PushBack(message)
  25. if s.savedLines.Len() > s.maxLines {
  26. s.savedLines.Remove(s.savedLines.Front())
  27. }
  28. s.access.Unlock()
  29. }
  30. func (s *CommandServer) handleLogConn(conn net.Conn) error {
  31. var (
  32. interval int64
  33. timer *time.Timer
  34. )
  35. err := binary.Read(conn, binary.BigEndian, &interval)
  36. if err != nil {
  37. return E.Cause(err, "read interval")
  38. }
  39. timer = time.NewTimer(time.Duration(interval))
  40. if !timer.Stop() {
  41. <-timer.C
  42. }
  43. var savedLines []string
  44. s.access.Lock()
  45. savedLines = make([]string, 0, s.savedLines.Len())
  46. for element := s.savedLines.Front(); element != nil; element = element.Next() {
  47. savedLines = append(savedLines, element.Value)
  48. }
  49. s.access.Unlock()
  50. subscription, done, err := s.observer.Subscribe()
  51. if err != nil {
  52. return err
  53. }
  54. defer s.observer.UnSubscribe(subscription)
  55. writer := bufio.NewWriter(conn)
  56. select {
  57. case <-s.logReset:
  58. err = writer.WriteByte(1)
  59. if err != nil {
  60. return err
  61. }
  62. err = writer.Flush()
  63. if err != nil {
  64. return err
  65. }
  66. default:
  67. }
  68. if len(savedLines) > 0 {
  69. err = writer.WriteByte(0)
  70. if err != nil {
  71. return err
  72. }
  73. err = varbin.Write(writer, binary.BigEndian, savedLines)
  74. if err != nil {
  75. return err
  76. }
  77. }
  78. ctx := connKeepAlive(conn)
  79. var logLines []string
  80. for {
  81. err = writer.Flush()
  82. if err != nil {
  83. return err
  84. }
  85. select {
  86. case <-ctx.Done():
  87. return ctx.Err()
  88. case <-s.logReset:
  89. err = writer.WriteByte(1)
  90. if err != nil {
  91. return err
  92. }
  93. case <-done:
  94. return nil
  95. case logLine := <-subscription:
  96. logLines = logLines[:0]
  97. logLines = append(logLines, logLine)
  98. timer.Reset(time.Duration(interval))
  99. loopLogs:
  100. for {
  101. select {
  102. case logLine = <-subscription:
  103. logLines = append(logLines, logLine)
  104. case <-timer.C:
  105. break loopLogs
  106. }
  107. }
  108. err = writer.WriteByte(0)
  109. if err != nil {
  110. return err
  111. }
  112. err = varbin.Write(writer, binary.BigEndian, logLines)
  113. if err != nil {
  114. return err
  115. }
  116. }
  117. }
  118. }
  119. func (c *CommandClient) handleLogConn(conn net.Conn) {
  120. reader := bufio.NewReader(conn)
  121. for {
  122. messageType, err := reader.ReadByte()
  123. if err != nil {
  124. c.handler.Disconnected(err.Error())
  125. return
  126. }
  127. var messages []string
  128. switch messageType {
  129. case 0:
  130. err = varbin.Read(reader, binary.BigEndian, &messages)
  131. if err != nil {
  132. c.handler.Disconnected(err.Error())
  133. return
  134. }
  135. c.handler.WriteLogs(newIterator(messages))
  136. case 1:
  137. c.handler.ClearLogs()
  138. }
  139. }
  140. }
  // connKeepAlive watches reader from a background goroutine and returns a
  // context that is cancelled as soon as a Read on it fails.
  //
  // Bytes that are read successfully are discarded; only the error matters. Once
  // cancelled, ctx.Err() reports context.Canceled and the Read error that caused
  // the cancellation is available through context.Cause(ctx).
  //
  // The goroutine ends only when Read returns an error, so the caller has to
  // close (or otherwise fail) reader eventually to release it.
  func connKeepAlive(reader io.Reader) context.Context {
  	ctx, cancel := context.WithCancelCause(context.Background())
  	go func() {
  		// One scratch byte reused for every read instead of a fresh
  		// allocation per iteration; its contents are never inspected.
  		var scratch [1]byte
  		for {
  			if _, err := reader.Read(scratch[:]); err != nil {
  				cancel(err)
  				return
  			}
  		}
  	}()
  	return ctx
  }