Pārlūkot izejas kodu

introduce --parallel to limit concurrent engine calls

Signed-off-by: Nicolas De Loof <[email protected]>
Nicolas De Loof 2 gadi atpakaļ
vecāks
revīzija
a0acc20d88

+ 9 - 4
cmd/compose/compose.go

@@ -263,10 +263,11 @@ func RootCommand(dockerCli command.Cli, backend api.Service) *cobra.Command {
 
 	opts := projectOptions{}
 	var (
-		ansi    string
-		noAnsi  bool
-		verbose bool
-		version bool
+		ansi     string
+		noAnsi   bool
+		verbose  bool
+		version  bool
+		parallel int
 	)
 	c := &cobra.Command{
 		Short:            "Docker Compose",
@@ -325,6 +326,9 @@ func RootCommand(dockerCli command.Cli, backend api.Service) *cobra.Command {
 				opts.ProjectDir = opts.WorkDir
 				fmt.Fprint(os.Stderr, aec.Apply("option '--workdir' is DEPRECATED at root level! Please use '--project-directory' instead.\n", aec.RedF))
 			}
+			if parallel > 0 {
+				backend.MaxConcurrency(parallel)
+			}
 			return nil
 		},
 	}
@@ -370,6 +374,7 @@ func RootCommand(dockerCli command.Cli, backend api.Service) *cobra.Command {
 	)
 
 	c.Flags().StringVar(&ansi, "ansi", "auto", `Control when to print ANSI control characters ("never"|"always"|"auto")`)
+	c.Flags().IntVar(&parallel, "parallel", -1, `Control max parallelism, -1 for unlimited`)
 	c.Flags().BoolVarP(&version, "version", "v", false, "Show the Docker Compose version information")
 	c.Flags().MarkHidden("version") //nolint:errcheck
 	c.Flags().BoolVar(&noAnsi, "no-ansi", false, `Do not print ANSI control characters (DEPRECATED)`)

+ 1 - 0
docs/reference/compose.md

@@ -42,6 +42,7 @@ Docker Compose
 | `--compatibility` |  |  | Run compose in backward compatibility mode |
 | `--env-file` | `string` |  | Specify an alternate environment file. |
 | `-f`, `--file` | `stringArray` |  | Compose configuration files |
+| `--parallel` | `int` | `-1` | Control max parallelism, -1 for unlimited |
 | `--profile` | `stringArray` |  | Specify a profile to enable |
 | `--project-directory` | `string` |  | Specify an alternate working directory
 (default: the path of the, first specified, Compose file) |

+ 10 - 0
docs/reference/docker_compose.yaml

@@ -208,6 +208,16 @@ options:
       experimentalcli: false
       kubernetes: false
       swarm: false
+    - option: parallel
+      value_type: int
+      default_value: "-1"
+      description: Control max parallelism, -1 for unlimited
+      deprecated: false
+      hidden: false
+      experimental: false
+      experimentalcli: false
+      kubernetes: false
+      swarm: false
     - option: profile
       value_type: stringArray
       default_value: '[]'

+ 2 - 0
pkg/api/api.go

@@ -75,6 +75,8 @@ type Service interface {
 	Port(ctx context.Context, projectName string, service string, port uint16, options PortOptions) (string, int, error)
 	// Images executes the equivalent of a `compose images`
 	Images(ctx context.Context, projectName string, options ImagesOptions) ([]ImageSummary, error)
+	// MaxConcurrency defines upper limit for concurrent operations against engine API
+	MaxConcurrency(parallel int)
 }
 
 // BuildOptions group options of the Build API

+ 6 - 0
pkg/api/proxy.go

@@ -50,6 +50,7 @@ type ServiceProxy struct {
 	EventsFn             func(ctx context.Context, project string, options EventsOptions) error
 	PortFn               func(ctx context.Context, project string, service string, port uint16, options PortOptions) (string, int, error)
 	ImagesFn             func(ctx context.Context, projectName string, options ImagesOptions) ([]ImageSummary, error)
+	MaxConcurrencyFn     func(parallel int)
 	interceptors         []Interceptor
 }
 
