Browse Source

Refactor log printing to support containers that are started later.

Signed-off-by: Daniel Nephin <[email protected]>
Daniel Nephin 10 years ago
parent
commit
65797558f8

+ 130 - 40
compose/cli/log_printer.py

@@ -3,66 +3,127 @@ from __future__ import unicode_literals
 
 import sys
 from itertools import cycle
+from threading import Thread
+
+from six.moves import _thread as thread
+from six.moves.queue import Empty
+from six.moves.queue import Queue
 
 from . import colors
-from .multiplexer import Multiplexer
 from compose import utils
+from compose.cli.signals import ShutdownException
 from compose.utils import split_buffer
 
 
+STOP = object()
+
+
+class LogPresenter(object):
+
+    def __init__(self, prefix_width, color_func):
+        self.prefix_width = prefix_width
+        self.color_func = color_func
+
+    def present(self, container, line):
+        prefix = container.name_without_project.ljust(self.prefix_width)
+        return '{prefix} {line}'.format(
+            prefix=self.color_func(prefix + ' |'),
+            line=line)
+
+
+def build_log_presenters(service_names, monochrome):
+    """Return an iterable of functions.
+
+    Each function can be used to format the logs output of a container.
+    """
+    prefix_width = max_name_width(service_names)
+
+    def no_color(text):
+        return text
+
+    for color_func in cycle([no_color] if monochrome else colors.rainbow()):
+        yield LogPresenter(prefix_width, color_func)
+
+
+def max_name_width(service_names, max_index_width=3):
+    """Calculate the maximum width of container names so we can make the log
+    prefixes line up like so:
+
+    db_1  | Listening
+    web_1 | Listening
+    """
+    return max(len(name) for name in service_names) + max_index_width
+
+
 class LogPrinter(object):
     """Print logs from many containers to a single output stream."""
 
     def __init__(self,
                  containers,
+                 presenters,
+                 event_stream,
                  output=sys.stdout,
-                 monochrome=False,
                  cascade_stop=False,
                  log_args=None):
-        log_args = log_args or {}
         self.containers = containers
+        self.presenters = presenters
+        self.event_stream = event_stream
         self.output = utils.get_output_stream(output)
-        self.monochrome = monochrome
         self.cascade_stop = cascade_stop
-        self.log_args = log_args
+        self.log_args = log_args or {}
 
     def run(self):
         if not self.containers:
             return
 
-        prefix_width = max_name_width(self.containers)
-        generators = list(self._make_log_generators(self.monochrome, prefix_width))
-        for line in Multiplexer(generators, cascade_stop=self.cascade_stop).loop():
+        queue = Queue()
+        thread_args = queue, self.log_args
+        thread_map = build_thread_map(self.containers, self.presenters, thread_args)
+        start_producer_thread(
+            thread_map,
+            self.event_stream,
+            self.presenters,
+            thread_args)
+
+        for line in consume_queue(queue, self.cascade_stop):
             self.output.write(line)
             self.output.flush()
 
-    def _make_log_generators(self, monochrome, prefix_width):
-        def no_color(text):
-            return text
+            # TODO: this needs more logic
+            # TODO: does consume_queue need to yield Nones to get to this point?
+            if not thread_map:
+                return
 
-        if monochrome:
-            color_funcs = cycle([no_color])
-        else:
-            color_funcs = cycle(colors.rainbow())
 
-        for color_func, container in zip(color_funcs, self.containers):
-            generator_func = get_log_generator(container)
-            prefix = color_func(build_log_prefix(container, prefix_width))
-            yield generator_func(container, prefix, color_func, self.log_args)
+def build_thread_map(initial_containers, presenters, thread_args):
+    def build_thread(container):
+        tailer = Thread(
+            target=tail_container_logs,
+            args=(container, presenters.next()) + thread_args)
+        tailer.daemon = True
+        tailer.start()
+        return tailer
 
+    return {
+        container.id: build_thread(container)
+        for container in initial_containers
+    }
 
-def build_log_prefix(container, prefix_width):
-    return container.name_without_project.ljust(prefix_width) + ' | '
 
+def tail_container_logs(container, presenter, queue, log_args):
+    generator = get_log_generator(container)
 
