folder.go 9.8 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "context"
  9. "errors"
  10. "fmt"
  11. "math/rand"
  12. "time"
  13. "github.com/syncthing/syncthing/lib/config"
  14. "github.com/syncthing/syncthing/lib/db"
  15. "github.com/syncthing/syncthing/lib/ignore"
  16. "github.com/syncthing/syncthing/lib/protocol"
  17. "github.com/syncthing/syncthing/lib/sync"
  18. "github.com/syncthing/syncthing/lib/watchaggregator"
  19. )
  20. var errWatchNotStarted = errors.New("not started")
  21. type folder struct {
  22. stateTracker
  23. config.FolderConfiguration
  24. model *Model
  25. shortID protocol.ShortID
  26. ctx context.Context
  27. cancel context.CancelFunc
  28. scanInterval time.Duration
  29. scanTimer *time.Timer
  30. scanNow chan rescanRequest
  31. scanDelay chan time.Duration
  32. initialScanFinished chan struct{}
  33. pullScheduled chan struct{}
  34. watchCancel context.CancelFunc
  35. watchChan chan []string
  36. restartWatchChan chan struct{}
  37. watchErr error
  38. watchErrMut sync.Mutex
  39. puller puller
  40. }
  41. type rescanRequest struct {
  42. subdirs []string
  43. err chan error
  44. }
  45. type puller interface {
  46. pull() bool // true when successfull and should not be retried
  47. }
  48. func newFolder(model *Model, cfg config.FolderConfiguration) folder {
  49. ctx, cancel := context.WithCancel(context.Background())
  50. return folder{
  51. stateTracker: newStateTracker(cfg.ID),
  52. FolderConfiguration: cfg,
  53. model: model,
  54. shortID: model.shortID,
  55. ctx: ctx,
  56. cancel: cancel,
  57. scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second,
  58. scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
  59. scanNow: make(chan rescanRequest),
  60. scanDelay: make(chan time.Duration),
  61. initialScanFinished: make(chan struct{}),
  62. pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
  63. watchCancel: func() {},
  64. watchErr: errWatchNotStarted,
  65. watchErrMut: sync.NewMutex(),
  66. }
  67. }
  68. func (f *folder) Serve() {
  69. l.Debugln(f, "starting")
  70. defer l.Debugln(f, "exiting")
  71. defer func() {
  72. f.scanTimer.Stop()
  73. f.setState(FolderIdle)
  74. }()
  75. pause := f.basePause()
  76. pullFailTimer := time.NewTimer(0)
  77. <-pullFailTimer.C
  78. if f.FSWatcherEnabled && f.CheckHealth() == nil {
  79. f.startWatch()
  80. }
  81. initialCompleted := f.initialScanFinished
  82. for {
  83. select {
  84. case <-f.ctx.Done():
  85. return
  86. case <-f.pullScheduled:
  87. pullFailTimer.Stop()
  88. select {
  89. case <-pullFailTimer.C:
  90. default:
  91. }
  92. if !f.puller.pull() {
  93. // Pulling failed, try again later.
  94. pullFailTimer.Reset(pause)
  95. }
  96. case <-pullFailTimer.C:
  97. if f.puller.pull() {
  98. // We're good. Don't schedule another fail pull and reset
  99. // the pause interval.
  100. pause = f.basePause()
  101. continue
  102. }
  103. // Pulling failed, try again later.
  104. l.Infof("Folder %v isn't making sync progress - retrying in %v.", f.Description(), pause)
  105. pullFailTimer.Reset(pause)
  106. // Back off from retrying to pull with an upper limit.
  107. if pause < 60*f.basePause() {
  108. pause *= 2
  109. }
  110. case <-initialCompleted:
  111. // Initial scan has completed, we should do a pull
  112. initialCompleted = nil // never hit this case again
  113. if !f.puller.pull() {
  114. // Pulling failed, try again later.
  115. pullFailTimer.Reset(pause)
  116. }
  117. // The reason for running the scanner from within the puller is that
  118. // this is the easiest way to make sure we are not doing both at the
  119. // same time.
  120. case <-f.scanTimer.C:
  121. l.Debugln(f, "Scanning subdirectories")
  122. f.scanTimerFired()
  123. case req := <-f.scanNow:
  124. req.err <- f.scanSubdirs(req.subdirs)
  125. case next := <-f.scanDelay:
  126. f.scanTimer.Reset(next)
  127. case fsEvents := <-f.watchChan:
  128. l.Debugln(f, "filesystem notification rescan")
  129. f.scanSubdirs(fsEvents)
  130. case <-f.restartWatchChan:
  131. f.restartWatch()
  132. }
  133. }
  134. }
  135. func (f *folder) BringToFront(string) {}
  136. func (f *folder) Override(fs *db.FileSet, updateFn func([]protocol.FileInfo)) {}
  137. func (f *folder) DelayScan(next time.Duration) {
  138. f.Delay(next)
  139. }
  140. func (f *folder) IgnoresUpdated() {
  141. if f.FSWatcherEnabled {
  142. f.scheduleWatchRestart()
  143. }
  144. }
  145. func (f *folder) SchedulePull() {
  146. select {
  147. case f.pullScheduled <- struct{}{}:
  148. default:
  149. // We might be busy doing a pull and thus not reading from this
  150. // channel. The channel is 1-buffered, so one notification will be
  151. // queued to ensure we recheck after the pull, but beyond that we must
  152. // make sure to not block index receiving.
  153. }
  154. }
  155. func (f *folder) Jobs() ([]string, []string) {
  156. return nil, nil
  157. }
  158. func (f *folder) Scan(subdirs []string) error {
  159. <-f.initialScanFinished
  160. req := rescanRequest{
  161. subdirs: subdirs,
  162. err: make(chan error),
  163. }
  164. f.scanNow <- req
  165. return <-req.err
  166. }
  167. func (f *folder) Reschedule() {
  168. if f.scanInterval == 0 {
  169. return
  170. }
  171. // Sleep a random time between 3/4 and 5/4 of the configured interval.
  172. sleepNanos := (f.scanInterval.Nanoseconds()*3 + rand.Int63n(2*f.scanInterval.Nanoseconds())) / 4
  173. interval := time.Duration(sleepNanos) * time.Nanosecond
  174. l.Debugln(f, "next rescan in", interval)
  175. f.scanTimer.Reset(interval)
  176. }
  177. func (f *folder) Delay(next time.Duration) {
  178. f.scanDelay <- next
  179. }
  180. func (f *folder) Stop() {
  181. f.cancel()
  182. }
  183. // CheckHealth checks the folder for common errors, updates the folder state
  184. // and returns the current folder error, or nil if the folder is healthy.
  185. func (f *folder) CheckHealth() error {
  186. err := f.getHealthError()
  187. f.setError(err)
  188. return err
  189. }
  190. func (f *folder) getHealthError() error {
  191. // Check for folder errors, with the most serious and specific first and
  192. // generic ones like out of space on the home disk later.
  193. if err := f.CheckPath(); err != nil {
  194. return err
  195. }
  196. if err := f.CheckFreeSpace(); err != nil {
  197. return err
  198. }
  199. if err := f.model.cfg.CheckHomeFreeSpace(); err != nil {
  200. return err
  201. }
  202. return nil
  203. }
  204. func (f *folder) scanSubdirs(subDirs []string) error {
  205. if err := f.model.internalScanFolderSubdirs(f.ctx, f.folderID, subDirs); err != nil {
  206. // Potentially sets the error twice, once in the scanner just
  207. // by doing a check, and once here, if the error returned is
  208. // the same one as returned by CheckHealth, though
  209. // duplicate set is handled by setError.
  210. f.setError(err)
  211. return err
  212. }
  213. return nil
  214. }
  215. func (f *folder) scanTimerFired() {
  216. err := f.scanSubdirs(nil)
  217. select {
  218. case <-f.initialScanFinished:
  219. default:
  220. status := "Completed"
  221. if err != nil {
  222. status = "Failed"
  223. }
  224. l.Infoln(status, "initial scan of", f.Type.String(), "folder", f.Description())
  225. close(f.initialScanFinished)
  226. }
  227. f.Reschedule()
  228. }
  229. func (f *folder) WatchError() error {
  230. f.watchErrMut.Lock()
  231. defer f.watchErrMut.Unlock()
  232. return f.watchErr
  233. }
  234. // stopWatch immediately aborts watching and may be called asynchronously
  235. func (f *folder) stopWatch() {
  236. f.watchCancel()
  237. f.watchErrMut.Lock()
  238. f.watchErr = errWatchNotStarted
  239. f.watchErrMut.Unlock()
  240. }
  241. // scheduleWatchRestart makes sure watching is restarted from the main for loop
  242. // in a folder's Serve and thus may be called asynchronously (e.g. when ignores change).
  243. func (f *folder) scheduleWatchRestart() {
  244. select {
  245. case f.restartWatchChan <- struct{}{}:
  246. default:
  247. // We might be busy doing a pull and thus not reading from this
  248. // channel. The channel is 1-buffered, so one notification will be
  249. // queued to ensure we recheck after the pull.
  250. }
  251. }
  252. // restartWatch should only ever be called synchronously. If you want to use
  253. // this asynchronously, you should probably use scheduleWatchRestart instead.
  254. func (f *folder) restartWatch() {
  255. f.stopWatch()
  256. f.startWatch()
  257. f.Scan(nil)
  258. }
  259. // startWatch should only ever be called synchronously. If you want to use
  260. // this asynchronously, you should probably use scheduleWatchRestart instead.
  261. func (f *folder) startWatch() {
  262. ctx, cancel := context.WithCancel(f.ctx)
  263. f.model.fmut.RLock()
  264. ignores := f.model.folderIgnores[f.folderID]
  265. f.model.fmut.RUnlock()
  266. f.watchChan = make(chan []string)
  267. f.watchCancel = cancel
  268. go f.startWatchAsync(ctx, ignores)
  269. }
  270. // startWatchAsync tries to start the filesystem watching and retries every minute on failure.
  271. // It is a convenience function that should not be used except in startWatch.
  272. func (f *folder) startWatchAsync(ctx context.Context, ignores *ignore.Matcher) {
  273. timer := time.NewTimer(0)
  274. for {
  275. select {
  276. case <-timer.C:
  277. eventChan, err := f.Filesystem().Watch(".", ignores, ctx, f.IgnorePerms)
  278. f.watchErrMut.Lock()
  279. prevErr := f.watchErr
  280. f.watchErr = err
  281. f.watchErrMut.Unlock()
  282. if err != nil {
  283. if prevErr == errWatchNotStarted {
  284. l.Warnf("Failed to start filesystem watcher for folder %s: %v", f.Description(), err)
  285. } else {
  286. l.Debugf("Failed to start filesystem watcher for folder %s again: %v", f.Description(), err)
  287. }
  288. timer.Reset(time.Minute)
  289. continue
  290. }
  291. watchaggregator.Aggregate(eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, ctx)
  292. l.Debugln("Started filesystem watcher for folder", f.Description())
  293. return
  294. case <-ctx.Done():
  295. return
  296. }
  297. }
  298. }
  299. func (f *folder) setError(err error) {
  300. _, _, oldErr := f.getState()
  301. if (err != nil && oldErr != nil && oldErr.Error() == err.Error()) || (err == nil && oldErr == nil) {
  302. return
  303. }
  304. if err != nil {
  305. if oldErr == nil {
  306. l.Warnf("Error on folder %s: %v", f.Description(), err)
  307. } else {
  308. l.Infof("Error on folder %s changed: %q -> %q", f.Description(), oldErr, err)
  309. }
  310. } else {
  311. l.Infoln("Cleared error on folder", f.Description())
  312. }
  313. if f.FSWatcherEnabled {
  314. if err != nil {
  315. f.stopWatch()
  316. } else {
  317. f.scheduleWatchRestart()
  318. }
  319. }
  320. f.stateTracker.setError(err)
  321. }
  322. func (f *folder) basePause() time.Duration {
  323. if f.PullerPauseS == 0 {
  324. return defaultPullerPause
  325. }
  326. return time.Duration(f.PullerPauseS) * time.Second
  327. }
  328. func (f *folder) String() string {
  329. return fmt.Sprintf("%s/%s@%p", f.Type, f.folderID, f)
  330. }