multiplexer.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. from __future__ import absolute_import
  2. from threading import Thread
  3. try:
  4. from Queue import Queue, Empty
  5. except ImportError:
  6. from queue import Queue, Empty # Python 3.x
  7. STOP = object()
  8. class Multiplexer(object):
  9. """
  10. Create a single iterator from several iterators by running all of them in
  11. parallel and yielding results as they come in.
  12. """
  13. def __init__(self, iterators):
  14. self.iterators = iterators
  15. self._num_running = len(iterators)
  16. self.queue = Queue()
  17. def loop(self):
  18. self._init_readers()
  19. while self._num_running > 0:
  20. try:
  21. item, exception = self.queue.get(timeout=0.1)
  22. if exception:
  23. raise exception
  24. if item is STOP:
  25. self._num_running -= 1
  26. else:
  27. yield item
  28. except Empty:
  29. pass
  30. def _init_readers(self):
  31. for iterator in self.iterators:
  32. t = Thread(target=_enqueue_output, args=(iterator, self.queue))
  33. t.daemon = True
  34. t.start()
  35. def _enqueue_output(iterator, queue):
  36. try:
  37. for item in iterator:
  38. queue.put((item, None))
  39. queue.put((STOP, None))
  40. except Exception as e:
  41. queue.put((None, e))