azblobfs.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179
  1. //go:build !noazblob
  2. // +build !noazblob
  3. package vfs
  4. import (
  5. "bytes"
  6. "context"
  7. "encoding/base64"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "mime"
  12. "net/http"
  13. "os"
  14. "path"
  15. "path/filepath"
  16. "strings"
  17. "sync"
  18. "sync/atomic"
  19. "time"
  20. "github.com/Azure/azure-sdk-for-go/sdk/azcore"
  21. "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
  22. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
  23. "github.com/eikenb/pipeat"
  24. "github.com/pkg/sftp"
  25. "github.com/drakkan/sftpgo/v2/logger"
  26. "github.com/drakkan/sftpgo/v2/metric"
  27. "github.com/drakkan/sftpgo/v2/plugin"
  28. "github.com/drakkan/sftpgo/v2/util"
  29. "github.com/drakkan/sftpgo/v2/version"
  30. )
const (
	// azureDefaultEndpoint is the default Azure Blob service endpoint suffix,
	// used by setConfigDefaults when no custom endpoint is configured.
	azureDefaultEndpoint = "blob.core.windows.net"
)
// AzureBlobFs is a Fs implementation for Azure Blob storage.
type AzureBlobFs struct {
	connectionID string
	// local directory used for temporary files (pipes for uploads/downloads)
	localTempDir string
	// if not empty this fs is mounted as virtual folder in the specified path
	mountPath string
	config    *AzBlobFsConfig
	// true when authenticated with shared key credentials (container-level
	// operations such as GetProperties are allowed), false with a SAS URL
	hasContainerAccess bool
	containerClient    *azblob.ContainerClient
	// default deadline for quick operations (head, delete, list page)
	ctxTimeout time.Duration
	// deadline for potentially slow operations (server side copy)
	ctxLongTimeout time.Duration
}
// init registers the "+azblob" feature flag so builds advertise Azure Blob
// support (the noazblob build tag excludes this whole file).
func init() {
	version.AddFeature("+azblob")
}
// NewAzBlobFs returns an AzBlobFs object that allows to interact with Azure Blob storage
//
// Authentication is chosen from the configuration: a SAS URL when provided,
// otherwise account name + shared key. The config is validated and decrypted
// before any client is built.
func NewAzBlobFs(connectionID, localTempDir, mountPath string, config AzBlobFsConfig) (Fs, error) {
	if localTempDir == "" {
		if tempPath != "" {
			localTempDir = tempPath
		} else {
			localTempDir = filepath.Clean(os.TempDir())
		}
	}
	fs := &AzureBlobFs{
		connectionID:   connectionID,
		localTempDir:   localTempDir,
		mountPath:      getMountPath(mountPath),
		config:         &config,
		ctxTimeout:     30 * time.Second,
		ctxLongTimeout: 90 * time.Second,
	}
	if err := fs.config.validate(); err != nil {
		return fs, err
	}
	if err := fs.config.tryDecrypt(); err != nil {
		return fs, err
	}
	fs.setConfigDefaults()

	version := version.Get()
	clientOptions := &azblob.ClientOptions{
		Telemetry: policy.TelemetryOptions{
			ApplicationID: fmt.Sprintf("SFTPGo-%v_%v", version.Version, version.CommitHash),
		},
	}
	// SAS URL authentication: the container name may be embedded in the URL
	// itself; if both the URL and the config specify one they must agree
	if fs.config.SASURL.GetPayload() != "" {
		parts, err := azblob.NewBlobURLParts(fs.config.SASURL.GetPayload())
		if err != nil {
			return fs, fmt.Errorf("invalid SAS URL: %w", err)
		}
		if parts.ContainerName != "" {
			if fs.config.Container != "" && fs.config.Container != parts.ContainerName {
				return fs, fmt.Errorf("container name in SAS URL %#v and container provided %#v do not match",
					parts.ContainerName, fs.config.Container)
			}
			fs.config.Container = parts.ContainerName
		} else {
			if fs.config.Container == "" {
				return fs, errors.New("container is required with this SAS URL")
			}
		}
		svc, err := azblob.NewServiceClientWithNoCredential(fs.config.SASURL.GetPayload(), clientOptions)
		if err != nil {
			return fs, fmt.Errorf("invalid credentials: %v", err)
		}
		// a SAS token does not grant container-level operations
		fs.hasContainerAccess = false
		fs.containerClient, err = svc.NewContainerClient(fs.config.Container)
		return fs, err
	}

	// shared key authentication
	credential, err := azblob.NewSharedKeyCredential(fs.config.AccountName, fs.config.AccountKey.GetPayload())
	if err != nil {
		return fs, fmt.Errorf("invalid credentials: %v", err)
	}
	var endpoint string
	if fs.config.UseEmulator {
		// emulators (e.g. Azurite) use path-style addressing: endpoint/account
		endpoint = fmt.Sprintf("%s/%s", fs.config.Endpoint, fs.config.AccountName)
	} else {
		endpoint = fmt.Sprintf("https://%s.%s/", fs.config.AccountName, fs.config.Endpoint)
	}
	svc, err := azblob.NewServiceClientWithSharedKey(endpoint, credential, clientOptions)
	if err != nil {
		return fs, fmt.Errorf("invalid credentials: %v", err)
	}
	fs.hasContainerAccess = true
	fs.containerClient, err = svc.NewContainerClient(fs.config.Container)
	return fs, err
}
  121. // Name returns the name for the Fs implementation
  122. func (fs *AzureBlobFs) Name() string {
  123. if !fs.config.SASURL.IsEmpty() {
  124. return fmt.Sprintf("Azure Blob with SAS URL, container %#v", fs.config.Container)
  125. }
  126. return fmt.Sprintf("Azure Blob container %#v", fs.config.Container)
  127. }
  128. // ConnectionID returns the connection ID associated to this Fs implementation
  129. func (fs *AzureBlobFs) ConnectionID() string {
  130. return fs.connectionID
  131. }
