filequeue.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package model
  2. import (
  3. "log"
  4. "sort"
  5. "sync"
  6. "time"
  7. )
  8. type Monitor interface {
  9. FileBegins(<-chan content) error
  10. FileDone() error
  11. }
  12. type FileQueue struct {
  13. files queuedFileList
  14. sorted bool
  15. fmut sync.Mutex // protects files and sorted
  16. availability map[string][]string
  17. amut sync.Mutex // protects availability
  18. }
  19. type queuedFile struct {
  20. name string
  21. blocks []Block
  22. activeBlocks []bool
  23. given int
  24. remaining int
  25. channel chan content
  26. nodes []string
  27. nodesChecked time.Time
  28. monitor Monitor
  29. }
  // content is one fetched block of file data, as sent to a Monitor.
  type content struct {
  	offset int64  // offset value supplied to FileQueue.Done for this block
  	data   []byte // the block's bytes
  }
  34. type queuedFileList []queuedFile
  35. func (l queuedFileList) Len() int { return len(l) }
  36. func (l queuedFileList) Swap(a, b int) { l[a], l[b] = l[b], l[a] }
  37. func (l queuedFileList) Less(a, b int) bool {
  38. // Sort by most blocks already given out, then alphabetically
  39. if l[a].given != l[b].given {
  40. return l[a].given > l[b].given
  41. }
  42. return l[a].name < l[b].name
  43. }
  44. type queuedBlock struct {
  45. name string
  46. block Block
  47. index int
  48. }
  49. func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
  50. q.fmut.Lock()
  51. defer q.fmut.Unlock()
  52. for _, f := range q.files {
  53. if f.name == name {
  54. panic("re-adding added file " + f.name)
  55. }
  56. }
  57. q.files = append(q.files, queuedFile{
  58. name: name,
  59. blocks: blocks,
  60. activeBlocks: make([]bool, len(blocks)),
  61. remaining: len(blocks),
  62. channel: make(chan content),
  63. monitor: monitor,
  64. })
  65. q.sorted = false
  66. }
  67. func (q *FileQueue) Len() int {
  68. q.fmut.Lock()
  69. defer q.fmut.Unlock()
  70. return len(q.files)
  71. }
  72. func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
  73. q.fmut.Lock()
  74. defer q.fmut.Unlock()
  75. if !q.sorted {
  76. sort.Sort(q.files)
  77. q.sorted = true
  78. }
  79. for i := range q.files {
  80. qf := &q.files[i]
  81. q.amut.Lock()
  82. av := q.availability[qf.name]
  83. q.amut.Unlock()
  84. if len(av) == 0 {
  85. // Noone has the file we want; abort.
  86. if qf.remaining != len(qf.blocks) {
  87. // We have already started on this file; close it down
  88. close(qf.channel)
  89. if mon := qf.monitor; mon != nil {
  90. mon.FileDone()
  91. }
  92. }
  93. q.deleteAt(i)
  94. return queuedBlock{}, false
  95. }
  96. for _, ni := range av {
  97. // Find and return the next block in the queue
  98. if ni == nodeID {
  99. for j, b := range qf.blocks {
  100. if !qf.activeBlocks[j] {
  101. qf.activeBlocks[j] = true
  102. qf.given++
  103. return queuedBlock{
  104. name: qf.name,
  105. block: b,
  106. index: j,
  107. }, true
  108. }
  109. }
  110. break
  111. }
  112. }
  113. }
  114. // We found nothing to do
  115. return queuedBlock{}, false
  116. }
  117. func (q *FileQueue) Done(file string, offset int64, data []byte) {
  118. q.fmut.Lock()
  119. defer q.fmut.Unlock()
  120. c := content{
  121. offset: offset,
  122. data: data,
  123. }
  124. for i := range q.files {
  125. qf := &q.files[i]
  126. if qf.name == file {
  127. if qf.monitor != nil && qf.remaining == len(qf.blocks) {
  128. err := qf.monitor.FileBegins(qf.channel)
  129. if err != nil {
  130. log.Printf("WARNING: %s: %v (not synced)", qf.name, err)
  131. q.deleteAt(i)
  132. return
  133. }
  134. }
  135. qf.channel <- c
  136. qf.remaining--
  137. if qf.remaining == 0 {
  138. close(qf.channel)
  139. if qf.monitor != nil {
  140. err := qf.monitor.FileDone()
  141. if err != nil {
  142. log.Printf("WARNING: %s: %v", qf.name, err)
  143. }
  144. }
  145. q.deleteAt(i)
  146. }
  147. return
  148. }
  149. }
  150. panic("unreachable")
  151. }
  152. func (q *FileQueue) Queued(file string) bool {
  153. q.fmut.Lock()
  154. defer q.fmut.Unlock()
  155. for _, qf := range q.files {
  156. if qf.name == file {
  157. return true
  158. }
  159. }
  160. return false
  161. }
  162. func (q *FileQueue) QueuedFiles() (files []string) {
  163. q.fmut.Lock()
  164. defer q.fmut.Unlock()
  165. for _, qf := range q.files {
  166. files = append(files, qf.name)
  167. }
  168. return
  169. }
  170. func (q *FileQueue) deleteAt(i int) {
  171. q.files = q.files[:i+copy(q.files[i:], q.files[i+1:])]
  172. }
  173. func (q *FileQueue) deleteFile(n string) {
  174. for i, file := range q.files {
  175. if n == file.name {
  176. q.deleteAt(i)
  177. return
  178. }
  179. }
  180. }
  181. func (q *FileQueue) SetAvailable(file, node string) {
  182. q.amut.Lock()
  183. defer q.amut.Unlock()
  184. if q.availability == nil {
  185. q.availability = make(map[string][]string)
  186. }
  187. q.availability[file] = []string{node}
  188. }
  189. func (q *FileQueue) AddAvailable(file, node string) {
  190. q.amut.Lock()
  191. defer q.amut.Unlock()
  192. if q.availability == nil {
  193. q.availability = make(map[string][]string)
  194. }
  195. q.availability[file] = append(q.availability[file], node)
  196. }
  197. func (q *FileQueue) RemoveAvailable(toRemove string) {
  198. q.amut.Lock()
  199. defer q.amut.Unlock()
  200. for file, nodes := range q.availability {
  201. for i, node := range nodes {
  202. if node == toRemove {
  203. q.availability[file] = nodes[:i+copy(nodes[i:], nodes[i+1:])]
  204. if len(q.availability[file]) == 0 {
  205. q.deleteFile(file)
  206. }
  207. }
  208. break
  209. }
  210. }
  211. }