queue.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "cmp"
  9. "slices"
  10. "time"
  11. "github.com/syncthing/syncthing/lib/rand"
  12. "github.com/syncthing/syncthing/lib/sync"
  13. )
  14. type jobQueue struct {
  15. progress []string
  16. queued []jobQueueEntry
  17. mut sync.Mutex
  18. }
  19. type jobQueueEntry struct {
  20. name string
  21. size int64
  22. modified int64
  23. }
  24. func newJobQueue() *jobQueue {
  25. return &jobQueue{
  26. mut: sync.NewMutex(),
  27. }
  28. }
  29. func (q *jobQueue) Push(file string, size int64, modified time.Time) {
  30. q.mut.Lock()
  31. // The range of UnixNano covers a range of reasonable timestamps.
  32. q.queued = append(q.queued, jobQueueEntry{file, size, modified.UnixNano()})
  33. q.mut.Unlock()
  34. }
  35. func (q *jobQueue) Pop() (string, bool) {
  36. q.mut.Lock()
  37. defer q.mut.Unlock()
  38. if len(q.queued) == 0 {
  39. return "", false
  40. }
  41. f := q.queued[0].name
  42. q.queued = q.queued[1:]
  43. q.progress = append(q.progress, f)
  44. return f, true
  45. }
  46. func (q *jobQueue) BringToFront(filename string) {
  47. q.mut.Lock()
  48. defer q.mut.Unlock()
  49. for i, cur := range q.queued {
  50. if cur.name == filename {
  51. if i > 0 {
  52. // Shift the elements before the selected element one step to
  53. // the right, overwriting the selected element
  54. copy(q.queued[1:i+1], q.queued[0:])
  55. // Put the selected element at the front
  56. q.queued[0] = cur
  57. }
  58. return
  59. }
  60. }
  61. }
  62. func (q *jobQueue) Done(file string) {
  63. q.mut.Lock()
  64. defer q.mut.Unlock()
  65. for i := range q.progress {
  66. if q.progress[i] == file {
  67. copy(q.progress[i:], q.progress[i+1:])
  68. q.progress = q.progress[:len(q.progress)-1]
  69. return
  70. }
  71. }
  72. }
  73. // Jobs returns a paginated list of file currently being pulled and files queued
  74. // to be pulled. It also returns how many items were skipped.
  75. func (q *jobQueue) Jobs(page, perpage int) ([]string, []string, int) {
  76. q.mut.Lock()
  77. defer q.mut.Unlock()
  78. toSkip := (page - 1) * perpage
  79. plen := len(q.progress)
  80. qlen := len(q.queued)
  81. if tot := plen + qlen; tot <= toSkip {
  82. return nil, nil, tot
  83. }
  84. if plen >= toSkip+perpage {
  85. progress := make([]string, perpage)
  86. copy(progress, q.progress[toSkip:toSkip+perpage])
  87. return progress, nil, toSkip
  88. }
  89. var progress []string
  90. if plen > toSkip {
  91. progress = make([]string, plen-toSkip)
  92. copy(progress, q.progress[toSkip:plen])
  93. toSkip = 0
  94. } else {
  95. toSkip -= plen
  96. }
  97. var queued []string
  98. if qlen-toSkip < perpage-len(progress) {
  99. queued = make([]string, qlen-toSkip)
  100. } else {
  101. queued = make([]string, perpage-len(progress))
  102. }
  103. for i := range queued {
  104. queued[i] = q.queued[i+toSkip].name
  105. }
  106. return progress, queued, (page - 1) * perpage
  107. }
  108. func (q *jobQueue) Shuffle() {
  109. q.mut.Lock()
  110. defer q.mut.Unlock()
  111. rand.Shuffle(q.queued)
  112. }
  113. func (q *jobQueue) Reset() {
  114. q.mut.Lock()
  115. defer q.mut.Unlock()
  116. q.progress = nil
  117. q.queued = nil
  118. }
  119. func (q *jobQueue) lenQueued() int {
  120. q.mut.Lock()
  121. defer q.mut.Unlock()
  122. return len(q.queued)
  123. }
  124. func (q *jobQueue) lenProgress() int {
  125. q.mut.Lock()
  126. defer q.mut.Unlock()
  127. return len(q.progress)
  128. }
  129. func (q *jobQueue) SortSmallestFirst() {
  130. q.mut.Lock()
  131. defer q.mut.Unlock()
  132. slices.SortFunc(q.queued, func(a, b jobQueueEntry) int {
  133. return cmp.Compare(a.size, b.size)
  134. })
  135. }
  136. func (q *jobQueue) SortLargestFirst() {
  137. q.mut.Lock()
  138. defer q.mut.Unlock()
  139. slices.SortFunc(q.queued, func(a, b jobQueueEntry) int {
  140. return cmp.Compare(b.size, a.size)
  141. })
  142. }
  143. func (q *jobQueue) SortOldestFirst() {
  144. q.mut.Lock()
  145. defer q.mut.Unlock()
  146. slices.SortFunc(q.queued, func(a, b jobQueueEntry) int {
  147. return cmp.Compare(a.modified, b.modified)
  148. })
  149. }
  150. func (q *jobQueue) SortNewestFirst() {
  151. q.mut.Lock()
  152. defer q.mut.Unlock()
  153. slices.SortFunc(q.queued, func(a, b jobQueueEntry) int {
  154. return cmp.Compare(b.modified, a.modified)
  155. })
  156. }