// Stat returns a FileInfo describing the named file.
//
// Resolution order: the root and the configured key prefix are always virtual
// directories; otherwise a HEAD request decides, and if the blob is missing
// the name is treated as a directory when at least one blob shares its prefix.
func (fs *AzureBlobFs) Stat(name string) (os.FileInfo, error) {
	// the root is always a directory; just verify container access when possible
	if name == "" || name == "." {
		if fs.hasContainerAccess {
			err := fs.checkIfBucketExists()
			if err != nil {
				return nil, err
			}
		}
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Now(), false))
	}
	// the configured key prefix itself is a virtual directory
	if fs.config.KeyPrefix == name+"/" {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Now(), false))
	}
	attrs, err := fs.headObject(name)
	if err == nil {
		// a blob whose content type is dirMimeType is a directory placeholder
		contentType := util.GetStringFromPointer(attrs.ContentType)
		isDir := contentType == dirMimeType
		metric.AZListObjectsCompleted(nil)
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, isDir,
			util.GetIntFromPointer(attrs.ContentLength),
			util.GetTimeFromPointer(attrs.LastModified), false))
	}
	if !fs.IsNotExist(err) {
		return nil, err
	}
	// now check if this is a prefix (virtual directory)
	hasContents, err := fs.hasContents(name)
	if err != nil {
		return nil, err
	}
	if hasContents {
		return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Now(), false))
	}
	return nil, os.ErrNotExist
}
  168. // Lstat returns a FileInfo describing the named file
  169. func (fs *AzureBlobFs) Lstat(name string) (os.FileInfo, error) {
  170. return fs.Stat(name)
  171. }
