transfer.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package sftpd
  2. import (
  3. "fmt"
  4. "os"
  5. "time"
  6. "github.com/drakkan/sftpgo/dataprovider"
  7. "github.com/drakkan/sftpgo/logger"
  8. "github.com/drakkan/sftpgo/metrics"
  9. )
  10. const (
  11. transferUpload = iota
  12. transferDownload
  13. )
  14. // Transfer contains the transfer details for an upload or a download.
  15. // It implements the io Reader and Writer interface to handle files downloads and uploads
  16. type Transfer struct {
  17. file *os.File
  18. path string
  19. start time.Time
  20. bytesSent int64
  21. bytesReceived int64
  22. user dataprovider.User
  23. connectionID string
  24. transferType int
  25. lastActivity time.Time
  26. isNewFile bool
  27. protocol string
  28. transferError error
  29. isFinished bool
  30. minWriteOffset int64
  31. }
  32. // TransferError is called if there is an unexpected error.
  33. // For example network or client issues
  34. func (t *Transfer) TransferError(err error) {
  35. t.transferError = err
  36. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  37. logger.Warn(logSender, t.connectionID, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
  38. "bytes received: %v transfer running since %v ms", t.path, t.transferError, t.bytesSent, t.bytesReceived, elapsed)
  39. }
  40. // ReadAt reads len(p) bytes from the File to download starting at byte offset off and updates the bytes sent.
  41. // It handles download bandwidth throttling too
  42. func (t *Transfer) ReadAt(p []byte, off int64) (n int, err error) {
  43. t.lastActivity = time.Now()
  44. readed, e := t.file.ReadAt(p, off)
  45. t.bytesSent += int64(readed)
  46. t.handleThrottle()
  47. return readed, e
  48. }
  49. // WriteAt writes len(p) bytes to the uploaded file starting at byte offset off and updates the bytes received.
  50. // It handles upload bandwidth throttling too
  51. func (t *Transfer) WriteAt(p []byte, off int64) (n int, err error) {
  52. t.lastActivity = time.Now()
  53. if off < t.minWriteOffset {
  54. logger.Warn(logSender, t.connectionID, "Invalid write offset %v minimum valid value %v", off, t.minWriteOffset)
  55. return 0, fmt.Errorf("Invalid write offset %v", off)
  56. }
  57. written, e := t.file.WriteAt(p, off)
  58. t.bytesReceived += int64(written)
  59. t.handleThrottle()
  60. return written, e
  61. }
  62. // Close it is called when the transfer is completed.
  63. // It closes the underlying file, log the transfer info, update the user quota (for uploads)
  64. // and execute any defined actions.
  65. // If there is an error no action will be executed and, in atomic mode, we try to delete
  66. // the temporary file
  67. func (t *Transfer) Close() error {
  68. err := t.file.Close()
  69. if t.isFinished {
  70. return err
  71. }
  72. t.isFinished = true
  73. numFiles := 0
  74. if t.isNewFile {
  75. numFiles = 1
  76. }
  77. if t.transferType == transferUpload && t.file.Name() != t.path {
  78. if t.transferError == nil || uploadMode == uploadModeAtomicWithResume {
  79. err = os.Rename(t.file.Name(), t.path)
  80. logger.Debug(logSender, t.connectionID, "atomic upload completed, rename: %#v -> %#v, error: %v",
  81. t.file.Name(), t.path, err)
  82. } else {
  83. err = os.Remove(t.file.Name())
  84. logger.Warn(logSender, t.connectionID, "atomic upload completed with error: \"%v\", delete temporary file: %#v, "+
  85. "deletion error: %v", t.transferError, t.file.Name(), err)
  86. if err == nil {
  87. numFiles--
  88. t.bytesReceived = 0
  89. }
  90. }
  91. }
  92. if t.transferError == nil {
  93. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  94. if t.transferType == transferDownload {
  95. logger.TransferLog(downloadLogSender, t.path, elapsed, t.bytesSent, t.user.Username, t.connectionID, t.protocol)
  96. executeAction(operationDownload, t.user.Username, t.path, "")
  97. } else {
  98. logger.TransferLog(uploadLogSender, t.path, elapsed, t.bytesReceived, t.user.Username, t.connectionID, t.protocol)
  99. executeAction(operationUpload, t.user.Username, t.path, "")
  100. }
  101. }
  102. metrics.TransferCompleted(t.bytesSent, t.bytesReceived, t.transferType, t.transferError)
  103. removeTransfer(t)
  104. if t.transferType == transferUpload && (numFiles != 0 || t.bytesReceived > 0) {
  105. dataprovider.UpdateUserQuota(dataProvider, t.user, numFiles, t.bytesReceived, false)
  106. }
  107. return err
  108. }
  109. func (t *Transfer) handleThrottle() {
  110. var wantedBandwidth int64
  111. var trasferredBytes int64
  112. if t.transferType == transferDownload {
  113. wantedBandwidth = t.user.DownloadBandwidth
  114. trasferredBytes = t.bytesSent
  115. } else {
  116. wantedBandwidth = t.user.UploadBandwidth
  117. trasferredBytes = t.bytesReceived
  118. }
  119. if wantedBandwidth > 0 {
  120. // real and wanted elapsed as milliseconds, bytes as kilobytes
  121. realElapsed := time.Since(t.start).Nanoseconds() / 1000000
  122. // trasferredBytes / 1000 = KB/s, we multiply for 1000 to get milliseconds
  123. wantedElapsed := 1000 * (trasferredBytes / 1000) / wantedBandwidth
  124. if wantedElapsed > realElapsed {
  125. toSleep := time.Duration(wantedElapsed - realElapsed)
  126. time.Sleep(toSleep * time.Millisecond)
  127. }
  128. }
  129. }