log_printer.py
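"""Print logs from multiple containers to a single output stream, prefixing
each line with an aligned (and optionally coloured) container name.
"""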

from __future__ import absolute_import
from __future__ import unicode_literals

import sys
from collections import namedtuple
from itertools import cycle
from threading import Thread

from docker.errors import APIError
from six.moves import _thread as thread
from six.moves.queue import Empty
from six.moves.queue import Queue

from . import colors
from compose import utils
from compose.cli.signals import ShutdownException
from compose.utils import split_buffer


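# A LogPresenter prefixes each log line with a padded, coloured container name
# so that output from different containers stays aligned. Illustrative output
# (container name and width are examples only):
#
#     web_1  | Listening on :8000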
class LogPresenter(object):

    def __init__(self, prefix_width, color_func):
        self.prefix_width = prefix_width
        self.color_func = color_func

    def present(self, container, line):
        prefix = container.name_without_project.ljust(self.prefix_width)
        return '{prefix} {line}'.format(
            prefix=self.color_func(prefix + ' |'),
            line=line)


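# Illustrative use of build_log_presenters (names and values are examples only):
#
#     presenters = build_log_presenters(['web', 'db'], monochrome=False)
#     presenter = next(presenters)  # pairs the next colour with the shared prefix width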
def build_log_presenters(service_names, monochrome):
    """Return an iterable of LogPresenters.

    Each presenter can be used to format the log output of a container.
    """
    prefix_width = max_name_width(service_names)

    def no_color(text):
        return text

    for color_func in cycle([no_color] if monochrome else colors.rainbow()):
        yield LogPresenter(prefix_width, color_func)


def max_name_width(service_names, max_index_width=3):
    """Calculate the maximum width of container names so we can make the log
    prefixes line up like so:

        db_1  | Listening
        web_1 | Listening
    """
    return max(len(name) for name in service_names) + max_index_width


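# Sketch of how a caller might drive LogPrinter (argument values are
# illustrative; the real wiring lives in the CLI commands that use this module):
#
#     printer = LogPrinter(
#         containers,                      # Container objects to tail
#         build_log_presenters(service_names, monochrome=False),
#         event_stream,                    # iterable of start/stop/die events
#         cascade_stop=False,
#         log_args={'follow': True},
#     )
#     printer.run()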
class LogPrinter(object):
    """Print logs from many containers to a single output stream."""

    def __init__(self,
                 containers,
                 presenters,
                 event_stream,
                 output=sys.stdout,
                 cascade_stop=False,
                 log_args=None):
        self.containers = containers
        self.presenters = presenters
        self.event_stream = event_stream
        self.output = utils.get_output_stream(output)
        self.cascade_stop = cascade_stop
        self.log_args = log_args or {}

    def run(self):
        if not self.containers:
            return

        queue = Queue()
        thread_args = queue, self.log_args
        thread_map = build_thread_map(self.containers, self.presenters, thread_args)
        start_producer_thread((
            thread_map,
            self.event_stream,
            self.presenters,
            thread_args))

        for line in consume_queue(queue, self.cascade_stop):
            remove_stopped_threads(thread_map)

            if self.cascade_stop:
                matching_container = [cont.name for cont in self.containers if cont.name == line]
                if line in matching_container:
                    # Return the name of the container that started the
                    # cascade_stop so we can return the correct exit code
                    return line

            if not line:
                if not thread_map:
                    # There are no running containers left to tail, so exit
                    return
                # We got an empty line because of a timeout, but there are still
                # active containers to tail, so continue
                continue

            self.write(line)

    def write(self, line):
        try:
            self.output.write(line)
        except UnicodeEncodeError:
            # This may happen if the user's locale settings don't support UTF-8
            # and UTF-8 characters are present in the log line. The following
            # will output a "degraded" log with unsupported characters
            # replaced by `?`
            self.output.write(line.encode('ascii', 'replace').decode())
        self.output.flush()


def remove_stopped_threads(thread_map):
    for container_id, tailer_thread in list(thread_map.items()):
        if not tailer_thread.is_alive():
            thread_map.pop(container_id, None)


def build_thread(container, presenter, queue, log_args):
    tailer = Thread(
        target=tail_container_logs,
        args=(container, presenter, queue, log_args))
    tailer.daemon = True
    tailer.start()
    return tailer


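# thread_args is the (queue, log_args) tuple assembled in LogPrinter.run(), so
# each tailer thread ends up calling
# tail_container_logs(container, presenter, queue, log_args).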
def build_thread_map(initial_containers, presenters, thread_args):
    return {
        container.id: build_thread(container, next(presenters), *thread_args)
        # Container order is unspecified, so they are sorted by name in order to make
        # container:presenter (log color) assignment deterministic when given a list of containers
        # with the same names.
        for container in sorted(initial_containers, key=lambda c: c.name)
    }


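# A QueueItem is one of three things:
#   * a formatted log line to print             (QueueItem.new)
#   * an exception to re-raise in the consumer  (QueueItem.exception)
#   * a stop marker carrying the container name (QueueItem.stop)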
class QueueItem(namedtuple('_QueueItem', 'item is_stop exc')):

    @classmethod
    def new(cls, item):
        return cls(item, None, None)

    @classmethod
    def exception(cls, exc):
        return cls(None, None, exc)

    @classmethod
    def stop(cls, item=None):
        return cls(item, True, None)


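# Runs in a tailer thread: push each presented log line onto the queue, push
# any exception so consume_queue() can re-raise it, and finish with a stop
# marker (plus an "exited with code ..." line when following).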
def tail_container_logs(container, presenter, queue, log_args):
    generator = get_log_generator(container)

    try:
        for item in generator(container, log_args):
            queue.put(QueueItem.new(presenter.present(container, item)))
    except Exception as e:
        queue.put(QueueItem.exception(e))
        return

    if log_args.get('follow'):
        queue.put(QueueItem.new(presenter.color_func(wait_on_exit(container))))

    queue.put(QueueItem.stop(container.name))


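# has_api_logs is assumed to be a Container property reporting whether the
# configured log driver exposes logs through the Docker API; when it doesn't,
# only the warning from build_no_log_generator is emitted.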
def get_log_generator(container):
    if container.has_api_logs:
        return build_log_generator
    return build_no_log_generator


def build_no_log_generator(container, log_args):
    """Return a generator that yields a warning that no logs are available
    for the container's log driver.
    """
    yield "WARNING: no logs are available with the '{}' log driver\n".format(
        container.log_driver)


def build_log_generator(container, log_args):
    # If the container doesn't have a log_stream, we need to attach to the
    # container before the log printer starts running.
    if container.log_stream is None:
        stream = container.logs(stdout=True, stderr=True, stream=True, **log_args)
    else:
        stream = container.log_stream

    return split_buffer(stream)


def wait_on_exit(container):
    try:
        exit_code = container.wait()
        return "%s exited with code %s\n" % (container.name, exit_code)
    except APIError as e:
        return "Unexpected API error for %s (HTTP code %s)\nResponse body:\n%s\n" % (
            container.name, e.response.status_code,
            e.response.text or '[empty]'
        )


def start_producer_thread(thread_args):
    producer = Thread(target=watch_events, args=thread_args)
    producer.daemon = True
    producer.start()


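# Runs in the producer thread: watches the container event stream, drops tailer
# threads for containers that stop or die, and starts a new tailer (reattaching
# to the log stream if the container crashed) whenever a container starts.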
def watch_events(thread_map, event_stream, presenters, thread_args):
    crashed_containers = set()
    for event in event_stream:
        if event['action'] == 'stop':
            thread_map.pop(event['id'], None)

        if event['action'] == 'die':
            thread_map.pop(event['id'], None)
            crashed_containers.add(event['id'])

        if event['action'] != 'start':
            continue

        if event['id'] in thread_map:
            if thread_map[event['id']].is_alive():
                continue
            # Container was stopped and started, we need a new thread
            thread_map.pop(event['id'], None)

        # Container crashed so we should reattach to it
        if event['id'] in crashed_containers:
            container = event['container']
            if not container.is_restarting:
                try:
                    container.attach_log_stream()
                except APIError:
                    # Just ignore errors when reattaching to already crashed containers
                    pass
            crashed_containers.remove(event['id'])

        thread_map[event['id']] = build_thread(
            event['container'],
            next(presenters),
            *thread_args
        )


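# The 0.1s timeout below makes consume_queue yield None periodically even when
# no container is producing output, which lets LogPrinter.run() prune finished
# threads and notice when there is nothing left to tail.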
def consume_queue(queue, cascade_stop):
    """Consume the queue by reading lines off of it and yielding them."""
    while True:
        try:
            item = queue.get(timeout=0.1)
        except Empty:
            yield None
            continue
        # See https://github.com/docker/compose/issues/189
        except thread.error:
            raise ShutdownException()

        if item.exc:
            raise item.exc

        if item.is_stop and not cascade_stop:
            continue

        yield item.item