123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454 |
- // Copyright (C) 2014 The Syncthing Authors.
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
- // You can obtain one at https://mozilla.org/MPL/2.0/.
- package model
- import (
- "encoding/binary"
- "fmt"
- "io"
- "sync"
- "time"
- "google.golang.org/protobuf/proto"
- "github.com/syncthing/syncthing/internal/protoutil"
- "github.com/syncthing/syncthing/lib/fs"
- "github.com/syncthing/syncthing/lib/osutil"
- "github.com/syncthing/syncthing/lib/protocol"
- )
- // A sharedPullerState is kept for each file that is being synced and is kept
- // updated along the way.
- type sharedPullerState struct {
- // Immutable, does not require locking
- file protocol.FileInfo // The new file (desired end state)
- fs fs.Filesystem
- folder string
- tempName string
- realName string
- reused int // Number of blocks reused from temporary file
- ignorePerms bool
- hasCurFile bool // Whether curFile is set
- curFile protocol.FileInfo // The file as it exists now in our database
- sparse bool
- created time.Time
- fsync bool
- // Mutable, must be locked for access
- err error // The first error we hit
- writer *lockedWriterAt // Wraps fd to prevent fd closing at the same time as writing
- copyTotal int // Total number of copy actions for the whole job
- pullTotal int // Total number of pull actions for the whole job
- copyOrigin int // Number of blocks copied from the original file
- copyNeeded int // Number of copy actions still pending
- pullNeeded int // Number of block pulls still pending
- updated time.Time // Time when any of the counters above were last updated
- closed bool // True if the file has been finalClosed.
- available []int // Indexes of the blocks that are available in the temporary file
- availableUpdated time.Time // Time when list of available blocks was last updated
- mut sync.RWMutex // Protects the above
- }
- func newSharedPullerState(file protocol.FileInfo, fs fs.Filesystem, folderID, tempName string, blocks []protocol.BlockInfo, reused []int, ignorePerms, hasCurFile bool, curFile protocol.FileInfo, sparse bool, fsync bool) *sharedPullerState {
- return &sharedPullerState{
- file: file,
- fs: fs,
- folder: folderID,
- tempName: tempName,
- realName: file.Name,
- copyTotal: len(blocks),
- copyNeeded: len(blocks),
- reused: len(reused),
- updated: time.Now(),
- available: reused,
- availableUpdated: time.Now(),
- ignorePerms: ignorePerms,
- hasCurFile: hasCurFile,
- curFile: curFile,
- sparse: sparse,
- fsync: fsync,
- created: time.Now(),
- }
- }
// PullerProgress is a momentary snapshot of the progress of the puller for
// a single file. The block counters are numbers of blocks, the Bytes fields
// are sizes in bytes.
type PullerProgress struct {
	// Total is the sum of reused, copied and pulled blocks in the job.
	Total int `json:"total"`
	// Reused is the number of blocks reused from the temporary file.
	Reused int `json:"reused"`
	// CopiedFromOrigin is the number of blocks copied from the original file.
	CopiedFromOrigin        int `json:"copiedFromOrigin"`
	CopiedFromOriginShifted int `json:"copiedFromOriginShifted"`
	// CopiedFromElsewhere is the number of completed copies that did not
	// come from the original file.
	CopiedFromElsewhere int `json:"copiedFromElsewhere"`
	// Pulled is the number of finished pulls, Pulling the number still
	// pending.
	Pulled  int `json:"pulled"`
	Pulling int `json:"pulling"`
	// BytesDone and BytesTotal are byte-size estimates of the finished and
	// total work respectively.
	BytesDone  int64 `json:"bytesDone"`
	BytesTotal int64 `json:"bytesTotal"`
}
- // lockedWriterAt adds a lock to protect from closing the fd at the same time as writing.
- // WriteAt() is goroutine safe by itself, but not against for example Close().
- type lockedWriterAt struct {
- mut sync.RWMutex
- fd fs.File
- }
- // WriteAt itself is goroutine safe, thus just needs to acquire a read-lock to
- // prevent closing concurrently (see SyncClose).
- func (w *lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
- w.mut.RLock()
- defer w.mut.RUnlock()
- return w.fd.WriteAt(p, off)
- }
- // SyncClose ensures that no more writes are happening before going ahead and
- // syncing and closing the fd, thus needs to acquire a write-lock.
- func (w *lockedWriterAt) SyncClose(fsync bool) error {
- w.mut.Lock()
- defer w.mut.Unlock()
- if fsync {
- if err := w.fd.Sync(); err != nil {
- // Sync() is nice if it works but not worth failing the
- // operation over if it fails.
- l.Debugf("fsync failed: %v", err)
- }
- }
- return w.fd.Close()
- }
- // tempFile returns the fd for the temporary file, reusing an open fd
- // or creating the file as necessary.
- func (s *sharedPullerState) tempFile() (*lockedWriterAt, error) {
- s.mut.Lock()
- defer s.mut.Unlock()
- // If we've already hit an error, return early
- if s.err != nil {
- return nil, s.err
- }
- // If the temp file is already open, return the file descriptor
- if s.writer != nil {
- return s.writer, nil
- }
- if err := s.addWriterLocked(); err != nil {
- s.failLocked(err)
- return nil, err
- }
- return s.writer, nil
- }
- func (s *sharedPullerState) addWriterLocked() error {
- return inWritableDir(s.tempFileInWritableDir, s.fs, s.tempName, s.ignorePerms)
- }
- // tempFileInWritableDir should only be called from tempFile.
- func (s *sharedPullerState) tempFileInWritableDir(_ string) error {
- // The permissions to use for the temporary file should be those of the
- // final file, except we need user read & write at minimum. The
- // permissions will be set to the final value later, but in the meantime
- // we don't want to have a temporary file with looser permissions than
- // the final outcome.
- mode := fs.FileMode(s.file.Permissions) | 0o600
- if s.ignorePerms {
- // When ignorePerms is set we use a very permissive mode and let the
- // system umask filter it.
- mode = 0o666
- }
- // Attempt to create the temp file
- // RDWR because of issue #2994.
- flags := fs.OptReadWrite
- if s.reused == 0 {
- flags |= fs.OptCreate | fs.OptExclusive
- } else if !s.ignorePerms {
- // With sufficiently bad luck when exiting or crashing, we may have
- // had time to chmod the temp file to read only state but not yet
- // moved it to its final name. This leaves us with a read only temp
- // file that we're going to try to reuse. To handle that, we need to
- // make sure we have write permissions on the file before opening it.
- //
- // When ignorePerms is set we trust that the permissions are fine
- // already and make no modification, as we would otherwise override
- // what the umask dictates.
- if err := s.fs.Chmod(s.tempName, mode); err != nil {
- return fmt.Errorf("setting perms on temp file: %w", err)
- }
- }
- fd, err := s.fs.OpenFile(s.tempName, flags, mode)
- if err != nil {
- return fmt.Errorf("opening temp file: %w", err)
- }
- // Hide the temporary file
- s.fs.Hide(s.tempName)
- // Don't truncate symlink files, as that will mean that the path will
- // contain a bunch of nulls.
- if s.sparse && !s.file.IsSymlink() {
- size := s.file.Size
- // Trailer added to encrypted files
- if len(s.file.Encrypted) > 0 {
- size += encryptionTrailerSize(s.file)
- }
- // Truncate sets the size of the file. This creates a sparse file or a
- // space reservation, depending on the underlying filesystem.
- if err := fd.Truncate(size); err != nil {
- // The truncate call failed. That can happen in some cases when
- // space reservation isn't possible or over some network
- // filesystems... This generally doesn't matter.
- if s.reused > 0 {
- // ... but if we are attempting to reuse a file we have a
- // corner case when the old file is larger than the new one
- // and we can't just overwrite blocks and let the old data
- // linger at the end. In this case we attempt a delete of
- // the file and hope for better luck next time, when we
- // should come around with s.reused == 0.
- fd.Close()
- if remErr := s.fs.Remove(s.tempName); remErr != nil {
- l.Debugln("failed to remove temporary file:", remErr)
- }
- return err
- }
- }
- }
- // Same fd will be used by all writers
- s.writer = &lockedWriterAt{fd: fd}
- return nil
- }
- // fail sets the error on the puller state compose of error, and marks the
- // sharedPullerState as failed. Is a no-op when called on an already failed state.
- func (s *sharedPullerState) fail(err error) {
- s.mut.Lock()
- defer s.mut.Unlock()
- s.failLocked(err)
- }
- func (s *sharedPullerState) failLocked(err error) {
- if s.err != nil || err == nil {
- return
- }
- s.err = err
- }
- func (s *sharedPullerState) failed() error {
- s.mut.RLock()
- err := s.err
- s.mut.RUnlock()
- return err
- }
- func (s *sharedPullerState) copyDone(block protocol.BlockInfo) {
- s.mut.Lock()
- s.copyNeeded--
- s.updated = time.Now()
- s.available = append(s.available, int(block.Offset/int64(s.file.BlockSize())))
- s.availableUpdated = time.Now()
- l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
- s.mut.Unlock()
- }
- func (s *sharedPullerState) copiedFromOrigin(bytes int) {
- s.mut.Lock()
- s.copyOrigin++
- s.updated = time.Now()
- s.mut.Unlock()
- metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceLocalOrigin).Add(float64(bytes))
- }
- func (s *sharedPullerState) copiedFromElsewhere(bytes int) {
- metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceLocalOther).Add(float64(bytes))
- }
- func (s *sharedPullerState) skippedSparseBlock(bytes int) {
- // pretend we copied it, historical
- s.mut.Lock()
- s.copyOrigin++
- s.updated = time.Now()
- s.mut.Unlock()
- metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceSkipped).Add(float64(bytes))
- }
- func (s *sharedPullerState) pullStarted() {
- s.mut.Lock()
- s.copyTotal--
- s.copyNeeded--
- s.pullTotal++
- s.pullNeeded++
- s.updated = time.Now()
- l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
- s.mut.Unlock()
- }
- func (s *sharedPullerState) pullDone(block protocol.BlockInfo) {
- s.mut.Lock()
- s.pullNeeded--
- s.updated = time.Now()
- s.available = append(s.available, int(block.Offset/int64(s.file.BlockSize())))
- s.availableUpdated = time.Now()
- l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
- s.mut.Unlock()
- metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceNetwork).Add(float64(block.Size))
- }
- // finalClose atomically closes and returns closed status of a file. A true
- // first return value means the file was closed and should be finished, with
- // the error indicating the success or failure of the close. A false first
- // return value indicates the file is not ready to be closed, or is already
- // closed and should in either case not be finished off now.
- func (s *sharedPullerState) finalClose() (bool, error) {
- s.mut.Lock()
- defer s.mut.Unlock()
- if s.closed {
- // Already closed
- return false, nil
- }
- if s.pullNeeded+s.copyNeeded != 0 && s.err == nil {
- // Not done yet, and not errored
- return false, nil
- }
- if s.writer == nil {
- // If we didn't even create a temp file up to this point, now is the
- // time to do so. This also truncates the file to the correct size
- // if we're using sparse file.
- if err := s.addWriterLocked(); err != nil {
- return false, err
- }
- }
- if len(s.file.Encrypted) > 0 {
- if err := s.finalizeEncrypted(); err != nil && s.err == nil {
- // This is our error as we weren't errored before.
- s.err = err
- }
- }
- if s.writer != nil {
- if err := s.writer.SyncClose(s.fsync); err != nil && s.err == nil {
- // This is our error as we weren't errored before.
- s.err = err
- }
- s.writer = nil
- }
- s.closed = true
- // Unhide the temporary file when we close it, as it's likely to
- // immediately be renamed to the final name. If this is a failed temp
- // file we will also unhide it, but I'm fine with that as we're now
- // leaving it around for potentially quite a while.
- s.fs.Unhide(s.tempName)
- return true, s.err
- }
- // finalizeEncrypted adds a trailer to the encrypted file containing the
- // serialized FileInfo and the length of that FileInfo. When initializing a
- // folder from encrypted data we can extract this FileInfo from the end of
- // the file and regain the original metadata.
- func (s *sharedPullerState) finalizeEncrypted() error {
- trailerSize, err := writeEncryptionTrailer(s.file, s.writer)
- if err != nil {
- return err
- }
- s.file.Size += trailerSize
- s.file.EncryptionTrailerSize = int(trailerSize)
- return nil
- }
- // Returns the size of the written trailer.
- func writeEncryptionTrailer(file protocol.FileInfo, writer io.WriterAt) (int64, error) {
- // Here the file is in native format, while encryption happens in
- // wire format (always slashes).
- wireFile := file
- wireFile.Name = osutil.NormalizedFilename(wireFile.Name)
- trailerSize := encryptionTrailerSize(wireFile)
- bs := make([]byte, trailerSize)
- n, err := protoutil.MarshalTo(bs, wireFile.ToWire(false))
- if err != nil {
- return 0, err
- }
- binary.BigEndian.PutUint32(bs[n:], uint32(n)) //nolint:gosec
- bs = bs[:n+4]
- if _, err := writer.WriteAt(bs, wireFile.Size); err != nil {
- return 0, err
- }
- return trailerSize, nil
- }
- func encryptionTrailerSize(file protocol.FileInfo) int64 {
- return int64(proto.Size(file.ToWire(false))) + 4 // XXX: Inefficient
- }
- // Progress returns the momentarily progress for the puller
- func (s *sharedPullerState) Progress() *PullerProgress {
- s.mut.RLock()
- defer s.mut.RUnlock()
- total := s.reused + s.copyTotal + s.pullTotal
- done := total - s.copyNeeded - s.pullNeeded
- file := len(s.file.Blocks)
- return &PullerProgress{
- Total: total,
- Reused: s.reused,
- CopiedFromOrigin: s.copyOrigin,
- CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
- Pulled: s.pullTotal - s.pullNeeded,
- Pulling: s.pullNeeded,
- BytesTotal: blocksToSize(total, file, s.file.BlockSize(), s.file.Size),
- BytesDone: blocksToSize(done, file, s.file.BlockSize(), s.file.Size),
- }
- }
- // Updated returns the time when any of the progress related counters was last updated.
- func (s *sharedPullerState) Updated() time.Time {
- s.mut.RLock()
- t := s.updated
- s.mut.RUnlock()
- return t
- }
- // AvailableUpdated returns the time last time list of available blocks was updated
- func (s *sharedPullerState) AvailableUpdated() time.Time {
- s.mut.RLock()
- t := s.availableUpdated
- s.mut.RUnlock()
- return t
- }
- // Available returns blocks available in the current temporary file
- func (s *sharedPullerState) Available() []int {
- s.mut.RLock()
- blocks := s.available
- s.mut.RUnlock()
- return blocks
- }
// blocksToSize estimates the number of bytes covered by blocks out of the
// blocksInFile blocks of a file of fileSize bytes with the given blockSize.
//
// All blocks but the last one are blockSize bytes; the last (or only) block
// holds between 1 and blockSize bytes. We do not know whether that smaller
// block is among the counted blocks, so the bytes it is short of a full
// block are subtracted proportionally, assuming a random chance that the
// small block is contained.
//
// Zero is returned when blocksInFile is zero or blockSize is not positive.
func blocksToSize(blocks, blocksInFile, blockSize int, fileSize int64) int64 {
	if blocksInFile == 0 || blockSize <= 0 {
		// Nothing sensible to compute; this also avoids dividing by zero.
		return 0
	}

	bs := int64(blockSize)

	// The number of bytes the last block is short of a full block.
	shortfall := bs - fileSize%bs
	if fileSize > 0 && shortfall == bs {
		// The file size is an exact multiple of the block size, so the
		// last block is a full block and there is nothing to subtract.
		shortfall = 0
	}

	return int64(blocks)*bs - shortfall*int64(blocks)/int64(blocksInFile)
}
|