Преглед на файлове

detect replacement container is created and inform printer so it attach and don't stop

Signed-off-by: Nicolas De Loof <[email protected]>
Nicolas De Loof преди 2 години
родител
ревизия
0f5b5ccbd0
променени са 10 файла, в които са добавени 96 реда и са изтрити 42 реда
  1. 1 1
      cmd/compose/up.go
  2. 3 0
      pkg/api/api.go
  3. 2 0
      pkg/api/labels.go
  4. 3 0
      pkg/compose/attach.go
  5. 0 11
      pkg/compose/containers.go
  6. 17 12
      pkg/compose/convergence.go
  7. 10 0
      pkg/compose/logs.go
  8. 8 5
      pkg/compose/printer.go
  9. 51 12
      pkg/compose/start.go
  10. 1 1
      pkg/utils/slices.go

+ 1 - 1
cmd/compose/up.go

@@ -171,7 +171,7 @@ func runUp(ctx context.Context, streams api.Streams, backend api.Service, create
 	if len(attachTo) == 0 {
 	if len(attachTo) == 0 {
 		attachTo = project.ServiceNames()
 		attachTo = project.ServiceNames()
 	}
 	}
-	attachTo = utils.RemoveAll(attachTo, upOptions.noAttach)
+	attachTo = utils.Remove(attachTo, upOptions.noAttach...)
 
 
 	create := api.CreateOptions{
 	create := api.CreateOptions{
 		Services:             services,
 		Services:             services,

+ 3 - 0
pkg/api/api.go

@@ -468,6 +468,7 @@ type ContainerEvent struct {
 	// This is only suitable for display purposes within Compose, as it's
 	// This is only suitable for display purposes within Compose, as it's
 	// not guaranteed to be unique across services.
 	// not guaranteed to be unique across services.
 	Container string
 	Container string
+	ID        string
 	Service   string
 	Service   string
 	Line      string
 	Line      string
 	// ContainerEventExit only
 	// ContainerEventExit only
@@ -484,6 +485,8 @@ const (
 	ContainerEventAttach
 	ContainerEventAttach
 	// ContainerEventStopped is a ContainerEvent of type stopped.
 	// ContainerEventStopped is a ContainerEvent of type stopped.
 	ContainerEventStopped
 	ContainerEventStopped
+	// ContainerEventRecreated let consumer know container stopped but his being replaced
+	ContainerEventRecreated
 	// ContainerEventExit is a ContainerEvent of type exit. ExitCode is set
 	// ContainerEventExit is a ContainerEvent of type exit. ExitCode is set
 	ContainerEventExit
 	ContainerEventExit
 	// UserCancel user cancelled compose up, we are stopping containers
 	// UserCancel user cancelled compose up, we are stopping containers

+ 2 - 0
pkg/api/labels.go

@@ -55,6 +55,8 @@ const (
 	VersionLabel = "com.docker.compose.version"
 	VersionLabel = "com.docker.compose.version"
 	// ImageBuilderLabel stores the builder (classic or BuildKit) used to produce the image.
 	// ImageBuilderLabel stores the builder (classic or BuildKit) used to produce the image.
 	ImageBuilderLabel = "com.docker.compose.image.builder"
 	ImageBuilderLabel = "com.docker.compose.image.builder"
+	// ContainerReplaceLabel is set when container is created to replace another container (recreated)
+	ContainerReplaceLabel = "com.docker.compose.replace"
 )
 )
 
 
 // ComposeVersion is the compose tool version as declared by label VersionLabel
 // ComposeVersion is the compose tool version as declared by label VersionLabel

+ 3 - 0
pkg/compose/attach.go

@@ -66,6 +66,7 @@ func (s *composeService) attachContainer(ctx context.Context, container moby.Con
 	listener(api.ContainerEvent{
 	listener(api.ContainerEvent{
 		Type:      api.ContainerEventAttach,
 		Type:      api.ContainerEventAttach,
 		Container: containerName,
 		Container: containerName,
+		ID:        container.ID,
 		Service:   serviceName,
 		Service:   serviceName,
 	})
 	})
 
 
@@ -73,6 +74,7 @@ func (s *composeService) attachContainer(ctx context.Context, container moby.Con
 		listener(api.ContainerEvent{
 		listener(api.ContainerEvent{
 			Type:      api.ContainerEventLog,
 			Type:      api.ContainerEventLog,
 			Container: containerName,
 			Container: containerName,
+			ID:        container.ID,
 			Service:   serviceName,
 			Service:   serviceName,
 			Line:      line,
 			Line:      line,
 		})
 		})
@@ -81,6 +83,7 @@ func (s *composeService) attachContainer(ctx context.Context, container moby.Con
 		listener(api.ContainerEvent{
 		listener(api.ContainerEvent{
 			Type:      api.ContainerEventErr,
 			Type:      api.ContainerEventErr,
 			Container: containerName,
 			Container: containerName,
+			ID:        container.ID,
 			Service:   serviceName,
 			Service:   serviceName,
 			Line:      line,
 			Line:      line,
 		})
 		})

+ 0 - 11
pkg/compose/containers.go

@@ -148,14 +148,3 @@ func (containers Containers) sorted() Containers {
 	})
 	})
 	return containers
 	return containers
 }
 }
-
-func (containers Containers) remove(id string) Containers {
-	for i, c := range containers {
-		if c.ID == id {
-			l := len(containers) - 1
-			containers[i] = containers[l]
-			return containers[:l]
-		}
-	}
-	return containers
-}

+ 17 - 12
pkg/compose/convergence.go

@@ -416,35 +416,40 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
 	var created moby.Container
 	var created moby.Container
 	w := progress.ContextWriter(ctx)
 	w := progress.ContextWriter(ctx)
 	w.Event(progress.NewEvent(getContainerProgressName(replaced), progress.Working, "Recreate"))
 	w.Event(progress.NewEvent(getContainerProgressName(replaced), progress.Working, "Recreate"))
-	timeoutInSecond := utils.DurationSecondToInt(timeout)
-	err := s.apiClient().ContainerStop(ctx, replaced.ID, containerType.StopOptions{Timeout: timeoutInSecond})
+
+	number, err := strconv.Atoi(replaced.Labels[api.ContainerNumberLabel])
 	if err != nil {
 	if err != nil {
 		return created, err
 		return created, err
 	}
 	}
-	name := getCanonicalContainerName(replaced)
+
+	var inherited *moby.Container
+	if inherit {
+		inherited = &replaced
+	}
+	name := getContainerName(project.Name, service, number)
 	tmpName := fmt.Sprintf("%s_%s", replaced.ID[:12], name)
 	tmpName := fmt.Sprintf("%s_%s", replaced.ID[:12], name)
-	err = s.apiClient().ContainerRename(ctx, replaced.ID, tmpName)
+	service.Labels[api.ContainerReplaceLabel] = replaced.ID
+	created, err = s.createMobyContainer(ctx, project, service, tmpName, number, inherited, false, true, false, w)
 	if err != nil {
 	if err != nil {
 		return created, err
 		return created, err
 	}
 	}
-	number, err := strconv.Atoi(replaced.Labels[api.ContainerNumberLabel])
+
+	timeoutInSecond := utils.DurationSecondToInt(timeout)
+	err = s.apiClient().ContainerStop(ctx, replaced.ID, containerType.StopOptions{Timeout: timeoutInSecond})
 	if err != nil {
 	if err != nil {
 		return created, err
 		return created, err
 	}
 	}
 
 
-	var inherited *moby.Container
-	if inherit {
-		inherited = &replaced
-	}
-	name = getContainerName(project.Name, service, number)
-	created, err = s.createMobyContainer(ctx, project, service, name, number, inherited, false, true, false, w)
+	err = s.apiClient().ContainerRemove(ctx, replaced.ID, moby.ContainerRemoveOptions{})
 	if err != nil {
 	if err != nil {
 		return created, err
 		return created, err
 	}
 	}
-	err = s.apiClient().ContainerRemove(ctx, replaced.ID, moby.ContainerRemoveOptions{})
+
+	err = s.apiClient().ContainerRename(ctx, created.ID, name)
 	if err != nil {
 	if err != nil {
 		return created, err
 		return created, err
 	}
 	}
+
 	w.Event(progress.NewEvent(getContainerProgressName(replaced), progress.Done, "Recreated"))
 	w.Event(progress.NewEvent(getContainerProgressName(replaced), progress.Done, "Recreated"))
 	setDependentLifecycle(project, service.Name, forceRecreate)
 	setDependentLifecycle(project, service.Name, forceRecreate)
 	return created, err
 	return created, err

+ 10 - 0
pkg/compose/logs.go

@@ -83,6 +83,7 @@ func (s *composeService) Logs(
 			printer.HandleEvent(api.ContainerEvent{
 			printer.HandleEvent(api.ContainerEvent{
 				Type:      api.ContainerEventAttach,
 				Type:      api.ContainerEventAttach,
 				Container: getContainerNameWithoutProject(c),
 				Container: getContainerNameWithoutProject(c),
+				ID:        c.ID,
 				Service:   c.Labels[api.ServiceLabel],
 				Service:   c.Labels[api.ServiceLabel],
 			})
 			})
 		}
 		}
@@ -92,6 +93,7 @@ func (s *composeService) Logs(
 				printer.HandleEvent(api.ContainerEvent{
 				printer.HandleEvent(api.ContainerEvent{
 					Type:      api.ContainerEventAttach,
 					Type:      api.ContainerEventAttach,
 					Container: getContainerNameWithoutProject(c),
 					Container: getContainerNameWithoutProject(c),
+					ID:        c.ID,
 					Service:   c.Labels[api.ServiceLabel],
 					Service:   c.Labels[api.ServiceLabel],
 				})
 				})
 				err := s.logContainers(ctx, consumer, c, api.LogOptions{
 				err := s.logContainers(ctx, consumer, c, api.LogOptions{
@@ -106,6 +108,14 @@ func (s *composeService) Logs(
 					return nil
 					return nil
 				}
 				}
 				return err
 				return err
+			}, func(c types.Container, t time.Time) error {
+				printer.HandleEvent(api.ContainerEvent{
+					Type:      api.ContainerEventAttach,
+					Container: "", // actual name will be set by start event
+					ID:        c.ID,
+					Service:   c.Labels[api.ServiceLabel],
+				})
+				return nil
 			})
 			})
 			printer.Stop()
 			printer.Stop()
 			return err
 			return err

+ 8 - 5
pkg/compose/printer.go

@@ -74,22 +74,25 @@ func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error
 		case <-p.stopCh:
 		case <-p.stopCh:
 			return exitCode, nil
 			return exitCode, nil
 		case event := <-p.queue:
 		case event := <-p.queue:
-			container := event.Container
+			container, id := event.Container, event.ID
 			switch event.Type {
 			switch event.Type {
 			case api.UserCancel:
 			case api.UserCancel:
 				aborting = true
 				aborting = true
 			case api.ContainerEventAttach:
 			case api.ContainerEventAttach:
-				if _, ok := containers[container]; ok {
+				if _, ok := containers[id]; ok {
 					continue
 					continue
 				}
 				}
-				containers[container] = struct{}{}
+				containers[id] = struct{}{}
 				p.consumer.Register(container)
 				p.consumer.Register(container)
-			case api.ContainerEventExit, api.ContainerEventStopped:
+			case api.ContainerEventExit, api.ContainerEventStopped, api.ContainerEventRecreated:
 				if !event.Restarting {
 				if !event.Restarting {
-					delete(containers, container)
+					delete(containers, id)
 				}
 				}
 				if !aborting {
 				if !aborting {
 					p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode))
 					p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode))
+					if event.Type == api.ContainerEventRecreated {
+						p.consumer.Status(container, "has been recreated")
+					}
 				}
 				}
 				if cascadeStop {
 				if cascadeStop {
 					if !aborting {
 					if !aborting {

+ 51 - 12
pkg/compose/start.go

@@ -63,6 +63,14 @@ func (s *composeService) start(ctx context.Context, projectName string, options
 			return s.watchContainers(context.Background(), project.Name, options.AttachTo, options.Services, listener, attached,
 			return s.watchContainers(context.Background(), project.Name, options.AttachTo, options.Services, listener, attached,
 				func(container moby.Container, _ time.Time) error {
 				func(container moby.Container, _ time.Time) error {
 					return s.attachContainer(ctx, container, listener)
 					return s.attachContainer(ctx, container, listener)
+				}, func(container moby.Container, _ time.Time) error {
+					listener(api.ContainerEvent{
+						Type:      api.ContainerEventAttach,
+						Container: "", // actual name will be set by start event
+						ID:        container.ID,
+						Service:   container.Labels[api.ServiceLabel],
+					})
+					return nil
 				})
 				})
 		})
 		})
 	}
 	}
