command_log.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package libbox
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "io"
  6. "net"
  7. )
  8. func (s *CommandServer) WriteMessage(message string) {
  9. s.subscriber.Emit(message)
  10. s.access.Lock()
  11. s.savedLines.PushBack(message)
  12. if s.savedLines.Len() > 100 {
  13. s.savedLines.Remove(s.savedLines.Front())
  14. }
  15. s.access.Unlock()
  16. }
  // readLog reads a single length-prefixed record from reader: a 2-byte
  // big-endian length followed by exactly that many payload bytes.
  //
  // It returns the payload, or the error reported by the underlying read. When
  // the stream ends before any header byte is available the error is io.EOF; when
  // it ends part-way through the header or the payload it is io.ErrUnexpectedEOF.
  func readLog(reader io.Reader) ([]byte, error) {
  	var header [2]byte
  	if _, err := io.ReadFull(reader, header[:]); err != nil {
  		return nil, err
  	}

  	payload := make([]byte, binary.BigEndian.Uint16(header[:]))
  	if _, err := io.ReadFull(reader, payload); err != nil {
  		return nil, err
  	}
  	return payload, nil
  }
  // writeLog writes message to writer as one length-prefixed record: a 2-byte
  // big-endian length followed by the payload bytes.
  //
  // The length prefix can describe at most 65535 bytes. A longer message is
  // truncated to fit (backing off by up to three bytes so a multi-byte UTF-8
  // sequence is not cut in half). Without this, the length would wrap around
  // while the full payload was still written, and the reader would lose track of
  // record boundaries for everything that follows.
  //
  // It returns the first error reported by writer, if any.
  func writeLog(writer io.Writer, message []byte) error {
  	const maxLength = 1<<16 - 1 // largest value the uint16 prefix can hold

  	if len(message) > maxLength {
  		end := maxLength
  		// message[end] is the first byte that would be dropped. If it is a
  		// UTF-8 continuation byte (10xxxxxx) the cut would split a character,
  		// so move the cut back to the start of that character. The step count
  		// is bounded so that non-UTF-8 data cannot shrink the record further.
  		for i := 0; i < 3 && message[end]&0xC0 == 0x80; i++ {
  			end--
  		}
  		message = message[:end]
  	}

  	err := binary.Write(writer, binary.BigEndian, uint16(len(message)))
  	if err != nil {
  		return err
  	}
  	_, err = writer.Write(message)
  	return err
  }
  38. func (s *CommandServer) handleLogConn(conn net.Conn) error {
  39. var savedLines []string
  40. s.access.Lock()
  41. savedLines = make([]string, 0, s.savedLines.Len())
  42. for element := s.savedLines.Front(); element != nil; element = element.Next() {
  43. savedLines = append(savedLines, element.Value)
  44. }
  45. s.access.Unlock()
  46. subscription, done, err := s.observer.Subscribe()
  47. if err != nil {
  48. return err
  49. }
  50. defer s.observer.UnSubscribe(subscription)
  51. for _, line := range savedLines {
  52. err = writeLog(conn, []byte(line))
  53. if err != nil {
  54. return err
  55. }
  56. }
  57. ctx := connKeepAlive(conn)
  58. for {
  59. select {
  60. case <-ctx.Done():
  61. return ctx.Err()
  62. case message := <-subscription:
  63. err = writeLog(conn, []byte(message))
  64. if err != nil {
  65. return err
  66. }
  67. case <-done:
  68. return nil
  69. }
  70. }
  71. }
  72. func (c *CommandClient) handleLogConn(conn net.Conn) {
  73. for {
  74. message, err := readLog(conn)
  75. if err != nil {
  76. c.handler.Disconnected(err.Error())
  77. return
  78. }
  79. c.handler.WriteLog(string(message))
  80. }
  81. }
  82. func connKeepAlive(reader io.Reader) context.Context {
  83. ctx, cancel := context.WithCancelCause(context.Background())
  84. go func() {
  85. for {
  86. _, err := readLog(reader)
  87. if err != nil {
  88. cancel(err)
  89. return
  90. }
  91. }
  92. }()
  93. return ctx
  94. }