puller.go 16 KB


  1. // Copyright (C) 2014 Jakob Borg and other contributors. All rights reserved.
  2. // Use of this source code is governed by an MIT-style license that can be
  3. // found in the LICENSE file.
  4. package model
  5. import (
  6. "bytes"
  7. "errors"
  8. "os"
  9. "path/filepath"
  10. "runtime"
  11. "time"
  12. "github.com/calmh/syncthing/config"
  13. "github.com/calmh/syncthing/osutil"
  14. "github.com/calmh/syncthing/protocol"
  15. "github.com/calmh/syncthing/scanner"
  16. "github.com/calmh/syncthing/versioner"
  17. )
  18. type requestResult struct {
  19. node protocol.NodeID
  20. file scanner.File
  21. filepath string // full filepath name
  22. offset int64
  23. data []byte
  24. err error
  25. }
  26. type openFile struct {
  27. filepath string // full filepath name
  28. temp string // temporary filename
  29. availability []protocol.NodeID
  30. file *os.File
  31. err error // error when opening or writing to file, all following operations are cancelled
  32. outstanding int // number of requests we still have outstanding
  33. done bool // we have sent all requests for this file
  34. }
  35. type activityMap map[protocol.NodeID]int
  36. func (m activityMap) leastBusyNode(availability []protocol.NodeID) protocol.NodeID {
  37. var low int = 2<<30 - 1
  38. var selected protocol.NodeID
  39. for _, node := range availability {
  40. usage := m[node]
  41. if usage < low {
  42. low = usage
  43. selected = node
  44. }
  45. }
  46. m[selected]++
  47. return selected
  48. }
  49. func (m activityMap) decrease(node protocol.NodeID) {
  50. m[node]--
  51. }
  52. var errNoNode = errors.New("no available source node")
  53. type puller struct {
  54. cfg *config.Configuration
  55. repoCfg config.RepositoryConfiguration
  56. bq *blockQueue
  57. model *Model
  58. oustandingPerNode activityMap
  59. openFiles map[string]openFile
  60. requestSlots chan bool
  61. blocks chan bqBlock
  62. requestResults chan requestResult
  63. versioner versioner.Versioner
  64. }
  65. func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, cfg *config.Configuration) *puller {
  66. p := &puller{
  67. repoCfg: repoCfg,
  68. cfg: cfg,
  69. bq: newBlockQueue(),
  70. model: model,
  71. oustandingPerNode: make(activityMap),
  72. openFiles: make(map[string]openFile),
  73. requestSlots: make(chan bool, slots),
  74. blocks: make(chan bqBlock),
  75. requestResults: make(chan requestResult),
  76. }
  77. if len(repoCfg.Versioning.Type) > 0 {
  78. factory, ok := versioner.Factories[repoCfg.Versioning.Type]
  79. if !ok {
  80. l.Fatalf("Requested versioning type %q that does not exist", repoCfg.Versioning.Type)
  81. }
  82. p.versioner = factory(repoCfg.Versioning.Params)
  83. }
  84. if slots > 0 {
  85. // Read/write
  86. for i := 0; i < slots; i++ {
  87. p.requestSlots <- true
  88. }
  89. if debug {
  90. l.Debugf("starting puller; repo %q dir %q slots %d", repoCfg.ID, repoCfg.Directory, slots)
  91. }
  92. go p.run()
  93. } else {
  94. // Read only
  95. if debug {
  96. l.Debugf("starting puller; repo %q dir %q (read only)", repoCfg.ID, repoCfg.Directory)
  97. }
  98. go p.runRO()
  99. }
  100. return p
  101. }
  102. func (p *puller) run() {
  103. go func() {
  104. // fill blocks queue when there are free slots
  105. for {
  106. <-p.requestSlots
  107. b := p.bq.get()
  108. if debug {
  109. l.Debugf("filler: queueing %q / %q offset %d copy %d", p.repoCfg.ID, b.file.Name, b.block.Offset, len(b.copy))
  110. }
  111. p.blocks <- b
  112. }
  113. }()
  114. walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
  115. timeout := time.Tick(5 * time.Second)
  116. changed := true
  117. var prevVer uint64
  118. for {
  119. // Run the pulling loop as long as there are blocks to fetch
  120. pull:
  121. for {
  122. select {
  123. case res := <-p.requestResults:
  124. p.model.setState(p.repoCfg.ID, RepoSyncing)
  125. changed = true
  126. p.requestSlots <- true
  127. p.handleRequestResult(res)
  128. case b := <-p.blocks:
  129. p.model.setState(p.repoCfg.ID, RepoSyncing)
  130. changed = true
  131. if p.handleBlock(b) {
  132. // Block was fully handled, free up the slot
  133. p.requestSlots <- true
  134. }
  135. case <-timeout:
  136. if len(p.openFiles) == 0 && p.bq.empty() {
  137. // Nothing more to do for the moment
  138. break pull
  139. }
  140. if debug {
  141. l.Debugf("%q: idle but have %d open files", p.repoCfg.ID, len(p.openFiles))
  142. i := 5
  143. for _, f := range p.openFiles {
  144. l.Debugf(" %v", f)
  145. i--
  146. if i == 0 {
  147. break
  148. }
  149. }
  150. }
  151. }
  152. }
  153. if changed {
  154. p.model.setState(p.repoCfg.ID, RepoCleaning)
  155. p.fixupDirectories()
  156. changed = false
  157. }
  158. p.model.setState(p.repoCfg.ID, RepoIdle)
  159. // Do a rescan if it's time for it
  160. select {
  161. case <-walkTicker:
  162. if debug {
  163. l.Debugf("%q: time for rescan", p.repoCfg.ID)
  164. }
  165. err := p.model.ScanRepo(p.repoCfg.ID)
  166. if err != nil {
  167. invalidateRepo(p.cfg, p.repoCfg.ID, err)
  168. return
  169. }
  170. default:
  171. }
  172. if v := p.model.Version(p.repoCfg.ID); v > prevVer {
  173. // Queue more blocks to fetch, if any
  174. p.queueNeededBlocks()
  175. prevVer = v
  176. }
  177. }
  178. }
  179. func (p *puller) runRO() {
  180. walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
  181. for _ = range walkTicker {
  182. if debug {
  183. l.Debugf("%q: time for rescan", p.repoCfg.ID)
  184. }
  185. err := p.model.ScanRepo(p.repoCfg.ID)
  186. if err != nil {
  187. invalidateRepo(p.cfg, p.repoCfg.ID, err)
  188. return
  189. }
  190. }
  191. }
  192. func (p *puller) fixupDirectories() {
  193. var deleteDirs []string
  194. var changed = 0
  195. var walkFn = func(path string, info os.FileInfo, err error) error {
  196. if err != nil {
  197. return err
  198. }
  199. if !info.IsDir() {
  200. return nil
  201. }
  202. rn, err := filepath.Rel(p.repoCfg.Directory, path)
  203. if err != nil {
  204. return nil
  205. }
  206. if rn == "." {
  207. return nil
  208. }
  209. if filepath.Base(rn) == ".stversions" {
  210. return filepath.SkipDir
  211. }
  212. cur := p.model.CurrentRepoFile(p.repoCfg.ID, rn)
  213. if cur.Name != rn {
  214. // No matching dir in current list; weird
  215. if debug {
  216. l.Debugf("missing dir: %s; %v", rn, cur)
  217. }
  218. return nil
  219. }
  220. if protocol.IsDeleted(cur.Flags) {
  221. if debug {
  222. l.Debugf("queue delete dir: %v", cur)
  223. }
  224. // We queue the directories to delete since we walk the
  225. // tree in depth first order and need to remove the
  226. // directories in the opposite order.
  227. deleteDirs = append(deleteDirs, path)
  228. return nil
  229. }
  230. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(cur.Flags) && !scanner.PermsEqual(cur.Flags, uint32(info.Mode())) {
  231. err := os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm)
  232. if err != nil {
  233. l.Warnf("Restoring folder flags: %q: %v", path, err)
  234. } else {
  235. changed++
  236. if debug {
  237. l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur)
  238. }
  239. }
  240. }
  241. if cur.Modified != info.ModTime().Unix() {
  242. t := time.Unix(cur.Modified, 0)
  243. err := os.Chtimes(path, t, t)
  244. if err != nil {
  245. if runtime.GOOS != "windows" {
  246. // https://code.google.com/p/go/issues/detail?id=8090
  247. l.Warnf("Restoring folder modtime: %q: %v", path, err)
  248. }
  249. } else {
  250. changed++
  251. if debug {
  252. l.Debugf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur)
  253. }
  254. }
  255. }
  256. return nil
  257. }
  258. for {
  259. deleteDirs = nil
  260. changed = 0
  261. filepath.Walk(p.repoCfg.Directory, walkFn)
  262. var deleted = 0
  263. // Delete any queued directories
  264. for i := len(deleteDirs) - 1; i >= 0; i-- {
  265. dir := deleteDirs[i]
  266. if debug {
  267. l.Debugln("delete dir:", dir)
  268. }
  269. err := os.Remove(dir)
  270. if err == nil {
  271. deleted++
  272. } else {
  273. l.Warnln("Delete dir:", err)
  274. }
  275. }
  276. if debug {
  277. l.Debugf("changed %d, deleted %d dirs", changed, deleted)
  278. }
  279. if changed+deleted == 0 {
  280. return
  281. }
  282. }
  283. }
  284. func (p *puller) handleRequestResult(res requestResult) {
  285. p.oustandingPerNode.decrease(res.node)
  286. f := res.file
  287. of, ok := p.openFiles[f.Name]
  288. if !ok || of.err != nil {
  289. // no entry in openFiles means there was an error and we've cancelled the operation
  290. return
  291. }
  292. _, of.err = of.file.WriteAt(res.data, res.offset)
  293. of.outstanding--
  294. p.openFiles[f.Name] = of
  295. if debug {
  296. l.Debugf("pull: wrote %q / %q offset %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, of.outstanding, of.done)
  297. }
  298. if of.done && of.outstanding == 0 {
  299. p.closeFile(f)
  300. }
  301. }
  302. // handleBlock fulfills the block request by copying, ignoring or fetching
  303. // from the network. Returns true if the block was fully handled
  304. // synchronously, i.e. if the slot can be reused.
  305. func (p *puller) handleBlock(b bqBlock) bool {
  306. f := b.file
  307. // For directories, making sure they exist is enough.
  308. // Deleted directories we mark as handled and delete later.
  309. if protocol.IsDirectory(f.Flags) {
  310. if !protocol.IsDeleted(f.Flags) {
  311. path := filepath.Join(p.repoCfg.Directory, f.Name)
  312. _, err := os.Stat(path)
  313. if err != nil && os.IsNotExist(err) {
  314. if debug {
  315. l.Debugf("create dir: %v", f)
  316. }
  317. err = os.MkdirAll(path, 0777)
  318. if err != nil {
  319. l.Warnf("Create folder: %q: %v", path, err)
  320. }
  321. }
  322. } else if debug {
  323. l.Debugf("ignore delete dir: %v", f)
  324. }
  325. p.model.updateLocal(p.repoCfg.ID, f)
  326. return true
  327. }
  328. if len(b.copy) > 0 && len(b.copy) == len(b.file.Blocks) && b.last {
  329. // We are supposed to copy the entire file, and then fetch nothing.
  330. // We don't actually need to make the copy.
  331. if debug {
  332. l.Debugln("taking shortcut:", f)
  333. }
  334. fp := filepath.Join(p.repoCfg.Directory, f.Name)
  335. t := time.Unix(f.Modified, 0)
  336. err := os.Chtimes(fp, t, t)
  337. if debug && err != nil {
  338. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  339. }
  340. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
  341. err = os.Chmod(fp, os.FileMode(f.Flags&0777))
  342. if debug && err != nil {
  343. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  344. }
  345. }
  346. p.model.updateLocal(p.repoCfg.ID, f)
  347. return true
  348. }
  349. of, ok := p.openFiles[f.Name]
  350. of.done = b.last
  351. if !ok {
  352. if debug {
  353. l.Debugf("pull: %q: opening file %q", p.repoCfg.ID, f.Name)
  354. }
  355. of.availability = p.model.repoFiles[p.repoCfg.ID].Availability(f.Name)
  356. of.filepath = filepath.Join(p.repoCfg.Directory, f.Name)
  357. of.temp = filepath.Join(p.repoCfg.Directory, defTempNamer.TempName(f.Name))
  358. dirName := filepath.Dir(of.filepath)
  359. _, err := os.Stat(dirName)
  360. if err != nil {
  361. err = os.MkdirAll(dirName, 0777)
  362. }
  363. if err != nil {
  364. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  365. }
  366. of.file, of.err = os.Create(of.temp)
  367. if of.err != nil {
  368. if debug {
  369. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  370. }
  371. if !b.last {
  372. p.openFiles[f.Name] = of
  373. }
  374. return true
  375. }
  376. osutil.HideFile(of.temp)
  377. }
  378. if of.err != nil {
  379. // We have already failed this file.
  380. if debug {
  381. l.Debugf("pull: error: %q / %q has already failed: %v", p.repoCfg.ID, f.Name, of.err)
  382. }
  383. if b.last {
  384. delete(p.openFiles, f.Name)
  385. }
  386. return true
  387. }
  388. p.openFiles[f.Name] = of
  389. switch {
  390. case len(b.copy) > 0:
  391. p.handleCopyBlock(b)
  392. return true
  393. case b.block.Size > 0:
  394. return p.handleRequestBlock(b)
  395. default:
  396. p.handleEmptyBlock(b)
  397. return true
  398. }
  399. }
  400. func (p *puller) handleCopyBlock(b bqBlock) {
  401. // We have blocks to copy from the existing file
  402. f := b.file
  403. of := p.openFiles[f.Name]
  404. if debug {
  405. l.Debugf("pull: copying %d blocks for %q / %q", len(b.copy), p.repoCfg.ID, f.Name)
  406. }
  407. var exfd *os.File
  408. exfd, of.err = os.Open(of.filepath)
  409. if of.err != nil {
  410. if debug {
  411. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  412. }
  413. of.file.Close()
  414. of.file = nil
  415. p.openFiles[f.Name] = of
  416. return
  417. }
  418. defer exfd.Close()
  419. for _, b := range b.copy {
  420. bs := make([]byte, b.Size)
  421. _, of.err = exfd.ReadAt(bs, b.Offset)
  422. if of.err == nil {
  423. _, of.err = of.file.WriteAt(bs, b.Offset)
  424. }
  425. if of.err != nil {
  426. if debug {
  427. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  428. }
  429. exfd.Close()
  430. of.file.Close()
  431. of.file = nil
  432. p.openFiles[f.Name] = of
  433. return
  434. }
  435. }
  436. }
  437. // handleRequestBlock tries to pull a block from the network. Returns true if
  438. // the block could _not_ be fetched (i.e. it was fully handled, matching the
  439. // return criteria of handleBlock)
  440. func (p *puller) handleRequestBlock(b bqBlock) bool {
  441. f := b.file
  442. of, ok := p.openFiles[f.Name]
  443. if !ok {
  444. panic("bug: request for non-open file")
  445. }
  446. node := p.oustandingPerNode.leastBusyNode(of.availability)
  447. if len(node) == 0 {
  448. of.err = errNoNode
  449. if of.file != nil {
  450. of.file.Close()
  451. of.file = nil
  452. os.Remove(of.temp)
  453. }
  454. if b.last {
  455. delete(p.openFiles, f.Name)
  456. } else {
  457. p.openFiles[f.Name] = of
  458. }
  459. return true
  460. }
  461. of.outstanding++
  462. p.openFiles[f.Name] = of
  463. go func(node protocol.NodeID, b bqBlock) {
  464. if debug {
  465. l.Debugf("pull: requesting %q / %q offset %d size %d from %q outstanding %d", p.repoCfg.ID, f.Name, b.block.Offset, b.block.Size, node, of.outstanding)
  466. }
  467. bs, err := p.model.requestGlobal(node, p.repoCfg.ID, f.Name, b.block.Offset, int(b.block.Size), nil)
  468. p.requestResults <- requestResult{
  469. node: node,
  470. file: f,
  471. filepath: of.filepath,
  472. offset: b.block.Offset,
  473. data: bs,
  474. err: err,
  475. }
  476. }(node, b)
  477. return false
  478. }
  479. func (p *puller) handleEmptyBlock(b bqBlock) {
  480. f := b.file
  481. of := p.openFiles[f.Name]
  482. if b.last {
  483. if of.err == nil {
  484. of.file.Close()
  485. }
  486. }
  487. if protocol.IsDeleted(f.Flags) {
  488. if debug {
  489. l.Debugf("pull: delete %q", f.Name)
  490. }
  491. os.Remove(of.temp)
  492. os.Chmod(of.filepath, 0666)
  493. if p.versioner != nil {
  494. if debug {
  495. l.Debugln("pull: deleting with versioner")
  496. }
  497. if err := p.versioner.Archive(p.repoCfg.Directory, of.filepath); err == nil {
  498. p.model.updateLocal(p.repoCfg.ID, f)
  499. } else if debug {
  500. l.Debugln("pull: error:", err)
  501. }
  502. } else if err := os.Remove(of.filepath); err == nil || os.IsNotExist(err) {
  503. p.model.updateLocal(p.repoCfg.ID, f)
  504. }
  505. } else {
  506. if debug {
  507. l.Debugf("pull: no blocks to fetch and nothing to copy for %q / %q", p.repoCfg.ID, f.Name)
  508. }
  509. t := time.Unix(f.Modified, 0)
  510. if os.Chtimes(of.temp, t, t) != nil {
  511. delete(p.openFiles, f.Name)
  512. return
  513. }
  514. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) && os.Chmod(of.temp, os.FileMode(f.Flags&0777)) != nil {
  515. delete(p.openFiles, f.Name)
  516. return
  517. }
  518. osutil.ShowFile(of.temp)
  519. if osutil.Rename(of.temp, of.filepath) == nil {
  520. p.model.updateLocal(p.repoCfg.ID, f)
  521. }
  522. }
  523. delete(p.openFiles, f.Name)
  524. }
  525. func (p *puller) queueNeededBlocks() {
  526. queued := 0
  527. for _, f := range p.model.NeedFilesRepo(p.repoCfg.ID) {
  528. lf := p.model.CurrentRepoFile(p.repoCfg.ID, f.Name)
  529. have, need := scanner.BlockDiff(lf.Blocks, f.Blocks)
  530. if debug {
  531. l.Debugf("need:\n local: %v\n global: %v\n haveBlocks: %v\n needBlocks: %v", lf, f, have, need)
  532. }
  533. queued++
  534. p.bq.put(bqAdd{
  535. file: f,
  536. have: have,
  537. need: need,
  538. })
  539. }
  540. if debug && queued > 0 {
  541. l.Debugf("%q: queued %d blocks", p.repoCfg.ID, queued)
  542. }
  543. }
  544. func (p *puller) closeFile(f scanner.File) {
  545. if debug {
  546. l.Debugf("pull: closing %q / %q", p.repoCfg.ID, f.Name)
  547. }
  548. of := p.openFiles[f.Name]
  549. of.file.Close()
  550. defer os.Remove(of.temp)
  551. delete(p.openFiles, f.Name)
  552. fd, err := os.Open(of.temp)
  553. if err != nil {
  554. if debug {
  555. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  556. }
  557. return
  558. }
  559. hb, _ := scanner.Blocks(fd, scanner.StandardBlockSize)
  560. fd.Close()
  561. if l0, l1 := len(hb), len(f.Blocks); l0 != l1 {
  562. if debug {
  563. l.Debugf("pull: %q / %q: nblocks %d != %d", p.repoCfg.ID, f.Name, l0, l1)
  564. }
  565. return
  566. }
  567. for i := range hb {
  568. if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 {
  569. l.Debugf("pull: %q / %q: block %d hash mismatch", p.repoCfg.ID, f.Name, i)
  570. return
  571. }
  572. }
  573. t := time.Unix(f.Modified, 0)
  574. err = os.Chtimes(of.temp, t, t)
  575. if debug && err != nil {
  576. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  577. }
  578. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
  579. err = os.Chmod(of.temp, os.FileMode(f.Flags&0777))
  580. if debug && err != nil {
  581. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  582. }
  583. }
  584. osutil.ShowFile(of.temp)
  585. if p.versioner != nil {
  586. err := p.versioner.Archive(p.repoCfg.Directory, of.filepath)
  587. if err != nil {
  588. if debug {
  589. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  590. }
  591. return
  592. }
  593. }
  594. if debug {
  595. l.Debugf("pull: rename %q / %q: %q", p.repoCfg.ID, f.Name, of.filepath)
  596. }
  597. if err := osutil.Rename(of.temp, of.filepath); err == nil {
  598. p.model.updateLocal(p.repoCfg.ID, f)
  599. } else {
  600. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  601. }
  602. }
  603. func invalidateRepo(cfg *config.Configuration, repoID string, err error) {
  604. for i := range cfg.Repositories {
  605. repo := &cfg.Repositories[i]
  606. if repo.ID == repoID {
  607. repo.Invalid = err.Error()
  608. return
  609. }
  610. }
  611. }