
Add COMPOSE_PARALLEL_LIMIT to restrict global number of parallel operations

Signed-off-by: Shea Rozmiarek <[email protected]>
Shea Rozmiarek, 7 years ago
commit 48166a79c7
3 files changed, 43 insertions(+), 1 deletion(-)
  1. compose/const.py (+1, -0)
  2. compose/parallel.py (+15, -1)
  3. tests/unit/parallel_test.py (+27, -0)
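
In short, the change reads COMPOSE_PARALLEL_LIMIT from the environment, falls back to the new PARALLEL_LIMIT default of 64, and makes every producer thread hold a single process-wide semaphore while it runs. A minimal standalone sketch of that mechanism follows; it is an approximation that reads os.environ directly instead of going through compose.config.environment.Environment, and producer() here is stripped down to the semaphore handling only.

# Sketch only: approximates the mechanism this commit introduces.
import os
from threading import Semaphore

PARALLEL_LIMIT = 64  # default, as added to compose/const.py


def get_configured_limit():
    value = os.environ.get('COMPOSE_PARALLEL_LIMIT')
    return int(value) if value else PARALLEL_LIMIT


# One semaphore shared by the whole process, sized once at import time.
global_limiter = Semaphore(get_configured_limit())


def producer(obj, func):
    # Every operation holds the global semaphore while it runs, so at most
    # get_configured_limit() operations execute concurrently.
    with global_limiter:
        return func(obj)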

compose/const.py (+1, -0)

@@ -18,6 +18,7 @@ LABEL_VERSION = 'com.docker.compose.version'
 LABEL_VOLUME = 'com.docker.compose.volume'
 LABEL_CONFIG_HASH = 'com.docker.compose.config-hash'
 NANOCPUS_SCALE = 1000000000
+PARALLEL_LIMIT = 64
 
 SECRETS_PATH = '/run/secrets'
 

compose/parallel.py (+15, -1)

@@ -15,6 +15,8 @@ from six.moves.queue import Queue
 from compose.cli.colors import green
 from compose.cli.colors import red
 from compose.cli.signals import ShutdownException
+from compose.config.environment import Environment
+from compose.const import PARALLEL_LIMIT
 from compose.errors import HealthCheckFailed
 from compose.errors import NoHealthCheckConfigured
 from compose.errors import OperationFailedError
@@ -26,6 +28,18 @@ log = logging.getLogger(__name__)
 STOP = object()
 
 
+def get_configured_limit():
+    limit = Environment.from_command_line({'COMPOSE_PARALLEL_LIMIT': None})['COMPOSE_PARALLEL_LIMIT']
+    if limit:
+        limit = int(limit)
+    else:
+        limit = PARALLEL_LIMIT
+    return limit
+
+
+global_limiter = Semaphore(get_configured_limit())
+
+
 def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, parent_objects=None):
     """Runs func on objects in parallel while ensuring that func is
     ran on object only after it is ran on all its dependencies.
@@ -173,7 +187,7 @@ def producer(obj, func, results, limiter):
     The entry point for a producer thread which runs func on a single object.
     Places a tuple on the results queue once func has either returned or raised.
     """
-    with limiter:
+    with limiter, global_limiter:
         try:
             result = func(obj)
             results.put((obj, result, None))
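
Because producer() now enters both semaphores, the effective ceiling is whichever is smaller: the per-call limit passed to parallel_execute() or the global COMPOSE_PARALLEL_LIMIT. The self-contained demonstration below shows that nesting in isolation; the counters, sleep, and limit values are illustrative and not part of the commit.

# Demonstrates that `with limiter, global_limiter:` caps concurrency at the
# smaller of the two limits. No compose code is imported here.
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Semaphore

limiter = Semaphore(8)         # stands in for the per-call `limit` argument
global_limiter = Semaphore(2)  # stands in for the COMPOSE_PARALLEL_LIMIT cap

active = 0
peak = 0
counter_lock = Lock()


def work(_):
    global active, peak
    with limiter, global_limiter:
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # simulate a docker operation
        with counter_lock:
            active -= 1


with ThreadPoolExecutor(max_workers=20) as pool:
    list(pool.map(work, range(40)))

print(peak)  # never exceeds 2: the tighter global limit wins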

tests/unit/parallel_test.py (+27, -0)

@@ -1,11 +1,13 @@
 from __future__ import absolute_import
 from __future__ import unicode_literals
 
+import os
 from threading import Lock
 
 import six
 from docker.errors import APIError
 
+from compose.parallel import get_configured_limit
 from compose.parallel import parallel_execute
 from compose.parallel import parallel_execute_iter
 from compose.parallel import ParallelStreamWriter
@@ -67,6 +69,31 @@ def test_parallel_execute_with_limit():
     assert errors == {}
 
 
+def test_parallel_execute_with_global_limit():
+    os.environ['COMPOSE_PARALLEL_LIMIT'] = '1'
+    tasks = 20
+    lock = Lock()
+
+    assert get_configured_limit() == 1
+
+    def f(obj):
+        locked = lock.acquire(False)
+        # we should always get the lock because we're the only thread running
+        assert locked
+        lock.release()
+        return None
+
+    results, errors = parallel_execute(
+        objects=list(range(tasks)),
+        func=f,
+        get_name=six.text_type,
+        msg="Testing",
+    )
+
+    assert results == tasks * [None]
+    assert errors == {}
+
+
 def test_parallel_execute_with_deps():
     log = []
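
The override behaviour the new test relies on can also be checked by hand. The snippet below assumes a checkout with this patch applied and compose importable; get_configured_limit() re-reads the environment on each call, while global_limiter itself is sized once, when compose.parallel is first imported.

# Illustrative check; assumes this patch is applied.
import os
from compose.parallel import get_configured_limit

os.environ['COMPOSE_PARALLEL_LIMIT'] = '4'
assert get_configured_limit() == 4  # an explicit value wins, as in the new test

# With the variable unset, the `else` branch should yield the PARALLEL_LIMIT
# default of 64.
os.environ.pop('COMPOSE_PARALLEL_LIMIT', None)
print(get_configured_limit())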