123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- from __future__ import absolute_import
- from __future__ import unicode_literals
- import sys
- from collections import namedtuple
- from itertools import cycle
- from threading import Thread
- from docker.errors import APIError
- from six.moves import _thread as thread
- from six.moves.queue import Empty
- from six.moves.queue import Queue
- from . import colors
- from compose import utils
- from compose.cli.signals import ShutdownException
- from compose.utils import split_buffer
class LogPresenter(object):
    """Formats log lines for one container with a padded, coloured prefix."""

    def __init__(self, prefix_width, color_func):
        # Column width the container name is left-justified to.
        self.prefix_width = prefix_width
        # Callable applied to the prefix text before it is emitted.
        self.color_func = color_func

    def present(self, container, line):
        """Return ``line`` preceded by the container's decorated name.

        ``container.name_without_project`` is padded to ``prefix_width``,
        suffixed with `` |``, run through ``color_func`` and joined to
        ``line`` with a single space.
        """
        padded_name = container.name_without_project.ljust(self.prefix_width)
        decorated = self.color_func(padded_name + ' |')
        return '{prefix} {line}'.format(prefix=decorated, line=line)
def build_log_presenters(service_names, monochrome):
    """Yield ``LogPresenter`` objects, one per request, cycling over colours.

    Every presenter shares the same prefix width (derived from
    ``service_names``). When ``monochrome`` is true the only "colour" is an
    identity function; otherwise the colour functions come from
    ``colors.rainbow()`` and are repeated via ``itertools.cycle``.
    """
    def plain(text):
        return text

    width = max_name_width(service_names)
    palette = [plain] if monochrome else colors.rainbow()
    for paint in cycle(palette):
        yield LogPresenter(width, paint)
def max_name_width(service_names, max_index_width=3):
    """Calculate the maximum width of container names so we can make the log
    prefixes line up like so:

        db_1  | Listening
        web_1 | Listening

    :param service_names: iterable of service name strings (may be empty)
    :param max_index_width: extra columns added to the longest name length
    :returns: length of the longest name plus ``max_index_width``; just
        ``max_index_width`` when ``service_names`` is empty
    """
    # `max()` raises ValueError on an empty sequence. Fall back to a zero
    # length name instead; the `or [0]` form is used rather than `default=`
    # because that keyword does not exist on Python 2.
    longest_name = max([len(name) for name in service_names] or [0])
    return longest_name + max_index_width
class LogPrinter(object):
    """Print logs from many containers to a single output stream."""

    def __init__(self,
                 containers,
                 presenters,
                 event_stream,
                 output=sys.stdout,
                 cascade_stop=False,
                 log_args=None):
        """Store the collaborators used by :meth:`run`.

        ``output`` is passed through ``utils.get_output_stream`` before being
        stored; ``log_args`` defaults to an empty dict when falsy.
        """
        self.containers = containers
        self.presenters = presenters
        self.event_stream = event_stream
        self.output = utils.get_output_stream(output)
        self.cascade_stop = cascade_stop
        self.log_args = log_args or {}

    def run(self):
        """Tail every container and write the merged log lines to the output.

        Returns ``None`` when there is nothing (left) to tail. With
        ``cascade_stop`` enabled, returns the name of the container whose
        stop was seen first so the caller can report the right exit code.
        """
        if not self.containers:
            return

        queue = Queue()
        thread_args = (queue, self.log_args)
        thread_map = build_thread_map(self.containers, self.presenters, thread_args)
        producer_args = (thread_map, self.event_stream, self.presenters, thread_args)
        start_producer_thread(producer_args)

        for line in consume_queue(queue, self.cascade_stop):
            remove_stopped_threads(thread_map)

            # With cascade_stop, a queue entry equal to one of our container
            # names marks the container that started the cascade.
            if self.cascade_stop and any(cont.name == line for cont in self.containers):
                return line

            if line:
                self.write(line)
            elif not thread_map:
                # There are no running containers left to tail, so exit
                return
            # Otherwise the empty line was only a queue timeout while
            # containers are still being tailed: keep polling.

    def write(self, line):
        """Write ``line`` to the output stream and flush it."""
        stream = self.output
        try:
            stream.write(line)
        except UnicodeEncodeError:
            # The user's locale may not support UTF-8 while the log line
            # contains such characters. Emit a degraded line instead, with
            # every unsupported character replaced by `?`.
            stream.write(line.encode('ascii', 'replace').decode())
        stream.flush()
def remove_stopped_threads(thread_map):
    """Drop every entry of ``thread_map`` whose thread is no longer alive.

    Works on a snapshot of the items because the mapping is mutated while
    being scanned; ``pop`` with a default tolerates keys already removed.
    """
    snapshot = list(thread_map.items())
    for container_id, tailer_thread in snapshot:
        if tailer_thread.is_alive():
            continue
        thread_map.pop(container_id, None)
def build_thread(container, presenter, queue, log_args):
    """Start and return a daemon thread running ``tail_container_logs``.

    The four arguments are forwarded unchanged to the thread target.
    """
    worker_args = (container, presenter, queue, log_args)
    tailer = Thread(target=tail_container_logs, args=worker_args)
    tailer.daemon = True
    tailer.start()
    return tailer
def build_thread_map(initial_containers, presenters, thread_args):
    """Return a dict mapping each container id to a freshly started tailer.

    One presenter is drawn from ``presenters`` per container, and
    ``thread_args`` is unpacked as the trailing arguments of ``build_thread``.
    """
    thread_map = {}
    for container in initial_containers:
        presenter = next(presenters)
        thread_map[container.id] = build_thread(container, presenter, *thread_args)
    return thread_map
