ソースを参照

use eventBus to collect tasks progress

Signed-off-by: Nicolas De Loof <[email protected]>
Nicolas De Loof 2 ヶ月 前
コミット
394466683a

+ 3 - 4
pkg/compose/build.go

@@ -199,7 +199,6 @@ func (s *composeService) build(ctx context.Context, project *types.Project, opti
 		return -1
 	}
 
-	cw := progress.ContextWriter(ctx)
 	err = InDependencyOrder(ctx, project, func(ctx context.Context, name string) error {
 		service, ok := serviceToBuild[name]
 		if !ok {
@@ -209,12 +208,12 @@ func (s *composeService) build(ctx context.Context, project *types.Project, opti
 
 		if !buildkitEnabled {
 			trace.SpanFromContext(ctx).SetAttributes(attribute.String("builder", "classic"))
-			cw.Event(progress.BuildingEvent(serviceName))
+			s.events(ctx, progress.BuildingEvent(serviceName))
 			id, err := s.doBuildClassic(ctx, project, service, options)
 			if err != nil {
 				return err
 			}
-			cw.Event(progress.BuiltEvent(serviceName))
+			s.events(ctx, progress.BuiltEvent(serviceName))
 			builtDigests[getServiceIndex(name)] = id
 
 			if options.Push {
@@ -260,7 +259,7 @@ func (s *composeService) build(ctx context.Context, project *types.Project, opti
 			service := project.Services[names[i]]
 			imageRef := api.GetImageNameOrDefault(service, project.Name)
 			imageIDs[imageRef] = imageDigest
-			cw.Event(progress.BuiltEvent(names[i]))
+			s.events(ctx, progress.BuiltEvent(names[i]))
 		}
 	}
 	return imageIDs, err

+ 8 - 10
pkg/compose/build_bake.go

@@ -340,7 +340,7 @@ func (s *composeService) doBuildBake(ctx context.Context, project *types.Project
 	logrus.Debugf("Executing bake with args: %v", args)
 
 	if s.dryRun {
-		return dryRunBake(ctx, cfg), nil
+		return s.dryRunBake(ctx, cfg), nil
 	}
 	cmd := exec.CommandContext(ctx, buildx.Path, args...)
 
@@ -417,7 +417,6 @@ func (s *composeService) doBuildBake(ctx context.Context, project *types.Project
 		return nil, err
 	}
 
-	cw := progress.ContextWriter(ctx)
 	results := map[string]string{}
 	for name := range serviceToBeBuild {
 		image := expectedImages[name]
@@ -427,7 +426,7 @@ func (s *composeService) doBuildBake(ctx context.Context, project *types.Project
 			return nil, fmt.Errorf("build result not found in Bake metadata for service %s", name)
 		}
 		results[image] = built.Digest
-		cw.Event(progress.BuiltEvent(image))
+		s.events(ctx, progress.BuiltEvent(image))
 	}
 	return results, nil
 }
@@ -565,27 +564,26 @@ func dockerFilePath(ctxName string, dockerfile string) string {
 	return dockerfile
 }
 
-func dryRunBake(ctx context.Context, cfg bakeConfig) map[string]string {
-	w := progress.ContextWriter(ctx)
+func (s composeService) dryRunBake(ctx context.Context, cfg bakeConfig) map[string]string {
 	bakeResponse := map[string]string{}
 	for name, target := range cfg.Targets {
 		dryRunUUID := fmt.Sprintf("dryRun-%x", sha1.Sum([]byte(name)))
-		displayDryRunBuildEvent(w, name, dryRunUUID, target.Tags[0])
+		s.displayDryRunBuildEvent(ctx, name, dryRunUUID, target.Tags[0])
 		bakeResponse[name] = dryRunUUID
 	}
 	for name := range bakeResponse {
-		w.Event(progress.BuiltEvent(name))
+		s.events(ctx, progress.BuiltEvent(name))
 	}
 	return bakeResponse
 }
 
-func displayDryRunBuildEvent(w progress.Writer, name string, dryRunUUID, tag string) {
-	w.Event(progress.Event{
+func (s composeService) displayDryRunBuildEvent(ctx context.Context, name, dryRunUUID, tag string) {
+	s.events(ctx, progress.Event{
 		ID:     name + " ==>",
 		Status: progress.Done,
 		Text:   fmt.Sprintf("==> writing image %s", dryRunUUID),
 	})
-	w.Event(progress.Event{
+	s.events(ctx, progress.Event{
 		ID:     name + " ==> ==>",
 		Status: progress.Done,
 		Text:   fmt.Sprintf(`naming to %s`, tag),

+ 1 - 3
pkg/compose/build_buildkit.go

@@ -30,7 +30,6 @@ import (
 	"github.com/docker/buildx/util/confutil"
 	"github.com/docker/buildx/util/dockerutil"
 	buildx "github.com/docker/buildx/util/progress"
-	"github.com/docker/compose/v2/pkg/progress"
 	"github.com/moby/buildkit/client"
 )
 
@@ -67,10 +66,9 @@ func (s *composeService) doBuildBuildkit(ctx context.Context, service string, op
 }
 
 func (s composeService) dryRunBuildResponse(ctx context.Context, name string, options build.Options) map[string]*client.SolveResponse {
-	w := progress.ContextWriter(ctx)
 	buildResponse := map[string]*client.SolveResponse{}
 	dryRunUUID := fmt.Sprintf("dryRun-%x", sha1.Sum([]byte(name)))
-	displayDryRunBuildEvent(w, name, dryRunUUID, options.Tags[0])
+	s.displayDryRunBuildEvent(ctx, name, dryRunUUID, options.Tags[0])
 	buildResponse[name] = &client.SolveResponse{ExporterResponse: map[string]string{
 		"containerimage.digest": dryRunUUID,
 	}}

+ 3 - 5
pkg/compose/commit.go

@@ -40,12 +40,10 @@ func (s *composeService) commit(ctx context.Context, projectName string, options
 		return err
 	}
 
-	w := progress.ContextWriter(ctx)
-
 	name := getCanonicalContainerName(ctr)
 	msg := fmt.Sprintf("Commit %s", name)
 
-	w.Event(progress.Event{
+	s.events(ctx, progress.Event{
 		ID:         name,
 		Text:       msg,
 		Status:     progress.Working,
@@ -53,7 +51,7 @@ func (s *composeService) commit(ctx context.Context, projectName string, options
 	})
 
 	if s.dryRun {
-		w.Event(progress.Event{
+		s.events(ctx, progress.Event{
 			ID:         name,
 			Text:       msg,
 			Status:     progress.Done,
@@ -74,7 +72,7 @@ func (s *composeService) commit(ctx context.Context, projectName string, options
 		return err
 	}
 
-	w.Event(progress.Event{
+	s.events(ctx, progress.Event{
 		ID:         name,
 		Text:       msg,
 		Status:     progress.Done,

+ 9 - 0
pkg/compose/compose.go

@@ -32,6 +32,7 @@ import (
 	"github.com/docker/cli/cli/config/configfile"
 	"github.com/docker/cli/cli/flags"
 	"github.com/docker/cli/cli/streams"
+	"github.com/docker/compose/v2/pkg/progress"
 	"github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/api/types/filters"
 	"github.com/docker/docker/api/types/network"
@@ -81,6 +82,10 @@ func NewComposeService(dockerCli command.Cli, options ...Option) (api.Compose, e
 		clock:          clockwork.NewRealClock(),
 		maxConcurrency: -1,
 		dryRun:         false,
+		events: func(ctx context.Context, e ...progress.Event) {
+			// FIXME(ndeloof) temporary during refactoring
+			progress.ContextWriter(ctx).Events(e)
+		},
 	}
 	for _, option := range options {
 		if err := option(s); err != nil {
@@ -191,10 +196,14 @@ func WithDryRun(s *composeService) error {
 
 type Prompt func(message string, defaultValue bool) (bool, error)
 
+type EventBus func(ctx context.Context, e ...progress.Event)
+
 type composeService struct {
 	dockerCli command.Cli
 	// prompt is used to interact with user and confirm actions
 	prompt Prompt
+	// eventBus collects tasks execution events
+	events EventBus
 
 	// Optional overrides for specific components (for SDK users)
 	outStream   io.Writer

+ 35 - 46
pkg/compose/convergence.go

@@ -57,7 +57,7 @@ const (
 // Cross services dependencies are managed by creating services in expected order and updating `service:xx` reference
 // when a service has converged, so dependent ones can be managed with resolved containers references.
 type convergence struct {
-	service    *composeService
+	compose    *composeService
 	services   map[string]Containers
 	networks   map[string]string
 	volumes    map[string]string
@@ -86,7 +86,7 @@ func newConvergence(services []string, state Containers, networks map[string]str
 		observedState[service] = append(observedState[service], c)
 	}
 	return &convergence{
-		service:  s,
+		compose:  s,
 		services: observedState,
 		networks: networks,
 		volumes:  volumes,
@@ -112,7 +112,7 @@ func (c *convergence) apply(ctx context.Context, project *types.Project, options
 
 func (c *convergence) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig, recreate string, inherit bool, timeout *time.Duration) error { //nolint:gocyclo
 	if service.Provider != nil {
-		return c.service.runPlugin(ctx, project, service, "up")
+		return c.compose.runPlugin(ctx, project, service, "up")
 	}
 	expected, err := getScale(service)
 	if err != nil {
@@ -159,7 +159,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
 			ctr := ctr
 			traceOpts := append(tracing.ServiceOptions(service), tracing.ContainerOptions(ctr)...)
 			eg.Go(tracing.SpanWrapFuncForErrGroup(ctx, "service/scale/down", traceOpts, func(ctx context.Context) error {
-				return c.service.stopAndRemoveContainer(ctx, ctr, &service, timeout, false)
+				return c.compose.stopAndRemoveContainer(ctx, ctr, &service, timeout, false)
 			}))
 			continue
 		}
@@ -176,7 +176,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
 
 			i, ctr := i, ctr
 			eg.Go(tracing.SpanWrapFuncForErrGroup(ctx, "container/recreate", tracing.ContainerOptions(ctr), func(ctx context.Context) error {
-				recreated, err := c.service.recreateContainer(ctx, project, service, ctr, inherit, timeout)
+				recreated, err := c.compose.recreateContainer(ctx, project, service, ctr, inherit, timeout)
 				updated[i] = recreated
 				return err
 			}))
@@ -184,18 +184,17 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
 		}
 
 		// Enforce non-diverged containers are running
-		w := progress.ContextWriter(ctx)
 		name := getContainerProgressName(ctr)
 		switch ctr.State {
 		case container.StateRunning:
-			w.Event(progress.RunningEvent(name))
+			c.compose.events(ctx, progress.RunningEvent(name))
 		case container.StateCreated:
 		case container.StateRestarting:
 		case container.StateExited:
 		default:
 			ctr := ctr
 			eg.Go(tracing.EventWrapFuncForErrGroup(ctx, "service/start", tracing.ContainerOptions(ctr), func(ctx context.Context) error {
-				return c.service.startContainer(ctx, ctr)
+				return c.compose.startContainer(ctx, ctr)
 			}))
 		}
 		updated[i] = ctr
@@ -214,7 +213,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
 				UseNetworkAliases: true,
 				Labels:            mergeLabels(service.Labels, service.CustomLabels),
 			}
-			ctr, err := c.service.createContainer(ctx, project, service, name, number, opts)
+			ctr, err := c.compose.createContainer(ctx, project, service, name, number, opts)
 			updated[actual+i] = ctr
 			return err
 		}))
@@ -234,7 +233,7 @@ func (c *convergence) stopDependentContainers(ctx context.Context, project *type
 	if len(dependents) == 0 {
 		return nil
 	}
-	err := c.service.stop(ctx, project.Name, api.StopOptions{
+	err := c.compose.stop(ctx, project.Name, api.StopOptions{
 		Services: dependents,
 		Project:  project,
 	}, nil)
@@ -454,7 +453,6 @@ func (s *composeService) waitDependencies(ctx context.Context, project *types.Pr
 		ctx = withTimeout
 	}
 	eg, _ := errgroup.WithContext(ctx)
-	w := progress.ContextWriter(ctx)
 	for dep, config := range dependencies {
 		if shouldWait, err := shouldWaitForDependency(dep, config, project); err != nil {
 			return err
@@ -463,7 +461,7 @@ func (s *composeService) waitDependencies(ctx context.Context, project *types.Pr
 		}
 
 		waitingFor := containers.filter(isService(dep), isNotOneOff)
-		w.Events(containerEvents(waitingFor, progress.Waiting))
+		s.events(ctx, containerEvents(waitingFor, progress.Waiting)...)
 		if len(waitingFor) == 0 {
 			if config.Required {
 				return fmt.Errorf("%s is missing dependency %s", dependant, dep)
@@ -486,29 +484,31 @@ func (s *composeService) waitDependencies(ctx context.Context, project *types.Pr
 					healthy, err := s.isServiceHealthy(ctx, waitingFor, true)
 					if err != nil {
 						if !config.Required {
-							w.Events(containerReasonEvents(waitingFor, progress.SkippedEvent, fmt.Sprintf("optional dependency %q is not running or is unhealthy", dep)))
+							s.events(ctx, containerReasonEvents(waitingFor, progress.SkippedEvent,
+								fmt.Sprintf("optional dependency %q is not running or is unhealthy", dep))...)
 							logrus.Warnf("optional dependency %q is not running or is unhealthy: %s", dep, err.Error())
 							return nil
 						}
 						return err
 					}
 					if healthy {
-						w.Events(containerEvents(waitingFor, progress.Healthy))
+						s.events(ctx, containerEvents(waitingFor, progress.Healthy)...)
 						return nil
 					}
 				case types.ServiceConditionHealthy:
 					healthy, err := s.isServiceHealthy(ctx, waitingFor, false)
 					if err != nil {
 						if !config.Required {
-							w.Events(containerReasonEvents(waitingFor, progress.SkippedEvent, fmt.Sprintf("optional dependency %q failed to start", dep)))
+							s.events(ctx, containerReasonEvents(waitingFor, progress.SkippedEvent,
+								fmt.Sprintf("optional dependency %q failed to start", dep))...)
 							logrus.Warnf("optional dependency %q failed to start: %s", dep, err.Error())
 							return nil
 						}
-						w.Events(containerEvents(waitingFor, progress.ErrorEvent))
+						s.events(ctx, containerEvents(waitingFor, progress.ErrorEvent)...)
 						return fmt.Errorf("dependency failed to start: %w", err)
 					}
 					if healthy {
-						w.Events(containerEvents(waitingFor, progress.Healthy))
+						s.events(ctx, containerEvents(waitingFor, progress.Healthy)...)
 						return nil
 					}
 				case types.ServiceConditionCompletedSuccessfully:
@@ -518,20 +518,21 @@ func (s *composeService) waitDependencies(ctx context.Context, project *types.Pr
 					}
 					if exited {
 						if code == 0 {
-							w.Events(containerEvents(waitingFor, progress.Exited))
+							s.events(ctx, containerEvents(waitingFor, progress.Exited)...)
 							return nil
 						}
 
 						messageSuffix := fmt.Sprintf("%q didn't complete successfully: exit %d", dep, code)
 						if !config.Required {
 							// optional -> mark as skipped & don't propagate error
-							w.Events(containerReasonEvents(waitingFor, progress.SkippedEvent, fmt.Sprintf("optional dependency %s", messageSuffix)))
+							s.events(ctx, containerReasonEvents(waitingFor, progress.SkippedEvent,
+								fmt.Sprintf("optional dependency %s", messageSuffix))...)
 							logrus.Warnf("optional dependency %s", messageSuffix)
 							return nil
 						}
 
 						msg := fmt.Sprintf("service %s", messageSuffix)
-						w.Events(containerReasonEvents(waitingFor, progress.ErrorMessageEvent, msg))
+						s.events(ctx, containerReasonEvents(waitingFor, progress.ErrorMessageEvent, msg)...)
 						return errors.New(msg)
 					}
 				default:
@@ -593,13 +594,12 @@ func nextContainerNumber(containers []container.Summary) int {
 func (s *composeService) createContainer(ctx context.Context, project *types.Project, service types.ServiceConfig,
 	name string, number int, opts createOptions,
 ) (ctr container.Summary, err error) {
-	w := progress.ContextWriter(ctx)
 	eventName := "Container " + name
-	w.Event(progress.CreatingEvent(eventName))
-	ctr, err = s.createMobyContainer(ctx, project, service, name, number, nil, opts, w)
+	s.events(ctx, progress.CreatingEvent(eventName))
+	ctr, err = s.createMobyContainer(ctx, project, service, name, number, nil, opts)
 	if err != nil {
 		if ctx.Err() == nil {
-			w.Event(progress.Event{
+			s.events(ctx, progress.Event{
 				ID:         eventName,
 				Status:     progress.Error,
 				StatusText: err.Error(),
@@ -607,19 +607,18 @@ func (s *composeService) createContainer(ctx context.Context, project *types.Pro
 		}
 		return
 	}
-	w.Event(progress.CreatedEvent(eventName))
+	s.events(ctx, progress.CreatedEvent(eventName))
 	return
 }
 
 func (s *composeService) recreateContainer(ctx context.Context, project *types.Project, service types.ServiceConfig,
 	replaced container.Summary, inherit bool, timeout *time.Duration,
 ) (created container.Summary, err error) {
-	w := progress.ContextWriter(ctx)
 	eventName := getContainerProgressName(replaced)
-	w.Event(progress.NewEvent(eventName, progress.Working, "Recreate"))
+	s.events(ctx, progress.NewEvent(eventName, progress.Working, "Recreate"))
 	defer func() {
 		if err != nil && ctx.Err() == nil {
-			w.Event(progress.Event{
+			s.events(ctx, progress.Event{
 				ID:         eventName,
 				Status:     progress.Error,
 				StatusText: err.Error(),
@@ -649,7 +648,7 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
 		UseNetworkAliases: true,
 		Labels:            mergeLabels(service.Labels, service.CustomLabels).Add(api.ContainerReplaceLabel, replacedContainerName),
 	}
-	created, err = s.createMobyContainer(ctx, project, service, tmpName, number, inherited, opts, w)
+	created, err = s.createMobyContainer(ctx, project, service, tmpName, number, inherited, opts)
 	if err != nil {
 		return created, err
 	}
@@ -670,7 +669,7 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
 		return created, err
 	}
 
-	w.Event(progress.NewEvent(eventName, progress.Done, "Recreated"))
+	s.events(ctx, progress.NewEvent(eventName, progress.Done, "Recreated"))
 	return created, err
 }
 
@@ -678,27 +677,18 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
 var startMx sync.Mutex
 
 func (s *composeService) startContainer(ctx context.Context, ctr container.Summary) error {
-	w := progress.ContextWriter(ctx)
-	w.Event(progress.NewEvent(getContainerProgressName(ctr), progress.Working, "Restart"))
+	s.events(ctx, progress.NewEvent(getContainerProgressName(ctr), progress.Working, "Restart"))
 	startMx.Lock()
 	defer startMx.Unlock()
 	err := s.apiClient().ContainerStart(ctx, ctr.ID, container.StartOptions{})
 	if err != nil {
 		return err
 	}
-	w.Event(progress.NewEvent(getContainerProgressName(ctr), progress.Done, "Restarted"))
+	s.events(ctx, progress.NewEvent(getContainerProgressName(ctr), progress.Done, "Restarted"))
 	return nil
 }
 
-func (s *composeService) createMobyContainer(ctx context.Context,
-	project *types.Project,
-	service types.ServiceConfig,
-	name string,
-	number int,
-	inherit *container.Summary,
-	opts createOptions,
-	w progress.Writer,
-) (container.Summary, error) {
+func (s *composeService) createMobyContainer(ctx context.Context, project *types.Project, service types.ServiceConfig, name string, number int, inherit *container.Summary, opts createOptions, ) (container.Summary, error) {
 	var created container.Summary
 	cfgs, err := s.getCreateConfigs(ctx, project, service, number, inherit, opts)
 	if err != nil {
@@ -723,7 +713,7 @@ func (s *composeService) createMobyContainer(ctx context.Context,
 		return created, err
 	}
 	for _, warning := range response.Warnings {
-		w.Event(progress.Event{
+		s.events(ctx, progress.Event{
 			ID:     service.Name,
 			Status: progress.Warning,
 			Text:   warning,
@@ -894,7 +884,6 @@ func (s *composeService) startService(ctx context.Context,
 		return fmt.Errorf("service %q has no container to start", service.Name)
 	}
 
-	w := progress.ContextWriter(ctx)
 	for _, ctr := range containers.filter(isService(service.Name)) {
 		if ctr.State == container.StateRunning {
 			continue
@@ -911,7 +900,7 @@ func (s *composeService) startService(ctx context.Context,
 		}
 
 		eventName := getContainerProgressName(ctr)
-		w.Event(progress.StartingEvent(eventName))
+		s.events(ctx, progress.StartingEvent(eventName))
 		err = s.apiClient().ContainerStart(ctx, ctr.ID, container.StartOptions{})
 		if err != nil {
 			return err
@@ -924,7 +913,7 @@ func (s *composeService) startService(ctx context.Context,
 			}
 		}
 
-		w.Event(progress.StartedEvent(eventName))
+		s.events(ctx, progress.StartedEvent(eventName))
 	}
 	return nil
 }

+ 2 - 2
pkg/compose/convergence_test.go

@@ -343,7 +343,7 @@ func TestCreateMobyContainer(t *testing.T) {
 
 		_, err := tested.createMobyContainer(context.Background(), &project, service, "test", 0, nil, createOptions{
 			Labels: make(types.Labels),
-		}, progress.ContextWriter(context.TODO()))
+		})
 		assert.NilError(t, err)
 	})
 
@@ -430,7 +430,7 @@ func TestCreateMobyContainer(t *testing.T) {
 
 		_, err := tested.createMobyContainer(context.Background(), &project, service, "test", 0, nil, createOptions{
 			Labels: make(types.Labels),
-		}, progress.ContextWriter(context.TODO()))
+		})
 		assert.NilError(t, err)
 	})
 }

+ 2 - 3
pkg/compose/cp.go

@@ -79,7 +79,6 @@ func (s *composeService) copy(ctx context.Context, projectName string, options a
 		return err
 	}
 
-	w := progress.ContextWriter(ctx)
 	g := errgroup.Group{}
 	for _, cont := range containers {
 		ctr := cont
@@ -91,7 +90,7 @@ func (s *composeService) copy(ctx context.Context, projectName string, options a
 			} else {
 				msg = fmt.Sprintf("copy %s to %s:%s", srcPath, name, dstPath)
 			}
-			w.Event(progress.Event{
+			s.events(ctx, progress.Event{
 				ID:         name,
 				Text:       msg,
 				Status:     progress.Working,
@@ -100,7 +99,7 @@ func (s *composeService) copy(ctx context.Context, projectName string, options a
 			if err := copyFunc(ctx, ctr.ID, srcPath, dstPath, options); err != nil {
 				return err
 			}
-			w.Event(progress.Event{
+			s.events(ctx, progress.Event{
 				ID:         name,
 				Text:       msg,
 				Status:     progress.Done,

+ 7 - 9
pkg/compose/create.go

@@ -1394,15 +1394,14 @@ func (s *composeService) resolveOrCreateNetwork(ctx context.Context, project *ty
 	}
 
 	networkEventName := fmt.Sprintf("Network %s", n.Name)
-	w := progress.ContextWriter(ctx)
-	w.Event(progress.CreatingEvent(networkEventName))
+	s.events(ctx, progress.CreatingEvent(networkEventName))
 
 	resp, err := s.apiClient().NetworkCreate(ctx, n.Name, createOpts)
 	if err != nil {
-		w.Event(progress.ErrorEvent(networkEventName))
+		s.events(ctx, progress.ErrorEvent(networkEventName))
 		return "", fmt.Errorf("failed to create network %s: %w", n.Name, err)
 	}
-	w.Event(progress.CreatedEvent(networkEventName))
+	s.events(ctx, progress.CreatedEvent(networkEventName))
 
 	err = s.connectNetwork(ctx, n.Name, dangledContainers, nil)
 	if err != nil {
@@ -1444,7 +1443,7 @@ func (s *composeService) removeDivergedNetwork(ctx context.Context, project *typ
 
 	err = s.apiClient().NetworkRemove(ctx, n.Name)
 	eventName := fmt.Sprintf("Network %s", n.Name)
-	progress.ContextWriter(ctx).Event(progress.RemovedEvent(eventName))
+	s.events(ctx, progress.RemovedEvent(eventName))
 	return containers, err
 }
 
@@ -1623,8 +1622,7 @@ func (s *composeService) removeDivergedVolume(ctx context.Context, name string,
 
 func (s *composeService) createVolume(ctx context.Context, volume types.VolumeConfig) error {
 	eventName := fmt.Sprintf("Volume %s", volume.Name)
-	w := progress.ContextWriter(ctx)
-	w.Event(progress.CreatingEvent(eventName))
+	s.events(ctx, progress.CreatingEvent(eventName))
 	hash, err := VolumeHash(volume)
 	if err != nil {
 		return err
@@ -1637,9 +1635,9 @@ func (s *composeService) createVolume(ctx context.Context, volume types.VolumeCo
 		DriverOpts: volume.DriverOpts,
 	})
 	if err != nil {
-		w.Event(progress.ErrorEvent(eventName))
+		s.events(ctx, progress.ErrorEvent(eventName))
 		return err
 	}
-	w.Event(progress.CreatedEvent(eventName))
+	s.events(ctx, progress.CreatedEvent(eventName))
 	return nil
 }

+ 37 - 47
pkg/compose/down.go

@@ -44,7 +44,6 @@ func (s *composeService) Down(ctx context.Context, projectName string, options a
 }
 
 func (s *composeService) down(ctx context.Context, projectName string, options api.DownOptions) error { //nolint:gocyclo
-	w := progress.ContextWriter(ctx)
 	resourceToRemove := false
 
 	include := oneOffExclude
@@ -102,10 +101,10 @@ func (s *composeService) down(ctx context.Context, projectName string, options a
 		}
 	}
 
-	ops := s.ensureNetworksDown(ctx, project, w)
+	ops := s.ensureNetworksDown(ctx, project)
 
 	if options.Images != "" {
-		imgOps, err := s.ensureImagesDown(ctx, project, options, w)
+		imgOps, err := s.ensureImagesDown(ctx, project, options)
 		if err != nil {
 			return err
 		}
@@ -113,7 +112,7 @@ func (s *composeService) down(ctx context.Context, projectName string, options a
 	}
 
 	if options.Volumes {
-		ops = append(ops, s.ensureVolumesDown(ctx, project, w)...)
+		ops = append(ops, s.ensureVolumesDown(ctx, project)...)
 	}
 
 	if !resourceToRemove && len(ops) == 0 {
@@ -144,7 +143,7 @@ func checkSelectedServices(options api.DownOptions, project *types.Project) ([]s
 	return services, nil
 }
 
-func (s *composeService) ensureVolumesDown(ctx context.Context, project *types.Project, w progress.Writer) []downOp {
+func (s *composeService) ensureVolumesDown(ctx context.Context, project *types.Project) []downOp {
 	var ops []downOp
 	for _, vol := range project.Volumes {
 		if vol.External {
@@ -152,14 +151,14 @@ func (s *composeService) ensureVolumesDown(ctx context.Context, project *types.P
 		}
 		volumeName := vol.Name
 		ops = append(ops, func() error {
-			return s.removeVolume(ctx, volumeName, w)
+			return s.removeVolume(ctx, volumeName)
 		})
 	}
 
 	return ops
 }
 
-func (s *composeService) ensureImagesDown(ctx context.Context, project *types.Project, options api.DownOptions, w progress.Writer) ([]downOp, error) {
+func (s *composeService) ensureImagesDown(ctx context.Context, project *types.Project, options api.DownOptions) ([]downOp, error) {
 	imagePruner := NewImagePruner(s.apiClient(), project)
 	pruneOpts := ImagePruneOptions{
 		Mode:          ImagePruneMode(options.Images),
@@ -174,13 +173,13 @@ func (s *composeService) ensureImagesDown(ctx context.Context, project *types.Pr
 	for i := range images {
 		img := images[i]
 		ops = append(ops, func() error {
-			return s.removeImage(ctx, img, w)
+			return s.removeImage(ctx, img)
 		})
 	}
 	return ops, nil
 }
 
-func (s *composeService) ensureNetworksDown(ctx context.Context, project *types.Project, w progress.Writer) []downOp {
+func (s *composeService) ensureNetworksDown(ctx context.Context, project *types.Project) []downOp {
 	var ops []downOp
 	for key, n := range project.Networks {
 		if n.External {
@@ -190,13 +189,13 @@ func (s *composeService) ensureNetworksDown(ctx context.Context, project *types.
 		networkKey := key
 		idOrName := n.Name
 		ops = append(ops, func() error {
-			return s.removeNetwork(ctx, networkKey, project.Name, idOrName, w)
+			return s.removeNetwork(ctx, networkKey, project.Name, idOrName)
 		})
 	}
 	return ops
 }
 
-func (s *composeService) removeNetwork(ctx context.Context, composeNetworkName string, projectName string, name string, w progress.Writer) error {
+func (s *composeService) removeNetwork(ctx context.Context, composeNetworkName string, projectName string, name string) error {
 	networks, err := s.apiClient().NetworkList(ctx, network.ListOptions{
 		Filters: filters.NewArgs(
 			projectFilter(projectName),
@@ -211,7 +210,7 @@ func (s *composeService) removeNetwork(ctx context.Context, composeNetworkName s
 	}
 
 	eventName := fmt.Sprintf("Network %s", name)
-	w.Event(progress.RemovingEvent(eventName))
+	s.events(ctx, progress.RemovingEvent(eventName))
 
 	var found int
 	for _, net := range networks {
@@ -220,14 +219,14 @@ func (s *composeService) removeNetwork(ctx context.Context, composeNetworkName s
 		}
 		nw, err := s.apiClient().NetworkInspect(ctx, net.ID, network.InspectOptions{})
 		if errdefs.IsNotFound(err) {
-			w.Event(progress.NewEvent(eventName, progress.Warning, "No resource found to remove"))
+			s.events(ctx, progress.NewEvent(eventName, progress.Warning, "No resource found to remove"))
 			return nil
 		}
 		if err != nil {
 			return err
 		}
 		if len(nw.Containers) > 0 {
-			w.Event(progress.NewEvent(eventName, progress.Warning, "Resource is still in use"))
+			s.events(ctx, progress.NewEvent(eventName, progress.Warning, "Resource is still in use"))
 			found++
 			continue
 		}
@@ -236,10 +235,10 @@ func (s *composeService) removeNetwork(ctx context.Context, composeNetworkName s
 			if errdefs.IsNotFound(err) {
 				continue
 			}
-			w.Event(progress.ErrorEvent(eventName))
+			s.events(ctx, progress.ErrorEvent(eventName))
 			return fmt.Errorf("failed to remove network %s: %w", name, err)
 		}
-		w.Event(progress.RemovedEvent(eventName))
+		s.events(ctx, progress.RemovedEvent(eventName))
 		found++
 	}
 
@@ -247,32 +246,32 @@ func (s *composeService) removeNetwork(ctx context.Context, composeNetworkName s
 		// in practice, it's extremely unlikely for this to ever occur, as it'd
 		// mean the network was present when we queried at the start of this
 		// method but was then deleted by something else in the interim
-		w.Event(progress.NewEvent(eventName, progress.Warning, "No resource found to remove"))
+		s.events(ctx, progress.NewEvent(eventName, progress.Warning, "No resource found to remove"))
 		return nil
 	}
 	return nil
 }
 
-func (s *composeService) removeImage(ctx context.Context, image string, w progress.Writer) error {
+func (s *composeService) removeImage(ctx context.Context, image string) error {
 	id := fmt.Sprintf("Image %s", image)
-	w.Event(progress.NewEvent(id, progress.Working, "Removing"))
+	s.events(ctx, progress.NewEvent(id, progress.Working, "Removing"))
 	_, err := s.apiClient().ImageRemove(ctx, image, imageapi.RemoveOptions{})
 	if err == nil {
-		w.Event(progress.NewEvent(id, progress.Done, "Removed"))
+		s.events(ctx, progress.NewEvent(id, progress.Done, "Removed"))
 		return nil
 	}
 	if errdefs.IsConflict(err) {
-		w.Event(progress.NewEvent(id, progress.Warning, "Resource is still in use"))
+		s.events(ctx, progress.NewEvent(id, progress.Warning, "Resource is still in use"))
 		return nil
 	}
 	if errdefs.IsNotFound(err) {
-		w.Event(progress.NewEvent(id, progress.Done, "Warning: No resource found to remove"))
+		s.events(ctx, progress.NewEvent(id, progress.Done, "Warning: No resource found to remove"))
 		return nil
 	}
 	return err
 }
 
-func (s *composeService) removeVolume(ctx context.Context, id string, w progress.Writer) error {
+func (s *composeService) removeVolume(ctx context.Context, id string) error {
 	resource := fmt.Sprintf("Volume %s", id)
 
 	_, err := s.apiClient().VolumeInspect(ctx, id)
@@ -281,30 +280,26 @@ func (s *composeService) removeVolume(ctx context.Context, id string, w progress
 		return nil
 	}
 
-	w.Event(progress.NewEvent(resource, progress.Working, "Removing"))
+	s.events(ctx, progress.NewEvent(resource, progress.Working, "Removing"))
 	err = s.apiClient().VolumeRemove(ctx, id, true)
 	if err == nil {
-		w.Event(progress.NewEvent(resource, progress.Done, "Removed"))
+		s.events(ctx, progress.NewEvent(resource, progress.Done, "Removed"))
 		return nil
 	}
 	if errdefs.IsConflict(err) {
-		w.Event(progress.NewEvent(resource, progress.Warning, "Resource is still in use"))
+		s.events(ctx, progress.NewEvent(resource, progress.Warning, "Resource is still in use"))
 		return nil
 	}
 	if errdefs.IsNotFound(err) {
-		w.Event(progress.NewEvent(resource, progress.Done, "Warning: No resource found to remove"))
+		s.events(ctx, progress.NewEvent(resource, progress.Done, "Warning: No resource found to remove"))
 		return nil
 	}
 	return err
 }
 
-func (s *composeService) stopContainer(
-	ctx context.Context, w progress.Writer,
-	service *types.ServiceConfig, ctr containerType.Summary,
-	timeout *time.Duration, listener api.ContainerEventListener,
-) error {
+func (s *composeService) stopContainer(ctx context.Context, service *types.ServiceConfig, ctr containerType.Summary, timeout *time.Duration, listener api.ContainerEventListener, ) error {
 	eventName := getContainerProgressName(ctr)
-	w.Event(progress.StoppingEvent(eventName))
+	s.events(ctx, progress.StoppingEvent(eventName))
 
 	if service != nil {
 		for _, hook := range service.PreStop {
@@ -322,22 +317,18 @@ func (s *composeService) stopContainer(
 	timeoutInSecond := utils.DurationSecondToInt(timeout)
 	err := s.apiClient().ContainerStop(ctx, ctr.ID, containerType.StopOptions{Timeout: timeoutInSecond})
 	if err != nil {
-		w.Event(progress.ErrorMessageEvent(eventName, "Error while Stopping"))
+		s.events(ctx, progress.ErrorMessageEvent(eventName, "Error while Stopping"))
 		return err
 	}
-	w.Event(progress.StoppedEvent(eventName))
+	s.events(ctx, progress.StoppedEvent(eventName))
 	return nil
 }
 
-func (s *composeService) stopContainers(
-	ctx context.Context, w progress.Writer,
-	serv *types.ServiceConfig, containers []containerType.Summary,
-	timeout *time.Duration, listener api.ContainerEventListener,
-) error {
+func (s *composeService) stopContainers(ctx context.Context, serv *types.ServiceConfig, containers []containerType.Summary, timeout *time.Duration, listener api.ContainerEventListener, ) error {
 	eg, ctx := errgroup.WithContext(ctx)
 	for _, ctr := range containers {
 		eg.Go(func() error {
-			return s.stopContainer(ctx, w, serv, ctr, timeout, listener)
+			return s.stopContainer(ctx, serv, ctr, timeout, listener)
 		})
 	}
 	return eg.Wait()
@@ -354,26 +345,25 @@ func (s *composeService) removeContainers(ctx context.Context, containers []cont
 }
 
 func (s *composeService) stopAndRemoveContainer(ctx context.Context, ctr containerType.Summary, service *types.ServiceConfig, timeout *time.Duration, volumes bool) error {
-	w := progress.ContextWriter(ctx)
 	eventName := getContainerProgressName(ctr)
-	err := s.stopContainer(ctx, w, service, ctr, timeout, nil)
+	err := s.stopContainer(ctx, service, ctr, timeout, nil)
 	if errdefs.IsNotFound(err) {
-		w.Event(progress.RemovedEvent(eventName))
+		s.events(ctx, progress.RemovedEvent(eventName))
 		return nil
 	}
 	if err != nil {
 		return err
 	}
-	w.Event(progress.RemovingEvent(eventName))
+	s.events(ctx, progress.RemovingEvent(eventName))
 	err = s.apiClient().ContainerRemove(ctx, ctr.ID, containerType.RemoveOptions{
 		Force:         true,
 		RemoveVolumes: volumes,
 	})
 	if err != nil && !errdefs.IsNotFound(err) && !errdefs.IsConflict(err) {
-		w.Event(progress.ErrorMessageEvent(eventName, "Error while Removing"))
+		s.events(ctx, progress.ErrorMessageEvent(eventName, "Error while Removing"))
 		return err
 	}
-	w.Event(progress.RemovedEvent(eventName))
+	s.events(ctx, progress.RemovedEvent(eventName))
 	return nil
 }
 

+ 3 - 5
pkg/compose/export.go

@@ -50,12 +50,10 @@ func (s *composeService) export(ctx context.Context, projectName string, options
 		return fmt.Errorf("failed to export container: %w", err)
 	}
 
-	w := progress.ContextWriter(ctx)
-
 	name := getCanonicalContainerName(container)
 	msg := fmt.Sprintf("export %s to %s", name, options.Output)
 
-	w.Event(progress.Event{
+	s.events(ctx, progress.Event{
 		ID:         name,
 		Text:       msg,
 		Status:     progress.Working,
@@ -69,7 +67,7 @@ func (s *composeService) export(ctx context.Context, projectName string, options
 
 	defer func() {
 		if err := responseBody.Close(); err != nil {
-			w.Event(progress.Event{
+			s.events(ctx, progress.Event{
 				ID:         name,
 				Text:       msg,
 				Status:     progress.Error,
@@ -94,7 +92,7 @@ func (s *composeService) export(ctx context.Context, projectName string, options
 		}
 	}
 
-	w.Event(progress.Event{
+	s.events(ctx, progress.Event{
 		ID:         name,
 		Text:       msg,
 		Status:     progress.Done,

+ 3 - 5
pkg/compose/kill.go

@@ -35,8 +35,6 @@ func (s *composeService) Kill(ctx context.Context, projectName string, options a
 }
 
 func (s *composeService) kill(ctx context.Context, projectName string, options api.KillOptions) error {
-	w := progress.ContextWriter(ctx)
-
 	services := options.Services
 
 	var containers Containers
@@ -65,13 +63,13 @@ func (s *composeService) kill(ctx context.Context, projectName string, options a
 	containers.forEach(func(ctr container.Summary) {
 		eg.Go(func() error {
 			eventName := getContainerProgressName(ctr)
-			w.Event(progress.KillingEvent(eventName))
+			s.events(ctx, progress.KillingEvent(eventName))
 			err := s.apiClient().ContainerKill(ctx, ctr.ID, options.Signal)
 			if err != nil {
-				w.Event(progress.ErrorMessageEvent(eventName, "Error while Killing"))
+				s.events(ctx, progress.ErrorMessageEvent(eventName, "Error while Killing"))
 				return err
 			}
-			w.Event(progress.KilledEvent(eventName))
+			s.events(ctx, progress.KilledEvent(eventName))
 			return nil
 		})
 	})

+ 9 - 10
pkg/compose/model.go

@@ -51,19 +51,18 @@ func (s *composeService) ensureModels(ctx context.Context, project *types.Projec
 		return api.SetModelVariables(ctx, project)
 	})
 
-	w := progress.ContextWriter(ctx)
 	for name, config := range project.Models {
 		if config.Name == "" {
 			config.Name = name
 		}
 		eg.Go(func() error {
 			if !slices.Contains(availableModels, config.Model) {
-				err = api.PullModel(ctx, config, quietPull, w)
+				err = api.PullModel(ctx, config, quietPull, s.events)
 				if err != nil {
 					return err
 				}
 			}
-			return api.ConfigureModel(ctx, config, w)
+			return api.ConfigureModel(ctx, config, s.events)
 		})
 	}
 	return eg.Wait()
@@ -102,8 +101,8 @@ func (m *modelAPI) Close() {
 	m.cleanup()
 }
 
-func (m *modelAPI) PullModel(ctx context.Context, model types.ModelConfig, quietPull bool, w progress.Writer) error {
-	w.Event(progress.Event{
+func (m *modelAPI) PullModel(ctx context.Context, model types.ModelConfig, quietPull bool, events EventBus) error {
+	events(ctx, progress.Event{
 		ID:     model.Name,
 		Status: progress.Working,
 		Text:   "Pulling",
@@ -132,7 +131,7 @@ func (m *modelAPI) PullModel(ctx context.Context, model types.ModelConfig, quiet
 		}
 
 		if !quietPull {
-			w.Event(progress.Event{
+			events(ctx, progress.Event{
 				ID:         model.Name,
 				Status:     progress.Working,
 				Text:       "Pulling",
@@ -143,9 +142,9 @@ func (m *modelAPI) PullModel(ctx context.Context, model types.ModelConfig, quiet
 
 	err = cmd.Wait()
 	if err != nil {
-		w.Event(progress.ErrorMessageEvent(model.Name, err.Error()))
+		events(ctx, progress.ErrorMessageEvent(model.Name, err.Error()))
 	}
-	w.Event(progress.Event{
+	events(ctx, progress.Event{
 		ID:     model.Name,
 		Status: progress.Working,
 		Text:   "Pulled",
@@ -153,8 +152,8 @@ func (m *modelAPI) PullModel(ctx context.Context, model types.ModelConfig, quiet
 	return err
 }
 
-func (m *modelAPI) ConfigureModel(ctx context.Context, config types.ModelConfig, w progress.Writer) error {
-	w.Event(progress.Event{
+func (m *modelAPI) ConfigureModel(ctx context.Context, config types.ModelConfig, events EventBus) error {
+	events(ctx, progress.Event{
 		ID:     config.Name,
 		Status: progress.Working,
 		Text:   "Configuring",

+ 2 - 4
pkg/compose/pause.go

@@ -43,14 +43,13 @@ func (s *composeService) pause(ctx context.Context, projectName string, options
 		containers = containers.filter(isService(options.Project.ServiceNames()...))
 	}
 
-	w := progress.ContextWriter(ctx)
 	eg, ctx := errgroup.WithContext(ctx)
 	containers.forEach(func(container container.Summary) {
 		eg.Go(func() error {
 			err := s.apiClient().ContainerPause(ctx, container.ID)
 			if err == nil {
 				eventName := getContainerProgressName(container)
-				w.Event(progress.NewEvent(eventName, progress.Done, "Paused"))
+				s.events(ctx, progress.NewEvent(eventName, progress.Done, "Paused"))
 			}
 			return err
 		})
@@ -74,14 +73,13 @@ func (s *composeService) unPause(ctx context.Context, projectName string, option
 		containers = containers.filter(isService(options.Project.ServiceNames()...))
 	}
 
-	w := progress.ContextWriter(ctx)
 	eg, ctx := errgroup.WithContext(ctx)
 	containers.forEach(func(ctr container.Summary) {
 		eg.Go(func() error {
 			err = s.apiClient().ContainerUnpause(ctx, ctr.ID)
 			if err == nil {
 				eventName := getContainerProgressName(ctr)
-				w.Event(progress.NewEvent(eventName, progress.Done, "Unpaused"))
+				s.events(ctx, progress.NewEvent(eventName, progress.Done, "Unpaused"))
 			}
 			return err
 		})

+ 7 - 8
pkg/compose/plugins.go

@@ -86,14 +86,13 @@ func (s *composeService) runPlugin(ctx context.Context, project *types.Project,
 }
 
 func (s *composeService) executePlugin(ctx context.Context, cmd *exec.Cmd, command string, service types.ServiceConfig) (types.Mapping, error) {
-	pw := progress.ContextWriter(ctx)
 	var action string
 	switch command {
 	case "up":
-		pw.Event(progress.CreatingEvent(service.Name))
+		s.events(ctx, progress.CreatingEvent(service.Name))
 		action = "create"
 	case "down":
-		pw.Event(progress.RemovingEvent(service.Name))
+		s.events(ctx, progress.RemovingEvent(service.Name))
 		action = "remove"
 	default:
 		return nil, fmt.Errorf("unsupported plugin command: %s", command)
@@ -125,10 +124,10 @@ func (s *composeService) executePlugin(ctx context.Context, cmd *exec.Cmd, comma
 		}
 		switch msg.Type {
 		case ErrorType:
-			pw.Event(progress.NewEvent(service.Name, progress.Error, msg.Message))
+			s.events(ctx, progress.NewEvent(service.Name, progress.Error, msg.Message))
 			return nil, errors.New(msg.Message)
 		case InfoType:
-			pw.Event(progress.NewEvent(service.Name, progress.Working, msg.Message))
+			s.events(ctx, progress.NewEvent(service.Name, progress.Working, msg.Message))
 		case SetEnvType:
 			key, val, found := strings.Cut(msg.Message, "=")
 			if !found {
@@ -144,14 +143,14 @@ func (s *composeService) executePlugin(ctx context.Context, cmd *exec.Cmd, comma
 
 	err = cmd.Wait()
 	if err != nil {
-		pw.Event(progress.ErrorMessageEvent(service.Name, err.Error()))
+		s.events(ctx, progress.ErrorMessageEvent(service.Name, err.Error()))
 		return nil, fmt.Errorf("failed to %s service provider: %s", action, err.Error())
 	}
 	switch command {
 	case "up":
-		pw.Event(progress.CreatedEvent(service.Name))
+		s.events(ctx, progress.CreatedEvent(service.Name))
 	case "down":
-		pw.Event(progress.RemovedEvent(service.Name))
+		s.events(ctx, progress.RemovedEvent(service.Name))
 	}
 	return variables, nil
 }

+ 3 - 4
pkg/compose/publish.go

@@ -71,8 +71,7 @@ func (s *composeService) publish(ctx context.Context, project *types.Project, re
 		return err
 	}
 
-	w := progress.ContextWriter(ctx)
-	w.Event(progress.Event{
+	s.events(ctx, progress.Event{
 		ID:     repository,
 		Text:   "publishing",
 		Status: progress.Working,
@@ -94,7 +93,7 @@ func (s *composeService) publish(ctx context.Context, project *types.Project, re
 
 		descriptor, err := oci.PushManifest(ctx, resolver, named, layers, options.OCIVersion)
 		if err != nil {
-			w.Event(progress.Event{
+			s.events(ctx, progress.Event{
 				ID:     repository,
 				Text:   "publishing",
 				Status: progress.Error,
@@ -146,7 +145,7 @@ func (s *composeService) publish(ctx context.Context, project *types.Project, re
 			}
 		}
 	}
-	w.Event(progress.Event{
+	s.events(ctx, progress.Event{
 		ID:     repository,
 		Text:   "published",
 		Status: progress.Done,

+ 21 - 22
pkg/compose/pull.go

@@ -35,6 +35,7 @@ import (
 	"github.com/docker/docker/client"
 	"github.com/docker/docker/pkg/jsonmessage"
 	"github.com/opencontainers/go-digest"
+	"github.com/sirupsen/logrus"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/docker/compose/v2/internal/registry"
@@ -54,7 +55,6 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 		return err
 	}
 
-	w := progress.ContextWriter(ctx)
 	eg, ctx := errgroup.WithContext(ctx)
 	eg.SetLimit(s.maxConcurrency)
 
@@ -67,7 +67,7 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 	i := 0
 	for name, service := range project.Services {
 		if service.Image == "" {
-			w.Event(progress.Event{
+			s.events(ctx, progress.Event{
 				ID:     name,
 				Status: progress.Done,
 				Text:   "Skipped - No image to be pulled",
@@ -77,7 +77,7 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 
 		switch service.PullPolicy {
 		case types.PullPolicyNever, types.PullPolicyBuild:
-			w.Event(progress.Event{
+			s.events(ctx, progress.Event{
 				ID:     name,
 				Status: progress.Done,
 				Text:   "Skipped",
@@ -85,7 +85,7 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 			continue
 		case types.PullPolicyMissing, types.PullPolicyIfNotPresent:
 			if imageAlreadyPresent(service.Image, images) {
-				w.Event(progress.Event{
+				s.events(ctx, progress.Event{
 					ID:     name,
 					Status: progress.Done,
 					Text:   "Skipped - Image is already present locally",
@@ -95,7 +95,7 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 		}
 
 		if service.Build != nil && opts.IgnoreBuildable {
-			w.Event(progress.Event{
+			s.events(ctx, progress.Event{
 				ID:     name,
 				Status: progress.Done,
 				Text:   "Skipped - Image can be built",
@@ -103,11 +103,11 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 			continue
 		}
 
-		if s, ok := imagesBeingPulled[service.Image]; ok {
-			w.Event(progress.Event{
+		if img, ok := imagesBeingPulled[service.Image]; ok {
+			s.events(ctx, progress.Event{
 				ID:     name,
 				Status: progress.Done,
-				Text:   fmt.Sprintf("Skipped - Image is already being pulled by %v", s),
+				Text:   fmt.Sprintf("Skipped - Image is already being pulled by %v", img),
 			})
 			continue
 		}
@@ -116,7 +116,7 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 
 		idx := i
 		eg.Go(func() error {
-			_, err := s.pullServiceImage(ctx, service, w, opts.Quiet, project.Environment["DOCKER_DEFAULT_PLATFORM"])
+			_, err := s.pullServiceImage(ctx, service, opts.Quiet, project.Environment["DOCKER_DEFAULT_PLATFORM"])
 			if err != nil {
 				pullErrors[idx] = err
 				if service.Build != nil {
@@ -124,7 +124,7 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 				}
 				if !opts.IgnoreFailures && service.Build == nil {
 					if s.dryRun {
-						w.Event(progress.Event{
+						s.events(ctx, progress.Event{
 							ID:     name,
 							Status: progress.Error,
 							Text:   fmt.Sprintf(" - Pull error for image: %s", service.Image),
@@ -142,7 +142,7 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 	err = eg.Wait()
 
 	if len(mustBuild) > 0 {
-		w.TailMsgf("WARNING: Some service image(s) must be built from source by running:\n    docker compose build %s", strings.Join(mustBuild, " "))
+		logrus.Warnf("WARNING: Some service image(s) must be built from source by running:\n    docker compose build %s", strings.Join(mustBuild, " "))
 	}
 
 	if err != nil {
@@ -177,8 +177,8 @@ func getUnwrappedErrorMessage(err error) string {
 	return err.Error()
 }
 
-func (s *composeService) pullServiceImage(ctx context.Context, service types.ServiceConfig, w progress.Writer, quietPull bool, defaultPlatform string) (string, error) {
-	w.Event(progress.Event{
+func (s *composeService) pullServiceImage(ctx context.Context, service types.ServiceConfig, quietPull bool, defaultPlatform string, ) (string, error) {
+	s.events(ctx, progress.Event{
 		ID:     service.Name,
 		Status: progress.Working,
 		Text:   "Pulling",
@@ -204,7 +204,7 @@ func (s *composeService) pullServiceImage(ctx context.Context, service types.Ser
 	})
 
 	if ctx.Err() != nil {
-		w.Event(progress.Event{
+		s.events(ctx, progress.Event{
 			ID:         service.Name,
 			Status:     progress.Warning,
 			StatusText: "Interrupted",
@@ -215,7 +215,7 @@ func (s *composeService) pullServiceImage(ctx context.Context, service types.Ser
 	// check if has error and the service has a build section
 	// then the status should be warning instead of error
 	if err != nil && service.Build != nil {
-		w.Event(progress.Event{
+		s.events(ctx, progress.Event{
 			ID:         service.Name,
 			Status:     progress.Warning,
 			Text:       "Warning",
@@ -225,7 +225,7 @@ func (s *composeService) pullServiceImage(ctx context.Context, service types.Ser
 	}
 
 	if err != nil {
-		w.Event(progress.Event{
+		s.events(ctx, progress.Event{
 			ID:         service.Name,
 			Status:     progress.Error,
 			Text:       "Error",
@@ -247,10 +247,10 @@ func (s *composeService) pullServiceImage(ctx context.Context, service types.Ser
 			return "", errors.New(jm.Error.Message)
 		}
 		if !quietPull {
-			toPullProgressEvent(service.Name, jm, w)
+			toPullProgressEvent(ctx, service.Name, jm, s.events)
 		}
 	}
-	w.Event(progress.Event{
+	s.events(ctx, progress.Event{
 		ID:     service.Name,
 		Status: progress.Done,
 		Text:   "Pulled",
@@ -321,14 +321,13 @@ func (s *composeService) pullRequiredImages(ctx context.Context, project *types.
 	}
 
 	return progress.Run(ctx, func(ctx context.Context) error {
-		w := progress.ContextWriter(ctx)
 		eg, ctx := errgroup.WithContext(ctx)
 		eg.SetLimit(s.maxConcurrency)
 		pulledImages := map[string]api.ImageSummary{}
 		var mutex sync.Mutex
 		for name, service := range needPull {
 			eg.Go(func() error {
-				id, err := s.pullServiceImage(ctx, service, w, quietPull, project.Environment["DOCKER_DEFAULT_PLATFORM"])
+				id, err := s.pullServiceImage(ctx, service, quietPull, project.Environment["DOCKER_DEFAULT_PLATFORM"])
 				mutex.Lock()
 				defer mutex.Unlock()
 				pulledImages[name] = api.ImageSummary{
@@ -414,7 +413,7 @@ const (
 	PullCompletePhase      = "Pull complete"
 )
 
-func toPullProgressEvent(parent string, jm jsonmessage.JSONMessage, w progress.Writer) {
+func toPullProgressEvent(ctx context.Context, parent string, jm jsonmessage.JSONMessage, events EventBus) {
 	if jm.ID == "" || jm.Progress == nil {
 		return
 	}
@@ -456,7 +455,7 @@ func toPullProgressEvent(parent string, jm jsonmessage.JSONMessage, w progress.W
 		text = jm.Error.Message
 	}
 
-	w.Event(progress.Event{
+	events(ctx, progress.Event{
 		ID:         jm.ID,
 		ParentID:   parent,
 		Current:    current,

+ 13 - 9
pkg/compose/push.go

@@ -49,13 +49,12 @@ func (s *composeService) push(ctx context.Context, project *types.Project, optio
 	eg, ctx := errgroup.WithContext(ctx)
 	eg.SetLimit(s.maxConcurrency)
 
-	w := progress.ContextWriter(ctx)
 	for _, service := range project.Services {
 		if service.Build == nil || service.Image == "" {
 			if options.ImageMandatory && service.Image == "" && service.Provider == nil {
 				return fmt.Errorf("%q attribute is mandatory to push an image for service %q", "service.image", service.Name)
 			}
-			w.Event(progress.Event{
+			s.events(ctx, progress.Event{
 				ID:     service.Name,
 				Status: progress.Done,
 				Text:   "Skipped",
@@ -69,12 +68,16 @@ func (s *composeService) push(ctx context.Context, project *types.Project, optio
 
 		for _, tag := range tags {
 			eg.Go(func() error {
-				err := s.pushServiceImage(ctx, tag, w, options.Quiet)
+				s.events(ctx, progress.NewEvent(tag, progress.Working, "Pushing"))
+				err := s.pushServiceImage(ctx, tag, options.Quiet)
 				if err != nil {
 					if !options.IgnoreFailures {
+						s.events(ctx, progress.NewEvent(tag, progress.Error, err.Error()))
 						return err
 					}
-					w.TailMsgf("Pushing %s: %s", service.Name, err.Error())
+					s.events(ctx, progress.NewEvent(tag, progress.Warning, err.Error()))
+				} else {
+					s.events(ctx, progress.NewEvent(tag, progress.Done, "Pushed"))
 				}
 				return nil
 			})
@@ -83,7 +86,7 @@ func (s *composeService) push(ctx context.Context, project *types.Project, optio
 	return eg.Wait()
 }
 
-func (s *composeService) pushServiceImage(ctx context.Context, tag string, w progress.Writer, quietPush bool) error {
+func (s *composeService) pushServiceImage(ctx context.Context, tag string, quietPush bool) error {
 	ref, err := reference.ParseNormalizedNamed(tag)
 	if err != nil {
 		return err
@@ -119,14 +122,14 @@ func (s *composeService) pushServiceImage(ctx context.Context, tag string, w pro
 		}
 
 		if !quietPush {
-			toPushProgressEvent(tag, jm, w)
+			toPushProgressEvent(ctx, tag, jm, s.events)
 		}
 	}
 
 	return nil
 }
 
-func toPushProgressEvent(prefix string, jm jsonmessage.JSONMessage, w progress.Writer) {
+func toPushProgressEvent(ctx context.Context, prefix string, jm jsonmessage.JSONMessage, events EventBus) {
 	if jm.ID == "" {
 		// skipped
 		return
@@ -157,8 +160,9 @@ func toPushProgressEvent(prefix string, jm jsonmessage.JSONMessage, w progress.W
 		}
 	}
 
-	w.Event(progress.Event{
-		ID:         fmt.Sprintf("Pushing %s: %s", prefix, jm.ID),
+	events(ctx, progress.Event{
+		ParentID:   prefix,
+		ID:         jm.ID,
 		Text:       jm.Status,
 		Status:     status,
 		Current:    current,

+ 2 - 3
pkg/compose/remove.go

@@ -98,18 +98,17 @@ func (s *composeService) Remove(ctx context.Context, projectName string, options
 }
 
 func (s *composeService) remove(ctx context.Context, containers Containers, options api.RemoveOptions) error {
-	w := progress.ContextWriter(ctx)
 	eg, ctx := errgroup.WithContext(ctx)
 	for _, ctr := range containers {
 		eg.Go(func() error {
 			eventName := getContainerProgressName(ctr)
-			w.Event(progress.RemovingEvent(eventName))
+			s.events(ctx, progress.RemovingEvent(eventName))
 			err := s.apiClient().ContainerRemove(ctx, ctr.ID, container.RemoveOptions{
 				RemoveVolumes: options.Volumes,
 				Force:         options.Force,
 			})
 			if err == nil {
-				w.Event(progress.RemovedEvent(eventName))
+				s.events(ctx, progress.RemovedEvent(eventName))
 			}
 			return err
 		})

+ 2 - 3
pkg/compose/restart.go

@@ -75,7 +75,6 @@ func (s *composeService) restart(ctx context.Context, projectName string, option
 		}
 	}
 
-	w := progress.ContextWriter(ctx)
 	return InDependencyOrder(ctx, project, func(c context.Context, service string) error {
 		config := project.Services[service]
 		err = s.waitDependencies(ctx, project, service, config.DependsOn, containers, 0)
@@ -94,13 +93,13 @@ func (s *composeService) restart(ctx context.Context, projectName string, option
 					}
 				}
 				eventName := getContainerProgressName(ctr)
-				w.Event(progress.RestartingEvent(eventName))
+				s.events(ctx, progress.RestartingEvent(eventName))
 				timeout := utils.DurationSecondToInt(options.Timeout)
 				err = s.apiClient().ContainerRestart(ctx, ctr.ID, container.StopOptions{Timeout: timeout})
 				if err != nil {
 					return err
 				}
-				w.Event(progress.StartedEvent(eventName))
+				s.events(ctx, progress.StartedEvent(eventName))
 				for _, hook := range def.PostStart {
 					err = s.runHook(ctx, ctr, def, hook, nil)
 					if err != nil {

+ 1 - 2
pkg/compose/stop.go

@@ -49,12 +49,11 @@ func (s *composeService) stop(ctx context.Context, projectName string, options a
 		options.Services = project.ServiceNames()
 	}
 
-	w := progress.ContextWriter(ctx)
 	return InReverseDependencyOrder(ctx, project, func(c context.Context, service string) error {
 		if !slices.Contains(options.Services, service) {
 			return nil
 		}
 		serv := project.Services[service]
-		return s.stopContainers(ctx, w, &serv, containers.filter(isService(service)).filter(isNotOneOff), options.Timeout, event)
+		return s.stopContainers(ctx, &serv, containers.filter(isService(service)).filter(isNotOneOff), options.Timeout, event)
 	})
 }