Sfoglia il codice sorgente

Fix race condition

If processing of all objects finishes before the queue is drained,
parallel_execute_iter() returns prematurely.

Signed-off-by: Aanand Prasad <[email protected]>
Aanand Prasad 9 anni fa
parent
commit
7cfb5e7bc9
1 ha cambiato i file con 9 aggiunte e 1 eliminazioni
  1. 9 1
      compose/parallel.py

+ 9 - 1
compose/parallel.py

@@ -17,6 +17,8 @@ from compose.utils import get_output_stream
 
 log = logging.getLogger(__name__)
 
+STOP = object()
+
 
 def parallel_execute(objects, func, get_name, msg, get_deps=None):
     """Runs func on objects in parallel while ensuring that func is
@@ -108,7 +110,7 @@ def parallel_execute_iter(objects, func, get_deps):
     results = Queue()
     state = State(objects)
 
-    while not state.is_done():
+    while True:
         feed_queue(objects, func, get_deps, results, state)
 
         try:
@@ -119,6 +121,9 @@ def parallel_execute_iter(objects, func, get_deps):
         except thread.error:
             raise ShutdownException()
 
+        if event is STOP:
+            break
+
         obj, _, exception = event
         if exception is None:
             log.debug('Finished processing: {}'.format(obj))
@@ -170,6 +175,9 @@ def feed_queue(objects, func, get_deps, results, state):
             t.start()
             state.started.add(obj)
 
+    if state.is_done():
+        results.put(STOP)
+
 
 class UpstreamError(Exception):
     pass