12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- from __future__ import absolute_import
- from __future__ import unicode_literals
- import unittest
- from time import sleep
- from compose.cli.multiplexer import Multiplexer
- class MultiplexerTest(unittest.TestCase):
- def test_no_iterators(self):
- mux = Multiplexer([])
- self.assertEqual([], list(mux.loop()))
- def test_empty_iterators(self):
- mux = Multiplexer([
- (x for x in []),
- (x for x in []),
- ])
- self.assertEqual([], list(mux.loop()))
- def test_aggregates_output(self):
- mux = Multiplexer([
- (x for x in [0, 2, 4]),
- (x for x in [1, 3, 5]),
- ])
- self.assertEqual(
- [0, 1, 2, 3, 4, 5],
- sorted(list(mux.loop())),
- )
- def test_exception(self):
- class Problem(Exception):
- pass
- def problematic_iterator():
- yield 0
- yield 2
- raise Problem(":(")
- mux = Multiplexer([
- problematic_iterator(),
- (x for x in [1, 3, 5]),
- ])
- with self.assertRaises(Problem):
- list(mux.loop())
- def test_cascade_stop(self):
- def fast_stream():
- for num in range(3):
- yield "stream1 %s" % num
- def slow_stream():
- sleep(5)
- yield "stream2 FAIL"
- mux = Multiplexer([fast_stream(), slow_stream()], cascade_stop=True)
- assert "stream2 FAIL" not in set(mux.loop())
|