1
0
Эх сурвалжийг харах

feat: able to get message sending status now (#58)

JustSong 2 жил өмнө
parent
commit
4530104264

+ 9 - 7
README.md

@@ -58,16 +58,17 @@ _✨ 搭建专属于你的消息推送服务,支持多种消息推送方式,
    + Telegram 机器人,
    + Discord 群机器人,
    + 群组消息,可以将多个推送通道组合成一个群组,然后向群组发送消息,可以实现一次性推送到多个渠道的功能。
-2. 多种用户登录注册方式:
+2. 支持在 Web 端管理发送的消息,支持自动刷新。
+3. 支持异步消息。
+4. 多种用户登录注册方式:
    + 邮箱登录注册以及通过邮箱进行密码重置。
    + [GitHub 开放授权](https://github.com/settings/applications/new)。
    + 微信公众号授权(需要额外部署 [WeChat Server](https://github.com/songquanpeng/wechat-server))。
-3. 支持 Markdown。
-4. 支持用户管理。
-5. Cloudflare Turnstile 用户校验。
-6. 支持在线发布公告,设置关于界面以及页脚。
-7. 支持在 Web 端管理发送的消息,支持自动刷新。
-8. API 兼容其他消息推送服务,例如 [Server 酱](https://sct.ftqq.com/)。
+5. 支持 Markdown。
+6. 支持用户管理。
+7. Cloudflare Turnstile 用户校验。
+8. 支持在线发布公告,设置关于界面以及页脚。
+9. API 兼容其他消息推送服务,例如 [Server 酱](https://sct.ftqq.com/)。
 
 ## 用途
 1. [整合进自己的博客系统,每当有人登录时发微信消息提醒](https://github.com/songquanpeng/blog/blob/486d63e96ef7906a6c767653a20ec2d3278e9a4a/routes/user.js#L27)。
@@ -180,6 +181,7 @@ proxy_send_timeout 300s;
    7. `to`:选填,推送给指定用户,如果不填则默认推送给自己,受限于具体的消息推送方式,有些推送方式不支持此项。
       1. `@all`:推送给所有用户。
       2. `user1|user2|user3`:推送给多个用户,用户之间使用 `|` 分隔。
+   8. `async`:选填,如果设置为 `true` 则消息推送将在后台异步进行,返回结果包含 `uuid` 字段,可用于后续[获取消息发送状态](./docs/API.md#通过消息 UUID 获取消息发送状态)。
 3. `POST` 请求方式:字段与上面 `GET` 请求方式保持一致。
    + 注意:请求体编码格式为 `application/json`,`v0.3.2` 版本起支持 Post Form。
 

+ 63 - 0
channel/message-queue.go

@@ -0,0 +1,63 @@
+package channel
+
+import (
+	"message-pusher/common"
+	"message-pusher/model"
+)
+
+var AsyncMessageQueue chan int
+var AsyncMessageQueueSize = 128
+var AsyncMessageSenderNum = 2
+
+func init() {
+	AsyncMessageQueue = make(chan int, AsyncMessageQueueSize)
+	for i := 0; i < AsyncMessageSenderNum; i++ {
+		go asyncMessageSender()
+	}
+}
+
+// LoadAsyncMessages loads async pending messages from database.
+// We have to wait the database connection is ready.
+func LoadAsyncMessages() {
+	ids, err := model.GetAsyncPendingMessageIds()
+	if err != nil {
+		common.FatalLog("failed to load async pending messages: " + err.Error())
+	}
+	for _, id := range ids {
+		AsyncMessageQueue <- id
+	}
+}
+
+func asyncMessageSenderHelper(message *model.Message) error {
+	user, err := model.GetUserById(message.UserId, false)
+	if err != nil {
+		return err
+	}
+	channel_, err := model.GetChannelByName(message.Channel, user.Id)
+	if err != nil {
+		return err
+	}
+	return SendMessage(message, user, channel_)
+}
+
+func asyncMessageSender() {
+	for {
+		id := <-AsyncMessageQueue
+		message, err := model.GetMessageById(id)
+		if err != nil {
+			common.SysError("async message sender error: " + err.Error())
+			continue
+		}
+		err = asyncMessageSenderHelper(message)
+		status := common.MessageSendStatusFailed
+		if err != nil {
+			common.SysError("async message sender error: " + err.Error())
+		} else {
+			status = common.MessageSendStatusSent
+		}
+		err = message.UpdateStatus(status)
+		if err != nil {
+			common.SysError("async message sender error: " + err.Error())
+		}
+	}
+}

+ 5 - 4
common/constants.go

@@ -101,10 +101,11 @@ const (
 )
 
 const (
-	MessageSendStatusUnknown = 0
-	MessageSendStatusPending = 1
-	MessageSendStatusSent    = 2
-	MessageSendStatusFailed  = 3
+	MessageSendStatusUnknown      = 0
+	MessageSendStatusPending      = 1
+	MessageSendStatusSent         = 2
+	MessageSendStatusFailed       = 3
+	MessageSendStatusAsyncPending = 4
 )
 
 const (

+ 41 - 8
controller/message.go

@@ -38,6 +38,7 @@ func GetPushMessage(c *gin.Context) {
 		Desp:        c.Query("desp"),
 		Short:       c.Query("short"),
 		OpenId:      c.Query("openid"),
+		Async:       c.Query("async") == "true",
 	}
 	keepCompatible(&message)
 	pushMessageHelper(c, &message)
@@ -55,6 +56,7 @@ func PostPushMessage(c *gin.Context) {
 		Desp:        c.PostForm("desp"),
 		Short:       c.PostForm("short"),
 		OpenId:      c.PostForm("openid"),
+		Async:       c.PostForm("async") == "true",
 	}
 	if message == (model.Message{}) {
 		// Looks like the user is using JSON
@@ -142,6 +144,7 @@ func pushMessageHelper(c *gin.Context, message *model.Message) {
 	c.JSON(http.StatusOK, gin.H{
 		"success": true,
 		"message": "",
+		"uuid":    message.Link,
 	})
 }
 
@@ -149,6 +152,7 @@ func saveAndSendMessage(user *model.User, message *model.Message, channel_ *mode
 	if channel_.Status != common.ChannelStatusEnabled {
 		return errors.New("该渠道已被禁用")
 	}
+	common.MessageCount += 1 // We don't need to use atomic here because it's not a critical value
 	message.Link = common.GetUUID()
 	if message.URL == "" {
 		message.URL = fmt.Sprintf("%s/message/%s", common.ServerAddress, message.Link)
@@ -158,25 +162,36 @@ func saveAndSendMessage(user *model.User, message *model.Message, channel_ *mode
 		defer func() {
 			// Update the status of the message
 			status := common.MessageSendStatusFailed
-			if success {
-				status = common.MessageSendStatusSent
+			if message.Async {
+				status = common.MessageSendStatusAsyncPending
+			} else {
+				if success {
+					status = common.MessageSendStatusSent
+				}
 			}
 			err := message.UpdateStatus(status)
 			if err != nil {
 				common.SysError("failed to update the status of the message: " + err.Error())
 			}
+			if message.Async {
+				channel.AsyncMessageQueue <- message.Id
+			}
 		}()
 		err := message.UpdateAndInsert(user.Id)
 		if err != nil {
 			return err
 		}
 	} else {
+		if message.Async {
+			return errors.New("异步发送消息需要用户具备消息持久化的权限")
+		}
 		message.Link = "unsaved" // This is for user to identify whether the message is saved
 	}
-	err := channel.SendMessage(message, user, channel_)
-	common.MessageCount += 1 // We don't need to use atomic here because it's not a critical value
-	if err != nil {
-		return err
+	if !message.Async {
+		err := channel.SendMessage(message, user, channel_)
+		if err != nil {
+			return err
+		}
 	}
 	success = true
 	return nil // After this line, the message status will be updated
@@ -258,7 +273,7 @@ func GetUserMessages(c *gin.Context) {
 func GetMessage(c *gin.Context) {
 	messageId, _ := strconv.Atoi(c.Param("id"))
 	userId := c.GetInt("id")
-	message, err := model.GetMessageById(messageId, userId)
+	message, err := model.GetMessageByIds(messageId, userId)
 	if err != nil {
 		c.JSON(http.StatusOK, gin.H{
 			"success": false,
@@ -274,6 +289,24 @@ func GetMessage(c *gin.Context) {
 	return
 }
 
+func GetMessageStatus(c *gin.Context) {
+	link := c.Param("link")
+	status, err := model.GetMessageStatusByLink(link)
+	if err != nil {
+		c.JSON(http.StatusOK, gin.H{
+			"success": false,
+			"message": err.Error(),
+		})
+		return
+	}
+	c.JSON(http.StatusOK, gin.H{
+		"success": true,
+		"message": "",
+		"status":  status,
+	})
+	return
+}
+
 func SearchMessages(c *gin.Context) {
 	keyword := c.Query("keyword")
 	messages, err := model.SearchMessages(keyword)
@@ -296,7 +329,7 @@ func ResendMessage(c *gin.Context) {
 	messageId, _ := strconv.Atoi(c.Param("id"))
 	userId := c.GetInt("id")
 	helper := func() error {
-		message, err := model.GetMessageById(messageId, userId)
+		message, err := model.GetMessageByIds(messageId, userId)
 		message.Id = 0
 		if err != nil {
 			return err

+ 26 - 1
docs/API.md

@@ -33,4 +33,29 @@
 1. 官方 WebSocket 桌面客户端实现:https://github.com/songquanpeng/personal-assistant
 2. 待补充
 
-欢迎在此提交你的客户端实现。
+欢迎在此提交你的客户端实现。
+
+
+## 通过消息 UUID 获取消息发送状态
+1. API 端点为:`https://<domain>:<port>/api/message/status/<uuid>`
+2. 由于使用的是消息的 UUID 而非 ID,因此此处不需要鉴权,
+3. 返回内容示例:
+   ```json
+   {
+    "success": true,
+    "message": "",
+    "status": 2
+   }
+   ```
+4. 返回内容字段:
+   1. `success`:本次请求是否成功
+   2. `message`:错误信息
+   3. `status`:消息状态码。
+5. 消息状态码定义如下:
+   ```
+   MessageSendStatusUnknown      = 0
+   MessageSendStatusPending      = 1
+   MessageSendStatusSent         = 2
+   MessageSendStatusFailed       = 3
+   MessageSendStatusAsyncPending = 4
+   ```

+ 1 - 0
main.go

@@ -33,6 +33,7 @@ func main() {
 	if err != nil {
 		common.FatalLog(err)
 	}
+	go channel.LoadAsyncMessages()
 	defer func() {
 		err := model.CloseDB()
 		if err != nil {

+ 30 - 6
model/message.go

@@ -18,14 +18,15 @@ type Message struct {
 	HTMLContent string `json:"html_content"  gorm:"-:all"`
 	Timestamp   int64  `json:"timestamp" gorm:"type:bigint"`
 	Link        string `json:"link" gorm:"unique;index"`
-	To          string `json:"to" gorm:"column:to"`     // if specified, will send to this user(s)
-	Status      int    `json:"status" gorm:"default:0"` // pending, sent, failed
-	OpenId      string `json:"openid" gorm:"-:all"`     // alias for to
-	Desp        string `json:"desp" gorm:"-:all"`       // alias for content
-	Short       string `json:"short" gorm:"-:all"`      // alias for description
+	To          string `json:"to" gorm:"column:to"`           // if specified, will send to this user(s)
+	Status      int    `json:"status" gorm:"default:0;index"` // pending, sent, failed
+	OpenId      string `json:"openid" gorm:"-:all"`           // alias for to
+	Desp        string `json:"desp" gorm:"-:all"`             // alias for content
+	Short       string `json:"short" gorm:"-:all"`            // alias for description
+	Async       bool   `json:"async" gorm:"-"`                // if true, will send message asynchronously
 }
 
-func GetMessageById(id int, userId int) (*Message, error) {
+func GetMessageByIds(id int, userId int) (*Message, error) {
 	if id == 0 || userId == 0 {
 		return nil, errors.New("id 或 userId 为空!")
 	}
@@ -34,6 +35,20 @@ func GetMessageById(id int, userId int) (*Message, error) {
 	return &message, err
 }
 
+func GetMessageById(id int) (*Message, error) {
+	if id == 0 {
+		return nil, errors.New("id 为空!")
+	}
+	message := Message{Id: id}
+	err := DB.Where(message).First(&message).Error
+	return &message, err
+}
+
+func GetAsyncPendingMessageIds() (ids []int, err error) {
+	err = DB.Model(&Message{}).Where("status = ?", common.MessageSendStatusAsyncPending).Pluck("id", &ids).Error
+	return ids, err
+}
+
 func GetMessageByLink(link string) (*Message, error) {
 	if link == "" {
 		return nil, errors.New("link 为空!")
@@ -43,6 +58,15 @@ func GetMessageByLink(link string) (*Message, error) {
 	return &message, err
 }
 
+func GetMessageStatusByLink(link string) (int, error) {
+	if link == "" {
+		return common.MessageSendStatusUnknown, errors.New("link 为空!")
+	}
+	message := Message{}
+	err := DB.Where("link = ?", link).Select("status").First(&message).Error
+	return message.Status, err
+}
+
 func GetMessagesByUserId(userId int, startIdx int, num int) (messages []*Message, err error) {
 	err = DB.Where("user_id = ?", userId).Order("id desc").Limit(num).Offset(startIdx).Find(&messages).Error
 	return messages, err

+ 1 - 0
router/api-router.go

@@ -59,6 +59,7 @@ func SetApiRouter(router *gin.Engine) {
 		{
 			messageRoute.GET("/", middleware.UserAuth(), controller.GetUserMessages)
 			messageRoute.GET("/search", middleware.UserAuth(), controller.SearchMessages)
+			messageRoute.GET("/status/:link", controller.GetMessageStatus)
 			messageRoute.POST("/resend/:id", middleware.UserAuth(), controller.ResendMessage)
 			messageRoute.GET("/:id", middleware.UserAuth(), controller.GetMessage)
 			messageRoute.DELETE("/", middleware.RootAuth(), controller.DeleteAllMessages)

+ 7 - 1
web/src/components/MessagesTable.js

@@ -17,7 +17,7 @@ function renderStatus(status) {
     case 1:
       return (
         <Label basic color='olive'>
-          投递中...
+          正在投递
         </Label>
       );
     case 2:
@@ -32,6 +32,12 @@ function renderStatus(status) {
           发送失败
         </Label>
       );
+    case 4:
+      return (
+        <Label basic color='orange'>
+          已在队列
+        </Label>
+      );
     default:
       return (
         <Label basic color='grey'>