multiplexer_test.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. from __future__ import absolute_import
  2. from __future__ import unicode_literals
  3. import unittest
  4. from time import sleep
  5. from compose.cli.multiplexer import Multiplexer
  6. class MultiplexerTest(unittest.TestCase):
  7. def test_no_iterators(self):
  8. mux = Multiplexer([])
  9. self.assertEqual([], list(mux.loop()))
  10. def test_empty_iterators(self):
  11. mux = Multiplexer([
  12. (x for x in []),
  13. (x for x in []),
  14. ])
  15. self.assertEqual([], list(mux.loop()))
  16. def test_aggregates_output(self):
  17. mux = Multiplexer([
  18. (x for x in [0, 2, 4]),
  19. (x for x in [1, 3, 5]),
  20. ])
  21. self.assertEqual(
  22. [0, 1, 2, 3, 4, 5],
  23. sorted(list(mux.loop())),
  24. )
  25. def test_exception(self):
  26. class Problem(Exception):
  27. pass
  28. def problematic_iterator():
  29. yield 0
  30. yield 2
  31. raise Problem(":(")
  32. mux = Multiplexer([
  33. problematic_iterator(),
  34. (x for x in [1, 3, 5]),
  35. ])
  36. with self.assertRaises(Problem):
  37. list(mux.loop())
  38. def test_cascade_stop(self):
  39. def fast_stream():
  40. for num in range(3):
  41. yield "stream1 %s" % num
  42. def slow_stream():
  43. sleep(5)
  44. yield "stream2 FAIL"
  45. mux = Multiplexer([fast_stream(), slow_stream()], cascade_stop=True)
  46. assert "stream2 FAIL" not in set(mux.loop())