| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- import codecs
- import hashlib
- import json
- import logging
- import sys
- from docker.errors import APIError
- from Queue import Queue, Empty
- from threading import Thread
- log = logging.getLogger(__name__)
- def parallel_create_execute(create_function, container_numbers, msgs={}, **options):
- """
- Parallel container creation by calling the create_function for each new container
- number passed in.
- """
- stream = codecs.getwriter('utf-8')(sys.stdout)
- lines = []
- errors = {}
- for number in container_numbers:
- write_out_msg(stream, lines, number, msgs['doing'])
- q = Queue()
- def inner_call_function(create_function, number):
- try:
- container = create_function(number)
- except APIError as e:
- errors[number] = e.explanation
- q.put(container)
- for number in container_numbers:
- t = Thread(
- target=inner_call_function,
- args=(create_function, number),
- kwargs=options,
- )
- t.daemon = True
- t.start()
- done = 0
- total_to_create = len(container_numbers)
- while done < total_to_create:
- try:
- container = q.get(timeout=1)
- write_out_msg(stream, lines, container.name, msgs['done'])
- done += 1
- except Empty:
- pass
- if errors:
- for number in errors:
- stream.write("ERROR: for {} {} \n".format(number, errors[number]))
- def parallel_execute(command, containers, doing_msg, done_msg, **options):
- """
- Execute a given command upon a list of containers in parallel.
- """
- stream = codecs.getwriter('utf-8')(sys.stdout)
- lines = []
- errors = {}
- for container in containers:
- write_out_msg(stream, lines, container.name, doing_msg)
- q = Queue()
- def container_command_execute(container, command, **options):
- try:
- getattr(container, command)(**options)
- except APIError as e:
- errors[container.name] = e.explanation
- q.put(container)
- for container in containers:
- t = Thread(
- target=container_command_execute,
- args=(container, command),
- kwargs=options,
- )
- t.daemon = True
- t.start()
- done = 0
- while done < len(containers):
- try:
- container = q.get(timeout=1)
- write_out_msg(stream, lines, container.name, done_msg)
- done += 1
- except Empty:
- pass
- if errors:
- for container in errors:
- stream.write("ERROR: for {} {} \n".format(container, errors[container]))
- def write_out_msg(stream, lines, container_name, msg):
- """
- Using special ANSI code characters we can write out the msg over the top of
- a previous status message, if it exists.
- """
- if container_name in lines:
- position = lines.index(container_name)
- diff = len(lines) - position
- # move up
- stream.write("%c[%dA" % (27, diff))
- # erase
- stream.write("%c[2K\r" % 27)
- stream.write("{}: {} \n".format(container_name, msg))
- # move back down
- stream.write("%c[%dB" % (27, diff))
- else:
- diff = 0
- lines.append(container_name)
- stream.write("{}: {}... \r\n".format(container_name, msg))
- stream.flush()
- def json_hash(obj):
- dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))
- h = hashlib.sha256()
- h.update(dump)
- return h.hexdigest()
|