command_log.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package libbox
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "io"
  6. "net"
  7. )
  8. func (s *CommandServer) WriteMessage(message string) {
  9. s.subscriber.Emit(message)
  10. s.access.Lock()
  11. s.savedLines.PushBack(message)
  12. if s.savedLines.Len() > s.maxLines {
  13. s.savedLines.Remove(s.savedLines.Front())
  14. }
  15. s.access.Unlock()
  16. }
  17. func readLog(reader io.Reader) ([]byte, error) {
  18. var messageLength uint16
  19. err := binary.Read(reader, binary.BigEndian, &messageLength)
  20. if err != nil {
  21. return nil, err
  22. }
  23. if messageLength == 0 {
  24. return nil, nil
  25. }
  26. data := make([]byte, messageLength)
  27. _, err = io.ReadFull(reader, data)
  28. if err != nil {
  29. return nil, err
  30. }
  31. return data, nil
  32. }
  33. func writeLog(writer io.Writer, message []byte) error {
  34. err := binary.Write(writer, binary.BigEndian, uint8(0))
  35. if err != nil {
  36. return err
  37. }
  38. err = binary.Write(writer, binary.BigEndian, uint16(len(message)))
  39. if err != nil {
  40. return err
  41. }
  42. if len(message) > 0 {
  43. _, err = writer.Write(message)
  44. }
  45. return err
  46. }
  47. func writeClearLog(writer io.Writer) error {
  48. return binary.Write(writer, binary.BigEndian, uint8(1))
  49. }
  50. func (s *CommandServer) handleLogConn(conn net.Conn) error {
  51. var savedLines []string
  52. s.access.Lock()
  53. savedLines = make([]string, 0, s.savedLines.Len())
  54. for element := s.savedLines.Front(); element != nil; element = element.Next() {
  55. savedLines = append(savedLines, element.Value)
  56. }
  57. s.access.Unlock()
  58. subscription, done, err := s.observer.Subscribe()
  59. if err != nil {
  60. return err
  61. }
  62. defer s.observer.UnSubscribe(subscription)
  63. for _, line := range savedLines {
  64. err = writeLog(conn, []byte(line))
  65. if err != nil {
  66. return err
  67. }
  68. }
  69. ctx := connKeepAlive(conn)
  70. for {
  71. select {
  72. case <-ctx.Done():
  73. return ctx.Err()
  74. case message := <-subscription:
  75. err = writeLog(conn, []byte(message))
  76. if err != nil {
  77. return err
  78. }
  79. case <-s.logReset:
  80. err = writeClearLog(conn)
  81. if err != nil {
  82. return err
  83. }
  84. case <-done:
  85. return nil
  86. }
  87. }
  88. }
  89. func (c *CommandClient) handleLogConn(conn net.Conn) {
  90. for {
  91. var messageType uint8
  92. err := binary.Read(conn, binary.BigEndian, &messageType)
  93. if err != nil {
  94. c.handler.Disconnected(err.Error())
  95. return
  96. }
  97. var message []byte
  98. switch messageType {
  99. case 0:
  100. message, err = readLog(conn)
  101. if err != nil {
  102. c.handler.Disconnected(err.Error())
  103. return
  104. }
  105. c.handler.WriteLog(string(message))
  106. case 1:
  107. c.handler.ClearLog()
  108. }
  109. }
  110. }
  111. func connKeepAlive(reader io.Reader) context.Context {
  112. ctx, cancel := context.WithCancelCause(context.Background())
  113. go func() {
  114. for {
  115. _, err := readLog(reader)
  116. if err != nil {
  117. cancel(err)
  118. return
  119. }
  120. }
  121. }()
  122. return ctx
  123. }