folder.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "context"
  9. "errors"
  10. "time"
  11. "github.com/syncthing/syncthing/lib/config"
  12. "github.com/syncthing/syncthing/lib/ignore"
  13. "github.com/syncthing/syncthing/lib/protocol"
  14. "github.com/syncthing/syncthing/lib/sync"
  15. "github.com/syncthing/syncthing/lib/watchaggregator"
  16. )
  17. var errWatchNotStarted error = errors.New("not started")
// folder holds the state and the main loop (Serve) that handle scanning,
// pulling and filesystem watching for a single configured folder.
type folder struct {
	// Embedded state tracker; provides getState/setState/setError.
	stateTracker
	// Embedded configuration of this folder.
	config.FolderConfiguration

	// The model this folder belongs to.
	model *Model
	// Copied from model.shortID when the folder is created.
	shortID protocol.ShortID
	// ctx is cancelled by Stop (via cancel) and ends Serve.
	ctx context.Context
	cancel context.CancelFunc

	// Scanner state: rescan timer plus the channels Serve listens on for
	// scan requests and delays.
	scan folderScanner
	// Closed by scanTimerFired once the first scan attempt has completed.
	initialScanFinished chan struct{}

	// Signals Serve that a pull should be performed.
	pullScheduled chan struct{}

	// Cancels the context of the currently running watcher.
	watchCancel context.CancelFunc
	// Receives aggregated filesystem events; replaced by startWatch.
	watchChan chan []string
	// Signals Serve that the watcher should be restarted.
	restartWatchChan chan struct{}
	// Last error from starting the watcher; guarded by watchErrMut.
	watchErr error
	watchErrMut sync.Mutex

	// Performs the actual pulling; called from Serve. It is not set by
	// newFolder.
	puller puller
}
// puller is implemented by whatever performs the actual pull for a folder.
// Serve calls pull and schedules a retry when it returns false.
type puller interface {
	pull() bool // true when successful and should not be retried
}
  38. func newFolder(model *Model, cfg config.FolderConfiguration) folder {
  39. ctx, cancel := context.WithCancel(context.Background())
  40. return folder{
  41. stateTracker: newStateTracker(cfg.ID),
  42. FolderConfiguration: cfg,
  43. model: model,
  44. shortID: model.shortID,
  45. ctx: ctx,
  46. cancel: cancel,
  47. scan: newFolderScanner(cfg),
  48. initialScanFinished: make(chan struct{}),
  49. pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
  50. watchCancel: func() {},
  51. watchErr: errWatchNotStarted,
  52. watchErrMut: sync.NewMutex(),
  53. }
  54. }
  55. func (f *folder) Serve() {
  56. l.Debugln(f, "starting")
  57. defer l.Debugln(f, "exiting")
  58. defer func() {
  59. f.scan.timer.Stop()
  60. f.setState(FolderIdle)
  61. }()
  62. pause := f.basePause()
  63. pullFailTimer := time.NewTimer(0)
  64. <-pullFailTimer.C
  65. if f.FSWatcherEnabled && f.CheckHealth() == nil {
  66. f.startWatch()
  67. }
  68. initialCompleted := f.initialScanFinished
  69. for {
  70. select {
  71. case <-f.ctx.Done():
  72. return
  73. case <-f.pullScheduled:
  74. pullFailTimer.Stop()
  75. select {
  76. case <-pullFailTimer.C:
  77. default:
  78. }
  79. if !f.puller.pull() {
  80. // Pulling failed, try again later.
  81. pullFailTimer.Reset(pause)
  82. }
  83. case <-pullFailTimer.C:
  84. if f.puller.pull() {
  85. // We're good. Don't schedule another fail pull and reset
  86. // the pause interval.
  87. pause = f.basePause()
  88. continue
  89. }
  90. // Pulling failed, try again later.
  91. l.Infof("Folder %v isn't making sync progress - retrying in %v.", f.Description(), pause)
  92. pullFailTimer.Reset(pause)
  93. // Back off from retrying to pull with an upper limit.
  94. if pause < 60*f.basePause() {
  95. pause *= 2
  96. }
  97. case <-initialCompleted:
  98. // Initial scan has completed, we should do a pull
  99. initialCompleted = nil // never hit this case again
  100. if !f.puller.pull() {
  101. // Pulling failed, try again later.
  102. pullFailTimer.Reset(pause)
  103. }
  104. // The reason for running the scanner from within the puller is that
  105. // this is the easiest way to make sure we are not doing both at the
  106. // same time.
  107. case <-f.scan.timer.C:
  108. l.Debugln(f, "Scanning subdirectories")
  109. f.scanTimerFired()
  110. case req := <-f.scan.now:
  111. req.err <- f.scanSubdirs(req.subdirs)
  112. case next := <-f.scan.delay:
  113. f.scan.timer.Reset(next)
  114. case fsEvents := <-f.watchChan:
  115. l.Debugln(f, "filesystem notification rescan")
  116. f.scanSubdirs(fsEvents)
  117. case <-f.restartWatchChan:
  118. f.restartWatch()
  119. }
  120. }
  121. }
