
update pool

Germey, 8 years ago
Commit d5aa999148
11 changed files with 371 additions and 307 deletions
  1. proxypool/api.py  (+6, -10)
  2. proxypool/crawler.py  (+83, -0)
  3. proxypool/db.py  (+83, -32)
  4. proxypool/error.py  (+1, -10)
  5. proxypool/getter.py  (+39, -75)
  6. proxypool/schedule.py  (+0, -134)
  7. proxypool/scheduler.py  (+58, -0)
  8. proxypool/setting.py  (+25, -9)
  9. proxypool/tester.py  (+60, -0)
  10. proxypool/utils.py  (+12, -33)
  11. run.py  (+4, -4)

+ 6 - 10
proxypool/api.py

@@ -8,13 +8,9 @@ app = Flask(__name__)
 
 
 def get_conn():
-    """
-    Opens a new redis connection if there is none yet for the
-    current application context.
-    """
-    if not hasattr(g, 'redis_client'):
-        g.redis_client = RedisClient()
-    return g.redis_client
+    if not hasattr(g, 'redis'):
+        g.redis = RedisClient()
+    return g.redis
 
 
 @app.route('/')
@@ -22,13 +18,13 @@ def index():
     return '<h2>Welcome to Proxy Pool System</h2>'
 
 
-@app.route('/get')
+@app.route('/random')
 def get_proxy():
     """
     Get a proxy
     """
     conn = get_conn()
-    return conn.pop()
+    return conn.random()
 
 
 @app.route('/count')
@@ -37,7 +33,7 @@ def get_counts():
     Get the count of proxies
     """
     conn = get_conn()
-    return str(conn.queue_len)
+    return str(conn.count())
 
 
 if __name__ == '__main__':

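With this change the Flask API serves a random full-score proxy at /random instead of popping one at /get, and /count reports the pool size through the new count() method. A minimal client sketch, assuming the API is reachable on 127.0.0.1:5555 (the API_HOST/API_PORT values set later in setting.py) and the pool is non-empty:

import requests

# Hypothetical client; the base URL assumes the default API_HOST/API_PORT.
BASE = 'http://127.0.0.1:5555'

proxy = requests.get(BASE + '/random').text      # e.g. '123.45.67.89:8080'
count = int(requests.get(BASE + '/count').text)  # number of proxies in the pool
print('got', proxy, 'from a pool of', count)

# The returned string can be plugged straight into requests:
resp = requests.get('http://httpbin.org/get',
                    proxies={'http': 'http://' + proxy},
                    timeout=10)
print(resp.status_code)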
+ 83 - 0
proxypool/crawler.py

@@ -0,0 +1,83 @@
+import json
+
+from .utils import get_page
+from pyquery import PyQuery as pq
+
+
+class ProxyMetaclass(type):
+    def __new__(cls, name, bases, attrs):
+        count = 0
+        attrs['__CrawlFunc__'] = []
+        for k, v in attrs.items():
+            if 'crawl_' in k:
+                attrs['__CrawlFunc__'].append(k)
+                count += 1
+        attrs['__CrawlFuncCount__'] = count
+        return type.__new__(cls, name, bases, attrs)
+
+
+class Crawler(object, metaclass=ProxyMetaclass):
+    def get_proxies(self, callback):
+        proxies = []
+        for proxy in eval("self.{}()".format(callback)):
+            print('成功获取到代理', proxy)
+            proxies.append(proxy)
+        return proxies
+    
+    def crawl_kuaidaili(self):
+        url = 'http://dev.kuaidaili.com/api/getproxy/?orderid=959961765125099&num=100&b_pcchrome=1&b_pcie=1&b_pcff=1&protocol=1&method=1&an_an=1&an_ha=1&quality=1&format=json&sep=2'
+        html = get_page(url)
+        if html:
+            result = json.loads(html)
+            ips = result.get('data').get('proxy_list')
+            for ip in ips:
+                yield ip
+    
+    def crawl_daili66(self, page_count=4):
+        """
+        获取代理66
+        :param page_count:
+        :return:
+        """
+        start_url = 'http://www.66ip.cn/{}.html'
+        urls = [start_url.format(page) for page in range(1, page_count + 1)]
+        for url in urls:
+            print('Crawling', url)
+            html = get_page(url)
+            if html:
+                doc = pq(html)
+                trs = doc('.containerbox table tr:gt(0)').items()
+                for tr in trs:
+                    ip = tr.find('td:nth-child(1)').text()
+                    port = tr.find('td:nth-child(2)').text()
+                    yield ':'.join([ip, port])
+    
+    def crawl_proxy360(self):
+        """
+        获取Proxy360
+        :return:
+        """
+        start_url = 'http://www.proxy360.cn/Region/China'
+        print('Crawling', start_url)
+        html = get_page(start_url)
+        if html:
+            doc = pq(html)
+            lines = doc('div[name="list_proxy_ip"]').items()
+            for line in lines:
+                ip = line.find('.tbBottomLine:nth-child(1)').text()
+                port = line.find('.tbBottomLine:nth-child(2)').text()
+                yield ':'.join([ip, port])
+    
+    def crawl_goubanjia(self):
+        """
+        获取Goubanjia
+        :return:
+        """
+        start_url = 'http://www.goubanjia.com/free/gngn/index.shtml'
+        html = get_page(start_url)
+        if html:
+            doc = pq(html)
+            tds = doc('td.ip').items()
+            for td in tds:
+                td.find('p').remove()
+                yield td.text().replace(' ', '')

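ProxyMetaclass runs at class-creation time: it scans the class attributes, records every method whose name contains crawl_ in __CrawlFunc__, and stores the count in __CrawlFuncCount__, so adding a new proxy source only requires defining another crawl_* generator. A stripped-down sketch of the same technique with hypothetical source names, using getattr() where get_proxies() above uses eval():

# Illustration only; Demo and its crawl_* methods are hypothetical.
class CollectCrawlers(type):
    def __new__(cls, name, bases, attrs):
        funcs = [k for k in attrs if k.startswith('crawl_')]
        attrs['__CrawlFunc__'] = funcs
        attrs['__CrawlFuncCount__'] = len(funcs)
        return type.__new__(cls, name, bases, attrs)

class Demo(metaclass=CollectCrawlers):
    def crawl_source_a(self):
        yield '1.1.1.1:80'

    def crawl_source_b(self):
        yield '2.2.2.2:8080'

demo = Demo()
for name in demo.__CrawlFunc__:            # ['crawl_source_a', 'crawl_source_b']
    for proxy in getattr(demo, name)():    # getattr avoids building a string for eval
        print(name, '->', proxy)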
+ 83 - 32
proxypool/db.py

@@ -1,52 +1,103 @@
 import redis
 from proxypool.error import PoolEmptyError
-from proxypool.setting import HOST, PORT, PASSWORD
+from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY
+from proxypool.setting import MAX_SCORE, MIN_SCORE
+from random import choice
 
 
 class RedisClient(object):
-    def __init__(self, host=HOST, port=PORT):
-        if PASSWORD:
-            self._db = redis.Redis(host=host, port=port, password=PASSWORD)
-        else:
-            self._db = redis.Redis(host=host, port=port)
-
-    def get(self, count=1):
+    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
         """
-        get proxies from redis
+        初始化
+        :param host:
+        :param port:
+        :param password:
         """
-        proxies = self._db.lrange("proxies", 0, count - 1)
-        self._db.ltrim("proxies", count, -1)
-        return proxies
-
-    def put(self, proxy):
+        self.db = redis.StrictRedis(host=host, port=port, password=password)
+    
+    def top(self):
         """
-        add proxy to right top
+        获取排名第一的代理
+        :return:
         """
-        self._db.rpush("proxies", proxy)
-
-    def pop(self):
+        proxies = self.db.zrevrange(REDIS_KEY, 0, 0)
+        if proxies:
+            return proxies[0].decode('utf-8')
+        else:
+            raise PoolEmptyError
+    
+    def add(self, proxy, score=MAX_SCORE):
+        """
+        添加代理,设置分数为最高
+        :param proxy:
+        :param score:
+        :return:
         """
-        get proxy from right.
+        self.db.zadd(REDIS_KEY, score, proxy)
+    
+    def random(self):
         """
-        try:
-            return self._db.rpop("proxies").decode('utf-8')
-        except:
+        随机获取有效代理
+        :return:
+        """
+        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
+        if len(result):
+            return choice(result).decode('utf-8')
+        else:
             raise PoolEmptyError
-
-    @property
-    def queue_len(self):
+    
+    def decrease(self, proxy):
         """
-        get length from queue.
+        代理值减一分,小于最小值则删除
+        :param proxy:
+        :return:
         """
-        return self._db.llen("proxies")
-
-    def flush(self):
+        score = self.db.zscore(REDIS_KEY, proxy)
+        if score and score > MIN_SCORE:
+            self.db.zincrby(REDIS_KEY, proxy, -1)
+            print('代理', proxy, '当前分数', score, '减1')
+        else:
+            self.db.zrem(REDIS_KEY, proxy)
+            print('代理', proxy, '当前分数', score, '移除')
+    
+    def exists(self, proxy):
+        """
+        判断是否存在
+        :param proxy: 
+        :return: 
+        """
+        return not self.db.zscore(REDIS_KEY, proxy) == None
+    
+    def max(self, proxy):
+        """
+        将代理设置为MAX_SCORE
+        :param proxy:
+        :return:
+        """
+        self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)
+    
+    def count(self):
+        """
+        获取数量
+        :return:
+        """
+        return self.db.zcard(REDIS_KEY)
+    
+    def all(self):
         """
-        flush db
+        获取全部代理
+        :return:
         """
-        self._db.flushall()
+        all = self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)
+        return [item.decode('utf-8') for item in all]
 
 
 if __name__ == '__main__':
     conn = RedisClient()
-    print(conn.pop())
+    result = conn.all()
+    print(result)
+    random = conn.random()
+    print('Random', random)
+    top = conn.top()
+    print('Top', top)
+    conn.decrease('a')

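The storage model changes from a Redis list to a sorted set under REDIS_KEY: every proxy carries a score between MIN_SCORE and MAX_SCORE, add() starts it at the top, decrease() knocks one point off after a failed test and removes the proxy once its score falls to MIN_SCORE, and random() only hands out proxies still sitting at MAX_SCORE. One caveat worth hedging: the positional zadd(key, score, member) and zincrby(key, member, amount) calls above match redis-py 2.x; from redis-py 3.0 onward zadd takes a mapping and zincrby takes (name, amount, value), so the equivalent calls would look roughly like this (not part of the commit):

import redis

db = redis.StrictRedis(host='localhost', port=6379, password=None)
db.zadd('proxies', {'127.0.0.1:8888': 100})    # add(proxy, score=MAX_SCORE)
db.zincrby('proxies', -1, '127.0.0.1:8888')    # decrease(proxy)
print(db.zscore('proxies', '127.0.0.1:8888'))  # 99.0
print(db.zcard('proxies'))                     # count()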
+ 1 - 10
proxypool/error.py

@@ -1,16 +1,7 @@
-class ResourceDepletionError(Exception):
-
-    def __init__(self):
-        Exception.__init__(self)
-
-    def __str__(self):
-        return repr('The proxy source is exhausted')
-
-
 class PoolEmptyError(Exception):
 
     def __init__(self):
         Exception.__init__(self)
 
     def __str__(self):
-        return repr('The proxy pool is empty')
+        return repr('代理池已经枯竭')

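PoolEmptyError is now the only custom exception; ResourceDepletionError disappears along with the old PoolAdder. A short usage sketch of how a caller might guard against an empty pool (the fallback is hypothetical, not project code):

from proxypool.db import RedisClient
from proxypool.error import PoolEmptyError

try:
    proxy = RedisClient().random()
except PoolEmptyError:
    proxy = None   # pool is exhausted; wait for the getter to refill it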
+ 39 - 75
proxypool/getter.py

@@ -1,75 +1,39 @@
-from .utils import get_page
-from pyquery import PyQuery as pq
-
-
-class ProxyMetaclass(type):
-    """
-        元类,在FreeProxyGetter类中加入
-        __CrawlFunc__和__CrawlFuncCount__
-        两个参数,分别表示爬虫函数,和爬虫函数的数量。
-    """
-    def __new__(cls, name, bases, attrs):
-        count = 0
-        attrs['__CrawlFunc__'] = []
-        for k, v in attrs.items():
-            if 'crawl_' in k:
-                attrs['__CrawlFunc__'].append(k)
-                count += 1
-        attrs['__CrawlFuncCount__'] = count
-        return type.__new__(cls, name, bases, attrs)
-
-
-class FreeProxyGetter(object, metaclass=ProxyMetaclass):
-
-    def get_raw_proxies(self, callback):
-        proxies = []
-        print('Callback', callback)
-        for proxy in eval("self.{}()".format(callback)):
-            print('Getting', proxy, 'from', callback)
-            proxies.append(proxy)
-        return proxies
-
-    def crawl_daili66(self, page_count=4):
-        start_url = 'http://www.66ip.cn/{}.html'
-        urls = [start_url.format(page) for page in range(1, page_count + 1)]
-        for url in urls:
-            print('Crawling', url)
-            html = get_page(url)
-            if html:
-                doc = pq(html)
-                trs = doc('.containerbox table tr:gt(0)').items()
-                for tr in trs:
-                    ip = tr.find('td:nth-child(1)').text()
-                    port = tr.find('td:nth-child(2)').text()
-                    yield ':'.join([ip, port])
-
-    def crawl_proxy360(self):
-        start_url = 'http://www.proxy360.cn/Region/China'
-        print('Crawling', start_url)
-        html = get_page(start_url)
-        if html:
-            doc = pq(html)
-            lines = doc('div[name="list_proxy_ip"]').items()
-            for line in lines:
-                ip = line.find('.tbBottomLine:nth-child(1)').text()
-                port = line.find('.tbBottomLine:nth-child(2)').text()
-                yield ':'.join([ip, port])
-
-    def crawl_goubanjia(self):
-        start_url = 'http://www.goubanjia.com/free/gngn/index.shtml'
-        html = get_page(start_url)
-        if html:
-            doc = pq(html)
-            tds = doc('td.ip').items()
-            for td in tds:
-                td.find('p').remove()
-                yield td.text().replace(' ', '')
-
-    def crawl_haoip(self):
-        start_url = 'http://haoip.cc/tiqu.htm'
-        html = get_page(start_url)
-        if html:
-            doc = pq(html)
-            results = doc('.row .col-xs-12').html().split('<br/>')
-            for result in results:
-                if result: yield result.strip()
+from proxypool.tester import Tester
+from proxypool.db import RedisClient
+from proxypool.crawler import Crawler
+from proxypool.setting import *
+
+
+class Getter():
+    def __init__(self):
+        self.redis = RedisClient()
+        self.tester = Tester()
+        self.crawler = Crawler()
+    
+    def is_over_threshold(self):
+        """
+        判断是否达到了代理池限制
+        """
+        if self.redis.count() >= POOL_UPPER_THRESHOLD:
+            return True
+        else:
+            return False
+    
+    def run(self):
+        print('获取器开始执行')
+        proxy_count = 0
+        while not self.is_over_threshold():
+            for callback_label in range(self.crawler.__CrawlFuncCount__):
+                callback = self.crawler.__CrawlFunc__[callback_label]
+                # 获取代理
+                proxies = self.crawler.get_proxies(callback)
+                # 设置代理并测试
+                self.tester.set_proxies(proxies)
+                self.tester.run()
+                proxy_count += len(proxies)
+                if self.is_over_threshold():
+                    print('代理池已满,暂停抓取')
+                    break
+            if proxy_count == 0:
+                # 代理池枯竭
+                print('代理池已枯竭')

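Getter.run() walks every registered crawl function via __CrawlFuncCount__ and __CrawlFunc__, feeds each batch straight into the Tester, and stops crawling once the pool reaches POOL_UPPER_THRESHOLD; where the old PoolAdder raised ResourceDepletionError, an empty crawl is now only logged. A minimal driver sketch, assuming a reachable local Redis and the settings from proxypool/setting.py:

from proxypool.getter import Getter

if __name__ == '__main__':
    getter = Getter()
    if not getter.is_over_threshold():   # only crawl while the pool has room
        getter.run()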
+ 0 - 134
proxypool/schedule.py

@@ -1,134 +0,0 @@
-import time
-from multiprocessing import Process
-import asyncio
-import aiohttp
-try:
-    from aiohttp.errors import ProxyConnectionError
-except:
-    from aiohttp import  ClientProxyConnectionError as ProxyConnectionError
-from proxypool.db import RedisClient
-from proxypool.error import ResourceDepletionError
-from proxypool.getter import FreeProxyGetter
-from proxypool.setting import *
-from asyncio import TimeoutError
-
-
-class ValidityTester(object):
-    test_api = TEST_API
-
-    def __init__(self):
-        self._raw_proxies = None
-        self._usable_proxies = []
-
-    def set_raw_proxies(self, proxies):
-        self._raw_proxies = proxies
-        self._conn = RedisClient()
-
-    async def test_single_proxy(self, proxy):
-        """
-        text one proxy, if valid, put them to usable_proxies.
-        """
-        async with aiohttp.ClientSession() as session:
-            try:
-                if isinstance(proxy, bytes):
-                    proxy = proxy.decode('utf-8')
-                real_proxy = 'http://' + proxy
-                print('Testing', proxy)
-                async with session.get(self.test_api, proxy=real_proxy, timeout=15) as response:
-                    if response.status == 200:
-                        self._conn.put(proxy)
-                        print('Valid proxy', proxy)
-            except (ProxyConnectionError, TimeoutError, ValueError):
-                print('Invalid proxy', proxy)
-
-    def test(self):
-        """
-        aio test all proxies.
-        """
-        print('ValidityTester is working')
-        try:
-            loop = asyncio.get_event_loop()
-            tasks = [self.test_single_proxy(proxy) for proxy in self._raw_proxies]
-            loop.run_until_complete(asyncio.wait(tasks))
-        except ValueError:
-            print('Async Error')
-
-
-class PoolAdder(object):
-    """
-    add proxy to pool
-    """
-
-    def __init__(self, threshold):
-        self._threshold = threshold
-        self._conn = RedisClient()
-        self._tester = ValidityTester()
-        self._crawler = FreeProxyGetter()
-
-    def is_over_threshold(self):
-        """
-        judge if count is overflow.
-        """
-        if self._conn.queue_len >= self._threshold:
-            return True
-        else:
-            return False
-
-    def add_to_queue(self):
-        print('PoolAdder is working')
-        proxy_count = 0
-        while not self.is_over_threshold():
-            for callback_label in range(self._crawler.__CrawlFuncCount__):
-                callback = self._crawler.__CrawlFunc__[callback_label]
-                raw_proxies = self._crawler.get_raw_proxies(callback)
-                # test crawled proxies
-                self._tester.set_raw_proxies(raw_proxies)
-                self._tester.test()
-                proxy_count += len(raw_proxies)
-                if self.is_over_threshold():
-                    print('IP is enough, waiting to be used')
-                    break
-            if proxy_count == 0:
-                raise ResourceDepletionError
-
-
-class Schedule(object):
-    @staticmethod
-    def valid_proxy(cycle=VALID_CHECK_CYCLE):
-        """
-        Get half of proxies which in redis
-        """
-        conn = RedisClient()
-        tester = ValidityTester()
-        while True:
-            print('Refreshing ip')
-            count = int(0.5 * conn.queue_len)
-            if count == 0:
-                print('Waiting for adding')
-                time.sleep(cycle)
-                continue
-            raw_proxies = conn.get(count)
-            tester.set_raw_proxies(raw_proxies)
-            tester.test()
-            time.sleep(cycle)
-
-    @staticmethod
-    def check_pool(lower_threshold=POOL_LOWER_THRESHOLD,
-                   upper_threshold=POOL_UPPER_THRESHOLD,
-                   cycle=POOL_LEN_CHECK_CYCLE):
-        """
-        If the number of proxies less than lower_threshold, add proxy
-        """
-        conn = RedisClient()
-        adder = PoolAdder(upper_threshold)
-        while True:
-            if conn.queue_len < lower_threshold:
-                adder.add_to_queue()
-            time.sleep(cycle)
-
-    def run(self):
-        print('Ip processing running')
-        valid_process = Process(target=Schedule.valid_proxy)
-        check_process = Process(target=Schedule.check_pool)
-        valid_process.start()
-        check_process.start()

+ 58 - 0
proxypool/scheduler.py

@@ -0,0 +1,58 @@
+import time
+from multiprocessing import Process
+from proxypool.api import app
+from proxypool.getter import Getter
+from proxypool.tester import Tester
+from proxypool.db import RedisClient
+from proxypool.setting import *
+
+class Scheduler(object):
+    def schedule_tester(cycle=TESTER_CYCLE):
+        """
+        定时测试代理
+        """
+        redis = RedisClient()
+        tester = Tester()
+        while True:
+            print('测试器开始运行')
+            count = redis.count()
+            if count == 0:
+                print('代理池已枯竭,等待添加代理')
+                time.sleep(cycle)
+                continue
+            proxies = redis.all()
+            tester.set_proxies(proxies)
+            print('开始检测全部代理')
+            tester.run()
+            time.sleep(cycle)
+    
+    def schedule_getter(cycle=GETTER_CYCLE):
+        """
+        定时获取代理
+        """
+        while True:
+            print('开始抓取代理')
+            getter = Getter()
+            getter.run()
+            time.sleep(cycle)
+    
+    def schedule_api(self):
+        """
+        开启API
+        """
+        app.run(API_HOST, API_PORT)
+    
+    def run(self):
+        print('代理池开始运行')
+        
+        if TESTER_ENABLED:
+            tester_process = Process(target=self.schedule_tester)
+            tester_process.start()
+        
+        if GETTER_ENABLED:
+            getter_process = Process(target=self.schedule_getter)
+            getter_process.start()
+        
+        if API_ENABLED:
+            api_process = Process(target=self.schedule_api)
+            api_process.start()

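The new Scheduler starts up to three independent processes, one per concern (testing, crawling, serving the API), each gated by a switch in setting.py. The same pattern in isolation, with a hypothetical worker standing in for the real tester/getter loops:

import time
from multiprocessing import Process

def heartbeat(cycle=2):
    # Hypothetical stand-in for schedule_tester/schedule_getter: an endless,
    # periodically sleeping loop run in its own process.
    while True:
        print('worker alive')
        time.sleep(cycle)

if __name__ == '__main__':
    p = Process(target=heartbeat)
    p.start()          # returns immediately; the loop keeps running on its own
    time.sleep(5)
    p.terminate()      # the real scheduler leaves its processes running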
+ 25 - 9
proxypool/setting.py

@@ -1,17 +1,33 @@
 # Redis数据库的地址和端口
-HOST = 'localhost'
-PORT = 6379
+REDIS_HOST = 'localhost'
+REDIS_PORT = 6379
 
 # 如果Redis有密码,则添加这句密码,否则设置为None
-PASSWORD = 'foobared'
+REDIS_PASSWORD = 'foobared'
+REDIS_KEY = 'proxies'
+
+# 代理分数
+MAX_SCORE = 100
+MIN_SCORE = 0
+
+VALID_STATUS_CODES = [200]
 
 # 代理池数量界限
-POOL_LOWER_THRESHOLD = 10
-POOL_UPPER_THRESHOLD = 100
+POOL_UPPER_THRESHOLD = 10000
 
 # 检查周期
-VALID_CHECK_CYCLE = 60
-POOL_LEN_CHECK_CYCLE = 20
+TESTER_CYCLE = 30
+# 获取周期
+GETTER_CYCLE = 20
+
+# 测试API,建议抓哪个网站测哪个
+TEST_API = 'https://m.weibo.cn/api/container/getIndex?type=uid&value=2145291155&containerid=1076032145291155&page=14'
+
+# API配置
+API_HOST = '0.0.0.0'
+API_PORT = 5555
 
-# 测试API,用百度来测试
-TEST_API='http://www.baidu.com'
+# 开关
+TESTER_ENABLED = True
+GETTER_ENABLED = True
+API_ENABLED = True

+ 60 - 0
proxypool/tester.py

@@ -0,0 +1,60 @@
+import asyncio
+import aiohttp
+
+try:
+    from aiohttp import ClientError
+except:
+    from aiohttp import ClientProxyConnectionError as ProxyConnectionError
+from proxypool.db import RedisClient
+from proxypool.setting import *
+
+
+class Tester(object):
+    def __init__(self):
+        self.proxies = None
+        self.redis = RedisClient()
+    
+    def set_proxies(self, proxies):
+        """
+        设置代理
+        :param proxies:
+        :return:
+        """
+        self.proxies = proxies
+    
+    async def test_single_proxy(self, proxy):
+        """
+        测试单个代理
+        :param proxy: 
+        :return: 
+        """
+        async with aiohttp.ClientSession() as session:
+            try:
+                if isinstance(proxy, bytes):
+                    proxy = proxy.decode('utf-8')
+                real_proxy = 'http://' + proxy
+                print('正在测试', proxy)
+                async with session.get(TEST_API, proxy=real_proxy, timeout=15) as response:
+                    if response.status in VALID_STATUS_CODES:
+                        self.redis.add(proxy)
+                        print('代理可用', proxy)
+                    else:
+                        self.redis.decrease(proxy)
+                        print('请求响应码不合法,IP', proxy)
+            except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError):
+                if self.redis.exists(proxy):
+                    self.redis.decrease(proxy)
+                print('代理请求失败', proxy)
+    
+    def run(self):
+        """
+        测试主函数
+        :return:
+        """
+        print('测试器开始运行')
+        try:
+            loop = asyncio.get_event_loop()
+            tasks = [self.test_single_proxy(proxy) for proxy in self.proxies]
+            loop.run_until_complete(asyncio.wait(tasks))
+        except Exception as e:
+            print('测试器发生错误', e.args)

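Tester.test_single_proxy() routes one request through each proxy against TEST_API: a status code listed in VALID_STATUS_CODES pushes the proxy back to MAX_SCORE via add(), anything else or a connection error calls decrease(). A standalone sketch of the same aiohttp pattern that can be tried without Redis (the proxy list and test URL are placeholders):

import asyncio
import aiohttp

TEST_URL = 'http://httpbin.org/get'                 # placeholder target
PROXIES = ['127.0.0.1:8888', '10.0.0.1:3128']       # hypothetical proxies

async def check(proxy):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(TEST_URL, proxy='http://' + proxy,
                                   timeout=15) as resp:
                return proxy, resp.status == 200
    except Exception:
        return proxy, False

loop = asyncio.get_event_loop()
results = loop.run_until_complete(
    asyncio.gather(*[check(p) for p in PROXIES]))
for proxy, ok in results:
    print(proxy, 'valid' if ok else 'invalid')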
+ 12 - 33
proxypool/utils.py

@@ -1,6 +1,4 @@
 import requests
-import asyncio
-import aiohttp
 from requests.exceptions import ConnectionError
 
 base_headers = {
@@ -11,38 +9,19 @@ base_headers = {
 
 
 def get_page(url, options={}):
+    """
+    抓取代理
+    :param url:
+    :param options:
+    :return:
+    """
     headers = dict(base_headers, **options)
-    print('Getting', url)
+    print('正在抓取', url)
     try:
-        r = requests.get(url, headers=headers)
-        print('Getting result', url, r.status_code)
-        if r.status_code == 200:
-            return r.text
+        response = requests.get(url, headers=headers)
+        print('抓取成功', url, response.status_code)
+        if response.status_code == 200:
+            return response.text
     except ConnectionError:
-        print('Crawling Failed', url)
+        print('抓取失败', url)
         return None
-
-
-class Downloader(object):
-    """
-    一个异步下载器,可以对代理源异步抓取,但是容易被BAN。
-    """
-
-    def __init__(self, urls):
-        self.urls = urls
-        self._htmls = []
-
-    async def download_single_page(self, url):
-        async with aiohttp.ClientSession() as session:
-            async with session.get(url) as resp:
-                self._htmls.append(await resp.text())
-
-    def download(self):
-        loop = asyncio.get_event_loop()
-        tasks = [self.download_single_page(url) for url in self.urls]
-        loop.run_until_complete(asyncio.wait(tasks))
-
-    @property
-    def htmls(self):
-        self.download()
-        return self._htmls

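get_page() is now a plain requests wrapper: per-call headers passed through options are merged into base_headers, and the body is returned only on a 200 response, otherwise the call ends up returning None. Usage sketch (the Referer value is just a placeholder):

from proxypool.utils import get_page

html = get_page('http://www.66ip.cn/1.html',
                options={'Referer': 'http://www.66ip.cn/'})
if html:
    print('fetched', len(html), 'characters')
else:
    print('request failed or returned a non-200 status')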
+ 4 - 4
run.py

@@ -1,11 +1,11 @@
 from proxypool.api import app
-from proxypool.schedule import Schedule
+from proxypool.scheduler import Scheduler
+
 
 def main():
-    s = Schedule()
+    s = Scheduler()
     s.run()
-    app.run()
+
 
 if __name__ == '__main__':
     main()
-