utils.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import codecs
  2. import hashlib
  3. import json
  4. import logging
  5. import sys
  6. from docker.errors import APIError
  7. from Queue import Queue, Empty
  8. from threading import Thread
  9. log = logging.getLogger(__name__)
  10. def parallel_execute(objects, obj_callable, msg_index, msg):
  11. """
  12. For a given list of objects, call the callable passing in the first
  13. object we give it.
  14. """
  15. stream = codecs.getwriter('utf-8')(sys.stdout)
  16. lines = []
  17. errors = {}
  18. for obj in objects:
  19. write_out_msg(stream, lines, msg_index(obj), msg)
  20. q = Queue()
  21. def inner_execute_function(an_callable, parameter, msg_index):
  22. try:
  23. result = an_callable(parameter)
  24. except APIError as e:
  25. errors[msg_index] = e.explanation
  26. result = "error"
  27. q.put((msg_index, result))
  28. for an_object in objects:
  29. t = Thread(
  30. target=inner_execute_function,
  31. args=(obj_callable, an_object, msg_index(an_object)),
  32. )
  33. t.daemon = True
  34. t.start()
  35. done = 0
  36. total_to_execute = len(objects)
  37. while done < total_to_execute:
  38. try:
  39. msg_index, result = q.get(timeout=1)
  40. if result == 'error':
  41. write_out_msg(stream, lines, msg_index, msg, status='error')
  42. else:
  43. write_out_msg(stream, lines, msg_index, msg)
  44. done += 1
  45. except Empty:
  46. pass
  47. if errors:
  48. stream.write("\n")
  49. for error in errors:
  50. stream.write("ERROR: for {} {} \n".format(error, errors[error]))
  51. def write_out_msg(stream, lines, msg_index, msg, status="done"):
  52. """
  53. Using special ANSI code characters we can write out the msg over the top of
  54. a previous status message, if it exists.
  55. """
  56. obj_index = msg_index
  57. if msg_index in lines:
  58. position = lines.index(obj_index)
  59. diff = len(lines) - position
  60. # move up
  61. stream.write("%c[%dA" % (27, diff))
  62. # erase
  63. stream.write("%c[2K\r" % 27)
  64. stream.write("{} {}... {}\n".format(msg, obj_index, status))
  65. # move back down
  66. stream.write("%c[%dB" % (27, diff))
  67. else:
  68. diff = 0
  69. lines.append(obj_index)
  70. stream.write("{} {}... \r\n".format(msg, obj_index))
  71. stream.flush()
  72. def json_hash(obj):
  73. dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))
  74. h = hashlib.sha256()
  75. h.update(dump)
  76. return h.hexdigest()