Browse Source

Refactor setup_queue()

- Stop sharing set objects across threads
- Use a second queue to signal when producer threads are done
- Use a single consumer thread to check dependencies and kick off new
  producers

Signed-off-by: Aanand Prasad <[email protected]>
Aanand Prasad 9 years ago
parent
commit
bcdf541c8c
2 changed files with 47 additions and 22 deletions
  1. 44 22
      compose/parallel.py
  2. 3 0
      compose/service.py

+ 44 - 22
compose/parallel.py

@@ -1,6 +1,7 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 from __future__ import unicode_literals
 from __future__ import unicode_literals
 
 
+import logging
 import operator
 import operator
 import sys
 import sys
 from threading import Thread
 from threading import Thread
@@ -14,6 +15,9 @@ from compose.cli.signals import ShutdownException
 from compose.utils import get_output_stream
 from compose.utils import get_output_stream
 
 
 
 
+log = logging.getLogger(__name__)
+
+
 def parallel_execute(objects, func, get_name, msg, get_deps=None):
 def parallel_execute(objects, func, get_name, msg, get_deps=None):
     """Runs func on objects in parallel while ensuring that func is
     """Runs func on objects in parallel while ensuring that func is
     ran on object only after it is ran on all its dependencies.
     ran on object only after it is ran on all its dependencies.
@@ -73,35 +77,53 @@ def setup_queue(objects, func, get_deps, get_name):
         get_deps = _no_deps
         get_deps = _no_deps
 
 
     results = Queue()
     results = Queue()
-    started = set()   # objects being processed
-    finished = set()  # objects which have been processed
-
-    def do_op(obj):
+    output = Queue()
+
+    def consumer():
+        started = set()   # objects being processed
+        finished = set()  # objects which have been processed
+
+        def ready(obj):
+            """
+            Returns true if obj is ready to be processed:
+              - all dependencies have been processed
+              - obj is not already being processed
+            """
+            return obj not in started and all(
+                dep not in objects or dep in finished
+                for dep in get_deps(obj)
+            )
+
+        while len(finished) < len(objects):
+            for obj in filter(ready, objects):
+                log.debug('Starting producer thread for {}'.format(obj))
+                t = Thread(target=producer, args=(obj,))
+                t.daemon = True
+                t.start()
+                started.add(obj)
+
+            try:
+                event = results.get(timeout=1)
+            except Empty:
+                continue
+
+            obj = event[0]
+            log.debug('Finished processing: {}'.format(obj))
+            finished.add(obj)
+            output.put(event)
+
+    def producer(obj):
         try:
         try:
             result = func(obj)
             result = func(obj)
             results.put((obj, result, None))
             results.put((obj, result, None))
         except Exception as e:
         except Exception as e:
             results.put((obj, None, e))
             results.put((obj, None, e))
 
 
-        finished.add(obj)
-        feed()
+    t = Thread(target=consumer)
+    t.daemon = True
+    t.start()
 
 
-    def ready(obj):
-        # Is object ready for performing operation
-        return obj not in started and all(
-            dep not in objects or dep in finished
-            for dep in get_deps(obj)
-        )
-
-    def feed():
-        for obj in filter(ready, objects):
-            started.add(obj)
-            t = Thread(target=do_op, args=(obj,))
-            t.daemon = True
-            t.start()
-
-    feed()
-    return results
+    return output
 
 
 
 
 class ParallelStreamWriter(object):
 class ParallelStreamWriter(object):

+ 3 - 0
compose/service.py

@@ -135,6 +135,9 @@ class Service(object):
         self.networks = networks or {}
         self.networks = networks or {}
         self.options = options
         self.options = options
 
 
+    def __repr__(self):
+        return '<Service: {}>'.format(self.name)
+
     def containers(self, stopped=False, one_off=False, filters={}):
     def containers(self, stopped=False, one_off=False, filters={}):
         filters.update({'label': self.labels(one_off=one_off)})
         filters.update({'label': self.labels(one_off=one_off)})