// BringToFront does nothing in this implementation; its argument is ignored.
func (f *folder) BringToFront(string) {}
// DelayScan forwards next to the folder scanner's Delay.
// NOTE(review): presumably this reaches Serve through f.scan.delay, where
// the scan timer is reset to next - confirm in folderScanner.
func (f *folder) DelayScan(next time.Duration) {
	f.scan.Delay(next)
}
  126. func (f *folder) IgnoresUpdated() {
  127. if f.FSWatcherEnabled {
  128. f.scheduleWatchRestart()
  129. }
  130. }
  131. func (f *folder) SchedulePull() {
  132. select {
  133. case f.pullScheduled <- struct{}{}:
  134. default:
  135. // We might be busy doing a pull and thus not reading from this
  136. // channel. The channel is 1-buffered, so one notification will be
  137. // queued to ensure we recheck after the pull, but beyond that we must
  138. // make sure to not block index receiving.
  139. }
  140. }
// Jobs always returns two nil slices in this implementation.
// NOTE(review): the meaning of the two slices is not visible here - confirm
// against the callers.
func (f *folder) Jobs() ([]string, []string) {
	return nil, nil
}
  144. func (f *folder) Scan(subdirs []string) error {
  145. <-f.initialScanFinished
  146. return f.scan.Scan(subdirs)
  147. }
