Browse Source

attach to containers added by "scale"

Signed-off-by: Nicolas De Loof <[email protected]>
Nicolas De Loof 4 years ago
parent
commit
c16834cba6

+ 106 - 0
api/compose/printer.go

@@ -0,0 +1,106 @@
+/*
+   Copyright 2020 Docker Compose CLI authors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package compose
+
+import (
+	"fmt"
+
+	"github.com/sirupsen/logrus"
+)
+
+// LogPrinter watch application containers an collect their logs
+type LogPrinter interface {
+	HandleEvent(event ContainerEvent)
+	Run(cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error)
+	Cancel()
+}
+
+// NewLogPrinter builds a LogPrinter passing containers logs to LogConsumer
+func NewLogPrinter(consumer LogConsumer) LogPrinter {
+	queue := make(chan ContainerEvent)
+	printer := printer{
+		consumer: consumer,
+		queue:    queue,
+	}
+	return &printer
+}
+
+func (p *printer) Cancel() {
+	p.queue <- ContainerEvent{
+		Type: UserCancel,
+	}
+}
+
+type printer struct {
+	queue    chan ContainerEvent
+	consumer LogConsumer
+}
+
+func (p *printer) HandleEvent(event ContainerEvent) {
+	p.queue <- event
+}
+
+func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error) {
+	var (
+		aborting bool
+		exitCode int
+	)
+	containers := map[string]struct{}{}
+	for {
+		event := <-p.queue
+		switch event.Type {
+		case UserCancel:
+			aborting = true
+		case ContainerEventAttach:
+			if _, ok := containers[event.Container]; ok {
+				continue
+			}
+			containers[event.Container] = struct{}{}
+			p.consumer.Register(event.Container)
+		case ContainerEventExit:
+			delete(containers, event.Container)
+			if !aborting {
+				p.consumer.Status(event.Container, fmt.Sprintf("exited with code %d", event.ExitCode))
+			}
+			if cascadeStop {
+				if !aborting {
+					aborting = true
+					fmt.Println("Aborting on container exit...")
+					err := stopFn()
+					if err != nil {
+						return 0, err
+					}
+				}
+				if exitCodeFrom == "" {
+					exitCodeFrom = event.Service
+				}
+				if exitCodeFrom == event.Service {
+					logrus.Error(event.ExitCode)
+					exitCode = event.ExitCode
+				}
+			}
+			if len(containers) == 0 {
+				// Last container terminated, done
+				return exitCode, nil
+			}
+		case ContainerEventLog:
+			if !aborting {
+				p.consumer.Log(event.Container, event.Service, event.Line)
+			}
+		}
+	}
+}

+ 7 - 65
cli/cmd/compose/up.go

@@ -29,7 +29,6 @@ import (
 
 	"github.com/compose-spec/compose-go/types"
 	"github.com/docker/cli/cli"
-	"github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 	"golang.org/x/sync/errgroup"
 
@@ -275,13 +274,12 @@ func runCreateStart(ctx context.Context, backend compose.Service, opts upOptions
 		return nil
 	}
 
-	queue := make(chan compose.ContainerEvent)
-	printer := printer{
-		queue: queue,
-	}
+	consumer := formatter.NewLogConsumer(ctx, os.Stdout, !opts.noColor, !opts.noPrefix)
+	printer := compose.NewLogPrinter(consumer)
 
 	signalChan := make(chan os.Signal, 1)
 	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+
 	stopFunc := func() error {
 		ctx := context.Background()
 		_, err := progress.Run(ctx, func(ctx context.Context) (string, error) {
@@ -296,27 +294,21 @@ func runCreateStart(ctx context.Context, backend compose.Service, opts upOptions
 	}
 	go func() {
 		<-signalChan
-		queue <- compose.ContainerEvent{
-			Type: compose.UserCancel,
-		}
+		printer.Cancel()
 		fmt.Println("Gracefully stopping... (press Ctrl+C again to force)")
 		stopFunc() // nolint:errcheck
 	}()
 
-	consumer := formatter.NewLogConsumer(ctx, os.Stdout, !opts.noColor, !opts.noPrefix)
-
 	var exitCode int
 	eg, ctx := errgroup.WithContext(ctx)
 	eg.Go(func() error {
-		code, err := printer.run(opts.cascadeStop, opts.exitCodeFrom, consumer, stopFunc)
+		code, err := printer.Run(opts.cascadeStop, opts.exitCodeFrom, stopFunc)
 		exitCode = code
 		return err
 	})
 
 	err = backend.Start(ctx, project, compose.StartOptions{
-		Attach: func(event compose.ContainerEvent) {
-			queue <- event
-		},
+		Attach:   printer.HandleEvent,
 		Services: services,
 	})
 	if err != nil {
@@ -341,11 +333,7 @@ func setServiceScale(project *types.Project, name string, replicas int) error {
 			if err != nil {
 				return err
 			}
-			if service.Deploy == nil {
-				service.Deploy = &types.DeployConfig{}
-			}
-			count := uint64(replicas)
-			service.Deploy.Replicas = &count
+			service.Scale = replicas
 			project.Services[i] = service
 			return nil
 		}
@@ -392,49 +380,3 @@ func setup(opts composeOptions, services []string) (*types.Project, error) {
 
 	return project, nil
 }
-
-type printer struct {
-	queue chan compose.ContainerEvent
-}
-
-func (p printer) run(cascadeStop bool, exitCodeFrom string, consumer compose.LogConsumer, stopFn func() error) (int, error) {
-	var aborting bool
-	var count int
-	for {
-		event := <-p.queue
-		switch event.Type {
-		case compose.UserCancel:
-			aborting = true
-		case compose.ContainerEventAttach:
-			consumer.Register(event.Container)
-			count++
-		case compose.ContainerEventExit:
-			if !aborting {
-				consumer.Status(event.Container, fmt.Sprintf("exited with code %d", event.ExitCode))
-			}
-			if cascadeStop {
-				if !aborting {
-					aborting = true
-					fmt.Println("Aborting on container exit...")
-					err := stopFn()
-					if err != nil {
-						return 0, err
-					}
-				}
-				if exitCodeFrom == "" || exitCodeFrom == event.Service {
-					logrus.Error(event.ExitCode)
-					return event.ExitCode, nil
-				}
-			}
-			count--
-			if count == 0 {
-				// Last container terminated, done
-				return 0, nil
-			}
-		case compose.ContainerEventLog:
-			if !aborting {
-				consumer.Log(event.Container, event.Service, event.Line)
-			}
-		}
-	}
-}

+ 1 - 1
cli/cmd/compose/up_test.go

@@ -39,5 +39,5 @@ func TestApplyScaleOpt(t *testing.T) {
 	assert.NilError(t, err)
 	foo, err := p.GetService("foo")
 	assert.NilError(t, err)
-	assert.Check(t, *foo.Deploy.Replicas == 2)
+	assert.Equal(t, foo.Scale, 2)
 }

+ 0 - 42
local/compose/attach.go

@@ -33,10 +33,6 @@ import (
 )
 
 func (s *composeService) attach(ctx context.Context, project *types.Project, listener compose.ContainerEventListener, selectedServices []string) (Containers, error) {
-	if len(selectedServices) == 0 {
-		selectedServices = project.ServiceNames()
-	}
-
 	containers, err := s.getContainers(ctx, project.Name, oneOffExclude, true, selectedServices...)
 	if err != nil {
 		return nil, err
@@ -57,44 +53,6 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, lis
 			return nil, err
 		}
 	}
-
-	// Watch events to capture container restart and re-attach
-	go func() {
-		crashed := map[string]struct{}{}
-		s.Events(ctx, project.Name, compose.EventsOptions{ // nolint: errcheck
-			Services: selectedServices,
-			Consumer: func(event compose.Event) error {
-				if event.Status == "die" {
-					crashed[event.Container] = struct{}{}
-					return nil
-				}
-				if _, ok := crashed[event.Container]; ok {
-					inspect, err := s.apiClient.ContainerInspect(ctx, event.Container)
-					if err != nil {
-						return err
-					}
-
-					container := moby.Container{
-						ID:    event.Container,
-						Names: []string{inspect.Name},
-						State: convert.ContainerRunning,
-						Labels: map[string]string{
-							projectLabel: project.Name,
-							serviceLabel: event.Service,
-						},
-					}
-
-					// Just ignore errors when reattaching to already crashed containers
-					s.attachContainer(ctx, container, listener, project) // nolint: errcheck
-					delete(crashed, event.Container)
-
-					s.waitContainer(container, listener)
-				}
-				return nil
-			},
-		})
-	}()
-
 	return containers, err
 }
 

+ 14 - 10
local/compose/down.go

@@ -191,18 +191,22 @@ func (s *composeService) removeVolume(ctx context.Context, id string, w progress
 }
 
 func (s *composeService) stopContainers(ctx context.Context, w progress.Writer, containers []moby.Container, timeout *time.Duration) error {
+	eg, ctx := errgroup.WithContext(ctx)
 	for _, container := range containers {
-		toStop := container
-		eventName := getContainerProgressName(toStop)
-		w.Event(progress.StoppingEvent(eventName))
-		err := s.apiClient.ContainerStop(ctx, toStop.ID, timeout)
-		if err != nil {
-			w.Event(progress.ErrorMessageEvent(eventName, "Error while Stopping"))
-			return err
-		}
-		w.Event(progress.StoppedEvent(eventName))
+		container := container
+		eg.Go(func() error {
+			eventName := getContainerProgressName(container)
+			w.Event(progress.StoppingEvent(eventName))
+			err := s.apiClient.ContainerStop(ctx, container.ID, timeout)
+			if err != nil {
+				w.Event(progress.ErrorMessageEvent(eventName, "Error while Stopping"))
+				return err
+			}
+			w.Event(progress.StoppedEvent(eventName))
+			return nil
+		})
 	}
-	return nil
+	return eg.Wait()
 }
 
 func (s *composeService) removeContainers(ctx context.Context, w progress.Writer, containers []moby.Container, timeout *time.Duration) error {

+ 53 - 9
local/compose/start.go

@@ -20,12 +20,13 @@ import (
 	"context"
 
 	"github.com/docker/compose-cli/api/compose"
+	convert "github.com/docker/compose-cli/local/moby"
 	"github.com/docker/compose-cli/utils"
 
 	"github.com/compose-spec/compose-go/types"
 	moby "github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/container"
-	"github.com/sirupsen/logrus"
+	"golang.org/x/sync/errgroup"
 )
 
 func (s *composeService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error {
@@ -35,11 +36,52 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti
 
 	var containers Containers
 	if options.Attach != nil {
-		c, err := s.attach(ctx, project, options.Attach, options.Services)
+		attached, err := s.attach(ctx, project, options.Attach, options.Services)
 		if err != nil {
 			return err
 		}
-		containers = c
+		containers = attached
+
+		// Watch events to capture container restart and re-attach
+		go func() {
+			watched := map[string]struct{}{}
+			for _, c := range containers {
+				watched[c.ID] = struct{}{}
+			}
+			s.Events(ctx, project.Name, compose.EventsOptions{ // nolint: errcheck
+				Services: options.Services,
+				Consumer: func(event compose.Event) error {
+					if event.Status == "start" {
+						inspect, err := s.apiClient.ContainerInspect(ctx, event.Container)
+						if err != nil {
+							return err
+						}
+
+						container := moby.Container{
+							ID:    event.Container,
+							Names: []string{inspect.Name},
+							State: convert.ContainerRunning,
+							Labels: map[string]string{
+								projectLabel: project.Name,
+								serviceLabel: event.Service,
+							},
+						}
+
+						// Just ignore errors when reattaching to already crashed containers
+						s.attachContainer(ctx, container, options.Attach, project) // nolint: errcheck
+
+						if _, ok := watched[inspect.ID]; !ok {
+							// a container has been added to the service, see --scale option
+							watched[inspect.ID] = struct{}{}
+							go func() {
+								s.waitContainer(container, options.Attach) // nolint: errcheck
+							}()
+						}
+					}
+					return nil
+				},
+			})
+		}()
 	}
 
 	err := InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error {
@@ -56,16 +98,17 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti
 		return nil
 	}
 
+	eg, ctx := errgroup.WithContext(ctx)
 	for _, c := range containers {
 		c := c
-		go func() {
-			s.waitContainer(c, options.Attach)
-		}()
+		eg.Go(func() error {
+			return s.waitContainer(c, options.Attach)
+		})
 	}
-	return nil
+	return eg.Wait()
 }
 
-func (s *composeService) waitContainer(c moby.Container, listener compose.ContainerEventListener) {
+func (s *composeService) waitContainer(c moby.Container, listener compose.ContainerEventListener) error {
 	statusC, errC := s.apiClient.ContainerWait(context.Background(), c.ID, container.WaitConditionNotRunning)
 	name := getContainerNameWithoutProject(c)
 	select {
@@ -76,7 +119,8 @@ func (s *composeService) waitContainer(c moby.Container, listener compose.Contai
 			Service:   c.Labels[serviceLabel],
 			ExitCode:  int(status.StatusCode),
 		})
+		return nil
 	case err := <-errC:
-		logrus.Warnf("Unexpected API error for %s : %s", name, err.Error())
+		return err
 	}
 }

+ 4 - 4
local/compose/stop_test.go

@@ -38,7 +38,7 @@ func TestStopTimeout(t *testing.T) {
 	tested.apiClient = api
 
 	ctx := context.Background()
-	api.EXPECT().ContainerList(ctx, projectFilterListOpt()).Return(
+	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt()).Return(
 		[]moby.Container{
 			testContainer("service1", "123"),
 			testContainer("service1", "456"),
@@ -46,9 +46,9 @@ func TestStopTimeout(t *testing.T) {
 		}, nil)
 
 	timeout := time.Duration(2) * time.Second
-	api.EXPECT().ContainerStop(ctx, "123", &timeout).Return(nil)
-	api.EXPECT().ContainerStop(ctx, "456", &timeout).Return(nil)
-	api.EXPECT().ContainerStop(ctx, "789", &timeout).Return(nil)
+	api.EXPECT().ContainerStop(gomock.Any(), "123", &timeout).Return(nil)
+	api.EXPECT().ContainerStop(gomock.Any(), "456", &timeout).Return(nil)
+	api.EXPECT().ContainerStop(gomock.Any(), "789", &timeout).Return(nil)
 
 	err := tested.Stop(ctx, &types.Project{
 		Name: testProject,

+ 1 - 0
utils/hash.go

@@ -29,6 +29,7 @@ func ServiceHash(o types.ServiceConfig) (string, error) {
 	// remove the Build config when generating the service hash
 	o.Build = nil
 	o.PullPolicy = ""
+	o.Scale = 1
 	bytes, err := json.Marshal(o)
 	if err != nil {
 		return "", err