|
@@ -29,11 +29,6 @@ const (
|
|
|
upGraderWriteBufferSize = 5 * 1024
|
|
|
)
|
|
|
|
|
|
-var (
|
|
|
- newline = []byte{'\n'}
|
|
|
- space = []byte{' '}
|
|
|
-)
|
|
|
-
|
|
|
var upGrader = websocket.Upgrader{
|
|
|
ReadBufferSize: upGraderReadBufferSize,
|
|
|
WriteBufferSize: upGraderWriteBufferSize,
|
|
@@ -43,20 +38,22 @@ var upGrader = websocket.Upgrader{
|
|
|
}
|
|
|
|
|
|
type Client struct {
|
|
|
- hub *Hub
|
|
|
-
|
|
|
- conn *websocket.Conn // 与服务器连接实例
|
|
|
-
|
|
|
- sendLogLineIndex int // 日志发送到那个位置了
|
|
|
-
|
|
|
- send chan []byte
|
|
|
+ hub *Hub
|
|
|
+ conn *websocket.Conn // 与服务器连接实例
|
|
|
+ sendLogLineIndex int // 日志发送到那个位置了
|
|
|
+ authed bool // 是否已经通过认证
|
|
|
+ send chan []byte // 发送给 client 的内容 bytes
|
|
|
}
|
|
|
|
|
|
+// 接收 Client 发送来的消息
|
|
|
func (c *Client) readPump() {
|
|
|
+
|
|
|
defer func() {
|
|
|
+ // 触发移除 client 的逻辑
|
|
|
c.hub.unregister <- c
|
|
|
_ = c.conn.Close()
|
|
|
}()
|
|
|
+
|
|
|
c.conn.SetReadLimit(maxMessageSize)
|
|
|
err := c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
|
if err != nil {
|
|
@@ -66,6 +63,7 @@ func (c *Client) readPump() {
|
|
|
c.conn.SetPongHandler(func(string) error {
|
|
|
return c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
|
})
|
|
|
+ // 收取 client 发送过来的消息
|
|
|
for {
|
|
|
_, message, err := c.conn.ReadMessage()
|
|
|
if err != nil {
|
|
@@ -74,12 +72,15 @@ func (c *Client) readPump() {
|
|
|
}
|
|
|
break
|
|
|
}
|
|
|
- message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
|
|
|
+ message = bytes.TrimSpace(bytes.Replace(message, []byte{}, []byte{}, -1))
|
|
|
c.hub.broadcast <- message
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// 向 Client 发送消息的队列
|
|
|
func (c *Client) writePump() {
|
|
|
+
|
|
|
+ // 心跳计时器
|
|
|
ticker := time.NewTicker(pingPeriod)
|
|
|
defer func() {
|
|
|
ticker.Stop()
|
|
@@ -89,6 +90,8 @@ func (c *Client) writePump() {
|
|
|
for {
|
|
|
select {
|
|
|
case message, ok := <-c.send:
|
|
|
+ // 这里是需要发送给 client 的消息
|
|
|
+ // 当然首先还是得先把当前消息的发送超时,给确定下来
|
|
|
err := c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
|
if err != nil {
|
|
|
log_helper.GetLogger().Errorln("writePump.SetWriteDeadline", err)
|
|
@@ -105,6 +108,7 @@ func (c *Client) writePump() {
|
|
|
|
|
|
w, err := c.conn.NextWriter(websocket.TextMessage)
|
|
|
if err != nil {
|
|
|
+ log_helper.GetLogger().Errorln("writePump.NextWriter", err)
|
|
|
return
|
|
|
}
|
|
|
_, err = w.Write(message)
|
|
@@ -115,18 +119,25 @@ func (c *Client) writePump() {
|
|
|
// Add queued chat messages to the current websocket message.
|
|
|
n := len(c.send)
|
|
|
for i := 0; i < n; i++ {
|
|
|
- w.Write(newline)
|
|
|
- w.Write(<-c.send)
|
|
|
+
|
|
|
+ _, err = w.Write(<-c.send)
|
|
|
+ if err != nil {
|
|
|
+ log_helper.GetLogger().Errorln("writePump.Write", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if err := w.Close(); err != nil {
|
|
|
+ log_helper.GetLogger().Errorln("writePump.Close", err)
|
|
|
return
|
|
|
}
|
|
|
case <-ticker.C:
|
|
|
+ // 心跳相关,这里是定时器到了触发的间隔,设置发送下一条心跳的超时时间
|
|
|
if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
|
|
|
log_helper.GetLogger().Errorln("writePump.ticker.C.SetWriteDeadline", err)
|
|
|
return
|
|
|
}
|
|
|
+ // 然后发送心跳
|
|
|
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
|
|
log_helper.GetLogger().Errorln("writePump.ticker.C.WriteMessage", err)
|
|
|
return
|
|
@@ -135,17 +146,21 @@ func (c *Client) writePump() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// ServeWs 每个 Client 连接 ws 上线时触发
|
|
|
func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
|
|
|
+
|
|
|
conn, err := upGrader.Upgrade(w, r, nil)
|
|
|
if err != nil {
|
|
|
- log_helper.GetLogger().Errorln("ServeWs", err)
|
|
|
+ log_helper.GetLogger().Errorln("ServeWs.Upgrade", err)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
client := &Client{
|
|
|
- hub: hub,
|
|
|
- conn: conn,
|
|
|
- send: make(chan []byte, bufSize),
|
|
|
+ hub: hub,
|
|
|
+ conn: conn,
|
|
|
+ sendLogLineIndex: 0,
|
|
|
+ authed: false,
|
|
|
+ send: make(chan []byte, bufSize),
|
|
|
}
|
|
|
client.hub.register <- client
|
|
|
|