// Open opens the named file for reading.
//
// The download runs in a background goroutine that streams the blob into the
// writer side of a local pipe; the caller reads from the returned
// *pipeat.PipeReaderAt. The returned func cancels the in-flight download.
func (fs *AzureBlobFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	blockBlob, err := fs.containerClient.NewBlockBlobClient(name)
	if err != nil {
		r.Close()
		w.Close()
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	go func() {
		defer cancelFn()
		err := fs.handleMultipartDownload(ctx, blockBlob, offset, w)
		// propagate the download error (if any) to the reader side
		w.CloseWithError(err) //nolint:errcheck
		fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %+v", name, w.GetWrittenBytes(), err)
		metric.AZTransferCompleted(w.GetWrittenBytes(), 1, err)
	}()
	return nil, r, cancelFn, nil
}
// Create creates or opens the named file for writing.
//
// The upload runs in a background goroutine that reads from the reader side
// of a local pipe; the caller writes into the returned *PipeWriter. A flag
// value of -1 marks the blob as a directory placeholder (dirMimeType).
func (fs *AzureBlobFs) Create(name string, flag int) (File, *PipeWriter, func(), error) {
	r, w, err := pipeat.PipeInDir(fs.localTempDir)
	if err != nil {
		return nil, nil, nil, err
	}
	blockBlob, err := fs.containerClient.NewBlockBlobClient(name)
	if err != nil {
		r.Close()
		w.Close()
		return nil, nil, nil, err
	}
	ctx, cancelFn := context.WithCancel(context.Background())
	p := NewPipeWriter(w)
	headers := azblob.BlobHTTPHeaders{}
	var contentType string
	if flag == -1 {
		// directory placeholder blob
		contentType = dirMimeType
	} else {
		// best-effort content type from the file extension
		contentType = mime.TypeByExtension(path.Ext(name))
	}
	if contentType != "" {
		headers.BlobContentType = &contentType
	}
	go func() {
		defer cancelFn()
		err := fs.handleMultipartUpload(ctx, r, blockBlob, &headers)
		r.CloseWithError(err) //nolint:errcheck
		// signal completion (and any error) to the writer side
		p.Done(err)
		fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, readed bytes: %v, err: %+v", name, r.GetReadedBytes(), err)
		metric.AZTransferCompleted(r.GetReadedBytes(), 0, err)
	}()
	return nil, p, cancelFn, nil
}
// Rename renames (moves) source to target.
// We don't support renaming non empty directories since we should
// rename all the contents too and this could take long time: think
// about directories with thousands of files, for each file we should
// execute a StartCopyFromURL call.
//
// The rename is implemented as a server-side copy followed by a delete of
// the source; the copy status is polled until it leaves the pending state.
func (fs *AzureBlobFs) Rename(source, target string) error {
	if source == target {
		return nil
	}
	fi, err := fs.Stat(source)
	if err != nil {
		return err
	}
	if fi.IsDir() {
		hasContents, err := fs.hasContents(source)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot rename non empty directory: %#v", source)
		}
	}
	dstBlob, err := fs.containerClient.NewBlockBlobClient(target)
	if err != nil {
		return err
	}
	srcBlob, err := fs.containerClient.NewBlockBlobClient(source)
	if err != nil {
		return err
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
	defer cancelFn()
	resp, err := dstBlob.StartCopyFromURL(ctx, srcBlob.URL(), fs.getCopyOptions())
	if err != nil {
		metric.AZCopyObjectCompleted(err)
		return err
	}
	copyStatus := azblob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus)))
	nErrors := 0
	for copyStatus == azblob.CopyStatusTypePending {
		// Poll until the copy is complete.
		time.Sleep(500 * time.Millisecond)
		resp, err := dstBlob.GetProperties(ctx, &azblob.BlobGetPropertiesOptions{
			BlobAccessConditions: &azblob.BlobAccessConditions{},
		})
		if err != nil {
			// A GetProperties failure may be transient, so allow a couple
			// of them before giving up.
			nErrors++
			if ctx.Err() != nil || nErrors == 3 {
				metric.AZCopyObjectCompleted(err)
				return err
			}
		} else {
			copyStatus = azblob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus)))
		}
	}
	if copyStatus != azblob.CopyStatusTypeSuccess {
		err := fmt.Errorf("copy failed with status: %s", copyStatus)
		metric.AZCopyObjectCompleted(err)
		return err
	}
	metric.AZCopyObjectCompleted(nil)
	// keep the original modification time on the copied blob's metadata
	fs.preserveModificationTime(source, target, fi)
	return fs.Remove(source, fi.IsDir())
}
// Remove removes the named file or (empty) directory.
//
// For directories the removal is refused if the prefix still contains blobs.
// On success any stored metadata for the file is removed as well (best
// effort: metadata errors are only logged).
func (fs *AzureBlobFs) Remove(name string, isDir bool) error {
	if isDir {
		hasContents, err := fs.hasContents(name)
		if err != nil {
			return err
		}
		if hasContents {
			return fmt.Errorf("cannot remove non empty directory: %#v", name)
		}
	}
	blobBlock, err := fs.containerClient.NewBlockBlobClient(name)
	if err != nil {
		return err
	}
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	_, err = blobBlock.Delete(ctx, &azblob.BlobDeleteOptions{
		// also delete any snapshots so the delete cannot fail because of them
		DeleteSnapshots: azblob.DeleteSnapshotsOptionTypeInclude.ToPtr(),
	})
	metric.AZDeleteObjectCompleted(err)
	if plugin.Handler.HasMetadater() && err == nil && !isDir {
		if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil {
			fsLog(fs, logger.LevelWarn, "unable to remove metadata for path %#v: %+v", name, errMetadata)
		}
	}
	return err
}
// Mkdir creates a new directory with the specified name and default permissions.
//
// Note: if Stat succeeds, err is nil, !IsNotExist(nil) is true and nil is
// returned, i.e. creating an already existing path is a silent no-op.
// The directory itself is a zero-byte placeholder blob (Create with flag -1).
func (fs *AzureBlobFs) Mkdir(name string) error {
	_, err := fs.Stat(name)
	if !fs.IsNotExist(err) {
		return err
	}
	_, w, _, err := fs.Create(name, -1)
	if err != nil {
		return err
	}
	return w.Close()
}
  334. // Symlink creates source as a symbolic link to target.
  335. func (*AzureBlobFs) Symlink(source, target string) error {
  336. return ErrVfsUnsupported
  337. }
  338. // Readlink returns the destination of the named symbolic link
  339. func (*AzureBlobFs) Readlink(name string) (string, error) {
  340. return "", ErrVfsUnsupported
  341. }
  342. // Chown changes the numeric uid and gid of the named file.
  343. func (*AzureBlobFs) Chown(name string, uid int, gid int) error {
  344. return ErrVfsUnsupported
  345. }
  346. // Chmod changes the mode of the named file to mode.
  347. func (*AzureBlobFs) Chmod(name string, mode os.FileMode) error {
  348. return ErrVfsUnsupported
  349. }
