transfer.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package common
  2. import (
  3. "errors"
  4. "path"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/drakkan/sftpgo/v2/dataprovider"
  9. "github.com/drakkan/sftpgo/v2/logger"
  10. "github.com/drakkan/sftpgo/v2/metric"
  11. "github.com/drakkan/sftpgo/v2/vfs"
  12. )
  13. var (
  14. // ErrTransferClosed defines the error returned for a closed transfer
  15. ErrTransferClosed = errors.New("transfer already closed")
  16. )
  17. // BaseTransfer contains protocols common transfer details for an upload or a download.
  18. type BaseTransfer struct { //nolint:maligned
  19. ID uint64
  20. BytesSent int64
  21. BytesReceived int64
  22. Fs vfs.Fs
  23. File vfs.File
  24. Connection *BaseConnection
  25. cancelFn func()
  26. fsPath string
  27. effectiveFsPath string
  28. requestPath string
  29. ftpMode string
  30. start time.Time
  31. MaxWriteSize int64
  32. MinWriteOffset int64
  33. InitialSize int64
  34. isNewFile bool
  35. transferType int
  36. AbortTransfer int32
  37. aTime time.Time
  38. mTime time.Time
  39. sync.Mutex
  40. ErrTransfer error
  41. }
  42. // NewBaseTransfer returns a new BaseTransfer and adds it to the given connection
  43. func NewBaseTransfer(file vfs.File, conn *BaseConnection, cancelFn func(), fsPath, effectiveFsPath, requestPath string,
  44. transferType int, minWriteOffset, initialSize, maxWriteSize int64, isNewFile bool, fs vfs.Fs) *BaseTransfer {
  45. t := &BaseTransfer{
  46. ID: conn.GetTransferID(),
  47. File: file,
  48. Connection: conn,
  49. cancelFn: cancelFn,
  50. fsPath: fsPath,
  51. effectiveFsPath: effectiveFsPath,
  52. start: time.Now(),
  53. transferType: transferType,
  54. MinWriteOffset: minWriteOffset,
  55. InitialSize: initialSize,
  56. isNewFile: isNewFile,
  57. requestPath: requestPath,
  58. BytesSent: 0,
  59. BytesReceived: 0,
  60. MaxWriteSize: maxWriteSize,
  61. AbortTransfer: 0,
  62. Fs: fs,
  63. }
  64. conn.AddTransfer(t)
  65. return t
  66. }
  67. // SetFtpMode sets the FTP mode for the current transfer
  68. func (t *BaseTransfer) SetFtpMode(mode string) {
  69. t.ftpMode = mode
  70. }
  71. // GetID returns the transfer ID
  72. func (t *BaseTransfer) GetID() uint64 {
  73. return t.ID
  74. }
  75. // GetType returns the transfer type
  76. func (t *BaseTransfer) GetType() int {
  77. return t.transferType
  78. }
  79. // GetSize returns the transferred size
  80. func (t *BaseTransfer) GetSize() int64 {
  81. if t.transferType == TransferDownload {
  82. return atomic.LoadInt64(&t.BytesSent)
  83. }
  84. return atomic.LoadInt64(&t.BytesReceived)
  85. }
  86. // GetStartTime returns the start time
  87. func (t *BaseTransfer) GetStartTime() time.Time {
  88. return t.start
  89. }
  90. // SignalClose signals that the transfer should be closed.
  91. // For same protocols, for example WebDAV, we have no
  92. // access to the network connection, so we use this method
  93. // to make the next read or write to fail
  94. func (t *BaseTransfer) SignalClose() {
  95. atomic.StoreInt32(&(t.AbortTransfer), 1)
  96. }
  97. // GetVirtualPath returns the transfer virtual path
  98. func (t *BaseTransfer) GetVirtualPath() string {
  99. return t.requestPath
  100. }
  101. // GetFsPath returns the transfer filesystem path
  102. func (t *BaseTransfer) GetFsPath() string {
  103. return t.fsPath
  104. }
  105. // SetTimes stores access and modification times if fsPath matches the current file
  106. func (t *BaseTransfer) SetTimes(fsPath string, atime time.Time, mtime time.Time) bool {
  107. if fsPath == t.GetFsPath() {
  108. t.aTime = atime
  109. t.mTime = mtime
  110. return true
  111. }
  112. return false
  113. }
  114. // GetRealFsPath returns the real transfer filesystem path.
  115. // If atomic uploads are enabled this differ from fsPath
  116. func (t *BaseTransfer) GetRealFsPath(fsPath string) string {
  117. if fsPath == t.GetFsPath() {
  118. if t.File != nil {
  119. return t.File.Name()
  120. }
  121. return t.fsPath
  122. }
  123. return ""
  124. }
  125. // SetCancelFn sets the cancel function for the transfer
  126. func (t *BaseTransfer) SetCancelFn(cancelFn func()) {
  127. t.cancelFn = cancelFn
  128. }
  129. // Truncate changes the size of the opened file.
  130. // Supported for local fs only
  131. func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
  132. if fsPath == t.GetFsPath() {
  133. if t.File != nil {
  134. initialSize := t.InitialSize
  135. err := t.File.Truncate(size)
  136. if err == nil {
  137. t.Lock()
  138. t.InitialSize = size
  139. if t.MaxWriteSize > 0 {
  140. sizeDiff := initialSize - size
  141. t.MaxWriteSize += sizeDiff
  142. metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
  143. atomic.StoreInt64(&t.BytesReceived, 0)
  144. }
  145. t.Unlock()
  146. }
  147. t.Connection.Log(logger.LevelDebug, "file %#v truncated to size %v max write size %v new initial size %v err: %v",
  148. fsPath, size, t.MaxWriteSize, t.InitialSize, err)
  149. return initialSize, err
  150. }
  151. if size == 0 && atomic.LoadInt64(&t.BytesSent) == 0 {
  152. // for cloud providers the file is always truncated to zero, we don't support append/resume for uploads
  153. // for buffered SFTP we can have buffered bytes so we returns an error
  154. if !vfs.IsBufferedSFTPFs(t.Fs) {
  155. return 0, nil
  156. }
  157. }
  158. return 0, vfs.ErrVfsUnsupported
  159. }
  160. return 0, errTransferMismatch
  161. }
  162. // TransferError is called if there is an unexpected error.
  163. // For example network or client issues
  164. func (t *BaseTransfer) TransferError(err error) {
  165. t.Lock()
  166. defer t.Unlock()
  167. if t.ErrTransfer != nil {
  168. return
  169. }
  170. t.ErrTransfer = err
  171. if t.cancelFn != nil {
  172. t.cancelFn()
  173. }
  174. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  175. t.Connection.Log(logger.LevelError, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
  176. "bytes received: %v transfer running since %v ms", t.fsPath, t.ErrTransfer, atomic.LoadInt64(&t.BytesSent),
  177. atomic.LoadInt64(&t.BytesReceived), elapsed)
  178. }
  179. func (t *BaseTransfer) getUploadFileSize() (int64, error) {
  180. var fileSize int64
  181. info, err := t.Fs.Stat(t.fsPath)
  182. if err == nil {
  183. fileSize = info.Size()
  184. }
  185. if vfs.IsCryptOsFs(t.Fs) && t.ErrTransfer != nil {
  186. errDelete := t.Fs.Remove(t.fsPath, false)
  187. if errDelete != nil {
  188. t.Connection.Log(logger.LevelWarn, "error removing partial crypto file %#v: %v", t.fsPath, errDelete)
  189. }
  190. }
  191. return fileSize, err
  192. }
  193. // Close it is called when the transfer is completed.
  194. // It logs the transfer info, updates the user quota (for uploads)
  195. // and executes any defined action.
  196. // If there is an error no action will be executed and, in atomic mode,
  197. // we try to delete the temporary file
  198. func (t *BaseTransfer) Close() error {
  199. defer t.Connection.RemoveTransfer(t)
  200. var err error
  201. numFiles := 0
  202. if t.isNewFile {
  203. numFiles = 1
  204. }
  205. metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
  206. if t.File != nil && t.Connection.IsQuotaExceededError(t.ErrTransfer) {
  207. // if quota is exceeded we try to remove the partial file for uploads to local filesystem
  208. err = t.Fs.Remove(t.File.Name(), false)
  209. if err == nil {
  210. numFiles--
  211. atomic.StoreInt64(&t.BytesReceived, 0)
  212. t.MinWriteOffset = 0
  213. }
  214. t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %#v, deletion error: %v",
  215. t.File.Name(), err)
  216. } else if t.transferType == TransferUpload && t.effectiveFsPath != t.fsPath {
  217. if t.ErrTransfer == nil || Config.UploadMode == UploadModeAtomicWithResume {
  218. err = t.Fs.Rename(t.effectiveFsPath, t.fsPath)
  219. t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %#v -> %#v, error: %v",
  220. t.effectiveFsPath, t.fsPath, err)
  221. } else {
  222. err = t.Fs.Remove(t.effectiveFsPath, false)
  223. t.Connection.Log(logger.LevelWarn, "atomic upload completed with error: \"%v\", delete temporary file: %#v, "+
  224. "deletion error: %v", t.ErrTransfer, t.effectiveFsPath, err)
  225. if err == nil {
  226. numFiles--
  227. atomic.StoreInt64(&t.BytesReceived, 0)
  228. t.MinWriteOffset = 0
  229. }
  230. }
  231. }
  232. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  233. if t.transferType == TransferDownload {
  234. logger.TransferLog(downloadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesSent), t.Connection.User.Username,
  235. t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
  236. ExecuteActionNotification(t.Connection, operationDownload, t.fsPath, t.requestPath, "", "", "",
  237. atomic.LoadInt64(&t.BytesSent), t.ErrTransfer)
  238. } else {
  239. fileSize := atomic.LoadInt64(&t.BytesReceived) + t.MinWriteOffset
  240. if statSize, err := t.getUploadFileSize(); err == nil {
  241. fileSize = statSize
  242. }
  243. t.Connection.Log(logger.LevelDebug, "uploaded file size %v", fileSize)
  244. t.updateQuota(numFiles, fileSize)
  245. t.updateTimes()
  246. logger.TransferLog(uploadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesReceived), t.Connection.User.Username,
  247. t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
  248. ExecuteActionNotification(t.Connection, operationUpload, t.fsPath, t.requestPath, "", "", "", fileSize, t.ErrTransfer)
  249. }
  250. if t.ErrTransfer != nil {
  251. t.Connection.Log(logger.LevelError, "transfer error: %v, path: %#v", t.ErrTransfer, t.fsPath)
  252. if err == nil {
  253. err = t.ErrTransfer
  254. }
  255. }
  256. return err
  257. }
  258. func (t *BaseTransfer) updateTimes() {
  259. if !t.aTime.IsZero() && !t.mTime.IsZero() {
  260. err := t.Fs.Chtimes(t.fsPath, t.aTime, t.mTime, true)
  261. t.Connection.Log(logger.LevelDebug, "set times for file %#v, atime: %v, mtime: %v, err: %v",
  262. t.fsPath, t.aTime, t.mTime, err)
  263. }
  264. }
  265. func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
  266. // S3 uploads are atomic, if there is an error nothing is uploaded
  267. if t.File == nil && t.ErrTransfer != nil && !t.Connection.User.HasBufferedSFTP(t.GetVirtualPath()) {
  268. return false
  269. }
  270. sizeDiff := fileSize - t.InitialSize
  271. if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff > 0) {
  272. vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
  273. if err == nil {
  274. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
  275. sizeDiff, false)
  276. if vfolder.IsIncludedInUserQuota() {
  277. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  278. }
  279. } else {
  280. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  281. }
  282. return true
  283. }
  284. return false
  285. }
  286. // HandleThrottle manage bandwidth throttling
  287. func (t *BaseTransfer) HandleThrottle() {
  288. var wantedBandwidth int64
  289. var trasferredBytes int64
  290. if t.transferType == TransferDownload {
  291. wantedBandwidth = t.Connection.User.DownloadBandwidth
  292. trasferredBytes = atomic.LoadInt64(&t.BytesSent)
  293. } else {
  294. wantedBandwidth = t.Connection.User.UploadBandwidth
  295. trasferredBytes = atomic.LoadInt64(&t.BytesReceived)
  296. }
  297. if wantedBandwidth > 0 {
  298. // real and wanted elapsed as milliseconds, bytes as kilobytes
  299. realElapsed := time.Since(t.start).Nanoseconds() / 1000000
  300. // trasferredBytes / 1024 = KB/s, we multiply for 1000 to get milliseconds
  301. wantedElapsed := 1000 * (trasferredBytes / 1024) / wantedBandwidth
  302. if wantedElapsed > realElapsed {
  303. toSleep := time.Duration(wantedElapsed - realElapsed)
  304. time.Sleep(toSleep * time.Millisecond)
  305. }
  306. }
  307. }