| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 | from __future__ import absolute_importfrom __future__ import unicode_literalsimport itertoolsimport pytestimport requestsimport sixfrom docker.errors import APIErrorfrom six.moves.queue import Queuefrom compose.cli.log_printer import build_log_generatorfrom compose.cli.log_printer import build_log_presentersfrom compose.cli.log_printer import build_no_log_generatorfrom compose.cli.log_printer import consume_queuefrom compose.cli.log_printer import QueueItemfrom compose.cli.log_printer import wait_on_exitfrom compose.cli.log_printer import watch_eventsfrom compose.container import Containerfrom tests import mock@pytest.fixturedef output_stream():    output = six.StringIO()    output.flush = mock.Mock()    return output@pytest.fixturedef mock_container():    return mock.Mock(spec=Container, name_without_project='web_1')class TestLogPresenter(object):    def test_monochrome(self, mock_container):        presenters = build_log_presenters(['foo', 'bar'], True)        presenter = next(presenters)        actual = presenter.present(mock_container, "this line")        assert actual == "web_1  | this line"    def test_polychrome(self, mock_container):        presenters = build_log_presenters(['foo', 'bar'], False)        presenter = next(presenters)        actual = presenter.present(mock_container, "this line")        assert '\033[' in actualdef test_wait_on_exit():    exit_status = 3    mock_container = mock.Mock(        spec=Container,        name='cname',        wait=mock.Mock(return_value=exit_status))    expected = '{} exited with code {}\n'.format(mock_container.name, exit_status)    assert expected == wait_on_exit(mock_container)def test_wait_on_exit_raises():    status_code = 500    def mock_wait():        resp = requests.Response()        resp.status_code = status_code        raise APIError('Bad server', resp)    mock_container = mock.Mock(        spec=Container,        name='cname',        wait=mock_wait    )    expected = 'Unexpected API error for {} (HTTP code {})\n'.format(        mock_container.name, status_code,    )    assert expected in wait_on_exit(mock_container)def test_build_no_log_generator(mock_container):    mock_container.has_api_logs = False    mock_container.log_driver = 'none'    output, = build_no_log_generator(mock_container, None)    assert "WARNING: no logs are available with the 'none' log driver\n" in output    assert "exited with code" not in outputclass TestBuildLogGenerator(object):    def test_no_log_stream(self, mock_container):        mock_container.log_stream = None        mock_container.logs.return_value = iter([b"hello\nworld"])        log_args = {'follow': True}        generator = build_log_generator(mock_container, log_args)        assert next(generator) == "hello\n"        assert next(generator) == "world"        mock_container.logs.assert_called_once_with(            stdout=True,            stderr=True,            stream=True,            **log_args)    def test_with_log_stream(self, mock_container):        mock_container.log_stream = iter([b"hello\nworld"])        log_args = {'follow': True}        generator = build_log_generator(mock_container, log_args)        assert next(generator) == "hello\n"        assert next(generator) == "world"    def test_unicode(self, output_stream):        glyph = u'\u2022\n'        mock_container.log_stream = iter([glyph.encode('utf-8')])        generator = build_log_generator(mock_container, {})        assert next(generator) == glyph@pytest.fixturedef thread_map():    return {'cid': mock.Mock()}@pytest.fixturedef mock_presenters():    return itertools.cycle([mock.Mock()])class TestWatchEvents(object):    def test_stop_event(self, thread_map, mock_presenters):        event_stream = [{'action': 'stop', 'id': 'cid'}]        watch_events(thread_map, event_stream, mock_presenters, ())        assert not thread_map    def test_start_event(self, thread_map, mock_presenters):        container_id = 'abcd'        event = {'action': 'start', 'id': container_id, 'container': mock.Mock()}        event_stream = [event]        thread_args = 'foo', 'bar'        with mock.patch(            'compose.cli.log_printer.build_thread',            autospec=True        ) as mock_build_thread:            watch_events(thread_map, event_stream, mock_presenters, thread_args)            mock_build_thread.assert_called_once_with(                event['container'],                next(mock_presenters),                *thread_args)        assert container_id in thread_map    def test_container_attach_event(self, thread_map, mock_presenters):        container_id = 'abcd'        mock_container = mock.Mock(is_restarting=False)        mock_container.attach_log_stream.side_effect = APIError("race condition")        event_die = {'action': 'die', 'id': container_id}        event_start = {'action': 'start', 'id': container_id, 'container': mock_container}        event_stream = [event_die, event_start]        thread_args = 'foo', 'bar'        watch_events(thread_map, event_stream, mock_presenters, thread_args)        assert mock_container.attach_log_stream.called    def test_other_event(self, thread_map, mock_presenters):        container_id = 'abcd'        event_stream = [{'action': 'create', 'id': container_id}]        watch_events(thread_map, event_stream, mock_presenters, ())        assert container_id not in thread_mapclass TestConsumeQueue(object):    def test_item_is_an_exception(self):        class Problem(Exception):            pass        queue = Queue()        error = Problem('oops')        for item in QueueItem.new('a'), QueueItem.new('b'), QueueItem.exception(error):            queue.put(item)        generator = consume_queue(queue, False)        assert next(generator) == 'a'        assert next(generator) == 'b'        with pytest.raises(Problem):            next(generator)    def test_item_is_stop_without_cascade_stop(self):        queue = Queue()        for item in QueueItem.stop(), QueueItem.new('a'), QueueItem.new('b'):            queue.put(item)        generator = consume_queue(queue, False)        assert next(generator) == 'a'        assert next(generator) == 'b'    def test_item_is_stop_with_cascade_stop(self):        """Return the name of the container that caused the cascade_stop"""        queue = Queue()        for item in QueueItem.stop('foobar-1'), QueueItem.new('a'), QueueItem.new('b'):            queue.put(item)        generator = consume_queue(queue, True)        assert next(generator) == 'foobar-1'    def test_item_is_none_when_timeout_is_hit(self):        queue = Queue()        generator = consume_queue(queue, False)        assert next(generator) is None
 |