Browse Source

pull logs and events better than aggregate events from multiple channels

Signed-off-by: Nicolas De Loof <[email protected]>
Nicolas De Loof 4 years ago
parent
commit
a4b003ecfa

+ 17 - 9
api/compose/api.go

@@ -65,10 +65,8 @@ type CreateOptions struct {
 
 // StartOptions group options of the Start API
 type StartOptions struct {
-	// Attach will attach to container and pipe stdout/stderr to LogConsumer
-	Attach LogConsumer
-	// Listener will get notified on container events
-	Listener chan ContainerExited
+	// Attach will attach to service containers and pipe stdout/stderr to channel
+	Attach chan ContainerEvent
 }
 
 // UpOptions group options of the Up API
@@ -185,11 +183,21 @@ type Stack struct {
 // LogConsumer is a callback to process log messages from services
 type LogConsumer interface {
 	Log(service, container, message string)
-	Status(service, container, message string)
+	Status(service, container, msg string)
 }
 
-// ContainerExited let us know a Container exited
-type ContainerExited struct {
-	Service string
-	Status  int
+// ContainerEvent notify an event has been collected on Source container implementing Service
+type ContainerEvent struct {
+	Type     int
+	Source   string
+	Service  string
+	Line     string
+	ExitCode int
 }
+
+const (
+	// ContainerEventLog is a ContainerEvent of type log. Line is set
+	ContainerEventLog = iota
+	// ContainerEventExit is a ContainerEvent of type exit. ExitCode is set
+	ContainerEventExit
+)

+ 20 - 6
cli/cmd/compose/start.go

@@ -18,14 +18,12 @@ package compose
 
 import (
 	"context"
-	"os"
-
-	"github.com/spf13/cobra"
 
 	"github.com/docker/compose-cli/api/client"
 	"github.com/docker/compose-cli/api/compose"
 	"github.com/docker/compose-cli/api/progress"
-	"github.com/docker/compose-cli/cli/formatter"
+
+	"github.com/spf13/cobra"
 )
 
 type startOptions struct {
@@ -67,7 +65,23 @@ func runStart(ctx context.Context, opts startOptions, services []string) error {
 		return err
 	}
 
-	return c.ComposeService().Start(ctx, project, compose.StartOptions{
-		Attach: formatter.NewLogConsumer(ctx, os.Stdout),
+	queue := make(chan compose.ContainerEvent)
+	printer := printer{
+		queue: queue,
+	}
+	err = c.ComposeService().Start(ctx, project, compose.StartOptions{
+		Attach: queue,
+	})
+	if err != nil {
+		return err
+	}
+
+	_, err = printer.run(ctx, false, func() error {
+		ctx := context.Background()
+		_, err := progress.Run(ctx, func(ctx context.Context) (string, error) {
+			return "", c.ComposeService().Stop(ctx, project)
+		})
+		return err
 	})
+	return err
 }

+ 48 - 36
cli/cmd/compose/up.go

@@ -18,11 +18,11 @@ package compose
 
 import (
 	"context"
-	"errors"
 	"fmt"
-	"github.com/sirupsen/logrus"
 	"os"
+	"os/signal"
 	"path/filepath"
+	"syscall"
 
 	"github.com/docker/compose-cli/api/client"
 	"github.com/docker/compose-cli/api/compose"
@@ -31,6 +31,7 @@ import (
 	"github.com/docker/compose-cli/cli/formatter"
 
 	"github.com/compose-spec/compose-go/types"
+	"github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 )
 
@@ -151,47 +152,35 @@ func runCreateStart(ctx context.Context, opts upOptions, services []string) erro
 		return nil
 	}
 
-	ctx, cancel := context.WithCancel(ctx)
-	listener := make(chan compose.ContainerExited)
-	exitCode := make(chan int)
+	queue := make(chan compose.ContainerEvent)
+	printer := printer{
+		queue: queue,
+	}
+
+	stopFunc := func() error {
+		ctx := context.Background()
+		_, err := progress.Run(ctx, func(ctx context.Context) (string, error) {
+			return "", c.ComposeService().Stop(ctx, project)
+		})
+		return err
+	}
+	signalChan := make(chan os.Signal, 1)
+	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
 	go func() {
-		var aborting bool
-		for {
-			exit := <-listener
-			if opts.cascadeStop && !aborting {
-				aborting = true
-				cancel()
-				exitCode <- exit.Status
-			}
-		}
+		<-signalChan
+		fmt.Println("Gracefully stopping...")
+		stopFunc() // nolint:errcheck
 	}()
 
 	err = c.ComposeService().Start(ctx, project, compose.StartOptions{
-		Attach:   formatter.NewLogConsumer(ctx, os.Stdout),
-		Listener: listener,
+		Attach: queue,
 	})
-
-	if errors.Is(ctx.Err(), context.Canceled) {
-		select {
-		case exit := <-exitCode:
-			fmt.Println("Aborting on container exit...")
-			err = stop(c, project)
-			logrus.Error(exit)
-			// os.Exit(exit)
-		default:
-			// cancelled by user
-			fmt.Println("Gracefully stopping...")
-			err = stop(c, project)
-		}
+	if err != nil {
+		return err
 	}
-	return err
-}
 
-func stop(c *client.Client, project *types.Project) error {
-	ctx := context.Background()
-	_, err := progress.Run(ctx, func(ctx context.Context) (string, error) {
-		return "", c.ComposeService().Stop(ctx, project)
-	})
+	_, err = printer.run(ctx, opts.cascadeStop, stopFunc)
+	// FIXME os.Exit
 	return err
 }
 
@@ -235,3 +224,26 @@ func setup(ctx context.Context, opts composeOptions, services []string) (*client
 
 	return c, project, nil
 }
+
+type printer struct {
+	queue chan compose.ContainerEvent
+}
+
+func (p printer) run(ctx context.Context, cascadeStop bool, stopFn func() error) (int, error) { //nolint:unparam
+	consumer := formatter.NewLogConsumer(ctx, os.Stdout)
+	for {
+		event := <-p.queue
+		switch event.Type {
+		case compose.ContainerEventExit:
+			consumer.Status(event.Service, event.Source, fmt.Sprintf("exited with code %d", event.ExitCode))
+			if cascadeStop {
+				fmt.Println("Aborting on container exit...")
+				err := stopFn()
+				logrus.Error(event.ExitCode)
+				return event.ExitCode, err
+			}
+		case compose.ContainerEventLog:
+			consumer.Log(event.Service, event.Source, event.Line)
+		}
+	}
+}

+ 2 - 32
ecs/logs.go

@@ -20,43 +20,13 @@ import (
 	"context"
 
 	"github.com/docker/compose-cli/api/compose"
+	"github.com/docker/compose-cli/utils"
 )
 
 func (b *ecsAPIService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer, options compose.LogOptions) error {
 	if len(options.Services) > 0 {
-		consumer = filteredLogConsumer(consumer, options.Services)
+		consumer = utils.FilteredLogConsumer(consumer, options.Services)
 	}
 	err := b.aws.GetLogs(ctx, projectName, consumer.Log, options.Follow)
 	return err
 }
-
-func filteredLogConsumer(consumer compose.LogConsumer, services []string) compose.LogConsumer {
-	if len(services) == 0 {
-		return consumer
-	}
-	allowed := map[string]bool{}
-	for _, s := range services {
-		allowed[s] = true
-	}
-	return &allowListLogConsumer{
-		allowList: allowed,
-		delegate:  consumer,
-	}
-}
-
-type allowListLogConsumer struct {
-	allowList map[string]bool
-	delegate  compose.LogConsumer
-}
-
-func (a *allowListLogConsumer) Log(service, container, message string) {
-	if a.allowList[service] {
-		a.delegate.Log(service, container, message)
-	}
-}
-
-func (a *allowListLogConsumer) Status(service, container, message string) {
-	if a.allowList[service] {
-		a.delegate.Status(service, container, message)
-	}
-}

+ 2 - 3
local/compose/attach.go

@@ -24,14 +24,13 @@ import (
 
 	"github.com/docker/compose-cli/api/compose"
 	convert "github.com/docker/compose-cli/local/moby"
-	"github.com/docker/compose-cli/utils"
 
 	"github.com/compose-spec/compose-go/types"
 	moby "github.com/docker/docker/api/types"
 	"github.com/docker/docker/pkg/stdcopy"
 )
 
-func (s *composeService) attach(ctx context.Context, project *types.Project, consumer compose.LogConsumer) (Containers, error) {
+func (s *composeService) attach(ctx context.Context, project *types.Project, consumer chan compose.ContainerEvent) (Containers, error) {
 	containers, err := s.getContainers(ctx, project)
 	if err != nil {
 		return nil, err
@@ -52,7 +51,7 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, con
 	return containers, nil
 }
 
-func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.LogConsumer, project *types.Project) error {
+func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer chan compose.ContainerEvent, project *types.Project) error {
 	serviceName := container.Labels[serviceLabel]
 	w := getWriter(serviceName, getContainerNameWithoutProject(container), consumer)
 

+ 33 - 1
local/compose/logs.go

@@ -17,6 +17,7 @@
 package compose
 
 import (
+	"bytes"
 	"context"
 	"io"
 
@@ -52,6 +53,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
 	}
 	eg, ctx := errgroup.WithContext(ctx)
 	for _, c := range list {
+		c := c
 		service := c.Labels[serviceLabel]
 		if ignore(service) {
 			continue
@@ -73,7 +75,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
 			if err != nil {
 				return err
 			}
-			w := utils.GetWriter(service, container.Name[1:], consumer)
+			w := utils.GetWriter(service, getContainerNameWithoutProject(c), consumer)
 			if container.Config.Tty {
 				_, err = io.Copy(w, r)
 			} else {
@@ -84,3 +86,33 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
 	}
 	return eg.Wait()
 }
+
+type splitBuffer struct {
+	service   string
+	container string
+	consumer  chan compose.ContainerEvent
+}
+
+// getWriter creates a io.Writer that will actually split by line and format by LogConsumer
+func getWriter(service, container string, events chan compose.ContainerEvent) io.Writer {
+	return splitBuffer{
+		service:   service,
+		container: container,
+		consumer:  events,
+	}
+}
+
+func (s splitBuffer) Write(b []byte) (n int, err error) {
+	split := bytes.Split(b, []byte{'\n'})
+	for _, line := range split {
+		if len(line) != 0 {
+			s.consumer <- compose.ContainerEvent{
+				Type:    compose.ContainerEventLog,
+				Service: s.service,
+				Source:  s.container,
+				Line:    string(line),
+			}
+		}
+	}
+	return len(b), nil
+}

+ 10 - 22
local/compose/start.go

@@ -18,13 +18,12 @@ package compose
 
 import (
 	"context"
-	"fmt"
 
 	"github.com/docker/compose-cli/api/compose"
 
 	"github.com/compose-spec/compose-go/types"
 	"github.com/docker/docker/api/types/container"
-	"golang.org/x/sync/errgroup"
+	"github.com/sirupsen/logrus"
 )
 
 func (s *composeService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error {
@@ -35,12 +34,6 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti
 			return err
 		}
 		containers = c
-	} else {
-		c, err := s.getContainers(ctx, project)
-		if err != nil {
-			return err
-		}
-		containers = c
 	}
 
 	err := InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error {
@@ -54,26 +47,21 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti
 		return nil
 	}
 
-	eg, ctx := errgroup.WithContext(ctx)
 	for _, c := range containers {
 		c := c
-		eg.Go(func() error {
-			statusC, errC := s.apiClient.ContainerWait(ctx, c.ID, container.WaitConditionNotRunning)
+		go func() {
+			statusC, errC := s.apiClient.ContainerWait(context.Background(), c.ID, container.WaitConditionNotRunning)
 			select {
 			case status := <-statusC:
-				service := c.Labels[serviceLabel]
-				options.Attach.Status(service, getCanonicalContainerName(c), fmt.Sprintf("exited with code %d", status.StatusCode))
-				if options.Listener != nil {
-					options.Listener <- compose.ContainerExited{
-						Service: service,
-						Status:  int(status.StatusCode),
-					}
+				options.Attach <- compose.ContainerEvent{
+					Type:     compose.ContainerEventExit,
+					Source:   getCanonicalContainerName(c),
+					ExitCode: int(status.StatusCode),
 				}
-				return nil
 			case err := <-errC:
-				return err
+				logrus.Warnf("Unexpected API error for %s : %s\n", getCanonicalContainerName(c), err.Error())
 			}
-		})
+		}()
 	}
-	return eg.Wait()
+	return nil
 }

+ 3 - 1
local/e2e/compose/cascade_stop_test.go

@@ -31,6 +31,8 @@ func TestCascadeStop(t *testing.T) {
 
 	res := c.RunDockerCmd("compose", "-f", "./fixtures/cascade-stop-test/compose.yaml", "--project-name", projectName, "up", "--abort-on-container-exit")
 	res.Assert(t, icmd.Expected{Out: `PING localhost (127.0.0.1)`})
-	res.Assert(t, icmd.Expected{Out: `ping_1 exited with code 0`})
+	res.Assert(t, icmd.Expected{Out: `/does_not_exist: No such file or directory`})
+	res.Assert(t, icmd.Expected{Out: `should_fail_1 exited with code 1`})
 	res.Assert(t, icmd.Expected{Out: `Aborting on container exit...`})
+	// FIXME res.Assert(t, icmd.Expected{ExitCode: 1})
 }

+ 4 - 1
local/e2e/compose/fixtures/cascade-stop-test/compose.yaml

@@ -1,4 +1,7 @@
 services:
+  should_fail:
+    image: busybox:1.27.2
+    command: ls /does_not_exist
   ping:
     image: busybox:1.27.2
-    command: ping localhost -c 1
+    command: ping localhost

+ 6 - 0
utils/logconsumer.go

@@ -58,6 +58,12 @@ func (a *allowListLogConsumer) Log(service, container, message string) {
 	}
 }
 
+func (a *allowListLogConsumer) Status(service, container, message string) {
+	if a.allowList[service] {
+		a.delegate.Status(service, container, message)
+	}
+}
+
 type splitBuffer struct {
 	service   string
 	container string