| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 | import codecsimport hashlibimport jsonimport json.decoderimport loggingimport sysfrom threading import Threadimport sixfrom docker.errors import APIErrorfrom six.moves.queue import Emptyfrom six.moves.queue import Queuelog = logging.getLogger(__name__)json_decoder = json.JSONDecoder()def parallel_execute(objects, obj_callable, msg_index, msg):    """    For a given list of objects, call the callable passing in the first    object we give it.    """    stream = get_output_stream(sys.stdout)    lines = []    for obj in objects:        write_out_msg(stream, lines, msg_index(obj), msg)    q = Queue()    def inner_execute_function(an_callable, parameter, msg_index):        error = None        try:            result = an_callable(parameter)        except APIError as e:            error = e.explanation            result = "error"        except Exception as e:            error = e            result = 'unexpected_exception'        q.put((msg_index, result, error))    for an_object in objects:        t = Thread(            target=inner_execute_function,            args=(obj_callable, an_object, msg_index(an_object)),        )        t.daemon = True        t.start()    done = 0    errors = {}    total_to_execute = len(objects)    while done < total_to_execute:        try:            msg_index, result, error = q.get(timeout=1)            if result == 'unexpected_exception':                errors[msg_index] = result, error            if result == 'error':                errors[msg_index] = result, error                write_out_msg(stream, lines, msg_index, msg, status='error')            else:                write_out_msg(stream, lines, msg_index, msg)            done += 1        except Empty:            pass    if not errors:        return    stream.write("\n")    for msg_index, (result, error) in errors.items():        stream.write("ERROR: for {}  {} \n".format(msg_index, error))        if result == 'unexpected_exception':            raise errordef get_output_stream(stream):    if six.PY3:        return stream    return codecs.getwriter('utf-8')(stream)def stream_as_text(stream):    """Given a stream of bytes or text, if any of the items in the stream    are bytes convert them to text.    This function can be removed once docker-py returns text streams instead    of byte streams.    """    for data in stream:        if not isinstance(data, six.text_type):            data = data.decode('utf-8', 'replace')        yield datadef line_splitter(buffer, separator=u'\n'):    index = buffer.find(six.text_type(separator))    if index == -1:        return None    return buffer[:index + 1], buffer[index + 1:]def split_buffer(stream, splitter=None, decoder=lambda a: a):    """Given a generator which yields strings and a splitter function,    joins all input, splits on the separator and yields each chunk.    Unlike string.split(), each chunk includes the trailing    separator, except for the last one if none was found on the end    of the input.    """    splitter = splitter or line_splitter    buffered = six.text_type('')    for data in stream_as_text(stream):        buffered += data        while True:            buffer_split = splitter(buffered)            if buffer_split is None:                break            item, buffered = buffer_split            yield item    if buffered:        yield decoder(buffered)def json_splitter(buffer):    """Attempt to parse a json object from a buffer. If there is at least one    object, return it and the rest of the buffer, otherwise return None.    """    try:        obj, index = json_decoder.raw_decode(buffer)        rest = buffer[json.decoder.WHITESPACE.match(buffer, index).end():]        return obj, rest    except ValueError:        return Nonedef json_stream(stream):    """Given a stream of text, return a stream of json objects.    This handles streams which are inconsistently buffered (some entries may    be newline delimited, and others are not).    """    return split_buffer(stream, json_splitter, json_decoder.decode)def write_out_msg(stream, lines, msg_index, msg, status="done"):    """    Using special ANSI code characters we can write out the msg over the top of    a previous status message, if it exists.    """    obj_index = msg_index    if msg_index in lines:        position = lines.index(obj_index)        diff = len(lines) - position        # move up        stream.write("%c[%dA" % (27, diff))        # erase        stream.write("%c[2K\r" % 27)        stream.write("{} {} ... {}\r".format(msg, obj_index, status))        # move back down        stream.write("%c[%dB" % (27, diff))    else:        diff = 0        lines.append(obj_index)        stream.write("{} {} ... \r\n".format(msg, obj_index))    stream.flush()def json_hash(obj):    dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))    h = hashlib.sha256()    h.update(dump.encode('utf8'))    return h.hexdigest()
 |