pull.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "encoding/base64"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "strings"
  22. "github.com/compose-spec/compose-go/types"
  23. "github.com/distribution/distribution/v3/reference"
  24. "github.com/docker/buildx/driver"
  25. moby "github.com/docker/docker/api/types"
  26. "github.com/docker/docker/pkg/jsonmessage"
  27. "github.com/docker/docker/registry"
  28. "github.com/hashicorp/go-multierror"
  29. "golang.org/x/sync/errgroup"
  30. "github.com/docker/compose/v2/pkg/api"
  31. "github.com/docker/compose/v2/pkg/progress"
  32. )
  33. func (s *composeService) Pull(ctx context.Context, project *types.Project, options api.PullOptions) error {
  34. if options.Quiet {
  35. return s.pull(ctx, project, options)
  36. }
  37. return progress.Run(ctx, func(ctx context.Context) error {
  38. return s.pull(ctx, project, options)
  39. })
  40. }
  41. func (s *composeService) pull(ctx context.Context, project *types.Project, opts api.PullOptions) error { //nolint:gocyclo
  42. info, err := s.apiClient().Info(ctx)
  43. if err != nil {
  44. return err
  45. }
  46. if info.IndexServerAddress == "" {
  47. info.IndexServerAddress = registry.IndexServer
  48. }
  49. images, err := s.getLocalImagesDigests(ctx, project)
  50. if err != nil {
  51. return err
  52. }
  53. w := progress.ContextWriter(ctx)
  54. eg, ctx := errgroup.WithContext(ctx)
  55. eg.SetLimit(s.maxConcurrency)
  56. var (
  57. mustBuild []string
  58. pullErrors = make([]error, len(project.Services))
  59. imagesBeingPulled = map[string]string{}
  60. )
  61. for i, service := range project.Services {
  62. i, service := i, service
  63. if service.Image == "" {
  64. w.Event(progress.Event{
  65. ID: service.Name,
  66. Status: progress.Done,
  67. Text: "Skipped - No image to be pulled",
  68. })
  69. continue
  70. }
  71. switch service.PullPolicy {
  72. case types.PullPolicyNever, types.PullPolicyBuild:
  73. w.Event(progress.Event{
  74. ID: service.Name,
  75. Status: progress.Done,
  76. Text: "Skipped",
  77. })
  78. continue
  79. case types.PullPolicyMissing, types.PullPolicyIfNotPresent:
  80. if imageAlreadyPresent(service.Image, images) {
  81. w.Event(progress.Event{
  82. ID: service.Name,
  83. Status: progress.Done,
  84. Text: "Skipped - Image is already present locally",
  85. })
  86. continue
  87. }
  88. }
  89. if s, ok := imagesBeingPulled[service.Image]; ok {
  90. w.Event(progress.Event{
  91. ID: service.Name,
  92. Status: progress.Done,
  93. Text: fmt.Sprintf("Skipped - Image is already being pulled by %v", s),
  94. })
  95. continue
  96. }
  97. imagesBeingPulled[service.Image] = service.Name
  98. eg.Go(func() error {
  99. _, err := s.pullServiceImage(ctx, service, info, s.configFile(), w, false, project.Environment["DOCKER_DEFAULT_PLATFORM"])
  100. if err != nil {
  101. pullErrors[i] = err
  102. if service.Build != nil {
  103. mustBuild = append(mustBuild, service.Name)
  104. }
  105. if !opts.IgnoreFailures && service.Build == nil {
  106. // fail fast if image can't be pulled nor built
  107. return err
  108. }
  109. }
  110. return nil
  111. })
  112. }
  113. err = eg.Wait()
  114. if len(mustBuild) > 0 {
  115. w.TailMsgf("WARNING: Some service image(s) must be built from source by running:\n docker compose build %s", strings.Join(mustBuild, " "))
  116. }
  117. if err != nil {
  118. return err
  119. }
  120. if opts.IgnoreFailures {
  121. return nil
  122. }
  123. return multierror.Append(nil, pullErrors...).ErrorOrNil()
  124. }
  125. func imageAlreadyPresent(serviceImage string, localImages map[string]string) bool {
  126. normalizedImage, err := reference.ParseDockerRef(serviceImage)
  127. if err != nil {
  128. return false
  129. }
  130. tagged, ok := normalizedImage.(reference.NamedTagged)
  131. if !ok {
  132. return false
  133. }
  134. _, ok = localImages[serviceImage]
  135. return ok && tagged.Tag() != "latest"
  136. }
  137. func (s *composeService) pullServiceImage(ctx context.Context, service types.ServiceConfig, info moby.Info,
  138. configFile driver.Auth, w progress.Writer, quietPull bool, defaultPlatform string) (string, error) {
  139. w.Event(progress.Event{
  140. ID: service.Name,
  141. Status: progress.Working,
  142. Text: "Pulling",
  143. })
  144. ref, err := reference.ParseNormalizedNamed(service.Image)
  145. if err != nil {
  146. return "", err
  147. }
  148. encodedAuth, err := encodedAuth(ref, info, configFile)
  149. if err != nil {
  150. return "", err
  151. }
  152. platform := service.Platform
  153. if platform == "" {
  154. platform = defaultPlatform
  155. }
  156. stream, err := s.apiClient().ImagePull(ctx, service.Image, moby.ImagePullOptions{
  157. RegistryAuth: encodedAuth,
  158. Platform: platform,
  159. })
  160. // check if has error and the service has a build section
  161. // then the status should be warning instead of error
  162. if err != nil && service.Build != nil {
  163. w.Event(progress.Event{
  164. ID: service.Name,
  165. Status: progress.Warning,
  166. Text: "Warning",
  167. })
  168. return "", WrapCategorisedComposeError(err, PullFailure)
  169. }
  170. if err != nil {
  171. w.Event(progress.Event{
  172. ID: service.Name,
  173. Status: progress.Error,
  174. Text: "Error",
  175. })
  176. return "", WrapCategorisedComposeError(err, PullFailure)
  177. }
  178. dec := json.NewDecoder(stream)
  179. for {
  180. var jm jsonmessage.JSONMessage
  181. if err := dec.Decode(&jm); err != nil {
  182. if err == io.EOF {
  183. break
  184. }
  185. return "", WrapCategorisedComposeError(err, PullFailure)
  186. }
  187. if jm.Error != nil {
  188. return "", WrapCategorisedComposeError(errors.New(jm.Error.Message), PullFailure)
  189. }
  190. if !quietPull {
  191. toPullProgressEvent(service.Name, jm, w)
  192. }
  193. }
  194. w.Event(progress.Event{
  195. ID: service.Name,
  196. Status: progress.Done,
  197. Text: "Pulled",
  198. })
  199. inspected, _, err := s.dockerCli.Client().ImageInspectWithRaw(ctx, service.Image)
  200. if err != nil {
  201. return "", err
  202. }
  203. return inspected.ID, nil
  204. }
  205. func encodedAuth(ref reference.Named, info moby.Info, configFile driver.Auth) (string, error) {
  206. repoInfo, err := registry.ParseRepositoryInfo(ref)
  207. if err != nil {
  208. return "", err
  209. }
  210. key := repoInfo.Index.Name
  211. if repoInfo.Index.Official {
  212. key = info.IndexServerAddress
  213. }
  214. authConfig, err := configFile.GetAuthConfig(key)
  215. if err != nil {
  216. return "", err
  217. }
  218. buf, err := json.Marshal(authConfig)
  219. if err != nil {
  220. return "", err
  221. }
  222. return base64.URLEncoding.EncodeToString(buf), nil
  223. }
  224. func (s *composeService) pullRequiredImages(ctx context.Context, project *types.Project, images map[string]string, quietPull bool) error {
  225. info, err := s.apiClient().Info(ctx)
  226. if err != nil {
  227. return err
  228. }
  229. if info.IndexServerAddress == "" {
  230. info.IndexServerAddress = registry.IndexServer
  231. }
  232. var needPull []types.ServiceConfig
  233. for _, service := range project.Services {
  234. if service.Image == "" {
  235. continue
  236. }
  237. switch service.PullPolicy {
  238. case "", types.PullPolicyMissing, types.PullPolicyIfNotPresent:
  239. if _, ok := images[service.Image]; ok {
  240. continue
  241. }
  242. case types.PullPolicyNever, types.PullPolicyBuild:
  243. continue
  244. case types.PullPolicyAlways:
  245. // force pull
  246. }
  247. needPull = append(needPull, service)
  248. }
  249. if len(needPull) == 0 {
  250. return nil
  251. }
  252. return progress.Run(ctx, func(ctx context.Context) error {
  253. w := progress.ContextWriter(ctx)
  254. eg, ctx := errgroup.WithContext(ctx)
  255. eg.SetLimit(s.maxConcurrency)
  256. pulledImages := make([]string, len(needPull))
  257. for i, service := range needPull {
  258. i, service := i, service
  259. eg.Go(func() error {
  260. id, err := s.pullServiceImage(ctx, service, info, s.configFile(), w, quietPull, project.Environment["DOCKER_DEFAULT_PLATFORM"])
  261. pulledImages[i] = id
  262. if err != nil && isServiceImageToBuild(service, project.Services) {
  263. // image can be built, so we can ignore pull failure
  264. return nil
  265. }
  266. return err
  267. })
  268. }
  269. err := eg.Wait()
  270. for i, service := range needPull {
  271. if pulledImages[i] != "" {
  272. images[service.Image] = pulledImages[i]
  273. }
  274. }
  275. return err
  276. })
  277. }
  278. func isServiceImageToBuild(service types.ServiceConfig, services []types.ServiceConfig) bool {
  279. if service.Build != nil {
  280. return true
  281. }
  282. for _, depService := range services {
  283. if depService.Image == service.Image && depService.Build != nil {
  284. return true
  285. }
  286. }
  287. return false
  288. }
  289. func toPullProgressEvent(parent string, jm jsonmessage.JSONMessage, w progress.Writer) {
  290. if jm.ID == "" || jm.Progress == nil {
  291. return
  292. }
  293. var (
  294. text string
  295. status = progress.Working
  296. )
  297. text = jm.Progress.String()
  298. if jm.Status == "Pull complete" ||
  299. jm.Status == "Already exists" ||
  300. strings.Contains(jm.Status, "Image is up to date") ||
  301. strings.Contains(jm.Status, "Downloaded newer image") {
  302. status = progress.Done
  303. }
  304. if jm.Error != nil {
  305. status = progress.Error
  306. text = jm.Error.Message
  307. }
  308. w.Event(progress.Event{
  309. ID: jm.ID,
  310. ParentID: parent,
  311. Text: jm.Status,
  312. Status: status,
  313. StatusText: text,
  314. })
  315. }