class QueueItem(namedtuple('_QueueItem', 'item is_stop exc')):
    """Entry placed on the log queue: a payload, a stop marker or an error."""

    @classmethod
    def new(cls, item):
        """Wrap a regular payload (no stop flag, no exception)."""
        return cls(item=item, is_stop=None, exc=None)

    @classmethod
    def exception(cls, exc):
        """Wrap an exception to be re-raised by the queue consumer."""
        return cls(item=None, is_stop=None, exc=exc)

    @classmethod
    def stop(cls, item=None):
        """Build a stop marker, optionally carrying ``item``."""
        return cls(item=item, is_stop=True, exc=None)
def tail_container_logs(container, presenter, queue, log_args):
    """Thread target: push a container's presented log lines onto ``queue``.

    Any exception raised while producing lines is queued as an exception
    item and ends the function. Otherwise, when ``log_args`` has a truthy
    ``'follow'`` entry, the coloured result of ``wait_on_exit`` is queued,
    and a stop item carrying the container name is always queued last.
    """
    generator = get_log_generator(container)

    try:
        # The generator is created inside the `try` because creating it may
        # already raise.
        for raw_line in generator(container, log_args):
            formatted = presenter.present(container, raw_line)
            queue.put(QueueItem.new(formatted))
    except Exception as error:
        queue.put(QueueItem.exception(error))
    else:
        if log_args.get('follow'):
            exit_message = wait_on_exit(container)
            queue.put(QueueItem.new(presenter.color_func(exit_message)))
        queue.put(QueueItem.stop(container.name))
def get_log_generator(container):
    """Pick the log-producing callable for ``container``.

    Returns ``build_log_generator`` when ``container.has_api_logs`` is
    truthy, otherwise ``build_no_log_generator``.
    """
    return build_log_generator if container.has_api_logs else build_no_log_generator
def build_no_log_generator(container, log_args):
    """Yield a single warning that the container's log driver has no logs.

    ``log_args`` is accepted for signature parity with
    ``build_log_generator`` and is not used.
    """
    warning = "WARNING: no logs are available with the '{}' log driver\n"
    yield warning.format(container.log_driver)
def build_log_generator(container, log_args):
    """Return the container's log stream passed through ``split_buffer``.

    An already attached ``container.log_stream`` is used as is; otherwise a
    new stdout+stderr stream is requested with ``container.logs`` using
    ``log_args`` as extra keyword arguments.
    """
    if container.log_stream is not None:
        return split_buffer(container.log_stream)

    # No stream was attached before the log printer started running, so
    # ask the container for one now.
    fresh_stream = container.logs(stdout=True, stderr=True, stream=True, **log_args)
    return split_buffer(fresh_stream)
def wait_on_exit(container):
    """Wait for ``container`` and return a one-line status message.

    On success the message reports the exit code returned by
    ``container.wait()``. If an ``APIError`` is raised, the message reports
    the HTTP status code and response body instead.
    """
    try:
        status = container.wait()
        message = "%s exited with code %s\n" % (container.name, status)
    except APIError as api_error:
        message = "Unexpected API error for %s (HTTP code %s)\nResponse body:\n%s\n" % (
            container.name, api_error.response.status_code,
            api_error.response.text or '[empty]'
        )
    return message
def start_producer_thread(thread_args):
    """Launch ``watch_events`` in a daemon thread.

    ``thread_args`` is the positional argument tuple handed to
    ``watch_events``. The thread object is not returned.
    """
    watcher = Thread(target=watch_events, args=thread_args)
    watcher.daemon = True
    watcher.start()
def watch_events(thread_map, event_stream, presenters, thread_args):
    """Keep ``thread_map`` in step with container lifecycle events.

    :param thread_map: dict of container id -> tailer thread, mutated in
        place (and also pruned elsewhere while this function runs)
    :param event_stream: iterable of event dicts with an ``'action'`` key,
        an ``'id'`` key for stop/die/start events and a ``'container'`` key
        for start events
    :param presenters: iterator supplying a presenter for each new thread
    :param thread_args: extra positional arguments for ``build_thread``

    ``stop`` and ``die`` events drop the container's entry; ``die`` also
    records the id as crashed. A ``start`` event for a container without a
    live tailer spawns a new one, re-attaching the log stream first when
    the container had crashed. Every other action is ignored.
    """
    crashed_containers = set()
    for event in event_stream:
        action = event['action']

        if action == 'stop':
            thread_map.pop(event['id'], None)
        elif action == 'die':
            thread_map.pop(event['id'], None)
            crashed_containers.add(event['id'])

        if action != 'start':
            continue

        container_id = event['id']

        # Use one lookup rather than `in` followed by indexing: the entry
        # can be removed by another thread between the two steps, and the
        # resulting KeyError would end this watcher.
        tailer = thread_map.get(container_id)
        if tailer is not None:
            if tailer.is_alive():
                continue
            # Container was stopped and started, we need a new thread
            thread_map.pop(container_id, None)

        # Container crashed so we should reattach to it
        if container_id in crashed_containers:
            event['container'].attach_log_stream()
            crashed_containers.remove(container_id)

        thread_map[container_id] = build_thread(
            event['container'],
            next(presenters),
            *thread_args
        )
def consume_queue(queue, cascade_stop):
    """Endlessly read items off ``queue`` and yield their payloads.

    Yields ``None`` whenever no item arrives within 0.1 seconds. An item
    carrying an exception has that exception re-raised here. Stop items are
    skipped unless ``cascade_stop`` is true, in which case their payload is
    yielded like any other.
    """
    while True:
        try:
            entry = queue.get(timeout=0.1)
        except Empty:
            yield None
        # See https://github.com/docker/compose/issues/189
        except thread.error:
            raise ShutdownException()
        else:
            if entry.exc:
                raise entry.exc

            if not entry.is_stop or cascade_stop:
                yield entry.item
|