send.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package taildrop
  4. import (
  5. "fmt"
  6. "io"
  7. "sync"
  8. "time"
  9. "tailscale.com/envknob"
  10. "tailscale.com/ipn"
  11. "tailscale.com/tstime"
  12. "tailscale.com/version/distro"
  13. )
  14. type incomingFileKey struct {
  15. id clientID
  16. name string // e.g., "foo.jpeg"
  17. }
  18. type incomingFile struct {
  19. clock tstime.DefaultClock
  20. started time.Time
  21. size int64 // or -1 if unknown; never 0
  22. w io.Writer // underlying writer
  23. sendFileNotify func() // called when done
  24. partialPath string // non-empty in direct mode
  25. finalPath string // not used in direct mode
  26. mu sync.Mutex
  27. copied int64
  28. done bool
  29. lastNotify time.Time
  30. }
  31. func (f *incomingFile) Write(p []byte) (n int, err error) {
  32. n, err = f.w.Write(p)
  33. var needNotify bool
  34. defer func() {
  35. if needNotify {
  36. f.sendFileNotify()
  37. }
  38. }()
  39. if n > 0 {
  40. f.mu.Lock()
  41. defer f.mu.Unlock()
  42. f.copied += int64(n)
  43. now := f.clock.Now()
  44. if f.lastNotify.IsZero() || now.Sub(f.lastNotify) > time.Second {
  45. f.lastNotify = now
  46. needNotify = true
  47. }
  48. }
  49. return n, err
  50. }
  51. // PutFile stores a file into [manager.Dir] from a given client id.
  52. // The baseName must be a base filename without any slashes.
  53. // The length is the expected length of content to read from r,
  54. // it may be negative to indicate that it is unknown.
  55. // It returns the length of the entire file.
  56. //
  57. // If there is a failure reading from r, then the partial file is not deleted
  58. // for some period of time. The [manager.PartialFiles] and [manager.HashPartialFile]
  59. // methods may be used to list all partial files and to compute the hash for a
  60. // specific partial file. This allows the client to determine whether to resume
  61. // a partial file. While resuming, PutFile may be called again with a non-zero
  62. // offset to specify where to resume receiving data at.
  63. func (m *manager) PutFile(id clientID, baseName string, r io.Reader, offset, length int64) (fileLength int64, err error) {
  64. switch {
  65. case m == nil || m.opts.fileOps == nil:
  66. return 0, ErrNoTaildrop
  67. case !envknob.CanTaildrop():
  68. return 0, ErrNoTaildrop
  69. case distro.Get() == distro.Unraid && !m.opts.DirectFileMode:
  70. return 0, ErrNotAccessible
  71. }
  72. if err := validateBaseName(baseName); err != nil {
  73. return 0, err
  74. }
  75. // and make sure we don't delete it while uploading:
  76. m.deleter.Remove(baseName)
  77. // Create (if not already) the partial file with read-write permissions.
  78. partialName := baseName + id.partialSuffix()
  79. wc, partialPath, err := m.opts.fileOps.OpenWriter(partialName, offset, 0o666)
  80. if err != nil {
  81. return 0, m.redactAndLogError("Create", err)
  82. }
  83. defer func() {
  84. wc.Close()
  85. if err != nil {
  86. m.deleter.Insert(partialName) // mark partial file for eventual deletion
  87. }
  88. }()
  89. // Check whether there is an in-progress transfer for the file.
  90. inFileKey := incomingFileKey{id, baseName}
  91. inFile, loaded := m.incomingFiles.LoadOrInit(inFileKey, func() *incomingFile {
  92. inFile := &incomingFile{
  93. clock: m.opts.Clock,
  94. started: m.opts.Clock.Now(),
  95. size: length,
  96. sendFileNotify: m.opts.SendFileNotify,
  97. }
  98. if m.opts.DirectFileMode {
  99. inFile.partialPath = partialPath
  100. }
  101. return inFile
  102. })
  103. inFile.w = wc
  104. if loaded {
  105. return 0, ErrFileExists
  106. }
  107. defer m.incomingFiles.Delete(inFileKey)
  108. // Record that we have started to receive at least one file.
  109. // This is used by the deleter upon a cold-start to scan the directory
  110. // for any files that need to be deleted.
  111. if st := m.opts.State; st != nil {
  112. if b, _ := st.ReadState(ipn.TaildropReceivedKey); len(b) == 0 {
  113. if werr := st.WriteState(ipn.TaildropReceivedKey, []byte{1}); werr != nil {
  114. m.opts.Logf("WriteState error: %v", werr) // non-fatal error
  115. }
  116. }
  117. }
  118. // Copy the contents of the file to the writer.
  119. copyLength, err := io.Copy(wc, r)
  120. if err != nil {
  121. return 0, m.redactAndLogError("Copy", err)
  122. }
  123. if length >= 0 && copyLength != length {
  124. return 0, m.redactAndLogError("Copy", fmt.Errorf("copied %d bytes; expected %d", copyLength, length))
  125. }
  126. if err := wc.Close(); err != nil {
  127. return 0, m.redactAndLogError("Close", err)
  128. }
  129. fileLength = offset + copyLength
  130. inFile.mu.Lock()
  131. inFile.done = true
  132. inFile.mu.Unlock()
  133. // 6) Finalize (rename/move) the partial into place via FileOps.Rename
  134. finalPath, err := m.opts.fileOps.Rename(partialPath, baseName)
  135. if err != nil {
  136. return 0, m.redactAndLogError("Rename", err)
  137. }
  138. inFile.finalPath = finalPath
  139. m.totalReceived.Add(1)
  140. m.opts.SendFileNotify()
  141. return fileLength, nil
  142. }
  143. func (m *manager) redactAndLogError(stage string, err error) error {
  144. err = redactError(err)
  145. m.opts.Logf("put %s error: %v", stage, err)
  146. return err
  147. }