transfer.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. // Copyright (C) 2019 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "errors"
  17. "path"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  22. "github.com/drakkan/sftpgo/v2/internal/logger"
  23. "github.com/drakkan/sftpgo/v2/internal/metric"
  24. "github.com/drakkan/sftpgo/v2/internal/vfs"
  25. )
  26. var (
  27. // ErrTransferClosed defines the error returned for a closed transfer
  28. ErrTransferClosed = errors.New("transfer already closed")
  29. )
// BaseTransfer contains the transfer details, common to all protocols, for an
// upload or a download.
// The embedded mutex is taken by GetAbortError/SignalClose (errAbort), by
// TransferError (ErrTransfer, cancelFn) and by Truncate (InitialSize,
// MaxWriteSize); the byte counters and the abort flag are atomics.
type BaseTransfer struct { //nolint:maligned
	// ID is the transfer identifier obtained from the connection at creation
	ID int64
	// BytesSent counts the bytes sent to the client (downloads)
	BytesSent atomic.Int64
	// BytesReceived counts the bytes received from the client (uploads)
	BytesReceived atomic.Int64
	// Fs is the filesystem the transferred file belongs to
	Fs vfs.Fs
	// File is the opened file, it can be nil (the nil case is handled in
	// Truncate, GetRealFsPath, Close and updateQuota)
	File vfs.File
	// Connection is the connection that owns this transfer
	Connection *BaseConnection
	// cancelFn, if not nil, is invoked by TransferError when the first error is recorded
	cancelFn func()
	// fsPath is the final filesystem path of the transferred file
	fsPath string
	// effectiveFsPath is the path actually written to, it differs from fsPath
	// for atomic uploads (see isAtomicUpload)
	effectiveFsPath string
	// requestPath is the virtual path, as returned by GetVirtualPath
	requestPath string
	// ftpMode is set using SetFtpMode and reported in the transfer log
	ftpMode string
	// start is the time at which the transfer was created
	start time.Time
	// MaxWriteSize, if greater than zero, is the maximum number of bytes that
	// can be received before CheckWrite returns a quota exceeded error
	MaxWriteSize int64
	// MinWriteOffset is added to BytesReceived to estimate the uploaded file
	// size when the file cannot be stat-ed on Close
	MinWriteOffset int64
	// InitialSize is subtracted from the final file size to compute the quota
	// size difference for an upload
	InitialSize int64
	// truncatedSize is returned as is by GetTruncatedSize
	truncatedSize int64
	// isNewFile reports whether the upload counts as one new file for the quota
	isNewFile bool
	// transferType is compared against TransferUpload/TransferDownload
	transferType int
	// AbortTransfer is set to true by SignalClose
	AbortTransfer atomic.Bool
	// aTime and mTime, if both set using SetTimes, are applied to the uploaded
	// file on Close
	aTime time.Time
	mTime time.Time
	// transferQuota holds the data transfer limits checked by CheckRead/CheckWrite
	transferQuota dataprovider.TransferQuota
	// metadata is set using SetMetadata and passed to the action notifications
	metadata map[string]string
	sync.Mutex
	// errAbort is the error stored by SignalClose and returned by GetAbortError
	errAbort error
	// ErrTransfer is the first error recorded for the transfer, if any
	ErrTransfer error
}
  59. // NewBaseTransfer returns a new BaseTransfer and adds it to the given connection
  60. func NewBaseTransfer(file vfs.File, conn *BaseConnection, cancelFn func(), fsPath, effectiveFsPath, requestPath string,
  61. transferType int, minWriteOffset, initialSize, maxWriteSize, truncatedSize int64, isNewFile bool, fs vfs.Fs,
  62. transferQuota dataprovider.TransferQuota,
  63. ) *BaseTransfer {
  64. t := &BaseTransfer{
  65. ID: conn.GetTransferID(),
  66. File: file,
  67. Connection: conn,
  68. cancelFn: cancelFn,
  69. fsPath: fsPath,
  70. effectiveFsPath: effectiveFsPath,
  71. start: time.Now(),
  72. transferType: transferType,
  73. MinWriteOffset: minWriteOffset,
  74. InitialSize: initialSize,
  75. isNewFile: isNewFile,
  76. requestPath: requestPath,
  77. MaxWriteSize: maxWriteSize,
  78. truncatedSize: truncatedSize,
  79. transferQuota: transferQuota,
  80. Fs: fs,
  81. }
  82. t.AbortTransfer.Store(false)
  83. t.BytesSent.Store(0)
  84. t.BytesReceived.Store(0)
  85. conn.AddTransfer(t)
  86. return t
  87. }
