multiplexer.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. from __future__ import absolute_import
  2. from __future__ import unicode_literals
  3. from threading import Thread
  4. from six.moves import _thread as thread
  5. try:
  6. from Queue import Queue, Empty
  7. except ImportError:
  8. from queue import Queue, Empty # Python 3.x
  9. STOP = object()
  10. class Multiplexer(object):
  11. """
  12. Create a single iterator from several iterators by running all of them in
  13. parallel and yielding results as they come in.
  14. """
  15. def __init__(self, iterators, cascade_stop=False):
  16. self.iterators = iterators
  17. self.cascade_stop = cascade_stop
  18. self._num_running = len(iterators)
  19. self.queue = Queue()
  20. def loop(self):
  21. self._init_readers()
  22. while self._num_running > 0:
  23. try:
  24. item, exception = self.queue.get(timeout=0.1)
  25. if exception:
  26. raise exception
  27. if item is STOP:
  28. if self.cascade_stop is True:
  29. break
  30. else:
  31. self._num_running -= 1
  32. else:
  33. yield item
  34. except Empty:
  35. pass
  36. # See https://github.com/docker/compose/issues/189
  37. except thread.error:
  38. raise KeyboardInterrupt()
  39. def _init_readers(self):
  40. for iterator in self.iterators:
  41. t = Thread(target=_enqueue_output, args=(iterator, self.queue))
  42. t.daemon = True
  43. t.start()
  44. def _enqueue_output(iterator, queue):
  45. try:
  46. for item in iterator:
  47. queue.put((item, None))
  48. queue.put((STOP, None))
  49. except Exception as e:
  50. queue.put((None, e))