123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import asyncio
- import aiohttp
- from loguru import logger
- from proxypool.schemas import Proxy
- from proxypool.storages.redis import RedisClient
- from proxypool.setting import TEST_TIMEOUT, TEST_BATCH, TEST_URL, TEST_VALID_STATUS, TEST_ANONYMOUS, \
- TEST_DONT_SET_MAX_SCORE
- from aiohttp import ClientProxyConnectionError, ServerDisconnectedError, ClientOSError, ClientHttpProxyError
- from asyncio import TimeoutError
# Exception types that Tester.test catches around its requests; when one of
# them is raised the proxy's score is decreased instead of the error
# propagating out of the test coroutine.
EXCEPTIONS = (
    ClientProxyConnectionError,
    ConnectionRefusedError,
    # NOTE: this is asyncio.TimeoutError (imported above), not the builtin
    TimeoutError,
    ServerDisconnectedError,
    ClientOSError,
    ClientHttpProxyError,
    # raised by the anonymity checks inside Tester.test
    AssertionError
)
class Tester(object):
    """
    tester for testing proxies in queue

    Proxies are read from redis in batches; each one is used to request
    TEST_URL and, depending on the outcome, ``redis.max`` or
    ``redis.decrease`` is called for it.
    """

    def __init__(self):
        """
        init redis and the event loop that drives the test coroutines
        """
        self.redis = RedisClient()
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # get_event_loop() raises when there is no current loop (e.g. in
            # a non-main thread, or on Python versions that no longer create
            # one implicitly); create and register one ourselves
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)

    async def test(self, proxy: Proxy):
        """
        test single proxy

        When TEST_ANONYMOUS is set, https://httpbin.org/ip is requested once
        directly and once through the proxy; the proxy only passes if the two
        reported origins differ and the proxied origin equals ``proxy.host``.
        Afterwards TEST_URL is requested through the proxy and the response
        status is compared against TEST_VALID_STATUS.

        Any exception listed in EXCEPTIONS is treated as "proxy invalid" and
        decreases its score; other exceptions propagate to the caller.

        :param proxy: Proxy object
        :return: None, the outcome is recorded through self.redis
        """
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
            try:
                logger.debug(f'testing {proxy.string()}')
                proxy_url = f'http://{proxy.string()}'
                # if TEST_ANONYMOUS is True, make sure that
                # the proxy has the effect of hiding the real IP
                if TEST_ANONYMOUS:
                    url = 'https://httpbin.org/ip'
                    async with session.get(url, timeout=TEST_TIMEOUT) as response:
                        resp_json = await response.json()
                        origin_ip = resp_json['origin']
                    async with session.get(url, proxy=proxy_url, timeout=TEST_TIMEOUT) as response:
                        resp_json = await response.json()
                        anonymous_ip = resp_json['origin']
                    # explicit raises instead of ``assert`` so the checks are
                    # not stripped under ``python -O``; AssertionError is kept
                    # because it is part of EXCEPTIONS and handled below
                    if origin_ip == anonymous_ip:
                        raise AssertionError('proxy does not hide the real IP')
                    if proxy.host != anonymous_ip:
                        raise AssertionError('exit IP differs from proxy host')
                async with session.get(TEST_URL, proxy=proxy_url, timeout=TEST_TIMEOUT,
                                       allow_redirects=False) as response:
                    if response.status in TEST_VALID_STATUS:
                        if TEST_DONT_SET_MAX_SCORE:
                            logger.debug(f'proxy {proxy.string()} is valid, remain current score')
                        else:
                            self.redis.max(proxy)
                            logger.debug(f'proxy {proxy.string()} is valid, set max score')
                    else:
                        self.redis.decrease(proxy)
                        logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
            except EXCEPTIONS:
                self.redis.decrease(proxy)
                logger.debug(f'proxy {proxy.string()} is invalid, decrease score')

    @logger.catch
    def run(self):
        """
        test main method

        Walks the stored proxies batch by batch (TEST_BATCH per call to
        ``redis.batch``) and tests every proxy of a batch concurrently on
        ``self.loop``.

        :return:
        """
        logger.info('stating tester...')
        count = self.redis.count()
        logger.debug(f'{count} proxies to test')
        cursor = 0
        while True:
            logger.debug(f'testing proxies use cursor {cursor}, count {TEST_BATCH}')
            cursor, proxies = self.redis.batch(cursor, count=TEST_BATCH)
            if proxies:
                # wrap the coroutines in tasks first: asyncio.wait() expects
                # tasks/futures rather than bare coroutines
                tasks = [self.loop.create_task(self.test(proxy)) for proxy in proxies]
                self.loop.run_until_complete(asyncio.wait(tasks))
            # a falsy cursor returned by redis.batch ends the loop
            # (presumably it signals that the iteration is complete - verify
            # against RedisClient.batch)
            if not cursor:
                break
def run_tester():
    """
    test one hard-coded proxy, intended for manual debugging

    Uses the module-level ``tester`` instance, so it can only be called after
    that name has been assigned (as done in the ``__main__`` guard).

    :return:
    """
    host = '96.113.165.182'
    port = '3128'
    # run the coroutine directly on the loop: asyncio.wait() rejects bare
    # coroutine objects on Python 3.11+, and there is only one test to await
    tester.loop.run_until_complete(tester.test(Proxy(host=host, port=port)))
if __name__ == '__main__':
    # keep `tester` as a module-level name: run_tester() reads it as a global
    tester = Tester()
    tester.run()
    # run_tester()
|