|
@@ -87,8 +87,7 @@ def parallel_execute_stream(objects, func, get_deps):
|
|
|
state = State(objects)
|
|
|
|
|
|
while not state.is_done():
|
|
|
- for event in feed_queue(objects, func, get_deps, results, state):
|
|
|
- yield event
|
|
|
+ feed_queue(objects, func, get_deps, results, state)
|
|
|
|
|
|
try:
|
|
|
event = results.get(timeout=0.1)
|
|
@@ -126,7 +125,7 @@ def feed_queue(objects, func, get_deps, results, state):
|
|
|
|
|
|
if any(dep in state.failed for dep in deps):
|
|
|
log.debug('{} has upstream errors - not processing'.format(obj))
|
|
|
- yield (obj, None, UpstreamError())
|
|
|
+ results.put((obj, None, UpstreamError()))
|
|
|
state.failed.add(obj)
|
|
|
elif all(
|
|
|
dep not in objects or dep in state.finished
|