|  | @@ -6,9 +6,11 @@ import sys
 | 
	
		
			
				|  |  |  from threading import Thread
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  from docker.errors import APIError
 | 
	
		
			
				|  |  | +from six.moves import _thread as thread
 | 
	
		
			
				|  |  |  from six.moves.queue import Empty
 | 
	
		
			
				|  |  |  from six.moves.queue import Queue
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +from compose.cli.signals import ShutdownException
 | 
	
		
			
				|  |  |  from compose.utils import get_output_stream
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -26,19 +28,7 @@ def parallel_execute(objects, func, index_func, msg):
 | 
	
		
			
				|  |  |      objects = list(objects)
 | 
	
		
			
				|  |  |      stream = get_output_stream(sys.stderr)
 | 
	
		
			
				|  |  |      writer = ParallelStreamWriter(stream, msg)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    for obj in objects:
 | 
	
		
			
				|  |  | -        writer.initialize(index_func(obj))
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    q = Queue()
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    # TODO: limit the number of threads #1828
 | 
	
		
			
				|  |  | -    for obj in objects:
 | 
	
		
			
				|  |  | -        t = Thread(
 | 
	
		
			
				|  |  | -            target=perform_operation,
 | 
	
		
			
				|  |  | -            args=(func, obj, q.put, index_func(obj)))
 | 
	
		
			
				|  |  | -        t.daemon = True
 | 
	
		
			
				|  |  | -        t.start()
 | 
	
		
			
				|  |  | +    q = setup_queue(writer, objects, func, index_func)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      done = 0
 | 
	
		
			
				|  |  |      errors = {}
 | 
	
	
		
			
				|  | @@ -48,6 +38,9 @@ def parallel_execute(objects, func, index_func, msg):
 | 
	
		
			
				|  |  |              msg_index, result = q.get(timeout=1)
 | 
	
		
			
				|  |  |          except Empty:
 | 
	
		
			
				|  |  |              continue
 | 
	
		
			
				|  |  | +        # See https://github.com/docker/compose/issues/189
 | 
	
		
			
				|  |  | +        except thread.error:
 | 
	
		
			
				|  |  | +            raise ShutdownException()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          if isinstance(result, APIError):
 | 
	
		
			
				|  |  |              errors[msg_index] = "error", result.explanation
 | 
	
	
		
			
				|  | @@ -68,6 +61,23 @@ def parallel_execute(objects, func, index_func, msg):
 | 
	
		
			
				|  |  |              raise error
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +def setup_queue(writer, objects, func, index_func):
 | 
	
		
			
				|  |  | +    for obj in objects:
 | 
	
		
			
				|  |  | +        writer.initialize(index_func(obj))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    q = Queue()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # TODO: limit the number of threads #1828
 | 
	
		
			
				|  |  | +    for obj in objects:
 | 
	
		
			
				|  |  | +        t = Thread(
 | 
	
		
			
				|  |  | +            target=perform_operation,
 | 
	
		
			
				|  |  | +            args=(func, obj, q.put, index_func(obj)))
 | 
	
		
			
				|  |  | +        t.daemon = True
 | 
	
		
			
				|  |  | +        t.start()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    return q
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  class ParallelStreamWriter(object):
 | 
	
		
			
				|  |  |      """Write out messages for operations happening in parallel.
 | 
	
		
			
				|  |  |  
 |