transfer.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package common
  2. import (
  3. "errors"
  4. "os"
  5. "path"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/drakkan/sftpgo/dataprovider"
  10. "github.com/drakkan/sftpgo/logger"
  11. "github.com/drakkan/sftpgo/metrics"
  12. "github.com/drakkan/sftpgo/vfs"
  13. )
  14. var (
  15. // ErrTransferClosed defines the error returned for a closed transfer
  16. ErrTransferClosed = errors.New("transfer already closed")
  17. )
  18. // BaseTransfer contains protocols common transfer details for an upload or a download.
  19. type BaseTransfer struct { //nolint:maligned
  20. ID uint64
  21. Fs vfs.Fs
  22. File *os.File
  23. Connection *BaseConnection
  24. cancelFn func()
  25. fsPath string
  26. start time.Time
  27. transferType int
  28. MinWriteOffset int64
  29. InitialSize int64
  30. isNewFile bool
  31. requestPath string
  32. BytesSent int64
  33. BytesReceived int64
  34. MaxWriteSize int64
  35. AbortTransfer int32
  36. sync.Mutex
  37. ErrTransfer error
  38. }
  39. // NewBaseTransfer returns a new BaseTransfer and adds it to the given connection
  40. func NewBaseTransfer(file *os.File, conn *BaseConnection, cancelFn func(), fsPath, requestPath string, transferType int,
  41. minWriteOffset, initialSize, maxWriteSize int64, isNewFile bool, fs vfs.Fs) *BaseTransfer {
  42. t := &BaseTransfer{
  43. ID: conn.GetTransferID(),
  44. File: file,
  45. Connection: conn,
  46. cancelFn: cancelFn,
  47. fsPath: fsPath,
  48. start: time.Now(),
  49. transferType: transferType,
  50. MinWriteOffset: minWriteOffset,
  51. InitialSize: initialSize,
  52. isNewFile: isNewFile,
  53. requestPath: requestPath,
  54. BytesSent: 0,
  55. BytesReceived: 0,
  56. MaxWriteSize: maxWriteSize,
  57. AbortTransfer: 0,
  58. Fs: fs,
  59. }
  60. conn.AddTransfer(t)
  61. return t
  62. }
  63. // GetID returns the transfer ID
  64. func (t *BaseTransfer) GetID() uint64 {
  65. return t.ID
  66. }
  67. // GetType returns the transfer type
  68. func (t *BaseTransfer) GetType() int {
  69. return t.transferType
  70. }
  71. // GetSize returns the transferred size
  72. func (t *BaseTransfer) GetSize() int64 {
  73. if t.transferType == TransferDownload {
  74. return atomic.LoadInt64(&t.BytesSent)
  75. }
  76. return atomic.LoadInt64(&t.BytesReceived)
  77. }
  78. // GetStartTime returns the start time
  79. func (t *BaseTransfer) GetStartTime() time.Time {
  80. return t.start
  81. }
  82. // SignalClose signals that the transfer should be closed.
  83. // For same protocols, for example WebDAV, we have no
  84. // access to the network connection, so we use this method
  85. // to make the next read or write to fail
  86. func (t *BaseTransfer) SignalClose() {
  87. atomic.StoreInt32(&(t.AbortTransfer), 1)
  88. }
  89. // GetVirtualPath returns the transfer virtual path
  90. func (t *BaseTransfer) GetVirtualPath() string {
  91. return t.requestPath
  92. }
  93. // GetFsPath returns the transfer filesystem path
  94. func (t *BaseTransfer) GetFsPath() string {
  95. return t.fsPath
  96. }
  97. // GetRealFsPath returns the real transfer filesystem path.
  98. // If atomic uploads are enabled this differ from fsPath
  99. func (t *BaseTransfer) GetRealFsPath(fsPath string) string {
  100. if fsPath == t.GetFsPath() {
  101. if t.File != nil {
  102. return t.File.Name()
  103. }
  104. return t.fsPath
  105. }
  106. return ""
  107. }
  108. // SetCancelFn sets the cancel function for the transfer
  109. func (t *BaseTransfer) SetCancelFn(cancelFn func()) {
  110. t.cancelFn = cancelFn
  111. }
  112. // Truncate changes the size of the opened file.
  113. // Supported for local fs only
  114. func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
  115. if fsPath == t.GetFsPath() {
  116. if t.File != nil {
  117. initialSize := t.InitialSize
  118. err := t.File.Truncate(size)
  119. if err == nil {
  120. t.Lock()
  121. t.InitialSize = size
  122. if t.MaxWriteSize > 0 {
  123. sizeDiff := initialSize - size
  124. t.MaxWriteSize += sizeDiff
  125. metrics.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
  126. atomic.StoreInt64(&t.BytesReceived, 0)
  127. }
  128. t.Unlock()
  129. }
  130. t.Connection.Log(logger.LevelDebug, "file %#v truncated to size %v max write size %v new initial size %v err: %v",
  131. fsPath, size, t.MaxWriteSize, t.InitialSize, err)
  132. return initialSize, err
  133. }
  134. if size == 0 && atomic.LoadInt64(&t.BytesSent) == 0 {
  135. // for cloud providers the file is always truncated to zero, we don't support append/resume for uploads
  136. return 0, nil
  137. }
  138. return 0, ErrOpUnsupported
  139. }
  140. return 0, errTransferMismatch
  141. }
  142. // TransferError is called if there is an unexpected error.
  143. // For example network or client issues
  144. func (t *BaseTransfer) TransferError(err error) {
  145. t.Lock()
  146. defer t.Unlock()
  147. if t.ErrTransfer != nil {
  148. return
  149. }
  150. t.ErrTransfer = err
  151. if t.cancelFn != nil {
  152. t.cancelFn()
  153. }
  154. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  155. t.Connection.Log(logger.LevelWarn, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
  156. "bytes received: %v transfer running since %v ms", t.fsPath, t.ErrTransfer, atomic.LoadInt64(&t.BytesSent),
  157. atomic.LoadInt64(&t.BytesReceived), elapsed)
  158. }
  159. // Close it is called when the transfer is completed.
  160. // It logs the transfer info, updates the user quota (for uploads)
  161. // and executes any defined action.
  162. // If there is an error no action will be executed and, in atomic mode,
  163. // we try to delete the temporary file
  164. func (t *BaseTransfer) Close() error {
  165. defer t.Connection.RemoveTransfer(t)
  166. var err error
  167. numFiles := 0
  168. if t.isNewFile {
  169. numFiles = 1
  170. }
  171. metrics.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
  172. if t.ErrTransfer == ErrQuotaExceeded && t.File != nil {
  173. // if quota is exceeded we try to remove the partial file for uploads to local filesystem
  174. err = os.Remove(t.File.Name())
  175. if err == nil {
  176. numFiles--
  177. atomic.StoreInt64(&t.BytesReceived, 0)
  178. t.MinWriteOffset = 0
  179. }
  180. t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %#v, deletion error: %v",
  181. t.File.Name(), err)
  182. } else if t.transferType == TransferUpload && t.File != nil && t.File.Name() != t.fsPath {
  183. if t.ErrTransfer == nil || Config.UploadMode == UploadModeAtomicWithResume {
  184. err = os.Rename(t.File.Name(), t.fsPath)
  185. t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %#v -> %#v, error: %v",
  186. t.File.Name(), t.fsPath, err)
  187. } else {
  188. err = os.Remove(t.File.Name())
  189. t.Connection.Log(logger.LevelWarn, "atomic upload completed with error: \"%v\", delete temporary file: %#v, "+
  190. "deletion error: %v", t.ErrTransfer, t.File.Name(), err)
  191. if err == nil {
  192. numFiles--
  193. atomic.StoreInt64(&t.BytesReceived, 0)
  194. t.MinWriteOffset = 0
  195. }
  196. }
  197. }
  198. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  199. if t.transferType == TransferDownload {
  200. logger.TransferLog(downloadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesSent), t.Connection.User.Username,
  201. t.Connection.ID, t.Connection.protocol)
  202. action := newActionNotification(&t.Connection.User, operationDownload, t.fsPath, "", "", t.Connection.protocol,
  203. atomic.LoadInt64(&t.BytesSent), t.ErrTransfer)
  204. go actionHandler.Handle(action) //nolint:errcheck
  205. } else {
  206. fileSize := atomic.LoadInt64(&t.BytesReceived) + t.MinWriteOffset
  207. info, err := t.Fs.Stat(t.fsPath)
  208. if err == nil {
  209. fileSize = info.Size()
  210. }
  211. t.Connection.Log(logger.LevelDebug, "upload file size %v stat error %v", fileSize, err)
  212. t.updateQuota(numFiles, fileSize)
  213. logger.TransferLog(uploadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesReceived), t.Connection.User.Username,
  214. t.Connection.ID, t.Connection.protocol)
  215. action := newActionNotification(&t.Connection.User, operationUpload, t.fsPath, "", "", t.Connection.protocol,
  216. fileSize, t.ErrTransfer)
  217. go actionHandler.Handle(action) //nolint:errcheck
  218. }
  219. if t.ErrTransfer != nil {
  220. t.Connection.Log(logger.LevelWarn, "transfer error: %v, path: %#v", t.ErrTransfer, t.fsPath)
  221. if err == nil {
  222. err = t.ErrTransfer
  223. }
  224. }
  225. return err
  226. }
  227. func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
  228. // S3 uploads are atomic, if there is an error nothing is uploaded
  229. if t.File == nil && t.ErrTransfer != nil {
  230. return false
  231. }
  232. sizeDiff := fileSize - t.InitialSize
  233. if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff > 0) {
  234. vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
  235. if err == nil {
  236. dataprovider.UpdateVirtualFolderQuota(vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
  237. sizeDiff, false)
  238. if vfolder.IsIncludedInUserQuota() {
  239. dataprovider.UpdateUserQuota(t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  240. }
  241. } else {
  242. dataprovider.UpdateUserQuota(t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  243. }
  244. return true
  245. }
  246. return false
  247. }
  248. // HandleThrottle manage bandwidth throttling
  249. func (t *BaseTransfer) HandleThrottle() {
  250. var wantedBandwidth int64
  251. var trasferredBytes int64
  252. if t.transferType == TransferDownload {
  253. wantedBandwidth = t.Connection.User.DownloadBandwidth
  254. trasferredBytes = atomic.LoadInt64(&t.BytesSent)
  255. } else {
  256. wantedBandwidth = t.Connection.User.UploadBandwidth
  257. trasferredBytes = atomic.LoadInt64(&t.BytesReceived)
  258. }
  259. if wantedBandwidth > 0 {
  260. // real and wanted elapsed as milliseconds, bytes as kilobytes
  261. realElapsed := time.Since(t.start).Nanoseconds() / 1000000
  262. // trasferredBytes / 1000 = KB/s, we multiply for 1000 to get milliseconds
  263. wantedElapsed := 1000 * (trasferredBytes / 1000) / wantedBandwidth
  264. if wantedElapsed > realElapsed {
  265. toSleep := time.Duration(wantedElapsed - realElapsed)
  266. time.Sleep(toSleep * time.Millisecond)
  267. }
  268. }
  269. }