12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- import time
- from multiprocessing import Process
- from proxypool.api import app
- from proxypool.getter import Getter
- from proxypool.tester import Tester
- from proxypool.db import RedisClient
- from proxypool.setting import *
- class Scheduler(object):
- def schedule_tester(cycle=TESTER_CYCLE):
- """
- 定时测试代理
- """
- redis = RedisClient()
- tester = Tester()
- while True:
- print('测试器开始运行')
- count = redis.count()
- if count == 0:
- print('代理池已枯竭,等待添加代理')
- time.sleep(cycle)
- continue
- proxies = redis.all()
- tester.set_proxies(proxies)
- print('开始检测全部代理')
- tester.run()
- time.sleep(cycle)
-
- def schedule_getter(cycle=GETTER_CYCLE):
- """
- 定时获取代理
- """
- while True:
- print('开始抓取代理')
- getter = Getter()
- getter.run()
- time.sleep(cycle)
-
- def schedule_api(self):
- """
- 开启API
- """
- app.run(API_HOST, API_PORT)
-
- def run(self):
- print('代理池开始运行')
-
- if TESTER_ENABLED:
- tester_process = Process(target=self.schedule_tester)
- tester_process.start()
-
- if GETTER_ENABLED:
- getter_process = Process(target=self.schedule_getter)
- getter_process.start()
-
- if API_ENABLED:
- api_process = Process(target=self.schedule_api)
- api_process.start()
|