|
|
@@ -64,18 +64,30 @@ def _no_deps(x):
|
|
|
return []
|
|
|
|
|
|
|
|
|
+class State(object):
|
|
|
+ def __init__(self, objects):
|
|
|
+ self.objects = objects
|
|
|
+
|
|
|
+ self.started = set() # objects being processed
|
|
|
+ self.finished = set() # objects which have been processed
|
|
|
+ self.failed = set() # objects which either failed or whose dependencies failed
|
|
|
+
|
|
|
+ def is_done(self):
|
|
|
+ return len(self.finished) + len(self.failed) >= len(self.objects)
|
|
|
+
|
|
|
+ def pending(self):
|
|
|
+ return set(self.objects) - self.started - self.finished - self.failed
|
|
|
+
|
|
|
+
|
|
|
def parallel_execute_stream(objects, func, get_deps):
|
|
|
if get_deps is None:
|
|
|
get_deps = _no_deps
|
|
|
|
|
|
results = Queue()
|
|
|
+ state = State(objects)
|
|
|
|
|
|
- started = set() # objects being processed
|
|
|
- finished = set() # objects which have been processed
|
|
|
- failed = set() # objects which either failed or whose dependencies failed
|
|
|
-
|
|
|
- while len(finished) + len(failed) < len(objects):
|
|
|
- for event in feed_queue(objects, func, get_deps, results, started, finished, failed):
|
|
|
+ while not state.is_done():
|
|
|
+ for event in feed_queue(objects, func, get_deps, results, state):
|
|
|
yield event
|
|
|
|
|
|
try:
|
|
|
@@ -89,10 +101,10 @@ def parallel_execute_stream(objects, func, get_deps):
|
|
|
obj, _, exception = event
|
|
|
if exception is None:
|
|
|
log.debug('Finished processing: {}'.format(obj))
|
|
|
- finished.add(obj)
|
|
|
+ state.finished.add(obj)
|
|
|
else:
|
|
|
log.debug('Failed: {}'.format(obj))
|
|
|
- failed.add(obj)
|
|
|
+ state.failed.add(obj)
|
|
|
|
|
|
yield event
|
|
|
|
|
|
@@ -105,26 +117,26 @@ def queue_producer(obj, func, results):
|
|
|
results.put((obj, None, e))
|
|
|
|
|
|
|
|
|
-def feed_queue(objects, func, get_deps, results, started, finished, failed):
|
|
|
- pending = set(objects) - started - finished - failed
|
|
|
+def feed_queue(objects, func, get_deps, results, state):
|
|
|
+ pending = state.pending()
|
|
|
log.debug('Pending: {}'.format(pending))
|
|
|
|
|
|
for obj in pending:
|
|
|
deps = get_deps(obj)
|
|
|
|
|
|
- if any(dep in failed for dep in deps):
|
|
|
+ if any(dep in state.failed for dep in deps):
|
|
|
log.debug('{} has upstream errors - not processing'.format(obj))
|
|
|
yield (obj, None, UpstreamError())
|
|
|
- failed.add(obj)
|
|
|
+ state.failed.add(obj)
|
|
|
elif all(
|
|
|
- dep not in objects or dep in finished
|
|
|
+ dep not in objects or dep in state.finished
|
|
|
for dep in deps
|
|
|
):
|
|
|
log.debug('Starting producer thread for {}'.format(obj))
|
|
|
t = Thread(target=queue_producer, args=(obj, func, results))
|
|
|
t.daemon = True
|
|
|
t.start()
|
|
|
- started.add(obj)
|
|
|
+ state.started.add(obj)
|
|
|
|
|
|
|
|
|
class UpstreamError(Exception):
|