소스 검색

Merge pull request #10030 from ndeloof/max_concurrency

introduce --parallel to limit concurrent engine calls
Guillaume Lours 3 년 전
부모
커밋
12dad4f8d0
9개의 변경된 파일51개의 추가작업 그리고 6개의 파일을 삭제
  1. 9 4
      cmd/compose/compose.go
  2. 1 0
      docs/reference/compose.md
  3. 10 0
      docs/reference/docker_compose.yaml
  4. 2 0
      pkg/api/api.go
  5. 6 0
      pkg/api/proxy.go
  6. 8 2
      pkg/compose/compose.go
  7. 2 0
      pkg/compose/pull.go
  8. 1 0
      pkg/compose/push.go
  9. 12 0
      pkg/mocks/mock_docker_compose_api.go

+ 9 - 4
cmd/compose/compose.go

@@ -263,10 +263,11 @@ func RootCommand(dockerCli command.Cli, backend api.Service) *cobra.Command {
 
 
 	opts := projectOptions{}
 	opts := projectOptions{}
 	var (
 	var (
-		ansi    string
-		noAnsi  bool
-		verbose bool
-		version bool
+		ansi     string
+		noAnsi   bool
+		verbose  bool
+		version  bool
+		parallel int
 	)
 	)
 	c := &cobra.Command{
 	c := &cobra.Command{
 		Short:            "Docker Compose",
 		Short:            "Docker Compose",
@@ -325,6 +326,9 @@ func RootCommand(dockerCli command.Cli, backend api.Service) *cobra.Command {
 				opts.ProjectDir = opts.WorkDir
 				opts.ProjectDir = opts.WorkDir
 				fmt.Fprint(os.Stderr, aec.Apply("option '--workdir' is DEPRECATED at root level! Please use '--project-directory' instead.\n", aec.RedF))
 				fmt.Fprint(os.Stderr, aec.Apply("option '--workdir' is DEPRECATED at root level! Please use '--project-directory' instead.\n", aec.RedF))
 			}
 			}
+			if parallel > 0 {
+				backend.MaxConcurrency(parallel)
+			}
 			return nil
 			return nil
 		},
 		},
 	}
 	}
@@ -370,6 +374,7 @@ func RootCommand(dockerCli command.Cli, backend api.Service) *cobra.Command {
 	)
 	)
 
 
 	c.Flags().StringVar(&ansi, "ansi", "auto", `Control when to print ANSI control characters ("never"|"always"|"auto")`)
 	c.Flags().StringVar(&ansi, "ansi", "auto", `Control when to print ANSI control characters ("never"|"always"|"auto")`)
+	c.Flags().IntVar(&parallel, "parallel", -1, `Control max parallelism, -1 for unlimited`)
 	c.Flags().BoolVarP(&version, "version", "v", false, "Show the Docker Compose version information")
 	c.Flags().BoolVarP(&version, "version", "v", false, "Show the Docker Compose version information")
 	c.Flags().MarkHidden("version") //nolint:errcheck
 	c.Flags().MarkHidden("version") //nolint:errcheck
 	c.Flags().BoolVar(&noAnsi, "no-ansi", false, `Do not print ANSI control characters (DEPRECATED)`)
 	c.Flags().BoolVar(&noAnsi, "no-ansi", false, `Do not print ANSI control characters (DEPRECATED)`)

+ 1 - 0
docs/reference/compose.md

@@ -42,6 +42,7 @@ Docker Compose
 | `--compatibility` |  |  | Run compose in backward compatibility mode |
 | `--compatibility` |  |  | Run compose in backward compatibility mode |
 | `--env-file` | `string` |  | Specify an alternate environment file. |
 | `--env-file` | `string` |  | Specify an alternate environment file. |
 | `-f`, `--file` | `stringArray` |  | Compose configuration files |
 | `-f`, `--file` | `stringArray` |  | Compose configuration files |
