folder.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "context"
  9. "errors"
  10. "fmt"
  11. "math/rand"
  12. "path/filepath"
  13. "sort"
  14. "sync/atomic"
  15. "time"
  16. "github.com/syncthing/syncthing/lib/config"
  17. "github.com/syncthing/syncthing/lib/db"
  18. "github.com/syncthing/syncthing/lib/events"
  19. "github.com/syncthing/syncthing/lib/fs"
  20. "github.com/syncthing/syncthing/lib/ignore"
  21. "github.com/syncthing/syncthing/lib/locations"
  22. "github.com/syncthing/syncthing/lib/osutil"
  23. "github.com/syncthing/syncthing/lib/protocol"
  24. "github.com/syncthing/syncthing/lib/scanner"
  25. "github.com/syncthing/syncthing/lib/stats"
  26. "github.com/syncthing/syncthing/lib/sync"
  27. "github.com/syncthing/syncthing/lib/watchaggregator"
  28. "github.com/thejerf/suture"
  29. )
  30. // scanLimiter limits the number of concurrent scans. A limit of zero means no limit.
  31. var scanLimiter = newByteSemaphore(0)
  32. var errWatchNotStarted = errors.New("not started")
  33. type folder struct {
  34. suture.Service
  35. stateTracker
  36. config.FolderConfiguration
  37. *stats.FolderStatisticsReference
  38. localFlags uint32
  39. model *model
  40. shortID protocol.ShortID
  41. fset *db.FileSet
  42. ignores *ignore.Matcher
  43. ctx context.Context
  44. cancel context.CancelFunc
  45. scanInterval time.Duration
  46. scanTimer *time.Timer
  47. scanNow chan rescanRequest
  48. scanDelay chan time.Duration
  49. initialScanFinished chan struct{}
  50. scanErrors []FileError
  51. scanErrorsMut sync.Mutex
  52. pullScheduled chan struct{}
  53. watchCancel context.CancelFunc
  54. watchChan chan []string
  55. restartWatchChan chan struct{}
  56. watchErr error
  57. watchMut sync.Mutex
  58. puller puller
  59. }
  // rescanRequest is what Scan hands to the main loop: the sub-paths to scan
  // and a channel on which the loop reports the scan result.
  type rescanRequest struct {
  	// subdirs lists the paths to scan.
  	subdirs []string
  	// err receives the outcome of the scan.
  	err chan error
  }
  // puller is the part of a folder that fetches needed data.
  type puller interface {
  	// pull returns true when it was successful and does not need to be
  	// retried.
  	pull() bool
  }
  67. func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration) folder {
  68. ctx, cancel := context.WithCancel(context.Background())
  69. return folder{
  70. stateTracker: newStateTracker(cfg.ID),
  71. FolderConfiguration: cfg,
  72. FolderStatisticsReference: stats.NewFolderStatisticsReference(model.db, cfg.ID),
  73. model: model,
  74. shortID: model.shortID,
  75. fset: fset,
  76. ignores: ignores,
  77. ctx: ctx,
  78. cancel: cancel,
  79. scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second,
  80. scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
  81. scanNow: make(chan rescanRequest),
  82. scanDelay: make(chan time.Duration),
  83. initialScanFinished: make(chan struct{}),
  84. scanErrorsMut: sync.NewMutex(),
  85. pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
  86. watchCancel: func() {},
  87. restartWatchChan: make(chan struct{}, 1),
  88. watchMut: sync.NewMutex(),
  89. }
  90. }
  91. func (f *folder) serve(_ chan struct{}) {
  92. atomic.AddInt32(&f.model.foldersRunning, 1)
  93. defer atomic.AddInt32(&f.model.foldersRunning, -1)
  94. l.Debugln(f, "starting")
  95. defer l.Debugln(f, "exiting")
  96. defer func() {
  97. f.scanTimer.Stop()
  98. f.setState(FolderIdle)
  99. }()
  100. pause := f.basePause()
  101. pullFailTimer := time.NewTimer(0)
  102. <-pullFailTimer.C
  103. if f.FSWatcherEnabled && f.CheckHealth() == nil {
  104. f.startWatch()
  105. }
  106. initialCompleted := f.initialScanFinished
  107. pull := func() {
  108. startTime := time.Now()
  109. if f.puller.pull() {
  110. // We're good. Don't schedule another pull and reset
  111. // the pause interval.
  112. pause = f.basePause()
  113. return
  114. }
  115. // Pulling failed, try again later.
  116. delay := pause + time.Since(startTime)
  117. l.Infof("Folder %v isn't making sync progress - retrying in %v.", f.Description(), delay)
  118. pullFailTimer.Reset(delay)
  119. if pause < 60*f.basePause() {
  120. pause *= 2
  121. }
  122. }
  123. for {
  124. select {
  125. case <-f.ctx.Done():
  126. return
  127. case <-f.pullScheduled:
  128. pullFailTimer.Stop()
  129. select {
  130. case <-pullFailTimer.C:
  131. default:
  132. }
  133. pull()
  134. case <-pullFailTimer.C:
  135. pull()
  136. case <-initialCompleted:
  137. // Initial scan has completed, we should do a pull
  138. initialCompleted = nil // never hit this case again
  139. if !f.puller.pull() {
  140. // Pulling failed, try again later.
  141. pullFailTimer.Reset(pause)
  142. }
  143. case <-f.scanTimer.C:
  144. l.Debugln(f, "Scanning subdirectories")
  145. f.scanTimerFired()
  146. case req := <-f.scanNow:
  147. req.err <- f.scanSubdirs(req.subdirs)
  148. case next := <-f.scanDelay:
  149. f.scanTimer.Reset(next)
  150. case fsEvents := <-f.watchChan:
  151. l.Debugln(f, "filesystem notification rescan")
  152. f.scanSubdirs(fsEvents)
  153. case <-f.restartWatchChan:
  154. f.restartWatch()
  155. }
  156. }
  157. }
  158. func (f *folder) BringToFront(string) {}
  159. func (f *folder) Override() {}
  160. func (f *folder) Revert() {}
  161. func (f *folder) DelayScan(next time.Duration) {
  162. f.Delay(next)
  163. }
  164. func (f *folder) ignoresUpdated() {
  165. if f.FSWatcherEnabled {
  166. f.scheduleWatchRestart()
  167. }
  168. }
  169. func (f *folder) SchedulePull() {
  170. select {
  171. case f.pullScheduled <- struct{}{}:
  172. default:
  173. // We might be busy doing a pull and thus not reading from this
  174. // channel. The channel is 1-buffered, so one notification will be
  175. // queued to ensure we recheck after the pull, but beyond that we must
  176. // make sure to not block index receiving.
  177. }
  178. }
  179. func (f *folder) Jobs(_, _ int) ([]string, []string, int) {
  180. return nil, nil, 0
  181. }
  182. func (f *folder) Scan(subdirs []string) error {
  183. <-f.initialScanFinished
  184. req := rescanRequest{
  185. subdirs: subdirs,
  186. err: make(chan error),
  187. }
  188. select {
  189. case f.scanNow <- req:
  190. return <-req.err
  191. case <-f.ctx.Done():
  192. return f.ctx.Err()
  193. }
  194. }
  195. func (f *folder) Reschedule() {
  196. if f.scanInterval == 0 {
  197. return
  198. }
  199. // Sleep a random time between 3/4 and 5/4 of the configured interval.
  200. sleepNanos := (f.scanInterval.Nanoseconds()*3 + rand.Int63n(2*f.scanInterval.Nanoseconds())) / 4
  201. interval := time.Duration(sleepNanos) * time.Nanosecond
  202. l.Debugln(f, "next rescan in", interval)
  203. f.scanTimer.Reset(interval)
  204. }
  205. func (f *folder) Delay(next time.Duration) {
  206. f.scanDelay <- next
  207. }
  208. func (f *folder) Stop() {
  209. f.cancel()
  210. f.Service.Stop()
  211. }
  212. // CheckHealth checks the folder for common errors, updates the folder state
  213. // and returns the current folder error, or nil if the folder is healthy.
  214. func (f *folder) CheckHealth() error {
  215. err := f.getHealthError()
  216. f.setError(err)
  217. return err
  218. }
  219. func (f *folder) getHealthError() error {
  220. // Check for folder errors, with the most serious and specific first and
  221. // generic ones like out of space on the home disk later.
  222. if err := f.CheckPath(); err != nil {
  223. return err
  224. }
  225. dbPath := locations.Get(locations.Database)
  226. if usage, err := fs.NewFilesystem(fs.FilesystemTypeBasic, dbPath).Usage("."); err == nil {
  227. if err = config.CheckFreeSpace(f.model.cfg.Options().MinHomeDiskFree, usage); err != nil {
  228. return fmt.Errorf("insufficient space on disk for database (%v): %v", dbPath, err)
  229. }
  230. }
  231. return nil
  232. }
  233. func (f *folder) scanSubdirs(subDirs []string) error {
  234. if err := f.CheckHealth(); err != nil {
  235. return err
  236. }
  237. mtimefs := f.fset.MtimeFS()
  238. f.setState(FolderScanWaiting)
  239. scanLimiter.take(1)
  240. defer scanLimiter.give(1)
  241. for i := range subDirs {
  242. sub := osutil.NativeFilename(subDirs[i])
  243. if sub == "" {
  244. // A blank subdirs means to scan the entire folder. We can trim
  245. // the subDirs list and go on our way.
  246. subDirs = nil
  247. break
  248. }
  249. subDirs[i] = sub
  250. }
  251. // Check if the ignore patterns changed as part of scanning this folder.
  252. // If they did we should schedule a pull of the folder so that we
  253. // request things we might have suddenly become unignored and so on.
  254. oldHash := f.ignores.Hash()
  255. defer func() {
  256. if f.ignores.Hash() != oldHash {
  257. l.Debugln("Folder", f.Description(), "ignore patterns change detected while scanning; triggering puller")
  258. f.ignoresUpdated()
  259. f.SchedulePull()
  260. }
  261. }()
  262. if err := f.ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) {
  263. err = fmt.Errorf("loading ignores: %v", err)
  264. f.setError(err)
  265. return err
  266. }
  267. // Clean the list of subitems to ensure that we start at a known
  268. // directory, and don't scan subdirectories of things we've already
  269. // scanned.
  270. subDirs = unifySubs(subDirs, func(file string) bool {
  271. _, ok := f.fset.Get(protocol.LocalDeviceID, file)
  272. return ok
  273. })
  274. f.setState(FolderScanning)
  275. fchan := scanner.Walk(f.ctx, scanner.Config{
  276. Folder: f.ID,
  277. Subs: subDirs,
  278. Matcher: f.ignores,
  279. TempLifetime: time.Duration(f.model.cfg.Options().KeepTemporariesH) * time.Hour,
  280. CurrentFiler: cFiler{f.fset},
  281. Filesystem: mtimefs,
  282. IgnorePerms: f.IgnorePerms,
  283. AutoNormalize: f.AutoNormalize,
  284. Hashers: f.model.numHashers(f.ID),
  285. ShortID: f.shortID,
  286. ProgressTickIntervalS: f.ScanProgressIntervalS,
  287. LocalFlags: f.localFlags,
  288. })
  289. batchFn := func(fs []protocol.FileInfo) error {
  290. if err := f.CheckHealth(); err != nil {
  291. l.Debugf("Stopping scan of folder %s due to: %s", f.Description(), err)
  292. return err
  293. }
  294. f.updateLocalsFromScanning(fs)
  295. return nil
  296. }
  297. // Resolve items which are identical with the global state.
  298. if f.localFlags&protocol.FlagLocalReceiveOnly != 0 {
  299. oldBatchFn := batchFn // can't reference batchFn directly (recursion)
  300. batchFn = func(fs []protocol.FileInfo) error {
  301. for i := range fs {
  302. switch gf, ok := f.fset.GetGlobal(fs[i].Name); {
  303. case !ok:
  304. continue
  305. case gf.IsEquivalentOptional(fs[i], false, false, protocol.FlagLocalReceiveOnly):
  306. // What we have locally is equivalent to the global file.
  307. fs[i].Version = fs[i].Version.Merge(gf.Version)
  308. fallthrough
  309. case fs[i].IsDeleted() && gf.IsReceiveOnlyChanged():
  310. // Our item is deleted and the global item is our own
  311. // receive only file. We can't delete file infos, so
  312. // we just pretend it is a normal deleted file (nobody
  313. // cares about that).
  314. fs[i].LocalFlags &^= protocol.FlagLocalReceiveOnly
  315. }
  316. }
  317. return oldBatchFn(fs)
  318. }
  319. }
  320. batch := newFileInfoBatch(batchFn)
  321. // Schedule a pull after scanning, but only if we actually detected any
  322. // changes.
  323. changes := 0
  324. defer func() {
  325. if changes > 0 {
  326. f.SchedulePull()
  327. }
  328. }()
  329. f.clearScanErrors(subDirs)
  330. for res := range fchan {
  331. if res.Err != nil {
  332. f.newScanError(res.Path, res.Err)
  333. continue
  334. }
  335. if err := batch.flushIfFull(); err != nil {
  336. return err
  337. }
  338. batch.append(res.File)
  339. changes++
  340. }
  341. if err := batch.flush(); err != nil {
  342. return err
  343. }
  344. if len(subDirs) == 0 {
  345. // If we have no specific subdirectories to traverse, set it to one
  346. // empty prefix so we traverse the entire folder contents once.
  347. subDirs = []string{""}
  348. }
  349. // Do a scan of the database for each prefix, to check for deleted and
  350. // ignored files.
  351. var toIgnore []db.FileInfoTruncated
  352. ignoredParent := ""
  353. for _, sub := range subDirs {
  354. var iterError error
  355. f.fset.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi db.FileIntf) bool {
  356. file := fi.(db.FileInfoTruncated)
  357. if err := batch.flushIfFull(); err != nil {
  358. iterError = err
  359. return false
  360. }
  361. if ignoredParent != "" && !fs.IsParent(file.Name, ignoredParent) {
  362. for _, file := range toIgnore {
  363. l.Debugln("marking file as ignored", file)
  364. nf := file.ConvertToIgnoredFileInfo(f.shortID)
  365. batch.append(nf)
  366. changes++
  367. if err := batch.flushIfFull(); err != nil {
  368. iterError = err
  369. return false
  370. }
  371. }
  372. toIgnore = toIgnore[:0]
  373. ignoredParent = ""
  374. }
  375. switch ignored := f.ignores.Match(file.Name).IsIgnored(); {
  376. case !file.IsIgnored() && ignored:
  377. // File was not ignored at last pass but has been ignored.
  378. if file.IsDirectory() {
  379. // Delay ignoring as a child might be unignored.
  380. toIgnore = append(toIgnore, file)
  381. if ignoredParent == "" {
  382. // If the parent wasn't ignored already, set
  383. // this path as the "highest" ignored parent
  384. ignoredParent = file.Name
  385. }
  386. return true
  387. }
  388. l.Debugln("marking file as ignored", f)
  389. nf := file.ConvertToIgnoredFileInfo(f.shortID)
  390. batch.append(nf)
  391. changes++
  392. case file.IsIgnored() && !ignored:
  393. // Successfully scanned items are already un-ignored during
  394. // the scan, so check whether it is deleted.
  395. fallthrough
  396. case !file.IsIgnored() && !file.IsDeleted() && !file.IsUnsupported():
  397. // The file is not ignored, deleted or unsupported. Lets check if
  398. // it's still here. Simply stat:ing it wont do as there are
  399. // tons of corner cases (e.g. parent dir->symlink, missing
  400. // permissions)
  401. if !osutil.IsDeleted(mtimefs, file.Name) {
  402. if ignoredParent != "" {
  403. // Don't ignore parents of this not ignored item
  404. toIgnore = toIgnore[:0]
  405. ignoredParent = ""
  406. }
  407. return true
  408. }
  409. nf := protocol.FileInfo{
  410. Name: file.Name,
  411. Type: file.Type,
  412. Size: 0,
  413. ModifiedS: file.ModifiedS,
  414. ModifiedNs: file.ModifiedNs,
  415. ModifiedBy: f.shortID,
  416. Deleted: true,
  417. Version: file.Version.Update(f.shortID),
  418. LocalFlags: f.localFlags,
  419. }
  420. // We do not want to override the global version
  421. // with the deleted file. Keeping only our local
  422. // counter makes sure we are in conflict with any
  423. // other existing versions, which will be resolved
  424. // by the normal pulling mechanisms.
  425. if file.ShouldConflict() {
  426. nf.Version = nf.Version.DropOthers(f.shortID)
  427. }
  428. batch.append(nf)
  429. changes++
  430. }
  431. return true
  432. })
  433. if iterError == nil && len(toIgnore) > 0 {
  434. for _, file := range toIgnore {
  435. l.Debugln("marking file as ignored", f)
  436. nf := file.ConvertToIgnoredFileInfo(f.shortID)
  437. batch.append(nf)
  438. changes++
  439. if iterError = batch.flushIfFull(); iterError != nil {
  440. break
  441. }
  442. }
  443. toIgnore = toIgnore[:0]
  444. }
  445. if iterError != nil {
  446. return iterError
  447. }
  448. }
  449. if err := batch.flush(); err != nil {
  450. return err
  451. }
  452. f.ScanCompleted()
  453. f.setState(FolderIdle)
  454. return nil
  455. }
  456. func (f *folder) scanTimerFired() {
  457. err := f.scanSubdirs(nil)
  458. select {
  459. case <-f.initialScanFinished:
  460. default:
  461. status := "Completed"
  462. if err != nil {
  463. status = "Failed"
  464. }
  465. l.Infoln(status, "initial scan of", f.Type.String(), "folder", f.Description())
  466. close(f.initialScanFinished)
  467. }
  468. f.Reschedule()
  469. }
  470. func (f *folder) WatchError() error {
  471. f.watchMut.Lock()
  472. defer f.watchMut.Unlock()
  473. return f.watchErr
  474. }
  475. // stopWatch immediately aborts watching and may be called asynchronously
  476. func (f *folder) stopWatch() {
  477. f.watchMut.Lock()
  478. f.watchCancel()
  479. prevErr := f.watchErr
  480. f.watchErr = errWatchNotStarted
  481. f.watchMut.Unlock()
  482. if prevErr != errWatchNotStarted {
  483. data := map[string]interface{}{
  484. "folder": f.ID,
  485. "to": errWatchNotStarted.Error(),
  486. }
  487. if prevErr != nil {
  488. data["from"] = prevErr.Error()
  489. }
  490. events.Default.Log(events.FolderWatchStateChanged, data)
  491. }
  492. }
  493. // scheduleWatchRestart makes sure watching is restarted from the main for loop
  494. // in a folder's Serve and thus may be called asynchronously (e.g. when ignores change).
  495. func (f *folder) scheduleWatchRestart() {
  496. select {
  497. case f.restartWatchChan <- struct{}{}:
  498. default:
  499. // We might be busy doing a pull and thus not reading from this
  500. // channel. The channel is 1-buffered, so one notification will be
  501. // queued to ensure we recheck after the pull.
  502. }
  503. }
  504. // restartWatch should only ever be called synchronously. If you want to use
  505. // this asynchronously, you should probably use scheduleWatchRestart instead.
  506. func (f *folder) restartWatch() {
  507. f.stopWatch()
  508. f.startWatch()
  509. f.scanSubdirs(nil)
  510. }
  511. // startWatch should only ever be called synchronously. If you want to use
  512. // this asynchronously, you should probably use scheduleWatchRestart instead.
  513. func (f *folder) startWatch() {
  514. ctx, cancel := context.WithCancel(f.ctx)
  515. f.watchMut.Lock()
  516. f.watchChan = make(chan []string)
  517. f.watchCancel = cancel
  518. f.watchMut.Unlock()
  519. go f.monitorWatch(ctx)
  520. }
  521. // monitorWatch starts the filesystem watching and retries every minute on failure.
  522. // It should not be used except in startWatch.
  523. func (f *folder) monitorWatch(ctx context.Context) {
  524. failTimer := time.NewTimer(0)
  525. aggrCtx, aggrCancel := context.WithCancel(ctx)
  526. var err error
  527. var eventChan <-chan fs.Event
  528. var errChan <-chan error
  529. warnedOutside := false
  530. for {
  531. select {
  532. case <-failTimer.C:
  533. eventChan, errChan, err = f.Filesystem().Watch(".", f.ignores, ctx, f.IgnorePerms)
  534. // We do this at most once per minute which is the
  535. // default rescan time without watcher.
  536. f.scanOnWatchErr()
  537. f.setWatchError(err)
  538. if err != nil {
  539. failTimer.Reset(time.Minute)
  540. continue
  541. }
  542. watchaggregator.Aggregate(eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, aggrCtx)
  543. l.Debugln("Started filesystem watcher for folder", f.Description())
  544. case err = <-errChan:
  545. f.setWatchError(err)
  546. // This error was previously a panic and should never occur, so generate
  547. // a warning, but don't do it repetitively.
  548. if !warnedOutside {
  549. if _, ok := err.(*fs.ErrWatchEventOutsideRoot); ok {
  550. l.Warnln(err)
  551. warnedOutside = true
  552. return
  553. }
  554. }
  555. aggrCancel()
  556. errChan = nil
  557. aggrCtx, aggrCancel = context.WithCancel(ctx)
  558. failTimer.Reset(time.Minute)
  559. case <-ctx.Done():
  560. return
  561. }
  562. }
  563. }
  564. // setWatchError sets the current error state of the watch and should be called
  565. // regardless of whether err is nil or not.
  566. func (f *folder) setWatchError(err error) {
  567. f.watchMut.Lock()
  568. prevErr := f.watchErr
  569. f.watchErr = err
  570. f.watchMut.Unlock()
  571. if err != prevErr {
  572. data := map[string]interface{}{
  573. "folder": f.ID,
  574. }
  575. if prevErr != nil {
  576. data["from"] = prevErr.Error()
  577. }
  578. if err != nil {
  579. data["to"] = err.Error()
  580. }
  581. events.Default.Log(events.FolderWatchStateChanged, data)
  582. }
  583. if err == nil {
  584. return
  585. }
  586. if prevErr == errWatchNotStarted {
  587. l.Infof("Error while trying to start filesystem watcher for folder %s, trying again in 1min: %v", f.Description(), err)
  588. return
  589. }
  590. l.Debugf("Repeat error while trying to start filesystem watcher for folder %s, trying again in 1min: %v", f.Description(), err)
  591. }
  592. // scanOnWatchErr schedules a full scan immediately if an error occurred while watching.
  593. func (f *folder) scanOnWatchErr() {
  594. f.watchMut.Lock()
  595. if f.watchErr != nil && f.watchErr != errWatchNotStarted {
  596. f.Delay(0)
  597. }
  598. f.watchMut.Unlock()
  599. }
  600. func (f *folder) setError(err error) {
  601. select {
  602. case <-f.ctx.Done():
  603. return
  604. default:
  605. }
  606. _, _, oldErr := f.getState()
  607. if (err != nil && oldErr != nil && oldErr.Error() == err.Error()) || (err == nil && oldErr == nil) {
  608. return
  609. }
  610. if err != nil {
  611. if oldErr == nil {
  612. l.Warnf("Error on folder %s: %v", f.Description(), err)
  613. } else {
  614. l.Infof("Error on folder %s changed: %q -> %q", f.Description(), oldErr, err)
  615. }
  616. } else {
  617. l.Infoln("Cleared error on folder", f.Description())
  618. }
  619. if f.FSWatcherEnabled {
  620. if err != nil {
  621. f.stopWatch()
  622. } else {
  623. f.scheduleWatchRestart()
  624. }
  625. }
  626. f.stateTracker.setError(err)
  627. }
  628. func (f *folder) basePause() time.Duration {
  629. if f.PullerPauseS == 0 {
  630. return defaultPullerPause
  631. }
  632. return time.Duration(f.PullerPauseS) * time.Second
  633. }
  634. func (f *folder) String() string {
  635. return fmt.Sprintf("%s/%s@%p", f.Type, f.folderID, f)
  636. }
  637. func (f *folder) newScanError(path string, err error) {
  638. f.scanErrorsMut.Lock()
  639. f.scanErrors = append(f.scanErrors, FileError{
  640. Err: err.Error(),
  641. Path: path,
  642. })
  643. f.scanErrorsMut.Unlock()
  644. }
  645. func (f *folder) clearScanErrors(subDirs []string) {
  646. f.scanErrorsMut.Lock()
  647. defer f.scanErrorsMut.Unlock()
  648. if len(subDirs) == 0 {
  649. f.scanErrors = nil
  650. return
  651. }
  652. filtered := f.scanErrors[:0]
  653. outer:
  654. for _, fe := range f.scanErrors {
  655. for _, sub := range subDirs {
  656. if fe.Path == sub || fs.IsParent(fe.Path, sub) {
  657. continue outer
  658. }
  659. }
  660. filtered = append(filtered, fe)
  661. }
  662. f.scanErrors = filtered
  663. }
  664. func (f *folder) Errors() []FileError {
  665. f.scanErrorsMut.Lock()
  666. defer f.scanErrorsMut.Unlock()
  667. return append([]FileError{}, f.scanErrors...)
  668. }
  669. // ForceRescan marks the file such that it gets rehashed on next scan and then
  670. // immediately executes that scan.
  671. func (f *folder) ForceRescan(file protocol.FileInfo) error {
  672. file.SetMustRescan(f.shortID)
  673. f.fset.Update(protocol.LocalDeviceID, []protocol.FileInfo{file})
  674. return f.Scan([]string{file.Name})
  675. }
  676. func (f *folder) updateLocalsFromScanning(fs []protocol.FileInfo) {
  677. f.updateLocals(fs)
  678. f.emitDiskChangeEvents(fs, events.LocalChangeDetected)
  679. }
  680. func (f *folder) updateLocalsFromPulling(fs []protocol.FileInfo) {
  681. f.updateLocals(fs)
  682. f.emitDiskChangeEvents(fs, events.RemoteChangeDetected)
  683. }
  684. func (f *folder) updateLocals(fs []protocol.FileInfo) {
  685. f.fset.Update(protocol.LocalDeviceID, fs)
  686. filenames := make([]string, len(fs))
  687. for i, file := range fs {
  688. filenames[i] = file.Name
  689. }
  690. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  691. "folder": f.ID,
  692. "items": len(fs),
  693. "filenames": filenames,
  694. "version": f.fset.Sequence(protocol.LocalDeviceID),
  695. })
  696. }
  697. func (f *folder) emitDiskChangeEvents(fs []protocol.FileInfo, typeOfEvent events.EventType) {
  698. for _, file := range fs {
  699. if file.IsInvalid() {
  700. continue
  701. }
  702. objType := "file"
  703. action := "modified"
  704. switch {
  705. case file.IsDeleted():
  706. action = "deleted"
  707. // If our local vector is version 1 AND it is the only version
  708. // vector so far seen for this file then it is a new file. Else if
  709. // it is > 1 it's not new, and if it is 1 but another shortId
  710. // version vector exists then it is new for us but created elsewhere
  711. // so the file is still not new but modified by us. Only if it is
  712. // truly new do we change this to 'added', else we leave it as
  713. // 'modified'.
  714. case len(file.Version.Counters) == 1 && file.Version.Counters[0].Value == 1:
  715. action = "added"
  716. }
  717. if file.IsSymlink() {
  718. objType = "symlink"
  719. } else if file.IsDirectory() {
  720. objType = "dir"
  721. }
  722. // Two different events can be fired here based on what EventType is passed into function
  723. events.Default.Log(typeOfEvent, map[string]string{
  724. "folder": f.ID,
  725. "folderID": f.ID, // incorrect, deprecated, kept for historical compliance
  726. "label": f.Label,
  727. "action": action,
  728. "type": objType,
  729. "path": filepath.FromSlash(file.Name),
  730. "modifiedBy": file.ModifiedBy.String(),
  731. })
  732. }
  733. }
  734. // The exists function is expected to return true for all known paths
  735. // (excluding "" and ".")
  736. func unifySubs(dirs []string, exists func(dir string) bool) []string {
  737. if len(dirs) == 0 {
  738. return nil
  739. }
  740. sort.Strings(dirs)
  741. if dirs[0] == "" || dirs[0] == "." || dirs[0] == string(fs.PathSeparator) {
  742. return nil
  743. }
  744. prev := "./" // Anything that can't be parent of a clean path
  745. for i := 0; i < len(dirs); {
  746. dir, err := fs.Canonicalize(dirs[i])
  747. if err != nil {
  748. l.Debugf("Skipping %v for scan: %s", dirs[i], err)
  749. dirs = append(dirs[:i], dirs[i+1:]...)
  750. continue
  751. }
  752. if dir == prev || fs.IsParent(dir, prev) {
  753. dirs = append(dirs[:i], dirs[i+1:]...)
  754. continue
  755. }
  756. parent := filepath.Dir(dir)
  757. for parent != "." && parent != string(fs.PathSeparator) && !exists(parent) {
  758. dir = parent
  759. parent = filepath.Dir(dir)
  760. }
  761. dirs[i] = dir
  762. prev = dir
  763. i++
  764. }
  765. return dirs
  766. }
  767. type cFiler struct {
  768. *db.FileSet
  769. }
  770. // Implements scanner.CurrentFiler
  771. func (cf cFiler) CurrentFile(file string) (protocol.FileInfo, bool) {
  772. return cf.Get(protocol.LocalDeviceID, file)
  773. }