puller.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703
  1. // Copyright (C) 2014 Jakob Borg and other contributors. All rights reserved.
  2. // Use of this source code is governed by an MIT-style license that can be
  3. // found in the LICENSE file.
  4. package model
  5. import (
  6. "bytes"
  7. "errors"
  8. "os"
  9. "path/filepath"
  10. "runtime"
  11. "time"
  12. "github.com/calmh/syncthing/cid"
  13. "github.com/calmh/syncthing/config"
  14. "github.com/calmh/syncthing/osutil"
  15. "github.com/calmh/syncthing/protocol"
  16. "github.com/calmh/syncthing/scanner"
  17. "github.com/calmh/syncthing/versioner"
  18. )
  19. type requestResult struct {
  20. node string
  21. file scanner.File
  22. filepath string // full filepath name
  23. offset int64
  24. data []byte
  25. err error
  26. }
  27. type openFile struct {
  28. filepath string // full filepath name
  29. temp string // temporary filename
  30. availability uint64 // availability bitset
  31. file *os.File
  32. err error // error when opening or writing to file, all following operations are cancelled
  33. outstanding int // number of requests we still have outstanding
  34. done bool // we have sent all requests for this file
  35. }
  36. type activityMap map[string]int
  37. func (m activityMap) leastBusyNode(availability uint64, cm *cid.Map) string {
  38. var low int = 2<<30 - 1
  39. var selected string
  40. for _, node := range cm.Names() {
  41. id := cm.Get(node)
  42. if id == cid.LocalID {
  43. continue
  44. }
  45. usage := m[node]
  46. if availability&(1<<id) != 0 {
  47. if usage < low {
  48. low = usage
  49. selected = node
  50. }
  51. }
  52. }
  53. m[selected]++
  54. return selected
  55. }
  56. func (m activityMap) decrease(node string) {
  57. m[node]--
  58. }
  59. var errNoNode = errors.New("no available source node")
  60. type puller struct {
  61. cfg *config.Configuration
  62. repoCfg config.RepositoryConfiguration
  63. bq *blockQueue
  64. model *Model
  65. oustandingPerNode activityMap
  66. openFiles map[string]openFile
  67. requestSlots chan bool
  68. blocks chan bqBlock
  69. requestResults chan requestResult
  70. versioner versioner.Versioner
  71. }
  72. func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, cfg *config.Configuration) *puller {
  73. p := &puller{
  74. repoCfg: repoCfg,
  75. cfg: cfg,
  76. bq: newBlockQueue(),
  77. model: model,
  78. oustandingPerNode: make(activityMap),
  79. openFiles: make(map[string]openFile),
  80. requestSlots: make(chan bool, slots),
  81. blocks: make(chan bqBlock),
  82. requestResults: make(chan requestResult),
  83. }
  84. if len(repoCfg.Versioning.Type) > 0 {
  85. factory, ok := versioner.Factories[repoCfg.Versioning.Type]
  86. if !ok {
  87. l.Fatalf("Requested versioning type %q that does not exist", repoCfg.Versioning.Type)
  88. }
  89. p.versioner = factory(repoCfg.Versioning.Params)
  90. }
  91. if slots > 0 {
  92. // Read/write
  93. for i := 0; i < slots; i++ {
  94. p.requestSlots <- true
  95. }
  96. if debug {
  97. l.Debugf("starting puller; repo %q dir %q slots %d", repoCfg.ID, repoCfg.Directory, slots)
  98. }
  99. go p.run()
  100. } else {
  101. // Read only
  102. if debug {
  103. l.Debugf("starting puller; repo %q dir %q (read only)", repoCfg.ID, repoCfg.Directory)
  104. }
  105. go p.runRO()
  106. }
  107. return p
  108. }
  109. func (p *puller) run() {
  110. go func() {
  111. // fill blocks queue when there are free slots
  112. for {
  113. <-p.requestSlots
  114. b := p.bq.get()
  115. if debug {
  116. l.Debugf("filler: queueing %q / %q offset %d copy %d", p.repoCfg.ID, b.file.Name, b.block.Offset, len(b.copy))
  117. }
  118. p.blocks <- b
  119. }
  120. }()
  121. walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
  122. timeout := time.Tick(5 * time.Second)
  123. changed := true
  124. var prevVer uint64
  125. for {
  126. // Run the pulling loop as long as there are blocks to fetch
  127. pull:
  128. for {
  129. select {
  130. case res := <-p.requestResults:
  131. p.model.setState(p.repoCfg.ID, RepoSyncing)
  132. changed = true
  133. p.requestSlots <- true
  134. p.handleRequestResult(res)
  135. case b := <-p.blocks:
  136. p.model.setState(p.repoCfg.ID, RepoSyncing)
  137. changed = true
  138. if p.handleBlock(b) {
  139. // Block was fully handled, free up the slot
  140. p.requestSlots <- true
  141. }
  142. case <-timeout:
  143. if len(p.openFiles) == 0 && p.bq.empty() {
  144. // Nothing more to do for the moment
  145. break pull
  146. }
  147. if debug {
  148. l.Debugf("%q: idle but have %d open files", p.repoCfg.ID, len(p.openFiles))
  149. i := 5
  150. for _, f := range p.openFiles {
  151. l.Debugf(" %v", f)
  152. i--
  153. if i == 0 {
  154. break
  155. }
  156. }
  157. }
  158. }
  159. }
  160. if changed {
  161. p.model.setState(p.repoCfg.ID, RepoCleaning)
  162. p.fixupDirectories()
  163. changed = false
  164. }
  165. p.model.setState(p.repoCfg.ID, RepoIdle)
  166. // Do a rescan if it's time for it
  167. select {
  168. case <-walkTicker:
  169. if debug {
  170. l.Debugf("%q: time for rescan", p.repoCfg.ID)
  171. }
  172. err := p.model.ScanRepo(p.repoCfg.ID)
  173. if err != nil {
  174. invalidateRepo(p.cfg, p.repoCfg.ID, err)
  175. return
  176. }
  177. default:
  178. }
  179. if v := p.model.Version(p.repoCfg.ID); v > prevVer {
  180. // Queue more blocks to fetch, if any
  181. p.queueNeededBlocks()
  182. prevVer = v
  183. }
  184. }
  185. }
  186. func (p *puller) runRO() {
  187. walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
  188. for _ = range walkTicker {
  189. if debug {
  190. l.Debugf("%q: time for rescan", p.repoCfg.ID)
  191. }
  192. err := p.model.ScanRepo(p.repoCfg.ID)
  193. if err != nil {
  194. invalidateRepo(p.cfg, p.repoCfg.ID, err)
  195. return
  196. }
  197. }
  198. }
  199. func (p *puller) fixupDirectories() {
  200. var deleteDirs []string
  201. var changed = 0
  202. var walkFn = func(path string, info os.FileInfo, err error) error {
  203. if err != nil {
  204. return err
  205. }
  206. if !info.IsDir() {
  207. return nil
  208. }
  209. rn, err := filepath.Rel(p.repoCfg.Directory, path)
  210. if err != nil {
  211. return nil
  212. }
  213. if rn == "." {
  214. return nil
  215. }
  216. if filepath.Base(rn) == ".stversions" {
  217. return nil
  218. }
  219. cur := p.model.CurrentRepoFile(p.repoCfg.ID, rn)
  220. if cur.Name != rn {
  221. // No matching dir in current list; weird
  222. if debug {
  223. l.Debugf("missing dir: %s; %v", rn, cur)
  224. }
  225. return nil
  226. }
  227. if protocol.IsDeleted(cur.Flags) {
  228. if debug {
  229. l.Debugf("queue delete dir: %v", cur)
  230. }
  231. // We queue the directories to delete since we walk the
  232. // tree in depth first order and need to remove the
  233. // directories in the opposite order.
  234. deleteDirs = append(deleteDirs, path)
  235. return nil
  236. }
  237. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(cur.Flags) && !scanner.PermsEqual(cur.Flags, uint32(info.Mode())) {
  238. err := os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm)
  239. if err != nil {
  240. l.Warnf("Restoring folder flags: %q: %v", path, err)
  241. } else {
  242. changed++
  243. if debug {
  244. l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur)
  245. }
  246. }
  247. }
  248. if cur.Modified != info.ModTime().Unix() {
  249. t := time.Unix(cur.Modified, 0)
  250. err := os.Chtimes(path, t, t)
  251. if err != nil {
  252. if runtime.GOOS != "windows" {
  253. // https://code.google.com/p/go/issues/detail?id=8090
  254. l.Warnf("Restoring folder modtime: %q: %v", path, err)
  255. }
  256. } else {
  257. changed++
  258. if debug {
  259. l.Debugf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur)
  260. }
  261. }
  262. }
  263. return nil
  264. }
  265. for {
  266. deleteDirs = nil
  267. changed = 0
  268. filepath.Walk(p.repoCfg.Directory, walkFn)
  269. var deleted = 0
  270. // Delete any queued directories
  271. for i := len(deleteDirs) - 1; i >= 0; i-- {
  272. dir := deleteDirs[i]
  273. if debug {
  274. l.Debugln("delete dir:", dir)
  275. }
  276. err := os.Remove(dir)
  277. if err == nil {
  278. deleted++
  279. } else if p.versioner == nil { // Failures are expected in the presence of versioning
  280. l.Warnln(err)
  281. }
  282. }
  283. if debug {
  284. l.Debugf("changed %d, deleted %d dirs", changed, deleted)
  285. }
  286. if changed+deleted == 0 {
  287. return
  288. }
  289. }
  290. }
  291. func (p *puller) handleRequestResult(res requestResult) {
  292. p.oustandingPerNode.decrease(res.node)
  293. f := res.file
  294. of, ok := p.openFiles[f.Name]
  295. if !ok || of.err != nil {
  296. // no entry in openFiles means there was an error and we've cancelled the operation
  297. return
  298. }
  299. _, of.err = of.file.WriteAt(res.data, res.offset)
  300. of.outstanding--
  301. p.openFiles[f.Name] = of
  302. if debug {
  303. l.Debugf("pull: wrote %q / %q offset %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, of.outstanding, of.done)
  304. }
  305. if of.done && of.outstanding == 0 {
  306. p.closeFile(f)
  307. }
  308. }
  309. // handleBlock fulfills the block request by copying, ignoring or fetching
  310. // from the network. Returns true if the block was fully handled
  311. // synchronously, i.e. if the slot can be reused.
  312. func (p *puller) handleBlock(b bqBlock) bool {
  313. f := b.file
  314. // For directories, making sure they exist is enough.
  315. // Deleted directories we mark as handled and delete later.
  316. if protocol.IsDirectory(f.Flags) {
  317. if !protocol.IsDeleted(f.Flags) {
  318. path := filepath.Join(p.repoCfg.Directory, f.Name)
  319. _, err := os.Stat(path)
  320. if err != nil && os.IsNotExist(err) {
  321. if debug {
  322. l.Debugf("create dir: %v", f)
  323. }
  324. err = os.MkdirAll(path, 0777)
  325. if err != nil {
  326. l.Warnf("Create folder: %q: %v", path, err)
  327. }
  328. }
  329. } else if debug {
  330. l.Debugf("ignore delete dir: %v", f)
  331. }
  332. p.model.updateLocal(p.repoCfg.ID, f)
  333. return true
  334. }
  335. if len(b.copy) > 0 && len(b.copy) == len(b.file.Blocks) && b.last {
  336. // We are supposed to copy the entire file, and then fetch nothing.
  337. // We don't actually need to make the copy.
  338. if debug {
  339. l.Debugln("taking shortcut:", f)
  340. }
  341. fp := filepath.Join(p.repoCfg.Directory, f.Name)
  342. t := time.Unix(f.Modified, 0)
  343. err := os.Chtimes(fp, t, t)
  344. if debug && err != nil {
  345. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  346. }
  347. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
  348. err = os.Chmod(fp, os.FileMode(f.Flags&0777))
  349. if debug && err != nil {
  350. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  351. }
  352. }
  353. p.model.updateLocal(p.repoCfg.ID, f)
  354. return true
  355. }
  356. of, ok := p.openFiles[f.Name]
  357. of.done = b.last
  358. if !ok {
  359. if debug {
  360. l.Debugf("pull: %q: opening file %q", p.repoCfg.ID, f.Name)
  361. }
  362. of.availability = uint64(p.model.repoFiles[p.repoCfg.ID].Availability(f.Name))
  363. of.filepath = filepath.Join(p.repoCfg.Directory, f.Name)
  364. of.temp = filepath.Join(p.repoCfg.Directory, defTempNamer.TempName(f.Name))
  365. dirName := filepath.Dir(of.filepath)
  366. _, err := os.Stat(dirName)
  367. if err != nil {
  368. err = os.MkdirAll(dirName, 0777)
  369. }
  370. if err != nil {
  371. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  372. }
  373. of.file, of.err = os.Create(of.temp)
  374. if of.err != nil {
  375. if debug {
  376. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  377. }
  378. if !b.last {
  379. p.openFiles[f.Name] = of
  380. }
  381. return true
  382. }
  383. osutil.HideFile(of.temp)
  384. }
  385. if of.err != nil {
  386. // We have already failed this file.
  387. if debug {
  388. l.Debugf("pull: error: %q / %q has already failed: %v", p.repoCfg.ID, f.Name, of.err)
  389. }
  390. if b.last {
  391. delete(p.openFiles, f.Name)
  392. }
  393. return true
  394. }
  395. p.openFiles[f.Name] = of
  396. switch {
  397. case len(b.copy) > 0:
  398. p.handleCopyBlock(b)
  399. return true
  400. case b.block.Size > 0:
  401. return p.handleRequestBlock(b)
  402. default:
  403. p.handleEmptyBlock(b)
  404. return true
  405. }
  406. }
  407. func (p *puller) handleCopyBlock(b bqBlock) {
  408. // We have blocks to copy from the existing file
  409. f := b.file
  410. of := p.openFiles[f.Name]
  411. if debug {
  412. l.Debugf("pull: copying %d blocks for %q / %q", len(b.copy), p.repoCfg.ID, f.Name)
  413. }
  414. var exfd *os.File
  415. exfd, of.err = os.Open(of.filepath)
  416. if of.err != nil {
  417. if debug {
  418. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  419. }
  420. of.file.Close()
  421. of.file = nil
  422. p.openFiles[f.Name] = of
  423. return
  424. }
  425. defer exfd.Close()
  426. for _, b := range b.copy {
  427. bs := make([]byte, b.Size)
  428. _, of.err = exfd.ReadAt(bs, b.Offset)
  429. if of.err == nil {
  430. _, of.err = of.file.WriteAt(bs, b.Offset)
  431. }
  432. if of.err != nil {
  433. if debug {
  434. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err)
  435. }
  436. exfd.Close()
  437. of.file.Close()
  438. of.file = nil
  439. p.openFiles[f.Name] = of
  440. return
  441. }
  442. }
  443. }
  444. // handleRequestBlock tries to pull a block from the network. Returns true if
  445. // the block could _not_ be fetched (i.e. it was fully handled, matching the
  446. // return criteria of handleBlock)
  447. func (p *puller) handleRequestBlock(b bqBlock) bool {
  448. f := b.file
  449. of, ok := p.openFiles[f.Name]
  450. if !ok {
  451. panic("bug: request for non-open file")
  452. }
  453. node := p.oustandingPerNode.leastBusyNode(of.availability, p.model.cm)
  454. if len(node) == 0 {
  455. of.err = errNoNode
  456. if of.file != nil {
  457. of.file.Close()
  458. of.file = nil
  459. os.Remove(of.temp)
  460. }
  461. if b.last {
  462. delete(p.openFiles, f.Name)
  463. } else {
  464. p.openFiles[f.Name] = of
  465. }
  466. return true
  467. }
  468. of.outstanding++
  469. p.openFiles[f.Name] = of
  470. go func(node string, b bqBlock) {
  471. if debug {
  472. l.Debugf("pull: requesting %q / %q offset %d size %d from %q outstanding %d", p.repoCfg.ID, f.Name, b.block.Offset, b.block.Size, node, of.outstanding)
  473. }
  474. bs, err := p.model.requestGlobal(node, p.repoCfg.ID, f.Name, b.block.Offset, int(b.block.Size), nil)
  475. p.requestResults <- requestResult{
  476. node: node,
  477. file: f,
  478. filepath: of.filepath,
  479. offset: b.block.Offset,
  480. data: bs,
  481. err: err,
  482. }
  483. }(node, b)
  484. return false
  485. }
  486. func (p *puller) handleEmptyBlock(b bqBlock) {
  487. f := b.file
  488. of := p.openFiles[f.Name]
  489. if b.last {
  490. if of.err == nil {
  491. of.file.Close()
  492. }
  493. }
  494. if protocol.IsDeleted(f.Flags) {
  495. if debug {
  496. l.Debugf("pull: delete %q", f.Name)
  497. }
  498. os.Remove(of.temp)
  499. os.Chmod(of.filepath, 0666)
  500. if p.versioner != nil {
  501. if err := p.versioner.Archive(of.filepath); err == nil {
  502. p.model.updateLocal(p.repoCfg.ID, f)
  503. }
  504. } else if err := os.Remove(of.filepath); err == nil || os.IsNotExist(err) {
  505. p.model.updateLocal(p.repoCfg.ID, f)
  506. }
  507. } else {
  508. if debug {
  509. l.Debugf("pull: no blocks to fetch and nothing to copy for %q / %q", p.repoCfg.ID, f.Name)
  510. }
  511. t := time.Unix(f.Modified, 0)
  512. if os.Chtimes(of.temp, t, t) != nil {
  513. delete(p.openFiles, f.Name)
  514. return
  515. }
  516. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) && os.Chmod(of.temp, os.FileMode(f.Flags&0777)) != nil {
  517. delete(p.openFiles, f.Name)
  518. return
  519. }
  520. osutil.ShowFile(of.temp)
  521. if osutil.Rename(of.temp, of.filepath) == nil {
  522. p.model.updateLocal(p.repoCfg.ID, f)
  523. }
  524. }
  525. delete(p.openFiles, f.Name)
  526. }
  527. func (p *puller) queueNeededBlocks() {
  528. queued := 0
  529. for _, f := range p.model.NeedFilesRepo(p.repoCfg.ID) {
  530. lf := p.model.CurrentRepoFile(p.repoCfg.ID, f.Name)
  531. have, need := scanner.BlockDiff(lf.Blocks, f.Blocks)
  532. if debug {
  533. l.Debugf("need:\n local: %v\n global: %v\n haveBlocks: %v\n needBlocks: %v", lf, f, have, need)
  534. }
  535. queued++
  536. p.bq.put(bqAdd{
  537. file: f,
  538. have: have,
  539. need: need,
  540. })
  541. }
  542. if debug && queued > 0 {
  543. l.Debugf("%q: queued %d blocks", p.repoCfg.ID, queued)
  544. }
  545. }
  546. func (p *puller) closeFile(f scanner.File) {
  547. if debug {
  548. l.Debugf("pull: closing %q / %q", p.repoCfg.ID, f.Name)
  549. }
  550. of := p.openFiles[f.Name]
  551. of.file.Close()
  552. defer os.Remove(of.temp)
  553. delete(p.openFiles, f.Name)
  554. fd, err := os.Open(of.temp)
  555. if err != nil {
  556. if debug {
  557. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  558. }
  559. return
  560. }
  561. hb, _ := scanner.Blocks(fd, scanner.StandardBlockSize)
  562. fd.Close()
  563. if l0, l1 := len(hb), len(f.Blocks); l0 != l1 {
  564. if debug {
  565. l.Debugf("pull: %q / %q: nblocks %d != %d", p.repoCfg.ID, f.Name, l0, l1)
  566. }
  567. return
  568. }
  569. for i := range hb {
  570. if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 {
  571. l.Debugf("pull: %q / %q: block %d hash mismatch", p.repoCfg.ID, f.Name, i)
  572. return
  573. }
  574. }
  575. t := time.Unix(f.Modified, 0)
  576. err = os.Chtimes(of.temp, t, t)
  577. if debug && err != nil {
  578. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  579. }
  580. if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) {
  581. err = os.Chmod(of.temp, os.FileMode(f.Flags&0777))
  582. if debug && err != nil {
  583. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  584. }
  585. }
  586. osutil.ShowFile(of.temp)
  587. if p.versioner != nil {
  588. err := p.versioner.Archive(of.filepath)
  589. if err != nil {
  590. if debug {
  591. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  592. }
  593. return
  594. }
  595. }
  596. if debug {
  597. l.Debugf("pull: rename %q / %q: %q", p.repoCfg.ID, f.Name, of.filepath)
  598. }
  599. if err := osutil.Rename(of.temp, of.filepath); err == nil {
  600. p.model.updateLocal(p.repoCfg.ID, f)
  601. } else {
  602. l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err)
  603. }
  604. }
  605. func invalidateRepo(cfg *config.Configuration, repoID string, err error) {
  606. for i := range cfg.Repositories {
  607. repo := &cfg.Repositories[i]
  608. if repo.ID == repoID {
  609. repo.Invalid = err.Error()
  610. return
  611. }
  612. }
  613. }