filequeue.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package main
  2. import (
  3. "log"
  4. "sort"
  5. "sync"
  6. "time"
  7. "github.com/calmh/syncthing/scanner"
  8. )
// Monitor is notified about the progress of a single queued file.
type Monitor interface {
	// FileBegins is called by FileQueue.Done before the first completed
	// block of the file is sent, and is handed the receive side of the
	// channel on which the file's block contents are delivered. If it
	// returns an error the queue logs it and drops the file.
	FileBegins(<-chan content) error
	// FileDone is called after the file's channel has been closed: either
	// because the last block was delivered (Done) or because a partially
	// delivered file has no nodes left to fetch from (Get).
	FileDone() error
}
// FileQueue keeps the list of files waiting to be fetched, together with the
// nodes each file is available from, and hands the files' blocks out to
// nodes one at a time.
type FileQueue struct {
	// files is the list of queued files; Get sorts it lazily.
	files queuedFileList
	// sorted records whether files is currently in sort order. Add clears
	// it and Get sets it after sorting.
	sorted bool
	fmut   sync.Mutex // protects files and sorted
	// availability maps a file name to the IDs of the nodes that have it.
	availability map[string][]string
	amut         sync.Mutex // protects availability
	// queued holds the names of the files currently in the queue. After
	// construction, every read and write of it in this file happens with
	// fmut held.
	queued map[string]bool
}
// queuedFile is the queue's bookkeeping for one file.
type queuedFile struct {
	name   string          // file name; also the key into availability and queued
	blocks []scanner.Block // all blocks of the file, in the order given to Add
	// activeBlocks[i] is true once blocks[i] has been handed out by Get.
	activeBlocks []bool
	given        int          // number of blocks handed out by Get so far
	remaining    int          // number of blocks not yet reported through Done
	channel      chan content // completed block contents are sent here by Done
	// nodes and nodesChecked are not referenced by any code in this file.
	nodes        []string
	nodesChecked time.Time
	monitor      Monitor // optional; nil means nobody is notified
}
// content is one completed block as passed to Done: the data and the offset
// that was reported along with it.
type content struct {
	offset int64
	data   []byte
}
  36. type queuedFileList []queuedFile
  37. func (l queuedFileList) Len() int { return len(l) }
  38. func (l queuedFileList) Swap(a, b int) { l[a], l[b] = l[b], l[a] }
  39. func (l queuedFileList) Less(a, b int) bool {
  40. // Sort by most blocks already given out, then alphabetically
  41. if l[a].given != l[b].given {
  42. return l[a].given > l[b].given
  43. }
  44. return l[a].name < l[b].name
  45. }
