Procházet zdrojové kódy

register TTYWritter as an Event Processor

Signed-off-by: Nicolas De Loof <[email protected]>
Nicolas De Loof před 2 měsíci
rodič
revize
fd4f2f99cf
49 změnil soubory, kde provedl 408 přidání a 709 odebrání
  1. 14 1
      cmd/compose/compose.go
  2. 1 1
      cmd/compose/run.go
  3. 1 1
      cmd/compose/up.go
  4. 3 3
      pkg/compose/build.go
  5. 8 8
      pkg/compose/build_bake.go
  6. 3 3
      pkg/compose/build_buildkit.go
  7. 2 2
      pkg/compose/build_classic.go
  8. 4 4
      pkg/compose/commit.go
  9. 12 6
      pkg/compose/compose.go
  10. 24 22
      pkg/compose/convergence.go
  11. 25 34
      pkg/compose/convergence_test.go
  12. 3 3
      pkg/compose/cp.go
  13. 8 8
      pkg/compose/create.go
  14. 24 24
      pkg/compose/down.go
  15. 21 28
      pkg/compose/down_test.go
  16. 4 4
      pkg/compose/export.go
  17. 2 3
      pkg/compose/images_test.go
  18. 4 4
      pkg/compose/kill.go
  19. 6 8
      pkg/compose/kill_test.go
  20. 6 8
      pkg/compose/logs_test.go
  21. 7 7
      pkg/compose/model.go
  22. 4 4
      pkg/compose/pause.go
  23. 9 9
      pkg/compose/plugins.go
  24. 2 3
      pkg/compose/ps_test.go
  25. 4 4
      pkg/compose/publish.go
  26. 26 30
      pkg/compose/pull.go
  27. 9 9
      pkg/compose/push.go
  28. 3 3
      pkg/compose/remove.go
  29. 3 3
      pkg/compose/restart.go
  30. 1 1
      pkg/compose/run.go
  31. 1 1
      pkg/compose/scale.go
  32. 1 1
      pkg/compose/start.go
  33. 1 1
      pkg/compose/stop.go
  34. 3 4
      pkg/compose/stop_test.go
  35. 3 3
      pkg/compose/up.go
  36. 2 3
      pkg/compose/viz_test.go
  37. 2 2
      pkg/e2e/compose_run_test.go
  38. 5 21
      pkg/e2e/pull_test.go
  39. 12 0
      pkg/progress/event.go
  40. 9 28
      pkg/progress/json.go
  41. 0 29
      pkg/progress/json_test.go
  42. 0 76
      pkg/progress/mixed.go
  43. 0 39
      pkg/progress/noop.go
  44. 9 19
      pkg/progress/plain.go
  45. 53 0
      pkg/progress/progress.go
  46. 6 9
      pkg/progress/quiet.go
  47. 58 45
      pkg/progress/tty.go
  48. 0 149
      pkg/progress/writer.go
  49. 0 31
      pkg/progress/writer_test.go

+ 14 - 1
cmd/compose/compose.go

@@ -508,29 +508,42 @@ func RootCommand(dockerCli command.Cli, backendOptions *BackendOptions) *cobra.C
 				ui.Mode = ui.ModeTTY
 				ui.Mode = ui.ModeTTY
 			}
 			}
 
 
+			var ep ui.EventProcessor
 			switch opts.Progress {
 			switch opts.Progress {
 			case "", ui.ModeAuto:
 			case "", ui.ModeAuto:
-				if ansi == "never" {
+				switch {
+				case ansi == "never":
 					ui.Mode = ui.ModePlain
 					ui.Mode = ui.ModePlain
+					ep = ui.NewPlainWriter(dockerCli.Err())
+				case dockerCli.Out().IsTerminal():
+					ep = ui.NewTTYWriter(dockerCli.Err())
+				default:
+					ep = ui.NewPlainWriter(dockerCli.Err())
 				}
 				}
 			case ui.ModeTTY:
 			case ui.ModeTTY:
 				if ansi == "never" {
 				if ansi == "never" {
 					return fmt.Errorf("can't use --progress tty while ANSI support is disabled")
 					return fmt.Errorf("can't use --progress tty while ANSI support is disabled")
 				}
 				}
 				ui.Mode = ui.ModeTTY
 				ui.Mode = ui.ModeTTY
+				ep = ui.NewTTYWriter(dockerCli.Err())
+
 			case ui.ModePlain:
 			case ui.ModePlain:
 				if ansi == "always" {
 				if ansi == "always" {
 					return fmt.Errorf("can't use --progress plain while ANSI support is forced")
 					return fmt.Errorf("can't use --progress plain while ANSI support is forced")
 				}
 				}
 				ui.Mode = ui.ModePlain
 				ui.Mode = ui.ModePlain
+				ep = ui.NewPlainWriter(dockerCli.Err())
 			case ui.ModeQuiet, "none":
 			case ui.ModeQuiet, "none":
 				ui.Mode = ui.ModeQuiet
 				ui.Mode = ui.ModeQuiet
+				ep = ui.NewQuiedWriter()
 			case ui.ModeJSON:
 			case ui.ModeJSON:
 				ui.Mode = ui.ModeJSON
 				ui.Mode = ui.ModeJSON
 				logrus.SetFormatter(&logrus.JSONFormatter{})
 				logrus.SetFormatter(&logrus.JSONFormatter{})
+				ep = ui.NewJSONWriter(dockerCli.Err())
 			default:
 			default:
 				return fmt.Errorf("unsupported --progress value %q", opts.Progress)
 				return fmt.Errorf("unsupported --progress value %q", opts.Progress)
 			}
 			}
+			backendOptions.Add(compose.WithEventProcessor(ep))
 
 
 			// (4) options validation / normalization
 			// (4) options validation / normalization
 			if opts.WorkDir != "" {
 			if opts.WorkDir != "" {

+ 1 - 1
cmd/compose/run.go

@@ -25,6 +25,7 @@ import (
 	"github.com/compose-spec/compose-go/v2/dotenv"
 	"github.com/compose-spec/compose-go/v2/dotenv"
 	"github.com/compose-spec/compose-go/v2/format"
 	"github.com/compose-spec/compose-go/v2/format"
 	"github.com/docker/compose/v2/pkg/compose"
 	"github.com/docker/compose/v2/pkg/compose"
+	"github.com/docker/compose/v2/pkg/progress"
 	xprogress "github.com/moby/buildkit/util/progress/progressui"
 	xprogress "github.com/moby/buildkit/util/progress/progressui"
 	"github.com/sirupsen/logrus"
 	"github.com/sirupsen/logrus"
 
 
@@ -38,7 +39,6 @@ import (
 
 
 	"github.com/docker/cli/cli"
 	"github.com/docker/cli/cli"
 	"github.com/docker/compose/v2/pkg/api"
 	"github.com/docker/compose/v2/pkg/api"
-	"github.com/docker/compose/v2/pkg/progress"
 	"github.com/docker/compose/v2/pkg/utils"
 	"github.com/docker/compose/v2/pkg/utils"
 )
 )
 
 

+ 1 - 1
cmd/compose/up.go

@@ -27,6 +27,7 @@ import (
 	"github.com/compose-spec/compose-go/v2/types"
 	"github.com/compose-spec/compose-go/v2/types"
 	"github.com/docker/cli/cli/command"
 	"github.com/docker/cli/cli/command"
 	"github.com/docker/compose/v2/pkg/compose"
 	"github.com/docker/compose/v2/pkg/compose"
+	ui "github.com/docker/compose/v2/pkg/progress"
 	xprogress "github.com/moby/buildkit/util/progress/progressui"
 	xprogress "github.com/moby/buildkit/util/progress/progressui"
 	"github.com/sirupsen/logrus"
 	"github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 	"github.com/spf13/cobra"
@@ -34,7 +35,6 @@ import (
 
 
 	"github.com/docker/compose/v2/cmd/formatter"
 	"github.com/docker/compose/v2/cmd/formatter"
 	"github.com/docker/compose/v2/pkg/api"
 	"github.com/docker/compose/v2/pkg/api"
-	ui "github.com/docker/compose/v2/pkg/progress"
 	"github.com/docker/compose/v2/pkg/utils"
 	"github.com/docker/compose/v2/pkg/utils"
 )
 )
 
 

+ 3 - 3
pkg/compose/build.go

@@ -64,7 +64,7 @@ func (s *composeService) Build(ctx context.Context, project *types.Project, opti
 				_, err := s.build(ctx, project, options, nil)
 				_, err := s.build(ctx, project, options, nil)
 				return err
 				return err
 			})(ctx)
 			})(ctx)
-	}, s.stdinfo(), "build")
+	}, "build", s.events)
 }
 }
 
 
 //nolint:gocyclo
 //nolint:gocyclo
@@ -226,7 +226,7 @@ func (s *composeService) build(ctx context.Context, project *types.Project, opti
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
-		s.events(ctx, progress.BuildingEvent("Image "+buildOptions.Tags[0]))
+		s.events.On(progress.BuildingEvent("Image " + buildOptions.Tags[0]))
 
 
 		trace.SpanFromContext(ctx).SetAttributes(attribute.String("builder", "buildkit"))
 		trace.SpanFromContext(ctx).SetAttributes(attribute.String("builder", "buildkit"))
 		digest, err := s.doBuildBuildkit(ctx, name, buildOptions, w, nodes)
 		digest, err := s.doBuildBuildkit(ctx, name, buildOptions, w, nodes)
@@ -256,7 +256,7 @@ func (s *composeService) build(ctx context.Context, project *types.Project, opti
 			service := project.Services[names[i]]
 			service := project.Services[names[i]]
 			imageRef := api.GetImageNameOrDefault(service, project.Name)
 			imageRef := api.GetImageNameOrDefault(service, project.Name)
 			imageIDs[imageRef] = imageDigest
 			imageIDs[imageRef] = imageDigest
-			s.events(ctx, progress.BuiltEvent("Image "+imageRef))
+			s.events.On(progress.BuiltEvent("Image " + imageRef))
 		}
 		}
 	}
 	}
 	return imageIDs, err
 	return imageIDs, err

+ 8 - 8
pkg/compose/build_bake.go

@@ -340,7 +340,7 @@ func (s *composeService) doBuildBake(ctx context.Context, project *types.Project
 	logrus.Debugf("Executing bake with args: %v", args)
 	logrus.Debugf("Executing bake with args: %v", args)
 
 
 	if s.dryRun {
 	if s.dryRun {
-		return s.dryRunBake(ctx, cfg), nil
+		return s.dryRunBake(cfg), nil
 	}
 	}
 	cmd := exec.CommandContext(ctx, buildx.Path, args...)
 	cmd := exec.CommandContext(ctx, buildx.Path, args...)
 
 
@@ -426,7 +426,7 @@ func (s *composeService) doBuildBake(ctx context.Context, project *types.Project
 			return nil, fmt.Errorf("build result not found in Bake metadata for service %s", name)
 			return nil, fmt.Errorf("build result not found in Bake metadata for service %s", name)
 		}
 		}
 		results[image] = built.Digest
 		results[image] = built.Digest
-		s.events(ctx, progress.BuiltEvent("Image "+image))
+		s.events.On(progress.BuiltEvent("Image " + image))
 	}
 	}
 	return results, nil
 	return results, nil
 }
 }
@@ -564,26 +564,26 @@ func dockerFilePath(ctxName string, dockerfile string) string {
 	return dockerfile
 	return dockerfile
 }
 }
 
 
-func (s composeService) dryRunBake(ctx context.Context, cfg bakeConfig) map[string]string {
+func (s composeService) dryRunBake(cfg bakeConfig) map[string]string {
 	bakeResponse := map[string]string{}
 	bakeResponse := map[string]string{}
 	for name, target := range cfg.Targets {
 	for name, target := range cfg.Targets {
 		dryRunUUID := fmt.Sprintf("dryRun-%x", sha1.Sum([]byte(name)))
 		dryRunUUID := fmt.Sprintf("dryRun-%x", sha1.Sum([]byte(name)))
-		s.displayDryRunBuildEvent(ctx, name, dryRunUUID, target.Tags[0])
+		s.displayDryRunBuildEvent(name, dryRunUUID, target.Tags[0])
 		bakeResponse[name] = dryRunUUID
 		bakeResponse[name] = dryRunUUID
 	}
 	}
 	for name := range bakeResponse {
 	for name := range bakeResponse {
-		s.events(ctx, progress.BuiltEvent(name))
+		s.events.On(progress.BuiltEvent(name))
 	}
 	}
 	return bakeResponse
 	return bakeResponse
 }
 }
 
 
