python locust 时间戳过期_Locust源码分析之runners.py模块(6)
runners.py模塊是控制locust運(yùn)行的核心功能,控制著性能測試的運(yùn)行機(jī)制。包含了本地運(yùn)行,分布式運(yùn)行等。
主要由LocustRunner,LocalLocustRunner,DistributedLocustRunner,MasterLocustRunner以及SlaveLocustRunner三個(gè)類組成。其中LocustRunner類是其他類的基類,MasterLocustRunner和SlaveLocustRunner是DistributedLocustRunner的拓展類。
LocustRunner
LocustRunner是其余所有runner類的基類,也是整個(gè)locust中運(yùn)行的核心代碼
class LocustRunner(object):
# 實(shí)例化LocustRunner時(shí),需要傳入兩個(gè)參數(shù):locust_classes和options
def __init__(self, locust_classes, options):
self.options = options # 命令行中傳入的參數(shù)
self.locust_classes = locust_classes # locust類,task任務(wù)列表
self.hatch_rate = options.hatch_rate # 命令行參數(shù):每秒啟動(dòng)數(shù)
self.num_clients = options.num_clients # 命令行參數(shù):并發(fā)數(shù)
self.host = options.host # 命令行參數(shù):被壓服務(wù)器地址
self.locusts = Group() #協(xié)程組:gevent.pool.Group()
self.greenlet = self.locusts
self.state = STATE_INIT # 壓測狀態(tài),初始狀態(tài)為“READY”
self.hatching_greenlet = None
self.exceptions = {}
self.stats = global_stats # 在stats模塊中,有一個(gè)全局變量global_stats用于存儲(chǔ)當(dāng)前狀態(tài)
# 注冊(cè)監(jiān)聽器,當(dāng)收到reset_stats指令時(shí)重置當(dāng)前性能數(shù)據(jù)stats狀態(tài)
def on_hatch_complete(user_count):
self.state = STATE_RUNNING
if self.options.reset_stats:
logger.info("Resetting stats\n")
self.stats.reset_all()
events.hatch_complete += on_hatch_complete
# 屬性request_stats:返回當(dāng)前性能指標(biāo)狀態(tài)
@property
def request_stats(self):
return self.stats.entries
# 屬性errors:返回當(dāng)前錯(cuò)誤信息
@property
def errors(self):
return self.stats.errors
# 屬性u(píng)ser_count,返回用戶并發(fā)數(shù)
@property
def user_count(self):
return len(self.locusts)
def weight_locusts(self, amount, stop_timeout = None):
"""
根據(jù)權(quán)重分發(fā)各個(gè)Locust類占有的并發(fā)數(shù)量bucket,amount為總并發(fā)數(shù)
"""
bucket = []
# 計(jì)算權(quán)重之和
weight_sum = sum((locust.weight for locust in self.locust_classes if locust.task_set))
for locust in self.locust_classes:
if not locust.task_set:
warnings.warn("Notice: Found Locust class (%s) got no task_set. Skipping..." % locust.__name__)
continue
if self.host is not None:
locust.host = self.host
if stop_timeout is not None:
locust.stop_timeout = stop_timeout
# 根據(jù)每個(gè)locust請(qǐng)求的權(quán)重計(jì)算所占的比例
percent = locust.weight / float(weight_sum)
# 計(jì)算出每個(gè)locust請(qǐng)求需要多少個(gè)并發(fā)數(shù)
num_locusts = int(round(amount * percent))
bucket.extend([locust for x in xrange(0, num_locusts)])
return bucket
def spawn_locusts(self, spawn_count=None, stop_timeout=None, wait=False):
"""
執(zhí)行壓力測試并發(fā)任務(wù)
spawn_count: 并發(fā)數(shù)
stop_timeout:超時(shí)時(shí)間
wait: task任務(wù)執(zhí)行間隔
"""
# 如果沒有傳入spawn_count參數(shù)則使用命令行傳入的并發(fā)數(shù)
if spawn_count is None:
spawn_count = self.num_clients
#獲取任務(wù)并發(fā)數(shù)
bucket = self.weight_locusts(spawn_count, stop_timeout)
spawn_count = len(bucket)
#如果是首次啟動(dòng)/重啟性能測試,并發(fā)數(shù)等于傳入的spawn_count
if self.state == STATE_INIT or self.state == STATE_STOPPED:
self.state = STATE_HATCHING
self.num_clients = spawn_count
else: #如果當(dāng)前性能測試已經(jīng)啟動(dòng),則疊加spawn_count并發(fā)數(shù)
self.num_clients += spawn_count
logger.info("Hatching and swarming %i clients at the rate %g clients/s..." % (spawn_count, self.hatch_rate))
#獲取每一個(gè)task任務(wù)
occurence_count = dict([(l.__name__, 0) for l in self.locust_classes])
def hatch():
sleep_time = 1.0 / self.hatch_rate #用戶并發(fā)間隔,為每秒新增請(qǐng)求數(shù)的倒數(shù)
while True:
if not bucket:
# 當(dāng)bucket為空時(shí),表示增壓已經(jīng)完成
logger.info("All locusts hatched: %s" % ", ".join(["%s: %d" % (name, count) for name, count in six.iteritems(occurence_count)]))
events.hatch_complete.fire(user_count=self.num_clients) #任務(wù)條目全部執(zhí)行完,hatch_complete觸發(fā)相應(yīng)的事件處理Hook
return
# 當(dāng)bucket不為空時(shí),表示仍然需要繼續(xù)增加壓力。
locust = bucket.pop(random.randint(0, len(bucket)-1)) #從并發(fā)任務(wù)中隨機(jī)抽取一個(gè)執(zhí)行
occurence_count[locust.__name__] += 1 #將被執(zhí)行的任務(wù)+1
def start_locust(_):
try:
locust().run(runner=self) #執(zhí)行任務(wù)
except GreenletExit:
pass
new_locust = self.locusts.spawn(start_locust, locust)
if len(self.locusts) % 10 == 0:
logger.debug("%i locusts hatched" % len(self.locusts))
gevent.sleep(sleep_time)
hatch() #執(zhí)行壓力測試
if wait: #如果添加了wait參數(shù),則暫停所有的locust
self.locusts.join()
logger.info("All locusts dead\n")
def kill_locusts(self, kill_count):
"""
終止kill_count指定的并發(fā)任務(wù),并從當(dāng)前的Group中減少并發(fā)數(shù)目
"""
bucket = self.weight_locusts(kill_count)
kill_count = len(bucket)
self.num_clients -= kill_count
logger.info("Killing %i locusts" % kill_count)
dying = []
for g in self.locusts:
for l in bucket:
if l == g.args[0]:
dying.append(g)
bucket.remove(l)
break
for g in dying:
self.locusts.killone(g)
events.hatch_complete.fire(user_count=self.num_clients)
def start_hatching(self, locust_count=None, hatch_rate=None, wait=False):
"""
啟動(dòng)性能測試
locust_count:并發(fā)數(shù)量
hatch_rate:每秒啟動(dòng)并發(fā)數(shù)
"""
if self.state != STATE_RUNNING and self.state != STATE_HATCHING:
self.stats.clear_all() # 清空之前存儲(chǔ)的性能測試狀態(tài)數(shù)據(jù)
self.stats.start_time = time() # 記錄當(dāng)前時(shí)間為開始時(shí)間
self.exceptions = {} # exception信息清空
events.locust_start_hatching.fire() # 觸發(fā)start_hatching事件
# 動(dòng)態(tài)修改當(dāng)前并發(fā)的用戶
if self.state != STATE_INIT and self.state != STATE_STOPPED: # 當(dāng)前的狀態(tài)不是hatching和running時(shí)
self.state = STATE_HATCHING # 當(dāng)前狀態(tài)不是ready或stop時(shí),將當(dāng)前的狀態(tài)設(shè)置為hatching
if self.num_clients > locust_count: # 如果目前的并發(fā)數(shù)超過需要的并發(fā)數(shù),則kill多余的并發(fā)數(shù)
kill_count = self.num_clients - locust_count
self.kill_locusts(kill_count)
elif self.num_clients < locust_count:
# 如果當(dāng)前并發(fā)數(shù)小于實(shí)際需要的并發(fā)數(shù),根據(jù)設(shè)置的每秒啟動(dòng)數(shù)量來繼續(xù)創(chuàng)建缺少的并發(fā)數(shù)
if hatch_rate:
self.hatch_rate = hatch_rate
spawn_count = locust_count - self.num_clients
self.spawn_locusts(spawn_count=spawn_count)
else: #如果并發(fā)數(shù)和實(shí)際需求的并發(fā)數(shù)相同,則表示壓力測試完成
events.hatch_complete.fire(user_count=self.num_clients)
# 如果當(dāng)前的狀態(tài)還是ready或stop時(shí),則調(diào)用spawn_locusts需要傳遞wait參數(shù)
else:
if hatch_rate:
self.hatch_rate = hatch_rate
if locust_count is not None:
self.spawn_locusts(locust_count, wait=wait)
else:
self.spawn_locusts(wait=wait)
def stop(self):
# 停止壓測時(shí),如果當(dāng)前還在增壓狀態(tài)下,首先需要停止增壓
if self.hatching_greenlet and not self.hatching_greenlet.ready():
self.hatching_greenlet.kill(block=True)
# 停止當(dāng)前的所有施壓單元并修改狀態(tài)為stop
self.locusts.kill(block=True)
self.state = STATE_STOPPED
events.locust_stop_hatching.fire()
def quit(self):
# 退出時(shí)首先調(diào)用stop,再停止所有g(shù)reenlet
self.stop()
self.greenlet.kill(block=True)
def log_exception(self, node_id, msg, formatted_tb):
# 記錄異常值
key = hash(formatted_tb)
row = self.exceptions.setdefault(key, {"count": 0, "msg": msg, "traceback": formatted_tb, "nodes": set()})
row["count"] += 1
row["nodes"].add(node_id)
self.exceptions[key] = row
LocalLocustRunner
LocalLocustRunner是LocustRunner的擴(kuò)展類
class LocalLocustRunner(LocustRunner):
"""
當(dāng)options參數(shù)中沒有傳入master和slave時(shí),默認(rèn)使用本地執(zhí)行模式
"""
def __init__(self, locust_classes, options):
# 繼承LocalLocustRunner類的全部功能
super(LocalLocustRunner, self).__init__(locust_classes, options)
# 增加一個(gè)監(jiān)聽器用于監(jiān)控exception
def on_locust_error(locust_instance, exception, tb):
formatted_tb = "".join(traceback.format_tb(tb))
self.log_exception("local", str(exception), formatted_tb)
events.locust_error += on_locust_error
def start_hatching(self, locust_count=None, hatch_rate=None, wait=False):
# 調(diào)用LocustRunner中的start_hatching方法后得到的Group賦給greenlet
self.hatching_greenlet = gevent.spawn(lambda: super(LocalLocustRunner, self).start_hatching(locust_count, hatch_rate, wait=wait))
self.greenlet = self.hatching_greenlet
DistributedLocustRunner
設(shè)置master節(jié)點(diǎn)的host地址/端口號(hào)/綁定地址/綁定端口,是MasterLocustRunner和SlaveLocustRunner類的基類
class DistributedLocustRunner(LocustRunner):
def __init__(self, locust_classes, options):
# 繼承LocustRunner類的全部功能,并設(shè)置host/port/bind_host/bind_port
super(DistributedLocustRunner, self).__init__(locust_classes, options)
self.master_host = options.master_host #master節(jié)點(diǎn)host
self.master_port = options.master_port
self.master_bind_host = options.master_bind_host
self.master_bind_port = options.master_bind_port
MasterLocustRunner
分布式壓測master節(jié)點(diǎn)任務(wù)分發(fā),slave節(jié)點(diǎn)狀態(tài)控制,并發(fā)數(shù)計(jì)算,以及性能測試數(shù)據(jù)收集
class MasterLocustRunner(DistributedLocustRunner):
def __init__(self, *args, **kwargs):
super(MasterLocustRunner, self).__init__(*args, **kwargs)
class SlaveNodesDict(dict):
# 設(shè)置salve節(jié)點(diǎn)的壓測狀態(tài)
def get_by_state(self, state):
return [c for c in six.itervalues(self) if c.state == state]
# 設(shè)置全部slave節(jié)點(diǎn)狀態(tài)為ready
@property
def ready(self):
return self.get_by_state(STATE_INIT)
# 設(shè)置全部slave節(jié)點(diǎn)狀態(tài)為加壓中
@property
def hatching(self):
return self.get_by_state(STATE_HATCHING)
# 設(shè)置全部slave節(jié)點(diǎn)狀態(tài)為壓測中
@property
def running(self):
return self.get_by_state(STATE_RUNNING)
self.clients = SlaveNodesDict()
# 綁定master節(jié)點(diǎn)遠(yuǎn)程rpc調(diào)用服務(wù)器
self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
self.greenlet = Group()
self.greenlet.spawn(self.client_listener).link_exception(callback=self.noop)
# 添加監(jiān)聽器收集slave節(jié)點(diǎn)用戶并發(fā)數(shù)量
def on_slave_report(client_id, data):
if client_id not in self.clients:
logger.info("Discarded report from unrecognized slave %s", client_id)
return
self.clients[client_id].user_count = data["user_count"]
events.slave_report += on_slave_report
# 添加監(jiān)聽器發(fā)送quit信息到各個(gè)slave節(jié)點(diǎn)
def on_quitting():
self.quit()
events.quitting += on_quitting
#user_count屬性,用以返回各slave節(jié)點(diǎn)并發(fā)數(shù)之和
@property
def user_count(self):
return sum([c.user_count for c in six.itervalues(self.clients)])
#開始施壓
def start_hatching(self, locust_count, hatch_rate):
"""
locust_count: 并發(fā)任務(wù)數(shù)量
hatch_rate:每秒并發(fā)數(shù)
"""
#空閑slave節(jié)點(diǎn)數(shù)=ready狀態(tài)+running狀態(tài)之和
num_slaves = len(self.clients.ready) + len(self.clients.running)
if not num_slaves:
logger.warning("You are running in distributed mode but have no slave servers connected. "
"Please connect slaves prior to swarming.")
return
self.num_clients = locust_count # 并發(fā)總量
slave_num_clients = locust_count // (num_slaves or 1) #每個(gè)slave節(jié)點(diǎn)需要執(zhí)行的并發(fā)數(shù)
slave_hatch_rate = float(hatch_rate) / (num_slaves or 1) #每個(gè)slave節(jié)點(diǎn)的每秒并發(fā)數(shù)
remaining = locust_count % num_slaves #未分配到salve節(jié)點(diǎn)的并發(fā)數(shù)
logger.info("Sending hatch jobs to %d ready clients", num_slaves)
if self.state != STATE_RUNNING and self.state != STATE_HATCHING:
self.stats.clear_all() #重置性能測試數(shù)據(jù)
self.exceptions = {}
events.master_start_hatching.fire() #開始施加壓力
for client in six.itervalues(self.clients):
data = {
"hatch_rate":slave_hatch_rate,
"num_clients":slave_num_clients,
"host":self.host,
"stop_timeout":None
}
# 未分配的并發(fā)數(shù)再依次添加到各個(gè)slave上
if remaining > 0:
data["num_clients"] += 1
remaining -= 1
self.server.send(Message("hatch", data, None)) #master節(jié)點(diǎn)發(fā)送數(shù)據(jù)data到slave
#壓測開始時(shí)間
self.stats.start_time = time()
self.state = STATE_HATCHING
#將加壓中和運(yùn)行中的壓力測試停止
def stop(self):
for client in self.clients.hatching + self.clients.running:
self.server.send(Message("stop", None, None)) #發(fā)送停止指令到slave節(jié)點(diǎn)
events.master_stop_hatching.fire() #觸發(fā)停止任務(wù)hook
#中止壓測
def quit(self):
for client in six.itervalues(self.clients):
self.server.send(Message("quit", None, None))
self.greenlet.kill(block=True)
#獲取master節(jié)點(diǎn)發(fā)送的數(shù)據(jù)
def client_listener(self):
while True:
msg = self.server.recv()
if msg.type == "client_ready": #如果msg消息類型為client_ready,從cliens列表中刪除slave節(jié)點(diǎn)
id = msg.node_id
self.clients[id] = SlaveNode(id)
logger.info("Client %r reported as ready. Currently %i clients ready to swarm." % (id, len(self.clients.ready)))
elif msg.type == "client_stopped": #如果msg消息類型為client_stopped,從cliens列表中刪除slave節(jié)點(diǎn)
del self.clients[msg.node_id]
if len(self.clients.hatching + self.clients.running) == 0: #如果沒有slave節(jié)點(diǎn)處于hatching/running狀態(tài),則標(biāo)記壓測狀態(tài)為STATE_STOPPED
self.state = STATE_STOPPED
logger.info("Removing %s client from running clients" % (msg.node_id))
elif msg.type == "stats": #如果msg消息類型為stats,則計(jì)算各slave節(jié)點(diǎn)并發(fā)數(shù)之和
events.slave_report.fire(client_id=msg.node_id, data=msg.data)
elif msg.type == "hatching": #如果msg消息類型為hatching,則標(biāo)記slave節(jié)點(diǎn)狀態(tài)為STATE_HATCHING
self.clients[msg.node_id].state = STATE_HATCHING
elif msg.type == "hatch_complete": #如果msg消息類型為hatch_complete,標(biāo)記slave狀態(tài)為STATE_RUNNING
self.clients[msg.node_id].state = STATE_RUNNING
self.clients[msg.node_id].user_count = msg.data["count"] #獲取并發(fā)數(shù)
if len(self.clients.hatching) == 0:
count = sum(c.user_count for c in six.itervalues(self.clients)) #計(jì)算所有slave節(jié)點(diǎn)的并發(fā)數(shù)之和
events.hatch_complete.fire(user_count=count) #記錄并發(fā)數(shù)
elif msg.type == "quit": #如果msg消息類型為quit,則結(jié)束壓測任務(wù)
if msg.node_id in self.clients:
del self.clients[msg.node_id]
logger.info("Client %r quit. Currently %i clients connected." % (msg.node_id, len(self.clients.ready)))
elif msg.type == "exception":
self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])
# slave_count屬性,返回ready/hatching/running狀態(tài)slave節(jié)點(diǎn)之和
@property
def slave_count(self):
return len(self.clients.ready) + len(self.clients.hatching) + len(self.clients.running)
SlaveLocustRunner
slave節(jié)點(diǎn)任務(wù)執(zhí)行
class SlaveLocustRunner(DistributedLocustRunner):
def __init__(self, *args, **kwargs):
super(SlaveLocustRunner, self).__init__(*args, **kwargs)
# 根據(jù)hostname,當(dāng)前時(shí)間戳和隨機(jī)數(shù)生成唯一client_id
self.client_id = socket.gethostname() + "_" + md5(str(time() + random.randint(0,10000)).encode('utf-8')).hexdigest()
# 連接rpc服務(wù)端,即master節(jié)點(diǎn)
self.client = rpc.Client(self.master_host, self.master_port)
self.greenlet = Group() #協(xié)程任務(wù)池
# 執(zhí)行任務(wù)
self.greenlet.spawn(self.worker).link_exception(callback=self.noop)
# slave節(jié)點(diǎn)發(fā)送當(dāng)前狀態(tài)client_ready和client_id到master
self.client.send(Message("client_ready", None, self.client_id))
# salve發(fā)送性能測試狀態(tài)數(shù)據(jù)到master
self.greenlet.spawn(self.stats_reporter).link_exception(callback=self.noop)
# 增加監(jiān)聽器,當(dāng)任務(wù)完成時(shí)發(fā)送hatch_complete狀態(tài)/并發(fā)數(shù)/client_id到master
def on_hatch_complete(user_count):
self.client.send(Message("hatch_complete", {"count":user_count}, self.client_id))
events.hatch_complete += on_hatch_complete
# 增加監(jiān)聽器,發(fā)送salve節(jié)點(diǎn)并發(fā)數(shù)到master
def on_report_to_master(client_id, data):
data["user_count"] = self.user_count
events.report_to_master += on_report_to_master
# 增加監(jiān)聽器,發(fā)送quit消息到master
def on_quitting():
self.client.send(Message("quit", None, self.client_id))
events.quitting += on_quitting
# 增加監(jiān)聽器,發(fā)送異常消息到master
def on_locust_error(locust_instance, exception, tb):
formatted_tb = "".join(traceback.format_tb(tb))
self.client.send(Message("exception", {"msg" : str(exception), "traceback" : formatted_tb}, self.client_id))
events.locust_error += on_locust_error
# slave節(jié)點(diǎn)執(zhí)行壓測任務(wù)
def worker(self):
while True:
msg = self.client.recv()
if msg.type == "hatch": # 收到hatch指令,根據(jù)master傳遞的data,施壓
self.client.send(Message("hatching", None, self.client_id))
job = msg.data
self.hatch_rate = job["hatch_rate"]
#self.num_clients = job["num_clients"]
self.host = job["host"]
self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"]))
elif msg.type == "stop": # 收到stop指令,停止壓測,并發(fā)送slave狀態(tài)到master
self.stop()
self.client.send(Message("client_stopped", None, self.client_id))
self.client.send(Message("client_ready", None, self.client_id))
elif msg.type == "quit": # 收到quit指令,停止壓測,并強(qiáng)制kill壓測協(xié)程
logger.info("Got quit message from master, shutting down...")
self.stop()
self.greenlet.kill(block=True)
# 發(fā)送性能數(shù)據(jù)到master
def stats_reporter(self):
while True:
data = {}
events.report_to_master.fire(client_id=self.client_id, data=data)
try:
self.client.send(Message("stats", data, self.client_id))
except:
logger.error("Connection lost to master server. Aborting...")
break
gevent.sleep(SLAVE_REPORT_INTERVAL)
參考文獻(xiàn):
https://www.missshi.cn/api/view/blog/5a0aef86483c561314000002
總結(jié)
以上是生活随笔為你收集整理的python locust 时间戳过期_Locust源码分析之runners.py模块(6)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java播放swf文件_Java-如何在
- 下一篇: python--jupyter note