schedule.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import time
  2. from multiprocessing import Process
  3. import asyncio
  4. import aiohttp
  5. try:
  6. from aiohttp.errors import ProxyConnectionError
  7. except:
  8. from aiohttp import ClientProxyConnectionError as ProxyConnectionError
  9. from proxypool.db import RedisClient
  10. from proxypool.error import ResourceDepletionError
  11. from proxypool.getter import FreeProxyGetter
  12. from proxypool.setting import *
  13. from asyncio import TimeoutError
  class ValidityTester(object):
      """Test proxies concurrently and store the working ones.

      Each proxy is used to request ``test_api``; a proxy whose response has
      status 200 is handed to ``self._conn.put``.
      """

      test_api = TEST_API

      def __init__(self):
          self._raw_proxies = None
          self._usable_proxies = []
          # Created on the first set_raw_proxies() call and reused afterwards,
          # so the attribute always exists and no client is built per batch.
          self._conn = None

      def set_raw_proxies(self, proxies):
          """Register the batch of proxies that the next ``test()`` call checks.

          :param proxies: iterable of proxies as ``str`` or ``bytes``
              (``host:port`` form is assumed by ``test_single_proxy``).
          """
          self._raw_proxies = proxies
          if self._conn is None:
              self._conn = RedisClient()

      async def test_single_proxy(self, proxy):
          """Test one proxy; if it is valid, put it into the pool.

          :param proxy: proxy address as ``str`` or UTF-8 encoded ``bytes``,
              without the ``http://`` scheme.
          """
          async with aiohttp.ClientSession() as session:
              try:
                  if isinstance(proxy, bytes):
                      proxy = proxy.decode('utf-8')
                  real_proxy = 'http://' + proxy
                  print('Testing', proxy)
                  async with session.get(self.test_api, proxy=real_proxy,
                                         timeout=15) as response:
                      if response.status == 200:
                          self._conn.put(proxy)
                          print('Valid proxy', proxy)
              except (ProxyConnectionError, TimeoutError, ValueError):
                  print('Invalid proxy', proxy)

      def test(self):
          """Run ``test_single_proxy`` for every registered proxy on the event loop.

          Prints ``'Async Error'`` when there is nothing to test (``asyncio.wait``
          raises ``ValueError`` for an empty set of tasks).
          """
          print('ValidityTester is working')
          try:
              try:
                  loop = asyncio.get_event_loop()
              except RuntimeError:
                  # No current loop (e.g. non-main thread or newer Python):
                  # create one and install it for later calls.
                  loop = asyncio.new_event_loop()
                  asyncio.set_event_loop(loop)
              # asyncio.wait() no longer accepts bare coroutine objects on
              # recent Python versions, so wrap each coroutine in a task.
              # A missing batch (None) is treated like an empty one.
              tasks = [loop.create_task(self.test_single_proxy(proxy))
                       for proxy in self._raw_proxies or []]
              loop.run_until_complete(asyncio.wait(tasks))
          except ValueError:
              print('Async Error')
  class PoolAdder(object):
      """Add crawled proxies to the pool until it reaches a size threshold."""

      def __init__(self, threshold):
          """
          :param threshold: pool size (compared against ``queue_len``) at which
              crawling stops.
          """
          self._threshold = threshold
          self._conn = RedisClient()
          self._tester = ValidityTester()
          self._crawler = FreeProxyGetter()

      def is_over_threshold(self):
          """Return True when the pool length has reached the threshold."""
          return self._conn.queue_len >= self._threshold

      def add_to_queue(self):
          """Crawl and test proxies until the pool reaches the threshold.

          Every crawl function of the crawler is run in turn; each crawled batch
          is handed to the tester. Crawling stops as soon as the threshold is
          reached.

          :raises ResourceDepletionError: when a full round over all crawl
              functions ends below the threshold and no raw proxy has been
              crawled so far during this call.
          """
          print('PoolAdder is working')
          # Cumulative over all rounds of this call, not reset per round.
          proxy_count = 0
          while not self.is_over_threshold():
              for callback_label in range(self._crawler.__CrawlFuncCount__):
                  callback = self._crawler.__CrawlFunc__[callback_label]
                  raw_proxies = self._crawler.get_raw_proxies(callback)
                  # test crawled proxies
                  self._tester.set_raw_proxies(raw_proxies)
                  self._tester.test()
                  proxy_count += len(raw_proxies)
                  if self.is_over_threshold():
                      print('IP is enough, waiting to be used')
                      break
              else:
                  # Only reached when the round ended without hitting the
                  # threshold; a pool that just became full must not be
                  # reported as depleted.
                  if proxy_count == 0:
                      raise ResourceDepletionError
  class Schedule(object):
      """Start the two background loops that maintain the proxy pool."""

      @staticmethod
      def valid_proxy(cycle=VALID_CHECK_CYCLE):
          """Re-test half of the proxies stored in Redis, forever.

          On each pass, half of the current queue length is fetched and handed
          to a ValidityTester; then the loop sleeps for ``cycle``.

          :param cycle: pause between passes, passed to ``time.sleep``.
          """
          redis_client = RedisClient()
          validator = ValidityTester()
          while True:
              print('Refreshing ip')
              half = int(0.5 * redis_client.queue_len)
              if half == 0:
                  # Nothing to re-check yet; just wait for the next pass.
                  print('Waiting for adding')
              else:
                  batch = redis_client.get(half)
                  validator.set_raw_proxies(batch)
                  validator.test()
              time.sleep(cycle)

      @staticmethod
      def check_pool(lower_threshold=POOL_LOWER_THRESHOLD,
                     upper_threshold=POOL_UPPER_THRESHOLD,
                     cycle=POOL_LEN_CHECK_CYCLE):
          """Refill the pool whenever it drops below ``lower_threshold``, forever.

          :param lower_threshold: queue length below which a refill is started.
          :param upper_threshold: threshold given to the PoolAdder.
          :param cycle: pause between checks, passed to ``time.sleep``.
          """
          redis_client = RedisClient()
          pool_adder = PoolAdder(upper_threshold)
          while True:
              needs_refill = redis_client.queue_len < lower_threshold
              if needs_refill:
                  pool_adder.add_to_queue()
              time.sleep(cycle)

      def run(self):
          """Launch the validation loop and the pool-check loop in two processes."""
          print('Ip processing running')
          # Build both Process objects first, then start them in order.
          workers = [Process(target=job)
                     for job in (Schedule.valid_proxy, Schedule.check_pool)]
          for worker in workers:
              worker.start()