|
|
@@ -11,10 +11,6 @@ from asyncio import TimeoutError
|
|
|
|
|
|
|
|
|
class ValidityTester(object):
|
|
|
- """
|
|
|
- 检验器,负责对未知的代理进行异步检测。
|
|
|
- """
|
|
|
- # 用百度的首页来检验
|
|
|
test_api = TEST_API
|
|
|
|
|
|
def __init__(self):
|
|
|
@@ -22,15 +18,12 @@ class ValidityTester(object):
|
|
|
self._usable_proxies = []
|
|
|
|
|
|
def set_raw_proxies(self, proxies):
|
|
|
- """
|
|
|
- 设置待检测的代理。
|
|
|
- """
|
|
|
self._raw_proxies = proxies
|
|
|
- self._usable_proxies = []
|
|
|
+ self._conn = RedisClient()
|
|
|
|
|
|
async def test_single_proxy(self, proxy):
|
|
|
"""
|
|
|
- 检测单个代理,如果可用,则将其加入_usable_proxies
|
|
|
+ Test one proxy; if it is valid, store it through the Redis client (self._conn).
|
|
|
"""
|
|
|
async with aiohttp.ClientSession() as session:
|
|
|
try:
|
|
|
@@ -40,14 +33,14 @@ class ValidityTester(object):
|
|
|
print('Testing', proxy)
|
|
|
async with session.get(self.test_api, proxy=real_proxy, timeout=15) as response:
|
|
|
if response.status == 200:
|
|
|
- self._usable_proxies.append(proxy)
|
|
|
+ self._conn.put(proxy)
|
|
|
print('Valid proxy', proxy)
|
|
|
- except (ProxyConnectionError, TimeoutError):
|
|
|
+ except (ProxyConnectionError, TimeoutError, ValueError):
|
|
|
print('Invalid proxy', proxy)
|
|
|
|
|
|
def test(self):
|
|
|
"""
|
|
|
- 异步检测_raw_proxies中的全部代理。
|
|
|
+ Asynchronously test all proxies in _raw_proxies.
|
|
|
"""
|
|
|
print('ValidityTester is working')
|
|
|
try:
|
|
|
@@ -57,13 +50,10 @@ class ValidityTester(object):
|
|
|
except ValueError:
|
|
|
print('Async Error')
|
|
|
|
|
|
- def get_usable_proxies(self):
|
|
|
- return self._usable_proxies
|
|
|
-
|
|
|
|
|
|
class PoolAdder(object):
|
|
|
"""
|
|
|
- 添加器,负责向池中补充代理
|
|
|
+ Adds proxies to the pool.
|
|
|
"""
|
|
|
|
|
|
def __init__(self, threshold):
|
|
|
@@ -74,7 +64,7 @@ class PoolAdder(object):
|
|
|
|
|
|
def is_over_threshold(self):
|
|
|
"""
|
|
|
- 判断代理池中的数据量是否达到阈值。
|
|
|
+ Check whether the number of proxies in the pool has reached the threshold.
|
|
|
"""
|
|
|
if self._conn.queue_len >= self._threshold:
|
|
|
return True
|
|
|
@@ -82,20 +72,15 @@ class PoolAdder(object):
|
|
|
return False
|
|
|
|
|
|
def add_to_queue(self):
|
|
|
- """
|
|
|
- 命令爬虫抓取一定量未检测的代理,然后检测,将通过检测的代理
|
|
|
- 加入到代理池中。
|
|
|
- """
|
|
|
print('PoolAdder is working')
|
|
|
proxy_count = 0
|
|
|
while not self.is_over_threshold():
|
|
|
for callback_label in range(self._crawler.__CrawlFuncCount__):
|
|
|
callback = self._crawler.__CrawlFunc__[callback_label]
|
|
|
raw_proxies = self._crawler.get_raw_proxies(callback)
|
|
|
+ # test crawled proxies
|
|
|
self._tester.set_raw_proxies(raw_proxies)
|
|
|
self._tester.test()
|
|
|
- proxies = self._tester.get_usable_proxies()
|
|
|
- self._conn.put_many(proxies)
|
|
|
proxy_count += len(raw_proxies)
|
|
|
if self.is_over_threshold():
|
|
|
print('IP is enough, waiting to be used')
|
|
|
@@ -105,16 +90,10 @@ class PoolAdder(object):
|
|
|
|
|
|
|
|
|
class Schedule(object):
|
|
|
- """
|
|
|
- 总调度器,用于协调各调度器模块
|
|
|
- """
|
|
|
-
|
|
|
@staticmethod
|
|
|
def valid_proxy(cycle=VALID_CHECK_CYCLE):
|
|
|
"""
|
|
|
- 对已经如池的代理进行检测,防止池中的代理因长期
|
|
|
- 不使用而过期。
|
|
|
- 抽出代理池队列中前1/2的代理,检测,合格者压入队列尾。
|
|
|
+ Get half of the proxies stored in Redis and test them again.
|
|
|
"""
|
|
|
conn = RedisClient()
|
|
|
tester = ValidityTester()
|
|
|
@@ -128,8 +107,6 @@ class Schedule(object):
|
|
|
raw_proxies = conn.get(count)
|
|
|
tester.set_raw_proxies(raw_proxies)
|
|
|
tester.test()
|
|
|
- proxies = tester.get_usable_proxies()
|
|
|
- conn.put_many(proxies)
|
|
|
time.sleep(cycle)
|
|
|
|
|
|
@staticmethod
|
|
|
@@ -137,8 +114,7 @@ class Schedule(object):
|
|
|
upper_threshold=POOL_UPPER_THRESHOLD,
|
|
|
cycle=POOL_LEN_CHECK_CYCLE):
|
|
|
"""
|
|
|
- 协调添加器,当代理池中可用代理的数量低于下阈值时,触发添加器,启动爬虫
|
|
|
- 补充代理,当代理达到上阈值时,添加器停止工作。
|
|
|
+ If the number of proxies is less than lower_threshold, add more proxies.
|
|
|
"""
|
|
|
conn = RedisClient()
|
|
|
adder = PoolAdder(upper_threshold)
|
|
|
@@ -148,9 +124,6 @@ class Schedule(object):
|
|
|
time.sleep(cycle)
|
|
|
|
|
|
def run(self):
|
|
|
- """
|
|
|
- 运行调度器,创建两个进程,对代理池进行维护。
|
|
|
- """
|
|
|
print('Ip processing running')
|
|
|
valid_process = Process(target=Schedule.valid_proxy)
|
|
|
check_process = Process(target=Schedule.check_pool)
|