| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 | from __future__ import absolute_importfrom __future__ import unicode_literalsimport operatorimport sysfrom threading import Threadfrom docker.errors import APIErrorfrom six.moves import _thread as threadfrom six.moves.queue import Emptyfrom six.moves.queue import Queuefrom compose.cli.signals import ShutdownExceptionfrom compose.utils import get_output_streamdef parallel_execute(objects, func, get_name, msg, get_deps=None):    """Runs func on objects in parallel while ensuring that func is    ran on object only after it is ran on all its dependencies.    get_deps called on object must return a collection with its dependencies.    get_name called on object must return its name.    """    objects = list(objects)    stream = get_output_stream(sys.stderr)    writer = ParallelStreamWriter(stream, msg)    for obj in objects:        writer.initialize(get_name(obj))    q = setup_queue(objects, func, get_deps, get_name)    done = 0    errors = {}    error_to_reraise = None    returned = [None] * len(objects)    while done < len(objects):        try:            obj, result, exception = q.get(timeout=1)        except Empty:            continue        # See https://github.com/docker/compose/issues/189        except thread.error:            raise ShutdownException()        if exception is None:            writer.write(get_name(obj), 'done')            returned[objects.index(obj)] = result        elif isinstance(exception, APIError):            errors[get_name(obj)] = exception.explanation            writer.write(get_name(obj), 'error')        else:            errors[get_name(obj)] = exception            error_to_reraise = exception        done += 1    for obj_name, error in errors.items():        stream.write("\nERROR: for {}  {}\n".format(obj_name, error))    if error_to_reraise:        raise error_to_reraise    return returneddef _no_deps(x):    return []def setup_queue(objects, func, get_deps, get_name):    if get_deps is None:        get_deps = _no_deps    results = Queue()    started = set()   # objects, threads were started for    finished = set()  # already finished objects    def do_op(obj):        try:            result = func(obj)            results.put((obj, result, None))        except Exception as e:            results.put((obj, None, e))        finished.add(obj)        feed()    def ready(obj):        # Is object ready for performing operation        return obj not in started and all(            dep not in objects or dep in finished            for dep in get_deps(obj)        )    def feed():        ready_objects = [o for o in objects if ready(o)]        for obj in ready_objects:            started.add(obj)            t = Thread(target=do_op,                       args=(obj,))            t.daemon = True            t.start()    feed()    return resultsclass ParallelStreamWriter(object):    """Write out messages for operations happening in parallel.    Each operation has it's own line, and ANSI code characters are used    to jump to the correct line, and write over the line.    """    def __init__(self, stream, msg):        self.stream = stream        self.msg = msg        self.lines = []    def initialize(self, obj_index):        if self.msg is None:            return        self.lines.append(obj_index)        self.stream.write("{} {} ... \r\n".format(self.msg, obj_index))        self.stream.flush()    def write(self, obj_index, status):        if self.msg is None:            return        position = self.lines.index(obj_index)        diff = len(self.lines) - position        # move up        self.stream.write("%c[%dA" % (27, diff))        # erase        self.stream.write("%c[2K\r" % 27)        self.stream.write("{} {} ... {}\r".format(self.msg, obj_index, status))        # move back down        self.stream.write("%c[%dB" % (27, diff))        self.stream.flush()def parallel_operation(containers, operation, options, message):    parallel_execute(        containers,        operator.methodcaller(operation, **options),        operator.attrgetter('name'),        message)def parallel_remove(containers, options):    stopped_containers = [c for c in containers if not c.is_running]    parallel_operation(stopped_containers, 'remove', options, 'Removing')def parallel_stop(containers, options):    parallel_operation(containers, 'stop', options, 'Stopping')def parallel_start(containers, options):    parallel_operation(containers, 'start', options, 'Starting')def parallel_pause(containers, options):    parallel_operation(containers, 'pause', options, 'Pausing')def parallel_unpause(containers, options):    parallel_operation(containers, 'unpause', options, 'Unpausing')def parallel_kill(containers, options):    parallel_operation(containers, 'kill', options, 'Killing')def parallel_restart(containers, options):    parallel_operation(containers, 'restart', options, 'Restarting')
 |