folder.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "context"
  9. "errors"
  10. "fmt"
  11. "math/rand"
  12. "time"
  13. "github.com/syncthing/syncthing/lib/config"
  14. "github.com/syncthing/syncthing/lib/db"
  15. "github.com/syncthing/syncthing/lib/events"
  16. "github.com/syncthing/syncthing/lib/ignore"
  17. "github.com/syncthing/syncthing/lib/protocol"
  18. "github.com/syncthing/syncthing/lib/sync"
  19. "github.com/syncthing/syncthing/lib/watchaggregator"
  20. )
// errWatchNotStarted is the watch error recorded by stopWatch while the
// filesystem watcher is not running.
var errWatchNotStarted = errors.New("not started")
// folder holds the state and the Serve loop shared by folder
// implementations: periodic and on-demand scanning, pull scheduling with
// retries, and filesystem watcher management.
type folder struct {
	stateTracker
	config.FolderConfiguration
	// localFlags is passed along to every scan started by scanSubdirs.
	localFlags uint32

	model   *Model
	shortID protocol.ShortID // copied from model.shortID in newFolder
	// ctx is cancelled by Stop (via cancel); Serve returns once it is done.
	ctx    context.Context
	cancel context.CancelFunc

	// scanInterval is RescanIntervalS as a duration; zero makes Reschedule
	// a no-op.
	scanInterval time.Duration
	// scanTimer triggers a full scan from Serve when it fires.
	scanTimer *time.Timer
	// scanNow carries requests from Scan to Serve.
	scanNow chan rescanRequest
	// scanDelay carries durations from Delay to Serve, which resets
	// scanTimer to them.
	scanDelay chan time.Duration
	// initialScanFinished is closed by scanTimerFired after the first
	// timer-driven scan, whether it succeeded or not.
	initialScanFinished chan struct{}
	// stopped is closed when Serve returns.
	stopped chan struct{}

	// pullScheduled is a 1-buffered request for Serve to run a pull.
	pullScheduled chan struct{}

	// watchCancel cancels the context of the current watcher; it is a
	// no-op function until startWatch replaces it.
	watchCancel context.CancelFunc
	// watchChan is handed to watchaggregator.Aggregate; Serve scans the
	// paths it receives on it. It is nil until startWatch runs.
	watchChan chan []string
	// restartWatchChan is a 1-buffered request for Serve to restart the
	// watcher.
	restartWatchChan chan struct{}
	// watchErr is the most recently recorded watcher error; it is read and
	// written under watchErrMut.
	watchErr    error
	watchErrMut sync.Mutex

	// puller performs the pulls requested from Serve. It is not set by
	// newFolder.
	puller puller
}
// rescanRequest is sent from Scan to Serve over the scanNow channel.
type rescanRequest struct {
	// subdirs is passed unchanged to scanSubdirs.
	subdirs []string
	// err receives the result of the scan; Serve sends exactly one value.
	err chan error
}
// puller is what Serve calls to perform a pull.
type puller interface {
	pull() bool // true when successful and should not be retried
}
  51. func newFolder(model *Model, cfg config.FolderConfiguration) folder {
  52. ctx, cancel := context.WithCancel(context.Background())
  53. return folder{
  54. stateTracker: newStateTracker(cfg.ID),
  55. FolderConfiguration: cfg,
  56. model: model,
  57. shortID: model.shortID,
  58. ctx: ctx,
  59. cancel: cancel,
  60. scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second,
  61. scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
  62. scanNow: make(chan rescanRequest),
  63. scanDelay: make(chan time.Duration),
  64. initialScanFinished: make(chan struct{}),
  65. stopped: make(chan struct{}),
  66. pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
  67. watchCancel: func() {},
  68. restartWatchChan: make(chan struct{}, 1),
  69. watchErrMut: sync.NewMutex(),
  70. }
  71. }
  72. func (f *folder) Serve() {
  73. l.Debugln(f, "starting")
  74. defer l.Debugln(f, "exiting")
  75. defer func() {
  76. f.scanTimer.Stop()
  77. f.setState(FolderIdle)
  78. close(f.stopped)
  79. }()
  80. pause := f.basePause()
  81. pullFailTimer := time.NewTimer(0)
  82. <-pullFailTimer.C
  83. if f.FSWatcherEnabled && f.CheckHealth() == nil {
  84. f.startWatch()
  85. }
  86. initialCompleted := f.initialScanFinished
  87. for {
  88. select {
  89. case <-f.ctx.Done():
  90. return
  91. case <-f.pullScheduled:
  92. pullFailTimer.Stop()
  93. select {
  94. case <-pullFailTimer.C:
  95. default:
  96. }
  97. if !f.puller.pull() {
  98. // Pulling failed, try again later.
  99. pullFailTimer.Reset(pause)
  100. }
  101. case <-pullFailTimer.C:
  102. if f.puller.pull() {
  103. // We're good. Don't schedule another fail pull and reset
  104. // the pause interval.
  105. pause = f.basePause()
  106. continue
  107. }
  108. // Pulling failed, try again later.
  109. l.Infof("Folder %v isn't making sync progress - retrying in %v.", f.Description(), pause)
  110. pullFailTimer.Reset(pause)
  111. // Back off from retrying to pull with an upper limit.
  112. if pause < 60*f.basePause() {
  113. pause *= 2
  114. }
  115. case <-initialCompleted:
  116. // Initial scan has completed, we should do a pull
  117. initialCompleted = nil // never hit this case again
  118. if !f.puller.pull() {
  119. // Pulling failed, try again later.
  120. pullFailTimer.Reset(pause)
  121. }
  122. // The reason for running the scanner from within the puller is that
  123. // this is the easiest way to make sure we are not doing both at the
  124. // same time.
  125. case <-f.scanTimer.C:
  126. l.Debugln(f, "Scanning subdirectories")
  127. f.scanTimerFired()
  128. case req := <-f.scanNow:
  129. req.err <- f.scanSubdirs(req.subdirs)
  130. case next := <-f.scanDelay:
  131. f.scanTimer.Reset(next)
  132. case fsEvents := <-f.watchChan:
  133. l.Debugln(f, "filesystem notification rescan")
  134. f.scanSubdirs(fsEvents)
  135. case <-f.restartWatchChan:
  136. f.restartWatch()
  137. }
  138. }
  139. }
