1
0

utils.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import codecs
  2. import hashlib
  3. import json
  4. import logging
  5. import os
  6. import sys
  7. from docker.errors import APIError
  8. import concurrent.futures
  9. from .const import DEFAULT_MAX_WORKERS
  10. log = logging.getLogger(__name__)
  11. def parallel_execute(command, containers, doing_msg, done_msg, **options):
  12. """
  13. Execute a given command upon a list of containers in parallel.
  14. """
  15. max_workers = os.environ.get('COMPOSE_MAX_WORKERS', DEFAULT_MAX_WORKERS)
  16. stream = codecs.getwriter('utf-8')(sys.stdout)
  17. lines = []
  18. errors = {}
  19. for container in containers:
  20. write_out_msg(stream, lines, container.name, doing_msg)
  21. def container_command_execute(container, command, **options):
  22. try:
  23. getattr(container, command)(**options)
  24. except APIError as e:
  25. errors[container.name] = e.explanation
  26. with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
  27. future_container = {
  28. executor.submit(
  29. container_command_execute,
  30. container,
  31. command,
  32. **options
  33. ): container for container in containers
  34. }
  35. for future in concurrent.futures.as_completed(future_container):
  36. container = future_container[future]
  37. write_out_msg(stream, lines, container.name, done_msg)
  38. if errors:
  39. for container in errors:
  40. stream.write("ERROR: for {} {} \n".format(container, errors[container]))
  41. def write_out_msg(stream, lines, container_name, msg):
  42. """
  43. Using special ANSI code characters we can write out the msg over the top of
  44. a previous status message, if it exists.
  45. """
  46. if container_name in lines:
  47. position = lines.index(container_name)
  48. diff = len(lines) - position
  49. # move up
  50. stream.write("%c[%dA" % (27, diff))
  51. # erase
  52. stream.write("%c[2K\r" % 27)
  53. stream.write("{}: {} \n".format(container_name, msg))
  54. # move back down
  55. stream.write("%c[%dB" % (27, diff))
  56. else:
  57. diff = 0
  58. lines.append(container_name)
  59. stream.write("{}: {}... \r\n".format(container_name, msg))
  60. stream.flush()
  61. def json_hash(obj):
  62. dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))
  63. h = hashlib.sha256()
  64. h.update(dump)
  65. return h.hexdigest()