log_printer.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. from __future__ import absolute_import
  2. from __future__ import unicode_literals
  3. import sys
  4. from itertools import cycle
  5. from threading import Thread
  6. from six.moves import _thread as thread
  7. from six.moves.queue import Empty
  8. from six.moves.queue import Queue
  9. from . import colors
  10. from compose import utils
  11. from compose.cli.signals import ShutdownException
  12. from compose.utils import split_buffer
# Sentinel placed on the log queue by a tailer thread once its container's
# log stream is exhausted; consume_queue() compares items against it by
# identity.
STOP = object()
  14. class LogPresenter(object):
  15. def __init__(self, prefix_width, color_func):
  16. self.prefix_width = prefix_width
  17. self.color_func = color_func
  18. def present(self, container, line):
  19. prefix = container.name_without_project.ljust(self.prefix_width)
  20. return '{prefix} {line}'.format(
  21. prefix=self.color_func(prefix + ' |'),
  22. line=line)
  23. def build_log_presenters(service_names, monochrome):
  24. """Return an iterable of functions.
  25. Each function can be used to format the logs output of a container.
  26. """
  27. prefix_width = max_name_width(service_names)
  28. def no_color(text):
  29. return text
  30. for color_func in cycle([no_color] if monochrome else colors.rainbow()):
  31. yield LogPresenter(prefix_width, color_func)
  32. def max_name_width(service_names, max_index_width=3):
  33. """Calculate the maximum width of container names so we can make the log
  34. prefixes line up like so:
  35. db_1 | Listening
  36. web_1 | Listening
  37. """
  38. return max(len(name) for name in service_names) + max_index_width
  39. class LogPrinter(object):
  40. """Print logs from many containers to a single output stream."""
  41. def __init__(self,
  42. containers,
  43. presenters,
  44. event_stream,
  45. output=sys.stdout,
  46. cascade_stop=False,
  47. log_args=None):
  48. self.containers = containers
  49. self.presenters = presenters
  50. self.event_stream = event_stream
  51. self.output = utils.get_output_stream(output)
  52. self.cascade_stop = cascade_stop
  53. self.log_args = log_args or {}
  54. def run(self):
  55. if not self.containers:
  56. return
  57. queue = Queue()
  58. thread_args = queue, self.log_args
  59. thread_map = build_thread_map(self.containers, self.presenters, thread_args)
  60. start_producer_thread(
  61. thread_map,
  62. self.event_stream,
  63. self.presenters,
  64. thread_args)
  65. for line in consume_queue(queue, self.cascade_stop):
  66. self.output.write(line)
  67. self.output.flush()
  68. # TODO: this needs more logic
  69. # TODO: does consume_queue need to yield Nones to get to this point?
  70. if not thread_map:
  71. return
  72. def build_thread_map(initial_containers, presenters, thread_args):
  73. def build_thread(container):
  74. tailer = Thread(
  75. target=tail_container_logs,
  76. args=(container, presenters.next()) + thread_args)
  77. tailer.daemon = True
  78. tailer.start()
  79. return tailer
  80. return {
  81. container.id: build_thread(container)
  82. for container in initial_containers
  83. }
  84. def tail_container_logs(container, presenter, queue, log_args):
  85. generator = get_log_generator(container)
  86. try:
  87. for item in generator(container, log_args):
  88. queue.put((item, None))
  89. if log_args.get('follow'):
  90. yield presenter.color_func(wait_on_exit(container))
  91. queue.put((STOP, None))
  92. except Exception as e:
  93. queue.put((None, e))
  94. def get_log_generator(container):
  95. if container.has_api_logs:
  96. return build_log_generator
  97. return build_no_log_generator
  98. def build_no_log_generator(container, log_args):
  99. """Return a generator that prints a warning about logs and waits for
  100. container to exit.
  101. """
  102. yield "WARNING: no logs are available with the '{}' log driver\n".format(
  103. container.log_driver)
  104. def build_log_generator(container, log_args):
  105. # if the container doesn't have a log_stream we need to attach to container
  106. # before log printer starts running
  107. if container.log_stream is None:
  108. stream = container.logs(stdout=True, stderr=True, stream=True, **log_args)
  109. else:
  110. stream = container.log_stream
  111. return split_buffer(stream)
  112. def wait_on_exit(container):
  113. exit_code = container.wait()
  114. return "%s exited with code %s\n" % (container.name, exit_code)
  115. def start_producer_thread(thread_map, event_stream, presenters, thread_args):
  116. queue, log_args = thread_args
  117. def watch_events():
  118. for event in event_stream:
  119. # TODO: handle start and stop events
  120. pass
  121. producer = Thread(target=watch_events)
  122. producer.daemon = True
  123. producer.start()
  124. def consume_queue(queue, cascade_stop):
  125. """Consume the queue by reading lines off of it and yielding them."""
  126. while True:
  127. try:
  128. item, exception = queue.get(timeout=0.1)
  129. except Empty:
  130. pass
  131. # See https://github.com/docker/compose/issues/189
  132. except thread.error:
  133. raise ShutdownException()
  134. if exception:
  135. raise exception
  136. if item is STOP:
  137. if cascade_stop:
  138. raise StopIteration
  139. else:
  140. continue
  141. yield item