utils.py 2.0 KB

import codecs
import hashlib
import json
import logging
import os
import sys
import concurrent.futures

from .const import DEFAULT_MAX_WORKERS

log = logging.getLogger(__name__)


def parallel_execute(command, containers, doing_msg, done_msg, **options):
    """
    Execute a given command upon a list of containers in parallel.
    """
    # Environment variables are strings, so cast before handing the value
    # to ThreadPoolExecutor.
    max_workers = int(os.environ.get('COMPOSE_MAX_WORKERS', DEFAULT_MAX_WORKERS))

    # On Python 2, wrap stdout so unicode status messages are encoded as
    # UTF-8; on Python 3, sys.stdout already accepts text.
    if sys.version_info[0] < 3:
        stream = codecs.getwriter('utf-8')(sys.stdout)
    else:
        stream = sys.stdout

    lines = []

    # Print an initial "doing" status line for every container so the
    # "done" lines can later be written over them in place.
    for container in containers:
        write_out_msg(stream, lines, container.name, doing_msg)

    def container_command_execute(container, command, **options):
        return getattr(container, command)(**options)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its container so its status line can be
        # updated as soon as that container's command completes.
        future_container = {
            executor.submit(
                container_command_execute,
                container,
                command,
                **options
            ): container for container in containers
        }

        for future in concurrent.futures.as_completed(future_container):
            container = future_container[future]
            write_out_msg(stream, lines, container.name, done_msg)


def write_out_msg(stream, lines, container_name, msg):
    """
    Using ANSI escape codes, write msg over the top of a previous status
    message for this container, if one exists.
    """
    if container_name in lines:
        position = lines.index(container_name)
        diff = len(lines) - position
        # move up
        stream.write("%c[%dA" % (27, diff))
        # erase
        stream.write("%c[2K\r" % 27)
        stream.write("{}: {} \n".format(container_name, msg))
        # move back down
        stream.write("%c[%dB" % (27, diff))
    else:
        diff = 0
        lines.append(container_name)
        stream.write("{}: {}... \r\n".format(container_name, msg))

    stream.flush()


def json_hash(obj):
    """
    Return a stable SHA-256 hex digest of a JSON-serializable object.
    """
    dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))
    h = hashlib.sha256()
    # hashlib requires bytes, so encode the JSON dump before hashing.
    h.update(dump.encode('utf8'))
    return h.hexdigest()
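

# Minimal usage sketch (an illustration, not part of the original module):
# FakeContainer and the 'stop' command below are hypothetical stand-ins for
# real Compose container objects, shown only to demonstrate how
# parallel_execute() and json_hash() are typically driven. Because of the
# relative import above, run this as a module (python -m <package>.utils)
# rather than as a standalone script.
if __name__ == '__main__':
    import time

    class FakeContainer(object):
        def __init__(self, name):
            self.name = name

        def stop(self, timeout=10):
            # Pretend the container takes a moment to stop.
            time.sleep(0.2)

    demo_containers = [FakeContainer('web_1'), FakeContainer('db_1')]

    # Runs FakeContainer.stop(timeout=5) for each container in worker
    # threads, rewriting each "Stopping..." line to "Stopped" as it finishes.
    parallel_execute('stop', demo_containers, 'Stopping', 'Stopped', timeout=5)

    # json_hash() sorts keys, so equivalent configs produce the same digest.
    assert json_hash({'image': 'redis', 'ports': [6379]}) == json_hash(
        {'ports': [6379], 'image': 'redis'})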