utils.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import codecs
  2. import hashlib
  3. import json
  4. import logging
  5. import sys
  6. from docker.errors import APIError
  7. from Queue import Queue, Empty
  8. from threading import Thread
  9. log = logging.getLogger(__name__)
  10. def parallel_execute(objects, obj_callable, msg_index, msg):
  11. """
  12. For a given list of objects, call the callable passing in the first
  13. object we give it.
  14. """
  15. stream = codecs.getwriter('utf-8')(sys.stdout)
  16. lines = []
  17. errors = {}
  18. for obj in objects:
  19. write_out_msg(stream, lines, msg_index(obj), msg)
  20. q = Queue()
  21. def inner_execute_function(an_callable, parameter, msg_index):
  22. try:
  23. result = an_callable(parameter)
  24. except APIError as e:
  25. errors[msg_index] = e.explanation
  26. result = "error"
  27. except Exception as e:
  28. errors[msg_index] = e
  29. result = 'unexpected_exception'
  30. q.put((msg_index, result))
  31. for an_object in objects:
  32. t = Thread(
  33. target=inner_execute_function,
  34. args=(obj_callable, an_object, msg_index(an_object)),
  35. )
  36. t.daemon = True
  37. t.start()
  38. done = 0
  39. total_to_execute = len(objects)
  40. while done < total_to_execute:
  41. try:
  42. msg_index, result = q.get(timeout=1)
  43. if result == 'unexpected_exception':
  44. raise errors[msg_index]
  45. if result == 'error':
  46. write_out_msg(stream, lines, msg_index, msg, status='error')
  47. else:
  48. write_out_msg(stream, lines, msg_index, msg)
  49. done += 1
  50. except Empty:
  51. pass
  52. if errors:
  53. stream.write("\n")
  54. for error in errors:
  55. stream.write("ERROR: for {} {} \n".format(error, errors[error]))
  56. def write_out_msg(stream, lines, msg_index, msg, status="done"):
  57. """
  58. Using special ANSI code characters we can write out the msg over the top of
  59. a previous status message, if it exists.
  60. """
  61. obj_index = msg_index
  62. if msg_index in lines:
  63. position = lines.index(obj_index)
  64. diff = len(lines) - position
  65. # move up
  66. stream.write("%c[%dA" % (27, diff))
  67. # erase
  68. stream.write("%c[2K\r" % 27)
  69. stream.write("{} {}... {}\n".format(msg, obj_index, status))
  70. # move back down
  71. stream.write("%c[%dB" % (27, diff))
  72. else:
  73. diff = 0
  74. lines.append(obj_index)
  75. stream.write("{} {}... \r\n".format(msg, obj_index))
  76. stream.flush()
  77. def json_hash(obj):
  78. dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))
  79. h = hashlib.sha256()
  80. h.update(dump)
  81. return h.hexdigest()