queue.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package core
  2. import (
  3. "errors"
  4. "sync"
  5. )
  6. var queues sync.Map
  7. type Queue struct {
  8. data []*QMessage
  9. head int
  10. tail int
  11. size int
  12. lock *sync.Mutex
  13. }
  14. func NewQueue(name string, size int) *Queue {
  15. q := &Queue{
  16. head: 0,
  17. tail: 0,
  18. size: size,
  19. }
  20. v, ok := queues.LoadOrStore(name, q)
  21. if ok {
  22. q = v.(*Queue)
  23. } else {
  24. q.data = make([]*QMessage, size)
  25. q.lock = new(sync.Mutex)
  26. }
  27. return q
  28. }
  29. func (q *Queue) Enqueue(value *QMessage) error {
  30. q.lock.Lock()
  31. defer q.lock.Unlock()
  32. if q.IsFull() {
  33. return errors.New("queue is full")
  34. }
  35. q.data[q.tail] = value
  36. q.tail = (q.tail + 1) % q.size
  37. return nil
  38. }
  39. func (q *Queue) Dequeue() (*QMessage, error) {
  40. q.lock.Lock()
  41. defer q.lock.Unlock()
  42. if q.IsEmpty() {
  43. return nil, errors.New("queue is empty")
  44. }
  45. value := q.data[q.head]
  46. q.head = (q.head + 1) % q.size
  47. return value, nil
  48. }
  49. func (q *Queue) IsEmpty() bool {
  50. return q.head == q.tail
  51. }
  52. func (q *Queue) IsFull() bool {
  53. return (q.tail+1)%q.size == q.head
  54. }
  55. func (q *Queue) Size() int {
  56. return (q.tail - q.head + q.size) % q.size
  57. }
  58. func (q *Queue) GetValues() []*QMessage {
  59. q.lock.Lock()
  60. defer q.lock.Unlock()
  61. values := make([]*QMessage, q.Size())
  62. for i := 0; i < q.Size(); i++ {
  63. values[i] = q.data[(q.head+i)%q.size]
  64. }
  65. return values
  66. }