s3.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. // Copyright (C) 2024 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package s3
  7. import (
  8. "context"
  9. "io"
  10. "time"
  11. "github.com/aws/aws-sdk-go/aws"
  12. "github.com/aws/aws-sdk-go/aws/credentials"
  13. "github.com/aws/aws-sdk-go/aws/session"
  14. "github.com/aws/aws-sdk-go/service/s3"
  15. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  16. "github.com/syncthing/syncthing/internal/blob"
  17. )
  18. var _ blob.Store = (*Session)(nil)
  19. type Session struct {
  20. bucket string
  21. s3sess *session.Session
  22. }
  23. type Object = s3.Object
  24. func NewSession(endpoint, region, bucket, accessKeyID, secretKey string) (*Session, error) {
  25. sess, err := session.NewSession(&aws.Config{
  26. Region: aws.String(region),
  27. Endpoint: aws.String(endpoint),
  28. Credentials: credentials.NewStaticCredentials(accessKeyID, secretKey, ""),
  29. S3ForcePathStyle: aws.Bool(true),
  30. })
  31. if err != nil {
  32. return nil, err
  33. }
  34. return &Session{
  35. bucket: bucket,
  36. s3sess: sess,
  37. }, nil
  38. }
  39. func (s *Session) Upload(_ context.Context, key string, r io.Reader) error {
  40. uploader := s3manager.NewUploader(s.s3sess)
  41. _, err := uploader.Upload(&s3manager.UploadInput{
  42. Bucket: aws.String(s.bucket),
  43. Key: aws.String(key),
  44. Body: r,
  45. })
  46. return err
  47. }
  48. func (s *Session) Download(_ context.Context, key string, w blob.Writer) error {
  49. downloader := s3manager.NewDownloader(s.s3sess)
  50. _, err := downloader.Download(w, &s3.GetObjectInput{
  51. Bucket: aws.String(s.bucket),
  52. Key: aws.String(key),
  53. })
  54. return err
  55. }
  56. func (s *Session) LatestKey(_ context.Context) (string, error) {
  57. var latestKey string
  58. var lastModified time.Time
  59. if err := s.list(func(obj *Object) bool {
  60. if latestKey == "" || obj.LastModified.After(lastModified) {
  61. latestKey = *obj.Key
  62. lastModified = *obj.LastModified
  63. }
  64. return true
  65. }); err != nil {
  66. return "", err
  67. }
  68. return latestKey, nil
  69. }
  70. func (s *Session) list(fn func(*Object) bool) error {
  71. svc := s3.New(s.s3sess)
  72. opts := &s3.ListObjectsV2Input{
  73. Bucket: aws.String(s.bucket),
  74. }
  75. for {
  76. resp, err := svc.ListObjectsV2(opts)
  77. if err != nil {
  78. return err
  79. }
  80. for _, item := range resp.Contents {
  81. if !fn(item) {
  82. return nil
  83. }
  84. }
  85. if resp.NextContinuationToken == nil || *resp.NextContinuationToken == "" {
  86. break
  87. }
  88. opts.ContinuationToken = resp.NextContinuationToken
  89. }
  90. return nil
  91. }