瀏覽代碼

Merge pull request #3075 from dnephin/reactive_logs

Display logs for new containers started after the command is run
Aanand Prasad 9 年之前
父節點
當前提交
371ea479f5

+ 168 - 40
compose/cli/log_printer.py

@@ -2,67 +2,152 @@ from __future__ import absolute_import
 from __future__ import unicode_literals
 
 import sys
+from collections import namedtuple
 from itertools import cycle
+from threading import Thread
+
+from six.moves import _thread as thread
+from six.moves.queue import Empty
+from six.moves.queue import Queue
 
 from . import colors
-from .multiplexer import Multiplexer
 from compose import utils
+from compose.cli.signals import ShutdownException
 from compose.utils import split_buffer
 
 
+class LogPresenter(object):
+
+    def __init__(self, prefix_width, color_func):
+        self.prefix_width = prefix_width
+        self.color_func = color_func
+
+    def present(self, container, line):
+        prefix = container.name_without_project.ljust(self.prefix_width)
+        return '{prefix} {line}'.format(
+            prefix=self.color_func(prefix + ' |'),
+            line=line)
+
+
+def build_log_presenters(service_names, monochrome):
+    """Return an iterable of functions.
+
+    Each function can be used to format the logs output of a container.
+    """
+    prefix_width = max_name_width(service_names)
+
+    def no_color(text):
+        return text
+
+    for color_func in cycle([no_color] if monochrome else colors.rainbow()):
+        yield LogPresenter(prefix_width, color_func)
+
+
+def max_name_width(service_names, max_index_width=3):
+    """Calculate the maximum width of container names so we can make the log
+    prefixes line up like so:
+
+    db_1  | Listening
+    web_1 | Listening
+    """
+    return max(len(name) for name in service_names) + max_index_width
+
+
 class LogPrinter(object):
     """Print logs from many containers to a single output stream."""
 
     def __init__(self,
                  containers,
+                 presenters,
+                 event_stream,
                  output=sys.stdout,
-                 monochrome=False,
                  cascade_stop=False,
                  log_args=None):
-        log_args = log_args or {}
         self.containers = containers
+        self.presenters = presenters
+        self.event_stream = event_stream
         self.output = utils.get_output_stream(output)
-        self.monochrome = monochrome
         self.cascade_stop = cascade_stop
-        self.log_args = log_args
+        self.log_args = log_args or {}
 
     def run(self):
         if not self.containers:
             return
 
-        prefix_width = max_name_width(self.containers)
-        generators = list(self._make_log_generators(self.monochrome, prefix_width))
-        for line in Multiplexer(generators, cascade_stop=self.cascade_stop).loop():
+        queue = Queue()
+        thread_args = queue, self.log_args
+        thread_map = build_thread_map(self.containers, self.presenters, thread_args)
+        start_producer_thread((
+            thread_map,
+            self.event_stream,
+            self.presenters,
+            thread_args))
+
+        for line in consume_queue(queue, self.cascade_stop):
+            remove_stopped_threads(thread_map)
+
+            if not line:
+                if not thread_map:
+                    # There are no running containers left to tail, so exit
+                    return
+                # We got an empty line because of a timeout, but there are still
+                # active containers to tail, so continue
+                continue
+
             self.output.write(line)
             self.output.flush()
 
-    def _make_log_generators(self, monochrome, prefix_width):
-        def no_color(text):
-            return text
 
-        if monochrome:
-            color_funcs = cycle([no_color])
-        else:
-            color_funcs = cycle(colors.rainbow())
+def remove_stopped_threads(thread_map):
+    for container_id, tailer_thread in list(thread_map.items()):
+        if not tailer_thread.is_alive():
+            thread_map.pop(container_id, None)
 
-        for color_func, container in zip(color_funcs, self.containers):
-            generator_func = get_log_generator(container)
-            prefix = color_func(build_log_prefix(container, prefix_width))
-            yield generator_func(container, prefix, color_func, self.log_args)
 
