model_puller.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package main
  2. /*
  3. Locking
  4. =======
  5. These methods are never called from the outside so don't follow the locking
  6. policy in model.go. Instead, appropriate locks are acquired when needed and
  7. held for as short a time as possible.
  8. TODO(jb): Refactor this into smaller and cleaner pieces.
  9. TODO(jb): Some kind of coalescing / rate limiting of index sending, so we don't
  10. send hundreds of index updates in a short period if time when deleting files
  11. etc.
  12. */
  13. import (
  14. "bytes"
  15. "fmt"
  16. "io"
  17. "os"
  18. "path"
  19. "sync"
  20. "time"
  21. "github.com/calmh/syncthing/buffers"
  22. )
  23. func (m *Model) pullFile(name string) error {
  24. m.RLock()
  25. var localFile = m.local[name]
  26. var globalFile = m.global[name]
  27. m.RUnlock()
  28. filename := path.Join(m.dir, name)
  29. sdir := path.Dir(filename)
  30. _, err := os.Stat(sdir)
  31. if err != nil && os.IsNotExist(err) {
  32. os.MkdirAll(sdir, 0777)
  33. }
  34. tmpFilename := tempName(filename, globalFile.Modified)
  35. tmpFile, err := os.Create(tmpFilename)
  36. if err != nil {
  37. return err
  38. }
  39. defer tmpFile.Close()
  40. contentChan := make(chan content, 32)
  41. var applyDone sync.WaitGroup
  42. applyDone.Add(1)
  43. go func() {
  44. applyContent(contentChan, tmpFile)
  45. applyDone.Done()
  46. }()
  47. local, remote := localFile.Blocks.To(globalFile.Blocks)
  48. var fetchDone sync.WaitGroup
  49. // One local copy routing
  50. fetchDone.Add(1)
  51. go func() {
  52. for _, block := range local {
  53. data, err := m.Request("<local>", name, block.Offset, block.Length, block.Hash)
  54. if err != nil {
  55. break
  56. }
  57. contentChan <- content{
  58. offset: int64(block.Offset),
  59. data: data,
  60. }
  61. }
  62. fetchDone.Done()
  63. }()
  64. // N remote copy routines
  65. m.RLock()
  66. var nodeIDs = m.whoHas(name)
  67. m.RUnlock()
  68. var remoteBlocksChan = make(chan Block)
  69. go func() {
  70. for _, block := range remote {
  71. remoteBlocksChan <- block
  72. }
  73. close(remoteBlocksChan)
  74. }()
  75. // XXX: This should be rewritten into something nicer that takes differing
  76. // peer performance into account.
  77. for i := 0; i < RemoteFetchers; i++ {
  78. for _, nodeID := range nodeIDs {
  79. fetchDone.Add(1)
  80. go func(nodeID string) {
  81. for block := range remoteBlocksChan {
  82. data, err := m.RequestGlobal(nodeID, name, block.Offset, block.Length, block.Hash)
  83. if err != nil {
  84. break
  85. }
  86. contentChan <- content{
  87. offset: int64(block.Offset),
  88. data: data,
  89. }
  90. }
  91. fetchDone.Done()
  92. }(nodeID)
  93. }
  94. }
  95. fetchDone.Wait()
  96. close(contentChan)
  97. applyDone.Wait()
  98. rf, err := os.Open(tmpFilename)
  99. if err != nil {
  100. return err
  101. }
  102. defer rf.Close()
  103. writtenBlocks, err := Blocks(rf, BlockSize)
  104. if err != nil {
  105. return err
  106. }
  107. if len(writtenBlocks) != len(globalFile.Blocks) {
  108. return fmt.Errorf("%s: incorrect number of blocks after sync", tmpFilename)
  109. }
  110. for i := range writtenBlocks {
  111. if bytes.Compare(writtenBlocks[i].Hash, globalFile.Blocks[i].Hash) != 0 {
  112. return fmt.Errorf("%s: hash mismatch after sync\n %v\n %v", tmpFilename, writtenBlocks[i], globalFile.Blocks[i])
  113. }
  114. }
  115. err = os.Chtimes(tmpFilename, time.Unix(globalFile.Modified, 0), time.Unix(globalFile.Modified, 0))
  116. if err != nil {
  117. return err
  118. }
  119. err = os.Rename(tmpFilename, filename)
  120. if err != nil {
  121. return err
  122. }
  123. return nil
  124. }
  125. func (m *Model) puller() {
  126. for {
  127. for {
  128. var n string
  129. var f File
  130. m.RLock()
  131. for n = range m.need {
  132. break // just pick first name
  133. }
  134. if len(n) != 0 {
  135. f = m.global[n]
  136. }
  137. m.RUnlock()
  138. if len(n) == 0 {
  139. // we got nothing
  140. break
  141. }
  142. var err error
  143. if f.Flags&FlagDeleted == 0 {
  144. if traceFile {
  145. debugf("FILE: Pull %q", n)
  146. }
  147. err = m.pullFile(n)
  148. } else {
  149. if traceFile {
  150. debugf("FILE: Remove %q", n)
  151. }
  152. // Cheerfully ignore errors here
  153. _ = os.Remove(path.Join(m.dir, n))
  154. }
  155. if err == nil {
  156. m.UpdateLocal(f)
  157. } else {
  158. warnln(err)
  159. }
  160. }
  161. time.Sleep(time.Second)
  162. }
  163. }
  164. type content struct {
  165. offset int64
  166. data []byte
  167. }
  168. func applyContent(cc <-chan content, dst io.WriterAt) error {
  169. var err error
  170. for c := range cc {
  171. _, err = dst.WriteAt(c.data, c.offset)
  172. if err != nil {
  173. return err
  174. }
  175. buffers.Put(c.data)
  176. }
  177. return nil
  178. }