utils.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import codecs
  2. import hashlib
  3. import json
  4. import logging
  5. import sys
  6. from docker.errors import APIError
  7. from Queue import Queue, Empty
  8. from threading import Thread
  9. log = logging.getLogger(__name__)
  10. def parallel_create_execute(create_function, container_numbers, msgs={}, **options):
  11. """
  12. Parallel container creation by calling the create_function for each new container
  13. number passed in.
  14. """
  15. stream = codecs.getwriter('utf-8')(sys.stdout)
  16. lines = []
  17. errors = {}
  18. for number in container_numbers:
  19. write_out_msg(stream, lines, number, msgs['doing'])
  20. q = Queue()
  21. def inner_call_function(create_function, number):
  22. try:
  23. container = create_function(number)
  24. except APIError as e:
  25. errors[number] = e.explanation
  26. q.put(container)
  27. for number in container_numbers:
  28. t = Thread(
  29. target=inner_call_function,
  30. args=(create_function, number),
  31. kwargs=options,
  32. )
  33. t.daemon = True
  34. t.start()
  35. done = 0
  36. total_to_create = len(container_numbers)
  37. while done < total_to_create:
  38. try:
  39. container = q.get(timeout=1)
  40. write_out_msg(stream, lines, container.name, msgs['done'])
  41. done += 1
  42. except Empty:
  43. pass
  44. if errors:
  45. for number in errors:
  46. stream.write("ERROR: for {} {} \n".format(number, errors[number]))
  47. def parallel_execute(command, containers, doing_msg, done_msg, **options):
  48. """
  49. Execute a given command upon a list of containers in parallel.
  50. """
  51. stream = codecs.getwriter('utf-8')(sys.stdout)
  52. lines = []
  53. errors = {}
  54. for container in containers:
  55. write_out_msg(stream, lines, container.name, doing_msg)
  56. q = Queue()
  57. def container_command_execute(container, command, **options):
  58. try:
  59. getattr(container, command)(**options)
  60. except APIError as e:
  61. errors[container.name] = e.explanation
  62. q.put(container)
  63. for container in containers:
  64. t = Thread(
  65. target=container_command_execute,
  66. args=(container, command),
  67. kwargs=options,
  68. )
  69. t.daemon = True
  70. t.start()
  71. done = 0
  72. while done < len(containers):
  73. try:
  74. container = q.get(timeout=1)
  75. write_out_msg(stream, lines, container.name, done_msg)
  76. done += 1
  77. except Empty:
  78. pass
  79. if errors:
  80. for container in errors:
  81. stream.write("ERROR: for {} {} \n".format(container, errors[container]))
  82. def write_out_msg(stream, lines, container_name, msg):
  83. """
  84. Using special ANSI code characters we can write out the msg over the top of
  85. a previous status message, if it exists.
  86. """
  87. if container_name in lines:
  88. position = lines.index(container_name)
  89. diff = len(lines) - position
  90. # move up
  91. stream.write("%c[%dA" % (27, diff))
  92. # erase
  93. stream.write("%c[2K\r" % 27)
  94. stream.write("{}: {} \n".format(container_name, msg))
  95. # move back down
  96. stream.write("%c[%dB" % (27, diff))
  97. else:
  98. diff = 0
  99. lines.append(container_name)
  100. stream.write("{}: {}... \r\n".format(container_name, msg))
  101. stream.flush()
  102. def json_hash(obj):
  103. dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))
  104. h = hashlib.sha256()
  105. h.update(dump)
  106. return h.hexdigest()