@@ -38,7 +38,6 @@ from .errors import HealthCheckFailed
 from .errors import NoHealthCheckConfigured
 from .errors import OperationFailedError
 from .parallel import parallel_execute
-from .parallel import parallel_start
 from .progress_stream import stream_output
 from .progress_stream import StreamOutputError
 from .utils import json_hash
@@ -148,6 +147,7 @@ class Service(object):
         network_mode=None,
         networks=None,
         secrets=None,
+        scale=None,
         **options
     ):
         self.name = name
@@ -159,6 +159,7 @@ class Service(object):
         self.network_mode = network_mode or NetworkMode(None)
         self.networks = networks or {}
         self.secrets = secrets or []
+        self.scale_num = scale or 1
         self.options = options
 
     def __repr__(self):
@@ -189,16 +190,7 @@ class Service(object):
             self.start_container_if_stopped(c, **options)
         return containers
 
-    def scale(self, desired_num, timeout=None):
-        """
-        Adjusts the number of containers to the specified number and ensures
-        they are running.
-
-        - creates containers until there are at least `desired_num`
-        - stops containers until there are at most `desired_num` running
-        - starts containers until there are at least `desired_num` running
-        - removes all stopped containers
-        """
+    def show_scale_warnings(self, desired_num):
         if self.custom_container_name and desired_num > 1:
             log.warn('The "%s" service is using the custom container name "%s". '
                      'Docker requires each container to have a unique name. '
@@ -210,14 +202,18 @@ class Service(object):
                      'for this service are created on a single host, the port will clash.'
                      % self.name)
 
-        def create_and_start(service, number):
-            container = service.create_container(number=number, quiet=True)
-            service.start_container(container)
-            return container
+    def scale(self, desired_num, timeout=None):
+        """
+        Adjusts the number of containers to the specified number and ensures
+        they are running.
 
-        def stop_and_remove(container):
-            container.stop(timeout=self.stop_timeout(timeout))
-            container.remove()
+        - creates containers until there are at least `desired_num`
+        - stops containers until there are at most `desired_num` running
+        - starts containers until there are at least `desired_num` running
+        - removes all stopped containers
+        """
+
+        self.show_scale_warnings(desired_num)
 
         running_containers = self.containers(stopped=False)
         num_running = len(running_containers)
@@ -228,11 +224,10 @@ class Service(object):
             return
 
         if desired_num > num_running:
-            # we need to start/create until we have desired_num
             all_containers = self.containers(stopped=True)
 
             if num_running != len(all_containers):
-                # we have some stopped containers, let's start them up again
+                # we have some stopped containers, check for divergences
                 stopped_containers = [
                     c for c in all_containers if not c.is_running
                 ]
@@ -241,38 +236,14 @@ class Service(object):
                 divergent_containers = [
                     c for c in stopped_containers if self._containers_have_diverged([c])
                 ]
-                stopped_containers = sorted(
-                    set(stopped_containers) - set(divergent_containers),
-                    key=attrgetter('number')
-                )
                 for c in divergent_containers:
                     c.remove()
 
-                num_stopped = len(stopped_containers)
-
-                if num_stopped + num_running > desired_num:
-                    num_to_start = desired_num - num_running
-                    containers_to_start = stopped_containers[:num_to_start]
-                else:
-                    containers_to_start = stopped_containers
-
-                parallel_start(containers_to_start, {})
-
-                num_running += len(containers_to_start)
+                all_containers = list(set(all_containers) - set(divergent_containers))
 
-            num_to_create = desired_num - num_running
-            next_number = self._next_container_number()
-            container_numbers = [
-                number for number in range(
-                    next_number, next_number + num_to_create
-                )
-            ]
-
-            parallel_execute(
-                container_numbers,
-                lambda n: create_and_start(service=self, number=n),
-                lambda n: self.get_container_name(n),
-                "Creating and starting"
+            sorted_containers = sorted(all_containers, key=attrgetter('number'))
+            self._execute_convergence_start(
+                sorted_containers, desired_num, timeout, True, True
             )
 
         if desired_num < num_running:
@@ -282,12 +253,7 @@ class Service(object):
                 running_containers,
                 key=attrgetter('number'))
 