// Chtimes changes the access and modification times of the named file.
//
// Blob storage cannot store arbitrary mtimes, so the value is persisted via
// the metadata plugin when one is configured; without a metadater the
// operation is unsupported. Directories are rejected (when not uploading)
// since only file mtimes are tracked.
func (fs *AzureBlobFs) Chtimes(name string, atime, mtime time.Time, isUploading bool) error {
	if !plugin.Handler.HasMetadater() {
		return ErrVfsUnsupported
	}
	if !isUploading {
		info, err := fs.Stat(name)
		if err != nil {
			return err
		}
		if info.IsDir() {
			return ErrVfsUnsupported
		}
	}
	return plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(name),
		util.GetTimeAsMsSinceEpoch(mtime))
}
  367. // Truncate changes the size of the named file.
  368. // Truncate by path is not supported, while truncating an opened
  369. // file is handled inside base transfer
  370. func (*AzureBlobFs) Truncate(name string, size int64) error {
  371. return ErrVfsUnsupported
  372. }
  373. // ReadDir reads the directory named by dirname and returns
  374. // a list of directory entries.
  375. func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
  376. var result []os.FileInfo
  377. // dirname must be already cleaned
  378. prefix := fs.getPrefix(dirname)
  379. modTimes, err := getFolderModTimes(fs.getStorageID(), dirname)
  380. if err != nil {
  381. return result, err
  382. }
  383. prefixes := make(map[string]bool)
  384. pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobsHierarchyOptions{
  385. Include: []azblob.ListBlobsIncludeItem{},
  386. Prefix: &prefix,
  387. })
  388. hasNext := true
  389. for hasNext {
  390. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  391. defer cancelFn()
  392. if hasNext = pager.NextPage(ctx); hasNext {
  393. resp := pager.PageResponse()
  394. for _, blobPrefix := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobPrefixes {
  395. name := util.GetStringFromPointer(blobPrefix.Name)
  396. // we don't support prefixes == "/" this will be sent if a key starts with "/"
  397. if name == "" || name == "/" {
  398. continue
  399. }
  400. // sometime we have duplicate prefixes, maybe an Azurite bug
  401. name = strings.TrimPrefix(name, prefix)
  402. if _, ok := prefixes[strings.TrimSuffix(name, "/")]; ok {
  403. continue
  404. }
  405. result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
  406. prefixes[strings.TrimSuffix(name, "/")] = true
  407. }
  408. for _, blobItem := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobItems {
  409. name := util.GetStringFromPointer(blobItem.Name)
  410. name = strings.TrimPrefix(name, prefix)
  411. size := int64(0)
  412. isDir := false
  413. modTime := time.Now()
  414. if blobItem.Properties != nil {
  415. size = util.GetIntFromPointer(blobItem.Properties.ContentLength)
  416. modTime = util.GetTimeFromPointer(blobItem.Properties.LastModified)
  417. contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
  418. isDir = (contentType == dirMimeType)
  419. if isDir {
  420. // check if the dir is already included, it will be sent as blob prefix if it contains at least one item
  421. if _, ok := prefixes[name]; ok {
  422. continue
  423. }
  424. prefixes[name] = true
  425. }
  426. }
  427. if t, ok := modTimes[name]; ok {
  428. modTime = util.GetTimeFromMsecSinceEpoch(t)
  429. }
  430. result = append(result, NewFileInfo(name, isDir, size, modTime, false))
  431. }
  432. }
  433. }
  434. err = pager.Err()
  435. metric.AZListObjectsCompleted(err)
  436. return result, err
  437. }
  438. // IsUploadResumeSupported returns true if resuming uploads is supported.
  439. // Resuming uploads is not supported on Azure Blob
  440. func (*AzureBlobFs) IsUploadResumeSupported() bool {
  441. return false
  442. }
  443. // IsAtomicUploadSupported returns true if atomic upload is supported.
  444. // Azure Blob uploads are already atomic, we don't need to upload to a temporary
  445. // file
  446. func (*AzureBlobFs) IsAtomicUploadSupported() bool {
  447. return false
  448. }
  449. // IsNotExist returns a boolean indicating whether the error is known to
  450. // report that a file or directory does not exist
  451. func (*AzureBlobFs) IsNotExist(err error) bool {
  452. if err == nil {
  453. return false
  454. }
  455. var errStorage *azblob.StorageError
  456. if errors.As(err, &errStorage) {
  457. return errStorage.StatusCode() == http.StatusNotFound
  458. }
  459. var errResp *azcore.ResponseError
  460. if errors.As(err, &errResp) {
  461. return errResp.StatusCode == http.StatusNotFound
  462. }
  463. // os.ErrNotExist can be returned internally by fs.Stat
  464. return errors.Is(err, os.ErrNotExist)
  465. }
  466. // IsPermission returns a boolean indicating whether the error is known to
  467. // report that permission is denied.
  468. func (*AzureBlobFs) IsPermission(err error) bool {
  469. if err == nil {
  470. return false
  471. }
  472. var errStorage *azblob.StorageError
  473. if errors.As(err, &errStorage) {
  474. statusCode := errStorage.StatusCode()
  475. return statusCode == http.StatusForbidden || statusCode == http.StatusUnauthorized
  476. }
  477. var errResp *azcore.ResponseError
  478. if errors.As(err, &errResp) {
  479. return errResp.StatusCode == http.StatusForbidden || errResp.StatusCode == http.StatusUnauthorized
  480. }
  481. return false
  482. }
  483. // IsNotSupported returns true if the error indicate an unsupported operation
  484. func (*AzureBlobFs) IsNotSupported(err error) bool {
  485. if err == nil {
  486. return false
  487. }
  488. return err == ErrVfsUnsupported
  489. }
  490. // CheckRootPath creates the specified local root directory if it does not exists
  491. func (fs *AzureBlobFs) CheckRootPath(username string, uid int, gid int) bool {
  492. // we need a local directory for temporary files
  493. osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "")
  494. return osFs.CheckRootPath(username, uid, gid)
  495. }
  496. // ScanRootDirContents returns the number of files contained in the bucket,
  497. // and their size
  498. func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) {
  499. numFiles := 0
  500. size := int64(0)
  501. pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{
  502. Prefix: &fs.config.KeyPrefix,
  503. })
  504. hasNext := true
  505. for hasNext {
  506. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  507. defer cancelFn()
  508. if hasNext = pager.NextPage(ctx); hasNext {
  509. resp := pager.PageResponse()
  510. for _, blobItem := range resp.ListBlobsFlatSegmentResponse.Segment.BlobItems {
  511. if blobItem.Properties != nil {
  512. contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
  513. isDir := (contentType == dirMimeType)
  514. blobSize := util.GetIntFromPointer(blobItem.Properties.ContentLength)
  515. if isDir && blobSize == 0 {
  516. continue
  517. }
  518. numFiles++
  519. size += blobSize
  520. }
  521. }
  522. }
  523. }
  524. err := pager.Err()
  525. metric.AZListObjectsCompleted(err)
  526. return numFiles, size, err
  527. }
  528. func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
  529. fileNames := make(map[string]bool)
  530. prefix := ""
  531. if fsPrefix != "/" {
  532. prefix = strings.TrimPrefix(fsPrefix, "/")
  533. }
  534. pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobsHierarchyOptions{
  535. Include: []azblob.ListBlobsIncludeItem{},
  536. Prefix: &prefix,
  537. })
  538. hasNext := true
  539. for hasNext {
  540. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  541. defer cancelFn()
  542. if hasNext = pager.NextPage(ctx); hasNext {
  543. resp := pager.PageResponse()
  544. for _, blobItem := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobItems {
  545. name := util.GetStringFromPointer(blobItem.Name)
  546. name = strings.TrimPrefix(name, prefix)
  547. if blobItem.Properties != nil {
  548. contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
  549. isDir := (contentType == dirMimeType)
  550. if isDir {
  551. continue
  552. }
  553. fileNames[name] = true
  554. }
  555. }
  556. }
  557. }
  558. err := pager.Err()
  559. metric.AZListObjectsCompleted(err)
  560. return fileNames, err
  561. }
  562. // CheckMetadata checks the metadata consistency
  563. func (fs *AzureBlobFs) CheckMetadata() error {
  564. return fsMetadataCheck(fs, fs.getStorageID(), fs.config.KeyPrefix)
  565. }
  566. // GetDirSize returns the number of files and the size for a folder
  567. // including any subfolders
  568. func (*AzureBlobFs) GetDirSize(dirname string) (int, int64, error) {
  569. return 0, 0, ErrVfsUnsupported
  570. }
  571. // GetAtomicUploadPath returns the path to use for an atomic upload.
  572. // Azure Blob Storage uploads are already atomic, we never call this method
  573. func (*AzureBlobFs) GetAtomicUploadPath(name string) string {
  574. return ""
  575. }
