utils.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. import codecs
  2. import hashlib
  3. import json
  4. import logging
  5. import sys
  6. from threading import Thread
  7. import six
  8. from docker.errors import APIError
  9. from six.moves.queue import Empty
  10. from six.moves.queue import Queue
  11. log = logging.getLogger(__name__)
  12. def parallel_execute(objects, obj_callable, msg_index, msg):
  13. """
  14. For a given list of objects, call the callable passing in the first
  15. object we give it.
  16. """
  17. stream = get_output_stream(sys.stdout)
  18. lines = []
  19. for obj in objects:
  20. write_out_msg(stream, lines, msg_index(obj), msg)
  21. q = Queue()
  22. def inner_execute_function(an_callable, parameter, msg_index):
  23. error = None
  24. try:
  25. result = an_callable(parameter)
  26. except APIError as e:
  27. error = e.explanation
  28. result = "error"
  29. except Exception as e:
  30. error = e
  31. result = 'unexpected_exception'
  32. q.put((msg_index, result, error))
  33. for an_object in objects:
  34. t = Thread(
  35. target=inner_execute_function,
  36. args=(obj_callable, an_object, msg_index(an_object)),
  37. )
  38. t.daemon = True
  39. t.start()
  40. done = 0
  41. errors = {}
  42. total_to_execute = len(objects)
  43. while done < total_to_execute:
  44. try:
  45. msg_index, result, error = q.get(timeout=1)
  46. if result == 'unexpected_exception':
  47. errors[msg_index] = result, error
  48. if result == 'error':
  49. errors[msg_index] = result, error
  50. write_out_msg(stream, lines, msg_index, msg, status='error')
  51. else:
  52. write_out_msg(stream, lines, msg_index, msg)
  53. done += 1
  54. except Empty:
  55. pass
  56. if not errors:
  57. return
  58. stream.write("\n")
  59. for msg_index, (result, error) in errors.items():
  60. stream.write("ERROR: for {} {} \n".format(msg_index, error))
  61. if result == 'unexpected_exception':
  62. raise error
  63. def get_output_stream(stream):
  64. if six.PY3:
  65. return stream
  66. return codecs.getwriter('utf-8')(stream)
  67. def stream_as_text(stream):
  68. """Given a stream of bytes or text, if any of the items in the stream
  69. are bytes convert them to text.
  70. This function can be removed once docker-py returns text streams instead
  71. of byte streams.
  72. """
  73. for data in stream:
  74. if not isinstance(data, six.text_type):
  75. data = data.decode('utf-8')
  76. yield data
  77. def split_buffer(reader, separator=u'\n'):
  78. """
  79. Given a generator which yields strings and a separator string,
  80. joins all input, splits on the separator and yields each chunk.
  81. Unlike string.split(), each chunk includes the trailing
  82. separator, except for the last one if none was found on the end
  83. of the input.
  84. """
  85. buffered = six.text_type('')
  86. separator = six.text_type(separator)
  87. for data in stream_as_text(reader):
  88. buffered += data
  89. while True:
  90. index = buffered.find(separator)
  91. if index == -1:
  92. break
  93. yield buffered[:index + 1]
  94. buffered = buffered[index + 1:]
  95. if len(buffered) > 0:
  96. yield buffered
  97. def write_out_msg(stream, lines, msg_index, msg, status="done"):
  98. """
  99. Using special ANSI code characters we can write out the msg over the top of
  100. a previous status message, if it exists.
  101. """
  102. obj_index = msg_index
  103. if msg_index in lines:
  104. position = lines.index(obj_index)
  105. diff = len(lines) - position
  106. # move up
  107. stream.write("%c[%dA" % (27, diff))
  108. # erase
  109. stream.write("%c[2K\r" % 27)
  110. stream.write("{} {} ... {}\n".format(msg, obj_index, status))
  111. # move back down
  112. stream.write("%c[%dB" % (27, diff))
  113. else:
  114. diff = 0
  115. lines.append(obj_index)
  116. stream.write("{} {} ... \r\n".format(msg, obj_index))
  117. stream.flush()
  118. def json_hash(obj):
  119. dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))
  120. h = hashlib.sha256()
  121. h.update(dump.encode('utf8'))
  122. return h.hexdigest()