convergence.go 17 KB


  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "fmt"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "time"
  21. "github.com/compose-spec/compose-go/types"
  22. "github.com/containerd/containerd/platforms"
  23. moby "github.com/docker/docker/api/types"
  24. "github.com/docker/docker/api/types/filters"
  25. "github.com/docker/docker/api/types/network"
  26. specs "github.com/opencontainers/image-spec/specs-go/v1"
  27. "github.com/sirupsen/logrus"
  28. "golang.org/x/sync/errgroup"
  29. "github.com/docker/compose-cli/pkg/api"
  30. "github.com/docker/compose-cli/pkg/progress"
  31. "github.com/docker/compose-cli/pkg/utils"
  32. )
  33. const (
  34. extLifecycle = "x-lifecycle"
  35. forceRecreate = "force_recreate"
  36. doubledContainerNameWarning = "WARNING: The %q service is using the custom container name %q. " +
  37. "Docker requires each container to have a unique name. " +
  38. "Remove the custom name to scale the service.\n"
  39. )
  40. // convergence manages service's container lifecycle.
  41. // Based on initially observed state, it reconciles the existing container with desired state, which might include
  42. // re-creating container, adding or removing replicas, or starting stopped containers.
  43. // Cross services dependencies are managed by creating services in expected order and updating `service:xx` reference
  44. // when a service has converged, so dependent ones can be managed with resolved containers references.
  45. type convergence struct {
  46. service *composeService
  47. observedState map[string]Containers
  48. stateMutex sync.Mutex
  49. }
  50. func (c *convergence) getObservedState(serviceName string) Containers {
  51. c.stateMutex.Lock()
  52. defer c.stateMutex.Unlock()
  53. return c.observedState[serviceName]
  54. }
  55. func (c *convergence) setObservedState(serviceName string, containers Containers) {
  56. c.stateMutex.Lock()
  57. defer c.stateMutex.Unlock()
  58. c.observedState[serviceName] = containers
  59. }
  60. func newConvergence(services []string, state Containers, s *composeService) *convergence {
  61. observedState := map[string]Containers{}
  62. for _, s := range services {
  63. observedState[s] = Containers{}
  64. }
  65. for _, c := range state.filter(isNotOneOff) {
  66. service := c.Labels[api.ServiceLabel]
  67. observedState[service] = append(observedState[service], c)
  68. }
  69. return &convergence{
  70. service: s,
  71. observedState: observedState,
  72. }
  73. }
  74. func (c *convergence) apply(ctx context.Context, project *types.Project, options api.CreateOptions) error {
  75. return InDependencyOrder(ctx, project, func(ctx context.Context, name string) error {
  76. service, err := project.GetService(name)
  77. if err != nil {
  78. return err
  79. }
  80. strategy := options.RecreateDependencies
  81. if utils.StringContains(options.Services, name) {
  82. strategy = options.Recreate
  83. }
  84. err = c.ensureService(ctx, project, service, strategy, options.Inherit, options.Timeout)
  85. if err != nil {
  86. return err
  87. }
  88. c.updateProject(project, name)
  89. return nil
  90. })
  91. }
  92. var mu sync.Mutex
  93. // updateProject updates project after service converged, so dependent services relying on `service:xx` can refer to actual containers.
  94. func (c *convergence) updateProject(project *types.Project, service string) {
  95. containers := c.getObservedState(service)
  96. container := containers[0]
  97. // operation is protected by a Mutex so that we can safely update project.Services while running concurrent convergence on services
  98. mu.Lock()
  99. defer mu.Unlock()
  100. for i, s := range project.Services {
  101. if d := getDependentServiceFromMode(s.NetworkMode); d == service {
  102. s.NetworkMode = types.NetworkModeContainerPrefix + container.ID
  103. }
  104. if d := getDependentServiceFromMode(s.Ipc); d == service {
  105. s.Ipc = types.NetworkModeContainerPrefix + container.ID
  106. }
  107. if d := getDependentServiceFromMode(s.Pid); d == service {
  108. s.Pid = types.NetworkModeContainerPrefix + container.ID
  109. }
  110. var links []string
  111. for _, serviceLink := range s.Links {
  112. parts := strings.Split(serviceLink, ":")
  113. serviceName := serviceLink
  114. serviceAlias := ""
  115. if len(parts) == 2 {
  116. serviceName = parts[0]
  117. serviceAlias = parts[1]
  118. }
  119. if serviceName != service {
  120. links = append(links, serviceLink)
  121. continue
  122. }
  123. for _, container := range containers {
  124. name := getCanonicalContainerName(container)
  125. if serviceAlias != "" {
  126. links = append(links,
  127. fmt.Sprintf("%s:%s", name, serviceAlias))
  128. }
  129. links = append(links,
  130. fmt.Sprintf("%s:%s", name, name),
  131. fmt.Sprintf("%s:%s", name, getContainerNameWithoutProject(container)))
  132. }
  133. s.Links = links
  134. }
  135. project.Services[i] = s
  136. }
  137. }
  138. func (c *convergence) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig, recreate string, inherit bool, timeout *time.Duration) error {
  139. expected, err := getScale(service)
  140. if err != nil {
  141. return err
  142. }
  143. containers := c.getObservedState(service.Name)
  144. actual := len(containers)
  145. updated := make(Containers, expected)
  146. eg, _ := errgroup.WithContext(ctx)
  147. for i, container := range containers {
  148. if i > expected {
  149. // Scale Down
  150. container := container
  151. eg.Go(func() error {
  152. err := c.service.apiClient.ContainerStop(ctx, container.ID, timeout)
  153. if err != nil {
  154. return err
  155. }
  156. return c.service.apiClient.ContainerRemove(ctx, container.ID, moby.ContainerRemoveOptions{})
  157. })
  158. continue
  159. }
  160. if recreate == api.RecreateNever {
  161. continue
  162. }
  163. // Re-create diverged containers
  164. configHash, err := ServiceHash(service)
  165. if err != nil {
  166. return err
  167. }
  168. name := getContainerProgressName(container)
  169. diverged := container.Labels[api.ConfigHashLabel] != configHash
  170. if diverged || recreate == api.RecreateForce || service.Extensions[extLifecycle] == forceRecreate {
  171. i, container := i, container
  172. eg.Go(func() error {
  173. recreated, err := c.service.recreateContainer(ctx, project, service, container, inherit, timeout)
  174. updated[i] = recreated
  175. return err
  176. })
  177. continue
  178. }
  179. // Enforce non-diverged containers are running
  180. w := progress.ContextWriter(ctx)
  181. switch container.State {
  182. case ContainerRunning:
  183. w.Event(progress.RunningEvent(name))
  184. case ContainerCreated:
  185. case ContainerRestarting:
  186. case ContainerExited:
  187. w.Event(progress.CreatedEvent(name))
  188. default:
  189. container := container
  190. eg.Go(func() error {
  191. return c.service.startContainer(ctx, container)
  192. })
  193. }
  194. updated[i] = container
  195. }
  196. next, err := nextContainerNumber(containers)
  197. if err != nil {
  198. return err
  199. }
  200. for i := 0; i < expected-actual; i++ {
  201. // Scale UP
  202. number := next + i
  203. name := getContainerName(project.Name, service, number)
  204. i := i
  205. eg.Go(func() error {
  206. container, err := c.service.createContainer(ctx, project, service, name, number, false, true)
  207. updated[actual+i] = container
  208. return err
  209. })
  210. continue
  211. }
  212. err = eg.Wait()
  213. c.setObservedState(service.Name, updated)
  214. return err
  215. }
  216. func getContainerName(projectName string, service types.ServiceConfig, number int) string {
  217. name := fmt.Sprintf("%s_%s_%d", projectName, service.Name, number)
  218. if service.ContainerName != "" {
  219. name = service.ContainerName
  220. }
  221. return name
  222. }
  223. func getContainerProgressName(container moby.Container) string {
  224. return "Container " + getCanonicalContainerName(container)
  225. }
  226. func (s *composeService) waitDependencies(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
  227. eg, _ := errgroup.WithContext(ctx)
  228. for dep, config := range service.DependsOn {
  229. dep, config := dep, config
  230. eg.Go(func() error {
  231. ticker := time.NewTicker(500 * time.Millisecond)
  232. defer ticker.Stop()
  233. for {
  234. <-ticker.C
  235. switch config.Condition {
  236. case types.ServiceConditionHealthy:
  237. healthy, err := s.isServiceHealthy(ctx, project, dep)
  238. if err != nil {
  239. return err
  240. }
  241. if healthy {
  242. return nil
  243. }
  244. case types.ServiceConditionCompletedSuccessfully:
  245. exited, code, err := s.isServiceCompleted(ctx, project, dep)
  246. if err != nil {
  247. return err
  248. }
  249. if exited {
  250. if code != 0 {
  251. return fmt.Errorf("service %q didn't completed successfully: exit %d", dep, code)
  252. }
  253. return nil
  254. }
  255. case types.ServiceConditionStarted:
  256. // already managed by InDependencyOrder
  257. return nil
  258. default:
  259. logrus.Warnf("unsupported depends_on condition: %s", config.Condition)
  260. return nil
  261. }
  262. }
  263. })
  264. }
  265. return eg.Wait()
  266. }
  267. func nextContainerNumber(containers []moby.Container) (int, error) {
  268. max := 0
  269. for _, c := range containers {
  270. n, err := strconv.Atoi(c.Labels[api.ContainerNumberLabel])
  271. if err != nil {
  272. return 0, err
  273. }
  274. if n > max {
  275. max = n
  276. }
  277. }
  278. return max + 1, nil
  279. }
  280. func getScale(config types.ServiceConfig) (int, error) {
  281. scale := 1
  282. var err error
  283. if config.Deploy != nil && config.Deploy.Replicas != nil {
  284. scale = int(*config.Deploy.Replicas)
  285. }
  286. if config.Scale != 0 {
  287. scale = config.Scale
  288. }
  289. if scale > 1 && config.ContainerName != "" {
  290. scale = -1
  291. err = fmt.Errorf(doubledContainerNameWarning,
  292. config.Name,
  293. config.ContainerName)
  294. }
  295. return scale, err
  296. }
  297. func (s *composeService) createContainer(ctx context.Context, project *types.Project, service types.ServiceConfig,
  298. name string, number int, autoRemove bool, useNetworkAliases bool) (container moby.Container, err error) {
  299. w := progress.ContextWriter(ctx)
  300. eventName := "Container " + name
  301. w.Event(progress.CreatingEvent(eventName))
  302. container, err = s.createMobyContainer(ctx, project, service, name, number, nil, autoRemove, useNetworkAliases)
  303. if err != nil {
  304. return
  305. }
  306. w.Event(progress.CreatedEvent(eventName))
  307. return
  308. }
  309. func (s *composeService) recreateContainer(ctx context.Context, project *types.Project, service types.ServiceConfig,
  310. replaced moby.Container, inherit bool, timeout *time.Duration) (moby.Container, error) {
  311. var created moby.Container
  312. w := progress.ContextWriter(ctx)
  313. w.Event(progress.NewEvent(getContainerProgressName(replaced), progress.Working, "Recreate"))
  314. err := s.apiClient.ContainerStop(ctx, replaced.ID, timeout)
  315. if err != nil {
  316. return created, err
  317. }
  318. name := getCanonicalContainerName(replaced)
  319. tmpName := fmt.Sprintf("%s_%s", replaced.ID[:12], name)
  320. err = s.apiClient.ContainerRename(ctx, replaced.ID, tmpName)
  321. if err != nil {
  322. return created, err
  323. }
  324. number, err := strconv.Atoi(replaced.Labels[api.ContainerNumberLabel])
  325. if err != nil {
  326. return created, err
  327. }
  328. var inherited *moby.Container
  329. if inherit {
  330. inherited = &replaced
  331. }
  332. created, err = s.createMobyContainer(ctx, project, service, name, number, inherited, false, true)
  333. if err != nil {
  334. return created, err
  335. }
  336. err = s.apiClient.ContainerRemove(ctx, replaced.ID, moby.ContainerRemoveOptions{})
  337. if err != nil {
  338. return created, err
  339. }
  340. w.Event(progress.NewEvent(getContainerProgressName(replaced), progress.Done, "Recreated"))
  341. setDependentLifecycle(project, service.Name, forceRecreate)
  342. return created, err
  343. }
  344. // setDependentLifecycle define the Lifecycle strategy for all services to depend on specified service
  345. func setDependentLifecycle(project *types.Project, service string, strategy string) {
  346. for i, s := range project.Services {
  347. if utils.StringContains(s.GetDependencies(), service) {
  348. if s.Extensions == nil {
  349. s.Extensions = map[string]interface{}{}
  350. }
  351. s.Extensions[extLifecycle] = strategy
  352. project.Services[i] = s
  353. }
  354. }
  355. }
  356. func (s *composeService) startContainer(ctx context.Context, container moby.Container) error {
  357. w := progress.ContextWriter(ctx)
  358. w.Event(progress.NewEvent(getContainerProgressName(container), progress.Working, "Restart"))
  359. err := s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
  360. if err != nil {
  361. return err
  362. }
  363. w.Event(progress.NewEvent(getContainerProgressName(container), progress.Done, "Restarted"))
  364. return nil
  365. }
  366. func (s *composeService) createMobyContainer(ctx context.Context, project *types.Project, service types.ServiceConfig,
  367. name string, number int, inherit *moby.Container, autoRemove bool, useNetworkAliases bool) (moby.Container, error) {
  368. var created moby.Container
  369. containerConfig, hostConfig, networkingConfig, err := s.getCreateOptions(ctx, project, service, number, inherit, autoRemove)
  370. if err != nil {
  371. return created, err
  372. }
  373. var plat *specs.Platform
  374. if service.Platform != "" {
  375. var p specs.Platform
  376. p, err = platforms.Parse(service.Platform)
  377. if err != nil {
  378. return created, err
  379. }
  380. plat = &p
  381. }
  382. response, err := s.apiClient.ContainerCreate(ctx, containerConfig, hostConfig, networkingConfig, plat, name)
  383. if err != nil {
  384. return created, err
  385. }
  386. inspectedContainer, err := s.apiClient.ContainerInspect(ctx, response.ID)
  387. if err != nil {
  388. return created, err
  389. }
  390. created = moby.Container{
  391. ID: inspectedContainer.ID,
  392. Labels: inspectedContainer.Config.Labels,
  393. Names: []string{inspectedContainer.Name},
  394. NetworkSettings: &moby.SummaryNetworkSettings{
  395. Networks: inspectedContainer.NetworkSettings.Networks,
  396. },
  397. }
  398. links := append(service.Links, service.ExternalLinks...)
  399. for _, netName := range service.NetworksByPriority() {
  400. netwrk := project.Networks[netName]
  401. cfg := service.Networks[netName]
  402. aliases := []string{getContainerName(project.Name, service, number)}
  403. if useNetworkAliases {
  404. aliases = append(aliases, service.Name)
  405. if cfg != nil {
  406. aliases = append(aliases, cfg.Aliases...)
  407. }
  408. }
  409. if val, ok := created.NetworkSettings.Networks[netwrk.Name]; ok {
  410. if shortIDAliasExists(created.ID, val.Aliases...) {
  411. continue
  412. }
  413. err = s.apiClient.NetworkDisconnect(ctx, netwrk.Name, created.ID, false)
  414. if err != nil {
  415. return created, err
  416. }
  417. }
  418. err = s.connectContainerToNetwork(ctx, created.ID, netwrk.Name, cfg, links, aliases...)
  419. if err != nil {
  420. return created, err
  421. }
  422. }
  423. return created, err
  424. }
  425. func shortIDAliasExists(containerID string, aliases ...string) bool {
  426. for _, alias := range aliases {
  427. if alias == containerID[:12] {
  428. return true
  429. }
  430. }
  431. return false
  432. }
  433. func (s *composeService) connectContainerToNetwork(ctx context.Context, id string, netwrk string, cfg *types.ServiceNetworkConfig, links []string, aliases ...string) error {
  434. var (
  435. ipv4Address string
  436. ipv6Address string
  437. ipam *network.EndpointIPAMConfig
  438. )
  439. if cfg != nil {
  440. ipv4Address = cfg.Ipv4Address
  441. ipv6Address = cfg.Ipv6Address
  442. ipam = &network.EndpointIPAMConfig{
  443. IPv4Address: ipv4Address,
  444. IPv6Address: ipv6Address,
  445. }
  446. }
  447. err := s.apiClient.NetworkConnect(ctx, netwrk, id, &network.EndpointSettings{
  448. Aliases: aliases,
  449. IPAddress: ipv4Address,
  450. GlobalIPv6Address: ipv6Address,
  451. Links: links,
  452. IPAMConfig: ipam,
  453. })
  454. if err != nil {
  455. return err
  456. }
  457. return nil
  458. }
  459. func (s *composeService) isServiceHealthy(ctx context.Context, project *types.Project, service string) (bool, error) {
  460. containers, err := s.getContainers(ctx, project.Name, oneOffExclude, false, service)
  461. if err != nil {
  462. return false, err
  463. }
  464. if len(containers) == 0 {
  465. return false, nil
  466. }
  467. for _, c := range containers {
  468. container, err := s.apiClient.ContainerInspect(ctx, c.ID)
  469. if err != nil {
  470. return false, err
  471. }
  472. if container.State == nil || container.State.Health == nil {
  473. return false, fmt.Errorf("container for service %q has no healthcheck configured", service)
  474. }
  475. if container.State.Health.Status != moby.Healthy {
  476. return false, nil
  477. }
  478. }
  479. return true, nil
  480. }
  481. func (s *composeService) isServiceCompleted(ctx context.Context, project *types.Project, dep string) (bool, int, error) {
  482. containers, err := s.getContainers(ctx, project.Name, oneOffExclude, true, dep)
  483. if err != nil {
  484. return false, 0, err
  485. }
  486. for _, c := range containers {
  487. container, err := s.apiClient.ContainerInspect(ctx, c.ID)
  488. if err != nil {
  489. return false, 0, err
  490. }
  491. if container.State != nil && container.State.Status == "exited" {
  492. return true, container.State.ExitCode, nil
  493. }
  494. }
  495. return false, 0, nil
  496. }
  497. func (s *composeService) startService(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
  498. err := s.waitDependencies(ctx, project, service)
  499. if err != nil {
  500. return err
  501. }
  502. containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
  503. Filters: filters.NewArgs(
  504. projectFilter(project.Name),
  505. serviceFilter(service.Name),
  506. oneOffFilter(false),
  507. ),
  508. All: true,
  509. })
  510. if err != nil {
  511. return err
  512. }
  513. if len(containers) == 0 {
  514. return fmt.Errorf("no containers to start")
  515. }
  516. w := progress.ContextWriter(ctx)
  517. eg, ctx := errgroup.WithContext(ctx)
  518. for _, container := range containers {
  519. if container.State == ContainerRunning {
  520. continue
  521. }
  522. container := container
  523. eg.Go(func() error {
  524. eventName := getContainerProgressName(container)
  525. w.Event(progress.StartingEvent(eventName))
  526. err := s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
  527. if err == nil {
  528. w.Event(progress.StartedEvent(eventName))
  529. }
  530. return err
  531. })
  532. }
  533. return eg.Wait()
  534. }