-            parallel_execute(
-                sorted_running_containers[-num_to_stop:],
-                stop_and_remove,
-                lambda c: c.name,
-                "Stopping and removing",
-            )
+            self._downscale(sorted_running_containers[-num_to_stop:], timeout)
 
     def create_container(self,
                          one_off=False,
@@ -400,51 +366,109 @@ class Service(object):
 
         return has_diverged
 
-    def execute_convergence_plan(self,
-                                 plan,
-                                 timeout=None,
-                                 detached=False,
-                                 start=True):
-        (action, containers) = plan
-        should_attach_logs = not detached
-
-        if action == 'create':
-            container = self.create_container()
+    def _execute_convergence_create(self, scale, detached, start):
+        i = self._next_container_number()
 
-            if should_attach_logs:
-                container.attach_log_stream()
+        def create_and_start(service, n):
+            container = service.create_container(number=n)
+            if not detached:
+                container.attach_log_stream()
+            if start:
+                self.start_container(container)
+            return container
 
-            if start:
-                self.start_container(container)
+        return parallel_execute(
+            range(i, i + scale),
+            lambda n: create_and_start(self, n),
+            lambda n: self.get_container_name(n),
+            "Creating"
+        )[0]
 
-            return [container]
+    def _execute_convergence_recreate(self, containers, scale, timeout, detached, start):
+        if len(containers) > scale:
+            self._downscale(containers[scale:], timeout)
+            containers = containers[:scale]
 
-        elif action == 'recreate':
-            return [
-                self.recreate_container(
-                    container,
-                    timeout=timeout,
-                    attach_logs=should_attach_logs,
+        def recreate(container):
+            return self.recreate_container(
+                container, timeout=timeout, attach_logs=not detached,
                 start_new_container=start
             )
-                for container in containers
-            ]
+        containers = parallel_execute(
+            containers,
+            recreate,
+            lambda c: c.name,
+            "Recreating"
+        )[0]
+        if len(containers) < scale:
+            containers.extend(self._execute_convergence_create(
+                scale - len(containers), detached, start
+            ))
+        return containers
 
-        elif action == 'start':
+    def _execute_convergence_start(self, containers, scale, timeout, detached, start):
+        if len(containers) > scale:
+            self._downscale(containers[scale:], timeout)
+            containers = containers[:scale]
         if start:
-            for container in containers:
-                self.start_container_if_stopped(container, attach_logs=should_attach_logs)
-
+            parallel_execute(
+                containers,
+                lambda c: self.start_container_if_stopped(c, attach_logs=not detached),
+                lambda c: c.name,
+                "Starting"
+            )
+        if len(containers) < scale:
+            containers.extend(self._execute_convergence_create(
+                scale - len(containers), detached, start
+            ))
        return containers
 
-        elif action == 'noop':
+    def _downscale(self, containers, timeout=None):
+        def stop_and_remove(container):
+            container.stop(timeout=self.stop_timeout(timeout))
+            container.remove()
+
+        parallel_execute(
+            containers,
+            stop_and_remove,
+            lambda c: c.name,
+            "Stopping and removing",
+        )
+
+    def execute_convergence_plan(self, plan, timeout=None, detached=False,
+                                 start=True, scale_override=None):
+        (action, containers) = plan
+        scale = scale_override if scale_override is not None else self.scale_num
+        containers = sorted(containers, key=attrgetter('number'))
+
+        self.show_scale_warnings(scale)
+
+        if action == 'create':
+            return self._execute_convergence_create(
+                scale, detached, start
+            )
+
+        if action == 'recreate':
+            return self._execute_convergence_recreate(
+                containers, scale, timeout, detached, start
+            )
+
+        if action == 'start':
+            return self._execute_convergence_start(
+                containers, scale, timeout, detached, start
+            )
+
+        if action == 'noop':
+            if scale != len(containers):
+                return self._execute_convergence_start(
+                    containers, scale, timeout, detached, start
+                )
             for c in containers:
                 log.info("%s is up-to-date" % c.name)
 
             return containers
 
-        else:
-            raise Exception("Invalid action: {}".format(action))
+        raise Exception("Invalid action: {}".format(action))
 
     def recreate_container(
             self,