puller.go 16 KB


  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package model
  5. import (
  6. "bytes"
  7. "errors"
  8. "os"
  9. "path/filepath"
  10. "runtime"
  11. "time"
  12. "github.com/calmh/syncthing/config"
  13. "github.com/calmh/syncthing/events"
  14. "github.com/calmh/syncthing/osutil"
  15. "github.com/calmh/syncthing/protocol"
  16. "github.com/calmh/syncthing/scanner"
  17. "github.com/calmh/syncthing/versioner"
  18. )
// requestResult carries the outcome of one network block request from the
// requesting goroutine back to the puller's main loop.
type requestResult struct {
	node     protocol.NodeID   // node the block was requested from
	file     protocol.FileInfo // file the requested block belongs to
	filepath string            // full filepath name
	offset   int64             // offset of the block within the file
	data     []byte            // data returned by the request
	err      error             // error returned by the request
}
// openFile is the puller's bookkeeping for one file that is being assembled
// in a temporary file. Entries are stored by value in the puller's openFiles
// map, so a modified copy has to be written back to the map to take effect.
type openFile struct {
	filepath     string            // full filepath name
	temp         string            // temporary filename
	availability []protocol.NodeID // result of the repo's Availability lookup made when the file was opened
	file         *os.File          // handle on the temporary file; set to nil where it is closed after an error
	err          error             // error when opening or writing to file, all following operations are cancelled
	outstanding  int               // number of requests we still have outstanding
	done         bool              // we have sent all requests for this file
}
// activityMap holds a per-node counter. leastBusyNode increments the counter
// of the node it selects and decrease decrements it again.
type activityMap map[protocol.NodeID]int
  37. func (m activityMap) leastBusyNode(availability []protocol.NodeID) protocol.NodeID {
  38. var low int = 2<<30 - 1
  39. var selected protocol.NodeID
  40. for _, node := range availability {
  41. usage := m[node]
  42. if usage < low {
  43. low = usage
  44. selected = node
  45. }
  46. }
  47. m[selected]++
  48. return selected
  49. }
  50. func (m activityMap) decrease(node protocol.NodeID) {
  51. m[node]--
  52. }
