sharedpullerstate.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "fmt"
  9. "io"
  10. "path/filepath"
  11. "time"
  12. "github.com/syncthing/syncthing/lib/fs"
  13. "github.com/syncthing/syncthing/lib/protocol"
  14. "github.com/syncthing/syncthing/lib/sync"
  15. )
  16. // A sharedPullerState is kept for each file that is being synced and is kept
  17. // updated along the way.
  18. type sharedPullerState struct {
  19. // Immutable, does not require locking
  20. file protocol.FileInfo // The new file (desired end state)
  21. fs fs.Filesystem
  22. folder string
  23. tempName string
  24. realName string
  25. reused int // Number of blocks reused from temporary file
  26. ignorePerms bool
  27. hasCurFile bool // Whether curFile is set
  28. curFile protocol.FileInfo // The file as it exists now in our database
  29. sparse bool
  30. created time.Time
  31. // Mutable, must be locked for access
  32. err error // The first error we hit
  33. fd fs.File // The fd of the temp file
  34. copyTotal int // Total number of copy actions for the whole job
  35. pullTotal int // Total number of pull actions for the whole job
  36. copyOrigin int // Number of blocks copied from the original file
  37. copyOriginShifted int // Number of blocks copied from the original file but shifted
  38. copyNeeded int // Number of copy actions still pending
  39. pullNeeded int // Number of block pulls still pending
  40. updated time.Time // Time when any of the counters above were last updated
  41. closed bool // True if the file has been finalClosed.
  42. available []int32 // Indexes of the blocks that are available in the temporary file
  43. availableUpdated time.Time // Time when list of available blocks was last updated
  44. mut sync.RWMutex // Protects the above
  45. }
  46. // A momentary state representing the progress of the puller
  47. type pullerProgress struct {
  48. Total int `json:"total"`
  49. Reused int `json:"reused"`
  50. CopiedFromOrigin int `json:"copiedFromOrigin"`
  51. CopiedFromOriginShifted int `json:"copiedFromOriginShifted"`
  52. CopiedFromElsewhere int `json:"copiedFromElsewhere"`
  53. Pulled int `json:"pulled"`
  54. Pulling int `json:"pulling"`
  55. BytesDone int64 `json:"bytesDone"`
  56. BytesTotal int64 `json:"bytesTotal"`
  57. }
  58. // A lockedWriterAt synchronizes WriteAt calls with an external mutex.
  59. // WriteAt() is goroutine safe by itself, but not against for example Close().
  60. type lockedWriterAt struct {
  61. mut *sync.RWMutex
  62. wr io.WriterAt
  63. }
  64. func (w lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
  65. (*w.mut).Lock()
  66. defer (*w.mut).Unlock()
  67. return w.wr.WriteAt(p, off)
  68. }
  69. // tempFile returns the fd for the temporary file, reusing an open fd
  70. // or creating the file as necessary.
  71. func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
  72. s.mut.Lock()
  73. defer s.mut.Unlock()
  74. // If we've already hit an error, return early
  75. if s.err != nil {
  76. return nil, s.err
  77. }
  78. // If the temp file is already open, return the file descriptor
  79. if s.fd != nil {
  80. return lockedWriterAt{&s.mut, s.fd}, nil
  81. }
  82. // Ensure that the parent directory is writable. This is
  83. // osutil.InWritableDir except we need to do more stuff so we duplicate it
  84. // here.
  85. dir := filepath.Dir(s.tempName)
  86. if info, err := s.fs.Stat(dir); err != nil {
  87. if fs.IsNotExist(err) {
  88. // XXX: This works around a bug elsewhere, a race condition when
  89. // things are deleted while being synced. However that happens, we
  90. // end up with a directory for "foo" with the delete bit, but a
  91. // file "foo/bar" that we want to sync. We never create the
  92. // directory, and hence fail to create the file and end up looping
  93. // forever on it. This breaks that by creating the directory; on
  94. // next scan it'll be found and the delete bit on it is removed.
  95. // The user can then clean up as they like...
  96. l.Infoln("Resurrecting directory", dir)
  97. if err := s.fs.MkdirAll(dir, 0755); err != nil {
  98. s.failLocked("resurrect dir", err)
  99. return nil, err
  100. }
  101. } else {
  102. s.failLocked("dst stat dir", err)
  103. return nil, err
  104. }
  105. } else if info.Mode()&0200 == 0 {
  106. err := s.fs.Chmod(dir, 0755)
  107. if !s.ignorePerms && err == nil {
  108. defer func() {
  109. err := s.fs.Chmod(dir, info.Mode()&fs.ModePerm)
  110. if err != nil {
  111. panic(err)
  112. }
  113. }()
  114. }
  115. }
  116. // The permissions to use for the temporary file should be those of the
  117. // final file, except we need user read & write at minimum. The
  118. // permissions will be set to the final value later, but in the meantime
  119. // we don't want to have a temporary file with looser permissions than
  120. // the final outcome.
  121. mode := fs.FileMode(s.file.Permissions) | 0600
  122. if s.ignorePerms {
  123. // When ignorePerms is set we use a very permissive mode and let the
  124. // system umask filter it.
  125. mode = 0666
  126. }
  127. // Attempt to create the temp file
  128. // RDWR because of issue #2994.
  129. flags := fs.OptReadWrite
  130. if s.reused == 0 {
  131. flags |= fs.OptCreate | fs.OptExclusive
  132. } else if !s.ignorePerms {
  133. // With sufficiently bad luck when exiting or crashing, we may have
  134. // had time to chmod the temp file to read only state but not yet
  135. // moved it to its final name. This leaves us with a read only temp
  136. // file that we're going to try to reuse. To handle that, we need to
  137. // make sure we have write permissions on the file before opening it.
  138. //
  139. // When ignorePerms is set we trust that the permissions are fine
  140. // already and make no modification, as we would otherwise override
  141. // what the umask dictates.
  142. if err := s.fs.Chmod(s.tempName, mode); err != nil {
  143. s.failLocked("dst create chmod", err)
  144. return nil, err
  145. }
  146. }
  147. fd, err := s.fs.OpenFile(s.tempName, flags, mode)
  148. if err != nil {
  149. s.failLocked("dst create", err)
  150. return nil, err
  151. }
  152. // Hide the temporary file
  153. s.fs.Hide(s.tempName)
  154. // Don't truncate symlink files, as that will mean that the path will
  155. // contain a bunch of nulls.
  156. if s.sparse && !s.file.IsSymlink() {
  157. // Truncate sets the size of the file. This creates a sparse file or a
  158. // space reservation, depending on the underlying filesystem.
  159. if err := fd.Truncate(s.file.Size); err != nil {
  160. s.failLocked("dst truncate", err)
  161. return nil, err
  162. }
  163. }
  164. // Same fd will be used by all writers
  165. s.fd = fd
  166. return lockedWriterAt{&s.mut, s.fd}, nil
  167. }
  168. // sourceFile opens the existing source file for reading
  169. func (s *sharedPullerState) sourceFile() (fs.File, error) {
  170. s.mut.Lock()
  171. defer s.mut.Unlock()
  172. // If we've already hit an error, return early
  173. if s.err != nil {
  174. return nil, s.err
  175. }
  176. // Attempt to open the existing file
  177. fd, err := s.fs.Open(s.realName)
  178. if err != nil {
  179. s.failLocked("src open", err)
  180. return nil, err
  181. }
  182. return fd, nil
  183. }
  184. // fail sets the error on the puller state compose of error, and marks the
  185. // sharedPullerState as failed. Is a no-op when called on an already failed state.
  186. func (s *sharedPullerState) fail(context string, err error) {
  187. s.mut.Lock()
  188. defer s.mut.Unlock()
  189. s.failLocked(context, err)
  190. }
  191. func (s *sharedPullerState) failLocked(context string, err error) {
  192. if s.err != nil || err == nil {
  193. return
  194. }
  195. s.err = fmt.Errorf("%s: %s", context, err.Error())
  196. }
  197. func (s *sharedPullerState) failed() error {
  198. s.mut.RLock()
  199. err := s.err
  200. s.mut.RUnlock()
  201. return err
  202. }
  203. func (s *sharedPullerState) copyDone(block protocol.BlockInfo) {
  204. s.mut.Lock()
  205. s.copyNeeded--
  206. s.updated = time.Now()
  207. s.available = append(s.available, int32(block.Offset/protocol.BlockSize))
  208. s.availableUpdated = time.Now()
  209. l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
  210. s.mut.Unlock()
  211. }
  212. func (s *sharedPullerState) copiedFromOrigin() {
  213. s.mut.Lock()
  214. s.copyOrigin++
  215. s.updated = time.Now()
  216. s.mut.Unlock()
  217. }
  218. func (s *sharedPullerState) copiedFromOriginShifted() {
  219. s.mut.Lock()
  220. s.copyOrigin++
  221. s.copyOriginShifted++
  222. s.updated = time.Now()
  223. s.mut.Unlock()
  224. }
  225. func (s *sharedPullerState) pullStarted() {
  226. s.mut.Lock()
  227. s.copyTotal--
  228. s.copyNeeded--
  229. s.pullTotal++
  230. s.pullNeeded++
  231. s.updated = time.Now()
  232. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
  233. s.mut.Unlock()
  234. }
  235. func (s *sharedPullerState) pullDone(block protocol.BlockInfo) {
  236. s.mut.Lock()
  237. s.pullNeeded--
  238. s.updated = time.Now()
  239. s.available = append(s.available, int32(block.Offset/protocol.BlockSize))
  240. s.availableUpdated = time.Now()
  241. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
  242. s.mut.Unlock()
  243. }
  244. // finalClose atomically closes and returns closed status of a file. A true
  245. // first return value means the file was closed and should be finished, with
  246. // the error indicating the success or failure of the close. A false first
  247. // return value indicates the file is not ready to be closed, or is already
  248. // closed and should in either case not be finished off now.
  249. func (s *sharedPullerState) finalClose() (bool, error) {
  250. s.mut.Lock()
  251. defer s.mut.Unlock()
  252. if s.closed {
  253. // Already closed
  254. return false, nil
  255. }
  256. if s.pullNeeded+s.copyNeeded != 0 && s.err == nil {
  257. // Not done yet, and not errored
  258. return false, nil
  259. }
  260. if s.fd != nil {
  261. // This is our error if we weren't errored before. Otherwise we
  262. // keep the earlier error.
  263. if fsyncErr := s.fd.Sync(); fsyncErr != nil && s.err == nil {
  264. s.err = fsyncErr
  265. }
  266. if closeErr := s.fd.Close(); closeErr != nil && s.err == nil {
  267. s.err = closeErr
  268. }
  269. s.fd = nil
  270. }
  271. s.closed = true
  272. // Unhide the temporary file when we close it, as it's likely to
  273. // immediately be renamed to the final name. If this is a failed temp
  274. // file we will also unhide it, but I'm fine with that as we're now
  275. // leaving it around for potentially quite a while.
  276. s.fs.Unhide(s.tempName)
  277. return true, s.err
  278. }
  279. // Progress returns the momentarily progress for the puller
  280. func (s *sharedPullerState) Progress() *pullerProgress {
  281. s.mut.RLock()
  282. defer s.mut.RUnlock()
  283. total := s.reused + s.copyTotal + s.pullTotal
  284. done := total - s.copyNeeded - s.pullNeeded
  285. return &pullerProgress{
  286. Total: total,
  287. Reused: s.reused,
  288. CopiedFromOrigin: s.copyOrigin,
  289. CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
  290. Pulled: s.pullTotal - s.pullNeeded,
  291. Pulling: s.pullNeeded,
  292. BytesTotal: blocksToSize(total),
  293. BytesDone: blocksToSize(done),
  294. }
  295. }
  296. // Updated returns the time when any of the progress related counters was last updated.
  297. func (s *sharedPullerState) Updated() time.Time {
  298. s.mut.RLock()
  299. t := s.updated
  300. s.mut.RUnlock()
  301. return t
  302. }
  303. // AvailableUpdated returns the time last time list of available blocks was updated
  304. func (s *sharedPullerState) AvailableUpdated() time.Time {
  305. s.mut.RLock()
  306. t := s.availableUpdated
  307. s.mut.RUnlock()
  308. return t
  309. }
  310. // Available returns blocks available in the current temporary file
  311. func (s *sharedPullerState) Available() []int32 {
  312. s.mut.RLock()
  313. blocks := s.available
  314. s.mut.RUnlock()
  315. return blocks
  316. }
  317. func blocksToSize(num int) int64 {
  318. if num < 2 {
  319. return protocol.BlockSize / 2
  320. }
  321. return int64(num-1)*protocol.BlockSize + protocol.BlockSize/2
  322. }