// BringToFront is a no-op in this base implementation; the argument is
// ignored.
func (f *folder) BringToFront(string) {}
// Override is a no-op in this base implementation; both arguments are
// ignored.
func (f *folder) Override(fs *db.FileSet, updateFn func([]protocol.FileInfo)) {}
// Revert is a no-op in this base implementation; both arguments are
// ignored.
func (f *folder) Revert(fs *db.FileSet, updateFn func([]protocol.FileInfo)) {}
// DelayScan forwards next to Delay.
func (f *folder) DelayScan(next time.Duration) {
	f.Delay(next)
}
  146. func (f *folder) IgnoresUpdated() {
  147. if f.FSWatcherEnabled {
  148. f.scheduleWatchRestart()
  149. }
  150. }
// SchedulePull asks Serve to run a pull. It never blocks: if a request is
// already queued the new one is dropped.
func (f *folder) SchedulePull() {
	select {
	case f.pullScheduled <- struct{}{}:
	default:
		// We might be busy doing a pull and thus not reading from this
		// channel. The channel is 1-buffered, so one notification will be
		// queued to ensure we recheck after the pull, but beyond that we must
		// make sure to not block index receiving.
	}
}
// Jobs always returns two nil slices in this base implementation.
// NOTE(review): the meaning of the two slices is not visible here; confirm
// against the folder types that provide a real implementation.
func (f *folder) Jobs() ([]string, []string) {
	return nil, nil
}
  164. func (f *folder) Scan(subdirs []string) error {
  165. <-f.initialScanFinished
  166. req := rescanRequest{
  167. subdirs: subdirs,
  168. err: make(chan error),
  169. }
  170. select {
  171. case f.scanNow <- req:
  172. return <-req.err
  173. case <-f.ctx.Done():
  174. return f.ctx.Err()
  175. }
  176. }
  177. func (f *folder) Reschedule() {
  178. if f.scanInterval == 0 {
  179. return
  180. }
  181. // Sleep a random time between 3/4 and 5/4 of the configured interval.
  182. sleepNanos := (f.scanInterval.Nanoseconds()*3 + rand.Int63n(2*f.scanInterval.Nanoseconds())) / 4
  183. interval := time.Duration(sleepNanos) * time.Nanosecond
  184. l.Debugln(f, "next rescan in", interval)
  185. f.scanTimer.Reset(interval)
  186. }
  187. func (f *folder) Delay(next time.Duration) {
  188. f.scanDelay <- next
  189. }