+def build_thread(container, presenter, queue, log_args):
+    tailer = Thread(
+        target=tail_container_logs,
+        args=(container, presenter, queue, log_args))
+    tailer.daemon = True
+    tailer.start()
+    return tailer
 
-def build_log_prefix(container, prefix_width):
-    return container.name_without_project.ljust(prefix_width) + ' | '
 
+def build_thread_map(initial_containers, presenters, thread_args):
+    return {
+        container.id: build_thread(container, next(presenters), *thread_args)
+        for container in initial_containers
+    }
 
-def max_name_width(containers):
-    """Calculate the maximum width of container names so we can make the log
-    prefixes line up like so:
 
-    db_1  | Listening
-    web_1 | Listening
-    """
-    return max(len(container.name_without_project) for container in containers)
+class QueueItem(namedtuple('_QueueItem', 'item is_stop exc')):
+
+    @classmethod
+    def new(cls, item):
+        return cls(item, None, None)
+
+    @classmethod
+    def exception(cls, exc):
+        return cls(None, None, exc)
+
+    @classmethod
+    def stop(cls):
+        return cls(None, True, None)
+
+
+def tail_container_logs(container, presenter, queue, log_args):
+    generator = get_log_generator(container)
+
+    try:
+        for item in generator(container, log_args):
+            queue.put(QueueItem.new(presenter.present(container, item)))
+    except Exception as e:
+        queue.put(QueueItem.exception(e))
+        return
+
+    if log_args.get('follow'):
+        queue.put(QueueItem.new(presenter.color_func(wait_on_exit(container))))
+    queue.put(QueueItem.stop())
 
 
 def get_log_generator(container):
@@ -71,32 +156,75 @@ def get_log_generator(container):
     return build_no_log_generator
 
 
-def build_no_log_generator(container, prefix, color_func, log_args):
+def build_no_log_generator(container, log_args):
     """Return a generator that prints a warning about logs and waits for
     container to exit.
     """
