sharedpullerstate.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "io"
  9. "os"
  10. "path/filepath"
  11. "github.com/syncthing/syncthing/lib/db"
  12. "github.com/syncthing/syncthing/lib/protocol"
  13. "github.com/syncthing/syncthing/lib/sync"
  14. )
  15. // A sharedPullerState is kept for each file that is being synced and is kept
  16. // updated along the way.
  17. type sharedPullerState struct {
  18. // Immutable, does not require locking
  19. file protocol.FileInfo // The new file (desired end state)
  20. folder string
  21. tempName string
  22. realName string
  23. reused int // Number of blocks reused from temporary file
  24. ignorePerms bool
  25. version protocol.Vector // The current (old) version
  26. sparse bool
  27. // Mutable, must be locked for access
  28. err error // The first error we hit
  29. fd *os.File // The fd of the temp file
  30. copyTotal int // Total number of copy actions for the whole job
  31. pullTotal int // Total number of pull actions for the whole job
  32. copyOrigin int // Number of blocks copied from the original file
  33. copyNeeded int // Number of copy actions still pending
  34. pullNeeded int // Number of block pulls still pending
  35. closed bool // True if the file has been finalClosed.
  36. mut sync.Mutex // Protects the above
  37. }
  38. // A momentary state representing the progress of the puller
  39. type pullerProgress struct {
  40. Total int `json:"total"`
  41. Reused int `json:"reused"`
  42. CopiedFromOrigin int `json:"copiedFromOrigin"`
  43. CopiedFromElsewhere int `json:"copiedFromElsewhere"`
  44. Pulled int `json:"pulled"`
  45. Pulling int `json:"pulling"`
  46. BytesDone int64 `json:"bytesDone"`
  47. BytesTotal int64 `json:"bytesTotal"`
  48. }
  49. // A lockedWriterAt synchronizes WriteAt calls with an external mutex.
  50. // WriteAt() is goroutine safe by itself, but not against for example Close().
  51. type lockedWriterAt struct {
  52. mut *sync.Mutex
  53. wr io.WriterAt
  54. }
  55. func (w lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
  56. (*w.mut).Lock()
  57. defer (*w.mut).Unlock()
  58. return w.wr.WriteAt(p, off)
  59. }
  60. // tempFile returns the fd for the temporary file, reusing an open fd
  61. // or creating the file as necessary.
  62. func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
  63. s.mut.Lock()
  64. defer s.mut.Unlock()
  65. // If we've already hit an error, return early
  66. if s.err != nil {
  67. return nil, s.err
  68. }
  69. // If the temp file is already open, return the file descriptor
  70. if s.fd != nil {
  71. return lockedWriterAt{&s.mut, s.fd}, nil
  72. }
  73. // Ensure that the parent directory is writable. This is
  74. // osutil.InWritableDir except we need to do more stuff so we duplicate it
  75. // here.
  76. dir := filepath.Dir(s.tempName)
  77. if info, err := os.Stat(dir); err != nil {
  78. if os.IsNotExist(err) {
  79. // XXX: This works around a bug elsewhere, a race condition when
  80. // things are deleted while being synced. However that happens, we
  81. // end up with a directory for "foo" with the delete bit, but a
  82. // file "foo/bar" that we want to sync. We never create the
  83. // directory, and hence fail to create the file and end up looping
  84. // forever on it. This breaks that by creating the directory; on
  85. // next scan it'll be found and the delete bit on it is removed.
  86. // The user can then clean up as they like...
  87. l.Infoln("Resurrecting directory", dir)
  88. if err := os.MkdirAll(dir, 0755); err != nil {
  89. s.failLocked("resurrect dir", err)
  90. return nil, err
  91. }
  92. } else {
  93. s.failLocked("dst stat dir", err)
  94. return nil, err
  95. }
  96. } else if info.Mode()&0200 == 0 {
  97. err := os.Chmod(dir, 0755)
  98. if !s.ignorePerms && err == nil {
  99. defer func() {
  100. err := os.Chmod(dir, info.Mode().Perm())
  101. if err != nil {
  102. panic(err)
  103. }
  104. }()
  105. }
  106. }
  107. // Attempt to create the temp file
  108. flags := os.O_WRONLY
  109. if s.reused == 0 {
  110. flags |= os.O_CREATE | os.O_EXCL
  111. } else {
  112. // With sufficiently bad luck when exiting or crashing, we may have
  113. // had time to chmod the temp file to read only state but not yet
  114. // moved it to it's final name. This leaves us with a read only temp
  115. // file that we're going to try to reuse. To handle that, we need to
  116. // make sure we have write permissions on the file before opening it.
  117. err := os.Chmod(s.tempName, 0644)
  118. if !s.ignorePerms && err != nil {
  119. s.failLocked("dst create chmod", err)
  120. return nil, err
  121. }
  122. }
  123. fd, err := os.OpenFile(s.tempName, flags, 0666)
  124. if err != nil {
  125. s.failLocked("dst create", err)
  126. return nil, err
  127. }
  128. if s.sparse {
  129. // Truncate sets the size of the file. This creates a sparse file or a
  130. // space reservation, depending on the underlying filesystem.
  131. if err := fd.Truncate(s.file.Size()); err != nil {
  132. s.failLocked("dst truncate", err)
  133. return nil, err
  134. }
  135. }
  136. // Same fd will be used by all writers
  137. s.fd = fd
  138. return lockedWriterAt{&s.mut, s.fd}, nil
  139. }
  140. // sourceFile opens the existing source file for reading
  141. func (s *sharedPullerState) sourceFile() (*os.File, error) {
  142. s.mut.Lock()
  143. defer s.mut.Unlock()
  144. // If we've already hit an error, return early
  145. if s.err != nil {
  146. return nil, s.err
  147. }
  148. // Attempt to open the existing file
  149. fd, err := os.Open(s.realName)
  150. if err != nil {
  151. s.failLocked("src open", err)
  152. return nil, err
  153. }
  154. return fd, nil
  155. }
  156. // earlyClose prints a warning message composed of the context and
  157. // error, and marks the sharedPullerState as failed. Is a no-op when called on
  158. // an already failed state.
  159. func (s *sharedPullerState) fail(context string, err error) {
  160. s.mut.Lock()
  161. defer s.mut.Unlock()
  162. s.failLocked(context, err)
  163. }
  164. func (s *sharedPullerState) failLocked(context string, err error) {
  165. if s.err != nil {
  166. return
  167. }
  168. l.Infof("Puller (folder %q, file %q): %s: %v", s.folder, s.file.Name, context, err)
  169. s.err = err
  170. }
  171. func (s *sharedPullerState) failed() error {
  172. s.mut.Lock()
  173. defer s.mut.Unlock()
  174. return s.err
  175. }
  176. func (s *sharedPullerState) copyDone() {
  177. s.mut.Lock()
  178. s.copyNeeded--
  179. l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
  180. s.mut.Unlock()
  181. }
  182. func (s *sharedPullerState) copiedFromOrigin() {
  183. s.mut.Lock()
  184. s.copyOrigin++
  185. s.mut.Unlock()
  186. }
  187. func (s *sharedPullerState) pullStarted() {
  188. s.mut.Lock()
  189. s.copyTotal--
  190. s.copyNeeded--
  191. s.pullTotal++
  192. s.pullNeeded++
  193. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
  194. s.mut.Unlock()
  195. }
  196. func (s *sharedPullerState) pullDone() {
  197. s.mut.Lock()
  198. s.pullNeeded--
  199. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
  200. s.mut.Unlock()
  201. }
  202. // finalClose atomically closes and returns closed status of a file. A true
  203. // first return value means the file was closed and should be finished, with
  204. // the error indicating the success or failure of the close. A false first
  205. // return value indicates the file is not ready to be closed, or is already
  206. // closed and should in either case not be finished off now.
  207. func (s *sharedPullerState) finalClose() (bool, error) {
  208. s.mut.Lock()
  209. defer s.mut.Unlock()
  210. if s.closed {
  211. // Already closed
  212. return false, nil
  213. }
  214. if s.pullNeeded+s.copyNeeded != 0 && s.err == nil {
  215. // Not done yet, and not errored
  216. return false, nil
  217. }
  218. if s.fd != nil {
  219. if closeErr := s.fd.Close(); closeErr != nil && s.err == nil {
  220. // This is our error if we weren't errored before. Otherwise we
  221. // keep the earlier error.
  222. s.err = closeErr
  223. }
  224. s.fd = nil
  225. }
  226. s.closed = true
  227. return true, s.err
  228. }
  229. // Returns the momentarily progress for the puller
  230. func (s *sharedPullerState) Progress() *pullerProgress {
  231. s.mut.Lock()
  232. defer s.mut.Unlock()
  233. total := s.reused + s.copyTotal + s.pullTotal
  234. done := total - s.copyNeeded - s.pullNeeded
  235. return &pullerProgress{
  236. Total: total,
  237. Reused: s.reused,
  238. CopiedFromOrigin: s.copyOrigin,
  239. CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
  240. Pulled: s.pullTotal - s.pullNeeded,
  241. Pulling: s.pullNeeded,
  242. BytesTotal: db.BlocksToSize(total),
  243. BytesDone: db.BlocksToSize(done),
  244. }
  245. }