message-queue.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package channel
  2. import (
  3. "message-pusher/common"
  4. "message-pusher/model"
  5. )
  // Package-level state of the asynchronous message queue.
  var (
  	// AsyncMessageQueue carries the ids of messages waiting to be sent
  	// asynchronously. It is allocated in init.
  	AsyncMessageQueue chan int

  	// AsyncMessageQueueSize is the buffer capacity used when
  	// AsyncMessageQueue is created.
  	AsyncMessageQueueSize = 128

  	// AsyncMessageSenderNum is the number of sender goroutines started in init.
  	AsyncMessageSenderNum = 2
  )
  9. func init() {
  10. AsyncMessageQueue = make(chan int, AsyncMessageQueueSize)
  11. for i := 0; i < AsyncMessageSenderNum; i++ {
  12. go asyncMessageSender()
  13. }
  14. }
  15. // LoadAsyncMessages loads async pending messages from database.
  16. // We have to wait the database connection is ready.
  17. func LoadAsyncMessages() {
  18. ids, err := model.GetAsyncPendingMessageIds()
  19. if err != nil {
  20. common.FatalLog("failed to load async pending messages: " + err.Error())
  21. }
  22. for _, id := range ids {
  23. AsyncMessageQueue <- id
  24. }
  25. }
  26. func asyncMessageSenderHelper(message *model.Message) error {
  27. user, err := model.GetUserById(message.UserId, false)
  28. if err != nil {
  29. return err
  30. }
  31. channel_, err := model.GetChannelByName(message.Channel, user.Id)
  32. if err != nil {
  33. return err
  34. }
  35. return SendMessage(message, user, channel_)
  36. }
  37. func asyncMessageSender() {
  38. for {
  39. id := <-AsyncMessageQueue
  40. message, err := model.GetMessageById(id)
  41. if err != nil {
  42. common.SysError("async message sender error: " + err.Error())
  43. continue
  44. }
  45. err = asyncMessageSenderHelper(message)
  46. status := common.MessageSendStatusFailed
  47. if err != nil {
  48. common.SysError("async message sender error: " + err.Error())
  49. } else {
  50. status = common.MessageSendStatusSent
  51. }
  52. err = message.UpdateStatus(status)
  53. if err != nil {
  54. common.SysError("async message sender error: " + err.Error())
  55. }
  56. }
  57. }