|  | @@ -38,7 +38,6 @@ from .errors import HealthCheckFailed
 | 
	
		
			
				|  |  |  from .errors import NoHealthCheckConfigured
 | 
	
		
			
				|  |  |  from .errors import OperationFailedError
 | 
	
		
			
				|  |  |  from .parallel import parallel_execute
 | 
	
		
			
				|  |  | -from .parallel import parallel_start
 | 
	
		
			
				|  |  |  from .progress_stream import stream_output
 | 
	
		
			
				|  |  |  from .progress_stream import StreamOutputError
 | 
	
		
			
				|  |  |  from .utils import json_hash
 | 
	
	
		
			
				|  | @@ -148,6 +147,7 @@ class Service(object):
 | 
	
		
			
				|  |  |          network_mode=None,
 | 
	
		
			
				|  |  |          networks=None,
 | 
	
		
			
				|  |  |          secrets=None,
 | 
	
		
			
				|  |  | +        scale=None,
 | 
	
		
			
				|  |  |          **options
 | 
	
		
			
				|  |  |      ):
 | 
	
		
			
				|  |  |          self.name = name
 | 
	
	
		
			
				|  | @@ -159,6 +159,7 @@ class Service(object):
 | 
	
		
			
				|  |  |          self.network_mode = network_mode or NetworkMode(None)
 | 
	
		
			
				|  |  |          self.networks = networks or {}
 | 
	
		
			
				|  |  |          self.secrets = secrets or []
 | 
	
		
			
				|  |  | +        self.scale_num = scale or 1
 | 
	
		
			
				|  |  |          self.options = options
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def __repr__(self):
 | 
	
	
		
			
				|  | @@ -189,16 +190,7 @@ class Service(object):
 | 
	
		
			
				|  |  |              self.start_container_if_stopped(c, **options)
 | 
	
		
			
				|  |  |          return containers
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def scale(self, desired_num, timeout=None):
 | 
	
		
			
				|  |  | -        """
 | 
	
		
			
				|  |  | -        Adjusts the number of containers to the specified number and ensures
 | 
	
		
			
				|  |  | -        they are running.
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        - creates containers until there are at least `desired_num`
 | 
	
		
			
				|  |  | -        - stops containers until there are at most `desired_num` running
 | 
	
		
			
				|  |  | -        - starts containers until there are at least `desired_num` running
 | 
	
		
			
				|  |  | -        - removes all stopped containers
 | 
	
		
			
				|  |  | -        """
 | 
	
		
			
				|  |  | +    def show_scale_warnings(self, desired_num):
 | 
	
		
			
				|  |  |          if self.custom_container_name and desired_num > 1:
 | 
	
		
			
				|  |  |              log.warn('The "%s" service is using the custom container name "%s". '
 | 
	
		
			
				|  |  |                       'Docker requires each container to have a unique name. '
 | 
	
	
		
			
				|  | @@ -210,14 +202,18 @@ class Service(object):
 | 
	
		
			
				|  |  |                       'for this service are created on a single host, the port will clash.'
 | 
	
		
			
				|  |  |                       % self.name)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        def create_and_start(service, number):
 | 
	
		
			
				|  |  | -            container = service.create_container(number=number, quiet=True)
 | 
	
		
			
				|  |  | -            service.start_container(container)
 | 
	
		
			
				|  |  | -            return container
 | 
	
		
			
				|  |  | +    def scale(self, desired_num, timeout=None):
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        Adjusts the number of containers to the specified number and ensures
 | 
	
		
			
				|  |  | +        they are running.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        def stop_and_remove(container):
 | 
	
		
			
				|  |  | -            container.stop(timeout=self.stop_timeout(timeout))
 | 
	
		
			
				|  |  | -            container.remove()
 | 
	
		
			
				|  |  | +        - creates containers until there are at least `desired_num`
 | 
	
		
			
				|  |  | +        - stops containers until there are at most `desired_num` running
 | 
	
		
			
				|  |  | +        - starts containers until there are at least `desired_num` running
 | 
	
		
			
				|  |  | +        - removes all stopped containers
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        self.show_scale_warnings(desired_num)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          running_containers = self.containers(stopped=False)
 | 
	
		
			
				|  |  |          num_running = len(running_containers)
 | 
	
	
		
			
				|  | @@ -228,11 +224,10 @@ class Service(object):
 | 
	
		
			
				|  |  |              return
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          if desired_num > num_running:
 | 
	
		
			
				|  |  | -            # we need to start/create until we have desired_num
 | 
	
		
			
				|  |  |              all_containers = self.containers(stopped=True)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              if num_running != len(all_containers):
 | 
	
		
			
				|  |  | -                # we have some stopped containers, let's start them up again
 | 
	
		
			
				|  |  | +                # we have some stopped containers, check for divergences
 | 
	
		
			
				|  |  |                  stopped_containers = [
 | 
	
		
			
				|  |  |                      c for c in all_containers if not c.is_running
 | 
	
		
			
				|  |  |                  ]
 | 
	
	
		
			
				|  | @@ -241,38 +236,14 @@ class Service(object):
 | 
	
		
			
				|  |  |                  divergent_containers = [
 | 
	
		
			
				|  |  |                      c for c in stopped_containers if self._containers_have_diverged([c])
 | 
	
		
			
				|  |  |                  ]
 | 
	
		
			
				|  |  | -                stopped_containers = sorted(
 | 
	
		
			
				|  |  | -                    set(stopped_containers) - set(divergent_containers),
 | 
	
		
			
				|  |  | -                    key=attrgetter('number')
 | 
	
		
			
				|  |  | -                )
 | 
	
		
			
				|  |  |                  for c in divergent_containers:
 | 
	
		
			
				|  |  |                          c.remove()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                num_stopped = len(stopped_containers)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                if num_stopped + num_running > desired_num:
 | 
	
		
			
				|  |  | -                    num_to_start = desired_num - num_running
 | 
	
		
			
				|  |  | -                    containers_to_start = stopped_containers[:num_to_start]
 | 
	
		
			
				|  |  | -                else:
 | 
	
		
			
				|  |  | -                    containers_to_start = stopped_containers
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                parallel_start(containers_to_start, {})
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                num_running += len(containers_to_start)
 | 
	
		
			
				|  |  | +                all_containers = list(set(all_containers) - set(divergent_containers))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            num_to_create = desired_num - num_running
 | 
	
		
			
				|  |  | -            next_number = self._next_container_number()
 | 
	
		
			
				|  |  | -            container_numbers = [
 | 
	
		
			
				|  |  | -                number for number in range(
 | 
	
		
			
				|  |  | -                    next_number, next_number + num_to_create
 | 
	
		
			
				|  |  | -                )
 | 
	
		
			
				|  |  | -            ]
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            parallel_execute(
 | 
	
		
			
				|  |  | -                container_numbers,
 | 
	
		
			
				|  |  | -                lambda n: create_and_start(service=self, number=n),
 | 
	
		
			
				|  |  | -                lambda n: self.get_container_name(n),
 | 
	
		
			
				|  |  | -                "Creating and starting"
 | 
	
		
			
				|  |  | +            sorted_containers = sorted(all_containers, key=attrgetter('number'))
 | 
	
		
			
				|  |  | +            self._execute_convergence_start(
 | 
	
		
			
				|  |  | +                sorted_containers, desired_num, timeout, True, True
 | 
	
		
			
				|  |  |              )
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          if desired_num < num_running:
 | 
	
	
		
			
				|  | @@ -282,12 +253,7 @@ class Service(object):
 | 
	
		
			
				|  |  |                  running_containers,
 | 
	
		
			
				|  |  |                  key=attrgetter('number'))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            parallel_execute(
 | 
	
		
			
				|  |  | -                sorted_running_containers[-num_to_stop:],
 | 
	
		
			
				|  |  | -                stop_and_remove,
 | 
	
		
			
				|  |  | -                lambda c: c.name,
 | 
	
		
			
				|  |  | -                "Stopping and removing",
 | 
	
		
			
				|  |  | -            )
 | 
	
		
			
				|  |  | +            self._downscale(sorted_running_containers[-num_to_stop:], timeout)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def create_container(self,
 | 
	
		
			
				|  |  |                           one_off=False,
 | 
	
	
		
			
				|  | @@ -400,51 +366,109 @@ class Service(object):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          return has_diverged
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def execute_convergence_plan(self,
 | 
	
		
			
				|  |  | -                                 plan,
 | 
	
		
			
				|  |  | -                                 timeout=None,
 | 
	
		
			
				|  |  | -                                 detached=False,
 | 
	
		
			
				|  |  | -                                 start=True):
 | 
	
		
			
				|  |  | -        (action, containers) = plan
 | 
	
		
			
				|  |  | -        should_attach_logs = not detached
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        if action == 'create':
 | 
	
		
			
				|  |  | -            container = self.create_container()
 | 
	
		
			
				|  |  | +    def _execute_convergence_create(self, scale, detached, start):
 | 
	
		
			
				|  |  | +            i = self._next_container_number()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            if should_attach_logs:
 | 
	
		
			
				|  |  | -                container.attach_log_stream()
 | 
	
		
			
				|  |  | +            def create_and_start(service, n):
 | 
	
		
			
				|  |  | +                container = service.create_container(number=n)
 | 
	
		
			
				|  |  | +                if not detached:
 | 
	
		
			
				|  |  | +                    container.attach_log_stream()
 | 
	
		
			
				|  |  | +                if start:
 | 
	
		
			
				|  |  | +                    self.start_container(container)
 | 
	
		
			
				|  |  | +                return container
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            if start:
 | 
	
		
			
				|  |  | -                self.start_container(container)
 | 
	
		
			
				|  |  | +            return parallel_execute(
 | 
	
		
			
				|  |  | +                range(i, i + scale),
 | 
	
		
			
				|  |  | +                lambda n: create_and_start(self, n),
 | 
	
		
			
				|  |  | +                lambda n: self.get_container_name(n),
 | 
	
		
			
				|  |  | +                "Creating"
 | 
	
		
			
				|  |  | +            )[0]
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            return [container]
 | 
	
		
			
				|  |  | +    def _execute_convergence_recreate(self, containers, scale, timeout, detached, start):
 | 
	
		
			
				|  |  | +            if len(containers) > scale:
 | 
	
		
			
				|  |  | +                self._downscale(containers[scale:], timeout)
 | 
	
		
			
				|  |  | +                containers = containers[:scale]
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        elif action == 'recreate':
 | 
	
		
			
				|  |  | -            return [
 | 
	
		
			
				|  |  | -                self.recreate_container(
 | 
	
		
			
				|  |  | -                    container,
 | 
	
		
			
				|  |  | -                    timeout=timeout,
 | 
	
		
			
				|  |  | -                    attach_logs=should_attach_logs,
 | 
	
		
			
				|  |  | +            def recreate(container):
 | 
	
		
			
				|  |  | +                return self.recreate_container(
 | 
	
		
			
				|  |  | +                    container, timeout=timeout, attach_logs=not detached,
 | 
	
		
			
				|  |  |                      start_new_container=start
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  | -                for container in containers
 | 
	
		
			
				|  |  | -            ]
 | 
	
		
			
				|  |  | +            containers = parallel_execute(
 | 
	
		
			
				|  |  | +                containers,
 | 
	
		
			
				|  |  | +                recreate,
 | 
	
		
			
				|  |  | +                lambda c: c.name,
 | 
	
		
			
				|  |  | +                "Recreating"
 | 
	
		
			
				|  |  | +            )[0]
 | 
	
		
			
				|  |  | +            if len(containers) < scale:
 | 
	
		
			
				|  |  | +                containers.extend(self._execute_convergence_create(
 | 
	
		
			
				|  |  | +                    scale - len(containers), detached, start
 | 
	
		
			
				|  |  | +                ))
 | 
	
		
			
				|  |  | +            return containers
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        elif action == 'start':
 | 
	
		
			
				|  |  | +    def _execute_convergence_start(self, containers, scale, timeout, detached, start):
 | 
	
		
			
				|  |  | +            if len(containers) > scale:
 | 
	
		
			
				|  |  | +                self._downscale(containers[scale:], timeout)
 | 
	
		
			
				|  |  | +                containers = containers[:scale]
 | 
	
		
			
				|  |  |              if start:
 | 
	
		
			
				|  |  | -                for container in containers:
 | 
	
		
			
				|  |  | -                    self.start_container_if_stopped(container, attach_logs=should_attach_logs)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | +                parallel_execute(
 | 
	
		
			
				|  |  | +                    containers,
 | 
	
		
			
				|  |  | +                    lambda c: self.start_container_if_stopped(c, attach_logs=not detached),
 | 
	
		
			
				|  |  | +                    lambda c: c.name,
 | 
	
		
			
				|  |  | +                    "Starting"
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  | +            if len(containers) < scale:
 | 
	
		
			
				|  |  | +                containers.extend(self._execute_convergence_create(
 | 
	
		
			
				|  |  | +                    scale - len(containers), detached, start
 | 
	
		
			
				|  |  | +                ))
 | 
	
		
			
				|  |  |              return containers
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        elif action == 'noop':
 | 
	
		
			
				|  |  | +    def _downscale(self, containers, timeout=None):
 | 
	
		
			
				|  |  | +        def stop_and_remove(container):
 | 
	
		
			
				|  |  | +            container.stop(timeout=self.stop_timeout(timeout))
 | 
	
		
			
				|  |  | +            container.remove()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        parallel_execute(
 | 
	
		
			
				|  |  | +            containers,
 | 
	
		
			
				|  |  | +            stop_and_remove,
 | 
	
		
			
				|  |  | +            lambda c: c.name,
 | 
	
		
			
				|  |  | +            "Stopping and removing",
 | 
	
		
			
				|  |  | +        )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def execute_convergence_plan(self, plan, timeout=None, detached=False,
 | 
	
		
			
				|  |  | +                                 start=True, scale_override=None):
 | 
	
		
			
				|  |  | +        (action, containers) = plan
 | 
	
		
			
				|  |  | +        scale = scale_override if scale_override is not None else self.scale_num
 | 
	
		
			
				|  |  | +        containers = sorted(containers, key=attrgetter('number'))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        self.show_scale_warnings(scale)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if action == 'create':
 | 
	
		
			
				|  |  | +            return self._execute_convergence_create(
 | 
	
		
			
				|  |  | +                scale, detached, start
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if action == 'recreate':
 | 
	
		
			
				|  |  | +            return self._execute_convergence_recreate(
 | 
	
		
			
				|  |  | +                containers, scale, timeout, detached, start
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if action == 'start':
 | 
	
		
			
				|  |  | +            return self._execute_convergence_start(
 | 
	
		
			
				|  |  | +                containers, scale, timeout, detached, start
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if action == 'noop':
 | 
	
		
			
				|  |  | +            if scale != len(containers):
 | 
	
		
			
				|  |  | +                return self._execute_convergence_start(
 | 
	
		
			
				|  |  | +                    containers, scale, timeout, detached, start
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  |              for c in containers:
 | 
	
		
			
				|  |  |                  log.info("%s is up-to-date" % c.name)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              return containers
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        else:
 | 
	
		
			
				|  |  | -            raise Exception("Invalid action: {}".format(action))
 | 
	
		
			
				|  |  | +        raise Exception("Invalid action: {}".format(action))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def recreate_container(
 | 
	
		
			
				|  |  |              self,
 |