فهرست منبع

Merge pull request #1687 from mnowster/1651-parallelise-stopping-containers

1651 parallelise stopping containers
Aanand Prasad 10 سال پیش
والد
کامیت
35aef1aee0
8فایلهای تغییر یافته به همراه70 افزوده شده و 26 حذف شده
  1. 1 0
      compose/const.py
  2. 11 11
      compose/project.py
  3. 3 0
      compose/service.py
  4. 35 1
      compose/utils.py
  5. 6 0
      docs/cli.md
  6. 1 0
      requirements.txt
  7. 1 0
      setup.py
  8. 12 14
      tests/unit/project_test.py

+ 1 - 0
compose/const.py

@@ -1,4 +1,5 @@
 
+DEFAULT_MAX_WORKERS = 5
 DEFAULT_TIMEOUT = 10
 LABEL_CONTAINER_NUMBER = 'com.docker.compose.container-number'
 LABEL_ONE_OFF = 'com.docker.compose.oneoff'

+ 11 - 11
compose/project.py

@@ -1,15 +1,16 @@
 from __future__ import unicode_literals
 from __future__ import absolute_import
-import logging
 from functools import reduce
+import logging
 
 from docker.errors import APIError
 
 from .config import get_service_name_from_net, ConfigurationError
-from .const import LABEL_PROJECT, LABEL_SERVICE, LABEL_ONE_OFF, DEFAULT_TIMEOUT
-from .service import Service
+from .const import DEFAULT_TIMEOUT, LABEL_PROJECT, LABEL_SERVICE, LABEL_ONE_OFF
 from .container import Container
 from .legacy import check_for_legacy_containers
+from .service import Service
+from .utils import parallel_execute
 
 log = logging.getLogger(__name__)
 
@@ -197,12 +198,15 @@ class Project(object):
             service.start(**options)
 
     def stop(self, service_names=None, **options):
-        for service in reversed(self.get_services(service_names)):
-            service.stop(**options)
+        parallel_execute("stop", self.containers(service_names), "Stopping", "Stopped", **options)
 
     def kill(self, service_names=None, **options):
-        for service in reversed(self.get_services(service_names)):
-            service.kill(**options)
+        parallel_execute("kill", self.containers(service_names), "Killing", "Killed", **options)
+
+    def remove_stopped(self, service_names=None, **options):
+        all_containers = self.containers(service_names, stopped=True)
+        stopped_containers = [c for c in all_containers if not c.is_running]
+        parallel_execute("remove", stopped_containers, "Removing", "Removed", **options)
 
     def restart(self, service_names=None, **options):
         for service in self.get_services(service_names):
@@ -284,10 +288,6 @@ class Project(object):
         for service in self.get_services(service_names, include_deps=True):
             service.pull(insecure_registry=insecure_registry)
 
-    def remove_stopped(self, service_names=None, **options):
-        for service in self.get_services(service_names):
-            service.remove_stopped(**options)
-
     def containers(self, service_names=None, stopped=False, one_off=False):
         if service_names:
             self.validate_service_names(service_names)

+ 3 - 0
compose/service.py

@@ -129,6 +129,7 @@ class Service(object):
         for c in self.containers(stopped=True):
             self.start_container_if_stopped(c, **options)
 
+    # TODO: remove these functions, project takes care of starting/stopping,
     def stop(self, **options):
         for c in self.containers():
             log.info("Stopping %s..." % c.name)
@@ -144,6 +145,8 @@ class Service(object):
             log.info("Restarting %s..." % c.name)
             c.restart(**options)
 
