sharedpullerstate.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This program is free software: you can redistribute it and/or modify it
  4. // under the terms of the GNU General Public License as published by the Free
  5. // Software Foundation, either version 3 of the License, or (at your option)
  6. // any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful, but WITHOUT
  9. // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  10. // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
  11. // more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program. If not, see <http://www.gnu.org/licenses/>.
  15. package model
  16. import (
  17. "io"
  18. "os"
  19. "path/filepath"
  20. "sync"
  21. "github.com/syncthing/syncthing/internal/protocol"
  22. )
  23. // A sharedPullerState is kept for each file that is being synced and is kept
  24. // updated along the way.
  25. type sharedPullerState struct {
  26. // Immutable, does not require locking
  27. file protocol.FileInfo
  28. folder string
  29. tempName string
  30. realName string
  31. reused uint32 // Number of blocks reused from temporary file
  32. // Mutable, must be locked for access
  33. err error // The first error we hit
  34. fd *os.File // The fd of the temp file
  35. copyTotal uint32 // Total number of copy actions for the whole job
  36. pullTotal uint32 // Total number of pull actions for the whole job
  37. copyOrigin uint32 // Number of blocks copied from the original file
  38. copyNeeded uint32 // Number of copy actions still pending
  39. pullNeeded uint32 // Number of block pulls still pending
  40. closed bool // Set when the file has been closed
  41. mut sync.Mutex // Protects the above
  42. }
  43. // A momentary state representing the progress of the puller
  44. type pullerProgress struct {
  45. Total uint32
  46. Reused uint32
  47. CopiedFromOrigin uint32
  48. CopiedFromElsewhere uint32
  49. Pulled uint32
  50. Pulling uint32
  51. BytesDone int64
  52. BytesTotal int64
  53. }
  54. // A lockedWriterAt synchronizes WriteAt calls with an external mutex.
  55. // WriteAt() is goroutine safe by itself, but not against for example Close().
  56. type lockedWriterAt struct {
  57. mut *sync.Mutex
  58. wr io.WriterAt
  59. }
  60. func (w lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
  61. w.mut.Lock()
  62. defer w.mut.Unlock()
  63. return w.wr.WriteAt(p, off)
  64. }
  65. // tempFile returns the fd for the temporary file, reusing an open fd
  66. // or creating the file as necessary.
  67. func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
  68. s.mut.Lock()
  69. defer s.mut.Unlock()
  70. // If we've already hit an error, return early
  71. if s.err != nil {
  72. return nil, s.err
  73. }
  74. // If the temp file is already open, return the file descriptor
  75. if s.fd != nil {
  76. return lockedWriterAt{&s.mut, s.fd}, nil
  77. }
  78. // Ensure that the parent directory is writable. This is
  79. // osutil.InWritableDir except we need to do more stuff so we duplicate it
  80. // here.
  81. dir := filepath.Dir(s.tempName)
  82. if info, err := os.Stat(dir); err != nil {
  83. s.earlyCloseLocked("dst stat dir", err)
  84. return nil, err
  85. } else if info.Mode()&0200 == 0 {
  86. err := os.Chmod(dir, 0755)
  87. if err == nil {
  88. defer func() {
  89. err := os.Chmod(dir, info.Mode().Perm())
  90. if err != nil {
  91. panic(err)
  92. }
  93. }()
  94. }
  95. }
  96. // Attempt to create the temp file
  97. flags := os.O_WRONLY
  98. if s.reused == 0 {
  99. flags |= os.O_CREATE | os.O_EXCL
  100. } else {
  101. // With sufficiently bad luck when exiting or crashing, we may have
  102. // had time to chmod the temp file to read only state but not yet
  103. // moved it to it's final name. This leaves us with a read only temp
  104. // file that we're going to try to reuse. To handle that, we need to
  105. // make sure we have write permissions on the file before opening it.
  106. err := os.Chmod(s.tempName, 0644)
  107. if err != nil {
  108. s.earlyCloseLocked("dst create chmod", err)
  109. return nil, err
  110. }
  111. }
  112. fd, err := os.OpenFile(s.tempName, flags, 0644)
  113. if err != nil {
  114. s.earlyCloseLocked("dst create", err)
  115. return nil, err
  116. }
  117. // Same fd will be used by all writers
  118. s.fd = fd
  119. return lockedWriterAt{&s.mut, s.fd}, nil
  120. }
  121. // sourceFile opens the existing source file for reading
  122. func (s *sharedPullerState) sourceFile() (*os.File, error) {
  123. s.mut.Lock()
  124. defer s.mut.Unlock()
  125. // If we've already hit an error, return early
  126. if s.err != nil {
  127. return nil, s.err
  128. }
  129. // Attempt to open the existing file
  130. fd, err := os.Open(s.realName)
  131. if err != nil {
  132. s.earlyCloseLocked("src open", err)
  133. return nil, err
  134. }
  135. return fd, nil
  136. }
  137. // earlyClose prints a warning message composed of the context and
  138. // error, and marks the sharedPullerState as failed. Is a no-op when called on
  139. // an already failed state.
  140. func (s *sharedPullerState) earlyClose(context string, err error) {
  141. s.mut.Lock()
  142. defer s.mut.Unlock()
  143. s.earlyCloseLocked(context, err)
  144. }
  145. func (s *sharedPullerState) earlyCloseLocked(context string, err error) {
  146. if s.err != nil {
  147. return
  148. }
  149. l.Infof("Puller (folder %q, file %q): %s: %v", s.folder, s.file.Name, context, err)
  150. s.err = err
  151. if s.fd != nil {
  152. s.fd.Close()
  153. }
  154. s.closed = true
  155. }
  156. func (s *sharedPullerState) failed() error {
  157. s.mut.Lock()
  158. defer s.mut.Unlock()
  159. return s.err
  160. }
  161. func (s *sharedPullerState) copyDone() {
  162. s.mut.Lock()
  163. s.copyNeeded--
  164. if debug {
  165. l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
  166. }
  167. s.mut.Unlock()
  168. }
  169. func (s *sharedPullerState) copiedFromOrigin() {
  170. s.mut.Lock()
  171. s.copyOrigin++
  172. s.mut.Unlock()
  173. }
  174. func (s *sharedPullerState) pullStarted() {
  175. s.mut.Lock()
  176. s.copyTotal--
  177. s.copyNeeded--
  178. s.pullTotal++
  179. s.pullNeeded++
  180. if debug {
  181. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
  182. }
  183. s.mut.Unlock()
  184. }
  185. func (s *sharedPullerState) pullDone() {
  186. s.mut.Lock()
  187. s.pullNeeded--
  188. if debug {
  189. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
  190. }
  191. s.mut.Unlock()
  192. }
  193. // finalClose atomically closes and returns closed status of a file. A true
  194. // first return value means the file was closed and should be finished, with
  195. // the error indicating the success or failure of the close. A false first
  196. // return value indicates the file is not ready to be closed, or is already
  197. // closed and should in either case not be finished off now.
  198. func (s *sharedPullerState) finalClose() (bool, error) {
  199. s.mut.Lock()
  200. defer s.mut.Unlock()
  201. if s.pullNeeded+s.copyNeeded != 0 {
  202. // Not done yet.
  203. return false, nil
  204. }
  205. if s.closed {
  206. // Already handled.
  207. return false, nil
  208. }
  209. s.closed = true
  210. if fd := s.fd; fd != nil {
  211. s.fd = nil
  212. return true, fd.Close()
  213. }
  214. return true, nil
  215. }
  216. // Returns the momentarily progress for the puller
  217. func (s *sharedPullerState) Progress() *pullerProgress {
  218. s.mut.Lock()
  219. defer s.mut.Unlock()
  220. total := s.reused + s.copyTotal + s.pullTotal
  221. done := total - s.copyNeeded - s.pullNeeded
  222. return &pullerProgress{
  223. Total: total,
  224. Reused: s.reused,
  225. CopiedFromOrigin: s.copyOrigin,
  226. CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
  227. Pulled: s.pullTotal - s.pullNeeded,
  228. Pulling: s.pullNeeded,
  229. BytesTotal: protocol.BlocksToSize(total),
  230. BytesDone: protocol.BlocksToSize(done),
  231. }
  232. }