| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 | import _thread as threadimport sysfrom collections import namedtuplefrom itertools import cyclefrom operator import attrgetterfrom queue import Emptyfrom queue import Queuefrom threading import Threadfrom docker.errors import APIErrorfrom . import colorsfrom compose.cli.signals import ShutdownExceptionfrom compose.utils import split_bufferclass LogPresenter:    def __init__(self, prefix_width, color_func):        self.prefix_width = prefix_width        self.color_func = color_func    def present(self, container, line):        prefix = container.name_without_project.ljust(self.prefix_width)        return '{prefix} {line}'.format(            prefix=self.color_func(prefix + ' |'),            line=line)def build_log_presenters(service_names, monochrome):    """Return an iterable of functions.    Each function can be used to format the logs output of a container.    """    prefix_width = max_name_width(service_names)    def no_color(text):        return text    for color_func in cycle([no_color] if monochrome else colors.rainbow()):        yield LogPresenter(prefix_width, color_func)def max_name_width(service_names, max_index_width=3):    """Calculate the maximum width of container names so we can make the log    prefixes line up like so:    db_1  | Listening    web_1 | Listening    """    return max(len(name) for name in service_names) + max_index_widthclass LogPrinter:    """Print logs from many containers to a single output stream."""    def __init__(self,                 containers,                 presenters,                 event_stream,                 output=sys.stdout,                 cascade_stop=False,                 log_args=None):        self.containers = containers        self.presenters = presenters        self.event_stream = event_stream        self.output = output        self.cascade_stop = cascade_stop        self.log_args = log_args or {}    def run(self):        if not self.containers:            return        queue = Queue()        thread_args = queue, self.log_args        thread_map = build_thread_map(self.containers, self.presenters, thread_args)        start_producer_thread((            thread_map,            self.event_stream,            self.presenters,            thread_args))        for line in consume_queue(queue, self.cascade_stop):            remove_stopped_threads(thread_map)            if self.cascade_stop:                matching_container = [cont.name for cont in self.containers if cont.name == line]                if line in matching_container:                    # Returning the name of the container that started the                    # the cascade_stop so we can return the correct exit code                    return line            if not line:                if not thread_map:                    # There are no running containers left to tail, so exit                    return                # We got an empty line because of a timeout, but there are still                # active containers to tail, so continue                continue            self.write(line)    def write(self, line):        try:            self.output.write(line)        except UnicodeEncodeError:            # This may happen if the user's locale settings don't support UTF-8            # and UTF-8 characters are present in the log line. The following            # will output a "degraded" log with unsupported characters            # replaced by `?`            self.output.write(line.encode('ascii', 'replace').decode())        self.output.flush()def remove_stopped_threads(thread_map):    for container_id, tailer_thread in list(thread_map.items()):        if not tailer_thread.is_alive():            thread_map.pop(container_id, None)def build_thread(container, presenter, queue, log_args):    tailer = Thread(        target=tail_container_logs,        args=(container, presenter, queue, log_args))    tailer.daemon = True    tailer.start()    return tailerdef build_thread_map(initial_containers, presenters, thread_args):    return {        container.id: build_thread(container, next(presenters), *thread_args)        # Container order is unspecified, so they are sorted by name in order to make        # container:presenter (log color) assignment deterministic when given a list of containers        # with the same names.        for container in sorted(initial_containers, key=attrgetter('name'))    }class QueueItem(namedtuple('_QueueItem', 'item is_stop exc')):    @classmethod    def new(cls, item):        return cls(item, None, None)    @classmethod    def exception(cls, exc):        return cls(None, None, exc)    @classmethod    def stop(cls, item=None):        return cls(item, True, None)def tail_container_logs(container, presenter, queue, log_args):    generator = get_log_generator(container)    try:        for item in generator(container, log_args):            queue.put(QueueItem.new(presenter.present(container, item)))    except Exception as e:        queue.put(QueueItem.exception(e))        return    if log_args.get('follow'):        queue.put(QueueItem.new(presenter.color_func(wait_on_exit(container))))    queue.put(QueueItem.stop(container.name))def get_log_generator(container):    if container.has_api_logs:        return build_log_generator    return build_no_log_generatordef build_no_log_generator(container, log_args):    """Return a generator that prints a warning about logs and waits for    container to exit.    """    yield "WARNING: no logs are available with the '{}' log driver\n".format(        container.log_driver)def build_log_generator(container, log_args):    # if the container doesn't have a log_stream we need to attach to container    # before log printer starts running    if container.log_stream is None:        stream = container.logs(stdout=True, stderr=True, stream=True, **log_args)    else:        stream = container.log_stream    return split_buffer(stream)def wait_on_exit(container):    try:        exit_code = container.wait()        return "{} exited with code {}\n".format(container.name, exit_code)    except APIError as e:        return "Unexpected API error for {} (HTTP code {})\nResponse body:\n{}\n".format(            container.name, e.response.status_code,            e.response.text or '[empty]'        )def start_producer_thread(thread_args):    producer = Thread(target=watch_events, args=thread_args)    producer.daemon = True    producer.start()def watch_events(thread_map, event_stream, presenters, thread_args):    crashed_containers = set()    for event in event_stream:        if event['action'] == 'stop':            thread_map.pop(event['id'], None)        if event['action'] == 'die':            thread_map.pop(event['id'], None)            crashed_containers.add(event['id'])        if event['action'] != 'start':            continue        if event['id'] in thread_map:            if thread_map[event['id']].is_alive():                continue            # Container was stopped and started, we need a new thread            thread_map.pop(event['id'], None)        # Container crashed so we should reattach to it        if event['id'] in crashed_containers:            container = event['container']            if not container.is_restarting:                try:                    container.attach_log_stream()                except APIError:                    # Just ignore errors when reattaching to already crashed containers                    pass            crashed_containers.remove(event['id'])        thread_map[event['id']] = build_thread(            event['container'],            next(presenters),            *thread_args        )def consume_queue(queue, cascade_stop):    """Consume the queue by reading lines off of it and yielding them."""    while True:        try:            item = queue.get(timeout=0.1)        except Empty:            yield None            continue        # See https://github.com/docker/compose/issues/189        except thread.error:            raise ShutdownException()        if item.exc:            raise item.exc        if item.is_stop and not cascade_stop:            continue        yield item.item
 |