sharedpullerstate.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "encoding/binary"
  9. "time"
  10. "github.com/pkg/errors"
  11. "github.com/syncthing/syncthing/lib/fs"
  12. "github.com/syncthing/syncthing/lib/protocol"
  13. "github.com/syncthing/syncthing/lib/sync"
  14. )
  15. // A sharedPullerState is kept for each file that is being synced and is kept
  16. // updated along the way.
  17. type sharedPullerState struct {
  18. // Immutable, does not require locking
  19. file protocol.FileInfo // The new file (desired end state)
  20. fs fs.Filesystem
  21. folder string
  22. tempName string
  23. realName string
  24. reused int // Number of blocks reused from temporary file
  25. ignorePerms bool
  26. hasCurFile bool // Whether curFile is set
  27. curFile protocol.FileInfo // The file as it exists now in our database
  28. sparse bool
  29. created time.Time
  30. fsync bool
  31. // Mutable, must be locked for access
  32. err error // The first error we hit
  33. writer *lockedWriterAt // Wraps fd to prevent fd closing at the same time as writing
  34. copyTotal int // Total number of copy actions for the whole job
  35. pullTotal int // Total number of pull actions for the whole job
  36. copyOrigin int // Number of blocks copied from the original file
  37. copyOriginShifted int // Number of blocks copied from the original file but shifted
  38. copyNeeded int // Number of copy actions still pending
  39. pullNeeded int // Number of block pulls still pending
  40. updated time.Time // Time when any of the counters above were last updated
  41. closed bool // True if the file has been finalClosed.
  42. available []int // Indexes of the blocks that are available in the temporary file
  43. availableUpdated time.Time // Time when list of available blocks was last updated
  44. mut sync.RWMutex // Protects the above
  45. }
  46. func newSharedPullerState(file protocol.FileInfo, fs fs.Filesystem, folderID, tempName string, blocks []protocol.BlockInfo, reused []int, ignorePerms, hasCurFile bool, curFile protocol.FileInfo, sparse bool, fsync bool) *sharedPullerState {
  47. return &sharedPullerState{
  48. file: file,
  49. fs: fs,
  50. folder: folderID,
  51. tempName: tempName,
  52. realName: file.Name,
  53. copyTotal: len(blocks),
  54. copyNeeded: len(blocks),
  55. reused: len(reused),
  56. updated: time.Now(),
  57. available: reused,
  58. availableUpdated: time.Now(),
  59. ignorePerms: ignorePerms,
  60. hasCurFile: hasCurFile,
  61. curFile: curFile,
  62. mut: sync.NewRWMutex(),
  63. sparse: sparse,
  64. fsync: fsync,
  65. created: time.Now(),
  66. }
  67. }
  // pullerProgress is a point-in-time snapshot of how far the puller has
  // come for a single file. The block counters are expressed in number of
  // blocks, the byte counters in bytes.
  type pullerProgress struct {
  	// Total is the total number of blocks in the job.
  	Total int `json:"total"`
  	// Reused is the number of blocks reused from the temporary file.
  	Reused int `json:"reused"`
  	// CopiedFromOrigin is the number of blocks copied from the original file.
  	CopiedFromOrigin int `json:"copiedFromOrigin"`
  	// CopiedFromOriginShifted is the number of blocks copied from the
  	// original file but shifted.
  	CopiedFromOriginShifted int `json:"copiedFromOriginShifted"`
  	// CopiedFromElsewhere is the number of blocks copied from other files.
  	CopiedFromElsewhere int `json:"copiedFromElsewhere"`
  	// Pulled is the number of block pulls that have completed.
  	Pulled int `json:"pulled"`
  	// Pulling is the number of block pulls still pending.
  	Pulling int `json:"pulling"`
  	// BytesDone is the estimated number of bytes handled so far.
  	BytesDone int64 `json:"bytesDone"`
  	// BytesTotal is the estimated total number of bytes in the job.
  	BytesTotal int64 `json:"bytesTotal"`
  }
  80. // lockedWriterAt adds a lock to protect from closing the fd at the same time as writing.
  81. // WriteAt() is goroutine safe by itself, but not against for example Close().
  82. type lockedWriterAt struct {
  83. mut sync.RWMutex
  84. fd fs.File
  85. }
  86. // WriteAt itself is goroutine safe, thus just needs to acquire a read-lock to
  87. // prevent closing concurrently (see SyncClose).
  88. func (w *lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
  89. w.mut.RLock()
  90. defer w.mut.RUnlock()
  91. return w.fd.WriteAt(p, off)
  92. }
  93. // SyncClose ensures that no more writes are happening before going ahead and
  94. // syncing and closing the fd, thus needs to acquire a write-lock.
  95. func (w *lockedWriterAt) SyncClose(fsync bool) error {
  96. w.mut.Lock()
  97. defer w.mut.Unlock()
  98. if fsync {
  99. if err := w.fd.Sync(); err != nil {
  100. // Sync() is nice if it works but not worth failing the
  101. // operation over if it fails.
  102. l.Debugf("fsync failed: %v", err)
  103. }
  104. }
  105. return w.fd.Close()
  106. }
  107. // tempFile returns the fd for the temporary file, reusing an open fd
  108. // or creating the file as necessary.
  109. func (s *sharedPullerState) tempFile() (*lockedWriterAt, error) {
  110. s.mut.Lock()
  111. defer s.mut.Unlock()
  112. // If we've already hit an error, return early
  113. if s.err != nil {
  114. return nil, s.err
  115. }
  116. // If the temp file is already open, return the file descriptor
  117. if s.writer != nil {
  118. return s.writer, nil
  119. }
  120. if err := s.addWriterLocked(); err != nil {
  121. s.failLocked(err)
  122. return nil, err
  123. }
  124. return s.writer, nil
  125. }
  126. func (s *sharedPullerState) addWriterLocked() error {
  127. return inWritableDir(s.tempFileInWritableDir, s.fs, s.tempName, s.ignorePerms)
  128. }
  129. // tempFileInWritableDir should only be called from tempFile.
  130. func (s *sharedPullerState) tempFileInWritableDir(_ string) error {
  131. // The permissions to use for the temporary file should be those of the
  132. // final file, except we need user read & write at minimum. The
  133. // permissions will be set to the final value later, but in the meantime
  134. // we don't want to have a temporary file with looser permissions than
  135. // the final outcome.
  136. mode := fs.FileMode(s.file.Permissions) | 0600
  137. if s.ignorePerms {
  138. // When ignorePerms is set we use a very permissive mode and let the
  139. // system umask filter it.
  140. mode = 0666
  141. }
  142. // Attempt to create the temp file
  143. // RDWR because of issue #2994.
  144. flags := fs.OptReadWrite
  145. if s.reused == 0 {
  146. flags |= fs.OptCreate | fs.OptExclusive
  147. } else if !s.ignorePerms {
  148. // With sufficiently bad luck when exiting or crashing, we may have
  149. // had time to chmod the temp file to read only state but not yet
  150. // moved it to its final name. This leaves us with a read only temp
  151. // file that we're going to try to reuse. To handle that, we need to
  152. // make sure we have write permissions on the file before opening it.
  153. //
  154. // When ignorePerms is set we trust that the permissions are fine
  155. // already and make no modification, as we would otherwise override
  156. // what the umask dictates.
  157. if err := s.fs.Chmod(s.tempName, mode); err != nil {
  158. return errors.Wrap(err, "setting perms on temp file")
  159. }
  160. }
  161. fd, err := s.fs.OpenFile(s.tempName, flags, mode)
  162. if err != nil {
  163. return errors.Wrap(err, "opening temp file")
  164. }
  165. // Hide the temporary file
  166. s.fs.Hide(s.tempName)
  167. // Don't truncate symlink files, as that will mean that the path will
  168. // contain a bunch of nulls.
  169. if s.sparse && !s.file.IsSymlink() {
  170. size := s.file.Size
  171. // Trailer added to encrypted files
  172. if len(s.file.Encrypted) > 0 {
  173. size += encryptionTrailerSize(s.file)
  174. }
  175. // Truncate sets the size of the file. This creates a sparse file or a
  176. // space reservation, depending on the underlying filesystem.
  177. if err := fd.Truncate(size); err != nil {
  178. // The truncate call failed. That can happen in some cases when
  179. // space reservation isn't possible or over some network
  180. // filesystems... This generally doesn't matter.
  181. if s.reused > 0 {
  182. // ... but if we are attempting to reuse a file we have a
  183. // corner case when the old file is larger than the new one
  184. // and we can't just overwrite blocks and let the old data
  185. // linger at the end. In this case we attempt a delete of
  186. // the file and hope for better luck next time, when we
  187. // should come around with s.reused == 0.
  188. fd.Close()
  189. if remErr := s.fs.Remove(s.tempName); remErr != nil {
  190. l.Debugln("failed to remove temporary file:", remErr)
  191. }
  192. return err
  193. }
  194. }
  195. }
  196. // Same fd will be used by all writers
  197. s.writer = &lockedWriterAt{sync.NewRWMutex(), fd}
  198. return nil
  199. }
  200. // fail sets the error on the puller state compose of error, and marks the
  201. // sharedPullerState as failed. Is a no-op when called on an already failed state.
  202. func (s *sharedPullerState) fail(err error) {
  203. s.mut.Lock()
  204. defer s.mut.Unlock()
  205. s.failLocked(err)
  206. }
  207. func (s *sharedPullerState) failLocked(err error) {
  208. if s.err != nil || err == nil {
  209. return
  210. }
  211. s.err = err
  212. }
  213. func (s *sharedPullerState) failed() error {
  214. s.mut.RLock()
  215. err := s.err
  216. s.mut.RUnlock()
  217. return err
  218. }
  219. func (s *sharedPullerState) copyDone(block protocol.BlockInfo) {
  220. s.mut.Lock()
  221. s.copyNeeded--
  222. s.updated = time.Now()
  223. s.available = append(s.available, int(block.Offset/int64(s.file.BlockSize())))
  224. s.availableUpdated = time.Now()
  225. l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
  226. s.mut.Unlock()
  227. }
  228. func (s *sharedPullerState) copiedFromOrigin() {
  229. s.mut.Lock()
  230. s.copyOrigin++
  231. s.updated = time.Now()
  232. s.mut.Unlock()
  233. }
  234. func (s *sharedPullerState) copiedFromOriginShifted() {
  235. s.mut.Lock()
  236. s.copyOrigin++
  237. s.copyOriginShifted++
  238. s.updated = time.Now()
  239. s.mut.Unlock()
  240. }
  241. func (s *sharedPullerState) pullStarted() {
  242. s.mut.Lock()
  243. s.copyTotal--
  244. s.copyNeeded--
  245. s.pullTotal++
  246. s.pullNeeded++
  247. s.updated = time.Now()
  248. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
  249. s.mut.Unlock()
  250. }
  251. func (s *sharedPullerState) pullDone(block protocol.BlockInfo) {
  252. s.mut.Lock()
  253. s.pullNeeded--
  254. s.updated = time.Now()
  255. s.available = append(s.available, int(block.Offset/int64(s.file.BlockSize())))
  256. s.availableUpdated = time.Now()
  257. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
  258. s.mut.Unlock()
  259. }
  260. // finalClose atomically closes and returns closed status of a file. A true
  261. // first return value means the file was closed and should be finished, with
  262. // the error indicating the success or failure of the close. A false first
  263. // return value indicates the file is not ready to be closed, or is already
  264. // closed and should in either case not be finished off now.
  265. func (s *sharedPullerState) finalClose() (bool, error) {
  266. s.mut.Lock()
  267. defer s.mut.Unlock()
  268. if s.closed {
  269. // Already closed
  270. return false, nil
  271. }
  272. if s.pullNeeded+s.copyNeeded != 0 && s.err == nil {
  273. // Not done yet, and not errored
  274. return false, nil
  275. }
  276. if len(s.file.Encrypted) > 0 {
  277. if err := s.finalizeEncrypted(); err != nil && s.err == nil {
  278. // This is our error as we weren't errored before.
  279. s.err = err
  280. }
  281. }
  282. if s.writer != nil {
  283. if err := s.writer.SyncClose(s.fsync); err != nil && s.err == nil {
  284. // This is our error as we weren't errored before.
  285. s.err = err
  286. }
  287. s.writer = nil
  288. }
  289. s.closed = true
  290. // Unhide the temporary file when we close it, as it's likely to
  291. // immediately be renamed to the final name. If this is a failed temp
  292. // file we will also unhide it, but I'm fine with that as we're now
  293. // leaving it around for potentially quite a while.
  294. s.fs.Unhide(s.tempName)
  295. return true, s.err
  296. }
  297. // finalizeEncrypted adds a trailer to the encrypted file containing the
  298. // serialized FileInfo and the length of that FileInfo. When initializing a
  299. // folder from encrypted data we can extract this FileInfo from the end of
  300. // the file and regain the original metadata.
  301. func (s *sharedPullerState) finalizeEncrypted() error {
  302. bs := make([]byte, encryptionTrailerSize(s.file))
  303. n, err := s.file.MarshalTo(bs)
  304. if err != nil {
  305. return err
  306. }
  307. binary.BigEndian.PutUint32(bs[n:], uint32(n))
  308. bs = bs[:n+4]
  309. if s.writer == nil {
  310. if err := s.addWriterLocked(); err != nil {
  311. return err
  312. }
  313. }
  314. if _, err := s.writer.WriteAt(bs, s.file.Size); err != nil {
  315. return err
  316. }
  317. return nil
  318. }
  319. func encryptionTrailerSize(file protocol.FileInfo) int64 {
  320. return int64(file.ProtoSize()) + 4
  321. }
  322. // Progress returns the momentarily progress for the puller
  323. func (s *sharedPullerState) Progress() *pullerProgress {
  324. s.mut.RLock()
  325. defer s.mut.RUnlock()
  326. total := s.reused + s.copyTotal + s.pullTotal
  327. done := total - s.copyNeeded - s.pullNeeded
  328. return &pullerProgress{
  329. Total: total,
  330. Reused: s.reused,
  331. CopiedFromOrigin: s.copyOrigin,
  332. CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
  333. Pulled: s.pullTotal - s.pullNeeded,
  334. Pulling: s.pullNeeded,
  335. BytesTotal: blocksToSize(s.file.BlockSize(), total),
  336. BytesDone: blocksToSize(s.file.BlockSize(), done),
  337. }
  338. }
  339. // Updated returns the time when any of the progress related counters was last updated.
  340. func (s *sharedPullerState) Updated() time.Time {
  341. s.mut.RLock()
  342. t := s.updated
  343. s.mut.RUnlock()
  344. return t
  345. }
  346. // AvailableUpdated returns the time last time list of available blocks was updated
  347. func (s *sharedPullerState) AvailableUpdated() time.Time {
  348. s.mut.RLock()
  349. t := s.availableUpdated
  350. s.mut.RUnlock()
  351. return t
  352. }
  353. // Available returns blocks available in the current temporary file
  354. func (s *sharedPullerState) Available() []int {
  355. s.mut.RLock()
  356. blocks := s.available
  357. s.mut.RUnlock()
  358. return blocks
  359. }
  // blocksToSize estimates the number of bytes covered by num blocks of the
  // given size. All blocks but the last count as full blocks and the last
  // one as half a block. For fewer than two blocks the result is always half
  // a block.
  func blocksToSize(size int, num int) int64 {
  	halfBlock := int64(size / 2)
  	if num >= 2 {
  		fullBlocks := int64(num - 1)
  		return fullBlocks*int64(size) + halfBlock
  	}
  	return halfBlock
  }