multiplexer_test.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. from __future__ import absolute_import
  2. from __future__ import unicode_literals
  3. import unittest
  4. from time import sleep
  5. from compose.cli.multiplexer import Multiplexer
  class MultiplexerTest(unittest.TestCase):
      """Unit tests for ``Multiplexer.loop()`` over zero or more iterators."""

      def test_no_iterators(self):
          """A multiplexer built from an empty list produces no items."""
          mux = Multiplexer([])
          self.assertEqual([], list(mux.loop()))

      def test_empty_iterators(self):
          """Iterators that are already exhausted contribute no items."""
          mux = Multiplexer([
              (x for x in []),
              (x for x in []),
          ])
          self.assertEqual([], list(mux.loop()))

      def test_aggregates_output(self):
          """Every item of every iterator appears in the combined output."""
          mux = Multiplexer([
              (x for x in [0, 2, 4]),
              (x for x in [1, 3, 5]),
          ])
          # The output is sorted before comparing, so this test does not
          # depend on the order in which the two sources are interleaved.
          self.assertEqual(
              [0, 1, 2, 3, 4, 5],
              sorted(mux.loop()),
          )

      def test_exception(self):
          """An exception raised inside a source iterator reaches the caller."""
          class Problem(Exception):
              pass

          def problematic_iterator():
              yield 0
              yield 2
              raise Problem(":(")

          mux = Multiplexer([
              problematic_iterator(),
              (x for x in [1, 3, 5]),
          ])

          with self.assertRaises(Problem):
              list(mux.loop())

      def test_cascade_stop(self):
          """With ``cascade_stop=True`` the slower source is cut short.

          The expected output holds all three items of the faster source but
          only the first item of the slower one.
          """
          def delayed(delay, items):
              # Sleep before handing out each item, so items become
              # available at multiples of ``delay`` seconds.
              for item in items:
                  sleep(delay)
                  yield item

          mux = Multiplexer([
              delayed(0.01, ['after 0.01 sec T1',
                             'after 0.02 sec T1',
                             'after 0.03 sec T1']),
              delayed(0.02, ['after 0.02 sec T2',
                             'after 0.04 sec T2',
                             'after 0.06 sec T2']),
          ], cascade_stop=True)

          # Sorted so that the two items produced at ~0.02 sec compare
          # deterministically regardless of which source delivers first.
          self.assertEqual(
              ['after 0.01 sec T1',
               'after 0.02 sec T1',
               'after 0.02 sec T2',
               'after 0.03 sec T1'],
              sorted(mux.loop()))