|
|
@@ -32,8 +32,8 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None):
|
|
|
|
|
|
done = 0
|
|
|
errors = {}
|
|
|
+ results = []
|
|
|
error_to_reraise = None
|
|
|
- returned = [None] * len(objects)
|
|
|
|
|
|
while done < len(objects):
|
|
|
try:
|
|
|
@@ -46,14 +46,13 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None):
|
|
|
|
|
|
if exception is None:
|
|
|
writer.write(get_name(obj), 'done')
|
|
|
- returned[objects.index(obj)] = result
|
|
|
+ results.append(result)
|
|
|
elif isinstance(exception, APIError):
|
|
|
errors[get_name(obj)] = exception.explanation
|
|
|
writer.write(get_name(obj), 'error')
|
|
|
else:
|
|
|
errors[get_name(obj)] = exception
|
|
|
error_to_reraise = exception
|
|
|
-
|
|
|
done += 1
|
|
|
|
|
|
for obj_name, error in errors.items():
|
|
|
@@ -62,7 +61,7 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None):
|
|
|
if error_to_reraise:
|
|
|
raise error_to_reraise
|
|
|
|
|
|
- return returned
|
|
|
+ return results
|
|
|
|
|
|
|
|
|
def _no_deps(x):
|
|
|
@@ -74,9 +73,8 @@ def setup_queue(objects, func, get_deps, get_name):
|
|
|
get_deps = _no_deps
|
|
|
|
|
|
results = Queue()
|
|
|
-
|
|
|
- started = set() # objects, threads were started for
|
|
|
- finished = set() # already finished objects
|
|
|
+ started = set() # objects being processed
|
|
|
+ finished = set() # objects which have been processed
|
|
|
|
|
|
def do_op(obj):
|
|
|
try:
|
|
|
@@ -96,11 +94,9 @@ def setup_queue(objects, func, get_deps, get_name):
|
|
|
)
|
|
|
|
|
|
def feed():
|
|
|
- ready_objects = [o for o in objects if ready(o)]
|
|
|
- for obj in ready_objects:
|
|
|
+ for obj in filter(ready, objects):
|
|
|
started.add(obj)
|
|
|
- t = Thread(target=do_op,
|
|
|
- args=(obj,))
|
|
|
+ t = Thread(target=do_op, args=(obj,))
|
|
|
t.daemon = True
|
|
|
t.start()
|
|
|
|