| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 | 
							- import asyncio
 
- import aiohttp
 
- from loguru import logger
 
- from proxypool.schemas import Proxy
 
- from proxypool.storages.redis import RedisClient
 
- from proxypool.setting import TEST_TIMEOUT, TEST_BATCH, TEST_URL, TEST_VALID_STATUS, TEST_ANONYMOUS
 
- from aiohttp import ClientProxyConnectionError, ServerDisconnectedError, ClientOSError, ClientHttpProxyError
 
- from asyncio import TimeoutError
 
- EXCEPTIONS = (
 
-     ClientProxyConnectionError,
 
-     ConnectionRefusedError,
 
-     TimeoutError,
 
-     ServerDisconnectedError,
 
-     ClientOSError,
 
-     ClientHttpProxyError,
 
-     AssertionError
 
- )
 
- class Tester(object):
 
-     """
 
-     tester for testing proxies in queue
 
-     """
 
-     
 
-     def __init__(self):
 
-         """
 
-         init redis
 
-         """
 
-         self.redis = RedisClient()
 
-         self.loop = asyncio.get_event_loop()
 
-     
 
-     async def test(self, proxy: Proxy):
 
-         """
 
-         test single proxy
 
-         :param proxy: Proxy object
 
-         :return:
 
-         """
 
-         async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
 
-             try:
 
-                 logger.debug(f'testing {proxy.string()}')
 
-                 # if TEST_ANONYMOUS is True, make sure that
 
-                 # the proxy has the effect of hiding the real IP
 
-                 if TEST_ANONYMOUS:
 
-                     url = 'https://httpbin.org/ip'
 
-                     async with session.get(url, timeout=TEST_TIMEOUT) as response:
 
-                         resp_json = await response.json()
 
-                         origin_ip = resp_json['origin']
 
-                     async with session.get(url, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT) as response:
 
-                         resp_json = await response.json()
 
-                         anonymous_ip = resp_json['origin']
 
-                     assert origin_ip != anonymous_ip
 
-                     assert proxy.host == anonymous_ip
 
-                 async with session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT,
 
-                                        allow_redirects=False) as response:
 
-                     if response.status in TEST_VALID_STATUS:
 
-                         self.redis.max(proxy)
 
-                         logger.debug(f'proxy {proxy.string()} is valid, set max score')
 
-                     else:
 
-                         self.redis.decrease(proxy)
 
-                         logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
 
-             except EXCEPTIONS:
 
-                 self.redis.decrease(proxy)
 
-                 logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
 
-     
 
-     @logger.catch
 
-     def run(self):
 
-         """
 
-         test main method
 
-         :return:
 
-         """
 
-         # event loop of aiohttp
 
-         logger.info('stating tester...')
 
-         count = self.redis.count()
 
-         logger.debug(f'{count} proxies to test')
 
-         for i in range(0, count, TEST_BATCH):
 
-             # start end end offset
 
-             start, end = i, min(i + TEST_BATCH, count)
 
-             logger.debug(f'testing proxies from {start} to {end} indices')
 
-             proxies = self.redis.batch(start, end)
 
-             tasks = [self.test(proxy) for proxy in proxies]
 
-             # run tasks using event loop
 
-             self.loop.run_until_complete(asyncio.wait(tasks))
 
- if __name__ == '__main__':
 
-     tester = Tester()
 
-     tester.run()
 
 
  |