Browse Source

Merge pull request #2654 from dnephin/fix_multiplex_test

Fix flaky multiplex test
Aanand Prasad 9 years ago
parent
commit
1ffd172084
1 changed files with 9 additions and 14 deletions
  1. 9 14
      tests/unit/multiplexer_test.py

+ 9 - 14
tests/unit/multiplexer_test.py

@@ -49,18 +49,13 @@ class MultiplexerTest(unittest.TestCase):
             list(mux.loop())
 
     def test_cascade_stop(self):
-        mux = Multiplexer([
-            ((lambda x: sleep(0.01) or x)(x) for x in ['after 0.01 sec T1',
-                                                       'after 0.02 sec T1',
-                                                       'after 0.03 sec T1']),
-            ((lambda x: sleep(0.02) or x)(x) for x in ['after 0.02 sec T2',
-                                                       'after 0.04 sec T2',
-                                                       'after 0.06 sec T2']),
-        ], cascade_stop=True)
+        def fast_stream():
+            for num in range(3):
+                yield "stream1 %s" % num
 
-        self.assertEqual(
-            ['after 0.01 sec T1',
-             'after 0.02 sec T1',
-             'after 0.02 sec T2',
-             'after 0.03 sec T1'],
-            sorted(list(mux.loop())))
+        def slow_stream():
+            sleep(5)
+            yield "stream2 FAIL"
+
+        mux = Multiplexer([fast_stream(), slow_stream()], cascade_stop=True)
+        assert "stream2 FAIL" not in set(mux.loop())