
Dependency-ordered start/stop/up

Signed-off-by: Ilya Skriblovsky <[email protected]>
Ilya Skriblovsky
commit f933381a12
4 changed files with 128 additions and 46 deletions:
  1. compose/parallel.py  (+70 -36)
  2. compose/project.py  (+53 -7)
  3. compose/service.py  (+3 -2)
  4. tests/acceptance/cli_test.py  (+2 -1)

compose/parallel.py  +70 -36

@@ -14,68 +14,98 @@ from compose.cli.signals import ShutdownException
 from compose.utils import get_output_stream
 
 
-def perform_operation(func, arg, callback, index):
-    try:
-        callback((index, func(arg)))
-    except Exception as e:
-        callback((index, e))
+def parallel_execute(objects, func, get_name, msg, get_deps=None):
+    """Runs func on objects in parallel while ensuring that func is
+    ran on object only after it is ran on all its dependencies.
 
-
-def parallel_execute(objects, func, index_func, msg):
-    """For a given list of objects, call the callable passing in the first
-    object we give it.
+    get_deps, called on an object, must return a collection of its dependencies.
+    get_name, called on an object, must return its name.
     """
     objects = list(objects)
     stream = get_output_stream(sys.stderr)
+
     writer = ParallelStreamWriter(stream, msg)
-    q = setup_queue(writer, objects, func, index_func)
+    for obj in objects:
+        writer.initialize(get_name(obj))
+
+    q = setup_queue(objects, func, get_deps, get_name)
 
     done = 0
     errors = {}
+    error_to_reraise = None
+    returned = [None] * len(objects)
 
     while done < len(objects):
         try:
-            msg_index, result = q.get(timeout=1)
+            obj, result, exception = q.get(timeout=1)
         except Empty:
             continue
         # See https://github.com/docker/compose/issues/189
         except thread.error:
             raise ShutdownException()
 
-        if isinstance(result, APIError):
-            errors[msg_index] = "error", result.explanation
-            writer.write(msg_index, 'error')
-        elif isinstance(result, Exception):
-            errors[msg_index] = "unexpected_exception", result
+        if exception is None:
+            writer.write(get_name(obj), 'done')
+            returned[objects.index(obj)] = result
+        elif isinstance(exception, APIError):
+            errors[get_name(obj)] = exception.explanation
+            writer.write(get_name(obj), 'error')
         else:
-            writer.write(msg_index, 'done')
+            errors[get_name(obj)] = exception
+            error_to_reraise = exception
+
         done += 1
 
-    if not errors:
-        return
+    for obj_name, error in errors.items():
+        stream.write("\nERROR: for {}  {}\n".format(obj_name, error))
 
-    stream.write("\n")
-    for msg_index, (result, error) in errors.items():
-        stream.write("ERROR: for {}  {} \n".format(msg_index, error))
-        if result == 'unexpected_exception':
-            raise error
+    if error_to_reraise:
+        raise error_to_reraise
 
+    return returned
 
-def setup_queue(writer, objects, func, index_func):
-    for obj in objects:
-        writer.initialize(index_func(obj))
 
-    q = Queue()
+def _no_deps(x):
+    return []
 
-    # TODO: limit the number of threads #1828
-    for obj in objects:
-        t = Thread(
-            target=perform_operation,
-            args=(func, obj, q.put, index_func(obj)))
-        t.daemon = True
-        t.start()
 
-    return q
+def setup_queue(objects, func, get_deps, get_name):
+    if get_deps is None:
+        get_deps = _no_deps
+
+    results = Queue()
+
+    started = set()   # objects for which a thread has been started
+    finished = set()  # objects that have already finished
+
+    def do_op(obj):
+        try:
+            result = func(obj)
+            results.put((obj, result, None))
+        except Exception as e:
+            results.put((obj, None, e))
+
+        finished.add(obj)
+        feed()
+
+    def ready(obj):
+        # An object is ready when no thread has been started for it yet
+        # and every dependency that is itself in `objects` has finished.
+        return obj not in started and all(
+            dep not in objects or dep in finished
+            for dep in get_deps(obj)
+        )
+
+    def feed():
+        ready_objects = [o for o in objects if ready(o)]
+        for obj in ready_objects:
+            started.add(obj)
+            t = Thread(target=do_op,
+                       args=(obj,))
+            t.daemon = True
+            t.start()
+
+    feed()
+    return results
 
 
 class ParallelStreamWriter(object):
@@ -91,11 +121,15 @@ class ParallelStreamWriter(object):
         self.lines = []
 
     def initialize(self, obj_index):
+        if self.msg is None:
+            return
         self.lines.append(obj_index)
         self.stream.write("{} {} ... \r\n".format(self.msg, obj_index))
         self.stream.flush()
 
     def write(self, obj_index, status):
+        if self.msg is None:
+            return
         position = self.lines.index(obj_index)
         diff = len(self.lines) - position
         # move up
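
A minimal usage sketch of the new interface (not part of the commit; the three-service graph and the uppercase stand-in operation are invented for illustration). Objects with no unfinished dependencies are started first, and results come back aligned with the input order:

    from compose.parallel import parallel_execute

    # Hypothetical graph: 'web' depends on 'db' and 'cache'.
    deps = {'web': {'db', 'cache'}, 'db': set(), 'cache': set()}
    names = ['web', 'db', 'cache']

    results = parallel_execute(
        names,                       # objects to operate on
        lambda name: name.upper(),   # func: stand-in for the real operation
        lambda name: name,           # get_name, used for the progress lines
        'Processing',                # msg; None suppresses output entirely
        lambda name: deps[name],     # get_deps
    )
    # 'db' and 'cache' run first (in parallel); 'web' starts only after
    # both have finished. results == ['WEB', 'DB', 'CACHE'], aligned with
    # the input order.

Note the new guards in ParallelStreamWriter: with msg=None (as Project.up now passes), initialize and write become no-ops, so quiet callers get dependency ordering without progress lines.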

compose/project.py  +53 -7

@@ -3,6 +3,7 @@ from __future__ import unicode_literals
 
 import datetime
 import logging
+import operator
 from functools import reduce
 
 from docker.errors import APIError
@@ -200,13 +201,40 @@ class Project(object):
 
     def start(self, service_names=None, **options):
         containers = []
-        for service in self.get_services(service_names):
-            service_containers = service.start(**options)
+
+        def start_service(service):
+            service_containers = service.start(quiet=True, **options)
             containers.extend(service_containers)
+
+        services = self.get_services(service_names)
+
+        def get_deps(service):
+            return {self.get_service(dep) for dep in service.get_dependency_names()}
+
+        parallel.parallel_execute(
+            services,
+            start_service,
+            operator.attrgetter('name'),
+            'Starting',
+            get_deps)
+
         return containers
 
     def stop(self, service_names=None, **options):
-        parallel.parallel_stop(self.containers(service_names), options)
+        containers = self.containers(service_names)
+
+        def get_deps(container):
+            # actually returns reverse dependencies: containers whose
+            # service depends on this container's service
+            return {other for other in containers
+                    if container.service in
+                    self.get_service(other.service).get_dependency_names()}
+
+        parallel.parallel_execute(
+            containers,
+            operator.methodcaller('stop', **options),
+            operator.attrgetter('name'),
+            'Stopping',
+            get_deps)
 
     def pause(self, service_names=None, **options):
         containers = self.containers(service_names)
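
For stopping, the ordering is inverted: a container may only stop after every container that depends on its service has stopped, so get_deps here returns reverse dependencies. A small sketch of that inversion (the two-service map is invented; the real code derives it from get_dependency_names):

    # Hypothetical forward dependencies: 'web' depends on 'db'.
    dependencies = {'web': {'db'}, 'db': set()}

    def reverse_deps(name):
        # Everything that lists `name` among its dependencies.
        return {other for other, deps in dependencies.items() if name in deps}

    assert reverse_deps('db') == {'web'}   # 'db' must wait for 'web' to stop
    assert reverse_deps('web') == set()    # 'web' can stop immediately
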
@@ -314,15 +342,33 @@ class Project(object):
             include_deps=start_deps)
 
         plans = self._get_convergence_plans(services, strategy)
-        return [
-            container
-            for service in services
-            for container in service.execute_convergence_plan(
+
+        for svc in services:
+            svc.ensure_image_exists(do_build=do_build)
+
+        def do(service):
+            return service.execute_convergence_plan(
                 plans[service.name],
                 do_build=do_build,
                 timeout=timeout,
                 detached=detached
             )
+
+        def get_deps(service):
+            return {self.get_service(dep) for dep in service.get_dependency_names()}
+
+        results = parallel.parallel_execute(
+            services,
+            do,
+            operator.attrgetter('name'),
+            None,
+            get_deps
+        )
+        return [
+            container
+            for svc_containers in results
+            if svc_containers is not None
+            for container in svc_containers
         ]
 
     def initialize(self):
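
Since parallel_execute now returns one entry per service, in input order, and leaves an entry as None when that service's operation failed with an APIError, the final comprehension flattens only the successful results. Illustrative (invented) values for that last step:

    results = [['web_1', 'web_2'], None, ['db_1']]  # one entry per service
    containers = [
        container
        for svc_containers in results
        if svc_containers is not None
        for container in svc_containers
    ]
    assert containers == ['web_1', 'web_2', 'db_1']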

compose/service.py  +3 -2

@@ -436,9 +436,10 @@ class Service(object):
         container.remove()
         return new_container
 
-    def start_container_if_stopped(self, container, attach_logs=False):
+    def start_container_if_stopped(self, container, attach_logs=False, quiet=False):
         if not container.is_running:
-            log.info("Starting %s" % container.name)
+            if not quiet:
+                log.info("Starting %s" % container.name)
             if attach_logs:
                 container.attach_log_stream()
             return self.start_container(container)

tests/acceptance/cli_test.py  +2 -1

@@ -8,6 +8,7 @@ import shlex
 import signal
 import subprocess
 import time
+from collections import Counter
 from collections import namedtuple
 from operator import attrgetter
 
@@ -1346,7 +1347,7 @@ class CLITestCase(DockerClientTestCase):
         os.kill(events_proc.pid, signal.SIGINT)
         result = wait_on_process(events_proc, returncode=1)
         lines = [json.loads(line) for line in result.stdout.rstrip().split('\n')]
-        assert [e['action'] for e in lines] == ['create', 'start', 'create', 'start']
+        assert Counter(e['action'] for e in lines) == {'create': 2, 'start': 2}
 
     def test_events_human_readable(self):
         events_proc = start_process(self.base_dir, ['events'])
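
The test change reflects that dependency-ordered parallel startup no longer guarantees the relative order of events across services, only their counts, so the assertion compares multisets. A quick illustration (the event sequences are hypothetical interleavings):

    from collections import Counter

    # Either interleaving can occur once services start in parallel.
    observed_a = ['create', 'start', 'create', 'start']
    observed_b = ['create', 'create', 'start', 'start']
    expected = {'create': 2, 'start': 2}
    assert Counter(observed_a) == expected
    assert Counter(observed_b) == expected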