rwfolder.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "math/rand"
  12. "os"
  13. "path/filepath"
  14. "time"
  15. "github.com/syncthing/protocol"
  16. "github.com/syncthing/syncthing/internal/config"
  17. "github.com/syncthing/syncthing/internal/db"
  18. "github.com/syncthing/syncthing/internal/events"
  19. "github.com/syncthing/syncthing/internal/ignore"
  20. "github.com/syncthing/syncthing/internal/osutil"
  21. "github.com/syncthing/syncthing/internal/scanner"
  22. "github.com/syncthing/syncthing/internal/symlinks"
  23. "github.com/syncthing/syncthing/internal/sync"
  24. "github.com/syncthing/syncthing/internal/versioner"
  25. )
  26. // TODO: Stop on errors
  27. const (
  28. pauseIntv = 60 * time.Second
  29. nextPullIntv = 10 * time.Second
  30. checkPullIntv = 1 * time.Second
  31. )
  32. // A pullBlockState is passed to the puller routine for each block that needs
  33. // to be fetched.
  34. type pullBlockState struct {
  35. *sharedPullerState
  36. block protocol.BlockInfo
  37. }
  38. // A copyBlocksState is passed to copy routine if the file has blocks to be
  39. // copied.
  40. type copyBlocksState struct {
  41. *sharedPullerState
  42. blocks []protocol.BlockInfo
  43. }
  44. var (
  45. activity = newDeviceActivity()
  46. errNoDevice = errors.New("no available source device")
  47. )
  48. type rwFolder struct {
  49. stateTracker
  50. model *Model
  51. progressEmitter *ProgressEmitter
  52. folder string
  53. dir string
  54. scanIntv time.Duration
  55. versioner versioner.Versioner
  56. ignorePerms bool
  57. lenientMtimes bool
  58. copiers int
  59. pullers int
  60. shortID uint64
  61. order config.PullOrder
  62. stop chan struct{}
  63. queue *jobQueue
  64. dbUpdates chan protocol.FileInfo
  65. scanTimer *time.Timer
  66. pullTimer *time.Timer
  67. delayScan chan time.Duration
  68. }
  69. func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder {
  70. return &rwFolder{
  71. stateTracker: stateTracker{
  72. folder: cfg.ID,
  73. mut: sync.NewMutex(),
  74. },
  75. model: m,
  76. progressEmitter: m.progressEmitter,
  77. folder: cfg.ID,
  78. dir: cfg.Path(),
  79. scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
  80. ignorePerms: cfg.IgnorePerms,
  81. lenientMtimes: cfg.LenientMtimes,
  82. copiers: cfg.Copiers,
  83. pullers: cfg.Pullers,
  84. shortID: shortID,
  85. order: cfg.Order,
  86. stop: make(chan struct{}),
  87. queue: newJobQueue(),
  88. pullTimer: time.NewTimer(checkPullIntv),
  89. scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
  90. delayScan: make(chan time.Duration),
  91. }
  92. }
  93. // Serve will run scans and pulls. It will return when Stop()ed or on a
  94. // critical error.
  95. func (p *rwFolder) Serve() {
  96. if debug {
  97. l.Debugln(p, "starting")
  98. defer l.Debugln(p, "exiting")
  99. }
  100. defer func() {
  101. p.pullTimer.Stop()
  102. p.scanTimer.Stop()
  103. // TODO: Should there be an actual FolderStopped state?
  104. p.setState(FolderIdle)
  105. }()
  106. var prevVer int64
  107. var prevIgnoreHash string
  108. rescheduleScan := func() {
  109. if p.scanIntv == 0 {
  110. // We should not run scans, so it should not be rescheduled.
  111. return
  112. }
  113. // Sleep a random time between 3/4 and 5/4 of the configured interval.
  114. sleepNanos := (p.scanIntv.Nanoseconds()*3 + rand.Int63n(2*p.scanIntv.Nanoseconds())) / 4
  115. intv := time.Duration(sleepNanos) * time.Nanosecond
  116. if debug {
  117. l.Debugln(p, "next rescan in", intv)
  118. }
  119. p.scanTimer.Reset(intv)
  120. }
  121. // We don't start pulling files until a scan has been completed.
  122. initialScanCompleted := false
  123. for {
  124. select {
  125. case <-p.stop:
  126. return
  127. // TODO: We could easily add a channel here for notifications from
  128. // Index(), so that we immediately start a pull when new index
  129. // information is available. Before that though, I'd like to build a
  130. // repeatable benchmark of how long it takes to sync a change from
  131. // device A to device B, so we have something to work against.
  132. case <-p.pullTimer.C:
  133. if !initialScanCompleted {
  134. if debug {
  135. l.Debugln(p, "skip (initial)")
  136. }
  137. p.pullTimer.Reset(nextPullIntv)
  138. continue
  139. }
  140. p.model.fmut.RLock()
  141. curIgnores := p.model.folderIgnores[p.folder]
  142. p.model.fmut.RUnlock()
  143. if newHash := curIgnores.Hash(); newHash != prevIgnoreHash {
  144. // The ignore patterns have changed. We need to re-evaluate if
  145. // there are files we need now that were ignored before.
  146. if debug {
  147. l.Debugln(p, "ignore patterns have changed, resetting prevVer")
  148. }
  149. prevVer = 0
  150. prevIgnoreHash = newHash
  151. }
  152. // RemoteLocalVersion() is a fast call, doesn't touch the database.
  153. curVer := p.model.RemoteLocalVersion(p.folder)
  154. if curVer == prevVer {
  155. if debug {
  156. l.Debugln(p, "skip (curVer == prevVer)", prevVer)
  157. }
  158. p.pullTimer.Reset(checkPullIntv)
  159. continue
  160. }
  161. if debug {
  162. l.Debugln(p, "pulling", prevVer, curVer)
  163. }
  164. p.setState(FolderSyncing)
  165. tries := 0
  166. for {
  167. tries++
  168. changed := p.pullerIteration(curIgnores)
  169. if debug {
  170. l.Debugln(p, "changed", changed)
  171. }
  172. if changed == 0 {
  173. // No files were changed by the puller, so we are in
  174. // sync. Remember the local version number and
  175. // schedule a resync a little bit into the future.
  176. if lv := p.model.RemoteLocalVersion(p.folder); lv < curVer {
  177. // There's a corner case where the device we needed
  178. // files from disconnected during the puller
  179. // iteration. The files will have been removed from
  180. // the index, so we've concluded that we don't need
  181. // them, but at the same time we have the local
  182. // version that includes those files in curVer. So we
  183. // catch the case that localVersion might have
  184. // decreased here.
  185. l.Debugln(p, "adjusting curVer", lv)
  186. curVer = lv
  187. }
  188. prevVer = curVer
  189. if debug {
  190. l.Debugln(p, "next pull in", nextPullIntv)
  191. }
  192. p.pullTimer.Reset(nextPullIntv)
  193. break
  194. }
  195. if tries > 10 {
  196. // We've tried a bunch of times to get in sync, but
  197. // we're not making it. Probably there are write
  198. // errors preventing us. Flag this with a warning and
  199. // wait a bit longer before retrying.
  200. l.Warnf("Folder %q isn't making progress - check logs for possible root cause. Pausing puller for %v.", p.folder, pauseIntv)
  201. if debug {
  202. l.Debugln(p, "next pull in", pauseIntv)
  203. }
  204. p.pullTimer.Reset(pauseIntv)
  205. break
  206. }
  207. }
  208. p.setState(FolderIdle)
  209. // The reason for running the scanner from within the puller is that
  210. // this is the easiest way to make sure we are not doing both at the
  211. // same time.
  212. case <-p.scanTimer.C:
  213. if err := p.model.CheckFolderHealth(p.folder); err != nil {
  214. l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err)
  215. rescheduleScan()
  216. continue
  217. }
  218. if debug {
  219. l.Debugln(p, "rescan")
  220. }
  221. if err := p.model.ScanFolder(p.folder); err != nil {
  222. // Potentially sets the error twice, once in the scanner just
  223. // by doing a check, and once here, if the error returned is
  224. // the same one as returned by CheckFolderHealth, though
  225. // duplicate set is handled by setError.
  226. p.setError(err)
  227. rescheduleScan()
  228. continue
  229. }
  230. if p.scanIntv > 0 {
  231. rescheduleScan()
  232. }
  233. if !initialScanCompleted {
  234. l.Infoln("Completed initial scan (rw) of folder", p.folder)
  235. initialScanCompleted = true
  236. }
  237. case next := <-p.delayScan:
  238. p.scanTimer.Reset(next)
  239. }
  240. }
  241. }
  242. func (p *rwFolder) Stop() {
  243. close(p.stop)
  244. }
  245. func (p *rwFolder) String() string {
  246. return fmt.Sprintf("rwFolder/%s@%p", p.folder, p)
  247. }
  248. // pullerIteration runs a single puller iteration for the given folder and
  249. // returns the number items that should have been synced (even those that
  250. // might have failed). One puller iteration handles all files currently
  251. // flagged as needed in the folder.
  252. func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
  253. pullChan := make(chan pullBlockState)
  254. copyChan := make(chan copyBlocksState)
  255. finisherChan := make(chan *sharedPullerState)
  256. updateWg := sync.NewWaitGroup()
  257. copyWg := sync.NewWaitGroup()
  258. pullWg := sync.NewWaitGroup()
  259. doneWg := sync.NewWaitGroup()
  260. if debug {
  261. l.Debugln(p, "c", p.copiers, "p", p.pullers)
  262. }
  263. p.dbUpdates = make(chan protocol.FileInfo)
  264. updateWg.Add(1)
  265. go func() {
  266. // dbUpdaterRoutine finishes when p.dbUpdates is closed
  267. p.dbUpdaterRoutine()
  268. updateWg.Done()
  269. }()
  270. for i := 0; i < p.copiers; i++ {
  271. copyWg.Add(1)
  272. go func() {
  273. // copierRoutine finishes when copyChan is closed
  274. p.copierRoutine(copyChan, pullChan, finisherChan)
  275. copyWg.Done()
  276. }()
  277. }
  278. for i := 0; i < p.pullers; i++ {
  279. pullWg.Add(1)
  280. go func() {
  281. // pullerRoutine finishes when pullChan is closed
  282. p.pullerRoutine(pullChan, finisherChan)
  283. pullWg.Done()
  284. }()
  285. }
  286. doneWg.Add(1)
  287. // finisherRoutine finishes when finisherChan is closed
  288. go func() {
  289. p.finisherRoutine(finisherChan)
  290. doneWg.Done()
  291. }()
  292. p.model.fmut.RLock()
  293. folderFiles := p.model.folderFiles[p.folder]
  294. p.model.fmut.RUnlock()
  295. // !!!
  296. // WithNeed takes a database snapshot (by necessity). By the time we've
  297. // handled a bunch of files it might have become out of date and we might
  298. // be attempting to sync with an old version of a file...
  299. // !!!
  300. changed := 0
  301. fileDeletions := map[string]protocol.FileInfo{}
  302. dirDeletions := []protocol.FileInfo{}
  303. buckets := map[string][]protocol.FileInfo{}
  304. folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
  305. // Needed items are delivered sorted lexicographically. We'll handle
  306. // directories as they come along, so parents before children. Files
  307. // are queued and the order may be changed later.
  308. file := intf.(protocol.FileInfo)
  309. if ignores.Match(file.Name) {
  310. // This is an ignored file. Skip it, continue iteration.
  311. return true
  312. }
  313. if debug {
  314. l.Debugln(p, "handling", file.Name)
  315. }
  316. switch {
  317. case file.IsDeleted():
  318. // A deleted file, directory or symlink
  319. if file.IsDirectory() {
  320. dirDeletions = append(dirDeletions, file)
  321. } else {
  322. fileDeletions[file.Name] = file
  323. df, ok := p.model.CurrentFolderFile(p.folder, file.Name)
  324. // Local file can be already deleted, but with a lower version
  325. // number, hence the deletion coming in again as part of
  326. // WithNeed, furthermore, the file can simply be of the wrong
  327. // type if we haven't yet managed to pull it.
  328. if ok && !df.IsDeleted() && !df.IsSymlink() && !df.IsDirectory() {
  329. // Put files into buckets per first hash
  330. key := string(df.Blocks[0].Hash)
  331. buckets[key] = append(buckets[key], df)
  332. }
  333. }
  334. case file.IsDirectory() && !file.IsSymlink():
  335. // A new or changed directory
  336. if debug {
  337. l.Debugln("Creating directory", file.Name)
  338. }
  339. p.handleDir(file)
  340. default:
  341. // A new or changed file or symlink. This is the only case where we
  342. // do stuff concurrently in the background
  343. p.queue.Push(file.Name, file.Size(), file.Modified)
  344. }
  345. changed++
  346. return true
  347. })
  348. // Reorder the file queue according to configuration
  349. switch p.order {
  350. case config.OrderRandom:
  351. p.queue.Shuffle()
  352. case config.OrderAlphabetic:
  353. // The queue is already in alphabetic order.
  354. case config.OrderSmallestFirst:
  355. p.queue.SortSmallestFirst()
  356. case config.OrderLargestFirst:
  357. p.queue.SortLargestFirst()
  358. case config.OrderOldestFirst:
  359. p.queue.SortOldestFirst()
  360. case config.OrderNewestFirst:
  361. p.queue.SortOldestFirst()
  362. }
  363. // Process the file queue
  364. nextFile:
  365. for {
  366. fileName, ok := p.queue.Pop()
  367. if !ok {
  368. break
  369. }
  370. f, ok := p.model.CurrentGlobalFile(p.folder, fileName)
  371. if !ok {
  372. // File is no longer in the index. Mark it as done and drop it.
  373. p.queue.Done(fileName)
  374. continue
  375. }
  376. // Local file can be already deleted, but with a lower version
  377. // number, hence the deletion coming in again as part of
  378. // WithNeed, furthermore, the file can simply be of the wrong type if
  379. // the global index changed while we were processing this iteration.
  380. if !f.IsDeleted() && !f.IsSymlink() && !f.IsDirectory() {
  381. key := string(f.Blocks[0].Hash)
  382. for i, candidate := range buckets[key] {
  383. if scanner.BlocksEqual(candidate.Blocks, f.Blocks) {
  384. // Remove the candidate from the bucket
  385. lidx := len(buckets[key]) - 1
  386. buckets[key][i] = buckets[key][lidx]
  387. buckets[key] = buckets[key][:lidx]
  388. // candidate is our current state of the file, where as the
  389. // desired state with the delete bit set is in the deletion
  390. // map.
  391. desired := fileDeletions[candidate.Name]
  392. // Remove the pending deletion (as we perform it by renaming)
  393. delete(fileDeletions, candidate.Name)
  394. p.renameFile(desired, f)
  395. p.queue.Done(fileName)
  396. continue nextFile
  397. }
  398. }
  399. }
  400. // Not a rename or a symlink, deal with it.
  401. p.handleFile(f, copyChan, finisherChan)
  402. }
  403. // Signal copy and puller routines that we are done with the in data for
  404. // this iteration. Wait for them to finish.
  405. close(copyChan)
  406. copyWg.Wait()
  407. close(pullChan)
  408. pullWg.Wait()
  409. // Signal the finisher chan that there will be no more input.
  410. close(finisherChan)
  411. // Wait for the finisherChan to finish.
  412. doneWg.Wait()
  413. for _, file := range fileDeletions {
  414. if debug {
  415. l.Debugln("Deleting file", file.Name)
  416. }
  417. p.deleteFile(file)
  418. }
  419. for i := range dirDeletions {
  420. dir := dirDeletions[len(dirDeletions)-i-1]
  421. if debug {
  422. l.Debugln("Deleting dir", dir.Name)
  423. }
  424. p.deleteDir(dir)
  425. }
  426. // Wait for db updates to complete
  427. close(p.dbUpdates)
  428. updateWg.Wait()
  429. return changed
  430. }
  431. // handleDir creates or updates the given directory
  432. func (p *rwFolder) handleDir(file protocol.FileInfo) {
  433. var err error
  434. events.Default.Log(events.ItemStarted, map[string]interface{}{
  435. "folder": p.folder,
  436. "item": file.Name,
  437. "type": "dir",
  438. "action": "update",
  439. })
  440. defer func() {
  441. events.Default.Log(events.ItemFinished, map[string]interface{}{
  442. "folder": p.folder,
  443. "item": file.Name,
  444. "error": err,
  445. "type": "dir",
  446. "action": "update",
  447. })
  448. }()
  449. realName := filepath.Join(p.dir, file.Name)
  450. mode := os.FileMode(file.Flags & 0777)
  451. if p.ignorePerms {
  452. mode = 0755
  453. }
  454. if debug {
  455. curFile, _ := p.model.CurrentFolderFile(p.folder, file.Name)
  456. l.Debugf("need dir\n\t%v\n\t%v", file, curFile)
  457. }
  458. info, err := osutil.Lstat(realName)
  459. switch {
  460. // There is already something under that name, but it's a file/link.
  461. // Most likely a file/link is getting replaced with a directory.
  462. // Remove the file/link and fall through to directory creation.
  463. case err == nil && (!info.IsDir() || info.Mode()&os.ModeSymlink != 0):
  464. err = osutil.InWritableDir(osutil.Remove, realName)
  465. if err != nil {
  466. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  467. return
  468. }
  469. fallthrough
  470. // The directory doesn't exist, so we create it with the right
  471. // mode bits from the start.
  472. case err != nil && os.IsNotExist(err):
  473. // We declare a function that acts on only the path name, so
  474. // we can pass it to InWritableDir. We use a regular Mkdir and
  475. // not MkdirAll because the parent should already exist.
  476. mkdir := func(path string) error {
  477. err = os.Mkdir(path, mode)
  478. if err != nil || p.ignorePerms {
  479. return err
  480. }
  481. return os.Chmod(path, mode)
  482. }
  483. if err = osutil.InWritableDir(mkdir, realName); err == nil {
  484. p.dbUpdates <- file
  485. } else {
  486. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  487. }
  488. return
  489. // Weird error when stat()'ing the dir. Probably won't work to do
  490. // anything else with it if we can't even stat() it.
  491. case err != nil:
  492. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  493. return
  494. }
  495. // The directory already exists, so we just correct the mode bits. (We
  496. // don't handle modification times on directories, because that sucks...)
  497. // It's OK to change mode bits on stuff within non-writable directories.
  498. if p.ignorePerms {
  499. p.dbUpdates <- file
  500. } else if err := os.Chmod(realName, mode); err == nil {
  501. p.dbUpdates <- file
  502. } else {
  503. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  504. }
  505. }
  506. // deleteDir attempts to delete the given directory
  507. func (p *rwFolder) deleteDir(file protocol.FileInfo) {
  508. var err error
  509. events.Default.Log(events.ItemStarted, map[string]interface{}{
  510. "folder": p.folder,
  511. "item": file.Name,
  512. "type": "dir",
  513. "action": "delete",
  514. })
  515. defer func() {
  516. events.Default.Log(events.ItemFinished, map[string]interface{}{
  517. "folder": p.folder,
  518. "item": file.Name,
  519. "error": err,
  520. "type": "dir",
  521. "action": "delete",
  522. })
  523. }()
  524. realName := filepath.Join(p.dir, file.Name)
  525. // Delete any temporary files lying around in the directory
  526. dir, _ := os.Open(realName)
  527. if dir != nil {
  528. files, _ := dir.Readdirnames(-1)
  529. for _, file := range files {
  530. if defTempNamer.IsTemporary(file) {
  531. osutil.InWritableDir(osutil.Remove, filepath.Join(realName, file))
  532. }
  533. }
  534. }
  535. err = osutil.InWritableDir(osutil.Remove, realName)
  536. if err == nil || os.IsNotExist(err) {
  537. p.dbUpdates <- file
  538. } else {
  539. l.Infof("Puller (folder %q, dir %q): delete: %v", p.folder, file.Name, err)
  540. }
  541. }
  542. // deleteFile attempts to delete the given file
  543. func (p *rwFolder) deleteFile(file protocol.FileInfo) {
  544. var err error
  545. events.Default.Log(events.ItemStarted, map[string]interface{}{
  546. "folder": p.folder,
  547. "item": file.Name,
  548. "type": "file",
  549. "action": "delete",
  550. })
  551. defer func() {
  552. events.Default.Log(events.ItemFinished, map[string]interface{}{
  553. "folder": p.folder,
  554. "item": file.Name,
  555. "error": err,
  556. "type": "file",
  557. "action": "delete",
  558. })
  559. }()
  560. realName := filepath.Join(p.dir, file.Name)
  561. cur, ok := p.model.CurrentFolderFile(p.folder, file.Name)
  562. if ok && p.inConflict(cur.Version, file.Version) {
  563. // There is a conflict here. Move the file to a conflict copy instead
  564. // of deleting. Also merge with the version vector we had, to indicate
  565. // we have resolved the conflict.
  566. file.Version = file.Version.Merge(cur.Version)
  567. err = osutil.InWritableDir(moveForConflict, realName)
  568. } else if p.versioner != nil {
  569. err = osutil.InWritableDir(p.versioner.Archive, realName)
  570. } else {
  571. err = osutil.InWritableDir(osutil.Remove, realName)
  572. }
  573. if err != nil && !os.IsNotExist(err) {
  574. l.Infof("Puller (folder %q, file %q): delete: %v", p.folder, file.Name, err)
  575. } else {
  576. p.dbUpdates <- file
  577. }
  578. }
  579. // renameFile attempts to rename an existing file to a destination
  580. // and set the right attributes on it.
  581. func (p *rwFolder) renameFile(source, target protocol.FileInfo) {
  582. var err error
  583. events.Default.Log(events.ItemStarted, map[string]interface{}{
  584. "folder": p.folder,
  585. "item": source.Name,
  586. "type": "file",
  587. "action": "delete",
  588. })
  589. events.Default.Log(events.ItemStarted, map[string]interface{}{
  590. "folder": p.folder,
  591. "item": target.Name,
  592. "type": "file",
  593. "action": "update",
  594. })
  595. defer func() {
  596. events.Default.Log(events.ItemFinished, map[string]interface{}{
  597. "folder": p.folder,
  598. "item": source.Name,
  599. "error": err,
  600. "type": "file",
  601. "action": "delete",
  602. })
  603. events.Default.Log(events.ItemFinished, map[string]interface{}{
  604. "folder": p.folder,
  605. "item": target.Name,
  606. "error": err,
  607. "type": "file",
  608. "action": "update",
  609. })
  610. }()
  611. if debug {
  612. l.Debugln(p, "taking rename shortcut", source.Name, "->", target.Name)
  613. }
  614. from := filepath.Join(p.dir, source.Name)
  615. to := filepath.Join(p.dir, target.Name)
  616. if p.versioner != nil {
  617. err = osutil.Copy(from, to)
  618. if err == nil {
  619. err = osutil.InWritableDir(p.versioner.Archive, from)
  620. }
  621. } else {
  622. err = osutil.TryRename(from, to)
  623. }
  624. if err == nil {
  625. // The file was renamed, so we have handled both the necessary delete
  626. // of the source and the creation of the target. Fix-up the metadata,
  627. // and update the local index of the target file.
  628. p.dbUpdates <- source
  629. err = p.shortcutFile(target)
  630. if err != nil {
  631. l.Infof("Puller (folder %q, file %q): rename from %q metadata: %v", p.folder, target.Name, source.Name, err)
  632. return
  633. }
  634. } else {
  635. // We failed the rename so we have a source file that we still need to
  636. // get rid of. Attempt to delete it instead so that we make *some*
  637. // progress. The target is unhandled.
  638. err = osutil.InWritableDir(osutil.Remove, from)
  639. if err != nil {
  640. l.Infof("Puller (folder %q, file %q): delete %q after failed rename: %v", p.folder, target.Name, source.Name, err)
  641. return
  642. }
  643. p.dbUpdates <- source
  644. }
  645. }
  646. // handleFile queues the copies and pulls as necessary for a single new or
  647. // changed file.
  648. func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
  649. events.Default.Log(events.ItemStarted, map[string]interface{}{
  650. "folder": p.folder,
  651. "item": file.Name,
  652. "type": "file",
  653. "action": "update",
  654. })
  655. curFile, ok := p.model.CurrentFolderFile(p.folder, file.Name)
  656. if ok && len(curFile.Blocks) == len(file.Blocks) && scanner.BlocksEqual(curFile.Blocks, file.Blocks) {
  657. // We are supposed to copy the entire file, and then fetch nothing. We
  658. // are only updating metadata, so we don't actually *need* to make the
  659. // copy.
  660. if debug {
  661. l.Debugln(p, "taking shortcut on", file.Name)
  662. }
  663. p.queue.Done(file.Name)
  664. var err error
  665. if file.IsSymlink() {
  666. err = p.shortcutSymlink(file)
  667. } else {
  668. err = p.shortcutFile(file)
  669. }
  670. events.Default.Log(events.ItemFinished, map[string]interface{}{
  671. "folder": p.folder,
  672. "item": file.Name,
  673. "error": err,
  674. "type": "file",
  675. "action": "update",
  676. })
  677. return
  678. }
  679. scanner.PopulateOffsets(file.Blocks)
  680. // Figure out the absolute filenames we need once and for all
  681. tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name))
  682. realName := filepath.Join(p.dir, file.Name)
  683. reused := 0
  684. var blocks []protocol.BlockInfo
  685. // Check for an old temporary file which might have some blocks we could
  686. // reuse.
  687. tempBlocks, err := scanner.HashFile(tempName, protocol.BlockSize)
  688. if err == nil {
  689. // Check for any reusable blocks in the temp file
  690. tempCopyBlocks, _ := scanner.BlockDiff(tempBlocks, file.Blocks)
  691. // block.String() returns a string unique to the block
  692. existingBlocks := make(map[string]struct{}, len(tempCopyBlocks))
  693. for _, block := range tempCopyBlocks {
  694. existingBlocks[block.String()] = struct{}{}
  695. }
  696. // Since the blocks are already there, we don't need to get them.
  697. for _, block := range file.Blocks {
  698. _, ok := existingBlocks[block.String()]
  699. if !ok {
  700. blocks = append(blocks, block)
  701. }
  702. }
  703. // The sharedpullerstate will know which flags to use when opening the
  704. // temp file depending if we are reusing any blocks or not.
  705. reused = len(file.Blocks) - len(blocks)
  706. if reused == 0 {
  707. // Otherwise, discard the file ourselves in order for the
  708. // sharedpuller not to panic when it fails to exclusively create a
  709. // file which already exists
  710. os.Remove(tempName)
  711. }
  712. } else {
  713. blocks = file.Blocks
  714. }
  715. s := sharedPullerState{
  716. file: file,
  717. folder: p.folder,
  718. tempName: tempName,
  719. realName: realName,
  720. copyTotal: len(blocks),
  721. copyNeeded: len(blocks),
  722. reused: reused,
  723. ignorePerms: p.ignorePerms,
  724. version: curFile.Version,
  725. mut: sync.NewMutex(),
  726. }
  727. if debug {
  728. l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused)
  729. }
  730. cs := copyBlocksState{
  731. sharedPullerState: &s,
  732. blocks: blocks,
  733. }
  734. copyChan <- cs
  735. }
  736. // shortcutFile sets file mode and modification time, when that's the only
  737. // thing that has changed.
  738. func (p *rwFolder) shortcutFile(file protocol.FileInfo) (err error) {
  739. realName := filepath.Join(p.dir, file.Name)
  740. if !p.ignorePerms {
  741. err = os.Chmod(realName, os.FileMode(file.Flags&0777))
  742. if err != nil {
  743. l.Infof("Puller (folder %q, file %q): shortcut: %v", p.folder, file.Name, err)
  744. return
  745. }
  746. }
  747. t := time.Unix(file.Modified, 0)
  748. err = os.Chtimes(realName, t, t)
  749. if err != nil {
  750. if p.lenientMtimes {
  751. err = nil
  752. // We accept the failure with a warning here and allow the sync to
  753. // continue. We'll sync the new mtime back to the other devices later.
  754. // If they have the same problem & setting, we might never get in
  755. // sync.
  756. l.Infof("Puller (folder %q, file %q): shortcut: %v (continuing anyway as requested)", p.folder, file.Name, err)
  757. } else {
  758. l.Infof("Puller (folder %q, file %q): shortcut: %v", p.folder, file.Name, err)
  759. return
  760. }
  761. }
  762. // This may have been a conflict. We should merge the version vectors so
  763. // that our clock doesn't move backwards.
  764. if cur, ok := p.model.CurrentFolderFile(p.folder, file.Name); ok {
  765. file.Version = file.Version.Merge(cur.Version)
  766. }
  767. p.dbUpdates <- file
  768. return
  769. }
  770. // shortcutSymlink changes the symlinks type if necessary.
  771. func (p *rwFolder) shortcutSymlink(file protocol.FileInfo) (err error) {
  772. err = symlinks.ChangeType(filepath.Join(p.dir, file.Name), file.Flags)
  773. if err == nil {
  774. p.dbUpdates <- file
  775. } else {
  776. l.Infof("Puller (folder %q, file %q): symlink shortcut: %v", p.folder, file.Name, err)
  777. }
  778. return
  779. }
  780. // copierRoutine reads copierStates until the in channel closes and performs
  781. // the relevant copies when possible, or passes it to the puller routine.
  782. func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
  783. buf := make([]byte, protocol.BlockSize)
  784. for state := range in {
  785. if p.progressEmitter != nil {
  786. p.progressEmitter.Register(state.sharedPullerState)
  787. }
  788. dstFd, err := state.tempFile()
  789. if err != nil {
  790. // Nothing more to do for this failed file (the error was logged
  791. // when it happened)
  792. out <- state.sharedPullerState
  793. continue
  794. }
  795. folderRoots := make(map[string]string)
  796. p.model.fmut.RLock()
  797. for folder, cfg := range p.model.folderCfgs {
  798. folderRoots[folder] = cfg.Path()
  799. }
  800. p.model.fmut.RUnlock()
  801. for _, block := range state.blocks {
  802. buf = buf[:int(block.Size)]
  803. found := p.model.finder.Iterate(block.Hash, func(folder, file string, index int32) bool {
  804. fd, err := os.Open(filepath.Join(folderRoots[folder], file))
  805. if err != nil {
  806. return false
  807. }
  808. _, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
  809. fd.Close()
  810. if err != nil {
  811. return false
  812. }
  813. hash, err := scanner.VerifyBuffer(buf, block)
  814. if err != nil {
  815. if hash != nil {
  816. if debug {
  817. l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash)
  818. }
  819. err = p.model.finder.Fix(folder, file, index, block.Hash, hash)
  820. if err != nil {
  821. l.Warnln("finder fix:", err)
  822. }
  823. } else if debug {
  824. l.Debugln("Finder failed to verify buffer", err)
  825. }
  826. return false
  827. }
  828. _, err = dstFd.WriteAt(buf, block.Offset)
  829. if err != nil {
  830. state.fail("dst write", err)
  831. }
  832. if file == state.file.Name {
  833. state.copiedFromOrigin()
  834. }
  835. return true
  836. })
  837. if state.failed() != nil {
  838. break
  839. }
  840. if !found {
  841. state.pullStarted()
  842. ps := pullBlockState{
  843. sharedPullerState: state.sharedPullerState,
  844. block: block,
  845. }
  846. pullChan <- ps
  847. } else {
  848. state.copyDone()
  849. }
  850. }
  851. out <- state.sharedPullerState
  852. }
  853. }
  854. func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
  855. for state := range in {
  856. if state.failed() != nil {
  857. continue
  858. }
  859. // Get an fd to the temporary file. Technically we don't need it until
  860. // after fetching the block, but if we run into an error here there is
  861. // no point in issuing the request to the network.
  862. fd, err := state.tempFile()
  863. if err != nil {
  864. continue
  865. }
  866. var lastError error
  867. potentialDevices := p.model.Availability(p.folder, state.file.Name)
  868. for {
  869. // Select the least busy device to pull the block from. If we found no
  870. // feasible device at all, fail the block (and in the long run, the
  871. // file).
  872. selected := activity.leastBusy(potentialDevices)
  873. if selected == (protocol.DeviceID{}) {
  874. if lastError != nil {
  875. state.fail("pull", lastError)
  876. } else {
  877. state.fail("pull", errNoDevice)
  878. }
  879. break
  880. }
  881. potentialDevices = removeDevice(potentialDevices, selected)
  882. // Fetch the block, while marking the selected device as in use so that
  883. // leastBusy can select another device when someone else asks.
  884. activity.using(selected)
  885. buf, lastError := p.model.requestGlobal(selected, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, 0, nil)
  886. activity.done(selected)
  887. if lastError != nil {
  888. continue
  889. }
  890. // Verify that the received block matches the desired hash, if not
  891. // try pulling it from another device.
  892. _, lastError = scanner.VerifyBuffer(buf, state.block)
  893. if lastError != nil {
  894. continue
  895. }
  896. // Save the block data we got from the cluster
  897. _, err = fd.WriteAt(buf, state.block.Offset)
  898. if err != nil {
  899. state.fail("save", err)
  900. } else {
  901. state.pullDone()
  902. }
  903. break
  904. }
  905. out <- state.sharedPullerState
  906. }
  907. }
  908. func (p *rwFolder) performFinish(state *sharedPullerState) {
  909. var err error
  910. defer func() {
  911. events.Default.Log(events.ItemFinished, map[string]interface{}{
  912. "folder": p.folder,
  913. "item": state.file.Name,
  914. "error": err,
  915. "type": "file",
  916. "action": "update",
  917. })
  918. }()
  919. // Set the correct permission bits on the new file
  920. if !p.ignorePerms {
  921. err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777))
  922. if err != nil {
  923. l.Warnln("Puller: final:", err)
  924. return
  925. }
  926. }
  927. // Set the correct timestamp on the new file
  928. t := time.Unix(state.file.Modified, 0)
  929. err = os.Chtimes(state.tempName, t, t)
  930. if err != nil {
  931. if p.lenientMtimes {
  932. // We accept the failure with a warning here and allow the sync to
  933. // continue. We'll sync the new mtime back to the other devices later.
  934. // If they have the same problem & setting, we might never get in
  935. // sync.
  936. l.Infof("Puller (folder %q, file %q): final: %v (continuing anyway as requested)", p.folder, state.file.Name, err)
  937. } else {
  938. l.Warnln("Puller: final:", err)
  939. return
  940. }
  941. }
  942. if p.inConflict(state.version, state.file.Version) {
  943. // The new file has been changed in conflict with the existing one. We
  944. // should file it away as a conflict instead of just removing or
  945. // archiving. Also merge with the version vector we had, to indicate
  946. // we have resolved the conflict.
  947. state.file.Version = state.file.Version.Merge(state.version)
  948. err = osutil.InWritableDir(moveForConflict, state.realName)
  949. } else if p.versioner != nil {
  950. // If we should use versioning, let the versioner archive the old
  951. // file before we replace it. Archiving a non-existent file is not
  952. // an error.
  953. err = p.versioner.Archive(state.realName)
  954. } else {
  955. err = nil
  956. }
  957. if err != nil {
  958. l.Warnln("Puller: final:", err)
  959. return
  960. }
  961. // If the target path is a symlink or a directory, we cannot copy
  962. // over it, hence remove it before proceeding.
  963. stat, err := osutil.Lstat(state.realName)
  964. if err == nil && (stat.IsDir() || stat.Mode()&os.ModeSymlink != 0) {
  965. osutil.InWritableDir(osutil.Remove, state.realName)
  966. }
  967. // Replace the original content with the new one
  968. err = osutil.Rename(state.tempName, state.realName)
  969. if err != nil {
  970. l.Warnln("Puller: final:", err)
  971. return
  972. }
  973. // If it's a symlink, the target of the symlink is inside the file.
  974. if state.file.IsSymlink() {
  975. content, err := ioutil.ReadFile(state.realName)
  976. if err != nil {
  977. l.Warnln("Puller: final: reading symlink:", err)
  978. return
  979. }
  980. // Remove the file, and replace it with a symlink.
  981. err = osutil.InWritableDir(func(path string) error {
  982. os.Remove(path)
  983. return symlinks.Create(path, string(content), state.file.Flags)
  984. }, state.realName)
  985. if err != nil {
  986. l.Warnln("Puller: final: creating symlink:", err)
  987. return
  988. }
  989. }
  990. // Record the updated file in the index
  991. p.dbUpdates <- state.file
  992. }
  993. func (p *rwFolder) finisherRoutine(in <-chan *sharedPullerState) {
  994. for state := range in {
  995. if closed, err := state.finalClose(); closed {
  996. if debug {
  997. l.Debugln(p, "closing", state.file.Name)
  998. }
  999. if err != nil {
  1000. l.Warnln("Puller: final:", err)
  1001. continue
  1002. }
  1003. p.queue.Done(state.file.Name)
  1004. if state.failed() == nil {
  1005. p.performFinish(state)
  1006. } else {
  1007. events.Default.Log(events.ItemFinished, map[string]interface{}{
  1008. "folder": p.folder,
  1009. "item": state.file.Name,
  1010. "error": state.failed(),
  1011. "type": "file",
  1012. "action": "update",
  1013. })
  1014. }
  1015. p.model.receivedFile(p.folder, state.file.Name)
  1016. if p.progressEmitter != nil {
  1017. p.progressEmitter.Deregister(state)
  1018. }
  1019. }
  1020. }
  1021. }
  1022. // Moves the given filename to the front of the job queue
  1023. func (p *rwFolder) BringToFront(filename string) {
  1024. p.queue.BringToFront(filename)
  1025. }
  1026. func (p *rwFolder) Jobs() ([]string, []string) {
  1027. return p.queue.Jobs()
  1028. }
  1029. func (p *rwFolder) DelayScan(next time.Duration) {
  1030. p.delayScan <- next
  1031. }
  1032. // dbUpdaterRoutine aggregates db updates and commits them in batches no
  1033. // larger than 1000 items, and no more delayed than 2 seconds.
  1034. func (p *rwFolder) dbUpdaterRoutine() {
  1035. const (
  1036. maxBatchSize = 1000
  1037. maxBatchTime = 2 * time.Second
  1038. )
  1039. batch := make([]protocol.FileInfo, 0, maxBatchSize)
  1040. tick := time.NewTicker(maxBatchTime)
  1041. defer tick.Stop()
  1042. loop:
  1043. for {
  1044. select {
  1045. case file, ok := <-p.dbUpdates:
  1046. if !ok {
  1047. break loop
  1048. }
  1049. file.LocalVersion = 0
  1050. batch = append(batch, file)
  1051. if len(batch) == maxBatchSize {
  1052. p.model.updateLocals(p.folder, batch)
  1053. batch = batch[:0]
  1054. }
  1055. case <-tick.C:
  1056. if len(batch) > 0 {
  1057. p.model.updateLocals(p.folder, batch)
  1058. batch = batch[:0]
  1059. }
  1060. }
  1061. }
  1062. if len(batch) > 0 {
  1063. p.model.updateLocals(p.folder, batch)
  1064. }
  1065. }
  1066. func (p *rwFolder) inConflict(current, replacement protocol.Vector) bool {
  1067. if current.Concurrent(replacement) {
  1068. // Obvious case
  1069. return true
  1070. }
  1071. if replacement.Counter(p.shortID) > current.Counter(p.shortID) {
  1072. // The replacement file contains a higher version for ourselves than
  1073. // what we have. This isn't supposed to be possible, since it's only
  1074. // we who can increment that counter. We take it as a sign that
  1075. // something is wrong (our index may have been corrupted or removed)
  1076. // and flag it as a conflict.
  1077. return true
  1078. }
  1079. return false
  1080. }
  1081. func invalidateFolder(cfg *config.Configuration, folderID string, err error) {
  1082. for i := range cfg.Folders {
  1083. folder := &cfg.Folders[i]
  1084. if folder.ID == folderID {
  1085. folder.Invalid = err.Error()
  1086. return
  1087. }
  1088. }
  1089. }
  1090. func removeDevice(devices []protocol.DeviceID, device protocol.DeviceID) []protocol.DeviceID {
  1091. for i := range devices {
  1092. if devices[i] == device {
  1093. devices[i] = devices[len(devices)-1]
  1094. return devices[:len(devices)-1]
  1095. }
  1096. }
  1097. return devices
  1098. }
  1099. func moveForConflict(name string) error {
  1100. ext := filepath.Ext(name)
  1101. withoutExt := name[:len(name)-len(ext)]
  1102. newName := withoutExt + time.Now().Format(".sync-conflict-20060102-150405") + ext
  1103. err := os.Rename(name, newName)
  1104. if os.IsNotExist(err) {
  1105. // We were supposed to move a file away but it does not exist. Either
  1106. // the user has already moved it away, or the conflict was between a
  1107. // remote modification and a local delete. In either way it does not
  1108. // matter, go ahead as if the move succeeded.
  1109. return nil
  1110. }
  1111. return err
  1112. }