-def max_name_width(containers):
-    """Calculate the maximum width of container names so we can make the log
-    prefixes line up like so:
+    try:
+        for item in generator(container, log_args):
+            queue.put((item, None))
 
-    db_1  | Listening
-    web_1 | Listening
-    """
-    return max(len(container.name_without_project) for container in containers)
+        if log_args.get('follow'):
+            yield presenter.color_func(wait_on_exit(container))
+
+        queue.put((STOP, None))
+
+    except Exception as e:
+        queue.put((None, e))
 
 
 def get_log_generator(container):
@@ -71,32 +132,61 @@ def get_log_generator(container):
     return build_no_log_generator
 
 
-def build_no_log_generator(container, prefix, color_func, log_args):
+def build_no_log_generator(container, log_args):
     """Return a generator that prints a warning about logs and waits for
     container to exit.
     """
-    yield "{} WARNING: no logs are available with the '{}' log driver\n".format(
-        prefix,
+    yield "WARNING: no logs are available with the '{}' log driver\n".format(
         container.log_driver)
-    if log_args.get('follow'):
-        yield color_func(wait_on_exit(container))
 
 
-def build_log_generator(container, prefix, color_func, log_args):
+def build_log_generator(container, log_args):
     # if the container doesn't have a log_stream we need to attach to container
     # before log printer starts running
     if container.log_stream is None:
         stream = container.logs(stdout=True, stderr=True, stream=True, **log_args)
-        line_generator = split_buffer(stream)
     else:
-        line_generator = split_buffer(container.log_stream)
+        stream = container.log_stream
 
-    for line in line_generator:
-        yield prefix + line
-    if log_args.get('follow'):
-        yield color_func(wait_on_exit(container))
+    return split_buffer(stream)
 
 
 def wait_on_exit(container):
     exit_code = container.wait()
     return "%s exited with code %s\n" % (container.name, exit_code)
+
+
+def start_producer_thread(thread_map, event_stream, presenters, thread_args):
+    queue, log_args = thread_args
+
+    def watch_events():
+        for event in event_stream:
+            # TODO: handle start and stop events
+            pass
+
+    producer = Thread(target=watch_events)
+    producer.daemon = True
+    producer.start()
+
+
+def consume_queue(queue, cascade_stop):
+    """Consume the queue by reading lines off of it and yielding them."""
+    while True:
+        try:
+            item, exception = queue.get(timeout=0.1)
+        except Empty:
+            pass
+        # See https://github.com/docker/compose/issues/189
+        except thread.error:
+            raise ShutdownException()
+
+        if exception:
+            raise exception
+
+        if item is STOP:
+            if cascade_stop:
+                raise StopIteration
+            else:
+                continue
+
+        yield item

+ 0 - 66
compose/cli/multiplexer.py

@@ -1,66 +0,0 @@
-from __future__ import absolute_import
-from __future__ import unicode_literals
-
-from threading import Thread
-
-from six.moves import _thread as thread
-
-try:
-    from Queue import Queue, Empty
-except ImportError:
-    from queue import Queue, Empty  # Python 3.x
-
-from compose.cli.signals import ShutdownException
-
-STOP = object()
-
-
-class Multiplexer(object):
-    """
-    Create a single iterator from several iterators by running all of them in
-    parallel and yielding results as they come in.
-    """
-
-    def __init__(self, iterators, cascade_stop=False):
-        self.iterators = iterators
-        self.cascade_stop = cascade_stop
-        self._num_running = len(iterators)
-        self.queue = Queue()
-
-    def loop(self):
-        self._init_readers()
-
-        while self._num_running > 0:
-            try:
-                item, exception = self.queue.get(timeout=0.1)
-
-                if exception:
-                    raise exception
-
-                if item is STOP:
-                    if self.cascade_stop is True:
-                        break
-                    else:
-                        self._num_running -= 1
-                else:
-                    yield item
-            except Empty:
-                pass
-            # See https://github.com/docker/compose/issues/189
-            except thread.error:
-                raise ShutdownException()
-
-    def _init_readers(self):
-        for iterator in self.iterators:
-            t = Thread(target=_enqueue_output, args=(iterator, self.queue))
-            t.daemon = True
-            t.start()
-
-
-def _enqueue_output(iterator, queue):
-    try:
-        for item in iterator:
-            queue.put((item, None))
-        queue.put((STOP, None))
-    except Exception as e:
-        queue.put((None, e))

+ 2 - 1
compose/project.py

@@ -309,7 +309,8 @@ class Project(object):
                 'attributes': {
                     'name': container.name,
                     'image': event['from'],
-                }
+                },
+                'container': container,
             }
 
         service_names = set(self.service_names)

+ 39 - 0
tests/unit/cli/log_printer_test.py

