transfer.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package ftpd
  2. import (
  3. "errors"
  4. "io"
  5. "sync/atomic"
  6. "github.com/eikenb/pipeat"
  7. "github.com/drakkan/sftpgo/v2/common"
  8. "github.com/drakkan/sftpgo/v2/vfs"
  9. )
// transfer contains the transfer details for an upload or a download.
// It implements the ftpserver.FileTransfer interface to handle file downloads and uploads.
type transfer struct {
	*common.BaseTransfer
	// writer is the destination for Write; newTransfer leaves it nil when only
	// a pipe reader is supplied.
	writer io.WriteCloser
	// reader is the source for Read; newTransfer leaves it nil when only
	// a pipe writer is supplied.
	reader io.ReadCloser
	// isFinished is set by setFinished (under the transfer lock) so that a
	// second Close returns common.ErrTransferClosed.
	isFinished bool
	// expectedOffset is the only offset Seek accepts (with io.SeekStart) when
	// the transfer has a reader but no File.
	expectedOffset int64
}
  19. func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt,
  20. expectedOffset int64) *transfer {
  21. var writer io.WriteCloser
  22. var reader io.ReadCloser
  23. if baseTransfer.File != nil {
  24. writer = baseTransfer.File
  25. reader = baseTransfer.File
  26. } else if pipeWriter != nil {
  27. writer = pipeWriter
  28. } else if pipeReader != nil {
  29. reader = pipeReader
  30. }
  31. return &transfer{
  32. BaseTransfer: baseTransfer,
  33. writer: writer,
  34. reader: reader,
  35. isFinished: false,
  36. expectedOffset: expectedOffset,
  37. }
  38. }
  39. // Read reads the contents to downloads.
  40. func (t *transfer) Read(p []byte) (n int, err error) {
  41. t.Connection.UpdateLastActivity()
  42. n, err = t.reader.Read(p)
  43. atomic.AddInt64(&t.BytesSent, int64(n))
  44. if err == nil {
  45. err = t.CheckRead()
  46. }
  47. if err != nil && err != io.EOF {
  48. t.TransferError(err)
  49. return
  50. }
  51. t.HandleThrottle()
  52. return
  53. }
  54. // Write writes the uploaded contents.
  55. func (t *transfer) Write(p []byte) (n int, err error) {
  56. t.Connection.UpdateLastActivity()
  57. n, err = t.writer.Write(p)
  58. atomic.AddInt64(&t.BytesReceived, int64(n))
  59. if err == nil {
  60. err = t.CheckWrite()
  61. }
  62. if err != nil {
  63. t.TransferError(err)
  64. return
  65. }
  66. t.HandleThrottle()
  67. return
  68. }
  69. // Seek sets the offset to resume an upload or a download
  70. func (t *transfer) Seek(offset int64, whence int) (int64, error) {
  71. t.Connection.UpdateLastActivity()
  72. if t.File != nil {
  73. ret, err := t.File.Seek(offset, whence)
  74. if err != nil {
  75. t.TransferError(err)
  76. }
  77. return ret, err
  78. }
  79. if t.reader != nil && t.expectedOffset == offset && whence == io.SeekStart {
  80. return offset, nil
  81. }
  82. t.TransferError(errors.New("seek is unsupported for this transfer"))
  83. return 0, common.ErrOpUnsupported
  84. }
  85. // Close it is called when the transfer is completed.
  86. func (t *transfer) Close() error {
  87. if err := t.setFinished(); err != nil {
  88. return err
  89. }
  90. err := t.closeIO()
  91. errBaseClose := t.BaseTransfer.Close()
  92. if errBaseClose != nil {
  93. err = errBaseClose
  94. }
  95. return t.Connection.GetFsError(t.Fs, err)
  96. }
  97. func (t *transfer) closeIO() error {
  98. var err error
  99. if t.File != nil {
  100. err = t.File.Close()
  101. } else if t.writer != nil {
  102. err = t.writer.Close()
  103. t.Lock()
  104. // we set ErrTransfer here so quota is not updated, in this case the uploads are atomic
  105. if err != nil && t.ErrTransfer == nil {
  106. t.ErrTransfer = err
  107. }
  108. t.Unlock()
  109. } else if t.reader != nil {
  110. err = t.reader.Close()
  111. }
  112. return err
  113. }
  114. func (t *transfer) setFinished() error {
  115. t.Lock()
  116. defer t.Unlock()
  117. if t.isFinished {
  118. return common.ErrTransferClosed
  119. }
  120. t.isFinished = true
  121. return nil
  122. }