# utils.py
  1. import codecs
  2. import hashlib
  3. import json
  4. import json.decoder
  5. import logging
  6. import sys
  7. from threading import Thread
  8. import six
  9. from docker.errors import APIError
  10. from six.moves.queue import Empty
  11. from six.moves.queue import Queue
# Module-wide logger, named after this module.
log = logging.getLogger(__name__)

# Shared JSONDecoder instance; json_splitter() uses its raw_decode() and
# json_stream() passes its decode() as the trailing-buffer decoder.
json_decoder = json.JSONDecoder()
  14. def parallel_execute(objects, obj_callable, msg_index, msg):
  15. """
  16. For a given list of objects, call the callable passing in the first
  17. object we give it.
  18. """
  19. stream = get_output_stream(sys.stdout)
  20. lines = []
  21. for obj in objects:
  22. write_out_msg(stream, lines, msg_index(obj), msg)
  23. q = Queue()
  24. def inner_execute_function(an_callable, parameter, msg_index):
  25. error = None
  26. try:
  27. result = an_callable(parameter)
  28. except APIError as e:
  29. error = e.explanation
  30. result = "error"
  31. except Exception as e:
  32. error = e
  33. result = 'unexpected_exception'
  34. q.put((msg_index, result, error))
  35. for an_object in objects:
  36. t = Thread(
  37. target=inner_execute_function,
  38. args=(obj_callable, an_object, msg_index(an_object)),
  39. )
  40. t.daemon = True
  41. t.start()
  42. done = 0
  43. errors = {}
  44. total_to_execute = len(objects)
  45. while done < total_to_execute:
  46. try:
  47. msg_index, result, error = q.get(timeout=1)
  48. if result == 'unexpected_exception':
  49. errors[msg_index] = result, error
  50. if result == 'error':
  51. errors[msg_index] = result, error
  52. write_out_msg(stream, lines, msg_index, msg, status='error')
  53. else:
  54. write_out_msg(stream, lines, msg_index, msg)
  55. done += 1
  56. except Empty:
  57. pass
  58. if not errors:
  59. return
  60. stream.write("\n")
  61. for msg_index, (result, error) in errors.items():
  62. stream.write("ERROR: for {} {} \n".format(msg_index, error))
  63. if result == 'unexpected_exception':
  64. raise error
  65. def get_output_stream(stream):
  66. if six.PY3:
  67. return stream
  68. return codecs.getwriter('utf-8')(stream)
  69. def stream_as_text(stream):
  70. """Given a stream of bytes or text, if any of the items in the stream
  71. are bytes convert them to text.
  72. This function can be removed once docker-py returns text streams instead
  73. of byte streams.
  74. """
  75. for data in stream:
  76. if not isinstance(data, six.text_type):
  77. data = data.decode('utf-8')
  78. yield data
  79. def line_splitter(buffer, separator=u'\n'):
  80. index = buffer.find(six.text_type(separator))
  81. if index == -1:
  82. return None, None
  83. return buffer[:index + 1], buffer[index + 1:]
  84. def split_buffer(stream, splitter=None, decoder=lambda a: a):
  85. """Given a generator which yields strings and a splitter function,
  86. joins all input, splits on the separator and yields each chunk.
  87. Unlike string.split(), each chunk includes the trailing
  88. separator, except for the last one if none was found on the end
  89. of the input.
  90. """
  91. splitter = splitter or line_splitter
  92. buffered = six.text_type('')
  93. for data in stream_as_text(stream):
  94. buffered += data
  95. while True:
  96. item, rest = splitter(buffered)
  97. if not item:
  98. break
  99. buffered = rest
  100. yield item
  101. if buffered:
  102. yield decoder(buffered)
  103. def json_splitter(buffer):
  104. """Attempt to parse a json object from a buffer. If there is at least one
  105. object, return it and the rest of the buffer, otherwise return None.
  106. """
  107. try:
  108. obj, index = json_decoder.raw_decode(buffer)
  109. rest = buffer[json.decoder.WHITESPACE.match(buffer, index).end():]
  110. return obj, rest
  111. except ValueError:
  112. return None, None
  113. def json_stream(stream):
  114. """Given a stream of text, return a stream of json objects.
  115. This handles streams which are inconsistently buffered (some entries may
  116. be newline delimited, and others are not).
  117. """
  118. return split_buffer(stream_as_text(stream), json_splitter, json_decoder.decode)
  119. def write_out_msg(stream, lines, msg_index, msg, status="done"):
  120. """
  121. Using special ANSI code characters we can write out the msg over the top of
  122. a previous status message, if it exists.
  123. """
  124. obj_index = msg_index
  125. if msg_index in lines:
  126. position = lines.index(obj_index)
  127. diff = len(lines) - position
  128. # move up
  129. stream.write("%c[%dA" % (27, diff))
  130. # erase
  131. stream.write("%c[2K\r" % 27)
  132. stream.write("{} {} ... {}\n".format(msg, obj_index, status))
  133. # move back down
  134. stream.write("%c[%dB" % (27, diff))
  135. else:
  136. diff = 0
  137. lines.append(obj_index)
  138. stream.write("{} {} ... \r\n".format(msg, obj_index))
  139. stream.flush()
  140. def json_hash(obj):
  141. dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))
  142. h = hashlib.sha256()
  143. h.update(dump.encode('utf8'))
  144. return h.hexdigest()