pull.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "encoding/base64"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "strings"
  22. "sync"
  23. "time"
  24. "github.com/compose-spec/compose-go/v2/types"
  25. "github.com/distribution/reference"
  26. "github.com/docker/buildx/driver"
  27. "github.com/docker/cli/cli/config/configfile"
  28. "github.com/docker/docker/api/types/image"
  29. "github.com/docker/docker/client"
  30. "github.com/docker/docker/pkg/jsonmessage"
  31. "github.com/opencontainers/go-digest"
  32. "github.com/sirupsen/logrus"
  33. "golang.org/x/sync/errgroup"
  34. "github.com/docker/compose/v2/internal/registry"
  35. "github.com/docker/compose/v2/pkg/api"
  36. "github.com/docker/compose/v2/pkg/progress"
  37. )
  38. func (s *composeService) Pull(ctx context.Context, project *types.Project, options api.PullOptions) error {
  39. return progress.Run(ctx, func(ctx context.Context) error {
  40. return s.pull(ctx, project, options)
  41. }, "pull", s.events)
  42. }
  43. func (s *composeService) pull(ctx context.Context, project *types.Project, opts api.PullOptions) error { //nolint:gocyclo
  44. images, err := s.getLocalImagesDigests(ctx, project)
  45. if err != nil {
  46. return err
  47. }
  48. eg, ctx := errgroup.WithContext(ctx)
  49. eg.SetLimit(s.maxConcurrency)
  50. var (
  51. mustBuild []string
  52. pullErrors = make([]error, len(project.Services))
  53. imagesBeingPulled = map[string]string{}
  54. )
  55. i := 0
  56. for name, service := range project.Services {
  57. if service.Image == "" {
  58. s.events.On(progress.Event{
  59. ID: name,
  60. Status: progress.Done,
  61. Text: "Skipped - No image to be pulled",
  62. })
  63. continue
  64. }
  65. switch service.PullPolicy {
  66. case types.PullPolicyNever, types.PullPolicyBuild:
  67. s.events.On(progress.Event{
  68. ID: "Image " + service.Image,
  69. Status: progress.Done,
  70. Text: "Skipped",
  71. })
  72. continue
  73. case types.PullPolicyMissing, types.PullPolicyIfNotPresent:
  74. if imageAlreadyPresent(service.Image, images) {
  75. s.events.On(progress.Event{
  76. ID: "Image " + service.Image,
  77. Status: progress.Done,
  78. Text: "Skipped - Image is already present locally",
  79. })
  80. continue
  81. }
  82. }
  83. if service.Build != nil && opts.IgnoreBuildable {
  84. s.events.On(progress.Event{
  85. ID: "Image " + service.Image,
  86. Status: progress.Done,
  87. Text: "Skipped - Image can be built",
  88. })
  89. continue
  90. }
  91. if _, ok := imagesBeingPulled[service.Image]; ok {
  92. continue
  93. }
  94. imagesBeingPulled[service.Image] = service.Name
  95. idx := i
  96. eg.Go(func() error {
  97. _, err := s.pullServiceImage(ctx, service, opts.Quiet, project.Environment["DOCKER_DEFAULT_PLATFORM"])
  98. if err != nil {
  99. pullErrors[idx] = err
  100. if service.Build != nil {
  101. mustBuild = append(mustBuild, service.Name)
  102. }
  103. if !opts.IgnoreFailures && service.Build == nil {
  104. if s.dryRun {
  105. s.events.On(progress.Event{
  106. ID: "Image " + service.Image,
  107. Status: progress.Error,
  108. Text: fmt.Sprintf(" - Pull error for image: %s", service.Image),
  109. })
  110. }
  111. // fail fast if image can't be pulled nor built
  112. return err
  113. }
  114. }
  115. return nil
  116. })
  117. i++
  118. }
  119. err = eg.Wait()
  120. if len(mustBuild) > 0 {
  121. logrus.Warnf("WARNING: Some service image(s) must be built from source by running:\n docker compose build %s", strings.Join(mustBuild, " "))
  122. }
  123. if err != nil {
  124. return err
  125. }
  126. if opts.IgnoreFailures {
  127. return nil
  128. }
  129. return errors.Join(pullErrors...)
  130. }
  131. func imageAlreadyPresent(serviceImage string, localImages map[string]api.ImageSummary) bool {
  132. normalizedImage, err := reference.ParseDockerRef(serviceImage)
  133. if err != nil {
  134. return false
  135. }
  136. switch refType := normalizedImage.(type) {
  137. case reference.NamedTagged:
  138. _, ok := localImages[serviceImage]
  139. return ok && refType.Tag() != "latest"
  140. default:
  141. _, ok := localImages[serviceImage]
  142. return ok
  143. }
  144. }
  145. func getUnwrappedErrorMessage(err error) string {
  146. derr := errors.Unwrap(err)
  147. if derr != nil {
  148. return getUnwrappedErrorMessage(derr)
  149. }
  150. return err.Error()
  151. }
  152. func (s *composeService) pullServiceImage(ctx context.Context, service types.ServiceConfig, quietPull bool, defaultPlatform string) (string, error) {
  153. resource := "Image " + service.Image
  154. s.events.On(progress.PullingEvent(service.Image))
  155. ref, err := reference.ParseNormalizedNamed(service.Image)
  156. if err != nil {
  157. return "", err
  158. }
  159. encodedAuth, err := encodedAuth(ref, s.configFile())
  160. if err != nil {
  161. return "", err
  162. }
  163. platform := service.Platform
  164. if platform == "" {
  165. platform = defaultPlatform
  166. }
  167. stream, err := s.apiClient().ImagePull(ctx, service.Image, image.PullOptions{
  168. RegistryAuth: encodedAuth,
  169. Platform: platform,
  170. })
  171. if ctx.Err() != nil {
  172. s.events.On(progress.Event{
  173. ID: resource,
  174. Status: progress.Warning,
  175. StatusText: "Interrupted",
  176. })
  177. return "", nil
  178. }
  179. // check if has error and the service has a build section
  180. // then the status should be warning instead of error
  181. if err != nil && service.Build != nil {
  182. s.events.On(progress.Event{
  183. ID: resource,
  184. Status: progress.Warning,
  185. Text: "Warning",
  186. StatusText: getUnwrappedErrorMessage(err),
  187. })
  188. return "", err
  189. }
  190. if err != nil {
  191. s.events.On(progress.Event{
  192. ID: resource,
  193. Status: progress.Error,
  194. Text: "Error",
  195. StatusText: getUnwrappedErrorMessage(err),
  196. })
  197. return "", err
  198. }
  199. dec := json.NewDecoder(stream)
  200. for {
  201. var jm jsonmessage.JSONMessage
  202. if err := dec.Decode(&jm); err != nil {
  203. if errors.Is(err, io.EOF) {
  204. break
  205. }
  206. return "", err
  207. }
  208. if jm.Error != nil {
  209. return "", errors.New(jm.Error.Message)
  210. }
  211. if !quietPull {
  212. toPullProgressEvent(resource, jm, s.events)
  213. }
  214. }
  215. s.events.On(progress.PulledEvent(service.Image))
  216. inspected, err := s.apiClient().ImageInspect(ctx, service.Image)
  217. if err != nil {
  218. return "", err
  219. }
  220. return inspected.ID, nil
  221. }
  222. // ImageDigestResolver creates a func able to resolve image digest from a docker ref,
  223. func ImageDigestResolver(ctx context.Context, file *configfile.ConfigFile, apiClient client.APIClient) func(named reference.Named) (digest.Digest, error) {
  224. return func(named reference.Named) (digest.Digest, error) {
  225. auth, err := encodedAuth(named, file)
  226. if err != nil {
  227. return "", err
  228. }
  229. inspect, err := apiClient.DistributionInspect(ctx, named.String(), auth)
  230. if err != nil {
  231. return "",
  232. fmt.Errorf("failed to resolve digest for %s: %w", named.String(), err)
  233. }
  234. return inspect.Descriptor.Digest, nil
  235. }
  236. }
  237. func encodedAuth(ref reference.Named, configFile driver.Auth) (string, error) {
  238. authConfig, err := configFile.GetAuthConfig(registry.GetAuthConfigKey(reference.Domain(ref)))
  239. if err != nil {
  240. return "", err
  241. }
  242. buf, err := json.Marshal(authConfig)
  243. if err != nil {
  244. return "", err
  245. }
  246. return base64.URLEncoding.EncodeToString(buf), nil
  247. }
  248. func (s *composeService) pullRequiredImages(ctx context.Context, project *types.Project, images map[string]api.ImageSummary, quietPull bool) error {
  249. needPull := map[string]types.ServiceConfig{}
  250. for name, service := range project.Services {
  251. pull, err := mustPull(service, images)
  252. if err != nil {
  253. return err
  254. }
  255. if pull {
  256. needPull[name] = service
  257. }
  258. for i, vol := range service.Volumes {
  259. if vol.Type == types.VolumeTypeImage {
  260. if _, ok := images[vol.Source]; !ok {
  261. // Hack: create a fake ServiceConfig so we pull missing volume image
  262. n := fmt.Sprintf("%s:volume %d", name, i)
  263. needPull[n] = types.ServiceConfig{
  264. Name: n,
  265. Image: vol.Source,
  266. }
  267. }
  268. }
  269. }
  270. }
  271. if len(needPull) == 0 {
  272. return nil
  273. }
  274. eg, ctx := errgroup.WithContext(ctx)
  275. eg.SetLimit(s.maxConcurrency)
  276. pulledImages := map[string]api.ImageSummary{}
  277. var mutex sync.Mutex
  278. for name, service := range needPull {
  279. eg.Go(func() error {
  280. id, err := s.pullServiceImage(ctx, service, quietPull, project.Environment["DOCKER_DEFAULT_PLATFORM"])
  281. mutex.Lock()
  282. defer mutex.Unlock()
  283. pulledImages[name] = api.ImageSummary{
  284. ID: id,
  285. Repository: service.Image,
  286. LastTagTime: time.Now(),
  287. }
  288. if err != nil && isServiceImageToBuild(service, project.Services) {
  289. // image can be built, so we can ignore pull failure
  290. return nil
  291. }
  292. return err
  293. })
  294. }
  295. err := eg.Wait()
  296. for i, service := range needPull {
  297. if pulledImages[i].ID != "" {
  298. images[service.Image] = pulledImages[i]
  299. }
  300. }
  301. return err
  302. }
  303. func mustPull(service types.ServiceConfig, images map[string]api.ImageSummary) (bool, error) {
  304. if service.Provider != nil {
  305. return false, nil
  306. }
  307. if service.Image == "" {
  308. return false, nil
  309. }
  310. policy, duration, err := service.GetPullPolicy()
  311. if err != nil {
  312. return false, err
  313. }
  314. switch policy {
  315. case types.PullPolicyAlways:
  316. // force pull
  317. return true, nil
  318. case types.PullPolicyNever, types.PullPolicyBuild:
  319. return false, nil
  320. case types.PullPolicyRefresh:
  321. img, ok := images[service.Image]
  322. if !ok {
  323. return true, nil
  324. }
  325. return time.Now().After(img.LastTagTime.Add(duration)), nil
  326. default: // Pull if missing
  327. _, ok := images[service.Image]
  328. return !ok, nil
  329. }
  330. }
  331. func isServiceImageToBuild(service types.ServiceConfig, services types.Services) bool {
  332. if service.Build != nil {
  333. return true
  334. }
  335. if service.Image == "" {
  336. // N.B. this should be impossible as service must have either `build` or `image` (or both)
  337. return false
  338. }
  339. // look through the other services to see if another has a build definition for the same
  340. // image name
  341. for _, svc := range services {
  342. if svc.Image == service.Image && svc.Build != nil {
  343. return true
  344. }
  345. }
  346. return false
  347. }
