puller.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This program is free software: you can redistribute it and/or modify it
  4. // under the terms of the GNU General Public License as published by the Free
  5. // Software Foundation, either version 3 of the License, or (at your option)
  6. // any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful, but WITHOUT
  9. // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  10. // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
  11. // more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program. If not, see <http://www.gnu.org/licenses/>.
  15. package model
  16. import (
  17. "bytes"
  18. "crypto/sha256"
  19. "errors"
  20. "fmt"
  21. "io/ioutil"
  22. "os"
  23. "path/filepath"
  24. "sync"
  25. "time"
  26. "github.com/AudriusButkevicius/lfu-go"
  27. "github.com/syncthing/syncthing/internal/config"
  28. "github.com/syncthing/syncthing/internal/events"
  29. "github.com/syncthing/syncthing/internal/osutil"
  30. "github.com/syncthing/syncthing/internal/protocol"
  31. "github.com/syncthing/syncthing/internal/scanner"
  32. "github.com/syncthing/syncthing/internal/symlinks"
  33. "github.com/syncthing/syncthing/internal/versioner"
  34. )
  35. // TODO: Stop on errors
  36. const (
  37. pauseIntv = 60 * time.Second
  38. nextPullIntv = 10 * time.Second
  39. checkPullIntv = 1 * time.Second
  40. )
  41. // A pullBlockState is passed to the puller routine for each block that needs
  42. // to be fetched.
  43. type pullBlockState struct {
  44. *sharedPullerState
  45. block protocol.BlockInfo
  46. }
  47. // A copyBlocksState is passed to copy routine if the file has blocks to be
  48. // copied.
  49. type copyBlocksState struct {
  50. *sharedPullerState
  51. blocks []protocol.BlockInfo
  52. }
  53. var (
  54. activity = newDeviceActivity()
  55. errNoDevice = errors.New("no available source device")
  56. )
  57. type Puller struct {
  58. folder string
  59. dir string
  60. scanIntv time.Duration
  61. model *Model
  62. stop chan struct{}
  63. versioner versioner.Versioner
  64. ignorePerms bool
  65. lenientMtimes bool
  66. progressEmitter *ProgressEmitter
  67. copiers int
  68. pullers int
  69. finishers int
  70. }
  71. // Serve will run scans and pulls. It will return when Stop()ed or on a
  72. // critical error.
  73. func (p *Puller) Serve() {
  74. if debug {
  75. l.Debugln(p, "starting")
  76. defer l.Debugln(p, "exiting")
  77. }
  78. p.stop = make(chan struct{})
  79. pullTimer := time.NewTimer(checkPullIntv)
  80. scanTimer := time.NewTimer(time.Millisecond) // The first scan should be done immediately.
  81. defer func() {
  82. pullTimer.Stop()
  83. scanTimer.Stop()
  84. // TODO: Should there be an actual FolderStopped state?
  85. p.model.setState(p.folder, FolderIdle)
  86. }()
  87. var prevVer uint64
  88. // We don't start pulling files until a scan has been completed.
  89. initialScanCompleted := false
  90. loop:
  91. for {
  92. select {
  93. case <-p.stop:
  94. return
  95. // TODO: We could easily add a channel here for notifications from
  96. // Index(), so that we immediately start a pull when new index
  97. // information is available. Before that though, I'd like to build a
  98. // repeatable benchmark of how long it takes to sync a change from
  99. // device A to device B, so we have something to work against.
  100. case <-pullTimer.C:
  101. if !initialScanCompleted {
  102. // How did we even get here?
  103. if debug {
  104. l.Debugln(p, "skip (initial)")
  105. }
  106. pullTimer.Reset(nextPullIntv)
  107. continue
  108. }
  109. // RemoteLocalVersion() is a fast call, doesn't touch the database.
  110. curVer := p.model.RemoteLocalVersion(p.folder)
  111. if curVer == prevVer {
  112. if debug {
  113. l.Debugln(p, "skip (curVer == prevVer)", prevVer)
  114. }
  115. pullTimer.Reset(checkPullIntv)
  116. continue
  117. }
  118. if debug {
  119. l.Debugln(p, "pulling", prevVer, curVer)
  120. }
  121. p.model.setState(p.folder, FolderSyncing)
  122. tries := 0
  123. checksum := false
  124. for {
  125. tries++
  126. // Last resort mode, to get around corrupt/invalid block maps.
  127. if tries == 10 {
  128. l.Infoln("Desperation mode ON")
  129. checksum = true
  130. }
  131. changed := p.pullerIteration(checksum)
  132. if debug {
  133. l.Debugln(p, "changed", changed)
  134. }
  135. if changed == 0 {
  136. // No files were changed by the puller, so we are in
  137. // sync. Remember the local version number and
  138. // schedule a resync a little bit into the future.
  139. if lv := p.model.RemoteLocalVersion(p.folder); lv < curVer {
  140. // There's a corner case where the device we needed
  141. // files from disconnected during the puller
  142. // iteration. The files will have been removed from
  143. // the index, so we've concluded that we don't need
  144. // them, but at the same time we have the local
  145. // version that includes those files in curVer. So we
  146. // catch the case that localVersion might have
  147. // decresed here.
  148. l.Debugln(p, "adjusting curVer", lv)
  149. curVer = lv
  150. }
  151. prevVer = curVer
  152. if debug {
  153. l.Debugln(p, "next pull in", nextPullIntv)
  154. }
  155. pullTimer.Reset(nextPullIntv)
  156. break
  157. }
  158. if tries > 10 {
  159. // We've tried a bunch of times to get in sync, but
  160. // we're not making it. Probably there are write
  161. // errors preventing us. Flag this with a warning and
  162. // wait a bit longer before retrying.
  163. l.Warnf("Folder %q isn't making progress - check logs for possible root cause. Pausing puller for %v.", p.folder, pauseIntv)
  164. if debug {
  165. l.Debugln(p, "next pull in", pauseIntv)
  166. }
  167. pullTimer.Reset(pauseIntv)
  168. break
  169. }
  170. }
  171. p.model.setState(p.folder, FolderIdle)
  172. // The reason for running the scanner from within the puller is that
  173. // this is the easiest way to make sure we are not doing both at the
  174. // same time.
  175. case <-scanTimer.C:
  176. if debug {
  177. l.Debugln(p, "rescan")
  178. }
  179. p.model.setState(p.folder, FolderScanning)
  180. if err := p.model.ScanFolder(p.folder); err != nil {
  181. p.model.cfg.InvalidateFolder(p.folder, err.Error())
  182. break loop
  183. }
  184. p.model.setState(p.folder, FolderIdle)
  185. if p.scanIntv > 0 {
  186. if debug {
  187. l.Debugln(p, "next rescan in", p.scanIntv)
  188. }
  189. scanTimer.Reset(p.scanIntv)
  190. }
  191. if !initialScanCompleted {
  192. l.Infoln("Completed initial scan (rw) of folder", p.folder)
  193. initialScanCompleted = true
  194. }
  195. }
  196. }
  197. }
  198. func (p *Puller) Stop() {
  199. close(p.stop)
  200. }
  201. func (p *Puller) String() string {
  202. return fmt.Sprintf("puller/%s@%p", p.folder, p)
  203. }
  204. // pullerIteration runs a single puller iteration for the given folder and
  205. // returns the number items that should have been synced (even those that
  206. // might have failed). One puller iteration handles all files currently
  207. // flagged as needed in the folder.
  208. func (p *Puller) pullerIteration(checksum bool) int {
  209. pullChan := make(chan pullBlockState)
  210. copyChan := make(chan copyBlocksState)
  211. finisherChan := make(chan *sharedPullerState)
  212. var copyWg sync.WaitGroup
  213. var pullWg sync.WaitGroup
  214. var doneWg sync.WaitGroup
  215. if debug {
  216. l.Debugln(p, "c", p.copiers, "p", p.pullers, "f", p.finishers)
  217. }
  218. for i := 0; i < p.copiers; i++ {
  219. copyWg.Add(1)
  220. go func() {
  221. // copierRoutine finishes when copyChan is closed
  222. p.copierRoutine(copyChan, pullChan, finisherChan, checksum)
  223. copyWg.Done()
  224. }()
  225. }
  226. for i := 0; i < p.pullers; i++ {
  227. pullWg.Add(1)
  228. go func() {
  229. // pullerRoutine finishes when pullChan is closed
  230. p.pullerRoutine(pullChan, finisherChan)
  231. pullWg.Done()
  232. }()
  233. }
  234. for i := 0; i < p.finishers; i++ {
  235. doneWg.Add(1)
  236. // finisherRoutine finishes when finisherChan is closed
  237. go func() {
  238. p.finisherRoutine(finisherChan)
  239. doneWg.Done()
  240. }()
  241. }
  242. p.model.fmut.RLock()
  243. files := p.model.folderFiles[p.folder]
  244. p.model.fmut.RUnlock()
  245. // !!!
  246. // WithNeed takes a database snapshot (by necessity). By the time we've
  247. // handled a bunch of files it might have become out of date and we might
  248. // be attempting to sync with an old version of a file...
  249. // !!!
  250. changed := 0
  251. var deletions []protocol.FileInfo
  252. files.WithNeed(protocol.LocalDeviceID, func(intf protocol.FileIntf) bool {
  253. // Needed items are delivered sorted lexicographically. This isn't
  254. // really optimal from a performance point of view - it would be
  255. // better if files were handled in random order, to spread the load
  256. // over the cluster. But it means that we can be sure that we fully
  257. // handle directories before the files that go inside them, which is
  258. // nice.
  259. file := intf.(protocol.FileInfo)
  260. events.Default.Log(events.ItemStarted, map[string]string{
  261. "folder": p.folder,
  262. "item": file.Name,
  263. })
  264. if debug {
  265. l.Debugln(p, "handling", file.Name)
  266. }
  267. switch {
  268. case file.IsDeleted():
  269. // A deleted file, directory or symlink
  270. deletions = append(deletions, file)
  271. case file.IsDirectory() && !file.IsSymlink():
  272. // A new or changed directory
  273. p.handleDir(file)
  274. default:
  275. // A new or changed file or symlink. This is the only case where we
  276. // do stuff in the background; the other three are done
  277. // synchronously.
  278. p.handleFile(file, copyChan, finisherChan)
  279. }
  280. changed++
  281. return true
  282. })
  283. // Signal copy and puller routines that we are done with the in data for
  284. // this iteration. Wait for them to finish.
  285. close(copyChan)
  286. copyWg.Wait()
  287. close(pullChan)
  288. pullWg.Wait()
  289. // Signal the finisher chan that there will be no more input.
  290. close(finisherChan)
  291. // Wait for the finisherChan to finish.
  292. doneWg.Wait()
  293. for i := range deletions {
  294. deletion := deletions[len(deletions)-i-1]
  295. if deletion.IsDirectory() {
  296. p.deleteDir(deletion)
  297. } else {
  298. p.deleteFile(deletion)
  299. }
  300. }
  301. return changed
  302. }
  303. // handleDir creates or updates the given directory
  304. func (p *Puller) handleDir(file protocol.FileInfo) {
  305. realName := filepath.Join(p.dir, file.Name)
  306. mode := os.FileMode(file.Flags & 0777)
  307. if p.ignorePerms {
  308. mode = 0755
  309. }
  310. if debug {
  311. curFile := p.model.CurrentFolderFile(p.folder, file.Name)
  312. l.Debugf("need dir\n\t%v\n\t%v", file, curFile)
  313. }
  314. info, err := os.Lstat(realName)
  315. switch {
  316. // There is already something under that name, but it's a file/link.
  317. // Most likely a file/link is getting replaced with a directory.
  318. // Remove the file/link and fall through to directory creation.
  319. case err == nil && (!info.IsDir() || info.Mode()&os.ModeSymlink != 0):
  320. err = osutil.InWritableDir(os.Remove, realName)
  321. if err != nil {
  322. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  323. return
  324. }
  325. fallthrough
  326. // The directory doesn't exist, so we create it with the right
  327. // mode bits from the start.
  328. case err != nil && os.IsNotExist(err):
  329. // We declare a function that acts on only the path name, so
  330. // we can pass it to InWritableDir. We use a regular Mkdir and
  331. // not MkdirAll because the parent should already exist.
  332. mkdir := func(path string) error {
  333. return os.Mkdir(path, mode)
  334. }
  335. if err = osutil.InWritableDir(mkdir, realName); err == nil {
  336. p.model.updateLocal(p.folder, file)
  337. } else {
  338. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  339. }
  340. return
  341. // Weird error when stat()'ing the dir. Probably won't work to do
  342. // anything else with it if we can't even stat() it.
  343. case err != nil:
  344. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  345. return
  346. }
  347. // The directory already exists, so we just correct the mode bits. (We
  348. // don't handle modification times on directories, because that sucks...)
  349. // It's OK to change mode bits on stuff within non-writable directories.
  350. if p.ignorePerms {
  351. p.model.updateLocal(p.folder, file)
  352. } else if err := os.Chmod(realName, mode); err == nil {
  353. p.model.updateLocal(p.folder, file)
  354. } else {
  355. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  356. }
  357. }
  358. // deleteDir attempts to delete the given directory
  359. func (p *Puller) deleteDir(file protocol.FileInfo) {
  360. realName := filepath.Join(p.dir, file.Name)
  361. // Delete any temporary files lying around in the directory
  362. dir, _ := os.Open(realName)
  363. if dir != nil {
  364. files, _ := dir.Readdirnames(-1)
  365. for _, file := range files {
  366. if defTempNamer.IsTemporary(file) {
  367. osutil.InWritableDir(os.Remove, filepath.Join(realName, file))
  368. }
  369. }
  370. }
  371. err := osutil.InWritableDir(os.Remove, realName)
  372. if err == nil || os.IsNotExist(err) {
  373. p.model.updateLocal(p.folder, file)
  374. } else {
  375. l.Infof("Puller (folder %q, dir %q): delete: %v", p.folder, file.Name, err)
  376. }
  377. }
  378. // deleteFile attempts to delete the given file
  379. func (p *Puller) deleteFile(file protocol.FileInfo) {
  380. realName := filepath.Join(p.dir, file.Name)
  381. var err error
  382. if p.versioner != nil {
  383. err = osutil.InWritableDir(p.versioner.Archive, realName)
  384. } else {
  385. err = osutil.InWritableDir(os.Remove, realName)
  386. }
  387. if err != nil && !os.IsNotExist(err) {
  388. l.Infof("Puller (folder %q, file %q): delete: %v", p.folder, file.Name, err)
  389. } else {
  390. p.model.updateLocal(p.folder, file)
  391. }
  392. }
  393. // handleFile queues the copies and pulls as necessary for a single new or
  394. // changed file.
  395. func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
  396. curFile := p.model.CurrentFolderFile(p.folder, file.Name)
  397. if len(curFile.Blocks) == len(file.Blocks) && scanner.BlocksEqual(curFile.Blocks, file.Blocks) {
  398. // We are supposed to copy the entire file, and then fetch nothing. We
  399. // are only updating metadata, so we don't actually *need* to make the
  400. // copy.
  401. if debug {
  402. l.Debugln(p, "taking shortcut on", file.Name)
  403. }
  404. if file.IsSymlink() {
  405. p.shortcutSymlink(curFile, file)
  406. } else {
  407. p.shortcutFile(file)
  408. }
  409. return
  410. }
  411. scanner.PopulateOffsets(file.Blocks)
  412. // Figure out the absolute filenames we need once and for all
  413. tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name))
  414. realName := filepath.Join(p.dir, file.Name)
  415. reused := 0
  416. var blocks []protocol.BlockInfo
  417. // Check for an old temporary file which might have some blocks we could
  418. // reuse.
  419. tempBlocks, err := scanner.HashFile(tempName, protocol.BlockSize)
  420. if err == nil {
  421. // Check for any reusable blocks in the temp file
  422. tempCopyBlocks, _ := scanner.BlockDiff(tempBlocks, file.Blocks)
  423. // block.String() returns a string unique to the block
  424. existingBlocks := make(map[string]bool, len(tempCopyBlocks))
  425. for _, block := range tempCopyBlocks {
  426. existingBlocks[block.String()] = true
  427. }
  428. // Since the blocks are already there, we don't need to get them.
  429. for _, block := range file.Blocks {
  430. _, ok := existingBlocks[block.String()]
  431. if !ok {
  432. blocks = append(blocks, block)
  433. }
  434. }
  435. // The sharedpullerstate will know which flags to use when opening the
  436. // temp file depending if we are reusing any blocks or not.
  437. reused = len(file.Blocks) - len(blocks)
  438. if reused == 0 {
  439. // Otherwise, discard the file ourselves in order for the
  440. // sharedpuller not to panic when it fails to exlusively create a
  441. // file which already exists
  442. os.Remove(tempName)
  443. }
  444. } else {
  445. blocks = file.Blocks
  446. }
  447. s := sharedPullerState{
  448. file: file,
  449. folder: p.folder,
  450. tempName: tempName,
  451. realName: realName,
  452. copyTotal: uint32(len(blocks)),
  453. copyNeeded: uint32(len(blocks)),
  454. reused: uint32(reused),
  455. }
  456. if debug {
  457. l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused)
  458. }
  459. cs := copyBlocksState{
  460. sharedPullerState: &s,
  461. blocks: blocks,
  462. }
  463. copyChan <- cs
  464. }
  465. // shortcutFile sets file mode and modification time, when that's the only
  466. // thing that has changed.
  467. func (p *Puller) shortcutFile(file protocol.FileInfo) {
  468. realName := filepath.Join(p.dir, file.Name)
  469. if !p.ignorePerms {
  470. err := os.Chmod(realName, os.FileMode(file.Flags&0777))
  471. if err != nil {
  472. l.Infof("Puller (folder %q, file %q): shortcut: %v", p.folder, file.Name, err)
  473. return
  474. }
  475. }
  476. t := time.Unix(file.Modified, 0)
  477. err := os.Chtimes(realName, t, t)
  478. if err != nil {
  479. if p.lenientMtimes {
  480. // We accept the failure with a warning here and allow the sync to
  481. // continue. We'll sync the new mtime back to the other devices later.
  482. // If they have the same problem & setting, we might never get in
  483. // sync.
  484. l.Infof("Puller (folder %q, file %q): shortcut: %v (continuing anyway as requested)", p.folder, file.Name, err)
  485. } else {
  486. l.Infof("Puller (folder %q, file %q): shortcut: %v", p.folder, file.Name, err)
  487. return
  488. }
  489. }
  490. p.model.updateLocal(p.folder, file)
  491. }
  492. // shortcutSymlink changes the symlinks type if necessery.
  493. func (p *Puller) shortcutSymlink(curFile, file protocol.FileInfo) {
  494. err := symlinks.ChangeType(filepath.Join(p.dir, file.Name), file.Flags)
  495. if err != nil {
  496. l.Infof("Puller (folder %q, file %q): symlink shortcut: %v", p.folder, file.Name, err)
  497. return
  498. }
  499. p.model.updateLocal(p.folder, file)
  500. }
  501. // copierRoutine reads copierStates until the in channel closes and performs
  502. // the relevant copies when possible, or passes it to the puller routine.
  503. func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState, checksum bool) {
  504. buf := make([]byte, protocol.BlockSize)
  505. nextFile:
  506. for state := range in {
  507. dstFd, err := state.tempFile()
  508. if err != nil {
  509. // Nothing more to do for this failed file (the error was logged
  510. // when it happened)
  511. continue nextFile
  512. }
  513. if p.progressEmitter != nil {
  514. p.progressEmitter.Register(state.sharedPullerState)
  515. }
  516. evictionChan := make(chan lfu.Eviction)
  517. fdCache := lfu.New()
  518. fdCache.UpperBound = 50
  519. fdCache.LowerBound = 20
  520. fdCache.EvictionChannel = evictionChan
  521. go func() {
  522. for item := range evictionChan {
  523. item.Value.(*os.File).Close()
  524. }
  525. }()
  526. folderRoots := make(map[string]string)
  527. p.model.fmut.RLock()
  528. for folder, cfg := range p.model.folderCfgs {
  529. folderRoots[folder] = cfg.Path
  530. }
  531. p.model.fmut.RUnlock()
  532. hasher := sha256.New()
  533. for _, block := range state.blocks {
  534. buf = buf[:int(block.Size)]
  535. found := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool {
  536. path := filepath.Join(folderRoots[folder], file)
  537. var fd *os.File
  538. fdi := fdCache.Get(path)
  539. if fdi != nil {
  540. fd = fdi.(*os.File)
  541. } else {
  542. fd, err = os.Open(path)
  543. if err != nil {
  544. return false
  545. }
  546. fdCache.Set(path, fd)
  547. }
  548. _, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
  549. if err != nil {
  550. return false
  551. }
  552. // Only done on second to last puller attempt
  553. if checksum {
  554. hasher.Write(buf)
  555. hash := hasher.Sum(nil)
  556. hasher.Reset()
  557. if !bytes.Equal(hash, block.Hash) {
  558. if debug {
  559. l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash)
  560. }
  561. err = p.model.finder.Fix(folder, file, index, block.Hash, hash)
  562. if err != nil {
  563. l.Warnln("finder fix:", err)
  564. }
  565. return false
  566. }
  567. }
  568. _, err = dstFd.WriteAt(buf, block.Offset)
  569. if err != nil {
  570. state.earlyClose("dst write", err)
  571. }
  572. if file == state.file.Name {
  573. state.copiedFromOrigin()
  574. }
  575. return true
  576. })
  577. if state.failed() != nil {
  578. break
  579. }
  580. if !found {
  581. state.pullStarted()
  582. ps := pullBlockState{
  583. sharedPullerState: state.sharedPullerState,
  584. block: block,
  585. }
  586. pullChan <- ps
  587. } else {
  588. state.copyDone()
  589. }
  590. }
  591. fdCache.Evict(fdCache.Len())
  592. close(evictionChan)
  593. out <- state.sharedPullerState
  594. }
  595. }
  596. func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
  597. nextBlock:
  598. for state := range in {
  599. if state.failed() != nil {
  600. continue nextBlock
  601. }
  602. // Select the least busy device to pull the block from. If we found no
  603. // feasible device at all, fail the block (and in the long run, the
  604. // file).
  605. potentialDevices := p.model.availability(p.folder, state.file.Name)
  606. selected := activity.leastBusy(potentialDevices)
  607. if selected == (protocol.DeviceID{}) {
  608. state.earlyClose("pull", errNoDevice)
  609. continue nextBlock
  610. }
  611. // Get an fd to the temporary file. Tehcnically we don't need it until
  612. // after fetching the block, but if we run into an error here there is
  613. // no point in issuing the request to the network.
  614. fd, err := state.tempFile()
  615. if err != nil {
  616. continue nextBlock
  617. }
  618. // Fetch the block, while marking the selected device as in use so that
  619. // leastBusy can select another device when someone else asks.
  620. activity.using(selected)
  621. buf, err := p.model.requestGlobal(selected, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash)
  622. activity.done(selected)
  623. if err != nil {
  624. state.earlyClose("pull", err)
  625. continue nextBlock
  626. }
  627. // Save the block data we got from the cluster
  628. _, err = fd.WriteAt(buf, state.block.Offset)
  629. if err != nil {
  630. state.earlyClose("save", err)
  631. continue nextBlock
  632. }
  633. state.pullDone()
  634. out <- state.sharedPullerState
  635. }
  636. }
  637. func (p *Puller) performFinish(state *sharedPullerState) {
  638. // Verify the file against expected hashes
  639. fd, err := os.Open(state.tempName)
  640. if err != nil {
  641. l.Warnln("puller: final:", err)
  642. return
  643. }
  644. err = scanner.Verify(fd, protocol.BlockSize, state.file.Blocks)
  645. fd.Close()
  646. if err != nil {
  647. l.Infoln("puller:", state.file.Name, err, "(file changed during pull?)")
  648. return
  649. }
  650. // Set the correct permission bits on the new file
  651. if !p.ignorePerms {
  652. err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777))
  653. if err != nil {
  654. l.Warnln("puller: final:", err)
  655. return
  656. }
  657. }
  658. // Set the correct timestamp on the new file
  659. t := time.Unix(state.file.Modified, 0)
  660. err = os.Chtimes(state.tempName, t, t)
  661. if err != nil {
  662. if p.lenientMtimes {
  663. // We accept the failure with a warning here and allow the sync to
  664. // continue. We'll sync the new mtime back to the other devices later.
  665. // If they have the same problem & setting, we might never get in
  666. // sync.
  667. l.Infof("Puller (folder %q, file %q): final: %v (continuing anyway as requested)", p.folder, state.file.Name, err)
  668. } else {
  669. l.Warnln("puller: final:", err)
  670. return
  671. }
  672. }
  673. // If we should use versioning, let the versioner archive the old
  674. // file before we replace it. Archiving a non-existent file is not
  675. // an error.
  676. if p.versioner != nil {
  677. err = p.versioner.Archive(state.realName)
  678. if err != nil {
  679. l.Warnln("puller: final:", err)
  680. return
  681. }
  682. }
  683. // If the target path is a symlink or a directory, we cannot copy
  684. // over it, hence remove it before proceeding.
  685. stat, err := os.Lstat(state.realName)
  686. if err == nil && (stat.IsDir() || stat.Mode()&os.ModeSymlink != 0) {
  687. osutil.InWritableDir(os.Remove, state.realName)
  688. }
  689. // Replace the original content with the new one
  690. err = osutil.Rename(state.tempName, state.realName)
  691. if err != nil {
  692. l.Warnln("puller: final:", err)
  693. return
  694. }
  695. // If it's a symlink, the target of the symlink is inside the file.
  696. if state.file.IsSymlink() {
  697. content, err := ioutil.ReadFile(state.realName)
  698. if err != nil {
  699. l.Warnln("puller: final: reading symlink:", err)
  700. return
  701. }
  702. // Remove the file, and replace it with a symlink.
  703. err = osutil.InWritableDir(func(path string) error {
  704. os.Remove(path)
  705. return symlinks.Create(path, string(content), state.file.Flags)
  706. }, state.realName)
  707. if err != nil {
  708. l.Warnln("puller: final: creating symlink:", err)
  709. return
  710. }
  711. }
  712. // Record the updated file in the index
  713. p.model.updateLocal(p.folder, state.file)
  714. }
  715. func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
  716. for state := range in {
  717. if closed, err := state.finalClose(); closed {
  718. if debug {
  719. l.Debugln(p, "closing", state.file.Name)
  720. }
  721. if err != nil {
  722. l.Warnln("puller: final:", err)
  723. continue
  724. }
  725. p.performFinish(state)
  726. if p.progressEmitter != nil {
  727. p.progressEmitter.Deregister(state)
  728. }
  729. }
  730. }
  731. }
  732. func invalidateFolder(cfg *config.Configuration, folderID string, err error) {
  733. for i := range cfg.Folders {
  734. folder := &cfg.Folders[i]
  735. if folder.ID == folderID {
  736. folder.Invalid = err.Error()
  737. return
  738. }
  739. }
  740. }