Просмотр исходного кода

improve container events watch robustness

Signed-off-by: Nicolas De Loof <[email protected]>
Nicolas De Loof 4 лет назад
Родитель
Commit
6f6ae071d6

+ 3 - 1
api/compose/api.go

@@ -379,7 +379,9 @@ type ContainerEvent struct {
 	Container string
 	Service   string
 	Line      string
-	ExitCode  int
+	// ContainerEventExit only
+	ExitCode   int
+	Restarting bool
 }
 
 const (

+ 9 - 6
api/compose/printer.go

@@ -62,19 +62,22 @@ func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error
 	containers := map[string]struct{}{}
 	for {
 		event := <-p.queue
+		container := event.Container
 		switch event.Type {
 		case UserCancel:
 			aborting = true
 		case ContainerEventAttach:
-			if _, ok := containers[event.Container]; ok {
+			if _, ok := containers[container]; ok {
 				continue
 			}
-			containers[event.Container] = struct{}{}
-			p.consumer.Register(event.Container)
+			containers[container] = struct{}{}
+			p.consumer.Register(container)
 		case ContainerEventExit:
-			delete(containers, event.Container)
+			if !event.Restarting {
+				delete(containers, container)
+			}
 			if !aborting {
-				p.consumer.Status(event.Container, fmt.Sprintf("exited with code %d", event.ExitCode))
+				p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode))
 			}
 			if cascadeStop {
 				if !aborting {
@@ -99,7 +102,7 @@ func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error
 			}
 		case ContainerEventLog:
 			if !aborting {
-				p.consumer.Log(event.Container, event.Service, event.Line)
+				p.consumer.Log(container, event.Service, event.Line)
 			}
 		}
 	}

+ 77 - 70
local/compose/start.go

@@ -20,68 +20,30 @@ import (
 	"context"
 
 	"github.com/docker/compose-cli/api/compose"
-	convert "github.com/docker/compose-cli/local/moby"
 	"github.com/docker/compose-cli/utils"
 
 	"github.com/compose-spec/compose-go/types"
 	moby "github.com/docker/docker/api/types"
-	"github.com/docker/docker/api/types/container"
+	"github.com/pkg/errors"
 	"golang.org/x/sync/errgroup"
 )
 
 func (s *composeService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error {
+	listener := options.Attach
 	if len(options.Services) == 0 {
 		options.Services = project.ServiceNames()
 	}
 
-	var containers Containers
-	if options.Attach != nil {
-		attached, err := s.attach(ctx, project, options.Attach, options.Services)
+	eg, ctx := errgroup.WithContext(ctx)
+	if listener != nil {
+		attached, err := s.attach(ctx, project, listener, options.Services)
 		if err != nil {
 			return err
 		}
-		containers = attached
 
-		// Watch events to capture container restart and re-attach
-		go func() {
-			watched := map[string]struct{}{}
-			for _, c := range containers {
-				watched[c.ID] = struct{}{}
-			}
-			s.Events(ctx, project.Name, compose.EventsOptions{ // nolint: errcheck
-				Services: options.Services,
-				Consumer: func(event compose.Event) error {
-					if event.Status == "start" {
-						inspect, err := s.apiClient.ContainerInspect(ctx, event.Container)
-						if err != nil {
-							return err
-						}
-
-						container := moby.Container{
-							ID:    event.Container,
-							Names: []string{inspect.Name},
-							State: convert.ContainerRunning,
-							Labels: map[string]string{
-								projectLabel: project.Name,
-								serviceLabel: event.Service,
-							},
-						}
-
-						// Just ignore errors when reattaching to already crashed containers
-						s.attachContainer(ctx, container, options.Attach, project) // nolint: errcheck
-
-						if _, ok := watched[inspect.ID]; !ok {
-							// a container has been added to the service, see --scale option
-							watched[inspect.ID] = struct{}{}
-							go func() {
-								s.waitContainer(container, options.Attach) // nolint: errcheck
-							}()
-						}
-					}
-					return nil
-				},
-			})
-		}()
+		eg.Go(func() error {
+			return s.watchContainers(project, options.Services, listener, attached)
+		})
 	}
 
 	err := InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error {
@@ -93,34 +55,79 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti
 	if err != nil {
 		return err
 	}
+	return eg.Wait()
+}
 
-	if options.Attach == nil {
-		return nil
-	}
-
-	eg, ctx := errgroup.WithContext(ctx)
+// watchContainers uses engine events to capture container start/die and notify ContainerEventListener
+func (s *composeService) watchContainers(project *types.Project, services []string, listener compose.ContainerEventListener, containers Containers) error {
+	watched := map[string]int{}
 	for _, c := range containers {
-		c := c
-		eg.Go(func() error {
-			return s.waitContainer(c, options.Attach)
-		})
+		watched[c.ID] = 0
 	}
-	return eg.Wait()
-}
 
-func (s *composeService) waitContainer(c moby.Container, listener compose.ContainerEventListener) error {
-	statusC, errC := s.apiClient.ContainerWait(context.Background(), c.ID, container.WaitConditionNotRunning)
-	name := getContainerNameWithoutProject(c)
-	select {
-	case status := <-statusC:
-		listener(compose.ContainerEvent{
-			Type:      compose.ContainerEventExit,
-			Container: name,
-			Service:   c.Labels[serviceLabel],
-			ExitCode:  int(status.StatusCode),
-		})
+	ctx, stop := context.WithCancel(context.Background())
+	err := s.Events(ctx, project.Name, compose.EventsOptions{
+		Services: services,
+		Consumer: func(event compose.Event) error {
+			inspected, err := s.apiClient.ContainerInspect(ctx, event.Container)
+			if err != nil {
+				return err
+			}
+			container := moby.Container{
+				ID:     inspected.ID,
+				Names:  []string{inspected.Name},
+				Labels: inspected.Config.Labels,
+			}
+			name := getContainerNameWithoutProject(container)
+
+			if event.Status == "die" {
+				restarted := watched[container.ID]
+				watched[container.ID] = restarted + 1
+				// Container terminated.
+				willRestart := inspected.HostConfig.RestartPolicy.MaximumRetryCount > restarted
+
+				listener(compose.ContainerEvent{
+					Type:       compose.ContainerEventExit,
+					Container:  name,
+					Service:    container.Labels[serviceLabel],
+					ExitCode:   inspected.State.ExitCode,
+					Restarting: willRestart,
+				})
+
+				if !willRestart {
+					// we're done with this one
+					delete(watched, container.ID)
+				}
+
+				if len(watched) == 0 {
+					// all project containers stopped, we're done
+					stop()
+				}
+				return nil
+			}
+
+			if event.Status == "start" {
+				count, ok := watched[container.ID]
+				mustAttach := ok && count > 0 // Container restarted, need to re-attach
+				if !ok {
+					// A new container has just been added to service by scale
+					watched[container.ID] = 0
+					mustAttach = true
+				}
+				if mustAttach {
+					// Container restarted, need to re-attach
+					err := s.attachContainer(ctx, container, listener, project)
+					if err != nil {
+						return err
+					}
+				}
+			}
+
+			return nil
+		},
+	})
+	if errors.Is(ctx.Err(), context.Canceled) {
 		return nil
-	case err := <-errC:
-		return err
 	}
+	return err
 }

+ 4 - 5
local/e2e/compose/compose_test.go

@@ -27,6 +27,7 @@ import (
 	"testing"
 	"time"
 
+	testify "github.com/stretchr/testify/assert"
 	"gotest.tools/v3/assert"
 	"gotest.tools/v3/icmd"
 
@@ -197,10 +198,10 @@ func TestAttachRestart(t *testing.T) {
 
 	c.WaitForCondition(func() (bool, string) {
 		debug := res.Combined()
-		return strings.Count(res.Stdout(), "another_1 exited with code 1") == 3, fmt.Sprintf("'another_1 exited with code 1' not found 3 times in : \n%s\n", debug)
+		return strings.Count(res.Stdout(), "failing_1 exited with code 1") == 3, fmt.Sprintf("'failing_1 exited with code 1' not found 3 times in : \n%s\n", debug)
 	}, 2*time.Minute, 2*time.Second)
 
-	assert.Equal(t, strings.Count(res.Stdout(), "another_1  | world"), 3, res.Combined())
+	assert.Equal(t, strings.Count(res.Stdout(), "failing_1  | world"), 3, res.Combined())
 }
 
 func TestInitContainer(t *testing.T) {
@@ -208,7 +209,5 @@ func TestInitContainer(t *testing.T) {
 
 	res := c.RunDockerOrExitError("compose", "--ansi=never", "--project-directory", "./fixtures/init-container", "up")
 	defer c.RunDockerOrExitError("compose", "-p", "init-container", "down")
-	output := res.Stdout()
-
-	assert.Assert(t, strings.Contains(output, "foo_1  | hello\nbar_1  | world"), res.Combined())
+	testify.Regexp(t, "foo_1  | hello(?m:.*)bar_1  | world", res.Stdout())
 }

+ 1 - 4
local/e2e/compose/fixtures/attach-restart/compose.yaml

@@ -1,8 +1,5 @@
 services:
-  simple:
-    image: alpine
-    command: sh -c "sleep infinity"
-  another:
+  failing:
     image: alpine
     command: sh -c "sleep 0.1 && echo world && /bin/false"
     deploy: