瀏覽代碼

Execute container commands in parallel

Commands able to use this parallelisation are `stop`, `kill` and `rm`.

We're using a backported function from python 3, to allow us to make
the most of a pool of threads without having to write the low level
code for managing this ourselves.

A default value for number of threads is a low enough number so it
shouldn't cause performance problems but if someone knows the
capability of their system and wants to increase it, they can via
an environment variable DEFAULT_MAX_WORKERS

Signed-off-by: Mazz Mosley <[email protected]>
Mazz Mosley 10 年之前
父節點
當前提交
a68ca199a2
共有 6 個文件被更改,包括 52 次插入12 次删除
  1. 1 0
      compose/const.py
  2. 11 11
      compose/project.py
  3. 3 0
      compose/service.py
  4. 35 1
      compose/utils.py
  5. 1 0
      requirements.txt
  6. 1 0
      setup.py

+ 1 - 0
compose/const.py

@@ -1,4 +1,5 @@
 
+DEFAULT_MAX_WORKERS = 5
 DEFAULT_TIMEOUT = 10
 LABEL_CONTAINER_NUMBER = 'com.docker.compose.container-number'
 LABEL_ONE_OFF = 'com.docker.compose.oneoff'

+ 11 - 11
compose/project.py

@@ -1,15 +1,16 @@
 from __future__ import unicode_literals
 from __future__ import absolute_import
-import logging
 from functools import reduce
+import logging
 
 from docker.errors import APIError
 
 from .config import get_service_name_from_net, ConfigurationError
-from .const import LABEL_PROJECT, LABEL_SERVICE, LABEL_ONE_OFF, DEFAULT_TIMEOUT
-from .service import Service
+from .const import DEFAULT_TIMEOUT, LABEL_PROJECT, LABEL_SERVICE, LABEL_ONE_OFF
 from .container import Container
 from .legacy import check_for_legacy_containers
+from .service import Service
+from .utils import parallel_execute
 
 log = logging.getLogger(__name__)
 
@@ -197,12 +198,15 @@ class Project(object):
             service.start(**options)
 
     def stop(self, service_names=None, **options):
-        for service in reversed(self.get_services(service_names)):
-            service.stop(**options)
+        parallel_execute("stop", self.containers(service_names), "Stopping", "Stopped", **options)
 
     def kill(self, service_names=None, **options):
-        for service in reversed(self.get_services(service_names)):
-            service.kill(**options)
+        parallel_execute("kill", self.containers(service_names), "Killing", "Killed", **options)
+
+    def remove_stopped(self, service_names=None, **options):
+        all_containers = self.containers(service_names, stopped=True)
+        stopped_containers = [c for c in all_containers if not c.is_running]
+        parallel_execute("remove", stopped_containers, "Removing", "Removed", **options)
 
     def restart(self, service_names=None, **options):
         for service in self.get_services(service_names):
@@ -284,10 +288,6 @@ class Project(object):
         for service in self.get_services(service_names, include_deps=True):
             service.pull(insecure_registry=insecure_registry)
 
-    def remove_stopped(self, service_names=None, **options):
-        for service in self.get_services(service_names):
-            service.remove_stopped(**options)
-
     def containers(self, service_names=None, stopped=False, one_off=False):
         if service_names:
             self.validate_service_names(service_names)

+ 3 - 0
compose/service.py

@@ -129,6 +129,7 @@ class Service(object):
         for c in self.containers(stopped=True):
             self.start_container_if_stopped(c, **options)
 
+    # TODO: remove these functions, project takes care of starting/stopping,
     def stop(self, **options):
         for c in self.containers():
             log.info("Stopping %s..." % c.name)
@@ -144,6 +145,8 @@ class Service(object):
             log.info("Restarting %s..." % c.name)
             c.restart(**options)
 
+    # end TODO
+
     def scale(self, desired_num):
         """
         Adjusts the number of containers to the specified number and ensures

+ 35 - 1
compose/utils.py

@@ -1,5 +1,39 @@
-import json
 import hashlib
+import json
+import logging
+import os
+
+import concurrent.futures
+
+from .const import DEFAULT_MAX_WORKERS
+
+
+log = logging.getLogger(__name__)
+
+
+def parallel_execute(command, containers, doing_msg, done_msg, **options):
+    """
+    Execute a given command upon a list of containers in parallel.
+    """
+    max_workers = os.environ.get('MAX_WORKERS', DEFAULT_MAX_WORKERS)
+
+    def container_command_execute(container, command, **options):
+        log.info("{} {}...".format(doing_msg, container.name))
+        return getattr(container, command)(**options)
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+        future_container = {
+            executor.submit(
+                container_command_execute,
+                container,
+                command,
+                **options
+            ): container for container in containers
+        }
+
+        for future in concurrent.futures.as_completed(future_container):
+            container = future_container[future]
+            log.info("{} {}".format(done_msg, container.name))
 
 
 def json_hash(obj):

+ 1 - 0
requirements.txt

@@ -2,6 +2,7 @@ PyYAML==3.10
 docker-py==1.2.3
 dockerpty==0.3.4
 docopt==0.6.1
+futures==3.0.3
 requests==2.6.1
 six==1.7.3
 texttable==0.8.2

+ 1 - 0
setup.py

@@ -33,6 +33,7 @@ install_requires = [
     'docker-py >= 1.2.3, < 1.3',
     'dockerpty >= 0.3.4, < 0.4',
     'six >= 1.3.0, < 2',
+    'futures >= 3.0.3',
 ]