transfer.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. package common
  2. import (
  3. "errors"
  4. "path"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/drakkan/sftpgo/v2/dataprovider"
  9. "github.com/drakkan/sftpgo/v2/logger"
  10. "github.com/drakkan/sftpgo/v2/metric"
  11. "github.com/drakkan/sftpgo/v2/vfs"
  12. )
  13. var (
  14. // ErrTransferClosed defines the error returned for a closed transfer
  15. ErrTransferClosed = errors.New("transfer already closed")
  16. )
  17. // BaseTransfer contains protocols common transfer details for an upload or a download.
  18. type BaseTransfer struct { //nolint:maligned
  19. ID int64
  20. BytesSent int64
  21. BytesReceived int64
  22. Fs vfs.Fs
  23. File vfs.File
  24. Connection *BaseConnection
  25. cancelFn func()
  26. fsPath string
  27. effectiveFsPath string
  28. requestPath string
  29. ftpMode string
  30. start time.Time
  31. MaxWriteSize int64
  32. MinWriteOffset int64
  33. InitialSize int64
  34. truncatedSize int64
  35. isNewFile bool
  36. transferType int
  37. AbortTransfer int32
  38. aTime time.Time
  39. mTime time.Time
  40. transferQuota dataprovider.TransferQuota
  41. sync.Mutex
  42. errAbort error
  43. ErrTransfer error
  44. }
  45. // NewBaseTransfer returns a new BaseTransfer and adds it to the given connection
  46. func NewBaseTransfer(file vfs.File, conn *BaseConnection, cancelFn func(), fsPath, effectiveFsPath, requestPath string,
  47. transferType int, minWriteOffset, initialSize, maxWriteSize, truncatedSize int64, isNewFile bool, fs vfs.Fs,
  48. transferQuota dataprovider.TransferQuota,
  49. ) *BaseTransfer {
  50. t := &BaseTransfer{
  51. ID: conn.GetTransferID(),
  52. File: file,
  53. Connection: conn,
  54. cancelFn: cancelFn,
  55. fsPath: fsPath,
  56. effectiveFsPath: effectiveFsPath,
  57. start: time.Now(),
  58. transferType: transferType,
  59. MinWriteOffset: minWriteOffset,
  60. InitialSize: initialSize,
  61. isNewFile: isNewFile,
  62. requestPath: requestPath,
  63. BytesSent: 0,
  64. BytesReceived: 0,
  65. MaxWriteSize: maxWriteSize,
  66. AbortTransfer: 0,
  67. truncatedSize: truncatedSize,
  68. transferQuota: transferQuota,
  69. Fs: fs,
  70. }
  71. conn.AddTransfer(t)
  72. return t
  73. }
  74. // GetTransferQuota returns data transfer quota limits
  75. func (t *BaseTransfer) GetTransferQuota() dataprovider.TransferQuota {
  76. return t.transferQuota
  77. }
  78. // SetFtpMode sets the FTP mode for the current transfer
  79. func (t *BaseTransfer) SetFtpMode(mode string) {
  80. t.ftpMode = mode
  81. }
  82. // GetID returns the transfer ID
  83. func (t *BaseTransfer) GetID() int64 {
  84. return t.ID
  85. }
  86. // GetType returns the transfer type
  87. func (t *BaseTransfer) GetType() int {
  88. return t.transferType
  89. }
  90. // GetSize returns the transferred size
  91. func (t *BaseTransfer) GetSize() int64 {
  92. if t.transferType == TransferDownload {
  93. return atomic.LoadInt64(&t.BytesSent)
  94. }
  95. return atomic.LoadInt64(&t.BytesReceived)
  96. }
  97. // GetDownloadedSize returns the transferred size
  98. func (t *BaseTransfer) GetDownloadedSize() int64 {
  99. return atomic.LoadInt64(&t.BytesSent)
  100. }
  101. // GetUploadedSize returns the transferred size
  102. func (t *BaseTransfer) GetUploadedSize() int64 {
  103. return atomic.LoadInt64(&t.BytesReceived)
  104. }
  105. // GetStartTime returns the start time
  106. func (t *BaseTransfer) GetStartTime() time.Time {
  107. return t.start
  108. }
  109. // GetAbortError returns the error to send to the client if the transfer was aborted
  110. func (t *BaseTransfer) GetAbortError() error {
  111. t.Lock()
  112. defer t.Unlock()
  113. if t.errAbort != nil {
  114. return t.errAbort
  115. }
  116. return getQuotaExceededError(t.Connection.protocol)
  117. }
  118. // SignalClose signals that the transfer should be closed after the next read/write.
  119. // The optional error argument allow to send a specific error, otherwise a generic
  120. // transfer aborted error is sent
  121. func (t *BaseTransfer) SignalClose(err error) {
  122. t.Lock()
  123. t.errAbort = err
  124. t.Unlock()
  125. atomic.StoreInt32(&(t.AbortTransfer), 1)
  126. }
  127. // GetTruncatedSize returns the truncated sized if this is an upload overwriting
  128. // an existing file
  129. func (t *BaseTransfer) GetTruncatedSize() int64 {
  130. return t.truncatedSize
  131. }
  132. // HasSizeLimit returns true if there is an upload or download size limit
  133. func (t *BaseTransfer) HasSizeLimit() bool {
  134. if t.MaxWriteSize > 0 {
  135. return true
  136. }
  137. if t.transferQuota.AllowedDLSize > 0 || t.transferQuota.AllowedULSize > 0 ||
  138. t.transferQuota.AllowedTotalSize > 0 {
  139. return true
  140. }
  141. return false
  142. }
  143. // GetVirtualPath returns the transfer virtual path
  144. func (t *BaseTransfer) GetVirtualPath() string {
  145. return t.requestPath
  146. }
  147. // GetFsPath returns the transfer filesystem path
  148. func (t *BaseTransfer) GetFsPath() string {
  149. return t.fsPath
  150. }
  151. // SetTimes stores access and modification times if fsPath matches the current file
  152. func (t *BaseTransfer) SetTimes(fsPath string, atime time.Time, mtime time.Time) bool {
  153. if fsPath == t.GetFsPath() {
  154. t.aTime = atime
  155. t.mTime = mtime
  156. return true
  157. }
  158. return false
  159. }
  160. // GetRealFsPath returns the real transfer filesystem path.
  161. // If atomic uploads are enabled this differ from fsPath
  162. func (t *BaseTransfer) GetRealFsPath(fsPath string) string {
  163. if fsPath == t.GetFsPath() {
  164. if t.File != nil {
  165. return t.File.Name()
  166. }
  167. return t.fsPath
  168. }
  169. return ""
  170. }
  171. // SetCancelFn sets the cancel function for the transfer
  172. func (t *BaseTransfer) SetCancelFn(cancelFn func()) {
  173. t.cancelFn = cancelFn
  174. }
  175. // CheckRead returns an error if read if not allowed
  176. func (t *BaseTransfer) CheckRead() error {
  177. if t.transferQuota.AllowedDLSize == 0 && t.transferQuota.AllowedTotalSize == 0 {
  178. return nil
  179. }
  180. if t.transferQuota.AllowedTotalSize > 0 {
  181. if atomic.LoadInt64(&t.BytesSent)+atomic.LoadInt64(&t.BytesReceived) > t.transferQuota.AllowedTotalSize {
  182. return t.Connection.GetReadQuotaExceededError()
  183. }
  184. } else if t.transferQuota.AllowedDLSize > 0 {
  185. if atomic.LoadInt64(&t.BytesSent) > t.transferQuota.AllowedDLSize {
  186. return t.Connection.GetReadQuotaExceededError()
  187. }
  188. }
  189. return nil
  190. }
  191. // CheckWrite returns an error if write if not allowed
  192. func (t *BaseTransfer) CheckWrite() error {
  193. if t.MaxWriteSize > 0 && atomic.LoadInt64(&t.BytesReceived) > t.MaxWriteSize {
  194. return t.Connection.GetQuotaExceededError()
  195. }
  196. if t.transferQuota.AllowedULSize == 0 && t.transferQuota.AllowedTotalSize == 0 {
  197. return nil
  198. }
  199. if t.transferQuota.AllowedTotalSize > 0 {
  200. if atomic.LoadInt64(&t.BytesSent)+atomic.LoadInt64(&t.BytesReceived) > t.transferQuota.AllowedTotalSize {
  201. return t.Connection.GetQuotaExceededError()
  202. }
  203. } else if t.transferQuota.AllowedULSize > 0 {
  204. if atomic.LoadInt64(&t.BytesReceived) > t.transferQuota.AllowedULSize {
  205. return t.Connection.GetQuotaExceededError()
  206. }
  207. }
  208. return nil
  209. }
  210. // Truncate changes the size of the opened file.
  211. // Supported for local fs only
  212. func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
  213. if fsPath == t.GetFsPath() {
  214. if t.File != nil {
  215. initialSize := t.InitialSize
  216. err := t.File.Truncate(size)
  217. if err == nil {
  218. t.Lock()
  219. t.InitialSize = size
  220. if t.MaxWriteSize > 0 {
  221. sizeDiff := initialSize - size
  222. t.MaxWriteSize += sizeDiff
  223. metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
  224. go func(ulSize, dlSize int64, user dataprovider.User) {
  225. dataprovider.UpdateUserTransferQuota(&user, ulSize, dlSize, false) //nolint:errcheck
  226. }(atomic.LoadInt64(&t.BytesReceived), atomic.LoadInt64(&t.BytesSent), t.Connection.User)
  227. atomic.StoreInt64(&t.BytesReceived, 0)
  228. }
  229. t.Unlock()
  230. }
  231. t.Connection.Log(logger.LevelDebug, "file %#v truncated to size %v max write size %v new initial size %v err: %v",
  232. fsPath, size, t.MaxWriteSize, t.InitialSize, err)
  233. return initialSize, err
  234. }
  235. if size == 0 && atomic.LoadInt64(&t.BytesSent) == 0 {
  236. // for cloud providers the file is always truncated to zero, we don't support append/resume for uploads
  237. // for buffered SFTP we can have buffered bytes so we returns an error
  238. if !vfs.IsBufferedSFTPFs(t.Fs) {
  239. return 0, nil
  240. }
  241. }
  242. return 0, vfs.ErrVfsUnsupported
  243. }
  244. return 0, errTransferMismatch
  245. }
  246. // TransferError is called if there is an unexpected error.
  247. // For example network or client issues
  248. func (t *BaseTransfer) TransferError(err error) {
  249. t.Lock()
  250. defer t.Unlock()
  251. if t.ErrTransfer != nil {
  252. return
  253. }
  254. t.ErrTransfer = err
  255. if t.cancelFn != nil {
  256. t.cancelFn()
  257. }
  258. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  259. t.Connection.Log(logger.LevelError, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
  260. "bytes received: %v transfer running since %v ms", t.fsPath, t.ErrTransfer, atomic.LoadInt64(&t.BytesSent),
  261. atomic.LoadInt64(&t.BytesReceived), elapsed)
  262. }
  263. func (t *BaseTransfer) getUploadFileSize() (int64, error) {
  264. var fileSize int64
  265. info, err := t.Fs.Stat(t.fsPath)
  266. if err == nil {
  267. fileSize = info.Size()
  268. }
  269. if vfs.IsCryptOsFs(t.Fs) && t.ErrTransfer != nil {
  270. errDelete := t.Fs.Remove(t.fsPath, false)
  271. if errDelete != nil {
  272. t.Connection.Log(logger.LevelWarn, "error removing partial crypto file %#v: %v", t.fsPath, errDelete)
  273. }
  274. }
  275. return fileSize, err
  276. }
  277. // Close it is called when the transfer is completed.
  278. // It logs the transfer info, updates the user quota (for uploads)
  279. // and executes any defined action.
  280. // If there is an error no action will be executed and, in atomic mode,
  281. // we try to delete the temporary file
  282. func (t *BaseTransfer) Close() error {
  283. defer t.Connection.RemoveTransfer(t)
  284. var err error
  285. numFiles := 0
  286. if t.isNewFile {
  287. numFiles = 1
  288. }
  289. metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived),
  290. t.transferType, t.ErrTransfer)
  291. dataprovider.UpdateUserTransferQuota(&t.Connection.User, atomic.LoadInt64(&t.BytesReceived), //nolint:errcheck
  292. atomic.LoadInt64(&t.BytesSent), false)
  293. if t.File != nil && t.Connection.IsQuotaExceededError(t.ErrTransfer) {
  294. // if quota is exceeded we try to remove the partial file for uploads to local filesystem
  295. err = t.Fs.Remove(t.File.Name(), false)
  296. if err == nil {
  297. numFiles--
  298. atomic.StoreInt64(&t.BytesReceived, 0)
  299. t.MinWriteOffset = 0
  300. }
  301. t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %#v, deletion error: %v",
  302. t.File.Name(), err)
  303. } else if t.transferType == TransferUpload && t.effectiveFsPath != t.fsPath {
  304. if t.ErrTransfer == nil || Config.UploadMode == UploadModeAtomicWithResume {
  305. err = t.Fs.Rename(t.effectiveFsPath, t.fsPath)
  306. t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %#v -> %#v, error: %v",
  307. t.effectiveFsPath, t.fsPath, err)
  308. } else {
  309. err = t.Fs.Remove(t.effectiveFsPath, false)
  310. t.Connection.Log(logger.LevelWarn, "atomic upload completed with error: \"%v\", delete temporary file: %#v, "+
  311. "deletion error: %v", t.ErrTransfer, t.effectiveFsPath, err)
  312. if err == nil {
  313. numFiles--
  314. atomic.StoreInt64(&t.BytesReceived, 0)
  315. t.MinWriteOffset = 0
  316. }
  317. }
  318. }
  319. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  320. if t.transferType == TransferDownload {
  321. logger.TransferLog(downloadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesSent), t.Connection.User.Username,
  322. t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
  323. ExecuteActionNotification(t.Connection, operationDownload, t.fsPath, t.requestPath, "", "", "",
  324. atomic.LoadInt64(&t.BytesSent), t.ErrTransfer)
  325. } else {
  326. fileSize := atomic.LoadInt64(&t.BytesReceived) + t.MinWriteOffset
  327. if statSize, err := t.getUploadFileSize(); err == nil {
  328. fileSize = statSize
  329. }
  330. t.Connection.Log(logger.LevelDebug, "uploaded file size %v", fileSize)
  331. t.updateQuota(numFiles, fileSize)
  332. t.updateTimes()
  333. logger.TransferLog(uploadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesReceived), t.Connection.User.Username,
  334. t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
  335. ExecuteActionNotification(t.Connection, operationUpload, t.fsPath, t.requestPath, "", "", "", fileSize, t.ErrTransfer)
  336. }
  337. if t.ErrTransfer != nil {
  338. t.Connection.Log(logger.LevelError, "transfer error: %v, path: %#v", t.ErrTransfer, t.fsPath)
  339. if err == nil {
  340. err = t.ErrTransfer
  341. }
  342. }
  343. return err
  344. }
  345. func (t *BaseTransfer) updateTimes() {
  346. if !t.aTime.IsZero() && !t.mTime.IsZero() {
  347. err := t.Fs.Chtimes(t.fsPath, t.aTime, t.mTime, true)
  348. t.Connection.Log(logger.LevelDebug, "set times for file %#v, atime: %v, mtime: %v, err: %v",
  349. t.fsPath, t.aTime, t.mTime, err)
  350. }
  351. }
  352. func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
  353. // S3 uploads are atomic, if there is an error nothing is uploaded
  354. if t.File == nil && t.ErrTransfer != nil && !t.Connection.User.HasBufferedSFTP(t.GetVirtualPath()) {
  355. return false
  356. }
  357. sizeDiff := fileSize - t.InitialSize
  358. if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff > 0) {
  359. vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
  360. if err == nil {
  361. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
  362. sizeDiff, false)
  363. if vfolder.IsIncludedInUserQuota() {
  364. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  365. }
  366. } else {
  367. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  368. }
  369. return true
  370. }
  371. return false
  372. }
  373. // HandleThrottle manage bandwidth throttling
  374. func (t *BaseTransfer) HandleThrottle() {
  375. var wantedBandwidth int64
  376. var trasferredBytes int64
  377. if t.transferType == TransferDownload {
  378. wantedBandwidth = t.Connection.User.DownloadBandwidth
  379. trasferredBytes = atomic.LoadInt64(&t.BytesSent)
  380. } else {
  381. wantedBandwidth = t.Connection.User.UploadBandwidth
  382. trasferredBytes = atomic.LoadInt64(&t.BytesReceived)
  383. }
  384. if wantedBandwidth > 0 {
  385. // real and wanted elapsed as milliseconds, bytes as kilobytes
  386. realElapsed := time.Since(t.start).Nanoseconds() / 1000000
  387. // trasferredBytes / 1024 = KB/s, we multiply for 1000 to get milliseconds
  388. wantedElapsed := 1000 * (trasferredBytes / 1024) / wantedBandwidth
  389. if wantedElapsed > realElapsed {
  390. toSleep := time.Duration(wantedElapsed - realElapsed)
  391. time.Sleep(toSleep * time.Millisecond)
  392. }
  393. }
  394. }