model_puller.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. package model
  2. /*
  3. Locking
  4. =======
  5. These methods are never called from the outside, so they don't follow the
  6. locking policy in model.go.
  7. TODO(jb): Refactor this into smaller and cleaner pieces.
  8. TODO(jb): Increase performance by taking apparent peer bandwidth into account.
  9. */
  10. import (
  11. "bytes"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "log"
  16. "os"
  17. "path"
  18. "sync"
  19. "time"
  20. "github.com/calmh/syncthing/buffers"
  21. )
  22. func (m *Model) pullFile(name string) error {
  23. m.RLock()
  24. var localFile = m.local[name]
  25. var globalFile = m.global[name]
  26. var nodeIDs = m.whoHas(name)
  27. m.RUnlock()
  28. if len(nodeIDs) == 0 {
  29. return fmt.Errorf("%s: no connected nodes with file available", name)
  30. }
  31. filename := path.Join(m.dir, name)
  32. sdir := path.Dir(filename)
  33. _, err := os.Stat(sdir)
  34. if err != nil && os.IsNotExist(err) {
  35. os.MkdirAll(sdir, 0777)
  36. }
  37. tmpFilename := tempName(filename, globalFile.Modified)
  38. tmpFile, err := os.Create(tmpFilename)
  39. if err != nil {
  40. return err
  41. }
  42. contentChan := make(chan content, 32)
  43. var applyDone sync.WaitGroup
  44. applyDone.Add(1)
  45. go func() {
  46. applyContent(contentChan, tmpFile)
  47. tmpFile.Close()
  48. applyDone.Done()
  49. }()
  50. local, remote := BlockDiff(localFile.Blocks, globalFile.Blocks)
  51. var fetchDone sync.WaitGroup
  52. // One local copy routine
  53. fetchDone.Add(1)
  54. go func() {
  55. for _, block := range local {
  56. data, err := m.Request("<local>", name, block.Offset, block.Length, block.Hash)
  57. if err != nil {
  58. break
  59. }
  60. contentChan <- content{
  61. offset: int64(block.Offset),
  62. data: data,
  63. }
  64. }
  65. fetchDone.Done()
  66. }()
  67. // N remote copy routines
  68. var remoteBlocks = blockIterator{blocks: remote}
  69. for i := 0; i < m.paralllelReqs; i++ {
  70. curNode := nodeIDs[i%len(nodeIDs)]
  71. fetchDone.Add(1)
  72. go func(nodeID string) {
  73. for {
  74. block, ok := remoteBlocks.Next()
  75. if !ok {
  76. break
  77. }
  78. data, err := m.requestGlobal(nodeID, name, block.Offset, block.Length, block.Hash)
  79. if err != nil {
  80. break
  81. }
  82. contentChan <- content{
  83. offset: int64(block.Offset),
  84. data: data,
  85. }
  86. }
  87. fetchDone.Done()
  88. }(curNode)
  89. }
  90. fetchDone.Wait()
  91. close(contentChan)
  92. applyDone.Wait()
  93. err = hashCheck(tmpFilename, globalFile.Blocks)
  94. if err != nil {
  95. return fmt.Errorf("%s: %s (deleting)", path.Base(name), err.Error())
  96. }
  97. err = os.Chtimes(tmpFilename, time.Unix(globalFile.Modified, 0), time.Unix(globalFile.Modified, 0))
  98. if err != nil {
  99. return err
  100. }
  101. err = os.Rename(tmpFilename, filename)
  102. if err != nil {
  103. return err
  104. }
  105. return nil
  106. }
  107. func (m *Model) puller() {
  108. for {
  109. time.Sleep(time.Second)
  110. var ns []string
  111. m.RLock()
  112. for n := range m.need {
  113. ns = append(ns, n)
  114. }
  115. m.RUnlock()
  116. if len(ns) == 0 {
  117. continue
  118. }
  119. var limiter = make(chan bool, m.parallellFiles)
  120. var allDone sync.WaitGroup
  121. for _, n := range ns {
  122. limiter <- true
  123. allDone.Add(1)
  124. go func(n string) {
  125. defer func() {
  126. allDone.Done()
  127. <-limiter
  128. }()
  129. m.RLock()
  130. f, ok := m.global[n]
  131. m.RUnlock()
  132. if !ok {
  133. return
  134. }
  135. var err error
  136. if f.Flags&FlagDeleted == 0 {
  137. if m.trace["file"] {
  138. log.Printf("FILE: Pull %q", n)
  139. }
  140. err = m.pullFile(n)
  141. } else {
  142. if m.trace["file"] {
  143. log.Printf("FILE: Remove %q", n)
  144. }
  145. // Cheerfully ignore errors here
  146. _ = os.Remove(path.Join(m.dir, n))
  147. }
  148. if err == nil {
  149. m.Lock()
  150. m.updateLocal(f)
  151. m.Unlock()
  152. }
  153. }(n)
  154. }
  155. allDone.Wait()
  156. }
  157. }
  158. type content struct {
  159. offset int64
  160. data []byte
  161. }
  162. func applyContent(cc <-chan content, dst io.WriterAt) error {
  163. var err error
  164. for c := range cc {
  165. _, err = dst.WriteAt(c.data, c.offset)
  166. buffers.Put(c.data)
  167. if err != nil {
  168. return err
  169. }
  170. }
  171. return nil
  172. }
  173. func hashCheck(name string, correct []Block) error {
  174. rf, err := os.Open(name)
  175. if err != nil {
  176. return err
  177. }
  178. defer rf.Close()
  179. current, err := Blocks(rf, BlockSize)
  180. if err != nil {
  181. return err
  182. }
  183. if len(current) != len(correct) {
  184. return errors.New("incorrect number of blocks")
  185. }
  186. for i := range current {
  187. if bytes.Compare(current[i].Hash, correct[i].Hash) != 0 {
  188. return fmt.Errorf("hash mismatch: %x != %x", current[i], correct[i])
  189. }
  190. }
  191. return nil
  192. }
  193. type blockIterator struct {
  194. sync.Mutex
  195. blocks []Block
  196. }
  197. func (i *blockIterator) Next() (b Block, ok bool) {
  198. i.Lock()
  199. defer i.Unlock()
  200. if len(i.blocks) == 0 {
  201. return
  202. }
  203. b, i.blocks = i.blocks[0], i.blocks[1:]
  204. ok = true
  205. return
  206. }