# multiplexer.py
from __future__ import absolute_import

from threading import Thread

try:
    from Queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty  # Python 3.x
  7. # Yield STOP from an input generator to stop the
  8. # top-level loop without processing any more input.
  9. STOP = object()
  10. class Multiplexer(object):
  11. def __init__(self, generators):
  12. self.generators = generators
  13. self.queue = Queue()
  14. def loop(self):
  15. self._init_readers()
  16. while True:
  17. try:
  18. item = self.queue.get(timeout=0.1)
  19. if item is STOP:
  20. break
  21. else:
  22. yield item
  23. except Empty:
  24. pass
  25. def _init_readers(self):
  26. for generator in self.generators:
  27. t = Thread(target=_enqueue_output, args=(generator, self.queue))
  28. t.daemon = True
  29. t.start()
  30. def _enqueue_output(generator, queue):
  31. for item in generator:
  32. queue.put(item)