puller.go 17 KB


  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package model
  5. import (
  6. "bytes"
  7. "errors"
  8. "os"
  9. "path/filepath"
  10. "time"
  11. "github.com/calmh/syncthing/config"
  12. "github.com/calmh/syncthing/events"
  13. "github.com/calmh/syncthing/osutil"
  14. "github.com/calmh/syncthing/protocol"
  15. "github.com/calmh/syncthing/scanner"
  16. "github.com/calmh/syncthing/versioner"
  17. )
// requestResult carries the outcome of one asynchronous block request,
// started by handleRequestBlock, back to the pulling loop in run, which
// passes it on to handleRequestResult.
type requestResult struct {
	node     protocol.NodeID   // node the block was requested from
	file     protocol.FileInfo // file the block belongs to
	filepath string            // full filepath name
	offset   int64             // offset of the block within the file
	data     []byte            // block data returned by requestGlobal
	err      error             // error returned by requestGlobal, if any
}
// openFile tracks a file that is being assembled in a temporary file while
// its blocks are copied from the existing local file or fetched from other
// nodes. Entries live in puller.openFiles, keyed by file name.
type openFile struct {
	filepath     string            // full filepath name
	temp         string            // temporary filename
	availability []protocol.NodeID // nodes that have the file, as reported by Availability when it was opened
	file         *os.File          // handle to the temporary file; nil once the file has been abandoned
	err          error             // error when opening or writing to file, all following operations are cancelled
	outstanding  int               // number of requests we still have outstanding
	done         bool              // we have sent all requests for this file
}
  35. type activityMap map[protocol.NodeID]int
  36. func (m activityMap) leastBusyNode(availability []protocol.NodeID, isValid func(protocol.NodeID) bool) protocol.NodeID {
  37. var low int = 2<<30 - 1
  38. var selected protocol.NodeID
  39. for _, node := range availability {
  40. usage := m[node]
  41. if usage < low && isValid(node) {
  42. low = usage
  43. selected = node
  44. }
  45. }
  46. m[selected]++
  47. return selected
  48. }
  49. func (m activityMap) decrease(node protocol.NodeID) {
  50. m[node]--
  51. }
// errNoNode is recorded as a file's error by handleRequestBlock when none of
// the nodes in the file's availability list passes p.model.ConnectedTo, so
// no block request can be made.
var errNoNode = errors.New("no available source node")
// puller keeps one repository in sync. For a read/write repository
// (slots > 0), it queues the blocks needed to update local files, copies or
// requests them, assembles each file in a temporary file, and renames the
// result into place. A read-only repository is only rescanned periodically.
type puller struct {
	cfg     *config.Configuration
	repoCfg config.RepositoryConfiguration
	// bq holds the blocks waiting to be handled by the pulling loop.
	bq blockQueue
	// slots is the number of concurrent block requests; newPuller selects
	// read-only operation when it is 0 or less.
	slots int
	model *Model
	// oustandingPerNode counts in-flight requests per node so the least busy
	// source can be chosen.
	oustandingPerNode activityMap
	// openFiles holds the files currently being assembled, keyed by name.
	openFiles map[string]openFile
	// requestSlots holds one token per free request slot.
	requestSlots chan bool
	// NOTE(review): blocks is created in newPuller but not otherwise used in
	// this file; confirm whether it is still needed.
	blocks chan bqBlock
	// requestResults delivers completed network requests to the pulling loop.
	requestResults chan requestResult
	// versioner, when non-nil, archives files before they are replaced or
	// deleted.
	versioner versioner.Versioner
}
  66. func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, cfg *config.Configuration) *puller {
  67. p := &puller{
  68. cfg: cfg,
  69. repoCfg: repoCfg,
  70. slots: slots,
  71. model: model,
  72. oustandingPerNode: make(activityMap),
  73. openFiles: make(map[string]openFile),
  74. requestSlots: make(chan bool, slots),
  75. blocks: make(chan bqBlock),
  76. requestResults: make(chan requestResult),
  77. }
  78. if len(repoCfg.Versioning.Type) > 0 {
  79. factory, ok := versioner.Factories[repoCfg.Versioning.Type]
  80. if !ok {
  81. l.Fatalf("Requested versioning type %q that does not exist", repoCfg.Versioning.Type)
  82. }
  83. p.versioner = factory(repoCfg.Versioning.Params)
  84. }
  85. if slots > 0 {
  86. // Read/write
  87. if debug {
  88. l.Debugf("starting puller; repo %q dir %q slots %d", repoCfg.ID, repoCfg.Directory, slots)
  89. }
  90. go p.run()
  91. } else {
  92. // Read only
  93. if debug {
  94. l.Debugf("starting puller; repo %q dir %q (read only)", repoCfg.ID, repoCfg.Directory)
  95. }
  96. go p.runRO()
  97. }
  98. return p
  99. }
