sharedpullerstate.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "io"
  9. "os"
  10. "path/filepath"
  11. "sync"
  12. "github.com/syncthing/protocol"
  13. "github.com/syncthing/syncthing/internal/db"
  14. )
  15. // A sharedPullerState is kept for each file that is being synced and is kept
  16. // updated along the way.
  17. type sharedPullerState struct {
  18. // Immutable, does not require locking
  19. file protocol.FileInfo
  20. folder string
  21. tempName string
  22. realName string
  23. reused int // Number of blocks reused from temporary file
  24. ignorePerms bool
  25. // Mutable, must be locked for access
  26. err error // The first error we hit
  27. fd *os.File // The fd of the temp file
  28. copyTotal int // Total number of copy actions for the whole job
  29. pullTotal int // Total number of pull actions for the whole job
  30. copyOrigin int // Number of blocks copied from the original file
  31. copyNeeded int // Number of copy actions still pending
  32. pullNeeded int // Number of block pulls still pending
  33. mut sync.Mutex // Protects the above
  34. }
  // pullerProgress is a point-in-time snapshot of how far the puller has
  // come with a single file. Block counts are plain ints, byte counts are
  // int64. The JSON tags define the serialized field names.
  type pullerProgress struct {
  	Total               int   `json:"total"`               // All blocks: reused + copy actions + pull actions
  	Reused              int   `json:"reused"`              // Blocks reused from the temporary file
  	CopiedFromOrigin    int   `json:"copiedFromOrigin"`    // Blocks copied from the original file
  	CopiedFromElsewhere int   `json:"copiedFromElsewhere"` // Completed copies that did not come from the original file
  	Pulled              int   `json:"pulled"`              // Pull actions that have completed
  	Pulling             int   `json:"pulling"`             // Pull actions still pending
  	BytesDone           int64 `json:"bytesDone"`           // Size corresponding to the finished blocks
  	BytesTotal          int64 `json:"bytesTotal"`          // Size corresponding to all blocks
  }
  // lockedWriterAt wraps an io.WriterAt so that each WriteAt call runs while
  // an externally owned mutex is held. WriteAt() is goroutine safe by
  // itself, but not against for example Close(); sharing the mutex with
  // whoever closes the underlying writer serializes the two.
  type lockedWriterAt struct {
  	mut *sync.Mutex // Held for the duration of every WriteAt call
  	wr  io.WriterAt // The writer that calls are forwarded to
  }

  // WriteAt takes the shared mutex, forwards p and off unchanged to the
  // wrapped writer, and returns that writer's result. The mutex is released
  // when the call returns.
  func (w lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
  	w.mut.Lock()
  	defer w.mut.Unlock()

  	n, err = w.wr.WriteAt(p, off)
  	return n, err
  }
  57. // tempFile returns the fd for the temporary file, reusing an open fd
  58. // or creating the file as necessary.
  59. func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
  60. s.mut.Lock()
  61. defer s.mut.Unlock()
  62. // If we've already hit an error, return early
  63. if s.err != nil {
  64. return nil, s.err
  65. }
  66. // If the temp file is already open, return the file descriptor
  67. if s.fd != nil {
  68. return lockedWriterAt{&s.mut, s.fd}, nil
  69. }
  70. // Ensure that the parent directory is writable. This is
  71. // osutil.InWritableDir except we need to do more stuff so we duplicate it
  72. // here.
  73. dir := filepath.Dir(s.tempName)
  74. if info, err := os.Stat(dir); err != nil {
  75. s.failLocked("dst stat dir", err)
  76. return nil, err
  77. } else if info.Mode()&0200 == 0 {
  78. err := os.Chmod(dir, 0755)
  79. if !s.ignorePerms && err == nil {
  80. defer func() {
  81. err := os.Chmod(dir, info.Mode().Perm())
  82. if err != nil {
  83. panic(err)
  84. }
  85. }()
  86. }
  87. }
  88. // Attempt to create the temp file
  89. flags := os.O_WRONLY
  90. if s.reused == 0 {
  91. flags |= os.O_CREATE | os.O_EXCL
  92. } else {
  93. // With sufficiently bad luck when exiting or crashing, we may have
  94. // had time to chmod the temp file to read only state but not yet
  95. // moved it to it's final name. This leaves us with a read only temp
  96. // file that we're going to try to reuse. To handle that, we need to
  97. // make sure we have write permissions on the file before opening it.
  98. err := os.Chmod(s.tempName, 0644)
  99. if !s.ignorePerms && err != nil {
  100. s.failLocked("dst create chmod", err)
  101. return nil, err
  102. }
  103. }
  104. fd, err := os.OpenFile(s.tempName, flags, 0644)
  105. if err != nil {
  106. s.failLocked("dst create", err)
  107. return nil, err
  108. }
  109. // Same fd will be used by all writers
  110. s.fd = fd
  111. return lockedWriterAt{&s.mut, s.fd}, nil
  112. }
  113. // sourceFile opens the existing source file for reading
  114. func (s *sharedPullerState) sourceFile() (*os.File, error) {
  115. s.mut.Lock()
  116. defer s.mut.Unlock()
  117. // If we've already hit an error, return early
  118. if s.err != nil {
  119. return nil, s.err
  120. }
  121. // Attempt to open the existing file
  122. fd, err := os.Open(s.realName)
  123. if err != nil {
  124. s.failLocked("src open", err)
  125. return nil, err
  126. }
  127. return fd, nil
  128. }
  129. // earlyClose prints a warning message composed of the context and
  130. // error, and marks the sharedPullerState as failed. Is a no-op when called on
  131. // an already failed state.
  132. func (s *sharedPullerState) fail(context string, err error) {
  133. s.mut.Lock()
  134. defer s.mut.Unlock()
  135. s.failLocked(context, err)
  136. }
  137. func (s *sharedPullerState) failLocked(context string, err error) {
  138. if s.err != nil {
  139. return
  140. }
  141. l.Infof("Puller (folder %q, file %q): %s: %v", s.folder, s.file.Name, context, err)
  142. s.err = err
  143. }
  144. func (s *sharedPullerState) failed() error {
  145. s.mut.Lock()
  146. defer s.mut.Unlock()
  147. return s.err
  148. }
  149. func (s *sharedPullerState) copyDone() {
  150. s.mut.Lock()
  151. s.copyNeeded--
  152. if debug {
  153. l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
  154. }
  155. s.mut.Unlock()
  156. }
  157. func (s *sharedPullerState) copiedFromOrigin() {
  158. s.mut.Lock()
  159. s.copyOrigin++
  160. s.mut.Unlock()
  161. }
  162. func (s *sharedPullerState) pullStarted() {
  163. s.mut.Lock()
  164. s.copyTotal--
  165. s.copyNeeded--
  166. s.pullTotal++
  167. s.pullNeeded++
  168. if debug {
  169. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
  170. }
  171. s.mut.Unlock()
  172. }
  173. func (s *sharedPullerState) pullDone() {
  174. s.mut.Lock()
  175. s.pullNeeded--
  176. if debug {
  177. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
  178. }
  179. s.mut.Unlock()
  180. }
  181. // finalClose atomically closes and returns closed status of a file. A true
  182. // first return value means the file was closed and should be finished, with
  183. // the error indicating the success or failure of the close. A false first
  184. // return value indicates the file is not ready to be closed, or is already
  185. // closed and should in either case not be finished off now.
  186. func (s *sharedPullerState) finalClose() (bool, error) {
  187. s.mut.Lock()
  188. defer s.mut.Unlock()
  189. if s.pullNeeded+s.copyNeeded != 0 && s.err == nil {
  190. // Not done yet.
  191. return false, nil
  192. }
  193. if fd := s.fd; fd != nil {
  194. s.fd = nil
  195. return true, fd.Close()
  196. }
  197. return false, nil
  198. }
  199. // Returns the momentarily progress for the puller
  200. func (s *sharedPullerState) Progress() *pullerProgress {
  201. s.mut.Lock()
  202. defer s.mut.Unlock()
  203. total := s.reused + s.copyTotal + s.pullTotal
  204. done := total - s.copyNeeded - s.pullNeeded
  205. return &pullerProgress{
  206. Total: total,
  207. Reused: s.reused,
  208. CopiedFromOrigin: s.copyOrigin,
  209. CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
  210. Pulled: s.pullTotal - s.pullNeeded,
  211. Pulling: s.pullNeeded,
  212. BytesTotal: db.BlocksToSize(total),
  213. BytesDone: db.BlocksToSize(done),
  214. }
  215. }