sharedpullerstate.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "io"
  9. "time"
  10. "github.com/pkg/errors"
  11. "github.com/syncthing/syncthing/lib/fs"
  12. "github.com/syncthing/syncthing/lib/protocol"
  13. "github.com/syncthing/syncthing/lib/sync"
  14. )
  15. // A sharedPullerState is kept for each file that is being synced and is kept
  16. // updated along the way.
  17. type sharedPullerState struct {
  18. // Immutable, does not require locking
  19. file protocol.FileInfo // The new file (desired end state)
  20. fs fs.Filesystem
  21. folder string
  22. tempName string
  23. realName string
  24. reused int // Number of blocks reused from temporary file
  25. ignorePerms bool
  26. hasCurFile bool // Whether curFile is set
  27. curFile protocol.FileInfo // The file as it exists now in our database
  28. sparse bool
  29. created time.Time
  30. // Mutable, must be locked for access
  31. err error // The first error we hit
  32. fd fs.File // The fd of the temp file
  33. copyTotal int // Total number of copy actions for the whole job
  34. pullTotal int // Total number of pull actions for the whole job
  35. copyOrigin int // Number of blocks copied from the original file
  36. copyOriginShifted int // Number of blocks copied from the original file but shifted
  37. copyNeeded int // Number of copy actions still pending
  38. pullNeeded int // Number of block pulls still pending
  39. updated time.Time // Time when any of the counters above were last updated
  40. closed bool // True if the file has been finalClosed.
  41. available []int32 // Indexes of the blocks that are available in the temporary file
  42. availableUpdated time.Time // Time when list of available blocks was last updated
  43. mut sync.RWMutex // Protects the above
  44. }
  45. // A momentary state representing the progress of the puller
  46. type pullerProgress struct {
  47. Total int `json:"total"`
  48. Reused int `json:"reused"`
  49. CopiedFromOrigin int `json:"copiedFromOrigin"`
  50. CopiedFromOriginShifted int `json:"copiedFromOriginShifted"`
  51. CopiedFromElsewhere int `json:"copiedFromElsewhere"`
  52. Pulled int `json:"pulled"`
  53. Pulling int `json:"pulling"`
  54. BytesDone int64 `json:"bytesDone"`
  55. BytesTotal int64 `json:"bytesTotal"`
  56. }
  57. // A lockedWriterAt synchronizes WriteAt calls with an external mutex.
  58. // WriteAt() is goroutine safe by itself, but not against for example Close().
  59. type lockedWriterAt struct {
  60. mut *sync.RWMutex
  61. wr io.WriterAt
  62. }
  63. func (w lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
  64. (*w.mut).Lock()
  65. defer (*w.mut).Unlock()
  66. return w.wr.WriteAt(p, off)
  67. }
  68. // tempFile returns the fd for the temporary file, reusing an open fd
  69. // or creating the file as necessary.
  70. func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
  71. s.mut.Lock()
  72. defer s.mut.Unlock()
  73. // If we've already hit an error, return early
  74. if s.err != nil {
  75. return nil, s.err
  76. }
  77. // If the temp file is already open, return the file descriptor
  78. if s.fd != nil {
  79. return lockedWriterAt{&s.mut, s.fd}, nil
  80. }
  81. if err := inWritableDir(s.tempFileInWritableDir, s.fs, s.tempName, s.ignorePerms); err != nil {
  82. s.failLocked(err)
  83. return nil, err
  84. }
  85. return lockedWriterAt{&s.mut, s.fd}, nil
  86. }
  87. // tempFileInWritableDir should only be called from tempFile.
  88. func (s *sharedPullerState) tempFileInWritableDir(_ string) error {
  89. // The permissions to use for the temporary file should be those of the
  90. // final file, except we need user read & write at minimum. The
  91. // permissions will be set to the final value later, but in the meantime
  92. // we don't want to have a temporary file with looser permissions than
  93. // the final outcome.
  94. mode := fs.FileMode(s.file.Permissions) | 0600
  95. if s.ignorePerms {
  96. // When ignorePerms is set we use a very permissive mode and let the
  97. // system umask filter it.
  98. mode = 0666
  99. }
  100. // Attempt to create the temp file
  101. // RDWR because of issue #2994.
  102. flags := fs.OptReadWrite
  103. if s.reused == 0 {
  104. flags |= fs.OptCreate | fs.OptExclusive
  105. } else if !s.ignorePerms {
  106. // With sufficiently bad luck when exiting or crashing, we may have
  107. // had time to chmod the temp file to read only state but not yet
  108. // moved it to its final name. This leaves us with a read only temp
  109. // file that we're going to try to reuse. To handle that, we need to
  110. // make sure we have write permissions on the file before opening it.
  111. //
  112. // When ignorePerms is set we trust that the permissions are fine
  113. // already and make no modification, as we would otherwise override
  114. // what the umask dictates.
  115. if err := s.fs.Chmod(s.tempName, mode); err != nil {
  116. return errors.Wrap(err, "setting perms on temp file")
  117. }
  118. }
  119. fd, err := s.fs.OpenFile(s.tempName, flags, mode)
  120. if err != nil {
  121. return errors.Wrap(err, "opening temp file")
  122. }
  123. // Hide the temporary file
  124. s.fs.Hide(s.tempName)
  125. // Don't truncate symlink files, as that will mean that the path will
  126. // contain a bunch of nulls.
  127. if s.sparse && !s.file.IsSymlink() {
  128. // Truncate sets the size of the file. This creates a sparse file or a
  129. // space reservation, depending on the underlying filesystem.
  130. if err := fd.Truncate(s.file.Size); err != nil {
  131. // The truncate call failed. That can happen in some cases when
  132. // space reservation isn't possible or over some network
  133. // filesystems... This generally doesn't matter.
  134. if s.reused > 0 {
  135. // ... but if we are attempting to reuse a file we have a
  136. // corner case when the old file is larger than the new one
  137. // and we can't just overwrite blocks and let the old data
  138. // linger at the end. In this case we attempt a delete of
  139. // the file and hope for better luck next time, when we
  140. // should come around with s.reused == 0.
  141. fd.Close()
  142. if remErr := s.fs.Remove(s.tempName); remErr != nil {
  143. l.Debugln("failed to remove temporary file:", remErr)
  144. }
  145. return err
  146. }
  147. }
  148. }
  149. // Same fd will be used by all writers
  150. s.fd = fd
  151. return nil
  152. }
  153. // fail sets the error on the puller state compose of error, and marks the
  154. // sharedPullerState as failed. Is a no-op when called on an already failed state.
  155. func (s *sharedPullerState) fail(err error) {
  156. s.mut.Lock()
  157. defer s.mut.Unlock()
  158. s.failLocked(err)
  159. }
  160. func (s *sharedPullerState) failLocked(err error) {
  161. if s.err != nil || err == nil {
  162. return
  163. }
  164. s.err = err
  165. }
  166. func (s *sharedPullerState) failed() error {
  167. s.mut.RLock()
  168. err := s.err
  169. s.mut.RUnlock()
  170. return err
  171. }
  172. func (s *sharedPullerState) copyDone(block protocol.BlockInfo) {
  173. s.mut.Lock()
  174. s.copyNeeded--
  175. s.updated = time.Now()
  176. s.available = append(s.available, int32(block.Offset/int64(s.file.BlockSize())))
  177. s.availableUpdated = time.Now()
  178. l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
  179. s.mut.Unlock()
  180. }
  181. func (s *sharedPullerState) copiedFromOrigin() {
  182. s.mut.Lock()
  183. s.copyOrigin++
  184. s.updated = time.Now()
  185. s.mut.Unlock()
  186. }
  187. func (s *sharedPullerState) copiedFromOriginShifted() {
  188. s.mut.Lock()
  189. s.copyOrigin++
  190. s.copyOriginShifted++
  191. s.updated = time.Now()
  192. s.mut.Unlock()
  193. }
  194. func (s *sharedPullerState) pullStarted() {
  195. s.mut.Lock()
  196. s.copyTotal--
  197. s.copyNeeded--
  198. s.pullTotal++
  199. s.pullNeeded++
  200. s.updated = time.Now()
  201. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
  202. s.mut.Unlock()
  203. }
  204. func (s *sharedPullerState) pullDone(block protocol.BlockInfo) {
  205. s.mut.Lock()
  206. s.pullNeeded--
  207. s.updated = time.Now()
  208. s.available = append(s.available, int32(block.Offset/int64(s.file.BlockSize())))
  209. s.availableUpdated = time.Now()
  210. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
  211. s.mut.Unlock()
  212. }
  213. // finalClose atomically closes and returns closed status of a file. A true
  214. // first return value means the file was closed and should be finished, with
  215. // the error indicating the success or failure of the close. A false first
  216. // return value indicates the file is not ready to be closed, or is already
  217. // closed and should in either case not be finished off now.
  218. func (s *sharedPullerState) finalClose() (bool, error) {
  219. s.mut.Lock()
  220. defer s.mut.Unlock()
  221. if s.closed {
  222. // Already closed
  223. return false, nil
  224. }
  225. if s.pullNeeded+s.copyNeeded != 0 && s.err == nil {
  226. // Not done yet, and not errored
  227. return false, nil
  228. }
  229. if s.fd != nil {
  230. if err := s.fd.Sync(); err != nil {
  231. // Sync() is nice if it works but not worth failing the
  232. // operation over if it fails.
  233. l.Debugf("fsync %q failed: %v", s.tempName, err)
  234. }
  235. if err := s.fd.Close(); err != nil && s.err == nil {
  236. // This is our error as we weren't errored before.
  237. s.err = err
  238. }
  239. s.fd = nil
  240. }
  241. s.closed = true
  242. // Unhide the temporary file when we close it, as it's likely to
  243. // immediately be renamed to the final name. If this is a failed temp
  244. // file we will also unhide it, but I'm fine with that as we're now
  245. // leaving it around for potentially quite a while.
  246. s.fs.Unhide(s.tempName)
  247. return true, s.err
  248. }
  249. // Progress returns the momentarily progress for the puller
  250. func (s *sharedPullerState) Progress() *pullerProgress {
  251. s.mut.RLock()
  252. defer s.mut.RUnlock()
  253. total := s.reused + s.copyTotal + s.pullTotal
  254. done := total - s.copyNeeded - s.pullNeeded
  255. return &pullerProgress{
  256. Total: total,
  257. Reused: s.reused,
  258. CopiedFromOrigin: s.copyOrigin,
  259. CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
  260. Pulled: s.pullTotal - s.pullNeeded,
  261. Pulling: s.pullNeeded,
  262. BytesTotal: blocksToSize(s.file.BlockSize(), total),
  263. BytesDone: blocksToSize(s.file.BlockSize(), done),
  264. }
  265. }
  266. // Updated returns the time when any of the progress related counters was last updated.
  267. func (s *sharedPullerState) Updated() time.Time {
  268. s.mut.RLock()
  269. t := s.updated
  270. s.mut.RUnlock()
  271. return t
  272. }
  273. // AvailableUpdated returns the time last time list of available blocks was updated
  274. func (s *sharedPullerState) AvailableUpdated() time.Time {
  275. s.mut.RLock()
  276. t := s.availableUpdated
  277. s.mut.RUnlock()
  278. return t
  279. }
  280. // Available returns blocks available in the current temporary file
  281. func (s *sharedPullerState) Available() []int32 {
  282. s.mut.RLock()
  283. blocks := s.available
  284. s.mut.RUnlock()
  285. return blocks
  286. }
  287. func blocksToSize(size int, num int) int64 {
  288. if num < 2 {
  289. return int64(size / 2)
  290. }
  291. return int64(num-1)*int64(size) + int64(size/2)
  292. }