您的位置:首页 >新闻资讯 > 正文

如何实现增强版的代理IP池?

来源:互联网 作者:admin 时间:2019-09-02 15:35:48

如何实现增强版的代理IP池?之前实现了一个版本的代理IP池:爬虫代理IP池的实现。整个过程和设计思路都是我自己想出来的,实际用下来效果还可以,但是有一些不是很如意的地方,比如说:


如何实现增强版的代理IP池?


用起来很麻烦。如果你要在其他程序里使用这个代理代码,你需要不断复制粘贴proxy.py,同时Python环境要支持sqlite3

刚初始化的时候IP的质量还不错,但是如果隔一天在跑,原本在数据库里的IP很多都不能用了(但是仍会被标记为有效的),这样就会导致第二天用的时候前几个IP质量很差。

IP质量的鉴别,我后来才知道有时候IP不能用可能只是临时的,应该再给它一次机会。

代理是爬虫必不可少的一部分,自然一个能够高效、稳定提供高质量的代理IP池就是爬虫工作的刚需了。我就开始想,一个“代理IP池”的理想状态究竟应该是什么样?它至少要满足两个条件:


用起来要方便,最好在代码的任何地方,只要调用一个接口就行,没有任何限制条件。

这个代理池要稳定,每时每刻都要有合理数量的有效IP在池里。

后来我就想不如把它做成一个类似于“服务”的程序:


正好手头有一个AWS的EC2,装了Ubuntu 16.04.2 LTS, Python 3.5, 可以当作服务器用用

程序维护一个大概20个左右的IP池, 起一个crontab,保证这个程序不会挂

程序采用多进程:

获取IP,如果要从各种网站抓,那这里可能要再分配几个线程;我这里直接用代理网站的接口,一条足以。

校验IP池里IP的有效性,这个要多几条线程,线程的数量取决于校验的速度(异步IO?)。

提供IP的接口,这里我本来想做个socket服务端,后来看网上说可以用Flask做成一个轻量的web服务器,之后用GET请求就可以了。

定时清理没用的IP

Web端再做一个app,用来返回请求的IP地址,供判断代理是否生效。

代码细节

如何评估IP的质量

之前的版本对IP的评估过于草率,因为一个如果一个IP一次校验的时候失败了,这并不代表这个IP是无效的,它可能只是这次请求没成功,下次说不定就成功了。因此,最快能想到的评估IP质量的方法就是计分,每个IP都有一个基础分,如果校验成功则分数+1,失败则分数-1。


POOL_IP_QUALITY配置项决定了IP的质量,它有HIGH,MEDIUM,LOW几个级别,默认情况下IP质量等级是MEDIUM。如果cleaner发现一个IP的分数低于MEDIUM的值,那么这个IP就会被认为是无效的,会被清除掉。IP质量级别越高,分数阈值也越高,理论上,质量级别越高,IP越难找。


webapi会提供一个请求,返回当前进程池里分数最高的一个IP,同时这个IP的分数会-1。这样做的目的是避免每次都返回这个IP,给其他IP一个机会。


存储IP方面,我最开始是想用Redis数据库的集合Set做的,后来无意中瞄到了set下面的有序set。有序set不仅是有序的,而且还自带一个score属性,简直是为这个项目量身定做的数据类型。


其二,使用集合可以避免添加重复的IP。


其三,Redis数据库是一个独立运行的数据库,也许这就是数据持久化。


还有其四,按照一般的多进程思路,因为我使用的是一个全局Redis连接,那么多个进程竞争的话,必须要加上锁。但是我查了一下Redis的文档,上面说Redis的操作都是原子操作,所以锁就可以省去了。(我不确定这么理解对不对,我写完之后跑了大概一天,代码没出问题。)


proxy.py

主进程,会产生四条子进程。


采用Flask实现轻量级WebAPI

具体参考下文的webapi.py


补充IP到进程池 - retriever

retriever会按照配置项中RETRIEVER_INTERVAL的大小定时检查进程池,如果发现进程池里的IP数量小于POOL_SIZE就会从IP源请求足够的IP来填满进程池。IP源可以是自己写爬虫抓取的免费IP,也可以自己花钱买,方式不重要。


