scheduler.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import time
  2. import multiprocessing
  3. from proxypool.processors.server import app
  4. from proxypool.processors.getter import Getter
  5. from proxypool.processors.tester import Tester
  6. from proxypool.setting import CYCLE_GETTER, CYCLE_TESTER, API_HOST, \
  7. API_THREADED, API_PORT, ENABLE_SERVER, IS_PROD, APP_PROD_METHOD, \
  8. ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS
  9. from loguru import logger
  10. if IS_WINDOWS:
  11. multiprocessing.freeze_support()
  12. tester_process, getter_process, server_process = None, None, None
  13. class Scheduler():
  14. """
  15. scheduler
  16. """
  17. def run_tester(self, cycle=CYCLE_TESTER):
  18. """
  19. run tester
  20. """
  21. if not ENABLE_TESTER:
  22. logger.info('tester not enabled, exit')
  23. return
  24. tester = Tester()
  25. loop = 0
  26. while True:
  27. logger.debug(f'tester loop {loop} start...')
  28. tester.run()
  29. loop += 1
  30. time.sleep(cycle)
  31. def run_getter(self, cycle=CYCLE_GETTER):
  32. """
  33. run getter
  34. """
  35. if not ENABLE_GETTER:
  36. logger.info('getter not enabled, exit')
  37. return
  38. getter = Getter()
  39. loop = 0
  40. while True:
  41. logger.debug(f'getter loop {loop} start...')
  42. getter.run()
  43. loop += 1
  44. time.sleep(cycle)
  45. def run_server(self):
  46. """
  47. run server for api
  48. """
  49. if not ENABLE_SERVER:
  50. logger.info('server not enabled, exit')
  51. return
  52. if IS_PROD:
  53. if APP_PROD_METHOD == 'gevent':
  54. try:
  55. from gevent.pywsgi import WSGIServer
  56. except ImportError as e:
  57. logger.exception(e)
  58. else:
  59. http_server = WSGIServer((API_HOST, API_PORT), app)
  60. http_server.serve_forever()
  61. elif APP_PROD_METHOD == 'tornado':
  62. try:
  63. from tornado.wsgi import WSGIContainer
  64. from tornado.httpserver import HTTPServer
  65. from tornado.ioloop import IOLoop
  66. except ImportError as e:
  67. logger.exception(e)
  68. else:
  69. http_server = HTTPServer(WSGIContainer(app))
  70. http_server.listen(API_PORT)
  71. IOLoop.instance().start()
  72. elif APP_PROD_METHOD == "meinheld":
  73. try:
  74. import meinheld
  75. except ImportError as e:
  76. logger.exception(e)
  77. else:
  78. meinheld.listen((API_HOST, API_PORT))
  79. meinheld.run(app)
  80. else:
  81. logger.error("unsupported APP_PROD_METHOD")
  82. return
  83. else:
  84. app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)
  85. def run(self):
  86. global tester_process, getter_process, server_process
  87. try:
  88. logger.info('starting proxypool...')
  89. if ENABLE_TESTER:
  90. tester_process = multiprocessing.Process(
  91. target=self.run_tester)
  92. logger.info(f'starting tester, pid {tester_process.pid}...')
  93. tester_process.start()
  94. if ENABLE_GETTER:
  95. getter_process = multiprocessing.Process(
  96. target=self.run_getter)
  97. logger.info(f'starting getter, pid {getter_process.pid}...')
  98. getter_process.start()
  99. if ENABLE_SERVER:
  100. server_process = multiprocessing.Process(
  101. target=self.run_server)
  102. logger.info(f'starting server, pid {server_process.pid}...')
  103. server_process.start()
  104. tester_process and tester_process.join()
  105. getter_process and getter_process.join()
  106. server_process and server_process.join()
  107. except KeyboardInterrupt:
  108. logger.info('received keyboard interrupt signal')
  109. tester_process and tester_process.terminate()
  110. getter_process and getter_process.terminate()
  111. server_process and server_process.terminate()
  112. finally:
  113. # must call join method before calling is_alive
  114. tester_process and tester_process.join()
  115. getter_process and getter_process.join()
  116. server_process and server_process.join()
  117. logger.info(
  118. f'tester is {"alive" if tester_process.is_alive() else "dead"}')
  119. logger.info(
  120. f'getter is {"alive" if getter_process.is_alive() else "dead"}')
  121. logger.info(
  122. f'server is {"alive" if server_process.is_alive() else "dead"}')
  123. logger.info('proxy terminated')
  124. if __name__ == '__main__':
  125. scheduler = Scheduler()
  126. scheduler.run()