puller.go 16 KB


  1. package model
  2. import (
  3. "bytes"
  4. "errors"
  5. "os"
  6. "path/filepath"
  7. "runtime"
  8. "time"
  9. "github.com/calmh/syncthing/buffers"
  10. "github.com/calmh/syncthing/cid"
  11. "github.com/calmh/syncthing/config"
  12. "github.com/calmh/syncthing/osutil"
  13. "github.com/calmh/syncthing/protocol"
  14. "github.com/calmh/syncthing/scanner"
  15. "github.com/calmh/syncthing/versioner"
  16. )
// requestResult carries the outcome of one block request from the
// requesting goroutine back to the puller's main loop.
type requestResult struct {
	node     string       // node the block was requested from
	file     scanner.File // file the block belongs to
	filepath string       // full filepath name
	offset   int64        // offset of the block within the file
	data     []byte       // block contents returned by the request
	err      error        // error returned by the request, if any
}
// openFile is the puller's bookkeeping for a file that is currently being
// assembled in a temporary file.
type openFile struct {
	filepath     string   // full filepath name
	temp         string   // temporary filename
	availability uint64   // availability bitset
	file         *os.File // handle of the temporary file; set to nil once closed after a failure
	err          error    // error when opening or writing to file, all following operations are cancelled
	outstanding  int      // number of requests we still have outstanding
	done         bool     // we have sent all requests for this file
}
// activityMap counts outstanding requests per node name.
type activityMap map[string]int
  35. func (m activityMap) leastBusyNode(availability uint64, cm *cid.Map) string {
  36. var low int = 2<<30 - 1
  37. var selected string
  38. for _, node := range cm.Names() {
  39. id := cm.Get(node)
  40. if id == cid.LocalID {
  41. continue
  42. }
  43. usage := m[node]
  44. if availability&(1<<id) != 0 {
  45. if usage < low {
  46. low = usage
  47. selected = node
  48. }
  49. }
  50. }
  51. m[selected]++
  52. return selected
  53. }
  54. func (m activityMap) decrease(node string) {
  55. m[node]--
  56. }
// errNoNode is recorded on an open file when no node could be selected to
// request a block from.
var errNoNode = errors.New("no available source node")
// puller holds the state needed to keep one repository in sync: the queue
// of blocks to handle, the files currently being assembled and the channels
// connecting the main loop to its helper goroutines.
type puller struct {
	cfg               *config.Configuration
	repoCfg           config.RepositoryConfiguration
	bq                *blockQueue // queue of blocks waiting to be handled
	model             *Model
	oustandingPerNode activityMap         // outstanding requests per node
	openFiles         map[string]openFile // files being pulled, keyed by file name
	requestSlots      chan bool           // holds one token per free request slot
	blocks            chan bqBlock        // blocks handed from the filler goroutine to the main loop
	requestResults    chan requestResult  // results of finished block requests
	versioner         versioner.Versioner // nil unless a versioning type is configured
}
  70. func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, cfg *config.Configuration) *puller {
  71. p := &puller{
  72. repoCfg: repoCfg,
  73. cfg: cfg,
  74. bq: newBlockQueue(),
  75. model: model,
  76. oustandingPerNode: make(activityMap),
  77. openFiles: make(map[string]openFile),
  78. requestSlots: make(chan bool, slots),
  79. blocks: make(chan bqBlock),
  80. requestResults: make(chan requestResult),
  81. }
  82. if len(repoCfg.Versioning.Type) > 0 {
  83. factory, ok := versioner.Factories[repoCfg.Versioning.Type]
  84. if !ok {
  85. l.Fatalf("Requested versioning type %q that does not exist", repoCfg.Versioning.Type)
  86. }
  87. p.versioner = factory(repoCfg.Versioning.Params)
  88. }
  89. if slots > 0 {
  90. // Read/write
  91. for i := 0; i < slots; i++ {
  92. p.requestSlots <- true
  93. }
  94. if debug {
  95. l.Debugf("starting puller; repo %q dir %q slots %d", repoCfg.ID, repoCfg.Directory, slots)
  96. }
  97. go p.run()
  98. } else {
  99. // Read only
  100. if debug {
  101. l.Debugf("starting puller; repo %q dir %q (read only)", repoCfg.ID, repoCfg.Directory)
  102. }
  103. go p.runRO()
  104. }
  105. return p
  106. }
