utils.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. import codecs
  2. import hashlib
  3. import json
  4. import logging
  5. import sys
  6. from threading import Thread
  7. import six
  8. from docker.errors import APIError
  9. from six.moves.queue import Empty
  10. from six.moves.queue import Queue
  11. log = logging.getLogger(__name__)
  12. def parallel_execute(objects, obj_callable, msg_index, msg):
  13. """
  14. For a given list of objects, call the callable passing in the first
  15. object we give it.
  16. """
  17. stream = get_output_stream(sys.stdout)
  18. lines = []
  19. errors = {}
  20. for obj in objects:
  21. write_out_msg(stream, lines, msg_index(obj), msg)
  22. q = Queue()
  23. def inner_execute_function(an_callable, parameter, msg_index):
  24. try:
  25. result = an_callable(parameter)
  26. except APIError as e:
  27. errors[msg_index] = e.explanation
  28. result = "error"
  29. except Exception as e:
  30. errors[msg_index] = e
  31. result = 'unexpected_exception'
  32. q.put((msg_index, result))
  33. for an_object in objects:
  34. t = Thread(
  35. target=inner_execute_function,
  36. args=(obj_callable, an_object, msg_index(an_object)),
  37. )
  38. t.daemon = True
  39. t.start()
  40. done = 0
  41. total_to_execute = len(objects)
  42. while done < total_to_execute:
  43. try:
  44. msg_index, result = q.get(timeout=1)
  45. if result == 'unexpected_exception':
  46. raise errors[msg_index]
  47. if result == 'error':
  48. write_out_msg(stream, lines, msg_index, msg, status='error')
  49. else:
  50. write_out_msg(stream, lines, msg_index, msg)
  51. done += 1
  52. except Empty:
  53. pass
  54. if errors:
  55. stream.write("\n")
  56. for error in errors:
  57. stream.write("ERROR: for {} {} \n".format(error, errors[error]))
  58. def get_output_stream(stream):
  59. if six.PY3:
  60. return stream
  61. return codecs.getwriter('utf-8')(stream)
  62. def write_out_msg(stream, lines, msg_index, msg, status="done"):
  63. """
  64. Using special ANSI code characters we can write out the msg over the top of
  65. a previous status message, if it exists.
  66. """
  67. obj_index = msg_index
  68. if msg_index in lines:
  69. position = lines.index(obj_index)
  70. diff = len(lines) - position
  71. # move up
  72. stream.write("%c[%dA" % (27, diff))
  73. # erase
  74. stream.write("%c[2K\r" % 27)
  75. stream.write("{} {} ... {}\n".format(msg, obj_index, status))
  76. # move back down
  77. stream.write("%c[%dB" % (27, diff))
  78. else:
  79. diff = 0
  80. lines.append(obj_index)
  81. stream.write("{} {} ... \r\n".format(msg, obj_index))
  82. stream.flush()
  83. def json_hash(obj):
  84. dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))
  85. h = hashlib.sha256()
  86. h.update(dump.encode('utf8'))
  87. return h.hexdigest()