import codecs
import hashlib
import json
import logging
import os
import sys

import concurrent.futures

from .const import DEFAULT_MAX_WORKERS


log = logging.getLogger(__name__)
def parallel_execute(command, containers, doing_msg, done_msg, **options):
    """
    Execute a given command upon a list of containers in parallel.
    """
    # Environment variables are strings, so coerce the worker count to an int
    # before handing it to ThreadPoolExecutor.
    max_workers = int(os.environ.get('COMPOSE_MAX_WORKERS', DEFAULT_MAX_WORKERS))
    stream = codecs.getwriter('utf-8')(sys.stdout)
    lines = []

    # Print an initial "doing" status line for every container.
    for container in containers:
        write_out_msg(stream, lines, container.name, doing_msg)

    def container_command_execute(container, command, **options):
        return getattr(container, command)(**options)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_container = {
            executor.submit(
                container_command_execute,
                container,
                command,
                **options
            ): container for container in containers
        }

        # As each container finishes, overwrite its status line with done_msg.
        for future in concurrent.futures.as_completed(future_container):
            container = future_container[future]
            write_out_msg(stream, lines, container.name, done_msg)
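
# Illustrative usage only (not part of this module): parallel_execute is driven
# with a container method name plus status messages. The call below is a
# hypothetical example; `project.containers()` and the `timeout` option are
# assumptions, not names defined here.
#
#     parallel_execute(
#         command='stop',
#         containers=project.containers(),
#         doing_msg='Stopping',
#         done_msg='Stopped',
#         timeout=10,
#     )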

def write_out_msg(stream, lines, container_name, msg):
    """
    Using special ANSI code characters we can write out the msg over the top of
    a previous status message, if it exists.
    """
    if container_name in lines:
        position = lines.index(container_name)
        diff = len(lines) - position
        # move up
        stream.write("%c[%dA" % (27, diff))
        # erase
        stream.write("%c[2K\r" % 27)
        stream.write("{}: {} \n".format(container_name, msg))
        # move back down
        stream.write("%c[%dB" % (27, diff))
    else:
        diff = 0
        lines.append(container_name)
        stream.write("{}: {}... \r\n".format(container_name, msg))
    stream.flush()
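
# For reference: character 27 is ESC, so "%c[%dA" % (27, 3) renders as the ANSI
# escape sequence "\x1b[3A" (move the cursor up three lines), "\x1b[2K" erases
# the current line, and "\x1b[3B" moves back down three lines. That is how a
# finished container's status line is rewritten in place.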

def json_hash(obj):
    dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))
    h = hashlib.sha256()
    # hashlib requires bytes on Python 3, so encode the JSON string first.
    h.update(dump.encode('utf8'))
    return h.hexdigest()
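
# Illustrative usage only (not part of this module): sorted keys and compact
# separators make the digest deterministic for equal dicts, e.g.
#
#     json_hash({'image': 'busybox', 'command': 'true'})
#     json_hash({'command': 'true', 'image': 'busybox'})
#
# both produce the same SHA-256 hex digest.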