from __future__ import absolute_import
from __future__ import unicode_literals

from six import StringIO

from compose import progress_stream
from tests import unittest


class ProgressStreamTestCase(unittest.TestCase):
    """Unit tests for ``compose.progress_stream.stream_output``.

    Every test hands ``stream_output`` a list of JSON-encoded byte strings
    plus a writable stream, then checks either the value it returns or what
    ended up being written to the stream.
    """

    def test_stream_output(self):
        # One well-formed "Downloading" message is expected to come back as
        # exactly one event.
        events = [
            b'{"status": "Downloading", "progressDetail": {"current": '
            b'31019763, "start": 1413653874, "total": 62763875}, '
            b'"progress": "..."}',
        ]
        parsed = progress_stream.stream_output(events, StringIO())
        self.assertEqual(len(parsed), 1)

    def test_stream_output_div_zero(self):
        # "total": 0 -- going by the test name, this guards against a
        # division by zero while handling the progress detail.
        events = [
            b'{"status": "Downloading", "progressDetail": {"current": '
            b'0, "start": 1413653874, "total": 0}, '
            b'"progress": "..."}',
        ]
        parsed = progress_stream.stream_output(events, StringIO())
        self.assertEqual(len(parsed), 1)

    def test_stream_output_null_total(self):
        # "total": null -- a missing total must still yield the event.
        events = [
            b'{"status": "Downloading", "progressDetail": {"current": '
            b'0, "start": 1413653874, "total": null}, '
            b'"progress": "..."}',
        ]
        parsed = progress_stream.stream_output(events, StringIO())
        self.assertEqual(len(parsed), 1)

    def test_stream_output_progress_event_tty(self):
        # An event carrying "progressDetail" is expected to be written when
        # the stream reports itself as a TTY.
        events = [
            b'{"status": "Already exists", "progressDetail": {}, "id": "8d05e3af52b0"}'
        ]

        class TTYStringIO(StringIO):
            def isatty(self):
                return True

        output = TTYStringIO()
        progress_stream.stream_output(events, output)
        self.assertGreater(len(output.getvalue()), 0)

    def test_stream_output_progress_event_no_tty(self):
        # The same "progressDetail" event is expected to produce no output
        # when the stream is not a TTY.
        events = [
            b'{"status": "Already exists", "progressDetail": {}, "id": "8d05e3af52b0"}'
        ]
        output = StringIO()
        progress_stream.stream_output(events, output)
        # Compare the value itself so a failure shows what was written.
        self.assertEqual(output.getvalue(), '')

    def test_stream_output_no_progress_event_no_tty(self):
        # An event without "progressDetail" is expected to be written even
        # when the stream is not a TTY.
        events = [
            b'{"status": "Pulling from library/xy", "id": "latest"}'
        ]
        output = StringIO()
        progress_stream.stream_output(events, output)
        self.assertGreater(len(output.getvalue()), 0)