// GetRelativePath returns the path for a file relative to the user's home dir.
// This is the path as seen by SFTPGo users.
//
// The configured key prefix is stripped (paths outside the prefix collapse to
// "/") and the mount path, when set, is prepended.
func (fs *AzureBlobFs) GetRelativePath(name string) string {
	rel := path.Clean(name)
	if rel == "." {
		rel = ""
	}
	if !path.IsAbs(rel) {
		rel = "/" + rel
	}
	if fs.config.KeyPrefix != "" {
		// a path outside the configured prefix maps to the virtual root
		if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) {
			rel = "/"
		}
		rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
	}
	if fs.mountPath != "" {
		rel = path.Join(fs.mountPath, rel)
	}
	return rel
}
  597. // Walk walks the file tree rooted at root, calling walkFn for each file or
  598. // directory in the tree, including root
  599. func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
  600. prefix := fs.getPrefix(root)
  601. pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{
  602. Prefix: &prefix,
  603. })
  604. hasNext := true
  605. for hasNext {
  606. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  607. defer cancelFn()
  608. if hasNext = pager.NextPage(ctx); hasNext {
  609. resp := pager.PageResponse()
  610. for _, blobItem := range resp.ListBlobsFlatSegmentResponse.Segment.BlobItems {
  611. name := util.GetStringFromPointer(blobItem.Name)
  612. if fs.isEqual(name, prefix) {
  613. continue
  614. }
  615. blobSize := int64(0)
  616. lastModified := time.Now()
  617. isDir := false
  618. if blobItem.Properties != nil {
  619. contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
  620. isDir = (contentType == dirMimeType)
  621. blobSize = util.GetIntFromPointer(blobItem.Properties.ContentLength)
  622. lastModified = util.GetTimeFromPointer(blobItem.Properties.LastModified)
  623. }
  624. err := walkFn(name, NewFileInfo(name, isDir, blobSize, lastModified, false), nil)
  625. if err != nil {
  626. return err
  627. }
  628. }
  629. }
  630. }
  631. err := pager.Err()
  632. if err != nil {
  633. metric.AZListObjectsCompleted(err)
  634. return err
  635. }
  636. metric.AZListObjectsCompleted(nil)
  637. return walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), nil)
  638. }
  639. // Join joins any number of path elements into a single path
  640. func (*AzureBlobFs) Join(elem ...string) string {
  641. return strings.TrimPrefix(path.Join(elem...), "/")
  642. }
  643. // HasVirtualFolders returns true if folders are emulated
  644. func (*AzureBlobFs) HasVirtualFolders() bool {
  645. return true
  646. }
  647. // ResolvePath returns the matching filesystem path for the specified sftp path
  648. func (fs *AzureBlobFs) ResolvePath(virtualPath string) (string, error) {
  649. if fs.mountPath != "" {
  650. virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
  651. }
  652. if !path.IsAbs(virtualPath) {
  653. virtualPath = path.Clean("/" + virtualPath)
  654. }
  655. return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
  656. }
