tester.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. import asyncio
  2. import aiohttp
  3. from loguru import logger
  4. from proxypool.schemas import Proxy
  5. from proxypool.storages.redis import RedisClient
  6. from proxypool.setting import TEST_TIMEOUT, TEST_BATCH, TEST_URL, TEST_VALID_STATUS
  7. from aiohttp import ClientProxyConnectionError, ServerDisconnectedError
  8. from asyncio import TimeoutError
  9. EXCEPTIONS = (
  10. ClientProxyConnectionError,
  11. ConnectionRefusedError,
  12. TimeoutError,
  13. ServerDisconnectedError
  14. )
  15. class Tester(object):
  16. """
  17. tester for testing proxies in queue
  18. """
  19. def __init__(self):
  20. """
  21. init redis
  22. """
  23. self.redis = RedisClient()
  24. self.loop = asyncio.get_event_loop()
  25. async def test(self, proxy: Proxy):
  26. """
  27. test single proxy
  28. :param proxy: Proxy object
  29. :return:
  30. """
  31. async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
  32. try:
  33. logger.debug(f'testing {proxy.string()}')
  34. async with session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT,
  35. allow_redirects=False) as response:
  36. if response.status in TEST_VALID_STATUS:
  37. self.redis.max(proxy)
  38. logger.debug(f'proxy {proxy.string()} is valid, set max score')
  39. else:
  40. self.redis.decrease(proxy)
  41. logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
  42. except EXCEPTIONS:
  43. self.redis.decrease(proxy)
  44. logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
  45. @logger.catch
  46. def run(self):
  47. """
  48. test main method
  49. :return:
  50. """
  51. # event loop of aiohttp
  52. logger.info('stating tester...')
  53. count = self.redis.count()
  54. logger.debug(f'{count} proxies to test')
  55. for i in range(0, count, TEST_BATCH):
  56. # start end end offset
  57. start, end = i, min(i + TEST_BATCH, count)
  58. logger.debug(f'testing proxies from {start} to {end} indices')
  59. proxies = self.redis.batch(start, end)
  60. tasks = [self.test(proxy) for proxy in proxies]
  61. # run tasks using event loop
  62. self.loop.run_until_complete(asyncio.wait(tasks))
  63. if __name__ == '__main__':
  64. tester = Tester()
  65. tester.run()