
(refactoring) introduce monitor to manage container events and application termination

Signed-off-by: Nicolas De Loof <[email protected]>
Nicolas De Loof, 6 months ago
parent commit 485b6200ee
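This commit replaces the ad-hoc watchContainers helper (removed from pkg/compose/start.go below) with a dedicated monitor type that subscribes to Engine events for the project and fans them out to registered listeners. For orientation, a minimal sketch of the lifecycle introduced here (illustrative only; newMonitor is unexported, so this assumes code living inside pkg/compose):

    // Sketch: wiring the new monitor, per the API added in pkg/compose/monitor.go.
    func exampleMonitorLifecycle(ctx context.Context, s *composeService, project *types.Project) error {
        monitor := newMonitor(s.apiClient(), project)
        monitor.withListener(func(event api.ContainerEvent) {
            // react to created/started/restarted/exited events here
        })
        // Start blocks until the last tracked container is gone,
        // the context is cancelled, or the event stream fails.
        return monitor.Start(ctx)
    }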

+ 0 - 4
cmd/formatter/logs.go

@@ -56,10 +56,6 @@ func NewLogConsumer(ctx context.Context, stdout, stderr io.Writer, color, prefix
 	}
 }
 
-func (l *logConsumer) Register(name string) {
-	l.register(name)
-}
-
 func (l *logConsumer) register(name string) *presenter {
 	var p *presenter
 	root, _, found := strings.Cut(name, " ")

+ 17 - 14
pkg/api/api.go

@@ -649,7 +649,6 @@ type LogConsumer interface {
 	Log(containerName, message string)
 	Err(containerName, message string)
 	Status(container, msg string)
-	Register(container string)
 }
 
 // ContainerEventListener is a callback to process ContainerEvent from services
@@ -657,16 +656,18 @@ type ContainerEventListener func(event ContainerEvent)
 
 // ContainerEvent notifies that an event has been collected on a source container implementing Service
 type ContainerEvent struct {
-	Type int
-	// Container is the name of the container _without the project prefix_.
+	Type      int
+	Time      int64
+	Container *ContainerSummary
+	// Source is the name of the container _without the project prefix_.
 	//
 	// This is only suitable for display purposes within Compose, as it's
 	// not guaranteed to be unique across services.
-	Container string
-	ID        string
-	Service   string
-	Line      string
-	// ContainerEventExit only
+	Source  string
+	ID      string
+	Service string
+	Line    string
+	// ContainerEventExited only
 	ExitCode   int
 	Restarting bool
 }
@@ -676,17 +677,19 @@ const (
 	ContainerEventLog = iota
 	// ContainerEventErr is a ContainerEvent of type log on stderr. Line is set
 	ContainerEventErr
-	// ContainerEventAttach is a ContainerEvent of type attach. First event sent about a container
-	ContainerEventAttach
+	// ContainerEventStarted lets consumers know a container has been started
+	ContainerEventStarted
+	// ContainerEventRestarted lets consumers know a container has been restarted
+	ContainerEventRestarted
 	// ContainerEventStopped is a ContainerEvent of type stopped.
 	ContainerEventStopped
+	// ContainerEventCreated lets consumers know a new container has been created
+	ContainerEventCreated
 	// ContainerEventRecreated lets consumers know a container stopped but is being replaced
 	ContainerEventRecreated
-	// ContainerEventExit is a ContainerEvent of type exit. ExitCode is set
-	ContainerEventExit
+	// ContainerEventExited is a ContainerEvent of type exit. ExitCode is set
+	ContainerEventExited
-	// UserCancel user cancelled compose up, we are stopping containers
-	UserCancel
 	// HookEventLog is a ContainerEvent of type log on stdout by service hook
 	HookEventLog
 )
 
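To show how consumers are expected to handle the renamed and extended event set, here is a hypothetical listener (a sketch, not part of the commit):

    // Sketch: dispatching on the new ContainerEvent types.
    func exampleListener(event api.ContainerEvent) {
        switch event.Type {
        case api.ContainerEventCreated:
            // a new container joined the application (e.g. via scale)
        case api.ContainerEventStarted:
            // event.Restarting distinguishes a restart from a first start
        case api.ContainerEventExited:
            // event.ExitCode is meaningful here; Restarting means it will come back
        case api.ContainerEventLog, api.ContainerEventErr, api.HookEventLog:
            // event.Line carries one log line; event.Source names the container
        }
    }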

+ 21 - 25
pkg/compose/attach.go

@@ -61,41 +61,37 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, lis
 }
 
 func (s *composeService) attachContainer(ctx context.Context, container containerType.Summary, listener api.ContainerEventListener) error {
-	serviceName := container.Labels[api.ServiceLabel]
-	containerName := getContainerNameWithoutProject(container)
-
-	listener(api.ContainerEvent{
-		Type:      api.ContainerEventAttach,
-		Container: containerName,
-		ID:        container.ID,
-		Service:   serviceName,
-	})
+	service := container.Labels[api.ServiceLabel]
+	name := getContainerNameWithoutProject(container)
+	return s.doAttachContainer(ctx, service, container.ID, name, listener)
+}
+
+func (s *composeService) doAttachContainer(ctx context.Context, service, id, name string, listener api.ContainerEventListener) error {
+	inspect, err := s.apiClient().ContainerInspect(ctx, id)
+	if err != nil {
+		return err
+	}
 
 	wOut := utils.GetWriter(func(line string) {
 		listener(api.ContainerEvent{
-			Type:      api.ContainerEventLog,
-			Container: containerName,
-			ID:        container.ID,
-			Service:   serviceName,
-			Line:      line,
+			Type:    api.ContainerEventLog,
+			Source:  name,
+			ID:      id,
+			Service: service,
+			Line:    line,
 		})
 	})
 	wErr := utils.GetWriter(func(line string) {
 		listener(api.ContainerEvent{
-			Type:      api.ContainerEventErr,
-			Container: containerName,
-			ID:        container.ID,
-			Service:   serviceName,
-			Line:      line,
+			Type:    api.ContainerEventErr,
+			Source:  name,
+			ID:      id,
+			Service: service,
+			Line:    line,
 		})
 	})
 
-	inspect, err := s.apiClient().ContainerInspect(ctx, container.ID)
-	if err != nil {
-		return err
-	}
-
-	_, _, err = s.attachContainerStreams(ctx, container.ID, inspect.Config.Tty, nil, wOut, wErr)
+	_, _, err = s.attachContainerStreams(ctx, id, inspect.Config.Tty, nil, wOut, wErr)
 	return err
 }
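Both attach paths turn the raw container stream into per-line ContainerEvents through utils.GetWriter. A simplified sketch of such a line-splitting writer (illustrative; the real helper lives in pkg/utils and also strips carriage returns):

    package utilsketch

    import "bytes"

    // lineWriter invokes callback once per complete line written to it,
    // in the spirit of utils.GetWriter.
    type lineWriter struct {
        buf      bytes.Buffer
        callback func(line string)
    }

    func (w *lineWriter) Write(p []byte) (int, error) {
        w.buf.Write(p)
        for {
            line, err := w.buf.ReadString('\n')
            if err != nil {
                // incomplete line: keep it buffered for the next Write
                w.buf.WriteString(line)
                break
            }
            w.callback(line[:len(line)-1]) // drop the trailing "\n"
        }
        return len(p), nil
    }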
 

+ 0 - 6
pkg/compose/containers.go

@@ -128,12 +128,6 @@ func isService(services ...string) containerPredicate {
 	}
 }
 
-func isRunning() containerPredicate {
-	return func(c container.Summary) bool {
-		return c.State == "running"
-	}
-}
-
 // isOrphaned is a predicate to select containers without a matching service definition in compose project
 func isOrphaned(project *types.Project) containerPredicate {
 	services := append(project.ServiceNames(), project.DisabledServiceNames()...)

+ 5 - 5
pkg/compose/hook.go

@@ -32,11 +32,11 @@ import (
 func (s composeService) runHook(ctx context.Context, ctr container.Summary, service types.ServiceConfig, hook types.ServiceHook, listener api.ContainerEventListener) error {
 	wOut := utils.GetWriter(func(line string) {
 		listener(api.ContainerEvent{
-			Type:      api.HookEventLog,
-			Container: getContainerNameWithoutProject(ctr) + " ->",
-			ID:        ctr.ID,
-			Service:   service.Name,
-			Line:      line,
+			Type:    api.HookEventLog,
+			Source:  getContainerNameWithoutProject(ctr) + " ->",
+			ID:      ctr.ID,
+			Service: service.Name,
+			Line:    line,
 		})
 	})
 	defer wOut.Close() //nolint:errcheck

+ 25 - 42
pkg/compose/logs.go

@@ -62,7 +62,7 @@ func (s *composeService) Logs(
 	eg, ctx := errgroup.WithContext(ctx)
 	for _, ctr := range containers {
 		eg.Go(func() error {
-			err := s.logContainers(ctx, consumer, ctr, options)
+			err := s.logContainer(ctx, consumer, ctr, options)
 			if errdefs.IsNotImplemented(err) {
 				logrus.Warnf("Can't retrieve logs for %q: %s", getCanonicalContainerName(ctr), err.Error())
 				return nil
@@ -72,34 +72,21 @@ func (s *composeService) Logs(
 	}
 
 	if options.Follow {
-		containers = containers.filter(isRunning())
 		printer := newLogPrinter(consumer)
-		eg.Go(func() error {
-			_, err := printer.Run(api.CascadeIgnore, "", nil)
-			return err
-		})
-
-		for _, c := range containers {
-			printer.HandleEvent(api.ContainerEvent{
-				Type:      api.ContainerEventAttach,
-				Container: getContainerNameWithoutProject(c),
-				ID:        c.ID,
-				Service:   c.Labels[api.ServiceLabel],
-			})
-		}
+		eg.Go(printer.Run)
 
-		eg.Go(func() error {
-			err := s.watchContainers(ctx, projectName, options.Services, nil, printer.HandleEvent, containers, func(c container.Summary, t time.Time) error {
-				printer.HandleEvent(api.ContainerEvent{
-					Type:      api.ContainerEventAttach,
-					Container: getContainerNameWithoutProject(c),
-					ID:        c.ID,
-					Service:   c.Labels[api.ServiceLabel],
-				})
+		monitor := newMonitor(s.apiClient(), options.Project)
+		monitor.withListener(func(event api.ContainerEvent) {
+			if event.Type == api.ContainerEventStarted {
 				eg.Go(func() error {
-					err := s.logContainers(ctx, consumer, c, api.LogOptions{
+					ctr, err := s.apiClient().ContainerInspect(ctx, event.ID)
+					if err != nil {
+						return err
+					}
+
+					err = s.doLogContainer(ctx, consumer, event.Source, ctr, api.LogOptions{
 						Follow:     options.Follow,
-						Since:      t.Format(time.RFC3339Nano),
+						Since:      time.Unix(0, event.Time).Format(time.RFC3339Nano),
 						Until:      options.Until,
 						Tail:       options.Tail,
 						Timestamps: options.Timestamps,
@@ -110,31 +97,28 @@ func (s *composeService) Logs(
 					}
 					return err
 				})
-				return nil
-			}, func(c container.Summary, t time.Time) error {
-				printer.HandleEvent(api.ContainerEvent{
-					Type:      api.ContainerEventAttach,
-					Container: "", // actual name will be set by start event
-					ID:        c.ID,
-					Service:   c.Labels[api.ServiceLabel],
-				})
-				return nil
-			})
-			printer.Stop()
-			return err
+			}
+		})
+		eg.Go(func() error {
+			defer printer.Stop()
+			return monitor.Start(ctx)
 		})
 	}
 
 	return eg.Wait()
 }
 
-func (s *composeService) logContainers(ctx context.Context, consumer api.LogConsumer, c container.Summary, options api.LogOptions) error {
-	cnt, err := s.apiClient().ContainerInspect(ctx, c.ID)
+func (s *composeService) logContainer(ctx context.Context, consumer api.LogConsumer, c container.Summary, options api.LogOptions) error {
+	ctr, err := s.apiClient().ContainerInspect(ctx, c.ID)
 	if err != nil {
 		return err
 	}
+	name := getContainerNameWithoutProject(c)
+	return s.doLogContainer(ctx, consumer, name, ctr, options)
+}
 
-	r, err := s.apiClient().ContainerLogs(ctx, cnt.ID, container.LogsOptions{
+func (s *composeService) doLogContainer(ctx context.Context, consumer api.LogConsumer, name string, ctr container.InspectResponse, options api.LogOptions) error {
+	r, err := s.apiClient().ContainerLogs(ctx, ctr.ID, container.LogsOptions{
 		ShowStdout: true,
 		ShowStderr: true,
 		Follow:     options.Follow,
@@ -148,11 +132,10 @@ func (s *composeService) logContainers(ctx context.Context, consumer api.LogCons
 	}
 	defer r.Close() //nolint:errcheck
 
-	name := getContainerNameWithoutProject(c)
 	w := utils.GetWriter(func(line string) {
 		consumer.Log(name, line)
 	})
-	if cnt.Config.Tty {
+	if ctr.Config.Tty {
 		_, err = io.Copy(w, r)
 	} else {
 		_, err = stdcopy.StdCopy(w, w, r)

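The follow path now resumes logs from the monitor's event timestamp, which is in nanoseconds (Engine events carry TimeNano). The conversion used above, shown in isolation (sketch; assumes the time package is imported):

    // sinceFromEvent converts the monitor's nanosecond timestamp into the
    // RFC3339Nano form the Engine API accepts as a "since" log filter.
    func sinceFromEvent(timeNano int64) string {
        return time.Unix(0, timeNano).Format(time.RFC3339Nano)
    }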
+ 0 - 2
pkg/compose/logs_test.go

@@ -189,8 +189,6 @@ func (l *testLogConsumer) Err(containerName, message string) {
 
 func (l *testLogConsumer) Status(containerName, msg string) {}
 
-func (l *testLogConsumer) Register(containerName string) {}
-
 func (l *testLogConsumer) LogsForContainer(containerName string) []string {
 	l.mu.Lock()
 	defer l.mu.Unlock()

+ 211 - 0
pkg/compose/monitor.go

@@ -0,0 +1,211 @@
+/*
+   Copyright 2020 Docker Compose CLI authors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package compose
+
+import (
+	"context"
+	"strconv"
+
+	"github.com/compose-spec/compose-go/v2/types"
+	"github.com/containerd/errdefs"
+	"github.com/docker/docker/api/types/container"
+	"github.com/docker/docker/api/types/events"
+	"github.com/docker/docker/api/types/filters"
+	"github.com/docker/docker/client"
+	"github.com/sirupsen/logrus"
+
+	"github.com/docker/compose/v2/pkg/api"
+	"github.com/docker/compose/v2/pkg/utils"
+)
+
+type monitor struct {
+	api     client.APIClient
+	project *types.Project
+	// services tells us which services to consider and which to ignore, as they may be run by a concurrent compose command
+	services  map[string]bool
+	listeners []api.ContainerEventListener
+}
+
+func newMonitor(api client.APIClient, project *types.Project) *monitor {
+	services := map[string]bool{}
+	if project != nil {
+		for name := range project.Services {
+			services[name] = true
+		}
+	}
+	return &monitor{
+		api:      api,
+		project:  project,
+		services: services,
+	}
+}
+
+// Start runs the monitor to detect application events and returns after termination
+//
+//nolint:gocyclo
+func (c *monitor) Start(ctx context.Context) error {
+	// collect the application's initial containers
+	initialState, err := c.api.ContainerList(ctx, container.ListOptions{
+		All: true,
+		Filters: filters.NewArgs(
+			projectFilter(c.project.Name),
+			oneOffFilter(false),
+			hasConfigHashLabel(),
+		),
+	})
+	if err != nil {
+		return err
+	}
+
+	// containers is the set of container IDs the application is based on
+	containers := utils.Set[string]{}
+	for _, ctr := range initialState {
+		if len(c.services) == 0 || c.services[ctr.Labels[api.ServiceLabel]] {
+			containers.Add(ctr.ID)
+		}
+	}
+
+	restarting := utils.Set[string]{}
+
+	evtCh, errCh := c.api.Events(ctx, events.ListOptions{
+		Filters: filters.NewArgs(
+			filters.Arg("type", "container"),
+			projectFilter(c.project.Name)),
+	})
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case err := <-errCh:
+			return err
+		case event := <-evtCh:
+			if !c.services[event.Actor.Attributes[api.ServiceLabel]] {
+				continue
+			}
+			ctr, err := c.getContainerSummary(event)
+			if err != nil {
+				return err
+			}
+
+			switch event.Action {
+			case events.ActionCreate:
+				containers.Add(ctr.ID)
+				for _, listener := range c.listeners {
+					listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventCreated))
+				}
+				logrus.Debugf("container %s created", ctr.Name)
+			case events.ActionStart:
+				restarted := restarting.Has(ctr.ID)
+				for _, listener := range c.listeners {
+					listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted, func(e *api.ContainerEvent) {
+						e.Restarting = restarted
+					}))
+				}
+				if restarted {
+					logrus.Debugf("container %s restarted", ctr.Name)
+				} else {
+					logrus.Debugf("container %s started", ctr.Name)
+				}
+				containers.Add(ctr.ID)
+			case events.ActionRestart:
+				for _, listener := range c.listeners {
+					listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventRestarted))
+				}
+				logrus.Debugf("container %s restarted", ctr.Name)
+			case events.ActionStop:
+				// when a container is in its restarting phase and we stop the application (abort-on-container-exit),
+				// we won't get any additional start+die events, just this stop as proof the container is down
+				logrus.Debugf("container %s stopped", ctr.Name)
+				containers.Remove(ctr.ID)
+			case events.ActionDie:
+				logrus.Debugf("container %s exited with code %d", ctr.Name, ctr.ExitCode)
+				inspect, err := c.api.ContainerInspect(ctx, event.Actor.ID)
+				if errdefs.IsNotFound(err) {
+					// container is already removed
+				} else if err != nil {
+					return err
+				}
+
+				if inspect.State != nil && (inspect.State.Restarting || inspect.State.Running) {
+					// State.Restarting is set by the engine when a container is configured to restart on exit;
+					// on ContainerRestart it isn't (see https://github.com/moby/moby/issues/45538) and the
+					// container state is still reported as "running"
+					logrus.Debugf("container %s is restarting", ctr.Name)
+					restarting.Add(ctr.ID)
+					for _, listener := range c.listeners {
+						listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventExited, func(e *api.ContainerEvent) {
+							e.Restarting = true
+						}))
+					}
+				} else {
+					for _, listener := range c.listeners {
+						listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventExited))
+					}
+					containers.Remove(ctr.ID)
+				}
+			}
+		}
+		if len(containers) == 0 {
+			return nil
+		}
+	}
+}
+
+func newContainerEvent(timeNano int64, ctr *api.ContainerSummary, eventType int, opts ...func(e *api.ContainerEvent)) api.ContainerEvent {
+	name := ctr.Name
+	defaultName := getDefaultContainerName(ctr.Project, ctr.Labels[api.ServiceLabel], ctr.Labels[api.ContainerNumberLabel])
+	if name == defaultName {
+		// remove project- prefix
+		name = name[len(ctr.Project)+1:]
+	}
+
+	event := api.ContainerEvent{
+		Type:      eventType,
+		Container: ctr,
+		Time:      timeNano,
+		Source:    name,
+		ID:        ctr.ID,
+		Service:   ctr.Service,
+		ExitCode:  ctr.ExitCode,
+	}
+	for _, opt := range opts {
+		opt(&event)
+	}
+	return event
+}
+
+func (c *monitor) getContainerSummary(event events.Message) (*api.ContainerSummary, error) {
+	ctr := &api.ContainerSummary{
+		ID:      event.Actor.ID,
+		Name:    event.Actor.Attributes["name"],
+		Project: c.project.Name,
+		Service: event.Actor.Attributes[api.ServiceLabel],
+		Labels:  event.Actor.Attributes, // More than just labels, but that's the closest the API gives us
+	}
+	if ec, ok := event.Actor.Attributes["exitCode"]; ok {
+		exitCode, err := strconv.Atoi(ec)
+		if err != nil {
+			return nil, err
+		}
+		ctr.ExitCode = exitCode
+	}
+	return ctr, nil
+}
+
+func (c *monitor) withListener(listener api.ContainerEventListener) {
+	c.listeners = append(c.listeners, listener)
+}
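Start's termination rule is easiest to see in isolation: seed a set with the project's container IDs, add on create/start, remove on stop or a non-restarting die, and return once the set drains. A distilled, self-contained sketch (hypothetical helper, not in the commit):

    package monitorsketch

    // lifecycleDelta mirrors the effect of one engine event on the tracked set.
    type lifecycleDelta struct {
        id   string
        gone bool // stopped, or died without a pending restart
    }

    // trackUntilDrained distills monitor.Start's termination rule:
    // apply lifecycle deltas and return once no container is left.
    func trackUntilDrained(initial []string, deltas <-chan lifecycleDelta) {
        tracked := map[string]struct{}{}
        for _, id := range initial {
            tracked[id] = struct{}{}
        }
        for d := range deltas {
            if d.gone {
                delete(tracked, d.id)
            } else {
                tracked[d.id] = struct{}{}
            }
            if len(tracked) == 0 {
                return // last application container is gone
            }
        }
    }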

+ 9 - 72
pkg/compose/printer.go

@@ -26,8 +26,7 @@ import (
 // logPrinter watches application containers and collects their logs
 type logPrinter interface {
 	HandleEvent(event api.ContainerEvent)
-	Run(cascade api.Cascade, exitCodeFrom string, stopFn func() error) (int, error)
-	Cancel()
+	Run() error
 	Stop()
 }
 
@@ -49,11 +48,6 @@ func newLogPrinter(consumer api.LogConsumer) logPrinter {
 	return &printer
 }
 
-func (p *printer) Cancel() {
-	// note: HandleEvent is used to ensure this doesn't deadlock
-	p.HandleEvent(api.ContainerEvent{Type: api.UserCancel})
-}
-
 func (p *printer) Stop() {
 	p.stop.Do(func() {
 		close(p.stopCh)
@@ -78,82 +72,25 @@ func (p *printer) HandleEvent(event api.ContainerEvent) {
 	}
 }
 
-//nolint:gocyclo
-func (p *printer) Run(cascade api.Cascade, exitCodeFrom string, stopFn func() error) (int, error) {
-	var (
-		aborting bool
-		exitCode int
-	)
+func (p *printer) Run() error {
 	defer p.Stop()
 
-	// containers we are tracking. Use true when container is running, false after we receive a stop|die signal
-	containers := map[string]bool{}
 	for {
 		select {
 		case <-p.stopCh:
-			return exitCode, nil
+			return nil
 		case event := <-p.queue:
-			container, id := event.Container, event.ID
 			switch event.Type {
-			case api.UserCancel:
-				aborting = true
-			case api.ContainerEventAttach:
-				if attached, ok := containers[id]; ok && attached {
-					continue
-				}
-				containers[id] = true
-				p.consumer.Register(container)
-			case api.ContainerEventExit, api.ContainerEventStopped, api.ContainerEventRecreated:
-				if !aborting && containers[id] {
-					p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode))
-					if event.Type == api.ContainerEventRecreated {
-						p.consumer.Status(container, "has been recreated")
-					}
-				}
-				containers[id] = false
-				if !event.Restarting {
-					delete(containers, id)
-				}
-
-				if cascade == api.CascadeStop {
-					if !aborting {
-						aborting = true
-						err := stopFn()
-						if err != nil {
-							return 0, err
-						}
-					}
-				}
-				if event.Type == api.ContainerEventExit {
-					if cascade == api.CascadeFail && event.ExitCode != 0 {
-						exitCodeFrom = event.Service
-						if !aborting {
-							aborting = true
-							err := stopFn()
-							if err != nil {
-								return 0, err
-							}
-						}
-					}
-					if cascade == api.CascadeStop && exitCodeFrom == "" {
-						exitCodeFrom = event.Service
-					}
-				}
-
-				if exitCodeFrom == event.Service && (event.Type == api.ContainerEventExit || event.Type == api.ContainerEventStopped) {
-					// Container was interrupted or exited, let's capture exit code
-					exitCode = event.ExitCode
-				}
-				if len(containers) == 0 {
-					// Last container terminated, done
-					return exitCode, nil
+			case api.ContainerEventExited, api.ContainerEventStopped, api.ContainerEventRecreated, api.ContainerEventRestarted:
+				p.consumer.Status(event.Source, fmt.Sprintf("exited with code %d", event.ExitCode))
+				if event.Type == api.ContainerEventRecreated {
+					p.consumer.Status(event.Source, "has been recreated")
 				}
 			case api.ContainerEventLog, api.HookEventLog:
-				p.consumer.Log(container, event.Line)
+				p.consumer.Log(event.Source, event.Line)
 			case api.ContainerEventErr:
-				if !aborting {
-					p.consumer.Err(container, event.Line)
-				}
+				p.consumer.Err(event.Source, event.Line)
 			}
 		}
 	}

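With cancellation and exit-code policy moved to the monitor, the printer is reduced to a pump from the event queue to the LogConsumer. Typical wiring, as done in logs.go above and up.go below (fragment; assumes an errgroup eg and a monitor in scope):

    // Sketch: the simplified printer contract after this commit.
    printer := newLogPrinter(consumer)
    eg.Go(printer.Run)                        // pump events until Stop is called
    monitor.withListener(printer.HandleEvent) // feed it from the monitor
    defer printer.Stop()                      // idempotent; safe from multiple paths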
+ 0 - 225
pkg/compose/start.go

@@ -20,14 +20,10 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"slices"
 	"strings"
-	"time"
 
-	cerrdefs "github.com/containerd/errdefs"
 	"github.com/docker/compose/v2/pkg/api"
 	"github.com/docker/compose/v2/pkg/progress"
-	"github.com/docker/compose/v2/pkg/utils"
 	containerType "github.com/docker/docker/api/types/container"
 
 	"github.com/compose-spec/compose-go/v2/types"
@@ -66,48 +62,6 @@ func (s *composeService) start(ctx context.Context, projectName string, options
 		if err != nil {
 			return err
 		}
-
-		eg.Go(func() error {
-			// it's possible to have a required service whose log output is not desired
-			// (i.e. it's not in the attach set), so watch everything and then filter
-			// calls to attach; this ensures that `watchContainers` blocks until all
-			// required containers have exited, even if their output is not being shown
-			attachTo := utils.NewSet[string](options.AttachTo...)
-			required := utils.NewSet[string](options.Services...)
-			toWatch := attachTo.Union(required).Elements()
-
-			containers, err := s.getContainers(ctx, projectName, oneOffExclude, true, toWatch...)
-			if err != nil {
-				return err
-			}
-
-			// N.B. this uses the parent context (instead of attachCtx) so that the watch itself can
-			// continue even if one of the log streams fails
-			return s.watchContainers(ctx, project.Name, toWatch, required.Elements(), listener, containers,
-				func(ctr containerType.Summary, _ time.Time) error {
-					svc := ctr.Labels[api.ServiceLabel]
-					if attachTo.Has(svc) {
-						return s.attachContainer(attachCtx, ctr, listener)
-					}
-
-					// HACK: simulate an "attach" event
-					listener(api.ContainerEvent{
-						Type:      api.ContainerEventAttach,
-						Container: getContainerNameWithoutProject(ctr),
-						ID:        ctr.ID,
-						Service:   svc,
-					})
-					return nil
-				}, func(ctr containerType.Summary, _ time.Time) error {
-					listener(api.ContainerEvent{
-						Type:      api.ContainerEventAttach,
-						Container: "", // actual name will be set by start event
-						ID:        ctr.ID,
-						Service:   ctr.Labels[api.ServiceLabel],
-					})
-					return nil
-				})
-		})
 	}
 
 	var containers Containers
@@ -173,182 +127,3 @@ func getDependencyCondition(service types.ServiceConfig, project *types.Project)
 	}
 	return ServiceConditionRunningOrHealthy
 }
-
-type containerWatchFn func(ctr containerType.Summary, t time.Time) error
-
-// watchContainers uses engine events to capture container start/die and notify ContainerEventListener
-func (s *composeService) watchContainers(ctx context.Context, //nolint:gocyclo
-	projectName string, services, required []string,
-	listener api.ContainerEventListener, containers Containers, onStart, onRecreate containerWatchFn,
-) error {
-	if len(containers) == 0 {
-		return nil
-	}
-	if len(required) == 0 {
-		required = services
-	}
-
-	unexpected := utils.NewSet[string](required...).Diff(utils.NewSet[string](services...))
-	if len(unexpected) != 0 {
-		return fmt.Errorf(`required service(s) "%s" not present in watched service(s) "%s"`,
-			strings.Join(unexpected.Elements(), ", "),
-			strings.Join(services, ", "))
-	}
-
-	// predicate to tell if a container we receive event for should be considered or ignored
-	ofInterest := func(c containerType.Summary) bool {
-		if len(services) > 0 {
-			// we only watch some services
-			return slices.Contains(services, c.Labels[api.ServiceLabel])
-		}
-		return true
-	}
-
-	// predicate to tell if a container we receive event for should be watched until termination
-	isRequired := func(c containerType.Summary) bool {
-		if len(services) > 0 && len(required) > 0 {
-			// we only watch some services
-			return slices.Contains(required, c.Labels[api.ServiceLabel])
-		}
-		return true
-	}
-
-	var (
-		expected = utils.NewSet[string]()
-		watched  = map[string]int{}
-		replaced []string
-	)
-	for _, c := range containers {
-		if isRequired(c) {
-			expected.Add(c.ID)
-		}
-		watched[c.ID] = 0
-	}
-
-	ctx, stop := context.WithCancel(ctx)
-	err := s.Events(ctx, projectName, api.EventsOptions{
-		Services: services,
-		Consumer: func(event api.Event) error {
-			defer func() {
-				// after consuming each event, check to see if we're done
-				if len(expected) == 0 {
-					stop()
-				}
-			}()
-			inspected, err := s.apiClient().ContainerInspect(ctx, event.Container)
-			if err != nil {
-				if cerrdefs.IsNotFound(err) {
-					// it's possible to get "destroy" or "kill" events but not
-					// be able to inspect in time before they're gone from the
-					// API, so just remove the watch without erroring
-					delete(watched, event.Container)
-					expected.Remove(event.Container)
-					return nil
-				}
-				return err
-			}
-			container := containerType.Summary{
-				ID:     inspected.ID,
-				Names:  []string{inspected.Name},
-				Labels: inspected.Config.Labels,
-			}
-			name := getContainerNameWithoutProject(container)
-			service := container.Labels[api.ServiceLabel]
-			switch event.Status {
-			case "stop":
-				if inspected.State.Running {
-					// on sync+restart action the container stops -> dies -> start -> restart
-					// we do not want to stop the current container, we want to restart it
-					return nil
-				}
-				if _, ok := watched[container.ID]; ok {
-					eType := api.ContainerEventStopped
-					if slices.Contains(replaced, container.ID) {
-						replaced = slices.DeleteFunc(replaced, func(e string) bool { return e == container.ID })
-						eType = api.ContainerEventRecreated
-					}
-					listener(api.ContainerEvent{
-						Type:      eType,
-						Container: name,
-						ID:        container.ID,
-						Service:   service,
-						ExitCode:  inspected.State.ExitCode,
-					})
-				}
-
-				delete(watched, container.ID)
-				expected.Remove(container.ID)
-			case "die":
-				restarted := watched[container.ID]
-				watched[container.ID] = restarted + 1
-				// Container terminated.
-				willRestart := inspected.State.Restarting
-				if inspected.State.Running {
-					// on sync+restart action inspected.State.Restarting is false,
-					// however the container is already running before it restarts
-					willRestart = true
-				}
-
-				eType := api.ContainerEventExit
-				if slices.Contains(replaced, container.ID) {
-					replaced = slices.DeleteFunc(replaced, func(e string) bool { return e == container.ID })
-					eType = api.ContainerEventRecreated
-				}
-
-				listener(api.ContainerEvent{
-					Type:       eType,
-					Container:  name,
-					ID:         container.ID,
-					Service:    service,
-					ExitCode:   inspected.State.ExitCode,
-					Restarting: willRestart,
-				})
-
-				if !willRestart {
-					// we're done with this one
-					delete(watched, container.ID)
-					expected.Remove(container.ID)
-				}
-			case "start":
-				count, ok := watched[container.ID]
-				mustAttach := ok && count > 0 // Container restarted, need to re-attach
-				if !ok {
-					// A new container has just been added to service by scale
-					watched[container.ID] = 0
-					expected.Add(container.ID)
-					mustAttach = true
-				}
-				if mustAttach {
-					// Container restarted, need to re-attach
-					err := onStart(container, event.Timestamp)
-					if err != nil {
-						return err
-					}
-				}
-			case "create":
-				if id, ok := container.Labels[api.ContainerReplaceLabel]; ok {
-					replaced = append(replaced, id)
-					err = onRecreate(container, event.Timestamp)
-					if err != nil {
-						return err
-					}
-					if expected.Has(id) {
-						expected.Add(inspected.ID)
-						expected.Add(container.ID)
-					}
-					watched[container.ID] = 1
-				} else if ofInterest(container) {
-					watched[container.ID] = 1
-					if isRequired(container) {
-						expected.Add(container.ID)
-					}
-				}
-			}
-			return nil
-		},
-	})
-	if errors.Is(ctx.Err(), context.Canceled) {
-		return nil
-	}
-	return err
-}

+ 63 - 23
pkg/compose/up.go

@@ -159,24 +159,6 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
 		}
 	})
 
-	var exitCode int
-	eg.Go(func() error {
-		code, err := printer.Run(options.Start.OnExit, options.Start.ExitCodeFrom, func() error {
-			_, _ = fmt.Fprintln(s.stdinfo(), "Aborting on container exit...")
-			eg.Go(func() error {
-				return progress.RunWithLog(context.WithoutCancel(ctx), func(ctx context.Context) error {
-					return s.stop(ctx, project.Name, api.StopOptions{
-						Services: options.Create.Services,
-						Project:  project,
-					}, printer.HandleEvent)
-				}, s.stdinfo(), logConsumer)
-			})
-			return nil
-		})
-		exitCode = code
-		return err
-	})
-
 	if options.Start.Watch && watcher != nil {
 		err = watcher.Start(ctx)
 		if err != nil {
@@ -184,17 +166,75 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
 		}
 	}
 
+	monitor := newMonitor(s.apiClient(), project)
+	monitor.withListener(printer.HandleEvent)
+
+	var exitCode int
+	if options.Start.OnExit != api.CascadeIgnore {
+		once := true
+		// detect first container to exit to trigger application shutdown
+		monitor.withListener(func(event api.ContainerEvent) {
+			if once && event.Type == api.ContainerEventExited {
+				exitCode = event.ExitCode
+				printer.Stop()
+				_, _ = fmt.Fprintln(s.stdinfo(), "Aborting on container exit...")
+				eg.Go(func() error {
+					return progress.RunWithLog(context.WithoutCancel(ctx), func(ctx context.Context) error {
+						return s.stop(ctx, project.Name, api.StopOptions{
+							Services: options.Create.Services,
+							Project:  project,
+						}, printer.HandleEvent)
+					}, s.stdinfo(), logConsumer)
+				})
+				once = false
+			}
+		})
+	}
+
+	if options.Start.ExitCodeFrom != "" {
+		once := true
+		// capture exit code from first container to exit with selected service
+		monitor.withListener(func(event api.ContainerEvent) {
+			if once && event.Type == api.ContainerEventExited && event.Service == options.Start.ExitCodeFrom {
+				exitCode = event.ExitCode
+				once = false
+			}
+		})
+	}
+
+	monitor.withListener(func(event api.ContainerEvent) {
+		mustAttach := false
+		switch event.Type {
+		case api.ContainerEventCreated:
+			// A container has been added to the application (scale)
+			mustAttach = true
+		case api.ContainerEventStarted:
+			// A container is restarting - need to re-attach
+			mustAttach = event.Restarting
+		}
+		if mustAttach {
+			eg.Go(func() error {
+				// FIXME as container already started, we might miss the very first logs
+				return s.doAttachContainer(ctx, event.Service, event.ID, event.Source, printer.HandleEvent)
+			})
+		}
+	})
+
+	eg.Go(func() error {
+		err := monitor.Start(ctx)
+		// Signal for the signal-handler goroutines to stop
+		close(doneCh)
+		printer.Stop()
+		return err
+	})
+
 	// We use the parent context without cancellation as we manage sigterm to stop the stack
 	err = s.start(context.WithoutCancel(ctx), project.Name, options.Start, printer.HandleEvent)
 	if err != nil && !isTerminated.Load() { // Ignore error if the process is terminated
 		return err
 	}
 
-	// Signal for the signal-handler goroutines to stop
-	close(doneCh)
-
-	printer.Stop()
-
 	err = eg.Wait().ErrorOrNil()
 	if exitCode != 0 {
 		errMsg := ""

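The once-only listeners above rely on the monitor invoking listeners sequentially from its single event loop, so a plain boolean flag is safe. If listeners ever ran concurrently, sync.Once would be the equivalent guard (illustrative fragment; assumes the same exitCode and monitor scope as up.go):

    // Sketch: a concurrency-safe variant of the "fire once" listener.
    var once sync.Once
    monitor.withListener(func(event api.ContainerEvent) {
        if event.Type == api.ContainerEventExited {
            once.Do(func() { exitCode = event.ExitCode })
        }
    })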
+ 0 - 1
pkg/compose/watch.go

@@ -192,7 +192,6 @@ func (s *composeService) watch(ctx context.Context, project *types.Project, opti
 		return nil, err
 	}
 	eg, ctx := errgroup.WithContext(ctx)
-	options.LogTo.Register(api.WatchLogger)
 
 	var (
 		rules []watchRule

+ 0 - 3
pkg/compose/watch_test.go

@@ -71,9 +71,6 @@ func (s stdLogger) Status(containerName, msg string) {
 	fmt.Printf("%s: %s\n", containerName, msg)
 }
 
-func (s stdLogger) Register(containerName string) {
-}
-
 func TestWatch_Sync(t *testing.T) {
 	mockCtrl := gomock.NewController(t)
 	cli := mocks.NewMockCli(mockCtrl)

+ 1 - 1
pkg/prompt/prompt_mock.go

@@ -1,5 +1,5 @@
 // Code generated by MockGen. DO NOT EDIT.
-// Container: github.com/docker/compose-cli/pkg/prompt (interfaces: UI)
+// Source: github.com/docker/compose-cli/pkg/prompt (interfaces: UI)
 
 // Package prompt is a generated GoMock package.
 package prompt