// headObject fetches the blob properties (size, content type, last modified)
// for the given key with a short deadline. The head metric is always updated.
func (fs *AzureBlobFs) headObject(name string) (azblob.BlobGetPropertiesResponse, error) {
	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
	defer cancelFn()
	blobClient, err := fs.containerClient.NewBlockBlobClient(name)
	if err != nil {
		return azblob.BlobGetPropertiesResponse{}, err
	}
	resp, err := blobClient.GetProperties(ctx, &azblob.BlobGetPropertiesOptions{
		BlobAccessConditions: &azblob.BlobAccessConditions{},
	})
	metric.AZHeadObjectCompleted(err)
	return resp, err
}
  670. // GetMimeType returns the content type
  671. func (fs *AzureBlobFs) GetMimeType(name string) (string, error) {
  672. response, err := fs.headObject(name)
  673. if err != nil {
  674. return "", err
  675. }
  676. return util.GetStringFromPointer(response.ContentType), nil
  677. }
  678. // Close closes the fs
  679. func (*AzureBlobFs) Close() error {
  680. return nil
  681. }
  682. // GetAvailableDiskSize returns the available size for the specified path
  683. func (*AzureBlobFs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) {
  684. return nil, ErrStorageSizeUnavailable
  685. }
  686. func (*AzureBlobFs) getPrefix(name string) string {
  687. prefix := ""
  688. if name != "" && name != "." {
  689. prefix = strings.TrimPrefix(name, "/")
  690. if !strings.HasSuffix(prefix, "/") {
  691. prefix += "/"
  692. }
  693. }
  694. return prefix
  695. }
  696. func (fs *AzureBlobFs) isEqual(key string, virtualName string) bool {
  697. if key == virtualName {
  698. return true
  699. }
  700. if key == virtualName+"/" {
  701. return true
  702. }
  703. if key+"/" == virtualName {
  704. return true
  705. }
  706. return false
  707. }
  708. func (fs *AzureBlobFs) setConfigDefaults() {
  709. if fs.config.Endpoint == "" {
  710. fs.config.Endpoint = azureDefaultEndpoint
  711. }
  712. if fs.config.UploadPartSize == 0 {
  713. fs.config.UploadPartSize = 5
  714. }
  715. if fs.config.UploadPartSize < 1024*1024 {
  716. fs.config.UploadPartSize *= 1024 * 1024
  717. }
  718. if fs.config.UploadConcurrency == 0 {
  719. fs.config.UploadConcurrency = 5
  720. }
  721. if fs.config.DownloadPartSize == 0 {
  722. fs.config.DownloadPartSize = 5
  723. }
  724. if fs.config.DownloadPartSize < 1024*1024 {
  725. fs.config.DownloadPartSize *= 1024 * 1024
  726. }
  727. if fs.config.DownloadConcurrency == 0 {
  728. fs.config.DownloadConcurrency = 5
  729. }
  730. }
  731. func (fs *AzureBlobFs) checkIfBucketExists() error {
  732. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  733. defer cancelFn()
  734. _, err := fs.containerClient.GetProperties(ctx, &azblob.ContainerGetPropertiesOptions{})
  735. metric.AZHeadContainerCompleted(err)
  736. return err
  737. }
  738. func (fs *AzureBlobFs) hasContents(name string) (bool, error) {
  739. result := false
  740. prefix := fs.getPrefix(name)
  741. maxResults := int32(1)
  742. pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{
  743. MaxResults: &maxResults,
  744. Prefix: &prefix,
  745. })
  746. ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
  747. defer cancelFn()
  748. if pager.NextPage(ctx) {
  749. resp := pager.PageResponse()
  750. result = len(resp.ListBlobsFlatSegmentResponse.Segment.BlobItems) > 0
  751. }
  752. err := pager.Err()
  753. metric.AZListObjectsCompleted(err)
  754. return result, err
  755. }
