log_printer.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. import _thread as thread
  2. import sys
  3. from collections import namedtuple
  4. from itertools import cycle
  5. from operator import attrgetter
  6. from queue import Empty
  7. from queue import Queue
  8. from threading import Thread
  9. from docker.errors import APIError
  10. from . import colors
  11. from compose.cli.signals import ShutdownException
  12. from compose.utils import split_buffer
  13. class LogPresenter:
  14. def __init__(self, prefix_width, color_func, keep_prefix=True):
  15. self.prefix_width = prefix_width
  16. self.color_func = color_func
  17. self.keep_prefix = keep_prefix
  18. def present(self, container, line):
  19. to_log = '{line}'.format(line=line)
  20. if self.keep_prefix:
  21. prefix = container.name_without_project.ljust(self.prefix_width)
  22. to_log = '{prefix} '.format(prefix=self.color_func(prefix + ' |')) + to_log
  23. return to_log
  24. def build_log_presenters(service_names, monochrome, keep_prefix=True):
  25. """Return an iterable of functions.
  26. Each function can be used to format the logs output of a container.
  27. """
  28. prefix_width = max_name_width(service_names)
  29. def no_color(text):
  30. return text
  31. for color_func in cycle([no_color] if monochrome else colors.rainbow()):
  32. yield LogPresenter(prefix_width, color_func, keep_prefix)
  33. def max_name_width(service_names, max_index_width=3):
  34. """Calculate the maximum width of container names so we can make the log
  35. prefixes line up like so:
  36. db_1 | Listening
  37. web_1 | Listening
  38. """
  39. return max(len(name) for name in service_names) + max_index_width
  40. class LogPrinter:
  41. """Print logs from many containers to a single output stream."""
  42. def __init__(self,
  43. containers,
  44. presenters,
  45. event_stream,
  46. output=sys.stdout,
  47. cascade_stop=False,
  48. log_args=None):
  49. self.containers = containers
  50. self.presenters = presenters
  51. self.event_stream = event_stream
  52. self.output = output
  53. self.cascade_stop = cascade_stop
  54. self.log_args = log_args or {}
  55. def run(self):
  56. if not self.containers:
  57. return
  58. queue = Queue()
  59. thread_args = queue, self.log_args
  60. thread_map = build_thread_map(self.containers, self.presenters, thread_args)
  61. start_producer_thread((
  62. thread_map,
  63. self.event_stream,
  64. self.presenters,
  65. thread_args))
  66. for line in consume_queue(queue, self.cascade_stop):
  67. remove_stopped_threads(thread_map)
  68. if self.cascade_stop:
  69. matching_container = [cont.name for cont in self.containers if cont.name == line]
  70. if line in matching_container:
  71. # Returning the name of the container that started the
  72. # the cascade_stop so we can return the correct exit code
  73. return line
  74. if not line:
  75. if not thread_map:
  76. # There are no running containers left to tail, so exit
  77. return
  78. # We got an empty line because of a timeout, but there are still
  79. # active containers to tail, so continue
  80. continue
  81. self.write(line)
  82. def write(self, line):
  83. try:
  84. self.output.write(line)
  85. except UnicodeEncodeError:
  86. # This may happen if the user's locale settings don't support UTF-8
  87. # and UTF-8 characters are present in the log line. The following
  88. # will output a "degraded" log with unsupported characters
  89. # replaced by `?`
  90. self.output.write(line.encode('ascii', 'replace').decode())
  91. self.output.flush()
  92. def remove_stopped_threads(thread_map):
  93. for container_id, tailer_thread in list(thread_map.items()):
  94. if not tailer_thread.is_alive():
  95. thread_map.pop(container_id, None)
  96. def build_thread(container, presenter, queue, log_args):
  97. tailer = Thread(
  98. target=tail_container_logs,
  99. args=(container, presenter, queue, log_args))
  100. tailer.daemon = True
  101. tailer.start()
  102. return tailer
  103. def build_thread_map(initial_containers, presenters, thread_args):
  104. return {
  105. container.id: build_thread(container, next(presenters), *thread_args)
  106. # Container order is unspecified, so they are sorted by name in order to make
  107. # container:presenter (log color) assignment deterministic when given a list of containers
  108. # with the same names.
  109. for container in sorted(initial_containers, key=attrgetter('name'))
  110. }
  111. class QueueItem(namedtuple('_QueueItem', 'item is_stop exc')):
  112. @classmethod
  113. def new(cls, item):
  114. return cls(item, None, None)
  115. @classmethod
  116. def exception(cls, exc):
  117. return cls(None, None, exc)
  118. @classmethod
  119. def stop(cls, item=None):
  120. return cls(item, True, None)
  121. def tail_container_logs(container, presenter, queue, log_args):
  122. try:
  123. for item in build_log_generator(container, log_args):
  124. queue.put(QueueItem.new(presenter.present(container, item)))
  125. except Exception as e:
  126. queue.put(QueueItem.exception(e))
  127. return
  128. if log_args.get('follow'):
  129. queue.put(QueueItem.new(presenter.color_func(wait_on_exit(container))))
  130. queue.put(QueueItem.stop(container.name))
  131. def build_log_generator(container, log_args):
  132. # if the container doesn't have a log_stream we need to attach to container
  133. # before log printer starts running
  134. if container.log_stream is None:
  135. stream = container.logs(stdout=True, stderr=True, stream=True, **log_args)
  136. else:
  137. stream = container.log_stream
  138. return split_buffer(stream)
  139. def wait_on_exit(container):
  140. try:
  141. exit_code = container.wait()
  142. return "{} exited with code {}\n".format(container.name, exit_code)
  143. except APIError as e:
  144. return "Unexpected API error for {} (HTTP code {})\nResponse body:\n{}\n".format(
  145. container.name, e.response.status_code,
  146. e.response.text or '[empty]'
  147. )
  148. def start_producer_thread(thread_args):
  149. producer = Thread(target=watch_events, args=thread_args)
  150. producer.daemon = True
  151. producer.start()
  152. def watch_events(thread_map, event_stream, presenters, thread_args):
  153. crashed_containers = set()
  154. for event in event_stream:
  155. if event['action'] == 'stop':
  156. thread_map.pop(event['id'], None)
  157. if event['action'] == 'die':
  158. thread_map.pop(event['id'], None)
  159. crashed_containers.add(event['id'])
  160. if event['action'] != 'start':
  161. continue
  162. if event['id'] in thread_map:
  163. if thread_map[event['id']].is_alive():
  164. continue
  165. # Container was stopped and started, we need a new thread
  166. thread_map.pop(event['id'], None)
  167. # Container crashed so we should reattach to it
  168. if event['id'] in crashed_containers:
  169. container = event['container']
  170. if not container.is_restarting:
  171. try:
  172. container.attach_log_stream()
  173. except APIError:
  174. # Just ignore errors when reattaching to already crashed containers
  175. pass
  176. crashed_containers.remove(event['id'])
  177. thread_map[event['id']] = build_thread(
  178. event['container'],
  179. next(presenters),
  180. *thread_args
  181. )
  182. def consume_queue(queue, cascade_stop):
  183. """Consume the queue by reading lines off of it and yielding them."""
  184. while True:
  185. try:
  186. item = queue.get(timeout=0.1)
  187. except Empty:
  188. yield None
  189. continue
  190. # See https://github.com/docker/compose/issues/189
  191. except thread.error:
  192. raise ShutdownException()
  193. if item.exc:
  194. raise item.exc
  195. if item.is_stop and not cascade_stop:
  196. continue
  197. yield item.item