// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

package model

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/syncthing/syncthing/internal/config"
	"github.com/syncthing/syncthing/internal/events"
	"github.com/syncthing/syncthing/internal/osutil"
	"github.com/syncthing/syncthing/internal/protocol"
	"github.com/syncthing/syncthing/internal/scanner"
	"github.com/syncthing/syncthing/internal/versioner"
)

// TODO: Stop on errors
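
// The constants below tune the pull pipeline started by pullerIteration and
// the timers used by Serve.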
const (
	copiersPerRepo   = 1
	pullersPerRepo   = 16
	finishersPerRepo = 2
	pauseIntv        = 60 * time.Second
	nextPullIntv     = 10 * time.Second
	checkPullIntv    = 1 * time.Second
)

// A pullBlockState is passed to the puller routine for each block that needs
// to be fetched.
type pullBlockState struct {
	*sharedPullerState
	block protocol.BlockInfo
}

// A copyBlocksState is passed to the copy routine if the file has blocks to
// be copied from the original.
type copyBlocksState struct {
	*sharedPullerState
	blocks []protocol.BlockInfo
}

var (
	activity  = newNodeActivity()
	errNoNode = errors.New("no available source node")
)
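
// A Puller keeps a single repo in sync: Serve periodically rescans the repo
// directory and pulls needed changes from the connected nodes. A rough usage
// sketch (field values are illustrative only):
//
//	p := &Puller{
//		repo:     "default",
//		dir:      "/path/to/repo",
//		scanIntv: 60 * time.Second,
//		model:    m, // some existing *Model
//	}
//	go p.Serve()
//	// ...
//	p.Stop()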
type Puller struct {
	repo      string
	dir       string
	scanIntv  time.Duration
	model     *Model
	stop      chan struct{}
	versioner versioner.Versioner
}

// Serve will run scans and pulls. It will return when Stop()ed or on a
// critical error.
func (p *Puller) Serve() {
	if debug {
		l.Debugln(p, "starting")
		defer l.Debugln(p, "exiting")
	}

	p.stop = make(chan struct{})

	pullTimer := time.NewTimer(checkPullIntv)
	scanTimer := time.NewTimer(p.scanIntv)

	defer func() {
		pullTimer.Stop()
		scanTimer.Stop()
		// TODO: Should there be an actual RepoStopped state?
		p.model.setState(p.repo, RepoIdle)
	}()

	var prevVer uint64

	// Clean out old temporaries before we start pulling
	p.clean()

loop:
	for {
		select {
		case <-p.stop:
			return

		// TODO: We could easily add a channel here for notifications from
		// Index(), so that we immediately start a pull when new index
		// information is available. Before that though, I'd like to build a
		// repeatable benchmark of how long it takes to sync a change from
		// node A to node B, so we have something to work against.
		case <-pullTimer.C:
			// RemoteLocalVersion() is a fast call, doesn't touch the database.
			curVer := p.model.RemoteLocalVersion(p.repo)
			if curVer == prevVer {
				pullTimer.Reset(checkPullIntv)
				continue
			}

			if debug {
				l.Debugln(p, "pulling", prevVer, curVer)
			}
			p.model.setState(p.repo, RepoSyncing)
			tries := 0
			for {
				tries++
				changed := p.pullerIteration(copiersPerRepo, pullersPerRepo, finishersPerRepo)
				if debug {
					l.Debugln(p, "changed", changed)
				}

				if changed == 0 {
					// No files were changed by the puller, so we are in
					// sync. Remember the local version number and
					// schedule a resync a little bit into the future.
					prevVer = curVer
					pullTimer.Reset(nextPullIntv)
					break
				}

				if tries > 10 {
					// We've tried a bunch of times to get in sync, but
					// we're not making it. Probably there are write
					// errors preventing us. Flag this with a warning and
					// wait a bit longer before retrying.
					l.Warnf("Repo %q isn't making progress - check logs for possible root cause. Pausing puller for %v.", p.repo, pauseIntv)
					pullTimer.Reset(pauseIntv)
					break
				}
			}
			p.model.setState(p.repo, RepoIdle)

		// The reason for running the scanner from within the puller is that
		// this is the easiest way to make sure we are not doing both at the
		// same time.
		case <-scanTimer.C:
			if debug {
				l.Debugln(p, "rescan")
			}
			p.model.setState(p.repo, RepoScanning)
			if err := p.model.ScanRepo(p.repo); err != nil {
				invalidateRepo(p.model.cfg, p.repo, err)
				break loop
			}
			p.model.setState(p.repo, RepoIdle)
			scanTimer.Reset(p.scanIntv)
		}
	}
}
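
// Stop signals Serve to return. The stop channel is created by Serve, so
// Stop should only be called once Serve is running.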
func (p *Puller) Stop() {
	close(p.stop)
}
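
// String identifies the puller by repo name and pointer, for debug logging.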
func (p *Puller) String() string {
	return fmt.Sprintf("puller/%s@%p", p.repo, p)
}

// pullerIteration runs a single puller iteration for the given repo and
// returns the number of items that should have been synced (even those that
// might have failed). One puller iteration handles all files currently
// flagged as needed in the repo. The specified number of copier, puller and
// finisher routines are used. It's seldom efficient to use more than one
// copier routine, while multiple pullers are essential and multiple finishers
// may be useful (they are primarily CPU bound due to hashing).
func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int {
	pullChan := make(chan pullBlockState)
	copyChan := make(chan copyBlocksState)
	finisherChan := make(chan *sharedPullerState)
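
	// wg tracks the copier and puller routines, which feed finisherChan;
	// doneWg tracks the finishers, so we know when every file has been
	// closed out.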
	var wg sync.WaitGroup
	var doneWg sync.WaitGroup

	for i := 0; i < ncopiers; i++ {
		wg.Add(1)
		go func() {
			// copierRoutine finishes when copyChan is closed
			p.copierRoutine(copyChan, finisherChan)
			wg.Done()
		}()
	}

	for i := 0; i < npullers; i++ {
		wg.Add(1)
		go func() {
			// pullerRoutine finishes when pullChan is closed
			p.pullerRoutine(pullChan, finisherChan)
			wg.Done()
		}()
	}

	for i := 0; i < nfinishers; i++ {
		doneWg.Add(1)
		// finisherRoutine finishes when finisherChan is closed
		go func() {
			p.finisherRoutine(finisherChan)
			doneWg.Done()
		}()
	}

	p.model.rmut.RLock()
	files := p.model.repoFiles[p.repo]
	p.model.rmut.RUnlock()

	// !!!
	// WithNeed takes a database snapshot (by necessity). By the time we've
	// handled a bunch of files it might have become out of date and we might
	// be attempting to sync with an old version of a file...
	// !!!

	changed := 0
	files.WithNeed(protocol.LocalNodeID, func(intf protocol.FileIntf) bool {
		file := intf.(protocol.FileInfo)

		events.Default.Log(events.ItemStarted, map[string]string{
			"repo": p.repo,
			"item": file.Name,
		})

		if debug {
			l.Debugln(p, "handling", file.Name)
		}

		switch {
		case protocol.IsDirectory(file.Flags) && protocol.IsDeleted(file.Flags):
			// A deleted directory
			p.deleteDir(file)
		case protocol.IsDirectory(file.Flags):
			// A new or changed directory
			p.handleDir(file)
		case protocol.IsDeleted(file.Flags):
			// A deleted file
			p.deleteFile(file)
		default:
			// A new or changed file
			p.handleFile(file, copyChan, pullChan)
		}

		changed++
		return true
	})

	// Signal the copier and puller routines that we are done with the input
	// data for this iteration.
	close(copyChan)
	close(pullChan)

	// Wait for them to finish, then signal the finisher chan that there will
	// be no more input.
	wg.Wait()
	close(finisherChan)

	// Wait for the finisherChan to finish.
	doneWg.Wait()

	return changed
}

// handleDir creates or updates the given directory
func (p *Puller) handleDir(file protocol.FileInfo) {
	realName := filepath.Join(p.dir, file.Name)
	mode := os.FileMode(file.Flags & 0777)

	if debug {
		curFile := p.model.CurrentRepoFile(p.repo, file.Name)
		l.Debugf("need dir\n\t%v\n\t%v", file, curFile)
	}

	if info, err := os.Stat(realName); err != nil {
		if os.IsNotExist(err) {
			// The directory doesn't exist, so we create it with the right
			// mode bits from the start.

			mkdir := func(path string) error {
				// We declare a function that acts on only the path name, so
				// we can pass it to InWritableDir. We use a regular Mkdir and
				// not MkdirAll because the parent should already exist.
				return os.Mkdir(path, mode)
			}

			if err = osutil.InWritableDir(mkdir, realName); err == nil {
				p.model.updateLocal(p.repo, file)
			} else {
				l.Infof("Puller (repo %q, file %q): %v", p.repo, file.Name, err)
			}
			return
		}

		// Weird error when stat()'ing the dir. Probably won't work to do
		// anything else with it if we can't even stat() it.
		l.Infof("Puller (repo %q, file %q): %v", p.repo, file.Name, err)
		return
	} else if !info.IsDir() {
		l.Infof("Puller (repo %q, file %q): should be dir, but is not", p.repo, file.Name)
		return
	}

	// The directory already exists, so we just correct the mode bits. (We
	// don't handle modification times on directories, because that sucks...)
	// It's OK to change mode bits on stuff within non-writable directories.
	if err := os.Chmod(realName, mode); err == nil {
		p.model.updateLocal(p.repo, file)
	} else {
		l.Infof("Puller (repo %q, file %q): %v", p.repo, file.Name, err)
	}
}

// deleteDir attempts to delete the given directory
func (p *Puller) deleteDir(file protocol.FileInfo) {
	realName := filepath.Join(p.dir, file.Name)
	err := osutil.InWritableDir(os.Remove, realName)
	if err == nil || os.IsNotExist(err) {
		p.model.updateLocal(p.repo, file)
	}
}

// deleteFile attempts to delete the given file
func (p *Puller) deleteFile(file protocol.FileInfo) {
	realName := filepath.Join(p.dir, file.Name)

	var err error
	if p.versioner != nil {
		err = osutil.InWritableDir(p.versioner.Archive, realName)
	} else {
		err = osutil.InWritableDir(os.Remove, realName)
	}

	if err != nil {
		l.Infof("Puller (repo %q, file %q): delete: %v", p.repo, file.Name, err)
	} else {
		p.model.updateLocal(p.repo, file)
	}
}

// handleFile queues the copies and pulls as necessary for a single new or
// changed file.
func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, pullChan chan<- pullBlockState) {
	curFile := p.model.CurrentRepoFile(p.repo, file.Name)
	copyBlocks, pullBlocks := scanner.BlockDiff(curFile.Blocks, file.Blocks)

	if len(copyBlocks) == len(curFile.Blocks) && len(pullBlocks) == 0 {
		// We are supposed to copy the entire file, and then fetch nothing. We
		// are only updating metadata, so we don't actually *need* to make the
		// copy.
		if debug {
			l.Debugln(p, "taking shortcut on", file.Name)
		}
		p.shortcutFile(file)
		return
	}

	// Figure out the absolute filenames we need once and for all
	tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name))
	realName := filepath.Join(p.dir, file.Name)

	s := sharedPullerState{
		file:       file,
		repo:       p.repo,
		tempName:   tempName,
		realName:   realName,
		pullNeeded: len(pullBlocks),
	}
	if len(copyBlocks) > 0 {
		s.copyNeeded = 1
	}

	if debug {
		l.Debugf("%v need file %s; copy %d, pull %d", p, file.Name, len(copyBlocks), len(pullBlocks))
	}

	if len(copyBlocks) > 0 {
		cs := copyBlocksState{
			sharedPullerState: &s,
			blocks:            copyBlocks,
		}
		copyChan <- cs
	}

	if len(pullBlocks) > 0 {
		for _, block := range pullBlocks {
			ps := pullBlockState{
				sharedPullerState: &s,
				block:             block,
			}
			pullChan <- ps
		}
	}
}

// shortcutFile sets file mode and modification time, when that's the only
// thing that has changed.
func (p *Puller) shortcutFile(file protocol.FileInfo) {
	realName := filepath.Join(p.dir, file.Name)
	err := os.Chmod(realName, os.FileMode(file.Flags&0777))
	if err != nil {
		l.Infof("Puller (repo %q, file %q): shortcut: %v", p.repo, file.Name, err)
		return
	}

	t := time.Unix(file.Modified, 0)
	err = os.Chtimes(realName, t, t)
	if err != nil {
		l.Infof("Puller (repo %q, file %q): shortcut: %v", p.repo, file.Name, err)
		return
	}

	p.model.updateLocal(p.repo, file)
}

// copierRoutine reads copyBlocksStates until the in channel closes and
// performs the relevant copies.
func (p *Puller) copierRoutine(in <-chan copyBlocksState, out chan<- *sharedPullerState) {
	buf := make([]byte, scanner.StandardBlockSize)

nextFile:
	for state := range in {
		dstFd, err := state.tempFile()
		if err != nil {
			// Nothing more to do for this failed file (the error was logged
			// when it happened)
			continue nextFile
		}

		srcFd, err := state.sourceFile()
		if err != nil {
			// As above
			continue nextFile
		}

		for _, block := range state.blocks {
			buf = buf[:int(block.Size)]
			_, err = srcFd.ReadAt(buf, block.Offset)
			if err != nil {
				state.earlyClose("src read", err)
				srcFd.Close()
				continue nextFile
			}

			_, err = dstFd.WriteAt(buf, block.Offset)
			if err != nil {
				state.earlyClose("dst write", err)
				srcFd.Close()
				continue nextFile
			}
		}

		srcFd.Close()
		state.copyDone()
		out <- state.sharedPullerState
	}
}
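
// pullerRoutine reads pullBlockStates until the in channel closes. For each
// block it requests the data from the least busy of the available source
// nodes and writes it into the temporary file.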
func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
nextBlock:
	for state := range in {
		if state.failed() != nil {
			continue nextBlock
		}

		// Select the least busy node to pull the block from. If we found no
		// feasible node at all, fail the block (and in the long run, the
		// file).
		potentialNodes := p.model.availability(p.repo, state.file.Name)
		selected := activity.leastBusy(potentialNodes)
		if selected == (protocol.NodeID{}) {
			state.earlyClose("pull", errNoNode)
			continue nextBlock
		}

		// Get an fd to the temporary file. Technically we don't need it until
		// after fetching the block, but if we run into an error here there is
		// no point in issuing the request to the network.
		fd, err := state.tempFile()
		if err != nil {
			continue nextBlock
		}

		// Fetch the block, while marking the selected node as in use so that
		// leastBusy can select another node when someone else asks.
		activity.using(selected)
		buf, err := p.model.requestGlobal(selected, p.repo, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash)
		activity.done(selected)
		if err != nil {
			state.earlyClose("pull", err)
			continue nextBlock
		}

		// Save the block data we got from the cluster
		_, err = fd.WriteAt(buf, state.block.Offset)
		if err != nil {
			state.earlyClose("save", err)
			continue nextBlock
		}

		state.pullDone()
		out <- state.sharedPullerState
	}
}
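
// finisherRoutine reads sharedPullerStates until the in channel closes. When
// a state reaches its final close, the temporary file is verified against the
// expected block hashes, given the right permissions and modification time,
// and renamed into place (archiving the previous version first if a versioner
// is configured).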
func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
	for state := range in {
		if closed, err := state.finalClose(); closed {
			if debug {
				l.Debugln(p, "closing", state.file.Name)
			}
			if err != nil {
				l.Warnln("puller: final:", err)
				continue
			}

			// Verify the file against expected hashes
			fd, err := os.Open(state.tempName)
			if err != nil {
				l.Warnln("puller: final:", err)
				continue
			}
			err = scanner.Verify(fd, scanner.StandardBlockSize, state.file.Blocks)
			fd.Close()
			if err != nil {
				l.Warnln("puller: final:", state.file.Name, err)
				continue
			}

			// Set the correct permission bits on the new file
			err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777))
			if err != nil {
				os.Remove(state.tempName)
				l.Warnln("puller: final:", err)
				continue
			}

			// Set the correct timestamp on the new file
			t := time.Unix(state.file.Modified, 0)
			err = os.Chtimes(state.tempName, t, t)
			if err != nil {
				os.Remove(state.tempName)
				l.Warnln("puller: final:", err)
				continue
			}

			// If we should use versioning, let the versioner archive the old
			// file before we replace it. Archiving a non-existent file is not
			// an error.
			if p.versioner != nil {
				err = p.versioner.Archive(state.realName)
				if err != nil {
					os.Remove(state.tempName)
					l.Warnln("puller: final:", err)
					continue
				}
			}

			// Replace the original file with the new one
			err = osutil.Rename(state.tempName, state.realName)
			if err != nil {
				os.Remove(state.tempName)
				l.Warnln("puller: final:", err)
				continue
			}

			// Record the updated file in the index
			p.model.updateLocal(p.repo, state.file)
		}
	}
}

// clean deletes orphaned temporary files
func (p *Puller) clean() {
	filepath.Walk(p.dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		if info.Mode().IsRegular() && defTempNamer.IsTemporary(path) {
			os.Remove(path)
		}

		return nil
	})
}
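
// invalidateRepo marks the given repo as invalid in the supplied (in-memory)
// configuration, recording the error that caused it.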
func invalidateRepo(cfg *config.Configuration, repoID string, err error) {
	for i := range cfg.Repositories {
		repo := &cfg.Repositories[i]
		if repo.ID == repoID {
			repo.Invalid = err.Error()
			return
		}
	}
}