123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- from __future__ import absolute_import
- from __future__ import unicode_literals
- from threading import Thread
- from six.moves import _thread as thread
- try:
- from Queue import Queue, Empty
- except ImportError:
- from queue import Queue, Empty # Python 3.x
- from compose.cli.signals import ShutdownException
- STOP = object()
- class Multiplexer(object):
- """
- Create a single iterator from several iterators by running all of them in
- parallel and yielding results as they come in.
- """
- def __init__(self, iterators, cascade_stop=False):
- self.iterators = iterators
- self.cascade_stop = cascade_stop
- self._num_running = len(iterators)
- self.queue = Queue()
- def loop(self):
- self._init_readers()
- while self._num_running > 0:
- try:
- item, exception = self.queue.get(timeout=0.1)
- if exception:
- raise exception
- if item is STOP:
- if self.cascade_stop is True:
- break
- else:
- self._num_running -= 1
- else:
- yield item
- except Empty:
- pass
- # See https://github.com/docker/compose/issues/189
- except thread.error:
- raise ShutdownException()
- def _init_readers(self):
- for iterator in self.iterators:
- t = Thread(target=_enqueue_output, args=(iterator, self.queue))
- t.daemon = True
- t.start()
- def _enqueue_output(iterator, queue):
- try:
- for item in iterator:
- queue.put((item, None))
- queue.put((STOP, None))
- except Exception as e:
- queue.put((None, e))
|