// errNoNode is recorded as an open file's error when no node could be
// selected to request a block from.
var errNoNode = errors.New("no available source node")
// puller holds the state for synchronizing one repository. Depending on how
// it was created it either pulls blocks and rescans (run) or only rescans
// (runRO).
type puller struct {
	cfg               *config.Configuration          // global configuration; supplies the rescan interval
	repoCfg           config.RepositoryConfiguration // configuration of the repository handled by this puller
	bq                *blockQueue                    // queue the filler goroutine takes blocks from
	model             *Model                         // model used for state, file lookups, requests and local updates
	oustandingPerNode activityMap                    // per-node request counters
	openFiles         map[string]openFile            // files currently being assembled, keyed by file name
	requestSlots      chan bool                      // one token per block that may be in progress
	blocks            chan bqBlock                   // blocks passed from the filler goroutine to the main loop
	requestResults    chan requestResult             // results of finished network requests
	versioner         versioner.Versioner            // set only when a versioning type is configured
}
  66. func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, cfg *config.Configuration) *puller {
  67. p := &puller{
  68. repoCfg: repoCfg,
  69. cfg: cfg,
  70. bq: newBlockQueue(),
  71. model: model,
  72. oustandingPerNode: make(activityMap),
  73. openFiles: make(map[string]openFile),
  74. requestSlots: make(chan bool, slots),
  75. blocks: make(chan bqBlock),
  76. requestResults: make(chan requestResult),
  77. }
  78. if len(repoCfg.Versioning.Type) > 0 {
  79. factory, ok := versioner.Factories[repoCfg.Versioning.Type]
  80. if !ok {
  81. l.Fatalf("Requested versioning type %q that does not exist", repoCfg.Versioning.Type)
  82. }
  83. p.versioner = factory(repoCfg.Versioning.Params)
  84. }
  85. if slots > 0 {
  86. // Read/write
  87. for i := 0; i < slots; i++ {
  88. p.requestSlots <- true
  89. }
  90. if debug {
  91. l.Debugf("starting puller; repo %q dir %q slots %d", repoCfg.ID, repoCfg.Directory, slots)
  92. }
  93. go p.run()
  94. } else {
  95. // Read only
  96. if debug {
  97. l.Debugf("starting puller; repo %q dir %q (read only)", repoCfg.ID, repoCfg.Directory)
  98. }
  99. go p.runRO()
  100. }
  101. return p
  102. }
  103. func (p *puller) run() {
  104. go func() {
  105. // fill blocks queue when there are free slots
  106. for {
  107. <-p.requestSlots
  108. b := p.bq.get()
  109. if debug {
  110. l.Debugf("filler: queueing %q / %q offset %d copy %d", p.repoCfg.ID, b.file.Name, b.block.Offset, len(b.copy))
  111. }
  112. p.blocks <- b
  113. }
  114. }()
  115. walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
  116. timeout := time.Tick(5 * time.Second)
  117. changed := true
  118. var prevVer uint64
  119. for {
  120. // Run the pulling loop as long as there are blocks to fetch
  121. pull:
  122. for {
  123. select {
  124. case res := <-p.requestResults:
  125. p.model.setState(p.repoCfg.ID, RepoSyncing)
  126. changed = true
  127. p.requestSlots <- true
  128. p.handleRequestResult(res)
  129. case b := <-p.blocks:
  130. p.model.setState(p.repoCfg.ID, RepoSyncing)
  131. changed = true
  132. if p.handleBlock(b) {
  133. // Block was fully handled, free up the slot
  134. p.requestSlots <- true
  135. }
  136. case <-timeout:
  137. if len(p.openFiles) == 0 && p.bq.empty() {
  138. // Nothing more to do for the moment
  139. break pull
  140. }
  141. if debug {
  142. l.Debugf("%q: idle but have %d open files", p.repoCfg.ID, len(p.openFiles))
  143. i := 5
  144. for _, f := range p.openFiles {
  145. l.Debugf(" %v", f)
  146. i--
  147. if i == 0 {
  148. break
  149. }
  150. }
  151. }
  152. }
  153. }
  154. if changed {
  155. p.model.setState(p.repoCfg.ID, RepoCleaning)
  156. p.fixupDirectories()
  157. changed = false
  158. }
  159. p.model.setState(p.repoCfg.ID, RepoIdle)
  160. // Do a rescan if it's time for it
  161. select {
  162. case <-walkTicker:
  163. if debug {
  164. l.Debugf("%q: time for rescan", p.repoCfg.ID)
  165. }
  166. err := p.model.ScanRepo(p.repoCfg.ID)
  167. if err != nil {
  168. invalidateRepo(p.cfg, p.repoCfg.ID, err)
  169. return
  170. }
  171. default:
  172. }
  173. if v := p.model.Version(p.repoCfg.ID); v > prevVer {
  174. // Queue more blocks to fetch, if any
  175. p.queueNeededBlocks()
  176. prevVer = v
  177. }
  178. }
  179. }
  180. func (p *puller) runRO() {
  181. walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
  182. for _ = range walkTicker {
  183. if debug {
  184. l.Debugf("%q: time for rescan", p.repoCfg.ID)
  185. }
  186. err := p.model.ScanRepo(p.repoCfg.ID)
  187. if err != nil {
  188. invalidateRepo(p.cfg, p.repoCfg.ID, err)
  189. return
  190. }
  191. }
  192. }
  193. func (p *puller) fixupDirectories() {
  194. var deleteDirs []string
  195. var changed = 0
  196. var walkFn = func(path string, info os.FileInfo, err error) error {
  197. if err != nil {
  198. return err
  199. }
  200. if !info.IsDir() {
  201. return nil
  202. }
  203. rn, err := filepath.Rel(p.repoCfg.Directory, path)
  204. if err != nil {
  205. return nil
  206. }
  207. if rn == "." {
  208. return nil
  209. }
  210. if filepath.Base(rn) == ".stversions" {
  211. return filepath.SkipDir
  212. }
  213. cur := p.model.CurrentRepoFile(p.repoCfg.ID, rn)
  214. if cur.Name != rn {
  215. // No matching dir in current list; weird
  216. if debug {
  217. l.Debugf("missing dir: %s; %v", rn, cur)
  218. }
  219. return nil
  220. }
  221. if protocol.IsDeleted(cur.Flags) {
  222. if debug {
  223. l.Debugf("queue delete dir: %v", cur)
  224. }
  225. // We queue the directories to delete since we walk the
  226. // tree in depth first order and need to remove the
  227. // directories in the opposite order.
  228. deleteDirs = append(deleteDirs, path)
  229. return nil
  230. }
  231. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(cur.Flags) && !scanner.PermsEqual(cur.Flags, uint32(info.Mode())) {
  232. err := os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm)
  233. if err != nil {
  234. l.Warnf("Restoring folder flags: %q: %v", path, err)
  235. } else {
  236. changed++
  237. if debug {
  238. l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur)
  239. }
  240. }
  241. }
  242. if cur.Modified != info.ModTime().Unix() {
  243. t := time.Unix(cur.Modified, 0)
  244. err := os.Chtimes(path, t, t)
  245. if err != nil {
  246. if runtime.GOOS != "windows" {
  247. // https://code.google.com/p/go/issues/detail?id=8090
  248. l.Warnf("Restoring folder modtime: %q: %v", path, err)
  249. }
  250. } else {
  251. changed++
  252. if debug {
  253. l.Debugf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur)
  254. }
  255. }
  256. }
  257. return nil
  258. }
  259. for {
  260. deleteDirs = nil
  261. changed = 0
  262. filepath.Walk(p.repoCfg.Directory, walkFn)
  263. var deleted = 0
  264. // Delete any queued directories
  265. for i := len(deleteDirs) - 1; i >= 0; i-- {
  266. dir := deleteDirs[i]
  267. if debug {
  268. l.Debugln("delete dir:", dir)
  269. }
  270. err := os.Remove(dir)
  271. if err == nil {
  272. deleted++
  273. } else {
  274. l.Warnln("Delete dir:", err)
  275. }
  276. }
  277. if debug {
  278. l.Debugf("changed %d, deleted %d dirs", changed, deleted)
  279. }
  280. if changed+deleted == 0 {
  281. return
  282. }
  283. }
  284. }
  285. func (p *puller) handleRequestResult(res requestResult) {
  286. p.oustandingPerNode.decrease(res.node)
  287. f := res.file
  288. of, ok := p.openFiles[f.Name]
  289. if !ok || of.err != nil {
  290. // no entry in openFiles means there was an error and we've cancelled the operation
  291. return
  292. }
  293. _, of.err = of.file.WriteAt(res.data, res.offset)
  294. of.outstanding--
  295. p.openFiles[f.Name] = of
  296. if debug {
  297. l.Debugf("pull: wrote %q / %q offset %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, of.outstanding, of.done)
  298. }
  299. if of.done && of.outstanding == 0 {
  300. p.closeFile(f)
  301. }
  302. }
  303. // handleBlock fulfills the block request by copying, ignoring or fetching
  304. // from the network. Returns true if the block was fully handled
  305. // synchronously, i.e. if the slot can be reused.
  306. func (p *puller) handleBlock(b bqBlock) bool {
  307. f := b.file
  308. // For directories, making sure they exist is enough.
  309. // Deleted directories we mark as handled and delete later.
  310. if protocol.IsDirectory(f.Flags) {
  311. if !protocol.IsDeleted(f.Flags) {
  312. path := filepath.Join(p.repoCfg.Directory, f.Name)
  313. _, err := os.Stat(path)
  314. if err != nil && os.IsNotExist(err) {
  315. if debug {
  316. l.Debugf("create dir: %v", f)
  317. }
  318. err = os.MkdirAll(path, 0777)
  319. if err != nil {
  320. l.Warnf("Create folder: %q: %v", path, err)
  321. }
  322. }
  323. } else if debug {
  324. l.Debugf("ignore delete dir: %v", f)
  325. }
  326. p.model.updateLocal(p.repoCfg.ID, f)
  327. return true
  328. }
  329. if len(b.copy) > 0 && len(b.copy) == len(b.file.Blocks) && b.last {
  330. // We are supposed to copy the entire file, and then fetch nothing.
  331. // We don't actually need to make the copy.
  332. if debug {
  333. l.Debugln("taking shortcut:", f)
  334. }
  335. fp := filepath.Join(p.repoCfg.Directory, f.Name)
  336. t := time.Unix(f.Modified, 0)
  337. err := os.Chtimes(fp, t, t)
  338. if debug && err != nil {
  339. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  340. }
  341. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
  342. err = os.Chmod(fp, os.FileMode(f.Flags&0777))
  343. if debug && err != nil {
  344. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  345. }
  346. }
  347. events.Default.Log(events.ItemStarted, map[string]string{
  348. "repo": p.repoCfg.ID,
  349. "item": f.Name,
  350. })
  351. p.model.updateLocal(p.repoCfg.ID, f)
  352. return true
  353. }
  354. of, ok := p.openFiles[f.Name]
  355. of.done = b.last
  356. if !ok {
  357. if debug {
  358. l.Debugf("pull: %q: opening file %q", p.repoCfg.ID, f.Name)
  359. }
  360. events.Default.Log(events.ItemStarted, map[string]string{
  361. "repo": p.repoCfg.ID,
  362. "item": f.Name,
  363. })
  364. of.availability = p.model.repoFiles[p.repoCfg.ID].Availability(f.Name)
  365. of.filepath = filepath.Join(p.repoCfg.Directory, f.Name)
  366. of.temp = filepath.Join(p.repoCfg.Directory, defTempNamer.TempName(f.Name))
  367. dirName := filepath.Dir(of.filepath)
  368. _, err := os.Stat(dirName)
  369. if err != nil {
  370. err = os.MkdirAll(dirName, 0777)
  371. }
  372. if err != nil {
  373. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  374. }
  375. of.file, of.err = os.Create(of.temp)
  376. if of.err != nil {
  377. if debug {
  378. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  379. }
  380. if !b.last {
  381. p.openFiles[f.Name] = of
  382. }
  383. return true
  384. }
  385. osutil.HideFile(of.temp)
  386. }
  387. if of.err != nil {
  388. // We have already failed this file.
  389. if debug {
  390. l.Debugf("pull: error: %q / %q has already failed: %v", p.repoCfg.ID, f.Name, of.err)
  391. }
  392. if b.last {
  393. delete(p.openFiles, f.Name)
  394. }
  395. return true
  396. }
  397. p.openFiles[f.Name] = of
  398. switch {
  399. case len(b.copy) > 0:
  400. p.handleCopyBlock(b)
  401. return true
  402. case b.block.Size > 0:
  403. return p.handleRequestBlock(b)
  404. default:
  405. p.handleEmptyBlock(b)
  406. return true
  407. }
  408. }
  409. func (p *puller) handleCopyBlock(b bqBlock) {
  410. // We have blocks to copy from the existing file
  411. f := b.file
  412. of := p.openFiles[f.Name]
  413. if debug {
  414. l.Debugf("pull: copying %d blocks for %q / %q", len(b.copy), p.repoCfg.ID, f.Name)
  415. }
  416. var exfd *os.File
  417. exfd, of.err = os.Open(of.filepath)
  418. if of.err != nil {
  419. if debug {
  420. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  421. }
  422. of.file.Close()
  423. of.file = nil
  424. p.openFiles[f.Name] = of
  425. return
  426. }
  427. defer exfd.Close()
  428. for _, b := range b.copy {
  429. bs := make([]byte, b.Size)
  430. _, of.err = exfd.ReadAt(bs, b.Offset)
  431. if of.err == nil {
  432. _, of.err = of.file.WriteAt(bs, b.Offset)
  433. }
  434. if of.err != nil {
  435. if debug {
  436. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  437. }
  438. exfd.Close()
  439. of.file.Close()
  440. of.file = nil
  441. p.openFiles[f.Name] = of
  442. return
  443. }
  444. }
  445. }
  446. // handleRequestBlock tries to pull a block from the network. Returns true if
  447. // the block could _not_ be fetched (i.e. it was fully handled, matching the
  448. // return criteria of handleBlock)
  449. func (p *puller) handleRequestBlock(b bqBlock) bool {
  450. f := b.file
  451. of, ok := p.openFiles[f.Name]
  452. if !ok {
  453. panic("bug: request for non-open file")
  454. }
  455. node := p.oustandingPerNode.leastBusyNode(of.availability)
  456. if len(node) == 0 {
  457. of.err = errNoNode
  458. if of.file != nil {
  459. of.file.Close()
  460. of.file = nil
  461. os.Remove(of.temp)
  462. }
  463. if b.last {
  464. delete(p.openFiles, f.Name)
  465. } else {
  466. p.openFiles[f.Name] = of
  467. }
  468. return true
  469. }
  470. of.outstanding++
  471. p.openFiles[f.Name] = of
  472. go func(node protocol.NodeID, b bqBlock) {
  473. if debug {
  474. l.Debugf("pull: requesting %q / %q offset %d size %d from %q outstanding %d", p.repoCfg.ID, f.Name, b.block.Offset, b.block.Size, node, of.outstanding)
  475. }
  476. bs, err := p.model.requestGlobal(node, p.repoCfg.ID, f.Name, b.block.Offset, int(b.block.Size), nil)
  477. p.requestResults <- requestResult{
  478. node: node,
  479. file: f,
  480. filepath: of.filepath,
  481. offset: b.block.Offset,
  482. data: bs,
  483. err: err,
  484. }
  485. }(node, b)
  486. return false
  487. }
  488. func (p *puller) handleEmptyBlock(b bqBlock) {
  489. f := b.file
  490. of := p.openFiles[f.Name]
  491. if b.last {
  492. if of.err == nil {
  493. of.file.Close()
  494. }
  495. }
  496. if protocol.IsDeleted(f.Flags) {
  497. if debug {
  498. l.Debugf("pull: delete %q", f.Name)
  499. }
  500. os.Remove(of.temp)
  501. os.Chmod(of.filepath, 0666)
  502. if p.versioner != nil {
  503. if debug {
  504. l.Debugln("pull: deleting with versioner")
  505. }
  506. if err := p.versioner.Archive(p.repoCfg.Directory, of.filepath); err == nil {
  507. p.model.updateLocal(p.repoCfg.ID, f)
  508. } else if debug {
  509. l.Debugln("pull: error:", err)
  510. }
  511. } else if err := os.Remove(of.filepath); err == nil || os.IsNotExist(err) {
  512. p.model.updateLocal(p.repoCfg.ID, f)
  513. }
  514. } else {
  515. if debug {
  516. l.Debugf("pull: no blocks to fetch and nothing to copy for %q / %q", p.repoCfg.ID, f.Name)
  517. }
  518. t := time.Unix(f.Modified, 0)
  519. if os.Chtimes(of.temp, t, t) != nil {
  520. delete(p.openFiles, f.Name)
  521. return
  522. }
  523. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) && os.Chmod(of.temp, os.FileMode(f.Flags&0777)) != nil {
  524. delete(p.openFiles, f.Name)
  525. return
  526. }
  527. osutil.ShowFile(of.temp)
  528. if osutil.Rename(of.temp, of.filepath) == nil {
  529. p.model.updateLocal(p.repoCfg.ID, f)
  530. }
  531. }
  532. delete(p.openFiles, f.Name)
  533. }
  534. func (p *puller) queueNeededBlocks() {
  535. queued := 0
  536. for _, f := range p.model.NeedFilesRepo(p.repoCfg.ID) {
  537. lf := p.model.CurrentRepoFile(p.repoCfg.ID, f.Name)
  538. have, need := scanner.BlockDiff(lf.Blocks, f.Blocks)
  539. if debug {
  540. l.Debugf("need:\n local: %v\n global: %v\n haveBlocks: %v\n needBlocks: %v", lf, f, have, need)
  541. }
  542. queued++
  543. p.bq.put(bqAdd{
  544. file: f,
  545. have: have,
  546. need: need,
  547. })
  548. }
  549. if debug && queued > 0 {
  550. l.Debugf("%q: queued %d blocks", p.repoCfg.ID, queued)
  551. }
  552. }
  553. func (p *puller) closeFile(f protocol.FileInfo) {
  554. if debug {
  555. l.Debugf("pull: closing %q / %q", p.repoCfg.ID, f.Name)
  556. }
  557. of := p.openFiles[f.Name]
  558. of.file.Close()
  559. defer os.Remove(of.temp)
  560. delete(p.openFiles, f.Name)
  561. fd, err := os.Open(of.temp)
  562. if err != nil {
  563. if debug {
  564. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  565. }
  566. return
  567. }
  568. hb, _ := scanner.Blocks(fd, scanner.StandardBlockSize)
  569. fd.Close()
  570. if l0, l1 := len(hb), len(f.Blocks); l0 != l1 {
  571. if debug {
  572. l.Debugf("pull: %q / %q: nblocks %d != %d", p.repoCfg.ID, f.Name, l0, l1)
  573. }
  574. return
  575. }
  576. for i := range hb {
  577. if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 {
  578. l.Debugf("pull: %q / %q: block %d hash mismatch", p.repoCfg.ID, f.Name, i)
  579. return
  580. }
  581. }
  582. t := time.Unix(f.Modified, 0)
  583. err = os.Chtimes(of.temp, t, t)
  584. if debug && err != nil {
  585. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  586. }
  587. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
  588. err = os.Chmod(of.temp, os.FileMode(f.Flags&0777))
  589. if debug && err != nil {
  590. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  591. }
  592. }
  593. osutil.ShowFile(of.temp)
  594. if p.versioner != nil {
  595. err := p.versioner.Archive(p.repoCfg.Directory, of.filepath)
  596. if err != nil {
  597. if debug {
  598. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  599. }
  600. return
  601. }
  602. }
  603. if debug {
  604. l.Debugf("pull: rename %q / %q: %q", p.repoCfg.ID, f.Name, of.filepath)
  605. }
  606. if err := osutil.Rename(of.temp, of.filepath); err == nil {
  607. p.model.updateLocal(p.repoCfg.ID, f)
  608. } else {
  609. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  610. }
  611. }
  612. func invalidateRepo(cfg *config.Configuration, repoID string, err error) {
  613. for i := range cfg.Repositories {
  614. repo := &cfg.Repositories[i]
  615. if repo.ID == repoID {
  616. repo.Invalid = err.Error()
  617. return
  618. }
  619. }
  620. }