日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

Python使用Redis实现IP代理池

發布時間:2023/11/28 生活经验 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python使用Redis实现IP代理池 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

可以使用快代理,芝麻代理,蘑菇代理?,訊代理等代理商提供API代理IP或者免費代理建立自己IP代理池

#使用apscheduler庫定時爬取ip,定時檢測ip刪除ip,做了2層檢測,第一層爬取后放入redis——db0進行檢測,成功的放入redis——db1再次進行檢測,確保獲取的代理ip的可用性import requests, redis
import pandas
import randomfrom apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import loggingdb_conn = redis.ConnectionPool(host="*.*.*.*", port=6379, password="123456")
redis_conn_0 = redis.Redis(connection_pool=db_conn, max_connections=10,db=0)
redis_conn_1 = redis.Redis(connection_pool=db_conn, max_connections=10,db=1)# 刪除redis數據庫里的ip
def remove_ip(ip,redis_conn):redis_conn.zrem("IP", ip)print("已刪除 %s..." % ip)# 獲取redis數據庫里一共有多少ip
def get_ip_num(redis_conn):num = redis_conn.zcard("IP")return num# 獲取ip的端口
def get_port(ip,redis_conn):port = redis_conn.zscore("IP", ip)port = str(port).replace(".0", "")return port# 添加ip和端口到數據庫里
def add_ip(ip, port,redis_conn):# nx: 不要更新已有的元素。總是添加新的元素,只有True,Falseredis_conn.zadd("IP", {ip: port}, nx=55)print("已添加 %s %s...ok" % (ip, port))# 列出所有的ip
def get_all_ip(redis_conn):all_ip = redis_conn.zrange("IP", 0, -1)return all_ip# 隨機獲取一個ip
def get_random_ip(redis_conn):end_num = get_ip_num(redis_conn)num = random.randint(0, end_num)random_ip = redis_conn.zrange("IP", num, num)if not random_ip:return "",""random_ip = str(random_ip[0]).replace("b", '').replace("'", "")port = get_port(random_ip,redis_conn)return random_ip, port# 獲取代理ip
def spider_ip(x,redis_conn):print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)for p in range(1, 20):res = pandas.read_html("http://www.89ip.cn/index_{}.html".format(p))# print(res)# print(type(res[0]))for i in range(len(res[0])):ip = res[0].iloc[i, 0]port = res[0].iloc[i, 1]print("ip", ip)print("port", port)add_ip(str(ip), str(port),redis_conn)logging.basicConfig(level=logging.INFO,format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',datefmt='%Y-%m-%d %H:%M:%S',filename='log1.txt',filemode='a')def aps_detection_ip(x,redis_conn):print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)res=get_random_ip(redis_conn)ip=res[0]port=res[1]try:requests.get("http://www.baidu.com",proxies={'https':'{ip}:{port}'.format(ip=ip,port=port)})print("可用",ip,port,res)if redis_conn!=redis_conn_1:add_ip(str(ip), str(port), redis_conn_1)except Exception:# ip錯誤失效就刪除remove_ip(ip,redis_conn)scheduler = BlockingScheduler()
scheduler.add_job(func=aps_detection_ip, args=('檢測循環任務0',redis_conn_0), trigger='interval', seconds=3, id='aps_detection_ip_task0',max_instances=10)
scheduler.add_job(func=spider_ip, args=('獲取循環任務0',redis_conn_0), trigger='interval', seconds=60*60*2, id='spider_ip_task0',max_instances=10)scheduler.add_job(func=aps_detection_ip, args=('檢測循環任務1',redis_conn_1), trigger='interval', seconds=3, id='aps_detection_ip_task1',max_instances=10)scheduler._logger = logging# scheduler.start()
if __name__ == '__main__':# print(get_ip_num())# spider_ip("獲取循環任務")scheduler.start()# aps_detection_ip("檢測循環任務")

?

總結

以上是生活随笔為你收集整理的Python使用Redis实现IP代理池的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。