// GetTransferQuota returns the data transfer quota limits assigned to this
// transfer at creation
func (t *BaseTransfer) GetTransferQuota() dataprovider.TransferQuota {
	return t.transferQuota
}
// SetFtpMode sets the FTP mode for the current transfer.
// The value is reported in the transfer log written by Close
func (t *BaseTransfer) SetFtpMode(mode string) {
	t.ftpMode = mode
}
// GetID returns the transfer ID, as obtained from the connection when the
// transfer was created
func (t *BaseTransfer) GetID() int64 {
	return t.ID
}
// GetType returns the transfer type given at creation (for example
// TransferUpload or TransferDownload)
func (t *BaseTransfer) GetType() int {
	return t.transferType
}
  104. // GetSize returns the transferred size
  105. func (t *BaseTransfer) GetSize() int64 {
  106. if t.transferType == TransferDownload {
  107. return t.BytesSent.Load()
  108. }
  109. return t.BytesReceived.Load()
  110. }
// GetDownloadedSize returns the number of bytes sent so far, regardless of
// the transfer type
func (t *BaseTransfer) GetDownloadedSize() int64 {
	return t.BytesSent.Load()
}
// GetUploadedSize returns the number of bytes received so far, regardless of
// the transfer type
func (t *BaseTransfer) GetUploadedSize() int64 {
	return t.BytesReceived.Load()
}
// GetStartTime returns the start time, this is the time at which the
// transfer was created
func (t *BaseTransfer) GetStartTime() time.Time {
	return t.start
}
  123. // GetAbortError returns the error to send to the client if the transfer was aborted
  124. func (t *BaseTransfer) GetAbortError() error {
  125. t.Lock()
  126. defer t.Unlock()
  127. if t.errAbort != nil {
  128. return t.errAbort
  129. }
  130. return getQuotaExceededError(t.Connection.protocol)
  131. }
  132. // SignalClose signals that the transfer should be closed after the next read/write.
  133. // The optional error argument allow to send a specific error, otherwise a generic
  134. // transfer aborted error is sent
  135. func (t *BaseTransfer) SignalClose(err error) {
  136. t.Lock()
  137. t.errAbort = err
  138. t.Unlock()
  139. t.AbortTransfer.Store(true)
  140. }
// GetTruncatedSize returns the truncated size if this is an upload overwriting
// an existing file. The value is the one given when the transfer was created
func (t *BaseTransfer) GetTruncatedSize() int64 {
	return t.truncatedSize
}
  146. // HasSizeLimit returns true if there is an upload or download size limit
  147. func (t *BaseTransfer) HasSizeLimit() bool {
  148. if t.MaxWriteSize > 0 {
  149. return true
  150. }
  151. if t.transferQuota.HasSizeLimits() {
  152. return true
  153. }
  154. return false
  155. }
// GetVirtualPath returns the transfer virtual path, this is the request path
// given when the transfer was created
func (t *BaseTransfer) GetVirtualPath() string {
	return t.requestPath
}
// GetFsPath returns the transfer filesystem path. For atomic uploads the path
// written to is effectiveFsPath, see GetRealFsPath
func (t *BaseTransfer) GetFsPath() string {
	return t.fsPath
}
  164. // SetTimes stores access and modification times if fsPath matches the current file
  165. func (t *BaseTransfer) SetTimes(fsPath string, atime time.Time, mtime time.Time) bool {
  166. if fsPath == t.GetFsPath() {
  167. t.aTime = atime
  168. t.mTime = mtime
  169. return true
  170. }
  171. return false
  172. }
  173. // GetRealFsPath returns the real transfer filesystem path.
  174. // If atomic uploads are enabled this differ from fsPath
  175. func (t *BaseTransfer) GetRealFsPath(fsPath string) string {
  176. if fsPath == t.GetFsPath() {
  177. if t.File != nil || vfs.IsLocalOsFs(t.Fs) {
  178. return t.effectiveFsPath
  179. }
  180. return t.fsPath
  181. }
  182. return ""
  183. }
