1
0

multiplexer.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. from __future__ import absolute_import
  2. from threading import Thread
  3. from six.moves import _thread as thread
  4. try:
  5. from Queue import Queue, Empty
  6. except ImportError:
  7. from queue import Queue, Empty # Python 3.x
  8. STOP = object()
  9. class Multiplexer(object):
  10. """
  11. Create a single iterator from several iterators by running all of them in
  12. parallel and yielding results as they come in.
  13. """
  14. def __init__(self, iterators):
  15. self.iterators = iterators
  16. self._num_running = len(iterators)
  17. self.queue = Queue()
  18. def loop(self):
  19. self._init_readers()
  20. while self._num_running > 0:
  21. try:
  22. item, exception = self.queue.get(timeout=0.1)
  23. if exception:
  24. raise exception
  25. if item is STOP:
  26. self._num_running -= 1
  27. else:
  28. yield item
  29. except Empty:
  30. pass
  31. # See https://github.com/docker/compose/issues/189
  32. except thread.error:
  33. raise KeyboardInterrupt()
  34. def _init_readers(self):
  35. for iterator in self.iterators:
  36. t = Thread(target=_enqueue_output, args=(iterator, self.queue))
  37. t.daemon = True
  38. t.start()
  39. def _enqueue_output(iterator, queue):
  40. try:
  41. for item in iterator:
  42. queue.put((item, None))
  43. queue.put((STOP, None))
  44. except Exception as e:
  45. queue.put((None, e))