// run is the main loop for a read/write repository. Each round it:
//
//  1. queues the blocks needed to bring the repository up to date;
//  2. if anything was queued, runs the pulling loop, which hands blocks to
//     handleBlock and completed requests to handleRequestResult, with at
//     most cap(p.requestSlots) slots in use at once;
//  3. if anything was pulled, fixes up directories;
//  4. marks the repository idle, rescans it once the rescan interval has
//     elapsed, and sleeps for 5 seconds.
//
// It returns only if a rescan fails, after marking the repository invalid.
func (p *puller) run() {
	// changed records whether anything was pulled since the last directory
	// fixup.
	changed := true
	scanintv := time.Duration(p.cfg.Options.RescanIntervalS) * time.Second
	lastscan := time.Now()
	// prevVer is passed back into queueNeededBlocks, which skips its work
	// when the local version equals it.
	var prevVer uint64
	var queued int
	// Load up the request slots
	for i := 0; i < cap(p.requestSlots); i++ {
		p.requestSlots <- true
	}
	for {
		// Run the pulling loop as long as there are blocks to fetch
		prevVer, queued = p.queueNeededBlocks(prevVer)
		if queued > 0 {
		pull:
			for {
				select {
				case res := <-p.requestResults:
					// A network request finished; the slot it held is
					// returned before the result is processed.
					p.model.setState(p.repoCfg.ID, RepoSyncing)
					changed = true
					p.requestSlots <- true
					p.handleRequestResult(res)
				case <-p.requestSlots:
					// We now hold a slot; find a block to spend it on.
					b, ok := p.bq.get()
					if !ok {
						if debug {
							l.Debugf("%q: pulling loop needs more blocks", p.repoCfg.ID)
						}
						prevVer, _ = p.queueNeededBlocks(prevVer)
						b, ok = p.bq.get()
					}
					if !ok && len(p.openFiles) == 0 {
						// Nothing queued, nothing outstanding
						// NOTE(review): files dropped from openFiles after an
						// error may still have requests in flight; those
						// goroutines block on p.requestResults (and keep
						// their slots) until a later pulling loop receives
						// them.
						if debug {
							l.Debugf("%q: pulling loop done", p.repoCfg.ID)
						}
						break pull
					}
					if !ok {
						// Nothing queued, but there are still open files.
						// Give the situation a moment to change.
						if debug {
							l.Debugf("%q: pulling loop paused", p.repoCfg.ID)
						}
						p.requestSlots <- true
						time.Sleep(100 * time.Millisecond)
						continue pull
					}
					if debug {
						l.Debugf("queueing %q / %q offset %d copy %d", p.repoCfg.ID, b.file.Name, b.block.Offset, len(b.copy))
					}
					p.model.setState(p.repoCfg.ID, RepoSyncing)
					changed = true
					if p.handleBlock(b) {
						// Block was fully handled, free up the slot
						// (otherwise the slot comes back with the request
						// result above).
						p.requestSlots <- true
					}
				}
			}
		}
		if changed {
			p.model.setState(p.repoCfg.ID, RepoCleaning)
			p.fixupDirectories()
			changed = false
		}
		p.model.setState(p.repoCfg.ID, RepoIdle)
		// Do a rescan if it's time for it
		if time.Since(lastscan) > scanintv {
			if debug {
				l.Debugf("%q: time for rescan", p.repoCfg.ID)
			}
			err := p.model.ScanRepo(p.repoCfg.ID)
			if err != nil {
				invalidateRepo(p.cfg, p.repoCfg.ID, err)
				return
			}
			lastscan = time.Now()
		}
		time.Sleep(5 * time.Second)
	}
}
  181. func (p *puller) runRO() {
  182. walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
  183. for _ = range walkTicker {
  184. if debug {
  185. l.Debugf("%q: time for rescan", p.repoCfg.ID)
  186. }
  187. err := p.model.ScanRepo(p.repoCfg.ID)
  188. if err != nil {
  189. invalidateRepo(p.cfg, p.repoCfg.ID, err)
  190. return
  191. }
  192. }
  193. }
  194. func (p *puller) fixupDirectories() {
  195. var deleteDirs []string
  196. var changed = 0
  197. var walkFn = func(path string, info os.FileInfo, err error) error {
  198. if err != nil {
  199. return err
  200. }
  201. if !info.IsDir() {
  202. return nil
  203. }
  204. rn, err := filepath.Rel(p.repoCfg.Directory, path)
  205. if err != nil {
  206. return nil
  207. }
  208. if rn == "." {
  209. return nil
  210. }
  211. if filepath.Base(rn) == ".stversions" {
  212. return filepath.SkipDir
  213. }
  214. cur := p.model.CurrentRepoFile(p.repoCfg.ID, rn)
  215. if cur.Name != rn {
  216. // No matching dir in current list; weird
  217. if debug {
  218. l.Debugf("missing dir: %s; %v", rn, cur)
  219. }
  220. return nil
  221. }
  222. if protocol.IsDeleted(cur.Flags) {
  223. if debug {
  224. l.Debugf("queue delete dir: %v", cur)
  225. }
  226. // We queue the directories to delete since we walk the
  227. // tree in depth first order and need to remove the
  228. // directories in the opposite order.
  229. deleteDirs = append(deleteDirs, path)
  230. return nil
  231. }
  232. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(cur.Flags) && !scanner.PermsEqual(cur.Flags, uint32(info.Mode())) {
  233. err := os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm)
  234. if err != nil {
  235. l.Warnf("Restoring folder flags: %q: %v", path, err)
  236. } else {
  237. changed++
  238. if debug {
  239. l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur)
  240. }
  241. }
  242. }
  243. return nil
  244. }
  245. for {
  246. deleteDirs = nil
  247. changed = 0
  248. filepath.Walk(p.repoCfg.Directory, walkFn)
  249. var deleted = 0
  250. // Delete any queued directories
  251. for i := len(deleteDirs) - 1; i >= 0; i-- {
  252. dir := deleteDirs[i]
  253. if debug {
  254. l.Debugln("delete dir:", dir)
  255. }
  256. err := os.Remove(dir)
  257. if err == nil {
  258. deleted++
  259. } else {
  260. l.Warnln("Delete dir:", err)
  261. }
  262. }
  263. if debug {
  264. l.Debugf("changed %d, deleted %d dirs", changed, deleted)
  265. }
  266. if changed+deleted == 0 {
  267. return
  268. }
  269. }
  270. }
  271. func (p *puller) handleRequestResult(res requestResult) {
  272. p.oustandingPerNode.decrease(res.node)
  273. f := res.file
  274. of, ok := p.openFiles[f.Name]
  275. if !ok {
  276. // no entry in openFiles means there was an error and we've cancelled the operation
  277. return
  278. }
  279. if res.err != nil {
  280. // This request resulted in an error
  281. of.err = res.err
  282. if debug {
  283. l.Debugf("pull: not writing %q / %q offset %d: %v; (done=%v, outstanding=%d)", p.repoCfg.ID, f.Name, res.offset, res.err, of.done, of.outstanding)
  284. }
  285. } else if of.err == nil {
  286. // This request was sucessfull and nothing has failed previously either
  287. _, of.err = of.file.WriteAt(res.data, res.offset)
  288. if debug {
  289. l.Debugf("pull: wrote %q / %q offset %d len %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, len(res.data), of.outstanding, of.done)
  290. }
  291. }
  292. of.outstanding--
  293. p.openFiles[f.Name] = of
  294. if of.done && of.outstanding == 0 {
  295. p.closeFile(f)
  296. }
  297. }
  298. // handleBlock fulfills the block request by copying, ignoring or fetching
  299. // from the network. Returns true if the block was fully handled
  300. // synchronously, i.e. if the slot can be reused.
  301. func (p *puller) handleBlock(b bqBlock) bool {
  302. f := b.file
  303. // For directories, making sure they exist is enough.
  304. // Deleted directories we mark as handled and delete later.
  305. if protocol.IsDirectory(f.Flags) {
  306. if !protocol.IsDeleted(f.Flags) {
  307. path := filepath.Join(p.repoCfg.Directory, f.Name)
  308. _, err := os.Stat(path)
  309. if err != nil && os.IsNotExist(err) {
  310. if debug {
  311. l.Debugf("create dir: %v", f)
  312. }
  313. err = os.MkdirAll(path, os.FileMode(f.Flags&0777))
  314. if err != nil {
  315. l.Warnf("Create folder: %q: %v", path, err)
  316. }
  317. }
  318. } else if debug {
  319. l.Debugf("ignore delete dir: %v", f)
  320. }
  321. p.model.updateLocal(p.repoCfg.ID, f)
  322. return true
  323. }
  324. if len(b.copy) > 0 && len(b.copy) == len(b.file.Blocks) && b.last {
  325. // We are supposed to copy the entire file, and then fetch nothing.
  326. // We don't actually need to make the copy.
  327. if debug {
  328. l.Debugln("taking shortcut:", f)
  329. }
  330. fp := filepath.Join(p.repoCfg.Directory, f.Name)
  331. t := time.Unix(f.Modified, 0)
  332. err := os.Chtimes(fp, t, t)
  333. if debug && err != nil {
  334. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  335. }
  336. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
  337. err = os.Chmod(fp, os.FileMode(f.Flags&0777))
  338. if debug && err != nil {
  339. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  340. }
  341. }
  342. events.Default.Log(events.ItemStarted, map[string]string{
  343. "repo": p.repoCfg.ID,
  344. "item": f.Name,
  345. })
  346. p.model.updateLocal(p.repoCfg.ID, f)
  347. return true
  348. }
  349. of, ok := p.openFiles[f.Name]
  350. of.done = b.last
  351. if !ok {
  352. if debug {
  353. l.Debugf("pull: %q: opening file %q", p.repoCfg.ID, f.Name)
  354. }
  355. events.Default.Log(events.ItemStarted, map[string]string{
  356. "repo": p.repoCfg.ID,
  357. "item": f.Name,
  358. })
  359. of.availability = p.model.repoFiles[p.repoCfg.ID].Availability(f.Name)
  360. of.filepath = filepath.Join(p.repoCfg.Directory, f.Name)
  361. of.temp = filepath.Join(p.repoCfg.Directory, defTempNamer.TempName(f.Name))
  362. dirName := filepath.Dir(of.filepath)
  363. _, err := os.Stat(dirName)
  364. if err != nil {
  365. err = os.MkdirAll(dirName, 0777)
  366. }
  367. if err != nil {
  368. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  369. }
  370. of.file, of.err = os.Create(of.temp)
  371. if of.err != nil {
  372. if debug {
  373. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  374. }
  375. if !b.last {
  376. p.openFiles[f.Name] = of
  377. }
  378. return true
  379. }
  380. osutil.HideFile(of.temp)
  381. }
  382. if of.err != nil {
  383. // We have already failed this file.
  384. if debug {
  385. l.Debugf("pull: error: %q / %q has already failed: %v", p.repoCfg.ID, f.Name, of.err)
  386. }
  387. if b.last {
  388. delete(p.openFiles, f.Name)
  389. }
  390. return true
  391. }
  392. p.openFiles[f.Name] = of
  393. switch {
  394. case len(b.copy) > 0:
  395. p.handleCopyBlock(b)
  396. return true
  397. case b.block.Size > 0:
  398. return p.handleRequestBlock(b)
  399. default:
  400. p.handleEmptyBlock(b)
  401. return true
  402. }
  403. }
  404. func (p *puller) handleCopyBlock(b bqBlock) {
  405. // We have blocks to copy from the existing file
  406. f := b.file
  407. of := p.openFiles[f.Name]
  408. if debug {
  409. l.Debugf("pull: copying %d blocks for %q / %q", len(b.copy), p.repoCfg.ID, f.Name)
  410. }
  411. var exfd *os.File
  412. exfd, of.err = os.Open(of.filepath)
  413. if of.err != nil {
  414. if debug {
  415. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  416. }
  417. of.file.Close()
  418. of.file = nil
  419. p.openFiles[f.Name] = of
  420. return
  421. }
  422. defer exfd.Close()
  423. for _, b := range b.copy {
  424. bs := make([]byte, b.Size)
  425. _, of.err = exfd.ReadAt(bs, b.Offset)
  426. if of.err == nil {
  427. _, of.err = of.file.WriteAt(bs, b.Offset)
  428. }
  429. if of.err != nil {
  430. if debug {
  431. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  432. }
  433. exfd.Close()
  434. of.file.Close()
  435. of.file = nil
  436. p.openFiles[f.Name] = of
  437. return
  438. }
  439. }
  440. }
  441. // handleRequestBlock tries to pull a block from the network. Returns true if
  442. // the block could _not_ be fetched (i.e. it was fully handled, matching the
  443. // return criteria of handleBlock)
  444. func (p *puller) handleRequestBlock(b bqBlock) bool {
  445. f := b.file
  446. of, ok := p.openFiles[f.Name]
  447. if !ok {
  448. panic("bug: request for non-open file")
  449. }
  450. node := p.oustandingPerNode.leastBusyNode(of.availability, p.model.ConnectedTo)
  451. if node == (protocol.NodeID{}) {
  452. of.err = errNoNode
  453. if of.file != nil {
  454. of.file.Close()
  455. of.file = nil
  456. os.Remove(of.temp)
  457. if debug {
  458. l.Debugf("pull: no source for %q / %q; closed", p.repoCfg.ID, f.Name)
  459. }
  460. }
  461. if b.last {
  462. if debug {
  463. l.Debugf("pull: no source for %q / %q; deleting", p.repoCfg.ID, f.Name)
  464. }
  465. delete(p.openFiles, f.Name)
  466. } else {
  467. if debug {
  468. l.Debugf("pull: no source for %q / %q; await more blocks", p.repoCfg.ID, f.Name)
  469. }
  470. p.openFiles[f.Name] = of
  471. }
  472. return true
  473. }
  474. of.outstanding++
  475. p.openFiles[f.Name] = of
  476. go func(node protocol.NodeID, b bqBlock) {
  477. if debug {
  478. l.Debugf("pull: requesting %q / %q offset %d size %d from %q outstanding %d", p.repoCfg.ID, f.Name, b.block.Offset, b.block.Size, node, of.outstanding)
  479. }
  480. bs, err := p.model.requestGlobal(node, p.repoCfg.ID, f.Name, b.block.Offset, int(b.block.Size), nil)
  481. p.requestResults <- requestResult{
  482. node: node,
  483. file: f,
  484. filepath: of.filepath,
  485. offset: b.block.Offset,
  486. data: bs,
  487. err: err,
  488. }
  489. }(node, b)
  490. return false
  491. }
  492. func (p *puller) handleEmptyBlock(b bqBlock) {
  493. f := b.file
  494. of := p.openFiles[f.Name]
  495. if b.last {
  496. if of.err == nil {
  497. of.file.Close()
  498. }
  499. }
  500. if protocol.IsDeleted(f.Flags) {
  501. if debug {
  502. l.Debugf("pull: delete %q", f.Name)
  503. }
  504. os.Remove(of.temp)
  505. os.Chmod(of.filepath, 0666)
  506. if p.versioner != nil {
  507. if debug {
  508. l.Debugln("pull: deleting with versioner")
  509. }
  510. if err := p.versioner.Archive(p.repoCfg.Directory, of.filepath); err == nil {
  511. p.model.updateLocal(p.repoCfg.ID, f)
  512. } else if debug {
  513. l.Debugln("pull: error:", err)
  514. }
  515. } else if err := os.Remove(of.filepath); err == nil || os.IsNotExist(err) {
  516. p.model.updateLocal(p.repoCfg.ID, f)
  517. }
  518. } else {
  519. if debug {
  520. l.Debugf("pull: no blocks to fetch and nothing to copy for %q / %q", p.repoCfg.ID, f.Name)
  521. }
  522. t := time.Unix(f.Modified, 0)
  523. if os.Chtimes(of.temp, t, t) != nil {
  524. delete(p.openFiles, f.Name)
  525. return
  526. }
  527. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) && os.Chmod(of.temp, os.FileMode(f.Flags&0777)) != nil {
  528. delete(p.openFiles, f.Name)
  529. return
  530. }
  531. osutil.ShowFile(of.temp)
  532. if osutil.Rename(of.temp, of.filepath) == nil {
  533. p.model.updateLocal(p.repoCfg.ID, f)
  534. }
  535. }
  536. delete(p.openFiles, f.Name)
  537. }
  538. func (p *puller) queueNeededBlocks(prevVer uint64) (uint64, int) {
  539. curVer := p.model.LocalVersion(p.repoCfg.ID)
  540. if curVer == prevVer {
  541. return curVer, 0
  542. }
  543. if debug {
  544. l.Debugf("%q: checking for more needed blocks", p.repoCfg.ID)
  545. }
  546. queued := 0
  547. for _, f := range p.model.NeedFilesRepo(p.repoCfg.ID) {
  548. if _, ok := p.openFiles[f.Name]; ok {
  549. continue
  550. }
  551. lf := p.model.CurrentRepoFile(p.repoCfg.ID, f.Name)
  552. have, need := scanner.BlockDiff(lf.Blocks, f.Blocks)
  553. if debug {
  554. l.Debugf("need:\n local: %v\n global: %v\n haveBlocks: %v\n needBlocks: %v", lf, f, have, need)
  555. }
  556. queued++
  557. p.bq.put(bqAdd{
  558. file: f,
  559. have: have,
  560. need: need,
  561. })
  562. }
  563. if debug && queued > 0 {
  564. l.Debugf("%q: queued %d items", p.repoCfg.ID, queued)
  565. }
  566. if queued > 0 {
  567. return prevVer, queued
  568. } else {
  569. return curVer, 0
  570. }
  571. }
  572. func (p *puller) closeFile(f protocol.FileInfo) {
  573. if debug {
  574. l.Debugf("pull: closing %q / %q", p.repoCfg.ID, f.Name)
  575. }
  576. of := p.openFiles[f.Name]
  577. of.file.Close()
  578. defer os.Remove(of.temp)
  579. delete(p.openFiles, f.Name)
  580. fd, err := os.Open(of.temp)
  581. if err != nil {
  582. if debug {
  583. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  584. }
  585. return
  586. }
  587. hb, _ := scanner.Blocks(fd, scanner.StandardBlockSize)
  588. fd.Close()
  589. if l0, l1 := len(hb), len(f.Blocks); l0 != l1 {
  590. if debug {
  591. l.Debugf("pull: %q / %q: nblocks %d != %d", p.repoCfg.ID, f.Name, l0, l1)
  592. }
  593. return
  594. }
  595. for i := range hb {
  596. if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 {
  597. l.Debugf("pull: %q / %q: block %d hash mismatch\n\thave: %x\n\twant: %x", p.repoCfg.ID, f.Name, i, hb[i].Hash, f.Blocks[i].Hash)
  598. return
  599. }
  600. }
  601. t := time.Unix(f.Modified, 0)
  602. err = os.Chtimes(of.temp, t, t)
  603. if debug && err != nil {
  604. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  605. }
  606. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
  607. err = os.Chmod(of.temp, os.FileMode(f.Flags&0777))
  608. if debug && err != nil {
  609. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  610. }
  611. }
  612. osutil.ShowFile(of.temp)
  613. if p.versioner != nil {
  614. err := p.versioner.Archive(p.repoCfg.Directory, of.filepath)
  615. if err != nil {
  616. if debug {
  617. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  618. }
  619. return
  620. }
  621. }
  622. if debug {
  623. l.Debugf("pull: rename %q / %q: %q", p.repoCfg.ID, f.Name, of.filepath)
  624. }
  625. if err := osutil.Rename(of.temp, of.filepath); err == nil {
  626. p.model.updateLocal(p.repoCfg.ID, f)
  627. } else {
  628. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  629. }
  630. }
  631. func invalidateRepo(cfg *config.Configuration, repoID string, err error) {
  632. for i := range cfg.Repositories {
  633. repo := &cfg.Repositories[i]
  634. if repo.ID == repoID {
  635. repo.Invalid = err.Error()
  636. return
  637. }
  638. }
  639. }