|  | @@ -38,7 +38,6 @@ from .errors import HealthCheckFailed
 | 
	
		
			
				|  |  |  from .errors import NoHealthCheckConfigured
 | 
	
		
			
				|  |  |  from .errors import OperationFailedError
 | 
	
		
			
				|  |  |  from .parallel import parallel_execute
 | 
	
		
			
				|  |  | -from .parallel import parallel_start
 | 
	
		
			
				|  |  |  from .progress_stream import stream_output
 | 
	
		
			
				|  |  |  from .progress_stream import StreamOutputError
 | 
	
		
			
				|  |  |  from .utils import json_hash
 | 
	
	
		
			
				|  | @@ -48,7 +47,7 @@ from .utils import parse_seconds_float
 | 
	
		
			
				|  |  |  log = logging.getLogger(__name__)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -DOCKER_START_KEYS = [
 | 
	
		
			
				|  |  | +HOST_CONFIG_KEYS = [
 | 
	
		
			
				|  |  |      'cap_add',
 | 
	
		
			
				|  |  |      'cap_drop',
 | 
	
		
			
				|  |  |      'cgroup_parent',
 | 
	
	
		
			
				|  | @@ -60,6 +59,7 @@ DOCKER_START_KEYS = [
 | 
	
		
			
				|  |  |      'env_file',
 | 
	
		
			
				|  |  |      'extra_hosts',
 | 
	
		
			
				|  |  |      'group_add',
 | 
	
		
			
				|  |  | +    'init',
 | 
	
		
			
				|  |  |      'ipc',
 | 
	
		
			
				|  |  |      'read_only',
 | 
	
		
			
				|  |  |      'log_driver',
 | 
	
	
		
			
				|  | @@ -147,6 +147,7 @@ class Service(object):
 | 
	
		
			
				|  |  |          network_mode=None,
 | 
	
		
			
				|  |  |          networks=None,
 | 
	
		
			
				|  |  |          secrets=None,
 | 
	
		
			
				|  |  | +        scale=None,
 | 
	
		
			
				|  |  |          **options
 | 
	
		
			
				|  |  |      ):
 | 
	
		
			
				|  |  |          self.name = name
 | 
	
	
		
			
				|  | @@ -158,6 +159,7 @@ class Service(object):
 | 
	
		
			
				|  |  |          self.network_mode = network_mode or NetworkMode(None)
 | 
	
		
			
				|  |  |          self.networks = networks or {}
 | 
	
		
			
				|  |  |          self.secrets = secrets or []
 | 
	
		
			
				|  |  | +        self.scale_num = scale or 1
 | 
	
		
			
				|  |  |          self.options = options
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def __repr__(self):
 | 
	
	
		
			
				|  | @@ -188,16 +190,7 @@ class Service(object):
 | 
	
		
			
				|  |  |              self.start_container_if_stopped(c, **options)
 | 
	
		
			
				|  |  |          return containers
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def scale(self, desired_num, timeout=None):
 | 
	
		
			
				|  |  | -        """
 | 
	
		
			
				|  |  | -        Adjusts the number of containers to the specified number and ensures
 | 
	
		
			
				|  |  | -        they are running.
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        - creates containers until there are at least `desired_num`
 | 
	
		
			
				|  |  | -        - stops containers until there are at most `desired_num` running
 | 
	
		
			
				|  |  | -        - starts containers until there are at least `desired_num` running
 | 
	
		
			
				|  |  | -        - removes all stopped containers
 | 
	
		
			
				|  |  | -        """
 | 
	
		
			
				|  |  | +    def show_scale_warnings(self, desired_num):
 | 
	
		
			
				|  |  |          if self.custom_container_name and desired_num > 1:
 | 
	
		
			
				|  |  |              log.warn('The "%s" service is using the custom container name "%s". '
 | 
	
		
			
				|  |  |                       'Docker requires each container to have a unique name. '
 | 
	
	
		
			
				|  | @@ -209,14 +202,18 @@ class Service(object):
 | 
	
		
			
				|  |  |                       'for this service are created on a single host, the port will clash.'
 | 
	
		
			
				|  |  |                       % self.name)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        def create_and_start(service, number):
 | 
	
		
			
				|  |  | -            container = service.create_container(number=number, quiet=True)
 | 
	
		
			
				|  |  | -            service.start_container(container)
 | 
	
		
			
				|  |  | -            return container
 | 
	
		
			
				|  |  | +    def scale(self, desired_num, timeout=None):
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        Adjusts the number of containers to the specified number and ensures
 | 
	
		
			
				|  |  | +        they are running.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        def stop_and_remove(container):
 | 
	
		
			
				|  |  | -            container.stop(timeout=self.stop_timeout(timeout))
 | 
	
		
			
				|  |  | -            container.remove()
 | 
	
		
			
				|  |  | +        - creates containers until there are at least `desired_num`
 | 
	
		
			
				|  |  | +        - stops containers until there are at most `desired_num` running
 | 
	
		
			
				|  |  | +        - starts containers until there are at least `desired_num` running
 | 
	
		
			
				|  |  | +        - removes all stopped containers
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        self.show_scale_warnings(desired_num)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          running_containers = self.containers(stopped=False)
 | 
	
		
			
				|  |  |          num_running = len(running_containers)
 | 
	
	
		
			
				|  | @@ -227,11 +224,10 @@ class Service(object):
 | 
	
		
			
				|  |  |              return
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          if desired_num > num_running:
 | 
	
		
			
				|  |  | -            # we need to start/create until we have desired_num
 | 
	
		
			
				|  |  |              all_containers = self.containers(stopped=True)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              if num_running != len(all_containers):
 | 
	
		
			
				|  |  | -                # we have some stopped containers, let's start them up again
 | 
	
		
			
				|  |  | +                # we have some stopped containers, check for divergences
 | 
	
		
			
				|  |  |                  stopped_containers = [
 | 
	
		
			
				|  |  |                      c for c in all_containers if not c.is_running
 | 
	
		
			
				|  |  |                  ]
 | 
	
	
		
			
				|  | @@ -240,38 +236,14 @@ class Service(object):
 | 
	
		
			
				|  |  |                  divergent_containers = [
 | 
	
		
			
				|  |  |                      c for c in stopped_containers if self._containers_have_diverged([c])
 | 
	
		
			
				|  |  |                  ]
 | 
	
		
			
				|  |  | -                stopped_containers = sorted(
 | 
	
		
			
				|  |  | -                    set(stopped_containers) - set(divergent_containers),
 | 
	
		
			
				|  |  | -                    key=attrgetter('number')
 | 
	
		
			
				|  |  | -                )
 | 
	
		
			
				|  |  |                  for c in divergent_containers:
 | 
	
		
			
				|  |  |                          c.remove()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                num_stopped = len(stopped_containers)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                if num_stopped + num_running > desired_num:
 | 
	
		
			
				|  |  | -                    num_to_start = desired_num - num_running
 | 
	
		
			
				|  |  | -                    containers_to_start = stopped_containers[:num_to_start]
 | 
	
		
			
				|  |  | -                else:
 | 
	
		
			
				|  |  | -                    containers_to_start = stopped_containers
 | 
	
		
			
				|  |  | +                all_containers = list(set(all_containers) - set(divergent_containers))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                parallel_start(containers_to_start, {})
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                num_running += len(containers_to_start)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            num_to_create = desired_num - num_running
 | 
	
		
			
				|  |  | -            next_number = self._next_container_number()
 | 
	
		
			
				|  |  | -            container_numbers = [
 | 
	
		
			
				|  |  | -                number for number in range(
 | 
	
		
			
				|  |  | -                    next_number, next_number + num_to_create
 | 
	
		
			
				|  |  | -                )
 | 
	
		
			
				|  |  | -            ]
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            parallel_execute(
 | 
	
		
			
				|  |  | -                container_numbers,
 | 
	
		
			
				|  |  | -                lambda n: create_and_start(service=self, number=n),
 | 
	
		
			
				|  |  | -                lambda n: self.get_container_name(n),
 | 
	
		
			
				|  |  | -                "Creating and starting"
 | 
	
		
			
				|  |  | +            sorted_containers = sorted(all_containers, key=attrgetter('number'))
 | 
	
		
			
				|  |  | +            self._execute_convergence_start(
 | 
	
		
			
				|  |  | +                sorted_containers, desired_num, timeout, True, True
 | 
	
		
			
				|  |  |              )
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          if desired_num < num_running:
 | 
	
	
		
			
				|  | @@ -281,12 +253,7 @@ class Service(object):
 | 
	
		
			
				|  |  |                  running_containers,
 | 
	
		
			
				|  |  |                  key=attrgetter('number'))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            parallel_execute(
 | 
	
		
			
				|  |  | -                sorted_running_containers[-num_to_stop:],
 | 
	
		
			
				|  |  | -                stop_and_remove,
 | 
	
		
			
				|  |  | -                lambda c: c.name,
 | 
	
		
			
				|  |  | -                "Stopping and removing",
 | 
	
		
			
				|  |  | -            )
 | 
	
		
			
				|  |  | +            self._downscale(sorted_running_containers[-num_to_stop:], timeout)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def create_container(self,
 | 
	
		
			
				|  |  |                           one_off=False,
 | 
	
	
		
			
				|  | @@ -399,51 +366,120 @@ class Service(object):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          return has_diverged
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def execute_convergence_plan(self,
 | 
	
		
			
				|  |  | -                                 plan,
 | 
	
		
			
				|  |  | -                                 timeout=None,
 | 
	
		
			
				|  |  | -                                 detached=False,
 | 
	
		
			
				|  |  | -                                 start=True):
 | 
	
		
			
				|  |  | -        (action, containers) = plan
 | 
	
		
			
				|  |  | -        should_attach_logs = not detached
 | 
	
		
			
				|  |  | +    def _execute_convergence_create(self, scale, detached, start):
 | 
	
		
			
				|  |  | +            i = self._next_container_number()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        if action == 'create':
 | 
	
		
			
				|  |  | -            container = self.create_container()
 | 
	
		
			
				|  |  | +            def create_and_start(service, n):
 | 
	
		
			
				|  |  | +                container = service.create_container(number=n)
 | 
	
		
			
				|  |  | +                if not detached:
 | 
	
		
			
				|  |  | +                    container.attach_log_stream()
 | 
	
		
			
				|  |  | +                if start:
 | 
	
		
			
				|  |  | +                    self.start_container(container)
 | 
	
		
			
				|  |  | +                return container
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            if should_attach_logs:
 | 
	
		
			
				|  |  | -                container.attach_log_stream()
 | 
	
		
			
				|  |  | +            containers, errors = parallel_execute(
 | 
	
		
			
				|  |  | +                range(i, i + scale),
 | 
	
		
			
				|  |  | +                lambda n: create_and_start(self, n),
 | 
	
		
			
				|  |  | +                lambda n: self.get_container_name(n),
 | 
	
		
			
				|  |  | +                "Creating"
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +            for error in errors.values():
 | 
	
		
			
				|  |  | +                raise OperationFailedError(error)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            if start:
 | 
	
		
			
				|  |  | -                self.start_container(container)
 | 
	
		
			
				|  |  | +            return containers
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            return [container]
 | 
	
		
			
				|  |  | +    def _execute_convergence_recreate(self, containers, scale, timeout, detached, start):
 | 
	
		
			
				|  |  | +            if len(containers) > scale:
 | 
	
		
			
				|  |  | +                self._downscale(containers[scale:], timeout)
 | 
	
		
			
				|  |  | +                containers = containers[:scale]
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        elif action == 'recreate':
 | 
	
		
			
				|  |  | -            return [
 | 
	
		
			
				|  |  | -                self.recreate_container(
 | 
	
		
			
				|  |  | -                    container,
 | 
	
		
			
				|  |  | -                    timeout=timeout,
 | 
	
		
			
				|  |  | -                    attach_logs=should_attach_logs,
 | 
	
		
			
				|  |  | +            def recreate(container):
 | 
	
		
			
				|  |  | +                return self.recreate_container(
 | 
	
		
			
				|  |  | +                    container, timeout=timeout, attach_logs=not detached,
 | 
	
		
			
				|  |  |                      start_new_container=start
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  | -                for container in containers
 | 
	
		
			
				|  |  | -            ]
 | 
	
		
			
				|  |  | +            containers, errors = parallel_execute(
 | 
	
		
			
				|  |  | +                containers,
 | 
	
		
			
				|  |  | +                recreate,
 | 
	
		
			
				|  |  | +                lambda c: c.name,
 | 
	
		
			
				|  |  | +                "Recreating"
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +            for error in errors.values():
 | 
	
		
			
				|  |  | +                raise OperationFailedError(error)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        elif action == 'start':
 | 
	
		
			
				|  |  | +            if len(containers) < scale:
 | 
	
		
			
				|  |  | +                containers.extend(self._execute_convergence_create(
 | 
	
		
			
				|  |  | +                    scale - len(containers), detached, start
 | 
	
		
			
				|  |  | +                ))
 | 
	
		
			
				|  |  | +            return containers
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def _execute_convergence_start(self, containers, scale, timeout, detached, start):
 | 
	
		
			
				|  |  | +            if len(containers) > scale:
 | 
	
		
			
				|  |  | +                self._downscale(containers[scale:], timeout)
 | 
	
		
			
				|  |  | +                containers = containers[:scale]
 | 
	
		
			
				|  |  |              if start:
 | 
	
		
			
				|  |  | -                for container in containers:
 | 
	
		
			
				|  |  | -                    self.start_container_if_stopped(container, attach_logs=should_attach_logs)
 | 
	
		
			
				|  |  | +                _, errors = parallel_execute(
 | 
	
		
			
				|  |  | +                    containers,
 | 
	
		
			
				|  |  | +                    lambda c: self.start_container_if_stopped(c, attach_logs=not detached),
 | 
	
		
			
				|  |  | +                    lambda c: c.name,
 | 
	
		
			
				|  |  | +                    "Starting"
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                for error in errors.values():
 | 
	
		
			
				|  |  | +                    raise OperationFailedError(error)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +            if len(containers) < scale:
 | 
	
		
			
				|  |  | +                containers.extend(self._execute_convergence_create(
 | 
	
		
			
				|  |  | +                    scale - len(containers), detached, start
 | 
	
		
			
				|  |  | +                ))
 | 
	
		
			
				|  |  |              return containers
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        elif action == 'noop':
 | 
	
		
			
				|  |  | +    def _downscale(self, containers, timeout=None):
 | 
	
		
			
				|  |  | +        def stop_and_remove(container):
 | 
	
		
			
				|  |  | +            container.stop(timeout=self.stop_timeout(timeout))
 | 
	
		
			
				|  |  | +            container.remove()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        parallel_execute(
 | 
	
		
			
				|  |  | +            containers,
 | 
	
		
			
				|  |  | +            stop_and_remove,
 | 
	
		
			
				|  |  | +            lambda c: c.name,
 | 
	
		
			
				|  |  | +            "Stopping and removing",
 | 
	
		
			
				|  |  | +        )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def execute_convergence_plan(self, plan, timeout=None, detached=False,
 | 
	
		
			
				|  |  | +                                 start=True, scale_override=None):
 | 
	
		
			
				|  |  | +        (action, containers) = plan
 | 
	
		
			
				|  |  | +        scale = scale_override if scale_override is not None else self.scale_num
 | 
	
		
			
				|  |  | +        containers = sorted(containers, key=attrgetter('number'))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        self.show_scale_warnings(scale)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if action == 'create':
 | 
	
		
			
				|  |  | +            return self._execute_convergence_create(
 | 
	
		
			
				|  |  | +                scale, detached, start
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if action == 'recreate':
 | 
	
		
			
				|  |  | +            return self._execute_convergence_recreate(
 | 
	
		
			
				|  |  | +                containers, scale, timeout, detached, start
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if action == 'start':
 | 
	
		
			
				|  |  | +            return self._execute_convergence_start(
 | 
	
		
			
				|  |  | +                containers, scale, timeout, detached, start
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if action == 'noop':
 | 
	
		
			
				|  |  | +            if scale != len(containers):
 | 
	
		
			
				|  |  | +                return self._execute_convergence_start(
 | 
	
		
			
				|  |  | +                    containers, scale, timeout, detached, start
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  |              for c in containers:
 | 
	
		
			
				|  |  |                  log.info("%s is up-to-date" % c.name)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              return containers
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        else:
 | 
	
		
			
				|  |  | -            raise Exception("Invalid action: {}".format(action))
 | 
	
		
			
				|  |  | +        raise Exception("Invalid action: {}".format(action))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def recreate_container(
 | 
	
		
			
				|  |  |              self,
 | 
	
	
		
			
				|  | @@ -729,8 +765,8 @@ class Service(object):
 | 
	
		
			
				|  |  |              number,
 | 
	
		
			
				|  |  |              self.config_hash if add_config_hash else None)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        # Delete options which are only used when starting
 | 
	
		
			
				|  |  | -        for key in DOCKER_START_KEYS:
 | 
	
		
			
				|  |  | +        # Delete options which are only used in HostConfig
 | 
	
		
			
				|  |  | +        for key in HOST_CONFIG_KEYS:
 | 
	
		
			
				|  |  |              container_options.pop(key, None)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          container_options['host_config'] = self._get_container_host_config(
 | 
	
	
		
			
				|  | @@ -750,8 +786,12 @@ class Service(object):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          logging_dict = options.get('logging', None)
 | 
	
		
			
				|  |  |          log_config = get_log_config(logging_dict)
 | 
	
		
			
				|  |  | +        init_path = None
 | 
	
		
			
				|  |  | +        if isinstance(options.get('init'), six.string_types):
 | 
	
		
			
				|  |  | +            init_path = options.get('init')
 | 
	
		
			
				|  |  | +            options['init'] = True
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        host_config = self.client.create_host_config(
 | 
	
		
			
				|  |  | +        return self.client.create_host_config(
 | 
	
		
			
				|  |  |              links=self._get_links(link_to_self=one_off),
 | 
	
		
			
				|  |  |              port_bindings=build_port_bindings(
 | 
	
		
			
				|  |  |                  formatted_ports(options.get('ports', []))
 | 
	
	
		
			
				|  | @@ -786,15 +826,12 @@ class Service(object):
 | 
	
		
			
				|  |  |              oom_score_adj=options.get('oom_score_adj'),
 | 
	
		
			
				|  |  |              mem_swappiness=options.get('mem_swappiness'),
 | 
	
		
			
				|  |  |              group_add=options.get('group_add'),
 | 
	
		
			
				|  |  | -            userns_mode=options.get('userns_mode')
 | 
	
		
			
				|  |  | +            userns_mode=options.get('userns_mode'),
 | 
	
		
			
				|  |  | +            init=options.get('init', None),
 | 
	
		
			
				|  |  | +            init_path=init_path,
 | 
	
		
			
				|  |  | +            isolation=options.get('isolation'),
 | 
	
		
			
				|  |  |          )
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        # TODO: Add as an argument to create_host_config once it's supported
 | 
	
		
			
				|  |  | -        # in docker-py
 | 
	
		
			
				|  |  | -        host_config['Isolation'] = options.get('isolation')
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        return host_config
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |      def get_secret_volumes(self):
 | 
	
		
			
				|  |  |          def build_spec(secret):
 | 
	
		
			
				|  |  |              target = '{}/{}'.format(
 |