log_printer.py

import _thread as thread
import sys
from collections import namedtuple
from itertools import cycle
from queue import Empty
from queue import Queue
from threading import Thread

from docker.errors import APIError

from . import colors
from compose import utils
from compose.cli.signals import ShutdownException
from compose.utils import split_buffer

class LogPresenter(object):

    def __init__(self, prefix_width, color_func):
        self.prefix_width = prefix_width
        self.color_func = color_func

    def present(self, container, line):
        prefix = container.name_without_project.ljust(self.prefix_width)
        return '{prefix} {line}'.format(
            prefix=self.color_func(prefix + ' |'),
            line=line)

def build_log_presenters(service_names, monochrome):
    """Return an iterable of LogPresenter objects.

    Each presenter can be used to format the log output of a container.
    """
    prefix_width = max_name_width(service_names)

    def no_color(text):
        return text

    for color_func in cycle([no_color] if monochrome else colors.rainbow()):
        yield LogPresenter(prefix_width, color_func)

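# Illustrative usage (not in the original source): presenters are drawn lazily
# from this generator, one per tailing thread, so log colors repeat in a fixed
# rainbow order:
#
#     presenters = build_log_presenters(['web', 'db'], monochrome=False)
#     web_presenter = next(presenters)
#     db_presenter = next(presenters)
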
def max_name_width(service_names, max_index_width=3):
    """Calculate the maximum width of container names so we can make the log
    prefixes line up like so:

        db_1  | Listening
        web_1 | Listening
    """
    return max(len(name) for name in service_names) + max_index_width

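# Illustrative example (not in the original source): max_name_width(['db', 'web'])
# returns 6, i.e. len('web') plus the default max_index_width of 3, leaving room
# for instance suffixes such as "_1" or "_12".
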
class LogPrinter(object):
    """Print logs from many containers to a single output stream."""

    def __init__(self,
                 containers,
                 presenters,
                 event_stream,
                 output=sys.stdout,
                 cascade_stop=False,
                 log_args=None):
        self.containers = containers
        self.presenters = presenters
        self.event_stream = event_stream
        self.output = utils.get_output_stream(output)
        self.cascade_stop = cascade_stop
        self.log_args = log_args or {}

    def run(self):
        if not self.containers:
            return

        queue = Queue()
        thread_args = queue, self.log_args
        thread_map = build_thread_map(self.containers, self.presenters, thread_args)
        start_producer_thread((
            thread_map,
            self.event_stream,
            self.presenters,
            thread_args))

        for line in consume_queue(queue, self.cascade_stop):
            remove_stopped_threads(thread_map)

            if self.cascade_stop:
                matching_container = [cont.name for cont in self.containers if cont.name == line]
                if line in matching_container:
                    # Return the name of the container that started the
                    # cascade_stop so we can return the correct exit code
                    return line

            if not line:
                if not thread_map:
                    # There are no running containers left to tail, so exit
                    return
                # We got an empty line because of a timeout, but there are still
                # active containers to tail, so continue
                continue

            self.write(line)

    def write(self, line):
        try:
            self.output.write(line)
        except UnicodeEncodeError:
            # This may happen if the user's locale settings don't support UTF-8
            # and UTF-8 characters are present in the log line. The following
            # will output a "degraded" log with unsupported characters
            # replaced by `?`
            self.output.write(line.encode('ascii', 'replace').decode())
        self.output.flush()

def remove_stopped_threads(thread_map):
    for container_id, tailer_thread in list(thread_map.items()):
        if not tailer_thread.is_alive():
            thread_map.pop(container_id, None)

def build_thread(container, presenter, queue, log_args):
    tailer = Thread(
        target=tail_container_logs,
        args=(container, presenter, queue, log_args))
    tailer.daemon = True
    tailer.start()
    return tailer

def build_thread_map(initial_containers, presenters, thread_args):
    return {
        container.id: build_thread(container, next(presenters), *thread_args)
        # Container order is unspecified, so they are sorted by name in order to make
        # container:presenter (log color) assignment deterministic when given a list of containers
        # with the same names.
        for container in sorted(initial_containers, key=lambda c: c.name)
    }

class QueueItem(namedtuple('_QueueItem', 'item is_stop exc')):

    @classmethod
    def new(cls, item):
        return cls(item, None, None)

    @classmethod
    def exception(cls, exc):
        return cls(None, None, exc)

    @classmethod
    def stop(cls, item=None):
        return cls(item, True, None)

def tail_container_logs(container, presenter, queue, log_args):
    generator = get_log_generator(container)

    try:
        for item in generator(container, log_args):
            queue.put(QueueItem.new(presenter.present(container, item)))
    except Exception as e:
        queue.put(QueueItem.exception(e))
        return

    if log_args.get('follow'):
        queue.put(QueueItem.new(presenter.color_func(wait_on_exit(container))))

    queue.put(QueueItem.stop(container.name))

def get_log_generator(container):
    if container.has_api_logs:
        return build_log_generator
    return build_no_log_generator

def build_no_log_generator(container, log_args):
    """Return a generator that yields a warning that logs are not available
    for the container's log driver.
    """
    yield "WARNING: no logs are available with the '{}' log driver\n".format(
        container.log_driver)

def build_log_generator(container, log_args):
    # If the container doesn't have a log_stream, we need to attach to the
    # container before the log printer starts running.
    if container.log_stream is None:
        stream = container.logs(stdout=True, stderr=True, stream=True, **log_args)
    else:
        stream = container.log_stream

    return split_buffer(stream)

def wait_on_exit(container):
    try:
        exit_code = container.wait()
        return "%s exited with code %s\n" % (container.name, exit_code)
    except APIError as e:
        return "Unexpected API error for %s (HTTP code %s)\nResponse body:\n%s\n" % (
            container.name, e.response.status_code,
            e.response.text or '[empty]'
        )

def start_producer_thread(thread_args):
    producer = Thread(target=watch_events, args=thread_args)
    producer.daemon = True
    producer.start()

def watch_events(thread_map, event_stream, presenters, thread_args):
    crashed_containers = set()
    for event in event_stream:
        if event['action'] == 'stop':
            thread_map.pop(event['id'], None)

        if event['action'] == 'die':
            thread_map.pop(event['id'], None)
            crashed_containers.add(event['id'])

        if event['action'] != 'start':
            continue

        if event['id'] in thread_map:
            if thread_map[event['id']].is_alive():
                continue
            # Container was stopped and started, we need a new thread
            thread_map.pop(event['id'], None)

        # Container crashed so we should reattach to it
        if event['id'] in crashed_containers:
            container = event['container']
            if not container.is_restarting:
                try:
                    container.attach_log_stream()
                except APIError:
                    # Just ignore errors when reattaching to already crashed containers
                    pass
            crashed_containers.remove(event['id'])

        thread_map[event['id']] = build_thread(
            event['container'],
            next(presenters),
            *thread_args
        )

def consume_queue(queue, cascade_stop):
    """Consume the queue by reading lines off of it and yielding them."""
    while True:
        try:
            item = queue.get(timeout=0.1)
        except Empty:
            yield None
            continue
        # See https://github.com/docker/compose/issues/189
        except thread.error:
            raise ShutdownException()

        if item.exc:
            raise item.exc

        if item.is_stop and not cascade_stop:
            continue

        yield item.item

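# Minimal self-contained sketch (not part of the original module): exercise
# QueueItem and consume_queue without any Docker containers by feeding the
# queue directly, the same way tail_container_logs would. Because of the
# relative import above, this only runs when the module is executed as part
# of the compose package.
if __name__ == '__main__':
    demo_queue = Queue()
    demo_queue.put(QueueItem.new('web_1  | Listening on :8000\n'))
    demo_queue.put(QueueItem.new('db_1   | ready to accept connections\n'))
    demo_queue.put(QueueItem.stop('web_1'))

    for line in consume_queue(demo_queue, cascade_stop=True):
        if line is None:
            # Timed out with nothing left to read; a real LogPrinter keeps polling.
            break
        if line == 'web_1':
            # The stop sentinel carries the container name, mirroring how
            # LogPrinter.run() detects a cascade_stop.
            break
        print(line, end='')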