pull.go 12 KB


  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "encoding/base64"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "strings"
  22. "sync"
  23. "time"
  24. "github.com/compose-spec/compose-go/v2/types"
  25. "github.com/containerd/platforms"
  26. "github.com/distribution/reference"
  27. "github.com/docker/cli/cli/config/configfile"
  28. clitypes "github.com/docker/cli/cli/config/types"
  29. "github.com/docker/go-units"
  30. "github.com/moby/moby/api/types/jsonstream"
  31. "github.com/moby/moby/client"
  32. "github.com/opencontainers/go-digest"
  33. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  34. "github.com/sirupsen/logrus"
  35. "golang.org/x/sync/errgroup"
  36. "github.com/docker/compose/v5/internal/registry"
  37. "github.com/docker/compose/v5/pkg/api"
  38. )
  39. func (s *composeService) Pull(ctx context.Context, project *types.Project, options api.PullOptions) error {
  40. return Run(ctx, func(ctx context.Context) error {
  41. return s.pull(ctx, project, options)
  42. }, "pull", s.events)
  43. }
  44. func (s *composeService) pull(ctx context.Context, project *types.Project, opts api.PullOptions) error { //nolint:gocyclo
  45. images, err := s.getLocalImagesDigests(ctx, project)
  46. if err != nil {
  47. return err
  48. }
  49. eg, ctx := errgroup.WithContext(ctx)
  50. eg.SetLimit(s.maxConcurrency)
  51. var (
  52. mustBuild []string
  53. pullErrors = make([]error, len(project.Services))
  54. imagesBeingPulled = map[string]string{}
  55. )
  56. i := 0
  57. for name, service := range project.Services {
  58. if service.Image == "" {
  59. s.events.On(api.Resource{
  60. ID: name,
  61. Status: api.Done,
  62. Text: "Skipped",
  63. Details: "No image to be pulled",
  64. })
  65. continue
  66. }
  67. switch service.PullPolicy {
  68. case types.PullPolicyNever, types.PullPolicyBuild:
  69. s.events.On(api.Resource{
  70. ID: "Image " + service.Image,
  71. Status: api.Done,
  72. Text: "Skipped",
  73. })
  74. continue
  75. case types.PullPolicyMissing, types.PullPolicyIfNotPresent:
  76. if imageAlreadyPresent(service.Image, images) {
  77. s.events.On(api.Resource{
  78. ID: "Image " + service.Image,
  79. Status: api.Done,
  80. Text: "Skipped",
  81. Details: "Image is already present locally",
  82. })
  83. continue
  84. }
  85. }
  86. if service.Build != nil && opts.IgnoreBuildable {
  87. s.events.On(api.Resource{
  88. ID: "Image " + service.Image,
  89. Status: api.Done,
  90. Text: "Skipped",
  91. Details: "Image can be built",
  92. })
  93. continue
  94. }
  95. if _, ok := imagesBeingPulled[service.Image]; ok {
  96. continue
  97. }
  98. imagesBeingPulled[service.Image] = service.Name
  99. idx := i
  100. eg.Go(func() error {
  101. _, err := s.pullServiceImage(ctx, service, opts.Quiet, project.Environment["DOCKER_DEFAULT_PLATFORM"])
  102. if err != nil {
  103. pullErrors[idx] = err
  104. if service.Build != nil {
  105. mustBuild = append(mustBuild, service.Name)
  106. }
  107. if !opts.IgnoreFailures && service.Build == nil {
  108. if s.dryRun {
  109. s.events.On(errorEventf("Image "+service.Image,
  110. "error pulling image: %s", service.Image))
  111. }
  112. // fail fast if image can't be pulled nor built
  113. return err
  114. }
  115. }
  116. return nil
  117. })
  118. i++
  119. }
  120. err = eg.Wait()
  121. if len(mustBuild) > 0 {
  122. logrus.Warnf("WARNING: Some service image(s) must be built from source by running:\n docker compose build %s", strings.Join(mustBuild, " "))
  123. }
  124. if err != nil {
  125. return err
  126. }
  127. if opts.IgnoreFailures {
  128. return nil
  129. }
  130. return errors.Join(pullErrors...)
  131. }
  132. func imageAlreadyPresent(serviceImage string, localImages map[string]api.ImageSummary) bool {
  133. normalizedImage, err := reference.ParseDockerRef(serviceImage)
  134. if err != nil {
  135. return false
  136. }
  137. switch refType := normalizedImage.(type) {
  138. case reference.NamedTagged:
  139. _, ok := localImages[serviceImage]
  140. return ok && refType.Tag() != "latest"
  141. default:
  142. _, ok := localImages[serviceImage]
  143. return ok
  144. }
  145. }
  // getUnwrappedErrorMessage returns the message of the innermost error of err's
  // Unwrap chain, that is, the first error for which errors.Unwrap returns nil.
  // It returns an empty string when err is nil.
  func getUnwrappedErrorMessage(err error) string {
  	if err == nil {
  		return ""
  	}
  	for {
  		inner := errors.Unwrap(err)
  		if inner == nil {
  			return err.Error()
  		}
  		err = inner
  	}
  }
  153. func (s *composeService) pullServiceImage(ctx context.Context, service types.ServiceConfig, quietPull bool, defaultPlatform string) (string, error) {
  154. resource := "Image " + service.Image
  155. s.events.On(pullingEvent(service.Image))
  156. ref, err := reference.ParseNormalizedNamed(service.Image)
  157. if err != nil {
  158. return "", err
  159. }
  160. encodedAuth, err := encodedAuth(ref, s.configFile())
  161. if err != nil {
  162. return "", err
  163. }
  164. platform := service.Platform
  165. if platform == "" {
  166. platform = defaultPlatform
  167. }
  168. var ociPlatforms []ocispec.Platform
  169. if platform != "" {
  170. p, err := platforms.Parse(platform)
  171. if err != nil {
  172. return "", err
  173. }
  174. ociPlatforms = append(ociPlatforms, p)
  175. }
  176. stream, err := s.apiClient().ImagePull(ctx, service.Image, client.ImagePullOptions{
  177. RegistryAuth: encodedAuth,
  178. Platforms: ociPlatforms,
  179. })
  180. if ctx.Err() != nil {
  181. s.events.On(api.Resource{
  182. ID: resource,
  183. Status: api.Warning,
  184. Text: "Interrupted",
  185. })
  186. return "", nil
  187. }
  188. // check if has error and the service has a build section
  189. // then the status should be warning instead of error
  190. if err != nil && service.Build != nil {
  191. s.events.On(api.Resource{
  192. ID: resource,
  193. Status: api.Warning,
  194. Text: getUnwrappedErrorMessage(err),
  195. })
  196. return "", err
  197. }
  198. if err != nil {
  199. s.events.On(errorEvent(resource, getUnwrappedErrorMessage(err)))
  200. return "", err
  201. }
  202. dec := json.NewDecoder(stream)
  203. for {
  204. var jm jsonstream.Message
  205. if err := dec.Decode(&jm); err != nil {
  206. if errors.Is(err, io.EOF) {
  207. break
  208. }
  209. return "", err
  210. }
  211. if jm.Error != nil {
  212. return "", errors.New(jm.Error.Message)
  213. }
  214. if !quietPull {
  215. toPullProgressEvent(resource, jm, s.events)
  216. }
  217. }
  218. s.events.On(pulledEvent(service.Image))
  219. inspected, err := s.apiClient().ImageInspect(ctx, service.Image)
  220. if err != nil {
  221. return "", err
  222. }
  223. return inspected.ID, nil
  224. }
  225. // ImageDigestResolver creates a func able to resolve image digest from a docker ref,
  226. func ImageDigestResolver(ctx context.Context, file *configfile.ConfigFile, apiClient client.APIClient) func(named reference.Named) (digest.Digest, error) {
  227. return func(named reference.Named) (digest.Digest, error) {
  228. auth, err := encodedAuth(named, file)
  229. if err != nil {
  230. return "", err
  231. }
  232. inspect, err := apiClient.DistributionInspect(ctx, named.String(), client.DistributionInspectOptions{
  233. EncodedRegistryAuth: auth,
  234. })
  235. if err != nil {
  236. return "",
  237. fmt.Errorf("failed to resolve digest for %s: %w", named.String(), err)
  238. }
  239. return inspect.Descriptor.Digest, nil
  240. }
  241. }
  242. type authProvider interface {
  243. GetAuthConfig(registryHostname string) (clitypes.AuthConfig, error)
  244. }
  245. func encodedAuth(ref reference.Named, configFile authProvider) (string, error) {
  246. authConfig, err := configFile.GetAuthConfig(registry.GetAuthConfigKey(reference.Domain(ref)))
  247. if err != nil {
  248. return "", err
  249. }
  250. buf, err := json.Marshal(authConfig)
  251. if err != nil {
  252. return "", err
  253. }
  254. return base64.URLEncoding.EncodeToString(buf), nil
  255. }
  256. func (s *composeService) pullRequiredImages(ctx context.Context, project *types.Project, images map[string]api.ImageSummary, quietPull bool) error {
  257. needPull := map[string]types.ServiceConfig{}
  258. for name, service := range project.Services {
  259. pull, err := mustPull(service, images)
  260. if err != nil {
  261. return err
  262. }
  263. if pull {
  264. needPull[name] = service
  265. }
  266. for i, vol := range service.Volumes {
  267. if vol.Type == types.VolumeTypeImage {
  268. if _, ok := images[vol.Source]; !ok {
  269. // Hack: create a fake ServiceConfig so we pull missing volume image
  270. n := fmt.Sprintf("%s:volume %d", name, i)
  271. needPull[n] = types.ServiceConfig{
  272. Name: n,
  273. Image: vol.Source,
  274. }
  275. }
  276. }
  277. }
  278. }
  279. if len(needPull) == 0 {
  280. return nil
  281. }
  282. eg, ctx := errgroup.WithContext(ctx)
  283. eg.SetLimit(s.maxConcurrency)
  284. pulledImages := map[string]api.ImageSummary{}
  285. var mutex sync.Mutex
  286. for name, service := range needPull {
  287. eg.Go(func() error {
  288. id, err := s.pullServiceImage(ctx, service, quietPull, project.Environment["DOCKER_DEFAULT_PLATFORM"])
  289. mutex.Lock()
  290. defer mutex.Unlock()
  291. pulledImages[name] = api.ImageSummary{
  292. ID: id,
  293. Repository: service.Image,
  294. LastTagTime: time.Now(),
  295. }
  296. if err != nil && isServiceImageToBuild(service, project.Services) {
  297. // image can be built, so we can ignore pull failure
  298. return nil
  299. }
  300. return err
  301. })
  302. }
  303. err := eg.Wait()
  304. for i, service := range needPull {
  305. if pulledImages[i].ID != "" {
  306. images[service.Image] = pulledImages[i]
  307. }
  308. }
  309. return err
  310. }
  311. func mustPull(service types.ServiceConfig, images map[string]api.ImageSummary) (bool, error) {
  312. if service.Provider != nil {
  313. return false, nil
  314. }
  315. if service.Image == "" {
  316. return false, nil
  317. }
  318. policy, duration, err := service.GetPullPolicy()
  319. if err != nil {
  320. return false, err
  321. }
  322. switch policy {
  323. case types.PullPolicyAlways:
  324. // force pull
  325. return true, nil
  326. case types.PullPolicyNever, types.PullPolicyBuild:
  327. return false, nil
  328. case types.PullPolicyRefresh:
  329. img, ok := images[service.Image]
  330. if !ok {
  331. return true, nil
  332. }
  333. return time.Now().After(img.LastTagTime.Add(duration)), nil
  334. default: // Pull if missing
  335. _, ok := images[service.Image]
  336. return !ok, nil
  337. }
  338. }
  339. func isServiceImageToBuild(service types.ServiceConfig, services types.Services) bool {
  340. if service.Build != nil {
  341. return true
  342. }
  343. if service.Image == "" {
  344. // N.B. this should be impossible as service must have either `build` or `image` (or both)
  345. return false
  346. }
  347. // look through the other services to see if another has a build definition for the same
  348. // image name
  349. for _, svc := range services {
  350. if svc.Image == service.Image && svc.Build != nil {
  351. return true
  352. }
  353. }
  354. return false
  355. }
  // Status texts of pull progress messages, as matched against the Status field
  // of a message by toPullProgressEvent.
  const (
  	// Phases reported with a zero percentage.
  	PreparingPhase = "Preparing"
  	// NOTE(review): lower-case, unlike the other phases — confirm this matches
  	// the status text actually emitted by the engine.
  	WaitingPhase   = "waiting"
  	PullingFsPhase = "Pulling fs layer"

  	// Phases whose percentage is derived from the message's progress counters.
  	DownloadingPhase       = "Downloading"
  	ExtractingPhase        = "Extracting"
  	VerifyingChecksumPhase = "Verifying Checksum"

  	// Phases reported as done, at 100 percent.
  	DownloadCompletePhase = "Download complete"
  	AlreadyExistsPhase    = "Already exists"
  	PullCompletePhase     = "Pull complete"
  )
  367. func toPullProgressEvent(parent string, jm jsonstream.Message, events api.EventProcessor) {
  368. if jm.ID == "" || jm.Progress == nil {
  369. return
  370. }
  371. var (
  372. details string
  373. total int64
  374. percent int
  375. current int64
  376. status = api.Working
  377. )
  378. switch jm.Status {
  379. case PreparingPhase, WaitingPhase, PullingFsPhase:
  380. percent = 0
  381. case DownloadingPhase, ExtractingPhase, VerifyingChecksumPhase:
  382. if jm.Progress != nil {
  383. current = jm.Progress.Current
  384. total = jm.Progress.Total
  385. if jm.Progress.Total > 0 {
  386. percent = min(int(jm.Progress.Current*100/jm.Progress.Total), 100)
  387. }
  388. }
  389. case DownloadCompletePhase, AlreadyExistsPhase, PullCompletePhase:
  390. status = api.Done
  391. percent = 100
  392. }
  393. if strings.Contains(jm.Status, "Image is up to date") ||
  394. strings.Contains(jm.Status, "Downloaded newer image") {
  395. status = api.Done
  396. percent = 100
  397. }
  398. if jm.Error != nil {
  399. status = api.Error
  400. details = jm.Error.Message
  401. } else {
  402. details = units.HumanSize(float64(jm.Progress.Current))
  403. }
  404. events.On(api.Resource{
  405. ID: jm.ID,
  406. ParentID: parent,
  407. Current: current,
  408. Total: total,
  409. Percent: percent,
  410. Status: status,
  411. Text: jm.Status,
  412. Details: details,
  413. })
  414. }