Browse Source

Merge pull request #1754 from aanand/wait-for-all-containers-to-exit

Wait for all containers to exit
Aanand Prasad 10 years ago
parent
commit
b47bb84ea1
3 changed files with 70 additions and 14 deletions
  1. 1 2
      compose/cli/log_printer.py
  2. 24 12
      compose/cli/multiplexer.py
  3. 45 0
      tests/unit/multiplexer_test.py

+ 1 - 2
compose/cli/log_printer.py

@@ -4,7 +4,7 @@ import sys
 
 from itertools import cycle
 
-from .multiplexer import Multiplexer, STOP
+from .multiplexer import Multiplexer
 from . import colors
 from .utils import split_buffer
 
@@ -61,7 +61,6 @@ class LogPrinter(object):
 
         exit_code = container.wait()
         yield color_fn("%s exited with code %s\n" % (container.name, exit_code))
-        yield STOP
 
     def _generate_prefix(self, container):
         """

+ 24 - 12
compose/cli/multiplexer.py

@@ -7,36 +7,48 @@ except ImportError:
     from queue import Queue, Empty  # Python 3.x
 
 
-# Yield STOP from an input generator to stop the
-# top-level loop without processing any more input.
 STOP = object()
 
 
 class Multiplexer(object):
-    def __init__(self, generators):
-        self.generators = generators
+    """
+    Create a single iterator from several iterators by running all of them in
+    parallel and yielding results as they come in.
+    """
+
+    def __init__(self, iterators):
+        self.iterators = iterators
+        self._num_running = len(iterators)
         self.queue = Queue()
 
     def loop(self):
         self._init_readers()
 
-        while True:
+        while self._num_running > 0:
             try:
-                item = self.queue.get(timeout=0.1)
+                item, exception = self.queue.get(timeout=0.1)
+
+                if exception:
+                    raise exception
+
                 if item is STOP:
-                    break
+                    self._num_running -= 1
                 else:
                     yield item
             except Empty:
                 pass
 
     def _init_readers(self):
-        for generator in self.generators:
-            t = Thread(target=_enqueue_output, args=(generator, self.queue))
+        for iterator in self.iterators:
+            t = Thread(target=_enqueue_output, args=(iterator, self.queue))
             t.daemon = True
             t.start()
 
 
-def _enqueue_output(generator, queue):
-    for item in generator:
-        queue.put(item)
+def _enqueue_output(iterator, queue):
+    try:
+        for item in iterator:
+            queue.put((item, None))
+        queue.put((STOP, None))
+    except Exception as e:
+        queue.put((None, e))

+ 45 - 0
tests/unit/multiplexer_test.py

@@ -0,0 +1,45 @@
+import unittest
+
+from compose.cli.multiplexer import Multiplexer
+
+
+class MultiplexerTest(unittest.TestCase):
+    def test_no_iterators(self):
+        mux = Multiplexer([])
+        self.assertEqual([], list(mux.loop()))
+
+    def test_empty_iterators(self):
+        mux = Multiplexer([
+            (x for x in []),
+            (x for x in []),
+        ])
+
+        self.assertEqual([], list(mux.loop()))
+
+    def test_aggregates_output(self):
+        mux = Multiplexer([
+            (x for x in [0, 2, 4]),
+            (x for x in [1, 3, 5]),
+        ])
+
+        self.assertEqual(
+            [0, 1, 2, 3, 4, 5],
+            sorted(list(mux.loop())),
+        )
+
+    def test_exception(self):
+        class Problem(Exception):
+            pass
+
+        def problematic_iterator():
+            yield 0
+            yield 2
+            raise Problem(":(")
+
+        mux = Multiplexer([
+            problematic_iterator(),
+            (x for x in [1, 3, 5]),
+        ])
+
+        with self.assertRaises(Problem):
+            list(mux.loop())