api-compose-object.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629
  1. /*
  2. * Minio Go Library for Amazon S3 Compatible Cloud Storage
  3. * Copyright 2017 Minio, Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package minio
  18. import (
  19. "context"
  20. "encoding/base64"
  21. "fmt"
  22. "net/http"
  23. "net/url"
  24. "strconv"
  25. "strings"
  26. "time"
  27. "github.com/minio/minio-go/pkg/s3utils"
  28. )
  // SSEInfo holds the Server-Side-Encryption parameters supplied by a
  // user: the raw (un-encoded) customer key and the algorithm name.
  type SSEInfo struct {
  	key  []byte
  	algo string
  }

  // NewSSEInfo builds an SSEInfo from a (binary or un-encoded)
  // encryption key and an algorithm name. An empty algo is replaced by
  // the default "AES256". Ref:
  // https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
  func NewSSEInfo(key []byte, algo string) SSEInfo {
  	info := SSEInfo{key: key, algo: "AES256"}
  	if algo != "" {
  		info.algo = algo
  	}
  	return info
  }
  44. // internal method that computes SSE-C headers
  45. func (s *SSEInfo) getSSEHeaders(isCopySource bool) map[string]string {
  46. if s == nil {
  47. return nil
  48. }
  49. cs := ""
  50. if isCopySource {
  51. cs = "copy-source-"
  52. }
  53. return map[string]string{
  54. "x-amz-" + cs + "server-side-encryption-customer-algorithm": s.algo,
  55. "x-amz-" + cs + "server-side-encryption-customer-key": base64.StdEncoding.EncodeToString(s.key),
  56. "x-amz-" + cs + "server-side-encryption-customer-key-MD5": sumMD5Base64(s.key),
  57. }
  58. }
  59. // GetSSEHeaders - computes and returns headers for SSE-C as key-value
  60. // pairs. They can be set as metadata in PutObject* requests (for
  61. // encryption) or be set as request headers in `Core.GetObject` (for
  62. // decryption).
  63. func (s *SSEInfo) GetSSEHeaders() map[string]string {
  64. return s.getSSEHeaders(false)
  65. }
  66. // DestinationInfo - type with information about the object to be
  67. // created via server-side copy requests, using the Compose API.
  68. type DestinationInfo struct {
  69. bucket, object string
  70. // key for encrypting destination
  71. encryption *SSEInfo
  72. // if no user-metadata is provided, it is copied from source
  73. // (when there is only once source object in the compose
  74. // request)
  75. userMetadata map[string]string
  76. }
  77. // NewDestinationInfo - creates a compose-object/copy-source
  78. // destination info object.
  79. //
  80. // `encSSEC` is the key info for server-side-encryption with customer
  81. // provided key. If it is nil, no encryption is performed.
  82. //
  83. // `userMeta` is the user-metadata key-value pairs to be set on the
  84. // destination. The keys are automatically prefixed with `x-amz-meta-`
  85. // if needed. If nil is passed, and if only a single source (of any
  86. // size) is provided in the ComposeObject call, then metadata from the
  87. // source is copied to the destination.
  88. func NewDestinationInfo(bucket, object string, encryptSSEC *SSEInfo,
  89. userMeta map[string]string) (d DestinationInfo, err error) {
  90. // Input validation.
  91. if err = s3utils.CheckValidBucketName(bucket); err != nil {
  92. return d, err
  93. }
  94. if err = s3utils.CheckValidObjectName(object); err != nil {
  95. return d, err
  96. }
  97. // Process custom-metadata to remove a `x-amz-meta-` prefix if
  98. // present and validate that keys are distinct (after this
  99. // prefix removal).
  100. m := make(map[string]string)
  101. for k, v := range userMeta {
  102. if strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") {
  103. k = k[len("x-amz-meta-"):]
  104. }
  105. if _, ok := m[k]; ok {
  106. return d, ErrInvalidArgument(fmt.Sprintf("Cannot add both %s and x-amz-meta-%s keys as custom metadata", k, k))
  107. }
  108. m[k] = v
  109. }
  110. return DestinationInfo{
  111. bucket: bucket,
  112. object: object,
  113. encryption: encryptSSEC,
  114. userMetadata: m,
  115. }, nil
  116. }
  117. // getUserMetaHeadersMap - construct appropriate key-value pairs to send
  118. // as headers from metadata map to pass into copy-object request. For
  119. // single part copy-object (i.e. non-multipart object), enable the
  120. // withCopyDirectiveHeader to set the `x-amz-metadata-directive` to
  121. // `REPLACE`, so that metadata headers from the source are not copied
  122. // over.
  123. func (d *DestinationInfo) getUserMetaHeadersMap(withCopyDirectiveHeader bool) map[string]string {
  124. if len(d.userMetadata) == 0 {
  125. return nil
  126. }
  127. r := make(map[string]string)
  128. if withCopyDirectiveHeader {
  129. r["x-amz-metadata-directive"] = "REPLACE"
  130. }
  131. for k, v := range d.userMetadata {
  132. r["x-amz-meta-"+k] = v
  133. }
  134. return r
  135. }
  136. // SourceInfo - represents a source object to be copied, using
  137. // server-side copying APIs.
  138. type SourceInfo struct {
  139. bucket, object string
  140. start, end int64
  141. decryptKey *SSEInfo
  142. // Headers to send with the upload-part-copy request involving
  143. // this source object.
  144. Headers http.Header
  145. }
  146. // NewSourceInfo - create a compose-object/copy-object source info
  147. // object.
  148. //
  149. // `decryptSSEC` is the decryption key using server-side-encryption
  150. // with customer provided key. It may be nil if the source is not
  151. // encrypted.
  152. func NewSourceInfo(bucket, object string, decryptSSEC *SSEInfo) SourceInfo {
  153. r := SourceInfo{
  154. bucket: bucket,
  155. object: object,
  156. start: -1, // range is unspecified by default
  157. decryptKey: decryptSSEC,
  158. Headers: make(http.Header),
  159. }
  160. // Set the source header
  161. r.Headers.Set("x-amz-copy-source", s3utils.EncodePath(bucket+"/"+object))
  162. // Assemble decryption headers for upload-part-copy request
  163. for k, v := range decryptSSEC.getSSEHeaders(true) {
  164. r.Headers.Set(k, v)
  165. }
  166. return r
  167. }
  168. // SetRange - Set the start and end offset of the source object to be
  169. // copied. If this method is not called, the whole source object is
  170. // copied.
  171. func (s *SourceInfo) SetRange(start, end int64) error {
  172. if start > end || start < 0 {
  173. return ErrInvalidArgument("start must be non-negative, and start must be at most end.")
  174. }
  175. // Note that 0 <= start <= end
  176. s.start, s.end = start, end
  177. return nil
  178. }
  179. // SetMatchETagCond - Set ETag match condition. The object is copied
  180. // only if the etag of the source matches the value given here.
  181. func (s *SourceInfo) SetMatchETagCond(etag string) error {
  182. if etag == "" {
  183. return ErrInvalidArgument("ETag cannot be empty.")
  184. }
  185. s.Headers.Set("x-amz-copy-source-if-match", etag)
  186. return nil
  187. }
  188. // SetMatchETagExceptCond - Set the ETag match exception
  189. // condition. The object is copied only if the etag of the source is
  190. // not the value given here.
  191. func (s *SourceInfo) SetMatchETagExceptCond(etag string) error {
  192. if etag == "" {
  193. return ErrInvalidArgument("ETag cannot be empty.")
  194. }
  195. s.Headers.Set("x-amz-copy-source-if-none-match", etag)
  196. return nil
  197. }
  198. // SetModifiedSinceCond - Set the modified since condition.
  199. func (s *SourceInfo) SetModifiedSinceCond(modTime time.Time) error {
  200. if modTime.IsZero() {
  201. return ErrInvalidArgument("Input time cannot be 0.")
  202. }
  203. s.Headers.Set("x-amz-copy-source-if-modified-since", modTime.Format(http.TimeFormat))
  204. return nil
  205. }
  206. // SetUnmodifiedSinceCond - Set the unmodified since condition.
  207. func (s *SourceInfo) SetUnmodifiedSinceCond(modTime time.Time) error {
  208. if modTime.IsZero() {
  209. return ErrInvalidArgument("Input time cannot be 0.")
  210. }
  211. s.Headers.Set("x-amz-copy-source-if-unmodified-since", modTime.Format(http.TimeFormat))
  212. return nil
  213. }
  214. // Helper to fetch size and etag of an object using a StatObject call.
  215. func (s *SourceInfo) getProps(c Client) (size int64, etag string, userMeta map[string]string, err error) {
  216. // Get object info - need size and etag here. Also, decryption
  217. // headers are added to the stat request if given.
  218. var objInfo ObjectInfo
  219. opts := StatObjectOptions{}
  220. for k, v := range s.decryptKey.getSSEHeaders(false) {
  221. opts.Set(k, v)
  222. }
  223. objInfo, err = c.statObject(context.Background(), s.bucket, s.object, opts)
  224. if err != nil {
  225. err = ErrInvalidArgument(fmt.Sprintf("Could not stat object - %s/%s: %v", s.bucket, s.object, err))
  226. } else {
  227. size = objInfo.Size
  228. etag = objInfo.ETag
  229. userMeta = make(map[string]string)
  230. for k, v := range objInfo.Metadata {
  231. if strings.HasPrefix(k, "x-amz-meta-") {
  232. if len(v) > 0 {
  233. userMeta[k] = v[0]
  234. }
  235. }
  236. }
  237. }
  238. return
  239. }
  240. // Low level implementation of CopyObject API, supports only upto 5GiB worth of copy.
  241. func (c Client) copyObjectDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string,
  242. metadata map[string]string) (ObjectInfo, error) {
  243. // Build headers.
  244. headers := make(http.Header)
  245. // Set all the metadata headers.
  246. for k, v := range metadata {
  247. headers.Set(k, v)
  248. }
  249. // Set the source header
  250. headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))
  251. // Send upload-part-copy request
  252. resp, err := c.executeMethod(ctx, "PUT", requestMetadata{
  253. bucketName: destBucket,
  254. objectName: destObject,
  255. customHeader: headers,
  256. })
  257. defer closeResponse(resp)
  258. if err != nil {
  259. return ObjectInfo{}, err
  260. }
  261. // Check if we got an error response.
  262. if resp.StatusCode != http.StatusOK {
  263. return ObjectInfo{}, httpRespToErrorResponse(resp, srcBucket, srcObject)
  264. }
  265. cpObjRes := copyObjectResult{}
  266. err = xmlDecoder(resp.Body, &cpObjRes)
  267. if err != nil {
  268. return ObjectInfo{}, err
  269. }
  270. objInfo := ObjectInfo{
  271. Key: destObject,
  272. ETag: strings.Trim(cpObjRes.ETag, "\""),
  273. LastModified: cpObjRes.LastModified,
  274. }
  275. return objInfo, nil
  276. }
  277. func (c Client) copyObjectPartDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string,
  278. partID int, startOffset int64, length int64, metadata map[string]string) (p CompletePart, err error) {
  279. headers := make(http.Header)
  280. // Set source
  281. headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))
  282. if startOffset < 0 {
  283. return p, ErrInvalidArgument("startOffset must be non-negative")
  284. }
  285. if length >= 0 {
  286. headers.Set("x-amz-copy-source-range", fmt.Sprintf("bytes=%d-%d", startOffset, startOffset+length-1))
  287. }
  288. for k, v := range metadata {
  289. headers.Set(k, v)
  290. }
  291. queryValues := make(url.Values)
  292. queryValues.Set("partNumber", strconv.Itoa(partID))
  293. queryValues.Set("uploadId", uploadID)
  294. resp, err := c.executeMethod(ctx, "PUT", requestMetadata{
  295. bucketName: destBucket,
  296. objectName: destObject,
  297. customHeader: headers,
  298. queryValues: queryValues,
  299. })
  300. defer closeResponse(resp)
  301. if err != nil {
  302. return
  303. }
  304. // Check if we got an error response.
  305. if resp.StatusCode != http.StatusOK {
  306. return p, httpRespToErrorResponse(resp, destBucket, destObject)
  307. }
  308. // Decode copy-part response on success.
  309. cpObjRes := copyObjectResult{}
  310. err = xmlDecoder(resp.Body, &cpObjRes)
  311. if err != nil {
  312. return p, err
  313. }
  314. p.PartNumber, p.ETag = partID, cpObjRes.ETag
  315. return p, nil
  316. }
  317. // uploadPartCopy - helper function to create a part in a multipart
  318. // upload via an upload-part-copy request
  319. // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
  320. func (c Client) uploadPartCopy(ctx context.Context, bucket, object, uploadID string, partNumber int,
  321. headers http.Header) (p CompletePart, err error) {
  322. // Build query parameters
  323. urlValues := make(url.Values)
  324. urlValues.Set("partNumber", strconv.Itoa(partNumber))
  325. urlValues.Set("uploadId", uploadID)
  326. // Send upload-part-copy request
  327. resp, err := c.executeMethod(ctx, "PUT", requestMetadata{
  328. bucketName: bucket,
  329. objectName: object,
  330. customHeader: headers,
  331. queryValues: urlValues,
  332. })
  333. defer closeResponse(resp)
  334. if err != nil {
  335. return p, err
  336. }
  337. // Check if we got an error response.
  338. if resp.StatusCode != http.StatusOK {
  339. return p, httpRespToErrorResponse(resp, bucket, object)
  340. }
  341. // Decode copy-part response on success.
  342. cpObjRes := copyObjectResult{}
  343. err = xmlDecoder(resp.Body, &cpObjRes)
  344. if err != nil {
  345. return p, err
  346. }
  347. p.PartNumber, p.ETag = partNumber, cpObjRes.ETag
  348. return p, nil
  349. }
  350. // ComposeObject - creates an object using server-side copying of
  351. // existing objects. It takes a list of source objects (with optional
  352. // offsets) and concatenates them into a new object using only
  353. // server-side copying operations.
  354. func (c Client) ComposeObject(dst DestinationInfo, srcs []SourceInfo) error {
  355. if len(srcs) < 1 || len(srcs) > maxPartsCount {
  356. return ErrInvalidArgument("There must be as least one and up to 10000 source objects.")
  357. }
  358. ctx := context.Background()
  359. srcSizes := make([]int64, len(srcs))
  360. var totalSize, size, totalParts int64
  361. var srcUserMeta map[string]string
  362. var etag string
  363. var err error
  364. for i, src := range srcs {
  365. size, etag, srcUserMeta, err = src.getProps(c)
  366. if err != nil {
  367. return err
  368. }
  369. // Error out if client side encryption is used in this source object when
  370. // more than one source objects are given.
  371. if len(srcs) > 1 && src.Headers.Get("x-amz-meta-x-amz-key") != "" {
  372. return ErrInvalidArgument(
  373. fmt.Sprintf("Client side encryption is used in source object %s/%s", src.bucket, src.object))
  374. }
  375. // Since we did a HEAD to get size, we use the ETag
  376. // value to make sure the object has not changed by
  377. // the time we perform the copy. This is done, only if
  378. // the user has not set their own ETag match
  379. // condition.
  380. if src.Headers.Get("x-amz-copy-source-if-match") == "" {
  381. src.SetMatchETagCond(etag)
  382. }
  383. // Check if a segment is specified, and if so, is the
  384. // segment within object bounds?
  385. if src.start != -1 {
  386. // Since range is specified,
  387. // 0 <= src.start <= src.end
  388. // so only invalid case to check is:
  389. if src.end >= size {
  390. return ErrInvalidArgument(
  391. fmt.Sprintf("SourceInfo %d has invalid segment-to-copy [%d, %d] (size is %d)",
  392. i, src.start, src.end, size))
  393. }
  394. size = src.end - src.start + 1
  395. }
  396. // Only the last source may be less than `absMinPartSize`
  397. if size < absMinPartSize && i < len(srcs)-1 {
  398. return ErrInvalidArgument(
  399. fmt.Sprintf("SourceInfo %d is too small (%d) and it is not the last part", i, size))
  400. }
  401. // Is data to copy too large?
  402. totalSize += size
  403. if totalSize > maxMultipartPutObjectSize {
  404. return ErrInvalidArgument(fmt.Sprintf("Cannot compose an object of size %d (> 5TiB)", totalSize))
  405. }
  406. // record source size
  407. srcSizes[i] = size
  408. // calculate parts needed for current source
  409. totalParts += partsRequired(size)
  410. // Do we need more parts than we are allowed?
  411. if totalParts > maxPartsCount {
  412. return ErrInvalidArgument(fmt.Sprintf(
  413. "Your proposed compose object requires more than %d parts", maxPartsCount))
  414. }
  415. }
  416. // Single source object case (i.e. when only one source is
  417. // involved, it is being copied wholly and at most 5GiB in
  418. // size).
  419. if totalParts == 1 && srcs[0].start == -1 && totalSize <= maxPartSize {
  420. h := srcs[0].Headers
  421. // Add destination encryption headers
  422. for k, v := range dst.encryption.getSSEHeaders(false) {
  423. h.Set(k, v)
  424. }
  425. // If no user metadata is specified (and so, the
  426. // for-loop below is not entered), metadata from the
  427. // source is copied to the destination (due to
  428. // single-part copy-object PUT request behaviour).
  429. for k, v := range dst.getUserMetaHeadersMap(true) {
  430. h.Set(k, v)
  431. }
  432. // Send copy request
  433. resp, err := c.executeMethod(ctx, "PUT", requestMetadata{
  434. bucketName: dst.bucket,
  435. objectName: dst.object,
  436. customHeader: h,
  437. })
  438. defer closeResponse(resp)
  439. if err != nil {
  440. return err
  441. }
  442. // Check if we got an error response.
  443. if resp.StatusCode != http.StatusOK {
  444. return httpRespToErrorResponse(resp, dst.bucket, dst.object)
  445. }
  446. // Return nil on success.
  447. return nil
  448. }
  449. // Now, handle multipart-copy cases.
  450. // 1. Initiate a new multipart upload.
  451. // Set user-metadata on the destination object. If no
  452. // user-metadata is specified, and there is only one source,
  453. // (only) then metadata from source is copied.
  454. userMeta := dst.getUserMetaHeadersMap(false)
  455. metaMap := userMeta
  456. if len(userMeta) == 0 && len(srcs) == 1 {
  457. metaMap = srcUserMeta
  458. }
  459. metaHeaders := make(map[string]string)
  460. for k, v := range metaMap {
  461. metaHeaders[k] = v
  462. }
  463. uploadID, err := c.newUploadID(ctx, dst.bucket, dst.object, PutObjectOptions{UserMetadata: metaHeaders})
  464. if err != nil {
  465. return err
  466. }
  467. // 2. Perform copy part uploads
  468. objParts := []CompletePart{}
  469. partIndex := 1
  470. for i, src := range srcs {
  471. h := src.Headers
  472. // Add destination encryption headers
  473. for k, v := range dst.encryption.getSSEHeaders(false) {
  474. h.Set(k, v)
  475. }
  476. // calculate start/end indices of parts after
  477. // splitting.
  478. startIdx, endIdx := calculateEvenSplits(srcSizes[i], src)
  479. for j, start := range startIdx {
  480. end := endIdx[j]
  481. // Add (or reset) source range header for
  482. // upload part copy request.
  483. h.Set("x-amz-copy-source-range",
  484. fmt.Sprintf("bytes=%d-%d", start, end))
  485. // make upload-part-copy request
  486. complPart, err := c.uploadPartCopy(ctx, dst.bucket,
  487. dst.object, uploadID, partIndex, h)
  488. if err != nil {
  489. return err
  490. }
  491. objParts = append(objParts, complPart)
  492. partIndex++
  493. }
  494. }
  495. // 3. Make final complete-multipart request.
  496. _, err = c.completeMultipartUpload(ctx, dst.bucket, dst.object, uploadID,
  497. completeMultipartUpload{Parts: objParts})
  498. if err != nil {
  499. return err
  500. }
  501. return nil
  502. }
  503. // partsRequired is ceiling(size / copyPartSize)
  504. func partsRequired(size int64) int64 {
  505. r := size / copyPartSize
  506. if size%copyPartSize > 0 {
  507. r++
  508. }
  509. return r
  510. }
  511. // calculateEvenSplits - computes splits for a source and returns
  512. // start and end index slices. Splits happen evenly to be sure that no
  513. // part is less than 5MiB, as that could fail the multipart request if
  514. // it is not the last part.
  515. func calculateEvenSplits(size int64, src SourceInfo) (startIndex, endIndex []int64) {
  516. if size == 0 {
  517. return
  518. }
  519. reqParts := partsRequired(size)
  520. startIndex = make([]int64, reqParts)
  521. endIndex = make([]int64, reqParts)
  522. // Compute number of required parts `k`, as:
  523. //
  524. // k = ceiling(size / copyPartSize)
  525. //
  526. // Now, distribute the `size` bytes in the source into
  527. // k parts as evenly as possible:
  528. //
  529. // r parts sized (q+1) bytes, and
  530. // (k - r) parts sized q bytes, where
  531. //
  532. // size = q * k + r (by simple division of size by k,
  533. // so that 0 <= r < k)
  534. //
  535. start := src.start
  536. if start == -1 {
  537. start = 0
  538. }
  539. quot, rem := size/reqParts, size%reqParts
  540. nextStart := start
  541. for j := int64(0); j < reqParts; j++ {
  542. curPartSize := quot
  543. if j < rem {
  544. curPartSize++
  545. }
  546. cStart := nextStart
  547. cEnd := cStart + curPartSize - 1
  548. nextStart = cEnd + 1
  549. startIndex[j], endIndex[j] = cStart, cEnd
  550. }
  551. return
  552. }