transfer.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "errors"
  17. "fmt"
  18. "io/fs"
  19. "path"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  24. "github.com/drakkan/sftpgo/v2/internal/logger"
  25. "github.com/drakkan/sftpgo/v2/internal/metric"
  26. "github.com/drakkan/sftpgo/v2/internal/vfs"
  27. )
  // ErrTransferClosed is the error returned for a transfer that is already closed.
  var ErrTransferClosed = errors.New("transfer already closed")
  32. // BaseTransfer contains protocols common transfer details for an upload or a download.
  33. type BaseTransfer struct { //nolint:maligned
  34. ID int64
  35. BytesSent atomic.Int64
  36. BytesReceived atomic.Int64
  37. Fs vfs.Fs
  38. File vfs.File
  39. Connection *BaseConnection
  40. cancelFn func()
  41. fsPath string
  42. effectiveFsPath string
  43. requestPath string
  44. ftpMode string
  45. start time.Time
  46. MaxWriteSize int64
  47. MinWriteOffset int64
  48. InitialSize int64
  49. truncatedSize int64
  50. isNewFile bool
  51. transferType int
  52. AbortTransfer atomic.Bool
  53. aTime time.Time
  54. mTime time.Time
  55. transferQuota dataprovider.TransferQuota
  56. metadata map[string]string
  57. sync.Mutex
  58. errAbort error
  59. ErrTransfer error
  60. }
  61. // NewBaseTransfer returns a new BaseTransfer and adds it to the given connection
  62. func NewBaseTransfer(file vfs.File, conn *BaseConnection, cancelFn func(), fsPath, effectiveFsPath, requestPath string,
  63. transferType int, minWriteOffset, initialSize, maxWriteSize, truncatedSize int64, isNewFile bool, fs vfs.Fs,
  64. transferQuota dataprovider.TransferQuota,
  65. ) *BaseTransfer {
  66. t := &BaseTransfer{
  67. ID: conn.GetTransferID(),
  68. File: file,
  69. Connection: conn,
  70. cancelFn: cancelFn,
  71. fsPath: fsPath,
  72. effectiveFsPath: effectiveFsPath,
  73. start: time.Now(),
  74. transferType: transferType,
  75. MinWriteOffset: minWriteOffset,
  76. InitialSize: initialSize,
  77. isNewFile: isNewFile,
  78. requestPath: requestPath,
  79. MaxWriteSize: maxWriteSize,
  80. truncatedSize: truncatedSize,
  81. transferQuota: transferQuota,
  82. Fs: fs,
  83. }
  84. t.AbortTransfer.Store(false)
  85. t.BytesSent.Store(0)
  86. t.BytesReceived.Store(0)
  87. conn.AddTransfer(t)
  88. return t
  89. }
  90. // GetTransferQuota returns data transfer quota limits
  91. func (t *BaseTransfer) GetTransferQuota() dataprovider.TransferQuota {
  92. return t.transferQuota
  93. }
  94. // SetFtpMode sets the FTP mode for the current transfer
  95. func (t *BaseTransfer) SetFtpMode(mode string) {
  96. t.ftpMode = mode
  97. }
  98. // GetID returns the transfer ID
  99. func (t *BaseTransfer) GetID() int64 {
  100. return t.ID
  101. }
  102. // GetType returns the transfer type
  103. func (t *BaseTransfer) GetType() int {
  104. return t.transferType
  105. }
  106. // GetSize returns the transferred size
  107. func (t *BaseTransfer) GetSize() int64 {
  108. if t.transferType == TransferDownload {
  109. return t.BytesSent.Load()
  110. }
  111. return t.BytesReceived.Load()
  112. }
  113. // GetDownloadedSize returns the transferred size
  114. func (t *BaseTransfer) GetDownloadedSize() int64 {
  115. return t.BytesSent.Load()
  116. }
  117. // GetUploadedSize returns the transferred size
  118. func (t *BaseTransfer) GetUploadedSize() int64 {
  119. return t.BytesReceived.Load()
  120. }
  121. // GetStartTime returns the start time
  122. func (t *BaseTransfer) GetStartTime() time.Time {
  123. return t.start
  124. }
  125. // GetAbortError returns the error to send to the client if the transfer was aborted
  126. func (t *BaseTransfer) GetAbortError() error {
  127. t.Lock()
  128. defer t.Unlock()
  129. if t.errAbort != nil {
  130. return t.errAbort
  131. }
  132. return getQuotaExceededError(t.Connection.protocol)
  133. }
  134. // SignalClose signals that the transfer should be closed after the next read/write.
  135. // The optional error argument allow to send a specific error, otherwise a generic
  136. // transfer aborted error is sent
  137. func (t *BaseTransfer) SignalClose(err error) {
  138. t.Lock()
  139. t.errAbort = err
  140. t.Unlock()
  141. t.AbortTransfer.Store(true)
  142. }
  143. // GetTruncatedSize returns the truncated sized if this is an upload overwriting
  144. // an existing file
  145. func (t *BaseTransfer) GetTruncatedSize() int64 {
  146. return t.truncatedSize
  147. }
  148. // HasSizeLimit returns true if there is an upload or download size limit
  149. func (t *BaseTransfer) HasSizeLimit() bool {
  150. if t.MaxWriteSize > 0 {
  151. return true
  152. }
  153. if t.transferQuota.HasSizeLimits() {
  154. return true
  155. }
  156. return false
  157. }
  158. // GetVirtualPath returns the transfer virtual path
  159. func (t *BaseTransfer) GetVirtualPath() string {
  160. return t.requestPath
  161. }
  162. // GetFsPath returns the transfer filesystem path
  163. func (t *BaseTransfer) GetFsPath() string {
  164. return t.fsPath
  165. }
  166. // SetTimes stores access and modification times if fsPath matches the current file
  167. func (t *BaseTransfer) SetTimes(fsPath string, atime time.Time, mtime time.Time) bool {
  168. if fsPath == t.GetFsPath() {
  169. t.aTime = atime
  170. t.mTime = mtime
  171. return true
  172. }
  173. return false
  174. }
  175. // GetRealFsPath returns the real transfer filesystem path.
  176. // If atomic uploads are enabled this differ from fsPath
  177. func (t *BaseTransfer) GetRealFsPath(fsPath string) string {
  178. if fsPath == t.GetFsPath() {
  179. if t.File != nil || vfs.IsLocalOsFs(t.Fs) {
  180. return t.effectiveFsPath
  181. }
  182. return t.fsPath
  183. }
  184. return ""
  185. }
  186. // SetMetadata sets the metadata for the file
  187. func (t *BaseTransfer) SetMetadata(val map[string]string) {
  188. t.metadata = val
  189. }
  190. // SetCancelFn sets the cancel function for the transfer
  191. func (t *BaseTransfer) SetCancelFn(cancelFn func()) {
  192. t.cancelFn = cancelFn
  193. }
  194. // ConvertError accepts an error that occurs during a read or write and
  195. // converts it into a more understandable form for the client if it is a
  196. // well-known type of error
  197. func (t *BaseTransfer) ConvertError(err error) error {
  198. var pathError *fs.PathError
  199. if errors.As(err, &pathError) {
  200. return fmt.Errorf("%s %s: %s", pathError.Op, t.GetVirtualPath(), pathError.Err.Error())
  201. }
  202. return t.Connection.GetFsError(t.Fs, err)
  203. }
  204. // CheckRead returns an error if read if not allowed
  205. func (t *BaseTransfer) CheckRead() error {
  206. if t.transferQuota.AllowedDLSize == 0 && t.transferQuota.AllowedTotalSize == 0 {
  207. return nil
  208. }
  209. if t.transferQuota.AllowedTotalSize > 0 {
  210. if t.BytesSent.Load()+t.BytesReceived.Load() > t.transferQuota.AllowedTotalSize {
  211. return t.Connection.GetReadQuotaExceededError()
  212. }
  213. } else if t.transferQuota.AllowedDLSize > 0 {
  214. if t.BytesSent.Load() > t.transferQuota.AllowedDLSize {
  215. return t.Connection.GetReadQuotaExceededError()
  216. }
  217. }
  218. return nil
  219. }
  220. // CheckWrite returns an error if write if not allowed
  221. func (t *BaseTransfer) CheckWrite() error {
  222. if t.MaxWriteSize > 0 && t.BytesReceived.Load() > t.MaxWriteSize {
  223. return t.Connection.GetQuotaExceededError()
  224. }
  225. if t.transferQuota.AllowedULSize == 0 && t.transferQuota.AllowedTotalSize == 0 {
  226. return nil
  227. }
  228. if t.transferQuota.AllowedTotalSize > 0 {
  229. if t.BytesSent.Load()+t.BytesReceived.Load() > t.transferQuota.AllowedTotalSize {
  230. return t.Connection.GetQuotaExceededError()
  231. }
  232. } else if t.transferQuota.AllowedULSize > 0 {
  233. if t.BytesReceived.Load() > t.transferQuota.AllowedULSize {
  234. return t.Connection.GetQuotaExceededError()
  235. }
  236. }
  237. return nil
  238. }
  239. // Truncate changes the size of the opened file.
  240. // Supported for local fs only
  241. func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
  242. if fsPath == t.GetFsPath() {
  243. if t.File != nil {
  244. initialSize := t.InitialSize
  245. err := t.File.Truncate(size)
  246. if err == nil {
  247. t.Lock()
  248. t.InitialSize = size
  249. if t.MaxWriteSize > 0 {
  250. sizeDiff := initialSize - size
  251. t.MaxWriteSize += sizeDiff
  252. metric.TransferCompleted(t.BytesSent.Load(), t.BytesReceived.Load(),
  253. t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
  254. if t.transferQuota.HasSizeLimits() {
  255. go func(ulSize, dlSize int64, user dataprovider.User) {
  256. dataprovider.UpdateUserTransferQuota(&user, ulSize, dlSize, false) //nolint:errcheck
  257. }(t.BytesReceived.Load(), t.BytesSent.Load(), t.Connection.User)
  258. }
  259. t.BytesReceived.Store(0)
  260. }
  261. t.Unlock()
  262. }
  263. t.Connection.Log(logger.LevelDebug, "file %q truncated to size %v max write size %v new initial size %v err: %v",
  264. fsPath, size, t.MaxWriteSize, t.InitialSize, err)
  265. return initialSize, err
  266. }
  267. if size == 0 && t.BytesSent.Load() == 0 {
  268. // for cloud providers the file is always truncated to zero, we don't support append/resume for uploads.
  269. // For buffered SFTP and local fs we can have buffered bytes so we returns an error
  270. if !vfs.IsBufferedLocalOrSFTPFs(t.Fs) {
  271. return 0, nil
  272. }
  273. }
  274. return 0, vfs.ErrVfsUnsupported
  275. }
  276. return 0, errTransferMismatch
  277. }
  278. // TransferError is called if there is an unexpected error.
  279. // For example network or client issues
  280. func (t *BaseTransfer) TransferError(err error) {
  281. t.Lock()
  282. defer t.Unlock()
  283. if t.ErrTransfer != nil {
  284. return
  285. }
  286. t.ErrTransfer = err
  287. if t.cancelFn != nil {
  288. t.cancelFn()
  289. }
  290. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  291. t.Connection.Log(logger.LevelError, "Unexpected error for transfer, path: %q, error: \"%v\" bytes sent: %v, "+
  292. "bytes received: %v transfer running since %v ms", t.fsPath, t.ErrTransfer, t.BytesSent.Load(),
  293. t.BytesReceived.Load(), elapsed)
  294. }
  295. func (t *BaseTransfer) getUploadFileSize() (int64, int, error) {
  296. var fileSize int64
  297. var deletedFiles int
  298. info, err := t.Fs.Stat(t.fsPath)
  299. if err == nil {
  300. fileSize = info.Size()
  301. }
  302. if t.ErrTransfer != nil && vfs.IsCryptOsFs(t.Fs) {
  303. errDelete := t.Fs.Remove(t.fsPath, false)
  304. if errDelete != nil {
  305. t.Connection.Log(logger.LevelWarn, "error removing partial crypto file %q: %v", t.fsPath, errDelete)
  306. } else {
  307. fileSize = 0
  308. deletedFiles = 1
  309. t.BytesReceived.Store(0)
  310. t.MinWriteOffset = 0
  311. }
  312. }
  313. return fileSize, deletedFiles, err
  314. }
  315. // return 1 if the file is outside the user home dir
  316. func (t *BaseTransfer) checkUploadOutsideHomeDir(err error) int {
  317. if err == nil {
  318. return 0
  319. }
  320. if t.ErrTransfer == nil {
  321. t.ErrTransfer = err
  322. }
  323. if Config.TempPath == "" {
  324. return 0
  325. }
  326. err = t.Fs.Remove(t.effectiveFsPath, false)
  327. t.Connection.Log(logger.LevelWarn, "upload in temp path cannot be renamed, delete temporary file: %q, deletion error: %v",
  328. t.effectiveFsPath, err)
  329. // the file is outside the home dir so don't update the quota
  330. t.BytesReceived.Store(0)
  331. t.MinWriteOffset = 0
  332. return 1
  333. }
  334. // Close it is called when the transfer is completed.
  335. // It logs the transfer info, updates the user quota (for uploads)
  336. // and executes any defined action.
  337. // If there is an error no action will be executed and, in atomic mode,
  338. // we try to delete the temporary file
  339. func (t *BaseTransfer) Close() error {
  340. defer t.Connection.RemoveTransfer(t)
  341. var err error
  342. numFiles := t.getUploadedFiles()
  343. metric.TransferCompleted(t.BytesSent.Load(), t.BytesReceived.Load(),
  344. t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
  345. if t.transferQuota.HasSizeLimits() {
  346. dataprovider.UpdateUserTransferQuota(&t.Connection.User, t.BytesReceived.Load(), //nolint:errcheck
  347. t.BytesSent.Load(), false)
  348. }
  349. if (t.File != nil || vfs.IsLocalOsFs(t.Fs)) && t.Connection.IsQuotaExceededError(t.ErrTransfer) {
  350. // if quota is exceeded we try to remove the partial file for uploads to local filesystem
  351. err = t.Fs.Remove(t.effectiveFsPath, false)
  352. if err == nil {
  353. t.BytesReceived.Store(0)
  354. t.MinWriteOffset = 0
  355. }
  356. t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %q, deletion error: %v",
  357. t.effectiveFsPath, err)
  358. } else if t.isAtomicUpload() {
  359. if t.ErrTransfer == nil || Config.UploadMode&UploadModeAtomicWithResume != 0 {
  360. _, _, err = t.Fs.Rename(t.effectiveFsPath, t.fsPath)
  361. t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %q -> %q, error: %v",
  362. t.effectiveFsPath, t.fsPath, err)
  363. // the file must be removed if it is uploaded to a path outside the home dir and cannot be renamed
  364. t.checkUploadOutsideHomeDir(err)
  365. } else {
  366. err = t.Fs.Remove(t.effectiveFsPath, false)
  367. t.Connection.Log(logger.LevelWarn, "atomic upload completed with error: \"%v\", delete temporary file: %q, deletion error: %v",
  368. t.ErrTransfer, t.effectiveFsPath, err)
  369. if err == nil {
  370. t.BytesReceived.Store(0)
  371. t.MinWriteOffset = 0
  372. }
  373. }
  374. }
  375. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  376. var uploadFileSize int64
  377. if t.transferType == TransferDownload {
  378. logger.TransferLog(downloadLogSender, t.fsPath, elapsed, t.BytesSent.Load(), t.Connection.User.Username,
  379. t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode,
  380. t.ErrTransfer)
  381. ExecuteActionNotification(t.Connection, operationDownload, t.fsPath, t.requestPath, "", "", "", //nolint:errcheck
  382. t.BytesSent.Load(), t.ErrTransfer, elapsed, t.metadata)
  383. } else {
  384. statSize, deletedFiles, errStat := t.getUploadFileSize()
  385. if errStat == nil {
  386. uploadFileSize = statSize
  387. } else {
  388. uploadFileSize = t.BytesReceived.Load() + t.MinWriteOffset
  389. if t.Fs.IsNotExist(errStat) {
  390. uploadFileSize = 0
  391. numFiles--
  392. }
  393. }
  394. numFiles -= deletedFiles
  395. t.Connection.Log(logger.LevelDebug, "upload file size %d, num files %d, deleted files %d, fs path %q",
  396. uploadFileSize, numFiles, deletedFiles, t.fsPath)
  397. numFiles, uploadFileSize = t.executeUploadHook(numFiles, uploadFileSize, elapsed)
  398. t.updateQuota(numFiles, uploadFileSize)
  399. t.updateTimes()
  400. logger.TransferLog(uploadLogSender, t.fsPath, elapsed, t.BytesReceived.Load(), t.Connection.User.Username,
  401. t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode,
  402. t.ErrTransfer)
  403. }
  404. if t.ErrTransfer != nil {
  405. t.Connection.Log(logger.LevelError, "transfer error: %v, path: %q", t.ErrTransfer, t.fsPath)
  406. if err == nil {
  407. err = t.ErrTransfer
  408. }
  409. }
  410. t.updateTransferTimestamps(uploadFileSize, elapsed)
  411. return err
  412. }
  413. func (t *BaseTransfer) isAtomicUpload() bool {
  414. return t.transferType == TransferUpload && t.effectiveFsPath != t.fsPath
  415. }
  416. func (t *BaseTransfer) updateTransferTimestamps(uploadFileSize, elapsed int64) {
  417. if t.ErrTransfer != nil {
  418. return
  419. }
  420. if t.transferType == TransferUpload {
  421. if t.Connection.User.FirstUpload == 0 && !t.Connection.uploadDone.Load() {
  422. if err := dataprovider.UpdateUserTransferTimestamps(t.Connection.User.Username, true); err == nil {
  423. t.Connection.uploadDone.Store(true)
  424. ExecuteActionNotification(t.Connection, operationFirstUpload, t.fsPath, t.requestPath, "", //nolint:errcheck
  425. "", "", uploadFileSize, t.ErrTransfer, elapsed, t.metadata)
  426. }
  427. }
  428. return
  429. }
  430. if t.Connection.User.FirstDownload == 0 && !t.Connection.downloadDone.Load() && t.BytesSent.Load() > 0 {
  431. if err := dataprovider.UpdateUserTransferTimestamps(t.Connection.User.Username, false); err == nil {
  432. t.Connection.downloadDone.Store(true)
  433. ExecuteActionNotification(t.Connection, operationFirstDownload, t.fsPath, t.requestPath, "", //nolint:errcheck
  434. "", "", t.BytesSent.Load(), t.ErrTransfer, elapsed, t.metadata)
  435. }
  436. }
  437. }
  438. func (t *BaseTransfer) executeUploadHook(numFiles int, fileSize, elapsed int64) (int, int64) {
  439. err := ExecuteActionNotification(t.Connection, operationUpload, t.fsPath, t.requestPath, "", "", "",
  440. fileSize, t.ErrTransfer, elapsed, t.metadata)
  441. if err != nil {
  442. if t.ErrTransfer == nil {
  443. t.ErrTransfer = err
  444. }
  445. // try to remove the uploaded file
  446. err = t.Fs.Remove(t.fsPath, false)
  447. if err == nil {
  448. numFiles--
  449. fileSize = 0
  450. t.BytesReceived.Store(0)
  451. t.MinWriteOffset = 0
  452. } else {
  453. t.Connection.Log(logger.LevelWarn, "unable to remove path %q after upload hook failure: %v", t.fsPath, err)
  454. }
  455. }
  456. return numFiles, fileSize
  457. }
  458. func (t *BaseTransfer) getUploadedFiles() int {
  459. numFiles := 0
  460. if t.isNewFile {
  461. numFiles = 1
  462. }
  463. return numFiles
  464. }
  465. func (t *BaseTransfer) updateTimes() {
  466. if !t.aTime.IsZero() && !t.mTime.IsZero() {
  467. err := t.Fs.Chtimes(t.fsPath, t.aTime, t.mTime, false)
  468. t.Connection.Log(logger.LevelDebug, "set times for file %q, atime: %v, mtime: %v, err: %v",
  469. t.fsPath, t.aTime, t.mTime, err)
  470. }
  471. }
  472. func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
  473. // Uploads on some filesystem (S3 and similar) are atomic, if there is an error nothing is uploaded
  474. if t.File == nil && t.ErrTransfer != nil && vfs.HasImplicitAtomicUploads(t.Fs) {
  475. return false
  476. }
  477. sizeDiff := fileSize - t.InitialSize
  478. if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff != 0) {
  479. vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
  480. if err == nil {
  481. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
  482. sizeDiff, false)
  483. if vfolder.IsIncludedInUserQuota() {
  484. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  485. }
  486. } else {
  487. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  488. }
  489. return true
  490. }
  491. return false
  492. }
  493. // HandleThrottle manage bandwidth throttling
  494. func (t *BaseTransfer) HandleThrottle() {
  495. var wantedBandwidth int64
  496. var trasferredBytes int64
  497. if t.transferType == TransferDownload {
  498. wantedBandwidth = t.Connection.User.DownloadBandwidth
  499. trasferredBytes = t.BytesSent.Load()
  500. } else {
  501. wantedBandwidth = t.Connection.User.UploadBandwidth
  502. trasferredBytes = t.BytesReceived.Load()
  503. }
  504. if wantedBandwidth > 0 {
  505. // real and wanted elapsed as milliseconds, bytes as kilobytes
  506. realElapsed := time.Since(t.start).Nanoseconds() / 1000000
  507. // trasferredBytes / 1024 = KB/s, we multiply for 1000 to get milliseconds
  508. wantedElapsed := 1000 * (trasferredBytes / 1024) / wantedBandwidth
  509. if wantedElapsed > realElapsed {
  510. toSleep := time.Duration(wantedElapsed - realElapsed)
  511. time.Sleep(toSleep * time.Millisecond)
  512. }
  513. }
  514. }