-    yield "{} WARNING: no logs are available with the '{}' log driver\n".format(
-        prefix,
+    yield "WARNING: no logs are available with the '{}' log driver\n".format(
         container.log_driver)
-    if log_args.get('follow'):
-        yield color_func(wait_on_exit(container))
 
 
-def build_log_generator(container, prefix, color_func, log_args):
+def build_log_generator(container, log_args):
     # if the container doesn't have a log_stream we need to attach to container
     # before log printer starts running
     if container.log_stream is None:
         stream = container.logs(stdout=True, stderr=True, stream=True, **log_args)
-        line_generator = split_buffer(stream)
     else:
-        line_generator = split_buffer(container.log_stream)
+        stream = container.log_stream
 
-    for line in line_generator:
-        yield prefix + line
-    if log_args.get('follow'):
-        yield color_func(wait_on_exit(container))
+    return split_buffer(stream)
 
 
 def wait_on_exit(container):
     exit_code = container.wait()
     return "%s exited with code %s\n" % (container.name, exit_code)
+
+
+def start_producer_thread(thread_args):
+    producer = Thread(target=watch_events, args=thread_args)
+    producer.daemon = True
+    producer.start()
+
+
+def watch_events(thread_map, event_stream, presenters, thread_args):
+    for event in event_stream:
+        if event['action'] == 'stop':
+            thread_map.pop(event['id'], None)
+
+        if event['action'] != 'start':
+            continue
+
+        if event['id'] in thread_map:
+            if thread_map[event['id']].is_alive():
+                continue
+            # Container was stopped and started, we need a new thread
+            thread_map.pop(event['id'], None)
+
+        thread_map[event['id']] = build_thread(
+            event['container'],
+            next(presenters),
+            *thread_args)
+
+
+def consume_queue(queue, cascade_stop):
+    """Consume the queue by reading lines off of it and yielding them."""
+    while True:
+        try:
+            item = queue.get(timeout=0.1)
+        except Empty:
+            yield None
+            continue
+        # See https://github.com/docker/compose/issues/189
+        except thread.error:
+            raise ShutdownException()
+
+        if item.exc:
+            raise item.exc
+
+        if item.is_stop:
+            if cascade_stop:
+                raise StopIteration
+            else:
+                continue
+
+        yield item.item

+ 39 - 12
compose/cli/main.py

@@ -35,6 +35,7 @@ from .docopt_command import NoSuchCommand
 from .errors import UserError
 from .formatter import ConsoleWarningFormatter
 from .formatter import Formatter
+from .log_printer import build_log_presenters
 from .log_printer import LogPrinter
 from .utils import get_version_info
 from .utils import yesno
@@ -279,6 +280,7 @@ class TopLevelCommand(object):
 
         def json_format_event(event):
             event['time'] = event['time'].isoformat()
+            event.pop('container')
             return json.dumps(event)
 
         for event in self.project.events():
@@ -376,7 +378,6 @@ class TopLevelCommand(object):
         """
         containers = self.project.containers(service_names=options['SERVICE'], stopped=True)
 
-        monochrome = options['--no-color']
         tail = options['--tail']
         if tail is not None:
             if tail.isdigit():
@@ -389,7 +390,11 @@ class TopLevelCommand(object):
             'timestamps': options['--timestamps']
         }
         print("Attaching to", list_containers(containers))
-        LogPrinter(containers, monochrome=monochrome, log_args=log_args).run()
+        log_printer_from_project(
+            self.project,
+            containers,
+            options['--no-color'],
+            log_args).run()
 
     def pause(self, options):
         """
@@ -671,7 +676,6 @@ class TopLevelCommand(object):
             --remove-orphans           Remove containers for services not
                                        defined in the Compose file
         """
-        monochrome = options['--no-color']
         start_deps = not options['--no-deps']
         cascade_stop = options['--abort-on-container-exit']
         service_names = options['SERVICE']
@@ -694,8 +698,14 @@ class TopLevelCommand(object):
 
             if detached:
                 return
-            log_args = {'follow': True}
-            log_printer = build_log_printer(to_attach, service_names, monochrome, cascade_stop, log_args)
+
+            log_printer = log_printer_from_project(
+                self.project,
+                filter_containers_to_service_names(to_attach, service_names),
+                options['--no-color'],
+                {'follow': True},
+                cascade_stop,
+                event_stream=self.project.events(service_names=service_names))
             print("Attaching to", list_containers(log_printer.containers))
             log_printer.run()
 
@@ -842,13 +852,30 @@ def run_one_off_container(container_options, project, service, options):
     sys.exit(exit_code)
 
 
-def build_log_printer(containers, service_names, monochrome, cascade_stop, log_args):
-    if service_names:
-        containers = [
-            container
-            for container in containers if container.service in service_names
-        ]
-    return LogPrinter(containers, monochrome=monochrome, cascade_stop=cascade_stop, log_args=log_args)
+def log_printer_from_project(
+    project,
+    containers,
+    monochrome,
+    log_args,
+    cascade_stop=False,
+    event_stream=None,
+):
+    return LogPrinter(
+        containers,
+        build_log_presenters(project.service_names, monochrome),
+        event_stream or project.events(),
+        cascade_stop=cascade_stop,
+        log_args=log_args)
+
+
+def filter_containers_to_service_names(containers, service_names):
+    if not service_names:
+        return containers
+
+    return [
+        container
+        for container in containers if container.service in service_names
+    ]
 
 
 @contextlib.contextmanager

+ 0 - 66
compose/cli/multiplexer.py

@@ -1,66 +0,0 @@
-from __future__ import absolute_import
-from __future__ import unicode_literals
-
-from threading import Thread
-
-from six.moves import _thread as thread
-
-try:
-    from Queue import Queue, Empty
-except ImportError:
-    from queue import Queue, Empty  # Python 3.x
-
-from compose.cli.signals import ShutdownException
-
-STOP = object()
-
-
-class Multiplexer(object):
-    """
-    Create a single iterator from several iterators by running all of them in
-    parallel and yielding results as they come in.
-    """
-
-    def __init__(self, iterators, cascade_stop=False):
-        self.iterators = iterators
-        self.cascade_stop = cascade_stop
-        self._num_running = len(iterators)
-        self.queue = Queue()
-
-    def loop(self):
-        self._init_readers()
-
-        while self._num_running > 0:
-            try:
-                item, exception = self.queue.get(timeout=0.1)
-
-                if exception:
-                    raise exception
-
-                if item is STOP:
-                    if self.cascade_stop is True:
-                        break
-                    else:
-                        self._num_running -= 1
-                else:
-                    yield item
-            except Empty:
-                pass
-            # See https://github.com/docker/compose/issues/189
-            except thread.error:
-                raise ShutdownException()
-
-    def _init_readers(self):
-        for iterator in self.iterators:
-            t = Thread(target=_enqueue_output, args=(iterator, self.queue))
-            t.daemon = True
-            t.start()
-
-
-def _enqueue_output(iterator, queue):
-    try:
-        for item in iterator:
-            queue.put((item, None))
-        queue.put((STOP, None))
-    except Exception as e:
-        queue.put((None, e))

+ 9 - 4
compose/project.py

@@ -297,7 +297,7 @@ class Project(object):
                 detached=True,
                 start=False)
 
-    def events(self):
+    def events(self, service_names=None):
         def build_container_event(event, container):
             time = datetime.datetime.fromtimestamp(event['time'])
             time = time.replace(
@@ -311,10 +311,11 @@ class Project(object):
                 'attributes': {
                     'name': container.name,
                     'image': event['from'],
-                }
+                },
+                'container': container,
             }
 
-        service_names = set(self.service_names)
+        service_names = set(service_names or self.service_names)
         for event in self.client.events(
             filters={'label': self.labels()},
             decode=True
@@ -325,7 +326,11 @@ class Project(object):
                 continue
 
             # TODO: get labels from the API v1.22 , see github issue 2618
-            container = Container.from_id(self.client, event['id'])
+            try:
+                # this can fail if the conatiner has been removed
+                container = Container.from_id(self.client, event['id'])
+            except APIError:
+                continue
             if container.service not in service_names:
                 continue
             yield build_container_event(event, container)

+ 44 - 22
tests/acceptance/cli_test.py

@@ -78,21 +78,20 @@ class ContainerCountCondition(object):
 
 class ContainerStateCondition(object):
 
-    def __init__(self, client, name, running):
+    def __init__(self, client, name, status):
         self.client = client
         self.name = name
-        self.running = running
+        self.status = status
 
     def __call__(self):
         try:
             container = self.client.inspect_container(self.name)
-            return container['State']['Running'] == self.running
+            return container['State']['Status'] == self.status
         except errors.APIError:
             return False
 
     def __str__(self):
-        state = 'running' if self.running else 'stopped'
-        return "waiting for container to be %s" % state
+        return "waiting for container to be %s" % self.status
 
 
 class CLITestCase(DockerClientTestCase):
@@ -397,8 +396,8 @@ class CLITestCase(DockerClientTestCase):
         self.base_dir = 'tests/fixtures/echo-services'
         result = self.dispatch(['up', '--no-color'])
 
-        assert 'simple_1  | simple' in result.stdout
-        assert 'another_1 | another' in result.stdout
+        assert 'simple_1   | simple' in result.stdout
+        assert 'another_1  | another' in result.stdout
         assert 'simple_1 exited with code 0' in result.stdout
         assert 'another_1 exited with code 0' in result.stdout
 
@@ -1091,26 +1090,26 @@ class CLITestCase(DockerClientTestCase):
         wait_on_condition(ContainerStateCondition(
             self.project.client,
             'simplecomposefile_simple_run_1',
-            running=True))
+            'running'))
 
         os.kill(proc.pid, signal.SIGINT)
         wait_on_condition(ContainerStateCondition(
             self.project.client,
             'simplecomposefile_simple_run_1',
-            running=False))
+            'exited'))
 
     def test_run_handles_sigterm(self):
         proc = start_process(self.base_dir, ['run', '-T', 'simple', 'top'])
         wait_on_condition(ContainerStateCondition(
             self.project.client,
             'simplecomposefile_simple_run_1',
-            running=True))
+            'running'))
 
         os.kill(proc.pid, signal.SIGTERM)
         wait_on_condition(ContainerStateCondition(
             self.project.client,
             'simplecomposefile_simple_run_1',
-            running=False))
+            'exited'))
 
     def test_rm(self):
         service = self.project.get_service('simple')
@@ -1206,7 +1205,7 @@ class CLITestCase(DockerClientTestCase):
 
     def test_logs_follow(self):
         self.base_dir = 'tests/fixtures/echo-services'
-        self.dispatch(['up', '-d'], None)
+        self.dispatch(['up', '-d'])
 
         result = self.dispatch(['logs', '-f'])
 
@@ -1215,29 +1214,52 @@ class CLITestCase(DockerClientTestCase):
         assert 'another' in result.stdout
         assert 'exited with code 0' in result.stdout
 
-    def test_logs_unfollow(self):
+    def test_logs_follow_logs_from_new_containers(self):
         self.base_dir = 'tests/fixtures/logs-composefile'
-        self.dispatch(['up', '-d'], None)
+        self.dispatch(['up', '-d', 'simple'])
+
+        proc = start_process(self.base_dir, ['logs', '-f'])
+
+        self.dispatch(['up', '-d', 'another'])
+        wait_on_condition(ContainerStateCondition(
+            self.project.client,
+            'logscomposefile_another_1',
+            'exited'))
+
+        os.kill(proc.pid, signal.SIGINT)
+        result = wait_on_process(proc, returncode=1)
+        assert 'test' in result.stdout
+
+    def test_logs_default(self):
+        self.base_dir = 'tests/fixtures/logs-composefile'
+        self.dispatch(['up', '-d'])
 
         result = self.dispatch(['logs'])
+        assert 'hello' in result.stdout
+        assert 'test' in result.stdout
+        assert 'exited with' not in result.stdout
 
-        assert result.stdout.count('\n') >= 1
-        assert 'exited with code 0' not in result.stdout
+    def test_logs_on_stopped_containers_exits(self):
+        self.base_dir = 'tests/fixtures/echo-services'
+        self.dispatch(['up'])
+
+        result = self.dispatch(['logs'])
+        assert 'simple' in result.stdout
+        assert 'another' in result.stdout
+        assert 'exited with' not in result.stdout
 
     def test_logs_timestamps(self):
         self.base_dir = 'tests/fixtures/echo-services'
-        self.dispatch(['up', '-d'], None)
-
-        result = self.dispatch(['logs', '-f', '-t'], None)
+        self.dispatch(['up', '-d'])
 
+        result = self.dispatch(['logs', '-f', '-t'])
         self.assertRegexpMatches(result.stdout, '(\d{4})-(\d{2})-(\d{2})T(\d{2})\:(\d{2})\:(\d{2})')
 
     def test_logs_tail(self):
         self.base_dir = 'tests/fixtures/logs-tail-composefile'
-        self.dispatch(['up'], None)
-
-        result = self.dispatch(['logs', '--tail', '2'], None)
+        self.dispatch(['up'])
 
+        result = self.dispatch(['logs', '--tail', '2'])
         assert result.stdout.count('\n') == 3
 
     def test_kill(self):

+ 141 - 62
tests/unit/cli/log_printer_test.py

@@ -1,27 +1,23 @@
 from __future__ import absolute_import
 from __future__ import unicode_literals
 
+import itertools
+
 import pytest
 import six
+from six.moves.queue import Queue
 
-from compose.cli.log_printer import LogPrinter
+from compose.cli.log_printer import build_log_generator
+from compose.cli.log_printer import build_log_presenters
+from compose.cli.log_printer import build_no_log_generator
+from compose.cli.log_printer import consume_queue
+from compose.cli.log_printer import QueueItem
 from compose.cli.log_printer import wait_on_exit
+from compose.cli.log_printer import watch_events
 from compose.container import Container
 from tests import mock
 
 
-def build_mock_container(reader):
-    return mock.Mock(
-        spec=Container,
-        name='myapp_web_1',
-        name_without_project='web_1',
-        has_api_logs=True,
-        log_stream=None,
-        logs=reader,
-        wait=mock.Mock(return_value=0),
-    )
-
-
 @pytest.fixture
 def output_stream():
     output = six.StringIO()
@@ -31,68 +27,151 @@ def output_stream():
 
 @pytest.fixture
 def mock_container():
-    def reader(*args, **kwargs):
-        yield b"hello\nworld"
-    return build_mock_container(reader)
+    return mock.Mock(spec=Container, name_without_project='web_1')
 
 
-class TestLogPrinter(object):
+class TestLogPresenter(object):
 
-    def test_single_container(self, output_stream, mock_container):
-        LogPrinter([mock_container], output=output_stream, log_args={'follow': True}).run()
+    def test_monochrome(self, mock_container):
+        presenters = build_log_presenters(['foo', 'bar'], True)
+        presenter = next(presenters)
+        actual = presenter.present(mock_container, "this line")
+        assert actual == "web_1  | this line"
 
-        output = output_stream.getvalue()
-        assert 'hello' in output
-        assert 'world' in output
-        # Call count is 2 lines + "container exited line"
-        assert output_stream.flush.call_count == 3
+    def test_polychrome(self, mock_container):
+        presenters = build_log_presenters(['foo', 'bar'], False)
+        presenter = next(presenters)
+        actual = presenter.present(mock_container, "this line")
+        assert '\033[' in actual
 
-    def test_single_container_without_stream(self, output_stream, mock_container):
-        LogPrinter([mock_container], output=output_stream).run()
 
-        output = output_stream.getvalue()
-        assert 'hello' in output
-        assert 'world' in output
-        # Call count is 2 lines
-        assert output_stream.flush.call_count == 2
+def test_wait_on_exit():
+    exit_status = 3
+    mock_container = mock.Mock(
+        spec=Container,
+        name='cname',
+        wait=mock.Mock(return_value=exit_status))
 
-    def test_monochrome(self, output_stream, mock_container):
-        LogPrinter([mock_container], output=output_stream, monochrome=True).run()
-        assert '\033[' not in output_stream.getvalue()
+    expected = '{} exited with code {}\n'.format(mock_container.name, exit_status)
+    assert expected == wait_on_exit(mock_container)
 
-    def test_polychrome(self, output_stream, mock_container):
-        LogPrinter([mock_container], output=output_stream).run()
-        assert '\033[' in output_stream.getvalue()
 
-    def test_unicode(self, output_stream):
-        glyph = u'\u2022'
+def test_build_no_log_generator(mock_container):
+    mock_container.has_api_logs = False
+    mock_container.log_driver = 'none'
+    output, = build_no_log_generator(mock_container, None)
+    assert "WARNING: no logs are available with the 'none' log driver\n" in output
+    assert "exited with code" not in output
+
 
-        def reader(*args, **kwargs):
-            yield glyph.encode('utf-8') + b'\n'
+class TestBuildLogGenerator(object):
 
-        container = build_mock_container(reader)
-        LogPrinter([container], output=output_stream).run()
-        output = output_stream.getvalue()
-        if six.PY2:
-            output = output.decode('utf-8')
+    def test_no_log_stream(self, mock_container):
+        mock_container.log_stream = None
+        mock_container.logs.return_value = iter([b"hello\nworld"])
+        log_args = {'follow': True}
 
-        assert glyph in output
+        generator = build_log_generator(mock_container, log_args)
+        assert next(generator) == "hello\n"
+        assert next(generator) == "world"
+        mock_container.logs.assert_called_once_with(
+            stdout=True,
+            stderr=True,
+            stream=True,
+            **log_args)
 
-    def test_wait_on_exit(self):
-        exit_status = 3
-        mock_container = mock.Mock(
-            spec=Container,
-            name='cname',
-            wait=mock.Mock(return_value=exit_status))
+    def test_with_log_stream(self, mock_container):
+        mock_container.log_stream = iter([b"hello\nworld"])
+        log_args = {'follow': True}
 
-        expected = '{} exited with code {}\n'.format(mock_container.name, exit_status)
-        assert expected == wait_on_exit(mock_container)
+        generator = build_log_generator(mock_container, log_args)
+        assert next(generator) == "hello\n"
+        assert next(generator) == "world"
 
-    def test_generator_with_no_logs(self, mock_container, output_stream):
-        mock_container.has_api_logs = False
-        mock_container.log_driver = 'none'
-        LogPrinter([mock_container], output=output_stream).run()
+    def test_unicode(self, output_stream):
+        glyph = u'\u2022\n'
+        mock_container.log_stream = iter([glyph.encode('utf-8')])
+
+        generator = build_log_generator(mock_container, {})
+        assert next(generator) == glyph
 
-        output = output_stream.getvalue()
-        assert "WARNING: no logs are available with the 'none' log driver\n" in output
-        assert "exited with code" not in output
+
[email protected]
+def thread_map():
+    return {'cid': mock.Mock()}
+
+
[email protected]
+def mock_presenters():
+    return itertools.cycle([mock.Mock()])
+
+
+class TestWatchEvents(object):
+
+    def test_stop_event(self, thread_map, mock_presenters):
+        event_stream = [{'action': 'stop', 'id': 'cid'}]
+        watch_events(thread_map, event_stream, mock_presenters, ())
+        assert not thread_map
+
+    def test_start_event(self, thread_map, mock_presenters):
+        container_id = 'abcd'
+        event = {'action': 'start', 'id': container_id, 'container': mock.Mock()}
+        event_stream = [event]
+        thread_args = 'foo', 'bar'
+
+        with mock.patch(
+            'compose.cli.log_printer.build_thread',
+            autospec=True
+        ) as mock_build_thread:
+            watch_events(thread_map, event_stream, mock_presenters, thread_args)
+            mock_build_thread.assert_called_once_with(
+                event['container'],
+                next(mock_presenters),
+                *thread_args)
+        assert container_id in thread_map
+
+    def test_other_event(self, thread_map, mock_presenters):
+        container_id = 'abcd'
+        event_stream = [{'action': 'create', 'id': container_id}]
+        watch_events(thread_map, event_stream, mock_presenters, ())
+        assert container_id not in thread_map
+
+
+class TestConsumeQueue(object):
+
+    def test_item_is_an_exception(self):
+
+        class Problem(Exception):
+            pass
+
+        queue = Queue()
+        error = Problem('oops')
+        for item in QueueItem.new('a'), QueueItem.new('b'), QueueItem.exception(error):
+            queue.put(item)
+
+        generator = consume_queue(queue, False)
+        assert next(generator) == 'a'
+        assert next(generator) == 'b'
+        with pytest.raises(Problem):
+            next(generator)
+
+    def test_item_is_stop_without_cascade_stop(self):
+        queue = Queue()
+        for item in QueueItem.stop(), QueueItem.new('a'), QueueItem.new('b'):
+            queue.put(item)
+
+        generator = consume_queue(queue, False)
+        assert next(generator) == 'a'
+        assert next(generator) == 'b'
+
+    def test_item_is_stop_with_cascade_stop(self):
+        queue = Queue()
+        for item in QueueItem.stop(), QueueItem.new('a'), QueueItem.new('b'):
+            queue.put(item)
+
+        assert list(consume_queue(queue, True)) == []
+
+    def test_item_is_none_when_timeout_is_hit(self):
+        queue = Queue()
+        generator = consume_queue(queue, False)
+        assert next(generator) is None

+ 7 - 7
tests/unit/cli/main_test.py

@@ -8,8 +8,8 @@ import pytest
 from compose import container
 from compose.cli.errors import UserError
 from compose.cli.formatter import ConsoleWarningFormatter
-from compose.cli.main import build_log_printer
 from compose.cli.main import convergence_strategy_from_opts
+from compose.cli.main import filter_containers_to_service_names
 from compose.cli.main import setup_console_handler
 from compose.service import ConvergenceStrategy
 from tests import mock
@@ -32,7 +32,7 @@ def logging_handler():
 
 class TestCLIMainTestCase(object):
 
-    def test_build_log_printer(self):
+    def test_filter_containers_to_service_names(self):
         containers = [
             mock_container('web', 1),
             mock_container('web', 2),
@@ -41,18 +41,18 @@ class TestCLIMainTestCase(object):
             mock_container('another', 1),
         ]
         service_names = ['web', 'db']
-        log_printer = build_log_printer(containers, service_names, True, False, {'follow': True})
-        assert log_printer.containers == containers[:3]
+        actual = filter_containers_to_service_names(containers, service_names)
+        assert actual == containers[:3]
 
-    def test_build_log_printer_all_services(self):
+    def test_filter_containers_to_service_names_all(self):
         containers = [
             mock_container('web', 1),
             mock_container('db', 1),
             mock_container('other', 1),
         ]
         service_names = []
-        log_printer = build_log_printer(containers, service_names, True, False, {'follow': True})
-        assert log_printer.containers == containers
+        actual = filter_containers_to_service_names(containers, service_names)
+        assert actual == containers
 
 
 class TestSetupConsoleHandlerTestCase(object):

+ 0 - 61
tests/unit/multiplexer_test.py

@@ -1,61 +0,0 @@
-from __future__ import absolute_import
-from __future__ import unicode_literals
-
-import unittest
-from time import sleep
-
-from compose.cli.multiplexer import Multiplexer
-
-
-class MultiplexerTest(unittest.TestCase):
-    def test_no_iterators(self):
-        mux = Multiplexer([])
-        self.assertEqual([], list(mux.loop()))
-
-    def test_empty_iterators(self):
-        mux = Multiplexer([
-            (x for x in []),
-            (x for x in []),
-        ])
-
-        self.assertEqual([], list(mux.loop()))
-
-    def test_aggregates_output(self):
-        mux = Multiplexer([
-            (x for x in [0, 2, 4]),
-            (x for x in [1, 3, 5]),
-        ])
-
-        self.assertEqual(
-            [0, 1, 2, 3, 4, 5],
-            sorted(list(mux.loop())),
-        )
-
-    def test_exception(self):
-        class Problem(Exception):
-            pass
-
-        def problematic_iterator():
-            yield 0
-            yield 2
-            raise Problem(":(")
-
-        mux = Multiplexer([
-            problematic_iterator(),
-            (x for x in [1, 3, 5]),
-        ])
-
-        with self.assertRaises(Problem):
-            list(mux.loop())
-
-    def test_cascade_stop(self):
-        def fast_stream():
-            for num in range(3):
-                yield "stream1 %s" % num
-
-        def slow_stream():
-            sleep(5)
-            yield "stream2 FAIL"
-
-        mux = Multiplexer([fast_stream(), slow_stream()], cascade_stop=True)
-        assert "stream2 FAIL" not in set(mux.loop())

+ 3 - 0
tests/unit/project_test.py

@@ -309,6 +309,7 @@ class ProjectTest(unittest.TestCase):
                     'image': 'example/image',
                 },
                 'time': dt_with_microseconds(1420092061, 2),
+                'container': Container(None, {'Id': 'abcde'}),
             },
             {
                 'type': 'container',
@@ -320,6 +321,7 @@ class ProjectTest(unittest.TestCase):
                     'image': 'example/image',
                 },
                 'time': dt_with_microseconds(1420092061, 3),
+                'container': Container(None, {'Id': 'abcde'}),
             },
             {
                 'type': 'container',
@@ -331,6 +333,7 @@ class ProjectTest(unittest.TestCase):
                     'image': 'example/db',
                 },
                 'time': dt_with_microseconds(1420092061, 4),
+                'container': Container(None, {'Id': 'ababa'}),
             },
         ]