# tester.py (6.0 KB)
  1. import asyncio
  2. import aiohttp
  3. from loguru import logger
  4. from proxypool.schemas import Proxy
  5. from proxypool.storages.redis import RedisClient
  6. from proxypool.setting import TEST_TIMEOUT, TEST_BATCH, TEST_URL, TEST_VALID_STATUS, TEST_ANONYMOUS, \
  7. TEST_DONT_SET_MAX_SCORE
  8. from aiohttp import ClientProxyConnectionError, ServerDisconnectedError, ClientOSError, ClientHttpProxyError
  9. from asyncio import TimeoutError
  10. from proxypool.testers import __all__ as testers_cls
  11. EXCEPTIONS = (
  12. ClientProxyConnectionError,
  13. ConnectionRefusedError,
  14. TimeoutError,
  15. ServerDisconnectedError,
  16. ClientOSError,
  17. ClientHttpProxyError,
  18. AssertionError
  19. )
  20. class Tester(object):
  21. """
  22. tester for testing proxies in queue
  23. """
  24. def __init__(self):
  25. """
  26. init redis
  27. """
  28. self.redis = RedisClient()
  29. self.loop = asyncio.get_event_loop()
  30. self.testers_cls = testers_cls
  31. self.testers = [tester_cls() for tester_cls in self.testers_cls]
  32. async def test(self, proxy: Proxy):
  33. """
  34. test single proxy
  35. :param proxy: Proxy object
  36. :return:
  37. """
  38. async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
  39. try:
  40. logger.debug(f'testing {proxy.string()}')
  41. # if TEST_ANONYMOUS is True, make sure that
  42. # the proxy has the effect of hiding the real IP
  43. # logger.debug(f'TEST_ANONYMOUS {TEST_ANONYMOUS}')
  44. if TEST_ANONYMOUS:
  45. url = 'https://httpbin.org/ip'
  46. async with session.get(url, timeout=TEST_TIMEOUT) as response:
  47. resp_json = await response.json()
  48. origin_ip = resp_json['origin']
  49. # logger.debug(f'origin ip is {origin_ip}')
  50. async with session.get(url, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT) as response:
  51. resp_json = await response.json()
  52. anonymous_ip = resp_json['origin']
  53. logger.debug(f'anonymous ip is {anonymous_ip}')
  54. assert origin_ip != anonymous_ip
  55. assert proxy.host == anonymous_ip
  56. async with session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT,
  57. allow_redirects=False) as response:
  58. if response.status in TEST_VALID_STATUS:
  59. if TEST_DONT_SET_MAX_SCORE:
  60. logger.debug(
  61. f'proxy {proxy.string()} is valid, remain current score')
  62. else:
  63. self.redis.max(proxy)
  64. logger.debug(
  65. f'proxy {proxy.string()} is valid, set max score')
  66. else:
  67. self.redis.decrease(proxy)
  68. logger.debug(
  69. f'proxy {proxy.string()} is invalid, decrease score')
  70. # if independent tester class found, create new set of storage and do the extra test
  71. for tester in self.testers:
  72. key = tester.key
  73. if self.redis.exists(proxy, key):
  74. test_url = tester.test_url
  75. headers = tester.headers()
  76. cookies = tester.cookies()
  77. async with session.get(test_url, proxy=f'http://{proxy.string()}',
  78. timeout=TEST_TIMEOUT,
  79. headers=headers,
  80. cookies=cookies,
  81. allow_redirects=False) as response:
  82. resp_text = await response.text()
  83. is_valid = await tester.parse(resp_text, test_url, proxy.string())
  84. if is_valid:
  85. if tester.test_dont_set_max_score:
  86. logger.info(
  87. f'key[{key}] proxy {proxy.string()} is valid, remain current score')
  88. else:
  89. self.redis.max(
  90. proxy, key, tester.proxy_score_max)
  91. logger.info(
  92. f'key[{key}] proxy {proxy.string()} is valid, set max score')
  93. else:
  94. self.redis.decrease(
  95. proxy, tester.key, tester.proxy_score_min)
  96. logger.info(
  97. f'key[{key}] proxy {proxy.string()} is invalid, decrease score')
  98. except EXCEPTIONS:
  99. self.redis.decrease(proxy)
  100. [self.redis.decrease(proxy, tester.key, tester.proxy_score_min)
  101. for tester in self.testers]
  102. logger.debug(
  103. f'proxy {proxy.string()} is invalid, decrease score')
  104. @logger.catch
  105. def run(self):
  106. """
  107. test main method
  108. :return:
  109. """
  110. # event loop of aiohttp
  111. logger.info('stating tester...')
  112. count = self.redis.count()
  113. logger.debug(f'{count} proxies to test')
  114. cursor = 0
  115. while True:
  116. logger.debug(
  117. f'testing proxies use cursor {cursor}, count {TEST_BATCH}')
  118. cursor, proxies = self.redis.batch(cursor, count=TEST_BATCH)
  119. if proxies:
  120. tasks = [self.loop.create_task(
  121. self.test(proxy)) for proxy in proxies]
  122. self.loop.run_until_complete(asyncio.wait(tasks))
  123. if not cursor:
  124. break
  125. def run_tester():
  126. host = '96.113.165.182'
  127. port = '3128'
  128. tasks = [tester.test(Proxy(host=host, port=port))]
  129. tester.loop.run_until_complete(asyncio.wait(tasks))
  130. if __name__ == '__main__':
  131. tester = Tester()
  132. tester.run()
  133. # run_tester()