|
@@ -43,55 +43,60 @@ class GlobalLimit(object):
|
|
|
cls.global_limiter = Semaphore(value)
|
|
|
|
|
|
|
|
|
-def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, parent_objects=None):
|
|
|
- """Runs func on objects in parallel while ensuring that func is
|
|
|
- ran on object only after it is ran on all its dependencies.
|
|
|
-
|
|
|
- get_deps called on object must return a collection with its dependencies.
|
|
|
- get_name called on object must return its name.
|
|
|
+def parallel_execute_watch(events, writer, errors, results, msg, get_name):
|
|
|
+ """ Watch events from a parallel execution, update status and fill errors and results.
|
|
|
+ Returns exception to re-raise.
|
|
|
"""
|
|
|
- objects = list(objects)
|
|
|
- stream = get_output_stream(sys.stderr)
|
|
|
-
|
|
|
- writer = ParallelStreamWriter(stream, msg)
|
|
|
-
|
|
|
- display_objects = list(parent_objects) if parent_objects else objects
|
|
|
-
|
|
|
- for obj in display_objects:
|
|
|
- writer.add_object(get_name(obj))
|
|
|
-
|
|
|
- # write data in a second loop to consider all objects for width alignment
|
|
|
- # and avoid duplicates when parent_objects exists
|
|
|
- for obj in objects:
|
|
|
- writer.write_initial(get_name(obj))
|
|
|
-
|
|
|
- events = parallel_execute_iter(objects, func, get_deps, limit)
|
|
|
-
|
|
|
- errors = {}
|
|
|
- results = []
|
|
|
error_to_reraise = None
|
|
|
-
|
|
|
for obj, result, exception in events:
|
|
|
if exception is None:
|
|
|
- writer.write(get_name(obj), 'done', green)
|
|
|
+ writer.write(msg, get_name(obj), 'done', green)
|
|
|
results.append(result)
|
|
|
elif isinstance(exception, ImageNotFound):
|
|
|
# This is to bubble up ImageNotFound exceptions to the client so we
|
|
|
# can prompt the user if they want to rebuild.
|
|
|
errors[get_name(obj)] = exception.explanation
|
|
|
- writer.write(get_name(obj), 'error', red)
|
|
|
+ writer.write(msg, get_name(obj), 'error', red)
|
|
|
error_to_reraise = exception
|
|
|
elif isinstance(exception, APIError):
|
|
|
errors[get_name(obj)] = exception.explanation
|
|
|
- writer.write(get_name(obj), 'error', red)
|
|
|
+ writer.write(msg, get_name(obj), 'error', red)
|
|
|
elif isinstance(exception, (OperationFailedError, HealthCheckFailed, NoHealthCheckConfigured)):
|
|
|
errors[get_name(obj)] = exception.msg
|
|
|
- writer.write(get_name(obj), 'error', red)
|
|
|
+ writer.write(msg, get_name(obj), 'error', red)
|
|
|
elif isinstance(exception, UpstreamError):
|
|
|
- writer.write(get_name(obj), 'error', red)
|
|
|
+ writer.write(msg, get_name(obj), 'error', red)
|
|
|
else:
|
|
|
errors[get_name(obj)] = exception
|
|
|
error_to_reraise = exception
|
|
|
+ return error_to_reraise
|
|
|
+
|
|
|
+
|
|
|
+def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None):
|
|
|
+ """Runs func on objects in parallel while ensuring that func is
|
|
|
+ ran on object only after it is ran on all its dependencies.
|
|
|
+
|
|
|
+ get_deps called on object must return a collection with its dependencies.
|
|
|
+ get_name called on object must return its name.
|
|
|
+ """
|
|
|
+ objects = list(objects)
|
|
|
+ stream = get_output_stream(sys.stderr)
|
|
|
+
|
|
|
+ if ParallelStreamWriter.instance:
|
|
|
+ writer = ParallelStreamWriter.instance
|
|
|
+ else:
|
|
|
+ writer = ParallelStreamWriter(stream)
|
|
|
+
|
|
|
+ for obj in objects:
|
|
|
+ writer.add_object(msg, get_name(obj))
|
|
|
+ for obj in objects:
|
|
|
+ writer.write_initial(msg, get_name(obj))
|
|
|
+
|
|
|
+ events = parallel_execute_iter(objects, func, get_deps, limit)
|
|
|
+
|
|
|
+ errors = {}
|
|
|
+ results = []
|
|
|
+ error_to_reraise = parallel_execute_watch(events, writer, errors, results, msg, get_name)
|
|
|
|
|
|
for obj_name, error in errors.items():
|
|
|
stream.write("\nERROR: for {} {}\n".format(obj_name, error))
|
|
@@ -253,55 +258,58 @@ class ParallelStreamWriter(object):
|
|
|
|
|
|
noansi = False
|
|
|
lock = Lock()
|
|
|
+ instance = None
|
|
|
|
|
|
@classmethod
|
|
|
def set_noansi(cls, value=True):
|
|
|
cls.noansi = value
|
|
|
|
|
|
- def __init__(self, stream, msg):
|
|
|
+ def __init__(self, stream):
|
|
|
self.stream = stream
|
|
|
- self.msg = msg
|
|
|
self.lines = []
|
|
|
self.width = 0
|
|
|
+ ParallelStreamWriter.instance = self
|
|
|
|
|
|
- def add_object(self, obj_index):
|
|
|
- self.lines.append(obj_index)
|
|
|
- self.width = max(self.width, len(obj_index))
|
|
|
+ def add_object(self, msg, obj_index):
|
|
|
+ if msg is None:
|
|
|
+ return
|
|
|
+ self.lines.append(msg + obj_index)
|
|
|
+ self.width = max(self.width, len(msg + ' ' + obj_index))
|
|
|
|
|
|
- def write_initial(self, obj_index):
|
|
|
- if self.msg is None:
|
|
|
+ def write_initial(self, msg, obj_index):
|
|
|
+ if msg is None:
|
|
|
return
|
|
|
- self.stream.write("{} {:<{width}} ... \r\n".format(
|
|
|
- self.msg, self.lines[self.lines.index(obj_index)], width=self.width))
|
|
|
+ self.stream.write("{:<{width}} ... \r\n".format(
|
|
|
+ msg + ' ' + obj_index, width=self.width))
|
|
|
self.stream.flush()
|
|
|
|
|
|
- def _write_ansi(self, obj_index, status):
|
|
|
+ def _write_ansi(self, msg, obj_index, status):
|
|
|
self.lock.acquire()
|
|
|
- position = self.lines.index(obj_index)
|
|
|
+ position = self.lines.index(msg + obj_index)
|
|
|
diff = len(self.lines) - position
|
|
|
# move up
|
|
|
self.stream.write("%c[%dA" % (27, diff))
|
|
|
# erase
|
|
|
self.stream.write("%c[2K\r" % 27)
|
|
|
- self.stream.write("{} {:<{width}} ... {}\r".format(self.msg, obj_index,
|
|
|
+ self.stream.write("{:<{width}} ... {}\r".format(msg + ' ' + obj_index,
|
|
|
status, width=self.width))
|
|
|
# move back down
|
|
|
self.stream.write("%c[%dB" % (27, diff))
|
|
|
self.stream.flush()
|
|
|
self.lock.release()
|
|
|
|
|
|
- def _write_noansi(self, obj_index, status):
|
|
|
- self.stream.write("{} {:<{width}} ... {}\r\n".format(self.msg, obj_index,
|
|
|
+ def _write_noansi(self, msg, obj_index, status):
|
|
|
+ self.stream.write("{:<{width}} ... {}\r\n".format(msg + ' ' + obj_index,
|
|
|
status, width=self.width))
|
|
|
self.stream.flush()
|
|
|
|
|
|
- def write(self, obj_index, status, color_func):
|
|
|
- if self.msg is None:
|
|
|
+ def write(self, msg, obj_index, status, color_func):
|
|
|
+ if msg is None:
|
|
|
return
|
|
|
if self.noansi:
|
|
|
- self._write_noansi(obj_index, status)
|
|
|
+ self._write_noansi(msg, obj_index, status)
|
|
|
else:
|
|
|
- self._write_ansi(obj_index, color_func(status))
|
|
|
+ self._write_ansi(msg, obj_index, color_func(status))
|
|
|
|
|
|
|
|
|
def parallel_operation(containers, operation, options, message):
|