scheduler.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import time
  2. import multiprocessing
  3. from proxypool.processors.server import app
  4. from proxypool.processors.getter import Getter
  5. from proxypool.processors.tester import Tester
  6. from proxypool.setting import CYCLE_GETTER, CYCLE_TESTER, API_HOST, API_THREADED, API_PORT, ENABLE_SERVER, \
  7. ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS
  8. from loguru import logger
  9. if IS_WINDOWS:
  10. multiprocessing.freeze_support()
  11. tester_process, getter_process, server_process = None, None, None
  12. class Scheduler():
  13. """
  14. scheduler
  15. """
  16. def run_tester(self, cycle=CYCLE_TESTER):
  17. """
  18. run tester
  19. """
  20. if not ENABLE_TESTER:
  21. logger.info('tester not enabled, exit')
  22. return
  23. tester = Tester()
  24. loop = 0
  25. while True:
  26. logger.debug(f'tester loop {loop} start...')
  27. tester.run()
  28. loop += 1
  29. time.sleep(cycle)
  30. def run_getter(self, cycle=CYCLE_GETTER):
  31. """
  32. run getter
  33. """
  34. if not ENABLE_GETTER:
  35. logger.info('getter not enabled, exit')
  36. return
  37. getter = Getter()
  38. loop = 0
  39. while True:
  40. logger.debug(f'getter loop {loop} start...')
  41. getter.run()
  42. loop += 1
  43. time.sleep(cycle)
  44. def run_server(self):
  45. """
  46. run server for api
  47. """
  48. if not ENABLE_SERVER:
  49. logger.info('server not enabled, exit')
  50. return
  51. app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)
  52. def run(self):
  53. global tester_process, getter_process, server_process
  54. try:
  55. logger.info('starting proxypool...')
  56. if ENABLE_TESTER:
  57. tester_process = multiprocessing.Process(target=self.run_tester)
  58. logger.info(f'starting tester, pid {tester_process.pid}...')
  59. tester_process.start()
  60. if ENABLE_GETTER:
  61. getter_process = multiprocessing.Process(target=self.run_getter)
  62. logger.info(f'starting getter, pid{getter_process.pid}...')
  63. getter_process.start()
  64. if ENABLE_SERVER:
  65. server_process = multiprocessing.Process(target=self.run_server)
  66. logger.info(f'starting server, pid{server_process.pid}...')
  67. server_process.start()
  68. tester_process.join()
  69. getter_process.join()
  70. server_process.join()
  71. except KeyboardInterrupt:
  72. logger.info('received keyboard interrupt signal')
  73. tester_process.terminate()
  74. getter_process.terminate()
  75. server_process.terminate()
  76. finally:
  77. # must call join method before calling is_alive
  78. tester_process.join()
  79. getter_process.join()
  80. server_process.join()
  81. logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}')
  82. logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}')
  83. logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}')
  84. logger.info('proxy terminated')
  85. if __name__ == '__main__':
  86. scheduler = Scheduler()
  87. scheduler.run()