ソースを参照

Parallelise scale

Signed-off-by: Mazz Mosley <[email protected]>
Mazz Mosley 10 年 前
コミット
5c29ded6ac
2 ファイル変更79 行追加27 行削除
  1. 34 27
      compose/service.py
  2. 45 0
      compose/utils.py

+ 34 - 27
compose/service.py

@@ -24,7 +24,7 @@ from .const import (
 from .container import Container
 from .legacy import check_for_legacy_containers
 from .progress_stream import stream_output, StreamOutputError
-from .utils import json_hash
+from .utils import json_hash, parallel_create_execute, parallel_execute
 
 log = logging.getLogger(__name__)
 
@@ -162,36 +162,43 @@ class Service(object):
                      'for this service are created on a single host, the port will clash.'
                      % self.name)
 
-        # Create enough containers
-        containers = self.containers(stopped=True)
-        while len(containers) < desired_num:
-            containers.append(self.create_container())
+        def create_and_start(number):
+            container = self.create_container(number=number, quiet=True)
+            container.start()
+            return container
 
-        running_containers = []
-        stopped_containers = []
-        for c in containers:
-            if c.is_running:
-                running_containers.append(c)
-            else:
-                stopped_containers.append(c)
-        running_containers.sort(key=lambda c: c.number)
-        stopped_containers.sort(key=lambda c: c.number)
+        msgs = {'doing': 'Creating', 'done': 'Started'}
 
-        # Stop containers
-        while len(running_containers) > desired_num:
-            c = running_containers.pop()
-            log.info("Stopping %s..." % c.name)
-            c.stop(timeout=timeout)
-            stopped_containers.append(c)
+        running_containers = self.containers(stopped=False)
+        num_running = len(running_containers)
+
+        if desired_num == num_running:
+            # do nothing as we already have the desired number
+            log.info('Desired container number already achieved')
+            return
+
+        if desired_num > num_running:
+            num_to_create = desired_num - num_running
+            next_number = self._next_container_number()
+            container_numbers = [
+                number for number in range(
+                    next_number, next_number + num_to_create
+                )
+            ]
+            parallel_create_execute(create_and_start, container_numbers, msgs)
+
+        if desired_num < num_running:
+            sorted_running_containers = sorted(running_containers, key=attrgetter('number'))
 
-        # Start containers
-        while len(running_containers) < desired_num:
-            c = stopped_containers.pop(0)
-            log.info("Starting %s..." % c.name)
-            self.start_container(c)
-            running_containers.append(c)
+            if desired_num < num_running:
+                # count number of running containers.
+                num_to_stop = num_running - desired_num
 
-        self.remove_stopped()
+                containers_to_stop = sorted_running_containers[-num_to_stop:]
+                # TODO: refactor these out?
+                parallel_execute("stop", containers_to_stop, "Stopping", "Stopped")
+                parallel_execute("remove", containers_to_stop, "Removing", "Removed")
+        # self.remove_stopped()
 
     def remove_stopped(self, **options):
         for c in self.containers(stopped=True):

+ 45 - 0
compose/utils.py

@@ -12,6 +12,51 @@ from threading import Thread
 log = logging.getLogger(__name__)
 
 
+def parallel_create_execute(create_function, container_numbers, msgs={}, **options):
+    """
+    Parallel container creation by calling the create_function for each new container
+    number passed in.
+    """
+    stream = codecs.getwriter('utf-8')(sys.stdout)
+    lines = []
+    errors = {}
+
+    for number in container_numbers:
+        write_out_msg(stream, lines, number, msgs['doing'])
+
+    q = Queue()
+
+    def inner_call_function(create_function, number):
+        try:
+            container = create_function(number)
+        except APIError as e:
+            errors[number] = e.explanation
+        q.put(container)
+
+    for number in container_numbers:
+        t = Thread(
+            target=inner_call_function,
+            args=(create_function, number),
+            kwargs=options,
+        )
+        t.daemon = True
+        t.start()
+
+    done = 0
+    total_to_create = len(container_numbers)
+    while done < total_to_create:
+        try:
+            container = q.get(timeout=1)
+            write_out_msg(stream, lines, container.name, msgs['done'])
+            done += 1
+        except Empty:
+            pass
+
+    if errors:
+        for number in errors:
+            stream.write("ERROR: for {}  {} \n".format(number, errors[number]))
+
+
 def parallel_execute(command, containers, doing_msg, done_msg, **options):
     """
     Execute a given command upon a list of containers in parallel.