123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
import asyncio
import time
from asyncio import TimeoutError
from multiprocessing import Process

import aiohttp

# aiohttp < 2.x exposed the proxy error under aiohttp.errors; newer
# versions renamed it.  Alias it so the rest of the module is agnostic.
try:
    from aiohttp.errors import ProxyConnectionError
except ImportError:
    from aiohttp import ClientProxyConnectionError as ProxyConnectionError

from proxypool.db import RedisClient
from proxypool.error import ResourceDepletionError
from proxypool.getter import FreeProxyGetter
from proxypool.setting import *
class ValidityTester(object):
    """Asynchronously test raw proxies and store the usable ones in Redis."""

    # URL fetched through each candidate proxy; HTTP 200 means usable.
    test_api = TEST_API

    def __init__(self):
        self._raw_proxies = None      # batch of candidates awaiting a check
        self._usable_proxies = []

    def set_raw_proxies(self, proxies):
        """Load a batch of candidate proxies and (re)open the Redis connection."""
        self._raw_proxies = proxies
        self._conn = RedisClient()

    async def test_single_proxy(self, proxy):
        """
        Test one proxy; if valid, put it into the usable pool.
        """
        async with aiohttp.ClientSession() as session:
            try:
                if isinstance(proxy, bytes):
                    # Redis returns bytes; normalize to str before use.
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                print('Testing', proxy)
                async with session.get(self.test_api, proxy=real_proxy, timeout=15) as response:
                    if response.status == 200:
                        self._conn.put(proxy)
                        print('Valid proxy', proxy)
            except (ProxyConnectionError, TimeoutError, ValueError):
                print('Invalid proxy', proxy)

    def test(self):
        """
        Asynchronously test all raw proxies currently loaded.
        """
        print('ValidityTester is working')
        if not self._raw_proxies:
            # Nothing to test; asyncio.wait() raises ValueError on an
            # empty task set, which the handler below would mis-report.
            return
        try:
            loop = asyncio.get_event_loop()
            # Wrap coroutines explicitly: passing bare coroutines to
            # asyncio.wait is deprecated since 3.8 and rejected on 3.11+.
            tasks = [asyncio.ensure_future(self.test_single_proxy(proxy))
                     for proxy in self._raw_proxies]
            loop.run_until_complete(asyncio.wait(tasks))
        except ValueError:
            print('Async Error')
class PoolAdder(object):
    """
    Crawl free proxies and add validated ones to the pool until it
    reaches the configured upper threshold.
    """

    def __init__(self, threshold):
        # Maximum number of proxies the pool should hold.
        self._threshold = threshold
        self._conn = RedisClient()
        self._tester = ValidityTester()
        self._crawler = FreeProxyGetter()

    def is_over_threshold(self):
        """
        Return True when the pool already holds at least the threshold count.
        """
        return self._conn.queue_len >= self._threshold

    def add_to_queue(self):
        """
        Keep crawling and validating proxies until the pool is full.

        Raises:
            ResourceDepletionError: a full pass over every crawler produced
                no raw proxies at all, i.e. the free sources are exhausted.
        """
        print('PoolAdder is working')
        while not self.is_over_threshold():
            proxy_count = 0  # proxies gathered during this crawl pass
            for callback_label in range(self._crawler.__CrawlFuncCount__):
                callback = self._crawler.__CrawlFunc__[callback_label]
                raw_proxies = self._crawler.get_raw_proxies(callback)
                # Validate the crawled batch before it enters the pool.
                self._tester.set_raw_proxies(raw_proxies)
                self._tester.test()
                proxy_count += len(raw_proxies)
                if self.is_over_threshold():
                    print('IP is enough, waiting to be used')
                    break
            # Check per pass (the original checked only after the while
            # loop, which raised spuriously when the pool was full on
            # entry and looped forever when every crawler came up empty).
            if proxy_count == 0:
                raise ResourceDepletionError
class Schedule(object):
    """Drives the two maintenance loops of the proxy pool."""

    @staticmethod
    def valid_proxy(cycle=VALID_CHECK_CYCLE):
        """
        Periodically re-test half of the proxies stored in Redis.
        """
        redis = RedisClient()
        checker = ValidityTester()
        while True:
            print('Refreshing ip')
            batch = int(0.5 * redis.queue_len)
            if not batch:
                # Pool is empty; wait for the adder to fill it.
                print('Waiting for adding')
                time.sleep(cycle)
                continue
            checker.set_raw_proxies(redis.get(batch))
            checker.test()
            time.sleep(cycle)

    @staticmethod
    def check_pool(lower_threshold=POOL_LOWER_THRESHOLD,
                   upper_threshold=POOL_UPPER_THRESHOLD,
                   cycle=POOL_LEN_CHECK_CYCLE):
        """
        Top the pool up whenever it falls below the lower threshold.
        """
        redis = RedisClient()
        filler = PoolAdder(upper_threshold)
        while True:
            if redis.queue_len < lower_threshold:
                filler.add_to_queue()
            time.sleep(cycle)

    def run(self):
        """Launch both maintenance loops, each in its own process."""
        print('Ip processing running')
        refresher = Process(target=Schedule.valid_proxy)
        pool_filler = Process(target=Schedule.check_pool)
        refresher.start()
        pool_filler.start()
|