tester.py 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import asyncio
  2. import aiohttp
  3. from loguru import logger
  4. from proxypool.schemas import Proxy
  5. from proxypool.storages.redis import RedisClient
  6. from proxypool.setting import TEST_TIMEOUT, TEST_BATCH, TEST_URL, TEST_VALID_STATUS, TEST_ANONYMOUS
  7. from aiohttp import ClientProxyConnectionError, ServerDisconnectedError, ClientOSError, ClientHttpProxyError
  8. from asyncio import TimeoutError
  9. EXCEPTIONS = (
  10. ClientProxyConnectionError,
  11. ConnectionRefusedError,
  12. TimeoutError,
  13. ServerDisconnectedError,
  14. ClientOSError,
  15. ClientHttpProxyError,
  16. AssertionError
  17. )
  18. class Tester(object):
  19. """
  20. tester for testing proxies in queue
  21. """
  22. def __init__(self):
  23. """
  24. init redis
  25. """
  26. self.redis = RedisClient()
  27. self.loop = asyncio.get_event_loop()
  28. async def test(self, proxy: Proxy):
  29. """
  30. test single proxy
  31. :param proxy: Proxy object
  32. :return:
  33. """
  34. async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
  35. try:
  36. logger.debug(f'testing {proxy.string()}')
  37. # if TEST_ANONYMOUS is True, make sure that
  38. # the proxy has the effect of hiding the real IP
  39. if TEST_ANONYMOUS:
  40. url = 'https://httpbin.org/ip'
  41. async with session.get(url, timeout=TEST_TIMEOUT) as response:
  42. resp_json = await response.json()
  43. origin_ip = resp_json['origin']
  44. async with session.get(url, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT) as response:
  45. resp_json = await response.json()
  46. anonymous_ip = resp_json['origin']
  47. assert origin_ip != anonymous_ip
  48. assert proxy.host == anonymous_ip
  49. async with session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT,
  50. allow_redirects=False) as response:
  51. if response.status in TEST_VALID_STATUS:
  52. self.redis.max(proxy)
  53. logger.debug(f'proxy {proxy.string()} is valid, set max score')
  54. else:
  55. self.redis.decrease(proxy)
  56. logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
  57. except EXCEPTIONS:
  58. self.redis.decrease(proxy)
  59. logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
  60. @logger.catch
  61. def run(self):
  62. """
  63. test main method
  64. :return:
  65. """
  66. # event loop of aiohttp
  67. logger.info('stating tester...')
  68. count = self.redis.count()
  69. logger.debug(f'{count} proxies to test')
  70. for i in range(0, count, TEST_BATCH):
  71. # start end end offset
  72. start, end = i, min(i + TEST_BATCH, count)
  73. logger.debug(f'testing proxies from {start} to {end} indices')
  74. proxies = self.redis.batch(start, end)
  75. tasks = [self.test(proxy) for proxy in proxies]
  76. # run tasks using event loop
  77. self.loop.run_until_complete(asyncio.wait(tasks))
  78. if __name__ == '__main__':
  79. tester = Tester()
  80. tester.run()