sharedpullerstate.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "encoding/binary"
  9. "fmt"
  10. "io"
  11. "time"
  12. "google.golang.org/protobuf/proto"
  13. "github.com/syncthing/syncthing/internal/protoutil"
  14. "github.com/syncthing/syncthing/lib/fs"
  15. "github.com/syncthing/syncthing/lib/osutil"
  16. "github.com/syncthing/syncthing/lib/protocol"
  17. "github.com/syncthing/syncthing/lib/sync"
  18. )
  19. // A sharedPullerState is kept for each file that is being synced and is kept
  20. // updated along the way.
  21. type sharedPullerState struct {
  22. // Immutable, does not require locking
  23. file protocol.FileInfo // The new file (desired end state)
  24. fs fs.Filesystem
  25. folder string
  26. tempName string
  27. realName string
  28. reused int // Number of blocks reused from temporary file
  29. ignorePerms bool
  30. hasCurFile bool // Whether curFile is set
  31. curFile protocol.FileInfo // The file as it exists now in our database
  32. sparse bool
  33. created time.Time
  34. fsync bool
  35. // Mutable, must be locked for access
  36. err error // The first error we hit
  37. writer *lockedWriterAt // Wraps fd to prevent fd closing at the same time as writing
  38. copyTotal int // Total number of copy actions for the whole job
  39. pullTotal int // Total number of pull actions for the whole job
  40. copyOrigin int // Number of blocks copied from the original file
  41. copyOriginShifted int // Number of blocks copied from the original file but shifted
  42. copyNeeded int // Number of copy actions still pending
  43. pullNeeded int // Number of block pulls still pending
  44. updated time.Time // Time when any of the counters above were last updated
  45. closed bool // True if the file has been finalClosed.
  46. available []int // Indexes of the blocks that are available in the temporary file
  47. availableUpdated time.Time // Time when list of available blocks was last updated
  48. mut sync.RWMutex // Protects the above
  49. }
  50. func newSharedPullerState(file protocol.FileInfo, fs fs.Filesystem, folderID, tempName string, blocks []protocol.BlockInfo, reused []int, ignorePerms, hasCurFile bool, curFile protocol.FileInfo, sparse bool, fsync bool) *sharedPullerState {
  51. return &sharedPullerState{
  52. file: file,
  53. fs: fs,
  54. folder: folderID,
  55. tempName: tempName,
  56. realName: file.Name,
  57. copyTotal: len(blocks),
  58. copyNeeded: len(blocks),
  59. reused: len(reused),
  60. updated: time.Now(),
  61. available: reused,
  62. availableUpdated: time.Now(),
  63. ignorePerms: ignorePerms,
  64. hasCurFile: hasCurFile,
  65. curFile: curFile,
  66. mut: sync.NewRWMutex(),
  67. sparse: sparse,
  68. fsync: fsync,
  69. created: time.Now(),
  70. }
  71. }
  72. // A momentary state representing the progress of the puller
  73. type PullerProgress struct {
  74. Total int `json:"total"`
  75. Reused int `json:"reused"`
  76. CopiedFromOrigin int `json:"copiedFromOrigin"`
  77. CopiedFromOriginShifted int `json:"copiedFromOriginShifted"`
  78. CopiedFromElsewhere int `json:"copiedFromElsewhere"`
  79. Pulled int `json:"pulled"`
  80. Pulling int `json:"pulling"`
  81. BytesDone int64 `json:"bytesDone"`
  82. BytesTotal int64 `json:"bytesTotal"`
  83. }
  84. // lockedWriterAt adds a lock to protect from closing the fd at the same time as writing.
  85. // WriteAt() is goroutine safe by itself, but not against for example Close().
  86. type lockedWriterAt struct {
  87. mut sync.RWMutex
  88. fd fs.File
  89. }
  90. // WriteAt itself is goroutine safe, thus just needs to acquire a read-lock to
  91. // prevent closing concurrently (see SyncClose).
  92. func (w *lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
  93. w.mut.RLock()
  94. defer w.mut.RUnlock()
  95. return w.fd.WriteAt(p, off)
  96. }
  97. // SyncClose ensures that no more writes are happening before going ahead and
  98. // syncing and closing the fd, thus needs to acquire a write-lock.
  99. func (w *lockedWriterAt) SyncClose(fsync bool) error {
  100. w.mut.Lock()
  101. defer w.mut.Unlock()
  102. if fsync {
  103. if err := w.fd.Sync(); err != nil {
  104. // Sync() is nice if it works but not worth failing the
  105. // operation over if it fails.
  106. l.Debugf("fsync failed: %v", err)
  107. }
  108. }
  109. return w.fd.Close()
  110. }
  111. // tempFile returns the fd for the temporary file, reusing an open fd
  112. // or creating the file as necessary.
  113. func (s *sharedPullerState) tempFile() (*lockedWriterAt, error) {
  114. s.mut.Lock()
  115. defer s.mut.Unlock()
  116. // If we've already hit an error, return early
  117. if s.err != nil {
  118. return nil, s.err
  119. }
  120. // If the temp file is already open, return the file descriptor
  121. if s.writer != nil {
  122. return s.writer, nil
  123. }
  124. if err := s.addWriterLocked(); err != nil {
  125. s.failLocked(err)
  126. return nil, err
  127. }
  128. return s.writer, nil
  129. }
  130. func (s *sharedPullerState) addWriterLocked() error {
  131. return inWritableDir(s.tempFileInWritableDir, s.fs, s.tempName, s.ignorePerms)
  132. }
  133. // tempFileInWritableDir should only be called from tempFile.
  134. func (s *sharedPullerState) tempFileInWritableDir(_ string) error {
  135. // The permissions to use for the temporary file should be those of the
  136. // final file, except we need user read & write at minimum. The
  137. // permissions will be set to the final value later, but in the meantime
  138. // we don't want to have a temporary file with looser permissions than
  139. // the final outcome.
  140. mode := fs.FileMode(s.file.Permissions) | 0o600
  141. if s.ignorePerms {
  142. // When ignorePerms is set we use a very permissive mode and let the
  143. // system umask filter it.
  144. mode = 0o666
  145. }
  146. // Attempt to create the temp file
  147. // RDWR because of issue #2994.
  148. flags := fs.OptReadWrite
  149. if s.reused == 0 {
  150. flags |= fs.OptCreate | fs.OptExclusive
  151. } else if !s.ignorePerms {
  152. // With sufficiently bad luck when exiting or crashing, we may have
  153. // had time to chmod the temp file to read only state but not yet
  154. // moved it to its final name. This leaves us with a read only temp
  155. // file that we're going to try to reuse. To handle that, we need to
  156. // make sure we have write permissions on the file before opening it.
  157. //
  158. // When ignorePerms is set we trust that the permissions are fine
  159. // already and make no modification, as we would otherwise override
  160. // what the umask dictates.
  161. if err := s.fs.Chmod(s.tempName, mode); err != nil {
  162. return fmt.Errorf("setting perms on temp file: %w", err)
  163. }
  164. }
  165. fd, err := s.fs.OpenFile(s.tempName, flags, mode)
  166. if err != nil {
  167. return fmt.Errorf("opening temp file: %w", err)
  168. }
  169. // Hide the temporary file
  170. s.fs.Hide(s.tempName)
  171. // Don't truncate symlink files, as that will mean that the path will
  172. // contain a bunch of nulls.
  173. if s.sparse && !s.file.IsSymlink() {
  174. size := s.file.Size
  175. // Trailer added to encrypted files
  176. if len(s.file.Encrypted) > 0 {
  177. size += encryptionTrailerSize(s.file)
  178. }
  179. // Truncate sets the size of the file. This creates a sparse file or a
  180. // space reservation, depending on the underlying filesystem.
  181. if err := fd.Truncate(size); err != nil {
  182. // The truncate call failed. That can happen in some cases when
  183. // space reservation isn't possible or over some network
  184. // filesystems... This generally doesn't matter.
  185. if s.reused > 0 {
  186. // ... but if we are attempting to reuse a file we have a
  187. // corner case when the old file is larger than the new one
  188. // and we can't just overwrite blocks and let the old data
  189. // linger at the end. In this case we attempt a delete of
  190. // the file and hope for better luck next time, when we
  191. // should come around with s.reused == 0.
  192. fd.Close()
  193. if remErr := s.fs.Remove(s.tempName); remErr != nil {
  194. l.Debugln("failed to remove temporary file:", remErr)
  195. }
  196. return err
  197. }
  198. }
  199. }
  200. // Same fd will be used by all writers
  201. s.writer = &lockedWriterAt{sync.NewRWMutex(), fd}
  202. return nil
  203. }
  204. // fail sets the error on the puller state compose of error, and marks the
  205. // sharedPullerState as failed. Is a no-op when called on an already failed state.
  206. func (s *sharedPullerState) fail(err error) {
  207. s.mut.Lock()
  208. defer s.mut.Unlock()
  209. s.failLocked(err)
  210. }
  211. func (s *sharedPullerState) failLocked(err error) {
  212. if s.err != nil || err == nil {
  213. return
  214. }
  215. s.err = err
  216. }
  217. func (s *sharedPullerState) failed() error {
  218. s.mut.RLock()
  219. err := s.err
  220. s.mut.RUnlock()
  221. return err
  222. }
  223. func (s *sharedPullerState) copyDone(block protocol.BlockInfo) {
  224. s.mut.Lock()
  225. s.copyNeeded--
  226. s.updated = time.Now()
  227. s.available = append(s.available, int(block.Offset/int64(s.file.BlockSize())))
  228. s.availableUpdated = time.Now()
  229. l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
  230. s.mut.Unlock()
  231. }
  232. func (s *sharedPullerState) copiedFromOrigin(bytes int) {
  233. s.mut.Lock()
  234. s.copyOrigin++
  235. s.updated = time.Now()
  236. s.mut.Unlock()
  237. metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceLocalOrigin).Add(float64(bytes))
  238. }
  239. func (s *sharedPullerState) copiedFromElsewhere(bytes int) {
  240. metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceLocalOther).Add(float64(bytes))
  241. }
  242. func (s *sharedPullerState) skippedSparseBlock(bytes int) {
  243. // pretend we copied it, historical
  244. s.mut.Lock()
  245. s.copyOrigin++
  246. s.updated = time.Now()
  247. s.mut.Unlock()
  248. metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceSkipped).Add(float64(bytes))
  249. }
  250. func (s *sharedPullerState) pullStarted() {
  251. s.mut.Lock()
  252. s.copyTotal--
  253. s.copyNeeded--
  254. s.pullTotal++
  255. s.pullNeeded++
  256. s.updated = time.Now()
  257. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
  258. s.mut.Unlock()
  259. }
  260. func (s *sharedPullerState) pullDone(block protocol.BlockInfo) {
  261. s.mut.Lock()
  262. s.pullNeeded--
  263. s.updated = time.Now()
  264. s.available = append(s.available, int(block.Offset/int64(s.file.BlockSize())))
  265. s.availableUpdated = time.Now()
  266. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
  267. s.mut.Unlock()
  268. metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceNetwork).Add(float64(block.Size))
  269. }
  270. // finalClose atomically closes and returns closed status of a file. A true
  271. // first return value means the file was closed and should be finished, with
  272. // the error indicating the success or failure of the close. A false first
  273. // return value indicates the file is not ready to be closed, or is already
  274. // closed and should in either case not be finished off now.
  275. func (s *sharedPullerState) finalClose() (bool, error) {
  276. s.mut.Lock()
  277. defer s.mut.Unlock()
  278. if s.closed {
  279. // Already closed
  280. return false, nil
  281. }
  282. if s.pullNeeded+s.copyNeeded != 0 && s.err == nil {
  283. // Not done yet, and not errored
  284. return false, nil
  285. }
  286. if len(s.file.Encrypted) > 0 {
  287. if err := s.finalizeEncrypted(); err != nil && s.err == nil {
  288. // This is our error as we weren't errored before.
  289. s.err = err
  290. }
  291. }
  292. if s.writer != nil {
  293. if err := s.writer.SyncClose(s.fsync); err != nil && s.err == nil {
  294. // This is our error as we weren't errored before.
  295. s.err = err
  296. }
  297. s.writer = nil
  298. }
  299. s.closed = true
  300. // Unhide the temporary file when we close it, as it's likely to
  301. // immediately be renamed to the final name. If this is a failed temp
  302. // file we will also unhide it, but I'm fine with that as we're now
  303. // leaving it around for potentially quite a while.
  304. s.fs.Unhide(s.tempName)
  305. return true, s.err
  306. }
  307. // finalizeEncrypted adds a trailer to the encrypted file containing the
  308. // serialized FileInfo and the length of that FileInfo. When initializing a
  309. // folder from encrypted data we can extract this FileInfo from the end of
  310. // the file and regain the original metadata.
  311. func (s *sharedPullerState) finalizeEncrypted() error {
  312. if s.writer == nil {
  313. if err := s.addWriterLocked(); err != nil {
  314. return err
  315. }
  316. }
  317. trailerSize, err := writeEncryptionTrailer(s.file, s.writer)
  318. if err != nil {
  319. return err
  320. }
  321. s.file.Size += trailerSize
  322. s.file.EncryptionTrailerSize = int(trailerSize)
  323. return nil
  324. }
  325. // Returns the size of the written trailer.
  326. func writeEncryptionTrailer(file protocol.FileInfo, writer io.WriterAt) (int64, error) {
  327. // Here the file is in native format, while encryption happens in
  328. // wire format (always slashes).
  329. wireFile := file
  330. wireFile.Name = osutil.NormalizedFilename(wireFile.Name)
  331. trailerSize := encryptionTrailerSize(wireFile)
  332. bs := make([]byte, trailerSize)
  333. n, err := protoutil.MarshalTo(bs, wireFile.ToWire(false))
  334. if err != nil {
  335. return 0, err
  336. }
  337. binary.BigEndian.PutUint32(bs[n:], uint32(n)) //nolint:gosec
  338. bs = bs[:n+4]
  339. if _, err := writer.WriteAt(bs, wireFile.Size); err != nil {
  340. return 0, err
  341. }
  342. return trailerSize, nil
  343. }
  344. func encryptionTrailerSize(file protocol.FileInfo) int64 {
  345. return int64(proto.Size(file.ToWire(false))) + 4 // XXX: Inefficient
  346. }
  347. // Progress returns the momentarily progress for the puller
  348. func (s *sharedPullerState) Progress() *PullerProgress {
  349. s.mut.RLock()
  350. defer s.mut.RUnlock()
  351. total := s.reused + s.copyTotal + s.pullTotal
  352. done := total - s.copyNeeded - s.pullNeeded
  353. file := len(s.file.Blocks)
  354. return &PullerProgress{
  355. Total: total,
  356. Reused: s.reused,
  357. CopiedFromOrigin: s.copyOrigin,
  358. CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
  359. Pulled: s.pullTotal - s.pullNeeded,
  360. Pulling: s.pullNeeded,
  361. BytesTotal: blocksToSize(total, file, s.file.BlockSize(), s.file.Size),
  362. BytesDone: blocksToSize(done, file, s.file.BlockSize(), s.file.Size),
  363. }
  364. }
  365. // Updated returns the time when any of the progress related counters was last updated.
  366. func (s *sharedPullerState) Updated() time.Time {
  367. s.mut.RLock()
  368. t := s.updated
  369. s.mut.RUnlock()
  370. return t
  371. }
  372. // AvailableUpdated returns the time last time list of available blocks was updated
  373. func (s *sharedPullerState) AvailableUpdated() time.Time {
  374. s.mut.RLock()
  375. t := s.availableUpdated
  376. s.mut.RUnlock()
  377. return t
  378. }
  379. // Available returns blocks available in the current temporary file
  380. func (s *sharedPullerState) Available() []int {
  381. s.mut.RLock()
  382. blocks := s.available
  383. s.mut.RUnlock()
  384. return blocks
  385. }
  386. func blocksToSize(blocks, blocksInFile, blockSize int, fileSize int64) int64 {
  387. // The last/only block has somewhere between 1 and blockSize bytes. We do
  388. // not know whether the smaller block is part of the blocks and use an
  389. // estimate assuming a random chance that the small block is contained.
  390. if blocksInFile == 0 {
  391. return 0
  392. }
  393. return int64(blocks)*int64(blockSize) - (int64(blockSize)-fileSize%int64(blockSize))*int64(blocks)/int64(blocksInFile)
  394. }