multiplexer.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. from __future__ import absolute_import
  2. from __future__ import unicode_literals
  3. from threading import Thread
  4. from six.moves import _thread as thread
  5. try:
  6. from Queue import Queue, Empty
  7. except ImportError:
  8. from queue import Queue, Empty # Python 3.x
  9. from compose.cli.signals import ShutdownException
  10. STOP = object()
  11. class Multiplexer(object):
  12. """
  13. Create a single iterator from several iterators by running all of them in
  14. parallel and yielding results as they come in.
  15. """
  16. def __init__(self, iterators, cascade_stop=False):
  17. self.iterators = iterators
  18. self.cascade_stop = cascade_stop
  19. self._num_running = len(iterators)
  20. self.queue = Queue()
  21. def loop(self):
  22. self._init_readers()
  23. while self._num_running > 0:
  24. try:
  25. item, exception = self.queue.get(timeout=0.1)
  26. if exception:
  27. raise exception
  28. if item is STOP:
  29. if self.cascade_stop is True:
  30. break
  31. else:
  32. self._num_running -= 1
  33. else:
  34. yield item
  35. except Empty:
  36. pass
  37. # See https://github.com/docker/compose/issues/189
  38. except thread.error:
  39. raise ShutdownException()
  40. def _init_readers(self):
  41. for iterator in self.iterators:
  42. t = Thread(target=_enqueue_output, args=(iterator, self.queue))
  43. t.daemon = True
  44. t.start()
  45. def _enqueue_output(iterator, queue):
  46. try:
  47. for item in iterator:
  48. queue.put((item, None))
  49. queue.put((STOP, None))
  50. except Exception as e:
  51. queue.put((None, e))