tester.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. import asyncio
  2. import aiohttp
  3. from loguru import logger
  4. from proxypool.schemas import Proxy
  5. from proxypool.storages.redis import RedisClient
  6. from proxypool.setting import TEST_TIMEOUT, TEST_BATCH, TEST_URL, TEST_VALID_STATUS, TEST_ANONYMOUS, \
  7. TEST_DONT_SET_MAX_SCORE
  8. from aiohttp import ClientProxyConnectionError, ServerDisconnectedError, ClientOSError, ClientHttpProxyError
  9. from asyncio import TimeoutError
  10. from proxypool.testers import __all__ as testers_cls
  11. EXCEPTIONS = (
  12. ClientProxyConnectionError,
  13. ConnectionRefusedError,
  14. TimeoutError,
  15. ServerDisconnectedError,
  16. ClientOSError,
  17. ClientHttpProxyError,
  18. AssertionError
  19. )
  20. class Tester(object):
  21. """
  22. tester for testing proxies in queue
  23. """
  24. def __init__(self):
  25. """
  26. init redis
  27. """
  28. self.redis = RedisClient()
  29. self.loop = asyncio.get_event_loop()
  30. self.testers_cls = testers_cls
  31. self.testers = [tester_cls() for tester_cls in self.testers_cls]
  32. async def test(self, proxy: Proxy):
  33. """
  34. test single proxy
  35. :param proxy: Proxy object
  36. :return:
  37. """
  38. async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
  39. try:
  40. logger.debug(f'testing {proxy.string()}')
  41. # if TEST_ANONYMOUS is True, make sure that
  42. # the proxy has the effect of hiding the real IP
  43. if TEST_ANONYMOUS:
  44. url = 'https://httpbin.org/ip'
  45. async with session.get(url, timeout=TEST_TIMEOUT) as response:
  46. resp_json = await response.json()
  47. origin_ip = resp_json['origin']
  48. async with session.get(url, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT) as response:
  49. resp_json = await response.json()
  50. anonymous_ip = resp_json['origin']
  51. assert origin_ip != anonymous_ip
  52. assert proxy.host == anonymous_ip
  53. async with session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT,
  54. allow_redirects=False) as response:
  55. if response.status in TEST_VALID_STATUS:
  56. if TEST_DONT_SET_MAX_SCORE:
  57. logger.debug(f'proxy {proxy.string()} is valid, remain current score')
  58. else:
  59. self.redis.max(proxy)
  60. logger.debug(f'proxy {proxy.string()} is valid, set max score')
  61. else:
  62. self.redis.decrease(proxy)
  63. logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
  64. # if independent tester class found, create new set of storage and do the extra test
  65. for tester in self.testers:
  66. key = tester.key
  67. if self.redis.exists(proxy, key):
  68. test_url = tester.test_url
  69. headers = tester.headers()
  70. cookies = tester.cookies()
  71. async with session.get(test_url, proxy=f'http://{proxy.string()}',
  72. timeout=TEST_TIMEOUT,
  73. headers=headers,
  74. cookies=cookies,
  75. allow_redirects=False) as response:
  76. resp_text = await response.text()
  77. is_valid = await tester.parse(resp_text, test_url, proxy.string())
  78. if is_valid:
  79. if tester.test_dont_set_max_score:
  80. logger.info(f'key[{key}] proxy {proxy.string()} is valid, remain current score')
  81. else:
  82. self.redis.max(proxy, key, tester.proxy_score_max)
  83. logger.info(f'key[{key}] proxy {proxy.string()} is valid, set max score')
  84. else:
  85. self.redis.decrease(proxy, tester.key, tester.proxy_score_min)
  86. logger.info(f'key[{key}] proxy {proxy.string()} is invalid, decrease score')
  87. except EXCEPTIONS:
  88. self.redis.decrease(proxy)
  89. [self.redis.decrease(proxy, tester.key, tester.proxy_score_min) for tester in self.testers]
  90. logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
  91. @logger.catch
  92. def run(self):
  93. """
  94. test main method
  95. :return:
  96. """
  97. # event loop of aiohttp
  98. logger.info('stating tester...')
  99. count = self.redis.count()
  100. logger.debug(f'{count} proxies to test')
  101. cursor = 0
  102. while True:
  103. logger.debug(f'testing proxies use cursor {cursor}, count {TEST_BATCH}')
  104. cursor, proxies = self.redis.batch(cursor, count=TEST_BATCH)
  105. if proxies:
  106. tasks = [self.loop.create_task(self.test(proxy)) for proxy in proxies]
  107. self.loop.run_until_complete(asyncio.wait(tasks))
  108. if not cursor:
  109. break
  110. def run_tester():
  111. host = '96.113.165.182'
  112. port = '3128'
  113. tasks = [tester.test(Proxy(host=host, port=port))]
  114. tester.loop.run_until_complete(asyncio.wait(tasks))
  115. if __name__ == '__main__':
  116. tester = Tester()
  117. tester.run()
  118. # run_tester()