blockqueue.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. // Copyright (C) 2014 Jakob Borg and other contributors. All rights reserved.
  2. // Use of this source code is governed by an MIT-style license that can be
  3. // found in the LICENSE file.
  4. package model
  5. import (
  6. "sync"
  7. "github.com/calmh/syncthing/scanner"
  8. )
  9. type bqAdd struct {
  10. file scanner.File
  11. have []scanner.Block
  12. need []scanner.Block
  13. }
  14. type bqBlock struct {
  15. file scanner.File
  16. block scanner.Block // get this block from the network
  17. copy []scanner.Block // copy these blocks from the old version of the file
  18. first bool
  19. last bool
  20. }
  21. type blockQueue struct {
  22. inbox chan bqAdd
  23. outbox chan bqBlock
  24. queued []bqBlock
  25. mut sync.Mutex
  26. }
  27. func newBlockQueue() *blockQueue {
  28. q := &blockQueue{
  29. inbox: make(chan bqAdd),
  30. outbox: make(chan bqBlock),
  31. }
  32. go q.run()
  33. return q
  34. }
  35. func (q *blockQueue) addBlock(a bqAdd) {
  36. q.mut.Lock()
  37. defer q.mut.Unlock()
  38. // If we already have it queued, return
  39. for _, b := range q.queued {
  40. if b.file.Name == a.file.Name {
  41. return
  42. }
  43. }
  44. l := len(a.need)
  45. if len(a.have) > 0 {
  46. // First queue a copy operation
  47. q.queued = append(q.queued, bqBlock{
  48. file: a.file,
  49. copy: a.have,
  50. first: true,
  51. last: l == 0,
  52. })
  53. }
  54. // Queue the needed blocks individually
  55. for i, b := range a.need {
  56. q.queued = append(q.queued, bqBlock{
  57. file: a.file,
  58. block: b,
  59. first: len(a.have) == 0 && i == 0,
  60. last: i == l-1,
  61. })
  62. }
  63. if len(a.need)+len(a.have) == 0 {
  64. // If we didn't have anything to fetch, queue an empty block with the "last" flag set to close the file.
  65. q.queued = append(q.queued, bqBlock{
  66. file: a.file,
  67. last: true,
  68. })
  69. }
  70. }
  71. func (q *blockQueue) run() {
  72. for {
  73. if len(q.queued) == 0 {
  74. q.addBlock(<-q.inbox)
  75. } else {
  76. q.mut.Lock()
  77. next := q.queued[0]
  78. q.mut.Unlock()
  79. select {
  80. case a := <-q.inbox:
  81. q.addBlock(a)
  82. case q.outbox <- next:
  83. q.mut.Lock()
  84. q.queued = q.queued[1:]
  85. q.mut.Unlock()
  86. }
  87. }
  88. }
  89. }
  90. func (q *blockQueue) put(a bqAdd) {
  91. q.inbox <- a
  92. }
  93. func (q *blockQueue) get() bqBlock {
  94. return <-q.outbox
  95. }
  96. func (q *blockQueue) empty() bool {
  97. q.mut.Lock()
  98. defer q.mut.Unlock()
  99. return len(q.queued) == 0
  100. }