123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import time
- import multiprocessing
- from proxypool.processors.server import app
- from proxypool.processors.getter import Getter
- from proxypool.processors.tester import Tester
- from proxypool.setting import CYCLE_GETTER, CYCLE_TESTER, API_HOST, API_THREADED, API_PORT, ENABLE_SERVER, \
- ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS
- from loguru import logger
- if IS_WINDOWS:
- multiprocessing.freeze_support()
- tester_process, getter_process, server_process = None, None, None
- class Scheduler():
- """
- scheduler
- """
-
- def run_tester(self, cycle=CYCLE_TESTER):
- """
- run tester
- """
- if not ENABLE_TESTER:
- logger.info('tester not enabled, exit')
- return
- tester = Tester()
- loop = 0
- while True:
- logger.debug(f'tester loop {loop} start...')
- tester.run()
- loop += 1
- time.sleep(cycle)
-
- def run_getter(self, cycle=CYCLE_GETTER):
- """
- run getter
- """
- if not ENABLE_GETTER:
- logger.info('getter not enabled, exit')
- return
- getter = Getter()
- loop = 0
- while True:
- logger.debug(f'getter loop {loop} start...')
- getter.run()
- loop += 1
- time.sleep(cycle)
-
- def run_server(self):
- """
- run server for api
- """
- if not ENABLE_SERVER:
- logger.info('server not enabled, exit')
- return
- app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)
-
- def run(self):
- global tester_process, getter_process, server_process
- try:
- logger.info('starting proxypool...')
- if ENABLE_TESTER:
- tester_process = multiprocessing.Process(target=self.run_tester)
- logger.info(f'starting tester, pid {tester_process.pid}...')
- tester_process.start()
-
- if ENABLE_GETTER:
- getter_process = multiprocessing.Process(target=self.run_getter)
- logger.info(f'starting getter, pid{getter_process.pid}...')
- getter_process.start()
-
- if ENABLE_SERVER:
- server_process = multiprocessing.Process(target=self.run_server)
- logger.info(f'starting server, pid{server_process.pid}...')
- server_process.start()
-
- tester_process.join()
- getter_process.join()
- server_process.join()
- except KeyboardInterrupt:
- logger.info('received keyboard interrupt signal')
- tester_process.terminate()
- getter_process.terminate()
- server_process.terminate()
- finally:
- # must call join method before calling is_alive
- tester_process.join()
- getter_process.join()
- server_process.join()
- logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}')
- logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}')
- logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}')
- logger.info('proxy terminated')
- if __name__ == '__main__':
- scheduler = Scheduler()
- scheduler.run()
|