+| `--parallel` | `int` | `-1` | Control max parallelism, -1 for unlimited |
 | `--profile` | `stringArray` |  | Specify a profile to enable |
 | `--profile` | `stringArray` |  | Specify a profile to enable |
 | `--project-directory` | `string` |  | Specify an alternate working directory
 | `--project-directory` | `string` |  | Specify an alternate working directory
 (default: the path of the, first specified, Compose file) |
 (default: the path of the, first specified, Compose file) |

+ 10 - 0
docs/reference/docker_compose.yaml

@@ -208,6 +208,16 @@ options:
       experimentalcli: false
       experimentalcli: false
       kubernetes: false
       kubernetes: false
       swarm: false
       swarm: false
+    - option: parallel
+      value_type: int
+      default_value: "-1"
+      description: Control max parallelism, -1 for unlimited
+      deprecated: false
+      hidden: false
+      experimental: false
+      experimentalcli: false
+      kubernetes: false
+      swarm: false
     - option: profile
     - option: profile
       value_type: stringArray
       value_type: stringArray
       default_value: '[]'
       default_value: '[]'

+ 2 - 0
pkg/api/api.go

@@ -75,6 +75,8 @@ type Service interface {
 	Port(ctx context.Context, projectName string, service string, port uint16, options PortOptions) (string, int, error)
 	Port(ctx context.Context, projectName string, service string, port uint16, options PortOptions) (string, int, error)
 	// Images executes the equivalent of a `compose images`
 	// Images executes the equivalent of a `compose images`
 	Images(ctx context.Context, projectName string, options ImagesOptions) ([]ImageSummary, error)
 	Images(ctx context.Context, projectName string, options ImagesOptions) ([]ImageSummary, error)
+	// MaxConcurrency defines upper limit for concurrent operations against engine API
+	MaxConcurrency(parallel int)
 }
 }
 
 
 // BuildOptions group options of the Build API
 // BuildOptions group options of the Build API

+ 6 - 0
pkg/api/proxy.go

@@ -50,6 +50,7 @@ type ServiceProxy struct {
 	EventsFn             func(ctx context.Context, project string, options EventsOptions) error
 	EventsFn             func(ctx context.Context, project string, options EventsOptions) error
 	PortFn               func(ctx context.Context, project string, service string, port uint16, options PortOptions) (string, int, error)
 	PortFn               func(ctx context.Context, project string, service string, port uint16, options PortOptions) (string, int, error)
 	ImagesFn             func(ctx context.Context, projectName string, options ImagesOptions) ([]ImageSummary, error)
 	ImagesFn             func(ctx context.Context, projectName string, options ImagesOptions) ([]ImageSummary, error)
+	MaxConcurrencyFn     func(parallel int)
 	interceptors         []Interceptor
 	interceptors         []Interceptor
 }
 }
 
 
@@ -87,6 +88,7 @@ func (s *ServiceProxy) WithService(service Service) *ServiceProxy {
 	s.EventsFn = service.Events
 	s.EventsFn = service.Events
 	s.PortFn = service.Port
 	s.PortFn = service.Port
 	s.ImagesFn = service.Images
 	s.ImagesFn = service.Images
+	s.MaxConcurrencyFn = service.MaxConcurrency
 	return s
 	return s
 }
 }
 
 
@@ -308,3 +310,7 @@ func (s *ServiceProxy) Images(ctx context.Context, project string, options Image
 	}
 	}
 	return s.ImagesFn(ctx, project, options)
 	return s.ImagesFn(ctx, project, options)
 }
 }
+
+func (s *ServiceProxy) MaxConcurrency(i int) {
+	s.MaxConcurrencyFn(i)
+}

+ 8 - 2
pkg/compose/compose.go