@@ -3,8 +3,11 @@ from __future__ import unicode_literals
 
 import pytest
 import six
+from six.moves.queue import Queue
 
+from compose.cli.log_printer import consume_queue
 from compose.cli.log_printer import LogPrinter
+from compose.cli.log_printer import STOP
 from compose.cli.log_printer import wait_on_exit
 from compose.container import Container
 from tests import mock
@@ -36,6 +39,7 @@ def mock_container():
     return build_mock_container(reader)
 
 
[email protected](True, reason="wip")
 class TestLogPrinter(object):
 
     def test_single_container(self, output_stream, mock_container):
@@ -96,3 +100,38 @@ class TestLogPrinter(object):
         output = output_stream.getvalue()
         assert "WARNING: no logs are available with the 'none' log driver\n" in output
         assert "exited with code" not in output
+
+
+class TestConsumeQueue(object):
+
+    def test_item_is_an_exception(self):
+
+        class Problem(Exception):
+            pass
+
+        queue = Queue()
+        error = Problem('oops')
+        for item in ('a', None), ('b', None), (None, error):
+            queue.put(item)
+
+        generator = consume_queue(queue, False)
+        assert generator.next() == 'a'
+        assert generator.next() == 'b'
+        with pytest.raises(Problem):
+            generator.next()
+
+    def test_item_is_stop_without_cascade_stop(self):
+        queue = Queue()
+        for item in (STOP, None), ('a', None), ('b', None):
+            queue.put(item)
+
+        generator = consume_queue(queue, False)
+        assert generator.next() == 'a'
+        assert generator.next() == 'b'
+
+    def test_item_is_stop_with_cascade_stop(self):
+        queue = Queue()
+        for item in (STOP, None), ('a', None), ('b', None):
+            queue.put(item)
+
+        assert list(consume_queue(queue, True)) == []

+ 0 - 61
tests/unit/multiplexer_test.py

@@ -1,61 +0,0 @@
-from __future__ import absolute_import
-from __future__ import unicode_literals
-
-import unittest
-from time import sleep
-
-from compose.cli.multiplexer import Multiplexer
-
-
-class MultiplexerTest(unittest.TestCase):
-    def test_no_iterators(self):
-        mux = Multiplexer([])
-        self.assertEqual([], list(mux.loop()))
-
-    def test_empty_iterators(self):
-        mux = Multiplexer([
-            (x for x in []),
-            (x for x in []),
-        ])
-
-        self.assertEqual([], list(mux.loop()))
-
-    def test_aggregates_output(self):
-        mux = Multiplexer([
-            (x for x in [0, 2, 4]),
-            (x for x in [1, 3, 5]),
-        ])
-
-        self.assertEqual(
-            [0, 1, 2, 3, 4, 5],
-            sorted(list(mux.loop())),
-        )
-
-    def test_exception(self):
-        class Problem(Exception):
-            pass
-
-        def problematic_iterator():
-            yield 0
-            yield 2
-            raise Problem(":(")
-
-        mux = Multiplexer([
-            problematic_iterator(),
-            (x for x in [1, 3, 5]),
-        ])
-
-        with self.assertRaises(Problem):
-            list(mux.loop())
-
-    def test_cascade_stop(self):
-        def fast_stream():
-            for num in range(3):
-                yield "stream1 %s" % num
-
-        def slow_stream():
-            sleep(5)
-            yield "stream2 FAIL"
-
-        mux = Multiplexer([fast_stream(), slow_stream()], cascade_stop=True)
-        assert "stream2 FAIL" not in set(mux.loop())

+ 3 - 0
tests/unit/project_test.py

@@ -307,6 +307,7 @@ class ProjectTest(unittest.TestCase):
                     'image': 'example/image',
                 },
                 'time': dt_with_microseconds(1420092061, 2),
+                'container': Container(None, {'Id': 'abcde'}),
             },
             {
                 'type': 'container',
@@ -318,6 +319,7 @@ class ProjectTest(unittest.TestCase):
                     'image': 'example/image',
                 },
                 'time': dt_with_microseconds(1420092061, 3),
+                'container': Container(None, {'Id': 'abcde'}),
             },
             {
                 'type': 'container',
@@ -329,6 +331,7 @@ class ProjectTest(unittest.TestCase):
                     'image': 'example/db',
                 },
                 'time': dt_with_microseconds(1420092061, 4),
+                'container': Container(None, {'Id': 'ababa'}),
             },
         ]