pull.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "encoding/base64"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "strings"
  22. "sync"
  23. "time"
  24. "github.com/compose-spec/compose-go/v2/types"
  25. "github.com/distribution/reference"
  26. "github.com/docker/buildx/driver"
  27. "github.com/docker/cli/cli/config/configfile"
  28. "github.com/docker/docker/api/types/image"
  29. "github.com/docker/docker/client"
  30. "github.com/docker/docker/pkg/jsonmessage"
  31. "github.com/docker/docker/registry"
  32. "github.com/hashicorp/go-multierror"
  33. "github.com/opencontainers/go-digest"
  34. "golang.org/x/sync/errgroup"
  35. "github.com/docker/compose/v2/pkg/api"
  36. "github.com/docker/compose/v2/pkg/progress"
  37. )
  38. func (s *composeService) Pull(ctx context.Context, project *types.Project, options api.PullOptions) error {
  39. return progress.RunWithTitle(ctx, func(ctx context.Context) error {
  40. return s.pull(ctx, project, options)
  41. }, s.stdinfo(), "Pulling")
  42. }
  43. func (s *composeService) pull(ctx context.Context, project *types.Project, opts api.PullOptions) error { //nolint:gocyclo
  44. images, err := s.getLocalImagesDigests(ctx, project)
  45. if err != nil {
  46. return err
  47. }
  48. w := progress.ContextWriter(ctx)
  49. eg, ctx := errgroup.WithContext(ctx)
  50. eg.SetLimit(s.maxConcurrency)
  51. var (
  52. mustBuild []string
  53. pullErrors = make([]error, len(project.Services))
  54. imagesBeingPulled = map[string]string{}
  55. )
  56. i := 0
  57. for name, service := range project.Services {
  58. if service.Image == "" {
  59. w.Event(progress.Event{
  60. ID: name,
  61. Status: progress.Done,
  62. Text: "Skipped - No image to be pulled",
  63. })
  64. continue
  65. }
  66. switch service.PullPolicy {
  67. case types.PullPolicyNever, types.PullPolicyBuild:
  68. w.Event(progress.Event{
  69. ID: name,
  70. Status: progress.Done,
  71. Text: "Skipped",
  72. })
  73. continue
  74. case types.PullPolicyMissing, types.PullPolicyIfNotPresent:
  75. if imageAlreadyPresent(service.Image, images) {
  76. w.Event(progress.Event{
  77. ID: name,
  78. Status: progress.Done,
  79. Text: "Skipped - Image is already present locally",
  80. })
  81. continue
  82. }
  83. }
  84. if service.Build != nil && opts.IgnoreBuildable {
  85. w.Event(progress.Event{
  86. ID: name,
  87. Status: progress.Done,
  88. Text: "Skipped - Image can be built",
  89. })
  90. continue
  91. }
  92. if s, ok := imagesBeingPulled[service.Image]; ok {
  93. w.Event(progress.Event{
  94. ID: name,
  95. Status: progress.Done,
  96. Text: fmt.Sprintf("Skipped - Image is already being pulled by %v", s),
  97. })
  98. continue
  99. }
  100. imagesBeingPulled[service.Image] = service.Name
  101. idx := i
  102. eg.Go(func() error {
  103. _, err := s.pullServiceImage(ctx, service, s.configFile(), w, opts.Quiet, project.Environment["DOCKER_DEFAULT_PLATFORM"])
  104. if err != nil {
  105. pullErrors[idx] = err
  106. if service.Build != nil {
  107. mustBuild = append(mustBuild, service.Name)
  108. }
  109. if !opts.IgnoreFailures && service.Build == nil {
  110. if s.dryRun {
  111. w.Event(progress.Event{
  112. ID: name,
  113. Status: progress.Error,
  114. Text: fmt.Sprintf(" - Pull error for image: %s", service.Image),
  115. })
  116. }
  117. // fail fast if image can't be pulled nor built
  118. return err
  119. }
  120. }
  121. return nil
  122. })
  123. i++
  124. }
  125. err = eg.Wait()
  126. if len(mustBuild) > 0 {
  127. w.TailMsgf("WARNING: Some service image(s) must be built from source by running:\n docker compose build %s", strings.Join(mustBuild, " "))
  128. }
  129. if err != nil {
  130. return err
  131. }
  132. if opts.IgnoreFailures {
  133. return nil
  134. }
  135. return multierror.Append(nil, pullErrors...).ErrorOrNil()
  136. }
  137. func imageAlreadyPresent(serviceImage string, localImages map[string]api.ImageSummary) bool {
  138. normalizedImage, err := reference.ParseDockerRef(serviceImage)
  139. if err != nil {
  140. return false
  141. }
  142. tagged, ok := normalizedImage.(reference.NamedTagged)
  143. if !ok {
  144. return false
  145. }
  146. _, ok = localImages[serviceImage]
  147. return ok && tagged.Tag() != "latest"
  148. }
  149. func getUnwrappedErrorMessage(err error) string {
  150. derr := errors.Unwrap(err)
  151. if derr != nil {
  152. return getUnwrappedErrorMessage(derr)
  153. }
  154. return err.Error()
  155. }
  156. func (s *composeService) pullServiceImage(ctx context.Context, service types.ServiceConfig,
  157. configFile driver.Auth, w progress.Writer, quietPull bool, defaultPlatform string,
  158. ) (string, error) {
  159. w.Event(progress.Event{
  160. ID: service.Name,
  161. Status: progress.Working,
  162. Text: "Pulling",
  163. })
  164. ref, err := reference.ParseNormalizedNamed(service.Image)
  165. if err != nil {
  166. return "", err
  167. }
  168. encodedAuth, err := encodedAuth(ref, configFile)
  169. if err != nil {
  170. return "", err
  171. }
  172. platform := service.Platform
  173. if platform == "" {
  174. platform = defaultPlatform
  175. }
  176. stream, err := s.apiClient().ImagePull(ctx, service.Image, image.PullOptions{
  177. RegistryAuth: encodedAuth,
  178. Platform: platform,
  179. })
  180. if ctx.Err() != nil {
  181. w.Event(progress.Event{
  182. ID: service.Name,
  183. Status: progress.Warning,
  184. StatusText: "Interrupted",
  185. })
  186. return "", nil
  187. }
  188. // check if has error and the service has a build section
  189. // then the status should be warning instead of error
  190. if err != nil && service.Build != nil {
  191. w.Event(progress.Event{
  192. ID: service.Name,
  193. Status: progress.Warning,
  194. Text: "Warning",
  195. StatusText: getUnwrappedErrorMessage(err),
  196. })
  197. return "", err
  198. }
  199. if err != nil {
  200. w.Event(progress.Event{
  201. ID: service.Name,
  202. Status: progress.Error,
  203. Text: "Error",
  204. StatusText: getUnwrappedErrorMessage(err),
  205. })
  206. return "", err
  207. }
  208. dec := json.NewDecoder(stream)
  209. for {
  210. var jm jsonmessage.JSONMessage
  211. if err := dec.Decode(&jm); err != nil {
  212. if errors.Is(err, io.EOF) {
  213. break
  214. }
  215. return "", err
  216. }
  217. if jm.Error != nil {
  218. return "", errors.New(jm.Error.Message)
  219. }
  220. if !quietPull {
  221. toPullProgressEvent(service.Name, jm, w)
  222. }
  223. }
  224. w.Event(progress.Event{
  225. ID: service.Name,
  226. Status: progress.Done,
  227. Text: "Pulled",
  228. })
  229. inspected, err := s.apiClient().ImageInspect(ctx, service.Image)
  230. if err != nil {
  231. return "", err
  232. }
  233. return inspected.ID, nil
  234. }
  235. // ImageDigestResolver creates a func able to resolve image digest from a docker ref,
  236. func ImageDigestResolver(ctx context.Context, file *configfile.ConfigFile, apiClient client.APIClient) func(named reference.Named) (digest.Digest, error) {
  237. return func(named reference.Named) (digest.Digest, error) {
  238. auth, err := encodedAuth(named, file)
  239. if err != nil {
  240. return "", err
  241. }
  242. inspect, err := apiClient.DistributionInspect(ctx, named.String(), auth)
  243. if err != nil {
  244. return "",
  245. fmt.Errorf("failed to resolve digest for %s: %w", named.String(), err)
  246. }
  247. return inspect.Descriptor.Digest, nil
  248. }
  249. }
  250. func encodedAuth(ref reference.Named, configFile driver.Auth) (string, error) {
  251. repoInfo, err := registry.ParseRepositoryInfo(ref)
  252. if err != nil {
  253. return "", err
  254. }
  255. key := registry.GetAuthConfigKey(repoInfo.Index)
  256. authConfig, err := configFile.GetAuthConfig(key)
  257. if err != nil {
  258. return "", err
  259. }
  260. buf, err := json.Marshal(authConfig)
  261. if err != nil {
  262. return "", err
  263. }
  264. return base64.URLEncoding.EncodeToString(buf), nil
  265. }
  266. func (s *composeService) pullRequiredImages(ctx context.Context, project *types.Project, images map[string]api.ImageSummary, quietPull bool) error {
  267. needPull := map[string]types.ServiceConfig{}
  268. for name, service := range project.Services {
  269. pull, err := mustPull(service, images)
  270. if err != nil {
  271. return err
  272. }
  273. if pull {
  274. needPull[name] = service
  275. }
  276. for i, vol := range service.Volumes {
  277. if vol.Type == types.VolumeTypeImage {
  278. if _, ok := images[vol.Source]; !ok {
  279. // Hack: create a fake ServiceConfig so we pull missing volume image
  280. n := fmt.Sprintf("%s:volume %d", name, i)
  281. needPull[n] = types.ServiceConfig{
  282. Name: n,
  283. Image: vol.Source,
  284. }
  285. }
  286. }
  287. }
  288. }
  289. if len(needPull) == 0 {
  290. return nil
  291. }
  292. return progress.Run(ctx, func(ctx context.Context) error {
  293. w := progress.ContextWriter(ctx)
  294. eg, ctx := errgroup.WithContext(ctx)
  295. eg.SetLimit(s.maxConcurrency)
  296. pulledImages := map[string]api.ImageSummary{}
  297. var mutex sync.Mutex
  298. for name, service := range needPull {
  299. eg.Go(func() error {
  300. id, err := s.pullServiceImage(ctx, service, s.configFile(), w, quietPull, project.Environment["DOCKER_DEFAULT_PLATFORM"])
  301. mutex.Lock()
  302. defer mutex.Unlock()
  303. pulledImages[name] = api.ImageSummary{
  304. ID: id,
  305. Repository: service.Image,
  306. LastTagTime: time.Now(),
  307. }
  308. if err != nil && isServiceImageToBuild(service, project.Services) {
  309. // image can be built, so we can ignore pull failure
  310. return nil
  311. }
  312. return err
  313. })
  314. }
  315. err := eg.Wait()
  316. for i, service := range needPull {
  317. if pulledImages[i].ID != "" {
  318. images[service.Image] = pulledImages[i]
  319. }
  320. }
  321. return err
  322. }, s.stdinfo())
  323. }
  324. func mustPull(service types.ServiceConfig, images map[string]api.ImageSummary) (bool, error) {
  325. if service.Provider != nil {
  326. return false, nil
  327. }
  328. if service.Image == "" {
  329. return false, nil
  330. }
  331. policy, duration, err := service.GetPullPolicy()
  332. if err != nil {
  333. return false, err
  334. }
  335. switch policy {
  336. case types.PullPolicyAlways:
  337. // force pull
  338. return true, nil
  339. case types.PullPolicyNever, types.PullPolicyBuild:
  340. return false, nil
  341. case types.PullPolicyRefresh:
  342. img, ok := images[service.Image]
  343. if !ok {
  344. return true, nil
  345. }
  346. return time.Now().After(img.LastTagTime.Add(duration)), nil
  347. default: // Pull if missing
  348. _, ok := images[service.Image]
  349. return !ok, nil
  350. }
  351. }
  352. func isServiceImageToBuild(service types.ServiceConfig, services types.Services) bool {
  353. if service.Build != nil {
  354. return true
  355. }
  356. if service.Image == "" {
  357. // N.B. this should be impossible as service must have either `build` or `image` (or both)
  358. return false
  359. }
  360. // look through the other services to see if another has a build definition for the same
  361. // image name
  362. for _, svc := range services {
  363. if svc.Image == service.Image && svc.Build != nil {
  364. return true
  365. }
  366. }
  367. return false
  368. }
  369. const (
  370. PreparingPhase = "Preparing"
  371. WaitingPhase = "Waiting"
  372. PullingFsPhase = "Pulling fs layer"
  373. DownloadingPhase = "Downloading"
  374. DownloadCompletePhase = "Download complete"
  375. ExtractingPhase = "Extracting"
  376. VerifyingChecksumPhase = "Verifying Checksum"
  377. AlreadyExistsPhase = "Already exists"
  378. PullCompletePhase = "Pull complete"
  379. )
  380. func toPullProgressEvent(parent string, jm jsonmessage.JSONMessage, w progress.Writer) {
  381. if jm.ID == "" || jm.Progress == nil {
  382. return
  383. }
  384. var (
  385. text string
  386. total int64
  387. percent int
  388. current int64
  389. status = progress.Working
  390. )
  391. text = jm.Progress.String()
  392. switch jm.Status {
  393. case PreparingPhase, WaitingPhase, PullingFsPhase:
  394. percent = 0
  395. case DownloadingPhase, ExtractingPhase, VerifyingChecksumPhase:
  396. if jm.Progress != nil {
  397. current = jm.Progress.Current
  398. total = jm.Progress.Total
  399. if jm.Progress.Total > 0 {
  400. percent = int(jm.Progress.Current * 100 / jm.Progress.Total)
  401. }
  402. }
  403. case DownloadCompletePhase, AlreadyExistsPhase, PullCompletePhase:
  404. status = progress.Done
  405. percent = 100
  406. }
  407. if strings.Contains(jm.Status, "Image is up to date") ||
  408. strings.Contains(jm.Status, "Downloaded newer image") {
  409. status = progress.Done
  410. percent = 100
  411. }
  412. if jm.Error != nil {
  413. status = progress.Error
  414. text = jm.Error.Message
  415. }
  416. w.Event(progress.Event{
  417. ID: jm.ID,
  418. ParentID: parent,
  419. Current: current,
  420. Total: total,
  421. Percent: percent,
  422. Text: jm.Status,
  423. Status: status,
  424. StatusText: text,
  425. })
  426. }