def fn_retriever():

    global redis_clent

    fn_name = sys._getframe().f_code.co_name

    while True:

        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Routine Retriever Start.")

        cnt = redis_client.zcard(Configs.REDIS_KEY)

        ip_cnt_needed = Configs.POOL_SIZE - redis_client.zcard(Configs.REDIS_KEY)


        if ip_cnt_needed <= 0:

            Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Enough IP(s) in Redis.")

            Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Entering Sleep: {0:d}s.".format(Configs.RETRIEVER_INTERVAL))

            time.sleep(Configs.RETRIEVER_INTERVAL)

            continue


        tid = Configs.DAXIANG_RPOXY_ORDERID

        url = "http://tvp.daxiangdaili.com/ip/?tid={0:s}&num={1:d}&delay=5&category=2&sortby=time&filter=on&format=json&protocol=https".format(tid, ip_cnt_needed)


        try:

            response = requests.get(url)

            content = None


            if response.status_code == requests.codes.ok:

                content = response.text

        except Exception as e:

            print (e)


        res_json = ast.literal_eval(content.strip())


        if res_json:

            for addr in res_json:

                if addr.get('error'):

                    continue

                redis_client.zadd(Configs.REDIS_KEY, Configs.MEDIUM, '{0:s}:{1:d}'.format(addr.get('host'), addr.get('port')))


        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Refill {0:d} IP(s) to Redis.".format(ip_cnt_needed))

校验进程池里的IP的有效性 - validator

validator会每隔VALIDATOR_INTERVAL秒检查一下次当前进程池里分数低于POOL_IP_QUALITY的IP,为什么是这个值?因为之后cleaner会把所有分数低于POOL_IP_QUALITY 的IP清理掉,如果IP的分数高于POOL_IP_QUALITY,那么即使校验失败分数也不会因为低于阈值而被清理掉;同理,如果IP的分数低于POOL_IP_QUALITY,即使校验成功仍然会因为分数低于阈值而被清理掉;只有分数等于POOL_IP_QUALITY的IP,还有一次翻身的机会。


这里为什么不检查所有的IP?目的就是为了加速校验这个过程。当然这里也可以使用多线程或异步IO进行全局IP校验,这肯定是可以的。不过我的进程池很小,我就没这么做。当然如果进程池要很大的话,我个人推荐直接去购买相应的代理IP服务。


判断一个IP是否有效,只需要用它访问一个页面,检查返回码是否为200即可。另外,我在webapi里提供了一个/myIP地址,访问的话,它会返回一个页面内容为请求头中Remote Address字段的值。


def __validation(addr):

    proxies = {

        "http": "http://{0}".format(addr),

        "https": "http://{0}".format(addr)

    }


    header = {}

    header['user-agent'] = choice(Configs.FakeUserAgents)


    try:

        response = requests.get("http://52.206.77.228:5000/myIP", headers=header, proxies=proxies, timeout=5)

    except Exception as e:

        return False

    else:

        if response.status_code == requests.codes.ok:

            #print (response.text)

            return True


def fn_validator():

    global redis_clent

    fn_name = sys._getframe().f_code.co_name

    while True:

        # Test all ips whose score >= POOL_IP_QUALITY - 1

        # False ==> score - 1

        # True  ==> score + 1 

        maxscore = redis_client.zrange(Configs.REDIS_KEY, 0, 0, desc=True, withscores=True)

        if not maxscore:

            Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Pool is empty. Entering Sleep: {0:d}s.".format(Configs.VALIDATOR_INTERVAL))

            time.sleep(Configs.VALIDATOR_INTERVAL)

            continue


        maxscore = maxscore[0][1]

        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Max score in this round: {0:d}.".format(int(maxscore)))

        res = redis_client.zrangebyscore(Configs.REDIS_KEY, Configs.POOL_IP_QUALITY - 1, maxscore)

        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Start to Validate {0:d} IP(s).".format(len(res)))


        increment = []


        i = 0

        for ip in res:

            n = 1 if __validation(ip.decode('utf-8')) else -1

            increment.append([ip, n])

            Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "[{0:d}]Validated[{1:s}], Result:[{2:d}].".format(i, ip.decode('utf-8'), n))

            i += 1

            

        for inc in increment:

            redis_client.zincrby(Configs.REDIS_KEY, inc[0], inc[1])


        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Validation finished. Entering Sleep: {0:d}s.".format(Configs.VALIDATOR_INTERVAL))

        time.sleep(Configs.VALIDATOR_INTERVAL)

