脚本检测CDN节点资源是否与源站资源一致
生活随笔
收集整理的這篇文章主要介紹了
脚本检测CDN节点资源是否与源站资源一致
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
需求:
1、所有要檢測的資源url放到一個(gè)單獨(dú)文件中
2、檢測cdn節(jié)點(diǎn)資源大小與源站文件大小是否一致
3、隨機(jī)抽查幾個(gè)資源,檢查md5sum是否一致
4、使用多線程,可配置線程數(shù)
?
代碼目錄:
hexm:Hexm hexm$ tree ./checkcdn ./checkcdn ├── README.TXT ├── check.py # 主程序 ├── conf │ └── url.txt # 配置文件 ├── lib │ ├── __init__.py │ ├── common.py │ └── threadpool.py # 線程池 └── tmp├── cdn # 存放從CDN節(jié)點(diǎn)系在的資源└── origin # 存放從源站下載的資源?
README.TXT
依賴:requests 兼容性:兼容Python3以及Python2.7使用方法:usage: check.py [-h] [-t THREADS] [-c COUNTS]optional arguments:-h, --help show this help message and exit-t THREADS, --threads THREADS開啟多少線程,默認(rèn)5個(gè)-c COUNTS, --counts COUNTS檢測多少個(gè)包的md5值,默認(rèn)3個(gè)conf/url.txt
http://xxx_1020101.apkhttp://xxx_1020102.apk
http://xxx_1020103.apk
http://xxx_1020104.apk
check.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # File Name : check.py # Author : hexm # Mail : xiaoming.unix@gmail.com # Created Time : 2017-03-24 10:03import os import sys import random import argparse import requestsBASE_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(BASE_DIR)# 代理IP PROXIES = {"http": "http://183.136.135.191:80", } # 配置文件 CONFIG = BASE_DIR + '/conf/url.txt' # 保存CDN節(jié)點(diǎn)文件臨時(shí)目錄 CDNTEMPDIR = BASE_DIR + '/tmp/cdn/' # 保存源站文件臨時(shí)目錄 ORIGINTEMPDIR = BASE_DIR + '/tmp/origin/'from lib.threadpool import ThreadPool from lib.common import isdir, download, getfilemd5def callback(status, result):"""回調(diào)函數(shù),如果函數(shù)有返回值得話用得到:param status: 狀態(tài) True or None:param result: 函數(shù)返回值"""passdef checkstatus(url):"""通過head方法查看源站與當(dāng)前CDN節(jié)點(diǎn)資源大小是否一致:param url: url:return: None"""r1 = requests.head(url, proxies=PROXIES)r2 = requests.head(url)if r1.status_code == 200 and r2.status_code == 200:if r1.headers['Content-Length'] == r2.headers['Content-Length']:print("%s 源站和CDN節(jié)點(diǎn)資源\033[0;32m一致\033[0m, 源站文件大小為%s,CDN節(jié)點(diǎn)文件大小為%s"% (url,r1.headers['Content-Length'],r2.headers['Content-Length']))else:print("%s 源站和CDN節(jié)點(diǎn)資源\033[0;31m不一致\033[0m, 源站文件大小為%s,CDN節(jié)點(diǎn)文件大小為%s"% (url,r1.headers['Content-Length'],r2.headers['Content-Length']))else:print("%s 源站和CDN節(jié)點(diǎn)狀態(tài)碼\033[0;31m異常\033[0m,源站狀態(tài)碼為%s,CDN節(jié)點(diǎn)狀態(tài)碼為%s"% (url,r1.status_code,r2.status_code))def checkmd5(url, cdnTempDir, originTempDir):"""檢查源站與當(dāng)前cdn節(jié)點(diǎn)資源是否一致,下載超時(shí)300s:param url: url:param cdnTempDir: 保存從cdn節(jié)點(diǎn)下載的臨時(shí)文件目錄:param originTempDir: 保存從源站下載的臨時(shí)文件目錄:return: None"""filename = url.split('/')[-1]tempCdnFile = cdnTempDir + filenametempOriginFile = originTempDir + filenamestatus1 = download(url, tempOriginFile, proxies=PROXIES)if status1 is not None:if status1 == 200:status2 = download(url, tempCdnFile)else:print("%s \033[0;31m狀態(tài)碼異常\033[0m校驗(yàn)失敗" % url)if status1 == 200 and status2 == 200:if getfilemd5(tempCdnFile) == getfilemd5(tempOriginFile):print("%s 源站和cdn節(jié)點(diǎn)資源md5值\033[0;32m一致\033[0m," % url)else:print("%s 源站和cdn節(jié)點(diǎn)資源md5值\033[0;31m不一致\033[0m" % url)elif status1 is None or status2 is None:print("%s \033[0;31m下載失敗\033[0m" % url)# 檢查后刪除下載的文件try:os.remove(tempOriginFile)os.remove(tempCdnFile)except Exception as e:passdef parse_args():"""解析命令行參數(shù):return: args"""parser = argparse.ArgumentParser()help = '開啟多少線程,默認(rèn)5個(gè)'parser.add_argument('-t', '--threads', type=int, help=help, default='5')help = '檢測多少個(gè)包的md5值,默認(rèn)3個(gè)'parser.add_argument('-c', '--counts', type=int, help=help, default=3)args = parser.parse_args()return argsif __name__ == "__main__":if not isdir(CDNTEMPDIR): os.makedirs(CDNTEMPDIR)if not isdir(ORIGINTEMPDIR): os.makedirs(ORIGINTEMPDIR)# 從文件中獲取所有urlurls = [line.strip() for line in open(CONFIG, mode='r').readlines()]args = parse_args()# 檢查包大小pool = ThreadPool(args.threads) # 最多創(chuàng)建5個(gè)線程for url in urls:pool.run(checkstatus, (url,), callback=None)# 隨機(jī)抽查3個(gè),檢查md5for randurl in random.sample(urls, args.counts):pool.run(checkmd5, (randurl, CDNTEMPDIR, ORIGINTEMPDIR,), callback=None)pool.close() check.pylib/common.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # File Name : common.py # Author : hexm # Mail : xiaoming.unix@gmail.com # Created Time : 2017-03-24 10:03import os import hashlib import requestsdef getfilesize(path):"""獲取文件大小:param path: 文件路徑:return: 返回文件大小"""return os.path.getsize(path)def isfile(path):"""判斷是否是文件:param path: 文件路徑:return: 如果是返回True,否則返回None"""if os.path.isfile(path): return Truedef isdir(path):"""判斷是否是目錄:param path: 路徑:return: True or None"""if os.path.isdir(path): return Truedef getstatus(url, proxies=None):"""返回狀態(tài)碼:param url: url:return: 狀態(tài)碼"""return requests.head(url, proxies).status_codedef download(url, path, proxies=None):"""下載文件,并返回狀態(tài)碼:param url: 下載的url:param path: 保存文件的路徑:param proxies: 使用代理的地址:return: 返回狀態(tài)碼"""try:response = requests.get(url, proxies=proxies, stream=True, timeout=60)status = response.status_codetotal_size = int(response.headers['Content-Length'])# print(response.headers)if status == 200:with open(path, 'wb') as f:for chunk in response.iter_content(chunk_size=8192):if chunk: f.write(chunk)if total_size == getfilesize(path): # 下載文件大小與頭部Content-Length大小一致,則下載成功return status# 狀態(tài)碼非200,返回狀態(tài)碼else: return statusexcept Exception as e:return Nonedef getfilemd5(path):"""返回文件的md5sum:param path: 文件路徑:return: 返回校驗(yàn)和,否則返回None"""if isfile(path):md5obj = hashlib.md5()maxbuf = 8192f = open(path, 'rb')while True:buf = f.read(maxbuf)if not buf:breakmd5obj.update(buf)f.close()hash = md5obj.hexdigest()return hashreturn Noneif __name__ == "__main__":pass View Codelib/threadpool.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # File Name : threadpool.py # Author : hexm # Mail : xiaoming.unix@gmail.com # Created Time : 2017-03-23 20:03import sys if sys.version > '3':import queue else:import Queue as queue import threading import contextlib import timeStopEvent = object() # 終止線程信號class ThreadPool(object):"""1、解決線程重用問題,當(dāng)前線程執(zhí)行完任務(wù)后,不殺掉,放到空閑線程列表,繼續(xù)執(zhí)行下個(gè)任務(wù)2、根據(jù)任務(wù)量開啟線程,如果設(shè)置10個(gè)線程,只有2個(gè)任務(wù),最多只會(huì)開啟兩個(gè)線程3、如果有500個(gè)任務(wù),任務(wù)執(zhí)行非常快,2個(gè)線程就能完成,如果設(shè)置開啟10個(gè)線程,只會(huì)開啟兩個(gè)線程"""def __init__(self, max_num, max_task_num = None):if max_task_num:self.q = queue.Queue(max_task_num) # 指定任務(wù)最大數(shù),默認(rèn)為None,不限定else:self.q = queue.Queue()self.max_num = max_num # 最多多少線程self.cancel = False # 執(zhí)行完所有任務(wù),終止線程信號self.terminal = False # 無論執(zhí)行完畢與否,都終止所有線程self.generate_list = [] # 已創(chuàng)建多少線程self.free_list = [] # 空閑多少線程def run(self, func, args, callback=None):"""線程池執(zhí)行一個(gè)任務(wù):param func: 任務(wù)函數(shù):param args: 任務(wù)函數(shù)所需參數(shù):param callback: 任務(wù)執(zhí)行失敗或成功后執(zhí)行的回調(diào)函數(shù),回調(diào)函數(shù)有兩個(gè)參數(shù)1、任務(wù)函數(shù)執(zhí)行狀態(tài);2、任務(wù)函數(shù)返回值:return: 如果線程池已經(jīng)終止,則返回True否則None"""if self.cancel:return# 沒有空閑線程 并且已創(chuàng)建線程小于最大線程數(shù)才創(chuàng)建線程,if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:self.generate_thread() # 滿足則創(chuàng)建線程,并將任務(wù)放進(jìn)隊(duì)列w = (func, args, callback,)# 函數(shù),元組,函數(shù) ,將這三個(gè)參數(shù)放在元組里面,當(dāng)成一個(gè)整體放到隊(duì)列里面self.q.put(w) # 滿足條件則創(chuàng)建線程,并把任務(wù)放隊(duì)列里面def generate_thread(self):"""創(chuàng)建一個(gè)線程"""t = threading.Thread(target=self.call) # 每一個(gè)線程被創(chuàng)建,執(zhí)行call方法 t.start()def call(self):"""循環(huán)去獲取任務(wù)函數(shù)并執(zhí)行任務(wù)函數(shù)"""current_thread = threading.currentThread()self.generate_list.append(current_thread) # 每創(chuàng)建一個(gè)線程,將當(dāng)前線程名加進(jìn)已創(chuàng)建的線程列表 event = self.q.get() # 在隊(duì)列中取任務(wù), 沒任務(wù)線程就阻塞,等待取到任務(wù),線程繼續(xù)向下執(zhí)行while event != StopEvent: # 是否滿足終止線程 func, arguments, callback = event # 取出隊(duì)列中一個(gè)任務(wù)try:result = func(*arguments) # 執(zhí)行函數(shù),并將參數(shù)傳進(jìn)去success = Trueexcept Exception as e:success = Falseresult = Noneif callback is not None:try:callback(success, result)except Exception as e:passwith self.worker_state(self.free_list, current_thread): # 當(dāng)前線程執(zhí)行完任務(wù),將當(dāng)前線程置于空閑狀態(tài),#這個(gè)線程等待隊(duì)列中下一個(gè)任務(wù)到來,如果沒來,一直處于空閑, 如果到來,去任務(wù)if self.terminal:event = StopEventelse:event = self.q.get() # 將當(dāng)前任務(wù)加入到空閑列表后,如果有任務(wù),取到,沒有阻塞 取到后,移除當(dāng)前線程else: # 滿足終止線程,在創(chuàng)建的線程列表中移除當(dāng)前線程 self.generate_list.remove(current_thread)def close(self):"""執(zhí)行完所有的任務(wù)后,殺掉所有線程"""self.cancel = True # 標(biāo)志設(shè)置為Truefull_size = len(self.generate_list) + 1 # 已生成線程個(gè)數(shù), +1 針對python2.7while full_size:self.q.put(StopEvent) # full_size -= 1def terminate(self):"""無論是否還有任務(wù),終止線程"""self.terminal = Truewhile self.generate_list:self.q.put(StopEvent)self.q.queue.clear()@contextlib.contextmanagerdef worker_state(self, state_list, worker_thread):"""用于記錄線程中正在等待的線程數(shù)"""state_list.append(worker_thread) # 將當(dāng)前空閑線程加入空閑列表try:yieldfinally:state_list.remove(worker_thread) # 取到任務(wù)后,將當(dāng)前空閑線程從空閑線程里移除,# 使用例子 if __name__ == "__main__":pool = ThreadPool(5) # 創(chuàng)建pool對象,最多創(chuàng)建5個(gè)線程def callback(status, result):passdef action(i):time.sleep(1)print(i)for i in range(30): # 共30個(gè)任務(wù)ret = pool.run(action, (i,), callback=None) # 將action函數(shù),及action的參數(shù),callback函數(shù)傳給run()方法pool.close() View Code?
例子:
?
轉(zhuǎn)載于:https://www.cnblogs.com/xiaoming279/p/6626768.html
總結(jié)
以上是生活随笔為你收集整理的脚本检测CDN节点资源是否与源站资源一致的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: BZOJ1036 (其实这只是一份板子)
- 下一篇: 接口测试用例设计思路