# tester.py (2.2 KB)
  1. import asyncio
  2. import aiohttp
  3. import time
  4. import sys
  5. try:
  6. from aiohttp import ClientError
  7. except:
  8. from aiohttp import ClientProxyConnectionError as ProxyConnectionError
  9. from proxypool.db import RedisClient
  10. from proxypool.setting import *
  11. class Tester(object):
  12. def __init__(self):
  13. self.redis = RedisClient()
  14. async def test_single_proxy(self, proxy):
  15. """
  16. 测试单个代理
  17. :param proxy:
  18. :return:
  19. """
  20. conn = aiohttp.TCPConnector(verify_ssl=False)
  21. async with aiohttp.ClientSession(connector=conn) as session:
  22. try:
  23. if isinstance(proxy, bytes):
  24. proxy = proxy.decode('utf-8')
  25. real_proxy = 'http://' + proxy
  26. print('正在测试', proxy)
  27. async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response:
  28. if response.status in VALID_STATUS_CODES:
  29. self.redis.max(proxy)
  30. print('代理可用', proxy)
  31. else:
  32. self.redis.decrease(proxy)
  33. print('请求响应码不合法 ', response.status, 'IP', proxy)
  34. except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError):
  35. self.redis.decrease(proxy)
  36. print('代理请求失败', proxy)
  37. def run(self):
  38. """
  39. 测试主函数
  40. :return:
  41. """
  42. print('测试器开始运行')
  43. try:
  44. count = self.redis.count()
  45. print('当前剩余', count, '个代理')
  46. for i in range(0, count, BATCH_TEST_SIZE):
  47. start = i
  48. stop = min(i + BATCH_TEST_SIZE, count)
  49. print('正在测试第', start + 1, '-', stop, '个代理')
  50. test_proxies = self.redis.batch(start, stop)
  51. loop = asyncio.get_event_loop()
  52. tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
  53. loop.run_until_complete(asyncio.wait(tasks))
  54. sys.stdout.flush()
  55. time.sleep(5)
  56. except Exception as e:
  57. print('测试器发生错误', e.args)