# utils.py (2.6 KB)
  1. import codecs
  2. import hashlib
  3. import json
  4. import logging
  5. import sys
  6. from Queue import Empty
  7. from Queue import Queue
  8. from threading import Thread
  9. from docker.errors import APIError
  10. log = logging.getLogger(__name__)
  11. def parallel_execute(objects, obj_callable, msg_index, msg):
  12. """
  13. For a given list of objects, call the callable passing in the first
  14. object we give it.
  15. """
  16. stream = codecs.getwriter('utf-8')(sys.stdout)
  17. lines = []
  18. errors = {}
  19. for obj in objects:
  20. write_out_msg(stream, lines, msg_index(obj), msg)
  21. q = Queue()
  22. def inner_execute_function(an_callable, parameter, msg_index):
  23. try:
  24. result = an_callable(parameter)
  25. except APIError as e:
  26. errors[msg_index] = e.explanation
  27. result = "error"
  28. except Exception as e:
  29. errors[msg_index] = e
  30. result = 'unexpected_exception'
  31. q.put((msg_index, result))
  32. for an_object in objects:
  33. t = Thread(
  34. target=inner_execute_function,
  35. args=(obj_callable, an_object, msg_index(an_object)),
  36. )
  37. t.daemon = True
  38. t.start()
  39. done = 0
  40. total_to_execute = len(objects)
  41. while done < total_to_execute:
  42. try:
  43. msg_index, result = q.get(timeout=1)
  44. if result == 'unexpected_exception':
  45. raise errors[msg_index]
  46. if result == 'error':
  47. write_out_msg(stream, lines, msg_index, msg, status='error')
  48. else:
  49. write_out_msg(stream, lines, msg_index, msg)
  50. done += 1
  51. except Empty:
  52. pass
  53. if errors:
  54. stream.write("\n")
  55. for error in errors:
  56. stream.write("ERROR: for {} {} \n".format(error, errors[error]))
  57. def write_out_msg(stream, lines, msg_index, msg, status="done"):
  58. """
  59. Using special ANSI code characters we can write out the msg over the top of
  60. a previous status message, if it exists.
  61. """
  62. obj_index = msg_index
  63. if msg_index in lines:
  64. position = lines.index(obj_index)
  65. diff = len(lines) - position
  66. # move up
  67. stream.write("%c[%dA" % (27, diff))
  68. # erase
  69. stream.write("%c[2K\r" % 27)
  70. stream.write("{} {}... {}\n".format(msg, obj_index, status))
  71. # move back down
  72. stream.write("%c[%dB" % (27, diff))
  73. else:
  74. diff = 0
  75. lines.append(obj_index)
  76. stream.write("{} {}... \r\n".format(msg, obj_index))
  77. stream.flush()
  78. def json_hash(obj):
  79. dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))
  80. h = hashlib.sha256()
  81. h.update(dump.encode('utf8'))
  82. return h.hexdigest()