blockqueue.go

// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.

package scanner

import (
	"errors"
	"os"
	"path/filepath"

	"github.com/syncthing/syncthing/lib/protocol"
	"github.com/syncthing/syncthing/lib/sync"
)

// The parallel hasher reads FileInfo structures from the inbox, hashes the
// file to populate the Blocks element and sends it to the outbox. A number of
// workers are used in parallel. The outbox will become closed when the inbox
// is closed and all items handled.
func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo, counter Counter, done, cancel chan struct{}) {
	wg := sync.NewWaitGroup()
	wg.Add(workers)

	for i := 0; i < workers; i++ {
		go func() {
			hashFiles(dir, blockSize, outbox, inbox, counter, cancel)
			wg.Done()
		}()
	}

	go func() {
		wg.Wait()
		if done != nil {
			close(done)
		}
		close(outbox)
	}()
}
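
// Usage sketch (not part of the original file): wiring newParallelHasher into
// a small pipeline. The 128 KiB block size, the worker count of four and the
// assumption that the caller supplies a Counter implementation are
// illustrative only. The files fed in must be regular, non-deleted entries,
// or hashFiles will panic.
func exampleParallelHash(dir string, files []protocol.FileInfo, counter Counter) []protocol.FileInfo {
	inbox := make(chan protocol.FileInfo)
	outbox := make(chan protocol.FileInfo)
	done := make(chan struct{})
	cancel := make(chan struct{})

	newParallelHasher(dir, 128<<10, 4, outbox, inbox, counter, done, cancel)

	// Feed the files to hash, then close the inbox so the workers exit.
	go func() {
		for _, f := range files {
			inbox <- f
		}
		close(inbox)
	}()

	// Collect results until the workers close the outbox; done is closed
	// once all workers have finished.
	var hashed []protocol.FileInfo
	for f := range outbox {
		hashed = append(hashed, f)
	}
	<-done
	return hashed
}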

// HashFile opens the file at path, hashes it in blocks of blockSize bytes and
// returns the resulting block list. It returns an error if the file cannot be
// read or if it changes while it is being hashed.
func HashFile(path string, blockSize int, counter Counter) ([]protocol.BlockInfo, error) {
	fd, err := os.Open(path)
	if err != nil {
		l.Debugln("open:", err)
		return nil, err
	}
	defer fd.Close()

	// Get the size and modtime of the file before we start hashing it.
	fi, err := fd.Stat()
	if err != nil {
		l.Debugln("stat before:", err)
		return nil, err
	}
	size := fi.Size()
	modTime := fi.ModTime()

	// Hash the file. This may take a while for large files.
	blocks, err := Blocks(fd, blockSize, size, counter)
	if err != nil {
		l.Debugln("blocks:", err)
		return nil, err
	}

	// Recheck the size and modtime again. If they differ, the file changed
	// while we were reading it and our hash results are invalid.
	fi, err = fd.Stat()
	if err != nil {
		l.Debugln("stat after:", err)
		return nil, err
	}
	if size != fi.Size() || !modTime.Equal(fi.ModTime()) {
		return nil, errors.New("file changed during hashing")
	}

	return blocks, nil
}
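
// Minimal sketch (not part of the original file) of hashing one file directly
// with HashFile. The 128 KiB block size is an assumption; counter is whatever
// Counter implementation the caller uses for progress reporting.
func exampleHashOne(path string, counter Counter) ([]protocol.BlockInfo, error) {
	blocks, err := HashFile(path, 128<<10, counter)
	if err != nil {
		// This covers the "file changed during hashing" case as well, in
		// which the partial result must be discarded.
		return nil, err
	}
	return blocks, nil
}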

// hashFiles is the worker loop used by newParallelHasher. It hashes files
// read from the inbox and sends the updated FileInfos to the outbox until the
// inbox is closed or cancel is closed.
func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo, counter Counter, cancel chan struct{}) {
	for {
		select {
		case f, ok := <-inbox:
			if !ok {
				return
			}

			if f.IsDirectory() || f.IsDeleted() {
				panic("Bug. Asked to hash a directory or a deleted file.")
			}

			blocks, err := HashFile(filepath.Join(dir, f.Name), blockSize, counter)
			if err != nil {
				l.Debugln("hash error:", f.Name, err)
				continue
			}

			f.Blocks = blocks

			// The size we saw when initially deciding to hash the file
			// might not have been the size it actually had when we hashed
			// it. Update the size from the block list.
			f.Size = 0
			for _, b := range blocks {
				f.Size += int64(b.Size)
			}

			select {
			case outbox <- f:
			case <-cancel:
				return
			}

		case <-cancel:
			return
		}
	}
}
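
// blocksTotalSize (not part of the original file) mirrors the size
// recomputation in hashFiles above: after hashing, the authoritative size of
// a file is the sum of its block sizes, not the size observed when the file
// was first queued for hashing.
func blocksTotalSize(blocks []protocol.BlockInfo) int64 {
	var total int64
	for _, b := range blocks {
		total += int64(b.Size)
	}
	return total
}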