-func (s composeService) displayDryRunBuildEvent(ctx context.Context, name, dryRunUUID, tag string) {
-	s.events(ctx, progress.Event{
+func (s composeService) displayDryRunBuildEvent(name, dryRunUUID, tag string) {
+	s.events.On(progress.Event{
 		ID:     name + " ==>",
 		ID:     name + " ==>",
 		Status: progress.Done,
 		Status: progress.Done,
 		Text:   fmt.Sprintf("==> writing image %s", dryRunUUID),
 		Text:   fmt.Sprintf("==> writing image %s", dryRunUUID),
 	})
 	})
-	s.events(ctx, progress.Event{
+	s.events.On(progress.Event{
 		ID:     name + " ==> ==>",
 		ID:     name + " ==> ==>",
 		Status: progress.Done,
 		Status: progress.Done,
 		Text:   fmt.Sprintf(`naming to %s`, tag),
 		Text:   fmt.Sprintf(`naming to %s`, tag),

+ 3 - 3
pkg/compose/build_buildkit.go

@@ -39,7 +39,7 @@ func (s *composeService) doBuildBuildkit(ctx context.Context, service string, op
 		err      error
 		err      error
 	)
 	)
 	if s.dryRun {
 	if s.dryRun {
-		response = s.dryRunBuildResponse(ctx, service, opts)
+		response = s.dryRunBuildResponse(service, opts)
 	} else {
 	} else {
 		response, err = build.Build(ctx, nodes,
 		response, err = build.Build(ctx, nodes,
 			map[string]build.Options{service: opts},
 			map[string]build.Options{service: opts},
@@ -65,10 +65,10 @@ func (s *composeService) doBuildBuildkit(ctx context.Context, service string, op
 	return "", fmt.Errorf("buildkit response is missing expected result for %s", service)
 	return "", fmt.Errorf("buildkit response is missing expected result for %s", service)
 }
 }
 
 
-func (s composeService) dryRunBuildResponse(ctx context.Context, name string, options build.Options) map[string]*client.SolveResponse {
+func (s composeService) dryRunBuildResponse(name string, options build.Options) map[string]*client.SolveResponse {
 	buildResponse := map[string]*client.SolveResponse{}
 	buildResponse := map[string]*client.SolveResponse{}
 	dryRunUUID := fmt.Sprintf("dryRun-%x", sha1.Sum([]byte(name)))
 	dryRunUUID := fmt.Sprintf("dryRun-%x", sha1.Sum([]byte(name)))
-	s.displayDryRunBuildEvent(ctx, name, dryRunUUID, options.Tags[0])
+	s.displayDryRunBuildEvent(name, dryRunUUID, options.Tags[0])
 	buildResponse[name] = &client.SolveResponse{ExporterResponse: map[string]string{
 	buildResponse[name] = &client.SolveResponse{ExporterResponse: map[string]string{
 		"containerimage.digest": dryRunUUID,
 		"containerimage.digest": dryRunUUID,
 	}}
 	}}

+ 2 - 2
pkg/compose/build_classic.go

@@ -184,7 +184,7 @@ func (s *composeService) doBuildClassic(ctx context.Context, project *types.Proj
 
 
 	ctx, cancel := context.WithCancel(ctx)
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	defer cancel()
-	s.events(ctx, progress2.BuildingEvent("Image "+imageName))
+	s.events.On(progress2.BuildingEvent("Image " + imageName))
 	response, err := s.apiClient().ImageBuild(ctx, body, buildOpts)
 	response, err := s.apiClient().ImageBuild(ctx, body, buildOpts)
 	if err != nil {
 	if err != nil {
 		return "", err
 		return "", err
@@ -213,7 +213,7 @@ func (s *composeService) doBuildClassic(ctx context.Context, project *types.Proj
 		}
 		}
 		return "", err
 		return "", err
 	}
 	}
-	s.events(ctx, progress2.BuiltEvent("Image "+imageName))
+	s.events.On(progress2.BuiltEvent("Image " + imageName))
 	return imageID, nil
 	return imageID, nil
 }
 }
 
 

+ 4 - 4
pkg/compose/commit.go

@@ -29,7 +29,7 @@ import (
 func (s *composeService) Commit(ctx context.Context, projectName string, options api.CommitOptions) error {
 func (s *composeService) Commit(ctx context.Context, projectName string, options api.CommitOptions) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.commit(ctx, projectName, options)
 		return s.commit(ctx, projectName, options)
-	}, s.stdinfo(), "commit")
+	}, "commit", s.events)
 }
 }
 
 
 func (s *composeService) commit(ctx context.Context, projectName string, options api.CommitOptions) error {
 func (s *composeService) commit(ctx context.Context, projectName string, options api.CommitOptions) error {
@@ -43,7 +43,7 @@ func (s *composeService) commit(ctx context.Context, projectName string, options
 	name := getCanonicalContainerName(ctr)
 	name := getCanonicalContainerName(ctr)
 	msg := fmt.Sprintf("Commit %s", name)
 	msg := fmt.Sprintf("Commit %s", name)
 
 
-	s.events(ctx, progress.Event{
+	s.events.On(progress.Event{
 		ID:         name,
 		ID:         name,
 		Text:       msg,
 		Text:       msg,
 		Status:     progress.Working,
 		Status:     progress.Working,
@@ -51,7 +51,7 @@ func (s *composeService) commit(ctx context.Context, projectName string, options
 	})
 	})
 
 
 	if s.dryRun {
 	if s.dryRun {
-		s.events(ctx, progress.Event{
+		s.events.On(progress.Event{
 			ID:         name,
 			ID:         name,
 			Text:       msg,
 			Text:       msg,
 			Status:     progress.Done,
 			Status:     progress.Done,
@@ -72,7 +72,7 @@ func (s *composeService) commit(ctx context.Context, projectName string, options
 		return err
 		return err
 	}
 	}
 
 
-	s.events(ctx, progress.Event{
+	s.events.On(progress.Event{
 		ID:         name,
 		ID:         name,
 		Text:       msg,
 		Text:       msg,
 		Status:     progress.Done,
 		Status:     progress.Done,

+ 12 - 6
pkg/compose/compose.go

@@ -82,10 +82,6 @@ func NewComposeService(dockerCli command.Cli, options ...Option) (api.Compose, e
 		clock:          clockwork.NewRealClock(),
 		clock:          clockwork.NewRealClock(),
 		maxConcurrency: -1,
 		maxConcurrency: -1,
 		dryRun:         false,
 		dryRun:         false,
-		events: func(ctx context.Context, e ...progress.Event) {
-			// FIXME(ndeloof) temporary during refactoring
-			progress.ContextWriter(ctx).Events(e)
-		},
 	}
 	}
 	for _, option := range options {
 	for _, option := range options {
 		if err := option(s); err != nil {
 		if err := option(s); err != nil {
@@ -99,6 +95,9 @@ func NewComposeService(dockerCli command.Cli, options ...Option) (api.Compose, e
 			return defaultValue, nil
 			return defaultValue, nil
 		}
 		}
 	}
 	}
+	if s.events == nil {
+		s.events = progress.NewQuiedWriter()
+	}
 
 
 	// If custom streams were provided, wrap the Docker CLI to use them
 	// If custom streams were provided, wrap the Docker CLI to use them
 	if s.outStream != nil || s.errStream != nil || s.inStream != nil {
 	if s.outStream != nil || s.errStream != nil || s.inStream != nil {
@@ -196,14 +195,21 @@ func WithDryRun(s *composeService) error {
 
 
 type Prompt func(message string, defaultValue bool) (bool, error)
 type Prompt func(message string, defaultValue bool) (bool, error)
 
 
-type EventBus func(ctx context.Context, e ...progress.Event)
+// WithEventProcessor configure component to get notified on Compose operation and progress events.
+// Typically used to configure a progress UI
+func WithEventProcessor(bus progress.EventProcessor) Option {
+	return func(s *composeService) error {
+		s.events = bus
+		return nil
+	}
+}
 
 
 type composeService struct {
 type composeService struct {
 	dockerCli command.Cli
 	dockerCli command.Cli
 	// prompt is used to interact with user and confirm actions
 	// prompt is used to interact with user and confirm actions
 	prompt Prompt
 	prompt Prompt
 	// eventBus collects tasks execution events
 	// eventBus collects tasks execution events
-	events EventBus
+	events progress.EventProcessor
 
 
 	// Optional overrides for specific components (for SDK users)
 	// Optional overrides for specific components (for SDK users)
 	outStream   io.Writer
 	outStream   io.Writer

+ 24 - 22
pkg/compose/convergence.go

@@ -187,7 +187,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
 		name := getContainerProgressName(ctr)
 		name := getContainerProgressName(ctr)
 		switch ctr.State {
 		switch ctr.State {
 		case container.StateRunning:
 		case container.StateRunning:
-			c.compose.events(ctx, progress.RunningEvent(name))
+			c.compose.events.On(progress.RunningEvent(name))
 		case container.StateCreated:
 		case container.StateCreated:
 		case container.StateRestarting:
 		case container.StateRestarting:
 		case container.StateExited:
 		case container.StateExited:
@@ -461,7 +461,7 @@ func (s *composeService) waitDependencies(ctx context.Context, project *types.Pr
 		}
 		}
 
 
 		waitingFor := containers.filter(isService(dep), isNotOneOff)
 		waitingFor := containers.filter(isService(dep), isNotOneOff)
-		s.events(ctx, containerEvents(waitingFor, progress.Waiting)...)
+		s.events.On(containerEvents(waitingFor, progress.Waiting)...)
 		if len(waitingFor) == 0 {
 		if len(waitingFor) == 0 {
 			if config.Required {
 			if config.Required {
 				return fmt.Errorf("%s is missing dependency %s", dependant, dep)
 				return fmt.Errorf("%s is missing dependency %s", dependant, dep)
@@ -484,7 +484,7 @@ func (s *composeService) waitDependencies(ctx context.Context, project *types.Pr
 					healthy, err := s.isServiceHealthy(ctx, waitingFor, true)
 					healthy, err := s.isServiceHealthy(ctx, waitingFor, true)
 					if err != nil {
 					if err != nil {
 						if !config.Required {
 						if !config.Required {
-							s.events(ctx, containerReasonEvents(waitingFor, progress.SkippedEvent,
+							s.events.On(containerReasonEvents(waitingFor, progress.SkippedEvent,
 								fmt.Sprintf("optional dependency %q is not running or is unhealthy", dep))...)
 								fmt.Sprintf("optional dependency %q is not running or is unhealthy", dep))...)
 							logrus.Warnf("optional dependency %q is not running or is unhealthy: %s", dep, err.Error())
 							logrus.Warnf("optional dependency %q is not running or is unhealthy: %s", dep, err.Error())
 							return nil
 							return nil
@@ -492,23 +492,23 @@ func (s *composeService) waitDependencies(ctx context.Context, project *types.Pr
 						return err
 						return err
 					}
 					}
 					if healthy {
 					if healthy {
-						s.events(ctx, containerEvents(waitingFor, progress.Healthy)...)
+						s.events.On(containerEvents(waitingFor, progress.Healthy)...)
 						return nil
 						return nil
 					}
 					}
 				case types.ServiceConditionHealthy:
 				case types.ServiceConditionHealthy:
 					healthy, err := s.isServiceHealthy(ctx, waitingFor, false)
 					healthy, err := s.isServiceHealthy(ctx, waitingFor, false)
 					if err != nil {
 					if err != nil {
 						if !config.Required {
 						if !config.Required {
-							s.events(ctx, containerReasonEvents(waitingFor, progress.SkippedEvent,
+							s.events.On(containerReasonEvents(waitingFor, progress.SkippedEvent,
 								fmt.Sprintf("optional dependency %q failed to start", dep))...)
 								fmt.Sprintf("optional dependency %q failed to start", dep))...)
 							logrus.Warnf("optional dependency %q failed to start: %s", dep, err.Error())
 							logrus.Warnf("optional dependency %q failed to start: %s", dep, err.Error())
 							return nil
 							return nil
 						}
 						}
-						s.events(ctx, containerEvents(waitingFor, progress.ErrorEvent)...)
+						s.events.On(containerEvents(waitingFor, progress.ErrorEvent)...)
 						return fmt.Errorf("dependency failed to start: %w", err)
 						return fmt.Errorf("dependency failed to start: %w", err)
 					}
 					}
 					if healthy {
 					if healthy {
-						s.events(ctx, containerEvents(waitingFor, progress.Healthy)...)
+						s.events.On(containerEvents(waitingFor, progress.Healthy)...)
 						return nil
 						return nil
 					}
 					}
 				case types.ServiceConditionCompletedSuccessfully:
 				case types.ServiceConditionCompletedSuccessfully:
@@ -518,21 +518,21 @@ func (s *composeService) waitDependencies(ctx context.Context, project *types.Pr
 					}
 					}
 					if exited {
 					if exited {
 						if code == 0 {
 						if code == 0 {
-							s.events(ctx, containerEvents(waitingFor, progress.Exited)...)
+							s.events.On(containerEvents(waitingFor, progress.Exited)...)
 							return nil
 							return nil
 						}
 						}
 
 
 						messageSuffix := fmt.Sprintf("%q didn't complete successfully: exit %d", dep, code)
 						messageSuffix := fmt.Sprintf("%q didn't complete successfully: exit %d", dep, code)
 						if !config.Required {
 						if !config.Required {
 							// optional -> mark as skipped & don't propagate error
 							// optional -> mark as skipped & don't propagate error
-							s.events(ctx, containerReasonEvents(waitingFor, progress.SkippedEvent,
+							s.events.On(containerReasonEvents(waitingFor, progress.SkippedEvent,
 								fmt.Sprintf("optional dependency %s", messageSuffix))...)
 								fmt.Sprintf("optional dependency %s", messageSuffix))...)
 							logrus.Warnf("optional dependency %s", messageSuffix)
 							logrus.Warnf("optional dependency %s", messageSuffix)
 							return nil
 							return nil
 						}
 						}
 
 
 						msg := fmt.Sprintf("service %s", messageSuffix)
 						msg := fmt.Sprintf("service %s", messageSuffix)
-						s.events(ctx, containerReasonEvents(waitingFor, progress.ErrorMessageEvent, msg)...)
+						s.events.On(containerReasonEvents(waitingFor, progress.ErrorMessageEvent, msg)...)
 						return errors.New(msg)
 						return errors.New(msg)
 					}
 					}
 				default:
 				default:
@@ -595,11 +595,11 @@ func (s *composeService) createContainer(ctx context.Context, project *types.Pro
 	name string, number int, opts createOptions,
 	name string, number int, opts createOptions,
 ) (ctr container.Summary, err error) {
 ) (ctr container.Summary, err error) {
 	eventName := "Container " + name
 	eventName := "Container " + name
-	s.events(ctx, progress.CreatingEvent(eventName))
+	s.events.On(progress.CreatingEvent(eventName))
 	ctr, err = s.createMobyContainer(ctx, project, service, name, number, nil, opts)
 	ctr, err = s.createMobyContainer(ctx, project, service, name, number, nil, opts)
 	if err != nil {
 	if err != nil {
 		if ctx.Err() == nil {
 		if ctx.Err() == nil {
-			s.events(ctx, progress.Event{
+			s.events.On(progress.Event{
 				ID:         eventName,
 				ID:         eventName,
 				Status:     progress.Error,
 				Status:     progress.Error,
 				StatusText: err.Error(),
 				StatusText: err.Error(),
@@ -607,7 +607,7 @@ func (s *composeService) createContainer(ctx context.Context, project *types.Pro
 		}
 		}
 		return
 		return
 	}
 	}
-	s.events(ctx, progress.CreatedEvent(eventName))
+	s.events.On(progress.CreatedEvent(eventName))
 	return
 	return
 }
 }
 
 
@@ -615,10 +615,10 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
 	replaced container.Summary, inherit bool, timeout *time.Duration,
 	replaced container.Summary, inherit bool, timeout *time.Duration,
 ) (created container.Summary, err error) {
 ) (created container.Summary, err error) {
 	eventName := getContainerProgressName(replaced)
 	eventName := getContainerProgressName(replaced)
-	s.events(ctx, progress.NewEvent(eventName, progress.Working, "Recreate"))
+	s.events.On(progress.NewEvent(eventName, progress.Working, "Recreate"))
 	defer func() {
 	defer func() {
 		if err != nil && ctx.Err() == nil {
 		if err != nil && ctx.Err() == nil {
-			s.events(ctx, progress.Event{
+			s.events.On(progress.Event{
 				ID:         eventName,
 				ID:         eventName,
 				Status:     progress.Error,
 				Status:     progress.Error,
 				StatusText: err.Error(),
 				StatusText: err.Error(),
@@ -669,7 +669,7 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
 		return created, err
 		return created, err
 	}
 	}
 
 
-	s.events(ctx, progress.NewEvent(eventName, progress.Done, "Recreated"))
+	s.events.On(progress.NewEvent(eventName, progress.Done, "Recreated"))
 	return created, err
 	return created, err
 }
 }
 
 
@@ -677,18 +677,20 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
 var startMx sync.Mutex
 var startMx sync.Mutex
 
 
 func (s *composeService) startContainer(ctx context.Context, ctr container.Summary) error {
 func (s *composeService) startContainer(ctx context.Context, ctr container.Summary) error {
-	s.events(ctx, progress.NewEvent(getContainerProgressName(ctr), progress.Working, "Restart"))
+	s.events.On(progress.NewEvent(getContainerProgressName(ctr), progress.Working, "Restart"))
 	startMx.Lock()
 	startMx.Lock()
 	defer startMx.Unlock()
 	defer startMx.Unlock()
 	err := s.apiClient().ContainerStart(ctx, ctr.ID, container.StartOptions{})
 	err := s.apiClient().ContainerStart(ctx, ctr.ID, container.StartOptions{})
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	s.events(ctx, progress.NewEvent(getContainerProgressName(ctr), progress.Done, "Restarted"))
+	s.events.On(progress.NewEvent(getContainerProgressName(ctr), progress.Done, "Restarted"))
 	return nil
 	return nil
 }
 }
 
 
-func (s *composeService) createMobyContainer(ctx context.Context, project *types.Project, service types.ServiceConfig, name string, number int, inherit *container.Summary, opts createOptions, ) (container.Summary, error) {
+func (s *composeService) createMobyContainer(ctx context.Context, project *types.Project, service types.ServiceConfig,
+	name string, number int, inherit *container.Summary, opts createOptions,
+) (container.Summary, error) {
 	var created container.Summary
 	var created container.Summary
 	cfgs, err := s.getCreateConfigs(ctx, project, service, number, inherit, opts)
 	cfgs, err := s.getCreateConfigs(ctx, project, service, number, inherit, opts)
 	if err != nil {
 	if err != nil {
@@ -713,7 +715,7 @@ func (s *composeService) createMobyContainer(ctx context.Context, project *types
 		return created, err
 		return created, err
 	}
 	}
 	for _, warning := range response.Warnings {
 	for _, warning := range response.Warnings {
-		s.events(ctx, progress.Event{
+		s.events.On(progress.Event{
 			ID:     service.Name,
 			ID:     service.Name,
 			Status: progress.Warning,
 			Status: progress.Warning,
 			Text:   warning,
 			Text:   warning,
@@ -900,7 +902,7 @@ func (s *composeService) startService(ctx context.Context,
 		}
 		}
 
 
 		eventName := getContainerProgressName(ctr)
 		eventName := getContainerProgressName(ctr)
-		s.events(ctx, progress.StartingEvent(eventName))
+		s.events.On(progress.StartingEvent(eventName))
 		err = s.apiClient().ContainerStart(ctx, ctr.ID, container.StartOptions{})
 		err = s.apiClient().ContainerStart(ctx, ctr.ID, container.StartOptions{})
 		if err != nil {
 		if err != nil {
 			return err
 			return err
@@ -913,7 +915,7 @@ func (s *composeService) startService(ctx context.Context,
 			}
 			}
 		}
 		}
 
 
-		s.events(ctx, progress.StartedEvent(eventName))
+		s.events.On(progress.StartedEvent(eventName))
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 25 - 34
pkg/compose/convergence_test.go

@@ -35,7 +35,6 @@ import (
 
 
 	"github.com/docker/compose/v2/pkg/api"
 	"github.com/docker/compose/v2/pkg/api"
 	"github.com/docker/compose/v2/pkg/mocks"
 	"github.com/docker/compose/v2/pkg/mocks"
-	"github.com/docker/compose/v2/pkg/progress"
 )
 )
 
 
 func TestContainerName(t *testing.T) {
 func TestContainerName(t *testing.T) {
@@ -87,9 +86,8 @@ func TestServiceLinks(t *testing.T) {
 
 
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
-		tested := composeService{
-			dockerCli: cli,
-		}
+		tested, err := NewComposeService(cli)
+		assert.NilError(t, err)
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 
 
 		s.Links = []string{"db"}
 		s.Links = []string{"db"}
@@ -97,7 +95,7 @@ func TestServiceLinks(t *testing.T) {
 		c := testContainer("db", dbContainerName, false)
 		c := testContainer("db", dbContainerName, false)
 		apiClient.EXPECT().ContainerList(gomock.Any(), containerListOptions).Return([]container.Summary{c}, nil)
 		apiClient.EXPECT().ContainerList(gomock.Any(), containerListOptions).Return([]container.Summary{c}, nil)
 
 
-		links, err := tested.getLinks(context.Background(), testProject, s, 1)
+		links, err := tested.(*composeService).getLinks(context.Background(), testProject, s, 1)
 		assert.NilError(t, err)
 		assert.NilError(t, err)
 
 
 		assert.Equal(t, len(links), 3)
 		assert.Equal(t, len(links), 3)
@@ -111,9 +109,8 @@ func TestServiceLinks(t *testing.T) {
 		defer mockCtrl.Finish()
 		defer mockCtrl.Finish()
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
-		tested := composeService{
-			dockerCli: cli,
-		}
+		tested, err := NewComposeService(cli)
+		assert.NilError(t, err)
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 
 
 		s.Links = []string{"db:db"}
 		s.Links = []string{"db:db"}
@@ -121,7 +118,7 @@ func TestServiceLinks(t *testing.T) {
 		c := testContainer("db", dbContainerName, false)
 		c := testContainer("db", dbContainerName, false)
 
 
 		apiClient.EXPECT().ContainerList(gomock.Any(), containerListOptions).Return([]container.Summary{c}, nil)
 		apiClient.EXPECT().ContainerList(gomock.Any(), containerListOptions).Return([]container.Summary{c}, nil)
-		links, err := tested.getLinks(context.Background(), testProject, s, 1)
+		links, err := tested.(*composeService).getLinks(context.Background(), testProject, s, 1)
 		assert.NilError(t, err)
 		assert.NilError(t, err)
 
 
 		assert.Equal(t, len(links), 3)
 		assert.Equal(t, len(links), 3)
@@ -135,9 +132,8 @@ func TestServiceLinks(t *testing.T) {
 		defer mockCtrl.Finish()
 		defer mockCtrl.Finish()
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
-		tested := composeService{
-			dockerCli: cli,
-		}
+		tested, err := NewComposeService(cli)
+		assert.NilError(t, err)
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 
 
 		s.Links = []string{"db:dbname"}
 		s.Links = []string{"db:dbname"}
@@ -145,7 +141,7 @@ func TestServiceLinks(t *testing.T) {
 		c := testContainer("db", dbContainerName, false)
 		c := testContainer("db", dbContainerName, false)
 		apiClient.EXPECT().ContainerList(gomock.Any(), containerListOptions).Return([]container.Summary{c}, nil)
 		apiClient.EXPECT().ContainerList(gomock.Any(), containerListOptions).Return([]container.Summary{c}, nil)
 
 
-		links, err := tested.getLinks(context.Background(), testProject, s, 1)
+		links, err := tested.(*composeService).getLinks(context.Background(), testProject, s, 1)
 		assert.NilError(t, err)
 		assert.NilError(t, err)
 
 
 		assert.Equal(t, len(links), 3)
 		assert.Equal(t, len(links), 3)
@@ -159,9 +155,8 @@ func TestServiceLinks(t *testing.T) {
 		defer mockCtrl.Finish()
 		defer mockCtrl.Finish()
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
-		tested := composeService{
-			dockerCli: cli,
-		}
+		tested, err := NewComposeService(cli)
+		assert.NilError(t, err)
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 
 
 		s.Links = []string{"db:dbname"}
 		s.Links = []string{"db:dbname"}
@@ -170,7 +165,7 @@ func TestServiceLinks(t *testing.T) {
 		c := testContainer("db", dbContainerName, false)
 		c := testContainer("db", dbContainerName, false)
 		apiClient.EXPECT().ContainerList(gomock.Any(), containerListOptions).Return([]container.Summary{c}, nil)
 		apiClient.EXPECT().ContainerList(gomock.Any(), containerListOptions).Return([]container.Summary{c}, nil)
 
 
-		links, err := tested.getLinks(context.Background(), testProject, s, 1)
+		links, err := tested.(*composeService).getLinks(context.Background(), testProject, s, 1)
 		assert.NilError(t, err)
 		assert.NilError(t, err)
 
 
 		assert.Equal(t, len(links), 4)
 		assert.Equal(t, len(links), 4)
@@ -187,9 +182,8 @@ func TestServiceLinks(t *testing.T) {
 		defer mockCtrl.Finish()
 		defer mockCtrl.Finish()
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
-		tested := composeService{
-			dockerCli: cli,
-		}
+		tested, err := NewComposeService(cli)
+		assert.NilError(t, err)
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 
 
 		s.Links = []string{}
 		s.Links = []string{}
@@ -208,7 +202,7 @@ func TestServiceLinks(t *testing.T) {
 		}
 		}
 		apiClient.EXPECT().ContainerList(gomock.Any(), containerListOptionsOneOff).Return([]container.Summary{c}, nil)
 		apiClient.EXPECT().ContainerList(gomock.Any(), containerListOptionsOneOff).Return([]container.Summary{c}, nil)
 
 
-		links, err := tested.getLinks(context.Background(), testProject, s, 1)
+		links, err := tested.(*composeService).getLinks(context.Background(), testProject, s, 1)
 		assert.NilError(t, err)
 		assert.NilError(t, err)
 
 
 		assert.Equal(t, len(links), 3)
 		assert.Equal(t, len(links), 3)
@@ -224,9 +218,8 @@ func TestWaitDependencies(t *testing.T) {
 
 
 	apiClient := mocks.NewMockAPIClient(mockCtrl)
 	apiClient := mocks.NewMockAPIClient(mockCtrl)
 	cli := mocks.NewMockCli(mockCtrl)
 	cli := mocks.NewMockCli(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	assert.NilError(t, err)
 	cli.EXPECT().Client().Return(apiClient).AnyTimes()
 	cli.EXPECT().Client().Return(apiClient).AnyTimes()
 
 
 	t.Run("should skip dependencies with scale 0", func(t *testing.T) {
 	t.Run("should skip dependencies with scale 0", func(t *testing.T) {
@@ -240,7 +233,7 @@ func TestWaitDependencies(t *testing.T) {
 			"db":    {Condition: ServiceConditionRunningOrHealthy},
 			"db":    {Condition: ServiceConditionRunningOrHealthy},
 			"redis": {Condition: ServiceConditionRunningOrHealthy},
 			"redis": {Condition: ServiceConditionRunningOrHealthy},
 		}
 		}
-		assert.NilError(t, tested.waitDependencies(context.Background(), &project, "", dependencies, nil, 0))
+		assert.NilError(t, tested.(*composeService).waitDependencies(context.Background(), &project, "", dependencies, nil, 0))
 	})
 	})
 	t.Run("should skip dependencies with condition service_started", func(t *testing.T) {
 	t.Run("should skip dependencies with condition service_started", func(t *testing.T) {
 		dbService := types.ServiceConfig{Name: "db", Scale: intPtr(1)}
 		dbService := types.ServiceConfig{Name: "db", Scale: intPtr(1)}
@@ -253,7 +246,7 @@ func TestWaitDependencies(t *testing.T) {
 			"db":    {Condition: types.ServiceConditionStarted, Required: true},
 			"db":    {Condition: types.ServiceConditionStarted, Required: true},
 			"redis": {Condition: types.ServiceConditionStarted, Required: true},
 			"redis": {Condition: types.ServiceConditionStarted, Required: true},
 		}
 		}
-		assert.NilError(t, tested.waitDependencies(context.Background(), &project, "", dependencies, nil, 0))
+		assert.NilError(t, tested.(*composeService).waitDependencies(context.Background(), &project, "", dependencies, nil, 0))
 	})
 	})
 }
 }
 
 
@@ -263,9 +256,8 @@ func TestCreateMobyContainer(t *testing.T) {
 		defer mockCtrl.Finish()
 		defer mockCtrl.Finish()
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
-		tested := composeService{
-			dockerCli: cli,
-		}
+		tested, err := NewComposeService(cli)
+		assert.NilError(t, err)
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 		cli.EXPECT().ConfigFile().Return(&configfile.ConfigFile{}).AnyTimes()
 		cli.EXPECT().ConfigFile().Return(&configfile.ConfigFile{}).AnyTimes()
 		apiClient.EXPECT().DaemonHost().Return("").AnyTimes()
 		apiClient.EXPECT().DaemonHost().Return("").AnyTimes()
@@ -341,7 +333,7 @@ func TestCreateMobyContainer(t *testing.T) {
 				Aliases:    []string{"bork-test-0"},
 				Aliases:    []string{"bork-test-0"},
 			}))
 			}))
 
 
-		_, err := tested.createMobyContainer(context.Background(), &project, service, "test", 0, nil, createOptions{
+		_, err = tested.(*composeService).createMobyContainer(context.Background(), &project, service, "test", 0, nil, createOptions{
 			Labels: make(types.Labels),
 			Labels: make(types.Labels),
 		})
 		})
 		assert.NilError(t, err)
 		assert.NilError(t, err)
@@ -352,9 +344,8 @@ func TestCreateMobyContainer(t *testing.T) {
 		defer mockCtrl.Finish()
 		defer mockCtrl.Finish()
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		apiClient := mocks.NewMockAPIClient(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
 		cli := mocks.NewMockCli(mockCtrl)
-		tested := composeService{
-			dockerCli: cli,
-		}
+		tested, err := NewComposeService(cli)
+		assert.NilError(t, err)
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 		cli.EXPECT().Client().Return(apiClient).AnyTimes()
 		cli.EXPECT().ConfigFile().Return(&configfile.ConfigFile{}).AnyTimes()
 		cli.EXPECT().ConfigFile().Return(&configfile.ConfigFile{}).AnyTimes()
 		apiClient.EXPECT().DaemonHost().Return("").AnyTimes()
 		apiClient.EXPECT().DaemonHost().Return("").AnyTimes()
@@ -428,7 +419,7 @@ func TestCreateMobyContainer(t *testing.T) {
 				NetworkSettings: &container.NetworkSettings{},
 				NetworkSettings: &container.NetworkSettings{},
 			}, nil)
 			}, nil)
 
 
-		_, err := tested.createMobyContainer(context.Background(), &project, service, "test", 0, nil, createOptions{
+		_, err = tested.(*composeService).createMobyContainer(context.Background(), &project, service, "test", 0, nil, createOptions{
 			Labels: make(types.Labels),
 			Labels: make(types.Labels),
 		})
 		})
 		assert.NilError(t, err)
 		assert.NilError(t, err)

+ 3 - 3
pkg/compose/cp.go

@@ -45,7 +45,7 @@ const (
 func (s *composeService) Copy(ctx context.Context, projectName string, options api.CopyOptions) error {
 func (s *composeService) Copy(ctx context.Context, projectName string, options api.CopyOptions) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.copy(ctx, projectName, options)
 		return s.copy(ctx, projectName, options)
-	}, s.stdinfo(), "copy")
+	}, "copy", s.events)
 }
 }
 
 
 func (s *composeService) copy(ctx context.Context, projectName string, options api.CopyOptions) error {
 func (s *composeService) copy(ctx context.Context, projectName string, options api.CopyOptions) error {
@@ -90,7 +90,7 @@ func (s *composeService) copy(ctx context.Context, projectName string, options a
 			} else {
 			} else {
 				msg = fmt.Sprintf("copy %s to %s:%s", srcPath, name, dstPath)
 				msg = fmt.Sprintf("copy %s to %s:%s", srcPath, name, dstPath)
 			}
 			}
-			s.events(ctx, progress.Event{
+			s.events.On(progress.Event{
 				ID:         name,
 				ID:         name,
 				Text:       msg,
 				Text:       msg,
 				Status:     progress.Working,
 				Status:     progress.Working,
@@ -99,7 +99,7 @@ func (s *composeService) copy(ctx context.Context, projectName string, options a
 			if err := copyFunc(ctx, ctr.ID, srcPath, dstPath, options); err != nil {
 			if err := copyFunc(ctx, ctr.ID, srcPath, dstPath, options); err != nil {
 				return err
 				return err
 			}
 			}
-			s.events(ctx, progress.Event{
+			s.events.On(progress.Event{
 				ID:         name,
 				ID:         name,
 				Text:       msg,
 				Text:       msg,
 				Status:     progress.Done,
 				Status:     progress.Done,

+ 8 - 8
pkg/compose/create.go

@@ -63,7 +63,7 @@ type createConfigs struct {
 func (s *composeService) Create(ctx context.Context, project *types.Project, createOpts api.CreateOptions) error {
 func (s *composeService) Create(ctx context.Context, project *types.Project, createOpts api.CreateOptions) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.create(ctx, project, createOpts)
 		return s.create(ctx, project, createOpts)
-	}, s.stdinfo(), "create")
+	}, "create", s.events)
 }
 }
 
 
 func (s *composeService) create(ctx context.Context, project *types.Project, options api.CreateOptions) error {
 func (s *composeService) create(ctx context.Context, project *types.Project, options api.CreateOptions) error {
@@ -1394,14 +1394,14 @@ func (s *composeService) resolveOrCreateNetwork(ctx context.Context, project *ty
 	}
 	}
 
 
 	networkEventName := fmt.Sprintf("Network %s", n.Name)
 	networkEventName := fmt.Sprintf("Network %s", n.Name)
-	s.events(ctx, progress.CreatingEvent(networkEventName))
+	s.events.On(progress.CreatingEvent(networkEventName))
 
 
 	resp, err := s.apiClient().NetworkCreate(ctx, n.Name, createOpts)
 	resp, err := s.apiClient().NetworkCreate(ctx, n.Name, createOpts)
 	if err != nil {
 	if err != nil {
-		s.events(ctx, progress.ErrorEvent(networkEventName))
+		s.events.On(progress.ErrorEvent(networkEventName))
 		return "", fmt.Errorf("failed to create network %s: %w", n.Name, err)
 		return "", fmt.Errorf("failed to create network %s: %w", n.Name, err)
 	}
 	}
-	s.events(ctx, progress.CreatedEvent(networkEventName))
+	s.events.On(progress.CreatedEvent(networkEventName))
 
 
 	err = s.connectNetwork(ctx, n.Name, dangledContainers, nil)
 	err = s.connectNetwork(ctx, n.Name, dangledContainers, nil)
 	if err != nil {
 	if err != nil {
@@ -1443,7 +1443,7 @@ func (s *composeService) removeDivergedNetwork(ctx context.Context, project *typ
 
 
 	err = s.apiClient().NetworkRemove(ctx, n.Name)
 	err = s.apiClient().NetworkRemove(ctx, n.Name)
 	eventName := fmt.Sprintf("Network %s", n.Name)
 	eventName := fmt.Sprintf("Network %s", n.Name)
-	s.events(ctx, progress.RemovedEvent(eventName))
+	s.events.On(progress.RemovedEvent(eventName))
 	return containers, err
 	return containers, err
 }
 }
 
 
@@ -1622,7 +1622,7 @@ func (s *composeService) removeDivergedVolume(ctx context.Context, name string,
 
 
 func (s *composeService) createVolume(ctx context.Context, volume types.VolumeConfig) error {
 func (s *composeService) createVolume(ctx context.Context, volume types.VolumeConfig) error {
 	eventName := fmt.Sprintf("Volume %s", volume.Name)
 	eventName := fmt.Sprintf("Volume %s", volume.Name)
-	s.events(ctx, progress.CreatingEvent(eventName))
+	s.events.On(progress.CreatingEvent(eventName))
 	hash, err := VolumeHash(volume)
 	hash, err := VolumeHash(volume)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -1635,9 +1635,9 @@ func (s *composeService) createVolume(ctx context.Context, volume types.VolumeCo
 		DriverOpts: volume.DriverOpts,
 		DriverOpts: volume.DriverOpts,
 	})
 	})
 	if err != nil {
 	if err != nil {
-		s.events(ctx, progress.ErrorEvent(eventName))
+		s.events.On(progress.ErrorEvent(eventName))
 		return err
 		return err
 	}
 	}
-	s.events(ctx, progress.CreatedEvent(eventName))
+	s.events.On(progress.CreatedEvent(eventName))
 	return nil
 	return nil
 }
 }

+ 24 - 24
pkg/compose/down.go

@@ -40,7 +40,7 @@ type downOp func() error
 func (s *composeService) Down(ctx context.Context, projectName string, options api.DownOptions) error {
 func (s *composeService) Down(ctx context.Context, projectName string, options api.DownOptions) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.down(ctx, strings.ToLower(projectName), options)
 		return s.down(ctx, strings.ToLower(projectName), options)
-	}, s.stdinfo(), "down")
+	}, "down", s.events)
 }
 }
 
 
 func (s *composeService) down(ctx context.Context, projectName string, options api.DownOptions) error { //nolint:gocyclo
 func (s *composeService) down(ctx context.Context, projectName string, options api.DownOptions) error { //nolint:gocyclo
@@ -210,7 +210,7 @@ func (s *composeService) removeNetwork(ctx context.Context, composeNetworkName s
 	}
 	}
 
 
 	eventName := fmt.Sprintf("Network %s", name)
 	eventName := fmt.Sprintf("Network %s", name)
-	s.events(ctx, progress.RemovingEvent(eventName))
+	s.events.On(progress.RemovingEvent(eventName))
 
 
 	var found int
 	var found int
 	for _, net := range networks {
 	for _, net := range networks {
@@ -219,14 +219,14 @@ func (s *composeService) removeNetwork(ctx context.Context, composeNetworkName s
 		}
 		}
 		nw, err := s.apiClient().NetworkInspect(ctx, net.ID, network.InspectOptions{})
 		nw, err := s.apiClient().NetworkInspect(ctx, net.ID, network.InspectOptions{})
 		if errdefs.IsNotFound(err) {
 		if errdefs.IsNotFound(err) {
-			s.events(ctx, progress.NewEvent(eventName, progress.Warning, "No resource found to remove"))
+			s.events.On(progress.NewEvent(eventName, progress.Warning, "No resource found to remove"))
 			return nil
 			return nil
 		}
 		}
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
 		if len(nw.Containers) > 0 {
 		if len(nw.Containers) > 0 {
-			s.events(ctx, progress.NewEvent(eventName, progress.Warning, "Resource is still in use"))
+			s.events.On(progress.NewEvent(eventName, progress.Warning, "Resource is still in use"))
 			found++
 			found++
 			continue
 			continue
 		}
 		}
@@ -235,10 +235,10 @@ func (s *composeService) removeNetwork(ctx context.Context, composeNetworkName s
 			if errdefs.IsNotFound(err) {
 			if errdefs.IsNotFound(err) {
 				continue
 				continue
 			}
 			}
-			s.events(ctx, progress.ErrorEvent(eventName))
+			s.events.On(progress.ErrorEvent(eventName))
 			return fmt.Errorf("failed to remove network %s: %w", name, err)
 			return fmt.Errorf("failed to remove network %s: %w", name, err)
 		}
 		}
-		s.events(ctx, progress.RemovedEvent(eventName))
+		s.events.On(progress.RemovedEvent(eventName))
 		found++
 		found++
 	}
 	}
 
 
@@ -246,7 +246,7 @@ func (s *composeService) removeNetwork(ctx context.Context, composeNetworkName s
 		// in practice, it's extremely unlikely for this to ever occur, as it'd
 		// in practice, it's extremely unlikely for this to ever occur, as it'd
 		// mean the network was present when we queried at the start of this
 		// mean the network was present when we queried at the start of this
 		// method but was then deleted by something else in the interim
 		// method but was then deleted by something else in the interim
-		s.events(ctx, progress.NewEvent(eventName, progress.Warning, "No resource found to remove"))
+		s.events.On(progress.NewEvent(eventName, progress.Warning, "No resource found to remove"))
 		return nil
 		return nil
 	}
 	}
 	return nil
 	return nil
@@ -254,18 +254,18 @@ func (s *composeService) removeNetwork(ctx context.Context, composeNetworkName s
 
 
 func (s *composeService) removeImage(ctx context.Context, image string) error {
 func (s *composeService) removeImage(ctx context.Context, image string) error {
 	id := fmt.Sprintf("Image %s", image)
 	id := fmt.Sprintf("Image %s", image)
-	s.events(ctx, progress.NewEvent(id, progress.Working, "Removing"))
+	s.events.On(progress.NewEvent(id, progress.Working, "Removing"))
 	_, err := s.apiClient().ImageRemove(ctx, image, imageapi.RemoveOptions{})
 	_, err := s.apiClient().ImageRemove(ctx, image, imageapi.RemoveOptions{})
 	if err == nil {
 	if err == nil {
-		s.events(ctx, progress.NewEvent(id, progress.Done, "Removed"))
+		s.events.On(progress.NewEvent(id, progress.Done, "Removed"))
 		return nil
 		return nil
 	}
 	}
 	if errdefs.IsConflict(err) {
 	if errdefs.IsConflict(err) {
-		s.events(ctx, progress.NewEvent(id, progress.Warning, "Resource is still in use"))
+		s.events.On(progress.NewEvent(id, progress.Warning, "Resource is still in use"))
 		return nil
 		return nil
 	}
 	}
 	if errdefs.IsNotFound(err) {
 	if errdefs.IsNotFound(err) {
-		s.events(ctx, progress.NewEvent(id, progress.Done, "Warning: No resource found to remove"))
+		s.events.On(progress.NewEvent(id, progress.Done, "Warning: No resource found to remove"))
 		return nil
 		return nil
 	}
 	}
 	return err
 	return err
@@ -280,26 +280,26 @@ func (s *composeService) removeVolume(ctx context.Context, id string) error {
 		return nil
 		return nil
 	}
 	}
 
 
-	s.events(ctx, progress.NewEvent(resource, progress.Working, "Removing"))
+	s.events.On(progress.NewEvent(resource, progress.Working, "Removing"))
 	err = s.apiClient().VolumeRemove(ctx, id, true)
 	err = s.apiClient().VolumeRemove(ctx, id, true)
 	if err == nil {
 	if err == nil {
-		s.events(ctx, progress.NewEvent(resource, progress.Done, "Removed"))
+		s.events.On(progress.NewEvent(resource, progress.Done, "Removed"))
 		return nil
 		return nil
 	}
 	}
 	if errdefs.IsConflict(err) {
 	if errdefs.IsConflict(err) {
-		s.events(ctx, progress.NewEvent(resource, progress.Warning, "Resource is still in use"))
+		s.events.On(progress.NewEvent(resource, progress.Warning, "Resource is still in use"))
 		return nil
 		return nil
 	}
 	}
 	if errdefs.IsNotFound(err) {
 	if errdefs.IsNotFound(err) {
-		s.events(ctx, progress.NewEvent(resource, progress.Done, "Warning: No resource found to remove"))
+		s.events.On(progress.NewEvent(resource, progress.Done, "Warning: No resource found to remove"))
 		return nil
 		return nil
 	}
 	}
 	return err
 	return err
 }
 }
 
 
-func (s *composeService) stopContainer(ctx context.Context, service *types.ServiceConfig, ctr containerType.Summary, timeout *time.Duration, listener api.ContainerEventListener, ) error {
+func (s *composeService) stopContainer(ctx context.Context, service *types.ServiceConfig, ctr containerType.Summary, timeout *time.Duration, listener api.ContainerEventListener) error {
 	eventName := getContainerProgressName(ctr)
 	eventName := getContainerProgressName(ctr)
-	s.events(ctx, progress.StoppingEvent(eventName))
+	s.events.On(progress.StoppingEvent(eventName))
 
 
 	if service != nil {
 	if service != nil {
 		for _, hook := range service.PreStop {
 		for _, hook := range service.PreStop {
@@ -317,14 +317,14 @@ func (s *composeService) stopContainer(ctx context.Context, service *types.Servi
 	timeoutInSecond := utils.DurationSecondToInt(timeout)
 	timeoutInSecond := utils.DurationSecondToInt(timeout)
 	err := s.apiClient().ContainerStop(ctx, ctr.ID, containerType.StopOptions{Timeout: timeoutInSecond})
 	err := s.apiClient().ContainerStop(ctx, ctr.ID, containerType.StopOptions{Timeout: timeoutInSecond})
 	if err != nil {
 	if err != nil {
-		s.events(ctx, progress.ErrorMessageEvent(eventName, "Error while Stopping"))
+		s.events.On(progress.ErrorMessageEvent(eventName, "Error while Stopping"))
 		return err
 		return err
 	}
 	}
-	s.events(ctx, progress.StoppedEvent(eventName))
+	s.events.On(progress.StoppedEvent(eventName))
 	return nil
 	return nil
 }
 }
 
 
-func (s *composeService) stopContainers(ctx context.Context, serv *types.ServiceConfig, containers []containerType.Summary, timeout *time.Duration, listener api.ContainerEventListener, ) error {
+func (s *composeService) stopContainers(ctx context.Context, serv *types.ServiceConfig, containers []containerType.Summary, timeout *time.Duration, listener api.ContainerEventListener) error {
 	eg, ctx := errgroup.WithContext(ctx)
 	eg, ctx := errgroup.WithContext(ctx)
 	for _, ctr := range containers {
 	for _, ctr := range containers {
 		eg.Go(func() error {
 		eg.Go(func() error {
@@ -348,22 +348,22 @@ func (s *composeService) stopAndRemoveContainer(ctx context.Context, ctr contain
 	eventName := getContainerProgressName(ctr)
 	eventName := getContainerProgressName(ctr)
 	err := s.stopContainer(ctx, service, ctr, timeout, nil)
 	err := s.stopContainer(ctx, service, ctr, timeout, nil)
 	if errdefs.IsNotFound(err) {
 	if errdefs.IsNotFound(err) {
-		s.events(ctx, progress.RemovedEvent(eventName))
+		s.events.On(progress.RemovedEvent(eventName))
 		return nil
 		return nil
 	}
 	}
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	s.events(ctx, progress.RemovingEvent(eventName))
+	s.events.On(progress.RemovingEvent(eventName))
 	err = s.apiClient().ContainerRemove(ctx, ctr.ID, containerType.RemoveOptions{
 	err = s.apiClient().ContainerRemove(ctx, ctr.ID, containerType.RemoveOptions{
 		Force:         true,
 		Force:         true,
 		RemoveVolumes: volumes,
 		RemoveVolumes: volumes,
 	})
 	})
 	if err != nil && !errdefs.IsNotFound(err) && !errdefs.IsConflict(err) {
 	if err != nil && !errdefs.IsNotFound(err) && !errdefs.IsConflict(err) {
-		s.events(ctx, progress.ErrorMessageEvent(eventName, "Error while Removing"))
+		s.events.On(progress.ErrorMessageEvent(eventName, "Error while Removing"))
 		return err
 		return err
 	}
 	}
-	s.events(ctx, progress.RemovedEvent(eventName))
+	s.events.On(progress.RemovedEvent(eventName))
 	return nil
 	return nil
 }
 }
 
 

+ 21 - 28
pkg/compose/down_test.go

@@ -43,9 +43,8 @@ func TestDown(t *testing.T) {
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	assert.NilError(t, err)
 
 
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(false)).Return(
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(false)).Return(
 		[]container.Summary{
 		[]container.Summary{
@@ -91,7 +90,7 @@ func TestDown(t *testing.T) {
 	api.EXPECT().NetworkRemove(gomock.Any(), "abc123").Return(nil)
 	api.EXPECT().NetworkRemove(gomock.Any(), "abc123").Return(nil)
 	api.EXPECT().NetworkRemove(gomock.Any(), "def456").Return(nil)
 	api.EXPECT().NetworkRemove(gomock.Any(), "def456").Return(nil)
 
 
-	err := tested.Down(context.Background(), strings.ToLower(testProject), compose.DownOptions{})
+	err = tested.Down(context.Background(), strings.ToLower(testProject), compose.DownOptions{})
 	assert.NilError(t, err)
 	assert.NilError(t, err)
 }
 }
 
 
@@ -100,9 +99,8 @@ func TestDownWithGivenServices(t *testing.T) {
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	assert.NilError(t, err)
 
 
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(false)).Return(
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(false)).Return(
 		[]container.Summary{
 		[]container.Summary{
@@ -141,7 +139,7 @@ func TestDownWithGivenServices(t *testing.T) {
 	api.EXPECT().NetworkInspect(gomock.Any(), "abc123", gomock.Any()).Return(network.Inspect{ID: "abc123"}, nil)
 	api.EXPECT().NetworkInspect(gomock.Any(), "abc123", gomock.Any()).Return(network.Inspect{ID: "abc123"}, nil)
 	api.EXPECT().NetworkRemove(gomock.Any(), "abc123").Return(nil)
 	api.EXPECT().NetworkRemove(gomock.Any(), "abc123").Return(nil)
 
 
-	err := tested.Down(context.Background(), strings.ToLower(testProject), compose.DownOptions{
+	err = tested.Down(context.Background(), strings.ToLower(testProject), compose.DownOptions{
 		Services: []string{"service1", "not-running-service"},
 		Services: []string{"service1", "not-running-service"},
 	})
 	})
 	assert.NilError(t, err)
 	assert.NilError(t, err)
@@ -152,9 +150,8 @@ func TestDownWithSpecifiedServiceButTheServicesAreNotRunning(t *testing.T) {
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	assert.NilError(t, err)
 
 
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(false)).Return(
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(false)).Return(
 		[]container.Summary{
 		[]container.Summary{
@@ -178,7 +175,7 @@ func TestDownWithSpecifiedServiceButTheServicesAreNotRunning(t *testing.T) {
 			{ID: "def456", Name: "myProject_default", Labels: map[string]string{compose.NetworkLabel: "default"}},
 			{ID: "def456", Name: "myProject_default", Labels: map[string]string{compose.NetworkLabel: "default"}},
 		}, nil)
 		}, nil)
 
 
-	err := tested.Down(context.Background(), strings.ToLower(testProject), compose.DownOptions{
+	err = tested.Down(context.Background(), strings.ToLower(testProject), compose.DownOptions{
 		Services: []string{"not-running-service1", "not-running-service2"},
 		Services: []string{"not-running-service1", "not-running-service2"},
 	})
 	})
 	assert.NilError(t, err)
 	assert.NilError(t, err)
@@ -189,9 +186,8 @@ func TestDownRemoveOrphans(t *testing.T) {
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	assert.NilError(t, err)
 
 
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(true)).Return(
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(true)).Return(
 		[]container.Summary{
 		[]container.Summary{
@@ -231,7 +227,7 @@ func TestDownRemoveOrphans(t *testing.T) {
 	api.EXPECT().NetworkInspect(gomock.Any(), "abc123", gomock.Any()).Return(network.Inspect{ID: "abc123"}, nil)
 	api.EXPECT().NetworkInspect(gomock.Any(), "abc123", gomock.Any()).Return(network.Inspect{ID: "abc123"}, nil)
 	api.EXPECT().NetworkRemove(gomock.Any(), "abc123").Return(nil)
 	api.EXPECT().NetworkRemove(gomock.Any(), "abc123").Return(nil)
 
 
-	err := tested.Down(context.Background(), strings.ToLower(testProject), compose.DownOptions{RemoveOrphans: true})
+	err = tested.Down(context.Background(), strings.ToLower(testProject), compose.DownOptions{RemoveOrphans: true})
 	assert.NilError(t, err)
 	assert.NilError(t, err)
 }
 }
 
 
@@ -240,9 +236,8 @@ func TestDownRemoveVolumes(t *testing.T) {
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	assert.NilError(t, err)
 
 
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(false)).Return(
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(false)).Return(
 		[]container.Summary{testContainer("service1", "123", false)}, nil)
 		[]container.Summary{testContainer("service1", "123", false)}, nil)
@@ -264,7 +259,7 @@ func TestDownRemoveVolumes(t *testing.T) {
 
 
 	api.EXPECT().VolumeRemove(gomock.Any(), "myProject_volume", true).Return(nil)
 	api.EXPECT().VolumeRemove(gomock.Any(), "myProject_volume", true).Return(nil)
 
 
-	err := tested.Down(context.Background(), strings.ToLower(testProject), compose.DownOptions{Volumes: true})
+	err = tested.Down(context.Background(), strings.ToLower(testProject), compose.DownOptions{Volumes: true})
 	assert.NilError(t, err)
 	assert.NilError(t, err)
 }
 }
 
 
@@ -287,9 +282,8 @@ func TestDownRemoveImages(t *testing.T) {
 	}
 	}
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	assert.NilError(t, err)
 
 
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(false)).
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(false)).
 		Return([]container.Summary{
 		Return([]container.Summary{
@@ -352,7 +346,7 @@ func TestDownRemoveImages(t *testing.T) {
 
 
 	t.Log("-> docker compose down --rmi=local")
 	t.Log("-> docker compose down --rmi=local")
 	opts.Images = "local"
 	opts.Images = "local"
-	err := tested.Down(context.Background(), strings.ToLower(testProject), opts)
+	err = tested.Down(context.Background(), strings.ToLower(testProject), opts)
 	assert.NilError(t, err)
 	assert.NilError(t, err)
 
 
 	otherImagesToBeRemoved := []string{
 	otherImagesToBeRemoved := []string{
@@ -376,9 +370,8 @@ func TestDownRemoveImages_NoLabel(t *testing.T) {
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	assert.NilError(t, err)
 
 
 	ctr := testContainer("service1", "123", false)
 	ctr := testContainer("service1", "123", false)
 
 
@@ -413,7 +406,7 @@ func TestDownRemoveImages_NoLabel(t *testing.T) {
 
 
 	api.EXPECT().ImageRemove(gomock.Any(), "testproject-service1:latest", image.RemoveOptions{}).Return(nil, nil)
 	api.EXPECT().ImageRemove(gomock.Any(), "testproject-service1:latest", image.RemoveOptions{}).Return(nil, nil)
 
 
-	err := tested.Down(context.Background(), strings.ToLower(testProject), compose.DownOptions{Images: "local"})
+	err = tested.Down(context.Background(), strings.ToLower(testProject), compose.DownOptions{Images: "local"})
 	assert.NilError(t, err)
 	assert.NilError(t, err)
 }
 }
 
 

+ 4 - 4
pkg/compose/export.go

@@ -31,7 +31,7 @@ import (
 func (s *composeService) Export(ctx context.Context, projectName string, options api.ExportOptions) error {
 func (s *composeService) Export(ctx context.Context, projectName string, options api.ExportOptions) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.export(ctx, projectName, options)
 		return s.export(ctx, projectName, options)
-	}, s.stdinfo(), "export")
+	}, "export", s.events)
 }
 }
 
 
 func (s *composeService) export(ctx context.Context, projectName string, options api.ExportOptions) error {
 func (s *composeService) export(ctx context.Context, projectName string, options api.ExportOptions) error {
@@ -53,7 +53,7 @@ func (s *composeService) export(ctx context.Context, projectName string, options
 	name := getCanonicalContainerName(container)
 	name := getCanonicalContainerName(container)
 	msg := fmt.Sprintf("export %s to %s", name, options.Output)
 	msg := fmt.Sprintf("export %s to %s", name, options.Output)
 
 
-	s.events(ctx, progress.Event{
+	s.events.On(progress.Event{
 		ID:         name,
 		ID:         name,
 		Text:       msg,
 		Text:       msg,
 		Status:     progress.Working,
 		Status:     progress.Working,
@@ -67,7 +67,7 @@ func (s *composeService) export(ctx context.Context, projectName string, options
 
 
 	defer func() {
 	defer func() {
 		if err := responseBody.Close(); err != nil {
 		if err := responseBody.Close(); err != nil {
-			s.events(ctx, progress.Event{
+			s.events.On(progress.Event{
 				ID:         name,
 				ID:         name,
 				Text:       msg,
 				Text:       msg,
 				Status:     progress.Error,
 				Status:     progress.Error,
@@ -92,7 +92,7 @@ func (s *composeService) export(ctx context.Context, projectName string, options
 		}
 		}
 	}
 	}
 
 
-	s.events(ctx, progress.Event{
+	s.events.On(progress.Event{
 		ID:         name,
 		ID:         name,
 		Text:       msg,
 		Text:       msg,
 		Status:     progress.Done,
 		Status:     progress.Done,

+ 2 - 3
pkg/compose/images_test.go

@@ -37,9 +37,8 @@ func TestImages(t *testing.T) {
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	assert.NilError(t, err)
 
 
 	ctx := context.Background()
 	ctx := context.Background()
 	args := filters.NewArgs(projectFilter(strings.ToLower(testProject)))
 	args := filters.NewArgs(projectFilter(strings.ToLower(testProject)))

+ 4 - 4
pkg/compose/kill.go

@@ -31,7 +31,7 @@ import (
 func (s *composeService) Kill(ctx context.Context, projectName string, options api.KillOptions) error {
 func (s *composeService) Kill(ctx context.Context, projectName string, options api.KillOptions) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.kill(ctx, strings.ToLower(projectName), options)
 		return s.kill(ctx, strings.ToLower(projectName), options)
-	}, s.stdinfo(), "kill")
+	}, "kill", s.events)
 }
 }
 
 
 func (s *composeService) kill(ctx context.Context, projectName string, options api.KillOptions) error {
 func (s *composeService) kill(ctx context.Context, projectName string, options api.KillOptions) error {
@@ -63,13 +63,13 @@ func (s *composeService) kill(ctx context.Context, projectName string, options a
 	containers.forEach(func(ctr container.Summary) {
 	containers.forEach(func(ctr container.Summary) {
 		eg.Go(func() error {
 		eg.Go(func() error {
 			eventName := getContainerProgressName(ctr)
 			eventName := getContainerProgressName(ctr)
-			s.events(ctx, progress.KillingEvent(eventName))
+			s.events.On(progress.KillingEvent(eventName))
 			err := s.apiClient().ContainerKill(ctx, ctr.ID, options.Signal)
 			err := s.apiClient().ContainerKill(ctx, ctr.ID, options.Signal)
 			if err != nil {
 			if err != nil {
-				s.events(ctx, progress.ErrorMessageEvent(eventName, "Error while Killing"))
+				s.events.On(progress.ErrorMessageEvent(eventName, "Error while Killing"))
 				return err
 				return err
 			}
 			}
-			s.events(ctx, progress.KilledEvent(eventName))
+			s.events.On(progress.KilledEvent(eventName))
 			return nil
 			return nil
 		})
 		})
 	})
 	})

+ 6 - 8
pkg/compose/kill_test.go

@@ -40,9 +40,8 @@ func TestKillAll(t *testing.T) {
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	assert.NilError(t, err)
 
 
 	name := strings.ToLower(testProject)
 	name := strings.ToLower(testProject)
 
 
@@ -65,7 +64,7 @@ func TestKillAll(t *testing.T) {
 	api.EXPECT().ContainerKill(anyCancellableContext(), "456", "").Return(nil)
 	api.EXPECT().ContainerKill(anyCancellableContext(), "456", "").Return(nil)
 	api.EXPECT().ContainerKill(anyCancellableContext(), "789", "").Return(nil)
 	api.EXPECT().ContainerKill(anyCancellableContext(), "789", "").Return(nil)
 
 
-	err := tested.kill(ctx, name, compose.KillOptions{})
+	err = tested.Kill(ctx, name, compose.KillOptions{})
 	assert.NilError(t, err)
 	assert.NilError(t, err)
 }
 }
 
 
@@ -75,9 +74,8 @@ func TestKillSignal(t *testing.T) {
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	assert.NilError(t, err)
 
 
 	name := strings.ToLower(testProject)
 	name := strings.ToLower(testProject)
 	listOptions := container.ListOptions{
 	listOptions := container.ListOptions{
@@ -98,7 +96,7 @@ func TestKillSignal(t *testing.T) {
 		}, nil)
 		}, nil)
 	api.EXPECT().ContainerKill(anyCancellableContext(), "123", "SIGTERM").Return(nil)
 	api.EXPECT().ContainerKill(anyCancellableContext(), "123", "SIGTERM").Return(nil)
 
 
-	err := tested.kill(ctx, name, compose.KillOptions{Services: []string{serviceName}, Signal: "SIGTERM"})
+	err = tested.Kill(ctx, name, compose.KillOptions{Services: []string{serviceName}, Signal: "SIGTERM"})
 	assert.NilError(t, err)
 	assert.NilError(t, err)
 }
 }
 
 

+ 6 - 8
pkg/compose/logs_test.go

@@ -39,9 +39,8 @@ func TestComposeService_Logs_Demux(t *testing.T) {
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	require.NoError(t, err)
 
 
 	name := strings.ToLower(testProject)
 	name := strings.ToLower(testProject)
 
 
@@ -88,7 +87,7 @@ func TestComposeService_Logs_Demux(t *testing.T) {
 	}
 	}
 
 
 	consumer := &testLogConsumer{}
 	consumer := &testLogConsumer{}
-	err := tested.Logs(ctx, name, consumer, opts)
+	err = tested.Logs(ctx, name, consumer, opts)
 	require.NoError(t, err)
 	require.NoError(t, err)
 
 
 	require.Equal(
 	require.Equal(
@@ -110,9 +109,8 @@ func TestComposeService_Logs_ServiceFiltering(t *testing.T) {
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	require.NoError(t, err)
 
 
 	name := strings.ToLower(testProject)
 	name := strings.ToLower(testProject)
 
 
@@ -159,7 +157,7 @@ func TestComposeService_Logs_ServiceFiltering(t *testing.T) {
 	opts := compose.LogOptions{
 	opts := compose.LogOptions{
 		Project: proj,
 		Project: proj,
 	}
 	}
-	err := tested.Logs(ctx, name, consumer, opts)
+	err = tested.Logs(ctx, name, consumer, opts)
 	require.NoError(t, err)
 	require.NoError(t, err)
 
 
 	require.Equal(t, []string{"hello c1"}, consumer.LogsForContainer("c1"))
 	require.Equal(t, []string{"hello c1"}, consumer.LogsForContainer("c1"))

+ 7 - 7
pkg/compose/model.go

@@ -101,8 +101,8 @@ func (m *modelAPI) Close() {
 	m.cleanup()
 	m.cleanup()
 }
 }
 
 
-func (m *modelAPI) PullModel(ctx context.Context, model types.ModelConfig, quietPull bool, events EventBus) error {
-	events(ctx, progress.Event{
+func (m *modelAPI) PullModel(ctx context.Context, model types.ModelConfig, quietPull bool, events progress.EventProcessor) error {
+	events.On(progress.Event{
 		ID:     model.Name,
 		ID:     model.Name,
 		Status: progress.Working,
 		Status: progress.Working,
 		Text:   "Pulling",
 		Text:   "Pulling",
@@ -131,7 +131,7 @@ func (m *modelAPI) PullModel(ctx context.Context, model types.ModelConfig, quiet
 		}
 		}
 
 
 		if !quietPull {
 		if !quietPull {
-			events(ctx, progress.Event{
+			events.On(progress.Event{
 				ID:         model.Name,
 				ID:         model.Name,
 				Status:     progress.Working,
 				Status:     progress.Working,
 				Text:       "Pulling",
 				Text:       "Pulling",
@@ -142,9 +142,9 @@ func (m *modelAPI) PullModel(ctx context.Context, model types.ModelConfig, quiet
 
 
 	err = cmd.Wait()
 	err = cmd.Wait()
 	if err != nil {
 	if err != nil {
-		events(ctx, progress.ErrorMessageEvent(model.Name, err.Error()))
+		events.On(progress.ErrorMessageEvent(model.Name, err.Error()))
 	}
 	}
-	events(ctx, progress.Event{
+	events.On(progress.Event{
 		ID:     model.Name,
 		ID:     model.Name,
 		Status: progress.Working,
 		Status: progress.Working,
 		Text:   "Pulled",
 		Text:   "Pulled",
@@ -152,8 +152,8 @@ func (m *modelAPI) PullModel(ctx context.Context, model types.ModelConfig, quiet
 	return err
 	return err
 }
 }
 
 
-func (m *modelAPI) ConfigureModel(ctx context.Context, config types.ModelConfig, events EventBus) error {
-	events(ctx, progress.Event{
+func (m *modelAPI) ConfigureModel(ctx context.Context, config types.ModelConfig, events progress.EventProcessor) error {
+	events.On(progress.Event{
 		ID:     config.Name,
 		ID:     config.Name,
 		Status: progress.Working,
 		Status: progress.Working,
 		Text:   "Configuring",
 		Text:   "Configuring",

+ 4 - 4
pkg/compose/pause.go

@@ -30,7 +30,7 @@ import (
 func (s *composeService) Pause(ctx context.Context, projectName string, options api.PauseOptions) error {
 func (s *composeService) Pause(ctx context.Context, projectName string, options api.PauseOptions) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.pause(ctx, strings.ToLower(projectName), options)
 		return s.pause(ctx, strings.ToLower(projectName), options)
-	}, s.stdinfo(), "pause")
+	}, "pause", s.events)
 }
 }
 
 
 func (s *composeService) pause(ctx context.Context, projectName string, options api.PauseOptions) error {
 func (s *composeService) pause(ctx context.Context, projectName string, options api.PauseOptions) error {
@@ -49,7 +49,7 @@ func (s *composeService) pause(ctx context.Context, projectName string, options
 			err := s.apiClient().ContainerPause(ctx, container.ID)
 			err := s.apiClient().ContainerPause(ctx, container.ID)
 			if err == nil {
 			if err == nil {
 				eventName := getContainerProgressName(container)
 				eventName := getContainerProgressName(container)
-				s.events(ctx, progress.NewEvent(eventName, progress.Done, "Paused"))
+				s.events.On(progress.NewEvent(eventName, progress.Done, "Paused"))
 			}
 			}
 			return err
 			return err
 		})
 		})
@@ -60,7 +60,7 @@ func (s *composeService) pause(ctx context.Context, projectName string, options
 func (s *composeService) UnPause(ctx context.Context, projectName string, options api.PauseOptions) error {
 func (s *composeService) UnPause(ctx context.Context, projectName string, options api.PauseOptions) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.unPause(ctx, strings.ToLower(projectName), options)
 		return s.unPause(ctx, strings.ToLower(projectName), options)
-	}, s.stdinfo(), "unpause")
+	}, "unpause", s.events)
 }
 }
 
 
 func (s *composeService) unPause(ctx context.Context, projectName string, options api.PauseOptions) error {
 func (s *composeService) unPause(ctx context.Context, projectName string, options api.PauseOptions) error {
@@ -79,7 +79,7 @@ func (s *composeService) unPause(ctx context.Context, projectName string, option
 			err = s.apiClient().ContainerUnpause(ctx, ctr.ID)
 			err = s.apiClient().ContainerUnpause(ctx, ctr.ID)
 			if err == nil {
 			if err == nil {
 				eventName := getContainerProgressName(ctr)
 				eventName := getContainerProgressName(ctr)
-				s.events(ctx, progress.NewEvent(eventName, progress.Done, "Unpaused"))
+				s.events.On(progress.NewEvent(eventName, progress.Done, "Unpaused"))
 			}
 			}
 			return err
 			return err
 		})
 		})

+ 9 - 9
pkg/compose/plugins.go

@@ -66,7 +66,7 @@ func (s *composeService) runPlugin(ctx context.Context, project *types.Project,
 		return err
 		return err
 	}
 	}
 
 
-	variables, err := s.executePlugin(ctx, cmd, command, service)
+	variables, err := s.executePlugin(cmd, command, service)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -85,14 +85,14 @@ func (s *composeService) runPlugin(ctx context.Context, project *types.Project,
 	return nil
 	return nil
 }
 }
 
 
-func (s *composeService) executePlugin(ctx context.Context, cmd *exec.Cmd, command string, service types.ServiceConfig) (types.Mapping, error) {
+func (s *composeService) executePlugin(cmd *exec.Cmd, command string, service types.ServiceConfig) (types.Mapping, error) {
 	var action string
 	var action string
 	switch command {
 	switch command {
 	case "up":
 	case "up":
-		s.events(ctx, progress.CreatingEvent(service.Name))
+		s.events.On(progress.CreatingEvent(service.Name))
 		action = "create"
 		action = "create"
 	case "down":
 	case "down":
-		s.events(ctx, progress.RemovingEvent(service.Name))
+		s.events.On(progress.RemovingEvent(service.Name))
 		action = "remove"
 		action = "remove"
 	default:
 	default:
 		return nil, fmt.Errorf("unsupported plugin command: %s", command)
 		return nil, fmt.Errorf("unsupported plugin command: %s", command)
@@ -124,10 +124,10 @@ func (s *composeService) executePlugin(ctx context.Context, cmd *exec.Cmd, comma
 		}
 		}
 		switch msg.Type {
 		switch msg.Type {
 		case ErrorType:
 		case ErrorType:
-			s.events(ctx, progress.NewEvent(service.Name, progress.Error, msg.Message))
+			s.events.On(progress.NewEvent(service.Name, progress.Error, msg.Message))
 			return nil, errors.New(msg.Message)
 			return nil, errors.New(msg.Message)
 		case InfoType:
 		case InfoType:
-			s.events(ctx, progress.NewEvent(service.Name, progress.Working, msg.Message))
+			s.events.On(progress.NewEvent(service.Name, progress.Working, msg.Message))
 		case SetEnvType:
 		case SetEnvType:
 			key, val, found := strings.Cut(msg.Message, "=")
 			key, val, found := strings.Cut(msg.Message, "=")
 			if !found {
 			if !found {
@@ -143,14 +143,14 @@ func (s *composeService) executePlugin(ctx context.Context, cmd *exec.Cmd, comma
 
 
 	err = cmd.Wait()
 	err = cmd.Wait()
 	if err != nil {
 	if err != nil {
-		s.events(ctx, progress.ErrorMessageEvent(service.Name, err.Error()))
+		s.events.On(progress.ErrorMessageEvent(service.Name, err.Error()))
 		return nil, fmt.Errorf("failed to %s service provider: %s", action, err.Error())
 		return nil, fmt.Errorf("failed to %s service provider: %s", action, err.Error())
 	}
 	}
 	switch command {
 	switch command {
 	case "up":
 	case "up":
-		s.events(ctx, progress.CreatedEvent(service.Name))
+		s.events.On(progress.CreatedEvent(service.Name))
 	case "down":
 	case "down":
-		s.events(ctx, progress.RemovedEvent(service.Name))
+		s.events.On(progress.RemovedEvent(service.Name))
 	}
 	}
 	return variables, nil
 	return variables, nil
 }
 }

+ 2 - 3
pkg/compose/ps_test.go

@@ -34,9 +34,8 @@ func TestPs(t *testing.T) {
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	assert.NilError(t, err)
 
 
 	ctx := context.Background()
 	ctx := context.Background()
 	args := filters.NewArgs(projectFilter(strings.ToLower(testProject)), hasConfigHashLabel())
 	args := filters.NewArgs(projectFilter(strings.ToLower(testProject)), hasConfigHashLabel())

+ 4 - 4
pkg/compose/publish.go

@@ -45,7 +45,7 @@ import (
 func (s *composeService) Publish(ctx context.Context, project *types.Project, repository string, options api.PublishOptions) error {
 func (s *composeService) Publish(ctx context.Context, project *types.Project, repository string, options api.PublishOptions) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.publish(ctx, project, repository, options)
 		return s.publish(ctx, project, repository, options)
-	}, s.stdinfo(), "publish")
+	}, "publish", s.events)
 }
 }
 
 
 //nolint:gocyclo
 //nolint:gocyclo
@@ -71,7 +71,7 @@ func (s *composeService) publish(ctx context.Context, project *types.Project, re
 		return err
 		return err
 	}
 	}
 
 
-	s.events(ctx, progress.Event{
+	s.events.On(progress.Event{
 		ID:     repository,
 		ID:     repository,
 		Text:   "publishing",
 		Text:   "publishing",
 		Status: progress.Working,
 		Status: progress.Working,
@@ -93,7 +93,7 @@ func (s *composeService) publish(ctx context.Context, project *types.Project, re
 
 
 		descriptor, err := oci.PushManifest(ctx, resolver, named, layers, options.OCIVersion)
 		descriptor, err := oci.PushManifest(ctx, resolver, named, layers, options.OCIVersion)
 		if err != nil {
 		if err != nil {
-			s.events(ctx, progress.Event{
+			s.events.On(progress.Event{
 				ID:     repository,
 				ID:     repository,
 				Text:   "publishing",
 				Text:   "publishing",
 				Status: progress.Error,
 				Status: progress.Error,
@@ -145,7 +145,7 @@ func (s *composeService) publish(ctx context.Context, project *types.Project, re
 			}
 			}
 		}
 		}
 	}
 	}
-	s.events(ctx, progress.Event{
+	s.events.On(progress.Event{
 		ID:     repository,
 		ID:     repository,
 		Text:   "published",
 		Text:   "published",
 		Status: progress.Done,
 		Status: progress.Done,

+ 26 - 30
pkg/compose/pull.go

@@ -46,7 +46,7 @@ import (
 func (s *composeService) Pull(ctx context.Context, project *types.Project, options api.PullOptions) error {
 func (s *composeService) Pull(ctx context.Context, project *types.Project, options api.PullOptions) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.pull(ctx, project, options)
 		return s.pull(ctx, project, options)
-	}, s.stdinfo(), "pull")
+	}, "pull", s.events)
 }
 }
 
 
 func (s *composeService) pull(ctx context.Context, project *types.Project, opts api.PullOptions) error { //nolint:gocyclo
 func (s *composeService) pull(ctx context.Context, project *types.Project, opts api.PullOptions) error { //nolint:gocyclo
@@ -67,7 +67,7 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 	i := 0
 	i := 0
 	for name, service := range project.Services {
 	for name, service := range project.Services {
 		if service.Image == "" {
 		if service.Image == "" {
-			s.events(ctx, progress.Event{
+			s.events.On(progress.Event{
 				ID:     name,
 				ID:     name,
 				Status: progress.Done,
 				Status: progress.Done,
 				Text:   "Skipped - No image to be pulled",
 				Text:   "Skipped - No image to be pulled",
@@ -77,16 +77,16 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 
 
 		switch service.PullPolicy {
 		switch service.PullPolicy {
 		case types.PullPolicyNever, types.PullPolicyBuild:
 		case types.PullPolicyNever, types.PullPolicyBuild:
-			s.events(ctx, progress.Event{
-				ID:     name,
+			s.events.On(progress.Event{
+				ID:     "Image " + service.Image,
 				Status: progress.Done,
 				Status: progress.Done,
 				Text:   "Skipped",
 				Text:   "Skipped",
 			})
 			})
 			continue
 			continue
 		case types.PullPolicyMissing, types.PullPolicyIfNotPresent:
 		case types.PullPolicyMissing, types.PullPolicyIfNotPresent:
 			if imageAlreadyPresent(service.Image, images) {
 			if imageAlreadyPresent(service.Image, images) {
-				s.events(ctx, progress.Event{
-					ID:     name,
+				s.events.On(progress.Event{
+					ID:     "Image " + service.Image,
 					Status: progress.Done,
 					Status: progress.Done,
 					Text:   "Skipped - Image is already present locally",
 					Text:   "Skipped - Image is already present locally",
 				})
 				})
@@ -95,20 +95,15 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 		}
 		}
 
 
 		if service.Build != nil && opts.IgnoreBuildable {
 		if service.Build != nil && opts.IgnoreBuildable {
-			s.events(ctx, progress.Event{
-				ID:     name,
+			s.events.On(progress.Event{
+				ID:     "Image " + service.Image,
 				Status: progress.Done,
 				Status: progress.Done,
 				Text:   "Skipped - Image can be built",
 				Text:   "Skipped - Image can be built",
 			})
 			})
 			continue
 			continue
 		}
 		}
 
 
-		if img, ok := imagesBeingPulled[service.Image]; ok {
-			s.events(ctx, progress.Event{
-				ID:     name,
-				Status: progress.Done,
-				Text:   fmt.Sprintf("Skipped - Image is already being pulled by %v", img),
-			})
+		if _, ok := imagesBeingPulled[service.Image]; ok {
 			continue
 			continue
 		}
 		}
 
 
@@ -124,8 +119,8 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 				}
 				}
 				if !opts.IgnoreFailures && service.Build == nil {
 				if !opts.IgnoreFailures && service.Build == nil {
 					if s.dryRun {
 					if s.dryRun {
-						s.events(ctx, progress.Event{
-							ID:     name,
+						s.events.On(progress.Event{
+							ID:     "Image " + service.Image,
 							Status: progress.Error,
 							Status: progress.Error,
 							Text:   fmt.Sprintf(" - Pull error for image: %s", service.Image),
 							Text:   fmt.Sprintf(" - Pull error for image: %s", service.Image),
 						})
 						})
@@ -177,9 +172,10 @@ func getUnwrappedErrorMessage(err error) string {
 	return err.Error()
 	return err.Error()
 }
 }
 
 
-func (s *composeService) pullServiceImage(ctx context.Context, service types.ServiceConfig, quietPull bool, defaultPlatform string, ) (string, error) {
-	s.events(ctx, progress.Event{
-		ID:     service.Name,
+func (s *composeService) pullServiceImage(ctx context.Context, service types.ServiceConfig, quietPull bool, defaultPlatform string) (string, error) {
+	resource := "Image " + service.Image
+	s.events.On(progress.Event{
+		ID:     resource,
 		Status: progress.Working,
 		Status: progress.Working,
 		Text:   "Pulling",
 		Text:   "Pulling",
 	})
 	})
@@ -204,8 +200,8 @@ func (s *composeService) pullServiceImage(ctx context.Context, service types.Ser
 	})
 	})
 
 
 	if ctx.Err() != nil {
 	if ctx.Err() != nil {
-		s.events(ctx, progress.Event{
-			ID:         service.Name,
+		s.events.On(progress.Event{
+			ID:         resource,
 			Status:     progress.Warning,
 			Status:     progress.Warning,
 			StatusText: "Interrupted",
 			StatusText: "Interrupted",
 		})
 		})
@@ -215,8 +211,8 @@ func (s *composeService) pullServiceImage(ctx context.Context, service types.Ser
 	// check if has error and the service has a build section
 	// check if has error and the service has a build section
 	// then the status should be warning instead of error
 	// then the status should be warning instead of error
 	if err != nil && service.Build != nil {
 	if err != nil && service.Build != nil {
-		s.events(ctx, progress.Event{
-			ID:         service.Name,
+		s.events.On(progress.Event{
+			ID:         resource,
 			Status:     progress.Warning,
 			Status:     progress.Warning,
 			Text:       "Warning",
 			Text:       "Warning",
 			StatusText: getUnwrappedErrorMessage(err),
 			StatusText: getUnwrappedErrorMessage(err),
@@ -225,8 +221,8 @@ func (s *composeService) pullServiceImage(ctx context.Context, service types.Ser
 	}
 	}
 
 
 	if err != nil {
 	if err != nil {
-		s.events(ctx, progress.Event{
-			ID:         service.Name,
+		s.events.On(progress.Event{
+			ID:         resource,
 			Status:     progress.Error,
 			Status:     progress.Error,
 			Text:       "Error",
 			Text:       "Error",
 			StatusText: getUnwrappedErrorMessage(err),
 			StatusText: getUnwrappedErrorMessage(err),
@@ -247,11 +243,11 @@ func (s *composeService) pullServiceImage(ctx context.Context, service types.Ser
 			return "", errors.New(jm.Error.Message)
 			return "", errors.New(jm.Error.Message)
 		}
 		}
 		if !quietPull {
 		if !quietPull {
-			toPullProgressEvent(ctx, service.Name, jm, s.events)
+			toPullProgressEvent(resource, jm, s.events)
 		}
 		}
 	}
 	}
-	s.events(ctx, progress.Event{
-		ID:     service.Name,
+	s.events.On(progress.Event{
+		ID:     resource,
 		Status: progress.Done,
 		Status: progress.Done,
 		Text:   "Pulled",
 		Text:   "Pulled",
 	})
 	})
@@ -411,7 +407,7 @@ const (
 	PullCompletePhase      = "Pull complete"
 	PullCompletePhase      = "Pull complete"
 )
 )
 
 
-func toPullProgressEvent(ctx context.Context, parent string, jm jsonmessage.JSONMessage, events EventBus) {
+func toPullProgressEvent(parent string, jm jsonmessage.JSONMessage, events progress.EventProcessor) {
 	if jm.ID == "" || jm.Progress == nil {
 	if jm.ID == "" || jm.Progress == nil {
 		return
 		return
 	}
 	}
@@ -453,7 +449,7 @@ func toPullProgressEvent(ctx context.Context, parent string, jm jsonmessage.JSON
 		text = jm.Error.Message
 		text = jm.Error.Message
 	}
 	}
 
 
-	events(ctx, progress.Event{
+	events.On(progress.Event{
 		ID:         jm.ID,
 		ID:         jm.ID,
 		ParentID:   parent,
 		ParentID:   parent,
 		Current:    current,
 		Current:    current,

+ 9 - 9
pkg/compose/push.go

@@ -42,7 +42,7 @@ func (s *composeService) Push(ctx context.Context, project *types.Project, optio
 	}
 	}
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.push(ctx, project, options)
 		return s.push(ctx, project, options)
-	}, s.stdinfo(), "push")
+	}, "push", s.events)
 }
 }
 
 
 func (s *composeService) push(ctx context.Context, project *types.Project, options api.PushOptions) error {
 func (s *composeService) push(ctx context.Context, project *types.Project, options api.PushOptions) error {
@@ -54,7 +54,7 @@ func (s *composeService) push(ctx context.Context, project *types.Project, optio
 			if options.ImageMandatory && service.Image == "" && service.Provider == nil {
 			if options.ImageMandatory && service.Image == "" && service.Provider == nil {
 				return fmt.Errorf("%q attribute is mandatory to push an image for service %q", "service.image", service.Name)
 				return fmt.Errorf("%q attribute is mandatory to push an image for service %q", "service.image", service.Name)
 			}
 			}
-			s.events(ctx, progress.Event{
+			s.events.On(progress.Event{
 				ID:     service.Name,
 				ID:     service.Name,
 				Status: progress.Done,
 				Status: progress.Done,
 				Text:   "Skipped",
 				Text:   "Skipped",
@@ -68,16 +68,16 @@ func (s *composeService) push(ctx context.Context, project *types.Project, optio
 
 
 		for _, tag := range tags {
 		for _, tag := range tags {
 			eg.Go(func() error {
 			eg.Go(func() error {
-				s.events(ctx, progress.NewEvent(tag, progress.Working, "Pushing"))
+				s.events.On(progress.NewEvent(tag, progress.Working, "Pushing"))
 				err := s.pushServiceImage(ctx, tag, options.Quiet)
 				err := s.pushServiceImage(ctx, tag, options.Quiet)
 				if err != nil {
 				if err != nil {
 					if !options.IgnoreFailures {
 					if !options.IgnoreFailures {
-						s.events(ctx, progress.NewEvent(tag, progress.Error, err.Error()))
+						s.events.On(progress.NewEvent(tag, progress.Error, err.Error()))
 						return err
 						return err
 					}
 					}
-					s.events(ctx, progress.NewEvent(tag, progress.Warning, err.Error()))
+					s.events.On(progress.NewEvent(tag, progress.Warning, err.Error()))
 				} else {
 				} else {
-					s.events(ctx, progress.NewEvent(tag, progress.Done, "Pushed"))
+					s.events.On(progress.NewEvent(tag, progress.Done, "Pushed"))
 				}
 				}
 				return nil
 				return nil
 			})
 			})
@@ -122,14 +122,14 @@ func (s *composeService) pushServiceImage(ctx context.Context, tag string, quiet
 		}
 		}
 
 
 		if !quietPush {
 		if !quietPush {
-			toPushProgressEvent(ctx, tag, jm, s.events)
+			toPushProgressEvent(tag, jm, s.events)
 		}
 		}
 	}
 	}
 
 
 	return nil
 	return nil
 }
 }
 
 
-func toPushProgressEvent(ctx context.Context, prefix string, jm jsonmessage.JSONMessage, events EventBus) {
+func toPushProgressEvent(prefix string, jm jsonmessage.JSONMessage, events progress.EventProcessor) {
 	if jm.ID == "" {
 	if jm.ID == "" {
 		// skipped
 		// skipped
 		return
 		return
@@ -160,7 +160,7 @@ func toPushProgressEvent(ctx context.Context, prefix string, jm jsonmessage.JSON
 		}
 		}
 	}
 	}
 
 
-	events(ctx, progress.Event{
+	events.On(progress.Event{
 		ParentID:   prefix,
 		ParentID:   prefix,
 		ID:         jm.ID,
 		ID:         jm.ID,
 		Text:       jm.Status,
 		Text:       jm.Status,

+ 3 - 3
pkg/compose/remove.go

@@ -94,7 +94,7 @@ func (s *composeService) Remove(ctx context.Context, projectName string, options
 	}
 	}
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.remove(ctx, stoppedContainers, options)
 		return s.remove(ctx, stoppedContainers, options)
-	}, s.stdinfo(), "remove")
+	}, "remove", s.events)
 }
 }
 
 
 func (s *composeService) remove(ctx context.Context, containers Containers, options api.RemoveOptions) error {
 func (s *composeService) remove(ctx context.Context, containers Containers, options api.RemoveOptions) error {
@@ -102,13 +102,13 @@ func (s *composeService) remove(ctx context.Context, containers Containers, opti
 	for _, ctr := range containers {
 	for _, ctr := range containers {
 		eg.Go(func() error {
 		eg.Go(func() error {
 			eventName := getContainerProgressName(ctr)
 			eventName := getContainerProgressName(ctr)
-			s.events(ctx, progress.RemovingEvent(eventName))
+			s.events.On(progress.RemovingEvent(eventName))
 			err := s.apiClient().ContainerRemove(ctx, ctr.ID, container.RemoveOptions{
 			err := s.apiClient().ContainerRemove(ctx, ctr.ID, container.RemoveOptions{
 				RemoveVolumes: options.Volumes,
 				RemoveVolumes: options.Volumes,
 				Force:         options.Force,
 				Force:         options.Force,
 			})
 			})
 			if err == nil {
 			if err == nil {
-				s.events(ctx, progress.RemovedEvent(eventName))
+				s.events.On(progress.RemovedEvent(eventName))
 			}
 			}
 			return err
 			return err
 		})
 		})

+ 3 - 3
pkg/compose/restart.go

@@ -31,7 +31,7 @@ import (
 func (s *composeService) Restart(ctx context.Context, projectName string, options api.RestartOptions) error {
 func (s *composeService) Restart(ctx context.Context, projectName string, options api.RestartOptions) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.restart(ctx, strings.ToLower(projectName), options)
 		return s.restart(ctx, strings.ToLower(projectName), options)
-	}, s.stdinfo(), "restart")
+	}, "restart", s.events)
 }
 }
 
 
 func (s *composeService) restart(ctx context.Context, projectName string, options api.RestartOptions) error { //nolint:gocyclo
 func (s *composeService) restart(ctx context.Context, projectName string, options api.RestartOptions) error { //nolint:gocyclo
@@ -93,13 +93,13 @@ func (s *composeService) restart(ctx context.Context, projectName string, option
 					}
 					}
 				}
 				}
 				eventName := getContainerProgressName(ctr)
 				eventName := getContainerProgressName(ctr)
-				s.events(ctx, progress.RestartingEvent(eventName))
+				s.events.On(progress.RestartingEvent(eventName))
 				timeout := utils.DurationSecondToInt(options.Timeout)
 				timeout := utils.DurationSecondToInt(options.Timeout)
 				err = s.apiClient().ContainerRestart(ctx, ctr.ID, container.StopOptions{Timeout: timeout})
 				err = s.apiClient().ContainerRestart(ctx, ctr.ID, container.StopOptions{Timeout: timeout})
 				if err != nil {
 				if err != nil {
 					return err
 					return err
 				}
 				}
-				s.events(ctx, progress.StartedEvent(eventName))
+				s.events.On(progress.StartedEvent(eventName))
 				for _, hook := range def.PostStart {
 				for _, hook := range def.PostStart {
 					err = s.runHook(ctx, ctr, def, hook, nil)
 					err = s.runHook(ctx, ctr, def, hook, nil)
 					if err != nil {
 					if err != nil {

+ 1 - 1
pkg/compose/run.go

@@ -67,7 +67,7 @@ func (s *composeService) prepareRun(ctx context.Context, project *types.Project,
 
 
 	err = progress.Run(ctx, func(ctx context.Context) error {
 	err = progress.Run(ctx, func(ctx context.Context) error {
 		return s.startDependencies(ctx, project, opts)
 		return s.startDependencies(ctx, project, opts)
-	}, s.stdinfo(), "run")
+	}, "run", s.events)
 	if err != nil {
 	if err != nil {
 		return "", err
 		return "", err
 	}
 	}

+ 1 - 1
pkg/compose/scale.go

@@ -31,5 +31,5 @@ func (s *composeService) Scale(ctx context.Context, project *types.Project, opti
 			return err
 			return err
 		}
 		}
 		return s.start(ctx, project.Name, api.StartOptions{Project: project, Services: options.Services}, nil)
 		return s.start(ctx, project.Name, api.StartOptions{Project: project, Services: options.Services}, nil)
-	}), s.stdinfo(), "scale")
+	}), "scale", s.events)
 }
 }

+ 1 - 1
pkg/compose/start.go

@@ -33,7 +33,7 @@ import (
 func (s *composeService) Start(ctx context.Context, projectName string, options api.StartOptions) error {
 func (s *composeService) Start(ctx context.Context, projectName string, options api.StartOptions) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.start(ctx, strings.ToLower(projectName), options, nil)
 		return s.start(ctx, strings.ToLower(projectName), options, nil)
-	}, s.stdinfo(), "start")
+	}, "start", s.events)
 }
 }
 
 
 func (s *composeService) start(ctx context.Context, projectName string, options api.StartOptions, listener api.ContainerEventListener) error {
 func (s *composeService) start(ctx context.Context, projectName string, options api.StartOptions, listener api.ContainerEventListener) error {

+ 1 - 1
pkg/compose/stop.go

@@ -28,7 +28,7 @@ import (
 func (s *composeService) Stop(ctx context.Context, projectName string, options api.StopOptions) error {
 func (s *composeService) Stop(ctx context.Context, projectName string, options api.StopOptions) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		return s.stop(ctx, strings.ToLower(projectName), options, nil)
 		return s.stop(ctx, strings.ToLower(projectName), options, nil)
-	}, s.stdinfo(), "stop")
+	}, "stop", s.events)
 }
 }
 
 
 func (s *composeService) stop(ctx context.Context, projectName string, options api.StopOptions, event api.ContainerEventListener) error {
 func (s *composeService) stop(ctx context.Context, projectName string, options api.StopOptions, event api.ContainerEventListener) error {

+ 3 - 4
pkg/compose/stop_test.go

@@ -38,9 +38,8 @@ func TestStopTimeout(t *testing.T) {
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 
 
 	api, cli := prepareMocks(mockCtrl)
 	api, cli := prepareMocks(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	assert.NilError(t, err)
 
 
 	ctx := context.Background()
 	ctx := context.Background()
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(false)).Return(
 	api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt(false)).Return(
@@ -64,7 +63,7 @@ func TestStopTimeout(t *testing.T) {
 	api.EXPECT().ContainerStop(gomock.Any(), "456", stopConfig).Return(nil)
 	api.EXPECT().ContainerStop(gomock.Any(), "456", stopConfig).Return(nil)
 	api.EXPECT().ContainerStop(gomock.Any(), "789", stopConfig).Return(nil)
 	api.EXPECT().ContainerStop(gomock.Any(), "789", stopConfig).Return(nil)
 
 
-	err := tested.Stop(ctx, strings.ToLower(testProject), compose.StopOptions{
+	err = tested.Stop(ctx, strings.ToLower(testProject), compose.StopOptions{
 		Timeout: &timeout,
 		Timeout: &timeout,
 	})
 	})
 	assert.NilError(t, err)
 	assert.NilError(t, err)

+ 3 - 3
pkg/compose/up.go

@@ -49,7 +49,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
 			return s.start(ctx, project.Name, options.Start, nil)
 			return s.start(ctx, project.Name, options.Start, nil)
 		}
 		}
 		return nil
 		return nil
-	}), s.stdinfo(), "up")
+	}), "up", s.events)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -133,7 +133,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
 						Services: options.Create.Services,
 						Services: options.Create.Services,
 						Project:  project,
 						Project:  project,
 					}, printer.HandleEvent)
 					}, printer.HandleEvent)
-				}, s.stdinfo(), logConsumer)
+				}, "stop", s.events, logConsumer)
 				appendErr(err)
 				appendErr(err)
 				return nil
 				return nil
 			})
 			})
@@ -214,7 +214,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
 							Services: options.Create.Services,
 							Services: options.Create.Services,
 							Project:  project,
 							Project:  project,
 						}, printer.HandleEvent)
 						}, printer.HandleEvent)
-					}, s.stdinfo(), logConsumer)
+					}, "stop", s.events, logConsumer)
 					appendErr(err)
 					appendErr(err)
 					return nil
 					return nil
 				})
 				})

+ 2 - 3
pkg/compose/viz_test.go

@@ -116,9 +116,8 @@ func TestViz(t *testing.T) {
 	mockCtrl := gomock.NewController(t)
 	mockCtrl := gomock.NewController(t)
 	defer mockCtrl.Finish()
 	defer mockCtrl.Finish()
 	cli := mocks.NewMockCli(mockCtrl)
 	cli := mocks.NewMockCli(mockCtrl)
-	tested := composeService{
-		dockerCli: cli,
-	}
+	tested, err := NewComposeService(cli)
+	require.NoError(t, err)
 
 
 	ctx := context.Background()
 	ctx := context.Background()
 
 

+ 2 - 2
pkg/e2e/compose_run_test.go

@@ -187,8 +187,8 @@ func TestLocalComposeRun(t *testing.T) {
 		res.Assert(t, icmd.Success)
 		res.Assert(t, icmd.Success)
 
 
 		res = c.RunDockerComposeCmd(t, "-f", "./fixtures/run-test/pull.yaml", "run", "--pull", "always", "backend")
 		res = c.RunDockerComposeCmd(t, "-f", "./fixtures/run-test/pull.yaml", "run", "--pull", "always", "backend")
-		assert.Assert(t, strings.Contains(res.Combined(), "backend Pulling"), res.Combined())
-		assert.Assert(t, strings.Contains(res.Combined(), "backend Pulled"), res.Combined())
+		assert.Assert(t, strings.Contains(res.Combined(), "Image nginx Pulling"), res.Combined())
+		assert.Assert(t, strings.Contains(res.Combined(), "Image nginx Pulled"), res.Combined())
 	})
 	})
 
 
 	t.Run("compose run --env-from-file", func(t *testing.T) {
 	t.Run("compose run --env-from-file", func(t *testing.T) {

+ 5 - 21
pkg/e2e/pull_test.go

@@ -34,31 +34,15 @@ func TestComposePull(t *testing.T) {
 		res := c.RunDockerComposeCmd(t, "--project-directory", "fixtures/compose-pull/simple", "pull")
 		res := c.RunDockerComposeCmd(t, "--project-directory", "fixtures/compose-pull/simple", "pull")
 		output := res.Combined()
 		output := res.Combined()
 
 
-		assert.Assert(t, strings.Contains(output, "simple Pulled"))
-		assert.Assert(t, strings.Contains(output, "another Pulled"))
+		assert.Assert(t, strings.Contains(output, "Image alpine:3.14 Pulled"))
+		assert.Assert(t, strings.Contains(output, "Image alpine:3.15 Pulled"))
 
 
 		// verify default policy is 'always' for pull command
 		// verify default policy is 'always' for pull command
 		res = c.RunDockerComposeCmd(t, "--project-directory", "fixtures/compose-pull/simple", "pull")
 		res = c.RunDockerComposeCmd(t, "--project-directory", "fixtures/compose-pull/simple", "pull")
 		output = res.Combined()
 		output = res.Combined()
 
 
-		assert.Assert(t, strings.Contains(output, "simple Pulled"))
-		assert.Assert(t, strings.Contains(output, "another Pulled"))
-	})
-
-	t.Run("Verify a image is pulled once", func(t *testing.T) {
-		// cleanup existing images
-		c.RunDockerComposeCmd(t, "--project-directory", "fixtures/compose-pull/duplicate-images", "down", "--rmi", "all")
-
-		res := c.RunDockerComposeCmd(t, "--project-directory", "fixtures/compose-pull/duplicate-images", "pull")
-		output := res.Combined()
-
-		if strings.Contains(output, "another Pulled") {
-			assert.Assert(t, strings.Contains(output, "another Pulled"))
-			assert.Assert(t, strings.Contains(output, "Skipped - Image is already being pulled by another"))
-		} else {
-			assert.Assert(t, strings.Contains(output, "simple Pulled"))
-			assert.Assert(t, strings.Contains(output, "Skipped - Image is already being pulled by simple"))
-		}
+		assert.Assert(t, strings.Contains(output, "Image alpine:3.14 Pulled"))
+		assert.Assert(t, strings.Contains(output, "Image alpine:3.15 Pulled"))
 	})
 	})
 
 
 	t.Run("Verify skipped pull if image is already present locally", func(t *testing.T) {
 	t.Run("Verify skipped pull if image is already present locally", func(t *testing.T) {
@@ -68,7 +52,7 @@ func TestComposePull(t *testing.T) {
 		res := c.RunDockerComposeCmd(t, "--project-directory", "fixtures/compose-pull/image-present-locally", "pull")
 		res := c.RunDockerComposeCmd(t, "--project-directory", "fixtures/compose-pull/image-present-locally", "pull")
 		output := res.Combined()
 		output := res.Combined()
 
 
-		assert.Assert(t, strings.Contains(output, "simple Skipped - Image is already present locally"))
+		assert.Assert(t, strings.Contains(output, "alpine:3.13.12 Skipped - Image is already present locally"))
 		// image with :latest tag gets pulled regardless if pull_policy: missing or if_not_present
 		// image with :latest tag gets pulled regardless if pull_policy: missing or if_not_present
 		assert.Assert(t, strings.Contains(output, "latest Pulled"))
 		assert.Assert(t, strings.Contains(output, "latest Pulled"))
 	})
 	})

+ 12 - 0
pkg/progress/event.go

@@ -16,6 +16,8 @@
 
 
 package progress
 package progress
 
 
+import "context"
+
 // EventStatus indicates the status of an action
 // EventStatus indicates the status of an action
 type EventStatus int
 type EventStatus int
 
 
@@ -159,3 +161,13 @@ func NewEvent(id string, status EventStatus, statusText string) Event {
 		StatusText: statusText,
 		StatusText: statusText,
 	}
 	}
 }
 }
+
+// EventProcessor is notified about Compose operations and tasks
+type EventProcessor interface {
+	// Start is triggered as a Compose operation is starting with context
+	Start(ctx context.Context, operation string)
+	// On notify about (sub)task and progress processing operation
+	On(events ...Event)
+	// Done is triggered as a Compose operation completed
+	Done(operation string, success bool)
+}

+ 9 - 28
pkg/progress/json.go

@@ -23,9 +23,14 @@ import (
 	"io"
 	"io"
 )
 )
 
 
+func NewJSONWriter(out io.Writer) EventProcessor {
+	return &jsonWriter{
+		out: out,
+	}
+}
+
 type jsonWriter struct {
 type jsonWriter struct {
 	out    io.Writer
 	out    io.Writer
-	done   chan bool
 	dryRun bool
 	dryRun bool
 }
 }
 
 
@@ -41,13 +46,7 @@ type jsonMessage struct {
 	Percent  int    `json:"percent,omitempty"`
 	Percent  int    `json:"percent,omitempty"`
 }
 }
 
 
-func (p *jsonWriter) Start(ctx context.Context) error {
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-	case <-p.done:
-		return nil
-	}
+func (p *jsonWriter) Start(ctx context.Context, operation string) {
 }
 }
 
 
 func (p *jsonWriter) Event(e Event) {
 func (p *jsonWriter) Event(e Event) {
@@ -68,29 +67,11 @@ func (p *jsonWriter) Event(e Event) {
 	}
 	}
 }
 }
 
 
-func (p *jsonWriter) Events(events []Event) {
+func (p *jsonWriter) On(events ...Event) {
 	for _, e := range events {
 	for _, e := range events {
 		p.Event(e)
 		p.Event(e)
 	}
 	}
 }
 }
 
 
-func (p *jsonWriter) TailMsgf(msg string, args ...interface{}) {
-	message := &jsonMessage{
-		DryRun: p.dryRun,
-		Tail:   true,
-		ID:     "",
-		Text:   fmt.Sprintf(msg, args...),
-		Status: "",
-	}
-	marshal, err := json.Marshal(message)
-	if err == nil {
-		_, _ = fmt.Fprintln(p.out, string(marshal))
-	}
-}
-
-func (p *jsonWriter) Stop() {
-	p.done <- true
-}
-
-func (p *jsonWriter) HasMore(bool) {
+func (p *jsonWriter) Done(_ string, _ bool) {
 }
 }

+ 0 - 29
pkg/progress/json_test.go

@@ -18,7 +18,6 @@ package progress
 
 
 import (
 import (
 	"bytes"
 	"bytes"
-	"context"
 	"encoding/json"
 	"encoding/json"
 	"testing"
 	"testing"
 
 
@@ -29,7 +28,6 @@ func TestJsonWriter_Event(t *testing.T) {
 	var out bytes.Buffer
 	var out bytes.Buffer
 	w := &jsonWriter{
 	w := &jsonWriter{
 		out:    &out,
 		out:    &out,
-		done:   make(chan bool),
 		dryRun: true,
 		dryRun: true,
 	}
 	}
 
 
@@ -60,30 +58,3 @@ func TestJsonWriter_Event(t *testing.T) {
 	}
 	}
 	assert.DeepEqual(t, expected, actual)
 	assert.DeepEqual(t, expected, actual)
 }
 }
-
-func TestJsonWriter_TailMsgf(t *testing.T) {
-	var out bytes.Buffer
-	w := &jsonWriter{
-		out:    &out,
-		done:   make(chan bool),
-		dryRun: false,
-	}
-
-	go func() {
-		_ = w.Start(context.Background())
-	}()
-
-	w.TailMsgf("hello %s", "world")
-
-	w.Stop()
-
-	var actual jsonMessage
-	err := json.Unmarshal(out.Bytes(), &actual)
-	assert.NilError(t, err)
-
-	expected := jsonMessage{
-		Tail: true,
-		Text: "hello world",
-	}
-	assert.DeepEqual(t, expected, actual)
-}

+ 0 - 76
pkg/progress/mixed.go

@@ -1,76 +0,0 @@
-/*
-   Copyright 2020 Docker Compose CLI authors
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package progress
-
-import (
-	"context"
-	"fmt"
-
-	"github.com/docker/cli/cli/streams"
-	"github.com/docker/compose/v2/pkg/api"
-)
-
-// NewMixedWriter creates a Writer which allows to mix output from progress.Writer with a api.LogConsumer
-func NewMixedWriter(out *streams.Out, consumer api.LogConsumer, dryRun bool) Writer {
-	isTerminal := out.IsTerminal()
-	if Mode != ModeAuto || !isTerminal {
-		return &plainWriter{
-			out:    out,
-			done:   make(chan bool),
-			dryRun: dryRun,
-		}
-	}
-	return &mixedWriter{
-		out:    consumer,
-		done:   make(chan bool),
-		dryRun: dryRun,
-	}
-}
-
-type mixedWriter struct {
-	done   chan bool
-	dryRun bool
-	out    api.LogConsumer
-}
-
-func (p *mixedWriter) Start(ctx context.Context) error {
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-	case <-p.done:
-		return nil
-	}
-}
-
-func (p *mixedWriter) Event(e Event) {
-	p.out.Status("", fmt.Sprintf("%s %s %s", e.ID, e.Text, SuccessColor(e.StatusText)))
-}
-
-func (p *mixedWriter) Events(events []Event) {
-	for _, e := range events {
-		p.Event(e)
-	}
-}
-
-func (p *mixedWriter) TailMsgf(msg string, args ...interface{}) {
-	msg = fmt.Sprintf(msg, args...)
-	p.out.Status("", WarningColor(msg))
-}
-
-func (p *mixedWriter) Stop() {
-	p.done <- true
-}

+ 0 - 39
pkg/progress/noop.go

@@ -1,39 +0,0 @@
-/*
-   Copyright 2020 Docker Compose CLI authors
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package progress
-
-import (
-	"context"
-)
-
-type noopWriter struct{}
-
-func (p *noopWriter) Start(ctx context.Context) error {
-	return nil
-}
-
-func (p *noopWriter) Event(Event) {
-}
-
-func (p *noopWriter) Events([]Event) {
-}
-
-func (p *noopWriter) TailMsgf(_ string, _ ...interface{}) {
-}
-
-func (p *noopWriter) Stop() {
-}

+ 9 - 19
pkg/progress/plain.go

@@ -24,19 +24,18 @@ import (
 	"github.com/docker/compose/v2/pkg/api"
 	"github.com/docker/compose/v2/pkg/api"
 )
 )
 
 
+func NewPlainWriter(out io.Writer) EventProcessor {
+	return &plainWriter{
+		out: out,
+	}
+}
+
 type plainWriter struct {
 type plainWriter struct {
 	out    io.Writer
 	out    io.Writer
-	done   chan bool
 	dryRun bool
 	dryRun bool
 }
 }
 
 
-func (p *plainWriter) Start(ctx context.Context) error {
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-	case <-p.done:
-		return nil
-	}
+func (p *plainWriter) Start(ctx context.Context, operation string) {
 }
 }
 
 
 func (p *plainWriter) Event(e Event) {
 func (p *plainWriter) Event(e Event) {
@@ -47,20 +46,11 @@ func (p *plainWriter) Event(e Event) {
 	_, _ = fmt.Fprintln(p.out, prefix, e.ID, e.Text, e.StatusText)
 	_, _ = fmt.Fprintln(p.out, prefix, e.ID, e.Text, e.StatusText)
 }
 }
 
 
-func (p *plainWriter) Events(events []Event) {
+func (p *plainWriter) On(events ...Event) {
 	for _, e := range events {
 	for _, e := range events {
 		p.Event(e)
 		p.Event(e)
 	}
 	}
 }
 }
 
 
-func (p *plainWriter) TailMsgf(msg string, args ...interface{}) {
-	msg = fmt.Sprintf(msg, args...)
-	if p.dryRun {
-		msg = api.DRYRUN_PREFIX + msg
-	}
-	_, _ = fmt.Fprintln(p.out, msg)
-}
-
-func (p *plainWriter) Stop() {
-	p.done <- true
+func (p *plainWriter) Done(_ string, _ bool) {
 }
 }

+ 53 - 0
pkg/progress/progress.go

@@ -0,0 +1,53 @@
+/*
+   Copyright 2020 Docker Compose CLI authors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package progress
+
+import (
+	"context"
+
+	"github.com/docker/compose/v2/pkg/api"
+)
+
+type progressFunc func(context.Context) error
+
+func RunWithLog(ctx context.Context, pf progressFunc, operation string, bus EventProcessor, logConsumer api.LogConsumer) error {
+	// FIXME(ndeloof) re-implement support for logs during stop sequence
+	return pf(ctx)
+}
+
+func Run(ctx context.Context, pf progressFunc, operation string, bus EventProcessor) error {
+	bus.Start(ctx, operation)
+	err := pf(ctx)
+	bus.Done(operation, err != nil)
+	return err
+}
+
+const (
+	// ModeAuto detect console capabilities
+	ModeAuto = "auto"
+	// ModeTTY use terminal capability for advanced rendering
+	ModeTTY = "tty"
+	// ModePlain dump raw events to output
+	ModePlain = "plain"
+	// ModeQuiet don't display events
+	ModeQuiet = "quiet"
+	// ModeJSON outputs a machine-readable JSON stream
+	ModeJSON = "json"
+)
+
+// Mode define how progress should be rendered, either as ModePlain or ModeTTY
+var Mode = ModeAuto

+ 6 - 9
pkg/progress/quiet.go

@@ -18,20 +18,17 @@ package progress
 
 
 import "context"
 import "context"
 
 
-type quiet struct{}
-
-func (q quiet) Start(_ context.Context) error {
-	return nil
+func NewQuiedWriter() EventProcessor {
+	return &quiet{}
 }
 }
 
 
-func (q quiet) Stop() {
-}
+type quiet struct{}
 
 
-func (q quiet) Event(_ Event) {
+func (q *quiet) Start(_ context.Context, _ string) {
 }
 }
 
 
-func (q quiet) Events(_ []Event) {
+func (q *quiet) Done(_ string, _ bool) {
 }
 }
 
 
-func (q quiet) TailMsgf(_ string, _ ...interface{}) {
+func (q *quiet) On(_ ...Event) {
 }
 }

+ 58 - 45
pkg/progress/tty.go

@@ -32,6 +32,18 @@ import (
 	"github.com/morikuni/aec"
 	"github.com/morikuni/aec"
 )
 )
 
 
+// NewTTYWriter creates an EventProcessor that render advanced UI within a terminal.
+// On Start, TUI lists task with a progress timer
+func NewTTYWriter(out io.Writer) EventProcessor {
+	return &ttyWriter{
+		out:   out,
+		tasks: map[string]task{},
+		ids:   []string{},
+		done:  make(chan bool),
+		mtx:   &sync.Mutex{},
+	}
+}
+
 type ttyWriter struct {
 type ttyWriter struct {
 	out             io.Writer
 	out             io.Writer
 	tasks           map[string]task
 	tasks           map[string]task
@@ -40,10 +52,10 @@ type ttyWriter struct {
 	numLines        int
 	numLines        int
 	done            chan bool
 	done            chan bool
 	mtx             *sync.Mutex
 	mtx             *sync.Mutex
-	tailEvents      []string
-	dryRun          bool
+	dryRun          bool // FIXME(ndeloof) (re)implement support for dry-run
 	skipChildEvents bool
 	skipChildEvents bool
-	progressTitle   string
+	title           string
+	ticker          *time.Ticker
 }
 }
 
 
 type task struct {
 type task struct {
@@ -69,34 +81,40 @@ func (t *task) hasMore() {
 	t.spinner.Restart()
 	t.spinner.Restart()
 }
 }
 
 
-func (w *ttyWriter) Start(ctx context.Context) error {
-	ticker := time.NewTicker(100 * time.Millisecond)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ctx.Done():
-			w.print()
-			w.printTailEvents()
-			return ctx.Err()
-		case <-w.done:
-			w.print()
-			w.printTailEvents()
-			return nil
-		case <-ticker.C:
-			w.print()
+func (w *ttyWriter) Start(ctx context.Context, operation string) {
+	w.ticker = time.NewTicker(100 * time.Millisecond)
+	w.title = operation
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				// interrupted
+				w.ticker.Stop()
+				return
+			case <-w.done:
+				w.print()
+				w.mtx.Lock()
+				w.ticker.Stop()
+				w.title = ""
+				w.mtx.Unlock()
+				return
+			case <-w.ticker.C:
+				w.print()
+			}
 		}
 		}
-	}
+	}()
 }
 }
 
 
-func (w *ttyWriter) Stop() {
+func (w *ttyWriter) Done(operation string, success bool) {
 	w.done <- true
 	w.done <- true
 }
 }
 
 
-func (w *ttyWriter) Event(e Event) {
+func (w *ttyWriter) On(events ...Event) {
 	w.mtx.Lock()
 	w.mtx.Lock()
 	defer w.mtx.Unlock()
 	defer w.mtx.Unlock()
-	w.event(e)
+	for _, e := range events {
+		w.event(e)
+	}
 }
 }
 
 
 func (w *ttyWriter) event(e Event) {
 func (w *ttyWriter) event(e Event) {
@@ -149,32 +167,27 @@ func (w *ttyWriter) event(e Event) {
 		}
 		}
 		w.tasks[e.ID] = t
 		w.tasks[e.ID] = t
 	}
 	}
+	w.printEvent(e)
 }
 }
 
 
-func (w *ttyWriter) Events(events []Event) {
-	w.mtx.Lock()
-	defer w.mtx.Unlock()
-	for _, e := range events {
-		w.event(e)
-	}
-}
-
-func (w *ttyWriter) TailMsgf(msg string, args ...interface{}) {
-	w.mtx.Lock()
-	defer w.mtx.Unlock()
-	msgWithPrefix := msg
-	if w.dryRun {
-		msgWithPrefix = strings.TrimSpace(api.DRYRUN_PREFIX + msg)
+func (w *ttyWriter) printEvent(e Event) {
+	if w.title != "" {
+		// event will be displayed by progress UI on ticker's ticks
+		return
 	}
 	}
-	w.tailEvents = append(w.tailEvents, fmt.Sprintf(msgWithPrefix, args...))
-}
 
 
-func (w *ttyWriter) printTailEvents() {
-	w.mtx.Lock()
-	defer w.mtx.Unlock()
-	for _, msg := range w.tailEvents {
-		_, _ = fmt.Fprintln(w.out, msg)
+	var color colorFunc
+	switch e.Status {
+	case Working:
+		color = SuccessColor
+	case Done:
+		color = SuccessColor
+	case Warning:
+		color = WarningColor
+	case Error:
+		color = ErrorColor
 	}
 	}
+	_, _ = fmt.Fprintf(w.out, "%s %s %s\n", e.ID, e.Text, color(e.StatusText))
 }
 }
 
 
 func (w *ttyWriter) print() { //nolint:gocyclo
 func (w *ttyWriter) print() { //nolint:gocyclo
@@ -200,7 +213,7 @@ func (w *ttyWriter) print() { //nolint:gocyclo
 		_, _ = fmt.Fprint(w.out, aec.Show)
 		_, _ = fmt.Fprint(w.out, aec.Show)
 	}()
 	}()
 
 
-	firstLine := fmt.Sprintf("[+] %s %d/%d", w.progressTitle, numDone(w.tasks), len(w.tasks))
+	firstLine := fmt.Sprintf("[+] %s %d/%d", w.title, numDone(w.tasks), len(w.tasks))
 	if w.numLines != 0 && numDone(w.tasks) == w.numLines {
 	if w.numLines != 0 && numDone(w.tasks) == w.numLines {
 		firstLine = DoneColor(firstLine)
 		firstLine = DoneColor(firstLine)
 	}
 	}

+ 0 - 149
pkg/progress/writer.go

@@ -1,149 +0,0 @@
-/*
-   Copyright 2020 Docker Compose CLI authors
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package progress
-
-import (
-	"context"
-	"fmt"
-	"io"
-	"sync"
-
-	"github.com/docker/cli/cli/streams"
-	"golang.org/x/sync/errgroup"
-
-	"github.com/docker/compose/v2/pkg/api"
-)
-
-// Writer can write multiple progress events
-type Writer interface {
-	Start(context.Context) error
-	Stop()
-	Event(Event)
-	Events([]Event)
-	TailMsgf(string, ...interface{})
-}
-
-type writerKey struct{}
-
-// WithContextWriter adds the writer to the context
-func WithContextWriter(ctx context.Context, writer Writer) context.Context {
-	return context.WithValue(ctx, writerKey{}, writer)
-}
-
-// ContextWriter returns the writer from the context
-func ContextWriter(ctx context.Context) Writer {
-	s, ok := ctx.Value(writerKey{}).(Writer)
-	if !ok {
-		return &noopWriter{}
-	}
-	return s
-}
-
-type progressFunc func(context.Context) error
-
-func RunWithLog(ctx context.Context, pf progressFunc, out *streams.Out, logConsumer api.LogConsumer) error {
-	w := NewMixedWriter(out, logConsumer, false) // FIXME(ndeloof) re-implement dry-run
-	eg, _ := errgroup.WithContext(ctx)
-	eg.Go(func() error {
-		return w.Start(context.Background())
-	})
-	eg.Go(func() error {
-		defer w.Stop()
-		ctx = WithContextWriter(ctx, w)
-		err := pf(ctx)
-		return err
-	})
-	return eg.Wait()
-}
-
-func Run(ctx context.Context, pf progressFunc, out *streams.Out, progressTitle string) error {
-	eg, _ := errgroup.WithContext(ctx)
-	w, err := NewWriter(ctx, out, progressTitle)
-	if err != nil {
-		return err
-	}
-	eg.Go(func() error {
-		return w.Start(context.Background())
-	})
-
-	ctx = WithContextWriter(ctx, w)
-
-	eg.Go(func() error {
-		defer w.Stop()
-		err := pf(ctx)
-		return err
-	})
-	return eg.Wait()
-}
-
-const (
-	// ModeAuto detect console capabilities
-	ModeAuto = "auto"
-	// ModeTTY use terminal capability for advanced rendering
-	ModeTTY = "tty"
-	// ModePlain dump raw events to output
-	ModePlain = "plain"
-	// ModeQuiet don't display events
-	ModeQuiet = "quiet"
-	// ModeJSON outputs a machine-readable JSON stream
-	ModeJSON = "json"
-)
-
-// Mode define how progress should be rendered, either as ModePlain or ModeTTY
-var Mode = ModeAuto
-
-// NewWriter returns a new multi-progress writer
-func NewWriter(ctx context.Context, out *streams.Out, progressTitle string) (Writer, error) {
-	isTerminal := out.IsTerminal()
-	switch Mode {
-	case ModeQuiet:
-		return quiet{}, nil
-	case ModeJSON:
-		return &jsonWriter{
-			out:    out,
-			done:   make(chan bool),
-			dryRun: false, // FIXME(ndeloof) re-implement dry-run
-		}, nil
-	case ModeTTY:
-		return newTTYWriter(out, false, progressTitle)
-	case ModeAuto, "":
-		if isTerminal {
-			return newTTYWriter(out, false, progressTitle)
-		}
-		fallthrough
-	case ModePlain:
-		return &plainWriter{
-			out:    out,
-			done:   make(chan bool),
-			dryRun: false,
-		}, nil
-	}
-	return nil, fmt.Errorf("unknown progress mode: %s", Mode)
-}
-
-func newTTYWriter(out io.Writer, dryRun bool, progressTitle string) (Writer, error) {
-	return &ttyWriter{
-		out:           out,
-		ids:           []string{},
-		tasks:         map[string]task{},
-		repeated:      false,
-		done:          make(chan bool),
-		mtx:           &sync.Mutex{},
-		dryRun:        dryRun,
-		progressTitle: progressTitle,
-	}, nil
-}

+ 0 - 31
pkg/progress/writer_test.go

@@ -1,31 +0,0 @@
-/*
-   Copyright 2020 Docker Compose CLI authors
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package progress
-
-import (
-	"context"
-	"testing"
-
-	"gotest.tools/v3/assert"
-)
-
-func TestNoopWriter(t *testing.T) {
-	todo := context.TODO()
-	writer := ContextWriter(todo)
-
-	assert.Equal(t, writer, &noopWriter{})
-}