blockqueue.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package scanner
  7. import (
  8. "errors"
  9. "os"
  10. "path/filepath"
  11. "github.com/syncthing/syncthing/lib/protocol"
  12. "github.com/syncthing/syncthing/lib/sync"
  13. )
  14. // The parallel hasher reads FileInfo structures from the inbox, hashes the
  15. // file to populate the Blocks element and sends it to the outbox. A number of
  16. // workers are used in parallel. The outbox will become closed when the inbox
  17. // is closed and all items handled.
  18. func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo, counter Counter, done, cancel chan struct{}, useWeakHashes bool) {
  19. wg := sync.NewWaitGroup()
  20. wg.Add(workers)
  21. for i := 0; i < workers; i++ {
  22. go func() {
  23. hashFiles(dir, blockSize, outbox, inbox, counter, cancel, useWeakHashes)
  24. wg.Done()
  25. }()
  26. }
  27. go func() {
  28. wg.Wait()
  29. if done != nil {
  30. close(done)
  31. }
  32. close(outbox)
  33. }()
  34. }
  35. // HashFile hashes the files and returns a list of blocks representing the file.
  36. func HashFile(path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) {
  37. fd, err := os.Open(path)
  38. if err != nil {
  39. l.Debugln("open:", err)
  40. return nil, err
  41. }
  42. defer fd.Close()
  43. // Get the size and modtime of the file before we start hashing it.
  44. fi, err := fd.Stat()
  45. if err != nil {
  46. l.Debugln("stat before:", err)
  47. return nil, err
  48. }
  49. size := fi.Size()
  50. modTime := fi.ModTime()
  51. // Hash the file. This may take a while for large files.
  52. blocks, err := Blocks(fd, blockSize, size, counter, useWeakHashes)
  53. if err != nil {
  54. l.Debugln("blocks:", err)
  55. return nil, err
  56. }
  57. // Recheck the size and modtime again. If they differ, the file changed
  58. // while we were reading it and our hash results are invalid.
  59. fi, err = fd.Stat()
  60. if err != nil {
  61. l.Debugln("stat after:", err)
  62. return nil, err
  63. }
  64. if size != fi.Size() || !modTime.Equal(fi.ModTime()) {
  65. return nil, errors.New("file changed during hashing")
  66. }
  67. return blocks, nil
  68. }
  69. func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo, counter Counter, cancel chan struct{}, useWeakHashes bool) {
  70. for {
  71. select {
  72. case f, ok := <-inbox:
  73. if !ok {
  74. return
  75. }
  76. if f.IsDirectory() || f.IsDeleted() {
  77. panic("Bug. Asked to hash a directory or a deleted file.")
  78. }
  79. blocks, err := HashFile(filepath.Join(dir, f.Name), blockSize, counter, useWeakHashes)
  80. if err != nil {
  81. l.Debugln("hash error:", f.Name, err)
  82. continue
  83. }
  84. f.Blocks = blocks
  85. // The size we saw when initially deciding to hash the file
  86. // might not have been the size it actually had when we hashed
  87. // it. Update the size from the block list.
  88. f.Size = 0
  89. for _, b := range blocks {
  90. f.Size += int64(b.Size)
  91. }
  92. select {
  93. case outbox <- f:
  94. case <-cancel:
  95. return
  96. }
  97. case <-cancel:
  98. return
  99. }
  100. }
  101. }