pull.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "encoding/base64"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "strings"
  22. "sync"
  23. "time"
  24. "github.com/compose-spec/compose-go/v2/types"
  25. "github.com/distribution/reference"
  26. "github.com/docker/buildx/driver"
  27. "github.com/docker/cli/cli/config/configfile"
  28. "github.com/docker/docker/api/types/image"
  29. "github.com/docker/docker/client"
  30. "github.com/docker/docker/pkg/jsonmessage"
  31. "github.com/opencontainers/go-digest"
  32. "golang.org/x/sync/errgroup"
  33. "github.com/docker/compose/v2/internal/registry"
  34. "github.com/docker/compose/v2/pkg/api"
  35. "github.com/docker/compose/v2/pkg/progress"
  36. )
  37. func (s *composeService) Pull(ctx context.Context, project *types.Project, options api.PullOptions) error {
  38. return progress.RunWithTitle(ctx, func(ctx context.Context) error {
  39. return s.pull(ctx, project, options)
  40. }, s.stdinfo(), "Pulling")
  41. }
  42. func (s *composeService) pull(ctx context.Context, project *types.Project, opts api.PullOptions) error { //nolint:gocyclo
  43. images, err := s.getLocalImagesDigests(ctx, project)
  44. if err != nil {
  45. return err
  46. }
  47. w := progress.ContextWriter(ctx)
  48. eg, ctx := errgroup.WithContext(ctx)
  49. eg.SetLimit(s.maxConcurrency)
  50. var (
  51. mustBuild []string
  52. pullErrors = make([]error, len(project.Services))
  53. imagesBeingPulled = map[string]string{}
  54. )
  55. i := 0
  56. for name, service := range project.Services {
  57. if service.Image == "" {
  58. w.Event(progress.Event{
  59. ID: name,
  60. Status: progress.Done,
  61. Text: "Skipped - No image to be pulled",
  62. })
  63. continue
  64. }
  65. switch service.PullPolicy {
  66. case types.PullPolicyNever, types.PullPolicyBuild:
  67. w.Event(progress.Event{
  68. ID: name,
  69. Status: progress.Done,
  70. Text: "Skipped",
  71. })
  72. continue
  73. case types.PullPolicyMissing, types.PullPolicyIfNotPresent:
  74. if imageAlreadyPresent(service.Image, images) {
  75. w.Event(progress.Event{
  76. ID: name,
  77. Status: progress.Done,
  78. Text: "Skipped - Image is already present locally",
  79. })
  80. continue
  81. }
  82. }
  83. if service.Build != nil && opts.IgnoreBuildable {
  84. w.Event(progress.Event{
  85. ID: name,
  86. Status: progress.Done,
  87. Text: "Skipped - Image can be built",
  88. })
  89. continue
  90. }
  91. if s, ok := imagesBeingPulled[service.Image]; ok {
  92. w.Event(progress.Event{
  93. ID: name,
  94. Status: progress.Done,
  95. Text: fmt.Sprintf("Skipped - Image is already being pulled by %v", s),
  96. })
  97. continue
  98. }
  99. imagesBeingPulled[service.Image] = service.Name
  100. idx := i
  101. eg.Go(func() error {
  102. _, err := s.pullServiceImage(ctx, service, s.configFile(), w, opts.Quiet, project.Environment["DOCKER_DEFAULT_PLATFORM"])
  103. if err != nil {
  104. pullErrors[idx] = err
  105. if service.Build != nil {
  106. mustBuild = append(mustBuild, service.Name)
  107. }
  108. if !opts.IgnoreFailures && service.Build == nil {
  109. if s.dryRun {
  110. w.Event(progress.Event{
  111. ID: name,
  112. Status: progress.Error,
  113. Text: fmt.Sprintf(" - Pull error for image: %s", service.Image),
  114. })
  115. }
  116. // fail fast if image can't be pulled nor built
  117. return err
  118. }
  119. }
  120. return nil
  121. })
  122. i++
  123. }
  124. err = eg.Wait()
  125. if len(mustBuild) > 0 {
  126. w.TailMsgf("WARNING: Some service image(s) must be built from source by running:\n docker compose build %s", strings.Join(mustBuild, " "))
  127. }
  128. if err != nil {
  129. return err
  130. }
  131. if opts.IgnoreFailures {
  132. return nil
  133. }
  134. return errors.Join(pullErrors...)
  135. }
  136. func imageAlreadyPresent(serviceImage string, localImages map[string]api.ImageSummary) bool {
  137. normalizedImage, err := reference.ParseDockerRef(serviceImage)
  138. if err != nil {
  139. return false
  140. }
  141. switch refType := normalizedImage.(type) {
  142. case reference.NamedTagged:
  143. _, ok := localImages[serviceImage]
  144. return ok && refType.Tag() != "latest"
  145. default:
  146. _, ok := localImages[serviceImage]
  147. return ok
  148. }
  149. }
  150. func getUnwrappedErrorMessage(err error) string {
  151. derr := errors.Unwrap(err)
  152. if derr != nil {
  153. return getUnwrappedErrorMessage(derr)
  154. }
  155. return err.Error()
  156. }
  157. func (s *composeService) pullServiceImage(ctx context.Context, service types.ServiceConfig,
  158. configFile driver.Auth, w progress.Writer, quietPull bool, defaultPlatform string,
  159. ) (string, error) {
  160. w.Event(progress.Event{
  161. ID: service.Name,
  162. Status: progress.Working,
  163. Text: "Pulling",
  164. })
  165. ref, err := reference.ParseNormalizedNamed(service.Image)
  166. if err != nil {
  167. return "", err
  168. }
  169. encodedAuth, err := encodedAuth(ref, configFile)
  170. if err != nil {
  171. return "", err
  172. }
  173. platform := service.Platform
  174. if platform == "" {
  175. platform = defaultPlatform
  176. }
  177. stream, err := s.apiClient().ImagePull(ctx, service.Image, image.PullOptions{
  178. RegistryAuth: encodedAuth,
  179. Platform: platform,
  180. })
  181. if ctx.Err() != nil {
  182. w.Event(progress.Event{
  183. ID: service.Name,
  184. Status: progress.Warning,
  185. StatusText: "Interrupted",
  186. })
  187. return "", nil
  188. }
  189. // check if has error and the service has a build section
  190. // then the status should be warning instead of error
  191. if err != nil && service.Build != nil {
  192. w.Event(progress.Event{
  193. ID: service.Name,
  194. Status: progress.Warning,
  195. Text: "Warning",
  196. StatusText: getUnwrappedErrorMessage(err),
  197. })
  198. return "", err
  199. }
  200. if err != nil {
  201. w.Event(progress.Event{
  202. ID: service.Name,
  203. Status: progress.Error,
  204. Text: "Error",
  205. StatusText: getUnwrappedErrorMessage(err),
  206. })
  207. return "", err
  208. }
  209. dec := json.NewDecoder(stream)
  210. for {
  211. var jm jsonmessage.JSONMessage
  212. if err := dec.Decode(&jm); err != nil {
  213. if errors.Is(err, io.EOF) {
  214. break
  215. }
  216. return "", err
  217. }
  218. if jm.Error != nil {
  219. return "", errors.New(jm.Error.Message)
  220. }
  221. if !quietPull {
  222. toPullProgressEvent(service.Name, jm, w)
  223. }
  224. }
  225. w.Event(progress.Event{
  226. ID: service.Name,
  227. Status: progress.Done,
  228. Text: "Pulled",
  229. })
  230. inspected, err := s.apiClient().ImageInspect(ctx, service.Image)
  231. if err != nil {
  232. return "", err
  233. }
  234. return inspected.ID, nil
  235. }
  236. // ImageDigestResolver creates a func able to resolve image digest from a docker ref,
  237. func ImageDigestResolver(ctx context.Context, file *configfile.ConfigFile, apiClient client.APIClient) func(named reference.Named) (digest.Digest, error) {
  238. return func(named reference.Named) (digest.Digest, error) {
  239. auth, err := encodedAuth(named, file)
  240. if err != nil {
  241. return "", err
  242. }
  243. inspect, err := apiClient.DistributionInspect(ctx, named.String(), auth)
  244. if err != nil {
  245. return "",
  246. fmt.Errorf("failed to resolve digest for %s: %w", named.String(), err)
  247. }
  248. return inspect.Descriptor.Digest, nil
  249. }
  250. }
  251. func encodedAuth(ref reference.Named, configFile driver.Auth) (string, error) {
  252. authConfig, err := configFile.GetAuthConfig(registry.GetAuthConfigKey(reference.Domain(ref)))
  253. if err != nil {
  254. return "", err
  255. }
  256. buf, err := json.Marshal(authConfig)
  257. if err != nil {
  258. return "", err
  259. }
  260. return base64.URLEncoding.EncodeToString(buf), nil
  261. }
  262. func (s *composeService) pullRequiredImages(ctx context.Context, project *types.Project, images map[string]api.ImageSummary, quietPull bool) error {
  263. needPull := map[string]types.ServiceConfig{}
  264. for name, service := range project.Services {
  265. pull, err := mustPull(service, images)
  266. if err != nil {
  267. return err
  268. }
  269. if pull {
  270. needPull[name] = service
  271. }
  272. for i, vol := range service.Volumes {
  273. if vol.Type == types.VolumeTypeImage {
  274. if _, ok := images[vol.Source]; !ok {
  275. // Hack: create a fake ServiceConfig so we pull missing volume image
  276. n := fmt.Sprintf("%s:volume %d", name, i)
  277. needPull[n] = types.ServiceConfig{
  278. Name: n,
  279. Image: vol.Source,
  280. }
  281. }
  282. }
  283. }
  284. }
  285. if len(needPull) == 0 {
  286. return nil
  287. }
  288. return progress.Run(ctx, func(ctx context.Context) error {
  289. w := progress.ContextWriter(ctx)
  290. eg, ctx := errgroup.WithContext(ctx)
  291. eg.SetLimit(s.maxConcurrency)
  292. pulledImages := map[string]api.ImageSummary{}
  293. var mutex sync.Mutex
  294. for name, service := range needPull {
  295. eg.Go(func() error {
  296. id, err := s.pullServiceImage(ctx, service, s.configFile(), w, quietPull, project.Environment["DOCKER_DEFAULT_PLATFORM"])
  297. mutex.Lock()
  298. defer mutex.Unlock()
  299. pulledImages[name] = api.ImageSummary{
  300. ID: id,
  301. Repository: service.Image,
  302. LastTagTime: time.Now(),
  303. }
  304. if err != nil && isServiceImageToBuild(service, project.Services) {
  305. // image can be built, so we can ignore pull failure
  306. return nil
  307. }
  308. return err
  309. })
  310. }
  311. err := eg.Wait()
  312. for i, service := range needPull {
  313. if pulledImages[i].ID != "" {
  314. images[service.Image] = pulledImages[i]
  315. }
  316. }
  317. return err
  318. }, s.stdinfo())
  319. }
  320. func mustPull(service types.ServiceConfig, images map[string]api.ImageSummary) (bool, error) {
  321. if service.Provider != nil {
  322. return false, nil
  323. }
  324. if service.Image == "" {
  325. return false, nil
  326. }
  327. policy, duration, err := service.GetPullPolicy()
  328. if err != nil {
  329. return false, err
  330. }
  331. switch policy {
  332. case types.PullPolicyAlways:
  333. // force pull
  334. return true, nil
  335. case types.PullPolicyNever, types.PullPolicyBuild:
  336. return false, nil
  337. case types.PullPolicyRefresh:
  338. img, ok := images[service.Image]
  339. if !ok {
  340. return true, nil
  341. }
  342. return time.Now().After(img.LastTagTime.Add(duration)), nil
  343. default: // Pull if missing
  344. _, ok := images[service.Image]
  345. return !ok, nil
  346. }
  347. }
  348. func isServiceImageToBuild(service types.ServiceConfig, services types.Services) bool {
  349. if service.Build != nil {
  350. return true
  351. }
  352. if service.Image == "" {
  353. // N.B. this should be impossible as service must have either `build` or `image` (or both)
  354. return false
  355. }
  356. // look through the other services to see if another has a build definition for the same
  357. // image name
  358. for _, svc := range services {
  359. if svc.Image == service.Image && svc.Build != nil {
  360. return true
  361. }
  362. }
  363. return false
  364. }