// downloadPart fetches count bytes of the blob starting at offset and
// writes them to w at writeOffset. buf is a scratch buffer that must be
// at least count bytes long. A zero count is a no-op so callers don't
// need to special-case empty ranges.
func (fs *AzureBlobFs) downloadPart(ctx context.Context, blockBlob *azblob.BlockBlobClient, buf []byte,
	w io.WriterAt, offset, count, writeOffset int64,
) error {
	if count == 0 {
		return nil
	}
	resp, err := blockBlob.Download(ctx, &azblob.BlobDownloadOptions{
		Offset: &offset,
		Count:  &count,
	})
	if err != nil {
		return err
	}
	// retry transient body read failures up to 2 times
	body := resp.Body(&azblob.RetryReaderOptions{MaxRetryRequests: 2})
	defer body.Close()
	// fail if the service returns fewer bytes than requested
	_, err = io.ReadAtLeast(body, buf, int(count))
	if err != nil {
		return err
	}
	// writeAtFull retries short writes until the whole part is persisted
	_, err = fs.writeAtFull(w, buf, writeOffset, int(count))
	return err
}
// handleMultipartDownload downloads the blob from the given offset to its
// end, splitting the transfer into DownloadPartSize sized ranges fetched
// by up to DownloadConcurrency concurrent workers. Each part is written at
// its final position via the io.WriterAt, so parts may complete out of
// order. The first worker error cancels the shared context and is
// returned once every in-flight worker has drained.
func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *azblob.BlockBlobClient,
	offset int64, writer io.WriterAt,
) error {
	props, err := blockBlob.GetProperties(ctx, &azblob.BlobGetPropertiesOptions{
		BlobAccessConditions: &azblob.BlobAccessConditions{},
	})
	if err != nil {
		fsLog(fs, logger.LevelError, "unable to get blob properties, download aborted: %+v", err)
		return err
	}
	contentLength := util.GetIntFromPointer(props.ContentLength)
	sizeToDownload := contentLength - offset
	if sizeToDownload < 0 {
		// the requested offset is past the end of the blob
		fsLog(fs, logger.LevelError, "invalid multipart download size or offset, size: %v, offset: %v, size to download: %v",
			contentLength, offset, sizeToDownload)
		return errors.New("the requested offset exceeds the file size")
	}
	if sizeToDownload == 0 {
		// offset equals the blob size: nothing to transfer
		fsLog(fs, logger.LevelDebug, "nothing to download, offset %v, content length %v", offset, contentLength)
		return nil
	}
	partSize := fs.config.DownloadPartSize
	// guard bounds the number of concurrently running part downloads
	guard := make(chan struct{}, fs.config.DownloadConcurrency)
	// per-part deadline: one minute per MiB of part size
	blockCtxTimeout := time.Duration(fs.config.DownloadPartSize/(1024*1024)) * time.Minute
	pool := newBufferAllocator(int(partSize))
	finished := false
	var wg sync.WaitGroup
	var errOnce sync.Once
	var hasError int32 // accessed atomically, set to 1 on the first worker failure
	var poolError error
	poolCtx, poolCancel := context.WithCancel(ctx)
	defer poolCancel()
	for part := 0; !finished; part++ {
		start := offset
		end := offset + partSize
		if end >= contentLength {
			end = contentLength
			finished = true
		}
		// destination offset in the local writer is relative to the
		// requested offset, not to the start of the blob
		writeOffset := int64(part) * partSize
		offset = end
		guard <- struct{}{}
		if atomic.LoadInt32(&hasError) == 1 {
			// a worker already failed, don't start new parts; the slot
			// acquired above is reclaimed when guard is closed below
			fsLog(fs, logger.LevelDebug, "pool error, download for part %v not started", part)
			break
		}
		buf := pool.getBuffer()
		wg.Add(1)
		go func(start, end, writeOffset int64, buf []byte) {
			defer func() {
				pool.releaseBuffer(buf)
				<-guard
				wg.Done()
			}()
			innerCtx, cancelFn := context.WithDeadline(poolCtx, time.Now().Add(blockCtxTimeout))
			defer cancelFn()
			count := end - start
			err := fs.downloadPart(innerCtx, blockBlob, buf, writer, start, count, writeOffset)
			if err != nil {
				// keep only the first error and cancel the remaining workers
				errOnce.Do(func() {
					fsLog(fs, logger.LevelError, "multipart download error: %+v", err)
					atomic.StoreInt32(&hasError, 1)
					poolError = fmt.Errorf("multipart download error: %w", err)
					poolCancel()
				})
			}
		}(start, end, writeOffset, buf)
	}
	// Wait establishes happens-before, so reading poolError below is safe
	wg.Wait()
	close(guard)
	pool.free()
	return poolError
}
// handleMultipartUpload streams reader to the given block blob: it reads
// UploadPartSize sized chunks, stages each chunk as a block with up to
// UploadConcurrency concurrent workers and finally commits the ordered
// block list with the provided HTTP headers and the configured access
// tier. The first staging error cancels the shared context and is
// returned once every in-flight worker has drained; in that case the
// block list is not committed.
func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Reader,
	blockBlob *azblob.BlockBlobClient, httpHeaders *azblob.BlobHTTPHeaders,
) error {
	partSize := fs.config.UploadPartSize
	// guard bounds the number of concurrently staged blocks
	guard := make(chan struct{}, fs.config.UploadConcurrency)
	// per-block deadline: one minute per MiB of part size
	blockCtxTimeout := time.Duration(fs.config.UploadPartSize/(1024*1024)) * time.Minute
	// sync.Pool seems to use a lot of memory so prefer our own, very simple, allocator
	// we only need to recycle few byte slices
	pool := newBufferAllocator(int(partSize))
	finished := false
	// 8-byte counter used to derive unique, equal-length block IDs
	binaryBlockID := make([]byte, 8)
	var blocks []string
	var wg sync.WaitGroup
	var errOnce sync.Once
	var hasError int32 // accessed atomically, set to 1 on the first worker failure
	var poolError error
	poolCtx, poolCancel := context.WithCancel(ctx)
	defer poolCancel()
	for part := 0; !finished; part++ {
		buf := pool.getBuffer()
		n, err := fs.readFill(reader, buf)
		if err == io.EOF {
			// read finished, if n > 0 we need to process the last data chunk
			if n == 0 {
				pool.releaseBuffer(buf)
				break
			}
			finished = true
		} else if err != nil {
			pool.releaseBuffer(buf)
			pool.free()
			return err
		}
		// block IDs within a blob must be unique; the base64 of the
		// incremented counter is also constant-length as required
		fs.incrementBlockID(binaryBlockID)
		blockID := base64.StdEncoding.EncodeToString(binaryBlockID)
		blocks = append(blocks, blockID)
		guard <- struct{}{}
		if atomic.LoadInt32(&hasError) == 1 {
			// a worker already failed, don't stage more blocks; the slot
			// acquired above is reclaimed when guard is closed below
			fsLog(fs, logger.LevelError, "pool error, upload for part %v not started", part)
			pool.releaseBuffer(buf)
			break
		}
		wg.Add(1)
		go func(blockID string, buf []byte, bufSize int) {
			defer func() {
				pool.releaseBuffer(buf)
				<-guard
				wg.Done()
			}()
			// StageBlock needs a ReadSeekCloser, wrap the bytes.Reader
			bufferReader := &bytesReaderWrapper{
				Reader: bytes.NewReader(buf[:bufSize]),
			}
			innerCtx, cancelFn := context.WithDeadline(poolCtx, time.Now().Add(blockCtxTimeout))
			defer cancelFn()
			_, err := blockBlob.StageBlock(innerCtx, blockID, bufferReader, &azblob.BlockBlobStageBlockOptions{})
			if err != nil {
				// keep only the first error and cancel the remaining workers
				errOnce.Do(func() {
					fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", err)
					atomic.StoreInt32(&hasError, 1)
					poolError = fmt.Errorf("multipart upload error: %w", err)
					poolCancel()
				})
			}
		}(blockID, buf, n)
	}
	// Wait establishes happens-before, so reading poolError below is safe
	wg.Wait()
	close(guard)
	pool.free()
	if poolError != nil {
		return poolError
	}
	commitOptions := azblob.BlockBlobCommitBlockListOptions{
		BlobHTTPHeaders: httpHeaders,
	}
	if fs.config.AccessTier != "" {
		commitOptions.Tier = (*azblob.AccessTier)(&fs.config.AccessTier)
	}
	_, err := blockBlob.CommitBlockList(ctx, blocks, &commitOptions)
	return err
}
  931. func (*AzureBlobFs) writeAtFull(w io.WriterAt, buf []byte, offset int64, count int) (int, error) {
  932. written := 0
  933. for written < count {
  934. n, err := w.WriteAt(buf[written:count], offset+int64(written))
  935. written += n
  936. if err != nil {
  937. return written, err
  938. }
  939. }
  940. return written, nil
  941. }
  942. // copied from rclone
  943. func (*AzureBlobFs) readFill(r io.Reader, buf []byte) (n int, err error) {
  944. var nn int
  945. for n < len(buf) && err == nil {
  946. nn, err = r.Read(buf[n:])
  947. n += nn
  948. }
  949. return n, err
  950. }
  951. // copied from rclone
  952. func (*AzureBlobFs) incrementBlockID(blockID []byte) {
  953. for i, digit := range blockID {
  954. newDigit := digit + 1
  955. blockID[i] = newDigit
  956. if newDigit >= digit {
  957. // exit if no carry
  958. break
  959. }
  960. }
  961. }
  962. func (fs *AzureBlobFs) preserveModificationTime(source, target string, fi os.FileInfo) {
  963. if plugin.Handler.HasMetadater() {
  964. if !fi.IsDir() {
  965. err := plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
  966. util.GetTimeAsMsSinceEpoch(fi.ModTime()))
  967. if err != nil {
  968. fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %#v -> %#v: %+v",
  969. source, target, err)
  970. }
  971. }
  972. }
  973. }
  974. func (fs *AzureBlobFs) getCopyOptions() *azblob.BlobStartCopyOptions {
  975. copyOptions := &azblob.BlobStartCopyOptions{}
  976. if fs.config.AccessTier != "" {
  977. copyOptions.Tier = (*azblob.AccessTier)(&fs.config.AccessTier)
  978. }
  979. return copyOptions
  980. }
  981. func (fs *AzureBlobFs) getStorageID() string {
  982. if fs.config.Endpoint != "" {
  983. if !strings.HasSuffix(fs.config.Endpoint, "/") {
  984. return fmt.Sprintf("azblob://%v/%v", fs.config.Endpoint, fs.config.Container)
  985. }
  986. return fmt.Sprintf("azblob://%v%v", fs.config.Endpoint, fs.config.Container)
  987. }
  988. return fmt.Sprintf("azblob://%v", fs.config.Container)
  989. }
