message.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package message
  2. import (
  3. "context"
  4. "encoding/json"
  5. "github.com/cloudwego/eino/schema"
  6. "github.com/google/uuid"
  7. "github.com/kujtimiihoxha/termai/internal/db"
  8. "github.com/kujtimiihoxha/termai/internal/pubsub"
  9. )
  10. type Message struct {
  11. ID string
  12. SessionID string
  13. MessageData schema.Message
  14. CreatedAt int64
  15. UpdatedAt int64
  16. }
  17. type Service interface {
  18. pubsub.Suscriber[Message]
  19. Create(sessionID string, messageData schema.Message) (Message, error)
  20. Get(id string) (Message, error)
  21. List(sessionID string) ([]Message, error)
  22. Delete(id string) error
  23. DeleteSessionMessages(sessionID string) error
  24. }
  25. type service struct {
  26. *pubsub.Broker[Message]
  27. q db.Querier
  28. ctx context.Context
  29. }
  30. func (s *service) Create(sessionID string, messageData schema.Message) (Message, error) {
  31. messageDataJSON, err := json.Marshal(messageData)
  32. if err != nil {
  33. return Message{}, err
  34. }
  35. dbMessage, err := s.q.CreateMessage(s.ctx, db.CreateMessageParams{
  36. ID: uuid.New().String(),
  37. SessionID: sessionID,
  38. MessageData: string(messageDataJSON),
  39. })
  40. if err != nil {
  41. return Message{}, err
  42. }
  43. message := s.fromDBItem(dbMessage)
  44. s.Publish(pubsub.CreatedEvent, message)
  45. return message, nil
  46. }
  47. func (s *service) Delete(id string) error {
  48. message, err := s.Get(id)
  49. if err != nil {
  50. return err
  51. }
  52. err = s.q.DeleteMessage(s.ctx, message.ID)
  53. if err != nil {
  54. return err
  55. }
  56. s.Publish(pubsub.DeletedEvent, message)
  57. return nil
  58. }
  59. func (s *service) DeleteSessionMessages(sessionID string) error {
  60. messages, err := s.List(sessionID)
  61. if err != nil {
  62. return err
  63. }
  64. for _, message := range messages {
  65. if message.SessionID == sessionID {
  66. err = s.Delete(message.ID)
  67. if err != nil {
  68. return err
  69. }
  70. }
  71. }
  72. return nil
  73. }
  74. func (s *service) Get(id string) (Message, error) {
  75. dbMessage, err := s.q.GetMessage(s.ctx, id)
  76. if err != nil {
  77. return Message{}, err
  78. }
  79. return s.fromDBItem(dbMessage), nil
  80. }
  81. func (s *service) List(sessionID string) ([]Message, error) {
  82. dbMessages, err := s.q.ListMessagesBySession(s.ctx, sessionID)
  83. if err != nil {
  84. return nil, err
  85. }
  86. messages := make([]Message, len(dbMessages))
  87. for i, dbMessage := range dbMessages {
  88. messages[i] = s.fromDBItem(dbMessage)
  89. }
  90. return messages, nil
  91. }
  92. func (s *service) fromDBItem(item db.Message) Message {
  93. var messageData schema.Message
  94. json.Unmarshal([]byte(item.MessageData), &messageData)
  95. return Message{
  96. ID: item.ID,
  97. SessionID: item.SessionID,
  98. MessageData: messageData,
  99. CreatedAt: item.CreatedAt,
  100. UpdatedAt: item.UpdatedAt,
  101. }
  102. }
  103. func NewService(ctx context.Context, q db.Querier) Service {
  104. return &service{
  105. Broker: pubsub.NewBroker[Message](),
  106. q: q,
  107. ctx: ctx,
  108. }
  109. }