// run is the main loop of a read/write puller.
//
// A filler goroutine takes a request slot, fetches the next block from the
// block queue and hands it to the main loop. The main loop handles blocks
// and request results until a five second tick finds no open files and an
// empty queue. It then fixes up directories if anything changed, rescans the
// repository when the rescan ticker has fired, and queues any newly needed
// blocks before starting over.
//
// run returns only when a rescan fails, after marking the repository as
// invalid. NOTE(review): the filler goroutine has no exit path and keeps
// running after run returns.
func (p *puller) run() {
	go func() {
		// fill blocks queue when there are free slots
		for {
			<-p.requestSlots
			b := p.bq.get()
			if debug {
				l.Debugf("filler: queueing %q / %q offset %d copy %d", p.repoCfg.ID, b.file.Name, b.block.Offset, len(b.copy))
			}
			p.blocks <- b
		}
	}()

	walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
	timeout := time.Tick(5 * time.Second)
	// Start out as changed so the first idle period runs fixupDirectories.
	changed := true

	for {
		// Run the pulling loop as long as there are blocks to fetch
	pull:
		for {
			select {
			case res := <-p.requestResults:
				p.model.setState(p.repoCfg.ID, RepoSyncing)
				changed = true
				// The request is finished; give its slot back before
				// processing the data.
				p.requestSlots <- true
				p.handleRequestResult(res)

			case b := <-p.blocks:
				p.model.setState(p.repoCfg.ID, RepoSyncing)
				changed = true
				if p.handleBlock(b) {
					// Block was fully handled, free up the slot
					p.requestSlots <- true
				}

			case <-timeout:
				if len(p.openFiles) == 0 && p.bq.empty() {
					// Nothing more to do for the moment
					break pull
				}
				if debug {
					l.Debugf("%q: idle but have %d open files", p.repoCfg.ID, len(p.openFiles))
					// List at most five of the open files.
					i := 5
					for _, f := range p.openFiles {
						l.Debugf(" %v", f)
						i--
						if i == 0 {
							break
						}
					}
				}
			}
		}

		if changed {
			p.model.setState(p.repoCfg.ID, RepoCleaning)
			p.fixupDirectories()
			changed = false
		}

		p.model.setState(p.repoCfg.ID, RepoIdle)

		// Do a rescan if it's time for it
		select {
		case <-walkTicker:
			if debug {
				l.Debugf("%q: time for rescan", p.repoCfg.ID)
			}
			err := p.model.ScanRepo(p.repoCfg.ID)
			if err != nil {
				invalidateRepo(p.cfg, p.repoCfg.ID, err)
				return
			}
		default:
			// Not time yet; carry on without blocking.
		}

		// Queue more blocks to fetch, if any
		p.queueNeededBlocks()
	}
}
  180. func (p *puller) runRO() {
  181. walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
  182. for _ = range walkTicker {
  183. if debug {
  184. l.Debugf("%q: time for rescan", p.repoCfg.ID)
  185. }
  186. err := p.model.ScanRepo(p.repoCfg.ID)
  187. if err != nil {
  188. invalidateRepo(p.cfg, p.repoCfg.ID, err)
  189. return
  190. }
  191. }
  192. }
  193. func (p *puller) fixupDirectories() {
  194. var deleteDirs []string
  195. var changed = 0
  196. var walkFn = func(path string, info os.FileInfo, err error) error {
  197. if !info.IsDir() {
  198. return nil
  199. }
  200. rn, err := filepath.Rel(p.repoCfg.Directory, path)
  201. if err != nil {
  202. return nil
  203. }
  204. if rn == "." {
  205. return nil
  206. }
  207. if filepath.Base(rn) == ".stversions" {
  208. return nil
  209. }
  210. cur := p.model.CurrentRepoFile(p.repoCfg.ID, rn)
  211. if cur.Name != rn {
  212. // No matching dir in current list; weird
  213. if debug {
  214. l.Debugf("missing dir: %s; %v", rn, cur)
  215. }
  216. return nil
  217. }
  218. if protocol.IsDeleted(cur.Flags) {
  219. if debug {
  220. l.Debugf("queue delete dir: %v", cur)
  221. }
  222. // We queue the directories to delete since we walk the
  223. // tree in depth first order and need to remove the
  224. // directories in the opposite order.
  225. deleteDirs = append(deleteDirs, path)
  226. return nil
  227. }
  228. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(cur.Flags) && !scanner.PermsEqual(cur.Flags, uint32(info.Mode())) {
  229. err := os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm)
  230. if err != nil {
  231. l.Warnf("Restoring folder flags: %q: %v", path, err)
  232. } else {
  233. changed++
  234. if debug {
  235. l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur)
  236. }
  237. }
  238. }
  239. if cur.Modified != info.ModTime().Unix() {
  240. t := time.Unix(cur.Modified, 0)
  241. err := os.Chtimes(path, t, t)
  242. if err != nil {
  243. if runtime.GOOS != "windows" {
  244. // https://code.google.com/p/go/issues/detail?id=8090
  245. l.Warnf("Restoring folder modtime: %q: %v", path, err)
  246. }
  247. } else {
  248. changed++
  249. if debug {
  250. l.Debugf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur)
  251. }
  252. }
  253. }
  254. return nil
  255. }
  256. for {
  257. deleteDirs = nil
  258. changed = 0
  259. filepath.Walk(p.repoCfg.Directory, walkFn)
  260. var deleted = 0
  261. // Delete any queued directories
  262. for i := len(deleteDirs) - 1; i >= 0; i-- {
  263. dir := deleteDirs[i]
  264. if debug {
  265. l.Debugln("delete dir:", dir)
  266. }
  267. err := os.Remove(dir)
  268. if err == nil {
  269. deleted++
  270. } else if p.versioner == nil { // Failures are expected in the presence of versioning
  271. l.Warnln(err)
  272. }
  273. }
  274. if debug {
  275. l.Debugf("changed %d, deleted %d dirs", changed, deleted)
  276. }
  277. if changed+deleted == 0 {
  278. return
  279. }
  280. }
  281. }
  282. func (p *puller) handleRequestResult(res requestResult) {
  283. p.oustandingPerNode.decrease(res.node)
  284. f := res.file
  285. of, ok := p.openFiles[f.Name]
  286. if !ok || of.err != nil {
  287. // no entry in openFiles means there was an error and we've cancelled the operation
  288. return
  289. }
  290. _, of.err = of.file.WriteAt(res.data, res.offset)
  291. buffers.Put(res.data)
  292. of.outstanding--
  293. p.openFiles[f.Name] = of
  294. if debug {
  295. l.Debugf("pull: wrote %q / %q offset %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, of.outstanding, of.done)
  296. }
  297. if of.done && of.outstanding == 0 {
  298. p.closeFile(f)
  299. }
  300. }
// handleBlock fulfills the block request by copying, ignoring or fetching
// from the network. Returns true if the block was fully handled
// synchronously, i.e. if the slot can be reused.
//
// Directories are created (or, when deleted, left for later removal) and
// reported to the model right away. A file whose blocks can all be copied
// from the existing file only has its modification time and permissions
// updated. Any other file gets an entry in openFiles and a temporary file,
// after which the block is dispatched to the copy, request or empty-block
// handler.
func (p *puller) handleBlock(b bqBlock) bool {
	f := b.file

	// For directories, making sure they exist is enough.
	// Deleted directories we mark as handled and delete later.
	if protocol.IsDirectory(f.Flags) {
		if !protocol.IsDeleted(f.Flags) {
			path := filepath.Join(p.repoCfg.Directory, f.Name)
			_, err := os.Stat(path)
			if err != nil && os.IsNotExist(err) {
				if debug {
					l.Debugf("create dir: %v", f)
				}
				err = os.MkdirAll(path, 0777)
				if err != nil {
					l.Warnf("Create folder: %q: %v", path, err)
				}
			}
		} else if debug {
			l.Debugf("ignore delete dir: %v", f)
		}
		p.model.updateLocal(p.repoCfg.ID, f)
		return true
	}

	if len(b.copy) > 0 && len(b.copy) == len(b.file.Blocks) && b.last {
		// We are supposed to copy the entire file, and then fetch nothing.
		// We don't actually need to make the copy.
		if debug {
			l.Debugln("taking shortcut:", f)
		}
		fp := filepath.Join(p.repoCfg.Directory, f.Name)
		t := time.Unix(f.Modified, 0)
		// Failures here are only reported in debug mode; the file is
		// reported to the model regardless.
		err := os.Chtimes(fp, t, t)
		if debug && err != nil {
			l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
		}
		if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
			err = os.Chmod(fp, os.FileMode(f.Flags&0777))
			if debug && err != nil {
				l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
			}
		}

		p.model.updateLocal(p.repoCfg.ID, f)
		return true
	}

	// of is a copy of the map entry (the zero value when there is none);
	// changes only take effect once it is stored back into openFiles.
	of, ok := p.openFiles[f.Name]
	of.done = b.last

	if !ok {
		// First block seen for this file: set up the temporary file.
		if debug {
			l.Debugf("pull: %q: opening file %q", p.repoCfg.ID, f.Name)
		}

		of.availability = uint64(p.model.repoFiles[p.repoCfg.ID].Availability(f.Name))
		of.filepath = filepath.Join(p.repoCfg.Directory, f.Name)
		of.temp = filepath.Join(p.repoCfg.Directory, defTempNamer.TempName(f.Name))

		dirName := filepath.Dir(of.filepath)
		_, err := os.Stat(dirName)
		if err != nil {
			err = os.MkdirAll(dirName, 0777)
		}
		if err != nil {
			// NOTE(review): logged even when debug is off, unlike the
			// surrounding calls; a failure here is not fatal by itself,
			// the os.Create below decides.
			l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
		}

		of.file, of.err = os.Create(of.temp)
		if of.err != nil {
			if debug {
				l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
			}
			if !b.last {
				// Remember the failure so the remaining blocks of this
				// file are skipped.
				p.openFiles[f.Name] = of
			}
			return true
		}
		osutil.HideFile(of.temp)
	}

	if of.err != nil {
		// We have already failed this file.
		if debug {
			l.Debugf("pull: error: %q / %q has already failed: %v", p.repoCfg.ID, f.Name, of.err)
		}
		if b.last {
			// No more blocks will follow; forget the failed file.
			delete(p.openFiles, f.Name)
		}

		return true
	}

	p.openFiles[f.Name] = of

	switch {
	case len(b.copy) > 0:
		p.handleCopyBlock(b)
		return true

	case b.block.Size > 0:
		return p.handleRequestBlock(b)

	default:
		p.handleEmptyBlock(b)
		return true
	}
}
  399. func (p *puller) handleCopyBlock(b bqBlock) {
  400. // We have blocks to copy from the existing file
  401. f := b.file
  402. of := p.openFiles[f.Name]
  403. if debug {
  404. l.Debugf("pull: copying %d blocks for %q / %q", len(b.copy), p.repoCfg.ID, f.Name)
  405. }
  406. var exfd *os.File
  407. exfd, of.err = os.Open(of.filepath)
  408. if of.err != nil {
  409. if debug {
  410. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  411. }
  412. of.file.Close()
  413. of.file = nil
  414. p.openFiles[f.Name] = of
  415. return
  416. }
  417. defer exfd.Close()
  418. for _, b := range b.copy {
  419. bs := buffers.Get(int(b.Size))
  420. _, of.err = exfd.ReadAt(bs, b.Offset)
  421. if of.err == nil {
  422. _, of.err = of.file.WriteAt(bs, b.Offset)
  423. }
  424. buffers.Put(bs)
  425. if of.err != nil {
  426. if debug {
  427. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  428. }
  429. exfd.Close()
  430. of.file.Close()
  431. of.file = nil
  432. p.openFiles[f.Name] = of
  433. return
  434. }
  435. }
  436. }
  437. // handleRequestBlock tries to pull a block from the network. Returns true if
  438. // the block could _not_ be fetched (i.e. it was fully handled, matching the
  439. // return criteria of handleBlock)
  440. func (p *puller) handleRequestBlock(b bqBlock) bool {
  441. f := b.file
  442. of, ok := p.openFiles[f.Name]
  443. if !ok {
  444. panic("bug: request for non-open file")
  445. }
  446. node := p.oustandingPerNode.leastBusyNode(of.availability, p.model.cm)
  447. if len(node) == 0 {
  448. of.err = errNoNode
  449. if of.file != nil {
  450. of.file.Close()
  451. of.file = nil
  452. os.Remove(of.temp)
  453. }
  454. if b.last {
  455. delete(p.openFiles, f.Name)
  456. } else {
  457. p.openFiles[f.Name] = of
  458. }
  459. return true
  460. }
  461. of.outstanding++
  462. p.openFiles[f.Name] = of
  463. go func(node string, b bqBlock) {
  464. if debug {
  465. l.Debugf("pull: requesting %q / %q offset %d size %d from %q outstanding %d", p.repoCfg.ID, f.Name, b.block.Offset, b.block.Size, node, of.outstanding)
  466. }
  467. bs, err := p.model.requestGlobal(node, p.repoCfg.ID, f.Name, b.block.Offset, int(b.block.Size), nil)
  468. p.requestResults <- requestResult{
  469. node: node,
  470. file: f,
  471. filepath: of.filepath,
  472. offset: b.block.Offset,
  473. data: bs,
  474. err: err,
  475. }
  476. }(node, b)
  477. return false
  478. }
  479. func (p *puller) handleEmptyBlock(b bqBlock) {
  480. f := b.file
  481. of := p.openFiles[f.Name]
  482. if b.last {
  483. if of.err == nil {
  484. of.file.Close()
  485. }
  486. }
  487. if protocol.IsDeleted(f.Flags) {
  488. if debug {
  489. l.Debugf("pull: delete %q", f.Name)
  490. }
  491. os.Remove(of.temp)
  492. os.Chmod(of.filepath, 0666)
  493. if p.versioner != nil {
  494. if err := p.versioner.Archive(of.filepath); err == nil {
  495. p.model.updateLocal(p.repoCfg.ID, f)
  496. }
  497. } else if err := os.Remove(of.filepath); err == nil || os.IsNotExist(err) {
  498. p.model.updateLocal(p.repoCfg.ID, f)
  499. }
  500. } else {
  501. if debug {
  502. l.Debugf("pull: no blocks to fetch and nothing to copy for %q / %q", p.repoCfg.ID, f.Name)
  503. }
  504. t := time.Unix(f.Modified, 0)
  505. if os.Chtimes(of.temp, t, t) != nil {
  506. delete(p.openFiles, f.Name)
  507. return
  508. }
  509. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) && os.Chmod(of.temp, os.FileMode(f.Flags&0777)) != nil {
  510. delete(p.openFiles, f.Name)
  511. return
  512. }
  513. osutil.ShowFile(of.temp)
  514. if osutil.Rename(of.temp, of.filepath) == nil {
  515. p.model.updateLocal(p.repoCfg.ID, f)
  516. }
  517. }
  518. delete(p.openFiles, f.Name)
  519. }
  520. func (p *puller) queueNeededBlocks() {
  521. queued := 0
  522. for _, f := range p.model.NeedFilesRepo(p.repoCfg.ID) {
  523. lf := p.model.CurrentRepoFile(p.repoCfg.ID, f.Name)
  524. have, need := scanner.BlockDiff(lf.Blocks, f.Blocks)
  525. if debug {
  526. l.Debugf("need:\n local: %v\n global: %v\n haveBlocks: %v\n needBlocks: %v", lf, f, have, need)
  527. }
  528. queued++
  529. p.bq.put(bqAdd{
  530. file: f,
  531. have: have,
  532. need: need,
  533. })
  534. }
  535. if debug && queued > 0 {
  536. l.Debugf("%q: queued %d blocks", p.repoCfg.ID, queued)
  537. }
  538. }
  539. func (p *puller) closeFile(f scanner.File) {
  540. if debug {
  541. l.Debugf("pull: closing %q / %q", p.repoCfg.ID, f.Name)
  542. }
  543. of := p.openFiles[f.Name]
  544. of.file.Close()
  545. defer os.Remove(of.temp)
  546. delete(p.openFiles, f.Name)
  547. fd, err := os.Open(of.temp)
  548. if err != nil {
  549. if debug {
  550. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  551. }
  552. return
  553. }
  554. hb, _ := scanner.Blocks(fd, scanner.StandardBlockSize)
  555. fd.Close()
  556. if l0, l1 := len(hb), len(f.Blocks); l0 != l1 {
  557. if debug {
  558. l.Debugf("pull: %q / %q: nblocks %d != %d", p.repoCfg.ID, f.Name, l0, l1)
  559. }
  560. return
  561. }
  562. for i := range hb {
  563. if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 {
  564. l.Debugf("pull: %q / %q: block %d hash mismatch", p.repoCfg.ID, f.Name, i)
  565. return
  566. }
  567. }
  568. t := time.Unix(f.Modified, 0)
  569. err = os.Chtimes(of.temp, t, t)
  570. if debug && err != nil {
  571. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  572. }
  573. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
  574. err = os.Chmod(of.temp, os.FileMode(f.Flags&0777))
  575. if debug && err != nil {
  576. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  577. }
  578. }
  579. osutil.ShowFile(of.temp)
  580. if p.versioner != nil {
  581. err := p.versioner.Archive(of.filepath)
  582. if err != nil {
  583. if debug {
  584. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  585. }
  586. return
  587. }
  588. }
  589. if debug {
  590. l.Debugf("pull: rename %q / %q: %q", p.repoCfg.ID, f.Name, of.filepath)
  591. }
  592. if err := osutil.Rename(of.temp, of.filepath); err == nil {
  593. p.model.updateLocal(p.repoCfg.ID, f)
  594. } else {
  595. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  596. }
  597. }
  598. func invalidateRepo(cfg *config.Configuration, repoID string, err error) {
  599. for i := range cfg.Repositories {
  600. repo := &cfg.Repositories[i]
  601. if repo.ID == repoID {
  602. repo.Invalid = err.Error()
  603. return
  604. }
  605. }
  606. }