multiplexer.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. from __future__ import absolute_import
  2. from __future__ import unicode_literals
  3. from threading import Thread
  4. from six.moves import _thread as thread
  5. try:
  6. from Queue import Queue, Empty
  7. except ImportError:
  8. from queue import Queue, Empty # Python 3.x
  # Sentinel that a reader thread enqueues once its iterator is exhausted;
  # Multiplexer.loop compares against it by identity to count finished readers.
  9. STOP = object()
  class Multiplexer(object):
      """
      Create a single iterator from several iterators by running all of them in
      parallel and yielding results as they come in.

      Each source iterator is drained by its own daemon thread, which pushes
      ``(item, exception)`` pairs onto a shared queue; ``loop`` consumes that
      queue from the calling thread.
      """

      def __init__(self, iterators):
          """
          :param iterators: a sized collection of iterators to multiplex
              (``len()`` is taken to know how many readers must finish).
          """
          self.iterators = iterators
          self._num_running = len(iterators)
          self.queue = Queue()

      def loop(self):
          """
          Start the reader threads and yield items as they are produced.

          The generator finishes once every reader has enqueued ``STOP``.

          :raises Exception: whatever exception a source iterator raised is
              re-raised here unchanged.
          :raises KeyboardInterrupt: when the queue wait itself fails with
              ``thread.error``.
          """
          self._init_readers()

          while self._num_running > 0:
              # Only the queue wait is guarded. Re-raising a reader's exception
              # or yielding inside this ``try`` would let the handlers below
              # swallow a reader's ``Empty`` (hanging the loop, since that
              # reader never sends STOP) or turn a reader's ``RuntimeError``
              # into ``KeyboardInterrupt`` (``thread.error`` is an alias of
              # ``RuntimeError`` on Python 3).
              try:
                  item, exception = self.queue.get(timeout=0.1)
              except Empty:
                  # Nothing arrived within the timeout; poll again.
                  continue
              # See https://github.com/docker/compose/issues/189
              except thread.error:
                  raise KeyboardInterrupt()

              if exception is not None:
                  raise exception

              if item is STOP:
                  self._num_running -= 1
              else:
                  yield item

      def _init_readers(self):
          """Start one daemon thread per source iterator, feeding self.queue."""
          for iterator in self.iterators:
              t = Thread(target=_enqueue_output, args=(iterator, self.queue))
              # Daemon threads do not keep the interpreter alive at exit.
              t.daemon = True
              t.start()
  40. def _enqueue_output(iterator, queue):
  41. try:
  42. for item in iterator:
  43. queue.put((item, None))
  44. queue.put((STOP, None))
  45. except Exception as e:
  46. queue.put((None, e))