1
0
Эх сурвалжийг харах

feat: channel WebSocket client is ready

JustSong 2 жил өмнө
parent
commit
7a30ce60b0
4 өөрчлөгдсөн 163 нэмэгдсэн , 39 устгасан
  1. 1 1
      README.md
  2. 119 37
      channel/client.go
  3. 7 1
      controller/websocket.go
  4. 36 0
      docs/API.md

+ 1 - 1
README.md

@@ -50,7 +50,7 @@ _✨ 搭建专属于你的消息推送服务,支持多种消息推送方式,
    + 飞书群机器人,
    + 钉钉群机器人,
    + Bark App,
-   + [桌面客户端](https://github.com/songquanpeng/personal-assistant)(WIP)
+   + WebSocket 客户端([官方客户端](https://github.com/songquanpeng/personal-assistant),[接入文档](./docs/API.md#WebSocket%20客户端)),
 2. 多种用户登录注册方式:
    + 邮箱登录注册以及通过邮箱进行密码重置。
    + [GitHub 开放授权](https://github.com/settings/applications/new)。

+ 119 - 37
channel/client.go

@@ -6,62 +6,147 @@ import (
 	"message-pusher/common"
 	"message-pusher/model"
 	"sync"
+	"time"
 )
 
-var clientConnMap map[int]*websocket.Conn
-var clientConnMapMutex sync.Mutex
+const (
+	writeWait      = 10 * time.Second
+	pongWait       = 60 * time.Second
+	pingPeriod     = (pongWait * 9) / 10
+	maxMessageSize = 512
+)
 
-func init() {
-	clientConnMapMutex.Lock()
-	clientConnMap = make(map[int]*websocket.Conn)
-	clientConnMapMutex.Unlock()
+type webSocketClient struct {
+	userId    int
+	conn      *websocket.Conn
+	message   chan *Message
+	pong      chan bool
+	stop      chan bool
+	timestamp int64
+}
+
+func (c *webSocketClient) handleDataReading() {
+	c.conn.SetReadLimit(maxMessageSize)
+	_ = c.conn.SetReadDeadline(time.Now().Add(pongWait))
+	c.conn.SetPongHandler(func(string) error {
+		return c.conn.SetReadDeadline(time.Now().Add(pongWait))
+	})
+	for {
+		messageType, _, err := c.conn.ReadMessage()
+		if err != nil {
+			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNoStatusReceived, websocket.CloseAbnormalClosure) {
+				common.SysError("error read WebSocket client: " + err.Error())
+			}
+			c.close()
+			break
+		}
+		switch messageType {
+		case websocket.PingMessage:
+			c.pong <- true
+		case websocket.CloseMessage:
+			c.close()
+			break
+		}
+	}
+}
+
+func (c *webSocketClient) handleDataWriting() {
+	pingTicker := time.NewTicker(pingPeriod)
+	defer func() {
+		pingTicker.Stop()
+		clientConnMapMutex.Lock()
+		client, ok := clientMap[c.userId]
+		// otherwise we may delete the new added client!
+		if ok && client.timestamp == c.timestamp {
+			delete(clientMap, c.userId)
+		}
+		clientConnMapMutex.Unlock()
+		err := c.conn.Close()
+		if err != nil {
+			common.SysError("error close WebSocket client: " + err.Error())
+		}
+	}()
+	for {
+		select {
+		case message := <-c.message:
+			_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
+			err := c.conn.WriteJSON(message)
+			if err != nil {
+				common.SysError("error write data to WebSocket client: " + err.Error())
+				return
+			}
+		case <-c.pong:
+			err := c.conn.WriteMessage(websocket.PongMessage, nil)
+			if err != nil {
+				common.SysError("error send pong to WebSocket client: " + err.Error())
+				return
+			}
+		case <-pingTicker.C:
+			_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
+			err := c.conn.WriteMessage(websocket.PingMessage, nil)
+			if err != nil {
+				common.SysError("error write data to WebSocket client: " + err.Error())
+				return
+			}
+		case <-c.stop:
+			err := c.conn.WriteMessage(websocket.CloseMessage, nil)
+			if err != nil {
+				common.SysError("error write data to WebSocket client: " + err.Error())
+			}
+			return
+		}
+	}
+}
+
+func (c *webSocketClient) sendMessage(message *Message) {
+	c.message <- message
 }
 
-func SendMessageWithConn(message *Message, conn *websocket.Conn) error {
-	return conn.WriteJSON(message)
+func (c *webSocketClient) close() {
+	// should only be called once
+	c.stop <- true
+	// the defer function in handleDataWriting will do the cleanup
 }
 
-func LogoutClient(userId int) {
+var clientMap map[int]*webSocketClient
+var clientConnMapMutex sync.Mutex
+
+func init() {
 	clientConnMapMutex.Lock()
-	delete(clientConnMap, userId)
+	clientMap = make(map[int]*webSocketClient)
 	clientConnMapMutex.Unlock()
 }
 
 func RegisterClient(userId int, conn *websocket.Conn) {
 	clientConnMapMutex.Lock()
-	oldConn, existed := clientConnMap[userId]
+	oldClient, existed := clientMap[userId]
 	clientConnMapMutex.Unlock()
 	if existed {
 		byeMessage := &Message{
 			Title:       common.SystemName,
 			Description: "其他客户端已连接服务器,本客户端已被挤下线!",
 		}
-		err := SendMessageWithConn(byeMessage, oldConn)
-		if err != nil {
-			common.SysError("error send message to client: " + err.Error())
-		}
-		err = oldConn.Close()
-		if err != nil {
-			common.SysError("error close WebSocket connection: " + err.Error())
-		}
+		oldClient.sendMessage(byeMessage)
+		oldClient.close()
 	}
 	helloMessage := &Message{
 		Title:       common.SystemName,
 		Description: "客户端连接成功!",
 	}
-	err := SendMessageWithConn(helloMessage, conn)
-	if err != nil {
-		common.SysError("error send message to client: " + err.Error())
-		return
-	} else {
-		clientConnMapMutex.Lock()
-		clientConnMap[userId] = conn
-		clientConnMapMutex.Unlock()
-		conn.SetCloseHandler(func(code int, text string) error {
-			LogoutClient(userId)
-			return nil
-		})
+	newClient := &webSocketClient{
+		userId:    userId,
+		conn:      conn,
+		message:   make(chan *Message),
+		pong:      make(chan bool),
+		stop:      make(chan bool),
+		timestamp: time.Now().UnixMilli(),
 	}
+	go newClient.handleDataWriting()
+	go newClient.handleDataReading()
+	defer newClient.sendMessage(helloMessage)
+	clientConnMapMutex.Lock()
+	clientMap[userId] = newClient
+	clientConnMapMutex.Unlock()
 }
 
 func SendClientMessage(message *Message, user *model.User) error {
@@ -69,14 +154,11 @@ func SendClientMessage(message *Message, user *model.User) error {
 		return errors.New("未配置 WebSocket 客户端消息推送方式")
 	}
 	clientConnMapMutex.Lock()
-	conn, existed := clientConnMap[user.Id]
+	client, existed := clientMap[user.Id]
 	clientConnMapMutex.Unlock()
 	if !existed {
 		return errors.New("客户端未连接")
 	}
-	err := SendMessageWithConn(message, conn)
-	if err != nil {
-		LogoutClient(user.Id)
-	}
-	return err
+	client.sendMessage(message)
+	return nil
 }

+ 7 - 1
controller/websocket.go

@@ -8,7 +8,13 @@ import (
 	"net/http"
 )
 
-var upgrader = websocket.Upgrader{} // use default options
+var upgrader = websocket.Upgrader{
+	ReadBufferSize:  1024,
+	WriteBufferSize: 1024,
+	CheckOrigin: func(r *http.Request) bool {
+		return true
+	},
+}
 
 func RegisterClient(c *gin.Context) {
 	secret := c.Query("secret")

+ 36 - 0
docs/API.md

@@ -0,0 +1,36 @@
+# API 文档
+
+## WebSocket 客户端
+你可以使用 WebSocket 客户端连接服务器,具体的客户端的类型可以是桌面应用,手机应用或 Web 应用等,只要遵循下述协议即可。
+
+目前同一时间一个用户只能有一个客户端连接到服务器,之前已连接的客户端将被断开连接。
+
+### 连接协议
+1. API 端点为:`ws://<domain>:<port>/api/register_client/<username>?secret=<secret>`
+2. 如果启用了 HTTPS,则需要将 `ws` 替换为 `wss`。
+3. 上述 `secret` 为用户在后台设置的 `服务器连接密钥`,而非 `推送 token`。
+
+### 接收消息
+1. 消息编码格式为 JSON。
+2. 具体内容:
+   ```json
+   {
+    "title": "标题",
+    "description": "描述",
+    "content": "内容",
+    "html_content": "转换为 HTML 后的内容",
+    "url": "链接"
+   }
+   ```
+   可能还有多余字段,忽略即可。    
+
+### 连接保活
+1. 每 `56s` 服务器将发送 `ping` 报文,客户端需要在 `60s` 回复 `pong` 报文,否则服务端将不再维护该连接。
+2. 服务端会主动回复客户端发来的 `ping` 报文。
+
+### 实现列表
+当前可用的 WebSocket 客户端实现有:
+1. 官方 WebSocket 桌面客户端实现:https://github.com/songquanpeng/personal-assistant
+2. 待补充
+
+欢迎在此提交你的客户端实现。