multiplexer.py 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. from __future__ import absolute_import
  2. from threading import Thread
  3. try:
  4. from Queue import Queue, Empty
  5. except ImportError:
  6. from queue import Queue, Empty # Python 3.x
# Sentinel recognised by Multiplexer.loop(): yield STOP from an input
# generator to stop the top-level loop without processing any more input.
# The loop compares with ``is``, so the exact object defined here must be
# yielded -- an equal-looking substitute will not be recognised.
STOP = object()
  10. class Multiplexer(object):
  11. """
  12. Create a single iterator from several iterators by running all of them in
  13. parallel and yielding results as they come in.
  14. """
  15. def __init__(self, iterators):
  16. self.iterators = iterators
  17. self.queue = Queue()
  18. def loop(self):
  19. self._init_readers()
  20. while True:
  21. try:
  22. item = self.queue.get(timeout=0.1)
  23. if item is STOP:
  24. break
  25. else:
  26. yield item
  27. except Empty:
  28. pass
  29. def _init_readers(self):
  30. for iterator in self.iterators:
  31. t = Thread(target=_enqueue_output, args=(iterator, self.queue))
  32. t.daemon = True
  33. t.start()
  34. def _enqueue_output(iterator, queue):
  35. for item in iterator:
  36. queue.put(item)