# log_printer.py
import _thread as thread
import sys
from collections import namedtuple
from itertools import cycle
from queue import Empty
from queue import Queue
from threading import Thread

from docker.errors import APIError

from . import colors
from compose.cli.signals import ShutdownException
from compose.utils import split_buffer
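# How the pieces fit together: LogPrinter.run() starts one daemon thread per
# container (build_thread_map / tail_container_logs), each pushing
# presenter-formatted lines onto a shared Queue. A separate producer thread
# (start_producer_thread -> watch_events) watches the event stream and attaches
# new tailer threads as containers start or restart. The main thread drains the
# queue via consume_queue() and writes each line to the output stream.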
class LogPresenter(object):

    def __init__(self, prefix_width, color_func):
        self.prefix_width = prefix_width
        self.color_func = color_func

    def present(self, container, line):
        prefix = container.name_without_project.ljust(self.prefix_width)
        return '{prefix} {line}'.format(
            prefix=self.color_func(prefix + ' |'),
            line=line)
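# For example, with prefix_width=6 (services "db" and "web"), a presenter turns
# a raw "Listening" line from container db_1 into "db_1   | Listening" and from
# web_1 into "web_1  | Listening", so the "|" separators line up.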
def build_log_presenters(service_names, monochrome):
    """Return an iterable of LogPresenter objects.

    Each presenter is used to format the log output of one container.
    """
    prefix_width = max_name_width(service_names)

    def no_color(text):
        return text

    for color_func in cycle([no_color] if monochrome else colors.rainbow()):
        yield LogPresenter(prefix_width, color_func)
def max_name_width(service_names, max_index_width=3):
    """Calculate the maximum width of container names so we can make the log
    prefixes line up like so:

    db_1  | Listening
    web_1 | Listening
    """
    return max(len(name) for name in service_names) + max_index_width
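# e.g. max_name_width(['db', 'web']) == len('web') + 3 == 6; the extra
# max_index_width columns leave room for an instance suffix such as "_1".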
class LogPrinter(object):
    """Print logs from many containers to a single output stream."""

    def __init__(self,
                 containers,
                 presenters,
                 event_stream,
                 output=sys.stdout,
                 cascade_stop=False,
                 log_args=None):
        self.containers = containers
        self.presenters = presenters
        self.event_stream = event_stream
        self.output = output
        self.cascade_stop = cascade_stop
        self.log_args = log_args or {}
    def run(self):
        if not self.containers:
            return

        queue = Queue()
        thread_args = queue, self.log_args
        thread_map = build_thread_map(self.containers, self.presenters, thread_args)
        start_producer_thread((
            thread_map,
            self.event_stream,
            self.presenters,
            thread_args))

        for line in consume_queue(queue, self.cascade_stop):
            remove_stopped_threads(thread_map)

            if self.cascade_stop:
                matching_container = [cont.name for cont in self.containers if cont.name == line]
                if line in matching_container:
                    # Return the name of the container that triggered the
                    # cascade_stop so we can report the correct exit code
                    return line

            if not line:
                if not thread_map:
                    # There are no running containers left to tail, so exit
                    return
                # We got an empty line because of a timeout, but there are still
                # active containers to tail, so continue
                continue

            self.write(line)
    def write(self, line):
        try:
            self.output.write(line)
        except UnicodeEncodeError:
            # This may happen if the user's locale settings don't support UTF-8
            # and UTF-8 characters are present in the log line. The following
            # will output a "degraded" log with unsupported characters
            # replaced by `?`
            self.output.write(line.encode('ascii', 'replace').decode())
        self.output.flush()
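# Rough usage sketch (the caller-side wiring below is an assumption for
# illustration, not part of this module): presenters typically come from
# build_log_presenters() and the event stream from the project's event API.
#
#     presenters = build_log_presenters(service_names, monochrome=False)
#     LogPrinter(containers, presenters, event_stream,
#                cascade_stop=True, log_args={'follow': True}).run()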
def remove_stopped_threads(thread_map):
    for container_id, tailer_thread in list(thread_map.items()):
        if not tailer_thread.is_alive():
            thread_map.pop(container_id, None)
def build_thread(container, presenter, queue, log_args):
    tailer = Thread(
        target=tail_container_logs,
        args=(container, presenter, queue, log_args))
    tailer.daemon = True
    tailer.start()
    return tailer
def build_thread_map(initial_containers, presenters, thread_args):
    return {
        container.id: build_thread(container, next(presenters), *thread_args)
        # Container order is unspecified, so sort by name to keep the
        # container:presenter (log color) assignment deterministic across runs
        # that see the same set of container names.
        for container in sorted(initial_containers, key=lambda c: c.name)
    }
class QueueItem(namedtuple('_QueueItem', 'item is_stop exc')):
    """One entry on the log queue: a formatted line (`item`), a stop marker
    (`is_stop`), or an exception to re-raise in the consumer (`exc`).
    """

    @classmethod
    def new(cls, item):
        return cls(item, None, None)

    @classmethod
    def exception(cls, exc):
        return cls(None, None, exc)

    @classmethod
    def stop(cls, item=None):
        return cls(item, True, None)
def tail_container_logs(container, presenter, queue, log_args):
    generator = get_log_generator(container)

    try:
        for item in generator(container, log_args):
            queue.put(QueueItem.new(presenter.present(container, item)))
    except Exception as e:
        queue.put(QueueItem.exception(e))
        return

    if log_args.get('follow'):
        queue.put(QueueItem.new(presenter.color_func(wait_on_exit(container))))
    queue.put(QueueItem.stop(container.name))
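# Note that the final QueueItem.stop() carries container.name; LogPrinter.run()
# compares queued stop values against its containers' names to detect which
# container triggered a cascade_stop.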
def get_log_generator(container):
    if container.has_api_logs:
        return build_log_generator
    return build_no_log_generator
def build_no_log_generator(container, log_args):
    """Return a generator that yields a single warning that no logs are
    available for the container's log driver.
    """
    yield "WARNING: no logs are available with the '{}' log driver\n".format(
        container.log_driver)
def build_log_generator(container, log_args):
    # If the container doesn't have a log_stream, we need to attach to the
    # container before the log printer starts running.
    if container.log_stream is None:
        stream = container.logs(stdout=True, stderr=True, stream=True, **log_args)
    else:
        stream = container.log_stream
    return split_buffer(stream)
def wait_on_exit(container):
    try:
        exit_code = container.wait()
        return "%s exited with code %s\n" % (container.name, exit_code)
    except APIError as e:
        return "Unexpected API error for %s (HTTP code %s)\nResponse body:\n%s\n" % (
            container.name, e.response.status_code,
            e.response.text or '[empty]'
        )
def start_producer_thread(thread_args):
    producer = Thread(target=watch_events, args=thread_args)
    producer.daemon = True
    producer.start()
def watch_events(thread_map, event_stream, presenters, thread_args):
    crashed_containers = set()
    for event in event_stream:
        if event['action'] == 'stop':
            thread_map.pop(event['id'], None)

        if event['action'] == 'die':
            thread_map.pop(event['id'], None)
            crashed_containers.add(event['id'])

        if event['action'] != 'start':
            continue

        if event['id'] in thread_map:
            if thread_map[event['id']].is_alive():
                continue
            # Container was stopped and started, we need a new thread
            thread_map.pop(event['id'], None)

        # Container crashed so we should reattach to it
        if event['id'] in crashed_containers:
            container = event['container']
            if not container.is_restarting:
                try:
                    container.attach_log_stream()
                except APIError:
                    # Just ignore errors when reattaching to already crashed containers
                    pass
            crashed_containers.remove(event['id'])

        thread_map[event['id']] = build_thread(
            event['container'],
            next(presenters),
            *thread_args
        )
def consume_queue(queue, cascade_stop):
    """Consume the queue by reading lines off of it and yielding them."""
    while True:
        try:
            item = queue.get(timeout=0.1)
        except Empty:
            yield None
            continue
        # See https://github.com/docker/compose/issues/189
        except thread.error:
            raise ShutdownException()

        if item.exc:
            raise item.exc

        if item.is_stop and not cascade_stop:
            continue

        yield item.item