@@ -87,6 +88,7 @@ func (s *ServiceProxy) WithService(service Service) *ServiceProxy {
 	s.EventsFn = service.Events
 	s.PortFn = service.Port
 	s.ImagesFn = service.Images
+	s.MaxConcurrencyFn = service.MaxConcurrency
 	return s
 }
 
@@ -308,3 +310,7 @@ func (s *ServiceProxy) Images(ctx context.Context, project string, options Image
 	}
 	return s.ImagesFn(ctx, project, options)
 }
+
+func (s *ServiceProxy) MaxConcurrency(i int) {
+	s.MaxConcurrencyFn(i)
+}

+ 8 - 2
pkg/compose/compose.go

@@ -41,12 +41,14 @@ import (
 // NewComposeService create a local implementation of the compose.Service API
 func NewComposeService(dockerCli command.Cli) api.Service {
 	return &composeService{
-		dockerCli: dockerCli,
+		dockerCli:      dockerCli,
+		maxConcurrency: -1,
 	}
 }
 
 type composeService struct {
-	dockerCli command.Cli
+	dockerCli      command.Cli
+	maxConcurrency int
 }
 
 func (s *composeService) apiClient() client.APIClient {
@@ -57,6 +59,10 @@ func (s *composeService) configFile() *configfile.ConfigFile {
 	return s.dockerCli.ConfigFile()
 }
 
+func (s *composeService) MaxConcurrency(i int) {
+	s.maxConcurrency = i
+}
+
 func (s *composeService) stdout() *streams.Out {
 	return s.dockerCli.Out()
 }

+ 2 - 0
pkg/compose/pull.go

@@ -63,6 +63,7 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 
 	w := progress.ContextWriter(ctx)
 	eg, ctx := errgroup.WithContext(ctx)
+	eg.SetLimit(s.maxConcurrency)
 
 	var mustBuild []string
 
@@ -279,6 +280,7 @@ func (s *composeService) pullRequiredImages(ctx context.Context, project *types.
 	return progress.Run(ctx, func(ctx context.Context) error {
 		w := progress.ContextWriter(ctx)
 		eg, ctx := errgroup.WithContext(ctx)
+		eg.SetLimit(s.maxConcurrency)
 		pulledImages := make([]string, len(needPull))
 		for i, service := range needPull {
 			i, service := i, service

+ 1 - 0
pkg/compose/push.go

@@ -47,6 +47,7 @@ func (s *composeService) Push(ctx context.Context, project *types.Project, optio
 
 func (s *composeService) push(ctx context.Context, project *types.Project, options api.PushOptions) error {
 	eg, ctx := errgroup.WithContext(ctx)
+	eg.SetLimit(s.maxConcurrency)
 
 	info, err := s.apiClient().Info(ctx)
 	if err != nil {

+ 12 - 0
pkg/mocks/mock_docker_compose_api.go

@@ -194,6 +194,18 @@ func (mr *MockServiceMockRecorder) Logs(ctx, projectName, consumer, options inte
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Logs", reflect.TypeOf((*MockService)(nil).Logs), ctx, projectName, consumer, options)
 }
 
+// MaxConcurrency mocks base method.
+func (m *MockService) MaxConcurrency(parallel int) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "MaxConcurrency", parallel)
+}
+
+// MaxConcurrency indicates an expected call of MaxConcurrency.
+func (mr *MockServiceMockRecorder) MaxConcurrency(parallel interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxConcurrency", reflect.TypeOf((*MockService)(nil).MaxConcurrency), parallel)
+}
+
 // Pause mocks base method.
 func (m *MockService) Pause(ctx context.Context, projectName string, options api.PauseOptions) error {
 	m.ctrl.T.Helper()