// Status strings of pull progress messages. toPullProgressEvent compares
// them with jsonmessage.JSONMessage.Status to derive the state and
// percentage of the progress event it emits.
const (
	// PreparingPhase is reported by toPullProgressEvent as 0% progress.
	PreparingPhase = "Preparing"
	// WaitingPhase is reported by toPullProgressEvent as 0% progress.
	WaitingPhase = "Waiting"
	// PullingFsPhase is reported by toPullProgressEvent as 0% progress.
	PullingFsPhase = "Pulling fs layer"
	// DownloadingPhase is reported with a percentage computed from the
	// message's current/total counters.
	DownloadingPhase = "Downloading"
	// DownloadCompletePhase is reported as done, at 100%.
	DownloadCompletePhase = "Download complete"
	// ExtractingPhase is reported with a percentage computed from the
	// message's current/total counters.
	ExtractingPhase = "Extracting"
	// VerifyingChecksumPhase is reported with a percentage computed from the
	// message's current/total counters.
	VerifyingChecksumPhase = "Verifying Checksum"
	// AlreadyExistsPhase is reported as done, at 100%.
	AlreadyExistsPhase = "Already exists"
	// PullCompletePhase is reported as done, at 100%.
	PullCompletePhase = "Pull complete"
)
  359. func toPullProgressEvent(parent string, jm jsonmessage.JSONMessage, events progress.EventProcessor) {
  360. if jm.ID == "" || jm.Progress == nil {
  361. return
  362. }
  363. var (
  364. text string
  365. total int64
  366. percent int
  367. current int64
  368. status = progress.Working
  369. )
  370. text = jm.Progress.String()
  371. switch jm.Status {
  372. case PreparingPhase, WaitingPhase, PullingFsPhase:
  373. percent = 0
  374. case DownloadingPhase, ExtractingPhase, VerifyingChecksumPhase:
  375. if jm.Progress != nil {
  376. current = jm.Progress.Current
  377. total = jm.Progress.Total
  378. if jm.Progress.Total > 0 {
  379. percent = int(jm.Progress.Current * 100 / jm.Progress.Total)
  380. }
  381. }
  382. case DownloadCompletePhase, AlreadyExistsPhase, PullCompletePhase:
  383. status = progress.Done
  384. percent = 100
  385. }
  386. if strings.Contains(jm.Status, "Image is up to date") ||
  387. strings.Contains(jm.Status, "Downloaded newer image") {
  388. status = progress.Done
  389. percent = 100
  390. }
  391. if jm.Error != nil {
  392. status = progress.Error
  393. text = jm.Error.Message
  394. }
  395. events.On(progress.Event{
  396. ID: jm.ID,
  397. ParentID: parent,
  398. Current: current,
  399. Total: total,
  400. Percent: percent,
  401. Text: jm.Status,
  402. Status: status,
  403. StatusText: text,
  404. })
  405. }