convergence.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "fmt"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "time"
  21. "github.com/compose-spec/compose-go/types"
  22. "github.com/containerd/containerd/platforms"
  23. moby "github.com/docker/docker/api/types"
  24. "github.com/docker/docker/api/types/filters"
  25. "github.com/docker/docker/api/types/network"
  26. specs "github.com/opencontainers/image-spec/specs-go/v1"
  27. "github.com/sirupsen/logrus"
  28. "golang.org/x/sync/errgroup"
  29. "github.com/docker/compose-cli/pkg/api"
  30. "github.com/docker/compose-cli/pkg/progress"
  31. "github.com/docker/compose-cli/pkg/utils"
  32. )
  33. const (
  34. extLifecycle = "x-lifecycle"
  35. forceRecreate = "force_recreate"
  36. doubledContainerNameWarning = "WARNING: The %q service is using the custom container name %q. " +
  37. "Docker requires each container to have a unique name. " +
  38. "Remove the custom name to scale the service.\n"
  39. )
  40. // convergence manages service's container lifecycle.
  41. // Based on initially observed state, it reconciles the existing container with desired state, which might include
  42. // re-creating container, adding or removing replicas, or starting stopped containers.
  43. // Cross services dependencies are managed by creating services in expected order and updating `service:xx` reference
  44. // when a service has converged, so dependent ones can be managed with resolved containers references.
  45. type convergence struct {
  46. service *composeService
  47. observedState map[string]Containers
  48. }
  49. func newConvergence(services []string, state Containers, s *composeService) *convergence {
  50. observedState := map[string]Containers{}
  51. for _, s := range services {
  52. observedState[s] = Containers{}
  53. }
  54. for _, c := range state.filter(isNotOneOff) {
  55. service := c.Labels[api.ServiceLabel]
  56. observedState[service] = append(observedState[service], c)
  57. }
  58. return &convergence{
  59. service: s,
  60. observedState: observedState,
  61. }
  62. }
  63. func (c *convergence) apply(ctx context.Context, project *types.Project, options api.CreateOptions) error {
  64. return InDependencyOrder(ctx, project, func(ctx context.Context, name string) error {
  65. service, err := project.GetService(name)
  66. if err != nil {
  67. return err
  68. }
  69. strategy := options.RecreateDependencies
  70. if utils.StringContains(options.Services, name) {
  71. strategy = options.Recreate
  72. }
  73. err = c.ensureService(ctx, project, service, strategy, options.Inherit, options.Timeout)
  74. if err != nil {
  75. return err
  76. }
  77. c.updateProject(project, name)
  78. return nil
  79. })
  80. }
  81. var mu sync.Mutex
  82. // updateProject updates project after service converged, so dependent services relying on `service:xx` can refer to actual containers.
  83. func (c *convergence) updateProject(project *types.Project, service string) {
  84. containers := c.observedState[service]
  85. container := containers[0]
  86. // operation is protected by a Mutex so that we can safely update project.Services while running concurrent convergence on services
  87. mu.Lock()
  88. defer mu.Unlock()
  89. for i, s := range project.Services {
  90. if d := getDependentServiceFromMode(s.NetworkMode); d == service {
  91. s.NetworkMode = types.NetworkModeContainerPrefix + container.ID
  92. }
  93. if d := getDependentServiceFromMode(s.Ipc); d == service {
  94. s.Ipc = types.NetworkModeContainerPrefix + container.ID
  95. }
  96. if d := getDependentServiceFromMode(s.Pid); d == service {
  97. s.Pid = types.NetworkModeContainerPrefix + container.ID
  98. }
  99. var links []string
  100. for _, serviceLink := range s.Links {
  101. parts := strings.Split(serviceLink, ":")
  102. serviceName := serviceLink
  103. serviceAlias := ""
  104. if len(parts) == 2 {
  105. serviceName = parts[0]
  106. serviceAlias = parts[1]
  107. }
  108. if serviceName != service {
  109. links = append(links, serviceLink)
  110. continue
  111. }
  112. for _, container := range containers {
  113. name := getCanonicalContainerName(container)
  114. if serviceAlias != "" {
  115. links = append(links,
  116. fmt.Sprintf("%s:%s", name, serviceAlias))
  117. }
  118. links = append(links,
  119. fmt.Sprintf("%s:%s", name, name),
  120. fmt.Sprintf("%s:%s", name, getContainerNameWithoutProject(container)))
  121. }
  122. s.Links = links
  123. }
  124. project.Services[i] = s
  125. }
  126. }
  127. func (c *convergence) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig, recreate string, inherit bool, timeout *time.Duration) error {
  128. expected, err := getScale(service)
  129. if err != nil {
  130. return err
  131. }
  132. containers := c.observedState[service.Name]
  133. actual := len(containers)
  134. updated := make(Containers, expected)
  135. eg, _ := errgroup.WithContext(ctx)
  136. for i, container := range containers {
  137. if i > expected {
  138. // Scale Down
  139. eg.Go(func() error {
  140. err := c.service.apiClient.ContainerStop(ctx, container.ID, timeout)
  141. if err != nil {
  142. return err
  143. }
  144. return c.service.apiClient.ContainerRemove(ctx, container.ID, moby.ContainerRemoveOptions{})
  145. })
  146. continue
  147. }
  148. if recreate == api.RecreateNever {
  149. continue
  150. }
  151. // Re-create diverged containers
  152. configHash, err := ServiceHash(service)
  153. if err != nil {
  154. return err
  155. }
  156. name := getContainerProgressName(container)
  157. diverged := container.Labels[api.ConfigHashLabel] != configHash
  158. if diverged || recreate == api.RecreateForce || service.Extensions[extLifecycle] == forceRecreate {
  159. i := i
  160. eg.Go(func() error {
  161. recreated, err := c.service.recreateContainer(ctx, project, service, container, inherit, timeout)
  162. updated[i] = recreated
  163. return err
  164. })
  165. continue
  166. }
  167. // Enforce non-diverged containers are running
  168. w := progress.ContextWriter(ctx)
  169. switch container.State {
  170. case ContainerRunning:
  171. w.Event(progress.RunningEvent(name))
  172. case ContainerCreated:
  173. case ContainerRestarting:
  174. case ContainerExited:
  175. w.Event(progress.CreatedEvent(name))
  176. default:
  177. eg.Go(func() error {
  178. return c.service.startContainer(ctx, container)
  179. })
  180. }
  181. updated[i] = container
  182. }
  183. next, err := nextContainerNumber(containers)
  184. if err != nil {
  185. return err
  186. }
  187. for i := 0; i < expected-actual; i++ {
  188. // Scale UP
  189. number := next + i
  190. name := getContainerName(project.Name, service, number)
  191. eg.Go(func() error {
  192. container, err := c.service.createContainer(ctx, project, service, name, number, false, true)
  193. updated[actual+i-1] = container
  194. return err
  195. })
  196. continue
  197. }
  198. err = eg.Wait()
  199. c.observedState[service.Name] = updated
  200. return err
  201. }
  202. func getContainerName(projectName string, service types.ServiceConfig, number int) string {
  203. name := fmt.Sprintf("%s_%s_%d", projectName, service.Name, number)
  204. if service.ContainerName != "" {
  205. name = service.ContainerName
  206. }
  207. return name
  208. }
  209. func getContainerProgressName(container moby.Container) string {
  210. return "Container " + getCanonicalContainerName(container)
  211. }
  212. func (s *composeService) waitDependencies(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
  213. eg, _ := errgroup.WithContext(ctx)
  214. for dep, config := range service.DependsOn {
  215. dep, config := dep, config
  216. eg.Go(func() error {
  217. ticker := time.NewTicker(500 * time.Millisecond)
  218. defer ticker.Stop()
  219. for {
  220. <-ticker.C
  221. switch config.Condition {
  222. case types.ServiceConditionHealthy:
  223. healthy, err := s.isServiceHealthy(ctx, project, dep)
  224. if err != nil {
  225. return err
  226. }
  227. if healthy {
  228. return nil
  229. }
  230. case types.ServiceConditionCompletedSuccessfully:
  231. exited, code, err := s.isServiceCompleted(ctx, project, dep)
  232. if err != nil {
  233. return err
  234. }
  235. if exited {
  236. if code != 0 {
  237. return fmt.Errorf("service %q didn't completed successfully: exit %d", dep, code)
  238. }
  239. return nil
  240. }
  241. case types.ServiceConditionStarted:
  242. // already managed by InDependencyOrder
  243. return nil
  244. default:
  245. logrus.Warnf("unsupported depends_on condition: %s", config.Condition)
  246. return nil
  247. }
  248. }
  249. })
  250. }
  251. return eg.Wait()
  252. }
  253. func nextContainerNumber(containers []moby.Container) (int, error) {
  254. max := 0
  255. for _, c := range containers {
  256. n, err := strconv.Atoi(c.Labels[api.ContainerNumberLabel])
  257. if err != nil {
  258. return 0, err
  259. }
  260. if n > max {
  261. max = n
  262. }
  263. }
  264. return max + 1, nil
  265. }
  266. func getScale(config types.ServiceConfig) (int, error) {
  267. scale := 1
  268. var err error
  269. if config.Deploy != nil && config.Deploy.Replicas != nil {
  270. scale = int(*config.Deploy.Replicas)
  271. }
  272. if config.Scale != 0 {
  273. scale = config.Scale
  274. }
  275. if scale > 1 && config.ContainerName != "" {
  276. scale = -1
  277. err = fmt.Errorf(doubledContainerNameWarning,
  278. config.Name,
  279. config.ContainerName)
  280. }
  281. return scale, err
  282. }
  283. func (s *composeService) createContainer(ctx context.Context, project *types.Project, service types.ServiceConfig,
  284. name string, number int, autoRemove bool, useNetworkAliases bool) (container moby.Container, err error) {
  285. w := progress.ContextWriter(ctx)
  286. eventName := "Container " + name
  287. w.Event(progress.CreatingEvent(eventName))
  288. container, err = s.createMobyContainer(ctx, project, service, name, number, nil, autoRemove, useNetworkAliases)
  289. if err != nil {
  290. return
  291. }
  292. w.Event(progress.CreatedEvent(eventName))
  293. return
  294. }
  295. func (s *composeService) recreateContainer(ctx context.Context, project *types.Project, service types.ServiceConfig,
  296. replaced moby.Container, inherit bool, timeout *time.Duration) (moby.Container, error) {
  297. var created moby.Container
  298. w := progress.ContextWriter(ctx)
  299. w.Event(progress.NewEvent(getContainerProgressName(replaced), progress.Working, "Recreate"))
  300. err := s.apiClient.ContainerStop(ctx, replaced.ID, timeout)
  301. if err != nil {
  302. return created, err
  303. }
  304. name := getCanonicalContainerName(replaced)
  305. tmpName := fmt.Sprintf("%s_%s", replaced.ID[:12], name)
  306. err = s.apiClient.ContainerRename(ctx, replaced.ID, tmpName)
  307. if err != nil {
  308. return created, err
  309. }
  310. number, err := strconv.Atoi(replaced.Labels[api.ContainerNumberLabel])
  311. if err != nil {
  312. return created, err
  313. }
  314. var inherited *moby.Container
  315. if inherit {
  316. inherited = &replaced
  317. }
  318. created, err = s.createMobyContainer(ctx, project, service, name, number, inherited, false, true)
  319. if err != nil {
  320. return created, err
  321. }
  322. err = s.apiClient.ContainerRemove(ctx, replaced.ID, moby.ContainerRemoveOptions{})
  323. if err != nil {
  324. return created, err
  325. }
  326. w.Event(progress.NewEvent(getContainerProgressName(replaced), progress.Done, "Recreated"))
  327. setDependentLifecycle(project, service.Name, forceRecreate)
  328. return created, err
  329. }
  330. // setDependentLifecycle define the Lifecycle strategy for all services to depend on specified service
  331. func setDependentLifecycle(project *types.Project, service string, strategy string) {
  332. for i, s := range project.Services {
  333. if utils.StringContains(s.GetDependencies(), service) {
  334. if s.Extensions == nil {
  335. s.Extensions = map[string]interface{}{}
  336. }
  337. s.Extensions[extLifecycle] = strategy
  338. project.Services[i] = s
  339. }
  340. }
  341. }
  342. func (s *composeService) startContainer(ctx context.Context, container moby.Container) error {
  343. w := progress.ContextWriter(ctx)
  344. w.Event(progress.NewEvent(getContainerProgressName(container), progress.Working, "Restart"))
  345. err := s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
  346. if err != nil {
  347. return err
  348. }
  349. w.Event(progress.NewEvent(getContainerProgressName(container), progress.Done, "Restarted"))
  350. return nil
  351. }
  352. func (s *composeService) createMobyContainer(ctx context.Context, project *types.Project, service types.ServiceConfig,
  353. name string, number int, inherit *moby.Container, autoRemove bool, useNetworkAliases bool) (moby.Container, error) {
  354. var created moby.Container
  355. containerConfig, hostConfig, networkingConfig, err := s.getCreateOptions(ctx, project, service, number, inherit, autoRemove)
  356. if err != nil {
  357. return created, err
  358. }
  359. var plat *specs.Platform
  360. if service.Platform != "" {
  361. var p specs.Platform
  362. p, err = platforms.Parse(service.Platform)
  363. if err != nil {
  364. return created, err
  365. }
  366. plat = &p
  367. }
  368. response, err := s.apiClient.ContainerCreate(ctx, containerConfig, hostConfig, networkingConfig, plat, name)
  369. if err != nil {
  370. return created, err
  371. }
  372. inspectedContainer, err := s.apiClient.ContainerInspect(ctx, response.ID)
  373. if err != nil {
  374. return created, err
  375. }
  376. created = moby.Container{
  377. ID: inspectedContainer.ID,
  378. Labels: inspectedContainer.Config.Labels,
  379. Names: []string{inspectedContainer.Name},
  380. NetworkSettings: &moby.SummaryNetworkSettings{
  381. Networks: inspectedContainer.NetworkSettings.Networks,
  382. },
  383. }
  384. links := append(service.Links, service.ExternalLinks...)
  385. for _, netName := range service.NetworksByPriority() {
  386. netwrk := project.Networks[netName]
  387. cfg := service.Networks[netName]
  388. aliases := []string{getContainerName(project.Name, service, number)}
  389. if useNetworkAliases {
  390. aliases = append(aliases, service.Name)
  391. if cfg != nil {
  392. aliases = append(aliases, cfg.Aliases...)
  393. }
  394. }
  395. if val, ok := created.NetworkSettings.Networks[netwrk.Name]; ok {
  396. if shortIDAliasExists(created.ID, val.Aliases...) {
  397. continue
  398. }
  399. err = s.apiClient.NetworkDisconnect(ctx, netwrk.Name, created.ID, false)
  400. if err != nil {
  401. return created, err
  402. }
  403. }
  404. err = s.connectContainerToNetwork(ctx, created.ID, netwrk.Name, cfg, links, aliases...)
  405. if err != nil {
  406. return created, err
  407. }
  408. }
  409. return created, err
  410. }
  411. func shortIDAliasExists(containerID string, aliases ...string) bool {
  412. for _, alias := range aliases {
  413. if alias == containerID[:12] {
  414. return true
  415. }
  416. }
  417. return false
  418. }
  419. func (s *composeService) connectContainerToNetwork(ctx context.Context, id string, netwrk string, cfg *types.ServiceNetworkConfig, links []string, aliases ...string) error {
  420. var (
  421. ipv4Address string
  422. ipv6Address string
  423. ipam *network.EndpointIPAMConfig
  424. )
  425. if cfg != nil {
  426. ipv4Address = cfg.Ipv4Address
  427. ipv6Address = cfg.Ipv6Address
  428. ipam = &network.EndpointIPAMConfig{
  429. IPv4Address: ipv4Address,
  430. IPv6Address: ipv6Address,
  431. }
  432. }
  433. err := s.apiClient.NetworkConnect(ctx, netwrk, id, &network.EndpointSettings{
  434. Aliases: aliases,
  435. IPAddress: ipv4Address,
  436. GlobalIPv6Address: ipv6Address,
  437. Links: links,
  438. IPAMConfig: ipam,
  439. })
  440. if err != nil {
  441. return err
  442. }
  443. return nil
  444. }
  445. func (s *composeService) isServiceHealthy(ctx context.Context, project *types.Project, service string) (bool, error) {
  446. containers, err := s.getContainers(ctx, project.Name, oneOffExclude, false, service)
  447. if err != nil {
  448. return false, err
  449. }
  450. if len(containers) == 0 {
  451. return false, nil
  452. }
  453. for _, c := range containers {
  454. container, err := s.apiClient.ContainerInspect(ctx, c.ID)
  455. if err != nil {
  456. return false, err
  457. }
  458. if container.State == nil || container.State.Health == nil {
  459. return false, fmt.Errorf("container for service %q has no healthcheck configured", service)
  460. }
  461. if container.State.Health.Status != moby.Healthy {
  462. return false, nil
  463. }
  464. }
  465. return true, nil
  466. }
  467. func (s *composeService) isServiceCompleted(ctx context.Context, project *types.Project, dep string) (bool, int, error) {
  468. containers, err := s.getContainers(ctx, project.Name, oneOffExclude, true, dep)
  469. if err != nil {
  470. return false, 0, err
  471. }
  472. for _, c := range containers {
  473. container, err := s.apiClient.ContainerInspect(ctx, c.ID)
  474. if err != nil {
  475. return false, 0, err
  476. }
  477. if container.State != nil && container.State.Status == "exited" {
  478. return true, container.State.ExitCode, nil
  479. }
  480. }
  481. return false, 0, nil
  482. }
  483. func (s *composeService) startService(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
  484. err := s.waitDependencies(ctx, project, service)
  485. if err != nil {
  486. return err
  487. }
  488. containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
  489. Filters: filters.NewArgs(
  490. projectFilter(project.Name),
  491. serviceFilter(service.Name),
  492. oneOffFilter(false),
  493. ),
  494. All: true,
  495. })
  496. if err != nil {
  497. return err
  498. }
  499. w := progress.ContextWriter(ctx)
  500. eg, ctx := errgroup.WithContext(ctx)
  501. for _, c := range containers {
  502. container := c
  503. if container.State == ContainerRunning {
  504. continue
  505. }
  506. eg.Go(func() error {
  507. eventName := getContainerProgressName(container)
  508. w.Event(progress.StartingEvent(eventName))
  509. err := s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
  510. if err == nil {
  511. w.Event(progress.StartedEvent(eventName))
  512. }
  513. return err
  514. })
  515. }
  516. return eg.Wait()
  517. }