puller.go 16 KB


  1. // Copyright (C) 2014 Jakob Borg and other contributors. All rights reserved.
  2. // Use of this source code is governed by an MIT-style license that can be
  3. // found in the LICENSE file.
  4. package model
  5. import (
  6. "bytes"
  7. "errors"
  8. "os"
  9. "path/filepath"
  10. "runtime"
  11. "time"
  12. "github.com/calmh/syncthing/buffers"
  13. "github.com/calmh/syncthing/cid"
  14. "github.com/calmh/syncthing/config"
  15. "github.com/calmh/syncthing/osutil"
  16. "github.com/calmh/syncthing/protocol"
  17. "github.com/calmh/syncthing/scanner"
  18. "github.com/calmh/syncthing/versioner"
  19. )
  20. type requestResult struct {
  21. node string
  22. file scanner.File
  23. filepath string // full filepath name
  24. offset int64
  25. data []byte
  26. err error
  27. }
  28. type openFile struct {
  29. filepath string // full filepath name
  30. temp string // temporary filename
  31. availability uint64 // availability bitset
  32. file *os.File
  33. err error // error when opening or writing to file, all following operations are cancelled
  34. outstanding int // number of requests we still have outstanding
  35. done bool // we have sent all requests for this file
  36. }
  37. type activityMap map[string]int
  38. func (m activityMap) leastBusyNode(availability uint64, cm *cid.Map) string {
  39. var low int = 2<<30 - 1
  40. var selected string
  41. for _, node := range cm.Names() {
  42. id := cm.Get(node)
  43. if id == cid.LocalID {
  44. continue
  45. }
  46. usage := m[node]
  47. if availability&(1<<id) != 0 {
  48. if usage < low {
  49. low = usage
  50. selected = node
  51. }
  52. }
  53. }
  54. m[selected]++
  55. return selected
  56. }
  57. func (m activityMap) decrease(node string) {
  58. m[node]--
  59. }
  60. var errNoNode = errors.New("no available source node")
  61. type puller struct {
  62. cfg *config.Configuration
  63. repoCfg config.RepositoryConfiguration
  64. bq *blockQueue
  65. model *Model
  66. oustandingPerNode activityMap
  67. openFiles map[string]openFile
  68. requestSlots chan bool
  69. blocks chan bqBlock
  70. requestResults chan requestResult
  71. versioner versioner.Versioner
  72. }
  73. func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, cfg *config.Configuration) *puller {
  74. p := &puller{
  75. repoCfg: repoCfg,
  76. cfg: cfg,
  77. bq: newBlockQueue(),
  78. model: model,
  79. oustandingPerNode: make(activityMap),
  80. openFiles: make(map[string]openFile),
  81. requestSlots: make(chan bool, slots),
  82. blocks: make(chan bqBlock),
  83. requestResults: make(chan requestResult),
  84. }
  85. if len(repoCfg.Versioning.Type) > 0 {
  86. factory, ok := versioner.Factories[repoCfg.Versioning.Type]
  87. if !ok {
  88. l.Fatalf("Requested versioning type %q that does not exist", repoCfg.Versioning.Type)
  89. }
  90. p.versioner = factory(repoCfg.Versioning.Params)
  91. }
  92. if slots > 0 {
  93. // Read/write
  94. for i := 0; i < slots; i++ {
  95. p.requestSlots <- true
  96. }
  97. if debug {
  98. l.Debugf("starting puller; repo %q dir %q slots %d", repoCfg.ID, repoCfg.Directory, slots)
  99. }
  100. go p.run()
  101. } else {
  102. // Read only
  103. if debug {
  104. l.Debugf("starting puller; repo %q dir %q (read only)", repoCfg.ID, repoCfg.Directory)
  105. }
  106. go p.runRO()
  107. }
  108. return p
  109. }
  110. func (p *puller) run() {
  111. go func() {
  112. // fill blocks queue when there are free slots
  113. for {
  114. <-p.requestSlots
  115. b := p.bq.get()
  116. if debug {
  117. l.Debugf("filler: queueing %q / %q offset %d copy %d", p.repoCfg.ID, b.file.Name, b.block.Offset, len(b.copy))
  118. }
  119. p.blocks <- b
  120. }
  121. }()
  122. walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
  123. timeout := time.Tick(5 * time.Second)
  124. changed := true
  125. for {
  126. // Run the pulling loop as long as there are blocks to fetch
  127. pull:
  128. for {
  129. select {
  130. case res := <-p.requestResults:
  131. p.model.setState(p.repoCfg.ID, RepoSyncing)
  132. changed = true
  133. p.requestSlots <- true
  134. p.handleRequestResult(res)
  135. case b := <-p.blocks:
  136. p.model.setState(p.repoCfg.ID, RepoSyncing)
  137. changed = true
  138. if p.handleBlock(b) {
  139. // Block was fully handled, free up the slot
  140. p.requestSlots <- true
  141. }
  142. case <-timeout:
  143. if len(p.openFiles) == 0 && p.bq.empty() {
  144. // Nothing more to do for the moment
  145. break pull
  146. }
  147. if debug {
  148. l.Debugf("%q: idle but have %d open files", p.repoCfg.ID, len(p.openFiles))
  149. i := 5
  150. for _, f := range p.openFiles {
  151. l.Debugf(" %v", f)
  152. i--
  153. if i == 0 {
  154. break
  155. }
  156. }
  157. }
  158. }
  159. }
  160. if changed {
  161. p.model.setState(p.repoCfg.ID, RepoCleaning)
  162. p.fixupDirectories()
  163. changed = false
  164. }
  165. p.model.setState(p.repoCfg.ID, RepoIdle)
  166. // Do a rescan if it's time for it
  167. select {
  168. case <-walkTicker:
  169. if debug {
  170. l.Debugf("%q: time for rescan", p.repoCfg.ID)
  171. }
  172. err := p.model.ScanRepo(p.repoCfg.ID)
  173. if err != nil {
  174. invalidateRepo(p.cfg, p.repoCfg.ID, err)
  175. return
  176. }
  177. default:
  178. }
  179. // Queue more blocks to fetch, if any
  180. p.queueNeededBlocks()
  181. }
  182. }
  183. func (p *puller) runRO() {
  184. walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
  185. for _ = range walkTicker {
  186. if debug {
  187. l.Debugf("%q: time for rescan", p.repoCfg.ID)
  188. }
  189. err := p.model.ScanRepo(p.repoCfg.ID)
  190. if err != nil {
  191. invalidateRepo(p.cfg, p.repoCfg.ID, err)
  192. return
  193. }
  194. }
  195. }
  196. func (p *puller) fixupDirectories() {
  197. var deleteDirs []string
  198. var changed = 0
  199. var walkFn = func(path string, info os.FileInfo, err error) error {
  200. if err != nil {
  201. return err
  202. }
  203. if !info.IsDir() {
  204. return nil
  205. }
  206. rn, err := filepath.Rel(p.repoCfg.Directory, path)
  207. if err != nil {
  208. return nil
  209. }
  210. if rn == "." {
  211. return nil
  212. }
  213. if filepath.Base(rn) == ".stversions" {
  214. return nil
  215. }
  216. cur := p.model.CurrentRepoFile(p.repoCfg.ID, rn)
  217. if cur.Name != rn {
  218. // No matching dir in current list; weird
  219. if debug {
  220. l.Debugf("missing dir: %s; %v", rn, cur)
  221. }
  222. return nil
  223. }
  224. if protocol.IsDeleted(cur.Flags) {
  225. if debug {
  226. l.Debugf("queue delete dir: %v", cur)
  227. }
  228. // We queue the directories to delete since we walk the
  229. // tree in depth first order and need to remove the
  230. // directories in the opposite order.
  231. deleteDirs = append(deleteDirs, path)
  232. return nil
  233. }
  234. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(cur.Flags) && !scanner.PermsEqual(cur.Flags, uint32(info.Mode())) {
  235. err := os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm)
  236. if err != nil {
  237. l.Warnf("Restoring folder flags: %q: %v", path, err)
  238. } else {
  239. changed++
  240. if debug {
  241. l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur)
  242. }
  243. }
  244. }
  245. if cur.Modified != info.ModTime().Unix() {
  246. t := time.Unix(cur.Modified, 0)
  247. err := os.Chtimes(path, t, t)
  248. if err != nil {
  249. if runtime.GOOS != "windows" {
  250. // https://code.google.com/p/go/issues/detail?id=8090
  251. l.Warnf("Restoring folder modtime: %q: %v", path, err)
  252. }
  253. } else {
  254. changed++
  255. if debug {
  256. l.Debugf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur)
  257. }
  258. }
  259. }
  260. return nil
  261. }
  262. for {
  263. deleteDirs = nil
  264. changed = 0
  265. filepath.Walk(p.repoCfg.Directory, walkFn)
  266. var deleted = 0
  267. // Delete any queued directories
  268. for i := len(deleteDirs) - 1; i >= 0; i-- {
  269. dir := deleteDirs[i]
  270. if debug {
  271. l.Debugln("delete dir:", dir)
  272. }
  273. err := os.Remove(dir)
  274. if err == nil {
  275. deleted++
  276. } else if p.versioner == nil { // Failures are expected in the presence of versioning
  277. l.Warnln(err)
  278. }
  279. }
  280. if debug {
  281. l.Debugf("changed %d, deleted %d dirs", changed, deleted)
  282. }
  283. if changed+deleted == 0 {
  284. return
  285. }
  286. }
  287. }
  288. func (p *puller) handleRequestResult(res requestResult) {
  289. p.oustandingPerNode.decrease(res.node)
  290. f := res.file
  291. of, ok := p.openFiles[f.Name]
  292. if !ok || of.err != nil {
  293. // no entry in openFiles means there was an error and we've cancelled the operation
  294. return
  295. }
  296. _, of.err = of.file.WriteAt(res.data, res.offset)
  297. buffers.Put(res.data)
  298. of.outstanding--
  299. p.openFiles[f.Name] = of
  300. if debug {
  301. l.Debugf("pull: wrote %q / %q offset %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, of.outstanding, of.done)
  302. }
  303. if of.done && of.outstanding == 0 {
  304. p.closeFile(f)
  305. }
  306. }
  307. // handleBlock fulfills the block request by copying, ignoring or fetching
  308. // from the network. Returns true if the block was fully handled
  309. // synchronously, i.e. if the slot can be reused.
  310. func (p *puller) handleBlock(b bqBlock) bool {
  311. f := b.file
  312. // For directories, making sure they exist is enough.
  313. // Deleted directories we mark as handled and delete later.
  314. if protocol.IsDirectory(f.Flags) {
  315. if !protocol.IsDeleted(f.Flags) {
  316. path := filepath.Join(p.repoCfg.Directory, f.Name)
  317. _, err := os.Stat(path)
  318. if err != nil && os.IsNotExist(err) {
  319. if debug {
  320. l.Debugf("create dir: %v", f)
  321. }
  322. err = os.MkdirAll(path, 0777)
  323. if err != nil {
  324. l.Warnf("Create folder: %q: %v", path, err)
  325. }
  326. }
  327. } else if debug {
  328. l.Debugf("ignore delete dir: %v", f)
  329. }
  330. p.model.updateLocal(p.repoCfg.ID, f)
  331. return true
  332. }
  333. if len(b.copy) > 0 && len(b.copy) == len(b.file.Blocks) && b.last {
  334. // We are supposed to copy the entire file, and then fetch nothing.
  335. // We don't actually need to make the copy.
  336. if debug {
  337. l.Debugln("taking shortcut:", f)
  338. }
  339. fp := filepath.Join(p.repoCfg.Directory, f.Name)
  340. t := time.Unix(f.Modified, 0)
  341. err := os.Chtimes(fp, t, t)
  342. if debug && err != nil {
  343. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  344. }
  345. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
  346. err = os.Chmod(fp, os.FileMode(f.Flags&0777))
  347. if debug && err != nil {
  348. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  349. }
  350. }
  351. p.model.updateLocal(p.repoCfg.ID, f)
  352. return true
  353. }
  354. of, ok := p.openFiles[f.Name]
  355. of.done = b.last
  356. if !ok {
  357. if debug {
  358. l.Debugf("pull: %q: opening file %q", p.repoCfg.ID, f.Name)
  359. }
  360. of.availability = uint64(p.model.repoFiles[p.repoCfg.ID].Availability(f.Name))
  361. of.filepath = filepath.Join(p.repoCfg.Directory, f.Name)
  362. of.temp = filepath.Join(p.repoCfg.Directory, defTempNamer.TempName(f.Name))
  363. dirName := filepath.Dir(of.filepath)
  364. _, err := os.Stat(dirName)
  365. if err != nil {
  366. err = os.MkdirAll(dirName, 0777)
  367. }
  368. if err != nil {
  369. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  370. }
  371. of.file, of.err = os.Create(of.temp)
  372. if of.err != nil {
  373. if debug {
  374. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  375. }
  376. if !b.last {
  377. p.openFiles[f.Name] = of
  378. }
  379. return true
  380. }
  381. osutil.HideFile(of.temp)
  382. }
  383. if of.err != nil {
  384. // We have already failed this file.
  385. if debug {
  386. l.Debugf("pull: error: %q / %q has already failed: %v", p.repoCfg.ID, f.Name, of.err)
  387. }
  388. if b.last {
  389. delete(p.openFiles, f.Name)
  390. }
  391. return true
  392. }
  393. p.openFiles[f.Name] = of
  394. switch {
  395. case len(b.copy) > 0:
  396. p.handleCopyBlock(b)
  397. return true
  398. case b.block.Size > 0:
  399. return p.handleRequestBlock(b)
  400. default:
  401. p.handleEmptyBlock(b)
  402. return true
  403. }
  404. }
  405. func (p *puller) handleCopyBlock(b bqBlock) {
  406. // We have blocks to copy from the existing file
  407. f := b.file
  408. of := p.openFiles[f.Name]
  409. if debug {
  410. l.Debugf("pull: copying %d blocks for %q / %q", len(b.copy), p.repoCfg.ID, f.Name)
  411. }
  412. var exfd *os.File
  413. exfd, of.err = os.Open(of.filepath)
  414. if of.err != nil {
  415. if debug {
  416. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  417. }
  418. of.file.Close()
  419. of.file = nil
  420. p.openFiles[f.Name] = of
  421. return
  422. }
  423. defer exfd.Close()
  424. for _, b := range b.copy {
  425. bs := buffers.Get(int(b.Size))
  426. _, of.err = exfd.ReadAt(bs, b.Offset)
  427. if of.err == nil {
  428. _, of.err = of.file.WriteAt(bs, b.Offset)
  429. }
  430. buffers.Put(bs)
  431. if of.err != nil {
  432. if debug {
  433. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  434. }
  435. exfd.Close()
  436. of.file.Close()
  437. of.file = nil
  438. p.openFiles[f.Name] = of
  439. return
  440. }
  441. }
  442. }
  443. // handleRequestBlock tries to pull a block from the network. Returns true if
  444. // the block could _not_ be fetched (i.e. it was fully handled, matching the
  445. // return criteria of handleBlock)
  446. func (p *puller) handleRequestBlock(b bqBlock) bool {
  447. f := b.file
  448. of, ok := p.openFiles[f.Name]
  449. if !ok {
  450. panic("bug: request for non-open file")
  451. }
  452. node := p.oustandingPerNode.leastBusyNode(of.availability, p.model.cm)
  453. if len(node) == 0 {
  454. of.err = errNoNode
  455. if of.file != nil {
  456. of.file.Close()
  457. of.file = nil
  458. os.Remove(of.temp)
  459. }
  460. if b.last {
  461. delete(p.openFiles, f.Name)
  462. } else {
  463. p.openFiles[f.Name] = of
  464. }
  465. return true
  466. }
  467. of.outstanding++
  468. p.openFiles[f.Name] = of
  469. go func(node string, b bqBlock) {
  470. if debug {
  471. l.Debugf("pull: requesting %q / %q offset %d size %d from %q outstanding %d", p.repoCfg.ID, f.Name, b.block.Offset, b.block.Size, node, of.outstanding)
  472. }
  473. bs, err := p.model.requestGlobal(node, p.repoCfg.ID, f.Name, b.block.Offset, int(b.block.Size), nil)
  474. p.requestResults <- requestResult{
  475. node: node,
  476. file: f,
  477. filepath: of.filepath,
  478. offset: b.block.Offset,
  479. data: bs,
  480. err: err,
  481. }
  482. }(node, b)
  483. return false
  484. }
  485. func (p *puller) handleEmptyBlock(b bqBlock) {
  486. f := b.file
  487. of := p.openFiles[f.Name]
  488. if b.last {
  489. if of.err == nil {
  490. of.file.Close()
  491. }
  492. }
  493. if protocol.IsDeleted(f.Flags) {
  494. if debug {
  495. l.Debugf("pull: delete %q", f.Name)
  496. }
  497. os.Remove(of.temp)
  498. os.Chmod(of.filepath, 0666)
  499. if p.versioner != nil {
  500. if err := p.versioner.Archive(of.filepath); err == nil {
  501. p.model.updateLocal(p.repoCfg.ID, f)
  502. }
  503. } else if err := os.Remove(of.filepath); err == nil || os.IsNotExist(err) {
  504. p.model.updateLocal(p.repoCfg.ID, f)
  505. }
  506. } else {
  507. if debug {
  508. l.Debugf("pull: no blocks to fetch and nothing to copy for %q / %q", p.repoCfg.ID, f.Name)
  509. }
  510. t := time.Unix(f.Modified, 0)
  511. if os.Chtimes(of.temp, t, t) != nil {
  512. delete(p.openFiles, f.Name)
  513. return
  514. }
  515. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) && os.Chmod(of.temp, os.FileMode(f.Flags&0777)) != nil {
  516. delete(p.openFiles, f.Name)
  517. return
  518. }
  519. osutil.ShowFile(of.temp)
  520. if osutil.Rename(of.temp, of.filepath) == nil {
  521. p.model.updateLocal(p.repoCfg.ID, f)
  522. }
  523. }
  524. delete(p.openFiles, f.Name)
  525. }
  526. func (p *puller) queueNeededBlocks() {
  527. queued := 0
  528. for _, f := range p.model.NeedFilesRepo(p.repoCfg.ID) {
  529. lf := p.model.CurrentRepoFile(p.repoCfg.ID, f.Name)
  530. have, need := scanner.BlockDiff(lf.Blocks, f.Blocks)
  531. if debug {
  532. l.Debugf("need:\n local: %v\n global: %v\n haveBlocks: %v\n needBlocks: %v", lf, f, have, need)
  533. }
  534. queued++
  535. p.bq.put(bqAdd{
  536. file: f,
  537. have: have,
  538. need: need,
  539. })
  540. }
  541. if debug && queued > 0 {
  542. l.Debugf("%q: queued %d blocks", p.repoCfg.ID, queued)
  543. }
  544. }
  545. func (p *puller) closeFile(f scanner.File) {
  546. if debug {
  547. l.Debugf("pull: closing %q / %q", p.repoCfg.ID, f.Name)
  548. }
  549. of := p.openFiles[f.Name]
  550. of.file.Close()
  551. defer os.Remove(of.temp)
  552. delete(p.openFiles, f.Name)
  553. fd, err := os.Open(of.temp)
  554. if err != nil {
  555. if debug {
  556. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  557. }
  558. return
  559. }
  560. hb, _ := scanner.Blocks(fd, scanner.StandardBlockSize)
  561. fd.Close()
  562. if l0, l1 := len(hb), len(f.Blocks); l0 != l1 {
  563. if debug {
  564. l.Debugf("pull: %q / %q: nblocks %d != %d", p.repoCfg.ID, f.Name, l0, l1)
  565. }
  566. return
  567. }
  568. for i := range hb {
  569. if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 {
  570. l.Debugf("pull: %q / %q: block %d hash mismatch", p.repoCfg.ID, f.Name, i)
  571. return
  572. }
  573. }
  574. t := time.Unix(f.Modified, 0)
  575. err = os.Chtimes(of.temp, t, t)
  576. if debug && err != nil {
  577. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  578. }
  579. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
  580. err = os.Chmod(of.temp, os.FileMode(f.Flags&0777))
  581. if debug && err != nil {
  582. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  583. }
  584. }
  585. osutil.ShowFile(of.temp)
  586. if p.versioner != nil {
  587. err := p.versioner.Archive(of.filepath)
  588. if err != nil {
  589. if debug {
  590. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  591. }
  592. return
  593. }
  594. }
  595. if debug {
  596. l.Debugf("pull: rename %q / %q: %q", p.repoCfg.ID, f.Name, of.filepath)
  597. }
  598. if err := osutil.Rename(of.temp, of.filepath); err == nil {
  599. p.model.updateLocal(p.repoCfg.ID, f)
  600. } else {
  601. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  602. }
  603. }
  604. func invalidateRepo(cfg *config.Configuration, repoID string, err error) {
  605. for i := range cfg.Repositories {
  606. repo := &cfg.Repositories[i]
  607. if repo.ID == repoID {
  608. repo.Invalid = err.Error()
  609. return
  610. }
  611. }
  612. }