| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 | import osimport randomimport shutilimport tempfilefrom io import StringIOfrom compose import progress_streamfrom tests import unittestclass ProgressStreamTestCase(unittest.TestCase):    def test_stream_output(self):        output = [            b'{"status": "Downloading", "progressDetail": {"current": '            b'31019763, "start": 1413653874, "total": 62763875}, '            b'"progress": "..."}',        ]        events = list(progress_stream.stream_output(output, StringIO()))        assert len(events) == 1    def test_stream_output_div_zero(self):        output = [            b'{"status": "Downloading", "progressDetail": {"current": '            b'0, "start": 1413653874, "total": 0}, '            b'"progress": "..."}',        ]        events = list(progress_stream.stream_output(output, StringIO()))        assert len(events) == 1    def test_stream_output_null_total(self):        output = [            b'{"status": "Downloading", "progressDetail": {"current": '            b'0, "start": 1413653874, "total": null}, '            b'"progress": "..."}',        ]        events = list(progress_stream.stream_output(output, StringIO()))        assert len(events) == 1    def test_stream_output_progress_event_tty(self):        events = [            b'{"status": "Already exists", "progressDetail": {}, "id": "8d05e3af52b0"}'        ]        class TTYStringIO(StringIO):            def isatty(self):                return True        output = TTYStringIO()        events = list(progress_stream.stream_output(events, output))        assert len(output.getvalue()) > 0    def test_stream_output_progress_event_no_tty(self):        events = [            b'{"status": "Already exists", "progressDetail": {}, "id": "8d05e3af52b0"}'        ]        output = StringIO()        events = list(progress_stream.stream_output(events, output))        assert len(output.getvalue()) == 0    def test_stream_output_no_progress_event_no_tty(self):        events = [            b'{"status": "Pulling from library/xy", "id": "latest"}'        ]        output = StringIO()        events = list(progress_stream.stream_output(events, output))        assert len(output.getvalue()) > 0    def test_mismatched_encoding_stream_write(self):        tmpdir = tempfile.mkdtemp()        self.addCleanup(shutil.rmtree, tmpdir, True)        def mktempfile(encoding):            fname = os.path.join(tmpdir, hex(random.getrandbits(128))[2:-1])            return open(fname, mode='w+', encoding=encoding)        text = '就吃饭'        with mktempfile(encoding='utf-8') as tf:            progress_stream.write_to_stream(text, tf)            tf.seek(0)            assert tf.read() == text        with mktempfile(encoding='utf-32') as tf:            progress_stream.write_to_stream(text, tf)            tf.seek(0)            assert tf.read() == text        with mktempfile(encoding='ascii') as tf:            progress_stream.write_to_stream(text, tf)            tf.seek(0)            assert tf.read() == '???'    def test_get_digest_from_push(self):        digest = "sha256:abcd"        events = [            {"status": "..."},            {"status": "..."},            {"progressDetail": {}, "aux": {"Digest": digest}},        ]        assert progress_stream.get_digest_from_push(events) == digest    def test_get_digest_from_pull(self):        events = list()        assert progress_stream.get_digest_from_pull(events) is None        digest = "sha256:abcd"        events = [            {"status": "..."},            {"status": "..."},            {"status": "Digest: %s" % digest},            {"status": "..."},        ]        assert progress_stream.get_digest_from_pull(events) == digest
 |