|
@@ -12,6 +12,8 @@ from six.moves.queue import Empty
|
|
|
from six.moves.queue import Queue
|
|
|
|
|
|
from compose.cli.signals import ShutdownException
|
|
|
+from compose.errors import HealthCheckFailed
|
|
|
+from compose.errors import NoHealthCheckConfigured
|
|
|
from compose.errors import OperationFailedError
|
|
|
from compose.utils import get_output_stream
|
|
|
|
|
@@ -48,7 +50,7 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None):
|
|
|
elif isinstance(exception, APIError):
|
|
|
errors[get_name(obj)] = exception.explanation
|
|
|
writer.write(get_name(obj), 'error')
|
|
|
- elif isinstance(exception, OperationFailedError):
|
|
|
+ elif isinstance(exception, (OperationFailedError, HealthCheckFailed, NoHealthCheckConfigured)):
|
|
|
errors[get_name(obj)] = exception.msg
|
|
|
writer.write(get_name(obj), 'error')
|
|
|
elif isinstance(exception, UpstreamError):
|
|
@@ -164,21 +166,27 @@ def feed_queue(objects, func, get_deps, results, state):
|
|
|
|
|
|
for obj in pending:
|
|
|
deps = get_deps(obj)
|
|
|
-
|
|
|
- if any(dep[0] in state.failed for dep in deps):
|
|
|
- log.debug('{} has upstream errors - not processing'.format(obj))
|
|
|
- results.put((obj, None, UpstreamError()))
|
|
|
- state.failed.add(obj)
|
|
|
- elif all(
|
|
|
- dep not in objects or (
|
|
|
- dep in state.finished and (not ready_check or ready_check(dep))
|
|
|
- ) for dep, ready_check in deps
|
|
|
- ):
|
|
|
- log.debug('Starting producer thread for {}'.format(obj))
|
|
|
- t = Thread(target=producer, args=(obj, func, results))
|
|
|
- t.daemon = True
|
|
|
- t.start()
|
|
|
- state.started.add(obj)
|
|
|
+ try:
|
|
|
+ if any(dep[0] in state.failed for dep in deps):
|
|
|
+ log.debug('{} has upstream errors - not processing'.format(obj))
|
|
|
+ results.put((obj, None, UpstreamError()))
|
|
|
+ state.failed.add(obj)
|
|
|
+ elif all(
|
|
|
+ dep not in objects or (
|
|
|
+ dep in state.finished and (not ready_check or ready_check(dep))
|
|
|
+ ) for dep, ready_check in deps
|
|
|
+ ):
|
|
|
+ log.debug('Starting producer thread for {}'.format(obj))
|
|
|
+ t = Thread(target=producer, args=(obj, func, results))
|
|
|
+ t.daemon = True
|
|
|
+ t.start()
|
|
|
+ state.started.add(obj)
|
|
|
+ except (HealthCheckFailed, NoHealthCheckConfigured) as e:
|
|
|
+ log.debug(
|
|
|
+ 'Healthcheck for service(s) upstream of {} failed - '
|
|
|
+ 'not processing'.format(obj)
|
|
|
+ )
|
|
|
+ results.put((obj, None, e))
|
|
|
|
|
|
if state.is_done():
|
|
|
results.put(STOP)
|