sharedpullerstate.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "io"
  9. "os"
  10. "path/filepath"
  11. "time"
  12. "github.com/syncthing/syncthing/lib/protocol"
  13. "github.com/syncthing/syncthing/lib/sync"
  14. )
  15. // A sharedPullerState is kept for each file that is being synced and is kept
  16. // updated along the way.
  17. type sharedPullerState struct {
  18. // Immutable, does not require locking
  19. file protocol.FileInfo // The new file (desired end state)
  20. folder string
  21. tempName string
  22. realName string
  23. reused int // Number of blocks reused from temporary file
  24. ignorePerms bool
  25. version protocol.Vector // The current (old) version
  26. sparse bool
  27. created time.Time
  28. // Mutable, must be locked for access
  29. err error // The first error we hit
  30. fd *os.File // The fd of the temp file
  31. copyTotal int // Total number of copy actions for the whole job
  32. pullTotal int // Total number of pull actions for the whole job
  33. copyOrigin int // Number of blocks copied from the original file
  34. copyOriginShifted int // Number of blocks copied from the original file but shifted
  35. copyNeeded int // Number of copy actions still pending
  36. pullNeeded int // Number of block pulls still pending
  37. updated time.Time // Time when any of the counters above were last updated
  38. closed bool // True if the file has been finalClosed.
  39. available []int32 // Indexes of the blocks that are available in the temporary file
  40. availableUpdated time.Time // Time when list of available blocks was last updated
  41. mut sync.RWMutex // Protects the above
  42. }
  43. // A momentary state representing the progress of the puller
  44. type pullerProgress struct {
  45. Total int `json:"total"`
  46. Reused int `json:"reused"`
  47. CopiedFromOrigin int `json:"copiedFromOrigin"`
  48. CopiedFromOriginShifted int `json:"copiedFromOriginShifted"`
  49. CopiedFromElsewhere int `json:"copiedFromElsewhere"`
  50. Pulled int `json:"pulled"`
  51. Pulling int `json:"pulling"`
  52. BytesDone int64 `json:"bytesDone"`
  53. BytesTotal int64 `json:"bytesTotal"`
  54. }
  55. // A lockedWriterAt synchronizes WriteAt calls with an external mutex.
  56. // WriteAt() is goroutine safe by itself, but not against for example Close().
  57. type lockedWriterAt struct {
  58. mut *sync.RWMutex
  59. wr io.WriterAt
  60. }
  61. func (w lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
  62. (*w.mut).Lock()
  63. defer (*w.mut).Unlock()
  64. return w.wr.WriteAt(p, off)
  65. }
  66. // tempFile returns the fd for the temporary file, reusing an open fd
  67. // or creating the file as necessary.
  68. func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
  69. s.mut.Lock()
  70. defer s.mut.Unlock()
  71. // If we've already hit an error, return early
  72. if s.err != nil {
  73. return nil, s.err
  74. }
  75. // If the temp file is already open, return the file descriptor
  76. if s.fd != nil {
  77. return lockedWriterAt{&s.mut, s.fd}, nil
  78. }
  79. // Ensure that the parent directory is writable. This is
  80. // osutil.InWritableDir except we need to do more stuff so we duplicate it
  81. // here.
  82. dir := filepath.Dir(s.tempName)
  83. if info, err := os.Stat(dir); err != nil {
  84. if os.IsNotExist(err) {
  85. // XXX: This works around a bug elsewhere, a race condition when
  86. // things are deleted while being synced. However that happens, we
  87. // end up with a directory for "foo" with the delete bit, but a
  88. // file "foo/bar" that we want to sync. We never create the
  89. // directory, and hence fail to create the file and end up looping
  90. // forever on it. This breaks that by creating the directory; on
  91. // next scan it'll be found and the delete bit on it is removed.
  92. // The user can then clean up as they like...
  93. l.Infoln("Resurrecting directory", dir)
  94. if err := os.MkdirAll(dir, 0755); err != nil {
  95. s.failLocked("resurrect dir", err)
  96. return nil, err
  97. }
  98. } else {
  99. s.failLocked("dst stat dir", err)
  100. return nil, err
  101. }
  102. } else if info.Mode()&0200 == 0 {
  103. err := os.Chmod(dir, 0755)
  104. if !s.ignorePerms && err == nil {
  105. defer func() {
  106. err := os.Chmod(dir, info.Mode().Perm())
  107. if err != nil {
  108. panic(err)
  109. }
  110. }()
  111. }
  112. }
  113. // The permissions to use for the temporary file should be those of the
  114. // final file, except we need user read & write at minimum. The
  115. // permissions will be set to the final value later, but in the meantime
  116. // we don't want to have a temporary file with looser permissions than
  117. // the final outcome.
  118. mode := os.FileMode(s.file.Permissions) | 0600
  119. if s.ignorePerms {
  120. // When ignorePerms is set we use a very permissive mode and let the
  121. // system umask filter it.
  122. mode = 0666
  123. }
  124. // Attempt to create the temp file
  125. // RDWR because of issue #2994.
  126. flags := os.O_RDWR
  127. if s.reused == 0 {
  128. flags |= os.O_CREATE | os.O_EXCL
  129. } else if !s.ignorePerms {
  130. // With sufficiently bad luck when exiting or crashing, we may have
  131. // had time to chmod the temp file to read only state but not yet
  132. // moved it to it's final name. This leaves us with a read only temp
  133. // file that we're going to try to reuse. To handle that, we need to
  134. // make sure we have write permissions on the file before opening it.
  135. //
  136. // When ignorePerms is set we trust that the permissions are fine
  137. // already and make no modification, as we would otherwise override
  138. // what the umask dictates.
  139. if err := os.Chmod(s.tempName, mode); err != nil {
  140. s.failLocked("dst create chmod", err)
  141. return nil, err
  142. }
  143. }
  144. fd, err := os.OpenFile(s.tempName, flags, mode)
  145. if err != nil {
  146. s.failLocked("dst create", err)
  147. return nil, err
  148. }
  149. // Don't truncate symlink files, as that will mean that the path will
  150. // contain a bunch of nulls.
  151. if s.sparse && !s.file.IsSymlink() {
  152. // Truncate sets the size of the file. This creates a sparse file or a
  153. // space reservation, depending on the underlying filesystem.
  154. if err := fd.Truncate(s.file.Size); err != nil {
  155. s.failLocked("dst truncate", err)
  156. return nil, err
  157. }
  158. }
  159. // Same fd will be used by all writers
  160. s.fd = fd
  161. return lockedWriterAt{&s.mut, s.fd}, nil
  162. }
  163. // sourceFile opens the existing source file for reading
  164. func (s *sharedPullerState) sourceFile() (*os.File, error) {
  165. s.mut.Lock()
  166. defer s.mut.Unlock()
  167. // If we've already hit an error, return early
  168. if s.err != nil {
  169. return nil, s.err
  170. }
  171. // Attempt to open the existing file
  172. fd, err := os.Open(s.realName)
  173. if err != nil {
  174. s.failLocked("src open", err)
  175. return nil, err
  176. }
  177. return fd, nil
  178. }
  179. // earlyClose prints a warning message composed of the context and
  180. // error, and marks the sharedPullerState as failed. Is a no-op when called on
  181. // an already failed state.
  182. func (s *sharedPullerState) fail(context string, err error) {
  183. s.mut.Lock()
  184. defer s.mut.Unlock()
  185. s.failLocked(context, err)
  186. }
  187. func (s *sharedPullerState) failLocked(context string, err error) {
  188. if s.err != nil {
  189. return
  190. }
  191. l.Infof("Puller (folder %q, file %q): %s: %v", s.folder, s.file.Name, context, err)
  192. s.err = err
  193. }
  194. func (s *sharedPullerState) failed() error {
  195. s.mut.RLock()
  196. err := s.err
  197. s.mut.RUnlock()
  198. return err
  199. }
  200. func (s *sharedPullerState) copyDone(block protocol.BlockInfo) {
  201. s.mut.Lock()
  202. s.copyNeeded--
  203. s.updated = time.Now()
  204. s.available = append(s.available, int32(block.Offset/protocol.BlockSize))
  205. s.availableUpdated = time.Now()
  206. l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
  207. s.mut.Unlock()
  208. }
  209. func (s *sharedPullerState) copiedFromOrigin() {
  210. s.mut.Lock()
  211. s.copyOrigin++
  212. s.updated = time.Now()
  213. s.mut.Unlock()
  214. }
  215. func (s *sharedPullerState) copiedFromOriginShifted() {
  216. s.mut.Lock()
  217. s.copyOrigin++
  218. s.copyOriginShifted++
  219. s.updated = time.Now()
  220. s.mut.Unlock()
  221. }
  222. func (s *sharedPullerState) pullStarted() {
  223. s.mut.Lock()
  224. s.copyTotal--
  225. s.copyNeeded--
  226. s.pullTotal++
  227. s.pullNeeded++
  228. s.updated = time.Now()
  229. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
  230. s.mut.Unlock()
  231. }
  232. func (s *sharedPullerState) pullDone(block protocol.BlockInfo) {
  233. s.mut.Lock()
  234. s.pullNeeded--
  235. s.updated = time.Now()
  236. s.available = append(s.available, int32(block.Offset/protocol.BlockSize))
  237. s.availableUpdated = time.Now()
  238. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
  239. s.mut.Unlock()
  240. }
  241. // finalClose atomically closes and returns closed status of a file. A true
  242. // first return value means the file was closed and should be finished, with
  243. // the error indicating the success or failure of the close. A false first
  244. // return value indicates the file is not ready to be closed, or is already
  245. // closed and should in either case not be finished off now.
  246. func (s *sharedPullerState) finalClose() (bool, error) {
  247. s.mut.Lock()
  248. defer s.mut.Unlock()
  249. if s.closed {
  250. // Already closed
  251. return false, nil
  252. }
  253. if s.pullNeeded+s.copyNeeded != 0 && s.err == nil {
  254. // Not done yet, and not errored
  255. return false, nil
  256. }
  257. if s.fd != nil {
  258. if closeErr := s.fd.Close(); closeErr != nil && s.err == nil {
  259. // This is our error if we weren't errored before. Otherwise we
  260. // keep the earlier error.
  261. s.err = closeErr
  262. }
  263. s.fd = nil
  264. }
  265. s.closed = true
  266. return true, s.err
  267. }
  268. // Progress returns the momentarily progress for the puller
  269. func (s *sharedPullerState) Progress() *pullerProgress {
  270. s.mut.RLock()
  271. defer s.mut.RUnlock()
  272. total := s.reused + s.copyTotal + s.pullTotal
  273. done := total - s.copyNeeded - s.pullNeeded
  274. return &pullerProgress{
  275. Total: total,
  276. Reused: s.reused,
  277. CopiedFromOrigin: s.copyOrigin,
  278. CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
  279. Pulled: s.pullTotal - s.pullNeeded,
  280. Pulling: s.pullNeeded,
  281. BytesTotal: blocksToSize(total),
  282. BytesDone: blocksToSize(done),
  283. }
  284. }
  285. // Updated returns the time when any of the progress related counters was last updated.
  286. func (s *sharedPullerState) Updated() time.Time {
  287. s.mut.RLock()
  288. t := s.updated
  289. s.mut.RUnlock()
  290. return t
  291. }
  292. // AvailableUpdated returns the time last time list of available blocks was updated
  293. func (s *sharedPullerState) AvailableUpdated() time.Time {
  294. s.mut.RLock()
  295. t := s.availableUpdated
  296. s.mut.RUnlock()
  297. return t
  298. }
  299. // Available returns blocks available in the current temporary file
  300. func (s *sharedPullerState) Available() []int32 {
  301. s.mut.RLock()
  302. blocks := s.available
  303. s.mut.RUnlock()
  304. return blocks
  305. }
  306. func blocksToSize(num int) int64 {
  307. if num < 2 {
  308. return protocol.BlockSize / 2
  309. }
  310. return int64(num-1)*protocol.BlockSize + protocol.BlockSize/2
  311. }