puller.go

// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

package model

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/syncthing/syncthing/internal/config"
	"github.com/syncthing/syncthing/internal/events"
	"github.com/syncthing/syncthing/internal/osutil"
	"github.com/syncthing/syncthing/internal/protocol"
	"github.com/syncthing/syncthing/internal/scanner"
	"github.com/syncthing/syncthing/internal/versioner"
)

// TODO: Stop on errors

const (
	copiersPerRepo   = 1
	pullersPerRepo   = 16
	finishersPerRepo = 2
	pauseIntv        = 60 * time.Second
	nextPullIntv     = 10 * time.Second
	checkPullIntv    = 1 * time.Second
)

// A pullBlockState is passed to the puller routine for each block that needs
// to be fetched.
type pullBlockState struct {
	*sharedPullerState
	block protocol.BlockInfo
}

// A copyBlocksState is passed to the copy routine if the file has blocks to
// be copied from the original.
type copyBlocksState struct {
	*sharedPullerState
	blocks []protocol.BlockInfo
}

var (
	activity  = newNodeActivity()
	errNoNode = errors.New("no available source node")
)
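
// A Puller keeps a single repository in sync: it rescans the directory every
// scanIntv and pulls needed files from the cluster when the remote index
// changes.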
type Puller struct {
	repo      string
	dir       string
	scanIntv  time.Duration
	model     *Model
	stop      chan struct{}
	versioner versioner.Versioner
}

// Serve will run scans and pulls. It will return when Stop()ed or on a
// critical error.
func (p *Puller) Serve() {
	if debug {
		l.Debugln(p, "starting")
		defer l.Debugln(p, "exiting")
	}

	p.stop = make(chan struct{})

	pullTimer := time.NewTimer(checkPullIntv)
	scanTimer := time.NewTimer(p.scanIntv)

	defer func() {
		pullTimer.Stop()
		scanTimer.Stop()
		// TODO: Should there be an actual RepoStopped state?
		p.model.setState(p.repo, RepoIdle)
	}()

	var prevVer uint64

	// Clean out old temporaries before we start pulling
	p.clean()

loop:
	for {
		select {
		case <-p.stop:
			return

		// TODO: We could easily add a channel here for notifications from
		// Index(), so that we immediately start a pull when new index
		// information is available. Before that though, I'd like to build a
		// repeatable benchmark of how long it takes to sync a change from
		// node A to node B, so we have something to work against.
		case <-pullTimer.C:
			// RemoteLocalVersion() is a fast call, doesn't touch the database.
			curVer := p.model.RemoteLocalVersion(p.repo)
			if curVer == prevVer {
				pullTimer.Reset(checkPullIntv)
				continue
			}

			if debug {
				l.Debugln(p, "pulling", prevVer, curVer)
			}
			p.model.setState(p.repo, RepoSyncing)
			tries := 0
			for {
				tries++
				changed := p.pullerIteration(copiersPerRepo, pullersPerRepo, finishersPerRepo)
				if debug {
					l.Debugln(p, "changed", changed)
				}

				if changed == 0 {
					// No files were changed by the puller, so we are in
					// sync. Remember the local version number and
					// schedule a resync a little bit into the future.

					if lv := p.model.RemoteLocalVersion(p.repo); lv < curVer {
						// There's a corner case where the node we needed
						// files from disconnected during the puller
						// iteration. The files will have been removed from
						// the index, so we've concluded that we don't need
						// them, but at the same time we have the local
						// version that includes those files in curVer. So we
						// catch the case that localVersion might have
						// decreased here.
						l.Debugln(p, "adjusting curVer", lv)
						curVer = lv
					}

					prevVer = curVer
					pullTimer.Reset(nextPullIntv)
					break
				}

				if tries > 10 {
					// We've tried a bunch of times to get in sync, but
					// we're not making it. Probably there are write
					// errors preventing us. Flag this with a warning and
					// wait a bit longer before retrying.
					l.Warnf("Repo %q isn't making progress - check logs for possible root cause. Pausing puller for %v.", p.repo, pauseIntv)
					pullTimer.Reset(pauseIntv)
					break
				}
			}
			p.model.setState(p.repo, RepoIdle)

		// The reason for running the scanner from within the puller is that
		// this is the easiest way to make sure we are not doing both at the
		// same time.
		case <-scanTimer.C:
			if debug {
				l.Debugln(p, "rescan")
			}
			p.model.setState(p.repo, RepoScanning)
			if err := p.model.ScanRepo(p.repo); err != nil {
				invalidateRepo(p.model.cfg, p.repo, err)
				break loop
			}
			p.model.setState(p.repo, RepoIdle)
			scanTimer.Reset(p.scanIntv)
		}
	}
}

func (p *Puller) Stop() {
	close(p.stop)
}

func (p *Puller) String() string {
	return fmt.Sprintf("puller/%s@%p", p.repo, p)
}

// pullerIteration runs a single puller iteration for the given repo and
// returns the number of items that should have been synced (even those that
// might have failed). One puller iteration handles all files currently
// flagged as needed in the repo. The specified number of copier, puller and
// finisher routines are used. It's seldom efficient to use more than one
// copier routine, while multiple pullers are essential and multiple finishers
// may be useful (they are primarily CPU bound due to hashing).
func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
	pullChan := make(chan pullBlockState)
	copyChan := make(chan copyBlocksState)
	finisherChan := make(chan *sharedPullerState)
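
	// wg tracks the copier and puller routines that feed finisherChan, while
	// doneWg tracks the finishers; finisherChan is closed only after wg is
	// done, so the finishers see every copied and pulled block.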
	var wg sync.WaitGroup
	var doneWg sync.WaitGroup

	for i := 0; i < ncopiers; i++ {
		wg.Add(1)
		go func() {
			// copierRoutine finishes when copyChan is closed
			p.copierRoutine(copyChan, finisherChan)
			wg.Done()
		}()
	}

	for i := 0; i < npullers; i++ {
		wg.Add(1)
		go func() {
			// pullerRoutine finishes when pullChan is closed
			p.pullerRoutine(pullChan, finisherChan)
			wg.Done()
		}()
	}

	for i := 0; i < nfinishers; i++ {
		doneWg.Add(1)
		// finisherRoutine finishes when finisherChan is closed
		go func() {
			p.finisherRoutine(finisherChan)
			doneWg.Done()
		}()
	}

	p.model.rmut.RLock()
	files := p.model.repoFiles[p.repo]
	p.model.rmut.RUnlock()

	// !!!
	// WithNeed takes a database snapshot (by necessity). By the time we've
	// handled a bunch of files it might have become out of date and we might
	// be attempting to sync with an old version of a file...
	// !!!

	changed := 0
	files.WithNeed(protocol.LocalNodeID, func(intf protocol.FileIntf) bool {
		// Needed items are delivered sorted lexicographically. This isn't
		// really optimal from a performance point of view - it would be
		// better if files were handled in random order, to spread the load
		// over the cluster. But it means that we can be sure that we fully
		// handle directories before the files that go inside them, which is
		// nice.

		file := intf.(protocol.FileInfo)

		events.Default.Log(events.ItemStarted, map[string]string{
			"repo": p.repo,
			"item": file.Name,
		})

		if debug {
			l.Debugln(p, "handling", file.Name)
		}

		switch {
		case protocol.IsDirectory(file.Flags) && protocol.IsDeleted(file.Flags):
			// A deleted directory
			p.deleteDir(file)
		case protocol.IsDirectory(file.Flags):
			// A new or changed directory
			p.handleDir(file)
		case protocol.IsDeleted(file.Flags):
			// A deleted file
			p.deleteFile(file)
		default:
			// A new or changed file. This is the only case where we do stuff
			// in the background; the other three are done synchronously.
			p.handleFile(file, copyChan, pullChan)
		}

		changed++
		return true
	})

	// Signal the copier and puller routines that we are done with the input
	// data for this iteration.
	close(copyChan)
	close(pullChan)

	// Wait for them to finish, then signal the finisher chan that there will
	// be no more input.
	wg.Wait()
	close(finisherChan)

	// Wait for the finishers to finish.
	doneWg.Wait()

	return changed
}

// handleDir creates or updates the given directory
func (p *Puller) handleDir(file protocol.FileInfo) {
	realName := filepath.Join(p.dir, file.Name)
	mode := os.FileMode(file.Flags & 0777)

	if debug {
		curFile := p.model.CurrentRepoFile(p.repo, file.Name)
		l.Debugf("need dir\n\t%v\n\t%v", file, curFile)
	}

	if info, err := os.Stat(realName); err != nil {
		if os.IsNotExist(err) {
			// The directory doesn't exist, so we create it with the right
			// mode bits from the start.

			mkdir := func(path string) error {
				// We declare a function that acts on only the path name, so
				// we can pass it to InWritableDir. We use a regular Mkdir and
				// not MkdirAll because the parent should already exist.
				return os.Mkdir(path, mode)
			}

			if err = osutil.InWritableDir(mkdir, realName); err == nil {
				p.model.updateLocal(p.repo, file)
			} else {
				l.Infof("Puller (repo %q, file %q): %v", p.repo, file.Name, err)
			}
			return
		}

		// Weird error when stat()'ing the dir. Probably won't work to do
		// anything else with it if we can't even stat() it.
		l.Infof("Puller (repo %q, file %q): %v", p.repo, file.Name, err)
		return
	} else if !info.IsDir() {
		l.Infof("Puller (repo %q, file %q): should be dir, but is not", p.repo, file.Name)
		return
	}

	// The directory already exists, so we just correct the mode bits. (We
	// don't handle modification times on directories, because that sucks...)
	// It's OK to change mode bits on stuff within non-writable directories.
	if err := os.Chmod(realName, mode); err == nil {
		p.model.updateLocal(p.repo, file)
	} else {
		l.Infof("Puller (repo %q, file %q): %v", p.repo, file.Name, err)
	}
}

// deleteDir attempts to delete the given directory
func (p *Puller) deleteDir(file protocol.FileInfo) {
	realName := filepath.Join(p.dir, file.Name)
	err := osutil.InWritableDir(os.Remove, realName)
	if err == nil || os.IsNotExist(err) {
		p.model.updateLocal(p.repo, file)
	}
}

// deleteFile attempts to delete the given file
func (p *Puller) deleteFile(file protocol.FileInfo) {
	realName := filepath.Join(p.dir, file.Name)

	var err error
	if p.versioner != nil {
		err = osutil.InWritableDir(p.versioner.Archive, realName)
	} else {
		err = osutil.InWritableDir(os.Remove, realName)
	}

	if err != nil {
		l.Infof("Puller (repo %q, file %q): delete: %v", p.repo, file.Name, err)
	} else {
		p.model.updateLocal(p.repo, file)
	}
}

// handleFile queues the copies and pulls as necessary for a single new or
// changed file.
func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, pullChan chan<- pullBlockState) {
	curFile := p.model.CurrentRepoFile(p.repo, file.Name)
	copyBlocks, pullBlocks := scanner.BlockDiff(curFile.Blocks, file.Blocks)

	if len(copyBlocks) == len(curFile.Blocks) && len(pullBlocks) == 0 {
		// We are supposed to copy the entire file, and then fetch nothing. We
		// are only updating metadata, so we don't actually *need* to make the
		// copy.
		if debug {
			l.Debugln(p, "taking shortcut on", file.Name)
		}
		p.shortcutFile(file)
		return
	}

	// Figure out the absolute filenames we need once and for all
	tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name))
	realName := filepath.Join(p.dir, file.Name)

	s := sharedPullerState{
		file:       file,
		repo:       p.repo,
		tempName:   tempName,
		realName:   realName,
		pullNeeded: len(pullBlocks),
	}
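
	// copyNeeded is a single unit rather than a block count: all copy blocks
	// go to a copier in one copyBlocksState and copyDone() is called once per
	// file, while pullDone() is called once per pulled block.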
	if len(copyBlocks) > 0 {
		s.copyNeeded = 1
	}

	if debug {
		l.Debugf("%v need file %s; copy %d, pull %d", p, file.Name, len(copyBlocks), len(pullBlocks))
	}

	if len(copyBlocks) > 0 {
		cs := copyBlocksState{
			sharedPullerState: &s,
			blocks:            copyBlocks,
		}
		copyChan <- cs
	}

	if len(pullBlocks) > 0 {
		for _, block := range pullBlocks {
			ps := pullBlockState{
				sharedPullerState: &s,
				block:             block,
			}
			pullChan <- ps
		}
	}
}

// shortcutFile sets file mode and modification time, when that's the only
// thing that has changed.
func (p *Puller) shortcutFile(file protocol.FileInfo) {
	realName := filepath.Join(p.dir, file.Name)
	err := os.Chmod(realName, os.FileMode(file.Flags&0777))
	if err != nil {
		l.Infof("Puller (repo %q, file %q): shortcut: %v", p.repo, file.Name, err)
		return
	}

	t := time.Unix(file.Modified, 0)
	err = os.Chtimes(realName, t, t)
	if err != nil {
		l.Infof("Puller (repo %q, file %q): shortcut: %v", p.repo, file.Name, err)
		return
	}

	p.model.updateLocal(p.repo, file)
}

// copierRoutine reads copyBlocksStates until the in channel closes and
// performs the relevant copies.
func (p *Puller) copierRoutine(in <-chan copyBlocksState, out chan<- *sharedPullerState) {
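	// buf is reused for every block and resliced to each block's size below,
	// avoiding a new allocation per block.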
	buf := make([]byte, scanner.StandardBlockSize)

nextFile:
	for state := range in {
		dstFd, err := state.tempFile()
		if err != nil {
			// Nothing more to do for this failed file (the error was logged
			// when it happened)
			continue nextFile
		}

		srcFd, err := state.sourceFile()
		if err != nil {
			// As above
			continue nextFile
		}

		for _, block := range state.blocks {
			buf = buf[:int(block.Size)]
			_, err = srcFd.ReadAt(buf, block.Offset)
			if err != nil {
				state.earlyClose("src read", err)
				srcFd.Close()
				continue nextFile
			}

			_, err = dstFd.WriteAt(buf, block.Offset)
			if err != nil {
				state.earlyClose("dst write", err)
				srcFd.Close()
				continue nextFile
			}
		}

		srcFd.Close()
		state.copyDone()
		out <- state.sharedPullerState
	}
}
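
// pullerRoutine reads pullBlockStates until the in channel closes, fetches
// each block over the network from the least busy node that advertises the
// file, and writes the block into the temporary file.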
func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
nextBlock:
	for state := range in {
		if state.failed() != nil {
			continue nextBlock
		}

		// Select the least busy node to pull the block from. If we found no
		// feasible node at all, fail the block (and in the long run, the
		// file).
		potentialNodes := p.model.availability(p.repo, state.file.Name)
		selected := activity.leastBusy(potentialNodes)
		if selected == (protocol.NodeID{}) {
			state.earlyClose("pull", errNoNode)
			continue nextBlock
		}

		// Get an fd to the temporary file. Technically we don't need it until
		// after fetching the block, but if we run into an error here there is
		// no point in issuing the request to the network.
		fd, err := state.tempFile()
		if err != nil {
			continue nextBlock
		}

		// Fetch the block, while marking the selected node as in use so that
		// leastBusy can select another node when someone else asks.
		activity.using(selected)
		buf, err := p.model.requestGlobal(selected, p.repo, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash)
		activity.done(selected)
		if err != nil {
			state.earlyClose("pull", err)
			continue nextBlock
		}

		// Save the block data we got from the cluster
		_, err = fd.WriteAt(buf, state.block.Offset)
		if err != nil {
			state.earlyClose("save", err)
			continue nextBlock
		}

		state.pullDone()
		out <- state.sharedPullerState
	}
}
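
// finisherRoutine reads sharedPullerStates until the in channel closes. For
// each state that finalClose reports as closed, it verifies the temporary
// file against the expected block hashes, sets permissions and modification
// time, archives the old version if a versioner is configured, and moves the
// temporary file into place.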
func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
	for state := range in {
		if closed, err := state.finalClose(); closed {
			if debug {
				l.Debugln(p, "closing", state.file.Name)
			}
			if err != nil {
				l.Warnln("puller: final:", err)
				continue
			}

			// Verify the file against expected hashes
			fd, err := os.Open(state.tempName)
			if err != nil {
				l.Warnln("puller: final:", err)
				continue
			}
			err = scanner.Verify(fd, scanner.StandardBlockSize, state.file.Blocks)
			fd.Close()
			if err != nil {
				l.Warnln("puller: final:", state.file.Name, err)
				continue
			}

			// Set the correct permission bits on the new file
			err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777))
			if err != nil {
				os.Remove(state.tempName)
				l.Warnln("puller: final:", err)
				continue
			}

			// Set the correct timestamp on the new file
			t := time.Unix(state.file.Modified, 0)
			err = os.Chtimes(state.tempName, t, t)
			if err != nil {
				os.Remove(state.tempName)
				l.Warnln("puller: final:", err)
				continue
			}

			// If we should use versioning, let the versioner archive the old
			// file before we replace it. Archiving a non-existent file is not
			// an error.
			if p.versioner != nil {
				err = p.versioner.Archive(state.realName)
				if err != nil {
					os.Remove(state.tempName)
					l.Warnln("puller: final:", err)
					continue
				}
			}

			// Replace the original file with the new one
			err = osutil.Rename(state.tempName, state.realName)
			if err != nil {
				os.Remove(state.tempName)
				l.Warnln("puller: final:", err)
				continue
			}

			// Record the updated file in the index
			p.model.updateLocal(p.repo, state.file)
		}
	}
}

// clean deletes orphaned temporary files
func (p *Puller) clean() {
	filepath.Walk(p.dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		if info.Mode().IsRegular() && defTempNamer.IsTemporary(path) {
			os.Remove(path)
		}

		return nil
	})
}
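
// invalidateRepo marks the given repository as invalid in the configuration,
// using the error message as the reason.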
func invalidateRepo(cfg *config.Configuration, repoID string, err error) {
	for i := range cfg.Repositories {
		repo := &cfg.Repositories[i]
		if repo.ID == repoID {
			repo.Invalid = err.Error()
			return
		}
	}
}