|
|
@@ -119,6 +119,7 @@ func (p *puller) run() {
|
|
|
|
|
|
walkTicker := time.Tick(time.Duration(cfg.Options.RescanIntervalS) * time.Second)
|
|
|
timeout := time.Tick(5 * time.Second)
|
|
|
+ changed := true
|
|
|
|
|
|
for {
|
|
|
// Run the pulling loop as long as there are blocks to fetch
|
|
|
@@ -126,16 +127,15 @@ func (p *puller) run() {
|
|
|
for {
|
|
|
select {
|
|
|
case res := <-p.requestResults:
|
|
|
+ changed = true
|
|
|
p.requestSlots <- true
|
|
|
p.handleRequestResult(res)
|
|
|
|
|
|
case b := <-p.blocks:
|
|
|
+ changed = true
|
|
|
p.handleBlock(b)
|
|
|
|
|
|
case <-timeout:
|
|
|
- if debugPull {
|
|
|
- dlog.Println("timeout")
|
|
|
- }
|
|
|
if len(p.openFiles) == 0 && p.bq.empty() {
|
|
|
// Nothing more to do for the moment
|
|
|
break pull
|
|
|
@@ -154,6 +154,11 @@ func (p *puller) run() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ if changed {
|
|
|
+ p.fixupDirectories()
|
|
|
+ changed = false
|
|
|
+ }
|
|
|
+
|
|
|
// Do a rescan if it's time for it
|
|
|
select {
|
|
|
case <-walkTicker:
|
|
|
@@ -181,6 +186,72 @@ func (p *puller) runRO() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (p *puller) fixupDirectories() {
|
|
|
+ var deleteDirs []string
|
|
|
+ fn := func(path string, info os.FileInfo, err error) error {
|
|
|
+ if !info.IsDir() {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ rn, err := filepath.Rel(p.dir, path)
|
|
|
+ if err != nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if rn == "." {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ cur := p.model.CurrentGlobalFile(p.repo, rn)
|
|
|
+ if cur.Name != rn {
|
|
|
+ // No matching dir in current list; weird
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if cur.Flags&protocol.FlagDeleted != 0 {
|
|
|
+ if debugPull {
|
|
|
+ dlog.Printf("queue delete dir: %v", cur)
|
|
|
+ }
|
|
|
+
|
|
|
+ // We queue the directories to delete since we walk the
|
|
|
+ // tree in depth first order and need to remove the
|
|
|
+ // directories in the opposite order.
|
|
|
+
|
|
|
+ deleteDirs = append(deleteDirs, path)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if cur.Flags&uint32(os.ModePerm) != uint32(info.Mode()&os.ModePerm) {
|
|
|
+ os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm)
|
|
|
+ if debugPull {
|
|
|
+ dlog.Printf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if cur.Modified != info.ModTime().Unix() {
|
|
|
+ t := time.Unix(cur.Modified, 0)
|
|
|
+ os.Chtimes(path, t, t)
|
|
|
+ if debugPull {
|
|
|
+ dlog.Printf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ filepath.Walk(p.dir, fn)
|
|
|
+
|
|
|
+ // Delete any queued directories
|
|
|
+ for i := len(deleteDirs) - 1; i >= 0; i-- {
|
|
|
+ if debugPull {
|
|
|
+ dlog.Println("delete dir:", deleteDirs[i])
|
|
|
+ }
|
|
|
+ err := os.Remove(deleteDirs[i])
|
|
|
+ if err != nil {
|
|
|
+ warnln(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (p *puller) handleRequestResult(res requestResult) {
|
|
|
p.oustandingPerNode.decrease(res.node)
|
|
|
f := res.file
|
|
|
@@ -251,6 +322,18 @@ func (p *puller) handleRequestResult(res requestResult) {
|
|
|
func (p *puller) handleBlock(b bqBlock) {
|
|
|
f := b.file
|
|
|
|
|
|
+ // For directories, simply making sure they exist is enough
|
|
|
+ if f.Flags&protocol.FlagDirectory != 0 {
|
|
|
+ path := filepath.Join(p.dir, f.Name)
|
|
|
+ _, err := os.Stat(path)
|
|
|
+ if err != nil && os.IsNotExist(err) {
|
|
|
+ os.MkdirAll(path, 0777)
|
|
|
+ }
|
|
|
+ p.model.updateLocal(p.repo, f)
|
|
|
+ p.requestSlots <- true
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
of, ok := p.openFiles[f.Name]
|
|
|
of.done = b.last
|
|
|
|
|
|
@@ -429,13 +512,13 @@ func (p *puller) handleEmptyBlock(b bqBlock) {
|
|
|
Rename(of.temp, of.filepath)
|
|
|
}
|
|
|
delete(p.openFiles, f.Name)
|
|
|
- p.model.repoFiles[p.repo].Update(cid.LocalID, []scanner.File{f})
|
|
|
+ p.model.updateLocal(p.repo, f)
|
|
|
}
|
|
|
|
|
|
func (p *puller) queueNeededBlocks() {
|
|
|
queued := 0
|
|
|
- for _, f := range p.model.repoFiles[p.repo].Need(cid.LocalID) {
|
|
|
- lf := p.model.repoFiles[p.repo].Get(cid.LocalID, f.Name)
|
|
|
+ for _, f := range p.model.NeedFilesRepo(p.repo) {
|
|
|
+ lf := p.model.CurrentRepoFile(p.repo, f.Name)
|
|
|
have, need := scanner.BlockDiff(lf.Blocks, f.Blocks)
|
|
|
if debugNeed {
|
|
|
dlog.Printf("need:\n local: %v\n global: %v\n haveBlocks: %v\n needBlocks: %v", lf, f, have, need)
|