filequeue.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package model
  2. import (
  3. "log"
  4. "sort"
  5. "sync"
  6. "time"
  7. )
  8. type Monitor interface {
  9. FileBegins(<-chan content) error
  10. FileDone() error
  11. }
  12. type FileQueue struct {
  13. files queuedFileList
  14. sorted bool
  15. fmut sync.Mutex // protects files and sorted
  16. availability map[string][]string
  17. amut sync.Mutex // protects availability
  18. queued map[string]bool
  19. }
  20. type queuedFile struct {
  21. name string
  22. blocks []Block
  23. activeBlocks []bool
  24. given int
  25. remaining int
  26. channel chan content
  27. nodes []string
  28. nodesChecked time.Time
  29. monitor Monitor
  30. }
// content is one finished piece of file data. FileQueue.Done builds one from
// its arguments and sends it on the queued file's channel.
type content struct {
	offset int64  // offset passed to Done; presumably the byte position in the file (confirm with callers)
	data   []byte // data passed to Done
}
  35. type queuedFileList []queuedFile
  36. func (l queuedFileList) Len() int { return len(l) }
  37. func (l queuedFileList) Swap(a, b int) { l[a], l[b] = l[b], l[a] }
  38. func (l queuedFileList) Less(a, b int) bool {
  39. // Sort by most blocks already given out, then alphabetically
  40. if l[a].given != l[b].given {
  41. return l[a].given > l[b].given
  42. }
  43. return l[a].name < l[b].name
  44. }
// queuedBlock is the value returned by FileQueue.Get: one block of a queued
// file that has just been handed out.
type queuedBlock struct {
	name  string // name of the file the block belongs to
	block Block  // the block itself
	index int    // position of block in the file's block list
}
  50. func NewFileQueue() *FileQueue {
  51. return &FileQueue{
  52. availability: make(map[string][]string),
  53. queued: make(map[string]bool),
  54. }
  55. }
  56. func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
  57. q.fmut.Lock()
  58. defer q.fmut.Unlock()
  59. if q.queued[name] {
  60. return
  61. }
  62. q.files = append(q.files, queuedFile{
  63. name: name,
  64. blocks: blocks,
  65. activeBlocks: make([]bool, len(blocks)),
  66. remaining: len(blocks),
  67. channel: make(chan content),
  68. monitor: monitor,
  69. })
  70. q.queued[name] = true
  71. q.sorted = false
  72. }
  73. func (q *FileQueue) Len() int {
  74. q.fmut.Lock()
  75. defer q.fmut.Unlock()
  76. return len(q.files)
  77. }
  78. func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
  79. q.fmut.Lock()
  80. defer q.fmut.Unlock()
  81. if !q.sorted {
  82. sort.Sort(q.files)
  83. q.sorted = true
  84. }
  85. for i := range q.files {
  86. qf := &q.files[i]
  87. q.amut.Lock()
  88. av := q.availability[qf.name]
  89. q.amut.Unlock()
  90. if len(av) == 0 {
  91. // Noone has the file we want; abort.
  92. if qf.remaining != len(qf.blocks) {
  93. // We have already started on this file; close it down
  94. close(qf.channel)
  95. if mon := qf.monitor; mon != nil {
  96. mon.FileDone()
  97. }
  98. }
  99. delete(q.queued, qf.name)
  100. q.deleteAt(i)
  101. return queuedBlock{}, false
  102. }
  103. for _, ni := range av {
  104. // Find and return the next block in the queue
  105. if ni == nodeID {
  106. for j, b := range qf.blocks {
  107. if !qf.activeBlocks[j] {
  108. qf.activeBlocks[j] = true
  109. qf.given++
  110. return queuedBlock{
  111. name: qf.name,
  112. block: b,
  113. index: j,
  114. }, true
  115. }
  116. }
  117. break
  118. }
  119. }
  120. }
  121. // We found nothing to do
  122. return queuedBlock{}, false
  123. }
  124. func (q *FileQueue) Done(file string, offset int64, data []byte) {
  125. q.fmut.Lock()
  126. defer q.fmut.Unlock()
  127. c := content{
  128. offset: offset,
  129. data: data,
  130. }
  131. for i := range q.files {
  132. qf := &q.files[i]
  133. if qf.name == file {
  134. if qf.monitor != nil && qf.remaining == len(qf.blocks) {
  135. err := qf.monitor.FileBegins(qf.channel)
  136. if err != nil {
  137. log.Printf("WARNING: %s: %v (not synced)", qf.name, err)
  138. delete(q.queued, qf.name)
  139. q.deleteAt(i)
  140. return
  141. }
  142. }
  143. qf.channel <- c
  144. qf.remaining--
  145. if qf.remaining == 0 {
  146. close(qf.channel)
  147. if qf.monitor != nil {
  148. err := qf.monitor.FileDone()
  149. if err != nil {
  150. log.Printf("WARNING: %s: %v", qf.name, err)
  151. }
  152. }
  153. delete(q.queued, qf.name)
  154. q.deleteAt(i)
  155. }
  156. return
  157. }
  158. }
  159. // We found nothing, might have errored out already
  160. }
  161. func (q *FileQueue) QueuedFiles() (files []string) {
  162. q.fmut.Lock()
  163. defer q.fmut.Unlock()
  164. for _, qf := range q.files {
  165. files = append(files, qf.name)
  166. }
  167. return
  168. }
  169. func (q *FileQueue) deleteAt(i int) {
  170. q.files = append(q.files[:i], q.files[i+1:]...)
  171. }
  172. func (q *FileQueue) deleteFile(n string) {
  173. for i, file := range q.files {
  174. if n == file.name {
  175. q.deleteAt(i)
  176. delete(q.queued, file.name)
  177. return
  178. }
  179. }
  180. }
  181. func (q *FileQueue) SetAvailable(file string, nodes []string) {
  182. q.amut.Lock()
  183. defer q.amut.Unlock()
  184. q.availability[file] = nodes
  185. }
  186. func (q *FileQueue) RemoveAvailable(toRemove string) {
  187. q.amut.Lock()
  188. q.fmut.Lock()
  189. defer q.fmut.Unlock()
  190. defer q.amut.Unlock()
  191. for file, nodes := range q.availability {
  192. for i, node := range nodes {
  193. if node == toRemove {
  194. q.availability[file] = nodes[:i+copy(nodes[i:], nodes[i+1:])]
  195. if len(q.availability[file]) == 0 {
  196. q.deleteFile(file)
  197. }
  198. }
  199. break
  200. }
  201. }
  202. }