// Stop cancels the folder context and then blocks until f.stopped is
// closed, which Serve does when it returns.
func (f *folder) Stop() {
	f.cancel()
	<-f.stopped
}
// CheckHealth checks the folder for common errors, updates the folder state
// and returns the current folder error, or nil if the folder is healthy.
// The result of getHealthError is always passed to setError, including nil,
// which clears a previously recorded error.
func (f *folder) CheckHealth() error {
	err := f.getHealthError()
	f.setError(err)
	return err
}
  201. func (f *folder) getHealthError() error {
  202. // Check for folder errors, with the most serious and specific first and
  203. // generic ones like out of space on the home disk later.
  204. if err := f.CheckPath(); err != nil {
  205. return err
  206. }
  207. if err := f.model.cfg.CheckHomeFreeSpace(); err != nil {
  208. return err
  209. }
  210. return nil
  211. }
  212. func (f *folder) scanSubdirs(subDirs []string) error {
  213. if err := f.model.internalScanFolderSubdirs(f.ctx, f.folderID, subDirs, f.localFlags); err != nil {
  214. // Potentially sets the error twice, once in the scanner just
  215. // by doing a check, and once here, if the error returned is
  216. // the same one as returned by CheckHealth, though
  217. // duplicate set is handled by setError.
  218. f.setError(err)
  219. return err
  220. }
  221. return nil
  222. }
  223. func (f *folder) scanTimerFired() {
  224. err := f.scanSubdirs(nil)
  225. select {
  226. case <-f.initialScanFinished:
  227. default:
  228. status := "Completed"
  229. if err != nil {
  230. status = "Failed"
  231. }
  232. l.Infoln(status, "initial scan of", f.Type.String(), "folder", f.Description())
  233. close(f.initialScanFinished)
  234. }
  235. f.Reschedule()
  236. }
  237. func (f *folder) WatchError() error {
  238. f.watchErrMut.Lock()
  239. defer f.watchErrMut.Unlock()
  240. return f.watchErr
  241. }
  242. // stopWatch immediately aborts watching and may be called asynchronously
  243. func (f *folder) stopWatch() {
  244. f.watchCancel()
  245. f.watchErrMut.Lock()
  246. prevErr := f.watchErr
  247. f.watchErr = errWatchNotStarted
  248. f.watchErrMut.Unlock()
  249. if prevErr != errWatchNotStarted {
  250. data := map[string]interface{}{
  251. "folder": f.ID,
  252. "to": errWatchNotStarted.Error(),
  253. }
  254. if prevErr != nil {
  255. data["from"] = prevErr.Error()
  256. }
  257. events.Default.Log(events.FolderWatchStateChanged, data)
  258. }
  259. }
