Browse Source

add sub proxy pool mechanics (#213)

inVains 1 year ago
parent
commit
78b324498b

+ 4 - 1
proxypool/processors/getter.py

@@ -2,7 +2,7 @@ from loguru import logger
 from proxypool.storages.redis import RedisClient
 from proxypool.storages.redis import RedisClient
 from proxypool.setting import PROXY_NUMBER_MAX
 from proxypool.setting import PROXY_NUMBER_MAX
 from proxypool.crawlers import __all__ as crawlers_cls
 from proxypool.crawlers import __all__ as crawlers_cls
-
+from proxypool.testers import __all__ as testers_cls
 
 
 class Getter(object):
 class Getter(object):
     """
     """
@@ -16,6 +16,8 @@ class Getter(object):
         self.redis = RedisClient()
         self.redis = RedisClient()
         self.crawlers_cls = crawlers_cls
         self.crawlers_cls = crawlers_cls
         self.crawlers = [crawler_cls() for crawler_cls in self.crawlers_cls]
         self.crawlers = [crawler_cls() for crawler_cls in self.crawlers_cls]
+        self.testers_cls = testers_cls
+        self.testers = [tester_cls() for tester_cls in self.testers_cls]
 
 
     def is_full(self):
     def is_full(self):
         """
         """
@@ -36,6 +38,7 @@ class Getter(object):
             logger.info(f'crawler {crawler} to get proxy')
             logger.info(f'crawler {crawler} to get proxy')
             for proxy in crawler.crawl():
             for proxy in crawler.crawl():
                 self.redis.add(proxy)
                 self.redis.add(proxy)
+                [self.redis.add(proxy, redis_key=tester.key) for tester in self.testers]
 
 
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':

+ 17 - 4
proxypool/processors/server.py

@@ -1,6 +1,7 @@
 from flask import Flask, g, request
 from flask import Flask, g, request
+from proxypool.exceptions import PoolEmptyException
 from proxypool.storages.redis import RedisClient
 from proxypool.storages.redis import RedisClient
-from proxypool.setting import API_HOST, API_PORT, API_THREADED, API_KEY, IS_DEV
+from proxypool.setting import API_HOST, API_PORT, API_THREADED, API_KEY, IS_DEV, PROXY_RAND_KEY_DEGRADED
 import functools
 import functools
 
 
 __all__ = ['app']
 __all__ = ['app']
@@ -53,10 +54,19 @@ def index():
 @auth_required
 @auth_required
 def get_proxy():
 def get_proxy():
     """
     """
-    get a random proxy
+    get a random proxy; a specific sub-pool can be selected with the (redis) `key` query parameter
+    if PROXY_RAND_KEY_DEGRADED is True, fall back to a random proxy from the universal pool when the sub-pool is empty
     :return: get a random proxy
     :return: get a random proxy
     """
     """
+    key = request.args.get('key')
     conn = get_conn()
     conn = get_conn()
+    # return conn.random(key).string() if key else conn.random().string()
+    if key:
+        try:
+            return conn.random(key).string()
+        except PoolEmptyException:
+            if not PROXY_RAND_KEY_DEGRADED:
+                raise
     return conn.random().string()
     return conn.random().string()
 
 
 
 
@@ -67,8 +77,10 @@ def get_proxy_all():
     get a random proxy
     get a random proxy
     :return: get a random proxy
     :return: get a random proxy
     """
     """
+    key = request.args.get('key')
+
     conn = get_conn()
     conn = get_conn()
-    proxies = conn.all()
+    proxies = conn.all(key) if key else conn.all()
     proxies_string = ''
     proxies_string = ''
     if proxies:
     if proxies:
         for proxy in proxies:
         for proxy in proxies:
@@ -85,7 +97,8 @@ def get_count():
     :return: count, int
     :return: count, int
     """
     """
     conn = get_conn()
     conn = get_conn()
-    return str(conn.count())
+    key = request.args.get('key')
+    return str(conn.count(key)) if key else conn.count()
 
 
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':

+ 28 - 0
proxypool/processors/tester.py

@@ -7,6 +7,7 @@ from proxypool.setting import TEST_TIMEOUT, TEST_BATCH, TEST_URL, TEST_VALID_STA
     TEST_DONT_SET_MAX_SCORE
     TEST_DONT_SET_MAX_SCORE
 from aiohttp import ClientProxyConnectionError, ServerDisconnectedError, ClientOSError, ClientHttpProxyError
 from aiohttp import ClientProxyConnectionError, ServerDisconnectedError, ClientOSError, ClientHttpProxyError
 from asyncio import TimeoutError
 from asyncio import TimeoutError
+from proxypool.testers import __all__ as testers_cls
 
 
 EXCEPTIONS = (
 EXCEPTIONS = (
     ClientProxyConnectionError,
     ClientProxyConnectionError,
@@ -30,6 +31,8 @@ class Tester(object):
         """
         """
         self.redis = RedisClient()
         self.redis = RedisClient()
         self.loop = asyncio.get_event_loop()
         self.loop = asyncio.get_event_loop()
+        self.testers_cls = testers_cls
+        self.testers = [tester_cls() for tester_cls in self.testers_cls]
 
 
     async def test(self, proxy: Proxy):
     async def test(self, proxy: Proxy):
         """
         """
@@ -63,8 +66,33 @@ class Tester(object):
                     else:
                     else:
                         self.redis.decrease(proxy)
                         self.redis.decrease(proxy)
                         logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
                         logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
+                # for each independent tester whose sub-pool (redis key) already holds this proxy, run that tester's extra test and update the proxy's score in that sub-pool
+                for tester in self.testers:
+                    key = tester.key
+                    if self.redis.exists(proxy, key):
+                        test_url = tester.test_url
+                        headers = tester.headers()
+                        cookies = tester.cookies()
+                        async with session.get(test_url, proxy=f'http://{proxy.string()}',
+                                               timeout=TEST_TIMEOUT,
+                                               headers=headers,
+                                               cookies=cookies,
+                                               allow_redirects=False) as response:
+                            resp_text = await response.text()
+                            is_valid = await tester.parse(resp_text, test_url, proxy.string())
+                            if is_valid:
+                                if tester.test_dont_set_max_score:
+                                    logger.info(f'key[{key}] proxy {proxy.string()} is valid, remain current score')
+                                else:
+                                    self.redis.max(proxy, key, tester.proxy_score_max)
+                                    logger.info(f'key[{key}] proxy {proxy.string()} is valid, set max score')
+                            else:
+                                self.redis.decrease(proxy, tester.key, tester.proxy_score_min)
+                                logger.info(f'key[{key}] proxy {proxy.string()} is invalid, decrease score')
+
             except EXCEPTIONS:
             except EXCEPTIONS:
                 self.redis.decrease(proxy)
                 self.redis.decrease(proxy)
+                [self.redis.decrease(proxy, tester.key, tester.proxy_score_min) for tester in self.testers]
                 logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
                 logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
 
 
     @logger.catch
     @logger.catch

+ 2 - 0
proxypool/setting.py

@@ -56,6 +56,8 @@ REDIS_KEY = env.str('PROXYPOOL_REDIS_KEY', env.str(
 PROXY_SCORE_MAX = env.int('PROXY_SCORE_MAX', 100)
 PROXY_SCORE_MAX = env.int('PROXY_SCORE_MAX', 100)
 PROXY_SCORE_MIN = env.int('PROXY_SCORE_MIN', 0)
 PROXY_SCORE_MIN = env.int('PROXY_SCORE_MIN', 0)
 PROXY_SCORE_INIT = env.int('PROXY_SCORE_INIT', 10)
 PROXY_SCORE_INIT = env.int('PROXY_SCORE_INIT', 10)
+# whether to get a universal random proxy if no proxy exists in the sub-pool identified by a specific key
+PROXY_RAND_KEY_DEGRADED = env.bool('TEST_ANONYMOUS', True)
 
 
 # definition of proxy number
 # definition of proxy number
 PROXY_NUMBER_MAX = 50000
 PROXY_NUMBER_MAX = 50000

+ 25 - 25
proxypool/storages/redis.py

@@ -34,7 +34,7 @@ class RedisClient(object):
             self.db = redis.StrictRedis(
             self.db = redis.StrictRedis(
                 host=host, port=port, password=password, db=db, decode_responses=True, **kwargs)
                 host=host, port=port, password=password, db=db, decode_responses=True, **kwargs)
 
 
-    def add(self, proxy: Proxy, score=PROXY_SCORE_INIT) -> int:
+    def add(self, proxy: Proxy, score=PROXY_SCORE_INIT, redis_key=REDIS_KEY) -> int:
         """
         """
         add proxy and set it to init score
         add proxy and set it to init score
         :param proxy: proxy, ip:port, like 8.8.8.8:88
         :param proxy: proxy, ip:port, like 8.8.8.8:88
@@ -44,12 +44,12 @@ class RedisClient(object):
         if not is_valid_proxy(f'{proxy.host}:{proxy.port}'):
         if not is_valid_proxy(f'{proxy.host}:{proxy.port}'):
             logger.info(f'invalid proxy {proxy}, throw it')
             logger.info(f'invalid proxy {proxy}, throw it')
             return
             return
-        if not self.exists(proxy):
+        if not self.exists(proxy, redis_key):
             if IS_REDIS_VERSION_2:
             if IS_REDIS_VERSION_2:
-                return self.db.zadd(REDIS_KEY, score, proxy.string())
-            return self.db.zadd(REDIS_KEY, {proxy.string(): score})
+                return self.db.zadd(redis_key, score, proxy.string())
+            return self.db.zadd(redis_key, {proxy.string(): score})
 
 
-    def random(self) -> Proxy:
+    def random(self, redis_key=REDIS_KEY, proxy_score_min=PROXY_SCORE_MIN, proxy_score_max=PROXY_SCORE_MAX) -> Proxy:
         """
         """
         get random proxy
         get random proxy
         firstly try to get proxy with max score
         firstly try to get proxy with max score
@@ -59,74 +59,74 @@ class RedisClient(object):
         """
         """
         # try to get proxy with max score
         # try to get proxy with max score
         proxies = self.db.zrangebyscore(
         proxies = self.db.zrangebyscore(
-            REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MAX)
+            redis_key, proxy_score_max, proxy_score_max)
         if len(proxies):
         if len(proxies):
             return convert_proxy_or_proxies(choice(proxies))
             return convert_proxy_or_proxies(choice(proxies))
         # else get proxy by rank
         # else get proxy by rank
         proxies = self.db.zrevrange(
         proxies = self.db.zrevrange(
-            REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX)
+            redis_key, proxy_score_min, proxy_score_max)
         if len(proxies):
         if len(proxies):
             return convert_proxy_or_proxies(choice(proxies))
             return convert_proxy_or_proxies(choice(proxies))
         # else raise error
         # else raise error
         raise PoolEmptyException
         raise PoolEmptyException
 
 
-    def decrease(self, proxy: Proxy) -> int:
+    def decrease(self, proxy: Proxy, redis_key=REDIS_KEY, proxy_score_min=PROXY_SCORE_MIN) -> int:
         """
         """
         decrease score of proxy, if small than PROXY_SCORE_MIN, delete it
         decrease score of proxy, if small than PROXY_SCORE_MIN, delete it
         :param proxy: proxy
         :param proxy: proxy
         :return: new score
         :return: new score
         """
         """
         if IS_REDIS_VERSION_2:
         if IS_REDIS_VERSION_2:
-            self.db.zincrby(REDIS_KEY, proxy.string(), -1)
+            self.db.zincrby(redis_key, proxy.string(), -1)
         else:
         else:
-            self.db.zincrby(REDIS_KEY, -1, proxy.string())
-        score = self.db.zscore(REDIS_KEY, proxy.string())
+            self.db.zincrby(redis_key, -1, proxy.string())
+        score = self.db.zscore(redis_key, proxy.string())
         logger.info(f'{proxy.string()} score decrease 1, current {score}')
         logger.info(f'{proxy.string()} score decrease 1, current {score}')
-        if score <= PROXY_SCORE_MIN:
+        if score <= proxy_score_min:
             logger.info(f'{proxy.string()} current score {score}, remove')
             logger.info(f'{proxy.string()} current score {score}, remove')
-            self.db.zrem(REDIS_KEY, proxy.string())
+            self.db.zrem(redis_key, proxy.string())
 
 
-    def exists(self, proxy: Proxy) -> bool:
+    def exists(self, proxy: Proxy, redis_key=REDIS_KEY) -> bool:
         """
         """
         if proxy exists
         if proxy exists
         :param proxy: proxy
         :param proxy: proxy
         :return: if exists, bool
         :return: if exists, bool
         """
         """
-        return not self.db.zscore(REDIS_KEY, proxy.string()) is None
+        return not self.db.zscore(redis_key, proxy.string()) is None
 
 
-    def max(self, proxy: Proxy) -> int:
+    def max(self, proxy: Proxy, redis_key=REDIS_KEY, proxy_score_max=PROXY_SCORE_MAX) -> int:
         """
         """
         set proxy to max score
         set proxy to max score
         :param proxy: proxy
         :param proxy: proxy
         :return: new score
         :return: new score
         """
         """
-        logger.info(f'{proxy.string()} is valid, set to {PROXY_SCORE_MAX}')
+        logger.info(f'{proxy.string()} is valid, set to {proxy_score_max}')
         if IS_REDIS_VERSION_2:
         if IS_REDIS_VERSION_2:
-            return self.db.zadd(REDIS_KEY, PROXY_SCORE_MAX, proxy.string())
-        return self.db.zadd(REDIS_KEY, {proxy.string(): PROXY_SCORE_MAX})
+            return self.db.zadd(redis_key, proxy_score_max, proxy.string())
+        return self.db.zadd(redis_key, {proxy.string(): proxy_score_max})
 
 
-    def count(self) -> int:
+    def count(self, redis_key=REDIS_KEY) -> int:
         """
         """
         get count of proxies
         get count of proxies
         :return: count, int
         :return: count, int
         """
         """
-        return self.db.zcard(REDIS_KEY)
+        return self.db.zcard(redis_key)
 
 
-    def all(self) -> List[Proxy]:
+    def all(self, redis_key=REDIS_KEY, proxy_score_min=PROXY_SCORE_MIN, proxy_score_max=PROXY_SCORE_MAX) -> List[Proxy]:
         """
         """
         get all proxies
         get all proxies
         :return: list of proxies
         :return: list of proxies
         """
         """
-        return convert_proxy_or_proxies(self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX))
+        return convert_proxy_or_proxies(self.db.zrangebyscore(redis_key, proxy_score_min, proxy_score_max))
 
 
-    def batch(self, cursor, count) -> List[Proxy]:
+    def batch(self, cursor, count, redis_key=REDIS_KEY) -> List[Proxy]:
         """
         """
         get batch of proxies
         get batch of proxies
         :param cursor: scan cursor
         :param cursor: scan cursor
         :param count: scan count
         :param count: scan count
         :return: list of proxies
         :return: list of proxies
         """
         """
-        cursor, proxies = self.db.zscan(REDIS_KEY, cursor, count=count)
+        cursor, proxies = self.db.zscan(redis_key, cursor, count=count)
         return cursor, convert_proxy_or_proxies([i[0] for i in proxies])
         return cursor, convert_proxy_or_proxies([i[0] for i in proxies])
 
 
 
 

+ 16 - 0
proxypool/testers/__init__.py

@@ -0,0 +1,16 @@
+import pkgutil
+from .base import BaseTester
+import inspect
+
+
+# load all classes that are subclasses of BaseTester
+classes = []
+for loader, name, is_pkg in pkgutil.walk_packages(__path__):
+    module = loader.find_module(name).load_module(name)
+    for name, value in inspect.getmembers(module):
+        globals()[name] = value
+        if inspect.isclass(value) and issubclass(value, BaseTester) and value is not BaseTester \
+                and not getattr(value, 'ignore', False):
+            classes.append(value)
+__all__ = __ALL__ = classes
+

+ 19 - 0
proxypool/testers/base.py

@@ -0,0 +1,19 @@
+from proxypool.setting import TEST_DONT_SET_MAX_SCORE, PROXY_SCORE_INIT, PROXY_SCORE_MAX, PROXY_SCORE_MIN
+
+
+class BaseTester(object):
+    test_url = ""
+    key = ""
+    test_dont_set_max_score = TEST_DONT_SET_MAX_SCORE
+    proxy_score_init = PROXY_SCORE_INIT
+    proxy_score_max = PROXY_SCORE_MAX
+    proxy_score_min = PROXY_SCORE_MIN
+
+    def headers(self):
+        return None
+
+    def cookies(self):
+        return None
+
+    async def parse(self, html, url, proxy, expr='{"code":0'):
+        return True if expr in html else False