// bytesReaderWrapper adds a no-op Close method to bytes.Reader so an
// in-memory buffer can be used where a closable reader is required,
// e.g. as the body passed to StageBlock.
type bytesReaderWrapper struct {
	*bytes.Reader
}

// Close implements io.Closer; there is nothing to release.
func (b *bytesReaderWrapper) Close() error {
	return nil
}
  996. type bufferAllocator struct {
  997. sync.Mutex
  998. available [][]byte
  999. bufferSize int
  1000. finalized bool
  1001. }
  1002. func newBufferAllocator(size int) *bufferAllocator {
  1003. return &bufferAllocator{
  1004. bufferSize: size,
  1005. finalized: false,
  1006. }
  1007. }
  1008. func (b *bufferAllocator) getBuffer() []byte {
  1009. b.Lock()
  1010. defer b.Unlock()
  1011. if len(b.available) > 0 {
  1012. var result []byte
  1013. truncLength := len(b.available) - 1
  1014. result = b.available[truncLength]
  1015. b.available[truncLength] = nil
  1016. b.available = b.available[:truncLength]
  1017. return result
  1018. }
  1019. return make([]byte, b.bufferSize)
  1020. }
  1021. func (b *bufferAllocator) releaseBuffer(buf []byte) {
  1022. b.Lock()
  1023. defer b.Unlock()
  1024. if b.finalized || len(buf) != b.bufferSize {
  1025. return
  1026. }
  1027. b.available = append(b.available, buf)
  1028. }
  1029. func (b *bufferAllocator) free() {
  1030. b.Lock()
  1031. defer b.Unlock()
  1032. b.available = nil
  1033. b.finalized = true
  1034. }