puller.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630
  1. package model
  2. import (
  3. "bytes"
  4. "errors"
  5. "os"
  6. "path/filepath"
  7. "time"
  8. "github.com/calmh/syncthing/buffers"
  9. "github.com/calmh/syncthing/cid"
  10. "github.com/calmh/syncthing/config"
  11. "github.com/calmh/syncthing/protocol"
  12. "github.com/calmh/syncthing/scanner"
  13. )
  14. type requestResult struct {
  15. node string
  16. file scanner.File
  17. filepath string // full filepath name
  18. offset int64
  19. data []byte
  20. err error
  21. }
  22. type openFile struct {
  23. filepath string // full filepath name
  24. temp string // temporary filename
  25. availability uint64 // availability bitset
  26. file *os.File
  27. err error // error when opening or writing to file, all following operations are cancelled
  28. outstanding int // number of requests we still have outstanding
  29. done bool // we have sent all requests for this file
  30. }
  31. type activityMap map[string]int
  32. func (m activityMap) leastBusyNode(availability uint64, cm *cid.Map) string {
  33. var low int = 2<<30 - 1
  34. var selected string
  35. for _, node := range cm.Names() {
  36. id := cm.Get(node)
  37. if id == cid.LocalID {
  38. continue
  39. }
  40. usage := m[node]
  41. if availability&(1<<id) != 0 {
  42. if usage < low {
  43. low = usage
  44. selected = node
  45. }
  46. }
  47. }
  48. m[selected]++
  49. return selected
  50. }
  51. func (m activityMap) decrease(node string) {
  52. m[node]--
  53. }
  54. var errNoNode = errors.New("no available source node")
  55. type puller struct {
  56. cfg *config.Configuration
  57. repo string
  58. dir string
  59. bq *blockQueue
  60. model *Model
  61. oustandingPerNode activityMap
  62. openFiles map[string]openFile
  63. requestSlots chan bool
  64. blocks chan bqBlock
  65. requestResults chan requestResult
  66. }
  67. func newPuller(repo, dir string, model *Model, slots int, cfg *config.Configuration) *puller {
  68. p := &puller{
  69. cfg: cfg,
  70. repo: repo,
  71. dir: dir,
  72. bq: newBlockQueue(),
  73. model: model,
  74. oustandingPerNode: make(activityMap),
  75. openFiles: make(map[string]openFile),
  76. requestSlots: make(chan bool, slots),
  77. blocks: make(chan bqBlock),
  78. requestResults: make(chan requestResult),
  79. }
  80. if slots > 0 {
  81. // Read/write
  82. for i := 0; i < slots; i++ {
  83. p.requestSlots <- true
  84. }
  85. if debug {
  86. l.Debugf("starting puller; repo %q dir %q slots %d", repo, dir, slots)
  87. }
  88. go p.run()
  89. } else {
  90. // Read only
  91. if debug {
  92. l.Debugf("starting puller; repo %q dir %q (read only)", repo, dir)
  93. }
  94. go p.runRO()
  95. }
  96. return p
  97. }
  98. func (p *puller) run() {
  99. go func() {
  100. // fill blocks queue when there are free slots
  101. for {
  102. <-p.requestSlots
  103. b := p.bq.get()
  104. if debug {
  105. l.Debugf("filler: queueing %q / %q offset %d copy %d", p.repo, b.file.Name, b.block.Offset, len(b.copy))
  106. }
  107. p.blocks <- b
  108. }
  109. }()
  110. walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
  111. timeout := time.Tick(5 * time.Second)
  112. changed := true
  113. for {
  114. // Run the pulling loop as long as there are blocks to fetch
  115. pull:
  116. for {
  117. select {
  118. case res := <-p.requestResults:
  119. p.model.setState(p.repo, RepoSyncing)
  120. changed = true
  121. p.requestSlots <- true
  122. p.handleRequestResult(res)
  123. case b := <-p.blocks:
  124. p.model.setState(p.repo, RepoSyncing)
  125. changed = true
  126. if p.handleBlock(b) {
  127. // Block was fully handled, free up the slot
  128. p.requestSlots <- true
  129. }
  130. case <-timeout:
  131. if len(p.openFiles) == 0 && p.bq.empty() {
  132. // Nothing more to do for the moment
  133. break pull
  134. }
  135. if debug {
  136. l.Debugf("%q: idle but have %d open files", p.repo, len(p.openFiles))
  137. i := 5
  138. for _, f := range p.openFiles {
  139. l.Debugf(" %v", f)
  140. i--
  141. if i == 0 {
  142. break
  143. }
  144. }
  145. }
  146. }
  147. }
  148. if changed {
  149. p.model.setState(p.repo, RepoCleaning)
  150. p.fixupDirectories()
  151. changed = false
  152. }
  153. p.model.setState(p.repo, RepoIdle)
  154. // Do a rescan if it's time for it
  155. select {
  156. case <-walkTicker:
  157. if debug {
  158. l.Debugf("%q: time for rescan", p.repo)
  159. }
  160. err := p.model.ScanRepo(p.repo)
  161. if err != nil {
  162. invalidateRepo(p.cfg, p.repo, err)
  163. return
  164. }
  165. default:
  166. }
  167. // Queue more blocks to fetch, if any
  168. p.queueNeededBlocks()
  169. }
  170. }
  171. func (p *puller) runRO() {
  172. walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
  173. for _ = range walkTicker {
  174. if debug {
  175. l.Debugf("%q: time for rescan", p.repo)
  176. }
  177. err := p.model.ScanRepo(p.repo)
  178. if err != nil {
  179. invalidateRepo(p.cfg, p.repo, err)
  180. return
  181. }
  182. }
  183. }
  184. func (p *puller) fixupDirectories() {
  185. var deleteDirs []string
  186. var changed = 0
  187. var walkFn = func(path string, info os.FileInfo, err error) error {
  188. if !info.IsDir() {
  189. return nil
  190. }
  191. rn, err := filepath.Rel(p.dir, path)
  192. if err != nil {
  193. return nil
  194. }
  195. if rn == "." {
  196. return nil
  197. }
  198. cur := p.model.CurrentRepoFile(p.repo, rn)
  199. if cur.Name != rn {
  200. // No matching dir in current list; weird
  201. if debug {
  202. l.Debugf("missing dir: %s; %v", rn, cur)
  203. }
  204. return nil
  205. }
  206. if protocol.IsDeleted(cur.Flags) {
  207. if debug {
  208. l.Debugf("queue delete dir: %v", cur)
  209. }
  210. // We queue the directories to delete since we walk the
  211. // tree in depth first order and need to remove the
  212. // directories in the opposite order.
  213. deleteDirs = append(deleteDirs, path)
  214. return nil
  215. }
  216. if !permsEqual(cur.Flags, uint32(info.Mode())) {
  217. err := os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm)
  218. if err != nil {
  219. l.Warnln("Restoring folder flags: %q: %v", path, err)
  220. } else {
  221. changed++
  222. if debug {
  223. l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur)
  224. }
  225. }
  226. }
  227. if cur.Modified != info.ModTime().Unix() {
  228. t := time.Unix(cur.Modified, 0)
  229. err := os.Chtimes(path, t, t)
  230. if err != nil {
  231. l.Warnln("Restoring folder modtime: %q: %v", path, err)
  232. } else {
  233. changed++
  234. if debug {
  235. l.Debugf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur)
  236. }
  237. }
  238. }
  239. return nil
  240. }
  241. for {
  242. deleteDirs = nil
  243. changed = 0
  244. filepath.Walk(p.dir, walkFn)
  245. var deleted = 0
  246. // Delete any queued directories
  247. for i := len(deleteDirs) - 1; i >= 0; i-- {
  248. dir := deleteDirs[i]
  249. if debug {
  250. l.Debugln("delete dir:", dir)
  251. }
  252. err := os.Remove(dir)
  253. if err != nil {
  254. l.Warnln(err)
  255. } else {
  256. deleted++
  257. }
  258. }
  259. if debug {
  260. l.Debugf("changed %d, deleted %d dirs", changed, deleted)
  261. }
  262. if changed+deleted == 0 {
  263. return
  264. }
  265. }
  266. }
  267. func (p *puller) handleRequestResult(res requestResult) {
  268. p.oustandingPerNode.decrease(res.node)
  269. f := res.file
  270. of, ok := p.openFiles[f.Name]
  271. if !ok || of.err != nil {
  272. // no entry in openFiles means there was an error and we've cancelled the operation
  273. return
  274. }
  275. _, of.err = of.file.WriteAt(res.data, res.offset)
  276. buffers.Put(res.data)
  277. of.outstanding--
  278. p.openFiles[f.Name] = of
  279. if debug {
  280. l.Debugf("pull: wrote %q / %q offset %d outstanding %d done %v", p.repo, f.Name, res.offset, of.outstanding, of.done)
  281. }
  282. if of.done && of.outstanding == 0 {
  283. p.closeFile(f)
  284. }
  285. }
  286. // handleBlock fulfills the block request by copying, ignoring or fetching
  287. // from the network. Returns true if the block was fully handled
  288. // synchronously, i.e. if the slot can be reused.
  289. func (p *puller) handleBlock(b bqBlock) bool {
  290. f := b.file
  291. // For directories, making sure they exist is enough.
  292. // Deleted directories we mark as handled and delete later.
  293. if protocol.IsDirectory(f.Flags) {
  294. if !protocol.IsDeleted(f.Flags) {
  295. path := filepath.Join(p.dir, f.Name)
  296. _, err := os.Stat(path)
  297. if err != nil && os.IsNotExist(err) {
  298. if debug {
  299. l.Debugf("create dir: %v", f)
  300. }
  301. err = os.MkdirAll(path, 0777)
  302. if err != nil {
  303. l.Warnf("Create folder: %q: %v", path, err)
  304. }
  305. }
  306. } else if debug {
  307. l.Debugf("ignore delete dir: %v", f)
  308. }
  309. p.model.updateLocal(p.repo, f)
  310. return true
  311. }
  312. of, ok := p.openFiles[f.Name]
  313. of.done = b.last
  314. if !ok {
  315. if debug {
  316. l.Debugf("pull: %q: opening file %q", p.repo, f.Name)
  317. }
  318. of.availability = uint64(p.model.repoFiles[p.repo].Availability(f.Name))
  319. of.filepath = filepath.Join(p.dir, f.Name)
  320. of.temp = filepath.Join(p.dir, defTempNamer.TempName(f.Name))
  321. dirName := filepath.Dir(of.filepath)
  322. _, err := os.Stat(dirName)
  323. if err != nil {
  324. err = os.MkdirAll(dirName, 0777)
  325. }
  326. if err != nil {
  327. l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, err)
  328. }
  329. of.file, of.err = os.Create(of.temp)
  330. if of.err != nil {
  331. if debug {
  332. l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, of.err)
  333. }
  334. if !b.last {
  335. p.openFiles[f.Name] = of
  336. }
  337. return true
  338. }
  339. defTempNamer.Hide(of.temp)
  340. }
  341. if of.err != nil {
  342. // We have already failed this file.
  343. if debug {
  344. l.Debugf("pull: error: %q / %q has already failed: %v", p.repo, f.Name, of.err)
  345. }
  346. if b.last {
  347. delete(p.openFiles, f.Name)
  348. }
  349. return true
  350. }
  351. p.openFiles[f.Name] = of
  352. switch {
  353. case len(b.copy) > 0:
  354. p.handleCopyBlock(b)
  355. return true
  356. case b.block.Size > 0:
  357. return p.handleRequestBlock(b)
  358. default:
  359. p.handleEmptyBlock(b)
  360. return true
  361. }
  362. }
  363. func (p *puller) handleCopyBlock(b bqBlock) {
  364. // We have blocks to copy from the existing file
  365. f := b.file
  366. of := p.openFiles[f.Name]
  367. if debug {
  368. l.Debugf("pull: copying %d blocks for %q / %q", len(b.copy), p.repo, f.Name)
  369. }
  370. var exfd *os.File
  371. exfd, of.err = os.Open(of.filepath)
  372. if of.err != nil {
  373. if debug {
  374. l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, of.err)
  375. }
  376. of.file.Close()
  377. of.file = nil
  378. p.openFiles[f.Name] = of
  379. return
  380. }
  381. defer exfd.Close()
  382. for _, b := range b.copy {
  383. bs := buffers.Get(int(b.Size))
  384. _, of.err = exfd.ReadAt(bs, b.Offset)
  385. if of.err == nil {
  386. _, of.err = of.file.WriteAt(bs, b.Offset)
  387. }
  388. buffers.Put(bs)
  389. if of.err != nil {
  390. if debug {
  391. l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, of.err)
  392. }
  393. exfd.Close()
  394. of.file.Close()
  395. of.file = nil
  396. p.openFiles[f.Name] = of
  397. return
  398. }
  399. }
  400. }
  401. // handleRequestBlock tries to pull a block from the network. Returns true if
  402. // the block could _not_ be fetched (i.e. it was fully handled, matching the
  403. // return criteria of handleBlock)
  404. func (p *puller) handleRequestBlock(b bqBlock) bool {
  405. f := b.file
  406. of, ok := p.openFiles[f.Name]
  407. if !ok {
  408. panic("bug: request for non-open file")
  409. }
  410. node := p.oustandingPerNode.leastBusyNode(of.availability, p.model.cm)
  411. if len(node) == 0 {
  412. of.err = errNoNode
  413. if of.file != nil {
  414. of.file.Close()
  415. of.file = nil
  416. os.Remove(of.temp)
  417. }
  418. if b.last {
  419. delete(p.openFiles, f.Name)
  420. } else {
  421. p.openFiles[f.Name] = of
  422. }
  423. return true
  424. }
  425. of.outstanding++
  426. p.openFiles[f.Name] = of
  427. go func(node string, b bqBlock) {
  428. if debug {
  429. l.Debugf("pull: requesting %q / %q offset %d size %d from %q outstanding %d", p.repo, f.Name, b.block.Offset, b.block.Size, node, of.outstanding)
  430. }
  431. bs, err := p.model.requestGlobal(node, p.repo, f.Name, b.block.Offset, int(b.block.Size), nil)
  432. p.requestResults <- requestResult{
  433. node: node,
  434. file: f,
  435. filepath: of.filepath,
  436. offset: b.block.Offset,
  437. data: bs,
  438. err: err,
  439. }
  440. }(node, b)
  441. return false
  442. }
  443. func (p *puller) handleEmptyBlock(b bqBlock) {
  444. f := b.file
  445. of := p.openFiles[f.Name]
  446. if b.last {
  447. if of.err == nil {
  448. of.file.Close()
  449. }
  450. }
  451. if protocol.IsDeleted(f.Flags) {
  452. if debug {
  453. l.Debugf("pull: delete %q", f.Name)
  454. }
  455. os.Remove(of.temp)
  456. os.Chmod(of.filepath, 0666)
  457. if err := os.Remove(of.filepath); err == nil || os.IsNotExist(err) {
  458. p.model.updateLocal(p.repo, f)
  459. }
  460. } else {
  461. if debug {
  462. l.Debugf("pull: no blocks to fetch and nothing to copy for %q / %q", p.repo, f.Name)
  463. }
  464. t := time.Unix(f.Modified, 0)
  465. if os.Chtimes(of.temp, t, t) != nil {
  466. delete(p.openFiles, f.Name)
  467. return
  468. }
  469. if os.Chmod(of.temp, os.FileMode(f.Flags&0777)) != nil {
  470. delete(p.openFiles, f.Name)
  471. return
  472. }
  473. defTempNamer.Show(of.temp)
  474. if Rename(of.temp, of.filepath) == nil {
  475. p.model.updateLocal(p.repo, f)
  476. }
  477. }
  478. delete(p.openFiles, f.Name)
  479. }
  480. func (p *puller) queueNeededBlocks() {
  481. queued := 0
  482. for _, f := range p.model.NeedFilesRepo(p.repo) {
  483. lf := p.model.CurrentRepoFile(p.repo, f.Name)
  484. have, need := scanner.BlockDiff(lf.Blocks, f.Blocks)
  485. if debug {
  486. l.Debugf("need:\n local: %v\n global: %v\n haveBlocks: %v\n needBlocks: %v", lf, f, have, need)
  487. }
  488. queued++
  489. p.bq.put(bqAdd{
  490. file: f,
  491. have: have,
  492. need: need,
  493. })
  494. }
  495. if debug && queued > 0 {
  496. l.Debugf("%q: queued %d blocks", p.repo, queued)
  497. }
  498. }
  499. func (p *puller) closeFile(f scanner.File) {
  500. if debug {
  501. l.Debugf("pull: closing %q / %q", p.repo, f.Name)
  502. }
  503. of := p.openFiles[f.Name]
  504. of.file.Close()
  505. defer os.Remove(of.temp)
  506. delete(p.openFiles, f.Name)
  507. fd, err := os.Open(of.temp)
  508. if err != nil {
  509. if debug {
  510. l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, err)
  511. }
  512. return
  513. }
  514. hb, _ := scanner.Blocks(fd, scanner.StandardBlockSize)
  515. fd.Close()
  516. if l0, l1 := len(hb), len(f.Blocks); l0 != l1 {
  517. if debug {
  518. l.Debugf("pull: %q / %q: nblocks %d != %d", p.repo, f.Name, l0, l1)
  519. }
  520. return
  521. }
  522. for i := range hb {
  523. if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 {
  524. l.Debugf("pull: %q / %q: block %d hash mismatch", p.repo, f.Name, i)
  525. return
  526. }
  527. }
  528. t := time.Unix(f.Modified, 0)
  529. os.Chtimes(of.temp, t, t)
  530. os.Chmod(of.temp, os.FileMode(f.Flags&0777))
  531. defTempNamer.Show(of.temp)
  532. if debug {
  533. l.Debugf("pull: rename %q / %q: %q", p.repo, f.Name, of.filepath)
  534. }
  535. if err := Rename(of.temp, of.filepath); err == nil {
  536. p.model.updateLocal(p.repo, f)
  537. } else {
  538. l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, err)
  539. }
  540. }
  541. func invalidateRepo(cfg *config.Configuration, repoID string, err error) {
  542. for i := range cfg.Repositories {
  543. repo := &cfg.Repositories[i]
  544. if repo.ID == repoID {
  545. repo.Invalid = err.Error()
  546. return
  547. }
  548. }
  549. }