// SetMetadata sets the metadata for the file. The map is stored as is, not
// copied, and it is passed to the action notifications executed on Close
func (t *BaseTransfer) SetMetadata(val map[string]string) {
	t.metadata = val
}
// SetCancelFn sets the cancel function for the transfer. If not nil, it is
// invoked by TransferError when the first transfer error is recorded
func (t *BaseTransfer) SetCancelFn(cancelFn func()) {
	t.cancelFn = cancelFn
}
  192. // ConvertError accepts an error that occurs during a read or write and
  193. // converts it into a more understandable form for the client if it is a
  194. // well-known type of error
  195. func (t *BaseTransfer) ConvertError(err error) error {
  196. if t.Fs.IsNotExist(err) {
  197. return t.Connection.GetNotExistError()
  198. } else if t.Fs.IsPermission(err) {
  199. return t.Connection.GetPermissionDeniedError()
  200. }
  201. return err
  202. }
  203. // CheckRead returns an error if read if not allowed
  204. func (t *BaseTransfer) CheckRead() error {
  205. if t.transferQuota.AllowedDLSize == 0 && t.transferQuota.AllowedTotalSize == 0 {
  206. return nil
  207. }
  208. if t.transferQuota.AllowedTotalSize > 0 {
  209. if t.BytesSent.Load()+t.BytesReceived.Load() > t.transferQuota.AllowedTotalSize {
  210. return t.Connection.GetReadQuotaExceededError()
  211. }
  212. } else if t.transferQuota.AllowedDLSize > 0 {
  213. if t.BytesSent.Load() > t.transferQuota.AllowedDLSize {
  214. return t.Connection.GetReadQuotaExceededError()
  215. }
  216. }
  217. return nil
  218. }
  219. // CheckWrite returns an error if write if not allowed
  220. func (t *BaseTransfer) CheckWrite() error {
  221. if t.MaxWriteSize > 0 && t.BytesReceived.Load() > t.MaxWriteSize {
  222. return t.Connection.GetQuotaExceededError()
  223. }
  224. if t.transferQuota.AllowedULSize == 0 && t.transferQuota.AllowedTotalSize == 0 {
  225. return nil
  226. }
  227. if t.transferQuota.AllowedTotalSize > 0 {
  228. if t.BytesSent.Load()+t.BytesReceived.Load() > t.transferQuota.AllowedTotalSize {
  229. return t.Connection.GetQuotaExceededError()
  230. }
  231. } else if t.transferQuota.AllowedULSize > 0 {
  232. if t.BytesReceived.Load() > t.transferQuota.AllowedULSize {
  233. return t.Connection.GetQuotaExceededError()
  234. }
  235. }
  236. return nil
  237. }
  238. // Truncate changes the size of the opened file.
  239. // Supported for local fs only
  240. func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
  241. if fsPath == t.GetFsPath() {
  242. if t.File != nil {
  243. initialSize := t.InitialSize
  244. err := t.File.Truncate(size)
  245. if err == nil {
  246. t.Lock()
  247. t.InitialSize = size
  248. if t.MaxWriteSize > 0 {
  249. sizeDiff := initialSize - size
  250. t.MaxWriteSize += sizeDiff
  251. metric.TransferCompleted(t.BytesSent.Load(), t.BytesReceived.Load(),
  252. t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
  253. if t.transferQuota.HasSizeLimits() {
  254. go func(ulSize, dlSize int64, user dataprovider.User) {
  255. dataprovider.UpdateUserTransferQuota(&user, ulSize, dlSize, false) //nolint:errcheck
  256. }(t.BytesReceived.Load(), t.BytesSent.Load(), t.Connection.User)
  257. }
  258. t.BytesReceived.Store(0)
  259. }
  260. t.Unlock()
  261. }
  262. t.Connection.Log(logger.LevelDebug, "file %q truncated to size %v max write size %v new initial size %v err: %v",
  263. fsPath, size, t.MaxWriteSize, t.InitialSize, err)
  264. return initialSize, err
  265. }
  266. if size == 0 && t.BytesSent.Load() == 0 {
  267. // for cloud providers the file is always truncated to zero, we don't support append/resume for uploads.
  268. // For buffered SFTP and local fs we can have buffered bytes so we returns an error
  269. if !vfs.IsBufferedLocalOrSFTPFs(t.Fs) {
  270. return 0, nil
  271. }
  272. }
  273. return 0, vfs.ErrVfsUnsupported
  274. }
  275. return 0, errTransferMismatch
  276. }
  277. // TransferError is called if there is an unexpected error.
  278. // For example network or client issues
  279. func (t *BaseTransfer) TransferError(err error) {
  280. t.Lock()
  281. defer t.Unlock()
  282. if t.ErrTransfer != nil {
  283. return
  284. }
  285. t.ErrTransfer = err
  286. if t.cancelFn != nil {
  287. t.cancelFn()
  288. }
  289. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  290. t.Connection.Log(logger.LevelError, "Unexpected error for transfer, path: %q, error: \"%v\" bytes sent: %v, "+
  291. "bytes received: %v transfer running since %v ms", t.fsPath, t.ErrTransfer, t.BytesSent.Load(),
  292. t.BytesReceived.Load(), elapsed)
  293. }
  294. func (t *BaseTransfer) getUploadFileSize() (int64, int, error) {
  295. var fileSize int64
  296. var deletedFiles int
  297. info, err := t.Fs.Stat(t.fsPath)
  298. if err == nil {
  299. fileSize = info.Size()
  300. }
  301. if t.ErrTransfer != nil && vfs.IsCryptOsFs(t.Fs) {
  302. errDelete := t.Fs.Remove(t.fsPath, false)
  303. if errDelete != nil {
  304. t.Connection.Log(logger.LevelWarn, "error removing partial crypto file %q: %v", t.fsPath, errDelete)
  305. } else {
  306. fileSize = 0
  307. deletedFiles = 1
  308. t.BytesReceived.Store(0)
  309. t.MinWriteOffset = 0
  310. }
  311. }
  312. return fileSize, deletedFiles, err
  313. }
  314. // return 1 if the file is outside the user home dir
  315. func (t *BaseTransfer) checkUploadOutsideHomeDir(err error) int {
  316. if err == nil {
  317. return 0
  318. }
  319. if Config.TempPath == "" {
  320. return 0
  321. }
  322. err = t.Fs.Remove(t.effectiveFsPath, false)
  323. t.Connection.Log(logger.LevelWarn, "upload in temp path cannot be renamed, delete temporary file: %q, deletion error: %v",
  324. t.effectiveFsPath, err)
  325. // the file is outside the home dir so don't update the quota
  326. t.BytesReceived.Store(0)
  327. t.MinWriteOffset = 0
  328. return 1
  329. }