+    # end TODO
+
     def scale(self, desired_num):
         """
         Adjusts the number of containers to the specified number and ensures

+ 35 - 1
compose/utils.py

@@ -1,5 +1,39 @@
-import json
 import hashlib
+import json
+import logging
+import os
+
+import concurrent.futures
+
+from .const import DEFAULT_MAX_WORKERS
+
+
+log = logging.getLogger(__name__)
+
+
+def parallel_execute(command, containers, doing_msg, done_msg, **options):
+    """
+    Execute a given command upon a list of containers in parallel.
+    """
+    max_workers = os.environ.get('MAX_WORKERS', DEFAULT_MAX_WORKERS)
+
+    def container_command_execute(container, command, **options):
+        log.info("{} {}...".format(doing_msg, container.name))
+        return getattr(container, command)(**options)
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+        future_container = {
+            executor.submit(
+                container_command_execute,
+                container,
+                command,
+                **options
+            ): container for container in containers
+        }
+
+        for future in concurrent.futures.as_completed(future_container):
+            container = future_container[future]
+            log.info("{} {}".format(done_msg, container.name))
 
 
 def json_hash(obj):

+ 6 - 0
docs/cli.md

@@ -193,6 +193,12 @@ the daemon.
 
 Configures the path to the `ca.pem`, `cert.pem`, and `key.pem` files used for TLS verification. Defaults to `~/.docker`.
 
+### DEFAULT\_MAX\_WORKERS
+
+Configures the maximum number of worker threads to be used when executing
+commands in parallel. Only a subset of commands execute in parallel, `stop`,
+`kill` and `rm`.
+
 ## Compose documentation
 
 - [User guide](/)

+ 1 - 0
requirements.txt

@@ -2,6 +2,7 @@ PyYAML==3.10
 docker-py==1.3.0
 dockerpty==0.3.4
 docopt==0.6.1
+futures==3.0.3
 requests==2.6.1
 six==1.7.3
 texttable==0.8.2

+ 1 - 0
setup.py

@@ -33,6 +33,7 @@ install_requires = [
     'docker-py >= 1.3.0, < 1.4',
     'dockerpty >= 0.3.4, < 0.4',
     'six >= 1.3.0, < 2',
+    'futures >= 3.0.3',
 ]
 
 

+ 12 - 14
tests/unit/project_test.py

@@ -9,6 +9,9 @@ import docker
 
 
 class ProjectTest(unittest.TestCase):
+    def setUp(self):
+        self.mock_client = mock.create_autospec(docker.Client)
+
     def test_from_dict(self):
         project = Project.from_dicts('composetest', [
             {
@@ -155,21 +158,19 @@ class ProjectTest(unittest.TestCase):
     def test_use_volumes_from_container(self):
         container_id = 'aabbccddee'
         container_dict = dict(Name='aaa', Id=container_id)
-        mock_client = mock.create_autospec(docker.Client)
-        mock_client.inspect_container.return_value = container_dict
+        self.mock_client.inspect_container.return_value = container_dict
         project = Project.from_dicts('test', [
             {
                 'name': 'test',
                 'image': 'busybox:latest',
                 'volumes_from': ['aaa']
             }
-        ], mock_client)
+        ], self.mock_client)
         self.assertEqual(project.get_service('test')._get_volumes_from(), [container_id])
 
     def test_use_volumes_from_service_no_container(self):
         container_name = 'test_vol_1'
-        mock_client = mock.create_autospec(docker.Client)
-        mock_client.containers.return_value = [
+        self.mock_client.containers.return_value = [
             {
                 "Name": container_name,
                 "Names": [container_name],
@@ -187,7 +188,7 @@ class ProjectTest(unittest.TestCase):
                 'image': 'busybox:latest',
                 'volumes_from': ['vol']
             }
-        ], mock_client)
+        ], self.mock_client)
         self.assertEqual(project.get_service('test')._get_volumes_from(), [container_name])
 
     @mock.patch.object(Service, 'containers')
@@ -211,13 +212,12 @@ class ProjectTest(unittest.TestCase):
         self.assertEqual(project.get_service('test')._get_volumes_from(), container_ids)
 
     def test_net_unset(self):
-        mock_client = mock.create_autospec(docker.Client)
         project = Project.from_dicts('test', [
             {
                 'name': 'test',
                 'image': 'busybox:latest',
             }
-        ], mock_client)
+        ], self.mock_client)
         service = project.get_service('test')
         self.assertEqual(service._get_net(), None)
         self.assertNotIn('NetworkMode', service._get_container_host_config({}))
@@ -225,22 +225,20 @@ class ProjectTest(unittest.TestCase):
     def test_use_net_from_container(self):
         container_id = 'aabbccddee'
         container_dict = dict(Name='aaa', Id=container_id)
-        mock_client = mock.create_autospec(docker.Client)
-        mock_client.inspect_container.return_value = container_dict
+        self.mock_client.inspect_container.return_value = container_dict
         project = Project.from_dicts('test', [
             {
                 'name': 'test',
                 'image': 'busybox:latest',
                 'net': 'container:aaa'
             }
-        ], mock_client)
+        ], self.mock_client)
         service = project.get_service('test')
         self.assertEqual(service._get_net(), 'container:' + container_id)
 
     def test_use_net_from_service(self):
         container_name = 'test_aaa_1'
-        mock_client = mock.create_autospec(docker.Client)
-        mock_client.containers.return_value = [
+        self.mock_client.containers.return_value = [
             {
                 "Name": container_name,
                 "Names": [container_name],
@@ -258,7 +256,7 @@ class ProjectTest(unittest.TestCase):
                 'image': 'busybox:latest',
                 'net': 'container:aaa'
             }
-        ], mock_client)
+        ], self.mock_client)
 
         service = project.get_service('test')
         self.assertEqual(service._get_net(), 'container:' + container_name)