blockqueue.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package model
  2. import (
  3. "sync"
  4. "github.com/calmh/syncthing/scanner"
  5. )
  6. type bqAdd struct {
  7. file scanner.File
  8. have []scanner.Block
  9. need []scanner.Block
  10. }
  11. type bqBlock struct {
  12. file scanner.File
  13. block scanner.Block // get this block from the network
  14. copy []scanner.Block // copy these blocks from the old version of the file
  15. last bool
  16. }
  17. type blockQueue struct {
  18. inbox chan bqAdd
  19. outbox chan bqBlock
  20. queued []bqBlock
  21. mut sync.Mutex
  22. }
  23. func newBlockQueue() *blockQueue {
  24. q := &blockQueue{
  25. inbox: make(chan bqAdd),
  26. outbox: make(chan bqBlock),
  27. }
  28. go q.run()
  29. return q
  30. }
  31. func (q *blockQueue) addBlock(a bqAdd) {
  32. q.mut.Lock()
  33. defer q.mut.Unlock()
  34. // If we already have it queued, return
  35. for _, b := range q.queued {
  36. if b.file.Name == a.file.Name {
  37. return
  38. }
  39. }
  40. if len(a.have) > 0 {
  41. // First queue a copy operation
  42. q.queued = append(q.queued, bqBlock{
  43. file: a.file,
  44. copy: a.have,
  45. })
  46. }
  47. // Queue the needed blocks individually
  48. l := len(a.need)
  49. for i, b := range a.need {
  50. q.queued = append(q.queued, bqBlock{
  51. file: a.file,
  52. block: b,
  53. last: i == l-1,
  54. })
  55. }
  56. if l == 0 {
  57. // If we didn't have anything to fetch, queue an empty block with the "last" flag set to close the file.
  58. q.queued = append(q.queued, bqBlock{
  59. file: a.file,
  60. last: true,
  61. })
  62. }
  63. }
  64. func (q *blockQueue) run() {
  65. for {
  66. if len(q.queued) == 0 {
  67. q.addBlock(<-q.inbox)
  68. } else {
  69. q.mut.Lock()
  70. next := q.queued[0]
  71. q.mut.Unlock()
  72. select {
  73. case a := <-q.inbox:
  74. q.addBlock(a)
  75. case q.outbox <- next:
  76. q.mut.Lock()
  77. q.queued = q.queued[1:]
  78. q.mut.Unlock()
  79. }
  80. }
  81. }
  82. }
  83. func (q *blockQueue) put(a bqAdd) {
  84. q.inbox <- a
  85. }
  86. func (q *blockQueue) get() bqBlock {
  87. return <-q.outbox
  88. }
  89. func (q *blockQueue) empty() bool {
  90. q.mut.Lock()
  91. defer q.mut.Unlock()
  92. return len(q.queued) == 0
  93. }