// scheduleWatchRestart makes sure watching is restarted from the main for loop
// in a folder's Serve and thus may be called asynchronously (e.g. when ignores change).
// It never blocks: if a restart request is already queued the new one is
// dropped.
func (f *folder) scheduleWatchRestart() {
	select {
	case f.restartWatchChan <- struct{}{}:
	default:
		// Serve might be busy (e.g. doing a pull) and thus not reading from
		// this channel. The channel is 1-buffered, so one notification will
		// be queued to ensure the watcher is restarted once Serve is free.
	}
}
// restartWatch should only ever be called synchronously. If you want to use
// this asynchronously, you should probably use scheduleWatchRestart instead.
// It stops the current watcher, starts a new one and then runs a full scan.
func (f *folder) restartWatch() {
	f.stopWatch()
	f.startWatch()
	// The returned error is not checked here; scanSubdirs already records a
	// scan failure on the folder itself.
	f.scanSubdirs(nil)
}
  278. // startWatch should only ever be called synchronously. If you want to use
  279. // this asynchronously, you should probably use scheduleWatchRestart instead.
  280. func (f *folder) startWatch() {
  281. ctx, cancel := context.WithCancel(f.ctx)
  282. f.model.fmut.RLock()
  283. ignores := f.model.folderIgnores[f.folderID]
  284. f.model.fmut.RUnlock()
  285. f.watchChan = make(chan []string)
  286. f.watchCancel = cancel
  287. go f.startWatchAsync(ctx, ignores)
  288. }
  289. // startWatchAsync tries to start the filesystem watching and retries every minute on failure.
  290. // It is a convenience function that should not be used except in startWatch.
  291. func (f *folder) startWatchAsync(ctx context.Context, ignores *ignore.Matcher) {
  292. timer := time.NewTimer(0)
  293. for {
  294. select {
  295. case <-timer.C:
  296. eventChan, err := f.Filesystem().Watch(".", ignores, ctx, f.IgnorePerms)
  297. f.watchErrMut.Lock()
  298. prevErr := f.watchErr
  299. f.watchErr = err
  300. f.watchErrMut.Unlock()
  301. if err != prevErr {
  302. data := map[string]interface{}{
  303. "folder": f.ID,
  304. }
  305. if prevErr != nil {
  306. data["from"] = prevErr.Error()
  307. }
  308. if err != nil {
  309. data["to"] = err.Error()
  310. }
  311. events.Default.Log(events.FolderWatchStateChanged, data)
  312. }
  313. if err != nil {
  314. if prevErr == errWatchNotStarted {
  315. l.Infof("Error while trying to start filesystem watcher for folder %s, trying again in 1min: %v", f.Description(), err)
  316. } else {
  317. l.Debugf("Repeat error while trying to start filesystem watcher for folder %s, trying again in 1min: %v", f.Description(), err)
  318. }
  319. timer.Reset(time.Minute)
  320. continue
  321. }
  322. watchaggregator.Aggregate(eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, ctx)
  323. l.Debugln("Started filesystem watcher for folder", f.Description())
  324. return
  325. case <-ctx.Done():
  326. return
  327. }
  328. }
  329. }
  330. func (f *folder) setError(err error) {
  331. _, _, oldErr := f.getState()
  332. if (err != nil && oldErr != nil && oldErr.Error() == err.Error()) || (err == nil && oldErr == nil) {
  333. return
  334. }
  335. if err != nil {
  336. if oldErr == nil {
  337. l.Warnf("Error on folder %s: %v", f.Description(), err)
  338. } else {
  339. l.Infof("Error on folder %s changed: %q -> %q", f.Description(), oldErr, err)
  340. }
  341. } else {
  342. l.Infoln("Cleared error on folder", f.Description())
  343. }
  344. if f.FSWatcherEnabled {
  345. if err != nil {
  346. f.stopWatch()
  347. } else {
  348. f.scheduleWatchRestart()
  349. }
  350. }
  351. f.stateTracker.setError(err)
  352. }
  353. func (f *folder) basePause() time.Duration {
  354. if f.PullerPauseS == 0 {
  355. return defaultPullerPause
  356. }
  357. return time.Duration(f.PullerPauseS) * time.Second
  358. }
// String formats the folder as "<type>/<folderID>@<pointer address>".
func (f *folder) String() string {
	return fmt.Sprintf("%s/%s@%p", f.Type, f.folderID, f)
}