// queuedBlock is what Get hands out: one block of a queued file.
type queuedBlock struct {
	name  string        // name of the file the block belongs to
	block scanner.Block // the block itself
	index int           // position of the block in the file's block list
}
  51. func NewFileQueue() *FileQueue {
  52. return &FileQueue{
  53. availability: make(map[string][]string),
  54. queued: make(map[string]bool),
  55. }
  56. }
  57. func (q *FileQueue) Add(name string, blocks []scanner.Block, monitor Monitor) {
  58. q.fmut.Lock()
  59. defer q.fmut.Unlock()
  60. if q.queued[name] {
  61. return
  62. }
  63. q.files = append(q.files, queuedFile{
  64. name: name,
  65. blocks: blocks,
  66. activeBlocks: make([]bool, len(blocks)),
  67. remaining: len(blocks),
  68. channel: make(chan content),
  69. monitor: monitor,
  70. })
  71. q.queued[name] = true
  72. q.sorted = false
  73. }
  74. func (q *FileQueue) Len() int {
  75. q.fmut.Lock()
  76. defer q.fmut.Unlock()
  77. return len(q.files)
  78. }
  79. func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
  80. q.fmut.Lock()
  81. defer q.fmut.Unlock()
  82. if !q.sorted {
  83. sort.Sort(q.files)
  84. q.sorted = true
  85. }
  86. for i := range q.files {
  87. qf := &q.files[i]
  88. q.amut.Lock()
  89. av := q.availability[qf.name]
  90. q.amut.Unlock()
  91. if len(av) == 0 {
  92. // Noone has the file we want; abort.
  93. if qf.remaining != len(qf.blocks) {
  94. // We have already started on this file; close it down
  95. close(qf.channel)
  96. if mon := qf.monitor; mon != nil {
  97. mon.FileDone()
  98. }
  99. }
  100. delete(q.queued, qf.name)
  101. q.deleteAt(i)
  102. return queuedBlock{}, false
  103. }
  104. for _, ni := range av {
  105. // Find and return the next block in the queue
  106. if ni == nodeID {
  107. for j, b := range qf.blocks {
  108. if !qf.activeBlocks[j] {
  109. qf.activeBlocks[j] = true
  110. qf.given++
  111. return queuedBlock{
  112. name: qf.name,
  113. block: b,
  114. index: j,
  115. }, true
  116. }
  117. }
  118. break
  119. }
  120. }
  121. }
  122. // We found nothing to do
  123. return queuedBlock{}, false
  124. }
  125. func (q *FileQueue) Done(file string, offset int64, data []byte) {
  126. q.fmut.Lock()
  127. defer q.fmut.Unlock()
  128. c := content{
  129. offset: offset,
  130. data: data,
  131. }
  132. for i := range q.files {
  133. qf := &q.files[i]
  134. if qf.name == file {
  135. if qf.monitor != nil && qf.remaining == len(qf.blocks) {
  136. err := qf.monitor.FileBegins(qf.channel)
  137. if err != nil {
  138. log.Printf("WARNING: %s: %v (not synced)", qf.name, err)
  139. delete(q.queued, qf.name)
  140. q.deleteAt(i)
  141. return
  142. }
  143. }
  144. qf.channel <- c
  145. qf.remaining--
  146. if qf.remaining == 0 {
  147. close(qf.channel)
  148. if qf.monitor != nil {
  149. err := qf.monitor.FileDone()
  150. if err != nil {
  151. log.Printf("WARNING: %s: %v", qf.name, err)
  152. }
  153. }
  154. delete(q.queued, qf.name)
  155. q.deleteAt(i)
  156. }
  157. return
  158. }
  159. }
  160. // We found nothing, might have errored out already
  161. }
  162. func (q *FileQueue) QueuedFiles() (files []string) {
  163. q.fmut.Lock()
  164. defer q.fmut.Unlock()
  165. for _, qf := range q.files {
  166. files = append(files, qf.name)
  167. }
  168. return
  169. }
  170. func (q *FileQueue) deleteAt(i int) {
  171. q.files = append(q.files[:i], q.files[i+1:]...)
  172. }
  173. func (q *FileQueue) deleteFile(n string) {
  174. for i, file := range q.files {
  175. if n == file.name {
  176. q.deleteAt(i)
  177. delete(q.queued, file.name)
  178. return
  179. }
  180. }
  181. }
  182. func (q *FileQueue) SetAvailable(file string, nodes []string) {
  183. q.amut.Lock()
  184. defer q.amut.Unlock()
  185. q.availability[file] = nodes
  186. }
  187. func (q *FileQueue) RemoveAvailable(toRemove string) {
  188. q.fmut.Lock()
  189. q.amut.Lock()
  190. defer q.amut.Unlock()
  191. defer q.fmut.Unlock()
  192. for file, nodes := range q.availability {
  193. for i, node := range nodes {
  194. if node == toRemove {
  195. q.availability[file] = nodes[:i+copy(nodes[i:], nodes[i+1:])]
  196. if len(q.availability[file]) == 0 {
  197. q.deleteFile(file)
  198. }
  199. }
  200. break
  201. }
  202. }
  203. }