sharedpullerstate.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
// Copyright (C) 2014 The Syncthing Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along
// with this program. If not, see <http://www.gnu.org/licenses/>.
  15. package model
  16. import (
  17. "os"
  18. "path/filepath"
  19. "sync"
  20. "github.com/syncthing/syncthing/internal/protocol"
  21. )
  22. // A sharedPullerState is kept for each file that is being synced and is kept
  23. // updated along the way.
  24. type sharedPullerState struct {
  25. // Immutable, does not require locking
  26. file protocol.FileInfo
  27. folder string
  28. tempName string
  29. realName string
  30. reused uint32 // Number of blocks reused from temporary file
  31. // Mutable, must be locked for access
  32. err error // The first error we hit
  33. fd *os.File // The fd of the temp file
  34. copyTotal uint32 // Total number of copy actions for the whole job
  35. pullTotal uint32 // Total number of pull actions for the whole job
  36. copyOrigin uint32 // Number of blocks copied from the original file
  37. copyNeeded uint32 // Number of copy actions still pending
  38. pullNeeded uint32 // Number of block pulls still pending
  39. closed bool // Set when the file has been closed
  40. mut sync.Mutex // Protects the above
  41. }
  42. // A momentary state representing the progress of the puller
  43. type pullerProgress struct {
  44. Total uint32
  45. Reused uint32
  46. CopiedFromOrigin uint32
  47. CopiedFromElsewhere uint32
  48. Pulled uint32
  49. Pulling uint32
  50. BytesDone int64
  51. BytesTotal int64
  52. }
  53. // tempFile returns the fd for the temporary file, reusing an open fd
  54. // or creating the file as necessary.
  55. func (s *sharedPullerState) tempFile() (*os.File, error) {
  56. s.mut.Lock()
  57. defer s.mut.Unlock()
  58. // If we've already hit an error, return early
  59. if s.err != nil {
  60. return nil, s.err
  61. }
  62. // If the temp file is already open, return the file descriptor
  63. if s.fd != nil {
  64. return s.fd, nil
  65. }
  66. // Ensure that the parent directory is writable. This is
  67. // osutil.InWritableDir except we need to do more stuff so we duplicate it
  68. // here.
  69. dir := filepath.Dir(s.tempName)
  70. if info, err := os.Stat(dir); err != nil {
  71. s.earlyCloseLocked("dst stat dir", err)
  72. return nil, err
  73. } else if info.Mode()&0200 == 0 {
  74. err := os.Chmod(dir, 0755)
  75. if err == nil {
  76. defer func() {
  77. err := os.Chmod(dir, info.Mode().Perm())
  78. if err != nil {
  79. panic(err)
  80. }
  81. }()
  82. }
  83. }
  84. // Attempt to create the temp file
  85. flags := os.O_WRONLY
  86. if s.reused == 0 {
  87. flags |= os.O_CREATE | os.O_EXCL
  88. }
  89. fd, err := os.OpenFile(s.tempName, flags, 0644)
  90. if err != nil {
  91. s.earlyCloseLocked("dst create", err)
  92. return nil, err
  93. }
  94. // Same fd will be used by all writers
  95. s.fd = fd
  96. return fd, nil
  97. }
  98. // sourceFile opens the existing source file for reading
  99. func (s *sharedPullerState) sourceFile() (*os.File, error) {
  100. s.mut.Lock()
  101. defer s.mut.Unlock()
  102. // If we've already hit an error, return early
  103. if s.err != nil {
  104. return nil, s.err
  105. }
  106. // Attempt to open the existing file
  107. fd, err := os.Open(s.realName)
  108. if err != nil {
  109. s.earlyCloseLocked("src open", err)
  110. return nil, err
  111. }
  112. return fd, nil
  113. }
  114. // earlyClose prints a warning message composed of the context and
  115. // error, and marks the sharedPullerState as failed. Is a no-op when called on
  116. // an already failed state.
  117. func (s *sharedPullerState) earlyClose(context string, err error) {
  118. s.mut.Lock()
  119. defer s.mut.Unlock()
  120. s.earlyCloseLocked(context, err)
  121. }
  122. func (s *sharedPullerState) earlyCloseLocked(context string, err error) {
  123. if s.err != nil {
  124. return
  125. }
  126. l.Infof("Puller (folder %q, file %q): %s: %v", s.folder, s.file.Name, context, err)
  127. s.err = err
  128. if s.fd != nil {
  129. s.fd.Close()
  130. }
  131. s.closed = true
  132. }
  133. func (s *sharedPullerState) failed() error {
  134. s.mut.Lock()
  135. defer s.mut.Unlock()
  136. return s.err
  137. }
  138. func (s *sharedPullerState) copyDone() {
  139. s.mut.Lock()
  140. s.copyNeeded--
  141. if debug {
  142. l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
  143. }
  144. s.mut.Unlock()
  145. }
  146. func (s *sharedPullerState) copiedFromOrigin() {
  147. s.mut.Lock()
  148. s.copyOrigin++
  149. s.mut.Unlock()
  150. }
  151. func (s *sharedPullerState) pullStarted() {
  152. s.mut.Lock()
  153. s.copyTotal--
  154. s.copyNeeded--
  155. s.pullTotal++
  156. s.pullNeeded++
  157. if debug {
  158. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
  159. }
  160. s.mut.Unlock()
  161. }
  162. func (s *sharedPullerState) pullDone() {
  163. s.mut.Lock()
  164. s.pullNeeded--
  165. if debug {
  166. l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
  167. }
  168. s.mut.Unlock()
  169. }
  170. // finalClose atomically closes and returns closed status of a file. A true
  171. // first return value means the file was closed and should be finished, with
  172. // the error indicating the success or failure of the close. A false first
  173. // return value indicates the file is not ready to be closed, or is already
  174. // closed and should in either case not be finished off now.
  175. func (s *sharedPullerState) finalClose() (bool, error) {
  176. s.mut.Lock()
  177. defer s.mut.Unlock()
  178. if s.pullNeeded+s.copyNeeded != 0 {
  179. // Not done yet.
  180. return false, nil
  181. }
  182. if s.closed {
  183. // Already handled.
  184. return false, nil
  185. }
  186. s.closed = true
  187. if fd := s.fd; fd != nil {
  188. s.fd = nil
  189. return true, fd.Close()
  190. }
  191. return true, nil
  192. }
  193. // Returns the momentarily progress for the puller
  194. func (s *sharedPullerState) Progress() *pullerProgress {
  195. s.mut.Lock()
  196. defer s.mut.Unlock()
  197. total := s.reused + s.copyTotal + s.pullTotal
  198. done := total - s.copyNeeded - s.pullNeeded
  199. return &pullerProgress{
  200. Total: total,
  201. Reused: s.reused,
  202. CopiedFromOrigin: s.copyOrigin,
  203. CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
  204. Pulled: s.pullTotal - s.pullNeeded,
  205. Pulling: s.pullNeeded,
  206. BytesTotal: protocol.BlocksToSize(total),
  207. BytesDone: protocol.BlocksToSize(done),
  208. }
  209. }