清除进程池里无效的IP - cleaner

如上文所述,cleaner会每隔CLEANER_INTERVAL秒清理一次进程池。


def fn_cleaner():

    global redis_clent

    fn_name = sys._getframe().f_code.co_name

    while True:

        # Remove all ips whose score < POOL_IP_QUALITY

        res = redis_client.zremrangebyscore(Configs.REDIS_KEY, -1, Configs.POOL_IP_QUALITY - 1)

        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Remove {0:d} IP(s) from Redis.".format(res))

        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Entering Sleep: {0:d}s.".format(Configs.CLEANER_INTERVAL))

        time.sleep(Configs.CLEANER_INTERVAL)

webapi.py - Flask

除了/myIP, webapi还提供了一个简单的欢迎页面和/getIP页面,后者会返回当前进程池里分数最高的IP,格式为IP:PORT的字符串,可以直接使用,非常方便。


[图片上传失败...(image-c5b5f9-1532102907537)]


[图片上传失败...(image-e95048-1532102907537)]


from flask import Flask

from flask import request

import redis


import proxy as Proxy

import configure as Configs


app = Flask("__name__")


@app.route('/')

def welcome():

    strr = '''

        <h3>Welcome to IP proxy pooling</h3><br>

        Access <strong>/myIP</strong> to retrieve 'remote address' header in GET request. For IP validation.<br>

        Access <strong>/getIP</strong> to retrieve an Proxy IP, format: IP:PORT<br>

        <p>Ethan Huang, <a href="https://journal.ethanshub.com/">https://journal.ethanshub.com/</a></p>

    '''

    return strr


@app.route("/myIP", methods=["GET"])

def myIP():

    return request.remote_addr


@app.route("/getIP")

def getIP():

    return Proxy.get_one_ip()


if __name__ == '__main__':

    app.run('0.0.0.0',5000)

logger

除了核心的部分,我还顺便实现了一个logger模块,它可以接受其他模块传来的日志,并根据配置项LOG_TYPE决定日志是作为标准输出至屏幕,还是写入文件。


日志分为三个级别LOG_LEVEL_FINE, LOG_LEVEL_FINER, LOG_LEVEL_FINEST,由产生日志的模块自行选择,但是配置项LOG_LEVEL决定输出哪个级别以该级别以上的日志,默认为LOG_LEVEL_FINE。


import time

import configure as Configs


def log(level, module_name, content):

    file = open(Configs.LOG_FILE, 'a', encoding='utf-8')

    #global file

    if Configs.LOG_TYPE == 0:

        fn = print

    elif Configs.LOG_TYPE == 1:

        fn = file.write

    else:

        fn = print


    strr = "[{0:d}][{1:s}] --- {2:12s} --- {3:s}".format(level, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), module_name, content)

    

    if level >= Configs.LOG_LEVEL:

        if fn == file.write:

            strr = strr + '\n'


        fn(strr)


    file.close()

写完这个,我发现Python竟然有logging模块,惊了个呆。反正我这个也能用,就先用着。


configure.py

配置项


# Proxy related

DAXIANG_RPOXY_ORDERID = '大象代理的订单号'

POOL_SIZE = 10

HIGH, MEDIUM, LOW = 8, 5, 2

POOL_IP_QUALITY = MEDIUM

VALIDATOR_THREAD_NUM = 1

RETRIEVER_INTERVAL = 60

VALIDATOR_INTERVAL = 60

CLEANER_INTERVAL = 60


# Web API

API_ADDR = '0.0.0.0'

API_PORT = '5000'


# Redis

REDIS_HOST = '52.206.77.228' # 这是我的服务器的公网IP

REDIS_PORT = '6379'

REDIS_KEY  = 'PROXYS'


# Logger

# 0 - Standard Output 1 - file

LOG_TYPE = 0

LOG_FILE = 'ohIPPool.log'


LOG_LEVEL_FINE = 0

LOG_LEVEL_FINER = 1

LOG_LEVEL_FINEST = 2

LOG_LEVEL = LOG_LEVEL_FINE

最后把代码部署到服务器上即可,像AWS的EC2需要额外配置安全组,其他就和本地一样了。还有一点就是Ubuntu一般会有两个甚至多个Python版本共存,执行的时候要注意使用相对应版本的python和pip。具体就不说了。


相关文章内容简介