transfer.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package ftpd
  2. import (
  3. "errors"
  4. "io"
  5. "sync/atomic"
  6. "github.com/eikenb/pipeat"
  7. "github.com/drakkan/sftpgo/common"
  8. "github.com/drakkan/sftpgo/vfs"
  9. )
  10. // transfer contains the transfer details for an upload or a download.
  11. // It implements the ftpserver.FileTransfer interface to handle files downloads and uploads
  12. type transfer struct {
  13. *common.BaseTransfer
  14. writer io.WriteCloser
  15. reader io.ReadCloser
  16. isFinished bool
  17. expectedOffset int64
  18. }
  19. func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt,
  20. expectedOffset int64) *transfer {
  21. var writer io.WriteCloser
  22. var reader io.ReadCloser
  23. if baseTransfer.File != nil {
  24. writer = baseTransfer.File
  25. reader = baseTransfer.File
  26. } else if pipeWriter != nil {
  27. writer = pipeWriter
  28. } else if pipeReader != nil {
  29. reader = pipeReader
  30. }
  31. return &transfer{
  32. BaseTransfer: baseTransfer,
  33. writer: writer,
  34. reader: reader,
  35. isFinished: false,
  36. expectedOffset: expectedOffset,
  37. }
  38. }
  39. // Read reads the contents to downloads.
  40. func (t *transfer) Read(p []byte) (n int, err error) {
  41. t.Connection.UpdateLastActivity()
  42. var readed int
  43. var e error
  44. readed, e = t.reader.Read(p)
  45. atomic.AddInt64(&t.BytesSent, int64(readed))
  46. if e != nil && e != io.EOF {
  47. t.TransferError(e)
  48. return readed, e
  49. }
  50. t.HandleThrottle()
  51. return readed, e
  52. }
  53. // Write writes the uploaded contents.
  54. func (t *transfer) Write(p []byte) (n int, err error) {
  55. t.Connection.UpdateLastActivity()
  56. var written int
  57. var e error
  58. written, e = t.writer.Write(p)
  59. atomic.AddInt64(&t.BytesReceived, int64(written))
  60. if t.MaxWriteSize > 0 && e == nil && atomic.LoadInt64(&t.BytesReceived) > t.MaxWriteSize {
  61. e = common.ErrQuotaExceeded
  62. }
  63. if e != nil {
  64. t.TransferError(e)
  65. return written, e
  66. }
  67. t.HandleThrottle()
  68. return written, e
  69. }
  70. // Seek sets the offset to resume an upload or a download
  71. func (t *transfer) Seek(offset int64, whence int) (int64, error) {
  72. t.Connection.UpdateLastActivity()
  73. if t.File != nil {
  74. ret, err := t.File.Seek(offset, whence)
  75. if err != nil {
  76. t.TransferError(err)
  77. }
  78. return ret, err
  79. }
  80. if t.reader != nil && t.expectedOffset == offset && whence == io.SeekStart {
  81. return offset, nil
  82. }
  83. t.TransferError(errors.New("seek is unsupported for this transfer"))
  84. return 0, common.ErrOpUnsupported
  85. }
  86. // Close it is called when the transfer is completed.
  87. func (t *transfer) Close() error {
  88. if err := t.setFinished(); err != nil {
  89. return err
  90. }
  91. err := t.closeIO()
  92. errBaseClose := t.BaseTransfer.Close()
  93. if errBaseClose != nil {
  94. err = errBaseClose
  95. }
  96. return t.Connection.GetFsError(err)
  97. }
  98. func (t *transfer) closeIO() error {
  99. var err error
  100. if t.File != nil {
  101. err = t.File.Close()
  102. } else if t.writer != nil {
  103. err = t.writer.Close()
  104. t.Lock()
  105. // we set ErrTransfer here so quota is not updated, in this case the uploads are atomic
  106. if err != nil && t.ErrTransfer == nil {
  107. t.ErrTransfer = err
  108. }
  109. t.Unlock()
  110. } else if t.reader != nil {
  111. err = t.reader.Close()
  112. }
  113. return err
  114. }
  115. func (t *transfer) setFinished() error {
  116. t.Lock()
  117. defer t.Unlock()
  118. if t.isFinished {
  119. return common.ErrTransferClosed
  120. }
  121. t.isFinished = true
  122. return nil
  123. }