api-put-object-streaming.go
/*
 * Minio Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2017 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"sort"
	"strings"

	"github.com/minio/minio-go/pkg/s3utils"
)

// putObjectMultipartStream - uploads a large object using
// multipart upload and a streaming signature for signing the payload.
// Comprehensive put object operation involving multipart uploads.
//
// The following code handles these types of readers:
//
// - *minio.Object
// - Any reader which implements a 'ReadAt()' method
//
func (c Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
	reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) {
	if !isObject(reader) && isReadAt(reader) {
		// The reader implements ReadAt and is not a *minio.Object,
		// so use the parallel uploader.
		n, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
	} else {
		n, err = c.putObjectMultipartStreamNoChecksum(ctx, bucketName, objectName, reader, size, opts)
	}
	if err != nil {
		errResp := ToErrorResponse(err)
		// If multipart functionality is not available, fall back to a
		// single PutObject operation.
		if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
			// Verify that the size of the reader does not exceed '5GiB'.
			if size > maxSinglePutObjectSize {
				return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
			}
			// Fall back to uploading as a single PutObject operation.
			return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
		}
	}
	return n, err
}
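
// For illustration, a hedged sketch of how this entry point is reached:
// an *os.File implements io.ReaderAt, so a caller inside this package could
// exercise the parallel path roughly as follows (hypothetical caller; the
// file name and bucket are made up, and error handling is elided):
//
//	f, _ := os.Open("/tmp/payload.bin")
//	st, _ := f.Stat()
//	n, err := c.putObjectMultipartStream(context.Background(),
//		"my-bucket", "my-object", f, st.Size(), PutObjectOptions{})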

// uploadedPartRes - the response received from a part upload.
type uploadedPartRes struct {
	Error   error // Any error encountered while uploading the part.
	PartNum int   // Number of the part uploaded.
	Size    int64 // Size of the part uploaded.
	Part    *ObjectPart
}

// uploadPartReq - the request sent to a part-upload worker.
type uploadPartReq struct {
	PartNum int         // Number of the part to upload.
	Part    *ObjectPart // Metadata of the uploaded part, filled in on success.
}
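
// The two structs above carry a simple fan-out/fan-in pattern, sketched
// below under the assumption of N worker goroutines (N comes from
// opts.getNumThreads()):
//
//	uploadPartsCh (buffered) --> worker 1..N --> uploadedPartsCh (buffered)
//
// The producer enqueues one uploadPartReq per part number and closes the
// channel; each worker ranges over it, uploads its part, and reports an
// uploadedPartRes; the collector drains exactly totalPartsCount responses.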

// putObjectMultipartStreamFromReadAt - uploads files bigger than 64MiB.
// Supports all readers which implement the io.ReaderAt interface
// (ReadAt method).
//
// NOTE: This function is meant to be used for all readers which
// implement io.ReaderAt, which allows us to resume multipart uploads by
// reading at an offset, avoiding a re-read of data that was already
// uploaded. Internally this function uses temporary files for staging
// all the data; these temporary files are cleaned up automatically when
// the caller (i.e. the http client) closes the stream after uploading
// all the contents successfully.
func (c Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string,
	reader io.ReaderAt, size int64, opts PutObjectOptions) (n int64, err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
		return 0, err
	}
	if err = s3utils.CheckValidObjectName(objectName); err != nil {
		return 0, err
	}

	// Calculate the optimal parts info for a given size.
	totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size)
	if err != nil {
		return 0, err
	}

	// Initiate a new multipart upload.
	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
	if err != nil {
		return 0, err
	}

	// Abort the multipart upload in progress if the function returns
	// any error. Since we do not resume, we should purge the parts
	// which have been uploaded to relinquish storage space.
	defer func() {
		if err != nil {
			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
		}
	}()

	// Total data read and written to server. Should be equal to 'size' at the end of the call.
	var totalUploadedSize int64

	// Complete multipart upload.
	var complMultipartUpload completeMultipartUpload

	// Declare a channel that sends the next part number to be uploaded.
	// Buffered to 10000 because that's the maximum number of parts allowed
	// by S3.
	uploadPartsCh := make(chan uploadPartReq, 10000)

	// Declare a channel that sends back the response of a part upload.
	// Buffered to 10000 because that's the maximum number of parts allowed
	// by S3.
	uploadedPartsCh := make(chan uploadedPartRes, 10000)

	// Used for readability, lastPartNumber is always totalPartsCount.
	lastPartNumber := totalPartsCount

	// Send each part number to the channel to be processed.
	for p := 1; p <= totalPartsCount; p++ {
		uploadPartsCh <- uploadPartReq{PartNum: p, Part: nil}
	}
	close(uploadPartsCh)

	// Receive each part number from the channel, allowing
	// opts.getNumThreads() parallel uploads.
	for w := 1; w <= opts.getNumThreads(); w++ {
		go func(partSize int64) {
			// Each worker will draw from the part channel and upload in parallel.
			for uploadReq := range uploadPartsCh {
				// Calculate the offset for this part based on
				// multiples of partSize.
				readOffset := int64(uploadReq.PartNum-1) * partSize

				// As a special case, if partNumber is lastPartNumber,
				// calculate the offset based on the last part size.
				if uploadReq.PartNum == lastPartNumber {
					readOffset = size - lastPartSize
					partSize = lastPartSize
				}

				// Get a section reader on a particular offset.
				sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)

				// Proceed to upload the part. Use a local error value
				// so that concurrent workers do not race on the outer
				// named return value.
				objPart, err := c.uploadPart(ctx, bucketName, objectName, uploadID,
					sectionReader, uploadReq.PartNum,
					"", "", partSize, opts.UserMetadata)
				if err != nil {
					uploadedPartsCh <- uploadedPartRes{
						Size:  0,
						Error: err,
					}
					// Exit the goroutine.
					return
				}

				// Save successfully uploaded part metadata.
				uploadReq.Part = &objPart

				// Send successful part info through the channel.
				uploadedPartsCh <- uploadedPartRes{
					Size:    objPart.Size,
					PartNum: uploadReq.PartNum,
					Part:    uploadReq.Part,
					Error:   nil,
				}
			}
		}(partSize)
	}

	// Gather the responses as they occur and update any
	// progress bar.
	for u := 1; u <= totalPartsCount; u++ {
		uploadRes := <-uploadedPartsCh
		if uploadRes.Error != nil {
			return totalUploadedSize, uploadRes.Error
		}
		// Retrieve each uploaded part and store it to be completed.
		part := uploadRes.Part
		if part == nil {
			return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", uploadRes.PartNum))
		}
		// Update the totalUploadedSize.
		totalUploadedSize += uploadRes.Size
		// Store the parts to be completed in order.
		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
			ETag:       part.ETag,
			PartNumber: part.PartNumber,
		})
	}

	// Verify that we uploaded all the data.
	if totalUploadedSize != size {
		return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
	}

	// Sort all completed parts.
	sort.Sort(completedParts(complMultipartUpload.Parts))
	_, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload)
	if err != nil {
		return totalUploadedSize, err
	}

	// Return final size.
	return totalUploadedSize, nil
}
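
// To make the offset math above concrete, a worked example under assumed
// numbers (optimalPartInfo's real output depends on the library's size
// constants): for size = 150 MiB with partSize = 64 MiB, totalPartsCount
// is 3 and lastPartSize is 22 MiB, so the workers read sections at:
//
//	part 1: readOffset = 0,      length 64 MiB
//	part 2: readOffset = 64 MiB, length 64 MiB
//	part 3: readOffset = size - lastPartSize = 128 MiB, length 22 MiB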

func (c Client) putObjectMultipartStreamNoChecksum(ctx context.Context, bucketName, objectName string,
	reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
		return 0, err
	}
	if err = s3utils.CheckValidObjectName(objectName); err != nil {
		return 0, err
	}

	// Calculate the optimal parts info for a given size.
	totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size)
	if err != nil {
		return 0, err
	}

	// Initiate a new multipart request.
	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
	if err != nil {
		return 0, err
	}

	// Abort the multipart upload if the function returns any error.
	// Since we do not resume, we should purge the parts which have
	// been uploaded to relinquish storage space.
	defer func() {
		if err != nil {
			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
		}
	}()

	// Total data read and written to server. Should be equal to 'size' at the end of the call.
	var totalUploadedSize int64

	// Initialize parts uploaded map.
	partsInfo := make(map[int]ObjectPart)

	// Part number always starts with '1'.
	var partNumber int
	for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
		// Update progress reader appropriately to the latest offset
		// as we read from the source.
		hookReader := newHook(reader, opts.Progress)

		// The last part may be shorter than the others.
		if partNumber == totalPartsCount {
			partSize = lastPartSize
		}

		// Proceed to upload the part: LimitReader carves the next
		// partSize bytes off the sequential stream.
		var objPart ObjectPart
		objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID,
			io.LimitReader(hookReader, partSize),
			partNumber, "", "", partSize, opts.UserMetadata)
		if err != nil {
			return totalUploadedSize, err
		}

		// Save successfully uploaded part metadata.
		partsInfo[partNumber] = objPart

		// Save successfully uploaded size.
		totalUploadedSize += partSize
	}

	// Verify that we uploaded all the data.
	if size > 0 {
		if totalUploadedSize != size {
			return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
		}
	}

	// Complete multipart upload.
	var complMultipartUpload completeMultipartUpload

	// Loop over the uploaded parts to save them in the Parts array
	// before completing the multipart request.
	for i := 1; i < partNumber; i++ {
		part, ok := partsInfo[i]
		if !ok {
			return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", i))
		}
		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
			ETag:       part.ETag,
			PartNumber: part.PartNumber,
		})
	}

	// Sort all completed parts.
	sort.Sort(completedParts(complMultipartUpload.Parts))
	_, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload)
	if err != nil {
		return totalUploadedSize, err
	}

	// Return final size.
	return totalUploadedSize, nil
}
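
// A hedged note on the sequential path above: because a plain io.Reader
// cannot seek, each part is carved off the stream in order. The sketch
// below shows the same LimitReader technique in isolation (a standalone
// example with made-up data, not part of this library's API):
//
//	src := strings.NewReader("0123456789")
//	first, _ := ioutil.ReadAll(io.LimitReader(src, 4))  // "0123"
//	second, _ := ioutil.ReadAll(io.LimitReader(src, 4)) // "4567"
//
// Each LimitReader consumes the next n bytes; the source cannot be
// re-read, which is why this path cannot parallelize part uploads.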

// putObjectNoChecksum is a special function used for Google Cloud Storage.
// It exists because Google's multipart API is not S3 compatible.
func (c Client) putObjectNoChecksum(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) {
	// Input validation.
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		return 0, err
	}
	if err := s3utils.CheckValidObjectName(objectName); err != nil {
		return 0, err
	}

	// Size -1 is only supported on Google Cloud Storage; we error
	// out in all other situations.
	if size < 0 && !s3utils.IsGoogleEndpoint(c.endpointURL) {
		return 0, ErrEntityTooSmall(size, bucketName, objectName)
	}
	if size > 0 {
		if isReadAt(reader) && !isObject(reader) {
			// Pin the reader at its current offset so the upload
			// always reads the same byte range, even if the body
			// is re-read.
			if seeker, ok := reader.(io.Seeker); ok {
				offset, err := seeker.Seek(0, io.SeekCurrent)
				if err != nil {
					return 0, ErrInvalidArgument(err.Error())
				}
				reader = io.NewSectionReader(reader.(io.ReaderAt), offset, size)
			}
		}
	}

	// Update progress reader appropriately to the latest offset as we
	// read from the source.
	hookReader := newHook(reader, opts.Progress)

	// This function does not calculate sha256 and md5sum for the payload.
	// Execute put object.
	st, err := c.putObjectDo(ctx, bucketName, objectName, hookReader, "", "", size, opts)
	if err != nil {
		return 0, err
	}
	if st.Size != size {
		return 0, ErrUnexpectedEOF(st.Size, size, bucketName, objectName)
	}
	return size, nil
}
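
// For illustration: the size < 0 branch above means a caller streaming to a
// Google Cloud Storage endpoint may pass an unknown length. A hypothetical
// sketch (the bucket and reader are made up; any non-Google endpoint would
// get ErrEntityTooSmall instead):
//
//	n, err := c.putObjectNoChecksum(context.Background(),
//		"my-gcs-bucket", "my-object", someStream, -1, PutObjectOptions{})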

// putObjectDo - executes the put object http operation.
// NOTE: You must have WRITE permissions on a bucket to add an object to it.
func (c Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (ObjectInfo, error) {
	// Input validation.
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		return ObjectInfo{}, err
	}
	if err := s3utils.CheckValidObjectName(objectName); err != nil {
		return ObjectInfo{}, err
	}

	// Set headers.
	customHeader := opts.Header()

	// Populate request metadata.
	reqMetadata := requestMetadata{
		bucketName:       bucketName,
		objectName:       objectName,
		customHeader:     customHeader,
		contentBody:      reader,
		contentLength:    size,
		contentMD5Base64: md5Base64,
		contentSHA256Hex: sha256Hex,
	}

	// Execute the PUT request on objectName.
	resp, err := c.executeMethod(ctx, "PUT", reqMetadata)
	defer closeResponse(resp)
	if err != nil {
		return ObjectInfo{}, err
	}
	if resp != nil {
		if resp.StatusCode != http.StatusOK {
			return ObjectInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
		}
	}

	var objInfo ObjectInfo
	// Trim the surrounding double quotes from the ETag.
	objInfo.ETag = strings.TrimPrefix(resp.Header.Get("ETag"), "\"")
	objInfo.ETag = strings.TrimSuffix(objInfo.ETag, "\"")
	// A success here means the data was written to the server successfully.
	objInfo.Size = size
	return objInfo, nil
}
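
// For illustration, a single-shot PUT through this low-level path would look
// roughly like the following (hypothetical caller inside this package; the
// payload and names are made up, and the empty md5/sha256 arguments skip the
// checksum headers, as the streaming callers above do):
//
//	payload := []byte("hello")
//	info, err := c.putObjectDo(context.Background(), "my-bucket", "my-object",
//		bytes.NewReader(payload), "", "", int64(len(payload)), PutObjectOptions{})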