2
0

send.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package taildrop
  4. import (
  5. "crypto/sha256"
  6. "errors"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "sync"
  11. "time"
  12. "tailscale.com/envknob"
  13. "tailscale.com/ipn"
  14. "tailscale.com/tstime"
  15. "tailscale.com/version/distro"
  16. )
  17. type incomingFileKey struct {
  18. id ClientID
  19. name string // e.g., "foo.jpeg"
  20. }
  21. type incomingFile struct {
  22. clock tstime.DefaultClock
  23. started time.Time
  24. size int64 // or -1 if unknown; never 0
  25. w io.Writer // underlying writer
  26. sendFileNotify func() // called when done
  27. partialPath string // non-empty in direct mode
  28. mu sync.Mutex
  29. copied int64
  30. done bool
  31. lastNotify time.Time
  32. }
  33. func (f *incomingFile) Write(p []byte) (n int, err error) {
  34. n, err = f.w.Write(p)
  35. var needNotify bool
  36. defer func() {
  37. if needNotify {
  38. f.sendFileNotify()
  39. }
  40. }()
  41. if n > 0 {
  42. f.mu.Lock()
  43. defer f.mu.Unlock()
  44. f.copied += int64(n)
  45. now := f.clock.Now()
  46. if f.lastNotify.IsZero() || now.Sub(f.lastNotify) > time.Second {
  47. f.lastNotify = now
  48. needNotify = true
  49. }
  50. }
  51. return n, err
  52. }
  53. // PutFile stores a file into [Manager.Dir] from a given client id.
  54. // The baseName must be a base filename without any slashes.
  55. // The length is the expected length of content to read from r,
  56. // it may be negative to indicate that it is unknown.
  57. // It returns the length of the entire file.
  58. //
  59. // If there is a failure reading from r, then the partial file is not deleted
  60. // for some period of time. The [Manager.PartialFiles] and [Manager.HashPartialFile]
  61. // methods may be used to list all partial files and to compute the hash for a
  62. // specific partial file. This allows the client to determine whether to resume
  63. // a partial file. While resuming, PutFile may be called again with a non-zero
  64. // offset to specify where to resume receiving data at.
  65. func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, length int64) (int64, error) {
  66. switch {
  67. case m == nil || m.opts.Dir == "":
  68. return 0, ErrNoTaildrop
  69. case !envknob.CanTaildrop():
  70. return 0, ErrNoTaildrop
  71. case distro.Get() == distro.Unraid && !m.opts.DirectFileMode:
  72. return 0, ErrNotAccessible
  73. }
  74. dstPath, err := joinDir(m.opts.Dir, baseName)
  75. if err != nil {
  76. return 0, err
  77. }
  78. redactAndLogError := func(action string, err error) error {
  79. err = redactError(err)
  80. m.opts.Logf("put %v error: %v", action, err)
  81. return err
  82. }
  83. avoidPartialRename := m.opts.DirectFileMode && m.opts.AvoidFinalRename
  84. if avoidPartialRename {
  85. // Users using AvoidFinalRename are depending on the exact filename
  86. // of the partial files. So avoid injecting the id into it.
  87. id = ""
  88. }
  89. // Check whether there is an in-progress transfer for the file.
  90. partialPath := dstPath + id.partialSuffix()
  91. inFileKey := incomingFileKey{id, baseName}
  92. inFile, loaded := m.incomingFiles.LoadOrInit(inFileKey, func() *incomingFile {
  93. inFile := &incomingFile{
  94. clock: m.opts.Clock,
  95. started: m.opts.Clock.Now(),
  96. size: length,
  97. sendFileNotify: m.opts.SendFileNotify,
  98. }
  99. if m.opts.DirectFileMode {
  100. inFile.partialPath = partialPath
  101. }
  102. return inFile
  103. })
  104. if loaded {
  105. return 0, ErrFileExists
  106. }
  107. defer m.incomingFiles.Delete(inFileKey)
  108. m.deleter.Remove(filepath.Base(partialPath)) // avoid deleting the partial file while receiving
  109. // Create (if not already) the partial file with read-write permissions.
  110. f, err := os.OpenFile(partialPath, os.O_CREATE|os.O_RDWR, 0666)
  111. if err != nil {
  112. return 0, redactAndLogError("Create", err)
  113. }
  114. defer func() {
  115. f.Close() // best-effort to cleanup dangling file handles
  116. if err != nil {
  117. if avoidPartialRename {
  118. os.Remove(partialPath) // best-effort
  119. return
  120. }
  121. m.deleter.Insert(filepath.Base(partialPath)) // mark partial file for eventual deletion
  122. }
  123. }()
  124. inFile.w = f
  125. // Record that we have started to receive at least one file.
  126. // This is used by the deleter upon a cold-start to scan the directory
  127. // for any files that need to be deleted.
  128. if m.opts.State != nil {
  129. if b, _ := m.opts.State.ReadState(ipn.TaildropReceivedKey); len(b) == 0 {
  130. if err := m.opts.State.WriteState(ipn.TaildropReceivedKey, []byte{1}); err != nil {
  131. m.opts.Logf("WriteState error: %v", err) // non-fatal error
  132. }
  133. }
  134. }
  135. // A positive offset implies that we are resuming an existing file.
  136. // Seek to the appropriate offset and truncate the file.
  137. if offset != 0 {
  138. currLength, err := f.Seek(0, io.SeekEnd)
  139. if err != nil {
  140. return 0, redactAndLogError("Seek", err)
  141. }
  142. if offset < 0 || offset > currLength {
  143. return 0, redactAndLogError("Seek", err)
  144. }
  145. if _, err := f.Seek(offset, io.SeekStart); err != nil {
  146. return 0, redactAndLogError("Seek", err)
  147. }
  148. if err := f.Truncate(offset); err != nil {
  149. return 0, redactAndLogError("Truncate", err)
  150. }
  151. }
  152. // Copy the contents of the file.
  153. copyLength, err := io.Copy(inFile, r)
  154. if err != nil {
  155. return 0, redactAndLogError("Copy", err)
  156. }
  157. if length >= 0 && copyLength != length {
  158. return 0, redactAndLogError("Copy", errors.New("copied an unexpected number of bytes"))
  159. }
  160. if err := f.Close(); err != nil {
  161. return 0, redactAndLogError("Close", err)
  162. }
  163. fileLength := offset + copyLength
  164. // Return early for avoidPartialRename since users of AvoidFinalRename
  165. // are depending on the exact naming of partial files.
  166. if avoidPartialRename {
  167. inFile.mu.Lock()
  168. inFile.done = true
  169. inFile.mu.Unlock()
  170. m.totalReceived.Add(1)
  171. m.opts.SendFileNotify()
  172. return fileLength, nil
  173. }
  174. // File has been successfully received, rename the partial file
  175. // to the final destination filename. If a file of that name already exists,
  176. // then try multiple times with variations of the filename.
  177. computePartialSum := sync.OnceValues(func() ([sha256.Size]byte, error) {
  178. return sha256File(partialPath)
  179. })
  180. maxRetries := 10
  181. for ; maxRetries > 0; maxRetries-- {
  182. // Atomically rename the partial file as the destination file if it doesn't exist.
  183. // Otherwise, it returns the length of the current destination file.
  184. // The operation is atomic.
  185. dstLength, err := func() (int64, error) {
  186. m.renameMu.Lock()
  187. defer m.renameMu.Unlock()
  188. switch fi, err := os.Stat(dstPath); {
  189. case os.IsNotExist(err):
  190. return -1, os.Rename(partialPath, dstPath)
  191. case err != nil:
  192. return -1, err
  193. default:
  194. return fi.Size(), nil
  195. }
  196. }()
  197. if err != nil {
  198. return 0, redactAndLogError("Rename", err)
  199. }
  200. if dstLength < 0 {
  201. break // we successfully renamed; so stop
  202. }
  203. // Avoid the final rename if a destination file has the same contents.
  204. if dstLength == fileLength {
  205. partialSum, err := computePartialSum()
  206. if err != nil {
  207. return 0, redactAndLogError("Rename", err)
  208. }
  209. dstSum, err := sha256File(dstPath)
  210. if err != nil {
  211. return 0, redactAndLogError("Rename", err)
  212. }
  213. if dstSum == partialSum {
  214. if err := os.Remove(partialPath); err != nil {
  215. return 0, redactAndLogError("Remove", err)
  216. }
  217. break // we successfully found a content match; so stop
  218. }
  219. }
  220. // Choose a new destination filename and try again.
  221. dstPath = NextFilename(dstPath)
  222. }
  223. if maxRetries <= 0 {
  224. return 0, errors.New("too many retries trying to rename partial file")
  225. }
  226. m.totalReceived.Add(1)
  227. m.opts.SendFileNotify()
  228. return fileLength, nil
  229. }
  230. func sha256File(file string) (out [sha256.Size]byte, err error) {
  231. h := sha256.New()
  232. f, err := os.Open(file)
  233. if err != nil {
  234. return out, err
  235. }
  236. defer f.Close()
  237. if _, err := io.Copy(h, f); err != nil {
  238. return out, err
  239. }
  240. return [sha256.Size]byte(h.Sum(nil)), nil
  241. }