transfer.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package common
  2. import (
  3. "errors"
  4. "path"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/drakkan/sftpgo/dataprovider"
  9. "github.com/drakkan/sftpgo/logger"
  10. "github.com/drakkan/sftpgo/metrics"
  11. "github.com/drakkan/sftpgo/vfs"
  12. )
  13. var (
  14. // ErrTransferClosed defines the error returned for a closed transfer
  15. ErrTransferClosed = errors.New("transfer already closed")
  16. )
  17. // BaseTransfer contains protocols common transfer details for an upload or a download.
  18. type BaseTransfer struct { //nolint:maligned
  19. ID uint64
  20. Fs vfs.Fs
  21. File vfs.File
  22. Connection *BaseConnection
  23. cancelFn func()
  24. fsPath string
  25. start time.Time
  26. transferType int
  27. MinWriteOffset int64
  28. InitialSize int64
  29. isNewFile bool
  30. requestPath string
  31. BytesSent int64
  32. BytesReceived int64
  33. MaxWriteSize int64
  34. AbortTransfer int32
  35. sync.Mutex
  36. ErrTransfer error
  37. }
  38. // NewBaseTransfer returns a new BaseTransfer and adds it to the given connection
  39. func NewBaseTransfer(file vfs.File, conn *BaseConnection, cancelFn func(), fsPath, requestPath string, transferType int,
  40. minWriteOffset, initialSize, maxWriteSize int64, isNewFile bool, fs vfs.Fs) *BaseTransfer {
  41. t := &BaseTransfer{
  42. ID: conn.GetTransferID(),
  43. File: file,
  44. Connection: conn,
  45. cancelFn: cancelFn,
  46. fsPath: fsPath,
  47. start: time.Now(),
  48. transferType: transferType,
  49. MinWriteOffset: minWriteOffset,
  50. InitialSize: initialSize,
  51. isNewFile: isNewFile,
  52. requestPath: requestPath,
  53. BytesSent: 0,
  54. BytesReceived: 0,
  55. MaxWriteSize: maxWriteSize,
  56. AbortTransfer: 0,
  57. Fs: fs,
  58. }
  59. conn.AddTransfer(t)
  60. return t
  61. }
  62. // GetID returns the transfer ID
  63. func (t *BaseTransfer) GetID() uint64 {
  64. return t.ID
  65. }
  66. // GetType returns the transfer type
  67. func (t *BaseTransfer) GetType() int {
  68. return t.transferType
  69. }
  70. // GetSize returns the transferred size
  71. func (t *BaseTransfer) GetSize() int64 {
  72. if t.transferType == TransferDownload {
  73. return atomic.LoadInt64(&t.BytesSent)
  74. }
  75. return atomic.LoadInt64(&t.BytesReceived)
  76. }
  77. // GetStartTime returns the start time
  78. func (t *BaseTransfer) GetStartTime() time.Time {
  79. return t.start
  80. }
  81. // SignalClose signals that the transfer should be closed.
  82. // For same protocols, for example WebDAV, we have no
  83. // access to the network connection, so we use this method
  84. // to make the next read or write to fail
  85. func (t *BaseTransfer) SignalClose() {
  86. atomic.StoreInt32(&(t.AbortTransfer), 1)
  87. }
  88. // GetVirtualPath returns the transfer virtual path
  89. func (t *BaseTransfer) GetVirtualPath() string {
  90. return t.requestPath
  91. }
  92. // GetFsPath returns the transfer filesystem path
  93. func (t *BaseTransfer) GetFsPath() string {
  94. return t.fsPath
  95. }
  96. // GetRealFsPath returns the real transfer filesystem path.
  97. // If atomic uploads are enabled this differ from fsPath
  98. func (t *BaseTransfer) GetRealFsPath(fsPath string) string {
  99. if fsPath == t.GetFsPath() {
  100. if t.File != nil {
  101. return t.File.Name()
  102. }
  103. return t.fsPath
  104. }
  105. return ""
  106. }
  107. // SetCancelFn sets the cancel function for the transfer
  108. func (t *BaseTransfer) SetCancelFn(cancelFn func()) {
  109. t.cancelFn = cancelFn
  110. }
  111. // Truncate changes the size of the opened file.
  112. // Supported for local fs only
  113. func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
  114. if fsPath == t.GetFsPath() {
  115. if t.File != nil {
  116. initialSize := t.InitialSize
  117. err := t.File.Truncate(size)
  118. if err == nil {
  119. t.Lock()
  120. t.InitialSize = size
  121. if t.MaxWriteSize > 0 {
  122. sizeDiff := initialSize - size
  123. t.MaxWriteSize += sizeDiff
  124. metrics.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
  125. atomic.StoreInt64(&t.BytesReceived, 0)
  126. }
  127. t.Unlock()
  128. }
  129. t.Connection.Log(logger.LevelDebug, "file %#v truncated to size %v max write size %v new initial size %v err: %v",
  130. fsPath, size, t.MaxWriteSize, t.InitialSize, err)
  131. return initialSize, err
  132. }
  133. if size == 0 && atomic.LoadInt64(&t.BytesSent) == 0 {
  134. // for cloud providers the file is always truncated to zero, we don't support append/resume for uploads
  135. return 0, nil
  136. }
  137. return 0, ErrOpUnsupported
  138. }
  139. return 0, errTransferMismatch
  140. }
  141. // TransferError is called if there is an unexpected error.
  142. // For example network or client issues
  143. func (t *BaseTransfer) TransferError(err error) {
  144. t.Lock()
  145. defer t.Unlock()
  146. if t.ErrTransfer != nil {
  147. return
  148. }
  149. t.ErrTransfer = err
  150. if t.cancelFn != nil {
  151. t.cancelFn()
  152. }
  153. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  154. t.Connection.Log(logger.LevelWarn, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
  155. "bytes received: %v transfer running since %v ms", t.fsPath, t.ErrTransfer, atomic.LoadInt64(&t.BytesSent),
  156. atomic.LoadInt64(&t.BytesReceived), elapsed)
  157. }
  158. func (t *BaseTransfer) getUploadFileSize() (int64, error) {
  159. var fileSize int64
  160. info, err := t.Fs.Stat(t.fsPath)
  161. if err == nil {
  162. fileSize = info.Size()
  163. }
  164. if vfs.IsCryptOsFs(t.Fs) && t.ErrTransfer != nil {
  165. errDelete := t.Connection.Fs.Remove(t.fsPath, false)
  166. if errDelete != nil {
  167. t.Connection.Log(logger.LevelWarn, "error removing partial crypto file %#v: %v", t.fsPath, errDelete)
  168. }
  169. }
  170. return fileSize, err
  171. }
  172. // Close it is called when the transfer is completed.
  173. // It logs the transfer info, updates the user quota (for uploads)
  174. // and executes any defined action.
  175. // If there is an error no action will be executed and, in atomic mode,
  176. // we try to delete the temporary file
  177. func (t *BaseTransfer) Close() error {
  178. defer t.Connection.RemoveTransfer(t)
  179. var err error
  180. numFiles := 0
  181. if t.isNewFile {
  182. numFiles = 1
  183. }
  184. metrics.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
  185. if t.ErrTransfer == ErrQuotaExceeded && t.File != nil {
  186. // if quota is exceeded we try to remove the partial file for uploads to local filesystem
  187. err = t.Connection.Fs.Remove(t.File.Name(), false)
  188. if err == nil {
  189. numFiles--
  190. atomic.StoreInt64(&t.BytesReceived, 0)
  191. t.MinWriteOffset = 0
  192. }
  193. t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %#v, deletion error: %v",
  194. t.File.Name(), err)
  195. } else if t.transferType == TransferUpload && t.File != nil && t.File.Name() != t.fsPath {
  196. if t.ErrTransfer == nil || Config.UploadMode == UploadModeAtomicWithResume {
  197. err = t.Connection.Fs.Rename(t.File.Name(), t.fsPath)
  198. t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %#v -> %#v, error: %v",
  199. t.File.Name(), t.fsPath, err)
  200. } else {
  201. err = t.Connection.Fs.Remove(t.File.Name(), false)
  202. t.Connection.Log(logger.LevelWarn, "atomic upload completed with error: \"%v\", delete temporary file: %#v, "+
  203. "deletion error: %v", t.ErrTransfer, t.File.Name(), err)
  204. if err == nil {
  205. numFiles--
  206. atomic.StoreInt64(&t.BytesReceived, 0)
  207. t.MinWriteOffset = 0
  208. }
  209. }
  210. }
  211. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  212. if t.transferType == TransferDownload {
  213. logger.TransferLog(downloadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesSent), t.Connection.User.Username,
  214. t.Connection.ID, t.Connection.protocol)
  215. action := newActionNotification(&t.Connection.User, operationDownload, t.fsPath, "", "", t.Connection.protocol,
  216. atomic.LoadInt64(&t.BytesSent), t.ErrTransfer)
  217. go actionHandler.Handle(action) //nolint:errcheck
  218. } else {
  219. fileSize := atomic.LoadInt64(&t.BytesReceived) + t.MinWriteOffset
  220. if statSize, err := t.getUploadFileSize(); err == nil {
  221. fileSize = statSize
  222. }
  223. t.Connection.Log(logger.LevelDebug, "uploaded file size %v", fileSize)
  224. t.updateQuota(numFiles, fileSize)
  225. logger.TransferLog(uploadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesReceived), t.Connection.User.Username,
  226. t.Connection.ID, t.Connection.protocol)
  227. action := newActionNotification(&t.Connection.User, operationUpload, t.fsPath, "", "", t.Connection.protocol,
  228. fileSize, t.ErrTransfer)
  229. go actionHandler.Handle(action) //nolint:errcheck
  230. }
  231. if t.ErrTransfer != nil {
  232. t.Connection.Log(logger.LevelWarn, "transfer error: %v, path: %#v", t.ErrTransfer, t.fsPath)
  233. if err == nil {
  234. err = t.ErrTransfer
  235. }
  236. }
  237. return err
  238. }
  239. func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
  240. // S3 uploads are atomic, if there is an error nothing is uploaded
  241. if t.File == nil && t.ErrTransfer != nil {
  242. return false
  243. }
  244. sizeDiff := fileSize - t.InitialSize
  245. if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff > 0) {
  246. vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
  247. if err == nil {
  248. dataprovider.UpdateVirtualFolderQuota(vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
  249. sizeDiff, false)
  250. if vfolder.IsIncludedInUserQuota() {
  251. dataprovider.UpdateUserQuota(t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  252. }
  253. } else {
  254. dataprovider.UpdateUserQuota(t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  255. }
  256. return true
  257. }
  258. return false
  259. }
  260. // HandleThrottle manage bandwidth throttling
  261. func (t *BaseTransfer) HandleThrottle() {
  262. var wantedBandwidth int64
  263. var trasferredBytes int64
  264. if t.transferType == TransferDownload {
  265. wantedBandwidth = t.Connection.User.DownloadBandwidth
  266. trasferredBytes = atomic.LoadInt64(&t.BytesSent)
  267. } else {
  268. wantedBandwidth = t.Connection.User.UploadBandwidth
  269. trasferredBytes = atomic.LoadInt64(&t.BytesReceived)
  270. }
  271. if wantedBandwidth > 0 {
  272. // real and wanted elapsed as milliseconds, bytes as kilobytes
  273. realElapsed := time.Since(t.start).Nanoseconds() / 1000000
  274. // trasferredBytes / 1000 = KB/s, we multiply for 1000 to get milliseconds
  275. wantedElapsed := 1000 * (trasferredBytes / 1000) / wantedBandwidth
  276. if wantedElapsed > realElapsed {
  277. toSleep := time.Duration(wantedElapsed - realElapsed)
  278. time.Sleep(toSleep * time.Millisecond)
  279. }
  280. }
  281. }