transfer.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package ftpd
  2. import (
  3. "errors"
  4. "io"
  5. "sync/atomic"
  6. "github.com/eikenb/pipeat"
  7. "github.com/drakkan/sftpgo/common"
  8. "github.com/drakkan/sftpgo/vfs"
  9. )
  10. // transfer contains the transfer details for an upload or a download.
  11. // It implements the ftpserver.FileTransfer interface to handle files downloads and uploads
  12. type transfer struct {
  13. *common.BaseTransfer
  14. writer io.WriteCloser
  15. reader io.ReadCloser
  16. isFinished bool
  17. maxWriteSize int64
  18. expectedOffset int64
  19. }
  20. func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt,
  21. maxWriteSize, expectedOffset int64) *transfer {
  22. var writer io.WriteCloser
  23. var reader io.ReadCloser
  24. if baseTransfer.File != nil {
  25. writer = baseTransfer.File
  26. reader = baseTransfer.File
  27. } else if pipeWriter != nil {
  28. writer = pipeWriter
  29. } else if pipeReader != nil {
  30. reader = pipeReader
  31. }
  32. return &transfer{
  33. BaseTransfer: baseTransfer,
  34. writer: writer,
  35. reader: reader,
  36. isFinished: false,
  37. maxWriteSize: maxWriteSize,
  38. expectedOffset: expectedOffset,
  39. }
  40. }
  41. // Read reads the contents to downloads.
  42. func (t *transfer) Read(p []byte) (n int, err error) {
  43. t.Connection.UpdateLastActivity()
  44. var readed int
  45. var e error
  46. readed, e = t.reader.Read(p)
  47. atomic.AddInt64(&t.BytesSent, int64(readed))
  48. if e != nil && e != io.EOF {
  49. t.TransferError(e)
  50. return readed, e
  51. }
  52. t.HandleThrottle()
  53. return readed, e
  54. }
  55. // Write writes the uploaded contents.
  56. func (t *transfer) Write(p []byte) (n int, err error) {
  57. t.Connection.UpdateLastActivity()
  58. var written int
  59. var e error
  60. written, e = t.writer.Write(p)
  61. atomic.AddInt64(&t.BytesReceived, int64(written))
  62. if t.maxWriteSize > 0 && e == nil && atomic.LoadInt64(&t.BytesReceived) > t.maxWriteSize {
  63. e = common.ErrQuotaExceeded
  64. }
  65. if e != nil {
  66. t.TransferError(e)
  67. return written, e
  68. }
  69. t.HandleThrottle()
  70. return written, e
  71. }
  72. // Seek sets the offset to resume an upload or a download
  73. func (t *transfer) Seek(offset int64, whence int) (int64, error) {
  74. t.Connection.UpdateLastActivity()
  75. if t.File != nil {
  76. ret, err := t.File.Seek(offset, whence)
  77. if err != nil {
  78. t.TransferError(err)
  79. }
  80. return ret, err
  81. }
  82. if t.reader != nil && t.expectedOffset == offset && whence == io.SeekStart {
  83. return offset, nil
  84. }
  85. t.TransferError(errors.New("seek is unsupported for this transfer"))
  86. return 0, common.ErrOpUnsupported
  87. }
  88. // Close it is called when the transfer is completed.
  89. func (t *transfer) Close() error {
  90. if err := t.setFinished(); err != nil {
  91. return err
  92. }
  93. err := t.closeIO()
  94. errBaseClose := t.BaseTransfer.Close()
  95. if errBaseClose != nil {
  96. err = errBaseClose
  97. }
  98. return t.Connection.GetFsError(err)
  99. }
  100. func (t *transfer) closeIO() error {
  101. var err error
  102. if t.File != nil {
  103. err = t.File.Close()
  104. } else if t.writer != nil {
  105. err = t.writer.Close()
  106. t.Lock()
  107. // we set ErrTransfer here so quota is not updated, in this case the uploads are atomic
  108. if err != nil && t.ErrTransfer == nil {
  109. t.ErrTransfer = err
  110. }
  111. t.Unlock()
  112. } else if t.reader != nil {
  113. err = t.reader.Close()
  114. }
  115. return err
  116. }
  117. func (t *transfer) setFinished() error {
  118. t.Lock()
  119. defer t.Unlock()
  120. if t.isFinished {
  121. return common.ErrTransferClosed
  122. }
  123. t.isFinished = true
  124. return nil
  125. }