filequeue.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. package model
  2. import (
  3. "log"
  4. "sort"
  5. "sync"
  6. "time"
  7. )
  8. type Monitor interface {
  9. FileBegins(<-chan content) error
  10. FileDone() error
  11. }
  12. type FileQueue struct {
  13. files queuedFileList
  14. lock sync.Mutex
  15. sorted bool
  16. availability map[string][]string
  17. }
  18. type queuedFile struct {
  19. name string
  20. blocks []Block
  21. activeBlocks []bool
  22. given int
  23. remaining int
  24. channel chan content
  25. nodes []string
  26. nodesChecked time.Time
  27. monitor Monitor
  28. }
  // content is one chunk of file data, as passed to FileQueue.Done and
  // forwarded on the file's channel.
  type content struct {
  	offset int64  // offset value supplied by the caller of Done
  	data   []byte // the chunk's bytes
  }
  33. type queuedFileList []queuedFile
  34. func (l queuedFileList) Len() int { return len(l) }
  35. func (l queuedFileList) Swap(a, b int) { l[a], l[b] = l[b], l[a] }
  36. func (l queuedFileList) Less(a, b int) bool {
  37. // Sort by most blocks already given out, then alphabetically
  38. if l[a].given != l[b].given {
  39. return l[a].given > l[b].given
  40. }
  41. return l[a].name < l[b].name
  42. }
  43. type queuedBlock struct {
  44. name string
  45. block Block
  46. index int
  47. }
  48. func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
  49. q.lock.Lock()
  50. defer q.lock.Unlock()
  51. q.files = append(q.files, queuedFile{
  52. name: name,
  53. blocks: blocks,
  54. activeBlocks: make([]bool, len(blocks)),
  55. remaining: len(blocks),
  56. channel: make(chan content),
  57. monitor: monitor,
  58. })
  59. q.sorted = false
  60. }
  61. func (q *FileQueue) Len() int {
  62. q.lock.Lock()
  63. defer q.lock.Unlock()
  64. return len(q.files)
  65. }
  66. func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
  67. q.lock.Lock()
  68. defer q.lock.Unlock()
  69. if !q.sorted {
  70. sort.Sort(q.files)
  71. q.sorted = true
  72. }
  73. for i := range q.files {
  74. qf := &q.files[i]
  75. if len(q.availability[qf.name]) == 0 {
  76. // Noone has the file we want; abort.
  77. if qf.remaining != len(qf.blocks) {
  78. // We have already started on this file; close it down
  79. close(qf.channel)
  80. if mon := qf.monitor; mon != nil {
  81. mon.FileDone()
  82. }
  83. }
  84. q.deleteAt(i)
  85. return queuedBlock{}, false
  86. }
  87. for _, ni := range q.availability[qf.name] {
  88. // Find and return the next block in the queue
  89. if ni == nodeID {
  90. for j, b := range qf.blocks {
  91. if !qf.activeBlocks[j] {
  92. qf.activeBlocks[j] = true
  93. qf.given++
  94. return queuedBlock{
  95. name: qf.name,
  96. block: b,
  97. index: j,
  98. }, true
  99. }
  100. }
  101. break
  102. }
  103. }
  104. }
  105. // We found nothing to do
  106. return queuedBlock{}, false
  107. }
  108. func (q *FileQueue) Done(file string, offset int64, data []byte) {
  109. q.lock.Lock()
  110. defer q.lock.Unlock()
  111. c := content{
  112. offset: offset,
  113. data: data,
  114. }
  115. for i := range q.files {
  116. qf := &q.files[i]
  117. if qf.name == file {
  118. if qf.monitor != nil && qf.remaining == len(qf.blocks) {
  119. err := qf.monitor.FileBegins(qf.channel)
  120. if err != nil {
  121. log.Printf("WARNING: %s: %v (not synced)", qf.name, err)
  122. q.deleteAt(i)
  123. return
  124. }
  125. }
  126. qf.channel <- c
  127. qf.remaining--
  128. if qf.remaining == 0 {
  129. close(qf.channel)
  130. if qf.monitor != nil {
  131. err := qf.monitor.FileDone()
  132. if err != nil {
  133. log.Printf("WARNING: %s: %v", qf.name, err)
  134. }
  135. }
  136. q.deleteAt(i)
  137. }
  138. return
  139. }
  140. }
  141. panic("unreachable")
  142. }
  143. func (q *FileQueue) Queued(file string) bool {
  144. q.lock.Lock()
  145. defer q.lock.Unlock()
  146. for _, qf := range q.files {
  147. if qf.name == file {
  148. return true
  149. }
  150. }
  151. return false
  152. }
  153. func (q *FileQueue) QueuedFiles() (files []string) {
  154. q.lock.Lock()
  155. defer q.lock.Unlock()
  156. for _, qf := range q.files {
  157. files = append(files, qf.name)
  158. }
  159. return
  160. }
  161. func (q *FileQueue) deleteAt(i int) {
  162. q.files = q.files[:i+copy(q.files[i:], q.files[i+1:])]
  163. }
  164. func (q *FileQueue) SetAvailable(file, node string) {
  165. q.lock.Lock()
  166. defer q.lock.Unlock()
  167. if q.availability == nil {
  168. q.availability = make(map[string][]string)
  169. }
  170. q.availability[file] = []string{node}
  171. }
  172. func (q *FileQueue) AddAvailable(file, node string) {
  173. q.lock.Lock()
  174. defer q.lock.Unlock()
  175. if q.availability == nil {
  176. q.availability = make(map[string][]string)
  177. }
  178. q.availability[file] = append(q.availability[file], node)
  179. }