client.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package ws_helper
  2. import (
  3. "bytes"
  4. "github.com/allanpk716/ChineseSubFinder/internal/pkg/log_helper"
  5. "github.com/gorilla/websocket"
  6. "net/http"
  7. "time"
  8. )
  // Timing limits applied to the websocket connection.
  const (
  	// writeWait bounds how long a single write to the peer may take.
  	writeWait = 10 * time.Second

  	// pongWait bounds how long to wait for the next pong from the peer.
  	pongWait = 60 * time.Second

  	// pingPeriod is the interval between pings sent to the peer. It is
  	// nine tenths of pongWait so that it always stays below pongWait.
  	pingPeriod = pongWait * 9 / 10
  )

  // Size limits applied to the websocket connection.
  const (
  	// maxMessageSize is the largest message, in bytes, accepted from the peer.
  	maxMessageSize = 512

  	// bufSize is the capacity of each client's outbound send channel.
  	bufSize = 256
  )
  // Single-byte separators: readPump replaces each newline in an inbound
  // message with a space, and writePump places a newline between queued
  // messages that are combined into one outbound websocket message.
  var (
  	newline = []byte("\n")
  	space   = []byte(" ")
  )
  25. var upgrader = websocket.Upgrader{
  26. ReadBufferSize: 5 * 1024,
  27. WriteBufferSize: 5 * 1024,
  28. CheckOrigin: func(r *http.Request) bool {
  29. return true
  30. },
  31. }
  32. type Client struct {
  33. hub *Hub
  34. conn *websocket.Conn
  35. send chan []byte
  36. }
  37. func (c *Client) readPump() {
  38. defer func() {
  39. c.hub.unregister <- c
  40. _ = c.conn.Close()
  41. }()
  42. c.conn.SetReadLimit(maxMessageSize)
  43. err := c.conn.SetReadDeadline(time.Now().Add(pongWait))
  44. if err != nil {
  45. log_helper.GetLogger().Errorln("readPump.SetReadDeadline", err)
  46. return
  47. }
  48. c.conn.SetPongHandler(func(string) error {
  49. return c.conn.SetReadDeadline(time.Now().Add(pongWait))
  50. })
  51. for {
  52. _, message, err := c.conn.ReadMessage()
  53. if err != nil {
  54. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  55. log_helper.GetLogger().Errorln("readPump.IsUnexpectedCloseError", err)
  56. }
  57. break
  58. }
  59. message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
  60. c.hub.broadcast <- message
  61. }
  62. }
  63. func (c *Client) writePump() {
  64. ticker := time.NewTicker(pingPeriod)
  65. defer func() {
  66. ticker.Stop()
  67. _ = c.conn.Close()
  68. }()
  69. for {
  70. select {
  71. case message, ok := <-c.send:
  72. err := c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  73. if err != nil {
  74. log_helper.GetLogger().Errorln("writePump.SetWriteDeadline", err)
  75. return
  76. }
  77. if ok == false {
  78. // The hub closed the channel.
  79. err = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
  80. if err != nil {
  81. log_helper.GetLogger().Errorln("writePump close hub WriteMessage", err)
  82. }
  83. return
  84. }
  85. w, err := c.conn.NextWriter(websocket.TextMessage)
  86. if err != nil {
  87. return
  88. }
  89. _, err = w.Write(message)
  90. if err != nil {
  91. log_helper.GetLogger().Errorln("writePump.Write", err)
  92. return
  93. }
  94. // Add queued chat messages to the current websocket message.
  95. n := len(c.send)
  96. for i := 0; i < n; i++ {
  97. w.Write(newline)
  98. w.Write(<-c.send)
  99. }
  100. if err := w.Close(); err != nil {
  101. return
  102. }
  103. case <-ticker.C:
  104. if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
  105. log_helper.GetLogger().Errorln("writePump.ticker.C.SetWriteDeadline", err)
  106. return
  107. }
  108. if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  109. log_helper.GetLogger().Errorln("writePump.ticker.C.WriteMessage", err)
  110. return
  111. }
  112. }
  113. }
  114. }
  115. func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
  116. conn, err := upgrader.Upgrade(w, r, nil)
  117. if err != nil {
  118. log_helper.GetLogger().Errorln("ServeWs", err)
  119. return
  120. }
  121. client := &Client{
  122. hub: hub,
  123. conn: conn,
  124. send: make(chan []byte, bufSize),
  125. }
  126. client.hub.register <- client
  127. go client.writePump()
  128. go client.readPump()
  129. }