@@ -41,12 +41,14 @@ import (
 // NewComposeService create a local implementation of the compose.Service API
 // NewComposeService create a local implementation of the compose.Service API
 func NewComposeService(dockerCli command.Cli) api.Service {
 func NewComposeService(dockerCli command.Cli) api.Service {
 	return &composeService{
 	return &composeService{
-		dockerCli: dockerCli,
+		dockerCli:      dockerCli,
+		maxConcurrency: -1,
 	}
 	}
 }
 }
 
 
 type composeService struct {
 type composeService struct {
-	dockerCli command.Cli
+	dockerCli      command.Cli
+	maxConcurrency int
 }
 }
 
 
 func (s *composeService) apiClient() client.APIClient {
 func (s *composeService) apiClient() client.APIClient {
@@ -57,6 +59,10 @@ func (s *composeService) configFile() *configfile.ConfigFile {
 	return s.dockerCli.ConfigFile()
 	return s.dockerCli.ConfigFile()
 }
 }
 
 
+func (s *composeService) MaxConcurrency(i int) {
+	s.maxConcurrency = i
+}
+
 func (s *composeService) stdout() *streams.Out {
 func (s *composeService) stdout() *streams.Out {
 	return s.dockerCli.Out()
 	return s.dockerCli.Out()
 }
 }

+ 2 - 0
pkg/compose/pull.go

@@ -63,6 +63,7 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
 
 
 	w := progress.ContextWriter(ctx)
 	w := progress.ContextWriter(ctx)
 	eg, ctx := errgroup.WithContext(ctx)
 	eg, ctx := errgroup.WithContext(ctx)
+	eg.SetLimit(s.maxConcurrency)
 
 
 	var mustBuild []string
 	var mustBuild []string
 
 
@@ -279,6 +280,7 @@ func (s *composeService) pullRequiredImages(ctx context.Context, project *types.
 	return progress.Run(ctx, func(ctx context.Context) error {
 	return progress.Run(ctx, func(ctx context.Context) error {
 		w := progress.ContextWriter(ctx)
 		w := progress.ContextWriter(ctx)
 		eg, ctx := errgroup.WithContext(ctx)
 		eg, ctx := errgroup.WithContext(ctx)
+		eg.SetLimit(s.maxConcurrency)
 		pulledImages := make([]string, len(needPull))
 		pulledImages := make([]string, len(needPull))
 		for i, service := range needPull {
 		for i, service := range needPull {
 			i, service := i, service
 			i, service := i, service

+ 1 - 0
pkg/compose/push.go

@@ -47,6 +47,7 @@ func (s *composeService) Push(ctx context.Context, project *types.Project, optio
 
 
 func (s *composeService) push(ctx context.Context, project *types.Project, options api.PushOptions) error {
 func (s *composeService) push(ctx context.Context, project *types.Project, options api.PushOptions) error {
 	eg, ctx := errgroup.WithContext(ctx)
 	eg, ctx := errgroup.WithContext(ctx)
+	eg.SetLimit(s.maxConcurrency)
 
 
 	info, err := s.apiClient().Info(ctx)
 	info, err := s.apiClient().Info(ctx)
 	if err != nil {
 	if err != nil {

+ 12 - 0
pkg/mocks/mock_docker_compose_api.go

@@ -194,6 +194,18 @@ func (mr *MockServiceMockRecorder) Logs(ctx, projectName, consumer, options inte
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Logs", reflect.TypeOf((*MockService)(nil).Logs), ctx, projectName, consumer, options)
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Logs", reflect.TypeOf((*MockService)(nil).Logs), ctx, projectName, consumer, options)
 }
 }
 
 
+// MaxConcurrency mocks base method.
+func (m *MockService) MaxConcurrency(parallel int) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "MaxConcurrency", parallel)
+}
+
+// MaxConcurrency indicates an expected call of MaxConcurrency.
+func (mr *MockServiceMockRecorder) MaxConcurrency(parallel interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxConcurrency", reflect.TypeOf((*MockService)(nil).MaxConcurrency), parallel)
+}
+
 // Pause mocks base method.
 // Pause mocks base method.
 func (m *MockService) Pause(ctx context.Context, projectName string, options api.PauseOptions) error {
 func (m *MockService) Pause(ctx context.Context, projectName string, options api.PauseOptions) error {
 	m.ctrl.T.Helper()
 	m.ctrl.T.Helper()