// Status strings that toPullProgressEvent compares against the Status field of
// the JSON messages decoded from an image pull stream, in order to derive the
// progress state and completion percentage of each event.
const (
	// PreparingPhase, WaitingPhase and PullingFsPhase are reported with 0% progress.
	PreparingPhase = "Preparing"
	WaitingPhase = "Waiting"
	PullingFsPhase = "Pulling fs layer"
	// DownloadingPhase carries current/total counters used to compute a percentage.
	DownloadingPhase = "Downloading"
	// DownloadCompletePhase is reported as done, at 100%.
	DownloadCompletePhase = "Download complete"
	// ExtractingPhase and VerifyingChecksumPhase carry current/total counters
	// used to compute a percentage.
	ExtractingPhase = "Extracting"
	VerifyingChecksumPhase = "Verifying Checksum"
	// AlreadyExistsPhase and PullCompletePhase are reported as done, at 100%.
	AlreadyExistsPhase = "Already exists"
	PullCompletePhase = "Pull complete"
)
  376. func toPullProgressEvent(parent string, jm jsonmessage.JSONMessage, w progress.Writer) {
  377. if jm.ID == "" || jm.Progress == nil {
  378. return
  379. }
  380. var (
  381. text string
  382. total int64
  383. percent int
  384. current int64
  385. status = progress.Working
  386. )
  387. text = jm.Progress.String()
  388. switch jm.Status {
  389. case PreparingPhase, WaitingPhase, PullingFsPhase:
  390. percent = 0
  391. case DownloadingPhase, ExtractingPhase, VerifyingChecksumPhase:
  392. if jm.Progress != nil {
  393. current = jm.Progress.Current
  394. total = jm.Progress.Total
  395. if jm.Progress.Total > 0 {
  396. percent = int(jm.Progress.Current * 100 / jm.Progress.Total)
  397. }
  398. }
  399. case DownloadCompletePhase, AlreadyExistsPhase, PullCompletePhase:
  400. status = progress.Done
  401. percent = 100
  402. }
  403. if strings.Contains(jm.Status, "Image is up to date") ||
  404. strings.Contains(jm.Status, "Downloaded newer image") {
  405. status = progress.Done
  406. percent = 100
  407. }
  408. if jm.Error != nil {
  409. status = progress.Error
  410. text = jm.Error.Message
  411. }
  412. w.Event(progress.Event{
  413. ID: jm.ID,
  414. ParentID: parent,
  415. Current: current,
  416. Total: total,
  417. Percent: percent,
  418. Text: jm.Status,
  419. Status: status,
  420. StatusText: text,
  421. })
  422. }