@@ -114,7 +122,7 @@ type containerWatchFn func(container moby.Container, t time.Time) error
 // watchContainers uses engine events to capture container start/die and notify ContainerEventListener
 // watchContainers uses engine events to capture container start/die and notify ContainerEventListener
 func (s *composeService) watchContainers(ctx context.Context, //nolint:gocyclo
 func (s *composeService) watchContainers(ctx context.Context, //nolint:gocyclo
 	projectName string, services, required []string,
 	projectName string, services, required []string,
-	listener api.ContainerEventListener, containers Containers, onStart containerWatchFn) error {
+	listener api.ContainerEventListener, containers Containers, onStart, onRecreate containerWatchFn) error {
 	if len(containers) == 0 {
 	if len(containers) == 0 {
 		return nil
 		return nil
 	}
 	}
@@ -123,12 +131,13 @@ func (s *composeService) watchContainers(ctx context.Context, //nolint:gocyclo
 	}
 	}
 
 
 	var (
 	var (
-		expected Containers
+		expected []string
 		watched  = map[string]int{}
 		watched  = map[string]int{}
+		replaced []string
 	)
 	)
 	for _, c := range containers {
 	for _, c := range containers {
 		if utils.Contains(required, c.Labels[api.ServiceLabel]) {
 		if utils.Contains(required, c.Labels[api.ServiceLabel]) {
-			expected = append(expected, c)
+			expected = append(expected, c.ID)
 		}
 		}
 		watched[c.ID] = 0
 		watched[c.ID] = 0
 	}
 	}
@@ -157,23 +166,38 @@ func (s *composeService) watchContainers(ctx context.Context, //nolint:gocyclo
 			service := container.Labels[api.ServiceLabel]
 			service := container.Labels[api.ServiceLabel]
 			switch event.Status {
 			switch event.Status {
 			case "stop":
 			case "stop":
-				listener(api.ContainerEvent{
-					Type:      api.ContainerEventStopped,
-					Container: name,
-					Service:   service,
-				})
+				if _, ok := watched[container.ID]; ok {
+					eType := api.ContainerEventStopped
+					if utils.Contains(replaced, container.ID) {
+						utils.Remove(replaced, container.ID)
+						eType = api.ContainerEventRecreated
+					}
+					listener(api.ContainerEvent{
+						Type:      eType,
+						Container: name,
+						ID:        container.ID,
+						Service:   service,
+					})
+				}
 
 
 				delete(watched, container.ID)
 				delete(watched, container.ID)
-				expected = expected.remove(container.ID)
+				expected = utils.Remove(expected, container.ID)
 			case "die":
 			case "die":
 				restarted := watched[container.ID]
 				restarted := watched[container.ID]
 				watched[container.ID] = restarted + 1
 				watched[container.ID] = restarted + 1
 				// Container terminated.
 				// Container terminated.
 				willRestart := inspected.State.Restarting
 				willRestart := inspected.State.Restarting
 
 
+				eType := api.ContainerEventExit
+				if utils.Contains(replaced, container.ID) {
+					utils.Remove(replaced, container.ID)
+					eType = api.ContainerEventRecreated
+				}
+
 				listener(api.ContainerEvent{
 				listener(api.ContainerEvent{
-					Type:       api.ContainerEventExit,
+					Type:       eType,
 					Container:  name,
 					Container:  name,
+					ID:         container.ID,
 					Service:    service,
 					Service:    service,
 					ExitCode:   inspected.State.ExitCode,
 					ExitCode:   inspected.State.ExitCode,
 					Restarting: willRestart,
 					Restarting: willRestart,
@@ -182,7 +206,7 @@ func (s *composeService) watchContainers(ctx context.Context, //nolint:gocyclo
 				if !willRestart {
 				if !willRestart {
 					// we're done with this one
 					// we're done with this one
 					delete(watched, container.ID)
 					delete(watched, container.ID)
-					expected = expected.remove(container.ID)
+					expected = utils.Remove(expected, container.ID)
 				}
 				}
 			case "start":
 			case "start":
 				count, ok := watched[container.ID]
 				count, ok := watched[container.ID]
@@ -190,7 +214,7 @@ func (s *composeService) watchContainers(ctx context.Context, //nolint:gocyclo
 				if !ok {
 				if !ok {
 					// A new container has just been added to service by scale
 					// A new container has just been added to service by scale
 					watched[container.ID] = 0
 					watched[container.ID] = 0
-					expected = append(expected, container)
+					expected = append(expected, container.ID)
 					mustAttach = true
 					mustAttach = true
 				}
 				}
 				if mustAttach {
 				if mustAttach {
@@ -200,6 +224,21 @@ func (s *composeService) watchContainers(ctx context.Context, //nolint:gocyclo
 						return err
 						return err
 					}
 					}
 				}
 				}
+			case "create":
+				if id, ok := container.Labels[api.ContainerReplaceLabel]; ok {
+					replaced = append(replaced, id)
+					err = onRecreate(container, event.Timestamp)
+					if err != nil {
+						return err
+					}
+					if utils.StringContains(expected, id) {
+						expected = append(expected, inspected.ID)
+					}
+					watched[container.ID] = 1
+					if utils.Contains(expected, id) {
+						expected = append(expected, container.ID)
+					}
+				}
 			}
 			}
 			if len(expected) == 0 {
 			if len(expected) == 0 {
 				stop()
 				stop()

+ 1 - 1
pkg/utils/slices.go

@@ -30,7 +30,7 @@ func Contains[T any](origin []T, element T) bool {
 }
 }
 
 
 // RemoveAll removes all elements from origin slice
 // RemoveAll removes all elements from origin slice
-func RemoveAll[T any](origin []T, elements []T) []T {
+func Remove[T any](origin []T, elements ...T) []T {
 	var filtered []T
 	var filtered []T
 	for _, v := range origin {
 	for _, v := range origin {
 		if !Contains(elements, v) {
 		if !Contains(elements, v) {