transfer.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451
  1. package common
  2. import (
  3. "errors"
  4. "path"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/drakkan/sftpgo/v2/dataprovider"
  9. "github.com/drakkan/sftpgo/v2/logger"
  10. "github.com/drakkan/sftpgo/v2/metric"
  11. "github.com/drakkan/sftpgo/v2/vfs"
  12. )
  13. var (
  14. // ErrTransferClosed defines the error returned for a closed transfer
  15. ErrTransferClosed = errors.New("transfer already closed")
  16. )
  17. // BaseTransfer contains protocols common transfer details for an upload or a download.
  18. type BaseTransfer struct { //nolint:maligned
  19. ID int64
  20. BytesSent int64
  21. BytesReceived int64
  22. Fs vfs.Fs
  23. File vfs.File
  24. Connection *BaseConnection
  25. cancelFn func()
  26. fsPath string
  27. effectiveFsPath string
  28. requestPath string
  29. ftpMode string
  30. start time.Time
  31. MaxWriteSize int64
  32. MinWriteOffset int64
  33. InitialSize int64
  34. truncatedSize int64
  35. isNewFile bool
  36. transferType int
  37. AbortTransfer int32
  38. aTime time.Time
  39. mTime time.Time
  40. transferQuota dataprovider.TransferQuota
  41. sync.Mutex
  42. errAbort error
  43. ErrTransfer error
  44. }
  45. // NewBaseTransfer returns a new BaseTransfer and adds it to the given connection
  46. func NewBaseTransfer(file vfs.File, conn *BaseConnection, cancelFn func(), fsPath, effectiveFsPath, requestPath string,
  47. transferType int, minWriteOffset, initialSize, maxWriteSize, truncatedSize int64, isNewFile bool, fs vfs.Fs,
  48. transferQuota dataprovider.TransferQuota,
  49. ) *BaseTransfer {
  50. t := &BaseTransfer{
  51. ID: conn.GetTransferID(),
  52. File: file,
  53. Connection: conn,
  54. cancelFn: cancelFn,
  55. fsPath: fsPath,
  56. effectiveFsPath: effectiveFsPath,
  57. start: time.Now(),
  58. transferType: transferType,
  59. MinWriteOffset: minWriteOffset,
  60. InitialSize: initialSize,
  61. isNewFile: isNewFile,
  62. requestPath: requestPath,
  63. BytesSent: 0,
  64. BytesReceived: 0,
  65. MaxWriteSize: maxWriteSize,
  66. AbortTransfer: 0,
  67. truncatedSize: truncatedSize,
  68. transferQuota: transferQuota,
  69. Fs: fs,
  70. }
  71. conn.AddTransfer(t)
  72. return t
  73. }
  74. // GetTransferQuota returns data transfer quota limits
  75. func (t *BaseTransfer) GetTransferQuota() dataprovider.TransferQuota {
  76. return t.transferQuota
  77. }
  78. // SetFtpMode sets the FTP mode for the current transfer
  79. func (t *BaseTransfer) SetFtpMode(mode string) {
  80. t.ftpMode = mode
  81. }
  82. // GetID returns the transfer ID
  83. func (t *BaseTransfer) GetID() int64 {
  84. return t.ID
  85. }
  86. // GetType returns the transfer type
  87. func (t *BaseTransfer) GetType() int {
  88. return t.transferType
  89. }
  90. // GetSize returns the transferred size
  91. func (t *BaseTransfer) GetSize() int64 {
  92. if t.transferType == TransferDownload {
  93. return atomic.LoadInt64(&t.BytesSent)
  94. }
  95. return atomic.LoadInt64(&t.BytesReceived)
  96. }
  97. // GetDownloadedSize returns the transferred size
  98. func (t *BaseTransfer) GetDownloadedSize() int64 {
  99. return atomic.LoadInt64(&t.BytesSent)
  100. }
  101. // GetUploadedSize returns the transferred size
  102. func (t *BaseTransfer) GetUploadedSize() int64 {
  103. return atomic.LoadInt64(&t.BytesReceived)
  104. }
  105. // GetStartTime returns the start time
  106. func (t *BaseTransfer) GetStartTime() time.Time {
  107. return t.start
  108. }
  109. // GetAbortError returns the error to send to the client if the transfer was aborted
  110. func (t *BaseTransfer) GetAbortError() error {
  111. t.Lock()
  112. defer t.Unlock()
  113. if t.errAbort != nil {
  114. return t.errAbort
  115. }
  116. return getQuotaExceededError(t.Connection.protocol)
  117. }
  118. // SignalClose signals that the transfer should be closed after the next read/write.
  119. // The optional error argument allow to send a specific error, otherwise a generic
  120. // transfer aborted error is sent
  121. func (t *BaseTransfer) SignalClose(err error) {
  122. t.Lock()
  123. t.errAbort = err
  124. t.Unlock()
  125. atomic.StoreInt32(&(t.AbortTransfer), 1)
  126. }
  127. // GetTruncatedSize returns the truncated sized if this is an upload overwriting
  128. // an existing file
  129. func (t *BaseTransfer) GetTruncatedSize() int64 {
  130. return t.truncatedSize
  131. }
  132. // HasSizeLimit returns true if there is an upload or download size limit
  133. func (t *BaseTransfer) HasSizeLimit() bool {
  134. if t.MaxWriteSize > 0 {
  135. return true
  136. }
  137. if t.transferQuota.HasSizeLimits() {
  138. return true
  139. }
  140. return false
  141. }
  142. // GetVirtualPath returns the transfer virtual path
  143. func (t *BaseTransfer) GetVirtualPath() string {
  144. return t.requestPath
  145. }
  146. // GetFsPath returns the transfer filesystem path
  147. func (t *BaseTransfer) GetFsPath() string {
  148. return t.fsPath
  149. }
  150. // SetTimes stores access and modification times if fsPath matches the current file
  151. func (t *BaseTransfer) SetTimes(fsPath string, atime time.Time, mtime time.Time) bool {
  152. if fsPath == t.GetFsPath() {
  153. t.aTime = atime
  154. t.mTime = mtime
  155. return true
  156. }
  157. return false
  158. }
  159. // GetRealFsPath returns the real transfer filesystem path.
  160. // If atomic uploads are enabled this differ from fsPath
  161. func (t *BaseTransfer) GetRealFsPath(fsPath string) string {
  162. if fsPath == t.GetFsPath() {
  163. if t.File != nil {
  164. return t.File.Name()
  165. }
  166. return t.fsPath
  167. }
  168. return ""
  169. }
  170. // SetCancelFn sets the cancel function for the transfer
  171. func (t *BaseTransfer) SetCancelFn(cancelFn func()) {
  172. t.cancelFn = cancelFn
  173. }
  174. // CheckRead returns an error if read if not allowed
  175. func (t *BaseTransfer) CheckRead() error {
  176. if t.transferQuota.AllowedDLSize == 0 && t.transferQuota.AllowedTotalSize == 0 {
  177. return nil
  178. }
  179. if t.transferQuota.AllowedTotalSize > 0 {
  180. if atomic.LoadInt64(&t.BytesSent)+atomic.LoadInt64(&t.BytesReceived) > t.transferQuota.AllowedTotalSize {
  181. return t.Connection.GetReadQuotaExceededError()
  182. }
  183. } else if t.transferQuota.AllowedDLSize > 0 {
  184. if atomic.LoadInt64(&t.BytesSent) > t.transferQuota.AllowedDLSize {
  185. return t.Connection.GetReadQuotaExceededError()
  186. }
  187. }
  188. return nil
  189. }
  190. // CheckWrite returns an error if write if not allowed
  191. func (t *BaseTransfer) CheckWrite() error {
  192. if t.MaxWriteSize > 0 && atomic.LoadInt64(&t.BytesReceived) > t.MaxWriteSize {
  193. return t.Connection.GetQuotaExceededError()
  194. }
  195. if t.transferQuota.AllowedULSize == 0 && t.transferQuota.AllowedTotalSize == 0 {
  196. return nil
  197. }
  198. if t.transferQuota.AllowedTotalSize > 0 {
  199. if atomic.LoadInt64(&t.BytesSent)+atomic.LoadInt64(&t.BytesReceived) > t.transferQuota.AllowedTotalSize {
  200. return t.Connection.GetQuotaExceededError()
  201. }
  202. } else if t.transferQuota.AllowedULSize > 0 {
  203. if atomic.LoadInt64(&t.BytesReceived) > t.transferQuota.AllowedULSize {
  204. return t.Connection.GetQuotaExceededError()
  205. }
  206. }
  207. return nil
  208. }
  209. // Truncate changes the size of the opened file.
  210. // Supported for local fs only
  211. func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
  212. if fsPath == t.GetFsPath() {
  213. if t.File != nil {
  214. initialSize := t.InitialSize
  215. err := t.File.Truncate(size)
  216. if err == nil {
  217. t.Lock()
  218. t.InitialSize = size
  219. if t.MaxWriteSize > 0 {
  220. sizeDiff := initialSize - size
  221. t.MaxWriteSize += sizeDiff
  222. metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
  223. if t.transferQuota.HasSizeLimits() {
  224. go func(ulSize, dlSize int64, user dataprovider.User) {
  225. dataprovider.UpdateUserTransferQuota(&user, ulSize, dlSize, false) //nolint:errcheck
  226. }(atomic.LoadInt64(&t.BytesReceived), atomic.LoadInt64(&t.BytesSent), t.Connection.User)
  227. }
  228. atomic.StoreInt64(&t.BytesReceived, 0)
  229. }
  230. t.Unlock()
  231. }
  232. t.Connection.Log(logger.LevelDebug, "file %#v truncated to size %v max write size %v new initial size %v err: %v",
  233. fsPath, size, t.MaxWriteSize, t.InitialSize, err)
  234. return initialSize, err
  235. }
  236. if size == 0 && atomic.LoadInt64(&t.BytesSent) == 0 {
  237. // for cloud providers the file is always truncated to zero, we don't support append/resume for uploads
  238. // for buffered SFTP we can have buffered bytes so we returns an error
  239. if !vfs.IsBufferedSFTPFs(t.Fs) {
  240. return 0, nil
  241. }
  242. }
  243. return 0, vfs.ErrVfsUnsupported
  244. }
  245. return 0, errTransferMismatch
  246. }
  247. // TransferError is called if there is an unexpected error.
  248. // For example network or client issues
  249. func (t *BaseTransfer) TransferError(err error) {
  250. t.Lock()
  251. defer t.Unlock()
  252. if t.ErrTransfer != nil {
  253. return
  254. }
  255. t.ErrTransfer = err
  256. if t.cancelFn != nil {
  257. t.cancelFn()
  258. }
  259. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  260. t.Connection.Log(logger.LevelError, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
  261. "bytes received: %v transfer running since %v ms", t.fsPath, t.ErrTransfer, atomic.LoadInt64(&t.BytesSent),
  262. atomic.LoadInt64(&t.BytesReceived), elapsed)
  263. }
  264. func (t *BaseTransfer) getUploadFileSize() (int64, error) {
  265. var fileSize int64
  266. info, err := t.Fs.Stat(t.fsPath)
  267. if err == nil {
  268. fileSize = info.Size()
  269. }
  270. if vfs.IsCryptOsFs(t.Fs) && t.ErrTransfer != nil {
  271. errDelete := t.Fs.Remove(t.fsPath, false)
  272. if errDelete != nil {
  273. t.Connection.Log(logger.LevelWarn, "error removing partial crypto file %#v: %v", t.fsPath, errDelete)
  274. }
  275. }
  276. return fileSize, err
  277. }
  278. // return 1 if the file is outside the user home dir
  279. func (t *BaseTransfer) checkUploadOutsideHomeDir(err error) int {
  280. if err == nil {
  281. return 0
  282. }
  283. if Config.TempPath == "" {
  284. return 0
  285. }
  286. err = t.Fs.Remove(t.effectiveFsPath, false)
  287. t.Connection.Log(logger.LevelWarn, "upload in temp path cannot be renamed, delete temporary file: %#v, deletion error: %v",
  288. t.effectiveFsPath, err)
  289. // the file is outside the home dir so don't update the quota
  290. atomic.StoreInt64(&t.BytesReceived, 0)
  291. t.MinWriteOffset = 0
  292. return 1
  293. }
  294. // Close it is called when the transfer is completed.
  295. // It logs the transfer info, updates the user quota (for uploads)
  296. // and executes any defined action.
  297. // If there is an error no action will be executed and, in atomic mode,
  298. // we try to delete the temporary file
  299. func (t *BaseTransfer) Close() error {
  300. defer t.Connection.RemoveTransfer(t)
  301. var err error
  302. numFiles := 0
  303. if t.isNewFile {
  304. numFiles = 1
  305. }
  306. metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived),
  307. t.transferType, t.ErrTransfer)
  308. if t.transferQuota.HasSizeLimits() {
  309. dataprovider.UpdateUserTransferQuota(&t.Connection.User, atomic.LoadInt64(&t.BytesReceived), //nolint:errcheck
  310. atomic.LoadInt64(&t.BytesSent), false)
  311. }
  312. if t.File != nil && t.Connection.IsQuotaExceededError(t.ErrTransfer) {
  313. // if quota is exceeded we try to remove the partial file for uploads to local filesystem
  314. err = t.Fs.Remove(t.File.Name(), false)
  315. if err == nil {
  316. numFiles--
  317. atomic.StoreInt64(&t.BytesReceived, 0)
  318. t.MinWriteOffset = 0
  319. }
  320. t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %#v, deletion error: %v",
  321. t.File.Name(), err)
  322. } else if t.transferType == TransferUpload && t.effectiveFsPath != t.fsPath {
  323. if t.ErrTransfer == nil || Config.UploadMode == UploadModeAtomicWithResume {
  324. err = t.Fs.Rename(t.effectiveFsPath, t.fsPath)
  325. t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %#v -> %#v, error: %v",
  326. t.effectiveFsPath, t.fsPath, err)
  327. // the file must be removed if it is uploaded to a path outside the home dir and cannot be renamed
  328. numFiles -= t.checkUploadOutsideHomeDir(err)
  329. } else {
  330. err = t.Fs.Remove(t.effectiveFsPath, false)
  331. t.Connection.Log(logger.LevelWarn, "atomic upload completed with error: \"%v\", delete temporary file: %#v, deletion error: %v",
  332. t.ErrTransfer, t.effectiveFsPath, err)
  333. if err == nil {
  334. numFiles--
  335. atomic.StoreInt64(&t.BytesReceived, 0)
  336. t.MinWriteOffset = 0
  337. }
  338. }
  339. }
  340. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  341. if t.transferType == TransferDownload {
  342. logger.TransferLog(downloadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesSent), t.Connection.User.Username,
  343. t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
  344. ExecuteActionNotification(t.Connection, operationDownload, t.fsPath, t.requestPath, "", "", "",
  345. atomic.LoadInt64(&t.BytesSent), t.ErrTransfer)
  346. } else {
  347. fileSize := atomic.LoadInt64(&t.BytesReceived) + t.MinWriteOffset
  348. if statSize, errStat := t.getUploadFileSize(); errStat == nil {
  349. fileSize = statSize
  350. }
  351. t.Connection.Log(logger.LevelDebug, "uploaded file size %v", fileSize)
  352. t.updateQuota(numFiles, fileSize)
  353. t.updateTimes()
  354. logger.TransferLog(uploadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesReceived), t.Connection.User.Username,
  355. t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
  356. ExecuteActionNotification(t.Connection, operationUpload, t.fsPath, t.requestPath, "", "", "", fileSize, t.ErrTransfer)
  357. }
  358. if t.ErrTransfer != nil {
  359. t.Connection.Log(logger.LevelError, "transfer error: %v, path: %#v", t.ErrTransfer, t.fsPath)
  360. if err == nil {
  361. err = t.ErrTransfer
  362. }
  363. }
  364. return err
  365. }
  366. func (t *BaseTransfer) updateTimes() {
  367. if !t.aTime.IsZero() && !t.mTime.IsZero() {
  368. err := t.Fs.Chtimes(t.fsPath, t.aTime, t.mTime, true)
  369. t.Connection.Log(logger.LevelDebug, "set times for file %#v, atime: %v, mtime: %v, err: %v",
  370. t.fsPath, t.aTime, t.mTime, err)
  371. }
  372. }
  373. func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
  374. // S3 uploads are atomic, if there is an error nothing is uploaded
  375. if t.File == nil && t.ErrTransfer != nil && !t.Connection.User.HasBufferedSFTP(t.GetVirtualPath()) {
  376. return false
  377. }
  378. sizeDiff := fileSize - t.InitialSize
  379. if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff > 0) {
  380. vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
  381. if err == nil {
  382. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
  383. sizeDiff, false)
  384. if vfolder.IsIncludedInUserQuota() {
  385. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  386. }
  387. } else {
  388. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  389. }
  390. return true
  391. }
  392. return false
  393. }
  394. // HandleThrottle manage bandwidth throttling
  395. func (t *BaseTransfer) HandleThrottle() {
  396. var wantedBandwidth int64
  397. var trasferredBytes int64
  398. if t.transferType == TransferDownload {
  399. wantedBandwidth = t.Connection.User.DownloadBandwidth
  400. trasferredBytes = atomic.LoadInt64(&t.BytesSent)
  401. } else {
  402. wantedBandwidth = t.Connection.User.UploadBandwidth
  403. trasferredBytes = atomic.LoadInt64(&t.BytesReceived)
  404. }
  405. if wantedBandwidth > 0 {
  406. // real and wanted elapsed as milliseconds, bytes as kilobytes
  407. realElapsed := time.Since(t.start).Nanoseconds() / 1000000
  408. // trasferredBytes / 1024 = KB/s, we multiply for 1000 to get milliseconds
  409. wantedElapsed := 1000 * (trasferredBytes / 1024) / wantedBandwidth
  410. if wantedElapsed > realElapsed {
  411. toSleep := time.Duration(wantedElapsed - realElapsed)
  412. time.Sleep(toSleep * time.Millisecond)
  413. }
  414. }
  415. }