filequeue.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package model
  2. import (
  3. "log"
  4. "sort"
  5. "sync"
  6. "time"
  7. )
  8. type Monitor interface {
  9. FileBegins(<-chan content) error
  10. FileDone() error
  11. }
  12. type FileQueue struct {
  13. files queuedFileList
  14. sorted bool
  15. fmut sync.Mutex // protects files and sorted
  16. availability map[string][]string
  17. amut sync.Mutex // protects availability
  18. }
  19. type queuedFile struct {
  20. name string
  21. blocks []Block
  22. activeBlocks []bool
  23. given int
  24. remaining int
  25. channel chan content
  26. nodes []string
  27. nodesChecked time.Time
  28. monitor Monitor
  29. }
  // content is one chunk of file data paired with the offset it belongs at.
type content struct {
	offset int64
	data   []byte
}
  34. type queuedFileList []queuedFile
  35. func (l queuedFileList) Len() int { return len(l) }
  36. func (l queuedFileList) Swap(a, b int) { l[a], l[b] = l[b], l[a] }
  37. func (l queuedFileList) Less(a, b int) bool {
  38. // Sort by most blocks already given out, then alphabetically
  39. if l[a].given != l[b].given {
  40. return l[a].given > l[b].given
  41. }
  42. return l[a].name < l[b].name
  43. }
  44. type queuedBlock struct {
  45. name string
  46. block Block
  47. index int
  48. }
  49. func NewFileQueue() *FileQueue {
  50. return &FileQueue{
  51. availability: make(map[string][]string),
  52. }
  53. }
  54. func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
  55. q.fmut.Lock()
  56. defer q.fmut.Unlock()
  57. for _, f := range q.files {
  58. if f.name == name {
  59. panic("re-adding added file " + f.name)
  60. }
  61. }
  62. q.files = append(q.files, queuedFile{
  63. name: name,
  64. blocks: blocks,
  65. activeBlocks: make([]bool, len(blocks)),
  66. remaining: len(blocks),
  67. channel: make(chan content),
  68. monitor: monitor,
  69. })
  70. q.sorted = false
  71. }
  72. func (q *FileQueue) Len() int {
  73. q.fmut.Lock()
  74. defer q.fmut.Unlock()
  75. return len(q.files)
  76. }
  77. func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
  78. q.fmut.Lock()
  79. defer q.fmut.Unlock()
  80. if !q.sorted {
  81. sort.Sort(q.files)
  82. q.sorted = true
  83. }
  84. for i := range q.files {
  85. qf := &q.files[i]
  86. q.amut.Lock()
  87. av := q.availability[qf.name]
  88. q.amut.Unlock()
  89. if len(av) == 0 {
  90. // Noone has the file we want; abort.
  91. if qf.remaining != len(qf.blocks) {
  92. // We have already started on this file; close it down
  93. close(qf.channel)
  94. if mon := qf.monitor; mon != nil {
  95. mon.FileDone()
  96. }
  97. }
  98. q.deleteAt(i)
  99. return queuedBlock{}, false
  100. }
  101. for _, ni := range av {
  102. // Find and return the next block in the queue
  103. if ni == nodeID {
  104. for j, b := range qf.blocks {
  105. if !qf.activeBlocks[j] {
  106. qf.activeBlocks[j] = true
  107. qf.given++
  108. return queuedBlock{
  109. name: qf.name,
  110. block: b,
  111. index: j,
  112. }, true
  113. }
  114. }
  115. break
  116. }
  117. }
  118. }
  119. // We found nothing to do
  120. return queuedBlock{}, false
  121. }
  122. func (q *FileQueue) Done(file string, offset int64, data []byte) {
  123. q.fmut.Lock()
  124. defer q.fmut.Unlock()
  125. c := content{
  126. offset: offset,
  127. data: data,
  128. }
  129. for i := range q.files {
  130. qf := &q.files[i]
  131. if qf.name == file {
  132. if qf.monitor != nil && qf.remaining == len(qf.blocks) {
  133. err := qf.monitor.FileBegins(qf.channel)
  134. if err != nil {
  135. log.Printf("WARNING: %s: %v (not synced)", qf.name, err)
  136. q.deleteAt(i)
  137. return
  138. }
  139. }
  140. qf.channel <- c
  141. qf.remaining--
  142. if qf.remaining == 0 {
  143. close(qf.channel)
  144. if qf.monitor != nil {
  145. err := qf.monitor.FileDone()
  146. if err != nil {
  147. log.Printf("WARNING: %s: %v", qf.name, err)
  148. }
  149. }
  150. q.deleteAt(i)
  151. }
  152. return
  153. }
  154. }
  155. panic("unreachable")
  156. }
  157. func (q *FileQueue) Queued(file string) bool {
  158. q.fmut.Lock()
  159. defer q.fmut.Unlock()
  160. for _, qf := range q.files {
  161. if qf.name == file {
  162. return true
  163. }
  164. }
  165. return false
  166. }
  167. func (q *FileQueue) QueuedFiles() (files []string) {
  168. q.fmut.Lock()
  169. defer q.fmut.Unlock()
  170. for _, qf := range q.files {
  171. files = append(files, qf.name)
  172. }
  173. return
  174. }
  175. func (q *FileQueue) deleteAt(i int) {
  176. q.files = q.files[:i+copy(q.files[i:], q.files[i+1:])]
  177. }
  178. func (q *FileQueue) deleteFile(n string) {
  179. for i, file := range q.files {
  180. if n == file.name {
  181. q.deleteAt(i)
  182. return
  183. }
  184. }
  185. }
  186. func (q *FileQueue) SetAvailable(file string, nodes []string) {
  187. q.amut.Lock()
  188. defer q.amut.Unlock()
  189. q.availability[file] = nodes
  190. }
  191. func (q *FileQueue) RemoveAvailable(toRemove string) {
  192. q.amut.Lock()
  193. defer q.amut.Unlock()
  194. for file, nodes := range q.availability {
  195. for i, node := range nodes {
  196. if node == toRemove {
  197. q.availability[file] = nodes[:i+copy(nodes[i:], nodes[i+1:])]
  198. if len(q.availability[file]) == 0 {
  199. q.deleteFile(file)
  200. }
  201. }
  202. break
  203. }
  204. }
  205. }