浏览代码

Merge pull request #1263 from docker/exitCodeFrom

Nicolas De loof 4 年之前
父节点
当前提交
5ddcc84a3d

+ 1 - 1
aci/compose.go

@@ -60,7 +60,7 @@ func (cs *aciComposeService) Create(ctx context.Context, project *types.Project,
 	return errdefs.ErrNotImplemented
 }
 
-func (cs *aciComposeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
+func (cs *aciComposeService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error {
 	return errdefs.ErrNotImplemented
 }
 

+ 1 - 1
api/client/compose.go

@@ -44,7 +44,7 @@ func (c *composeService) Create(ctx context.Context, project *types.Project, opt
 	return errdefs.ErrNotImplemented
 }
 
-func (c *composeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
+func (c *composeService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error {
 	return errdefs.ErrNotImplemented
 }
 

+ 27 - 1
api/compose/api.go

@@ -34,7 +34,7 @@ type Service interface {
 	// Create executes the equivalent to a `compose create`
 	Create(ctx context.Context, project *types.Project, opts CreateOptions) error
 	// Start executes the equivalent to a `compose start`
-	Start(ctx context.Context, project *types.Project, consumer LogConsumer) error
+	Start(ctx context.Context, project *types.Project, options StartOptions) error
 	// Stop executes the equivalent to a `compose stop`
 	Stop(ctx context.Context, project *types.Project) error
 	// Up executes the equivalent to a `compose up`
@@ -63,6 +63,12 @@ type CreateOptions struct {
 	Recreate string
 }
 
+// StartOptions group options of the Start API
+type StartOptions struct {
+	// Attach will attach to service containers and send container logs and events
+	Attach ContainerEventListener
+}
+
 // UpOptions group options of the Up API
 type UpOptions struct {
 	// Detach will create services and return immediately
@@ -177,4 +183,24 @@ type Stack struct {
 // LogConsumer is a callback to process log messages from services
 type LogConsumer interface {
 	Log(service, container, message string)
+	Status(service, container, msg string)
+}
+
+// ContainerEventListener is a callback to process ContainerEvent from services
+type ContainerEventListener func(event ContainerEvent)
+
+// ContainerEvent notify an event has been collected on Source container implementing Service
+type ContainerEvent struct {
+	Type     int
+	Source   string
+	Service  string
+	Line     string
+	ExitCode int
 }
+
+const (
+	// ContainerEventLog is a ContainerEvent of type log. Line is set
+	ContainerEventLog = iota
+	// ContainerEventExit is a ContainerEvent of type exit. ExitCode is set
+	ContainerEventExit
+)

+ 1 - 1
cli/cmd/compose/run.go

@@ -102,7 +102,7 @@ func startDependencies(ctx context.Context, c *client.Client, project *types.Pro
 	if err := c.ComposeService().Create(ctx, project, compose.CreateOptions{}); err != nil {
 		return err
 	}
-	if err := c.ComposeService().Start(ctx, project, nil); err != nil {
+	if err := c.ComposeService().Start(ctx, project, compose.StartOptions{}); err != nil {
 		return err
 	}
 	return nil

+ 25 - 6
cli/cmd/compose/start.go

@@ -18,13 +18,12 @@ package compose
 
 import (
 	"context"
-	"os"
-
-	"github.com/spf13/cobra"
 
 	"github.com/docker/compose-cli/api/client"
+	"github.com/docker/compose-cli/api/compose"
 	"github.com/docker/compose-cli/api/progress"
-	"github.com/docker/compose-cli/cli/formatter"
+
+	"github.com/spf13/cobra"
 )
 
 type startOptions struct {
@@ -61,10 +60,30 @@ func runStart(ctx context.Context, opts startOptions, services []string) error {
 
 	if opts.Detach {
 		_, err = progress.Run(ctx, func(ctx context.Context) (string, error) {
-			return "", c.ComposeService().Start(ctx, project, nil)
+			return "", c.ComposeService().Start(ctx, project, compose.StartOptions{})
 		})
 		return err
 	}
 
-	return c.ComposeService().Start(ctx, project, formatter.NewLogConsumer(ctx, os.Stdout))
+	queue := make(chan compose.ContainerEvent)
+	printer := printer{
+		queue: queue,
+	}
+	err = c.ComposeService().Start(ctx, project, compose.StartOptions{
+		Attach: func(event compose.ContainerEvent) {
+			queue <- event
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	_, err = printer.run(ctx, false, "", func() error {
+		ctx := context.Background()
+		_, err := progress.Run(ctx, func(ctx context.Context) (string, error) {
+			return "", c.ComposeService().Stop(ctx, project)
+		})
+		return err
+	})
+	return err
 }

+ 86 - 7
cli/cmd/compose/up.go

@@ -18,18 +18,21 @@ package compose
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"os"
+	"os/signal"
 	"path/filepath"
+	"syscall"
 
 	"github.com/docker/compose-cli/api/client"
 	"github.com/docker/compose-cli/api/compose"
 	"github.com/docker/compose-cli/api/context/store"
 	"github.com/docker/compose-cli/api/progress"
+	"github.com/docker/compose-cli/cli/cmd"
 	"github.com/docker/compose-cli/cli/formatter"
 
 	"github.com/compose-spec/compose-go/types"
+	"github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 )
 
@@ -49,6 +52,8 @@ type upOptions struct {
 	forceRecreate bool
 	noRecreate    bool
 	noStart       bool
+	cascadeStop   bool
+	exitCodeFrom  string
 }
 
 func (o upOptions) recreateStrategy() string {
@@ -73,6 +78,12 @@ func upCommand(p *projectOptions, contextType string) *cobra.Command {
 		RunE: func(cmd *cobra.Command, args []string) error {
 			switch contextType {
 			case store.LocalContextType, store.DefaultContextType, store.EcsLocalSimulationContextType:
+				if opts.exitCodeFrom != "" {
+					opts.cascadeStop = true
+				}
+				if opts.cascadeStop && opts.Detach {
+					return fmt.Errorf("--abort-on-container-exit and --detach are incompatible")
+				}
 				if opts.forceRecreate && opts.noRecreate {
 					return fmt.Errorf("--force-recreate and --no-recreate are incompatible")
 				}
@@ -95,6 +106,8 @@ func upCommand(p *projectOptions, contextType string) *cobra.Command {
 		flags.BoolVar(&opts.forceRecreate, "force-recreate", false, "Recreate containers even if their configuration and image haven't changed.")
 		flags.BoolVar(&opts.noRecreate, "no-recreate", false, "If containers already exist, don't recreate them. Incompatible with --force-recreate.")
 		flags.BoolVar(&opts.noStart, "no-start", false, "Don't start the services after creating them.")
+		flags.BoolVar(&opts.cascadeStop, "abort-on-container-exit", false, "Stops all containers if any container was stopped. Incompatible with -d")
+		flags.StringVar(&opts.exitCodeFrom, "exit-code-from", "", "Return the exit code of the selected service container. Implies --abort-on-container-exit")
 	}
 
 	return upCmd
@@ -120,6 +133,13 @@ func runCreateStart(ctx context.Context, opts upOptions, services []string) erro
 		return err
 	}
 
+	if opts.exitCodeFrom != "" {
+		_, err := project.GetService(opts.exitCodeFrom)
+		if err != nil {
+			return err
+		}
+	}
+
 	_, err = progress.Run(ctx, func(ctx context.Context) (string, error) {
 		err := c.ComposeService().Create(ctx, project, compose.CreateOptions{
 			RemoveOrphans: opts.removeOrphans,
@@ -129,7 +149,7 @@ func runCreateStart(ctx context.Context, opts upOptions, services []string) erro
 			return "", err
 		}
 		if opts.Detach {
-			err = c.ComposeService().Start(ctx, project, nil)
+			err = c.ComposeService().Start(ctx, project, compose.StartOptions{})
 		}
 		return "", err
 	})
@@ -145,13 +165,38 @@ func runCreateStart(ctx context.Context, opts upOptions, services []string) erro
 		return nil
 	}
 
-	err = c.ComposeService().Start(ctx, project, formatter.NewLogConsumer(ctx, os.Stdout))
-	if errors.Is(ctx.Err(), context.Canceled) {
-		fmt.Println("Gracefully stopping...")
-		ctx = context.Background()
-		_, err = progress.Run(ctx, func(ctx context.Context) (string, error) {
+	queue := make(chan compose.ContainerEvent)
+	printer := printer{
+		queue: queue,
+	}
+
+	stopFunc := func() error {
+		ctx := context.Background()
+		_, err := progress.Run(ctx, func(ctx context.Context) (string, error) {
 			return "", c.ComposeService().Stop(ctx, project)
 		})
+		return err
+	}
+	signalChan := make(chan os.Signal, 1)
+	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+	go func() {
+		<-signalChan
+		fmt.Println("Gracefully stopping...")
+		stopFunc() // nolint:errcheck
+	}()
+
+	err = c.ComposeService().Start(ctx, project, compose.StartOptions{
+		Attach: func(event compose.ContainerEvent) {
+			queue <- event
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	exitCode, err := printer.run(ctx, opts.cascadeStop, opts.exitCodeFrom, stopFunc)
+	if exitCode != 0 {
+		return cmd.ExitCodeError{ExitCode: exitCode}
 	}
 	return err
 }
@@ -196,3 +241,37 @@ func setup(ctx context.Context, opts composeOptions, services []string) (*client
 
 	return c, project, nil
 }
+
+type printer struct {
+	queue chan compose.ContainerEvent
+}
+
+func (p printer) run(ctx context.Context, cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error) { //nolint:unparam
+	consumer := formatter.NewLogConsumer(ctx, os.Stdout)
+	var aborting bool
+	for {
+		event := <-p.queue
+		switch event.Type {
+		case compose.ContainerEventExit:
+			if !aborting {
+				consumer.Status(event.Service, event.Source, fmt.Sprintf("exited with code %d", event.ExitCode))
+			}
+			if cascadeStop && !aborting {
+				aborting = true
+				fmt.Println("Aborting on container exit...")
+				err := stopFn()
+				if err != nil {
+					return 0, err
+				}
+			}
+			if exitCodeFrom == "" || exitCodeFrom == event.Service {
+				logrus.Error(event.ExitCode)
+				return event.ExitCode, nil
+			}
+		case compose.ContainerEventLog:
+			if !aborting {
+				consumer.Log(event.Service, event.Source, event.Line)
+			}
+		}
+	}
+}

+ 28 - 0
cli/cmd/exit.go

@@ -0,0 +1,28 @@
+/*
+   Copyright 2020 Docker Compose CLI authors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package cmd
+
+import "strconv"
+
+// ExitCodeError reports an exit code set by command.
+type ExitCodeError struct {
+	ExitCode int
+}
+
+func (e ExitCodeError) Error() string {
+	return strconv.Itoa(e.ExitCode)
+}

+ 17 - 6
cli/formatter/logs.go

@@ -42,12 +42,7 @@ func (l *logConsumer) Log(service, container, message string) {
 	if l.ctx.Err() != nil {
 		return
 	}
-	cf, ok := l.colors[service]
-	if !ok {
-		cf = <-loop
-		l.colors[service] = cf
-		l.computeWidth()
-	}
+	cf := l.getColorFunc(service)
 	prefix := fmt.Sprintf("%-"+strconv.Itoa(l.width)+"s |", container)
 
 	for _, line := range strings.Split(message, "\n") {
@@ -56,6 +51,22 @@ func (l *logConsumer) Log(service, container, message string) {
 	}
 }
 
+func (l *logConsumer) Status(service, container, msg string) {
+	cf := l.getColorFunc(service)
+	buf := bytes.NewBufferString(cf(fmt.Sprintf("%s %s\n", container, msg)))
+	l.writer.Write(buf.Bytes()) // nolint:errcheck
+}
+
+func (l *logConsumer) getColorFunc(service string) colorFunc {
+	cf, ok := l.colors[service]
+	if !ok {
+		cf = <-loop
+		l.colors[service] = cf
+		l.computeWidth()
+	}
+	return cf
+}
+
 func (l *logConsumer) computeWidth() {
 	width := 0
 	for n := range l.colors {

+ 9 - 0
cli/main.go

@@ -170,6 +170,10 @@ func main() {
 		fmt.Fprintf(os.Stderr, "Unable to parse logging level: %s\n", opts.LogLevel)
 		os.Exit(1)
 	}
+	logrus.SetFormatter(&logrus.TextFormatter{
+		DisableTimestamp:       true,
+		DisableLevelTruncation: true,
+	})
 	logrus.SetLevel(level)
 	if opts.Debug {
 		logrus.SetLevel(logrus.DebugLevel)
@@ -241,6 +245,11 @@ $ docker context create %s <name>`, cc.Type(), store.EcsContextType), ctype)
 }
 
 func exit(ctx string, err error, ctype string) {
+	if exit, ok := err.(cmd.ExitCodeError); ok {
+		metrics.Track(ctype, os.Args[1:], metrics.SuccessStatus)
+		os.Exit(exit.ExitCode)
+	}
+
 	metrics.Track(ctype, os.Args[1:], metrics.FailureStatus)
 
 	if errors.Is(err, errdefs.ErrLoginRequired) {

+ 2 - 2
ecs/local/compose.go

@@ -53,8 +53,8 @@ func (e ecsLocalSimulation) Create(ctx context.Context, project *types.Project,
 	return e.compose.Create(ctx, enhanced, opts)
 }
 
-func (e ecsLocalSimulation) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
-	return e.compose.Start(ctx, project, consumer)
+func (e ecsLocalSimulation) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error {
+	return e.compose.Start(ctx, project, options)
 }
 
 func (e ecsLocalSimulation) Stop(ctx context.Context, project *types.Project) error {

+ 2 - 26
ecs/logs.go

@@ -20,37 +20,13 @@ import (
 	"context"
 
 	"github.com/docker/compose-cli/api/compose"
+	"github.com/docker/compose-cli/utils"
 )
 
 func (b *ecsAPIService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer, options compose.LogOptions) error {
 	if len(options.Services) > 0 {
-		consumer = filteredLogConsumer(consumer, options.Services)
+		consumer = utils.FilteredLogConsumer(consumer, options.Services)
 	}
 	err := b.aws.GetLogs(ctx, projectName, consumer.Log, options.Follow)
 	return err
 }
-
-func filteredLogConsumer(consumer compose.LogConsumer, services []string) compose.LogConsumer {
-	if len(services) == 0 {
-		return consumer
-	}
-	allowed := map[string]bool{}
-	for _, s := range services {
-		allowed[s] = true
-	}
-	return &allowListLogConsumer{
-		allowList: allowed,
-		delegate:  consumer,
-	}
-}
-
-type allowListLogConsumer struct {
-	allowList map[string]bool
-	delegate  compose.LogConsumer
-}
-
-func (a *allowListLogConsumer) Log(service, container, message string) {
-	if a.allowList[service] {
-		a.delegate.Log(service, container, message)
-	}
-}

+ 1 - 1
ecs/up.go

@@ -47,7 +47,7 @@ func (b *ecsAPIService) Create(ctx context.Context, project *types.Project, opts
 	return errdefs.ErrNotImplemented
 }
 
-func (b *ecsAPIService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
+func (b *ecsAPIService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error {
 	return errdefs.ErrNotImplemented
 }
 

+ 1 - 0
go.sum

@@ -485,6 +485,7 @@ github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
 github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/fvbommel/sortorder v1.0.1 h1:dSnXLt4mJYH25uDDGa3biZNQsozaUWDSWeKJ0qqFfzE=
 github.com/fvbommel/sortorder v1.0.1/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0=
 github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7 h1:LofdAjjjqCSXMwLGgOgnE+rdPuvX9DxCqaHwKy7i/ko=
 github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY=

+ 1 - 1
kube/compose.go

@@ -144,7 +144,7 @@ func (s *composeService) Create(ctx context.Context, project *types.Project, opt
 }
 
 // Start executes the equivalent to a `compose start`
-func (s *composeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
+func (s *composeService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error {
 	return errdefs.ErrNotImplemented
 }
 

+ 18 - 26
local/compose/attach.go

@@ -24,26 +24,17 @@ import (
 
 	"github.com/docker/compose-cli/api/compose"
 	convert "github.com/docker/compose-cli/local/moby"
-	"github.com/docker/compose-cli/utils"
 
 	"github.com/compose-spec/compose-go/types"
 	moby "github.com/docker/docker/api/types"
-	"github.com/docker/docker/api/types/filters"
 	"github.com/docker/docker/pkg/stdcopy"
-	"golang.org/x/sync/errgroup"
 )
 
-func (s *composeService) attach(ctx context.Context, project *types.Project, consumer compose.LogConsumer) (*errgroup.Group, error) {
-	containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
-		Filters: filters.NewArgs(
-			projectFilter(project.Name),
-		),
-		All: true,
-	})
+func (s *composeService) attach(ctx context.Context, project *types.Project, consumer compose.ContainerEventListener) (Containers, error) {
+	containers, err := s.getContainers(ctx, project)
 	if err != nil {
 		return nil, err
 	}
-	containers = Containers(containers).filter(isService(project.ServiceNames()...))
 
 	var names []string
 	for _, c := range containers {
@@ -51,19 +42,18 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, con
 	}
 	fmt.Printf("Attaching to %s\n", strings.Join(names, ", "))
 
-	eg, ctx := errgroup.WithContext(ctx)
-	for _, c := range containers {
-		container := c
-		eg.Go(func() error {
-			return s.attachContainer(ctx, container, consumer, project)
-		})
+	for _, container := range containers {
+		err := s.attachContainer(ctx, container, consumer, project)
+		if err != nil {
+			return nil, err
+		}
 	}
-	return eg, nil
+	return containers, nil
 }
 
-func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.LogConsumer, project *types.Project) error {
+func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.ContainerEventListener, project *types.Project) error {
 	serviceName := container.Labels[serviceLabel]
-	w := utils.GetWriter(serviceName, getCanonicalContainerName(container), consumer)
+	w := getWriter(serviceName, getContainerNameWithoutProject(container), consumer)
 
 	service, err := project.GetService(serviceName)
 	if err != nil {
@@ -94,13 +84,15 @@ func (s *composeService) attachContainerStreams(ctx context.Context, container m
 	}
 
 	if w != nil {
-		if tty {
-			_, err = io.Copy(w, stdout)
-		} else {
-			_, err = stdcopy.StdCopy(w, w, stdout)
-		}
+		go func() {
+			if tty {
+				io.Copy(w, stdout) // nolint:errcheck
+			} else {
+				stdcopy.StdCopy(w, w, stdout) // nolint:errcheck
+			}
+		}()
 	}
-	return err
+	return nil
 }
 
 func (s *composeService) getContainerStreams(ctx context.Context, container moby.Container) (io.WriteCloser, io.ReadCloser, error) {

+ 10 - 0
local/compose/compose.go

@@ -55,6 +55,16 @@ func getCanonicalContainerName(c moby.Container) string {
 	return c.Names[0][1:]
 }
 
+func getContainerNameWithoutProject(c moby.Container) string {
+	name := getCanonicalContainerName(c)
+	project := c.Labels[projectLabel]
+	prefix := fmt.Sprintf("%s_%s_", project, c.Labels[serviceLabel])
+	if strings.HasPrefix(name, prefix) {
+		return name[len(project)+1:]
+	}
+	return name
+}
+
 func (s *composeService) Convert(ctx context.Context, project *types.Project, options compose.ConvertOptions) ([]byte, error) {
 	switch options.Format {
 	case "json":

+ 22 - 1
local/compose/containers.go

@@ -16,11 +16,32 @@
 
 package compose
 
-import moby "github.com/docker/docker/api/types"
+import (
+	"context"
+
+	"github.com/compose-spec/compose-go/types"
+	moby "github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/filters"
+)
 
 // Containers is a set of moby Container
 type Containers []moby.Container
 
+func (s *composeService) getContainers(ctx context.Context, project *types.Project) (Containers, error) {
+	var containers Containers
+	containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
+		Filters: filters.NewArgs(
+			projectFilter(project.Name),
+		),
+		All: true,
+	})
+	if err != nil {
+		return nil, err
+	}
+	containers = containers.filter(isService(project.ServiceNames()...))
+	return containers, nil
+}
+
 // containerPredicate define a predicate we want container to satisfy for filtering operations
 type containerPredicate func(c moby.Container) bool
 

+ 33 - 1
local/compose/logs.go

@@ -17,6 +17,7 @@
 package compose
 
 import (
+	"bytes"
 	"context"
 	"io"
 
@@ -52,6 +53,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
 	}
 	eg, ctx := errgroup.WithContext(ctx)
 	for _, c := range list {
+		c := c
 		service := c.Labels[serviceLabel]
 		if ignore(service) {
 			continue
@@ -73,7 +75,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
 			if err != nil {
 				return err
 			}
-			w := utils.GetWriter(service, container.Name[1:], consumer)
+			w := utils.GetWriter(service, getContainerNameWithoutProject(c), consumer)
 			if container.Config.Tty {
 				_, err = io.Copy(w, r)
 			} else {
@@ -84,3 +86,33 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
 	}
 	return eg.Wait()
 }
+
+type splitBuffer struct {
+	service   string
+	container string
+	consumer  compose.ContainerEventListener
+}
+
+// getWriter creates a io.Writer that will actually split by line and format by LogConsumer
+func getWriter(service, container string, events compose.ContainerEventListener) io.Writer {
+	return splitBuffer{
+		service:   service,
+		container: container,
+		consumer:  events,
+	}
+}
+
+func (s splitBuffer) Write(b []byte) (n int, err error) {
+	split := bytes.Split(b, []byte{'\n'})
+	for _, line := range split {
+		if len(line) != 0 {
+			s.consumer(compose.ContainerEvent{
+				Type:    compose.ContainerEventLog,
+				Service: s.service,
+				Source:  s.container,
+				Line:    string(line),
+			})
+		}
+	}
+	return len(b), nil
+}

+ 28 - 8
local/compose/start.go

@@ -22,17 +22,18 @@ import (
 	"github.com/docker/compose-cli/api/compose"
 
 	"github.com/compose-spec/compose-go/types"
-	"golang.org/x/sync/errgroup"
+	"github.com/docker/docker/api/types/container"
+	"github.com/sirupsen/logrus"
 )
 
-func (s *composeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
-	var group *errgroup.Group
-	if consumer != nil {
-		eg, err := s.attach(ctx, project, consumer)
+func (s *composeService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error {
+	var containers Containers
+	if options.Attach != nil {
+		c, err := s.attach(ctx, project, options.Attach)
 		if err != nil {
 			return err
 		}
-		group = eg
+		containers = c
 	}
 
 	err := InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error {
@@ -41,8 +42,27 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, cons
 	if err != nil {
 		return err
 	}
-	if group != nil {
-		return group.Wait()
+
+	if options.Attach == nil {
+		return nil
+	}
+
+	for _, c := range containers {
+		c := c
+		go func() {
+			statusC, errC := s.apiClient.ContainerWait(context.Background(), c.ID, container.WaitConditionNotRunning)
+			select {
+			case status := <-statusC:
+				options.Attach(compose.ContainerEvent{
+					Type:     compose.ContainerEventExit,
+					Source:   getCanonicalContainerName(c),
+					Service:  c.Labels[serviceLabel],
+					ExitCode: int(status.StatusCode),
+				})
+			case err := <-errC:
+				logrus.Warnf("Unexpected API error for %s : %s\n", getCanonicalContainerName(c), err.Error())
+			}
+		}()
 	}
 	return nil
 }

+ 48 - 0
local/e2e/compose/cascade_stop_test.go

@@ -0,0 +1,48 @@
+/*
+   Copyright 2020 Docker Compose CLI authors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package e2e
+
+import (
+	"testing"
+
+	"gotest.tools/v3/icmd"
+
+	. "github.com/docker/compose-cli/utils/e2e"
+)
+
+func TestCascadeStop(t *testing.T) {
+	c := NewParallelE2eCLI(t, binDir)
+
+	const projectName = "compose-e2e-logs"
+
+	t.Run("abort-on-container-exit", func(t *testing.T) {
+		res := c.RunDockerOrExitError("compose", "-f", "./fixtures/cascade-stop-test/compose.yaml", "--project-name", projectName, "up", "--abort-on-container-exit")
+		res.Assert(t, icmd.Expected{ExitCode: 1, Out: `should_fail_1 exited with code 1`})
+		res.Assert(t, icmd.Expected{ExitCode: 1, Out: `Aborting on container exit...`})
+	})
+
+	t.Run("exit-code-from", func(t *testing.T) {
+		res := c.RunDockerOrExitError("compose", "-f", "./fixtures/cascade-stop-test/compose.yaml", "--project-name", projectName, "up", "--exit-code-from=sleep")
+		res.Assert(t, icmd.Expected{ExitCode: 137, Out: `should_fail_1 exited with code 1`})
+		res.Assert(t, icmd.Expected{ExitCode: 137, Out: `Aborting on container exit...`})
+	})
+
+	t.Run("exit-code-from unknown", func(t *testing.T) {
+		res := c.RunDockerOrExitError("compose", "-f", "./fixtures/cascade-stop-test/compose.yaml", "--project-name", projectName, "up", "--exit-code-from=unknown")
+		res.Assert(t, icmd.Expected{ExitCode: 1, Err: `no such service: unknown`})
+	})
+}

+ 7 - 0
local/e2e/compose/fixtures/cascade-stop-test/compose.yaml

@@ -0,0 +1,7 @@
+services:
+  should_fail:
+    image: busybox:1.27.2
+    command: ls /does_not_exist
+  sleep: # will be killed
+    image: busybox:1.27.2
+    command: ping localhost

+ 6 - 0
utils/logconsumer.go

@@ -58,6 +58,12 @@ func (a *allowListLogConsumer) Log(service, container, message string) {
 	}
 }
 
+func (a *allowListLogConsumer) Status(service, container, message string) {
+	if a.allowList[service] {
+		a.delegate.Status(service, container, message)
+	}
+}
+
 type splitBuffer struct {
 	service   string
 	container string