utils.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. import codecs
  2. import hashlib
  3. import json
  4. import logging
  5. import sys
  6. from threading import Thread
  7. import six
  8. from docker.errors import APIError
  9. from six.moves.queue import Empty
  10. from six.moves.queue import Queue
  11. log = logging.getLogger(__name__)
  12. def parallel_execute(objects, obj_callable, msg_index, msg):
  13. """
  14. For a given list of objects, call the callable passing in the first
  15. object we give it.
  16. """
  17. stream = get_output_stream(sys.stdout)
  18. lines = []
  19. for obj in objects:
  20. write_out_msg(stream, lines, msg_index(obj), msg)
  21. q = Queue()
  22. def inner_execute_function(an_callable, parameter, msg_index):
  23. error = None
  24. try:
  25. result = an_callable(parameter)
  26. except APIError as e:
  27. error = e.explanation
  28. result = "error"
  29. except Exception as e:
  30. error = e
  31. result = 'unexpected_exception'
  32. q.put((msg_index, result, error))
  33. for an_object in objects:
  34. t = Thread(
  35. target=inner_execute_function,
  36. args=(obj_callable, an_object, msg_index(an_object)),
  37. )
  38. t.daemon = True
  39. t.start()
  40. done = 0
  41. errors = {}
  42. total_to_execute = len(objects)
  43. while done < total_to_execute:
  44. try:
  45. msg_index, result, error = q.get(timeout=1)
  46. if result == 'unexpected_exception':
  47. errors[msg_index] = result, error
  48. if result == 'error':
  49. errors[msg_index] = result, error
  50. write_out_msg(stream, lines, msg_index, msg, status='error')
  51. else:
  52. write_out_msg(stream, lines, msg_index, msg)
  53. done += 1
  54. except Empty:
  55. pass
  56. if not errors:
  57. return
  58. stream.write("\n")
  59. for msg_index, (result, error) in errors.items():
  60. stream.write("ERROR: for {} {} \n".format(msg_index, error))
  61. if result == 'unexpected_exception':
  62. raise error
  63. def get_output_stream(stream):
  64. if six.PY3:
  65. return stream
  66. return codecs.getwriter('utf-8')(stream)
  67. def write_out_msg(stream, lines, msg_index, msg, status="done"):
  68. """
  69. Using special ANSI code characters we can write out the msg over the top of
  70. a previous status message, if it exists.
  71. """
  72. obj_index = msg_index
  73. if msg_index in lines:
  74. position = lines.index(obj_index)
  75. diff = len(lines) - position
  76. # move up
  77. stream.write("%c[%dA" % (27, diff))
  78. # erase
  79. stream.write("%c[2K\r" % 27)
  80. stream.write("{} {} ... {}\n".format(msg, obj_index, status))
  81. # move back down
  82. stream.write("%c[%dB" % (27, diff))
  83. else:
  84. diff = 0
  85. lines.append(obj_index)
  86. stream.write("{} {} ... \r\n".format(msg, obj_index))
  87. stream.flush()
  88. def json_hash(obj):
  89. dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))
  90. h = hashlib.sha256()
  91. h.update(dump.encode('utf8'))
  92. return h.hexdigest()