pull.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "encoding/base64"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "strings"
  22. "sync"
  23. "time"
  24. "github.com/compose-spec/compose-go/v2/types"
  25. "github.com/distribution/reference"
  26. "github.com/docker/buildx/driver"
  27. "github.com/docker/cli/cli/config/configfile"
  28. "github.com/docker/docker/api/types/image"
  29. "github.com/docker/docker/client"
  30. "github.com/docker/docker/pkg/jsonmessage"
  31. "github.com/opencontainers/go-digest"
  32. "github.com/sirupsen/logrus"
  33. "golang.org/x/sync/errgroup"
  34. "github.com/docker/compose/v2/internal/registry"
  35. "github.com/docker/compose/v2/pkg/api"
  36. )
  37. func (s *composeService) Pull(ctx context.Context, project *types.Project, options api.PullOptions) error {
  38. return Run(ctx, func(ctx context.Context) error {
  39. return s.pull(ctx, project, options)
  40. }, "pull", s.events)
  41. }
  42. func (s *composeService) pull(ctx context.Context, project *types.Project, opts api.PullOptions) error { //nolint:gocyclo
  43. images, err := s.getLocalImagesDigests(ctx, project)
  44. if err != nil {
  45. return err
  46. }
  47. eg, ctx := errgroup.WithContext(ctx)
  48. eg.SetLimit(s.maxConcurrency)
  49. var (
  50. mustBuild []string
  51. pullErrors = make([]error, len(project.Services))
  52. imagesBeingPulled = map[string]string{}
  53. )
  54. i := 0
  55. for name, service := range project.Services {
  56. if service.Image == "" {
  57. s.events.On(api.Resource{
  58. ID: name,
  59. Status: api.Done,
  60. Text: "Skipped",
  61. Details: "No image to be pulled",
  62. })
  63. continue
  64. }
  65. switch service.PullPolicy {
  66. case types.PullPolicyNever, types.PullPolicyBuild:
  67. s.events.On(api.Resource{
  68. ID: "Image " + service.Image,
  69. Status: api.Done,
  70. Text: "Skipped",
  71. })
  72. continue
  73. case types.PullPolicyMissing, types.PullPolicyIfNotPresent:
  74. if imageAlreadyPresent(service.Image, images) {
  75. s.events.On(api.Resource{
  76. ID: "Image " + service.Image,
  77. Status: api.Done,
  78. Text: "Skipped",
  79. Details: "Image is already present locally",
  80. })
  81. continue
  82. }
  83. }
  84. if service.Build != nil && opts.IgnoreBuildable {
  85. s.events.On(api.Resource{
  86. ID: "Image " + service.Image,
  87. Status: api.Done,
  88. Text: "Skipped",
  89. Details: "Image can be built",
  90. })
  91. continue
  92. }
  93. if _, ok := imagesBeingPulled[service.Image]; ok {
  94. continue
  95. }
  96. imagesBeingPulled[service.Image] = service.Name
  97. idx := i
  98. eg.Go(func() error {
  99. _, err := s.pullServiceImage(ctx, service, opts.Quiet, project.Environment["DOCKER_DEFAULT_PLATFORM"])
  100. if err != nil {
  101. pullErrors[idx] = err
  102. if service.Build != nil {
  103. mustBuild = append(mustBuild, service.Name)
  104. }
  105. if !opts.IgnoreFailures && service.Build == nil {
  106. if s.dryRun {
  107. s.events.On(errorEventf("Image "+service.Image,
  108. "error pulling image: %s", service.Image))
  109. }
  110. // fail fast if image can't be pulled nor built
  111. return err
  112. }
  113. }
  114. return nil
  115. })
  116. i++
  117. }
  118. err = eg.Wait()
  119. if len(mustBuild) > 0 {
  120. logrus.Warnf("WARNING: Some service image(s) must be built from source by running:\n docker compose build %s", strings.Join(mustBuild, " "))
  121. }
  122. if err != nil {
  123. return err
  124. }
  125. if opts.IgnoreFailures {
  126. return nil
  127. }
  128. return errors.Join(pullErrors...)
  129. }
  130. func imageAlreadyPresent(serviceImage string, localImages map[string]api.ImageSummary) bool {
  131. normalizedImage, err := reference.ParseDockerRef(serviceImage)
  132. if err != nil {
  133. return false
  134. }
  135. switch refType := normalizedImage.(type) {
  136. case reference.NamedTagged:
  137. _, ok := localImages[serviceImage]
  138. return ok && refType.Tag() != "latest"
  139. default:
  140. _, ok := localImages[serviceImage]
  141. return ok
  142. }
  143. }
  144. func getUnwrappedErrorMessage(err error) string {
  145. derr := errors.Unwrap(err)
  146. if derr != nil {
  147. return getUnwrappedErrorMessage(derr)
  148. }
  149. return err.Error()
  150. }
  151. func (s *composeService) pullServiceImage(ctx context.Context, service types.ServiceConfig, quietPull bool, defaultPlatform string) (string, error) {
  152. resource := "Image " + service.Image
  153. s.events.On(pullingEvent(service.Image))
  154. ref, err := reference.ParseNormalizedNamed(service.Image)
  155. if err != nil {
  156. return "", err
  157. }
  158. encodedAuth, err := encodedAuth(ref, s.configFile())
  159. if err != nil {
  160. return "", err
  161. }
  162. platform := service.Platform
  163. if platform == "" {
  164. platform = defaultPlatform
  165. }
  166. stream, err := s.apiClient().ImagePull(ctx, service.Image, image.PullOptions{
  167. RegistryAuth: encodedAuth,
  168. Platform: platform,
  169. })
  170. if ctx.Err() != nil {
  171. s.events.On(api.Resource{
  172. ID: resource,
  173. Status: api.Warning,
  174. Text: "Interrupted",
  175. })
  176. return "", nil
  177. }
  178. // check if has error and the service has a build section
  179. // then the status should be warning instead of error
  180. if err != nil && service.Build != nil {
  181. s.events.On(api.Resource{
  182. ID: resource,
  183. Status: api.Warning,
  184. Text: getUnwrappedErrorMessage(err),
  185. })
  186. return "", err
  187. }
  188. if err != nil {
  189. s.events.On(errorEvent(resource, getUnwrappedErrorMessage(err)))
  190. return "", err
  191. }
  192. dec := json.NewDecoder(stream)
  193. for {
  194. var jm jsonmessage.JSONMessage
  195. if err := dec.Decode(&jm); err != nil {
  196. if errors.Is(err, io.EOF) {
  197. break
  198. }
  199. return "", err
  200. }
  201. if jm.Error != nil {
  202. return "", errors.New(jm.Error.Message)
  203. }
  204. if !quietPull {
  205. toPullProgressEvent(resource, jm, s.events)
  206. }
  207. }
  208. s.events.On(pulledEvent(service.Image))
  209. inspected, err := s.apiClient().ImageInspect(ctx, service.Image)
  210. if err != nil {
  211. return "", err
  212. }
  213. return inspected.ID, nil
  214. }
  215. // ImageDigestResolver creates a func able to resolve image digest from a docker ref,
  216. func ImageDigestResolver(ctx context.Context, file *configfile.ConfigFile, apiClient client.APIClient) func(named reference.Named) (digest.Digest, error) {
  217. return func(named reference.Named) (digest.Digest, error) {
  218. auth, err := encodedAuth(named, file)
  219. if err != nil {
  220. return "", err
  221. }
  222. inspect, err := apiClient.DistributionInspect(ctx, named.String(), auth)
  223. if err != nil {
  224. return "",
  225. fmt.Errorf("failed to resolve digest for %s: %w", named.String(), err)
  226. }
  227. return inspect.Descriptor.Digest, nil
  228. }
  229. }
  230. func encodedAuth(ref reference.Named, configFile driver.Auth) (string, error) {
  231. authConfig, err := configFile.GetAuthConfig(registry.GetAuthConfigKey(reference.Domain(ref)))
  232. if err != nil {
  233. return "", err
  234. }
  235. buf, err := json.Marshal(authConfig)
  236. if err != nil {
  237. return "", err
  238. }
  239. return base64.URLEncoding.EncodeToString(buf), nil
  240. }
  241. func (s *composeService) pullRequiredImages(ctx context.Context, project *types.Project, images map[string]api.ImageSummary, quietPull bool) error {
  242. needPull := map[string]types.ServiceConfig{}
  243. for name, service := range project.Services {
  244. pull, err := mustPull(service, images)
  245. if err != nil {
  246. return err
  247. }
  248. if pull {
  249. needPull[name] = service
  250. }
  251. for i, vol := range service.Volumes {
  252. if vol.Type == types.VolumeTypeImage {
  253. if _, ok := images[vol.Source]; !ok {
  254. // Hack: create a fake ServiceConfig so we pull missing volume image
  255. n := fmt.Sprintf("%s:volume %d", name, i)
  256. needPull[n] = types.ServiceConfig{
  257. Name: n,
  258. Image: vol.Source,
  259. }
  260. }
  261. }
  262. }
  263. }
  264. if len(needPull) == 0 {
  265. return nil
  266. }
  267. eg, ctx := errgroup.WithContext(ctx)
  268. eg.SetLimit(s.maxConcurrency)
  269. pulledImages := map[string]api.ImageSummary{}
  270. var mutex sync.Mutex
  271. for name, service := range needPull {
  272. eg.Go(func() error {
  273. id, err := s.pullServiceImage(ctx, service, quietPull, project.Environment["DOCKER_DEFAULT_PLATFORM"])
  274. mutex.Lock()
  275. defer mutex.Unlock()
  276. pulledImages[name] = api.ImageSummary{
  277. ID: id,
  278. Repository: service.Image,
  279. LastTagTime: time.Now(),
  280. }
  281. if err != nil && isServiceImageToBuild(service, project.Services) {
  282. // image can be built, so we can ignore pull failure
  283. return nil
  284. }
  285. return err
  286. })
  287. }
  288. err := eg.Wait()
  289. for i, service := range needPull {
  290. if pulledImages[i].ID != "" {
  291. images[service.Image] = pulledImages[i]
  292. }
  293. }
  294. return err
  295. }
  296. func mustPull(service types.ServiceConfig, images map[string]api.ImageSummary) (bool, error) {
  297. if service.Provider != nil {
  298. return false, nil
  299. }
  300. if service.Image == "" {
  301. return false, nil
  302. }
  303. policy, duration, err := service.GetPullPolicy()
  304. if err != nil {
  305. return false, err
  306. }
  307. switch policy {
  308. case types.PullPolicyAlways:
  309. // force pull
  310. return true, nil
  311. case types.PullPolicyNever, types.PullPolicyBuild:
  312. return false, nil
  313. case types.PullPolicyRefresh:
  314. img, ok := images[service.Image]
  315. if !ok {
  316. return true, nil
  317. }
  318. return time.Now().After(img.LastTagTime.Add(duration)), nil
  319. default: // Pull if missing
  320. _, ok := images[service.Image]
  321. return !ok, nil
  322. }
  323. }
  324. func isServiceImageToBuild(service types.ServiceConfig, services types.Services) bool {
  325. if service.Build != nil {
  326. return true
  327. }
  328. if service.Image == "" {
  329. // N.B. this should be impossible as service must have either `build` or `image` (or both)
  330. return false
  331. }
  332. // look through the other services to see if another has a build definition for the same
  333. // image name
  334. for _, svc := range services {
  335. if svc.Image == service.Image && svc.Build != nil {
  336. return true
  337. }
  338. }
  339. return false
  340. }
  341. const (
  342. PreparingPhase = "Preparing"
  343. WaitingPhase = "waiting"
  344. PullingFsPhase = "Pulling fs layer"
  345. DownloadingPhase = "Downloading"
  346. DownloadCompletePhase = "Download complete"
  347. ExtractingPhase = "Extracting"
  348. VerifyingChecksumPhase = "Verifying Checksum"
  349. AlreadyExistsPhase = "Already exists"
  350. PullCompletePhase = "Pull complete"
  351. )
  352. func toPullProgressEvent(parent string, jm jsonmessage.JSONMessage, events api.EventProcessor) {
  353. if jm.ID == "" || jm.Progress == nil {
  354. return
  355. }
  356. var (
  357. text string
  358. total int64
  359. percent int
  360. current int64
  361. status = api.Working
  362. )
  363. text = jm.Progress.String()
  364. switch jm.Status {
  365. case PreparingPhase, WaitingPhase, PullingFsPhase:
  366. percent = 0
  367. case DownloadingPhase, ExtractingPhase, VerifyingChecksumPhase:
  368. if jm.Progress != nil {
  369. current = jm.Progress.Current
  370. total = jm.Progress.Total
  371. if jm.Progress.Total > 0 {
  372. percent = int(jm.Progress.Current * 100 / jm.Progress.Total)
  373. }
  374. }
  375. case DownloadCompletePhase, AlreadyExistsPhase, PullCompletePhase:
  376. status = api.Done
  377. percent = 100
  378. }
  379. if strings.Contains(jm.Status, "Image is up to date") ||
  380. strings.Contains(jm.Status, "Downloaded newer image") {
  381. status = api.Done
  382. percent = 100
  383. }
  384. if jm.Error != nil {
  385. status = api.Error
  386. text = jm.Error.Message
  387. }
  388. events.On(api.Resource{
  389. ID: jm.ID,
  390. ParentID: parent,
  391. Current: current,
  392. Total: total,
  393. Percent: percent,
  394. Status: status,
  395. Text: text,
  396. })
  397. }