// Close is called when the transfer is completed.
// It records the transfer metrics, updates the user transfer quota if size
// limits are set, logs the transfer info, updates the user quota (for uploads)
// and executes any defined action.
// If the quota is exceeded for a transfer with an opened file or on the local
// filesystem, the partial file is removed. In atomic mode the temporary file
// is renamed to the final path if there is no error or the atomic with resume
// upload mode is enabled, otherwise we try to delete the temporary file.
// The returned error is the one from the remove/rename step, if any, otherwise
// the transfer error. The transfer is removed from the connection on return
func (t *BaseTransfer) Close() error {
	defer t.Connection.RemoveTransfer(t)

	var err error
	numFiles := t.getUploadedFiles()
	metric.TransferCompleted(t.BytesSent.Load(), t.BytesReceived.Load(),
		t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
	if t.transferQuota.HasSizeLimits() {
		dataprovider.UpdateUserTransferQuota(&t.Connection.User, t.BytesReceived.Load(), //nolint:errcheck
			t.BytesSent.Load(), false)
	}
	if (t.File != nil || vfs.IsLocalOsFs(t.Fs)) && t.Connection.IsQuotaExceededError(t.ErrTransfer) {
		// if quota is exceeded we try to remove the partial file for uploads to local filesystem
		err = t.Fs.Remove(t.effectiveFsPath, false)
		if err == nil {
			// nothing is left on disk, so nothing must be accounted for this upload
			t.BytesReceived.Store(0)
			t.MinWriteOffset = 0
		}
		t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %q, deletion error: %v",
			t.effectiveFsPath, err)
	} else if t.isAtomicUpload() {
		if t.ErrTransfer == nil || Config.UploadMode&UploadModeAtomicWithResume != 0 {
			_, _, err = t.Fs.Rename(t.effectiveFsPath, t.fsPath)
			t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %q -> %q, error: %v",
				t.effectiveFsPath, t.fsPath, err)
			// the file must be removed if it is uploaded to a path outside the home dir and cannot be renamed
			t.checkUploadOutsideHomeDir(err)
		} else {
			err = t.Fs.Remove(t.effectiveFsPath, false)
			t.Connection.Log(logger.LevelWarn, "atomic upload completed with error: \"%v\", delete temporary file: %q, deletion error: %v",
				t.ErrTransfer, t.effectiveFsPath, err)
			if err == nil {
				t.BytesReceived.Store(0)
				t.MinWriteOffset = 0
			}
		}
	}
	// elapsed time in milliseconds
	elapsed := time.Since(t.start).Nanoseconds() / 1000000
	var uploadFileSize int64
	if t.transferType == TransferDownload {
		logger.TransferLog(downloadLogSender, t.fsPath, elapsed, t.BytesSent.Load(), t.Connection.User.Username,
			t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
		ExecuteActionNotification(t.Connection, operationDownload, t.fsPath, t.requestPath, "", "", "", //nolint:errcheck
			t.BytesSent.Load(), t.ErrTransfer, elapsed, t.metadata)
	} else {
		statSize, deletedFiles, errStat := t.getUploadFileSize()
		if errStat == nil {
			uploadFileSize = statSize
		} else {
			// the size cannot be read from the filesystem, estimate it from the received bytes
			uploadFileSize = t.BytesReceived.Load() + t.MinWriteOffset
			if t.Fs.IsNotExist(errStat) {
				// the file does not exist, so it must not be counted
				uploadFileSize = 0
				numFiles--
			}
		}
		numFiles -= deletedFiles
		t.Connection.Log(logger.LevelDebug, "upload file size %d, num files %d, deleted files %d, fs path %q",
			uploadFileSize, numFiles, deletedFiles, t.fsPath)
		// the hook can fail, in this case we try to remove the uploaded file and the counters are adjusted
		numFiles, uploadFileSize = t.executeUploadHook(numFiles, uploadFileSize, elapsed)
		t.updateQuota(numFiles, uploadFileSize)
		t.updateTimes()
		logger.TransferLog(uploadLogSender, t.fsPath, elapsed, t.BytesReceived.Load(), t.Connection.User.Username,
			t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
	}
	if t.ErrTransfer != nil {
		t.Connection.Log(logger.LevelError, "transfer error: %v, path: %q", t.ErrTransfer, t.fsPath)
		// an error from the remove/rename step takes precedence over the transfer error
		if err == nil {
			err = t.ErrTransfer
		}
	}
	t.updateTransferTimestamps(uploadFileSize, elapsed)
	return err
}
// isAtomicUpload returns true if this is an upload written to a path that
// differs from the final filesystem path
func (t *BaseTransfer) isAtomicUpload() bool {
	return t.transferType == TransferUpload && t.effectiveFsPath != t.fsPath
}
  410. func (t *BaseTransfer) updateTransferTimestamps(uploadFileSize, elapsed int64) {
  411. if t.ErrTransfer != nil {
  412. return
  413. }
  414. if t.transferType == TransferUpload {
  415. if t.Connection.User.FirstUpload == 0 && !t.Connection.uploadDone.Load() {
  416. if err := dataprovider.UpdateUserTransferTimestamps(t.Connection.User.Username, true); err == nil {
  417. t.Connection.uploadDone.Store(true)
  418. ExecuteActionNotification(t.Connection, operationFirstUpload, t.fsPath, t.requestPath, "", //nolint:errcheck
  419. "", "", uploadFileSize, t.ErrTransfer, elapsed, t.metadata)
  420. }
  421. }
  422. return
  423. }
  424. if t.Connection.User.FirstDownload == 0 && !t.Connection.downloadDone.Load() && t.BytesSent.Load() > 0 {
  425. if err := dataprovider.UpdateUserTransferTimestamps(t.Connection.User.Username, false); err == nil {
  426. t.Connection.downloadDone.Store(true)
  427. ExecuteActionNotification(t.Connection, operationFirstDownload, t.fsPath, t.requestPath, "", //nolint:errcheck
  428. "", "", t.BytesSent.Load(), t.ErrTransfer, elapsed, t.metadata)
  429. }
  430. }
  431. }
  432. func (t *BaseTransfer) executeUploadHook(numFiles int, fileSize, elapsed int64) (int, int64) {
  433. err := ExecuteActionNotification(t.Connection, operationUpload, t.fsPath, t.requestPath, "", "", "",
  434. fileSize, t.ErrTransfer, elapsed, t.metadata)
  435. if err != nil {
  436. if t.ErrTransfer == nil {
  437. t.ErrTransfer = err
  438. }
  439. // try to remove the uploaded file
  440. err = t.Fs.Remove(t.fsPath, false)
  441. if err == nil {
  442. numFiles--
  443. fileSize = 0
  444. t.BytesReceived.Store(0)
  445. t.MinWriteOffset = 0
  446. } else {
  447. t.Connection.Log(logger.LevelWarn, "unable to remove path %q after upload hook failure: %v", t.fsPath, err)
  448. }
  449. }
  450. return numFiles, fileSize
  451. }
  452. func (t *BaseTransfer) getUploadedFiles() int {
  453. numFiles := 0
  454. if t.isNewFile {
  455. numFiles = 1
  456. }
  457. return numFiles
  458. }
  459. func (t *BaseTransfer) updateTimes() {
  460. if !t.aTime.IsZero() && !t.mTime.IsZero() {
  461. err := t.Fs.Chtimes(t.fsPath, t.aTime, t.mTime, true)
  462. t.Connection.Log(logger.LevelDebug, "set times for file %q, atime: %v, mtime: %v, err: %v",
  463. t.fsPath, t.aTime, t.mTime, err)
  464. }
  465. }
  466. func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
  467. // Uploads on some filesystem (S3 and similar) are atomic, if there is an error nothing is uploaded
  468. if t.File == nil && t.ErrTransfer != nil && vfs.HasImplicitAtomicUploads(t.Fs) {
  469. return false
  470. }
  471. sizeDiff := fileSize - t.InitialSize
  472. if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff != 0) {
  473. vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
  474. if err == nil {
  475. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
  476. sizeDiff, false)
  477. if vfolder.IsIncludedInUserQuota() {
  478. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  479. }
  480. } else {
  481. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  482. }
  483. return true
  484. }
  485. return false
  486. }
  487. // HandleThrottle manage bandwidth throttling
  488. func (t *BaseTransfer) HandleThrottle() {
  489. var wantedBandwidth int64
  490. var trasferredBytes int64
  491. if t.transferType == TransferDownload {
  492. wantedBandwidth = t.Connection.User.DownloadBandwidth
  493. trasferredBytes = t.BytesSent.Load()
  494. } else {
  495. wantedBandwidth = t.Connection.User.UploadBandwidth
  496. trasferredBytes = t.BytesReceived.Load()
  497. }
  498. if wantedBandwidth > 0 {
  499. // real and wanted elapsed as milliseconds, bytes as kilobytes
  500. realElapsed := time.Since(t.start).Nanoseconds() / 1000000
  501. // trasferredBytes / 1024 = KB/s, we multiply for 1000 to get milliseconds
  502. wantedElapsed := 1000 * (trasferredBytes / 1024) / wantedBandwidth
  503. if wantedElapsed > realElapsed {
  504. toSleep := time.Duration(wantedElapsed - realElapsed)
  505. time.Sleep(toSleep * time.Millisecond)
  506. }
  507. }
  508. }