|
@@ -6,9 +6,11 @@ import sys
|
|
|
from threading import Thread
|
|
|
|
|
|
from docker.errors import APIError
|
|
|
+from six.moves import _thread as thread
|
|
|
from six.moves.queue import Empty
|
|
|
from six.moves.queue import Queue
|
|
|
|
|
|
+from compose.cli.signals import ShutdownException
|
|
|
from compose.utils import get_output_stream
|
|
|
|
|
|
|
|
@@ -26,19 +28,7 @@ def parallel_execute(objects, func, index_func, msg):
|
|
|
objects = list(objects)
|
|
|
stream = get_output_stream(sys.stderr)
|
|
|
writer = ParallelStreamWriter(stream, msg)
|
|
|
-
|
|
|
- for obj in objects:
|
|
|
- writer.initialize(index_func(obj))
|
|
|
-
|
|
|
- q = Queue()
|
|
|
-
|
|
|
- # TODO: limit the number of threads #1828
|
|
|
- for obj in objects:
|
|
|
- t = Thread(
|
|
|
- target=perform_operation,
|
|
|
- args=(func, obj, q.put, index_func(obj)))
|
|
|
- t.daemon = True
|
|
|
- t.start()
|
|
|
+ q = setup_queue(writer, objects, func, index_func)
|
|
|
|
|
|
done = 0
|
|
|
errors = {}
|
|
@@ -48,6 +38,9 @@ def parallel_execute(objects, func, index_func, msg):
|
|
|
msg_index, result = q.get(timeout=1)
|
|
|
except Empty:
|
|
|
continue
|
|
|
+ # See https://github.com/docker/compose/issues/189
|
|
|
+ except thread.error:
|
|
|
+ raise ShutdownException()
|
|
|
|
|
|
if isinstance(result, APIError):
|
|
|
errors[msg_index] = "error", result.explanation
|
|
@@ -68,6 +61,23 @@ def parallel_execute(objects, func, index_func, msg):
|
|
|
raise error
|
|
|
|
|
|
|
|
|
+def setup_queue(writer, objects, func, index_func):
|
|
|
+ for obj in objects:
|
|
|
+ writer.initialize(index_func(obj))
|
|
|
+
|
|
|
+ q = Queue()
|
|
|
+
|
|
|
+ # TODO: limit the number of threads #1828
|
|
|
+ for obj in objects:
|
|
|
+ t = Thread(
|
|
|
+ target=perform_operation,
|
|
|
+ args=(func, obj, q.put, index_func(obj)))
|
|
|
+ t.daemon = True
|
|
|
+ t.start()
|
|
|
+
|
|
|
+ return q
|
|
|
+
|
|
|
+
|
|
|
class ParallelStreamWriter(object):
|
|
|
"""Write out messages for operations happening in parallel.
|
|
|
|