// Stop cancels the folder context. This makes Serve return and also cancels
// the watcher contexts that startWatch derives from it.
func (f *folder) Stop() {
	f.cancel()
}
  151. // CheckHealth checks the folder for common errors, updates the folder state
  152. // and returns the current folder error, or nil if the folder is healthy.
  153. func (f *folder) CheckHealth() error {
  154. err := f.getHealthError()
  155. f.setError(err)
  156. return err
  157. }
  158. func (f *folder) getHealthError() error {
  159. // Check for folder errors, with the most serious and specific first and
  160. // generic ones like out of space on the home disk later.
  161. if err := f.CheckPath(); err != nil {
  162. return err
  163. }
  164. if err := f.CheckFreeSpace(); err != nil {
  165. return err
  166. }
  167. if err := f.model.cfg.CheckHomeFreeSpace(); err != nil {
  168. return err
  169. }
  170. return nil
  171. }
  172. func (f *folder) scanSubdirs(subDirs []string) error {
  173. if err := f.model.internalScanFolderSubdirs(f.ctx, f.folderID, subDirs); err != nil {
  174. // Potentially sets the error twice, once in the scanner just
  175. // by doing a check, and once here, if the error returned is
  176. // the same one as returned by CheckHealth, though
  177. // duplicate set is handled by setError.
  178. f.setError(err)
  179. return err
  180. }
  181. return nil
  182. }
  183. func (f *folder) scanTimerFired() {
  184. err := f.scanSubdirs(nil)
  185. select {
  186. case <-f.initialScanFinished:
  187. default:
  188. status := "Completed"
  189. if err != nil {
  190. status = "Failed"
  191. }
  192. l.Infoln(status, "initial scan of", f.Type.String(), "folder", f.Description())
  193. close(f.initialScanFinished)
  194. }
  195. f.scan.Reschedule()
  196. }
  197. func (f *folder) WatchError() error {
  198. f.watchErrMut.Lock()
  199. defer f.watchErrMut.Unlock()
  200. return f.watchErr
  201. }
  202. // stopWatch immediately aborts watching and may be called asynchronously
  203. func (f *folder) stopWatch() {
  204. f.watchCancel()
  205. f.watchErrMut.Lock()
  206. f.watchErr = errWatchNotStarted
  207. f.watchErrMut.Unlock()
  208. }
  209. // scheduleWatchRestart makes sure watching is restarted from the main for loop
  210. // in a folder's Serve and thus may be called asynchronously (e.g. when ignores change).
  211. func (f *folder) scheduleWatchRestart() {
  212. select {
  213. case f.restartWatchChan <- struct{}{}:
  214. default:
  215. // We might be busy doing a pull and thus not reading from this
  216. // channel. The channel is 1-buffered, so one notification will be
  217. // queued to ensure we recheck after the pull.
  218. }
  219. }
  220. // restartWatch should only ever be called synchronously. If you want to use
  221. // this asynchronously, you should probably use scheduleWatchRestart instead.
  222. func (f *folder) restartWatch() {
  223. f.stopWatch()
  224. f.startWatch()
  225. f.Scan(nil)
  226. }
  227. // startWatch should only ever be called synchronously. If you want to use
  228. // this asynchronously, you should probably use scheduleWatchRestart instead.
  229. func (f *folder) startWatch() {
  230. ctx, cancel := context.WithCancel(f.ctx)
  231. f.model.fmut.RLock()
  232. ignores := f.model.folderIgnores[f.folderID]
  233. f.model.fmut.RUnlock()
  234. f.watchChan = make(chan []string)
  235. f.watchCancel = cancel
  236. go f.startWatchAsync(ctx, ignores)
  237. }
  238. // startWatchAsync tries to start the filesystem watching and retries every minute on failure.
  239. // It is a convenience function that should not be used except in startWatch.
  240. func (f *folder) startWatchAsync(ctx context.Context, ignores *ignore.Matcher) {
  241. timer := time.NewTimer(0)
  242. for {
  243. select {
  244. case <-timer.C:
  245. eventChan, err := f.Filesystem().Watch(".", ignores, ctx, f.IgnorePerms)
  246. f.watchErrMut.Lock()
  247. prevErr := f.watchErr
  248. f.watchErr = err
  249. f.watchErrMut.Unlock()
  250. if err != nil {
  251. if prevErr == errWatchNotStarted {
  252. l.Warnf("Failed to start filesystem watcher for folder %s: %v", f.Description(), err)
  253. } else {
  254. l.Debugf("Failed to start filesystem watcher for folder %s again: %v", f.Description(), err)
  255. }
  256. timer.Reset(time.Minute)
  257. continue
  258. }
  259. watchaggregator.Aggregate(eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, ctx)
  260. l.Debugln("Started filesystem watcher for folder", f.Description())
  261. return
  262. case <-ctx.Done():
  263. return
  264. }
  265. }
  266. }
  267. func (f *folder) setError(err error) {
  268. _, _, oldErr := f.getState()
  269. if (err != nil && oldErr != nil && oldErr.Error() == err.Error()) || (err == nil && oldErr == nil) {
  270. return
  271. }
  272. if err != nil {
  273. if oldErr == nil {
  274. l.Warnf("Error on folder %s: %v", f.Description(), err)
  275. } else {
  276. l.Infof("Error on folder %s changed: %q -> %q", f.Description(), oldErr, err)
  277. }
  278. } else {
  279. l.Infoln("Cleared error on folder", f.Description())
  280. }
  281. if f.FSWatcherEnabled {
  282. if err != nil {
  283. f.stopWatch()
  284. } else {
  285. f.scheduleWatchRestart()
  286. }
  287. }
  288. f.stateTracker.setError(err)
  289. }
  290. func (f *folder) basePause() time.Duration {
  291. if f.PullerPauseS == 0 {
  292. return defaultPullerPause
  293. }
  294. return time.Duration(f.PullerPauseS) * time.Second
  295. }