convergence.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "fmt"
  17. "strconv"
  18. "time"
  19. "github.com/compose-spec/compose-go/types"
  20. "github.com/containerd/containerd/platforms"
  21. moby "github.com/docker/docker/api/types"
  22. "github.com/docker/docker/api/types/filters"
  23. "github.com/docker/docker/api/types/network"
  24. specs "github.com/opencontainers/image-spec/specs-go/v1"
  25. "github.com/sirupsen/logrus"
  26. "golang.org/x/sync/errgroup"
  27. "github.com/docker/compose-cli/api/compose"
  28. "github.com/docker/compose-cli/api/progress"
  29. status "github.com/docker/compose-cli/local/moby"
  30. "github.com/docker/compose-cli/utils"
  31. )
  // Service extension key and value used in this file to mark a service whose
  // containers must be recreated.
  const (
  	// extLifecycle is the key looked up in a service's Extensions map.
  	extLifecycle = "x-lifecycle"
  	// forceRecreate is the extLifecycle value that triggers a recreate.
  	forceRecreate = "force_recreate"
  )

  // doubledContainerNameWarning is the format string (service name, container
  // name) used by getScale when a service with a custom container name is
  // scaled above one replica.
  const doubledContainerNameWarning = "WARNING: The %q service is using the custom container name %q. " +
  	"Docker requires each container to have a unique name. " +
  	"Remove the custom name to scale the service.\n"
  39. func (s *composeService) ensureScale(ctx context.Context, project *types.Project, service types.ServiceConfig, timeout *time.Duration) (*errgroup.Group, []moby.Container, error) {
  40. cState, err := GetContextContainerState(ctx)
  41. if err != nil {
  42. return nil, nil, err
  43. }
  44. observedState := cState.GetContainers()
  45. actual := observedState.filter(isService(service.Name)).filter(isNotOneOff)
  46. scale, err := getScale(service)
  47. if err != nil {
  48. return nil, nil, err
  49. }
  50. eg, _ := errgroup.WithContext(ctx)
  51. if len(actual) < scale {
  52. next, err := nextContainerNumber(actual)
  53. if err != nil {
  54. return nil, actual, err
  55. }
  56. missing := scale - len(actual)
  57. for i := 0; i < missing; i++ {
  58. number := next + i
  59. name := getContainerName(project.Name, service, number)
  60. eg.Go(func() error {
  61. return s.createContainer(ctx, project, service, name, number, false, true)
  62. })
  63. }
  64. }
  65. if len(actual) > scale {
  66. for i := scale; i < len(actual); i++ {
  67. container := actual[i]
  68. eg.Go(func() error {
  69. err := s.apiClient.ContainerStop(ctx, container.ID, timeout)
  70. if err != nil {
  71. return err
  72. }
  73. return s.apiClient.ContainerRemove(ctx, container.ID, moby.ContainerRemoveOptions{})
  74. })
  75. }
  76. actual = actual[:scale]
  77. }
  78. return eg, actual, nil
  79. }
  80. func (s *composeService) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig, recreate string, inherit bool, timeout *time.Duration) error {
  81. eg, actual, err := s.ensureScale(ctx, project, service, timeout)
  82. if err != nil {
  83. return err
  84. }
  85. if recreate == compose.RecreateNever {
  86. return nil
  87. }
  88. expected, err := utils.ServiceHash(service)
  89. if err != nil {
  90. return err
  91. }
  92. for _, container := range actual {
  93. container := container
  94. name := getContainerProgressName(container)
  95. diverged := container.Labels[configHashLabel] != expected
  96. if diverged || recreate == compose.RecreateForce || service.Extensions[extLifecycle] == forceRecreate {
  97. eg.Go(func() error {
  98. return s.recreateContainer(ctx, project, service, container, inherit, timeout)
  99. })
  100. continue
  101. }
  102. w := progress.ContextWriter(ctx)
  103. switch container.State {
  104. case status.ContainerRunning:
  105. w.Event(progress.RunningEvent(name))
  106. case status.ContainerCreated:
  107. case status.ContainerRestarting:
  108. case status.ContainerExited:
  109. w.Event(progress.CreatedEvent(name))
  110. default:
  111. eg.Go(func() error {
  112. return s.startContainer(ctx, container)
  113. })
  114. }
  115. }
  116. return eg.Wait()
  117. }
  118. func getContainerName(projectName string, service types.ServiceConfig, number int) string {
  119. name := fmt.Sprintf("%s_%s_%d", projectName, service.Name, number)
  120. if service.ContainerName != "" {
  121. name = service.ContainerName
  122. }
  123. return name
  124. }
  125. func getContainerProgressName(container moby.Container) string {
  126. return "Container " + getCanonicalContainerName(container)
  127. }
  128. func (s *composeService) waitDependencies(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
  129. eg, _ := errgroup.WithContext(ctx)
  130. for dep, config := range service.DependsOn {
  131. dep, config := dep, config
  132. eg.Go(func() error {
  133. ticker := time.NewTicker(500 * time.Millisecond)
  134. defer ticker.Stop()
  135. for {
  136. <-ticker.C
  137. switch config.Condition {
  138. case types.ServiceConditionHealthy:
  139. healthy, err := s.isServiceHealthy(ctx, project, dep)
  140. if err != nil {
  141. return err
  142. }
  143. if healthy {
  144. return nil
  145. }
  146. case types.ServiceConditionCompletedSuccessfully:
  147. exited, code, err := s.isServiceCompleted(ctx, project, dep)
  148. if err != nil {
  149. return err
  150. }
  151. if exited {
  152. if code != 0 {
  153. return fmt.Errorf("service %q didn't completed successfully: exit %d", dep, code)
  154. }
  155. return nil
  156. }
  157. case types.ServiceConditionStarted:
  158. // already managed by InDependencyOrder
  159. return nil
  160. default:
  161. logrus.Warnf("unsupported depends_on condition: %s", config.Condition)
  162. return nil
  163. }
  164. }
  165. })
  166. }
  167. return eg.Wait()
  168. }
  169. func nextContainerNumber(containers []moby.Container) (int, error) {
  170. max := 0
  171. for _, c := range containers {
  172. n, err := strconv.Atoi(c.Labels[containerNumberLabel])
  173. if err != nil {
  174. return 0, err
  175. }
  176. if n > max {
  177. max = n
  178. }
  179. }
  180. return max + 1, nil
  181. }
  182. func getScale(config types.ServiceConfig) (int, error) {
  183. scale := 1
  184. var err error
  185. if config.Deploy != nil && config.Deploy.Replicas != nil {
  186. scale = int(*config.Deploy.Replicas)
  187. }
  188. if config.Scale != 0 {
  189. scale = config.Scale
  190. }
  191. if scale > 1 && config.ContainerName != "" {
  192. scale = -1
  193. err = fmt.Errorf(doubledContainerNameWarning,
  194. config.Name,
  195. config.ContainerName)
  196. }
  197. return scale, err
  198. }
  199. func (s *composeService) createContainer(ctx context.Context, project *types.Project, service types.ServiceConfig, name string, number int, autoRemove bool, useNetworkAliases bool) error {
  200. w := progress.ContextWriter(ctx)
  201. eventName := "Container " + name
  202. w.Event(progress.CreatingEvent(eventName))
  203. err := s.createMobyContainer(ctx, project, service, name, number, nil, autoRemove, useNetworkAliases)
  204. if err != nil {
  205. return err
  206. }
  207. w.Event(progress.CreatedEvent(eventName))
  208. return nil
  209. }
  210. func (s *composeService) recreateContainer(ctx context.Context, project *types.Project, service types.ServiceConfig, container moby.Container, inherit bool, timeout *time.Duration) error {
  211. w := progress.ContextWriter(ctx)
  212. w.Event(progress.NewEvent(getContainerProgressName(container), progress.Working, "Recreate"))
  213. err := s.apiClient.ContainerStop(ctx, container.ID, timeout)
  214. if err != nil {
  215. return err
  216. }
  217. name := getCanonicalContainerName(container)
  218. tmpName := fmt.Sprintf("%s_%s", container.ID[:12], name)
  219. err = s.apiClient.ContainerRename(ctx, container.ID, tmpName)
  220. if err != nil {
  221. return err
  222. }
  223. number, err := strconv.Atoi(container.Labels[containerNumberLabel])
  224. if err != nil {
  225. return err
  226. }
  227. var inherited *moby.Container
  228. if inherit {
  229. inherited = &container
  230. }
  231. err = s.createMobyContainer(ctx, project, service, name, number, inherited, false, true)
  232. if err != nil {
  233. return err
  234. }
  235. err = s.apiClient.ContainerRemove(ctx, container.ID, moby.ContainerRemoveOptions{})
  236. if err != nil {
  237. return err
  238. }
  239. w.Event(progress.NewEvent(getContainerProgressName(container), progress.Done, "Recreated"))
  240. setDependentLifecycle(project, service.Name, forceRecreate)
  241. return nil
  242. }
  243. // setDependentLifecycle define the Lifecycle strategy for all services to depend on specified service
  244. func setDependentLifecycle(project *types.Project, service string, strategy string) {
  245. for i, s := range project.Services {
  246. if utils.StringContains(s.GetDependencies(), service) {
  247. if s.Extensions == nil {
  248. s.Extensions = map[string]interface{}{}
  249. }
  250. s.Extensions[extLifecycle] = strategy
  251. project.Services[i] = s
  252. }
  253. }
  254. }
  255. func (s *composeService) startContainer(ctx context.Context, container moby.Container) error {
  256. w := progress.ContextWriter(ctx)
  257. w.Event(progress.NewEvent(getContainerProgressName(container), progress.Working, "Restart"))
  258. err := s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
  259. if err != nil {
  260. return err
  261. }
  262. w.Event(progress.NewEvent(getContainerProgressName(container), progress.Done, "Restarted"))
  263. return nil
  264. }
  265. func (s *composeService) createMobyContainer(ctx context.Context, project *types.Project, service types.ServiceConfig, name string, number int,
  266. inherit *moby.Container,
  267. autoRemove bool,
  268. useNetworkAliases bool) error {
  269. cState, err := GetContextContainerState(ctx)
  270. if err != nil {
  271. return err
  272. }
  273. containerConfig, hostConfig, networkingConfig, err := s.getCreateOptions(ctx, project, service, number, inherit, autoRemove)
  274. if err != nil {
  275. return err
  276. }
  277. var plat *specs.Platform
  278. if service.Platform != "" {
  279. p, err := platforms.Parse(service.Platform)
  280. if err != nil {
  281. return err
  282. }
  283. plat = &p
  284. }
  285. created, err := s.apiClient.ContainerCreate(ctx, containerConfig, hostConfig, networkingConfig, plat, name)
  286. if err != nil {
  287. return err
  288. }
  289. createdContainer := moby.Container{
  290. ID: created.ID,
  291. Labels: containerConfig.Labels,
  292. }
  293. cState.Add(createdContainer)
  294. for _, netName := range service.NetworksByPriority() {
  295. netwrk := project.Networks[netName]
  296. cfg := service.Networks[netName]
  297. aliases := []string{getContainerName(project.Name, service, number)}
  298. if useNetworkAliases {
  299. aliases = append(aliases, service.Name)
  300. if cfg != nil {
  301. aliases = append(aliases, cfg.Aliases...)
  302. }
  303. }
  304. err = s.connectContainerToNetwork(ctx, created.ID, netwrk.Name, cfg, aliases...)
  305. if err != nil {
  306. return err
  307. }
  308. }
  309. return nil
  310. }
  311. func (s *composeService) connectContainerToNetwork(ctx context.Context, id string, netwrk string, cfg *types.ServiceNetworkConfig, aliases ...string) error {
  312. var (
  313. ipv4ddress string
  314. ipv6Address string
  315. )
  316. if cfg != nil {
  317. ipv4ddress = cfg.Ipv4Address
  318. ipv6Address = cfg.Ipv6Address
  319. }
  320. err := s.apiClient.NetworkConnect(ctx, netwrk, id, &network.EndpointSettings{
  321. Aliases: aliases,
  322. IPAddress: ipv4ddress,
  323. GlobalIPv6Address: ipv6Address,
  324. })
  325. if err != nil {
  326. return err
  327. }
  328. return nil
  329. }
  330. func (s *composeService) isServiceHealthy(ctx context.Context, project *types.Project, service string) (bool, error) {
  331. containers, err := s.getContainers(ctx, project.Name, oneOffExclude, false, service)
  332. if err != nil {
  333. return false, err
  334. }
  335. if len(containers) == 0 {
  336. return false, nil
  337. }
  338. for _, c := range containers {
  339. container, err := s.apiClient.ContainerInspect(ctx, c.ID)
  340. if err != nil {
  341. return false, err
  342. }
  343. if container.State == nil || container.State.Health == nil {
  344. return false, fmt.Errorf("container for service %q has no healthcheck configured", service)
  345. }
  346. if container.State.Health.Status != moby.Healthy {
  347. return false, nil
  348. }
  349. }
  350. return true, nil
  351. }
  352. func (s *composeService) isServiceCompleted(ctx context.Context, project *types.Project, dep string) (bool, int, error) {
  353. containers, err := s.getContainers(ctx, project.Name, oneOffExclude, true, dep)
  354. if err != nil {
  355. return false, 0, err
  356. }
  357. for _, c := range containers {
  358. container, err := s.apiClient.ContainerInspect(ctx, c.ID)
  359. if err != nil {
  360. return false, 0, err
  361. }
  362. if container.State != nil && container.State.Status == "exited" {
  363. return true, container.State.ExitCode, nil
  364. }
  365. }
  366. return false, 0, nil
  367. }
  368. func (s *composeService) startService(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
  369. err := s.waitDependencies(ctx, project, service)
  370. if err != nil {
  371. return err
  372. }
  373. containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
  374. Filters: filters.NewArgs(
  375. projectFilter(project.Name),
  376. serviceFilter(service.Name),
  377. ),
  378. All: true,
  379. })
  380. if err != nil {
  381. return err
  382. }
  383. w := progress.ContextWriter(ctx)
  384. eg, ctx := errgroup.WithContext(ctx)
  385. for _, c := range containers {
  386. container := c
  387. if container.State == status.ContainerRunning {
  388. continue
  389. }
  390. eg.Go(func() error {
  391. eventName := getContainerProgressName(container)
  392. w.Event(progress.StartingEvent(eventName))
  393. err := s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
  394. if err == nil {
  395. w.Event(progress.StartedEvent(eventName))
  396. }
  397. return err
  398. })
  399. }
  400. return eg.Wait()
  401. }
  402. func (s *composeService) restartService(ctx context.Context, serviceName string, timeout *time.Duration) error {
  403. containerState, err := GetContextContainerState(ctx)
  404. if err != nil {
  405. return err
  406. }
  407. containers := containerState.GetContainers().filter(isService(serviceName))
  408. w := progress.ContextWriter(ctx)
  409. eg, ctx := errgroup.WithContext(ctx)
  410. for _, c := range containers {
  411. container := c
  412. eg.Go(func() error {
  413. eventName := getContainerProgressName(container)
  414. w.Event(progress.RestartingEvent(eventName))
  415. err := s.apiClient.ContainerRestart(ctx, container.ID, timeout)
  416. if err == nil {
  417. w.Event(progress.StartedEvent(eventName))
  418. }
  419. return err
  420. })
  421. }
  422. return eg.Wait()
  423. }